forked from apache/cassandra-dtest
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtopology_test.py
495 lines (400 loc) · 19.8 KB
/
topology_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
import re
import time
import pytest
import logging
from threading import Thread
from cassandra import ConsistencyLevel
from ccmlib.node import TimeoutError, ToolError
from dtest import Tester, create_ks, create_cf
from tools.assertions import assert_almost_equal, assert_all, assert_none
from tools.data import insert_c1c2, query_c1c2
logger = logging.getLogger(__name__)  # module-wide logger for debug output from the tests below
since = pytest.mark.since  # alias for pytest.mark.since, applied to tests as @since('<version>')
class TestTopology(Tester):
    """
    Topology-change dtests: starting without joining the ring, size estimates
    across datacenters, decommission (simple, concurrent, resumable, forced)
    and token movement.
    """

    def test_do_not_join_ring(self):
        """
        @jira_ticket CASSANDRA-9034
        Check that AssertionError is not thrown on SizeEstimatesRecorder before node joins ring
        """
        cluster = self.cluster.populate(1)
        node1, = cluster.nodelist()

        # Shorten the size recorder interval so SizeEstimatesRecorder gets a
        # chance to run while the node is still outside the ring.
        node1.start(wait_for_binary_proto=True, join_ring=False,
                    jvm_args=["-Dcassandra.size_recorder_interval=1"])

        # initial delay is 30s
        time.sleep(40)

        node1.stop(gently=False)

    @since('3.0.11')
    def test_size_estimates_multidc(self):
        """
        Test that primary ranges are correctly generated on
        system.size_estimates for multi-dc, multi-ks scenario
        @jira_ticket CASSANDRA-9639
        """
        logger.debug("Creating cluster")
        cluster = self.cluster
        cluster.set_configuration_options(values={'num_tokens': 2})
        cluster.populate([2, 1])
        node1_1, node1_2, node2_1 = cluster.nodelist()

        logger.debug("Setting tokens")
        # Fixed tokens (two per node, matching num_tokens=2) make the expected
        # primary ranges below deterministic.
        node1_tokens, node2_tokens, node3_tokens = ['-6639341390736545756,-2688160409776496397',
                                                    '-2506475074448728501,8473270337963525440',
                                                    '-3736333188524231709,8673615181726552074']
        node1_1.set_configuration_options(values={'initial_token': node1_tokens})
        node1_2.set_configuration_options(values={'initial_token': node2_tokens})
        node2_1.set_configuration_options(values={'initial_token': node3_tokens})

        logger.debug("Starting cluster")
        cluster.start()

        out, _, _ = node1_1.nodetool('ring')
        logger.debug("Nodetool ring output {}".format(out))

        logger.debug("Creating keyspaces")
        session = self.patient_cql_connection(node1_1)
        create_ks(session, 'ks1', 3)
        create_ks(session, 'ks2', {'dc1': 2})
        create_cf(session, 'ks1.cf1', columns={'c1': 'text', 'c2': 'text'})
        create_cf(session, 'ks2.cf2', columns={'c1': 'text', 'c2': 'text'})

        logger.debug("Refreshing size estimates")
        node1_1.nodetool('refreshsizeestimates')
        node1_2.nodetool('refreshsizeestimates')
        node2_1.nodetool('refreshsizeestimates')

        # Keyspace definitions and ring layout the expected ranges below are
        # derived from:
        #
        #   CREATE KEYSPACE ks1 WITH replication =
        #       {'class': 'SimpleStrategy', 'replication_factor': '3'}
        #   CREATE KEYSPACE ks2 WITH replication =
        #       {'class': 'NetworkTopologyStrategy', 'dc1': '2'} AND durable_writes = true;
        #
        #   Datacenter: dc1
        #   ==========
        #   Address     Token
        #               8473270337963525440
        #   127.0.0.1  -6639341390736545756
        #   127.0.0.1  -2688160409776496397
        #   127.0.0.2  -2506475074448728501
        #   127.0.0.2   8473270337963525440
        #
        #   Datacenter: dc2
        #   ==========
        #   Address     Token
        #               8673615181726552074
        #   127.0.0.3  -3736333188524231709
        #   127.0.0.3   8673615181726552074

        logger.debug("Checking node1_1 size_estimates primary ranges")
        session = self.patient_exclusive_cql_connection(node1_1)
        assert_all(session, "SELECT range_start, range_end FROM system.size_estimates "
                            "WHERE keyspace_name = 'ks1'", [['-3736333188524231709', '-2688160409776496397'],
                                                            ['-9223372036854775808', '-6639341390736545756'],
                                                            ['8673615181726552074', '-9223372036854775808']])
        assert_all(session, "SELECT range_start, range_end FROM system.size_estimates "
                            "WHERE keyspace_name = 'ks2'", [['-3736333188524231709', '-2688160409776496397'],
                                                            ['-6639341390736545756', '-3736333188524231709'],
                                                            ['-9223372036854775808', '-6639341390736545756'],
                                                            ['8473270337963525440', '8673615181726552074'],
                                                            ['8673615181726552074', '-9223372036854775808']])

        logger.debug("Checking node1_2 size_estimates primary ranges")
        session = self.patient_exclusive_cql_connection(node1_2)
        assert_all(session, "SELECT range_start, range_end FROM system.size_estimates "
                            "WHERE keyspace_name = 'ks1'", [['-2506475074448728501', '8473270337963525440'],
                                                            ['-2688160409776496397', '-2506475074448728501']])
        assert_all(session, "SELECT range_start, range_end FROM system.size_estimates "
                            "WHERE keyspace_name = 'ks2'", [['-2506475074448728501', '8473270337963525440'],
                                                            ['-2688160409776496397', '-2506475074448728501']])

        logger.debug("Checking node2_1 size_estimates primary ranges")
        session = self.patient_exclusive_cql_connection(node2_1)
        assert_all(session, "SELECT range_start, range_end FROM system.size_estimates "
                            "WHERE keyspace_name = 'ks1'", [['-6639341390736545756', '-3736333188524231709'],
                                                            ['8473270337963525440', '8673615181726552074']])
        # ks2 has no replicas in dc2, so node2_1 owns no ks2 ranges.
        assert_none(session, "SELECT range_start, range_end FROM system.size_estimates "
                             "WHERE keyspace_name = 'ks2'")

    def test_simple_decommission(self):
        """
        @jira_ticket CASSANDRA-9912
        Check that AssertionError is not thrown on SizeEstimatesRecorder after node is decommissioned
        """
        cluster = self.cluster
        cluster.populate(3)
        cluster.start(wait_for_binary_proto=True, jvm_args=["-Dcassandra.size_recorder_interval=1"])
        node1, node2, node3 = cluster.nodelist()

        session = self.patient_cql_connection(node1)

        if cluster.version() >= '2.2':
            # reduce system_distributed RF to 2 so we don't require forceful decommission
            session.execute("ALTER KEYSPACE system_distributed WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':'2'};")

        # write some data
        node1.stress(['write', 'n=10K', 'no-warmup', '-rate', 'threads=8'])

        # Decommission node2, then stop it
        node2.decommission()
        node2.stop()

        # This sleep is here to give the cluster time to hit the AssertionError
        # described in 9912. Do not remove it.
        time.sleep(10)

    @pytest.mark.skip(reason='Hangs on CI for 2.1')
    def test_concurrent_decommission_not_allowed(self):
        """
        Test concurrent decommission is not allowed
        """
        cluster = self.cluster
        # Throttle streaming (presumably so the first decommission is still
        # running when the second one is attempted).
        cluster.set_configuration_options(values={'stream_throughput_outbound_megabits_per_sec': 1})
        cluster.populate(2).start(wait_other_notice=True)

        node1, node2 = cluster.nodelist()

        session = self.patient_cql_connection(node2)
        create_ks(session, 'ks', 1)
        create_cf(session, 'cf', columns={'c1': 'text', 'c2': 'text'})
        insert_c1c2(session, n=10000, consistency=ConsistencyLevel.ALL)

        mark = node2.mark_log()

        def decommission():
            node2.nodetool('decommission')

        # Launch first decommission in an external thread
        t = Thread(target=decommission)
        t.start()

        # Make sure first decommission is initialized before second decommission
        node2.watch_log_for('DECOMMISSIONING', filename='debug.log')

        # Launch a second decommission, should fail
        with pytest.raises(ToolError):
            node2.nodetool('decommission')

        # Check data is correctly forwarded to node1 after node2 is decommissioned
        t.join()
        node2.watch_log_for('DECOMMISSIONED', from_mark=mark)
        session = self.patient_cql_connection(node1)
        session.execute('USE ks')
        for n in range(0, 10000):
            query_c1c2(session, n, ConsistencyLevel.ONE)

    @since('3.10')
    def test_resumable_decommission(self):
        """
        @jira_ticket CASSANDRA-12008
        Test decommission operation is resumable
        """
        self.fixture_dtest_setup.ignore_log_patterns = [r'Streaming error occurred',
                                                        r'Error while decommissioning node',
                                                        r'Remote peer 127.0.0.2 failed stream session',
                                                        r'Remote peer 127.0.0.2:7000 failed stream session']
        cluster = self.cluster
        cluster.set_configuration_options(values={'stream_throughput_outbound_megabits_per_sec': 1})
        cluster.populate(3, install_byteman=True).start(wait_other_notice=True)

        node1, node2, node3 = cluster.nodelist()

        session = self.patient_cql_connection(node2)
        # reduce system_distributed RF to 2 so we don't require forceful decommission
        session.execute("ALTER KEYSPACE system_distributed WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':'2'};")
        create_ks(session, 'ks', 2)
        create_cf(session, 'cf', columns={'c1': 'text', 'c2': 'text'})
        insert_c1c2(session, n=10000, consistency=ConsistencyLevel.ALL)

        # Execute first decommission, should fail: the
        # decommission_failure_inject.btm byteman script is submitted first.
        with pytest.raises(ToolError):
            if cluster.version() >= '4.0':
                script = ['./byteman/4.0/decommission_failure_inject.btm']
            else:
                script = ['./byteman/pre4.0/decommission_failure_inject.btm']
            node2.byteman_submit(script)
            node2.nodetool('decommission')

        # Make sure previous ToolError is due to decommission
        node2.watch_log_for('Error while decommissioning node')

        # Decommission again
        mark = node2.mark_log()
        node2.nodetool('decommission')

        # Check decommission is done and we skipped transferred ranges
        node2.watch_log_for('DECOMMISSIONED', from_mark=mark)
        # NOTE(review): the grep_log result is discarded, so this line never fails
        # the test. Confirm which endpoint address the "Skipping transferred range"
        # message carries before turning this into an assertion.
        node2.grep_log("Skipping transferred range .* of keyspace ks, endpoint {}".format(node2.address_for_current_version_slashy()), filename='debug.log')

        # Check data is correctly forwarded to node1 and node3
        cluster.remove(node2)
        node3.stop(gently=False)
        session = self.patient_exclusive_cql_connection(node1)
        session.execute('USE ks')
        for i in range(0, 10000):
            query_c1c2(session, i, ConsistencyLevel.ONE)
        node1.stop(gently=False)
        # Take the log mark BEFORE starting node3; marking afterwards races with
        # startup, so the CQL-listening line could land before the mark and the
        # watch below would time out.
        mark = node3.mark_log()
        node3.start()
        session.shutdown()
        node3.watch_log_for('Starting listening for CQL clients', from_mark=mark)
        session = self.patient_exclusive_cql_connection(node3)
        session.execute('USE ks')
        for i in range(0, 10000):
            query_c1c2(session, i, ConsistencyLevel.ONE)

    @pytest.mark.no_vnodes
    def test_movement(self):
        """
        Start three nodes on deliberately unbalanced tokens, write 30000 rows at
        RF=1, and move each node onto cluster.balanced_tokens(3). After cleanup,
        check that every row is readable at CL.ONE and that the per-node data
        sizes are approximately equal.
        """
        cluster = self.cluster

        # Create an unbalanced ring
        cluster.populate(3, tokens=[0, 2**48, 2**62]).start()
        node1, node2, node3 = cluster.nodelist()

        session = self.patient_cql_connection(node1)
        create_ks(session, 'ks', 1)
        create_cf(session, 'cf', columns={'c1': 'text', 'c2': 'text'})

        insert_c1c2(session, n=30000, consistency=ConsistencyLevel.ONE)

        cluster.flush()

        # Move nodes to balance the cluster
        def move_node(node, token):
            mark = node.mark_log()
            node.move(token)  # can't assume 0 is balanced with m3p
            node.watch_log_for('{} state jump to NORMAL'.format(node.address_for_current_version()), from_mark=mark, timeout=180)
            time.sleep(3)

        balancing_tokens = cluster.balanced_tokens(3)

        move_node(node1, balancing_tokens[0])
        move_node(node2, balancing_tokens[1])
        move_node(node3, balancing_tokens[2])

        time.sleep(1)
        cluster.cleanup()

        # Check we can get all the keys
        for n in range(0, 30000):
            query_c1c2(session, n, ConsistencyLevel.ONE)

        # Now the load should be basically even
        sizes = [node.data_size() for node in [node1, node2, node3]]

        assert_almost_equal(sizes[0], sizes[1])
        assert_almost_equal(sizes[0], sizes[2])
        assert_almost_equal(sizes[1], sizes[2])

    @pytest.mark.no_vnodes
    def test_decommission(self):
        """
        Decommission one node of a balanced four-node RF=2 cluster, then check
        that all 30000 rows are readable at QUORUM and that the remaining data
        sizes relate as asserted at the end of the test.
        """
        cluster = self.cluster

        tokens = cluster.balanced_tokens(4)
        cluster.populate(4, tokens=tokens).start()
        node1, node2, node3, node4 = cluster.nodelist()

        session = self.patient_cql_connection(node1)
        create_ks(session, 'ks', 2)
        create_cf(session, 'cf', columns={'c1': 'text', 'c2': 'text'})

        insert_c1c2(session, n=30000, consistency=ConsistencyLevel.QUORUM)

        cluster.flush()
        sizes = [node.data_size() for node in cluster.nodelist() if node.is_running()]
        init_size = sizes[0]
        assert_almost_equal(*sizes)

        time.sleep(.5)
        node4.decommission()
        node4.stop()
        cluster.cleanup()
        time.sleep(.5)

        # Check we can get all the keys
        for n in range(0, 30000):
            query_c1c2(session, n, ConsistencyLevel.QUORUM)

        sizes = [node.data_size() for node in cluster.nodelist() if node.is_running()]
        logger.debug(sizes)
        assert_almost_equal(sizes[0], sizes[1])
        assert_almost_equal((2.0 / 3.0) * sizes[0], sizes[2])
        assert_almost_equal(sizes[2], init_size)

    @pytest.mark.no_vnodes
    def test_move_single_node(self):
        """ Test moving a node in a single-node cluster (#4200) """
        cluster = self.cluster

        # Create an unbalanced ring
        cluster.populate(1, tokens=[0]).start()
        node1 = cluster.nodelist()[0]
        time.sleep(0.2)

        session = self.patient_cql_connection(node1)
        create_ks(session, 'ks', 1)
        create_cf(session, 'cf', columns={'c1': 'text', 'c2': 'text'})

        insert_c1c2(session, n=10000, consistency=ConsistencyLevel.ONE)

        cluster.flush()

        node1.move(2**25)
        time.sleep(1)

        cluster.cleanup()

        # Check we can get all the keys
        for n in range(0, 10000):
            query_c1c2(session, n, ConsistencyLevel.ONE)

    @since('3.0')
    def test_decommissioned_node_cant_rejoin(self):
        """
        @jira_ticket CASSANDRA-8801

        Test that a decommissioned node can't rejoin the cluster by:

        - creating a cluster,
        - decommissioning a node, and
        - asserting that the "decommissioned node won't rejoin" error is in the
        logs for that node and
        - asserting that the node is not running.
        """
        rejoin_err = 'This node was decommissioned and will not rejoin the ring'
        self.fixture_dtest_setup.ignore_log_patterns = list(self.fixture_dtest_setup.ignore_log_patterns) + [
            rejoin_err]

        self.cluster.populate(3).start(wait_for_binary_proto=True)
        node1, node2, node3 = self.cluster.nodelist()

        logger.debug('decommissioning...')
        node3.decommission(force=self.cluster.version() >= '4.0')
        logger.debug('stopping...')
        node3.stop()
        logger.debug('attempting restart...')
        node3.start(wait_other_notice=False)
        try:
            # usually takes 3 seconds, so give it a generous 15
            node3.watch_log_for(rejoin_err, timeout=15)
        except TimeoutError:
            # TimeoutError is not very helpful to the reader of the test output;
            # let that pass and move on to string assertion below
            pass

        assert re.search(rejoin_err,
                         '\n'.join(['\n'.join(err_list) for err_list in node3.grep_log_for_errors()]), re.MULTILINE)

        # Give the node some time to shut down once it has detected
        # its invalid state. If it doesn't shut down in the 30 seconds,
        # consider filing a bug. It shouldn't take more than 10, in most cases.
        start = time.time()
        while start + 30 > time.time() and node3.is_running():
            time.sleep(1)

        assert not node3.is_running()

    @since('3.0')
    def test_crash_during_decommission(self):
        """
        If a node crashes whilst another node is being decommissioned,
        upon restarting the crashed node should not have invalid entries
        for the decommissioned node
        @jira_ticket CASSANDRA-10231
        """
        cluster = self.cluster
        self.fixture_dtest_setup.ignore_log_patterns = [r'Streaming error occurred', 'Stream failed']
        cluster.populate(3).start(wait_other_notice=True)

        node1, node2 = cluster.nodelist()[0:2]

        t = DecommissionInParallel(node1)
        t.start()

        node1.watch_log_for("DECOMMISSIONING", filename='debug.log')
        # Matches a `nodetool status` line for 127.0.0.1 whose status code ends
        # in "N", with "null" somewhere before the trailing "rack1". The raw
        # string avoids invalid "\s" / "\." escape-sequence warnings; the
        # pattern itself is unchanged.
        null_status_pattern = re.compile(r".N(?:\s*)127\.0\.0\.1(?:.*)null(?:\s*)rack1")
        # Keep crashing and restarting node2 while node1 decommissions, until
        # the null entry shows up or the decommission thread finishes.
        while t.is_alive():
            out = self.show_status(node2)
            if null_status_pattern.search(out):
                logger.debug("Matched null status entry")
                break
            logger.debug("Restarting node2")
            node2.stop(gently=False)
            node2.start(wait_for_binary_proto=True, wait_other_notice=False)

        logger.debug("Waiting for decommission to complete")
        t.join()
        self.show_status(node2)

        logger.debug("Sleeping for 30 seconds to allow gossip updates")
        time.sleep(30)

        out = self.show_status(node2)
        assert not null_status_pattern.search(out)

    @since('3.12')
    @pytest.mark.resource_intensive
    def test_stop_decommission_too_few_replicas_multi_dc(self):
        """
        Decommission should fail when it would result in the number of live replicas being less than
        the replication factor. --force should bypass this requirement.
        @jira_ticket CASSANDRA-12510
        @expected_errors ToolError when # nodes will drop below configured replicas in NTS/SimpleStrategy
        """
        cluster = self.cluster
        cluster.populate([2, 2]).start(wait_for_binary_proto=True)
        node1, node2, node3, node4 = self.cluster.nodelist()
        session = self.patient_cql_connection(node2)
        session.execute("ALTER KEYSPACE system_distributed WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':'2'};")

        # NTS keyspace with RF=2 in each DC: decommissioning node4 must be refused
        create_ks(session, 'ks', {'dc1': 2, 'dc2': 2})
        with pytest.raises(ToolError):
            node4.nodetool('decommission')

        # RF=4 keyspace on a four-node cluster: decommission must be refused again
        session.execute('DROP KEYSPACE ks')
        create_ks(session, 'ks2', 4)
        with pytest.raises(ToolError):
            node4.nodetool('decommission')

        node4.nodetool('decommission --force')
        decommissioned = node4.watch_log_for("DECOMMISSIONED", timeout=120)
        assert decommissioned, "Node failed to decommission when passed --force"

    def show_status(self, node):
        """Run `nodetool status` on ``node``, log the output at debug level and return it."""
        out, _, _ = node.nodetool('status')
        logger.debug("Status as reported by node {}".format(node.address()))
        logger.debug(out)
        return out
class DecommissionInParallel(Thread):
    """
    Thread that runs ``nodetool decommission`` on a node and then waits for
    "DECOMMISSIONED" to appear in that node's log (searching from a mark taken
    just before the command).

    A ToolError raised along the way is logged at debug level rather than
    propagated; any other exception escapes ``run`` as usual.
    """

    def __init__(self, node):
        super().__init__()
        self.node = node

    def run(self):
        target = self.node
        start_mark = target.mark_log()
        try:
            stdout, stderr, _ = target.nodetool("decommission")
            target.watch_log_for("DECOMMISSIONED", from_mark=start_mark)
            for text in (stdout, stderr):
                logger.debug(text)
        except ToolError as exc:
            logger.debug("Decommission failed with exception: " + str(exc))