1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 package org.apache.hadoop.hive.ql;
19
20 import java.io.File;
21 import java.io.FileOutputStream;
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.Collections;
26 import java.util.LinkedList;
27 import java.util.List;
28 import java.util.Timer;
29 import java.util.TimerTask;
30 import java.util.concurrent.CountDownLatch;
31 import java.util.concurrent.ExecutionException;
32 import java.util.concurrent.ExecutorService;
33 import java.util.concurrent.Executors;
34 import java.util.concurrent.Future;
35 import java.util.concurrent.TimeUnit;
36
37 import org.apache.curator.shaded.com.google.common.collect.Lists;
38 import org.apache.hadoop.fs.FileStatus;
39 import org.apache.hadoop.fs.FileSystem;
40 import org.apache.hadoop.fs.Path;
41 import org.apache.hadoop.hive.conf.HiveConf;
42 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
43 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
44 import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
45 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
46 import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
47 import org.apache.hadoop.hive.metastore.api.LockState;
48 import org.apache.hadoop.hive.metastore.api.LockType;
49 import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
50 import org.apache.hadoop.hive.metastore.api.MetaException;
51 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
52 import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
53 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
54 import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
55 import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
56 import org.apache.hadoop.hive.metastore.api.TxnInfo;
57 import org.apache.hadoop.hive.metastore.api.TxnState;
58 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
59 import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService;
60 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
61 import org.apache.hadoop.hive.metastore.txn.TxnStore;
62 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
63 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
64 import org.apache.hadoop.hive.ql.io.AcidUtils;
65 import org.apache.hadoop.hive.ql.io.BucketCodec;
66 import org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2;
67 import org.apache.hadoop.hive.ql.metadata.HiveException;
68 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
69 import org.apache.hadoop.hive.ql.session.SessionState;
70 import org.apache.thrift.TException;
71 import org.junit.Assert;
72 import org.junit.Ignore;
73 import org.junit.Test;
74 import org.slf4j.Logger;
75 import org.slf4j.LoggerFactory;
76
77 /**
78  * The LockManager is not ready, but for the no-concurrency straight-line path we can
79  * test AC=true, and AC=false with commit/rollback/exception, and verify the resulting data.
80  *
81  * Can also test calling commit in AC=true mode, toggling AC, etc.
82  *
83  * Tests here are for multi-statement transactions (WIP) and others.
84  * Mostly uses bucketed tables.
85 */
86 public class TestTxnCommands extends TxnCommandsBaseForTests {
87
88   private static final Logger LOG = LoggerFactory.getLogger(TestTxnCommands.class);
89 private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
90 File.separator + TestTxnCommands.class.getCanonicalName()
91 + "-" + System.currentTimeMillis()
92 ).getPath().replaceAll("\\\\", "/");
93 @Override
94 protected String getTestDataDir() {
95 return TEST_DATA_DIR;
96 }
97
98 /**
99 * tests that a failing Insert Overwrite (which creates a new base_x) is properly marked as
100 * aborted.
101 */
102 @Test
103 public void testInsertOverwrite() throws Exception {
104 runStatementOnDriver("insert overwrite table " + Table.NONACIDORCTBL + " select a,b from " + Table.NONACIDORCTBL2);
105 runStatementOnDriver("create table " + Table.NONACIDORCTBL2 + "3(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
106 runStatementOnDriver("insert into " + Table.ACIDTBL + " values(1,2)");
107 List<String> rs = runStatementOnDriver("select a from " + Table.ACIDTBL + " where b = 2");
108 Assert.assertEquals(1, rs.size());
109 Assert.assertEquals("1", rs.get(0));
110 hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
111 runStatementOnDriver("insert into " + Table.ACIDTBL + " values(3,2)");
112 hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
113 runStatementOnDriver("insert into " + Table.ACIDTBL + " values(5,6)");
114 rs = runStatementOnDriver("select a from " + Table.ACIDTBL + " order by a");
115 Assert.assertEquals(2, rs.size());
116 Assert.assertEquals("1", rs.get(0));
117 Assert.assertEquals("5", rs.get(1));
118 }
119
120 @Ignore("not needed but useful for testing")
121 @Test
122 public void testNonAcidInsert() throws Exception {
123 runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
124 List<String> rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
125 runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(2,3)");
126 List<String> rs1 = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
127 }
128
129 /**
130 * Useful for debugging. Dumps ORC file in JSON to CWD.
131 */
132 private void dumpBucketData(Table table, long writeId, int stmtId, int bucketNum) throws Exception {
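//early return keeps this dump disabled by default; re-enable it (and the commented-out FileDump call below) when debugging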
133 if(true) {
134 return;
135 }
136 Path bucket = AcidUtils.createBucketFile(new Path(new Path(getWarehouseDir(), table.toString().toLowerCase()), AcidUtils.deltaSubdir(writeId, writeId, stmtId)), bucketNum);
137 FileOutputStream delta = new FileOutputStream(testName.getMethodName() + "_" + bucket.getParent().getName() + "_" + bucket.getName());
138 // try {
139 // FileDump.printJsonData(conf, bucket.toString(), delta);
140 // }
141 // catch(FileNotFoundException ex) {
142 //      this happens if you change BUCKET_COUNT
143 // }
144 delta.close();
145 }
146 /**
147 * Dump all data in the table by bucket in JSON format
148 */
149 private void dumpTableData(Table table, long writeId, int stmtId) throws Exception {
150 for(int bucketNum = 0; bucketNum < BUCKET_COUNT; bucketNum++) {
151 dumpBucketData(table, writeId, stmtId, bucketNum);
152 }
153 }
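/**
* Inserts rows in autocommit mode, then more rows inside an explicit transaction; verifies the
* data is visible inside the transaction and after COMMIT WORK, and that a COMMIT issued when
* no transaction is open is rejected.
*/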
154 @Test
155 public void testSimpleAcidInsert() throws Exception {
156 int[][] rows1 = {{1,2},{3,4}};
157 runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1));
158 //List<String> rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
159 //Assert.assertEquals("Data didn't match in autocommit=true (rs)", stringifyValues(rows1), rs);
160 runStatementOnDriver("START TRANSACTION");
161 int[][] rows2 = {{5,6},{7,8}};
162 runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows2));
163 List<String> allData = stringifyValues(rows1);
164 allData.addAll(stringifyValues(rows2));
165 List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
166 Assert.assertEquals("Data didn't match inside tx (rs0)", allData, rs0);
167 runStatementOnDriver("COMMIT WORK");
168 dumpTableData(Table.ACIDTBL, 1, 0);
169 dumpTableData(Table.ACIDTBL, 2, 0);
170 runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
171 CommandProcessorResponse cpr = runStatementOnDriverNegative("COMMIT");//txn started implicitly by previous statement
172 Assert.assertEquals("Error didn't match: " + cpr, ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN.getErrorCode(), cpr.getErrorCode());
173 List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
174 Assert.assertEquals("Data didn't match inside tx (rs0)", allData, rs1);
175 }
176
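/**
* Exports an insert_only (micromanaged) table and imports it back, verifying the exported layout
* contains no delta/base directories and that data and table properties survive the round trip,
* both into a new MM table and into a pre-created non-transactional table.
*/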
177 @Test
178 public void testMmExim() throws Exception {
179 String tableName = "mm_table", importName = tableName + "_import";
180 runStatementOnDriver("drop table if exists " + tableName);
181 runStatementOnDriver(String.format("create table %s (a int, b int) stored as orc " +
182 "TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')",
183 tableName));
184
185 // Regular insert: export some MM deltas, then import into a new table.
186 int[][] rows1 = {{1,2},{3,4}};
187 runStatementOnDriver(String.format("insert into %s (a,b) %s",
188 tableName, makeValuesClause(rows1)));
189 runStatementOnDriver(String.format("insert into %s (a,b) %s",
190 tableName, makeValuesClause(rows1)));
191 IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf);
192 org.apache.hadoop.hive.metastore.api.Table table = msClient.getTable("default", tableName);
193 FileSystem fs = FileSystem.get(hiveConf);
194 Path exportPath = new Path(table.getSd().getLocation() + "_export");
195 fs.delete(exportPath, true);
196 runStatementOnDriver(String.format("export table %s to '%s'", tableName, exportPath));
197 List<String> paths = listPathsRecursive(fs, exportPath);
198 verifyMmExportPaths(paths, 2);
199 runStatementOnDriver(String.format("import table %s from '%s'", importName, exportPath));
200 org.apache.hadoop.hive.metastore.api.Table imported = msClient.getTable("default", importName);
201 Assert.assertEquals(imported.toString(), "insert_only",
202 imported.getParameters().get("transactional_properties"));
203 Path importPath = new Path(imported.getSd().getLocation());
204 FileStatus[] stat = fs.listStatus(importPath, AcidUtils.hiddenFileFilter);
205 Assert.assertEquals(Arrays.toString(stat), 1, stat.length);
206 assertIsDelta(stat[0]);
207 List<String> allData = stringifyValues(rows1);
208 allData.addAll(stringifyValues(rows1));
209     Collections.sort(allData);
211 List<String> rs = runStatementOnDriver(
212 String.format("select a,b from %s order by a,b", importName));
213 Assert.assertEquals("After import: " + rs, allData, rs);
214 runStatementOnDriver("drop table if exists " + importName);
215
216 // Do insert overwrite to create some invalid deltas, and import into a non-MM table.
217 int[][] rows2 = {{5,6},{7,8}};
218 runStatementOnDriver(String.format("insert overwrite table %s %s",
219 tableName, makeValuesClause(rows2)));
220 fs.delete(exportPath, true);
221 runStatementOnDriver(String.format("export table %s to '%s'", tableName, exportPath));
222 paths = listPathsRecursive(fs, exportPath);
223 verifyMmExportPaths(paths, 1);
224 runStatementOnDriver(String.format("create table %s (a int, b int) stored as orc " +
225 "TBLPROPERTIES ('transactional'='false')", importName));
226 runStatementOnDriver(String.format("import table %s from '%s'", importName, exportPath));
227 imported = msClient.getTable("default", importName);
228 Assert.assertNull(imported.toString(), imported.getParameters().get("transactional"));
229 Assert.assertNull(imported.toString(),
230 imported.getParameters().get("transactional_properties"));
231 importPath = new Path(imported.getSd().getLocation());
232 stat = fs.listStatus(importPath, AcidUtils.hiddenFileFilter);
233 allData = stringifyValues(rows2);
234 Collections.sort(allData);
235 rs = runStatementOnDriver(String.format("select a,b from %s order by a,b", importName));
236 Assert.assertEquals("After import: " + rs, allData, rs);
237 runStatementOnDriver("drop table if exists " + importName);
238 runStatementOnDriver("drop table if exists " + tableName);
239 msClient.close();
240 }
241
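/**
* Runs a single query on its own Driver/SessionState; the two latches let the caller line up
* several of these runnables so the queries start at (nearly) the same time.
*/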
242 private static final class QueryRunnable implements Runnable {
243 private final CountDownLatch cdlIn, cdlOut;
244 private final String query;
245 private final HiveConf hiveConf;
246
247 QueryRunnable(HiveConf hiveConf, String query, CountDownLatch cdlIn, CountDownLatch cdlOut) {
248 this.query = query;
249 this.cdlIn = cdlIn;
250 this.cdlOut = cdlOut;
251 this.hiveConf = new HiveConf(hiveConf);
252 }
253
254 @Override
255 public void run() {
256 SessionState ss = SessionState.start(hiveConf);
257 try {
258 ss.applyAuthorizationPolicy();
259 } catch (HiveException e) {
260 throw new RuntimeException(e);
261 }
262 QueryState qs = new QueryState.Builder().withHiveConf(hiveConf).nonIsolated().build();
263 Driver d = new Driver(qs, null);
264 try {
265 LOG.info("Ready to run the query: " + query);
266 syncThreadStart(cdlIn, cdlOut);
267 try {
268 CommandProcessorResponse cpr = d.run(query);
269 if(cpr.getResponseCode() != 0) {
270 throw new RuntimeException(query + " failed: " + cpr);
271 }
272 d.getResults(new ArrayList<String>());
273 } catch (Exception e) {
274 throw new RuntimeException(e);
275 }
276 } finally {
277 d.close();
278 }
279 }
280 }
281
282
283 private static void syncThreadStart(final CountDownLatch cdlIn, final CountDownLatch cdlOut) {
284 cdlIn.countDown();
285 try {
286 cdlOut.await();
287 } catch (InterruptedException e) {
288 throw new RuntimeException(e);
289 }
290 }
291
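/**
* Runs several concurrent single-row inserts into an insert_only table; column stats must end up
* either invalidated or correct, must stay invalid across a later insert if they were invalidated,
* and become valid again after ANALYZE.
*/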
292 @Test
293 public void testParallelInsertStats() throws Exception {
294 final int TASK_COUNT = 4;
295 String tableName = "mm_table";
296 List<ColumnStatisticsObj> stats;
297 IMetaStoreClient msClient = prepareParallelTest(tableName, 0);
298
299 String[] queries = new String[TASK_COUNT];
300 for (int i = 0; i < queries.length; ++i) {
301       queries[i] = String.format("insert into %s (a) values (%d)", tableName, i);
302 }
303
304 runParallelQueries(queries);
305
306 // Verify stats are either invalid, or valid and correct.
307 stats = getTxnTableStats(msClient, tableName);
308 boolean hasStats = 0 != stats.size();
309 if (hasStats) {
310 verifyLongStats(TASK_COUNT, 0, TASK_COUNT - 1, stats);
311 }
312
313 runStatementOnDriver(String.format("insert into %s (a) values (" + TASK_COUNT + ")", tableName));
314 if (!hasStats) {
315 // Stats should still be invalid if they were invalid.
316 stats = getTxnTableStats(msClient, tableName);
317 Assert.assertEquals(0, stats.size());
318 }
319
320 // Stats should be valid after analyze.
321 runStatementOnDriver(String.format("analyze table %s compute statistics for columns", tableName));
322 verifyLongStats(TASK_COUNT + 1, 0, TASK_COUNT, getTxnTableStats(msClient, tableName));
323 }
324
325 private void verifyLongStats(int dvCount, int min, int max, List<ColumnStatisticsObj> stats) {
326 Assert.assertEquals(1, stats.size());
327 LongColumnStatsData data = stats.get(0).getStatsData().getLongStats();
328 Assert.assertEquals(min, data.getLowValue());
329 Assert.assertEquals(max, data.getHighValue());
330 Assert.assertEquals(dvCount, data.getNumDVs());
331 }
332
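/**
* Submits the queries to a thread pool and releases them simultaneously via the latches.
*/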
333 private void runParallelQueries(String[] queries)
334 throws InterruptedException, ExecutionException {
335 ExecutorService executor = Executors.newFixedThreadPool(queries.length);
336 final CountDownLatch cdlIn = new CountDownLatch(queries.length), cdlOut = new CountDownLatch(1);
337 Future<?>[] tasks = new Future[queries.length];
338 for (int i = 0; i < tasks.length; ++i) {
339 tasks[i] = executor.submit(new QueryRunnable(hiveConf, queries[i], cdlIn, cdlOut));
340 }
341 cdlIn.await(); // Wait for all threads to be ready.
342 cdlOut.countDown(); // Release them at the same time.
343 for (int i = 0; i < tasks.length; ++i) {
344 tasks[i].get();
345 }
346 }
347
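/**
* Creates an insert_only table with stats autogather enabled, seeds it with two rows and checks
* that column stats are valid before the parallel part of the test starts.
*/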
348 private IMetaStoreClient prepareParallelTest(String tableName, int val)
349       throws Exception {
350 hiveConf.setBoolean("hive.stats.autogather", true);
351 hiveConf.setBoolean("hive.stats.column.autogather", true);
352 runStatementOnDriver("drop table if exists " + tableName);
353 runStatementOnDriver(String.format("create table %s (a int) stored as orc " +
354 "TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')",
355 tableName));
356 runStatementOnDriver(String.format("insert into %s (a) values (" + val + ")", tableName));
357 runStatementOnDriver(String.format("insert into %s (a) values (" + val + ")", tableName));
358 IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf);
359 // Stats should be valid after serial inserts.
360 List<ColumnStatisticsObj> stats = getTxnTableStats(msClient, tableName);
361 Assert.assertEquals(1, stats.size());
362 return msClient;
363 }
364
365
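/**
* Runs an INSERT and an ANALYZE concurrently; stats must be either invalidated or valid and
* correct, must stay invalid across a later insert if they were invalidated, and become valid
* again after another ANALYZE.
*/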
366 @Test
367 public void testParallelInsertAnalyzeStats() throws Exception {
368 String tableName = "mm_table";
369 List<ColumnStatisticsObj> stats;
370 IMetaStoreClient msClient = prepareParallelTest(tableName, 0);
371
372 String[] queries = {
373 String.format("insert into %s (a) values (999)", tableName),
374 String.format("analyze table %s compute statistics for columns", tableName)
375 };
376 runParallelQueries(queries);
377
378 // Verify stats are either invalid, or valid and correct.
379 stats = getTxnTableStats(msClient, tableName);
380 boolean hasStats = 0 != stats.size();
381 if (hasStats) {
382 verifyLongStats(2, 0, 999, stats);
383 }
384
385 runStatementOnDriver(String.format("insert into %s (a) values (1000)", tableName));
386 if (!hasStats) {
387 // Stats should still be invalid if they were invalid.
388 stats = getTxnTableStats(msClient, tableName);
389 Assert.assertEquals(0, stats.size());
390 }
391
392 // Stats should be valid after analyze.
393 runStatementOnDriver(String.format("analyze table %s compute statistics for columns", tableName));
394 verifyLongStats(3, 0, 1000, getTxnTableStats(msClient, tableName));
395 }
396
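/**
* Runs TRUNCATE and ANALYZE concurrently; stats must be either invalidated or reflect the
* truncated table (all zeros), and a subsequent ANALYZE must leave them valid.
*/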
397 @Test
398 public void testParallelTruncateAnalyzeStats() throws Exception {
399 String tableName = "mm_table";
400 List<ColumnStatisticsObj> stats;
401 IMetaStoreClient msClient = prepareParallelTest(tableName, 0);
402
403 String[] queries = {
404 String.format("truncate table %s", tableName),
405 String.format("analyze table %s compute statistics for columns", tableName)
406 };
407 runParallelQueries(queries);
408
409 // Verify stats are either invalid, or valid and correct.
410 stats = getTxnTableStats(msClient, tableName);
411 boolean hasStats = 0 != stats.size();
412 if (hasStats) {
413 verifyLongStats(0, 0, 0, stats);
414 }
415
416 // Stats should be valid after analyze.
417 runStatementOnDriver(String.format("analyze table %s compute statistics for columns", tableName));
418 verifyLongStats(0, 0, 0, getTxnTableStats(msClient, tableName));
419 }
420
421
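/**
* Verifies that toggling HIVE_TXN_STATS_ENABLED hides/exposes txn table stats, and that a write
* performed while it is off invalidates the stats that were previously gathered.
*/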
422 @Test
423 public void testTxnStatsOnOff() throws Exception {
424 String tableName = "mm_table";
425 hiveConf.setBoolean("hive.stats.autogather", true);
426 hiveConf.setBoolean("hive.stats.column.autogather", true);
427 runStatementOnDriver("drop table if exists " + tableName);
428 runStatementOnDriver(String.format("create table %s (a int) stored as orc " +
429 "TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')",
430 tableName));
431
432 runStatementOnDriver(String.format("insert into %s (a) values (1)", tableName));
433 IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf);
434 List<ColumnStatisticsObj> stats = getTxnTableStats(msClient, tableName);
435 Assert.assertEquals(1, stats.size());
436 runStatementOnDriver(String.format("insert into %s (a) values (1)", tableName));
437 stats = getTxnTableStats(msClient, tableName);
438 Assert.assertEquals(1, stats.size());
439 msClient.close();
440 hiveConf.setBoolean(MetastoreConf.ConfVars.HIVE_TXN_STATS_ENABLED.getVarname(), false);
441 msClient = new HiveMetaStoreClient(hiveConf);
442 // Even though the stats are valid in metastore, txn stats are disabled.
443 stats = getTxnTableStats(msClient, tableName);
444 Assert.assertEquals(0, stats.size());
445 msClient.close();
446 hiveConf.setBoolean(MetastoreConf.ConfVars.HIVE_TXN_STATS_ENABLED.getVarname(), true);
447 msClient = new HiveMetaStoreClient(hiveConf);
448 stats = getTxnTableStats(msClient, tableName);
449 // Now the stats are visible again.
450 Assert.assertEquals(1, stats.size());
451 msClient.close();
452 hiveConf.setBoolean(MetastoreConf.ConfVars.HIVE_TXN_STATS_ENABLED.getVarname(), false);
453 // Running the query with stats disabled will cause stats in metastore itself to become invalid.
454 runStatementOnDriver(String.format("insert into %s (a) values (1)", tableName));
455 hiveConf.setBoolean(MetastoreConf.ConfVars.HIVE_TXN_STATS_ENABLED.getVarname(), true);
456 msClient = new HiveMetaStoreClient(hiveConf);
457 stats = getTxnTableStats(msClient, tableName);
458 Assert.assertEquals(0, stats.size());
459 msClient.close();
460 }
461
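/**
* Fetches column stats for column "a" using the current valid write ID list, so the metastore
* can decide whether the stats are usable for this reader.
*/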
462 public List<ColumnStatisticsObj> getTxnTableStats(IMetaStoreClient msClient,
463 String tableName) throws TException, NoSuchObjectException, MetaException {
464 String validWriteIds;
465 List<ColumnStatisticsObj> stats;
466 validWriteIds = msClient.getValidWriteIds("default." + tableName).toString();
467 stats = msClient.getTableColumnStatistics(
468 "default", tableName, Lists.newArrayList("a"), validWriteIds);
469 return stats;
470 }
471
472 private void assertIsDelta(FileStatus stat) {
473 Assert.assertTrue(stat.toString(),
474 stat.getPath().getName().startsWith(AcidUtils.DELTA_PREFIX));
475 }
476
477 private void verifyMmExportPaths(List<String> paths, int deltasOrBases) {
478 // 1 file, 1 dir for each, for now. Plus export "data" dir.
479 // This could be changed to a flat file list later.
480 Assert.assertEquals(paths.toString(), 2 * deltasOrBases + 1, paths.size());
481 // No confusing directories in export.
482 for (String path : paths) {
483 Assert.assertFalse(path, path.startsWith(AcidUtils.DELTA_PREFIX));
484 Assert.assertFalse(path, path.startsWith(AcidUtils.BASE_PREFIX));
485 }
486 }
487
488 private List<String> listPathsRecursive(FileSystem fs, Path path) throws IOException {
489 List<String> paths = new ArrayList<>();
490 LinkedList<Path> queue = new LinkedList<>();
491 queue.add(path);
492 while (!queue.isEmpty()) {
493 Path next = queue.pollFirst();
494 FileStatus[] stats = fs.listStatus(next, AcidUtils.hiddenFileFilter);
495 for (FileStatus stat : stats) {
496 Path child = stat.getPath();
497 paths.add(child.toString());
498 if (stat.isDirectory()) {
499 queue.add(child);
500 }
501 }
502 }
503 return paths;
504 }
505
506
507 /**
508    * TODO: add tests for all transitions, e.g. AC=t, AC=t, AC=f, commit
509 * @throws Exception
510 */
511 @Test
512 public void testErrors() throws Exception {
513 runStatementOnDriver("start transaction");
514 CommandProcessorResponse cpr2 = runStatementOnDriverNegative("create table foo(x int, y int)");
515 Assert.assertEquals("Expected DDL to fail in an open txn", ErrorMsg.OP_NOT_ALLOWED_IN_TXN.getErrorCode(), cpr2.getErrorCode());
516 CommandProcessorResponse cpr3 = runStatementOnDriverNegative("update " + Table.ACIDTBL + " set a = 1 where b != 1");
517 Assert.assertEquals("Expected update of bucket column to fail",
518 "FAILED: SemanticException [Error 10302]: Updating values of bucketing columns is not supported. Column a.",
519 cpr3.getErrorMessage());
520 Assert.assertEquals("Expected update of bucket column to fail",
521 ErrorMsg.UPDATE_CANNOT_UPDATE_BUCKET_VALUE.getErrorCode(), cpr3.getErrorCode());
522 cpr3 = runStatementOnDriverNegative("commit");//not allowed in w/o tx
523 Assert.assertEquals("Error didn't match: " + cpr3,
524 ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN.getErrorCode(), cpr3.getErrorCode());
525 cpr3 = runStatementOnDriverNegative("rollback");//not allowed in w/o tx
526 Assert.assertEquals("Error didn't match: " + cpr3,
527 ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN.getErrorCode(), cpr3.getErrorCode());
528 runStatementOnDriver("start transaction");
529 cpr3 = runStatementOnDriverNegative("start transaction");//not allowed in a tx
530 Assert.assertEquals("Expected start transaction to fail",
531 ErrorMsg.OP_NOT_ALLOWED_IN_TXN.getErrorCode(), cpr3.getErrorCode());
532 runStatementOnDriver("start transaction");//ok since previously opened txn was killed
533 runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)");
534 List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
535 Assert.assertEquals("Can't see my own write", 1, rs0.size());
536 runStatementOnDriver("commit work");
537 rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
538 Assert.assertEquals("Can't see my own write", 1, rs0.size());
539 }
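/**
* Read-your-own-writes: a row inserted inside an open transaction is visible to the same
* transaction and remains visible to a later transaction after commit.
*/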
540 @Test
541 public void testReadMyOwnInsert() throws Exception {
542 runStatementOnDriver("START TRANSACTION");
543 List<String> rs = runStatementOnDriver("select * from " + Table.ACIDTBL);
544 Assert.assertEquals("Expected empty " + Table.ACIDTBL, 0, rs.size());
545 runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)");
546 List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
547 Assert.assertEquals("Can't see my own write", 1, rs0.size());
548 runStatementOnDriver("commit");
549 runStatementOnDriver("START TRANSACTION");
550 List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
551 runStatementOnDriver("rollback work");
552 Assert.assertEquals("Can't see write after commit", 1, rs1.size());
553 }
554 @Test
555 public void testImplicitRollback() throws Exception {
556 runStatementOnDriver("START TRANSACTION");
557 runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)");
558 List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
559 Assert.assertEquals("Can't see my own write", 1, rs0.size());
560 //next command should produce an error
561 CommandProcessorResponse cpr = runStatementOnDriverNegative("select * from no_such_table");
562 Assert.assertEquals("Txn didn't fail?",
563 "FAILED: SemanticException [Error 10001]: Line 1:14 Table not found 'no_such_table'",
564 cpr.getErrorMessage());
565 runStatementOnDriver("start transaction");
566 List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
567 runStatementOnDriver("commit");
568 Assert.assertEquals("Didn't rollback as expected", 0, rs1.size());
569 }
570 @Test
571 public void testExplicitRollback() throws Exception {
572 runStatementOnDriver("START TRANSACTION");
573 runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)");
574 runStatementOnDriver("ROLLBACK");
575 List<String> rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
576 Assert.assertEquals("Rollback didn't rollback", 0, rs.size());
577 }
578
579 @Test
580 public void testMultipleInserts() throws Exception {
581 runStatementOnDriver("START TRANSACTION");
582 int[][] rows1 = {{1,2},{3,4}};
583 runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1));
584 int[][] rows2 = {{5,6},{7,8}};
585 runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows2));
586 List<String> allData = stringifyValues(rows1);
587 allData.addAll(stringifyValues(rows2));
588 List<String> rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
589 Assert.assertEquals("Content didn't match before commit rs", allData, rs);
590 runStatementOnDriver("commit");
591 dumpTableData(Table.ACIDTBL, 1, 0);
592 dumpTableData(Table.ACIDTBL, 1, 1);
593 List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
594 Assert.assertEquals("Content didn't match after commit rs1", allData, rs1);
595 }
596 @Test
597 public void testDelete() throws Exception {
598 int[][] rows1 = {{1,2},{3,4}};
599 runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1));
600 List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
601 Assert.assertEquals("Content didn't match rs0", stringifyValues(rows1), rs0);
602 runStatementOnDriver("START TRANSACTION");
603 runStatementOnDriver("delete from " + Table.ACIDTBL + " where b = 4");
604 int[][] updatedData2 = {{1,2}};
605 List<String> rs3 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
606 Assert.assertEquals("Wrong data after delete", stringifyValues(updatedData2), rs3);
607 runStatementOnDriver("commit");
608 List<String> rs4 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
609 Assert.assertEquals("Wrong data after commit", stringifyValues(updatedData2), rs4);
610 }
611
612 @Test
613 public void testUpdateOfInserts() throws Exception {
614 int[][] rows1 = {{1,2},{3,4}};
615 runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1));
616 List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
617 Assert.assertEquals("Content didn't match rs0", stringifyValues(rows1), rs0);
618 runStatementOnDriver("START TRANSACTION");
619 int[][] rows2 = {{5,6},{7,8}};
620 runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows2));
621 List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
622 List<String> allData = stringifyValues(rows1);
623 allData.addAll(stringifyValues(rows2));
624 Assert.assertEquals("Content didn't match rs1", allData, rs1);
625 runStatementOnDriver("update " + Table.ACIDTBL + " set b = 1 where b != 1");
626 int[][] updatedData = {{1,1},{3,1},{5,1},{7,1}};
627 List<String> rs2 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
628 Assert.assertEquals("Wrong data after update", stringifyValues(updatedData), rs2);
629 runStatementOnDriver("commit");
630 List<String> rs4 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
631 Assert.assertEquals("Wrong data after commit", stringifyValues(updatedData), rs4);
632 }
633 @Test
634 public void testUpdateDeleteOfInserts() throws Exception {
635 int[][] rows1 = {{1,2},{3,4}};
636 runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1));
637 List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
638 Assert.assertEquals("Content didn't match rs0", stringifyValues(rows1), rs0);
639 runStatementOnDriver("START TRANSACTION");
640 int[][] rows2 = {{5,6},{7,8}};
641 runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows2));
642 List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
643 List<String> allData = stringifyValues(rows1);
644 allData.addAll(stringifyValues(rows2));
645 Assert.assertEquals("Content didn't match rs1", allData, rs1);
646 runStatementOnDriver("update " + Table.ACIDTBL + " set b = 1 where b != 1");
647 int[][] updatedData = {{1,1},{3,1},{5,1},{7,1}};
648 List<String> rs2 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
649 Assert.assertEquals("Wrong data after update", stringifyValues(updatedData), rs2);
650 runStatementOnDriver("delete from " + Table.ACIDTBL + " where a = 7 and b = 1");
651 dumpTableData(Table.ACIDTBL, 1, 0);
652 dumpTableData(Table.ACIDTBL, 2, 0);
653 dumpTableData(Table.ACIDTBL, 2, 2);
654 dumpTableData(Table.ACIDTBL, 2, 4);
655 int[][] updatedData2 = {{1,1},{3,1},{5,1}};
656 List<String> rs3 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
657 Assert.assertEquals("Wrong data after delete", stringifyValues(updatedData2), rs3);
658 runStatementOnDriver("commit");
659 List<String> rs4 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
660 Assert.assertEquals("Wrong data after commit", stringifyValues(updatedData2), rs4);
661 }
662 @Test
663 public void testMultipleDelete() throws Exception {
664 int[][] rows1 = {{1,2},{3,4},{5,6},{7,8}};
665 runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1));
666 List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
667 Assert.assertEquals("Content didn't match rs0", stringifyValues(rows1), rs0);
668 runStatementOnDriver("START TRANSACTION");
669 runStatementOnDriver("delete from " + Table.ACIDTBL + " where b = 8");
670 int[][] updatedData2 = {{1,2},{3,4},{5,6}};
671 List<String> rs2 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
672 Assert.assertEquals("Wrong data after delete", stringifyValues(updatedData2), rs2);
673 runStatementOnDriver("delete from " + Table.ACIDTBL + " where b = 4");
674 int[][] updatedData3 = {{1, 2}, {5, 6}};
675 List<String> rs3 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
676 Assert.assertEquals("Wrong data after delete2", stringifyValues(updatedData3), rs3);
677 runStatementOnDriver("update " + Table.ACIDTBL + " set b=3");
678 dumpTableData(Table.ACIDTBL, 1, 0);
679 //nothing actually hashes to bucket0, so update/delete deltas don't have it
680 dumpTableData(Table.ACIDTBL, 2, 0);
681 dumpTableData(Table.ACIDTBL, 2, 2);
682 dumpTableData(Table.ACIDTBL, 2, 4);
683 List<String> rs5 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
684 int [][] updatedData4 = {{1,3},{5,3}};
685 Assert.assertEquals("Wrong data after delete", stringifyValues(updatedData4), rs5);
686 runStatementOnDriver("commit");
687 List<String> rs4 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
688 Assert.assertEquals("Wrong data after commit", stringifyValues(updatedData4), rs4);
689 }
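/**
* DELETE ... WHERE a IN (subquery) against both the table itself and another ACID table,
* followed by an INSERT ... SELECT, verifying the final contents.
*/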
690 @Test
691 public void testDeleteIn() throws Exception {
692 runStatementOnDriver("delete from " + Table.ACIDTBL + " where a IN (SELECT A.a from " +
693 Table.ACIDTBL + " A)");
694 int[][] tableData = {{1,2},{3,2},{5,2},{1,3},{3,3},{5,3}};
695 runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData));
696 runStatementOnDriver("insert into " + Table.ACIDTBL2 + "(a,b,c) values(1,7,17),(3,7,17)");
697 // runStatementOnDriver("select b from " + Table.ACIDTBL + " where a in (select b from " + Table.NONACIDORCTBL + ")");
698 runStatementOnDriver("delete from " + Table.ACIDTBL + " where a in(select a from " + Table.ACIDTBL2 + ")");
699 // runStatementOnDriver("delete from " + Table.ACIDTBL + " where a in(select a from " + Table.NONACIDORCTBL + ")");
700 runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) select a,b from " + Table.ACIDTBL2);
701 List<String> rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
702 int[][] updatedData = {{1,7},{3,7},{5,2},{5,3}};
703 Assert.assertEquals("Bulk update failed", stringifyValues(updatedData), rs);
704 }
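/**
* Verifies that AcidHouseKeeperService aborts a transaction whose timeout has expired, and that
* the background heartbeater keeps a live transaction (and its locks) from being timed out.
*/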
705 @Test
706 public void testTimeOutReaper() throws Exception {
707 runStatementOnDriver("start transaction");
708 runStatementOnDriver("delete from " + Table.ACIDTBL + " where a = 5");
709 //make sure currently running txn is considered aborted by housekeeper
710 hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 2, TimeUnit.MILLISECONDS);
711 MetastoreTaskThread houseKeeperService = new AcidHouseKeeperService();
712 houseKeeperService.setConf(hiveConf);
713 //this will abort the txn
714 houseKeeperService.run();
715 //this should fail because txn aborted due to timeout
716 CommandProcessorResponse cpr = runStatementOnDriverNegative("delete from " + Table.ACIDTBL + " where a = 5");
717 Assert.assertTrue("Actual: " + cpr.getErrorMessage(), cpr.getErrorMessage().contains("Transaction manager has aborted the transaction txnid:1"));
718
719 //now test that we don't timeout locks we should not
720 //heartbeater should be running in the background every 1/2 second
721 hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 1, TimeUnit.SECONDS);
722     // Have to reset the conf when we change it so that the change takes effect
723 houseKeeperService.setConf(hiveConf);
724 runStatementOnDriver("start transaction");
725 runStatementOnDriver("select count(*) from " + Table.ACIDTBL + " where a = 17");
726 pause(750);
727
728 TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
729
730     //since there is a txn open, we heartbeat the txn, not individual locks
731 GetOpenTxnsInfoResponse txnsInfoResponse = txnHandler.getOpenTxnsInfo();
732 Assert.assertEquals(2, txnsInfoResponse.getOpen_txns().size());
733 TxnInfo txnInfo = null;
734 for(TxnInfo ti : txnsInfoResponse.getOpen_txns()) {
735 if(ti.getState() == TxnState.OPEN) {
736 txnInfo = ti;
737 break;
738 }
739 }
740 Assert.assertNotNull(txnInfo);
741 Assert.assertEquals(14, txnInfo.getId());
742 Assert.assertEquals(TxnState.OPEN, txnInfo.getState());
743     String s = TxnDbUtil.queryToString(hiveConf, "select TXN_STARTED, TXN_LAST_HEARTBEAT from TXNS where TXN_ID = " + txnInfo.getId(), false);
744 String[] vals = s.split("\\s+");
745 Assert.assertEquals("Didn't get expected timestamps", 2, vals.length);
746 long lastHeartbeat = Long.parseLong(vals[1]);
747 //these 2 values are equal when TXN entry is made. Should never be equal after 1st heartbeat, which we
748 //expect to have happened by now since HIVE_TXN_TIMEOUT=1sec
749 Assert.assertNotEquals("Didn't see heartbeat happen", Long.parseLong(vals[0]), lastHeartbeat);
750
751 ShowLocksResponse slr = txnHandler.showLocks(new ShowLocksRequest());
752 TestDbTxnManager2.checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", Table.ACIDTBL.name, null, slr.getLocks());
753 pause(750);
754 houseKeeperService.run();
755 pause(750);
756 slr = txnHandler.showLocks(new ShowLocksRequest());
757 Assert.assertEquals("Unexpected lock count: " + slr, 1, slr.getLocks().size());
758 TestDbTxnManager2.checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", Table.ACIDTBL.name, null, slr.getLocks());
759
760 pause(750);
761 houseKeeperService.run();
762 slr = txnHandler.showLocks(new ShowLocksRequest());
763 Assert.assertEquals("Unexpected lock count: " + slr, 1, slr.getLocks().size());
764 TestDbTxnManager2.checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", Table.ACIDTBL.name, null, slr.getLocks());
765
766 //should've done several heartbeats
767     s = TxnDbUtil.queryToString(hiveConf, "select TXN_STARTED, TXN_LAST_HEARTBEAT from TXNS where TXN_ID = " + txnInfo.getId(), false);
768 vals = s.split("\\s+");
769 Assert.assertEquals("Didn't get expected timestamps", 2, vals.length);
770 Assert.assertTrue("Heartbeat didn't progress: (old,new) (" + lastHeartbeat + "," + vals[1]+ ")",
771 lastHeartbeat < Long.parseLong(vals[1]));
772
773 runStatementOnDriver("rollback");
774 slr = txnHandler.showLocks(new ShowLocksRequest());
775 Assert.assertEquals("Unexpected lock count", 0, slr.getLocks().size());
776 }
777 private static void pause(int timeMillis) {
778 try {
779 Thread.sleep(timeMillis);
780 }
781 catch (InterruptedException e) {
782 }
783 }
784
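/**
* Sanity check that EXCHANGE PARTITION between tables in two different databases still works
* under this test's transactional configuration.
*/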
785 @Test
786 public void exchangePartition() throws Exception {
787 runStatementOnDriver("create database ex1");
788 runStatementOnDriver("create database ex2");
789
790 runStatementOnDriver("CREATE TABLE ex1.exchange_part_test1 (f1 string) PARTITIONED BY (ds STRING)");
791 runStatementOnDriver("CREATE TABLE ex2.exchange_part_test2 (f1 string) PARTITIONED BY (ds STRING)");
792 runStatementOnDriver("ALTER TABLE ex2.exchange_part_test2 ADD PARTITION (ds='2013-04-05')");
793 runStatementOnDriver("ALTER TABLE ex1.exchange_part_test1 EXCHANGE PARTITION (ds='2013-04-05') WITH TABLE ex2.exchange_part_test2");
794 }
795 @Test
796 public void testMergeNegative() throws Exception {
797 CommandProcessorResponse cpr = runStatementOnDriverNegative("MERGE INTO " + Table.ACIDTBL +
798 " target USING " + Table.NONACIDORCTBL +
799 " source\nON target.a = source.a " +
800 "\nWHEN MATCHED THEN UPDATE set b = 1 " +
801 "\nWHEN MATCHED THEN DELETE " +
802 "\nWHEN NOT MATCHED AND a < 1 THEN INSERT VALUES(1,2)");
803 Assert.assertEquals(ErrorMsg.MERGE_PREDIACTE_REQUIRED, ((HiveException)cpr.getException()).getCanonicalErrorMsg());
804 }
805 @Test
806 public void testMergeNegative2() throws Exception {
807 CommandProcessorResponse cpr = runStatementOnDriverNegative("MERGE INTO "+ Table.ACIDTBL +
808 " target USING " + Table.NONACIDORCTBL + "\n source ON target.pk = source.pk " +
809 "\nWHEN MATCHED THEN UPDATE set b = 1 " +
810 "\nWHEN MATCHED THEN UPDATE set b=a");
811 Assert.assertEquals(ErrorMsg.MERGE_TOO_MANY_UPDATE, ((HiveException)cpr.getException()).getCanonicalErrorMsg());
812 }
813
814 /**
815    * `1` means 1 is a column name and '1' means 1 is a string literal.
816    * See HiveConf.HIVE_QUOTEDID_SUPPORT and
817    * HiveConf.HIVE_SUPPORT_SPECICAL_CHARACTERS_IN_TABLE_NAMES.
818 * {@link TestTxnCommands#testMergeType2SCD01()}
819 */
820 @Test
821 public void testQuotedIdentifier() throws Exception {
822 String target = "`aci/d_u/ami`";
823 String src = "`src/name`";
824 runStatementOnDriver("drop table if exists " + target);
825 runStatementOnDriver("drop table if exists " + src);
826 runStatementOnDriver("create table " + target + "(i int," +
827 "`d?*de e` decimal(5,2)," +
828 "vc varchar(128)) clustered by (i) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
829 runStatementOnDriver("create table " + src + "(gh int, j decimal(5,2), k varchar(128))");
830 runStatementOnDriver("merge into " + target + " as `d/8` using " + src + " as `a/b` on i=gh " +
831 "\nwhen matched and i > 5 then delete " +
832 "\nwhen matched then update set vc='blah' " +
833 "\nwhen not matched then insert values(1,2.1,'baz')");
834 runStatementOnDriver("merge into " + target + " as `d/8` using " + src + " as `a/b` on i=gh " +
835 "\nwhen matched and i > 5 then delete " +
836 "\nwhen matched then update set vc='blah', `d?*de e` = current_timestamp() " +
837 "\nwhen not matched then insert values(1,2.1, concat('baz', current_timestamp()))");
838 runStatementOnDriver("merge into " + target + " as `d/8` using " + src + " as `a/b` on i=gh " +
839 "\nwhen matched and i > 5 then delete " +
840 "\nwhen matched then update set vc='blah' " +
841 "\nwhen not matched then insert values(1,2.1,'a\\b')");
842 runStatementOnDriver("merge into " + target + " as `d/8` using " + src + " as `a/b` on i=gh " +
843 "\nwhen matched and i > 5 then delete " +
844 "\nwhen matched then update set vc='∆∋'" +
845 "\nwhen not matched then insert values(`a/b`.gh,`a/b`.j,'c\\t')");
846 }
847 @Test
848 public void testQuotedIdentifier2() throws Exception {
849 String target = "`aci/d_u/ami`";
850 String src = "`src/name`";
851 runStatementOnDriver("drop table if exists " + target);
852 runStatementOnDriver("drop table if exists " + src);
853 runStatementOnDriver("create table " + target + "(i int," +
854 "`d?*de e` decimal(5,2)," +
855 "vc varchar(128)) clustered by (i) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
856 runStatementOnDriver("create table " + src + "(`g/h` int, j decimal(5,2), k varchar(128))");
857 runStatementOnDriver("merge into " + target + " as `d/8` using " + src + " as `a/b` on i=`g/h`" +
858 "\nwhen matched and `g/h` > 5 then delete " +
859 "\nwhen matched and `g/h` < 0 then update set vc='∆∋', `d?*de e` = `d?*de e` * j + 1" +
860 "\nwhen not matched and `d?*de e` <> 0 then insert values(`a/b`.`g/h`,`a/b`.j,`a/b`.k)");
861 runStatementOnDriver("merge into " + target + " as `d/8` using " + src + " as `a/b` on i=`g/h`" +
862 "\nwhen matched and `g/h` > 5 then delete" +
863 "\n when matched and `g/h` < 0 then update set vc='∆∋' , `d?*de e` = `d?*de e` * j + 1 " +
864 "\n when not matched and `d?*de e` <> 0 then insert values(`a/b`.`g/h`,`a/b`.j,`a/b`.k)");
865 }
866 /**
867 * https://www.linkedin.com/pulse/how-load-slowly-changing-dimension-type-2-using-oracle-padhy
868 * also test QuotedIdentifier inside source expression
869 * {@link TestTxnCommands#testQuotedIdentifier()}
870 * {@link TestTxnCommands#testQuotedIdentifier2()}
871 */
872 @Test
873 public void testMergeType2SCD01() throws Exception {
874 runStatementOnDriver("drop table if exists target");
875 runStatementOnDriver("drop table if exists source");
876 runStatementOnDriver("drop table if exists splitTable");
877
878 runStatementOnDriver("create table splitTable(op int)");
879 runStatementOnDriver("insert into splitTable values (0),(1)");
880 runStatementOnDriver("create table source (key int, data int)");
881 runStatementOnDriver("create table target (key int, data int, cur int) clustered by (key) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
882 int[][] targetVals = {{1, 5, 1}, {2, 6, 1}, {1, 18, 0}};
883 runStatementOnDriver("insert into target " + makeValuesClause(targetVals));
884 int[][] sourceVals = {{1, 7}, {3, 8}};
885 runStatementOnDriver("insert into source " + makeValuesClause(sourceVals));
886 //augment source with a col which has 1 if it will cause an update in target, 0 otherwise
887 String curMatch = "select s.*, case when t.cur is null then 0 else 1 end m from source s left outer join (select * from target where target.cur=1) t on s.key=t.key";
888     //split (duplicate) each row that will cause an update into 2 rows, and augment with an 'op' col: 0 means insert, 1 means update
889 String teeCurMatch = "select curMatch.*, case when splitTable.op is null or splitTable.op = 0 then 0 else 1 end `o/p\\n` from (" + curMatch + ") curMatch left outer join splitTable on curMatch.m=1";
890 if(false) {
891 //this is just for debug
892 List<String> r1 = runStatementOnDriver(curMatch);
893 List<String> r2 = runStatementOnDriver(teeCurMatch);
894 }
895 String stmt = "merge into target t using (" + teeCurMatch + ") s on t.key=s.key and t.cur=1 and s.`o/p\\n`=1 " +
896 "when matched then update set cur=0 " +
897 "when not matched then insert values(s.key,s.data,1)";
898 //to allow cross join from 'teeCurMatch'
899 hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_STRICT_CHECKS_CARTESIAN, false);
900 runStatementOnDriver(stmt);
901 int[][] resultVals = {{1,5,0},{1,7,1},{1,18,0},{2,6,1},{3,8,1}};
902 List<String> r = runStatementOnDriver("select * from target order by key,data,cur");
903 Assert.assertEquals(stringifyValues(resultVals), r);
904 }
905 /**
906 * https://www.linkedin.com/pulse/how-load-slowly-changing-dimension-type-2-using-oracle-padhy
907 * Same as testMergeType2SCD01 but with a more intuitive "source" expression
908 */
909 @Test
910 public void testMergeType2SCD02() throws Exception {
911 runStatementOnDriver("drop table if exists target");
912 runStatementOnDriver("drop table if exists source");
913 runStatementOnDriver("create table source (key int, data int)");
914 runStatementOnDriver("create table target (key int, data int, cur int) clustered by (key) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
915 int[][] targetVals = {{1, 5, 1}, {2, 6, 1}, {1, 18, 0}};
916 runStatementOnDriver("insert into target " + makeValuesClause(targetVals));
917 int[][] sourceVals = {{1, 7}, {3, 8}};
918 runStatementOnDriver("insert into source " + makeValuesClause(sourceVals));
919
920 String baseSrc = "select source.*, 0 c from source " +
921 "union all " +
922 "select source.*, 1 c from source " +
923 "inner join target " +
924 "on source.key=target.key where target.cur=1";
925 if(false) {
926 //this is just for debug
927 List<String> r1 = runStatementOnDriver(baseSrc);
928 List<String> r2 = runStatementOnDriver(
929 "select t.*, s.* from target t right outer join (" + baseSrc + ") s " +
930 "\non t.key=s.key and t.cur=s.c and t.cur=1");
931 }
932 String stmt = "merge into target t using " +
933 "(" + baseSrc + ") s " +
934 "on t.key=s.key and t.cur=s.c and t.cur=1 " +
935 "when matched then update set cur=0 " +
936 "when not matched then insert values(s.key,s.data,1)";
937
938 runStatementOnDriver(stmt);
939 int[][] resultVals = {{1,5,0},{1,7,1},{1,18,0},{2,6,1},{3,8,1}};
940 List<String> r = runStatementOnDriver("select * from target order by key,data,cur");
941 Assert.assertEquals(stringifyValues(resultVals), r);
942 }
943
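/**
* Runs EXPLAIN for a MERGE on the Tez engine and checks that the expected reducer edges show up
* in the plan.
*/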
944 @Test
945 public void testMergeOnTezEdges() throws Exception {
946 String query = "merge into " + Table.ACIDTBL +
947 " as t using " + Table.NONACIDORCTBL + " s ON t.a = s.a " +
948 "WHEN MATCHED AND s.a > 8 THEN DELETE " +
949 "WHEN MATCHED THEN UPDATE SET b = 7 " +
950 "WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b) ";
951 d.destroy();
952 HiveConf hc = new HiveConf(hiveConf);
953 hc.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez");
954 hc.setBoolVar(HiveConf.ConfVars.HIVE_EXPLAIN_USER, false);
955 d = new Driver(hc);
956 d.setMaxRows(10000);
957
958 List<String> explain = runStatementOnDriver("explain " + query);
959 StringBuilder sb = new StringBuilder();
960 for(String s : explain) {
961 sb.append(s).append('\n');
962 }
963 LOG.info("Explain1: " + sb);
964 for(int i = 0; i < explain.size(); i++) {
965 if(explain.get(i).contains("Edges:")) {
966 Assert.assertTrue("At i+1=" + (i+1) + explain.get(i + 1), explain.get(i + 1).contains("Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 7 (SIMPLE_EDGE)"));
967 Assert.assertTrue("At i+1=" + (i+2) + explain.get(i + 2), explain.get(i + 2).contains("Reducer 3 <- Reducer 2 (SIMPLE_EDGE)"));
968 Assert.assertTrue("At i+1=" + (i+3) + explain.get(i + 3), explain.get(i + 3).contains("Reducer 4 <- Reducer 2 (SIMPLE_EDGE)"));
969 Assert.assertTrue("At i+1=" + (i+4) + explain.get(i + 4), explain.get(i + 4).contains("Reducer 5 <- Reducer 2 (SIMPLE_EDGE)"));
970 Assert.assertTrue("At i+1=" + (i+5) + explain.get(i + 5), explain.get(i + 5).contains("Reducer 6 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)"));
971 break;
972 }
973 }
974 }
975 @Test
976 public void testMergeUpdateDelete() throws Exception {
977 int[][] baseValsOdd = {{2,2},{4,44},{5,5},{11,11}};
978 runStatementOnDriver("insert into " + Table.NONACIDORCTBL + " " + makeValuesClause(baseValsOdd));
979 int[][] vals = {{2,1},{4,3},{5,6},{7,8}};
980 runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(vals));
981 String query = "merge into " + Table.ACIDTBL +
982 " as t using " + Table.NONACIDORCTBL + " s ON t.a = s.a " +
983 "WHEN MATCHED AND s.a < 3 THEN update set b = 0 " + //updates (2,1) -> (2,0)
984 "WHEN MATCHED and t.a > 3 and t.a < 5 THEN DELETE " +//deletes (4,3)
985 "WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b) ";//inserts (11,11)
986 runStatementOnDriver(query);
987
988 List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
989 int[][] rExpected = {{2,0},{5,6},{7,8},{11,11}};
990 Assert.assertEquals(stringifyValues(rExpected), r);
991 }
992 @Test
993 public void testMergeUpdateDeleteNoCardCheck() throws Exception {
994 d.destroy();
995 HiveConf hc = new HiveConf(hiveConf);
996 hc.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, false);
997 d = new Driver(hc);
998 d.setMaxRows(10000);
999
1000 int[][] baseValsOdd = {{2,2},{4,44},{5,5},{11,11}};
1001 runStatementOnDriver("insert into " + Table.NONACIDORCTBL + " " + makeValuesClause(baseValsOdd));
1002 int[][] vals = {{2,1},{4,3},{5,6},{7,8}};
1003 runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(vals));
1004 String query = "merge into " + Table.ACIDTBL +
1005 " as t using " + Table.NONACIDORCTBL + " s ON t.a = s.a " +
1006 "WHEN MATCHED AND s.a < 3 THEN update set b = 0 " +
1007 "WHEN MATCHED and t.a > 3 and t.a < 5 THEN DELETE ";
1008 runStatementOnDriver(query);
1009
1010 List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
1011 int[][] rExpected = {{2,0},{5,6},{7,8}};
1012 Assert.assertEquals(stringifyValues(rExpected), r);
1013 }
1014 @Test
1015 public void testMergeDeleteUpdate() throws Exception {
1016 int[][] sourceVals = {{2,2},{4,44},{5,5},{11,11}};
1017 runStatementOnDriver("insert into " + Table.NONACIDORCTBL + " " + makeValuesClause(sourceVals));
1018 int[][] targetVals = {{2,1},{4,3},{5,6},{7,8}};
1019 runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(targetVals));
1020 String query = "merge into " + Table.ACIDTBL +
1021 " as t using " + Table.NONACIDORCTBL + " s ON t.a = s.a " +
1022 "WHEN MATCHED and s.a < 5 THEN DELETE " +
1023 "WHEN MATCHED AND s.a < 3 THEN update set b = 0 " +
1024 "WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b) ";
1025 runStatementOnDriver(query);
1026
1027 List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
1028 int[][] rExpected = {{5,6},{7,8},{11,11}};
1029 Assert.assertEquals(stringifyValues(rExpected), r);
1030 }
1031
1032 /**
1033 * see https://issues.apache.org/jira/browse/HIVE-14949 for details
1034 * @throws Exception
1035 */
1036 @Test
1037 public void testMergeCardinalityViolation() throws Exception {
1038 int[][] sourceVals = {{2,2},{2,44},{5,5},{11,11}};
1039 runStatementOnDriver("insert into " + Table.NONACIDORCTBL + " " + makeValuesClause(sourceVals));
1040 int[][] targetVals = {{2,1},{4,3},{5,6},{7,8}};
1041 runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(targetVals));
1042 String query = "merge into " + Table.ACIDTBL +
1043 " as t using " + Table.NONACIDORCTBL + " s ON t.a = s.a " +
1044 "WHEN MATCHED and s.a < 5 THEN DELETE " +
1045 "WHEN MATCHED AND s.a < 3 THEN update set b = 0 " +
1046 "WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b) ";
1047 runStatementOnDriverNegative(query);
1048 runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,1,'p1'),(2,2,'p1'),(3,3,'p1'),(4,4,'p2')");
1049 query = "merge into " + Table.ACIDTBLPART +
1050 " as t using " + Table.NONACIDORCTBL + " s ON t.a = s.a " +
1051 "WHEN MATCHED and s.a < 5 THEN DELETE " +
1052 "WHEN MATCHED AND s.a < 3 THEN update set b = 0 " +
1053 "WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b, 'p1') ";
1054 runStatementOnDriverNegative(query);
1055 }
1056 @Test
1057 public void testSetClauseFakeColumn() throws Exception {
1058 CommandProcessorResponse cpr = runStatementOnDriverNegative("MERGE INTO "+ Table.ACIDTBL +
1059 " target USING " + Table.NONACIDORCTBL +
1060 "\n source ON target.a = source.a " +
1061 "\nWHEN MATCHED THEN UPDATE set t = 1");
1062 Assert.assertEquals(ErrorMsg.INVALID_TARGET_COLUMN_IN_SET_CLAUSE,
1063 ((HiveException)cpr.getException()).getCanonicalErrorMsg());
1064 cpr = runStatementOnDriverNegative("update " + Table.ACIDTBL + " set t = 1");
1065 Assert.assertEquals(ErrorMsg.INVALID_TARGET_COLUMN_IN_SET_CLAUSE,
1066 ((HiveException)cpr.getException()).getCanonicalErrorMsg());
1067 }
1068 @Test
1069 public void testBadOnClause() throws Exception {
1070 CommandProcessorResponse cpr = runStatementOnDriverNegative("merge into " + Table.ACIDTBL +
1071 " trgt using (select * from " + Table.NONACIDORCTBL +
1072 "src) sub on sub.a = target.a when not matched then insert values (sub.a,sub.b)");
1073 Assert.assertTrue("Error didn't match: " + cpr, cpr.getErrorMessage().contains(
1074 "No columns from target table 'trgt' found in ON clause '`sub`.`a` = `target`.`a`' of MERGE statement."));
1075
1076 }
1077
1078 /**
1079 * Writing UTs that need multiple threads is challenging since Derby chokes on concurrent access.
1080 * This tests that "AND WAIT" actually blocks and responds to interrupt
1081 * @throws Exception
1082 */
1083 @Test
1084 public void testCompactionBlocking() throws Exception {
1085 Timer cancelCompact = new Timer("CancelCompactionTimer", false);
1086 final Thread threadToInterrupt= Thread.currentThread();
1087 cancelCompact.schedule(new TimerTask() {
1088 @Override
1089 public void run() {
1090 threadToInterrupt.interrupt();
1091 }
1092 }, 5000);
1093 long start = System.currentTimeMillis();
1094 runStatementOnDriver("alter table "+ TestTxnCommands2.Table.ACIDTBL +" compact 'major' AND WAIT");
1095 //no Worker so it stays in initiated state
1096     //w/o AND WAIT the above alter table returns almost immediately, so the test here checks that
1097     //more than 2 seconds pass, i.e. that the command in Driver actually blocks before cancel is fired
1098     Assert.assertTrue(System.currentTimeMillis() > start + 2000);
1099 }
1100
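/**
* MERGE whose INSERT branch uses CASE/COALESCE expressions over source columns; only checks that
* the statement compiles and runs.
*/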
1101 @Test
1102 public void testMergeCase() throws Exception {
1103 runStatementOnDriver("create table merge_test (c1 integer, c2 integer, c3 integer) CLUSTERED BY (c1) into 2 buckets stored as orc tblproperties(\"transactional\"=\"true\")");
1104 runStatementOnDriver("create table if not exists e011_02 (c1 float, c2 double, c3 float)");
1105 runStatementOnDriver("merge into merge_test using e011_02 on (merge_test.c1 = e011_02.c1) when not matched then insert values (case when e011_02.c1 > 0 then e011_02.c1 + 1 else e011_02.c1 end, e011_02.c2 + e011_02.c3, coalesce(e011_02.c3, 1))");
1106 }
1107 /**
1108 * HIVE-16177
1109 * See also {@link TestTxnCommands2#testNonAcidToAcidConversion02()}
1110 */
1111 @Test
1112 public void testNonAcidToAcidConversion01() throws Exception {
1113 //create 1 row in a file 000001_0 (and an empty 000000_0)
1114 runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
1115 //create 1 row in a file 000000_0_copy_1 and 1 row in a file 000001_0_copy_1
1116 runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(0,12),(1,5)");
1117
1118 //convert the table to Acid
1119 runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')");
1120 //create a delta directory
1121 runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,17)");
1122
1123 //make sure we assign correct Ids
1124 List<String> rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDORCTBL + " order by ROW__ID");
1125 LOG.warn("before compact");
1126 for(String s : rs) {
1127 LOG.warn(s);
1128 }
1129 Assert.assertEquals(536870912,
1130 BucketCodec.V1.encode(new AcidOutputFormat.Options(hiveConf).bucket(0)));
1131 Assert.assertEquals(536936448,
1132 BucketCodec.V1.encode(new AcidOutputFormat.Options(hiveConf).bucket(1)));
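     //these two assertions pin down the V1 bucket encoding used in the ROW__IDs below: the codec
     //version sits in the high bits (1 << 29 = 536870912) and the bucket id is shifted left by 16,
     //so bucket 1 encodes as 536870912 + (1 << 16) = 536936448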
1133 Assert.assertEquals("Unexpected row count before compaction", 4, rs.size());
1134 Assert.assertTrue(rs.get(0),
1135 rs.get(0).startsWith("{\"writeid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2"));
1136 Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/000001_0"));
1137 Assert.assertTrue(rs.get(1),
1138 rs.get(1).startsWith("{\"writeid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t5"));
1139 Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/000001_0_copy_1"));
1140 Assert.assertTrue(rs.get(2),
1141 rs.get(2).startsWith("{\"writeid\":0,\"bucketid\":536936448,\"rowid\":2}\t0\t12"));
1142 Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/000001_0_copy_1"));
1143 Assert.assertTrue(rs.get(3),
1144 rs.get(3).startsWith("{\"writeid\":10000001,\"bucketid\":536936448,\"rowid\":0}\t1\t17"));
1145 Assert.assertTrue(rs.get(3), rs.get(3)
1146 .endsWith("nonacidorctbl/delta_10000001_10000001_0000/bucket_00001"));
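     //rows written before the conversion to Acid carry writeid 0, while the post-conversion insert
     //got a new write id (10000001), which also names the delta directory it landed in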
1147 //run Compaction
1148 runStatementOnDriver("alter table "+ TestTxnCommands2.Table.NONACIDORCTBL +" compact 'major'");
1149 TestTxnCommands2.runWorker(hiveConf);
1150 rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " +
1151 Table.NONACIDORCTBL + " order by ROW__ID");
1152 LOG.warn("after compact");
1153 for(String s : rs) {
1154 LOG.warn(s);
1155 }
1156 Assert.assertEquals("Unexpected row count after compaction", 4, rs.size());
1157 Assert.assertTrue(rs.get(0),
1158 rs.get(0).startsWith("{\"writeid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2"));
1159 Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/base_10000001/bucket_00001"));
1160 Assert.assertTrue(rs.get(1),
1161 rs.get(1).startsWith("{\"writeid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t5"));
1162 Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/base_10000001/bucket_00001"));
1163 Assert.assertTrue(rs.get(2),
1164 rs.get(2).startsWith("{\"writeid\":0,\"bucketid\":536936448,\"rowid\":2}\t0\t12"));
1165 Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/base_10000001/bucket_00001"));
1166 Assert.assertTrue(rs.get(3),
1167 rs.get(3)
1168 .startsWith("{\"writeid\":10000001,\"bucketid\":536936448,\"rowid\":0}\t1\t17"));
1169 Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/base_10000001/bucket_00001"));
1170
1171 //ROW__IDs are the same before and after compaction; only the file names changed to the new base
1172 }
1173 //@Ignore("see bucket_num_reducers_acid.q")
1174 @Test
1175 public void testMoreBucketsThanReducers() throws Exception {
1176 //see bucket_num_reducers.q bucket_num_reducers2.q
1177 // todo: try using set VerifyNumReducersHook.num.reducers=10;
1178 d.destroy();
1179 HiveConf hc = new HiveConf(hiveConf);
1180 hc.setIntVar(HiveConf.ConfVars.MAXREDUCERS, 1);
1181 //this is used in multiple places, SemanticAnalyzer.getBucketingSortingDest() among others
1182 hc.setIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS, 1);
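     //forcing a single reducer means one FileSink has to produce every bucket file of the bucketed
     //target table, which is exactly the "more buckets than reducers" case this test exercises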
1183 hc.setBoolVar(HiveConf.ConfVars.HIVE_EXPLAIN_USER, false);
1184 d = new Driver(hc);
1185 d.setMaxRows(10000);
1186 runStatementOnDriver("insert into " + Table.ACIDTBL + " values(1,1)");//txn X write to bucket1
1187 runStatementOnDriver("insert into " + Table.ACIDTBL + " values(0,0),(3,3)");// txn X + 1 write to bucket0 + bucket1
1188 runStatementOnDriver("update " + Table.ACIDTBL + " set b = -1");
1189 List<String> r = runStatementOnDriver("select * from " + Table.ACIDTBL + " order by a, b");
1190 int[][] expected = {{0, -1}, {1, -1}, {3, -1}};
1191 Assert.assertEquals(stringifyValues(expected), r);
1192 }
1193 @Ignore("Moved to Tez")
1194 @Test
1195 public void testMoreBucketsThanReducers2() throws Exception {
1196 //todo: try using set VerifyNumReducersHook.num.reducers=10;
1197 //see bucket_num_reducers.q bucket_num_reducers2.q
1198 d.destroy();
1199 HiveConf hc = new HiveConf(hiveConf);
1200 hc.setIntVar(HiveConf.ConfVars.MAXREDUCERS, 2);
1201 //this is used in multiple places, SemanticAnalyzer.getBucketingSortingDest() among others
1202 hc.setIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS, 2);
1203 d = new Driver(hc);
1204 d.setMaxRows(10000);
1205 runStatementOnDriver("create table fourbuckets (a int, b int) clustered by (a) into 4 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
1206 //below, the value of a is the bucket id and b is (logically) the txn id
1207 runStatementOnDriver("insert into fourbuckets values(0,1),(1,1)");//txn X write to b0 + b1
1208 runStatementOnDriver("insert into fourbuckets values(2,2),(3,2)");// txn X + 1 write to b2 + b3
1209 runStatementOnDriver("insert into fourbuckets values(0,3),(1,3)");//txn X + 2 write to b0 + b1
1210 runStatementOnDriver("insert into fourbuckets values(2,4),(3,4)");//txn X + 3 write to b2 + b3
1211 //so with 2 FileSinks and 4 buckets, FS1 should see (0,1),(2,2),(0,3),(2,4) since data is sorted by ROW__ID, where txnid is the first component
1212 //FS2 should see (1,1),(3,2),(1,3),(3,4)
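     //i.e. each FileSink writes two of the four bucket files; the update must still place every row
     //in its original bucket, so the expected result is simply all 8 rows with b = -1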
1213
1214 runStatementOnDriver("update fourbuckets set b = -1");
1215 List<String> r = runStatementOnDriver("select * from fourbuckets order by a, b");
1216 int[][] expected = {{0, -1},{0, -1}, {1, -1}, {1, -1}, {2, -1}, {2, -1}, {3, -1}, {3, -1}};
1217 Assert.assertEquals(stringifyValues(expected), r);
1218 }
1219 @Test
1220 public void testVersioning() throws Exception {
1221 hiveConf.set(MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID.getVarname(), "true");
1222 runStatementOnDriver("drop table if exists T");
1223 runStatementOnDriver("create table T (a int, b int) stored as orc");
1224 int[][] data = {{1, 2}};
1225 //create 1 delta file bucket_00000
1226 runStatementOnDriver("insert into T" + makeValuesClause(data));
1227
1228 //locate the delta file that the insert above just wrote
1229 List<String> rs = runStatementOnDriver("select distinct INPUT__FILE__NAME from T");
1230 FileSystem fs = FileSystem.get(hiveConf);
1231 Assert.assertTrue(rs != null && rs.size() == 1 && rs.get(0).contains(AcidUtils.DELTA_PREFIX));
1232 Path filePath = new Path(rs.get(0));
1233 int version = AcidUtils.OrcAcidVersion.getAcidVersionFromDataFile(filePath, fs);
1234 //check it has expected version marker
1235 Assert.assertEquals("Unexpected version marker in " + filePath,
1236 AcidUtils.OrcAcidVersion.ORC_ACID_VERSION, version);
1237
1238 //check that delta dir has a version file with expected value
1239 filePath = filePath.getParent();
1240 Assert.assertTrue(filePath.getName().startsWith(AcidUtils.DELTA_PREFIX));
1241 int versionFromMetaFile = AcidUtils.OrcAcidVersion
1242 .getAcidVersionFromMetaFile(filePath, fs);
1243 Assert.assertEquals("Unexpected version marker in " + filePath,
1244 AcidUtils.OrcAcidVersion.ORC_ACID_VERSION, versionFromMetaFile);
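     //so the acid version is recorded in two places: inside each bucket file's ORC metadata and in a
     //separate metadata file in the delta (and, after compaction, base) directory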
1245
1246 runStatementOnDriver("insert into T" + makeValuesClause(data));
1247 runStatementOnDriver("alter table T compact 'major'");
1248 TestTxnCommands2.runWorker(hiveConf);
1249
1250 //check status of compaction job
1251 TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
1252 ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
1253 Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize());
1254 Assert.assertEquals("Unexpected state for compaction 0",
1255 TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState());
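     //only the Worker has run at this point, so the compaction is expected to be waiting for the
     //Cleaner rather than fully cleaned up, hence CLEANING_RESPONSE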
1256 Assert.assertTrue(resp.getCompacts().get(0).getHadoopJobId().startsWith("job_local"));
1257
1258 rs = runStatementOnDriver("select distinct INPUT__FILE__NAME from T");
1259 Assert.assertTrue(rs != null && rs.size() == 1 && rs.get(0).contains(AcidUtils.BASE_PREFIX));
1260
1261 filePath = new Path(rs.get(0));
1262 version = AcidUtils.OrcAcidVersion.getAcidVersionFromDataFile(filePath, fs);
1263 //check that files produced by compaction still have the version marker
1264 Assert.assertEquals("Unexpected version marker in " + filePath,
1265 AcidUtils.OrcAcidVersion.ORC_ACID_VERSION, version);
1266
1267 //check that compacted base dir has a version file with expected value
1268 filePath = filePath.getParent();
1269 Assert.assertTrue(filePath.getName().startsWith(AcidUtils.BASE_PREFIX));
1270 versionFromMetaFile = AcidUtils.OrcAcidVersion.getAcidVersionFromMetaFile(
1271 filePath, fs);
1272 Assert.assertEquals("Unexpected version marker in " + filePath,
1273 AcidUtils.OrcAcidVersion.ORC_ACID_VERSION, versionFromMetaFile);
1274 }
1275 }