From 0836fcb6e881edc72a989abaadee9dc162247433 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Tue, 3 Dec 2024 16:45:57 +0100 Subject: [PATCH 1/7] Perform early dropping --- examples/examples/z_pub_thr.rs | 2 +- examples/examples/z_sub.rs | 3 +++ io/zenoh-transport/src/common/pipeline.rs | 26 +++++++++++++++++------ 3 files changed, 23 insertions(+), 8 deletions(-) diff --git a/examples/examples/z_pub_thr.rs b/examples/examples/z_pub_thr.rs index dc18715e2a..56c6ec4a02 100644 --- a/examples/examples/z_pub_thr.rs +++ b/examples/examples/z_pub_thr.rs @@ -43,7 +43,7 @@ fn main() { let publisher = session .declare_publisher("test/thr") - .congestion_control(CongestionControl::Block) + .congestion_control(CongestionControl::Drop) .priority(prio) .express(args.express) .wait() diff --git a/examples/examples/z_sub.rs b/examples/examples/z_sub.rs index 71eba72533..e6771e77f3 100644 --- a/examples/examples/z_sub.rs +++ b/examples/examples/z_sub.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + // // Copyright (c) 2023 ZettaScale Technology // @@ -47,6 +49,7 @@ async fn main() { print!(" ({})", att); } println!(); + tokio::time::sleep(Duration::from_millis(10)).await; } } diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index 0dbded9209..e7597645bb 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -14,7 +14,7 @@ use std::{ ops::Add, sync::{ - atomic::{AtomicBool, AtomicU32, Ordering}, + atomic::{AtomicBool, AtomicU32, AtomicU8, Ordering}, Arc, Mutex, MutexGuard, }, time::{Duration, Instant}, @@ -688,7 +688,7 @@ impl TransmissionPipeline { }); } - let active = Arc::new(AtomicBool::new(true)); + let active = Arc::new((AtomicBool::new(true), AtomicU8::new(0))); let producer = TransmissionPipelineProducer { stage_in: stage_in.into_boxed_slice().into(), active: active.clone(), @@ -709,7 +709,7 @@ impl TransmissionPipeline { pub(crate) struct TransmissionPipelineProducer { // Each priority queue has its own Mutex stage_in: Arc<[Mutex]>, - active: Arc, + active: Arc<(AtomicBool, AtomicU8)>, wait_before_drop: (Duration, Duration), wait_before_close: Duration, } @@ -724,8 +724,14 @@ impl TransmissionPipelineProducer { } else { (0, Priority::DEFAULT) }; + let prioflag = 1 << priority as u8; + // If message is droppable, compute a deadline after which the sample could be dropped let (wait_time, max_wait_time) = if msg.is_droppable() { + // Checked if we are blocked on the priority queue and we drop directly the message + if self.active.1.load(Ordering::Acquire) & prioflag != 0 { + return false; + } (self.wait_before_drop.0, Some(self.wait_before_drop.1)) } else { (self.wait_before_close, None) @@ -733,7 +739,11 @@ impl TransmissionPipelineProducer { let mut deadline = Deadline::new(wait_time, max_wait_time); // Lock the channel. We are the only one that will be writing on it. let mut queue = zlock!(self.stage_in[idx]); - queue.push_network_message(&mut msg, priority, &mut deadline) + let sent = queue.push_network_message(&mut msg, priority, &mut deadline); + if !sent { + self.active.1.fetch_or(prioflag, Ordering::AcqRel); + } + sent } #[inline] @@ -750,7 +760,7 @@ impl TransmissionPipelineProducer { } pub(crate) fn disable(&self) { - self.active.store(false, Ordering::Relaxed); + self.active.0.store(false, Ordering::Relaxed); // Acquire all the locks, in_guard first, out_guard later // Use the same locking order as in drain to avoid deadlocks @@ -768,12 +778,12 @@ pub(crate) struct TransmissionPipelineConsumer { // A single Mutex for all the priority queues stage_out: Box<[StageOut]>, n_out_r: Waiter, - active: Arc, + active: Arc<(AtomicBool, AtomicU8)>, } impl TransmissionPipelineConsumer { pub(crate) async fn pull(&mut self) -> Option<(WBatch, usize)> { - while self.active.load(Ordering::Relaxed) { + while self.active.0.load(Ordering::Relaxed) { let mut backoff = MicroSeconds::MAX; // Calculate the backoff maximum for (prio, queue) in self.stage_out.iter_mut().enumerate() { @@ -820,6 +830,8 @@ impl TransmissionPipelineConsumer { pub(crate) fn refill(&mut self, batch: WBatch, priority: usize) { self.stage_out[priority].refill(batch); + let prioflag = 1 << priority as u8; + self.active.1.fetch_and(!prioflag, Ordering::AcqRel); } pub(crate) fn drain(&mut self) -> Vec<(WBatch, usize)> { From 528ec4932b797f156f6c03bcd44b0ff768d7ae1a Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Wed, 4 Dec 2024 11:15:24 +0100 Subject: [PATCH 2/7] Example slow subscriber --- examples/examples/z_sub.rs | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/examples/examples/z_sub.rs b/examples/examples/z_sub.rs index e6771e77f3..a73f4ea3ae 100644 --- a/examples/examples/z_sub.rs +++ b/examples/examples/z_sub.rs @@ -38,18 +38,21 @@ async fn main() { .try_to_string() .unwrap_or_else(|e| e.to_string().into()); - print!( - ">> [Subscriber] Received {} ('{}': '{}')", - sample.kind(), - sample.key_expr().as_str(), - payload - ); - if let Some(att) = sample.attachment() { - let att = att.try_to_string().unwrap_or_else(|e| e.to_string().into()); - print!(" ({})", att); - } - println!(); - tokio::time::sleep(Duration::from_millis(10)).await; + // print!( + // ">> [Subscriber] Received {} ('{}': '{}')", + // sample.kind(), + // sample.key_expr().as_str(), + // payload + // ); + // if let Some(att) = sample.attachment() { + // let att = att.try_to_string().unwrap_or_else(|e| e.to_string().into()); + // print!(" ({})", att); + // } + // println!(); + print!("."); + use std::io::Write; + std::io::stdout().flush().unwrap(); + tokio::time::sleep(Duration::from_millis(100)).await; } } From 30341ff4affd38f353358fab4c3e2c6542ba15a8 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Wed, 4 Dec 2024 13:30:59 +0100 Subject: [PATCH 3/7] TransmissionPipelineStatus --- io/zenoh-transport/src/common/pipeline.rs | 56 ++++++++++++++++++----- 1 file changed, 45 insertions(+), 11 deletions(-) diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index e7597645bb..f5eb033eb9 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -688,28 +688,62 @@ impl TransmissionPipeline { }); } - let active = Arc::new((AtomicBool::new(true), AtomicU8::new(0))); + let active = Arc::new(TransmissionPipelineStatus { + disabled: AtomicBool::new(false), + congested: AtomicU8::new(0), + }); let producer = TransmissionPipelineProducer { stage_in: stage_in.into_boxed_slice().into(), - active: active.clone(), + status: active.clone(), wait_before_drop: config.wait_before_drop, wait_before_close: config.wait_before_close, }; let consumer = TransmissionPipelineConsumer { stage_out: stage_out.into_boxed_slice(), n_out_r, - active, + status: active, }; (producer, consumer) } } +struct TransmissionPipelineStatus { + // The whole pipeline is enabled or disabled + disabled: AtomicBool, + // Bitflags to indicate the given priority queue is congested + congested: AtomicU8, +} + +impl TransmissionPipelineStatus { + fn set_disabled(&self, status: bool) { + self.disabled.store(status, Ordering::Relaxed); + } + + fn is_disabled(&self) -> bool { + self.disabled.load(Ordering::Relaxed) + } + + fn set_congested(&self, priority: Priority, status: bool) { + let prioflag = 1 << priority as u8; + if status { + self.congested.fetch_or(prioflag, Ordering::Relaxed); + } else { + self.congested.fetch_and(!prioflag, Ordering::Relaxed); + } + } + + fn is_congested(&self, priority: Priority) -> bool { + let prioflag = 1 << priority as u8; + self.congested.load(Ordering::Relaxed) & prioflag != 0 + } +} + #[derive(Clone)] pub(crate) struct TransmissionPipelineProducer { // Each priority queue has its own Mutex stage_in: Arc<[Mutex]>, - active: Arc<(AtomicBool, AtomicU8)>, + status: Arc, wait_before_drop: (Duration, Duration), wait_before_close: Duration, } @@ -724,12 +758,11 @@ impl TransmissionPipelineProducer { } else { (0, Priority::DEFAULT) }; - let prioflag = 1 << priority as u8; // If message is droppable, compute a deadline after which the sample could be dropped let (wait_time, max_wait_time) = if msg.is_droppable() { // Checked if we are blocked on the priority queue and we drop directly the message - if self.active.1.load(Ordering::Acquire) & prioflag != 0 { + if self.status.is_congested(priority) { return false; } (self.wait_before_drop.0, Some(self.wait_before_drop.1)) @@ -741,7 +774,7 @@ impl TransmissionPipelineProducer { let mut queue = zlock!(self.stage_in[idx]); let sent = queue.push_network_message(&mut msg, priority, &mut deadline); if !sent { - self.active.1.fetch_or(prioflag, Ordering::AcqRel); + self.status.set_congested(priority, true); } sent } @@ -760,7 +793,7 @@ impl TransmissionPipelineProducer { } pub(crate) fn disable(&self) { - self.active.0.store(false, Ordering::Relaxed); + self.status.set_disabled(true); // Acquire all the locks, in_guard first, out_guard later // Use the same locking order as in drain to avoid deadlocks @@ -778,12 +811,12 @@ pub(crate) struct TransmissionPipelineConsumer { // A single Mutex for all the priority queues stage_out: Box<[StageOut]>, n_out_r: Waiter, - active: Arc<(AtomicBool, AtomicU8)>, + status: Arc, } impl TransmissionPipelineConsumer { pub(crate) async fn pull(&mut self) -> Option<(WBatch, usize)> { - while self.active.0.load(Ordering::Relaxed) { + while !self.status.is_disabled() { let mut backoff = MicroSeconds::MAX; // Calculate the backoff maximum for (prio, queue) in self.stage_out.iter_mut().enumerate() { @@ -830,8 +863,9 @@ impl TransmissionPipelineConsumer { pub(crate) fn refill(&mut self, batch: WBatch, priority: usize) { self.stage_out[priority].refill(batch); + // Reset the priority congested flag let prioflag = 1 << priority as u8; - self.active.1.fetch_and(!prioflag, Ordering::AcqRel); + self.status.congested.fetch_and(!prioflag, Ordering::AcqRel); } pub(crate) fn drain(&mut self) -> Vec<(WBatch, usize)> { From bceabb137d22e7f962d8b4ebc400bfdf211334f7 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Wed, 4 Dec 2024 13:40:01 +0100 Subject: [PATCH 4/7] Restore examples --- examples/examples/z_pub_thr.rs | 2 +- examples/examples/z_sub.rs | 28 +++++++++++----------------- 2 files changed, 12 insertions(+), 18 deletions(-) diff --git a/examples/examples/z_pub_thr.rs b/examples/examples/z_pub_thr.rs index 56c6ec4a02..dc18715e2a 100644 --- a/examples/examples/z_pub_thr.rs +++ b/examples/examples/z_pub_thr.rs @@ -43,7 +43,7 @@ fn main() { let publisher = session .declare_publisher("test/thr") - .congestion_control(CongestionControl::Drop) + .congestion_control(CongestionControl::Block) .priority(prio) .express(args.express) .wait() diff --git a/examples/examples/z_sub.rs b/examples/examples/z_sub.rs index a73f4ea3ae..71eba72533 100644 --- a/examples/examples/z_sub.rs +++ b/examples/examples/z_sub.rs @@ -1,5 +1,3 @@ -use std::time::Duration; - // // Copyright (c) 2023 ZettaScale Technology // @@ -38,21 +36,17 @@ async fn main() { .try_to_string() .unwrap_or_else(|e| e.to_string().into()); - // print!( - // ">> [Subscriber] Received {} ('{}': '{}')", - // sample.kind(), - // sample.key_expr().as_str(), - // payload - // ); - // if let Some(att) = sample.attachment() { - // let att = att.try_to_string().unwrap_or_else(|e| e.to_string().into()); - // print!(" ({})", att); - // } - // println!(); - print!("."); - use std::io::Write; - std::io::stdout().flush().unwrap(); - tokio::time::sleep(Duration::from_millis(100)).await; + print!( + ">> [Subscriber] Received {} ('{}': '{}')", + sample.kind(), + sample.key_expr().as_str(), + payload + ); + if let Some(att) = sample.attachment() { + let att = att.try_to_string().unwrap_or_else(|e| e.to_string().into()); + print!(" ({})", att); + } + println!(); } } From 5b24ff5d4e1bb57cc8bcfa1a36b0b98ebde7964c Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Wed, 4 Dec 2024 14:05:09 +0100 Subject: [PATCH 5/7] Use set_congested function --- examples/examples/z_sub.rs | 35 ++++++++++++----------- io/zenoh-transport/src/common/pipeline.rs | 4 +-- 2 files changed, 20 insertions(+), 19 deletions(-) diff --git a/examples/examples/z_sub.rs b/examples/examples/z_sub.rs index 71eba72533..13b44d0267 100644 --- a/examples/examples/z_sub.rs +++ b/examples/examples/z_sub.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + // // Copyright (c) 2023 ZettaScale Technology // @@ -30,23 +32,24 @@ async fn main() { println!("Press CTRL-C to quit..."); while let Ok(sample) = subscriber.recv_async().await { - // Refer to z_bytes.rs to see how to deserialize different types of message - let payload = sample - .payload() - .try_to_string() - .unwrap_or_else(|e| e.to_string().into()); + tokio::time::sleep(Duration::from_millis(100)).await; + // // Refer to z_bytes.rs to see how to deserialize different types of message + // let payload = sample + // .payload() + // .try_to_string() + // .unwrap_or_else(|e| e.to_string().into()); - print!( - ">> [Subscriber] Received {} ('{}': '{}')", - sample.kind(), - sample.key_expr().as_str(), - payload - ); - if let Some(att) = sample.attachment() { - let att = att.try_to_string().unwrap_or_else(|e| e.to_string().into()); - print!(" ({})", att); - } - println!(); + // print!( + // ">> [Subscriber] Received {} ('{}': '{}')", + // sample.kind(), + // sample.key_expr().as_str(), + // payload + // ); + // if let Some(att) = sample.attachment() { + // let att = att.try_to_string().unwrap_or_else(|e| e.to_string().into()); + // print!(" ({})", att); + // } + // println!(); } } diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index f5eb033eb9..d8af20e990 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -863,9 +863,7 @@ impl TransmissionPipelineConsumer { pub(crate) fn refill(&mut self, batch: WBatch, priority: usize) { self.stage_out[priority].refill(batch); - // Reset the priority congested flag - let prioflag = 1 << priority as u8; - self.status.congested.fetch_and(!prioflag, Ordering::AcqRel); + self.status.set_congested(priority, false); } pub(crate) fn drain(&mut self) -> Vec<(WBatch, usize)> { From 7e66014aecffcda8841500194bdec4c512b5c442 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Wed, 4 Dec 2024 14:08:41 +0100 Subject: [PATCH 6/7] Fix Priority usage in TransmissionPipeline API --- io/zenoh-transport/src/common/pipeline.rs | 7 ++++--- io/zenoh-transport/src/multicast/link.rs | 4 ++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index d8af20e990..c7d401b0d1 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -815,13 +815,14 @@ pub(crate) struct TransmissionPipelineConsumer { } impl TransmissionPipelineConsumer { - pub(crate) async fn pull(&mut self) -> Option<(WBatch, usize)> { + pub(crate) async fn pull(&mut self) -> Option<(WBatch, Priority)> { while !self.status.is_disabled() { let mut backoff = MicroSeconds::MAX; // Calculate the backoff maximum for (prio, queue) in self.stage_out.iter_mut().enumerate() { match queue.try_pull() { Pull::Some(batch) => { + let prio = Priority::try_from(prio as u8).unwrap(); return Some((batch, prio)); } Pull::Backoff(deadline) => { @@ -861,8 +862,8 @@ impl TransmissionPipelineConsumer { None } - pub(crate) fn refill(&mut self, batch: WBatch, priority: usize) { - self.stage_out[priority].refill(batch); + pub(crate) fn refill(&mut self, batch: WBatch, priority: Priority) { + self.stage_out[priority as usize].refill(batch); self.status.set_congested(priority, false); } diff --git a/io/zenoh-transport/src/multicast/link.rs b/io/zenoh-transport/src/multicast/link.rs index c4c23290ee..61c6f36ece 100644 --- a/io/zenoh-transport/src/multicast/link.rs +++ b/io/zenoh-transport/src/multicast/link.rs @@ -431,10 +431,10 @@ async fn tx_task( link.send_batch(&mut batch).await?; // Keep track of next SNs if let Some(sn) = batch.codec.latest_sn.reliable { - last_sns[priority].reliable = sn; + last_sns[priority as usize].reliable = sn; } if let Some(sn) = batch.codec.latest_sn.best_effort { - last_sns[priority].best_effort = sn; + last_sns[priority as usize].best_effort = sn; } #[cfg(feature = "stats")] { From 94b06bea6f8e754fc204d22606d38653db92be22 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Wed, 4 Dec 2024 14:09:29 +0100 Subject: [PATCH 7/7] Reset examples --- examples/examples/z_sub.rs | 35 ++++++++++++++++------------------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/examples/examples/z_sub.rs b/examples/examples/z_sub.rs index 13b44d0267..71eba72533 100644 --- a/examples/examples/z_sub.rs +++ b/examples/examples/z_sub.rs @@ -1,5 +1,3 @@ -use std::time::Duration; - // // Copyright (c) 2023 ZettaScale Technology // @@ -32,24 +30,23 @@ async fn main() { println!("Press CTRL-C to quit..."); while let Ok(sample) = subscriber.recv_async().await { - tokio::time::sleep(Duration::from_millis(100)).await; - // // Refer to z_bytes.rs to see how to deserialize different types of message - // let payload = sample - // .payload() - // .try_to_string() - // .unwrap_or_else(|e| e.to_string().into()); + // Refer to z_bytes.rs to see how to deserialize different types of message + let payload = sample + .payload() + .try_to_string() + .unwrap_or_else(|e| e.to_string().into()); - // print!( - // ">> [Subscriber] Received {} ('{}': '{}')", - // sample.kind(), - // sample.key_expr().as_str(), - // payload - // ); - // if let Some(att) = sample.attachment() { - // let att = att.try_to_string().unwrap_or_else(|e| e.to_string().into()); - // print!(" ({})", att); - // } - // println!(); + print!( + ">> [Subscriber] Received {} ('{}': '{}')", + sample.kind(), + sample.key_expr().as_str(), + payload + ); + if let Some(att) = sample.attachment() { + let att = att.try_to_string().unwrap_or_else(|e| e.to_string().into()); + print!(" ({})", att); + } + println!(); } }