diff --git a/Cargo.toml b/Cargo.toml index ce8ae1643d..a3a370971b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,7 +56,7 @@ members = [ exclude = ["ci/nostd-check", "ci/valgrind-check"] [workspace.package] -rust-version = "1.72.0" +rust-version = "1.75.0" version = "0.11.0-dev" # Zenoh version repository = "https://github.com/eclipse-zenoh/zenoh" homepage = "http://zenoh.io" diff --git a/commons/zenoh-keyexpr/src/key_expr/canon.rs b/commons/zenoh-keyexpr/src/key_expr/canon.rs index 7080dbde1a..8187467004 100644 --- a/commons/zenoh-keyexpr/src/key_expr/canon.rs +++ b/commons/zenoh-keyexpr/src/key_expr/canon.rs @@ -12,114 +12,102 @@ // ZettaScale Zenoh Team, // use alloc::string::String; -use core::{slice, str}; - -use crate::key_expr::{ - utils::{Split, Writer}, - DELIMITER, DOUBLE_WILD, SINGLE_WILD, -}; pub trait Canonize { fn canonize(&mut self); } -const DOLLAR_STAR: &[u8; 2] = b"$*"; - -impl Canonize for &mut str { - fn canonize(&mut self) { - let mut writer = Writer { - ptr: self.as_mut_ptr(), - len: 0, - }; - if let Some(position) = self.find("$*$*") { - writer.len = position; - let mut need_final_write = true; - for between_dollarstar in self.as_bytes()[(position + 4)..].splitter(DOLLAR_STAR) { - need_final_write = between_dollarstar.is_empty(); - if !need_final_write { - writer.write(DOLLAR_STAR.as_ref()); - writer.write(between_dollarstar); - } - } - if need_final_write { - writer.write(DOLLAR_STAR.as_ref()) +// Return the length of the canonized string +fn canonize(bytes: &mut [u8]) -> usize { + let mut index = 0; + let mut written = 0; + let mut double_wild = false; + loop { + match &bytes[index..] { + [b'*', b'*'] => { + bytes[written..written + 2].copy_from_slice(b"**"); + written += 2; + return written; } - *self = unsafe { - str::from_utf8_unchecked_mut(slice::from_raw_parts_mut(writer.ptr, writer.len)) + [b'*', b'*', b'/', ..] => { + double_wild = true; + index += 3; } - } - writer.len = 0; - let mut ke = self.as_bytes().splitter(&b'/'); - let mut in_big_wild = false; - - for chunk in ke.by_ref() { - if chunk.is_empty() { - break; - } - if in_big_wild { - match chunk { - [SINGLE_WILD] | b"$*" => { - writer.write_byte(b'*'); - break; - } - DOUBLE_WILD => continue, - _ => { - writer.write(b"**/"); - writer.write(chunk); - in_big_wild = false; - break; + [b'*', r @ ..] | [b'$', b'*', r @ ..] if r.is_empty() || r.starts_with(b"/") => { + let (end, len) = (!r.starts_with(b"/"), r.len()); + bytes[written] = b'*'; + written += 1; + if end { + if double_wild { + bytes[written..written + 3].copy_from_slice(b"/**"); + written += 3; } + return written; } - } else if chunk == DOUBLE_WILD { - in_big_wild = true; - continue; - } else { - writer.write(if chunk == b"$*" { b"*" } else { chunk }); - break; + bytes[written] = b'/'; + written += 1; + index = bytes.len() - len + 1; } - } - for chunk in ke { - if chunk.is_empty() { - writer.write_byte(b'/'); - continue; + // Handle chunks with only repeated "$*" + [b'$', b'*', b'$', b'*', ..] => { + index += 2; } - if in_big_wild { - match chunk { - [SINGLE_WILD] | b"$*" => { - writer.write(b"/*"); - } - DOUBLE_WILD => {} - _ => { - writer.write(b"/**/"); - writer.write(chunk); - in_big_wild = false; + _ => { + if double_wild && &bytes[index..] != b"**" { + bytes[written..written + 3].copy_from_slice(b"**/"); + written += 3; + double_wild = false; + } + let mut write_start = index; + loop { + match bytes.get(index) { + Some(b'/') => { + index += 1; + bytes.copy_within(write_start..index, written); + written += index - write_start; + break; + } + Some(b'$') if matches!(bytes.get(index + 1..index + 4), Some(b"*$*")) => { + index += 2; + bytes.copy_within(write_start..index, written); + written += index - write_start; + let skip = bytes[index + 4..] + .windows(2) + .take_while(|s| s == b"$*") + .count(); + index += (1 + skip) * 2; + write_start = index; + } + Some(_) => index += 1, + None => { + bytes.copy_within(write_start..index, written); + written += index - write_start; + return written; + } } } - } else if chunk == DOUBLE_WILD { - in_big_wild = true; - } else { - writer.write_byte(DELIMITER); - writer.write(if chunk == b"$*" { b"*" } else { chunk }); - } - } - if in_big_wild { - if writer.len != 0 { - writer.write_byte(DELIMITER); } - writer.write(DOUBLE_WILD) - } - *self = unsafe { - str::from_utf8_unchecked_mut(slice::from_raw_parts_mut(writer.ptr, writer.len)) } } } +impl Canonize for &mut str { + fn canonize(&mut self) { + // SAFETY: canonize leave an UTF8 string within the returned length, + // and remaining garbage bytes are zeroed + let bytes = unsafe { self.as_bytes_mut() }; + let length = canonize(bytes); + bytes[length..].fill(b'\0'); + } +} + impl Canonize for String { fn canonize(&mut self) { - let mut s = self.as_mut(); - s.canonize(); - let len = s.len(); - self.truncate(len); + // SAFETY: canonize leave an UTF8 string within the returned length, + // and remaining garbage bytes are truncated + let bytes = unsafe { self.as_mut_vec() }; + let length = canonize(bytes); + bytes.truncate(length); } } @@ -150,6 +138,9 @@ fn canonizer() { let mut s = String::from("hello/**/**/bye"); s.canonize(); assert_eq!(s, "hello/**/bye"); + let mut s = String::from("hello/**/**"); + s.canonize(); + assert_eq!(s, "hello/**"); // Any $* chunk is replaced by a * chunk let mut s = String::from("hello/$*/bye"); @@ -172,4 +163,9 @@ fn canonizer() { let mut s = String::from("hello/**/*"); s.canonize(); assert_eq!(s, "hello/*/**"); + + // &mut str remaining part is zeroed + let mut s = String::from("$*$*$*/hello/$*$*/bye/$*$*"); + s.as_mut_str().canonize(); + assert_eq!(s, "*/hello/*/bye/*\0\0\0\0\0\0\0\0\0\0\0"); } diff --git a/commons/zenoh-keyexpr/src/key_expr/utils.rs b/commons/zenoh-keyexpr/src/key_expr/utils.rs index 628477174a..63f4b4c088 100644 --- a/commons/zenoh-keyexpr/src/key_expr/utils.rs +++ b/commons/zenoh-keyexpr/src/key_expr/utils.rs @@ -11,25 +11,6 @@ // Contributors: // ZettaScale Zenoh Team, // -use core::ptr; - -pub(crate) struct Writer { - pub ptr: *mut u8, - pub len: usize, -} - -impl Writer { - pub(crate) fn write(&mut self, slice: &[u8]) { - let len = slice.len(); - unsafe { ptr::copy(slice.as_ptr(), self.ptr.add(self.len), len) }; - self.len += len - } - pub(crate) fn write_byte(&mut self, byte: u8) { - unsafe { *self.ptr.add(self.len) = byte }; - self.len += 1 - } -} - #[derive(Debug)] pub struct Splitter<'a, S: ?Sized, D: ?Sized> { s: Option<&'a S>, diff --git a/io/zenoh-link-commons/Cargo.toml b/io/zenoh-link-commons/Cargo.toml index 12b70cad6d..7ec7c533d7 100644 --- a/io/zenoh-link-commons/Cargo.toml +++ b/io/zenoh-link-commons/Cargo.toml @@ -26,14 +26,15 @@ version = { workspace = true } [features] compression = [] +tls = ["dep:rustls", "dep:rustls-webpki", "dep:webpki-roots"] [dependencies] async-trait = { workspace = true } base64 = { workspace = true, optional = true } flume = { workspace = true } futures = { workspace = true } -rustls = { workspace = true } -rustls-webpki = { workspace = true } +rustls = { workspace = true, optional = true } +rustls-webpki = { workspace = true, optional = true } serde = { workspace = true, features = ["default"] } tokio = { workspace = true, features = [ "fs", diff --git a/io/zenoh-link-commons/src/lib.rs b/io/zenoh-link-commons/src/lib.rs index 56d99806a2..46c0968f3f 100644 --- a/io/zenoh-link-commons/src/lib.rs +++ b/io/zenoh-link-commons/src/lib.rs @@ -21,6 +21,7 @@ extern crate alloc; mod listener; mod multicast; +#[cfg(feature = "tls")] pub mod tls; mod unicast; diff --git a/io/zenoh-links/zenoh-link-quic/Cargo.toml b/io/zenoh-links/zenoh-link-quic/Cargo.toml index 1af2a253b8..ff634d9d15 100644 --- a/io/zenoh-links/zenoh-link-quic/Cargo.toml +++ b/io/zenoh-links/zenoh-link-quic/Cargo.toml @@ -49,7 +49,7 @@ x509-parser = { workspace = true } zenoh-collections = { workspace = true } zenoh-config = { workspace = true } zenoh-core = { workspace = true } -zenoh-link-commons = { workspace = true } +zenoh-link-commons = { workspace = true, features = ["tls"] } zenoh-protocol = { workspace = true } zenoh-result = { workspace = true } zenoh-runtime = { workspace = true } diff --git a/io/zenoh-links/zenoh-link-tls/Cargo.toml b/io/zenoh-links/zenoh-link-tls/Cargo.toml index a716c72c99..3bd357d1e4 100644 --- a/io/zenoh-links/zenoh-link-tls/Cargo.toml +++ b/io/zenoh-links/zenoh-link-tls/Cargo.toml @@ -43,7 +43,7 @@ webpki-roots = { workspace = true } zenoh-collections = { workspace = true } zenoh-config = { workspace = true } zenoh-core = { workspace = true } -zenoh-link-commons = { workspace = true } +zenoh-link-commons = { workspace = true, features = ["tls"] } zenoh-protocol = { workspace = true } zenoh-result = { workspace = true } zenoh-runtime = { workspace = true } diff --git a/zenoh-ext/src/publication_cache.rs b/zenoh-ext/src/publication_cache.rs index 09a21f2e16..07d993e815 100644 --- a/zenoh-ext/src/publication_cache.rs +++ b/zenoh-ext/src/publication_cache.rs @@ -143,8 +143,8 @@ impl<'a> PublicationCache<'a> { if conf.session.hlc().is_none() { bail!( "Failed requirement for PublicationCache on {}: \ - the Session is not configured with 'add_timestamp=true'", - key_expr + the 'timestamping' setting must be enabled in the Zenoh configuration", + key_expr, ) } diff --git a/zenoh/src/api/query.rs b/zenoh/src/api/query.rs index 8cf62344f2..f1807333c7 100644 --- a/zenoh/src/api/query.rs +++ b/zenoh/src/api/query.rs @@ -18,13 +18,16 @@ use std::{ time::Duration, }; +#[cfg(feature = "unstable")] use zenoh_config::ZenohId; use zenoh_core::{Resolvable, Wait}; use zenoh_keyexpr::OwnedKeyExpr; -use zenoh_protocol::core::{CongestionControl, Parameters, ZenohIdProto}; +#[cfg(feature = "unstable")] +use zenoh_protocol::core::ZenohIdProto; +use zenoh_protocol::core::{CongestionControl, Parameters}; use zenoh_result::ZResult; -#[zenoh_macros::unstable] +#[cfg(feature = "unstable")] use super::{ builders::sample::SampleBuilderTrait, bytes::OptionZBytes, sample::SourceInfo, selector::ZenohParameters, @@ -118,6 +121,7 @@ impl From for ReplyError { #[derive(Clone, Debug)] pub struct Reply { pub(crate) result: Result, + #[cfg(feature = "unstable")] pub(crate) replier_id: Option, } @@ -137,6 +141,7 @@ impl Reply { self.result } + #[zenoh_macros::unstable] /// Gets the id of the zenoh instance that answered this Reply. pub fn replier_id(&self) -> Option { self.replier_id.map(Into::into) diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index f5890edc3a..8a5d9e746e 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -1760,6 +1760,7 @@ impl Session { self.task_controller .spawn_with_rt(zenoh_runtime::ZRuntime::Net, { let state = self.state.clone(); + #[cfg(feature = "unstable")] let zid = self.runtime.zid(); async move { tokio::select! { @@ -1775,6 +1776,7 @@ impl Session { } (query.callback)(Reply { result: Err(Value::new("Timeout", Encoding::ZENOH_STRING).into()), + #[cfg(feature = "unstable")] replier_id: Some(zid.into()), }); } @@ -1874,6 +1876,7 @@ impl Session { tracing::debug!("Timeout on liveliness query {}! Send error and close.", id); (query.callback)(Reply { result: Err(Value::new("Timeout", Encoding::ZENOH_STRING).into()), + #[cfg(feature = "unstable")] replier_id: Some(zid.into()), }); } @@ -2238,6 +2241,7 @@ impl Primitives for Session { #[cfg(feature = "unstable")] attachment: None, }), + #[cfg(feature = "unstable")] replier_id: None, }; @@ -2404,8 +2408,9 @@ impl Primitives for Session { encoding: e.encoding.into(), }; let new_reply = Reply { - replier_id: e.ext_sinfo.map(|info| info.id.zid), result: Err(value.into()), + #[cfg(feature = "unstable")] + replier_id: e.ext_sinfo.map(|info| info.id.zid), }; callback(new_reply); } @@ -2487,6 +2492,7 @@ impl Primitives for Session { let sample = info.into_sample(key_expr.into_owned(), payload, attachment); let new_reply = Reply { result: Ok(sample), + #[cfg(feature = "unstable")] replier_id: None, }; let callback = diff --git a/zenoh/src/net/routing/hat/router/token.rs b/zenoh/src/net/routing/hat/router/token.rs index c167c8df15..06d3a4b14f 100644 --- a/zenoh/src/net/routing/hat/router/token.rs +++ b/zenoh/src/net/routing/hat/router/token.rs @@ -948,7 +948,10 @@ pub(crate) fn declare_token_interest( aggregate: bool, send_declare: &mut SendDeclare, ) { - if mode.current() && face.whatami == WhatAmI::Client { + if mode.current() + && (face.whatami == WhatAmI::Client + || (face.whatami == WhatAmI::Peer && !hat!(tables).full_net(WhatAmI::Peer))) + { let interest_id = (!mode.future()).then_some(id); if let Some(res) = res.as_ref() { if aggregate { diff --git a/zenoh/src/net/routing/interceptor/downsampling.rs b/zenoh/src/net/routing/interceptor/downsampling.rs index 1b31040d3c..c8881341e0 100644 --- a/zenoh/src/net/routing/interceptor/downsampling.rs +++ b/zenoh/src/net/routing/interceptor/downsampling.rs @@ -126,11 +126,12 @@ pub(crate) struct DownsamplingInterceptor { impl InterceptorTrait for DownsamplingInterceptor { fn compute_keyexpr_cache(&self, key_expr: &KeyExpr<'_>) -> Option> { let ke_id = zlock!(self.ke_id); - if let Some(id) = ke_id.weight_at(&key_expr.clone()) { - Some(Box::new(Some(*id))) - } else { - Some(Box::new(None::)) + if let Some(node) = ke_id.intersecting_keys(key_expr).next() { + if let Some(id) = ke_id.weight_at(&node) { + return Some(Box::new(Some(*id))); + } } + Some(Box::new(None::)) } fn intercept( @@ -188,6 +189,11 @@ impl DownsamplingInterceptor { latest_message_timestamp, }, ); + tracing::debug!( + "New downsampler rule enabled: key_expr={:?}, threshold={:?}", + rule.key_expr, + threshold + ); } Self { ke_id: Arc::new(Mutex::new(ke_id)), diff --git a/zenoh/tests/routing.rs b/zenoh/tests/routing.rs index fd680ae545..f65c939533 100644 --- a/zenoh/tests/routing.rs +++ b/zenoh/tests/routing.rs @@ -20,7 +20,7 @@ use std::{ time::Duration, }; -use tokio_util::{sync::CancellationToken, task::TaskTracker}; +use tokio_util::sync::CancellationToken; use zenoh::{ config::{ModeDependentValue, WhatAmI, WhatAmIMatcher}, prelude::*, @@ -32,9 +32,8 @@ use zenoh_result::bail; const TIMEOUT: Duration = Duration::from_secs(10); const MSG_COUNT: usize = 50; -const MSG_SIZE: [usize; 2] = [1_024, 131_072]; -// Maximal recipes to run at once -const PARALLEL_RECIPES: usize = 4; +#[cfg(feature = "unstable")] +const LIVELINESSGET_DELAY: Duration = Duration::from_millis(10); #[derive(Debug, Clone, PartialEq, Eq)] enum Task { @@ -42,6 +41,14 @@ enum Task { Sub(String, usize), Queryable(String, usize), Get(String, usize), + #[cfg(feature = "unstable")] + Liveliness(String), + #[cfg(feature = "unstable")] + LivelinessGet(String), + #[cfg(feature = "unstable")] + LivelinessLoop(String), + #[cfg(feature = "unstable")] + LivelinessSub(String), Sleep(Duration), Wait, Checkpoint, @@ -99,6 +106,22 @@ impl Task { println!("Pub task done."); } + // The Queryable task keeps replying to requested messages until all checkpoints are finished. + Self::Queryable(ke, payload_size) => { + let queryable = ztimeout!(session.declare_queryable(ke))?; + let payload = vec![0u8; *payload_size]; + + loop { + tokio::select! { + _ = token.cancelled() => break, + query = queryable.recv_async() => { + ztimeout!(query?.reply(ke.to_owned(), payload.clone()))?; + }, + } + } + println!("Queryable task done."); + } + // The Get task gets and checks if the incoming message matches the expected size until it receives enough counts. Self::Get(ke, expected_size) => { let mut counter = 0; @@ -133,20 +156,92 @@ impl Task { println!("Get got sufficient amount of messages. Done."); } - // The Queryable task keeps replying to requested messages until all checkpoints are finished. - Self::Queryable(ke, payload_size) => { - let queryable = ztimeout!(session.declare_queryable(ke))?; - let payload = vec![0u8; *payload_size]; + #[cfg(feature = "unstable")] + // The Liveliness task. + Self::Liveliness(ke) => { + let _liveliness = ztimeout!(session.liveliness().declare_token(ke))?; + + token.cancelled().await; + println!("Liveliness task done."); + } + + #[cfg(feature = "unstable")] + // The LivelinessGet task. + Self::LivelinessGet(ke) => { + let mut counter = 0; + while counter < MSG_COUNT { + tokio::select! { + _ = token.cancelled() => break, + replies = async { session.liveliness().get(ke).timeout(Duration::from_secs(10)).await } => { + let replies = replies?; + while let Ok(reply) = replies.recv_async().await { + if let Err(err) = reply.result() { + tracing::warn!( + "Sample got from {} failed to unwrap! Error: {:?}.", + ke, + err + ); + continue; + } + counter += 1; + } + tokio::time::sleep(LIVELINESSGET_DELAY).await; + } + } + } + println!("LivelinessGet got sufficient amount of messages. Done."); + } + + // The LivelinessLoop task. + #[cfg(feature = "unstable")] + Self::LivelinessLoop(ke) => { + let mut liveliness: Option = None; + loop { + match liveliness.take() { + Some(liveliness) => { + tokio::select! { + _ = token.cancelled() => break, + res = tokio::time::timeout(std::time::Duration::from_secs(1), async {liveliness.undeclare().await}) => { + _ = res?; + } + } + } + None => { + tokio::select! { + _ = token.cancelled() => break, + res = tokio::time::timeout(std::time::Duration::from_secs(1), async {session.liveliness().declare_token(ke) + .await + }) => { + liveliness = res?.ok(); + } + } + } + } + } + println!("LivelinessLoop task done."); + } + + #[cfg(feature = "unstable")] + // The LivelinessSub task. + Self::LivelinessSub(ke) => { + let sub = ztimeout!(session.liveliness().declare_subscriber(ke))?; + let mut counter = 0; loop { tokio::select! { _ = token.cancelled() => break, - query = queryable.recv_async() => { - ztimeout!(query?.reply(ke.to_owned(), payload.clone()))?; - }, + res = sub.recv_async() => { + if res.is_ok() { + counter += 1; + if counter >= MSG_COUNT { + println!("LivelinessSub received sufficient amount of messages. Done."); + break; + } + } + } } } - println!("Queryable task done."); + println!("LivelinessSub task done."); } // Make the zenoh session sleep for a while. @@ -488,12 +583,21 @@ async fn static_failover_brokering() -> Result<()> { Result::Ok(()) } +#[cfg(feature = "unstable")] +use tokio_util::task::TaskTracker; +#[cfg(feature = "unstable")] +const MSG_SIZE: [usize; 2] = [1_024, 131_072]; +// Maximal recipes to run at once +#[cfg(feature = "unstable")] +const PARALLEL_RECIPES: usize = 4; + // All test cases varying in // 1. Message size: 2 (sizes) // 2. Mode: {Client, Peer} x {Client x Peer} x {Router} = 2 x 2 x 1 = 4 (cases) // 3. Spawning order (delay_in_secs for node1, node2, and node3) = 6 (cases) // // Total cases = 2 x 4 x 6 = 48 +#[cfg(feature = "unstable")] #[tokio::test(flavor = "multi_thread", worker_threads = 9)] async fn three_node_combination() -> Result<()> { zenoh::try_init_log_from_env(); @@ -524,6 +628,10 @@ async fn three_node_combination() -> Result<()> { let ke_pubsub = format!("three_node_combination_keyexpr_pubsub_{idx}"); let ke_getqueryable = format!("three_node_combination_keyexpr_getqueryable_{idx}"); + let ke_getliveliness = + format!("three_node_combination_keyexpr_getliveliness_{idx}"); + let ke_subliveliness = + format!("three_node_combination_keyexpr_subliveliness_{idx}"); use rand::Rng; let mut rng = rand::thread_rng(); @@ -538,7 +646,7 @@ async fn three_node_combination() -> Result<()> { ..Default::default() }; - let (pub_node, queryable_node) = { + let (pub_node, queryable_node, liveliness_node, livelinessloop_node) = { let base = Node { mode: node1_mode, connect: vec![locator.clone()], @@ -554,7 +662,7 @@ async fn three_node_combination() -> Result<()> { )])]); pub_node.warmup += Duration::from_millis(rng.gen_range(0..500)); - let mut queryable_node = base; + let mut queryable_node = base.clone(); queryable_node.name = format!("Queryable {node1_mode}"); queryable_node.con_task = ConcurrentTask::from([SequentialTask::from([Task::Queryable( @@ -563,10 +671,31 @@ async fn three_node_combination() -> Result<()> { )])]); queryable_node.warmup += Duration::from_millis(rng.gen_range(0..500)); - (pub_node, queryable_node) + let mut liveliness_node = base.clone(); + liveliness_node.name = format!("Liveliness {node1_mode}"); + liveliness_node.con_task = + ConcurrentTask::from([SequentialTask::from([Task::Liveliness( + ke_getliveliness.clone(), + )])]); + liveliness_node.warmup += Duration::from_millis(rng.gen_range(0..500)); + + let mut livelinessloop_node = base; + livelinessloop_node.name = format!("LivelinessLoop {node1_mode}"); + livelinessloop_node.con_task = + ConcurrentTask::from([SequentialTask::from([Task::LivelinessLoop( + ke_subliveliness.clone(), + )])]); + livelinessloop_node.warmup += Duration::from_millis(rng.gen_range(0..500)); + + ( + pub_node, + queryable_node, + liveliness_node, + livelinessloop_node, + ) }; - let (sub_node, get_node) = { + let (sub_node, get_node, livelinessget_node, livelinesssub_node) = { let base = Node { mode: node2_mode, connect: vec![locator], @@ -582,7 +711,7 @@ async fn three_node_combination() -> Result<()> { ])]); sub_node.warmup += Duration::from_millis(rng.gen_range(0..500)); - let mut get_node = base; + let mut get_node = base.clone(); get_node.name = format!("Get {node2_mode}"); get_node.con_task = ConcurrentTask::from([SequentialTask::from([ Task::Get(ke_getqueryable, msg_size), @@ -590,12 +719,30 @@ async fn three_node_combination() -> Result<()> { ])]); get_node.warmup += Duration::from_millis(rng.gen_range(0..500)); - (sub_node, get_node) + let mut livelinessget_node = base.clone(); + livelinessget_node.name = format!("LivelinessGet {node2_mode}"); + livelinessget_node.con_task = ConcurrentTask::from([SequentialTask::from([ + Task::LivelinessGet(ke_getliveliness), + Task::Checkpoint, + ])]); + livelinessget_node.warmup += Duration::from_millis(rng.gen_range(0..500)); + + let mut livelinesssub_node = base; + livelinesssub_node.name = format!("LivelinessSub {node2_mode}"); + livelinesssub_node.con_task = ConcurrentTask::from([SequentialTask::from([ + Task::LivelinessSub(ke_subliveliness), + Task::Checkpoint, + ])]); + livelinesssub_node.warmup += Duration::from_millis(rng.gen_range(0..500)); + + (sub_node, get_node, livelinessget_node, livelinesssub_node) }; ( Recipe::new([router_node.clone(), pub_node, sub_node]), - Recipe::new([router_node, queryable_node, get_node]), + Recipe::new([router_node.clone(), queryable_node, get_node]), + Recipe::new([router_node.clone(), liveliness_node, livelinessget_node]), + Recipe::new([router_node, livelinessloop_node, livelinesssub_node]), ) }, ) @@ -603,10 +750,12 @@ async fn three_node_combination() -> Result<()> { for chunks in recipe_list.chunks(4).map(|x| x.to_vec()) { let mut join_set = tokio::task::JoinSet::new(); - for (pubsub, getqueryable) in chunks { + for (pubsub, getqueryable, getliveliness, subliveliness) in chunks { join_set.spawn(async move { pubsub.run().await?; getqueryable.run().await?; + getliveliness.run().await?; + subliveliness.run().await?; Result::Ok(()) }); } @@ -625,6 +774,7 @@ async fn three_node_combination() -> Result<()> { // 2. Mode: {Client, Peer} x {Client, Peer} x {IsFirstListen} = 2 x 2 x 2 = 8 (modes) // // Total cases = 2 x 8 = 16 +#[cfg(feature = "unstable")] #[tokio::test(flavor = "multi_thread", worker_threads = 8)] async fn two_node_combination() -> Result<()> { zenoh::try_init_log_from_env(); @@ -649,6 +799,8 @@ async fn two_node_combination() -> Result<()> { idx += 1; let ke_pubsub = format!("two_node_combination_keyexpr_pubsub_{idx}"); let ke_getqueryable = format!("two_node_combination_keyexpr_getqueryable_{idx}"); + let ke_subliveliness = format!("two_node_combination_keyexpr_subliveliness_{idx}"); + let ke_getliveliness = format!("two_node_combination_keyexpr_getliveliness_{idx}"); let (node1_listen_connect, node2_listen_connect) = { let locator = format!("tcp/127.0.0.1:{}", base_port + idx); @@ -662,7 +814,7 @@ async fn two_node_combination() -> Result<()> { } }; - let (pub_node, queryable_node) = { + let (pub_node, queryable_node, liveliness_node, livelinessloop_node) = { let base = Node { mode: node1_mode, listen: node1_listen_connect.0, @@ -677,7 +829,7 @@ async fn two_node_combination() -> Result<()> { msg_size, )])]); - let mut queryable_node = base; + let mut queryable_node = base.clone(); queryable_node.name = format!("Queryable {node1_mode}"); queryable_node.con_task = ConcurrentTask::from([SequentialTask::from([Task::Queryable( @@ -685,10 +837,29 @@ async fn two_node_combination() -> Result<()> { msg_size, )])]); - (pub_node, queryable_node) + let mut liveliness_node = base.clone(); + liveliness_node.name = format!("Liveliness {node1_mode}"); + liveliness_node.con_task = + ConcurrentTask::from([SequentialTask::from([Task::Liveliness( + ke_getliveliness.clone(), + )])]); + + let mut livelinessloop_node = base; + livelinessloop_node.name = format!("LivelinessLoop {node1_mode}"); + livelinessloop_node.con_task = + ConcurrentTask::from([SequentialTask::from([Task::LivelinessLoop( + ke_subliveliness.clone(), + )])]); + + ( + pub_node, + queryable_node, + liveliness_node, + livelinessloop_node, + ) }; - let (sub_node, get_node) = { + let (sub_node, get_node, livelinessget_node, livelinesssub_node) = { let base = Node { mode: node2_mode, listen: node2_listen_connect.0, @@ -703,29 +874,47 @@ async fn two_node_combination() -> Result<()> { Task::Checkpoint, ])]); - let mut get_node = base; + let mut get_node = base.clone(); get_node.name = format!("Get {node2_mode}"); get_node.con_task = ConcurrentTask::from([SequentialTask::from([ Task::Get(ke_getqueryable, msg_size), Task::Checkpoint, ])]); - (sub_node, get_node) + let mut livelinessget_node = base.clone(); + livelinessget_node.name = format!("LivelinessGet {node2_mode}"); + livelinessget_node.con_task = ConcurrentTask::from([SequentialTask::from([ + Task::LivelinessGet(ke_getliveliness), + Task::Checkpoint, + ])]); + + let mut livelinesssub_node = base; + livelinesssub_node.name = format!("LivelinessSub {node2_mode}"); + livelinesssub_node.con_task = ConcurrentTask::from([SequentialTask::from([ + Task::LivelinessSub(ke_subliveliness), + Task::Checkpoint, + ])]); + + (sub_node, get_node, livelinessget_node, livelinesssub_node) }; ( Recipe::new([pub_node, sub_node]), Recipe::new([queryable_node, get_node]), + Recipe::new([liveliness_node, livelinessget_node]), + Recipe::new([livelinessloop_node, livelinesssub_node]), ) }) .collect(); for chunks in recipe_list.chunks(PARALLEL_RECIPES).map(|x| x.to_vec()) { let task_tracker = TaskTracker::new(); - for (pubsub, getqueryable) in chunks { + for (pubsub, getqueryable, getlivelienss, subliveliness) in chunks { task_tracker.spawn(async move { pubsub.run().await?; getqueryable.run().await?; + getlivelienss.run().await?; + subliveliness.run().await?; Result::Ok(()) }); }