-
Notifications
You must be signed in to change notification settings - Fork 35
/
coordinator.py
231 lines (187 loc) · 8.03 KB
/
coordinator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
#!/usr/bin/env python3
"""
Coordinator Application for Drone Management
This script initializes and manages various components related to drone operations,
including MAVLink communication and mission scheduling. It also
provides LED feedback based on the drone's state to aid field operations.
Author: Alireza Ghaderi
GitHub Repository: https://github.com/alireza787b
Date: September 2024
"""
import os
import sys
import time
import threading
import datetime
import logging
import sdnotify # For systemd watchdog notifications
import asyncio # Needed for async functions
# Import necessary modules and classes
from src.drone_config import DroneConfig
from src.local_mavlink_controller import LocalMavlinkController
from src.drone_communicator import DroneCommunicator
from src.drone_setup import DroneSetup
from src.params import Params
from src.mavlink_manager import MavlinkManager
from src.flask_handler import FlaskHandler
from src.led_controller import LEDController # Import LEDController
# For log rotation
from logging.handlers import RotatingFileHandler
# Set up logging directory and configuration.
LOG_DIR = 'logs'
# exist_ok=True replaces the former exists()-then-makedirs() pair, which could
# raise FileExistsError if the directory appeared between the two calls.
os.makedirs(LOG_DIR, exist_ok=True)

# Each run writes to its own file, named after the start time of the process.
now = datetime.datetime.now()
current_time = now.strftime("%Y-%m-%d_%H-%M-%S")
log_filename = os.path.join(LOG_DIR, f'{current_time}.log')

# Module logger: accepts everything from DEBUG up; each handler filters further.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# Console handler: INFO and above, written to stdout.
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)

# File handler: DEBUG and above, rotated at 5 MB per file with 5 backups kept.
file_handler = RotatingFileHandler(
    log_filename, maxBytes=5 * 1024 * 1024, backupCount=5
)
file_handler.setLevel(logging.DEBUG)

# Both handlers share one "timestamp - level - message" format.
formatter = logging.Formatter(
    '%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S'
)
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)

# Attach the handlers to the module logger.
logger.addHandler(console_handler)
logger.addHandler(file_handler)
# Global variables shared between main() and main_loop().
mavlink_manager = None  # Assigned in main(); terminated in main_loop() cleanup
global_telemetry = {}  # Store telemetry data
run_telemetry_thread = threading.Event()
run_telemetry_thread.set()  # Event starts in the "set" state
drones = {}  # Dictionary to store drone information
params = Params()  # Global parameters instance
drone_config = DroneConfig(drones)  # Initialize DroneConfig
drone_comms = None  # Assigned in main()
drone_setup = None  # Assigned in main()

# Initialize LEDController only if not in simulation mode.
# The name is bound to None up front so that it always exists: previously a
# failed get_instance() left `led_controller` undefined, and any later
# reference to it would have raised NameError.
led_controller = None  # Or use a mock controller if needed
if not Params.sim_mode:
    try:
        led_controller = LEDController.get_instance()
    except Exception as e:
        logger.error("Failed to initialize LEDController: %s", e)

# Systemd watchdog notifier
notifier = sdnotify.SystemdNotifier()
def schedule_missions_thread(drone_setup_instance):
    """
    Thread entry point for mission scheduling.

    Builds the scheduling coroutine for ``drone_setup_instance`` and drives
    it with ``asyncio.run``; the call returns only when that coroutine ends.
    """
    scheduler = schedule_missions_async(drone_setup_instance)
    asyncio.run(scheduler)
async def schedule_missions_async(drone_setup_instance):
    """
    Repeatedly await ``drone_setup_instance.schedule_mission()``.

    After each call the coroutine sleeps for the period derived from
    ``params.schedule_mission_frequency``. The loop has no exit condition.
    """
    while True:
        await drone_setup_instance.schedule_mission()
        # The frequency is read on every cycle, after the scheduling call.
        pause_seconds = 1.0 / params.schedule_mission_frequency
        await asyncio.sleep(pause_seconds)
def _update_state_leds(state):
    """
    Set the LED color for a newly observed drone state.

    State 0 sets blue, state 1 sets orange, state 2 leaves the LEDs untouched
    (the log message says the drone show script takes over), and any other
    value sets red and logs a warning.
    """
    if state == 0:
        # Idle state on ground
        LEDController.set_color(0, 0, 255)  # Blue
        logger.debug("Drone is idle on ground (state == 0).")
    elif state == 1:
        # Trigger time received; ready to fly
        LEDController.set_color(255, 165, 0)  # Orange
        logger.debug("Trigger time received. Drone is ready to fly (state == 1).")
    elif state == 2:
        # Maneuver started; do not change LEDs anymore
        logger.info("Maneuver started (state == 2). LED control handed over to drone show script.")
    else:
        # Unknown state; set LEDs to Red
        LEDController.set_color(255, 0, 0)  # Red
        logger.warning(f"Unknown drone state: {state}")


