forked from SvenskaSpel/locust-plugins
-
Notifications
You must be signed in to change notification settings - Fork 0
/
listeners.py
488 lines (424 loc) · 18.2 KB
/
listeners.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
from contextlib import contextmanager
from locust.exception import (
RescheduleTask,
StopUser,
CatchResponseError,
InterruptTaskSet,
) # need to do this first to make sure monkey patching is done
import gevent
import psycogreen.gevent
import json
psycogreen.gevent.patch_psycopg()
import psycopg2
import psycopg2.extras
import atexit
import logging
import os
import socket
import sys
from datetime import datetime, timezone, timedelta
import greenlet
from dateutil import parser
import subprocess
import locust.env
from typing import Callable, List
from gevent.lock import Semaphore
# pylint: disable=trailing-whitespace # pylint is confused by multiline strings used for SQL
def safe_serialize(obj):
    """Serialize *obj* to a JSON string, never raising on unserializable values.

    Any value json cannot encode is replaced by the placeholder string
    "<<non-serializable: TYPE>>" so that request context logging can never
    fail a test run.
    """

    # PEP 8 (E731): use a def rather than binding a lambda to a name.
    def default(o):
        return f"<<non-serializable: {type(o).__qualname__}>>"

    return json.dumps(obj, default=default)
def print_t(s):
    """Write *s* followed by a tab (no newline), for building tab-separated rows."""
    sys.stdout.write(str(s) + "\t")
