Skip to content

Commit

Permalink
fix: rework everything
Browse files Browse the repository at this point in the history
  • Loading branch information
wyfo committed Jun 10, 2024
1 parent 6e29422 commit 94bd2e9
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 106 deletions.
8 changes: 4 additions & 4 deletions zenoh/src/api/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use super::{
key_expr::KeyExpr,
queryable::Query,
sample::{DataInfo, Locality, SampleKind},
session::{Session, SessionClone},
session::Session,
};

macro_rules! ke_for_sure {
Expand Down Expand Up @@ -121,11 +121,11 @@ pub(crate) fn on_admin_query(session: &Session, query: Query) {

#[derive(Clone)]
pub(crate) struct Handler {
pub(crate) session: Arc<SessionClone>,
pub(crate) session: Arc<Session>,
}

impl Handler {
pub(crate) fn new(session: SessionClone) -> Self {
pub(crate) fn new(session: Session) -> Self {
Self {
session: Arc::new(session),
}
Expand Down Expand Up @@ -193,7 +193,7 @@ impl TransportMulticastEventHandler for Handler {

pub(crate) struct PeerHandler {
pub(crate) expr: WireExpr<'static>,
pub(crate) session: Arc<SessionClone>,
pub(crate) session: Arc<Session>,
}

impl TransportPeerEventHandler for PeerHandler {
Expand Down
2 changes: 2 additions & 0 deletions zenoh/src/api/builders/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> {
is_express: self.is_express,
destination: self.destination,
matching_listeners: Default::default(),
undeclare_on_drop: true,
})
}
}
Expand Down Expand Up @@ -373,6 +374,7 @@ impl<'a, 'b> Wait for PublisherBuilder<'a, 'b> {
is_express: self.is_express,
destination: self.destination,
matching_listeners: Default::default(),
undeclare_on_drop: true,
})
}
}
Expand Down
31 changes: 21 additions & 10 deletions zenoh/src/api/liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use std::{
convert::TryInto,
future::{IntoFuture, Ready},
mem::ManuallyDrop,
sync::Arc,
time::Duration,
};
Expand Down Expand Up @@ -236,6 +235,7 @@ impl Wait for LivelinessTokenBuilder<'_, '_> {
.map(|tok_state| LivelinessToken {
session,
state: tok_state,
undeclare_on_drop: true,
})
}
}
Expand Down Expand Up @@ -291,6 +291,7 @@ pub(crate) struct LivelinessTokenState {
pub struct LivelinessToken<'a> {
pub(crate) session: SessionRef<'a>,
pub(crate) state: Arc<LivelinessTokenState>,
undeclare_on_drop: bool,
}

/// A [`Resolvable`] returned when undeclaring a [`LivelinessToken`](LivelinessToken).
Expand All @@ -314,9 +315,7 @@ pub struct LivelinessToken<'a> {
#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"]
#[zenoh_macros::unstable]
pub struct LivelinessTokenUndeclaration<'a> {
// ManuallyDrop wrapper prevents the drop code to be executed,
// which would lead to a double undeclaration
token: ManuallyDrop<LivelinessToken<'a>>,
token: LivelinessToken<'a>,
}

#[zenoh_macros::unstable]
Expand All @@ -326,7 +325,9 @@ impl Resolvable for LivelinessTokenUndeclaration<'_> {

#[zenoh_macros::unstable]
impl Wait for LivelinessTokenUndeclaration<'_> {
fn wait(self) -> <Self as Resolvable>::To {
fn wait(mut self) -> <Self as Resolvable>::To {
// set the flag first to avoid double panic if this function panic
self.token.undeclare_on_drop = false;
self.token.session.undeclare_liveliness(self.token.state.id)
}
}
Expand Down Expand Up @@ -369,21 +370,31 @@ impl<'a> LivelinessToken<'a> {
pub fn undeclare(self) -> impl Resolve<ZResult<()>> + 'a {
Undeclarable::undeclare_inner(self, ())
}

/// Keep this liveliness token in background, until the session is closed.
#[inline]
#[zenoh_macros::unstable]
pub fn background(mut self) {
// It's not necessary to undeclare this resource when session close, as other sessions
// will clean all resources related to the closed one.
// So we can just never undeclare it.
self.undeclare_on_drop = false;
}
}

