diff --git a/include/zenoh-pico/api/liveliness.h b/include/zenoh-pico/api/liveliness.h index 89848eb96..a4d6c8c9e 100644 --- a/include/zenoh-pico/api/liveliness.h +++ b/include/zenoh-pico/api/liveliness.h @@ -21,6 +21,10 @@ #include "zenoh-pico/api/types.h" #include "zenoh-pico/protocol/core.h" +#ifdef __cplusplus +extern "C" { +#endif + typedef struct { uint32_t _id; _z_keyexpr_t _key; @@ -143,4 +147,8 @@ z_result_t z_liveliness_get(const z_loaned_session_t *zs, const z_loaned_keyexpr #endif // Z_FEATURE_LIVELINESS == 1 +#ifdef __cplusplus +} +#endif + #endif // INCLUDE_ZENOH_PICO_API_LIVELINESS_H diff --git a/include/zenoh-pico/net/liveliness.h b/include/zenoh-pico/net/liveliness.h new file mode 100644 index 000000000..4cf9d03e4 --- /dev/null +++ b/include/zenoh-pico/net/liveliness.h @@ -0,0 +1,83 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#ifndef INCLUDE_ZENOH_PICO_NET_LIVELINESS_H +#define INCLUDE_ZENOH_PICO_NET_LIVELINESS_H + +#include "zenoh-pico/api/liveliness.h" +#include "zenoh-pico/net/session.h" +#include "zenoh-pico/net/subscribe.h" +#include "zenoh-pico/protocol/core.h" + +#ifdef __cplusplus +extern "C" { +#endif + +#if Z_FEATURE_LIVELINESS == 1 + +z_result_t _z_declare_liveliness_token(const _z_session_rc_t *zn, _z_liveliness_token_t *ret_token, + _z_keyexpr_t keyexpr); +z_result_t _z_undeclare_liveliness_token(_z_liveliness_token_t *token); + +#if Z_FEATURE_SUBSCRIPTION == 1 +/** + * Declare a :c:type:`_z_subscriber_t` for the given liveliness key. + * + * Parameters: + * zn: The zenoh-net session. The caller keeps its ownership. + * keyexpr: The resource key to subscribe. The callee gets the ownership of any allocated value. + * callback: The callback function that will be called each time a matching liveliness token changed. + * arg: A pointer that will be passed to the **callback** on each call. + * + * Returns: + * The created :c:type:`_z_subscriber_t` (in null state if the declaration failed). + */ +_z_subscriber_t _z_declare_liveliness_subscriber(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr, + _z_closure_sample_callback_t callback, _z_drop_handler_t dropper, + void *arg); + +/** + * Undeclare a liveliness :c:type:`_z_subscriber_t`. + * + * Parameters: + * sub: The :c:type:`_z_subscriber_t` to undeclare. The callee releases the + * subscriber upon successful return. + * Returns: + * 0 if success, or a negative value identifying the error. + */ +z_result_t _z_undeclare_liveliness_subscriber(_z_subscriber_t *sub); +#endif // Z_FEATURE_SUBSCRIPTION == 1 + +#if Z_FEATURE_QUERY == 1 +/** + * Query liveliness token state. + * + * Parameters: + * zn: The zenoh-net session. The caller keeps its ownership. + * keyexpr: The resource key to liveliness token. + * callback: The callback function that will be called on reception of replies for this query. + * dropper: The callback function that will be called on upon completion of the callback. + * arg: A pointer that will be passed to the **callback** on each call. + * timeout_ms: The timeout value of this query. + */ +z_result_t _z_liveliness_query(_z_session_t *zn, _z_keyexpr_t keyexpr, _z_closure_reply_callback_t callback, + _z_drop_handler_t dropper, void *arg, uint64_t timeout_ms); +#endif // Z_FEATURE_QUERY == 1 + +#endif // Z_FEATURE_LIVELINESS == 1 + +#ifdef __cplusplus +} +#endif + +#endif /* INCLUDE_ZENOH_PICO_NET_LIVELINESS_H */ diff --git a/include/zenoh-pico/net/primitives.h b/include/zenoh-pico/net/primitives.h index 42393beaa..a2a2512d7 100644 --- a/include/zenoh-pico/net/primitives.h +++ b/include/zenoh-pico/net/primitives.h @@ -17,7 +17,6 @@ #include #include "zenoh-pico/api/constants.h" -#include "zenoh-pico/api/liveliness.h" #include "zenoh-pico/collections/string.h" #include "zenoh-pico/net/encoding.h" #include "zenoh-pico/net/publish.h" @@ -25,7 +24,6 @@ #include "zenoh-pico/net/session.h" #include "zenoh-pico/net/subscribe.h" #include "zenoh-pico/protocol/core.h" -#include "zenoh-pico/utils/config.h" #ifdef __cplusplus extern "C" { @@ -256,59 +254,6 @@ z_result_t _z_query(_z_session_t *zn, _z_keyexpr_t keyexpr, const char *paramete bool is_express); #endif -#if Z_FEATURE_LIVELINESS == 1 - -#if Z_FEATURE_SUBSCRIPTION == 1 -/** - * Declare a :c:type:`_z_subscriber_t` for the given liveliness key. - * - * Parameters: - * zn: The zenoh-net session. The caller keeps its ownership. - * keyexpr: The resource key to subscribe. The callee gets the ownership of any allocated value. - * callback: The callback function that will be called each time a matching liveliness token changed. - * arg: A pointer that will be passed to the **callback** on each call. - * - * Returns: - * The created :c:type:`_z_subscriber_t` (in null state if the declaration failed). - */ -_z_subscriber_t _z_declare_liveliness_subscriber(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr, - _z_closure_sample_callback_t callback, _z_drop_handler_t dropper, - void *arg); - -/** - * Undeclare a liveliness :c:type:`_z_subscriber_t`. - * - * Parameters: - * sub: The :c:type:`_z_subscriber_t` to undeclare. The callee releases the - * subscriber upon successful return. - * Returns: - * 0 if success, or a negative value identifying the error. - */ -z_result_t _z_undeclare_liveliness_subscriber(_z_subscriber_t *sub); -#endif // Z_FEATURE_SUBSCRIPTION == 1 - -#if Z_FEATURE_QUERY == 1 -/** - * Query liveliness token state. - * - * Parameters: - * zn: The zenoh-net session. The caller keeps its ownership. - * keyexpr: The resource key to liveliness token. - * callback: The callback function that will be called on reception of replies for this query. - * dropper: The callback function that will be called on upon completion of the callback. - * arg: A pointer that will be passed to the **callback** on each call. - * timeout_ms: The timeout value of this query. - */ -z_result_t _z_liveliness_query(_z_session_t *zn, _z_keyexpr_t keyexpr, _z_closure_reply_callback_t callback, - _z_drop_handler_t dropper, void *arg, uint64_t timeout_ms); -#endif // Z_FEATURE_QUERY == 1 - -z_result_t _z_declare_liveliness_token(const _z_session_rc_t *zn, _z_liveliness_token_t *ret_token, - _z_keyexpr_t keyexpr); -z_result_t _z_undeclare_liveliness_token(_z_liveliness_token_t *token); - -#endif // Z_FEATURE_LIVELINESS == 1 - #if Z_FEATURE_INTEREST == 1 uint32_t _z_add_interest(_z_session_t *zn, _z_keyexpr_t keyexpr, _z_interest_handler_t callback, uint8_t flags, void *arg); diff --git a/include/zenoh-pico/session/liveliness.h b/include/zenoh-pico/session/liveliness.h index da0f78667..55bbeb2b6 100644 --- a/include/zenoh-pico/session/liveliness.h +++ b/include/zenoh-pico/session/liveliness.h @@ -17,6 +17,10 @@ #include "zenoh-pico/session/session.h" +#ifdef __cplusplus +extern "C" { +#endif + #if Z_FEATURE_LIVELINESS == 1 typedef struct _z_session_t _z_session_t; @@ -59,4 +63,8 @@ void _z_liveliness_init(_z_session_t *zn); void _z_liveliness_clear(_z_session_t *zn); #endif +#ifdef __cplusplus +} +#endif + #endif /* ZENOH_PICO_SESSION_LIVELINESS_H */ diff --git a/src/api/liveliness.c b/src/api/liveliness.c index 6f29bc543..cef81a3ef 100644 --- a/src/api/liveliness.c +++ b/src/api/liveliness.c @@ -15,6 +15,7 @@ #include "zenoh-pico/api/liveliness.h" #include "zenoh-pico/api/primitives.h" +#include "zenoh-pico/net/liveliness.h" #include "zenoh-pico/net/primitives.h" #include "zenoh-pico/protocol/core.h" #include "zenoh-pico/protocol/keyexpr.h" diff --git a/src/net/liveliness.c b/src/net/liveliness.c new file mode 100644 index 000000000..2567c44ae --- /dev/null +++ b/src/net/liveliness.c @@ -0,0 +1,164 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#include "zenoh-pico/api/liveliness.h" + +#include "zenoh-pico/protocol/definitions/network.h" +#include "zenoh-pico/protocol/keyexpr.h" +#include "zenoh-pico/session/resource.h" +#include "zenoh-pico/session/session.h" +#include "zenoh-pico/session/subscription.h" +#include "zenoh-pico/session/utils.h" +#include "zenoh-pico/utils/result.h" + +#if Z_FEATURE_LIVELINESS == 1 + +/**************** Liveliness Token ****************/ + +z_result_t _z_declare_liveliness_token(const _z_session_rc_t *zn, _z_liveliness_token_t *ret_token, + _z_keyexpr_t keyexpr) { + z_result_t ret; + + uint32_t id = _z_get_entity_id(_Z_RC_IN_VAL(zn)); + _z_keyexpr_t key = _z_get_expanded_key_from_key(_Z_RC_IN_VAL(zn), &keyexpr); + + _z_declaration_t declaration = _z_make_decl_token(&key, id); + _z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0); + ret = _z_send_n_msg(_Z_RC_IN_VAL(zn), &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK); + _z_n_msg_clear(&n_msg); + + _z_liveliness_register_token(_Z_RC_IN_VAL(zn), id, key); + + ret_token->_id = id; + _z_keyexpr_move(&ret_token->_key, &key); + ret_token->_zn = _z_session_rc_clone_as_weak(zn); + return ret; +} + +z_result_t _z_undeclare_liveliness_token(_z_liveliness_token_t *token) { + if (token == NULL || _Z_RC_IS_NULL(&token->_zn)) { + return _Z_ERR_ENTITY_UNKNOWN; + } + + z_result_t ret; + + _z_liveliness_unregister_token(_Z_RC_IN_VAL(&token->_zn), token->_id); + + _z_declaration_t declaration = _z_make_undecl_token(token->_id, &token->_key); + _z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0); + ret = _z_send_n_msg(_Z_RC_IN_VAL(&token->_zn), &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK); + _z_n_msg_clear(&n_msg); + + return ret; +} + +/**************** Liveliness Subscriber ****************/ + +#if Z_FEATURE_SUBSCRIPTION == 1 +_z_subscriber_t _z_declare_liveliness_subscriber(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr, + _z_closure_sample_callback_t callback, _z_drop_handler_t dropper, + void *arg) { + _z_subscription_t s; + s._id = _z_get_entity_id(_Z_RC_IN_VAL(zn)); + s._key_id = keyexpr._id; + s._key = _z_get_expanded_key_from_key(_Z_RC_IN_VAL(zn), &keyexpr); + s._callback = callback; + s._dropper = dropper; + s._arg = arg; + + _z_subscriber_t ret = _z_subscriber_null(); + // Register subscription, stored at session-level, do not drop it by the end of this function. + _z_subscription_rc_t *sp_s = + _z_register_subscription(_Z_RC_IN_VAL(zn), _Z_SUBSCRIBER_KIND_LIVELINESS_SUBSCRIBER, &s); + if (sp_s == NULL) { + _z_subscriber_clear(&ret); + return ret; + } + // Build the declare message to send on the wire + _z_interest_t interest = + _z_make_interest(&keyexpr, s._id, + _Z_INTEREST_FLAG_KEYEXPRS | _Z_INTEREST_FLAG_TOKENS | _Z_INTEREST_FLAG_RESTRICTED | + _Z_INTEREST_FLAG_CURRENT | _Z_INTEREST_FLAG_FUTURE); + + _z_network_message_t n_msg = _z_n_msg_make_interest(interest); + if (_z_send_n_msg(_Z_RC_IN_VAL(zn), &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { + _z_unregister_subscription(_Z_RC_IN_VAL(zn), _Z_SUBSCRIBER_KIND_LIVELINESS_SUBSCRIBER, sp_s); + _z_subscriber_clear(&ret); + return ret; + } + _z_n_msg_clear(&n_msg); + + ret._entity_id = s._id; + ret._zn = _z_session_rc_clone_as_weak(zn); + return ret; +} + +z_result_t _z_undeclare_liveliness_subscriber(_z_subscriber_t *sub) { + if (sub == NULL || _Z_RC_IS_NULL(&sub->_zn)) { + return _Z_ERR_ENTITY_UNKNOWN; + } + + _z_subscription_rc_t *s = + _z_get_subscription_by_id(_Z_RC_IN_VAL(&sub->_zn), _Z_SUBSCRIBER_KIND_LIVELINESS_SUBSCRIBER, sub->_entity_id); + if (s == NULL) { + return _Z_ERR_ENTITY_UNKNOWN; + } + + _z_unregister_subscription(_Z_RC_IN_VAL(&sub->_zn), _Z_SUBSCRIBER_KIND_LIVELINESS_SUBSCRIBER, s); + return _Z_RES_OK; +} +#endif // Z_FEATURE_SUBSCRIPTION == 1 + +/**************** Liveliness Query ****************/ + +#if Z_FEATURE_QUERY == 1 +z_result_t _z_liveliness_query(_z_session_t *zn, _z_keyexpr_t keyexpr, _z_closure_reply_callback_t callback, + _z_drop_handler_t dropper, void *arg, uint64_t timeout_ms) { + z_result_t ret = _Z_RES_OK; + + // Create the pending liveliness query object + _z_liveliness_pending_query_t *pq = + (_z_liveliness_pending_query_t *)z_malloc(sizeof(_z_liveliness_pending_query_t)); + if (pq != NULL) { + uint32_t id = _z_liveliness_get_query_id(zn); + pq->_key = _z_get_expanded_key_from_key(zn, &keyexpr); + pq->_callback = callback; + pq->_dropper = dropper; + pq->_arg = arg; + + ret = _z_liveliness_register_pending_query(zn, id, pq); + if (ret == _Z_RES_OK) { + _ZP_UNUSED(timeout_ms); // Current interest in pico don't support timeout + + _z_interest_t interest = _z_make_interest(&keyexpr, id, + _Z_INTEREST_FLAG_KEYEXPRS | _Z_INTEREST_FLAG_TOKENS | + _Z_INTEREST_FLAG_RESTRICTED | _Z_INTEREST_FLAG_CURRENT); + + _z_network_message_t n_msg = _z_n_msg_make_interest(interest); + if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { + _z_liveliness_unregister_pending_query(zn, id); + ret = _Z_ERR_TRANSPORT_TX_FAILED; + } + + _z_n_msg_clear(&n_msg); + + } else { + _z_liveliness_pending_query_clear(pq); + } + } + + return ret; +} +#endif // Z_FEATURE_QUERY == 1 + +#endif // Z_FEATURE_LIVELINESS == 1 diff --git a/src/net/primitives.c b/src/net/primitives.c index db7b179fc..696b4c106 100644 --- a/src/net/primitives.c +++ b/src/net/primitives.c @@ -18,7 +18,6 @@ #include #include "zenoh-pico/api/constants.h" -#include "zenoh-pico/api/liveliness.h" #include "zenoh-pico/collections/slice.h" #include "zenoh-pico/config.h" #include "zenoh-pico/net/filtering.h" @@ -492,141 +491,6 @@ z_result_t _z_query(_z_session_t *zn, _z_keyexpr_t keyexpr, const char *paramete } #endif -#if Z_FEATURE_LIVELINESS == 1 -#if Z_FEATURE_SUBSCRIPTION == 1 -/*------------------ Liveliness Subscriber Declaration ------------------*/ -_z_subscriber_t _z_declare_liveliness_subscriber(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr, - _z_closure_sample_callback_t callback, _z_drop_handler_t dropper, - void *arg) { - _z_subscription_t s; - s._id = _z_get_entity_id(_Z_RC_IN_VAL(zn)); - s._key_id = keyexpr._id; - s._key = _z_get_expanded_key_from_key(_Z_RC_IN_VAL(zn), &keyexpr); - s._callback = callback; - s._dropper = dropper; - s._arg = arg; - - _z_subscriber_t ret = _z_subscriber_null(); - // Register subscription, stored at session-level, do not drop it by the end of this function. - _z_subscription_rc_t *sp_s = - _z_register_subscription(_Z_RC_IN_VAL(zn), _Z_SUBSCRIBER_KIND_LIVELINESS_SUBSCRIBER, &s); - if (sp_s == NULL) { - _z_subscriber_clear(&ret); - return ret; - } - // Build the declare message to send on the wire - _z_interest_t interest = - _z_make_interest(&keyexpr, s._id, - _Z_INTEREST_FLAG_KEYEXPRS | _Z_INTEREST_FLAG_TOKENS | _Z_INTEREST_FLAG_RESTRICTED | - _Z_INTEREST_FLAG_CURRENT | _Z_INTEREST_FLAG_FUTURE); - - _z_network_message_t n_msg = _z_n_msg_make_interest(interest); - if (_z_send_n_msg(_Z_RC_IN_VAL(zn), &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { - _z_unregister_subscription(_Z_RC_IN_VAL(zn), _Z_SUBSCRIBER_KIND_LIVELINESS_SUBSCRIBER, sp_s); - _z_subscriber_clear(&ret); - return ret; - } - _z_n_msg_clear(&n_msg); - // Fill subscriber - ret._entity_id = s._id; - ret._zn = _z_session_rc_clone_as_weak(zn); - return ret; -} - -z_result_t _z_undeclare_liveliness_subscriber(_z_subscriber_t *sub) { - if (sub == NULL || _Z_RC_IS_NULL(&sub->_zn)) { - return _Z_ERR_ENTITY_UNKNOWN; - } - - _z_subscription_rc_t *s = - _z_get_subscription_by_id(_Z_RC_IN_VAL(&sub->_zn), _Z_SUBSCRIBER_KIND_LIVELINESS_SUBSCRIBER, sub->_entity_id); - if (s == NULL) { - return _Z_ERR_ENTITY_UNKNOWN; - } - - _z_unregister_subscription(_Z_RC_IN_VAL(&sub->_zn), _Z_SUBSCRIBER_KIND_LIVELINESS_SUBSCRIBER, s); - return _Z_RES_OK; -} -#endif // Z_FEATURE_SUBSCRIPTION == 1 -#if Z_FEATURE_QUERY == 1 -/*------------------ Query ------------------*/ -z_result_t _z_liveliness_query(_z_session_t *zn, _z_keyexpr_t keyexpr, _z_closure_reply_callback_t callback, - _z_drop_handler_t dropper, void *arg, uint64_t timeout_ms) { - z_result_t ret = _Z_RES_OK; - - // Create the pending liveliness query object - _z_liveliness_pending_query_t *pq = - (_z_liveliness_pending_query_t *)z_malloc(sizeof(_z_liveliness_pending_query_t)); - if (pq != NULL) { - uint32_t id = _z_liveliness_get_query_id(zn); - pq->_key = _z_get_expanded_key_from_key(zn, &keyexpr); - pq->_callback = callback; - pq->_dropper = dropper; - pq->_arg = arg; - - ret = _z_liveliness_register_pending_query(zn, id, pq); - if (ret == _Z_RES_OK) { - _ZP_UNUSED(timeout_ms); // Current interest in pico don't support timeout - - _z_interest_t interest = _z_make_interest(&keyexpr, id, - _Z_INTEREST_FLAG_KEYEXPRS | _Z_INTEREST_FLAG_TOKENS | - _Z_INTEREST_FLAG_RESTRICTED | _Z_INTEREST_FLAG_CURRENT); - - _z_network_message_t n_msg = _z_n_msg_make_interest(interest); - if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { - _z_liveliness_unregister_pending_query(zn, id); - ret = _Z_ERR_TRANSPORT_TX_FAILED; - } - - _z_n_msg_clear(&n_msg); - - } else { - _z_liveliness_pending_query_clear(pq); - } - } - - return ret; -} -#endif // Z_FEATURE_QUERY == 1 -z_result_t _z_declare_liveliness_token(const _z_session_rc_t *zn, _z_liveliness_token_t *ret_token, - _z_keyexpr_t keyexpr) { - z_result_t ret; - - uint32_t id = _z_get_entity_id(_Z_RC_IN_VAL(zn)); - _z_keyexpr_t key = _z_get_expanded_key_from_key(_Z_RC_IN_VAL(zn), &keyexpr); - - _z_declaration_t declaration = _z_make_decl_token(&key, id); - _z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0); - ret = _z_send_n_msg(_Z_RC_IN_VAL(zn), &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK); - _z_n_msg_clear(&n_msg); - - _z_liveliness_register_token(_Z_RC_IN_VAL(zn), id, key); - - ret_token->_id = id; - // TODO(sashacmc): clenaup? Maybe store in to list only? - _z_keyexpr_move(&ret_token->_key, &key); - ret_token->_zn = _z_session_rc_clone_as_weak(zn); - return ret; -} - -z_result_t _z_undeclare_liveliness_token(_z_liveliness_token_t *token) { - if (token == NULL || _Z_RC_IS_NULL(&token->_zn)) { - return _Z_ERR_ENTITY_UNKNOWN; - } - - z_result_t ret; - - _z_liveliness_unregister_token(_Z_RC_IN_VAL(&token->_zn), token->_id); - - _z_declaration_t declaration = _z_make_undecl_token(token->_id, &token->_key); - _z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0); - ret = _z_send_n_msg(_Z_RC_IN_VAL(&token->_zn), &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK); - _z_n_msg_clear(&n_msg); - - return ret; -} -#endif // Z_FEATURE_LIVELINESS == 1 - #if Z_FEATURE_INTEREST == 1 /*------------------ Interest Declaration ------------------*/ uint32_t _z_add_interest(_z_session_t *zn, _z_keyexpr_t keyexpr, _z_interest_handler_t callback, uint8_t flags,