Skip to content

Commit

Permalink
zenoh API: add Zenoh::router_pid() and supports paths starting with '…
Browse files Browse the repository at this point in the history
…/@/router/local'
  • Loading branch information
JEnoch committed Nov 2, 2020
1 parent 7ee8959 commit 47d8602
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 20 deletions.
14 changes: 14 additions & 0 deletions zenoh/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ use log::debug;

pub mod net;

use net::info::ZN_INFO_ROUTER_PID_KEY;
use net::Session;
pub use net::{ZError, ZErrorKind, ZResult};
use zenoh_router::runtime::Runtime;
Expand Down Expand Up @@ -184,10 +185,23 @@ impl Zenoh {

/// Returns the zenoh-net [Session](net::Session) used by this zenoh session.
/// This is for advanced use cases requiring fine usage of the zenoh-net API.
#[inline(always)]
pub fn session(&self) -> &Session {
&self.session
}

/// Returns the PeerId of the zenoh router this zenoh API is connected to (if any).
/// This calls [Session::info()](net::Session::info) and returns the first router pid from
/// the ZN_INFO_ROUTER_PID_KEY property.
pub async fn router_pid(&self) -> Option<String> {
match self.session().info().await.remove(&ZN_INFO_ROUTER_PID_KEY) {
None => None,
Some(s) if s.is_empty() => None,
Some(s) if !s.contains(',') => Some(s),
Some(s) => Some(s.split(',').next().unwrap().to_string()),
}
}

/// Creates a [`Workspace`] with an optional [`Path`] as `prefix`.
/// All relative [`Path`] or [`Selector`] used with this Workspace will be relative to the
/// specified prefix. Not specifying a prefix is equivalent to specifying "/" as prefix,
Expand Down
54 changes: 34 additions & 20 deletions zenoh/src/workspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ pub struct Workspace<'a> {
prefix: Option<Path>,
}

const LOCAL_ROUTER_PREFIX: &str = "/@/router/local";

impl Workspace<'_> {
pub(crate) async fn new(zenoh: &Zenoh, prefix: Option<Path>) -> ZResult<Workspace<'_>> {
Ok(Workspace { zenoh, prefix })
Expand All @@ -76,30 +78,42 @@ impl Workspace<'_> {

/// Returns the zenoh-net [Session](net::Session) used by this workspace.
/// This is for advanced use cases requiring fine usage of the zenoh-net API.
#[inline]
pub fn session(&self) -> &Session {
&self.zenoh.session
}

fn path_to_reskey(&self, path: &Path) -> ResKey {
if path.is_relative() {
async fn canonicalize(&self, path: &str) -> ZResult<String> {
let abs_path = if path.starts_with('/') {
path.to_string()
} else {
match &self.prefix {
Some(prefix) => ResKey::from(path.with_prefix(prefix)),
None => ResKey::from(format!("/{}", path)),
Some(prefix) => format!("{}/{}", prefix, path),
None => format!("/{}", path),
}
};
if abs_path.starts_with(LOCAL_ROUTER_PREFIX) {
match self.zenoh.router_pid().await {
Some(pid) => Ok(format!(
"/@/router/{}{}",
pid,
abs_path.strip_prefix(LOCAL_ROUTER_PREFIX).unwrap()
)),
None => zerror!(ZErrorKind::Other {
descr: "Not connected to a router; can't resolve '/@/router/local' path".into()
}),
}
} else {
ResKey::from(path)
Ok(abs_path)
}
}

fn pathexpr_to_reskey(&self, path: &PathExpr) -> ResKey {
if path.is_relative() {
match &self.prefix {
Some(prefix) => ResKey::from(path.with_prefix(prefix)),
None => ResKey::from(format!("/{}", path)),
}
} else {
ResKey::from(path)
}
async fn path_to_reskey(&self, path: &Path) -> ZResult<ResKey> {
self.canonicalize(path.as_str()).await.map(ResKey::from)
}

async fn pathexpr_to_reskey(&self, path: &PathExpr) -> ZResult<ResKey> {
self.canonicalize(path.as_str()).await.map(ResKey::from)
}

/// Put a [`Path`]/[`Value`] into zenoh.
Expand All @@ -125,7 +139,7 @@ impl Workspace<'_> {
let (encoding, payload) = value.encode();
self.session()
.write_ext(
&self.path_to_reskey(path),
&self.path_to_reskey(path).await?,
payload,
encoding,
data_kind::PUT,
Expand Down Expand Up @@ -155,7 +169,7 @@ impl Workspace<'_> {
debug!("delete on {:?}", path);
self.session()
.write_ext(
&self.path_to_reskey(path),
&self.path_to_reskey(path).await?,
RBuf::empty(),
encoding::NONE,
data_kind::DELETE,
Expand Down Expand Up @@ -187,7 +201,7 @@ impl Workspace<'_> {
/// ```
pub async fn get(&self, selector: &Selector) -> ZResult<DataStream> {
debug!("get on {}", selector);
let reskey = self.pathexpr_to_reskey(&selector.path_expr);
let reskey = self.pathexpr_to_reskey(&selector.path_expr).await?;
let decode_value = !selector.properties.contains_key("raw");

self.session()
Expand Down Expand Up @@ -241,7 +255,7 @@ impl Workspace<'_> {
}
let decode_value = !selector.properties.contains_key("raw");

let reskey = self.pathexpr_to_reskey(&selector.path_expr);
let reskey = self.pathexpr_to_reskey(&selector.path_expr).await?;
let sub_info = SubInfo {
reliability: Reliability::Reliable,
mode: SubMode::Push,
Expand Down Expand Up @@ -300,7 +314,7 @@ impl Workspace<'_> {
}
let decode_value = !selector.properties.contains_key("raw");

let reskey = self.pathexpr_to_reskey(&selector.path_expr);
let reskey = self.pathexpr_to_reskey(&selector.path_expr).await?;
let sub_info = SubInfo {
reliability: Reliability::Reliable,
mode: SubMode::Push,
Expand Down Expand Up @@ -350,7 +364,7 @@ impl Workspace<'_> {
/// ```
pub async fn register_eval(&self, path_expr: &PathExpr) -> ZResult<GetRequestStream<'_>> {
debug!("eval on {}", path_expr);
let reskey = self.pathexpr_to_reskey(&path_expr);
let reskey = self.pathexpr_to_reskey(&path_expr).await?;

self.session()
.declare_queryable(&reskey, EVAL)
Expand Down

0 comments on commit 47d8602

Please sign in to comment.