-
Notifications
You must be signed in to change notification settings - Fork 131
/
observelayer.py
288 lines (250 loc) · 10.8 KB
/
observelayer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
import logging
import time
import threading
from coapthon import defines
# Module author metadata.
__author__ = 'Giacomo Tanganelli'
# Module-level logger, named after this module so it can be configured per-module.
logger = logging.getLogger(__name__)
class ObserveItem(object):
    """
    State kept for a single observe relationship, including the optional
    conditional-observe parameters and the timer driving pmax notifications.
    """

    def __init__(self, timestamp, non_counter, allowed, transaction, serv=None):
        """
        Data structure for the Observe option

        :param timestamp: the timestamp of last message sent
        :param non_counter: the number of NON notification sent
        :param allowed: if the client is allowed as observer
        :param transaction: the transaction
        :param serv: reference to CoAP object
        """
        self.timestamp = timestamp
        self.non_counter = non_counter
        self.allowed = allowed
        self.transaction = transaction

        # parameters for dynamic resource observing
        self.conditional = False
        self.conditions = {}
        self.last_notify = time.time()
        self.timer = None
        self.coap = serv

    @staticmethod
    def _seconds(value):
        """
        Return a condition value usable in timer arithmetic.

        :param value: the raw value stored in ``conditions``
        :return: the value itself when it is an int or float, otherwise 0
        """
        # bool is a subclass of int; it is never a meaningful period.
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            return 0
        return value

    # timer for notification procedure is set at (pmax - pmin)/2
    def pmax_timer(self):
        """
        Timer callback: ask the CoAP object to notify the observed resource.

        Does nothing when the item has no CoAP reference or no transaction,
        because there is nothing to notify in that case.
        """
        if self.coap is None or self.transaction is None:
            return
        self.coap.notify(self.transaction.resource)

    def start_timer(self):
        """
        Start the pmax timer according to the "pmin" and "pmax" conditions.

        No timer is started when "pmax" is missing, zero or not numeric.
        A missing or non-numeric "pmin" counts as 0. A timer that is still
        referenced by this item is cancelled before the new one is started,
        so at most one timer per item is pending.
        """
        pmin = self._seconds(self.conditions.get("pmin", 0))
        pmax = self._seconds(self.conditions.get("pmax", 0))
        if pmax == 0:
            return

        if self.timer is not None:
            # Timer.cancel() is harmless on a timer that already fired.
            self.timer.cancel()
        self.timer = threading.Timer((pmax - pmin) / 2, self.pmax_timer)
        self.timer.start()
