From a312380c235d9db48e5c8a84c5a962d299c18f01 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Wed, 15 Nov 2023 20:38:51 +0100 Subject: [PATCH 1/9] Add publication cache and querying subscriber --- Cargo.toml | 1 + Cargo.toml.in | 1 + examples/z_pub_cache.c | 78 ++++++++++++ examples/z_query_sub.c | 88 ++++++++++++++ include/zenoh_commons.h | 175 +++++++++++++++++++++++++++ include/zenoh_macros.h | 85 ++++++------- src/commons.rs | 65 ++++++++++ src/config.rs | 2 +- src/lib.rs | 4 + src/publication_cache.rs | 191 +++++++++++++++++++++++++++++ src/querying_subscriber.rs | 238 +++++++++++++++++++++++++++++++++++++ 11 files changed, 886 insertions(+), 42 deletions(-) create mode 100644 examples/z_pub_cache.c create mode 100644 examples/z_query_sub.c create mode 100644 src/publication_cache.rs create mode 100644 src/querying_subscriber.rs diff --git a/Cargo.toml b/Cargo.toml index 47af03b38..5b31508cf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,6 +50,7 @@ spin = "0.9.5" zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "master", features = [ "shared-memory", "unstable" ] } zenoh-protocol = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "master", features = [ "shared-memory" ] } zenoh-util = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "master" } +zenoh-ext = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "master", features = [ "unstable" ]} [build-dependencies] cbindgen = "0.24.3" diff --git a/Cargo.toml.in b/Cargo.toml.in index fd810dbab..0eab1c852 100644 --- a/Cargo.toml.in +++ b/Cargo.toml.in @@ -50,6 +50,7 @@ spin = "0.9.5" zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "master", features = [ "shared-memory", "unstable" ] } zenoh-protocol = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "master", features = [ "shared-memory" ] } zenoh-util = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "master" } +zenoh-ext = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "master", features = [ "unstable" ]} [build-dependencies] cbindgen = "0.24.3" diff --git a/examples/z_pub_cache.c b/examples/z_pub_cache.c new file mode 100644 index 000000000..910e07f81 --- /dev/null +++ b/examples/z_pub_cache.c @@ -0,0 +1,78 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +#include +#include + +#include "zenoh.h" +#if defined(WIN32) || defined(_WIN32) || defined(__WIN32) && !defined(__CYGWIN__) +#include +#define sleep(x) Sleep(x * 1000) +#else +#include +#endif + +int main(int argc, char **argv) { + char *keyexpr = "demo/example/zenoh-c-pub"; + char *value = "Pub from C!"; + + if (argc > 1) keyexpr = argv[1]; + if (argc > 2) value = argv[2]; + + z_owned_config_t config = z_config_default(); + if (argc > 3) { + if (zc_config_insert_json(z_loan(config), Z_CONFIG_CONNECT_KEY, argv[3]) < 0) { + printf( + "Couldn't insert value `%s` in configuration at `%s`. This is likely because `%s` expects a " + "JSON-serialized list of strings\n", + argv[3], Z_CONFIG_CONNECT_KEY, Z_CONFIG_CONNECT_KEY); + exit(-1); + } + } + + if (zc_config_insert_json(z_loan(config), Z_CONFIG_ADD_TIMESTAMP_KEY, "true") < 0) { + printf("Unable to configure timestamps!\n"); + exit(-1); + } + + printf("Opening session...\n"); + z_owned_session_t s = z_open(z_move(config)); + if (!z_check(s)) { + printf("Unable to open session!\n"); + exit(-1); + } + + ze_publication_cache_options_t pub_cache_opts = ze_publication_cache_options_default(); + pub_cache_opts.history = 42; + + printf("Declaring publication cache on '%s'...\n", keyexpr); + ze_owned_publication_cache_t pub_cache = + ze_declare_publication_cache(z_loan(s), z_keyexpr(keyexpr), &pub_cache_opts); + if (!ze_publication_cache_check(&pub_cache)) { + printf("Unable to declare publication cache for key expression!\n"); + exit(-1); + } + + char buf[256]; + for (int idx = 0; 1; ++idx) { + sleep(1); + sprintf(buf, "[%4d] %s", idx, value); + printf("Putting Data ('%s': '%s')...\n", keyexpr, buf); + z_put(z_loan(s), z_keyexpr(keyexpr), (const uint8_t *)buf, strlen(buf), NULL); + } + + ze_close_publication_cache(z_move(pub_cache)); + + z_close(z_move(s)); + return 0; +} diff --git a/examples/z_query_sub.c b/examples/z_query_sub.c new file mode 100644 index 000000000..ad3306a61 --- /dev/null +++ b/examples/z_query_sub.c @@ -0,0 +1,88 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +#include +#if defined(WIN32) || defined(_WIN32) || defined(__WIN32) && !defined(__CYGWIN__) +#include +#define sleep(x) Sleep(x * 1000) +#else +#include +#endif +#include "zenoh.h" + +const char *kind_to_str(z_sample_kind_t kind); + +void data_handler(const z_sample_t *sample, void *arg) { + z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); + printf(">> [Subscriber] Received %s ('%s': '%.*s')\n", kind_to_str(sample->kind), z_loan(keystr), + (int)sample->payload.len, sample->payload.start); + z_drop(z_move(keystr)); +} + +int main(int argc, char **argv) { + char *expr = "demo/example/**"; + if (argc > 1) { + expr = argv[1]; + } + + z_owned_config_t config = z_config_default(); + if (argc > 2) { + if (zc_config_insert_json(z_loan(config), Z_CONFIG_LISTEN_KEY, argv[2]) < 0) { + printf( + "Couldn't insert value `%s` in configuration at `%s`. This is likely because `%s` expects a " + "JSON-serialized list of strings\n", + argv[2], Z_CONFIG_LISTEN_KEY, Z_CONFIG_LISTEN_KEY); + exit(-1); + } + } + + printf("Opening session...\n"); + z_owned_session_t s = z_open(z_move(config)); + if (!z_check(s)) { + printf("Unable to open session!\n"); + exit(-1); + } + + z_owned_closure_sample_t callback = z_closure(data_handler); + printf("Declaring querying subscriber on '%s'...\n", expr); + ze_owned_querying_subscriber_t sub = + ze_declare_querying_subscriber(z_loan(s), z_keyexpr(expr), z_move(callback), NULL); + if (!z_check(sub)) { + printf("Unable to declare querying subscriber.\n"); + exit(-1); + } + + printf("Enter 'q' to quit...\n"); + char c = 0; + while (c != 'q') { + c = getchar(); + if (c == -1) { + sleep(1); + } + } + + z_drop(z_move(sub)); + z_close(z_move(s)); + return 0; +} + +const char *kind_to_str(z_sample_kind_t kind) { + switch (kind) { + case Z_SAMPLE_KIND_PUT: + return "PUT"; + case Z_SAMPLE_KIND_DELETE: + return "DELETE"; + default: + return "UNKNOWN"; + } +} diff --git a/include/zenoh_commons.h b/include/zenoh_commons.h index aecd40786..d9acebe95 100644 --- a/include/zenoh_commons.h +++ b/include/zenoh_commons.h @@ -139,6 +139,15 @@ typedef enum z_sample_kind_t { Z_SAMPLE_KIND_PUT = 0, Z_SAMPLE_KIND_DELETE = 1, } z_sample_kind_t; +typedef enum zc_locality_t { + ZC_LOCALITY_ANY = 0, + ZC_LOCALITY_SESSION_LOCAL = 1, + ZC_LOCALITY_REMOTE = 2, +} zc_locality_t; +typedef enum zc_reply_keyexpr_t { + ZC_REPLY_KEYEXPR_ANY = 0, + ZC_REPLY_KEYEXPR_MATCHING_QUERY = 1, +} zc_reply_keyexpr_t; /** * An array of bytes. */ @@ -717,6 +726,68 @@ typedef struct zc_owned_shmbuf_t { typedef struct zc_owned_shm_manager_t { uintptr_t _0; } zc_owned_shm_manager_t; +/** + * An owned zenoh publication_cache. + * + * Like most `z_owned_X_t` types, you may obtain an instance of `z_X_t` by loaning it using `z_X_loan(&val)`. + * The `z_loan(val)` macro, available if your compiler supports C11's `_Generic`, is equivalent to writing `z_X_loan(&val)`. + * + * Like all `z_owned_X_t`, an instance will be destroyed by any function which takes a mutable pointer to said instance, as this implies the instance's inners were moved. + * To make this fact more obvious when reading your code, consider using `z_move(val)` instead of `&val` as the argument. + * After a move, `val` will still exist, but will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your `val` is valid. + * + * To check if `val` is still valid, you may use `z_X_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid. + */ +typedef struct ze_owned_publication_cache_t { + uintptr_t _0[1]; +} ze_owned_publication_cache_t; +/** + * Options passed to the :c:func:`ze_declare_publication_cache` function. + * + * Members: + * queryable_prefix: the prefix used for queryable + * queryable_origin: the restriction for the matching queries that will be receive by this + * publication cache + * history: the the history size + * resources_limit: the limit number of cached resources + */ +typedef struct ze_publication_cache_options_t { + struct z_keyexpr_t queryable_prefix; + enum zc_locality_t queryable_origin; + uintptr_t history; + uintptr_t resources_limit; +} ze_publication_cache_options_t; +/** + * An owned zenoh querying subscriber. Destroying the subscriber cancels the subscription. + * + * Like most `ze_owned_X_t` types, you may obtain an instance of `z_X_t` by loaning it using `z_X_loan(&val)`. + * The `z_loan(val)` macro, available if your compiler supports C11's `_Generic`, is equivalent to writing `z_X_loan(&val)`. + * + * Like all `ze_owned_X_t`, an instance will be destroyed by any function which takes a mutable pointer to said instance, as this implies the instance's inners were moved. + * To make this fact more obvious when reading your code, consider using `z_move(val)` instead of `&val` as the argument. + * After a move, `val` will still exist, but will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your `val` is valid. + * + * To check if `val` is still valid, you may use `z_X_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid. + */ +typedef struct ze_owned_querying_subscriber_t { + uintptr_t _0[1]; +} ze_owned_querying_subscriber_t; +/** + * Represents the set of options that can be applied to a querying subscriber, + * upon its declaration via :c:func:`ze_declare_querying_subscriber`. + * + * Members: + * z_reliability_t reliability: The subscription reliability. + */ +typedef struct ze_querying_subscriber_options_t { + enum z_reliability_t reliability; + enum zc_locality_t allowed_origin; + struct z_keyexpr_t query_selector; + enum z_query_target_t query_target; + struct z_query_consolidation_t query_consolidation; + enum zc_reply_keyexpr_t query_accept_replies; + uint64_t query_timeout_ms; +} ze_querying_subscriber_options_t; ZENOHC_API extern const unsigned int Z_ROUTER; ZENOHC_API extern const unsigned int Z_PEER; ZENOHC_API extern const unsigned int Z_CLIENT; @@ -1800,6 +1871,7 @@ ZENOHC_API struct zc_owned_liveliness_token_t zc_liveliness_token_null(void); * Destroys a liveliness token, notifying subscribers of its destruction. */ ZENOHC_API void zc_liveliness_undeclare_token(struct zc_owned_liveliness_token_t *token); +ZENOHC_API enum zc_locality_t zc_locality_default(void); /** * Returns `false` if `payload` is the gravestone value. */ @@ -1872,6 +1944,7 @@ int8_t zc_put_owned(struct z_session_t session, */ ZENOHC_API struct z_owned_reply_channel_t zc_reply_fifo_new(uintptr_t bound); +ZENOHC_API enum zc_reply_keyexpr_t zc_reply_keyexpr_default(void); /** * Creates a new non-blocking fifo channel, returned as a pair of closures. * @@ -1966,3 +2039,105 @@ ZENOHC_API uint8_t *zc_shmbuf_ptr(const struct zc_owned_shmbuf_t *buf); ZENOHC_API void zc_shmbuf_set_length(const struct zc_owned_shmbuf_t *buf, uintptr_t len); +/** + * Closes the given :c:type:`ze_owned_publication_cache_t`, droping it and invalidating it for double-drop safety. + */ +ZENOHC_API +int8_t ze_close_publication_cache(struct ze_owned_publication_cache_t *pub_cache); +/** + * Declares a publication cache. + * + * Parameters: + * session: the zenoh session. + * keyexpr: the key expression to publish. + * options: additional options for the publication_cache. + * + * Returns: + * A :c:type:`ze_owned_publication_cache_t`. + * + * + * Example: + * Declaring a publication cache `NULL` for the options: + * + * .. code-block:: C + * + * ze_owned_publication_cache_t pub_cache = ze_declare_publication_cache(z_loan(s), z_keyexpr(expr), NULL); + * + * is equivalent to initializing and passing the default publication cache options: + * + * .. code-block:: C + * + * ze_publication_cache_options_t opts = ze_publication_cache_options_default(); + * ze_owned_publication_cache_t pub_cache = ze_declare_publication_cache(z_loan(s), z_keyexpr(expr), &opts); + */ +ZENOHC_API +struct ze_owned_publication_cache_t ze_declare_publication_cache(struct z_session_t session, + struct z_keyexpr_t keyexpr, + const struct ze_publication_cache_options_t *options); +/** + * Declares a querying subscriber for a given key expression. + * + * Parameters: + * session: The zenoh session. + * keyexpr: The key expression to subscribe. + * callback: The callback function that will be called each time a data matching the subscribed expression is received. + * opts: additional options for the querying subscriber. + * + * Returns: + * A :c:type:`ze_owned_subscriber_t`. + * + * To check if the subscription succeeded and if the querying subscriber is still valid, + * you may use `ze_querying_subscriber_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid. + * + * Like all `ze_owned_X_t`, an instance will be destroyed by any function which takes a mutable pointer to said instance, as this implies the instance's inners were moved. + * To make this fact more obvious when reading your code, consider using `z_move(val)` instead of `&val` as the argument. + * After a move, `val` will still exist, but will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your `val` is valid. + * + * Example: + * Declaring a subscriber passing ``NULL`` for the options: + * + * .. code-block:: C + * + * ze_owned_subscriber_t sub = ze_declare_querying_subscriber(z_loan(s), z_keyexpr(expr), callback, NULL); + * + * is equivalent to initializing and passing the default subscriber options: + * + * .. code-block:: C + * + * z_subscriber_options_t opts = z_subscriber_options_default(); + * ze_owned_subscriber_t sub = ze_declare_querying_subscriber(z_loan(s), z_keyexpr(expr), callback, &opts); + */ +ZENOHC_API +struct ze_owned_querying_subscriber_t ze_declare_querying_subscriber(struct z_session_t session, + struct z_keyexpr_t keyexpr, + struct z_owned_closure_sample_t *callback, + const struct ze_querying_subscriber_options_t *options); +/** + * Returns ``true`` if `pub_cache` is valid. + */ +ZENOHC_API bool ze_publication_cache_check(const struct ze_owned_publication_cache_t *pub_cache); +/** + * Constructs a null safe-to-drop value of 'ze_owned_publication_cache_t' type + */ +ZENOHC_API struct ze_owned_publication_cache_t ze_publication_cache_null(void); +/** + * Constructs the default value for :c:type:`ze_publication_cache_options_t`. + */ +ZENOHC_API struct ze_publication_cache_options_t ze_publication_cache_options_default(void); +/** + * Returns ``true`` if `sub` is valid. + */ +ZENOHC_API bool ze_querying_subscriber_check(const struct ze_owned_querying_subscriber_t *sub); +/** + * Constructs a null safe-to-drop value of 'ze_owned_querying_subscriber_t' type + */ +ZENOHC_API struct ze_owned_querying_subscriber_t ze_querying_subscriber_null(void); +/** + * Constructs the default value for :c:type:`ze_querying_subscriber_options_t`. + */ +ZENOHC_API struct ze_querying_subscriber_options_t ze_querying_subscriber_options_default(void); +/** + * Undeclares the given :c:type:`ze_owned_querying_subscriber_t`, droping it and invalidating it for double-drop safety. + */ +ZENOHC_API +int8_t ze_undeclare_querying_subscriber(struct ze_owned_querying_subscriber_t *sub); diff --git a/include/zenoh_macros.h b/include/zenoh_macros.h index 2a2c23e51..f5a81781c 100644 --- a/include/zenoh_macros.h +++ b/include/zenoh_macros.h @@ -16,29 +16,31 @@ )(&x) #define z_drop(x) \ - _Generic((x), z_owned_session_t * : z_close, \ - z_owned_publisher_t * : z_undeclare_publisher, \ - z_owned_keyexpr_t * : z_keyexpr_drop, \ - z_owned_config_t * : z_config_drop, \ - z_owned_scouting_config_t * : z_scouting_config_drop, \ - z_owned_pull_subscriber_t * : z_undeclare_pull_subscriber, \ - z_owned_subscriber_t * : z_undeclare_subscriber, \ - z_owned_queryable_t * : z_undeclare_queryable, \ - z_owned_encoding_t * : z_encoding_drop, \ - z_owned_reply_t * : z_reply_drop, \ - z_owned_hello_t * : z_hello_drop, \ - z_owned_str_t * : z_str_drop, \ - z_owned_closure_sample_t * : z_closure_sample_drop, \ - z_owned_closure_query_t * : z_closure_query_drop, \ - z_owned_closure_reply_t * : z_closure_reply_drop, \ - z_owned_closure_hello_t * : z_closure_hello_drop, \ - z_owned_closure_zid_t * : z_closure_zid_drop, \ - z_owned_reply_channel_closure_t * : z_reply_channel_closure_drop, \ - z_owned_reply_channel_t * : z_reply_channel_drop, \ - zc_owned_payload_t * : zc_payload_drop, \ - zc_owned_shmbuf_t * : zc_shmbuf_drop, \ - zc_owned_shm_manager_t * : zc_shm_manager_drop, \ - zc_owned_liveliness_token_t * : zc_liveliness_undeclare_token \ + _Generic((x), z_owned_session_t * : z_close, \ + z_owned_publisher_t * : z_undeclare_publisher, \ + z_owned_keyexpr_t * : z_keyexpr_drop, \ + z_owned_config_t * : z_config_drop, \ + z_owned_scouting_config_t * : z_scouting_config_drop, \ + z_owned_pull_subscriber_t * : z_undeclare_pull_subscriber, \ + z_owned_subscriber_t * : z_undeclare_subscriber, \ + z_owned_queryable_t * : z_undeclare_queryable, \ + z_owned_encoding_t * : z_encoding_drop, \ + z_owned_reply_t * : z_reply_drop, \ + z_owned_hello_t * : z_hello_drop, \ + z_owned_str_t * : z_str_drop, \ + z_owned_closure_sample_t * : z_closure_sample_drop, \ + z_owned_closure_query_t * : z_closure_query_drop, \ + z_owned_closure_reply_t * : z_closure_reply_drop, \ + z_owned_closure_hello_t * : z_closure_hello_drop, \ + z_owned_closure_zid_t * : z_closure_zid_drop, \ + z_owned_reply_channel_closure_t * : z_reply_channel_closure_drop, \ + z_owned_reply_channel_t * : z_reply_channel_drop, \ + zc_owned_payload_t * : zc_payload_drop, \ + zc_owned_shmbuf_t * : zc_shmbuf_drop, \ + zc_owned_shm_manager_t * : zc_shm_manager_drop, \ + zc_owned_liveliness_token_t * : zc_liveliness_undeclare_token, \ + ze_owned_publication_cache_t* : ze_close_publication_cache, \ + ze_owned_querying_subscriber_t* : ze_undeclare_querying_subscriber \ )(x) #define z_null(x) (*x = \ @@ -68,24 +70,25 @@ )()) #define z_check(x) \ - _Generic((x), z_owned_session_t : z_session_check, \ - z_owned_publisher_t : z_publisher_check, \ - z_owned_keyexpr_t : z_keyexpr_check, \ - z_keyexpr_t : z_keyexpr_is_initialized, \ - z_owned_config_t : z_config_check, \ - z_owned_scouting_config_t : z_scouting_config_check, \ - z_bytes_t : z_bytes_check, \ - z_owned_subscriber_t : z_subscriber_check, \ - z_owned_pull_subscriber_t : z_pull_subscriber_check, \ - z_owned_queryable_t : z_queryable_check, \ - z_owned_encoding_t : z_encoding_check, \ - z_owned_reply_t : z_reply_check, \ - z_owned_hello_t : z_hello_check, \ - z_owned_str_t : z_str_check, \ - zc_owned_payload_t : zc_payload_check, \ - zc_owned_shmbuf_t : zc_shmbuf_check, \ - zc_owned_shm_manager_t : zc_shm_manager_check, \ - zc_owned_liveliness_token_t : zc_liveliness_token_check \ + _Generic((x), z_owned_session_t : z_session_check, \ + z_owned_publisher_t : z_publisher_check, \ + z_owned_keyexpr_t : z_keyexpr_check, \ + z_keyexpr_t : z_keyexpr_is_initialized, \ + z_owned_config_t : z_config_check, \ + z_owned_scouting_config_t : z_scouting_config_check, \ + z_bytes_t : z_bytes_check, \ + z_owned_subscriber_t : z_subscriber_check, \ + z_owned_pull_subscriber_t : z_pull_subscriber_check, \ + z_owned_queryable_t : z_queryable_check, \ + z_owned_encoding_t : z_encoding_check, \ + z_owned_reply_t : z_reply_check, \ + z_owned_hello_t : z_hello_check, \ + z_owned_str_t : z_str_check, \ + zc_owned_payload_t : zc_payload_check, \ + zc_owned_shmbuf_t : zc_shmbuf_check, \ + zc_owned_shm_manager_t : zc_shm_manager_check, \ + zc_owned_liveliness_token_t : zc_liveliness_token_check, \ + ze_owned_querying_subscriber_t: ze_querying_subscriber_check \ )(&x) #define z_call(x, ...) \ diff --git a/src/commons.rs b/src/commons.rs index 908dfac76..3cf70ff19 100644 --- a/src/commons.rs +++ b/src/commons.rs @@ -20,6 +20,8 @@ use libc::{c_char, c_ulong}; use zenoh::buffers::ZBuf; use zenoh::prelude::SampleKind; use zenoh::prelude::SplitBuffer; +use zenoh::query::ReplyKeyExpr; +use zenoh::sample::Locality; use zenoh::sample::Sample; use zenoh_protocol::core::Timestamp; @@ -554,3 +556,66 @@ pub extern "C" fn z_str_null() -> z_owned_str_t { pub extern "C" fn z_str_loan(s: &z_owned_str_t) -> *const libc::c_char { s._cstr } + +#[repr(C)] +#[derive(Clone, Copy, Debug)] +pub enum zc_locality_t { + ANY = 0, + SESSION_LOCAL = 1, + REMOTE = 2, +} + +impl From for zc_locality_t { + fn from(k: Locality) -> Self { + match k { + Locality::Any => zc_locality_t::ANY, + Locality::SessionLocal => zc_locality_t::SESSION_LOCAL, + Locality::Remote => zc_locality_t::REMOTE, + } + } +} + +impl From for Locality { + fn from(k: zc_locality_t) -> Self { + match k { + zc_locality_t::ANY => Locality::Any, + zc_locality_t::SESSION_LOCAL => Locality::SessionLocal, + zc_locality_t::REMOTE => Locality::Remote, + } + } +} + +#[no_mangle] +pub extern "C" fn zc_locality_default() -> zc_locality_t { + Locality::default().into() +} + +#[repr(C)] +#[derive(Clone, Copy, Debug)] +pub enum zc_reply_keyexpr_t { + ANY = 0, + MATCHING_QUERY = 1, +} + +impl From for zc_reply_keyexpr_t { + fn from(k: ReplyKeyExpr) -> Self { + match k { + ReplyKeyExpr::Any => zc_reply_keyexpr_t::ANY, + ReplyKeyExpr::MatchingQuery => zc_reply_keyexpr_t::MATCHING_QUERY, + } + } +} + +impl From for ReplyKeyExpr { + fn from(k: zc_reply_keyexpr_t) -> Self { + match k { + zc_reply_keyexpr_t::ANY => ReplyKeyExpr::Any, + zc_reply_keyexpr_t::MATCHING_QUERY => ReplyKeyExpr::MatchingQuery, + } + } +} + +#[no_mangle] +pub extern "C" fn zc_reply_keyexpr_default() -> zc_reply_keyexpr_t { + ReplyKeyExpr::default().into() +} diff --git a/src/config.rs b/src/config.rs index 378c966d3..6e259f3b5 100644 --- a/src/config.rs +++ b/src/config.rs @@ -55,7 +55,7 @@ pub static Z_CONFIG_SCOUTING_DELAY_KEY: &c_char = unsafe { &*(b"scouting/delay\0".as_ptr() as *const c_char) }; #[no_mangle] pub static Z_CONFIG_ADD_TIMESTAMP_KEY: &c_char = - unsafe { &*(b"add_timestamp\0".as_ptr() as *const c_char) }; + unsafe { &*(b"timestamping/enabled\0".as_ptr() as *const c_char) }; /// A loaned zenoh configuration. #[repr(C)] diff --git a/src/lib.rs b/src/lib.rs index cc2a7bb3d..2a4f661d3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -44,6 +44,10 @@ mod closures; pub use closures::*; mod liveliness; pub use liveliness::*; +mod publication_cache; +pub use publication_cache::*; +mod querying_subscriber; +pub use querying_subscriber::*; #[cfg(feature = "shared-memory")] mod shm; diff --git a/src/publication_cache.rs b/src/publication_cache.rs new file mode 100644 index 000000000..39f0eb9a9 --- /dev/null +++ b/src/publication_cache.rs @@ -0,0 +1,191 @@ +// +// Copyright (c) 2023 ZettaScale Technology. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh team, +// + +use std::ops::Deref; + +use zenoh_ext::SessionExt; +use zenoh_util::core::zresult::ErrNo; +use zenoh_util::core::SyncResolve; + +use crate::{ + impl_guarded_transmute, z_keyexpr_t, z_session_t, zc_locality_default, zc_locality_t, + GuardedTransmute, UninitializedKeyExprError, +}; + +/// Options passed to the :c:func:`ze_declare_publication_cache` function. +/// +/// Members: +/// queryable_prefix: the prefix used for queryable +/// queryable_origin: the restriction for the matching queries that will be receive by this +/// publication cache +/// history: the the history size +/// resources_limit: the limit number of cached resources +#[repr(C)] +pub struct ze_publication_cache_options_t { + pub queryable_prefix: z_keyexpr_t, + pub queryable_origin: zc_locality_t, + pub history: usize, + pub resources_limit: usize, +} + +/// Constructs the default value for :c:type:`ze_publication_cache_options_t`. +#[no_mangle] +pub extern "C" fn ze_publication_cache_options_default() -> ze_publication_cache_options_t { + ze_publication_cache_options_t { + queryable_prefix: z_keyexpr_t::null(), + queryable_origin: zc_locality_default(), + history: 1, + resources_limit: 0, + } +} + +type PublicationCache = Option>>; + +/// An owned zenoh publication_cache. +/// +/// Like most `z_owned_X_t` types, you may obtain an instance of `z_X_t` by loaning it using `z_X_loan(&val)`. +/// The `z_loan(val)` macro, available if your compiler supports C11's `_Generic`, is equivalent to writing `z_X_loan(&val)`. +/// +/// Like all `z_owned_X_t`, an instance will be destroyed by any function which takes a mutable pointer to said instance, as this implies the instance's inners were moved. +/// To make this fact more obvious when reading your code, consider using `z_move(val)` instead of `&val` as the argument. +/// After a move, `val` will still exist, but will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your `val` is valid. +/// +/// To check if `val` is still valid, you may use `z_X_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid. +#[repr(C)] +pub struct ze_owned_publication_cache_t([usize; 1]); + +impl_guarded_transmute!(PublicationCache, ze_owned_publication_cache_t); + +impl From for ze_owned_publication_cache_t { + fn from(val: PublicationCache) -> Self { + val.transmute() + } +} + +impl AsRef for ze_owned_publication_cache_t { + fn as_ref(&self) -> &PublicationCache { + unsafe { std::mem::transmute(self) } + } +} + +impl AsMut for ze_owned_publication_cache_t { + fn as_mut(&mut self) -> &mut PublicationCache { + unsafe { std::mem::transmute(self) } + } +} + +impl ze_owned_publication_cache_t { + pub fn new(pub_cache: zenoh_ext::PublicationCache<'static>) -> Self { + Some(Box::new(pub_cache)).into() + } + pub fn null() -> Self { + None.into() + } +} + +/// Declares a publication cache. +/// +/// Parameters: +/// session: the zenoh session. +/// keyexpr: the key expression to publish. +/// options: additional options for the publication_cache. +/// +/// Returns: +/// A :c:type:`ze_owned_publication_cache_t`. +/// +/// +/// Example: +/// Declaring a publication cache `NULL` for the options: +/// +/// .. code-block:: C +/// +/// ze_owned_publication_cache_t pub_cache = ze_declare_publication_cache(z_loan(s), z_keyexpr(expr), NULL); +/// +/// is equivalent to initializing and passing the default publication cache options: +/// +/// .. code-block:: C +/// +/// ze_publication_cache_options_t opts = ze_publication_cache_options_default(); +/// ze_owned_publication_cache_t pub_cache = ze_declare_publication_cache(z_loan(s), z_keyexpr(expr), &opts); +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub extern "C" fn ze_declare_publication_cache( + session: z_session_t, + keyexpr: z_keyexpr_t, + options: Option<&ze_publication_cache_options_t>, +) -> ze_owned_publication_cache_t { + match session.upgrade() { + Some(s) => { + let keyexpr = keyexpr.deref().as_ref().map(|s| s.clone().into_owned()); + if let Some(key_expr) = keyexpr { + let mut p = s.declare_publication_cache(key_expr); + if let Some(options) = options { + p = p.history(options.history.into()); + p = p.queryable_allowed_origin(options.queryable_origin.into()); + if options.resources_limit != 0 { + p = p.resources_limit(options.resources_limit) + } + if options.queryable_prefix.deref().is_some() { + let queryable_prefix = options + .queryable_prefix + .deref() + .as_ref() + .map(|s| s.clone().into_owned()); + if let Some(queryable_prefix) = queryable_prefix { + p = p.queryable_prefix(queryable_prefix) + } + } + } + match p.res_sync() { + Ok(publication_cache) => ze_owned_publication_cache_t::new(publication_cache), + Err(e) => { + log::error!("{}", e); + ze_owned_publication_cache_t::null() + } + } + } else { + log::error!("{}", UninitializedKeyExprError); + ze_owned_publication_cache_t::null() + } + } + None => ze_owned_publication_cache_t::null(), + } +} + +/// Constructs a null safe-to-drop value of 'ze_owned_publication_cache_t' type +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub extern "C" fn ze_publication_cache_null() -> ze_owned_publication_cache_t { + ze_owned_publication_cache_t::null() +} + +/// Returns ``true`` if `pub_cache` is valid. +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub extern "C" fn ze_publication_cache_check(pub_cache: &ze_owned_publication_cache_t) -> bool { + pub_cache.as_ref().is_some() +} + +/// Closes the given :c:type:`ze_owned_publication_cache_t`, droping it and invalidating it for double-drop safety. +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub extern "C" fn ze_close_publication_cache(pub_cache: &mut ze_owned_publication_cache_t) -> i8 { + if let Some(p) = pub_cache.as_mut().take() { + if let Err(e) = p.close().res_sync() { + log::error!("{}", e); + return e.errno().get(); + } + } + 0 +} diff --git a/src/querying_subscriber.rs b/src/querying_subscriber.rs new file mode 100644 index 000000000..6f3bb3578 --- /dev/null +++ b/src/querying_subscriber.rs @@ -0,0 +1,238 @@ +// +// Copyright (c) 2017, 2022 ZettaScale Technology. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh team, +// + +use crate::commons::*; +use crate::z_closure_sample_call; +use crate::z_owned_closure_sample_t; +use crate::z_reliability_t; +use crate::LOG_INVALID_SESSION; +use zenoh::prelude::sync::SyncResolve; +use zenoh::prelude::SessionDeclarations; +use zenoh::prelude::SplitBuffer; +use zenoh_ext::*; +use zenoh_protocol::core::SubInfo; +use zenoh_util::core::zresult::ErrNo; + +use crate::{ + impl_guarded_transmute, z_keyexpr_t, z_query_consolidation_default, z_query_consolidation_t, + z_query_target_default, z_query_target_t, z_session_t, zc_locality_default, zc_locality_t, + zc_reply_keyexpr_default, zc_reply_keyexpr_t, GuardedTransmute, +}; + +/**************************************/ +/* DECLARATION */ +/**************************************/ +type FetchingSubscriber = Option>>; + +/// An owned zenoh querying subscriber. Destroying the subscriber cancels the subscription. +/// +/// Like most `ze_owned_X_t` types, you may obtain an instance of `z_X_t` by loaning it using `z_X_loan(&val)`. +/// The `z_loan(val)` macro, available if your compiler supports C11's `_Generic`, is equivalent to writing `z_X_loan(&val)`. +/// +/// Like all `ze_owned_X_t`, an instance will be destroyed by any function which takes a mutable pointer to said instance, as this implies the instance's inners were moved. +/// To make this fact more obvious when reading your code, consider using `z_move(val)` instead of `&val` as the argument. +/// After a move, `val` will still exist, but will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your `val` is valid. +/// +/// To check if `val` is still valid, you may use `z_X_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid. +#[repr(C)] +pub struct ze_owned_querying_subscriber_t([usize; 1]); + +impl_guarded_transmute!(FetchingSubscriber, ze_owned_querying_subscriber_t); + +#[repr(C)] +#[allow(non_camel_case_types)] +pub struct ze_querying_subscriber_t<'a>(&'a ze_owned_querying_subscriber_t); + +impl From for ze_owned_querying_subscriber_t { + fn from(val: FetchingSubscriber) -> Self { + val.transmute() + } +} + +impl AsRef for ze_owned_querying_subscriber_t { + fn as_ref(&self) -> &FetchingSubscriber { + unsafe { std::mem::transmute(self) } + } +} + +impl<'a> AsRef for ze_querying_subscriber_t<'a> { + fn as_ref(&self) -> &FetchingSubscriber { + self.0.as_ref() + } +} + +impl AsMut for ze_owned_querying_subscriber_t { + fn as_mut(&mut self) -> &mut FetchingSubscriber { + unsafe { std::mem::transmute(self) } + } +} + +impl ze_owned_querying_subscriber_t { + pub fn new(sub: zenoh_ext::FetchingSubscriber<'static, ()>) -> Self { + Some(Box::new(sub)).into() + } + pub fn null() -> Self { + None.into() + } +} + +/// Constructs a null safe-to-drop value of 'ze_owned_querying_subscriber_t' type +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub extern "C" fn ze_querying_subscriber_null() -> ze_owned_querying_subscriber_t { + ze_owned_querying_subscriber_t::null() +} + +/// Represents the set of options that can be applied to a querying subscriber, +/// upon its declaration via :c:func:`ze_declare_querying_subscriber`. +/// +/// Members: +/// z_reliability_t reliability: The subscription reliability. +#[repr(C)] +#[allow(non_camel_case_types)] +pub struct ze_querying_subscriber_options_t { + reliability: z_reliability_t, + allowed_origin: zc_locality_t, + query_selector: z_keyexpr_t, + query_target: z_query_target_t, + query_consolidation: z_query_consolidation_t, + query_accept_replies: zc_reply_keyexpr_t, + query_timeout_ms: u64, +} + +/// Constructs the default value for :c:type:`ze_querying_subscriber_options_t`. +#[no_mangle] +pub extern "C" fn ze_querying_subscriber_options_default() -> ze_querying_subscriber_options_t { + ze_querying_subscriber_options_t { + reliability: SubInfo::default().reliability.into(), + allowed_origin: zc_locality_default(), + query_selector: z_keyexpr_t::null(), + query_target: z_query_target_default(), + query_consolidation: z_query_consolidation_default(), + query_accept_replies: zc_reply_keyexpr_default(), + query_timeout_ms: 0, + } +} + +/// Declares a querying subscriber for a given key expression. +/// +/// Parameters: +/// session: The zenoh session. +/// keyexpr: The key expression to subscribe. +/// callback: The callback function that will be called each time a data matching the subscribed expression is received. +/// opts: additional options for the querying subscriber. +/// +/// Returns: +/// A :c:type:`ze_owned_subscriber_t`. +/// +/// To check if the subscription succeeded and if the querying subscriber is still valid, +/// you may use `ze_querying_subscriber_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid. +/// +/// Like all `ze_owned_X_t`, an instance will be destroyed by any function which takes a mutable pointer to said instance, as this implies the instance's inners were moved. +/// To make this fact more obvious when reading your code, consider using `z_move(val)` instead of `&val` as the argument. +/// After a move, `val` will still exist, but will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your `val` is valid. +/// +/// Example: +/// Declaring a subscriber passing ``NULL`` for the options: +/// +/// .. code-block:: C +/// +/// ze_owned_subscriber_t sub = ze_declare_querying_subscriber(z_loan(s), z_keyexpr(expr), callback, NULL); +/// +/// is equivalent to initializing and passing the default subscriber options: +/// +/// .. code-block:: C +/// +/// z_subscriber_options_t opts = z_subscriber_options_default(); +/// ze_owned_subscriber_t sub = ze_declare_querying_subscriber(z_loan(s), z_keyexpr(expr), callback, &opts); +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub unsafe extern "C" fn ze_declare_querying_subscriber( + session: z_session_t, + keyexpr: z_keyexpr_t, + callback: &mut z_owned_closure_sample_t, + options: Option<&ze_querying_subscriber_options_t>, +) -> ze_owned_querying_subscriber_t { + let mut closure = z_owned_closure_sample_t::empty(); + std::mem::swap(callback, &mut closure); + + match session.upgrade() { + Some(s) => { + let mut sub = s.declare_subscriber(keyexpr).querying(); + if let Some(options) = options { + sub = sub + .reliability(options.reliability.into()) + .allowed_origin(options.allowed_origin.into()) + .query_target(options.query_target.into()) + .query_consolidation(options.query_consolidation) + .query_accept_replies(options.query_accept_replies.into()); + if options.query_selector.is_some() { + let query_selector = options + .query_selector + .as_ref() + .map(|s| s.clone().into_owned()); + if let Some(query_selector) = query_selector { + sub = sub.query_selector(query_selector) + } + } + if options.query_timeout_ms != 0 { + sub = sub + .query_timeout(std::time::Duration::from_millis(options.query_timeout_ms)); + } + } + match sub + .callback(move |sample| { + let payload = sample.payload.contiguous(); + let owner = match payload { + std::borrow::Cow::Owned(v) => zenoh::buffers::ZBuf::from(v), + _ => sample.payload.clone(), + }; + let sample = z_sample_t::new(&sample, &owner); + z_closure_sample_call(&closure, &sample) + }) + .res() + { + Ok(sub) => ze_owned_querying_subscriber_t::new(sub), + Err(e) => { + log::debug!("{}", e); + ze_owned_querying_subscriber_t::null() + } + } + } + None => { + log::debug!("{}", LOG_INVALID_SESSION); + ze_owned_querying_subscriber_t::null() + } + } +} + +/// Undeclares the given :c:type:`ze_owned_querying_subscriber_t`, droping it and invalidating it for double-drop safety. +#[allow(clippy::missing_safety_doc)] +#[no_mangle] +pub extern "C" fn ze_undeclare_querying_subscriber(sub: &mut ze_owned_querying_subscriber_t) -> i8 { + if let Some(s) = sub.as_mut().take() { + if let Err(e) = s.close().res_sync() { + log::warn!("{}", e); + return e.errno().get(); + } + } + 0 +} + +/// Returns ``true`` if `sub` is valid. +#[allow(clippy::missing_safety_doc)] +#[no_mangle] +pub extern "C" fn ze_querying_subscriber_check(sub: &ze_owned_querying_subscriber_t) -> bool { + sub.as_ref().is_some() +} From a47016a413e1cff4e3f42b73a7a85e78c289697f Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Wed, 15 Nov 2023 20:55:12 +0100 Subject: [PATCH 2/9] Fix test --- tests/z_api_constants.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/z_api_constants.c b/tests/z_api_constants.c index bddb4c488..2c9a5e6fc 100644 --- a/tests/z_api_constants.c +++ b/tests/z_api_constants.c @@ -49,5 +49,5 @@ int main(int argc, char **argv) { printf("Z_CONFIG_SCOUTING_DELAY_KEY: %s\n", Z_CONFIG_SCOUTING_DELAY_KEY); assert(strcmp(Z_CONFIG_SCOUTING_DELAY_KEY, "scouting/delay") == 0); printf("Z_CONFIG_ADD_TIMESTAMP_KEY: %s\n", Z_CONFIG_ADD_TIMESTAMP_KEY); - assert(strcmp(Z_CONFIG_ADD_TIMESTAMP_KEY, "add_timestamp") == 0); -} \ No newline at end of file + assert(strcmp(Z_CONFIG_ADD_TIMESTAMP_KEY, "timestamping/enabled") == 0); +} From 32b6d8986351357b950b2200a434f445b011f247 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Thu, 16 Nov 2023 21:26:31 +0100 Subject: [PATCH 3/9] Rename unstable methods with the prefix zcu_ --- include/zenoh_commons.h | 28 +++++++++++++------------- src/commons.rs | 40 +++++++++++++++++++------------------- src/publication_cache.rs | 6 +++--- src/querying_subscriber.rs | 12 ++++++------ 4 files changed, 43 insertions(+), 43 deletions(-) diff --git a/include/zenoh_commons.h b/include/zenoh_commons.h index d9acebe95..840bd2b0e 100644 --- a/include/zenoh_commons.h +++ b/include/zenoh_commons.h @@ -139,15 +139,15 @@ typedef enum z_sample_kind_t { Z_SAMPLE_KIND_PUT = 0, Z_SAMPLE_KIND_DELETE = 1, } z_sample_kind_t; -typedef enum zc_locality_t { - ZC_LOCALITY_ANY = 0, - ZC_LOCALITY_SESSION_LOCAL = 1, - ZC_LOCALITY_REMOTE = 2, -} zc_locality_t; -typedef enum zc_reply_keyexpr_t { - ZC_REPLY_KEYEXPR_ANY = 0, - ZC_REPLY_KEYEXPR_MATCHING_QUERY = 1, -} zc_reply_keyexpr_t; +typedef enum zcu_locality_t { + ZCU_LOCALITY_ANY = 0, + ZCU_LOCALITY_SESSION_LOCAL = 1, + ZCU_LOCALITY_REMOTE = 2, +} zcu_locality_t; +typedef enum zcu_reply_keyexpr_t { + ZCU_REPLY_KEYEXPR_ANY = 0, + ZCU_REPLY_KEYEXPR_MATCHING_QUERY = 1, +} zcu_reply_keyexpr_t; /** * An array of bytes. */ @@ -753,7 +753,7 @@ typedef struct ze_owned_publication_cache_t { */ typedef struct ze_publication_cache_options_t { struct z_keyexpr_t queryable_prefix; - enum zc_locality_t queryable_origin; + enum zcu_locality_t queryable_origin; uintptr_t history; uintptr_t resources_limit; } ze_publication_cache_options_t; @@ -781,11 +781,11 @@ typedef struct ze_owned_querying_subscriber_t { */ typedef struct ze_querying_subscriber_options_t { enum z_reliability_t reliability; - enum zc_locality_t allowed_origin; + enum zcu_locality_t allowed_origin; struct z_keyexpr_t query_selector; enum z_query_target_t query_target; struct z_query_consolidation_t query_consolidation; - enum zc_reply_keyexpr_t query_accept_replies; + enum zcu_reply_keyexpr_t query_accept_replies; uint64_t query_timeout_ms; } ze_querying_subscriber_options_t; ZENOHC_API extern const unsigned int Z_ROUTER; @@ -1871,7 +1871,6 @@ ZENOHC_API struct zc_owned_liveliness_token_t zc_liveliness_token_null(void); * Destroys a liveliness token, notifying subscribers of its destruction. */ ZENOHC_API void zc_liveliness_undeclare_token(struct zc_owned_liveliness_token_t *token); -ZENOHC_API enum zc_locality_t zc_locality_default(void); /** * Returns `false` if `payload` is the gravestone value. */ @@ -1944,7 +1943,6 @@ int8_t zc_put_owned(struct z_session_t session, */ ZENOHC_API struct z_owned_reply_channel_t zc_reply_fifo_new(uintptr_t bound); -ZENOHC_API enum zc_reply_keyexpr_t zc_reply_keyexpr_default(void); /** * Creates a new non-blocking fifo channel, returned as a pair of closures. * @@ -2039,6 +2037,8 @@ ZENOHC_API uint8_t *zc_shmbuf_ptr(const struct zc_owned_shmbuf_t *buf); ZENOHC_API void zc_shmbuf_set_length(const struct zc_owned_shmbuf_t *buf, uintptr_t len); +ZENOHC_API enum zcu_locality_t zcu_locality_default(void); +ZENOHC_API enum zcu_reply_keyexpr_t zcu_reply_keyexpr_default(void); /** * Closes the given :c:type:`ze_owned_publication_cache_t`, droping it and invalidating it for double-drop safety. */ diff --git a/src/commons.rs b/src/commons.rs index 3cf70ff19..c5fbf3950 100644 --- a/src/commons.rs +++ b/src/commons.rs @@ -559,63 +559,63 @@ pub extern "C" fn z_str_loan(s: &z_owned_str_t) -> *const libc::c_char { #[repr(C)] #[derive(Clone, Copy, Debug)] -pub enum zc_locality_t { +pub enum zcu_locality_t { ANY = 0, SESSION_LOCAL = 1, REMOTE = 2, } -impl From for zc_locality_t { +impl From for zcu_locality_t { fn from(k: Locality) -> Self { match k { - Locality::Any => zc_locality_t::ANY, - Locality::SessionLocal => zc_locality_t::SESSION_LOCAL, - Locality::Remote => zc_locality_t::REMOTE, + Locality::Any => zcu_locality_t::ANY, + Locality::SessionLocal => zcu_locality_t::SESSION_LOCAL, + Locality::Remote => zcu_locality_t::REMOTE, } } } -impl From for Locality { - fn from(k: zc_locality_t) -> Self { +impl From for Locality { + fn from(k: zcu_locality_t) -> Self { match k { - zc_locality_t::ANY => Locality::Any, - zc_locality_t::SESSION_LOCAL => Locality::SessionLocal, - zc_locality_t::REMOTE => Locality::Remote, + zcu_locality_t::ANY => Locality::Any, + zcu_locality_t::SESSION_LOCAL => Locality::SessionLocal, + zcu_locality_t::REMOTE => Locality::Remote, } } } #[no_mangle] -pub extern "C" fn zc_locality_default() -> zc_locality_t { +pub extern "C" fn zcu_locality_default() -> zcu_locality_t { Locality::default().into() } #[repr(C)] #[derive(Clone, Copy, Debug)] -pub enum zc_reply_keyexpr_t { +pub enum zcu_reply_keyexpr_t { ANY = 0, MATCHING_QUERY = 1, } -impl From for zc_reply_keyexpr_t { +impl From for zcu_reply_keyexpr_t { fn from(k: ReplyKeyExpr) -> Self { match k { - ReplyKeyExpr::Any => zc_reply_keyexpr_t::ANY, - ReplyKeyExpr::MatchingQuery => zc_reply_keyexpr_t::MATCHING_QUERY, + ReplyKeyExpr::Any => zcu_reply_keyexpr_t::ANY, + ReplyKeyExpr::MatchingQuery => zcu_reply_keyexpr_t::MATCHING_QUERY, } } } -impl From for ReplyKeyExpr { - fn from(k: zc_reply_keyexpr_t) -> Self { +impl From for ReplyKeyExpr { + fn from(k: zcu_reply_keyexpr_t) -> Self { match k { - zc_reply_keyexpr_t::ANY => ReplyKeyExpr::Any, - zc_reply_keyexpr_t::MATCHING_QUERY => ReplyKeyExpr::MatchingQuery, + zcu_reply_keyexpr_t::ANY => ReplyKeyExpr::Any, + zcu_reply_keyexpr_t::MATCHING_QUERY => ReplyKeyExpr::MatchingQuery, } } } #[no_mangle] -pub extern "C" fn zc_reply_keyexpr_default() -> zc_reply_keyexpr_t { +pub extern "C" fn zcu_reply_keyexpr_default() -> zcu_reply_keyexpr_t { ReplyKeyExpr::default().into() } diff --git a/src/publication_cache.rs b/src/publication_cache.rs index 39f0eb9a9..072ea4e49 100644 --- a/src/publication_cache.rs +++ b/src/publication_cache.rs @@ -19,7 +19,7 @@ use zenoh_util::core::zresult::ErrNo; use zenoh_util::core::SyncResolve; use crate::{ - impl_guarded_transmute, z_keyexpr_t, z_session_t, zc_locality_default, zc_locality_t, + impl_guarded_transmute, z_keyexpr_t, z_session_t, zcu_locality_default, zcu_locality_t, GuardedTransmute, UninitializedKeyExprError, }; @@ -34,7 +34,7 @@ use crate::{ #[repr(C)] pub struct ze_publication_cache_options_t { pub queryable_prefix: z_keyexpr_t, - pub queryable_origin: zc_locality_t, + pub queryable_origin: zcu_locality_t, pub history: usize, pub resources_limit: usize, } @@ -44,7 +44,7 @@ pub struct ze_publication_cache_options_t { pub extern "C" fn ze_publication_cache_options_default() -> ze_publication_cache_options_t { ze_publication_cache_options_t { queryable_prefix: z_keyexpr_t::null(), - queryable_origin: zc_locality_default(), + queryable_origin: zcu_locality_default(), history: 1, resources_limit: 0, } diff --git a/src/querying_subscriber.rs b/src/querying_subscriber.rs index 6f3bb3578..1ecb6f495 100644 --- a/src/querying_subscriber.rs +++ b/src/querying_subscriber.rs @@ -26,8 +26,8 @@ use zenoh_util::core::zresult::ErrNo; use crate::{ impl_guarded_transmute, z_keyexpr_t, z_query_consolidation_default, z_query_consolidation_t, - z_query_target_default, z_query_target_t, z_session_t, zc_locality_default, zc_locality_t, - zc_reply_keyexpr_default, zc_reply_keyexpr_t, GuardedTransmute, + z_query_target_default, z_query_target_t, z_session_t, zcu_locality_default, zcu_locality_t, + zcu_reply_keyexpr_default, zcu_reply_keyexpr_t, GuardedTransmute, }; /**************************************/ @@ -103,11 +103,11 @@ pub extern "C" fn ze_querying_subscriber_null() -> ze_owned_querying_subscriber_ #[allow(non_camel_case_types)] pub struct ze_querying_subscriber_options_t { reliability: z_reliability_t, - allowed_origin: zc_locality_t, + allowed_origin: zcu_locality_t, query_selector: z_keyexpr_t, query_target: z_query_target_t, query_consolidation: z_query_consolidation_t, - query_accept_replies: zc_reply_keyexpr_t, + query_accept_replies: zcu_reply_keyexpr_t, query_timeout_ms: u64, } @@ -116,11 +116,11 @@ pub struct ze_querying_subscriber_options_t { pub extern "C" fn ze_querying_subscriber_options_default() -> ze_querying_subscriber_options_t { ze_querying_subscriber_options_t { reliability: SubInfo::default().reliability.into(), - allowed_origin: zc_locality_default(), + allowed_origin: zcu_locality_default(), query_selector: z_keyexpr_t::null(), query_target: z_query_target_default(), query_consolidation: z_query_consolidation_default(), - query_accept_replies: zc_reply_keyexpr_default(), + query_accept_replies: zcu_reply_keyexpr_default(), query_timeout_ms: 0, } } From e5fd268197f78815b45a6a2382f2f7269bdb9205 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Thu, 16 Nov 2023 23:25:47 +0100 Subject: [PATCH 4/9] Update comments and documentation --- Cargo.lock | 23 ++++++++++++++++++++++- docs/api.rst | 36 +++++++++++++++++++++++++++++++++++- include/zenoh_commons.h | 37 ++++++++++++++++++++++--------------- src/publication_cache.rs | 18 +++++++++--------- src/querying_subscriber.rs | 37 +++++++++++++++++++------------------ 5 files changed, 107 insertions(+), 44 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0eba19b5e..da6b67040 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2791,6 +2791,7 @@ dependencies = [ "serde_yaml", "spin 0.9.8", "zenoh", + "zenoh-ext", "zenoh-protocol", "zenoh-util", ] @@ -2854,10 +2855,30 @@ dependencies = [ "zenoh-result", ] +[[package]] +name = "zenoh-ext" +version = "0.11.0-dev" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" +dependencies = [ + "async-std", + "bincode", + "env_logger", + "flume", + "futures", + "log", + "serde", + "zenoh", + "zenoh-core", + "zenoh-macros", + "zenoh-result", + "zenoh-sync", + "zenoh-util", +] + [[package]] name = "zenoh-keyexpr" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "hashbrown 0.14.0", "keyed-set", diff --git a/docs/api.rst b/docs/api.rst index d97007358..50df3a8a6 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -1,5 +1,5 @@ .. -.. Copyright (c) 2022 ZettaScale Technology +.. Copyright (c) 2023 ZettaScale Technology .. .. This program and the accompanying materials are made available under the .. terms of the Eclipse Public License 2.0 which is available at @@ -262,4 +262,38 @@ Functions .. autocfunction:: zenoh_commons.h::z_closure_query_call .. autocfunction:: zenoh_commons.h::z_closure_query_drop +Publication Cache +================= +Types +----- + +.. autocstruct:: zenoh_commons.h::ze_publication_cache_options_t +.. autocstruct:: zenoh_commons.h::ze_owned_publication_cache_t + +Functions +--------- + +.. autocfunction:: zenoh_commons.h::ze_declare_publication_cache +.. autocfunction:: zenoh_commons.h::ze_close_publication_cache +.. autocfunction:: zenoh_commons.h::ze_publication_cache_check +.. autocfunction:: zenoh_commons.h::ze_publication_cache_null +.. autocfunction:: zenoh_commons.h::ze_publication_cache_options_default + +Querying Subscriber +=================== + +Types +----- + +.. autocstruct:: zenoh_commons.h::ze_owned_querying_subscriber_t +.. autocstruct:: zenoh_commons.h::ze_querying_subscriber_options_t + +Functions +--------- + +.. autocfunction:: zenoh_commons.h::ze_declare_querying_subscriber +.. autocfunction:: zenoh_commons.h::ze_undeclare_querying_subscriber +.. autocfunction:: zenoh_commons.h::ze_querying_subscriber_check +.. autocfunction:: zenoh_commons.h::ze_querying_subscriber_null +.. autocfunction:: zenoh_commons.h::ze_querying_subscriber_options_default diff --git a/include/zenoh_commons.h b/include/zenoh_commons.h index 840bd2b0e..9aeb26d01 100644 --- a/include/zenoh_commons.h +++ b/include/zenoh_commons.h @@ -745,11 +745,11 @@ typedef struct ze_owned_publication_cache_t { * Options passed to the :c:func:`ze_declare_publication_cache` function. * * Members: - * queryable_prefix: the prefix used for queryable - * queryable_origin: the restriction for the matching queries that will be receive by this + * z_keyexpr_t queryable_prefix: The prefix used for queryable + * zcu_locality_t queryable_origin: The restriction for the matching queries that will be receive by this * publication cache - * history: the the history size - * resources_limit: the limit number of cached resources + * size_t history: The the history size + * size_t resources_limit: The limit number of cached resources */ typedef struct ze_publication_cache_options_t { struct z_keyexpr_t queryable_prefix; @@ -778,6 +778,13 @@ typedef struct ze_owned_querying_subscriber_t { * * Members: * z_reliability_t reliability: The subscription reliability. + * zcu_locality_t allowed_origin: The restriction for the matching publications that will be + * receive by this subscriber. + * z_keyexpr_t query_selector: The selector to be used for queries. + * z_query_target_t query_target: The target to be used for queries. + * z_query_consolidation_t query_consolidation: The consolidation mode to be used for queries. + * zcu_reply_keyexpr_t query_accept_replies: The accepted replies for queries. + * uint64_t query_timeout_ms: The timeout to be used for queries. */ typedef struct ze_querying_subscriber_options_t { enum z_reliability_t reliability; @@ -2045,15 +2052,15 @@ ZENOHC_API enum zcu_reply_keyexpr_t zcu_reply_keyexpr_default(void); ZENOHC_API int8_t ze_close_publication_cache(struct ze_owned_publication_cache_t *pub_cache); /** - * Declares a publication cache. + * Declares a Publication Cache. * * Parameters: - * session: the zenoh session. - * keyexpr: the key expression to publish. - * options: additional options for the publication_cache. + * z_session_t session: The zenoh session. + * z_keyexpr_t keyexpr: The key expression to publish. + * ze_publication_cache_options_t options: Additional options for the publication_cache. * * Returns: - * A :c:type:`ze_owned_publication_cache_t`. + * :c:type:`ze_owned_publication_cache_t`. * * * Example: @@ -2075,16 +2082,16 @@ struct ze_owned_publication_cache_t ze_declare_publication_cache(struct z_sessio struct z_keyexpr_t keyexpr, const struct ze_publication_cache_options_t *options); /** - * Declares a querying subscriber for a given key expression. + * Declares a Querying Subscriber for a given key expression. * * Parameters: - * session: The zenoh session. - * keyexpr: The key expression to subscribe. - * callback: The callback function that will be called each time a data matching the subscribed expression is received. - * opts: additional options for the querying subscriber. + * z_session_t session: The zenoh session. + * z_keyexpr_t keyexpr: The key expression to subscribe. + * z_owned_closure_sample_t callback: The callback function that will be called each time a data matching the subscribed expression is received. + * ze_querying_subscriber_options_t options: Additional options for the querying subscriber. * * Returns: - * A :c:type:`ze_owned_subscriber_t`. + * :c:type:`ze_owned_subscriber_t`. * * To check if the subscription succeeded and if the querying subscriber is still valid, * you may use `ze_querying_subscriber_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid. diff --git a/src/publication_cache.rs b/src/publication_cache.rs index 072ea4e49..45aa0e7d2 100644 --- a/src/publication_cache.rs +++ b/src/publication_cache.rs @@ -26,11 +26,11 @@ use crate::{ /// Options passed to the :c:func:`ze_declare_publication_cache` function. /// /// Members: -/// queryable_prefix: the prefix used for queryable -/// queryable_origin: the restriction for the matching queries that will be receive by this +/// z_keyexpr_t queryable_prefix: The prefix used for queryable +/// zcu_locality_t queryable_origin: The restriction for the matching queries that will be receive by this /// publication cache -/// history: the the history size -/// resources_limit: the limit number of cached resources +/// size_t history: The the history size +/// size_t resources_limit: The limit number of cached resources #[repr(C)] pub struct ze_publication_cache_options_t { pub queryable_prefix: z_keyexpr_t, @@ -94,15 +94,15 @@ impl ze_owned_publication_cache_t { } } -/// Declares a publication cache. +/// Declares a Publication Cache. /// /// Parameters: -/// session: the zenoh session. -/// keyexpr: the key expression to publish. -/// options: additional options for the publication_cache. +/// z_session_t session: The zenoh session. +/// z_keyexpr_t keyexpr: The key expression to publish. +/// ze_publication_cache_options_t options: Additional options for the publication_cache. /// /// Returns: -/// A :c:type:`ze_owned_publication_cache_t`. +/// :c:type:`ze_owned_publication_cache_t`. /// /// /// Example: diff --git a/src/querying_subscriber.rs b/src/querying_subscriber.rs index 1ecb6f495..aa1d09bb0 100644 --- a/src/querying_subscriber.rs +++ b/src/querying_subscriber.rs @@ -1,5 +1,5 @@ // -// Copyright (c) 2017, 2022 ZettaScale Technology. +// Copyright (c) 2023 ZettaScale Technology. // // This program and the accompanying materials are made available under the // terms of the Eclipse Public License 2.0 which is available at @@ -12,11 +12,6 @@ // ZettaScale Zenoh team, // -use crate::commons::*; -use crate::z_closure_sample_call; -use crate::z_owned_closure_sample_t; -use crate::z_reliability_t; -use crate::LOG_INVALID_SESSION; use zenoh::prelude::sync::SyncResolve; use zenoh::prelude::SessionDeclarations; use zenoh::prelude::SplitBuffer; @@ -25,14 +20,13 @@ use zenoh_protocol::core::SubInfo; use zenoh_util::core::zresult::ErrNo; use crate::{ - impl_guarded_transmute, z_keyexpr_t, z_query_consolidation_default, z_query_consolidation_t, - z_query_target_default, z_query_target_t, z_session_t, zcu_locality_default, zcu_locality_t, - zcu_reply_keyexpr_default, zcu_reply_keyexpr_t, GuardedTransmute, + impl_guarded_transmute, z_closure_sample_call, z_keyexpr_t, z_owned_closure_sample_t, + z_query_consolidation_default, z_query_consolidation_t, z_query_target_default, + z_query_target_t, z_reliability_t, z_sample_t, z_session_t, zcu_locality_default, + zcu_locality_t, zcu_reply_keyexpr_default, zcu_reply_keyexpr_t, GuardedTransmute, + LOG_INVALID_SESSION, }; -/**************************************/ -/* DECLARATION */ -/**************************************/ type FetchingSubscriber = Option>>; /// An owned zenoh querying subscriber. Destroying the subscriber cancels the subscription. @@ -99,6 +93,13 @@ pub extern "C" fn ze_querying_subscriber_null() -> ze_owned_querying_subscriber_ /// /// Members: /// z_reliability_t reliability: The subscription reliability. +/// zcu_locality_t allowed_origin: The restriction for the matching publications that will be +/// receive by this subscriber. +/// z_keyexpr_t query_selector: The selector to be used for queries. +/// z_query_target_t query_target: The target to be used for queries. +/// z_query_consolidation_t query_consolidation: The consolidation mode to be used for queries. +/// zcu_reply_keyexpr_t query_accept_replies: The accepted replies for queries. +/// uint64_t query_timeout_ms: The timeout to be used for queries. #[repr(C)] #[allow(non_camel_case_types)] pub struct ze_querying_subscriber_options_t { @@ -125,16 +126,16 @@ pub extern "C" fn ze_querying_subscriber_options_default() -> ze_querying_subscr } } -/// Declares a querying subscriber for a given key expression. +/// Declares a Querying Subscriber for a given key expression. /// /// Parameters: -/// session: The zenoh session. -/// keyexpr: The key expression to subscribe. -/// callback: The callback function that will be called each time a data matching the subscribed expression is received. -/// opts: additional options for the querying subscriber. +/// z_session_t session: The zenoh session. +/// z_keyexpr_t keyexpr: The key expression to subscribe. +/// z_owned_closure_sample_t callback: The callback function that will be called each time a data matching the subscribed expression is received. +/// ze_querying_subscriber_options_t options: Additional options for the querying subscriber. /// /// Returns: -/// A :c:type:`ze_owned_subscriber_t`. +/// :c:type:`ze_owned_subscriber_t`. /// /// To check if the subscription succeeded and if the querying subscriber is still valid, /// you may use `ze_querying_subscriber_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid. From 9741a34b9972ebbe77f480da80febe29cd0c4855 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Thu, 16 Nov 2023 23:47:27 +0100 Subject: [PATCH 5/9] Update Cargo.lock --- Cargo.lock | 61 ++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 38 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index da6b67040..00a33a63a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1869,6 +1869,16 @@ dependencies = [ "untrusted", ] +[[package]] +name = "secrecy" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9bd1c54ea06cfd2f6b63219704de0b9b4f72dcc2b8fdef820be6cd799780e91e" +dependencies = [ + "serde", + "zeroize", +] + [[package]] name = "security-framework" version = "2.9.2" @@ -2721,7 +2731,7 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" [[package]] name = "zenoh" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "async-global-executor", "async-std", @@ -2769,7 +2779,7 @@ dependencies = [ [[package]] name = "zenoh-buffers" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "zenoh-collections", ] @@ -2799,7 +2809,7 @@ dependencies = [ [[package]] name = "zenoh-codec" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "log", "serde", @@ -2812,16 +2822,17 @@ dependencies = [ [[package]] name = "zenoh-collections" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" [[package]] name = "zenoh-config" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "flume", "json5", "num_cpus", + "secrecy", "serde", "serde_json", "serde_yaml", @@ -2835,7 +2846,7 @@ dependencies = [ [[package]] name = "zenoh-core" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "async-std", "lazy_static", @@ -2845,7 +2856,7 @@ dependencies = [ [[package]] name = "zenoh-crypto" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "aes", "hmac", @@ -2892,7 +2903,7 @@ dependencies = [ [[package]] name = "zenoh-link" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "async-std", "async-trait", @@ -2911,7 +2922,7 @@ dependencies = [ [[package]] name = "zenoh-link-commons" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "async-std", "async-trait", @@ -2927,11 +2938,12 @@ dependencies = [ [[package]] name = "zenoh-link-quic" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "async-rustls", "async-std", "async-trait", + "base64", "futures", "log", "quinn", @@ -2939,6 +2951,7 @@ dependencies = [ "rustls-native-certs", "rustls-pemfile", "rustls-webpki", + "secrecy", "zenoh-config", "zenoh-core", "zenoh-link-commons", @@ -2951,7 +2964,7 @@ dependencies = [ [[package]] name = "zenoh-link-tcp" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "async-std", "async-trait", @@ -2967,16 +2980,18 @@ dependencies = [ [[package]] name = "zenoh-link-tls" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "async-rustls", "async-std", "async-trait", + "base64", "futures", "log", "rustls", "rustls-pemfile", "rustls-webpki", + "secrecy", "webpki-roots", "zenoh-config", "zenoh-core", @@ -2990,7 +3005,7 @@ dependencies = [ [[package]] name = "zenoh-link-udp" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "async-std", "async-trait", @@ -3009,7 +3024,7 @@ dependencies = [ [[package]] name = "zenoh-link-unixsock_stream" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "async-std", "async-trait", @@ -3027,7 +3042,7 @@ dependencies = [ [[package]] name = "zenoh-link-ws" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "async-std", "async-trait", @@ -3047,7 +3062,7 @@ dependencies = [ [[package]] name = "zenoh-macros" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "proc-macro2", "quote", @@ -3060,7 +3075,7 @@ dependencies = [ [[package]] name = "zenoh-plugin-trait" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "libloading", "log", @@ -3073,7 +3088,7 @@ dependencies = [ [[package]] name = "zenoh-protocol" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "const_format", "hex", @@ -3089,7 +3104,7 @@ dependencies = [ [[package]] name = "zenoh-result" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "anyhow", ] @@ -3097,7 +3112,7 @@ dependencies = [ [[package]] name = "zenoh-shm" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "bincode", "log", @@ -3110,7 +3125,7 @@ dependencies = [ [[package]] name = "zenoh-sync" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "async-std", "event-listener", @@ -3125,7 +3140,7 @@ dependencies = [ [[package]] name = "zenoh-transport" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "async-executor", "async-global-executor", @@ -3157,7 +3172,7 @@ dependencies = [ [[package]] name = "zenoh-util" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "async-std", "async-trait", From 651484f5b2da40ec58567ba74223e94328bf67a3 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Fri, 17 Nov 2023 10:25:06 +0100 Subject: [PATCH 6/9] Update examples, fix querying_subscriber initialization --- examples/z_pub_cache.c | 4 ++-- examples/z_query_sub.c | 3 ++- src/querying_subscriber.rs | 9 ++++----- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/examples/z_pub_cache.c b/examples/z_pub_cache.c index 910e07f81..1ff355aa2 100644 --- a/examples/z_pub_cache.c +++ b/examples/z_pub_cache.c @@ -71,8 +71,8 @@ int main(int argc, char **argv) { z_put(z_loan(s), z_keyexpr(keyexpr), (const uint8_t *)buf, strlen(buf), NULL); } - ze_close_publication_cache(z_move(pub_cache)); - + z_drop(z_move(pub_cache)); z_close(z_move(s)); + return 0; } diff --git a/examples/z_query_sub.c b/examples/z_query_sub.c index ad3306a61..40d559825 100644 --- a/examples/z_query_sub.c +++ b/examples/z_query_sub.c @@ -53,10 +53,11 @@ int main(int argc, char **argv) { exit(-1); } + ze_querying_subscriber_options_t sub_opts = ze_querying_subscriber_options_default(); z_owned_closure_sample_t callback = z_closure(data_handler); printf("Declaring querying subscriber on '%s'...\n", expr); ze_owned_querying_subscriber_t sub = - ze_declare_querying_subscriber(z_loan(s), z_keyexpr(expr), z_move(callback), NULL); + ze_declare_querying_subscriber(z_loan(s), z_keyexpr(expr), z_move(callback), &sub_opts); if (!z_check(sub)) { printf("Unable to declare querying subscriber.\n"); exit(-1); diff --git a/src/querying_subscriber.rs b/src/querying_subscriber.rs index aa1d09bb0..20522c031 100644 --- a/src/querying_subscriber.rs +++ b/src/querying_subscriber.rs @@ -21,10 +21,9 @@ use zenoh_util::core::zresult::ErrNo; use crate::{ impl_guarded_transmute, z_closure_sample_call, z_keyexpr_t, z_owned_closure_sample_t, - z_query_consolidation_default, z_query_consolidation_t, z_query_target_default, - z_query_target_t, z_reliability_t, z_sample_t, z_session_t, zcu_locality_default, - zcu_locality_t, zcu_reply_keyexpr_default, zcu_reply_keyexpr_t, GuardedTransmute, - LOG_INVALID_SESSION, + z_query_consolidation_none, z_query_consolidation_t, z_query_target_default, z_query_target_t, + z_reliability_t, z_sample_t, z_session_t, zcu_locality_default, zcu_locality_t, + zcu_reply_keyexpr_default, zcu_reply_keyexpr_t, GuardedTransmute, LOG_INVALID_SESSION, }; type FetchingSubscriber = Option>>; @@ -120,7 +119,7 @@ pub extern "C" fn ze_querying_subscriber_options_default() -> ze_querying_subscr allowed_origin: zcu_locality_default(), query_selector: z_keyexpr_t::null(), query_target: z_query_target_default(), - query_consolidation: z_query_consolidation_default(), + query_consolidation: z_query_consolidation_none(), query_accept_replies: zcu_reply_keyexpr_default(), query_timeout_ms: 0, } From 206635932d5d8ea62e792127516703b8ca183101 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Fri, 17 Nov 2023 10:46:41 +0100 Subject: [PATCH 7/9] Remove unused include, formatting fix --- examples/z_pub_cache.c | 1 - examples/z_query_sub.c | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/z_pub_cache.c b/examples/z_pub_cache.c index 1ff355aa2..bd9fd17ae 100644 --- a/examples/z_pub_cache.c +++ b/examples/z_pub_cache.c @@ -12,7 +12,6 @@ // ZettaScale Zenoh Team, // #include -#include #include "zenoh.h" #if defined(WIN32) || defined(_WIN32) || defined(__WIN32) && !defined(__CYGWIN__) diff --git a/examples/z_query_sub.c b/examples/z_query_sub.c index 40d559825..680cfd973 100644 --- a/examples/z_query_sub.c +++ b/examples/z_query_sub.c @@ -74,6 +74,7 @@ int main(int argc, char **argv) { z_drop(z_move(sub)); z_close(z_move(s)); + return 0; } From 57eca8a1159322094ff0f5b74c4f62bf71d6b746 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Fri, 17 Nov 2023 11:00:59 +0100 Subject: [PATCH 8/9] Add missed include for Mac OS build --- examples/z_pub_cache.c | 1 + 1 file changed, 1 insertion(+) diff --git a/examples/z_pub_cache.c b/examples/z_pub_cache.c index bd9fd17ae..1ff355aa2 100644 --- a/examples/z_pub_cache.c +++ b/examples/z_pub_cache.c @@ -12,6 +12,7 @@ // ZettaScale Zenoh Team, // #include +#include #include "zenoh.h" #if defined(WIN32) || defined(_WIN32) || defined(__WIN32) && !defined(__CYGWIN__) From a3e39a7ab5495a84c883f0fdca7024f0e8d089ad Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Fri, 17 Nov 2023 17:09:07 +0100 Subject: [PATCH 9/9] Rename ze_close_publication_cache to ze_undeclare_publication_cache + macros and examples update --- examples/z_pub_cache.c | 2 +- include/zenoh_commons.h | 10 ++--- include/zenoh_macros.h | 90 ++++++++++++++++++++-------------------- src/publication_cache.rs | 4 +- 4 files changed, 55 insertions(+), 51 deletions(-) diff --git a/examples/z_pub_cache.c b/examples/z_pub_cache.c index 1ff355aa2..b293197a7 100644 --- a/examples/z_pub_cache.c +++ b/examples/z_pub_cache.c @@ -58,7 +58,7 @@ int main(int argc, char **argv) { printf("Declaring publication cache on '%s'...\n", keyexpr); ze_owned_publication_cache_t pub_cache = ze_declare_publication_cache(z_loan(s), z_keyexpr(keyexpr), &pub_cache_opts); - if (!ze_publication_cache_check(&pub_cache)) { + if (!z_check(pub_cache)) { printf("Unable to declare publication cache for key expression!\n"); exit(-1); } diff --git a/include/zenoh_commons.h b/include/zenoh_commons.h index 9aeb26d01..095f9eb01 100644 --- a/include/zenoh_commons.h +++ b/include/zenoh_commons.h @@ -2046,11 +2046,6 @@ void zc_shmbuf_set_length(const struct zc_owned_shmbuf_t *buf, uintptr_t len); ZENOHC_API enum zcu_locality_t zcu_locality_default(void); ZENOHC_API enum zcu_reply_keyexpr_t zcu_reply_keyexpr_default(void); -/** - * Closes the given :c:type:`ze_owned_publication_cache_t`, droping it and invalidating it for double-drop safety. - */ -ZENOHC_API -int8_t ze_close_publication_cache(struct ze_owned_publication_cache_t *pub_cache); /** * Declares a Publication Cache. * @@ -2143,6 +2138,11 @@ ZENOHC_API struct ze_owned_querying_subscriber_t ze_querying_subscriber_null(voi * Constructs the default value for :c:type:`ze_querying_subscriber_options_t`. */ ZENOHC_API struct ze_querying_subscriber_options_t ze_querying_subscriber_options_default(void); +/** + * Closes the given :c:type:`ze_owned_publication_cache_t`, droping it and invalidating it for double-drop safety. + */ +ZENOHC_API +int8_t ze_undeclare_publication_cache(struct ze_owned_publication_cache_t *pub_cache); /** * Undeclares the given :c:type:`ze_owned_querying_subscriber_t`, droping it and invalidating it for double-drop safety. */ diff --git a/include/zenoh_macros.h b/include/zenoh_macros.h index f5a81781c..9fa37aec2 100644 --- a/include/zenoh_macros.h +++ b/include/zenoh_macros.h @@ -16,31 +16,31 @@ )(&x) #define z_drop(x) \ - _Generic((x), z_owned_session_t * : z_close, \ - z_owned_publisher_t * : z_undeclare_publisher, \ - z_owned_keyexpr_t * : z_keyexpr_drop, \ - z_owned_config_t * : z_config_drop, \ - z_owned_scouting_config_t * : z_scouting_config_drop, \ - z_owned_pull_subscriber_t * : z_undeclare_pull_subscriber, \ - z_owned_subscriber_t * : z_undeclare_subscriber, \ - z_owned_queryable_t * : z_undeclare_queryable, \ - z_owned_encoding_t * : z_encoding_drop, \ - z_owned_reply_t * : z_reply_drop, \ - z_owned_hello_t * : z_hello_drop, \ - z_owned_str_t * : z_str_drop, \ - z_owned_closure_sample_t * : z_closure_sample_drop, \ - z_owned_closure_query_t * : z_closure_query_drop, \ - z_owned_closure_reply_t * : z_closure_reply_drop, \ - z_owned_closure_hello_t * : z_closure_hello_drop, \ - z_owned_closure_zid_t * : z_closure_zid_drop, \ - z_owned_reply_channel_closure_t * : z_reply_channel_closure_drop, \ - z_owned_reply_channel_t * : z_reply_channel_drop, \ - zc_owned_payload_t * : zc_payload_drop, \ - zc_owned_shmbuf_t * : zc_shmbuf_drop, \ - zc_owned_shm_manager_t * : zc_shm_manager_drop, \ - zc_owned_liveliness_token_t * : zc_liveliness_undeclare_token, \ - ze_owned_publication_cache_t* : ze_close_publication_cache, \ - ze_owned_querying_subscriber_t* : ze_undeclare_querying_subscriber \ + _Generic((x), z_owned_session_t * : z_close, \ + z_owned_publisher_t * : z_undeclare_publisher, \ + z_owned_keyexpr_t * : z_keyexpr_drop, \ + z_owned_config_t * : z_config_drop, \ + z_owned_scouting_config_t * : z_scouting_config_drop, \ + z_owned_pull_subscriber_t * : z_undeclare_pull_subscriber, \ + z_owned_subscriber_t * : z_undeclare_subscriber, \ + z_owned_queryable_t * : z_undeclare_queryable, \ + z_owned_encoding_t * : z_encoding_drop, \ + z_owned_reply_t * : z_reply_drop, \ + z_owned_hello_t * : z_hello_drop, \ + z_owned_str_t * : z_str_drop, \ + z_owned_closure_sample_t * : z_closure_sample_drop, \ + z_owned_closure_query_t * : z_closure_query_drop, \ + z_owned_closure_reply_t * : z_closure_reply_drop, \ + z_owned_closure_hello_t * : z_closure_hello_drop, \ + z_owned_closure_zid_t * : z_closure_zid_drop, \ + z_owned_reply_channel_closure_t * : z_reply_channel_closure_drop, \ + z_owned_reply_channel_t * : z_reply_channel_drop, \ + zc_owned_payload_t * : zc_payload_drop, \ + zc_owned_shmbuf_t * : zc_shmbuf_drop, \ + zc_owned_shm_manager_t * : zc_shm_manager_drop, \ + zc_owned_liveliness_token_t * : zc_liveliness_undeclare_token, \ + ze_owned_publication_cache_t * : ze_undeclare_publication_cache, \ + ze_owned_querying_subscriber_t * : ze_undeclare_querying_subscriber \ )(x) #define z_null(x) (*x = \ @@ -66,29 +66,31 @@ zc_owned_payload_t * : zc_payload_null, \ zc_owned_shmbuf_t * : zc_shmbuf_null, \ zc_owned_shm_manager_t * : zc_shm_manager_null, \ + ze_owned_publication_cache_t * : ze_publication_cache_null, \ zc_owned_liveliness_token_t * : zc_liveliness_token_null \ )()) #define z_check(x) \ - _Generic((x), z_owned_session_t : z_session_check, \ - z_owned_publisher_t : z_publisher_check, \ - z_owned_keyexpr_t : z_keyexpr_check, \ - z_keyexpr_t : z_keyexpr_is_initialized, \ - z_owned_config_t : z_config_check, \ - z_owned_scouting_config_t : z_scouting_config_check, \ - z_bytes_t : z_bytes_check, \ - z_owned_subscriber_t : z_subscriber_check, \ - z_owned_pull_subscriber_t : z_pull_subscriber_check, \ - z_owned_queryable_t : z_queryable_check, \ - z_owned_encoding_t : z_encoding_check, \ - z_owned_reply_t : z_reply_check, \ - z_owned_hello_t : z_hello_check, \ - z_owned_str_t : z_str_check, \ - zc_owned_payload_t : zc_payload_check, \ - zc_owned_shmbuf_t : zc_shmbuf_check, \ - zc_owned_shm_manager_t : zc_shm_manager_check, \ - zc_owned_liveliness_token_t : zc_liveliness_token_check, \ - ze_owned_querying_subscriber_t: ze_querying_subscriber_check \ + _Generic((x), z_owned_session_t : z_session_check, \ + z_owned_publisher_t : z_publisher_check, \ + z_owned_keyexpr_t : z_keyexpr_check, \ + z_keyexpr_t : z_keyexpr_is_initialized, \ + z_owned_config_t : z_config_check, \ + z_owned_scouting_config_t : z_scouting_config_check, \ + z_bytes_t : z_bytes_check, \ + z_owned_subscriber_t : z_subscriber_check, \ + z_owned_pull_subscriber_t : z_pull_subscriber_check, \ + z_owned_queryable_t : z_queryable_check, \ + z_owned_encoding_t : z_encoding_check, \ + z_owned_reply_t : z_reply_check, \ + z_owned_hello_t : z_hello_check, \ + z_owned_str_t : z_str_check, \ + zc_owned_payload_t : zc_payload_check, \ + zc_owned_shmbuf_t : zc_shmbuf_check, \ + zc_owned_shm_manager_t : zc_shm_manager_check, \ + zc_owned_liveliness_token_t : zc_liveliness_token_check, \ + ze_owned_publication_cache_t : ze_publication_cache_check, \ + ze_owned_querying_subscriber_t : ze_querying_subscriber_check \ )(&x) #define z_call(x, ...) \ diff --git a/src/publication_cache.rs b/src/publication_cache.rs index 45aa0e7d2..d7fbf347c 100644 --- a/src/publication_cache.rs +++ b/src/publication_cache.rs @@ -180,7 +180,9 @@ pub extern "C" fn ze_publication_cache_check(pub_cache: &ze_owned_publication_ca /// Closes the given :c:type:`ze_owned_publication_cache_t`, droping it and invalidating it for double-drop safety. #[no_mangle] #[allow(clippy::missing_safety_doc)] -pub extern "C" fn ze_close_publication_cache(pub_cache: &mut ze_owned_publication_cache_t) -> i8 { +pub extern "C" fn ze_undeclare_publication_cache( + pub_cache: &mut ze_owned_publication_cache_t, +) -> i8 { if let Some(p) = pub_cache.as_mut().take() { if let Err(e) = p.close().res_sync() { log::error!("{}", e);