-
Notifications
You must be signed in to change notification settings - Fork 0
/
OnAir.py
194 lines (161 loc) · 7.56 KB
/
OnAir.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# OnAir.py 19/05/2016 D.J.Whale
#
# A set of adaptors to allow device classes to interact with radio interfaces.
# At the moment, the main purpose of this abstraction is to prevent the need
# for device classes to know specifics about radio technology and focus
# on the device aspects only.
#
# In the future, this will be a useful point in the architecture to add
# an intelligent message scheduler, that learns the report patterns of
# specific devices, builds a timeslotted schedule, and schedules transmits
# into mostly free slots.
#
# NOTE: This also might include intelligent power level selection based
# on RSSI reports from different devices.
import time
# Python 2
import OpenThings
import TwoBit
import radio
from lifecycle import *
class OpenThingsAirInterface():
    """Adaptor that lets device classes exchange OpenThings messages by radio.

    Messages are sent and received with FSK modulation. All radio access
    goes through self.radio, so a substitute radio object can be assigned
    to that attribute (e.g. for testing) without touching the radio module.
    """

    def __init__(self):
        self.radio = radio # aids mocking later

        class RadioDefaults():
            frequency = 433.92 # presumably MHz - TODO confirm
            modulation = radio.RADIO_MODULATION_FSK

        class TxDefaults(RadioDefaults):
            ##power_level = 0
            inner_times = 4
            outer_delay = 0
            outer_times = 1
        self.tx_defaults = TxDefaults()

        class RxDefaults(RadioDefaults):
            poll_rate = 100 #ms
            timeout = 1000 #ms
        self.rx_defaults = RxDefaults()

    def _overlay(self, defaults, radio_config, names):
        """Return {name: value} for names, preferring radio_config over defaults.

        An attribute missing from radio_config (or radio_config being None)
        falls back to the value held by defaults.
        """
        merged = {}
        for name in names:
            value = getattr(defaults, name)
            if radio_config is not None:
                value = getattr(radio_config, name, value)
            merged[name] = value
        return merged

    def _await_payload(self, timeout_ms, poll_ms):
        """Poll the radio until a payload is waiting or timeout_ms has elapsed.

        The radio is always polled at least once, even with a zero timeout.
        Returns True if a payload is waiting, False if the timeout expired.
        """
        deadline = time.time() + (timeout_ms / 1000.0)
        while True:
            if self.radio.is_receive_waiting():
                return True
            remaining = deadline - time.time()
            if remaining <= 0:
                return False
            # Inter-try delay: never sleep past the deadline, never negative
            time.sleep(max(0.0, min(poll_ms / 1000.0, remaining)))

    ##@log_method
    def send(self, payload, radio_config=None):
        """Encode payload as an OpenThings message and transmit it using FSK.

        payload is a pydict suitable for OpenThings.encode().
        radio_config is an optional overlay on top of the radio tx defaults;
        any of outer_times, outer_delay, inner_times it defines are used in
        place of the corresponding value in self.tx_defaults.
        """
        encoded = OpenThings.encode(payload)

        # Start from the tx defaults, then merge any wanted radio params
        tx = self._overlay(self.tx_defaults, radio_config,
            ("outer_times", "outer_delay", "inner_times"))

        self.radio.transmitter(fsk=True)
        ##print("inner times %s" % tx["inner_times"])
        self.radio.transmit(encoded,
            outer_times=tx["outer_times"],
            inner_times=tx["inner_times"],
            outer_delay=tx["outer_delay"])
        # radio auto-returns to previous state after transmit completes

    ##@log_method
    def receive(self, radio_config=None): # -> (radio_measurements, address or None, payload or None)
        """Wait for one FSK payload and return it, giving up after a timeout.

        radio_config is an optional overlay on top of the radio rx defaults;
        if it defines poll_rate and/or timeout (both in ms) they are used in
        place of the corresponding value in self.rx_defaults.

        Returns (radio_measurements, address, payload). If nothing arrives
        within the timeout the result is (None, None, None). Otherwise
        payload is the raw payload as returned by the radio;
        radio_measurements is currently always None and address is currently
        always (None, None, None) - see the TODOs below.

        Any exception raised by OpenThings.decode() propagates to the caller.
        """
        # radio_measurements might include rssi reading, short payload report, etc
        #TODO: configure radio modulation based on merged params
        rx = self._overlay(self.rx_defaults, radio_config, ("poll_rate", "timeout"))

        self.radio.receiver(fsk=True)
        if not self._await_payload(rx["timeout"], rx["poll_rate"]):
            # nothing received in timeout
            return (None, None, None) # (radio_measurements, address, payload) #TODO: might be measurements, average min max?

        payload = self.radio.receive() #TODO: payload, radio_measurements = radio.receive()
        # The decoded message is not used yet; the call is kept so that a
        # decode failure is still raised to the caller as before.
        #TODO: if crc failure, report it, but keep trying
        OpenThings.decode(payload, receive_timestamp=time.time())
        #TODO: return radio to state it was before receiver (e.g. standby) - radio needs a pop() on this too?

        if payload is None:
            return (None, None, None)

        #TODO: extract addresses: header_manufacturerid, header_productid header_deviceid -> (m, p, d)
        address = (None, None, None)
        radio_measurements = None #TODO: get from radio.receive()
        return (radio_measurements, address, payload)
