Skip to content

Commit

Permalink
feat: remove alive flag in session/undeclarable objects
Browse files Browse the repository at this point in the history
This flag was just used to prevent double close/undeclaration, i.e. object being drop after `close`/`undeclare`.
Using `ManuallyDrop` instead in `close`/`undeclare` solve this issue, and save some space in structs
(often one word counting the padding, which is not negligible).
  • Loading branch information
wyfo committed Jun 10, 2024
1 parent 04f05cb commit e024016
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 46 deletions.
15 changes: 6 additions & 9 deletions zenoh/src/api/liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::{
convert::TryInto,
future::{IntoFuture, Ready},
mem::ManuallyDrop,
sync::Arc,
time::Duration,
};
Expand Down Expand Up @@ -235,7 +236,6 @@ impl Wait for LivelinessTokenBuilder<'_, '_> {
.map(|tok_state| LivelinessToken {
session,
state: tok_state,
alive: true,
})
}
}
Expand Down Expand Up @@ -291,7 +291,6 @@ pub(crate) struct LivelinessTokenState {
pub struct LivelinessToken<'a> {
pub(crate) session: SessionRef<'a>,
pub(crate) state: Arc<LivelinessTokenState>,
pub(crate) alive: bool,
}

/// A [`Resolvable`] returned when undeclaring a [`LivelinessToken`](LivelinessToken).
Expand All @@ -315,7 +314,7 @@ pub struct LivelinessToken<'a> {
#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"]
#[zenoh_macros::unstable]
pub struct LivelinessTokenUndeclaration<'a> {
token: LivelinessToken<'a>,
token: ManuallyDrop<LivelinessToken<'a>>,
}

#[zenoh_macros::unstable]
Expand All @@ -326,7 +325,6 @@ impl Resolvable for LivelinessTokenUndeclaration<'_> {
#[zenoh_macros::unstable]
impl Wait for LivelinessTokenUndeclaration<'_> {
fn wait(mut self) -> <Self as Resolvable>::To {
self.token.alive = false;
self.token.session.undeclare_liveliness(self.token.state.id)
}
}
Expand Down Expand Up @@ -374,16 +372,16 @@ impl<'a> LivelinessToken<'a> {
#[zenoh_macros::unstable]
impl<'a> Undeclarable<(), LivelinessTokenUndeclaration<'a>> for LivelinessToken<'a> {
fn undeclare_inner(self, _: ()) -> LivelinessTokenUndeclaration<'a> {
LivelinessTokenUndeclaration { token: self }
LivelinessTokenUndeclaration {
token: ManuallyDrop::new(self),
}
}
}

#[zenoh_macros::unstable]
impl Drop for LivelinessToken<'_> {
fn drop(&mut self) {
if self.alive {
let _ = self.session.undeclare_liveliness(self.state.id);
}
let _ = self.session.undeclare_liveliness(self.state.id);
}
}

Expand Down Expand Up @@ -553,7 +551,6 @@ where
subscriber: SubscriberInner {
session,
state: sub_state,
alive: true,
},
handler,
})
Expand Down
20 changes: 9 additions & 11 deletions zenoh/src/api/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::{
convert::TryFrom,
fmt,
future::{IntoFuture, Ready},
mem::ManuallyDrop,
pin::Pin,
task::{Context, Poll},
};
Expand Down Expand Up @@ -894,7 +895,6 @@ where
listener: MatchingListenerInner {
publisher: self.publisher,
state: listener_state,
alive: true,
},
receiver,
})
Expand Down Expand Up @@ -939,7 +939,6 @@ impl std::fmt::Debug for MatchingListenerState {
pub(crate) struct MatchingListenerInner<'a> {
pub(crate) publisher: PublisherRef<'a>,
pub(crate) state: std::sync::Arc<MatchingListenerState>,
pub(crate) alive: bool,
}

