Skip to content

Commit

Permalink
Merge branch 'dev/advanced_pubsub' into dev/adv_pubsub_background
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart committed Dec 11, 2024
2 parents 8574b4e + c1cbf39 commit 8bb1796
Show file tree
Hide file tree
Showing 51 changed files with 1,900 additions and 220 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 48 additions & 6 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
///
/// It is also possible to specify a priority range and/or a reliability setting to be used on the link.
/// For example `tcp/localhost?prio=6-7;rel=0` assigns priorities "data_low" and "background" to the established link.
///
/// For TCP and TLS links, it is possible to specify the TCP buffer sizes:
/// E.g. tcp/192.168.0.1:7447#so_sndbuf=65000;so_rcvbuf=65000
connect: {
/// timeout waiting for all endpoints connected (0: no retry, -1: infinite timeout)
/// Accepts a single value (e.g. timeout_ms: 0)
Expand Down Expand Up @@ -68,6 +71,9 @@
///
/// It is also possible to specify a priority range and/or a reliability setting to be used on the link.
/// For example `tcp/localhost?prio=6-7;rel=0` assigns priorities "data_low" and "background" to the established link.
///
/// For TCP and TLS links, it is possible to specify the TCP buffer sizes:
/// E.g. tcp/192.168.0.1:7447#so_sndbuf=65000;so_rcvbuf=65000
listen: {
/// timeout waiting for all listen endpoints (0: no retry, -1: infinite timeout)
/// Accepts a single value (e.g. timeout_ms: 0)
Expand Down Expand Up @@ -128,10 +134,10 @@
/// The time-to-live on multicast scouting packets
ttl: 1,
/// Which type of Zenoh instances to automatically establish sessions with upon discovery on UDP multicast.
/// Accepts a single value (e.g. autoconnect: ["router", "peer"])
/// or different values for router, peer and client (e.g. autoconnect: { router: [], peer: ["router", "peer"] }).
/// Accepts a single value (e.g. autoconnect: ["router", "peer"]) which applies whatever the configured "mode" is,
/// or different values for router, peer or client mode (e.g. autoconnect: { router: [], peer: ["router", "peer"] }).
/// Each value is a list of: "peer", "router" and/or "client".
autoconnect: { router: [], peer: ["router", "peer"] },
autoconnect: { router: [], peer: ["router", "peer"], client: ["router", "peer"] },
/// Whether or not to listen for scout messages on UDP multicast and reply to them.
listen: true,
},
Expand All @@ -146,10 +152,10 @@
/// direct connectivity with each other.
multihop: false,
/// Which type of Zenoh instances to automatically establish sessions with upon discovery on gossip.
/// Accepts a single value (e.g. autoconnect: ["router", "peer"])
/// or different values for router, peer and client (e.g. autoconnect: { router: [], peer: ["router", "peer"] }).
/// Accepts a single value (e.g. autoconnect: ["router", "peer"]) which applies whatever the configured "mode" is,
/// or different values for router, peer or client mode (e.g. autoconnect: { router: [], peer: ["router", "peer"] }).
/// Each value is a list of: "peer", "router" and/or "client".
autoconnect: { router: [], peer: ["router", "peer"] },
autoconnect: { router: [], peer: ["router", "peer"], client: ["router", "peer"] },
},
},

Expand Down Expand Up @@ -184,6 +190,27 @@
},
},

// /// Overwrite QoS options for Zenoh messages by key expression (ignores Zenoh API QoS config for overwritten values)
// qos: {
// /// Overwrite QoS options for PUT and DELETE messages
// publication: [
// {
// /// PUT and DELETE messages on key expressions that are included by these key expressions
// /// will have their QoS options overwritten by the given config.
// key_exprs: ["demo/**", "example/key"],
// /// Configurations that will be applied on the publisher.
// /// Options that are supplied here will overwrite the configuration given in Zenoh API
// config: {
// congestion_control: "block",
// priority: "data_high",
// express: true,
// reliability: "best_effort",
// allowed_destination: "remote",
// },
// },
// ],
// },

