-
Notifications
You must be signed in to change notification settings - Fork 5
/
impala_load_test.py
executable file
·293 lines (250 loc) · 10.7 KB
/
impala_load_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
#!/usr/bin/env python
# Copyright 2014 (c) Cloudera
import argparse
import datetime
from threading import Thread
import time
import random
import signal
import sys
import socket
from impala.dbapi import connect
from impala.rpc import TTransportException
import tornado.ioloop
try:
import simplejson as json
except ImportError:
import json
import tornado.web
from stats import get_stats_tables, print_stats
# strftime format used for the timestamps reported in the stats
# (thread start time and the time of each failed query).
DATE_FORMAT = "%m-%d-%Y %H:%M:%S"
# Pause a query thread takes after every query attempt before issuing the next one.
TIME_BETWEEN_QUERIES = 1 # Seconds
class ImpalaQueryScheduler(Thread):
    """
    Thread responsible for launching and monitoring ImpalaQuery threads.

    When run, it creates ``num_threads`` ImpalaQuery workers, handing each one a
    connection callback bound to the next Impala host in round-robin order, starts
    them, and then starts an HttpThread that serves ``stats()`` on ``stats_port``.
    """

    def __init__(self, queries, num_threads, impala_hosts, stats_port, kerberos=False, kerberos_service="impala"):
        """
        :param queries: list of query strings the workers pick from at random
        :param num_threads: number of ImpalaQuery workers to launch
        :param impala_hosts: list of Impala daemon host names
        :param stats_port: TCP port for the HTTP stats endpoint
        :param kerberos: passed to ``connect`` as ``use_kerberos``
        :param kerberos_service: passed to ``connect`` as ``kerberos_service_name``
        """
        Thread.__init__(self)
        self.__num_threads = num_threads
        self.__queries = queries
        self.__finished = False
        self.__impala_threads = []
        self.__impala_hosts = list(impala_hosts)
        # Index of the next host to hand out; wraps around for even distribution.
        self.__next_host_index = 0
        self.__stats_port = stats_port
        # Created in run(); stays None until then so shutdown() can be called early.
        self.__http_thread = None
        self.__start_time = datetime.datetime.now()
        self.__kerberos = kerberos
        self.__kerberos_service = kerberos_service

    def get_new_connection(self):
        """
        Pick the next Impala host (round-robin) and return a zero-argument callback
        that opens the connection when called.

        :raises ValueError: if no Impala hosts were configured
        """
        if not self.__impala_hosts:
            raise ValueError("No Impala hosts configured")
        # A plain index works on both Python 2 and 3 (generator.next() is Python 2 only).
        impala_host = self.__impala_hosts[self.__next_host_index % len(self.__impala_hosts)]
        self.__next_host_index += 1

        def connection_callback():
            """
            Returns Impala connection when executed, used to speed up thread startup times.
            """
            try:
                connection = connect(host=impala_host, port=21050, use_kerberos=self.__kerberos,
                                     kerberos_service_name=self.__kerberos_service)
                connection.host = impala_host  # Slap the hostname inside the connection object
                return connection
            except TTransportException:
                # ImpalaQuery catches NameError to detect a failed connection.
                raise NameError("Error connecting to Impala daemon [%s]" % impala_host)

        return connection_callback

    def run(self):
        """
        Execute Impala query threads which run randomized queries
        """
        for _ in range(self.__num_threads):
            query_thread = ImpalaQuery(self.get_new_connection(), self.__queries)
            self.__impala_threads.append(query_thread)
            print("Starting " + query_thread.name)
        for query_thread in self.__impala_threads:
            query_thread.start()
        # Start HTTP Stats Thread
        self.__http_thread = HttpThread(self.__stats_port, stats_method=self.stats)
        self.__http_thread.start()

    def shutdown(self):
        """
        Shut down all the threads and block until every query thread reports
        that it is no longer running.
        """
        print("Shutting down...")
        # The HTTP thread only exists once run() has progressed far enough.
        if self.__http_thread is not None:
            self.__http_thread.shutdown()
        # Shutdown the threads
        for thread in self.__impala_threads:
            thread.shutdown()
        # Wait for the threads to shutdown
        print("Waiting for %d threads to to shutdown..." % len(self.__impala_threads))
        for thread in self.__impala_threads:
            while thread.stats()["running"]:
                time.sleep(.10)
            print(thread.name + " shutdown")
        self.__finished = True
        print("Scheduler shutdown")

    @property
    def finished(self):
        """True once shutdown() has completed."""
        return self.__finished

    def stats(self):
        """
        Return a dict with one entry per query thread (keyed by thread name) plus
        the aggregate keys ``total_successful_queries``, ``total_failed_queries``,
        ``average_query_time``, ``total_runtime`` and ``queries_per_hour``.
        """
        stats = dict((query_thread.name, query_thread.stats()) for query_thread in self.__impala_threads)
        thread_stats = list(stats.values())
        total_successful_queries = sum(entry["successful"] for entry in thread_stats)
        total_failed_queries = sum(entry["failures"] for entry in thread_stats)
        # Average only over threads that have completed at least one query; threads
        # without a sample (e.g. never connected) must not drag the mean towards zero.
        averages = [entry["average_query_time"] for entry in thread_stats
                    if entry["average_query_time"] is not None]
        average_query_time = sum(averages) / len(averages) if averages else 0
        stats["total_successful_queries"] = total_successful_queries
        stats["total_failed_queries"] = total_failed_queries
        stats["average_query_time"] = average_query_time
        # basic estimation of the "queries per hour" metric
        total_runtime = (datetime.datetime.now() - self.__start_time)
        stats["total_runtime"] = str(total_runtime)
        # total_seconds() covers the whole duration; .seconds is only the sub-day
        # component and is 0 during the first second, which would divide by zero.
        elapsed_seconds = total_runtime.total_seconds()
        if total_successful_queries > 0 and elapsed_seconds > 0:
            stats["queries_per_hour"] = int(
                (float(total_successful_queries) / elapsed_seconds) * (60.0 * 60.0))
        else:
            stats["queries_per_hour"] = 0
        return stats
