Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Advanced Pub/Sub feature #1582

Merged
merged 91 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from 81 commits
Commits
Show all changes
91 commits
Select commit Hold shift + click to select a range
2cc7f12
Expose and use ke macro
OlivierHecart Nov 5, 2024
163d3d5
Fix SourceInfo publication
OlivierHecart Nov 8, 2024
43a5d3c
Add AdvancedPublisher AdvancedSubscriber and AdvancedSubscriber
OlivierHecart Nov 8, 2024
9f076a7
Fix doctests
OlivierHecart Nov 8, 2024
3935974
Fix doc warnings
OlivierHecart Nov 8, 2024
405d76e
Remove debug trace
OlivierHecart Nov 13, 2024
71b22d3
Add history test
OlivierHecart Nov 13, 2024
af1b2a2
Fix periodic queries
OlivierHecart Nov 13, 2024
bd32356
Remove debug trace
OlivierHecart Nov 13, 2024
4e4bbb6
Lower test debug level
OlivierHecart Nov 13, 2024
f36c890
Add retransmission tests
OlivierHecart Nov 13, 2024
23f145d
Liveliness sub callback shoud increase pending queries counter
OlivierHecart Nov 13, 2024
de396a4
Liveliness sub callback shoud spawn periodic queries when enbaled
OlivierHecart Nov 13, 2024
ff24135
Add late_joiner test
OlivierHecart Nov 13, 2024
975aba4
Only treat pending samples when there are no more pending queries
OlivierHecart Nov 14, 2024
5d9ac8d
Apply proper sequencing for history
OlivierHecart Nov 14, 2024
2305b41
Improve AdvancedSubscriber
OlivierHecart Nov 14, 2024
883885b
Code reorg
OlivierHecart Nov 14, 2024
3201700
Code reorg
OlivierHecart Nov 14, 2024
ded789c
Fix deduplication
OlivierHecart Nov 14, 2024
e033553
Subscribe to liveliness tokens with history
OlivierHecart Nov 15, 2024
acaf341
Update builders
OlivierHecart Nov 15, 2024
ef63165
Add examples
OlivierHecart Nov 15, 2024
788a8e5
Fix rustdoc
OlivierHecart Nov 15, 2024
a40639c
Move stuff in State
OlivierHecart Nov 18, 2024
e1caa06
Code reorg
OlivierHecart Nov 18, 2024
ee1895f
Add smaple_miss_callback
OlivierHecart Nov 18, 2024
c8a43d1
Add sample miss test
OlivierHecart Nov 18, 2024
d11243d
Update z_advanced_sub example
OlivierHecart Nov 18, 2024
eb6dbf6
Explicit use in examples
OlivierHecart Nov 21, 2024
ee93de1
Update API
OlivierHecart Nov 28, 2024
596d0ed
Fix rustdoc
OlivierHecart Nov 28, 2024
35ae472
Allow sample miss detection when recovery disabled
OlivierHecart Nov 28, 2024
52bec17
Add miss_sample_callback to DataSubscriberBuilderExt
OlivierHecart Nov 28, 2024
a16d64c
Add sample_miss_detection to PublisherBuilderExt
OlivierHecart Nov 28, 2024
5662f34
Add test_advanced_sample_miss test
OlivierHecart Nov 28, 2024
18adcc7
Merge branch 'main' into dev/advanced_pubsub
OlivierHecart Nov 28, 2024
6da5655
Deliver sample even when no miss callback
OlivierHecart Nov 28, 2024
3449d73
Replace sample_miss_callback with sample_miss_listener
OlivierHecart Nov 28, 2024
76344c9
Merge branch 'main' into dev/advanced_pubsub
OlivierHecart Nov 29, 2024
ba72a96
Fix clippy warnings
OlivierHecart Nov 29, 2024
4df529e
Fix tests
OlivierHecart Nov 29, 2024
443d540
Add HistoryConf max_samples option
OlivierHecart Nov 29, 2024
f9c7d0e
Add HistoryConf max_age option
OlivierHecart Nov 29, 2024
3b28b04
Use BTreeMap
OlivierHecart Dec 2, 2024
9e7a99b
Add meta_keyexpr option
OlivierHecart Dec 4, 2024
43d9d79
Add late_joiner_detection and meta_keyexpr options on Subcriber side
OlivierHecart Dec 4, 2024
54dc83c
Renaming
OlivierHecart Dec 4, 2024
aacea79
Merge branch 'main' into dev/advanced_pubsub
OlivierHecart Dec 4, 2024
9d18085
Fix compilation issues
OlivierHecart Dec 4, 2024
7d3ab8d
Remove AdvancedCache from public API
OlivierHecart Dec 5, 2024
e59e76b
Update Session admin to match AdvancedSub
OlivierHecart Dec 5, 2024
71ce64b
Gather constants
OlivierHecart Dec 5, 2024
8810b37
Fix doc build
OlivierHecart Dec 5, 2024
dba1d50
Renaming
OlivierHecart Dec 5, 2024
95ce535
Mark PublicationCache and QueryingSubscriber as deprecated and remove…
OlivierHecart Dec 5, 2024
6023920
Remove z_pub_cache and z_query_sub entries from zenoh-ext examples RE…
OlivierHecart Dec 5, 2024
dbd7d22
Add z_advanced_pub and z_advanced_sub to zenoh-ext examples Cargo.toml
OlivierHecart Dec 5, 2024
7537985
Add CacheConfig replies_qos option
OlivierHecart Dec 5, 2024
f850eaa
Call cache directly from publisher
OlivierHecart Dec 5, 2024
1799a95
Update doc
OlivierHecart Dec 6, 2024
feeb7ef
Add missing unstable tags
OlivierHecart Dec 6, 2024
cd68431
Add missing unstable tags
OlivierHecart Dec 6, 2024
c9cc963
Add missing unstable tags
OlivierHecart Dec 6, 2024
2d6550d
Add unstable tag everywhere
OlivierHecart Dec 6, 2024
ab05e1e
Add missing AdvancedSubscriber methods
OlivierHecart Dec 9, 2024
1bae945
Fix WeakSession::Session internal function
OlivierHecart Dec 9, 2024
5ca2e50
Expose missing SampleMissListener and related structs
OlivierHecart Dec 9, 2024
7c374c4
Add AdvancedPublisherBuilderExt::advanced function
OlivierHecart Dec 9, 2024
0ffb133
Add missing AdvancedPublisherBuilder functions
OlivierHecart Dec 9, 2024
dc9200c
Fix doctests
OlivierHecart Dec 9, 2024
6e9f9b9
Expose Miss struct
OlivierHecart Dec 9, 2024
d3a78a0
impl QoSBuilderTrait for AdvancedPublisherBuilder
OlivierHecart Dec 9, 2024
7332d02
Propagate PublisherBuilder values to AdvancedPublisherBuilder
OlivierHecart Dec 9, 2024
b63c080
Rename AdvancedSubscriber::close()
OlivierHecart Dec 9, 2024
4aaca6e
Add unstable tags
OlivierHecart Dec 9, 2024
a8bb82f
Add AdvancedSubscriber::detect_publishers function
OlivierHecart Dec 9, 2024
2df37bc
Remove debug println
OlivierHecart Dec 9, 2024
fe51436
Renaming
OlivierHecart Dec 9, 2024
0bc7233
Add unstable tags
OlivierHecart Dec 9, 2024
00963f3
Use std Range
OlivierHecart Dec 9, 2024
3c6ba61
Spawn Timer in a tokio runtime
OlivierHecart Dec 10, 2024
18682ff
Fix panic when last_delivered is None
OlivierHecart Dec 10, 2024
24e1ede
Release lock before calling get
OlivierHecart Dec 10, 2024
87acf15
Update key mapping
OlivierHecart Dec 10, 2024
6332331
Improve doc
OlivierHecart Dec 10, 2024
b807ee8
fix: fix callback API (#1647)
wyfo Dec 10, 2024
2096085
Update doc
OlivierHecart Dec 10, 2024
dce3304
Fix ke_liveliness
OlivierHecart Dec 10, 2024
e547a23
Fix doc
OlivierHecart Dec 10, 2024
e870d79
Fix doc
OlivierHecart Dec 11, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ quote = "1.0.37"
rand = { version = "0.8.5", default-features = false } # Default features are disabled due to usage in no_std crates
rand_chacha = "0.3.1"
rcgen = "0.13.1"
ref-cast = "1.0.23"
regex = "1.10.6"
ron = "0.8.1"
ringbuffer-spsc = "0.1.9"
Expand Down
10 changes: 10 additions & 0 deletions commons/zenoh-config/src/wrappers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,16 @@ pub struct EntityGlobalId(EntityGlobalIdProto);
pub type EntityId = u32;

impl EntityGlobalId {
/// Creates a new EntityGlobalId.
#[zenoh_macros::internal]
pub fn new(zid: ZenohId, eid: EntityId) -> Self {
EntityGlobalIdProto {
zid: zid.into(),
eid,
}
.into()
}

/// Returns the [`ZenohId`], i.e. the Zenoh session, this ID is associated to.
pub fn zid(&self) -> ZenohId {
self.0.zid.into()
Expand Down
2 changes: 1 addition & 1 deletion commons/zenoh-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ pub fn ke(tokens: TokenStream) -> TokenStream {
let value: LitStr = syn::parse(tokens).unwrap();
let ke = value.value();
match zenoh_keyexpr::keyexpr::new(&ke) {
Ok(_) => quote!(unsafe {::zenoh::key_expr::keyexpr::from_str_unchecked(#ke)}).into(),
Ok(_) => quote!(unsafe { zenoh::key_expr::keyexpr::from_str_unchecked(#ke)}).into(),
Err(e) => panic!("{}", e),
}
}
Expand Down
4 changes: 3 additions & 1 deletion zenoh-ext/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,16 @@ tokio = { workspace = true, features = [
"macros",
"io-std",
] }
async-trait = { workspace = true }
bincode = { workspace = true }
zenoh-util = { workspace = true }
flume = { workspace = true }
futures = { workspace = true }
tracing = { workspace = true }
serde = { workspace = true, features = ["default"] }
leb128 = { workspace = true }
zenoh = { workspace = true, default-features = false }
uhlc = { workspace = true }
zenoh = { workspace = true, features = ["default"] }
zenoh-macros = { workspace = true }

[dev-dependencies]
Expand Down
8 changes: 4 additions & 4 deletions zenoh-ext/examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ zenoh-ext = { workspace = true, features = ["unstable"] }
zenoh-config = { workspace = true }

[[example]]
name = "z_query_sub"
path = "examples/z_query_sub.rs"
name = "z_advanced_pub"
path = "examples/z_advanced_pub.rs"

[[example]]
name = "z_pub_cache"
path = "examples/z_pub_cache.rs"
name = "z_advanced_sub"
path = "examples/z_advanced_sub.rs"

[[example]]
name = "z_member"
Expand Down
22 changes: 12 additions & 10 deletions zenoh-ext/examples/examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,33 @@

## Examples description

### z_pub_cache
### z_advanced_pub

Declares a publisher and an associated publication cache with a given key expression.
All the publications are locally cached (with a configurable history size - i.e. max number of cached data per resource). The cache can be queried by a QueryingSubscriber at startup (see next example).
Declares an AdvancedPublisher with a given key expression.
All the publications are locally cached (with a configurable history size - i.e. max number of cached data per resource, default 1). The cache can be queried by an AdvancedSubscriber for hsitory
or retransmission.

Typical usage:
```bash
z_pub_cache
z_advanced_pub
```
or
```bash
z_pub_cache --history 10
z_advanced_pub --history 10
```

### z_query_sub
### z_advanced_sub

Declares a querying subscriber with a selector.
At startup, the subscriber issuez a query (by default on the same selector than the subscription) and merge/sort/de-duplicate the query results with the publications received in parallel.
Declares an AdvancedSubscriber with a given key expression.
The AdvancedSubscriber can query for AdvancedPublisher history at startup
and on late joiner publisher detection. The AdvancedSubscriber can detect
sample loss and ask for retransmission.

Typical usage:
```bash
z_query_sub
z_advanced_sub
```


### z_member

Group Management example: join a group and display the received group events (Join, Leave, LeaseExpired), as well as an updated group view.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::Duration;

//
// Copyright (c) 2023 ZettaScale Technology
//
Expand All @@ -11,40 +13,37 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use std::time::Duration;

use clap::{arg, Parser};
use zenoh::{config::Config, key_expr::KeyExpr};
use zenoh_config::ModeDependentValue;
use zenoh_ext::*;
use zenoh_ext::{AdvancedPublisherBuilderExt, CacheConfig};
use zenoh_ext_examples::CommonArgs;

#[tokio::main]
async fn main() {
// Initiate logging
zenoh::init_log_from_env_or("error");

let (config, key_expr, value, history, prefix, complete) = parse_args();
let (config, key_expr, value, history) = parse_args();

println!("Opening session...");
let session = zenoh::open(config).await.unwrap();

println!("Declaring PublicationCache on {}", &key_expr);
let mut publication_cache_builder = session
.declare_publication_cache(&key_expr)
.history(history)
.queryable_complete(complete);
if let Some(prefix) = prefix {
publication_cache_builder = publication_cache_builder.queryable_prefix(prefix);
}
let _publication_cache = publication_cache_builder.await.unwrap();
println!("Declaring AdvancedPublisher on {}", &key_expr);
let publisher = session
.declare_publisher(&key_expr)
.cache(CacheConfig::default().max_samples(history))
.sample_miss_detection()
.publisher_detection()
.await
.unwrap();

println!("Press CTRL-C to quit...");
for idx in 0..u32::MAX {
tokio::time::sleep(Duration::from_secs(1)).await;
let buf = format!("[{idx:4}] {value}");
println!("Put Data ('{}': '{}')", &key_expr, buf);
session.put(&key_expr, buf).await.unwrap();
publisher.put(buf).await.unwrap();
}
}

Expand All @@ -59,36 +58,16 @@ struct Args {
#[arg(short = 'i', long, default_value = "1")]
/// The number of publications to keep in cache.
history: usize,
#[arg(short = 'o', long)]
/// Set `complete` option to true. This means that this queryable is ultimate data source, no need to scan other queryables.
complete: bool,
#[arg(short = 'x', long)]
/// An optional queryable prefix.
prefix: Option<String>,
#[command(flatten)]
common: CommonArgs,
}

fn parse_args() -> (
Config,
KeyExpr<'static>,
String,
usize,
Option<String>,
bool,
) {
fn parse_args() -> (Config, KeyExpr<'static>, String, usize) {
let args = Args::parse();
let mut config: Config = args.common.into();
config
.timestamping
.set_enabled(Some(ModeDependentValue::Unique(true)))
.unwrap();
(
config,
args.key,
args.value,
args.history,
args.prefix,
args.complete,
)
(config, args.key, args.value, args.history)
}
84 changes: 84 additions & 0 deletions zenoh-ext/examples/examples/z_advanced_sub.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use std::time::Duration;

use clap::{arg, Parser};
use zenoh::config::Config;
use zenoh_ext::{AdvancedSubscriberBuilderExt, HistoryConfig, RecoveryConfig};
use zenoh_ext_examples::CommonArgs;

#[tokio::main]
async fn main() {
// Initiate logging
zenoh::init_log_from_env_or("error");

let (config, key_expr) = parse_args();

println!("Opening session...");
let session = zenoh::open(config).await.unwrap();

println!("Declaring AdvancedSubscriber on {}", key_expr);
let subscriber = session
.declare_subscriber(key_expr)
.history(HistoryConfig::default().detect_late_publishers())
.recovery(RecoveryConfig::default().periodic_queries(Some(Duration::from_secs(1))))
.subscriber_detection()
.await
.unwrap();

let miss_listener = subscriber.sample_miss_listener().await.unwrap();

println!("Press CTRL-C to quit...");
loop {
tokio::select! {
sample = subscriber.recv_async() => {
if let Ok(sample) = sample {
let payload = sample
.payload()
.try_to_string()
.unwrap_or_else(|e| e.to_string().into());
println!(
">> [Subscriber] Received {} ('{}': '{}')",
sample.kind(),
sample.key_expr().as_str(),
payload
);
}
},
miss = miss_listener.recv_async() => {
if let Ok(miss) = miss {
println!(
">> [Subscriber] Missed {} samples from {:?} !!!",
miss.nb(),
miss.source()
);
}
},
}
}
}

#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)]
struct Args {
#[arg(short, long, default_value = "demo/example/**")]
/// The key expression to subscribe onto.
key: String,
#[command(flatten)]
common: CommonArgs,
}

fn parse_args() -> (Config, String) {
let args = Args::parse();
(args.common.into(), args.key)
}
Loading
Loading