Skip to content

Commit

Permalink
Merge pull request #1627 from eclipse-zenoh/fix/congestion
Browse files Browse the repository at this point in the history
Improve congestion control
  • Loading branch information
Mallets authored Dec 4, 2024
2 parents 9a73585 + 94b06be commit b8a2979
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 14 deletions.
69 changes: 57 additions & 12 deletions io/zenoh-transport/src/common/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use std::{
ops::Add,
sync::{
atomic::{AtomicBool, AtomicU32, Ordering},
atomic::{AtomicBool, AtomicU32, AtomicU8, Ordering},
Arc, Mutex, MutexGuard,
},
time::{Duration, Instant},
Expand Down Expand Up @@ -688,28 +688,62 @@ impl TransmissionPipeline {
});
}

let active = Arc::new(AtomicBool::new(true));
let active = Arc::new(TransmissionPipelineStatus {
disabled: AtomicBool::new(false),
congested: AtomicU8::new(0),
});
let producer = TransmissionPipelineProducer {
stage_in: stage_in.into_boxed_slice().into(),
active: active.clone(),
status: active.clone(),
wait_before_drop: config.wait_before_drop,
wait_before_close: config.wait_before_close,
};
let consumer = TransmissionPipelineConsumer {
stage_out: stage_out.into_boxed_slice(),
n_out_r,
active,
status: active,
};

(producer, consumer)
}
}

/// Shared status flags for the transmission pipeline, updated by the
/// producer/consumer and read on the hot path of `push_network_message`.
struct TransmissionPipelineStatus {
    // The whole pipeline is enabled or disabled
    disabled: AtomicBool,
    // Bitflags to indicate the given priority queue is congested:
    // bit N is set when the queue for priority N is congested.
    congested: AtomicU8,
}

/// Accessors for the pipeline status flags.
///
/// All operations use `Ordering::Relaxed`: the flags are advisory hints
/// (drop-early / shut-down signals) and do not synchronize other memory.
impl TransmissionPipelineStatus {
    /// Marks the whole pipeline as disabled (`true`) or enabled (`false`).
    fn set_disabled(&self, status: bool) {
        self.disabled.store(status, Ordering::Relaxed);
    }

    /// Returns `true` once the pipeline has been disabled.
    fn is_disabled(&self) -> bool {
        self.disabled.load(Ordering::Relaxed)
    }

    /// Sets (`status == true`) or clears the congestion bit of the queue
    /// associated with `priority`.
    fn set_congested(&self, priority: Priority, status: bool) {
        let mask = 1u8 << priority as u8;
        match status {
            true => self.congested.fetch_or(mask, Ordering::Relaxed),
            false => self.congested.fetch_and(!mask, Ordering::Relaxed),
        };
    }

    /// Returns `true` when the congestion bit for `priority` is set.
    fn is_congested(&self, priority: Priority) -> bool {
        let mask = 1u8 << priority as u8;
        (self.congested.load(Ordering::Relaxed) & mask) != 0
    }
}

#[derive(Clone)]
pub(crate) struct TransmissionPipelineProducer {
// Each priority queue has its own Mutex
stage_in: Arc<[Mutex<StageIn>]>,
active: Arc<AtomicBool>,
status: Arc<TransmissionPipelineStatus>,
wait_before_drop: (Duration, Duration),
wait_before_close: Duration,
}
Expand All @@ -724,16 +758,25 @@ impl TransmissionPipelineProducer {
} else {
(0, Priority::DEFAULT)
};

// If message is droppable, compute a deadline after which the sample could be dropped
let (wait_time, max_wait_time) = if msg.is_droppable() {
// Check whether the priority queue is congested and, if so, drop the message directly
if self.status.is_congested(priority) {
return false;
}
(self.wait_before_drop.0, Some(self.wait_before_drop.1))
} else {
(self.wait_before_close, None)
};
let mut deadline = Deadline::new(wait_time, max_wait_time);
// Lock the channel. We are the only one that will be writing on it.
let mut queue = zlock!(self.stage_in[idx]);
queue.push_network_message(&mut msg, priority, &mut deadline)
let sent = queue.push_network_message(&mut msg, priority, &mut deadline);
if !sent {
self.status.set_congested(priority, true);
}
sent
}

#[inline]
Expand All @@ -750,7 +793,7 @@ impl TransmissionPipelineProducer {
}

pub(crate) fn disable(&self) {
self.active.store(false, Ordering::Relaxed);
self.status.set_disabled(true);

// Acquire all the locks, in_guard first, out_guard later
// Use the same locking order as in drain to avoid deadlocks
Expand All @@ -768,17 +811,18 @@ pub(crate) struct TransmissionPipelineConsumer {
// A single Mutex for all the priority queues
stage_out: Box<[StageOut]>,
n_out_r: Waiter,
active: Arc<AtomicBool>,
status: Arc<TransmissionPipelineStatus>,
}

impl TransmissionPipelineConsumer {
pub(crate) async fn pull(&mut self) -> Option<(WBatch, usize)> {
while self.active.load(Ordering::Relaxed) {
pub(crate) async fn pull(&mut self) -> Option<(WBatch, Priority)> {
while !self.status.is_disabled() {
let mut backoff = MicroSeconds::MAX;
// Calculate the backoff maximum
for (prio, queue) in self.stage_out.iter_mut().enumerate() {
match queue.try_pull() {
Pull::Some(batch) => {
let prio = Priority::try_from(prio as u8).unwrap();
return Some((batch, prio));
}
Pull::Backoff(deadline) => {
Expand Down Expand Up @@ -818,8 +862,9 @@ impl TransmissionPipelineConsumer {
None
}

pub(crate) fn refill(&mut self, batch: WBatch, priority: usize) {
self.stage_out[priority].refill(batch);
pub(crate) fn refill(&mut self, batch: WBatch, priority: Priority) {
self.stage_out[priority as usize].refill(batch);
self.status.set_congested(priority, false);
}

pub(crate) fn drain(&mut self) -> Vec<(WBatch, usize)> {
Expand Down
4 changes: 2 additions & 2 deletions io/zenoh-transport/src/multicast/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,10 +431,10 @@ async fn tx_task(
link.send_batch(&mut batch).await?;
// Keep track of next SNs
if let Some(sn) = batch.codec.latest_sn.reliable {
last_sns[priority].reliable = sn;
last_sns[priority as usize].reliable = sn;
}
if let Some(sn) = batch.codec.latest_sn.best_effort {
last_sns[priority].best_effort = sn;
last_sns[priority as usize].best_effort = sn;
}
#[cfg(feature = "stats")]
{
Expand Down

0 comments on commit b8a2979

Please sign in to comment.