diff --git a/io/zenoh-transport/src/common/batch.rs b/io/zenoh-transport/src/common/batch.rs index 9366de1a5..65150f728 100644 --- a/io/zenoh-transport/src/common/batch.rs +++ b/io/zenoh-transport/src/common/batch.rs @@ -84,9 +84,6 @@ pub struct BatchConfig { pub is_streamed: bool, #[cfg(feature = "transport_compression")] pub is_compression: bool, - // an ephemeral batch will not be recycled in the pipeline - // it can be used to push a stop fragment when no batch are available - pub ephemeral: bool, } impl Default for BatchConfig { @@ -96,7 +93,6 @@ impl Default for BatchConfig { is_streamed: false, #[cfg(feature = "transport_compression")] is_compression: false, - ephemeral: false, } } } @@ -205,6 +201,9 @@ pub struct WBatch { // Statistics related to this batch #[cfg(feature = "stats")] pub stats: WBatchStats, + // an ephemeral batch will not be recycled in the pipeline + // it can be used to push a stop fragment when no batch are available + pub ephemeral: bool, } impl WBatch { @@ -213,6 +212,7 @@ impl WBatch { buffer: BBuf::with_capacity(config.mtu as usize), codec: Zenoh080Batch::new(), config, + ephemeral: false, #[cfg(feature = "stats")] stats: WBatchStats::default(), }; @@ -223,6 +223,17 @@ impl WBatch { batch } + pub fn new_ephemeral(config: BatchConfig) -> Self { + Self { + ephemeral: true, + ..Self::new(config) + } + } + + pub fn is_ephemeral(&self) -> bool { + self.ephemeral + } + /// Verify that the [`WBatch`] has no serialized bytes. #[inline(always)] pub fn is_empty(&self) -> bool { @@ -529,7 +540,6 @@ mod tests { is_streamed: rng.gen_bool(0.5), #[cfg(feature = "transport_compression")] is_compression: rng.gen_bool(0.5), - ephemeral: false, }; let mut wbatch = WBatch::new(config); wbatch.encode(&msg_in).unwrap(); @@ -571,7 +581,6 @@ mod tests { is_streamed: false, #[cfg(feature = "transport_compression")] is_compression: false, - ephemeral: false, }; let mut batch = WBatch::new(config); diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index dda9d1304..3c00ee29d 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -204,7 +204,7 @@ struct StageIn { fragbuf: ZBuf, batching: bool, // used for stop fragment - ephermeral_batch_config: BatchConfig, + batch_config: BatchConfig, } impl StageIn { @@ -333,7 +333,7 @@ impl StageIn { tch.sn.set(sn).unwrap() // Otherwise, an ephemeral batch is created to send the stop fragment } else { - let mut batch = WBatch::new(self.ephermeral_batch_config); + let mut batch = WBatch::new_ephemeral(self.batch_config); self.fragbuf.clear(); fragment.ext_stop = Some(fragment::ext::Stop::new()); let _ = batch.encode((&mut self.fragbuf.reader(), &mut fragment)); @@ -657,10 +657,7 @@ impl TransmissionPipeline { }, fragbuf: ZBuf::empty(), batching: config.batching_enabled, - ephermeral_batch_config: BatchConfig { - ephemeral: true, - ..config.batch - }, + batch_config: config.batch, })); // The stage out for this priority @@ -869,7 +866,6 @@ mod tests { is_streamed: true, #[cfg(feature = "transport_compression")] is_compression: true, - ephemeral: false, }, queue_size: [1; Priority::NUM], batching_enabled: true, @@ -884,7 +880,6 @@ mod tests { is_streamed: false, #[cfg(feature = "transport_compression")] is_compression: false, - ephemeral: false, }, queue_size: [1; Priority::NUM], batching_enabled: true, diff --git a/io/zenoh-transport/src/unicast/establishment/accept.rs b/io/zenoh-transport/src/unicast/establishment/accept.rs index a9ff60dc0..db2310c32 100644 --- a/io/zenoh-transport/src/unicast/establishment/accept.rs +++ b/io/zenoh-transport/src/unicast/establishment/accept.rs @@ -657,7 +657,6 @@ pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) - is_streamed, #[cfg(feature = "transport_compression")] is_compression: false, - ephemeral: false, }, priorities: None, reliability: None, @@ -796,7 +795,6 @@ pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) - is_streamed, #[cfg(feature = "transport_compression")] is_compression: state.link.ext_compression.is_compression(), - ephemeral: false, }, priorities: state.transport.ext_qos.priorities(), reliability: state.transport.ext_qos.reliability(), diff --git a/io/zenoh-transport/src/unicast/establishment/open.rs b/io/zenoh-transport/src/unicast/establishment/open.rs index d219ec63b..f3ce60b35 100644 --- a/io/zenoh-transport/src/unicast/establishment/open.rs +++ b/io/zenoh-transport/src/unicast/establishment/open.rs @@ -554,7 +554,6 @@ pub(crate) async fn open_link( is_streamed, #[cfg(feature = "transport_compression")] is_compression: false, // Perform the exchange Init/Open exchange with no compression - ephemeral: false, }, priorities: None, reliability: None, @@ -679,7 +678,6 @@ pub(crate) async fn open_link( is_streamed, #[cfg(feature = "transport_compression")] is_compression: state.link.ext_compression.is_compression(), - ephemeral: false, }, priorities: state.transport.ext_qos.priorities(), reliability: state.transport.ext_qos.reliability(), diff --git a/io/zenoh-transport/src/unicast/universal/link.rs b/io/zenoh-transport/src/unicast/universal/link.rs index 45637f6e6..8da4d11cd 100644 --- a/io/zenoh-transport/src/unicast/universal/link.rs +++ b/io/zenoh-transport/src/unicast/universal/link.rs @@ -60,7 +60,6 @@ impl TransportLinkUnicastUniversal { is_streamed: link.link.is_streamed(), #[cfg(feature = "transport_compression")] is_compression: link.config.batch.is_compression, - ephemeral: false, }, queue_size: transport.manager.config.queue_size, wait_before_drop: transport.manager.config.wait_before_drop, @@ -192,7 +191,7 @@ async fn tx_task( stats.inc_tx_bytes(batch.len() as usize); } - if !batch.config.ephemeral { + if !batch.is_ephemeral() { // Reinsert the batch into the queue pipeline.refill(batch, priority); }