From 8eba5d29e4ee466e8655ce9f17dd8e30aaafab29 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Fri, 7 Jun 2024 10:35:19 +0200 Subject: [PATCH 1/4] Yield task for backoff --- io/zenoh-transport/src/common/pipeline.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index 832cabd207..fff0913567 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -674,6 +674,9 @@ impl TransmissionPipelineConsumer { } } + // Since tokio::timeout may not sleep immediately, yield the task first. + tokio::task::yield_now().await; + // Wait for the backoff to expire or for a new message let _ = tokio::time::timeout(Duration::from_nanos(bo as u64), self.n_out_r.recv_async()) From 4c83e526344e9481cb3134ae768a6089f0d7be01 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Fri, 7 Jun 2024 12:13:49 +0200 Subject: [PATCH 2/4] Improve comments and error handling in backoff --- io/zenoh-transport/src/common/pipeline.rs | 32 +++++++++++++++++------ 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index fff0913567..1be21ebea7 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -383,7 +383,7 @@ impl Backoff { } } - fn stop(&mut self) { + fn reset(&mut self) { self.retry_time = 0; self.backoff.store(false, Ordering::Relaxed); } @@ -400,7 +400,6 @@ impl StageOutIn { #[inline] fn try_pull(&mut self) -> Pull { if let Some(batch) = self.s_out_r.pull() { - self.backoff.stop(); return Pull::Some(batch); } @@ -418,18 +417,15 @@ impl StageOutIn { if let Ok(mut g) = self.current.try_lock() { // First try to pull from stage OUT if let Some(batch) = self.s_out_r.pull() { - self.backoff.stop(); return Pull::Some(batch); } // An incomplete (non-empty) batch is available in the state IN pipeline. match g.take() { Some(batch) => { - self.backoff.stop(); return Pull::Some(batch); } None => { - self.backoff.stop(); return Pull::None; } } @@ -439,7 +435,6 @@ impl StageOutIn { std::cmp::Ordering::Less => { // There should be a new batch in Stage OUT if let Some(batch) = self.s_out_r.pull() { - self.backoff.stop(); return Pull::Some(batch); } // Go to backoff @@ -657,6 +652,11 @@ pub(crate) struct TransmissionPipelineConsumer { impl TransmissionPipelineConsumer { pub(crate) async fn pull(&mut self) -> Option<(WBatch, usize)> { + // Reset backoff before pulling + for queue in self.stage_out.iter_mut() { + queue.s_in.backoff.reset(); + } + while self.active.load(Ordering::Relaxed) { // Calculate the backoff maximum let mut bo = NanoSeconds::MAX; @@ -674,13 +674,29 @@ impl TransmissionPipelineConsumer { } } - // Since tokio::timeout may not sleep immediately, yield the task first. + // In case of writing many small messages, `recv_async()` will most likely return immedietaly. + // While trying to pull from the queue, the stage_in `lock()` will most likely taken, leading to + // a spinning behaviour while attempting to take the lock. Yield the current task to avoid + // spinning the current task indefinitely. tokio::task::yield_now().await; // Wait for the backoff to expire or for a new message - let _ = + let res = tokio::time::timeout(Duration::from_nanos(bo as u64), self.n_out_r.recv_async()) .await; + match res { + Ok(Ok(())) => { + // We have received a notification from the channel that some bytes are available, retry to pull. + } + Ok(Err(_channel_error)) => { + // The channel is closed, we can't be notified anymore. Break the loop and return None. + break; + } + Err(_timeout) => { + // The backoff timeout expired. Be aware that tokio timeout may not sleep for short duration since + // it has time resolution of 1ms: https://docs.rs/tokio/latest/tokio/time/fn.sleep.html + } + } } None } From 73cf5ed7743b8f6db7ece32d36cea028c90a637f Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Fri, 7 Jun 2024 12:45:56 +0200 Subject: [PATCH 3/4] Simplify pipeline pull --- io/zenoh-transport/src/common/pipeline.rs | 41 +++++++++-------------- 1 file changed, 15 insertions(+), 26 deletions(-) diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index 1be21ebea7..dbe6c30703 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -411,37 +411,26 @@ impl StageOutIn { let old_bytes = self.backoff.last_bytes; self.backoff.last_bytes = new_bytes; - match new_bytes.cmp(&old_bytes) { - std::cmp::Ordering::Equal => { - // No new bytes have been written on the batch, try to pull - if let Ok(mut g) = self.current.try_lock() { - // First try to pull from stage OUT - if let Some(batch) = self.s_out_r.pull() { + if new_bytes == old_bytes { + // It seems no new bytes have been written on the batch, try to pull + if let Ok(mut g) = self.current.try_lock() { + // First try to pull from stage OUT to make sure we are not in the case + // where new_bytes == old_bytes are because of two identical serializations + if let Some(batch) = self.s_out_r.pull() { + return Pull::Some(batch); + } + + // An incomplete (non-empty) batch may be available in the state IN pipeline. + match g.take() { + Some(batch) => { return Pull::Some(batch); } - - // An incomplete (non-empty) batch is available in the state IN pipeline. - match g.take() { - Some(batch) => { - return Pull::Some(batch); - } - None => { - return Pull::None; - } + None => { + return Pull::None; } } - // Go to backoff - } - std::cmp::Ordering::Less => { - // There should be a new batch in Stage OUT - if let Some(batch) = self.s_out_r.pull() { - return Pull::Some(batch); - } - // Go to backoff - } - std::cmp::Ordering::Greater => { - // Go to backoff } + // Go to backoff } // Do backoff From bd08a095474a388a585b78e349eed76b7bb65a04 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Fri, 7 Jun 2024 15:20:31 +0200 Subject: [PATCH 4/4] Consider backoff configuration --- io/zenoh-transport/src/common/pipeline.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index dbe6c30703..e3a4068b2d 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -49,7 +49,6 @@ use zenoh_protocol::{ type NanoSeconds = u32; const RBLEN: usize = QueueSizeConf::MAX; -const TSLOT: NanoSeconds = 100; // Inner structure to reuse serialization batches struct StageInRefill { @@ -347,6 +346,7 @@ enum Pull { // Inner structure to keep track and signal backoff operations #[derive(Clone)] struct Backoff { + tslot: NanoSeconds, retry_time: NanoSeconds, last_bytes: BatchSize, bytes: Arc, @@ -354,8 +354,9 @@ struct Backoff { } impl Backoff { - fn new(bytes: Arc, backoff: Arc) -> Self { + fn new(tslot: NanoSeconds, bytes: Arc, backoff: Arc) -> Self { Self { + tslot, retry_time: 0, last_bytes: 0, bytes, @@ -365,7 +366,7 @@ impl Backoff { fn next(&mut self) { if self.retry_time == 0 { - self.retry_time = TSLOT; + self.retry_time = self.tslot; self.backoff.store(true, Ordering::Relaxed); } else { match self.retry_time.checked_mul(2) { @@ -553,7 +554,7 @@ impl TransmissionPipeline { s_in: StageOutIn { s_out_r, current, - backoff: Backoff::new(bytes, backoff), + backoff: Backoff::new(config.backoff.as_nanos() as NanoSeconds, bytes, backoff), }, s_ref: StageOutRefill { n_ref_w, s_ref_w }, });