Skip to content

Commit

Permalink
Merge branch 'attachment_stable' into selector_rework3
Browse files Browse the repository at this point in the history
  • Loading branch information
milyin committed Jun 9, 2024
2 parents a65686a + 9bf86e8 commit 5d195e8
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 83 deletions.
13 changes: 2 additions & 11 deletions zenoh/src/api/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,6 @@ impl TransportMulticastEventHandler for Handler {
&expr,
Some(info),
serde_json::to_vec(&peer).unwrap().into(),
#[cfg(feature = "unstable")]
None,
);
Ok(Arc::new(PeerHandler {
Expand Down Expand Up @@ -216,7 +215,6 @@ impl TransportPeerEventHandler for PeerHandler {
.with_suffix(&format!("/link/{}", s.finish())),
Some(info),
serde_json::to_vec(&link).unwrap().into(),
#[cfg(feature = "unstable")]
None,
);
}
Expand All @@ -236,7 +234,6 @@ impl TransportPeerEventHandler for PeerHandler {
.with_suffix(&format!("/link/{}", s.finish())),
Some(info),
vec![0u8; 0].into(),
#[cfg(feature = "unstable")]
None,
);
}
Expand All @@ -248,14 +245,8 @@ impl TransportPeerEventHandler for PeerHandler {
kind: SampleKind::Delete,
..Default::default()
};
self.session.handle_data(
true,
&self.expr,
Some(info),
vec![0u8; 0].into(),
#[cfg(feature = "unstable")]
None,
);
self.session
.handle_data(true, &self.expr, Some(info), vec![0u8; 0].into(), None);
}

fn as_any(&self) -> &dyn std::any::Any {
Expand Down
10 changes: 1 addition & 9 deletions zenoh/src/api/builders/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,13 @@ use std::future::{IntoFuture, Ready};
use zenoh_core::{Resolvable, Result as ZResult, Wait};
use zenoh_protocol::{core::CongestionControl, network::Mapping};

#[cfg(feature = "unstable")]
use crate::api::bytes::OptionZBytes;
#[cfg(feature = "unstable")]
use crate::api::sample::SourceInfo;
use crate::api::{
builders::sample::{
QoSBuilderTrait, SampleBuilderTrait, TimestampBuilderTrait, ValueBuilderTrait,
},
bytes::ZBytes,
bytes::{OptionZBytes, ZBytes},
encoding::Encoding,
key_expr::KeyExpr,
publisher::{Priority, Publisher},
Expand Down Expand Up @@ -78,7 +76,6 @@ pub struct PublicationBuilder<P, T> {
pub(crate) timestamp: Option<uhlc::Timestamp>,
#[cfg(feature = "unstable")]
pub(crate) source_info: SourceInfo,
#[cfg(feature = "unstable")]
pub(crate) attachment: Option<ZBytes>,
}

Expand Down Expand Up @@ -157,7 +154,6 @@ impl<P, T> SampleBuilderTrait for PublicationBuilder<P, T> {
..self
}
}
#[cfg(feature = "unstable")]
fn attachment<TA: Into<OptionZBytes>>(self, attachment: TA) -> Self {
let attachment: OptionZBytes = attachment.into();
Self {
Expand Down Expand Up @@ -191,7 +187,6 @@ impl Wait for PublicationBuilder<PublisherBuilder<'_, '_>, PublicationBuilderPut
self.timestamp,
#[cfg(feature = "unstable")]
self.source_info,
#[cfg(feature = "unstable")]
self.attachment,
)
}
Expand All @@ -208,7 +203,6 @@ impl Wait for PublicationBuilder<PublisherBuilder<'_, '_>, PublicationBuilderDel
self.timestamp,
#[cfg(feature = "unstable")]
self.source_info,
#[cfg(feature = "unstable")]
self.attachment,
)
}
Expand Down Expand Up @@ -393,7 +387,6 @@ impl Wait for PublicationBuilder<&Publisher<'_>, PublicationBuilderPut> {
self.timestamp,
#[cfg(feature = "unstable")]
self.source_info,
#[cfg(feature = "unstable")]
self.attachment,
)
}
Expand All @@ -408,7 +401,6 @@ impl Wait for PublicationBuilder<&Publisher<'_>, PublicationBuilderDelete> {
self.timestamp,
#[cfg(feature = "unstable")]
self.source_info,
#[cfg(feature = "unstable")]
self.attachment,
)
}
Expand Down
9 changes: 2 additions & 7 deletions zenoh/src/api/builders/sample.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ use zenoh_core::zresult;
use zenoh_protocol::core::CongestionControl;

use crate::api::{
bytes::ZBytes,
bytes::{OptionZBytes, ZBytes},
encoding::Encoding,
key_expr::KeyExpr,
publisher::Priority,
sample::{QoS, QoSBuilder, Sample, SampleKind},
value::Value,
};
#[cfg(feature = "unstable")]
use crate::{api::bytes::OptionZBytes, sample::SourceInfo};
use crate::sample::SourceInfo;

