Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

restore SN in case of frame drops caused by congestion control #815

Merged
merged 1 commit into from
Mar 13, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions io/zenoh-transport/src/common/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl StageIn {
let is_droppable = msg.is_droppable();

macro_rules! zgetbatch_rets {
($fragment:expr) => {
($fragment:expr, $restore_sn:expr) => {
loop {
match c_guard.take() {
Some(batch) => break batch,
Expand All @@ -142,13 +142,17 @@ impl StageIn {
None => {
drop(c_guard);
if !$fragment && is_droppable {
// Restore the sequence number
$restore_sn;
// We are in the congestion scenario
// The yield is to avoid the writing task to spin
// indefinitely and monopolize the CPU usage.
thread::yield_now();
return false;
} else {
if !self.s_ref.wait() {
// Restore the sequence number
$restore_sn;
return false;
}
}
Expand All @@ -171,7 +175,7 @@ impl StageIn {
}

// Get the current serialization batch.
let mut batch = zgetbatch_rets!(false);
let mut batch = zgetbatch_rets!(false, {});
// Attempt the serialization on the current batch
let e = match batch.encode(&*msg) {
Ok(_) => zretok!(batch),
Expand Down Expand Up @@ -201,7 +205,7 @@ impl StageIn {
if !batch.is_empty() {
// Move out existing batch
self.s_out.move_batch(batch);
batch = zgetbatch_rets!(false);
batch = zgetbatch_rets!(false, tch.sn.set(sn).unwrap());
}

// Attempt a second serialization on fully empty batch
Expand Down Expand Up @@ -232,7 +236,7 @@ impl StageIn {
while reader.can_read() {
// Get the current serialization batch
// Treat all messages as non-droppable once we start fragmenting
batch = zgetbatch_rets!(true);
batch = zgetbatch_rets!(true, tch.sn.set(sn).unwrap());

// Serialize the message fragmnet
match batch.encode((&mut reader, &mut fragment)) {
Expand Down