Skip to content

Commit

Permalink
fix: don't emit an error log if pipeline is closed
Browse files Browse the repository at this point in the history
Before, there was no distinction between the pipeline dropping a
message because its deadline was reached or because the transport was
closed.
As a consequence, blocking message which was dropped because of
closed transport would still emit an error log. This PR introduce
the distinction between the two dropping causes.
  • Loading branch information
wyfo committed Dec 11, 2024
1 parent fb2d2bc commit 19b15a5
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 32 deletions.
47 changes: 32 additions & 15 deletions io/zenoh-transport/src/common/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use std::{
fmt,
ops::Add,
sync::{
atomic::{AtomicBool, AtomicU32, AtomicU8, Ordering},
Expand Down Expand Up @@ -40,7 +41,7 @@ use zenoh_protocol::{
AtomicBatchSize, BatchSize, TransportMessage,
},
};
use zenoh_sync::{event, Notifier, Waiter};
use zenoh_sync::{event, Notifier, WaitDeadlineError, Waiter};

use super::{
batch::{Encode, WBatch},
Expand All @@ -56,6 +57,15 @@ struct StageInRefill {
s_ref_r: RingBufferReader<WBatch, RBLEN>,
}

#[derive(Debug)]
pub(crate) struct TransportClosed;
impl fmt::Display for TransportClosed {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "transport closed")
}
}
impl std::error::Error for TransportClosed {}

