Skip to content

Commit

Permalink
First working example
Browse files Browse the repository at this point in the history
  • Loading branch information
Mallets committed Nov 6, 2023
1 parent 4f691bd commit 2c6cb6d
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 41 deletions.
3 changes: 3 additions & 0 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@
/// enable 'lowlatency' you need to explicitly disable 'qos'.
lowlatency: false,
},
compression: {
enabled: true,
},
qos: {
enabled: true,
},
Expand Down
62 changes: 36 additions & 26 deletions io/zenoh-transport/src/common/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,8 @@ impl WBatch {
let (_header, payload) = self.split();

// Create a new empty buffer
let mut buffer = BBuf::with_capacity(self.buffer.capacity());
let mut buffer =
BBuf::with_capacity(lz4_flex::block::get_maximum_output_size(self.buffer.len()));

// Write the initial bytes for the batch
let mut writer = buffer.writer();
Expand Down Expand Up @@ -371,6 +372,8 @@ impl Decode<TransportMessage> for &mut RBatch {

#[cfg(test)]
mod tests {
use std::vec;

use super::*;
use rand::Rng;
use zenoh_buffers::ZBuf;
Expand All @@ -379,7 +382,7 @@ mod tests {
network::{ext, Push},
transport::{
frame::{self, FrameHeader},
KeepAlive, TransportMessage,
Fragment, KeepAlive, TransportMessage,
},
zenoh::{PushBody, Put},
};
Expand All @@ -389,30 +392,37 @@ mod tests {
let mut rng = rand::thread_rng();

for _ in 0..1_000 {
let msg_in = TransportMessage::rand();

let config = BatchConfig {
mtu: BatchSize::MAX,
#[cfg(feature = "transport_compression")]
is_compression: rng.gen_bool(0.5),
};
let mut wbatch = WBatch::new(config);
wbatch.encode(&msg_in).unwrap();
println!("Encoded WBatch: {:?}", wbatch);
wbatch.finalize().unwrap();
println!("Finalized WBatch: {:?}", wbatch);

let mut rbatch = RBatch::new(
config,
wbatch.buffer.as_slice().to_vec().into_boxed_slice().into(),
);
println!("Decoded RBatch: {:?}", rbatch);
rbatch
.initialize(|| zenoh_buffers::vec::uninit(config.mtu as usize).into_boxed_slice())
.unwrap();
println!("Initialized RBatch: {:?}", rbatch);
let msg_out: TransportMessage = rbatch.decode().unwrap();
assert_eq!(msg_in, msg_out);
let msg_ins: [TransportMessage; 2] = [TransportMessage::rand(), {
let mut msg_in = Fragment::rand();
msg_in.payload = vec![0u8; rng.gen_range(8..1_024)].into();
msg_in.into()
}];
for msg_in in msg_ins {
let config = BatchConfig {
mtu: BatchSize::MAX,
#[cfg(feature = "transport_compression")]
is_compression: rng.gen_bool(0.5),
};
let mut wbatch = WBatch::new(config);
wbatch.encode(&msg_in).unwrap();
println!("Encoded WBatch: {:?}", wbatch);
wbatch.finalize().unwrap();
println!("Finalized WBatch: {:?}", wbatch);

let mut rbatch = RBatch::new(
config,
wbatch.buffer.as_slice().to_vec().into_boxed_slice().into(),
);
println!("Decoded RBatch: {:?}", rbatch);
rbatch
.initialize(|| {
zenoh_buffers::vec::uninit(config.mtu as usize).into_boxed_slice()
})
.unwrap();
println!("Initialized RBatch: {:?}", rbatch);
let msg_out: TransportMessage = rbatch.decode().unwrap();
assert_eq!(msg_in, msg_out);
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion io/zenoh-transport/src/common/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ impl TransmissionPipeline {
let bc = BatchConfig {
mtu: config.batch_size,
#[cfg(feature = "transport_compression")]
is_compression: false,
is_compression: config.is_compression,
};
let batch = WBatch::new(bc);
assert!(s_ref_w.push(batch).is_none());
Expand Down
7 changes: 6 additions & 1 deletion io/zenoh-transport/src/unicast/establishment/accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,11 @@ pub(crate) async fn accept_link(link: &LinkUnicast, manager: &TransportManager)
.await
.map_err(|e| (e, Some(close::reason::GENERIC))));

#[cfg(feature = "transport_compression")]
{
link.config.is_compression = state.link.ext_compression.is_compression();
}

// Sync the RX sequence number
let _ = step!(transport
.get_inner()
Expand All @@ -732,7 +737,7 @@ pub(crate) async fn accept_link(link: &LinkUnicast, manager: &TransportManager)
.map_err(|e| (e, Some(close::reason::INVALID))));

log::debug!(
"New transport link accepted from {} to {}: {}",
"New transport link accepted from {} to {}: {:?}",
osyn_out.other_zid,
manager.config.zid,
link
Expand Down
2 changes: 1 addition & 1 deletion io/zenoh-transport/src/unicast/establishment/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ pub(crate) async fn open_link(
}

log::debug!(
"New transport link opened from {} to {}: {}",
"New transport link opened from {} to {}: {:?}",
manager.config.zid,
iack_out.other_zid,
link
Expand Down
8 changes: 6 additions & 2 deletions io/zenoh-transport/src/unicast/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,14 @@ impl TransportLinkUnicast {
};

let buffer = ZSlice::make(Arc::new(into), 0, end)
.map_err(|_| zerror!("ZSlice index(es) out of bounds"))?;
.map_err(|_| zerror!("{ERR}{self}. ZSlice index(es) out of bounds"))?;

log::trace!("RBatch: {:?}", buffer);

let mut batch = RBatch::new(self.batch_config(), buffer);
batch.initialize(buff).map_err(|_| zerror!("{ERR}{self}"))?;
batch
.initialize(buff)
.map_err(|e| zerror!("{ERR}{self}. {e}."))?;

Ok(batch)
}
Expand Down
10 changes: 0 additions & 10 deletions io/zenoh-transport/src/unicast/universal/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,6 @@ impl TransportLinkUnicastUniversal {
backoff: self.transport.manager.config.queue_backoff,
};

#[cfg(all(feature = "unstable", feature = "transport_compression"))]
let is_compressed = self.transport.config.manager.config.unicast.is_compressed;

// The pipeline
let (producer, consumer) = TransmissionPipeline::make(config, priority_tx);
self.pipeline = Some(producer);
Expand All @@ -97,8 +94,6 @@ impl TransportLinkUnicastUniversal {
keep_alive,
#[cfg(feature = "stats")]
c_transport.stats.clone(),
#[cfg(all(feature = "unstable", feature = "transport_compression"))]
is_compressed,
)
.await;
if let Err(e) = res {
Expand Down Expand Up @@ -181,12 +176,7 @@ async fn tx_task(
link: TransportLinkUnicast,
keep_alive: Duration,
#[cfg(feature = "stats")] stats: Arc<TransportStats>,
#[cfg(all(feature = "unstable", feature = "transport_compression"))] is_compressed: bool,
) -> ZResult<()> {
#[cfg(all(feature = "unstable", feature = "transport_compression"))]
let mut compression_aux_buff: Box<[u8]> =
vec![0; lz4_flex::block::get_maximum_output_size(MAX_BATCH_SIZE)].into_boxed_slice();

loop {
match pipeline.pull().timeout(keep_alive).await {
Ok(res) => match res {
Expand Down

0 comments on commit 2c6cb6d

Please sign in to comment.