From 2f01adca5f7731b23ae91c861dbdb57651126ec4 Mon Sep 17 00:00:00 2001 From: Michael Ilyin Date: Mon, 10 Jun 2024 20:10:54 +0200 Subject: [PATCH] reply_key_expr in trait --- plugins/zenoh-plugin-rest/src/lib.rs | 4 +- .../src/replica/storage.rs | 1 - zenoh-ext/src/publication_cache.rs | 4 +- zenoh/src/api/query.rs | 8 +--- zenoh/src/api/queryable.rs | 8 ++-- zenoh/src/api/selector.rs | 42 +++++++++++++------ zenoh/src/api/session.rs | 22 +++++----- zenoh/src/lib.rs | 4 -- zenoh/src/prelude.rs | 1 + 9 files changed, 49 insertions(+), 45 deletions(-) diff --git a/plugins/zenoh-plugin-rest/src/lib.rs b/plugins/zenoh-plugin-rest/src/lib.rs index 072a060d1a..a230aa8748 100644 --- a/plugins/zenoh-plugin-rest/src/lib.rs +++ b/plugins/zenoh-plugin-rest/src/lib.rs @@ -35,7 +35,7 @@ use zenoh::{ key_expr::{keyexpr, KeyExpr}, query::{QueryConsolidation, Reply}, sample::{Sample, SampleKind, ValueBuilderTrait}, - selector::{Parameters, Selector, TIME_RANGE_KEY}, + selector::{Parameters, PredefinedParameters, Selector}, session::{Session, SessionDeclarations}, value::Value, }; @@ -383,7 +383,7 @@ async fn query(mut req: Request<(Arc, String)>) -> tide::Result PublicationCache<'a> { if !query.key_expr().as_str().contains('*') { if let Some(queue) = cache.get(query.key_expr().as_keyexpr()) { for sample in queue { - if let (Ok(Some(time_range)), Some(timestamp)) = (query.parameters().time_range(), sample.timestamp()) { + if let (Some(Ok(time_range)), Some(timestamp)) = (query.parameters().time_range(), sample.timestamp()) { if !time_range.contains(timestamp.get_time().to_system_time()){ continue; } @@ -230,7 +230,7 @@ impl<'a> PublicationCache<'a> { for (key_expr, queue) in cache.iter() { if query.key_expr().intersects(unsafe{ keyexpr::from_str_unchecked(key_expr) }) { for sample in queue { - if let (Ok(Some(time_range)), Some(timestamp)) = (query.parameters().time_range(), sample.timestamp()) { + if let (Some(Ok(time_range)), Some(timestamp)) = (query.parameters().time_range(), sample.timestamp()) { if !time_range.contains(timestamp.get_time().to_system_time()){ continue; } diff --git a/zenoh/src/api/query.rs b/zenoh/src/api/query.rs index ba925876c9..0cc6b1e388 100644 --- a/zenoh/src/api/query.rs +++ b/zenoh/src/api/query.rs @@ -35,7 +35,7 @@ use super::{ key_expr::KeyExpr, publisher::Priority, sample::{Locality, QoSBuilder, Sample}, - selector::Selector, + selector::{PredefinedParameters, Selector}, session::Session, value::Value, }; @@ -423,7 +423,7 @@ impl<'a, 'b, Handler> SessionGetBuilder<'a, 'b, Handler> { }| { if accept == ReplyKeyExpr::Any { let mut parameters = parameters.into_owned(); - parameters.insert(_REPLY_KEY_EXPR_ANY_SEL_PARAM, ""); + parameters.set_reply_key_expr_any(); let parameters = Cow::Owned(parameters); Selector { key_expr, @@ -442,10 +442,6 @@ impl<'a, 'b, Handler> SessionGetBuilder<'a, 'b, Handler> { } } -pub(crate) const _REPLY_KEY_EXPR_ANY_SEL_PARAM: &str = "_anyke"; -#[zenoh_macros::unstable] -pub const REPLY_KEY_EXPR_ANY_SEL_PARAM: &str = _REPLY_KEY_EXPR_ANY_SEL_PARAM; - #[zenoh_macros::unstable] #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)] pub enum ReplyKeyExpr { diff --git a/zenoh/src/api/queryable.rs b/zenoh/src/api/queryable.rs index f3c23146d3..f4f16e8ecf 100644 --- a/zenoh/src/api/queryable.rs +++ b/zenoh/src/api/queryable.rs @@ -32,6 +32,8 @@ use { zenoh_protocol::core::EntityGlobalId, }; +#[zenoh_macros::unstable] +use super::selector::PredefinedParameters; use super::{ builders::sample::{ QoSBuilderTrait, SampleBuilder, SampleBuilderTrait, TimestampBuilderTrait, @@ -235,11 +237,7 @@ impl Query { } #[cfg(feature = "unstable")] fn _accepts_any_replies(&self) -> ZResult { - use crate::api::query::_REPLY_KEY_EXPR_ANY_SEL_PARAM; - - Ok(self - .parameters() - .contains_key(_REPLY_KEY_EXPR_ANY_SEL_PARAM)) + Ok(self.parameters().reply_key_expr_any()) } } diff --git a/zenoh/src/api/selector.rs b/zenoh/src/api/selector.rs index 6e2e0e7890..ef63719e01 100644 --- a/zenoh/src/api/selector.rs +++ b/zenoh/src/api/selector.rs @@ -68,9 +68,6 @@ pub struct Selector<'a> { pub parameters: Cow<'a, Parameters<'a>>, } -#[zenoh_macros::unstable] -pub const TIME_RANGE_KEY: &str = "_time"; - impl<'a> Selector<'a> { /// Builds a new selector which owns keyexpr and parameters pub fn owned(key_expr: K, parameters: P) -> Self @@ -120,32 +117,49 @@ impl<'a> From<&'a Selector<'a>> for (&'a KeyExpr<'a>, &'a Parameters<'a>) { #[zenoh_macros::unstable] pub trait PredefinedParameters { + const REPLY_KEY_EXPR_ANY_SEL_PARAM: &'static str = "_anyke"; const TIME_RANGE_KEY: &'static str = "_time"; /// Sets the time range targeted by the selector parameters. fn set_time_range>>(&mut self, time_range: T); + /// Sets parameter allowing to querier to reply to this request even + /// it the requested key expression does not match the reply key expression. + /// TODO: add example + fn set_reply_key_expr_any(&mut self); /// Extracts the standardized `_time` argument from the selector parameters. - fn time_range(&self) -> ZResult>; + /// Returns `None` if the `_time` argument is not present or `Some` with the result of parsing the `_time` argument + /// if it is present. + fn time_range(&self) -> Option>; + /// Returns true if `_anyke` parameter is present in the selector parameters + fn reply_key_expr_any(&self) -> bool; } -#[zenoh_macros::unstable] impl PredefinedParameters for Parameters<'_> { /// Sets the time range targeted by the selector parameters. fn set_time_range>>(&mut self, time_range: T) { let mut time_range: Option = time_range.into(); match time_range.take() { - Some(tr) => self.insert(TIME_RANGE_KEY, format!("{}", tr)), - None => self.remove(TIME_RANGE_KEY), + Some(tr) => self.insert(Self::TIME_RANGE_KEY, format!("{}", tr)), + None => self.remove(Self::TIME_RANGE_KEY), }; } + /// Sets parameter allowing to querier to reply to this request even + /// it the requested key expression does not match the reply key expression. + fn set_reply_key_expr_any(&mut self) { + self.insert(Self::REPLY_KEY_EXPR_ANY_SEL_PARAM, ""); + } + /// Extracts the standardized `_time` argument from the selector parameters. /// /// The default implementation still causes a complete pass through the selector parameters to ensure that there are no duplicates of the `_time` key. - fn time_range(&self) -> ZResult> { - match self.get(TIME_RANGE_KEY) { - Some(tr) => Ok(Some(tr.parse()?)), - None => Ok(None), - } + fn time_range(&self) -> Option> { + self.get(Self::TIME_RANGE_KEY) + .map(|tr| tr.parse().map_err(Into::into)) + } + + /// Returns true if `_anyke` parameter is present in the selector parameters + fn reply_key_expr_any(&self) -> bool { + self.contains_key(Self::REPLY_KEY_EXPR_ANY_SEL_PARAM) } } @@ -270,7 +284,6 @@ impl<'a> From> for Selector<'a> { #[test] fn selector_accessors() { - use crate::api::query::_REPLY_KEY_EXPR_ANY_SEL_PARAM as ANYKE; use std::collections::HashMap; for s in [ @@ -294,6 +307,9 @@ fn selector_accessors() { assert_eq!(parameters.get("_timetrick").unwrap(), ""); + const TIME_RANGE_KEY: &str = Parameters::TIME_RANGE_KEY; + const ANYKE: &str = Parameters::REPLY_KEY_EXPR_ANY_SEL_PARAM; + let time_range = "[now(-2s)..now(2s)]"; zcondfeat!( "unstable", diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 663ddb866a..4eec95bb87 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -87,14 +87,15 @@ use super::{ liveliness::{Liveliness, LivelinessTokenState}, publisher::Publisher, publisher::{MatchingListenerState, MatchingStatus}, - query::_REPLY_KEY_EXPR_ANY_SEL_PARAM, sample::SourceInfo, - selector::TIME_RANGE_KEY, }; -use crate::net::{ - primitives::Primitives, - routing::dispatcher::face::Face, - runtime::{Runtime, RuntimeBuilder}, +use crate::{ + api::selector::PredefinedParameters, + net::{ + primitives::Primitives, + routing::dispatcher::face::Face, + runtime::{Runtime, RuntimeBuilder}, + }, }; zconfigurable! { @@ -1672,7 +1673,7 @@ impl Session { let mut state = zwrite!(self.state); let consolidation = match consolidation.mode { ConsolidationMode::Auto => { - if parameters.contains_key(TIME_RANGE_KEY) { + if parameters.time_range().is_none() { ConsolidationMode::None } else { ConsolidationMode::Latest @@ -2223,11 +2224,8 @@ impl Primitives for Session { }; match state.queries.get_mut(&msg.rid) { Some(query) => { - let c = zcondfeat!( - "unstable", - !query.parameters.contains_key(_REPLY_KEY_EXPR_ANY_SEL_PARAM), - true - ); + let c = + zcondfeat!("unstable", !query.parameters.reply_key_expr_any(), true); if c && !query.key_expr.intersects(&key_expr) { tracing::warn!( "Received Reply for `{}` from `{:?}, which didn't match query `{}`: dropping Reply.", diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs index 3bf3e9f9f7..6b1c300f72 100644 --- a/zenoh/src/lib.rs +++ b/zenoh/src/lib.rs @@ -254,8 +254,6 @@ pub mod selector { pub use zenoh_util::time_range::{TimeBound, TimeExpr, TimeRange}; pub use crate::api::selector::Selector; - #[zenoh_macros::unstable] - pub use crate::api::selector::TIME_RANGE_KEY; } /// Subscribing primitives @@ -301,8 +299,6 @@ pub mod querier { pub mod query { #[zenoh_macros::unstable] pub use crate::api::query::ReplyKeyExpr; - #[zenoh_macros::unstable] - pub use crate::api::query::REPLY_KEY_EXPR_ANY_SEL_PARAM; #[zenoh_macros::internal] pub use crate::api::queryable::ReplySample; pub use crate::api::{ diff --git a/zenoh/src/prelude.rs b/zenoh/src/prelude.rs index 605b0638ab..b08b58a8ed 100644 --- a/zenoh/src/prelude.rs +++ b/zenoh/src/prelude.rs @@ -28,6 +28,7 @@ mod _prelude { #[zenoh_macros::unstable] pub use crate::api::publisher::PublisherDeclarations; pub use crate::{ + api::selector::PredefinedParameters, api::{ builders::sample::{ QoSBuilderTrait, SampleBuilderTrait, TimestampBuilderTrait, ValueBuilderTrait,