-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathHackRfBroadcastThread.py
234 lines (185 loc) · 10.6 KB
/
HackRfBroadcastThread.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
""" This class holds the aircraft states from the ADS-B point of view
It is refreshed by the simulation thread (or sensor feed thread) and will
be used to provide broadcasted information
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation, either version 3 of the License, or (at your option) any later
version.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
this program. If not, see <http://www.gnu.org/licenses/>.
"""
#
# This class overrides threading.Thread and provides a service to broadcast
# ADS-B messages through a HackRF device
# message updates are performed from a separate thread which will
# update/push messages thanks to the replace_message method
# thread loop will pump and broadcast updated messages (soft realtime)
#
# mutex protection mechanism is implemented in
# replace_message() which is called from other threads
# broadcast_data() which is called from this thread
# in order to prevent concurrent access to broadcasted data buffers
import time, datetime, math
import threading
from CustomDecorators import *
from ADSBLowLevelEncoder import ADSBLowLevelEncoder
from pyhackrf import *
from ctypes import *
class hackrf_tx_context(Structure):
    """User-side transmit state handed to the HackRF TX callback.

    Fields:
        buffer: pointer to the first byte of the data to transmit.
        last_tx_pos: number of bytes of ``buffer`` already copied to the device.
        buffer_length: total number of bytes available behind ``buffer``.
    """
    _fields_ = [("buffer", POINTER(c_ubyte)),
                ("last_tx_pos", c_int),
                ("buffer_length", c_int)]


def hackrfTXCB(hackrf_transfer):
    """Copy the next chunk of user data into one HackRF transfer buffer.

    Args:
        hackrf_transfer: pointer to a transfer structure exposing ``buffer``
            (destination bytes), ``valid_length`` (bytes to fill) and
            ``tx_ctx`` (address of a ``hackrf_tx_context``).

    Returns:
        0 when a full chunk was copied and more user data remains,
        -1 when the last (zero padded) chunk was written.
        NOTE(review): the non-zero return is presumably what makes the
        HackRF library stop streaming -- confirm against libhackrf.
    """
    transfer = hackrf_transfer.contents
    # .contents shares memory with the caller's structure, so the position
    # update below is visible to the next callback invocation.
    tx_context = cast(transfer.tx_ctx, POINTER(hackrf_tx_context)).contents
    tx_buffer_length = transfer.valid_length
    already_sent = tx_context.last_tx_pos
    left = tx_context.buffer_length - already_sent
    addr_dest = addressof(transfer.buffer.contents)
    # Resume where the previous callback stopped; reading from the start of
    # the buffer on every call would re-send the first chunk forever.
    addr_src = addressof(tx_context.buffer.contents) + already_sent
    if left > tx_buffer_length:
        memmove(addr_dest, addr_src, tx_buffer_length)
        tx_context.last_tx_pos = already_sent + tx_buffer_length
        return 0
    # Final chunk: copy what is left and pad the transfer with zeros.
    memmove(addr_dest, addr_src, left)
    memset(addr_dest + left, 0, tx_buffer_length - left)
    # Mark the buffer as fully consumed so an extra callback only emits zeros.
    tx_context.last_tx_pos = tx_context.buffer_length
    return -1