class Timescale:  # pylint: disable=R0902
    """
    Locust listener that streams request samples, user counts and test run
    metadata into a Timescale/PostgreSQL database.

    See timescale_listener_ex.py for documentation
    """

    # Serializes all access to the shared db connection across greenlets.
    dblock = Semaphore()
    # Guards against registering the listener twice (locustfile + --timescale).
    first_instance = True

    def __init__(
        self,
        env: locust.env.Environment,
        testplan: str = None,
    ):
        """Register event hooks on *env* and start the background sample-writer greenlet.

        :param env: the locust environment whose events we subscribe to.
        :param testplan: legacy explicit plan name; when omitted the plan name is
            taken from --override-plan-name or falls back to the locustfile name.
        :raises Exception: when a second instance is created.
        """
        if not Timescale.first_instance:
            # we should refactor this into a module as it is much more pythonic
            raise Exception(
                "You tried to initialize the Timescale listener twice, maybe both in your locustfile and using command line --timescale? Ignoring second initialization."
            )
        Timescale.first_instance = False
        if testplan:
            self._testplan = testplan  # legacy
        elif env.parsed_options.override_plan_name:  # type: ignore[union-attr]
            self._testplan = env.parsed_options.override_plan_name
        else:
            self._testplan = env.parsed_options.locustfile
        self.env = env
        # buffer of sample dicts, drained by the _run greenlet every 0.5 s
        self._samples: List[dict] = []
        self._background = gevent.spawn(self._run)
        self._hostname = socket.gethostname()  # pylint: disable=no-member
        self._username = os.getenv("USER", "unknown")
        self._finished = False
        self._pid = os.getpid()
        events = self.env.events
        events.test_start.add_listener(self.on_start)
        events.request.add_listener(self.on_request)
        events.quit.add_listener(self.on_quit)
        events.spawning_complete.add_listener(self.spawning_complete)
        # also record the stop on interpreter exit, in case the quit event never fires
        atexit.register(self.log_stop_test_run)

    @contextmanager
    def dbcursor(self):
        """Yield a cursor on the shared connection while holding ``dblock``.

        On any psycopg2 error the connection is recreated (best effort) and the
        original error is re-raised to the caller.
        """
        with self.dblock:
            try:
                yield self.dbconn.cursor()
            except psycopg2.Error:
                try:
                    # try to recreate connection
                    self.dbconn = self._dbconn()
                except:  # best effort -- the re-raised error below is the real signal
                    pass
                raise

    def on_start(self, environment: locust.env.Environment):
        """test_start hook: connect to the db, detect the git repo, pick a run id, start logging."""
        try:
            self.dbconn = self._dbconn()
        except psycopg2.OperationalError as e:
            logging.error(e)
            # without a db connection there is nothing useful we can do; abort the process
            os._exit(1)
        try:
            # best-effort detection of the originating git repo for the testrun record
            self._gitrepo = subprocess.check_output(
                "git remote show origin -n 2>/dev/null | grep h.URL | sed 's/.*://;s/.git$//' || true",
                shell=True,
                stderr=None,
                universal_newlines=True,
            )
        except:
            # happens on windows
            self._gitrepo = None
        if is_worker() or is_master():
            # swarm generates the run id for its master and workers
            if getattr(environment.parsed_options, "run_id", False):
                self._run_id = parser.parse(environment.parsed_options.run_id)
            else:
                logging.info(
                    "You are running distributed, but without swarm. run_id:s in Timescale will not match exactly between load gens"
                )
                self._run_id = datetime.now(timezone.utc)
        else:
            # non-swarm runs need to generate the run id here
            self._run_id = datetime.now(timezone.utc)
        if not is_worker():
            logging.info(
                f"Follow test run here: {self.env.parsed_options.grafana_url}&var-testplan={self._testplan}&from={int(self._run_id.timestamp()*1000)}&to=now"
            )
            self.log_start_testrun()
            self._user_count_logger = gevent.spawn(self._log_user_count)

    def _dbconn(self) -> psycopg2.extensions.connection:
        """Create a new autocommit connection from the --pg* command line options.

        :raises Exception: re-raises whatever psycopg2.connect raised, after logging a hint.
        """
        try:
            conn = psycopg2.connect(
                host=self.env.parsed_options.pghost,
                user=self.env.parsed_options.pguser,
                password=self.env.parsed_options.pgpassword,
                database=self.env.parsed_options.pgdatabase,
                port=self.env.parsed_options.pgport,
                # TCP keepalives so long-idle runs do not silently lose the connection
                keepalives_idle=120,
                keepalives_interval=20,
                keepalives_count=6,
            )
        except Exception:
            logging.error(
                "Could not connect to postgres. Use standard postgres env vars or --pg* command line options to specify where to report locust samples (https://www.postgresql.org/docs/13/libpq-envars.html)"
            )
            raise
        # every INSERT/UPDATE is committed immediately; no explicit transactions are used
        conn.autocommit = True
        return conn

    def _log_user_count(self):
        """Greenlet loop: insert the current user count into user_count every 2 s."""
        while True:
            if self.env.runner is None:
                return  # there is no runner, so nothing to log...
            try:
                with self.dbcursor() as cur:
                    cur.execute(
                        """INSERT INTO user_count(time, run_id, testplan, user_count) VALUES (%s, %s, %s, %s)""",
                        (datetime.now(timezone.utc), self._run_id, self._testplan, self.env.runner.user_count),
                    )
            except psycopg2.Error as error:
                logging.error("Failed to write user count to Postgresql: " + repr(error))
                try:
                    # try to recreate connection
                    # NOTE(review): this assigns self.user_conn, which nothing else in this
                    # class reads -- presumably self.dbconn was intended; confirm before changing.
                    self.user_conn = self._dbconn()
                except:
                    pass
            gevent.sleep(2.0)

    def _run(self):
        """Background greenlet: flush buffered samples to the db, twice a second, until finished."""
        while True:
            if self._samples:
                # Buffer samples, so that a locust greenlet will write to the new list
                # instead of the one that has been sent into postgres client
                samples_buffer = self._samples
                self._samples = []
                self.write_samples_to_db(samples_buffer)
            else:
                if self._finished:
                    break
            gevent.sleep(0.5)

    def write_samples_to_db(self, samples):
        """Bulk-insert a list of sample dicts into the request table.

        Aborts the whole process on failure, since silently dropping samples
        would corrupt the results.
        """
        try:
            with self.dbcursor() as cur:
                psycopg2.extras.execute_values(
                    cur,
                    """INSERT INTO request(time,run_id,greenlet_id,loadgen,name,request_type,response_time,success,testplan,response_length,exception,pid,url,context) VALUES %s""",
                    samples,
                    template="(%(time)s, %(run_id)s, %(greenlet_id)s, %(loadgen)s, %(name)s, %(request_type)s, %(response_time)s, %(success)s, %(testplan)s, %(response_length)s, %(exception)s, %(pid)s, %(url)s, %(context)s)",
                )
        except psycopg2.Error as error:
            logging.error("Failed to write samples to Postgresql timescale database: " + repr(error))
            os._exit(1)

    def on_quit(self, exit_code, **kwargs):
        """quit hook: stop the background greenlets and finalize the testrun record."""
        self._finished = True
        atexit._clear()  # make sure we dont capture additional ctrl-c:s # pylint: disable=protected-access
        self._background.join(timeout=10)
        if getattr(self, "_user_count_logger", False):
            self._user_count_logger.kill()
        self.log_stop_test_run(exit_code)

    def on_request(
        self,
        request_type,
        name,
        response_time,
        response_length,
        exception,
        context,
        start_time=None,
        url=None,
        **kwargs,
    ):
        """request hook: turn the event into a sample dict and buffer it for the writer greenlet."""
        success = 0 if exception else 1
        if start_time:
            time = datetime.fromtimestamp(start_time, tz=timezone.utc)
        else:
            # some users may not send start_time, so we just make an educated guess
            # (which will be horribly wrong if users spend a lot of time in a with/catch_response-block)
            time = datetime.now(timezone.utc) - timedelta(milliseconds=response_time or 0)
        greenlet_id = getattr(greenlet.getcurrent(), "minimal_ident", 0)  # if we're debugging there is no greenlet
        sample = {
            "time": time,
            "run_id": self._run_id,
            "greenlet_id": greenlet_id,
            "loadgen": self._hostname,
            "name": name,
            "request_type": request_type,
            "response_time": response_time,
            "success": success,
            # truncated to 255 chars -- presumably the width of the url column; verify against schema
            "url": url[0:255] if url else None,
            "testplan": self._testplan,
            "pid": self._pid,
            # safe_serialize guarantees the context can always be jsonified
            "context": psycopg2.extras.Json(context, safe_serialize),
        }
        # negative length is locust's "unknown"; store NULL instead
        if response_length >= 0:
            sample["response_length"] = response_length
        else:
            sample["response_length"] = None
        if exception:
            if isinstance(exception, CatchResponseError):
                sample["exception"] = str(exception)
            else:
                try:
                    sample["exception"] = repr(exception)
                except AttributeError:
                    sample["exception"] = f"{exception.__class__} (and it has no string representation)"
        else:
            sample["exception"] = None
        self._samples.append(sample)

    def log_start_testrun(self):
        """Insert the testrun row and a 'started' event."""
        # NOTE(review): cmd aliases sys.argv, so `del cmd[0]` mutates sys.argv itself --
        # anything reading sys.argv afterwards sees the program name removed; confirm intended.
        cmd = sys.argv
        del cmd[0]
        with self.dbcursor() as cur:
            cur.execute(
                "INSERT INTO testrun (id, testplan, num_clients, rps, description, env, username, gitrepo, changeset_guid, arguments) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
                (
                    self._run_id,
                    self._testplan,
                    self.env.parsed_options.num_users or 1,
                    self.env.parsed_options.ips,  # this field is incorrectly called "rps" in db, it should be called something like "target_ips"
                    self.env.parsed_options.description,
                    self.env.parsed_options.test_env,
                    self._username,
                    self._gitrepo,
                    self.env.parsed_options.test_version,
                    " ".join(cmd),
                ),
            )
            cur.execute(
                "INSERT INTO events (time, text) VALUES (%s, %s)",
                (datetime.now(timezone.utc).isoformat(), self._testplan + " started by " + self._username),
            )

    def spawning_complete(self, user_count):
        """spawning_complete hook: record a 'rampup complete' event (master/standalone only)."""
        if not is_worker():  # only log for master/standalone
            end_time = datetime.now(timezone.utc)
            try:
                with self.dbcursor() as cur:
                    cur.execute(
                        "INSERT INTO events (time, text) VALUES (%s, %s)",
                        (end_time, f"{self._testplan} rampup complete, {user_count} users spawned"),
                    )
            except psycopg2.Error as error:
                logging.error(
                    "Failed to insert rampup complete event time to Postgresql timescale database: " + repr(error)
                )

    def log_stop_test_run(self, exit_code=None):
        """Finalize the testrun row (end time, exit code, aggregate stats) and log a 'finished' event.

        Also called via atexit, so it must tolerate being invoked after on_quit.
        """
        if is_worker():
            return  # only run on master or standalone
        # NOTE(review): if on_start never ran, self.dbconn was never assigned, so this line
        # raises AttributeError rather than returning early -- consider getattr(self, "dbconn", None).
        if not self.dbconn:
            return  # test_start never ran, so there's not much for us to do
        end_time = datetime.now(timezone.utc)
        try:
            with self.dbcursor() as cur:
                cur.execute(
                    "UPDATE testrun SET end_time = %s, exit_code = %s where id = %s",
                    (end_time, exit_code, self._run_id),
                )
                cur.execute(
                    "INSERT INTO events (time, text) VALUES (%s, %s)",
                    (end_time, self._testplan + f" finished with exit code: {exit_code}"),
                )
                # The AND time > run_id clause in the following statements are there to help Timescale performance
                # We dont use start_time / end_time to calculate RPS, instead we use the time between the actual first and last request
                # (as this is a more accurate measurement of the actual test)
                try:
                    cur.execute(
                        """
                        UPDATE testrun
                        SET (requests, resp_time_avg, rps_avg, fail_ratio) =
                        (SELECT reqs, resp_time, reqs / GREATEST(duration, 1), fails / reqs) FROM
                        (SELECT
                         COUNT(*)::numeric AS reqs,
                         AVG(response_time)::numeric as resp_time
                        FROM request WHERE run_id = %s AND time > %s) AS _,
                        (SELECT
                         EXTRACT(epoch FROM (SELECT MAX(time)-MIN(time) FROM request WHERE run_id = %s AND time > %s))::numeric AS duration) AS __,
                        (SELECT
                         COUNT(*)::numeric AS fails
                        FROM request WHERE run_id = %s AND time > %s AND success = 0) AS ___
                        WHERE id = %s""",
                        # run_id doubles as the time lower bound (it is a timestamp)
                        [self._run_id] * 7,
                    )
                except psycopg2.errors.DivisionByZero:  # pylint: disable=no-member
                    logging.debug(
                        "Got DivisionByZero error when trying to update testrun into events, most likely because there were no requests logged"
                    )
        except psycopg2.Error as error:
            logging.error(
                "Failed to update testrun record (or events) with end time to Postgresql timescale database: "
                + repr(error)
            )
        logging.info(
            f"Report: {self.env.parsed_options.grafana_url}&var-testplan={self._testplan}&from={int(self._run_id.timestamp()*1000)}&to={int((end_time.timestamp()+1)*1000)}\n"
        )
