diff --git a/src/constraint_based_runtime.rs b/src/constraint_based_runtime.rs index 6e8577c..a022927 100644 --- a/src/constraint_based_runtime.rs +++ b/src/constraint_based_runtime.rs @@ -1,9 +1,9 @@ +use async_stream::stream; use async_trait::async_trait; use futures::stream::BoxStream; use futures::StreamExt; -use tokio::sync::broadcast; use std::collections::BTreeMap; -use async_stream::stream; +use tokio::sync::broadcast; use crate::ast::LOLASpecification; use crate::ast::SExpr; @@ -21,8 +21,7 @@ pub struct ValStreamCollection(pub BTreeMap>) impl ValStreamCollection { fn into_stream(mut self) -> BoxStream<'static, BTreeMap> { - Box::pin(stream!( - loop { + Box::pin(stream!(loop { let mut res = BTreeMap::new(); for (name, stream) in self.0.iter_mut() { match stream.next().await { @@ -35,8 +34,7 @@ impl ValStreamCollection { } } yield res; - } - )) + })) } } @@ -131,9 +129,9 @@ impl ConstraintBasedRuntime { } #[derive(Debug, Clone)] -pub enum ProducerMessage{ +pub enum ProducerMessage { Data(T), - Done + Done, } struct InputProducer { @@ -145,7 +143,7 @@ impl InputProducer { let (sender, _) = broadcast::channel(10); Self { sender } } - pub fn run(&self, stream_collection : ValStreamCollection) { + pub fn run(&self, stream_collection: ValStreamCollection) { let task_sender = self.sender.clone(); tokio::spawn(async move { let mut inputs_stream = stream_collection.into_stream(); @@ -172,7 +170,11 @@ pub struct ConstraintBasedMonitor { #[async_trait] impl Monitor for ConstraintBasedMonitor { - fn new(model: LOLASpecification, input: &mut dyn InputProvider, output: Box>) -> Self { + fn new( + model: LOLASpecification, + input: &mut dyn InputProvider, + output: Box>, + ) -> Self { let input_streams = model .input_vars() .iter() @@ -215,33 +217,33 @@ impl Monitor for ConstraintBasedMonitor { impl ConstraintBasedMonitor { fn stream_output_constraints(&mut self) -> BoxStream<'static, ConstraintStore> { - let input_receiver= self.input_producer.subscribe(); + let input_receiver = self.input_producer.subscribe(); let mut runtime_initial = ConstraintBasedRuntime::default(); runtime_initial.store = model_constraints(self.model.clone()); let has_inputs = self.has_inputs.clone(); Box::pin(stream!( - let mut runtime = runtime_initial; - if has_inputs { - let mut input_receiver = input_receiver; - while let Ok(inputs) = input_receiver.recv().await { - match inputs { - ProducerMessage::Data(inputs) => { - runtime.step(&inputs); - yield runtime.store.clone(); - } - ProducerMessage::Done => { - break; + let mut runtime = runtime_initial; + if has_inputs { + let mut input_receiver = input_receiver; + while let Ok(inputs) = input_receiver.recv().await { + match inputs { + ProducerMessage::Data(inputs) => { + runtime.step(&inputs); + yield runtime.store.clone(); + } + ProducerMessage::Done => { + break; + } } } } - } - else { - loop { - runtime.step(&BTreeMap::new()); - yield runtime.store.clone(); + else { + loop { + runtime.step(&BTreeMap::new()); + yield runtime.store.clone(); + } } - } - )) + )) } fn output_stream(&mut self, var: &VarName) -> BoxStream<'static, Value> { diff --git a/src/core.rs b/src/core.rs index 0c98f06..affbdea 100644 --- a/src/core.rs +++ b/src/core.rs @@ -1,6 +1,8 @@ use std::{ collections::BTreeMap, - fmt::{Debug, Display}, future::Future, pin::Pin, + fmt::{Debug, Display}, + future::Future, + pin::Pin, }; use async_trait::async_trait; diff --git a/src/file_handling.rs b/src/file_handling.rs index 302fd2c..62dd81c 100644 --- a/src/file_handling.rs +++ b/src/file_handling.rs @@ -51,7 +51,7 @@ mod tests { use futures::StreamExt; use crate::core::VarName; - use crate::{Value, InputProvider}; + use crate::{InputProvider, Value}; use super::*; @@ -65,9 +65,6 @@ mod tests { .unwrap() .collect::>() .await; - assert_eq!( - x_vals, - vec![Value::Int(1), Value::Int(3)] - ); + assert_eq!(x_vals, vec![Value::Int(1), Value::Int(3)]); } } diff --git a/src/file_input_provider.rs b/src/file_input_provider.rs index 656045c..30fcc88 100644 --- a/src/file_input_provider.rs +++ b/src/file_input_provider.rs @@ -58,14 +58,7 @@ mod tests { let iter = super::input_file_data_iter(data, VarName("x".into())); let vec: Vec = iter.collect(); - assert_eq!( - vec, - vec![ - Value::Int(1), - Value::Int(2), - Value::Int(3) - ] - ); + assert_eq!(vec, vec![Value::Int(1), Value::Int(2), Value::Int(3)]); } #[tokio::test] @@ -89,13 +82,6 @@ mod tests { let input_stream = data.input_stream(&VarName("x".into())).unwrap(); let input_vec = input_stream.collect::>().await; - assert_eq!( - input_vec, - vec![ - Value::Int(1), - Value::Int(2), - Value::Int(3) - ] - ); + assert_eq!(input_vec, vec![Value::Int(1), Value::Int(2), Value::Int(3)]); } } diff --git a/src/lib.rs b/src/lib.rs index 6fc3800..2422e0d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,7 +27,9 @@ pub use file_handling::parse_file; pub mod commandline_args; pub mod macros; pub mod manual_output_handler; +pub mod mqtt_client; pub mod mqtt_input_provider; +pub mod mqtt_output_handler; pub mod null_output_handler; #[cfg(feature = "ros")] pub mod ros_input_provider; @@ -37,5 +39,3 @@ pub mod stdout_output_handler; pub mod stream_utils; pub mod typed_monitoring_combinators; pub mod typed_monitoring_semantics; -pub mod mqtt_output_handler; -pub mod mqtt_client; \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 6b46a90..c7a4501 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,8 +4,8 @@ use core::panic; use clap::Parser; use trustworthiness_checker::core::OutputHandler; use trustworthiness_checker::mqtt_output_handler::MQTTOutputHandler; -use trustworthiness_checker::{InputProvider, Value}; use trustworthiness_checker::{self as tc, parse_file, type_checking::type_check, Monitor}; +use trustworthiness_checker::{InputProvider, Value}; use trustworthiness_checker::commandline_args::{Cli, Language, Runtime, Semantics}; #[cfg(feature = "ros")] @@ -83,24 +83,33 @@ async fn main() { output_stdout: true, output_mqtt_topics: None, output_ros_topics: None, - } => Box::new(StdoutOutputHandler::::new(model.output_vars.clone())), + } => Box::new(StdoutOutputHandler::::new( + model.output_vars.clone(), + )), trustworthiness_checker::commandline_args::OutputMode { output_stdout: false, output_mqtt_topics: Some(topics), output_ros_topics: None, } => { - let topics = topics.into_iter().map(|topic| (tc::VarName(topic.clone()), topic)).collect(); - Box::new(MQTTOutputHandler::new(MQTT_HOSTNAME, topics) - .expect("MQTT output handler could not be created")) - }, + let topics = topics + .into_iter() + .map(|topic| (tc::VarName(topic.clone()), topic)) + .collect(); + Box::new( + MQTTOutputHandler::new(MQTT_HOSTNAME, topics) + .expect("MQTT output handler could not be created"), + ) + } trustworthiness_checker::commandline_args::OutputMode { output_stdout: false, output_mqtt_topics: None, output_ros_topics: Some(_), } => unimplemented!("ROS output not implemented"), // Default to stdout - _ => Box::new(StdoutOutputHandler::::new(model.output_vars.clone())), - }; + _ => Box::new(StdoutOutputHandler::::new( + model.output_vars.clone(), + )), + }; // println!("Outputs: {:?}", model.output_vars); // println!("Inputs: {:?}", model.input_vars); diff --git a/src/manual_output_handler.rs b/src/manual_output_handler.rs index 9554655..9fec021 100644 --- a/src/manual_output_handler.rs +++ b/src/manual_output_handler.rs @@ -97,8 +97,7 @@ impl OutputHandler for ManualOutputHandler { } } - -pub struct AsyncManualOutputHandler{ +pub struct AsyncManualOutputHandler { var_names: Vec, stream_senders: Option>>>, stream_receivers: Option>>>, @@ -136,13 +135,11 @@ impl OutputHandler for AsyncManualOutputHandler { .into_iter() .zip(var_names) .map(|(stream, var_name)| { - { - let mut stream = stream; - let output_sender = output_sender.clone(); - async move { - while let Some(data) = stream.next().await { - let _ = output_sender.send((var_name.clone(), data)).await; - } + let mut stream = stream; + let output_sender = output_sender.clone(); + async move { + while let Some(data) = stream.next().await { + let _ = output_sender.send((var_name.clone(), data)).await; } } }) @@ -178,7 +175,6 @@ impl AsyncManualOutputHandler { } } - #[cfg(test)] mod tests { use futures::StreamExt; @@ -234,12 +230,18 @@ mod tests { #[tokio::test] async fn async_test_combined_output() { // Helper to create a named stream with delay - fn create_stream(name: &str, multiplier: i64, offset: i64) -> (VarName, OutputStream) { + fn create_stream( + name: &str, + multiplier: i64, + offset: i64, + ) -> (VarName, OutputStream) { let var_name = VarName(name.to_string()); // Delay to force expected ordering of the streams let interval = IntervalStream::new(interval(Duration::from_millis(5))); let stream = Box::pin( - stream::iter(0..10).zip(interval).map(move |(x, _)| (multiplier * x + offset).into()), + stream::iter(0..10) + .zip(interval) + .map(move |(x, _)| (multiplier * x + offset).into()), ); (var_name, stream) } @@ -250,10 +252,12 @@ mod tests { // Prepare expected output let expected_output: Vec<_> = (0..10) - .flat_map(|x| vec![ - (x_name.clone(), (x * 2).into()), - (y_name.clone(), (x * 2 + 1).into()), - ]) + .flat_map(|x| { + vec![ + (x_name.clone(), (x * 2).into()), + (y_name.clone(), (x * 2 + 1).into()), + ] + }) .collect(); // Initialize the handler diff --git a/src/mqtt_client.rs b/src/mqtt_client.rs index b3d6536..2227181 100644 --- a/src/mqtt_client.rs +++ b/src/mqtt_client.rs @@ -79,11 +79,13 @@ pub async fn provide_mqtt_client(hostname: Hostname) -> Result, - M: Specification, - > Monitor for QueuingMonitorRunner +impl, M: Specification> + Monitor for QueuingMonitorRunner { - fn new(model: M, input_streams: &mut dyn InputProvider, output: Box>) -> Self { + fn new( + model: M, + input_streams: &mut dyn InputProvider, + output: Box>, + ) -> Self { let var_names: Vec = model .input_vars() .into_iter() @@ -332,12 +332,8 @@ impl< } } -impl< - Val: StreamData, - Expr, - S: MonitoringSemantics, - M: Specification, - > QueuingMonitorRunner +impl, M: Specification> + QueuingMonitorRunner { fn output_stream(&self, var: VarName) -> OutputStream { self.var_exchange.var(&var).unwrap() diff --git a/src/typed_monitoring_combinators.rs b/src/typed_monitoring_combinators.rs index 2729dac..355eac1 100644 --- a/src/typed_monitoring_combinators.rs +++ b/src/typed_monitoring_combinators.rs @@ -1,8 +1,6 @@ use crate::core::{StreamContext, StreamData}; use crate::untimed_monitoring_combinators::{lift1, lift2, lift3}; -use crate::{ - lola_expression, Value, MonitoringSemantics, OutputStream, UntimedLolaSemantics, -}; +use crate::{lola_expression, MonitoringSemantics, OutputStream, UntimedLolaSemantics, Value}; use futures::{ stream::{self, BoxStream}, StreamExt, diff --git a/src/typed_monitoring_semantics.rs b/src/typed_monitoring_semantics.rs index cd1d3ee..a5af87f 100644 --- a/src/typed_monitoring_semantics.rs +++ b/src/typed_monitoring_semantics.rs @@ -8,13 +8,8 @@ use crate::typed_monitoring_combinators::{from_typed_stream, to_typed_stream}; #[derive(Clone)] pub struct TypedUntimedLolaSemantics; -impl MonitoringSemantics - for TypedUntimedLolaSemantics -{ - fn to_async_stream( - expr: SExprTE, - ctx: &dyn StreamContext, - ) -> OutputStream { +impl MonitoringSemantics for TypedUntimedLolaSemantics { + fn to_async_stream(expr: SExprTE, ctx: &dyn StreamContext) -> OutputStream { match expr { SExprTE::Int(e) => from_typed_stream::(Self::to_async_stream(e, ctx)), SExprTE::Str(e) => from_typed_stream::(Self::to_async_stream(e, ctx)), @@ -25,10 +20,7 @@ impl MonitoringSemantics } impl MonitoringSemantics for TypedUntimedLolaSemantics { - fn to_async_stream( - expr: SExprInt, - ctx: &dyn StreamContext, - ) -> OutputStream { + fn to_async_stream(expr: SExprInt, ctx: &dyn StreamContext) -> OutputStream { match expr { SExprInt::Val(v) => mc::val(v), SExprInt::BinOp(e1, e2, op) => { @@ -57,10 +49,7 @@ impl MonitoringSemantics for TypedUntimedLolaSemantics { } impl MonitoringSemantics for TypedUntimedLolaSemantics { - fn to_async_stream( - expr: SExprStr, - ctx: &dyn StreamContext, - ) -> OutputStream { + fn to_async_stream(expr: SExprStr, ctx: &dyn StreamContext) -> OutputStream { match expr { SExprStr::Val(v) => mc::val(v), SExprStr::Var(v) => to_typed_stream(ctx.var(&v).unwrap()), @@ -84,10 +73,7 @@ impl MonitoringSemantics for TypedUntimedLolaSemantics } impl MonitoringSemantics for TypedUntimedLolaSemantics { - fn to_async_stream( - expr: SExprUnit, - ctx: &dyn StreamContext, - ) -> OutputStream<()> { + fn to_async_stream(expr: SExprUnit, ctx: &dyn StreamContext) -> OutputStream<()> { match expr { SExprUnit::Val(v) => mc::val(v), SExprUnit::Var(v) => to_typed_stream(ctx.var(&v).unwrap()), @@ -106,10 +92,7 @@ impl MonitoringSemantics for TypedUntimedLolaSemantics { } impl MonitoringSemantics for TypedUntimedLolaSemantics { - fn to_async_stream( - expr: SExprBool, - ctx: &dyn StreamContext, - ) -> OutputStream { + fn to_async_stream(expr: SExprBool, ctx: &dyn StreamContext) -> OutputStream { match expr { SExprBool::Val(b) => mc::val(b), SExprBool::EqInt(e1, e2) => { diff --git a/tests/constraint_based_lola.rs b/tests/constraint_based_lola.rs index 1c7da77..99b14cb 100644 --- a/tests/constraint_based_lola.rs +++ b/tests/constraint_based_lola.rs @@ -3,7 +3,9 @@ use futures::stream::StreamExt; use std::collections::BTreeMap; use trustworthiness_checker::constraint_based_runtime::ConstraintBasedMonitor; -use trustworthiness_checker::{lola_specification, manual_output_handler::ManualOutputHandler, LOLASpecification}; +use trustworthiness_checker::{ + lola_specification, manual_output_handler::ManualOutputHandler, LOLASpecification, +}; use trustworthiness_checker::{Monitor, Value, VarName}; mod lola_fixtures; use futures::stream; diff --git a/tests/typed_untimed_semantics_async.rs b/tests/typed_untimed_semantics_async.rs index 2defa94..31fcde7 100644 --- a/tests/typed_untimed_semantics_async.rs +++ b/tests/typed_untimed_semantics_async.rs @@ -167,7 +167,6 @@ async fn test_eval_monitor() { ); } - #[tokio::test] async fn test_multiple_parameters() { let mut input_streams = input_streams3(); diff --git a/tests/untimed_semantics_async.rs b/tests/untimed_semantics_async.rs index 06520cb..bb1c80f 100644 --- a/tests/untimed_semantics_async.rs +++ b/tests/untimed_semantics_async.rs @@ -160,7 +160,6 @@ async fn test_eval_monitor() { ); } - #[tokio::test] async fn test_multiple_parameters() { let mut input_streams = input_streams1(); @@ -168,7 +167,11 @@ async fn test_multiple_parameters() { let spec = lola_specification(&mut spec).unwrap(); let mut output_handler = Box::new(ManualOutputHandler::new(spec.output_vars.clone())); let outputs = output_handler.get_output(); - let async_monitor = AsyncMonitorRunner::<_, _, UntimedLolaSemantics, _>::new(spec, &mut input_streams, output_handler); + let async_monitor = AsyncMonitorRunner::<_, _, UntimedLolaSemantics, _>::new( + spec, + &mut input_streams, + output_handler, + ); tokio::spawn(async_monitor.run()); let outputs: Vec<(usize, BTreeMap)> = outputs.enumerate().collect().await; assert_eq!(outputs.len(), 2); diff --git a/tests/untimed_semantics_queuing.rs b/tests/untimed_semantics_queuing.rs index 7127906..338a26f 100644 --- a/tests/untimed_semantics_queuing.rs +++ b/tests/untimed_semantics_queuing.rs @@ -123,7 +123,6 @@ async fn test_eval_monitor() { ); } - #[tokio::test] async fn test_multiple_parameters() { let mut input_streams = input_streams1();