forked from thepaul/cassandra-dtest
-
Notifications
You must be signed in to change notification settings - Fork 2
/
thrift_hsha_test.py
112 lines (93 loc) · 4 KB
/
thrift_hsha_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
from dtest import Tester, debug, DEFAULT_DIR
import unittest
import time
import os
import subprocess
import shlex
import pycassa
import glob
import sys
JNA_PATH = '/usr/share/java/jna.jar'
ATTACK_JAR = 'cassandra-attack.jar'
# Use jna.jar in {CASSANDRA_DIR,DEFAULT_DIR}/lib/, since >=2.1 needs correct version
try:
if glob.glob('%s/lib/jna-*.jar' % os.environ['CASSANDRA_DIR']):
debug('Using jna.jar in CASSANDRA_DIR/lib..')
JNA_IN_LIB = glob.glob('%s/lib/jna-*.jar' % os.environ['CASSANDRA_DIR'])
JNA_PATH = JNA_IN_LIB[0]
except KeyError:
if glob.glob('%s/lib/jna-*.jar' % DEFAULT_DIR):
print ('Using jna.jar in DEFAULT_DIR/lib/..')
JNA_IN_LIB = glob.glob('%s/lib/jna-*.jar' % DEFAULT_DIR)
JNA_PATH = JNA_IN_LIB[0]
class ThriftHSHATest(Tester):
def __init__(self, *args, **kwargs):
Tester.__init__(self, *args, **kwargs)
@unittest.skipIf(sys.platform == "win32", 'Could not be executed on Windows')
def test_closing_connections(self):
"""Test CASSANDRA-6546 - do connections get closed when disabling / renabling thrift service?"""
cluster = self.cluster
cluster.set_configuration_options(values={
'rpc_server_type' : 'hsha',
'rpc_max_threads' : 20
})
cluster.populate(1)
cluster.start()
(node1,) = cluster.nodelist()
cursor = self.patient_cql_connection(node1).cursor()
self.create_ks(cursor, 'test', 1)
cursor.execute("CREATE TABLE \"CF\" (key text PRIMARY KEY, val text) WITH COMPACT STORAGE;")
def make_connection():
pool = pycassa.ConnectionPool('test', timeout=None)
cf = pycassa.ColumnFamily(pool, 'CF')
return pool
pools = []
for i in xrange(10):
debug("Creating connection pools..")
for x in xrange(3):
pools.append(make_connection())
debug("Disabling/Enabling thrift iteration #{i}".format(i=i))
node1.nodetool('disablethrift')
node1.nodetool('enablethrift')
debug("Closing connections from the client side..")
for pool in pools:
pool.dispose()
stdout = subprocess.Popen(["lsof -a -p %s -iTCP -sTCP:CLOSE_WAIT" % node1.pid], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True).communicate()[0]
lines = stdout.splitlines()
self.assertEqual(len(lines), 0, "There are non-closed connections: %s" % stdout)
@unittest.skipIf(not os.path.exists(ATTACK_JAR), "No attack jar found")
@unittest.skipIf(not os.path.exists(JNA_PATH), "No JNA jar found")
def test_6285(self):
"""Test CASSANDRA-6285 with Viktor Kuzmin's attack jar.
This jar file is not a part of this repository, you can
compile it yourself from sources found on CASSANDRA-6285. This
test will be skipped if the jar file is not found.
"""
cluster = self.cluster
cluster.set_configuration_options(values={ 'rpc_server_type' : 'hsha'})
# Enable JNA:
with open(os.path.join(self.test_path, 'test', 'cassandra.in.sh'),'w') as f:
f.write('CLASSPATH={jna_path}:$CLASSPATH\n'.format(
jna_path=JNA_PATH))
cluster.populate(2)
nodes = (node1, node2) = cluster.nodelist()
[n.start(use_jna=True) for n in nodes]
debug("Cluster started.")
cursor = self.patient_cql_connection(node1).cursor()
self.create_ks(cursor, 'tmp', 2)
cursor.execute("""CREATE TABLE "CF" (
key blob,
column1 timeuuid,
value blob,
PRIMARY KEY (key, column1)
) WITH COMPACT STORAGE;
""")
debug("running attack jar...")
p = subprocess.Popen(shlex.split("java -jar {attack_jar}".format(attack_jar=ATTACK_JAR)))
p.communicate()
debug("Stopping cluster..")
cluster.stop()
debug("Starting cluster..")
cluster.start(no_wait=True)
debug("Waiting 10 seconds before we're done..")
time.sleep(10)