diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index 832cabd207..e3a4068b2d 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -49,7 +49,6 @@ use zenoh_protocol::{ type NanoSeconds = u32; const RBLEN: usize = QueueSizeConf::MAX; -const TSLOT: NanoSeconds = 100; // Inner structure to reuse serialization batches struct StageInRefill { @@ -347,6 +346,7 @@ enum Pull { // Inner structure to keep track and signal backoff operations #[derive(Clone)] struct Backoff { + tslot: NanoSeconds, retry_time: NanoSeconds, last_bytes: BatchSize, bytes: Arc, @@ -354,8 +354,9 @@ struct Backoff { } impl Backoff { - fn new(bytes: Arc, backoff: Arc) -> Self { + fn new(tslot: NanoSeconds, bytes: Arc, backoff: Arc) -> Self { Self { + tslot, retry_time: 0, last_bytes: 0, bytes, @@ -365,7 +366,7 @@ impl Backoff { fn next(&mut self) { if self.retry_time == 0 { - self.retry_time = TSLOT; + self.retry_time = self.tslot; self.backoff.store(true, Ordering::Relaxed); } else { match self.retry_time.checked_mul(2) { @@ -383,7 +384,7 @@ impl Backoff { } } - fn stop(&mut self) { + fn reset(&mut self) { self.retry_time = 0; self.backoff.store(false, Ordering::Relaxed); } @@ -400,7 +401,6 @@ impl StageOutIn { #[inline] fn try_pull(&mut self) -> Pull { if let Some(batch) = self.s_out_r.pull() { - self.backoff.stop(); return Pull::Some(batch); } @@ -412,41 +412,26 @@ impl StageOutIn { let old_bytes = self.backoff.last_bytes; self.backoff.last_bytes = new_bytes; - match new_bytes.cmp(&old_bytes) { - std::cmp::Ordering::Equal => { - // No new bytes have been written on the batch, try to pull - if let Ok(mut g) = self.current.try_lock() { - // First try to pull from stage OUT - if let Some(batch) = self.s_out_r.pull() { - self.backoff.stop(); + if new_bytes == old_bytes { + // It seems no new bytes have been written on the batch, try to pull + if let Ok(mut g) = self.current.try_lock() { + // First try to pull from stage OUT to make sure we are not in the case + // where new_bytes == old_bytes are because of two identical serializations + if let Some(batch) = self.s_out_r.pull() { + return Pull::Some(batch); + } + + // An incomplete (non-empty) batch may be available in the state IN pipeline. + match g.take() { + Some(batch) => { return Pull::Some(batch); } - - // An incomplete (non-empty) batch is available in the state IN pipeline. - match g.take() { - Some(batch) => { - self.backoff.stop(); - return Pull::Some(batch); - } - None => { - self.backoff.stop(); - return Pull::None; - } + None => { + return Pull::None; } } - // Go to backoff - } - std::cmp::Ordering::Less => { - // There should be a new batch in Stage OUT - if let Some(batch) = self.s_out_r.pull() { - self.backoff.stop(); - return Pull::Some(batch); - } - // Go to backoff - } - std::cmp::Ordering::Greater => { - // Go to backoff } + // Go to backoff } // Do backoff @@ -569,7 +554,7 @@ impl TransmissionPipeline { s_in: StageOutIn { s_out_r, current, - backoff: Backoff::new(bytes, backoff), + backoff: Backoff::new(config.backoff.as_nanos() as NanoSeconds, bytes, backoff), }, s_ref: StageOutRefill { n_ref_w, s_ref_w }, }); @@ -657,6 +642,11 @@ pub(crate) struct TransmissionPipelineConsumer { impl TransmissionPipelineConsumer { pub(crate) async fn pull(&mut self) -> Option<(WBatch, usize)> { + // Reset backoff before pulling + for queue in self.stage_out.iter_mut() { + queue.s_in.backoff.reset(); + } + while self.active.load(Ordering::Relaxed) { // Calculate the backoff maximum let mut bo = NanoSeconds::MAX; @@ -674,10 +664,29 @@ impl TransmissionPipelineConsumer { } } + // In case of writing many small messages, `recv_async()` will most likely return immedietaly. + // While trying to pull from the queue, the stage_in `lock()` will most likely taken, leading to + // a spinning behaviour while attempting to take the lock. Yield the current task to avoid + // spinning the current task indefinitely. + tokio::task::yield_now().await; + // Wait for the backoff to expire or for a new message - let _ = + let res = tokio::time::timeout(Duration::from_nanos(bo as u64), self.n_out_r.recv_async()) .await; + match res { + Ok(Ok(())) => { + // We have received a notification from the channel that some bytes are available, retry to pull. + } + Ok(Err(_channel_error)) => { + // The channel is closed, we can't be notified anymore. Break the loop and return None. + break; + } + Err(_timeout) => { + // The backoff timeout expired. Be aware that tokio timeout may not sleep for short duration since + // it has time resolution of 1ms: https://docs.rs/tokio/latest/tokio/time/fn.sleep.html + } + } } None }