Skip to content

Commit

Permalink
Merge commit 'bfec17fc062b6c940faf6f3dea8a56ad395fadfd'
Browse files Browse the repository at this point in the history
  • Loading branch information
yellowhatter committed Jul 16, 2024
2 parents 4bd4471 + bfec17f commit e829cfd
Show file tree
Hide file tree
Showing 13 changed files with 337 additions and 149 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ members = [
exclude = ["ci/nostd-check", "ci/valgrind-check"]

[workspace.package]
rust-version = "1.72.0"
rust-version = "1.75.0"
version = "0.11.0-dev" # Zenoh version
repository = "https://github.com/eclipse-zenoh/zenoh"
homepage = "http://zenoh.io"
Expand Down
172 changes: 84 additions & 88 deletions commons/zenoh-keyexpr/src/key_expr/canon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,114 +12,102 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use alloc::string::String;
use core::{slice, str};

use crate::key_expr::{
utils::{Split, Writer},
DELIMITER, DOUBLE_WILD, SINGLE_WILD,
};

/// In-place canonization of a key expression.
///
/// Rewrites the expression to its canonical form without reallocating,
/// e.g. merging consecutive `**` chunks and replacing a chunk that is
/// exactly `$*` with `*` (see the `&mut str` and `String` implementations
/// and the `canonizer` test for the concrete rules).
pub trait Canonize {
    fn canonize(&mut self);
}

const DOLLAR_STAR: &[u8; 2] = b"$*";

// Return the length of the canonized string.
//
// Canonizes `bytes` in place and returns the length of the canonical prefix;
// bytes past the returned length are left as garbage for the caller to zero
// (`&mut str`) or truncate (`String`). Rules applied:
// - consecutive "**" chunks merge into one,
// - a chunk that is exactly "$*" (or "*") becomes "*", and a "*" chunk is
//   ordered before a trailing "**" (e.g. "**/*" -> "*/**"),
// - runs of "$*$*...$*" inside a chunk collapse to a single "$*".
fn canonize(bytes: &mut [u8]) -> usize {
    // Read cursor into `bytes`; always positioned at the start of a chunk
    // when control is at the top of the outer loop.
    let mut index = 0;
    // Write cursor: everything before `written` is already canonical.
    let mut written = 0;
    // Set when one or more "**/" chunks were read but not yet flushed, so
    // consecutive "**" chunks can be merged into one.
    let mut double_wild = false;
    loop {
        match &bytes[index..] {
            // Trailing "**" chunk: absorbs any pending "**" and ends the input.
            [b'*', b'*'] => {
                bytes[written..written + 2].copy_from_slice(b"**");
                written += 2;
                return written;
            }
            // "**/" chunk: remember it; merging happens when it is flushed.
            [b'*', b'*', b'/', ..] => {
                double_wild = true;
                index += 3;
            }
            // A chunk that is exactly "*" or "$*": normalize to "*". A pending
            // "**" is flushed *after* it, giving the canonical "*/**" order.
            [b'*', r @ ..] | [b'$', b'*', r @ ..] if r.is_empty() || r.starts_with(b"/") => {
                let (end, len) = (!r.starts_with(b"/"), r.len());
                bytes[written] = b'*';
                written += 1;
                if end {
                    if double_wild {
                        bytes[written..written + 3].copy_from_slice(b"/**");
                        written += 3;
                    }
                    return written;
                }
                bytes[written] = b'/';
                written += 1;
                // Skip past this chunk and its trailing '/'.
                index = bytes.len() - len + 1;
            }
            // Chunk starting with repeated "$*": drop redundant pairs one at a
            // time until a single "$*" remains (handled by the arms above/below).
            [b'$', b'*', b'$', b'*', ..] => {
                index += 2;
            }
            // Any other chunk: flush a pending "**/" first, then copy the
            // chunk, merging consecutive "$*" occurrences inside it.
            _ => {
                if double_wild && &bytes[index..] != b"**" {
                    bytes[written..written + 3].copy_from_slice(b"**/");
                    written += 3;
                    double_wild = false;
                }
                let mut write_start = index;
                loop {
                    match bytes.get(index) {
                        Some(b'/') => {
                            // End of chunk: copy what remains of it.
                            index += 1;
                            bytes.copy_within(write_start..index, written);
                            written += index - write_start;
                            break;
                        }
                        // "$*$*" inside the chunk: keep one "$*", skip the rest.
                        Some(b'$') if matches!(bytes.get(index + 1..index + 4), Some(b"*$*")) => {
                            index += 2;
                            bytes.copy_within(write_start..index, written);
                            written += index - write_start;
                            // Count further consecutive "$*" pairs. Non-overlapping
                            // 2-byte chunks starting right after the pair kept above:
                            // the guard guarantees `index + 2 <= bytes.len()`, so the
                            // slice start is always in bounds, and `chunks(2)` counts
                            // "$*$*..." pairs correctly (overlapping windows would not).
                            let skip = bytes[index + 2..]
                                .chunks(2)
                                .take_while(|s| s == b"$*")
                                .count();
                            index += (1 + skip) * 2;
                            write_start = index;
                        }
                        Some(_) => index += 1,
                        None => {
                            // End of input: copy the tail of the final chunk.
                            bytes.copy_within(write_start..index, written);
                            written += index - write_start;
                            return written;
                        }
                    }
                }
            }
        }
    }
}

impl Canonize for &mut str {
    fn canonize(&mut self) {
        // SAFETY: `canonize` keeps its returned prefix valid UTF-8, and the
        // loop below overwrites every remaining byte with NUL, so the whole
        // slice stays valid UTF-8.
        let bytes = unsafe { self.as_bytes_mut() };
        let canonical_len = canonize(bytes);
        // A `&mut str` cannot shrink, so pad the leftover bytes with '\0'.
        for leftover in &mut bytes[canonical_len..] {
            *leftover = b'\0';
        }
    }
}

impl Canonize for String {
    fn canonize(&mut self) {
        // SAFETY: `canonize` leaves valid UTF-8 within the returned length,
        // and the remaining garbage bytes are truncated away below, so the
        // String's invariant is restored before the borrow ends.
        let bytes = unsafe { self.as_mut_vec() };
        let length = canonize(bytes);
        bytes.truncate(length);
    }
}

Expand Down Expand Up @@ -150,6 +138,9 @@ fn canonizer() {
let mut s = String::from("hello/**/**/bye");
s.canonize();
assert_eq!(s, "hello/**/bye");
let mut s = String::from("hello/**/**");
s.canonize();
assert_eq!(s, "hello/**");

// Any $* chunk is replaced by a * chunk
let mut s = String::from("hello/$*/bye");
Expand All @@ -172,4 +163,9 @@ fn canonizer() {
let mut s = String::from("hello/**/*");
s.canonize();
assert_eq!(s, "hello/*/**");

// &mut str remaining part is zeroed
let mut s = String::from("$*$*$*/hello/$*$*/bye/$*$*");
s.as_mut_str().canonize();
assert_eq!(s, "*/hello/*/bye/*\0\0\0\0\0\0\0\0\0\0\0");
}
19 changes: 0 additions & 19 deletions commons/zenoh-keyexpr/src/key_expr/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,6 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use core::ptr;

/// Raw-pointer cursor for rewriting a byte buffer in place from its start.
///
/// NOTE(review): nothing here checks capacity — callers must guarantee that
/// every write stays within the allocation behind `ptr`.
pub(crate) struct Writer {
    // Base pointer of the destination buffer.
    pub ptr: *mut u8,
    // Number of bytes written so far (i.e. the current write offset).
    pub len: usize,
}

impl Writer {
    /// Appends `slice` at the current offset and advances `len`.
    pub(crate) fn write(&mut self, slice: &[u8]) {
        let len = slice.len();
        // SAFETY: relies on the caller ensuring `ptr + self.len + len` stays in
        // bounds. `ptr::copy` (memmove semantics) is used, so the source may
        // overlap the destination — e.g. when compacting a buffer over itself.
        unsafe { ptr::copy(slice.as_ptr(), self.ptr.add(self.len), len) };
        self.len += len
    }
    /// Appends a single byte at the current offset and advances `len`.
    pub(crate) fn write_byte(&mut self, byte: u8) {
        // SAFETY: relies on the caller ensuring `ptr + self.len` is in bounds.
        unsafe { *self.ptr.add(self.len) = byte };
        self.len += 1
    }
}

#[derive(Debug)]
pub struct Splitter<'a, S: ?Sized, D: ?Sized> {
s: Option<&'a S>,
Expand Down
5 changes: 3 additions & 2 deletions io/zenoh-link-commons/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@ version = { workspace = true }

[features]
compression = []
tls = ["dep:rustls", "dep:rustls-webpki", "dep:webpki-roots"]

[dependencies]
async-trait = { workspace = true }
base64 = { workspace = true, optional = true }
flume = { workspace = true }
futures = { workspace = true }
rustls = { workspace = true }
rustls-webpki = { workspace = true }
rustls = { workspace = true, optional = true }
rustls-webpki = { workspace = true, optional = true }
serde = { workspace = true, features = ["default"] }
tokio = { workspace = true, features = [
"fs",
Expand Down
1 change: 1 addition & 0 deletions io/zenoh-link-commons/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ extern crate alloc;

mod listener;
mod multicast;
#[cfg(feature = "tls")]
pub mod tls;
mod unicast;

Expand Down
2 changes: 1 addition & 1 deletion io/zenoh-links/zenoh-link-quic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ x509-parser = { workspace = true }
zenoh-collections = { workspace = true }
zenoh-config = { workspace = true }
zenoh-core = { workspace = true }
zenoh-link-commons = { workspace = true }
zenoh-link-commons = { workspace = true, features = ["tls"] }
zenoh-protocol = { workspace = true }
zenoh-result = { workspace = true }
zenoh-runtime = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion io/zenoh-links/zenoh-link-tls/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ webpki-roots = { workspace = true }
zenoh-collections = { workspace = true }
zenoh-config = { workspace = true }
zenoh-core = { workspace = true }
zenoh-link-commons = { workspace = true }
zenoh-link-commons = { workspace = true, features = ["tls"] }
zenoh-protocol = { workspace = true }
zenoh-result = { workspace = true }
zenoh-runtime = { workspace = true }
Expand Down
4 changes: 2 additions & 2 deletions zenoh-ext/src/publication_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ impl<'a> PublicationCache<'a> {
if conf.session.hlc().is_none() {
bail!(
"Failed requirement for PublicationCache on {}: \
the Session is not configured with 'add_timestamp=true'",
key_expr
the 'timestamping' setting must be enabled in the Zenoh configuration",
key_expr,
)
}

Expand Down
9 changes: 7 additions & 2 deletions zenoh/src/api/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@ use std::{
time::Duration,
};

#[cfg(feature = "unstable")]
use zenoh_config::ZenohId;
use zenoh_core::{Resolvable, Wait};
use zenoh_keyexpr::OwnedKeyExpr;
use zenoh_protocol::core::{CongestionControl, Parameters, ZenohIdProto};
#[cfg(feature = "unstable")]
use zenoh_protocol::core::ZenohIdProto;
use zenoh_protocol::core::{CongestionControl, Parameters};
use zenoh_result::ZResult;

#[zenoh_macros::unstable]
#[cfg(feature = "unstable")]
use super::{
builders::sample::SampleBuilderTrait, bytes::OptionZBytes, sample::SourceInfo,
selector::ZenohParameters,
Expand Down Expand Up @@ -118,6 +121,7 @@ impl From<Value> for ReplyError {
#[derive(Clone, Debug)]
pub struct Reply {
pub(crate) result: Result<Sample, ReplyError>,
#[cfg(feature = "unstable")]
pub(crate) replier_id: Option<ZenohIdProto>,
}

Expand All @@ -137,6 +141,7 @@ impl Reply {
self.result
}

#[zenoh_macros::unstable]
/// Gets the id of the zenoh instance that answered this Reply.
pub fn replier_id(&self) -> Option<ZenohId> {
self.replier_id.map(Into::into)
Expand Down
8 changes: 7 additions & 1 deletion zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1760,6 +1760,7 @@ impl Session {
self.task_controller
.spawn_with_rt(zenoh_runtime::ZRuntime::Net, {
let state = self.state.clone();
#[cfg(feature = "unstable")]
let zid = self.runtime.zid();
async move {
tokio::select! {
Expand All @@ -1775,6 +1776,7 @@ impl Session {
}
(query.callback)(Reply {
result: Err(Value::new("Timeout", Encoding::ZENOH_STRING).into()),
#[cfg(feature = "unstable")]
replier_id: Some(zid.into()),
});
}
Expand Down Expand Up @@ -1874,6 +1876,7 @@ impl Session {
tracing::debug!("Timeout on liveliness query {}! Send error and close.", id);
(query.callback)(Reply {
result: Err(Value::new("Timeout", Encoding::ZENOH_STRING).into()),
#[cfg(feature = "unstable")]
replier_id: Some(zid.into()),
});
}
Expand Down Expand Up @@ -2238,6 +2241,7 @@ impl Primitives for Session {
#[cfg(feature = "unstable")]
attachment: None,
}),
#[cfg(feature = "unstable")]
replier_id: None,
};

Expand Down Expand Up @@ -2404,8 +2408,9 @@ impl Primitives for Session {
encoding: e.encoding.into(),
};
let new_reply = Reply {
replier_id: e.ext_sinfo.map(|info| info.id.zid),
result: Err(value.into()),
#[cfg(feature = "unstable")]
replier_id: e.ext_sinfo.map(|info| info.id.zid),
};
callback(new_reply);
}
Expand Down Expand Up @@ -2487,6 +2492,7 @@ impl Primitives for Session {
let sample = info.into_sample(key_expr.into_owned(), payload, attachment);
let new_reply = Reply {
result: Ok(sample),
#[cfg(feature = "unstable")]
replier_id: None,
};
let callback =
Expand Down
5 changes: 4 additions & 1 deletion zenoh/src/net/routing/hat/router/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -948,7 +948,10 @@ pub(crate) fn declare_token_interest(
aggregate: bool,
send_declare: &mut SendDeclare,
) {
if mode.current() && face.whatami == WhatAmI::Client {
if mode.current()
&& (face.whatami == WhatAmI::Client
|| (face.whatami == WhatAmI::Peer && !hat!(tables).full_net(WhatAmI::Peer)))
{
let interest_id = (!mode.future()).then_some(id);
if let Some(res) = res.as_ref() {
if aggregate {
Expand Down
Loading

0 comments on commit e829cfd

Please sign in to comment.