diff --git a/Cargo.lock b/Cargo.lock index 0eba19b5e..00a33a63a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1869,6 +1869,16 @@ dependencies = [ "untrusted", ] +[[package]] +name = "secrecy" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9bd1c54ea06cfd2f6b63219704de0b9b4f72dcc2b8fdef820be6cd799780e91e" +dependencies = [ + "serde", + "zeroize", +] + [[package]] name = "security-framework" version = "2.9.2" @@ -2721,7 +2731,7 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" [[package]] name = "zenoh" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "async-global-executor", "async-std", @@ -2769,7 +2779,7 @@ dependencies = [ [[package]] name = "zenoh-buffers" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "zenoh-collections", ] @@ -2791,6 +2801,7 @@ dependencies = [ "serde_yaml", "spin 0.9.8", "zenoh", + "zenoh-ext", "zenoh-protocol", "zenoh-util", ] @@ -2798,7 +2809,7 @@ dependencies = [ [[package]] name = "zenoh-codec" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "log", "serde", @@ -2811,16 +2822,17 @@ dependencies = [ [[package]] name = "zenoh-collections" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" [[package]] name = "zenoh-config" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "flume", "json5", "num_cpus", + "secrecy", "serde", "serde_json", "serde_yaml", @@ -2834,7 +2846,7 @@ dependencies = [ [[package]] name = "zenoh-core" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "async-std", "lazy_static", @@ -2844,7 +2856,7 @@ dependencies = [ [[package]] name = "zenoh-crypto" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "aes", "hmac", @@ -2854,10 +2866,30 @@ dependencies = [ "zenoh-result", ] +[[package]] +name = "zenoh-ext" +version = "0.11.0-dev" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" +dependencies = [ + "async-std", + "bincode", + "env_logger", + "flume", + "futures", + "log", + "serde", + "zenoh", + "zenoh-core", + "zenoh-macros", + "zenoh-result", + "zenoh-sync", + "zenoh-util", +] + [[package]] name = "zenoh-keyexpr" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "hashbrown 0.14.0", "keyed-set", @@ -2871,7 +2903,7 @@ dependencies = [ [[package]] name = "zenoh-link" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "async-std", "async-trait", @@ -2890,7 +2922,7 @@ dependencies = [ [[package]] name = "zenoh-link-commons" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "async-std", "async-trait", @@ -2906,11 +2938,12 @@ dependencies = [ [[package]] name = "zenoh-link-quic" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "async-rustls", "async-std", "async-trait", + "base64", "futures", "log", "quinn", @@ -2918,6 +2951,7 @@ dependencies = [ "rustls-native-certs", "rustls-pemfile", "rustls-webpki", + "secrecy", "zenoh-config", "zenoh-core", "zenoh-link-commons", @@ -2930,7 +2964,7 @@ dependencies = [ [[package]] name = "zenoh-link-tcp" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "async-std", "async-trait", @@ -2946,16 +2980,18 @@ dependencies = [ [[package]] name = "zenoh-link-tls" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "async-rustls", "async-std", "async-trait", + "base64", "futures", "log", "rustls", "rustls-pemfile", "rustls-webpki", + "secrecy", "webpki-roots", "zenoh-config", "zenoh-core", @@ -2969,7 +3005,7 @@ dependencies = [ [[package]] name = "zenoh-link-udp" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "async-std", "async-trait", @@ -2988,7 +3024,7 @@ dependencies = [ [[package]] name = "zenoh-link-unixsock_stream" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "async-std", "async-trait", @@ -3006,7 +3042,7 @@ dependencies = [ [[package]] name = "zenoh-link-ws" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "async-std", "async-trait", @@ -3026,7 +3062,7 @@ dependencies = [ [[package]] name = "zenoh-macros" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "proc-macro2", "quote", @@ -3039,7 +3075,7 @@ dependencies = [ [[package]] name = "zenoh-plugin-trait" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "libloading", "log", @@ -3052,7 +3088,7 @@ dependencies = [ [[package]] name = "zenoh-protocol" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "const_format", "hex", @@ -3068,7 +3104,7 @@ dependencies = [ [[package]] name = "zenoh-result" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "anyhow", ] @@ -3076,7 +3112,7 @@ dependencies = [ [[package]] name = "zenoh-shm" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "bincode", "log", @@ -3089,7 +3125,7 @@ dependencies = [ [[package]] name = "zenoh-sync" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "async-std", "event-listener", @@ -3104,7 +3140,7 @@ dependencies = [ [[package]] name = "zenoh-transport" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "async-executor", "async-global-executor", @@ -3136,7 +3172,7 @@ dependencies = [ [[package]] name = "zenoh-util" version = "0.11.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#b3ba1c421197443db33ff66d0f74973be35f4d48" +source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=master#d2399cad7afda578df4d1f70b9026cd0e5354274" dependencies = [ "async-std", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 47af03b38..5b31508cf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,6 +50,7 @@ spin = "0.9.5" zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "master", features = [ "shared-memory", "unstable" ] } zenoh-protocol = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "master", features = [ "shared-memory" ] } zenoh-util = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "master" } +zenoh-ext = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "master", features = [ "unstable" ]} [build-dependencies] cbindgen = "0.24.3" diff --git a/Cargo.toml.in b/Cargo.toml.in index fd810dbab..0eab1c852 100644 --- a/Cargo.toml.in +++ b/Cargo.toml.in @@ -50,6 +50,7 @@ spin = "0.9.5" zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "master", features = [ "shared-memory", "unstable" ] } zenoh-protocol = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "master", features = [ "shared-memory" ] } zenoh-util = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "master" } +zenoh-ext = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "master", features = [ "unstable" ]} [build-dependencies] cbindgen = "0.24.3" diff --git a/docs/api.rst b/docs/api.rst index d97007358..50df3a8a6 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -1,5 +1,5 @@ .. -.. Copyright (c) 2022 ZettaScale Technology +.. Copyright (c) 2023 ZettaScale Technology .. .. This program and the accompanying materials are made available under the .. terms of the Eclipse Public License 2.0 which is available at @@ -262,4 +262,38 @@ Functions .. autocfunction:: zenoh_commons.h::z_closure_query_call .. autocfunction:: zenoh_commons.h::z_closure_query_drop +Publication Cache +================= +Types +----- + +.. autocstruct:: zenoh_commons.h::ze_publication_cache_options_t +.. autocstruct:: zenoh_commons.h::ze_owned_publication_cache_t + +Functions +--------- + +.. autocfunction:: zenoh_commons.h::ze_declare_publication_cache +.. autocfunction:: zenoh_commons.h::ze_close_publication_cache +.. autocfunction:: zenoh_commons.h::ze_publication_cache_check +.. autocfunction:: zenoh_commons.h::ze_publication_cache_null +.. autocfunction:: zenoh_commons.h::ze_publication_cache_options_default + +Querying Subscriber +=================== + +Types +----- + +.. autocstruct:: zenoh_commons.h::ze_owned_querying_subscriber_t +.. autocstruct:: zenoh_commons.h::ze_querying_subscriber_options_t + +Functions +--------- + +.. autocfunction:: zenoh_commons.h::ze_declare_querying_subscriber +.. autocfunction:: zenoh_commons.h::ze_undeclare_querying_subscriber +.. autocfunction:: zenoh_commons.h::ze_querying_subscriber_check +.. autocfunction:: zenoh_commons.h::ze_querying_subscriber_null +.. autocfunction:: zenoh_commons.h::ze_querying_subscriber_options_default diff --git a/examples/z_pub_cache.c b/examples/z_pub_cache.c new file mode 100644 index 000000000..b293197a7 --- /dev/null +++ b/examples/z_pub_cache.c @@ -0,0 +1,78 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +#include +#include + +#include "zenoh.h" +#if defined(WIN32) || defined(_WIN32) || defined(__WIN32) && !defined(__CYGWIN__) +#include +#define sleep(x) Sleep(x * 1000) +#else +#include +#endif + +int main(int argc, char **argv) { + char *keyexpr = "demo/example/zenoh-c-pub"; + char *value = "Pub from C!"; + + if (argc > 1) keyexpr = argv[1]; + if (argc > 2) value = argv[2]; + + z_owned_config_t config = z_config_default(); + if (argc > 3) { + if (zc_config_insert_json(z_loan(config), Z_CONFIG_CONNECT_KEY, argv[3]) < 0) { + printf( + "Couldn't insert value `%s` in configuration at `%s`. This is likely because `%s` expects a " + "JSON-serialized list of strings\n", + argv[3], Z_CONFIG_CONNECT_KEY, Z_CONFIG_CONNECT_KEY); + exit(-1); + } + } + + if (zc_config_insert_json(z_loan(config), Z_CONFIG_ADD_TIMESTAMP_KEY, "true") < 0) { + printf("Unable to configure timestamps!\n"); + exit(-1); + } + + printf("Opening session...\n"); + z_owned_session_t s = z_open(z_move(config)); + if (!z_check(s)) { + printf("Unable to open session!\n"); + exit(-1); + } + + ze_publication_cache_options_t pub_cache_opts = ze_publication_cache_options_default(); + pub_cache_opts.history = 42; + + printf("Declaring publication cache on '%s'...\n", keyexpr); + ze_owned_publication_cache_t pub_cache = + ze_declare_publication_cache(z_loan(s), z_keyexpr(keyexpr), &pub_cache_opts); + if (!z_check(pub_cache)) { + printf("Unable to declare publication cache for key expression!\n"); + exit(-1); + } + + char buf[256]; + for (int idx = 0; 1; ++idx) { + sleep(1); + sprintf(buf, "[%4d] %s", idx, value); + printf("Putting Data ('%s': '%s')...\n", keyexpr, buf); + z_put(z_loan(s), z_keyexpr(keyexpr), (const uint8_t *)buf, strlen(buf), NULL); + } + + z_drop(z_move(pub_cache)); + z_close(z_move(s)); + + return 0; +} diff --git a/examples/z_query_sub.c b/examples/z_query_sub.c new file mode 100644 index 000000000..680cfd973 --- /dev/null +++ b/examples/z_query_sub.c @@ -0,0 +1,90 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +#include +#if defined(WIN32) || defined(_WIN32) || defined(__WIN32) && !defined(__CYGWIN__) +#include +#define sleep(x) Sleep(x * 1000) +#else +#include +#endif +#include "zenoh.h" + +const char *kind_to_str(z_sample_kind_t kind); + +void data_handler(const z_sample_t *sample, void *arg) { + z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); + printf(">> [Subscriber] Received %s ('%s': '%.*s')\n", kind_to_str(sample->kind), z_loan(keystr), + (int)sample->payload.len, sample->payload.start); + z_drop(z_move(keystr)); +} + +int main(int argc, char **argv) { + char *expr = "demo/example/**"; + if (argc > 1) { + expr = argv[1]; + } + + z_owned_config_t config = z_config_default(); + if (argc > 2) { + if (zc_config_insert_json(z_loan(config), Z_CONFIG_LISTEN_KEY, argv[2]) < 0) { + printf( + "Couldn't insert value `%s` in configuration at `%s`. This is likely because `%s` expects a " + "JSON-serialized list of strings\n", + argv[2], Z_CONFIG_LISTEN_KEY, Z_CONFIG_LISTEN_KEY); + exit(-1); + } + } + + printf("Opening session...\n"); + z_owned_session_t s = z_open(z_move(config)); + if (!z_check(s)) { + printf("Unable to open session!\n"); + exit(-1); + } + + ze_querying_subscriber_options_t sub_opts = ze_querying_subscriber_options_default(); + z_owned_closure_sample_t callback = z_closure(data_handler); + printf("Declaring querying subscriber on '%s'...\n", expr); + ze_owned_querying_subscriber_t sub = + ze_declare_querying_subscriber(z_loan(s), z_keyexpr(expr), z_move(callback), &sub_opts); + if (!z_check(sub)) { + printf("Unable to declare querying subscriber.\n"); + exit(-1); + } + + printf("Enter 'q' to quit...\n"); + char c = 0; + while (c != 'q') { + c = getchar(); + if (c == -1) { + sleep(1); + } + } + + z_drop(z_move(sub)); + z_close(z_move(s)); + + return 0; +} + +const char *kind_to_str(z_sample_kind_t kind) { + switch (kind) { + case Z_SAMPLE_KIND_PUT: + return "PUT"; + case Z_SAMPLE_KIND_DELETE: + return "DELETE"; + default: + return "UNKNOWN"; + } +} diff --git a/include/zenoh_commons.h b/include/zenoh_commons.h index 5741e3a0c..89fed2be9 100644 --- a/include/zenoh_commons.h +++ b/include/zenoh_commons.h @@ -139,6 +139,15 @@ typedef enum z_sample_kind_t { Z_SAMPLE_KIND_PUT = 0, Z_SAMPLE_KIND_DELETE = 1, } z_sample_kind_t; +typedef enum zcu_locality_t { + ZCU_LOCALITY_ANY = 0, + ZCU_LOCALITY_SESSION_LOCAL = 1, + ZCU_LOCALITY_REMOTE = 2, +} zcu_locality_t; +typedef enum zcu_reply_keyexpr_t { + ZCU_REPLY_KEYEXPR_ANY = 0, + ZCU_REPLY_KEYEXPR_MATCHING_QUERY = 1, +} zcu_reply_keyexpr_t; /** * An array of bytes. */ @@ -717,6 +726,75 @@ typedef struct zc_owned_shmbuf_t { typedef struct zc_owned_shm_manager_t { uintptr_t _0; } zc_owned_shm_manager_t; +/** + * An owned zenoh publication_cache. + * + * Like most `z_owned_X_t` types, you may obtain an instance of `z_X_t` by loaning it using `z_X_loan(&val)`. + * The `z_loan(val)` macro, available if your compiler supports C11's `_Generic`, is equivalent to writing `z_X_loan(&val)`. + * + * Like all `z_owned_X_t`, an instance will be destroyed by any function which takes a mutable pointer to said instance, as this implies the instance's inners were moved. + * To make this fact more obvious when reading your code, consider using `z_move(val)` instead of `&val` as the argument. + * After a move, `val` will still exist, but will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your `val` is valid. + * + * To check if `val` is still valid, you may use `z_X_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid. + */ +typedef struct ze_owned_publication_cache_t { + uintptr_t _0[1]; +} ze_owned_publication_cache_t; +/** + * Options passed to the :c:func:`ze_declare_publication_cache` function. + * + * Members: + * z_keyexpr_t queryable_prefix: The prefix used for queryable + * zcu_locality_t queryable_origin: The restriction for the matching queries that will be receive by this + * publication cache + * size_t history: The the history size + * size_t resources_limit: The limit number of cached resources + */ +typedef struct ze_publication_cache_options_t { + struct z_keyexpr_t queryable_prefix; + enum zcu_locality_t queryable_origin; + uintptr_t history; + uintptr_t resources_limit; +} ze_publication_cache_options_t; +/** + * An owned zenoh querying subscriber. Destroying the subscriber cancels the subscription. + * + * Like most `ze_owned_X_t` types, you may obtain an instance of `z_X_t` by loaning it using `z_X_loan(&val)`. + * The `z_loan(val)` macro, available if your compiler supports C11's `_Generic`, is equivalent to writing `z_X_loan(&val)`. + * + * Like all `ze_owned_X_t`, an instance will be destroyed by any function which takes a mutable pointer to said instance, as this implies the instance's inners were moved. + * To make this fact more obvious when reading your code, consider using `z_move(val)` instead of `&val` as the argument. + * After a move, `val` will still exist, but will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your `val` is valid. + * + * To check if `val` is still valid, you may use `z_X_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid. + */ +typedef struct ze_owned_querying_subscriber_t { + uintptr_t _0[1]; +} ze_owned_querying_subscriber_t; +/** + * Represents the set of options that can be applied to a querying subscriber, + * upon its declaration via :c:func:`ze_declare_querying_subscriber`. + * + * Members: + * z_reliability_t reliability: The subscription reliability. + * zcu_locality_t allowed_origin: The restriction for the matching publications that will be + * receive by this subscriber. + * z_keyexpr_t query_selector: The selector to be used for queries. + * z_query_target_t query_target: The target to be used for queries. + * z_query_consolidation_t query_consolidation: The consolidation mode to be used for queries. + * zcu_reply_keyexpr_t query_accept_replies: The accepted replies for queries. + * uint64_t query_timeout_ms: The timeout to be used for queries. + */ +typedef struct ze_querying_subscriber_options_t { + enum z_reliability_t reliability; + enum zcu_locality_t allowed_origin; + struct z_keyexpr_t query_selector; + enum z_query_target_t query_target; + struct z_query_consolidation_t query_consolidation; + enum zcu_reply_keyexpr_t query_accept_replies; + uint64_t query_timeout_ms; +} ze_querying_subscriber_options_t; ZENOHC_API extern const unsigned int Z_ROUTER; ZENOHC_API extern const unsigned int Z_PEER; ZENOHC_API extern const unsigned int Z_CLIENT; @@ -1916,3 +1994,107 @@ ZENOHC_API uint8_t *zc_shmbuf_ptr(const struct zc_owned_shmbuf_t *buf); ZENOHC_API void zc_shmbuf_set_length(const struct zc_owned_shmbuf_t *buf, uintptr_t len); +ZENOHC_API enum zcu_locality_t zcu_locality_default(void); +ZENOHC_API enum zcu_reply_keyexpr_t zcu_reply_keyexpr_default(void); +/** + * Declares a Publication Cache. + * + * Parameters: + * z_session_t session: The zenoh session. + * z_keyexpr_t keyexpr: The key expression to publish. + * ze_publication_cache_options_t options: Additional options for the publication_cache. + * + * Returns: + * :c:type:`ze_owned_publication_cache_t`. + * + * + * Example: + * Declaring a publication cache `NULL` for the options: + * + * .. code-block:: C + * + * ze_owned_publication_cache_t pub_cache = ze_declare_publication_cache(z_loan(s), z_keyexpr(expr), NULL); + * + * is equivalent to initializing and passing the default publication cache options: + * + * .. code-block:: C + * + * ze_publication_cache_options_t opts = ze_publication_cache_options_default(); + * ze_owned_publication_cache_t pub_cache = ze_declare_publication_cache(z_loan(s), z_keyexpr(expr), &opts); + */ +ZENOHC_API +struct ze_owned_publication_cache_t ze_declare_publication_cache(struct z_session_t session, + struct z_keyexpr_t keyexpr, + const struct ze_publication_cache_options_t *options); +/** + * Declares a Querying Subscriber for a given key expression. + * + * Parameters: + * z_session_t session: The zenoh session. + * z_keyexpr_t keyexpr: The key expression to subscribe. + * z_owned_closure_sample_t callback: The callback function that will be called each time a data matching the subscribed expression is received. + * ze_querying_subscriber_options_t options: Additional options for the querying subscriber. + * + * Returns: + * :c:type:`ze_owned_subscriber_t`. + * + * To check if the subscription succeeded and if the querying subscriber is still valid, + * you may use `ze_querying_subscriber_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid. + * + * Like all `ze_owned_X_t`, an instance will be destroyed by any function which takes a mutable pointer to said instance, as this implies the instance's inners were moved. + * To make this fact more obvious when reading your code, consider using `z_move(val)` instead of `&val` as the argument. + * After a move, `val` will still exist, but will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your `val` is valid. + * + * Example: + * Declaring a subscriber passing ``NULL`` for the options: + * + * .. code-block:: C + * + * ze_owned_subscriber_t sub = ze_declare_querying_subscriber(z_loan(s), z_keyexpr(expr), callback, NULL); + * + * is equivalent to initializing and passing the default subscriber options: + * + * .. code-block:: C + * + * z_subscriber_options_t opts = z_subscriber_options_default(); + * ze_owned_subscriber_t sub = ze_declare_querying_subscriber(z_loan(s), z_keyexpr(expr), callback, &opts); + */ +ZENOHC_API +struct ze_owned_querying_subscriber_t ze_declare_querying_subscriber(struct z_session_t session, + struct z_keyexpr_t keyexpr, + struct z_owned_closure_sample_t *callback, + const struct ze_querying_subscriber_options_t *options); +/** + * Returns ``true`` if `pub_cache` is valid. + */ +ZENOHC_API bool ze_publication_cache_check(const struct ze_owned_publication_cache_t *pub_cache); +/** + * Constructs a null safe-to-drop value of 'ze_owned_publication_cache_t' type + */ +ZENOHC_API struct ze_owned_publication_cache_t ze_publication_cache_null(void); +/** + * Constructs the default value for :c:type:`ze_publication_cache_options_t`. + */ +ZENOHC_API struct ze_publication_cache_options_t ze_publication_cache_options_default(void); +/** + * Returns ``true`` if `sub` is valid. + */ +ZENOHC_API bool ze_querying_subscriber_check(const struct ze_owned_querying_subscriber_t *sub); +/** + * Constructs a null safe-to-drop value of 'ze_owned_querying_subscriber_t' type + */ +ZENOHC_API struct ze_owned_querying_subscriber_t ze_querying_subscriber_null(void); +/** + * Constructs the default value for :c:type:`ze_querying_subscriber_options_t`. + */ +ZENOHC_API struct ze_querying_subscriber_options_t ze_querying_subscriber_options_default(void); +/** + * Closes the given :c:type:`ze_owned_publication_cache_t`, droping it and invalidating it for double-drop safety. + */ +ZENOHC_API +int8_t ze_undeclare_publication_cache(struct ze_owned_publication_cache_t *pub_cache); +/** + * Undeclares the given :c:type:`ze_owned_querying_subscriber_t`, droping it and invalidating it for double-drop safety. + */ +ZENOHC_API +int8_t ze_undeclare_querying_subscriber(struct ze_owned_querying_subscriber_t *sub); diff --git a/include/zenoh_macros.h b/include/zenoh_macros.h index 2a2c23e51..9fa37aec2 100644 --- a/include/zenoh_macros.h +++ b/include/zenoh_macros.h @@ -16,29 +16,31 @@ )(&x) #define z_drop(x) \ - _Generic((x), z_owned_session_t * : z_close, \ - z_owned_publisher_t * : z_undeclare_publisher, \ - z_owned_keyexpr_t * : z_keyexpr_drop, \ - z_owned_config_t * : z_config_drop, \ - z_owned_scouting_config_t * : z_scouting_config_drop, \ - z_owned_pull_subscriber_t * : z_undeclare_pull_subscriber, \ - z_owned_subscriber_t * : z_undeclare_subscriber, \ - z_owned_queryable_t * : z_undeclare_queryable, \ - z_owned_encoding_t * : z_encoding_drop, \ - z_owned_reply_t * : z_reply_drop, \ - z_owned_hello_t * : z_hello_drop, \ - z_owned_str_t * : z_str_drop, \ - z_owned_closure_sample_t * : z_closure_sample_drop, \ - z_owned_closure_query_t * : z_closure_query_drop, \ - z_owned_closure_reply_t * : z_closure_reply_drop, \ - z_owned_closure_hello_t * : z_closure_hello_drop, \ - z_owned_closure_zid_t * : z_closure_zid_drop, \ - z_owned_reply_channel_closure_t * : z_reply_channel_closure_drop, \ - z_owned_reply_channel_t * : z_reply_channel_drop, \ - zc_owned_payload_t * : zc_payload_drop, \ - zc_owned_shmbuf_t * : zc_shmbuf_drop, \ - zc_owned_shm_manager_t * : zc_shm_manager_drop, \ - zc_owned_liveliness_token_t * : zc_liveliness_undeclare_token \ + _Generic((x), z_owned_session_t * : z_close, \ + z_owned_publisher_t * : z_undeclare_publisher, \ + z_owned_keyexpr_t * : z_keyexpr_drop, \ + z_owned_config_t * : z_config_drop, \ + z_owned_scouting_config_t * : z_scouting_config_drop, \ + z_owned_pull_subscriber_t * : z_undeclare_pull_subscriber, \ + z_owned_subscriber_t * : z_undeclare_subscriber, \ + z_owned_queryable_t * : z_undeclare_queryable, \ + z_owned_encoding_t * : z_encoding_drop, \ + z_owned_reply_t * : z_reply_drop, \ + z_owned_hello_t * : z_hello_drop, \ + z_owned_str_t * : z_str_drop, \ + z_owned_closure_sample_t * : z_closure_sample_drop, \ + z_owned_closure_query_t * : z_closure_query_drop, \ + z_owned_closure_reply_t * : z_closure_reply_drop, \ + z_owned_closure_hello_t * : z_closure_hello_drop, \ + z_owned_closure_zid_t * : z_closure_zid_drop, \ + z_owned_reply_channel_closure_t * : z_reply_channel_closure_drop, \ + z_owned_reply_channel_t * : z_reply_channel_drop, \ + zc_owned_payload_t * : zc_payload_drop, \ + zc_owned_shmbuf_t * : zc_shmbuf_drop, \ + zc_owned_shm_manager_t * : zc_shm_manager_drop, \ + zc_owned_liveliness_token_t * : zc_liveliness_undeclare_token, \ + ze_owned_publication_cache_t * : ze_undeclare_publication_cache, \ + ze_owned_querying_subscriber_t * : ze_undeclare_querying_subscriber \ )(x) #define z_null(x) (*x = \ @@ -64,28 +66,31 @@ zc_owned_payload_t * : zc_payload_null, \ zc_owned_shmbuf_t * : zc_shmbuf_null, \ zc_owned_shm_manager_t * : zc_shm_manager_null, \ + ze_owned_publication_cache_t * : ze_publication_cache_null, \ zc_owned_liveliness_token_t * : zc_liveliness_token_null \ )()) #define z_check(x) \ - _Generic((x), z_owned_session_t : z_session_check, \ - z_owned_publisher_t : z_publisher_check, \ - z_owned_keyexpr_t : z_keyexpr_check, \ - z_keyexpr_t : z_keyexpr_is_initialized, \ - z_owned_config_t : z_config_check, \ - z_owned_scouting_config_t : z_scouting_config_check, \ - z_bytes_t : z_bytes_check, \ - z_owned_subscriber_t : z_subscriber_check, \ - z_owned_pull_subscriber_t : z_pull_subscriber_check, \ - z_owned_queryable_t : z_queryable_check, \ - z_owned_encoding_t : z_encoding_check, \ - z_owned_reply_t : z_reply_check, \ - z_owned_hello_t : z_hello_check, \ - z_owned_str_t : z_str_check, \ - zc_owned_payload_t : zc_payload_check, \ - zc_owned_shmbuf_t : zc_shmbuf_check, \ - zc_owned_shm_manager_t : zc_shm_manager_check, \ - zc_owned_liveliness_token_t : zc_liveliness_token_check \ + _Generic((x), z_owned_session_t : z_session_check, \ + z_owned_publisher_t : z_publisher_check, \ + z_owned_keyexpr_t : z_keyexpr_check, \ + z_keyexpr_t : z_keyexpr_is_initialized, \ + z_owned_config_t : z_config_check, \ + z_owned_scouting_config_t : z_scouting_config_check, \ + z_bytes_t : z_bytes_check, \ + z_owned_subscriber_t : z_subscriber_check, \ + z_owned_pull_subscriber_t : z_pull_subscriber_check, \ + z_owned_queryable_t : z_queryable_check, \ + z_owned_encoding_t : z_encoding_check, \ + z_owned_reply_t : z_reply_check, \ + z_owned_hello_t : z_hello_check, \ + z_owned_str_t : z_str_check, \ + zc_owned_payload_t : zc_payload_check, \ + zc_owned_shmbuf_t : zc_shmbuf_check, \ + zc_owned_shm_manager_t : zc_shm_manager_check, \ + zc_owned_liveliness_token_t : zc_liveliness_token_check, \ + ze_owned_publication_cache_t : ze_publication_cache_check, \ + ze_owned_querying_subscriber_t : ze_querying_subscriber_check \ )(&x) #define z_call(x, ...) \ diff --git a/src/commons.rs b/src/commons.rs index 908dfac76..c5fbf3950 100644 --- a/src/commons.rs +++ b/src/commons.rs @@ -20,6 +20,8 @@ use libc::{c_char, c_ulong}; use zenoh::buffers::ZBuf; use zenoh::prelude::SampleKind; use zenoh::prelude::SplitBuffer; +use zenoh::query::ReplyKeyExpr; +use zenoh::sample::Locality; use zenoh::sample::Sample; use zenoh_protocol::core::Timestamp; @@ -554,3 +556,66 @@ pub extern "C" fn z_str_null() -> z_owned_str_t { pub extern "C" fn z_str_loan(s: &z_owned_str_t) -> *const libc::c_char { s._cstr } + +#[repr(C)] +#[derive(Clone, Copy, Debug)] +pub enum zcu_locality_t { + ANY = 0, + SESSION_LOCAL = 1, + REMOTE = 2, +} + +impl From for zcu_locality_t { + fn from(k: Locality) -> Self { + match k { + Locality::Any => zcu_locality_t::ANY, + Locality::SessionLocal => zcu_locality_t::SESSION_LOCAL, + Locality::Remote => zcu_locality_t::REMOTE, + } + } +} + +impl From for Locality { + fn from(k: zcu_locality_t) -> Self { + match k { + zcu_locality_t::ANY => Locality::Any, + zcu_locality_t::SESSION_LOCAL => Locality::SessionLocal, + zcu_locality_t::REMOTE => Locality::Remote, + } + } +} + +#[no_mangle] +pub extern "C" fn zcu_locality_default() -> zcu_locality_t { + Locality::default().into() +} + +#[repr(C)] +#[derive(Clone, Copy, Debug)] +pub enum zcu_reply_keyexpr_t { + ANY = 0, + MATCHING_QUERY = 1, +} + +impl From for zcu_reply_keyexpr_t { + fn from(k: ReplyKeyExpr) -> Self { + match k { + ReplyKeyExpr::Any => zcu_reply_keyexpr_t::ANY, + ReplyKeyExpr::MatchingQuery => zcu_reply_keyexpr_t::MATCHING_QUERY, + } + } +} + +impl From for ReplyKeyExpr { + fn from(k: zcu_reply_keyexpr_t) -> Self { + match k { + zcu_reply_keyexpr_t::ANY => ReplyKeyExpr::Any, + zcu_reply_keyexpr_t::MATCHING_QUERY => ReplyKeyExpr::MatchingQuery, + } + } +} + +#[no_mangle] +pub extern "C" fn zcu_reply_keyexpr_default() -> zcu_reply_keyexpr_t { + ReplyKeyExpr::default().into() +} diff --git a/src/config.rs b/src/config.rs index 378c966d3..6e259f3b5 100644 --- a/src/config.rs +++ b/src/config.rs @@ -55,7 +55,7 @@ pub static Z_CONFIG_SCOUTING_DELAY_KEY: &c_char = unsafe { &*(b"scouting/delay\0".as_ptr() as *const c_char) }; #[no_mangle] pub static Z_CONFIG_ADD_TIMESTAMP_KEY: &c_char = - unsafe { &*(b"add_timestamp\0".as_ptr() as *const c_char) }; + unsafe { &*(b"timestamping/enabled\0".as_ptr() as *const c_char) }; /// A loaned zenoh configuration. #[repr(C)] diff --git a/src/lib.rs b/src/lib.rs index cc2a7bb3d..2a4f661d3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -44,6 +44,10 @@ mod closures; pub use closures::*; mod liveliness; pub use liveliness::*; +mod publication_cache; +pub use publication_cache::*; +mod querying_subscriber; +pub use querying_subscriber::*; #[cfg(feature = "shared-memory")] mod shm; diff --git a/src/publication_cache.rs b/src/publication_cache.rs new file mode 100644 index 000000000..d7fbf347c --- /dev/null +++ b/src/publication_cache.rs @@ -0,0 +1,193 @@ +// +// Copyright (c) 2023 ZettaScale Technology. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh team, +// + +use std::ops::Deref; + +use zenoh_ext::SessionExt; +use zenoh_util::core::zresult::ErrNo; +use zenoh_util::core::SyncResolve; + +use crate::{ + impl_guarded_transmute, z_keyexpr_t, z_session_t, zcu_locality_default, zcu_locality_t, + GuardedTransmute, UninitializedKeyExprError, +}; + +/// Options passed to the :c:func:`ze_declare_publication_cache` function. +/// +/// Members: +/// z_keyexpr_t queryable_prefix: The prefix used for queryable +/// zcu_locality_t queryable_origin: The restriction for the matching queries that will be receive by this +/// publication cache +/// size_t history: The the history size +/// size_t resources_limit: The limit number of cached resources +#[repr(C)] +pub struct ze_publication_cache_options_t { + pub queryable_prefix: z_keyexpr_t, + pub queryable_origin: zcu_locality_t, + pub history: usize, + pub resources_limit: usize, +} + +/// Constructs the default value for :c:type:`ze_publication_cache_options_t`. +#[no_mangle] +pub extern "C" fn ze_publication_cache_options_default() -> ze_publication_cache_options_t { + ze_publication_cache_options_t { + queryable_prefix: z_keyexpr_t::null(), + queryable_origin: zcu_locality_default(), + history: 1, + resources_limit: 0, + } +} + +type PublicationCache = Option>>; + +/// An owned zenoh publication_cache. +/// +/// Like most `z_owned_X_t` types, you may obtain an instance of `z_X_t` by loaning it using `z_X_loan(&val)`. +/// The `z_loan(val)` macro, available if your compiler supports C11's `_Generic`, is equivalent to writing `z_X_loan(&val)`. +/// +/// Like all `z_owned_X_t`, an instance will be destroyed by any function which takes a mutable pointer to said instance, as this implies the instance's inners were moved. +/// To make this fact more obvious when reading your code, consider using `z_move(val)` instead of `&val` as the argument. +/// After a move, `val` will still exist, but will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your `val` is valid. +/// +/// To check if `val` is still valid, you may use `z_X_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid. +#[repr(C)] +pub struct ze_owned_publication_cache_t([usize; 1]); + +impl_guarded_transmute!(PublicationCache, ze_owned_publication_cache_t); + +impl From for ze_owned_publication_cache_t { + fn from(val: PublicationCache) -> Self { + val.transmute() + } +} + +impl AsRef for ze_owned_publication_cache_t { + fn as_ref(&self) -> &PublicationCache { + unsafe { std::mem::transmute(self) } + } +} + +impl AsMut for ze_owned_publication_cache_t { + fn as_mut(&mut self) -> &mut PublicationCache { + unsafe { std::mem::transmute(self) } + } +} + +impl ze_owned_publication_cache_t { + pub fn new(pub_cache: zenoh_ext::PublicationCache<'static>) -> Self { + Some(Box::new(pub_cache)).into() + } + pub fn null() -> Self { + None.into() + } +} + +/// Declares a Publication Cache. +/// +/// Parameters: +/// z_session_t session: The zenoh session. +/// z_keyexpr_t keyexpr: The key expression to publish. +/// ze_publication_cache_options_t options: Additional options for the publication_cache. +/// +/// Returns: +/// :c:type:`ze_owned_publication_cache_t`. +/// +/// +/// Example: +/// Declaring a publication cache `NULL` for the options: +/// +/// .. code-block:: C +/// +/// ze_owned_publication_cache_t pub_cache = ze_declare_publication_cache(z_loan(s), z_keyexpr(expr), NULL); +/// +/// is equivalent to initializing and passing the default publication cache options: +/// +/// .. code-block:: C +/// +/// ze_publication_cache_options_t opts = ze_publication_cache_options_default(); +/// ze_owned_publication_cache_t pub_cache = ze_declare_publication_cache(z_loan(s), z_keyexpr(expr), &opts); +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub extern "C" fn ze_declare_publication_cache( + session: z_session_t, + keyexpr: z_keyexpr_t, + options: Option<&ze_publication_cache_options_t>, +) -> ze_owned_publication_cache_t { + match session.upgrade() { + Some(s) => { + let keyexpr = keyexpr.deref().as_ref().map(|s| s.clone().into_owned()); + if let Some(key_expr) = keyexpr { + let mut p = s.declare_publication_cache(key_expr); + if let Some(options) = options { + p = p.history(options.history.into()); + p = p.queryable_allowed_origin(options.queryable_origin.into()); + if options.resources_limit != 0 { + p = p.resources_limit(options.resources_limit) + } + if options.queryable_prefix.deref().is_some() { + let queryable_prefix = options + .queryable_prefix + .deref() + .as_ref() + .map(|s| s.clone().into_owned()); + if let Some(queryable_prefix) = queryable_prefix { + p = p.queryable_prefix(queryable_prefix) + } + } + } + match p.res_sync() { + Ok(publication_cache) => ze_owned_publication_cache_t::new(publication_cache), + Err(e) => { + log::error!("{}", e); + ze_owned_publication_cache_t::null() + } + } + } else { + log::error!("{}", UninitializedKeyExprError); + ze_owned_publication_cache_t::null() + } + } + None => ze_owned_publication_cache_t::null(), + } +} + +/// Constructs a null safe-to-drop value of 'ze_owned_publication_cache_t' type +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub extern "C" fn ze_publication_cache_null() -> ze_owned_publication_cache_t { + ze_owned_publication_cache_t::null() +} + +/// Returns ``true`` if `pub_cache` is valid. +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub extern "C" fn ze_publication_cache_check(pub_cache: &ze_owned_publication_cache_t) -> bool { + pub_cache.as_ref().is_some() +} + +/// Closes the given :c:type:`ze_owned_publication_cache_t`, droping it and invalidating it for double-drop safety. +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub extern "C" fn ze_undeclare_publication_cache( + pub_cache: &mut ze_owned_publication_cache_t, +) -> i8 { + if let Some(p) = pub_cache.as_mut().take() { + if let Err(e) = p.close().res_sync() { + log::error!("{}", e); + return e.errno().get(); + } + } + 0 +} diff --git a/src/querying_subscriber.rs b/src/querying_subscriber.rs new file mode 100644 index 000000000..20522c031 --- /dev/null +++ b/src/querying_subscriber.rs @@ -0,0 +1,238 @@ +// +// Copyright (c) 2023 ZettaScale Technology. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh team, +// + +use zenoh::prelude::sync::SyncResolve; +use zenoh::prelude::SessionDeclarations; +use zenoh::prelude::SplitBuffer; +use zenoh_ext::*; +use zenoh_protocol::core::SubInfo; +use zenoh_util::core::zresult::ErrNo; + +use crate::{ + impl_guarded_transmute, z_closure_sample_call, z_keyexpr_t, z_owned_closure_sample_t, + z_query_consolidation_none, z_query_consolidation_t, z_query_target_default, z_query_target_t, + z_reliability_t, z_sample_t, z_session_t, zcu_locality_default, zcu_locality_t, + zcu_reply_keyexpr_default, zcu_reply_keyexpr_t, GuardedTransmute, LOG_INVALID_SESSION, +}; + +type FetchingSubscriber = Option>>; + +/// An owned zenoh querying subscriber. Destroying the subscriber cancels the subscription. +/// +/// Like most `ze_owned_X_t` types, you may obtain an instance of `z_X_t` by loaning it using `z_X_loan(&val)`. +/// The `z_loan(val)` macro, available if your compiler supports C11's `_Generic`, is equivalent to writing `z_X_loan(&val)`. +/// +/// Like all `ze_owned_X_t`, an instance will be destroyed by any function which takes a mutable pointer to said instance, as this implies the instance's inners were moved. +/// To make this fact more obvious when reading your code, consider using `z_move(val)` instead of `&val` as the argument. +/// After a move, `val` will still exist, but will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your `val` is valid. +/// +/// To check if `val` is still valid, you may use `z_X_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid. +#[repr(C)] +pub struct ze_owned_querying_subscriber_t([usize; 1]); + +impl_guarded_transmute!(FetchingSubscriber, ze_owned_querying_subscriber_t); + +#[repr(C)] +#[allow(non_camel_case_types)] +pub struct ze_querying_subscriber_t<'a>(&'a ze_owned_querying_subscriber_t); + +impl From for ze_owned_querying_subscriber_t { + fn from(val: FetchingSubscriber) -> Self { + val.transmute() + } +} + +impl AsRef for ze_owned_querying_subscriber_t { + fn as_ref(&self) -> &FetchingSubscriber { + unsafe { std::mem::transmute(self) } + } +} + +impl<'a> AsRef for ze_querying_subscriber_t<'a> { + fn as_ref(&self) -> &FetchingSubscriber { + self.0.as_ref() + } +} + +impl AsMut for ze_owned_querying_subscriber_t { + fn as_mut(&mut self) -> &mut FetchingSubscriber { + unsafe { std::mem::transmute(self) } + } +} + +impl ze_owned_querying_subscriber_t { + pub fn new(sub: zenoh_ext::FetchingSubscriber<'static, ()>) -> Self { + Some(Box::new(sub)).into() + } + pub fn null() -> Self { + None.into() + } +} + +/// Constructs a null safe-to-drop value of 'ze_owned_querying_subscriber_t' type +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub extern "C" fn ze_querying_subscriber_null() -> ze_owned_querying_subscriber_t { + ze_owned_querying_subscriber_t::null() +} + +/// Represents the set of options that can be applied to a querying subscriber, +/// upon its declaration via :c:func:`ze_declare_querying_subscriber`. +/// +/// Members: +/// z_reliability_t reliability: The subscription reliability. +/// zcu_locality_t allowed_origin: The restriction for the matching publications that will be +/// receive by this subscriber. +/// z_keyexpr_t query_selector: The selector to be used for queries. +/// z_query_target_t query_target: The target to be used for queries. +/// z_query_consolidation_t query_consolidation: The consolidation mode to be used for queries. +/// zcu_reply_keyexpr_t query_accept_replies: The accepted replies for queries. +/// uint64_t query_timeout_ms: The timeout to be used for queries. +#[repr(C)] +#[allow(non_camel_case_types)] +pub struct ze_querying_subscriber_options_t { + reliability: z_reliability_t, + allowed_origin: zcu_locality_t, + query_selector: z_keyexpr_t, + query_target: z_query_target_t, + query_consolidation: z_query_consolidation_t, + query_accept_replies: zcu_reply_keyexpr_t, + query_timeout_ms: u64, +} + +/// Constructs the default value for :c:type:`ze_querying_subscriber_options_t`. +#[no_mangle] +pub extern "C" fn ze_querying_subscriber_options_default() -> ze_querying_subscriber_options_t { + ze_querying_subscriber_options_t { + reliability: SubInfo::default().reliability.into(), + allowed_origin: zcu_locality_default(), + query_selector: z_keyexpr_t::null(), + query_target: z_query_target_default(), + query_consolidation: z_query_consolidation_none(), + query_accept_replies: zcu_reply_keyexpr_default(), + query_timeout_ms: 0, + } +} + +/// Declares a Querying Subscriber for a given key expression. +/// +/// Parameters: +/// z_session_t session: The zenoh session. +/// z_keyexpr_t keyexpr: The key expression to subscribe. +/// z_owned_closure_sample_t callback: The callback function that will be called each time a data matching the subscribed expression is received. +/// ze_querying_subscriber_options_t options: Additional options for the querying subscriber. +/// +/// Returns: +/// :c:type:`ze_owned_subscriber_t`. +/// +/// To check if the subscription succeeded and if the querying subscriber is still valid, +/// you may use `ze_querying_subscriber_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid. +/// +/// Like all `ze_owned_X_t`, an instance will be destroyed by any function which takes a mutable pointer to said instance, as this implies the instance's inners were moved. +/// To make this fact more obvious when reading your code, consider using `z_move(val)` instead of `&val` as the argument. +/// After a move, `val` will still exist, but will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your `val` is valid. +/// +/// Example: +/// Declaring a subscriber passing ``NULL`` for the options: +/// +/// .. code-block:: C +/// +/// ze_owned_subscriber_t sub = ze_declare_querying_subscriber(z_loan(s), z_keyexpr(expr), callback, NULL); +/// +/// is equivalent to initializing and passing the default subscriber options: +/// +/// .. code-block:: C +/// +/// z_subscriber_options_t opts = z_subscriber_options_default(); +/// ze_owned_subscriber_t sub = ze_declare_querying_subscriber(z_loan(s), z_keyexpr(expr), callback, &opts); +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub unsafe extern "C" fn ze_declare_querying_subscriber( + session: z_session_t, + keyexpr: z_keyexpr_t, + callback: &mut z_owned_closure_sample_t, + options: Option<&ze_querying_subscriber_options_t>, +) -> ze_owned_querying_subscriber_t { + let mut closure = z_owned_closure_sample_t::empty(); + std::mem::swap(callback, &mut closure); + + match session.upgrade() { + Some(s) => { + let mut sub = s.declare_subscriber(keyexpr).querying(); + if let Some(options) = options { + sub = sub + .reliability(options.reliability.into()) + .allowed_origin(options.allowed_origin.into()) + .query_target(options.query_target.into()) + .query_consolidation(options.query_consolidation) + .query_accept_replies(options.query_accept_replies.into()); + if options.query_selector.is_some() { + let query_selector = options + .query_selector + .as_ref() + .map(|s| s.clone().into_owned()); + if let Some(query_selector) = query_selector { + sub = sub.query_selector(query_selector) + } + } + if options.query_timeout_ms != 0 { + sub = sub + .query_timeout(std::time::Duration::from_millis(options.query_timeout_ms)); + } + } + match sub + .callback(move |sample| { + let payload = sample.payload.contiguous(); + let owner = match payload { + std::borrow::Cow::Owned(v) => zenoh::buffers::ZBuf::from(v), + _ => sample.payload.clone(), + }; + let sample = z_sample_t::new(&sample, &owner); + z_closure_sample_call(&closure, &sample) + }) + .res() + { + Ok(sub) => ze_owned_querying_subscriber_t::new(sub), + Err(e) => { + log::debug!("{}", e); + ze_owned_querying_subscriber_t::null() + } + } + } + None => { + log::debug!("{}", LOG_INVALID_SESSION); + ze_owned_querying_subscriber_t::null() + } + } +} + +/// Undeclares the given :c:type:`ze_owned_querying_subscriber_t`, droping it and invalidating it for double-drop safety. +#[allow(clippy::missing_safety_doc)] +#[no_mangle] +pub extern "C" fn ze_undeclare_querying_subscriber(sub: &mut ze_owned_querying_subscriber_t) -> i8 { + if let Some(s) = sub.as_mut().take() { + if let Err(e) = s.close().res_sync() { + log::warn!("{}", e); + return e.errno().get(); + } + } + 0 +} + +/// Returns ``true`` if `sub` is valid. +#[allow(clippy::missing_safety_doc)] +#[no_mangle] +pub extern "C" fn ze_querying_subscriber_check(sub: &ze_owned_querying_subscriber_t) -> bool { + sub.as_ref().is_some() +} diff --git a/tests/z_api_constants.c b/tests/z_api_constants.c index bddb4c488..2c9a5e6fc 100644 --- a/tests/z_api_constants.c +++ b/tests/z_api_constants.c @@ -49,5 +49,5 @@ int main(int argc, char **argv) { printf("Z_CONFIG_SCOUTING_DELAY_KEY: %s\n", Z_CONFIG_SCOUTING_DELAY_KEY); assert(strcmp(Z_CONFIG_SCOUTING_DELAY_KEY, "scouting/delay") == 0); printf("Z_CONFIG_ADD_TIMESTAMP_KEY: %s\n", Z_CONFIG_ADD_TIMESTAMP_KEY); - assert(strcmp(Z_CONFIG_ADD_TIMESTAMP_KEY, "add_timestamp") == 0); -} \ No newline at end of file + assert(strcmp(Z_CONFIG_ADD_TIMESTAMP_KEY, "timestamping/enabled") == 0); +}