Skip to content

Commit

Permalink
[Broker] Better Timing (#65)
Browse files Browse the repository at this point in the history
* Tracing

* [Broker] Better timing + stable SIL
  • Loading branch information
victoryforphil authored Nov 23, 2024
1 parent a678bf8 commit 37443ad
Show file tree
Hide file tree
Showing 11 changed files with 107 additions and 88 deletions.
12 changes: 7 additions & 5 deletions lil-gcs/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use lil_link::common::types::request_mode_set::QuadArmRequest;
use lil_link::common::types::request_takeoff::QuadTakeoffRequest;
use rmp_serde::to_vec_named;
use serde::Deserialize;
use victory_broker::broker::time::BrokerTime;
use std::collections::BTreeMap;
use std::str::FromStr;
use std::sync::Arc;
Expand Down Expand Up @@ -49,14 +50,15 @@ pub struct TCPNodeSubscriber {
impl BrokerTask for TCPNodeSubscriber {
fn get_config(&self) -> victory_broker::task::config::BrokerTaskConfig {
BrokerTaskConfig::new("gcs-server")
.with_trigger(BrokerTaskTrigger::Rate(Timespan::new_hz(50.0)))
.with_trigger(BrokerTaskTrigger::Rate(Timespan::new_hz(10.0)))
.with_subscription(BrokerTaskSubscription::new_updates_only(&TopicKey::from_str("")))
.with_flag(BrokerCommanderFlags::NonBlocking)
}

fn on_execute(
&mut self,
inputs: &victory_data_store::database::view::DataView,
_: &BrokerTime,
) -> Result<victory_data_store::database::view::DataView, anyhow::Error> {
let data_map = inputs.maps.clone();
for (topic, value) in data_map.iter() {
Expand Down Expand Up @@ -137,7 +139,7 @@ async fn main() {
tokio::spawn(webserver::websocket_server_task(tcp_tx.clone(), ws_tx));

fmt()
.with_max_level(Level::DEBUG)
.with_max_level(Level::INFO)
.with_target(true)
.pretty()
.compact()
Expand All @@ -158,7 +160,7 @@ async fn main() {

let client = client.unwrap();

let client_handle = Arc::new(Mutex::new(client));
let client_handle = Arc::new(tokio::sync::Mutex::new(client));

let command_queue = Arc::new(Mutex::new(DataView::new()));
let sub_task = TCPNodeSubscriber {
Expand Down Expand Up @@ -301,7 +303,7 @@ async fn main() {
tokio::spawn(async move {
loop {
// thread::sleep(Duration::from_secs_f32(2.0));
tokio::time::sleep(Duration::from_millis(20)).await;
tokio::time::sleep(Duration::from_millis(1)).await;
{
let mut map = sub_task_handle.lock().unwrap();
// Print the updates
Expand Down Expand Up @@ -335,7 +337,7 @@ async fn main() {

node.init().unwrap();
loop {
tokio::time::sleep(Duration::from_millis(5)).await;
tokio::time::sleep(Duration::from_millis(1)).await;
match node.tick() {
Ok(_) => (),
Err(e) => {
Expand Down
2 changes: 1 addition & 1 deletion lil-launcher/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,4 @@ lil-quad = { path = "../lil-quad" }
lil-link = { path = "../lil-link" }
tokio = { version = "1.41.1", features = ["full"] }

tracing-tracy = { version = "0.11.3", features = ["callstack-inlines", "code-transfer", "delayed-init", "sampling", "system-tracing"] }
tracing-tracy = { version = "0.11.3", features = [ "enable", "callstack-inlines", "code-transfer", "delayed-init", "sampling", "system-tracing"] }
84 changes: 53 additions & 31 deletions lil-launcher/bin_tests/quad_sil.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::sync::Arc;
use std::sync::Mutex;

use std::time::Duration;

use lil_link::common::identifiers::IDENT_BASE_STATUS;
Expand Down Expand Up @@ -45,21 +45,35 @@ struct SILArgs {

#[clap(short, long, default_value = "3000")]
port: u16,

#[clap(long, help = "Enable tracing output")]
tracing: bool,
}
use tracing_subscriber::layer::SubscriberExt;


#[tokio::main]
async fn main() {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.with_target(true)
.pretty()
.compact()
.with_file(false)
.with_line_number(false)
.without_time()
.init();


let args = SILArgs::parse();

if !args.tracing {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.with_target(true)
.pretty()
.compact()
.with_file(false)
.with_line_number(false)
.without_time()
.init();
} else {
tracing::subscriber::set_global_default(
tracing_subscriber::registry().with(tracing_tracy::TracyLayer::default())
).expect("setup tracy layer");
}

info!("Running 'quad_sil' with args: {:#?}", args);

let bind_addr = format!("{}:{}", args.address, args.port);
Expand All @@ -68,7 +82,7 @@ async fn main() {
// Create broker with TCP server
let server = TcpBrokerServer::new(&bind_addr).await.unwrap();
let mut broker = Broker::new(LinearBrokerCommander::new());
broker.add_adapter(Arc::new(Mutex::new(server)));
broker.add_adapter(Arc::new(tokio::sync::Mutex::new(server)));

// Create channel adapter pair for local node
let (adapter_a, adapter_b) = victory_broker::adapters::channel::ChannelBrokerAdapter::new_pair();
Expand All @@ -80,51 +94,59 @@ async fn main() {

// Add systems as tasks
let mavlink_sys = QuadlinkSystem::new_from_connection_string(args.connection_string.as_str()).unwrap();
node.add_task(Arc::new(Mutex::new(mavlink_sys))).unwrap();
node.add_task(Arc::new(std::sync::Mutex::new(mavlink_sys))).unwrap();

let health_check = HealthCheck::new(HealthCheckConfig {
check_ekf: Some(true),
});
node.add_task(Arc::new(Mutex::new(health_check))).unwrap();
node.add_task(Arc::new(std::sync::Mutex::new(health_check))).unwrap();

let arm_task = ConditionTask::new(
"arm".to_string(),
let stabilize_task = ConditionTask::new(
"stabilize".to_string(),
TopicKey::from_str(&format!(
"{}/{}/healthy",
IDENT_BASE_STATUS, IDENT_STATUS_HEALTH
)),
Some(victory_data_store::primitives::Primitives::Boolean(true)),
Tasks::Arm,
Tasks::SetMode(QuadMode::Stabilize),

);

let arm_task = TimedTask::new(
"arm".to_string(),
Timespan::new_secs(3.0),
Tasks::Arm,
);

let mode_task = TimedTask::new(
"mode".to_string(),
Timespan::new_secs(2.0),
Timespan::new_secs(3.0),
Tasks::SetMode(QuadMode::Guided),
);

let takeoff_task = TimedTask::new(
"takeoff".to_string(),
Timespan::new_secs(2.0),
Tasks::Takeoff(11.0),
Timespan::new_secs(5.0),
Tasks::Takeoff(2.0),
);

let waypoint_task = TimedTask::new(
"waypoint".to_string(),
Timespan::new_secs(10.0),
Tasks::Waypoint(QuadPoseNED::new_xyz(0.0, 5.0, -10.0)),
Timespan::new_secs(5.0),
Tasks::Waypoint(QuadPoseNED::new_xyz(1.0, 1.0, -2.0)),
);

let waypoint_2_task = TimedTask::new(
"waypoint_2".to_string(),
Timespan::new_secs(10.0),
Tasks::Waypoint(QuadPoseNED::new_xyz(0.0, 0.0, -10.0)),
Timespan::new_secs(5.0),
Tasks::Waypoint(QuadPoseNED::new_xyz(-1.0, -1.0, -2.0)),
);

let land_task = TimedTask::new("land".to_string(), Timespan::new_secs(5.0), Tasks::Land);

let mission = vec![
TaskType::Condition(arm_task),
TaskType::Condition(stabilize_task),
TaskType::Timed(arm_task),
TaskType::Timed(mode_task),
TaskType::Timed(takeoff_task),
TaskType::Timed(waypoint_task),
Expand All @@ -133,24 +155,24 @@ async fn main() {
];

let mission_runner = MissionRunner::new(mission);
node.add_task(Arc::new(Mutex::new(mission_runner))).unwrap();
node.add_task(Arc::new(std::sync::Mutex::new(mission_runner))).unwrap();

let rerun_sys = RerunSystem::new("quad_sil".to_string());
node.add_task(Arc::new(Mutex::new(rerun_sys))).unwrap();

// Initialize node
if !args.tracing {
node.add_task(Arc::new(std::sync::Mutex::new(rerun_sys))).unwrap();
}

// Initialize node
node.init().unwrap();

// Spawn node thread
let node_handle = Arc::new(Mutex::new(node));
let node_handle = Arc::new(tokio::sync::Mutex::new(node));

let node_thread = tokio::spawn(async move {
loop {

// Run node tick
if let Err(e) = node_handle.lock().unwrap().tick() {
if let Err(e) = node_handle.try_lock().unwrap().tick() {
warn!("Node // Error: {:?}", e);
}

Expand All @@ -166,7 +188,7 @@ async fn main() {
let tick_start = Timepoint::now();

// Run broker tick
if let Err(e) = broker.tick(delay.clone()) {
if let Err(e) = broker.tick(delay.clone()).await {
warn!("Broker // Error: {:?}", e);
}

Expand Down
4 changes: 2 additions & 2 deletions lil-link/src/mavlink/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ impl MavLinkHelper {
pub fn request_stream() -> mavlink::ardupilotmega::MavMessage {
mavlink::ardupilotmega::MavMessage::REQUEST_DATA_STREAM(
mavlink::ardupilotmega::REQUEST_DATA_STREAM_DATA {
target_system: 1,
target_system: 0,
target_component: 0,
req_stream_id: 0,
req_message_rate: 10,
req_message_rate: 50,
start_stop: 1,
},
)
Expand Down
14 changes: 10 additions & 4 deletions lil-link/src/mavlink/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
};

use log::{debug, info};
use victory_broker::task::{config::BrokerTaskConfig, subscription::BrokerTaskSubscription, trigger::BrokerTaskTrigger, BrokerTask};
use victory_broker::{broker::time::BrokerTime, task::{config::BrokerTaskConfig, subscription::BrokerTaskSubscription, trigger::BrokerTaskTrigger, BrokerTask}};
use victory_data_store::{database::view::DataView, topics::TopicKey};
use victory_wtf::Timespan;

Expand Down Expand Up @@ -61,10 +61,14 @@ impl BrokerTask for QuadlinkSystem{
.with_subscription(BrokerTaskSubscription::new_updates_only(
&TopicKey::from_str("cmd")
))
// cmd/waypoint is new_latest
.with_subscription(BrokerTaskSubscription::new_latest(
&TopicKey::from_str("cmd/waypoint")
))
}

fn on_execute(&mut self, inputs: &DataView) -> Result<DataView, anyhow::Error> {
let mut output = DataView::new();
fn on_execute(&mut self, inputs: &DataView, timing: &BrokerTime) -> Result<DataView, anyhow::Error> {
let mut output = DataView::new_timed(timing.time_monotonic.clone());

#[allow(unused_variables)]
let mut msgs = vec![];
Expand Down Expand Up @@ -195,7 +199,9 @@ impl BrokerTask for QuadlinkSystem{
None => {}
}
}
_ => {}
_ => {

}
}

Ok(output)
Expand Down
6 changes: 3 additions & 3 deletions lil-quad/src/systems/health_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use lil_link::common::{
};
use log::info;
use serde::{Deserialize, Serialize};
use victory_broker::task::{config::BrokerTaskConfig, subscription::BrokerTaskSubscription, trigger::BrokerTaskTrigger, BrokerTask};
use victory_broker::{broker::time::BrokerTime, task::{config::BrokerTaskConfig, subscription::BrokerTaskSubscription, trigger::BrokerTaskTrigger, BrokerTask}};
use victory_data_store::{database::view::DataView, topics::TopicKey};
use victory_wtf::Timespan;

Expand Down Expand Up @@ -53,7 +53,7 @@ impl BrokerTask for HealthCheck {
.with_subscription(BrokerTaskSubscription::new_latest(&QuadHealthStatus::get_topic_key()))
}

fn on_execute(&mut self, inputs: &DataView) -> Result<DataView, anyhow::Error> {
fn on_execute(&mut self, inputs: &DataView, timing: &BrokerTime) -> Result<DataView, anyhow::Error> {
let ekf_status: QuadEkfStatus = inputs
.get_latest(&TopicKey::from_str(&format!(
"{}/{}",
Expand All @@ -66,7 +66,7 @@ impl BrokerTask for HealthCheck {
Err(_) => QuadHealthStatus::new(false, None),
};

let mut out = DataView::new();
let mut out = DataView::new_timed(timing.time_monotonic.clone());

match (self.config.check_ekf, ekf_status.is_healthy()) {
(Some(true), Err(msg)) => {
Expand Down
15 changes: 8 additions & 7 deletions lil-quad/src/systems/mission_runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use lil_link::common::{
use log::{info, warn};
use task::{TaskType, Tasks};

use victory_broker::task::{config::BrokerTaskConfig, subscription::BrokerTaskSubscription, trigger::BrokerTaskTrigger, BrokerTask};
use victory_broker::{broker::time::BrokerTime, task::{config::BrokerTaskConfig, subscription::BrokerTaskSubscription, trigger::BrokerTaskTrigger, BrokerTask}};
use victory_data_store::{database::view::DataView, primitives::Primitives, topics::TopicKey};
use victory_wtf::{Timepoint, Timespan};

Expand Down Expand Up @@ -129,18 +129,18 @@ impl BrokerTask for MissionRunner {
}
}
let mut config = BrokerTaskConfig::new("mission_runner")
.with_trigger(BrokerTaskTrigger::Rate(Timespan::new_hz(10.0)));
.with_trigger(BrokerTaskTrigger::Always);

for topic in &subbed_conditions {
config.add_subscription(BrokerTaskSubscription::new_latest(topic));
}
config.clone()
}

fn on_execute(&mut self, inputs: &DataView) -> Result<DataView, anyhow::Error> {
let mut out = DataView::new();
let dt = Timespan::from_duration(Duration::from_millis(100));
self.current_time = self.current_time.clone() + dt;
fn on_execute(&mut self, inputs: &DataView, timing: &BrokerTime) -> Result<DataView, anyhow::Error> {
let mut out = DataView::new_timed(timing.time_monotonic.clone());

self.current_time = timing.time_monotonic.clone();

if self.current_idx >= self.tasks.len() {
return Ok(out);
Expand All @@ -150,7 +150,7 @@ impl BrokerTask for MissionRunner {

match current_task {
TaskType::Timed(task) => {
let time_since = self.current_time.clone()
let time_since = timing.time_monotonic.clone()
- self.last_executed.clone().unwrap_or(Timepoint::zero());
if time_since.secs() >= task.duration.secs() {
info!("Running timed task {}", task.name);
Expand All @@ -174,6 +174,7 @@ impl BrokerTask for MissionRunner {
passed = inputs.get_latest::<_, Primitives>(&task.topic).is_ok();
}
if passed {
info!("Running condition task {}", task.name);
self.run_task(task.name.clone(), task.task.clone(), &mut out);
}
}
Expand Down
Loading

0 comments on commit 37443ad

Please sign in to comment.