class ObserveLayer(object):
    """
    Manage the observing feature. It stores observing relationships.

    Relationships live in a dictionary whose keys are built from the remote
    host, port and the message token, and whose values are ObserveItem objects.
    """

    def __init__(self, server=None):
        """
        :param server: reference passed to every ObserveItem created for an incoming observe request
        """
        self._relations = {}
        self._server = server

    @staticmethod
    def _relation_key(endpoint, token):
        """
        Build the key identifying an observe relationship.

        :param endpoint: the (host, port) pair of the remote peer
        :param token: the token of the message
        :return: the key used in the relations dictionary
        """
        host, port = endpoint
        # NOTE(review): the parts are concatenated without a separator, so
        # different host/port/token splits can yield the same key. The format
        # is kept unchanged in case other code computes keys the same way.
        return hash(str(host) + str(port) + str(token))

    def _discard_relation(self, key_token):
        """
        Remove a relationship and cancel its pending pmax timer, if any.

        :param key_token: the key of the relationship
        :return: the removed ObserveItem, or None if the key was unknown
        """
        item = self._relations.pop(key_token, None)
        if item is not None and item.timer is not None:
            # Without this the timer would still fire after the removal.
            item.timer.cancel()
        return item

    def send_request(self, request):
        """
        Add itself to the observing list

        :param request: the request
        :return: the request unmodified
        """
        if request.observe == 0:
            # Observe request
            key_token = self._relation_key(request.destination, request.token)
            self._relations[key_token] = ObserveItem(time.time(), None, True, None)
        elif request.observe == 1:
            # Cancelling observe explicitly
            self.remove_subscriber(request)
        return request

    def receive_response(self, transaction):
        """
        Sets notification's parameters.

        The transaction is flagged as a notification when its response is CON
        and matches a stored relationship.

        :type transaction: Transaction
        :param transaction: the transaction
        :rtype : Transaction
        :return: the modified transaction
        """
        response = transaction.response
        key_token = self._relation_key(response.source, response.token)
        if key_token in self._relations and response.type == defines.Types["CON"]:
            transaction.notification = True
        return transaction

    def send_empty(self, message):
        """
        Eventually remove from the observer list in case of a RST message.

        :type message: Message
        :param message: the message
        :return: the message unmodified
        """
        key_token = self._relation_key(message.destination, message.token)
        if key_token in self._relations and message.type == defines.Types["RST"]:
            self._discard_relation(key_token)
        return message

    def receive_request(self, transaction):
        """
        Manage the observe option in the request and eventually initialize the client for adding to
        the list of observers or remove from the list.

        :type transaction: Transaction
        :param transaction: the transaction that owns the request
        :rtype : Transaction
        :return: the modified transaction
        """
        request = transaction.request
        if request.observe == 0:
            # Observe request
            key_token = self._relation_key(request.source, request.token)
            previous = self._relations.get(key_token)
            # A known key means the client is renewing its registration.
            allowed = previous is not None
            if previous is not None and previous.timer is not None:
                # The old item is about to be replaced; stop its timer.
                previous.timer.cancel()

            item = ObserveItem(time.time(), 0, allowed, transaction, self._server)
            self._relations[key_token] = item

            # check if the observing request has dynamic parameters (sent inside uri_query field)
            # An empty query carries no conditions, so it is not treated as dynamic.
            if request.uri_query:
                logger.info("Dynamic Observing registration")
                item.conditional = True
                item.conditions = ObserveLayer.parse_uri_query(request.uri_query)
                item.start_timer()
        elif request.observe == 1:
            key_token = self._relation_key(request.source, request.token)
            logger.info("Remove Subscriber")
            self._discard_relation(key_token)
        return transaction

    def receive_empty(self, empty, transaction):
        """
        Manage the observe feature to remove a client in case of a RST message received in reply to a notification.

        :type empty: Message
        :param empty: the received message
        :type transaction: Transaction
        :param transaction: the transaction that owns the notification message
        :rtype : Transaction
        :return: the modified transaction
        """
        if empty.type == defines.Types["RST"]:
            key_token = self._relation_key(transaction.request.source, transaction.request.token)
            logger.info("Remove Subscriber")
            self._discard_relation(key_token)
            transaction.completed = True
        return transaction

    def send_response(self, transaction):
        """
        Finalize to add the client to the list of observer.

        The relationship is confirmed on a CONTENT response for an observable
        resource. It is removed on a CONTENT response for any other resource
        and on responses whose code is at or above the error lower bound.

        :type transaction: Transaction
        :param transaction: the transaction that owns the response
        :return: the transaction unmodified
        """
        key_token = self._relation_key(transaction.request.source, transaction.request.token)
        item = self._relations.get(key_token)
        if item is None:
            return transaction

        if transaction.response.code == defines.Codes.CONTENT.number:
            if transaction.resource is not None and transaction.resource.observable:
                transaction.response.observe = transaction.resource.observe_count
                item.allowed = True
                item.transaction = transaction
                item.timestamp = time.time()
            else:
                self._discard_relation(key_token)
        elif transaction.response.code >= defines.Codes.ERROR_LOWER_BOUND:
            self._discard_relation(key_token)
        return transaction

    def notify(self, resource, root=None):
        """
        Prepare notification for the resource to all interested observers.

        :rtype: list
        :param resource: the resource for which send a new notification
        :param root: deprecated; when given, every resource returned by
            ``root.with_prefix_resource(resource.path)`` is matched
        :return: the list of transactions to be notified
        """
        ret = []
        if root is not None:
            resource_list = root.with_prefix_resource(resource.path)
        else:
            resource_list = [resource]

        # Iterate over a snapshot: timer threads may add or remove relations.
        for item in list(self._relations.values()):
            transaction = item.transaction
            # Relations created by send_request carry no transaction.
            if transaction is None or transaction.resource not in resource_list:
                continue

            # checking dynamic resource parameters
            if item.conditional:
                if not self.verify_conditions(item):
                    continue
                # updating relation timestamp and resetting timer
                item.last_notify = time.time()
                if item.timer is not None:
                    # No timer exists when the conditions have no pmax.
                    item.timer.cancel()
                item.start_timer()

            if item.non_counter > defines.MAX_NON_NOTIFICATIONS \
                    or transaction.request.type == defines.Types["CON"]:
                transaction.response.type = defines.Types["CON"]
                item.non_counter = 0
            elif transaction.request.type == defines.Types["NON"]:
                item.non_counter += 1
                transaction.response.type = defines.Types["NON"]

            transaction.resource = resource
            del transaction.response.mid
            del transaction.response.token
            ret.append(transaction)
        return ret

    def remove_subscriber(self, message):
        """
        Remove a subscriber based on token.

        The relationship is removed even when it has no transaction; in that
        case only the "No Transaction" warning is logged.

        :param message: the message
        """
        logger.debug("Remove Subscriber")
        key_token = self._relation_key(message.destination, message.token)
        item = self._discard_relation(key_token)
        if item is None:
            logger.warning("No Subscriber")
        elif item.transaction is None:
            logger.warning("No Transaction")
        else:
            item.transaction.completed = True

    @staticmethod
    def parse_uri_query(uri_query):
        """
        parse the conditional parameters for the conditional observing

        Attributes are separated by ";". ``name=value`` stores the value as an
        int when it consists of digits only, otherwise as a string. "band" is
        stored as a boolean that is False for an empty value, "0" or "false"
        (case-insensitive) and True otherwise. An attribute without "=" is
        stored with its own name as value. Empty segments are skipped.

        :param uri_query: the query string of the request
        :return: a map with pairs [parameter, value]
        """
        # NOTE(review): only ";" is treated as a separator, as before -
        # confirm how the request joins multiple Uri-Query options.
        dict_att = {}
        for attribute in uri_query.split(";"):
            if not attribute:
                continue
            # partition keeps any further "=" inside the value.
            name, separator, value = attribute.partition("=")
            if not separator:
                dict_att[name] = name
            elif name == "band":
                dict_att[name] = value.lower() not in ("", "0", "false")
            elif value.isdigit():
                dict_att[name] = int(value)
            else:
                dict_att[name] = value
        return dict_att

    @staticmethod
    def verify_conditions(item):
        """
        checks if the changed resource requires a notification

        Only "pmin" is evaluated: the notification is suppressed while fewer
        than pmin whole seconds have passed since ``item.last_notify``.

        :param item: ObserveItem
        :return: Boolean
        """
        if "pmin" not in item.conditions:
            return True
        try:
            pmin = int(item.conditions["pmin"])
        except (TypeError, ValueError):
            # One malformed pmin must not abort notify() for all observers.
            logger.warning("Ignoring invalid pmin condition")
            return True
        # CURRENT TIME - TIMESTAMP < PMIN
        elapsed = int(time.time() - item.last_notify)
        return elapsed >= pmin