diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index fcc87b87be..0e801f4522 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -1113,6 +1113,23 @@ impl Session { let state = zread!(self.state); self.update_status_up(&state, &key_expr) } + } else if key_expr + .as_str() + .starts_with(crate::liveliness::PREFIX_LIVELINESS) + { + let primitives = state.primitives.as_ref().unwrap().clone(); + drop(state); + + primitives.send_declare(Declare { + ext_qos: declare::ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: declare::ext::NodeIdType::DEFAULT, + body: DeclareBody::DeclareInterest(DeclareInterest { + id, + wire_expr: Some(key_expr.to_wire(self).to_owned()), + interest: Interest::KEYEXPRS + Interest::SUBSCRIBERS + Interest::FUTURE, + }), + }); } Ok(sub_state) @@ -1170,6 +1187,23 @@ impl Session { self.update_status_down(&state, &sub_state.key_expr) } } + } else if sub_state + .key_expr + .as_str() + .starts_with(crate::liveliness::PREFIX_LIVELINESS) + { + let primitives = state.primitives.as_ref().unwrap().clone(); + drop(state); + + primitives.send_declare(Declare { + ext_qos: declare::ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: declare::ext::NodeIdType::DEFAULT, + body: DeclareBody::UndeclareInterest(UndeclareInterest { + id: sub_state.id, + ext_wire_expr: WireExprType::null(), + }), + }); } Ok(()) } else { @@ -2026,7 +2060,7 @@ impl Primitives for Session { }; self.handle_data( false, - &m.ext_wire_expr.wire_expr, + &expr.to_wire(self), Some(data_info), ZBuf::default(), #[cfg(feature = "unstable")]