Skip to content

Commit

Permalink
Tried making threading better
Browse files Browse the repository at this point in the history
  • Loading branch information
HiceS committed Nov 8, 2024
1 parent 2dcf647 commit b7f5dda
Showing 1 changed file with 155 additions and 120 deletions.
275 changes: 155 additions & 120 deletions lil-gcs/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ use std::thread;
use std::time::Duration;
use std::time::{SystemTime, UNIX_EPOCH};

use clap::Parser;
use serde::Serialize;
use tracing::info;
use tracing::warn;
use tracing::Level;
use tracing_subscriber::fmt;
use clap::{Parser, ValueEnum};

use victory_broker::adapters::tcp::TCPClientAdapter;
use victory_broker::adapters::tcp::TCPClientOptions;
Expand All @@ -34,15 +34,17 @@ mod webserver;

pub struct TCPNodeSubscriber {
map: BTreeMap<String, String>,
// update: Vec<(String, String )>
update: BTreeMap<String, String>,
}

impl SubCallback for TCPNodeSubscriber {
fn on_update(&mut self, datapoints: &victory_data_store::datapoints::DatapointMap) {
// info!("Datapoints: {:?}", datapoints.len());
for (topic, datapoint) in datapoints.iter() {
self.map
.insert(topic.display_name(), format!("{:?}", datapoint.value));

self.update
.insert(topic.display_name(), format!("{:?}", datapoint.value));
}
}
}
Expand All @@ -56,7 +58,13 @@ struct DataLine {
#[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)]
struct SILArgs {
#[clap(short, long, value_parser, help = "Publishing connection string", default_value = "localhost:7001")]
#[clap(
short,
long,
value_parser,
help = "Publishing connection string",
default_value = "localhost:7001"
)]
connection: String,
}

