diff --git a/zenoh-ext/src/advanced_subscriber.rs b/zenoh-ext/src/advanced_subscriber.rs index e0d9ed7e3..e922c4ecb 100644 --- a/zenoh-ext/src/advanced_subscriber.rs +++ b/zenoh-ext/src/advanced_subscriber.rs @@ -189,7 +189,7 @@ impl<'a, 'b, 'c> AdvancedSubscriberBuilder<'a, 'b, 'c, DefaultHandler> { { AdvancedSubscriberBuilder { session: self.session, - key_expr: self.key_expr.map(|s| s.into_owned()), + key_expr: self.key_expr, origin: self.origin, retransmission: self.retransmission, query_target: self.query_target, @@ -203,7 +203,30 @@ impl<'a, 'b, 'c> AdvancedSubscriberBuilder<'a, 'b, 'c, DefaultHandler> { } #[zenoh_macros::unstable] -impl<'a, 'c, Handler> AdvancedSubscriberBuilder<'a, '_, 'c, Handler> { +impl<'a, 'b, 'c> AdvancedSubscriberBuilder<'a, 'b, 'c, Callback> { + /// Register the subscriber callback to be run in background until the session is closed. + /// + /// Background builder doesn't return a `AdvancedSubscriber` object anymore. + pub fn background(self) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Callback, true> { + AdvancedSubscriberBuilder { + session: self.session, + key_expr: self.key_expr, + origin: self.origin, + retransmission: self.retransmission, + query_target: self.query_target, + query_timeout: self.query_timeout, + history: self.history, + liveliness: self.liveliness, + meta_key_expr: self.meta_key_expr, + handler: self.handler, + } + } +} + +#[zenoh_macros::unstable] +impl<'a, 'c, Handler, const BACKGROUND: bool> + AdvancedSubscriberBuilder<'a, '_, 'c, Handler, BACKGROUND> +{ /// Restrict the matching publications that will be receive by this [`Subscriber`] /// to the ones that have the given [`Locality`](crate::prelude::Locality). #[zenoh_macros::unstable] @@ -323,6 +346,35 @@ where } } +#[zenoh_macros::unstable] +impl Resolvable for AdvancedSubscriberBuilder<'_, '_, '_, Callback, true> { + type To = ZResult<()>; +} + +#[zenoh_macros::unstable] +impl Wait for AdvancedSubscriberBuilder<'_, '_, '_, Callback, true> { + #[zenoh_macros::unstable] + fn wait(self) -> ::To { + let mut sub = AdvancedSubscriber::new(self.with_static_keys())?; + sub.subscriber.set_background(true); + if let Some(mut liveliness_sub) = sub.liveliness_subscriber.take() { + liveliness_sub.set_background(true); + } + Ok(()) + } +} + +#[zenoh_macros::unstable] +impl IntoFuture for AdvancedSubscriberBuilder<'_, '_, '_, Callback, true> { + type Output = ::To; + type IntoFuture = Ready<::To>; + + #[zenoh_macros::unstable] + fn into_future(self) -> Self::IntoFuture { + std::future::ready(self.wait()) + } +} + #[zenoh_macros::unstable] struct Period { timer: Timer, @@ -343,6 +395,7 @@ struct State { query_timeout: Duration, callback: Callback, miss_handlers: HashMap>, + token: Option, } #[zenoh_macros::unstable] @@ -387,8 +440,7 @@ pub struct AdvancedSubscriber { statesref: Arc>, subscriber: Subscriber<()>, receiver: Receiver, - _liveliness_subscriber: Option>, - _token: Option, + liveliness_subscriber: Option>, } #[zenoh_macros::unstable] @@ -573,6 +625,7 @@ impl AdvancedSubscriber { query_timeout: conf.query_timeout, callback: callback.clone(), miss_handlers: HashMap::new(), + token: None, })); let sub_callback = { @@ -868,7 +921,7 @@ impl AdvancedSubscriber { None }; - let token = if conf.liveliness { + if conf.liveliness { let prefix = KE_ADV_PREFIX / KE_SUB / &subscriber.id().zid().into_keyexpr() @@ -878,22 +931,19 @@ impl AdvancedSubscriber { // We need this empty chunk because af a routing matching bug _ => prefix / KE_EMPTY / KE_AT, }; - Some( - conf.session - .liveliness() - .declare_token(prefix / &key_expr) - .wait()?, - ) - } else { - None - }; + let token = conf + .session + .liveliness() + .declare_token(prefix / &key_expr) + .wait()?; + zlock!(statesref).token = Some(token) + } let reliable_subscriber = AdvancedSubscriber { statesref, subscriber, receiver, - _liveliness_subscriber: liveliness_subscriber, - _token: token, + liveliness_subscriber, }; Ok(reliable_subscriber)