-
Notifications
You must be signed in to change notification settings - Fork 131
/
helperclient.py
302 lines (249 loc) · 10.8 KB
/
helperclient.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
import random
import threading
from coapthon.messages.message import Message
from coapthon import defines
from coapthon.client.coap import CoAP
from coapthon.messages.request import Request
from coapthon.utils import generate_random_token
__author__ = 'Giacomo Tanganelli'
class _RequestContext(object):
def __init__(self, request, callback=None):
self.request = request
if callback:
self.callback = callback
else:
self.response = None
self.responded = threading.Event()
class HelperClient(object):
    """
    Helper Client class to perform requests to remote servers in a simplified way.

    Outstanding requests are tracked in ``self.requests`` (token -> context),
    guarded by ``self.requests_lock``. A request sent with a callback is
    asynchronous (the token is returned); a request sent without one blocks the
    caller until a response arrives, the timeout expires or the client is stopped.
    """

    def __init__(self, server, sock=None, cb_ignore_read_exception=None, cb_ignore_write_exception=None):
        """
        Initialize a client to perform request to a server.

        :param server: the remote CoAP server
        :param sock: if a socket has been created externally, it can be used directly
        :param cb_ignore_read_exception: Callback function to handle exception raised during the socket read operation
        :param cb_ignore_write_exception: Callback function to handle exception raised during the socket write operation
        """
        self.server = server
        # Create the request table before the protocol object: the protocol is
        # handed self._wait_response, which reads both attributes.
        self.requests_lock = threading.RLock()
        self.requests = dict()
        # Second argument is a random integer in [1, 65535]
        # (presumably the starting message id - confirm against CoAP).
        self.protocol = CoAP(self.server, random.randint(1, 65535), self._wait_response, sock=sock,
                             cb_ignore_read_exception=cb_ignore_read_exception,
                             cb_ignore_write_exception=cb_ignore_write_exception)

    @staticmethod
    def _apply_options(request, options):
        """
        Copy extra keyword options onto a request.

        :param request: the request to update
        :param options: dict of attribute name -> value; names the request has
            no attribute for are silently ignored
        """
        # .items() (rather than .iteritems()) works on both Python 2 and 3
        for name, value in options.items():
            if hasattr(request, name):
                setattr(request, name, value)

    def _wait_response(self, message):
        """
        Private function to get responses from the server.

        Dispatches the message to the matching outstanding request: invokes the
        callback for asynchronous requests, or stores the response and wakes the
        waiter for blocking ones. Messages with an unknown token are dropped.

        :param message: the received message
        """
        if message.code == defines.Codes.CONTINUE.number:
            return
        # Read the token up front: on timeout ``message`` is replaced by None
        # below, and the token is still needed to drop the table entry.
        token = message.token
        with self.requests_lock:
            context = self.requests.get(token)
            if context is None:
                return
            if message.timeouted:
                # Message is actually the original timed out request (not the response), discard content
                message = None
            if hasattr(context, 'callback'):
                # NOTE(review): if Request always defines an ``observe`` attribute this
                # check is always False and callback entries are never removed here -
                # confirm against the Request class.
                if not hasattr(context.request, 'observe'):
                    # OBSERVE stays until cancelled, for all others we're done
                    del self.requests[token]
                context.callback(message)
            else:
                # Signal that a response is available to blocking call
                context.response = message
                context.responded.set()

    def stop(self):
        """
        Stop the client.

        Closes the protocol, then notifies every outstanding request: callbacks
        are invoked with ``None`` and blocking callers are woken up.
        """
        self.protocol.close()
        with self.requests_lock:
            # Unblock/signal waiters. Iterate over a snapshot: a callback may
            # remove entries from self.requests while we are looping.
            for context in list(self.requests.values()):
                if hasattr(context, 'callback'):
                    context.callback(None)
                else:
                    context.responded.set()

    def close(self):
        """
        Close the client.
        """
        self.stop()

    def cancel_observe_token(self, token, explicit, timeout=None):  # pragma: no cover
        """
        Delete observing on the remote server.

        Does nothing if the token is unknown or its request has no ``observe``
        attribute.

        :param token: the observe token
        :param explicit: if explicitly cancel
        :type explicit: bool
        :param timeout: the timeout of the explicit cancel request
        """
        with self.requests_lock:
            context = self.requests.get(token)
            if context is None:
                return
            if not hasattr(context.request, 'observe'):
                return
            del self.requests[token]

        self.protocol.end_observation(token)

        if not explicit:
            return

        request = self.mk_request(defines.Codes.GET, context.request.uri_path)

        # RFC7641 explicit cancel is by sending OBSERVE=1 with the same token,
        # not by an unsolicited RST (which would be ignored)
        request.token = token
        request.observe = 1

        self.send_request(request, callback=None, timeout=timeout)

    def cancel_observing(self, response, explicit):  # pragma: no cover
        """
        Delete observing on the remote server.

        :param response: the last received response
        :param explicit: if explicitly cancel using token
        :type explicit: bool
        """
        # Bound-method call: passing ``self`` again would shift every argument
        self.cancel_observe_token(response.token, explicit)

    def get(self, path, proxy_uri=None, callback=None, timeout=None, **kwargs):  # pragma: no cover
        """
        Perform a GET on a certain path.

        :param path: the path
        :param proxy_uri: Proxy-Uri option of a request
        :param callback: the callback function to invoke upon response
        :param timeout: the timeout of the request
        :param kwargs: extra request attributes to set (unknown names are ignored)
        :return: the response, or the request token when a callback is given
        """
        request = self.mk_request(defines.Codes.GET, path)
        request.token = generate_random_token(2)
        if proxy_uri:
            request.proxy_uri = proxy_uri

        self._apply_options(request, kwargs)

        return self.send_request(request, callback, timeout)

    def observe(self, path, callback, uri_query=None, timeout=None, **kwargs):  # pragma: no cover
        """
        Perform a GET with observe on a certain path.

        :param path: the path
        :param callback: the callback function to invoke upon notifications
        :param uri_query: optional Uri-Query to set on the request
        :param timeout: the timeout of the request
        :param kwargs: extra request attributes to set (unknown names are ignored)
        :return: the response to the observe request, or the request token when
            a callback is given
        """
        request = self.mk_request(defines.Codes.GET, path)
        request.token = generate_random_token(2)
        request.observe = 0

        if uri_query is not None:
            request.uri_query = uri_query

        self._apply_options(request, kwargs)

        return self.send_request(request, callback, timeout)

    def delete(self, path, proxy_uri=None, callback=None, timeout=None, **kwargs):  # pragma: no cover
        """
        Perform a DELETE on a certain path.

        :param path: the path
        :param proxy_uri: Proxy-Uri option of a request
        :param callback: the callback function to invoke upon response
        :param timeout: the timeout of the request
        :param kwargs: extra request attributes to set (unknown names are ignored)
        :return: the response, or the request token when a callback is given
        """
        request = self.mk_request(defines.Codes.DELETE, path)
        request.token = generate_random_token(2)
        if proxy_uri:
            request.proxy_uri = proxy_uri

        self._apply_options(request, kwargs)

        return self.send_request(request, callback, timeout)

    def post(self, path, payload, proxy_uri=None, callback=None, timeout=None, **kwargs):  # pragma: no cover
        """
        Perform a POST on a certain path.

        :param path: the path
        :param payload: the request payload
        :param proxy_uri: Proxy-Uri option of a request
        :param callback: the callback function to invoke upon response
        :param timeout: the timeout of the request
        :param kwargs: extra request attributes to set (unknown names are ignored)
        :return: the response, or the request token when a callback is given
        """
        request = self.mk_request(defines.Codes.POST, path)
        request.token = generate_random_token(2)
        request.payload = payload

        if proxy_uri:
            request.proxy_uri = proxy_uri

        self._apply_options(request, kwargs)

        return self.send_request(request, callback, timeout)

    def put(self, path, payload, proxy_uri=None, callback=None, timeout=None, **kwargs):  # pragma: no cover
        """
        Perform a PUT on a certain path.

        :param path: the path
        :param payload: the request payload
        :param proxy_uri: Proxy-Uri option of a request
        :param callback: the callback function to invoke upon response
        :param timeout: the timeout of the request
        :param kwargs: extra request attributes to set (unknown names are ignored)
        :return: the response, or the request token when a callback is given
        """
        request = self.mk_request(defines.Codes.PUT, path)
        request.token = generate_random_token(2)
        request.payload = payload

        if proxy_uri:
            request.proxy_uri = proxy_uri

        self._apply_options(request, kwargs)

        return self.send_request(request, callback, timeout)

    def discover(self, callback=None, timeout=None, **kwargs):  # pragma: no cover
        """
        Perform a Discover request on the server.

        :param callback: the callback function to invoke upon response
        :param timeout: the timeout of the request
        :param kwargs: extra request attributes to set (unknown names are ignored)
        :return: the response, or the request token when a callback is given
        """
        request = self.mk_request(defines.Codes.GET, defines.DISCOVERY_URL)
        request.token = generate_random_token(2)

        self._apply_options(request, kwargs)

        return self.send_request(request, callback, timeout)

    def send_request(self, request, callback=None, timeout=None):  # pragma: no cover
        """
        Send a request to the remote server.

        :param request: the request to send
        :param callback: the callback function to invoke upon response
        :param timeout: the timeout of the request
        :return: the response (synchronous), or the token (for asynchronous callback).
            The synchronous result is None when no response was recorded before
            the wait ended (timeout or client stopped).
        """
        with self.requests_lock:
            # Same requests from the same endpoint must have different tokens
            # Ensure there is a unique token in case the other side issues a
            # delayed response after a standalone ACK
            while request.token in self.requests:
                request.token = generate_random_token(2)
            context = _RequestContext(request, callback)
            self.requests[request.token] = context

        self.protocol.send_message(request)

        if callback:
            # So that requester can cancel asynchronous OBSERVE
            return request.token

        # Wait for response
        context.responded.wait(timeout)
        with self.requests_lock:
            # pop() rather than del: tolerate the entry having been removed
            # already (e.g. by a concurrent cancel) instead of raising KeyError
            self.requests.pop(request.token, None)
        return context.response

    def send_empty(self, empty):  # pragma: no cover
        """
        Send empty message.

        :param empty: the empty message
        """
        self.protocol.send_message(empty)

    def mk_request(self, method, path):
        """
        Create a request.

        :param method: the CoAP method
        :param path: the path of the request
        :return: the request
        """
        request = Request()
        request.destination = self.server
        request.code = method.number
        request.uri_path = path
        return request