Skip to content

Commit

Permalink
add Querier (#1591)
Browse files Browse the repository at this point in the history
* add querier

* add LivelinessQuerier

* code clean up

* interest support

* make keyexpr include/intersect checking functions generic

* remove liveliness querier

* add matching status for querier

* add matching listener support

* clippy fix

* clippy fix

* clippy fix

* clippy and fmt fix

* doc test fix

* docs fix

* fix MatchingStatus/Listener to work on session-local entities with origin=Locality::SessionLocal

* Merge branch 'main' into querier

* clippy fix

* fix review comments

* explain #[allow(unused_mut)]

* explain behaviour of keyexpr_intersect and keyexpr_include in case of conversion failure

* log error when keyexpr_intersect/includes fails keyexpr conversion

* add matching listener to z_pub example;
add flag to enable/disable matching listener in the z_pub and z_querier examples;

* add test for querier

* add test for matching listener/status

* simplify MatchingListenerBuilder::with<Handler>

* remove aggregated queriers

* moved all MatchingStatus/Listener functionality under separate module

* fixed z_querier example to accept selector instead of keyexpr

* new clippy fixes

* mark querier related features as unstable
  • Loading branch information
DenisBiryukov91 authored Dec 2, 2024
1 parent c764bf9 commit 549bc7b
Show file tree
Hide file tree
Showing 27 changed files with 2,553 additions and 750 deletions.
5 changes: 5 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ required-features = ["unstable", "shared-memory"]
name = "z_pull"
path = "examples/z_pull.rs"

[[example]]
name = "z_querier"
path = "examples/z_querier.rs"
required-features = ["unstable"]

[[example]]
name = "z_queryable"
path = "examples/z_queryable.rs"
Expand Down
18 changes: 18 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,24 @@
z_get -s 'demo/**'
```

### z_querier

Continuously sends query messages for a selector.
The queryables with a matching path or selector (for instance [z_queryable](#z_queryable) and [z_storage](#z_storage))
will receive these queries and reply with paths/values that will be received by the querier.

Typical usage:

```bash
z_querier
```

or

```bash
z_querier -s 'demo/**'
```

### z_queryable

Declares a queryable function with a path.
Expand Down
38 changes: 35 additions & 3 deletions examples/examples/z_pub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,33 @@ async fn main() {
// Initiate logging
zenoh::init_log_from_env_or("error");

let (config, key_expr, payload, attachment) = parse_args();
#[cfg(feature = "unstable")]
let (config, key_expr, payload, attachment, add_matching_listener) = parse_args();
#[cfg(not(feature = "unstable"))]
let (config, key_expr, payload, attachment, _) = parse_args();

println!("Opening session...");
let session = zenoh::open(config).await.unwrap();

println!("Declaring Publisher on '{key_expr}'...");
let publisher = session.declare_publisher(&key_expr).await.unwrap();

#[cfg(feature = "unstable")]
if add_matching_listener {
publisher
.matching_listener()
.callback(|matching_status| {
if matching_status.matching() {
println!("Publisher has matching subscribers.");
} else {
println!("Publisher has NO MORE matching subscribers.");
}
})
.background()
.await
.unwrap();
}

println!("Press CTRL-C to quit...");
for idx in 0..u32::MAX {
tokio::time::sleep(Duration::from_secs(1)).await;
Expand All @@ -56,11 +75,24 @@ struct Args {
#[arg(short, long)]
/// The attachments to add to each put.
attach: Option<String>,
/// Enable matching listener.
#[cfg(feature = "unstable")]
#[arg(long)]
add_matching_listener: bool,
#[command(flatten)]
common: CommonArgs,
}

fn parse_args() -> (Config, KeyExpr<'static>, String, Option<String>) {
fn parse_args() -> (Config, KeyExpr<'static>, String, Option<String>, bool) {
let args = Args::parse();
(args.common.into(), args.key, args.payload, args.attach)
(
args.common.into(),
args.key,
args.payload,
args.attach,
#[cfg(feature = "unstable")]
args.add_matching_listener,
#[cfg(not(feature = "unstable"))]
false,
)
}
157 changes: 157 additions & 0 deletions examples/examples/z_querier.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use std::time::Duration;

use clap::Parser;
use zenoh::{
query::{QueryTarget, Selector},
Config,
};
use zenoh_examples::CommonArgs;

#[tokio::main]
async fn main() {
// initiate logging
zenoh::init_log_from_env_or("error");
#[cfg(feature = "unstable")]
let (config, selector, payload, target, timeout, add_matching_listener) = parse_args();
#[cfg(not(feature = "unstable"))]
let (config, selector, payload, target, timeout, _) = parse_args();

println!("Opening session...");
let session = zenoh::open(config).await.unwrap();

println!("Declaring Querier on '{}'...", selector.key_expr());
let querier = session
.declare_querier(selector.key_expr())
.target(target)
.timeout(timeout)
.await
.unwrap();

#[cfg(feature = "unstable")]
if add_matching_listener {
querier
.matching_listener()
.callback(|matching_status| {
if matching_status.matching() {
println!("Querier has matching queryables.");
} else {
println!("Querier has NO MORE matching queryables.");
}
})
.background()
.await
.unwrap();
}

let params = selector.parameters().as_str();

println!("Press CTRL-C to quit...");
for idx in 0..u32::MAX {
tokio::time::sleep(Duration::from_secs(1)).await;
let buf = format!("[{idx:4}] {}", payload.clone().unwrap_or_default());
println!("Querying '{}' with payload: '{}'...", &selector, buf);
let replies = querier
.get()
// // By default get receives replies from a FIFO.
// // Uncomment this line to use a ring channel instead.
// // More information on the ring channel are available in the z_pull example.
// .with(zenoh::handlers::RingChannel::default())
// Refer to z_bytes.rs to see how to serialize different types of message
.payload(buf)
.parameters(params)
.await
.unwrap();
while let Ok(reply) = replies.recv_async().await {
match reply.result() {
Ok(sample) => {
// Refer to z_bytes.rs to see how to deserialize different types of message
let payload = sample
.payload()
.try_to_string()
.unwrap_or_else(|e| e.to_string().into());
println!(
">> Received ('{}': '{}')",
sample.key_expr().as_str(),
payload,
);
}
Err(err) => {
let payload = err
.payload()
.try_to_string()
.unwrap_or_else(|e| e.to_string().into());
println!(">> Received (ERROR: '{}')", payload);
}
}
}
}
}

#[derive(clap::ValueEnum, Clone, Copy, Debug)]
#[value(rename_all = "SCREAMING_SNAKE_CASE")]
enum Qt {
BestMatching,
All,
AllComplete,
}

#[derive(Parser, Clone, Debug)]
struct Args {
#[arg(short, long, default_value = "demo/example/**")]
/// The selection of resources to query
selector: Selector<'static>,
#[arg(short, long)]
/// An optional payload to put in the query.
payload: Option<String>,
#[arg(short, long, default_value = "BEST_MATCHING")]
/// The target queryables of the query.
target: Qt,
#[arg(short = 'o', long, default_value = "10000")]
/// The query timeout in milliseconds.
timeout: u64,
/// Enable matching listener.
#[cfg(feature = "unstable")]
#[arg(long)]
add_matching_listener: bool,
#[command(flatten)]
common: CommonArgs,
}

fn parse_args() -> (
Config,
Selector<'static>,
Option<String>,
QueryTarget,
Duration,
bool,
) {
let args = Args::parse();
(
args.common.into(),
args.selector,
args.payload,
match args.target {
Qt::BestMatching => QueryTarget::BestMatching,
Qt::All => QueryTarget::All,
Qt::AllComplete => QueryTarget::AllComplete,
},
Duration::from_millis(args.timeout),
#[cfg(feature = "unstable")]
args.add_matching_listener,
#[cfg(not(feature = "unstable"))]
false,
)
}
4 changes: 1 addition & 3 deletions zenoh/src/api/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@ lazy_static::lazy_static!(

pub(crate) fn init(session: WeakSession) {
if let Ok(own_zid) = keyexpr::new(&session.zid().to_string()) {
let admin_key = KeyExpr::from(*KE_PREFIX / own_zid / *KE_SESSION / *KE_STARSTAR)
.to_wire(&session)
.to_owned();
let admin_key = KeyExpr::from(*KE_PREFIX / own_zid / *KE_SESSION / *KE_STARSTAR);

let _admin_qabl = session.declare_queryable_inner(
&admin_key,
Expand Down
Loading

0 comments on commit 549bc7b

Please sign in to comment.