class ImpalaQuery(Thread):
def __init__(self, connection, queries):
"""
Given an Impala connection and an array of queries, continously
execute a random query against Impala.
"""
Thread.__init__(self)
self.__queries = queries
self.__shutdown = False
self.__failures = 0
self.__successful = 0
self.__start_time = datetime.datetime.now()
self.__running = False
self.__failed_queries = []
self.__query_times = []
self.__running_query = False
try:
self.__connection = connection() # This is a connection callback, established here
except NameError as e:
self.say(e.message)
self.__connection = None
self.__shutdown = True
def random_query(self):
return random.choice(self.__queries)
def say(self, message):
print(self.name + ": " + message)
def run(self):
self.__running = True
num_queries = 0
while not self.__shutdown:
# Try to run a query
query = self.random_query()
num_queries += 1
try:
self.__active_cursor = self.__connection.cursor()
self.__running_start_time = time.time()
self.__running_query = True
self.__active_cursor.execute(query)
_ = self.__active_cursor.fetchall()
self.__active_cursor.close()
self.__running_query = False
self.__running_end_time = time.time()
self.__successful += 1
self.__query_times.append((self.__running_end_time - self.__running_start_time))
except Exception as e:
self.say("QUERY EXCEPTION: [%s]" % e.message)
self.__failures += 1
self.__failed_queries.append(
dict(query=query, exception=e.message, failed_time=datetime.datetime.now().strftime(DATE_FORMAT)))
time.sleep(TIME_BETWEEN_QUERIES)
print(self.name + " shutdown")
self.__running = False
def shutdown(self):
self.say("closing impala connection")
if self.__connection:
self.__connection.close()
self.__shutdown = True
self.say("closed impala connection")
@property
def average_run_time(self):
if self.__query_times:
return sum(self.__query_times) / len(self.__query_times)
def stats(self):
stats = dict(failures=self.__failures, successful=self.__successful,
start_time=self.__start_time.strftime(DATE_FORMAT), running=self.__running,
failed_queries=self.__failed_queries,
average_query_time=self.average_run_time)
if self.__connection:
stats["impala_host"] = self.__connection.host
else:
stats["impala_host"] = "Not Connected"
# See if we have an active query, if so, find out how long it's been running
if self.__running_query:
stats["currently_running_query_time"] = time.time() - self.__running_start_time
stats["currently_running_query"] = True
else:
stats["currently_running_query"] = False
total_runtime = (datetime.datetime.now() - self.__start_time)
stats["total_runtime"] = str(total_runtime)
if self.__successful > 0:
stats["queries_per_hour"] = int(
(float(self.__successful) / float(total_runtime.seconds)) * (60.0 * 60.0))
else:
stats["queries_per_hour"] = 0
return stats
class HttpThread(Thread):
    """
    Thread that runs a tornado HTTP server exposing the load-test stats as JSON
    on ``/``.
    """

    def __init__(self, port, stats_method):
        """
        :param port: TCP port to listen on
        :param stats_method: zero-argument callable returning a JSON-serialisable dict
        """
        Thread.__init__(self)
        self.__stats_method = stats_method
        self.__port = port
        # The IOLoop actually started by run(); remembered so shutdown() stops that one.
        self.__ioloop = None

    def run(self):
        """Bind the stats application to the port and run the IOLoop until stopped."""
        stats_http = tornado.web.Application([(r"/", self.StatsHttpHandler, dict(stats_method=self.__stats_method))])
        stats_http.listen(self.__port)
        print("Stats HTTP handler listening => http://%s:%d/" % (socket.getfqdn(), self.__port))
        self.__ioloop = tornado.ioloop.IOLoop.instance()
        self.__ioloop.start()

    def shutdown(self):
        """Ask the IOLoop to stop; safe to call from a thread other than the HTTP thread."""
        ioloop = self.__ioloop if self.__ioloop is not None else tornado.ioloop.IOLoop.instance()
        # add_callback is the one IOLoop method that may be called from another
        # thread; calling stop() directly from here is not thread-safe.
        ioloop.add_callback(ioloop.stop)
        print("http thread stopped")

    class StatsHttpHandler(tornado.web.RequestHandler):
        """Request handler that returns the current stats as a JSON document."""

        def initialize(self, stats_method):
            self.__stats_method = stats_method

        def get(self):
            # The body is JSON, so advertise it as such instead of the default text/html.
            self.set_header("Content-Type", "application/json")
            self.write(json.dumps(self.__stats_method()))
