Skip to content

Commit

Permalink
refactor: Handle incoming liveliness token declaration/undeclaration
Browse files Browse the repository at this point in the history
  • Loading branch information
fuzzypixelz committed Mar 26, 2024
1 parent 06013c4 commit dc9ec91
Showing 1 changed file with 87 additions and 5 deletions.
92 changes: 87 additions & 5 deletions zenoh/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ use zenoh_config::unwrap_or_default;
use zenoh_core::{zconfigurable, zread, Resolve, ResolveClosure, ResolveFuture, SyncResolve};
#[cfg(feature = "unstable")]
use zenoh_protocol::network::declare::SubscriberId;
use zenoh_protocol::network::declare::TokenId;
use zenoh_protocol::network::AtomicRequestId;
use zenoh_protocol::network::RequestId;
use zenoh_protocol::zenoh::reply::ReplyBody;
Expand Down Expand Up @@ -103,6 +104,8 @@ pub(crate) struct SessionState {
pub(crate) remote_resources: HashMap<ExprId, Resource>,
#[cfg(feature = "unstable")]
pub(crate) remote_subscribers: HashMap<SubscriberId, KeyExpr<'static>>,
#[cfg(feature = "unstable")]
pub(crate) remote_tokens: HashMap<TokenId, KeyExpr<'static>>,
//pub(crate) publications: Vec<OwnedKeyExpr>,
pub(crate) subscribers: HashMap<Id, Arc<SubscriberState>>,
pub(crate) queryables: HashMap<Id, Arc<QueryableState>>,
Expand All @@ -128,6 +131,8 @@ impl SessionState {
remote_resources: HashMap::new(),
#[cfg(feature = "unstable")]
remote_subscribers: HashMap::new(),
#[cfg(feature = "unstable")]
remote_tokens: HashMap::new(),
//publications: Vec::new(),
subscribers: HashMap::new(),
queryables: HashMap::new(),
Expand Down Expand Up @@ -2044,11 +2049,88 @@ impl Primitives for Session {
zenoh_protocol::network::DeclareBody::UndeclareQueryable(m) => {
trace!("recv UndeclareQueryable {:?}", m.id);
}
DeclareBody::DeclareToken(_) => todo!(),
DeclareBody::UndeclareToken(_) => todo!(),
DeclareBody::DeclareInterest(_) => todo!(),
DeclareBody::FinalInterest(_) => todo!(),
DeclareBody::UndeclareInterest(_) => todo!(),
zenoh_protocol::network::DeclareBody::DeclareToken(m) => {
trace!("recv DeclareToken {:?}", m.id);
#[cfg(feature = "unstable")]
{
let mut state = zwrite!(self.state);
match state
.wireexpr_to_keyexpr(&m.wire_expr, false)
.map(|e| e.into_owned())
{
Ok(expr) => {
state.remote_tokens.insert(m.id, expr.clone());

// NOTE(fuzzypixelz): I didn't put
// self.update_status_up() here because it doesn't
// make sense. An application which declares a
// liveliness token is not a subscriber and thus
// doens't need to to be visible to publishers
// through .matching_status().

if expr
.as_str()
.starts_with(crate::liveliness::PREFIX_LIVELINESS)
{
drop(state);

self.handle_data(
false,
&m.wire_expr,
None,
ZBuf::default(),
#[cfg(feature = "unstable")]
None,
);
}
}
Err(err) => {
log::error!("Received DeclareToken for unkown wire_expr: {}", err)
}
}
}
}
zenoh_protocol::network::DeclareBody::UndeclareToken(m) => {
trace!("recv UndeclareToken {:?}", m.id);
#[cfg(feature = "unstable")]
{
let mut state = zwrite!(self.state);
if let Some(expr) = state.remote_tokens.remove(&m.id) {
// NOTE(fuzzypixelz): I didn't put
// self.update_status_down() here because it doesn't
// make sense. An application which declares a
// liveliness token is not a subscriber and thus
// doens't need to to be visible to publishers
// through .matching_status().

if expr
.as_str()
.starts_with(crate::liveliness::PREFIX_LIVELINESS)
{
drop(state);

let data_info = DataInfo {
kind: SampleKind::Delete,
..Default::default()
};

self.handle_data(
false,
&m.ext_wire_expr.wire_expr,
Some(data_info),
ZBuf::default(),
#[cfg(feature = "unstable")]
None,
);
}
} else {
log::error!("Received UndeclareToken for unkown id: {}", m.id);
}
}
}
zenoh_protocol::network::DeclareBody::DeclareInterest(_) => todo!(),
zenoh_protocol::network::DeclareBody::FinalInterest(_) => todo!(),
zenoh_protocol::network::DeclareBody::UndeclareInterest(_) => todo!(),
}
}

Expand Down

0 comments on commit dc9ec91

Please sign in to comment.