#[zenoh_macros::unstable]
impl<'a> Undeclarable<(), LivelinessTokenUndeclaration<'a>> for LivelinessToken<'a> {
fn undeclare_inner(self, _: ()) -> LivelinessTokenUndeclaration<'a> {
LivelinessTokenUndeclaration {
token: ManuallyDrop::new(self),
}
LivelinessTokenUndeclaration { token: self }
}
}

#[zenoh_macros::unstable]
impl Drop for LivelinessToken<'_> {
fn drop(&mut self) {
let _ = self.session.undeclare_liveliness(self.state.id);
if self.undeclare_on_drop {
let _ = self.session.undeclare_liveliness(self.state.id);
}
}
}

Expand Down Expand Up @@ -553,7 +564,7 @@ where
subscriber: SubscriberInner {
session,
state: sub_state,
background: false,
undeclare_on_drop: true,
},
handler,
})
Expand Down
41 changes: 20 additions & 21 deletions zenoh/src/api/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::{
convert::TryFrom,
fmt,
future::{IntoFuture, Ready},
mem::ManuallyDrop,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll},
Expand Down Expand Up @@ -137,6 +136,7 @@ pub struct Publisher<'a> {
pub(crate) is_express: bool,
pub(crate) destination: Locality,
pub(crate) matching_listeners: Arc<Mutex<HashSet<Id>>>,
pub(crate) undeclare_on_drop: bool,
}

impl<'a> Publisher<'a> {
Expand Down Expand Up @@ -457,9 +457,7 @@ impl PublisherDeclarations for std::sync::Arc<Publisher<'static>> {

impl<'a> Undeclarable<(), PublisherUndeclaration<'a>> for Publisher<'a> {
fn undeclare_inner(self, _: ()) -> PublisherUndeclaration<'a> {
PublisherUndeclaration {
publisher: ManuallyDrop::new(self),
}
PublisherUndeclaration { publisher: self }
}
}

Expand All @@ -478,17 +476,17 @@ impl<'a> Undeclarable<(), PublisherUndeclaration<'a>> for Publisher<'a> {
/// ```
#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"]
pub struct PublisherUndeclaration<'a> {
// ManuallyDrop wrapper prevents the drop code to be executed,
// which would lead to a double undeclaration
publisher: ManuallyDrop<Publisher<'a>>,
publisher: Publisher<'a>,
}

impl Resolvable for PublisherUndeclaration<'_> {
type To = ZResult<()>;
}

impl Wait for PublisherUndeclaration<'_> {
fn wait(self) -> <Self as Resolvable>::To {
fn wait(mut self) -> <Self as Resolvable>::To {
// set the flag first to avoid double panic if this function panic
self.publisher.undeclare_on_drop = false;
self.publisher.undeclare_matching_listeners()?;
self.publisher
.session
Expand All @@ -507,8 +505,10 @@ impl IntoFuture for PublisherUndeclaration<'_> {

impl Drop for Publisher<'_> {
fn drop(&mut self) {
let _ = self.undeclare_matching_listeners();
let _ = self.session.undeclare_publisher_inner(self.id);
if self.undeclare_on_drop {
let _ = self.undeclare_matching_listeners();
let _ = self.session.undeclare_publisher_inner(self.id);
}
}
}

Expand Down Expand Up @@ -912,7 +912,7 @@ where
listener: MatchingListenerInner {
publisher: self.publisher,
state,
background: false,
undeclare_on_drop: true,
},
receiver,
})
Expand Down Expand Up @@ -957,7 +957,7 @@ impl std::fmt::Debug for MatchingListenerState {
pub(crate) struct MatchingListenerInner<'a> {
pub(crate) publisher: PublisherRef<'a>,
pub(crate) state: std::sync::Arc<MatchingListenerState>,
background: bool,
undeclare_on_drop: bool,
}

#[zenoh_macros::unstable]
Expand All @@ -971,9 +971,7 @@ impl<'a> MatchingListenerInner<'a> {
#[zenoh_macros::unstable]
impl<'a> Undeclarable<(), MatchingListenerUndeclaration<'a>> for MatchingListenerInner<'a> {
fn undeclare_inner(self, _: ()) -> MatchingListenerUndeclaration<'a> {
MatchingListenerUndeclaration {
subscriber: ManuallyDrop::new(self),
}
MatchingListenerUndeclaration { subscriber: self }
}
}

Expand Down Expand Up @@ -1032,7 +1030,8 @@ impl<'a, Receiver> MatchingListener<'a, Receiver> {
#[inline]
#[zenoh_macros::unstable]
pub fn background(mut self) {
self.listener.background = true;
// The matching listener will be undeclared as part of publisher undeclaration.
self.listener.undeclare_on_drop = false;
}
}

