Skip to content

Commit

Permalink
feat: add unstable background method to subscriber/queryable/matchi…
Browse files Browse the repository at this point in the history
…ng listeners

The only real change of this PR is to undeclare objects at session closing/publisher undeclaration.
Calling `background` is then only a `mem::forget`.
  • Loading branch information
wyfo committed Jun 10, 2024
1 parent b0ba472 commit 1999c4d
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 20 deletions.
2 changes: 2 additions & 0 deletions zenoh/src/api/builders/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> {
priority: self.priority,
is_express: self.is_express,
destination: self.destination,
matching_listeners: Default::default(),
})
}
}
Expand Down Expand Up @@ -371,6 +372,7 @@ impl<'a, 'b> Wait for PublisherBuilder<'a, 'b> {
priority: self.priority,
is_express: self.is_express,
destination: self.destination,
matching_listeners: Default::default(),
})
}
}
Expand Down
75 changes: 56 additions & 19 deletions zenoh/src/api/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,18 @@
//

use std::{
collections::HashSet,
convert::TryFrom,
fmt,
future::{IntoFuture, Ready},
mem::ManuallyDrop,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll},
};

use futures::Sink;
use zenoh_core::{zread, Resolvable, Resolve, Wait};
use zenoh_keyexpr::keyexpr;
use zenoh_protocol::{
core::CongestionControl,
network::{push::ext, Push},
Expand Down Expand Up @@ -135,6 +136,7 @@ pub struct Publisher<'a> {
pub(crate) priority: Priority,
pub(crate) is_express: bool,
pub(crate) destination: Locality,
pub(crate) matching_listeners: Arc<Mutex<HashSet<Id>>>,
}

impl<'a> Publisher<'a> {
Expand All @@ -161,28 +163,33 @@ impl<'a> Publisher<'a> {
}
}

#[inline]
pub fn key_expr(&self) -> &KeyExpr<'a> {
&self.key_expr
}

#[inline]
/// Get the `congestion_control` applied when routing the data.
pub fn congestion_control(&self) -> CongestionControl {
self.congestion_control
}

/// Change the `congestion_control` to apply when routing the data.
#[inline]
pub fn set_congestion_control(&mut self, congestion_control: CongestionControl) {
self.congestion_control = congestion_control;
}

/// Change the priority of the written data.
/// Get the priority of the written data.
#[inline]
pub fn set_priority(&mut self, priority: Priority) {
self.priority = priority;
pub fn priority(&self) -> Priority {
self.priority
}

/// Restrict the matching subscribers that will receive the published data
/// to the ones that have the given [`Locality`](crate::prelude::Locality).
#[zenoh_macros::unstable]
/// Change the priority of the written data.
#[inline]
pub fn set_allowed_destination(&mut self, destination: Locality) {
self.destination = destination;
pub fn set_priority(&mut self, priority: Priority) {
self.priority = priority;
}

/// Consumes the given `Publisher`, returning a thread-safe reference-counting
Expand Down Expand Up @@ -331,6 +338,7 @@ impl<'a> Publisher<'a> {
pub fn matching_listener(&self) -> MatchingListenerBuilder<'_, DefaultHandler> {
MatchingListenerBuilder {
publisher: PublisherRef::Borrow(self),
background: false,
handler: DefaultHandler::default(),
}
}
Expand All @@ -351,6 +359,13 @@ impl<'a> Publisher<'a> {
pub fn undeclare(self) -> impl Resolve<ZResult<()>> + 'a {
Undeclarable::undeclare_inner(self, ())
}

fn undeclare_matching_listeners(&self) -> ZResult<()> {
for id in zlock!(self.matching_listeners).drain() {
self.session.undeclare_matches_listener_inner(id)?
}
Ok(())
}
}

/// Functions to create zenoh entities with `'static` lifetime.
Expand Down Expand Up @@ -436,6 +451,7 @@ impl PublisherDeclarations for std::sync::Arc<Publisher<'static>> {
fn matching_listener(&self) -> MatchingListenerBuilder<'static, DefaultHandler> {
MatchingListenerBuilder {
publisher: PublisherRef::Shared(self.clone()),
background: false,
handler: DefaultHandler::default(),
}
}
Expand Down Expand Up @@ -473,6 +489,7 @@ impl Resolvable for PublisherUndeclaration<'_> {

impl Wait for PublisherUndeclaration<'_> {
fn wait(self) -> <Self as Resolvable>::To {
self.publisher.undeclare_matching_listeners()?;
self.publisher
.session
.undeclare_publisher_inner(self.publisher.id)
Expand All @@ -490,6 +507,7 @@ impl IntoFuture for PublisherUndeclaration<'_> {

impl Drop for Publisher<'_> {
fn drop(&mut self) {
let _ = self.undeclare_matching_listeners();
let _ = self.session.undeclare_publisher_inner(self.id);
}
}
Expand Down Expand Up @@ -755,6 +773,7 @@ impl MatchingStatus {
#[derive(Debug)]
pub struct MatchingListenerBuilder<'a, Handler> {
pub(crate) publisher: PublisherRef<'a>,
pub(crate) background: bool,
pub handler: Handler,
}

