Skip to content

Commit

Permalink
Datastore sync MVP (#61)
Browse files Browse the repository at this point in the history
* Datastore sync MVP

* tunng

* Tuned GCS

* Updated
  • Loading branch information
victoryforphil authored Nov 12, 2024
1 parent a0b89c3 commit 00e4765
Show file tree
Hide file tree
Showing 22 changed files with 118 additions and 78 deletions.
2 changes: 1 addition & 1 deletion lil-gcs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ victory-wtf = { workspace = true }
lil-link = { path = "../lil-link" }
serde_json = "1.0.132"
polars = { version = "0.44.0", features = ["csv"] }
tokio = "1.41.0"
tokio = { version = "1.41.0", features = ["full"] }
tokio-tungstenite = "0.24.0"
tungstenite = "0.24.0"
futures = "0.3.31"
77 changes: 46 additions & 31 deletions lil-gcs/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ use std::sync::Mutex;
use std::thread;
use std::time::Duration;
use std::time::{SystemTime, UNIX_EPOCH};
use victory_data_store::database::listener::DataStoreListener;
use victory_data_store::sync::adapters::tcp::tcp_client::TCPClient;
use victory_data_store::sync::config::SyncConfig;

use clap::Parser;
use serde::Serialize;
Expand All @@ -21,10 +24,6 @@ use tracing::warn;
use tracing::Level;
use tracing_subscriber::fmt;

use victory_broker::adapters::tcp::TCPClientAdapter;
use victory_broker::adapters::tcp::TCPClientOptions;
use victory_broker::node::sub_callback::SubCallback;
use victory_broker::node::Node;
use victory_data_store::database::Datastore;
use victory_data_store::topics::TopicKey;
use victory_wtf::Timepoint;
Expand All @@ -38,15 +37,23 @@ pub struct TCPNodeSubscriber {
update: BTreeMap<String, String>,
}

impl SubCallback for TCPNodeSubscriber {
fn on_update(&mut self, datapoints: &victory_data_store::datapoints::DatapointMap) {
for (topic, datapoint) in datapoints.iter() {
self.map
.insert(topic.display_name(), format!("{:?}", datapoint.value));
impl DataStoreListener for TCPNodeSubscriber {
fn on_datapoint(&mut self, datapoint: &victory_data_store::datapoints::Datapoint) {

}

self.update
.insert(topic.display_name(), format!("{:?}", datapoint.value));
}
fn on_raw_datapoint(&mut self, datapoint: &victory_data_store::datapoints::Datapoint) {
let topic = datapoint.topic.clone();
self.map
.insert(topic.display_name(), format!("{:?}", datapoint.value));

self.update
.insert(topic.display_name(), format!("{:?}", datapoint.value));
}


fn on_bucket_update(&mut self, bucket: &victory_data_store::buckets::BucketHandle) {

}
}

Expand Down Expand Up @@ -106,7 +113,7 @@ async fn main() {
tokio::spawn(webserver::websocket_server_task(tcp_tx.clone(), ws_tx));

fmt()
.with_max_level(Level::INFO)
.with_max_level(Level::DEBUG)
.with_target(true)
.pretty()
.compact()
Expand All @@ -117,41 +124,49 @@ async fn main() {

let args = SILArgs::parse();

let mut client = TCPClientAdapter::new(TCPClientOptions::from_url(&args.connection));
let mut client = TCPClient::new(args.connection.to_string()).await;

while client.is_err() {
info!("Failed to connect to server, retrying...");
thread::sleep(Duration::from_secs_f32(1.0));
client = TCPClientAdapter::new(TCPClientOptions::from_url(&args.connection));
client = TCPClient::new(args.connection.to_string()).await;
}

let client = client.unwrap();

let client_handle = Arc::new(Mutex::new(client));
let datastore = Datastore::new().handle();
let mut node = Node::new(
"TCP Client".to_string(),
client_handle.clone(),
datastore.clone(),
);

let subscriber = TCPNodeSubscriber {
let topic_key = TopicKey::from_str("");

let sync_config = SyncConfig {
client_name: "GCS".to_string(),
subscriptions: vec![topic_key.display_name()],
};
datastore
.lock()
.unwrap()
.setup_sync(sync_config, client_handle);


let mut listener = TCPNodeSubscriber {
map: BTreeMap::new(),
update: BTreeMap::new(),
};

let subscriber_handle = Arc::new(Mutex::new(subscriber));
let listener_handle = Arc::new(Mutex::new(listener));

let topic_key = TopicKey::from_str("");
node.add_sub_callback(topic_key, subscriber_handle.clone());
node.register();
datastore.lock().unwrap().add_listener(&topic_key, listener_handle.clone());



let subscriber_handle_clone = subscriber_handle.clone();
let subscriber_handle_clone = listener_handle.clone();
let tcp_tx_clone = tcp_tx.clone();

let subscriber_handle_clone_ws = subscriber_handle.clone();
let subscriber_handle_clone_ws = listener_handle.clone();
let tcp_tx_clone_ws = tcp_tx.clone();

let datastore_clone = datastore.clone();
tokio::spawn(async move {
loop {
if let Some(ws_msg) = ws_rx.recv().await {
Expand All @@ -160,7 +175,7 @@ async fn main() {
// Need to parse from json or something a topic and a value. It would be great if I could parse a normal victory value.

// I thought this was a scope thing at some point.
let mut datastore = datastore.lock().expect("Failed to lock mutex");
let mut datastore = datastore_clone.lock().expect("Failed to lock mutex");

let (msg_topic, params) = parse_message(&ws_msg);

Expand Down Expand Up @@ -288,7 +303,7 @@ async fn main() {
tokio::spawn(async move {
loop {
// thread::sleep(Duration::from_secs_f32(2.0));
tokio::time::sleep(Duration::from_secs_f32(0.25)).await;
tokio::time::sleep(Duration::from_millis(200)).await;
{
let mut map = subscriber_handle_clone.lock().unwrap();

Expand Down Expand Up @@ -318,9 +333,9 @@ async fn main() {
}
}
});

let datastore = datastore.clone();
loop {
tokio::time::sleep(Duration::from_millis(100)).await;
node.tick();
datastore.lock().unwrap().run_sync();
}
}
1 change: 1 addition & 0 deletions lil-launcher/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,5 @@ victory-broker = { workspace = true }
lil-rerun = { path = "../lil-rerun" }
lil-quad = { path = "../lil-quad" }
lil-link = { path = "../lil-link" }
tokio = { version = "1.41.1", features = ["full"] }

29 changes: 21 additions & 8 deletions lil-launcher/bin_tests/quad_idle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ use victory_broker::adapters::tcp::TCPServerAdapter;
use victory_broker::adapters::tcp::TCPServerOptions;
use victory_commander::system::runner::BasherSysRunner;
use victory_data_store::primitives::Primitives;
use victory_data_store::sync::adapters::tcp::tcp_server::TcpSyncServer;
use victory_data_store::sync::config::SyncConfig;
use victory_data_store::topics::TopicKey;
use victory_wtf::Timepoint;
use victory_wtf::Timespan;
Expand All @@ -49,6 +51,10 @@ struct SILArgs {
)]
duration: f32,


#[clap(short, long, value_parser, help = "TCP Sync Server address")]
sync_server_address: String,

#[clap(
short,
long,
Expand All @@ -58,8 +64,8 @@ struct SILArgs {
)]
arm_time: f32,
}

fn main() {
#[tokio::main]
async fn main() {
fmt()
.with_max_level(Level::INFO)
.with_target(true)
Expand All @@ -74,13 +80,20 @@ fn main() {
info!("Running 'quad_sil' with args: {:#?}", args);

let mut runner = BasherSysRunner::new();
let server = TCPServerAdapter::new(TCPServerOptions {
port: 7001,
address: "0.0.0.0".to_string(),
update_interval: Timespan::new_hz(100.0),
});
let server = TcpSyncServer::new(&args.sync_server_address).await;
let server_handle = Arc::new(Mutex::new(server));
runner.enable_pubsub(server_handle);

let topic_filter = TopicKey::from_str("cmd");

let sync_config = SyncConfig {
client_name: "Quad Idle".to_string(),
subscriptions: vec![topic_filter.display_name()],
};
runner.data_store
.lock()
.unwrap()
.setup_sync(sync_config, server_handle);

runner.dt = Timespan::new_hz(args.hz as f64);

runner.add_system(Arc::new(Mutex::new(
Expand Down
33 changes: 22 additions & 11 deletions lil-launcher/bin_tests/quad_sil.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ use tracing::Level;
use tracing_subscriber::fmt;

use clap::Parser;
use victory_broker::adapters::tcp::TCPServerAdapter;
use victory_broker::adapters::tcp::TCPServerOptions;

use victory_commander::system::runner::BasherSysRunner;
use victory_data_store::primitives::Primitives;
use victory_data_store::sync::adapters::tcp::tcp_server::TcpSyncServer;
use victory_data_store::sync::config::SyncConfig;
use victory_data_store::topics::TopicKey;
use victory_wtf::Timepoint;
use victory_wtf::Timespan;
Expand All @@ -35,6 +36,9 @@ struct SILArgs {
#[clap(short, long, value_parser, help = "Mavlink connection string")]
connection_string: String,

#[clap(short, long, value_parser, help = "TCP Sync Server address")]
sync_server_address: String,

#[clap(long, value_parser, help = "Command Hz ", default_value = "25.0")]
hz: f32,

Expand All @@ -56,10 +60,10 @@ struct SILArgs {
)]
arm_time: f32,
}

fn main() {
#[tokio::main]
async fn main() {
fmt()
.with_max_level(Level::DEBUG)
.with_max_level(Level::INFO)
.with_target(true)
.pretty()
.compact()
Expand All @@ -72,13 +76,20 @@ fn main() {
info!("Running 'quad_sil' with args: {:#?}", args);

let mut runner = BasherSysRunner::new();
let server = TCPServerAdapter::new(TCPServerOptions {
port: 7001,
address: "0.0.0.0".to_string(),
update_interval: Timespan::new_hz(100.0),
});
let server = TcpSyncServer::new(&args.sync_server_address).await;
let server_handle = Arc::new(Mutex::new(server));
runner.enable_pubsub(server_handle);

let topic_filter = TopicKey::from_str("cmd");

let sync_config = SyncConfig {
client_name: "TCP Sync Server".to_string(),
subscriptions: vec![topic_filter.display_name()],
};
runner.data_store
.lock()
.unwrap()
.setup_sync(sync_config, server_handle);

runner.dt = Timespan::new_hz(args.hz as f64);

runner.add_system(Arc::new(Mutex::new(
Expand Down
2 changes: 1 addition & 1 deletion lil-link/src/mavlink/processors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use victory_data_store::database::DataView;
use victory_data_store::{database::view::DataView, topics::TopicKey};

use super::core::MavlinkMessageType;

Expand Down
2 changes: 1 addition & 1 deletion lil-link/src/mavlink/processors/proc_attitude.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use victory_data_store::{database::DataView, topics::TopicKey};
use victory_data_store::{database::view::DataView, topics::TopicKey};

use crate::{
common::{
Expand Down
2 changes: 1 addition & 1 deletion lil-link/src/mavlink/processors/proc_command_ack.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use log::info;
use victory_data_store::{database::DataView, topics::TopicKey};
use victory_data_store::{database::view::DataView, topics::TopicKey};

use crate::{
common::identifiers::{IDENT_BASE_LOG, IDENT_COMMAND_ACK},
Expand Down
2 changes: 1 addition & 1 deletion lil-link/src/mavlink/processors/proc_ekf_health.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use victory_data_store::{database::DataView, topics::TopicKey};
use victory_data_store::{database::view::DataView, topics::TopicKey};

use crate::{
common::identifiers::{IDENT_BASE_STATUS, IDENT_STATUS_EKF, IDENT_STATUS_SENSORS},
Expand Down
2 changes: 1 addition & 1 deletion lil-link/src/mavlink/processors/proc_heartbeat.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use log::trace;
use victory_data_store::{database::DataView, topics::TopicKey};
use victory_data_store::{database::view::DataView, topics::TopicKey};

use crate::{
common::{
Expand Down
2 changes: 1 addition & 1 deletion lil-link/src/mavlink/processors/proc_local_position.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use victory_data_store::{database::DataView, topics::TopicKey};
use victory_data_store::{database::view::DataView, topics::TopicKey};

use crate::{
common::{
Expand Down
2 changes: 1 addition & 1 deletion lil-link/src/mavlink/processors/proc_param_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub struct ParamValueProcessor;
impl MavlinkMessageProcessor for ParamValueProcessor {
fn on_mavlink_message(
msg: crate::mavlink::core::MavlinkMessageType,
data_view: &mut victory_data_store::database::DataView,
data_view: &mut victory_data_store::database::view::DataView,
) -> Result<(), anyhow::Error> {
let param_value_msg = match msg {
MavlinkMessageType::PARAM_VALUE(param_value) => param_value,
Expand Down
2 changes: 1 addition & 1 deletion lil-link/src/mavlink/processors/proc_status_text.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use log::info;
use victory_data_store::{database::DataView, topics::TopicKey};
use victory_data_store::{database::view::DataView, topics::TopicKey};

use crate::{
common::identifiers::{IDENT_BASE_LOG, IDENT_STATUS_TEXT},
Expand Down
2 changes: 1 addition & 1 deletion lil-link/src/mavlink/processors/proc_sys_status.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use victory_data_store::{database::DataView, topics::TopicKey};
use victory_data_store::{database::view::DataView, topics::TopicKey};

use crate::{
common::identifiers::*,
Expand Down
2 changes: 1 addition & 1 deletion lil-link/src/mavlink/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{

use log::{debug, info};
use victory_commander::system::System;
use victory_data_store::{database::DataView, topics::TopicKey};
use victory_data_store::{database::view::DataView, topics::TopicKey};
use victory_wtf::Timespan;

use crate::common::types::{
Expand Down
6 changes: 3 additions & 3 deletions lil-quad/src/systems/health_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use lil_link::common::{
use log::info;
use serde::{Deserialize, Serialize};
use victory_commander::system::System;
use victory_data_store::{database::DataView, topics::TopicKey};
use victory_data_store::{database::view::DataView, topics::TopicKey};
use victory_wtf::Timepoint;

#[derive(Serialize, Deserialize, Debug, Clone)]
Expand Down Expand Up @@ -59,9 +59,9 @@ impl System for HealthCheck {

fn execute(
&mut self,
inputs: &victory_data_store::database::DataView,
inputs: &victory_data_store::database::view::DataView,
_dt: victory_wtf::Timespan,
) -> victory_data_store::database::DataView {
) -> victory_data_store::database::view::DataView {
let ekf_status: QuadEkfStatus = inputs
.get_latest(&TopicKey::from_str(&format!(
"{}/{}",
Expand Down
Loading

0 comments on commit 00e4765

Please sign in to comment.