forked from SvenskaSpel/locust-plugins
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsocketio.py
143 lines (130 loc) · 5.15 KB
/
socketio.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import json
import logging
import re
import time
import gevent
import websocket
from locust import User
from websocket._abnf import *
class SocketIOUser(User):
    """
    A locust that includes a socket io websocket connection.
    You could easily use this as a template for plain WebSockets,
    socket.io just happens to be my use case. You can use multiple
    inheritance to combine this with an HttpUser
    (class MyUser(HttpUser, SocketIOUser))
    """

    abstract = True
    # Splits an incoming frame into its leading numeric code and the remaining payload.
    message_regex = re.compile(r"(\d*)(.*)")
    # Picks up a trailing "<digits>" marker, which on_message reads as the sender's timestamp.
    description_regex = re.compile(r"<([0-9]+)>$")

    def connect(self, host: str, header=None):
        """
        Open the websocket connection and start receiving messages in the background.

        :param host: websocket URL handed to ``websocket.create_connection``.
        :param header: optional custom headers for the handshake; defaults to no extra headers.
        """
        # A fresh list per call instead of a shared mutable default argument.
        self.ws = websocket.create_connection(host, header=[] if header is None else header)
        gevent.spawn(self.receive_loop)

    def on_message(self, message):  # override this method in your subclass for custom handling
        """
        Classify a received message by its numeric prefix and report it as a "WSR" request event.

        Messages with an unrecognised prefix are printed and not reported.

        :param message: the raw text message returned by the websocket.
        :raises Exception: if ``message_regex`` does not match the message.
        """
        m = self.message_regex.match(message)
        response_time = 0  # unknown
        if m is None:
            # uh oh...
            raise Exception(f"got no matches for {self.message_regex} in {message}")
        code = m.group(1)
        json_string = m.group(2)
        if code == "0":
            name = "0 open"
        elif code == "3":
            name = "3 heartbeat"
        elif code == "40":
            name = "40 message ok"
        elif code == "42":
            # this is rather specific to our use case. Some messages contain an originating timestamp,
            # and we use that to calculate the delay & report it as locust response time
            # see it as inspiration rather than something you just pick up and use
            current_timestamp = time.time()
            obj = json.loads(json_string)
            logging.debug(json_string)
            ts_type, payload = obj
            name = f"{code} {ts_type} apiUri: {payload['apiUri']}"
            if payload["value"] != "":
                value = payload["value"]
                if "draw" in value:
                    description = value["draw"]["description"]
                    description_match = self.description_regex.search(description)
                    if description_match:
                        sent_timestamp = int(description_match.group(1))
                        # NOTE(review): unlike the "source_ts" branch below, this difference is
                        # not multiplied by 1000 - confirm the unit of the embedded timestamp.
                        response_time = current_timestamp - sent_timestamp
                    else:
                        # differentiate samples that have no timestamps from ones that do
                        name += "_"
                elif "source_ts" in value:
                    sent_timestamp = value["source_ts"]
                    response_time = (current_timestamp - sent_timestamp) * 1000
            else:
                name += "_missingTimestamp"
        else:
            print(f"Received unexpected message: {message}")
            return
        self.environment.events.request.fire(
            request_type="WSR",
            name=name,
            response_time=response_time,
            response_length=len(message),
            exception=None,
            context=self.context(),
        )

    def receive_loop(self):
        """
        Receive messages forever and pass each one to ``on_message``.

        There is no exit condition: the loop only ends when ``recv`` or
        ``on_message`` raises.
        """
        while True:
            message = self.ws.recv()
            logging.debug("WSR: %s", message)
            self.on_message(message)

    def _detect_name(self, body):
        """Derive a request name from an outgoing message body (str or bytes)."""
        # Binary payloads are decoded only for inspection; the str patterns below
        # cannot be applied to bytes directly.
        if isinstance(body, (bytes, bytearray)):
            text = body.decode("utf-8", errors="replace")
        else:
            text = body
        if text == "2":
            return "2 heartbeat"
        # hoping this is a subscribe type message, try to detect name
        m = re.search(r'(\d*)\["([a-z]*)"', text)
        if m is None:
            # Explicit raise (rather than ``assert``) so the check survives ``python -O``.
            raise AssertionError(f"could not detect a request name in {text!r}, pass name explicitly")
        code = m.group(1)
        action = m.group(2)
        url_part = re.search(r'"url": *"([^"]*)"', text)
        if url_part is None:
            raise AssertionError(f"could not find a url in {text!r}, pass name explicitly")
        # Collapse numeric path segments so requests to different ids share one name.
        url = re.sub(r"/[0-9_]*/", "/:id/", url_part.group(1))
        return f"{code} {action} url: {url}"

    def _fire_send_event(self, body, name, context):
        """Report an outgoing message as a "WSS" request event, detecting the name if needed."""
        if not name:
            name = self._detect_name(body)
        self.environment.events.request.fire(
            request_type="WSS",
            name=name,
            response_time=None,
            response_length=len(body),
            exception=None,
            context={**self.context(), **(context or {})},
        )

    def send(self, body, name=None, context=None):
        """
        Report and send a text message.

        :param body: the message to send.
        :param name: request name to report; detected from ``body`` when omitted.
        :param context: extra entries merged over ``self.context()`` for the request event.
        :raises AssertionError: if ``name`` is omitted and cannot be detected from ``body``.
        """
        self._fire_send_event(body, name, context)
        logging.debug("WSS: %s", body)
        self.ws.send(body)

    def send_binary(self, body, name=None, context=None):
        """
        Report and send a binary message.

        :param body: the payload to send; bytes are decoded (for naming only) when
            ``name`` is omitted.
        :param name: request name to report; detected from ``body`` when omitted.
        :param context: extra entries merged over ``self.context()`` for the request event.
        :raises AssertionError: if ``name`` is omitted and cannot be detected from ``body``.
        """
        self._fire_send_event(body, name, context)
        logging.debug("WSS: %s", body)
        self.ws.send_binary(body)

    def sleep_with_heartbeat(self, seconds):
        """
        Sleep for ``seconds``, sending a "2" heartbeat after every chunk of at most 15 seconds.

        :param seconds: total time to sleep; zero or negative returns immediately.
        """
        heartbeat_interval = 15
        # Strictly positive: with ``>= 0`` an exact multiple of the interval would
        # trigger an extra zero-length sleep and a duplicate heartbeat.
        while seconds > 0:
            gevent.sleep(min(heartbeat_interval, seconds))
            seconds -= heartbeat_interval
            self.send("2")