@Singleton
class HackRfBroadcastThread(threading.Thread):
    """Thread that broadcasts ADS-B frames through a HackRF device.

    Feeder threads register themselves with
    register_track_simulation_thread() and push fresh frames with
    replace_message(). The run() loop gathers every message whose period has
    elapsed and transmits them in one go through broadcast_data().
    """

    def __init__(self, airborne_position_refresh_period=150000):
        """Open and configure the HackRF device.

        Configuration failures are reported on stdout, they do not raise.

        Args:
            airborne_position_refresh_period: not used by this class; kept so
                that existing callers passing it keep working.
        """
        super().__init__()
        self._mutex = threading.Lock()
        self._lowlevelencoder = ADSBLowLevelEncoder()
        # feeder thread -> {message name: [IQ data, last broadcast datetime, period in us]}
        self._messages_feed_threads = {}

        # Initialize pyHackRF library
        self._report_if_error(HackRF.initialize())

        # Initialize HackRF instance (could pass board serial or index if a specific board is needed)
        self._hackrf_broadcaster = HackRF()

        # Do required settings
        # so far hard-coded e.g. gain and disabled amp are specific to hardware test setup
        # with hackrf feeding a flight aware dongle through cable + attenuators (-50dB)
        self._report_if_error(self._hackrf_broadcaster.open())
        self._hackrf_broadcaster.setCrystalPPM(0)
        self._report_if_error(self._hackrf_broadcaster.setSampleRate(2000000))
        self._report_if_error(
            self._hackrf_broadcaster.setBasebandFilterBandwidth(HackRF.computeBaseBandFilterBw(2000000)))
        # 868000000 is a free frequency usable for over the air broadcast tests.
        # Do not use 1090MHz for actual over the air broadcasting, only with
        # a wire feed (you'll need attenuators in that case).
        self._report_if_error(self._hackrf_broadcaster.setFrequency(1090000000))
        # weak gain (used for wire feed + attenuators)
        self._report_if_error(self._hackrf_broadcaster.setTXVGAGain(4))
        self._report_if_error(self._hackrf_broadcaster.setAmplifierMode(LibHackRfHwMode.HW_MODE_OFF))

        self._tx_context = hackrf_tx_context()
        self._do_stop = False

    @staticmethod
    def _report_if_error(result):
        """Print the HackRF return code and its name unless it is HACKRF_SUCCESS."""
        if result != LibHackRfReturnCode.HACKRF_SUCCESS:
            print("Error :", result, ",", HackRF.getHackRfErrorCodeName(result))

    def __del__(self):
        """Close the HackRF instance and deinitialize the library."""
        # __init__ may have failed before the device object was created.
        broadcaster = getattr(self, "_hackrf_broadcaster", None)
        if broadcaster is not None:
            self._report_if_error(broadcaster.close())
        self._report_if_error(HackRF.deinitialize())

    def stop(self):
        """Ask the run() loop to exit once its current iteration is done."""
        self._do_stop = True

    def getMutex(self):
        """Return the lock protecting the broadcast data buffers."""
        return self._mutex

    # updates the next data to be broadcasted for a given message type
    #@Timed
    def replace_message(self, type, frame_even, frame_odd=None):
        """Replace the pending data of one message type for the calling thread.

        The frames are modulated to IQ data first, then stored in the slot of
        the calling thread if that thread is a registered feeder; otherwise
        the call has no effect.

        Args:
            type: message name, one of the keys created by
                register_track_simulation_thread().
            frame_even: frame passed to the low level encoder.
            frame_odd: optional second frame passed to the low level encoder;
                an empty list is used when omitted.

        Raises:
            KeyError: if ``type`` is not a known message name.
        """
        if frame_odd is None:
            frame_odd = []
        frame_IQ = self._lowlevelencoder.frame_1090es_ppm_modulate_IQ(frame_even, frame_odd)
        calling_thread = threading.current_thread()
        # Usually called from another thread: the lock is held during the
        # update and is released even if the lookup below raises.
        with self._mutex:
            schedule = self._messages_feed_threads.get(calling_thread)
            if schedule is not None:
                schedule[type][0] = frame_IQ

    def register_track_simulation_thread(self, feeder_thread):
        """Create the broadcast schedule of a feeder thread.

        Args:
            feeder_thread: thread object exposing the ``*_message_period_us``
                attributes read below. Registering it twice only prints a
                notice.
        """
        if feeder_thread in self._messages_feed_threads:
            print(feeder_thread, "already registred as a feeder")
            return
        # key : "name of message"
        # value : ["data to be broadcasted", datetime of last broadcast, delay between 2 messages of this type]
        # Attribute spellings must match the ones exposed by the feeder thread.
        schedule = {
            "identification": [None, None, feeder_thread.identitification_message_period_us],
            "register_6116": [None, None, feeder_thread.squawk_message_period_us],
            "airborne_position": [None, None, feeder_thread.position_message_period_us],
            "surface_position": [None, None, feeder_thread.position_message_period_us],
            "airborne_velocity": [None, None, feeder_thread.velocity_message_period_us],
        }
        # Publish the schedule only once complete so run() never sees it half built.
        self._messages_feed_threads[feeder_thread] = schedule

    def broadcast_data(self, data):
        """Transmit ``data`` through the HackRF and wait until streaming ends.

        Args:
            data: bytes-like IQ data; nothing happens when it is empty.
        """
        length = len(data)
        if length == 0:
            return

        # Polling period used while waiting for the end of the transmission.
        sleep_time = length * 0.50e-6 * (1.0 + 1e-6 * self._hackrf_broadcaster.getCrystalPPM())
        self._tx_context.last_tx_pos = 0
        # TODO : need to evaluate if mutex protection is required during the full broadcast;
        # so far it only covers the filling of the transmit buffer.
        with self._mutex:
            self._tx_context.buffer_length = length
            self._tx_context.buffer = (c_ubyte * self._tx_context.buffer_length).from_buffer_copy(data)

        self._report_if_error(self._hackrf_broadcaster.startTX(hackrfTXCB, self._tx_context))
        while self._hackrf_broadcaster.isStreaming():
            time.sleep(sleep_time)
        self._report_if_error(self._hackrf_broadcaster.stopTX())

    def run(self):
        """Broadcast due messages in a loop until stop() is called."""
        while not self._do_stop:
            now = datetime.datetime.now(datetime.timezone.utc)
            plane_messages = bytearray()
            sleep_time = 10.0
            # Iterate a snapshot so a feeder registering meanwhile cannot
            # invalidate the iteration.
            for thread_broadcast_schedule in list(self._messages_feed_threads.values()):
                for v in thread_broadcast_schedule.values():
                    v2_sec = v[2] * 1e-6
                    if v[1] is not None:
                        remaining = v2_sec - (now - v[1]).total_seconds()
                    else:
                        # never broadcasted yet: due immediately
                        remaining = -float('inf')
                        sleep_time = 0.0
                    # Time throttling : messages are broadcasted only at the provided time interval
                    # TODO : implement UTC syncing mechanism (requires that the actual host clock is UTC synced) ?
                    # which may be implemented to some accuracy level with ntp or GPS + PPS mechanisms ? in Python ?
                    if (v[0] is not None and len(v[0]) > 0) and remaining <= 0.0:
                        plane_messages.extend(v[0])
                        v[1] = now
                    elif remaining > 0.0:
                        remaining = math.fmod(remaining, v2_sec)
                        if remaining < sleep_time:
                            sleep_time = remaining

            if len(plane_messages) > 0:
                self.broadcast_data(plane_messages)
                elapsed = (datetime.datetime.now(datetime.timezone.utc) - now).total_seconds()
                sleep_time -= elapsed
                if sleep_time < 0.0:
                    sleep_time = 0.0
                elif sleep_time < 0.5:
                    sleep_time *= 0.1
                else:
                    sleep_time = 0.5
                time.sleep(0.1 * sleep_time)
            else:
                time.sleep(0.000001)

        # upon exit, reset _do_stop flag in case there is a new start
        self._do_stop = False