From 165d1695d12aca95af76914ca7e9ec222978c448 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sun, 6 Oct 2024 22:36:12 +0530 Subject: [PATCH] refactor: condense and simplify examples --- rumqttc/examples/ack_promise.rs | 70 ++++++++------------------ rumqttc/examples/ack_promise_sync.rs | 75 +++++++++++----------------- rumqttc/examples/ack_promise_v5.rs | 70 ++++++++------------------ 3 files changed, 72 insertions(+), 143 deletions(-) diff --git a/rumqttc/examples/ack_promise.rs b/rumqttc/examples/ack_promise.rs index 7366442f8..55ff7493e 100644 --- a/rumqttc/examples/ack_promise.rs +++ b/rumqttc/examples/ack_promise.rs @@ -6,9 +6,6 @@ use std::time::Duration; #[tokio::main(flavor = "current_thread")] async fn main() -> Result<(), Box> { - pretty_env_logger::init(); - // color_backtrace::install(); - let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883); mqttoptions.set_keep_alive(Duration::from_secs(5)); @@ -39,57 +36,34 @@ async fn main() -> Result<(), Box> { } // Publish at all QoS levels and wait for broker acknowledgement - match client - .publish("hello/world", QoS::AtMostOnce, false, vec![1; 1]) - .await - .unwrap() - .await - { - Ok(pkid) => println!("Acknowledged Pub({pkid})"), - Err(e) => println!("Publish failed: {e:?}"), - } - - match client - .publish("hello/world", QoS::AtLeastOnce, false, vec![1; 2]) - .await - .unwrap() - .await + for (i, qos) in [QoS::AtMostOnce, QoS::AtLeastOnce, QoS::ExactlyOnce] + .into_iter() + .enumerate() { - Ok(pkid) => println!("Acknowledged Pub({pkid})"), - Err(e) => println!("Publish failed: {e:?}"), + match client + .publish("hello/world", qos, false, vec![1; i]) + .await + .unwrap() + .await + { + Ok(pkid) => println!("Acknowledged Pub({pkid})"), + Err(e) => println!("Publish failed: {e:?}"), + } } - match client - .publish("hello/world", QoS::ExactlyOnce, false, vec![1; 3]) - .await - .unwrap() - .await + // Publish with different QoS levels and spawn wait for notification + let mut set = JoinSet::new(); + for (i, qos) in [QoS::AtMostOnce, QoS::AtLeastOnce, QoS::ExactlyOnce] + .into_iter() + .enumerate() { - Ok(pkid) => println!("Acknowledged Pub({pkid})"), - Err(e) => println!("Publish failed: {e:?}"), + let token = client + .publish("hello/world", qos, false, vec![1; i]) + .await + .unwrap(); + set.spawn(token); } - // Publish and spawn wait for notification - let mut set = JoinSet::new(); - - let future = client - .publish("hello/world", QoS::AtMostOnce, false, vec![1; 1]) - .await - .unwrap(); - set.spawn(async { future.await }); - - let future = client - .publish("hello/world", QoS::AtLeastOnce, false, vec![1; 2]) - .await - .unwrap(); - set.spawn(async { future.await }); - - let future = client - .publish("hello/world", QoS::ExactlyOnce, false, vec![1; 3]) - .await - .unwrap(); - set.spawn(async { future.await }); - while let Some(Ok(res)) = set.join_next().await { match res { Ok(pkid) => println!("Acknowledged Pub({pkid})"), diff --git a/rumqttc/examples/ack_promise_sync.rs b/rumqttc/examples/ack_promise_sync.rs index 128be07d7..6d13a9e39 100644 --- a/rumqttc/examples/ack_promise_sync.rs +++ b/rumqttc/examples/ack_promise_sync.rs @@ -5,9 +5,6 @@ use std::thread::{self, sleep}; use std::time::Duration; fn main() -> Result<(), Box> { - pretty_env_logger::init(); - // color_backtrace::install(); - let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883); mqttoptions.set_keep_alive(Duration::from_secs(5)); @@ -36,59 +33,43 @@ fn main() -> Result<(), Box> { } // Publish at all QoS levels and wait for broker acknowledgement - match client - .publish("hello/world", QoS::AtMostOnce, false, vec![1; 1]) - .unwrap() - .blocking_wait() - { - Ok(pkid) => println!("Acknowledged Pub({pkid})"), - Err(e) => println!("Publish failed: {e:?}"), - } - - match client - .publish("hello/world", QoS::AtLeastOnce, false, vec![1; 2]) - .unwrap() - .try_resolve() + for (i, qos) in [QoS::AtMostOnce, QoS::AtLeastOnce, QoS::ExactlyOnce] + .into_iter() + .enumerate() { - Ok(pkid) => println!("Acknowledged Pub({pkid})"), - Err(e) => println!("Publish failed: {e:?}"), - } - - match client - .publish("hello/world", QoS::ExactlyOnce, false, vec![1; 3]) - .unwrap() - .blocking_wait() - { - Ok(pkid) => println!("Acknowledged Pub({pkid})"), - Err(e) => println!("Publish failed: {e:?}"), + match client + .publish("hello/world", qos, false, vec![1; i]) + .unwrap() + .blocking_wait() + { + Ok(pkid) => println!("Acknowledged Pub({pkid})"), + Err(e) => println!("Publish failed: {e:?}"), + } } // Spawn threads for each publish, use channel to notify result let (tx, rx) = bounded(1); - let future = client - .publish("hello/world", QoS::AtMostOnce, false, vec![1; 1]) - .unwrap(); - let tx_clone = tx.clone(); - thread::spawn(move || { - let res = future.blocking_wait(); - tx_clone.send(res).unwrap() - }); - - let future = client - .publish("hello/world", QoS::AtLeastOnce, false, vec![1; 2]) - .unwrap(); - let tx_clone = tx.clone(); - thread::spawn(move || { - let res = future.blocking_wait(); - tx_clone.send(res).unwrap() - }); + for (i, qos) in [QoS::AtMostOnce, QoS::AtLeastOnce, QoS::ExactlyOnce] + .into_iter() + .enumerate() + { + let token = client + .publish("hello/world", qos, false, vec![1; i]) + .unwrap(); + let tx = tx.clone(); + thread::spawn(move || { + let res = token.blocking_wait(); + tx.send(res).unwrap() + }); + } - let mut future = client - .publish("hello/world", QoS::ExactlyOnce, false, vec![1; 3]) + // Try resolving a promise, if it is waiting to resolve, try again after a sleep of 1s + let mut token = client + .publish("hello/world", QoS::AtMostOnce, false, vec![1; 4]) .unwrap(); thread::spawn(move || loop { - match future.try_resolve() { + match token.try_resolve() { Err(PromiseError::Waiting) => { println!("Promise yet to resolve, retrying"); sleep(Duration::from_secs(1)); diff --git a/rumqttc/examples/ack_promise_v5.rs b/rumqttc/examples/ack_promise_v5.rs index c2eb26319..8873cf6af 100644 --- a/rumqttc/examples/ack_promise_v5.rs +++ b/rumqttc/examples/ack_promise_v5.rs @@ -6,9 +6,6 @@ use std::time::Duration; #[tokio::main(flavor = "current_thread")] async fn main() -> Result<(), Box> { - pretty_env_logger::init(); - // color_backtrace::install(); - let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883); mqttoptions.set_keep_alive(Duration::from_secs(5)); @@ -39,57 +36,34 @@ async fn main() -> Result<(), Box> { } // Publish at all QoS levels and wait for broker acknowledgement - match client - .publish("hello/world", QoS::AtMostOnce, false, vec![1; 1]) - .await - .unwrap() - .await - { - Ok(pkid) => println!("Acknowledged Pub({pkid})"), - Err(e) => println!("Publish failed: {e:?}"), - } - - match client - .publish("hello/world", QoS::AtLeastOnce, false, vec![1; 2]) - .await - .unwrap() - .await + for (i, qos) in [QoS::AtMostOnce, QoS::AtLeastOnce, QoS::ExactlyOnce] + .into_iter() + .enumerate() { - Ok(pkid) => println!("Acknowledged Pub({pkid})"), - Err(e) => println!("Publish failed: {e:?}"), + match client + .publish("hello/world", qos, false, vec![1; i]) + .await + .unwrap() + .await + { + Ok(pkid) => println!("Acknowledged Pub({pkid})"), + Err(e) => println!("Publish failed: {e:?}"), + } } - match client - .publish("hello/world", QoS::ExactlyOnce, false, vec![1; 3]) - .await - .unwrap() - .await + // Publish with different QoS levels and spawn wait for notification + let mut set = JoinSet::new(); + for (i, qos) in [QoS::AtMostOnce, QoS::AtLeastOnce, QoS::ExactlyOnce] + .into_iter() + .enumerate() { - Ok(pkid) => println!("Acknowledged Pub({pkid})"), - Err(e) => println!("Publish failed: {e:?}"), + let token = client + .publish("hello/world", qos, false, vec![1; i]) + .await + .unwrap(); + set.spawn(token); } - // Publish and spawn wait for notification - let mut set = JoinSet::new(); - - let future = client - .publish("hello/world", QoS::AtMostOnce, false, vec![1; 1]) - .await - .unwrap(); - set.spawn(async { future.await }); - - let future = client - .publish("hello/world", QoS::AtLeastOnce, false, vec![1; 2]) - .await - .unwrap(); - set.spawn(async { future.await }); - - let future = client - .publish("hello/world", QoS::ExactlyOnce, false, vec![1; 3]) - .await - .unwrap(); - set.spawn(async { future.await }); - while let Some(Ok(res)) = set.join_next().await { match res { Ok(pkid) => println!("Acknowledged Pub({pkid})"),