HIVE-18192: Introduce WriteID per table rather than using global transaction ID ...
hive.git: hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hive.hcatalog.streaming.mutate.worker;

import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.BucketCodec;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
import org.apache.hadoop.hive.ql.io.RecordUpdater;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hive.hcatalog.streaming.mutate.client.AcidTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Orchestrates the application of an ordered sequence of mutation events to a given ACID table. Events must be grouped
 * by partition, then bucket, and ordered by origWriteId, then rowId. Ordering is enforced by the
 * {@link SequenceValidator} and grouping by the {@link GroupingValidator}. An ACID delta file is created for each
 * combination of partition and bucket id (a single write id is implied). Once a delta file has been closed it cannot
 * be reopened, so care is needed to group the data correctly: failures will occur if an event revisits a group whose
 * delta has already been closed. The {@link MutatorCoordinator} will seamlessly handle transitions between groups,
 * creating and closing {@link Mutator Mutators} as needed to write to the appropriate partition and bucket. New
 * partitions will be created in the meta store if {@link AcidTable#createPartitions()} is set.
 * <p/>
 * {@link #insert(List, Object) Insert} events must be artificially assigned appropriate bucket ids in the preceding
 * grouping phase so that they are grouped correctly. Note that any write id or row id assigned to the
 * {@link RecordIdentifier RecordIdentifier} of such events will be ignored by both the coordinator and the underlying
 * {@link RecordUpdater}.
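 * <p/>
 * A minimal usage sketch, for illustration only: {@code ExampleMutatorFactory} and the partition values are
 * hypothetical, and coordinators are normally obtained from a {@code MutatorCoordinatorBuilder} as shown, rather
 * than by calling the package-private constructors directly.
 * <pre>{@code
 * MutatorCoordinator coordinator = new MutatorCoordinatorBuilder()
 *     .metaStoreUri(metaStoreUri)
 *     .table(acidTable)
 *     .mutatorFactory(new ExampleMutatorFactory())
 *     .build();
 * try {
 *   // Events for one (partition, bucket) group, in ascending (origWriteId, rowId) order:
 *   coordinator.update(Arrays.asList("asia", "india"), updatedRecord);
 *   coordinator.delete(Arrays.asList("asia", "india"), deletedRecord);
 * } finally {
 *   coordinator.close();
 * }
 * }</pre>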
 */
public class MutatorCoordinator implements Closeable, Flushable {

  private static final Logger LOG = LoggerFactory.getLogger(MutatorCoordinator.class);

  private final MutatorFactory mutatorFactory;
  private final GroupingValidator groupingValidator;
  private final SequenceValidator sequenceValidator;
  private final AcidTable table;
  private final RecordInspector recordInspector;
  private final PartitionHelper partitionHelper;
  private final AcidOutputFormat<?, ?> outputFormat;
  private final BucketIdResolver bucketIdResolver;
  private final HiveConf configuration;
  private final boolean deleteDeltaIfExists;

  private int bucketId;
  private List<String> partitionValues;
  private Path partitionPath;
  private Mutator mutator;

  MutatorCoordinator(HiveConf configuration, MutatorFactory mutatorFactory, PartitionHelper partitionHelper,
      AcidTable table, boolean deleteDeltaIfExists) throws WorkerException {
    this(configuration, mutatorFactory, partitionHelper, new GroupingValidator(), new SequenceValidator(), table,
        deleteDeltaIfExists);
  }

  /** Visible for testing only. */
  MutatorCoordinator(HiveConf configuration, MutatorFactory mutatorFactory, PartitionHelper partitionHelper,
      GroupingValidator groupingValidator, SequenceValidator sequenceValidator, AcidTable table,
      boolean deleteDeltaIfExists) throws WorkerException {
    this.configuration = configuration;
    this.mutatorFactory = mutatorFactory;
    this.partitionHelper = partitionHelper;
    this.groupingValidator = groupingValidator;
    this.sequenceValidator = sequenceValidator;
    this.table = table;
    this.deleteDeltaIfExists = deleteDeltaIfExists;
    this.recordInspector = this.mutatorFactory.newRecordInspector();
    bucketIdResolver = this.mutatorFactory.newBucketIdResolver(table.getTotalBuckets());

    bucketId = -1;
    outputFormat = createOutputFormat(table.getOutputFormatName(), configuration);
  }

  /**
   * We expect records grouped by (partitionValues, bucketId) and ordered by (origWriteId, rowId).
   *
   * @throws BucketIdException The bucket ID in the {@link RecordIdentifier} of the record does not match that computed
   *           using the values in the record's bucketed columns.
   * @throws RecordSequenceException A record was submitted that was not in the correct ascending (origWriteId, rowId)
   *           sequence.
   * @throws GroupRevisitedException An event was submitted for a (partition, bucketId) combination that has already
   *           been closed.
   * @throws PartitionCreationException Could not create a new partition in the meta store.
   * @throws WorkerException
   */
  public void insert(List<String> partitionValues, Object record) throws WorkerException {
    reconfigureState(OperationType.INSERT, partitionValues, record);
    try {
      mutator.insert(record);
      LOG.debug("Inserted into partition={}, record={}", partitionValues, record);
    } catch (IOException e) {
      throw new WorkerException("Failed to insert record '" + record + "' using mutator '" + mutator + "'.", e);
    }
  }

  /**
   * We expect records grouped by (partitionValues, bucketId) and ordered by (origWriteId, rowId).
   *
   * @throws BucketIdException The bucket ID in the {@link RecordIdentifier} of the record does not match that computed
   *           using the values in the record's bucketed columns.
   * @throws RecordSequenceException A record was submitted that was not in the correct ascending (origWriteId, rowId)
   *           sequence.
   * @throws GroupRevisitedException An event was submitted for a (partition, bucketId) combination that has already
   *           been closed.
   * @throws PartitionCreationException Could not create a new partition in the meta store.
   * @throws WorkerException
   */
  public void update(List<String> partitionValues, Object record) throws WorkerException {
    reconfigureState(OperationType.UPDATE, partitionValues, record);
    try {
      mutator.update(record);
      LOG.debug("Updated in partition={}, record={}", partitionValues, record);
    } catch (IOException e) {
      throw new WorkerException("Failed to update record '" + record + "' using mutator '" + mutator + "'.", e);
    }
  }

  /**
   * We expect records grouped by (partitionValues, bucketId) and ordered by (origWriteId, rowId).
   *
   * @throws BucketIdException The bucket ID in the {@link RecordIdentifier} of the record does not match that computed
   *           using the values in the record's bucketed columns.
   * @throws RecordSequenceException A record was submitted that was not in the correct ascending (origWriteId, rowId)
   *           sequence.
   * @throws GroupRevisitedException An event was submitted for a (partition, bucketId) combination that has already
   *           been closed.
   * @throws PartitionCreationException Could not create a new partition in the meta store.
   * @throws WorkerException
   */
  public void delete(List<String> partitionValues, Object record) throws WorkerException {
    reconfigureState(OperationType.DELETE, partitionValues, record);
    try {
      mutator.delete(record);
      LOG.debug("Deleted from partition={}, record={}", partitionValues, record);
    } catch (IOException e) {
      throw new WorkerException("Failed to delete record '" + record + "' using mutator '" + mutator + "'.", e);
    }
  }

  @Override
  public void close() throws IOException {
    try {
      if (mutator != null) {
        mutator.close();
      }
    } finally {
      partitionHelper.close();
    }
  }

  @Override
  public void flush() throws IOException {
    if (mutator != null) {
      mutator.flush();
    }
  }

  private void reconfigureState(OperationType operationType, List<String> newPartitionValues, Object record)
      throws WorkerException {
    RecordIdentifier newRecordIdentifier = extractRecordIdentifier(operationType, newPartitionValues, record);
    int newBucketId = newRecordIdentifier.getBucketProperty();

    if (newPartitionValues == null) {
      newPartitionValues = Collections.emptyList();
    }

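    // Branch from coarse to fine: a partition change forces a new delta location (creating the partition in
    // the meta store first, for inserts, when the table allows it); a bucket change within the same partition
    // just requires a new mutator; otherwise the event stays in the current group and only its
    // (origWriteId, rowId) ordering needs validating.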
    try {
      if (partitionHasChanged(newPartitionValues)) {
        if (table.createPartitions() && operationType == OperationType.INSERT) {
          partitionHelper.createPartitionIfNotExists(newPartitionValues);
        }
        Path newPartitionPath = partitionHelper.getPathForPartition(newPartitionValues);
        resetMutator(newBucketId, newPartitionValues, newPartitionPath);
      } else if (bucketIdHasChanged(newBucketId)) {
        resetMutator(newBucketId, partitionValues, partitionPath);
      } else {
        validateRecordSequence(operationType, newRecordIdentifier);
      }
    } catch (IOException e) {
      throw new WorkerException("Failed to reset mutator when performing " + operationType + " of record: " + record, e);
    }
  }

  private RecordIdentifier extractRecordIdentifier(OperationType operationType, List<String> newPartitionValues,
      Object record) throws BucketIdException {
    RecordIdentifier recordIdentifier = recordInspector.extractRecordIdentifier(record);
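    // The bucket property on the RecordIdentifier is an encoded int; BucketCodec recovers the writer (bucket)
    // id from it so that, for everything except deletes, it can be checked against the bucket id recomputed
    // from the record's bucketed columns.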
    int bucketIdFromRecord = BucketCodec.determineVersion(
        recordIdentifier.getBucketProperty()).decodeWriterId(recordIdentifier.getBucketProperty());
    int computedBucketId = bucketIdResolver.computeBucketId(record);
    if (operationType != OperationType.DELETE && bucketIdFromRecord != computedBucketId) {
      throw new BucketIdException("RecordIdentifier.bucketId != computed bucketId (" + computedBucketId
          + ") for record " + recordIdentifier + " in partition " + newPartitionValues + ".");
    }
    return recordIdentifier;
  }

  private void resetMutator(int newBucketId, List<String> newPartitionValues, Path newPartitionPath)
      throws IOException, GroupRevisitedException {
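    // Entering a new (partition, bucket) group: close the previous writer, verify the group has not been
    // visited before, restart sequence validation, optionally remove a delta left behind by a failed task
    // attempt, and finally open a writer targeting the new delta.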
    if (mutator != null) {
      mutator.close();
    }
    validateGrouping(newPartitionValues, newBucketId);
    sequenceValidator.reset();
    if (deleteDeltaIfExists) {
      // TODO: Should this be the concern of the mutator?
      deleteDeltaIfExists(newPartitionPath, table.getWriteId(), newBucketId);
    }
    mutator = mutatorFactory.newMutator(outputFormat, table.getWriteId(), newPartitionPath, newBucketId);
    bucketId = newBucketId;
    partitionValues = newPartitionValues;
    partitionPath = newPartitionPath;
    LOG.debug("Reset mutator: bucketId={}, partition={}, partitionPath={}", bucketId, partitionValues, partitionPath);
  }

  private boolean partitionHasChanged(List<String> newPartitionValues) {
    boolean partitionHasChanged = !Objects.equals(this.partitionValues, newPartitionValues);
    if (partitionHasChanged) {
      LOG.debug("Partition changed from={}, to={}", this.partitionValues, newPartitionValues);
    }
    return partitionHasChanged;
  }

  private boolean bucketIdHasChanged(int newBucketId) {
    boolean bucketIdHasChanged = this.bucketId != newBucketId;
    if (bucketIdHasChanged) {
      LOG.debug("Bucket ID changed from={}, to={}", this.bucketId, newBucketId);
    }
    return bucketIdHasChanged;
  }

  private void validateGrouping(List<String> newPartitionValues, int newBucketId) throws GroupRevisitedException {
    if (!groupingValidator.isInSequence(newPartitionValues, newBucketId)) {
      throw new GroupRevisitedException("Group out of sequence: state=" + groupingValidator + ", partition="
          + newPartitionValues + ", bucketId=" + newBucketId);
    }
  }

  private void validateRecordSequence(OperationType operationType, RecordIdentifier newRecordIdentifier)
      throws RecordSequenceException {
    boolean identifierOutOfSequence = operationType != OperationType.INSERT
        && !sequenceValidator.isInSequence(newRecordIdentifier);
    if (identifierOutOfSequence) {
      throw new RecordSequenceException("Records not in sequence: state=" + sequenceValidator + ", recordIdentifier="
          + newRecordIdentifier);
    }
  }

  @SuppressWarnings("unchecked")
  private AcidOutputFormat<?, ?> createOutputFormat(String outputFormatName, HiveConf configuration)
      throws WorkerException {
    try {
      return (AcidOutputFormat<?, ?>) ReflectionUtils.newInstance(JavaUtils.loadClass(outputFormatName), configuration);
    } catch (ClassNotFoundException e) {
      throw new WorkerException("Could not locate class for '" + outputFormatName + "'.", e);
    }
  }

  /* A delta may be present from a previous failed task attempt. */
  private void deleteDeltaIfExists(Path partitionPath, long writeId, int bucketId) throws IOException {
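    // Compute the bucket file this coordinator would write; with equal minimum and maximum write ids the
    // path is typically of the form <partition>/delta_<writeId>_<writeId>/bucket_<bucketId> (exact naming
    // is owned by AcidUtils).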
    Path deltaPath = AcidUtils.createFilename(partitionPath,
        new AcidOutputFormat.Options(configuration)
            .bucket(bucketId)
            .minimumWriteId(writeId)
            .maximumWriteId(writeId));
    FileSystem fileSystem = deltaPath.getFileSystem(configuration);
    if (fileSystem.exists(deltaPath)) {
      LOG.info("Deleting existing delta path: {}", deltaPath);
      fileSystem.delete(deltaPath, false);
    }
  }

}