Skip to content

Commit

Permalink
reply_key_expr in trait
Browse files Browse the repository at this point in the history
  • Loading branch information
milyin committed Jun 10, 2024
1 parent 5d195e8 commit 2f01adc
Show file tree
Hide file tree
Showing 9 changed files with 49 additions and 45 deletions.
4 changes: 2 additions & 2 deletions plugins/zenoh-plugin-rest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use zenoh::{
key_expr::{keyexpr, KeyExpr},
query::{QueryConsolidation, Reply},
sample::{Sample, SampleKind, ValueBuilderTrait},
selector::{Parameters, Selector, TIME_RANGE_KEY},
selector::{Parameters, PredefinedParameters, Selector},
session::{Session, SessionDeclarations},
value::Value,
};
Expand Down Expand Up @@ -383,7 +383,7 @@ async fn query(mut req: Request<(Arc<Session>, String)>) -> tide::Result<Respons
};
let query_part = url.query();
let parameters = Parameters::from(query_part.unwrap_or_default());
let consolidation = if parameters.contains_key(TIME_RANGE_KEY) {
let consolidation = if parameters.time_range().is_some() {
QueryConsolidation::from(zenoh::query::ConsolidationMode::None)
} else {
QueryConsolidation::from(zenoh::query::ConsolidationMode::Latest)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ use zenoh::{
},
query::{ConsolidationMode, QueryTarget},
sample::{Sample, SampleBuilder, SampleKind, TimestampBuilderTrait, ValueBuilderTrait},
selector::Selector,
session::{Session, SessionDeclarations},
time::{new_timestamp, Timestamp, NTP64},
value::Value,
Expand Down
4 changes: 2 additions & 2 deletions zenoh-ext/src/publication_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ impl<'a> PublicationCache<'a> {
if !query.key_expr().as_str().contains('*') {
if let Some(queue) = cache.get(query.key_expr().as_keyexpr()) {
for sample in queue {
if let (Ok(Some(time_range)), Some(timestamp)) = (query.parameters().time_range(), sample.timestamp()) {
if let (Some(Ok(time_range)), Some(timestamp)) = (query.parameters().time_range(), sample.timestamp()) {
if !time_range.contains(timestamp.get_time().to_system_time()){
continue;
}
Expand All @@ -230,7 +230,7 @@ impl<'a> PublicationCache<'a> {
for (key_expr, queue) in cache.iter() {
if query.key_expr().intersects(unsafe{ keyexpr::from_str_unchecked(key_expr) }) {
for sample in queue {
if let (Ok(Some(time_range)), Some(timestamp)) = (query.parameters().time_range(), sample.timestamp()) {
if let (Some(Ok(time_range)), Some(timestamp)) = (query.parameters().time_range(), sample.timestamp()) {
if !time_range.contains(timestamp.get_time().to_system_time()){
continue;
}
Expand Down
8 changes: 2 additions & 6 deletions zenoh/src/api/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use super::{
key_expr::KeyExpr,
publisher::Priority,
sample::{Locality, QoSBuilder, Sample},
selector::Selector,
selector::{PredefinedParameters, Selector},
session::Session,
value::Value,
};
Expand Down Expand Up @@ -423,7 +423,7 @@ impl<'a, 'b, Handler> SessionGetBuilder<'a, 'b, Handler> {
}| {
if accept == ReplyKeyExpr::Any {
let mut parameters = parameters.into_owned();
parameters.insert(_REPLY_KEY_EXPR_ANY_SEL_PARAM, "");
parameters.set_reply_key_expr_any();
let parameters = Cow::Owned(parameters);
Selector {
key_expr,
Expand All @@ -442,10 +442,6 @@ impl<'a, 'b, Handler> SessionGetBuilder<'a, 'b, Handler> {
}
}

pub(crate) const _REPLY_KEY_EXPR_ANY_SEL_PARAM: &str = "_anyke";
#[zenoh_macros::unstable]
pub const REPLY_KEY_EXPR_ANY_SEL_PARAM: &str = _REPLY_KEY_EXPR_ANY_SEL_PARAM;

#[zenoh_macros::unstable]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum ReplyKeyExpr {
Expand Down
8 changes: 3 additions & 5 deletions zenoh/src/api/queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ use {
zenoh_protocol::core::EntityGlobalId,
};

#[zenoh_macros::unstable]
use super::selector::PredefinedParameters;
use super::{
builders::sample::{
QoSBuilderTrait, SampleBuilder, SampleBuilderTrait, TimestampBuilderTrait,
Expand Down Expand Up @@ -235,11 +237,7 @@ impl Query {
}
#[cfg(feature = "unstable")]
fn _accepts_any_replies(&self) -> ZResult<bool> {
use crate::api::query::_REPLY_KEY_EXPR_ANY_SEL_PARAM;

Ok(self
.parameters()
.contains_key(_REPLY_KEY_EXPR_ANY_SEL_PARAM))
Ok(self.parameters().reply_key_expr_any())
}
}

Expand Down
42 changes: 29 additions & 13 deletions zenoh/src/api/selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,6 @@ pub struct Selector<'a> {
pub parameters: Cow<'a, Parameters<'a>>,
}

#[zenoh_macros::unstable]
pub const TIME_RANGE_KEY: &str = "_time";

impl<'a> Selector<'a> {
/// Builds a new selector which owns keyexpr and parameters
pub fn owned<K, P>(key_expr: K, parameters: P) -> Self
Expand Down Expand Up @@ -120,32 +117,49 @@ impl<'a> From<&'a Selector<'a>> for (&'a KeyExpr<'a>, &'a Parameters<'a>) {

#[zenoh_macros::unstable]
pub trait PredefinedParameters {
const REPLY_KEY_EXPR_ANY_SEL_PARAM: &'static str = "_anyke";
const TIME_RANGE_KEY: &'static str = "_time";
/// Sets the time range targeted by the selector parameters.
fn set_time_range<T: Into<Option<TimeRange>>>(&mut self, time_range: T);
/// Sets parameter allowing to querier to reply to this request even
/// it the requested key expression does not match the reply key expression.
/// TODO: add example
fn set_reply_key_expr_any(&mut self);
/// Extracts the standardized `_time` argument from the selector parameters.
fn time_range(&self) -> ZResult<Option<TimeRange>>;
/// Returns `None` if the `_time` argument is not present or `Some` with the result of parsing the `_time` argument
/// if it is present.
fn time_range(&self) -> Option<ZResult<TimeRange>>;
/// Returns true if `_anyke` parameter is present in the selector parameters
fn reply_key_expr_any(&self) -> bool;
}

#[zenoh_macros::unstable]
impl PredefinedParameters for Parameters<'_> {
/// Sets the time range targeted by the selector parameters.
fn set_time_range<T: Into<Option<TimeRange>>>(&mut self, time_range: T) {
let mut time_range: Option<TimeRange> = time_range.into();
match time_range.take() {
Some(tr) => self.insert(TIME_RANGE_KEY, format!("{}", tr)),
None => self.remove(TIME_RANGE_KEY),
Some(tr) => self.insert(Self::TIME_RANGE_KEY, format!("{}", tr)),
None => self.remove(Self::TIME_RANGE_KEY),
};
}

/// Sets parameter allowing to querier to reply to this request even
/// it the requested key expression does not match the reply key expression.
fn set_reply_key_expr_any(&mut self) {
self.insert(Self::REPLY_KEY_EXPR_ANY_SEL_PARAM, "");
}

/// Extracts the standardized `_time` argument from the selector parameters.
///
/// The default implementation still causes a complete pass through the selector parameters to ensure that there are no duplicates of the `_time` key.
fn time_range(&self) -> ZResult<Option<TimeRange>> {
match self.get(TIME_RANGE_KEY) {
Some(tr) => Ok(Some(tr.parse()?)),
None => Ok(None),
}
fn time_range(&self) -> Option<ZResult<TimeRange>> {
self.get(Self::TIME_RANGE_KEY)
.map(|tr| tr.parse().map_err(Into::into))
}

/// Returns true if `_anyke` parameter is present in the selector parameters
fn reply_key_expr_any(&self) -> bool {
self.contains_key(Self::REPLY_KEY_EXPR_ANY_SEL_PARAM)
}
}

Expand Down Expand Up @@ -270,7 +284,6 @@ impl<'a> From<KeyExpr<'a>> for Selector<'a> {

#[test]
fn selector_accessors() {
use crate::api::query::_REPLY_KEY_EXPR_ANY_SEL_PARAM as ANYKE;
use std::collections::HashMap;

for s in [
Expand All @@ -294,6 +307,9 @@ fn selector_accessors() {

assert_eq!(parameters.get("_timetrick").unwrap(), "");

const TIME_RANGE_KEY: &str = Parameters::TIME_RANGE_KEY;
const ANYKE: &str = Parameters::REPLY_KEY_EXPR_ANY_SEL_PARAM;

let time_range = "[now(-2s)..now(2s)]";
zcondfeat!(
"unstable",
Expand Down
22 changes: 10 additions & 12 deletions zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,15 @@ use super::{
liveliness::{Liveliness, LivelinessTokenState},
publisher::Publisher,
publisher::{MatchingListenerState, MatchingStatus},
query::_REPLY_KEY_EXPR_ANY_SEL_PARAM,
sample::SourceInfo,
selector::TIME_RANGE_KEY,
};
use crate::net::{
primitives::Primitives,
routing::dispatcher::face::Face,
runtime::{Runtime, RuntimeBuilder},
use crate::{
api::selector::PredefinedParameters,
net::{
primitives::Primitives,
routing::dispatcher::face::Face,
runtime::{Runtime, RuntimeBuilder},
},
};

zconfigurable! {
Expand Down Expand Up @@ -1672,7 +1673,7 @@ impl Session {
let mut state = zwrite!(self.state);
let consolidation = match consolidation.mode {
ConsolidationMode::Auto => {
if parameters.contains_key(TIME_RANGE_KEY) {
if parameters.time_range().is_none() {
ConsolidationMode::None
} else {
ConsolidationMode::Latest
Expand Down Expand Up @@ -2223,11 +2224,8 @@ impl Primitives for Session {
};
match state.queries.get_mut(&msg.rid) {
Some(query) => {
let c = zcondfeat!(
"unstable",
!query.parameters.contains_key(_REPLY_KEY_EXPR_ANY_SEL_PARAM),
true
);
let c =
zcondfeat!("unstable", !query.parameters.reply_key_expr_any(), true);
if c && !query.key_expr.intersects(&key_expr) {
tracing::warn!(
"Received Reply for `{}` from `{:?}, which didn't match query `{}`: dropping Reply.",
Expand Down
4 changes: 0 additions & 4 deletions zenoh/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,6 @@ pub mod selector {
pub use zenoh_util::time_range::{TimeBound, TimeExpr, TimeRange};

pub use crate::api::selector::Selector;
#[zenoh_macros::unstable]
pub use crate::api::selector::TIME_RANGE_KEY;
}

/// Subscribing primitives
Expand Down Expand Up @@ -301,8 +299,6 @@ pub mod querier {
pub mod query {
#[zenoh_macros::unstable]
pub use crate::api::query::ReplyKeyExpr;
#[zenoh_macros::unstable]
pub use crate::api::query::REPLY_KEY_EXPR_ANY_SEL_PARAM;
#[zenoh_macros::internal]
pub use crate::api::queryable::ReplySample;
pub use crate::api::{
Expand Down
1 change: 1 addition & 0 deletions zenoh/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ mod _prelude {
#[zenoh_macros::unstable]
pub use crate::api::publisher::PublisherDeclarations;
pub use crate::{
api::selector::PredefinedParameters,
api::{
builders::sample::{
QoSBuilderTrait, SampleBuilderTrait, TimestampBuilderTrait, ValueBuilderTrait,
Expand Down

0 comments on commit 2f01adc

Please sign in to comment.