import collections
import re
import sys
import time
import traceback
import pytest
import threading
import logging

from flaky import flaky
from enum import Enum
from queue import Empty
from functools import partial
from multiprocessing import Process, Queue

from cassandra import ConsistencyLevel, InvalidRequest, WriteFailure
from cassandra.cluster import NoHostAvailable
from cassandra.concurrent import execute_concurrent_with_args
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement

from distutils.version import LooseVersion
from dtest import Tester, get_ip_from_node, create_ks
from tools.assertions import (assert_all, assert_crc_check_chance_equal,
                              assert_invalid, assert_none, assert_one,
                              assert_unavailable)
from tools.data import rows_to_list
from tools.misc import new_node
from tools.jmxutils import (JolokiaAgent, make_mbean, remove_perf_disable_shared_mem)

since = pytest.mark.since
logger = logging.getLogger(__name__)

# CASSANDRA-10978. Migration wait (in seconds) to use in bootstrapping tests. Needed to handle
# pathological case of flushing schema keyspace for multiple data directories. See CASSANDRA-6696
# for multiple data directory changes and CASSANDRA-10421 for compaction logging that must be
# written.
MIGRATION_WAIT = 5


@flaky
@since('3.0')
class TestMaterializedViews(Tester):
    """
    Test materialized views implementation.
    @jira_ticket CASSANDRA-6477
    @since 3.0
    """

    def _rows_to_list(self, rows):
        new_list = [list(row) for row in rows]
        return new_list

    def prepare(self, user_table=False, rf=1, options=None, nodes=3, install_byteman=False, **kwargs):
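        """
        Start a cluster of `nodes` nodes in dc1 (a second, initially empty
        datacenter slot is reserved for the add-DC tests), connect to node1 and
        create the 'ks' keyspace with the given replication factor. Optionally
        create the demo `users` table along with its `users_by_state` view.
        Extra kwargs are passed through to the CQL connection.
        """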
        cluster = self.cluster
        cluster.populate([nodes, 0], install_byteman=install_byteman)
        if options:
            cluster.set_configuration_options(values=options)
        cluster.start()
        node1 = cluster.nodelist()[0]

        session = self.patient_cql_connection(node1, **kwargs)
        create_ks(session, 'ks', rf)

        if user_table:
            session.execute(
                ("CREATE TABLE users (username varchar, password varchar, gender varchar, "
                 "session_token varchar, state varchar, birth_year bigint, "
                 "PRIMARY KEY (username));")
            )

            # create a materialized view
            session.execute(("CREATE MATERIALIZED VIEW users_by_state AS "
                             "SELECT * FROM users WHERE STATE IS NOT NULL AND username IS NOT NULL "
                             "PRIMARY KEY (state, username)"))

        return session

    def update_view(self, session, query, flush, compact=False):
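        """Execute a base-table mutation, replay batchlogs so pending view updates are applied, then optionally flush and compact."""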
        session.execute(query)
        self._replay_batchlogs()
        if flush:
            self.cluster.flush()
        if compact:
            self.cluster.compact()

    def _settle_nodes(self):
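        """Replay batchlogs and wait (up to ~5 seconds per node) for all thread pool stages to drain."""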
        logger.debug("Settling all nodes")
        stage_match = re.compile(r"(?P<name>\S+)\s+(?P<active>\d+)\s+(?P<pending>\d+)\s+(?P<completed>\d+)\s+(?P<blocked>\d+)\s+(?P<alltimeblocked>\d+)")

        def _settled_stages(node):
            (stdout, stderr, rc) = node.nodetool("tpstats")
            lines = re.split("\n+", stdout)
            for line in lines:
                match = stage_match.match(line)
                if match is not None:
                    active = int(match.group('active'))
                    pending = int(match.group('pending'))
                    if active != 0 or pending != 0:
                        logger.debug("%s - pool %s still has %d active and %d pending" % (node.name, match.group("name"), active, pending))
                        return False
            return True

        for node in self.cluster.nodelist():
            if node.is_running():
                node.nodetool("replaybatchlog")
                attempts = 50  # 100 milliseconds per attempt, so 5 seconds total
                while attempts > 0 and not _settled_stages(node):
                    time.sleep(0.1)
                    attempts -= 1

    def _build_progress_table(self):
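        """Return the name of the system table that tracks in-progress view builds; it was renamed in 4.0."""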
        if self.cluster.version() >= '4':
            return 'system.view_builds_in_progress'
        else:
            return 'system.views_builds_in_progress'

    def _wait_for_view(self, ks, view):
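        """Block until the view build has finished on every running node (up to 50 seconds per node)."""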
        logger.debug("waiting for view")

        def _view_build_finished(node):
            s = self.patient_exclusive_cql_connection(node)
            query = "SELECT * FROM %s WHERE keyspace_name='%s' AND view_name='%s'" %\
                    (self._build_progress_table(), ks, view)
            result = list(s.execute(query))
            return len(result) == 0

        for node in self.cluster.nodelist():
            if node.is_running():
                attempts = 50  # 1 sec per attempt, so 50 seconds total
                while attempts > 0 and not _view_build_finished(node):
                    time.sleep(1)
                    attempts -= 1
                if attempts <= 0:
                    raise RuntimeError("View {}.{} build not finished after 50 seconds.".format(ks, view))

    def _wait_for_view_build_start(self, session, ks, view, wait_minutes=2):
        """Wait for the start of a MV build, ensuring that it has saved some progress"""
        start = time.time()
        while True:
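            # The progress table has no row for this view until the build has
            # started and saved some progress, so loop until the COUNT(*)
            # assertion below fails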
            try:
                query = "SELECT COUNT(*) FROM %s WHERE keyspace_name='%s' AND view_name='%s'" %\
                        (self._build_progress_table(), ks, view)
                result = list(session.execute(query))
                assert 0 == result[0].count
            except AssertionError:
                break

            elapsed = (time.time() - start) / 60
            if elapsed > wait_minutes:
                self.fail("The MV build hasn't started in {} minutes.".format(wait_minutes))

    def _insert_data(self, session):
        # insert data
        insert_stmt = "INSERT INTO users (username, password, gender, state, birth_year) VALUES "
        session.execute(insert_stmt + "('user1', 'ch@ngem3a', 'f', 'TX', 1968);")
        session.execute(insert_stmt + "('user2', 'ch@ngem3b', 'm', 'CA', 1971);")
        session.execute(insert_stmt + "('user3', 'ch@ngem3c', 'f', 'FL', 1978);")
        session.execute(insert_stmt + "('user4', 'ch@ngem3d', 'm', 'TX', 1974);")
        self._settle_nodes()

    def _replay_batchlogs(self):
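        """Force batchlog replay on every running node and check that the batchlog drains completely."""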
        for node in self.cluster.nodelist():
            if node.is_running():
                logger.debug("Replaying batchlog on node {}".format(node.name))
                node.nodetool("replaybatchlog")
                # CASSANDRA-13069 - Ensure replayed mutations are removed from the batchlog
                node_session = self.patient_exclusive_cql_connection(node)
                result = list(node_session.execute("SELECT count(*) FROM system.batches;"))
                assert result[0].count == 0

    def test_create(self):
        """Test the materialized view creation"""
        session = self.prepare(user_table=True)

        result = list(session.execute(("SELECT * FROM system_schema.views "
                                       "WHERE keyspace_name='ks' AND base_table_name='users' ALLOW FILTERING")))
        assert len(result) == 1, "Expecting 1 materialized view, got " + str(result)

    def test_gcgs_validation(self):
        """Verify that it's not possible to create or set a too low gc_grace_seconds on MVs"""
        session = self.prepare(user_table=True)

        # Shouldn't be able to alter the gc_grace_seconds of the base table to 0
        assert_invalid(session,
                       "ALTER TABLE users WITH gc_grace_seconds = 0",
                       "Cannot alter gc_grace_seconds of the base table of a materialized view "
                       "to 0, since this value is used to TTL undelivered updates. Setting "
                       "gc_grace_seconds too low might cause undelivered updates to expire "
                       "before being replayed.")

        # But we can alter the gc_grace_seconds of the base table to a value != 0
        session.execute("ALTER TABLE users WITH gc_grace_seconds = 10")

        # Shouldn't be able to alter the gc_grace_seconds of the MV to 0
        assert_invalid(session,
                       "ALTER MATERIALIZED VIEW users_by_state WITH gc_grace_seconds = 0",
                       "Cannot alter gc_grace_seconds of a materialized view to 0, since "
                       "this value is used to TTL undelivered updates. Setting gc_grace_seconds "
                       "too low might cause undelivered updates to expire before being replayed.")

        # Now let's drop the MV
        session.execute("DROP MATERIALIZED VIEW ks.users_by_state;")

        # Now we should be able to set the gc_grace_seconds of the base table to 0
        session.execute("ALTER TABLE users WITH gc_grace_seconds = 0")

        # Now we shouldn't be able to create a new MV on this table
        assert_invalid(session,
                       "CREATE MATERIALIZED VIEW users_by_state AS "
                       "SELECT * FROM users WHERE STATE IS NOT NULL AND username IS NOT NULL "
                       "PRIMARY KEY (state, username)",
                       "Cannot create materialized view 'users_by_state' for base table 'users' "
                       "with gc_grace_seconds of 0, since this value is used to TTL undelivered "
                       "updates. Setting gc_grace_seconds too low might cause undelivered updates"
                       " to expire before being replayed.")

    def test_insert(self):
        """Test basic insertions"""
        session = self.prepare(user_table=True)

        self._insert_data(session)

        result = list(session.execute("SELECT * FROM users;"))
        assert len(result) == 4, "Expecting {} users, got {}".format(4, len(result))

        result = list(session.execute("SELECT * FROM users_by_state WHERE state='TX';"))
        assert len(result) == 2, "Expecting {} users, got {}".format(2, len(result))

        result = list(session.execute("SELECT * FROM users_by_state WHERE state='CA';"))
        assert len(result) == 1, "Expecting {} users, got {}".format(1, len(result))

        result = list(session.execute("SELECT * FROM users_by_state WHERE state='MA';"))
        assert len(result) == 0, "Expecting {} users, got {}".format(0, len(result))

    def test_populate_mv_after_insert(self):
        """Test that a view is OK when created with existing data"""
        session = self.prepare(consistency_level=ConsistencyLevel.QUORUM)

        session.execute("CREATE TABLE t (id int PRIMARY KEY, v int)")

        for i in range(1000):
            session.execute("INSERT INTO t (id, v) VALUES ({v}, {v})".format(v=i))

        session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t WHERE v IS NOT NULL "
                         "AND id IS NOT NULL PRIMARY KEY (v, id)"))

        logger.debug("wait for view to build")
        self._wait_for_view("ks", "t_by_v")

        logger.debug("wait until all batchlogs are replayed")
        self._replay_batchlogs()

        for i in range(1000):
            assert_one(session, "SELECT * FROM t_by_v WHERE v = {}".format(i), [i, i])

    def test_populate_mv_after_insert_wide_rows(self):
        """Test that a view is OK when created with existing data with wide rows"""
        session = self.prepare(consistency_level=ConsistencyLevel.QUORUM)

        session.execute("CREATE TABLE t (id int, v int, PRIMARY KEY (id, v))")

        for i in range(5):
            for j in range(10000):
                session.execute("INSERT INTO t (id, v) VALUES ({}, {})".format(i, j))

        session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t WHERE v IS NOT NULL "
                         "AND id IS NOT NULL PRIMARY KEY (v, id)"))

        logger.debug("wait for view to build")
        self._wait_for_view("ks", "t_by_v")

        logger.debug("wait until all batchlogs are replayed")
        self._replay_batchlogs()
        for i in range(5):
            for j in range(10000):
                assert_one(session, "SELECT * FROM t_by_v WHERE id = {} AND v = {}".format(i, j), [j, i])

    def test_crc_check_chance(self):
        """Test that crc_check_chance parameter is properly populated after mv creation and update"""
        session = self.prepare()

        session.execute("CREATE TABLE t (id int PRIMARY KEY, v int)")
        session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t WHERE v IS NOT NULL "
                         "AND id IS NOT NULL PRIMARY KEY (v, id) WITH crc_check_chance = 0.5"))

        assert_crc_check_chance_equal(session, "t_by_v", 0.5, view=True)

        session.execute("ALTER MATERIALIZED VIEW t_by_v WITH crc_check_chance = 0.3")

        assert_crc_check_chance_equal(session, "t_by_v", 0.3, view=True)

    def test_prepared_statement(self):
        """Test basic insertions with prepared statement"""
        session = self.prepare(user_table=True)

        insertPrepared = session.prepare(
            "INSERT INTO users (username, password, gender, state, birth_year) VALUES (?, ?, ?, ?, ?);"
        )
        selectPrepared = session.prepare(
            "SELECT state, password, session_token FROM users_by_state WHERE state=?;"
        )

        # insert data
        session.execute(insertPrepared.bind(('user1', 'ch@ngem3a', 'f', 'TX', 1968)))
        session.execute(insertPrepared.bind(('user2', 'ch@ngem3b', 'm', 'CA', 1971)))
        session.execute(insertPrepared.bind(('user3', 'ch@ngem3c', 'f', 'FL', 1978)))
        session.execute(insertPrepared.bind(('user4', 'ch@ngem3d', 'm', 'TX', 1974)))

        result = list(session.execute("SELECT * FROM users;"))
        assert len(result) == 4, "Expecting {} users, got {}".format(4, len(result))

        result = list(session.execute(selectPrepared.bind(['TX'])))
        assert len(result) == 2, "Expecting {} users, got {}".format(2, len(result))

        result = list(session.execute(selectPrepared.bind(['CA'])))
        assert len(result) == 1, "Expecting {} users, got {}".format(1, len(result))

        result = list(session.execute(selectPrepared.bind(['MA'])))
        assert len(result) == 0, "Expecting {} users, got {}".format(0, len(result))

    def test_immutable(self):
        """Test that a materialized view is immutable"""
        session = self.prepare(user_table=True)

        # cannot insert
        assert_invalid(session, "INSERT INTO users_by_state (state, username) VALUES ('TX', 'user1');",
                       "Cannot directly modify a materialized view")

        # cannot update
        assert_invalid(session, "UPDATE users_by_state SET session_token='XYZ' WHERE username='user1' AND state = 'TX';",
                       "Cannot directly modify a materialized view")

        # cannot delete a row
        assert_invalid(session, "DELETE from users_by_state where state='TX';",
                       "Cannot directly modify a materialized view")

        # cannot delete a cell
        assert_invalid(session, "DELETE session_token from users_by_state where state='TX';",
                       "Cannot directly modify a materialized view")

        # cannot alter a table
        assert_invalid(session, "ALTER TABLE users_by_state ADD first_name varchar",
                       "Cannot use ALTER TABLE on Materialized View")

    def test_drop_mv(self):
        """Test that we can drop a view properly"""
        session = self.prepare(user_table=True)

        # create another materialized view
        session.execute(("CREATE MATERIALIZED VIEW users_by_birth_year AS "
                         "SELECT * FROM users WHERE birth_year IS NOT NULL AND "
                         "username IS NOT NULL PRIMARY KEY (birth_year, username)"))

        result = list(session.execute(("SELECT * FROM system_schema.views "
                                       "WHERE keyspace_name='ks' AND base_table_name='users' ALLOW FILTERING")))
        assert len(result) == 2, "Expecting {} materialized views, got {}".format(2, len(result))

        session.execute("DROP MATERIALIZED VIEW ks.users_by_state;")

        result = list(session.execute(("SELECT * FROM system_schema.views "
                                       "WHERE keyspace_name='ks' AND base_table_name='users' ALLOW FILTERING")))
        assert len(result) == 1, "Expecting {} materialized view, got {}".format(1, len(result))

    def test_drop_column(self):
        """Test that we cannot drop a column if it is used by a MV"""
        session = self.prepare(user_table=True)

        result = list(session.execute(("SELECT * FROM system_schema.views "
                                       "WHERE keyspace_name='ks' AND base_table_name='users' ALLOW FILTERING")))
        assert len(result) == 1, "Expecting {} materialized view, got {}".format(1, len(result))

        assert_invalid(
            session,
            "ALTER TABLE ks.users DROP state;",
            "Cannot drop column state on base table with materialized views."
        )

    def test_drop_table(self):
        """Test that we cannot drop a table without deleting its MVs first"""
        session = self.prepare(user_table=True)

        result = list(session.execute(("SELECT * FROM system_schema.views "
                                       "WHERE keyspace_name='ks' AND base_table_name='users' ALLOW FILTERING")))
        assert len(result) == 1, "Expecting {} materialized view, got {}".format(1, len(result))

        assert_invalid(
            session,
            "DROP TABLE ks.users;",
            "Cannot drop table when materialized views still depend on it"
        )

        result = list(session.execute(("SELECT * FROM system_schema.views "
                                       "WHERE keyspace_name='ks' AND base_table_name='users' ALLOW FILTERING")))
        assert len(result) == 1, "Expecting {} materialized view, got {}".format(1, len(result))

        session.execute("DROP MATERIALIZED VIEW ks.users_by_state;")
        session.execute("DROP TABLE ks.users;")

        result = list(session.execute(("SELECT * FROM system_schema.views "
                                       "WHERE keyspace_name='ks' AND base_table_name='users' ALLOW FILTERING")))
        assert len(result) == 0, "Expecting {} materialized views, got {}".format(0, len(result))

    def test_clustering_column(self):
        """Test that we can use clustering columns as primary key for a materialized view"""
        session = self.prepare(consistency_level=ConsistencyLevel.QUORUM)

        session.execute(("CREATE TABLE users (username varchar, password varchar, gender varchar, "
                         "session_token varchar, state varchar, birth_year bigint, "
                         "PRIMARY KEY (username, state, birth_year));"))

        # create a materialized view that uses a compound key
        session.execute(("CREATE MATERIALIZED VIEW users_by_state_birth_year "
                         "AS SELECT * FROM users WHERE state IS NOT NULL AND birth_year IS NOT NULL "
                         "AND username IS NOT NULL PRIMARY KEY (state, birth_year, username)"))

        session.cluster.control_connection.wait_for_schema_agreement()

        self._insert_data(session)

        result = list(session.execute("SELECT * FROM ks.users_by_state_birth_year WHERE state='TX'"))
        assert len(result) == 2, "Expecting {} users, got {}".format(2, len(result))

        result = list(session.execute("SELECT * FROM ks.users_by_state_birth_year WHERE state='TX' AND birth_year=1968"))
        assert len(result) == 1, "Expecting {} users, got {}".format(1, len(result))

    def _add_dc_after_mv_test(self, rf, nts):
        """
        @jira_ticket CASSANDRA-10978

        Add datacenter with configurable replication.
        """

        session = self.prepare(rf=rf)

        logger.debug("Creating schema")
        session.execute("CREATE TABLE t (id int PRIMARY KEY, v int)")
        session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
                         "WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v, id)"))

        logger.debug("Writing 1k to base")
        for i in range(1000):
            session.execute("INSERT INTO t (id, v) VALUES ({id}, {v})".format(id=i, v=-i))

        logger.debug("Reading 1k from view")
        for i in range(1000):
            assert_one(session, "SELECT * FROM t_by_v WHERE v = {}".format(-i), [-i, i])

        logger.debug("Reading 1k from base")
        for i in range(1000):
            assert_one(session, "SELECT * FROM t WHERE id = {}".format(i), [i, -i])

        logger.debug("Bootstrapping new node in another dc")
        node4 = new_node(self.cluster, data_center='dc2')
        node4.start(wait_other_notice=True, wait_for_binary_proto=True, jvm_args=["-Dcassandra.migration_task_wait_in_seconds={}".format(MIGRATION_WAIT)])

        logger.debug("Bootstrapping second new node in another dc")
        node5 = new_node(self.cluster, remote_debug_port='1414', data_center='dc2')
        node5.start(jvm_args=["-Dcassandra.migration_task_wait_in_seconds={}".format(MIGRATION_WAIT)], wait_other_notice=True, wait_for_binary_proto=True)
        if nts:
            session.execute("alter keyspace ks with replication = {'class':'NetworkTopologyStrategy', 'dc1':1, 'dc2':1}")
            session.execute("alter keyspace system_auth with replication = {'class':'NetworkTopologyStrategy', 'dc1':1, 'dc2':1}")
            session.execute("alter keyspace system_traces with replication = {'class':'NetworkTopologyStrategy', 'dc1':1, 'dc2':1}")
            node4.nodetool('rebuild dc1')
            node5.nodetool('rebuild dc1')

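        # A DC can't be added to a keyspace's replication before it has any nodes,
        # so the keyspaces are only switched to NetworkTopologyStrategy above,
        # after dc2 is populated; reads against the new DC therefore run at
        # LOCAL_ONE rather than ONE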
        cl = ConsistencyLevel.LOCAL_ONE if nts else ConsistencyLevel.ONE
        session2 = self.patient_exclusive_cql_connection(node4, consistency_level=cl)

        logger.debug("Verifying data from new node in view")
        for i in range(1000):
            assert_one(session2, "SELECT * FROM ks.t_by_v WHERE v = {}".format(-i), [-i, i])

        logger.debug("Inserting 100 into base")
        for i in range(1000, 1100):
            session.execute("INSERT INTO t (id, v) VALUES ({id}, {v})".format(id=i, v=-i))

        logger.debug("Verify 100 in view")
        for i in range(1000, 1100):
            assert_one(session, "SELECT * FROM t_by_v WHERE v = {}".format(-i), [-i, i])

    @pytest.mark.resource_intensive
    def test_add_dc_after_mv_simple_replication(self):
        """
        @jira_ticket CASSANDRA-10634

        Test that materialized views work as expected when adding a datacenter with SimpleStrategy.
        """

        self._add_dc_after_mv_test(1, False)

    @pytest.mark.resource_intensive
    def test_add_dc_after_mv_network_replication(self):
        """
        @jira_ticket CASSANDRA-10634

        Test that materialized views work as expected when adding a datacenter with NetworkTopologyStrategy.
        """

        self._add_dc_after_mv_test({'dc1': 1}, True)

    @pytest.mark.resource_intensive
    def test_add_node_after_mv(self):
        """
        @jira_ticket CASSANDRA-10978

        Test that materialized views work as expected when adding a node.
        """

        session = self.prepare()

        session.execute("CREATE TABLE t (id int PRIMARY KEY, v int)")
        session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
                         "WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v, id)"))

        for i in range(1000):
            session.execute("INSERT INTO t (id, v) VALUES ({id}, {v})".format(id=i, v=-i))

        for i in range(1000):
            assert_one(session, "SELECT * FROM t_by_v WHERE v = {}".format(-i), [-i, i])

        node4 = new_node(self.cluster)
        node4.start(wait_for_binary_proto=True, jvm_args=["-Dcassandra.migration_task_wait_in_seconds={}".format(MIGRATION_WAIT)])

        session2 = self.patient_exclusive_cql_connection(node4)

        # CASSANDRA-12984: assert that MVs are marked as built after bootstrap,
        # otherwise newly streamed MVs will be built again
        assert_one(session2, "SELECT count(*) FROM system.built_views WHERE keyspace_name = 'ks' AND view_name = 't_by_v'", [1])

        for i in range(1000):
            assert_one(session2, "SELECT * FROM ks.t_by_v WHERE v = {}".format(-i), [-i, i])

        for i in range(1000, 1100):
            session.execute("INSERT INTO t (id, v) VALUES ({id}, {v})".format(id=i, v=-i))

        for i in range(1000, 1100):
            assert_one(session, "SELECT * FROM t_by_v WHERE v = {}".format(-i), [-i, i])

    def test_insert_during_range_movement_rf1(self):
        self._base_test_insert_during_range_movement(rf=1)

    def test_insert_during_range_movement_rf2(self):
        self._base_test_insert_during_range_movement(rf=2)

    def test_insert_during_range_movement_rf3(self):
        self._base_test_insert_during_range_movement(rf=3)

    def _base_test_insert_during_range_movement(self, rf):
        """
        @jira_ticket CASSANDRA-14251

        Test that materialized view replication works in the middle of a join
        for different replication factors.
        """

        session = self.prepare(rf=rf)

        logger.debug("Creating table and view")

        session.execute("CREATE TABLE t (id int PRIMARY KEY, v int)")
        session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
                         "WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v, id)"))

        logger.debug("Starting new node4 in write survey mode")
        node4 = new_node(self.cluster)
        # Set batchlog.replay_timeout_in_ms=1 so we can ensure the batchlog will be replayed below
        node4.start(wait_for_binary_proto=True, jvm_args=["-Dcassandra.write_survey=true",
                                                          "-Dcassandra.batchlog.replay_timeout_in_ms=1"])

        logger.debug("Insert data while node4 is joining")

        for i in range(1000):
            session.execute("INSERT INTO t (id, v) VALUES ({id}, {v})".format(id=i, v=-i))

        logger.debug("Finish joining node4")
        node4.nodetool("join")

        logger.debug('Replay batchlogs')
        time.sleep(0.001)  # wait out batchlog.replay_timeout_in_ms=1 (ms)
        self._replay_batchlogs()

        logger.debug("Verify data")
        for i in range(1000):
            assert_one(session, "SELECT * FROM t_by_v WHERE v = {}".format(-i), [-i, i])

    @pytest.mark.resource_intensive
    def test_add_node_after_wide_mv_with_range_deletions(self):
        """
        @jira_ticket CASSANDRA-11670

        Test that wide materialized views work as expected when adding a node.
        """

        session = self.prepare()

        session.execute("CREATE TABLE t (id int, v int, PRIMARY KEY (id, v)) WITH compaction = { 'class': 'SizeTieredCompactionStrategy', 'enabled': 'false' }")
        session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
                         "WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v, id)"))

        for i in range(10):
            for j in range(100):
                session.execute("INSERT INTO t (id, v) VALUES ({id}, {v})".format(id=i, v=j))

        self.cluster.flush()

        for i in range(10):
            for j in range(100):
                assert_one(session, "SELECT * FROM t WHERE id = {} and v = {}".format(i, j), [i, j])
                assert_one(session, "SELECT * FROM t_by_v WHERE id = {} and v = {}".format(i, j), [j, i])

        for i in range(10):
            for j in range(100):
                if j % 10 == 0:
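                    # range-delete v in [j, j+2), i.e. rows j and j+1 of every block of ten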
                    session.execute("DELETE FROM t WHERE id = {} AND v >= {} and v < {}".format(i, j, j + 2))

        self.cluster.flush()

        for i in range(10):
            for j in range(100):
                if j % 10 == 0 or (j - 1) % 10 == 0:
                    assert_none(session, "SELECT * FROM t WHERE id = {} and v = {}".format(i, j))
                    assert_none(session, "SELECT * FROM t_by_v WHERE id = {} and v = {}".format(i, j))
                else:
                    assert_one(session, "SELECT * FROM t WHERE id = {} and v = {}".format(i, j), [i, j])
                    assert_one(session, "SELECT * FROM t_by_v WHERE id = {} and v = {}".format(i, j), [j, i])

        node4 = new_node(self.cluster)
        node4.set_configuration_options(values={'max_mutation_size_in_kb': 20})  # CASSANDRA-11670
        logger.debug("Start join at {}".format(time.strftime("%H:%M:%S")))
        node4.start(wait_for_binary_proto=True, jvm_args=["-Dcassandra.migration_task_wait_in_seconds={}".format(MIGRATION_WAIT)])

        session2 = self.patient_exclusive_cql_connection(node4)

        for i in range(10):
            for j in range(100):
                if j % 10 == 0 or (j - 1) % 10 == 0:
                    assert_none(session2, "SELECT * FROM ks.t WHERE id = {} and v = {}".format(i, j))
                    assert_none(session2, "SELECT * FROM ks.t_by_v WHERE id = {} and v = {}".format(i, j))
                else:
                    assert_one(session2, "SELECT * FROM ks.t WHERE id = {} and v = {}".format(i, j), [i, j])
                    assert_one(session2, "SELECT * FROM ks.t_by_v WHERE id = {} and v = {}".format(i, j), [j, i])

        for i in range(10):
            for j in range(100, 110):
                session.execute("INSERT INTO t (id, v) VALUES ({id}, {v})".format(id=i, v=j))

        for i in range(10):
            for j in range(110):
                if j < 100 and (j % 10 == 0 or (j - 1) % 10 == 0):
                    assert_none(session2, "SELECT * FROM ks.t WHERE id = {} and v = {}".format(i, j))
                    assert_none(session2, "SELECT * FROM ks.t_by_v WHERE id = {} and v = {}".format(i, j))
                else:
                    assert_one(session2, "SELECT * FROM ks.t WHERE id = {} and v = {}".format(i, j), [i, j])
                    assert_one(session2, "SELECT * FROM ks.t_by_v WHERE id = {} and v = {}".format(i, j), [j, i])

    @pytest.mark.resource_intensive
    def test_add_node_after_very_wide_mv(self):
        """
        @jira_ticket CASSANDRA-11670

        Test that very wide materialized views work as expected when adding a node.
        """

        session = self.prepare()

        session.execute("CREATE TABLE t (id int, v int, PRIMARY KEY (id, v))")
        session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
                         "WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v, id)"))

        for i in range(5):
            for j in range(5000):
                session.execute("INSERT INTO t (id, v) VALUES ({id}, {v})".format(id=i, v=j))

        self.cluster.flush()

        for i in range(5):
            for j in range(5000):
                assert_one(session, "SELECT * FROM t_by_v WHERE id = {} and v = {}".format(i, j), [j, i])

        node4 = new_node(self.cluster)
        node4.set_configuration_options(values={'max_mutation_size_in_kb': 20})  # CASSANDRA-11670
        logger.debug("Start join at {}".format(time.strftime("%H:%M:%S")))
        node4.start(wait_for_binary_proto=True, jvm_args=["-Dcassandra.migration_task_wait_in_seconds={}".format(MIGRATION_WAIT)])

        session2 = self.patient_exclusive_cql_connection(node4)

        for i in range(5):
            for j in range(5000):
                assert_one(session2, "SELECT * FROM ks.t_by_v WHERE id = {} and v = {}".format(i, j), [j, i])

        for i in range(5):
            for j in range(5100):
                session.execute("INSERT INTO t (id, v) VALUES ({id}, {v})".format(id=i, v=j))

        for i in range(5):
            for j in range(5100):
                assert_one(session, "SELECT * FROM t_by_v WHERE id = {} and v = {}".format(i, j), [j, i])

    @pytest.mark.resource_intensive
    def test_add_write_survey_node_after_mv(self):
        """
        @jira_ticket CASSANDRA-10621
        @jira_ticket CASSANDRA-10978

        Test that materialized views work as expected when adding a node in write survey mode.
        """

        session = self.prepare()

        session.execute("CREATE TABLE t (id int PRIMARY KEY, v int)")
        session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
                         "WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v, id)"))

        for i in range(1000):
            session.execute("INSERT INTO t (id, v) VALUES ({id}, {v})".format(id=i, v=-i))

        for i in range(1000):
            assert_one(session, "SELECT * FROM t_by_v WHERE v = {}".format(-i), [-i, i])

        node4 = new_node(self.cluster)
        node4.start(wait_for_binary_proto=True, jvm_args=["-Dcassandra.write_survey=true", "-Dcassandra.migration_task_wait_in_seconds={}".format(MIGRATION_WAIT)])

        for i in range(1000, 1100):
            session.execute("INSERT INTO t (id, v) VALUES ({id}, {v})".format(id=i, v=-i))

        for i in range(1100):
            assert_one(session, "SELECT * FROM t_by_v WHERE v = {}".format(-i), [-i, i])

    def test_allow_filtering(self):
        """Test that allow filtering works as usual for a materialized view"""
        session = self.prepare()

        session.execute("CREATE TABLE t (id int PRIMARY KEY, v int, v2 text, v3 decimal)")
        session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
                         "WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v, id)"))
        session.execute(("CREATE MATERIALIZED VIEW t_by_v2 AS SELECT * FROM t "
                         "WHERE v2 IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v2, id)"))

        for i in range(1000):
            session.execute("INSERT INTO t (id, v, v2, v3) VALUES ({v}, {v}, 'a', 3.0)".format(v=i))

        for i in range(1000):
            assert_one(session, "SELECT * FROM t_by_v WHERE v = {v}".format(v=i), [i, i, 'a', 3.0])

        rows = list(session.execute("SELECT * FROM t_by_v2 WHERE v2 = 'a'"))
        assert len(rows) == 1000, "Expected 1000 rows but got {}".format(len(rows))

        assert_invalid(session, "SELECT * FROM t_by_v WHERE v = 1 AND v2 = 'a'")
        assert_invalid(session, "SELECT * FROM t_by_v2 WHERE v2 = 'a' AND v = 1")

        for i in range(1000):
            assert_one(
                session,
                "SELECT * FROM t_by_v WHERE v = {} AND v3 = 3.0 ALLOW FILTERING".format(i),
                [i, i, 'a', 3.0]
            )
            assert_one(
                session,
                "SELECT * FROM t_by_v2 WHERE v2 = 'a' AND v = {} ALLOW FILTERING".format(i),
                ['a', i, i, 3.0]
            )

    def test_secondary_index(self):
        """Test that secondary indexes cannot be created on a materialized view"""
        session = self.prepare()

        session.execute("CREATE TABLE t (id int PRIMARY KEY, v int, v2 text, v3 decimal)")
        session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
                         "WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v, id)"))
        assert_invalid(session, "CREATE INDEX ON t_by_v (v2)",
                       "Secondary indexes are not supported on materialized views")

    def test_ttl(self):
        """
        Test that TTL works as expected for a materialized view
        @expected_result The TTL is propagated properly between tables.
        """
        session = self.prepare()
        session.execute("CREATE TABLE t (id int PRIMARY KEY, v int, v2 int, v3 int)")
        session.execute(("CREATE MATERIALIZED VIEW t_by_v2 AS SELECT * FROM t "
                         "WHERE v2 IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v2, id)"))

        for i in range(100):
            session.execute("INSERT INTO t (id, v, v2, v3) VALUES ({v}, {v}, {v}, {v}) USING TTL 10".format(v=i))

        for i in range(100):
            assert_one(session, "SELECT * FROM t_by_v2 WHERE v2 = {}".format(i), [i, i, i, i])

        time.sleep(20)

        rows = list(session.execute("SELECT * FROM t_by_v2"))
        assert len(rows) == 0, "Expected 0 rows but got {}".format(len(rows))

    def test_query_all_new_column(self):
        """
        Test that a materialized view created with a 'SELECT *' works as expected when adding a new column
        @expected_result The new column is present in the view.
        """
        session = self.prepare(user_table=True)

        self._insert_data(session)

        assert_one(
            session,
            "SELECT * FROM users_by_state WHERE state = 'TX' AND username = 'user1'",
            ['TX', 'user1', 1968, 'f', 'ch@ngem3a', None]
        )

        session.execute("ALTER TABLE users ADD first_name varchar;")

        results = list(session.execute("SELECT * FROM users_by_state WHERE state = 'TX' AND username = 'user1'"))
        assert len(results) == 1
        assert hasattr(results[0], 'first_name'), 'Column "first_name" not found'
        assert_one(
            session,
            "SELECT * FROM users_by_state WHERE state = 'TX' AND username = 'user1'",
            ['TX', 'user1', 1968, None, 'f', 'ch@ngem3a', None]
        )

    def test_query_new_column(self):
        """
        Test that a materialized view created with 'SELECT <col1, ...>' works as expected when adding a new column
        @expected_result The new column is not present in the view.
        """
        session = self.prepare(user_table=True)

        session.execute(("CREATE MATERIALIZED VIEW users_by_state2 AS SELECT username FROM users "
                         "WHERE STATE IS NOT NULL AND USERNAME IS NOT NULL PRIMARY KEY (state, username)"))

        self._insert_data(session)

        assert_one(
            session,
            "SELECT * FROM users_by_state2 WHERE state = 'TX' AND username = 'user1'",
            ['TX', 'user1']
        )

        session.execute("ALTER TABLE users ADD first_name varchar;")

        results = list(session.execute("SELECT * FROM users_by_state2 WHERE state = 'TX' AND username = 'user1'"))
        assert len(results) == 1
        assert not hasattr(results[0], 'first_name'), 'Column "first_name" found in view'
        assert_one(
            session,
            "SELECT * FROM users_by_state2 WHERE state = 'TX' AND username = 'user1'",
            ['TX', 'user1']
        )

    def test_rename_column(self):
        """
        Test that a materialized view created with a 'SELECT *' works as expected when renaming a column
        @expected_result The column is also renamed in the view.
        """
        session = self.prepare(user_table=True)

        self._insert_data(session)

        assert_one(
            session,
            "SELECT * FROM users_by_state WHERE state = 'TX' AND username = 'user1'",
            ['TX', 'user1', 1968, 'f', 'ch@ngem3a', None]
        )

        session.execute("ALTER TABLE users RENAME username TO user")

        results = list(session.execute("SELECT * FROM users_by_state WHERE state = 'TX' AND user = 'user1'"))
        assert len(results) == 1
        assert hasattr(results[0], 'user'), 'Column "user" not found'
        assert_one(
            session,
            "SELECT state, user, birth_year, gender FROM users_by_state WHERE state = 'TX' AND user = 'user1'",
            ['TX', 'user1', 1968, 'f']
        )

    def test_rename_column_atomicity(self):
        """
        Test that column renaming is atomically done between a table and its materialized views
        @jira_ticket CASSANDRA-12952
        """
        session = self.prepare(nodes=1, user_table=True, install_byteman=True)
        node = self.cluster.nodelist()[0]

        self._insert_data(session)

        assert_one(
            session,
            "SELECT * FROM users_by_state WHERE state = 'TX' AND username = 'user1'",
            ['TX', 'user1', 1968, 'f', 'ch@ngem3a', None]
        )

        # Rename a column with an injected byteman rule to kill the node after the first schema update
        self.fixture_dtest_setup.allow_log_errors = True
        script_version = '4x' if self.cluster.version() >= '4' else '3x'
        node.byteman_submit(['./byteman/merge_schema_failure_{}.btm'.format(script_version)])
        with pytest.raises(NoHostAvailable):
            session.execute("ALTER TABLE users RENAME username TO user")

        logger.debug('Restarting node')
        node.stop()
        node.start(wait_for_binary_proto=True)
        session = self.patient_cql_connection(node, consistency_level=ConsistencyLevel.ONE)

        # Both the table and its view should have the new schema after restart
        assert_one(
            session,
            "SELECT * FROM ks.users WHERE state = 'TX' AND user = 'user1' ALLOW FILTERING",
            ['user1', 1968, 'f', 'ch@ngem3a', None, 'TX']
        )
        assert_one(
            session,
            "SELECT * FROM ks.users_by_state WHERE state = 'TX' AND user = 'user1'",
            ['TX', 'user1', 1968, 'f', 'ch@ngem3a', None]
        )

    def test_lwt(self):
        """Test that lightweight transactions behave properly with a materialized view"""
        session = self.prepare()

        session.execute("CREATE TABLE t (id int PRIMARY KEY, v int, v2 text, v3 decimal)")
        session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
                         "WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v, id)"))

        logger.debug("Inserting initial data using IF NOT EXISTS")
        for i in range(1000):
            session.execute(
                "INSERT INTO t (id, v, v2, v3) VALUES ({v}, {v}, 'a', 3.0) IF NOT EXISTS".format(v=i)
            )
        self._replay_batchlogs()

        logger.debug("All rows should have been inserted")
        for i in range(1000):
            assert_one(
                session,
                "SELECT * FROM t_by_v WHERE v = {}".format(i),
                [i, i, 'a', 3.0]
            )

        logger.debug("Trying to upsert data with a different value using IF NOT EXISTS")
        for i in range(1000):
            v = i * 2
            session.execute(
                "INSERT INTO t (id, v, v2, v3) VALUES ({id}, {v}, 'a', 3.0) IF NOT EXISTS".format(id=i, v=v)
            )
        self._replay_batchlogs()

        logger.debug("No rows should have changed")
        for i in range(1000):
            assert_one(
                session,
                "SELECT * FROM t_by_v WHERE v = {}".format(i),
                [i, i, 'a', 3.0]
            )

        logger.debug("Update the first 10 rows with a different value")
        for i in range(1000):
            v = i + 2000
            session.execute(
                "UPDATE t SET v={v} WHERE id = {id} IF v < 10".format(id=i, v=v)
            )
        self._replay_batchlogs()

        logger.debug("Verify that only the first 10 rows changed.")
        results = list(session.execute("SELECT * FROM t_by_v;"))
        assert len(results) == 1000
        for i in range(1000):
            v = i + 2000 if i < 10 else i
            assert_one(
                session,
                "SELECT * FROM t_by_v WHERE v = {}".format(v),
                [v, i, 'a', 3.0]
            )

        logger.debug("Deleting the first 10 rows")
        for i in range(1000):
            v = i + 2000
            session.execute(
                "DELETE FROM t WHERE id = {id} IF v = {v} ".format(id=i, v=v)
            )
        self._replay_batchlogs()

        logger.debug("Verify that only the first 10 rows have been deleted.")
        results = list(session.execute("SELECT * FROM t_by_v;"))
        assert len(results) == 990
        for i in range(10, 1000):
            assert_one(
                session,
                "SELECT * FROM t_by_v WHERE v = {}".format(i),
                [i, i, 'a', 3.0]
            )

    def test_interrupt_build_process(self):
        """Test that an interrupted MV build process is resumed as it should"""

        options = {'hinted_handoff_enabled': False}
        if self.cluster.version() >= '4':
            options['concurrent_materialized_view_builders'] = 4

        session = self.prepare(options=options, install_byteman=True)
        node1, node2, node3 = self.cluster.nodelist()

        logger.debug("Avoid premature MV build finalization with byteman")
        for node in self.cluster.nodelist():
            if self.cluster.version() >= '4':
                node.byteman_submit(['./byteman/4.0/skip_view_build_finalization.btm'])
                node.byteman_submit(['./byteman/4.0/skip_view_build_task_finalization.btm'])
            else:
                node.byteman_submit(['./byteman/pre4.0/skip_finish_view_build_status.btm'])
                node.byteman_submit(['./byteman/pre4.0/skip_view_build_update_distributed.btm'])

        session.execute("CREATE TABLE t (id int PRIMARY KEY, v int, v2 text, v3 decimal)")

        logger.debug("Inserting initial data")
        for i in range(10000):
            session.execute(
                "INSERT INTO t (id, v, v2, v3) VALUES ({v}, {v}, 'a', 3.0) IF NOT EXISTS".format(v=i)
            )

        logger.debug("Create a MV")
        session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
                         "WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v, id)"))

        logger.debug("Wait and ensure the MV build has started. Waiting up to 2 minutes.")
        self._wait_for_view_build_start(session, 'ks', 't_by_v', wait_minutes=2)

        logger.debug("Stop the cluster. Interrupt the MV build process.")
        self.cluster.stop()

        logger.debug("Checking logs to verify that the view build tasks have been created")
        for node in self.cluster.nodelist():
            assert node.grep_log('Starting new view build', filename='debug.log')
            assert not node.grep_log('Resuming view build', filename='debug.log')
            node.mark_log(filename='debug.log')

        logger.debug("Restart the cluster")
        self.cluster.start(wait_for_binary_proto=True)
        session = self.patient_cql_connection(node1)
        session.execute("USE ks")

        logger.debug("MV shouldn't be fully built yet.")
        assert list(session.execute("SELECT COUNT(*) FROM t_by_v"))[0].count != 10000

        logger.debug("Wait and ensure the MV build resumed. Waiting up to 2 minutes.")
        self._wait_for_view("ks", "t_by_v")

        logger.debug("Verify all data")
        assert_one(session, "SELECT COUNT(*) FROM t_by_v", [10000])
        for i in range(10000):
            assert_one(
                session,
                "SELECT * FROM t_by_v WHERE v = {}".format(i),
                [i, i, 'a', 3.0],
                cl=ConsistencyLevel.ALL
            )

        logger.debug("Checking logs to verify that some view build tasks have been resumed")
        for node in self.cluster.nodelist():
            assert node.grep_log('Resuming view build', filename='debug.log')

    @pytest.mark.skip(reason="Frequently fails in CI. Skipping until fixed as tracked by CASSANDRA-14148")
    @since('4.0')
    def test_drop_while_building(self):
        """Test that a parallel MV build is interrupted when the view is removed"""

        session = self.prepare(options={'concurrent_materialized_view_builders': 4}, install_byteman=True)
        session.execute("CREATE TABLE t (id int PRIMARY KEY, v int, v2 text, v3 decimal)")

        logger.debug("Inserting initial data")
        for i in range(5000):
            session.execute("INSERT INTO t (id, v, v2, v3) VALUES ({v}, {v}, 'a', 3.0) IF NOT EXISTS".format(v=i))

        logger.debug("Slowing down MV build with byteman")
        for node in self.cluster.nodelist():
            node.byteman_submit(['./byteman/4.0/view_builder_task_sleep.btm'])

        logger.debug("Create a MV")
        session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
                         "WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v, id)"))

        logger.debug("Wait and ensure the MV build has started. Waiting up to 2 minutes.")
        self._wait_for_view_build_start(session, 'ks', 't_by_v', wait_minutes=2)

        logger.debug("Drop the MV while it is still building")
        session.execute("DROP MATERIALIZED VIEW t_by_v")

        logger.debug("Verify that the build has been stopped before its finalization without errors")
        for node in self.cluster.nodelist():
            self.check_logs_for_errors()
            assert not node.grep_log('Marking view', filename='debug.log')
            assert node.grep_log('Stopping current view builder due to schema change', filename='debug.log')

        logger.debug("Verify that the view has been removed")
        failed = False
        try:
            session.execute("SELECT COUNT(*) FROM t_by_v")
        except InvalidRequest:
            failed = True
        assert failed, "The view shouldn't be queryable"

        logger.debug("Create the MV again")
        session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
                         "WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v, id)"))

        logger.debug("Verify that the MV has been successfully created")
        self._wait_for_view('ks', 't_by_v')
        assert_one(session, "SELECT COUNT(*) FROM t_by_v", [5000])

    @since('4.0')
    def test_drop_with_stopped_build(self):
        """Test that an MV whose build has been stopped with `nodetool stop` can be dropped"""

        session = self.prepare(options={'concurrent_materialized_view_builders': 4}, install_byteman=True)
        session.execute("CREATE TABLE t (id int PRIMARY KEY, v int, v2 text, v3 decimal)")
        nodes = self.cluster.nodelist()

        logger.debug("Inserting initial data")
        for i in range(5000):
            session.execute("INSERT INTO t (id, v, v2, v3) VALUES ({v}, {v}, 'a', 3.0) IF NOT EXISTS".format(v=i))

        logger.debug("Slowing down MV build with byteman")
        for node in nodes:
            node.byteman_submit(['./byteman/4.0/view_builder_task_sleep.btm'])

        logger.debug("Create a MV")
        session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
                         "WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v, id)"))

        logger.debug("Wait and ensure the MV build has started. Waiting up to 2 minutes.")
        self._wait_for_view_build_start(session, 'ks', 't_by_v', wait_minutes=2)

        logger.debug("Stopping all running view build tasks with nodetool")
        for node in nodes:
            node.watch_log_for('Starting new view build for range', filename='debug.log', timeout=120)
            node.nodetool('stop VIEW_BUILD')

        logger.debug("Checking logs to verify that some view build tasks have been stopped")
        for node in nodes:
            node.watch_log_for('Stopped build for view', filename='debug.log', timeout=120)
            node.watch_log_for('Compaction interrupted: View build', filename='system.log', timeout=120)
            self.check_logs_for_errors()

        logger.debug("Drop the MV while it is still building")
        session.execute("DROP MATERIALIZED VIEW t_by_v")

        logger.debug("Verify that the build has been stopped before its finalization without errors")
        for node in nodes:
            self.check_logs_for_errors()
            assert not node.grep_log('Marking view', filename='debug.log')
            assert node.grep_log('Stopping current view builder due to schema change', filename='debug.log')

        logger.debug("Verify that the view has been removed")
        failed = False
        try:
            session.execute("SELECT COUNT(*) FROM t_by_v")
        except InvalidRequest:
            failed = True
        assert failed, "The view shouldn't be queryable"

        logger.debug("Create the MV again")
        session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
                         "WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v, id)"))

        logger.debug("Verify that the MV has been successfully created")
        self._wait_for_view('ks', 't_by_v')
        assert_one(session, "SELECT COUNT(*) FROM t_by_v", [5000])

    @since('4.0')
    def test_resume_stopped_build(self):
        """Test that MV builds stopped with `nodetool stop` are resumed after restart"""

        session = self.prepare(options={'concurrent_materialized_view_builders': 4}, install_byteman=True)
        session.execute("CREATE TABLE t (id int PRIMARY KEY, v int, v2 text, v3 decimal)")
        nodes = self.cluster.nodelist()

        logger.debug("Inserting initial data")
        for i in range(5000):
            session.execute("INSERT INTO t (id, v, v2, v3) VALUES ({v}, {v}, 'a', 3.0) IF NOT EXISTS".format(v=i))

        logger.debug("Slowing down MV build with byteman")
        for node in nodes:
            node.byteman_submit(['./byteman/4.0/view_builder_task_sleep.btm'])

        logger.debug("Create a MV")
        session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
                         "WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v, id)"))

        logger.debug("Wait and ensure the MV build has started. Waiting up to 2 minutes.")
        self._wait_for_view_build_start(session, 'ks', 't_by_v', wait_minutes=2)

        logger.debug("Stopping all running view build tasks with nodetool")
        for node in nodes:
            node.watch_log_for('Starting new view build for range', filename='debug.log', timeout=120)
            node.nodetool('stop VIEW_BUILD')

        logger.debug("Checking logs to verify that some view build tasks have been stopped")
        for node in nodes:
            node.watch_log_for('Stopped build for view', filename='debug.log', timeout=120)
            node.watch_log_for('Compaction interrupted: View build', filename='system.log', timeout=120)
            node.watch_log_for('Interrupted build for view', filename='debug.log', timeout=120)
            assert not node.grep_log('Marking view', filename='debug.log')
            self.check_logs_for_errors()

        logger.debug("Check that the MV hasn't been fully built yet.")
        assert list(session.execute("SELECT COUNT(*) FROM t_by_v"))[0].count != 5000

        logger.debug("Restart the cluster")
        self.cluster.stop()
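        # Record log positions before restarting so that only build messages
        # emitted after the restart are matched below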
        marks = [node.mark_log() for node in nodes]
        self.cluster.start(wait_for_binary_proto=True)
        session = self.patient_cql_connection(nodes[0])

        logger.debug("Verify that the MV has been successfully created")
        self._wait_for_view('ks', 't_by_v')
        assert_one(session, "SELECT COUNT(*) FROM ks.t_by_v", [5000])

        logger.debug("Checking logs to verify that the view build has been resumed and completed after restart")
        for node, mark in zip(nodes, marks):
            assert node.grep_log('Resuming view build', filename='debug.log', from_mark=mark)
            assert node.grep_log('Marking view', filename='debug.log', from_mark=mark)
            self.check_logs_for_errors()

    @since('3.0')
    def test_mv_with_default_ttl_with_flush(self):
        self._test_mv_with_default_ttl(True)

    @since('3.0')
    def test_mv_with_default_ttl_without_flush(self):
        self._test_mv_with_default_ttl(False)

    def _test_mv_with_default_ttl(self, flush):
        """
        Verify that an MV with default_time_to_live can be deleted properly using expired livenessInfo
        @jira_ticket CASSANDRA-14071
        """
        session = self.prepare(rf=3, nodes=3, options={'hinted_handoff_enabled': False}, consistency_level=ConsistencyLevel.QUORUM)
        node1, node2, node3 = self.cluster.nodelist()
        session.execute('USE ks')

        logger.debug("MV with same key and unselected columns")
        session.execute("CREATE TABLE t2 (k int, a int, b int, c int, primary key(k, a)) with default_time_to_live=600")
        session.execute(("CREATE MATERIALIZED VIEW mv2 AS SELECT k,a,b FROM t2 "
                         "WHERE k IS NOT NULL AND a IS NOT NULL PRIMARY KEY (a, k)"))
        session.cluster.control_connection.wait_for_schema_agreement()

        self.update_view(session, "UPDATE t2 SET c=1 WHERE k=1 AND a=1;", flush)
        assert_one(session, "SELECT k,a,b,c FROM t2", [1, 1, None, 1])
        assert_one(session, "SELECT k,a,b FROM mv2", [1, 1, None])

        self.update_view(session, "UPDATE t2 SET c=null WHERE k=1 AND a=1;", flush)
        assert_none(session, "SELECT k,a,b,c FROM t2")
        assert_none(session, "SELECT k,a,b FROM mv2")

        self.update_view(session, "UPDATE t2 SET c=2 WHERE k=1 AND a=1;", flush)
        assert_one(session, "SELECT k,a,b,c FROM t2", [1, 1, None, 2])
        assert_one(session, "SELECT k,a,b FROM mv2", [1, 1, None])

        self.update_view(session, "DELETE c FROM t2 WHERE k=1 AND a=1;", flush)
        assert_none(session, "SELECT k,a,b,c FROM t2")
        assert_none(session, "SELECT k,a,b FROM mv2")

        if flush:
            self.cluster.compact()
            assert_none(session, "SELECT * FROM t2")
            assert_none(session, "SELECT * FROM mv2")

        # test with user-provided ttl
        self.update_view(session, "INSERT INTO t2(k,a,b,c) VALUES(2,2,2,2) USING TTL 5", flush)
        self.update_view(session, "UPDATE t2 USING TTL 100 SET c=1 WHERE k=2 AND a=2;", flush)
        self.update_view(session, "UPDATE t2 USING TTL 50 SET c=2 WHERE k=2 AND a=2;", flush)
        self.update_view(session, "DELETE c FROM t2 WHERE k=2 AND a=2;", flush)

        time.sleep(5)

        assert_none(session, "SELECT k,a,b,c FROM t2")
        assert_none(session, "SELECT k,a,b FROM mv2")

        if flush:
            self.cluster.compact()
            assert_none(session, "SELECT * FROM t2")
            assert_none(session, "SELECT * FROM mv2")

        logger.debug("MV with extra key")
        session.execute("CREATE TABLE t (k int PRIMARY KEY, a int, b int) with default_time_to_live=600")
        session.execute(("CREATE MATERIALIZED VIEW mv AS SELECT * FROM t "
                         "WHERE k IS NOT NULL AND a IS NOT NULL PRIMARY KEY (k, a)"))
        session.cluster.control_connection.wait_for_schema_agreement()

        self.update_view(session, "INSERT INTO t (k, a, b) VALUES (1, 1, 1);", flush)
        assert_one(session, "SELECT * FROM t", [1, 1, 1])
        assert_one(session, "SELECT * FROM mv", [1, 1, 1])

        self.update_view(session, "INSERT INTO t (k, a, b) VALUES (1, 2, 1);", flush)
        assert_one(session, "SELECT * FROM t", [1, 2, 1])
        assert_one(session, "SELECT * FROM mv", [1, 2, 1])

        self.update_view(session, "INSERT INTO t (k, a, b) VALUES (1, 3, 1);", flush)
        assert_one(session, "SELECT * FROM t", [1, 3, 1])
        assert_one(session, "SELECT * FROM mv", [1, 3, 1])

        if flush:
            self.cluster.compact()
            assert_one(session, "SELECT * FROM t", [1, 3, 1])
            assert_one(session, "SELECT * FROM mv", [1, 3, 1])

        # user provided ttl
        self.update_view(session, "UPDATE t USING TTL 50 SET a = 4 WHERE k = 1", flush)
        assert_one(session, "SELECT * FROM t", [1, 4, 1])
        assert_one(session, "SELECT * FROM mv", [1, 4, 1])

        self.update_view(session, "UPDATE t USING TTL 40 SET a = 5 WHERE k = 1", flush)
        assert_one(session, "SELECT * FROM t", [1, 5, 1])
        assert_one(session, "SELECT * FROM mv", [1, 5, 1])

        self.update_view(session, "UPDATE t USING TTL 30 SET a = 6 WHERE k = 1", flush)
        assert_one(session, "SELECT * FROM t", [1, 6, 1])
        assert_one(session, "SELECT * FROM mv", [1, 6, 1])

        if flush:
            self.cluster.compact()
            assert_one(session, "SELECT * FROM t", [1, 6, 1])
            assert_one(session, "SELECT * FROM mv", [1, 6, 1])

    @flaky
    @since('3.0')
    def test_no_base_column_in_view_pk_complex_timestamp_with_flush(self):
        self._test_no_base_column_in_view_pk_complex_timestamp(flush=True)

    @pytest.mark.skip(reason="Frequently fails in CI. Skipping until fixed as tracked by CASSANDRA-14148")
    @since('3.0')
    def test_no_base_column_in_view_pk_complex_timestamp_without_flush(self):
        self._test_no_base_column_in_view_pk_complex_timestamp(flush=False)

    def _test_no_base_column_in_view_pk_complex_timestamp(self, flush):
        """
        We should be able to shadow an old view row if all base columns, including
        unselected ones, are removed, and to recreate the view row if at least one
        selected column is alive.

        @jira_ticket CASSANDRA-11500
        """
        session = self.prepare(rf=3, nodes=3, options={'hinted_handoff_enabled': False}, consistency_level=ConsistencyLevel.QUORUM)
        node1, node2, node3 = self.cluster.nodelist()

        session.execute('USE ks')
        session.execute("CREATE TABLE t (k int, c int, a int, b int, e int, f int, primary key(k, c))")
        session.execute(("CREATE MATERIALIZED VIEW mv AS SELECT k,c,a,b FROM t "
                         "WHERE k IS NOT NULL AND c IS NOT NULL PRIMARY KEY (c, k)"))
        session.cluster.control_connection.wait_for_schema_agreement()
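        # Columns a and b are selected by the view; e and f are unselected and can
        # only keep the view row alive through the base row's liveness info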
1351
1352 # update unselected, view row should be alive
1353 self.update_view(session, "UPDATE t USING TIMESTAMP 1 SET e=1 WHERE k=1 AND c=1;", flush)
1354 assert_one(session, "SELECT * FROM t", [1, 1, None, None, 1, None])
1355 assert_one(session, "SELECT * FROM mv", [1, 1, None, None])
1356
1357 # remove unselected, add selected column, view row should be alive
1358 self.update_view(session, "UPDATE t USING TIMESTAMP 2 SET e=null, b=1 WHERE k=1 AND c=1;", flush)
1359 assert_one(session, "SELECT * FROM t", [1, 1, None, 1, None, None])
1360 assert_one(session, "SELECT * FROM mv", [1, 1, None, 1])
1361
1362 # remove selected column, view row is removed
1363 self.update_view(session, "UPDATE t USING TIMESTAMP 2 SET e=null, b=null WHERE k=1 AND c=1;", flush)
1364 assert_none(session, "SELECT * FROM t")
1365 assert_none(session, "SELECT * FROM mv")
1366
1367 # update unselected with ts=3, view row should be alive
1368 self.update_view(session, "UPDATE t USING TIMESTAMP 3 SET f=1 WHERE k=1 AND c=1;", flush)
1369 assert_one(session, "SELECT * FROM t", [1, 1, None, None, None, 1])
1370 assert_one(session, "SELECT * FROM mv", [1, 1, None, None])
1371
1372 # insert livenessInfo, view row should be alive
1373 self.update_view(session, "INSERT INTO t(k,c) VALUES(1,1) USING TIMESTAMP 3", flush)
1374 assert_one(session, "SELECT * FROM t", [1, 1, None, None, None, 1])
1375 assert_one(session, "SELECT * FROM mv", [1, 1, None, None])
1376
1377 # remove unselected, view row should be alive because the base livenessInfo is alive
1378 self.update_view(session, "UPDATE t USING TIMESTAMP 3 SET f=null WHERE k=1 AND c=1;", flush)
1379 assert_one(session, "SELECT * FROM t", [1, 1, None, None, None, None])
1380 assert_one(session, "SELECT * FROM mv", [1, 1, None, None])
1381
1382 # add selected column, view row should be alive
1383 self.update_view(session, "UPDATE t USING TIMESTAMP 3 SET a=1 WHERE k=1 AND c=1;", flush)
1384 assert_one(session, "SELECT * FROM t", [1, 1, 1, None, None, None])
1385 assert_one(session, "SELECT * FROM mv", [1, 1, 1, None])
1386
1387 # update unselected, view row should be alive
1388 self.update_view(session, "UPDATE t USING TIMESTAMP 4 SET f=1 WHERE k=1 AND c=1;", flush)
1389 assert_one(session, "SELECT * FROM t", [1, 1, 1, None, None, 1])
1390 assert_one(session, "SELECT * FROM mv", [1, 1, 1, None])
1391
1392 # delete with ts=3, view row should be alive due to unselected f@ts=4
1393 self.update_view(session, "DELETE FROM t USING TIMESTAMP 3 WHERE k=1 AND c=1;", flush)
1394 assert_one(session, "SELECT * FROM t", [1, 1, None, None, None, 1])
1395 assert_one(session, "SELECT * FROM mv", [1, 1, None, None])
1396
1397 # remove unselected, view row should be removed
1398 self.update_view(session, "UPDATE t USING TIMESTAMP 4 SET f=null WHERE k=1 AND c=1;", flush)
1399 assert_none(session, "SELECT * FROM t")
1400 assert_none(session, "SELECT * FROM mv")
1401
1402 # add selected with ts=7, view row is alive
1403 self.update_view(session, "UPDATE t USING TIMESTAMP 7 SET b=1 WHERE k=1 AND c=1;", flush)
1404 assert_one(session, "SELECT * FROM t", [1, 1, None, 1, None, None])
1405 assert_one(session, "SELECT * FROM mv", [1, 1, None, 1])
1406
1407 # remove selected with ts=7, view row is dead
1408 self.update_view(session, "UPDATE t USING TIMESTAMP 7 SET b=null WHERE k=1 AND c=1;", flush)
1409 assert_none(session, "SELECT * FROM t")
1410 assert_none(session, "SELECT * FROM mv")
1411
1412 # add selected with ts=5, view row is alive (selected columns should not affect each other)
1413 self.update_view(session, "UPDATE t USING TIMESTAMP 5 SET a=1 WHERE k=1 AND c=1;", flush)
1414 assert_one(session, "SELECT * FROM t", [1, 1, 1, None, None, None])
1415 assert_one(session, "SELECT * FROM mv", [1, 1, 1, None])
1416
1417 # add selected with ttl=10
1418 self.update_view(session, "UPDATE t USING TTL 10 SET a=1 WHERE k=1 AND c=1;", flush)
1419 assert_one(session, "SELECT * FROM t", [1, 1, 1, None, None, None])
1420 assert_one(session, "SELECT * FROM mv", [1, 1, 1, None])
1421
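# let the TTL 10 on column a expire before touching the unselected column below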
1422 time.sleep(10)
1423
1424 # update unselected with ttl=10, view row should be alive
1425 self.update_view(session, "UPDATE t USING TTL 10 SET f=1 WHERE k=1 AND c=1;", flush)
1426 assert_one(session, "SELECT * FROM t", [1, 1, None, None, None, 1])
1427 assert_one(session, "SELECT * FROM mv", [1, 1, None, None])
1428
1429 time.sleep(10)
1430
1431 # both TTLs have expired; no live cells remain, so base and view rows are gone
1432 assert_none(session, "SELECT * FROM t")
1433 assert_none(session, "SELECT * FROM mv")
1434
1435 @since('3.0')
1436 def test_base_column_in_view_pk_complex_timestamp_with_flush(self):
1437 self._test_base_column_in_view_pk_complex_timestamp(flush=True)
1438
1439 @since('3.0')
1440 def test_base_column_in_view_pk_complex_timestamp_without_flush(self):
1441 self._test_base_column_in_view_pk_complex_timestamp(flush=False)
1442
1443 def _test_base_column_in_view_pk_complex_timestamp(self, flush):
1444 """
1445 Able to shadow the old view row when the column ts is greater than the pk ts, and re-insert the view row
1446 Able to shadow the old view row when the column ts is smaller than the pk ts, and re-insert the view row
1447
1448 @jira_ticket CASSANDRA-11500
1449 """
1450 session = self.prepare(rf=3, nodes=3, options={'hinted_handoff_enabled': False}, consistency_level=ConsistencyLevel.QUORUM)
1451 node1, node2, node3 = self.cluster.nodelist()
1452
1453 session.execute('USE ks')
1454 session.execute("CREATE TABLE t (k int PRIMARY KEY, a int, b int)")
1455 session.execute(("CREATE MATERIALIZED VIEW mv AS SELECT * FROM t "
1456 "WHERE k IS NOT NULL AND a IS NOT NULL PRIMARY KEY (k, a)"))
1457 session.cluster.control_connection.wait_for_schema_agreement()
1458
1459 # Set initial values TS=1
1460 self.update_view(session, "INSERT INTO t (k, a, b) VALUES (1, 1, 1) USING TIMESTAMP 1;", flush)
1461 assert_one(session, "SELECT * FROM t", [1, 1, 1])
1462 assert_one(session, "SELECT * FROM mv", [1, 1, 1])
1463
1464 # increase b ts to 10
1465 self.update_view(session, "UPDATE t USING TIMESTAMP 10 SET b = 2 WHERE k = 1;", flush)
1466 assert_one(session, "SELECT k,a,b,writetime(b) FROM t", [1, 1, 2, 10])
1467 assert_one(session, "SELECT k,a,b,writetime(b) FROM mv", [1, 1, 2, 10])
1468
1469 # switch entries. shadow a = 1, insert a = 2
1470 self.update_view(session, "UPDATE t USING TIMESTAMP 2 SET a = 2 WHERE k = 1;", flush)
1471 assert_one(session, "SELECT k,a,b,writetime(b) FROM t", [1, 2, 2, 10])
1472 assert_one(session, "SELECT k,a,b,writetime(b) FROM mv", [1, 2, 2, 10])
1473
1474 # switch entries. shadow a = 2, insert a = 1
1475 self.update_view(session, "UPDATE t USING TIMESTAMP 3 SET a = 1 WHERE k = 1;", flush)
1476 assert_one(session, "SELECT k,a,b,writetime(b) FROM t", [1, 1, 2, 10])
1477 assert_one(session, "SELECT k,a,b,writetime(b) FROM mv", [1, 1, 2, 10])
1478
1479 # switch entries. shadow a = 1, insert a = 2
1480 self.update_view(session, "UPDATE t USING TIMESTAMP 4 SET a = 2 WHERE k = 1;", flush, compact=True)
1481 assert_one(session, "SELECT k,a,b,writetime(b) FROM t", [1, 2, 2, 10])
1482 assert_one(session, "SELECT k,a,b,writetime(b) FROM mv", [1, 2, 2, 10])
1483
1484 # able to shadow the view row even if the view-PK base column's ts is smaller than the row timestamp
1485 # set row TS = 20, a@6, b@20
1486 self.update_view(session, "DELETE FROM t USING TIMESTAMP 5 where k = 1;", flush)
1487 assert_one(session, "SELECT k,a,b,writetime(b) FROM t", [1, None, 2, 10])
1488 assert_none(session, "SELECT k,a,b,writetime(b) FROM mv")
1489 self.update_view(session, "INSERT INTO t (k, a, b) VALUES (1, 1, 1) USING TIMESTAMP 6;", flush)
1490 assert_one(session, "SELECT k,a,b,writetime(b) FROM t", [1, 1, 2, 10])
1491 assert_one(session, "SELECT k,a,b,writetime(b) FROM mv", [1, 1, 2, 10])
1492 self.update_view(session, "INSERT INTO t (k, b) VALUES (1, 1) USING TIMESTAMP 20;", flush)
1493 assert_one(session, "SELECT k,a,b,writetime(b) FROM t", [1, 1, 1, 20])
1494 assert_one(session, "SELECT k,a,b,writetime(b) FROM mv", [1, 1, 1, 20])
1495
1496 # switch entries. shadow a = 1, insert a = 2
1497 self.update_view(session, "UPDATE t USING TIMESTAMP 7 SET a = 2 WHERE k = 1;", flush)
1498 assert_one(session, "SELECT k,a,b,writetime(a),writetime(b) FROM t", [1, 2, 1, 7, 20])
1499 assert_one(session, "SELECT k,a,b,writetime(b) FROM mv", [1, 2, 1, 20])
1500
1501 # switch entries. shadow a = 2, insert a = 1
1502 self.update_view(session, "UPDATE t USING TIMESTAMP 8 SET a = 1 WHERE k = 1;", flush)
1503 assert_one(session, "SELECT k,a,b,writetime(a),writetime(b) FROM t", [1, 1, 1, 8, 20])
1504 assert_one(session, "SELECT k,a,b,writetime(b) FROM mv", [1, 1, 1, 20])
1505
1506 # create another view row
1507 self.update_view(session, "INSERT INTO t (k, a, b) VALUES (2, 2, 2);", flush)
1508 assert_one(session, "SELECT k,a,b FROM t WHERE k = 2", [2, 2, 2])
1509 assert_one(session, "SELECT k,a,b FROM mv WHERE k = 2", [2, 2, 2])
1510
1511 # stop node2, node3
1512 logger.debug('Shutdown node2')
1513 node2.stop(wait_other_notice=True)
1514 logger.debug('Shutdown node3')
1515 node3.stop(wait_other_notice=True)
1516 # shadow a = 1, create a = 2
1517 query = SimpleStatement("UPDATE t USING TIMESTAMP 9 SET a = 2 WHERE k = 1", consistency_level=ConsistencyLevel.ONE)
1518 self.update_view(session, query, flush)
1519 # shadow (a=2, k=2) after 3 seconds
1520 query = SimpleStatement("UPDATE t USING TTL 3 SET a = 2 WHERE k = 2", consistency_level=ConsistencyLevel.ONE)
1521 self.update_view(session, query, flush)
1522
1523 logger.debug('Starting node2')
1524 node2.start(wait_other_notice=True, wait_for_binary_proto=True)
1525 logger.debug('Starting node3')
1526 node3.start(wait_other_notice=True, wait_for_binary_proto=True)
1527
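# Reading at CL=ALL compares digests across all replicas; node2/node3 missed the
# writes above, so the first read should detect a mismatch and trigger a blocking
# read repair, after which repeated reads see consistent data.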
1528 # For k = 1 & a = 1, We should get a digest mismatch of tombstones and repaired
1529 query = SimpleStatement("SELECT * FROM mv WHERE k = 1 AND a = 1", consistency_level=ConsistencyLevel.ALL)
1530 result = session.execute(query, trace=True)
1531 self.check_trace_events(result.get_query_trace(), True)
1532 assert 0 == len(result.current_rows)
1533
1534 # For k = 1 & a = 1, second time no digest mismatch
1535 result = session.execute(query, trace=True)
1536 self.check_trace_events(result.get_query_trace(), False)
1537 assert_none(session, "SELECT * FROM mv WHERE k = 1 AND a = 1")
1538 assert 0 == len(result.current_rows)
1539
1540 # For k = 1 & a = 2, We should get a digest mismatch of data and repaired for a = 2
1541 query = SimpleStatement("SELECT * FROM mv WHERE k = 1 AND a = 2", consistency_level=ConsistencyLevel.ALL)
1542 result = session.execute(query, trace=True)
1543 self.check_trace_events(result.get_query_trace(), True)
1544 assert 1 == len(result.current_rows)
1545
1546 # For k = 1 & a = 2, second time no digest mismatch
1547 result = session.execute(query, trace=True)
1548 self.check_trace_events(result.get_query_trace(), False)
1549 assert 1 == len(result.current_rows)
1550 assert_one(session, "SELECT k,a,b,writetime(b) FROM mv WHERE k = 1", [1, 2, 1, 20])
1551
1552 time.sleep(3)
1553 # For k = 2 & a = 2, We should get a digest mismatch of expired and repaired
1554 query = SimpleStatement("SELECT * FROM mv WHERE k = 2 AND a = 2", consistency_level=ConsistencyLevel.ALL)
1555 result = session.execute(query, trace=True)
1556 self.check_trace_events(result.get_query_trace(), True)
1557 logger.debug(result.current_rows)
1558 assert 0 == len(result.current_rows)
1559
1560 # For k = 2 & a = 2, second time no digest mismatch
1561 result = session.execute(query, trace=True)
1562 self.check_trace_events(result.get_query_trace(), False)
1563 assert 0 == len(result.current_rows)
1564
1565 @since('3.0')
1566 def test_expired_liveness_with_limit_rf1_nodes1(self):
1567 self._test_expired_liveness_with_limit(rf=1, nodes=1)
1568
1569 @since('3.0')
1570 def test_expired_liveness_with_limit_rf1_nodes3(self):
1571 self._test_expired_liveness_with_limit(rf=1, nodes=3)
1572
1573 @since('3.0')
1574 def test_expired_liveness_with_limit_rf3(self):
1575 self._test_expired_liveness_with_limit(rf=3, nodes=3)
1576
1577 def _test_expired_liveness_with_limit(self, rf, nodes):
1578 """
1579 Test that MV rows with expired liveness are properly handled by queries with a LIMIT
1580
1581 @jira_ticket CASSANDRA-13883
1582 """
1583 session = self.prepare(rf=rf, nodes=nodes, options={'hinted_handoff_enabled': False}, consistency_level=ConsistencyLevel.QUORUM)
1584 node1 = self.cluster.nodelist()[0]
1585
1586 session.execute('USE ks')
1587 session.execute("CREATE TABLE t (k int PRIMARY KEY, a int, b int)")
1588 session.execute(("CREATE MATERIALIZED VIEW mv AS SELECT * FROM t "
1589 "WHERE k IS NOT NULL AND a IS NOT NULL PRIMARY KEY (k, a)"))
1590 session.cluster.control_connection.wait_for_schema_agreement()
1591
1592 for k in range(100):
1593 session.execute("INSERT INTO t (k, a, b) VALUES ({}, {}, {})".format(k, k, k))
1594
1595 # generate view rows with expired liveness, except for rows 50 and 99
1596 for k in range(100):
1597 if k == 50 or k == 99:
1598 continue
1599 session.execute("DELETE a FROM t where k = {};".format(k))
1600
1601 # there should be 2 live rows
1602 assert_one(session, "SELECT k,a,b FROM mv limit 1", [50, 50, 50])
1603 assert_all(session, "SELECT k,a,b FROM mv limit 2", [[50, 50, 50], [99, 99, 99]])
1604 assert_all(session, "SELECT k,a,b FROM mv", [[50, 50, 50], [99, 99, 99]])
1605
1606 # verify IN
1607 keys = range(100)
1608 assert_one(session, "SELECT k,a,b FROM mv WHERE k in ({}) limit 1".format(', '.join(str(x) for x in keys)),
1609 [50, 50, 50])
1610 assert_all(session, "SELECT k,a,b FROM mv WHERE k in ({}) limit 2".format(', '.join(str(x) for x in keys)),
1611 [[50, 50, 50], [99, 99, 99]])
1612 assert_all(session, "SELECT k,a,b FROM mv WHERE k in ({})".format(', '.join(str(x) for x in keys)),
1613 [[50, 50, 50], [99, 99, 99]])
1614
1615 # verify fetch size
1616 session.default_fetch_size = 1
1617 assert_one(session, "SELECT k,a,b FROM mv limit 1", [50, 50, 50])
1618 assert_all(session, "SELECT k,a,b FROM mv limit 2", [[50, 50, 50], [99, 99, 99]])
1619 assert_all(session, "SELECT k,a,b FROM mv", [[50, 50, 50], [99, 99, 99]])
1620
1621 @since('3.0')
1622 def test_base_column_in_view_pk_commutative_tombstone_with_flush(self):
1623 self._test_base_column_in_view_pk_commutative_tombstone_(flush=True)
1624
1625 @since('3.0')
1626 def test_base_column_in_view_pk_commutative_tombstone_without_flush(self):
1627 self._test_base_column_in_view_pk_commutative_tombstone_(flush=False)
1628
1629 def _test_base_column_in_view_pk_commutative_tombstone_(self, flush):
1630 """
1631 View row deletion should be commutative with newer view livenessInfo; otherwise deleted columns may be resurrected.
1632 @jira_ticket CASSANDRA-13409
1633 """
1634 session = self.prepare(rf=3, nodes=3, options={'hinted_handoff_enabled': False}, consistency_level=ConsistencyLevel.QUORUM)
1635 node1 = self.cluster.nodelist()[0]
1636
1637 session.execute('USE ks')
1638 session.execute("CREATE TABLE t (id int PRIMARY KEY, v int, v2 text, v3 decimal)")
1639 session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
1640 "WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v,id)"))
1641 session.cluster.control_connection.wait_for_schema_agreement()
1642 for node in self.cluster.nodelist():
1643 node.nodetool("disableautocompaction")
1644
1645 # sstable 1, Set initial values TS=1
1646 self.update_view(session, "INSERT INTO t (id, v, v2, v3) VALUES (1, 1, 'a', 3.0) USING TIMESTAMP 1", flush)
1647 assert_one(session, "SELECT * FROM t_by_v", [1, 1, 'a', 3.0])
1648
1649 # sstable 2, delete the base row at TS=2, which tombstones the view row v=1
1650 self.update_view(session, "DELETE FROM t USING TIMESTAMP 2 WHERE id = 1;", flush)
1651 assert_none(session, "SELECT * FROM t_by_v")
1652 assert_none(session, "SELECT * FROM t")
1653
1654 # sstable 3, the view tombstones created by the base deletion should remain
1655 self.update_view(session, "INSERT INTO t (id, v) VALUES (1, 1) USING TIMESTAMP 3", flush)
1656 assert_one(session, "SELECT * FROM t_by_v", [1, 1, None, None])
1657 assert_one(session, "SELECT * FROM t", [1, 1, None, None])
1658
1659 # sstable 4, shadow view row (id=1, v=1), insert (id=1, v=2, ts=4)
1660 self.update_view(session, "UPDATE t USING TIMESTAMP 4 set v = 2 WHERE id = 1;", flush)
1661 assert_one(session, "SELECT * FROM t_by_v", [2, 1, None, None])
1662 assert_one(session, "SELECT * FROM t", [1, 2, None, None])
1663
1664 # sstable 5, shadow view row (id=1, v=2), insert (id=1, v=1 ts=5)
1665 self.update_view(session, "UPDATE t USING TIMESTAMP 5 set v = 1 WHERE id = 1;", flush)
1666 assert_one(session, "SELECT * FROM t_by_v", [1, 1, None, None])
1667 assert_one(session, "SELECT * FROM t", [1, 1, None, None]) # data deleted by row-tombstone@2 should not resurrect
1668
1669 if flush:
1670 self.cluster.compact()
1671 assert_one(session, "SELECT * FROM t_by_v", [1, 1, None, None])
1672 assert_one(session, "SELECT * FROM t", [1, 1, None, None]) # data deleted by row-tombstone@2 should not resurrect
1673
1674 # shadow view row (id=1, v=1)
1675 self.update_view(session, "UPDATE t USING TIMESTAMP 5 set v = null WHERE id = 1;", flush)
1676 assert_none(session, "SELECT * FROM t_by_v")
1677 assert_one(session, "SELECT * FROM t", [1, None, None, None])
1678
1679 def test_view_tombstone(self):
1680 """
1681 Test that materialized views properly create tombstones
1682
1683 @jira_ticket CASSANDRA-10261
1684 @jira_ticket CASSANDRA-10910
1685 """
1686
1687 self.prepare(rf=3, options={'hinted_handoff_enabled': False})
1688 node1, node2, node3 = self.cluster.nodelist()
1689
1690 session = self.patient_exclusive_cql_connection(node1)
1691 session.max_trace_wait = 120
1692 session.execute('USE ks')
1693
1694 session.execute("CREATE TABLE t (id int PRIMARY KEY, v int, v2 text, v3 decimal)")
1695 session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
1696 "WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v,id)"))
1697
1698 session.cluster.control_connection.wait_for_schema_agreement()
1699
1700 # Set initial values TS=0, verify
1701 session.execute(SimpleStatement("INSERT INTO t (id, v, v2, v3) VALUES (1, 1, 'a', 3.0) USING TIMESTAMP 0",
1702 consistency_level=ConsistencyLevel.ALL))
1703 self._replay_batchlogs()
1704 assert_one(
1705 session,
1706 "SELECT * FROM t_by_v WHERE v = 1",
1707 [1, 1, 'a', 3.0]
1708 )
1709
1710 session.execute(SimpleStatement("INSERT INTO t (id, v2) VALUES (1, 'b') USING TIMESTAMP 1",
1711 consistency_level=ConsistencyLevel.ALL))
1712 self._replay_batchlogs()
1713
1714 assert_one(
1715 session,
1716 "SELECT * FROM t_by_v WHERE v = 1",
1717 [1, 1, 'b', 3.0]
1718 )
1719
1720 # change v's value and TS=3, tombstones v=1 and adds v=0 record
1721 session.execute(SimpleStatement("UPDATE t USING TIMESTAMP 3 SET v = 0 WHERE id = 1",
1722 consistency_level=ConsistencyLevel.ALL))
1723 self._replay_batchlogs()
1724
1725 assert_none(session, "SELECT * FROM t_by_v WHERE v = 1")
1726
1727 logger.debug('Shutdown node2')
1728 node2.stop(wait_other_notice=True)
1729
1730 session.execute(SimpleStatement("UPDATE t USING TIMESTAMP 4 SET v = 1 WHERE id = 1",
1731 consistency_level=ConsistencyLevel.QUORUM))
1732 self._replay_batchlogs()
1733
1734 assert_one(
1735 session,
1736 "SELECT * FROM t_by_v WHERE v = 1",
1737 [1, 1, 'b', 3.0]
1738 )
1739
1740 node2.start(wait_other_notice=True, wait_for_binary_proto=True)
1741
1742 # We should get a digest mismatch
1743 query = SimpleStatement("SELECT * FROM t_by_v WHERE v = 1",
1744 consistency_level=ConsistencyLevel.ALL)
1745
1746 result = session.execute(query, trace=True)
1747 self.check_trace_events(result.get_query_trace(), True)
1748
1749 # We should not get a digest mismatch the second time
1750 query = SimpleStatement("SELECT * FROM t_by_v WHERE v = 1", consistency_level=ConsistencyLevel.ALL)
1751
1752 result = session.execute(query, trace=True)
1753 self.check_trace_events(result.get_query_trace(), False)
1754
1755 # Verify values one last time
1756 assert_one(
1757 session,
1758 "SELECT * FROM t_by_v WHERE v = 1",
1759 [1, 1, 'b', 3.0],
1760 cl=ConsistencyLevel.ALL
1761 )
1762
1763 def check_trace_events(self, trace, expect_digest):
1764 # scan the query trace for a digest mismatch message and check it against
1765 # the expectation (expect_digest)
1766
1767 # Look for messages like:
1768 # 4.0+ Digest mismatch: Mismatch for key DecoratedKey
1769 # <4.0 Digest mismatch: org.apache.cassandra.service.DigestMismatchException: Mismatch for key DecoratedKey
1770 regex = r"Digest mismatch: ([a-zA-Z.]+:\s)?Mismatch for key DecoratedKey"
1771 for event in trace.events:
1772 desc = event.description
1773 match = re.match(regex, desc)
1774 if match:
1775 if expect_digest:
1776 break
1777 else:
1778 self.fail("Encountered digest mismatch when we shouldn't")
1779 else:
1780 if expect_digest:
1781 self.fail("Didn't find digest mismatch")
1782
1783 def test_simple_repair_by_base(self):
1784 self._simple_repair_test(repair_base=True)
1785
1786 def test_simple_repair_by_view(self):
1787 self._simple_repair_test(repair_view=True)
1788
1789 def _simple_repair_test(self, repair_base=False, repair_view=False):
1790 """
1791 Test that a materialized view is consistent after a simple repair.
1792 """
1793
1794 session = self.prepare(rf=3, options={'hinted_handoff_enabled': False})
1795 node1, node2, node3 = self.cluster.nodelist()
1796
1797 session.execute("CREATE TABLE t (id int PRIMARY KEY, v int, v2 text, v3 decimal)")
1798 session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
1799 "WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v, id)"))
1800
1801 session.cluster.control_connection.wait_for_schema_agreement()
1802
1803 logger.debug('Shutdown node2')
1804 node2.stop(wait_other_notice=True)
1805
1806 for i in range(1000):
1807 session.execute("INSERT INTO t (id, v, v2, v3) VALUES ({v}, {v}, 'a', 3.0)".format(v=i))
1808
1809 self._replay_batchlogs()
1810
1811 logger.debug('Verify the data in the MV with CL=ONE')
1812 for i in range(1000):
1813 assert_one(
1814 session,
1815 "SELECT * FROM t_by_v WHERE v = {}".format(i),
1816 [i, i, 'a', 3.0]
1817 )
1818
1819 logger.debug('Verify the data in the MV with CL=ALL. All should be unavailable.')
1820 for i in range(1000):
1821 statement = SimpleStatement(
1822 "SELECT * FROM t_by_v WHERE v = {}".format(i),
1823 consistency_level=ConsistencyLevel.ALL
1824 )
1825
1826 assert_unavailable(
1827 session.execute,
1828 statement
1829 )
1830
1831 logger.debug('Start node2, and repair')
1832 node2.start(wait_other_notice=True, wait_for_binary_proto=True)
1833 if repair_base:
1834 node1.nodetool("repair ks t")
1835 if repair_view:
1836 node1.nodetool("repair ks t_by_v")
1837
1838 logger.debug('Verify the data in the MV with CL=ALL. All should be available now and no digest mismatch')
1839 for i in range(1000):
1840 query = SimpleStatement(
1841 "SELECT * FROM t_by_v WHERE v = {}".format(i),
1842 consistency_level=ConsistencyLevel.ALL
1843 )
1844 result = session.execute(query, trace=True)
1845 self.check_trace_events(result.get_query_trace(), False)
1846 assert self._rows_to_list(result.current_rows) == [[i, i, 'a', 3.0]]
1847
1848 def test_base_replica_repair(self):
1849 self._base_replica_repair_test()
1850
1851 def test_base_replica_repair_with_contention(self):
1852 """
1853 Test repair does not fail when there is MV lock contention
1854 @jira_ticket CASSANDRA-12905
1855 """
1856 self._base_replica_repair_test(fail_mv_lock=True)
1857
1858 def _base_replica_repair_test(self, fail_mv_lock=False):
1859 """
1860 Test that a materialized view is consistent after the repair of the base replica.
1861 """
1862
1863 self.prepare(rf=3)
1864 node1, node2, node3 = self.cluster.nodelist()
1865 session = self.patient_exclusive_cql_connection(node1)
1866 session.execute('USE ks')
1867
1868 session.execute("CREATE TABLE t (id int PRIMARY KEY, v int, v2 text, v3 decimal)")
1869 session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
1870 "WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v, id)"))
1871
1872 session.cluster.control_connection.wait_for_schema_agreement()
1873
1874 logger.debug('Write initial data')
1875 for i in range(1000):
1876 session.execute("INSERT INTO t (id, v, v2, v3) VALUES ({v}, {v}, 'a', 3.0)".format(v=i))
1877
1878 self._replay_batchlogs()
1879
1880 logger.debug('Verify the data in the MV with CL=ALL')
1881 for i in range(1000):
1882 assert_one(
1883 session,
1884 "SELECT * FROM t_by_v WHERE v = {}".format(i),
1885 [i, i, 'a', 3.0],
1886 cl=ConsistencyLevel.ALL
1887 )
1888
1889 logger.debug('Shutdown node1')
1890 node1.stop(wait_other_notice=True)
1891 logger.debug('Delete node1 data')
1892 node1.clear(clear_all=True)
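# wiping node1 simulates the total loss of a base replica; repairing only the
# base table afterwards is expected to rebuild the matching view replicas too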
1893
1894 jvm_args = []
1895 if fail_mv_lock:
1896 if self.cluster.version() >= LooseVersion('3.10'): # CASSANDRA-10134
1897 jvm_args = ['-Dcassandra.allow_unsafe_replace=true', '-Dcassandra.replace_address={}'.format(node1.address())]
1898 jvm_args.append("-Dcassandra.test.fail_mv_locks_count=1000")
1899 # this should not make Keyspace.apply throw WTE on failure to acquire lock
1900 node1.set_configuration_options(values={'write_request_timeout_in_ms': 100})
1901 logger.debug('Restarting node1 with jvm_args={}'.format(jvm_args))
1902 node1.start(wait_other_notice=True, wait_for_binary_proto=True, jvm_args=jvm_args)
1903 logger.debug('Shutdown node2 and node3')
1904 node2.stop(wait_other_notice=True)
1905 node3.stop(wait_other_notice=True)
1906
1907 session = self.patient_exclusive_cql_connection(node1)
1908 session.execute('USE ks')
1909
1910 logger.debug('Verify that there is no data on node1')
1911 for i in range(1000):
1912 assert_none(
1913 session,
1914 "SELECT * FROM t_by_v WHERE v = {}".format(i)
1915 )
1916
1917 logger.debug('Restarting node2 and node3')
1918 node2.start(wait_other_notice=True, wait_for_binary_proto=True)
1919 node3.start(wait_other_notice=True, wait_for_binary_proto=True)
1920
1921 # Just repair the base replica
1922 logger.debug('Starting repair on node1')
1923 node1.nodetool("repair ks t")
1924
1925 logger.debug('Verify data with cl=ALL')
1926 for i in range(1000):
1927 assert_one(
1928 session,
1929 "SELECT * FROM t_by_v WHERE v = {}".format(i),
1930 [i, i, 'a', 3.0]
1931 )
1932
1933 @pytest.mark.resource_intensive
1934 def test_complex_repair(self):
1935 """
1936 Test that a materialized view is consistent after a more complex repair.
1937 """
1938 session = self.prepare(rf=5, options={'hinted_handoff_enabled': False}, nodes=5)
1939 node1, node2, node3, node4, node5 = self.cluster.nodelist()
1940
1941 # we create the base table with gc_grace_seconds=5 so batchlog will expire after 5 seconds
1942 session.execute("CREATE TABLE ks.t (id int PRIMARY KEY, v int, v2 text, v3 decimal)"
1943 "WITH gc_grace_seconds = 5")
1944 session.execute(("CREATE MATERIALIZED VIEW ks.t_by_v AS SELECT * FROM t "
1945 "WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v, id)"))
1946
1947 session.cluster.control_connection.wait_for_schema_agreement()
1948
1949 logger.debug('Shutdown node2 and node3')
1950 node2.stop()
1951 node3.stop(wait_other_notice=True)
1952
1953 logger.debug('Write initial data to node1 (will be replicated to node4 and node5)')
1954 for i in range(1000):
1955 session.execute("INSERT INTO ks.t (id, v, v2, v3) VALUES ({v}, {v}, 'a', 3.0)".format(v=i))
1956
1957 logger.debug('Verify the data in the MV on node1 with CL=ONE')
1958 for i in range(1000):
1959 assert_one(
1960 session,
1961 "SELECT * FROM ks.t_by_v WHERE v = {}".format(i),
1962 [i, i, 'a', 3.0]
1963 )
1964
1965 logger.debug('Close connection to node1')
1966 session.cluster.shutdown()
1967 logger.debug('Shutdown node1, node4 and node5')
1968 node1.stop()
1969 node4.stop()
1970 node5.stop()
1971
1972 logger.debug('Start nodes 2 and 3')
1973 node2.start()
1974 node3.start(wait_other_notice=True, wait_for_binary_proto=True)
1975
1976 session2 = self.patient_cql_connection(node2)
1977
1978 logger.debug('Verify the data in the MV on node2 with CL=ONE. No rows should be found.')
1979 for i in range(1000):
1980 assert_none(
1981 session2,
1982 "SELECT * FROM ks.t_by_v WHERE v = {}".format(i)
1983 )
1984
1985 logger.debug('Write new data in node2 and node3 that overlap those in node1, node4 and node5')
1986 for i in range(1000):
1987 # we write i*2 as value, instead of i
1988 session2.execute("INSERT INTO ks.t (id, v, v2, v3) VALUES ({v}, {v}, 'a', 3.0)".format(v=i * 2))
1989
1990 logger.debug('Verify the new data in the MV on node2 with CL=ONE')
1991 for i in range(1000):
1992 v = i * 2
1993 assert_one(
1994 session2,
1995 "SELECT * FROM ks.t_by_v WHERE v = {}".format(v),
1996 [v, v, 'a', 3.0]
1997 )
1998
1999 logger.debug('Wait for batchlogs to expire from node2 and node3')
2000 time.sleep(5)
2001
2002 logger.debug('Start remaining nodes')
2003 node1.start(wait_other_notice=True, wait_for_binary_proto=True)
2004 node4.start(wait_other_notice=True, wait_for_binary_proto=True)
2005 node5.start(wait_other_notice=True, wait_for_binary_proto=True)
2006
2007 session = self.patient_cql_connection(node1)
2008
2009 logger.debug('Read data from MV at QUORUM (old data should be returned)')
2010 for i in range(1000):
2011 assert_one(
2012 session,
2013 "SELECT * FROM ks.t_by_v WHERE v = {}".format(i),
2014 [i, i, 'a', 3.0],
2015 cl=ConsistencyLevel.QUORUM
2016 )
2017
2018 logger.debug('Run global repair on node1')
2019 node1.repair()
2020
2021 logger.debug('Read data from MV at quorum (new data should be returned after repair)')
2022 for i in range(1000):
2023 v = i * 2
2024 assert_one(
2025 session,
2026 "SELECT * FROM ks.t_by_v WHERE v = {}".format(v),
2027 [v, v, 'a', 3.0],
2028 cl=ConsistencyLevel.QUORUM
2029 )
2030
2031 @pytest.mark.resource_intensive
2032 def test_throttled_partition_update(self):
2033 """
2034 @jira_ticket CASSANDRA-13299, test breaking up a large partition when repairing a base table with an MV.
2035
2036 Provide a configurable batch size (cassandra.repair.mutation_repair_rows_per_batch) to throttle
2037 the number of rows applied in one mutation
2038 """
2039
2040 session = self.prepare(rf=5, options={'hinted_handoff_enabled': False}, nodes=5)
2041 node1, node2, node3, node4, node5 = self.cluster.nodelist()
2042
2043 for node in self.cluster.nodelist():
2044 node.nodetool("disableautocompaction")
2045
2046 session.execute("CREATE TABLE ks.t (pk int, ck1 int, ck2 int, v1 int, v2 int, PRIMARY KEY(pk, ck1, ck2))")
2047 session.execute(("CREATE MATERIALIZED VIEW ks.t_by_v AS SELECT * FROM t "
2048 "WHERE pk IS NOT NULL AND ck1 IS NOT NULL AND ck2 IS NOT NULL "
2049 "PRIMARY KEY (pk, ck2, ck1)"))
2050
2051 session.cluster.control_connection.wait_for_schema_agreement()
2052
2053 logger.debug('Shutdown node2 and node3')
2054 node2.stop(wait_other_notice=True)
2055 node3.stop(wait_other_notice=True)
2056
2057 size = 50
2058 range_deletion_ts = 30
2059 partition_deletion_ts = 10
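# rows below are written with USING TIMESTAMP ck1, so the partition deletion at
# ts=10 shadows exactly the rows with ck1 <= 10, and the ts=30 range deletions
# cover 20 < ck1 < 30 plus (ck1 = 20, ck2 < 10); the per-node verification at
# the end of the test mirrors this layout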
2060
2061 for ck1 in range(size):
2062 for ck2 in range(size):
2063 session.execute("INSERT INTO ks.t (pk, ck1, ck2, v1, v2)"
2064 " VALUES (1, {}, {}, {}, {}) USING TIMESTAMP {}".format(ck1, ck2, ck1, ck2, ck1))
2065
2066 self._replay_batchlogs()
2067
2068 for ck1 in range(size):
2069 for ck2 in range(size):
2070 assert_one(session, "SELECT pk,ck1,ck2,v1,v2 FROM ks.t WHERE pk=1 AND ck1={} AND ck2={}".format(ck1, ck2),
2071 [1, ck1, ck2, ck1, ck2])
2072 assert_one(session, "SELECT pk,ck1,ck2,v1,v2 FROM ks.t_by_v WHERE pk=1 AND ck1={} AND ck2={}".format(ck1, ck2),
2073 [1, ck1, ck2, ck1, ck2])
2074
2075 logger.debug('Shutdown node4 and node5')
2076 node4.stop(wait_other_notice=True)
2077 node5.stop(wait_other_notice=True)
2078
2079 for ck1 in range(size):
2080 for ck2 in range(size):
2081 if ck1 % 2 == 0: # range tombstone
2082 session.execute("DELETE FROM ks.t USING TIMESTAMP 50 WHERE pk=1 AND ck1={}".format(ck1))
2083 elif ck1 == ck2: # row tombstone
2084 session.execute("DELETE FROM ks.t USING TIMESTAMP 60 WHERE pk=1 AND ck1={} AND ck2={}".format(ck1, ck2))
2085 elif ck1 == ck2 - 1: # cell tombstone
2086 session.execute("DELETE v2 FROM ks.t USING TIMESTAMP 70 WHERE pk=1 AND ck1={} AND ck2={}".format(ck1, ck2))
2087
2088 # range deletion
2089 session.execute("DELETE FROM ks.t USING TIMESTAMP {} WHERE pk=1 and ck1 < 30 and ck1 > 20".format(range_deletion_ts))
2090 session.execute("DELETE FROM ks.t USING TIMESTAMP {} WHERE pk=1 and ck1 = 20 and ck2 < 10".format(range_deletion_ts))
2091
2092 # partition deletion at ts=10; since rows were written with TIMESTAMP ck1, it shadows rows with ck1 <= partition_deletion_ts
2093 session.execute("DELETE FROM ks.t USING TIMESTAMP {} WHERE pk=1".format(partition_deletion_ts))
2094 # pk=2000 receives only a partition deletion
2095 session.execute("DELETE FROM ks.t USING TIMESTAMP {} WHERE pk=2000".format(partition_deletion_ts))
2096 self._replay_batchlogs()
2097
2098 # start nodes with different batch size
2099 logger.debug('Starting nodes')
2100 node2.start(wait_other_notice=True, wait_for_binary_proto=True, jvm_args=["-Dcassandra.repair.mutation_repair_rows_per_batch={}".format(2)])
2101 node3.start(wait_other_notice=True, wait_for_binary_proto=True, jvm_args=["-Dcassandra.repair.mutation_repair_rows_per_batch={}".format(5)])
2102 node4.start(wait_other_notice=True, wait_for_binary_proto=True, jvm_args=["-Dcassandra.repair.mutation_repair_rows_per_batch={}".format(50)])
2103 node5.start(wait_other_notice=True, wait_for_binary_proto=True, jvm_args=["-Dcassandra.repair.mutation_repair_rows_per_batch={}".format(5000)])
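# each node gets a different mutation_repair_rows_per_batch (2, 5, 50, 5000) so
# repair is exercised both when the large partition is split into many small
# mutations and when it is applied nearly whole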
2104 self._replay_batchlogs()
2105
2106 logger.debug('repairing base table')
2107 node1.nodetool("repair ks t")
2108 # insert data into the deleted partition pk=2000 with an older timestamp; it should be considered dead
2109 session.execute("INSERT INTO ks.t (pk, ck1, ck2, v1, v2)"
2110 " VALUES (2000, 0, 0, 0, 0) USING TIMESTAMP {}".format(partition_deletion_ts - 1))
2111 self._replay_batchlogs()
2112
2113 logger.debug('stop cluster')
2114 self.cluster.stop()
2115
2116 logger.debug('rolling restart to check repaired data on each node')
2117 for node in self.cluster.nodelist():
2118 logger.debug('starting {}'.format(node.name))
2119 node.start(wait_other_notice=True, wait_for_binary_proto=True)
2120 session = self.patient_cql_connection(node, consistency_level=ConsistencyLevel.ONE)
2121 for ck1 in range(size):
2122 for ck2 in range(size):
2123 if (
2124 ck1 <= partition_deletion_ts or # partition deletion
2125 ck1 == ck2 or ck1 % 2 == 0 or # row deletion or range tombstone
2126 (ck1 > 20 and ck1 < 30) or (ck1 == 20 and ck2 < 10) # range tombstone
2127 ):
2128 assert_none(session, "SELECT pk,ck1,ck2,v1,v2 FROM ks.t_by_v WHERE pk=1 AND "
2129 "ck1={} AND ck2={}".format(ck1, ck2))
2130 assert_none(session, "SELECT pk,ck1,ck2,v1,v2 FROM ks.t WHERE pk=1 AND "
2131 "ck1={} AND ck2={}".format(ck1, ck2))
2132 elif ck1 == ck2 - 1: # cell tombstone
2133 assert_one(session, "SELECT pk,ck1,ck2,v1,v2 FROM ks.t_by_v WHERE pk=1 AND "
2134 "ck1={} AND ck2={}".format(ck1, ck2), [1, ck1, ck2, ck1, None])
2135 assert_one(session, "SELECT pk,ck1,ck2,v1,v2 FROM ks.t WHERE pk=1 AND "
2136 "ck1={} AND ck2={}".format(ck1, ck2), [1, ck1, ck2, ck1, None])
2137 else:
2138 assert_one(session, "SELECT pk,ck1,ck2,v1,v2 FROM ks.t_by_v WHERE pk=1 AND "
2139 "ck1={} AND ck2={}".format(ck1, ck2), [1, ck1, ck2, ck1, ck2])
2140 assert_one(session, "SELECT pk,ck1,ck2,v1,v2 FROM ks.t WHERE pk=1 AND "
2141 "ck1={} AND ck2={}".format(ck1, ck2), [1, ck1, ck2, ck1, ck2])
2142 # Verify partition deletion with pk=2000 has no live data
2143 assert_none(session, "SELECT pk,ck1,ck2,v1,v2 FROM ks.t WHERE pk=2000")
2144 assert_none(session, "SELECT pk,ck1,ck2,v1,v2 FROM ks.t_by_v WHERE pk=2000")
2145 logger.debug('stopping {}'.format(node.name))
2146 node.stop(wait_other_notice=True, wait_for_binary_proto=True)
2147
2148 @pytest.mark.resource_intensive
2149 def test_really_complex_repair(self):
2150 """
2151 Test that a materialized view is consistent after an even more complex repair.
2152 """
2153 session = self.prepare(rf=5, options={'hinted_handoff_enabled': False}, nodes=5)
2154 node1, node2, node3, node4, node5 = self.cluster.nodelist()
2155
2156 # we create the base table with gc_grace_seconds=1 so the batchlog will expire quickly
2157 session.execute("CREATE TABLE ks.t (id int, v int, v2 text, v3 decimal, PRIMARY KEY(id, v, v2))"
2158 "WITH gc_grace_seconds = 1")
2159 session.execute(("CREATE MATERIALIZED VIEW ks.t_by_v AS SELECT * FROM t "
2160 "WHERE v IS NOT NULL AND id IS NOT NULL AND v IS NOT NULL AND "
2161 "v2 IS NOT NULL PRIMARY KEY (v2, v, id)"))
2162
2163 session.cluster.control_connection.wait_for_schema_agreement()
2164
2165 logger.debug('Shutdown node2 and node3')
2166 node2.stop(wait_other_notice=True)
2167 node3.stop(wait_other_notice=True)
2168
2169 session.execute("INSERT INTO ks.t (id, v, v2, v3) VALUES (1, 1, 'a', 3.0)")
2170 session.execute("INSERT INTO ks.t (id, v, v2, v3) VALUES (2, 2, 'a', 3.0)")
2171 self._replay_batchlogs()
2172 logger.debug('Verify the data in the MV on node1 with CL=ONE')
2173 assert_all(session, "SELECT * FROM ks.t_by_v WHERE v2 = 'a'", [['a', 1, 1, 3.0], ['a', 2, 2, 3.0]])
2174
2175 session.execute("INSERT INTO ks.t (id, v, v2, v3) VALUES (1, 1, 'b', 3.0)")
2176 session.execute("INSERT INTO ks.t (id, v, v2, v3) VALUES (2, 2, 'b', 3.0)")
2177 self._replay_batchlogs()
2178 logger.debug('Verify the data in the MV on node1 with CL=ONE')
2179 assert_all(session, "SELECT * FROM ks.t_by_v WHERE v2 = 'b'", [['b', 1, 1, 3.0], ['b', 2, 2, 3.0]])
2180
2181 session.shutdown()
2182
2183 logger.debug('Shutdown node1, node4 and node5')
2184 node1.stop()
2185 node4.stop()
2186 node5.stop()
2187
2188 logger.debug('Start nodes 2 and 3')
2189 node2.start()
2190 node3.start(wait_other_notice=True, wait_for_binary_proto=True)
2191
2192 session2 = self.patient_cql_connection(node2)
2193 session2.execute('USE ks')
2194
2195 logger.debug('Verify the data in the MV on node2 with CL=ONE. No rows should be found.')
2196 assert_none(session2, "SELECT * FROM ks.t_by_v WHERE v2 = 'a'")
2197
2198 logger.debug('Write new data in node2 that overlap those in node1')
2199 session2.execute("INSERT INTO ks.t (id, v, v2, v3) VALUES (1, 1, 'c', 3.0)")
2200 session2.execute("INSERT INTO ks.t (id, v, v2, v3) VALUES (2, 2, 'c', 3.0)")
2201 self._replay_batchlogs()
2202 assert_all(session2, "SELECT * FROM ks.t_by_v WHERE v2 = 'c'", [['c', 1, 1, 3.0], ['c', 2, 2, 3.0]])
2203
2204 session2.execute("INSERT INTO ks.t (id, v, v2, v3) VALUES (1, 1, 'd', 3.0)")
2205 session2.execute("INSERT INTO ks.t (id, v, v2, v3) VALUES (2, 2, 'd', 3.0)")
2206 self._replay_batchlogs()
2207 assert_all(session2, "SELECT * FROM ks.t_by_v WHERE v2 = 'd'", [['d', 1, 1, 3.0], ['d', 2, 2, 3.0]])
2208
2209 logger.debug("Composite delete of everything")
2210 session2.execute("DELETE FROM ks.t WHERE id = 1 and v = 1")
2211 session2.execute("DELETE FROM ks.t WHERE id = 2 and v = 2")
2212 self._replay_batchlogs()
2213 assert_none(session2, "SELECT * FROM ks.t_by_v WHERE v2 = 'c'")
2214 assert_none(session2, "SELECT * FROM ks.t_by_v WHERE v2 = 'd'")
2215
2216 logger.debug('Wait for batchlogs to expire from node2 and node3')
2217 time.sleep(5)
2218
2219 logger.debug('Start remaining nodes')
2220 node1.start(wait_other_notice=True, wait_for_binary_proto=True)
2221 node4.start(wait_other_notice=True, wait_for_binary_proto=True)
2222 node5.start(wait_other_notice=True, wait_for_binary_proto=True)
2223
2224 # at this point the data isn't repaired, so we have an inconsistency:
2225 # the deleted rows are still returned even though they should be gone
2226 assert_all(
2227 session2,
2228 "SELECT * FROM ks.t_by_v WHERE v2 = 'a'", [['a', 1, 1, 3.0], ['a', 2, 2, 3.0]],
2229 cl=ConsistencyLevel.QUORUM
2230 )
2231
2232 logger.debug('Run global repair on node1')
2233 node1.repair()
2234
2235 assert_none(session2, "SELECT * FROM ks.t_by_v WHERE v2 = 'a'", cl=ConsistencyLevel.QUORUM)
2236
2237 def test_complex_mv_select_statements(self):
2238 """
2239 Test complex MV select statements
2240 @jira_ticket CASSANDRA-9664
2241 """
2242 cluster = self.cluster
2243 cluster.populate(3).start()
2244 node1 = cluster.nodelist()[0]
2245 session = self.patient_cql_connection(node1, consistency_level=ConsistencyLevel.QUORUM)
2246
2247 logger.debug("Creating keyspace")
2248 session.execute("CREATE KEYSPACE mvtest WITH replication = "
2249 "{'class': 'SimpleStrategy', 'replication_factor': '3'}")
2250 session.execute('USE mvtest')
2251
2252 mv_primary_keys = ["((a, b), c)",
2253 "((b, a), c)",
2254 "(a, b, c)",
2255 "(c, b, a)",
2256 "((c, a), b)"]
2257
2258 for mv_primary_key in mv_primary_keys:
2259
2260 session.execute("CREATE TABLE test (a int, b int, c int, d int, PRIMARY KEY (a, b, c))")
2261
2262 insert_stmt = session.prepare("INSERT INTO test (a, b, c, d) VALUES (?, ?, ?, ?)")
2263 update_stmt = session.prepare("UPDATE test SET d = ? WHERE a = ? AND b = ? AND c = ?")
2264 delete_stmt1 = session.prepare("DELETE FROM test WHERE a = ? AND b = ? AND c = ?")
2265 delete_stmt2 = session.prepare("DELETE FROM test WHERE a = ?")
2266
2267 session.cluster.control_connection.wait_for_schema_agreement()
2268
2269 rows = [(0, 0, 0, 0),
2270 (0, 0, 1, 0),
2271 (0, 1, 0, 0),
2272 (0, 1, 1, 0),
2273 (1, 0, 0, 0),
2274 (1, 0, 1, 0),
2275 (1, 1, -1, 0),
2276 (1, 1, 0, 0),
2277 (1, 1, 1, 0)]
2278
2279 for row in rows:
2280 session.execute(insert_stmt, row)
2281
2282 logger.debug("Testing MV primary key: {}".format(mv_primary_key))
2283
2284 session.execute("CREATE MATERIALIZED VIEW mv AS SELECT * FROM test WHERE "
2285 "a = 1 AND b IS NOT NULL AND c = 1 PRIMARY KEY {}".format(mv_primary_key))
2286 time.sleep(3)
2287
2288 assert_all(
2289 session, "SELECT a, b, c, d FROM mv",
2290 [[1, 0, 1, 0], [1, 1, 1, 0]],
2291 ignore_order=True,
2292 cl=ConsistencyLevel.QUORUM
2293 )
2294
2295 # insert new rows that do not match the filter
2296 session.execute(insert_stmt, (0, 0, 1, 0))
2297 session.execute(insert_stmt, (1, 1, 0, 0))
2298 assert_all(
2299 session, "SELECT a, b, c, d FROM mv",
2300 [[1, 0, 1, 0], [1, 1, 1, 0]],
2301 ignore_order=True,
2302 cl=ConsistencyLevel.QUORUM
2303 )
2304
2305 # insert new row that does match the filter
2306 session.execute(insert_stmt, (1, 2, 1, 0))
2307 assert_all(
2308 session, "SELECT a, b, c, d FROM mv",
2309 [[1, 0, 1, 0], [1, 1, 1, 0], [1, 2, 1, 0]],
2310 ignore_order=True,
2311 cl=ConsistencyLevel.QUORUM
2312 )
2313
2314 # update rows that do not match the filter
2315 session.execute(update_stmt, (1, 1, -1, 0))
2316 session.execute(update_stmt, (0, 1, 1, 0))
2317 assert_all(
2318 session, "SELECT a, b, c, d FROM mv",
2319 [[1, 0, 1, 0], [1, 1, 1, 0], [1, 2, 1, 0]],
2320 ignore_order=True,
2321 cl=ConsistencyLevel.QUORUM
2322 )
2323
2324 # update a row that does match the filter
2325 session.execute(update_stmt, (2, 1, 1, 1))
2326 assert_all(
2327 session, "SELECT a, b, c, d FROM mv",
2328 [[1, 0, 1, 0], [1, 1, 1, 2], [1, 2, 1, 0]],
2329 ignore_order=True,
2330 cl=ConsistencyLevel.QUORUM
2331 )
2332
2333 # delete rows that do not match the filter
2334 session.execute(delete_stmt1, (1, 1, -1))
2335 session.execute(delete_stmt1, (2, 0, 1))
2336 session.execute(delete_stmt2, (0,))
2337 assert_all(
2338 session, "SELECT a, b, c, d FROM mv",
2339 [[1, 0, 1, 0], [1, 1, 1, 2], [1, 2, 1, 0]],
2340 ignore_order=True,
2341 cl=ConsistencyLevel.QUORUM
2342 )
2343
2344 # delete a row that does match the filter
2345 session.execute(delete_stmt1, (1, 1, 1))
2346 assert_all(
2347 session, "SELECT a, b, c, d FROM mv",
2348 [[1, 0, 1, 0], [1, 2, 1, 0]],
2349 ignore_order=True,
2350 cl=ConsistencyLevel.QUORUM
2351 )
2352
2353 # delete a partition that matches the filter
2354 session.execute(delete_stmt2, (1,))
2355 assert_all(session, "SELECT a, b, c, d FROM mv", [], cl=ConsistencyLevel.QUORUM)
2356
2357 # Cleanup
2358 session.execute("DROP MATERIALIZED VIEW mv")
2359 session.execute("DROP TABLE test")
2360
2361 def test_propagate_view_creation_over_non_existing_table(self):
2362 """
2363 The internal addition of a view over a non-existing table should be ignored
2364 @jira_ticket CASSANDRA-13737
2365 """
2366
2367 cluster = self.cluster
2368 cluster.populate(3)
2369 cluster.start()
2370 node1, node2, node3 = self.cluster.nodelist()
2371 session = self.patient_cql_connection(node1, consistency_level=ConsistencyLevel.QUORUM)
2372 create_ks(session, 'ks', 3)
2373
2374 session.execute('CREATE TABLE users (username varchar PRIMARY KEY, state varchar)')
2375
2376 # create a materialized view only in nodes 1 and 2
2377 node3.stop(wait_other_notice=True)
2378 session.execute(('CREATE MATERIALIZED VIEW users_by_state AS '
2379 'SELECT * FROM users WHERE state IS NOT NULL AND username IS NOT NULL '
2380 'PRIMARY KEY (state, username)'))
2381
2382 # drop the base table only in node 3
2383 node1.stop(wait_other_notice=True)
2384 node2.stop(wait_other_notice=True)
2385 node3.start(wait_for_binary_proto=True)
2386 session = self.patient_cql_connection(node3, consistency_level=ConsistencyLevel.QUORUM)
2387 session.execute('DROP TABLE ks.users')
2388
2389 # restart the cluster
2390 cluster.stop()
2391 cluster.start()
2392
2393 # node3 should have received and ignored the creation of the MV over the dropped table
2394 assert node3.grep_log('Not adding view users_by_state because the base table')
2395
2396 def test_base_view_consistency_on_failure_after_mv_apply(self):
2397 self._test_base_view_consistency_on_crash("after")
2398
2399 def test_base_view_consistency_on_failure_before_mv_apply(self):
2400 self._test_base_view_consistency_on_crash("before")
2401
2402 def _test_base_view_consistency_on_crash(self, fail_phase):
2403 """
2404 * Fails base table write before or after applying views
2405 * Restart node and replay commit and batchlog
2406 * Check that base and views are present
2407
2408 @jira_ticket CASSANDRA-13069
2409 """
2410
2411 self.cluster.set_batch_commitlog(enabled=True)
2412 self.fixture_dtest_setup.ignore_log_patterns = [r'Dummy failure', r"Failed to force-recycle all segments"]
2413 self.prepare(rf=1, install_byteman=True)
2414 node1, node2, node3 = self.cluster.nodelist()
2415 session = self.patient_exclusive_cql_connection(node1)
2416 session.execute('USE ks')
2417
2418 session.execute("CREATE TABLE t (id int PRIMARY KEY, v int, v2 text, v3 decimal)")
2419 session.execute(("CREATE MATERIALIZED VIEW t_by_v AS SELECT * FROM t "
2420 "WHERE v IS NOT NULL AND id IS NOT NULL PRIMARY KEY (v, id)"))
2421
2422 session.cluster.control_connection.wait_for_schema_agreement()
2423
2424 logger.debug('Make node1 fail {} view writes'.format(fail_phase))
2425 node1.byteman_submit(['./byteman/fail_{}_view_write.btm'.format(fail_phase)])
2426
2427 logger.debug('Write rows 1..999 - all node1 writes should fail')
2428
2429 failed = False
2430 for i in range(1, 1000):
2431 try:
2432 session.execute("INSERT INTO t (id, v, v2, v3) VALUES ({v}, {v}, 'a', 3.0) USING TIMESTAMP {v}".format(v=i))
2433 except WriteFailure:
2434 failed = True
2435
2436 assert failed, "Should fail at least once."
2437 assert node1.grep_log("Dummy failure"), "Should throw Dummy failure"
2438
2439 missing_entries = 0
2440 session = self.patient_exclusive_cql_connection(node1)
2441 session.execute('USE ks')
2442 for i in range(1, 1000):
2443 view_entry = rows_to_list(session.execute(SimpleStatement("SELECT * FROM t_by_v WHERE id = {} AND v = {}".format(i, i),
2444 consistency_level=ConsistencyLevel.ONE)))
2445 base_entry = rows_to_list(session.execute(SimpleStatement("SELECT * FROM t WHERE id = {}".format(i),
2446 consistency_level=ConsistencyLevel.ONE)))
2447
2448 if not base_entry:
2449 missing_entries += 1
2450 if not view_entry:
2451 missing_entries += 1
2452
2453 logger.debug("Missing entries {}".format(missing_entries))
2454 assert missing_entries > 0
2455
2456 logger.debug('Restarting node1 to ensure commit log is replayed')
2457 node1.stop(wait_other_notice=True)
2458 # Set batchlog.replay_timeout_in_ms=1 so we can ensure the batchlog will be replayed below
2459 node1.start(jvm_args=["-Dcassandra.batchlog.replay_timeout_in_ms=1"])
2460
2461 logger.debug('Replay batchlogs')
2462 time.sleep(0.001)  # wait out batchlog.replay_timeout_in_ms=1 (1 ms)
2463 self._replay_batchlogs()
2464
2465 logger.debug('Verify that both the base table entry and view are present after commit and batchlog replay')
2466 session = self.patient_exclusive_cql_connection(node1)
2467 session.execute('USE ks')
2468 for i in range(1, 1000):
2469 view_entry = rows_to_list(session.execute(SimpleStatement("SELECT * FROM t_by_v WHERE id = {} AND v = {}".format(i, i),
2470 consistency_level=ConsistencyLevel.ONE)))
2471 base_entry = rows_to_list(session.execute(SimpleStatement("SELECT * FROM t WHERE id = {}".format(i),
2472 consistency_level=ConsistencyLevel.ONE)))
2473
2474 assert base_entry, "Both base {} and view entry {} should exist.".format(base_entry, view_entry)
2475 assert view_entry, "Both base {} and view entry {} should exist.".format(base_entry, view_entry)
2476
2477
2478 # For read verification
2479 class MutationPresence(Enum):
2480 match = 1
2481 extra = 2
2482 missing = 3
2483 excluded = 4
2484 unknown = 5
2485
2486
2487 class MM(object):
2488 mp = None
2489
2490 def out(self):
2491 pass
2492
2493
2494 class Match(MM):
2495
2496 def __init__(self):
2497 self.mp = MutationPresence.match
2498
2499 def out(self):
2500 return None
2501
2502
2503 class Extra(MM):
2504 expecting = None
2505 value = None
2506 row = None
2507
2508 def __init__(self, expecting, value, row):
2509 self.mp = MutationPresence.extra
2510 self.expecting = expecting
2511 self.value = value
2512 self.row = row
2513
2514 def out(self):
2515 return "Extra. Expected {} instead of {}; row: {}".format(self.expecting, self.value, self.row)
2516
2517
2518 class Missing(MM):
2519 value = None
2520 row = None
2521
2522 def __init__(self, value, row):
2523 self.mp = MutationPresence.missing
2524 self.value = value
2525 self.row = row
2526
2527 def out(self):
2528 return "Missing. At {}".format(self.row)
2529
2530
2531 class Excluded(MM):
2532
2533 def __init__(self):
2534 self.mp = MutationPresence.excluded
2535
2536 def out(self):
2537 return None
2538
2539
2540 class Unknown(MM):
2541
2542 def __init__(self):
2543 self.mp = MutationPresence.unknown
2544
2545 def out(self):
2546 return None
2547
2548
2549 readConsistency = ConsistencyLevel.QUORUM
2550 writeConsistency = ConsistencyLevel.QUORUM
2551 SimpleRow = collections.namedtuple('SimpleRow', 'a b c d')
2552
2553
2554 def row_generate(i, num_partitions):
2555 return SimpleRow(a=i % num_partitions, b=(i % 400) // num_partitions, c=i, d=i)
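# Example: with num_partitions=5, row_generate(7, 5) returns
# SimpleRow(a=2, b=1, c=7, d=7) - a selects the partition, b the clustering key,
# and c doubles as the value the mv1 view is keyed on (c, a, b)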
2556
2557
2558 # Create a threaded session and execute queries from a Queue
2559 def thread_session(ip, queue, start, end, rows, num_partitions):
2560
2561 def execute_query(session, select_gi, i):
2562 row = row_generate(i, num_partitions)
2563 if (row.a, row.b) in rows:
2564 base = rows[(row.a, row.b)]
2565 else:
2566 base = -1
2567 gi = list(session.execute(select_gi, [row.c, row.a]))
2568 if base == i and len(gi) == 1:
2569 return Match()
2570 elif base != i and len(gi) == 1:
2571 return Extra(base, i, (gi[0][0], gi[0][1], gi[0][2], gi[0][3]))
2572 elif base == i and len(gi) == 0:
2573 return Missing(base, i)
2574 elif base != i and len(gi) == 0:
2575 return Excluded()
2576 else:
2577 return Unknown()
2578
2579 try:
2580 cluster = Cluster([ip])
2581 session = cluster.connect()
2582 select_gi = session.prepare("SELECT * FROM mvtest.mv1 WHERE c = ? AND a = ?")
2583 select_gi.consistency_level = readConsistency
2584
2585 for i in range(start, end):
2586 ret = execute_query(session, select_gi, i)
2587 queue.put_nowait(ret)
2588 except Exception as e:
2589 print(str(e))
2590 queue.close()
2591
2592
2593 @since('3.0')
2594 @pytest.mark.skipif(sys.platform == 'win32', reason='Bug in python on Windows: https://bugs.python.org/issue10128')
2595 class TestMaterializedViewsConsistency(Tester):
2596
2597 def prepare(self, user_table=False):
2598 cluster = self.cluster
2599 cluster.populate(3).start()
2600 node2 = cluster.nodelist()[1]
2601
2602 # Keep the status of async requests
2603 self.exception_type = collections.Counter()
2604 self.num_request_done = 0
2605 self.counts = {}
2606 for mp in MutationPresence:
2607 self.counts[mp] = 0
2608 self.rows = {}
2609 self.update_stats_every = 100
2610
2611 logger.debug("Set to talk to node 2")
2612 self.session = self.patient_cql_connection(node2)
2613
2614 return self.session
2615
2616 def _print_write_status(self, row):
2617 output = "\r{}".format(row)
2618 for key in list(self.exception_type.keys()):
2619 output = "{} ({}: {})".format(output, key, self.exception_type[key])
2620 sys.stdout.write(output)
2621 sys.stdout.flush()
2622
2623 def _print_read_status(self, row):
2624 if self.counts[MutationPresence.unknown] == 0:
2625 sys.stdout.write(
2626 "\rOn {}; match: {}; extra: {}; missing: {}".format(
2627 row,
2628 self.counts[MutationPresence.match],
2629 self.counts[MutationPresence.extra],
2630 self.counts[MutationPresence.missing])
2631 )
2632 else:
2633 sys.stdout.write(
2634 "\rOn {}; match: {}; extra: {}; missing: {}; WTF: {}".format(
2635 row,
2636 self.counts[MutationPresence.match],
2637 self.counts[MutationPresence.extra],
2638 self.counts[MutationPresence.missing],
2639 self.counts[MutationPresence.unknown])
2640 )
2641 sys.stdout.flush()
2642
2643 def _do_row(self, insert_stmt, i, num_partitions):
2644
2645 # Error callback for async requests
2646 def handle_errors(row, exc):
2647 self.num_request_done += 1
2648 try:
2649 name = type(exc).__name__
2650 self.exception_type[name] += 1
2651 except Exception as e:
2652 print(traceback.format_exception_only(type(e), e))
2653
2654 # Success callback for async requests
2655 def success_callback(row):
2656 self.num_request_done += 1
2657
2658 if i % self.update_stats_every == 0:
2659 self._print_write_status(i)
2660
2661 row = row_generate(i, num_partitions)
2662
2663 async_ret = self.session.execute_async(insert_stmt, row)
2664 errors = partial(handle_errors, row)
2665 async_ret.add_callbacks(success_callback, errors)
2666
2667 def _populate_rows(self):
2668 statement = SimpleStatement(
2669 "SELECT a, b, c FROM mvtest.test1",
2670 consistency_level=readConsistency
2671 )
2672 data = self.session.execute(statement)
2673 for row in data:
2674 self.rows[(row.a, row.b)] = row.c
2675
2676 @pytest.mark.skip(reason='awaiting CASSANDRA-11290')
2677 def test_single_partition_consistent_reads_after_write(self):
2678 """
2679 Tests consistency of multiple writes to a single partition
2680
2681 @jira_ticket CASSANDRA-10981
2682 """
2683 self._consistent_reads_after_write_test(1)
2684
2685 def test_multi_partition_consistent_reads_after_write(self):
2686 """
2687 Tests consistency of multiple writes to multiple partitions
2688
2689 @jira_ticket CASSANDRA-10981
2690 """
2691 self._consistent_reads_after_write_test(5)
2692
2693 def _consistent_reads_after_write_test(self, num_partitions):
2694
2695 session = self.prepare()
2696 node1, node2, node3 = self.cluster.nodelist()
2697
2698 # Test config
2699 lower = 0
2700 upper = 100000
2701 processes = 4
2702 queues = [None] * processes
2703 eachProcess = (upper - lower) // processes
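# the verification range [lower, upper) is split evenly across reader threads,
# each with its own result queue; the loop below drains the queues round-robin
# (i % processes) to match the order in which rows were generated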
2704
2705 logger.debug("Creating schema")
2706 session.execute(
2707 ("CREATE KEYSPACE IF NOT EXISTS mvtest WITH replication = "
2708 "{'class': 'SimpleStrategy', 'replication_factor': '3'}")
2709 )
2710 session.execute(
2711 "CREATE TABLE mvtest.test1 (a int, b int, c int, d int, PRIMARY KEY (a,b))"
2712 )
2713 session.cluster.control_connection.wait_for_schema_agreement()
2714
2715 insert1 = session.prepare("INSERT INTO mvtest.test1 (a,b,c,d) VALUES (?,?,?,?)")
2716 insert1.consistency_level = writeConsistency
2717
2718 logger.debug("Writing data to base table")
2719 for i in range(upper // 10):
2720 self._do_row(insert1, i, num_partitions)
2721
2722 logger.debug("Creating materialized view")
2723 session.execute(
2724 ('CREATE MATERIALIZED VIEW mvtest.mv1 AS '
2725 'SELECT a,b,c,d FROM mvtest.test1 WHERE a IS NOT NULL AND b IS NOT NULL AND '
2726 'c IS NOT NULL PRIMARY KEY (c,a,b)')
2727 )
2728 session.cluster.control_connection.wait_for_schema_agreement()
2729
2730 logger.debug("Writing more data to base table")
2731 for i in range(upper // 10, upper):
2732 self._do_row(insert1, i, num_partitions)
2733
2734 # Wait until all requests are done
2735 while self.num_request_done < upper:
2736 time.sleep(1)
2737
2738 logger.debug("Making sure all batchlogs are replayed on node1")
2739 node1.nodetool("replaybatchlog")
2740 logger.debug("Making sure all batchlogs are replayed on node2")
2741 node2.nodetool("replaybatchlog")
2742 logger.debug("Making sure all batchlogs are replayed on node3")
2743 node3.nodetool("replaybatchlog")
2744
2745 logger.debug("Finished writes, now verifying reads")
2746 self._populate_rows()
2747
2748 threads = []
2749
2750 for i in range(processes):
2751 start = lower + (eachProcess * i)
2752 if i == processes - 1:
2753 end = upper
2754 else:
2755 end = lower + (eachProcess * (i + 1))
2756 q = Queue()
2757 node_ip = get_ip_from_node(node2)
2758 t = threading.Thread(target=thread_session, args=(node_ip, q, start, end, self.rows, num_partitions))
2759 threads.append(t)
2760 t.daemon = True
2761 t.start()
2762 queues[i] = q
2763
2764 for i in range(lower, upper):
2765 if i % 100 == 0:
2766 self._print_read_status(i)
2767 try:
2768 mm = queues[i % processes].get(timeout=60)
2769 except Empty as e:
2770 pytest.skip("Failed to get range {range} within timeout from queue. {error}".format(range=i, error=str(e)))
2771
2772 if mm.out() is not None:
2773 sys.stdout.write("\r{}\n" .format(mm.out()))
2774 self.counts[mm.mp] += 1
2775
2776 self._print_read_status(upper)
2777 sys.stdout.write("\n")
2778 sys.stdout.flush()
2779
2780 for thread in threads:
2781 thread.join(timeout=300)
2782
2783
2784 @since('3.0')
2785 class TestMaterializedViewsLockcontention(Tester):
2786 """
2787 Test materialized views lock contention.
2788 @jira_ticket CASSANDRA-12689
2789 @since 3.0
2790 """
2791
2792 def _prepare_cluster(self):
2793 self.cluster.populate(1)
2794 self.supports_v5_protocol = self.supports_v5_protocol(self.cluster.version())
2795 self.protocol_version = 5 if self.supports_v5_protocol else 4
2796
2797 self.cluster.set_configuration_options(values={
2798 'concurrent_materialized_view_writes': 1,
2799 'concurrent_writes': 1,
2800 })
2801 self.nodes = list(self.cluster.nodes.values())
2802 for node in self.nodes:
2803 remove_perf_disable_shared_mem(node)
2804
2805 self.cluster.start(wait_for_binary_proto=True, jvm_args=[
2806 "-Dcassandra.test.fail_mv_locks_count=64"
2807 ])
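# cassandra.test.fail_mv_locks_count is a test-only hook (also used in the
# repair tests above) that forces MV partition-lock acquisition failures,
# simulating contention with only a single write and view-write thread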
2808
2809 session = self.patient_exclusive_cql_connection(self.nodes[0], protocol_version=self.protocol_version)
2810
2811 keyspace = "locktest"
2812 session.execute("""
2813 CREATE KEYSPACE IF NOT EXISTS {}
2814 WITH replication = {{ 'class': 'SimpleStrategy', 'replication_factor': '1' }}
2815 """.format(keyspace))
2816 session.set_keyspace(keyspace)
2817
2818 session.execute(
2819 "CREATE TABLE IF NOT EXISTS test (int1 int, int2 int, date timestamp, PRIMARY KEY (int1, int2))")
2820 session.execute("""CREATE MATERIALIZED VIEW test_sorted_mv AS
2821 SELECT int1, date, int2
2822 FROM test
2823 WHERE int1 IS NOT NULL AND date IS NOT NULL AND int2 IS NOT NULL
2824 PRIMARY KEY (int1, date, int2)
2825 WITH CLUSTERING ORDER BY (date DESC, int1 DESC)""")
2826
2827 return session
2828
2829 @since('3.0')
2830 def test_mutations_dontblock(self):
2831 session = self._prepare_cluster()
2832 records = 100
2833 records2 = 100
2834 params = []
2835 for x in range(records):
2836 for y in range(records2):
2837 params.append([x, y])
2838
2839 execute_concurrent_with_args(
2840 session,
2841 session.prepare('INSERT INTO test (int1, int2, date) VALUES (?, ?, toTimestamp(now()))'),
2842 params
2843 )
2844
2845 assert_one(session, "SELECT count(*) FROM test WHERE int1 = 1", [records2])
2846
2847 for node in self.nodes:
2848 with JolokiaAgent(node) as jmx:
2849 mutationStagePending = jmx.read_attribute(
2850 make_mbean('metrics', type="ThreadPools", path='request', scope='MutationStage', name='PendingTasks'), "Value"
2851 )
2852 assert 0 == mutationStagePending, "Pending mutations: {}".format(mutationStagePending)