class Print:
    """
    Print every response (useful when debugging a single locust)
    """

    def __init__(self, env: locust.env.Environment, include_length=False, include_time=False, include_context=False):
        """Subscribe to the request event and print a tab-separated header row."""
        env.events.request.add_listener(self.on_request)
        # each flag becomes either a column-header fragment or the empty string,
        # so the per-row code can test truthiness and the header can concatenate them
        self.include_length = "length\t" if include_length else ""
        self.include_time = "time \t" if include_time else ""
        self.include_context = "context\t" if include_context else ""
        header = f"\n{self.include_time}type\t{'name'.ljust(50)}\tresp_ms\t{self.include_length}exception\t{self.include_context}"
        print(header)

    def on_request(
        self, request_type, name, response_time, response_length, exception, context: dict, start_time=None, **kwargs
    ):
        """Print one tab-separated row describing the completed request."""
        # render the exception (if any) as a single truncated line
        if not exception:
            errortext = ""
        elif isinstance(exception, CatchResponseError):
            errortext = str(exception)[:500].replace("\n", " ")
        else:
            try:
                errortext = repr(exception)[:500].replace("\n", " ")
            except AttributeError:
                errortext = f"{exception.__class__} (and it has no string representation)"[:500].replace("\n", " ")
        context = context or ""
        if response_time is None:
            response_time = -1
        padded_name = name.ljust(30) if name else ""
        if self.include_time:
            timestamp = datetime.fromtimestamp(start_time, tz=timezone.utc) if start_time else datetime.now()
            print_t(timestamp)
        print_t(request_type)
        print_t(padded_name.ljust(50))
        print_t(str(round(response_time)).ljust(7))
        if self.include_length:
            print_t(response_length)
        print_t(errortext.ljust(9))
        if self.include_context:
            print_t(context)
        print()