class TwoBitAirInterface():
    """Adaptor that lets device classes exchange TwoBit switch messages by radio.

    Messages are sent and received with OOK modulation. All radio access
    goes through self.radio, so a substitute radio object can be assigned
    to that attribute (e.g. for testing) without touching the radio module.
    """

    def __init__(self):
        self.radio = radio # aids mocking later

        class RadioDefaults():
            frequency = 433.92 # presumably MHz - TODO confirm
            modulation = radio.RADIO_MODULATION_OOK

        class TxDefaults(RadioDefaults):
            power_level = 0
            inner_times = 8
            outer_delay = 0
            outer_times = 1
        self.tx_defaults = TxDefaults()

        class RxDefaults(RadioDefaults):
            poll_rate = 100 #ms
            timeout = 1000 #ms
        self.rx_defaults = RxDefaults()

    def _overlay(self, defaults, radio_config, names):
        """Return {name: value} for names, preferring radio_config over defaults.

        An attribute missing from radio_config (or radio_config being None)
        falls back to the value held by defaults.
        """
        merged = {}
        for name in names:
            value = getattr(defaults, name)
            if radio_config is not None:
                value = getattr(radio_config, name, value)
            merged[name] = value
        return merged

    def _await_payload(self, timeout_ms, poll_ms):
        """Poll the radio until a payload is waiting or timeout_ms has elapsed.

        The radio is always polled at least once, even with a zero timeout.
        Returns True if a payload is waiting, False if the timeout expired.
        """
        deadline = time.time() + (timeout_ms / 1000.0)
        while True:
            if self.radio.is_receive_waiting():
                return True
            remaining = deadline - time.time()
            if remaining <= 0:
                return False
            # Inter-try delay: never sleep past the deadline, never negative
            time.sleep(max(0.0, min(poll_ms / 1000.0, remaining)))

    ##@log_method
    def send(self, payload, radio_config=None):
        """Encode a switch command with TwoBit and transmit it using OOK.

        payload is a dict with the keys "house_address", "device_index" and
        "on" (the wanted switch state).
        radio_config is an optional overlay on top of the radio tx defaults;
        any of outer_times, outer_delay, inner_times it defines are used in
        place of the corresponding value in self.tx_defaults.
        """
        house_address = payload["house_address"]
        device_index = payload["device_index"]
        state = payload["on"]
        # Named 'message' rather than 'bytes' to avoid shadowing the builtin
        message = TwoBit.encode_switch_message(state, device_index, house_address)
        self.radio.modulation(ook=True)

        # Start from the tx defaults, then merge any wanted radio params
        tx = self._overlay(self.tx_defaults, radio_config,
            ("outer_times", "outer_delay", "inner_times"))

        ##print("inner times %s" % tx["inner_times"])
        self.radio.transmit(message,
            outer_times=tx["outer_times"],
            inner_times=tx["inner_times"],
            outer_delay=tx["outer_delay"])
        # radio auto-pops to state before transmit

    ##@log_method
    def receive(self, radio_config=None): # -> (radio_measurements, address or None, payload or None)
        """Wait for one 12 byte OOK payload and return it, giving up after a timeout.

        radio_config is an optional overlay on top of the radio rx defaults;
        if it defines poll_rate and/or timeout (both in ms) they are used in
        place of the corresponding value in self.rx_defaults.

        Returns (radio_measurements, address, payload). If nothing arrives
        within the timeout the result is (None, None, None). Otherwise
        payload is the raw payload as returned by the radio;
        radio_measurements is currently always None and address is currently
        the fixed placeholder (0xC8C8C, 0xEE) - see the TODOs below.

        Any exception raised by TwoBit.decode() propagates to the caller.
        """
        # radio_measurements might include rssi reading, short payload report, etc
        #TODO: configure radio modulation based on merged params
        rx = self._overlay(self.rx_defaults, radio_config, ("poll_rate", "timeout"))

        self.radio.receiver(ook=True)
        if not self._await_payload(rx["timeout"], rx["poll_rate"]):
            # nothing received in timeout
            return (None, None, None) # (radio_measurements, address, payload) #TODO: might be measurements, average min max?

        #TODO: radio config should set receive preamble 4 bytes to prevent false triggers
        payload = self.radio.receive(size=12) #TODO: payload, radio_measurements = radio.receive()
        # The decoded message is not used yet; the call is kept so that a
        # decode failure is still raised to the caller as before.
        #TODO: if failure, report it, but keep trying
        TwoBit.decode(payload)
        #TODO: return radio to state it was before receiver (e.g. standby) - radio needs a pop() on this too?

        if payload is None:
            return (None, None, None)

        #TODO: extract addresses (house_address, device_index)
        radio_measurements = None #TODO: return this from radio.receive()
        h = 0xC8C8C #TODO: Get house address from TwoBit.decode()[:10]
        d = 0xEE #TODO: Get device command from TwoBit.decode()[11:12]
        address = (h, d)
        return (radio_measurements, address, payload)
# END