Skip to content

Commit

Permalink
feat: add background to advanced subscriber (#1651)
Browse files Browse the repository at this point in the history
* Expose and use ke macro

* Fix SourceInfo publication

* Add AdvancedPublisher AdvancedSubscriber and AdvancedSubscriber

* Fix doctests

* Fix doc warnings

* Remove debug trace

* Add history test

* Fix periodic queries

* Remove debug trace

* Lower test debug level

* Add retransmission tests

* Liveliness sub callback shoud increase pending queries counter

* Liveliness sub callback shoud spawn periodic queries when enbaled

* Add late_joiner test

* Only treat pending samples when there are no more pending queries

* Apply proper sequencing for history

* Improve AdvancedSubscriber

* Code reorg

* Code reorg

* Fix deduplication

* Subscribe to liveliness tokens with history

* Update builders

* Add examples

* Fix rustdoc

* Move stuff in State

* Code reorg

* Add smaple_miss_callback

* Add sample miss test

* Update z_advanced_sub example

* Explicit use in examples

* Update API

* Fix rustdoc

* Allow sample miss detection when recovery disabled

* Add miss_sample_callback to DataSubscriberBuilderExt

* Add sample_miss_detection to PublisherBuilderExt

* Add test_advanced_sample_miss test

* Deliver sample even when no miss callback

* Replace sample_miss_callback with sample_miss_listener

* Fix clippy warnings

* Fix tests

* Add HistoryConf max_samples option

* Add HistoryConf max_age option

* Use BTreeMap

* Add meta_keyexpr option

* Add late_joiner_detection and meta_keyexpr options on Subcriber side

* Renaming

* Fix compilation issues

* Remove AdvancedCache from public API

* Update Session admin to match AdvancedSub

* Gather constants

* Fix doc build

* Renaming

* Mark PublicationCache and QueryingSubscriber as deprecated and remove related examples

* Remove z_pub_cache and z_query_sub entries from zenoh-ext examples README

* Add z_advanced_pub and z_advanced_sub to zenoh-ext examples Cargo.toml

* Add CacheConfig replies_qos option

* Call cache directly from publisher

* Update doc

* Add missing unstable tags

* Add missing unstable tags

* Add missing unstable tags

* Add unstable tag everywhere

* Add missing AdvancedSubscriber methods

* Fix WeakSession::Session internal function

* Expose missing SampleMissListener and related structs

* Add AdvancedPublisherBuilderExt::advanced function

* Add missing AdvancedPublisherBuilder functions

* Fix doctests

* Expose Miss struct

* impl QoSBuilderTrait for AdvancedPublisherBuilder

* Propagate PublisherBuilder values to AdvancedPublisherBuilder

* Rename AdvancedSubscriber::close()

* Add unstable tags

* Add AdvancedSubscriber::detect_publishers function

* Remove debug println

* Renaming

* Add unstable tags

* Use std Range

* Spawn Timer in a tokio runtime

* Fix panic when last_delivered is None

* Release lock before calling get

* Update key mapping

* Improve doc

* fix: fix callback API (#1647)

* Update doc

* feat: add background to advanced subscriber

* Fix ke_liveliness

* fix: add missing mut

* Fix doc

* Fix doc

* Fix zenoh-ext Cargo.toml

---------

Co-authored-by: OlivierHecart <[email protected]>
  • Loading branch information
wyfo and OlivierHecart authored Dec 11, 2024
1 parent 2f27e18 commit 83bc4ed
Showing 1 changed file with 66 additions and 16 deletions.
82 changes: 66 additions & 16 deletions zenoh-ext/src/advanced_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ impl<'a, 'b, 'c> AdvancedSubscriberBuilder<'a, 'b, 'c, DefaultHandler> {
{
AdvancedSubscriberBuilder {
session: self.session,
key_expr: self.key_expr.map(|s| s.into_owned()),
key_expr: self.key_expr,
origin: self.origin,
retransmission: self.retransmission,
query_target: self.query_target,
Expand All @@ -203,7 +203,30 @@ impl<'a, 'b, 'c> AdvancedSubscriberBuilder<'a, 'b, 'c, DefaultHandler> {
}

#[zenoh_macros::unstable]
impl<'a, 'c, Handler> AdvancedSubscriberBuilder<'a, '_, 'c, Handler> {
impl<'a, 'b, 'c> AdvancedSubscriberBuilder<'a, 'b, 'c, Callback<Sample>> {
/// Register the subscriber callback to be run in background until the session is closed.
///
/// Background builder doesn't return a `AdvancedSubscriber` object anymore.
pub fn background(self) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Callback<Sample>, true> {
AdvancedSubscriberBuilder {
session: self.session,
key_expr: self.key_expr,
origin: self.origin,
retransmission: self.retransmission,
query_target: self.query_target,
query_timeout: self.query_timeout,
history: self.history,
liveliness: self.liveliness,
meta_key_expr: self.meta_key_expr,
handler: self.handler,
}
}
}

#[zenoh_macros::unstable]
impl<'a, 'c, Handler, const BACKGROUND: bool>
AdvancedSubscriberBuilder<'a, '_, 'c, Handler, BACKGROUND>
{
/// Restrict the matching publications that will be receive by this [`Subscriber`]
/// to the ones that have the given [`Locality`](crate::prelude::Locality).
#[zenoh_macros::unstable]
Expand Down Expand Up @@ -323,6 +346,35 @@ where
}
}

#[zenoh_macros::unstable]
impl Resolvable for AdvancedSubscriberBuilder<'_, '_, '_, Callback<Sample>, true> {
type To = ZResult<()>;
}

#[zenoh_macros::unstable]
impl Wait for AdvancedSubscriberBuilder<'_, '_, '_, Callback<Sample>, true> {
#[zenoh_macros::unstable]
fn wait(self) -> <Self as Resolvable>::To {
let mut sub = AdvancedSubscriber::new(self.with_static_keys())?;
sub.subscriber.set_background(true);
if let Some(mut liveliness_sub) = sub.liveliness_subscriber.take() {
liveliness_sub.set_background(true);
}
Ok(())
}
}

#[zenoh_macros::unstable]
impl IntoFuture for AdvancedSubscriberBuilder<'_, '_, '_, Callback<Sample>, true> {
type Output = <Self as Resolvable>::To;
type IntoFuture = Ready<<Self as Resolvable>::To>;

#[zenoh_macros::unstable]
fn into_future(self) -> Self::IntoFuture {
std::future::ready(self.wait())
}
}

#[zenoh_macros::unstable]
struct Period {
timer: Timer,
Expand All @@ -343,6 +395,7 @@ struct State {
query_timeout: Duration,
callback: Callback<Sample>,
miss_handlers: HashMap<usize, Callback<Miss>>,
token: Option<LivelinessToken>,
}

#[zenoh_macros::unstable]
Expand Down Expand Up @@ -387,8 +440,7 @@ pub struct AdvancedSubscriber<Receiver> {
statesref: Arc<Mutex<State>>,
subscriber: Subscriber<()>,
receiver: Receiver,
_liveliness_subscriber: Option<Subscriber<()>>,
_token: Option<LivelinessToken>,
liveliness_subscriber: Option<Subscriber<()>>,
}

#[zenoh_macros::unstable]
Expand Down Expand Up @@ -573,6 +625,7 @@ impl<Handler> AdvancedSubscriber<Handler> {
query_timeout: conf.query_timeout,
callback: callback.clone(),
miss_handlers: HashMap::new(),
token: None,
}));

let sub_callback = {
Expand Down Expand Up @@ -868,7 +921,7 @@ impl<Handler> AdvancedSubscriber<Handler> {
None
};

let token = if conf.liveliness {
if conf.liveliness {
let prefix = KE_ADV_PREFIX
/ KE_SUB
/ &subscriber.id().zid().into_keyexpr()
Expand All @@ -878,22 +931,19 @@ impl<Handler> AdvancedSubscriber<Handler> {
// We need this empty chunk because af a routing matching bug
_ => prefix / KE_EMPTY / KE_AT,
};
Some(
conf.session
.liveliness()
.declare_token(prefix / &key_expr)
.wait()?,
)
} else {
None
};
let token = conf
.session
.liveliness()
.declare_token(prefix / &key_expr)
.wait()?;
zlock!(statesref).token = Some(token)
}

let reliable_subscriber = AdvancedSubscriber {
statesref,
subscriber,
receiver,
_liveliness_subscriber: liveliness_subscriber,
_token: token,
liveliness_subscriber,
};

Ok(reliable_subscriber)
Expand Down

0 comments on commit 83bc4ed

Please sign in to comment.