impl StageInRefill {
fn pull(&mut self) -> Option<WBatch> {
self.s_ref_r.pull()
Expand All @@ -65,8 +75,12 @@ impl StageInRefill {
self.n_ref_r.wait().is_ok()
}

fn wait_deadline(&self, instant: Instant) -> bool {
self.n_ref_r.wait_deadline(instant).is_ok()
fn wait_deadline(&self, instant: Instant) -> Result<bool, TransportClosed> {
match self.n_ref_r.wait_deadline(instant) {
Ok(()) => Ok(true),
Err(WaitDeadlineError::Deadline) => Ok(false),
Err(WaitDeadlineError::WaitError) => Err(TransportClosed),
}
}
}

Expand Down Expand Up @@ -214,9 +228,9 @@ impl Deadline {
}

#[inline]
fn wait(&mut self, s_ref: &StageInRefill) -> bool {
fn wait(&mut self, s_ref: &StageInRefill) -> Result<bool, TransportClosed> {
match self.lazy_deadline.deadline() {
DeadlineSetting::Immediate => false,
DeadlineSetting::Immediate => Ok(false),
DeadlineSetting::Finite(instant) => s_ref.wait_deadline(*instant),
}
}
Expand All @@ -243,7 +257,7 @@ impl StageIn {
msg: &mut NetworkMessage,
priority: Priority,
deadline: &mut Deadline,
) -> bool {
) -> Result<bool, TransportClosed> {
// Lock the current serialization batch.
let mut c_guard = self.mutex.current();

Expand All @@ -264,15 +278,15 @@ impl StageIn {
None => {
drop(c_guard);
// Wait for an available batch until deadline
if !deadline.wait(&self.s_ref) {
if !deadline.wait(&self.s_ref)? {
// Still no available batch.
// Restore the sequence number and drop the message
$($restore_sn)?
tracing::trace!(
"Zenoh message dropped because it's over the deadline {:?}: {:?}",
deadline.lazy_deadline.wait_time, msg
);
return false;
return Ok(false);
}
c_guard = self.mutex.current();
}
Expand All @@ -287,13 +301,13 @@ impl StageIn {
if !self.batching || $msg.is_express() {
// Move out existing batch
self.s_out.move_batch($batch);
return true;
return Ok(true);
} else {
let bytes = $batch.len();
*c_guard = Some($batch);
drop(c_guard);
self.s_out.notify(bytes);
return true;
return Ok(true);
}
}};
}
Expand Down Expand Up @@ -404,7 +418,7 @@ impl StageIn {
// Clean the fragbuf
self.fragbuf.clear();

true
Ok(true)
}

#[inline]
Expand Down Expand Up @@ -767,7 +781,10 @@ pub(crate) struct TransmissionPipelineProducer {

impl TransmissionPipelineProducer {
#[inline]
pub(crate) fn push_network_message(&self, mut msg: NetworkMessage) -> bool {
pub(crate) fn push_network_message(
&self,
mut msg: NetworkMessage,
) -> Result<bool, TransportClosed> {
// If the queue is not QoS, it means that we only have one priority with index 0.
let (idx, priority) = if self.stage_in.len() > 1 {
let priority = msg.priority();
Expand Down Expand Up @@ -1002,7 +1019,7 @@ mod tests {
"Pipeline Flow [>>>]: Pushed {} msgs ({payload_size} bytes)",
i + 1
);
queue.push_network_message(message.clone());
queue.push_network_message(message.clone()).unwrap();
}
}

Expand Down Expand Up @@ -1129,7 +1146,7 @@ mod tests {
println!(
"Pipeline Blocking [>>>]: ({id}) Scheduling message #{i} with payload size of {payload_size} bytes"
);
queue.push_network_message(message.clone());
queue.push_network_message(message.clone()).unwrap();
let c = counter.fetch_add(1, Ordering::AcqRel);
println!(
"Pipeline Blocking [>>>]: ({}) Scheduled message #{} (tot {}) with payload size of {} bytes",
Expand Down Expand Up @@ -1242,7 +1259,7 @@ mod tests {
let duration = Duration::from_millis(5_500);
let start = Instant::now();
while start.elapsed() < duration {
producer.push_network_message(message.clone());
producer.push_network_message(message.clone()).unwrap();
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion io/zenoh-transport/src/multicast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl TransportMulticast {
#[inline(always)]
pub fn schedule(&self, message: NetworkMessage) -> ZResult<()> {
let transport = self.get_transport()?;
transport.schedule(message);
transport.schedule(message)?;
Ok(())
}

Expand Down
13 changes: 7 additions & 6 deletions io/zenoh-transport/src/multicast/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,23 @@
//
use zenoh_core::zread;
use zenoh_protocol::network::NetworkMessage;
use zenoh_result::ZResult;

use super::transport::TransportMulticastInner;
#[cfg(feature = "shared-memory")]
use crate::shm::map_zmsg_to_partner;

//noinspection ALL
impl TransportMulticastInner {
fn schedule_on_link(&self, msg: NetworkMessage) -> bool {
fn schedule_on_link(&self, msg: NetworkMessage) -> ZResult<bool> {
macro_rules! zpush {
($guard:expr, $pipeline:expr, $msg:expr) => {
// Drop the guard before the push_zenoh_message since
// the link could be congested and this operation could
// block for fairly long time
let pl = $pipeline.clone();
drop($guard);
return pl.push_network_message($msg);
return Ok(pl.push_network_message($msg)?);
};
}

Expand All @@ -47,13 +48,13 @@ impl TransportMulticastInner {
}
}

false
Ok(false)
}

#[allow(unused_mut)] // When feature "shared-memory" is not enabled
#[allow(clippy::let_and_return)] // When feature "stats" is not enabled
#[inline(always)]
pub(super) fn schedule(&self, mut msg: NetworkMessage) -> bool {
pub(super) fn schedule(&self, mut msg: NetworkMessage) -> ZResult<bool> {
#[cfg(feature = "shared-memory")]
{
if let Err(e) = map_zmsg_to_partner(&mut msg, &self.shm) {
Expand All @@ -62,7 +63,7 @@ impl TransportMulticastInner {
}
}

let res = self.schedule_on_link(msg);
let res = self.schedule_on_link(msg)?;

#[cfg(feature = "stats")]
if res {
Expand All @@ -71,6 +72,6 @@ impl TransportMulticastInner {
self.stats.inc_tx_n_dropped(1);
}

res
Ok(res)
}
}
4 changes: 2 additions & 2 deletions io/zenoh-transport/src/unicast/universal/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,8 +399,8 @@ impl TransportUnicastTrait for TransportUnicastUniversal {
/*************************************/
fn schedule(&self, msg: NetworkMessage) -> ZResult<()> {
match self.internal_schedule(msg) {
true => Ok(()),
false => bail!("error scheduling message!"),
Ok(_) => Ok(()),
Err(err) => Err(err.into()),
}
}

Expand Down
17 changes: 9 additions & 8 deletions io/zenoh-transport/src/unicast/universal/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use zenoh_protocol::{
network::NetworkMessage,
transport::close,
};
use zenoh_result::ZResult;

use super::transport::TransportUnicastUniversal;
#[cfg(feature = "shared-memory")]
Expand Down Expand Up @@ -68,7 +69,7 @@ impl TransportUnicastUniversal {
match_.full.or(match_.partial).or(match_.any)
}

fn schedule_on_link(&self, msg: NetworkMessage) -> bool {
fn schedule_on_link(&self, msg: NetworkMessage) -> ZResult<bool> {
let transport_links = self
.links
.read()
Expand All @@ -93,7 +94,7 @@ impl TransportUnicastUniversal {
);

// No Link found
return false;
return Ok(false);
};

let transport_link = transport_links
Expand All @@ -112,7 +113,7 @@ impl TransportUnicastUniversal {
// block for fairly long time
drop(transport_links);
let droppable = msg.is_droppable();
let push = pipeline.push_network_message(msg);
let push = pipeline.push_network_message(msg)?;
if !push && !droppable {
tracing::error!(
"Unable to push non droppable network message to {}. Closing transport!",
Expand All @@ -131,22 +132,22 @@ impl TransportUnicastUniversal {
}
});
}
push
Ok(push)
}

#[allow(unused_mut)] // When feature "shared-memory" is not enabled
#[allow(clippy::let_and_return)] // When feature "stats" is not enabled
#[inline(always)]
pub(crate) fn internal_schedule(&self, mut msg: NetworkMessage) -> bool {
pub(crate) fn internal_schedule(&self, mut msg: NetworkMessage) -> ZResult<bool> {
#[cfg(feature = "shared-memory")]
{
if let Err(e) = map_zmsg_to_partner(&mut msg, &self.config.shm) {
tracing::trace!("Failed SHM conversion: {}", e);
return false;
return Ok(false);
}
}

let res = self.schedule_on_link(msg);
let res = self.schedule_on_link(msg)?;

#[cfg(feature = "stats")]
if res {
Expand All @@ -155,7 +156,7 @@ impl TransportUnicastUniversal {
self.stats.inc_tx_n_dropped(1);
}

res
Ok(res)
}
}

Expand Down

0 comments on commit 19b15a5

Please sign in to comment.