From 5b24ff5d4e1bb57cc8bcfa1a36b0b98ebde7964c Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Wed, 4 Dec 2024 14:05:09 +0100 Subject: [PATCH] Use set_congested function --- examples/examples/z_sub.rs | 35 ++++++++++++----------- io/zenoh-transport/src/common/pipeline.rs | 4 +-- 2 files changed, 20 insertions(+), 19 deletions(-) diff --git a/examples/examples/z_sub.rs b/examples/examples/z_sub.rs index 71eba72533..13b44d0267 100644 --- a/examples/examples/z_sub.rs +++ b/examples/examples/z_sub.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + // // Copyright (c) 2023 ZettaScale Technology // @@ -30,23 +32,24 @@ async fn main() { println!("Press CTRL-C to quit..."); while let Ok(sample) = subscriber.recv_async().await { - // Refer to z_bytes.rs to see how to deserialize different types of message - let payload = sample - .payload() - .try_to_string() - .unwrap_or_else(|e| e.to_string().into()); + tokio::time::sleep(Duration::from_millis(100)).await; + // // Refer to z_bytes.rs to see how to deserialize different types of message + // let payload = sample + // .payload() + // .try_to_string() + // .unwrap_or_else(|e| e.to_string().into()); - print!( - ">> [Subscriber] Received {} ('{}': '{}')", - sample.kind(), - sample.key_expr().as_str(), - payload - ); - if let Some(att) = sample.attachment() { - let att = att.try_to_string().unwrap_or_else(|e| e.to_string().into()); - print!(" ({})", att); - } - println!(); + // print!( + // ">> [Subscriber] Received {} ('{}': '{}')", + // sample.kind(), + // sample.key_expr().as_str(), + // payload + // ); + // if let Some(att) = sample.attachment() { + // let att = att.try_to_string().unwrap_or_else(|e| e.to_string().into()); + // print!(" ({})", att); + // } + // println!(); } } diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index f5eb033eb9..d8af20e990 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -863,9 +863,7 @@ impl TransmissionPipelineConsumer { pub(crate) fn refill(&mut self, batch: WBatch, priority: usize) { self.stage_out[priority].refill(batch); - // Reset the priority congested flag - let prioflag = 1 << priority as u8; - self.status.congested.fetch_and(!prioflag, Ordering::AcqRel); + self.status.set_congested(priority, false); } pub(crate) fn drain(&mut self) -> Vec<(WBatch, usize)> {