Skip to content

Commit

Permalink
restore SN in case of frame drops caused by congestion control
Browse files Browse the repository at this point in the history
  • Loading branch information
yellowhatter committed Mar 13, 2024
1 parent ceb982d commit 6a76771
Showing 1 changed file with 8 additions and 4 deletions.
12 changes: 8 additions & 4 deletions io/zenoh-transport/src/common/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl StageIn {
let is_droppable = msg.is_droppable();

macro_rules! zgetbatch_rets {
($fragment:expr) => {
($fragment:expr, $restore_sn:expr) => {
loop {
match c_guard.take() {
Some(batch) => break batch,
Expand All @@ -142,13 +142,17 @@ impl StageIn {
None => {
drop(c_guard);
if !$fragment && is_droppable {
// Restore the sequence number
$restore_sn;
// We are in the congestion scenario
// The yield is to avoid the writing task to spin
// indefinitely and monopolize the CPU usage.
thread::yield_now();
return false;
} else {
if !self.s_ref.wait() {
// Restore the sequence number
$restore_sn;
return false;
}
}
Expand All @@ -171,7 +175,7 @@ impl StageIn {
}

// Get the current serialization batch.
let mut batch = zgetbatch_rets!(false);
let mut batch = zgetbatch_rets!(false, {});
// Attempt the serialization on the current batch
let e = match batch.encode(&*msg) {
Ok(_) => zretok!(batch),
Expand Down Expand Up @@ -201,7 +205,7 @@ impl StageIn {
if !batch.is_empty() {
// Move out existing batch
self.s_out.move_batch(batch);
batch = zgetbatch_rets!(false);
batch = zgetbatch_rets!(false, tch.sn.set(sn).unwrap());
}

// Attempt a second serialization on fully empty batch
Expand Down Expand Up @@ -232,7 +236,7 @@ impl StageIn {
while reader.can_read() {
// Get the current serialization batch
// Treat all messages as non-droppable once we start fragmenting
batch = zgetbatch_rets!(true);
batch = zgetbatch_rets!(true, tch.sn.set(sn).unwrap());

// Serialize the message fragmnet
match batch.encode((&mut reader, &mut fragment)) {
Expand Down

0 comments on commit 6a76771

Please sign in to comment.