From 6a76771676d0fe45cd5680a4819673e690fd7427 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Wed, 13 Mar 2024 11:31:42 +0300 Subject: [PATCH] restore SN in case of frame drops caused by congestion control --- io/zenoh-transport/src/common/pipeline.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index 954c656280..d3a80af34b 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -130,7 +130,7 @@ impl StageIn { let is_droppable = msg.is_droppable(); macro_rules! zgetbatch_rets { - ($fragment:expr) => { + ($fragment:expr, $restore_sn:expr) => { loop { match c_guard.take() { Some(batch) => break batch, @@ -142,6 +142,8 @@ impl StageIn { None => { drop(c_guard); if !$fragment && is_droppable { + // Restore the sequence number + $restore_sn; // We are in the congestion scenario // The yield is to avoid the writing task to spin // indefinitely and monopolize the CPU usage. @@ -149,6 +151,8 @@ impl StageIn { return false; } else { if !self.s_ref.wait() { + // Restore the sequence number + $restore_sn; return false; } } @@ -171,7 +175,7 @@ impl StageIn { } // Get the current serialization batch. - let mut batch = zgetbatch_rets!(false); + let mut batch = zgetbatch_rets!(false, {}); // Attempt the serialization on the current batch let e = match batch.encode(&*msg) { Ok(_) => zretok!(batch), @@ -201,7 +205,7 @@ impl StageIn { if !batch.is_empty() { // Move out existing batch self.s_out.move_batch(batch); - batch = zgetbatch_rets!(false); + batch = zgetbatch_rets!(false, tch.sn.set(sn).unwrap()); } // Attempt a second serialization on fully empty batch @@ -232,7 +236,7 @@ impl StageIn { while reader.can_read() { // Get the current serialization batch // Treat all messages as non-droppable once we start fragmenting - batch = zgetbatch_rets!(true); + batch = zgetbatch_rets!(true, tch.sn.set(sn).unwrap()); // Serialize the message fragmnet match batch.encode((&mut reader, &mut fragment)) {