Skip to content

Commit

Permalink
Cargo fmt instead of rust-fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
mortenhaahr committed Dec 13, 2024
1 parent e6acf27 commit fbfb769
Show file tree
Hide file tree
Showing 16 changed files with 110 additions and 128 deletions.
60 changes: 31 additions & 29 deletions src/constraint_based_runtime.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use async_stream::stream;
use async_trait::async_trait;
use futures::stream::BoxStream;
use futures::StreamExt;
use tokio::sync::broadcast;
use std::collections::BTreeMap;
use async_stream::stream;
use tokio::sync::broadcast;

use crate::ast::LOLASpecification;
use crate::ast::SExpr;
Expand All @@ -21,8 +21,7 @@ pub struct ValStreamCollection(pub BTreeMap<VarName, BoxStream<'static, Value>>)

impl ValStreamCollection {
fn into_stream(mut self) -> BoxStream<'static, BTreeMap<VarName, Value>> {
Box::pin(stream!(
loop {
Box::pin(stream!(loop {
let mut res = BTreeMap::new();
for (name, stream) in self.0.iter_mut() {
match stream.next().await {
Expand All @@ -35,8 +34,7 @@ impl ValStreamCollection {
}
}
yield res;
}
))
}))
}
}

Expand Down Expand Up @@ -131,9 +129,9 @@ impl ConstraintBasedRuntime {
}

#[derive(Debug, Clone)]
pub enum ProducerMessage<T>{
pub enum ProducerMessage<T> {
Data(T),
Done
Done,
}

struct InputProducer {
Expand All @@ -145,7 +143,7 @@ impl InputProducer {
let (sender, _) = broadcast::channel(10);
Self { sender }
}
pub fn run(&self, stream_collection : ValStreamCollection) {
pub fn run(&self, stream_collection: ValStreamCollection) {
let task_sender = self.sender.clone();
tokio::spawn(async move {
let mut inputs_stream = stream_collection.into_stream();
Expand All @@ -172,7 +170,11 @@ pub struct ConstraintBasedMonitor {

#[async_trait]
impl Monitor<LOLASpecification, Value> for ConstraintBasedMonitor {
fn new(model: LOLASpecification, input: &mut dyn InputProvider<Value>, output: Box<dyn OutputHandler<Value>>) -> Self {
fn new(
model: LOLASpecification,
input: &mut dyn InputProvider<Value>,
output: Box<dyn OutputHandler<Value>>,
) -> Self {
let input_streams = model
.input_vars()
.iter()
Expand Down Expand Up @@ -215,33 +217,33 @@ impl Monitor<LOLASpecification, Value> for ConstraintBasedMonitor {

impl ConstraintBasedMonitor {
fn stream_output_constraints(&mut self) -> BoxStream<'static, ConstraintStore> {
let input_receiver= self.input_producer.subscribe();
let input_receiver = self.input_producer.subscribe();
let mut runtime_initial = ConstraintBasedRuntime::default();
runtime_initial.store = model_constraints(self.model.clone());
let has_inputs = self.has_inputs.clone();
Box::pin(stream!(
let mut runtime = runtime_initial;
if has_inputs {
let mut input_receiver = input_receiver;
while let Ok(inputs) = input_receiver.recv().await {
match inputs {
ProducerMessage::Data(inputs) => {
runtime.step(&inputs);
yield runtime.store.clone();
}
ProducerMessage::Done => {
break;
let mut runtime = runtime_initial;
if has_inputs {
let mut input_receiver = input_receiver;
while let Ok(inputs) = input_receiver.recv().await {
match inputs {
ProducerMessage::Data(inputs) => {
runtime.step(&inputs);
yield runtime.store.clone();
}
ProducerMessage::Done => {
break;
}
}
}
}
}
else {
loop {
runtime.step(&BTreeMap::new());
yield runtime.store.clone();
else {
loop {
runtime.step(&BTreeMap::new());
yield runtime.store.clone();
}
}
}
))
))
}

fn output_stream(&mut self, var: &VarName) -> BoxStream<'static, Value> {
Expand Down
4 changes: 3 additions & 1 deletion src/core.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::{
collections::BTreeMap,
fmt::{Debug, Display}, future::Future, pin::Pin,
fmt::{Debug, Display},
future::Future,
pin::Pin,
};

use async_trait::async_trait;
Expand Down
7 changes: 2 additions & 5 deletions src/file_handling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ mod tests {
use futures::StreamExt;

use crate::core::VarName;
use crate::{Value, InputProvider};
use crate::{InputProvider, Value};

use super::*;

Expand All @@ -65,9 +65,6 @@ mod tests {
.unwrap()
.collect::<Vec<_>>()
.await;
assert_eq!(
x_vals,
vec![Value::Int(1), Value::Int(3)]
);
assert_eq!(x_vals, vec![Value::Int(1), Value::Int(3)]);
}
}
18 changes: 2 additions & 16 deletions src/file_input_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,7 @@ mod tests {

let iter = super::input_file_data_iter(data, VarName("x".into()));
let vec: Vec<Value> = iter.collect();
assert_eq!(
vec,
vec![
Value::Int(1),
Value::Int(2),
Value::Int(3)
]
);
assert_eq!(vec, vec![Value::Int(1), Value::Int(2), Value::Int(3)]);
}

#[tokio::test]
Expand All @@ -89,13 +82,6 @@ mod tests {

let input_stream = data.input_stream(&VarName("x".into())).unwrap();
let input_vec = input_stream.collect::<Vec<_>>().await;
assert_eq!(
input_vec,
vec![
Value::Int(1),
Value::Int(2),
Value::Int(3)
]
);
assert_eq!(input_vec, vec![Value::Int(1), Value::Int(2), Value::Int(3)]);
}
}
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ pub use file_handling::parse_file;
pub mod commandline_args;
pub mod macros;
pub mod manual_output_handler;
pub mod mqtt_client;
pub mod mqtt_input_provider;
pub mod mqtt_output_handler;
pub mod null_output_handler;
#[cfg(feature = "ros")]
pub mod ros_input_provider;
Expand All @@ -37,5 +39,3 @@ pub mod stdout_output_handler;
pub mod stream_utils;
pub mod typed_monitoring_combinators;
pub mod typed_monitoring_semantics;
pub mod mqtt_output_handler;
pub mod mqtt_client;
25 changes: 17 additions & 8 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use core::panic;
use clap::Parser;
use trustworthiness_checker::core::OutputHandler;
use trustworthiness_checker::mqtt_output_handler::MQTTOutputHandler;
use trustworthiness_checker::{InputProvider, Value};
use trustworthiness_checker::{self as tc, parse_file, type_checking::type_check, Monitor};
use trustworthiness_checker::{InputProvider, Value};

use trustworthiness_checker::commandline_args::{Cli, Language, Runtime, Semantics};
#[cfg(feature = "ros")]
Expand Down Expand Up @@ -83,24 +83,33 @@ async fn main() {
output_stdout: true,
output_mqtt_topics: None,
output_ros_topics: None,
} => Box::new(StdoutOutputHandler::<tc::Value>::new(model.output_vars.clone())),
} => Box::new(StdoutOutputHandler::<tc::Value>::new(
model.output_vars.clone(),
)),
trustworthiness_checker::commandline_args::OutputMode {
output_stdout: false,
output_mqtt_topics: Some(topics),
output_ros_topics: None,
} => {
let topics = topics.into_iter().map(|topic| (tc::VarName(topic.clone()), topic)).collect();
Box::new(MQTTOutputHandler::new(MQTT_HOSTNAME, topics)
.expect("MQTT output handler could not be created"))
},
let topics = topics
.into_iter()
.map(|topic| (tc::VarName(topic.clone()), topic))
.collect();
Box::new(
MQTTOutputHandler::new(MQTT_HOSTNAME, topics)
.expect("MQTT output handler could not be created"),
)
}
trustworthiness_checker::commandline_args::OutputMode {
output_stdout: false,
output_mqtt_topics: None,
output_ros_topics: Some(_),
} => unimplemented!("ROS output not implemented"),
// Default to stdout
_ => Box::new(StdoutOutputHandler::<tc::Value>::new(model.output_vars.clone())),
};
_ => Box::new(StdoutOutputHandler::<tc::Value>::new(
model.output_vars.clone(),
)),
};

// println!("Outputs: {:?}", model.output_vars);
// println!("Inputs: {:?}", model.input_vars);
Expand Down
36 changes: 20 additions & 16 deletions src/manual_output_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,7 @@ impl<V: StreamData> OutputHandler<V> for ManualOutputHandler<V> {
}
}


pub struct AsyncManualOutputHandler<V: StreamData>{
pub struct AsyncManualOutputHandler<V: StreamData> {
var_names: Vec<VarName>,
stream_senders: Option<Vec<oneshot::Sender<OutputStream<V>>>>,
stream_receivers: Option<Vec<oneshot::Receiver<OutputStream<V>>>>,
Expand Down Expand Up @@ -136,13 +135,11 @@ impl<V: StreamData> OutputHandler<V> for AsyncManualOutputHandler<V> {
.into_iter()
.zip(var_names)
.map(|(stream, var_name)| {
{
let mut stream = stream;
let output_sender = output_sender.clone();
async move {
while let Some(data) = stream.next().await {
let _ = output_sender.send((var_name.clone(), data)).await;
}
let mut stream = stream;
let output_sender = output_sender.clone();
async move {
while let Some(data) = stream.next().await {
let _ = output_sender.send((var_name.clone(), data)).await;
}
}
})
Expand Down Expand Up @@ -178,7 +175,6 @@ impl<V: StreamData> AsyncManualOutputHandler<V> {
}
}


#[cfg(test)]
mod tests {
use futures::StreamExt;
Expand Down Expand Up @@ -234,12 +230,18 @@ mod tests {
#[tokio::test]
async fn async_test_combined_output() {
// Helper to create a named stream with delay
fn create_stream(name: &str, multiplier: i64, offset: i64) -> (VarName, OutputStream<Value>) {
fn create_stream(
name: &str,
multiplier: i64,
offset: i64,
) -> (VarName, OutputStream<Value>) {
let var_name = VarName(name.to_string());
// Delay to force expected ordering of the streams
let interval = IntervalStream::new(interval(Duration::from_millis(5)));
let stream = Box::pin(
stream::iter(0..10).zip(interval).map(move |(x, _)| (multiplier * x + offset).into()),
stream::iter(0..10)
.zip(interval)
.map(move |(x, _)| (multiplier * x + offset).into()),
);
(var_name, stream)
}
Expand All @@ -250,10 +252,12 @@ mod tests {

// Prepare expected output
let expected_output: Vec<_> = (0..10)
.flat_map(|x| vec![
(x_name.clone(), (x * 2).into()),
(y_name.clone(), (x * 2 + 1).into()),
])
.flat_map(|x| {
vec![
(x_name.clone(), (x * 2).into()),
(y_name.clone(), (x * 2 + 1).into()),
]
})
.collect();

// Initialize the handler
Expand Down
12 changes: 7 additions & 5 deletions src/mqtt_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,13 @@ pub async fn provide_mqtt_client(hostname: Hostname) -> Result<mqtt::AsyncClient
let (tx, rx) = oneshot::channel();

// Get or initialize the background task and a sender to it
let req_sender = MQTT_CLIENT_REQ_SENDER.get_or_init(async {
let (tx, rx) = mpsc::channel(10);
tokio::spawn(mqtt_client_provider(rx));
tx
}).await;
let req_sender = MQTT_CLIENT_REQ_SENDER
.get_or_init(async {
let (tx, rx) = mpsc::channel(10);
tokio::spawn(mqtt_client_provider(rx));
tx
})
.await;

// Send a request for the client to the background task
req_sender.send((hostname, tx)).await.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/mqtt_input_provider.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::time::Duration;
use std::collections::BTreeMap;
use std::time::Duration;

use futures::StreamExt;
use paho_mqtt as mqtt;
Expand Down
24 changes: 10 additions & 14 deletions src/queuing_runtime.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use async_trait::async_trait;
use std::collections::BTreeMap;
use std::marker::PhantomData;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use async_trait::async_trait;
use tokio::sync::Mutex;

use futures::stream;
Expand Down Expand Up @@ -260,14 +260,14 @@ where
}

#[async_trait]
impl<
Val: StreamData,
Expr: Send,
S: MonitoringSemantics<Expr, Val>,
M: Specification<Expr>,
> Monitor<M, Val> for QueuingMonitorRunner<Expr, Val, S, M>
impl<Val: StreamData, Expr: Send, S: MonitoringSemantics<Expr, Val>, M: Specification<Expr>>
Monitor<M, Val> for QueuingMonitorRunner<Expr, Val, S, M>
{
fn new(model: M, input_streams: &mut dyn InputProvider<Val>, output: Box<dyn OutputHandler<Val>>) -> Self {
fn new(
model: M,
input_streams: &mut dyn InputProvider<Val>,
output: Box<dyn OutputHandler<Val>>,
) -> Self {
let var_names: Vec<VarName> = model
.input_vars()
.into_iter()
Expand Down Expand Up @@ -332,12 +332,8 @@ impl<
}
}

impl<
Val: StreamData,
Expr,
S: MonitoringSemantics<Expr, Val>,
M: Specification<Expr>,
> QueuingMonitorRunner<Expr, Val, S, M>
impl<Val: StreamData, Expr, S: MonitoringSemantics<Expr, Val>, M: Specification<Expr>>
QueuingMonitorRunner<Expr, Val, S, M>
{
fn output_stream(&self, var: VarName) -> OutputStream<Val> {
self.var_exchange.var(&var).unwrap()
Expand Down
Loading

0 comments on commit fbfb769

Please sign in to comment.