def str_to_bool(value):
    """
    Convert a command-line string to a boolean.

    argparse's ``type=bool`` treats every non-empty string (including "False")
    as True, so the flag is parsed explicitly instead.

    :raises argparse.ArgumentTypeError: if the text is not a recognised boolean
    """
    normalized = str(value).strip().lower()
    if normalized in ("true", "t", "yes", "y", "1", "on"):
        return True
    if normalized in ("false", "f", "no", "n", "0", "off", ""):
        return False
    raise argparse.ArgumentTypeError("expected a boolean value, got %r" % (value,))


def parse_queries(text):
    """
    Split the contents of a query file into individual queries.

    Queries are separated by ``;``. Line breaks are replaced by a space so that
    tokens on adjacent lines are not glued together, and empty entries are
    dropped, so a missing trailing semicolon does not lose the last query.
    NOTE: a ``;`` inside a string literal is still treated as a separator.
    """
    flattened = text.replace("\r\n", "\n").replace("\r", "\n").replace("\n", " ")
    return [query.strip() for query in flattened.split(";") if query.strip()]


if __name__ == "__main__":
    argparser = argparse.ArgumentParser(description="Simple Impala workload simulator.")
    argparser.add_argument("--query_file", metavar="f", type=str, required=True)
    argparser.add_argument("--threads", metavar="t", type=int, required=True)
    argparser.add_argument("--impala_hosts", metavar="h", type=str, required=True)
    argparser.add_argument("--stats_port", metavar="p", type=int, default=8888)
    # Accepts "--kerberos True/False" as before, and a bare "--kerberos" as True.
    argparser.add_argument("--kerberos", metavar="k", type=str_to_bool, nargs="?", const=True, default=False)
    argparser.add_argument("--kerberos_service", type=str, default="impala")
    args = argparser.parse_args()

    # The context manager closes the file even if reading fails.
    with open(args.query_file, "r") as queries_file:
        queries = parse_queries(queries_file.read())
    if not queries:
        print("No queries found in %s" % args.query_file)
        sys.exit(1)

    # If kerberos is enabled, make sure we have the sasl library
    if args.kerberos:
        try:
            import sasl
        except ImportError:
            print("You need the sasl library in order to use kerberos")
            sys.exit(1)

    impala_hosts = [host.strip() for host in args.impala_hosts.split(",") if host.strip()]
    scheduler = ImpalaQueryScheduler(queries, args.threads, impala_hosts, args.stats_port,
                                     args.kerberos, args.kerberos_service)
    # Keep an empty place holder for the final stats
    stats = None

    def signal_handler(signum, frame):
        """On SIGINT: snapshot the stats over HTTP, then stop all threads."""
        global stats
        # Fetch before shutdown, because shutdown stops the HTTP stats server.
        stats = get_stats_tables(url="http://localhost:%d" % args.stats_port)
        scheduler.shutdown()
        while not scheduler.finished:
            time.sleep(.10)

    scheduler.start()
    signal.signal(signal.SIGINT, signal_handler)
    # Block the main thread until a signal arrives and its handler has returned.
    signal.pause()
    print("\n\n")
    print_stats(stats)