Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shm mutation example #1672

Merged
merged 3 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 17 additions & 29 deletions examples/examples/z_sub_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
use std::borrow::Cow;

use clap::Parser;
#[cfg(all(feature = "shared-memory", feature = "unstable"))]
use zenoh::shm::{zshm, zshmmut};
use zenoh::{bytes::ZBytes, config::Config, key_expr::KeyExpr};
use zenoh_examples::CommonArgs;

Expand All @@ -31,43 +33,25 @@ async fn main() {
let subscriber = session.declare_subscriber(&key_expr).await.unwrap();

println!("Press CTRL-C to quit...");
while let Ok(sample) = subscriber.recv_async().await {
while let Ok(mut sample) = subscriber.recv_async().await {
let kind = sample.kind();
let key_str = sample.key_expr().as_str().to_owned();

// Print overall payload information
let (payload_type, payload) = handle_bytes(sample.payload());
let (payload_type, payload) = handle_bytes(sample.payload_mut());
print!(
">> [Subscriber] Received {} ('{}': '{}') [{}] ",
sample.kind(),
sample.key_expr().as_str(),
payload,
payload_type,
kind, key_str, payload, payload_type,
);

// Print attachment information
if let Some(att) = sample.attachment() {
if let Some(att) = sample.attachment_mut() {
let (attachment_type, attachment) = handle_bytes(att);
print!(" ({}: {})", attachment_type, attachment);
}

println!();
}

// // Try to get a mutable reference to the SHM buffer. If this subscriber is the only subscriber
// // holding a reference to the SHM buffer, then it will be able to get a mutable reference to it.
// // With the mutable reference at hand, it's possible to mutate in place the SHM buffer content.

// while let Ok(mut sample) = subscriber.recv_async().await {
// let kind = sample.kind();
// let key_expr = sample.key_expr().to_string();
// match sample.payload_mut().as_shm_mut() {
// Ok(payload) => println!(
// ">> [Subscriber] Received {} ('{}': '{:02x?}')",
// kind, key_expr, payload
// ),
// Err(e) => {
// println!(">> [Subscriber] Not a ShmBufInner: {:?}", e);
// }
// }
// }
}

#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)]
Expand All @@ -84,7 +68,7 @@ fn parse_args() -> (Config, KeyExpr<'static>) {
(args.common.into(), args.key)
}

fn handle_bytes(bytes: &ZBytes) -> (&str, Cow<str>) {
fn handle_bytes(bytes: &mut ZBytes) -> (&str, Cow<str>) {
// Determine buffer type for indication purpose
let bytes_type = {
// if Zenoh is built without SHM support, the only buffer type it can receive is RAW
Expand All @@ -100,10 +84,14 @@ fn handle_bytes(bytes: &ZBytes) -> (&str, Cow<str>) {
"UNKNOWN"
}

// if Zenoh is built with SHM support and with SHM API we can detect the exact buffer type
// if Zenoh is built with SHM support and with SHM API we can detect the exact buffer type
#[cfg(all(feature = "shared-memory", feature = "unstable"))]
match bytes.as_shm() {
Some(_) => "SHM",
match bytes.as_shm_mut() {
// try to mutate SHM buffer to get it's mutability property
Some(shm) => match <&mut zshm as TryInto<&mut zshmmut>>::try_into(shm) {
Ok(_shm_mut) => "SHM (MUT)",
Err(_) => "SHM (IMMUT)",
},
None => "RAW",
}
};
Expand Down
6 changes: 6 additions & 0 deletions zenoh/src/api/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ impl ReplyError {
&self.payload
}

/// Gets the mutable payload of this ReplyError.
#[inline]
pub fn payload_mut(&mut self) -> &mut ZBytes {
&mut self.payload
}

/// Gets the encoding of this ReplyError.
#[inline]
pub fn encoding(&self) -> &Encoding {
Expand Down
Loading