Skip to content

Commit

Permalink
refactor: move ephemeral flag out of batch config
Browse files Browse the repository at this point in the history
I've checked, it doesn't change the size of `WBatch`
  • Loading branch information
wyfo committed Nov 15, 2024
1 parent 3af2042 commit ad8d0a4
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 20 deletions.
21 changes: 15 additions & 6 deletions io/zenoh-transport/src/common/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,6 @@ pub struct BatchConfig {
pub is_streamed: bool,
#[cfg(feature = "transport_compression")]
pub is_compression: bool,
// an ephemeral batch will not be recycled in the pipeline
// it can be used to push a stop fragment when no batch are available
pub ephemeral: bool,
}

impl Default for BatchConfig {
Expand All @@ -96,7 +93,6 @@ impl Default for BatchConfig {
is_streamed: false,
#[cfg(feature = "transport_compression")]
is_compression: false,
ephemeral: false,
}
}
}
Expand Down Expand Up @@ -205,6 +201,9 @@ pub struct WBatch {
// Statistics related to this batch
#[cfg(feature = "stats")]
pub stats: WBatchStats,
// an ephemeral batch will not be recycled in the pipeline
// it can be used to push a stop fragment when no batch are available
pub ephemeral: bool,
}

impl WBatch {
Expand All @@ -213,6 +212,7 @@ impl WBatch {
buffer: BBuf::with_capacity(config.mtu as usize),
codec: Zenoh080Batch::new(),
config,
ephemeral: false,
#[cfg(feature = "stats")]
stats: WBatchStats::default(),
};
Expand All @@ -223,6 +223,17 @@ impl WBatch {
batch
}

pub fn new_ephemeral(config: BatchConfig) -> Self {
Self {
ephemeral: true,
..Self::new(config)
}
}

pub fn is_ephemeral(&self) -> bool {
self.ephemeral
}

/// Verify that the [`WBatch`] has no serialized bytes.
#[inline(always)]
pub fn is_empty(&self) -> bool {
Expand Down Expand Up @@ -529,7 +540,6 @@ mod tests {
is_streamed: rng.gen_bool(0.5),
#[cfg(feature = "transport_compression")]
is_compression: rng.gen_bool(0.5),
ephemeral: false,
};
let mut wbatch = WBatch::new(config);
wbatch.encode(&msg_in).unwrap();
Expand Down Expand Up @@ -571,7 +581,6 @@ mod tests {
is_streamed: false,
#[cfg(feature = "transport_compression")]
is_compression: false,
ephemeral: false,
};
let mut batch = WBatch::new(config);

Expand Down
11 changes: 3 additions & 8 deletions io/zenoh-transport/src/common/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ struct StageIn {
fragbuf: ZBuf,
batching: bool,
// used for stop fragment
ephermeral_batch_config: BatchConfig,
batch_config: BatchConfig,
}

impl StageIn {
Expand Down Expand Up @@ -333,7 +333,7 @@ impl StageIn {
tch.sn.set(sn).unwrap()
// Otherwise, an ephemeral batch is created to send the stop fragment
} else {
let mut batch = WBatch::new(self.ephermeral_batch_config);
let mut batch = WBatch::new_ephemeral(self.batch_config);
self.fragbuf.clear();
fragment.ext_stop = Some(fragment::ext::Stop::new());
let _ = batch.encode((&mut self.fragbuf.reader(), &mut fragment));
Expand Down Expand Up @@ -657,10 +657,7 @@ impl TransmissionPipeline {
},
fragbuf: ZBuf::empty(),
batching: config.batching_enabled,
ephermeral_batch_config: BatchConfig {
ephemeral: true,
..config.batch
},
batch_config: config.batch,
}));

// The stage out for this priority
Expand Down Expand Up @@ -869,7 +866,6 @@ mod tests {
is_streamed: true,
#[cfg(feature = "transport_compression")]
is_compression: true,
ephemeral: false,
},
queue_size: [1; Priority::NUM],
batching_enabled: true,
Expand All @@ -884,7 +880,6 @@ mod tests {
is_streamed: false,
#[cfg(feature = "transport_compression")]
is_compression: false,
ephemeral: false,
},
queue_size: [1; Priority::NUM],
batching_enabled: true,
Expand Down
2 changes: 0 additions & 2 deletions io/zenoh-transport/src/unicast/establishment/accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,6 @@ pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) -
is_streamed,
#[cfg(feature = "transport_compression")]
is_compression: false,
ephemeral: false,
},
priorities: None,
reliability: None,
Expand Down Expand Up @@ -796,7 +795,6 @@ pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) -
is_streamed,
#[cfg(feature = "transport_compression")]
is_compression: state.link.ext_compression.is_compression(),
ephemeral: false,
},
priorities: state.transport.ext_qos.priorities(),
reliability: state.transport.ext_qos.reliability(),
Expand Down
2 changes: 0 additions & 2 deletions io/zenoh-transport/src/unicast/establishment/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,6 @@ pub(crate) async fn open_link(
is_streamed,
#[cfg(feature = "transport_compression")]
is_compression: false, // Perform the exchange Init/Open exchange with no compression
ephemeral: false,
},
priorities: None,
reliability: None,
Expand Down Expand Up @@ -679,7 +678,6 @@ pub(crate) async fn open_link(
is_streamed,
#[cfg(feature = "transport_compression")]
is_compression: state.link.ext_compression.is_compression(),
ephemeral: false,
},
priorities: state.transport.ext_qos.priorities(),
reliability: state.transport.ext_qos.reliability(),
Expand Down
3 changes: 1 addition & 2 deletions io/zenoh-transport/src/unicast/universal/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ impl TransportLinkUnicastUniversal {
is_streamed: link.link.is_streamed(),
#[cfg(feature = "transport_compression")]
is_compression: link.config.batch.is_compression,
ephemeral: false,
},
queue_size: transport.manager.config.queue_size,
wait_before_drop: transport.manager.config.wait_before_drop,
Expand Down Expand Up @@ -192,7 +191,7 @@ async fn tx_task(
stats.inc_tx_bytes(batch.len() as usize);
}

if !batch.config.ephemeral {
if !batch.is_ephemeral() {
// Reinsert the batch into the queue
pipeline.refill(batch, priority);
}
Expand Down

0 comments on commit ad8d0a4

Please sign in to comment.