#[zenoh_macros::unstable]
Expand All @@ -953,7 +952,9 @@ impl<'a> MatchingListenerInner<'a> {
#[zenoh_macros::unstable]
impl<'a> Undeclarable<(), MatchingListenerUndeclaration<'a>> for MatchingListenerInner<'a> {
fn undeclare_inner(self, _: ()) -> MatchingListenerUndeclaration<'a> {
MatchingListenerUndeclaration { subscriber: self }
MatchingListenerUndeclaration {
subscriber: ManuallyDrop::new(self),
}
}
}

Expand Down Expand Up @@ -1033,7 +1034,7 @@ impl<Receiver> std::ops::DerefMut for MatchingListener<'_, Receiver> {

#[zenoh_macros::unstable]
pub struct MatchingListenerUndeclaration<'a> {
subscriber: MatchingListenerInner<'a>,
subscriber: ManuallyDrop<MatchingListenerInner<'a>>,
}

#[zenoh_macros::unstable]
Expand All @@ -1044,7 +1045,6 @@ impl Resolvable for MatchingListenerUndeclaration<'_> {
#[zenoh_macros::unstable]
impl Wait for MatchingListenerUndeclaration<'_> {
fn wait(mut self) -> <Self as Resolvable>::To {
self.subscriber.alive = false;
self.subscriber
.publisher
.session
Expand All @@ -1065,12 +1065,10 @@ impl IntoFuture for MatchingListenerUndeclaration<'_> {
#[zenoh_macros::unstable]
impl Drop for MatchingListenerInner<'_> {
fn drop(&mut self) {
if self.alive {
let _ = self
.publisher
.session
.undeclare_matches_listener_inner(self.state.id);
}
let _ = self
.publisher
.session
.undeclare_matches_listener_inner(self.state.id);
}
}

Expand Down
14 changes: 6 additions & 8 deletions zenoh/src/api/queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
use std::{
fmt,
future::{IntoFuture, Ready},
mem::ManuallyDrop,
ops::{Deref, DerefMut},
sync::Arc,
};
Expand Down Expand Up @@ -611,12 +612,13 @@ impl fmt::Debug for QueryableState {
pub(crate) struct CallbackQueryable<'a> {
pub(crate) session: SessionRef<'a>,
pub(crate) state: Arc<QueryableState>,
pub(crate) alive: bool,
}

impl<'a> Undeclarable<(), QueryableUndeclaration<'a>> for CallbackQueryable<'a> {
fn undeclare_inner(self, _: ()) -> QueryableUndeclaration<'a> {
QueryableUndeclaration { queryable: self }
QueryableUndeclaration {
queryable: ManuallyDrop::new(self),
}
}
}

Expand All @@ -635,7 +637,7 @@ impl<'a> Undeclarable<(), QueryableUndeclaration<'a>> for CallbackQueryable<'a>
/// ```
#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"]
pub struct QueryableUndeclaration<'a> {
queryable: CallbackQueryable<'a>,
queryable: ManuallyDrop<CallbackQueryable<'a>>,
}

impl Resolvable for QueryableUndeclaration<'_> {
Expand All @@ -644,7 +646,6 @@ impl Resolvable for QueryableUndeclaration<'_> {