Expand Down Expand Up @@ -92,7 +100,7 @@ async fn main() {
tokio::spawn(webserver::websocket_server_task(tcp_tx.clone(), ws_tx));

fmt()
.with_max_level(Level::DEBUG)
.with_max_level(Level::INFO)
.with_target(true)
.pretty()
.compact()
Expand Down Expand Up @@ -123,7 +131,9 @@ async fn main() {

let subscriber = TCPNodeSubscriber {
map: BTreeMap::new(),
update: BTreeMap::new(),
};

let subscriber_handle = Arc::new(Mutex::new(subscriber));

let topic_key = TopicKey::from_str("");
Expand All @@ -133,141 +143,166 @@ async fn main() {
let subscriber_handle_clone = subscriber_handle.clone();
let tcp_tx_clone = tcp_tx.clone();

let subscriber_handle_clone_ws = subscriber_handle.clone();
let tcp_tx_clone_ws = tcp_tx.clone();

tokio::spawn(async move {
tokio::spawn(async move {
loop {
thread::sleep(Duration::from_millis(100));
if let Some(ws_msg) = ws_rx.recv().await {
println!("{:#?}", ws_msg);
loop {
if let Some(ws_msg) = ws_rx.recv().await {
info!("WS MESSAGE -> {:#?}", ws_msg);

// Need to parse from json or something a topic and a value. It would be great if I could parse a normal victory value.
// Need to parse from json or something a topic and a value. It would be great if I could parse a normal victory value.

// I thought this was a scope thing at some point.
let mut datastore = datastore.lock().expect("Failed to lock mutex");
// I thought this was a scope thing at some point.
let mut datastore = datastore.lock().expect("Failed to lock mutex");

let msg_topic: String;
let mut msg_value: Option<String> = None;
let msg_topic: String;
let mut msg_value: Option<String> = None;

if let Some((complex_msg, complex_value)) = parse_basic_message(&ws_msg) {
msg_topic = complex_msg;
msg_value = Some(complex_value);
} else {
msg_topic = ws_msg;
}
if let Some((complex_msg, complex_value)) = parse_basic_message(&ws_msg) {
msg_topic = complex_msg;
msg_value = Some(complex_value);
} else {
msg_topic = ws_msg;
}

match msg_topic.as_str() {
"ARM" => {
info!("ARM the drone!");
let arm_request = QuadArmRequest::new(true);
datastore
.add_struct(
&TopicKey::from_str("cmd/arm"),
Timepoint::now(),
arm_request,
)
.unwrap();
}
"DISARM" => {
info!("Disarm the drone");
let arm_request = QuadArmRequest::new(false);
datastore
.add_struct(
&TopicKey::from_str("cmd/arm"),
Timepoint::now(),
arm_request,
)
.unwrap();
}
"TAKEOFF" => {
info!("Takeoff requested");
// Hard coded to 10.0 for now.
let arm_request = QuadTakeoffRequest::new(10.0);
datastore
.add_struct(
&TopicKey::from_str("cmd/takeoff"),
Timepoint::now(),
arm_request,
)
.unwrap();
}
"LAND" => {
info!("Land Requested");
let arm_request = QuadLandRequest::new();
datastore
.add_struct(
&TopicKey::from_str("cmd/land"),
Timepoint::now(),
arm_request,
)
.unwrap();
}
"MODE" => {
if let Some(value) = msg_value {
if let Ok(mode) = QuadMode::from_str(&value) {
println!("Setting new mode now {0}", mode);
let mode_req = QuadSetModeRequest::new(mode);
datastore
.add_struct(
&TopicKey::from_str("cmd/mode/mode"),
Timepoint::now(),
mode_req,
)
.unwrap();
}
match msg_topic.as_str() {
"NEW_CLIENT" => {
// info!("GOT A NEW CLIENT YALL");

{
let mut map = subscriber_handle_clone_ws.lock().unwrap();

let data = map
.map
.iter()
.map(|(topic, datapoint)| DataLine {
topic: topic.to_string(),
datapoint: datapoint.to_string(),
})
.collect::<Vec<DataLine>>();

map.update.clear();

info!("Sending data? {0}", data.len());

let message = WebMessage {
timestamp: get_current_timestamp(),
data,
};

if let Ok(data_out) = to_vec_named(&message) {
let _ = tcp_tx_clone_ws.send(data_out); // Ignore if no clients are connected
} else {
warn!("Failed to MsgPack the DataStore Map")
}
}
_ => {
warn!("Unknown command: {}", msg_topic);
}
"ARM" => {
info!("ARM the drone!");
let arm_request = QuadArmRequest::new(true);
datastore
.add_struct(
&TopicKey::from_str("cmd/arm"),
Timepoint::now(),
arm_request,
)
.unwrap();
}
"DISARM" => {
info!("Disarm the drone");
let arm_request = QuadArmRequest::new(false);
datastore
.add_struct(
&TopicKey::from_str("cmd/arm"),
Timepoint::now(),
arm_request,
)
.unwrap();
}
"TAKEOFF" => {
info!("Takeoff requested");
// Hard coded to 10.0 for now.
let arm_request = QuadTakeoffRequest::new(10.0);
datastore
.add_struct(
&TopicKey::from_str("cmd/takeoff"),
Timepoint::now(),
arm_request,
)
.unwrap();
}
"LAND" => {
info!("Land Requested");
let arm_request = QuadLandRequest::new();
datastore
.add_struct(
&TopicKey::from_str("cmd/land"),
Timepoint::now(),
arm_request,
)
.unwrap();
}
"MODE" => {
if let Some(value) = msg_value {
if let Ok(mode) = QuadMode::from_str(&value) {
println!("Setting new mode now {0}", mode);
let mode_req = QuadSetModeRequest::new(mode);
datastore
.add_struct(
&TopicKey::from_str("cmd/mode/mode"),
Timepoint::now(),
mode_req,
)
.unwrap();
}
}
}
_ => {
warn!("Unknown command: {}", msg_topic);
}
}
}
});
}
});

tokio::spawn(async move {
loop {
thread::sleep(Duration::from_secs_f32(2.0));
let map = subscriber_handle_clone.lock().unwrap();

let data = map
.map
.iter()
.map(|(topic, datapoint)| DataLine {
topic: topic.to_string(),
datapoint: datapoint.to_string(),
})
.collect::<Vec<DataLine>>();

let message = WebMessage {
timestamp: get_current_timestamp(),
data,
};

if let Ok(data_out) = to_vec_named(&message) {
let _ = tcp_tx_clone.send(data_out); // Ignore if no clients are connected
} else {
warn!("Failed to MsgPack the DataStore Map")
{
let mut map = subscriber_handle_clone.lock().unwrap();

if !map.update.is_empty() {
let data = map
.update
.iter()
.map(|(topic, datapoint)| DataLine {
topic: topic.to_string(),
datapoint: datapoint.to_string(),
})
.collect::<Vec<DataLine>>();

map.update.clear();

info!("Sending data? {0}", data.len());

let message = WebMessage {
timestamp: get_current_timestamp(),
data,
};

if let Ok(data_out) = to_vec_named(&message) {
let _ = tcp_tx_clone.send(data_out); // Ignore if no clients are connected
} else {
warn!("Failed to MsgPack the DataStore Map")
}
}
}
}
});
let mut start_time = Timepoint::now();
let mut fired = false;

loop {
thread::sleep(Duration::from_millis(100));
node.tick();

/*
let elapsed = Timepoint::now() - start_time.clone();
if elapsed.secs() > 1.0 && !fired {
fired = true;
info!("Fired!");
// Send arm command
let arm_command = QuadArmRequest::new(true);
let topic = TopicKey::from_str("cmd/arm");
datastore
.lock()
.unwrap()
.add_struct(&topic, Timepoint::now(), arm_command)
.unwrap();
} */
}
}

0 comments on commit b7f5dda

Please sign in to comment.