class RescheduleTaskOnFail:
    """Raise RescheduleTask on any failed request, aborting the current task."""

    def __init__(self, env: locust.env.Environment):
        # make sure to add this listener LAST, because any failures will throw an exception,
        # causing other listeners to be skipped
        env.events.request.add_listener(self.request)

    def request(self, exception, **kwargs):
        if not exception:
            return
        raise RescheduleTask(exception)
class InterruptTaskOnFail:
    """Raise InterruptTaskSet on any failed request, interrupting the current TaskSet."""

    def __init__(self, env: locust.env.Environment):
        # make sure to add this listener LAST, because any failures will throw an exception,
        # causing other listeners to be skipped
        env.events.request.add_listener(self.request)

    def request(self, exception, **kwargs):
        if not exception:
            return
        raise InterruptTaskSet()
class StopUserOnFail:
    """Raise StopUser on any failed request, stopping the current simulated user."""

    def __init__(self, env: locust.env.Environment):
        # make sure to add this listener LAST, because any failures will throw an exception,
        # causing other listeners to be skipped
        env.events.request.add_listener(self.request)

    def request(self, exception, **kwargs):
        if not exception:
            return
        raise StopUser()
class ExitOnFail:
    """Terminate the whole process as soon as any request fails."""

    def __init__(self, env: locust.env.Environment):
        # make sure to add this listener LAST, because any failures will throw an exception,
        # causing other listeners to be skipped
        env.events.request.add_listener(self.request)

    def request(self, exception, **kwargs):
        if not exception:
            return
        gevent.sleep(0.2)  # wait for other listeners output to flush / write to db
        os._exit(1)