def main_loop():
    """
    Main loop of the coordinator application.

    Optionally synchronizes time, starts the mission scheduling thread, then
    loops forever: each iteration notifies the systemd watchdog, updates the
    LEDs when ``drone_config.state`` has changed, and sleeps for
    ``params.sleep_interval``.

    Any ``Exception`` raised inside is logged with its traceback and the LEDs
    are set to red. Whether the loop ends by exception or otherwise, the
    MAVLink manager is terminated and drone communication is stopped if they
    were created. Reads the module-level ``mavlink_manager``, ``drone_comms``
    and ``drone_setup`` set up by ``main()``.
    """
    try:
        logger.info("Starting the main loop...")

        # Set LEDs to Blue to indicate initialization in progress
        LEDController.set_color(0, 0, 255)  # Blue
        logger.info("After intial LED set color...")

        # Synchronize time if enabled
        if params.online_sync_time:
            drone_setup.synchronize_time()
            logger.info("Time synchronized.")

        # Initialization successful
        LEDController.set_color(0, 255, 0)  # Green
        logger.info("Initialization successful. MAVLink is ready.")

        # Start mission scheduling thread.
        # NOTE(review): this thread is non-daemon and is never stopped, so it
        # keeps the process alive after the cleanup below has run -- confirm
        # whether that is intended.
        scheduling_thread = threading.Thread(target=schedule_missions_thread, args=(drone_setup,))
        scheduling_thread.start()
        logger.info("Mission scheduling thread started.")

        # Last state acted upon; None guarantees the first reading is handled.
        last_state_value = None

        while True:
            # Notify systemd watchdog
            notifier.notify("WATCHDOG=1")

            # Only touch the LEDs when the state actually changes.
            current_state = drone_config.state
            if current_state != last_state_value:
                last_state_value = current_state
                logger.info(f"Drone state changed to {current_state}")
                _update_state_leds(current_state)

            time.sleep(params.sleep_interval)  # Sleep for defined interval
    except Exception as e:
        # logger.exception logs at ERROR level and attaches the traceback.
        logger.exception("An error occurred in main loop: %s", e)
        LEDController.set_color(255, 0, 0)  # Red for error state
    finally:
        logger.info("Closing threads and cleaning up...")
        if mavlink_manager:
            mavlink_manager.terminate()  # Terminate MavlinkManager
            logger.info("MAVLink manager terminated.")
        if drone_comms:
            drone_comms.stop_communication()
            logger.info("Drone communication stopped.")
        # Optionally, turn off LEDs or set to a default color
        # LEDController.turn_off()
def main():
    """
    Bring up the coordinator's components in order, then run the main loop.

    Creates and initializes the MAVLink manager, the local MAVLink
    controller, the drone communicator and the Flask handler (wired to each
    other through their setters), optionally starts the Flask server in a
    daemon thread, creates DroneSetup, and finally calls ``main_loop()``.
    Stores the MAVLink manager, communicator and setup objects in the
    module-level globals that ``main_loop()`` reads.
    """
    global mavlink_manager, drone_comms, drone_setup

    logger.info("Starting the coordinator application...")

    # MAVLink manager comes first; a fixed pause follows its initialization.
    mavlink_manager = MavlinkManager(params, drone_config)
    logger.info("Initializing MAVLink...")
    mavlink_manager.initialize()
    time.sleep(2)

    # The controller object is kept in a local for the lifetime of main().
    local_drone_controller = LocalMavlinkController(drone_config, params, False)
    logger.info("LocalMavlinkController initialized.")

    # The communicator and the Flask handler reference each other, so both
    # are constructed first and linked through setters afterwards.
    drone_comms = DroneCommunicator(drone_config, params, drones)
    flask_handler = FlaskHandler(params, drone_config)

    drone_comms.set_flask_handler(flask_handler)
    logger.info("DroneCommunicator's FlaskHandler set.")
    flask_handler.set_drone_communicator(drone_comms)
    logger.info("FlaskHandler's DroneCommunicator set.")

    drone_comms.start_communication()
    logger.info("DroneCommunicator communication started.")

    # The HTTP server is optional and runs in a daemon thread.
    if params.enable_drones_http_server:
        threading.Thread(target=flask_handler.run, daemon=True).start()
        logger.info("Flask HTTP server started.")

    drone_setup = DroneSetup(params, drone_config)
    logger.info("DroneSetup initialized.")

    # Hand over to the main loop.
    main_loop()


if __name__ == "__main__":
    main()