diff --git a/.github/workflows/arduino_esp32.yaml b/.github/workflows/arduino_esp32.yaml index bd347aec6..ce4125da5 100644 --- a/.github/workflows/arduino_esp32.yaml +++ b/.github/workflows/arduino_esp32.yaml @@ -51,7 +51,7 @@ jobs: mkdir -p $ARDUINO_BASE cd $ARDUINO_BASE - platformio init -b esp32thing_plus --project-option="build_flags=-DZ_FEATURE_LINK_BLUETOOTH=1 -DZENOH_DEBUG=3" + platformio init -b esp32thing_plus --project-option="build_flags=-DZ_FEATURE_LINK_BLUETOOTH=1 -DZENOH_DEBUG=3 -DZENOH_COMPILER_GCC" cd $ARDUINO_BASE/lib ln -s $ZENOH_PICO_BASE diff --git a/.github/workflows/mbed.yaml b/.github/workflows/mbed.yaml index 60c74ecb7..d3c799cd3 100644 --- a/.github/workflows/mbed.yaml +++ b/.github/workflows/mbed.yaml @@ -51,7 +51,7 @@ jobs: mkdir -p $MBED_BASE cd $MBED_BASE - pio init -b nucleo_f767zi --project-option="framework=mbed" --project-option="build_flags=-DZ_FEATURE_LINK_SERIAL=1 -DZENOH_DEBUG=3" + pio init -b nucleo_f767zi --project-option="framework=mbed" --project-option="build_flags=-DZ_FEATURE_LINK_SERIAL=1 -DZENOH_DEBUG=3 -DZENOH_COMPILER_GCC" cd $MBED_BASE/lib ln -s $ZENOH_PICO_BASE diff --git a/.gitignore b/.gitignore index 11d4de96f..ae356975d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ # CMake build +crossbuilds # vscode .vscode @@ -52,6 +53,7 @@ cmake-build-release *.su *.idb *.pdb +*.sarif # Kernel Module Compile Results *.mod* diff --git a/CMakeLists.txt b/CMakeLists.txt index 04a5ecfee..69ac6dc24 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -75,6 +75,7 @@ string(TOUPPER ${CMAKE_BUILD_TYPE} CMAKE_BUILD_TYPE) set(CHECK_THREADS "ON") +# System definition if(CMAKE_SYSTEM_NAME MATCHES "Linux") add_definition(ZENOH_LINUX) elseif(POSIX_COMPATIBLE) @@ -101,6 +102,20 @@ else() return() endif() +# Compiler definition +message("Compilers in use: ${CMAKE_C_COMPILER_ID}, ${CMAKE_CXX_COMPILER_ID}") +if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang" OR CMAKE_C_COMPILER_ID STREQUAL "Clang") + add_definition(ZENOH_COMPILER_CLANG) +elseif (CMAKE_CXX_COMPILER_ID STREQUAL "GNU" OR CMAKE_C_COMPILER_ID STREQUAL "GNU") + add_definition(ZENOH_COMPILER_GCC) +elseif (CMAKE_CXX_COMPILER_ID STREQUAL "Intel" OR CMAKE_C_COMPILER_ID STREQUAL "Intel") + add_definition(ZENOH_COMPILER_INTEL) +elseif (CMAKE_CXX_COMPILER_ID STREQUAL "MSVC" OR CMAKE_C_COMPILER_ID STREQUAL "MSVC") + add_definition(ZENOH_COMPILER_MSVC) +else() + add_definition(ZENOH_COMPILER_OTHER) +endif() + add_definition(ZENOH_DEBUG=${ZENOH_DEBUG}) if(FRAG_MAX_SIZE) diff --git a/include/zenoh-pico/api/types.h b/include/zenoh-pico/api/types.h index be3726d04..7d776251d 100644 --- a/include/zenoh-pico/api/types.h +++ b/include/zenoh-pico/api/types.h @@ -129,9 +129,12 @@ _OWNED_TYPE_PTR(_z_scouting_config_t, scouting_config) * Represents a Zenoh session. */ typedef struct { - _z_session_t *_val; + _z_session_rc_t _val; } z_session_t; -_OWNED_TYPE_PTR(_z_session_t, session) + +typedef struct { + _z_session_rc_t _value; +} z_owned_session_t; /** * Represents a Zenoh (push) Subscriber entity. diff --git a/include/zenoh-pico/collections/pointer.h b/include/zenoh-pico/collections/pointer.h deleted file mode 100644 index aa8e6c782..000000000 --- a/include/zenoh-pico/collections/pointer.h +++ /dev/null @@ -1,155 +0,0 @@ -// -// Copyright (c) 2022 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// - -#ifndef ZENOH_PICO_COLLECTIONS_POINTER_H -#define ZENOH_PICO_COLLECTIONS_POINTER_H - -#include -#include - -#if ZENOH_C_STANDARD != 99 - -#ifndef __cplusplus -#include -#define _z_atomic(X) _Atomic X -#define _z_atomic_store_explicit atomic_store_explicit -#define _z_atomic_fetch_add_explicit atomic_fetch_add_explicit -#define _z_atomic_fetch_sub_explicit atomic_fetch_sub_explicit -#define _z_memory_order_acquire memory_order_acquire -#define _z_memory_order_release memory_order_release -#define _z_memory_order_relaxed memory_order_relaxed -#else -#include -#define _z_atomic(X) std::atomic -#define _z_atomic_store_explicit std::atomic_store_explicit -#define _z_atomic_fetch_add_explicit std::atomic_fetch_add_explicit -#define _z_atomic_fetch_sub_explicit std::atomic_fetch_sub_explicit -#define _z_memory_order_acquire std::memory_order_acquire -#define _z_memory_order_release std::memory_order_release -#define _z_memory_order_relaxed std::memory_order_relaxed -#endif // __cplusplus - -/*------------------ Internal Array Macros ------------------*/ -#define _Z_POINTER_DEFINE(name, type) \ - typedef struct { \ - type##_t *ptr; \ - _z_atomic(unsigned int) * _cnt; \ - } name##_sptr_t; \ - static inline name##_sptr_t name##_sptr_new(type##_t val) { \ - name##_sptr_t p; \ - p.ptr = (type##_t *)zp_malloc(sizeof(type##_t)); \ - if (p.ptr != NULL) { \ - p._cnt = (_z_atomic(unsigned int) *)zp_malloc(sizeof(_z_atomic(unsigned int) *)); \ - if (p._cnt != NULL) { \ - *p.ptr = val; \ - _z_atomic_store_explicit(p._cnt, 1, _z_memory_order_relaxed); \ - } else { \ - zp_free(p.ptr); \ - } \ - } \ - return p; \ - } \ - static inline name##_sptr_t name##_sptr_clone(name##_sptr_t *p) { \ - name##_sptr_t c; \ - c._cnt = p->_cnt; \ - c.ptr = p->ptr; \ - _z_atomic_fetch_add_explicit(p->_cnt, 1, _z_memory_order_relaxed); \ - return c; \ - } \ - static inline name##_sptr_t *name##_sptr_clone_as_ptr(name##_sptr_t *p) { \ - name##_sptr_t *c = (name##_sptr_t *)zp_malloc(sizeof(name##_sptr_t)); \ - if (c != NULL) { \ - c->_cnt = p->_cnt; \ - c->ptr = p->ptr; \ - _z_atomic_fetch_add_explicit(p->_cnt, 1, _z_memory_order_relaxed); \ - } \ - return c; \ - } \ - static inline _Bool name##_sptr_eq(const name##_sptr_t *left, const name##_sptr_t *right) { \ - return (left->ptr == right->ptr); \ - } \ - static inline _Bool name##_sptr_drop(name##_sptr_t *p) { \ - _Bool dropped = false; \ - if (p->_cnt != NULL) { \ - unsigned int c = _z_atomic_fetch_sub_explicit(p->_cnt, 1, _z_memory_order_release); \ - dropped = c == 1; \ - if (dropped == true) { \ - atomic_thread_fence(_z_memory_order_acquire); \ - if (p->ptr != NULL) { \ - type##_clear(p->ptr); \ - zp_free(p->ptr); \ - zp_free((void *)p->_cnt); \ - } \ - } \ - } \ - return dropped; \ - } -#else -/*------------------ Internal Array Macros ------------------*/ -#define _Z_POINTER_DEFINE(name, type) \ - typedef struct { \ - type##_t *ptr; \ - volatile uint8_t *_cnt; \ - } name##_sptr_t; \ - static inline name##_sptr_t name##_sptr_new(type##_t val) { \ - name##_sptr_t p; \ - p.ptr = (type##_t *)zp_malloc(sizeof(type##_t)); \ - if (p.ptr != NULL) { \ - p._cnt = (uint8_t *)zp_malloc(sizeof(uint8_t)); \ - if (p._cnt != NULL) { \ - *p.ptr = val; \ - *p._cnt = 1; \ - } else { \ - zp_free(p.ptr); \ - } \ - } \ - return p; \ - } \ - static inline name##_sptr_t name##_sptr_clone(name##_sptr_t *p) { \ - name##_sptr_t c; \ - c._cnt = p->_cnt; \ - c.ptr = p->ptr; \ - *p->_cnt = *p->_cnt + (uint8_t)1; \ - return c; \ - } \ - static inline name##_sptr_t *name##_sptr_clone_as_ptr(name##_sptr_t *p) { \ - name##_sptr_t *c = (name##_sptr_t *)zp_malloc(sizeof(name##_sptr_t)); \ - if (c != NULL) { \ - c->_cnt = p->_cnt; \ - c->ptr = p->ptr; \ - *p->_cnt = *p->_cnt + (uint8_t)1; \ - } \ - return c; \ - } \ - static inline _Bool name##_sptr_eq(const name##_sptr_t *left, const name##_sptr_t *right) { \ - return (left->ptr == right->ptr); \ - } \ - static inline _Bool name##_sptr_drop(name##_sptr_t *p) { \ - _Bool dropped = true; \ - if (p->_cnt != NULL) { \ - *p->_cnt = *p->_cnt - 1; \ - dropped = *p->_cnt == 0; \ - if (dropped == true) { \ - if (p->ptr != NULL) { \ - type##_clear(p->ptr); \ - zp_free(p->ptr); \ - zp_free((void *)p->_cnt); \ - } \ - } \ - } \ - return dropped; \ - } -#endif // ZENOH_C_STANDARD != 99 - -#endif /* ZENOH_PICO_COLLECTIONS_POINTER_H */ diff --git a/include/zenoh-pico/collections/refcount.h b/include/zenoh-pico/collections/refcount.h new file mode 100644 index 000000000..c348d16b7 --- /dev/null +++ b/include/zenoh-pico/collections/refcount.h @@ -0,0 +1,143 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#ifndef ZENOH_PICO_COLLECTIONS_REFCOUNT_H +#define ZENOH_PICO_COLLECTIONS_REFCOUNT_H + +#include +#include + +#if Z_FEATURE_MULTI_THREAD == 1 +#if ZENOH_C_STANDARD != 99 + +#ifndef __cplusplus +#include +#define _z_atomic(X) _Atomic X +#define _z_atomic_store_explicit atomic_store_explicit +#define _z_atomic_fetch_add_explicit atomic_fetch_add_explicit +#define _z_atomic_fetch_sub_explicit atomic_fetch_sub_explicit +#define _z_memory_order_acquire memory_order_acquire +#define _z_memory_order_release memory_order_release +#define _z_memory_order_relaxed memory_order_relaxed +#else +#include +#define _z_atomic(X) std::atomic +#define _z_atomic_store_explicit std::atomic_store_explicit +#define _z_atomic_fetch_add_explicit std::atomic_fetch_add_explicit +#define _z_atomic_fetch_sub_explicit std::atomic_fetch_sub_explicit +#define _z_memory_order_acquire std::memory_order_acquire +#define _z_memory_order_release std::memory_order_release +#define _z_memory_order_relaxed std::memory_order_relaxed +#endif // __cplusplus + +// c11 atomic variant +#define _ZP_RC_CNT_TYPE _z_atomic(unsigned int) +#define _ZP_RC_OP_INIT_CNT _z_atomic_store_explicit(&p.in->_cnt, 1, _z_memory_order_relaxed); +#define _ZP_RC_OP_INCR_CNT _z_atomic_fetch_add_explicit(&p->in->_cnt, 1, _z_memory_order_relaxed); +#define _ZP_RC_OP_DECR_AND_CMP _z_atomic_fetch_sub_explicit(&p->in->_cnt, 1, _z_memory_order_release) > 1 +#define _ZP_RC_OP_SYNC atomic_thread_fence(_z_memory_order_acquire); + +#else // ZENOH_C_STANDARD == 99 +#ifdef ZENOH_COMPILER_GCC + +// c99 gcc sync builtin variant +#define _ZP_RC_CNT_TYPE unsigned int +#define _ZP_RC_OP_INIT_CNT \ + __sync_fetch_and_and(&p.in->_cnt, 0); \ + __sync_fetch_and_add(&p.in->_cnt, 1); +#define _ZP_RC_OP_INCR_CNT __sync_fetch_and_add(&p->in->_cnt, 1); +#define _ZP_RC_OP_DECR_AND_CMP __sync_fetch_and_sub(&p->in->_cnt, 1) > 1 +#define _ZP_RC_OP_SYNC __sync_synchronize(); + +#else // !ZENOH_COMPILER_GCC + +// None variant +#error "Multi-thread refcount in C99 only exists for GCC, use GCC or C11 or deactivate multi-thread" +#define _ZP_RC_CNT_TYPE unsigned int +#define _ZP_RC_OP_INIT_CNT +#define _ZP_RC_OP_INCR_CNT +#define _ZP_RC_OP_DECR_AND_CMP +#define _ZP_RC_OP_SYNC + +#endif // ZENOH_COMPILER_GCC +#endif // ZENOH_C_STANDARD != 99 +#else // Z_FEATURE_MULTI_THREAD == 0 + +// Single thread variant +#define _ZP_RC_CNT_TYPE unsigned int +#define _ZP_RC_OP_INIT_CNT p.in->_cnt = 1; +#define _ZP_RC_OP_INCR_CNT p->in->_cnt += 1; +#define _ZP_RC_OP_DECR_AND_CMP p->in->_cnt-- > 1 +#define _ZP_RC_OP_SYNC + +#endif // Z_FEATURE_MULTI_THREAD == 1 + +/*------------------ Internal Array Macros ------------------*/ +#define _Z_REFCOUNT_DEFINE(name, type) \ + typedef struct name##_inner_rc_t { \ + type##_t val; \ + _ZP_RC_CNT_TYPE _cnt; \ + } name##_inner_rc_t; \ + typedef struct name##_rc_t { \ + name##_inner_rc_t *in; \ + } name##_rc_t; \ + static inline name##_rc_t name##_rc_new(void) { \ + name##_rc_t p; \ + p.in = (name##_inner_rc_t *)zp_malloc(sizeof(name##_inner_rc_t)); \ + if (p.in != NULL) { \ + memset(&p.in->val, 0, sizeof(type##_t)); \ + _ZP_RC_OP_INIT_CNT \ + } \ + return p; \ + } \ + static inline name##_rc_t name##_rc_new_from_val(type##_t val) { \ + name##_rc_t p; \ + p.in = (name##_inner_rc_t *)zp_malloc(sizeof(name##_inner_rc_t)); \ + if (p.in != NULL) { \ + p.in->val = val; \ + _ZP_RC_OP_INIT_CNT \ + } \ + return p; \ + } \ + static inline name##_rc_t name##_rc_clone(name##_rc_t *p) { \ + name##_rc_t c; \ + c.in = p->in; \ + _ZP_RC_OP_INCR_CNT \ + return c; \ + } \ + static inline name##_rc_t *name##_rc_clone_as_ptr(name##_rc_t *p) { \ + name##_rc_t *c = (name##_rc_t *)zp_malloc(sizeof(name##_rc_t)); \ + if (c != NULL) { \ + c->in = p->in; \ + _ZP_RC_OP_INCR_CNT \ + } \ + return c; \ + } \ + static inline _Bool name##_rc_eq(const name##_rc_t *left, const name##_rc_t *right) { \ + return (left->in == right->in); \ + } \ + static inline _Bool name##_rc_drop(name##_rc_t *p) { \ + if ((p == NULL) || (p->in == NULL)) { \ + return false; \ + } \ + if (_ZP_RC_OP_DECR_AND_CMP) { \ + return false; \ + } \ + _ZP_RC_OP_SYNC \ + type##_clear(&p->in->val); \ + zp_free(p->in); \ + return true; \ + } + +#endif /* ZENOH_PICO_COLLECTIONS_REFCOUNT_H */ diff --git a/include/zenoh-pico/net/primitives.h b/include/zenoh-pico/net/primitives.h index c1cf2a0da..a4b76c5b1 100644 --- a/include/zenoh-pico/net/primitives.h +++ b/include/zenoh-pico/net/primitives.h @@ -86,8 +86,8 @@ int8_t _z_undeclare_resource(_z_session_t *zn, uint16_t rid); * Returns: * The created :c:type:`_z_publisher_t` or null if the declaration failed. */ -_z_publisher_t *_z_declare_publisher(_z_session_t *zn, _z_keyexpr_t keyexpr, z_congestion_control_t congestion_control, - z_priority_t priority); +_z_publisher_t *_z_declare_publisher(_z_session_rc_t *zn, _z_keyexpr_t keyexpr, + z_congestion_control_t congestion_control, z_priority_t priority); /** * Undeclare a :c:type:`_z_publisher_t`. @@ -143,7 +143,7 @@ int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *pay * Returns: * The created :c:type:`_z_subscriber_t` or null if the declaration failed. */ -_z_subscriber_t *_z_declare_subscriber(_z_session_t *zn, _z_keyexpr_t keyexpr, _z_subinfo_t sub_info, +_z_subscriber_t *_z_declare_subscriber(_z_session_rc_t *zn, _z_keyexpr_t keyexpr, _z_subinfo_t sub_info, _z_data_handler_t callback, _z_drop_handler_t dropper, void *arg); /** @@ -184,7 +184,7 @@ int8_t _z_subscriber_pull(const _z_subscriber_t *sub); * Returns: * The created :c:type:`_z_queryable_t` or null if the declaration failed. */ -_z_queryable_t *_z_declare_queryable(_z_session_t *zn, _z_keyexpr_t keyexpr, _Bool complete, +_z_queryable_t *_z_declare_queryable(_z_session_rc_t *zn, _z_keyexpr_t keyexpr, _Bool complete, _z_questionable_handler_t callback, _z_drop_handler_t dropper, void *arg); /** diff --git a/include/zenoh-pico/net/publish.h b/include/zenoh-pico/net/publish.h index 15933e035..75ebad8d1 100644 --- a/include/zenoh-pico/net/publish.h +++ b/include/zenoh-pico/net/publish.h @@ -24,7 +24,7 @@ typedef struct { _z_keyexpr_t _key; _z_zint_t _id; - _z_session_t *_zn; + _z_session_rc_t _zn; z_congestion_control_t _congestion_control; z_priority_t _priority; } _z_publisher_t; diff --git a/include/zenoh-pico/net/query.h b/include/zenoh-pico/net/query.h index 0854907b7..8c6a2d701 100644 --- a/include/zenoh-pico/net/query.h +++ b/include/zenoh-pico/net/query.h @@ -17,16 +17,17 @@ #include #include "zenoh-pico/api/constants.h" +#include "zenoh-pico/net/session.h" #include "zenoh-pico/protocol/core.h" /** * The query to be answered by a queryable. */ -typedef struct { +typedef struct z_query_t { _z_value_t _value; _z_keyexpr_t _key; uint32_t _request_id; - void *_zn; // FIXME: _z_session_t *zn; + _z_session_t *_zn; char *_parameters; _Bool _anyke; } z_query_t; @@ -36,7 +37,7 @@ typedef struct { */ typedef struct { uint32_t _entity_id; - void *_zn; // FIXME: _z_session_t *zn; + _z_session_rc_t _zn; } _z_queryable_t; #if Z_FEATURE_QUERYABLE == 1 diff --git a/include/zenoh-pico/net/session.h b/include/zenoh-pico/net/session.h index 111920337..117c49c3f 100644 --- a/include/zenoh-pico/net/session.h +++ b/include/zenoh-pico/net/session.h @@ -27,7 +27,7 @@ /** * A zenoh-net session. */ -typedef struct { +typedef struct _z_session_t { #if Z_FEATURE_MULTI_THREAD == 1 zp_mutex_t _mutex_inner; #endif // Z_FEATURE_MULTI_THREAD == 1 @@ -51,19 +51,23 @@ typedef struct { // Session subscriptions #if Z_FEATURE_SUBSCRIPTION == 1 - _z_subscription_sptr_list_t *_local_subscriptions; - _z_subscription_sptr_list_t *_remote_subscriptions; + _z_subscription_rc_list_t *_local_subscriptions; + _z_subscription_rc_list_t *_remote_subscriptions; #endif // Session queryables #if Z_FEATURE_QUERYABLE == 1 - _z_questionable_sptr_list_t *_local_questionable; + _z_questionable_rc_list_t *_local_questionable; #endif #if Z_FEATURE_QUERY == 1 _z_pending_query_list_t *_pending_queries; #endif } _z_session_t; +extern void _z_session_clear(_z_session_t *zn); // Forward type declaration to avoid cyclical include + +_Z_REFCOUNT_DEFINE(_z_session, _z_session) + /** * Open a zenoh-net session * diff --git a/include/zenoh-pico/net/subscribe.h b/include/zenoh-pico/net/subscribe.h index d6c3ba67a..e2aafe540 100644 --- a/include/zenoh-pico/net/subscribe.h +++ b/include/zenoh-pico/net/subscribe.h @@ -25,7 +25,7 @@ */ typedef struct { uint32_t _entity_id; - _z_session_t *_zn; + _z_session_rc_t _zn; } _z_subscriber_t; typedef _z_subscriber_t _z_pull_subscriber_t; diff --git a/include/zenoh-pico/protocol/core.h b/include/zenoh-pico/protocol/core.h index a60a12dc6..bb7285854 100644 --- a/include/zenoh-pico/protocol/core.h +++ b/include/zenoh-pico/protocol/core.h @@ -287,7 +287,7 @@ typedef struct { /** * Informations to be passed to :c:func:`_z_declare_subscriber` to configure the created - * :c:type:`_z_subscription_sptr_t`. + * :c:type:`_z_subscription_rc_t`. * * Members: * _z_period_t *period: The subscription period. diff --git a/include/zenoh-pico/session/queryable.h b/include/zenoh-pico/session/queryable.h index 95e8cb08a..d756278b3 100644 --- a/include/zenoh-pico/session/queryable.h +++ b/include/zenoh-pico/session/queryable.h @@ -24,12 +24,12 @@ #define _Z_QUERYABLE_DISTANCE_DEFAULT 0 /*------------------ Queryable ------------------*/ -_z_questionable_sptr_t *_z_get_questionable_by_id(_z_session_t *zn, const _z_zint_t id); -_z_questionable_sptr_list_t *_z_get_questionable_by_key(_z_session_t *zn, const _z_keyexpr_t key); +_z_questionable_rc_t *_z_get_questionable_by_id(_z_session_t *zn, const _z_zint_t id); +_z_questionable_rc_list_t *_z_get_questionable_by_key(_z_session_t *zn, const _z_keyexpr_t key); -_z_questionable_sptr_t *_z_register_questionable(_z_session_t *zn, _z_questionable_t *q); +_z_questionable_rc_t *_z_register_questionable(_z_session_t *zn, _z_questionable_t *q); int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *query, const _z_keyexpr_t q_key, uint32_t qid); -void _z_unregister_questionable(_z_session_t *zn, _z_questionable_sptr_t *q); +void _z_unregister_questionable(_z_session_t *zn, _z_questionable_rc_t *q); void _z_flush_questionables(_z_session_t *zn); #endif diff --git a/include/zenoh-pico/session/session.h b/include/zenoh-pico/session/session.h index 8c11826a6..7a2272253 100644 --- a/include/zenoh-pico/session/session.h +++ b/include/zenoh-pico/session/session.h @@ -20,10 +20,9 @@ #include "zenoh-pico/collections/element.h" #include "zenoh-pico/collections/list.h" -#include "zenoh-pico/collections/pointer.h" +#include "zenoh-pico/collections/refcount.h" #include "zenoh-pico/collections/string.h" #include "zenoh-pico/config.h" -#include "zenoh-pico/net/query.h" #include "zenoh-pico/protocol/core.h" #include "zenoh-pico/transport/manager.h" @@ -101,16 +100,17 @@ typedef struct { _Bool _z_subscription_eq(const _z_subscription_t *one, const _z_subscription_t *two); void _z_subscription_clear(_z_subscription_t *sub); -_Z_POINTER_DEFINE(_z_subscription, _z_subscription) +_Z_REFCOUNT_DEFINE(_z_subscription, _z_subscription) _Z_ELEM_DEFINE(_z_subscriber, _z_subscription_t, _z_noop_size, _z_subscription_clear, _z_noop_copy) -_Z_ELEM_DEFINE(_z_subscription_sptr, _z_subscription_sptr_t, _z_noop_size, _z_subscription_sptr_drop, _z_noop_copy) -_Z_LIST_DEFINE(_z_subscription_sptr, _z_subscription_sptr_t) +_Z_ELEM_DEFINE(_z_subscription_rc, _z_subscription_rc_t, _z_noop_size, _z_subscription_rc_drop, _z_noop_copy) +_Z_LIST_DEFINE(_z_subscription_rc, _z_subscription_rc_t) typedef struct { _z_keyexpr_t _key; uint32_t _id; } _z_publication_t; +typedef struct z_query_t z_query_t; // Forward type declaration to avoid cyclical include /** * The callback signature of the functions handling query messages. */ @@ -128,10 +128,10 @@ typedef struct { _Bool _z_questionable_eq(const _z_questionable_t *one, const _z_questionable_t *two); void _z_questionable_clear(_z_questionable_t *res); -_Z_POINTER_DEFINE(_z_questionable, _z_questionable) +_Z_REFCOUNT_DEFINE(_z_questionable, _z_questionable) _Z_ELEM_DEFINE(_z_questionable, _z_questionable_t, _z_noop_size, _z_questionable_clear, _z_noop_copy) -_Z_ELEM_DEFINE(_z_questionable_sptr, _z_questionable_sptr_t, _z_noop_size, _z_questionable_sptr_drop, _z_noop_copy) -_Z_LIST_DEFINE(_z_questionable_sptr, _z_questionable_sptr_t) +_Z_ELEM_DEFINE(_z_questionable_rc, _z_questionable_rc_t, _z_noop_size, _z_questionable_rc_drop, _z_noop_copy) +_Z_LIST_DEFINE(_z_questionable_rc, _z_questionable_rc_t) typedef struct { _z_reply_t _reply; diff --git a/include/zenoh-pico/session/subscription.h b/include/zenoh-pico/session/subscription.h index b6fcb2caa..f13ffe05f 100644 --- a/include/zenoh-pico/session/subscription.h +++ b/include/zenoh-pico/session/subscription.h @@ -19,11 +19,10 @@ #if Z_FEATURE_SUBSCRIPTION == 1 /*------------------ Subscription ------------------*/ -_z_subscription_sptr_t *_z_get_subscription_by_id(_z_session_t *zn, uint8_t is_local, const _z_zint_t id); -_z_subscription_sptr_list_t *_z_get_subscriptions_by_key(_z_session_t *zn, uint8_t is_local, - const _z_keyexpr_t *keyexpr); +_z_subscription_rc_t *_z_get_subscription_by_id(_z_session_t *zn, uint8_t is_local, const _z_zint_t id); +_z_subscription_rc_list_t *_z_get_subscriptions_by_key(_z_session_t *zn, uint8_t is_local, const _z_keyexpr_t *keyexpr); -_z_subscription_sptr_t *_z_register_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_t *sub); +_z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_t *sub); void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *payload, _z_zint_t payload_len #if Z_FEATURE_ATTACHMENT == 1 @@ -38,7 +37,7 @@ int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, co z_attachment_t att #endif ); -void _z_unregister_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_sptr_t *sub); +void _z_unregister_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_rc_t *sub); void _z_flush_subscriptions(_z_session_t *zn); /*------------------ Pull ------------------*/ diff --git a/include/zenoh-pico/session/utils.h b/include/zenoh-pico/session/utils.h index 39cae73d3..39491ba4d 100644 --- a/include/zenoh-pico/session/utils.h +++ b/include/zenoh-pico/session/utils.h @@ -27,9 +27,8 @@ _z_hello_list_t *_z_scout_inner(const z_what_t what, _z_id_t id, const char *loc const _Bool exit_on_first); int8_t _z_session_init(_z_session_t *zn, _z_id_t *zid); -int8_t _z_session_close(_z_session_t *zn, uint8_t reason); void _z_session_clear(_z_session_t *zn); -void _z_session_free(_z_session_t **zn); +int8_t _z_session_close(_z_session_t *zn, uint8_t reason); int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *z_msg, uint16_t local_peer_id); int8_t _z_send_n_msg(_z_session_t *zn, _z_network_message_t *n_msg, z_reliability_t reliability, diff --git a/include/zenoh-pico/transport/transport.h b/include/zenoh-pico/transport/transport.h index 7b8d44836..e708b7e82 100644 --- a/include/zenoh-pico/transport/transport.h +++ b/include/zenoh-pico/transport/transport.h @@ -53,6 +53,9 @@ _Z_LIST_DEFINE(_z_transport_peer_entry, _z_transport_peer_entry_t) _z_transport_peer_entry_list_t *_z_transport_peer_entry_list_insert(_z_transport_peer_entry_list_t *root, _z_transport_peer_entry_t *entry); +// Forward type declaration to avoid cyclical include +typedef struct _z_session_t _z_session_t; + // Forward declaration to be used in _zp_f_send_tmsg* typedef struct _z_transport_multicast_t _z_transport_multicast_t; // Send function prototype @@ -60,6 +63,7 @@ typedef int8_t (*_zp_f_send_tmsg)(_z_transport_multicast_t *self, const _z_trans typedef struct { // Session associated to the transport + _z_session_t *_session; #if Z_FEATURE_MULTI_THREAD == 1 // TX and RX mutexes @@ -85,8 +89,6 @@ typedef struct { _z_zint_t _sn_rx_best_effort; volatile _z_zint_t _lease; - void *_session; - #if Z_FEATURE_MULTI_THREAD == 1 zp_task_t *_read_task; zp_task_t *_lease_task; @@ -100,7 +102,7 @@ typedef struct { typedef struct _z_transport_multicast_t { // Session associated to the transport - void *_session; + _z_session_t *_session; #if Z_FEATURE_MULTI_THREAD == 1 // TX and RX mutexes diff --git a/src/api/api.c b/src/api/api.c index 432fd0855..9c3187894 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -89,7 +89,7 @@ _Bool zp_keyexpr_was_declared(const z_keyexpr_t *keyexpr) { z_owned_str_t zp_keyexpr_resolve(z_session_t zs, z_keyexpr_t keyexpr) { z_owned_str_t ret = {._value = NULL}; - _z_keyexpr_t ekey = _z_get_expanded_key_from_key(zs._val, &keyexpr); + _z_keyexpr_t ekey = _z_get_expanded_key_from_key(&zs._val.in->val, &keyexpr); ret._value = (char *)ekey._suffix; // ekey will be out of scope so // - suffix can be safely casted as non-const // - suffix does not need to be copied @@ -417,14 +417,21 @@ OWNED_FUNCTIONS_PTR_COMMON(z_scouting_config_t, z_owned_scouting_config_t, scout OWNED_FUNCTIONS_PTR_CLONE(z_scouting_config_t, z_owned_scouting_config_t, scouting_config, _z_owner_noop_copy) OWNED_FUNCTIONS_PTR_DROP(z_scouting_config_t, z_owned_scouting_config_t, scouting_config, _z_scouting_config_free) -OWNED_FUNCTIONS_PTR_COMMON(z_session_t, z_owned_session_t, session) -OWNED_FUNCTIONS_PTR_CLONE(z_session_t, z_owned_session_t, session, _z_owner_noop_copy) -void z_session_drop(z_owned_session_t *val) { z_close(val); } - OWNED_FUNCTIONS_PTR_INTERNAL(z_keyexpr_t, z_owned_keyexpr_t, keyexpr, _z_keyexpr_free, _z_keyexpr_copy) OWNED_FUNCTIONS_PTR_INTERNAL(z_hello_t, z_owned_hello_t, hello, _z_hello_free, _z_owner_noop_copy) OWNED_FUNCTIONS_PTR_INTERNAL(z_str_array_t, z_owned_str_array_t, str_array, _z_str_array_free, _z_owner_noop_copy) +_Bool z_session_check(const z_owned_session_t *val) { return val->_value.in != NULL; } +z_session_t z_session_loan(const z_owned_session_t *val) { return (z_session_t){._val = val->_value}; } +z_owned_session_t z_session_null(void) { return (z_owned_session_t){._value = {.in = NULL}}; } +z_owned_session_t *z_session_move(z_owned_session_t *val) { return val; } +z_owned_session_t z_session_clone(z_owned_session_t *val) { + z_owned_session_t ret; + ret._value = _z_session_rc_clone(&val->_value); + return ret; +} +void z_session_drop(z_owned_session_t *val) { z_close(val); } + #define OWNED_FUNCTIONS_CLOSURE(ownedtype, name) \ _Bool z_##name##_check(const ownedtype *val) { return val->call != NULL; } \ ownedtype *z_##name##_move(ownedtype *val) { return val; } \ @@ -528,36 +535,42 @@ int8_t z_scout(z_owned_scouting_config_t *config, z_owned_closure_hello_t *callb } z_owned_session_t z_open(z_owned_config_t *config) { - z_owned_session_t zs = {._value = (_z_session_t *)zp_malloc(sizeof(_z_session_t))}; - memset(zs._value, 0, sizeof(_z_session_t)); - - if (zs._value != NULL) { - if (_z_open(zs._value, config->_value) != _Z_RES_OK) { - zp_free(zs._value); - zs._value = NULL; - } + z_owned_session_t zs = {._value = {.in = NULL}}; + // Create rc + _z_session_rc_t zsrc = _z_session_rc_new(); + if (zsrc.in == NULL) { z_config_drop(config); + return zs; } - + // Open session + if (_z_open(&zsrc.in->val, config->_value) != _Z_RES_OK) { + _z_session_rc_drop(&zsrc); + z_config_drop(config); + return zs; + } + // Store rc in session + zs._value = zsrc; + z_config_drop(config); return zs; } int8_t z_close(z_owned_session_t *zs) { - int8_t ret = _Z_RES_OK; - - _z_close(zs->_value); - _z_session_free(&zs->_value); - - return ret; + if ((zs == NULL) || (zs->_value.in == NULL)) { + return _Z_RES_OK; + } + _z_close(&zs->_value.in->val); + _z_session_rc_drop(&zs->_value); + zs->_value.in = NULL; + return _Z_RES_OK; } int8_t z_info_peers_zid(const z_session_t zs, z_owned_closure_zid_t *callback) { // Call transport function - switch (zs._val->_tp._type) { + switch (zs._val.in->val._tp._type) { case _Z_TRANSPORT_MULTICAST_TYPE: case _Z_TRANSPORT_RAWETH_TYPE: - _zp_multicast_fetch_zid(&zs._val->_tp, callback); + _zp_multicast_fetch_zid(&zs._val.in->val._tp, callback); break; default: break; @@ -574,9 +587,9 @@ int8_t z_info_peers_zid(const z_session_t zs, z_owned_closure_zid_t *callback) { int8_t z_info_routers_zid(const z_session_t zs, z_owned_closure_zid_t *callback) { // Call transport function - switch (zs._val->_tp._type) { + switch (zs._val.in->val._tp._type) { case _Z_TRANSPORT_UNICAST_TYPE: - _zp_unicast_fetch_zid(&zs._val->_tp, callback); + _zp_unicast_fetch_zid(&zs._val.in->val._tp, callback); break; default: break; @@ -591,7 +604,7 @@ int8_t z_info_routers_zid(const z_session_t zs, z_owned_closure_zid_t *callback) return 0; } -z_id_t z_info_zid(const z_session_t zs) { return zs._val->_local_zid; } +z_id_t z_info_zid(const z_session_t zs) { return zs._val.in->val._local_zid; } #if Z_FEATURE_PUBLICATION == 1 OWNED_FUNCTIONS_PTR_COMMON(z_publisher_t, z_owned_publisher_t, publisher) @@ -625,7 +638,7 @@ int8_t z_put(z_session_t zs, z_keyexpr_t keyexpr, const uint8_t *payload, z_zint opt.attachment = options->attachment; #endif } - ret = _z_write(zs._val, keyexpr, (const uint8_t *)payload, payload_len, opt.encoding, Z_SAMPLE_KIND_PUT, + ret = _z_write(&zs._val.in->val, keyexpr, (const uint8_t *)payload, payload_len, opt.encoding, Z_SAMPLE_KIND_PUT, opt.congestion_control, opt.priority #if Z_FEATURE_ATTACHMENT == 1 , @@ -634,7 +647,7 @@ int8_t z_put(z_session_t zs, z_keyexpr_t keyexpr, const uint8_t *payload, z_zint ); // Trigger local subscriptions - _z_trigger_local_subscriptions(zs._val, keyexpr, payload, payload_len + _z_trigger_local_subscriptions(&zs._val.in->val, keyexpr, payload, payload_len #if Z_FEATURE_ATTACHMENT == 1 , opt.attachment @@ -652,8 +665,8 @@ int8_t z_delete(z_session_t zs, z_keyexpr_t keyexpr, const z_delete_options_t *o opt.congestion_control = options->congestion_control; opt.priority = options->priority; } - ret = _z_write(zs._val, keyexpr, NULL, 0, z_encoding_default(), Z_SAMPLE_KIND_DELETE, opt.congestion_control, - opt.priority + ret = _z_write(&zs._val.in->val, keyexpr, NULL, 0, z_encoding_default(), Z_SAMPLE_KIND_DELETE, + opt.congestion_control, opt.priority #if Z_FEATURE_ATTACHMENT == 1 , z_attachment_null() @@ -673,10 +686,10 @@ z_owned_publisher_t z_declare_publisher(z_session_t zs, z_keyexpr_t keyexpr, con // TODO: Currently, if resource declarations are done over multicast transports, the current protocol definition // lacks a way to convey them to later-joining nodes. Thus, in the current version automatic // resource declarations are only performed on unicast transports. - if (zs._val->_tp._type == _Z_TRANSPORT_UNICAST_TYPE) { - _z_resource_t *r = _z_get_resource_by_key(zs._val, &keyexpr); + if (zs._val.in->val._tp._type == _Z_TRANSPORT_UNICAST_TYPE) { + _z_resource_t *r = _z_get_resource_by_key(&zs._val.in->val, &keyexpr); if (r == NULL) { - uint16_t id = _z_declare_resource(zs._val, keyexpr); + uint16_t id = _z_declare_resource(&zs._val.in->val, keyexpr); key = _z_rid_with_suffix(id, NULL); } } @@ -687,7 +700,7 @@ z_owned_publisher_t z_declare_publisher(z_session_t zs, z_keyexpr_t keyexpr, con opt.priority = options->priority; } - return (z_owned_publisher_t){._value = _z_declare_publisher(zs._val, key, opt.congestion_control, opt.priority)}; + return (z_owned_publisher_t){._value = _z_declare_publisher(&zs._val, key, opt.congestion_control, opt.priority)}; } int8_t z_undeclare_publisher(z_owned_publisher_t *pub) { @@ -724,7 +737,7 @@ int8_t z_publisher_put(const z_publisher_t pub, const uint8_t *payload, size_t l #endif } - ret = _z_write(pub._val->_zn, pub._val->_key, payload, len, opt.encoding, Z_SAMPLE_KIND_PUT, + ret = _z_write(&pub._val->_zn.in->val, pub._val->_key, payload, len, opt.encoding, Z_SAMPLE_KIND_PUT, pub._val->_congestion_control, pub._val->_priority #if Z_FEATURE_ATTACHMENT == 1 , @@ -733,7 +746,7 @@ int8_t z_publisher_put(const z_publisher_t pub, const uint8_t *payload, size_t l ); // Trigger local subscriptions - _z_trigger_local_subscriptions(pub._val->_zn, pub._val->_key, payload, len + _z_trigger_local_subscriptions(&pub._val->_zn.in->val, pub._val->_key, payload, len #if Z_FEATURE_ATTACHMENT == 1 , opt.attachment @@ -745,7 +758,7 @@ int8_t z_publisher_put(const z_publisher_t pub, const uint8_t *payload, size_t l int8_t z_publisher_delete(const z_publisher_t pub, const z_publisher_delete_options_t *options) { (void)(options); - return _z_write(pub._val->_zn, pub._val->_key, NULL, 0, z_encoding_default(), Z_SAMPLE_KIND_DELETE, + return _z_write(&pub._val->_zn.in->val, pub._val->_key, NULL, 0, z_encoding_default(), Z_SAMPLE_KIND_DELETE, pub._val->_congestion_control, pub._val->_priority #if Z_FEATURE_ATTACHMENT == 1 , @@ -820,9 +833,8 @@ int8_t z_get(z_session_t zs, z_keyexpr_t keyexpr, const char *parameters, z_owne wrapped_ctx->ctx = ctx; } - ret = _z_query(zs._val, keyexpr, parameters, opt.target, opt.consolidation.mode, opt.value, __z_reply_handler, - wrapped_ctx, callback->drop, ctx - + ret = _z_query(&zs._val.in->val, keyexpr, parameters, opt.target, opt.consolidation.mode, opt.value, + __z_reply_handler, wrapped_ctx, callback->drop, ctx #if Z_FEATURE_ATTACHMENT == 1 , z_attachment_null() @@ -866,10 +878,10 @@ z_owned_queryable_t z_declare_queryable(z_session_t zs, z_keyexpr_t keyexpr, z_o // TODO: Currently, if resource declarations are done over multicast transports, the current protocol definition // lacks a way to convey them to later-joining nodes. Thus, in the current version automatic // resource declarations are only performed on unicast transports. - if (zs._val->_tp._type == _Z_TRANSPORT_UNICAST_TYPE) { - _z_resource_t *r = _z_get_resource_by_key(zs._val, &keyexpr); + if (zs._val.in->val._tp._type == _Z_TRANSPORT_UNICAST_TYPE) { + _z_resource_t *r = _z_get_resource_by_key(&zs._val.in->val, &keyexpr); if (r == NULL) { - uint16_t id = _z_declare_resource(zs._val, keyexpr); + uint16_t id = _z_declare_resource(&zs._val.in->val, keyexpr); key = _z_rid_with_suffix(id, NULL); } } @@ -880,7 +892,7 @@ z_owned_queryable_t z_declare_queryable(z_session_t zs, z_keyexpr_t keyexpr, z_o } return (z_owned_queryable_t){ - ._value = _z_declare_queryable(zs._val, key, opt.complete, callback->call, callback->drop, ctx)}; + ._value = _z_declare_queryable(&zs._val, key, opt.complete, callback->call, callback->drop, ctx)}; } int8_t z_undeclare_queryable(z_owned_queryable_t *queryable) { @@ -926,7 +938,7 @@ z_owned_keyexpr_t z_declare_keyexpr(z_session_t zs, z_keyexpr_t keyexpr) { key._value = (z_keyexpr_t *)zp_malloc(sizeof(z_keyexpr_t)); if (key._value != NULL) { - uint16_t id = _z_declare_resource(zs._val, keyexpr); + uint16_t id = _z_declare_resource(&zs._val.in->val, keyexpr); *key._value = _z_rid_with_suffix(id, NULL); } @@ -936,7 +948,7 @@ z_owned_keyexpr_t z_declare_keyexpr(z_session_t zs, z_keyexpr_t keyexpr) { int8_t z_undeclare_keyexpr(z_session_t zs, z_owned_keyexpr_t *keyexpr) { int8_t ret = _Z_RES_OK; - ret = _z_undeclare_resource(zs._val, keyexpr->_value->_id); + ret = _z_undeclare_resource(&zs._val.in->val, keyexpr->_value->_id); z_keyexpr_drop(keyexpr); return ret; @@ -969,8 +981,8 @@ z_owned_subscriber_t z_declare_subscriber(z_session_t zs, z_keyexpr_t keyexpr, z // TODO: Currently, if resource declarations are done over multicast transports, the current protocol definition // lacks a way to convey them to later-joining nodes. Thus, in the current version automatic // resource declarations are only performed on unicast transports. - if (zs._val->_tp._type == _Z_TRANSPORT_UNICAST_TYPE) { - _z_resource_t *r = _z_get_resource_by_key(zs._val, &keyexpr); + if (zs._val.in->val._tp._type == _Z_TRANSPORT_UNICAST_TYPE) { + _z_resource_t *r = _z_get_resource_by_key(&zs._val.in->val, &keyexpr); if (r == NULL) { char *wild = strpbrk(keyexpr._suffix, "*$"); _Bool do_keydecl = true; @@ -988,7 +1000,7 @@ z_owned_subscriber_t z_declare_subscriber(z_session_t zs, z_keyexpr_t keyexpr, z } } if (do_keydecl) { - uint16_t id = _z_declare_resource(zs._val, keyexpr); + uint16_t id = _z_declare_resource(&zs._val.in->val, keyexpr); key = _z_rid_with_suffix(id, wild); } } @@ -998,7 +1010,7 @@ z_owned_subscriber_t z_declare_subscriber(z_session_t zs, z_keyexpr_t keyexpr, z if (options != NULL) { subinfo.reliability = options->reliability; } - _z_subscriber_t *sub = _z_declare_subscriber(zs._val, key, subinfo, callback->call, callback->drop, ctx); + _z_subscriber_t *sub = _z_declare_subscriber(&zs._val, key, subinfo, callback->call, callback->drop, ctx); if (suffix != NULL) { zp_free(suffix); } @@ -1015,9 +1027,9 @@ z_owned_pull_subscriber_t z_declare_pull_subscriber(z_session_t zs, z_keyexpr_t callback->context = NULL; z_keyexpr_t key = keyexpr; - _z_resource_t *r = _z_get_resource_by_key(zs._val, &keyexpr); + _z_resource_t *r = _z_get_resource_by_key(&zs._val.in->val, &keyexpr); if (r == NULL) { - uint16_t id = _z_declare_resource(zs._val, keyexpr); + uint16_t id = _z_declare_resource(&zs._val.in->val, keyexpr); key = _z_rid_with_suffix(id, NULL); } @@ -1027,7 +1039,7 @@ z_owned_pull_subscriber_t z_declare_pull_subscriber(z_session_t zs, z_keyexpr_t } return (z_owned_pull_subscriber_t){ - ._value = _z_declare_subscriber(zs._val, key, subinfo, callback->call, callback->drop, ctx)}; + ._value = _z_declare_subscriber(&zs._val, key, subinfo, callback->call, callback->drop, ctx)}; } int8_t z_undeclare_subscriber(z_owned_subscriber_t *sub) { @@ -1054,11 +1066,11 @@ z_owned_keyexpr_t z_subscriber_keyexpr(z_subscriber_t sub) { z_owned_keyexpr_t ret = z_keyexpr_null(); uint32_t lookup = sub._val->_entity_id; if (sub._val != NULL) { - _z_subscription_sptr_list_t *tail = sub._val->_zn->_local_subscriptions; + _z_subscription_rc_list_t *tail = sub._val->_zn.in->val._local_subscriptions; while (tail != NULL && !z_keyexpr_check(&ret)) { - _z_subscription_sptr_t *head = _z_subscription_sptr_list_head(tail); - if (head->ptr->_id == lookup) { - _z_keyexpr_t key = _z_keyexpr_duplicate(head->ptr->_key); + _z_subscription_rc_t *head = _z_subscription_rc_list_head(tail); + if (head->in->val._id == lookup) { + _z_keyexpr_t key = _z_keyexpr_duplicate(head->in->val._key); ret = (z_owned_keyexpr_t){._value = zp_malloc(sizeof(_z_keyexpr_t))}; if (ret._value != NULL) { *ret._value = key; @@ -1066,7 +1078,7 @@ z_owned_keyexpr_t z_subscriber_keyexpr(z_subscriber_t sub) { _z_keyexpr_clear(&key); } } - tail = _z_subscription_sptr_list_tail(tail); + tail = _z_subscription_rc_list_tail(tail); } } return ret; @@ -1091,7 +1103,7 @@ int8_t zp_start_read_task(z_session_t zs, const zp_task_read_options_t *options) if (options != NULL) { opt.task_attributes = options->task_attributes; } - return _zp_start_read_task(zs._val, opt.task_attributes); + return _zp_start_read_task(&zs._val.in->val, opt.task_attributes); #else (void)(zs); return -1; @@ -1100,7 +1112,7 @@ int8_t zp_start_read_task(z_session_t zs, const zp_task_read_options_t *options) int8_t zp_stop_read_task(z_session_t zs) { #if Z_FEATURE_MULTI_THREAD == 1 - return _zp_stop_read_task(zs._val); + return _zp_stop_read_task(&zs._val.in->val); #else (void)(zs); return -1; @@ -1124,7 +1136,7 @@ int8_t zp_start_lease_task(z_session_t zs, const zp_task_lease_options_t *option if (options != NULL) { opt.task_attributes = options->task_attributes; } - return _zp_start_lease_task(zs._val, opt.task_attributes); + return _zp_start_lease_task(&zs._val.in->val, opt.task_attributes); #else (void)(zs); return -1; @@ -1133,7 +1145,7 @@ int8_t zp_start_lease_task(z_session_t zs, const zp_task_lease_options_t *option int8_t zp_stop_lease_task(z_session_t zs) { #if Z_FEATURE_MULTI_THREAD == 1 - return _zp_stop_lease_task(zs._val); + return _zp_stop_lease_task(&zs._val.in->val); #else (void)(zs); return -1; @@ -1144,7 +1156,7 @@ zp_read_options_t zp_read_options_default(void) { return (zp_read_options_t){.__ int8_t zp_read(z_session_t zs, const zp_read_options_t *options) { (void)(options); - return _zp_read(zs._val); + return _zp_read(&zs._val.in->val); } zp_send_keep_alive_options_t zp_send_keep_alive_options_default(void) { @@ -1153,14 +1165,14 @@ zp_send_keep_alive_options_t zp_send_keep_alive_options_default(void) { int8_t zp_send_keep_alive(z_session_t zs, const zp_send_keep_alive_options_t *options) { (void)(options); - return _zp_send_keep_alive(zs._val); + return _zp_send_keep_alive(&zs._val.in->val); } zp_send_join_options_t zp_send_join_options_default(void) { return (zp_send_join_options_t){.__dummy = 0}; } int8_t zp_send_join(z_session_t zs, const zp_send_join_options_t *options) { (void)(options); - return _zp_send_join(zs._val); + return _zp_send_join(&zs._val.in->val); } #if Z_FEATURE_ATTACHMENT == 1 void _z_bytes_pair_clear(struct _z_bytes_pair_t *this_) { diff --git a/src/link/link.c b/src/link/link.c index 3ccc27697..674da4bf1 100644 --- a/src/link/link.c +++ b/src/link/link.c @@ -115,8 +115,12 @@ int8_t _z_listen_link(_z_link_t *zl, const char *locator) { } void _z_link_clear(_z_link_t *l) { - l->_close_f(l); - l->_free_f(l); + if (l->_close_f != NULL) { + l->_close_f(l); + } + if (l->_free_f != NULL) { + l->_free_f(l); + } _z_endpoint_clear(&l->_endpoint); } diff --git a/src/net/primitives.c b/src/net/primitives.c index 13d51e0a5..c8ecc6560 100644 --- a/src/net/primitives.c +++ b/src/net/primitives.c @@ -98,31 +98,30 @@ int8_t _z_undeclare_resource(_z_session_t *zn, uint16_t rid) { #if Z_FEATURE_PUBLICATION == 1 /*------------------ Publisher Declaration ------------------*/ -_z_publisher_t *_z_declare_publisher(_z_session_t *zn, _z_keyexpr_t keyexpr, z_congestion_control_t congestion_control, - z_priority_t priority) { +_z_publisher_t *_z_declare_publisher(_z_session_rc_t *zn, _z_keyexpr_t keyexpr, + z_congestion_control_t congestion_control, z_priority_t priority) { + // Allocate publisher _z_publisher_t *ret = (_z_publisher_t *)zp_malloc(sizeof(_z_publisher_t)); - if (ret != NULL) { - ret->_zn = zn; - ret->_key = _z_keyexpr_duplicate(keyexpr); - ret->_id = _z_get_entity_id(zn); - ret->_congestion_control = congestion_control; - ret->_priority = priority; + if (ret == NULL) { + return NULL; } - + // Fill publisher + ret->_key = _z_keyexpr_duplicate(keyexpr); + ret->_id = _z_get_entity_id(&zn->in->val); + ret->_congestion_control = congestion_control; + ret->_priority = priority; + ret->_zn = _z_session_rc_clone(zn); return ret; } int8_t _z_undeclare_publisher(_z_publisher_t *pub) { - int8_t ret = _Z_RES_OK; - - if (pub != NULL) { - // Build the declare message to send on the wire - _z_undeclare_resource(pub->_zn, pub->_key._id); - } else { - ret = _Z_ERR_ENTITY_UNKNOWN; + if (pub == NULL) { + return _Z_ERR_ENTITY_UNKNOWN; } - - return ret; + // Clear publisher + _z_undeclare_resource(&pub->_zn.in->val, pub->_key._id); + _z_session_rc_drop(&pub->_zn); + return _Z_RES_OK; } /*------------------ Write ------------------*/ @@ -186,81 +185,77 @@ int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *pay #if Z_FEATURE_SUBSCRIPTION == 1 /*------------------ Subscriber Declaration ------------------*/ -_z_subscriber_t *_z_declare_subscriber(_z_session_t *zn, _z_keyexpr_t keyexpr, _z_subinfo_t sub_info, +_z_subscriber_t *_z_declare_subscriber(_z_session_rc_t *zn, _z_keyexpr_t keyexpr, _z_subinfo_t sub_info, _z_data_handler_t callback, _z_drop_handler_t dropper, void *arg) { _z_subscription_t s; - s._id = _z_get_entity_id(zn); + s._id = _z_get_entity_id(&zn->in->val); s._key_id = keyexpr._id; - s._key = _z_get_expanded_key_from_key(zn, &keyexpr); + s._key = _z_get_expanded_key_from_key(&zn->in->val, &keyexpr); s._info = sub_info; s._callback = callback; s._dropper = dropper; s._arg = arg; + // Allocate subscriber _z_subscriber_t *ret = (_z_subscriber_t *)zp_malloc(sizeof(_z_subscriber_t)); - if (ret != NULL) { - ret->_zn = zn; - ret->_entity_id = s._id; - - _z_subscription_sptr_t *sp_s = _z_register_subscription( - zn, _Z_RESOURCE_IS_LOCAL, &s); // This a pointer to the entry stored at session-level. - // Do not drop it by the end of this function. - if (sp_s != NULL) { - // Build the declare message to send on the wire - _z_declaration_t declaration = _z_make_decl_subscriber( - &keyexpr, s._id, sub_info.reliability == Z_RELIABILITY_RELIABLE, sub_info.mode == Z_SUBMODE_PULL); - _z_network_message_t n_msg = _z_n_msg_make_declare(declaration); - if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { - _z_unregister_subscription(zn, _Z_RESOURCE_IS_LOCAL, sp_s); - _z_subscriber_free(&ret); - } - _z_n_msg_clear(&n_msg); - } else { - _z_subscriber_free(&ret); - } - } else { + if (ret == NULL) { _z_subscription_clear(&s); + return NULL; } - + // Register subscription, stored at session-level, do not drop it by the end of this function. + _z_subscription_rc_t *sp_s = _z_register_subscription(&zn->in->val, _Z_RESOURCE_IS_LOCAL, &s); + if (sp_s == NULL) { + _z_subscriber_free(&ret); + return NULL; + } + // Build the declare message to send on the wire + _z_declaration_t declaration = _z_make_decl_subscriber( + &keyexpr, s._id, sub_info.reliability == Z_RELIABILITY_RELIABLE, sub_info.mode == Z_SUBMODE_PULL); + _z_network_message_t n_msg = _z_n_msg_make_declare(declaration); + if (_z_send_n_msg(&zn->in->val, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { + _z_unregister_subscription(&zn->in->val, _Z_RESOURCE_IS_LOCAL, sp_s); + _z_subscriber_free(&ret); + return NULL; + } + _z_n_msg_clear(&n_msg); + // Fill subscriber + ret->_entity_id = s._id; + ret->_zn = _z_session_rc_clone(zn); return ret; } int8_t _z_undeclare_subscriber(_z_subscriber_t *sub) { - int8_t ret = _Z_ERR_GENERIC; - - if (sub != NULL) { - _z_subscription_sptr_t *s = _z_get_subscription_by_id(sub->_zn, _Z_RESOURCE_IS_LOCAL, sub->_entity_id); - if (s != NULL) { - // Build the declare message to send on the wire - _z_declaration_t declaration = _z_make_undecl_subscriber(sub->_entity_id, &s->ptr->_key); - _z_network_message_t n_msg = _z_n_msg_make_declare(declaration); - if (_z_send_n_msg(sub->_zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) == _Z_RES_OK) { - // Only if message is successfully send, local subscription state can be removed - _z_undeclare_resource(sub->_zn, s->ptr->_key_id); - _z_unregister_subscription(sub->_zn, _Z_RESOURCE_IS_LOCAL, s); - } else { - ret = _Z_ERR_ENTITY_UNKNOWN; - } - _z_n_msg_clear(&n_msg); - } else { - ret = _Z_ERR_ENTITY_UNKNOWN; - } - } else { - ret = _Z_ERR_ENTITY_UNKNOWN; + if (sub == NULL) { + return _Z_ERR_ENTITY_UNKNOWN; } - - return ret; + // Find subscription entry + _z_subscription_rc_t *s = _z_get_subscription_by_id(&sub->_zn.in->val, _Z_RESOURCE_IS_LOCAL, sub->_entity_id); + if (s == NULL) { + return _Z_ERR_ENTITY_UNKNOWN; + } + // Build the declare message to send on the wire + _z_declaration_t declaration = _z_make_undecl_subscriber(sub->_entity_id, &s->in->val._key); + _z_network_message_t n_msg = _z_n_msg_make_declare(declaration); + if (_z_send_n_msg(&sub->_zn.in->val, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { + return _Z_ERR_TRANSPORT_TX_FAILED; + } + _z_n_msg_clear(&n_msg); + // Only if message is successfully send, local subscription state can be removed + _z_undeclare_resource(&sub->_zn.in->val, s->in->val._key_id); + _z_unregister_subscription(&sub->_zn.in->val, _Z_RESOURCE_IS_LOCAL, s); + _z_session_rc_drop(&sub->_zn); + return _Z_RES_OK; } /*------------------ Pull ------------------*/ int8_t _z_subscriber_pull(const _z_subscriber_t *sub) { int8_t ret = _Z_RES_OK; - _z_subscription_sptr_t *s = _z_get_subscription_by_id(sub->_zn, _Z_RESOURCE_IS_LOCAL, sub->_entity_id); + _z_subscription_rc_t *s = _z_get_subscription_by_id(&sub->_zn.in->val, _Z_RESOURCE_IS_LOCAL, sub->_entity_id); if (s != NULL) { - _z_zint_t pull_id = _z_get_pull_id(sub->_zn); - _z_zenoh_message_t z_msg = _z_msg_make_pull(_z_keyexpr_alias(s->ptr->_key), pull_id); - if (_z_send_n_msg(sub->_zn, &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { + _z_zint_t pull_id = _z_get_pull_id(&sub->_zn.in->val); + _z_zenoh_message_t z_msg = _z_msg_make_pull(_z_keyexpr_alias(s->in->val._key), pull_id); + if (_z_send_n_msg(&sub->_zn.in->val, &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { ret = _Z_ERR_TRANSPORT_TX_FAILED; } } else { @@ -273,70 +268,63 @@ int8_t _z_subscriber_pull(const _z_subscriber_t *sub) { #if Z_FEATURE_QUERYABLE == 1 /*------------------ Queryable Declaration ------------------*/ -_z_queryable_t *_z_declare_queryable(_z_session_t *zn, _z_keyexpr_t keyexpr, _Bool complete, +_z_queryable_t *_z_declare_queryable(_z_session_rc_t *zn, _z_keyexpr_t keyexpr, _Bool complete, _z_questionable_handler_t callback, _z_drop_handler_t dropper, void *arg) { _z_questionable_t q; - q._id = _z_get_entity_id(zn); - q._key = _z_get_expanded_key_from_key(zn, &keyexpr); + q._id = _z_get_entity_id(&zn->in->val); + q._key = _z_get_expanded_key_from_key(&zn->in->val, &keyexpr); q._complete = complete; q._callback = callback; q._dropper = dropper; q._arg = arg; + // Allocate queryable _z_queryable_t *ret = (_z_queryable_t *)zp_malloc(sizeof(_z_queryable_t)); - if (ret != NULL) { - ret->_zn = zn; - ret->_entity_id = q._id; - - _z_questionable_sptr_t *sp_q = - _z_register_questionable(zn, &q); // This a pointer to the entry stored at session-level. - // Do not drop it by the end of this function. - if (sp_q != NULL) { - // Build the declare message to send on the wire - _z_declaration_t declaration = - _z_make_decl_queryable(&keyexpr, q._id, q._complete, _Z_QUERYABLE_DISTANCE_DEFAULT); - _z_network_message_t n_msg = _z_n_msg_make_declare(declaration); - if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { - // ret = _Z_ERR_TRANSPORT_TX_FAILED; - _z_unregister_questionable(zn, sp_q); - _z_queryable_free(&ret); - } - _z_n_msg_clear(&n_msg); - } else { - _z_queryable_free(&ret); - } - } else { + if (ret == NULL) { _z_questionable_clear(&q); + return NULL; } - + // Create questionable entry, stored at session-level, do not drop it by the end of this function. + _z_questionable_rc_t *sp_q = _z_register_questionable(&zn->in->val, &q); + if (sp_q == NULL) { + _z_queryable_free(&ret); + return NULL; + } + // Build the declare message to send on the wire + _z_declaration_t declaration = _z_make_decl_queryable(&keyexpr, q._id, q._complete, _Z_QUERYABLE_DISTANCE_DEFAULT); + _z_network_message_t n_msg = _z_n_msg_make_declare(declaration); + if (_z_send_n_msg(&zn->in->val, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { + _z_unregister_questionable(&zn->in->val, sp_q); + _z_queryable_free(&ret); + return NULL; + } + _z_n_msg_clear(&n_msg); + // Fill queryable + ret->_entity_id = q._id; + ret->_zn = _z_session_rc_clone(zn); return ret; } int8_t _z_undeclare_queryable(_z_queryable_t *qle) { - int8_t ret = _Z_RES_OK; - - if (qle != NULL) { - _z_questionable_sptr_t *q = _z_get_questionable_by_id(qle->_zn, qle->_entity_id); - if (q != NULL) { - // Build the declare message to send on the wire - _z_declaration_t declaration = _z_make_undecl_queryable(qle->_entity_id, &q->ptr->_key); - _z_network_message_t n_msg = _z_n_msg_make_declare(declaration); - if (_z_send_n_msg(qle->_zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) == _Z_RES_OK) { - // Only if message is successfully send, local queryable state can be removed - _z_unregister_questionable(qle->_zn, q); - } else { - ret = _Z_ERR_TRANSPORT_TX_FAILED; - } - _z_n_msg_clear(&n_msg); - - } else { - ret = _Z_ERR_ENTITY_UNKNOWN; - } - } else { - ret = _Z_ERR_ENTITY_UNKNOWN; + if (qle == NULL) { + return _Z_ERR_ENTITY_UNKNOWN; } - - return ret; + // Find questionable entry + _z_questionable_rc_t *q = _z_get_questionable_by_id(&qle->_zn.in->val, qle->_entity_id); + if (q == NULL) { + return _Z_ERR_ENTITY_UNKNOWN; + } + // Build the declare message to send on the wire + _z_declaration_t declaration = _z_make_undecl_queryable(qle->_entity_id, &q->in->val._key); + _z_network_message_t n_msg = _z_n_msg_make_declare(declaration); + if (_z_send_n_msg(&qle->_zn.in->val, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { + return _Z_ERR_TRANSPORT_TX_FAILED; + } + _z_n_msg_clear(&n_msg); + // Only if message is successfully send, local queryable state can be removed + _z_unregister_questionable(&qle->_zn.in->val, q); + _z_session_rc_drop(&qle->_zn); + return _Z_RES_OK; } int8_t _z_send_reply(const z_query_t *query, _z_keyexpr_t keyexpr, const _z_value_t payload) { diff --git a/src/net/query.c b/src/net/query.c index 04a728266..1f56e612d 100644 --- a/src/net/query.c +++ b/src/net/query.c @@ -11,7 +11,7 @@ // Contributors: // ZettaScale Zenoh Team, -#include "zenoh-pico/session/query.h" +#include "zenoh-pico/net/query.h" #if Z_FEATURE_QUERYABLE == 1 void _z_queryable_clear(_z_queryable_t *qbl) { diff --git a/src/session/queryable.c b/src/session/queryable.c index e2550c593..a73615c61 100644 --- a/src/session/queryable.c +++ b/src/session/queryable.c @@ -17,6 +17,7 @@ #include #include "zenoh-pico/config.h" +#include "zenoh-pico/net/query.h" #include "zenoh-pico/protocol/core.h" #include "zenoh-pico/protocol/definitions/network.h" #include "zenoh-pico/protocol/keyexpr.h" @@ -35,35 +36,35 @@ void _z_questionable_clear(_z_questionable_t *qle) { } /*------------------ Queryable ------------------*/ -_z_questionable_sptr_t *__z_get_questionable_by_id(_z_questionable_sptr_list_t *qles, const _z_zint_t id) { - _z_questionable_sptr_t *ret = NULL; +_z_questionable_rc_t *__z_get_questionable_by_id(_z_questionable_rc_list_t *qles, const _z_zint_t id) { + _z_questionable_rc_t *ret = NULL; - _z_questionable_sptr_list_t *xs = qles; + _z_questionable_rc_list_t *xs = qles; while (xs != NULL) { - _z_questionable_sptr_t *qle = _z_questionable_sptr_list_head(xs); - if (id == qle->ptr->_id) { + _z_questionable_rc_t *qle = _z_questionable_rc_list_head(xs); + if (id == qle->in->val._id) { ret = qle; break; } - xs = _z_questionable_sptr_list_tail(xs); + xs = _z_questionable_rc_list_tail(xs); } return ret; } -_z_questionable_sptr_list_t *__z_get_questionable_by_key(_z_questionable_sptr_list_t *qles, const _z_keyexpr_t key) { - _z_questionable_sptr_list_t *ret = NULL; +_z_questionable_rc_list_t *__z_get_questionable_by_key(_z_questionable_rc_list_t *qles, const _z_keyexpr_t key) { + _z_questionable_rc_list_t *ret = NULL; - _z_questionable_sptr_list_t *xs = qles; + _z_questionable_rc_list_t *xs = qles; while (xs != NULL) { - _z_questionable_sptr_t *qle = _z_questionable_sptr_list_head(xs); - if (_z_keyexpr_intersects(qle->ptr->_key._suffix, strlen(qle->ptr->_key._suffix), key._suffix, + _z_questionable_rc_t *qle = _z_questionable_rc_list_head(xs); + if (_z_keyexpr_intersects(qle->in->val._key._suffix, strlen(qle->in->val._key._suffix), key._suffix, strlen(key._suffix)) == true) { - ret = _z_questionable_sptr_list_push(ret, _z_questionable_sptr_clone_as_ptr(qle)); + ret = _z_questionable_rc_list_push(ret, _z_questionable_rc_clone_as_ptr(qle)); } - xs = _z_questionable_sptr_list_tail(xs); + xs = _z_questionable_rc_list_tail(xs); } return ret; @@ -74,8 +75,8 @@ _z_questionable_sptr_list_t *__z_get_questionable_by_key(_z_questionable_sptr_li * Make sure that the following mutexes are locked before calling this function: * - zn->_mutex_inner */ -_z_questionable_sptr_t *__unsafe_z_get_questionable_by_id(_z_session_t *zn, const _z_zint_t id) { - _z_questionable_sptr_list_t *qles = zn->_local_questionable; +_z_questionable_rc_t *__unsafe_z_get_questionable_by_id(_z_session_t *zn, const _z_zint_t id) { + _z_questionable_rc_list_t *qles = zn->_local_questionable; return __z_get_questionable_by_id(qles, id); } @@ -84,17 +85,17 @@ _z_questionable_sptr_t *__unsafe_z_get_questionable_by_id(_z_session_t *zn, cons * Make sure that the following mutexes are locked before calling this function: * - zn->_mutex_inner */ -_z_questionable_sptr_list_t *__unsafe_z_get_questionable_by_key(_z_session_t *zn, const _z_keyexpr_t key) { - _z_questionable_sptr_list_t *qles = zn->_local_questionable; +_z_questionable_rc_list_t *__unsafe_z_get_questionable_by_key(_z_session_t *zn, const _z_keyexpr_t key) { + _z_questionable_rc_list_t *qles = zn->_local_questionable; return __z_get_questionable_by_key(qles, key); } -_z_questionable_sptr_t *_z_get_questionable_by_id(_z_session_t *zn, const _z_zint_t id) { +_z_questionable_rc_t *_z_get_questionable_by_id(_z_session_t *zn, const _z_zint_t id) { #if Z_FEATURE_MULTI_THREAD == 1 zp_mutex_lock(&zn->_mutex_inner); #endif // Z_FEATURE_MULTI_THREAD == 1 - _z_questionable_sptr_t *qle = __unsafe_z_get_questionable_by_id(zn, id); + _z_questionable_rc_t *qle = __unsafe_z_get_questionable_by_id(zn, id); #if Z_FEATURE_MULTI_THREAD == 1 zp_mutex_unlock(&zn->_mutex_inner); @@ -103,13 +104,13 @@ _z_questionable_sptr_t *_z_get_questionable_by_id(_z_session_t *zn, const _z_zin return qle; } -_z_questionable_sptr_list_t *_z_get_questionable_by_key(_z_session_t *zn, const _z_keyexpr_t *keyexpr) { +_z_questionable_rc_list_t *_z_get_questionable_by_key(_z_session_t *zn, const _z_keyexpr_t *keyexpr) { #if Z_FEATURE_MULTI_THREAD == 1 zp_mutex_lock(&zn->_mutex_inner); #endif // Z_FEATURE_MULTI_THREAD == 1 _z_keyexpr_t key = __unsafe_z_get_expanded_key_from_key(zn, keyexpr); - _z_questionable_sptr_list_t *qles = __unsafe_z_get_questionable_by_key(zn, key); + _z_questionable_rc_list_t *qles = __unsafe_z_get_questionable_by_key(zn, key); #if Z_FEATURE_MULTI_THREAD == 1 zp_mutex_unlock(&zn->_mutex_inner); @@ -118,18 +119,18 @@ _z_questionable_sptr_list_t *_z_get_questionable_by_key(_z_session_t *zn, const return qles; } -_z_questionable_sptr_t *_z_register_questionable(_z_session_t *zn, _z_questionable_t *q) { +_z_questionable_rc_t *_z_register_questionable(_z_session_t *zn, _z_questionable_t *q) { _Z_DEBUG(">>> Allocating queryable for (%ju:%s)", (uintmax_t)q->_key._id, q->_key._suffix); - _z_questionable_sptr_t *ret = NULL; + _z_questionable_rc_t *ret = NULL; #if Z_FEATURE_MULTI_THREAD == 1 zp_mutex_lock(&zn->_mutex_inner); #endif // Z_FEATURE_MULTI_THREAD == 1 - ret = (_z_questionable_sptr_t *)zp_malloc(sizeof(_z_questionable_sptr_t)); + ret = (_z_questionable_rc_t *)zp_malloc(sizeof(_z_questionable_rc_t)); if (ret != NULL) { - *ret = _z_questionable_sptr_new(*q); - zn->_local_questionable = _z_questionable_sptr_list_push(zn->_local_questionable, ret); + *ret = _z_questionable_rc_new_from_val(*q); + zn->_local_questionable = _z_questionable_rc_list_push(zn->_local_questionable, ret); } #if Z_FEATURE_MULTI_THREAD == 1 @@ -148,7 +149,7 @@ int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *query, cons _z_keyexpr_t key = __unsafe_z_get_expanded_key_from_key(zn, &q_key); if (key._suffix != NULL) { - _z_questionable_sptr_list_t *qles = __unsafe_z_get_questionable_by_key(zn, key); + _z_questionable_rc_list_t *qles = __unsafe_z_get_questionable_by_key(zn, key); #if Z_FEATURE_MULTI_THREAD == 1 zp_mutex_unlock(&zn->_mutex_inner); @@ -170,15 +171,15 @@ int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *query, cons q._value.encoding = query->_ext_value.encoding; q._value.payload = query->_ext_value.payload; q._anyke = (strstr(q._parameters, Z_SELECTOR_QUERY_MATCH) == NULL) ? false : true; - _z_questionable_sptr_list_t *xs = qles; + _z_questionable_rc_list_t *xs = qles; while (xs != NULL) { - _z_questionable_sptr_t *qle = _z_questionable_sptr_list_head(xs); - qle->ptr->_callback(&q, qle->ptr->_arg); - xs = _z_questionable_sptr_list_tail(xs); + _z_questionable_rc_t *qle = _z_questionable_rc_list_head(xs); + qle->in->val._callback(&q, qle->in->val._arg); + xs = _z_questionable_rc_list_tail(xs); } _z_keyexpr_clear(&key); - _z_questionable_sptr_list_free(&qles); + _z_questionable_rc_list_free(&qles); #if defined(__STDC_NO_VLA__) || ((__STDC_VERSION__ < 201000L) && (defined(_WIN32) || defined(WIN32))) zp_free(params); #endif @@ -201,13 +202,12 @@ int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *query, cons return ret; } -void _z_unregister_questionable(_z_session_t *zn, _z_questionable_sptr_t *qle) { +void _z_unregister_questionable(_z_session_t *zn, _z_questionable_rc_t *qle) { #if Z_FEATURE_MULTI_THREAD == 1 zp_mutex_lock(&zn->_mutex_inner); #endif // Z_FEATURE_MULTI_THREAD == 1 - zn->_local_questionable = - _z_questionable_sptr_list_drop_filter(zn->_local_questionable, _z_questionable_sptr_eq, qle); + zn->_local_questionable = _z_questionable_rc_list_drop_filter(zn->_local_questionable, _z_questionable_rc_eq, qle); #if Z_FEATURE_MULTI_THREAD == 1 zp_mutex_unlock(&zn->_mutex_inner); @@ -219,7 +219,7 @@ void _z_flush_questionables(_z_session_t *zn) { zp_mutex_lock(&zn->_mutex_inner); #endif // Z_FEATURE_MULTI_THREAD == 1 - _z_questionable_sptr_list_free(&zn->_local_questionable); + _z_questionable_rc_list_free(&zn->_local_questionable); #if Z_FEATURE_MULTI_THREAD == 1 zp_mutex_unlock(&zn->_mutex_inner); diff --git a/src/session/subscription.c b/src/session/subscription.c index 435f2d682..0cd1050d9 100644 --- a/src/session/subscription.c +++ b/src/session/subscription.c @@ -39,35 +39,35 @@ void _z_subscription_clear(_z_subscription_t *sub) { /*------------------ Pull ------------------*/ _z_zint_t _z_get_pull_id(_z_session_t *zn) { return zn->_pull_id++; } -_z_subscription_sptr_t *__z_get_subscription_by_id(_z_subscription_sptr_list_t *subs, const _z_zint_t id) { - _z_subscription_sptr_t *ret = NULL; +_z_subscription_rc_t *__z_get_subscription_by_id(_z_subscription_rc_list_t *subs, const _z_zint_t id) { + _z_subscription_rc_t *ret = NULL; - _z_subscription_sptr_list_t *xs = subs; + _z_subscription_rc_list_t *xs = subs; while (xs != NULL) { - _z_subscription_sptr_t *sub = _z_subscription_sptr_list_head(xs); - if (id == sub->ptr->_id) { + _z_subscription_rc_t *sub = _z_subscription_rc_list_head(xs); + if (id == sub->in->val._id) { ret = sub; break; } - xs = _z_subscription_sptr_list_tail(xs); + xs = _z_subscription_rc_list_tail(xs); } return ret; } -_z_subscription_sptr_list_t *__z_get_subscriptions_by_key(_z_subscription_sptr_list_t *subs, const _z_keyexpr_t key) { - _z_subscription_sptr_list_t *ret = NULL; +_z_subscription_rc_list_t *__z_get_subscriptions_by_key(_z_subscription_rc_list_t *subs, const _z_keyexpr_t key) { + _z_subscription_rc_list_t *ret = NULL; - _z_subscription_sptr_list_t *xs = subs; + _z_subscription_rc_list_t *xs = subs; while (xs != NULL) { - _z_subscription_sptr_t *sub = _z_subscription_sptr_list_head(xs); - if (_z_keyexpr_intersects(sub->ptr->_key._suffix, strlen(sub->ptr->_key._suffix), key._suffix, + _z_subscription_rc_t *sub = _z_subscription_rc_list_head(xs); + if (_z_keyexpr_intersects(sub->in->val._key._suffix, strlen(sub->in->val._key._suffix), key._suffix, strlen(key._suffix)) == true) { - ret = _z_subscription_sptr_list_push(ret, _z_subscription_sptr_clone_as_ptr(sub)); + ret = _z_subscription_rc_list_push(ret, _z_subscription_rc_clone_as_ptr(sub)); } - xs = _z_subscription_sptr_list_tail(xs); + xs = _z_subscription_rc_list_tail(xs); } return ret; @@ -78,8 +78,8 @@ _z_subscription_sptr_list_t *__z_get_subscriptions_by_key(_z_subscription_sptr_l * Make sure that the following mutexes are locked before calling this function: * - zn->_mutex_inner */ -_z_subscription_sptr_t *__unsafe_z_get_subscription_by_id(_z_session_t *zn, uint8_t is_local, const _z_zint_t id) { - _z_subscription_sptr_list_t *subs = +_z_subscription_rc_t *__unsafe_z_get_subscription_by_id(_z_session_t *zn, uint8_t is_local, const _z_zint_t id) { + _z_subscription_rc_list_t *subs = (is_local == _Z_RESOURCE_IS_LOCAL) ? zn->_local_subscriptions : zn->_remote_subscriptions; return __z_get_subscription_by_id(subs, id); } @@ -89,19 +89,19 @@ _z_subscription_sptr_t *__unsafe_z_get_subscription_by_id(_z_session_t *zn, uint * Make sure that the following mutexes are locked before calling this function: * - zn->_mutex_inner */ -_z_subscription_sptr_list_t *__unsafe_z_get_subscriptions_by_key(_z_session_t *zn, uint8_t is_local, - const _z_keyexpr_t key) { - _z_subscription_sptr_list_t *subs = +_z_subscription_rc_list_t *__unsafe_z_get_subscriptions_by_key(_z_session_t *zn, uint8_t is_local, + const _z_keyexpr_t key) { + _z_subscription_rc_list_t *subs = (is_local == _Z_RESOURCE_IS_LOCAL) ? zn->_local_subscriptions : zn->_remote_subscriptions; return __z_get_subscriptions_by_key(subs, key); } -_z_subscription_sptr_t *_z_get_subscription_by_id(_z_session_t *zn, uint8_t is_local, const _z_zint_t id) { +_z_subscription_rc_t *_z_get_subscription_by_id(_z_session_t *zn, uint8_t is_local, const _z_zint_t id) { #if Z_FEATURE_MULTI_THREAD == 1 zp_mutex_lock(&zn->_mutex_inner); #endif // Z_FEATURE_MULTI_THREAD == 1 - _z_subscription_sptr_t *sub = __unsafe_z_get_subscription_by_id(zn, is_local, id); + _z_subscription_rc_t *sub = __unsafe_z_get_subscription_by_id(zn, is_local, id); #if Z_FEATURE_MULTI_THREAD == 1 zp_mutex_unlock(&zn->_mutex_inner); @@ -110,12 +110,12 @@ _z_subscription_sptr_t *_z_get_subscription_by_id(_z_session_t *zn, uint8_t is_l return sub; } -_z_subscription_sptr_list_t *_z_get_subscriptions_by_key(_z_session_t *zn, uint8_t is_local, const _z_keyexpr_t *key) { +_z_subscription_rc_list_t *_z_get_subscriptions_by_key(_z_session_t *zn, uint8_t is_local, const _z_keyexpr_t *key) { #if Z_FEATURE_MULTI_THREAD == 1 zp_mutex_lock(&zn->_mutex_inner); #endif // Z_FEATURE_MULTI_THREAD == 1 - _z_subscription_sptr_list_t *subs = __unsafe_z_get_subscriptions_by_key(zn, is_local, *key); + _z_subscription_rc_list_t *subs = __unsafe_z_get_subscriptions_by_key(zn, is_local, *key); #if Z_FEATURE_MULTI_THREAD == 1 zp_mutex_unlock(&zn->_mutex_inner); @@ -124,23 +124,23 @@ _z_subscription_sptr_list_t *_z_get_subscriptions_by_key(_z_session_t *zn, uint8 return subs; } -_z_subscription_sptr_t *_z_register_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_t *s) { +_z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_t *s) { _Z_DEBUG(">>> Allocating sub decl for (%ju:%s)", (uintmax_t)s->_key._id, s->_key._suffix); - _z_subscription_sptr_t *ret = NULL; + _z_subscription_rc_t *ret = NULL; #if Z_FEATURE_MULTI_THREAD == 1 zp_mutex_lock(&zn->_mutex_inner); #endif // Z_FEATURE_MULTI_THREAD == 1 - _z_subscription_sptr_list_t *subs = __unsafe_z_get_subscriptions_by_key(zn, is_local, s->_key); + _z_subscription_rc_list_t *subs = __unsafe_z_get_subscriptions_by_key(zn, is_local, s->_key); if (subs == NULL) { // A subscription for this name does not yet exists - ret = (_z_subscription_sptr_t *)zp_malloc(sizeof(_z_subscription_sptr_t)); + ret = (_z_subscription_rc_t *)zp_malloc(sizeof(_z_subscription_rc_t)); if (ret != NULL) { - *ret = _z_subscription_sptr_new(*s); + *ret = _z_subscription_rc_new_from_val(*s); if (is_local == _Z_RESOURCE_IS_LOCAL) { - zn->_local_subscriptions = _z_subscription_sptr_list_push(zn->_local_subscriptions, ret); + zn->_local_subscriptions = _z_subscription_rc_list_push(zn->_local_subscriptions, ret); } else { - zn->_remote_subscriptions = _z_subscription_sptr_list_push(zn->_remote_subscriptions, ret); + zn->_remote_subscriptions = _z_subscription_rc_list_push(zn->_remote_subscriptions, ret); } } } @@ -187,7 +187,7 @@ int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, co _z_keyexpr_t key = __unsafe_z_get_expanded_key_from_key(zn, &keyexpr); _Z_DEBUG("Triggering subs for %d - %s", key._id, key._suffix); if (key._suffix != NULL) { - _z_subscription_sptr_list_t *subs = __unsafe_z_get_subscriptions_by_key(zn, _Z_RESOURCE_IS_LOCAL, key); + _z_subscription_rc_list_t *subs = __unsafe_z_get_subscriptions_by_key(zn, _Z_RESOURCE_IS_LOCAL, key); #if Z_FEATURE_MULTI_THREAD == 1 zp_mutex_unlock(&zn->_mutex_inner); @@ -203,16 +203,16 @@ int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, co #if Z_FEATURE_ATTACHMENT == 1 s.attachment = att; #endif - _z_subscription_sptr_list_t *xs = subs; - _Z_DEBUG("Triggering %ju subs", (uintmax_t)_z_subscription_sptr_list_len(xs)); + _z_subscription_rc_list_t *xs = subs; + _Z_DEBUG("Triggering %ju subs", (uintmax_t)_z_subscription_rc_list_len(xs)); while (xs != NULL) { - _z_subscription_sptr_t *sub = _z_subscription_sptr_list_head(xs); - sub->ptr->_callback(&s, sub->ptr->_arg); - xs = _z_subscription_sptr_list_tail(xs); + _z_subscription_rc_t *sub = _z_subscription_rc_list_head(xs); + sub->in->val._callback(&s, sub->in->val._arg); + xs = _z_subscription_rc_list_tail(xs); } _z_keyexpr_clear(&key); - _z_subscription_sptr_list_free(&subs); + _z_subscription_rc_list_free(&subs); } else { #if Z_FEATURE_MULTI_THREAD == 1 zp_mutex_unlock(&zn->_mutex_inner); @@ -223,17 +223,17 @@ int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, co return ret; } -void _z_unregister_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_sptr_t *sub) { +void _z_unregister_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_rc_t *sub) { #if Z_FEATURE_MULTI_THREAD == 1 zp_mutex_lock(&zn->_mutex_inner); #endif // Z_FEATURE_MULTI_THREAD == 1 if (is_local == _Z_RESOURCE_IS_LOCAL) { zn->_local_subscriptions = - _z_subscription_sptr_list_drop_filter(zn->_local_subscriptions, _z_subscription_sptr_eq, sub); + _z_subscription_rc_list_drop_filter(zn->_local_subscriptions, _z_subscription_rc_eq, sub); } else { zn->_remote_subscriptions = - _z_subscription_sptr_list_drop_filter(zn->_remote_subscriptions, _z_subscription_sptr_eq, sub); + _z_subscription_rc_list_drop_filter(zn->_remote_subscriptions, _z_subscription_rc_eq, sub); } #if Z_FEATURE_MULTI_THREAD == 1 @@ -246,8 +246,8 @@ void _z_flush_subscriptions(_z_session_t *zn) { zp_mutex_lock(&zn->_mutex_inner); #endif // Z_FEATURE_MULTI_THREAD == 1 - _z_subscription_sptr_list_free(&zn->_local_subscriptions); - _z_subscription_sptr_list_free(&zn->_remote_subscriptions); + _z_subscription_rc_list_free(&zn->_local_subscriptions); + _z_subscription_rc_list_free(&zn->_remote_subscriptions); #if Z_FEATURE_MULTI_THREAD == 1 zp_mutex_unlock(&zn->_mutex_inner); diff --git a/src/session/utils.c b/src/session/utils.c index ea6fc34dc..f90ca20ff 100644 --- a/src/session/utils.c +++ b/src/session/utils.c @@ -121,17 +121,6 @@ void _z_session_clear(_z_session_t *zn) { #endif // Z_FEATURE_MULTI_THREAD == 1 } -void _z_session_free(_z_session_t **zn) { - _z_session_t *ptr = *zn; - - if (ptr != NULL) { - _z_session_clear(ptr); - - zp_free(ptr); - *zn = NULL; - } -} - int8_t _z_session_close(_z_session_t *zn, uint8_t reason) { int8_t ret = _Z_ERR_GENERIC; diff --git a/tests/z_client_test.c b/tests/z_client_test.c index b2975f960..bbb73bd30 100644 --- a/tests/z_client_test.c +++ b/tests/z_client_test.c @@ -122,7 +122,7 @@ int main(int argc, char **argv) { z_owned_session_t s1 = z_open(z_move(config)); assert(z_check(s1)); - z_string_t zid1 = format_id(&z_loan(s1)._val->_local_zid); + z_string_t zid1 = format_id(&z_loan(s1)._val.in->val._local_zid); printf("Session 1 with PID: %s\n", zid1.val); _z_string_clear(&zid1); @@ -137,7 +137,7 @@ int main(int argc, char **argv) { z_owned_session_t s2 = z_open(z_move(config)); assert(z_check(s2)); - z_string_t zid2 = format_id(&z_loan(s2)._val->_local_zid); + z_string_t zid2 = format_id(&z_loan(s2)._val.in->val._local_zid); printf("Session 2 with PID: %s\n", zid2.val); _z_string_clear(&zid2); diff --git a/tests/z_peer_multicast_test.c b/tests/z_peer_multicast_test.c index 17bdb27e2..f0448ad41 100644 --- a/tests/z_peer_multicast_test.c +++ b/tests/z_peer_multicast_test.c @@ -73,7 +73,8 @@ int main(int argc, char **argv) { z_owned_session_t s1 = z_open(z_move(config)); assert(z_check(s1)); - _z_bytes_t id_as_bytes = _z_bytes_wrap(z_loan(s1)._val->_local_zid.id, _z_id_len(z_loan(s1)._val->_local_zid)); + _z_bytes_t id_as_bytes = + _z_bytes_wrap(z_loan(s1)._val.in->val._local_zid.id, _z_id_len(z_loan(s1)._val.in->val._local_zid)); z_string_t zid1 = _z_string_from_bytes(&id_as_bytes); printf("Session 1 with PID: %s\n", zid1.val); _z_string_clear(&zid1); @@ -90,7 +91,7 @@ int main(int argc, char **argv) { z_owned_session_t s2 = z_open(z_move(config)); assert(z_check(s2)); - id_as_bytes = _z_bytes_wrap(z_loan(s2)._val->_local_zid.id, _z_id_len(z_loan(s2)._val->_local_zid)); + id_as_bytes = _z_bytes_wrap(z_loan(s2)._val.in->val._local_zid.id, _z_id_len(z_loan(s2)._val.in->val._local_zid)); z_string_t zid2 = _z_string_from_bytes(&id_as_bytes); printf("Session 2 with PID: %s\n", zid2.val); _z_string_clear(&zid2);