diff --git a/include/zenoh-pico/net/session.h b/include/zenoh-pico/net/session.h index 822bbf094..8de397141 100644 --- a/include/zenoh-pico/net/session.h +++ b/include/zenoh-pico/net/session.h @@ -65,6 +65,7 @@ typedef struct _z_session_t { // Session interests #if Z_FEATURE_INTEREST == 1 _z_session_interest_rc_list_t *_local_interests; + _z_declare_data_list_t *_remote_declares; #endif } _z_session_t; diff --git a/include/zenoh-pico/protocol/definitions/declarations.h b/include/zenoh-pico/protocol/definitions/declarations.h index 4e0bae713..b40cb3fc4 100644 --- a/include/zenoh-pico/protocol/definitions/declarations.h +++ b/include/zenoh-pico/protocol/definitions/declarations.h @@ -48,8 +48,8 @@ typedef struct { _z_keyexpr_t _keyexpr; uint32_t _id; struct { - uint8_t _complete; - uint32_t _distance; + _Bool _complete; + uint16_t _distance; } _ext_queryable_info; } _z_decl_queryable_t; _z_decl_queryable_t _z_decl_queryable_null(void); @@ -95,6 +95,8 @@ typedef struct { } _z_undecl_interest_t; _z_undecl_interest_t _z_undecl_interest_null(void); +#define _Z_FLAG_INTEREST_ID (1 << 5) + typedef struct { enum { _Z_DECL_KEXPR, @@ -132,7 +134,7 @@ _z_declaration_t _z_make_undecl_keyexpr(uint16_t id); _z_declaration_t _z_make_decl_subscriber(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, _Bool reliable); _z_declaration_t _z_make_undecl_subscriber(uint32_t id, _Z_OPTIONAL const _z_keyexpr_t* key); -_z_declaration_t _z_make_decl_queryable(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, uint32_t distance, uint8_t complete); +_z_declaration_t _z_make_decl_queryable(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, uint16_t distance, _Bool complete); _z_declaration_t _z_make_undecl_queryable(uint32_t id, _Z_OPTIONAL const _z_keyexpr_t* key); _z_declaration_t _z_make_decl_token(_Z_MOVE(_z_keyexpr_t) key, uint32_t id); diff --git a/include/zenoh-pico/session/interest.h b/include/zenoh-pico/session/interest.h index 116ac9385..a4eeb43ba 100644 --- a/include/zenoh-pico/session/interest.h +++ b/include/zenoh-pico/session/interest.h @@ -23,10 +23,11 @@ _z_session_interest_rc_t *_z_get_interest_by_id(_z_session_t *zn, const _z_zint_t id); _z_session_interest_rc_t *_z_register_interest(_z_session_t *zn, _z_session_interest_t *intr); void _z_unregister_interest(_z_session_t *zn, _z_session_interest_rc_t *intr); -void _z_flush_interest(_z_session_t *zn); #endif // Z_FEATURE_INTEREST == 1 +void _z_flush_interest(_z_session_t *zn); int8_t _z_interest_process_declares(_z_session_t *zn, const _z_declaration_t *decl); +int8_t _z_interest_process_undeclares(_z_session_t *zn, const _z_declaration_t *decl); int8_t _z_interest_process_final_interest(_z_session_t *zn, uint32_t id); int8_t _z_interest_process_undeclare_interest(_z_session_t *zn, uint32_t id); int8_t _z_interest_process_declare_interest(_z_session_t *zn, _z_keyexpr_t key, uint32_t id, uint8_t flags); diff --git a/include/zenoh-pico/session/session.h b/include/zenoh-pico/session/session.h index 754f97ce0..5f2980d86 100644 --- a/include/zenoh-pico/session/session.h +++ b/include/zenoh-pico/session/session.h @@ -190,12 +190,12 @@ int8_t _z_session_generate_zid(_z_id_t *bs, uint8_t size); typedef enum { _Z_INTEREST_MSG_TYPE_FINAL = 0, - _Z_INTEREST_MSG_TYPE_DECL_SUBSCRIBER, - _Z_INTEREST_MSG_TYPE_DECL_QUERYABLE, - _Z_INTEREST_MSG_TYPE_DECL_TOKEN, - _Z_INTEREST_MSG_TYPE_UNDECL_SUBSCRIBER, - _Z_INTEREST_MSG_TYPE_UNDECL_QUERYABLE, - _Z_INTEREST_MSG_TYPE_UNDECL_TOKEN, + _Z_INTEREST_MSG_TYPE_DECL_SUBSCRIBER = 1, + _Z_INTEREST_MSG_TYPE_DECL_QUERYABLE = 2, + _Z_INTEREST_MSG_TYPE_DECL_TOKEN = 3, + _Z_INTEREST_MSG_TYPE_UNDECL_SUBSCRIBER = 4, + _Z_INTEREST_MSG_TYPE_UNDECL_QUERYABLE = 5, + _Z_INTEREST_MSG_TYPE_UNDECL_TOKEN = 6, } _z_interest_msg_type_t; typedef struct _z_interest_msg_t { @@ -225,4 +225,20 @@ _Z_ELEM_DEFINE(_z_session_interest_rc, _z_session_interest_rc_t, _z_noop_size, _ _z_noop_copy) _Z_LIST_DEFINE(_z_session_interest_rc, _z_session_interest_rc_t) +typedef enum { + _Z_DECLARE_TYPE_SUBSCRIBER = 0, + _Z_DECLARE_TYPE_QUERYABLE = 1, + _Z_DECLARE_TYPE_TOKEN = 2, +} _z_declare_type_t; + +typedef struct { + _z_keyexpr_t _key; + uint32_t _id; + uint8_t _type; +} _z_declare_data_t; + +void _z_declare_data_clear(_z_declare_data_t *data); +_Z_ELEM_DEFINE(_z_declare_data, _z_declare_data_t, _z_noop_size, _z_declare_data_clear, _z_noop_copy) +_Z_LIST_DEFINE(_z_declare_data, _z_declare_data_t) + #endif /* INCLUDE_ZENOH_PICO_SESSION_SESSION_H */ diff --git a/src/protocol/codec/declarations.c b/src/protocol/codec/declarations.c index d9d86d684..bb0806a6a 100644 --- a/src/protocol/codec/declarations.c +++ b/src/protocol/codec/declarations.c @@ -107,12 +107,16 @@ int8_t _z_undecl_subscriber_encode(_z_wbuf_t *wbf, const _z_undecl_subscriber_t } int8_t _z_decl_queryable_encode(_z_wbuf_t *wbf, const _z_decl_queryable_t *decl) { uint8_t header = _Z_DECL_QUERYABLE_MID; - _Bool has_info_ext = (decl->_ext_queryable_info._complete != 0) || (decl->_ext_queryable_info._distance != 0); + _Bool has_info_ext = decl->_ext_queryable_info._complete || (decl->_ext_queryable_info._distance != 0); _Z_RETURN_IF_ERR(_z_decl_commons_encode(wbf, header, has_info_ext, decl->_id, decl->_keyexpr)); if (has_info_ext) { _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ENC_ZINT | 0x01)); - _Z_RETURN_IF_ERR(_z_zint64_encode( - wbf, ((uint64_t)decl->_ext_queryable_info._distance << 8) | decl->_ext_queryable_info._complete)); + uint8_t flags = 0; + if (decl->_ext_queryable_info._complete) { + flags |= 0x01; + } + uint64_t value = (uint64_t)flags | (uint64_t)decl->_ext_queryable_info._distance << 8; + _Z_RETURN_IF_ERR(_z_zint64_encode(wbf, value)); } return _Z_RES_OK; } @@ -328,8 +332,8 @@ int8_t _z_decl_queryable_decode_extensions(_z_msg_ext_t *extension, void *ctx) { switch (extension->_header) { case _Z_MSG_EXT_ENC_ZINT | 0x01: { uint64_t val = extension->_body._zint._val; - decl->_ext_queryable_info._complete = val & 0xff; - decl->_ext_queryable_info._distance = (uint32_t)(val >> 8); + decl->_ext_queryable_info._complete = _Z_HAS_FLAG(val, 0x01); + decl->_ext_queryable_info._distance = (uint16_t)(val >> 8); } break; default: if (_Z_HAS_FLAG(extension->_header, _Z_MSG_EXT_FLAG_M)) { @@ -423,9 +427,13 @@ int8_t _z_undecl_interest_decode(_z_undecl_interest_t *decl, _z_zbuf_t *zbf, uin return _z_undecl_trivial_decode(zbf, &decl->_ext_keyexpr, &decl->_id, header); } int8_t _z_declaration_decode(_z_declaration_t *decl, _z_zbuf_t *zbf) { - int8_t ret; uint8_t header; _Z_RETURN_IF_ERR(_z_uint8_decode(&header, zbf)); + if (_Z_HAS_FLAG(header, _Z_FLAG_INTEREST_ID)) { + return _Z_ERR_MESSAGE_FLAG_UNEXPECTED; + } + + int8_t ret; switch (_Z_MID(header)) { case _Z_DECL_KEXPR_MID: { decl->_tag = _Z_DECL_KEXPR; diff --git a/src/protocol/definitions/declarations.c b/src/protocol/definitions/declarations.c index 709899619..69dc52560 100644 --- a/src/protocol/definitions/declarations.c +++ b/src/protocol/definitions/declarations.c @@ -83,7 +83,7 @@ _z_declaration_t _z_make_undecl_subscriber(uint32_t id, _Z_OPTIONAL const _z_key ._id = id, ._ext_keyexpr = (key == NULL) ? _z_keyexpr_null() : _z_keyexpr_duplicate(*key)}}}; } -_z_declaration_t _z_make_decl_queryable(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, uint32_t distance, uint8_t complete) { +_z_declaration_t _z_make_decl_queryable(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, uint16_t distance, _Bool complete) { return (_z_declaration_t){ ._tag = _Z_DECL_QUERYABLE, ._body = {._decl_queryable = {._id = id, diff --git a/src/session/interest.c b/src/session/interest.c index 4003f7bc6..499a917d6 100644 --- a/src/session/interest.c +++ b/src/session/interest.c @@ -29,6 +29,12 @@ #include "zenoh-pico/utils/logging.h" #if Z_FEATURE_INTEREST == 1 +void _z_declare_data_clear(_z_declare_data_t *data) { _z_keyexpr_clear(&data->_key); } + +_Bool _z_declare_data_eq(const _z_declare_data_t *left, const _z_declare_data_t *right) { + return ((left->_id == right->_id) && (left->_type == right->_type)); +} + _Bool _z_session_interest_eq(const _z_session_interest_t *one, const _z_session_interest_t *two) { return one->_id == two->_id; } @@ -58,7 +64,7 @@ static _z_session_interest_rc_list_t *__z_get_interest_by_key_and_flags(_z_sessi _z_session_interest_rc_t *intr = _z_session_interest_rc_list_head(xs); if ((intr->in->val._flags & flags) != 0) { if (_z_keyexpr_intersects(intr->in->val._key._suffix, strlen(intr->in->val._key._suffix), key._suffix, - strlen(key._suffix)) == true) { + strlen(key._suffix))) { ret = _z_session_interest_rc_list_push(ret, _z_session_interest_rc_clone_as_ptr(intr)); } } @@ -120,8 +126,7 @@ static int8_t _z_send_subscriber_interest(_z_session_t *zn) { // Build the declare message to send on the wire _z_keyexpr_t key = _z_keyexpr_alias(sub->in->val._key); _z_declaration_t declaration = - _z_make_decl_subscriber(&key, sub->in->val._id, sub->in->val._info.reliability == Z_RELIABILITY_RELIABLE, - sub->in->val._info.mode == Z_SUBMODE_PULL); + _z_make_decl_subscriber(&key, sub->in->val._id, sub->in->val._info.reliability == Z_RELIABILITY_RELIABLE); _z_network_message_t n_msg = _z_n_msg_make_declare(declaration); if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { return _Z_ERR_TRANSPORT_TX_FAILED; @@ -199,46 +204,126 @@ _z_session_interest_rc_t *_z_register_interest(_z_session_t *zn, _z_session_inte return ret; } +static int8_t _unsafe_z_register_declare(_z_session_t *zn, const _z_keyexpr_t *key, uint32_t id, uint8_t type) { + _z_declare_data_t *decl = NULL; + decl = (_z_declare_data_t *)zp_malloc(sizeof(_z_declare_data_t)); + if (decl == NULL) { + return _Z_ERR_SYSTEM_OUT_OF_MEMORY; + } + _z_keyexpr_copy(&decl->_key, key); + decl->_id = id; + decl->_type = type; + zn->_remote_declares = _z_declare_data_list_push(zn->_remote_declares, decl); + return _Z_RES_OK; +} + +static _z_keyexpr_t _unsafe_z_get_key_from_declare(_z_session_t *zn, uint32_t id, uint8_t type) { + _z_declare_data_list_t *xs = zn->_remote_declares; + _z_declare_data_t comp = { + ._key = _z_keyexpr_null(), + ._id = id, + ._type = type, + }; + while (xs != NULL) { + _z_declare_data_t *decl = _z_declare_data_list_head(xs); + if (_z_declare_data_eq(&comp, decl)) { + return _z_keyexpr_duplicate(decl->_key); + } + xs = _z_declare_data_list_tail(xs); + } + return _z_keyexpr_null(); +} + +static int8_t _unsafe_z_unregister_declare(_z_session_t *zn, uint32_t id, uint8_t type) { + _z_declare_data_t decl = { + ._key = _z_keyexpr_null(), + ._id = id, + ._type = type, + }; + zn->_remote_declares = _z_declare_data_list_drop_filter(zn->_remote_declares, _z_declare_data_eq, &decl); + return _Z_RES_OK; +} + int8_t _z_interest_process_declares(_z_session_t *zn, const _z_declaration_t *decl) { const _z_keyexpr_t *decl_key = NULL; _z_interest_msg_t msg; uint8_t flags = 0; + uint8_t decl_type = 0; switch (decl->_tag) { case _Z_DECL_SUBSCRIBER: msg.type = _Z_INTEREST_MSG_TYPE_DECL_SUBSCRIBER; msg.id = decl->_body._decl_subscriber._id; decl_key = &decl->_body._decl_subscriber._keyexpr; + decl_type = _Z_DECLARE_TYPE_SUBSCRIBER; flags = _Z_INTEREST_FLAG_SUBSCRIBERS; break; case _Z_DECL_QUERYABLE: msg.type = _Z_INTEREST_MSG_TYPE_DECL_QUERYABLE; msg.id = decl->_body._decl_queryable._id; decl_key = &decl->_body._decl_queryable._keyexpr; + decl_type = _Z_DECLARE_TYPE_QUERYABLE; flags = _Z_INTEREST_FLAG_QUERYABLES; break; + default: + return _Z_ERR_MESSAGE_ZENOH_DECLARATION_UNKNOWN; + } + // Retrieve key + _zp_session_lock_mutex(zn); + _z_keyexpr_t key = __unsafe_z_get_expanded_key_from_key(zn, decl_key); + if (key._suffix == NULL) { + _zp_session_unlock_mutex(zn); + return _Z_ERR_KEYEXPR_UNKNOWN; + } + // Register declare + _unsafe_z_register_declare(zn, &key, msg.id, decl_type); + // Retrieve interests + _z_session_interest_rc_list_t *intrs = __unsafe_z_get_interest_by_key_and_flags(zn, flags, key); + _zp_session_unlock_mutex(zn); + // Parse session_interest list + _z_session_interest_rc_list_t *xs = intrs; + while (xs != NULL) { + _z_session_interest_rc_t *intr = _z_session_interest_rc_list_head(xs); + if (intr->in->val._callback != NULL) { + intr->in->val._callback(&msg, intr->in->val._arg); + } + xs = _z_session_interest_rc_list_tail(xs); + } + // Clean up + _z_keyexpr_clear(&key); + _z_session_interest_rc_list_free(&intrs); + return _Z_RES_OK; +} + +int8_t _z_interest_process_undeclares(_z_session_t *zn, const _z_declaration_t *decl) { + _z_interest_msg_t msg; + uint8_t flags = 0; + uint8_t decl_type = 0; + switch (decl->_tag) { case _Z_UNDECL_SUBSCRIBER: msg.type = _Z_INTEREST_MSG_TYPE_UNDECL_SUBSCRIBER; msg.id = decl->_body._undecl_subscriber._id; - decl_key = &decl->_body._undecl_subscriber._ext_keyexpr; + decl_type = _Z_DECLARE_TYPE_SUBSCRIBER; flags = _Z_INTEREST_FLAG_SUBSCRIBERS; break; case _Z_UNDECL_QUERYABLE: msg.type = _Z_INTEREST_MSG_TYPE_UNDECL_QUERYABLE; msg.id = decl->_body._undecl_queryable._id; - decl_key = &decl->_body._undecl_queryable._ext_keyexpr; + decl_type = _Z_DECLARE_TYPE_QUERYABLE; flags = _Z_INTEREST_FLAG_QUERYABLES; break; default: return _Z_ERR_MESSAGE_ZENOH_DECLARATION_UNKNOWN; } _zp_session_lock_mutex(zn); - _z_keyexpr_t key = __unsafe_z_get_expanded_key_from_key(zn, decl_key); + // Retrieve declare data + _z_keyexpr_t key = _unsafe_z_get_key_from_declare(zn, msg.id, decl_type); if (key._suffix == NULL) { - _z_keyexpr_clear(&key); _zp_session_unlock_mutex(zn); return _Z_ERR_KEYEXPR_UNKNOWN; } _z_session_interest_rc_list_t *intrs = __unsafe_z_get_interest_by_key_and_flags(zn, flags, key); + // Remove declare + _unsafe_z_unregister_declare(zn, msg.id, decl_type); _zp_session_unlock_mutex(zn); // Parse session_interest list @@ -266,6 +351,7 @@ void _z_unregister_interest(_z_session_t *zn, _z_session_interest_rc_t *intr) { void _z_flush_interest(_z_session_t *zn) { _zp_session_lock_mutex(zn); _z_session_interest_rc_list_free(&zn->_local_interests); + _z_declare_data_list_free(&zn->_remote_declares); _zp_session_unlock_mutex(zn); } @@ -322,6 +408,8 @@ int8_t _z_interest_process_declare_interest(_z_session_t *zn, _z_keyexpr_t key, } #else +void _z_flush_interest(_z_session_t *zn) { _ZP_UNUSED(zn); } + int8_t _z_interest_process_declares(_z_session_t *zn, const _z_declaration_t *decl) { _ZP_UNUSED(zn); _ZP_UNUSED(decl); diff --git a/src/session/queryable.c b/src/session/queryable.c index b669b8b3f..d85533466 100644 --- a/src/session/queryable.c +++ b/src/session/queryable.c @@ -141,9 +141,7 @@ int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *msgq, const if (key._suffix != NULL) { _z_session_queryable_rc_list_t *qles = __unsafe_z_get_session_queryable_by_key(zn, key); -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_unlock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_unlock_mutex(zn); // Build the z_query z_query_t query = {._val = {._rc = _z_query_rc_new()}}; @@ -160,10 +158,7 @@ int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *msgq, const _z_keyexpr_clear(&key); _z_session_queryable_rc_list_free(&qles); } else { -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_unlock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 - + _zp_session_unlock_mutex(zn); ret = _Z_ERR_KEYEXPR_UNKNOWN; } diff --git a/src/session/rx.c b/src/session/rx.c index 451ed1551..4e0a5ba16 100644 --- a/src/session/rx.c +++ b/src/session/rx.c @@ -70,10 +70,10 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint _z_interest_process_declares(zn, &decl._decl); } break; case _Z_UNDECL_SUBSCRIBER: { - _z_interest_process_declares(zn, &decl._decl); + _z_interest_process_undeclares(zn, &decl._decl); } break; case _Z_UNDECL_QUERYABLE: { - _z_interest_process_declares(zn, &decl._decl); + _z_interest_process_undeclares(zn, &decl._decl); } break; case _Z_DECL_TOKEN: { // TODO: add support or explicitly discard diff --git a/src/session/subscription.c b/src/session/subscription.c index bbe38d100..e3a43c2fd 100644 --- a/src/session/subscription.c +++ b/src/session/subscription.c @@ -170,9 +170,7 @@ int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, co if (key._suffix != NULL) { _z_subscription_rc_list_t *subs = __unsafe_z_get_subscriptions_by_key(zn, _Z_RESOURCE_IS_LOCAL, key); -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_unlock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_unlock_mutex(zn); // Build the sample _z_sample_t s; @@ -195,9 +193,7 @@ int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, co _z_keyexpr_clear(&key); _z_subscription_rc_list_free(&subs); } else { -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_unlock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_unlock_mutex(zn); ret = _Z_ERR_KEYEXPR_UNKNOWN; } diff --git a/src/session/utils.c b/src/session/utils.c index d5d7fe1d7..eac976016 100644 --- a/src/session/utils.c +++ b/src/session/utils.c @@ -18,6 +18,7 @@ #include "zenoh-pico/config.h" #include "zenoh-pico/protocol/core.h" +#include "zenoh-pico/session/interest.h" #include "zenoh-pico/session/query.h" #include "zenoh-pico/session/queryable.h" #include "zenoh-pico/session/resource.h" @@ -115,6 +116,7 @@ void _z_session_clear(_z_session_t *zn) { #if Z_FEATURE_QUERY == 1 _z_flush_pending_queries(zn); #endif + _z_flush_interest(zn); #if Z_FEATURE_MULTI_THREAD == 1 zp_mutex_free(&zn->_mutex_inner); diff --git a/zenohpico.pc b/zenohpico.pc index 1643bd7bb..17aab2980 100644 --- a/zenohpico.pc +++ b/zenohpico.pc @@ -3,6 +3,6 @@ prefix=/usr/local Name: zenohpico Description: URL: -Version: 0.11.20240323dev +Version: 0.11.20240326dev Cflags: -I${prefix}/include Libs: -L${prefix}/lib -lzenohpico