Skip to content

Commit

Permalink
make examples SHM-agnostic
Browse files Browse the repository at this point in the history
  • Loading branch information
yellowhatter committed Jul 24, 2024
1 parent aace7f1 commit 89830f8
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 140 deletions.
5 changes: 0 additions & 5 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,6 @@ required-features = ["unstable", "shared-memory"]
name = "z_sub"
path = "examples/z_sub.rs"

[[example]]
name = "z_sub_shm"
path = "examples/z_sub_shm.rs"
required-features = ["unstable", "shared-memory"]

[[example]]
name = "z_pull"
path = "examples/z_pull.rs"
Expand Down
20 changes: 4 additions & 16 deletions examples/examples/z_queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//
use clap::Parser;
use zenoh::{key_expr::KeyExpr, prelude::*, Config};
use zenoh_examples::CommonArgs;
use zenoh_examples::{receive_query, CommonArgs};

#[tokio::main]
async fn main() {
Expand All @@ -22,7 +22,7 @@ async fn main() {

let (mut config, key_expr, payload, complete) = parse_args();

// A probing procedure for shared memory is performed upon session opening. To enable `z_get_shm` to operate
// A probing procedure for shared memory is performed upon session opening. To enable `z_queryable` to operate
// over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the
// subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected.
config.transport.shared_memory.set_enabled(true).unwrap();
Expand All @@ -43,20 +43,8 @@ async fn main() {

println!("Press CTRL-C to quit...");
while let Ok(query) = queryable.recv_async().await {
match query.payload() {
None => println!(">> [Queryable ] Received Query '{}'", query.selector()),
Some(query_payload) => {
// Refer to z_bytes.rs to see how to deserialize different types of message
let deserialized_payload = query_payload
.deserialize::<String>()
.unwrap_or_else(|e| format!("{}", e));
println!(
">> [Queryable ] Received Query '{}' with payload '{}'",
query.selector(),
deserialized_payload
)
}
}
receive_query(&query, "Queryable");

println!(
">> [Queryable ] Responding ('{}': '{}')",
key_expr.as_str(),
Expand Down
18 changes: 3 additions & 15 deletions examples/examples/z_queryable_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@ use zenoh::{
key_expr::KeyExpr,
prelude::*,
shm::{
zshm, BlockOn, GarbageCollect, PosixShmProviderBackend, ShmProviderBuilder,
POSIX_PROTOCOL_ID,
BlockOn, GarbageCollect, PosixShmProviderBackend, ShmProviderBuilder, POSIX_PROTOCOL_ID,
},
Config,
};
use zenoh_examples::CommonArgs;
use zenoh_examples::{receive_query, CommonArgs};

const N: usize = 10;

Expand Down Expand Up @@ -63,18 +62,7 @@ async fn main() {

println!("Press CTRL-C to quit...");
while let Ok(query) = queryable.recv_async().await {
print!(
">> [Queryable] Received Query '{}' ('{}'",
query.selector(),
query.key_expr().as_str(),
);
if let Some(query_payload) = query.payload() {
match query_payload.deserialize::<&zshm>() {
Ok(p) => print!(": '{}'", String::from_utf8_lossy(p)),
Err(e) => print!(": 'Not a ShmBufInner: {:?}'", e),
}
}
println!(")");
receive_query(&query, "Queryable");

// Allocate an SHM buffer
// NOTE: For allocation API please check z_alloc_shm.rs example
Expand Down
22 changes: 2 additions & 20 deletions examples/examples/z_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//
use clap::Parser;
use zenoh::{key_expr::KeyExpr, prelude::*, Config};
use zenoh_examples::CommonArgs;
use zenoh_examples::{receive_sample, CommonArgs};

#[tokio::main]
async fn main() {
Expand All @@ -35,25 +35,7 @@ async fn main() {

println!("Press CTRL-C to quit...");
while let Ok(sample) = subscriber.recv_async().await {
// Refer to z_bytes.rs to see how to deserialize different types of message
let payload = sample
.payload()
.deserialize::<String>()
.unwrap_or_else(|e| format!("{}", e));

print!(
">> [Subscriber] Received {} ('{}': '{}')",
sample.kind(),
sample.key_expr().as_str(),
payload
);
if let Some(att) = sample.attachment() {
let att = att
.deserialize::<String>()
.unwrap_or_else(|e| format!("{}", e));
print!(" ({})", att);
}
println!();
receive_sample(&sample, "Subscriber");
}
}

Expand Down
83 changes: 0 additions & 83 deletions examples/examples/z_sub_shm.rs

This file was deleted.

91 changes: 90 additions & 1 deletion examples/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
//! See the code in ../examples/
//! Check ../README.md for usage.
//!
use zenoh::config::Config;
use zenoh::{bytes::ZBytes, config::Config, query::Query, sample::Sample};

#[cfg(all(feature = "shared-memory", feature = "unstable"))]
use zenoh::shm::zshm;

#[derive(clap::ValueEnum, Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub enum Wai {
Expand Down Expand Up @@ -84,3 +87,89 @@ impl From<&CommonArgs> for Config {
config
}
}

pub fn receive_query(query: &Query, entity_name: &str) {
// Print overall payload information
match query.payload() {
Some(payload) => {
let (payload_type, payload) = handle_bytes(payload);
print!(
"{} >> [{}] Received Query ('{}': '{}')",
entity_name,
payload_type,
query.selector(),
payload
);
}
None => {
print!("{} >> Received Query '{}'", entity_name, query.selector());
}
};

// Print attachment information
print_attachment(query.attachment());

println!();
}

pub fn receive_sample(sample: &Sample, entity_name: &str) {
// Print overall payload information
let (payload_type, payload) = handle_bytes(sample.payload());
print!(
"{} >> [{}] Received {} ('{}': '{}')",
entity_name,
payload_type,
sample.kind(),
sample.key_expr().as_str(),
payload
);

// Print attachment information
print_attachment(sample.attachment());

println!();
}

fn print_attachment(attachment: Option<&ZBytes>) {
if let Some(att) = attachment {
let (attachment_type, attachment) = handle_bytes(att);
print!(" ({}: {})", attachment_type, attachment);
}
}

fn handle_bytes(bytes: &ZBytes) -> (&str, String) {
// Determine buffer type for indication purpose
let bytes_type = {
// if Zenoh is built without SHM support, the only buffer type it can receive is RAW
#[cfg(not(feature = "shared-memory"))]
{
"RAW"
}

// if Zenoh is built with SHM support but without SHM API (that is unstable), it can
// receive buffers of any type, but there is no way to detect the buffer type
#[cfg(all(feature = "shared-memory", not(feature = "unstable")))]
{
"UNKNOWN"
}

// if Zenoh is built with SHM support and with SHM API we can detect the exact buffer type
#[cfg(all(feature = "shared-memory", feature = "unstable"))]
match bytes.deserialize::<&zshm>() {
Ok(_) => "SHM",
Err(_) => "RAW",
}
};

// In order to indicate the real underlying buffer type the code above is written ^^^
// Sample is SHM-agnostic: Sample handling code works both with SHM and RAW data transparently.
// In other words, the common application compiled with "shared-memory" feature will be able to
// handle incoming SHM data without any changes in the application code.
//
// Refer to z_bytes.rs to see how to deserialize different types of message
let bytes_string = bytes
.deserialize::<String>()
.unwrap_or_else(|e| format!("{}", e));

(bytes_type, bytes_string)
}

0 comments on commit 89830f8

Please sign in to comment.