class QuitOnFail:
    """Stop the test run when a request fails (optionally only for a specific request name)."""

    def __init__(self, env: locust.env.Environment, name=None):
        # make sure to add this listener LAST, because any failures will throw an exception,
        # causing other listeners to be skipped
        self.name = name  # when falsy, any failing request triggers the quit
        self.env = env
        env.events.request.add_listener(self.request)

    def request(self, exception, name, **kwargs):
        if exception and (not self.name or name == self.name):
            gevent.sleep(0.2)  # wait for other listeners output to flush / write to db
            self.env.runner.quit()
class RunOnFail:
    """Invoke a user-supplied callback whenever a request fails."""

    def __init__(self, env: locust.env.Environment, function: Callable):
        # execute the provided function on failure
        self.function = function
        env.events.request.add_listener(self.request)

    def request(self, exception, **kwargs):
        if not exception:
            return
        self.function(exception, **kwargs)
class RunOnUserError:
    """Invoke a user-supplied callback on any unhandled exception in a task."""

    def __init__(self, env: locust.env.Environment, function: Callable):
        # execute the provided function on unhandled exception in a task
        self.function = function
        env.events.user_error.add_listener(self.user_error)

    def user_error(self, user_instance, exception, tb, **kwargs):
        if not exception:
            return
        self.function(user_instance, exception, tb, **kwargs)
def is_worker():
    """True when this process was launched as a locust worker (--worker on the command line)."""
    return any(arg == "--worker" for arg in sys.argv)
def is_master():
    """True when this process was launched as the locust master (--master on the command line)."""
    return any(arg == "--master" for arg in sys.argv)