From dc9ec915d3a4a6415758f96720061991faa5437a Mon Sep 17 00:00:00 2001 From: Mahmoud Mazouz Date: Tue, 26 Mar 2024 11:48:18 +0100 Subject: [PATCH] refactor: Handle incoming liveliness token declaration/undeclaration --- zenoh/src/session.rs | 92 +++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 87 insertions(+), 5 deletions(-) diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index 4c303ae974..a54f7bcf96 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -59,6 +59,7 @@ use zenoh_config::unwrap_or_default; use zenoh_core::{zconfigurable, zread, Resolve, ResolveClosure, ResolveFuture, SyncResolve}; #[cfg(feature = "unstable")] use zenoh_protocol::network::declare::SubscriberId; +use zenoh_protocol::network::declare::TokenId; use zenoh_protocol::network::AtomicRequestId; use zenoh_protocol::network::RequestId; use zenoh_protocol::zenoh::reply::ReplyBody; @@ -103,6 +104,8 @@ pub(crate) struct SessionState { pub(crate) remote_resources: HashMap, #[cfg(feature = "unstable")] pub(crate) remote_subscribers: HashMap>, + #[cfg(feature = "unstable")] + pub(crate) remote_tokens: HashMap>, //pub(crate) publications: Vec, pub(crate) subscribers: HashMap>, pub(crate) queryables: HashMap>, @@ -128,6 +131,8 @@ impl SessionState { remote_resources: HashMap::new(), #[cfg(feature = "unstable")] remote_subscribers: HashMap::new(), + #[cfg(feature = "unstable")] + remote_tokens: HashMap::new(), //publications: Vec::new(), subscribers: HashMap::new(), queryables: HashMap::new(), @@ -2044,11 +2049,88 @@ impl Primitives for Session { zenoh_protocol::network::DeclareBody::UndeclareQueryable(m) => { trace!("recv UndeclareQueryable {:?}", m.id); } - DeclareBody::DeclareToken(_) => todo!(), - DeclareBody::UndeclareToken(_) => todo!(), - DeclareBody::DeclareInterest(_) => todo!(), - DeclareBody::FinalInterest(_) => todo!(), - DeclareBody::UndeclareInterest(_) => todo!(), + zenoh_protocol::network::DeclareBody::DeclareToken(m) => { + trace!("recv DeclareToken {:?}", m.id); + #[cfg(feature = "unstable")] + { + let mut state = zwrite!(self.state); + match state + .wireexpr_to_keyexpr(&m.wire_expr, false) + .map(|e| e.into_owned()) + { + Ok(expr) => { + state.remote_tokens.insert(m.id, expr.clone()); + + // NOTE(fuzzypixelz): I didn't put + // self.update_status_up() here because it doesn't + // make sense. An application which declares a + // liveliness token is not a subscriber and thus + // doens't need to to be visible to publishers + // through .matching_status(). + + if expr + .as_str() + .starts_with(crate::liveliness::PREFIX_LIVELINESS) + { + drop(state); + + self.handle_data( + false, + &m.wire_expr, + None, + ZBuf::default(), + #[cfg(feature = "unstable")] + None, + ); + } + } + Err(err) => { + log::error!("Received DeclareToken for unkown wire_expr: {}", err) + } + } + } + } + zenoh_protocol::network::DeclareBody::UndeclareToken(m) => { + trace!("recv UndeclareToken {:?}", m.id); + #[cfg(feature = "unstable")] + { + let mut state = zwrite!(self.state); + if let Some(expr) = state.remote_tokens.remove(&m.id) { + // NOTE(fuzzypixelz): I didn't put + // self.update_status_down() here because it doesn't + // make sense. An application which declares a + // liveliness token is not a subscriber and thus + // doens't need to to be visible to publishers + // through .matching_status(). + + if expr + .as_str() + .starts_with(crate::liveliness::PREFIX_LIVELINESS) + { + drop(state); + + let data_info = DataInfo { + kind: SampleKind::Delete, + ..Default::default() + }; + + self.handle_data( + false, + &m.ext_wire_expr.wire_expr, + Some(data_info), + ZBuf::default(), + #[cfg(feature = "unstable")] + None, + ); + } + } else { + log::error!("Received UndeclareToken for unkown id: {}", m.id); + } + } + } + zenoh_protocol::network::DeclareBody::DeclareInterest(_) => todo!(), + zenoh_protocol::network::DeclareBody::FinalInterest(_) => todo!(), + zenoh_protocol::network::DeclareBody::UndeclareInterest(_) => todo!(), } }