Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make SHM sub\queryable examples more robust #1261

Merged
merged 3 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 59 additions & 10 deletions examples/examples/z_queryable_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
//
use clap::Parser;
use zenoh::{
bytes::ZBytes,
key_expr::KeyExpr,
prelude::*,
shm::{
Expand Down Expand Up @@ -63,18 +64,29 @@ async fn main() {

println!("Press CTRL-C to quit...");
while let Ok(query) = queryable.recv_async().await {
print!(
">> [Queryable] Received Query '{}' ('{}'",
query.selector(),
query.key_expr().as_str(),
);
if let Some(query_payload) = query.payload() {
match query_payload.deserialize::<&zshm>() {
Ok(p) => print!(": '{}'", String::from_utf8_lossy(p)),
Err(e) => print!(": 'Not a ShmBufInner: {:?}'", e),
// Print overall query payload information
match query.payload() {
Some(payload) => {
let (payload_type, payload) = handle_bytes(payload);
print!(
">> [Queryable] Received Query [{}] ('{}': '{}')",
payload_type,
query.selector(),
payload
);
}
None => {
print!(">> Received Query '{}'", query.selector());
}
};

// Print attachment information
if let Some(att) = query.attachment() {
let (attachment_type, attachment) = handle_bytes(att);
print!(" ({}: {})", attachment_type, attachment);
}
println!(")");

println!();

// Allocate an SHM buffer
// NOTE: For allocation API please check z_alloc_shm.rs example
Expand Down Expand Up @@ -119,3 +131,40 @@ fn parse_args() -> (Config, KeyExpr<'static>, String, bool) {
let args = Args::parse();
(args.common.into(), args.key, args.payload, args.complete)
}

fn handle_bytes(bytes: &ZBytes) -> (&str, String) {
// Determine buffer type for indication purpose
let bytes_type = {
// if Zenoh is built without SHM support, the only buffer type it can receive is RAW
#[cfg(not(feature = "shared-memory"))]
{
"RAW"
}

// if Zenoh is built with SHM support but without SHM API (that is unstable), it can
// receive buffers of any type, but there is no way to detect the buffer type
#[cfg(all(feature = "shared-memory", not(feature = "unstable")))]
{
"UNKNOWN"
}

// if Zenoh is built with SHM support and with SHM API we can detect the exact buffer type
#[cfg(all(feature = "shared-memory", feature = "unstable"))]
match bytes.deserialize::<&zshm>() {
Ok(_) => "SHM",
Err(_) => "RAW",
}
};

// In order to indicate the real underlying buffer type the code above is written ^^^
// Sample is SHM-agnostic: Sample handling code works both with SHM and RAW data transparently.
// In other words, the common application compiled with "shared-memory" feature will be able to
// handle incoming SHM data without any changes in the application code.
//
// Refer to z_bytes.rs to see how to deserialize different types of message
let bytes_string = bytes
.deserialize::<String>()
.unwrap_or_else(|e| format!("{}", e));

(bytes_type, bytes_string)
}
58 changes: 52 additions & 6 deletions examples/examples/z_sub_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::Parser;
use zenoh::{config::Config, key_expr::KeyExpr, prelude::*, shm::zshm};
#[cfg(all(feature = "shared-memory", feature = "unstable"))]
use zenoh::shm::zshm;
use zenoh::{bytes::ZBytes, config::Config, key_expr::KeyExpr, prelude::*};
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand All @@ -35,16 +37,23 @@ async fn main() {

println!("Press CTRL-C to quit...");
while let Ok(sample) = subscriber.recv_async().await {
// Print overall payload information
let (payload_type, payload) = handle_bytes(sample.payload());
print!(
">> [Subscriber] Received {} ('{}': ",
">> [Subscriber] Received [{}] {} ('{}': '{}')",
payload_type,
sample.kind(),
sample.key_expr().as_str(),
payload
);
match sample.payload().deserialize::<&zshm>() {
Ok(payload) => print!("'{}'", String::from_utf8_lossy(payload)),
Err(e) => print!("'Not a ShmBufInner: {:?}'", e),

// Print attachment information
if let Some(att) = sample.attachment() {
let (attachment_type, attachment) = handle_bytes(att);
print!(" ({}: {})", attachment_type, attachment);
}
println!(")");

println!();
}

// // Try to get a mutable reference to the SHM buffer. If this subscriber is the only subscriber
Expand Down Expand Up @@ -81,3 +90,40 @@ fn parse_args() -> (Config, KeyExpr<'static>) {
let args = SubArgs::parse();
(args.common.into(), args.key)
}

fn handle_bytes(bytes: &ZBytes) -> (&str, String) {
// Determine buffer type for indication purpose
let bytes_type = {
// if Zenoh is built without SHM support, the only buffer type it can receive is RAW
#[cfg(not(feature = "shared-memory"))]
{
"RAW"
}

// if Zenoh is built with SHM support but without SHM API (that is unstable), it can
// receive buffers of any type, but there is no way to detect the buffer type
#[cfg(all(feature = "shared-memory", not(feature = "unstable")))]
{
"UNKNOWN"
}

// if Zenoh is built with SHM support and with SHM API we can detect the exact buffer type
#[cfg(all(feature = "shared-memory", feature = "unstable"))]
match bytes.deserialize::<&zshm>() {
Ok(_) => "SHM",
Err(_) => "RAW",
}
};

// In order to indicate the real underlying buffer type the code above is written ^^^
// Sample is SHM-agnostic: Sample handling code works both with SHM and RAW data transparently.
// In other words, the common application compiled with "shared-memory" feature will be able to
// handle incoming SHM data without any changes in the application code.
//
// Refer to z_bytes.rs to see how to deserialize different types of message
let bytes_string = bytes
.deserialize::<String>()
.unwrap_or_else(|e| format!("{}", e));

(bytes_type, bytes_string)
}