diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs index 950cd946b3..91100829e5 100644 --- a/zenoh/src/api/builders/publisher.rs +++ b/zenoh/src/api/builders/publisher.rs @@ -320,6 +320,7 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> { priority: self.priority, is_express: self.is_express, destination: self.destination, + matching_listeners: Default::default(), }) } } @@ -371,6 +372,7 @@ impl<'a, 'b> Wait for PublisherBuilder<'a, 'b> { priority: self.priority, is_express: self.is_express, destination: self.destination, + matching_listeners: Default::default(), }) } } diff --git a/zenoh/src/api/publisher.rs b/zenoh/src/api/publisher.rs index cfce42d6ed..ccb36888e2 100644 --- a/zenoh/src/api/publisher.rs +++ b/zenoh/src/api/publisher.rs @@ -13,17 +13,18 @@ // use std::{ + collections::HashSet, convert::TryFrom, fmt, future::{IntoFuture, Ready}, mem::ManuallyDrop, pin::Pin, + sync::{Arc, Mutex}, task::{Context, Poll}, }; use futures::Sink; use zenoh_core::{zread, Resolvable, Resolve, Wait}; -use zenoh_keyexpr::keyexpr; use zenoh_protocol::{ core::CongestionControl, network::{push::ext, Push}, @@ -135,6 +136,7 @@ pub struct Publisher<'a> { pub(crate) priority: Priority, pub(crate) is_express: bool, pub(crate) destination: Locality, + pub(crate) matching_listeners: Arc>>, } impl<'a> Publisher<'a> { @@ -161,28 +163,33 @@ impl<'a> Publisher<'a> { } } + #[inline] pub fn key_expr(&self) -> &KeyExpr<'a> { &self.key_expr } + #[inline] + /// Get the `congestion_control` applied when routing the data. + pub fn congestion_control(&self) -> CongestionControl { + self.congestion_control + } + /// Change the `congestion_control` to apply when routing the data. #[inline] pub fn set_congestion_control(&mut self, congestion_control: CongestionControl) { self.congestion_control = congestion_control; } - /// Change the priority of the written data. + /// Get the priority of the written data. #[inline] - pub fn set_priority(&mut self, priority: Priority) { - self.priority = priority; + pub fn priority(&self) -> Priority { + self.priority } - /// Restrict the matching subscribers that will receive the published data - /// to the ones that have the given [`Locality`](crate::prelude::Locality). - #[zenoh_macros::unstable] + /// Change the priority of the written data. #[inline] - pub fn set_allowed_destination(&mut self, destination: Locality) { - self.destination = destination; + pub fn set_priority(&mut self, priority: Priority) { + self.priority = priority; } /// Consumes the given `Publisher`, returning a thread-safe reference-counting @@ -331,6 +338,7 @@ impl<'a> Publisher<'a> { pub fn matching_listener(&self) -> MatchingListenerBuilder<'_, DefaultHandler> { MatchingListenerBuilder { publisher: PublisherRef::Borrow(self), + background: false, handler: DefaultHandler::default(), } } @@ -351,6 +359,13 @@ impl<'a> Publisher<'a> { pub fn undeclare(self) -> impl Resolve> + 'a { Undeclarable::undeclare_inner(self, ()) } + + fn undeclare_matching_listeners(&self) -> ZResult<()> { + for id in zlock!(self.matching_listeners).drain() { + self.session.undeclare_matches_listener_inner(id)? + } + Ok(()) + } } /// Functions to create zenoh entities with `'static` lifetime. @@ -436,6 +451,7 @@ impl PublisherDeclarations for std::sync::Arc> { fn matching_listener(&self) -> MatchingListenerBuilder<'static, DefaultHandler> { MatchingListenerBuilder { publisher: PublisherRef::Shared(self.clone()), + background: false, handler: DefaultHandler::default(), } } @@ -473,6 +489,7 @@ impl Resolvable for PublisherUndeclaration<'_> { impl Wait for PublisherUndeclaration<'_> { fn wait(self) -> ::To { + self.publisher.undeclare_matching_listeners()?; self.publisher .session .undeclare_publisher_inner(self.publisher.id) @@ -490,6 +507,7 @@ impl IntoFuture for PublisherUndeclaration<'_> { impl Drop for Publisher<'_> { fn drop(&mut self) { + let _ = self.undeclare_matching_listeners(); let _ = self.session.undeclare_publisher_inner(self.id); } } @@ -755,6 +773,7 @@ impl MatchingStatus { #[derive(Debug)] pub struct MatchingListenerBuilder<'a, Handler> { pub(crate) publisher: PublisherRef<'a>, + pub(crate) background: bool, pub handler: Handler, } @@ -791,10 +810,12 @@ impl<'a> MatchingListenerBuilder<'a, DefaultHandler> { { let MatchingListenerBuilder { publisher, + background, handler: _, } = self; MatchingListenerBuilder { publisher, + background, handler: callback, } } @@ -861,9 +882,14 @@ impl<'a> MatchingListenerBuilder<'a, DefaultHandler> { { let MatchingListenerBuilder { publisher, + background, handler: _, } = self; - MatchingListenerBuilder { publisher, handler } + MatchingListenerBuilder { + publisher, + background, + handler, + } } } @@ -885,16 +911,18 @@ where #[zenoh_macros::unstable] fn wait(self) -> ::To { let (callback, receiver) = self.handler.into_handler(); - self.publisher + let state = self + .publisher .session - .declare_matches_listener_inner(&self.publisher, callback) - .map(|listener_state| MatchingListener { - listener: MatchingListenerInner { - publisher: self.publisher, - state: listener_state, - }, - receiver, - }) + .declare_matches_listener_inner(&self.publisher, callback)?; + zlock!(self.publisher.matching_listeners).insert(state.id); + Ok(MatchingListener { + listener: MatchingListenerInner { + publisher: self.publisher, + state, + }, + receiver, + }) } } @@ -1005,6 +1033,13 @@ impl<'a, Receiver> MatchingListener<'a, Receiver> { pub fn undeclare(self) -> MatchingListenerUndeclaration<'a> { self.listener.undeclare() } + + /// Make the matching listener run in background, until the publisher is undeclared. + #[inline] + #[zenoh_macros::unstable] + pub fn background(self) { + std::mem::forget(self); + } } #[zenoh_macros::unstable] @@ -1042,6 +1077,7 @@ impl Resolvable for MatchingListenerUndeclaration<'_> { #[zenoh_macros::unstable] impl Wait for MatchingListenerUndeclaration<'_> { fn wait(self) -> ::To { + zlock!(self.subscriber.publisher.matching_listeners).remove(&self.subscriber.state.id); self.subscriber .publisher .session @@ -1062,6 +1098,7 @@ impl IntoFuture for MatchingListenerUndeclaration<'_> { #[zenoh_macros::unstable] impl Drop for MatchingListenerInner<'_> { fn drop(&mut self) { + zlock!(self.publisher.matching_listeners).remove(&self.state.id); let _ = self .publisher .session diff --git a/zenoh/src/api/queryable.rs b/zenoh/src/api/queryable.rs index e3b9f30ba8..381e1265cb 100644 --- a/zenoh/src/api/queryable.rs +++ b/zenoh/src/api/queryable.rs @@ -894,6 +894,13 @@ impl<'a, Handler> Queryable<'a, Handler> { pub fn undeclare(self) -> impl Resolve> + 'a { Undeclarable::undeclare_inner(self, ()) } + + /// Make the queryable run in background, until the session is closed. + #[inline] + #[zenoh_macros::unstable] + pub fn background(self) { + std::mem::forget(self); + } } impl<'a, T> Undeclarable<(), QueryableUndeclaration<'a>> for Queryable<'a, T> { diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index eb51817dfb..f3fe09ec52 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -533,6 +533,24 @@ impl Session { let session = ManuallyDrop::new(self); ResolveFuture::new(async move { trace!("close()"); + let mut publishers = Vec::new(); + let mut queryables = Vec::new(); + let mut subscribers = Vec::new(); + { + let state = zread!(session.state); + publishers.extend(state.publishers.keys()); + queryables.extend(state.queryables.keys()); + subscribers.extend(state.subscribers.keys()); + } + for id in publishers { + session.undeclare_publisher_inner(id)?; + } + for id in queryables { + session.close_queryable(id)?; + } + for id in subscribers { + session.undeclare_subscriber_inner(id)?; + } session .task_controller .terminate_all(Duration::from_secs(10)); @@ -540,7 +558,7 @@ impl Session { session.runtime.close().await?; } let mut state = zwrite!(session.state); - // clean up to break cyclic references from self.state to itself + // clean up to break cyclic references from session.state to itself let primitives = state.primitives.take(); state.queryables.clear(); drop(state); diff --git a/zenoh/src/api/subscriber.rs b/zenoh/src/api/subscriber.rs index 83acd4c219..e156e7223e 100644 --- a/zenoh/src/api/subscriber.rs +++ b/zenoh/src/api/subscriber.rs @@ -503,6 +503,13 @@ impl<'a, Handler> Subscriber<'a, Handler> { pub fn undeclare(self) -> SubscriberUndeclaration<'a> { self.subscriber.undeclare() } + + /// Make the subscriber run in background, until the session is closed. + #[inline] + #[zenoh_macros::unstable] + pub fn background(self) { + std::mem::forget(self); + } } impl<'a, T> Undeclarable<(), SubscriberUndeclaration<'a>> for Subscriber<'a, T> {