diff --git a/include/zenoh-pico/protocol/definitions/core.h b/include/zenoh-pico/protocol/definitions/core.h index 58d16a430..3fd9110b1 100644 --- a/include/zenoh-pico/protocol/definitions/core.h +++ b/include/zenoh-pico/protocol/definitions/core.h @@ -44,5 +44,6 @@ #define _Z_FLAGS(h) (_Z_FLAGS_MASK & (h)) #define _Z_HAS_FLAG(h, f) (((h) & (f)) != 0) #define _Z_SET_FLAG(h, f) (h |= f) +#define _Z_CLEAR_FLAG(h, f) (h &= ~(f)) #endif /* INCLUDE_ZENOH_PICO_PROTOCOL_DEFINITIONS_CORE_H */ diff --git a/include/zenoh-pico/protocol/definitions/declarations.h b/include/zenoh-pico/protocol/definitions/declarations.h index 6c3144e7b..1df9c60f7 100644 --- a/include/zenoh-pico/protocol/definitions/declarations.h +++ b/include/zenoh-pico/protocol/definitions/declarations.h @@ -75,9 +75,11 @@ _z_undecl_token_t _z_undecl_token_null(void); #define _Z_INTEREST_FLAG_SUBSCRIBERS (1 << 1) #define _Z_INTEREST_FLAG_QUERYABLES (1 << 2) #define _Z_INTEREST_FLAG_TOKENS (1 << 3) +#define _Z_INTEREST_FLAG_RESTRICTED (1 << 4) #define _Z_INTEREST_FLAG_CURRENT (1 << 5) #define _Z_INTEREST_FLAG_FUTURE (1 << 6) #define _Z_INTEREST_FLAG_AGGREGATE (1 << 7) + typedef struct { _z_keyexpr_t _keyexpr; uint32_t _id; diff --git a/src/net/filtering.c b/src/net/filtering.c index ae0a8d898..9d4cb4d83 100644 --- a/src/net/filtering.c +++ b/src/net/filtering.c @@ -73,8 +73,8 @@ static void _z_write_filter_callback(const _z_interest_msg_t *msg, void *arg) { } int8_t _z_write_filter_create(_z_publisher_t *pub) { - uint8_t flags = _Z_INTEREST_FLAG_KEYEXPRS | _Z_INTEREST_FLAG_SUBSCRIBERS | _Z_INTEREST_FLAG_CURRENT | - _Z_INTEREST_FLAG_FUTURE | _Z_INTEREST_FLAG_AGGREGATE; + uint8_t flags = _Z_INTEREST_FLAG_KEYEXPRS | _Z_INTEREST_FLAG_SUBSCRIBERS | _Z_INTEREST_FLAG_RESTRICTED | + _Z_INTEREST_FLAG_CURRENT | _Z_INTEREST_FLAG_FUTURE | _Z_INTEREST_FLAG_AGGREGATE; _z_writer_filter_ctx_t *ctx = (_z_writer_filter_ctx_t *)zp_malloc(sizeof(_z_writer_filter_ctx_t)); if (ctx == NULL) { diff --git a/src/net/primitives.c b/src/net/primitives.c index 87d138d47..4958b04be 100644 --- a/src/net/primitives.c +++ b/src/net/primitives.c @@ -237,7 +237,12 @@ int8_t _z_undeclare_subscriber(_z_subscriber_t *sub) { return _Z_ERR_ENTITY_UNKNOWN; } // Build the declare message to send on the wire - _z_declaration_t declaration = _z_make_undecl_subscriber(sub->_entity_id, &s->in->val._key); + _z_declaration_t declaration; + if (sub->_zn.in->val._tp._type == _Z_TRANSPORT_UNICAST_TYPE) { + declaration = _z_make_undecl_subscriber(sub->_entity_id, NULL); + } else { + declaration = _z_make_undecl_subscriber(sub->_entity_id, &s->in->val._key); + } _z_network_message_t n_msg = _z_n_msg_make_declare(declaration); if (_z_send_n_msg(&sub->_zn.in->val, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { return _Z_ERR_TRANSPORT_TX_FAILED; @@ -318,7 +323,12 @@ int8_t _z_undeclare_queryable(_z_queryable_t *qle) { return _Z_ERR_ENTITY_UNKNOWN; } // Build the declare message to send on the wire - _z_declaration_t declaration = _z_make_undecl_queryable(qle->_entity_id, &q->in->val._key); + _z_declaration_t declaration; + if (qle->_zn.in->val._tp._type == _Z_TRANSPORT_UNICAST_TYPE) { + declaration = _z_make_undecl_queryable(qle->_entity_id, NULL); + } else { + declaration = _z_make_undecl_queryable(qle->_entity_id, &q->in->val._key); + } _z_network_message_t n_msg = _z_n_msg_make_declare(declaration); if (_z_send_n_msg(&qle->_zn.in->val, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { return _Z_ERR_TRANSPORT_TX_FAILED; @@ -461,7 +471,12 @@ int8_t _z_undeclare_interest(_z_session_t *zn, uint32_t interest_id) { return _Z_ERR_ENTITY_UNKNOWN; } // Build the declare message to send on the wire - _z_declaration_t declaration = _z_make_undecl_interest(sintr->in->val._id, &sintr->in->val._key); + _z_declaration_t declaration; + if (zn->_tp._type == _Z_TRANSPORT_UNICAST_TYPE) { + declaration = _z_make_undecl_interest(sintr->in->val._id, NULL); + } else { + declaration = _z_make_undecl_interest(sintr->in->val._id, &sintr->in->val._key); + } _z_network_message_t n_msg = _z_n_msg_make_declare(declaration); if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { return _Z_ERR_TRANSPORT_TX_FAILED; diff --git a/src/protocol/codec/declarations.c b/src/protocol/codec/declarations.c index 554d84a59..0ba99c755 100644 --- a/src/protocol/codec/declarations.c +++ b/src/protocol/codec/declarations.c @@ -130,9 +130,38 @@ int8_t _z_undecl_token_encode(_z_wbuf_t *wbf, const _z_undecl_token_t *decl) { } int8_t _z_decl_interest_encode(_z_wbuf_t *wbf, const _z_decl_interest_t *decl) { + // Set header uint8_t header = _Z_DECL_INTEREST_MID; - _Z_RETURN_IF_ERR(_z_decl_commons_encode(wbf, header, false, decl->_id, decl->_keyexpr)); - _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, decl->interest_flags)); + if (_Z_HAS_FLAG(decl->interest_flags, _Z_INTEREST_FLAG_CURRENT)) { + _Z_SET_FLAG(header, _Z_INTEREST_FLAG_CURRENT); + } + if (_Z_HAS_FLAG(decl->interest_flags, _Z_INTEREST_FLAG_FUTURE)) { + _Z_SET_FLAG(header, _Z_INTEREST_FLAG_FUTURE); + } + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, header)); + // Set id + _Z_RETURN_IF_ERR(_z_zint_encode(wbf, decl->_id)); + // Copy flags but clear double use ones. + uint8_t interest_flags = decl->interest_flags; + _Z_CLEAR_FLAG(interest_flags, _Z_INTEREST_FLAG_CURRENT); + _Z_CLEAR_FLAG(interest_flags, _Z_INTEREST_FLAG_FUTURE); + // Process restricted flag + if (_Z_HAS_FLAG(interest_flags, _Z_INTEREST_FLAG_RESTRICTED)) { + // Set Named & Mapping flags + _Bool has_kesuffix = _z_keyexpr_has_suffix(decl->_keyexpr); + if (has_kesuffix) { + _Z_SET_FLAG(interest_flags, _Z_DECL_SUBSCRIBER_FLAG_N); + } + if (_z_keyexpr_is_local(&decl->_keyexpr)) { + _Z_SET_FLAG(interest_flags, _Z_DECL_SUBSCRIBER_FLAG_M); + } + // Set decl flags & keyexpr + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, interest_flags)); + _Z_RETURN_IF_ERR(_z_keyexpr_encode(wbf, has_kesuffix, &decl->_keyexpr)); + } else { + // Set decl flags + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, interest_flags)); + } return _Z_RES_OK; } @@ -337,11 +366,48 @@ int8_t _z_undecl_token_decode(_z_undecl_token_t *decl, _z_zbuf_t *zbf, uint8_t h return _z_undecl_trivial_decode(zbf, &decl->_ext_keyexpr, &decl->_id, header); } int8_t _z_decl_interest_decode(_z_decl_interest_t *decl, _z_zbuf_t *zbf, uint8_t header) { - _Bool has_ext; *decl = _z_decl_interest_null(); - _Z_RETURN_IF_ERR(_z_decl_commons_decode(zbf, header, &has_ext, &decl->_id, &decl->_keyexpr)); + // Decode id + _Z_RETURN_IF_ERR(_z_zint32_decode(&decl->_id, zbf)); + // Decode interest flags _Z_RETURN_IF_ERR(_z_uint8_decode(&decl->interest_flags, zbf)); - if (has_ext) { + // Process restricted flag + if (_Z_HAS_FLAG(decl->interest_flags, _Z_INTEREST_FLAG_RESTRICTED)) { + uint16_t mapping = _Z_HAS_FLAG(decl->interest_flags, _Z_DECL_SUBSCRIBER_FLAG_M) + ? _Z_KEYEXPR_MAPPING_UNKNOWN_REMOTE + : _Z_KEYEXPR_MAPPING_LOCAL; + // Decode ke id + _Z_RETURN_IF_ERR(_z_zint16_decode(&decl->_keyexpr._id, zbf)); + // Decode ke suffix + if (_Z_HAS_FLAG(decl->interest_flags, _Z_DECL_SUBSCRIBER_FLAG_N)) { + _z_zint_t len; + _Z_RETURN_IF_ERR(_z_zint_decode(&len, zbf)); + if (_z_zbuf_len(zbf) < len) { + return _Z_ERR_MESSAGE_DESERIALIZATION_FAILED; + } + decl->_keyexpr._suffix = zp_malloc(len + 1); + if (decl->_keyexpr._suffix == NULL) { + return _Z_ERR_SYSTEM_OUT_OF_MEMORY; + } + decl->_keyexpr._mapping = _z_keyexpr_mapping(mapping, true); + _z_zbuf_read_bytes(zbf, (uint8_t *)decl->_keyexpr._suffix, 0, len); + decl->_keyexpr._suffix[len] = 0; + } else { + decl->_keyexpr._suffix = NULL; + decl->_keyexpr._mapping = _z_keyexpr_mapping(mapping, false); + } + } + // Replace named & mapping by current & future flags + _Z_CLEAR_FLAG(decl->interest_flags, _Z_DECL_SUBSCRIBER_FLAG_M); + _Z_CLEAR_FLAG(decl->interest_flags, _Z_DECL_SUBSCRIBER_FLAG_N); + if (_Z_HAS_FLAG(header, _Z_INTEREST_FLAG_CURRENT)) { + _Z_SET_FLAG(decl->interest_flags, _Z_INTEREST_FLAG_CURRENT); + } + if (_Z_HAS_FLAG(header, _Z_INTEREST_FLAG_FUTURE)) { + _Z_SET_FLAG(decl->interest_flags, _Z_INTEREST_FLAG_FUTURE); + } + // Decode extention + if (_Z_HAS_FLAG(header, _Z_FLAG_Z_Z)) { _Z_RETURN_IF_ERR(_z_msg_ext_skip_non_mandatories(zbf, 0x13)); } return _Z_RES_OK; diff --git a/src/protocol/definitions/declarations.c b/src/protocol/definitions/declarations.c index 9f1973077..56f004a18 100644 --- a/src/protocol/definitions/declarations.c +++ b/src/protocol/definitions/declarations.c @@ -76,10 +76,14 @@ _z_declaration_t _z_make_decl_subscriber(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, ._keyexpr = _z_keyexpr_steal(key), ._ext_subinfo = {._pull_mode = pull_mode, ._reliable = reliable}}}}; } + _z_declaration_t _z_make_undecl_subscriber(uint32_t id, _Z_OPTIONAL const _z_keyexpr_t *key) { - return (_z_declaration_t){._tag = _Z_UNDECL_SUBSCRIBER, - ._body = {._undecl_subscriber = {._id = id, ._ext_keyexpr = _z_keyexpr_duplicate(*key)}}}; + return (_z_declaration_t){ + ._tag = _Z_UNDECL_SUBSCRIBER, + ._body = {._undecl_subscriber = { + ._id = id, ._ext_keyexpr = (key == NULL) ? _z_keyexpr_null() : _z_keyexpr_duplicate(*key)}}}; } + _z_declaration_t _z_make_decl_queryable(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, uint32_t distance, uint8_t complete) { return (_z_declaration_t){ ._tag = _Z_DECL_QUERYABLE, @@ -88,8 +92,10 @@ _z_declaration_t _z_make_decl_queryable(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, ._ext_queryable_info = {._complete = complete, ._distance = distance}}}}; } _z_declaration_t _z_make_undecl_queryable(uint32_t id, _Z_OPTIONAL const _z_keyexpr_t *key) { - return (_z_declaration_t){._tag = _Z_UNDECL_QUERYABLE, - ._body = {._undecl_queryable = {._id = id, ._ext_keyexpr = _z_keyexpr_duplicate(*key)}}}; + return (_z_declaration_t){ + ._tag = _Z_UNDECL_QUERYABLE, + ._body = {._undecl_queryable = { + ._id = id, ._ext_keyexpr = (key == NULL) ? _z_keyexpr_null() : _z_keyexpr_duplicate(*key)}}}; } _z_declaration_t _z_make_decl_token(_Z_MOVE(_z_keyexpr_t) key, uint32_t id) { return (_z_declaration_t){._tag = _Z_DECL_TOKEN, @@ -99,8 +105,10 @@ _z_declaration_t _z_make_decl_token(_Z_MOVE(_z_keyexpr_t) key, uint32_t id) { }}}; } _z_declaration_t _z_make_undecl_token(uint32_t id, _Z_OPTIONAL const _z_keyexpr_t *key) { - return (_z_declaration_t){._tag = _Z_UNDECL_TOKEN, - ._body = {._undecl_token = {._id = id, ._ext_keyexpr = _z_keyexpr_duplicate(*key)}}}; + return (_z_declaration_t){ + ._tag = _Z_UNDECL_TOKEN, + ._body = {._undecl_token = {._id = id, + ._ext_keyexpr = (key == NULL) ? _z_keyexpr_null() : _z_keyexpr_duplicate(*key)}}}; } _z_declaration_t _z_make_decl_interest(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, uint8_t interest_flags) { return (_z_declaration_t){._tag = _Z_DECL_INTEREST, @@ -111,8 +119,10 @@ _z_declaration_t _z_make_decl_interest(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, u }}}; } _z_declaration_t _z_make_undecl_interest(uint32_t id, _Z_OPTIONAL const _z_keyexpr_t *key) { - return (_z_declaration_t){._tag = _Z_UNDECL_INTEREST, - ._body = {._undecl_interest = {._id = id, ._ext_keyexpr = _z_keyexpr_duplicate(*key)}}}; + return (_z_declaration_t){ + ._tag = _Z_UNDECL_INTEREST, + ._body = {._undecl_interest = { + ._id = id, ._ext_keyexpr = (key == NULL) ? _z_keyexpr_null() : _z_keyexpr_duplicate(*key)}}}; } _z_declaration_t _z_make_final_interest(uint32_t id) { return (_z_declaration_t){._tag = _Z_FINAL_INTEREST, ._body = {._final_interest = {._id = id}}}; diff --git a/src/session/interest.c b/src/session/interest.c index 219077b0f..4003f7bc6 100644 --- a/src/session/interest.c +++ b/src/session/interest.c @@ -56,12 +56,11 @@ static _z_session_interest_rc_list_t *__z_get_interest_by_key_and_flags(_z_sessi _z_session_interest_rc_list_t *xs = intrs; while (xs != NULL) { _z_session_interest_rc_t *intr = _z_session_interest_rc_list_head(xs); - if ((intr->in->val._flags & flags) == 0) { - continue; - } - if (_z_keyexpr_intersects(intr->in->val._key._suffix, strlen(intr->in->val._key._suffix), key._suffix, - strlen(key._suffix)) == true) { - ret = _z_session_interest_rc_list_push(ret, _z_session_interest_rc_clone_as_ptr(intr)); + if ((intr->in->val._flags & flags) != 0) { + if (_z_keyexpr_intersects(intr->in->val._key._suffix, strlen(intr->in->val._key._suffix), key._suffix, + strlen(key._suffix)) == true) { + ret = _z_session_interest_rc_list_push(ret, _z_session_interest_rc_clone_as_ptr(intr)); + } } xs = _z_session_interest_rc_list_tail(xs); } @@ -291,6 +290,7 @@ int8_t _z_interest_process_undeclare_interest(_z_session_t *zn, uint32_t id) { } int8_t _z_interest_process_declare_interest(_z_session_t *zn, _z_keyexpr_t key, uint32_t id, uint8_t flags) { + // TODO process restricted flag & key _ZP_UNUSED(key); _ZP_UNUSED(id); // Check transport type @@ -298,21 +298,21 @@ int8_t _z_interest_process_declare_interest(_z_session_t *zn, _z_keyexpr_t key, return _Z_RES_OK; // Nothing to do on unicast } // Current flags process - if ((flags & _Z_INTEREST_FLAG_CURRENT) != 0) { + if (_Z_HAS_FLAG(flags, _Z_INTEREST_FLAG_CURRENT)) { // Send all declare - if ((flags & _Z_INTEREST_FLAG_KEYEXPRS) != 0) { + if (_Z_HAS_FLAG(flags, _Z_INTEREST_FLAG_KEYEXPRS)) { _Z_DEBUG("Sending declare resources"); _Z_RETURN_IF_ERR(_z_send_resource_interest(zn)); } - if ((flags & _Z_INTEREST_FLAG_SUBSCRIBERS) != 0) { + if (_Z_HAS_FLAG(flags, _Z_INTEREST_FLAG_SUBSCRIBERS)) { _Z_DEBUG("Sending declare subscribers"); _Z_RETURN_IF_ERR(_z_send_subscriber_interest(zn)); } - if ((flags & _Z_INTEREST_FLAG_QUERYABLES) != 0) { + if (_Z_HAS_FLAG(flags, _Z_INTEREST_FLAG_QUERYABLES)) { _Z_DEBUG("Sending declare queryables"); _Z_RETURN_IF_ERR(_z_send_queryable_interest(zn)); } - if ((flags & _Z_INTEREST_FLAG_TOKENS) != 0) { + if (_Z_HAS_FLAG(flags, _Z_INTEREST_FLAG_TOKENS)) { // Zenoh pico doesn't support liveliness token for now } // Send final declare diff --git a/src/session/query.c b/src/session/query.c index 491be698d..90d300621 100644 --- a/src/session/query.c +++ b/src/session/query.c @@ -20,6 +20,7 @@ #include "zenoh-pico/net/memory.h" #include "zenoh-pico/protocol/keyexpr.h" #include "zenoh-pico/session/resource.h" +#include "zenoh-pico/session/utils.h" #include "zenoh-pico/utils/logging.h" #if Z_FEATURE_QUERY == 1 @@ -103,15 +104,11 @@ _z_pending_query_t *__unsafe__z_get_pending_query_by_id(_z_session_t *zn, const } _z_pending_query_t *_z_get_pending_query_by_id(_z_session_t *zn, const _z_zint_t id) { -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_lock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_lock_mutex(zn); _z_pending_query_t *pql = __unsafe__z_get_pending_query_by_id(zn, id); -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_unlock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_unlock_mutex(zn); return pql; } @@ -121,9 +118,7 @@ int8_t _z_register_pending_query(_z_session_t *zn, _z_pending_query_t *pen_qry) _Z_DEBUG(">>> Allocating query for (%ju:%s,%s)", (uintmax_t)pen_qry->_key._id, pen_qry->_key._suffix, pen_qry->_parameters); -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_lock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_lock_mutex(zn); _z_pending_query_t *pql = __unsafe__z_get_pending_query_by_id(zn, pen_qry->_id); if (pql == NULL) { // Register query only if a pending one with the same ID does not exist @@ -132,9 +127,7 @@ int8_t _z_register_pending_query(_z_session_t *zn, _z_pending_query_t *pen_qry) ret = _Z_ERR_ENTITY_DECLARATION_FAILED; } -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_unlock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_unlock_mutex(zn); return ret; } @@ -144,9 +137,7 @@ int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, cons const _z_timestamp_t timestamp) { int8_t ret = _Z_RES_OK; -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_lock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_lock_mutex(zn); _z_pending_query_t *pen_qry = __unsafe__z_get_pending_query_by_id(zn, id); if ((ret == _Z_RES_OK) && (pen_qry == NULL)) { @@ -216,9 +207,7 @@ int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, cons } } -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_unlock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_unlock_mutex(zn); // Trigger the user callback if ((ret == _Z_RES_OK) && (pen_qry->_consolidation != Z_CONSOLIDATION_MODE_LATEST)) { @@ -235,9 +224,7 @@ int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, cons int8_t _z_trigger_query_reply_final(_z_session_t *zn, _z_zint_t id) { int8_t ret = _Z_RES_OK; -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_lock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_lock_mutex(zn); // Final reply received for unknown query id _z_pending_query_t *pen_qry = __unsafe__z_get_pending_query_by_id(zn, id); @@ -261,34 +248,24 @@ int8_t _z_trigger_query_reply_final(_z_session_t *zn, _z_zint_t id) { zn->_pending_queries = _z_pending_query_list_drop_filter(zn->_pending_queries, _z_pending_query_eq, pen_qry); } -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_unlock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_unlock_mutex(zn); return ret; } void _z_unregister_pending_query(_z_session_t *zn, _z_pending_query_t *pen_qry) { -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_lock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_lock_mutex(zn); zn->_pending_queries = _z_pending_query_list_drop_filter(zn->_pending_queries, _z_pending_query_eq, pen_qry); -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_unlock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_unlock_mutex(zn); } void _z_flush_pending_queries(_z_session_t *zn) { -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_lock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_lock_mutex(zn); _z_pending_query_list_free(&zn->_pending_queries); -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_unlock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_unlock_mutex(zn); } #endif diff --git a/src/session/queryable.c b/src/session/queryable.c index ce4ba8b19..b669b8b3f 100644 --- a/src/session/queryable.c +++ b/src/session/queryable.c @@ -95,30 +95,22 @@ _z_session_queryable_rc_list_t *__unsafe_z_get_session_queryable_by_key(_z_sessi } _z_session_queryable_rc_t *_z_get_session_queryable_by_id(_z_session_t *zn, const _z_zint_t id) { -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_lock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_lock_mutex(zn); _z_session_queryable_rc_t *qle = __unsafe_z_get_session_queryable_by_id(zn, id); -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_unlock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_unlock_mutex(zn); return qle; } _z_session_queryable_rc_list_t *_z_get_session_queryable_by_key(_z_session_t *zn, const _z_keyexpr_t *keyexpr) { -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_lock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_lock_mutex(zn); _z_keyexpr_t key = __unsafe_z_get_expanded_key_from_key(zn, keyexpr); _z_session_queryable_rc_list_t *qles = __unsafe_z_get_session_queryable_by_key(zn, key); -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_unlock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_unlock_mutex(zn); return qles; } @@ -127,9 +119,7 @@ _z_session_queryable_rc_t *_z_register_session_queryable(_z_session_t *zn, _z_se _Z_DEBUG(">>> Allocating queryable for (%ju:%s)", (uintmax_t)q->_key._id, q->_key._suffix); _z_session_queryable_rc_t *ret = NULL; -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_lock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_lock_mutex(zn); ret = (_z_session_queryable_rc_t *)zp_malloc(sizeof(_z_session_queryable_rc_t)); if (ret != NULL) { @@ -137,9 +127,7 @@ _z_session_queryable_rc_t *_z_register_session_queryable(_z_session_t *zn, _z_se zn->_local_queryable = _z_session_queryable_rc_list_push(zn->_local_queryable, ret); } -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_unlock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_unlock_mutex(zn); return ret; } @@ -147,9 +135,7 @@ _z_session_queryable_rc_t *_z_register_session_queryable(_z_session_t *zn, _z_se int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *msgq, const _z_keyexpr_t q_key, uint32_t qid) { int8_t ret = _Z_RES_OK; -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_lock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_lock_mutex(zn); _z_keyexpr_t key = __unsafe_z_get_expanded_key_from_key(zn, &q_key); if (key._suffix != NULL) { @@ -185,28 +171,20 @@ int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *msgq, const } void _z_unregister_session_queryable(_z_session_t *zn, _z_session_queryable_rc_t *qle) { -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_lock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_lock_mutex(zn); zn->_local_queryable = _z_session_queryable_rc_list_drop_filter(zn->_local_queryable, _z_session_queryable_rc_eq, qle); -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_unlock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_unlock_mutex(zn); } void _z_flush_session_queryable(_z_session_t *zn) { -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_lock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_lock_mutex(zn); _z_session_queryable_rc_list_free(&zn->_local_queryable); -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_unlock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_unlock_mutex(zn); } #endif diff --git a/src/session/resource.c b/src/session/resource.c index f2e1d796e..477ff2789 100644 --- a/src/session/resource.c +++ b/src/session/resource.c @@ -22,6 +22,7 @@ #include "zenoh-pico/protocol/core.h" #include "zenoh-pico/protocol/keyexpr.h" #include "zenoh-pico/session/session.h" +#include "zenoh-pico/session/utils.h" #include "zenoh-pico/system/platform.h" #include "zenoh-pico/utils/logging.h" @@ -180,15 +181,11 @@ _z_keyexpr_t __unsafe_z_get_expanded_key_from_key(_z_session_t *zn, const _z_key } _z_resource_t *_z_get_resource_by_id(_z_session_t *zn, uint16_t mapping, _z_zint_t rid) { -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_lock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_lock_mutex(zn); _z_resource_t *res = __unsafe_z_get_resource_by_id(zn, mapping, rid); -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_unlock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_unlock_mutex(zn); return res; } @@ -197,28 +194,20 @@ _z_resource_t *_z_get_resource_by_key(_z_session_t *zn, const _z_keyexpr_t *keye if (keyexpr->_suffix == NULL) { return _z_get_resource_by_id(zn, _z_keyexpr_mapping_id(keyexpr), keyexpr->_id); } -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_lock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_lock_mutex(zn); _z_resource_t *res = __unsafe_z_get_resource_by_key(zn, keyexpr); -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_unlock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_unlock_mutex(zn); return res; } _z_keyexpr_t _z_get_expanded_key_from_key(_z_session_t *zn, const _z_keyexpr_t *keyexpr) { -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_lock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_lock_mutex(zn); _z_keyexpr_t res = __unsafe_z_get_expanded_key_from_key(zn, keyexpr); -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_unlock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_unlock_mutex(zn); return res; } @@ -230,9 +219,7 @@ int16_t _z_register_resource(_z_session_t *zn, _z_keyexpr_t key, uint16_t id, ui uint16_t mapping = register_to_mapping; uint16_t parent_mapping = _z_keyexpr_mapping_id(&key); -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_lock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_lock_mutex(zn); if (key._id != Z_RESOURCE_ID_NONE) { if (parent_mapping == mapping) { @@ -261,9 +248,7 @@ int16_t _z_register_resource(_z_session_t *zn, _z_keyexpr_t key, uint16_t id, ui } } -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_unlock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_unlock_mutex(zn); return ret; } @@ -271,9 +256,7 @@ int16_t _z_register_resource(_z_session_t *zn, _z_keyexpr_t key, uint16_t id, ui void _z_unregister_resource(_z_session_t *zn, uint16_t id, uint16_t mapping) { _Bool is_local = mapping == _Z_KEYEXPR_MAPPING_LOCAL; _Z_DEBUG("unregistering: id %d, mapping: %d", id, mapping); -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_lock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_lock_mutex(zn); _z_resource_list_t **parent_mut = is_local ? &zn->_local_resources : &zn->_remote_resources; while (id != 0) { _z_resource_list_t *parent = *parent_mut; @@ -295,9 +278,7 @@ void _z_unregister_resource(_z_session_t *zn, uint16_t id, uint16_t mapping) { parent = *parent_mut; } } -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_unlock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_unlock_mutex(zn); } _Bool _z_unregister_resource_for_peer_filter(const _z_resource_t *candidate, const _z_resource_t *ctx) { @@ -305,27 +286,19 @@ _Bool _z_unregister_resource_for_peer_filter(const _z_resource_t *candidate, con return _z_keyexpr_mapping_id(&candidate->_key) == mapping; } void _z_unregister_resources_for_peer(_z_session_t *zn, uint16_t mapping) { -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_lock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_lock_mutex(zn); _z_resource_t ctx = {._id = mapping, ._refcount = 0, ._key = {0}}; zn->_remote_resources = _z_resource_list_drop_filter(zn->_remote_resources, _z_unregister_resource_for_peer_filter, &ctx); -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_unlock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_unlock_mutex(zn); } void _z_flush_resources(_z_session_t *zn) { -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_lock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_lock_mutex(zn); _z_resource_list_free(&zn->_local_resources); _z_resource_list_free(&zn->_remote_resources); -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_unlock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_unlock_mutex(zn); } diff --git a/src/session/subscription.c b/src/session/subscription.c index e85ae2420..68e0cb300 100644 --- a/src/session/subscription.c +++ b/src/session/subscription.c @@ -22,6 +22,7 @@ #include "zenoh-pico/protocol/keyexpr.h" #include "zenoh-pico/session/resource.h" #include "zenoh-pico/session/session.h" +#include "zenoh-pico/session/utils.h" #include "zenoh-pico/utils/logging.h" #if Z_FEATURE_SUBSCRIPTION == 1 @@ -97,29 +98,21 @@ _z_subscription_rc_list_t *__unsafe_z_get_subscriptions_by_key(_z_session_t *zn, } _z_subscription_rc_t *_z_get_subscription_by_id(_z_session_t *zn, uint8_t is_local, const _z_zint_t id) { -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_lock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_lock_mutex(zn); _z_subscription_rc_t *sub = __unsafe_z_get_subscription_by_id(zn, is_local, id); -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_unlock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_unlock_mutex(zn); return sub; } _z_subscription_rc_list_t *_z_get_subscriptions_by_key(_z_session_t *zn, uint8_t is_local, const _z_keyexpr_t *key) { -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_lock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_lock_mutex(zn); _z_subscription_rc_list_t *subs = __unsafe_z_get_subscriptions_by_key(zn, is_local, *key); -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_unlock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_unlock_mutex(zn); return subs; } @@ -128,9 +121,7 @@ _z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, uint8_t is_loca _Z_DEBUG(">>> Allocating sub decl for (%ju:%s)", (uintmax_t)s->_key._id, s->_key._suffix); _z_subscription_rc_t *ret = NULL; -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_lock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_lock_mutex(zn); ret = (_z_subscription_rc_t *)zp_malloc(sizeof(_z_subscription_rc_t)); if (ret != NULL) { @@ -142,9 +133,7 @@ _z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, uint8_t is_loca } } -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_unlock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_unlock_mutex(zn); return ret; } @@ -176,9 +165,7 @@ int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, co ) { int8_t ret = _Z_RES_OK; -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_lock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_lock_mutex(zn); _Z_DEBUG("Resolving %d - %s on mapping 0x%x", keyexpr._id, keyexpr._suffix, _z_keyexpr_mapping_id(&keyexpr)); _z_keyexpr_t key = __unsafe_z_get_expanded_key_from_key(zn, &keyexpr); @@ -221,9 +208,7 @@ int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, co } void _z_unregister_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_rc_t *sub) { -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_lock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_lock_mutex(zn); if (is_local == _Z_RESOURCE_IS_LOCAL) { zn->_local_subscriptions = @@ -233,22 +218,16 @@ void _z_unregister_subscription(_z_session_t *zn, uint8_t is_local, _z_subscript _z_subscription_rc_list_drop_filter(zn->_remote_subscriptions, _z_subscription_rc_eq, sub); } -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_unlock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_unlock_mutex(zn); } void _z_flush_subscriptions(_z_session_t *zn) { -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_lock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_lock_mutex(zn); _z_subscription_rc_list_free(&zn->_local_subscriptions); _z_subscription_rc_list_free(&zn->_remote_subscriptions); -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_unlock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_unlock_mutex(zn); } #else // Z_FEATURE_SUBSCRIPTION == 0 diff --git a/src/session/utils.c b/src/session/utils.c index cfeee4b1e..0791f4e47 100644 --- a/src/session/utils.c +++ b/src/session/utils.c @@ -22,6 +22,7 @@ #include "zenoh-pico/session/queryable.h" #include "zenoh-pico/session/resource.h" #include "zenoh-pico/session/subscription.h" +#include "zenoh-pico/utils/logging.h" /*------------------ clone helpers ------------------*/ _z_timestamp_t _z_timestamp_duplicate(const _z_timestamp_t *tstamp) {