Skip to content

Commit

Permalink
Use set_congested function
Browse files Browse the repository at this point in the history
  • Loading branch information
Mallets committed Dec 4, 2024
1 parent bceabb1 commit 5b24ff5
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 19 deletions.
35 changes: 19 additions & 16 deletions examples/examples/z_sub.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::Duration;

//
// Copyright (c) 2023 ZettaScale Technology
//
Expand Down Expand Up @@ -30,23 +32,24 @@ async fn main() {

println!("Press CTRL-C to quit...");
while let Ok(sample) = subscriber.recv_async().await {
// Refer to z_bytes.rs to see how to deserialize different types of message
let payload = sample
.payload()
.try_to_string()
.unwrap_or_else(|e| e.to_string().into());
tokio::time::sleep(Duration::from_millis(100)).await;
// // Refer to z_bytes.rs to see how to deserialize different types of message
// let payload = sample
// .payload()
// .try_to_string()
// .unwrap_or_else(|e| e.to_string().into());

print!(
">> [Subscriber] Received {} ('{}': '{}')",
sample.kind(),
sample.key_expr().as_str(),
payload
);
if let Some(att) = sample.attachment() {
let att = att.try_to_string().unwrap_or_else(|e| e.to_string().into());
print!(" ({})", att);
}
println!();
// print!(
// ">> [Subscriber] Received {} ('{}': '{}')",
// sample.kind(),
// sample.key_expr().as_str(),
// payload
// );
// if let Some(att) = sample.attachment() {
// let att = att.try_to_string().unwrap_or_else(|e| e.to_string().into());
// print!(" ({})", att);
// }
// println!();
}
}

Expand Down
4 changes: 1 addition & 3 deletions io/zenoh-transport/src/common/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -863,9 +863,7 @@ impl TransmissionPipelineConsumer {

pub(crate) fn refill(&mut self, batch: WBatch, priority: usize) {
self.stage_out[priority].refill(batch);
// Reset the priority congested flag
let prioflag = 1 << priority as u8;
self.status.congested.fetch_and(!prioflag, Ordering::AcqRel);
self.status.set_congested(priority, false);
}

pub(crate) fn drain(&mut self) -> Vec<(WBatch, usize)> {
Expand Down

0 comments on commit 5b24ff5

Please sign in to comment.