// /// The declarations aggregation strategy.
// aggregation: {
// /// A list of key-expressions for which all included subscribers will be aggregated into.
Expand Down Expand Up @@ -473,7 +500,22 @@
// If set to true, links that require certificates (tls/quic) will automatically disconnect when the time of expiration of the remote certificate chain is reached
// note that mTLS (client authentication) is required for a listener to disconnect a client on expiration
close_link_on_expiration: false,
/// Optional configuration for TCP system buffers sizes for TLS links
///
/// Configure TCP read buffer size (bytes)
// so_rcvbuf: 123456,
/// Configure TCP write buffer size (bytes)
// so_sndbuf: 123456,
},
// // Configure optional TCP link specific parameters
// tcp: {
// /// Optional configuration for TCP system buffers sizes for TCP links
// ///
// /// Configure TCP read buffer size (bytes)
// // so_rcvbuf: 123456,
// /// Configure TCP write buffer size (bytes)
// // so_sndbuf: 123456,
// }
},
/// Shared memory configuration.
/// NOTE: shared memory can be used only if zenoh is compiled with "shared-memory" feature, otherwise
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ Zenoh's router is built as `target/release/zenohd`. All the examples are built i
* run the Zenoh router with permission to perform config changes via the admin space, and with a memory storage:

```sh
./target/release/zenohd --adminspace-permissions=rw --cfg='plugins/storage_manager/storages/demo:{key_expr:"demo/example/**",volume:"memory"}'
./target/release/zenohd --rest-http-port=8000 --adminspace-permissions=rw --cfg='plugins/storage_manager/storages/demo:{key_expr:"demo/example/**",volume:"memory"}'
```

* in another shell, get info of the zenoh router via the zenoh admin space (you may use `jq` for pretty json formatting):
Expand Down Expand Up @@ -246,11 +246,11 @@ By default the Zenoh router is delivered or built with 2 plugins. These may be c
> [!WARNING]
> Since `v0.6`, `zenohd` no longer loads every available plugin at startup. Instead, only configured plugins are loaded (after processing `--cfg` and `--plugin` options). Once `zenohd` is running, plugins can be hot-loaded and, if they support it, reconfigured at runtime by editing their configuration through the adminspace.

Note that the REST plugin is added to the configuration by the default value of the `--rest-http-port` CLI argument.

**[REST plugin](https://zenoh.io/docs/manual/plugin-http/)** (exposing a REST API):
This plugin converts GET and PUT REST requests into Zenoh gets and puts respectively.

Note that to activate the REST plugin on `zenohd` the CLI argument should be passed: `--rest-http-port=8000` (or any other port of your choice).

**[Storages plugin](https://zenoh.io/docs/manual/plugin-storage-manager/)** (managing [backends and storages](https://zenoh.io/docs/manual/plugin-storage-manager/#backends-and-volumes))
This plugin allows you to easily define storages. These will store key-value pairs they subscribed to, and send the most recent ones when queried. Check out [DEFAULT_CONFIG.json5](DEFAULT_CONFIG.json5) for info on how to configure them.

Expand Down
38 changes: 36 additions & 2 deletions commons/zenoh-codec/src/transport/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ where
more,
sn,
ext_qos,
ext_first,
ext_drop,
} = x;

// Header
Expand All @@ -49,7 +51,10 @@ where
if *more {
header |= flag::M;
}
if ext_qos != &ext::QoSType::DEFAULT {
let mut n_exts = (ext_qos != &ext::QoSType::DEFAULT) as u8
+ ext_first.is_some() as u8
+ ext_drop.is_some() as u8;
if n_exts != 0 {
header |= flag::Z;
}
self.write(&mut *writer, header)?;
Expand All @@ -59,7 +64,16 @@ where

// Extensions
if ext_qos != &ext::QoSType::DEFAULT {
self.write(&mut *writer, (*ext_qos, false))?;
n_exts -= 1;
self.write(&mut *writer, (*ext_qos, n_exts != 0))?;
}
if let Some(first) = ext_first {
n_exts -= 1;
self.write(&mut *writer, (first, n_exts != 0))?
}
if let Some(drop) = ext_drop {
n_exts -= 1;
self.write(&mut *writer, (drop, n_exts != 0))?
}

Ok(())
Expand Down Expand Up @@ -99,6 +113,8 @@ where

// Extensions
let mut ext_qos = ext::QoSType::DEFAULT;
let mut ext_first = None;
let mut ext_drop = None;

let mut has_ext = imsg::has_flag(self.header, flag::Z);
while has_ext {
Expand All @@ -110,6 +126,16 @@ where
ext_qos = q;
has_ext = ext;
}
ext::First::ID => {
let (first, ext): (ext::First, bool) = eodec.read(&mut *reader)?;
ext_first = Some(first);
has_ext = ext;
}
ext::Drop::ID => {
let (drop, ext): (ext::Drop, bool) = eodec.read(&mut *reader)?;
ext_drop = Some(drop);
has_ext = ext;
}
_ => {
has_ext = extension::skip(reader, "Fragment", ext)?;
}
Expand All @@ -121,6 +147,8 @@ where
more,
sn,
ext_qos,
ext_first,
ext_drop,
})
}
}
Expand All @@ -139,6 +167,8 @@ where
sn,
payload,
ext_qos,
ext_first,
ext_drop,
} = x;

