diff --git a/Cargo.lock b/Cargo.lock index d3ea8978b5..9dff82ad80 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -165,9 +165,9 @@ checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" [[package]] name = "anstream" -version = "0.6.12" +version = "0.6.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96b09b5178381e0874812a9b157f7fe84982617e48f71f4e3235482775e5b540" +checksum = "d96bd03f33fe50a863e394ee9718a706f988b9079b20c3784fb726e7678b62fb" dependencies = [ "anstyle", "anstyle-parse", @@ -1103,9 +1103,9 @@ dependencies = [ [[package]] name = "env_logger" -version = "0.11.2" +version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c012a26a7f605efc424dd53697843a72be7dc86ad2d01f7814337794a12231d" +checksum = "38b35839ba51819680ba087cd351788c9a3c476841207e0b8cee0b04722343b9" dependencies = [ "anstream", "anstyle", @@ -1122,9 +1122,9 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] name = "erased-serde" -version = "0.3.31" +version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c138974f9d5e7fe373eb04df7cae98833802ae4b11c24ac7039a21d5af4b26c" +checksum = "2b73807008a3c7f171cc40312f37d95ef0396e048b5848d775f54b1a4dd4a0d3" dependencies = [ "serde", ] @@ -1541,9 +1541,9 @@ dependencies = [ [[package]] name = "http" -version = "1.0.0" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b32afd38673a8016f7c9ae69e5af41a58f81b1d31689040f2f1959594ce194ea" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" dependencies = [ "bytes", "fnv", @@ -1854,9 +1854,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.20" +version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" dependencies = [ "serde", "value-bag", @@ -2865,9 +2865,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.22.2" +version = "0.22.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e87c9956bd9807afa1f77e0f7594af32566e830e088a5576d27c5b6f30f49d41" +checksum = "99008d7ad0bbbea527ec27bddbc0e432c5b87d8175178cee68d2eec9c4a1813c" dependencies = [ "log", "ring 0.17.6", @@ -2923,9 +2923,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.3.1" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ede67b28608b4c60685c7d54122d4400d90f62b40caee7700e700380a390fa8" +checksum = "ecd36cc4259e3e4514335c4a138c6b43171a8d61d8f5c9348f9fc7529416f247" [[package]] name = "rustls-webpki" @@ -3701,9 +3701,9 @@ dependencies = [ [[package]] name = "tokio" -version = "1.36.0" +version = "1.37.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931" +checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787" dependencies = [ "backtrace", "bytes", @@ -3743,7 +3743,7 @@ version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" dependencies = [ - "rustls 0.22.2", + "rustls 0.22.3", "rustls-pki-types", "tokio", ] @@ -4030,9 +4030,9 @@ dependencies = [ [[package]] name = "value-bag" -version = "1.4.1" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d92ccd67fb88503048c01b59152a04effd0782d035a83a6d256ce6085f08f4a3" +checksum = "74797339c3b98616c009c7c3eb53a0ce41e85c8ec66bd3db96ed132d20cfdee8" dependencies = [ "value-bag-serde1", "value-bag-sval2", @@ -4040,9 +4040,9 @@ dependencies = [ [[package]] name = "value-bag-serde1" -version = "1.4.1" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b0b9f3feef403a50d4d67e9741a6d8fc688bcbb4e4f31bd4aab72cc690284394" +checksum = "cc35703541cbccb5278ef7b589d79439fc808ff0b5867195a3230f9a47421d39" dependencies = [ "erased-serde", "serde", @@ -4051,9 +4051,9 @@ dependencies = [ [[package]] name = "value-bag-sval2" -version = "1.4.1" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30b24f4146b6f3361e91cbf527d1fb35e9376c3c0cef72ca5ec5af6d640fad7d" +checksum = "285b43c29d0b4c0e65aad24561baee67a1b69dc9be9375d4a85138cbf556f7f8" dependencies = [ "sval", "sval_buffer", @@ -4676,7 +4676,7 @@ dependencies = [ "flume", "futures", "log", - "rustls 0.22.2", + "rustls 0.22.3", "rustls-webpki 0.102.2", "serde", "tokio", @@ -4763,7 +4763,7 @@ dependencies = [ "base64 0.21.4", "futures", "log", - "rustls 0.22.2", + "rustls 0.22.3", "rustls-pemfile 2.0.0", "rustls-pki-types", "rustls-webpki 0.102.2", diff --git a/examples/examples/z_get.rs b/examples/examples/z_get.rs index e9e40b3954..8735ae8daa 100644 --- a/examples/examples/z_get.rs +++ b/examples/examples/z_get.rs @@ -28,15 +28,14 @@ async fn main() { let session = zenoh::open(config).res().await.unwrap(); println!("Sending Query '{selector}'..."); - let replies = match value { - Some(value) => session.get(&selector).with_value(value), - None => session.get(&selector), - } - .target(target) - .timeout(timeout) - .res() - .await - .unwrap(); + let replies = session + .get(&selector) + .value(value) + .target(target) + .timeout(timeout) + .res() + .await + .unwrap(); while let Ok(reply) = replies.recv_async().await { match reply.sample { Ok(sample) => { diff --git a/examples/examples/z_pub.rs b/examples/examples/z_pub.rs index 4863387df0..8cd3c4edba 100644 --- a/examples/examples/z_pub.rs +++ b/examples/examples/z_pub.rs @@ -37,12 +37,12 @@ async fn main() { println!("Putting Data ('{}': '{}')...", &key_expr, buf); let mut put = publisher.put(buf); if let Some(attachment) = &attachment { - put = put.with_attachment( + put = put.attachment(Some( attachment .split('&') .map(|pair| split_once(pair, '=')) .collect(), - ) + )) } put.res().await.unwrap(); } diff --git a/plugins/zenoh-plugin-rest/src/lib.rs b/plugins/zenoh-plugin-rest/src/lib.rs index 12c0dd6405..43c3f33776 100644 --- a/plugins/zenoh-plugin-rest/src/lib.rs +++ b/plugins/zenoh-plugin-rest/src/lib.rs @@ -419,7 +419,7 @@ async fn query(mut req: Request<(Arc, String)>) -> tide::Result { @@ -463,13 +463,7 @@ async fn write(mut req: Request<(Arc, String)>) -> tide::Result { - session - .put(&key_expr, bytes) - .with_encoding(encoding) - .res() - .await - } + SampleKind::Put => session.put(&key_expr, bytes).encoding(encoding).res().await, SampleKind::Delete => session.delete(&key_expr).res().await, }; match res { diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/align_queryable.rs b/plugins/zenoh-plugin-storage-manager/src/replica/align_queryable.rs index 32be4a5534..1ce6a1cb16 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/align_queryable.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/align_queryable.rs @@ -127,8 +127,8 @@ impl AlignQueryable { AlignData::Data(k, (v, ts)) => { query .reply(k, v.payload) - .with_encoding(v.encoding) - .with_timestamp(ts) + .encoding(v.encoding) + .timestamp(ts) .res() .await .unwrap(); diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs b/plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs index fb46b78082..64d5cfa1cd 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs @@ -21,6 +21,7 @@ use std::str; use zenoh::key_expr::{KeyExpr, OwnedKeyExpr}; use zenoh::payload::StringOrBase64; use zenoh::prelude::r#async::*; +use zenoh::sample::builder::SampleBuilder; use zenoh::time::Timestamp; use zenoh::Session; @@ -108,9 +109,10 @@ impl Aligner { let Value { payload, encoding, .. } = value; - let sample = Sample::new(key, payload) - .with_encoding(encoding) - .with_timestamp(ts); + let sample = SampleBuilder::put(key, payload) + .encoding(encoding) + .timestamp(ts) + .into(); log::debug!("[ALIGNER] Adding {:?} to storage", sample); self.tx_sample.send_async(sample).await.unwrap_or_else(|e| { log::error!("[ALIGNER] Error adding sample to storage: {}", e) diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs b/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs index 108beaabb2..06c5882408 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs @@ -21,10 +21,14 @@ use futures::select; use std::collections::{HashMap, HashSet}; use std::str::{self, FromStr}; use std::time::{SystemTime, UNIX_EPOCH}; +use zenoh::buffers::buffer::SplitBuffer; use zenoh::buffers::ZBuf; use zenoh::prelude::r#async::*; -use zenoh::query::ConsolidationMode; -use zenoh::time::{Timestamp, NTP64}; +use zenoh::query::{ConsolidationMode, QueryTarget}; +use zenoh::sample::builder::SampleBuilder; +use zenoh::sample::{Sample, SampleKind}; +use zenoh::time::{new_reception_timestamp, Timestamp, NTP64}; +use zenoh::value::Value; use zenoh::{Result as ZResult, Session}; use zenoh_backend_traits::config::{GarbageCollectionConfig, StorageConfig}; use zenoh_backend_traits::{Capability, History, Persistence, StorageInsertionResult, StoredData}; @@ -219,14 +223,15 @@ impl StorageService { select!( // on sample for key_expr sample = storage_sub.recv_async() => { - let mut sample = match sample { + let sample = match sample { Ok(sample) => sample, Err(e) => { log::error!("Error in sample: {}", e); continue; } }; - sample.ensure_timestamp(); + let timestamp = sample.timestamp().cloned().unwrap_or(new_reception_timestamp()); + let sample = SampleBuilder::from(sample).timestamp(timestamp).into(); self.process_sample(sample).await; }, // on query on key_expr @@ -290,23 +295,25 @@ impl StorageService { ); // there might be the case that the actual update was outdated due to a wild card update, but not stored yet in the storage. // get the relevant wild card entry and use that value and timestamp to update the storage - let sample_to_store = match self + let sample_to_store: Sample = if let Some(update) = self .ovderriding_wild_update(&k, sample.timestamp().unwrap()) .await { - Some(overriding_update) => { - let Value { - payload, encoding, .. - } = overriding_update.data.value; - Sample::new(KeyExpr::from(k.clone()), payload) - .with_encoding(encoding) - .with_timestamp(overriding_update.data.timestamp) - .with_kind(overriding_update.kind) + match update.kind { + SampleKind::Put => { + SampleBuilder::put(KeyExpr::from(k.clone()), update.data.value.payload) + .encoding(update.data.value.encoding) + .timestamp(update.data.timestamp) + .into() + } + SampleKind::Delete => SampleBuilder::delete(KeyExpr::from(k.clone())) + .timestamp(update.data.timestamp) + .into(), } - None => Sample::new(KeyExpr::from(k.clone()), sample.payload().clone()) - .with_encoding(sample.encoding().clone()) - .with_timestamp(*sample.timestamp().unwrap()) - .with_kind(sample.kind()), + } else { + SampleBuilder::from(sample.clone()) + .keyexpr(k.clone()) + .into() }; let stripped_key = match self.strip_prefix(sample_to_store.key_expr()) { @@ -323,7 +330,7 @@ impl StorageService { .put( stripped_key, Value::new(sample_to_store.payload().clone()) - .with_encoding(sample_to_store.encoding().clone()), + .encoding(sample_to_store.encoding().clone()), *sample_to_store.timestamp().unwrap(), ) .await @@ -506,13 +513,13 @@ impl StorageService { match storage.get(stripped_key, q.parameters()).await { Ok(stored_data) => { for entry in stored_data { - let Value { - payload, encoding, .. - } = entry.value; - let sample = Sample::new(key.clone(), payload) - .with_encoding(encoding) - .with_timestamp(entry.timestamp); - if let Err(e) = q.reply_sample(sample).res().await { + if let Err(e) = q + .reply(key.clone(), entry.value.payload) + .encoding(entry.value.encoding) + .timestamp(entry.timestamp) + .res() + .await + { log::warn!( "Storage '{}' raised an error replying a query: {}", self.name, @@ -538,13 +545,13 @@ impl StorageService { match storage.get(stripped_key, q.parameters()).await { Ok(stored_data) => { for entry in stored_data { - let Value { - payload, encoding, .. - } = entry.value; - let sample = Sample::new(q.key_expr().clone(), payload) - .with_encoding(encoding) - .with_timestamp(entry.timestamp); - if let Err(e) = q.reply_sample(sample).res().await { + if let Err(e) = q + .reply(q.key_expr().clone(), entry.value.payload) + .encoding(entry.value.encoding) + .timestamp(entry.timestamp) + .res() + .await + { log::warn!( "Storage '{}' raised an error replying a query: {}", self.name, @@ -692,7 +699,7 @@ fn construct_update(data: String) -> Update { for slice in result.3 { payload.push_zslice(slice.to_vec().into()); } - let value = Value::new(payload).with_encoding(result.2); + let value = Value::new(payload).encoding(result.2); let data = StoredData { value, timestamp: Timestamp::from_str(&result.1).unwrap(), // @TODO: remove the unwrap() diff --git a/zenoh-ext/src/querying_subscriber.rs b/zenoh-ext/src/querying_subscriber.rs index f7f09f495c..d749a94ed9 100644 --- a/zenoh-ext/src/querying_subscriber.rs +++ b/zenoh-ext/src/querying_subscriber.rs @@ -20,8 +20,9 @@ use std::time::Duration; use zenoh::handlers::{locked, DefaultHandler}; use zenoh::prelude::r#async::*; use zenoh::query::{QueryConsolidation, QueryTarget, ReplyKeyExpr}; +use zenoh::sample::builder::SampleBuilder; use zenoh::subscriber::{Reliability, Subscriber}; -use zenoh::time::Timestamp; +use zenoh::time::{new_reception_timestamp, Timestamp}; use zenoh::Result as ZResult; use zenoh::SessionRef; use zenoh_core::{zlock, AsyncResolve, Resolvable, SyncResolve}; @@ -656,7 +657,7 @@ impl<'a, Receiver> FetchingSubscriber<'a, Receiver> { let sub_callback = { let state = state.clone(); let callback = callback.clone(); - move |mut s| { + move |s| { let state = &mut zlock!(state); if state.pending_fetches == 0 { callback(s); @@ -664,8 +665,10 @@ impl<'a, Receiver> FetchingSubscriber<'a, Receiver> { log::trace!("Sample received while fetch in progress: push it to merge_queue"); // ensure the sample has a timestamp, thus it will always be sorted into the MergeQueue // after any timestamped Sample possibly coming from a fetch reply. - s.ensure_timestamp(); - state.merge_queue.push(s); + let timestamp = s.timestamp().cloned().unwrap_or(new_reception_timestamp()); + state + .merge_queue + .push(SampleBuilder::from(s).timestamp(timestamp).into()); } } }; diff --git a/zenoh/src/key_expr.rs b/zenoh/src/key_expr.rs index aaa1d13724..d2bfb5bcfe 100644 --- a/zenoh/src/key_expr.rs +++ b/zenoh/src/key_expr.rs @@ -185,7 +185,7 @@ impl<'a> KeyExpr<'a> { /// # Safety /// Key Expressions must follow some rules to be accepted by a Zenoh network. /// Messages addressed with invalid key expressions will be dropped. - pub unsafe fn from_str_uncheckend(s: &'a str) -> Self { + pub unsafe fn from_str_unchecked(s: &'a str) -> Self { keyexpr::from_str_unchecked(s).into() } diff --git a/zenoh/src/liveliness.rs b/zenoh/src/liveliness.rs index 391987fcde..23e1846741 100644 --- a/zenoh/src/liveliness.rs +++ b/zenoh/src/liveliness.rs @@ -15,6 +15,8 @@ //! Liveliness primitives. //! //! see [`Liveliness`] +use zenoh_protocol::network::request; + use crate::{query::Reply, Id}; #[zenoh_macros::unstable] @@ -756,18 +758,19 @@ where { fn res_sync(self) -> ::To { let (callback, receiver) = self.handler.into_handler(); - self.session .query( &self.key_expr?.into(), &Some(KeyExpr::from(*KE_PREFIX_LIVELINESS)), QueryTarget::DEFAULT, QueryConsolidation::DEFAULT, + request::ext::QoSType::REQUEST.into(), Locality::default(), self.timeout, None, #[cfg(feature = "unstable")] None, + SourceInfo::empty(), callback, ) .map(|_| receiver) diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs index d460ee3f1c..5ef6b7cdfe 100644 --- a/zenoh/src/net/runtime/adminspace.rs +++ b/zenoh/src/net/runtime/adminspace.rs @@ -20,6 +20,7 @@ use crate::plugins::sealed::{self as plugins}; use crate::prelude::sync::SyncResolve; use crate::queryable::Query; use crate::queryable::QueryInner; +use crate::sample::builder::ValueBuilderTrait; use crate::value::Value; use log::{error, trace}; use serde_json::json; @@ -427,7 +428,7 @@ impl Primitives for AdminSpace { parameters, value: query .ext_body - .map(|b| Value::from(b.payload).with_encoding(b.encoding)), + .map(|b| Value::from(b.payload).encoding(b.encoding)), qid: msg.id, zid, primitives, @@ -580,7 +581,7 @@ fn router_data(context: &AdminContext, query: Query) { }; if let Err(e) = query .reply(reply_key, payload) - .with_encoding(Encoding::APPLICATION_JSON) + .encoding(Encoding::APPLICATION_JSON) .res_sync() { log::error!("Error sending AdminSpace reply: {:?}", e); diff --git a/zenoh/src/prelude.rs b/zenoh/src/prelude.rs index 26c93e1801..e2327c0dcc 100644 --- a/zenoh/src/prelude.rs +++ b/zenoh/src/prelude.rs @@ -60,6 +60,11 @@ pub(crate) mod common { #[zenoh_macros::unstable] pub use crate::publication::PublisherDeclarations; pub use zenoh_protocol::core::{CongestionControl, Reliability, WhatAmI}; + + pub use crate::sample::builder::{QoSBuilderTrait, TimestampBuilderTrait, ValueBuilderTrait}; + + #[zenoh_macros::unstable] + pub use crate::sample::builder::SampleBuilderTrait; } /// Prelude to import when using Zenoh's sync API. diff --git a/zenoh/src/publication.rs b/zenoh/src/publication.rs index 67e9f1d02c..c176ad32e0 100644 --- a/zenoh/src/publication.rs +++ b/zenoh/src/publication.rs @@ -17,7 +17,7 @@ use crate::net::primitives::Primitives; use crate::prelude::*; #[zenoh_macros::unstable] use crate::sample::Attachment; -use crate::sample::{DataInfo, QoS, Sample, SampleKind}; +use crate::sample::{DataInfo, QoS, Sample, SampleFields, SampleKind}; use crate::SessionRef; use crate::Undeclarable; #[cfg(feature = "unstable")] @@ -30,8 +30,6 @@ use zenoh_core::{zread, AsyncResolve, Resolvable, Resolve, SyncResolve}; use zenoh_protocol::network::push::ext; use zenoh_protocol::network::Mapping; use zenoh_protocol::network::Push; -#[zenoh_macros::unstable] -use zenoh_protocol::zenoh::ext::SourceInfoType; use zenoh_protocol::zenoh::Del; use zenoh_protocol::zenoh::PushBody; use zenoh_protocol::zenoh::Put; @@ -40,26 +38,16 @@ use zenoh_result::ZResult; /// The kind of congestion control. pub use zenoh_protocol::core::CongestionControl; -/// A builder for initializing a [`delete`](crate::Session::delete) operation. -/// -/// # Examples -/// ``` -/// # #[tokio::main] -/// # async fn main() { -/// use zenoh::prelude::r#async::*; -/// use zenoh::publication::CongestionControl; -/// -/// let session = zenoh::open(config::peer()).res().await.unwrap(); -/// session -/// .delete("key/expression") -/// .res() -/// .await -/// .unwrap(); -/// # } -/// ``` -pub type DeleteBuilder<'a, 'b> = PutBuilder<'a, 'b>; +#[derive(Debug, Clone)] +pub struct PublicationBuilderPut { + pub(crate) payload: Payload, + pub(crate) encoding: Encoding, +} +#[derive(Debug, Clone)] +pub struct PublicationBuilderDelete; -/// A builder for initializing a [`put`](crate::Session::put) operation. +/// A builder for initializing [`Session::put`](crate::Session::put), [`Session::delete`](crate::Session::delete), +/// [`Publisher::put`](crate::Publisher::put), and [`Publisher::delete`](crate::Publisher::delete) operations. /// /// # Examples /// ``` @@ -67,11 +55,12 @@ pub type DeleteBuilder<'a, 'b> = PutBuilder<'a, 'b>; /// # async fn main() { /// use zenoh::prelude::r#async::*; /// use zenoh::publication::CongestionControl; +/// use zenoh::sample::builder::{ValueBuilderTrait, QoSBuilderTrait}; /// /// let session = zenoh::open(config::peer()).res().await.unwrap(); /// session /// .put("key/expression", "payload") -/// .with_encoding(Encoding::TEXT_PLAIN) +/// .encoding(Encoding::TEXT_PLAIN) /// .congestion_control(CongestionControl::Block) /// .res() /// .await @@ -80,39 +69,52 @@ pub type DeleteBuilder<'a, 'b> = PutBuilder<'a, 'b>; /// ``` #[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"] #[derive(Debug, Clone)] -pub struct PutBuilder<'a, 'b> { - pub(crate) publisher: PublisherBuilder<'a, 'b>, - pub(crate) payload: Payload, - pub(crate) kind: SampleKind, - pub(crate) encoding: Encoding, +pub struct PublicationBuilder { + pub(crate) publisher: P, + pub(crate) kind: T, + pub(crate) timestamp: Option, + #[cfg(feature = "unstable")] + pub(crate) source_info: SourceInfo, #[cfg(feature = "unstable")] pub(crate) attachment: Option, } -impl PutBuilder<'_, '_> { - /// Change the `congestion_control` to apply when routing the data. +pub type SessionPutBuilder<'a, 'b> = + PublicationBuilder, PublicationBuilderPut>; + +pub type SessionDeleteBuilder<'a, 'b> = + PublicationBuilder, PublicationBuilderDelete>; + +pub type PublisherPutBuilder<'a> = PublicationBuilder<&'a Publisher<'a>, PublicationBuilderPut>; + +pub type PublisherDeleteBuilder<'a> = + PublicationBuilder<&'a Publisher<'a>, PublicationBuilderDelete>; + +impl QoSBuilderTrait for PublicationBuilder, T> { #[inline] - pub fn congestion_control(mut self, congestion_control: CongestionControl) -> Self { - self.publisher = self.publisher.congestion_control(congestion_control); - self + fn congestion_control(self, congestion_control: CongestionControl) -> Self { + Self { + publisher: self.publisher.congestion_control(congestion_control), + ..self + } } - - /// Change the priority of the written data. #[inline] - pub fn priority(mut self, priority: Priority) -> Self { - self.publisher = self.publisher.priority(priority); - self + fn priority(self, priority: Priority) -> Self { + Self { + publisher: self.publisher.priority(priority), + ..self + } } - - /// Change the `express` policy to apply when routing the data. - /// When express is set to `true`, then the message will not be batched. - /// This usually has a positive impact on latency but negative impact on throughput. #[inline] - pub fn express(mut self, is_express: bool) -> Self { - self.publisher = self.publisher.express(is_express); - self + fn express(self, is_express: bool) -> Self { + Self { + publisher: self.publisher.express(is_express), + ..self + } } +} +impl PublicationBuilder, T> { /// Restrict the matching subscribers that will receive the published data /// to the ones that have the given [`Locality`](crate::prelude::Locality). #[zenoh_macros::unstable] @@ -121,66 +123,115 @@ impl PutBuilder<'_, '_> { self.publisher = self.publisher.allowed_destination(destination); self } +} - /// Set the [`Encoding`] of the written data. - #[inline] - pub fn with_encoding(mut self, encoding: IntoEncoding) -> Self +impl

ValueBuilderTrait for PublicationBuilder { + fn encoding>(self, encoding: T) -> Self { + Self { + kind: PublicationBuilderPut { + encoding: encoding.into(), + ..self.kind + }, + ..self + } + } + + fn payload(self, payload: IntoPayload) -> Self where - IntoEncoding: Into, + IntoPayload: Into, { - self.encoding = encoding.into(); - self + Self { + kind: PublicationBuilderPut { + payload: payload.into(), + ..self.kind + }, + ..self + } } + fn value>(self, value: T) -> Self { + let Value { payload, encoding } = value.into(); + Self { + kind: PublicationBuilderPut { payload, encoding }, + ..self + } + } +} - #[zenoh_macros::unstable] - /// Attach user-provided data to the written data. - pub fn with_attachment(mut self, attachment: Attachment) -> Self { - self.attachment = Some(attachment); - self +impl SampleBuilderTrait for PublicationBuilder { + #[cfg(feature = "unstable")] + fn source_info(self, source_info: SourceInfo) -> Self { + Self { + source_info, + ..self + } + } + #[cfg(feature = "unstable")] + fn attachment>>(self, attachment: TA) -> Self { + Self { + attachment: attachment.into(), + ..self + } } } -impl Resolvable for PutBuilder<'_, '_> { +impl TimestampBuilderTrait for PublicationBuilder { + fn timestamp>>(self, timestamp: TS) -> Self { + Self { + timestamp: timestamp.into(), + ..self + } + } +} + +impl Resolvable for PublicationBuilder { type To = ZResult<()>; } -impl SyncResolve for PutBuilder<'_, '_> { +impl SyncResolve for PublicationBuilder, PublicationBuilderPut> { #[inline] fn res_sync(self) -> ::To { - let PublisherBuilder { - session, - key_expr, - congestion_control, - priority, - is_express, - destination, - } = self.publisher; - - let publisher = Publisher { - session, + let publisher = self.publisher.create_one_shot_publisher()?; + resolve_put( + &publisher, + self.kind.payload, + SampleKind::Put, + self.kind.encoding, + self.timestamp, #[cfg(feature = "unstable")] - eid: 0, // This is a one shot Publisher - key_expr: key_expr?, - congestion_control, - priority, - is_express, - destination, - }; + self.source_info, + #[cfg(feature = "unstable")] + self.attachment, + ) + } +} +impl SyncResolve for PublicationBuilder, PublicationBuilderDelete> { + #[inline] + fn res_sync(self) -> ::To { + let publisher = self.publisher.create_one_shot_publisher()?; resolve_put( &publisher, - self.payload, - self.kind, - self.encoding, + Payload::empty(), + SampleKind::Delete, + Encoding::ZENOH_BYTES, + self.timestamp, #[cfg(feature = "unstable")] - None, + self.source_info, #[cfg(feature = "unstable")] self.attachment, ) } } -impl AsyncResolve for PutBuilder<'_, '_> { +impl AsyncResolve for PublicationBuilder, PublicationBuilderPut> { + type Future = Ready; + + fn res_async(self) -> Self::Future { + std::future::ready(self.res_sync()) + } +} + +impl AsyncResolve for PublicationBuilder, PublicationBuilderDelete> { type Future = Ready; fn res_async(self) -> Self::Future { @@ -298,25 +349,22 @@ impl<'a> Publisher<'a> { /// Change the `congestion_control` to apply when routing the data. #[inline] - pub fn congestion_control(mut self, congestion_control: CongestionControl) -> Self { + pub fn set_congestion_control(&mut self, congestion_control: CongestionControl) { self.congestion_control = congestion_control; - self } /// Change the priority of the written data. #[inline] - pub fn priority(mut self, priority: Priority) -> Self { + pub fn set_priority(&mut self, priority: Priority) { self.priority = priority; - self } /// Restrict the matching subscribers that will receive the published data /// to the ones that have the given [`Locality`](crate::prelude::Locality). #[zenoh_macros::unstable] #[inline] - pub fn allowed_destination(mut self, destination: Locality) -> Self { + pub fn set_allowed_destination(&mut self, destination: Locality) { self.destination = destination; - self } /// Consumes the given `Publisher`, returning a thread-safe reference-counting @@ -355,19 +403,6 @@ impl<'a> Publisher<'a> { std::sync::Arc::new(self) } - fn _write(&self, kind: SampleKind, payload: Payload) -> Publication { - Publication { - publisher: self, - payload, - kind, - encoding: Encoding::ZENOH_BYTES, - #[cfg(feature = "unstable")] - source_info: None, - #[cfg(feature = "unstable")] - attachment: None, - } - } - /// Put data. /// /// # Examples @@ -382,11 +417,22 @@ impl<'a> Publisher<'a> { /// # } /// ``` #[inline] - pub fn put(&self, payload: IntoPayload) -> Publication + pub fn put(&self, payload: IntoPayload) -> PublisherPutBuilder<'_> where IntoPayload: Into, { - self._write(SampleKind::Put, payload.into()) + PublicationBuilder { + publisher: self, + kind: PublicationBuilderPut { + payload: payload.into(), + encoding: Encoding::ZENOH_BYTES, + }, + timestamp: None, + #[cfg(feature = "unstable")] + source_info: SourceInfo::empty(), + #[cfg(feature = "unstable")] + attachment: None, + } } /// Delete data. @@ -402,8 +448,16 @@ impl<'a> Publisher<'a> { /// publisher.delete().res().await.unwrap(); /// # } /// ``` - pub fn delete(&self) -> Publication { - self._write(SampleKind::Delete, Payload::empty()) + pub fn delete(&self) -> PublisherDeleteBuilder<'_> { + PublicationBuilder { + publisher: self, + kind: PublicationBuilderDelete, + timestamp: None, + #[cfg(feature = "unstable")] + source_info: SourceInfo::empty(), + #[cfg(feature = "unstable")] + attachment: None, + } } /// Return the [`MatchingStatus`] of the publisher. @@ -632,66 +686,30 @@ impl Drop for Publisher<'_> { } } -/// A [`Resolvable`] returned by [`Publisher::put()`](Publisher::put), -/// [`Publisher::delete()`](Publisher::delete) and [`Publisher::write()`](Publisher::write). -#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"] -pub struct Publication<'a> { - publisher: &'a Publisher<'a>, - payload: Payload, - kind: SampleKind, - encoding: Encoding, - #[cfg(feature = "unstable")] - pub(crate) source_info: Option, - #[cfg(feature = "unstable")] - pub(crate) attachment: Option, -} - -impl<'a> Publication<'a> { - pub fn with_encoding(mut self, encoding: Encoding) -> Self { - self.encoding = encoding; - self - } - - #[zenoh_macros::unstable] - pub fn with_attachment(mut self, attachment: Attachment) -> Self { - self.attachment = Some(attachment); - self - } - - /// Send data with the given [`SourceInfo`]. - /// - /// # Examples - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// use zenoh::prelude::r#async::*; - /// - /// let session = zenoh::open(config::peer()).res().await.unwrap(); - /// let publisher = session.declare_publisher("key/expression").res().await.unwrap(); - /// publisher.put("Value").with_source_info(SourceInfo { - /// source_id: Some(publisher.id()), - /// source_sn: Some(0), - /// }).res().await.unwrap(); - /// # } - /// ``` - #[zenoh_macros::unstable] - pub fn with_source_info(mut self, source_info: SourceInfo) -> Self { - self.source_info = Some(source_info); - self +impl SyncResolve for PublicationBuilder<&Publisher<'_>, PublicationBuilderPut> { + fn res_sync(self) -> ::To { + resolve_put( + self.publisher, + self.kind.payload, + SampleKind::Put, + self.kind.encoding, + self.timestamp, + #[cfg(feature = "unstable")] + self.source_info, + #[cfg(feature = "unstable")] + self.attachment, + ) } } -impl Resolvable for Publication<'_> { - type To = ZResult<()>; -} - -impl SyncResolve for Publication<'_> { +impl SyncResolve for PublicationBuilder<&Publisher<'_>, PublicationBuilderDelete> { fn res_sync(self) -> ::To { resolve_put( self.publisher, - self.payload, - self.kind, - self.encoding, + Payload::empty(), + SampleKind::Delete, + Encoding::ZENOH_BYTES, + self.timestamp, #[cfg(feature = "unstable")] self.source_info, #[cfg(feature = "unstable")] @@ -700,7 +718,15 @@ impl SyncResolve for Publication<'_> { } } -impl AsyncResolve for Publication<'_> { +impl AsyncResolve for PublicationBuilder<&Publisher<'_>, PublicationBuilderPut> { + type Future = Ready; + + fn res_async(self) -> Self::Future { + std::future::ready(self.res_sync()) + } +} + +impl AsyncResolve for PublicationBuilder<&Publisher<'_>, PublicationBuilderDelete> { type Future = Ready; fn res_async(self) -> Self::Future { @@ -718,17 +744,25 @@ impl<'a> Sink for Publisher<'a> { #[inline] fn start_send(self: Pin<&mut Self>, item: Sample) -> Result<(), Self::Error> { - Publication { - publisher: &self, - payload: item.payload, - kind: item.kind, - encoding: item.encoding, + let SampleFields { + payload, + kind, + encoding, #[cfg(feature = "unstable")] - source_info: None, + attachment, + .. + } = item.into(); + resolve_put( + &self, + payload, + kind, + encoding, + None, #[cfg(feature = "unstable")] - attachment: item.attachment, - } - .res_sync() + SourceInfo::empty(), + #[cfg(feature = "unstable")] + attachment, + ) } #[inline] @@ -750,6 +784,7 @@ impl<'a> Sink for Publisher<'a> { /// # async fn main() { /// use zenoh::prelude::r#async::*; /// use zenoh::publication::CongestionControl; +/// use zenoh::sample::builder::QoSBuilderTrait; /// /// let session = zenoh::open(config::peer()).res().await.unwrap(); /// let publisher = session @@ -787,30 +822,32 @@ impl<'a, 'b> Clone for PublisherBuilder<'a, 'b> { } } -impl<'a, 'b> PublisherBuilder<'a, 'b> { +impl QoSBuilderTrait for PublisherBuilder<'_, '_> { /// Change the `congestion_control` to apply when routing the data. #[inline] - pub fn congestion_control(mut self, congestion_control: CongestionControl) -> Self { - self.congestion_control = congestion_control; - self + fn congestion_control(self, congestion_control: CongestionControl) -> Self { + Self { + congestion_control, + ..self + } } /// Change the priority of the written data. #[inline] - pub fn priority(mut self, priority: Priority) -> Self { - self.priority = priority; - self + fn priority(self, priority: Priority) -> Self { + Self { priority, ..self } } /// Change the `express` policy to apply when routing the data. /// When express is set to `true`, then the message will not be batched. /// This usually has a positive impact on latency but negative impact on throughput. #[inline] - pub fn express(mut self, is_express: bool) -> Self { - self.is_express = is_express; - self + fn express(self, is_express: bool) -> Self { + Self { is_express, ..self } } +} +impl<'a, 'b> PublisherBuilder<'a, 'b> { /// Restrict the matching subscribers that will receive the published data /// to the ones that have the given [`Locality`](crate::prelude::Locality). #[zenoh_macros::unstable] @@ -819,6 +856,20 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> { self.destination = destination; self } + + // internal function for perfroming the publication + fn create_one_shot_publisher(self) -> ZResult> { + Ok(Publisher { + session: self.session, + #[cfg(feature = "unstable")] + eid: 0, // This is a one shot Publisher + key_expr: self.key_expr?, + congestion_control: self.congestion_control, + priority: self.priority, + is_express: self.is_express, + destination: self.destination, + }) + } } impl<'a, 'b> Resolvable for PublisherBuilder<'a, 'b> { @@ -891,7 +942,8 @@ fn resolve_put( payload: Payload, kind: SampleKind, encoding: Encoding, - #[cfg(feature = "unstable")] source_info: Option, + timestamp: Option, + #[cfg(feature = "unstable")] source_info: SourceInfo, #[cfg(feature = "unstable")] attachment: Option, ) -> ZResult<()> { log::trace!("write({:?}, [...])", &publisher.key_expr); @@ -900,8 +952,11 @@ fn resolve_put( .as_ref() .unwrap() .clone(); - let timestamp = publisher.session.runtime.new_timestamp(); - + let timestamp = if timestamp.is_none() { + publisher.session.runtime.new_timestamp() + } else { + timestamp + }; if publisher.destination != Locality::SessionLocal { primitives.send_push(Push { wire_expr: publisher.key_expr.to_wire(&publisher.session).to_owned(), @@ -926,10 +981,7 @@ fn resolve_put( timestamp, encoding: encoding.clone().into(), #[cfg(feature = "unstable")] - ext_sinfo: source_info.map(|s| SourceInfoType { - id: s.source_id.unwrap_or_default(), - sn: s.source_sn.unwrap_or_default() as u32, - }), + ext_sinfo: source_info.into(), #[cfg(not(feature = "unstable"))] ext_sinfo: None, #[cfg(feature = "shared-memory")] @@ -951,10 +1003,7 @@ fn resolve_put( PushBody::Del(Del { timestamp, #[cfg(feature = "unstable")] - ext_sinfo: source_info.map(|s| SourceInfoType { - id: s.source_id.unwrap_or_default(), - sn: s.source_sn.unwrap_or_default() as u32, - }), + ext_sinfo: source_info.into(), #[cfg(not(feature = "unstable"))] ext_sinfo: None, ext_attachment, diff --git a/zenoh/src/query.rs b/zenoh/src/query.rs index 542081eb2e..cb1116130d 100644 --- a/zenoh/src/query.rs +++ b/zenoh/src/query.rs @@ -17,6 +17,7 @@ use crate::handlers::{locked, Callback, DefaultHandler}; use crate::prelude::*; #[zenoh_macros::unstable] use crate::sample::Attachment; +use crate::sample::QoSBuilder; use crate::Session; use std::collections::HashMap; use std::future::Ready; @@ -121,12 +122,69 @@ pub struct GetBuilder<'a, 'b, Handler> { pub(crate) scope: ZResult>>, pub(crate) target: QueryTarget, pub(crate) consolidation: QueryConsolidation, + pub(crate) qos: QoSBuilder, pub(crate) destination: Locality, pub(crate) timeout: Duration, pub(crate) handler: Handler, pub(crate) value: Option, #[cfg(feature = "unstable")] pub(crate) attachment: Option, + #[cfg(feature = "unstable")] + pub(crate) source_info: SourceInfo, +} + +impl SampleBuilderTrait for GetBuilder<'_, '_, Handler> { + #[cfg(feature = "unstable")] + fn source_info(self, source_info: SourceInfo) -> Self { + Self { + source_info, + ..self + } + } + + #[cfg(feature = "unstable")] + fn attachment>>(self, attachment: T) -> Self { + Self { + attachment: attachment.into(), + ..self + } + } +} + +impl QoSBuilderTrait for GetBuilder<'_, '_, DefaultHandler> { + fn congestion_control(self, congestion_control: CongestionControl) -> Self { + let qos = self.qos.congestion_control(congestion_control); + Self { qos, ..self } + } + + fn priority(self, priority: Priority) -> Self { + let qos = self.qos.priority(priority); + Self { qos, ..self } + } + + fn express(self, is_express: bool) -> Self { + let qos = self.qos.express(is_express); + Self { qos, ..self } + } +} + +impl ValueBuilderTrait for GetBuilder<'_, '_, Handler> { + fn encoding>(self, encoding: T) -> Self { + let value = Some(self.value.unwrap_or_default().encoding(encoding)); + Self { value, ..self } + } + + fn payload>(self, payload: T) -> Self { + let value = Some(self.value.unwrap_or_default().payload(payload)); + Self { value, ..self } + } + fn value>(self, value: T) -> Self { + let value: Value = value.into(); + Self { + value: if value.is_empty() { None } else { Some(value) }, + ..self + } + } } impl<'a, 'b> GetBuilder<'a, 'b, DefaultHandler> { @@ -158,11 +216,14 @@ impl<'a, 'b> GetBuilder<'a, 'b, DefaultHandler> { scope, target, consolidation, + qos, destination, timeout, value, #[cfg(feature = "unstable")] attachment, + #[cfg(feature = "unstable")] + source_info, handler: _, } = self; GetBuilder { @@ -171,11 +232,14 @@ impl<'a, 'b> GetBuilder<'a, 'b, DefaultHandler> { scope, target, consolidation, + qos, destination, timeout, value, #[cfg(feature = "unstable")] attachment, + #[cfg(feature = "unstable")] + source_info, handler: callback, } } @@ -243,11 +307,14 @@ impl<'a, 'b> GetBuilder<'a, 'b, DefaultHandler> { scope, target, consolidation, + qos, destination, timeout, value, #[cfg(feature = "unstable")] attachment, + #[cfg(feature = "unstable")] + source_info, handler: _, } = self; GetBuilder { @@ -256,11 +323,14 @@ impl<'a, 'b> GetBuilder<'a, 'b, DefaultHandler> { scope, target, consolidation, + qos, destination, timeout, value, #[cfg(feature = "unstable")] attachment, + #[cfg(feature = "unstable")] + source_info, handler, } } @@ -268,48 +338,34 @@ impl<'a, 'b> GetBuilder<'a, 'b, DefaultHandler> { impl<'a, 'b, Handler> GetBuilder<'a, 'b, Handler> { /// Change the target of the query. #[inline] - pub fn target(mut self, target: QueryTarget) -> Self { - self.target = target; - self + pub fn target(self, target: QueryTarget) -> Self { + Self { target, ..self } } /// Change the consolidation mode of the query. #[inline] - pub fn consolidation>(mut self, consolidation: QC) -> Self { - self.consolidation = consolidation.into(); - self + pub fn consolidation>(self, consolidation: QC) -> Self { + Self { + consolidation: consolidation.into(), + ..self + } } /// Restrict the matching queryables that will receive the query /// to the ones that have the given [`Locality`](crate::prelude::Locality). #[zenoh_macros::unstable] #[inline] - pub fn allowed_destination(mut self, destination: Locality) -> Self { - self.destination = destination; - self + pub fn allowed_destination(self, destination: Locality) -> Self { + Self { + destination, + ..self + } } /// Set query timeout. #[inline] - pub fn timeout(mut self, timeout: Duration) -> Self { - self.timeout = timeout; - self - } - - /// Set query value. - #[inline] - pub fn with_value(mut self, value: IntoValue) -> Self - where - IntoValue: Into, - { - self.value = Some(value.into()); - self - } - - #[zenoh_macros::unstable] - pub fn with_attachment(mut self, attachment: Attachment) -> Self { - self.attachment = Some(attachment); - self + pub fn timeout(self, timeout: Duration) -> Self { + Self { timeout, ..self } } /// By default, `get` guarantees that it will only receive replies whose key expressions intersect @@ -319,29 +375,11 @@ impl<'a, 'b, Handler> GetBuilder<'a, 'b, Handler> { /// expressions that don't intersect with the query's. #[zenoh_macros::unstable] pub fn accept_replies(self, accept: ReplyKeyExpr) -> Self { - let Self { - session, - selector, - scope, - target, - consolidation, - destination, - timeout, - value, - attachment, - handler, - } = self; Self { - session, - selector: selector.and_then(|s| s.accept_any_keyexpr(accept == ReplyKeyExpr::Any)), - scope, - target, - consolidation, - destination, - timeout, - value, - attachment, - handler, + selector: self + .selector + .and_then(|s| s.accept_any_keyexpr(accept == ReplyKeyExpr::Any)), + ..self } } } @@ -386,11 +424,13 @@ where &self.scope?, self.target, self.consolidation, + self.qos.into(), self.destination, self.timeout, self.value, #[cfg(feature = "unstable")] self.attachment, + self.source_info, callback, ) .map(|_| receiver) diff --git a/zenoh/src/queryable.rs b/zenoh/src/queryable.rs index 58589bfe8f..6fbb4e9090 100644 --- a/zenoh/src/queryable.rs +++ b/zenoh/src/queryable.rs @@ -18,13 +18,12 @@ use crate::encoding::Encoding; use crate::handlers::{locked, DefaultHandler}; use crate::net::primitives::Primitives; use crate::prelude::*; -use crate::sample::QoS; -use crate::sample::SourceInfo; +use crate::sample::{QoSBuilder, SourceInfo}; use crate::Id; use crate::SessionRef; use crate::Undeclarable; #[cfg(feature = "unstable")] -use crate::{query::ReplyKeyExpr, sample::Attachment}; +use crate::{query::ReplyKeyExpr, sample::builder::SampleBuilder, sample::Attachment}; use std::fmt; use std::future::Ready; use std::ops::Deref; @@ -98,10 +97,23 @@ impl Query { self.inner.value.as_ref() } + /// This Query's payload. + #[inline(always)] + pub fn payload(&self) -> Option<&Payload> { + self.inner.value.as_ref().map(|v| &v.payload) + } + + /// This Query's encoding. + #[inline(always)] + pub fn encoding(&self) -> Option<&Encoding> { + self.inner.value.as_ref().map(|v| &v.encoding) + } + #[zenoh_macros::unstable] pub fn attachment(&self) -> Option<&Attachment> { self.inner.attachment.as_ref() } + /// Sends a reply in the form of [`Sample`] to this Query. /// /// By default, queries only accept replies whose key expression intersects with the query's. @@ -111,31 +123,10 @@ impl Query { #[inline(always)] #[cfg(feature = "unstable")] #[doc(hidden)] - pub fn reply_sample(&self, sample: Sample) -> ReplyBuilder<'_, 'static> { - let Sample { - key_expr, - payload, - kind, - encoding, - timestamp, - qos, - #[cfg(feature = "unstable")] - source_info, - #[cfg(feature = "unstable")] - attachment, - } = sample; - ReplyBuilder { + pub fn reply_sample(&self, sample: Sample) -> ReplySample<'_> { + ReplySample { query: self, - key_expr: Ok(key_expr), - payload, - kind, - encoding, - timestamp, - qos, - #[cfg(feature = "unstable")] - source_info, - #[cfg(feature = "unstable")] - attachment, + sample, } } @@ -149,7 +140,7 @@ impl Query { &self, key_expr: TryIntoKeyExpr, payload: IntoPayload, - ) -> ReplyBuilder<'_, 'b> + ) -> ReplyPutBuilder<'_, 'b> where TryIntoKeyExpr: TryInto>, >>::Error: Into, @@ -158,17 +149,17 @@ impl Query { ReplyBuilder { query: self, key_expr: key_expr.try_into().map_err(Into::into), - payload: payload.into(), - kind: SampleKind::Put, - timestamp: None, - encoding: Encoding::default(), qos: response::ext::QoSType::RESPONSE.into(), - #[cfg(feature = "unstable")] + kind: ReplyBuilderPut { + payload: payload.into(), + encoding: Encoding::default(), + }, + timestamp: None, source_info: SourceInfo::empty(), - #[cfg(feature = "unstable")] attachment: None, } } + /// Sends a error reply to this Query. /// #[inline(always)] @@ -188,7 +179,10 @@ impl Query { /// Unless the query has enabled disjoint replies (you can check this through [`Query::accepts_replies`]), /// replying on a disjoint key expression will result in an error when resolving the reply. #[inline(always)] - pub fn reply_del<'b, TryIntoKeyExpr>(&self, key_expr: TryIntoKeyExpr) -> ReplyBuilder<'_, 'b> + pub fn reply_del<'b, TryIntoKeyExpr>( + &self, + key_expr: TryIntoKeyExpr, + ) -> ReplyDeleteBuilder<'_, 'b> where TryIntoKeyExpr: TryInto>, >>::Error: Into, @@ -196,14 +190,10 @@ impl Query { ReplyBuilder { query: self, key_expr: key_expr.try_into().map_err(Into::into), - payload: Payload::empty(), - kind: SampleKind::Delete, - timestamp: None, - encoding: Encoding::default(), qos: response::ext::QoSType::RESPONSE.into(), - #[cfg(feature = "unstable")] + kind: ReplyBuilderDelete, + timestamp: None, source_info: SourceInfo::empty(), - #[cfg(feature = "unstable")] attachment: None, } } @@ -247,123 +237,230 @@ impl fmt::Display for Query { } } -/// A builder returned by [`Query::reply()`](Query::reply) or [`Query::reply()`](Query::reply). +pub struct ReplySample<'a> { + query: &'a Query, + sample: Sample, +} + +impl Resolvable for ReplySample<'_> { + type To = ZResult<()>; +} + +impl SyncResolve for ReplySample<'_> { + fn res_sync(self) -> ::To { + self.query._reply_sample(self.sample) + } +} + +impl AsyncResolve for ReplySample<'_> { + type Future = Ready; + + fn res_async(self) -> Self::Future { + std::future::ready(self.res_sync()) + } +} + +#[derive(Debug)] +pub struct ReplyBuilderPut { + payload: super::Payload, + encoding: super::Encoding, +} +#[derive(Debug)] +pub struct ReplyBuilderDelete; + +/// A builder returned by [`Query::reply()`](Query::reply) and [`Query::reply_del()`](Query::reply_del) #[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"] #[derive(Debug)] -pub struct ReplyBuilder<'a, 'b> { +pub struct ReplyBuilder<'a, 'b, T> { query: &'a Query, key_expr: ZResult>, - payload: Payload, - kind: SampleKind, - encoding: Encoding, + kind: T, timestamp: Option, - qos: QoS, + qos: QoSBuilder, + #[cfg(feature = "unstable")] source_info: SourceInfo, + #[cfg(feature = "unstable")] attachment: Option, } -/// A builder returned by [`Query::reply_err()`](Query::reply_err). -#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"] -#[derive(Debug)] -pub struct ReplyErrBuilder<'a> { - query: &'a Query, - value: Value, +pub type ReplyPutBuilder<'a, 'b> = ReplyBuilder<'a, 'b, ReplyBuilderPut>; + +pub type ReplyDeleteBuilder<'a, 'b> = ReplyBuilder<'a, 'b, ReplyBuilderDelete>; + +impl TimestampBuilderTrait for ReplyBuilder<'_, '_, T> { + fn timestamp>>(self, timestamp: U) -> Self { + Self { + timestamp: timestamp.into(), + ..self + } + } } -impl<'a, 'b> ReplyBuilder<'a, 'b> { - #[zenoh_macros::unstable] - pub fn with_attachment(mut self, attachment: Attachment) -> Self { - self.attachment = Some(attachment); - self +impl SampleBuilderTrait for ReplyBuilder<'_, '_, T> { + #[cfg(feature = "unstable")] + fn attachment>>(self, attachment: U) -> Self { + Self { + attachment: attachment.into(), + ..self + } } - #[zenoh_macros::unstable] - pub fn with_source_info(mut self, source_info: SourceInfo) -> Self { - self.source_info = source_info; - self + + #[cfg(feature = "unstable")] + fn source_info(self, source_info: SourceInfo) -> Self { + Self { + source_info, + ..self + } } - pub fn with_timestamp(mut self, timestamp: Timestamp) -> Self { - self.timestamp = Some(timestamp); - self +} + +impl QoSBuilderTrait for ReplyBuilder<'_, '_, T> { + fn congestion_control(self, congestion_control: CongestionControl) -> Self { + let qos = self.qos.congestion_control(congestion_control); + Self { qos, ..self } } - pub fn with_encoding(mut self, encoding: Encoding) -> Self { - self.encoding = encoding; - self + fn priority(self, priority: Priority) -> Self { + let qos = self.qos.priority(priority); + Self { qos, ..self } + } + + fn express(self, is_express: bool) -> Self { + let qos = self.qos.express(is_express); + Self { qos, ..self } + } +} + +impl ValueBuilderTrait for ReplyBuilder<'_, '_, ReplyBuilderPut> { + fn encoding>(self, encoding: T) -> Self { + Self { + kind: ReplyBuilderPut { + encoding: encoding.into(), + ..self.kind + }, + ..self + } + } + + fn payload>(self, payload: T) -> Self { + Self { + kind: ReplyBuilderPut { + payload: payload.into(), + ..self.kind + }, + ..self + } + } + fn value>(self, value: T) -> Self { + let Value { payload, encoding } = value.into(); + Self { + kind: ReplyBuilderPut { payload, encoding }, + ..self + } } } -impl<'a, 'b> Resolvable for ReplyBuilder<'a, 'b> { +impl Resolvable for ReplyBuilder<'_, '_, T> { type To = ZResult<()>; } -impl<'a, 'b> SyncResolve for ReplyBuilder<'a, 'b> { +impl SyncResolve for ReplyBuilder<'_, '_, ReplyBuilderPut> { fn res_sync(self) -> ::To { - let key_expr = self.key_expr?; - if !self.query._accepts_any_replies().unwrap_or(false) - && !self.query.key_expr().intersects(&key_expr) - { - bail!("Attempted to reply on `{}`, which does not intersect with query `{}`, despite query only allowing replies on matching key expressions", &key_expr, self.query.key_expr()) - } - #[allow(unused_mut)] // will be unused if feature = "unstable" is not enabled - let mut ext_sinfo = None; + let key_expr = self.key_expr?.into_owned(); + let sample = SampleBuilder::put(key_expr, self.kind.payload) + .encoding(self.kind.encoding) + .timestamp(self.timestamp) + .qos(self.qos.into()); + #[cfg(feature = "unstable")] + let sample = sample.source_info(self.source_info); + #[cfg(feature = "unstable")] + let sample = sample.attachment(self.attachment); + self.query._reply_sample(sample.into()) + } +} + +impl SyncResolve for ReplyBuilder<'_, '_, ReplyBuilderDelete> { + fn res_sync(self) -> ::To { + let key_expr = self.key_expr?.into_owned(); + let sample = SampleBuilder::delete(key_expr) + .timestamp(self.timestamp) + .qos(self.qos.into()); #[cfg(feature = "unstable")] + let sample = sample.source_info(self.source_info); + #[cfg(feature = "unstable")] + let sample = sample.attachment(self.attachment); + self.query._reply_sample(sample.into()) + } +} + +impl Query { + fn _reply_sample(&self, sample: Sample) -> ZResult<()> { + if !self._accepts_any_replies().unwrap_or(false) + && !self.key_expr().intersects(&sample.key_expr) { - if self.source_info.source_id.is_some() || self.source_info.source_sn.is_some() { - ext_sinfo = Some(zenoh::put::ext::SourceInfoType { - id: self.source_info.source_id.unwrap_or_default(), - sn: self.source_info.source_sn.unwrap_or_default() as u32, - }) - } + bail!("Attempted to reply on `{}`, which does not intersect with query `{}`, despite query only allowing replies on matching key expressions", sample.key_expr, self.key_expr()) } - self.query.inner.primitives.send_response(Response { - rid: self.query.inner.qid, + #[cfg(not(feature = "unstable"))] + let ext_sinfo = None; + #[cfg(feature = "unstable")] + let ext_sinfo = sample.source_info.into(); + self.inner.primitives.send_response(Response { + rid: self.inner.qid, wire_expr: WireExpr { scope: 0, - suffix: std::borrow::Cow::Owned(key_expr.into()), + suffix: std::borrow::Cow::Owned(sample.key_expr.into()), mapping: Mapping::Sender, }, payload: ResponseBody::Reply(zenoh::Reply { consolidation: zenoh::Consolidation::DEFAULT, ext_unknown: vec![], - payload: match self.kind { + payload: match sample.kind { SampleKind::Put => ReplyBody::Put(Put { - timestamp: self.timestamp, - encoding: self.encoding.into(), + timestamp: sample.timestamp, + encoding: sample.encoding.into(), ext_sinfo, #[cfg(feature = "shared-memory")] ext_shm: None, #[cfg(feature = "unstable")] - ext_attachment: self.attachment.map(|a| a.into()), + ext_attachment: sample.attachment.map(|a| a.into()), #[cfg(not(feature = "unstable"))] ext_attachment: None, ext_unknown: vec![], - payload: self.payload.into(), + payload: sample.payload.into(), }), SampleKind::Delete => ReplyBody::Del(Del { - timestamp: self.timestamp, + timestamp: sample.timestamp, ext_sinfo, #[cfg(feature = "unstable")] - ext_attachment: self.attachment.map(|a| a.into()), + ext_attachment: sample.attachment.map(|a| a.into()), #[cfg(not(feature = "unstable"))] ext_attachment: None, ext_unknown: vec![], }), }, }), - ext_qos: self.qos.into(), + ext_qos: sample.qos.into(), ext_tstamp: None, ext_respid: Some(response::ext::ResponderIdType { - zid: self.query.inner.zid, - eid: self.query.eid, + zid: self.inner.zid, + eid: self.eid, }), }); Ok(()) } } -impl<'a, 'b> AsyncResolve for ReplyBuilder<'a, 'b> { +impl AsyncResolve for ReplyBuilder<'_, '_, ReplyBuilderPut> { + type Future = Ready; + + fn res_async(self) -> Self::Future { + std::future::ready(self.res_sync()) + } +} + +impl AsyncResolve for ReplyBuilder<'_, '_, ReplyBuilderDelete> { type Future = Ready; fn res_async(self) -> Self::Future { @@ -371,6 +468,37 @@ impl<'a, 'b> AsyncResolve for ReplyBuilder<'a, 'b> { } } +/// A builder returned by [`Query::reply_err()`](Query::reply_err). +#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"] +#[derive(Debug)] +pub struct ReplyErrBuilder<'a> { + query: &'a Query, + value: Value, +} + +impl ValueBuilderTrait for ReplyErrBuilder<'_> { + fn encoding>(self, encoding: T) -> Self { + Self { + value: self.value.encoding(encoding), + ..self + } + } + + fn payload>(self, payload: T) -> Self { + Self { + value: self.value.payload(payload), + ..self + } + } + + fn value>(self, value: T) -> Self { + Self { + value: value.into(), + ..self + } + } +} + impl<'a> Resolvable for ReplyErrBuilder<'a> { type To = ZResult<()>; } @@ -400,6 +528,7 @@ impl SyncResolve for ReplyErrBuilder<'_> { Ok(()) } } + impl<'a> AsyncResolve for ReplyErrBuilder<'a> { type Future = Ready; diff --git a/zenoh/src/sample/builder.rs b/zenoh/src/sample/builder.rs new file mode 100644 index 0000000000..fca55edd09 --- /dev/null +++ b/zenoh/src/sample/builder.rs @@ -0,0 +1,288 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::marker::PhantomData; + +#[cfg(feature = "unstable")] +use crate::sample::{Attachment, SourceInfo}; +use crate::sample::{QoS, QoSBuilder}; +use crate::Encoding; +use crate::KeyExpr; +use crate::Payload; +use crate::Priority; +use crate::Sample; +use crate::SampleKind; +use crate::Value; +use uhlc::Timestamp; +use zenoh_core::zresult; +use zenoh_protocol::core::CongestionControl; + +pub trait QoSBuilderTrait { + /// Change the `congestion_control` to apply when routing the data. + fn congestion_control(self, congestion_control: CongestionControl) -> Self; + /// Change the priority of the written data. + fn priority(self, priority: Priority) -> Self; + /// Change the `express` policy to apply when routing the data. + /// When express is set to `true`, then the message will not be batched. + /// This usually has a positive impact on latency but negative impact on throughput. + fn express(self, is_express: bool) -> Self; +} + +pub trait TimestampBuilderTrait { + /// Sets of clears timestamp + fn timestamp>>(self, timestamp: T) -> Self; +} + +#[zenoh_macros::unstable] +pub trait SampleBuilderTrait { + /// Attach source information + #[zenoh_macros::unstable] + fn source_info(self, source_info: SourceInfo) -> Self; + /// Attach user-provided data in key-value format + #[zenoh_macros::unstable] + fn attachment>>(self, attachment: T) -> Self; +} + +pub trait ValueBuilderTrait { + /// Set the [`Encoding`] + fn encoding>(self, encoding: T) -> Self; + /// Sets the payload + fn payload>(self, payload: T) -> Self; + /// Sets both payload and encoding at once. + /// This is convenient for passing user type which supports `Into` when both payload and encoding depends on user type + fn value>(self, value: T) -> Self; +} + +#[derive(Clone, Debug)] +pub struct SampleBuilderPut; +#[derive(Clone, Debug)] +pub struct SampleBuilderDelete; +#[derive(Clone, Debug)] +pub struct SampleBuilderAny; + +#[derive(Clone, Debug)] +pub struct SampleBuilder { + sample: Sample, + _t: PhantomData, +} + +impl SampleBuilder { + pub fn put( + key_expr: IntoKeyExpr, + payload: IntoPayload, + ) -> SampleBuilder + where + IntoKeyExpr: Into>, + IntoPayload: Into, + { + Self { + sample: Sample { + key_expr: key_expr.into(), + payload: payload.into(), + kind: SampleKind::Put, + encoding: Encoding::default(), + timestamp: None, + qos: QoS::default(), + #[cfg(feature = "unstable")] + source_info: SourceInfo::empty(), + #[cfg(feature = "unstable")] + attachment: None, + }, + _t: PhantomData::, + } + } +} + +impl SampleBuilder { + pub fn delete(key_expr: IntoKeyExpr) -> SampleBuilder + where + IntoKeyExpr: Into>, + { + Self { + sample: Sample { + key_expr: key_expr.into(), + payload: Payload::empty(), + kind: SampleKind::Delete, + encoding: Encoding::default(), + timestamp: None, + qos: QoS::default(), + #[cfg(feature = "unstable")] + source_info: SourceInfo::empty(), + #[cfg(feature = "unstable")] + attachment: None, + }, + _t: PhantomData::, + } + } +} + +impl SampleBuilder { + /// Allows to change keyexpr of [`Sample`] + pub fn keyexpr(self, key_expr: IntoKeyExpr) -> Self + where + IntoKeyExpr: Into>, + { + Self { + sample: Sample { + key_expr: key_expr.into(), + ..self.sample + }, + _t: PhantomData::, + } + } + + // Allows to change qos as a whole of [`Sample`] + pub fn qos(self, qos: QoS) -> Self { + Self { + sample: Sample { qos, ..self.sample }, + _t: PhantomData::, + } + } +} + +impl TimestampBuilderTrait for SampleBuilder { + fn timestamp>>(self, timestamp: U) -> Self { + Self { + sample: Sample { + timestamp: timestamp.into(), + ..self.sample + }, + _t: PhantomData::, + } + } +} + +impl SampleBuilderTrait for SampleBuilder { + #[zenoh_macros::unstable] + fn source_info(self, source_info: SourceInfo) -> Self { + Self { + sample: Sample { + source_info, + ..self.sample + }, + _t: PhantomData::, + } + } + + #[zenoh_macros::unstable] + fn attachment>>(self, attachment: U) -> Self { + Self { + sample: Sample { + attachment: attachment.into(), + ..self.sample + }, + _t: PhantomData::, + } + } +} + +impl QoSBuilderTrait for SampleBuilder { + fn congestion_control(self, congestion_control: CongestionControl) -> Self { + let qos: QoSBuilder = self.sample.qos.into(); + let qos = qos.congestion_control(congestion_control).into(); + Self { + sample: Sample { qos, ..self.sample }, + _t: PhantomData::, + } + } + fn priority(self, priority: Priority) -> Self { + let qos: QoSBuilder = self.sample.qos.into(); + let qos = qos.priority(priority).into(); + Self { + sample: Sample { qos, ..self.sample }, + _t: PhantomData::, + } + } + fn express(self, is_express: bool) -> Self { + let qos: QoSBuilder = self.sample.qos.into(); + let qos = qos.express(is_express).into(); + Self { + sample: Sample { qos, ..self.sample }, + _t: PhantomData::, + } + } +} + +impl ValueBuilderTrait for SampleBuilder { + fn encoding>(self, encoding: T) -> Self { + Self { + sample: Sample { + encoding: encoding.into(), + ..self.sample + }, + _t: PhantomData::, + } + } + fn payload>(self, payload: T) -> Self { + Self { + sample: Sample { + payload: payload.into(), + ..self.sample + }, + _t: PhantomData::, + } + } + fn value>(self, value: T) -> Self { + let Value { payload, encoding } = value.into(); + Self { + sample: Sample { + payload, + encoding, + ..self.sample + }, + _t: PhantomData::, + } + } +} + +impl From for SampleBuilder { + fn from(sample: Sample) -> Self { + SampleBuilder { + sample, + _t: PhantomData::, + } + } +} + +impl TryFrom for SampleBuilder { + type Error = zresult::Error; + fn try_from(sample: Sample) -> Result { + if sample.kind != SampleKind::Put { + bail!("Sample is not a put sample") + } + Ok(SampleBuilder { + sample, + _t: PhantomData::, + }) + } +} + +impl TryFrom for SampleBuilder { + type Error = zresult::Error; + fn try_from(sample: Sample) -> Result { + if sample.kind != SampleKind::Delete { + bail!("Sample is not a delete sample") + } + Ok(SampleBuilder { + sample, + _t: PhantomData::, + }) + } +} + +impl From> for Sample { + fn from(sample_builder: SampleBuilder) -> Self { + sample_builder.sample + } +} diff --git a/zenoh/src/sample.rs b/zenoh/src/sample/mod.rs similarity index 74% rename from zenoh/src/sample.rs rename to zenoh/src/sample/mod.rs index 2af8fb7106..6e457578a3 100644 --- a/zenoh/src/sample.rs +++ b/zenoh/src/sample/mod.rs @@ -16,16 +16,17 @@ use crate::encoding::Encoding; use crate::payload::Payload; use crate::prelude::{KeyExpr, Value}; -use crate::time::{new_reception_timestamp, Timestamp}; +use crate::sample::builder::{QoSBuilderTrait, ValueBuilderTrait}; +use crate::time::Timestamp; use crate::Priority; #[zenoh_macros::unstable] use serde::Serialize; -use std::{ - convert::{TryFrom, TryInto}, - fmt, -}; +use std::{convert::TryFrom, fmt}; use zenoh_protocol::core::EntityGlobalId; -use zenoh_protocol::{core::CongestionControl, network::push::ext::QoSType}; +use zenoh_protocol::network::declare::ext::QoSType; +use zenoh_protocol::{core::CongestionControl, zenoh}; + +pub mod builder; pub type SourceSn = u64; @@ -57,6 +58,88 @@ pub(crate) struct DataInfo { pub qos: QoS, } +pub(crate) trait DataInfoIntoSample { + fn into_sample( + self, + key_expr: IntoKeyExpr, + payload: IntoPayload, + #[cfg(feature = "unstable")] attachment: Option, + ) -> Sample + where + IntoKeyExpr: Into>, + IntoPayload: Into; +} + +impl DataInfoIntoSample for DataInfo { + // This function is for internal use only. + // Technically it may create invalid sample (e.g. a delete sample with a payload and encoding) + // The test for it is intentionally not added to avoid inserting extra "if" into hot path. + // The correctness of the data should be ensured by the caller. + #[inline] + fn into_sample( + self, + key_expr: IntoKeyExpr, + payload: IntoPayload, + #[cfg(feature = "unstable")] attachment: Option, + ) -> Sample + where + IntoKeyExpr: Into>, + IntoPayload: Into, + { + Sample { + key_expr: key_expr.into(), + payload: payload.into(), + kind: self.kind, + encoding: self.encoding.unwrap_or_default(), + timestamp: self.timestamp, + qos: self.qos, + #[cfg(feature = "unstable")] + source_info: SourceInfo { + source_id: self.source_id, + source_sn: self.source_sn, + }, + #[cfg(feature = "unstable")] + attachment, + } + } +} + +impl DataInfoIntoSample for Option { + #[inline] + fn into_sample( + self, + key_expr: IntoKeyExpr, + payload: IntoPayload, + #[cfg(feature = "unstable")] attachment: Option, + ) -> Sample + where + IntoKeyExpr: Into>, + IntoPayload: Into, + { + if let Some(data_info) = self { + data_info.into_sample( + key_expr, + payload, + #[cfg(feature = "unstable")] + attachment, + ) + } else { + Sample { + key_expr: key_expr.into(), + payload: payload.into(), + kind: SampleKind::Put, + encoding: Encoding::default(), + timestamp: None, + qos: QoS::default(), + #[cfg(feature = "unstable")] + source_info: SourceInfo::empty(), + #[cfg(feature = "unstable")] + attachment, + } + } + } +} + /// Informations on the source of a zenoh [`Sample`]. #[zenoh_macros::unstable] #[derive(Debug, Clone)] @@ -89,6 +172,23 @@ impl SourceInfo { source_sn: None, } } + pub(crate) fn is_empty(&self) -> bool { + self.source_id.is_none() && self.source_sn.is_none() + } +} + +#[zenoh_macros::unstable] +impl From for Option { + fn from(source_info: SourceInfo) -> Option { + if source_info.is_empty() { + None + } else { + Some(zenoh::put::ext::SourceInfoType { + id: source_info.source_id.unwrap_or_default(), + sn: source_info.source_sn.unwrap_or_default() as u32, + }) + } + } } #[zenoh_macros::unstable] @@ -170,6 +270,17 @@ mod attachment { } } } + #[zenoh_macros::unstable] + impl From for Option { + fn from(value: AttachmentBuilder) -> Self { + if value.inner.is_empty() { + None + } else { + Some(value.into()) + } + } + } + #[zenoh_macros::unstable] #[derive(Clone)] pub struct Attachment { @@ -359,6 +470,41 @@ impl TryFrom for SampleKind { #[zenoh_macros::unstable] pub use attachment::{Attachment, AttachmentBuilder, AttachmentIterator}; +/// Structure with public fields for sample. It's convenient if it's necessary to decompose a sample into its fields. +pub struct SampleFields { + pub key_expr: KeyExpr<'static>, + pub payload: Payload, + pub kind: SampleKind, + pub encoding: Encoding, + pub timestamp: Option, + pub express: bool, + pub priority: Priority, + pub congestion_control: CongestionControl, + #[cfg(feature = "unstable")] + pub source_info: SourceInfo, + #[cfg(feature = "unstable")] + pub attachment: Option, +} + +impl From for SampleFields { + fn from(sample: Sample) -> Self { + SampleFields { + key_expr: sample.key_expr, + payload: sample.payload, + kind: sample.kind, + encoding: sample.encoding, + timestamp: sample.timestamp, + express: sample.qos.express(), + priority: sample.qos.priority(), + congestion_control: sample.qos.congestion_control(), + #[cfg(feature = "unstable")] + source_info: sample.source_info, + #[cfg(feature = "unstable")] + attachment: sample.attachment, + } + } +} + /// A zenoh sample. #[non_exhaustive] #[derive(Clone, Debug)] @@ -378,79 +524,6 @@ pub struct Sample { } impl Sample { - /// Creates a new Sample. - #[inline] - pub fn new(key_expr: IntoKeyExpr, payload: IntoPayload) -> Self - where - IntoKeyExpr: Into>, - IntoPayload: Into, - { - Sample { - key_expr: key_expr.into(), - payload: payload.into(), - encoding: Encoding::default(), - kind: SampleKind::default(), - timestamp: None, - qos: QoS::default(), - #[cfg(feature = "unstable")] - source_info: SourceInfo::empty(), - #[cfg(feature = "unstable")] - attachment: None, - } - } - /// Creates a new Sample. - #[inline] - pub fn try_from( - key_expr: TryIntoKeyExpr, - payload: IntoPayload, - ) -> Result - where - TryIntoKeyExpr: TryInto>, - >>::Error: Into, - IntoPayload: Into, - { - Ok(Sample { - key_expr: key_expr.try_into().map_err(Into::into)?, - payload: payload.into(), - encoding: Encoding::default(), - kind: SampleKind::default(), - timestamp: None, - qos: QoS::default(), - #[cfg(feature = "unstable")] - source_info: SourceInfo::empty(), - #[cfg(feature = "unstable")] - attachment: None, - }) - } - - /// Creates a new Sample with optional data info. - #[inline] - pub(crate) fn with_info(mut self, mut data_info: Option) -> Self { - if let Some(mut data_info) = data_info.take() { - self.kind = data_info.kind; - if let Some(encoding) = data_info.encoding.take() { - self.encoding = encoding; - } - self.qos = data_info.qos; - self.timestamp = data_info.timestamp; - #[cfg(feature = "unstable")] - { - self.source_info = SourceInfo { - source_id: data_info.source_id, - source_sn: data_info.source_sn, - }; - } - } - self - } - - /// Sets the encoding of this Sample. - #[inline] - pub fn with_encoding(mut self, encoding: Encoding) -> Self { - self.encoding = encoding; - self - } - /// Gets the key expression on which this Sample was published. #[inline] pub fn key_expr(&self) -> &KeyExpr<'static> { @@ -469,15 +542,6 @@ impl Sample { self.kind } - /// Sets the kind of this Sample. - #[inline] - #[doc(hidden)] - #[zenoh_macros::unstable] - pub fn with_kind(mut self, kind: SampleKind) -> Self { - self.kind = kind; - self - } - /// Gets the encoding of this sample #[inline] pub fn encoding(&self) -> &Encoding { @@ -490,15 +554,6 @@ impl Sample { self.timestamp.as_ref() } - /// Sets the timestamp of this Sample. - #[inline] - #[doc(hidden)] - #[zenoh_macros::unstable] - pub fn with_timestamp(mut self, timestamp: Timestamp) -> Self { - self.timestamp = Some(timestamp); - self - } - /// Gets the quality of service settings this Sample was sent with. #[inline] pub fn qos(&self) -> &QoS { @@ -512,57 +567,17 @@ impl Sample { &self.source_info } - /// Sets the source info of this Sample. - #[zenoh_macros::unstable] - #[inline] - pub fn with_source_info(mut self, source_info: SourceInfo) -> Self { - self.source_info = source_info; - self - } - - /// Ensure that an associated Timestamp is present in this Sample. - /// If not, a new one is created with the current system time and 0x00 as id. - /// Get the timestamp of this sample (either existing one or newly created) - #[inline] - #[doc(hidden)] - #[zenoh_macros::unstable] - pub fn ensure_timestamp(&mut self) -> &Timestamp { - if let Some(ref timestamp) = self.timestamp { - timestamp - } else { - let timestamp = new_reception_timestamp(); - self.timestamp = Some(timestamp); - self.timestamp.as_ref().unwrap() - } - } - /// Gets the sample attachment: a map of key-value pairs, where each key and value are byte-slices. #[zenoh_macros::unstable] #[inline] pub fn attachment(&self) -> Option<&Attachment> { self.attachment.as_ref() } - - /// Gets the mutable sample attachment: a map of key-value pairs, where each key and value are byte-slices. - #[inline] - #[doc(hidden)] - #[zenoh_macros::unstable] - pub fn attachment_mut(&mut self) -> &mut Option { - &mut self.attachment - } - - #[inline] - #[doc(hidden)] - #[zenoh_macros::unstable] - pub fn with_attachment(mut self, attachment: Attachment) -> Self { - self.attachment = Some(attachment); - self - } } impl From for Value { fn from(sample: Sample) -> Self { - Value::new(sample.payload).with_encoding(sample.encoding) + Value::new(sample.payload).encoding(sample.encoding) } } @@ -572,6 +587,47 @@ pub struct QoS { inner: QoSType, } +#[derive(Debug)] +pub struct QoSBuilder(QoS); + +impl From for QoSBuilder { + fn from(qos: QoS) -> Self { + QoSBuilder(qos) + } +} + +impl From for QoSBuilder { + fn from(qos: QoSType) -> Self { + QoSBuilder(QoS { inner: qos }) + } +} + +impl From for QoS { + fn from(builder: QoSBuilder) -> Self { + builder.0 + } +} + +impl QoSBuilderTrait for QoSBuilder { + fn congestion_control(self, congestion_control: CongestionControl) -> Self { + let mut inner = self.0.inner; + inner.set_congestion_control(congestion_control); + Self(QoS { inner }) + } + + fn priority(self, priority: Priority) -> Self { + let mut inner = self.0.inner; + inner.set_priority(priority.into()); + Self(QoS { inner }) + } + + fn express(self, is_express: bool) -> Self { + let mut inner = self.0.inner; + inner.set_is_express(is_express); + Self(QoS { inner }) + } +} + impl QoS { /// Gets priority of the message. pub fn priority(&self) -> Priority { @@ -596,24 +652,6 @@ impl QoS { pub fn express(&self) -> bool { self.inner.is_express() } - - /// Sets priority value. - pub fn with_priority(mut self, priority: Priority) -> Self { - self.inner.set_priority(priority.into()); - self - } - - /// Sets congestion control value. - pub fn with_congestion_control(mut self, congestion_control: CongestionControl) -> Self { - self.inner.set_congestion_control(congestion_control); - self - } - - /// Sets express flag vlaue. - pub fn with_express(mut self, is_express: bool) -> Self { - self.inner.set_is_express(is_express); - self - } } impl From for QoS { diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index addb757807..67bec5f488 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -32,6 +32,7 @@ use crate::queryable::*; #[cfg(feature = "unstable")] use crate::sample::Attachment; use crate::sample::DataInfo; +use crate::sample::DataInfoIntoSample; use crate::sample::QoS; use crate::selector::TIME_RANGE_KEY; use crate::subscriber::*; @@ -40,6 +41,8 @@ use crate::Priority; use crate::Sample; use crate::SampleKind; use crate::Selector; +#[cfg(feature = "unstable")] +use crate::SourceInfo; use crate::Value; use log::{error, trace, warn}; use std::collections::HashMap; @@ -687,11 +690,12 @@ impl Session { /// # #[tokio::main] /// # async fn main() { /// use zenoh::prelude::r#async::*; + /// use zenoh::prelude::*; /// /// let session = zenoh::open(config::peer()).res().await.unwrap(); /// session /// .put("key/expression", "payload") - /// .with_encoding(Encoding::TEXT_PLAIN) + /// .encoding(Encoding::TEXT_PLAIN) /// .res() /// .await /// .unwrap(); @@ -702,19 +706,23 @@ impl Session { &'a self, key_expr: TryIntoKeyExpr, payload: IntoPayload, - ) -> PutBuilder<'a, 'b> + ) -> SessionPutBuilder<'a, 'b> where TryIntoKeyExpr: TryInto>, >>::Error: Into, IntoPayload: Into, { - PutBuilder { + PublicationBuilder { publisher: self.declare_publisher(key_expr), - payload: payload.into(), - kind: SampleKind::Put, - encoding: Encoding::default(), + kind: PublicationBuilderPut { + payload: payload.into(), + encoding: Encoding::default(), + }, + timestamp: None, #[cfg(feature = "unstable")] attachment: None, + #[cfg(feature = "unstable")] + source_info: SourceInfo::empty(), } } @@ -738,18 +746,19 @@ impl Session { pub fn delete<'a, 'b: 'a, TryIntoKeyExpr>( &'a self, key_expr: TryIntoKeyExpr, - ) -> DeleteBuilder<'a, 'b> + ) -> SessionDeleteBuilder<'a, 'b> where TryIntoKeyExpr: TryInto>, >>::Error: Into, { - PutBuilder { + PublicationBuilder { publisher: self.declare_publisher(key_expr), - payload: Payload::empty(), - kind: SampleKind::Delete, - encoding: Encoding::default(), + kind: PublicationBuilderDelete, + timestamp: None, #[cfg(feature = "unstable")] attachment: None, + #[cfg(feature = "unstable")] + source_info: SourceInfo::empty(), } } /// Query data from the matching queryables in the system. @@ -787,18 +796,22 @@ impl Session { let conf = self.runtime.config().lock(); Duration::from_millis(unwrap_or_default!(conf.queries_default_timeout())) }; + let qos: QoS = request::ext::QoSType::REQUEST.into(); GetBuilder { session: self, selector, scope: Ok(None), target: QueryTarget::DEFAULT, consolidation: QueryConsolidation::DEFAULT, + qos: qos.into(), destination: Locality::default(), timeout, value: None, #[cfg(feature = "unstable")] attachment: None, handler: DefaultHandler, + #[cfg(feature = "unstable")] + source_info: SourceInfo::empty(), } } } @@ -1554,21 +1567,21 @@ impl Session { drop(state); let zenoh_collections::single_or_vec::IntoIter { drain, last } = callbacks.into_iter(); for (cb, key_expr) in drain { - #[allow(unused_mut)] - let mut sample = Sample::new(key_expr, payload.clone()).with_info(info.clone()); - #[cfg(feature = "unstable")] - { - sample.attachment = attachment.clone(); - } + let sample = info.clone().into_sample( + key_expr, + payload.clone(), + #[cfg(feature = "unstable")] + attachment.clone(), + ); cb(sample); } if let Some((cb, key_expr)) = last { - #[allow(unused_mut)] - let mut sample = Sample::new(key_expr, payload).with_info(info); - #[cfg(feature = "unstable")] - { - sample.attachment = attachment; - } + let sample = info.into_sample( + key_expr, + payload, + #[cfg(feature = "unstable")] + attachment.clone(), + ); cb(sample); } } @@ -1580,10 +1593,12 @@ impl Session { scope: &Option>, target: QueryTarget, consolidation: QueryConsolidation, + qos: QoS, destination: Locality, timeout: Duration, value: Option, #[cfg(feature = "unstable")] attachment: Option, + #[cfg(feature = "unstable")] source: SourceInfo, callback: Callback<'static, Reply>, ) -> ZResult<()> { log::trace!("get({}, {:?}, {:?})", selector, target, consolidation); @@ -1663,7 +1678,7 @@ impl Session { primitives.send_request(Request { id: qid, wire_expr: wexpr.clone(), - ext_qos: request::ext::QoSType::REQUEST, + ext_qos: qos.into(), ext_tstamp: None, ext_nodeid: request::ext::NodeIdType::DEFAULT, ext_target: target, @@ -1672,7 +1687,7 @@ impl Session { payload: RequestBody::Query(zenoh_protocol::zenoh::Query { consolidation, parameters: selector.parameters().to_string(), - ext_sinfo: None, + ext_sinfo: source.into(), ext_body: value.as_ref().map(|v| query::ext::QueryBodyType { #[cfg(feature = "shared-memory")] ext_shm: None, @@ -2244,14 +2259,12 @@ impl Primitives for Session { attachment: _attachment.map(Into::into), }, }; - - #[allow(unused_mut)] - let mut sample = - Sample::new(key_expr.into_owned(), payload).with_info(Some(info)); - #[cfg(feature = "unstable")] - { - sample.attachment = attachment; - } + let sample = info.into_sample( + key_expr.into_owned(), + payload, + #[cfg(feature = "unstable")] + attachment, + ); let new_reply = Reply { sample: Ok(sample), replier_id: ZenohId::rand(), // TODO diff --git a/zenoh/src/value.rs b/zenoh/src/value.rs index 128f0ff605..92a87cb6c5 100644 --- a/zenoh/src/value.rs +++ b/zenoh/src/value.rs @@ -13,7 +13,7 @@ // //! Value primitives. -use crate::{encoding::Encoding, payload::Payload}; +use crate::{encoding::Encoding, payload::Payload, sample::builder::ValueBuilderTrait}; /// A zenoh [`Value`] contains a `payload` and an [`Encoding`] that indicates how the [`Payload`] should be interpreted. #[non_exhaustive] @@ -36,7 +36,6 @@ impl Value { encoding: Encoding::default(), } } - /// Creates an empty [`Value`]. pub const fn empty() -> Self { Value { @@ -44,15 +43,29 @@ impl Value { encoding: Encoding::default(), } } + /// Checks if the [`Value`] is empty. + /// Value is considered empty if its payload is empty and encoding is default. + pub fn is_empty(&self) -> bool { + self.payload.is_empty() && self.encoding == Encoding::default() + } +} - /// Sets the encoding of this [`Value`]`. - #[inline(always)] - pub fn with_encoding(mut self, encoding: IntoEncoding) -> Self - where - IntoEncoding: Into, - { - self.encoding = encoding.into(); - self +impl ValueBuilderTrait for Value { + fn encoding>(self, encoding: T) -> Self { + Self { + encoding: encoding.into(), + ..self + } + } + fn payload>(self, payload: T) -> Self { + Self { + payload: payload.into(), + ..self + } + } + fn value>(self, value: T) -> Self { + let Value { payload, encoding } = value.into(); + Self { payload, encoding } } } @@ -67,3 +80,18 @@ where } } } + +impl From> for Value +where + T: Into, +{ + fn from(t: Option) -> Self { + t.map_or_else(Value::empty, Into::into) + } +} + +impl Default for Value { + fn default() -> Self { + Value::empty() + } +} diff --git a/zenoh/tests/attachments.rs b/zenoh/tests/attachments.rs index e6a3356559..9fb99b7cc0 100644 --- a/zenoh/tests/attachments.rs +++ b/zenoh/tests/attachments.rs @@ -38,22 +38,22 @@ fn pubsub() { } zenoh .put("test/attachment", "put") - .with_attachment( + .attachment(Some( backer .iter() .map(|b| (b.0.as_slice(), b.1.as_slice())) .collect(), - ) + )) .res() .unwrap(); publisher .put("publisher") - .with_attachment( + .attachment(Some( backer .iter() .map(|b| (b.0.as_slice(), b.1.as_slice())) .collect(), - ) + )) .res() .unwrap(); } @@ -61,7 +61,7 @@ fn pubsub() { #[cfg(feature = "unstable")] #[test] fn queries() { - use zenoh::{prelude::sync::*, sample::Attachment}; + use zenoh::{prelude::sync::*, sample::builder::SampleBuilderTrait, sample::Attachment}; let zenoh = zenoh::open(Config::default()).res().unwrap(); let _sub = zenoh @@ -84,7 +84,7 @@ fn queries() { query.key_expr().clone(), query.value().unwrap().payload.clone(), ) - .with_attachment(attachment) + .attachment(attachment) .res() .unwrap(); }) @@ -100,13 +100,13 @@ fn queries() { } let get = zenoh .get("test/attachment") - .with_value("query") - .with_attachment( + .payload("query") + .attachment(Some( backer .iter() .map(|b| (b.0.as_slice(), b.1.as_slice())) .collect(), - ) + )) .res() .unwrap(); while let Ok(reply) = get.recv() { diff --git a/zenoh/tests/handler.rs b/zenoh/tests/handler.rs index c1e912fc75..ceed15e2c3 100644 --- a/zenoh/tests/handler.rs +++ b/zenoh/tests/handler.rs @@ -57,12 +57,12 @@ fn query_with_ringbuffer() { let _reply1 = zenoh .get("test/ringbuffer_query") - .with_value("query1") + .payload("query1") .res() .unwrap(); let _reply2 = zenoh .get("test/ringbuffer_query") - .with_value("query2") + .payload("query2") .res() .unwrap(); diff --git a/zenoh/tests/routing.rs b/zenoh/tests/routing.rs index b90f0f568f..56bacd7fdd 100644 --- a/zenoh/tests/routing.rs +++ b/zenoh/tests/routing.rs @@ -11,17 +11,14 @@ // Contributors: // ZettaScale Zenoh Team, // -use std::{ - str::FromStr, - sync::{atomic::AtomicUsize, atomic::Ordering, Arc}, - time::Duration, -}; +use std::str::FromStr; +use std::sync::atomic::Ordering; +use std::sync::{atomic::AtomicUsize, Arc}; +use std::time::Duration; use tokio_util::{sync::CancellationToken, task::TaskTracker}; -use zenoh::{ - config::{Config, ModeDependentValue}, - prelude::r#async::*, - Result, -}; +use zenoh::config::{Config, ModeDependentValue}; +use zenoh::prelude::r#async::*; +use zenoh::Result; use zenoh_core::ztimeout; use zenoh_protocol::core::{WhatAmI, WhatAmIMatcher}; use zenoh_result::bail;