diff --git a/Cargo.toml b/Cargo.toml index 47af03b38..5b31508cf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,6 +50,7 @@ spin = "0.9.5" zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "master", features = [ "shared-memory", "unstable" ] } zenoh-protocol = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "master", features = [ "shared-memory" ] } zenoh-util = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "master" } +zenoh-ext = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "master", features = [ "unstable" ]} [build-dependencies] cbindgen = "0.24.3" diff --git a/Cargo.toml.in b/Cargo.toml.in index fd810dbab..0eab1c852 100644 --- a/Cargo.toml.in +++ b/Cargo.toml.in @@ -50,6 +50,7 @@ spin = "0.9.5" zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "master", features = [ "shared-memory", "unstable" ] } zenoh-protocol = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "master", features = [ "shared-memory" ] } zenoh-util = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "master" } +zenoh-ext = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "master", features = [ "unstable" ]} [build-dependencies] cbindgen = "0.24.3" diff --git a/examples/z_pub_cache.c b/examples/z_pub_cache.c new file mode 100644 index 000000000..910e07f81 --- /dev/null +++ b/examples/z_pub_cache.c @@ -0,0 +1,78 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +#include +#include + +#include "zenoh.h" +#if defined(WIN32) || defined(_WIN32) || defined(__WIN32) && !defined(__CYGWIN__) +#include +#define sleep(x) Sleep(x * 1000) +#else +#include +#endif + +int main(int argc, char **argv) { + char *keyexpr = "demo/example/zenoh-c-pub"; + char *value = "Pub from C!"; + + if (argc > 1) keyexpr = argv[1]; + if (argc > 2) value = argv[2]; + + z_owned_config_t config = z_config_default(); + if (argc > 3) { + if (zc_config_insert_json(z_loan(config), Z_CONFIG_CONNECT_KEY, argv[3]) < 0) { + printf( + "Couldn't insert value `%s` in configuration at `%s`. This is likely because `%s` expects a " + "JSON-serialized list of strings\n", + argv[3], Z_CONFIG_CONNECT_KEY, Z_CONFIG_CONNECT_KEY); + exit(-1); + } + } + + if (zc_config_insert_json(z_loan(config), Z_CONFIG_ADD_TIMESTAMP_KEY, "true") < 0) { + printf("Unable to configure timestamps!\n"); + exit(-1); + } + + printf("Opening session...\n"); + z_owned_session_t s = z_open(z_move(config)); + if (!z_check(s)) { + printf("Unable to open session!\n"); + exit(-1); + } + + ze_publication_cache_options_t pub_cache_opts = ze_publication_cache_options_default(); + pub_cache_opts.history = 42; + + printf("Declaring publication cache on '%s'...\n", keyexpr); + ze_owned_publication_cache_t pub_cache = + ze_declare_publication_cache(z_loan(s), z_keyexpr(keyexpr), &pub_cache_opts); + if (!ze_publication_cache_check(&pub_cache)) { + printf("Unable to declare publication cache for key expression!\n"); + exit(-1); + } + + char buf[256]; + for (int idx = 0; 1; ++idx) { + sleep(1); + sprintf(buf, "[%4d] %s", idx, value); + printf("Putting Data ('%s': '%s')...\n", keyexpr, buf); + z_put(z_loan(s), z_keyexpr(keyexpr), (const uint8_t *)buf, strlen(buf), NULL); + } + + ze_close_publication_cache(z_move(pub_cache)); + + z_close(z_move(s)); + return 0; +} diff --git a/examples/z_query_sub.c b/examples/z_query_sub.c new file mode 100644 index 000000000..ad3306a61 --- /dev/null +++ b/examples/z_query_sub.c @@ -0,0 +1,88 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +#include +#if defined(WIN32) || defined(_WIN32) || defined(__WIN32) && !defined(__CYGWIN__) +#include +#define sleep(x) Sleep(x * 1000) +#else +#include +#endif +#include "zenoh.h" + +const char *kind_to_str(z_sample_kind_t kind); + +void data_handler(const z_sample_t *sample, void *arg) { + z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); + printf(">> [Subscriber] Received %s ('%s': '%.*s')\n", kind_to_str(sample->kind), z_loan(keystr), + (int)sample->payload.len, sample->payload.start); + z_drop(z_move(keystr)); +} + +int main(int argc, char **argv) { + char *expr = "demo/example/**"; + if (argc > 1) { + expr = argv[1]; + } + + z_owned_config_t config = z_config_default(); + if (argc > 2) { + if (zc_config_insert_json(z_loan(config), Z_CONFIG_LISTEN_KEY, argv[2]) < 0) { + printf( + "Couldn't insert value `%s` in configuration at `%s`. This is likely because `%s` expects a " + "JSON-serialized list of strings\n", + argv[2], Z_CONFIG_LISTEN_KEY, Z_CONFIG_LISTEN_KEY); + exit(-1); + } + } + + printf("Opening session...\n"); + z_owned_session_t s = z_open(z_move(config)); + if (!z_check(s)) { + printf("Unable to open session!\n"); + exit(-1); + } + + z_owned_closure_sample_t callback = z_closure(data_handler); + printf("Declaring querying subscriber on '%s'...\n", expr); + ze_owned_querying_subscriber_t sub = + ze_declare_querying_subscriber(z_loan(s), z_keyexpr(expr), z_move(callback), NULL); + if (!z_check(sub)) { + printf("Unable to declare querying subscriber.\n"); + exit(-1); + } + + printf("Enter 'q' to quit...\n"); + char c = 0; + while (c != 'q') { + c = getchar(); + if (c == -1) { + sleep(1); + } + } + + z_drop(z_move(sub)); + z_close(z_move(s)); + return 0; +} + +const char *kind_to_str(z_sample_kind_t kind) { + switch (kind) { + case Z_SAMPLE_KIND_PUT: + return "PUT"; + case Z_SAMPLE_KIND_DELETE: + return "DELETE"; + default: + return "UNKNOWN"; + } +} diff --git a/include/zenoh_commons.h b/include/zenoh_commons.h index aecd40786..d9acebe95 100644 --- a/include/zenoh_commons.h +++ b/include/zenoh_commons.h @@ -139,6 +139,15 @@ typedef enum z_sample_kind_t { Z_SAMPLE_KIND_PUT = 0, Z_SAMPLE_KIND_DELETE = 1, } z_sample_kind_t; +typedef enum zc_locality_t { + ZC_LOCALITY_ANY = 0, + ZC_LOCALITY_SESSION_LOCAL = 1, + ZC_LOCALITY_REMOTE = 2, +} zc_locality_t; +typedef enum zc_reply_keyexpr_t { + ZC_REPLY_KEYEXPR_ANY = 0, + ZC_REPLY_KEYEXPR_MATCHING_QUERY = 1, +} zc_reply_keyexpr_t; /** * An array of bytes. */ @@ -717,6 +726,68 @@ typedef struct zc_owned_shmbuf_t { typedef struct zc_owned_shm_manager_t { uintptr_t _0; } zc_owned_shm_manager_t; +/** + * An owned zenoh publication_cache. + * + * Like most `z_owned_X_t` types, you may obtain an instance of `z_X_t` by loaning it using `z_X_loan(&val)`. + * The `z_loan(val)` macro, available if your compiler supports C11's `_Generic`, is equivalent to writing `z_X_loan(&val)`. + * + * Like all `z_owned_X_t`, an instance will be destroyed by any function which takes a mutable pointer to said instance, as this implies the instance's inners were moved. + * To make this fact more obvious when reading your code, consider using `z_move(val)` instead of `&val` as the argument. + * After a move, `val` will still exist, but will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your `val` is valid. + * + * To check if `val` is still valid, you may use `z_X_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid. + */ +typedef struct ze_owned_publication_cache_t { + uintptr_t _0[1]; +} ze_owned_publication_cache_t; +/** + * Options passed to the :c:func:`ze_declare_publication_cache` function. + * + * Members: + * queryable_prefix: the prefix used for queryable + * queryable_origin: the restriction for the matching queries that will be receive by this + * publication cache + * history: the the history size + * resources_limit: the limit number of cached resources + */ +typedef struct ze_publication_cache_options_t { + struct z_keyexpr_t queryable_prefix; + enum zc_locality_t queryable_origin; + uintptr_t history; + uintptr_t resources_limit; +} ze_publication_cache_options_t; +/** + * An owned zenoh querying subscriber. Destroying the subscriber cancels the subscription. + * + * Like most `ze_owned_X_t` types, you may obtain an instance of `z_X_t` by loaning it using `z_X_loan(&val)`. + * The `z_loan(val)` macro, available if your compiler supports C11's `_Generic`, is equivalent to writing `z_X_loan(&val)`. + * + * Like all `ze_owned_X_t`, an instance will be destroyed by any function which takes a mutable pointer to said instance, as this implies the instance's inners were moved. + * To make this fact more obvious when reading your code, consider using `z_move(val)` instead of `&val` as the argument. + * After a move, `val` will still exist, but will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your `val` is valid. + * + * To check if `val` is still valid, you may use `z_X_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid. + */ +typedef struct ze_owned_querying_subscriber_t { + uintptr_t _0[1]; +} ze_owned_querying_subscriber_t; +/** + * Represents the set of options that can be applied to a querying subscriber, + * upon its declaration via :c:func:`ze_declare_querying_subscriber`. + * + * Members: + * z_reliability_t reliability: The subscription reliability. + */ +typedef struct ze_querying_subscriber_options_t { + enum z_reliability_t reliability; + enum zc_locality_t allowed_origin; + struct z_keyexpr_t query_selector; + enum z_query_target_t query_target; + struct z_query_consolidation_t query_consolidation; + enum zc_reply_keyexpr_t query_accept_replies; + uint64_t query_timeout_ms; +} ze_querying_subscriber_options_t; ZENOHC_API extern const unsigned int Z_ROUTER; ZENOHC_API extern const unsigned int Z_PEER; ZENOHC_API extern const unsigned int Z_CLIENT; @@ -1800,6 +1871,7 @@ ZENOHC_API struct zc_owned_liveliness_token_t zc_liveliness_token_null(void); * Destroys a liveliness token, notifying subscribers of its destruction. */ ZENOHC_API void zc_liveliness_undeclare_token(struct zc_owned_liveliness_token_t *token); +ZENOHC_API enum zc_locality_t zc_locality_default(void); /** * Returns `false` if `payload` is the gravestone value. */ @@ -1872,6 +1944,7 @@ int8_t zc_put_owned(struct z_session_t session, */ ZENOHC_API struct z_owned_reply_channel_t zc_reply_fifo_new(uintptr_t bound); +ZENOHC_API enum zc_reply_keyexpr_t zc_reply_keyexpr_default(void); /** * Creates a new non-blocking fifo channel, returned as a pair of closures. * @@ -1966,3 +2039,105 @@ ZENOHC_API uint8_t *zc_shmbuf_ptr(const struct zc_owned_shmbuf_t *buf); ZENOHC_API void zc_shmbuf_set_length(const struct zc_owned_shmbuf_t *buf, uintptr_t len); +/** + * Closes the given :c:type:`ze_owned_publication_cache_t`, droping it and invalidating it for double-drop safety. + */ +ZENOHC_API +int8_t ze_close_publication_cache(struct ze_owned_publication_cache_t *pub_cache); +/** + * Declares a publication cache. + * + * Parameters: + * session: the zenoh session. + * keyexpr: the key expression to publish. + * options: additional options for the publication_cache. + * + * Returns: + * A :c:type:`ze_owned_publication_cache_t`. + * + * + * Example: + * Declaring a publication cache `NULL` for the options: + * + * .. code-block:: C + * + * ze_owned_publication_cache_t pub_cache = ze_declare_publication_cache(z_loan(s), z_keyexpr(expr), NULL); + * + * is equivalent to initializing and passing the default publication cache options: + * + * .. code-block:: C + * + * ze_publication_cache_options_t opts = ze_publication_cache_options_default(); + * ze_owned_publication_cache_t pub_cache = ze_declare_publication_cache(z_loan(s), z_keyexpr(expr), &opts); + */ +ZENOHC_API +struct ze_owned_publication_cache_t ze_declare_publication_cache(struct z_session_t session, + struct z_keyexpr_t keyexpr, + const struct ze_publication_cache_options_t *options); +/** + * Declares a querying subscriber for a given key expression. + * + * Parameters: + * session: The zenoh session. + * keyexpr: The key expression to subscribe. + * callback: The callback function that will be called each time a data matching the subscribed expression is received. + * opts: additional options for the querying subscriber. + * + * Returns: + * A :c:type:`ze_owned_subscriber_t`. + * + * To check if the subscription succeeded and if the querying subscriber is still valid, + * you may use `ze_querying_subscriber_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid. + * + * Like all `ze_owned_X_t`, an instance will be destroyed by any function which takes a mutable pointer to said instance, as this implies the instance's inners were moved. + * To make this fact more obvious when reading your code, consider using `z_move(val)` instead of `&val` as the argument. + * After a move, `val` will still exist, but will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your `val` is valid. + * + * Example: + * Declaring a subscriber passing ``NULL`` for the options: + * + * .. code-block:: C + * + * ze_owned_subscriber_t sub = ze_declare_querying_subscriber(z_loan(s), z_keyexpr(expr), callback, NULL); + * + * is equivalent to initializing and passing the default subscriber options: + * + * .. code-block:: C + * + * z_subscriber_options_t opts = z_subscriber_options_default(); + * ze_owned_subscriber_t sub = ze_declare_querying_subscriber(z_loan(s), z_keyexpr(expr), callback, &opts); + */ +ZENOHC_API +struct ze_owned_querying_subscriber_t ze_declare_querying_subscriber(struct z_session_t session, + struct z_keyexpr_t keyexpr, + struct z_owned_closure_sample_t *callback, + const struct ze_querying_subscriber_options_t *options); +/** + * Returns ``true`` if `pub_cache` is valid. + */ +ZENOHC_API bool ze_publication_cache_check(const struct ze_owned_publication_cache_t *pub_cache); +/** + * Constructs a null safe-to-drop value of 'ze_owned_publication_cache_t' type + */ +ZENOHC_API struct ze_owned_publication_cache_t ze_publication_cache_null(void); +/** + * Constructs the default value for :c:type:`ze_publication_cache_options_t`. + */ +ZENOHC_API struct ze_publication_cache_options_t ze_publication_cache_options_default(void); +/** + * Returns ``true`` if `sub` is valid. + */ +ZENOHC_API bool ze_querying_subscriber_check(const struct ze_owned_querying_subscriber_t *sub); +/** + * Constructs a null safe-to-drop value of 'ze_owned_querying_subscriber_t' type + */ +ZENOHC_API struct ze_owned_querying_subscriber_t ze_querying_subscriber_null(void); +/** + * Constructs the default value for :c:type:`ze_querying_subscriber_options_t`. + */ +ZENOHC_API struct ze_querying_subscriber_options_t ze_querying_subscriber_options_default(void); +/** + * Undeclares the given :c:type:`ze_owned_querying_subscriber_t`, droping it and invalidating it for double-drop safety. + */ +ZENOHC_API +int8_t ze_undeclare_querying_subscriber(struct ze_owned_querying_subscriber_t *sub); diff --git a/include/zenoh_macros.h b/include/zenoh_macros.h index 2a2c23e51..f5a81781c 100644 --- a/include/zenoh_macros.h +++ b/include/zenoh_macros.h @@ -16,29 +16,31 @@ )(&x) #define z_drop(x) \ - _Generic((x), z_owned_session_t * : z_close, \ - z_owned_publisher_t * : z_undeclare_publisher, \ - z_owned_keyexpr_t * : z_keyexpr_drop, \ - z_owned_config_t * : z_config_drop, \ - z_owned_scouting_config_t * : z_scouting_config_drop, \ - z_owned_pull_subscriber_t * : z_undeclare_pull_subscriber, \ - z_owned_subscriber_t * : z_undeclare_subscriber, \ - z_owned_queryable_t * : z_undeclare_queryable, \ - z_owned_encoding_t * : z_encoding_drop, \ - z_owned_reply_t * : z_reply_drop, \ - z_owned_hello_t * : z_hello_drop, \ - z_owned_str_t * : z_str_drop, \ - z_owned_closure_sample_t * : z_closure_sample_drop, \ - z_owned_closure_query_t * : z_closure_query_drop, \ - z_owned_closure_reply_t * : z_closure_reply_drop, \ - z_owned_closure_hello_t * : z_closure_hello_drop, \ - z_owned_closure_zid_t * : z_closure_zid_drop, \ - z_owned_reply_channel_closure_t * : z_reply_channel_closure_drop, \ - z_owned_reply_channel_t * : z_reply_channel_drop, \ - zc_owned_payload_t * : zc_payload_drop, \ - zc_owned_shmbuf_t * : zc_shmbuf_drop, \ - zc_owned_shm_manager_t * : zc_shm_manager_drop, \ - zc_owned_liveliness_token_t * : zc_liveliness_undeclare_token \ + _Generic((x), z_owned_session_t * : z_close, \ + z_owned_publisher_t * : z_undeclare_publisher, \ + z_owned_keyexpr_t * : z_keyexpr_drop, \ + z_owned_config_t * : z_config_drop, \ + z_owned_scouting_config_t * : z_scouting_config_drop, \ + z_owned_pull_subscriber_t * : z_undeclare_pull_subscriber, \ + z_owned_subscriber_t * : z_undeclare_subscriber, \ + z_owned_queryable_t * : z_undeclare_queryable, \ + z_owned_encoding_t * : z_encoding_drop, \ + z_owned_reply_t * : z_reply_drop, \ + z_owned_hello_t * : z_hello_drop, \ + z_owned_str_t * : z_str_drop, \ + z_owned_closure_sample_t * : z_closure_sample_drop, \ + z_owned_closure_query_t * : z_closure_query_drop, \ + z_owned_closure_reply_t * : z_closure_reply_drop, \ + z_owned_closure_hello_t * : z_closure_hello_drop, \ + z_owned_closure_zid_t * : z_closure_zid_drop, \ + z_owned_reply_channel_closure_t * : z_reply_channel_closure_drop, \ + z_owned_reply_channel_t * : z_reply_channel_drop, \ + zc_owned_payload_t * : zc_payload_drop, \ + zc_owned_shmbuf_t * : zc_shmbuf_drop, \ + zc_owned_shm_manager_t * : zc_shm_manager_drop, \ + zc_owned_liveliness_token_t * : zc_liveliness_undeclare_token, \ + ze_owned_publication_cache_t* : ze_close_publication_cache, \ + ze_owned_querying_subscriber_t* : ze_undeclare_querying_subscriber \ )(x) #define z_null(x) (*x = \ @@ -68,24 +70,25 @@ )()) #define z_check(x) \ - _Generic((x), z_owned_session_t : z_session_check, \ - z_owned_publisher_t : z_publisher_check, \ - z_owned_keyexpr_t : z_keyexpr_check, \ - z_keyexpr_t : z_keyexpr_is_initialized, \ - z_owned_config_t : z_config_check, \ - z_owned_scouting_config_t : z_scouting_config_check, \ - z_bytes_t : z_bytes_check, \ - z_owned_subscriber_t : z_subscriber_check, \ - z_owned_pull_subscriber_t : z_pull_subscriber_check, \ - z_owned_queryable_t : z_queryable_check, \ - z_owned_encoding_t : z_encoding_check, \ - z_owned_reply_t : z_reply_check, \ - z_owned_hello_t : z_hello_check, \ - z_owned_str_t : z_str_check, \ - zc_owned_payload_t : zc_payload_check, \ - zc_owned_shmbuf_t : zc_shmbuf_check, \ - zc_owned_shm_manager_t : zc_shm_manager_check, \ - zc_owned_liveliness_token_t : zc_liveliness_token_check \ + _Generic((x), z_owned_session_t : z_session_check, \ + z_owned_publisher_t : z_publisher_check, \ + z_owned_keyexpr_t : z_keyexpr_check, \ + z_keyexpr_t : z_keyexpr_is_initialized, \ + z_owned_config_t : z_config_check, \ + z_owned_scouting_config_t : z_scouting_config_check, \ + z_bytes_t : z_bytes_check, \ + z_owned_subscriber_t : z_subscriber_check, \ + z_owned_pull_subscriber_t : z_pull_subscriber_check, \ + z_owned_queryable_t : z_queryable_check, \ + z_owned_encoding_t : z_encoding_check, \ + z_owned_reply_t : z_reply_check, \ + z_owned_hello_t : z_hello_check, \ + z_owned_str_t : z_str_check, \ + zc_owned_payload_t : zc_payload_check, \ + zc_owned_shmbuf_t : zc_shmbuf_check, \ + zc_owned_shm_manager_t : zc_shm_manager_check, \ + zc_owned_liveliness_token_t : zc_liveliness_token_check, \ + ze_owned_querying_subscriber_t: ze_querying_subscriber_check \ )(&x) #define z_call(x, ...) \ diff --git a/src/commons.rs b/src/commons.rs index 908dfac76..3cf70ff19 100644 --- a/src/commons.rs +++ b/src/commons.rs @@ -20,6 +20,8 @@ use libc::{c_char, c_ulong}; use zenoh::buffers::ZBuf; use zenoh::prelude::SampleKind; use zenoh::prelude::SplitBuffer; +use zenoh::query::ReplyKeyExpr; +use zenoh::sample::Locality; use zenoh::sample::Sample; use zenoh_protocol::core::Timestamp; @@ -554,3 +556,66 @@ pub extern "C" fn z_str_null() -> z_owned_str_t { pub extern "C" fn z_str_loan(s: &z_owned_str_t) -> *const libc::c_char { s._cstr } + +#[repr(C)] +#[derive(Clone, Copy, Debug)] +pub enum zc_locality_t { + ANY = 0, + SESSION_LOCAL = 1, + REMOTE = 2, +} + +impl From for zc_locality_t { + fn from(k: Locality) -> Self { + match k { + Locality::Any => zc_locality_t::ANY, + Locality::SessionLocal => zc_locality_t::SESSION_LOCAL, + Locality::Remote => zc_locality_t::REMOTE, + } + } +} + +impl From for Locality { + fn from(k: zc_locality_t) -> Self { + match k { + zc_locality_t::ANY => Locality::Any, + zc_locality_t::SESSION_LOCAL => Locality::SessionLocal, + zc_locality_t::REMOTE => Locality::Remote, + } + } +} + +#[no_mangle] +pub extern "C" fn zc_locality_default() -> zc_locality_t { + Locality::default().into() +} + +#[repr(C)] +#[derive(Clone, Copy, Debug)] +pub enum zc_reply_keyexpr_t { + ANY = 0, + MATCHING_QUERY = 1, +} + +impl From for zc_reply_keyexpr_t { + fn from(k: ReplyKeyExpr) -> Self { + match k { + ReplyKeyExpr::Any => zc_reply_keyexpr_t::ANY, + ReplyKeyExpr::MatchingQuery => zc_reply_keyexpr_t::MATCHING_QUERY, + } + } +} + +impl From for ReplyKeyExpr { + fn from(k: zc_reply_keyexpr_t) -> Self { + match k { + zc_reply_keyexpr_t::ANY => ReplyKeyExpr::Any, + zc_reply_keyexpr_t::MATCHING_QUERY => ReplyKeyExpr::MatchingQuery, + } + } +} + +#[no_mangle] +pub extern "C" fn zc_reply_keyexpr_default() -> zc_reply_keyexpr_t { + ReplyKeyExpr::default().into() +} diff --git a/src/config.rs b/src/config.rs index 378c966d3..6e259f3b5 100644 --- a/src/config.rs +++ b/src/config.rs @@ -55,7 +55,7 @@ pub static Z_CONFIG_SCOUTING_DELAY_KEY: &c_char = unsafe { &*(b"scouting/delay\0".as_ptr() as *const c_char) }; #[no_mangle] pub static Z_CONFIG_ADD_TIMESTAMP_KEY: &c_char = - unsafe { &*(b"add_timestamp\0".as_ptr() as *const c_char) }; + unsafe { &*(b"timestamping/enabled\0".as_ptr() as *const c_char) }; /// A loaned zenoh configuration. #[repr(C)] diff --git a/src/lib.rs b/src/lib.rs index cc2a7bb3d..2a4f661d3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -44,6 +44,10 @@ mod closures; pub use closures::*; mod liveliness; pub use liveliness::*; +mod publication_cache; +pub use publication_cache::*; +mod querying_subscriber; +pub use querying_subscriber::*; #[cfg(feature = "shared-memory")] mod shm; diff --git a/src/publication_cache.rs b/src/publication_cache.rs new file mode 100644 index 000000000..39f0eb9a9 --- /dev/null +++ b/src/publication_cache.rs @@ -0,0 +1,191 @@ +// +// Copyright (c) 2023 ZettaScale Technology. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh team, +// + +use std::ops::Deref; + +use zenoh_ext::SessionExt; +use zenoh_util::core::zresult::ErrNo; +use zenoh_util::core::SyncResolve; + +use crate::{ + impl_guarded_transmute, z_keyexpr_t, z_session_t, zc_locality_default, zc_locality_t, + GuardedTransmute, UninitializedKeyExprError, +}; + +/// Options passed to the :c:func:`ze_declare_publication_cache` function. +/// +/// Members: +/// queryable_prefix: the prefix used for queryable +/// queryable_origin: the restriction for the matching queries that will be receive by this +/// publication cache +/// history: the the history size +/// resources_limit: the limit number of cached resources +#[repr(C)] +pub struct ze_publication_cache_options_t { + pub queryable_prefix: z_keyexpr_t, + pub queryable_origin: zc_locality_t, + pub history: usize, + pub resources_limit: usize, +} + +/// Constructs the default value for :c:type:`ze_publication_cache_options_t`. +#[no_mangle] +pub extern "C" fn ze_publication_cache_options_default() -> ze_publication_cache_options_t { + ze_publication_cache_options_t { + queryable_prefix: z_keyexpr_t::null(), + queryable_origin: zc_locality_default(), + history: 1, + resources_limit: 0, + } +} + +type PublicationCache = Option>>; + +/// An owned zenoh publication_cache. +/// +/// Like most `z_owned_X_t` types, you may obtain an instance of `z_X_t` by loaning it using `z_X_loan(&val)`. +/// The `z_loan(val)` macro, available if your compiler supports C11's `_Generic`, is equivalent to writing `z_X_loan(&val)`. +/// +/// Like all `z_owned_X_t`, an instance will be destroyed by any function which takes a mutable pointer to said instance, as this implies the instance's inners were moved. +/// To make this fact more obvious when reading your code, consider using `z_move(val)` instead of `&val` as the argument. +/// After a move, `val` will still exist, but will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your `val` is valid. +/// +/// To check if `val` is still valid, you may use `z_X_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid. +#[repr(C)] +pub struct ze_owned_publication_cache_t([usize; 1]); + +impl_guarded_transmute!(PublicationCache, ze_owned_publication_cache_t); + +impl From for ze_owned_publication_cache_t { + fn from(val: PublicationCache) -> Self { + val.transmute() + } +} + +impl AsRef for ze_owned_publication_cache_t { + fn as_ref(&self) -> &PublicationCache { + unsafe { std::mem::transmute(self) } + } +} + +impl AsMut for ze_owned_publication_cache_t { + fn as_mut(&mut self) -> &mut PublicationCache { + unsafe { std::mem::transmute(self) } + } +} + +impl ze_owned_publication_cache_t { + pub fn new(pub_cache: zenoh_ext::PublicationCache<'static>) -> Self { + Some(Box::new(pub_cache)).into() + } + pub fn null() -> Self { + None.into() + } +} + +/// Declares a publication cache. +/// +/// Parameters: +/// session: the zenoh session. +/// keyexpr: the key expression to publish. +/// options: additional options for the publication_cache. +/// +/// Returns: +/// A :c:type:`ze_owned_publication_cache_t`. +/// +/// +/// Example: +/// Declaring a publication cache `NULL` for the options: +/// +/// .. code-block:: C +/// +/// ze_owned_publication_cache_t pub_cache = ze_declare_publication_cache(z_loan(s), z_keyexpr(expr), NULL); +/// +/// is equivalent to initializing and passing the default publication cache options: +/// +/// .. code-block:: C +/// +/// ze_publication_cache_options_t opts = ze_publication_cache_options_default(); +/// ze_owned_publication_cache_t pub_cache = ze_declare_publication_cache(z_loan(s), z_keyexpr(expr), &opts); +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub extern "C" fn ze_declare_publication_cache( + session: z_session_t, + keyexpr: z_keyexpr_t, + options: Option<&ze_publication_cache_options_t>, +) -> ze_owned_publication_cache_t { + match session.upgrade() { + Some(s) => { + let keyexpr = keyexpr.deref().as_ref().map(|s| s.clone().into_owned()); + if let Some(key_expr) = keyexpr { + let mut p = s.declare_publication_cache(key_expr); + if let Some(options) = options { + p = p.history(options.history.into()); + p = p.queryable_allowed_origin(options.queryable_origin.into()); + if options.resources_limit != 0 { + p = p.resources_limit(options.resources_limit) + } + if options.queryable_prefix.deref().is_some() { + let queryable_prefix = options + .queryable_prefix + .deref() + .as_ref() + .map(|s| s.clone().into_owned()); + if let Some(queryable_prefix) = queryable_prefix { + p = p.queryable_prefix(queryable_prefix) + } + } + } + match p.res_sync() { + Ok(publication_cache) => ze_owned_publication_cache_t::new(publication_cache), + Err(e) => { + log::error!("{}", e); + ze_owned_publication_cache_t::null() + } + } + } else { + log::error!("{}", UninitializedKeyExprError); + ze_owned_publication_cache_t::null() + } + } + None => ze_owned_publication_cache_t::null(), + } +} + +/// Constructs a null safe-to-drop value of 'ze_owned_publication_cache_t' type +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub extern "C" fn ze_publication_cache_null() -> ze_owned_publication_cache_t { + ze_owned_publication_cache_t::null() +} + +/// Returns ``true`` if `pub_cache` is valid. +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub extern "C" fn ze_publication_cache_check(pub_cache: &ze_owned_publication_cache_t) -> bool { + pub_cache.as_ref().is_some() +} + +/// Closes the given :c:type:`ze_owned_publication_cache_t`, droping it and invalidating it for double-drop safety. +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub extern "C" fn ze_close_publication_cache(pub_cache: &mut ze_owned_publication_cache_t) -> i8 { + if let Some(p) = pub_cache.as_mut().take() { + if let Err(e) = p.close().res_sync() { + log::error!("{}", e); + return e.errno().get(); + } + } + 0 +} diff --git a/src/querying_subscriber.rs b/src/querying_subscriber.rs new file mode 100644 index 000000000..4d731bb04 --- /dev/null +++ b/src/querying_subscriber.rs @@ -0,0 +1,245 @@ +// +// Copyright (c) 2017, 2022 ZettaScale Technology. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh team, +// + +use crate::commons::*; +use crate::z_closure_sample_call; +use crate::z_owned_closure_sample_t; +use crate::z_reliability_t; +use crate::LOG_INVALID_SESSION; +use zenoh::prelude::sync::SyncResolve; +use zenoh::prelude::SessionDeclarations; +use zenoh::prelude::SplitBuffer; +use zenoh_ext::*; +use zenoh_protocol::core::SubInfo; +use zenoh_util::core::zresult::ErrNo; + +use crate::{ + impl_guarded_transmute, z_keyexpr_t, z_query_consolidation_default, z_query_consolidation_t, + z_query_target_default, z_query_target_t, z_session_t, zc_locality_default, zc_locality_t, + zc_reply_keyexpr_default, zc_reply_keyexpr_t, GuardedTransmute, UninitializedKeyExprError, +}; + +/**************************************/ +/* DECLARATION */ +/**************************************/ +type FetchingSubscriber = Option>>; + +/// An owned zenoh querying subscriber. Destroying the subscriber cancels the subscription. +/// +/// Like most `ze_owned_X_t` types, you may obtain an instance of `z_X_t` by loaning it using `z_X_loan(&val)`. +/// The `z_loan(val)` macro, available if your compiler supports C11's `_Generic`, is equivalent to writing `z_X_loan(&val)`. +/// +/// Like all `ze_owned_X_t`, an instance will be destroyed by any function which takes a mutable pointer to said instance, as this implies the instance's inners were moved. +/// To make this fact more obvious when reading your code, consider using `z_move(val)` instead of `&val` as the argument. +/// After a move, `val` will still exist, but will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your `val` is valid. +/// +/// To check if `val` is still valid, you may use `z_X_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid. +#[repr(C)] +pub struct ze_owned_querying_subscriber_t([usize; 1]); + +impl_guarded_transmute!(FetchingSubscriber, ze_owned_querying_subscriber_t); + +#[repr(C)] +#[allow(non_camel_case_types)] +pub struct ze_querying_subscriber_t<'a>(&'a ze_owned_querying_subscriber_t); + +impl From for ze_owned_querying_subscriber_t { + fn from(val: FetchingSubscriber) -> Self { + val.transmute() + } +} + +impl AsRef for ze_owned_querying_subscriber_t { + fn as_ref(&self) -> &FetchingSubscriber { + unsafe { std::mem::transmute(self) } + } +} + +impl<'a> AsRef for ze_querying_subscriber_t<'a> { + fn as_ref(&self) -> &FetchingSubscriber { + self.0.as_ref() + } +} + +impl AsMut for ze_owned_querying_subscriber_t { + fn as_mut(&mut self) -> &mut FetchingSubscriber { + unsafe { std::mem::transmute(self) } + } +} + +impl ze_owned_querying_subscriber_t { + pub fn new(sub: zenoh_ext::FetchingSubscriber<'static, ()>) -> Self { + Some(Box::new(sub)).into() + } + pub fn null() -> Self { + None.into() + } +} + +/// Constructs a null safe-to-drop value of 'ze_owned_querying_subscriber_t' type +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub extern "C" fn ze_querying_subscriber_null() -> ze_owned_querying_subscriber_t { + ze_owned_querying_subscriber_t::null() +} + +/// Represents the set of options that can be applied to a querying subscriber, +/// upon its declaration via :c:func:`ze_declare_querying_subscriber`. +/// +/// Members: +/// z_reliability_t reliability: The subscription reliability. +#[repr(C)] +#[allow(non_camel_case_types)] +pub struct ze_querying_subscriber_options_t { + reliability: z_reliability_t, + allowed_origin: zc_locality_t, + query_selector: z_keyexpr_t, + query_target: z_query_target_t, + query_consolidation: z_query_consolidation_t, + query_accept_replies: zc_reply_keyexpr_t, + query_timeout_ms: u64, +} + +/// Constructs the default value for :c:type:`ze_querying_subscriber_options_t`. +#[no_mangle] +pub extern "C" fn ze_querying_subscriber_options_default() -> ze_querying_subscriber_options_t { + ze_querying_subscriber_options_t { + reliability: SubInfo::default().reliability.into(), + allowed_origin: zc_locality_default(), + query_selector: z_keyexpr_t::null(), + query_target: z_query_target_default(), + query_consolidation: z_query_consolidation_default(), + query_accept_replies: zc_reply_keyexpr_default(), + query_timeout_ms: 0, + } +} + +/// Declares a querying subscriber for a given key expression. +/// +/// Parameters: +/// session: The zenoh session. +/// keyexpr: The key expression to subscribe. +/// callback: The callback function that will be called each time a data matching the subscribed expression is received. +/// opts: additional options for the querying subscriber. +/// +/// Returns: +/// A :c:type:`ze_owned_subscriber_t`. +/// +/// To check if the subscription succeeded and if the querying subscriber is still valid, +/// you may use `ze_querying_subscriber_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid. +/// +/// Like all `ze_owned_X_t`, an instance will be destroyed by any function which takes a mutable pointer to said instance, as this implies the instance's inners were moved. +/// To make this fact more obvious when reading your code, consider using `z_move(val)` instead of `&val` as the argument. +/// After a move, `val` will still exist, but will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your `val` is valid. +/// +/// Example: +/// Declaring a subscriber passing ``NULL`` for the options: +/// +/// .. code-block:: C +/// +/// ze_owned_subscriber_t sub = ze_declare_querying_subscriber(z_loan(s), z_keyexpr(expr), callback, NULL); +/// +/// is equivalent to initializing and passing the default subscriber options: +/// +/// .. code-block:: C +/// +/// z_subscriber_options_t opts = z_subscriber_options_default(); +/// ze_owned_subscriber_t sub = ze_declare_querying_subscriber(z_loan(s), z_keyexpr(expr), callback, &opts); +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub unsafe extern "C" fn ze_declare_querying_subscriber( + session: z_session_t, + keyexpr: z_keyexpr_t, + callback: &mut z_owned_closure_sample_t, + options: Option<&ze_querying_subscriber_options_t>, +) -> ze_owned_querying_subscriber_t { + let mut closure = z_owned_closure_sample_t::empty(); + std::mem::swap(callback, &mut closure); + + match session.upgrade() { + Some(s) => { + let keyexpr = keyexpr.deref().as_ref().map(|s| s.clone().into_owned()); + if let Some(key_expr) = keyexpr { + let mut sub = s.declare_subscriber(keyexpr).querying(); + if let Some(options) = options { + sub = sub + .reliability(options.reliability.into()) + .allowed_origin(options.allowed_origin.into()) + .query_target(options.query_target.into()) + .query_consolidation(options.query_consolidation) + .query_accept_replies(options.query_accept_replies.into()); + if options.query_selector.is_some() { + let query_selector = options + .query_selector + .as_ref() + .map(|s| s.clone().into_owned()); + if let Some(query_selector) = query_selector { + sub = sub.query_selector(query_selector) + } + } + if options.query_timeout_ms != 0 { + sub = sub.query_timeout(std::time::Duration::from_millis( + options.query_timeout_ms, + )); + } + } + match sub + .callback(move |sample| { + let payload = sample.payload.contiguous(); + let owner = match payload { + std::borrow::Cow::Owned(v) => zenoh::buffers::ZBuf::from(v), + _ => sample.payload.clone(), + }; + let sample = z_sample_t::new(&sample, &owner); + z_closure_sample_call(&closure, &sample) + }) + .res() + { + Ok(sub) => ze_owned_querying_subscriber_t::new(sub), + Err(e) => { + log::debug!("{}", e); + ze_owned_querying_subscriber_t::null() + } + } + } else { + log::error!("{}", UninitializedKeyExprError); + ze_owned_publication_cache_t::null() + } + } + None => { + log::debug!("{}", LOG_INVALID_SESSION); + ze_owned_querying_subscriber_t::null() + } + } +} + +/// Undeclares the given :c:type:`ze_owned_querying_subscriber_t`, droping it and invalidating it for double-drop safety. +#[allow(clippy::missing_safety_doc)] +#[no_mangle] +pub extern "C" fn ze_undeclare_querying_subscriber(sub: &mut ze_owned_querying_subscriber_t) -> i8 { + if let Some(s) = sub.as_mut().take() { + if let Err(e) = s.close().res_sync() { + log::warn!("{}", e); + return e.errno().get(); + } + } + 0 +} + +/// Returns ``true`` if `sub` is valid. +#[allow(clippy::missing_safety_doc)] +#[no_mangle] +pub extern "C" fn ze_querying_subscriber_check(sub: &ze_owned_querying_subscriber_t) -> bool { + sub.as_ref().is_some() +}