// Header
Expand All @@ -147,6 +177,8 @@ where
more: *more,
sn: *sn,
ext_qos: *ext_qos,
ext_first: *ext_first,
ext_drop: *ext_drop,
};
self.write(&mut *writer, &header)?;

Expand Down Expand Up @@ -185,6 +217,8 @@ where
more: header.more,
sn: header.sn,
ext_qos: header.ext_qos,
ext_first: header.ext_first,
ext_drop: header.ext_drop,
payload,
})
}
Expand Down
30 changes: 28 additions & 2 deletions commons/zenoh-codec/src/transport/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ where
ext_mlink,
ext_lowlatency,
ext_compression,
ext_patch,
} = x;

// Header
Expand All @@ -64,7 +65,8 @@ where
+ (ext_auth.is_some() as u8)
+ (ext_mlink.is_some() as u8)
+ (ext_lowlatency.is_some() as u8)
+ (ext_compression.is_some() as u8);
+ (ext_compression.is_some() as u8)
+ (*ext_patch != ext::PatchType::NONE) as u8;

#[cfg(feature = "shared-memory")]
{
Expand Down Expand Up @@ -125,6 +127,10 @@ where
n_exts -= 1;
self.write(&mut *writer, (compression, n_exts != 0))?;
}
if *ext_patch != ext::PatchType::NONE {
n_exts -= 1;
self.write(&mut *writer, (*ext_patch, n_exts != 0))?;
}

Ok(())
}
Expand Down Expand Up @@ -186,6 +192,7 @@ where
let mut ext_mlink = None;
let mut ext_lowlatency = None;
let mut ext_compression = None;
let mut ext_patch = ext::PatchType::NONE;

let mut has_ext = imsg::has_flag(self.header, flag::Z);
while has_ext {
Expand Down Expand Up @@ -228,6 +235,11 @@ where
ext_compression = Some(q);
has_ext = ext;
}
ext::Patch::ID => {
let (p, ext): (ext::PatchType, bool) = eodec.read(&mut *reader)?;
ext_patch = p;
has_ext = ext;
}
_ => {
has_ext = extension::skip(reader, "InitSyn", ext)?;
}
Expand All @@ -248,6 +260,7 @@ where
ext_mlink,
ext_lowlatency,
ext_compression,
ext_patch,
})
}
}
Expand Down Expand Up @@ -275,6 +288,7 @@ where
ext_mlink,
ext_lowlatency,
ext_compression,
ext_patch,
} = x;

// Header
Expand All @@ -287,7 +301,8 @@ where
+ (ext_auth.is_some() as u8)
+ (ext_mlink.is_some() as u8)
+ (ext_lowlatency.is_some() as u8)
+ (ext_compression.is_some() as u8);
+ (ext_compression.is_some() as u8)
+ (*ext_patch != ext::PatchType::NONE) as u8;

#[cfg(feature = "shared-memory")]
{
Expand Down Expand Up @@ -351,6 +366,10 @@ where
n_exts -= 1;
self.write(&mut *writer, (compression, n_exts != 0))?;
}
if *ext_patch != ext::PatchType::NONE {
n_exts -= 1;
self.write(&mut *writer, (*ext_patch, n_exts != 0))?;
}

Ok(())
}
Expand Down Expand Up @@ -415,6 +434,7 @@ where
let mut ext_mlink = None;
let mut ext_lowlatency = None;
let mut ext_compression = None;
let mut ext_patch = ext::PatchType::NONE;

let mut has_ext = imsg::has_flag(self.header, flag::Z);
while has_ext {
Expand Down Expand Up @@ -457,6 +477,11 @@ where
ext_compression = Some(q);
has_ext = ext;
}
ext::Patch::ID => {
let (p, ext): (ext::PatchType, bool) = eodec.read(&mut *reader)?;
ext_patch = p;
has_ext = ext;
}
_ => {
has_ext = extension::skip(reader, "InitAck", ext)?;
}
Expand All @@ -478,6 +503,7 @@ where
ext_mlink,
ext_lowlatency,
ext_compression,
ext_patch,
})
}
}
Loading

0 comments on commit 8bb1796

Please sign in to comment.