Skip to content

Commit

Permalink
Support bytes::Bytes for ZBytes (#1248)
Browse files Browse the repository at this point in the history
* Support bytes::Bytes for ZBytes.

Signed-off-by: ChenYing Kuo <[email protected]>

* Avoid Bytes copy in ZBytes serialization

---------

Signed-off-by: ChenYing Kuo <[email protected]>
Co-authored-by: Luca Cominardi <[email protected]>
  • Loading branch information
evshary and Mallets authored Jul 23, 2024
1 parent 9408f74 commit cf4d3d3
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 3 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ async-std = { version = "=1.12.0", default-features = false } # Default features
async-trait = "0.1.60"
base64 = "0.22.1"
bincode = "1.3.3"
bytes = "1.6.1"
clap = { version = "4.4.11", features = ["derive"] }
console-subscriber = "0.3.0"
const_format = "0.2.30"
Expand Down
1 change: 1 addition & 0 deletions zenoh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ tokio-util = { workspace = true }
ahash = { workspace = true }
async-trait = { workspace = true }
base64 = { workspace = true }
bytes = { workspace = true }
event-listener = { workspace = true }
flume = { workspace = true }
form_urlencoded = { workspace = true }
Expand Down
81 changes: 80 additions & 1 deletion zenoh/src/api/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use zenoh_buffers::{
buffer::{Buffer, SplitBuffer},
reader::{DidntRead, HasReader, Reader},
writer::HasWriter,
ZBuf, ZBufReader, ZBufWriter, ZSlice,
ZBuf, ZBufReader, ZBufWriter, ZSlice, ZSliceBuffer,
};
use zenoh_codec::{RCodec, WCodec, Zenoh080};
use zenoh_protocol::{
Expand Down Expand Up @@ -2102,6 +2102,81 @@ impl TryFrom<&mut ZBytes> for serde_pickle::Value {
}
}

// bytes::Bytes

// Define a transparent wrapper type to get around Rust's orphan rule.
// This allows to use bytes::Bytes directly as supporting buffer of a
// ZSlice resulting in zero-copy and zero-alloc bytes::Bytes serialization.
#[repr(transparent)]
#[derive(Debug)]
struct BytesWrap(bytes::Bytes);

impl ZSliceBuffer for BytesWrap {
fn as_slice(&self) -> &[u8] {
&self.0
}

fn as_any(&self) -> &dyn std::any::Any {
self
}

fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
self
}
}

impl Serialize<bytes::Bytes> for ZSerde {
type Output = ZBytes;

fn serialize(self, s: bytes::Bytes) -> Self::Output {
ZBytes::new(BytesWrap(s))
}
}

impl From<bytes::Bytes> for ZBytes {
fn from(t: bytes::Bytes) -> Self {
ZSerde.serialize(t)
}
}

impl Deserialize<bytes::Bytes> for ZSerde {
type Input<'a> = &'a ZBytes;
type Error = Infallible;

fn deserialize(self, v: Self::Input<'_>) -> Result<bytes::Bytes, Self::Error> {
// bytes::Bytes can be constructed only by passing ownership to the constructor.
// Thereofore, here we are forced to allocate a vector and copy the whole ZBytes
// content since bytes::Bytes does not support anything else than Box<u8> (and its
// variants like Vec<u8> and String).
let v: Vec<u8> = ZSerde.deserialize(v).unwrap_infallible();
Ok(bytes::Bytes::from(v))
}
}

impl TryFrom<ZBytes> for bytes::Bytes {
type Error = Infallible;

fn try_from(value: ZBytes) -> Result<Self, Self::Error> {
ZSerde.deserialize(&value)
}
}

impl TryFrom<&ZBytes> for bytes::Bytes {
type Error = Infallible;

fn try_from(value: &ZBytes) -> Result<Self, Self::Error> {
ZSerde.deserialize(value)
}
}

impl TryFrom<&mut ZBytes> for bytes::Bytes {
type Error = Infallible;

fn try_from(value: &mut ZBytes) -> Result<Self, Self::Error> {
ZSerde.deserialize(&*value)
}
}

// Shared memory conversion
#[cfg(feature = "shared-memory")]
impl Serialize<ZShm> for ZSerde {
Expand Down Expand Up @@ -3168,6 +3243,10 @@ mod tests {
serialize_deserialize!(Parameters, Parameters::from(""));
serialize_deserialize!(Parameters, Parameters::from("a=1;b=2;c3"));

// Bytes
serialize_deserialize!(bytes::Bytes, bytes::Bytes::from(vec![1, 2, 3, 4]));
serialize_deserialize!(bytes::Bytes, bytes::Bytes::from("Hello World"));

// Tuple
serialize_deserialize!((usize, usize), (0, 1));
serialize_deserialize!((usize, String), (0, String::from("a")));
Expand Down

0 comments on commit cf4d3d3

Please sign in to comment.