pub trait QoSBuilderTrait {
/// Change the `congestion_control` to apply when routing the data.
Expand All @@ -49,7 +49,6 @@ pub trait SampleBuilderTrait {
#[zenoh_macros::unstable]
fn source_info(self, source_info: SourceInfo) -> Self;
/// Attach user-provided data in key-value format
#[zenoh_macros::unstable]
fn attachment<T: Into<OptionZBytes>>(self, attachment: T) -> Self;
}

Expand Down Expand Up @@ -95,7 +94,6 @@ impl SampleBuilder<SampleBuilderPut> {
qos: QoS::default(),
#[cfg(feature = "unstable")]
source_info: SourceInfo::empty(),
#[cfg(feature = "unstable")]
attachment: None,
},
_t: PhantomData::<SampleBuilderPut>,
Expand All @@ -118,7 +116,6 @@ impl SampleBuilder<SampleBuilderDelete> {
qos: QoS::default(),
#[cfg(feature = "unstable")]
source_info: SourceInfo::empty(),
#[cfg(feature = "unstable")]
attachment: None,
},
_t: PhantomData::<SampleBuilderDelete>,
Expand Down Expand Up @@ -162,7 +159,6 @@ impl<T> TimestampBuilderTrait for SampleBuilder<T> {
}
}