Expand Down Expand Up @@ -791,10 +810,12 @@ impl<'a> MatchingListenerBuilder<'a, DefaultHandler> {
{
let MatchingListenerBuilder {
publisher,
background,
handler: _,
} = self;
MatchingListenerBuilder {
publisher,
background,
handler: callback,
}
}
Expand Down Expand Up @@ -861,9 +882,14 @@ impl<'a> MatchingListenerBuilder<'a, DefaultHandler> {
{
let MatchingListenerBuilder {
publisher,
background,
handler: _,
} = self;
MatchingListenerBuilder { publisher, handler }
MatchingListenerBuilder {
publisher,
background,
handler,
}
}
}

Expand All @@ -885,16 +911,18 @@ where
#[zenoh_macros::unstable]
fn wait(self) -> <Self as Resolvable>::To {
let (callback, receiver) = self.handler.into_handler();
self.publisher
let state = self
.publisher
.session
.declare_matches_listener_inner(&self.publisher, callback)
.map(|listener_state| MatchingListener {
listener: MatchingListenerInner {
publisher: self.publisher,
state: listener_state,
},
receiver,
})
.declare_matches_listener_inner(&self.publisher, callback)?;
zlock!(self.publisher.matching_listeners).insert(state.id);
Ok(MatchingListener {
listener: MatchingListenerInner {
publisher: self.publisher,
state,
},
receiver,
})
}
}

Expand Down Expand Up @@ -1005,6 +1033,13 @@ impl<'a, Receiver> MatchingListener<'a, Receiver> {
pub fn undeclare(self) -> MatchingListenerUndeclaration<'a> {
self.listener.undeclare()
}

/// Make the matching listener run in background, until the publisher is undeclared.
#[inline]
#[zenoh_macros::unstable]
pub fn background(self) {
std::mem::forget(self);
}
}

#[zenoh_macros::unstable]
Expand Down Expand Up @@ -1042,6 +1077,7 @@ impl Resolvable for MatchingListenerUndeclaration<'_> {
#[zenoh_macros::unstable]
impl Wait for MatchingListenerUndeclaration<'_> {
fn wait(self) -> <Self as Resolvable>::To {
zlock!(self.subscriber.publisher.matching_listeners).remove(&self.subscriber.state.id);
self.subscriber
.publisher
.session
Expand All @@ -1062,6 +1098,7 @@ impl IntoFuture for MatchingListenerUndeclaration<'_> {
#[zenoh_macros::unstable]
impl Drop for MatchingListenerInner<'_> {
fn drop(&mut self) {
zlock!(self.publisher.matching_listeners).remove(&self.state.id);
let _ = self
.publisher
.session
Expand Down
7 changes: 7 additions & 0 deletions zenoh/src/api/queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -894,6 +894,13 @@ impl<'a, Handler> Queryable<'a, Handler> {
pub fn undeclare(self) -> impl Resolve<ZResult<()>> + 'a {
Undeclarable::undeclare_inner(self, ())
}

/// Make the queryable run in background, until the session is closed.
#[inline]
#[zenoh_macros::unstable]
pub fn background(self) {
std::mem::forget(self);
}
}

impl<'a, T> Undeclarable<(), QueryableUndeclaration<'a>> for Queryable<'a, T> {
Expand Down
20 changes: 19 additions & 1 deletion zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -533,14 +533,32 @@ impl Session {
let session = ManuallyDrop::new(self);
ResolveFuture::new(async move {
trace!("close()");
let mut publishers = Vec::new();
let mut queryables = Vec::new();
let mut subscribers = Vec::new();
{
let state = zread!(session.state);
publishers.extend(state.publishers.keys());
queryables.extend(state.queryables.keys());
subscribers.extend(state.subscribers.keys());
}
for id in publishers {
session.undeclare_publisher_inner(id)?;
}
for id in queryables {
session.close_queryable(id)?;
}
for id in subscribers {
session.undeclare_subscriber_inner(id)?;
}
session
.task_controller
.terminate_all(Duration::from_secs(10));
if session.owns_runtime {
session.runtime.close().await?;
}
let mut state = zwrite!(session.state);
// clean up to break cyclic references from self.state to itself
// clean up to break cyclic references from session.state to itself
let primitives = state.primitives.take();
state.queryables.clear();
drop(state);
Expand Down
7 changes: 7 additions & 0 deletions zenoh/src/api/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,13 @@ impl<'a, Handler> Subscriber<'a, Handler> {
pub fn undeclare(self) -> SubscriberUndeclaration<'a> {
self.subscriber.undeclare()
}

/// Make the subscriber run in background, until the session is closed.
#[inline]
#[zenoh_macros::unstable]
pub fn background(self) {
std::mem::forget(self);
}
}

impl<'a, T> Undeclarable<(), SubscriberUndeclaration<'a>> for Subscriber<'a, T> {
Expand Down

0 comments on commit 1999c4d

Please sign in to comment.