From 18b38937dfb1e53944e5482a7aa42686e1fd2b5b Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Sat, 26 Oct 2024 00:35:20 +0200 Subject: [PATCH] Liveliness subscriber implementation --- examples/unix/c11/z_sub_liveliness.c | 139 ++++++++++++++++++++++ include/zenoh-pico/api/liveliness.h | 28 ++--- include/zenoh-pico/net/primitives.h | 31 +++++ include/zenoh-pico/session/subscription.h | 23 +++- src/api/api.c | 16 +-- src/api/liveliness.c | 41 ++++--- src/net/primitives.c | 46 +++++++ src/session/push.c | 11 +- src/session/rx.c | 18 +-- src/session/subscription.c | 49 ++++++-- 10 files changed, 335 insertions(+), 67 deletions(-) create mode 100644 examples/unix/c11/z_sub_liveliness.c diff --git a/examples/unix/c11/z_sub_liveliness.c b/examples/unix/c11/z_sub_liveliness.c new file mode 100644 index 000000000..befcdce0d --- /dev/null +++ b/examples/unix/c11/z_sub_liveliness.c @@ -0,0 +1,139 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#include +#include +#include +#include +#include +#include +#include + +#if Z_FEATURE_SUBSCRIPTION == 1 && Z_FEATURE_LIVELINESS == 1 + +static int msg_nb = 0; + +void data_handler(z_loaned_sample_t *sample, void *ctx) { + (void)(ctx); + z_view_string_t key_string; + z_keyexpr_as_view_string(z_sample_keyexpr(sample), &key_string); + switch (z_sample_kind(sample)) { + case Z_SAMPLE_KIND_PUT: + printf(">> [LivelinessSubscriber] New alive token ('%.*s')\n", (int)z_string_len(z_loan(key_string)), + z_string_data(z_loan(key_string))); + break; + case Z_SAMPLE_KIND_DELETE: + printf(">> [LivelinessSubscriber] Dropped token ('%.*s')\n", (int)z_string_len(z_loan(key_string)), + z_string_data(z_loan(key_string))); + break; + } +} + +int main(int argc, char **argv) { + const char *keyexpr = "group1/**"; + const char *mode = "client"; + char *clocator = NULL; + char *llocator = NULL; + int n = 0; + + int opt; + while ((opt = getopt(argc, argv, "k:e:m:l:n:")) != -1) { + switch (opt) { + case 'k': + keyexpr = optarg; + break; + case 'e': + clocator = optarg; + break; + case 'm': + mode = optarg; + break; + case 'l': + llocator = optarg; + break; + case 'n': + n = atoi(optarg); + break; + case '?': + if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'l' || optopt == 'n') { + fprintf(stderr, "Option -%c requires an argument.\n", optopt); + } else { + fprintf(stderr, "Unknown option `-%c'.\n", optopt); + } + return 1; + default: + return -1; + } + } + + z_owned_config_t config; + z_config_default(&config); + zp_config_insert(z_loan_mut(config), Z_CONFIG_MODE_KEY, mode); + if (clocator != NULL) { + zp_config_insert(z_loan_mut(config), Z_CONFIG_CONNECT_KEY, clocator); + } + if (llocator != NULL) { + zp_config_insert(z_loan_mut(config), Z_CONFIG_LISTEN_KEY, llocator); + } + + printf("Opening session...\n"); + z_owned_session_t s; + if (z_open(&s, z_move(config), NULL) < 0) { + printf("Unable to open session!\n"); + return -1; + } + + // Start read and lease tasks for zenoh-pico + if (zp_start_read_task(z_loan_mut(s), NULL) < 0 || zp_start_lease_task(z_loan_mut(s), NULL) < 0) { + printf("Unable to start read and lease tasks\n"); + z_session_drop(z_session_move(&s)); + return -1; + } + + printf("Declaring liveliness subscriber on '%s'...\n", keyexpr); + z_owned_closure_sample_t callback; + z_closure(&callback, data_handler, NULL, NULL); + z_owned_subscriber_t sub; + + z_view_keyexpr_t ke; + z_view_keyexpr_from_str(&ke, keyexpr); + if (z_liveliness_declare_subscriber(z_loan(s), &sub, z_loan(ke), z_move(callback), NULL) < 0) { + printf("Unable to declare liveliness subscriber.\n"); + exit(-1); + } + + printf("Press CTRL-C to quit...\n"); + while (1) { + z_sleep_s(1); + } + printf("Press CTRL-C to quit...\n"); + while (1) { + if ((n != 0) && (msg_nb >= n)) { + break; + } + sleep(1); + } + // Clean up + z_drop(z_move(sub)); + z_drop(z_move(s)); + return 0; +} +#else +int main(void) { + printf( + "ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION and Z_FEATURE_LIVELINESS but this example " + "requires it.\n"); + return -2; +} +#endif diff --git a/include/zenoh-pico/api/liveliness.h b/include/zenoh-pico/api/liveliness.h index 13ac11e88..173dbec87 100644 --- a/include/zenoh-pico/api/liveliness.h +++ b/include/zenoh-pico/api/liveliness.h @@ -55,19 +55,19 @@ typedef struct z_liveliness_get_options_t { z_result_t z_liveliness_subscriber_options_default(z_liveliness_subscriber_options_t *options); /** - * Declares a subscriber on liveliness tokens that intersect `key_expr`. + * Declares a subscriber on liveliness tokens that intersect `keyexpr`. * * @param token: An uninitialized memory location where subscriber will be constructed. - * @param session: The Zenoh session. - * @param key_expr: The key expression to subscribe to. + * @param zs: The Zenoh session. + * @param keyexpr: The key expression to subscribe to. * @param callback: The callback function that will be called each time a liveliness token status is changed. * @param _options: The options to be passed to the liveliness subscriber declaration. * * @return 0 in case of success, negative error values otherwise. */ -z_result_t z_liveliness_declare_subscriber(const z_loaned_session_t *session, z_owned_subscriber_t *token, - const z_loaned_keyexpr_t *key_expr, z_moved_closure_sample_t *callback, - z_liveliness_subscriber_options_t *_options); +z_result_t z_liveliness_declare_subscriber(const z_loaned_session_t *zs, z_owned_subscriber_t *sub, + const z_loaned_keyexpr_t *keyexpr, z_moved_closure_sample_t *callback, + z_liveliness_subscriber_options_t *options); /** * Constructs default value for `z_liveliness_declaration_options_t`. */ @@ -80,12 +80,12 @@ z_result_t z_liveliness_declaration_options_default(z_liveliness_declaration_opt * is achieved, and a DELETE sample if it's lost. * * @param token: An uninitialized memory location where liveliness token will be constructed. - * @param session: A Zenos session to declare the liveliness token. - * @param key_expr: A keyexpr to declare a liveliess token for. + * @param zs: A Zenos session to declare the liveliness token. + * @param keyexpr: A keyexpr to declare a liveliess token for. * @param _options: Liveliness token declaration properties. */ -z_result_t z_liveliness_declare_token(const z_loaned_session_t *session, z_owned_liveliness_token_t *token, - const z_loaned_keyexpr_t *key_expr, +z_result_t z_liveliness_declare_token(const z_loaned_session_t *zs, z_owned_liveliness_token_t *token, + const z_loaned_keyexpr_t *keyexpr, const z_liveliness_declaration_options_t *_options); /** @@ -94,14 +94,14 @@ z_result_t z_liveliness_declare_token(const z_loaned_session_t *session, z_owned z_result_t z_liveliness_get_options_default(z_liveliness_get_options_t *options); /** - * Queries liveliness tokens currently on the network with a key expression intersecting with `key_expr`. + * Queries liveliness tokens currently on the network with a key expression intersecting with `keyexpr`. * - * @param session: The Zenoh session. - * @param key_expr: The key expression to query liveliness tokens for. + * @param zs: The Zenoh session. + * @param keyexpr: The key expression to query liveliness tokens for. * @param callback: The callback function that will be called for each received reply. * @param options: Additional options for the liveliness get operation. */ -z_result_t z_liveliness_get(const z_loaned_session_t *session, const z_loaned_keyexpr_t *key_expr, +z_result_t z_liveliness_get(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr, z_moved_closure_reply_t *callback, z_liveliness_get_options_t *options); /** diff --git a/include/zenoh-pico/net/primitives.h b/include/zenoh-pico/net/primitives.h index 95b47078b..a2c481d76 100644 --- a/include/zenoh-pico/net/primitives.h +++ b/include/zenoh-pico/net/primitives.h @@ -255,6 +255,37 @@ z_result_t _z_query(_z_session_t *zn, _z_keyexpr_t keyexpr, const char *paramete bool is_express); #endif +#if Z_FEATURE_LIVELINESS == 1 +#if Z_FEATURE_SUBSCRIPTION == 1 +/** + * Declare a :c:type:`_z_subscriber_t` for the given liveliness key. + * + * Parameters: + * zn: The zenoh-net session. The caller keeps its ownership. + * keyexpr: The resource key to subscribe. The callee gets the ownership of any allocated value. + * callback: The callback function that will be called each time a matching liveliness token changed. + * arg: A pointer that will be passed to the **callback** on each call. + * + * Returns: + * The created :c:type:`_z_subscriber_t` (in null state if the declaration failed). + */ +_z_subscriber_t _z_declare_liveliness_subscriber(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr, + _z_closure_sample_callback_t callback, _z_drop_handler_t dropper, + void *arg); + +/** + * Undeclare a liveliness :c:type:`_z_subscriber_t`. + * + * Parameters: + * sub: The :c:type:`_z_subscriber_t` to undeclare. The callee releases the + * subscriber upon successful return. + * Returns: + * 0 if success, or a negative value identifying the error. + */ +z_result_t _z_undeclare_liveliness_subscriber(_z_subscriber_t *sub); +#endif // Z_FEATURE_SUBSCRIPTION == 1 +#endif // Z_FEATURE_LIVELINESS == 1 + #if Z_FEATURE_INTEREST == 1 uint32_t _z_add_interest(_z_session_t *zn, _z_keyexpr_t keyexpr, _z_interest_handler_t callback, uint8_t flags, void *arg); diff --git a/include/zenoh-pico/session/subscription.h b/include/zenoh-pico/session/subscription.h index 595b65999..59c942554 100644 --- a/include/zenoh-pico/session/subscription.h +++ b/include/zenoh-pico/session/subscription.h @@ -23,9 +23,18 @@ extern "C" { #endif /*------------------ Subscription ------------------*/ -void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload, - _z_encoding_t *encoding, const _z_n_qos_t qos, const _z_timestamp_t *timestamp, - const _z_bytes_t attachment, z_reliability_t reliability); +z_result_t _z_trigger_subscriptions_put(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload, + _z_encoding_t *encoding, const _z_timestamp_t *timestamp, const _z_n_qos_t qos, + const _z_bytes_t attachment, z_reliability_t reliability); + +z_result_t _z_trigger_subscriptions_del(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_timestamp_t *timestamp, + const _z_n_qos_t qos, const _z_bytes_t attachment, z_reliability_t reliability); + +z_result_t _z_trigger_liveliness_subscriptions_declare(_z_session_t *zn, const _z_keyexpr_t keyexpr, + const _z_timestamp_t *timestamp); + +z_result_t _z_trigger_liveliness_subscriptions_undeclare(_z_session_t *zn, const _z_keyexpr_t keyexpr, + const _z_timestamp_t *timestamp); #if Z_FEATURE_SUBSCRIPTION == 1 _z_subscription_rc_t *_z_get_subscription_by_id(_z_session_t *zn, _z_subscriber_kind_t kind, const _z_zint_t id); @@ -33,9 +42,11 @@ _z_subscription_rc_list_t *_z_get_subscriptions_by_key(_z_session_t *zn, _z_subs const _z_keyexpr_t *keyexpr); _z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, _z_subscriber_kind_t kind, _z_subscription_t *sub); -z_result_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload, - _z_encoding_t *encoding, const _z_zint_t kind, const _z_timestamp_t *timestamp, - const _z_n_qos_t qos, const _z_bytes_t attachment, z_reliability_t reliability); +z_result_t _z_trigger_subscriptions_impl(_z_session_t *zn, _z_subscriber_kind_t subscriber_kind, + const _z_keyexpr_t keyexpr, const _z_bytes_t payload, _z_encoding_t *encoding, + const _z_zint_t sample_kind, const _z_timestamp_t *timestamp, + const _z_n_qos_t qos, const _z_bytes_t attachment, + z_reliability_t reliability); void _z_unregister_subscription(_z_session_t *zn, _z_subscriber_kind_t kind, _z_subscription_rc_t *sub); void _z_flush_subscriptions(_z_session_t *zn); #endif diff --git a/src/api/api.c b/src/api/api.c index 1589b7463..4d20a71df 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -828,12 +828,12 @@ z_result_t z_put(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr opt.priority, opt.is_express, opt.timestamp, _z_bytes_from_owned_bytes(&opt.attachment->_this), reliability); - // Trigger local subscriptions - _z_trigger_local_subscriptions( + // Trigger subscriptions + _z_trigger_subscriptions_put( _Z_RC_IN_VAL(zs), keyexpr_aliased, _z_bytes_from_owned_bytes(&payload->_this), - opt.encoding == NULL ? NULL : &opt.encoding->_this._val, + opt.encoding == NULL ? NULL : &opt.encoding->_this._val, opt.timestamp, _z_n_qos_make(opt.is_express, opt.congestion_control == Z_CONGESTION_CONTROL_BLOCK, opt.priority), - opt.timestamp, _z_bytes_from_owned_bytes(&opt.attachment->_this), reliability); + _z_bytes_from_owned_bytes(&opt.attachment->_this), reliability); // Clean-up z_encoding_drop(opt.encoding); z_bytes_drop(opt.attachment); @@ -961,11 +961,11 @@ z_result_t z_publisher_put(const z_loaned_publisher_t *pub, z_moved_bytes_t *pay Z_SAMPLE_KIND_PUT, pub->_congestion_control, pub->_priority, pub->_is_express, opt.timestamp, _z_bytes_from_owned_bytes(&opt.attachment->_this), reliability); } - // Trigger local subscriptions - _z_trigger_local_subscriptions( - _Z_RC_IN_VAL(&sess_rc), pub_keyexpr, _z_bytes_from_owned_bytes(&payload->_this), &encoding, + // Trigger subscriptions + _z_trigger_subscriptions_put( + _Z_RC_IN_VAL(&sess_rc), pub_keyexpr, _z_bytes_from_owned_bytes(&payload->_this), &encoding, opt.timestamp, _z_n_qos_make(pub->_is_express, pub->_congestion_control == Z_CONGESTION_CONTROL_BLOCK, pub->_priority), - opt.timestamp, _z_bytes_from_owned_bytes(&opt.attachment->_this), reliability); + _z_bytes_from_owned_bytes(&opt.attachment->_this), reliability); _z_session_rc_drop(&sess_rc); } else { diff --git a/src/api/liveliness.c b/src/api/liveliness.c index a3236afff..cdc1f08f7 100644 --- a/src/api/liveliness.c +++ b/src/api/liveliness.c @@ -14,6 +14,8 @@ #include "zenoh-pico/api/liveliness.h" +#include "zenoh-pico/api/primitives.h" +#include "zenoh-pico/net/primitives.h" #include "zenoh-pico/utils/result.h" _Bool _z_liveliness_token_check(const _z_liveliness_token_t *token) { @@ -54,16 +56,27 @@ z_result_t z_liveliness_subscriber_options_default(z_liveliness_subscriber_optio return _Z_RES_OK; } -z_result_t z_liveliness_declare_subscriber(const z_loaned_session_t *session, z_owned_subscriber_t *token, - const z_loaned_keyexpr_t *key_expr, z_moved_closure_sample_t *callback, - z_liveliness_subscriber_options_t *_options) { - // TODO(sashacmc): Implement - (void)token; - (void)session; - (void)key_expr; - (void)callback; - (void)_options; - return _Z_RES_OK; +z_result_t z_liveliness_declare_subscriber(const z_loaned_session_t *zs, z_owned_subscriber_t *sub, + const z_loaned_keyexpr_t *keyexpr, z_moved_closure_sample_t *callback, + z_liveliness_subscriber_options_t *options) { + _ZP_UNUSED(options); + void *ctx = callback->_this._val.context; + callback->_this._val.context = NULL; + + _z_keyexpr_t keyexpr_aliased = _z_keyexpr_alias_from_user_defined(*keyexpr, true); + _z_keyexpr_t key = _z_keyexpr_alias(keyexpr_aliased); + + _z_subscriber_t int_sub = + _z_declare_liveliness_subscriber(zs, key, callback->_this._val.call, callback->_this._val.drop, ctx); + + z_internal_closure_sample_null(&callback->_this); + sub->_val = int_sub; + + if (!_z_subscriber_check(&sub->_val)) { + return _Z_ERR_SYSTEM_OUT_OF_MEMORY; + } else { + return _Z_RES_OK; + } } z_result_t z_liveliness_declaration_options_default(z_liveliness_declaration_options_t *options) { @@ -71,12 +84,12 @@ z_result_t z_liveliness_declaration_options_default(z_liveliness_declaration_opt return _Z_RES_OK; } -z_result_t z_liveliness_declare_token(const z_loaned_session_t *session, z_owned_liveliness_token_t *token, +z_result_t z_liveliness_declare_token(const z_loaned_session_t *zs, z_owned_liveliness_token_t *token, const z_loaned_keyexpr_t *key_expr, const z_liveliness_declaration_options_t *_options) { // TODO(sashacmc): Implement (void)token; - (void)session; + (void)zs; (void)key_expr; (void)_options; return _Z_RES_OK; @@ -87,10 +100,10 @@ z_result_t z_liveliness_get_options_default(z_liveliness_get_options_t *options) return _Z_RES_OK; } -z_result_t z_liveliness_get(const z_loaned_session_t *session, const z_loaned_keyexpr_t *key_expr, +z_result_t z_liveliness_get(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *key_expr, z_moved_closure_reply_t *callback, z_liveliness_get_options_t *options) { // TODO(sashacmc): Implement - (void)session; + (void)zs; (void)key_expr; (void)callback; (void)options; diff --git a/src/net/primitives.c b/src/net/primitives.c index 917d2bc35..fc804e205 100644 --- a/src/net/primitives.c +++ b/src/net/primitives.c @@ -491,6 +491,52 @@ z_result_t _z_query(_z_session_t *zn, _z_keyexpr_t keyexpr, const char *paramete } #endif +#if Z_FEATURE_LIVELINESS == 1 +#if Z_FEATURE_SUBSCRIPTION == 1 +/*------------------ Subscriber Declaration ------------------*/ +_z_subscriber_t _z_declare_liveliness_subscriber(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr, + _z_closure_sample_callback_t callback, _z_drop_handler_t dropper, + void *arg) { + _z_subscription_t s; + s._id = _z_get_entity_id(_Z_RC_IN_VAL(zn)); + s._key_id = keyexpr._id; + s._key = _z_get_expanded_key_from_key(_Z_RC_IN_VAL(zn), &keyexpr); + s._callback = callback; + s._dropper = dropper; + s._arg = arg; + + _z_subscriber_t ret = _z_subscriber_null(); + // Register subscription, stored at session-level, do not drop it by the end of this function. + _z_subscription_rc_t *sp_s = + _z_register_subscription(_Z_RC_IN_VAL(zn), _Z_SUBSCRIBER_KIND_LIVELINESS_SUBSCRIBER, &s); + if (sp_s == NULL) { + _z_subscriber_clear(&ret); + return ret; + } + + // Fill subscriber + ret._entity_id = s._id; + ret._zn = _z_session_rc_clone_as_weak(zn); + return ret; +} + +z_result_t _z_undeclare_liveliness_subscriber(_z_subscriber_t *sub) { + if (sub == NULL || _Z_RC_IS_NULL(&sub->_zn)) { + return _Z_ERR_ENTITY_UNKNOWN; + } + + _z_subscription_rc_t *s = + _z_get_subscription_by_id(_Z_RC_IN_VAL(&sub->_zn), _Z_SUBSCRIBER_KIND_LIVELINESS_SUBSCRIBER, sub->_entity_id); + if (s == NULL) { + return _Z_ERR_ENTITY_UNKNOWN; + } + + _z_unregister_subscription(_Z_RC_IN_VAL(&sub->_zn), _Z_SUBSCRIBER_KIND_LIVELINESS_SUBSCRIBER, s); + return _Z_RES_OK; +} +#endif // Z_FEATURE_SUBSCRIPTION == 1 +#endif // Z_FEATURE_LIVELINESS == 1 + #if Z_FEATURE_INTEREST == 1 /*------------------ Interest Declaration ------------------*/ uint32_t _z_add_interest(_z_session_t *zn, _z_keyexpr_t keyexpr, _z_interest_handler_t callback, uint8_t flags, diff --git a/src/session/push.c b/src/session/push.c index 6b45de25b..6f0a6c371 100644 --- a/src/session/push.c +++ b/src/session/push.c @@ -27,17 +27,14 @@ z_result_t _z_trigger_push(_z_session_t *zn, _z_n_msg_push_t *push, z_reliabilit // TODO check body to know where to dispatch - size_t kind = push->_body._is_put ? Z_SAMPLE_KIND_PUT : Z_SAMPLE_KIND_DELETE; if (push->_body._is_put) { _z_msg_put_t *put = &push->_body._body._put; - ret = _z_trigger_subscriptions(zn, push->_key, put->_payload, &put->_encoding, kind, &put->_commons._timestamp, - push->_qos, put->_attachment, reliability); + ret = _z_trigger_subscriptions_put(zn, push->_key, put->_payload, &put->_encoding, &put->_commons._timestamp, + push->_qos, put->_attachment, reliability); } else { - _z_encoding_t encoding = _z_encoding_null(); - _z_bytes_t payload = _z_bytes_null(); _z_msg_del_t *del = &push->_body._body._del; - ret = _z_trigger_subscriptions(zn, push->_key, payload, &encoding, kind, &del->_commons._timestamp, push->_qos, - del->_attachment, reliability); + ret = _z_trigger_subscriptions_del(zn, push->_key, &del->_commons._timestamp, push->_qos, del->_attachment, + reliability); } return ret; } diff --git a/src/session/rx.c b/src/session/rx.c index 8a73f7010..6f3940a7f 100644 --- a/src/session/rx.c +++ b/src/session/rx.c @@ -61,6 +61,8 @@ z_result_t _z_handle_network_message(_z_session_rc_t *zsrc, _z_zenoh_message_t * } break; case _Z_DECL_TOKEN: { _z_interest_process_declares(zn, &decl->_decl); + _z_trigger_liveliness_subscriptions_declare(zn, decl->_decl._body._decl_token._keyexpr, + &decl->_ext_timestamp); } break; case _Z_UNDECL_SUBSCRIBER: { _z_interest_process_undeclares(zn, &decl->_decl); @@ -70,6 +72,10 @@ z_result_t _z_handle_network_message(_z_session_rc_t *zsrc, _z_zenoh_message_t * } break; case _Z_UNDECL_TOKEN: { _z_interest_process_undeclares(zn, &decl->_decl); + // TODO(sashacmc): _ext_keyexpr for some reason comes empty + // Decode problem? Interest request problem? Protocol problem? Check zenoh + _z_trigger_liveliness_subscriptions_undeclare(zn, decl->_decl._body._undecl_token._ext_keyexpr, + &decl->_ext_timestamp); } break; case _Z_DECL_FINAL: { // Check that interest id is valid @@ -101,9 +107,9 @@ z_result_t _z_handle_network_message(_z_session_rc_t *zsrc, _z_zenoh_message_t * case _Z_REQUEST_PUT: { #if Z_FEATURE_SUBSCRIPTION == 1 _z_msg_put_t put = req->_body._put; - ret = _z_trigger_subscriptions(zn, req->_key, put._payload, &put._encoding, Z_SAMPLE_KIND_PUT, - &put._commons._timestamp, req->_ext_qos, put._attachment, - msg->_reliability); + ret = _z_trigger_subscriptions_put(zn, req->_key, put._payload, &put._encoding, + &put._commons._timestamp, req->_ext_qos, put._attachment, + msg->_reliability); #endif if (ret == _Z_RES_OK) { _z_network_message_t final = _z_n_msg_make_response_final(req->_rid); @@ -113,10 +119,8 @@ z_result_t _z_handle_network_message(_z_session_rc_t *zsrc, _z_zenoh_message_t * case _Z_REQUEST_DEL: { #if Z_FEATURE_SUBSCRIPTION == 1 _z_msg_del_t del = req->_body._del; - _z_encoding_t encoding = _z_encoding_null(); - ret = _z_trigger_subscriptions(zn, req->_key, _z_bytes_null(), &encoding, Z_SAMPLE_KIND_DELETE, - &del._commons._timestamp, req->_ext_qos, del._attachment, - msg->_reliability); + ret = _z_trigger_subscriptions_del(zn, req->_key, &del._commons._timestamp, req->_ext_qos, + del._attachment, msg->_reliability); #endif if (ret == _Z_RES_OK) { _z_network_message_t final = _z_n_msg_make_response_final(req->_rid); diff --git a/src/session/subscription.c b/src/session/subscription.c index 7acd1ee83..07b97d88c 100644 --- a/src/session/subscription.c +++ b/src/session/subscription.c @@ -17,6 +17,7 @@ #include #include +#include "zenoh-pico/api/constants.h" #include "zenoh-pico/api/types.h" #include "zenoh-pico/config.h" #include "zenoh-pico/net/sample.h" @@ -140,17 +141,42 @@ _z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, _z_subscriber_k return ret; } -void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload, - _z_encoding_t *encoding, const _z_n_qos_t qos, const _z_timestamp_t *timestamp, - const _z_bytes_t attachment, z_reliability_t reliability) { - z_result_t ret = _z_trigger_subscriptions(zn, keyexpr, payload, encoding, Z_SAMPLE_KIND_PUT, timestamp, qos, - attachment, reliability); - (void)ret; +z_result_t _z_trigger_subscriptions_put(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload, + _z_encoding_t *encoding, const _z_timestamp_t *timestamp, const _z_n_qos_t qos, + const _z_bytes_t attachment, z_reliability_t reliability) { + return _z_trigger_subscriptions_impl(zn, _Z_SUBSCRIBER_KIND_SUBSCRIBER, keyexpr, payload, encoding, + Z_SAMPLE_KIND_PUT, timestamp, qos, attachment, reliability); } -z_result_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload, - _z_encoding_t *encoding, const _z_zint_t kind, const _z_timestamp_t *timestamp, - const _z_n_qos_t qos, const _z_bytes_t attachment, z_reliability_t reliability) { +z_result_t _z_trigger_subscriptions_del(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_timestamp_t *timestamp, + const _z_n_qos_t qos, const _z_bytes_t attachment, + z_reliability_t reliability) { + _z_encoding_t encoding = _z_encoding_null(); + return _z_trigger_subscriptions_impl(zn, _Z_SUBSCRIBER_KIND_SUBSCRIBER, keyexpr, _z_bytes_null(), &encoding, + Z_SAMPLE_KIND_DELETE, timestamp, qos, attachment, reliability); +} + +z_result_t _z_trigger_liveliness_subscriptions_declare(_z_session_t *zn, const _z_keyexpr_t keyexpr, + const _z_timestamp_t *timestamp) { + _z_encoding_t encoding = _z_encoding_null(); + return _z_trigger_subscriptions_impl(zn, _Z_SUBSCRIBER_KIND_LIVELINESS_SUBSCRIBER, keyexpr, _z_bytes_null(), + &encoding, Z_SAMPLE_KIND_PUT, timestamp, _Z_N_QOS_DEFAULT, _z_bytes_null(), + Z_RELIABILITY_RELIABLE); +} + +z_result_t _z_trigger_liveliness_subscriptions_undeclare(_z_session_t *zn, const _z_keyexpr_t keyexpr, + const _z_timestamp_t *timestamp) { + _z_encoding_t encoding = _z_encoding_null(); + return _z_trigger_subscriptions_impl(zn, _Z_SUBSCRIBER_KIND_LIVELINESS_SUBSCRIBER, keyexpr, _z_bytes_null(), + &encoding, Z_SAMPLE_KIND_DELETE, timestamp, _Z_N_QOS_DEFAULT, _z_bytes_null(), + Z_RELIABILITY_RELIABLE); +} + +z_result_t _z_trigger_subscriptions_impl(_z_session_t *zn, _z_subscriber_kind_t subscriber_kind, + const _z_keyexpr_t keyexpr, const _z_bytes_t payload, _z_encoding_t *encoding, + const _z_zint_t sample_kind, const _z_timestamp_t *timestamp, + const _z_n_qos_t qos, const _z_bytes_t attachment, + z_reliability_t reliability) { z_result_t ret = _Z_RES_OK; _zp_session_lock_mutex(zn); @@ -160,12 +186,13 @@ z_result_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr _z_keyexpr_t key = __unsafe_z_get_expanded_key_from_key(zn, &keyexpr); _Z_DEBUG("Triggering subs for %d - %.*s", key._id, (int)_z_string_len(&key._suffix), _z_string_data(&key._suffix)); if (_z_keyexpr_has_suffix(&key)) { - _z_subscription_rc_list_t *subs = __unsafe_z_get_subscriptions_by_key(zn, _Z_SUBSCRIBER_KIND_SUBSCRIBER, &key); + _z_subscription_rc_list_t *subs = __unsafe_z_get_subscriptions_by_key(zn, subscriber_kind, &key); _zp_session_unlock_mutex(zn); // Build the sample - _z_sample_t sample = _z_sample_create(&key, payload, timestamp, encoding, kind, qos, attachment, reliability); + _z_sample_t sample = + _z_sample_create(&key, payload, timestamp, encoding, sample_kind, qos, attachment, reliability); // Parse subscription list _z_subscription_rc_list_t *xs = subs; _Z_DEBUG("Triggering %ju subs", (uintmax_t)_z_subscription_rc_list_len(xs));