#[cfg(feature = "unstable")]
impl<T> SampleBuilderTrait for SampleBuilder<T> {
#[zenoh_macros::unstable]
fn source_info(self, source_info: SourceInfo) -> Self {
Expand All @@ -175,7 +171,6 @@ impl<T> SampleBuilderTrait for SampleBuilder<T> {
}
}

#[zenoh_macros::unstable]
fn attachment<U: Into<OptionZBytes>>(self, attachment: U) -> Self {
let attachment: OptionZBytes = attachment.into();
Self {
Expand Down
1 change: 0 additions & 1 deletion zenoh/src/api/liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,6 @@ where
Locality::default(),
self.timeout,
None,
#[cfg(feature = "unstable")]
None,
SourceInfo::empty(),
callback,
Expand Down
13 changes: 6 additions & 7 deletions zenoh/src/api/queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,16 @@ use zenoh_protocol::{
use zenoh_result::ZResult;
#[zenoh_macros::unstable]
use {
super::{
builders::sample::SampleBuilderTrait, bytes::OptionZBytes, query::ReplyKeyExpr,
sample::SourceInfo,
},
super::{query::ReplyKeyExpr, sample::SourceInfo},
zenoh_protocol::core::EntityGlobalId,
};

use super::{
builders::sample::{QoSBuilderTrait, SampleBuilder, TimestampBuilderTrait, ValueBuilderTrait},
bytes::ZBytes,
builders::sample::{
QoSBuilderTrait, SampleBuilder, SampleBuilderTrait, TimestampBuilderTrait,
ValueBuilderTrait,
},
bytes::{OptionZBytes, ZBytes},
encoding::Encoding,
handlers::{locked, DefaultHandler, IntoHandler},
key_expr::KeyExpr,
Expand Down Expand Up @@ -322,7 +322,6 @@ impl<T> TimestampBuilderTrait for ReplyBuilder<'_, '_, T> {
}
}

#[cfg(feature = "unstable")]
impl<T> SampleBuilderTrait for ReplyBuilder<'_, '_, T> {
fn attachment<U: Into<OptionZBytes>>(self, attachment: U) -> Self {
let attachment: OptionZBytes = attachment.into();
Expand Down
58 changes: 11 additions & 47 deletions zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ use super::{
},
queryable::{Query, QueryInner, QueryableBuilder, QueryableState},
sample::{DataInfo, DataInfoIntoSample, Locality, QoS, Sample, SampleKind},
selector::{Selector, TIME_RANGE_KEY},
selector::Selector,
subscriber::{SubscriberBuilder, SubscriberState},
value::Value,
Id,
Expand All @@ -89,6 +89,7 @@ use super::{
publisher::{MatchingListenerState, MatchingStatus},
query::_REPLY_KEY_EXPR_ANY_SEL_PARAM,
sample::SourceInfo,
selector::TIME_RANGE_KEY,
};
use crate::net::{
primitives::Primitives,
Expand Down Expand Up @@ -1635,21 +1636,13 @@ impl Session {
drop(state);
let zenoh_collections::single_or_vec::IntoIter { drain, last } = callbacks.into_iter();
for (cb, key_expr) in drain {
let sample = info.clone().into_sample(
key_expr,
payload.clone(),
#[cfg(feature = "unstable")]
attachment.clone(),
);
let sample = info
.clone()
.into_sample(key_expr, payload.clone(), attachment.clone());
cb(sample);
}
if let Some((cb, key_expr)) = last {
let sample = info.into_sample(
key_expr,
payload,
#[cfg(feature = "unstable")]
attachment.clone(),
);
let sample = info.into_sample(key_expr, payload, attachment.clone());
cb(sample);
}
}
Expand All @@ -1666,7 +1659,7 @@ impl Session {
destination: Locality,
timeout: Duration,
value: Option<Value>,
#[cfg(feature = "unstable")] attachment: Option<ZBytes>,
attachment: Option<ZBytes>,
#[cfg(feature = "unstable")] source: SourceInfo,
callback: Callback<'static, Reply>,
) -> ZResult<()> {
Expand Down Expand Up @@ -1745,14 +1738,7 @@ impl Session {
drop(state);

if destination != Locality::SessionLocal {
#[allow(unused_mut)]
let mut ext_attachment = None;
#[cfg(feature = "unstable")]
{
if let Some(attachment) = attachment.clone() {
ext_attachment = Some(attachment.into());
}
}
let ext_attachment = attachment.clone().map(Into::into);
primitives.send_request(Request {
id: qid,
wire_expr: wexpr.clone(),
Expand Down Expand Up @@ -1794,7 +1780,6 @@ impl Session {
encoding: v.encoding.clone().into(),
payload: v.payload.clone().into(),
}),
#[cfg(feature = "unstable")]
attachment,
);
}
Expand All @@ -1811,7 +1796,7 @@ impl Session {
_target: TargetType,
_consolidation: Consolidation,
body: Option<QueryBodyType>,
#[cfg(feature = "unstable")] attachment: Option<ZBytes>,
attachment: Option<ZBytes>,
) {
let (primitives, key_expr, queryables) = {
let state = zread!(self.state);
Expand Down Expand Up @@ -1874,7 +1859,6 @@ impl Session {
payload: b.payload.clone().into(),
encoding: b.encoding.clone().into(),
}),
#[cfg(feature = "unstable")]
attachment: attachment.clone(),
});
}
Expand Down Expand Up @@ -2084,14 +2068,7 @@ impl Primitives for Session {
.starts_with(crate::api::liveliness::PREFIX_LIVELINESS)
{
drop(state);
self.handle_data(
false,
&m.wire_expr,
None,
ZBuf::default(),
#[cfg(feature = "unstable")]
None,
);
self.handle_data(false, &m.wire_expr, None, ZBuf::default(), None);
}
}
Err(err) => {
Expand Down Expand Up @@ -2125,7 +2102,6 @@ impl Primitives for Session {
&expr.to_wire(self),
Some(data_info),
ZBuf::default(),
#[cfg(feature = "unstable")]
None,
);
}
Expand Down Expand Up @@ -2169,7 +2145,6 @@ impl Primitives for Session {
&msg.wire_expr,
Some(info),
m.payload,
#[cfg(feature = "unstable")]
m.ext_attachment.map(Into::into),
)
}
Expand All @@ -2187,7 +2162,6 @@ impl Primitives for Session {
&msg.wire_expr,
Some(info),
ZBuf::empty(),
#[cfg(feature = "unstable")]
m.ext_attachment.map(Into::into),
)
}
Expand All @@ -2205,7 +2179,6 @@ impl Primitives for Session {
msg.ext_target,
m.consolidation,
m.ext_body,
#[cfg(feature = "unstable")]
m.ext_attachment.map(Into::into),
),
}
Expand Down Expand Up @@ -2294,13 +2267,11 @@ impl Primitives for Session {
struct Ret {
payload: ZBuf,
info: DataInfo,
#[cfg(feature = "unstable")]
attachment: Option<ZBytes>,
}
let Ret {
payload,
info,
#[cfg(feature = "unstable")]
attachment,
} = match m.payload {
ReplyBody::Put(Put {
Expand All @@ -2320,7 +2291,6 @@ impl Primitives for Session {
source_id: ext_sinfo.as_ref().map(|i| i.id),
source_sn: ext_sinfo.as_ref().map(|i| i.sn as u64),
},
#[cfg(feature = "unstable")]
attachment: _attachment.map(Into::into),
},
ReplyBody::Del(Del {
Expand All @@ -2338,16 +2308,10 @@ impl Primitives for Session {
source_id: ext_sinfo.as_ref().map(|i| i.id),
source_sn: ext_sinfo.as_ref().map(|i| i.sn as u64),
},
#[cfg(feature = "unstable")]
attachment: _attachment.map(Into::into),
},
};
let sample = info.into_sample(
key_expr.into_owned(),
payload,
#[cfg(feature = "unstable")]
attachment,
);
let sample = info.into_sample(key_expr.into_owned(), payload, attachment);
let new_reply = Reply {
result: Ok(sample),
replier_id: zenoh_protocol::core::ZenohId::rand(), // TODO
Expand Down
1 change: 0 additions & 1 deletion zenoh/src/net/runtime/adminspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,6 @@ impl Primitives for AdminSpace {
}),
eid: self.queryable_id,
value: query.ext_body.map(|b| Value::new(b.payload, b.encoding)),
#[cfg(feature = "unstable")]
attachment: query.ext_attachment.map(Into::into),
};

Expand Down

0 comments on commit 5d195e8

Please sign in to comment.