Expand Down Expand Up @@ -1060,9 +1059,7 @@ impl<Receiver> std::ops::DerefMut for MatchingListener<'_, Receiver> {

#[zenoh_macros::unstable]
pub struct MatchingListenerUndeclaration<'a> {
// ManuallyDrop wrapper prevents the drop code to be executed,
// which would lead to a double undeclaration
subscriber: ManuallyDrop<MatchingListenerInner<'a>>,
subscriber: MatchingListenerInner<'a>,
}

#[zenoh_macros::unstable]
Expand All @@ -1072,7 +1069,9 @@ impl Resolvable for MatchingListenerUndeclaration<'_> {

#[zenoh_macros::unstable]
impl Wait for MatchingListenerUndeclaration<'_> {
fn wait(self) -> <Self as Resolvable>::To {
fn wait(mut self) -> <Self as Resolvable>::To {
// set the flag first to avoid double panic if this function panic
self.subscriber.undeclare_on_drop = false;
zlock!(self.subscriber.publisher.matching_listeners).remove(&self.subscriber.state.id);
self.subscriber
.publisher
Expand All @@ -1094,7 +1093,7 @@ impl IntoFuture for MatchingListenerUndeclaration<'_> {
#[zenoh_macros::unstable]
impl Drop for MatchingListenerInner<'_> {
fn drop(&mut self) {
if !self.background {
if self.undeclare_on_drop {
zlock!(self.publisher.matching_listeners).remove(&self.state.id);
let _ = self
.publisher
Expand Down
24 changes: 12 additions & 12 deletions zenoh/src/api/queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
use std::{
fmt,
future::{IntoFuture, Ready},
mem::ManuallyDrop,
ops::{Deref, DerefMut},
sync::Arc,
};
Expand Down Expand Up @@ -612,14 +611,12 @@ impl fmt::Debug for QueryableState {
pub(crate) struct CallbackQueryable<'a> {
pub(crate) session: SessionRef<'a>,
pub(crate) state: Arc<QueryableState>,
background: bool,
undeclare_on_drop: bool,
}

impl<'a> Undeclarable<(), QueryableUndeclaration<'a>> for CallbackQueryable<'a> {
fn undeclare_inner(self, _: ()) -> QueryableUndeclaration<'a> {
QueryableUndeclaration {
queryable: ManuallyDrop::new(self),
}
QueryableUndeclaration { queryable: self }
}
}

Expand All @@ -638,17 +635,17 @@ impl<'a> Undeclarable<(), QueryableUndeclaration<'a>> for CallbackQueryable<'a>
/// ```
#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"]
pub struct QueryableUndeclaration<'a> {
// ManuallyDrop wrapper prevents the drop code to be executed,
// which would lead to a double undeclaration
queryable: ManuallyDrop<CallbackQueryable<'a>>,
queryable: CallbackQueryable<'a>,
}

impl Resolvable for QueryableUndeclaration<'_> {
type To = ZResult<()>;
}

impl Wait for QueryableUndeclaration<'_> {
fn wait(self) -> <Self as Resolvable>::To {
fn wait(mut self) -> <Self as Resolvable>::To {
// set the flag first to avoid double panic if this function panic
self.queryable.undeclare_on_drop = false;
self.queryable
.session
.close_queryable(self.queryable.state.id)
Expand All @@ -666,7 +663,7 @@ impl<'a> IntoFuture for QueryableUndeclaration<'a> {

impl Drop for CallbackQueryable<'_> {
fn drop(&mut self) {
if !self.background {
if self.undeclare_on_drop {
let _ = self.session.close_queryable(self.state.id);
}
}
Expand Down Expand Up @@ -904,7 +901,10 @@ impl<'a, Handler> Queryable<'a, Handler> {
#[inline]
#[zenoh_macros::unstable]
pub fn background(mut self) {
self.queryable.background = true;
// It's not necessary to undeclare this resource when session close, as other sessions
// will clean all resources related to the closed one.
// So we can just never undeclare it.
self.queryable.undeclare_on_drop = false;
}
}

Expand Down Expand Up @@ -955,7 +955,7 @@ where
queryable: CallbackQueryable {
session,
state: qable_state,
background: false,
undeclare_on_drop: true,
},
handler: receiver,
})
Expand Down
Loading

0 comments on commit 94bd2e9

Please sign in to comment.