diff --git a/commons/zenoh-codec/src/transport/fragment.rs b/commons/zenoh-codec/src/transport/fragment.rs index fc30abce9..73b2b4869 100644 --- a/commons/zenoh-codec/src/transport/fragment.rs +++ b/commons/zenoh-codec/src/transport/fragment.rs @@ -39,6 +39,8 @@ where more, sn, ext_qos, + ext_first, + ext_drop, } = x; // Header @@ -49,7 +51,10 @@ where if *more { header |= flag::M; } - if ext_qos != &ext::QoSType::DEFAULT { + let mut n_exts = (ext_qos != &ext::QoSType::DEFAULT) as u8 + + ext_first.is_some() as u8 + + ext_drop.is_some() as u8; + if n_exts != 0 { header |= flag::Z; } self.write(&mut *writer, header)?; @@ -59,7 +64,16 @@ where // Extensions if ext_qos != &ext::QoSType::DEFAULT { - self.write(&mut *writer, (*ext_qos, false))?; + n_exts -= 1; + self.write(&mut *writer, (*ext_qos, n_exts != 0))?; + } + if let Some(first) = ext_first { + n_exts -= 1; + self.write(&mut *writer, (first, n_exts != 0))? + } + if let Some(drop) = ext_drop { + n_exts -= 1; + self.write(&mut *writer, (drop, n_exts != 0))? } Ok(()) @@ -99,6 +113,8 @@ where // Extensions let mut ext_qos = ext::QoSType::DEFAULT; + let mut ext_first = None; + let mut ext_drop = None; let mut has_ext = imsg::has_flag(self.header, flag::Z); while has_ext { @@ -110,6 +126,16 @@ where ext_qos = q; has_ext = ext; } + ext::First::ID => { + let (first, ext): (ext::First, bool) = eodec.read(&mut *reader)?; + ext_first = Some(first); + has_ext = ext; + } + ext::Drop::ID => { + let (drop, ext): (ext::Drop, bool) = eodec.read(&mut *reader)?; + ext_drop = Some(drop); + has_ext = ext; + } _ => { has_ext = extension::skip(reader, "Fragment", ext)?; } @@ -121,6 +147,8 @@ where more, sn, ext_qos, + ext_first, + ext_drop, }) } } @@ -139,6 +167,8 @@ where sn, payload, ext_qos, + ext_first, + ext_drop, } = x; // Header @@ -147,6 +177,8 @@ where more: *more, sn: *sn, ext_qos: *ext_qos, + ext_first: *ext_first, + ext_drop: *ext_drop, }; self.write(&mut *writer, &header)?; @@ -185,6 +217,8 @@ where more: header.more, sn: header.sn, ext_qos: header.ext_qos, + ext_first: header.ext_first, + ext_drop: header.ext_drop, payload, }) } diff --git a/commons/zenoh-codec/src/transport/init.rs b/commons/zenoh-codec/src/transport/init.rs index dd47dc2c2..25b2b29f0 100644 --- a/commons/zenoh-codec/src/transport/init.rs +++ b/commons/zenoh-codec/src/transport/init.rs @@ -52,6 +52,7 @@ where ext_mlink, ext_lowlatency, ext_compression, + ext_patch, } = x; // Header @@ -64,7 +65,8 @@ where + (ext_auth.is_some() as u8) + (ext_mlink.is_some() as u8) + (ext_lowlatency.is_some() as u8) - + (ext_compression.is_some() as u8); + + (ext_compression.is_some() as u8) + + (*ext_patch != ext::PatchType::NONE) as u8; #[cfg(feature = "shared-memory")] { @@ -125,6 +127,10 @@ where n_exts -= 1; self.write(&mut *writer, (compression, n_exts != 0))?; } + if *ext_patch != ext::PatchType::NONE { + n_exts -= 1; + self.write(&mut *writer, (*ext_patch, n_exts != 0))?; + } Ok(()) } @@ -186,6 +192,7 @@ where let mut ext_mlink = None; let mut ext_lowlatency = None; let mut ext_compression = None; + let mut ext_patch = ext::PatchType::NONE; let mut has_ext = imsg::has_flag(self.header, flag::Z); while has_ext { @@ -228,6 +235,11 @@ where ext_compression = Some(q); has_ext = ext; } + ext::Patch::ID => { + let (p, ext): (ext::PatchType, bool) = eodec.read(&mut *reader)?; + ext_patch = p; + has_ext = ext; + } _ => { has_ext = extension::skip(reader, "InitSyn", ext)?; } @@ -248,6 +260,7 @@ where ext_mlink, ext_lowlatency, ext_compression, + ext_patch, }) } } @@ -275,6 +288,7 @@ where ext_mlink, ext_lowlatency, ext_compression, + ext_patch, } = x; // Header @@ -287,7 +301,8 @@ where + (ext_auth.is_some() as u8) + (ext_mlink.is_some() as u8) + (ext_lowlatency.is_some() as u8) - + (ext_compression.is_some() as u8); + + (ext_compression.is_some() as u8) + + (*ext_patch != ext::PatchType::NONE) as u8; #[cfg(feature = "shared-memory")] { @@ -351,6 +366,10 @@ where n_exts -= 1; self.write(&mut *writer, (compression, n_exts != 0))?; } + if *ext_patch != ext::PatchType::NONE { + n_exts -= 1; + self.write(&mut *writer, (*ext_patch, n_exts != 0))?; + } Ok(()) } @@ -415,6 +434,7 @@ where let mut ext_mlink = None; let mut ext_lowlatency = None; let mut ext_compression = None; + let mut ext_patch = ext::PatchType::NONE; let mut has_ext = imsg::has_flag(self.header, flag::Z); while has_ext { @@ -457,6 +477,11 @@ where ext_compression = Some(q); has_ext = ext; } + ext::Patch::ID => { + let (p, ext): (ext::PatchType, bool) = eodec.read(&mut *reader)?; + ext_patch = p; + has_ext = ext; + } _ => { has_ext = extension::skip(reader, "InitAck", ext)?; } @@ -478,6 +503,7 @@ where ext_mlink, ext_lowlatency, ext_compression, + ext_patch, }) } } diff --git a/commons/zenoh-codec/src/transport/join.rs b/commons/zenoh-codec/src/transport/join.rs index 3f70d2ec8..5504e5d03 100644 --- a/commons/zenoh-codec/src/transport/join.rs +++ b/commons/zenoh-codec/src/transport/join.rs @@ -150,6 +150,7 @@ where next_sn, ext_qos, ext_shm, + ext_patch, } = x; // Header @@ -160,7 +161,9 @@ where if resolution != &Resolution::default() || batch_size != &batch_size::MULTICAST { header |= flag::S; } - let mut n_exts = (ext_qos.is_some() as u8) + (ext_shm.is_some() as u8); + let mut n_exts = (ext_qos.is_some() as u8) + + (ext_shm.is_some() as u8) + + (*ext_patch != ext::PatchType::NONE) as u8; if n_exts != 0 { header |= flag::Z; } @@ -201,6 +204,10 @@ where n_exts -= 1; self.write(&mut *writer, (shm, n_exts != 0))?; } + if *ext_patch != ext::PatchType::NONE { + n_exts -= 1; + self.write(&mut *writer, (*ext_patch, n_exts != 0))?; + } Ok(()) } @@ -264,6 +271,7 @@ where // Extensions let mut ext_qos = None; let mut ext_shm = None; + let mut ext_patch = ext::PatchType::NONE; let mut has_ext = imsg::has_flag(self.header, flag::Z); while has_ext { @@ -280,6 +288,11 @@ where ext_shm = Some(s); has_ext = ext; } + ext::Patch::ID => { + let (p, ext): (ext::PatchType, bool) = eodec.read(&mut *reader)?; + ext_patch = p; + has_ext = ext; + } _ => { has_ext = extension::skip(reader, "Join", ext)?; } @@ -296,6 +309,7 @@ where next_sn, ext_qos, ext_shm, + ext_patch, }) } } diff --git a/commons/zenoh-codec/src/transport/mod.rs b/commons/zenoh-codec/src/transport/mod.rs index 3adae0fb7..973eac7e1 100644 --- a/commons/zenoh-codec/src/transport/mod.rs +++ b/commons/zenoh-codec/src/transport/mod.rs @@ -176,3 +176,43 @@ where Ok((ext.into(), more)) } } + +// Extensions: Patch +impl WCodec<(ext::PatchType<{ ID }>, bool), &mut W> for Zenoh080 +where + W: Writer, +{ + type Output = Result<(), DidntWrite>; + + fn write(self, writer: &mut W, x: (ext::PatchType<{ ID }>, bool)) -> Self::Output { + let (x, more) = x; + let ext: ZExtZ64<{ ID }> = x.into(); + + self.write(&mut *writer, (&ext, more)) + } +} + +impl RCodec<(ext::PatchType<{ ID }>, bool), &mut R> for Zenoh080 +where + R: Reader, +{ + type Error = DidntRead; + + fn read(self, reader: &mut R) -> Result<(ext::PatchType<{ ID }>, bool), Self::Error> { + let header: u8 = self.read(&mut *reader)?; + let codec = Zenoh080Header::new(header); + codec.read(reader) + } +} + +impl RCodec<(ext::PatchType<{ ID }>, bool), &mut R> for Zenoh080Header +where + R: Reader, +{ + type Error = DidntRead; + + fn read(self, reader: &mut R) -> Result<(ext::PatchType<{ ID }>, bool), Self::Error> { + let (ext, more): (ZExtZ64<{ ID }>, bool) = self.read(&mut *reader)?; + Ok((ext.into(), more)) + } +} diff --git a/commons/zenoh-protocol/src/transport/fragment.rs b/commons/zenoh-protocol/src/transport/fragment.rs index c82aefbd8..ab63393ab 100644 --- a/commons/zenoh-protocol/src/transport/fragment.rs +++ b/commons/zenoh-protocol/src/transport/fragment.rs @@ -75,14 +75,26 @@ pub struct Fragment { pub sn: TransportSn, pub payload: ZSlice, pub ext_qos: ext::QoSType, + pub ext_first: Option, + pub ext_drop: Option, } // Extensions pub mod ext { - use crate::{common::ZExtZ64, zextz64}; + use crate::{ + common::{ZExtUnit, ZExtZ64}, + zextunit, zextz64, + }; pub type QoS = zextz64!(0x1, true); pub type QoSType = crate::transport::ext::QoSType<{ QoS::ID }>; + + /// # Start extension + /// Mark the first fragment of a fragmented message + pub type First = zextunit!(0x2, false); + /// # Stop extension + /// Indicate that the remaining fragments has been dropped + pub type Drop = zextunit!(0x3, false); } impl Fragment { @@ -97,6 +109,8 @@ impl Fragment { let sn: TransportSn = rng.gen(); let payload = ZSlice::rand(rng.gen_range(8..128)); let ext_qos = ext::QoSType::rand(); + let ext_first = rng.gen_bool(0.5).then(ext::First::rand); + let ext_drop = rng.gen_bool(0.5).then(ext::Drop::rand); Fragment { reliability, @@ -104,6 +118,8 @@ impl Fragment { more, payload, ext_qos, + ext_first, + ext_drop, } } } @@ -115,6 +131,8 @@ pub struct FragmentHeader { pub more: bool, pub sn: TransportSn, pub ext_qos: ext::QoSType, + pub ext_first: Option, + pub ext_drop: Option, } impl FragmentHeader { @@ -128,12 +146,16 @@ impl FragmentHeader { let more = rng.gen_bool(0.5); let sn: TransportSn = rng.gen(); let ext_qos = ext::QoSType::rand(); + let ext_first = rng.gen_bool(0.5).then(ext::First::rand); + let ext_drop = rng.gen_bool(0.5).then(ext::Drop::rand); FragmentHeader { reliability, more, sn, ext_qos, + ext_first, + ext_drop, } } } diff --git a/commons/zenoh-protocol/src/transport/init.rs b/commons/zenoh-protocol/src/transport/init.rs index b514e6b9b..f86b162bf 100644 --- a/commons/zenoh-protocol/src/transport/init.rs +++ b/commons/zenoh-protocol/src/transport/init.rs @@ -131,6 +131,7 @@ pub struct InitSyn { pub ext_mlink: Option, pub ext_lowlatency: Option, pub ext_compression: Option, + pub ext_patch: ext::PatchType, } // Extensions @@ -165,6 +166,13 @@ pub mod ext { /// # Compression extension /// Used to negotiate the use of compression on the link pub type Compression = zextunit!(0x6, false); + + /// # Patch extension + /// Used to negotiate the patch version of the protocol + /// if not present (or 0), then protocol as released with 1.0.0 + /// if >= 1, then fragmentation first/drop markers + pub type Patch = zextz64!(0x7, false); + pub type PatchType = crate::transport::ext::PatchType<{ Patch::ID }>; } impl InitSyn { @@ -189,6 +197,7 @@ impl InitSyn { let ext_mlink = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); let ext_lowlatency = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); let ext_compression = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); + let ext_patch = ext::PatchType::rand(); Self { version, @@ -204,6 +213,7 @@ impl InitSyn { ext_mlink, ext_lowlatency, ext_compression, + ext_patch, } } } @@ -234,6 +244,7 @@ pub struct InitAck { pub ext_mlink: Option, pub ext_lowlatency: Option, pub ext_compression: Option, + pub ext_patch: ext::PatchType, } impl InitAck { @@ -263,6 +274,7 @@ impl InitAck { let ext_mlink = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); let ext_lowlatency = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); let ext_compression = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); + let ext_patch = ext::PatchType::rand(); Self { version, @@ -279,6 +291,7 @@ impl InitAck { ext_mlink, ext_lowlatency, ext_compression, + ext_patch, } } } diff --git a/commons/zenoh-protocol/src/transport/join.rs b/commons/zenoh-protocol/src/transport/join.rs index 26ed290c0..afbe55f3c 100644 --- a/commons/zenoh-protocol/src/transport/join.rs +++ b/commons/zenoh-protocol/src/transport/join.rs @@ -102,6 +102,7 @@ pub struct Join { pub next_sn: PrioritySn, pub ext_qos: Option, pub ext_shm: Option, + pub ext_patch: ext::PatchType, } pub mod flag { @@ -115,7 +116,10 @@ pub mod ext { use alloc::boxed::Box; use super::{Priority, PrioritySn}; - use crate::{common::ZExtZBuf, zextzbuf}; + use crate::{ + common::{ZExtZ64, ZExtZBuf}, + zextz64, zextzbuf, + }; /// # QoS extension /// Used to announce next sn when QoS is enabled @@ -125,6 +129,13 @@ pub mod ext { /// # Shm extension /// Used to advertise shared memory capabilities pub type Shm = zextzbuf!(0x2, true); + + /// # Patch extension + /// Used to negotiate the patch version of the protocol + /// if not present (or 0), then protocol as released with 1.0.0 + /// if >= 1, then fragmentation first/drop markers + pub type Patch = zextz64!(0x7, false); // use the same id as Init + pub type PatchType = crate::transport::ext::PatchType<{ Patch::ID }>; } impl Join { @@ -151,6 +162,7 @@ impl Join { .gen_bool(0.5) .then_some(Box::new([PrioritySn::rand(); Priority::NUM])); let ext_shm = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); + let ext_patch = ext::PatchType::rand(); Self { version, @@ -162,6 +174,7 @@ impl Join { next_sn, ext_qos, ext_shm, + ext_patch, } } } diff --git a/commons/zenoh-protocol/src/transport/mod.rs b/commons/zenoh-protocol/src/transport/mod.rs index ba2ac32c4..d534fef04 100644 --- a/commons/zenoh-protocol/src/transport/mod.rs +++ b/commons/zenoh-protocol/src/transport/mod.rs @@ -311,4 +311,42 @@ pub mod ext { ZExtZ64::new(ext.inner as u64) } } + + #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] + pub struct PatchType(u8); + + impl PatchType { + pub const NONE: Self = Self(0); + pub const CURRENT: Self = Self(1); + + pub fn new(int: u8) -> Self { + Self(int) + } + + pub fn raw(self) -> u8 { + self.0 + } + + pub fn has_fragmentation_markers(&self) -> bool { + self.0 >= 1 + } + + #[cfg(feature = "test")] + pub fn rand() -> Self { + use rand::Rng; + Self(rand::thread_rng().gen()) + } + } + + impl From> for PatchType { + fn from(ext: ZExtZ64) -> Self { + Self(ext.value as u8) + } + } + + impl From> for ZExtZ64 { + fn from(ext: PatchType) -> Self { + ZExtZ64::new(ext.0 as u64) + } + } } diff --git a/io/zenoh-transport/src/common/batch.rs b/io/zenoh-transport/src/common/batch.rs index 2c7316c7c..65150f728 100644 --- a/io/zenoh-transport/src/common/batch.rs +++ b/io/zenoh-transport/src/common/batch.rs @@ -201,6 +201,9 @@ pub struct WBatch { // Statistics related to this batch #[cfg(feature = "stats")] pub stats: WBatchStats, + // an ephemeral batch will not be recycled in the pipeline + // it can be used to push a stop fragment when no batch are available + pub ephemeral: bool, } impl WBatch { @@ -209,6 +212,7 @@ impl WBatch { buffer: BBuf::with_capacity(config.mtu as usize), codec: Zenoh080Batch::new(), config, + ephemeral: false, #[cfg(feature = "stats")] stats: WBatchStats::default(), }; @@ -219,6 +223,17 @@ impl WBatch { batch } + pub fn new_ephemeral(config: BatchConfig) -> Self { + Self { + ephemeral: true, + ..Self::new(config) + } + } + + pub fn is_ephemeral(&self) -> bool { + self.ephemeral + } + /// Verify that the [`WBatch`] has no serialized bytes. #[inline(always)] pub fn is_empty(&self) -> bool { diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index c7d401b0d..256b4e760 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -34,6 +34,7 @@ use zenoh_protocol::{ core::Priority, network::NetworkMessage, transport::{ + fragment, fragment::FragmentHeader, frame::{self, FrameHeader}, AtomicBatchSize, BatchSize, TransportMessage, @@ -232,6 +233,8 @@ struct StageIn { mutex: StageInMutex, fragbuf: ZBuf, batching: bool, + // used for stop fragment + batch_config: BatchConfig, } impl StageIn { @@ -352,19 +355,32 @@ impl StageIn { more: true, sn, ext_qos: frame.ext_qos, + ext_first: Some(fragment::ext::First::new()), + ext_drop: None, }; let mut reader = self.fragbuf.reader(); while reader.can_read() { // Get the current serialization batch - // If deadline is reached, sequence number is incremented with `SeqNumGenerator::get` - // in order to break the fragment chain already sent. - batch = zgetbatch_rets!(let _ = tch.sn.get()); + batch = zgetbatch_rets!({ + // If no fragment has been sent, the sequence number is just reset + if fragment.ext_first.is_some() { + tch.sn.set(sn).unwrap() + // Otherwise, an ephemeral batch is created to send the stop fragment + } else { + let mut batch = WBatch::new_ephemeral(self.batch_config); + self.fragbuf.clear(); + fragment.ext_drop = Some(fragment::ext::Drop::new()); + let _ = batch.encode((&mut self.fragbuf.reader(), &mut fragment)); + self.s_out.move_batch(batch); + } + }); // Serialize the message fragment match batch.encode((&mut reader, &mut fragment)) { Ok(_) => { // Update the SN fragment.sn = tch.sn.get(); + fragment.ext_first = None; // Move the serialization batch into the OUT pipeline self.s_out.move_batch(batch); } @@ -675,6 +691,7 @@ impl TransmissionPipeline { }, fragbuf: ZBuf::empty(), batching: config.batching_enabled, + batch_config: config.batch, })); // The stage out for this priority @@ -863,8 +880,10 @@ impl TransmissionPipelineConsumer { } pub(crate) fn refill(&mut self, batch: WBatch, priority: Priority) { - self.stage_out[priority as usize].refill(batch); - self.status.set_congested(priority, false); + if !batch.is_ephemeral() { + self.stage_out[priority as usize].refill(batch); + self.status.set_congested(priority, false); + } } pub(crate) fn drain(&mut self) -> Vec<(WBatch, usize)> { diff --git a/io/zenoh-transport/src/multicast/link.rs b/io/zenoh-transport/src/multicast/link.rs index 61c6f36ec..a4badd2e4 100644 --- a/io/zenoh-transport/src/multicast/link.rs +++ b/io/zenoh-transport/src/multicast/link.rs @@ -24,7 +24,9 @@ use zenoh_core::{zcondfeat, zlock}; use zenoh_link::{LinkMulticast, Locator}; use zenoh_protocol::{ core::{Bits, Priority, Resolution, WhatAmI, ZenohIdProto}, - transport::{BatchSize, Close, Join, PrioritySn, TransportMessage, TransportSn}, + transport::{ + join::ext::PatchType, BatchSize, Close, Join, PrioritySn, TransportMessage, TransportSn, + }, }; use zenoh_result::{zerror, ZResult}; use zenoh_sync::{RecyclingObject, RecyclingObjectPool, Signal}; @@ -495,6 +497,7 @@ async fn tx_task( next_sn, ext_qos, ext_shm: None, + ext_patch: PatchType::CURRENT } .into(); diff --git a/io/zenoh-transport/src/multicast/rx.rs b/io/zenoh-transport/src/multicast/rx.rs index 8562d5b3e..dc501742e 100644 --- a/io/zenoh-transport/src/multicast/rx.rs +++ b/io/zenoh-transport/src/multicast/rx.rs @@ -166,7 +166,7 @@ impl TransportMulticastInner { Reliability::BestEffort => zlock!(c.best_effort), }; - if !self.verify_sn(sn, &mut guard)? { + if !self.verify_sn("Frame", sn, &mut guard)? { // Drop invalid message and continue return Ok(()); } @@ -183,6 +183,8 @@ impl TransportMulticastInner { more, sn, ext_qos, + ext_first, + ext_drop, payload, } = fragment; @@ -205,10 +207,25 @@ impl TransportMulticastInner { Reliability::BestEffort => zlock!(c.best_effort), }; - if !self.verify_sn(sn, &mut guard)? { + if !self.verify_sn("Fragment", sn, &mut guard)? { // Drop invalid message and continue return Ok(()); } + if peer.patch.has_fragmentation_markers() { + if ext_first.is_some() { + guard.defrag.clear(); + } else if guard.defrag.is_empty() { + tracing::trace!( + "Transport: {}. First fragment received without start marker.", + self.manager.config.zid, + ); + return Ok(()); + } + if ext_drop.is_some() { + guard.defrag.clear(); + return Ok(()); + } + } if guard.defrag.is_empty() { let _ = guard.defrag.sync(sn); } @@ -236,14 +253,16 @@ impl TransportMulticastInner { fn verify_sn( &self, + message_type: &str, sn: TransportSn, guard: &mut MutexGuard<'_, TransportChannelRx>, ) -> ZResult { let precedes = guard.sn.precedes(sn)?; if !precedes { tracing::debug!( - "Transport: {}. Frame with invalid SN dropped: {}. Expected: {}.", + "Transport: {}. {} with invalid SN dropped: {}. Expected: {}.", self.manager.config.zid, + message_type, sn, guard.sn.next() ); diff --git a/io/zenoh-transport/src/multicast/transport.rs b/io/zenoh-transport/src/multicast/transport.rs index 3777978a3..b14bed85e 100644 --- a/io/zenoh-transport/src/multicast/transport.rs +++ b/io/zenoh-transport/src/multicast/transport.rs @@ -12,6 +12,7 @@ // ZettaScale Zenoh Team, // use std::{ + cmp::min, collections::HashMap, sync::{ atomic::{AtomicBool, Ordering}, @@ -25,7 +26,7 @@ use zenoh_core::{zcondfeat, zread, zwrite}; use zenoh_link::{Link, Locator}; use zenoh_protocol::{ core::{Bits, Field, Priority, Resolution, WhatAmI, ZenohIdProto}, - transport::{batch_size, close, Close, Join, TransportMessage}, + transport::{batch_size, close, join::ext::PatchType, Close, Join, TransportMessage}, }; use zenoh_result::{bail, ZResult}; use zenoh_task::TaskController; @@ -61,6 +62,7 @@ pub(super) struct TransportMulticastPeer { token: CancellationToken, pub(super) priority_rx: Box<[TransportPriorityRx]>, pub(super) handler: Arc, + pub(super) patch: PatchType, } impl TransportMulticastPeer { @@ -415,6 +417,7 @@ impl TransportMulticastInner { token, priority_rx, handler, + patch: min(PatchType::CURRENT, join.ext_patch), }; zwrite!(self.peers).insert(locator.clone(), peer); diff --git a/io/zenoh-transport/src/unicast/establishment/accept.rs b/io/zenoh-transport/src/unicast/establishment/accept.rs index db2310c32..386b9f832 100644 --- a/io/zenoh-transport/src/unicast/establishment/accept.rs +++ b/io/zenoh-transport/src/unicast/establishment/accept.rs @@ -61,6 +61,7 @@ struct StateTransport { #[cfg(feature = "shared-memory")] ext_shm: ext::shm::StateAccept, ext_lowlatency: ext::lowlatency::StateAccept, + ext_patch: ext::patch::StateAccept, } #[cfg(any(feature = "transport_auth", feature = "transport_compression"))] @@ -143,6 +144,7 @@ struct AcceptLink<'a> { ext_lowlatency: ext::lowlatency::LowLatencyFsm<'a>, #[cfg(feature = "transport_compression")] ext_compression: ext::compression::CompressionFsm<'a>, + ext_patch: ext::patch::PatchFsm<'a>, } #[async_trait] @@ -268,6 +270,12 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { .await .map_err(|e| (e, Some(close::reason::GENERIC)))?; + // Extension Patch + self.ext_patch + .recv_init_syn((&mut state.transport.ext_patch, init_syn.ext_patch)) + .await + .map_err(|e| (e, Some(close::reason::GENERIC)))?; + let output = RecvInitSynOut { other_zid: init_syn.zid, other_whatami: init_syn.whatami, @@ -330,7 +338,7 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { .await .map_err(|e| (e, Some(close::reason::GENERIC)))?; - // Extension MultiLink + // Extension Compression let ext_compression = zcondfeat!( "transport_compression", self.ext_compression @@ -340,6 +348,13 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { None ); + // Extension Patch + let ext_patch = self + .ext_patch + .send_init_ack(&state.transport.ext_patch) + .await + .map_err(|e| (e, Some(close::reason::GENERIC)))?; + // Create the cookie let cookie_nonce: u64 = zasynclock!(self.prng).gen(); let cookie = Cookie { @@ -358,6 +373,7 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { ext_lowlatency: state.transport.ext_lowlatency, #[cfg(feature = "transport_compression")] ext_compression: state.link.ext_compression, + ext_patch: state.transport.ext_patch, }; let mut encrypted = vec![]; @@ -391,6 +407,7 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { ext_mlink, ext_lowlatency, ext_compression, + ext_patch, } .into(); @@ -491,6 +508,7 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { #[cfg(feature = "shared-memory")] ext_shm: cookie.ext_shm, ext_lowlatency: cookie.ext_lowlatency, + ext_patch: cookie.ext_patch, }, #[cfg(any(feature = "transport_auth", feature = "transport_compression"))] link: StateLink { @@ -681,6 +699,7 @@ pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) - ext_lowlatency: ext::lowlatency::LowLatencyFsm::new(), #[cfg(feature = "transport_compression")] ext_compression: ext::compression::CompressionFsm::new(), + ext_patch: ext::patch::PatchFsm::new(), }; // Init handshake @@ -719,6 +738,7 @@ pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) - ext_lowlatency: ext::lowlatency::StateAccept::new( manager.config.unicast.is_lowlatency, ), + ext_patch: ext::patch::StateAccept::new(), }, #[cfg(any(feature = "transport_auth", feature = "transport_compression"))] link: StateLink { @@ -786,6 +806,7 @@ pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) - is_lowlatency: state.transport.ext_lowlatency.is_lowlatency(), #[cfg(feature = "auth_usrpwd")] auth_id: osyn_out.other_auth_id, + patch: state.transport.ext_patch.get(), }; let a_config = TransportLinkUnicastConfig { diff --git a/io/zenoh-transport/src/unicast/establishment/cookie.rs b/io/zenoh-transport/src/unicast/establishment/cookie.rs index 4220f8e08..58f56fa92 100644 --- a/io/zenoh-transport/src/unicast/establishment/cookie.rs +++ b/io/zenoh-transport/src/unicast/establishment/cookie.rs @@ -44,6 +44,7 @@ pub(crate) struct Cookie { pub(crate) ext_lowlatency: ext::lowlatency::StateAccept, #[cfg(feature = "transport_compression")] pub(crate) ext_compression: ext::compression::StateAccept, + pub(crate) ext_patch: ext::patch::StateAccept, } impl WCodec<&Cookie, &mut W> for Zenoh080 @@ -70,6 +71,7 @@ where self.write(&mut *writer, &x.ext_lowlatency)?; #[cfg(feature = "transport_compression")] self.write(&mut *writer, &x.ext_compression)?; + self.write(&mut *writer, &x.ext_patch)?; Ok(()) } @@ -100,6 +102,7 @@ where let ext_lowlatency: ext::lowlatency::StateAccept = self.read(&mut *reader)?; #[cfg(feature = "transport_compression")] let ext_compression: ext::compression::StateAccept = self.read(&mut *reader)?; + let ext_patch: ext::patch::StateAccept = self.read(&mut *reader)?; let cookie = Cookie { zid, @@ -117,6 +120,7 @@ where ext_lowlatency, #[cfg(feature = "transport_compression")] ext_compression, + ext_patch, }; Ok(cookie) @@ -188,6 +192,7 @@ impl Cookie { ext_lowlatency: ext::lowlatency::StateAccept::rand(), #[cfg(feature = "transport_compression")] ext_compression: ext::compression::StateAccept::rand(), + ext_patch: ext::patch::StateAccept::rand(), } } } diff --git a/io/zenoh-transport/src/unicast/establishment/ext/mod.rs b/io/zenoh-transport/src/unicast/establishment/ext/mod.rs index f4aafa832..269979b84 100644 --- a/io/zenoh-transport/src/unicast/establishment/ext/mod.rs +++ b/io/zenoh-transport/src/unicast/establishment/ext/mod.rs @@ -18,6 +18,7 @@ pub(crate) mod compression; pub(crate) mod lowlatency; #[cfg(feature = "transport_multilink")] pub(crate) mod multilink; +pub(crate) mod patch; pub(crate) mod qos; #[cfg(feature = "shared-memory")] pub(crate) mod shm; diff --git a/io/zenoh-transport/src/unicast/establishment/ext/patch.rs b/io/zenoh-transport/src/unicast/establishment/ext/patch.rs new file mode 100644 index 000000000..16509df20 --- /dev/null +++ b/io/zenoh-transport/src/unicast/establishment/ext/patch.rs @@ -0,0 +1,203 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use std::{cmp::min, marker::PhantomData}; + +use async_trait::async_trait; +use zenoh_buffers::{ + reader::{DidntRead, Reader}, + writer::{DidntWrite, Writer}, +}; +use zenoh_codec::{RCodec, WCodec, Zenoh080}; +use zenoh_protocol::transport::init::ext::PatchType; +use zenoh_result::{bail, Error as ZError}; + +use crate::unicast::establishment::{AcceptFsm, OpenFsm}; + +// Extension Fsm +pub(crate) struct PatchFsm<'a> { + _a: PhantomData<&'a ()>, +} + +impl PatchFsm<'_> { + pub(crate) const fn new() -> Self { + Self { _a: PhantomData } + } +} + +/*************************************/ +/* OPEN */ +/*************************************/ +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub(crate) struct StateOpen { + patch: PatchType, +} + +impl StateOpen { + pub(crate) const fn new() -> Self { + Self { + patch: PatchType::NONE, + } + } + + pub(crate) const fn get(&self) -> PatchType { + self.patch + } +} + +#[async_trait] +impl<'a> OpenFsm for &'a PatchFsm<'a> { + type Error = ZError; + + type SendInitSynIn = &'a StateOpen; + type SendInitSynOut = PatchType; + async fn send_init_syn( + self, + _state: Self::SendInitSynIn, + ) -> Result { + Ok(PatchType::CURRENT) + } + + type RecvInitAckIn = (&'a mut StateOpen, PatchType); + type RecvInitAckOut = (); + async fn recv_init_ack( + self, + input: Self::RecvInitAckIn, + ) -> Result { + let (state, other_ext) = input; + if other_ext > PatchType::CURRENT { + bail!( + "Acceptor patch should be lesser or equal to {current:?}, found {other:?}", + current = PatchType::CURRENT.raw(), + other = other_ext.raw(), + ); + } + state.patch = other_ext; + Ok(()) + } + + type SendOpenSynIn = &'a StateOpen; + type SendOpenSynOut = (); + async fn send_open_syn( + self, + _state: Self::SendOpenSynIn, + ) -> Result { + unimplemented!("There is no patch extension in OPEN") + } + + type RecvOpenAckIn = (&'a mut StateOpen, ()); + type RecvOpenAckOut = (); + async fn recv_open_ack( + self, + _state: Self::RecvOpenAckIn, + ) -> Result { + unimplemented!("There is no patch extension in OPEN") + } +} + +/*************************************/ +/* ACCEPT */ +/*************************************/ +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub(crate) struct StateAccept { + patch: PatchType, +} + +impl StateAccept { + pub(crate) const fn new() -> Self { + Self { + patch: PatchType::NONE, + } + } + + pub(crate) const fn get(&self) -> PatchType { + self.patch + } + + #[cfg(test)] + pub(crate) fn rand() -> Self { + Self { + patch: PatchType::rand(), + } + } +} + +// Codec +impl WCodec<&StateAccept, &mut W> for Zenoh080 +where + W: Writer, +{ + type Output = Result<(), DidntWrite>; + + fn write(self, writer: &mut W, x: &StateAccept) -> Self::Output { + let raw = x.patch.raw(); + self.write(&mut *writer, raw)?; + Ok(()) + } +} + +impl RCodec for Zenoh080 +where + R: Reader, +{ + type Error = DidntRead; + + fn read(self, reader: &mut R) -> Result { + let raw: u8 = self.read(&mut *reader)?; + let patch = PatchType::new(raw); + Ok(StateAccept { patch }) + } +} + +#[async_trait] +impl<'a> AcceptFsm for &'a PatchFsm<'a> { + type Error = ZError; + + type RecvInitSynIn = (&'a mut StateAccept, PatchType); + type RecvInitSynOut = (); + async fn recv_init_syn( + self, + input: Self::RecvInitSynIn, + ) -> Result { + let (state, other_ext) = input; + state.patch = other_ext; + Ok(()) + } + + type SendInitAckIn = &'a StateAccept; + type SendInitAckOut = PatchType; + async fn send_init_ack( + self, + state: Self::SendInitAckIn, + ) -> Result { + Ok(min(PatchType::CURRENT, state.patch)) + } + + type RecvOpenSynIn = (&'a mut StateAccept, ()); + type RecvOpenSynOut = (); + async fn recv_open_syn( + self, + _state: Self::RecvOpenSynIn, + ) -> Result { + unimplemented!("There is no patch extension in OPEN") + } + + type SendOpenAckIn = &'a StateAccept; + type SendOpenAckOut = (); + async fn send_open_ack( + self, + _state: Self::SendOpenAckIn, + ) -> Result { + unimplemented!("There is no patch extension in OPEN") + } +} diff --git a/io/zenoh-transport/src/unicast/establishment/open.rs b/io/zenoh-transport/src/unicast/establishment/open.rs index f3ce60b35..a4283e364 100644 --- a/io/zenoh-transport/src/unicast/establishment/open.rs +++ b/io/zenoh-transport/src/unicast/establishment/open.rs @@ -58,6 +58,7 @@ struct StateTransport { #[cfg(feature = "shared-memory")] ext_shm: ext::shm::StateOpen, ext_lowlatency: ext::lowlatency::StateOpen, + ext_patch: ext::patch::StateOpen, } #[cfg(any(feature = "transport_auth", feature = "transport_compression"))] @@ -124,6 +125,7 @@ struct OpenLink<'a> { ext_lowlatency: ext::lowlatency::LowLatencyFsm<'a>, #[cfg(feature = "transport_compression")] ext_compression: ext::compression::CompressionFsm<'a>, + ext_patch: ext::patch::PatchFsm<'a>, } #[async_trait] @@ -192,6 +194,13 @@ impl<'a, 'b: 'a> OpenFsm for &'a mut OpenLink<'b> { None ); + // Extension Patch + let ext_patch = self + .ext_patch + .send_init_syn(&state.transport.ext_patch) + .await + .map_err(|e| (e, Some(close::reason::GENERIC)))?; + let msg: TransportMessage = InitSyn { version: input.mine_version, whatami: input.mine_whatami, @@ -206,6 +215,7 @@ impl<'a, 'b: 'a> OpenFsm for &'a mut OpenLink<'b> { ext_mlink, ext_lowlatency, ext_compression, + ext_patch, } .into(); @@ -347,6 +357,12 @@ impl<'a, 'b: 'a> OpenFsm for &'a mut OpenLink<'b> { .await .map_err(|e| (e, Some(close::reason::GENERIC)))?; + // Extension Patch + self.ext_patch + .recv_init_ack((&mut state.transport.ext_patch, init_ack.ext_patch)) + .await + .map_err(|e| (e, Some(close::reason::GENERIC)))?; + let output = RecvInitAckOut { other_zid: init_ack.zid, other_whatami: init_ack.whatami, @@ -575,6 +591,7 @@ pub(crate) async fn open_link( ext_lowlatency: ext::lowlatency::LowLatencyFsm::new(), #[cfg(feature = "transport_compression")] ext_compression: ext::compression::CompressionFsm::new(), + ext_patch: ext::patch::PatchFsm::new(), }; // Clippy raises a warning because `batch_size::UNICAST` is currently equal to `BatchSize::MAX`. @@ -599,8 +616,8 @@ pub(crate) async fn open_link( .open(manager.config.unicast.max_links > 1), #[cfg(feature = "shared-memory")] ext_shm: ext::shm::StateOpen::new(), - ext_lowlatency: ext::lowlatency::StateOpen::new(manager.config.unicast.is_lowlatency), + ext_patch: ext::patch::StateOpen::new(), }, #[cfg(any(feature = "transport_auth", feature = "transport_compression"))] link: StateLink { @@ -669,6 +686,7 @@ pub(crate) async fn open_link( is_lowlatency: state.transport.ext_lowlatency.is_lowlatency(), #[cfg(feature = "auth_usrpwd")] auth_id: UsrPwdId(None), + patch: state.transport.ext_patch.get(), }; let o_config = TransportLinkUnicastConfig { diff --git a/io/zenoh-transport/src/unicast/mod.rs b/io/zenoh-transport/src/unicast/mod.rs index 4539135fe..ba0fd06d1 100644 --- a/io/zenoh-transport/src/unicast/mod.rs +++ b/io/zenoh-transport/src/unicast/mod.rs @@ -34,7 +34,7 @@ use zenoh_link::Link; use zenoh_protocol::{ core::{Bits, WhatAmI, ZenohIdProto}, network::NetworkMessage, - transport::{close, TransportSn}, + transport::{close, init::ext::PatchType, TransportSn}, }; use zenoh_result::{zerror, ZResult}; @@ -63,6 +63,7 @@ pub(crate) struct TransportConfigUnicast { pub(crate) is_lowlatency: bool, #[cfg(feature = "auth_usrpwd")] pub(crate) auth_id: UsrPwdId, + pub(crate) patch: PatchType, } /// [`TransportUnicast`] is the transport handler returned diff --git a/io/zenoh-transport/src/unicast/universal/rx.rs b/io/zenoh-transport/src/unicast/universal/rx.rs index e69a30587..3ac8a92ed 100644 --- a/io/zenoh-transport/src/unicast/universal/rx.rs +++ b/io/zenoh-transport/src/unicast/universal/rx.rs @@ -97,7 +97,7 @@ impl TransportUnicastUniversal { Reliability::BestEffort => zlock!(c.best_effort), }; - if !self.verify_sn(sn, &mut guard)? { + if !self.verify_sn("Frame", sn, &mut guard)? { // Drop invalid message and continue return Ok(()); } @@ -123,6 +123,8 @@ impl TransportUnicastUniversal { more, sn, ext_qos: qos, + ext_first, + ext_drop, payload, } = fragment; @@ -143,10 +145,25 @@ impl TransportUnicastUniversal { Reliability::BestEffort => zlock!(c.best_effort), }; - if !self.verify_sn(sn, &mut guard)? { + if !self.verify_sn("Fragment", sn, &mut guard)? { // Drop invalid message and continue return Ok(()); } + if self.config.patch.has_fragmentation_markers() { + if ext_first.is_some() { + guard.defrag.clear(); + } else if guard.defrag.is_empty() { + tracing::trace!( + "Transport: {}. First fragment received without start marker.", + self.manager.config.zid, + ); + return Ok(()); + } + if ext_drop.is_some() { + guard.defrag.clear(); + return Ok(()); + } + } if guard.defrag.is_empty() { let _ = guard.defrag.sync(sn); } @@ -178,14 +195,16 @@ impl TransportUnicastUniversal { fn verify_sn( &self, + message_type: &str, sn: TransportSn, guard: &mut MutexGuard<'_, TransportChannelRx>, ) -> ZResult { let precedes = guard.sn.roll(sn)?; if !precedes { tracing::trace!( - "Transport: {}. Frame with invalid SN dropped: {}. Expected: {}.", + "Transport: {}. {} with invalid SN dropped: {}. Expected: {}.", self.config.zid, + message_type, sn, guard.sn.next() ); diff --git a/io/zenoh-transport/tests/unicast_fragmentation.rs b/io/zenoh-transport/tests/unicast_fragmentation.rs new file mode 100644 index 000000000..f45e65702 --- /dev/null +++ b/io/zenoh-transport/tests/unicast_fragmentation.rs @@ -0,0 +1,349 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use std::{ + any::Any, + convert::TryFrom, + fmt::Write as _, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + time::Duration, +}; + +use lazy_static::lazy_static; +use zenoh_core::ztimeout; +use zenoh_link::Link; +use zenoh_protocol::{ + core::{CongestionControl, Encoding, EndPoint, Priority, WhatAmI, ZenohIdProto}, + network::{ + push::ext::{NodeIdType, QoSType}, + NetworkMessage, Push, + }, + zenoh::Put, +}; +use zenoh_result::ZResult; +use zenoh_transport::{ + multicast::TransportMulticast, + unicast::{test_helpers::make_transport_manager_builder, TransportUnicast}, + TransportEventHandler, TransportManager, TransportMulticastEventHandler, TransportPeer, + TransportPeerEventHandler, +}; + +const TIMEOUT: Duration = Duration::from_secs(60); +const SLEEP: Duration = Duration::from_secs(1); +const SLEEP_SEND: Duration = Duration::from_millis(1); + +const MSG_COUNT: usize = 100; +lazy_static! { + #[derive(Debug)] + static ref MSG: NetworkMessage = Push { + wire_expr: "test".into(), + // Set CongestionControl::Drop to test + ext_qos: QoSType::new(Priority::DEFAULT, CongestionControl::Drop, false), + ext_tstamp: None, + ext_nodeid: NodeIdType::DEFAULT, + payload: Put { + // 10 MB payload to stress fragmentation + payload: (0..10_000_000).map(|b| b as u8).collect::>().into(), + timestamp: None, + encoding: Encoding::empty(), + ext_sinfo: None, + #[cfg(feature = "shared-memory")] + ext_shm: None, + ext_attachment: None, + ext_unknown: vec![], + } + .into(), + } + .into(); +} + +// Transport Handler for the router +struct SHRouter { + count: Arc, +} + +impl Default for SHRouter { + fn default() -> Self { + Self { + count: Arc::new(AtomicUsize::new(0)), + } + } +} + +impl SHRouter { + fn get_count(&self) -> usize { + self.count.load(Ordering::SeqCst) + } +} + +impl TransportEventHandler for SHRouter { + fn new_unicast( + &self, + _peer: TransportPeer, + _transport: TransportUnicast, + ) -> ZResult> { + let arc = Arc::new(SCRouter::new(self.count.clone())); + Ok(arc) + } + + fn new_multicast( + &self, + _transport: TransportMulticast, + ) -> ZResult> { + panic!(); + } +} + +// Transport Callback for the router +pub struct SCRouter { + count: Arc, +} + +impl SCRouter { + pub fn new(count: Arc) -> Self { + Self { count } + } +} + +impl TransportPeerEventHandler for SCRouter { + fn handle_message(&self, message: NetworkMessage) -> ZResult<()> { + assert_eq!(message, *MSG); + self.count.fetch_add(1, Ordering::SeqCst); + std::thread::sleep(2 * SLEEP_SEND); + Ok(()) + } + + fn new_link(&self, _link: Link) {} + fn del_link(&self, _link: Link) {} + fn closed(&self) {} + + fn as_any(&self) -> &dyn Any { + self + } +} + +// Transport Handler for the client +#[derive(Default)] +struct SHClient; + +impl TransportEventHandler for SHClient { + fn new_unicast( + &self, + _peer: TransportPeer, + _transport: TransportUnicast, + ) -> ZResult> { + Ok(Arc::new(SCClient)) + } + + fn new_multicast( + &self, + _transport: TransportMulticast, + ) -> ZResult> { + panic!(); + } +} + +// Transport Callback for the client +#[derive(Default)] +pub struct SCClient; + +impl TransportPeerEventHandler for SCClient { + fn handle_message(&self, _message: NetworkMessage) -> ZResult<()> { + Ok(()) + } + + fn new_link(&self, _link: Link) {} + fn del_link(&self, _link: Link) {} + fn closed(&self) {} + + fn as_any(&self) -> &dyn Any { + self + } +} + +async fn open_transport_unicast( + client_endpoints: &[EndPoint], + server_endpoints: &[EndPoint], +) -> ( + TransportManager, + Arc, + TransportManager, + TransportUnicast, +) { + // Define client and router IDs + let client_id = ZenohIdProto::try_from([1]).unwrap(); + let router_id = ZenohIdProto::try_from([2]).unwrap(); + + // Create the router transport manager + let router_handler = Arc::new(SHRouter::default()); + let unicast = make_transport_manager_builder( + #[cfg(feature = "transport_multilink")] + server_endpoints.len(), + #[cfg(feature = "shared-memory")] + false, + false, + ); + let router_manager = TransportManager::builder() + .zid(router_id) + .whatami(WhatAmI::Router) + .unicast(unicast) + .build(router_handler.clone()) + .unwrap(); + + // Create the listener on the router + for e in server_endpoints.iter() { + println!("Add endpoint: {}", e); + let _ = ztimeout!(router_manager.add_listener(e.clone())).unwrap(); + } + + // Create the client transport manager + let unicast = make_transport_manager_builder( + #[cfg(feature = "transport_multilink")] + client_endpoints.len(), + #[cfg(feature = "shared-memory")] + false, + false, + ); + let client_manager = TransportManager::builder() + .whatami(WhatAmI::Client) + .zid(client_id) + .unicast(unicast) + .build(Arc::new(SHClient)) + .unwrap(); + + // Create an empty transport with the client + // Open transport -> This should be accepted + for e in client_endpoints.iter() { + println!("Opening transport with {}", e); + let _ = ztimeout!(client_manager.open_transport_unicast(e.clone())).unwrap(); + } + + let client_transport = ztimeout!(client_manager.get_transport_unicast(&router_id)).unwrap(); + + // Return the handlers + ( + router_manager, + router_handler, + client_manager, + client_transport, + ) +} + +async fn close_transport( + router_manager: TransportManager, + client_manager: TransportManager, + client_transport: TransportUnicast, + endpoints: &[EndPoint], +) { + // Close the client transport + let mut ee = String::new(); + for e in endpoints.iter() { + let _ = write!(ee, "{e} "); + } + println!("Closing transport with {}", ee); + ztimeout!(client_transport.close()).unwrap(); + + ztimeout!(async { + while !router_manager.get_transports_unicast().await.is_empty() { + tokio::time::sleep(SLEEP).await; + } + }); + + // Stop the locators on the manager + for e in endpoints.iter() { + println!("Del locator: {}", e); + ztimeout!(router_manager.del_listener(e)).unwrap(); + } + + ztimeout!(async { + while !router_manager.get_listeners().await.is_empty() { + tokio::time::sleep(SLEEP).await; + } + }); + + // Wait a little bit + tokio::time::sleep(SLEEP).await; + + ztimeout!(router_manager.close()); + ztimeout!(client_manager.close()); + + // Wait a little bit + tokio::time::sleep(SLEEP).await; +} + +async fn test_transport(router_handler: Arc, client_transport: TransportUnicast) { + println!("Sending {} messages...", MSG_COUNT); + + ztimeout!(async { + let mut sent = 0; + while router_handler.get_count() < MSG_COUNT { + if client_transport.schedule(MSG.clone()).is_ok() { + sent += 1; + println!( + "Sent: {sent}. Received: {}/{MSG_COUNT}", + router_handler.get_count() + ); + } + } + }); + + // Wait a little bit + tokio::time::sleep(SLEEP).await; +} + +async fn run_single(client_endpoints: &[EndPoint], server_endpoints: &[EndPoint]) { + println!( + "\n>>> Running test for: {:?}, {:?}", + client_endpoints, server_endpoints, + ); + + #[allow(unused_variables)] // Used when stats feature is enabled + let (router_manager, router_handler, client_manager, client_transport) = + open_transport_unicast(client_endpoints, server_endpoints).await; + + test_transport(router_handler.clone(), client_transport.clone()).await; + + #[cfg(feature = "stats")] + { + let c_stats = client_transport.get_stats().unwrap().report(); + println!("\tClient: {:?}", c_stats); + let r_stats = ztimeout!(router_manager.get_transport_unicast(&client_manager.config.zid)) + .unwrap() + .get_stats() + .map(|s| s.report()) + .unwrap(); + println!("\tRouter: {:?}", r_stats); + } + + close_transport( + router_manager, + client_manager, + client_transport, + client_endpoints, + ) + .await; +} + +#[cfg(feature = "transport_tcp")] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn fragmentation_unicast_tcp_only() { + zenoh_util::init_log_from_env_or("error"); + + // Define the locators + let endpoints: Vec = vec![format!("tcp/127.0.0.1:{}", 16800).parse().unwrap()]; + // Run + run_single(&endpoints, &endpoints).await; +}