From 216505cb987240624fa1b4f71ca498cee61751b1 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Mon, 16 Dec 2024 12:42:05 +0300 Subject: [PATCH 1/3] Add SHM buffer mutation functionality to examples --- examples/examples/z_sub_shm.rs | 47 +++++++++++++--------------------- 1 file changed, 18 insertions(+), 29 deletions(-) diff --git a/examples/examples/z_sub_shm.rs b/examples/examples/z_sub_shm.rs index f49b0628b..1bfcf7100 100644 --- a/examples/examples/z_sub_shm.rs +++ b/examples/examples/z_sub_shm.rs @@ -17,6 +17,9 @@ use clap::Parser; use zenoh::{bytes::ZBytes, config::Config, key_expr::KeyExpr}; use zenoh_examples::CommonArgs; +#[cfg(all(feature = "shared-memory", feature = "unstable"))] +use zenoh::shm::{zshm, zshmmut}; + #[tokio::main] async fn main() { // Initiate logging @@ -31,43 +34,25 @@ async fn main() { let subscriber = session.declare_subscriber(&key_expr).await.unwrap(); println!("Press CTRL-C to quit..."); - while let Ok(sample) = subscriber.recv_async().await { + while let Ok(mut sample) = subscriber.recv_async().await { + let kind = sample.kind(); + let key_str = sample.key_expr().as_str().to_owned(); + // Print overall payload information - let (payload_type, payload) = handle_bytes(sample.payload()); + let (payload_type, payload) = handle_bytes(sample.payload_mut()); print!( ">> [Subscriber] Received {} ('{}': '{}') [{}] ", - sample.kind(), - sample.key_expr().as_str(), - payload, - payload_type, + kind, key_str, payload, payload_type, ); // Print attachment information - if let Some(att) = sample.attachment() { + if let Some(att) = sample.attachment_mut() { let (attachment_type, attachment) = handle_bytes(att); print!(" ({}: {})", attachment_type, attachment); } println!(); } - - // // Try to get a mutable reference to the SHM buffer. If this subscriber is the only subscriber - // // holding a reference to the SHM buffer, then it will be able to get a mutable reference to it. - // // With the mutable reference at hand, it's possible to mutate in place the SHM buffer content. - - // while let Ok(mut sample) = subscriber.recv_async().await { - // let kind = sample.kind(); - // let key_expr = sample.key_expr().to_string(); - // match sample.payload_mut().as_shm_mut() { - // Ok(payload) => println!( - // ">> [Subscriber] Received {} ('{}': '{:02x?}')", - // kind, key_expr, payload - // ), - // Err(e) => { - // println!(">> [Subscriber] Not a ShmBufInner: {:?}", e); - // } - // } - // } } #[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)] @@ -84,7 +69,7 @@ fn parse_args() -> (Config, KeyExpr<'static>) { (args.common.into(), args.key) } -fn handle_bytes(bytes: &ZBytes) -> (&str, Cow) { +fn handle_bytes(bytes: &mut ZBytes) -> (&str, Cow) { // Determine buffer type for indication purpose let bytes_type = { // if Zenoh is built without SHM support, the only buffer type it can receive is RAW @@ -100,10 +85,14 @@ fn handle_bytes(bytes: &ZBytes) -> (&str, Cow) { "UNKNOWN" } - // if Zenoh is built with SHM support and with SHM API we can detect the exact buffer type + // if Zenoh is built with SHM support and with SHM API we can detect the exact buffer type #[cfg(all(feature = "shared-memory", feature = "unstable"))] - match bytes.as_shm() { - Some(_) => "SHM", + match bytes.as_shm_mut() { + // try to mutate SHM buffer to get it's mutability property + Some(shm) => match <&mut zshm as TryInto<&mut zshmmut>>::try_into(shm) { + Ok(_shm_mut) => "SHM (MUT)", + Err(_) => "SHM (IMMUT)", + }, None => "RAW", } }; From 3ba037d34f0dff7a496b972f4efa91fc2750f093 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Tue, 17 Dec 2024 11:01:10 +0300 Subject: [PATCH 2/3] add payload_mut accessor to ReplyError --- zenoh/src/api/query.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/zenoh/src/api/query.rs b/zenoh/src/api/query.rs index 3e1f66ed1..1d763dece 100644 --- a/zenoh/src/api/query.rs +++ b/zenoh/src/api/query.rs @@ -86,6 +86,12 @@ impl ReplyError { &self.payload } + /// Gets the mutable payload of this ReplyError. + #[inline] + pub fn payload_mut(&mut self) -> &mut ZBytes { + &mut self.payload + } + /// Gets the encoding of this ReplyError. #[inline] pub fn encoding(&self) -> &Encoding { From bee739edc4f7cfd467492a3ff861f5dde83abf3a Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Tue, 17 Dec 2024 11:42:43 +0300 Subject: [PATCH 3/3] code format fix --- examples/examples/z_sub_shm.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/examples/examples/z_sub_shm.rs b/examples/examples/z_sub_shm.rs index 1bfcf7100..98137b7dc 100644 --- a/examples/examples/z_sub_shm.rs +++ b/examples/examples/z_sub_shm.rs @@ -14,11 +14,10 @@ use std::borrow::Cow; use clap::Parser; -use zenoh::{bytes::ZBytes, config::Config, key_expr::KeyExpr}; -use zenoh_examples::CommonArgs; - #[cfg(all(feature = "shared-memory", feature = "unstable"))] use zenoh::shm::{zshm, zshmmut}; +use zenoh::{bytes::ZBytes, config::Config, key_expr::KeyExpr}; +use zenoh_examples::CommonArgs; #[tokio::main] async fn main() {