From 19b15a50fd5a539a8a81368b3501c44d77ae2329 Mon Sep 17 00:00:00 2001
From: Joseph Perez
Date: Wed, 11 Dec 2024 10:45:53 +0100
Subject: [PATCH] fix: don't emit an error log if pipeline is closed

Before, there was no distinction between a message being dropped by the
pipeline because its deadline was reached and a message being dropped
because the transport was closed. As a consequence, a blocking message
dropped because of a closed transport would still emit an error log.
This PR introduces a distinction between the two dropping causes.
---
 io/zenoh-transport/src/common/pipeline.rs      | 47 +++++++++++++------
 io/zenoh-transport/src/multicast/mod.rs        |  2 +-
 io/zenoh-transport/src/multicast/tx.rs         | 13 ++---
 .../src/unicast/universal/transport.rs         |  4 +-
 .../src/unicast/universal/tx.rs                | 17 +++----
 5 files changed, 51 insertions(+), 32 deletions(-)

diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs
index 256b4e760a..d52c7b55c8 100644
--- a/io/zenoh-transport/src/common/pipeline.rs
+++ b/io/zenoh-transport/src/common/pipeline.rs
@@ -12,6 +12,7 @@
 // ZettaScale Zenoh Team,
 //
 use std::{
+    fmt,
     ops::Add,
     sync::{
         atomic::{AtomicBool, AtomicU32, AtomicU8, Ordering},
@@ -40,7 +41,7 @@ use zenoh_protocol::{
         AtomicBatchSize, BatchSize, TransportMessage,
     },
 };
-use zenoh_sync::{event, Notifier, Waiter};
+use zenoh_sync::{event, Notifier, WaitDeadlineError, Waiter};
 
 use super::{
     batch::{Encode, WBatch},
@@ -56,6 +57,15 @@ struct StageInRefill {
     s_ref_r: RingBufferReader<WBatch, RBLEN>,
 }
+#[derive(Debug)]
+pub(crate) struct TransportClosed;
+impl fmt::Display for TransportClosed {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "transport closed")
+    }
+}
+impl std::error::Error for TransportClosed {}
+
 impl StageInRefill {
     fn pull(&mut self) -> Option<WBatch> {
         self.s_ref_r.pull()
     }
@@ -65,8 +75,12 @@ impl StageInRefill {
         self.n_ref_r.wait().is_ok()
     }
 
-    fn wait_deadline(&self, instant: Instant) -> bool {
-        self.n_ref_r.wait_deadline(instant).is_ok()
+    fn wait_deadline(&self, instant: Instant) -> Result<bool, TransportClosed> {
+        match self.n_ref_r.wait_deadline(instant) {
+            Ok(()) => Ok(true),
+            Err(WaitDeadlineError::Deadline) => Ok(false),
+            Err(WaitDeadlineError::WaitError) => Err(TransportClosed),
+        }
     }
 }
 
@@ -214,9 +228,9 @@ impl Deadline {
     }
 
     #[inline]
-    fn wait(&mut self, s_ref: &StageInRefill) -> bool {
+    fn wait(&mut self, s_ref: &StageInRefill) -> Result<bool, TransportClosed> {
         match self.lazy_deadline.deadline() {
-            DeadlineSetting::Immediate => false,
+            DeadlineSetting::Immediate => Ok(false),
             DeadlineSetting::Finite(instant) => s_ref.wait_deadline(*instant),
         }
     }
@@ -243,7 +257,7 @@ impl StageIn {
         msg: &mut NetworkMessage,
         priority: Priority,
         deadline: &mut Deadline,
-    ) -> bool {
+    ) -> Result<bool, TransportClosed> {
         // Lock the current serialization batch.
         let mut c_guard = self.mutex.current();
 
@@ -264,7 +278,7 @@ impl StageIn {
                 None => {
                     drop(c_guard);
                     // Wait for an available batch until deadline
-                    if !deadline.wait(&self.s_ref) {
+                    if !deadline.wait(&self.s_ref)? {
                         // Still no available batch.
                         // Restore the sequence number and drop the message
                         $($restore_sn)?
@@ -272,7 +286,7 @@ impl StageIn {
                             "Zenoh message dropped because it's over the deadline {:?}: {:?}",
                             deadline.lazy_deadline.wait_time, msg
                         );
-                        return false;
+                        return Ok(false);
                     }
                     c_guard = self.mutex.current();
                 }
@@ -287,13 +301,13 @@ impl StageIn {
                if !self.batching || $msg.is_express() {
                    // Move out existing batch
                    self.s_out.move_batch($batch);
-                   return true;
+                   return Ok(true);
                } else {
                    let bytes = $batch.len();
                    *c_guard = Some($batch);
                    drop(c_guard);
                    self.s_out.notify(bytes);
-                   return true;
+                   return Ok(true);
                }
            }};
        }
@@ -404,7 +418,7 @@ impl StageIn {
         // Clean the fragbuf
         self.fragbuf.clear();
 
-        true
+        Ok(true)
     }
 
     #[inline]
@@ -767,7 +781,10 @@ pub(crate) struct TransmissionPipelineProducer {
 
 impl TransmissionPipelineProducer {
     #[inline]
-    pub(crate) fn push_network_message(&self, mut msg: NetworkMessage) -> bool {
+    pub(crate) fn push_network_message(
+        &self,
+        mut msg: NetworkMessage,
+    ) -> Result<bool, TransportClosed> {
         // If the queue is not QoS, it means that we only have one priority with index 0.
         let (idx, priority) = if self.stage_in.len() > 1 {
             let priority = msg.priority();
@@ -1002,7 +1019,7 @@ mod tests {
                     "Pipeline Flow [>>>]: Pushed {} msgs ({payload_size} bytes)",
                     i + 1
                 );
-                queue.push_network_message(message.clone());
+                queue.push_network_message(message.clone()).unwrap();
             }
         }
 
@@ -1129,7 +1146,7 @@ mod tests {
            println!(
                "Pipeline Blocking [>>>]: ({id}) Scheduling message #{i} with payload size of {payload_size} bytes"
            );
-           queue.push_network_message(message.clone());
+           queue.push_network_message(message.clone()).unwrap();
            let c = counter.fetch_add(1, Ordering::AcqRel);
            println!(
                "Pipeline Blocking [>>>]: ({}) Scheduled message #{} (tot {}) with payload size of {} bytes",
@@ -1242,7 +1259,7 @@ mod tests {
         let duration = Duration::from_millis(5_500);
         let start = Instant::now();
         while start.elapsed() < duration {
-            producer.push_network_message(message.clone());
+            producer.push_network_message(message.clone()).unwrap();
         }
     }
 }
diff --git a/io/zenoh-transport/src/multicast/mod.rs b/io/zenoh-transport/src/multicast/mod.rs
index fbb656264d..cd76176b50 100644
--- a/io/zenoh-transport/src/multicast/mod.rs
+++ b/io/zenoh-transport/src/multicast/mod.rs
@@ -113,7 +113,7 @@ impl TransportMulticast {
     #[inline(always)]
     pub fn schedule(&self, message: NetworkMessage) -> ZResult<()> {
         let transport = self.get_transport()?;
-        transport.schedule(message);
+        transport.schedule(message)?;
         Ok(())
     }
 
diff --git a/io/zenoh-transport/src/multicast/tx.rs b/io/zenoh-transport/src/multicast/tx.rs
index 775131703a..4bd02c125c 100644
--- a/io/zenoh-transport/src/multicast/tx.rs
+++ b/io/zenoh-transport/src/multicast/tx.rs
@@ -13,6 +13,7 @@
 //
 use zenoh_core::zread;
 use zenoh_protocol::network::NetworkMessage;
+use zenoh_result::ZResult;
 
 use super::transport::TransportMulticastInner;
 #[cfg(feature = "shared-memory")]
@@ -20,7 +21,7 @@ use crate::shm::map_zmsg_to_partner;
 
 //noinspection ALL
 impl TransportMulticastInner {
-    fn schedule_on_link(&self, msg: NetworkMessage) -> bool {
+    fn schedule_on_link(&self, msg: NetworkMessage) -> ZResult<bool> {
         macro_rules! zpush {
            ($guard:expr, $pipeline:expr, $msg:expr) => {
                // Drop the guard before the push_zenoh_message since
@@ -28,7 +29,7 @@ impl TransportMulticastInner {
                // block for fairly long time
                let pl = $pipeline.clone();
                drop($guard);
-                return pl.push_network_message($msg);
+                return Ok(pl.push_network_message($msg)?);
            };
        }
 
@@ -47,13 +48,13 @@ impl TransportMulticastInner {
             }
         }
 
-        false
+        Ok(false)
     }
 
     #[allow(unused_mut)] // When feature "shared-memory" is not enabled
     #[allow(clippy::let_and_return)] // When feature "stats" is not enabled
     #[inline(always)]
-    pub(super) fn schedule(&self, mut msg: NetworkMessage) -> bool {
+    pub(super) fn schedule(&self, mut msg: NetworkMessage) -> ZResult<bool> {
         #[cfg(feature = "shared-memory")]
         {
             if let Err(e) = map_zmsg_to_partner(&mut msg, &self.shm) {
@@ -62,7 +63,7 @@ impl TransportMulticastInner {
             }
         }
 
-        let res = self.schedule_on_link(msg);
+        let res = self.schedule_on_link(msg)?;
 
         #[cfg(feature = "stats")]
         if res {
@@ -71,6 +72,6 @@ impl TransportMulticastInner {
             self.stats.inc_tx_n_dropped(1);
         }
 
-        res
+        Ok(res)
     }
 }
diff --git a/io/zenoh-transport/src/unicast/universal/transport.rs b/io/zenoh-transport/src/unicast/universal/transport.rs
index b95b68d606..29234aed64 100644
--- a/io/zenoh-transport/src/unicast/universal/transport.rs
+++ b/io/zenoh-transport/src/unicast/universal/transport.rs
@@ -399,8 +399,8 @@ impl TransportUnicastTrait for TransportUnicastUniversal {
     /*************************************/
     fn schedule(&self, msg: NetworkMessage) -> ZResult<()> {
         match self.internal_schedule(msg) {
-            true => Ok(()),
-            false => bail!("error scheduling message!"),
+            Ok(_) => Ok(()),
+            Err(err) => Err(err.into()),
         }
     }
 
diff --git a/io/zenoh-transport/src/unicast/universal/tx.rs b/io/zenoh-transport/src/unicast/universal/tx.rs
index 6f61fc070d..9442bdd1c5 100644
--- a/io/zenoh-transport/src/unicast/universal/tx.rs
+++ b/io/zenoh-transport/src/unicast/universal/tx.rs
@@ -16,6 +16,7 @@ use zenoh_protocol::{
     network::NetworkMessage,
     transport::close,
 };
+use zenoh_result::ZResult;
 
 use super::transport::TransportUnicastUniversal;
 #[cfg(feature = "shared-memory")]
@@ -68,7 +69,7 @@ impl TransportUnicastUniversal {
         match_.full.or(match_.partial).or(match_.any)
     }
 
-    fn schedule_on_link(&self, msg: NetworkMessage) -> bool {
+    fn schedule_on_link(&self, msg: NetworkMessage) -> ZResult<bool> {
         let transport_links = self
             .links
             .read()
@@ -93,7 +94,7 @@ impl TransportUnicastUniversal {
             );
 
             // No Link found
-            return false;
+            return Ok(false);
         };
 
         let transport_link = transport_links
@@ -112,7 +113,7 @@ impl TransportUnicastUniversal {
         // block for fairly long time
         drop(transport_links);
         let droppable = msg.is_droppable();
-        let push = pipeline.push_network_message(msg);
+        let push = pipeline.push_network_message(msg)?;
         if !push && !droppable {
             tracing::error!(
Closing transport!", @@ -131,22 +132,22 @@ impl TransportUnicastUniversal { } }); } - push + Ok(push) } #[allow(unused_mut)] // When feature "shared-memory" is not enabled #[allow(clippy::let_and_return)] // When feature "stats" is not enabled #[inline(always)] - pub(crate) fn internal_schedule(&self, mut msg: NetworkMessage) -> bool { + pub(crate) fn internal_schedule(&self, mut msg: NetworkMessage) -> ZResult { #[cfg(feature = "shared-memory")] { if let Err(e) = map_zmsg_to_partner(&mut msg, &self.config.shm) { tracing::trace!("Failed SHM conversion: {}", e); - return false; + return Ok(false); } } - let res = self.schedule_on_link(msg); + let res = self.schedule_on_link(msg)?; #[cfg(feature = "stats")] if res { @@ -155,7 +156,7 @@ impl TransportUnicastUniversal { self.stats.inc_tx_n_dropped(1); } - res + Ok(res) } }