From b7f5ddaee57f06554f20eaeb5179b3284df303ce Mon Sep 17 00:00:00 2001 From: hices Date: Thu, 7 Nov 2024 23:37:09 -0800 Subject: [PATCH] Tried making threading better --- lil-gcs/src/main.rs | 275 +++++++++++++++++++++++++------------------- 1 file changed, 155 insertions(+), 120 deletions(-) diff --git a/lil-gcs/src/main.rs b/lil-gcs/src/main.rs index e732aaad7..c62048592 100644 --- a/lil-gcs/src/main.rs +++ b/lil-gcs/src/main.rs @@ -13,12 +13,12 @@ use std::thread; use std::time::Duration; use std::time::{SystemTime, UNIX_EPOCH}; +use clap::Parser; use serde::Serialize; use tracing::info; use tracing::warn; use tracing::Level; use tracing_subscriber::fmt; -use clap::{Parser, ValueEnum}; use victory_broker::adapters::tcp::TCPClientAdapter; use victory_broker::adapters::tcp::TCPClientOptions; @@ -34,15 +34,17 @@ mod webserver; pub struct TCPNodeSubscriber { map: BTreeMap, - // update: Vec<(String, String )> + update: BTreeMap, } impl SubCallback for TCPNodeSubscriber { fn on_update(&mut self, datapoints: &victory_data_store::datapoints::DatapointMap) { - // info!("Datapoints: {:?}", datapoints.len()); for (topic, datapoint) in datapoints.iter() { self.map .insert(topic.display_name(), format!("{:?}", datapoint.value)); + + self.update + .insert(topic.display_name(), format!("{:?}", datapoint.value)); } } } @@ -56,7 +58,13 @@ struct DataLine { #[derive(Parser, Debug)] #[clap(author, version, about, long_about = None)] struct SILArgs { - #[clap(short, long, value_parser, help = "Publishing connection string", default_value = "localhost:7001")] + #[clap( + short, + long, + value_parser, + help = "Publishing connection string", + default_value = "localhost:7001" + )] connection: String, } @@ -92,7 +100,7 @@ async fn main() { tokio::spawn(webserver::websocket_server_task(tcp_tx.clone(), ws_tx)); fmt() - .with_max_level(Level::DEBUG) + .with_max_level(Level::INFO) .with_target(true) .pretty() .compact() @@ -123,7 +131,9 @@ async fn main() { let subscriber = TCPNodeSubscriber { map: BTreeMap::new(), + update: BTreeMap::new(), }; + let subscriber_handle = Arc::new(Mutex::new(subscriber)); let topic_key = TopicKey::from_str(""); @@ -133,141 +143,166 @@ async fn main() { let subscriber_handle_clone = subscriber_handle.clone(); let tcp_tx_clone = tcp_tx.clone(); + let subscriber_handle_clone_ws = subscriber_handle.clone(); + let tcp_tx_clone_ws = tcp_tx.clone(); + tokio::spawn(async move { - tokio::spawn(async move { - loop { - thread::sleep(Duration::from_millis(100)); - if let Some(ws_msg) = ws_rx.recv().await { - println!("{:#?}", ws_msg); + loop { + if let Some(ws_msg) = ws_rx.recv().await { + info!("WS MESSAGE -> {:#?}", ws_msg); - // Need to parse from json or something a topic and a value. It would be great if I could parse a normal victory value. + // Need to parse from json or something a topic and a value. It would be great if I could parse a normal victory value. - // I thought this was a scope thing at some point. - let mut datastore = datastore.lock().expect("Failed to lock mutex"); + // I thought this was a scope thing at some point. + let mut datastore = datastore.lock().expect("Failed to lock mutex"); - let msg_topic: String; - let mut msg_value: Option = None; + let msg_topic: String; + let mut msg_value: Option = None; - if let Some((complex_msg, complex_value)) = parse_basic_message(&ws_msg) { - msg_topic = complex_msg; - msg_value = Some(complex_value); - } else { - msg_topic = ws_msg; - } + if let Some((complex_msg, complex_value)) = parse_basic_message(&ws_msg) { + msg_topic = complex_msg; + msg_value = Some(complex_value); + } else { + msg_topic = ws_msg; + } - match msg_topic.as_str() { - "ARM" => { - info!("ARM the drone!"); - let arm_request = QuadArmRequest::new(true); - datastore - .add_struct( - &TopicKey::from_str("cmd/arm"), - Timepoint::now(), - arm_request, - ) - .unwrap(); - } - "DISARM" => { - info!("Disarm the drone"); - let arm_request = QuadArmRequest::new(false); - datastore - .add_struct( - &TopicKey::from_str("cmd/arm"), - Timepoint::now(), - arm_request, - ) - .unwrap(); - } - "TAKEOFF" => { - info!("Takeoff requested"); - // Hard coded to 10.0 for now. - let arm_request = QuadTakeoffRequest::new(10.0); - datastore - .add_struct( - &TopicKey::from_str("cmd/takeoff"), - Timepoint::now(), - arm_request, - ) - .unwrap(); - } - "LAND" => { - info!("Land Requested"); - let arm_request = QuadLandRequest::new(); - datastore - .add_struct( - &TopicKey::from_str("cmd/land"), - Timepoint::now(), - arm_request, - ) - .unwrap(); - } - "MODE" => { - if let Some(value) = msg_value { - if let Ok(mode) = QuadMode::from_str(&value) { - println!("Setting new mode now {0}", mode); - let mode_req = QuadSetModeRequest::new(mode); - datastore - .add_struct( - &TopicKey::from_str("cmd/mode/mode"), - Timepoint::now(), - mode_req, - ) - .unwrap(); - } + match msg_topic.as_str() { + "NEW_CLIENT" => { + // info!("GOT A NEW CLIENT YALL"); + + { + let mut map = subscriber_handle_clone_ws.lock().unwrap(); + + let data = map + .map + .iter() + .map(|(topic, datapoint)| DataLine { + topic: topic.to_string(), + datapoint: datapoint.to_string(), + }) + .collect::>(); + + map.update.clear(); + + info!("Sending data? {0}", data.len()); + + let message = WebMessage { + timestamp: get_current_timestamp(), + data, + }; + + if let Ok(data_out) = to_vec_named(&message) { + let _ = tcp_tx_clone_ws.send(data_out); // Ignore if no clients are connected + } else { + warn!("Failed to MsgPack the DataStore Map") } } - _ => { - warn!("Unknown command: {}", msg_topic); + } + "ARM" => { + info!("ARM the drone!"); + let arm_request = QuadArmRequest::new(true); + datastore + .add_struct( + &TopicKey::from_str("cmd/arm"), + Timepoint::now(), + arm_request, + ) + .unwrap(); + } + "DISARM" => { + info!("Disarm the drone"); + let arm_request = QuadArmRequest::new(false); + datastore + .add_struct( + &TopicKey::from_str("cmd/arm"), + Timepoint::now(), + arm_request, + ) + .unwrap(); + } + "TAKEOFF" => { + info!("Takeoff requested"); + // Hard coded to 10.0 for now. + let arm_request = QuadTakeoffRequest::new(10.0); + datastore + .add_struct( + &TopicKey::from_str("cmd/takeoff"), + Timepoint::now(), + arm_request, + ) + .unwrap(); + } + "LAND" => { + info!("Land Requested"); + let arm_request = QuadLandRequest::new(); + datastore + .add_struct( + &TopicKey::from_str("cmd/land"), + Timepoint::now(), + arm_request, + ) + .unwrap(); + } + "MODE" => { + if let Some(value) = msg_value { + if let Ok(mode) = QuadMode::from_str(&value) { + println!("Setting new mode now {0}", mode); + let mode_req = QuadSetModeRequest::new(mode); + datastore + .add_struct( + &TopicKey::from_str("cmd/mode/mode"), + Timepoint::now(), + mode_req, + ) + .unwrap(); + } } } + _ => { + warn!("Unknown command: {}", msg_topic); + } } } - }); + } + }); + tokio::spawn(async move { loop { thread::sleep(Duration::from_secs_f32(2.0)); - let map = subscriber_handle_clone.lock().unwrap(); - - let data = map - .map - .iter() - .map(|(topic, datapoint)| DataLine { - topic: topic.to_string(), - datapoint: datapoint.to_string(), - }) - .collect::>(); - - let message = WebMessage { - timestamp: get_current_timestamp(), - data, - }; - - if let Ok(data_out) = to_vec_named(&message) { - let _ = tcp_tx_clone.send(data_out); // Ignore if no clients are connected - } else { - warn!("Failed to MsgPack the DataStore Map") + { + let mut map = subscriber_handle_clone.lock().unwrap(); + + if !map.update.is_empty() { + let data = map + .update + .iter() + .map(|(topic, datapoint)| DataLine { + topic: topic.to_string(), + datapoint: datapoint.to_string(), + }) + .collect::>(); + + map.update.clear(); + + info!("Sending data? {0}", data.len()); + + let message = WebMessage { + timestamp: get_current_timestamp(), + data, + }; + + if let Ok(data_out) = to_vec_named(&message) { + let _ = tcp_tx_clone.send(data_out); // Ignore if no clients are connected + } else { + warn!("Failed to MsgPack the DataStore Map") + } + } } } }); - let mut start_time = Timepoint::now(); - let mut fired = false; + loop { thread::sleep(Duration::from_millis(100)); node.tick(); - - /* - let elapsed = Timepoint::now() - start_time.clone(); - if elapsed.secs() > 1.0 && !fired { - fired = true; - info!("Fired!"); - // Send arm command - let arm_command = QuadArmRequest::new(true); - let topic = TopicKey::from_str("cmd/arm"); - datastore - .lock() - .unwrap() - .add_struct(&topic, Timepoint::now(), arm_command) - .unwrap(); - } */ } }