HIVE-18192: Introduce WriteID per table rather than using global transaction ID ...
author     Sankar Hariappan <sankarh@apache.org>
Fri, 23 Feb 2018 16:30:23 +0000 (22:00 +0530)
committer  Sankar Hariappan <sankarh@apache.org>
Fri, 23 Feb 2018 16:30:23 +0000 (22:00 +0530)
169 files changed:
common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/RecordWriter.java
hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java
hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java
hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTable.java
hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTableSerializer.java
hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java
hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java
hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorFactory.java
hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java
hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/SequenceValidator.java
hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ReflectiveMutatorFactory.java
hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestAcidTableSerializer.java
hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/TestMutatorClient.java
hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java
hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorImpl.java
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
metastore/scripts/upgrade/derby/044-HIVE-16997.derby.sql
metastore/scripts/upgrade/derby/050-HIVE-18192.derby.sql [new file with mode: 0644]
metastore/scripts/upgrade/derby/hive-txn-schema-3.0.0.derby.sql
metastore/scripts/upgrade/derby/upgrade-2.3.0-to-3.0.0.derby.sql
ql/src/java/org/apache/hadoop/hive/ql/Driver.java
ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitTask.java
ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitWork.java
ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java
ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java
ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java
ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
ql/src/java/org/apache/hadoop/hive/ql/stats/ColStatsProcessor.java
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidInputFormat.java
ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java
ql/src/test/org/apache/hadoop/hive/ql/parse/TestUpdateDeleteSemanticAnalyzer.java
ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
ql/src/test/results/clientpositive/acid_nullscan.q.out
ql/src/test/results/clientpositive/acid_table_stats.q.out
ql/src/test/results/clientpositive/autoColumnStats_4.q.out
ql/src/test/results/clientpositive/llap/acid_bucket_pruning.q.out
ql/src/test/results/clientpositive/row__id.q.out
standalone-metastore/src/gen/thrift/gen-cpp/ThriftHiveMetastore.cpp
standalone-metastore/src/gen/thrift/gen-cpp/ThriftHiveMetastore.h
standalone-metastore/src/gen/thrift/gen-cpp/ThriftHiveMetastore_server.skeleton.cpp
standalone-metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
standalone-metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h
standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/AddDynamicPartitions.java
standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/AllocateTableWriteIdsRequest.java [new file with mode: 0644]
standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/AllocateTableWriteIdsResponse.java [new file with mode: 0644]
standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ClearFileMetadataRequest.java
standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ClientCapabilities.java
standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionRequest.java
standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CreationMetadata.java
standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/FireEventRequest.java
standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/GetAllFunctionsResponse.java
standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/GetFileMetadataByExprRequest.java
standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/GetFileMetadataByExprResult.java
standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/GetFileMetadataRequest.java
standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/GetFileMetadataResult.java
standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/GetTablesRequest.java
standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/GetTablesResult.java
standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/GetValidWriteIdsRequest.java [new file with mode: 0644]
standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/GetValidWriteIdsResponse.java [new file with mode: 0644]
standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/HeartbeatTxnRangeResponse.java
standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/InsertEventRequestData.java
standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/LockRequest.java
standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/Materialization.java
standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/NotificationEventResponse.java
standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/PutFileMetadataRequest.java
standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ShowCompactResponse.java
standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ShowLocksResponse.java
standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/TableValidWriteIds.java [new file with mode: 0644]
standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ThriftHiveMetastore.java
standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/TxnToWriteId.java [new file with mode: 0644]
standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/WMFullResourcePlan.java
standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/WMGetAllResourcePlanResponse.java
standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/WMGetTriggersForResourePlanResponse.java
standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/WMValidateResourcePlanResponse.java
standalone-metastore/src/gen/thrift/gen-php/metastore/ThriftHiveMetastore.php
standalone-metastore/src/gen/thrift/gen-php/metastore/Types.php
standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore-remote
standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore.py
standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
standalone-metastore/src/main/sql/derby/hive-schema-3.0.0.derby.sql
standalone-metastore/src/main/sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql
standalone-metastore/src/main/sql/mssql/hive-schema-3.0.0.mssql.sql
standalone-metastore/src/main/sql/mssql/upgrade-2.3.0-to-3.0.0.mssql.sql
standalone-metastore/src/main/sql/mysql/hive-schema-3.0.0.mysql.sql
standalone-metastore/src/main/sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql
standalone-metastore/src/main/sql/oracle/hive-schema-3.0.0.oracle.sql
standalone-metastore/src/main/sql/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql
standalone-metastore/src/main/sql/postgres/hive-schema-3.0.0.postgres.sql
standalone-metastore/src/main/sql/postgres/upgrade-2.3.0-to-3.0.0.postgres.sql
standalone-metastore/src/main/thrift/hive_metastore.thrift
storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorWriteIdList.java [moved from storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java with 52% similarity]
storage-api/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java [new file with mode: 0644]
storage-api/src/java/org/apache/hadoop/hive/common/ValidTxnList.java
storage-api/src/java/org/apache/hadoop/hive/common/ValidTxnWriteIdList.java [new file with mode: 0644]
storage-api/src/java/org/apache/hadoop/hive/common/ValidWriteIdList.java [new file with mode: 0644]
storage-api/src/test/org/apache/hadoop/hive/common/TestValidCompactorTxnList.java [deleted file]
storage-api/src/test/org/apache/hadoop/hive/common/TestValidCompactorWriteIdList.java [new file with mode: 0644]
storage-api/src/test/org/apache/hadoop/hive/common/TestValidReaderWriteIdList.java [new file with mode: 0644]
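
This patch replaces the single global transaction ID that previously appeared in ACID directory names and ROW__IDs with a write ID allocated per table: each transaction that writes a table receives that table's next write ID, and readers consult per-table valid-write-ID lists instead of the global valid-transaction list. The sketch below is a simplified, purely illustrative picture of that mapping (hypothetical class and field names, not Hive code; the real mapping is maintained by the metastore and surfaced through the new TxnToWriteId struct added here):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicLong;

    // Simplified, hypothetical illustration of the mapping the metastore now keeps
    // between a transaction and a per-table write ID.
    public class WriteIdAllocatorSketch {
      private final Map<String, AtomicLong> nextWriteIdPerTable = new ConcurrentHashMap<>();
      private final Map<String, Long> txnToWriteId = new ConcurrentHashMap<>();

      // Returns the write ID for (txnId, table), allocating the table's next one on
      // first use. Write IDs are dense per table, unlike global txn IDs.
      public long allocate(long txnId, String fullTableName) {
        return txnToWriteId.computeIfAbsent(fullTableName + "/" + txnId,
            k -> nextWriteIdPerTable
                .computeIfAbsent(fullTableName, t -> new AtomicLong(0))
                .incrementAndGet());
      }
    }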

diff --git a/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java b/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
index 57afbf8..de0c283 100644
@@ -150,14 +150,15 @@ public final class JavaUtils {
   public static String lockIdToString(long extLockId) {
     return "lockid:" + extLockId;
   }
-  /**
-   * Utility method for ACID to normalize logging info.  Matches
-   * org.apache.hadoop.hive.metastore.api.LockResponse#toString
-   */
+
   public static String txnIdToString(long txnId) {
     return "txnid:" + txnId;
   }
 
+  public static String writeIdToString(long writeId) {
+    return "writeid:" + writeId;
+  }
+
   public static String txnIdsToString(List<Long> txnIds) {
     return "Transactions requested to be aborted: " + txnIds.toString();
   }
@@ -166,7 +167,7 @@ public final class JavaUtils {
     // prevent instantiation
   }
 
-  public static Long extractTxnId(Path file) {
+  public static Long extractWriteId(Path file) {
     String fileName = file.getName();
     String[] parts = fileName.split("_", 4);  // e.g. delta_0000001_0000001_0000 or base_0000022
     if (parts.length < 2 || !(DELTA_PREFIX.equals(parts[0]) || BASE_PREFIX.equals(parts[0]))) {
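
The renamed JavaUtils helper now extracts the write ID from ACID directory names such as delta_0000001_0000001_0000 or base_0000022. A standalone sketch of the same parsing (plain Java, not the actual Hive code):

    // Standalone sketch of the parsing extractWriteId performs: the second
    // "_"-separated token of a base/delta directory name is the (minimum) write ID.
    public class ExtractWriteIdSketch {
      public static Long extractWriteId(String dirName) {
        String[] parts = dirName.split("_", 4);
        if (parts.length < 2 || !("delta".equals(parts[0]) || "base".equals(parts[0]))) {
          return null;                      // not an ACID base/delta directory
        }
        return Long.parseLong(parts[1]);
      }

      public static void main(String[] args) {
        System.out.println(extractWriteId("delta_0000001_0000001_0000")); // 1
        System.out.println(extractWriteId("base_0000022"));               // 22
      }
    }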
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 169ddcb..0880a96 100644
@@ -1094,7 +1094,7 @@ public class HiveConf extends Configuration {
     HIVESCRIPTTRUNCATEENV("hive.script.operator.truncate.env", false,
         "Truncate each environment variable for external script in scripts operator to 20KB (to fit system limits)"),
     HIVESCRIPT_ENV_BLACKLIST("hive.script.operator.env.blacklist",
-        "hive.txn.valid.txns,hive.script.operator.env.blacklist",
+        "hive.txn.valid.txns,hive.txn.tables.valid.writeids,hive.txn.valid.writeids,hive.script.operator.env.blacklist",
         "Comma separated list of keys from the configuration file not to convert to environment " +
         "variables when invoking the script operator"),
     HIVE_STRICT_CHECKS_ORDERBY_NO_LIMIT("hive.strict.checks.orderby.no.limit", false,
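
The HiveConf change only extends the script operator's environment blacklist so that the new write-ID lists are not exported as environment variables. Roughly how such a comma-separated blacklist is applied (a hypothetical helper, not Hive's actual ScriptOperator code):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    // Hypothetical helper: drop blacklisted configuration keys before exporting the
    // rest as environment variables for an external script.
    public class EnvBlacklistSketch {
      public static Map<String, String> filterEnv(Map<String, String> conf, String blacklist) {
        Set<String> blocked = new HashSet<>(Arrays.asList(blacklist.split(",")));
        Map<String, String> env = new HashMap<>();
        for (Map.Entry<String, String> e : conf.entrySet()) {
          if (!blocked.contains(e.getKey())) {
            env.put(e.getKey().replace('.', '_').toUpperCase(), e.getValue());
          }
        }
        return env;
      }
    }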
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
index 4ec10ad..924e233 100644
@@ -73,8 +73,8 @@ public abstract class AbstractRecordWriter implements RecordWriter {
 
   private final AcidOutputFormat<?,?> outf;
   private Object[] bucketFieldData; // Pre-allocated in constructor. Updated on each write.
-  private Long curBatchMinTxnId;
-  private Long curBatchMaxTxnId;
+  private Long curBatchMinWriteId;
+  private Long curBatchMaxWriteId;
 
   private static final class TableWriterPair {
     private final Table tbl;
@@ -143,7 +143,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
   * used to tag error msgs to provide some breadcrumbs
    */
   String getWatermark() {
-    return partitionPath + " txnIds[" + curBatchMinTxnId + "," + curBatchMaxTxnId + "]";
+    return partitionPath + " writeIds[" + curBatchMinWriteId + "," + curBatchMaxWriteId + "]";
   }
   // return the column numbers of the bucketed columns
   private List<Integer> getBucketColIDs(List<String> bucketCols, List<FieldSchema> cols) {
@@ -207,15 +207,15 @@ public abstract class AbstractRecordWriter implements RecordWriter {
 
   /**
    * Creates a new record updater for the new batch
-   * @param minTxnId smallest Txnid in the batch
-   * @param maxTxnID largest Txnid in the batch
+   * @param minWriteId smallest writeid in the batch
+   * @param maxWriteID largest writeid in the batch
    * @throws StreamingIOFailure if failed to create record updater
    */
   @Override
-  public void newBatch(Long minTxnId, Long maxTxnID)
+  public void newBatch(Long minWriteId, Long maxWriteID)
           throws StreamingIOFailure, SerializationError {
-    curBatchMinTxnId = minTxnId;
-    curBatchMaxTxnId = maxTxnID;
+    curBatchMinWriteId = minWriteId;
+    curBatchMaxWriteId = maxWriteID;
     updaters = new ArrayList<RecordUpdater>(totalBuckets);
     for (int bucket = 0; bucket < totalBuckets; bucket++) {
       updaters.add(bucket, null);//so that get(i) returns null rather than ArrayOutOfBounds
@@ -265,7 +265,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
     return bucketFieldData;
   }
 
-  private RecordUpdater createRecordUpdater(int bucketId, Long minTxnId, Long maxTxnID)
+  private RecordUpdater createRecordUpdater(int bucketId, Long minWriteId, Long maxWriteID)
           throws IOException, SerializationError {
     try {
       // Initialize table properties from the table parameters. This is required because the table
@@ -278,8 +278,8 @@ public abstract class AbstractRecordWriter implements RecordWriter {
                       .inspector(getSerde().getObjectInspector())
                       .bucket(bucketId)
                       .tableProperties(tblProperties)
-                      .minimumTransactionId(minTxnId)
-                      .maximumTransactionId(maxTxnID)
+                      .minimumWriteId(minWriteId)
+                      .maximumWriteId(maxWriteID)
                       .statementId(-1)
                       .finalDestination(partitionPath));
     } catch (SerDeException e) {
@@ -292,7 +292,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
     RecordUpdater recordUpdater = updaters.get(bucketId);
     if (recordUpdater == null) {
       try {
-        recordUpdater = createRecordUpdater(bucketId, curBatchMinTxnId, curBatchMaxTxnId);
+        recordUpdater = createRecordUpdater(bucketId, curBatchMinWriteId, curBatchMaxWriteId);
       } catch (IOException e) {
         String errMsg = "Failed creating RecordUpdater for " + getWatermark();
         LOG.error(errMsg, e);
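
With this rename, a streaming batch creates its RecordUpdater against the batch's write-ID range instead of a txn-ID range. A trimmed sketch using the AcidOutputFormat.Options builder calls visible in this hunk (assumes an OrcOutputFormat and an ObjectInspector are available; a sketch, not the full writer):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
    import org.apache.hadoop.hive.ql.io.RecordUpdater;
    import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

    // Sketch: build a RecordUpdater keyed by the batch's write-ID range, using the
    // Options builder methods introduced by this patch.
    public class RecordUpdaterSketch {
      static RecordUpdater newUpdater(Configuration conf, Path partitionPath, ObjectInspector inspector,
                                      int bucketId, long minWriteId, long maxWriteId) throws IOException {
        AcidOutputFormat<?, ?> outf = new OrcOutputFormat();
        return outf.getRecordUpdater(partitionPath,
            new AcidOutputFormat.Options(conf)
                .inspector(inspector)
                .bucket(bucketId)
                .minimumWriteId(minWriteId)     // replaces minimumTransactionId
                .maximumWriteId(maxWriteId)     // replaces maximumTransactionId
                .statementId(-1)
                .finalDestination(partitionPath));
      }
    }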
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
index 0a5492c..999c37e 100644
@@ -255,16 +255,16 @@ public class DelimitedInputWriter extends AbstractRecordWriter {
   }
 
   @Override
-  public void write(long transactionId, byte[] record)
+  public void write(long writeId, byte[] record)
           throws SerializationError, StreamingIOFailure {
     try {
       byte[] orderedFields = reorderFields(record);
       Object encodedRow = encode(orderedFields);
       int bucket = getBucket(encodedRow);
-      getRecordUpdater(bucket).insert(transactionId, encodedRow);
+      getRecordUpdater(bucket).insert(writeId, encodedRow);
     } catch (IOException e) {
-      throw new StreamingIOFailure("Error writing record in transaction ("
-              + transactionId + ")", e);
+      throw new StreamingIOFailure("Error writing record in transaction write id ("
+              + writeId + ")", e);
     }
   }
 
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
index 6793d09..90731dc 100644
@@ -40,6 +40,7 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
 import org.apache.hadoop.hive.ql.DriverFactory;
 import org.apache.hadoop.hive.ql.IDriver;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -551,7 +552,7 @@ public class HiveEndPoint {
     private final IMetaStoreClient msClient;
     private final IMetaStoreClient heartbeaterMSClient;
     private final RecordWriter recordWriter;
-    private final List<Long> txnIds;
+    private final List<TxnToWriteId> txnToWriteIds;
 
     //volatile because heartbeat() may be in a "different" thread; updates of this are "piggybacking"
     private volatile int currentTxnIndex = -1;
@@ -602,14 +603,19 @@ public class HiveEndPoint {
         this.recordWriter = recordWriter;
         this.agentInfo = agentInfo;
 
-        txnIds = openTxnImpl(msClient, user, numTxns, ugi);
+        List<Long> txnIds = openTxnImpl(msClient, user, numTxns, ugi);
+        txnToWriteIds = allocateWriteIdsImpl(msClient, txnIds, ugi);
+        assert(txnToWriteIds.size() == numTxns);
+
         txnStatus = new TxnState[numTxns];
         for(int i = 0; i < txnStatus.length; i++) {
+          assert(txnToWriteIds.get(i).getTxnId() == txnIds.get(i));
           txnStatus[i] = TxnState.OPEN;//Open matches Metastore state
         }
-
         this.state = TxnState.INACTIVE;
-        recordWriter.newBatch(txnIds.get(0), txnIds.get(txnIds.size()-1));
+
+        // The write IDs returned for the transaction batch are also sequential
+        recordWriter.newBatch(txnToWriteIds.get(0).getWriteId(), txnToWriteIds.get(numTxns-1).getWriteId());
         success = true;
       } catch (TException e) {
         throw new TransactionBatchUnAvailable(endPt, e);
@@ -632,12 +638,26 @@ public class HiveEndPoint {
         public Object run() throws Exception {
           return msClient.openTxns(user, numTxns).getTxn_ids();
         }
-      }) ;
+      });
+    }
+
+    private List<TxnToWriteId> allocateWriteIdsImpl(final IMetaStoreClient msClient,
+                                                    final List<Long> txnIds, UserGroupInformation ugi)
+            throws IOException, TException,  InterruptedException {
+      if(ugi==null) {
+        return  msClient.allocateTableWriteIdsBatch(txnIds, endPt.database, endPt.table);
+      }
+      return (List<TxnToWriteId>) ugi.doAs(new PrivilegedExceptionAction<Object>() {
+        @Override
+        public Object run() throws Exception {
+          return msClient.allocateTableWriteIdsBatch(txnIds, endPt.database, endPt.table);
+        }
+      });
     }
 
     @Override
     public String toString() {
-      if (txnIds==null || txnIds.isEmpty()) {
+      if (txnToWriteIds==null || txnToWriteIds.isEmpty()) {
         return "{}";
       }
       StringBuilder sb = new StringBuilder(" TxnStatus[");
@@ -646,7 +666,11 @@ public class HiveEndPoint {
         sb.append(state == null ? "N" : state);
       }
       sb.append("] LastUsed ").append(JavaUtils.txnIdToString(lastTxnUsed));
-      return "TxnIds=[" + txnIds.get(0) + "..." + txnIds.get(txnIds.size()-1)
+      return "TxnId/WriteIds=[" + txnToWriteIds.get(0).getTxnId()
+              + "/" + txnToWriteIds.get(0).getWriteId()
+              + "..."
+              + txnToWriteIds.get(txnToWriteIds.size()-1).getTxnId()
+              + "/" + txnToWriteIds.get(txnToWriteIds.size()-1).getWriteId()
               + "] on endPoint = " + endPt + "; " + sb;
     }
 
@@ -680,7 +704,8 @@ public class HiveEndPoint {
 
     private void beginNextTransactionImpl() throws TransactionError {
       state = TxnState.INACTIVE;//clear state from previous txn
-      if ( currentTxnIndex + 1 >= txnIds.size() ) {
+
+      if ((currentTxnIndex + 1) >= txnToWriteIds.size()) {
         throw new InvalidTrasactionState("No more transactions available in" +
                 " current batch for end point : " + endPt);
       }
@@ -699,13 +724,25 @@ public class HiveEndPoint {
     }
 
     /**
-     * Get Id of currently open transaction
+     * Get Id of currently open transaction.
      * @return -1 if there is no open TX
      */
     @Override
     public Long getCurrentTxnId() {
-      if(currentTxnIndex >= 0) {
-        return txnIds.get(currentTxnIndex);
+      if (currentTxnIndex >= 0) {
+        return txnToWriteIds.get(currentTxnIndex).getTxnId();
+      }
+      return -1L;
+    }
+
+    /**
+     * Get Id of currently open transaction.
+     * @return -1 if there is no open TX
+     */
+    @Override
+    public Long getCurrentWriteId() {
+      if (currentTxnIndex >= 0) {
+        return txnToWriteIds.get(currentTxnIndex).getWriteId();
       }
       return -1L;
     }
@@ -727,9 +764,9 @@ public class HiveEndPoint {
     @Override
     public int remainingTransactions() {
       if (currentTxnIndex>=0) {
-        return txnIds.size() - currentTxnIndex -1;
+        return txnToWriteIds.size() - currentTxnIndex -1;
       }
-      return txnIds.size();
+      return txnToWriteIds.size();
     }
 
 
@@ -824,7 +861,7 @@ public class HiveEndPoint {
     private void writeImpl(Collection<byte[]> records)
             throws StreamingException {
       for (byte[] record : records) {
-        recordWriter.write(getCurrentTxnId(), record);
+        recordWriter.write(getCurrentWriteId(), record);
       }
     }
 
@@ -869,7 +906,7 @@ public class HiveEndPoint {
     private void commitImpl() throws TransactionError, StreamingException {
       try {
         recordWriter.flush();
-        msClient.commitTxn(txnIds.get(currentTxnIndex));
+        msClient.commitTxn(txnToWriteIds.get(currentTxnIndex).getTxnId());
         state = TxnState.COMMITTED;
         txnStatus[currentTxnIndex] = TxnState.COMMITTED;
       } catch (NoSuchTxnException e) {
@@ -932,8 +969,8 @@ public class HiveEndPoint {
           int minOpenTxnIndex = Math.max(currentTxnIndex +
             (state == TxnState.ABORTED || state == TxnState.COMMITTED ? 1 : 0), 0);
           for(currentTxnIndex = minOpenTxnIndex;
-              currentTxnIndex < txnIds.size(); currentTxnIndex++) {
-            msClient.rollbackTxn(txnIds.get(currentTxnIndex));
+              currentTxnIndex < txnToWriteIds.size(); currentTxnIndex++) {
+            msClient.rollbackTxn(txnToWriteIds.get(currentTxnIndex).getTxnId());
             txnStatus[currentTxnIndex] = TxnState.ABORTED;
           }
           currentTxnIndex--;//since the loop left it == txnId.size()
@@ -960,15 +997,15 @@ public class HiveEndPoint {
       if(isClosed) {
         return;
       }
-      if(state != TxnState.OPEN && currentTxnIndex >= txnIds.size() - 1) {
+      if(state != TxnState.OPEN && currentTxnIndex >= txnToWriteIds.size() - 1) {
         //here means last txn in the batch is resolved but the close() hasn't been called yet so
         //there is nothing to heartbeat
         return;
       }
       //if here after commit()/abort() but before next beginNextTransaction(), currentTxnIndex still
       //points at the last txn which we don't want to heartbeat
-      Long first = txnIds.get(state == TxnState.OPEN ? currentTxnIndex : currentTxnIndex + 1);
-      Long last = txnIds.get(txnIds.size()-1);
+      Long first = txnToWriteIds.get(state == TxnState.OPEN ? currentTxnIndex : currentTxnIndex + 1).getTxnId();
+      Long last = txnToWriteIds.get(txnToWriteIds.size()-1).getTxnId();
       try {
         HeartbeatTxnRangeResponse resp = heartbeaterMSClient.heartbeatTxnRange(first, last);
         if (!resp.getAborted().isEmpty() || !resp.getNosuch().isEmpty()) {
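
The reworked TransactionBatchImpl opens a batch of transactions and then asks the metastore to allocate one write ID per transaction for the target table; records are written under the write ID, while commit, abort, and heartbeat still operate on the txn ID. A condensed sketch using only the metastore calls that appear in this hunk:

    import java.util.List;
    import org.apache.hadoop.hive.metastore.IMetaStoreClient;
    import org.apache.hadoop.hive.metastore.api.TxnToWriteId;

    // Condensed sketch of the new batch setup: open N txns, allocate one write ID
    // per txn for the target table, write under the write ID, commit by txn ID.
    public class StreamingBatchSketch {
      static void runBatch(IMetaStoreClient msClient, String user, int numTxns,
                           String db, String table) throws Exception {
        List<Long> txnIds = msClient.openTxns(user, numTxns).getTxn_ids();
        List<TxnToWriteId> txnToWriteIds =
            msClient.allocateTableWriteIdsBatch(txnIds, db, table);

        for (TxnToWriteId tw : txnToWriteIds) {
          long writeId = tw.getWriteId();   // ends up in delta dir names and ROW__IDs
          long txnId = tw.getTxnId();       // still used for commit/abort/heartbeat
          // ... hand writeId to the RecordWriter for this txn's records, then:
          msClient.commitTxn(txnId);
        }
      }
    }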
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/RecordWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/RecordWriter.java
index cddb8de..a9bcd9f 100644
@@ -23,21 +23,21 @@ public interface RecordWriter {
 
   /** Writes using a hive RecordUpdater
    *
-   * @param transactionId the ID of the Txn in which the write occurs
+   * @param writeId the write ID of the table mapping to Txn in which the write occurs
    * @param record the record to be written
    */
-  public void write(long transactionId, byte[] record) throws StreamingException;
+  void write(long writeId, byte[] record) throws StreamingException;
 
   /** Flush records from buffer. Invoked by TransactionBatch.commit() */
-  public void flush() throws StreamingException;
+  void flush() throws StreamingException;
 
   /** Clear buffered writes. Invoked by TransactionBatch.abort() */
-  public void clear() throws StreamingException;
+  void clear() throws StreamingException;
 
   /** Acquire a new RecordUpdater. Invoked when
    * StreamingConnection.fetchTransactionBatch() is called */
-  public void newBatch(Long minTxnId, Long maxTxnID) throws StreamingException;
+  void newBatch(Long minWriteId, Long maxWriteID) throws StreamingException;
 
   /** Close the RecordUpdater. Invoked by TransactionBatch.close() */
-  public void closeBatch() throws StreamingException;
+  void closeBatch() throws StreamingException;
 }
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
index 357c537..4d92c55 100644
@@ -117,15 +117,15 @@ public class StrictJsonWriter extends AbstractRecordWriter {
 
 
   @Override
-  public void write(long transactionId, byte[] record)
+  public void write(long writeId, byte[] record)
           throws StreamingIOFailure, SerializationError {
     try {
       Object encodedRow = encode(record);
       int bucket = getBucket(encodedRow);
-      getRecordUpdater(bucket).insert(transactionId, encodedRow);
+      getRecordUpdater(bucket).insert(writeId, encodedRow);
     } catch (IOException e) {
-      throw new StreamingIOFailure("Error writing record in transaction("
-              + transactionId + ")", e);
+      throw new StreamingIOFailure("Error writing record in transaction write id("
+              + writeId + ")", e);
     }
 
   }
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java
index 58db252..ae25662 100644
@@ -124,15 +124,15 @@ public class StrictRegexWriter extends AbstractRecordWriter {
 
 
   @Override
-  public void write(long transactionId, byte[] record)
+  public void write(long writeId, byte[] record)
           throws StreamingIOFailure, SerializationError {
     try {
       Object encodedRow = encode(record);
       int bucket = getBucket(encodedRow);
-      getRecordUpdater(bucket).insert(transactionId, encodedRow);
+      getRecordUpdater(bucket).insert(writeId, encodedRow);
     } catch (IOException e) {
-      throw new StreamingIOFailure("Error writing record in transaction("
-              + transactionId + ")", e);
+      throw new StreamingIOFailure("Error writing record in transaction write id("
+              + writeId + ")", e);
     }
   }
 
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java
index e5ad475..1208377 100644
@@ -46,73 +46,80 @@ public interface TransactionBatch  {
   }
 
   /**
-   * Activate the next available transaction in the current transaction batch
+   * Activate the next available transaction in the current transaction batch.
    * @throws StreamingException if not able to switch to next Txn
    * @throws InterruptedException if call in interrupted
    */
-  public void beginNextTransaction() throws StreamingException, InterruptedException;
+  void beginNextTransaction() throws StreamingException, InterruptedException;
 
   /**
-   * Get Id of currently open transaction
+   * Get Id of currently open transaction.
    * @return transaction id
    */
-  public Long getCurrentTxnId();
+  Long getCurrentTxnId();
+
+
+  /**
+   * Get write Id mapping to currently open transaction.
+   * @return write id
+   */
+  Long getCurrentWriteId();
 
   /**
-   * get state of current transaction
+   * get state of current transaction.
    */
-  public TxnState getCurrentTransactionState();
+  TxnState getCurrentTransactionState();
 
   /**
-   * Commit the currently open transaction
+   * Commit the currently open transaction.
    * @throws StreamingException if there are errors committing
    * @throws InterruptedException if call in interrupted
    */
-  public void commit() throws StreamingException, InterruptedException;
+  void commit() throws StreamingException, InterruptedException;
 
   /**
-   * Abort the currently open transaction
+   * Abort the currently open transaction.
    * @throws StreamingException if there are errors
    * @throws InterruptedException if call in interrupted
    */
-  public void abort() throws StreamingException, InterruptedException;
+  void abort() throws StreamingException, InterruptedException;
 
   /**
    * Remaining transactions are the ones that are not committed or aborted or open.
    * Current open transaction is not considered part of remaining txns.
    * @return number of transactions remaining this batch.
    */
-  public int remainingTransactions();
+  int remainingTransactions();
 
 
   /**
-   *  Write record using RecordWriter
+   *  Write record using RecordWriter.
    * @param record  the data to be written
    * @throws StreamingException if there are errors when writing
    * @throws InterruptedException if call in interrupted
    */
-  public void write(byte[] record) throws StreamingException, InterruptedException;
+  void write(byte[] record) throws StreamingException, InterruptedException;
 
   /**
-   *  Write records using RecordWriter
+   *  Write records using RecordWriter.
    * @throws StreamingException if there are errors when writing
    * @throws InterruptedException if call in interrupted
    */
-  public void write(Collection<byte[]> records) throws StreamingException, InterruptedException;
+  void write(Collection<byte[]> records) throws StreamingException, InterruptedException;
 
 
   /**
    * Issues a heartbeat to hive metastore on the current and remaining txn ids
-   * to keep them from expiring
+   * to keep them from expiring.
    * @throws StreamingException if there are errors
    */
-  public void heartbeat() throws StreamingException;
+  void heartbeat() throws StreamingException;
 
   /**
-   * Close the TransactionBatch
+   * Close the TransactionBatch.
    * @throws StreamingException if there are errors closing batch
    * @throws InterruptedException if call in interrupted
    */
-  public void close() throws StreamingException, InterruptedException;
-  public boolean isClosed();
+  void close() throws StreamingException, InterruptedException;
+  boolean isClosed();
 }
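
A client of the updated interface still drives the batch by txn ID while the new getCurrentWriteId() accessor exposes the matching table write ID that tags the written rows. A minimal usage sketch against this interface (assumes a TransactionBatch obtained from a StreamingConnection):

    import org.apache.hive.hcatalog.streaming.TransactionBatch;

    // Minimal consumer loop: write records under each txn of the batch and report
    // the txn ID / write ID pair before committing.
    public class BatchConsumerSketch {
      static void drain(TransactionBatch txnBatch, Iterable<byte[]> records) throws Exception {
        while (txnBatch.remainingTransactions() > 0) {
          txnBatch.beginNextTransaction();
          for (byte[] rec : records) {
            txnBatch.write(rec);
          }
          System.out.println("committing txn " + txnBatch.getCurrentTxnId()
              + " with write id " + txnBatch.getCurrentWriteId());
          txnBatch.commit();
        }
        txnBatch.close();
      }
    }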
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTable.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTable.java
index 5b371e3..50ba0c7 100644
@@ -34,7 +34,7 @@ public class AcidTable implements Serializable {
   private final String tableName;
   private final boolean createPartitions;
   private final TableType tableType;
-  private long transactionId;
+  private long writeId;
 
   private Table table;
 
@@ -48,10 +48,10 @@ public class AcidTable implements Serializable {
   /**
    * Returns {@code 0} until such a time that a {@link Transaction} has been acquired (when
    * {@link MutatorClient#newTransaction()} exits), at which point this will return the
-   * {@link Transaction#getTransactionId() transaction id}.
+   * write id.
    */
-  public long getTransactionId() {
-    return transactionId;
+  public long getWriteId() {
+    return writeId;
   }
 
   public String getDatabaseName() {
@@ -105,8 +105,8 @@ public class AcidTable implements Serializable {
     return table;
   }
 
-  void setTransactionId(long transactionId) {
-    this.transactionId = transactionId;
+  void setWriteId(long writeId) {
+    this.writeId = writeId;
   }
 
   void setTable(Table table) {
@@ -123,7 +123,7 @@ public class AcidTable implements Serializable {
   public String toString() {
     return "AcidTable [databaseName=" + databaseName + ", tableName=" + tableName + ", createPartitions="
         + createPartitions + ", tableType=" + tableType + ", outputFormatName=" + getOutputFormatName()
-        + ", totalBuckets=" + getTotalBuckets() + ", transactionId=" + transactionId + "]";
+        + ", totalBuckets=" + getTotalBuckets() + ", writeId=" + writeId + "]";
   }
 
 }
\ No newline at end of file
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTableSerializer.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/AcidTableSerializer.java
index 32db5e3..98f9f40 100644
@@ -54,10 +54,10 @@ public class AcidTableSerializer {
       data.writeUTF(table.getDatabaseName());
       data.writeUTF(table.getTableName());
       data.writeBoolean(table.createPartitions());
-      if (table.getTransactionId() <= 0) {
-        LOG.warn("Transaction ID <= 0. The recipient is probably expecting a transaction ID.");
+      if (table.getWriteId() <= 0) {
+        LOG.warn("Write ID <= 0. The recipient is probably expecting a table write ID.");
       }
-      data.writeLong(table.getTransactionId());
+      data.writeLong(table.getWriteId());
       data.writeByte(table.getTableType().getId());
 
       Table metaTable = table.getTable();
@@ -91,12 +91,12 @@ public class AcidTableSerializer {
       String databaseName = in.readUTF();
       String tableName = in.readUTF();
       boolean createPartitions = in.readBoolean();
-      long transactionId = in.readLong();
+      long writeId = in.readLong();
       TableType tableType = TableType.valueOf(in.readByte());
       int thriftLength = in.readInt();
 
       table = new AcidTable(databaseName, tableName, createPartitions, tableType);
-      table.setTransactionId(transactionId);
+      table.setWriteId(writeId);
 
       Table metaTable = null;
       if (thriftLength > 0) {
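
The serializer now round-trips the table write ID where it previously carried a transaction ID. A standalone illustration of the same DataOutputStream/DataInputStream pattern (field layout simplified, not the real AcidTableSerializer):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    // Standalone illustration of the serializer change: the long carried between
    // the table coordinates is now the table write ID rather than a txn ID.
    public class WriteIdSerializerSketch {
      static byte[] encode(String db, String table, long writeId) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream data = new DataOutputStream(bytes)) {
          data.writeUTF(db);
          data.writeUTF(table);
          data.writeLong(writeId);     // formerly the transaction ID
        }
        return bytes.toByteArray();
      }

      static long decodeWriteId(byte[] encoded) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded))) {
          in.readUTF();                // database name
          in.readUTF();                // table name
          return in.readLong();        // write ID
        }
      }
    }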
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/MutatorClient.java
index 645274e..8ba6cf6 100644
@@ -29,6 +29,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hive.hcatalog.streaming.mutate.client.lock.Lock;
 import org.apache.hive.hcatalog.streaming.mutate.client.lock.LockFailureListener;
 import org.apache.thrift.TException;
@@ -94,8 +95,22 @@ public class MutatorClient implements Closeable {
       throw new TransactionException("Not connected - cannot create transaction.");
     }
     Transaction transaction = new Transaction(metaStoreClient, lockOptions);
+    long txnId = transaction.getTransactionId();
     for (AcidTable table : tables) {
-      table.setTransactionId(transaction.getTransactionId());
+      try {
+        table.setWriteId(metaStoreClient.allocateTableWriteId(txnId,
+                table.getDatabaseName(), table.getTableName()));
+      } catch (TException ex) {
+        try {
+          metaStoreClient.rollbackTxn(txnId);
+        } catch (TException e) {
+          LOG.warn("Allocation of write id failed for table {} and rollback transaction {} failed due to {}",
+                  AcidUtils.getFullTableName(table.getDatabaseName(), table.getTableName()), txnId, e.getMessage());
+        }
+        throw new TransactionException("Unable to allocate table write ID for table "
+                + AcidUtils.getFullTableName(table.getDatabaseName(), table.getTableName())
+                + " under txn " + txnId, ex);
+      }
     }
     LOG.debug("Created transaction {}", transaction);
     return transaction;
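
Because a mutate transaction can span several ACID tables, MutatorClient now allocates one write ID per table under the transaction and rolls the transaction back if any allocation fails. A condensed sketch of that flow using the metastore calls shown above (the TableRef holder is hypothetical):

    import java.util.List;
    import org.apache.hadoop.hive.metastore.IMetaStoreClient;
    import org.apache.thrift.TException;

    // Condensed sketch of per-table write-ID allocation under a single transaction;
    // on any failure the whole transaction is rolled back.
    public class PerTableAllocationSketch {
      static class TableRef { String db; String table; long writeId; }

      static void allocate(IMetaStoreClient msClient, long txnId, List<TableRef> tables)
          throws TException {
        try {
          for (TableRef t : tables) {
            t.writeId = msClient.allocateTableWriteId(txnId, t.db, t.table);
          }
        } catch (TException ex) {
          msClient.rollbackTxn(txnId);   // don't keep the txn open without write IDs
          throw ex;
        }
      }
    }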
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java
index ae23153..5e804d7 100644
@@ -98,11 +98,11 @@ public class MutatorCoordinator implements Closeable, Flushable {
   }
 
   /**
-   * We expect records grouped by (partitionValues,bucketId) and ordered by (origTxnId,rowId).
+   * We expect records grouped by (partitionValues,bucketId) and ordered by (origWriteId,rowId).
    * 
    * @throws BucketIdException The bucket ID in the {@link RecordIdentifier} of the record does not match that computed
    *           using the values in the record's bucketed columns.
-   * @throws RecordSequenceException The record was submitted that was not in the correct ascending (origTxnId, rowId)
+   * @throws RecordSequenceException The record was submitted that was not in the correct ascending (origWriteId, rowId)
    *           sequence.
    * @throws GroupRevisitedException If an event was submitted for a (partition, bucketId) combination that has already
    *           been closed.
@@ -120,11 +120,11 @@ public class MutatorCoordinator implements Closeable, Flushable {
   }
 
   /**
-   * We expect records grouped by (partitionValues,bucketId) and ordered by (origTxnId,rowId).
+   * We expect records grouped by (partitionValues,bucketId) and ordered by (origWriteId,rowId).
    * 
    * @throws BucketIdException The bucket ID in the {@link RecordIdentifier} of the record does not match that computed
    *           using the values in the record's bucketed columns.
-   * @throws RecordSequenceException The record was submitted that was not in the correct ascending (origTxnId, rowId)
+   * @throws RecordSequenceException The record was submitted that was not in the correct ascending (origWriteId, rowId)
    *           sequence.
    * @throws GroupRevisitedException If an event was submitted for a (partition, bucketId) combination that has already
    *           been closed.
@@ -142,11 +142,11 @@ public class MutatorCoordinator implements Closeable, Flushable {
   }
 
   /**
-   * We expect records grouped by (partitionValues,bucketId) and ordered by (origTxnId,rowId).
+   * We expect records grouped by (partitionValues,bucketId) and ordered by (origWriteId,rowId).
    * 
    * @throws BucketIdException The bucket ID in the {@link RecordIdentifier} of the record does not match that computed
    *           using the values in the record's bucketed columns.
-   * @throws RecordSequenceException The record was submitted that was not in the correct ascending (origTxnId, rowId)
+   * @throws RecordSequenceException The record was submitted that was not in the correct ascending (origWriteId, rowId)
    *           sequence.
    * @throws GroupRevisitedException If an event was submitted for a (partition, bucketId) combination that has already
    *           been closed.
@@ -229,9 +229,9 @@ public class MutatorCoordinator implements Closeable, Flushable {
     sequenceValidator.reset();
     if (deleteDeltaIfExists) {
       // TODO: Should this be the concern of the mutator?
-      deleteDeltaIfExists(newPartitionPath, table.getTransactionId(), newBucketId);
+      deleteDeltaIfExists(newPartitionPath, table.getWriteId(), newBucketId);
     }
-    mutator = mutatorFactory.newMutator(outputFormat, table.getTransactionId(), newPartitionPath, newBucketId);
+    mutator = mutatorFactory.newMutator(outputFormat, table.getWriteId(), newPartitionPath, newBucketId);
     bucketId = newBucketId;
     partitionValues = newPartitionValues;
     partitionPath = newPartitionPath;
@@ -282,12 +282,12 @@ public class MutatorCoordinator implements Closeable, Flushable {
   }
 
   /* A delta may be present from a previous failed task attempt. */
-  private void deleteDeltaIfExists(Path partitionPath, long transactionId, int bucketId) throws IOException {
+  private void deleteDeltaIfExists(Path partitionPath, long writeId, int bucketId) throws IOException {
     Path deltaPath = AcidUtils.createFilename(partitionPath,
         new AcidOutputFormat.Options(configuration)
             .bucket(bucketId)
-            .minimumTransactionId(transactionId)
-            .maximumTransactionId(transactionId));
+            .minimumWriteId(writeId)
+            .maximumWriteId(writeId));
     FileSystem fileSystem = deltaPath.getFileSystem(configuration);
     if (fileSystem.exists(deltaPath)) {
       LOG.info("Deleting existing delta path: {}", deltaPath);
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorFactory.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorFactory.java
index 22cd9b7..da7558f 100644
@@ -24,7 +24,8 @@ import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 
 public interface MutatorFactory {
 
-  Mutator newMutator(AcidOutputFormat<?, ?> outputFormat, long transactionId, Path partitionPath, int bucketId) throws IOException;
+  Mutator newMutator(AcidOutputFormat<?, ?> outputFormat, long writeId, Path partitionPath, int bucketId)
+          throws IOException;
   
   RecordInspector newRecordInspector();
   
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java
index 05cf8b7..84c477f 100644
@@ -31,7 +31,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 /** Base {@link Mutator} implementation. Creates a suitable {@link RecordUpdater} and delegates mutation events. */
 public class MutatorImpl implements Mutator {
 
-  private final long transactionId;
+  private final long writeId;
   private final Path partitionPath;
   private final int bucketProperty;
   private final Configuration configuration;
@@ -44,11 +44,11 @@ public class MutatorImpl implements Mutator {
    * @throws IOException
    */
   public MutatorImpl(Configuration configuration, int recordIdColumn, ObjectInspector objectInspector,
-      AcidOutputFormat<?, ?> outputFormat, long transactionId, Path partitionPath, int bucketProperty) throws IOException {
+      AcidOutputFormat<?, ?> outputFormat, long writeId, Path partitionPath, int bucketProperty) throws IOException {
     this.configuration = configuration;
     this.recordIdColumn = recordIdColumn;
     this.objectInspector = objectInspector;
-    this.transactionId = transactionId;
+    this.writeId = writeId;
     this.partitionPath = partitionPath;
     this.bucketProperty = bucketProperty;
 
@@ -57,17 +57,17 @@ public class MutatorImpl implements Mutator {
 
   @Override
   public void insert(Object record) throws IOException {
-    updater.insert(transactionId, record);
+    updater.insert(writeId, record);
   }
 
   @Override
   public void update(Object record) throws IOException {
-    updater.update(transactionId, record);
+    updater.update(writeId, record);
   }
 
   @Override
   public void delete(Object record) throws IOException {
-    updater.delete(transactionId, record);
+    updater.delete(writeId, record);
   }
 
   /**
@@ -89,7 +89,7 @@ public class MutatorImpl implements Mutator {
 
   @Override
   public String toString() {
-    return "ObjectInspectorMutator [transactionId=" + transactionId + ", partitionPath=" + partitionPath
+    return "ObjectInspectorMutator [writeId=" + writeId + ", partitionPath=" + partitionPath
         + ", bucketId=" + bucketProperty + "]";
   }
 
@@ -101,8 +101,8 @@ public class MutatorImpl implements Mutator {
         new AcidOutputFormat.Options(configuration)
             .inspector(objectInspector)
             .bucket(bucketId)
-            .minimumTransactionId(transactionId)
-            .maximumTransactionId(transactionId)
+            .minimumWriteId(writeId)
+            .maximumWriteId(writeId)
             .recordIdColumn(recordIdColumn)
             .finalDestination(partitionPath)
             .statementId(-1));
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/SequenceValidator.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/SequenceValidator.java
index 5cd8081..320b987 100644
@@ -29,22 +29,22 @@ class SequenceValidator {
 
   private static final Logger LOG = LoggerFactory.getLogger(SequenceValidator.class);
 
-  private Long lastTxId;
+  private Long lastWriteId;
   private Long lastRowId;
 
   SequenceValidator() {
   }
 
   boolean isInSequence(RecordIdentifier recordIdentifier) {
-    if (lastTxId != null && recordIdentifier.getTransactionId() < lastTxId) {
-      LOG.debug("Non-sequential transaction ID. Expected >{}, recordIdentifier={}", lastTxId, recordIdentifier);
+    if (lastWriteId != null && recordIdentifier.getWriteId() < lastWriteId) {
+      LOG.debug("Non-sequential write ID. Expected >{}, recordIdentifier={}", lastWriteId, recordIdentifier);
       return false;
-    } else if (lastTxId != null && recordIdentifier.getTransactionId() == lastTxId && lastRowId != null
+    } else if (lastWriteId != null && recordIdentifier.getWriteId() == lastWriteId && lastRowId != null
         && recordIdentifier.getRowId() <= lastRowId) {
       LOG.debug("Non-sequential row ID. Expected >{}, recordIdentifier={}", lastRowId, recordIdentifier);
       return false;
     }
-    lastTxId = recordIdentifier.getTransactionId();
+    lastWriteId = recordIdentifier.getWriteId();
     lastRowId = recordIdentifier.getRowId();
     return true;
   }
@@ -53,14 +53,14 @@ class SequenceValidator {
    * Validator must be reset for each new partition and or bucket.
    */
   void reset() {
-    lastTxId = null;
+    lastWriteId = null;
     lastRowId = null;
     LOG.debug("reset");
   }
 
   @Override
   public String toString() {
-    return "SequenceValidator [lastTxId=" + lastTxId + ", lastRowId=" + lastRowId + "]";
+    return "SequenceValidator [lastWriteId=" + lastWriteId + ", lastRowId=" + lastRowId + "]";
   }
 
 }
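
Within one (partition, bucket) group, mutation events must now arrive in ascending (writeId, rowId) order. A standalone version of the validator's check:

    // Standalone version of the ordering check: within one (partition, bucket)
    // group, records must arrive in ascending (writeId, rowId) order.
    public class WriteIdSequenceSketch {
      private Long lastWriteId;
      private Long lastRowId;

      boolean isInSequence(long writeId, long rowId) {
        if (lastWriteId != null && writeId < lastWriteId) {
          return false;                                // write ID went backwards
        }
        if (lastWriteId != null && writeId == lastWriteId
            && lastRowId != null && rowId <= lastRowId) {
          return false;                                // row ID did not advance
        }
        lastWriteId = writeId;
        lastRowId = rowId;
        return true;
      }

      void reset() {                                   // call per new partition/bucket
        lastWriteId = null;
        lastRowId = null;
      }
    }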
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index da2ca72..b042049 100644
@@ -45,7 +45,7 @@ import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.common.JavaUtils;
-import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.Validator;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -374,16 +374,16 @@ public class TestStreaming {
     Assert.assertEquals("", 0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912));
     rs = queryTable(driver,"select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID");
 
-    Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
-    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/delta_0000016_0000016_0000/bucket_00000"));
-    Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":18,\"bucketid\":536870912,\"rowid\":0}\ta1\tb2"));
-    Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/delta_0000018_0000019/bucket_00000"));
-    Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":18,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
-    Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/delta_0000018_0000019/bucket_00000"));
-    Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
-    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/delta_0000018_0000019/bucket_00000"));
-    Assert.assertTrue(rs.get(4), rs.get(4).startsWith("{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":1}\ta7\tb8"));
-    Assert.assertTrue(rs.get(4), rs.get(4).endsWith("streamingnobuckets/delta_0000018_0000019/bucket_00000"));
+    Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
+    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/delta_0000001_0000001_0000/bucket_00000"));
+    Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\ta1\tb2"));
+    Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
+    Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
+    Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
+    Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
+    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
+    Assert.assertTrue(rs.get(4), rs.get(4).startsWith("{\"transactionid\":3,\"bucketid\":536870912,\"rowid\":1}\ta7\tb8"));
+    Assert.assertTrue(rs.get(4), rs.get(4).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
 
     queryTable(driver, "update default.streamingnobuckets set a=0, b=0 where a='a7'");
     queryTable(driver, "delete from default.streamingnobuckets where a='a1'");
@@ -398,14 +398,14 @@ public class TestStreaming {
     runWorker(conf);
     rs = queryTable(driver,"select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID");
 
-    Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
-    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/base_0000022/bucket_00000"));
-    Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":18,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
-    Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/base_0000022/bucket_00000"));
-    Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
-    Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/base_0000022/bucket_00000"));
-    Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t0\t0"));
-    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/base_0000022/bucket_00000"));
+    Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
+    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/base_0000005/bucket_00000"));
+    Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
+    Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/base_0000005/bucket_00000"));
+    Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
+    Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/base_0000005/bucket_00000"));
+    Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":4,\"bucketid\":536870912,\"rowid\":0}\t0\t0"));
+    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/base_0000005/bucket_00000"));
   }
 
   /**
@@ -540,8 +540,8 @@ public class TestStreaming {
   @Deprecated
   private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles,
                                 String... records) throws Exception {
-    ValidTxnList txns = msClient.getValidTxns();
-    AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, txns);
+    ValidWriteIdList writeIds = msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, writeIds);
     Assert.assertEquals(0, dir.getObsolete().size());
     Assert.assertEquals(0, dir.getOriginalFiles().size());
     List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
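
Readers now resolve the visible base/delta directories against a per-table ValidWriteIdList fetched from the metastore rather than the global ValidTxnList. A condensed sketch using the calls visible in this hunk:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.common.ValidWriteIdList;
    import org.apache.hadoop.hive.metastore.IMetaStoreClient;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    // Condensed sketch: fetch the table's valid write-ID list from the metastore
    // and use it to resolve which base/delta directories are visible.
    public class AcidStateSketch {
      static AcidUtils.Directory visibleState(IMetaStoreClient msClient, Configuration conf,
                                              String db, String table, Path partitionPath)
          throws Exception {
        ValidWriteIdList writeIds =
            msClient.getValidWriteIds(AcidUtils.getFullTableName(db, table));
        return AcidUtils.getAcidState(partitionPath, conf, writeIds);
      }
    }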
@@ -555,11 +555,11 @@ public class TestStreaming {
     long min = Long.MAX_VALUE;
     long max = Long.MIN_VALUE;
     for (AcidUtils.ParsedDelta pd : current) {
-      if (pd.getMaxTransaction() > max) {
-        max = pd.getMaxTransaction();
+      if (pd.getMaxWriteId() > max) {
+        max = pd.getMaxWriteId();
       }
-      if (pd.getMinTransaction() < min) {
-        min = pd.getMinTransaction();
+      if (pd.getMinWriteId() < min) {
+        min = pd.getMinWriteId();
       }
     }
     Assert.assertEquals(minTxn, min);
@@ -573,7 +573,7 @@ public class TestStreaming {
     job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "bigint:string");
     AcidUtils.setAcidOperationalProperties(job, true, null);
     job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
-    job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
+    job.set(ValidWriteIdList.VALID_WRITEIDS_KEY, writeIds.toString());
     InputSplit[] splits = inf.getSplits(job, buckets);
     Assert.assertEquals(numExpectedFiles, splits.length);
     org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr =
@@ -593,7 +593,7 @@ public class TestStreaming {
    */
   private void checkDataWritten2(Path partitionPath, long minTxn, long maxTxn, int numExpectedFiles,
                                 String validationQuery, boolean vectorize, String... records) throws Exception {
-    ValidTxnList txns = msClient.getValidTxns();
+    ValidWriteIdList txns = msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName));
     AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, txns);
     Assert.assertEquals(0, dir.getObsolete().size());
     Assert.assertEquals(0, dir.getOriginalFiles().size());
@@ -608,11 +608,11 @@ public class TestStreaming {
     long min = Long.MAX_VALUE;
     long max = Long.MIN_VALUE;
     for (AcidUtils.ParsedDelta pd : current) {
-      if (pd.getMaxTransaction() > max) {
-        max = pd.getMaxTransaction();
+      if (pd.getMaxWriteId() > max) {
+        max = pd.getMaxWriteId();
       }
-      if (pd.getMinTransaction() < min) {
-        min = pd.getMinTransaction();
+      if (pd.getMinWriteId() < min) {
+        min = pd.getMinWriteId();
       }
     }
     Assert.assertEquals(minTxn, min);
@@ -637,8 +637,8 @@ public class TestStreaming {
   }
 
   private void checkNothingWritten(Path partitionPath) throws Exception {
-    ValidTxnList txns = msClient.getValidTxns();
-    AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, txns);
+    ValidWriteIdList writeIds = msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, writeIds);
     Assert.assertEquals(0, dir.getObsolete().size());
     Assert.assertEquals(0, dir.getOriginalFiles().size());
     List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
@@ -877,7 +877,7 @@ public class TestStreaming {
     txnBatch.write("1,Hello streaming".getBytes());
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
 
     Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
       , txnBatch.getCurrentTransactionState());
@@ -889,11 +889,11 @@ public class TestStreaming {
     txnBatch.write("2,Welcome to streaming".getBytes());
 
     // data should not be visible
-    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
 
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}",
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
       "{2, Welcome to streaming}");
 
     txnBatch.close();
@@ -945,7 +945,7 @@ public class TestStreaming {
     txnBatch.write("1,Hello streaming".getBytes());
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
 
     Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
       , txnBatch.getCurrentTransactionState());
@@ -957,11 +957,11 @@ public class TestStreaming {
     txnBatch.write("2,Welcome to streaming".getBytes());
 
     // data should not be visible
-    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
 
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}",
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
       "{2, Welcome to streaming}");
 
     txnBatch.close();
@@ -1007,7 +1007,7 @@ public class TestStreaming {
     txnBatch.write(rec1.getBytes());
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
 
     Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
             , txnBatch.getCurrentTransactionState());
@@ -1134,7 +1134,7 @@ public class TestStreaming {
     txnBatch.write("2,Welcome to streaming".getBytes());
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 14, 23, 1, 1, "{1, Hello streaming}",
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
             "{2, Welcome to streaming}");
 
     txnBatch.close();
@@ -1153,13 +1153,13 @@ public class TestStreaming {
     txnBatch.write("1,Hello streaming".getBytes());
     txnBatch.commit();
     String validationQuery = "select id, msg from " + dbName + "." + tblName + " order by id, msg";
-    checkDataWritten2(partLoc, 15, 24, 1, validationQuery, false, "1\tHello streaming");
+    checkDataWritten2(partLoc, 1, 10, 1, validationQuery, false, "1\tHello streaming");
 
     txnBatch.beginNextTransaction();
     txnBatch.write("2,Welcome to streaming".getBytes());
     txnBatch.commit();
 
-    checkDataWritten2(partLoc, 15, 24,  1, validationQuery, true, "1\tHello streaming",
+    checkDataWritten2(partLoc, 1, 10,  1, validationQuery, true, "1\tHello streaming",
             "2\tWelcome to streaming");
 
     txnBatch.close();
@@ -1170,14 +1170,14 @@ public class TestStreaming {
     txnBatch.write("3,Hello streaming - once again".getBytes());
     txnBatch.commit();
 
-    checkDataWritten2(partLoc, 15, 40,  2, validationQuery, false, "1\tHello streaming",
+    checkDataWritten2(partLoc, 1, 20,  2, validationQuery, false, "1\tHello streaming",
             "2\tWelcome to streaming", "3\tHello streaming - once again");
 
     txnBatch.beginNextTransaction();
     txnBatch.write("4,Welcome to streaming - once again".getBytes());
     txnBatch.commit();
 
-    checkDataWritten2(partLoc, 15, 40,  2, validationQuery, true, "1\tHello streaming",
+    checkDataWritten2(partLoc, 1, 20,  2, validationQuery, true, "1\tHello streaming",
             "2\tWelcome to streaming", "3\tHello streaming - once again",
             "4\tWelcome to streaming - once again");
 
@@ -1214,14 +1214,15 @@ public class TestStreaming {
     txnBatch2.commit();
 
     String validationQuery = "select id, msg from " + dbName + "." + tblName + " order by id, msg";
-    checkDataWritten2(partLoc, 24, 33, 1,
+    checkDataWritten2(partLoc, 11, 20, 1,
       validationQuery, true, "3\tHello streaming - once again");
 
     txnBatch1.commit();
    /*now both batches have committed (but not closed), so for each primary file we expect a side
    file to exist and indicate the true length of the primary file*/
     FileSystem fs = partLoc.getFileSystem(conf);
-    AcidUtils.Directory dir = AcidUtils.getAcidState(partLoc, conf, msClient.getValidTxns());
+    AcidUtils.Directory dir = AcidUtils.getAcidState(partLoc, conf,
+            msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName)));
     for(AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
       for(FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
         Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());
@@ -1234,7 +1235,7 @@ public class TestStreaming {
         Assert.assertTrue("", logicalLength == actualLength);
       }
     }
-    checkDataWritten2(partLoc, 14, 33, 2,
+    checkDataWritten2(partLoc, 1, 20, 2,
       validationQuery, false,"1\tHello streaming", "3\tHello streaming - once again");
 
     txnBatch1.beginNextTransaction();
@@ -1246,7 +1247,7 @@ public class TestStreaming {
     //so each of 2 deltas has 1 bucket0 and 1 bucket0_flush_length.  Furthermore, each bucket0
    //has now received more data (logically - it's buffered) but it is not yet committed.
    //let's check that side files exist, etc.
-    dir = AcidUtils.getAcidState(partLoc, conf, msClient.getValidTxns());
+    dir = AcidUtils.getAcidState(partLoc, conf, msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName)));
     for(AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
       for(FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
         Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());
@@ -1259,19 +1260,19 @@ public class TestStreaming {
         Assert.assertTrue("", logicalLength <= actualLength);
       }
     }
-    checkDataWritten2(partLoc, 14, 33, 2,
+    checkDataWritten2(partLoc, 1, 20, 2,
       validationQuery, true,"1\tHello streaming", "3\tHello streaming - once again");
 
     txnBatch1.commit();
 
-    checkDataWritten2(partLoc, 14, 33, 2,
+    checkDataWritten2(partLoc, 1, 20, 2,
       validationQuery, false, "1\tHello streaming",
         "2\tWelcome to streaming",
         "3\tHello streaming - once again");
 
     txnBatch2.commit();
 
-    checkDataWritten2(partLoc, 14, 33, 2,
+    checkDataWritten2(partLoc, 1, 20, 2,
       validationQuery, true, "1\tHello streaming",
         "2\tWelcome to streaming",
         "3\tHello streaming - once again",
@@ -2281,8 +2282,8 @@ public class TestStreaming {
       this.delegate = delegate;
     }
     @Override
-    public void write(long transactionId, byte[] record) throws StreamingException {
-      delegate.write(transactionId, record);
+    public void write(long writeId, byte[] record) throws StreamingException {
+      delegate.write(writeId, record);
       produceFault();
     }
     @Override
index e057da7..c05ddcf 100644 (file)
@@ -49,9 +49,9 @@ public class ReflectiveMutatorFactory implements MutatorFactory {
   }
 
   @Override
-  public Mutator newMutator(AcidOutputFormat<?, ?> outputFormat, long transactionId, Path partitionPath, int bucketId)
+  public Mutator newMutator(AcidOutputFormat<?, ?> outputFormat, long writeId, Path partitionPath, int bucketId)
     throws IOException {
-    return new MutatorImpl(configuration, recordIdColumn, objectInspector, outputFormat, transactionId, partitionPath,
+    return new MutatorImpl(configuration, recordIdColumn, objectInspector, outputFormat, writeId, partitionPath,
         bucketId);
   }
 
index 873cddf..2aa8674 100644 (file)
@@ -25,7 +25,7 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -71,7 +71,7 @@ public class StreamingAssert {
   private List<String> partition;
   private IMetaStoreClient metaStoreClient;
   private Directory dir;
-  private ValidTxnList txns;
+  private ValidWriteIdList writeIds;
   private List<AcidUtils.ParsedDelta> currentDeltas;
   private long min;
   private long max;
@@ -83,9 +83,9 @@ public class StreamingAssert {
     this.table = table;
     this.partition = partition;
 
-    txns = metaStoreClient.getValidTxns();
+    writeIds = metaStoreClient.getValidWriteIds(AcidUtils.getFullTableName(table.getDbName(), table.getTableName()));
     partitionLocation = getPartitionLocation();
-    dir = AcidUtils.getAcidState(partitionLocation, conf, txns);
+    dir = AcidUtils.getAcidState(partitionLocation, conf, writeIds);
     assertEquals(0, dir.getObsolete().size());
     assertEquals(0, dir.getOriginalFiles().size());
 
@@ -95,8 +95,8 @@ public class StreamingAssert {
     System.out.println("Files found: ");
     for (AcidUtils.ParsedDelta parsedDelta : currentDeltas) {
       System.out.println(parsedDelta.getPath().toString());
-      max = Math.max(parsedDelta.getMaxTransaction(), max);
-      min = Math.min(parsedDelta.getMinTransaction(), min);
+      max = Math.max(parsedDelta.getMaxWriteId(), max);
+      min = Math.min(parsedDelta.getMinWriteId(), min);
     }
   }
 
@@ -145,7 +145,7 @@ public class StreamingAssert {
     job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "bigint:string");
     AcidUtils.setAcidOperationalProperties(job, true, null);
     job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
-    job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
+    job.set(ValidWriteIdList.VALID_WRITEIDS_KEY, writeIds.toString());
     InputSplit[] splits = inputFormat.getSplits(job, 1);
     assertEquals(numSplitsExpected, splits.length);
 
@@ -160,7 +160,7 @@ public class StreamingAssert {
 
       while (recordReader.next(key, value)) {
         RecordIdentifier recordIdentifier = recordReader.getRecordIdentifier();
-        Record record = new Record(new RecordIdentifier(recordIdentifier.getTransactionId(),
+        Record record = new Record(new RecordIdentifier(recordIdentifier.getWriteId(),
           recordIdentifier.getBucketProperty(), recordIdentifier.getRowId()), value.toString());
         System.out.println(record);
         records.add(record);
index 7876e8d..1523a10 100644 (file)
@@ -45,7 +45,7 @@ public class TestAcidTableSerializer {
 
     AcidTable acidTable = new AcidTable("db_1", "table_1", true, TableType.SINK);
     acidTable.setTable(table);
-    acidTable.setTransactionId(42L);
+    acidTable.setWriteId(42L);
 
     String encoded = AcidTableSerializer.encode(acidTable);
     System.out.println(encoded);
@@ -57,7 +57,7 @@ public class TestAcidTableSerializer {
     assertThat(decoded.getOutputFormatName(), is("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"));
     assertThat(decoded.getTotalBuckets(), is(10));
     assertThat(decoded.getQualifiedName(), is("DB_1.TABLE_1"));
-    assertThat(decoded.getTransactionId(), is(42L));
+    assertThat(decoded.getWriteId(), is(42L));
     assertThat(decoded.getTableType(), is(TableType.SINK));
     assertThat(decoded.getTable(), is(table));
   }
@@ -75,7 +75,7 @@ public class TestAcidTableSerializer {
     assertThat(decoded.getOutputFormatName(), is(nullValue()));
     assertThat(decoded.getTotalBuckets(), is(0));
     assertThat(decoded.getQualifiedName(), is("DB_1.TABLE_1"));
-    assertThat(decoded.getTransactionId(), is(0L));
+    assertThat(decoded.getWriteId(), is(0L));
     assertThat(decoded.getTableType(), is(TableType.SINK));
     assertThat(decoded.getTable(), is(nullValue()));
   }
index cfe3a96..91b90ed 100644 (file)
@@ -48,6 +48,8 @@ import org.mockito.runners.MockitoJUnitRunner;
 public class TestMutatorClient {
 
   private static final long TRANSACTION_ID = 42L;
+  private static final long WRITE_ID1 = 78L;
+  private static final long WRITE_ID2 = 33L;
   private static final String TABLE_NAME_1 = "TABLE_1";
   private static final String TABLE_NAME_2 = "TABLE_2";
   private static final String DB_NAME = "DB_1";
@@ -89,6 +91,8 @@ public class TestMutatorClient {
     when(mockParameters.get("transactional")).thenReturn(Boolean.TRUE.toString());
 
     when(mockMetaStoreClient.openTxn(USER)).thenReturn(TRANSACTION_ID);
+    when(mockMetaStoreClient.allocateTableWriteId(TRANSACTION_ID, DB_NAME, TABLE_NAME_1)).thenReturn(WRITE_ID1);
+    when(mockMetaStoreClient.allocateTableWriteId(TRANSACTION_ID, DB_NAME, TABLE_NAME_2)).thenReturn(WRITE_ID2);
 
     client = new MutatorClient(mockMetaStoreClient, mockConfiguration, mockLockFailureListener, USER,
         Collections.singletonList(TABLE_1));
@@ -110,13 +114,13 @@ public class TestMutatorClient {
     assertThat(outTables.get(0).getTableName(), is(TABLE_NAME_1));
     assertThat(outTables.get(0).getTotalBuckets(), is(2));
     assertThat(outTables.get(0).getOutputFormatName(), is(OrcOutputFormat.class.getName()));
-    assertThat(outTables.get(0).getTransactionId(), is(0L));
+    assertThat(outTables.get(0).getWriteId(), is(0L));
     assertThat(outTables.get(0).getTable(), is(mockTable1));
     assertThat(outTables.get(1).getDatabaseName(), is(DB_NAME));
     assertThat(outTables.get(1).getTableName(), is(TABLE_NAME_2));
     assertThat(outTables.get(1).getTotalBuckets(), is(2));
     assertThat(outTables.get(1).getOutputFormatName(), is(OrcOutputFormat.class.getName()));
-    assertThat(outTables.get(1).getTransactionId(), is(0L));
+    assertThat(outTables.get(1).getWriteId(), is(0L));
     assertThat(outTables.get(1).getTable(), is(mockTable2));
   }
 
@@ -179,8 +183,8 @@ public class TestMutatorClient {
 
     assertThat(transaction.getTransactionId(), is(TRANSACTION_ID));
     assertThat(transaction.getState(), is(TxnState.INACTIVE));
-    assertThat(outTables.get(0).getTransactionId(), is(TRANSACTION_ID));
-    assertThat(outTables.get(1).getTransactionId(), is(TRANSACTION_ID));
+    assertThat(outTables.get(0).getWriteId(), is(WRITE_ID1));
+    assertThat(outTables.get(1).getWriteId(), is(WRITE_ID2));
   }
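
The key behavioral change asserted above is that a single metastore transaction no longer stamps one global id on every table: each AcidTable receives its own write id. A rough sketch of the underlying client calls, reusing this test's constants and only the IMetaStoreClient methods mocked here (42, 78 and 33 are just the stub values TRANSACTION_ID, WRITE_ID1 and WRITE_ID2; real ids come from the metastore allocator):

    // Sketch: one transaction, two tables, two independent write ids.
    long txnId = metaStoreClient.openTxn(USER);                                        // e.g. 42
    long writeId1 = metaStoreClient.allocateTableWriteId(txnId, DB_NAME, TABLE_NAME_1); // e.g. 78
    long writeId2 = metaStoreClient.allocateTableWriteId(txnId, DB_NAME, TABLE_NAME_2); // e.g. 33
    // Delta files written for each table are then named from that table's write id,
    // independent of the global transaction id.
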
 
   @Test
index d897477..fab56b3 100644 (file)
@@ -49,7 +49,7 @@ public class TestMutatorCoordinator {
   private static final List<String> UNPARTITIONED = Collections.<String> emptyList();
   private static final List<String> PARTITION_B = Arrays.asList("B");
   private static final List<String> PARTITION_A = Arrays.asList("A");
-  private static final long TRANSACTION_ID = 2L;
+  private static final long WRITE_ID = 2L;
   private static final int BUCKET_ID = 0;
   private static final Path PATH_A = new Path("X");
   private static final Path PATH_B = new Path("B");
@@ -84,7 +84,7 @@ public class TestMutatorCoordinator {
   public void createCoordinator() throws Exception {
     when(mockAcidTable.getOutputFormatName()).thenReturn(OrcOutputFormat.class.getName());
     when(mockAcidTable.getTotalBuckets()).thenReturn(1);
-    when(mockAcidTable.getTransactionId()).thenReturn(TRANSACTION_ID);
+    when(mockAcidTable.getWriteId()).thenReturn(WRITE_ID);
     when(mockAcidTable.createPartitions()).thenReturn(true);
     when(mockMutatorFactory.newRecordInspector()).thenReturn(mockRecordInspector);
     when(mockMutatorFactory.newBucketIdResolver(anyInt())).thenReturn(mockBucketIdResolver);
@@ -104,7 +104,7 @@ public class TestMutatorCoordinator {
     coordinator.insert(UNPARTITIONED, RECORD);
 
     verify(mockPartitionHelper).createPartitionIfNotExists(UNPARTITIONED);
-    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
+    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(WRITE_ID), eq(PATH_A), eq(BUCKET_ID));
     verify(mockMutator).insert(RECORD);
   }
 
@@ -115,7 +115,7 @@ public class TestMutatorCoordinator {
     coordinator.insert(UNPARTITIONED, RECORD);
 
     verify(mockPartitionHelper).createPartitionIfNotExists(UNPARTITIONED);
-    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
+    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(WRITE_ID), eq(PATH_A), eq(BUCKET_ID));
     verify(mockMutator, times(3)).insert(RECORD);
   }
 
@@ -129,8 +129,8 @@ public class TestMutatorCoordinator {
 
     verify(mockPartitionHelper).createPartitionIfNotExists(PARTITION_A);
     verify(mockPartitionHelper).createPartitionIfNotExists(PARTITION_B);
-    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
-    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_B), eq(BUCKET_ID));
+    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(WRITE_ID), eq(PATH_A), eq(BUCKET_ID));
+    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(WRITE_ID), eq(PATH_B), eq(BUCKET_ID));
     verify(mockMutator, times(2)).insert(RECORD);
   }
 
@@ -143,9 +143,9 @@ public class TestMutatorCoordinator {
     coordinator.update(UNPARTITIONED, RECORD);
     coordinator.delete(UNPARTITIONED, RECORD);
 
-    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
+    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(WRITE_ID), eq(PATH_A), eq(BUCKET_ID));
     verify(mockMutatorFactory)
-        .newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID + 1));
+        .newMutator(any(OrcOutputFormat.class), eq(WRITE_ID), eq(PATH_A), eq(BUCKET_ID + 1));
     verify(mockMutator).update(RECORD);
     verify(mockMutator).delete(RECORD);
   }
@@ -166,11 +166,11 @@ public class TestMutatorCoordinator {
     coordinator.update(PARTITION_B, RECORD); /* PbB1 */
 
     verify(mockPartitionHelper).createPartitionIfNotExists(PARTITION_B);
-    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
-    verify(mockMutatorFactory, times(2)).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_B),
+    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(WRITE_ID), eq(PATH_A), eq(BUCKET_ID));
+    verify(mockMutatorFactory, times(2)).newMutator(any(OrcOutputFormat.class), eq(WRITE_ID), eq(PATH_B),
         eq(BUCKET_ID));
     verify(mockMutatorFactory)
-        .newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_B), eq(BUCKET_ID + 1));
+        .newMutator(any(OrcOutputFormat.class), eq(WRITE_ID), eq(PATH_B), eq(BUCKET_ID + 1));
     verify(mockMutator, times(2)).update(RECORD);
     verify(mockMutator).delete(RECORD);
     verify(mockMutator).insert(RECORD);
@@ -197,7 +197,7 @@ public class TestMutatorCoordinator {
     coordinator.delete(UNPARTITIONED, RECORD);
 
     verify(mockPartitionHelper).createPartitionIfNotExists(UNPARTITIONED);
-    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
+    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(WRITE_ID), eq(PATH_A), eq(BUCKET_ID));
     verify(mockMutator).update(RECORD);
     verify(mockMutator).delete(RECORD);
   }
@@ -210,7 +210,7 @@ public class TestMutatorCoordinator {
     coordinator.delete(UNPARTITIONED, RECORD);
 
     verify(mockPartitionHelper).createPartitionIfNotExists(UNPARTITIONED);
-    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
+    verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(WRITE_ID), eq(PATH_A), eq(BUCKET_ID));
     verify(mockMutator).update(RECORD);
     verify(mockMutator).delete(RECORD);
   }
index 2273e06..d2c89e5 100644 (file)
@@ -49,7 +49,7 @@ public class TestMutatorImpl {
   private static final int RECORD_ID_COLUMN = 2;
   private static final int BUCKET_ID = 0;
   private static final Path PATH = new Path("X");
-  private static final long TRANSACTION_ID = 1L;
+  private static final long WRITE_ID = 1L;
 
   @Mock
   private AcidOutputFormat<?, ?> mockOutputFormat;
@@ -67,7 +67,7 @@ public class TestMutatorImpl {
   @Before
   public void injectMocks() throws IOException {
     when(mockOutputFormat.getRecordUpdater(eq(PATH), any(Options.class))).thenReturn(mockRecordUpdater);
-    mutator = new MutatorImpl(configuration, RECORD_ID_COLUMN, mockObjectInspector, mockOutputFormat, TRANSACTION_ID,
+    mutator = new MutatorImpl(configuration, RECORD_ID_COLUMN, mockObjectInspector, mockOutputFormat, WRITE_ID,
         PATH, BUCKET_ID);
   }
 
@@ -79,26 +79,26 @@ public class TestMutatorImpl {
     assertThat(options.getConfiguration(), is((Configuration) configuration));
     assertThat(options.getInspector(), is(mockObjectInspector));
     assertThat(options.getRecordIdColumn(), is(RECORD_ID_COLUMN));
-    assertThat(options.getMinimumTransactionId(), is(TRANSACTION_ID));
-    assertThat(options.getMaximumTransactionId(), is(TRANSACTION_ID));
+    assertThat(options.getMinimumWriteId(), is(WRITE_ID));
+    assertThat(options.getMaximumWriteId(), is(WRITE_ID));
   }
 
   @Test
   public void testInsertDelegates() throws IOException {
     mutator.insert(RECORD);
-    verify(mockRecordUpdater).insert(TRANSACTION_ID, RECORD);
+    verify(mockRecordUpdater).insert(WRITE_ID, RECORD);
   }
 
   @Test
   public void testUpdateDelegates() throws IOException {
     mutator.update(RECORD);
-    verify(mockRecordUpdater).update(TRANSACTION_ID, RECORD);
+    verify(mockRecordUpdater).update(WRITE_ID, RECORD);
   }
 
   @Test
   public void testDeleteDelegates() throws IOException {
     mutator.delete(RECORD);
-    verify(mockRecordUpdater).delete(TRANSACTION_ID, RECORD);
+    verify(mockRecordUpdater).delete(WRITE_ID, RECORD);
   }
 
   @Test
index 7967a24..53ae2c0 100644 (file)
@@ -244,7 +244,7 @@ public class TestAcidOnTez {
       {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"},
       {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"},
       {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"},
-      {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t70\t80", "delta_0000021_0000021_0000/bucket_00000"}
+      {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t70\t80", "delta_0000001_0000001_0000/bucket_00000"}
     };
     Assert.assertEquals("Unexpected row count after update", expected2.length, rs.size());
     //verify data and layout
@@ -256,7 +256,7 @@ public class TestAcidOnTez {
     FileSystem fs = FileSystem.get(hiveConf);
     FileStatus[] status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
       (Table.NONACIDNONBUCKET).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    String[] expectedDelDelta = {"delete_delta_0000021_0000021_0000", "delete_delta_0000022_0000022_0000"};
+    String[] expectedDelDelta = {"delete_delta_0000001_0000001_0000", "delete_delta_0000002_0000002_0000"};
     for(FileStatus stat : status) {
       for(int i = 0; i < expectedDelDelta.length; i++) {
         if(expectedDelDelta[i] != null && stat.getPath().toString().endsWith(expectedDelDelta[i])) {
@@ -285,7 +285,7 @@ public class TestAcidOnTez {
     //check we have right delete delta files after minor compaction
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
       (Table.NONACIDNONBUCKET).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    String[] expectedDelDelta2 = {"delete_delta_0000021_0000021_0000", "delete_delta_0000022_0000022_0000", "delete_delta_0000021_0000022"};
+    String[] expectedDelDelta2 = {"delete_delta_0000001_0000001_0000", "delete_delta_0000002_0000002_0000", "delete_delta_0000001_0000002"};
     for(FileStatus stat : status) {
       for(int i = 0; i < expectedDelDelta2.length; i++) {
         if(expectedDelDelta2[i] != null && stat.getPath().toString().endsWith(expectedDelDelta2[i])) {
@@ -309,7 +309,7 @@ public class TestAcidOnTez {
     for(int i = 0; i < expected2.length; i++) {
       Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected2[i][0]));
       //everything is now in base/
-      Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith("base_0000022/bucket_00000"));
+      Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith("base_0000002/bucket_00000"));
     }
   }
   /**
@@ -453,12 +453,12 @@ public class TestAcidOnTez {
     /*
    * Expected result: 0th entry is the RecordIdentifier + data; 1st entry is the file before compaction*/
     String expected[][] = {
-      {"{\"transactionid\":18,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "/delta_0000018_0000018_0001/bucket_00000"},
-      {"{\"transactionid\":18,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "/delta_0000018_0000018_0001/bucket_00000"},
-      {"{\"transactionid\":18,\"bucketid\":536870913,\"rowid\":2}\t5\t6", "/delta_0000018_0000018_0001/bucket_00000"},
-      {"{\"transactionid\":18,\"bucketid\":536870914,\"rowid\":0}\t9\t10", "/delta_0000018_0000018_0002/bucket_00000"},
-      {"{\"transactionid\":18,\"bucketid\":536870914,\"rowid\":1}\t7\t8", "/delta_0000018_0000018_0002/bucket_00000"},
-      {"{\"transactionid\":18,\"bucketid\":536870914,\"rowid\":2}\t5\t6", "/delta_0000018_0000018_0002/bucket_00000"},
+      {"{\"transactionid\":1,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "/delta_0000001_0000001_0001/bucket_00000"},
+      {"{\"transactionid\":1,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "/delta_0000001_0000001_0001/bucket_00000"},
+      {"{\"transactionid\":1,\"bucketid\":536870913,\"rowid\":2}\t5\t6", "/delta_0000001_0000001_0001/bucket_00000"},
+      {"{\"transactionid\":1,\"bucketid\":536870914,\"rowid\":0}\t9\t10", "/delta_0000001_0000001_0002/bucket_00000"},
+      {"{\"transactionid\":1,\"bucketid\":536870914,\"rowid\":1}\t7\t8", "/delta_0000001_0000001_0002/bucket_00000"},
+      {"{\"transactionid\":1,\"bucketid\":536870914,\"rowid\":2}\t5\t6", "/delta_0000001_0000001_0002/bucket_00000"},
     };
     Assert.assertEquals("Unexpected row count after ctas", expected.length, rs.size());
     //verify data and layout
@@ -475,10 +475,10 @@ public class TestAcidOnTez {
       LOG.warn(s);
     }
     String[][] expected2 = {
-      {"{\"transactionid\":18,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "/delta_0000018_0000018_0001/bucket_00000"},
-      {"{\"transactionid\":18,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "/delta_0000018_0000018_0001/bucket_00000"},
-      {"{\"transactionid\":18,\"bucketid\":536870914,\"rowid\":0}\t9\t10", "/delta_0000018_0000018_0002/bucket_00000"},
-      {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":0}\t70\t80", "delta_0000020_0000020_0000/bucket_00000"}
+      {"{\"transactionid\":1,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "/delta_0000001_0000001_0001/bucket_00000"},
+      {"{\"transactionid\":1,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "/delta_0000001_0000001_0001/bucket_00000"},
+      {"{\"transactionid\":1,\"bucketid\":536870914,\"rowid\":0}\t9\t10", "/delta_0000001_0000001_0002/bucket_00000"},
+      {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\t70\t80", "/delta_0000002_0000002_0000/bucket_00000"}
     };
     Assert.assertEquals("Unexpected row count after update", expected2.length, rs.size());
     //verify data and layout
@@ -490,7 +490,7 @@ public class TestAcidOnTez {
     FileSystem fs = FileSystem.get(hiveConf);
     FileStatus[] status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
       (Table.ACIDNOBUCKET).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    String[] expectedDelDelta = {"delete_delta_0000020_0000020_0000", "delete_delta_0000021_0000021_0000"};
+    String[] expectedDelDelta = {"delete_delta_0000002_0000002_0000", "delete_delta_0000003_0000003_0000"};
     for(FileStatus stat : status) {
       for(int i = 0; i < expectedDelDelta.length; i++) {
         if(expectedDelDelta[i] != null && stat.getPath().toString().endsWith(expectedDelDelta[i])) {
@@ -519,7 +519,7 @@ public class TestAcidOnTez {
     //check we have right delete delta files after minor compaction
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
       (Table.ACIDNOBUCKET).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    String[] expectedDelDelta2 = { "delete_delta_0000020_0000020_0000", "delete_delta_0000021_0000021_0000", "delete_delta_0000018_0000021"};
+    String[] expectedDelDelta2 = { "delete_delta_0000002_0000002_0000", "delete_delta_0000003_0000003_0000", "delete_delta_0000001_0000003"};
     for(FileStatus stat : status) {
       for(int i = 0; i < expectedDelDelta2.length; i++) {
         if(expectedDelDelta2[i] != null && stat.getPath().toString().endsWith(expectedDelDelta2[i])) {
@@ -543,7 +543,7 @@ public class TestAcidOnTez {
     for(int i = 0; i < expected2.length; i++) {
       Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected2[i][0]));
       //everything is now in base/
-      Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith("base_0000021/bucket_00000"));
+      Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith("base_0000003/bucket_00000"));
     }
   }
   /**
@@ -638,17 +638,17 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree  ~/dev/hiverwgit/itests/h
     ├── HIVE_UNION_SUBDIR_1
     │   └── 000000_0
     │       ├── _orc_acid_version
-    │       └── delta_0000019_0000019_0001
+    │       └── delta_0000001_0000001_0001
     │           └── bucket_00000
     ├── HIVE_UNION_SUBDIR_2
     │   └── 000000_0
     │       ├── _orc_acid_version
-    │       └── delta_0000019_0000019_0002
+    │       └── delta_0000001_0000001_0002
     │           └── bucket_00000
     └── HIVE_UNION_SUBDIR_3
         └── 000000_0
             ├── _orc_acid_version
-            └── delta_0000019_0000019_0003
+            └── delta_0000001_0000001_0003
                 └── bucket_00000
 
 10 directories, 6 files     */
@@ -660,11 +660,11 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree  ~/dev/hiverwgit/itests/h
     }
 
     String[][] expected2 = {
-      {"{\"transactionid\":19,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "warehouse/t/delta_0000019_0000019_0001/bucket_00000"},
-      {"{\"transactionid\":19,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "warehouse/t/delta_0000019_0000019_0001/bucket_00000"},
-      {"{\"transactionid\":19,\"bucketid\":536870914,\"rowid\":0}\t5\t6", "warehouse/t/delta_0000019_0000019_0002/bucket_00000"},
-      {"{\"transactionid\":19,\"bucketid\":536870914,\"rowid\":1}\t7\t8", "warehouse/t/delta_0000019_0000019_0002/bucket_00000"},
-      {"{\"transactionid\":19,\"bucketid\":536870915,\"rowid\":0}\t9\t10", "warehouse/t/delta_0000019_0000019_0003/bucket_00000"}
+      {"{\"transactionid\":1,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "warehouse/t/delta_0000001_0000001_0001/bucket_00000"},
+      {"{\"transactionid\":1,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "warehouse/t/delta_0000001_0000001_0001/bucket_00000"},
+      {"{\"transactionid\":1,\"bucketid\":536870914,\"rowid\":0}\t5\t6", "warehouse/t/delta_0000001_0000001_0002/bucket_00000"},
+      {"{\"transactionid\":1,\"bucketid\":536870914,\"rowid\":1}\t7\t8", "warehouse/t/delta_0000001_0000001_0002/bucket_00000"},
+      {"{\"transactionid\":1,\"bucketid\":536870915,\"rowid\":0}\t9\t10", "warehouse/t/delta_0000001_0000001_0003/bucket_00000"}
     };
     Assert.assertEquals("Unexpected row count", expected2.length, rs.size());
     for(int i = 0; i < expected2.length; i++) {
@@ -688,11 +688,11 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree  ~/dev/hiverwgit/itests/h
 └── -ext-10000
     ├── 000000_0
     │   ├── _orc_acid_version
-    │   └── delta_0000021_0000021_0000
+    │   └── delta_0000001_0000001_0000
     │       └── bucket_00000
     └── 000001_0
         ├── _orc_acid_version
-        └── delta_0000021_0000021_0000
+        └── delta_0000001_0000001_0000
             └── bucket_00001
 
 5 directories, 4 files
@@ -705,11 +705,11 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree  ~/dev/hiverwgit/itests/h
       LOG.warn(s);
     }
     String[][] expected2 = {
-      {"{\"transactionid\":21,\"bucketid\":536936448,\"rowid\":0}\t1\t2", "warehouse/t/delta_0000021_0000021_0000/bucket_00001"},
-      {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t2\t4", "warehouse/t/delta_0000021_0000021_0000/bucket_00000"},
-      {"{\"transactionid\":21,\"bucketid\":536936448,\"rowid\":2}\t5\t6", "warehouse/t/delta_0000021_0000021_0000/bucket_00001"},
-      {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":1}\t6\t8", "warehouse/t/delta_0000021_0000021_0000/bucket_00000"},
-      {"{\"transactionid\":21,\"bucketid\":536936448,\"rowid\":1}\t9\t10", "warehouse/t/delta_0000021_0000021_0000/bucket_00001"}
+      {"{\"transactionid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t2", "warehouse/t/delta_0000001_0000001_0000/bucket_00001"},
+      {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t2\t4", "warehouse/t/delta_0000001_0000001_0000/bucket_00000"},
+      {"{\"transactionid\":1,\"bucketid\":536936448,\"rowid\":2}\t5\t6", "warehouse/t/delta_0000001_0000001_0000/bucket_00001"},
+      {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t6\t8", "warehouse/t/delta_0000001_0000001_0000/bucket_00000"},
+      {"{\"transactionid\":1,\"bucketid\":536936448,\"rowid\":1}\t9\t10", "warehouse/t/delta_0000001_0000001_0000/bucket_00001"}
     };
     Assert.assertEquals("Unexpected row count", expected2.length, rs.size());
     for(int i = 0; i < expected2.length; i++) {
index 6dd7305..0410fb0 100644 (file)
@@ -36,7 +36,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.cli.CliSessionState;
-import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -661,17 +661,18 @@ public class TestCompactor {
       Path resultFile = null;
       for (int i = 0; i < names.length; i++) {
         names[i] = stat[i].getPath().getName();
-        if (names[i].equals("delta_0000003_0000006")) {
+        if (names[i].equals("delta_0000001_0000004")) {
           resultFile = stat[i].getPath();
         }
       }
       Arrays.sort(names);
-      String[] expected = new String[]{"delta_0000003_0000004",
-          "delta_0000003_0000006", "delta_0000005_0000006", "delta_0000007_0000008"};
+      String[] expected = new String[]{"delta_0000001_0000002",
+          "delta_0000001_0000004", "delta_0000003_0000004", "delta_0000005_0000006"};
       if (!Arrays.deepEquals(expected, names)) {
         Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names));
       }
-      checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty,  0, 3L, 6L, 1);
+      checkExpectedTxnsPresent(null, new Path[]{resultFile}, columnNamesProperty, columnTypesProperty,
+              0, 1L, 4L, 1);
 
     } finally {
       connection.close();
@@ -721,11 +722,11 @@ public class TestCompactor {
       FileStatus[] stat =
           fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.baseFileFilter);
       if (1 != stat.length) {
-        Assert.fail("Expecting 1 file \"base_0000006\" and found " + stat.length + " files " + Arrays.toString(stat));
+        Assert.fail("Expecting 1 file \"base_0000004\" and found " + stat.length + " files " + Arrays.toString(stat));
       }
       String name = stat[0].getPath().getName();
-      Assert.assertEquals(name, "base_0000006");
-      checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L, 1);
+      Assert.assertEquals(name, "base_0000004");
+      checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L, 1);
     } finally {
       connection.close();
     }
@@ -781,17 +782,17 @@ public class TestCompactor {
       Path resultDelta = null;
       for (int i = 0; i < names.length; i++) {
         names[i] = stat[i].getPath().getName();
-        if (names[i].equals("delta_0000003_0000006")) {
+        if (names[i].equals("delta_0000001_0000004")) {
           resultDelta = stat[i].getPath();
         }
       }
       Arrays.sort(names);
-      String[] expected = new String[]{"delta_0000003_0000004",
-          "delta_0000003_0000006", "delta_0000005_0000006"};
+      String[] expected = new String[]{"delta_0000001_0000002",
+          "delta_0000001_0000004", "delta_0000003_0000004"};
       if (!Arrays.deepEquals(expected, names)) {
         Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names));
       }
-      checkExpectedTxnsPresent(null, new Path[]{resultDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 6L, 1);
+      checkExpectedTxnsPresent(null, new Path[]{resultDelta}, columnNamesProperty, columnTypesProperty, 0, 1L, 4L, 1);
     } finally {
       connection.close();
     }
@@ -847,13 +848,13 @@ public class TestCompactor {
         Assert.fail("majorCompactAfterAbort FileStatus[] stat " + Arrays.toString(stat));
       }
       if (1 != stat.length) {
-        Assert.fail("Expecting 1 file \"base_0000006\" and found " + stat.length + " files " + Arrays.toString(stat));
+        Assert.fail("Expecting 1 file \"base_0000004\" and found " + stat.length + " files " + Arrays.toString(stat));
       }
       String name = stat[0].getPath().getName();
-      if (!name.equals("base_0000006")) {
-        Assert.fail("majorCompactAfterAbort name " + name + " not equals to base_0000006");
+      if (!name.equals("base_0000004")) {
+        Assert.fail("majorCompactAfterAbort name " + name + " not equals to base_0000004");
       }
-      checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L, 1);
+      checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L, 1);
     } finally {
       connection.close();
     }
@@ -902,11 +903,11 @@ public class TestCompactor {
       FileStatus[] stat =
           fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.baseFileFilter);
       if (1 != stat.length) {
-        Assert.fail("Expecting 1 file \"base_0000006\" and found " + stat.length + " files " + Arrays.toString(stat));
+        Assert.fail("Expecting 1 file \"base_0000004\" and found " + stat.length + " files " + Arrays.toString(stat));
       }
       String name = stat[0].getPath().getName();
-      Assert.assertEquals(name, "base_0000006");
-      checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L, 2);
+      Assert.assertEquals(name, "base_0000004");
+      checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L, 2);
     } finally {
       connection.close();
     }
@@ -960,16 +961,17 @@ public class TestCompactor {
     Path minorCompactedDelta = null;
     for (int i = 0; i < deltas.length; i++) {
       deltas[i] = stat[i].getPath().getName();
-      if (deltas[i].equals("delta_0000003_0000005")) {
+      if (deltas[i].equals("delta_0000001_0000003")) {
         minorCompactedDelta = stat[i].getPath();
       }
     }
     Arrays.sort(deltas);
-    String[] expectedDeltas = new String[]{"delta_0000003_0000003_0000", "delta_0000003_0000005", "delta_0000004_0000004_0000"};
+    String[] expectedDeltas = new String[]{"delta_0000001_0000001_0000", "delta_0000001_0000003", "delta_0000002_0000002_0000"};
     if (!Arrays.deepEquals(expectedDeltas, deltas)) {
       Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas));
     }
-    checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 4L, 1);
+    checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty,
+            0, 1L, 2L, 1);
 
     // Verify that we have got correct set of delete_deltas.
     FileStatus[] deleteDeltaStat =
@@ -978,16 +980,17 @@ public class TestCompactor {
     Path minorCompactedDeleteDelta = null;
     for (int i = 0; i < deleteDeltas.length; i++) {
       deleteDeltas[i] = deleteDeltaStat[i].getPath().getName();
-      if (deleteDeltas[i].equals("delete_delta_0000003_0000005")) {
+      if (deleteDeltas[i].equals("delete_delta_0000001_0000003")) {
         minorCompactedDeleteDelta = deleteDeltaStat[i].getPath();
       }
     }
     Arrays.sort(deleteDeltas);
-    String[] expectedDeleteDeltas = new String[]{"delete_delta_0000003_0000005", "delete_delta_0000005_0000005_0000"};
+    String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000003", "delete_delta_0000003_0000003_0000"};
     if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) {
       Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
     }
-    checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 4L, 4L, 1);
+    checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty,
+            0, 2L, 2L, 1);
   }
 
   @Test
@@ -1037,16 +1040,17 @@ public class TestCompactor {
     Path minorCompactedDelta = null;
     for (int i = 0; i < deltas.length; i++) {
       deltas[i] = stat[i].getPath().getName();
-      if (deltas[i].equals("delta_0000003_0000004")) {
+      if (deltas[i].equals("delta_0000001_0000002")) {
         minorCompactedDelta = stat[i].getPath();
       }
     }
     Arrays.sort(deltas);
-    String[] expectedDeltas = new String[]{"delta_0000003_0000003_0000", "delta_0000003_0000004", "delta_0000004_0000004_0000"};
+    String[] expectedDeltas = new String[]{"delta_0000001_0000001_0000", "delta_0000001_0000002", "delta_0000002_0000002_0000"};
     if (!Arrays.deepEquals(expectedDeltas, deltas)) {
       Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas));
     }
-    checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 4L, 1);
+    checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty,
+            0, 1L, 2L, 1);
 
     // Verify that we have got correct set of delete_deltas.
     FileStatus[] deleteDeltaStat =
@@ -1055,12 +1059,12 @@ public class TestCompactor {
     Path minorCompactedDeleteDelta = null;
     for (int i = 0; i < deleteDeltas.length; i++) {
       deleteDeltas[i] = deleteDeltaStat[i].getPath().getName();
-      if (deleteDeltas[i].equals("delete_delta_0000003_0000004")) {
+      if (deleteDeltas[i].equals("delete_delta_0000001_0000002")) {
         minorCompactedDeleteDelta = deleteDeltaStat[i].getPath();
       }
     }
     Arrays.sort(deleteDeltas);
-    String[] expectedDeleteDeltas = new String[]{"delete_delta_0000003_0000004"};
+    String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000002"};
     if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) {
       Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
     }
@@ -1114,17 +1118,18 @@ public class TestCompactor {
       Path resultFile = null;
       for (int i = 0; i < names.length; i++) {
         names[i] = stat[i].getPath().getName();
-        if (names[i].equals("delta_0000003_0000006")) {
+        if (names[i].equals("delta_0000001_0000004")) {
           resultFile = stat[i].getPath();
         }
       }
       Arrays.sort(names);
-      String[] expected = new String[]{"delta_0000003_0000004",
-          "delta_0000003_0000006", "delta_0000005_0000006", "delta_0000007_0000008"};
+      String[] expected = new String[]{"delta_0000001_0000002",
+          "delta_0000001_0000004", "delta_0000003_0000004", "delta_0000005_0000006"};
       if (!Arrays.deepEquals(expected, names)) {
         Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names));
       }
-      checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty,  0, 3L, 6L, 1);
+      checkExpectedTxnsPresent(null, new Path[]{resultFile}, columnNamesProperty, columnTypesProperty,
+              0, 1L, 4L, 1);
 
       // Verify that we have got correct set of delete_deltas also
       FileStatus[] deleteDeltaStat =
@@ -1133,12 +1138,12 @@ public class TestCompactor {
       Path minorCompactedDeleteDelta = null;
       for (int i = 0; i < deleteDeltas.length; i++) {
         deleteDeltas[i] = deleteDeltaStat[i].getPath().getName();
-        if (deleteDeltas[i].equals("delete_delta_0000003_0000006")) {
+        if (deleteDeltas[i].equals("delete_delta_0000001_0000004")) {
           minorCompactedDeleteDelta = deleteDeltaStat[i].getPath();
         }
       }
       Arrays.sort(deleteDeltas);
-      String[] expectedDeleteDeltas = new String[]{"delete_delta_0000003_0000006"};
+      String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000004"};
       if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) {
         Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
       }
@@ -1330,14 +1335,19 @@ public class TestCompactor {
   private void checkExpectedTxnsPresent(Path base, Path[] deltas, String columnNamesProperty,
       String columnTypesProperty, int bucket, long min, long max, int numBuckets)
       throws IOException {
-    ValidTxnList txnList = new ValidTxnList() {
+    ValidWriteIdList writeIdList = new ValidWriteIdList() {
       @Override
-      public boolean isTxnValid(long txnid) {
+      public String getTableName() {
+        return "AcidTable";
+      }
+
+      @Override
+      public boolean isWriteIdValid(long writeid) {
         return true;
       }
 
       @Override
-      public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId) {
+      public RangeResponse isWriteIdRangeValid(long minWriteId, long maxWriteId) {
         return RangeResponse.ALL;
       }
 
@@ -1352,7 +1362,9 @@ public class TestCompactor {
       }
 
       @Override
-      public Long getMinOpenTxn() { return null; }
+      public Long getMinOpenWriteId() {
+        return null;
+      }
 
       @Override
       public long getHighWatermark() {
@@ -1360,7 +1372,7 @@ public class TestCompactor {
       }
 
       @Override
-      public long[] getInvalidTransactions() {
+      public long[] getInvalidWriteIds() {
         return new long[0];
       }
       @Override
@@ -1369,12 +1381,12 @@ public class TestCompactor {
       }
 
       @Override
-      public boolean isTxnAborted(long txnid) {
+      public boolean isWriteIdAborted(long txnid) {
         return true;
       }
 
       @Override
-      public RangeResponse isTxnRangeAborted(long minTxnId, long maxTxnId) {
+      public RangeResponse isWriteIdRangeAborted(long minWriteId, long maxWriteId) {
         return RangeResponse.ALL;
       }
     };
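
The anonymous ValidWriteIdList above reports every write id and every range as valid (RangeResponse.ALL), so the raw reader constructed below returns all rows; checkExpectedTxnsPresent then only has to verify that the compacted files cover the expected write id range, without any visibility filtering getting in the way.
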
@@ -1387,18 +1399,18 @@ public class TestCompactor {
     conf.set(hive_metastoreConstants.BUCKET_COUNT, Integer.toString(numBuckets));
     HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true);
     AcidInputFormat.RawReader<OrcStruct> reader =
-        aif.getRawReader(conf, true, bucket, txnList, base, deltas);
+        aif.getRawReader(conf, true, bucket, writeIdList, base, deltas);
     RecordIdentifier identifier = reader.createKey();
     OrcStruct value = reader.createValue();
     long currentTxn = min;
     boolean seenCurrentTxn = false;
     while (reader.next(identifier, value)) {
       if (!seenCurrentTxn) {
-        Assert.assertEquals(currentTxn, identifier.getTransactionId());
+        Assert.assertEquals(currentTxn, identifier.getWriteId());
         seenCurrentTxn = true;
       }
-      if (currentTxn != identifier.getTransactionId()) {
-        Assert.assertEquals(currentTxn + 1, identifier.getTransactionId());
+      if (currentTxn != identifier.getWriteId()) {
+        Assert.assertEquals(currentTxn + 1, identifier.getWriteId());
         currentTxn++;
       }
     }
index ae9649c..2c2177b 100644 (file)
@@ -1,2 +1 @@
 ALTER TABLE "APP"."PART_COL_STATS" ADD COLUMN "BIT_VECTOR" BLOB;
-ALTER TABLE "APP"."TAB_COL_STATS" ADD COLUMN "BIT_VECTOR" BLOB;
diff --git a/metastore/scripts/upgrade/derby/050-HIVE-18192.derby.sql b/metastore/scripts/upgrade/derby/050-HIVE-18192.derby.sql
new file mode 100644 (file)
index 0000000..b0bc5b1
--- /dev/null
@@ -0,0 +1,27 @@
+-- Add new tables/indexes for per-table write id support
+CREATE TABLE TXN_TO_WRITE_ID (
+  T2W_TXNID bigint NOT NULL,
+  T2W_DATABASE varchar(128) NOT NULL,
+  T2W_TABLE varchar(256) NOT NULL,
+  T2W_WRITEID bigint NOT NULL
+);
+
+CREATE UNIQUE INDEX TXN_TO_WRITE_ID_IDX ON TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_TXNID);
+
+CREATE TABLE NEXT_WRITE_ID (
+  NWI_DATABASE varchar(128) NOT NULL,
+  NWI_TABLE varchar(256) NOT NULL,
+  NWI_NEXT bigint NOT NULL
+);
+
+CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
+
+-- Modify txn_components/completed_txn_components tables to add write id.
+ALTER TABLE TXN_COMPONENTS ADD TC_WRITEID bigint;
+ALTER TABLE COMPLETED_TXN_COMPONENTS ADD CTC_WRITEID bigint;
+
+-- Modify Compaction related tables to use write id instead of txn id
+RENAME COLUMN COMPACTION_QUEUE.CQ_HIGHEST_TXN_ID TO CQ_HIGHEST_WRITE_ID;
+RENAME COLUMN COMPLETED_COMPACTIONS.CC_HIGHEST_TXN_ID TO CC_HIGHEST_WRITE_ID;
+
+
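For reference: TXN_TO_WRITE_ID records, per (T2W_DATABASE, T2W_TABLE, T2W_TXNID), the write id a transaction was allocated for that table, while NEXT_WRITE_ID keeps the next id to hand out per (NWI_DATABASE, NWI_TABLE); the unique indexes enforce one mapping per transaction/table and one counter row per table. TC_WRITEID and CTC_WRITEID carry the write id alongside the txn id in the component tables, and the renamed CQ_HIGHEST_WRITE_ID / CC_HIGHEST_WRITE_ID columns let compaction track progress in terms of write ids.
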
index 85d593f..2033bdc 100644 (file)
@@ -33,7 +33,8 @@ CREATE TABLE TXN_COMPONENTS (
   TC_DATABASE varchar(128) NOT NULL,
   TC_TABLE varchar(128),
   TC_PARTITION varchar(767),
-  TC_OPERATION_TYPE char(1) NOT NULL
+  TC_OPERATION_TYPE char(1) NOT NULL,
+  TC_WRITEID bigint
 );
 
 CREATE INDEX TC_TXNID_INDEX ON TXN_COMPONENTS (TC_TXNID);
@@ -43,7 +44,8 @@ CREATE TABLE COMPLETED_TXN_COMPONENTS (
   CTC_DATABASE varchar(128) NOT NULL,
   CTC_TABLE varchar(256),
   CTC_PARTITION varchar(767),
-  CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL
+  CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL,
+  CTC_WRITEID bigint
 );
 
 CREATE INDEX COMPLETED_TXN_COMPONENTS_IDX ON COMPLETED_TXN_COMPONENTS (CTC_DATABASE, CTC_TABLE, CTC_PARTITION);
@@ -53,6 +55,23 @@ CREATE TABLE NEXT_TXN_ID (
 );
 INSERT INTO NEXT_TXN_ID VALUES(1);
 
+CREATE TABLE TXN_TO_WRITE_ID (
+  T2W_TXNID bigint NOT NULL,
+  T2W_DATABASE varchar(128) NOT NULL,
+  T2W_TABLE varchar(256) NOT NULL,
+  T2W_WRITEID bigint NOT NULL
+);
+
+CREATE UNIQUE INDEX TXN_TO_WRITE_ID_IDX ON TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_TXNID);
+
+CREATE TABLE NEXT_WRITE_ID (
+  NWI_DATABASE varchar(128) NOT NULL,
+  NWI_TABLE varchar(256) NOT NULL,
+  NWI_NEXT bigint NOT NULL
+);
+
+CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
+
 CREATE TABLE HIVE_LOCKS (
   HL_LOCK_EXT_ID bigint NOT NULL,
   HL_LOCK_INT_ID bigint NOT NULL,
@@ -91,7 +110,7 @@ CREATE TABLE COMPACTION_QUEUE (
   CQ_WORKER_ID varchar(128),
   CQ_START bigint,
   CQ_RUN_AS varchar(128),
-  CQ_HIGHEST_TXN_ID bigint,
+  CQ_HIGHEST_WRITE_ID bigint,
   CQ_META_INFO varchar(2048) for bit data,
   CQ_HADOOP_JOB_ID varchar(32)
 );
@@ -113,7 +132,7 @@ CREATE TABLE COMPLETED_COMPACTIONS (
   CC_START bigint,
   CC_END bigint,
   CC_RUN_AS varchar(128),
-  CC_HIGHEST_TXN_ID bigint,
+  CC_HIGHEST_WRITE_ID bigint,
   CC_META_INFO varchar(2048) for bit data,
   CC_HADOOP_JOB_ID varchar(32)
 );
index 3a11881..55b89e7 100644 (file)
@@ -7,5 +7,6 @@ RUN '045-HIVE-16886.derby.sql';
 RUN '046-HIVE-17566.derby.sql';
 RUN '048-HIVE-14498.derby.sql';
 RUN '049-HIVE-18489.derby.sql';
+RUN '050-HIVE-18192.derby.sql';
 
 UPDATE "APP".VERSION SET SCHEMA_VERSION='3.0.0', VERSION_COMMENT='Hive release version 3.0.0' where VER_ID=1;
index d00e639..94999fe 100644 (file)
@@ -44,8 +44,9 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.hive.common.JavaUtils;
-import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.common.metrics.common.Metrics;
 import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
 import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
@@ -64,6 +65,7 @@ import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.DagUtils;
 import org.apache.hadoop.hive.ql.exec.ExplainTask;
 import org.apache.hadoop.hive.ql.exec.FetchTask;
+import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -76,6 +78,7 @@ import org.apache.hadoop.hive.ql.hooks.HookContext;
 import org.apache.hadoop.hive.ql.hooks.HookUtils;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
 import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.lockmgr.LockException;
@@ -1190,30 +1193,85 @@ public class Driver implements IDriver {
     return fetchTask;
   }
 
-  // Write the current set of valid transactions into the conf file so that it can be read by
-  // the input format.
+  // Write the current set of valid transactions into the conf file
   private void recordValidTxns(HiveTxnManager txnMgr) throws LockException {
-    ValidTxnList oldList = null;
-    String s = conf.get(ValidTxnList.VALID_TXNS_KEY);
-    if(s != null && s.length() > 0) {
-      oldList = new ValidReadTxnList(s);
-    }
-    ValidTxnList txns = txnMgr.getValidTxns();
-    if(oldList != null) {
+    String oldTxnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
+    if ((oldTxnString != null) && (oldTxnString.length() > 0)) {
       throw new IllegalStateException("calling recordValidTxn() more than once in the same " +
-        JavaUtils.txnIdToString(txnMgr.getCurrentTxnId()));
+              JavaUtils.txnIdToString(txnMgr.getCurrentTxnId()));
     }
-    String txnStr = txns.toString();
+    ValidTxnList txnList = txnMgr.getValidTxns();
+    String txnStr = txnList.toString();
     conf.set(ValidTxnList.VALID_TXNS_KEY, txnStr);
-    if(plan.getFetchTask() != null) {
+    LOG.debug("Encoding valid txns info " + txnStr + " txnid:" + txnMgr.getCurrentTxnId());
+  }
+
+  // Write the current set of valid write ids for the acid tables operated on by this query into
+  // the conf file so that it can be read by the input format.
+  private void recordValidWriteIds(HiveTxnManager txnMgr) throws LockException {
+    String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
+    if ((txnString == null) || (txnString.isEmpty())) {
+      throw new IllegalStateException("calling recordValidWriteIds() without initializing ValidTxnList " +
+              JavaUtils.txnIdToString(txnMgr.getCurrentTxnId()));
+    }
+    ValidTxnWriteIdList txnWriteIds = txnMgr.getValidWriteIds(getTransactionalTableList(plan), txnString);
+    String writeIdStr = txnWriteIds.toString();
+    conf.set(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY, writeIdStr);
+    if (plan.getFetchTask() != null) {
       /**
        * This is needed for {@link HiveConf.ConfVars.HIVEFETCHTASKCONVERSION} optimization which
        * initializes JobConf in FetchOperator before recordValidTxns() but this has to be done
        * after locks are acquired to avoid race conditions in ACID.
+       * This case is supported only for single-source queries.
        */
-      plan.getFetchTask().setValidTxnList(txnStr);
+      Operator<?> source = plan.getFetchTask().getWork().getSource();
+      if (source instanceof TableScanOperator) {
+        TableScanOperator tsOp = (TableScanOperator)source;
+        String fullTableName = AcidUtils.getFullTableName(tsOp.getConf().getDatabaseName(),
+                                                          tsOp.getConf().getTableName());
+        ValidWriteIdList writeIdList = txnWriteIds.getTableValidWriteIdList(fullTableName);
+        if (tsOp.getConf().isTranscationalTable() && (writeIdList == null)) {
+          throw new IllegalStateException("ACID table: " + fullTableName
+                  + " is missing from the ValidWriteIdList config: " + writeIdStr);
+        }
+        if (writeIdList != null) {
+          plan.getFetchTask().setValidWriteIdList(writeIdList.toString());
+        }
+      }
+    }
+    LOG.debug("Encoding valid txn write ids info " + writeIdStr + " txnid:" + txnMgr.getCurrentTxnId());
+  }
+
+  // Build the list of transactional tables which are read or written by the current txn
+  private List<String> getTransactionalTableList(QueryPlan plan) {
+    List<String> tableList = new ArrayList<>();
+
+    for (ReadEntity input : plan.getInputs()) {
+      addTableFromEntity(input, tableList);
+    }
+    return tableList;
+  }
+
+  private void addTableFromEntity(Entity entity, List<String> tableList) {
+    Table tbl;
+    switch (entity.getType()) {
+      case TABLE: {
+        tbl = entity.getTable();
+        break;
+      }
+      case PARTITION:
+      case DUMMYPARTITION: {
+        tbl = entity.getPartition().getTable();
+        break;
+      }
+      default: {
+        return;
+      }
+    }
+    String fullTableName = AcidUtils.getFullTableName(tbl.getDbName(), tbl.getTableName());
+    if (AcidUtils.isTransactionalTable(tbl) && !tableList.contains(fullTableName)) {
+      tableList.add(fullTableName);
     }
-    LOG.debug("Encoding valid txns info " + txnStr + " txnid:" + txnMgr.getCurrentTxnId());
   }
 
   private String getUserFromUGI() {
@@ -1256,7 +1314,7 @@ public class Driver implements IDriver {
       if(userFromUGI == null) {
         throw createProcessorResponse(10);
       }
-      // Set the transaction id in all of the acid file sinks
+      // Set the table write id in all of the acid file sinks
       if (haveAcidWrite()) {
         List<FileSinkDesc> acidSinks = new ArrayList<>(plan.getAcidSinks());
         //sorting makes tests easier to write since file names and ROW__IDs depend on statementId
@@ -1264,18 +1322,25 @@ public class Driver implements IDriver {
         acidSinks.sort((FileSinkDesc fsd1, FileSinkDesc fsd2) ->
           fsd1.getDirName().compareTo(fsd2.getDirName()));
         for (FileSinkDesc desc : acidSinks) {
-          desc.setTransactionId(queryTxnMgr.getCurrentTxnId());
+          TableDesc tableInfo = desc.getTableInfo();
+          long writeId = queryTxnMgr.getTableWriteId(Utilities.getDatabaseName(tableInfo.getTableName()),
+                  Utilities.getTableName(tableInfo.getTableName()));
+          desc.setTableWriteId(writeId);
+
           //it's possible to have > 1 FileSink writing to the same table/partition
           //e.g. Merge stmt, multi-insert stmt when mixing DP and SP writes
-          desc.setStatementId(queryTxnMgr.getWriteIdAndIncrement());
+          desc.setStatementId(queryTxnMgr.getStmtIdAndIncrement());
         }
       }
       /*It's imperative that {@code acquireLocks()} is called for all commands so that
       HiveTxnManager can transition its state machine correctly*/
       queryTxnMgr.acquireLocks(plan, ctx, userFromUGI, lDrvState);
-      if(queryTxnMgr.recordSnapshot(plan)) {
+      if (queryTxnMgr.recordSnapshot(plan)) {
         recordValidTxns(queryTxnMgr);
       }
+      if (plan.hasAcidResourcesInQuery()) {
+        recordValidWriteIds(queryTxnMgr);
+      }
     } catch (Exception e) {
       errorMessage = "FAILED: Error in acquiring locks: " + e.getMessage();
       SQLState = ErrorMsg.findSQLState(e.getMessage());
@@ -1317,6 +1382,7 @@ public class Driver implements IDriver {
     // If we've opened a transaction we need to commit or rollback rather than explicitly
     // releasing the locks.
     conf.unset(ValidTxnList.VALID_TXNS_KEY);
+    conf.unset(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY);
     if(!checkConcurrency()) {
       return;
     }
@@ -1456,8 +1522,6 @@ public class Driver implements IDriver {
   private static final ReentrantLock globalCompileLock = new ReentrantLock();
 
   private void compileInternal(String command, boolean deferClose) throws CommandProcessorResponse {
-    int ret;
-
     Metrics metrics = MetricsFactory.getInstance();
     if (metrics != null) {
       metrics.incrementCounter(MetricsConstant.WAITING_COMPILE_OPS, 1);
@@ -1626,7 +1690,6 @@ public class Driver implements IDriver {
         throw cpr;
       }
 
-
       //if needRequireLock is false, the release here will do nothing because there is no lock
       try {
         //since set autocommit starts an implicit txn, close it
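
The Driver changes above store the per-table write id list under ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY, and downstream readers resolve the entry for a single table by its db.table name. A minimal sketch of that lookup, assuming a Configuration already populated by recordValidWriteIds() and the purely illustrative table name default.t1:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
    import org.apache.hadoop.hive.common.ValidWriteIdList;

    public class WriteIdLookupSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Normally set by Driver.recordValidWriteIds(); null here unless populated externally.
        String encoded = conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY);
        if (encoded != null) {
          ValidTxnWriteIdList txnWriteIds = new ValidTxnWriteIdList(encoded);
          // Returns null if default.t1 was not among the query's transactional tables.
          ValidWriteIdList tableWriteIds = txnWriteIds.getTableValidWriteIdList("default.t1");
          System.out.println(tableWriteIds);
        }
      }
    }
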
index cf19351..df84417 100644 (file)
@@ -260,7 +260,7 @@ public abstract class AbstractFileMergeOperator<T extends FileMergeDesc>
           // There's always just one file that we have merged.
           // The union/DP/etc. should already be accounted for in the path.
           Utilities.writeMmCommitManifest(Lists.newArrayList(outPath),
-              tmpPath.getParent(), fs, taskId, conf.getTxnId(), conf.getStmtId(), null, false);
+              tmpPath.getParent(), fs, taskId, conf.getWriteId(), conf.getStmtId(), null, false);
           LOG.info("Merged into " + finalPath + "(" + fss.getLen() + " bytes).");
         }
       }
@@ -322,7 +322,7 @@ public abstract class AbstractFileMergeOperator<T extends FileMergeDesc>
     try {
       Path outputDir = conf.getOutputPath();
       FileSystem fs = outputDir.getFileSystem(hconf);
-      Long mmWriteId = conf.getTxnId();
+      Long mmWriteId = conf.getWriteId();
       int stmtId = conf.getStmtId();
       if (!isMmTable) {
         Path backupPath = backupOutputPath(fs, outputDir);
index 16b9107..f99178d 100644 (file)
@@ -63,6 +63,7 @@ import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -4264,7 +4265,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
   }
 
   private void handleRemoveMm(
-      Path path, ValidTxnList validTxnList, List<Path> result) throws HiveException {
+      Path path, ValidWriteIdList validWriteIdList, List<Path> result) throws HiveException {
     // Note: doesn't take LB into account; that is not presently supported here (throws above).
     try {
       FileSystem fs = path.getFileSystem(conf);
@@ -4274,10 +4275,10 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
           ensureDelete(fs, childPath, "a non-directory file");
           continue;
         }
-        Long writeId = JavaUtils.extractTxnId(childPath);
+        Long writeId = JavaUtils.extractWriteId(childPath);
         if (writeId == null) {
           ensureDelete(fs, childPath, "an unknown directory");
-        } else if (!validTxnList.isTxnValid(writeId)) {
+        } else if (!validWriteIdList.isWriteIdValid(writeId)) {
           // Assume no concurrent active writes - we rely on locks here. We could check and fail.
           ensureDelete(fs, childPath, "an uncommitted directory");
         } else {
@@ -4312,9 +4313,10 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
     try {
       HiveTxnManager txnManager = SessionState.get().getTxnMgr();
       if (txnManager.isTxnOpen()) {
-        mmWriteId = txnManager.getCurrentTxnId();
+        mmWriteId = txnManager.getTableWriteId(tbl.getDbName(), tbl.getTableName());
       } else {
-        mmWriteId = txnManager.openTxn(new Context(conf), conf.getUser());
+        txnManager.openTxn(new Context(conf), conf.getUser());
+        mmWriteId = txnManager.getTableWriteId(tbl.getDbName(), tbl.getTableName());
         txnManager.commitTxn();
       }
     } catch (Exception e) {
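
The DDLTask hunk above now allocates an MM write id from the transaction manager instead of reusing the transaction id. A condensed sketch of that allocation pattern, assuming a live SessionState and illustrative database/table names:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.Context;
    import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
    import org.apache.hadoop.hive.ql.session.SessionState;

    public final class MmWriteIdSketch {
      // Mirrors the pattern above; db/table would normally come from the target Table object.
      static long allocateMmWriteId(HiveConf conf, String db, String table) throws Exception {
        HiveTxnManager txnMgr = SessionState.get().getTxnMgr();
        if (txnMgr.isTxnOpen()) {
          // Reuse the write id already associated with this table in the open transaction.
          return txnMgr.getTableWriteId(db, table);
        }
        // Otherwise open a short transaction just to allocate a write id, then commit it.
        txnMgr.openTxn(new Context(conf), conf.getUser());
        long writeId = txnMgr.getTableWriteId(db, table);
        txnMgr.commitTxn();
        return writeId;
      }
    }
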
index 97e1e36..969c591 100644 (file)
@@ -37,8 +37,8 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.common.ValidReadTxnList;
-import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -151,8 +151,9 @@ public class FetchOperator implements Serializable {
     initialize();
   }
 
-  public void setValidTxnList(String txnStr) {
-    job.set(ValidTxnList.VALID_TXNS_KEY, txnStr);
+  public void setValidWriteIdList(String writeIdStr) {
+    job.set(ValidWriteIdList.VALID_WRITEIDS_KEY, writeIdStr);
+    LOG.debug("FetchOperator set writeIdStr: " + writeIdStr);
   }
   private void initialize() throws HiveException {
     if (isStatReader) {
@@ -274,7 +275,7 @@ public class FetchOperator implements Serializable {
       }
       FileSystem fs = currPath.getFileSystem(job);
       if (fs.exists(currPath)) {
-        if (extractValidTxnList() != null &&
+        if (extractValidWriteIdList() != null &&
             AcidUtils.isInsertOnlyTable(currDesc.getTableDesc().getProperties())) {
           return true;
         }
@@ -407,17 +408,17 @@ public class FetchOperator implements Serializable {
     if (inputFormat instanceof HiveInputFormat) {
       return StringUtils.escapeString(currPath.toString()); // No need to process here.
     }
-    ValidTxnList validTxnList;
+    ValidWriteIdList validWriteIdList;
     if (AcidUtils.isInsertOnlyTable(currDesc.getTableDesc().getProperties())) {
-      validTxnList = extractValidTxnList();
+      validWriteIdList = extractValidWriteIdList();
     } else {
-      validTxnList = null;  // non-MM case
+      validWriteIdList = null;  // non-MM case
     }
-    if (validTxnList != null) {
+    if (validWriteIdList != null) {
       Utilities.FILE_OP_LOGGER.info("Processing " + currDesc.getTableName() + " for MM paths");
     }
 
-    Path[] dirs = HiveInputFormat.processPathsForMmRead(Lists.newArrayList(currPath), job, validTxnList);
+    Path[] dirs = HiveInputFormat.processPathsForMmRead(Lists.newArrayList(currPath), job, validWriteIdList);
     if (dirs == null || dirs.length == 0) {
       return null; // No valid inputs. This condition is logged inside the call.
     }
@@ -428,10 +429,11 @@ public class FetchOperator implements Serializable {
     return str.toString();
   }
 
-  private ValidTxnList extractValidTxnList() {
+  private ValidWriteIdList extractValidWriteIdList() {
     if (currDesc.getTableName() == null || !org.apache.commons.lang.StringUtils.isBlank(currDesc.getTableName())) {
-      String txnString = job.get(ValidTxnList.VALID_TXNS_KEY);
-      return txnString == null ? new ValidReadTxnList() : new ValidReadTxnList(txnString);
+      String txnString = job.get(ValidWriteIdList.VALID_WRITEIDS_KEY);
+      LOG.debug("FetchOperator get writeIdStr: " + txnString);
+      return txnString == null ? new ValidReaderWriteIdList() : new ValidReaderWriteIdList(txnString);
     }
     return null;  // not fetching from a table directly but from a temp location
   }
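
On the read side, FetchOperator now pulls the write id list back out of the JobConf under ValidWriteIdList.VALID_WRITEIDS_KEY, which FetchTask.setValidWriteIdList() populates. A minimal sketch of that decode step:

    import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
    import org.apache.hadoop.hive.common.ValidWriteIdList;
    import org.apache.hadoop.mapred.JobConf;

    public class FetchWriteIdSketch {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        // Populated by FetchTask.setValidWriteIdList() in a real query; null here unless set.
        String writeIdStr = job.get(ValidWriteIdList.VALID_WRITEIDS_KEY);
        ValidWriteIdList writeIds = (writeIdStr == null)
            ? new ValidReaderWriteIdList() : new ValidReaderWriteIdList(writeIdStr);
        // e.g. decide whether data written by write id 5 is visible to this reader.
        System.out.println(writeIds.isWriteIdValid(5));
      }
    }
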
index ada4aba..e555aec 100644 (file)
@@ -56,8 +56,8 @@ public class FetchTask extends Task<FetchWork> implements Serializable {
     super();
   }
 
-  public void setValidTxnList(String txnStr) {
-    fetch.setValidTxnList(txnStr);
+  public void setValidWriteIdList(String writeIdStr) {
+    fetch.setValidWriteIdList(writeIdStr);
   }
   @Override
   public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext ctx,
index 98bb938..ff62863 100644 (file)
@@ -173,7 +173,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     int acidLastBucket = -1;
     int acidFileOffset = -1;
     private boolean isMmTable;
-    private Long txnId;
+    private Long writeId;
     private int stmtId;
     String dpDir;
 
@@ -185,7 +185,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       } else {
         tmpPath = specPath;
         taskOutputTempPath = null; // Should not be used.
-        txnId = conf.getTransactionId();
+        writeId = conf.getTableWriteId();
         stmtId = conf.getStatementId();
       }
       if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
@@ -337,7 +337,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
           }
           outPaths[filesIdx] = getTaskOutPath(taskId);
         } else {
-          String subdirPath = AcidUtils.baseOrDeltaSubdir(conf.getInsertOverwrite(), txnId, txnId, stmtId);
+          String subdirPath = AcidUtils.baseOrDeltaSubdir(conf.getInsertOverwrite(), writeId, writeId, stmtId);
           if (unionPath != null) {
             // Create the union directory inside the MM directory.
             subdirPath += Path.SEPARATOR + unionPath;
@@ -961,7 +961,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID || conf.isMmTable()) {
         rowOutWriters[findWriterOffset(row)].write(recordValue);
       } else if (conf.getWriteType() == AcidUtils.Operation.INSERT) {
-        fpaths.updaters[findWriterOffset(row)].insert(conf.getTransactionId(), row);
+        fpaths.updaters[findWriterOffset(row)].insert(conf.getTableWriteId(), row);
       } else {
         // TODO I suspect we could skip much of the stuff above this in the function in the case
         // of update and delete.  But I don't understand all of the side effects of the above
@@ -1018,9 +1018,9 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
           }
         }
         if (conf.getWriteType() == AcidUtils.Operation.UPDATE) {
-            fpaths.updaters[writerOffset].update(conf.getTransactionId(), row);
+          fpaths.updaters[writerOffset].update(conf.getTableWriteId(), row);
         } else if (conf.getWriteType() == AcidUtils.Operation.DELETE) {
-            fpaths.updaters[writerOffset].delete(conf.getTransactionId(), row);
+          fpaths.updaters[writerOffset].delete(conf.getTableWriteId(), row);
         } else {
           throw new HiveException("Unknown write type " + conf.getWriteType().toString());
         }
@@ -1321,8 +1321,8 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
         }
       }
       if (conf.isMmTable()) {
-        Utilities.writeMmCommitManifest(
-            commitPaths, specPath, fs, taskId, conf.getTransactionId(), conf.getStatementId(), unionPath, conf.getInsertOverwrite());
+        Utilities.writeMmCommitManifest(commitPaths, specPath, fs, taskId,
+                conf.getTableWriteId(), conf.getStatementId(), unionPath, conf.getInsertOverwrite());
       }
       // Only publish stats if this operator's flag was set to gather stats
       if (conf.isGatherStats()) {
@@ -1380,7 +1380,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
           MissingBucketsContext mbc = new MissingBucketsContext(
               conf.getTableInfo(), numBuckets, conf.getCompressed());
           Utilities.handleMmTableFinalPath(specPath, unionSuffix, hconf, success,
-              dpLevels, lbLevels, mbc, conf.getTransactionId(), conf.getStatementId(), reporter,
+              dpLevels, lbLevels, mbc, conf.getTableWriteId(), conf.getStatementId(), reporter,
               conf.isMmTable(), conf.isMmCtas(), conf.getInsertOverwrite());
         }
       }
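
FileSinkOperator now derives the target subdirectory name from the table write id rather than the transaction id. A small sketch of the naming call; the exact zero-padded directory names (for example delta_0000007_0000007_0000 or base_0000007) are an assumption here and are owned by AcidUtils:

    import org.apache.hadoop.hive.ql.io.AcidUtils;

    public class SubdirNamingSketch {
      public static void main(String[] args) {
        long writeId = 7L;
        int stmtId = 0;
        // Delta directory for a plain insert into an MM/ACID table.
        String delta = AcidUtils.baseOrDeltaSubdir(false, writeId, writeId, stmtId);
        // Base directory when the statement is an insert-overwrite.
        String base = AcidUtils.baseOrDeltaSubdir(true, writeId, writeId, stmtId);
        System.out.println(delta + " / " + base);
      }
    }
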
index e092795..b3c62ad 100644 (file)
@@ -34,7 +34,7 @@ public class ImportCommitTask extends Task<ImportCommitWork> {
   @Override
   public int execute(DriverContext driverContext) {
     if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
-      Utilities.FILE_OP_LOGGER.trace("Executing ImportCommit for " + work.getTxnId());
+      Utilities.FILE_OP_LOGGER.trace("Executing ImportCommit for " + work.getWriteId());
     }
 
     try {
index 649b8e6..a119250 100644 (file)
@@ -26,18 +26,18 @@ import org.apache.hadoop.hive.ql.plan.Explain.Level;
 public class ImportCommitWork implements Serializable {
   private static final long serialVersionUID = 1L;
   private String dbName, tblName;
-  private long txnId;
+  private long writeId;
   private int stmtId;
 
-  public ImportCommitWork(String dbName, String tblName, long txnId, int stmtId) {
-    this.txnId = txnId;
+  public ImportCommitWork(String dbName, String tblName, long writeId, int stmtId) {
+    this.writeId = writeId;
     this.stmtId = stmtId;
     this.dbName = dbName;
     this.tblName = tblName;
   }
 
-  public long getTxnId() {
-    return txnId;
+  public long getWriteId() {
+    return writeId;
   }
 
   public int getStmtId() {
index 40eb659..b490325 100644 (file)
@@ -369,7 +369,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
           }
           db.loadTable(tbd.getSourcePath(), tbd.getTable().getTableName(), tbd.getLoadFileType(),
               work.isSrcLocal(), isSkewedStoredAsDirs(tbd), isFullAcidOp, hasFollowingStatsTask(),
-              tbd.getTxnId(), tbd.getStmtId());
+              tbd.getWriteId(), tbd.getStmtId());
           if (work.getOutputs() != null) {
             DDLTask.addIfAbsentByName(new WriteEntity(table,
               getWriteType(tbd, work.getLoadTableWork().getWriteType())), work.getOutputs());
@@ -469,7 +469,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
         tbd.getInheritTableSpecs(), isSkewedStoredAsDirs(tbd), work.isSrcLocal(),
         work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID &&
             !tbd.isMmTable(),
-        hasFollowingStatsTask(), tbd.getTxnId(), tbd.getStmtId());
+        hasFollowingStatsTask(), tbd.getWriteId(), tbd.getStmtId());
     Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);
 
     // See the comment inside updatePartitionBucketSortColumns.
@@ -512,7 +512,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
         (tbd.getLbCtx() == null) ? 0 : tbd.getLbCtx().calculateListBucketingLevel(),
         work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID &&
             !tbd.isMmTable(),
-        work.getLoadTableWork().getTxnId(),
+        work.getLoadTableWork().getWriteId(),
         tbd.getStmtId(),
         hasFollowingStatsTask(),
         work.getLoadTableWork().getWriteType(),
index 4732da4..bfdb7d2 100644 (file)
@@ -211,6 +211,7 @@ public class SMBMapJoinOperator extends AbstractMapJoinOperator<SMBJoinDesc> imp
 
       AcidUtils.setAcidOperationalProperties(jobClone, ts.getConf().isTranscationalTable(),
           ts.getConf().getAcidOperationalProperties());
+      AcidUtils.setValidWriteIdList(jobClone, ts.getConf());
 
       ts.passExecContext(getExecContext());
 
index 8248442..fd84231 100644 (file)
@@ -100,7 +100,7 @@ import org.apache.hadoop.hive.common.HiveStatsUtils;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.StringInternUtils;
-import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.Warehouse;
@@ -4393,7 +4393,7 @@ public final class Utilities {
    * if the entire directory is valid (has no uncommitted/temporary files).
    */
   public static List<Path> getValidMmDirectoriesFromTableOrPart(Path path, Configuration conf,
-      ValidTxnList validTxnList, int lbLevels) throws IOException {
+      ValidWriteIdList validWriteIdList, int lbLevels) throws IOException {
     Utilities.FILE_OP_LOGGER.trace("Looking for valid MM paths under {}", path);
     // NULL means this directory is entirely valid.
     List<Path> result = null;
@@ -4403,8 +4403,8 @@ public final class Utilities {
     for (int i = 0; i < children.length; ++i) {
       FileStatus file = children[i];
       Path childPath = file.getPath();
-      Long txnId = JavaUtils.extractTxnId(childPath);
-      if (!file.isDirectory() || txnId == null || !validTxnList.isTxnValid(txnId)) {
+      Long writeId = JavaUtils.extractWriteId(childPath);
+      if (!file.isDirectory() || writeId == null || !validWriteIdList.isWriteIdValid(writeId)) {
         Utilities.FILE_OP_LOGGER.debug("Skipping path {}", childPath);
         if (result == null) {
           result = new ArrayList<>(children.length - 1);
index 30bf534..4bc7568 100644 (file)
@@ -487,6 +487,7 @@ public class MapredLocalTask extends Task<MapredLocalWork> implements Serializab
 
       AcidUtils.setAcidOperationalProperties(jobClone, ts.getConf().isTranscationalTable(),
           ts.getConf().getAcidOperationalProperties());
+      AcidUtils.setValidWriteIdList(jobClone, ts.getConf());
 
       // create a fetch operator
       FetchOperator fetchOp = new FetchOperator(entry.getValue(), jobClone);
index 65eb434..1ed35b3 100644 (file)
@@ -20,7 +20,7 @@ package org.apache.hadoop.hive.ql.io;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -109,8 +109,8 @@ public interface AcidInputFormat<KEY extends WritableComparable, VALUE>
     extends InputFormat<KEY, VALUE>, InputFormatChecker {
 
   static final class DeltaMetaData implements Writable {
-    private long minTxnId;
-    private long maxTxnId;
+    private long minWriteId;
+    private long maxWriteId;
     private List<Integer> stmtIds;
     //would be useful to have enum for Type: insert/delete/load data
 
@@ -120,27 +120,27 @@ public interface AcidInputFormat<KEY extends WritableComparable, VALUE>
     /**
      * @param stmtIds delta dir suffixes when a single txn writes > 1 delta in the same partition
      */
-    DeltaMetaData(long minTxnId, long maxTxnId, List<Integer> stmtIds) {
-      this.minTxnId = minTxnId;
-      this.maxTxnId = maxTxnId;
+    DeltaMetaData(long minWriteId, long maxWriteId, List<Integer> stmtIds) {
+      this.minWriteId = minWriteId;
+      this.maxWriteId = maxWriteId;
       if (stmtIds == null) {
         throw new IllegalArgumentException("stmtIds == null");
       }
       this.stmtIds = stmtIds;
     }
-    long getMinTxnId() {
-      return minTxnId;
+    long getMinWriteId() {
+      return minWriteId;
     }
-    long getMaxTxnId() {
-      return maxTxnId;
+    long getMaxWriteId() {
+      return maxWriteId;
     }
     List<Integer> getStmtIds() {
       return stmtIds;
     }
     @Override
     public void write(DataOutput out) throws IOException {
-      out.writeLong(minTxnId);
-      out.writeLong(maxTxnId);
+      out.writeLong(minWriteId);
+      out.writeLong(maxWriteId);
       out.writeInt(stmtIds.size());
       for(Integer id : stmtIds) {
         out.writeInt(id);
@@ -148,8 +148,8 @@ public interface AcidInputFormat<KEY extends WritableComparable, VALUE>
     }
     @Override
     public void readFields(DataInput in) throws IOException {
-      minTxnId = in.readLong();
-      maxTxnId = in.readLong();
+      minWriteId = in.readLong();
+      maxWriteId = in.readLong();
       stmtIds.clear();
       int numStatements = in.readInt();
       for(int i = 0; i < numStatements; i++) {
@@ -159,7 +159,7 @@ public interface AcidInputFormat<KEY extends WritableComparable, VALUE>
     @Override
     public String toString() {
       //? is Type - when implemented
-      return "Delta(?," + minTxnId + "," + maxTxnId + "," + stmtIds + ")";
+      return "Delta(?," + minWriteId + "," + maxWriteId + "," + stmtIds + ")";
     }
   }
   /**
@@ -227,7 +227,7 @@ public interface AcidInputFormat<KEY extends WritableComparable, VALUE>
    * @param collapseEvents should the ACID events be collapsed so that only
    *                       the last version of the row is kept.
    * @param bucket the bucket to read
-   * @param validTxnList the list of valid transactions to use
+   * @param validWriteIdList the list of valid write ids to use
    * @param baseDirectory the base directory to read or the root directory for
    *                      old style files
    * @param deltaDirectory a list of delta files to include in the merge
@@ -237,7 +237,7 @@ public interface AcidInputFormat<KEY extends WritableComparable, VALUE>
    RawReader<VALUE> getRawReader(Configuration conf,
                              boolean collapseEvents,
                              int bucket,
-                             ValidTxnList validTxnList,
+                             ValidWriteIdList validWriteIdList,
                              Path baseDirectory,
                              Path[] deltaDirectory
                              ) throws IOException;
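
DeltaMetaData now carries a write id range plus the statement ids of every delta that shares it. A minimal sketch; it has to live in the org.apache.hadoop.hive.ql.io package because the constructor and accessors are package-private:

    package org.apache.hadoop.hive.ql.io;

    import java.util.ArrayList;
    import java.util.Arrays;

    public class DeltaMetaDataSketch {
      public static void main(String[] args) {
        // Two statement-level deltas of the same transaction share one write id range,
        // so they collapse into a single entry whose stmtIds are [0, 1].
        AcidInputFormat.DeltaMetaData dmd =
            new AcidInputFormat.DeltaMetaData(7L, 7L, new ArrayList<>(Arrays.asList(0, 1)));
        System.out.println(dmd);                                             // Delta(?,7,7,[0, 1])
        System.out.println(dmd.getMinWriteId() + ".." + dmd.getMaxWriteId()); // 7..7
      }
    }
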
index 26d4dc6..05beafe 100644 (file)
@@ -49,8 +49,8 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
     private boolean isCompressed = false;
     private Properties properties;
     private Reporter reporter;
-    private long minimumTransactionId;
-    private long maximumTransactionId;
+    private long minimumWriteId;
+    private long maximumWriteId;
     private int bucketId;
     /**
      * Based on {@link org.apache.hadoop.hive.ql.metadata.Hive#mvFile(HiveConf, FileSystem, Path, FileSystem, Path, boolean, boolean)}
@@ -156,22 +156,22 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
     }
 
     /**
-     * The minimum transaction id that is included in this file.
-     * @param min minimum transaction id
+     * The minimum write id that is included in this file.
+     * @param min minimum write id
      * @return this
      */
-    public Options minimumTransactionId(long min) {
-      this.minimumTransactionId = min;
+    public Options minimumWriteId(long min) {
+      this.minimumWriteId = min;
       return this;
     }
 
     /**
-     * The maximum transaction id that is included in this file.
-     * @param max maximum transaction id
+     * The maximum write id that is included in this file.
+     * @param max maximum write id
      * @return this
      */
-    public Options maximumTransactionId(long max) {
-      this.maximumTransactionId = max;
+    public Options maximumWriteId(long max) {
+      this.maximumWriteId = max;
       return this;
     }
 
@@ -236,7 +236,7 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
      */
     public Options statementId(int id) {
       if(id >= AcidUtils.MAX_STATEMENTS_PER_TXN) {
-        throw new RuntimeException("Too many statements for transactionId: " + maximumTransactionId);
+        throw new RuntimeException("Too many statements for writeId: " + maximumWriteId);
       }
       if(id < -1) {
         throw new IllegalArgumentException("Illegal statementId value: " + id);
@@ -277,12 +277,12 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
       return reporter;
     }
 
-    public long getMinimumTransactionId() {
-      return minimumTransactionId;
+    public long getMinimumWriteId() {
+      return minimumWriteId;
     }
 
-    public long getMaximumTransactionId() {
-      return maximumTransactionId;
+    public long getMaximumWriteId() {
+      return maximumWriteId;
     }
 
     public boolean isWritingBase() {
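
Writers describe their output through the renamed Options builder. A sketch of a single-statement insert; the Options(Configuration) constructor and the AcidUtils.createFilename(Path, Options) helper are not shown in this hunk and are assumed here:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    public class WriterOptionsSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // A single-statement insert gets one table write id and writes delta_w_w_<stmtId>.
        AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
            .minimumWriteId(7L)
            .maximumWriteId(7L)
            .statementId(0)
            .bucket(0);
        Path bucketFile = AcidUtils.createFilename(new Path("/warehouse/t1/p=1"), options);
        System.out.println(bucketFile);
      }
    }
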
index 553e8bc..70fcd2c 100644 (file)
@@ -36,7 +36,8 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.DataOperationType;
@@ -49,6 +50,7 @@ import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater;
 import org.apache.hadoop.hive.ql.io.orc.Reader;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.plan.CreateTableDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
 import org.apache.hadoop.hive.shims.ShimLoader;
@@ -224,8 +226,8 @@ public class AcidUtils {
     return deleteDeltaSubdir(min, max) + "_" + String.format(STATEMENT_DIGITS, statementId);
   }
 
-  public static String baseDir(long txnId) {
-    return BASE_PREFIX + String.format(DELTA_DIGITS, txnId);
+  public static String baseDir(long writeId) {
+    return BASE_PREFIX + String.format(DELTA_DIGITS, writeId);
   }
 
   /**
@@ -254,31 +256,31 @@ public class AcidUtils {
           options.getBucketId()) + "_0");
     } else if (options.isWritingBase()) {
       subdir = BASE_PREFIX + String.format(DELTA_DIGITS,
-          options.getMaximumTransactionId());
+          options.getMaximumWriteId());
     } else if(options.getStatementId() == -1) {
       //when minor compaction runs, we collapse per statement delta files inside a single
       //transaction so we no longer need a statementId in the file name
       subdir = options.isWritingDeleteDelta() ?
-          deleteDeltaSubdir(options.getMinimumTransactionId(),
-                            options.getMaximumTransactionId())
-          : deltaSubdir(options.getMinimumTransactionId(),
-                        options.getMaximumTransactionId());
+          deleteDeltaSubdir(options.getMinimumWriteId(),
+                            options.getMaximumWriteId())
+          : deltaSubdir(options.getMinimumWriteId(),
+                        options.getMaximumWriteId());
     } else {
       subdir = options.isWritingDeleteDelta() ?
-          deleteDeltaSubdir(options.getMinimumTransactionId(),
-                            options.getMaximumTransactionId(),
+          deleteDeltaSubdir(options.getMinimumWriteId(),
+                            options.getMaximumWriteId(),
                             options.getStatementId())
-          : deltaSubdir(options.getMinimumTransactionId(),
-                        options.getMaximumTransactionId(),
+          : deltaSubdir(options.getMinimumWriteId(),
+                        options.getMaximumWriteId(),
                         options.getStatementId());
     }
     return createBucketFile(new Path(directory, subdir), options.getBucketId());
   }
 
   /**
-   * Get the transaction id from a base directory name.
+   * Get the write id from a base directory name.
    * @param path the base directory name
-   * @return the maximum transaction id that is included
+   * @return the maximum write id that is included
    */
   public static long parseBase(Path path) {
     String filename = path.getName();
@@ -306,8 +308,8 @@ public class AcidUtils {
           Integer.parseInt(filename.substring(0, filename.indexOf('_')));
       result
           .setOldStyle(true)
-          .minimumTransactionId(0)
-          .maximumTransactionId(0)
+          .minimumWriteId(0)
+          .maximumWriteId(0)
           .bucket(bucket)
           .writingBase(!bucketFile.getParent().getName().startsWith(DELTA_PREFIX));
     }
@@ -318,8 +320,8 @@ public class AcidUtils {
       int copyNumber = Integer.parseInt(filename.substring(filename.lastIndexOf('_') + 1));
       result
         .setOldStyle(true)
-        .minimumTransactionId(0)
-        .maximumTransactionId(0)
+        .minimumWriteId(0)
+        .maximumWriteId(0)
         .bucket(bucket)
         .copyNumber(copyNumber)
         .writingBase(!bucketFile.getParent().getName().startsWith(DELTA_PREFIX));
@@ -330,8 +332,8 @@ public class AcidUtils {
       if (bucketFile.getParent().getName().startsWith(BASE_PREFIX)) {
         result
             .setOldStyle(false)
-            .minimumTransactionId(0)
-            .maximumTransactionId(parseBase(bucketFile.getParent()))
+            .minimumWriteId(0)
+            .maximumWriteId(parseBase(bucketFile.getParent()))
             .bucket(bucket)
             .writingBase(true);
       } else if (bucketFile.getParent().getName().startsWith(DELTA_PREFIX)) {
@@ -339,21 +341,21 @@ public class AcidUtils {
           bucketFile.getFileSystem(conf));
         result
             .setOldStyle(false)
-            .minimumTransactionId(parsedDelta.minTransaction)
-            .maximumTransactionId(parsedDelta.maxTransaction)
+            .minimumWriteId(parsedDelta.minWriteId)
+            .maximumWriteId(parsedDelta.maxWriteId)
             .bucket(bucket);
       } else if (bucketFile.getParent().getName().startsWith(DELETE_DELTA_PREFIX)) {
         ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), DELETE_DELTA_PREFIX,
           bucketFile.getFileSystem(conf));
         result
             .setOldStyle(false)
-            .minimumTransactionId(parsedDelta.minTransaction)
-            .maximumTransactionId(parsedDelta.maxTransaction)
+            .minimumWriteId(parsedDelta.minWriteId)
+            .maximumWriteId(parsedDelta.maxWriteId)
             .bucket(bucket);
       }
     } else {
-      result.setOldStyle(true).bucket(-1).minimumTransactionId(0)
-          .maximumTransactionId(0);
+      result.setOldStyle(true).bucket(-1).minimumWriteId(0)
+          .maximumWriteId(0);
     }
     return result;
   }
@@ -637,8 +639,8 @@ public class AcidUtils {
    * Immutable
    */
   public static final class ParsedDelta implements Comparable<ParsedDelta> {
-    private final long minTransaction;
-    private final long maxTransaction;
+    private final long minWriteId;
+    private final long maxWriteId;
     private final FileStatus path;
     //-1 is for internal (getAcidState()) purposes and means the delta dir
     //had no statement ID
@@ -655,8 +657,8 @@ public class AcidUtils {
     }
     private ParsedDelta(long min, long max, FileStatus path, int statementId,
         boolean isDeleteDelta, boolean isRawFormat) {
-      this.minTransaction = min;
-      this.maxTransaction = max;
+      this.minWriteId = min;
+      this.maxWriteId = max;
       this.path = path;
       this.statementId = statementId;
       this.isDeleteDelta = isDeleteDelta;
@@ -664,12 +666,12 @@ public class AcidUtils {
       assert !isDeleteDelta || !isRawFormat : " deleteDelta should not be raw format";
     }
 
-    public long getMinTransaction() {
-      return minTransaction;
+    public long getMinWriteId() {
+      return minWriteId;
     }
 
-    public long getMaxTransaction() {
-      return maxTransaction;
+    public long getMaxWriteId() {
+      return maxWriteId;
     }
 
     public Path getPath() {
@@ -698,14 +700,14 @@ public class AcidUtils {
      */
     @Override
     public int compareTo(ParsedDelta parsedDelta) {
-      if (minTransaction != parsedDelta.minTransaction) {
-        if (minTransaction < parsedDelta.minTransaction) {
+      if (minWriteId != parsedDelta.minWriteId) {
+        if (minWriteId < parsedDelta.minWriteId) {
           return -1;
         } else {
           return 1;
         }
-      } else if (maxTransaction != parsedDelta.maxTransaction) {
-        if (maxTransaction < parsedDelta.maxTransaction) {
+      } else if (maxWriteId != parsedDelta.maxWriteId) {
+        if (maxWriteId < parsedDelta.maxWriteId) {
           return 1;
         } else {
           return -1;
@@ -753,14 +755,17 @@ public class AcidUtils {
   public static List<AcidInputFormat.DeltaMetaData> serializeDeltas(List<ParsedDelta> deltas) {
     List<AcidInputFormat.DeltaMetaData> result = new ArrayList<>(deltas.size());
     AcidInputFormat.DeltaMetaData last = null;
-    for(ParsedDelta parsedDelta : deltas) {
-      if(last != null && last.getMinTxnId() == parsedDelta.getMinTransaction() && last.getMaxTxnId() == parsedDelta.getMaxTransaction()) {
+    for (ParsedDelta parsedDelta : deltas) {
+      if ((last != null)
+              && (last.getMinWriteId() == parsedDelta.getMinWriteId())
+              && (last.getMaxWriteId() == parsedDelta.getMaxWriteId())) {
         last.getStmtIds().add(parsedDelta.getStatementId());
         continue;
       }
-      last = new AcidInputFormat.DeltaMetaData(parsedDelta.getMinTransaction(), parsedDelta.getMaxTransaction(), new ArrayList<Integer>());
+      last = new AcidInputFormat.DeltaMetaData(parsedDelta.getMinWriteId(),
+              parsedDelta.getMaxWriteId(), new ArrayList<Integer>());
       result.add(last);
-      if(parsedDelta.statementId >= 0) {
+      if (parsedDelta.statementId >= 0) {
         last.getStmtIds().add(parsedDelta.getStatementId());
       }
     }
@@ -780,11 +785,11 @@ public class AcidUtils {
     List<Path> results = new ArrayList<Path>(deleteDeltas.size());
     for(AcidInputFormat.DeltaMetaData dmd : deleteDeltas) {
       if(dmd.getStmtIds().isEmpty()) {
-        results.add(new Path(root, deleteDeltaSubdir(dmd.getMinTxnId(), dmd.getMaxTxnId())));
+        results.add(new Path(root, deleteDeltaSubdir(dmd.getMinWriteId(), dmd.getMaxWriteId())));
         continue;
       }
       for(Integer stmtId : dmd.getStmtIds()) {
-        results.add(new Path(root, deleteDeltaSubdir(dmd.getMinTxnId(), dmd.getMaxTxnId(), stmtId)));
+        results.add(new Path(root, deleteDeltaSubdir(dmd.getMinWriteId(), dmd.getMaxWriteId(), stmtId)));
       }
     }
     return results.toArray(new Path[results.size()]);
@@ -802,8 +807,8 @@ public class AcidUtils {
     throws IOException {
     ParsedDelta p = parsedDelta(path.getPath(), deltaPrefix, fs);
     boolean isDeleteDelta = deltaPrefix.equals(DELETE_DELTA_PREFIX);
-    return new ParsedDelta(p.getMinTransaction(),
-        p.getMaxTransaction(), path, p.statementId, isDeleteDelta, p.isRawFormat());
+    return new ParsedDelta(p.getMinWriteId(),
+        p.getMaxWriteId(), path, p.statementId, isDeleteDelta, p.isRawFormat());
   }
 
   public static ParsedDelta parsedDelta(Path deltaDir, String deltaPrefix, FileSystem fs)
@@ -856,16 +861,16 @@ public class AcidUtils {
   @VisibleForTesting
   public static Directory getAcidState(Path directory,
       Configuration conf,
-      ValidTxnList txnList
+      ValidWriteIdList writeIdList
       ) throws IOException {
-    return getAcidState(directory, conf, txnList, false, false);
+    return getAcidState(directory, conf, writeIdList, false, false);
   }
 
   /** State class for getChildState; cannot modify 2 things in a method. */
   private static class TxnBase {
     private FileStatus status;
-    private long txn = 0;
-    private long oldestBaseTxnId = Long.MAX_VALUE;
+    private long writeId = 0;
+    private long oldestBaseWriteId = Long.MAX_VALUE;
     private Path oldestBase = null;
   }
 
@@ -876,22 +881,22 @@ public class AcidUtils {
    * transaction id that we must exclude.
    * @param directory the partition directory to analyze
    * @param conf the configuration
-   * @param txnList the list of transactions that we are reading
+   * @param writeIdList the list of write ids that we are reading
    * @return the state of the directory
    * @throws IOException
    */
   public static Directory getAcidState(Path directory,
                                        Configuration conf,
-                                       ValidTxnList txnList,
+                                       ValidWriteIdList writeIdList,
                                        boolean useFileIds,
                                        boolean ignoreEmptyFiles
                                        ) throws IOException {
-    return getAcidState(directory, conf, txnList, Ref.from(useFileIds), ignoreEmptyFiles, null);
+    return getAcidState(directory, conf, writeIdList, Ref.from(useFileIds), ignoreEmptyFiles, null);
   }
 
   public static Directory getAcidState(Path directory,
                                        Configuration conf,
-                                       ValidTxnList txnList,
+                                       ValidWriteIdList writeIdList,
                                        Ref<Boolean> useFileIds,
                                        boolean ignoreEmptyFiles,
                                        Map<String, String> tblproperties) throws IOException {
@@ -921,13 +926,13 @@ public class AcidUtils {
     final List<HdfsFileStatusWithId> original = new ArrayList<>();
     if (childrenWithId != null) {
       for (HdfsFileStatusWithId child : childrenWithId) {
-        getChildState(child.getFileStatus(), child, txnList, working, originalDirectories, original,
+        getChildState(child.getFileStatus(), child, writeIdList, working, originalDirectories, original,
             obsolete, bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs);
       }
     } else {
       List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, directory, hiddenFileFilter);
       for (FileStatus child : children) {
-        getChildState(child, null, txnList, working, originalDirectories, original, obsolete,
+        getChildState(child, null, writeIdList, working, originalDirectories, original, obsolete,
             bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs);
       }
     }
@@ -955,30 +960,30 @@ public class AcidUtils {
     Collections.sort(working);
     //so now, 'working' should be sorted like delta_5_20 delta_5_10 delta_11_20 delta_51_60 for example
     //and we want to end up with the best set containing all relevant data: delta_5_20 delta_51_60,
-    //subject to list of 'exceptions' in 'txnList' (not show in above example).
-    long current = bestBase.txn;
+    //subject to list of 'exceptions' in 'writeIdList' (not shown in above example).
+    long current = bestBase.writeId;
     int lastStmtId = -1;
     ParsedDelta prev = null;
     for(ParsedDelta next: working) {
-      if (next.maxTransaction > current) {
+      if (next.maxWriteId > current) {
         // are any of the new transactions ones that we care about?
-        if (txnList.isTxnRangeValid(current+1, next.maxTransaction) !=
-          ValidTxnList.RangeResponse.NONE) {
+        if (writeIdList.isWriteIdRangeValid(current+1, next.maxWriteId) !=
+                ValidWriteIdList.RangeResponse.NONE) {
           deltas.add(next);
-          current = next.maxTransaction;
+          current = next.maxWriteId;
           lastStmtId = next.statementId;
           prev = next;
         }
       }
-      else if(next.maxTransaction == current && lastStmtId >= 0) {
+      else if(next.maxWriteId == current && lastStmtId >= 0) {
         //make sure to get all deltas within a single transaction;  multi-statement txn
         //generate multiple delta files with the same txnId range
-        //of course, if maxTransaction has already been minor compacted, all per statement deltas are obsolete
+        //of course, if maxWriteId has already been minor compacted, all per statement deltas are obsolete
         deltas.add(next);
         prev = next;
       }
-      else if (prev != null && next.maxTransaction == prev.maxTransaction
-                  && next.minTransaction == prev.minTransaction
+      else if (prev != null && next.maxWriteId == prev.maxWriteId
+                  && next.minWriteId == prev.minWriteId
                   && next.statementId == prev.statementId) {
         // The 'next' parsedDelta may have everything equal to the 'prev' parsedDelta, except
         // the path. This may happen when we have split update and we have two types of delta
@@ -1002,15 +1007,15 @@ public class AcidUtils {
     if(bestBase.oldestBase != null && bestBase.status == null) {
       /**
        * If here, it means there was a base_x (> 1 perhaps) but none were suitable for given
-       * {@link txnList}.  Note that 'original' files are logically a base_Long.MIN_VALUE and thus
+       * {@link writeIdList}.  Note that 'original' files are logically a base_Long.MIN_VALUE and thus
        * cannot have any data for an open txn.  We could check {@link deltas} has files to cover
        * [1,n] w/o gaps but this would almost never happen...*/
-      long[] exceptions = txnList.getInvalidTransactions();
-      String minOpenTxn = exceptions != null && exceptions.length > 0 ?
+      long[] exceptions = writeIdList.getInvalidWriteIds();
+      String minOpenWriteId = exceptions != null && exceptions.length > 0 ?
         Long.toString(exceptions[0]) : "x";
       throw new IOException(ErrorMsg.ACID_NOT_ENOUGH_HISTORY.format(
-        Long.toString(txnList.getHighWatermark()),
-        minOpenTxn, bestBase.oldestBase.toString()));
+        Long.toString(writeIdList.getHighWatermark()),
+              minOpenWriteId, bestBase.oldestBase.toString()));
     }
 
     final Path base = bestBase.status == null ? null : bestBase.status.getPath();
@@ -1071,43 +1076,44 @@ public class AcidUtils {
    * causes anything written previously is ignored (hence the overwrite).  In this case, base_x
    * is visible if txnid:x is committed for current reader.
    */
-  private static boolean isValidBase(long baseTxnId, ValidTxnList txnList, Path baseDir,
-      FileSystem fs) throws IOException {
-    if(baseTxnId == Long.MIN_VALUE) {
+  private static boolean isValidBase(long baseWriteId, ValidWriteIdList writeIdList, Path baseDir,
+            FileSystem fs) throws IOException {
+    if(baseWriteId == Long.MIN_VALUE) {
       //such base is created by 1st compaction in case of non-acid to acid table conversion
       //By definition there are no open txns with id < 1.
       return true;
     }
     if(!MetaDataFile.isCompacted(baseDir, fs)) {
       //this is the IOW case
-      return txnList.isTxnValid(baseTxnId);
+      return writeIdList.isWriteIdValid(baseWriteId);
     }
-    return txnList.isValidBase(baseTxnId);
+    return writeIdList.isValidBase(baseWriteId);
   }
+
   private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId,
-      ValidTxnList txnList, List<ParsedDelta> working, List<FileStatus> originalDirectories,
+      ValidWriteIdList writeIdList, List<ParsedDelta> working, List<FileStatus> originalDirectories,
       List<HdfsFileStatusWithId> original, List<FileStatus> obsolete, TxnBase bestBase,
       boolean ignoreEmptyFiles, List<FileStatus> aborted, Map<String, String> tblproperties,
       FileSystem fs) throws IOException {
     Path p = child.getPath();
     String fn = p.getName();
     if (fn.startsWith(BASE_PREFIX) && child.isDir()) {
-      long txn = parseBase(p);
-      if(bestBase.oldestBaseTxnId > txn) {
+      long writeId = parseBase(p);
+      if(bestBase.oldestBaseWriteId > writeId) {
         //keep track for error reporting
         bestBase.oldestBase = p;
-        bestBase.oldestBaseTxnId = txn;
+        bestBase.oldestBaseWriteId = writeId;
       }
       if (bestBase.status == null) {
-        if(isValidBase(txn, txnList, p, fs)) {
+        if(isValidBase(writeId, writeIdList, p, fs)) {
           bestBase.status = child;
-          bestBase.txn = txn;
+          bestBase.writeId = writeId;
         }
-      } else if (bestBase.txn < txn) {
-        if(isValidBase(txn, txnList, p, fs)) {
+      } else if (bestBase.writeId < writeId) {
+        if(isValidBase(writeId, writeIdList, p, fs)) {
           obsolete.add(bestBase.status);
           bestBase.status = child;
-          bestBase.txn = txn;
+          bestBase.writeId = writeId;
         }
       } else {
         obsolete.add(child);
@@ -1118,12 +1124,12 @@ public class AcidUtils {
               (fn.startsWith(DELTA_PREFIX)) ? DELTA_PREFIX : DELETE_DELTA_PREFIX;
       ParsedDelta delta = parseDelta(child, deltaPrefix, fs);
       if (tblproperties != null && AcidUtils.isInsertOnlyTable(tblproperties) &&
-          ValidTxnList.RangeResponse.ALL == txnList.isTxnRangeAborted(delta.minTransaction, delta.maxTransaction)) {
+        ValidWriteIdList.RangeResponse.ALL == writeIdList.isWriteIdRangeAborted(delta.minWriteId, delta.maxWriteId)) {
         aborted.add(child);
       }
-      if (txnList.isTxnRangeValid(delta.minTransaction,
-          delta.maxTransaction) !=
-          ValidTxnList.RangeResponse.NONE) {
+      if (writeIdList.isWriteIdRangeValid(delta.minWriteId,
+          delta.maxWriteId) !=
+              ValidWriteIdList.RangeResponse.NONE) {
         working.add(delta);
       }
     } else if (child.isDir()) {
@@ -1391,7 +1397,7 @@ public class AcidUtils {
    * Returns the logical end of file for an acid data file.
    *
   * This relies on the fact that if delta_x_y has no committed transactions it will be filtered out
-   * by {@link #getAcidState(Path, Configuration, ValidTxnList)} and so won't be read at all.
+   * by {@link #getAcidState(Path, Configuration, ValidWriteIdList)} and so won't be read at all.
    * @param file - data file to read/compute splits on
    */
   public static long getLogicalLength(FileSystem fs, FileStatus file) throws IOException {
@@ -1490,6 +1496,54 @@ public class AcidUtils {
   }
 
   /**
+   * Extract the ValidWriteIdList for the given table from the list of tables' ValidWriteIdList.
+   */
+  public static ValidWriteIdList getTableValidWriteIdList(Configuration conf, String fullTableName) {
+    String txnString = conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY);
+    ValidTxnWriteIdList validTxnList = new ValidTxnWriteIdList(txnString);
+    return validTxnList.getTableValidWriteIdList(fullTableName);
+  }
+
+  /**
+   * Set the valid write id list for the current table scan.
+   */
+  public static void setValidWriteIdList(Configuration conf, ValidWriteIdList validWriteIds) {
+    conf.set(ValidWriteIdList.VALID_WRITEIDS_KEY, validWriteIds.toString());
+    LOG.debug("Setting ValidWriteIdList: " + validWriteIds.toString()
+            + " isAcidTable: " + HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, false)
+            + " acidProperty: " + getAcidOperationalProperties(conf));
+  }
+
+  /**
+   * Set the valid write id list for the current table scan.
+   */
+  public static void setValidWriteIdList(Configuration conf, TableScanDesc tsDesc) {
+    if (tsDesc.isTranscationalTable()) {
+      String dbName = tsDesc.getDatabaseName();
+      String tableName = tsDesc.getTableName();
+      ValidWriteIdList validWriteIdList = getTableValidWriteIdList(conf,
+                                                    AcidUtils.getFullTableName(dbName, tableName));
+      if (validWriteIdList != null) {
+        setValidWriteIdList(conf, validWriteIdList);
+      } else {
+        // Log error if the acid table is missing from the ValidWriteIdList conf
+        LOG.error("setValidWriteIdList on table: " + AcidUtils.getFullTableName(dbName, tableName)
+                + " isAcidTable: " + true
+                + " acidProperty: " + getAcidOperationalProperties(conf)
+                + " couldn't find the ValidWriteId list from ValidTxnWriteIdList: "
+                + conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY));
+        throw new IllegalStateException("ACID table: " + AcidUtils.getFullTableName(dbName, tableName)
+                + " is missing from the ValidWriteIdList config: "
+                + conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY));
+      }
+    }
+  }
+
+  public static String getFullTableName(String dbName, String tableName) {
+    return dbName.toLowerCase() + "." + tableName.toLowerCase();
+  }
+
+  /**
   * General facility to place a metadata file into a dir created by acid/compactor write.
    *
    * Load Data commands against Acid tables write {@link AcidBaseFileType#ORIGINAL_BASE} type files
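
Putting the AcidUtils pieces together: a reader resolves its table's write id list from the conf and asks for the directory snapshot visible to those write ids. A sketch that assumes the conf already carries VALID_TABLES_WRITEIDS_KEY, that Directory exposes getCurrentDirectories(), and that the path is purely illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.common.ValidWriteIdList;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    public class AcidStateSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumes the Driver already set ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY in conf.
        ValidWriteIdList writeIds =
            AcidUtils.getTableValidWriteIdList(conf, AcidUtils.getFullTableName("default", "t1"));
        // Snapshot of base/delta/original files as visible to those write ids.
        AcidUtils.Directory dir =
            AcidUtils.getAcidState(new Path("/warehouse/t1/p=1"), conf, writeIds);
        for (AcidUtils.ParsedDelta delta : dir.getCurrentDirectories()) {
          System.out.println(delta.getPath()
              + " [" + delta.getMinWriteId() + "," + delta.getMaxWriteId() + "]");
        }
      }
    }
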
index f0d4988..71498a1 100644 (file)
@@ -341,8 +341,8 @@ public final class HiveFileFormatUtils {
         .tableProperties(tableProp)
         .reporter(reporter)
         .writingBase(conf.getInsertOverwrite())
-        .minimumTransactionId(conf.getTransactionId())
-        .maximumTransactionId(conf.getTransactionId())
+        .minimumWriteId(conf.getTableWriteId())
+        .maximumWriteId(conf.getTableWriteId())
         .bucket(bucket)
         .inspector(inspector)
         .recordIdColumn(rowIdColNum)
index 912eb10..7987c4e 100755 (executable)
@@ -36,8 +36,8 @@ import java.util.Map.Entry;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.StringInternUtils;
-import org.apache.hadoop.hive.common.ValidReadTxnList;
-import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hive.common.util.Ref;
 import org.slf4j.Logger;
@@ -478,12 +478,17 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
       InputFormat inputFormat, Class<? extends InputFormat> inputFormatClass, int splits,
       TableDesc table, List<InputSplit> result)
           throws IOException {
-    ValidTxnList validTxnList;
+    ValidWriteIdList validWriteIdList = AcidUtils.getTableValidWriteIdList(conf, table.getTableName());
+    ValidWriteIdList validMmWriteIdList;
     if (AcidUtils.isInsertOnlyTable(table.getProperties())) {
-      String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
-      validTxnList = txnString == null ? new ValidReadTxnList() : new ValidReadTxnList(txnString);
+      if (validWriteIdList == null) {
+        throw new IOException("Insert-Only table: " + table.getTableName()
+                + " is missing from the ValidWriteIdList config: "
+                + conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY));
+      }
+      validMmWriteIdList = validWriteIdList;
     } else {
-      validTxnList = null;  // for non-MM case
+      validMmWriteIdList = null;  // for non-MM case
     }
 
     try {
@@ -491,6 +496,15 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
       if (tableScan != null) {
         AcidUtils.setAcidOperationalProperties(conf, tableScan.getConf().isTranscationalTable(),
             tableScan.getConf().getAcidOperationalProperties());
+
+        if (tableScan.getConf().isTranscationalTable() && (validWriteIdList == null)) {
+          throw new IOException("Acid table: " + table.getTableName()
+                  + " is missing from the ValidWriteIdList config: "
+                  + conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY));
+        }
+        if (validWriteIdList != null) {
+          AcidUtils.setValidWriteIdList(conf, validWriteIdList);
+        }
       }
     } catch (HiveException e) {
       throw new IOException(e);
@@ -500,7 +514,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
       pushFilters(conf, tableScan, this.mrwork);
     }
 
-    Path[] finalDirs = processPathsForMmRead(dirs, conf, validTxnList);
+    Path[] finalDirs = processPathsForMmRead(dirs, conf, validMmWriteIdList);
     if (finalDirs == null) {
       return; // No valid inputs.
     }
@@ -531,13 +545,13 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
   }
 
   public static Path[] processPathsForMmRead(List<Path> dirs, JobConf conf,
-      ValidTxnList validTxnList) throws IOException {
-    if (validTxnList == null) {
+      ValidWriteIdList validWriteIdList) throws IOException {
+    if (validWriteIdList == null) {
       return dirs.toArray(new Path[dirs.size()]);
     } else {
       List<Path> finalPaths = new ArrayList<>(dirs.size());
       for (Path dir : dirs) {
-        processForWriteIds(dir, conf, validTxnList, finalPaths);
+        processForWriteIds(dir, conf, validWriteIdList, finalPaths);
       }
       if (finalPaths.isEmpty()) {
         LOG.warn("No valid inputs found in " + dirs);
@@ -548,7 +562,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
   }
 
   private static void processForWriteIds(Path dir, JobConf conf,
-      ValidTxnList validTxnList, List<Path> finalPaths) throws IOException {
+      ValidWriteIdList validWriteIdList, List<Path> finalPaths) throws IOException {
     FileSystem fs = dir.getFileSystem(conf);
     if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
       Utilities.FILE_OP_LOGGER.trace("Checking " + dir + " (root) for inputs");
@@ -574,10 +588,11 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
         }
         if (!file.isDirectory()) {
           Utilities.FILE_OP_LOGGER.warn("Ignoring a file not in MM directory " + path);
-        } else if (JavaUtils.extractTxnId(path) == null) {
+        } else if (JavaUtils.extractWriteId(path) == null) {
           subdirs.add(path);
         } else if (!hadAcidState) {
-          AcidUtils.Directory dirInfo = AcidUtils.getAcidState(currDir, conf, validTxnList, Ref.from(false), true, null);
+          AcidUtils.Directory dirInfo
+                  = AcidUtils.getAcidState(currDir, conf, validWriteIdList, Ref.from(false), true, null);
           hadAcidState = true;
 
           // Find the base, created for IOW.
@@ -890,6 +905,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
 
         AcidUtils.setAcidOperationalProperties(job, ts.getConf().isTranscationalTable(),
             ts.getConf().getAcidOperationalProperties());
+        AcidUtils.setValidWriteIdList(job, ts.getConf());
       }
     }
   }
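
The MM (insert-only) read path above now keys everything off the table's ValidWriteIdList instead of the global ValidTxnList. Below is a simplified sketch of the filtering idea behind processForWriteIds; directory recursion, base handling and getAcidState are deliberately omitted, while JavaUtils.extractWriteId and isWriteIdValid are the calls used in the hunk.

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.common.ValidWriteIdList;

public class MmPathFilterSketch {
  // Keep only delta directories whose encoded write ID is valid for this reader.
  // The real code recurses into directories that carry no write ID in their name
  // and delegates the final decision to AcidUtils.getAcidState.
  public static List<Path> keepValidDeltas(List<Path> candidateDirs, ValidWriteIdList writeIds) {
    List<Path> result = new ArrayList<>();
    for (Path dir : candidateDirs) {
      Long writeId = JavaUtils.extractWriteId(dir);
      if (writeId != null && writeIds.isWriteIdValid(writeId)) {
        result.add(dir);
      }
    }
    return result;
  }
}
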
index 0c37203..1f673da 100644 (file)
@@ -88,7 +88,7 @@ public class RecordIdentifier implements WritableComparable<RecordIdentifier> {
         Arrays.fill(struct, null);
         return;
       }
-      struct[Field.transactionId.ordinal()] = ri.getTransactionId();
+      struct[Field.transactionId.ordinal()] = ri.getWriteId();
       struct[Field.bucketId.ordinal()] = ri.getBucketProperty();
       struct[Field.rowId.ordinal()] = ri.getRowId();
     }
@@ -101,20 +101,20 @@ public class RecordIdentifier implements WritableComparable<RecordIdentifier> {
   public RecordIdentifier() {
   }
 
-  public RecordIdentifier(long transactionId, int bucket, long rowId) {
-    this.transactionId = transactionId;
+  public RecordIdentifier(long writeId, int bucket, long rowId) {
+    this.transactionId = writeId;
     this.bucketId = bucket;
     this.rowId = rowId;
   }
 
   /**
    * Set the identifier.
-   * @param transactionId the transaction id
+   * @param writeId the write id
    * @param bucketId the bucket id
    * @param rowId the row id
    */
-  public void setValues(long transactionId, int bucketId, long rowId) {
-    this.transactionId = transactionId;
+  public void setValues(long writeId, int bucketId, long rowId) {
+    this.transactionId = writeId;
     this.bucketId = bucketId;
     this.rowId = rowId;
   }
@@ -134,10 +134,10 @@ public class RecordIdentifier implements WritableComparable<RecordIdentifier> {
   }
 
   /**
-   * What was the original transaction id for the last row?
-   * @return the transaction id
+   * What was the original write id for the last row?
+   * @return the write id
    */
-  public long getTransactionId() {
+  public long getWriteId() {
     return transactionId;
   }
 
@@ -223,7 +223,7 @@ public class RecordIdentifier implements WritableComparable<RecordIdentifier> {
       BucketCodec.determineVersion(bucketId);
     String s = "(" + codec.getVersion() + "." + codec.decodeWriterId(bucketId) +
       "." + codec.decodeStatementId(bucketId) + ")";
-    return "{originalTxn: " + transactionId + ", " + bucketToString() + ", row: " + getRowId() +"}";
+    return "{originalWriteId: " + transactionId + ", " + bucketToString() + ", row: " + getRowId() +"}";
   }
   protected String bucketToString() {
     BucketCodec codec =
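
The RecordIdentifier rename above is cosmetic at the storage level: the first component of ROW__ID keeps its position and type, it is simply referred to as a write ID now. A tiny usage example of the renamed accessors, with arbitrary values.

import org.apache.hadoop.hive.ql.io.RecordIdentifier;

public class RecordIdentifierExample {
  public static void main(String[] args) {
    // (writeId, bucket, rowId) -- formerly (transactionId, bucket, rowId)
    RecordIdentifier id = new RecordIdentifier(17L, 0, 0L);
    System.out.println(id.getWriteId() + "/" + id.getBucketProperty() + "/" + id.getRowId());
    id.setValues(18L, 0, 1L); // same layout, next write
  }
}
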
index 36111f0..0aed172 100644 (file)
@@ -30,27 +30,26 @@ public interface RecordUpdater {
 
   /**
    * Insert a new record into the table.
-   * @param currentTransaction the transaction id of the current transaction.
+   * @param currentWriteId the table write id of the current transaction.
    * @param row the row of data to insert
    * @throws IOException
    */
-  void insert(long currentTransaction,
-              Object row) throws IOException;
+  void insert(long currentWriteId, Object row) throws IOException;
 
   /**
    * Update an old record with a new set of values.
-   * @param currentTransaction the current transaction id
+   * @param currentWriteId the current write id
    * @param row the new values for the row
    * @throws IOException
    */
-  void update(long currentTransaction, Object row) throws IOException;
+  void update(long currentWriteId, Object row) throws IOException;
 
   /**
    * Delete a row from the table.
-   * @param currentTransaction the current transaction id
+   * @param currentWriteId the current write id
    * @throws IOException
    */
-  void delete(long currentTransaction, Object row) throws IOException;
+  void delete(long currentWriteId, Object row) throws IOException;
 
   /**
    * Flush the current set of rows to the underlying file system, so that
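
The RecordUpdater contract above now takes the statement's table write ID for every event. A hedged usage sketch; how the updater instance is obtained (e.g. via AcidOutputFormat) is out of scope here, and the helper class is illustrative.

import java.io.IOException;

import org.apache.hadoop.hive.ql.io.RecordUpdater;

public class RecordUpdaterUsage {
  // The caller passes the per-table write ID allocated for this transaction,
  // not the global transaction ID as before.
  public static void writeOneRow(RecordUpdater updater, long tableWriteId, Object row) throws IOException {
    updater.insert(tableWriteId, row); // ROW__ID.writeId of the new row becomes tableWriteId
    updater.flush();                   // push buffered rows to the file system
  }
}
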
index 5e29070..e956485 100644 (file)
@@ -49,8 +49,8 @@ import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.ValidReadTxnList;
-import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.Metastore;
@@ -568,7 +568,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     private final boolean forceThreadpool;
     private final AtomicInteger cacheHitCounter = new AtomicInteger(0);
     private final AtomicInteger numFilesCounter = new AtomicInteger(0);
-    private final ValidTxnList transactionList;
+    private final ValidWriteIdList writeIdList;
     private SplitStrategyKind splitStrategyKind;
     private final SearchArgument sarg;
     private final AcidOperationalProperties acidOperationalProperties;
@@ -650,8 +650,6 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
           footerCache = useExternalCache ? metaCache : localCache;
         }
       }
-      String value = conf.get(ValidTxnList.VALID_TXNS_KEY);
-      transactionList = value == null ? new ValidReadTxnList() : new ValidReadTxnList(value);
 
       // Determine the transactional_properties of the table from the job conf stored in context.
       // The table properties are copied to job conf at HiveInputFormat::addSplitsForGroup(),
@@ -662,6 +660,11 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       String transactionalProperties = conf.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
       this.acidOperationalProperties = isTableTransactional ?
           AcidOperationalProperties.parseString(transactionalProperties) : null;
+
+      String value = conf.get(ValidWriteIdList.VALID_WRITEIDS_KEY);
+      writeIdList = value == null ? new ValidReaderWriteIdList() : new ValidReaderWriteIdList(value);
+      LOG.debug("Context:: Read ValidWriteIdList: " + writeIdList.toString()
+              + " isTransactionalTable: " + isTableTransactional);
     }
 
     @VisibleForTesting
@@ -933,10 +936,6 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       }
     }
 
-
-
-
-
     private void runGetSplitsSync(List<Future<List<OrcSplit>>> splitFutures,
         List<OrcSplit> splits, UserGroupInformation ugi) throws IOException {
       UserGroupInformation tpUgi = ugi == null ? UserGroupInformation.getCurrentUser() : ugi;
@@ -1093,8 +1092,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     /**
      * For plain or acid tables this is the root of the partition (or table if not partitioned).
      * For MM table this is delta/ or base/ dir.  In MM case applying of the ValidTxnList that
-     * {@link AcidUtils#getAcidState(Path, Configuration, ValidTxnList)} normally does has already
-     * been done in {@link HiveInputFormat#processPathsForMmRead(List, JobConf, ValidTxnList)}.
+     * {@link AcidUtils#getAcidState(Path, Configuration, ValidWriteIdList)} normally does has already
+     * been done in {@link HiveInputFormat#processPathsForMmRead(List, JobConf, ValidWriteIdList)}.
      */
     private final Path dir;
     private final Ref<Boolean> useFileIds;
@@ -1134,7 +1133,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     private AcidDirInfo callInternal() throws IOException {
       //todo: shouldn't ignoreEmptyFiles be set based on ExecutionEngine?
       AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir, context.conf,
-          context.transactionList, useFileIds, true, null);
+          context.writeIdList, useFileIds, true, null);
       // find the base files (original or new style)
       List<AcidBaseFileInfo> baseFiles = new ArrayList<>();
       if (dirInfo.getBaseDirectory() == null) {
@@ -1173,8 +1172,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
               AcidUtils.AcidBaseFileType.ORIGINAL_BASE : AcidUtils.AcidBaseFileType.ACID_SCHEMA;
             PathFilter bucketFilter = parsedDelta.isRawFormat() ?
               AcidUtils.originalBucketFilter : AcidUtils.bucketFileFilter;
-            if(parsedDelta.isRawFormat() && parsedDelta.getMinTransaction() !=
-              parsedDelta.getMaxTransaction()) {
+            if (parsedDelta.isRawFormat() && parsedDelta.getMinWriteId() != parsedDelta.getMaxWriteId()) {
               //delta/ with files in raw format are a result of Load Data (as opposed to compaction
              //or streaming ingest) so must have interval length == 1.
               throw new IllegalStateException("Delta in " + AcidUtils.AcidBaseFileType.ORIGINAL_BASE
@@ -2009,12 +2007,15 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     final Reader.Options readOptions = OrcInputFormat.createOptionsForReader(conf);
     readOptions.range(split.getStart(), split.getLength());
 
-    String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
-    ValidTxnList validTxnList = txnString == null ? new ValidReadTxnList() :
-      new ValidReadTxnList(txnString);
+    String txnString = conf.get(ValidWriteIdList.VALID_WRITEIDS_KEY);
+    ValidWriteIdList validWriteIdList
+            = (txnString == null) ? new ValidReaderWriteIdList() : new ValidReaderWriteIdList(txnString);
+    LOG.debug("getReader:: Read ValidWriteIdList: " + validWriteIdList.toString()
+            + " isTransactionalTable: " + HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN));
+
     final OrcRawRecordMerger records =
         new OrcRawRecordMerger(conf, true, reader, split.isOriginal(), bucket,
-            validTxnList, readOptions, deltas, mergerOptions);
+            validWriteIdList, readOptions, deltas, mergerOptions);
     return new RowReader<OrcStruct>() {
       OrcStruct innerRecord = records.createValue();
 
@@ -2296,7 +2297,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
   public RawReader<OrcStruct> getRawReader(Configuration conf,
                                            boolean collapseEvents,
                                            int bucket,
-                                           ValidTxnList validTxnList,
+                                           ValidWriteIdList validWriteIdList,
                                            Path baseDirectory,
                                            Path[] deltaDirectory
                                            ) throws IOException {
@@ -2320,7 +2321,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       mergerOptions.rootPath(deltaDirectory[0].getParent());
     }
     return new OrcRawRecordMerger(conf, collapseEvents, null, isOriginal,
-        bucket, validTxnList, new Reader.Options(), deltaDirectory, mergerOptions);
+        bucket, validWriteIdList, new Reader.Options(), deltaDirectory, mergerOptions);
   }
 
   /**
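
Both Context and getReader above fall back to a default ValidReaderWriteIdList when the conf key is absent, mirroring the old ValidReadTxnList behaviour. The sketch below just restates that lookup as a standalone helper, using only calls that appear in the hunks.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.common.ValidWriteIdList;

public class ReaderWriteIdListSketch {
  // Same pattern as Context/getReader above: deserialize the per-table list
  // from the job conf, or fall back to the default list when it is not set.
  public static ValidWriteIdList fromConf(Configuration conf) {
    String value = conf.get(ValidWriteIdList.VALID_WRITEIDS_KEY);
    return (value == null) ? new ValidReaderWriteIdList() : new ValidReaderWriteIdList(value);
  }
}
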
index f1f638d..57e005d 100644 (file)
@@ -204,20 +204,20 @@ public class OrcOutputFormat extends FileOutputFormat<NullWritable, OrcSerdeRow>
     }
 
     @Override
-    public void insert(long currentTransaction, Object row) throws IOException {
-      out.println("insert " + path + " currTxn: " + currentTransaction +
+    public void insert(long currentWriteId, Object row) throws IOException {
+      out.println("insert " + path + " currWriteId: " + currentWriteId +
           " obj: " + stringifyObject(row, inspector));
     }
 
     @Override
-    public void update(long currentTransaction, Object row) throws IOException {
-      out.println("update " + path + " currTxn: " + currentTransaction +
+    public void update(long currentWriteId, Object row) throws IOException {
+      out.println("update " + path + " currWriteId: " + currentWriteId +
           " obj: " + stringifyObject(row, inspector));
     }
 
     @Override
-    public void delete(long currentTransaction, Object row) throws IOException {
-      out.println("delete " + path + " currTxn: " + currentTransaction + " obj: " + row);
+    public void delete(long currentWriteId, Object row) throws IOException {
+      out.println("delete " + path + " currWriteId: " + currentWriteId + " obj: " + row);
     }
 
     @Override
@@ -307,7 +307,7 @@ public class OrcOutputFormat extends FileOutputFormat<NullWritable, OrcSerdeRow>
         watcher.addKey(
             ((IntWritable) orc.getFieldValue(OrcRecordUpdater.OPERATION)).get(),
             ((LongWritable)
-                orc.getFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION)).get(),
+                orc.getFieldValue(OrcRecordUpdater.ORIGINAL_WRITEID)).get(),
             ((IntWritable) orc.getFieldValue(OrcRecordUpdater.BUCKET)).get(),
             ((LongWritable) orc.getFieldValue(OrcRecordUpdater.ROW_ID)).get());
         writer.addRow(w);
index 779da4f..27f28de 100644 (file)
@@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
@@ -60,7 +60,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
   private final ObjectInspector objectInspector;
   private final long offset;
   private final long length;
-  private final ValidTxnList validTxnList;
+  private final ValidWriteIdList validWriteIdList;
   private final int columns;
   private final ReaderKey prevKey = new ReaderKey();
   // this is the key less than the lowest key we need to process
@@ -70,15 +70,15 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
   // an extra value so that we can return it while reading ahead
   private OrcStruct extraValue;
   /**
-   * A RecordIdentifier extended with the current transaction id. This is the
-   * key of our merge sort with the originalTransaction, bucket, and rowId
-   * ascending and the currentTransaction, statementId descending. This means that if the
+   * A RecordIdentifier extended with the current write id. This is the
+   * key of our merge sort with the originalWriteId, bucket, and rowId
+   * ascending and the currentWriteId, statementId descending. This means that if the
    * reader is collapsing events to just the last update, just the first
    * instance of each record is required.
    */
   @VisibleForTesting
   public final static class ReaderKey extends RecordIdentifier{
-    private long currentTransactionId;
+    private long currentWriteId;
     /**
      * This is the value from the delta file name, which may be different from the value encoded in
      * {@link RecordIdentifier#getBucketProperty()} in case of Update/Delete.
@@ -86,54 +86,54 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
      * or delete event.  For Acid 2.0 + multi-stmt txn, it must be a delete event.
      * No 2 Insert events can ever agree on {@link RecordIdentifier}
      */
-    private int statementId;//sort on this descending, like currentTransactionId
+    private int statementId; //sort on this descending, like currentWriteId
 
     ReaderKey() {
       this(-1, -1, -1, -1, 0);
     }
 
-    ReaderKey(long originalTransaction, int bucket, long rowId,
-                     long currentTransactionId) {
-      this(originalTransaction, bucket, rowId, currentTransactionId, 0);
+    ReaderKey(long originalWriteId, int bucket, long rowId,
+                     long currentWriteId) {
+      this(originalWriteId, bucket, rowId, currentWriteId, 0);
     }
     /**
      * @param statementId - set this to 0 if N/A
      */
-    public ReaderKey(long originalTransaction, int bucket, long rowId,
-                     long currentTransactionId, int statementId) {
-      super(originalTransaction, bucket, rowId);
-      this.currentTransactionId = currentTransactionId;
+    public ReaderKey(long originalWriteId, int bucket, long rowId,
+                     long currentWriteId, int statementId) {
+      super(originalWriteId, bucket, rowId);
+      this.currentWriteId = currentWriteId;
       this.statementId = statementId;
     }
 
     @Override
     public void set(RecordIdentifier other) {
       super.set(other);
-      currentTransactionId = ((ReaderKey) other).currentTransactionId;
+      currentWriteId = ((ReaderKey) other).currentWriteId;
       statementId = ((ReaderKey) other).statementId;
     }
 
-    public void setValues(long originalTransactionId,
+    public void setValues(long originalWriteId,
                           int bucket,
                           long rowId,
-                          long currentTransactionId,
+                          long currentWriteId,
                           int statementId) {
-      setValues(originalTransactionId, bucket, rowId);
-      this.currentTransactionId = currentTransactionId;
+      setValues(originalWriteId, bucket, rowId);
+      this.currentWriteId = currentWriteId;
       this.statementId = statementId;
     }
 
     @Override
     public boolean equals(Object other) {
       return super.equals(other) &&
-          currentTransactionId == ((ReaderKey) other).currentTransactionId
+              currentWriteId == ((ReaderKey) other).currentWriteId
             && statementId == ((ReaderKey) other).statementId//consistent with compareTo()
           ;
     }
     @Override
     public int hashCode() {
       int result = super.hashCode();
-      result = 31 * result + (int)(currentTransactionId ^ (currentTransactionId >>> 32));
+      result = 31 * result + (int)(currentWriteId ^ (currentWriteId >>> 32));
       result = 31 * result + statementId;
       return result;
     }
@@ -145,8 +145,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       if (sup == 0) {
         if (other.getClass() == ReaderKey.class) {
           ReaderKey oth = (ReaderKey) other;
-          if (currentTransactionId != oth.currentTransactionId) {
-            return currentTransactionId < oth.currentTransactionId ? +1 : -1;
+          if (currentWriteId != oth.currentWriteId) {
+            return currentWriteId < oth.currentWriteId ? +1 : -1;
           }
           if(statementId != oth.statementId) {
             return statementId < oth.statementId ? +1 : -1;
@@ -162,15 +162,15 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
      * This means 1 txn modified the same row more than once
      */
     private boolean isSameRow(ReaderKey other) {
-      return compareRow(other) == 0 && currentTransactionId == other.currentTransactionId;
+      return compareRow(other) == 0 && currentWriteId == other.currentWriteId;
     }
 
-    long getCurrentTransactionId() {
-      return currentTransactionId;
+    long getCurrentWriteId() {
+      return currentWriteId;
     }
 
     /**
-     * Compare rows without considering the currentTransactionId.
+     * Compare rows without considering the currentWriteId.
      * @param other the value to compare to
      * @return -1, 0, +1
      */
@@ -180,9 +180,9 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
 
     @Override
     public String toString() {
-      return "{originalTxn: " + getTransactionId() + ", " +
-          bucketToString() + ", row: " + getRowId() + ", currentTxn: " +
-          currentTransactionId + ", statementId: "+ statementId + "}";
+      return "{originalWriteId: " + getWriteId() + ", " +
+          bucketToString() + ", row: " + getRowId() + ", currentWriteId " +
+              currentWriteId + ", statementId: "+ statementId + "}";
     }
   }
   interface ReaderPair {
@@ -389,9 +389,9 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
           IntWritable operation =
               new IntWritable(OrcRecordUpdater.INSERT_OPERATION);
           nextRecord().setFieldValue(OrcRecordUpdater.OPERATION, operation);
-          nextRecord().setFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION,
+          nextRecord().setFieldValue(OrcRecordUpdater.CURRENT_WRITEID,
               new LongWritable(transactionId));
-          nextRecord().setFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION,
+          nextRecord().setFieldValue(OrcRecordUpdater.ORIGINAL_WRITEID,
               new LongWritable(transactionId));
           nextRecord().setFieldValue(OrcRecordUpdater.BUCKET,
               new IntWritable(bucketProperty));
@@ -403,11 +403,11 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
           nextRecord = next;
           ((IntWritable) next.getFieldValue(OrcRecordUpdater.OPERATION))
               .set(OrcRecordUpdater.INSERT_OPERATION);
-          ((LongWritable) next.getFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION))
+          ((LongWritable) next.getFieldValue(OrcRecordUpdater.ORIGINAL_WRITEID))
               .set(transactionId);
           ((IntWritable) next.getFieldValue(OrcRecordUpdater.BUCKET))
               .set(bucketProperty);
-          ((LongWritable) next.getFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION))
+          ((LongWritable) next.getFieldValue(OrcRecordUpdater.CURRENT_WRITEID))
               .set(transactionId);
           ((LongWritable) next.getFieldValue(OrcRecordUpdater.ROW_ID))
               .set(nextRowId);
@@ -445,7 +445,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     OriginalReaderPairToRead(ReaderKey key, Reader reader, int bucketId,
                              final RecordIdentifier minKey, final RecordIdentifier maxKey,
                              Reader.Options options, Options mergerOptions, Configuration conf,
-                             ValidTxnList validTxnList, int statementId) throws IOException {
+                             ValidWriteIdList validWriteIdList, int statementId) throws IOException {
       super(key, bucketId, conf, mergerOptions, statementId);
       this.reader = reader;
       assert !mergerOptions.isCompacting();
@@ -472,8 +472,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
          * contents to be in {@link org.apache.hadoop.hive.ql.io.AcidUtils.Directory#getOriginalFiles()}
          */
         //the split is from something other than the 1st file of the logical bucket - compute offset
-        AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(),
-          conf, validTxnList, false, true);
+        AcidUtils.Directory directoryState
+                = AcidUtils.getAcidState(mergerOptions.getRootPath(), conf, validWriteIdList, false, true);
         for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
           AcidOutputFormat.Options bucketOptions =
             AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf);
@@ -577,7 +577,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
 
     OriginalReaderPairToCompact(ReaderKey key, int bucketId,
                        Reader.Options options, Options mergerOptions, Configuration conf,
-                       ValidTxnList validTxnList, int statementId) throws IOException {
+                       ValidWriteIdList validWriteIdList, int statementId) throws IOException {
       super(key, bucketId, conf, mergerOptions, statementId);
       assert mergerOptions.isCompacting() : "Should only be used for Compaction";
       this.conf = conf;
@@ -587,8 +587,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       //when compacting each split needs to process the whole logical bucket
       assert options.getOffset() == 0;
       assert options.getMaxOffset() == Long.MAX_VALUE;
-      AcidUtils.Directory directoryState = AcidUtils.getAcidState(
-        mergerOptions.getRootPath(), conf, validTxnList, false, true);
+      AcidUtils.Directory directoryState
+              = AcidUtils.getAcidState(mergerOptions.getRootPath(), conf, validWriteIdList, false, true);
       /**
        * Note that for reading base_x/ or delta_x_x/ with non-acid schema,
        * {@link Options#getRootPath()} is set to base_x/ or delta_x_x/ which causes all it's
@@ -714,7 +714,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     boolean isTail = true;
     RecordIdentifier minKey = null;
     RecordIdentifier maxKey = null;
-    TransactionMetaData tfp = TransactionMetaData.findTransactionIDForSynthetcRowIDs(
+    TransactionMetaData tfp = TransactionMetaData.findWriteIDForSynthetcRowIDs(
       mergerOptions.getBucketPath(), mergerOptions.getRootPath(), conf);
     int bucketProperty = encodeBucketId(conf, bucket, tfp.statementId);
    /**
@@ -939,13 +939,13 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
                      Reader reader,
                      boolean isOriginal,
                      int bucket,
-                     ValidTxnList validTxnList,
+                     ValidWriteIdList validWriteIdList,
                      Reader.Options options,
                      Path[] deltaDirectory, Options mergerOptions) throws IOException {
     this.collapse = collapseEvents;
     this.offset = options.getOffset();
     this.length = options.getLength();
-    this.validTxnList = validTxnList;
+    this.validWriteIdList = validWriteIdList;
     /**
      * @since Hive 3.0
      * With split update (HIVE-14035) we have base/, delta/ and delete_delta/ - the latter only
@@ -1028,7 +1028,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
               AcidUtils.parseBase(mergerOptions.getBaseDir()), mergerOptions.getBaseDir());
           }
           pair = new OriginalReaderPairToCompact(key, bucket, options, readerPairOptions,
-            conf, validTxnList,
+            conf, validWriteIdList,
             0);//0 since base_x doesn't have a suffix (neither does pre acid write)
         } else {
           assert mergerOptions.getBucketPath() != null : " since this is not compaction: "
@@ -1036,14 +1036,14 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
           //if here it's a non-acid schema file - check if from before table was marked transactional
           //or in base_x/delta_x_x from Load Data
           Options readerPairOptions = mergerOptions;
-          TransactionMetaData tfp = TransactionMetaData.findTransactionIDForSynthetcRowIDs(
+          TransactionMetaData tfp = TransactionMetaData.findWriteIDForSynthetcRowIDs(
             mergerOptions.getBucketPath(), mergerOptions.getRootPath(), conf);
-          if(tfp.syntheticTransactionId > 0) {
+          if(tfp.syntheticWriteId > 0) {
             readerPairOptions = modifyForNonAcidSchemaRead(mergerOptions,
-              tfp.syntheticTransactionId, tfp.folder);
+              tfp.syntheticWriteId, tfp.folder);
           }
           pair = new OriginalReaderPairToRead(key, reader, bucket, keyInterval.getMinKey(),
-            keyInterval.getMaxKey(), options,  readerPairOptions, conf, validTxnList, tfp.statementId);
+            keyInterval.getMaxKey(), options,  readerPairOptions, conf, validWriteIdList, tfp.statementId);
         }
       } else {
         if(mergerOptions.isCompacting()) {
@@ -1100,11 +1100,11 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
           assert !deltaDir.isDeleteDelta() : delta.toString();
           assert mergerOptions.isCompacting() : "during regular read anything which is not a" +
             " delete_delta is treated like base: " + delta;
-          Options rawCompactOptions = modifyForNonAcidSchemaRead(mergerOptions,
-            deltaDir.getMinTransaction(), delta);
+          Options rawCompactOptions = modifyForNonAcidSchemaRead(mergerOptions, deltaDir.getMinWriteId(), delta);
+
           //this will also handle copy_N files if any
           ReaderPair deltaPair =  new OriginalReaderPairToCompact(key, bucket, options,
-            rawCompactOptions, conf, validTxnList, deltaDir.getStatementId());
+              rawCompactOptions, conf, validWriteIdList, deltaDir.getStatementId());
           if (deltaPair.nextRecord() != null) {
             readers.put(key, deltaPair);
           }
@@ -1170,24 +1170,24 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
    * type files into a base_x/ or delta_x_x.  The data in these are then assigned ROW_IDs at read
    * time and made permanent at compaction time.  This is identical to how 'original' files (i.e.
    * those that existed in the table before it was converted to an Acid table) except that the
-   * transaction ID to use in the ROW_ID should be that of the transaction that ran the Load Data.
+   * write ID to use in the ROW_ID should be that of the transaction that ran the Load Data.
    */
   static final class TransactionMetaData {
-    final long syntheticTransactionId;
+    final long syntheticWriteId;
     /**
      * folder which determines the transaction id to use in synthetic ROW_IDs
      */
     final Path folder;
     final int statementId;
-    TransactionMetaData(long syntheticTransactionId, Path folder) {
-      this(syntheticTransactionId, folder, 0);
+    TransactionMetaData(long syntheticWriteId, Path folder) {
+      this(syntheticWriteId, folder, 0);
     }
-    TransactionMetaData(long syntheticTransactionId, Path folder, int statementId) {
-      this.syntheticTransactionId = syntheticTransactionId;
+    TransactionMetaData(long syntheticWriteId, Path folder, int statementId) {
+      this.syntheticWriteId = syntheticWriteId;
       this.folder = folder;
       this.statementId = statementId;
     }
-    static TransactionMetaData findTransactionIDForSynthetcRowIDs(Path splitPath, Path rootPath,
+    static TransactionMetaData findWriteIDForSynthetcRowIDs(Path splitPath, Path rootPath,
       Configuration conf) throws IOException {
       Path parent = splitPath.getParent();
       if(rootPath.equals(parent)) {
@@ -1205,10 +1205,10 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
           else {
             AcidUtils.ParsedDelta pd = AcidUtils.parsedDelta(parent, AcidUtils.DELTA_PREFIX,
               parent.getFileSystem(conf));
-            assert pd.getMinTransaction() == pd.getMaxTransaction() :
+            assert pd.getMinWriteId() == pd.getMaxWriteId() :
               "This a delta with raw non acid schema, must be result of single write, no compaction: "
                 + splitPath;
-            return new TransactionMetaData(pd.getMinTransaction(), parent, pd.getStatementId());
+            return new TransactionMetaData(pd.getMinWriteId(), parent, pd.getStatementId());
           }
         }
         parent = parent.getParent();
@@ -1227,7 +1227,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
   /**
    * This is done to read non-acid schema files ("original") located in base_x/ or delta_x_x/ which
    * happens as a result of Load Data statement.  Setting {@code rootPath} to base_x/ or delta_x_x
-   * causes {@link AcidUtils#getAcidState(Path, Configuration, ValidTxnList)} in subsequent
+   * causes {@link AcidUtils#getAcidState(Path, Configuration, ValidWriteIdList)} in subsequent
    * {@link OriginalReaderPair} object to return the files in this dir
    * in {@link AcidUtils.Directory#getOriginalFiles()}
    * @return modified clone of {@code baseOptions}
@@ -1350,8 +1350,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       }
 
       // if this transaction isn't ok, skip over it
-      if (!validTxnList.isTxnValid(
-          ((ReaderKey) recordIdentifier).getCurrentTransactionId())) {
+      if (!validWriteIdList.isWriteIdValid(
+          ((ReaderKey) recordIdentifier).getCurrentWriteId())) {
         continue;
       }
 
index b90ce6e..2e4db22 100644 (file)
@@ -55,7 +55,7 @@ import org.slf4j.LoggerFactory;
  * A RecordUpdater where the files are stored as ORC.
  * A note on various record structures: the {@code row} coming in (as in {@link #insert(long, Object)}
  * for example), is a struct like <RecordIdentifier, f1, ... fn> but what is written to the file
- * * is <op, otid, writerId, rowid, ctid, <f1, ... fn>> (see {@link #createEventSchema(ObjectInspector)})
+ * is <op, owid, writerId, rowid, cwid, <f1, ... fn>> (see {@link #createEventSchema(ObjectInspector)})
  * So there are OIs here to make the translation.
  */
 public class OrcRecordUpdater implements RecordUpdater {
@@ -72,10 +72,10 @@ public class OrcRecordUpdater implements RecordUpdater {
   final static int DELETE_OPERATION = 2;
   //column indexes of corresponding data in storage layer
   final static int OPERATION = 0;
-  final static int ORIGINAL_TRANSACTION = 1;
+  final static int ORIGINAL_WRITEID = 1;
   final static int BUCKET = 2;
   final static int ROW_ID = 3;
-  final static int CURRENT_TRANSACTION = 4;
+  final static int CURRENT_WRITEID = 4;
   final static int ROW = 5;
   /**
    * total number of fields (above)
@@ -100,8 +100,8 @@ public class OrcRecordUpdater implements RecordUpdater {
   private final FSDataOutputStream flushLengths;
   private final OrcStruct item;
   private final IntWritable operation = new IntWritable();
-  private final LongWritable currentTransaction = new LongWritable(-1);
-  private final LongWritable originalTransaction = new LongWritable(-1);
+  private final LongWritable currentWriteId = new LongWritable(-1);
+  private final LongWritable originalWriteId = new LongWritable(-1);
   private final IntWritable bucket = new IntWritable();
   private final LongWritable rowId = new LongWritable();
   private long insertedRows = 0;
@@ -112,12 +112,12 @@ public class OrcRecordUpdater implements RecordUpdater {
   private KeyIndexBuilder deleteEventIndexBuilder;
   private StructField recIdField = null; // field to look for the record identifier in
   private StructField rowIdField = null; // field inside recId to look for row id in
-  private StructField originalTxnField = null;  // field inside recId to look for original txn in
+  private StructField originalWriteIdField = null;  // field inside recId to look for original write id in
   private StructField bucketField = null; // field inside recId to look for bucket in
   private StructObjectInspector rowInspector; // OI for the original row
   private StructObjectInspector recIdInspector; // OI for the record identifier struct
   private LongObjectInspector rowIdInspector; // OI for the long row id inside the recordIdentifier
-  private LongObjectInspector origTxnInspector; // OI for the original txn inside the record
+  private LongObjectInspector origWriteIdInspector; // OI for the original write id inside the record
   // identifer
   private IntObjectInspector bucketInspector;
 
@@ -126,11 +126,11 @@ public class OrcRecordUpdater implements RecordUpdater {
   }
 
   static long getCurrentTransaction(OrcStruct struct) {
-    return ((LongWritable) struct.getFieldValue(CURRENT_TRANSACTION)).get();
+    return ((LongWritable) struct.getFieldValue(CURRENT_WRITEID)).get();
   }
 
   static long getOriginalTransaction(OrcStruct struct) {
-    return ((LongWritable) struct.getFieldValue(ORIGINAL_TRANSACTION)).get();
+    return ((LongWritable) struct.getFieldValue(ORIGINAL_WRITEID)).get();
   }
 
   static int getBucket(OrcStruct struct) {
@@ -184,15 +184,13 @@ public class OrcRecordUpdater implements RecordUpdater {
     fields.add(new OrcStruct.Field("operation",
         PrimitiveObjectInspectorFactory.writableIntObjectInspector, OPERATION));
     fields.add(new OrcStruct.Field("originalTransaction",
-        PrimitiveObjectInspectorFactory.writableLongObjectInspector,
-        ORIGINAL_TRANSACTION));
+        PrimitiveObjectInspectorFactory.writableLongObjectInspector, ORIGINAL_WRITEID));
     fields.add(new OrcStruct.Field("bucket",
         PrimitiveObjectInspectorFactory.writableIntObjectInspector, BUCKET));
     fields.add(new OrcStruct.Field("rowId",
         PrimitiveObjectInspectorFactory.writableLongObjectInspector, ROW_ID));
     fields.add(new OrcStruct.Field("currentTransaction",
-        PrimitiveObjectInspectorFactory.writableLongObjectInspector,
-        CURRENT_TRANSACTION));
+        PrimitiveObjectInspectorFactory.writableLongObjectInspector, CURRENT_WRITEID));
     fields.add(new OrcStruct.Field("row", rowInspector, ROW));
     return new OrcStruct.OrcStructInspector(fields);
   }
@@ -246,7 +244,7 @@ public class OrcRecordUpdater implements RecordUpdater {
         }
       }
     }
-    if (options.getMinimumTransactionId() != options.getMaximumTransactionId()
+    if (options.getMinimumWriteId() != options.getMaximumWriteId()
         && !options.isWritingBase()){
       //throw if file already exists as that should never happen
       flushLengths = fs.create(OrcAcidUtils.getSideFile(this.path), false, 8,
@@ -316,8 +314,8 @@ public class OrcRecordUpdater implements RecordUpdater {
         options.getRecordIdColumn())));
     item = new OrcStruct(FIELDS);
     item.setFieldValue(OPERATION, operation);
-    item.setFieldValue(CURRENT_TRANSACTION, currentTransaction);
-    item.setFieldValue(ORIGINAL_TRANSACTION, originalTransaction);
+    item.setFieldValue(CURRENT_WRITEID, currentWriteId);
+    item.setFieldValue(ORIGINAL_WRITEID, originalWriteId);
     item.setFieldValue(BUCKET, bucket);
     item.setFieldValue(ROW_ID, rowId);
   }
@@ -342,9 +340,9 @@ public class OrcRecordUpdater implements RecordUpdater {
       List<? extends StructField> fields =
           ((StructObjectInspector) recIdField.getFieldObjectInspector()).getAllStructFieldRefs();
       // Go by position, not field name, as field names aren't guaranteed.  The order of fields
-      // in RecordIdentifier is transactionId, bucketId, rowId
-      originalTxnField = fields.get(0);
-      origTxnInspector = (LongObjectInspector)originalTxnField.getFieldObjectInspector();
+      // in RecordIdentifier is writeId, bucketId, rowId
+      originalWriteIdField = fields.get(0);
+      origWriteIdInspector = (LongObjectInspector)originalWriteIdField.getFieldObjectInspector();
       bucketField = fields.get(1);
       bucketInspector = (IntObjectInspector) bucketField.getFieldObjectInspector();
       rowIdField = fields.get(2);
@@ -361,27 +359,27 @@ public class OrcRecordUpdater implements RecordUpdater {
    * thus even for unbucketed tables, the N in bucket_N file name matches writerId/bucketId even for
    * late split
    */
-  private void addSimpleEvent(int operation, long currentTransaction, long rowId, Object row)
+  private void addSimpleEvent(int operation, long currentWriteId, long rowId, Object row)
       throws IOException {
     this.operation.set(operation);
-    this.currentTransaction.set(currentTransaction);
+    this.currentWriteId.set(currentWriteId);
     Integer currentBucket = null;
-    // If this is an insert, originalTransaction should be set to this transaction.  If not,
+    // If this is an insert, originalWriteId should be set to this transaction.  If not,
     // it will be reset by the following if anyway.
-    long originalTransaction = currentTransaction;
+    long originalWriteId = currentWriteId;
     if (operation == DELETE_OPERATION || operation == UPDATE_OPERATION) {
       Object rowIdValue = rowInspector.getStructFieldData(row, recIdField);
-      originalTransaction = origTxnInspector.get(
-          recIdInspector.getStructFieldData(rowIdValue, originalTxnField));
+      originalWriteId = origWriteIdInspector.get(
+          recIdInspector.getStructFieldData(rowIdValue, originalWriteIdField));
       rowId = rowIdInspector.get(recIdInspector.getStructFieldData(rowIdValue, rowIdField));
       currentBucket = setBucket(bucketInspector.get(
         recIdInspector.getStructFieldData(rowIdValue, bucketField)), operation);
     }
     this.rowId.set(rowId);
-    this.originalTransaction.set(originalTransaction);
+    this.originalWriteId.set(originalWriteId);
     item.setFieldValue(OrcRecordUpdater.OPERATION, new IntWritable(operation));
     item.setFieldValue(OrcRecordUpdater.ROW, (operation == DELETE_OPERATION ? null : row));
-    indexBuilder.addKey(operation, originalTransaction, bucket.get(), rowId);
+    indexBuilder.addKey(operation, originalWriteId, bucket.get(), rowId);
     if (writer == null) {
       writer = OrcFile.createWriter(path, writerOptions);
     }
@@ -389,18 +387,18 @@ public class OrcRecordUpdater implements RecordUpdater {
     restoreBucket(currentBucket, operation);
   }
 
-  private void addSplitUpdateEvent(int operation, long currentTransaction, long rowId, Object row)
+  private void addSplitUpdateEvent(int operation, long currentWriteId, long rowId, Object row)
       throws IOException {
     if (operation == INSERT_OPERATION) {
       // Just insert the record in the usual way, i.e., default to the simple behavior.
-      addSimpleEvent(operation, currentTransaction, rowId, row);
+      addSimpleEvent(operation, currentWriteId, rowId, row);
       return;
     }
     this.operation.set(operation);
-    this.currentTransaction.set(currentTransaction);
+    this.currentWriteId.set(currentWriteId);
     Object rowValue = rowInspector.getStructFieldData(row, recIdField);
-    long originalTransaction = origTxnInspector.get(
-            recIdInspector.getStructFieldData(rowValue, originalTxnField));
+    long originalWriteId = origWriteIdInspector.get(
+            recIdInspector.getStructFieldData(rowValue, originalWriteIdField));
     rowId = rowIdInspector.get(
             recIdInspector.getStructFieldData(rowValue, rowIdField));
     Integer currentBucket = null;
@@ -423,54 +421,54 @@ public class OrcRecordUpdater implements RecordUpdater {
 
       // A delete/update generates a delete event for the original row.
       this.rowId.set(rowId);
-      this.originalTransaction.set(originalTransaction);
+      this.originalWriteId.set(originalWriteId);
       item.setFieldValue(OrcRecordUpdater.OPERATION, new IntWritable(DELETE_OPERATION));
       item.setFieldValue(OrcRecordUpdater.ROW, null); // ROW is null for delete events.
-      deleteEventIndexBuilder.addKey(DELETE_OPERATION, originalTransaction, bucket.get(), rowId);
+      deleteEventIndexBuilder.addKey(DELETE_OPERATION, originalWriteId, bucket.get(), rowId);
       deleteEventWriter.addRow(item);
       restoreBucket(currentBucket, operation);
     }
 
     if (operation == UPDATE_OPERATION) {
       // A new row is also inserted in the usual delta file for an update event.
-      addSimpleEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row);
+      addSimpleEvent(INSERT_OPERATION, currentWriteId, insertedRows++, row);
     }
   }
 
   @Override
-  public void insert(long currentTransaction, Object row) throws IOException {
-    if (this.currentTransaction.get() != currentTransaction) {
+  public void insert(long currentWriteId, Object row) throws IOException {
+    if (this.currentWriteId.get() != currentWriteId) {
       insertedRows = 0;
     }
     if (acidOperationalProperties.isSplitUpdate()) {
-      addSplitUpdateEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row);
+      addSplitUpdateEvent(INSERT_OPERATION, currentWriteId, insertedRows++, row);
     } else {
-      addSimpleEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row);
+      addSimpleEvent(INSERT_OPERATION, currentWriteId, insertedRows++, row);
     }
     rowCountDelta++;
   }
 
   @Override
-  public void update(long currentTransaction, Object row) throws IOException {
-    if (this.currentTransaction.get() != currentTransaction) {
+  public void update(long currentWriteId, Object row) throws IOException {
+    if (this.currentWriteId.get() != currentWriteId) {
       insertedRows = 0;
     }
     if (acidOperationalProperties.isSplitUpdate()) {
-      addSplitUpdateEvent(UPDATE_OPERATION, currentTransaction, -1L, row);
+      addSplitUpdateEvent(UPDATE_OPERATION, currentWriteId, -1L, row);
     } else {
-      addSimpleEvent(UPDATE_OPERATION, currentTransaction, -1L, row);
+      addSimpleEvent(UPDATE_OPERATION, currentWriteId, -1L, row);
     }
   }
 
   @Override
-  public void delete(long currentTransaction, Object row) throws IOException {
-    if (this.currentTransaction.get() != currentTransaction) {
+  public void delete(long currentWriteId, Object row) throws IOException {
+    if (this.currentWriteId.get() != currentWriteId) {
       insertedRows = 0;
     }
     if (acidOperationalProperties.isSplitUpdate()) {
-      addSplitUpdateEvent(DELETE_OPERATION, currentTransaction, -1L, row);
+      addSplitUpdateEvent(DELETE_OPERATION, currentWriteId, -1L, row);
     } else {
-      addSimpleEvent(DELETE_OPERATION, currentTransaction, -1L, row);
+      addSimpleEvent(DELETE_OPERATION, currentWriteId, -1L, row);
     }
     rowCountDelta--;
   }
index d4b29d5..d67b1e9 100644 (file)
@@ -27,8 +27,8 @@ import java.util.TreeMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.ValidReadTxnList;
-import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
@@ -72,7 +72,7 @@ public class VectorizedOrcAcidRowBatchReader
   protected float progress = 0.0f;
   protected Object[] partitionValues;
   private boolean addPartitionCols = true;
-  private final ValidTxnList validTxnList;
+  private final ValidWriteIdList validWriteIdList;
   private final DeleteEventRegistry deleteEventRegistry;
   /**
    * {@link RecordIdentifier}/{@link VirtualColumn#ROWID} information
@@ -183,8 +183,10 @@ public class VectorizedOrcAcidRowBatchReader
       partitionValues = null;
     }
 
-    String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
-    this.validTxnList = (txnString == null) ? new ValidReadTxnList() : new ValidReadTxnList(txnString);
+    String txnString = conf.get(ValidWriteIdList.VALID_WRITEIDS_KEY);
+    this.validWriteIdList = (txnString == null) ? new ValidReaderWriteIdList() : new ValidReaderWriteIdList(txnString);
+    LOG.debug("VectorizedOrcAcidRowBatchReader:: Read ValidWriteIdList: " + this.validWriteIdList.toString()
+            + " isFullAcidTable: " + AcidUtils.isFullAcidScan(conf));
 
     // Clone readerOptions for deleteEvents.
     Reader.Options deleteEventReaderOptions = readerOptions.clone();
@@ -214,7 +216,7 @@ public class VectorizedOrcAcidRowBatchReader
     }
     rowIdProjected = areRowIdsProjected(rbCtx);
     rootPath = orcSplit.getRootDir();
-    syntheticProps = computeOffsetAndBucket(orcSplit, conf, validTxnList);
+    syntheticProps = computeOffsetAndBucket(orcSplit, conf, validWriteIdList);
   }
 
   /**
@@ -242,8 +244,8 @@ public class VectorizedOrcAcidRowBatchReader
    * before/during split computation and passing the info in the split.  (HIVE-17917)
    */
   private OffsetAndBucketProperty computeOffsetAndBucket(
-    OrcSplit split, JobConf conf,ValidTxnList validTxnList) throws IOException {
-    if(!needSyntheticRowIds(split.isOriginal(), !deleteEventRegistry.isEmpty(), rowIdProjected)) {
+      OrcSplit split, JobConf conf, ValidWriteIdList validWriteIdList) throws IOException {
+    if (!needSyntheticRowIds(split.isOriginal(), !deleteEventRegistry.isEmpty(), rowIdProjected)) {
       if(split.isOriginal()) {
         /**
          * Even if we don't need to project ROW_IDs, we still need to check the transaction ID that
@@ -252,22 +254,20 @@ public class VectorizedOrcAcidRowBatchReader
          * filter out base/delta files but this makes fewer dependencies)
          */
         OrcRawRecordMerger.TransactionMetaData syntheticTxnInfo =
-          OrcRawRecordMerger.TransactionMetaData.findTransactionIDForSynthetcRowIDs(split.getPath(),
-            split.getRootDir(), conf);
-        return new OffsetAndBucketProperty(-1,-1,
-          syntheticTxnInfo.syntheticTransactionId);
+            OrcRawRecordMerger.TransactionMetaData.findWriteIDForSynthetcRowIDs(split.getPath(),
+                    split.getRootDir(), conf);
+        return new OffsetAndBucketProperty(-1,-1, syntheticTxnInfo.syntheticWriteId);
       }
       return null;
     }
     long rowIdOffset = 0;
     OrcRawRecordMerger.TransactionMetaData syntheticTxnInfo =
-      OrcRawRecordMerger.TransactionMetaData.findTransactionIDForSynthetcRowIDs(split.getPath(),
-        split.getRootDir(), conf);
+        OrcRawRecordMerger.TransactionMetaData.findWriteIDForSynthetcRowIDs(split.getPath(), split.getRootDir(), conf);
     int bucketId = AcidUtils.parseBaseOrDeltaBucketFilename(split.getPath(), conf).getBucketId();
     int bucketProperty = BucketCodec.V1.encode(new AcidOutputFormat.Options(conf)
       .statementId(syntheticTxnInfo.statementId).bucket(bucketId));
     AcidUtils.Directory directoryState = AcidUtils.getAcidState( syntheticTxnInfo.folder, conf,
-      validTxnList, false, true);
+        validWriteIdList, false, true);
     for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
       AcidOutputFormat.Options bucketOptions =
         AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf);
@@ -283,7 +283,7 @@ public class VectorizedOrcAcidRowBatchReader
       rowIdOffset += reader.getNumberOfRows();
     }
     return new OffsetAndBucketProperty(rowIdOffset, bucketProperty,
-      syntheticTxnInfo.syntheticTransactionId);
+      syntheticTxnInfo.syntheticWriteId);
   }
   /**
    * {@link VectorizedOrcAcidRowBatchReader} is always used for vectorized reads of acid tables.
@@ -426,7 +426,7 @@ public class VectorizedOrcAcidRowBatchReader
             " to handle original files that require ROW__IDs: " + rootPath);
         }
         /**
-         * {@link RecordIdentifier#getTransactionId()}
+         * {@link RecordIdentifier#getWriteId()}
          */
         recordIdColumnVector.fields[0].noNulls = true;
         recordIdColumnVector.fields[0].isRepeating = true;
@@ -450,11 +450,11 @@ public class VectorizedOrcAcidRowBatchReader
         }
         //Now populate a structure to use to apply delete events
         innerRecordIdColumnVector = new ColumnVector[OrcRecordUpdater.FIELDS];
-        innerRecordIdColumnVector[OrcRecordUpdater.ORIGINAL_TRANSACTION] = recordIdColumnVector.fields[0];
+        innerRecordIdColumnVector[OrcRecordUpdater.ORIGINAL_WRITEID] = recordIdColumnVector.fields[0];
         innerRecordIdColumnVector[OrcRecordUpdater.BUCKET] = recordIdColumnVector.fields[1];
         innerRecordIdColumnVector[OrcRecordUpdater.ROW_ID] = recordIdColumnVector.fields[2];
         //these are insert events so (original txn == current) txn for all rows
-        innerRecordIdColumnVector[OrcRecordUpdater.CURRENT_TRANSACTION] = recordIdColumnVector.fields[0];
+        innerRecordIdColumnVector[OrcRecordUpdater.CURRENT_WRITEID] = recordIdColumnVector.fields[0];
       }
       if(syntheticProps.syntheticTxnId > 0) {
         //"originals" (written before table was converted to acid) is considered written by
@@ -470,7 +470,7 @@ public class VectorizedOrcAcidRowBatchReader
           * reader (transactions) is concerned.  Since here we are reading 'original' schema file,
           * all rows in it have been created by the same txn, namely 'syntheticProps.syntheticTxnId'
           */
-          if (!validTxnList.isTxnValid(syntheticProps.syntheticTxnId)) {
+          if (!validWriteIdList.isWriteIdValid(syntheticProps.syntheticTxnId)) {
             selectedBitSet.clear(0, vectorizedRowBatchBase.size);
           }
         }
@@ -514,7 +514,7 @@ public class VectorizedOrcAcidRowBatchReader
       // Transfer columnVector objects from base batch to outgoing batch.
       System.arraycopy(payloadStruct.fields, 0, value.cols, 0, value.getDataColumnCount());
       if(rowIdProjected) {
-        recordIdColumnVector.fields[0] = vectorizedRowBatchBase.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION];
+        recordIdColumnVector.fields[0] = vectorizedRowBatchBase.cols[OrcRecordUpdater.ORIGINAL_WRITEID];
         recordIdColumnVector.fields[1] = vectorizedRowBatchBase.cols[OrcRecordUpdater.BUCKET];
         recordIdColumnVector.fields[2] = vectorizedRowBatchBase.cols[OrcRecordUpdater.ROW_ID];
       }
@@ -531,24 +531,24 @@ public class VectorizedOrcAcidRowBatchReader
   }
 
   private void findRecordsWithInvalidTransactionIds(ColumnVector[] cols, int size, BitSet selectedBitSet) {
-    if (cols[OrcRecordUpdater.CURRENT_TRANSACTION].isRepeating) {
+    if (cols[OrcRecordUpdater.CURRENT_WRITEID].isRepeating) {
       // When we have repeating values, we can unset the whole bitset at once
       // if the repeating value is not a valid transaction.
       long currentTransactionIdForBatch = ((LongColumnVector)
-          cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector[0];
-      if (!validTxnList.isTxnValid(currentTransactionIdForBatch)) {
+          cols[OrcRecordUpdater.CURRENT_WRITEID]).vector[0];
+      if (!validWriteIdList.isWriteIdValid(currentTransactionIdForBatch)) {
         selectedBitSet.clear(0, size);
       }
       return;
     }
     long[] currentTransactionVector =
-        ((LongColumnVector) cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector;
+        ((LongColumnVector) cols[OrcRecordUpdater.CURRENT_WRITEID]).vector;
     // Loop through the bits that are set to true and mark those rows as false, if their
     // current transactions are not valid.
     for (int setBitIndex = selectedBitSet.nextSetBit(0);
         setBitIndex >= 0;
         setBitIndex = selectedBitSet.nextSetBit(setBitIndex+1)) {
-      if (!validTxnList.isTxnValid(currentTransactionVector[setBitIndex])) {
+      if (!validWriteIdList.isWriteIdValid(currentTransactionVector[setBitIndex])) {
         selectedBitSet.clear(setBitIndex);
       }
    }
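
The hunk above is the row filter at the heart of the rename: every selected row whose current write ID is not committed in the snapshot gets cleared from the batch. Below is a minimal standalone sketch of the same pattern, using java.util.BitSet and a plain long[] column in place of Hive's LongColumnVector; the WriteIdSnapshot interface and the filterRows name are illustrative and not part of this patch.

import java.util.BitSet;

public class WriteIdFilterSketch {
  /** Illustrative stand-in for ValidWriteIdList.isWriteIdValid(long). */
  interface WriteIdSnapshot {
    boolean isWriteIdValid(long writeId);
  }

  /** Clears every selected bit whose row carries a write ID that is not valid in the snapshot. */
  static void filterRows(long[] currentWriteIds, BitSet selected, WriteIdSnapshot snapshot) {
    for (int i = selected.nextSetBit(0); i >= 0; i = selected.nextSetBit(i + 1)) {
      if (!snapshot.isWriteIdValid(currentWriteIds[i])) {
        selected.clear(i);
      }
    }
  }

  public static void main(String[] args) {
    long[] writeIds = {1, 2, 3, 4};
    BitSet selected = new BitSet(writeIds.length);
    selected.set(0, writeIds.length);
    filterRows(writeIds, selected, id -> id <= 2);  // pretend only write IDs <= 2 are committed
    System.out.println(selected);                   // prints {0, 1}
  }
}
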
@@ -630,30 +630,33 @@ public class VectorizedOrcAcidRowBatchReader
     private OrcRawRecordMerger.ReaderKey deleteRecordKey;
     private OrcStruct deleteRecordValue;
     private Boolean isDeleteRecordAvailable = null;
-    private ValidTxnList validTxnList;
+    private ValidWriteIdList validWriteIdList;
 
     SortMergedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, Reader.Options readerOptions)
-      throws IOException {
-        final Path[] deleteDeltas = getDeleteDeltaDirsFromSplit(orcSplit);
-        if (deleteDeltas.length > 0) {
-          int bucket = AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucketId();
-          String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
-          this.validTxnList = (txnString == null) ? new ValidReadTxnList() : new ValidReadTxnList(txnString);
-          OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options().isDeleteReader(true);
-          assert !orcSplit.isOriginal() : "If this now supports Original splits, set up mergeOptions properly";
-          this.deleteRecords = new OrcRawRecordMerger(conf, true, null, false, bucket,
-                                                      validTxnList, readerOptions, deleteDeltas,
-                                                      mergerOptions);
-          this.deleteRecordKey = new OrcRawRecordMerger.ReaderKey();
-          this.deleteRecordValue = this.deleteRecords.createValue();
-          // Initialize the first value in the delete reader.
-          this.isDeleteRecordAvailable = this.deleteRecords.next(deleteRecordKey, deleteRecordValue);
-        } else {
-          this.isDeleteRecordAvailable = false;
-          this.deleteRecordKey = null;
-          this.deleteRecordValue = null;
-          this.deleteRecords = null;
-        }
+            throws IOException {
+      final Path[] deleteDeltas = getDeleteDeltaDirsFromSplit(orcSplit);
+      if (deleteDeltas.length > 0) {
+        int bucket = AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucketId();
+        String txnString = conf.get(ValidWriteIdList.VALID_WRITEIDS_KEY);
+        this.validWriteIdList
+                = (txnString == null) ? new ValidReaderWriteIdList() : new ValidReaderWriteIdList(txnString);
+        LOG.debug("SortMergedDeleteEventRegistry:: Read ValidWriteIdList: " + this.validWriteIdList.toString()
+                + " isFullAcidTable: " + AcidUtils.isFullAcidScan(conf));
+        OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options().isDeleteReader(true);
+        assert !orcSplit.isOriginal() : "If this now supports Original splits, set up mergeOptions properly";
+        this.deleteRecords = new OrcRawRecordMerger(conf, true, null, false, bucket,
+                                                    validWriteIdList, readerOptions, deleteDeltas,
+                                                    mergerOptions);
+        this.deleteRecordKey = new OrcRawRecordMerger.ReaderKey();
+        this.deleteRecordValue = this.deleteRecords.createValue();
+        // Initialize the first value in the delete reader.
+        this.isDeleteRecordAvailable = this.deleteRecords.next(deleteRecordKey, deleteRecordValue);
+      } else {
+        this.isDeleteRecordAvailable = false;
+        this.deleteRecordKey = null;
+        this.deleteRecordValue = null;
+        this.deleteRecords = null;
+      }
     }
 
     @Override
@@ -671,8 +674,8 @@ public class VectorizedOrcAcidRowBatchReader
       }
 
       long[] originalTransaction =
-          cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? null
-              : ((LongColumnVector) cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector;
+          cols[OrcRecordUpdater.ORIGINAL_WRITEID].isRepeating ? null
+              : ((LongColumnVector) cols[OrcRecordUpdater.ORIGINAL_WRITEID]).vector;
       long[] bucket =
           cols[OrcRecordUpdater.BUCKET].isRepeating ? null
               : ((LongColumnVector) cols[OrcRecordUpdater.BUCKET]).vector;
@@ -682,7 +685,7 @@ public class VectorizedOrcAcidRowBatchReader
 
       // The following repeatedX values will be set, if any of the columns are repeating.
       long repeatedOriginalTransaction = (originalTransaction != null) ? -1
-          : ((LongColumnVector) cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[0];
+          : ((LongColumnVector) cols[OrcRecordUpdater.ORIGINAL_WRITEID]).vector[0];
       long repeatedBucket = (bucket != null) ? -1
           : ((LongColumnVector) cols[OrcRecordUpdater.BUCKET]).vector[0];
       long repeatedRowId = (rowId != null) ? -1
@@ -828,12 +831,12 @@ public class VectorizedOrcAcidRowBatchReader
       private final RecordReader recordReader;
       private int indexPtrInBatch;
       private final int bucketForSplit; // The bucket value should be the same for all the records.
-      private final ValidTxnList validTxnList;
+      private final ValidWriteIdList validWriteIdList;
       private boolean isBucketPropertyRepeating;
       private final boolean isBucketedTable;
 
       DeleteReaderValue(Reader deleteDeltaReader, Reader.Options readerOptions, int bucket,
-          ValidTxnList validTxnList, boolean isBucketedTable) throws IOException {
+          ValidWriteIdList validWriteIdList, boolean isBucketedTable) throws IOException {
         this.recordReader  = deleteDeltaReader.rowsOptions(readerOptions);
         this.bucketForSplit = bucket;
         this.batch = deleteDeltaReader.getSchema().createRowBatch();
@@ -841,7 +844,7 @@ public class VectorizedOrcAcidRowBatchReader
           this.batch = null; // Oh! the first batch itself was null. Close the reader.
         }
         this.indexPtrInBatch = 0;
-        this.validTxnList = validTxnList;
+        this.validWriteIdList = validWriteIdList;
         this.isBucketedTable = isBucketedTable;
         checkBucketId();//check 1st batch
       }
@@ -866,7 +869,7 @@ public class VectorizedOrcAcidRowBatchReader
             checkBucketId(deleteRecordKey.bucketProperty);
           }
           ++indexPtrInBatch;
-          if (validTxnList.isTxnValid(currentTransaction)) {
+          if (validWriteIdList.isWriteIdValid(currentTransaction)) {
             isValidNext = true;
           }
         }
@@ -878,17 +881,17 @@ public class VectorizedOrcAcidRowBatchReader
       }
       private long setCurrentDeleteKey(DeleteRecordKey deleteRecordKey) {
         int originalTransactionIndex =
-          batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? 0 : indexPtrInBatch;
-        long originalTransaction =
-          ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[originalTransactionIndex];
+          batch.cols[OrcRecordUpdater.ORIGINAL_WRITEID].isRepeating ? 0 : indexPtrInBatch;
+        long originalTransaction
+                = ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_WRITEID]).vector[originalTransactionIndex];
         int bucketPropertyIndex =
           batch.cols[OrcRecordUpdater.BUCKET].isRepeating ? 0 : indexPtrInBatch;
         int bucketProperty = (int)((LongColumnVector)batch.cols[OrcRecordUpdater.BUCKET]).vector[bucketPropertyIndex];
         long rowId = ((LongColumnVector) batch.cols[OrcRecordUpdater.ROW_ID]).vector[indexPtrInBatch];
-        int currentTransactionIndex =
-          batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION].isRepeating ? 0 : indexPtrInBatch;
-        long currentTransaction =
-          ((LongColumnVector) batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector[currentTransactionIndex];
+        int currentTransactionIndex
+                = batch.cols[OrcRecordUpdater.CURRENT_WRITEID].isRepeating ? 0 : indexPtrInBatch;
+        long currentTransaction
+                = ((LongColumnVector) batch.cols[OrcRecordUpdater.CURRENT_WRITEID]).vector[currentTransactionIndex];
         deleteRecordKey.set(originalTransaction, bucketProperty, rowId);
         return currentTransaction;
       }
@@ -976,14 +979,17 @@ public class VectorizedOrcAcidRowBatchReader
     private TreeMap<DeleteRecordKey, DeleteReaderValue> sortMerger;
     private long rowIds[];
     private CompressedOtid compressedOtids[];
-    private ValidTxnList validTxnList;
+    private ValidWriteIdList validWriteIdList;
     private Boolean isEmpty = null;
 
     ColumnizedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit,
         Reader.Options readerOptions) throws IOException, DeleteEventsOverflowMemoryException {
       int bucket = AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucketId();
-      String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
-      this.validTxnList = (txnString == null) ? new ValidReadTxnList() : new ValidReadTxnList(txnString);
+      String txnString = conf.get(ValidWriteIdList.VALID_WRITEIDS_KEY);
+      this.validWriteIdList
+              = (txnString == null) ? new ValidReaderWriteIdList() : new ValidReaderWriteIdList(txnString);
+      LOG.debug("ColumnizedDeleteEventRegistry:: Read ValidWriteIdList: " + this.validWriteIdList.toString()
+              + " isFullAcidTable: " + AcidUtils.isFullAcidScan(conf));
       this.sortMerger = new TreeMap<DeleteRecordKey, DeleteReaderValue>();
       this.rowIds = null;
       this.compressedOtids = null;
@@ -1025,7 +1031,7 @@ public class VectorizedOrcAcidRowBatchReader
                 throw new DeleteEventsOverflowMemoryException();
               }
               DeleteReaderValue deleteReaderValue = new DeleteReaderValue(deleteDeltaReader,
-                  readerOptions, bucket, validTxnList, isBucketedTable);
+                  readerOptions, bucket, validWriteIdList, isBucketedTable);
               DeleteRecordKey deleteRecordKey = new DeleteRecordKey();
               if (deleteReaderValue.next(deleteRecordKey)) {
                 sortMerger.put(deleteRecordKey, deleteReaderValue);
@@ -1165,10 +1171,10 @@ public class VectorizedOrcAcidRowBatchReader
       // check if it is deleted or not.
 
       long[] originalTransactionVector =
-          cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? null
-              : ((LongColumnVector) cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector;
+          cols[OrcRecordUpdater.ORIGINAL_WRITEID].isRepeating ? null
+              : ((LongColumnVector) cols[OrcRecordUpdater.ORIGINAL_WRITEID]).vector;
       long repeatedOriginalTransaction = (originalTransactionVector != null) ? -1
-          : ((LongColumnVector) cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[0];
+          : ((LongColumnVector) cols[OrcRecordUpdater.ORIGINAL_WRITEID]).vector[0];
 
       long[] bucketProperties =
         cols[OrcRecordUpdater.BUCKET].isRepeating ? null
index 5bbfe95..683aa95 100644 (file)
@@ -32,6 +32,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.LockComponentBuilder;
 import org.apache.hadoop.hive.metastore.LockRequestBuilder;
@@ -50,7 +51,9 @@ import org.apache.thrift.TException;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -84,18 +87,26 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
    * transaction id.  Thus 1 is the first transaction id.
    */
   private volatile long txnId = 0;
+
+  /**
+   * The local cache of table write IDs allocated/created by the current transaction
+   */
+  private Map<String, Long> tableWriteIds = new HashMap<>();
+
   /**
    * assigns a unique monotonically increasing ID to each statement
    * which is part of an open transaction.  This is used by the storage
    * layer (see {@link org.apache.hadoop.hive.ql.io.AcidUtils#deltaSubdir(long, long, int)})
    * to keep apart multiple writes of the same data within the same transaction.
-   * Also see {@link org.apache.hadoop.hive.ql.io.AcidOutputFormat.Options}
+   * Also see {@link org.apache.hadoop.hive.ql.io.AcidOutputFormat.Options}.
    */
-  private int writeId = -1;
+  private int stmtId = -1;
+
   /**
-   * counts number of statements in the current transaction
+   * counts number of statements in the current transaction.
    */
   private int numStatements = 0;
+
   /**
    * if {@code true}, the current transaction was started via START TRANSACTION, which means it cannot
    * include any operations which cannot be rolled back (drop partition; write to non-acid table).
@@ -125,9 +136,10 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
    *
    * As a side note: what should the lock manager do with locks for non-transactional resources?
    * Should it release them at the end of the stmt or txn?
-   * Some interesting thoughts: http://mysqlmusings.blogspot.com/2009/02/mixing-engines-in-transactions.html
+   * Some interesting thoughts: http://mysqlmusings.blogspot.com/2009/02/mixing-engines-in-transactions.html.
    */
   private boolean isExplicitTransaction = false;
+
   /**
    * To ensure transactions don't nest.
    */
@@ -141,6 +153,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
   private ScheduledFuture<?> heartbeatTask = null;
   private Runnable shutdownRunner = null;
   private static final int SHUTDOWN_HOOK_PRIORITY = 0;
+
   /**
    * We do this on every call to make sure TM uses the same MS connection as is used by the caller (Driver,
    * SemanticAnalyzer, etc).  {@code Hive} instances are cached using ThreadLocal and
@@ -208,8 +221,9 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
     }
     try {
       txnId = getMS().openTxn(user);
-      writeId = 0;
+      stmtId = 0;
       numStatements = 0;
+      tableWriteIds.clear();
       isExplicitTransaction = false;
       startTransactionCount = 0;
       LOG.debug("Opened " + JavaUtils.txnIdToString(txnId));
@@ -241,7 +255,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
     catch(LockException e) {
       if(e.getCause() instanceof TxnAbortedException) {
         txnId = 0;
-        writeId = -1;
+        stmtId = -1;
+        tableWriteIds.clear();
       }
       throw e;
     }
@@ -597,8 +612,9 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
           e);
     } finally {
       txnId = 0;
-      writeId = -1;
+      stmtId = -1;
       numStatements = 0;
+      tableWriteIds.clear();
     }
   }
 
@@ -622,8 +638,9 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
           e);
     } finally {
       txnId = 0;
-      writeId = -1;
+      stmtId = -1;
       numStatements = 0;
+      tableWriteIds.clear();
     }
   }
 
@@ -743,12 +760,24 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
 
   @Override
   public ValidTxnList getValidTxns() throws LockException {
+    assert isTxnOpen();
     init();
     try {
       return getMS().getValidTxns(txnId);
     } catch (TException e) {
-      throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(),
-          e);
+      throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
+    }
+  }
+
+  @Override
+  public ValidTxnWriteIdList getValidWriteIds(List<String> tableList,
+                                              String validTxnList) throws LockException {
+    assert isTxnOpen();
+    assert validTxnList != null && !validTxnList.isEmpty();
+    try {
+      return getMS().getValidWriteIds(txnId, tableList, validTxnList);
+    } catch (TException e) {
+      throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
     }
   }
 
@@ -886,9 +915,25 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
     return txnId;
   }
   @Override
-  public int getWriteIdAndIncrement() {
+  public int getStmtIdAndIncrement() {
     assert isTxnOpen();
-    return writeId++;
+    return stmtId++;
+  }
+
+  @Override
+  public long getTableWriteId(String dbName, String tableName) throws LockException {
+    assert isTxnOpen();
+    String fullTableName = AcidUtils.getFullTableName(dbName, tableName);
+    if (tableWriteIds.containsKey(fullTableName)) {
+      return tableWriteIds.get(fullTableName);
+    }
+    try {
+      long writeId = getMS().allocateTableWriteId(txnId, dbName, tableName);
+      tableWriteIds.put(fullTableName, writeId);
+      return writeId;
+    } catch (TException e) {
+      throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
+    }
   }
 
   private static long getHeartbeatInterval(Configuration conf) throws LockException {
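
The getTableWriteId implementation above, together with the tableWriteIds map added earlier in this file, memoizes the metastore allocation so that every write to the same table inside one transaction reuses a single write ID, and the cache is wiped on open/commit/rollback. A minimal sketch of that caching pattern follows; the allocator is a hypothetical stand-in for getMS().allocateTableWriteId(txnId, dbName, tableName).

import java.util.HashMap;
import java.util.Map;
import java.util.function.ToLongFunction;

public class TableWriteIdCacheSketch {
  private final Map<String, Long> tableWriteIds = new HashMap<>();
  private final ToLongFunction<String> allocator;   // stand-in for the metastore allocation call

  TableWriteIdCacheSketch(ToLongFunction<String> allocator) {
    this.allocator = allocator;
  }

  /** Returns the write ID cached for the table, allocating one on first use within the txn. */
  long getTableWriteId(String dbName, String tableName) {
    String fullTableName = dbName + "." + tableName;   // same shape as AcidUtils.getFullTableName
    return tableWriteIds.computeIfAbsent(fullTableName, allocator::applyAsLong);
  }

  /** Called when the transaction opens, commits or rolls back, mirroring tableWriteIds.clear(). */
  void clear() {
    tableWriteIds.clear();
  }
}
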
index fca6408..7413074 100644 (file)
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.lockmgr;
 
+import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.ValidTxnList;
@@ -62,12 +63,15 @@ class DummyTxnManager extends HiveTxnManagerImpl {
   public long getCurrentTxnId() {
     return 0L;
   }
-
   @Override
-  public int getWriteIdAndIncrement() {
+  public int getStmtIdAndIncrement() {
     return 0;
   }
   @Override
+  public long getTableWriteId(String dbName, String tableName) throws LockException {
+    return 0L;
+  }
+  @Override
   public HiveLockManager getLockManager() throws LockException {
     if (lockMgr == null) {
       boolean supportConcurrency =
@@ -220,6 +224,12 @@ class DummyTxnManager extends HiveTxnManagerImpl {
   }
 
   @Override
+  public ValidTxnWriteIdList getValidWriteIds(List<String> tableList,
+                                              String validTxnList) throws LockException {
+    return new ValidTxnWriteIdList(getCurrentTxnId());
+  }
+
+  @Override
   public String getTxnManagerName() {
     return DummyTxnManager.class.getName();
   }
index 4f9f0c2..0db2a2c 100644 (file)
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.lockmgr;
 
 import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.Driver.LockedDriverState;
 import org.apache.hadoop.hive.ql.QueryPlan;
@@ -122,8 +123,7 @@ public interface HiveTxnManager {
 
   /**
    * Get the transactions that are currently valid.  The resulting
-   * {@link ValidTxnList} object is a thrift object and can
-   * be  passed to  the processing
+   * {@link ValidTxnList} object can be passed as a string to the processing
    * tasks for use in reading the data.  This call should be made once up
    * front by the planner and should never be called on the backend,
    * as this will violate the isolation level semantics.
@@ -133,6 +133,18 @@ public interface HiveTxnManager {
   ValidTxnList getValidTxns() throws LockException;
 
   /**
+   * Get the table write Ids that are valid for the current transaction.  The resulting
+   * {@link ValidTxnWriteIdList} object can be passed as a string to the processing
+   * tasks for use in reading the data.  This call returns the same results as long as the same
+   * validTxnList string is passed.
+   * @param tableList list of tables (<db_name>.<table_name>) read/written by the current transaction.
+   * @param validTxnList snapshot of valid txns for the current txn
+   * @return list of valid table write Ids.
+   * @throws LockException
+   */
+  ValidTxnWriteIdList getValidWriteIds(List<String> tableList, String validTxnList) throws LockException;
+
+  /**
    * Get the name for currently installed transaction manager.
    * @return transaction manager name
    */
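
The getValidWriteIds contract documented in this hunk ties the write-ID snapshot to the transaction snapshot: the planner fetches the valid-txn list once and hands its string form to this call for every table it touches. A hedged usage sketch against the interface is shown below; the table names are illustrative, and writeToString() is assumed as the usual way to serialize a ValidTxnList.

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.lockmgr.LockException;

public class WriteIdSnapshotSketch {
  /** Sketch only: assumes txnMgr already has an open transaction. */
  static ValidTxnWriteIdList snapshotFor(HiveTxnManager txnMgr) throws LockException {
    ValidTxnList txns = txnMgr.getValidTxns();                  // once, at plan time
    List<String> tables = Arrays.asList("db1.t1", "db1.t2");    // <db_name>.<table_name>
    // Same valid-txn string in => same write-ID snapshot out, per the contract above.
    return txnMgr.getValidWriteIds(tables, txns.writeToString());
  }
}
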
@@ -202,7 +214,7 @@ public interface HiveTxnManager {
   boolean useNewShowLocksFormat();
 
   /**
-   * Indicate whether this transaction manager supports ACID operations
+   * Indicate whether this transaction manager supports ACID operations.
    * @return true if this transaction manager does ACID
    */
   boolean supportsAcid();
@@ -217,14 +229,19 @@ public interface HiveTxnManager {
   
   boolean isTxnOpen();
   /**
-   * if {@code isTxnOpen()}, returns the currently active transaction ID
+   * if {@code isTxnOpen()}, returns the currently active transaction ID.
    */
   long getCurrentTxnId();
 
   /**
+   * if {@code isTxnOpen()}, returns the table write ID associated with the given table in the currently active transaction.
+   */
+  long getTableWriteId(String dbName, String tableName) throws LockException;
+
+  /**
    * Should be thought of more as a unique write operation ID in a given txn (at QueryPlan level).
    * Each statement writing data within a multi-statement txn should have a unique WriteId.
    * Even a single statement (e.g. Merge, multi-insert) may generate several writes.
    */
-  int getWriteIdAndIncrement();
+  int getStmtIdAndIncrement();
 }
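
Between them, getTableWriteId and getStmtIdAndIncrement give a writer the two numbers that now name its output directory: one write ID per (transaction, table) and one statement ID per write within the transaction. A hedged sketch of how a caller could combine the two; the delta_<writeId>_<writeId>_<stmtId> formatting below is only an illustration, since the real name comes from AcidUtils.deltaSubdir(writeId, writeId, stmtId).

import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.lockmgr.LockException;

public class DeltaNamingSketch {
  /** Illustrative only: AcidUtils.deltaSubdir is the authoritative formatter in Hive. */
  static String deltaDirFor(HiveTxnManager txnMgr, String db, String table) throws LockException {
    long writeId = txnMgr.getTableWriteId(db, table);  // stable for this table within the open txn
    int stmtId = txnMgr.getStmtIdAndIncrement();       // unique per write operation in the txn
    return String.format("delta_%07d_%07d_%04d", writeId, writeId, stmtId);
  }
}
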
index 9c3b54f..8b0af3e 100644 (file)
@@ -1497,17 +1497,19 @@ public class Hive {
    * @param isSrcLocal
    * @param isAcid
    * @param hasFollowingStatsTask
+   * @param writeId write ID allocated for the current load operation
+   * @param stmtId statement ID of the current load statement
    * @return
    * @throws HiveException
    */
   public void loadPartition(Path loadPath, String tableName,
       Map<String, String> partSpec, LoadFileType loadFileType, boolean inheritTableSpecs,
       boolean isSkewedStoreAsSubdir,  boolean isSrcLocal, boolean isAcid,
-      boolean hasFollowingStatsTask, Long txnId, int stmtId)
+      boolean hasFollowingStatsTask, Long writeId, int stmtId)
           throws HiveException {
     Table tbl = getTable(tableName);
     loadPartition(loadPath, tbl, partSpec, loadFileType, inheritTableSpecs,
-        isSkewedStoreAsSubdir, isSrcLocal, isAcid, hasFollowingStatsTask, txnId, stmtId);
+        isSkewedStoreAsSubdir, isSrcLocal, isAcid, hasFollowingStatsTask, writeId, stmtId);
   }
 
   /**
@@ -1533,11 +1535,13 @@ public class Hive {
    *          true if this is an ACID operation Insert/Update/Delete operation
    * @param hasFollowingStatsTask
    *          true if there is a following task which updates the stats, so, this method need not update.
+   * @param writeId write ID allocated for the current load operation
+   * @param stmtId statement ID of the current load statement
    * @return Partition object being loaded with data
    */
   public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> partSpec,
       LoadFileType loadFileType, boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
-      boolean isSrcLocal, boolean isAcidIUDoperation, boolean hasFollowingStatsTask, Long txnId, int stmtId)
+      boolean isSrcLocal, boolean isAcidIUDoperation, boolean hasFollowingStatsTask, Long writeId, int stmtId)
           throws HiveException {
     Path tblDataLocationPath =  tbl.getDataLocation();
     boolean isMmTableWrite = AcidUtils.isInsertOnlyTable(tbl.getParameters());
@@ -1596,7 +1600,7 @@ public class Hive {
         }
         assert !isAcidIUDoperation;
         if (areEventsForDmlNeeded(tbl, oldPart)) {
-          newFiles = listFilesCreatedByQuery(loadPath, txnId, stmtId);
+          newFiles = listFilesCreatedByQuery(loadPath, writeId, stmtId);
         }
         if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
           Utilities.FILE_OP_LOGGER.trace("maybe deleting stuff from " + oldPartPath
@@ -1608,12 +1612,12 @@ public class Hive {
         Path destPath = newPartPath;
         if (isMmTableWrite) {
           // We will load into MM directory, and delete from the parent if needed.
-          destPath = new Path(destPath, AcidUtils.deltaSubdir(txnId, txnId, stmtId));
+          destPath = new Path(destPath, AcidUtils.deltaSubdir(writeId, writeId, stmtId));
           filter = (loadFileType == LoadFileType.REPLACE_ALL)
-            ? new JavaUtils.IdPathFilter(txnId, stmtId, false, true) : filter;
+            ? new JavaUtils.IdPathFilter(writeId, stmtId, false, true) : filter;
         }
         else if(!isAcidIUDoperation && isFullAcidTable) {
-          destPath = fixFullAcidPathForLoadData(loadFileType, destPath, txnId, stmtId, tbl);
+          destPath = fixFullAcidPathForLoadData(loadFileType, destPath, writeId, stmtId, tbl);
         }
         if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
           Utilities.FILE_OP_LOGGER.trace("moving " + loadPath + " to " + destPath);
@@ -1723,7 +1727,7 @@ public class Hive {
    * delta_x_x directory - same as any other Acid write.  This method modifies the destPath to add
    * this path component.
    * @param txnId - id of current transaction (in which this operation is running)
-   * @param stmtId - see {@link DbTxnManager#getWriteIdAndIncrement()}
+   * @param stmtId - see {@link DbTxnManager#getStmtIdAndIncrement()}
    * @return appropriately modified path
    */
   private Path fixFullAcidPathForLoadData(LoadFileType loadFileType, Path destPath, long txnId, int stmtId, Table tbl) throws HiveException {
@@ -1987,13 +1991,13 @@ private void constructOneLBLocationMap(FileStatus fSta,
    * @param loadFileType
    * @param numDP number of dynamic partitions
    * @param isAcid true if this is an ACID operation
-   * @param txnId txnId, can be 0 unless isAcid == true
+   * @param writeId writeId, can be 0 unless isAcid == true
    * @return partition map details (PartitionSpec and Partition)
    * @throws HiveException
    */
   public Map<Map<String, String>, Partition> loadDynamicPartitions(final Path loadPath,
       final String tableName, final Map<String, String> partSpec, final LoadFileType loadFileType,
-      final int numDP, final int numLB, final boolean isAcid, final long txnId, final int stmtId,
+      final int numDP, final int numLB, final boolean isAcid, final long writeId, final int stmtId,
       final boolean hasFollowingStatsTask, final AcidUtils.Operation operation, boolean isInsertOverwrite)
       throws HiveException {
 
@@ -2009,7 +2013,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
 
     // Get all valid partition paths and existing partitions for them (if any)
     final Table tbl = getTable(tableName);
-    final Set<Path> validPartitions = getValidPartitionsInPath(numDP, numLB, loadPath, txnId, stmtId,
+    final Set<Path> validPartitions = getValidPartitionsInPath(numDP, numLB, loadPath, writeId, stmtId,
         AcidUtils.isInsertOnlyTable(tbl.getParameters()), isInsertOverwrite);
 
     final int partsToLoad = validPartitions.size();
@@ -2044,7 +2048,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
               // load the partition
               Partition newPartition = loadPartition(partPath, tbl, fullPartSpec,
                   loadFileType, true, numLB > 0,
-                  false, isAcid, hasFollowingStatsTask, txnId, stmtId);
+                  false, isAcid, hasFollowingStatsTask, writeId, stmtId);
               partitionsMap.put(fullPartSpec, newPartition);
 
               if (inPlaceEligible) {
@@ -2103,8 +2107,9 @@ private void constructOneLBLocationMap(FileStatus fSta,
         for (Partition p : partitionsMap.values()) {
           partNames.add(p.getName());
         }
-        getMSC().addDynamicPartitions(txnId, tbl.getDbName(), tbl.getTableName(),
-          partNames, AcidUtils.toDataOperationType(operation));
+        getMSC().addDynamicPartitions(parentSession.getTxnMgr().getCurrentTxnId(), writeId,
+                tbl.getDbName(), tbl.getTableName(), partNames,
+                AcidUtils.toDataOperationType(operation));
       }
       LOG.info("Loaded " + partitionsMap.size() + " partitions");
       return partitionsMap;
@@ -2134,10 +2139,12 @@ private void constructOneLBLocationMap(FileStatus fSta,
    * @param hasFollowingStatsTask
    *          if there is any following stats task
    * @param isAcidIUDoperation true if this is an ACID based Insert [overwrite]/update/delete
+   * @param writeId write ID allocated for the current load operation
+   * @param stmtId statement ID of the current load statement
    */
   public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType, boolean isSrcLocal,
       boolean isSkewedStoreAsSubdir, boolean isAcidIUDoperation, boolean hasFollowingStatsTask,
-      Long txnId, int stmtId) throws HiveException {
+      Long writeId, int stmtId) throws HiveException {
     List<Path> newFiles = null;
     Table tbl = getTable(tableName);
     assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName();
@@ -2150,7 +2157,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
     // Note: this assumes both paths are qualified; which they are, currently.
     if (isMmTable && loadPath.equals(tbl.getPath())) {
       Utilities.FILE_OP_LOGGER.debug("not moving " + loadPath + " to " + tbl.getPath());
-      newFiles = listFilesCreatedByQuery(loadPath, txnId, stmtId);
+      newFiles = listFilesCreatedByQuery(loadPath, writeId, stmtId);
     } else {
       // Either a non-MM query, or a load into MM table from an external source.
       Path tblPath = tbl.getPath();
@@ -2159,12 +2166,12 @@ private void constructOneLBLocationMap(FileStatus fSta,
       if (isMmTable) {
         assert !isAcidIUDoperation;
         // We will load into MM directory, and delete from the parent if needed.
-        destPath = new Path(destPath, AcidUtils.deltaSubdir(txnId, txnId, stmtId));
+        destPath = new Path(destPath, AcidUtils.deltaSubdir(writeId, writeId, stmtId));
         filter = loadFileType == LoadFileType.REPLACE_ALL
-            ? new JavaUtils.IdPathFilter(txnId, stmtId, false, true) : filter;
+            ? new JavaUtils.IdPathFilter(writeId, stmtId, false, true) : filter;
       }
       else if(!isAcidIUDoperation && isFullAcidTable) {
-        destPath = fixFullAcidPathForLoadData(loadFileType, destPath, txnId, stmtId, tbl);
+        destPath = fixFullAcidPathForLoadData(loadFileType, destPath, writeId, stmtId, tbl);
       }
       Utilities.FILE_OP_LOGGER.debug("moving " + loadPath + " to " + tblPath
           + " (replace = " + loadFileType + ")");
index 3023144..4f396a0 100644 (file)
@@ -1270,8 +1270,8 @@ public final class GenMapRedUtils {
     FileSinkDesc fsInputDesc = fsInput.getConf();
     if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
       Utilities.FILE_OP_LOGGER.trace("Creating merge work from " + System.identityHashCode(fsInput)
-        + " with write ID " + (fsInputDesc.isMmTable() ? fsInputDesc.getTransactionId() : null)
-        + " into " + finalName);
+          + " with write ID " + (fsInputDesc.isMmTable() ? fsInputDesc.getTableWriteId() : null)
+          + " into " + finalName);
     }
 
     boolean isBlockMerge = (conf.getBoolVar(ConfVars.HIVEMERGERCFILEBLOCKLEVEL) &&
@@ -1280,7 +1280,7 @@ public final class GenMapRedUtils {
             fsInputDesc.getTableInfo().getInputFileFormatClass().equals(OrcInputFormat.class));
 
     RowSchema inputRS = fsInput.getSchema();
-    Long srcMmWriteId = fsInputDesc.isMmTable() ? fsInputDesc.getTransactionId() : null;
+    Long srcMmWriteId = fsInputDesc.isMmTable() ? fsInputDesc.getTableWriteId() : null;
     FileSinkDesc fsOutputDesc = null;
     TableScanOperator tsMerge = null;
     if (!isBlockMerge) {
@@ -1665,7 +1665,7 @@ public final class GenMapRedUtils {
       fmd = new OrcFileMergeDesc();
     }
     fmd.setIsMmTable(fsInputDesc.isMmTable());
-    fmd.setTxnId(fsInputDesc.getTransactionId());
+    fmd.setWriteId(fsInputDesc.getTableWriteId());
     int stmtId = fsInputDesc.getStatementId();
     fmd.setStmtId(stmtId == -1 ? 0 : stmtId);
     fmd.setDpCtx(fsInputDesc.getDynPartCtx());
index 718faff..e926b63 100644 (file)
@@ -1563,7 +1563,7 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
         Path queryTmpdir = ctx.getExternalTmpPath(newTblPartLoc);
         truncateTblDesc.setOutputDir(queryTmpdir);
         LoadTableDesc ltd = new LoadTableDesc(queryTmpdir, tblDesc,
-            partSpec == null ? new HashMap<>() : partSpec, null);
+            partSpec == null ? new HashMap<>() : partSpec);
         ltd.setLbCtx(lbCtx);
         @SuppressWarnings("unchecked")
         Task<MoveWork> moveTsk =
@@ -2017,7 +2017,7 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
       mergeDesc.setOutputDir(queryTmpdir);
       // No need to handle MM tables - unsupported path.
       LoadTableDesc ltd = new LoadTableDesc(queryTmpdir, tblDesc,
-          partSpec == null ? new HashMap<>() : partSpec, null);
+          partSpec == null ? new HashMap<>() : partSpec);
       ltd.setLbCtx(lbCtx);
       Task<MoveWork> moveTsk =
           TaskFactory.get(new MoveWork(null, null, ltd, null, false), conf);
index 5520bc2..67d05e6 100644 (file)
@@ -318,31 +318,39 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
     Table table = tableIfExists(tblDesc, x.getHive());
     boolean tableExists = false;
 
-    if (table != null){
+    if (table != null) {
       checkTable(table, tblDesc,replicationSpec, x.getConf());
       x.getLOG().debug("table " + tblDesc.getTableName() + " exists: metadata checked");
       tableExists = true;
     }
 
-    Long txnId = SessionState.get().getTxnMgr().getCurrentTxnId();
+    Long writeId = 0L; // Initialize with 0 for non-ACID and non-MM tables.
+    if (((table != null) && AcidUtils.isTransactionalTable(table))
+            || AcidUtils.isTablePropertyTransactional(tblDesc.getTblProps())) {
+      // Explain plan doesn't open a txn and hence no need to allocate write id.
+      if (x.getCtx().getExplainConfig() == null) {
+        writeId = SessionState.get().getTxnMgr().getTableWriteId(tblDesc.getDatabaseName(), tblDesc.getTableName());
+      }
+    }
     int stmtId = 0;
+
     // TODO [MM gap?]: bad merge; tblDesc is no longer CreateTableDesc, but ImportTableDesc.
     //                 We need to verify the tests to see if this works correctly.
     /*
-    if (isAcid(txnId)) {
-      tblDesc.setInitialMmWriteId(txnId);
+    if (isAcid(writeId)) {
+      tblDesc.setInitialMmWriteId(writeId);
     }
     */
     if (!replicationSpec.isInReplicationScope()) {
       createRegularImportTasks(
           tblDesc, partitionDescs,
           isPartSpecSet, replicationSpec, table,
-          fromURI, fs, wh, x, txnId, stmtId, isSourceMm);
+          fromURI, fs, wh, x, writeId, stmtId, isSourceMm);
     } else {
       createReplImportTasks(
           tblDesc, partitionDescs,
           replicationSpec, waitOnPrecursor, table,
-          fromURI, fs, wh, x, txnId, stmtId, isSourceMm, updatedMetadata);
+          fromURI, fs, wh, x, writeId, stmtId, isSourceMm, updatedMetadata);
     }
     return tableExists;
   }
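
The guard added at the top of this hunk allocates a table write ID only when the import target is transactional and the statement is actually executing (an EXPLAIN opens no transaction, so no ID is allocated and the value stays 0). A minimal sketch of that decision follows, with a hypothetical isExplain flag standing in for x.getCtx().getExplainConfig() != null.

import java.util.function.LongSupplier;

public class ImportWriteIdSketch {
  /**
   * Returns the write ID to stamp on imported data: 0 for non-transactional targets or for
   * EXPLAIN, otherwise whatever the txn manager allocates for the table.
   */
  static long writeIdForImport(boolean targetIsTransactional, boolean isExplain,
                               LongSupplier allocateTableWriteId) {
    if (!targetIsTransactional || isExplain) {
      return 0L;   // matches the isAcid(writeId) helper below: 0 means "not an ACID write"
    }
    return allocateTableWriteId.getAsLong();   // e.g. txnMgr.getTableWriteId(db, table)
  }
}
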
@@ -377,13 +385,13 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
 
   private static Task<?> loadTable(URI fromURI, Table table, boolean replace, Path tgtPath,
       ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x,
-      Long txnId, int stmtId, boolean isSourceMm) {
+      Long writeId, int stmtId, boolean isSourceMm) {
     Path dataPath = new Path(fromURI.toString(), EximUtil.DATA_PATH_NAME);
     Path destPath = null, loadPath = null;
     LoadFileType lft;
     if (AcidUtils.isInsertOnlyTable(table)) {
-      String mmSubdir = replace ? AcidUtils.baseDir(txnId)
-          : AcidUtils.deltaSubdir(txnId, txnId, stmtId);
+      String mmSubdir = replace ? AcidUtils.baseDir(writeId)
+          : AcidUtils.deltaSubdir(writeId, writeId, stmtId);
       destPath = new Path(tgtPath, mmSubdir);
       loadPath = tgtPath;
       lft = LoadFileType.KEEP_EXISTING;
@@ -395,13 +403,13 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
 
     if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
       Utilities.FILE_OP_LOGGER.trace("adding import work for table with source location: " +
-        dataPath + "; table: " + tgtPath + "; copy destination " + destPath + "; mm " + txnId +
+        dataPath + "; table: " + tgtPath + "; copy destination " + destPath + "; mm " + writeId +
         " (src " + isSourceMm + ") for " + (table == null ? "a new table" : table.getTableName()));
     }
 
     Task<?> copyTask = null;
     if (replicationSpec.isInReplicationScope()) {
-      if (isSourceMm || isAcid(txnId)) {
+      if (isSourceMm || isAcid(writeId)) {
         // Note: this is replication gap, not MM gap... Repl V2 is not ready yet.
         throw new RuntimeException("Replicating MM and ACID tables is not supported");
       }
@@ -413,7 +421,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
     }
 
     LoadTableDesc loadTableWork = new LoadTableDesc(
-        loadPath, Utilities.getTableDesc(table), new TreeMap<>(), lft, txnId);
+        loadPath, Utilities.getTableDesc(table), new TreeMap<>(), lft, writeId);
     loadTableWork.setStmtId(stmtId);
     MoveWork mv = new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork, null, false);
     Task<?> loadTableTask = TaskFactory.get(mv, x.getConf());
@@ -423,11 +431,11 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
   }
 
   /**
-   * todo: this is odd: transactions are opened for all statements.  what is this supposed to check?
+   * todo: this is odd: a write id is allocated for all write operations on ACID tables.  what is this supposed to check?
    */
   @Deprecated
-  private static boolean isAcid(Long txnId) {
-    return (txnId != null) && (txnId != 0);
+  private static boolean isAcid(Long writeId) {
+    return (writeId != null) && (writeId != 0);
   }
 
   private static Task<?> createTableTask(ImportTableDesc tableDesc, EximUtil.SemanticAnalyzerWrapperContext x){
@@ -467,7 +475,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
 
  private static Task<?> addSinglePartition(URI fromURI, FileSystem fs, ImportTableDesc tblDesc,
       Table table, Warehouse wh, AddPartitionDesc addPartitionDesc, ReplicationSpec replicationSpec,
-      EximUtil.SemanticAnalyzerWrapperContext x, Long txnId, int stmtId, boolean isSourceMm,
+      EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId, boolean isSourceMm,
       Task<?> commitTask)
       throws MetaException, IOException, HiveException {
     AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0);
@@ -487,18 +495,18 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
           + " with source location: " + srcLocation);
       Path tgtLocation = new Path(partSpec.getLocation());
       Path destPath = !AcidUtils.isInsertOnlyTable(table.getParameters()) ? x.getCtx().getExternalTmpPath(tgtLocation)
-          : new Path(tgtLocation, AcidUtils.deltaSubdir(txnId, txnId, stmtId));
+          : new Path(tgtLocation, AcidUtils.deltaSubdir(writeId, writeId, stmtId));
       Path moveTaskSrc =  !AcidUtils.isInsertOnlyTable(table.getParameters()) ? destPath : tgtLocation;
       if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
         Utilities.FILE_OP_LOGGER.trace("adding import work for partition with source location: "
           + srcLocation + "; target: " + tgtLocation + "; copy dest " + destPath + "; mm "
-          + txnId + " (src " + isSourceMm + ") for " + partSpecToString(partSpec.getPartSpec()));
+          + writeId + " (src " + isSourceMm + ") for " + partSpecToString(partSpec.getPartSpec()));
       }
 
 
       Task<?> copyTask = null;
       if (replicationSpec.isInReplicationScope()) {
-        if (isSourceMm || isAcid(txnId)) {
+        if (isSourceMm || isAcid(writeId)) {
           // Note: this is replication gap, not MM gap... Repl V2 is not ready yet.
           throw new RuntimeException("Replicating MM and ACID tables is not supported");
         }
@@ -515,7 +523,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
       LoadTableDesc loadTableWork = new LoadTableDesc(moveTaskSrc, Utilities.getTableDesc(table),
           partSpec.getPartSpec(),
           replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING,
-          txnId);
+              writeId);
       loadTableWork.setStmtId(stmtId);
       loadTableWork.setInheritTableSpecs(false);
       Task<?> loadPartTask = TaskFactory.get(new MoveWork(
@@ -814,14 +822,14 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
   private static void createRegularImportTasks(
       ImportTableDesc tblDesc, List<AddPartitionDesc> partitionDescs, boolean isPartSpecSet,
       ReplicationSpec replicationSpec, Table table, URI fromURI, FileSystem fs, Warehouse wh,
-      EximUtil.SemanticAnalyzerWrapperContext x, Long txnId, int stmtId, boolean isSourceMm)
+      EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId, boolean isSourceMm)
       throws HiveException, URISyntaxException, IOException, MetaException {
 
     if (table != null) {
       if (table.isPartitioned()) {
         x.getLOG().debug("table partitioned");
         Task<?> ict = createImportCommitTask(
-            table.getDbName(), table.getTableName(), txnId, stmtId, x.getConf(),
+            table.getDbName(), table.getTableName(), writeId, stmtId, x.getConf(),
             AcidUtils.isInsertOnlyTable(table.getParameters()));
 
         for (AddPartitionDesc addPartitionDesc : partitionDescs) {
@@ -829,7 +837,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
           org.apache.hadoop.hive.ql.metadata.Partition ptn = null;
           if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) {
             x.getTasks().add(addSinglePartition(
-                fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, txnId, stmtId, isSourceMm, ict));
+                fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm, ict));
           } else {
             throw new SemanticException(
                 ErrorMsg.PARTITION_EXISTS.getMsg(partSpecToString(partSpec)));
@@ -841,7 +849,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
         Path tgtPath = new Path(table.getDataLocation().toString());
         FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), x.getConf());
         checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec, x.getLOG());
-        loadTable(fromURI, table, false, tgtPath, replicationSpec, x, txnId, stmtId, isSourceMm);
+        loadTable(fromURI, table, false, tgtPath, replicationSpec, x, writeId, stmtId, isSourceMm);
       }
       // Set this to read because we can't overwrite any existing partitions
       x.getOutputs().add(new WriteEntity(table, WriteEntity.WriteType.DDL_NO_LOCK));
@@ -858,11 +866,11 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
 
       if (isPartitioned(tblDesc)) {
         Task<?> ict = createImportCommitTask(
-            tblDesc.getDatabaseName(), tblDesc.getTableName(), txnId, stmtId, x.getConf(),
+            tblDesc.getDatabaseName(), tblDesc.getTableName(), writeId, stmtId, x.getConf(),
             AcidUtils.isInsertOnlyTable(tblDesc.getTblProps()));
         for (AddPartitionDesc addPartitionDesc : partitionDescs) {
           t.addDependentTask(addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc,
-            replicationSpec, x, txnId, stmtId, isSourceMm, ict));
+            replicationSpec, x, writeId, stmtId, isSourceMm, ict));
         }
       } else {
         x.getLOG().debug("adding dependent CopyWork/MoveWork for table");
@@ -885,7 +893,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
             tblproperties.put("transactional_properties", "insert_only");
             table.setParameters(tblproperties);
           }
-          t.addDependentTask(loadTable(fromURI, table, false, tablePath, replicationSpec, x, txnId, stmtId, isSourceMm));
+          t.addDependentTask(loadTable(fromURI, table, false, tablePath, replicationSpec, x, writeId, stmtId, isSourceMm));
         }
       }
       x.getTasks().add(t);
@@ -893,11 +901,11 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
   }
 
   private static Task<?> createImportCommitTask(
-      String dbName, String tblName, Long txnId, int stmtId, HiveConf conf, boolean isMmTable) {
+      String dbName, String tblName, Long writeId, int stmtId, HiveConf conf, boolean isMmTable) {
     // TODO: noop, remove?
     @SuppressWarnings("unchecked")
     Task<ImportCommitWork> ict = (!isMmTable) ? null : TaskFactory.get(
-        new ImportCommitWork(dbName, tblName, txnId, stmtId), conf);
+        new ImportCommitWork(dbName, tblName, writeId, stmtId), conf);
     return ict;
   }
 
@@ -909,7 +917,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
 &