From 3af20424427b0c98bd769ec054e8d68f793b5c40 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Fri, 15 Nov 2024 17:13:49 +0100 Subject: [PATCH 01/10] feat: add start/stop extension markers for fragment chain A fragment chain must begin with a start marker, otherwise the whole chain will be dropped. When a fragmented message is dropped by congestion control, a final fragment with stop marker must be emitted. Because no batch is available, an ephemeral batch, i.e. not recycled in the pipeline after, is created. --- commons/zenoh-codec/src/transport/fragment.rs | 38 +- .../zenoh-protocol/src/transport/fragment.rs | 24 +- io/zenoh-transport/src/common/batch.rs | 6 + io/zenoh-transport/src/common/pipeline.rs | 28 +- io/zenoh-transport/src/multicast/rx.rs | 10 + .../src/unicast/establishment/accept.rs | 2 + .../src/unicast/establishment/open.rs | 2 + .../src/unicast/universal/link.rs | 7 +- .../src/unicast/universal/rx.rs | 10 + .../tests/unicast_fragmentation.rs | 349 ++++++++++++++++++ 10 files changed, 468 insertions(+), 8 deletions(-) create mode 100644 io/zenoh-transport/tests/unicast_fragmentation.rs diff --git a/commons/zenoh-codec/src/transport/fragment.rs b/commons/zenoh-codec/src/transport/fragment.rs index fc30abce9d..c58709931d 100644 --- a/commons/zenoh-codec/src/transport/fragment.rs +++ b/commons/zenoh-codec/src/transport/fragment.rs @@ -39,6 +39,8 @@ where more, sn, ext_qos, + ext_start, + ext_stop, } = x; // Header @@ -49,7 +51,10 @@ where if *more { header |= flag::M; } - if ext_qos != &ext::QoSType::DEFAULT { + let mut n_exts = (ext_qos != &ext::QoSType::DEFAULT) as u8 + + ext_start.is_some() as u8 + + ext_stop.is_some() as u8; + if n_exts != 0 { header |= flag::Z; } self.write(&mut *writer, header)?; @@ -59,7 +64,16 @@ where // Extensions if ext_qos != &ext::QoSType::DEFAULT { - self.write(&mut *writer, (*ext_qos, false))?; + n_exts -= 1; + self.write(&mut *writer, (*ext_qos, n_exts != 0))?; + } + if let Some(start) = ext_start { + n_exts -= 1; + self.write(&mut *writer, (start, n_exts != 0))? + } + if let Some(stop) = ext_stop { + n_exts -= 1; + self.write(&mut *writer, (stop, n_exts != 0))? } Ok(()) @@ -99,6 +113,8 @@ where // Extensions let mut ext_qos = ext::QoSType::DEFAULT; + let mut ext_start = None; + let mut ext_stop = None; let mut has_ext = imsg::has_flag(self.header, flag::Z); while has_ext { @@ -110,6 +126,16 @@ where ext_qos = q; has_ext = ext; } + ext::Start::ID => { + let (start, ext): (ext::Start, bool) = eodec.read(&mut *reader)?; + ext_start = Some(start); + has_ext = ext; + } + ext::Stop::ID => { + let (stop, ext): (ext::Stop, bool) = eodec.read(&mut *reader)?; + ext_stop = Some(stop); + has_ext = ext; + } _ => { has_ext = extension::skip(reader, "Fragment", ext)?; } @@ -121,6 +147,8 @@ where more, sn, ext_qos, + ext_start, + ext_stop, }) } } @@ -139,6 +167,8 @@ where sn, payload, ext_qos, + ext_start, + ext_stop, } = x; // Header @@ -147,6 +177,8 @@ where more: *more, sn: *sn, ext_qos: *ext_qos, + ext_start: *ext_start, + ext_stop: *ext_stop, }; self.write(&mut *writer, &header)?; @@ -185,6 +217,8 @@ where more: header.more, sn: header.sn, ext_qos: header.ext_qos, + ext_start: header.ext_start, + ext_stop: header.ext_stop, payload, }) } diff --git a/commons/zenoh-protocol/src/transport/fragment.rs b/commons/zenoh-protocol/src/transport/fragment.rs index eccc7b80c0..e34c527302 100644 --- a/commons/zenoh-protocol/src/transport/fragment.rs +++ b/commons/zenoh-protocol/src/transport/fragment.rs @@ -75,14 +75,26 @@ pub struct Fragment { pub sn: TransportSn, pub payload: ZSlice, pub ext_qos: ext::QoSType, + pub ext_start: Option, + pub ext_stop: Option, } // Extensions pub mod ext { - use crate::{common::ZExtZ64, zextz64}; + use crate::{ + common::{ZExtUnit, ZExtZ64}, + zextunit, zextz64, + }; pub type QoS = zextz64!(0x1, true); pub type QoSType = crate::transport::ext::QoSType<{ QoS::ID }>; + + /// # Start extension + /// Mark the first fragment of a fragmented message + pub type Start = zextunit!(0x2, false); + /// # Stop extension + /// Indicate that the remaining fragments has been dropped + pub type Stop = zextunit!(0x3, false); } impl Fragment { @@ -97,6 +109,8 @@ impl Fragment { let sn: TransportSn = rng.gen(); let payload = ZSlice::rand(rng.gen_range(8..128)); let ext_qos = ext::QoSType::rand(); + let ext_start = rng.gen_bool(0.5).then(ext::Start::rand); + let ext_stop = rng.gen_bool(0.5).then(ext::Stop::rand); Fragment { reliability, @@ -104,6 +118,8 @@ impl Fragment { more, payload, ext_qos, + ext_start, + ext_stop, } } } @@ -115,6 +131,8 @@ pub struct FragmentHeader { pub more: bool, pub sn: TransportSn, pub ext_qos: ext::QoSType, + pub ext_start: Option, + pub ext_stop: Option, } impl FragmentHeader { @@ -128,12 +146,16 @@ impl FragmentHeader { let more = rng.gen_bool(0.5); let sn: TransportSn = rng.gen(); let ext_qos = ext::QoSType::rand(); + let ext_start = rng.gen_bool(0.5).then(ext::Start::rand); + let ext_stop = rng.gen_bool(0.5).then(ext::Stop::rand); FragmentHeader { reliability, more, sn, ext_qos, + ext_start, + ext_stop, } } } diff --git a/io/zenoh-transport/src/common/batch.rs b/io/zenoh-transport/src/common/batch.rs index 2c7316c7cb..9366de1a50 100644 --- a/io/zenoh-transport/src/common/batch.rs +++ b/io/zenoh-transport/src/common/batch.rs @@ -84,6 +84,9 @@ pub struct BatchConfig { pub is_streamed: bool, #[cfg(feature = "transport_compression")] pub is_compression: bool, + // an ephemeral batch will not be recycled in the pipeline + // it can be used to push a stop fragment when no batch are available + pub ephemeral: bool, } impl Default for BatchConfig { @@ -93,6 +96,7 @@ impl Default for BatchConfig { is_streamed: false, #[cfg(feature = "transport_compression")] is_compression: false, + ephemeral: false, } } } @@ -525,6 +529,7 @@ mod tests { is_streamed: rng.gen_bool(0.5), #[cfg(feature = "transport_compression")] is_compression: rng.gen_bool(0.5), + ephemeral: false, }; let mut wbatch = WBatch::new(config); wbatch.encode(&msg_in).unwrap(); @@ -566,6 +571,7 @@ mod tests { is_streamed: false, #[cfg(feature = "transport_compression")] is_compression: false, + ephemeral: false, }; let mut batch = WBatch::new(config); diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index 054a5e0a16..dda9d1304d 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -34,6 +34,7 @@ use zenoh_protocol::{ core::Priority, network::NetworkMessage, transport::{ + fragment, fragment::FragmentHeader, frame::{self, FrameHeader}, AtomicBatchSize, BatchSize, TransportMessage, @@ -202,6 +203,8 @@ struct StageIn { mutex: StageInMutex, fragbuf: ZBuf, batching: bool, + // used for stop fragment + ephermeral_batch_config: BatchConfig, } impl StageIn { @@ -318,19 +321,32 @@ impl StageIn { more: true, sn, ext_qos: frame.ext_qos, + ext_start: Some(fragment::ext::Start::new()), + ext_stop: None, }; let mut reader = self.fragbuf.reader(); while reader.can_read() { // Get the current serialization batch - // If deadline is reached, sequence number is incremented with `SeqNumGenerator::get` - // in order to break the fragment chain already sent. - batch = zgetbatch_rets!(let _ = tch.sn.get()); + batch = zgetbatch_rets!({ + // If no fragment has been sent, the sequence number is just reset + if fragment.ext_start.is_some() { + tch.sn.set(sn).unwrap() + // Otherwise, an ephemeral batch is created to send the stop fragment + } else { + let mut batch = WBatch::new(self.ephermeral_batch_config); + self.fragbuf.clear(); + fragment.ext_stop = Some(fragment::ext::Stop::new()); + let _ = batch.encode((&mut self.fragbuf.reader(), &mut fragment)); + self.s_out.move_batch(batch); + } + }); // Serialize the message fragment match batch.encode((&mut reader, &mut fragment)) { Ok(_) => { // Update the SN fragment.sn = tch.sn.get(); + fragment.ext_start = None; // Move the serialization batch into the OUT pipeline self.s_out.move_batch(batch); } @@ -641,6 +657,10 @@ impl TransmissionPipeline { }, fragbuf: ZBuf::empty(), batching: config.batching_enabled, + ephermeral_batch_config: BatchConfig { + ephemeral: true, + ..config.batch + }, })); // The stage out for this priority @@ -849,6 +869,7 @@ mod tests { is_streamed: true, #[cfg(feature = "transport_compression")] is_compression: true, + ephemeral: false, }, queue_size: [1; Priority::NUM], batching_enabled: true, @@ -863,6 +884,7 @@ mod tests { is_streamed: false, #[cfg(feature = "transport_compression")] is_compression: false, + ephemeral: false, }, queue_size: [1; Priority::NUM], batching_enabled: true, diff --git a/io/zenoh-transport/src/multicast/rx.rs b/io/zenoh-transport/src/multicast/rx.rs index 8562d5b3eb..5cdb8cc868 100644 --- a/io/zenoh-transport/src/multicast/rx.rs +++ b/io/zenoh-transport/src/multicast/rx.rs @@ -183,6 +183,8 @@ impl TransportMulticastInner { more, sn, ext_qos, + ext_start, + ext_stop, payload, } = fragment; @@ -209,7 +211,15 @@ impl TransportMulticastInner { // Drop invalid message and continue return Ok(()); } + if ext_stop.is_some() { + return Ok(()); + } if guard.defrag.is_empty() { + if ext_start.is_none() { + // TODO better message + tracing::warn!("a fragment chain was received without the start marker"); + return Ok(()); + } let _ = guard.defrag.sync(sn); } if let Err(e) = guard.defrag.push(sn, payload) { diff --git a/io/zenoh-transport/src/unicast/establishment/accept.rs b/io/zenoh-transport/src/unicast/establishment/accept.rs index db2310c32f..a9ff60dc00 100644 --- a/io/zenoh-transport/src/unicast/establishment/accept.rs +++ b/io/zenoh-transport/src/unicast/establishment/accept.rs @@ -657,6 +657,7 @@ pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) - is_streamed, #[cfg(feature = "transport_compression")] is_compression: false, + ephemeral: false, }, priorities: None, reliability: None, @@ -795,6 +796,7 @@ pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) - is_streamed, #[cfg(feature = "transport_compression")] is_compression: state.link.ext_compression.is_compression(), + ephemeral: false, }, priorities: state.transport.ext_qos.priorities(), reliability: state.transport.ext_qos.reliability(), diff --git a/io/zenoh-transport/src/unicast/establishment/open.rs b/io/zenoh-transport/src/unicast/establishment/open.rs index f3ce60b354..d219ec63b0 100644 --- a/io/zenoh-transport/src/unicast/establishment/open.rs +++ b/io/zenoh-transport/src/unicast/establishment/open.rs @@ -554,6 +554,7 @@ pub(crate) async fn open_link( is_streamed, #[cfg(feature = "transport_compression")] is_compression: false, // Perform the exchange Init/Open exchange with no compression + ephemeral: false, }, priorities: None, reliability: None, @@ -678,6 +679,7 @@ pub(crate) async fn open_link( is_streamed, #[cfg(feature = "transport_compression")] is_compression: state.link.ext_compression.is_compression(), + ephemeral: false, }, priorities: state.transport.ext_qos.priorities(), reliability: state.transport.ext_qos.reliability(), diff --git a/io/zenoh-transport/src/unicast/universal/link.rs b/io/zenoh-transport/src/unicast/universal/link.rs index 18011249be..45637f6e6b 100644 --- a/io/zenoh-transport/src/unicast/universal/link.rs +++ b/io/zenoh-transport/src/unicast/universal/link.rs @@ -60,6 +60,7 @@ impl TransportLinkUnicastUniversal { is_streamed: link.link.is_streamed(), #[cfg(feature = "transport_compression")] is_compression: link.config.batch.is_compression, + ephemeral: false, }, queue_size: transport.manager.config.queue_size, wait_before_drop: transport.manager.config.wait_before_drop, @@ -191,8 +192,10 @@ async fn tx_task( stats.inc_tx_bytes(batch.len() as usize); } - // Reinsert the batch into the queue - pipeline.refill(batch, priority); + if !batch.config.ephemeral { + // Reinsert the batch into the queue + pipeline.refill(batch, priority); + } }, Ok(None) => { // The queue has been disabled: break the tx loop, drain the queue, and exit diff --git a/io/zenoh-transport/src/unicast/universal/rx.rs b/io/zenoh-transport/src/unicast/universal/rx.rs index e69a305876..e1b140eb1d 100644 --- a/io/zenoh-transport/src/unicast/universal/rx.rs +++ b/io/zenoh-transport/src/unicast/universal/rx.rs @@ -123,6 +123,8 @@ impl TransportUnicastUniversal { more, sn, ext_qos: qos, + ext_start, + ext_stop, payload, } = fragment; @@ -147,7 +149,15 @@ impl TransportUnicastUniversal { // Drop invalid message and continue return Ok(()); } + if ext_stop.is_some() { + return Ok(()); + } if guard.defrag.is_empty() { + if ext_start.is_none() { + // TODO better message + tracing::warn!("a fragment chain was received without the start marker"); + return Ok(()); + } let _ = guard.defrag.sync(sn); } if let Err(e) = guard.defrag.push(sn, payload) { diff --git a/io/zenoh-transport/tests/unicast_fragmentation.rs b/io/zenoh-transport/tests/unicast_fragmentation.rs new file mode 100644 index 0000000000..f45e65702a --- /dev/null +++ b/io/zenoh-transport/tests/unicast_fragmentation.rs @@ -0,0 +1,349 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use std::{ + any::Any, + convert::TryFrom, + fmt::Write as _, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + time::Duration, +}; + +use lazy_static::lazy_static; +use zenoh_core::ztimeout; +use zenoh_link::Link; +use zenoh_protocol::{ + core::{CongestionControl, Encoding, EndPoint, Priority, WhatAmI, ZenohIdProto}, + network::{ + push::ext::{NodeIdType, QoSType}, + NetworkMessage, Push, + }, + zenoh::Put, +}; +use zenoh_result::ZResult; +use zenoh_transport::{ + multicast::TransportMulticast, + unicast::{test_helpers::make_transport_manager_builder, TransportUnicast}, + TransportEventHandler, TransportManager, TransportMulticastEventHandler, TransportPeer, + TransportPeerEventHandler, +}; + +const TIMEOUT: Duration = Duration::from_secs(60); +const SLEEP: Duration = Duration::from_secs(1); +const SLEEP_SEND: Duration = Duration::from_millis(1); + +const MSG_COUNT: usize = 100; +lazy_static! { + #[derive(Debug)] + static ref MSG: NetworkMessage = Push { + wire_expr: "test".into(), + // Set CongestionControl::Drop to test + ext_qos: QoSType::new(Priority::DEFAULT, CongestionControl::Drop, false), + ext_tstamp: None, + ext_nodeid: NodeIdType::DEFAULT, + payload: Put { + // 10 MB payload to stress fragmentation + payload: (0..10_000_000).map(|b| b as u8).collect::>().into(), + timestamp: None, + encoding: Encoding::empty(), + ext_sinfo: None, + #[cfg(feature = "shared-memory")] + ext_shm: None, + ext_attachment: None, + ext_unknown: vec![], + } + .into(), + } + .into(); +} + +// Transport Handler for the router +struct SHRouter { + count: Arc, +} + +impl Default for SHRouter { + fn default() -> Self { + Self { + count: Arc::new(AtomicUsize::new(0)), + } + } +} + +impl SHRouter { + fn get_count(&self) -> usize { + self.count.load(Ordering::SeqCst) + } +} + +impl TransportEventHandler for SHRouter { + fn new_unicast( + &self, + _peer: TransportPeer, + _transport: TransportUnicast, + ) -> ZResult> { + let arc = Arc::new(SCRouter::new(self.count.clone())); + Ok(arc) + } + + fn new_multicast( + &self, + _transport: TransportMulticast, + ) -> ZResult> { + panic!(); + } +} + +// Transport Callback for the router +pub struct SCRouter { + count: Arc, +} + +impl SCRouter { + pub fn new(count: Arc) -> Self { + Self { count } + } +} + +impl TransportPeerEventHandler for SCRouter { + fn handle_message(&self, message: NetworkMessage) -> ZResult<()> { + assert_eq!(message, *MSG); + self.count.fetch_add(1, Ordering::SeqCst); + std::thread::sleep(2 * SLEEP_SEND); + Ok(()) + } + + fn new_link(&self, _link: Link) {} + fn del_link(&self, _link: Link) {} + fn closed(&self) {} + + fn as_any(&self) -> &dyn Any { + self + } +} + +// Transport Handler for the client +#[derive(Default)] +struct SHClient; + +impl TransportEventHandler for SHClient { + fn new_unicast( + &self, + _peer: TransportPeer, + _transport: TransportUnicast, + ) -> ZResult> { + Ok(Arc::new(SCClient)) + } + + fn new_multicast( + &self, + _transport: TransportMulticast, + ) -> ZResult> { + panic!(); + } +} + +// Transport Callback for the client +#[derive(Default)] +pub struct SCClient; + +impl TransportPeerEventHandler for SCClient { + fn handle_message(&self, _message: NetworkMessage) -> ZResult<()> { + Ok(()) + } + + fn new_link(&self, _link: Link) {} + fn del_link(&self, _link: Link) {} + fn closed(&self) {} + + fn as_any(&self) -> &dyn Any { + self + } +} + +async fn open_transport_unicast( + client_endpoints: &[EndPoint], + server_endpoints: &[EndPoint], +) -> ( + TransportManager, + Arc, + TransportManager, + TransportUnicast, +) { + // Define client and router IDs + let client_id = ZenohIdProto::try_from([1]).unwrap(); + let router_id = ZenohIdProto::try_from([2]).unwrap(); + + // Create the router transport manager + let router_handler = Arc::new(SHRouter::default()); + let unicast = make_transport_manager_builder( + #[cfg(feature = "transport_multilink")] + server_endpoints.len(), + #[cfg(feature = "shared-memory")] + false, + false, + ); + let router_manager = TransportManager::builder() + .zid(router_id) + .whatami(WhatAmI::Router) + .unicast(unicast) + .build(router_handler.clone()) + .unwrap(); + + // Create the listener on the router + for e in server_endpoints.iter() { + println!("Add endpoint: {}", e); + let _ = ztimeout!(router_manager.add_listener(e.clone())).unwrap(); + } + + // Create the client transport manager + let unicast = make_transport_manager_builder( + #[cfg(feature = "transport_multilink")] + client_endpoints.len(), + #[cfg(feature = "shared-memory")] + false, + false, + ); + let client_manager = TransportManager::builder() + .whatami(WhatAmI::Client) + .zid(client_id) + .unicast(unicast) + .build(Arc::new(SHClient)) + .unwrap(); + + // Create an empty transport with the client + // Open transport -> This should be accepted + for e in client_endpoints.iter() { + println!("Opening transport with {}", e); + let _ = ztimeout!(client_manager.open_transport_unicast(e.clone())).unwrap(); + } + + let client_transport = ztimeout!(client_manager.get_transport_unicast(&router_id)).unwrap(); + + // Return the handlers + ( + router_manager, + router_handler, + client_manager, + client_transport, + ) +} + +async fn close_transport( + router_manager: TransportManager, + client_manager: TransportManager, + client_transport: TransportUnicast, + endpoints: &[EndPoint], +) { + // Close the client transport + let mut ee = String::new(); + for e in endpoints.iter() { + let _ = write!(ee, "{e} "); + } + println!("Closing transport with {}", ee); + ztimeout!(client_transport.close()).unwrap(); + + ztimeout!(async { + while !router_manager.get_transports_unicast().await.is_empty() { + tokio::time::sleep(SLEEP).await; + } + }); + + // Stop the locators on the manager + for e in endpoints.iter() { + println!("Del locator: {}", e); + ztimeout!(router_manager.del_listener(e)).unwrap(); + } + + ztimeout!(async { + while !router_manager.get_listeners().await.is_empty() { + tokio::time::sleep(SLEEP).await; + } + }); + + // Wait a little bit + tokio::time::sleep(SLEEP).await; + + ztimeout!(router_manager.close()); + ztimeout!(client_manager.close()); + + // Wait a little bit + tokio::time::sleep(SLEEP).await; +} + +async fn test_transport(router_handler: Arc, client_transport: TransportUnicast) { + println!("Sending {} messages...", MSG_COUNT); + + ztimeout!(async { + let mut sent = 0; + while router_handler.get_count() < MSG_COUNT { + if client_transport.schedule(MSG.clone()).is_ok() { + sent += 1; + println!( + "Sent: {sent}. Received: {}/{MSG_COUNT}", + router_handler.get_count() + ); + } + } + }); + + // Wait a little bit + tokio::time::sleep(SLEEP).await; +} + +async fn run_single(client_endpoints: &[EndPoint], server_endpoints: &[EndPoint]) { + println!( + "\n>>> Running test for: {:?}, {:?}", + client_endpoints, server_endpoints, + ); + + #[allow(unused_variables)] // Used when stats feature is enabled + let (router_manager, router_handler, client_manager, client_transport) = + open_transport_unicast(client_endpoints, server_endpoints).await; + + test_transport(router_handler.clone(), client_transport.clone()).await; + + #[cfg(feature = "stats")] + { + let c_stats = client_transport.get_stats().unwrap().report(); + println!("\tClient: {:?}", c_stats); + let r_stats = ztimeout!(router_manager.get_transport_unicast(&client_manager.config.zid)) + .unwrap() + .get_stats() + .map(|s| s.report()) + .unwrap(); + println!("\tRouter: {:?}", r_stats); + } + + close_transport( + router_manager, + client_manager, + client_transport, + client_endpoints, + ) + .await; +} + +#[cfg(feature = "transport_tcp")] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn fragmentation_unicast_tcp_only() { + zenoh_util::init_log_from_env_or("error"); + + // Define the locators + let endpoints: Vec = vec![format!("tcp/127.0.0.1:{}", 16800).parse().unwrap()]; + // Run + run_single(&endpoints, &endpoints).await; +} From ad8d0a4fb6898919822b2020939c94245e088aff Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Fri, 15 Nov 2024 17:22:16 +0100 Subject: [PATCH 02/10] refactor: move ephemeral flag out of batch config I've checked, it doesn't change the size of `WBatch` --- io/zenoh-transport/src/common/batch.rs | 21 +++++++++++++------ io/zenoh-transport/src/common/pipeline.rs | 11 +++------- .../src/unicast/establishment/accept.rs | 2 -- .../src/unicast/establishment/open.rs | 2 -- .../src/unicast/universal/link.rs | 3 +-- 5 files changed, 19 insertions(+), 20 deletions(-) diff --git a/io/zenoh-transport/src/common/batch.rs b/io/zenoh-transport/src/common/batch.rs index 9366de1a50..65150f728a 100644 --- a/io/zenoh-transport/src/common/batch.rs +++ b/io/zenoh-transport/src/common/batch.rs @@ -84,9 +84,6 @@ pub struct BatchConfig { pub is_streamed: bool, #[cfg(feature = "transport_compression")] pub is_compression: bool, - // an ephemeral batch will not be recycled in the pipeline - // it can be used to push a stop fragment when no batch are available - pub ephemeral: bool, } impl Default for BatchConfig { @@ -96,7 +93,6 @@ impl Default for BatchConfig { is_streamed: false, #[cfg(feature = "transport_compression")] is_compression: false, - ephemeral: false, } } } @@ -205,6 +201,9 @@ pub struct WBatch { // Statistics related to this batch #[cfg(feature = "stats")] pub stats: WBatchStats, + // an ephemeral batch will not be recycled in the pipeline + // it can be used to push a stop fragment when no batch are available + pub ephemeral: bool, } impl WBatch { @@ -213,6 +212,7 @@ impl WBatch { buffer: BBuf::with_capacity(config.mtu as usize), codec: Zenoh080Batch::new(), config, + ephemeral: false, #[cfg(feature = "stats")] stats: WBatchStats::default(), }; @@ -223,6 +223,17 @@ impl WBatch { batch } + pub fn new_ephemeral(config: BatchConfig) -> Self { + Self { + ephemeral: true, + ..Self::new(config) + } + } + + pub fn is_ephemeral(&self) -> bool { + self.ephemeral + } + /// Verify that the [`WBatch`] has no serialized bytes. #[inline(always)] pub fn is_empty(&self) -> bool { @@ -529,7 +540,6 @@ mod tests { is_streamed: rng.gen_bool(0.5), #[cfg(feature = "transport_compression")] is_compression: rng.gen_bool(0.5), - ephemeral: false, }; let mut wbatch = WBatch::new(config); wbatch.encode(&msg_in).unwrap(); @@ -571,7 +581,6 @@ mod tests { is_streamed: false, #[cfg(feature = "transport_compression")] is_compression: false, - ephemeral: false, }; let mut batch = WBatch::new(config); diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index dda9d1304d..3c00ee29db 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -204,7 +204,7 @@ struct StageIn { fragbuf: ZBuf, batching: bool, // used for stop fragment - ephermeral_batch_config: BatchConfig, + batch_config: BatchConfig, } impl StageIn { @@ -333,7 +333,7 @@ impl StageIn { tch.sn.set(sn).unwrap() // Otherwise, an ephemeral batch is created to send the stop fragment } else { - let mut batch = WBatch::new(self.ephermeral_batch_config); + let mut batch = WBatch::new_ephemeral(self.batch_config); self.fragbuf.clear(); fragment.ext_stop = Some(fragment::ext::Stop::new()); let _ = batch.encode((&mut self.fragbuf.reader(), &mut fragment)); @@ -657,10 +657,7 @@ impl TransmissionPipeline { }, fragbuf: ZBuf::empty(), batching: config.batching_enabled, - ephermeral_batch_config: BatchConfig { - ephemeral: true, - ..config.batch - }, + batch_config: config.batch, })); // The stage out for this priority @@ -869,7 +866,6 @@ mod tests { is_streamed: true, #[cfg(feature = "transport_compression")] is_compression: true, - ephemeral: false, }, queue_size: [1; Priority::NUM], batching_enabled: true, @@ -884,7 +880,6 @@ mod tests { is_streamed: false, #[cfg(feature = "transport_compression")] is_compression: false, - ephemeral: false, }, queue_size: [1; Priority::NUM], batching_enabled: true, diff --git a/io/zenoh-transport/src/unicast/establishment/accept.rs b/io/zenoh-transport/src/unicast/establishment/accept.rs index a9ff60dc00..db2310c32f 100644 --- a/io/zenoh-transport/src/unicast/establishment/accept.rs +++ b/io/zenoh-transport/src/unicast/establishment/accept.rs @@ -657,7 +657,6 @@ pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) - is_streamed, #[cfg(feature = "transport_compression")] is_compression: false, - ephemeral: false, }, priorities: None, reliability: None, @@ -796,7 +795,6 @@ pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) - is_streamed, #[cfg(feature = "transport_compression")] is_compression: state.link.ext_compression.is_compression(), - ephemeral: false, }, priorities: state.transport.ext_qos.priorities(), reliability: state.transport.ext_qos.reliability(), diff --git a/io/zenoh-transport/src/unicast/establishment/open.rs b/io/zenoh-transport/src/unicast/establishment/open.rs index d219ec63b0..f3ce60b354 100644 --- a/io/zenoh-transport/src/unicast/establishment/open.rs +++ b/io/zenoh-transport/src/unicast/establishment/open.rs @@ -554,7 +554,6 @@ pub(crate) async fn open_link( is_streamed, #[cfg(feature = "transport_compression")] is_compression: false, // Perform the exchange Init/Open exchange with no compression - ephemeral: false, }, priorities: None, reliability: None, @@ -679,7 +678,6 @@ pub(crate) async fn open_link( is_streamed, #[cfg(feature = "transport_compression")] is_compression: state.link.ext_compression.is_compression(), - ephemeral: false, }, priorities: state.transport.ext_qos.priorities(), reliability: state.transport.ext_qos.reliability(), diff --git a/io/zenoh-transport/src/unicast/universal/link.rs b/io/zenoh-transport/src/unicast/universal/link.rs index 45637f6e6b..8da4d11cd3 100644 --- a/io/zenoh-transport/src/unicast/universal/link.rs +++ b/io/zenoh-transport/src/unicast/universal/link.rs @@ -60,7 +60,6 @@ impl TransportLinkUnicastUniversal { is_streamed: link.link.is_streamed(), #[cfg(feature = "transport_compression")] is_compression: link.config.batch.is_compression, - ephemeral: false, }, queue_size: transport.manager.config.queue_size, wait_before_drop: transport.manager.config.wait_before_drop, @@ -192,7 +191,7 @@ async fn tx_task( stats.inc_tx_bytes(batch.len() as usize); } - if !batch.config.ephemeral { + if !batch.is_ephemeral() { // Reinsert the batch into the queue pipeline.refill(batch, priority); } From 9c78c57075084e43b245fa7d74af3b7fc836e1db Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Thu, 21 Nov 2024 17:25:24 +0100 Subject: [PATCH 03/10] feat: add a Patch extension to connection protocol --- commons/zenoh-codec/src/transport/init.rs | 30 ++- commons/zenoh-codec/src/transport/join.rs | 16 +- commons/zenoh-codec/src/transport/mod.rs | 40 ++++ commons/zenoh-codec/src/transport/open.rs | 30 ++- commons/zenoh-protocol/src/transport/init.rs | 13 ++ commons/zenoh-protocol/src/transport/join.rs | 15 +- commons/zenoh-protocol/src/transport/mod.rs | 38 ++++ commons/zenoh-protocol/src/transport/open.rs | 21 +- io/zenoh-transport/src/multicast/link.rs | 5 +- io/zenoh-transport/src/multicast/rx.rs | 4 +- io/zenoh-transport/src/multicast/transport.rs | 4 +- .../src/unicast/establishment/accept.rs | 38 +++- .../src/unicast/establishment/cookie.rs | 5 + .../src/unicast/establishment/ext/mod.rs | 1 + .../src/unicast/establishment/ext/patch.rs | 196 ++++++++++++++++++ .../src/unicast/establishment/open.rs | 22 +- io/zenoh-transport/src/unicast/mod.rs | 3 +- .../src/unicast/universal/rx.rs | 4 +- 18 files changed, 464 insertions(+), 21 deletions(-) create mode 100644 io/zenoh-transport/src/unicast/establishment/ext/patch.rs diff --git a/commons/zenoh-codec/src/transport/init.rs b/commons/zenoh-codec/src/transport/init.rs index dd47dc2c27..198c8ad28c 100644 --- a/commons/zenoh-codec/src/transport/init.rs +++ b/commons/zenoh-codec/src/transport/init.rs @@ -52,6 +52,7 @@ where ext_mlink, ext_lowlatency, ext_compression, + ext_patch, } = x; // Header @@ -64,7 +65,8 @@ where + (ext_auth.is_some() as u8) + (ext_mlink.is_some() as u8) + (ext_lowlatency.is_some() as u8) - + (ext_compression.is_some() as u8); + + (ext_compression.is_some() as u8) + + (*ext_patch != ext::PatchType::DEFAULT) as u8; #[cfg(feature = "shared-memory")] { @@ -125,6 +127,10 @@ where n_exts -= 1; self.write(&mut *writer, (compression, n_exts != 0))?; } + if *ext_patch != ext::PatchType::DEFAULT { + n_exts -= 1; + self.write(&mut *writer, (*ext_patch, n_exts != 0))?; + } Ok(()) } @@ -186,6 +192,7 @@ where let mut ext_mlink = None; let mut ext_lowlatency = None; let mut ext_compression = None; + let mut ext_patch = ext::PatchType::DEFAULT; let mut has_ext = imsg::has_flag(self.header, flag::Z); while has_ext { @@ -228,6 +235,11 @@ where ext_compression = Some(q); has_ext = ext; } + ext::Patch::ID => { + let (p, ext): (ext::PatchType, bool) = eodec.read(&mut *reader)?; + ext_patch = p; + has_ext = ext; + } _ => { has_ext = extension::skip(reader, "InitSyn", ext)?; } @@ -248,6 +260,7 @@ where ext_mlink, ext_lowlatency, ext_compression, + ext_patch, }) } } @@ -275,6 +288,7 @@ where ext_mlink, ext_lowlatency, ext_compression, + ext_patch, } = x; // Header @@ -287,7 +301,8 @@ where + (ext_auth.is_some() as u8) + (ext_mlink.is_some() as u8) + (ext_lowlatency.is_some() as u8) - + (ext_compression.is_some() as u8); + + (ext_compression.is_some() as u8) + + (*ext_patch != ext::PatchType::DEFAULT) as u8; #[cfg(feature = "shared-memory")] { @@ -351,6 +366,10 @@ where n_exts -= 1; self.write(&mut *writer, (compression, n_exts != 0))?; } + if *ext_patch != ext::PatchType::DEFAULT { + n_exts -= 1; + self.write(&mut *writer, (*ext_patch, n_exts != 0))?; + } Ok(()) } @@ -415,6 +434,7 @@ where let mut ext_mlink = None; let mut ext_lowlatency = None; let mut ext_compression = None; + let mut ext_patch = ext::PatchType::DEFAULT; let mut has_ext = imsg::has_flag(self.header, flag::Z); while has_ext { @@ -457,6 +477,11 @@ where ext_compression = Some(q); has_ext = ext; } + ext::Patch::ID => { + let (p, ext): (ext::PatchType, bool) = eodec.read(&mut *reader)?; + ext_patch = p; + has_ext = ext; + } _ => { has_ext = extension::skip(reader, "InitAck", ext)?; } @@ -478,6 +503,7 @@ where ext_mlink, ext_lowlatency, ext_compression, + ext_patch, }) } } diff --git a/commons/zenoh-codec/src/transport/join.rs b/commons/zenoh-codec/src/transport/join.rs index 3f70d2ec8b..e1d8752134 100644 --- a/commons/zenoh-codec/src/transport/join.rs +++ b/commons/zenoh-codec/src/transport/join.rs @@ -150,6 +150,7 @@ where next_sn, ext_qos, ext_shm, + ext_patch, } = x; // Header @@ -160,7 +161,9 @@ where if resolution != &Resolution::default() || batch_size != &batch_size::MULTICAST { header |= flag::S; } - let mut n_exts = (ext_qos.is_some() as u8) + (ext_shm.is_some() as u8); + let mut n_exts = (ext_qos.is_some() as u8) + + (ext_shm.is_some() as u8) + + (*ext_patch != ext::PatchType::DEFAULT) as u8; if n_exts != 0 { header |= flag::Z; } @@ -201,6 +204,10 @@ where n_exts -= 1; self.write(&mut *writer, (shm, n_exts != 0))?; } + if *ext_patch != ext::PatchType::DEFAULT { + n_exts -= 1; + self.write(&mut *writer, (*ext_patch, n_exts != 0))?; + } Ok(()) } @@ -264,6 +271,7 @@ where // Extensions let mut ext_qos = None; let mut ext_shm = None; + let mut ext_patch = ext::PatchType::DEFAULT; let mut has_ext = imsg::has_flag(self.header, flag::Z); while has_ext { @@ -280,6 +288,11 @@ where ext_shm = Some(s); has_ext = ext; } + ext::Patch::ID => { + let (p, ext): (ext::PatchType, bool) = eodec.read(&mut *reader)?; + ext_patch = p; + has_ext = ext; + } _ => { has_ext = extension::skip(reader, "Join", ext)?; } @@ -296,6 +309,7 @@ where next_sn, ext_qos, ext_shm, + ext_patch, }) } } diff --git a/commons/zenoh-codec/src/transport/mod.rs b/commons/zenoh-codec/src/transport/mod.rs index 3adae0fb72..973eac7e1a 100644 --- a/commons/zenoh-codec/src/transport/mod.rs +++ b/commons/zenoh-codec/src/transport/mod.rs @@ -176,3 +176,43 @@ where Ok((ext.into(), more)) } } + +// Extensions: Patch +impl WCodec<(ext::PatchType<{ ID }>, bool), &mut W> for Zenoh080 +where + W: Writer, +{ + type Output = Result<(), DidntWrite>; + + fn write(self, writer: &mut W, x: (ext::PatchType<{ ID }>, bool)) -> Self::Output { + let (x, more) = x; + let ext: ZExtZ64<{ ID }> = x.into(); + + self.write(&mut *writer, (&ext, more)) + } +} + +impl RCodec<(ext::PatchType<{ ID }>, bool), &mut R> for Zenoh080 +where + R: Reader, +{ + type Error = DidntRead; + + fn read(self, reader: &mut R) -> Result<(ext::PatchType<{ ID }>, bool), Self::Error> { + let header: u8 = self.read(&mut *reader)?; + let codec = Zenoh080Header::new(header); + codec.read(reader) + } +} + +impl RCodec<(ext::PatchType<{ ID }>, bool), &mut R> for Zenoh080Header +where + R: Reader, +{ + type Error = DidntRead; + + fn read(self, reader: &mut R) -> Result<(ext::PatchType<{ ID }>, bool), Self::Error> { + let (ext, more): (ZExtZ64<{ ID }>, bool) = self.read(&mut *reader)?; + Ok((ext.into(), more)) + } +} diff --git a/commons/zenoh-codec/src/transport/open.rs b/commons/zenoh-codec/src/transport/open.rs index 712fe5ca95..3285da0a3b 100644 --- a/commons/zenoh-codec/src/transport/open.rs +++ b/commons/zenoh-codec/src/transport/open.rs @@ -48,6 +48,7 @@ where ext_mlink, ext_lowlatency, ext_compression, + ext_patch, } = x; // Header @@ -59,7 +60,8 @@ where + (ext_auth.is_some() as u8) + (ext_mlink.is_some() as u8) + (ext_lowlatency.is_some() as u8) - + (ext_compression.is_some() as u8); + + (ext_compression.is_some() as u8) + + (*ext_patch != ext::PatchType::DEFAULT) as u8; #[cfg(feature = "shared-memory")] { @@ -106,6 +108,10 @@ where n_exts -= 1; self.write(&mut *writer, (compression, n_exts != 0))?; } + if *ext_patch != ext::PatchType::DEFAULT { + n_exts -= 1; + self.write(&mut *writer, (*ext_patch, n_exts != 0))?; + } Ok(()) } @@ -153,6 +159,7 @@ where let mut ext_mlink = None; let mut ext_lowlatency = None; let mut ext_compression = None; + let mut ext_patch = ext::PatchType::DEFAULT; let mut has_ext = imsg::has_flag(self.header, flag::Z); while has_ext { @@ -190,6 +197,11 @@ where ext_compression = Some(q); has_ext = ext; } + ext::Patch::ID => { + let (p, ext): (ext::PatchType, bool) = eodec.read(&mut *reader)?; + ext_patch = p; + has_ext = ext; + } _ => { has_ext = extension::skip(reader, "OpenSyn", ext)?; } @@ -207,6 +219,7 @@ where ext_mlink, ext_lowlatency, ext_compression, + ext_patch, }) } } @@ -229,6 +242,7 @@ where ext_mlink, ext_lowlatency, ext_compression, + ext_patch, } = x; // Header @@ -242,7 +256,8 @@ where + (ext_auth.is_some() as u8) + (ext_mlink.is_some() as u8) + (ext_lowlatency.is_some() as u8) - + (ext_compression.is_some() as u8); + + (ext_compression.is_some() as u8) + + (*ext_patch != ext::PatchType::DEFAULT) as u8; #[cfg(feature = "shared-memory")] { @@ -288,6 +303,10 @@ where n_exts -= 1; self.write(&mut *writer, (compression, n_exts != 0))?; } + if *ext_patch != ext::PatchType::DEFAULT { + n_exts -= 1; + self.write(&mut *writer, (*ext_patch, n_exts != 0))?; + } Ok(()) } @@ -334,6 +353,7 @@ where let mut ext_mlink = None; let mut ext_lowlatency = None; let mut ext_compression = None; + let mut ext_patch = ext::PatchType::DEFAULT; let mut has_ext = imsg::has_flag(self.header, flag::Z); while has_ext { @@ -371,6 +391,11 @@ where ext_compression = Some(q); has_ext = ext; } + ext::Patch::ID => { + let (p, ext): (ext::PatchType, bool) = eodec.read(&mut *reader)?; + ext_patch = p; + has_ext = ext; + } _ => { has_ext = extension::skip(reader, "OpenAck", ext)?; } @@ -387,6 +412,7 @@ where ext_mlink, ext_lowlatency, ext_compression, + ext_patch, }) } } diff --git a/commons/zenoh-protocol/src/transport/init.rs b/commons/zenoh-protocol/src/transport/init.rs index e9660ea7ec..574aa1faec 100644 --- a/commons/zenoh-protocol/src/transport/init.rs +++ b/commons/zenoh-protocol/src/transport/init.rs @@ -122,6 +122,7 @@ pub struct InitSyn { pub ext_mlink: Option, pub ext_lowlatency: Option, pub ext_compression: Option, + pub ext_patch: ext::PatchType, } // Extensions @@ -156,6 +157,13 @@ pub mod ext { /// # Compression extension /// Used to negotiate the use of compression on the link pub type Compression = zextunit!(0x6, false); + + /// # Patch extension + /// Used to negotiate the patch version of the protocol + /// if not present (or 0), then protocol as released with 1.0.0 + /// if >= 1, then fragmentation start/stop marker + pub type Patch = zextz64!(0x7, false); + pub type PatchType = crate::transport::ext::PatchType<{ Patch::ID }>; } impl InitSyn { @@ -180,6 +188,7 @@ impl InitSyn { let ext_mlink = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); let ext_lowlatency = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); let ext_compression = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); + let ext_patch = ext::PatchType::rand(); Self { version, @@ -195,6 +204,7 @@ impl InitSyn { ext_mlink, ext_lowlatency, ext_compression, + ext_patch, } } } @@ -215,6 +225,7 @@ pub struct InitAck { pub ext_mlink: Option, pub ext_lowlatency: Option, pub ext_compression: Option, + pub ext_patch: ext::PatchType, } impl InitAck { @@ -244,6 +255,7 @@ impl InitAck { let ext_mlink = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); let ext_lowlatency = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); let ext_compression = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); + let ext_patch = ext::PatchType::rand(); Self { version, @@ -260,6 +272,7 @@ impl InitAck { ext_mlink, ext_lowlatency, ext_compression, + ext_patch, } } } diff --git a/commons/zenoh-protocol/src/transport/join.rs b/commons/zenoh-protocol/src/transport/join.rs index 26ed290c04..31058e8107 100644 --- a/commons/zenoh-protocol/src/transport/join.rs +++ b/commons/zenoh-protocol/src/transport/join.rs @@ -102,6 +102,7 @@ pub struct Join { pub next_sn: PrioritySn, pub ext_qos: Option, pub ext_shm: Option, + pub ext_patch: ext::PatchType, } pub mod flag { @@ -115,7 +116,10 @@ pub mod ext { use alloc::boxed::Box; use super::{Priority, PrioritySn}; - use crate::{common::ZExtZBuf, zextzbuf}; + use crate::{ + common::{ZExtZ64, ZExtZBuf}, + zextz64, zextzbuf, + }; /// # QoS extension /// Used to announce next sn when QoS is enabled @@ -125,6 +129,13 @@ pub mod ext { /// # Shm extension /// Used to advertise shared memory capabilities pub type Shm = zextzbuf!(0x2, true); + + /// # Patch extension + /// Used to negotiate the patch version of the protocol + /// if not present (or 0), then protocol as released with 1.0.0 + /// if >= 1, then fragmentation start/stop marker + pub type Patch = zextz64!(0x3, false); + pub type PatchType = crate::transport::ext::PatchType<{ Patch::ID }>; } impl Join { @@ -151,6 +162,7 @@ impl Join { .gen_bool(0.5) .then_some(Box::new([PrioritySn::rand(); Priority::NUM])); let ext_shm = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); + let ext_patch = ext::PatchType::rand(); Self { version, @@ -162,6 +174,7 @@ impl Join { next_sn, ext_qos, ext_shm, + ext_patch, } } } diff --git a/commons/zenoh-protocol/src/transport/mod.rs b/commons/zenoh-protocol/src/transport/mod.rs index ba2ac32c4a..c219f726ec 100644 --- a/commons/zenoh-protocol/src/transport/mod.rs +++ b/commons/zenoh-protocol/src/transport/mod.rs @@ -311,4 +311,42 @@ pub mod ext { ZExtZ64::new(ext.inner as u64) } } + + #[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] + pub struct PatchType(u8); + + impl PatchType { + pub const DEFAULT: Self = Self(0); + pub const CURRENT: Self = Self(1); + + pub fn new(int: u8) -> Self { + Self(int) + } + + pub fn raw(self) -> u8 { + self.0 + } + + pub fn has_fragmentation_start_stop(&self) -> bool { + self.0 >= 1 + } + + #[cfg(feature = "test")] + pub fn rand() -> Self { + use rand::Rng; + Self(rand::thread_rng().gen()) + } + } + + impl From> for PatchType { + fn from(ext: ZExtZ64) -> Self { + Self(ext.value as u8) + } + } + + impl From> for ZExtZ64 { + fn from(ext: PatchType) -> Self { + ZExtZ64::new(ext.0 as u64) + } + } } diff --git a/commons/zenoh-protocol/src/transport/open.rs b/commons/zenoh-protocol/src/transport/open.rs index 8042eeb634..64668b9533 100644 --- a/commons/zenoh-protocol/src/transport/open.rs +++ b/commons/zenoh-protocol/src/transport/open.rs @@ -86,17 +86,14 @@ pub struct OpenSyn { pub ext_mlink: Option, pub ext_lowlatency: Option, pub ext_compression: Option, + pub ext_patch: ext::PatchType, } // Extensions pub mod ext { - #[cfg(feature = "shared-memory")] - use crate::common::ZExtZ64; - #[cfg(feature = "shared-memory")] - use crate::zextz64; use crate::{ - common::{ZExtUnit, ZExtZBuf}, - zextunit, zextzbuf, + common::{ZExtUnit, ZExtZ64, ZExtZBuf}, + zextunit, zextz64, zextzbuf, }; /// # QoS extension @@ -124,6 +121,13 @@ pub mod ext { /// # Compression extension /// Used to negotiate the use of compression on the link pub type Compression = zextunit!(0x6, false); + + /// # Patch extension + /// Used to negotiate the patch version of the protocol + /// if not present (or 0), then protocol as released with 1.0.0 + /// if >= 1, then fragmentation start/stop marker + pub type Patch = zextz64!(0x7, false); + pub type PatchType = crate::transport::ext::PatchType<{ Patch::ID }>; } impl OpenSyn { @@ -155,6 +159,7 @@ impl OpenSyn { let ext_mlink = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); let ext_lowlatency = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); let ext_compression = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); + let ext_patch = ext::PatchType::rand(); Self { lease, @@ -167,6 +172,7 @@ impl OpenSyn { ext_mlink, ext_lowlatency, ext_compression, + ext_patch, } } } @@ -182,6 +188,7 @@ pub struct OpenAck { pub ext_mlink: Option, pub ext_lowlatency: Option, pub ext_compression: Option, + pub ext_patch: ext::PatchType, } impl OpenAck { @@ -209,6 +216,7 @@ impl OpenAck { let ext_mlink = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); let ext_lowlatency = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); let ext_compression = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); + let ext_patch = ext::PatchType::rand(); Self { lease, @@ -220,6 +228,7 @@ impl OpenAck { ext_mlink, ext_lowlatency, ext_compression, + ext_patch, } } } diff --git a/io/zenoh-transport/src/multicast/link.rs b/io/zenoh-transport/src/multicast/link.rs index c4c23290ee..4f08ec7b30 100644 --- a/io/zenoh-transport/src/multicast/link.rs +++ b/io/zenoh-transport/src/multicast/link.rs @@ -24,7 +24,9 @@ use zenoh_core::{zcondfeat, zlock}; use zenoh_link::{LinkMulticast, Locator}; use zenoh_protocol::{ core::{Bits, Priority, Resolution, WhatAmI, ZenohIdProto}, - transport::{BatchSize, Close, Join, PrioritySn, TransportMessage, TransportSn}, + transport::{ + join::ext::PatchType, BatchSize, Close, Join, PrioritySn, TransportMessage, TransportSn, + }, }; use zenoh_result::{zerror, ZResult}; use zenoh_sync::{RecyclingObject, RecyclingObjectPool, Signal}; @@ -495,6 +497,7 @@ async fn tx_task( next_sn, ext_qos, ext_shm: None, + ext_patch: PatchType::CURRENT } .into(); diff --git a/io/zenoh-transport/src/multicast/rx.rs b/io/zenoh-transport/src/multicast/rx.rs index 5cdb8cc868..0873fdf3cf 100644 --- a/io/zenoh-transport/src/multicast/rx.rs +++ b/io/zenoh-transport/src/multicast/rx.rs @@ -211,11 +211,11 @@ impl TransportMulticastInner { // Drop invalid message and continue return Ok(()); } - if ext_stop.is_some() { + if peer.patch.has_fragmentation_start_stop() && ext_stop.is_some() { return Ok(()); } if guard.defrag.is_empty() { - if ext_start.is_none() { + if peer.patch.has_fragmentation_start_stop() && ext_start.is_none() { // TODO better message tracing::warn!("a fragment chain was received without the start marker"); return Ok(()); diff --git a/io/zenoh-transport/src/multicast/transport.rs b/io/zenoh-transport/src/multicast/transport.rs index bcccaa9a85..ee374b75e1 100644 --- a/io/zenoh-transport/src/multicast/transport.rs +++ b/io/zenoh-transport/src/multicast/transport.rs @@ -25,7 +25,7 @@ use zenoh_core::{zcondfeat, zread, zwrite}; use zenoh_link::{Link, Locator}; use zenoh_protocol::{ core::{Bits, Field, Priority, Resolution, WhatAmI, ZenohIdProto}, - transport::{batch_size, close, Close, Join, TransportMessage}, + transport::{batch_size, close, join::ext::PatchType, Close, Join, TransportMessage}, }; use zenoh_result::{bail, ZResult}; use zenoh_task::TaskController; @@ -61,6 +61,7 @@ pub(super) struct TransportMulticastPeer { token: CancellationToken, pub(super) priority_rx: Box<[TransportPriorityRx]>, pub(super) handler: Arc, + pub(super) patch: PatchType, } impl TransportMulticastPeer { @@ -417,6 +418,7 @@ impl TransportMulticastInner { token, priority_rx, handler, + patch: join.ext_patch, }; zwrite!(self.peers).insert(locator.clone(), peer); diff --git a/io/zenoh-transport/src/unicast/establishment/accept.rs b/io/zenoh-transport/src/unicast/establishment/accept.rs index db2310c32f..426ecf82e1 100644 --- a/io/zenoh-transport/src/unicast/establishment/accept.rs +++ b/io/zenoh-transport/src/unicast/establishment/accept.rs @@ -61,6 +61,7 @@ struct StateTransport { #[cfg(feature = "shared-memory")] ext_shm: ext::shm::StateAccept, ext_lowlatency: ext::lowlatency::StateAccept, + ext_patch: ext::patch::StateAccept, } #[cfg(any(feature = "transport_auth", feature = "transport_compression"))] @@ -143,6 +144,7 @@ struct AcceptLink<'a> { ext_lowlatency: ext::lowlatency::LowLatencyFsm<'a>, #[cfg(feature = "transport_compression")] ext_compression: ext::compression::CompressionFsm<'a>, + ext_patch: ext::patch::PatchFsm<'a>, } #[async_trait] @@ -268,6 +270,12 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { .await .map_err(|e| (e, Some(close::reason::GENERIC)))?; + // Extension Patch + self.ext_patch + .recv_init_syn((&mut state.transport.ext_patch, init_syn.ext_patch)) + .await + .map_err(|e| (e, Some(close::reason::GENERIC)))?; + let output = RecvInitSynOut { other_zid: init_syn.zid, other_whatami: init_syn.whatami, @@ -330,7 +338,7 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { .await .map_err(|e| (e, Some(close::reason::GENERIC)))?; - // Extension MultiLink + // Extension Compression let ext_compression = zcondfeat!( "transport_compression", self.ext_compression @@ -340,6 +348,13 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { None ); + // Extension Patch + let ext_patch = self + .ext_patch + .send_init_ack(&state.transport.ext_patch) + .await + .map_err(|e| (e, Some(close::reason::GENERIC)))?; + // Create the cookie let cookie_nonce: u64 = zasynclock!(self.prng).gen(); let cookie = Cookie { @@ -358,6 +373,7 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { ext_lowlatency: state.transport.ext_lowlatency, #[cfg(feature = "transport_compression")] ext_compression: state.link.ext_compression, + ext_patch: state.transport.ext_patch, }; let mut encrypted = vec![]; @@ -391,6 +407,7 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { ext_mlink, ext_lowlatency, ext_compression, + ext_patch, } .into(); @@ -491,6 +508,7 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { #[cfg(feature = "shared-memory")] ext_shm: cookie.ext_shm, ext_lowlatency: cookie.ext_lowlatency, + ext_patch: cookie.ext_patch, }, #[cfg(any(feature = "transport_auth", feature = "transport_compression"))] link: StateLink { @@ -545,6 +563,13 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { .await .map_err(|e| (e, Some(close::reason::GENERIC)))?; + // Extension Patch + #[cfg(feature = "transport_compression")] + self.ext_patch + .recv_open_syn((&mut state.transport.ext_patch, open_syn.ext_patch)) + .await + .map_err(|e| (e, Some(close::reason::GENERIC)))?; + let output = RecvOpenSynOut { other_zid: cookie.zid, other_whatami: cookie.whatami, @@ -618,6 +643,13 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { None ); + // Extension Patch + let ext_patch = self + .ext_patch + .send_open_ack(&state.transport.ext_patch) + .await + .map_err(|e| (e, Some(close::reason::GENERIC)))?; + // Build OpenAck message let mine_initial_sn = compute_sn(input.mine_zid, input.other_zid, state.transport.resolution); @@ -631,6 +663,7 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { ext_mlink, ext_lowlatency, ext_compression, + ext_patch, }; // Do not send the OpenAck right now since we might still incur in MAX_LINKS error @@ -681,6 +714,7 @@ pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) - ext_lowlatency: ext::lowlatency::LowLatencyFsm::new(), #[cfg(feature = "transport_compression")] ext_compression: ext::compression::CompressionFsm::new(), + ext_patch: ext::patch::PatchFsm::new(), }; // Init handshake @@ -719,6 +753,7 @@ pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) - ext_lowlatency: ext::lowlatency::StateAccept::new( manager.config.unicast.is_lowlatency, ), + ext_patch: ext::patch::StateAccept::new(), }, #[cfg(any(feature = "transport_auth", feature = "transport_compression"))] link: StateLink { @@ -786,6 +821,7 @@ pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) - is_lowlatency: state.transport.ext_lowlatency.is_lowlatency(), #[cfg(feature = "auth_usrpwd")] auth_id: osyn_out.other_auth_id, + patch: state.transport.ext_patch.get(), }; let a_config = TransportLinkUnicastConfig { diff --git a/io/zenoh-transport/src/unicast/establishment/cookie.rs b/io/zenoh-transport/src/unicast/establishment/cookie.rs index 4220f8e08b..58f56fa92a 100644 --- a/io/zenoh-transport/src/unicast/establishment/cookie.rs +++ b/io/zenoh-transport/src/unicast/establishment/cookie.rs @@ -44,6 +44,7 @@ pub(crate) struct Cookie { pub(crate) ext_lowlatency: ext::lowlatency::StateAccept, #[cfg(feature = "transport_compression")] pub(crate) ext_compression: ext::compression::StateAccept, + pub(crate) ext_patch: ext::patch::StateAccept, } impl WCodec<&Cookie, &mut W> for Zenoh080 @@ -70,6 +71,7 @@ where self.write(&mut *writer, &x.ext_lowlatency)?; #[cfg(feature = "transport_compression")] self.write(&mut *writer, &x.ext_compression)?; + self.write(&mut *writer, &x.ext_patch)?; Ok(()) } @@ -100,6 +102,7 @@ where let ext_lowlatency: ext::lowlatency::StateAccept = self.read(&mut *reader)?; #[cfg(feature = "transport_compression")] let ext_compression: ext::compression::StateAccept = self.read(&mut *reader)?; + let ext_patch: ext::patch::StateAccept = self.read(&mut *reader)?; let cookie = Cookie { zid, @@ -117,6 +120,7 @@ where ext_lowlatency, #[cfg(feature = "transport_compression")] ext_compression, + ext_patch, }; Ok(cookie) @@ -188,6 +192,7 @@ impl Cookie { ext_lowlatency: ext::lowlatency::StateAccept::rand(), #[cfg(feature = "transport_compression")] ext_compression: ext::compression::StateAccept::rand(), + ext_patch: ext::patch::StateAccept::rand(), } } } diff --git a/io/zenoh-transport/src/unicast/establishment/ext/mod.rs b/io/zenoh-transport/src/unicast/establishment/ext/mod.rs index f4aafa832c..269979b84e 100644 --- a/io/zenoh-transport/src/unicast/establishment/ext/mod.rs +++ b/io/zenoh-transport/src/unicast/establishment/ext/mod.rs @@ -18,6 +18,7 @@ pub(crate) mod compression; pub(crate) mod lowlatency; #[cfg(feature = "transport_multilink")] pub(crate) mod multilink; +pub(crate) mod patch; pub(crate) mod qos; #[cfg(feature = "shared-memory")] pub(crate) mod shm; diff --git a/io/zenoh-transport/src/unicast/establishment/ext/patch.rs b/io/zenoh-transport/src/unicast/establishment/ext/patch.rs new file mode 100644 index 0000000000..f01b7eab4d --- /dev/null +++ b/io/zenoh-transport/src/unicast/establishment/ext/patch.rs @@ -0,0 +1,196 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use core::marker::PhantomData; + +use async_trait::async_trait; +use zenoh_buffers::{ + reader::{DidntRead, Reader}, + writer::{DidntWrite, Writer}, +}; +use zenoh_codec::{RCodec, WCodec, Zenoh080}; +use zenoh_protocol::transport::{init, open}; +use zenoh_result::Error as ZError; + +use crate::unicast::establishment::{AcceptFsm, OpenFsm}; + +// Extension Fsm +pub(crate) struct PatchFsm<'a> { + _a: PhantomData<&'a ()>, +} + +impl<'a> PatchFsm<'a> { + pub(crate) const fn new() -> Self { + Self { _a: PhantomData } + } +} + +/*************************************/ +/* OPEN */ +/*************************************/ +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub(crate) struct StateOpen { + patch: init::ext::PatchType, +} + +impl StateOpen { + pub(crate) const fn new() -> Self { + Self { + patch: init::ext::PatchType::CURRENT, + } + } + + pub(crate) const fn get(&self) -> init::ext::PatchType { + self.patch + } +} + +#[async_trait] +impl<'a> OpenFsm for &'a PatchFsm<'a> { + type Error = ZError; + + type SendInitSynIn = &'a StateOpen; + type SendInitSynOut = init::ext::PatchType; + async fn send_init_syn( + self, + _state: Self::SendInitSynIn, + ) -> Result { + Ok(init::ext::PatchType::CURRENT) + } + + type RecvInitAckIn = (&'a mut StateOpen, init::ext::PatchType); + type RecvInitAckOut = (); + async fn recv_init_ack( + self, + input: Self::RecvInitAckIn, + ) -> Result { + let (state, other_ext) = input; + state.patch = std::cmp::min(state.patch, other_ext); + Ok(()) + } + + type SendOpenSynIn = &'a StateOpen; + type SendOpenSynOut = open::ext::PatchType; + async fn send_open_syn( + self, + _state: Self::SendOpenSynIn, + ) -> Result { + Ok(open::ext::PatchType::DEFAULT) + } + + type RecvOpenAckIn = (&'a mut StateOpen, open::ext::PatchType); + type RecvOpenAckOut = (); + async fn recv_open_ack( + self, + _state: Self::RecvOpenAckIn, + ) -> Result { + Ok(()) + } +} + +/*************************************/ +/* ACCEPT */ +/*************************************/ +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub(crate) struct StateAccept { + patch: init::ext::PatchType, +} + +impl StateAccept { + pub(crate) const fn new() -> Self { + Self { + patch: init::ext::PatchType::CURRENT, + } + } + + pub(crate) const fn get(&self) -> init::ext::PatchType { + self.patch + } + + #[cfg(test)] + pub(crate) fn rand() -> Self { + Self { + patch: init::ext::PatchType::rand(), + } + } +} + +// Codec +impl WCodec<&StateAccept, &mut W> for Zenoh080 +where + W: Writer, +{ + type Output = Result<(), DidntWrite>; + + fn write(self, writer: &mut W, x: &StateAccept) -> Self::Output { + let raw = x.patch.raw(); + self.write(&mut *writer, raw)?; + Ok(()) + } +} + +impl RCodec for Zenoh080 +where + R: Reader, +{ + type Error = DidntRead; + + fn read(self, reader: &mut R) -> Result { + let raw: u8 = self.read(&mut *reader)?; + let patch = init::ext::PatchType::new(raw); + Ok(StateAccept { patch }) + } +} + +#[async_trait] +impl<'a> AcceptFsm for &'a PatchFsm<'a> { + type Error = ZError; + + type RecvInitSynIn = (&'a mut StateAccept, init::ext::PatchType); + type RecvInitSynOut = (); + async fn recv_init_syn( + self, + input: Self::RecvInitSynIn, + ) -> Result { + let (state, other_ext) = input; + state.patch = std::cmp::min(state.patch, other_ext); + Ok(()) + } + + type SendInitAckIn = &'a StateAccept; + type SendInitAckOut = init::ext::PatchType; + async fn send_init_ack( + self, + _state: Self::SendInitAckIn, + ) -> Result { + Ok(init::ext::PatchType::CURRENT) + } + + type RecvOpenSynIn = (&'a mut StateAccept, init::ext::PatchType); + type RecvOpenSynOut = (); + async fn recv_open_syn( + self, + _state: Self::RecvOpenSynIn, + ) -> Result { + Ok(()) + } + + type SendOpenAckIn = &'a StateAccept; + type SendOpenAckOut = init::ext::PatchType; + async fn send_open_ack( + self, + _state: Self::SendOpenAckIn, + ) -> Result { + Ok(init::ext::PatchType::DEFAULT) + } +} diff --git a/io/zenoh-transport/src/unicast/establishment/open.rs b/io/zenoh-transport/src/unicast/establishment/open.rs index f3ce60b354..9924cc247d 100644 --- a/io/zenoh-transport/src/unicast/establishment/open.rs +++ b/io/zenoh-transport/src/unicast/establishment/open.rs @@ -58,6 +58,7 @@ struct StateTransport { #[cfg(feature = "shared-memory")] ext_shm: ext::shm::StateOpen, ext_lowlatency: ext::lowlatency::StateOpen, + ext_patch: ext::patch::StateOpen, } #[cfg(any(feature = "transport_auth", feature = "transport_compression"))] @@ -124,6 +125,7 @@ struct OpenLink<'a> { ext_lowlatency: ext::lowlatency::LowLatencyFsm<'a>, #[cfg(feature = "transport_compression")] ext_compression: ext::compression::CompressionFsm<'a>, + ext_patch: ext::patch::PatchFsm<'a>, } #[async_trait] @@ -192,6 +194,13 @@ impl<'a, 'b: 'a> OpenFsm for &'a mut OpenLink<'b> { None ); + // Extension Patch + let ext_patch = self + .ext_patch + .send_init_syn(&state.transport.ext_patch) + .await + .map_err(|e| (e, Some(close::reason::GENERIC)))?; + let msg: TransportMessage = InitSyn { version: input.mine_version, whatami: input.mine_whatami, @@ -206,6 +215,7 @@ impl<'a, 'b: 'a> OpenFsm for &'a mut OpenLink<'b> { ext_mlink, ext_lowlatency, ext_compression, + ext_patch, } .into(); @@ -419,6 +429,13 @@ impl<'a, 'b: 'a> OpenFsm for &'a mut OpenLink<'b> { None ); + // Extension Patch + let ext_patch = self + .ext_patch + .send_open_syn(&state.transport.ext_patch) + .await + .map_err(|e| (e, Some(close::reason::GENERIC)))?; + // Build and send an OpenSyn message let mine_initial_sn = compute_sn(input.mine_zid, input.other_zid, state.transport.resolution); @@ -433,6 +450,7 @@ impl<'a, 'b: 'a> OpenFsm for &'a mut OpenLink<'b> { ext_mlink, ext_lowlatency, ext_compression, + ext_patch, } .into(); @@ -575,6 +593,7 @@ pub(crate) async fn open_link( ext_lowlatency: ext::lowlatency::LowLatencyFsm::new(), #[cfg(feature = "transport_compression")] ext_compression: ext::compression::CompressionFsm::new(), + ext_patch: ext::patch::PatchFsm::new(), }; // Clippy raises a warning because `batch_size::UNICAST` is currently equal to `BatchSize::MAX`. @@ -599,8 +618,8 @@ pub(crate) async fn open_link( .open(manager.config.unicast.max_links > 1), #[cfg(feature = "shared-memory")] ext_shm: ext::shm::StateOpen::new(), - ext_lowlatency: ext::lowlatency::StateOpen::new(manager.config.unicast.is_lowlatency), + ext_patch: ext::patch::StateOpen::new(), }, #[cfg(any(feature = "transport_auth", feature = "transport_compression"))] link: StateLink { @@ -669,6 +688,7 @@ pub(crate) async fn open_link( is_lowlatency: state.transport.ext_lowlatency.is_lowlatency(), #[cfg(feature = "auth_usrpwd")] auth_id: UsrPwdId(None), + patch: state.transport.ext_patch.get(), }; let o_config = TransportLinkUnicastConfig { diff --git a/io/zenoh-transport/src/unicast/mod.rs b/io/zenoh-transport/src/unicast/mod.rs index 4539135fe9..ba0fd06d1e 100644 --- a/io/zenoh-transport/src/unicast/mod.rs +++ b/io/zenoh-transport/src/unicast/mod.rs @@ -34,7 +34,7 @@ use zenoh_link::Link; use zenoh_protocol::{ core::{Bits, WhatAmI, ZenohIdProto}, network::NetworkMessage, - transport::{close, TransportSn}, + transport::{close, init::ext::PatchType, TransportSn}, }; use zenoh_result::{zerror, ZResult}; @@ -63,6 +63,7 @@ pub(crate) struct TransportConfigUnicast { pub(crate) is_lowlatency: bool, #[cfg(feature = "auth_usrpwd")] pub(crate) auth_id: UsrPwdId, + pub(crate) patch: PatchType, } /// [`TransportUnicast`] is the transport handler returned diff --git a/io/zenoh-transport/src/unicast/universal/rx.rs b/io/zenoh-transport/src/unicast/universal/rx.rs index e1b140eb1d..fe755dd4b7 100644 --- a/io/zenoh-transport/src/unicast/universal/rx.rs +++ b/io/zenoh-transport/src/unicast/universal/rx.rs @@ -149,11 +149,11 @@ impl TransportUnicastUniversal { // Drop invalid message and continue return Ok(()); } - if ext_stop.is_some() { + if self.config.patch.has_fragmentation_start_stop() && ext_stop.is_some() { return Ok(()); } if guard.defrag.is_empty() { - if ext_start.is_none() { + if self.config.patch.has_fragmentation_start_stop() && ext_start.is_none() { // TODO better message tracing::warn!("a fragment chain was received without the start marker"); return Ok(()); From 0ed8ffae7ab628beb0f1cd78595067b618a9cda7 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Sun, 24 Nov 2024 22:06:24 +0100 Subject: [PATCH 04/10] fix: apply PR feedbacks --- commons/zenoh-codec/src/transport/init.rs | 12 ++++---- commons/zenoh-codec/src/transport/join.rs | 6 ++-- commons/zenoh-codec/src/transport/open.rs | 30 ++----------------- commons/zenoh-protocol/src/transport/join.rs | 2 +- commons/zenoh-protocol/src/transport/mod.rs | 4 +-- commons/zenoh-protocol/src/transport/open.rs | 21 ++++--------- io/zenoh-transport/src/common/pipeline.rs | 4 ++- io/zenoh-transport/src/multicast/rx.rs | 29 +++++++++++------- .../src/unicast/establishment/accept.rs | 15 ---------- .../src/unicast/establishment/ext/patch.rs | 26 ++++++++-------- .../src/unicast/establishment/open.rs | 14 ++++----- .../src/unicast/universal/link.rs | 6 ++-- .../src/unicast/universal/rx.rs | 29 +++++++++++------- 13 files changed, 82 insertions(+), 116 deletions(-) diff --git a/commons/zenoh-codec/src/transport/init.rs b/commons/zenoh-codec/src/transport/init.rs index 198c8ad28c..25b2b29f04 100644 --- a/commons/zenoh-codec/src/transport/init.rs +++ b/commons/zenoh-codec/src/transport/init.rs @@ -66,7 +66,7 @@ where + (ext_mlink.is_some() as u8) + (ext_lowlatency.is_some() as u8) + (ext_compression.is_some() as u8) - + (*ext_patch != ext::PatchType::DEFAULT) as u8; + + (*ext_patch != ext::PatchType::NONE) as u8; #[cfg(feature = "shared-memory")] { @@ -127,7 +127,7 @@ where n_exts -= 1; self.write(&mut *writer, (compression, n_exts != 0))?; } - if *ext_patch != ext::PatchType::DEFAULT { + if *ext_patch != ext::PatchType::NONE { n_exts -= 1; self.write(&mut *writer, (*ext_patch, n_exts != 0))?; } @@ -192,7 +192,7 @@ where let mut ext_mlink = None; let mut ext_lowlatency = None; let mut ext_compression = None; - let mut ext_patch = ext::PatchType::DEFAULT; + let mut ext_patch = ext::PatchType::NONE; let mut has_ext = imsg::has_flag(self.header, flag::Z); while has_ext { @@ -302,7 +302,7 @@ where + (ext_mlink.is_some() as u8) + (ext_lowlatency.is_some() as u8) + (ext_compression.is_some() as u8) - + (*ext_patch != ext::PatchType::DEFAULT) as u8; + + (*ext_patch != ext::PatchType::NONE) as u8; #[cfg(feature = "shared-memory")] { @@ -366,7 +366,7 @@ where n_exts -= 1; self.write(&mut *writer, (compression, n_exts != 0))?; } - if *ext_patch != ext::PatchType::DEFAULT { + if *ext_patch != ext::PatchType::NONE { n_exts -= 1; self.write(&mut *writer, (*ext_patch, n_exts != 0))?; } @@ -434,7 +434,7 @@ where let mut ext_mlink = None; let mut ext_lowlatency = None; let mut ext_compression = None; - let mut ext_patch = ext::PatchType::DEFAULT; + let mut ext_patch = ext::PatchType::NONE; let mut has_ext = imsg::has_flag(self.header, flag::Z); while has_ext { diff --git a/commons/zenoh-codec/src/transport/join.rs b/commons/zenoh-codec/src/transport/join.rs index e1d8752134..5504e5d03e 100644 --- a/commons/zenoh-codec/src/transport/join.rs +++ b/commons/zenoh-codec/src/transport/join.rs @@ -163,7 +163,7 @@ where } let mut n_exts = (ext_qos.is_some() as u8) + (ext_shm.is_some() as u8) - + (*ext_patch != ext::PatchType::DEFAULT) as u8; + + (*ext_patch != ext::PatchType::NONE) as u8; if n_exts != 0 { header |= flag::Z; } @@ -204,7 +204,7 @@ where n_exts -= 1; self.write(&mut *writer, (shm, n_exts != 0))?; } - if *ext_patch != ext::PatchType::DEFAULT { + if *ext_patch != ext::PatchType::NONE { n_exts -= 1; self.write(&mut *writer, (*ext_patch, n_exts != 0))?; } @@ -271,7 +271,7 @@ where // Extensions let mut ext_qos = None; let mut ext_shm = None; - let mut ext_patch = ext::PatchType::DEFAULT; + let mut ext_patch = ext::PatchType::NONE; let mut has_ext = imsg::has_flag(self.header, flag::Z); while has_ext { diff --git a/commons/zenoh-codec/src/transport/open.rs b/commons/zenoh-codec/src/transport/open.rs index 3285da0a3b..712fe5ca95 100644 --- a/commons/zenoh-codec/src/transport/open.rs +++ b/commons/zenoh-codec/src/transport/open.rs @@ -48,7 +48,6 @@ where ext_mlink, ext_lowlatency, ext_compression, - ext_patch, } = x; // Header @@ -60,8 +59,7 @@ where + (ext_auth.is_some() as u8) + (ext_mlink.is_some() as u8) + (ext_lowlatency.is_some() as u8) - + (ext_compression.is_some() as u8) - + (*ext_patch != ext::PatchType::DEFAULT) as u8; + + (ext_compression.is_some() as u8); #[cfg(feature = "shared-memory")] { @@ -108,10 +106,6 @@ where n_exts -= 1; self.write(&mut *writer, (compression, n_exts != 0))?; } - if *ext_patch != ext::PatchType::DEFAULT { - n_exts -= 1; - self.write(&mut *writer, (*ext_patch, n_exts != 0))?; - } Ok(()) } @@ -159,7 +153,6 @@ where let mut ext_mlink = None; let mut ext_lowlatency = None; let mut ext_compression = None; - let mut ext_patch = ext::PatchType::DEFAULT; let mut has_ext = imsg::has_flag(self.header, flag::Z); while has_ext { @@ -197,11 +190,6 @@ where ext_compression = Some(q); has_ext = ext; } - ext::Patch::ID => { - let (p, ext): (ext::PatchType, bool) = eodec.read(&mut *reader)?; - ext_patch = p; - has_ext = ext; - } _ => { has_ext = extension::skip(reader, "OpenSyn", ext)?; } @@ -219,7 +207,6 @@ where ext_mlink, ext_lowlatency, ext_compression, - ext_patch, }) } } @@ -242,7 +229,6 @@ where ext_mlink, ext_lowlatency, ext_compression, - ext_patch, } = x; // Header @@ -256,8 +242,7 @@ where + (ext_auth.is_some() as u8) + (ext_mlink.is_some() as u8) + (ext_lowlatency.is_some() as u8) - + (ext_compression.is_some() as u8) - + (*ext_patch != ext::PatchType::DEFAULT) as u8; + + (ext_compression.is_some() as u8); #[cfg(feature = "shared-memory")] { @@ -303,10 +288,6 @@ where n_exts -= 1; self.write(&mut *writer, (compression, n_exts != 0))?; } - if *ext_patch != ext::PatchType::DEFAULT { - n_exts -= 1; - self.write(&mut *writer, (*ext_patch, n_exts != 0))?; - } Ok(()) } @@ -353,7 +334,6 @@ where let mut ext_mlink = None; let mut ext_lowlatency = None; let mut ext_compression = None; - let mut ext_patch = ext::PatchType::DEFAULT; let mut has_ext = imsg::has_flag(self.header, flag::Z); while has_ext { @@ -391,11 +371,6 @@ where ext_compression = Some(q); has_ext = ext; } - ext::Patch::ID => { - let (p, ext): (ext::PatchType, bool) = eodec.read(&mut *reader)?; - ext_patch = p; - has_ext = ext; - } _ => { has_ext = extension::skip(reader, "OpenAck", ext)?; } @@ -412,7 +387,6 @@ where ext_mlink, ext_lowlatency, ext_compression, - ext_patch, }) } } diff --git a/commons/zenoh-protocol/src/transport/join.rs b/commons/zenoh-protocol/src/transport/join.rs index 31058e8107..d08058d495 100644 --- a/commons/zenoh-protocol/src/transport/join.rs +++ b/commons/zenoh-protocol/src/transport/join.rs @@ -134,7 +134,7 @@ pub mod ext { /// Used to negotiate the patch version of the protocol /// if not present (or 0), then protocol as released with 1.0.0 /// if >= 1, then fragmentation start/stop marker - pub type Patch = zextz64!(0x3, false); + pub type Patch = zextz64!(0x7, false); // use the same id as Init pub type PatchType = crate::transport::ext::PatchType<{ Patch::ID }>; } diff --git a/commons/zenoh-protocol/src/transport/mod.rs b/commons/zenoh-protocol/src/transport/mod.rs index c219f726ec..6d0b723731 100644 --- a/commons/zenoh-protocol/src/transport/mod.rs +++ b/commons/zenoh-protocol/src/transport/mod.rs @@ -312,11 +312,11 @@ pub mod ext { } } - #[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] + #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] pub struct PatchType(u8); impl PatchType { - pub const DEFAULT: Self = Self(0); + pub const NONE: Self = Self(0); pub const CURRENT: Self = Self(1); pub fn new(int: u8) -> Self { diff --git a/commons/zenoh-protocol/src/transport/open.rs b/commons/zenoh-protocol/src/transport/open.rs index 64668b9533..8042eeb634 100644 --- a/commons/zenoh-protocol/src/transport/open.rs +++ b/commons/zenoh-protocol/src/transport/open.rs @@ -86,14 +86,17 @@ pub struct OpenSyn { pub ext_mlink: Option, pub ext_lowlatency: Option, pub ext_compression: Option, - pub ext_patch: ext::PatchType, } // Extensions pub mod ext { + #[cfg(feature = "shared-memory")] + use crate::common::ZExtZ64; + #[cfg(feature = "shared-memory")] + use crate::zextz64; use crate::{ - common::{ZExtUnit, ZExtZ64, ZExtZBuf}, - zextunit, zextz64, zextzbuf, + common::{ZExtUnit, ZExtZBuf}, + zextunit, zextzbuf, }; /// # QoS extension @@ -121,13 +124,6 @@ pub mod ext { /// # Compression extension /// Used to negotiate the use of compression on the link pub type Compression = zextunit!(0x6, false); - - /// # Patch extension - /// Used to negotiate the patch version of the protocol - /// if not present (or 0), then protocol as released with 1.0.0 - /// if >= 1, then fragmentation start/stop marker - pub type Patch = zextz64!(0x7, false); - pub type PatchType = crate::transport::ext::PatchType<{ Patch::ID }>; } impl OpenSyn { @@ -159,7 +155,6 @@ impl OpenSyn { let ext_mlink = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); let ext_lowlatency = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); let ext_compression = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); - let ext_patch = ext::PatchType::rand(); Self { lease, @@ -172,7 +167,6 @@ impl OpenSyn { ext_mlink, ext_lowlatency, ext_compression, - ext_patch, } } } @@ -188,7 +182,6 @@ pub struct OpenAck { pub ext_mlink: Option, pub ext_lowlatency: Option, pub ext_compression: Option, - pub ext_patch: ext::PatchType, } impl OpenAck { @@ -216,7 +209,6 @@ impl OpenAck { let ext_mlink = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); let ext_lowlatency = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); let ext_compression = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); - let ext_patch = ext::PatchType::rand(); Self { lease, @@ -228,7 +220,6 @@ impl OpenAck { ext_mlink, ext_lowlatency, ext_compression, - ext_patch, } } } diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index 3c00ee29db..59d5db7525 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -802,7 +802,9 @@ impl TransmissionPipelineConsumer { } pub(crate) fn refill(&mut self, batch: WBatch, priority: usize) { - self.stage_out[priority].refill(batch); + if !batch.is_ephemeral() { + self.stage_out[priority].refill(batch); + } } pub(crate) fn drain(&mut self) -> Vec<(WBatch, usize)> { diff --git a/io/zenoh-transport/src/multicast/rx.rs b/io/zenoh-transport/src/multicast/rx.rs index 0873fdf3cf..6463f67393 100644 --- a/io/zenoh-transport/src/multicast/rx.rs +++ b/io/zenoh-transport/src/multicast/rx.rs @@ -166,7 +166,7 @@ impl TransportMulticastInner { Reliability::BestEffort => zlock!(c.best_effort), }; - if !self.verify_sn(sn, &mut guard)? { + if !self.verify_sn("Frame", sn, &mut guard)? { // Drop invalid message and continue return Ok(()); } @@ -207,19 +207,26 @@ impl TransportMulticastInner { Reliability::BestEffort => zlock!(c.best_effort), }; - if !self.verify_sn(sn, &mut guard)? { + if !self.verify_sn("Fragment", sn, &mut guard)? { // Drop invalid message and continue return Ok(()); } - if peer.patch.has_fragmentation_start_stop() && ext_stop.is_some() { - return Ok(()); - } - if guard.defrag.is_empty() { - if peer.patch.has_fragmentation_start_stop() && ext_start.is_none() { - // TODO better message - tracing::warn!("a fragment chain was received without the start marker"); + if peer.patch.has_fragmentation_start_stop() { + if ext_start.is_some() { + guard.defrag.clear(); + } else if guard.defrag.is_empty() { + tracing::trace!( + "Transport: {}. First fragment received without start marker.", + self.manager.config.zid, + ); + return Ok(()); + } + if ext_stop.is_some() { + guard.defrag.clear(); return Ok(()); } + } + if guard.defrag.is_empty() { let _ = guard.defrag.sync(sn); } if let Err(e) = guard.defrag.push(sn, payload) { @@ -246,14 +253,16 @@ impl TransportMulticastInner { fn verify_sn( &self, + message_type: &str, sn: TransportSn, guard: &mut MutexGuard<'_, TransportChannelRx>, ) -> ZResult { let precedes = guard.sn.precedes(sn)?; if !precedes { tracing::debug!( - "Transport: {}. Frame with invalid SN dropped: {}. Expected: {}.", + "Transport: {}. {} with invalid SN dropped: {}. Expected: {}.", self.manager.config.zid, + message_type, sn, guard.sn.next() ); diff --git a/io/zenoh-transport/src/unicast/establishment/accept.rs b/io/zenoh-transport/src/unicast/establishment/accept.rs index 426ecf82e1..386b9f832c 100644 --- a/io/zenoh-transport/src/unicast/establishment/accept.rs +++ b/io/zenoh-transport/src/unicast/establishment/accept.rs @@ -563,13 +563,6 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { .await .map_err(|e| (e, Some(close::reason::GENERIC)))?; - // Extension Patch - #[cfg(feature = "transport_compression")] - self.ext_patch - .recv_open_syn((&mut state.transport.ext_patch, open_syn.ext_patch)) - .await - .map_err(|e| (e, Some(close::reason::GENERIC)))?; - let output = RecvOpenSynOut { other_zid: cookie.zid, other_whatami: cookie.whatami, @@ -643,13 +636,6 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { None ); - // Extension Patch - let ext_patch = self - .ext_patch - .send_open_ack(&state.transport.ext_patch) - .await - .map_err(|e| (e, Some(close::reason::GENERIC)))?; - // Build OpenAck message let mine_initial_sn = compute_sn(input.mine_zid, input.other_zid, state.transport.resolution); @@ -663,7 +649,6 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { ext_mlink, ext_lowlatency, ext_compression, - ext_patch, }; // Do not send the OpenAck right now since we might still incur in MAX_LINKS error diff --git a/io/zenoh-transport/src/unicast/establishment/ext/patch.rs b/io/zenoh-transport/src/unicast/establishment/ext/patch.rs index f01b7eab4d..acca883912 100644 --- a/io/zenoh-transport/src/unicast/establishment/ext/patch.rs +++ b/io/zenoh-transport/src/unicast/establishment/ext/patch.rs @@ -19,7 +19,7 @@ use zenoh_buffers::{ writer::{DidntWrite, Writer}, }; use zenoh_codec::{RCodec, WCodec, Zenoh080}; -use zenoh_protocol::transport::{init, open}; +use zenoh_protocol::transport::init; use zenoh_result::Error as ZError; use crate::unicast::establishment::{AcceptFsm, OpenFsm}; @@ -46,7 +46,7 @@ pub(crate) struct StateOpen { impl StateOpen { pub(crate) const fn new() -> Self { Self { - patch: init::ext::PatchType::CURRENT, + patch: init::ext::PatchType::NONE, } } @@ -75,26 +75,26 @@ impl<'a> OpenFsm for &'a PatchFsm<'a> { input: Self::RecvInitAckIn, ) -> Result { let (state, other_ext) = input; - state.patch = std::cmp::min(state.patch, other_ext); + state.patch = other_ext; Ok(()) } type SendOpenSynIn = &'a StateOpen; - type SendOpenSynOut = open::ext::PatchType; + type SendOpenSynOut = (); async fn send_open_syn( self, _state: Self::SendOpenSynIn, ) -> Result { - Ok(open::ext::PatchType::DEFAULT) + unimplemented!() } - type RecvOpenAckIn = (&'a mut StateOpen, open::ext::PatchType); + type RecvOpenAckIn = (&'a mut StateOpen, ()); type RecvOpenAckOut = (); async fn recv_open_ack( self, _state: Self::RecvOpenAckIn, ) -> Result { - Ok(()) + unimplemented!() } } @@ -109,7 +109,7 @@ pub(crate) struct StateAccept { impl StateAccept { pub(crate) const fn new() -> Self { Self { - patch: init::ext::PatchType::CURRENT, + patch: init::ext::PatchType::NONE, } } @@ -163,7 +163,7 @@ impl<'a> AcceptFsm for &'a PatchFsm<'a> { input: Self::RecvInitSynIn, ) -> Result { let (state, other_ext) = input; - state.patch = std::cmp::min(state.patch, other_ext); + state.patch = other_ext; Ok(()) } @@ -176,21 +176,21 @@ impl<'a> AcceptFsm for &'a PatchFsm<'a> { Ok(init::ext::PatchType::CURRENT) } - type RecvOpenSynIn = (&'a mut StateAccept, init::ext::PatchType); + type RecvOpenSynIn = (&'a mut StateAccept, ()); type RecvOpenSynOut = (); async fn recv_open_syn( self, _state: Self::RecvOpenSynIn, ) -> Result { - Ok(()) + unimplemented!() } type SendOpenAckIn = &'a StateAccept; - type SendOpenAckOut = init::ext::PatchType; + type SendOpenAckOut = (); async fn send_open_ack( self, _state: Self::SendOpenAckIn, ) -> Result { - Ok(init::ext::PatchType::DEFAULT) + unimplemented!() } } diff --git a/io/zenoh-transport/src/unicast/establishment/open.rs b/io/zenoh-transport/src/unicast/establishment/open.rs index 9924cc247d..a4283e3643 100644 --- a/io/zenoh-transport/src/unicast/establishment/open.rs +++ b/io/zenoh-transport/src/unicast/establishment/open.rs @@ -357,6 +357,12 @@ impl<'a, 'b: 'a> OpenFsm for &'a mut OpenLink<'b> { .await .map_err(|e| (e, Some(close::reason::GENERIC)))?; + // Extension Patch + self.ext_patch + .recv_init_ack((&mut state.transport.ext_patch, init_ack.ext_patch)) + .await + .map_err(|e| (e, Some(close::reason::GENERIC)))?; + let output = RecvInitAckOut { other_zid: init_ack.zid, other_whatami: init_ack.whatami, @@ -429,13 +435,6 @@ impl<'a, 'b: 'a> OpenFsm for &'a mut OpenLink<'b> { None ); - // Extension Patch - let ext_patch = self - .ext_patch - .send_open_syn(&state.transport.ext_patch) - .await - .map_err(|e| (e, Some(close::reason::GENERIC)))?; - // Build and send an OpenSyn message let mine_initial_sn = compute_sn(input.mine_zid, input.other_zid, state.transport.resolution); @@ -450,7 +449,6 @@ impl<'a, 'b: 'a> OpenFsm for &'a mut OpenLink<'b> { ext_mlink, ext_lowlatency, ext_compression, - ext_patch, } .into(); diff --git a/io/zenoh-transport/src/unicast/universal/link.rs b/io/zenoh-transport/src/unicast/universal/link.rs index 8da4d11cd3..18011249be 100644 --- a/io/zenoh-transport/src/unicast/universal/link.rs +++ b/io/zenoh-transport/src/unicast/universal/link.rs @@ -191,10 +191,8 @@ async fn tx_task( stats.inc_tx_bytes(batch.len() as usize); } - if !batch.is_ephemeral() { - // Reinsert the batch into the queue - pipeline.refill(batch, priority); - } + // Reinsert the batch into the queue + pipeline.refill(batch, priority); }, Ok(None) => { // The queue has been disabled: break the tx loop, drain the queue, and exit diff --git a/io/zenoh-transport/src/unicast/universal/rx.rs b/io/zenoh-transport/src/unicast/universal/rx.rs index fe755dd4b7..bea14c0198 100644 --- a/io/zenoh-transport/src/unicast/universal/rx.rs +++ b/io/zenoh-transport/src/unicast/universal/rx.rs @@ -97,7 +97,7 @@ impl TransportUnicastUniversal { Reliability::BestEffort => zlock!(c.best_effort), }; - if !self.verify_sn(sn, &mut guard)? { + if !self.verify_sn("Frame", sn, &mut guard)? { // Drop invalid message and continue return Ok(()); } @@ -145,19 +145,26 @@ impl TransportUnicastUniversal { Reliability::BestEffort => zlock!(c.best_effort), }; - if !self.verify_sn(sn, &mut guard)? { + if !self.verify_sn("Fragment", sn, &mut guard)? { // Drop invalid message and continue return Ok(()); } - if self.config.patch.has_fragmentation_start_stop() && ext_stop.is_some() { - return Ok(()); - } - if guard.defrag.is_empty() { - if self.config.patch.has_fragmentation_start_stop() && ext_start.is_none() { - // TODO better message - tracing::warn!("a fragment chain was received without the start marker"); + if self.config.patch.has_fragmentation_start_stop() { + if ext_start.is_some() { + guard.defrag.clear(); + } else if guard.defrag.is_empty() { + tracing::trace!( + "Transport: {}. First fragment received without start marker.", + self.manager.config.zid, + ); return Ok(()); } + if ext_stop.is_some() { + guard.defrag.clear(); + return Ok(()); + } + } + if guard.defrag.is_empty() { let _ = guard.defrag.sync(sn); } if let Err(e) = guard.defrag.push(sn, payload) { @@ -188,14 +195,16 @@ impl TransportUnicastUniversal { fn verify_sn( &self, + message_type: &str, sn: TransportSn, guard: &mut MutexGuard<'_, TransportChannelRx>, ) -> ZResult { let precedes = guard.sn.roll(sn)?; if !precedes { tracing::trace!( - "Transport: {}. Frame with invalid SN dropped: {}. Expected: {}.", + "Transport: {}. {} with invalid SN dropped: {}. Expected: {}.", self.config.zid, + message_type, sn, guard.sn.next() ); From e4883f499046056fbd09f25e97f06c1297dd6563 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Mon, 25 Nov 2024 12:52:45 +0100 Subject: [PATCH 05/10] fix: apply PR feedbacks --- .../src/unicast/establishment/ext/patch.rs | 51 +++++++++++-------- 1 file changed, 29 insertions(+), 22 deletions(-) diff --git a/io/zenoh-transport/src/unicast/establishment/ext/patch.rs b/io/zenoh-transport/src/unicast/establishment/ext/patch.rs index acca883912..cbceda4589 100644 --- a/io/zenoh-transport/src/unicast/establishment/ext/patch.rs +++ b/io/zenoh-transport/src/unicast/establishment/ext/patch.rs @@ -11,7 +11,7 @@ // Contributors: // ZettaScale Zenoh Team, // -use core::marker::PhantomData; +use std::{cmp::min, marker::PhantomData}; use async_trait::async_trait; use zenoh_buffers::{ @@ -19,8 +19,8 @@ use zenoh_buffers::{ writer::{DidntWrite, Writer}, }; use zenoh_codec::{RCodec, WCodec, Zenoh080}; -use zenoh_protocol::transport::init; -use zenoh_result::Error as ZError; +use zenoh_protocol::transport::init::ext::PatchType; +use zenoh_result::{bail, Error as ZError}; use crate::unicast::establishment::{AcceptFsm, OpenFsm}; @@ -40,17 +40,17 @@ impl<'a> PatchFsm<'a> { /*************************************/ #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub(crate) struct StateOpen { - patch: init::ext::PatchType, + patch: PatchType, } impl StateOpen { pub(crate) const fn new() -> Self { Self { - patch: init::ext::PatchType::NONE, + patch: PatchType::NONE, } } - pub(crate) const fn get(&self) -> init::ext::PatchType { + pub(crate) const fn get(&self) -> PatchType { self.patch } } @@ -60,21 +60,28 @@ impl<'a> OpenFsm for &'a PatchFsm<'a> { type Error = ZError; type SendInitSynIn = &'a StateOpen; - type SendInitSynOut = init::ext::PatchType; + type SendInitSynOut = PatchType; async fn send_init_syn( self, _state: Self::SendInitSynIn, ) -> Result { - Ok(init::ext::PatchType::CURRENT) + Ok(PatchType::CURRENT) } - type RecvInitAckIn = (&'a mut StateOpen, init::ext::PatchType); + type RecvInitAckIn = (&'a mut StateOpen, PatchType); type RecvInitAckOut = (); async fn recv_init_ack( self, input: Self::RecvInitAckIn, ) -> Result { let (state, other_ext) = input; + if other_ext > PatchType::CURRENT { + bail!( + "Acceptor patch should be lesser or equal to {current:?}, found {other:?}", + current = PatchType::CURRENT.raw(), + other = other_ext.raw(), + ); + } state.patch = other_ext; Ok(()) } @@ -85,7 +92,7 @@ impl<'a> OpenFsm for &'a PatchFsm<'a> { self, _state: Self::SendOpenSynIn, ) -> Result { - unimplemented!() + unimplemented!("There is no patch extension in OPEN") } type RecvOpenAckIn = (&'a mut StateOpen, ()); @@ -94,7 +101,7 @@ impl<'a> OpenFsm for &'a PatchFsm<'a> { self, _state: Self::RecvOpenAckIn, ) -> Result { - unimplemented!() + unimplemented!("There is no patch extension in OPEN") } } @@ -103,24 +110,24 @@ impl<'a> OpenFsm for &'a PatchFsm<'a> { /*************************************/ #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub(crate) struct StateAccept { - patch: init::ext::PatchType, + patch: PatchType, } impl StateAccept { pub(crate) const fn new() -> Self { Self { - patch: init::ext::PatchType::NONE, + patch: PatchType::NONE, } } - pub(crate) const fn get(&self) -> init::ext::PatchType { + pub(crate) const fn get(&self) -> PatchType { self.patch } #[cfg(test)] pub(crate) fn rand() -> Self { Self { - patch: init::ext::PatchType::rand(), + patch: PatchType::rand(), } } } @@ -147,7 +154,7 @@ where fn read(self, reader: &mut R) -> Result { let raw: u8 = self.read(&mut *reader)?; - let patch = init::ext::PatchType::new(raw); + let patch = PatchType::new(raw); Ok(StateAccept { patch }) } } @@ -156,7 +163,7 @@ where impl<'a> AcceptFsm for &'a PatchFsm<'a> { type Error = ZError; - type RecvInitSynIn = (&'a mut StateAccept, init::ext::PatchType); + type RecvInitSynIn = (&'a mut StateAccept, PatchType); type RecvInitSynOut = (); async fn recv_init_syn( self, @@ -168,12 +175,12 @@ impl<'a> AcceptFsm for &'a PatchFsm<'a> { } type SendInitAckIn = &'a StateAccept; - type SendInitAckOut = init::ext::PatchType; + type SendInitAckOut = PatchType; async fn send_init_ack( self, - _state: Self::SendInitAckIn, + state: Self::SendInitAckIn, ) -> Result { - Ok(init::ext::PatchType::CURRENT) + Ok(min(PatchType::CURRENT, state.patch)) } type RecvOpenSynIn = (&'a mut StateAccept, ()); @@ -182,7 +189,7 @@ impl<'a> AcceptFsm for &'a PatchFsm<'a> { self, _state: Self::RecvOpenSynIn, ) -> Result { - unimplemented!() + unimplemented!("There is no patch extension in OPEN") } type SendOpenAckIn = &'a StateAccept; @@ -191,6 +198,6 @@ impl<'a> AcceptFsm for &'a PatchFsm<'a> { self, _state: Self::SendOpenAckIn, ) -> Result { - unimplemented!() + unimplemented!("There is no patch extension in OPEN") } } From 03a5f208a5f78b6ce05642af7f26e0ed5f09fc78 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Tue, 26 Nov 2024 09:46:11 +0100 Subject: [PATCH 06/10] fix: apply PR feedbacks --- io/zenoh-transport/src/multicast/transport.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/io/zenoh-transport/src/multicast/transport.rs b/io/zenoh-transport/src/multicast/transport.rs index ee374b75e1..47d0d8b635 100644 --- a/io/zenoh-transport/src/multicast/transport.rs +++ b/io/zenoh-transport/src/multicast/transport.rs @@ -12,6 +12,7 @@ // ZettaScale Zenoh Team, // use std::{ + cmp::min, collections::HashMap, sync::{ atomic::{AtomicBool, Ordering}, @@ -418,7 +419,7 @@ impl TransportMulticastInner { token, priority_rx, handler, - patch: join.ext_patch, + patch: min(PatchType::CURRENT, join.ext_patch), }; zwrite!(self.peers).insert(locator.clone(), peer); From cad0b8da30e5dc8827dcce48ff81b724f0c61ea2 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Wed, 4 Dec 2024 09:47:30 +0100 Subject: [PATCH 07/10] refactor: rename start/stop ext to first/drop --- commons/zenoh-codec/src/transport/fragment.rs | 48 +++++++++---------- .../zenoh-protocol/src/transport/fragment.rs | 28 +++++------ commons/zenoh-protocol/src/transport/init.rs | 2 +- commons/zenoh-protocol/src/transport/join.rs | 2 +- commons/zenoh-protocol/src/transport/mod.rs | 2 +- io/zenoh-transport/src/common/pipeline.rs | 10 ++-- io/zenoh-transport/src/multicast/rx.rs | 10 ++-- .../src/unicast/universal/rx.rs | 10 ++-- 8 files changed, 56 insertions(+), 56 deletions(-) diff --git a/commons/zenoh-codec/src/transport/fragment.rs b/commons/zenoh-codec/src/transport/fragment.rs index c58709931d..73b2b4869b 100644 --- a/commons/zenoh-codec/src/transport/fragment.rs +++ b/commons/zenoh-codec/src/transport/fragment.rs @@ -39,8 +39,8 @@ where more, sn, ext_qos, - ext_start, - ext_stop, + ext_first, + ext_drop, } = x; // Header @@ -52,8 +52,8 @@ where header |= flag::M; } let mut n_exts = (ext_qos != &ext::QoSType::DEFAULT) as u8 - + ext_start.is_some() as u8 - + ext_stop.is_some() as u8; + + ext_first.is_some() as u8 + + ext_drop.is_some() as u8; if n_exts != 0 { header |= flag::Z; } @@ -67,13 +67,13 @@ where n_exts -= 1; self.write(&mut *writer, (*ext_qos, n_exts != 0))?; } - if let Some(start) = ext_start { + if let Some(first) = ext_first { n_exts -= 1; - self.write(&mut *writer, (start, n_exts != 0))? + self.write(&mut *writer, (first, n_exts != 0))? } - if let Some(stop) = ext_stop { + if let Some(drop) = ext_drop { n_exts -= 1; - self.write(&mut *writer, (stop, n_exts != 0))? + self.write(&mut *writer, (drop, n_exts != 0))? } Ok(()) @@ -113,8 +113,8 @@ where // Extensions let mut ext_qos = ext::QoSType::DEFAULT; - let mut ext_start = None; - let mut ext_stop = None; + let mut ext_first = None; + let mut ext_drop = None; let mut has_ext = imsg::has_flag(self.header, flag::Z); while has_ext { @@ -126,14 +126,14 @@ where ext_qos = q; has_ext = ext; } - ext::Start::ID => { - let (start, ext): (ext::Start, bool) = eodec.read(&mut *reader)?; - ext_start = Some(start); + ext::First::ID => { + let (first, ext): (ext::First, bool) = eodec.read(&mut *reader)?; + ext_first = Some(first); has_ext = ext; } - ext::Stop::ID => { - let (stop, ext): (ext::Stop, bool) = eodec.read(&mut *reader)?; - ext_stop = Some(stop); + ext::Drop::ID => { + let (drop, ext): (ext::Drop, bool) = eodec.read(&mut *reader)?; + ext_drop = Some(drop); has_ext = ext; } _ => { @@ -147,8 +147,8 @@ where more, sn, ext_qos, - ext_start, - ext_stop, + ext_first, + ext_drop, }) } } @@ -167,8 +167,8 @@ where sn, payload, ext_qos, - ext_start, - ext_stop, + ext_first, + ext_drop, } = x; // Header @@ -177,8 +177,8 @@ where more: *more, sn: *sn, ext_qos: *ext_qos, - ext_start: *ext_start, - ext_stop: *ext_stop, + ext_first: *ext_first, + ext_drop: *ext_drop, }; self.write(&mut *writer, &header)?; @@ -217,8 +217,8 @@ where more: header.more, sn: header.sn, ext_qos: header.ext_qos, - ext_start: header.ext_start, - ext_stop: header.ext_stop, + ext_first: header.ext_first, + ext_drop: header.ext_drop, payload, }) } diff --git a/commons/zenoh-protocol/src/transport/fragment.rs b/commons/zenoh-protocol/src/transport/fragment.rs index e34c527302..f8c489ac45 100644 --- a/commons/zenoh-protocol/src/transport/fragment.rs +++ b/commons/zenoh-protocol/src/transport/fragment.rs @@ -75,8 +75,8 @@ pub struct Fragment { pub sn: TransportSn, pub payload: ZSlice, pub ext_qos: ext::QoSType, - pub ext_start: Option, - pub ext_stop: Option, + pub ext_first: Option, + pub ext_drop: Option, } // Extensions @@ -91,10 +91,10 @@ pub mod ext { /// # Start extension /// Mark the first fragment of a fragmented message - pub type Start = zextunit!(0x2, false); + pub type First = zextunit!(0x2, false); /// # Stop extension /// Indicate that the remaining fragments has been dropped - pub type Stop = zextunit!(0x3, false); + pub type Drop = zextunit!(0x3, false); } impl Fragment { @@ -109,8 +109,8 @@ impl Fragment { let sn: TransportSn = rng.gen(); let payload = ZSlice::rand(rng.gen_range(8..128)); let ext_qos = ext::QoSType::rand(); - let ext_start = rng.gen_bool(0.5).then(ext::Start::rand); - let ext_stop = rng.gen_bool(0.5).then(ext::Stop::rand); + let ext_first = rng.gen_bool(0.5).then(ext::First::rand); + let ext_drop = rng.gen_bool(0.5).then(ext::Drop::rand); Fragment { reliability, @@ -118,8 +118,8 @@ impl Fragment { more, payload, ext_qos, - ext_start, - ext_stop, + ext_first, + ext_drop, } } } @@ -131,8 +131,8 @@ pub struct FragmentHeader { pub more: bool, pub sn: TransportSn, pub ext_qos: ext::QoSType, - pub ext_start: Option, - pub ext_stop: Option, + pub ext_first: Option, + pub ext_drop: Option, } impl FragmentHeader { @@ -146,16 +146,16 @@ impl FragmentHeader { let more = rng.gen_bool(0.5); let sn: TransportSn = rng.gen(); let ext_qos = ext::QoSType::rand(); - let ext_start = rng.gen_bool(0.5).then(ext::Start::rand); - let ext_stop = rng.gen_bool(0.5).then(ext::Stop::rand); + let ext_first = rng.gen_bool(0.5).then(ext::First::rand); + let ext_drop = rng.gen_bool(0.5).then(ext::Drop::rand); FragmentHeader { reliability, more, sn, ext_qos, - ext_start, - ext_stop, + ext_first, + ext_drop, } } } diff --git a/commons/zenoh-protocol/src/transport/init.rs b/commons/zenoh-protocol/src/transport/init.rs index 574aa1faec..25616a730f 100644 --- a/commons/zenoh-protocol/src/transport/init.rs +++ b/commons/zenoh-protocol/src/transport/init.rs @@ -161,7 +161,7 @@ pub mod ext { /// # Patch extension /// Used to negotiate the patch version of the protocol /// if not present (or 0), then protocol as released with 1.0.0 - /// if >= 1, then fragmentation start/stop marker + /// if >= 1, then fragmentation first/drop markers pub type Patch = zextz64!(0x7, false); pub type PatchType = crate::transport::ext::PatchType<{ Patch::ID }>; } diff --git a/commons/zenoh-protocol/src/transport/join.rs b/commons/zenoh-protocol/src/transport/join.rs index d08058d495..afbe55f3ce 100644 --- a/commons/zenoh-protocol/src/transport/join.rs +++ b/commons/zenoh-protocol/src/transport/join.rs @@ -133,7 +133,7 @@ pub mod ext { /// # Patch extension /// Used to negotiate the patch version of the protocol /// if not present (or 0), then protocol as released with 1.0.0 - /// if >= 1, then fragmentation start/stop marker + /// if >= 1, then fragmentation first/drop markers pub type Patch = zextz64!(0x7, false); // use the same id as Init pub type PatchType = crate::transport::ext::PatchType<{ Patch::ID }>; } diff --git a/commons/zenoh-protocol/src/transport/mod.rs b/commons/zenoh-protocol/src/transport/mod.rs index 6d0b723731..d534fef04a 100644 --- a/commons/zenoh-protocol/src/transport/mod.rs +++ b/commons/zenoh-protocol/src/transport/mod.rs @@ -327,7 +327,7 @@ pub mod ext { self.0 } - pub fn has_fragmentation_start_stop(&self) -> bool { + pub fn has_fragmentation_markers(&self) -> bool { self.0 >= 1 } diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index 59d5db7525..fe069d23cf 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -321,21 +321,21 @@ impl StageIn { more: true, sn, ext_qos: frame.ext_qos, - ext_start: Some(fragment::ext::Start::new()), - ext_stop: None, + ext_first: Some(fragment::ext::First::new()), + ext_drop: None, }; let mut reader = self.fragbuf.reader(); while reader.can_read() { // Get the current serialization batch batch = zgetbatch_rets!({ // If no fragment has been sent, the sequence number is just reset - if fragment.ext_start.is_some() { + if fragment.ext_first.is_some() { tch.sn.set(sn).unwrap() // Otherwise, an ephemeral batch is created to send the stop fragment } else { let mut batch = WBatch::new_ephemeral(self.batch_config); self.fragbuf.clear(); - fragment.ext_stop = Some(fragment::ext::Stop::new()); + fragment.ext_drop = Some(fragment::ext::Drop::new()); let _ = batch.encode((&mut self.fragbuf.reader(), &mut fragment)); self.s_out.move_batch(batch); } @@ -346,7 +346,7 @@ impl StageIn { Ok(_) => { // Update the SN fragment.sn = tch.sn.get(); - fragment.ext_start = None; + fragment.ext_first = None; // Move the serialization batch into the OUT pipeline self.s_out.move_batch(batch); } diff --git a/io/zenoh-transport/src/multicast/rx.rs b/io/zenoh-transport/src/multicast/rx.rs index 6463f67393..dc501742ec 100644 --- a/io/zenoh-transport/src/multicast/rx.rs +++ b/io/zenoh-transport/src/multicast/rx.rs @@ -183,8 +183,8 @@ impl TransportMulticastInner { more, sn, ext_qos, - ext_start, - ext_stop, + ext_first, + ext_drop, payload, } = fragment; @@ -211,8 +211,8 @@ impl TransportMulticastInner { // Drop invalid message and continue return Ok(()); } - if peer.patch.has_fragmentation_start_stop() { - if ext_start.is_some() { + if peer.patch.has_fragmentation_markers() { + if ext_first.is_some() { guard.defrag.clear(); } else if guard.defrag.is_empty() { tracing::trace!( @@ -221,7 +221,7 @@ impl TransportMulticastInner { ); return Ok(()); } - if ext_stop.is_some() { + if ext_drop.is_some() { guard.defrag.clear(); return Ok(()); } diff --git a/io/zenoh-transport/src/unicast/universal/rx.rs b/io/zenoh-transport/src/unicast/universal/rx.rs index bea14c0198..3ac8a92ed4 100644 --- a/io/zenoh-transport/src/unicast/universal/rx.rs +++ b/io/zenoh-transport/src/unicast/universal/rx.rs @@ -123,8 +123,8 @@ impl TransportUnicastUniversal { more, sn, ext_qos: qos, - ext_start, - ext_stop, + ext_first, + ext_drop, payload, } = fragment; @@ -149,8 +149,8 @@ impl TransportUnicastUniversal { // Drop invalid message and continue return Ok(()); } - if self.config.patch.has_fragmentation_start_stop() { - if ext_start.is_some() { + if self.config.patch.has_fragmentation_markers() { + if ext_first.is_some() { guard.defrag.clear(); } else if guard.defrag.is_empty() { tracing::trace!( @@ -159,7 +159,7 @@ impl TransportUnicastUniversal { ); return Ok(()); } - if ext_stop.is_some() { + if ext_drop.is_some() { guard.defrag.clear(); return Ok(()); } From 1eba4b1ec588150ad3dbc9d073e5a7b25e129d89 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Thu, 5 Dec 2024 07:48:07 +0100 Subject: [PATCH 08/10] fix: fix merge --- io/zenoh-transport/src/common/pipeline.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index 52fdd21dcd..256b4e760a 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -882,8 +882,8 @@ impl TransmissionPipelineConsumer { pub(crate) fn refill(&mut self, batch: WBatch, priority: Priority) { if !batch.is_ephemeral() { self.stage_out[priority as usize].refill(batch); + self.status.set_congested(priority, false); } - self.status.set_congested(priority, false); } pub(crate) fn drain(&mut self) -> Vec<(WBatch, usize)> { From 8cae816ae36cbd6cfa2da73cc32ebf83cc0176d3 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Thu, 5 Dec 2024 07:51:04 +0100 Subject: [PATCH 09/10] fix: lint --- io/zenoh-transport/src/unicast/establishment/ext/patch.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/io/zenoh-transport/src/unicast/establishment/ext/patch.rs b/io/zenoh-transport/src/unicast/establishment/ext/patch.rs index cbceda4589..16509df209 100644 --- a/io/zenoh-transport/src/unicast/establishment/ext/patch.rs +++ b/io/zenoh-transport/src/unicast/establishment/ext/patch.rs @@ -29,7 +29,7 @@ pub(crate) struct PatchFsm<'a> { _a: PhantomData<&'a ()>, } -impl<'a> PatchFsm<'a> { +impl PatchFsm<'_> { pub(crate) const fn new() -> Self { Self { _a: PhantomData } } From 13cf0de464c28ebc9409b65ea63c6462eb708331 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Thu, 5 Dec 2024 08:51:01 +0100 Subject: [PATCH 10/10] Retrigger CI