impl Wait for QueryableUndeclaration<'_> {
fn wait(mut self) -> <Self as Resolvable>::To {
self.queryable.alive = false;
self.queryable
.session
.close_queryable(self.queryable.state.id)
Expand All @@ -662,9 +663,7 @@ impl<'a> IntoFuture for QueryableUndeclaration<'a> {

impl Drop for CallbackQueryable<'_> {
fn drop(&mut self) {
if self.alive {
let _ = self.session.close_queryable(self.state.id);
}
let _ = self.session.close_queryable(self.state.id);
}
}

Expand Down Expand Up @@ -944,7 +943,6 @@ where
queryable: CallbackQueryable {
session,
state: qable_state,
alive: true,
},
handler: receiver,
})
Expand Down
20 changes: 10 additions & 10 deletions zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::{
convert::{TryFrom, TryInto},
fmt,
future::{IntoFuture, Ready},
mem::ManuallyDrop,
ops::Deref,
sync::{
atomic::{AtomicU16, Ordering},
Expand Down Expand Up @@ -404,7 +405,6 @@ pub struct Session {
pub(crate) runtime: Runtime,
pub(crate) state: Arc<RwLock<SessionState>>,
pub(crate) id: u16,
pub(crate) alive: bool,
owns_runtime: bool,
task_controller: TaskController,
}
Expand Down Expand Up @@ -530,20 +530,22 @@ impl Session {
/// session.close().await.unwrap();
/// # }
/// ```
pub fn close(mut self) -> impl Resolve<ZResult<()>> {
pub fn close(self) -> impl Resolve<ZResult<()>> {
let session = ManuallyDrop::new(self);
ResolveFuture::new(async move {
trace!("close()");
self.task_controller.terminate_all(Duration::from_secs(10));
if self.owns_runtime {
self.runtime.close().await?;
session
.task_controller
.terminate_all(Duration::from_secs(10));
if session.owns_runtime {
session.runtime.close().await?;
}
let mut state = zwrite!(self.state);
let mut state = zwrite!(session.state);
// clean up to break cyclic references from self.state to itself
let primitives = state.primitives.take();
state.queryables.clear();
drop(state);
primitives.as_ref().unwrap().send_close();
self.alive = false;
Ok(())
})
}
Expand Down Expand Up @@ -2472,9 +2474,7 @@ impl Primitives for Session {

impl Drop for Session {
fn drop(&mut self) {
if self.alive {
let _ = self.clone().close().wait();
}
let _ = self.clone().close().wait();
}
}

Expand Down
15 changes: 7 additions & 8 deletions zenoh/src/api/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
use std::{
fmt,
future::{IntoFuture, Ready},
mem,
mem::ManuallyDrop,
ops::{Deref, DerefMut},
sync::Arc,
};
Expand Down Expand Up @@ -78,7 +80,6 @@ impl fmt::Debug for SubscriberState {
pub(crate) struct SubscriberInner<'a> {
pub(crate) session: SessionRef<'a>,
pub(crate) state: Arc<SubscriberState>,
pub(crate) alive: bool,
}

impl<'a> SubscriberInner<'a> {
Expand Down Expand Up @@ -111,7 +112,9 @@ impl<'a> SubscriberInner<'a> {

impl<'a> Undeclarable<(), SubscriberUndeclaration<'a>> for SubscriberInner<'a> {
fn undeclare_inner(self, _: ()) -> SubscriberUndeclaration<'a> {
SubscriberUndeclaration { subscriber: self }
SubscriberUndeclaration {
subscriber: ManuallyDrop::new(self),
}
}
}

Expand All @@ -133,7 +136,7 @@ impl<'a> Undeclarable<(), SubscriberUndeclaration<'a>> for SubscriberInner<'a> {
/// ```
#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"]
pub struct SubscriberUndeclaration<'a> {
subscriber: SubscriberInner<'a>,
subscriber: ManuallyDrop<SubscriberInner<'a>>,
}

impl Resolvable for SubscriberUndeclaration<'_> {
Expand All @@ -142,7 +145,6 @@ impl Resolvable for SubscriberUndeclaration<'_> {

impl Wait for SubscriberUndeclaration<'_> {
fn wait(mut self) -> <Self as Resolvable>::To {
self.subscriber.alive = false;
self.subscriber
.session
.undeclare_subscriber_inner(self.subscriber.state.id)
Expand All @@ -160,9 +162,7 @@ impl IntoFuture for SubscriberUndeclaration<'_> {

impl Drop for SubscriberInner<'_> {
fn drop(&mut self) {
if self.alive {
let _ = self.session.undeclare_subscriber_inner(self.state.id);
}
let _ = self.session.undeclare_subscriber_inner(self.state.id);
}
}

Expand Down Expand Up @@ -387,7 +387,6 @@ where
subscriber: SubscriberInner {
session,
state: sub_state,
alive: true,
},
handler: receiver,
})
Expand Down

0 comments on commit e024016

Please sign in to comment.