From d7bd3045a5092af1e3441eb2e7a2241b1c17b992 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Thu, 4 Jul 2024 11:17:53 +0200 Subject: [PATCH 01/20] fix: z_clone macro --- include/zenoh-pico/api/macros.h | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/include/zenoh-pico/api/macros.h b/include/zenoh-pico/api/macros.h index 1fd710715..79f394a6f 100644 --- a/include/zenoh-pico/api/macros.h +++ b/include/zenoh-pico/api/macros.h @@ -238,14 +238,14 @@ * Defines a generic function for cloning any of the ``z_owned_X_t`` types. * * Parameters: - * x: The instance to clone. + * x: The clone storage. + * y: The instance to clone. * * Returns: * Returns the cloned instance of `x`. */ -#define z_clone(x) _Generic((x), \ +#define z_clone(x, y) _Generic((x), \ z_owned_keyexpr_t : z_keyexpr_clone, \ - z_owned_config_t : z_config_clone, \ z_owned_session_t : z_session_clone, \ z_owned_subscriber_t : z_subscriber_clone, \ z_owned_publisher_t : z_publisher_clone, \ @@ -259,7 +259,7 @@ z_owned_hello_t : z_hello_clone, \ z_owned_string_t : z_string_clone, \ z_owned_string_array_t : z_string_array_clone \ - )(&x) + )(&x, y) /** * Defines a generic function for making null object of any of the ``z_owned_X_t`` types. From 2d65e928436eee275ba8fd60fd81f1fbf3938453 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Thu, 4 Jul 2024 12:03:31 +0200 Subject: [PATCH 02/20] fix: rename refcount _cnt as strong count --- include/zenoh-pico/collections/refcount.h | 24 +++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/include/zenoh-pico/collections/refcount.h b/include/zenoh-pico/collections/refcount.h index c73a0b2d1..39e30f04f 100644 --- a/include/zenoh-pico/collections/refcount.h +++ b/include/zenoh-pico/collections/refcount.h @@ -45,10 +45,10 @@ // c11 atomic variant #define _ZP_RC_CNT_TYPE _z_atomic(unsigned int) -#define _ZP_RC_OP_INIT_CNT _z_atomic_store_explicit(&p.in->_cnt, (unsigned int)1, _z_memory_order_relaxed); -#define _ZP_RC_OP_INCR_CNT _z_atomic_fetch_add_explicit(&p->in->_cnt, (unsigned int)1, _z_memory_order_relaxed); +#define _ZP_RC_OP_INIT_CNT _z_atomic_store_explicit(&p.in->_strong_cnt, (unsigned int)1, _z_memory_order_relaxed); +#define _ZP_RC_OP_INCR_CNT _z_atomic_fetch_add_explicit(&p->in->_strong_cnt, (unsigned int)1, _z_memory_order_relaxed); #define _ZP_RC_OP_DECR_AND_CMP \ - _z_atomic_fetch_sub_explicit(&p->in->_cnt, (unsigned int)1, _z_memory_order_release) > (unsigned int)1 + _z_atomic_fetch_sub_explicit(&p->in->_strong_cnt, (unsigned int)1, _z_memory_order_release) > (unsigned int)1 #define _ZP_RC_OP_SYNC atomic_thread_fence(_z_memory_order_acquire); #else // ZENOH_C_STANDARD == 99 @@ -56,11 +56,11 @@ // c99 gcc sync builtin variant #define _ZP_RC_CNT_TYPE unsigned int -#define _ZP_RC_OP_INIT_CNT \ - __sync_fetch_and_and(&p.in->_cnt, (unsigned int)0); \ - __sync_fetch_and_add(&p.in->_cnt, (unsigned int)1); -#define _ZP_RC_OP_INCR_CNT __sync_fetch_and_add(&p->in->_cnt, (unsigned int)1); -#define _ZP_RC_OP_DECR_AND_CMP __sync_fetch_and_sub(&p->in->_cnt, (unsigned int)1) > (unsigned int)1 +#define _ZP_RC_OP_INIT_CNT \ + __sync_fetch_and_and(&p.in->_strong_cnt, (unsigned int)0); \ + __sync_fetch_and_add(&p.in->_strong_cnt, (unsigned int)1); +#define _ZP_RC_OP_INCR_CNT __sync_fetch_and_add(&p->in->_strong_cnt, (unsigned int)1); +#define _ZP_RC_OP_DECR_AND_CMP __sync_fetch_and_sub(&p->in->_strong_cnt, (unsigned int)1) > (unsigned int)1 #define _ZP_RC_OP_SYNC __sync_synchronize(); #else // !ZENOH_COMPILER_GCC @@ -79,9 +79,9 @@ // Single thread variant #define _ZP_RC_CNT_TYPE unsigned int -#define _ZP_RC_OP_INIT_CNT p.in->_cnt = (unsigned int)1; -#define _ZP_RC_OP_INCR_CNT p->in->_cnt += (unsigned int)1; -#define _ZP_RC_OP_DECR_AND_CMP p->in->_cnt-- > (unsigned int)1 +#define _ZP_RC_OP_INIT_CNT p.in->_strong_cnt = (unsigned int)1; +#define _ZP_RC_OP_INCR_CNT p->in->_strong_cnt += (unsigned int)1; +#define _ZP_RC_OP_DECR_AND_CMP p->in->_strong_cnt-- > (unsigned int)1 #define _ZP_RC_OP_SYNC #endif // Z_FEATURE_MULTI_THREAD == 1 @@ -90,7 +90,7 @@ #define _Z_REFCOUNT_DEFINE(name, type) \ typedef struct name##_inner_rc_t { \ type##_t val; \ - _ZP_RC_CNT_TYPE _cnt; \ + _ZP_RC_CNT_TYPE _strong_cnt; \ } name##_inner_rc_t; \ typedef struct name##_rc_t { \ name##_inner_rc_t *in; \ From 634a0bada6de1cdc107eb21ceaa35c2e7fe89a47 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Thu, 4 Jul 2024 15:51:01 +0200 Subject: [PATCH 03/20] fix: missing feature token on z_ping --- examples/unix/c99/z_ping.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/unix/c99/z_ping.c b/examples/unix/c99/z_ping.c index 46f956181..57189c826 100644 --- a/examples/unix/c99/z_ping.c +++ b/examples/unix/c99/z_ping.c @@ -22,7 +22,7 @@ #include "zenoh-pico/api/primitives.h" #include "zenoh-pico/system/platform.h" -#if Z_FEATURE_SUBSCRIPTION == 1 && Z_FEATURE_PUBLICATION == 1 +#if Z_FEATURE_SUBSCRIPTION == 1 && Z_FEATURE_PUBLICATION == 1 && Z_FEATURE_MULTI_THREAD == 1 #define DEFAULT_PKT_SIZE 8 #define DEFAULT_PING_NB 100 @@ -190,8 +190,8 @@ struct args_t parse_args(int argc, char** argv) { #else int main(void) { printf( - "ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION or Z_FEATURE_PUBLICATION but this example " - "requires them.\n"); + "ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION or Z_FEATURE_PUBLICATION or " + "Z_FEATURE_MULTI_THREAD but this example requires them.\n"); return -2; } #endif From 6192e2dd18809704443c77aace4f91d8fabcc546 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Thu, 4 Jul 2024 15:56:29 +0200 Subject: [PATCH 04/20] feat: add weak references --- include/zenoh-pico/collections/refcount.h | 259 +++++++++++++++------- 1 file changed, 182 insertions(+), 77 deletions(-) diff --git a/include/zenoh-pico/collections/refcount.h b/include/zenoh-pico/collections/refcount.h index 39e30f04f..c4cdbefda 100644 --- a/include/zenoh-pico/collections/refcount.h +++ b/include/zenoh-pico/collections/refcount.h @@ -45,10 +45,21 @@ // c11 atomic variant #define _ZP_RC_CNT_TYPE _z_atomic(unsigned int) -#define _ZP_RC_OP_INIT_CNT _z_atomic_store_explicit(&p.in->_strong_cnt, (unsigned int)1, _z_memory_order_relaxed); -#define _ZP_RC_OP_INCR_CNT _z_atomic_fetch_add_explicit(&p->in->_strong_cnt, (unsigned int)1, _z_memory_order_relaxed); +#define _ZP_RC_OP_INIT_STRONG_CNT \ + _z_atomic_store_explicit(&p.in->_strong_cnt, (unsigned int)1, _z_memory_order_relaxed); \ + _z_atomic_store_explicit(&p.in->_weak_cnt, (unsigned int)0, _z_memory_order_relaxed); +#define _ZP_RC_OP_INIT_WEAK_CNT \ + _z_atomic_store_explicit(&p.in->_strong_cnt, (unsigned int)0, _z_memory_order_relaxed); \ + _z_atomic_store_explicit(&p.in->_weak_cnt, (unsigned int)1, _z_memory_order_relaxed); +#define _ZP_RC_OP_INCR_STRONG_CNT \ + _z_atomic_fetch_add_explicit(&p->in->_strong_cnt, (unsigned int)1, _z_memory_order_relaxed); +#define _ZP_RC_OP_INCR_WEAK_CNT \ + _z_atomic_fetch_add_explicit(&p->in->_weak_cnt, (unsigned int)1, _z_memory_order_relaxed); #define _ZP_RC_OP_DECR_AND_CMP \ _z_atomic_fetch_sub_explicit(&p->in->_strong_cnt, (unsigned int)1, _z_memory_order_release) > (unsigned int)1 +#define _ZP_RC_OP_DECR_WEAK_CNT \ + _z_atomic_fetch_sub_explicit(&p->in->_weak_cnt, (unsigned int)1, _z_memory_order_release); +#define _ZP_RC_OP_COMP_CNT(x, y) atomic_compare_exchange_strong(&x, &y, y) #define _ZP_RC_OP_SYNC atomic_thread_fence(_z_memory_order_acquire); #else // ZENOH_C_STANDARD == 99 @@ -56,11 +67,19 @@ // c99 gcc sync builtin variant #define _ZP_RC_CNT_TYPE unsigned int -#define _ZP_RC_OP_INIT_CNT \ +#define _ZP_RC_OP_INIT_STRONG_CNT \ __sync_fetch_and_and(&p.in->_strong_cnt, (unsigned int)0); \ - __sync_fetch_and_add(&p.in->_strong_cnt, (unsigned int)1); -#define _ZP_RC_OP_INCR_CNT __sync_fetch_and_add(&p->in->_strong_cnt, (unsigned int)1); + __sync_fetch_and_add(&p.in->_strong_cnt, (unsigned int)1); \ + __sync_fetch_and_and(&p.in->_weak_cnt, (unsigned int)0); +#define _ZP_RC_OP_INIT_WEAK_CNT \ + __sync_fetch_and_and(&p.in->_weak_cnt, (unsigned int)0); \ + __sync_fetch_and_add(&p.in->_weak_cnt, (unsigned int)1); \ + __sync_fetch_and_and(&p.in->_strong_cnt, (unsigned int)0); +#define _ZP_RC_OP_INCR_STRONG_CNT __sync_fetch_and_add(&p->in->_strong_cnt, (unsigned int)1); +#define _ZP_RC_OP_INCR_WEAK_CNT __sync_fetch_and_add(&p->in->_weak_cnt, (unsigned int)1); #define _ZP_RC_OP_DECR_AND_CMP __sync_fetch_and_sub(&p->in->_strong_cnt, (unsigned int)1) > (unsigned int)1 +#define _ZP_RC_OP_DECR_WEAK_CNT __sync_fetch_and_sub(&p->in->_weak_cnt, (unsigned int)1); +#define _ZP_RC_OP_COMP_CNT(x, y) __sync_bool_compare_and_swap(&x, y, y) #define _ZP_RC_OP_SYNC __sync_synchronize(); #else // !ZENOH_COMPILER_GCC @@ -68,9 +87,13 @@ // None variant #error "Multi-thread refcount in C99 only exists for GCC, use GCC or C11 or deactivate multi-thread" #define _ZP_RC_CNT_TYPE unsigned int -#define _ZP_RC_OP_INIT_CNT -#define _ZP_RC_OP_INCR_CNT -#define _ZP_RC_OP_DECR_AND_CMP +#define _ZP_RC_OP_INIT_STRONG_CNT +#define _ZP_RC_OP_INIT_WEAK_CNT +#define _ZP_RC_OP_INCR_STRONG_CNT +#define _ZP_RC_OP_INCR_WEAK_CNT +#define _ZP_RC_OP_DECR_AND_CMP true +#define _ZP_RC_OP_DECR_WEAK_CNT +#define _ZP_RC_OP_COMP_CNT(x, y) (x == y) #define _ZP_RC_OP_SYNC #endif // ZENOH_COMPILER_GCC @@ -79,81 +102,163 @@ // Single thread variant #define _ZP_RC_CNT_TYPE unsigned int -#define _ZP_RC_OP_INIT_CNT p.in->_strong_cnt = (unsigned int)1; -#define _ZP_RC_OP_INCR_CNT p->in->_strong_cnt += (unsigned int)1; +#define _ZP_RC_OP_INIT_STRONG_CNT \ + p.in->_strong_cnt = (unsigned int)1; \ + p.in->_weak_cnt = (unsigned int)0; +#define _ZP_RC_OP_INIT_WEAK_CNT \ + p.in->_strong_cnt = (unsigned int)0; \ + p.in->_weak_cnt = (unsigned int)1; +#define _ZP_RC_OP_INCR_STRONG_CNT p->in->_strong_cnt += (unsigned int)1; +#define _ZP_RC_OP_INCR_WEAK_CNT p->in->_weak_cnt += (unsigned int)1; #define _ZP_RC_OP_DECR_AND_CMP p->in->_strong_cnt-- > (unsigned int)1 +#define _ZP_RC_OP_DECR_WEAK_CNT p->in->_weak_cnt--; +#define _ZP_RC_OP_COMP_CNT(x, y) (x == y) #define _ZP_RC_OP_SYNC #endif // Z_FEATURE_MULTI_THREAD == 1 /*------------------ Internal Array Macros ------------------*/ -#define _Z_REFCOUNT_DEFINE(name, type) \ - typedef struct name##_inner_rc_t { \ - type##_t val; \ - _ZP_RC_CNT_TYPE _strong_cnt; \ - } name##_inner_rc_t; \ - typedef struct name##_rc_t { \ - name##_inner_rc_t *in; \ - } name##_rc_t; \ - static inline name##_rc_t name##_rc_null(void) { \ - name##_rc_t p; \ - p.in = NULL; \ - return p; \ - } \ - static inline name##_rc_t name##_rc_new(void) { \ - name##_rc_t p; \ - p.in = (name##_inner_rc_t *)z_malloc(sizeof(name##_inner_rc_t)); \ - if (p.in != NULL) { \ - memset(&p.in->val, 0, sizeof(type##_t)); \ - _ZP_RC_OP_INIT_CNT \ - } \ - return p; \ - } \ - static inline name##_rc_t name##_rc_new_from_val(type##_t val) { \ - name##_rc_t p; \ - p.in = (name##_inner_rc_t *)z_malloc(sizeof(name##_inner_rc_t)); \ - if (p.in != NULL) { \ - p.in->val = val; \ - _ZP_RC_OP_INIT_CNT \ - } \ - return p; \ - } \ - static inline name##_rc_t name##_rc_clone(const name##_rc_t *p) { \ - name##_rc_t c; \ - c.in = p->in; \ - _ZP_RC_OP_INCR_CNT \ - return c; \ - } \ - static inline name##_rc_t *name##_rc_clone_as_ptr(const name##_rc_t *p) { \ - name##_rc_t *c = (name##_rc_t *)z_malloc(sizeof(name##_rc_t)); \ - if (c != NULL) { \ - c->in = p->in; \ - _ZP_RC_OP_INCR_CNT \ - } \ - return c; \ - } \ - static inline void name##_rc_copy(name##_rc_t *dst, const name##_rc_t *p) { \ - dst->in = p->in; \ - _ZP_RC_OP_INCR_CNT \ - } \ - static inline _Bool name##_rc_eq(const name##_rc_t *left, const name##_rc_t *right) { \ - return (left->in == right->in); \ - } \ - static inline _Bool name##_rc_drop(name##_rc_t *p) { \ - if ((p == NULL) || (p->in == NULL)) { \ - return false; \ - } \ - if (_ZP_RC_OP_DECR_AND_CMP) { \ - return false; \ - } \ - _ZP_RC_OP_SYNC \ - type##_clear(&p->in->val); \ - z_free(p->in); \ - return true; \ - } \ - static inline size_t name##_rc_size(name##_rc_t *p) { \ - _ZP_UNUSED(p); \ - return sizeof(name##_rc_t); \ +#define _Z_REFCOUNT_DEFINE(name, type) \ + typedef struct name##_inner_rc_t { \ + type##_t val; \ + _ZP_RC_CNT_TYPE _strong_cnt; \ + _ZP_RC_CNT_TYPE _weak_cnt; \ + } name##_inner_rc_t; \ + typedef struct name##_rc_t { \ + name##_inner_rc_t *in; \ + } name##_rc_t; \ + typedef struct name##_weak_t { \ + name##_inner_rc_t *in; \ + } name##_weak_t; \ + static inline name##_rc_t name##_rc_null(void) { \ + name##_rc_t p; \ + p.in = NULL; \ + return p; \ + } \ + static inline name##_rc_t name##_rc_new(void) { \ + name##_rc_t p; \ + p.in = (name##_inner_rc_t *)z_malloc(sizeof(name##_inner_rc_t)); \ + if (p.in != NULL) { \ + memset(&p.in->val, 0, sizeof(type##_t)); \ + _ZP_RC_OP_INIT_STRONG_CNT \ + } \ + return p; \ + } \ + static inline name##_rc_t name##_rc_new_from_val(type##_t val) { \ + name##_rc_t p; \ + p.in = (name##_inner_rc_t *)z_malloc(sizeof(name##_inner_rc_t)); \ + if (p.in != NULL) { \ + p.in->val = val; \ + _ZP_RC_OP_INIT_STRONG_CNT \ + } \ + return p; \ + } \ + static inline name##_rc_t name##_rc_clone(const name##_rc_t *p) { \ + name##_rc_t c; \ + c.in = p->in; \ + _ZP_RC_OP_INCR_STRONG_CNT \ + return c; \ + } \ + static inline name##_rc_t *name##_rc_clone_as_ptr(const name##_rc_t *p) { \ + name##_rc_t *c = (name##_rc_t *)z_malloc(sizeof(name##_rc_t)); \ + if (c != NULL) { \ + c->in = p->in; \ + _ZP_RC_OP_INCR_STRONG_CNT \ + } \ + return c; \ + } \ + static inline name##_weak_t name##_rc_clone_as_weak(const name##_rc_t *p) { \ + name##_weak_t c; \ + c.in = p->in; \ + _ZP_RC_OP_INCR_WEAK_CNT \ + return c; \ + } \ + static inline name##_weak_t *name##_rc_clone_as_weak_ptr(const name##_rc_t *p) { \ + name##_weak_t *c = (name##_weak_t *)z_malloc(sizeof(name##_weak_t)); \ + if (c != NULL) { \ + c->in = p->in; \ + _ZP_RC_OP_INCR_WEAK_CNT \ + } \ + return c; \ + } \ + static inline void name##_rc_copy(name##_rc_t *dst, const name##_rc_t *p) { \ + dst->in = p->in; \ + _ZP_RC_OP_INCR_STRONG_CNT \ + } \ + static inline _Bool name##_rc_eq(const name##_rc_t *left, const name##_rc_t *right) { \ + return (left->in == right->in); \ + } \ + static inline _Bool name##_rc_drop(name##_rc_t *p) { \ + if ((p == NULL) || (p->in == NULL)) { \ + return false; \ + } \ + if (_ZP_RC_OP_DECR_AND_CMP) { \ + return false; \ + } \ + _ZP_RC_OP_SYNC \ + type##_clear(&p->in->val); \ + unsigned int cmp_val = 0; \ + if (_ZP_RC_OP_COMP_CNT(p->in->_strong_cnt, cmp_val) && _ZP_RC_OP_COMP_CNT(p->in->_weak_cnt, cmp_val)) { \ + z_free(p->in); \ + } \ + return true; \ + } \ + static inline name##_weak_t name##_weak_null(void) { \ + name##_weak_t p; \ + p.in = NULL; \ + return p; \ + } \ + static inline name##_weak_t name##_weak_new(void) { \ + name##_weak_t p; \ + p.in = (name##_inner_rc_t *)z_malloc(sizeof(name##_inner_rc_t)); \ + if (p.in != NULL) { \ + memset(&p.in->val, 0, sizeof(type##_t)); \ + _ZP_RC_OP_INIT_WEAK_CNT \ + } \ + return p; \ + } \ + static inline name##_weak_t name##_weak_new_from_val(type##_t val) { \ + name##_weak_t p; \ + p.in = (name##_inner_rc_t *)z_malloc(sizeof(name##_inner_rc_t)); \ + if (p.in != NULL) { \ + p.in->val = val; \ + _ZP_RC_OP_INIT_WEAK_CNT \ + } \ + return p; \ + } \ + static inline name##_weak_t name##_weak_clone(const name##_weak_t *p) { \ + name##_weak_t c; \ + c.in = p->in; \ + _ZP_RC_OP_INCR_WEAK_CNT \ + return c; \ + } \ + static inline void name##_weak_copy(name##_weak_t *dst, const name##_weak_t *p) { \ + dst->in = p->in; \ + _ZP_RC_OP_INCR_WEAK_CNT \ + } \ + static inline _Bool name##_weak_eq(const name##_weak_t *left, const name##_weak_t *right) { \ + return (left->in == right->in); \ + } \ + static inline _Bool name##_weak_check(const name##_weak_t *p) { \ + unsigned int cmp_val = 0; \ + return (!_ZP_RC_OP_COMP_CNT(p->in->_strong_cnt, cmp_val)); \ + } \ + static inline _Bool name##_weak_drop(name##_weak_t *p) { \ + if ((p == NULL) || (p->in == NULL)) { \ + return false; \ + } \ + _ZP_RC_OP_DECR_WEAK_CNT \ + unsigned int cmp_val = 0; \ + if (!name##_weak_check(p) && _ZP_RC_OP_COMP_CNT(p->in->_weak_cnt, cmp_val)) { \ + _ZP_RC_OP_SYNC \ + z_free(p->in); \ + return true; \ + } \ + return false; \ + } \ + static inline size_t name##_rc_size(name##_rc_t *p) { \ + _ZP_UNUSED(p); \ + return sizeof(name##_rc_t); \ } #endif /* ZENOH_PICO_COLLECTIONS_REFCOUNT_H */ From 20dc3c2ad5fae3f2720889de32ef600c2de757e8 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Fri, 5 Jul 2024 10:59:25 +0200 Subject: [PATCH 05/20] feat: use rc ref in transport and weak in queries --- include/zenoh-pico/net/query.h | 4 ++-- include/zenoh-pico/net/session.h | 4 ++-- include/zenoh-pico/session/queryable.h | 2 +- include/zenoh-pico/session/utils.h | 4 ++-- include/zenoh-pico/transport/transport.h | 6 +++--- src/api/api.c | 14 ++++++++++---- src/net/primitives.c | 9 +++++---- src/net/query.c | 20 ++++++++++++-------- src/net/session.c | 6 +++--- src/session/queryable.c | 5 +++-- src/session/rx.c | 5 +++-- src/session/utils.c | 9 +++++---- src/transport/multicast/lease.c | 2 +- 13 files changed, 52 insertions(+), 38 deletions(-) diff --git a/include/zenoh-pico/net/query.h b/include/zenoh-pico/net/query.h index dffb5f15a..3ed21f24d 100644 --- a/include/zenoh-pico/net/query.h +++ b/include/zenoh-pico/net/query.h @@ -28,7 +28,7 @@ typedef struct _z_query_t { _z_value_t _value; _z_keyexpr_t _key; uint32_t _request_id; - _z_session_t *_zn; // FIXME: Potential UB source, Issue #476 + _z_session_weak_t _zn; // Can't be an rc because of cross referencing _z_bytes_t attachment; char *_parameters; _Bool _anyke; @@ -50,7 +50,7 @@ typedef struct { } _z_queryable_t; #if Z_FEATURE_QUERYABLE == 1 -_z_query_t _z_query_create(_z_value_t *value, _z_keyexpr_t *key, const _z_slice_t *parameters, _z_session_t *zn, +_z_query_t _z_query_create(_z_value_t *value, _z_keyexpr_t *key, const _z_slice_t *parameters, _z_session_rc_t *zn, uint32_t request_id, const _z_bytes_t attachment); void _z_queryable_clear(_z_queryable_t *qbl); void _z_queryable_free(_z_queryable_t **qbl); diff --git a/include/zenoh-pico/net/session.h b/include/zenoh-pico/net/session.h index 7be2fb6a6..541e762e5 100644 --- a/include/zenoh-pico/net/session.h +++ b/include/zenoh-pico/net/session.h @@ -69,7 +69,7 @@ typedef struct _z_session_t { #endif } _z_session_t; -extern void _z_session_clear(_z_session_t *zn); // Forward type declaration to avoid cyclical include +extern void _z_session_clear(_z_session_t *zn); // Forward declaration to avoid cyclical include _Z_REFCOUNT_DEFINE(_z_session, _z_session) @@ -84,7 +84,7 @@ _Z_REFCOUNT_DEFINE(_z_session, _z_session) * ``0`` in case of success, or a ``negative value`` in case of failure. * */ -int8_t _z_open(_z_session_t *zn, _z_config_t *config); +int8_t _z_open(_z_session_rc_t *zn, _z_config_t *config); /** * Close a zenoh-net session. diff --git a/include/zenoh-pico/session/queryable.h b/include/zenoh-pico/session/queryable.h index 67ab58bd3..400af55f8 100644 --- a/include/zenoh-pico/session/queryable.h +++ b/include/zenoh-pico/session/queryable.h @@ -28,7 +28,7 @@ _z_session_queryable_rc_t *_z_get_session_queryable_by_id(_z_session_t *zn, cons _z_session_queryable_rc_list_t *_z_get_session_queryable_by_key(_z_session_t *zn, const _z_keyexpr_t key); _z_session_queryable_rc_t *_z_register_session_queryable(_z_session_t *zn, _z_session_queryable_t *q); -int8_t _z_trigger_queryables(_z_session_t *zn, _z_msg_query_t *query, const _z_keyexpr_t q_key, uint32_t qid, +int8_t _z_trigger_queryables(_z_session_rc_t *zn, _z_msg_query_t *query, const _z_keyexpr_t q_key, uint32_t qid, const _z_bytes_t attachment); void _z_unregister_session_queryable(_z_session_t *zn, _z_session_queryable_rc_t *q); void _z_flush_session_queryable(_z_session_t *zn); diff --git a/include/zenoh-pico/session/utils.h b/include/zenoh-pico/session/utils.h index 4875bd82f..01650d65b 100644 --- a/include/zenoh-pico/session/utils.h +++ b/include/zenoh-pico/session/utils.h @@ -26,11 +26,11 @@ _z_hello_list_t *_z_scout_inner(const z_what_t what, _z_id_t id, const char *locator, const uint32_t timeout, const _Bool exit_on_first); -int8_t _z_session_init(_z_session_t *zn, _z_id_t *zid); +int8_t _z_session_init(_z_session_rc_t *zsrc, _z_id_t *zid); void _z_session_clear(_z_session_t *zn); int8_t _z_session_close(_z_session_t *zn, uint8_t reason); -int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *z_msg, uint16_t local_peer_id); +int8_t _z_handle_network_message(_z_session_rc_t *zsrc, _z_zenoh_message_t *z_msg, uint16_t local_peer_id); int8_t _z_send_n_msg(_z_session_t *zn, _z_network_message_t *n_msg, z_reliability_t reliability, z_congestion_control_t cong_ctrl); diff --git a/include/zenoh-pico/transport/transport.h b/include/zenoh-pico/transport/transport.h index d82b68bf6..0e5163b6d 100644 --- a/include/zenoh-pico/transport/transport.h +++ b/include/zenoh-pico/transport/transport.h @@ -56,7 +56,7 @@ _z_transport_peer_entry_list_t *_z_transport_peer_entry_list_insert(_z_transport _z_transport_peer_entry_t *entry); // Forward type declaration to avoid cyclical include -typedef struct _z_session_t _z_session_t; +typedef struct _z_session_rc_t _z_session_rc_ref_t; // Forward declaration to be used in _zp_f_send_tmsg* typedef struct _z_transport_multicast_t _z_transport_multicast_t; @@ -65,7 +65,7 @@ typedef int8_t (*_zp_f_send_tmsg)(_z_transport_multicast_t *self, const _z_trans typedef struct { // Session associated to the transport - _z_session_t *_session; + _z_session_rc_ref_t *_session; #if Z_FEATURE_MULTI_THREAD == 1 // TX and RX mutexes @@ -108,7 +108,7 @@ typedef struct { typedef struct _z_transport_multicast_t { // Session associated to the transport - _z_session_t *_session; + _z_session_rc_ref_t *_session; #if Z_FEATURE_MULTI_THREAD == 1 // TX and RX mutexes diff --git a/src/api/api.c b/src/api/api.c index e7213f50d..963ac7e36 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -875,15 +875,16 @@ int8_t z_open(z_owned_session_t *zs, z_owned_config_t *config) { z_config_drop(config); return _Z_ERR_SYSTEM_OUT_OF_MEMORY; } + zs->_rc = zsrc; // Open session - int8_t ret = _z_open(&zsrc.in->val, &config->_val); + int8_t ret = _z_open(&zs->_rc, &config->_val); if (ret != _Z_RES_OK) { - _z_session_rc_drop(&zsrc); + _z_session_rc_drop(&zs->_rc); + z_session_null(zs); z_config_drop(config); return ret; } - // Store rc in session - zs->_rc = zsrc; + // Clean up z_config_drop(config); return _Z_RES_OK; } @@ -1297,6 +1298,11 @@ void z_query_reply_options_default(z_query_reply_options_t *options) { int8_t z_query_reply(const z_loaned_query_t *query, const z_loaned_keyexpr_t *keyexpr, z_owned_bytes_t *payload, const z_query_reply_options_t *options) { + // Check session as queries can't use session rc + if (!_z_session_weak_check(&query->in->val._zn)) { + return _Z_ERR_CONNECTION_CLOSED; + } + // Set options _z_keyexpr_t keyexpr_aliased = _z_keyexpr_alias_from_user_defined(*keyexpr, true); z_query_reply_options_t opts; if (options == NULL) { diff --git a/src/net/primitives.c b/src/net/primitives.c index d183cd685..9413ae689 100644 --- a/src/net/primitives.c +++ b/src/net/primitives.c @@ -314,12 +314,13 @@ int8_t _z_send_reply(const _z_query_t *query, _z_keyexpr_t keyexpr, const _z_val const z_sample_kind_t kind, const z_congestion_control_t cong_ctrl, z_priority_t priority, _Bool is_express, const _z_timestamp_t *timestamp, const _z_bytes_t att) { int8_t ret = _Z_RES_OK; + _z_session_t *zn = &query->_zn.in->val; _z_keyexpr_t q_ke; _z_keyexpr_t r_ke; if (query->_anyke == false) { - q_ke = _z_get_expanded_key_from_key(query->_zn, &query->_key); - r_ke = _z_get_expanded_key_from_key(query->_zn, &keyexpr); + q_ke = _z_get_expanded_key_from_key(zn, &query->_key); + r_ke = _z_get_expanded_key_from_key(zn, &keyexpr); if (_z_keyexpr_intersects(q_ke._suffix, strlen(q_ke._suffix), r_ke._suffix, strlen(r_ke._suffix)) == false) { ret = _Z_ERR_KEYEXPR_NOT_MATCH; } @@ -329,7 +330,7 @@ int8_t _z_send_reply(const _z_query_t *query, _z_keyexpr_t keyexpr, const _z_val if (ret == _Z_RES_OK) { // Build the reply context decorator. This is NOT the final reply. - _z_id_t zid = ((_z_session_t *)query->_zn)->_local_zid; + _z_id_t zid = zn->_local_zid; _z_keyexpr_t ke = _z_keyexpr_alias(keyexpr); _z_zenoh_message_t z_msg; switch (kind) { @@ -396,7 +397,7 @@ int8_t _z_send_reply(const _z_query_t *query, _z_keyexpr_t keyexpr, const _z_val default: return _Z_ERR_GENERIC; } - if (_z_send_n_msg(query->_zn, &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { + if (_z_send_n_msg(zn, &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { ret = _Z_ERR_TRANSPORT_TX_FAILED; } diff --git a/src/net/query.c b/src/net/query.c index df5b1c5ed..ae695fa81 100644 --- a/src/net/query.c +++ b/src/net/query.c @@ -23,22 +23,26 @@ _z_query_t _z_query_null(void) { ._parameters = NULL, ._request_id = 0, ._value = _z_value_null(), - ._zn = NULL, + ._zn = {.in = NULL}, }; } void _z_query_clear(_z_query_t *q) { - // Send REPLY_FINAL message - _z_zenoh_message_t z_msg = _z_n_msg_make_response_final(q->_request_id); - if (_z_send_n_msg(q->_zn, &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { - _Z_ERROR("Query send REPLY_FINAL transport failure !"); + // Check session as queries can't use session rc + if (_z_session_weak_check(&q->_zn)) { + // Send REPLY_FINAL message + _z_zenoh_message_t z_msg = _z_n_msg_make_response_final(q->_request_id); + if (_z_send_n_msg(&q->_zn.in->val, &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { + _Z_ERROR("Query send REPLY_FINAL transport failure !"); + } + _z_msg_clear(&z_msg); } // Clean up memory - _z_msg_clear(&z_msg); z_free(q->_parameters); _z_keyexpr_clear(&q->_key); _z_value_clear(&q->_value); _z_bytes_drop(&q->attachment); + _z_session_weak_drop(&q->_zn); } void _z_query_copy(_z_query_t *dst, const _z_query_t *src) { @@ -60,11 +64,11 @@ void _z_query_free(_z_query_t **query) { } #if Z_FEATURE_QUERYABLE == 1 -_z_query_t _z_query_create(_z_value_t *value, _z_keyexpr_t *key, const _z_slice_t *parameters, _z_session_t *zn, +_z_query_t _z_query_create(_z_value_t *value, _z_keyexpr_t *key, const _z_slice_t *parameters, _z_session_rc_t *zsrc, uint32_t request_id, const _z_bytes_t attachment) { _z_query_t q = _z_query_null(); q._request_id = request_id; - q._zn = zn; + q._zn = _z_session_rc_clone_as_weak(zsrc); q._parameters = (char *)z_malloc(parameters->len + 1); memcpy(q._parameters, parameters->start, parameters->len); q._parameters[parameters->len] = 0; diff --git a/src/net/session.c b/src/net/session.c index aa78b0e0d..fe7cd7953 100644 --- a/src/net/session.c +++ b/src/net/session.c @@ -37,7 +37,7 @@ #include "zenoh-pico/utils/logging.h" #include "zenoh-pico/utils/uuid.h" -int8_t __z_open_inner(_z_session_t *zn, char *locator, z_whatami_t mode) { +int8_t __z_open_inner(_z_session_rc_t *zn, char *locator, z_whatami_t mode) { int8_t ret = _Z_RES_OK; _z_id_t local_zid = _z_id_empty(); @@ -46,7 +46,7 @@ int8_t __z_open_inner(_z_session_t *zn, char *locator, z_whatami_t mode) { local_zid = _z_id_empty(); return ret; } - ret = _z_new_transport(&zn->_tp, &local_zid, locator, mode); + ret = _z_new_transport(&zn->in->val._tp, &local_zid, locator, mode); if (ret != _Z_RES_OK) { local_zid = _z_id_empty(); return ret; @@ -55,7 +55,7 @@ int8_t __z_open_inner(_z_session_t *zn, char *locator, z_whatami_t mode) { return ret; } -int8_t _z_open(_z_session_t *zn, _z_config_t *config) { +int8_t _z_open(_z_session_rc_t *zn, _z_config_t *config) { int8_t ret = _Z_RES_OK; _z_id_t zid = _z_id_empty(); diff --git a/src/session/queryable.c b/src/session/queryable.c index a936dbff0..165b3bacc 100644 --- a/src/session/queryable.c +++ b/src/session/queryable.c @@ -132,9 +132,10 @@ _z_session_queryable_rc_t *_z_register_session_queryable(_z_session_t *zn, _z_se return ret; } -int8_t _z_trigger_queryables(_z_session_t *zn, _z_msg_query_t *msgq, const _z_keyexpr_t q_key, uint32_t qid, +int8_t _z_trigger_queryables(_z_session_rc_t *zsrc, _z_msg_query_t *msgq, const _z_keyexpr_t q_key, uint32_t qid, const _z_bytes_t attachment) { int8_t ret = _Z_RES_OK; + _z_session_t *zn = &zsrc->in->val; _zp_session_lock_mutex(zn); @@ -146,7 +147,7 @@ int8_t _z_trigger_queryables(_z_session_t *zn, _z_msg_query_t *msgq, const _z_ke // Build the z_query _z_query_rc_t query = _z_query_rc_new(); - query.in->val = _z_query_create(&msgq->_ext_value, &key, &msgq->_parameters, zn, qid, attachment); + query.in->val = _z_query_create(&msgq->_ext_value, &key, &msgq->_parameters, zsrc, qid, attachment); // Parse session_queryable list _z_session_queryable_rc_list_t *xs = qles; while (xs != NULL) { diff --git a/src/session/rx.c b/src/session/rx.c index a3f789388..6ff784618 100644 --- a/src/session/rx.c +++ b/src/session/rx.c @@ -35,8 +35,9 @@ #include "zenoh-pico/utils/logging.h" /*------------------ Handle message ------------------*/ -int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint16_t local_peer_id) { +int8_t _z_handle_network_message(_z_session_rc_t *zsrc, _z_zenoh_message_t *msg, uint16_t local_peer_id) { int8_t ret = _Z_RES_OK; + _z_session_t *zn = &zsrc->in->val; switch (msg->_tag) { case _Z_N_DECLARE: { @@ -91,7 +92,7 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint case _Z_REQUEST_QUERY: { #if Z_FEATURE_QUERYABLE == 1 _z_msg_query_t *query = &req->_body._query; - ret = _z_trigger_queryables(zn, query, req->_key, (uint32_t)req->_rid, + ret = _z_trigger_queryables(zsrc, query, req->_key, (uint32_t)req->_rid, req->_body._query._ext_attachment); #else _Z_DEBUG("_Z_REQUEST_QUERY dropped, queryables not supported"); diff --git a/src/session/utils.c b/src/session/utils.c index 0db332c32..deb615ed7 100644 --- a/src/session/utils.c +++ b/src/session/utils.c @@ -51,8 +51,9 @@ int8_t _z_session_generate_zid(_z_id_t *bs, uint8_t size) { } /*------------------ Init/Free/Close session ------------------*/ -int8_t _z_session_init(_z_session_t *zn, _z_id_t *zid) { +int8_t _z_session_init(_z_session_rc_t *zsrc, _z_id_t *zid) { int8_t ret = _Z_RES_OK; + _z_session_t *zn = &zsrc->in->val; // Initialize the counters to 1 zn->_entity_id = 1; @@ -85,13 +86,13 @@ int8_t _z_session_init(_z_session_t *zn, _z_id_t *zid) { // Note session in transport switch (zn->_tp._type) { case _Z_TRANSPORT_UNICAST_TYPE: - zn->_tp._transport._unicast._session = zn; + zn->_tp._transport._unicast._session = zsrc; break; case _Z_TRANSPORT_MULTICAST_TYPE: - zn->_tp._transport._multicast._session = zn; + zn->_tp._transport._multicast._session = zsrc; break; case _Z_TRANSPORT_RAWETH_TYPE: - zn->_tp._transport._raweth._session = zn; + zn->_tp._transport._raweth._session = zsrc; break; default: break; diff --git a/src/transport/multicast/lease.c b/src/transport/multicast/lease.c index b1f331986..0f1675dec 100644 --- a/src/transport/multicast/lease.c +++ b/src/transport/multicast/lease.c @@ -29,7 +29,7 @@ int8_t _zp_multicast_send_join(_z_transport_multicast_t *ztm) { next_sn._val._plain._best_effort = ztm->_sn_tx_best_effort; next_sn._val._plain._reliable = ztm->_sn_tx_reliable; - _z_id_t zid = ((_z_session_t *)ztm->_session)->_local_zid; + _z_id_t zid = ztm->_session->in->val._local_zid; _z_transport_message_t jsm = _z_t_msg_make_join(Z_WHATAMI_PEER, Z_TRANSPORT_LEASE, zid, next_sn); return ztm->_send_f(ztm, &jsm); From f4c1b0ea4c6c9ef65e754df7c5be1d411194204a Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Fri, 5 Jul 2024 11:30:07 +0200 Subject: [PATCH 06/20] fix: memory leak on query consolidation edge case --- src/session/query.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/session/query.c b/src/session/query.c index 578d9ed6a..05996914f 100644 --- a/src/session/query.c +++ b/src/session/query.c @@ -115,10 +115,10 @@ int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, cons _z_reply_t reply = _z_reply_create(expanded_ke, Z_REPLY_TAG_DATA, zn->_local_zid, msg->_payload, &msg->_commons._timestamp, &msg->_encoding, kind, msg->_attachment); + _Bool drop = false; // Verify if this is a newer reply, free the old one in case it is if ((ret == _Z_RES_OK) && ((pen_qry->_consolidation == Z_CONSOLIDATION_MODE_LATEST) || (pen_qry->_consolidation == Z_CONSOLIDATION_MODE_MONOTONIC))) { - _Bool drop = false; _z_pending_reply_list_t *pen_rps = pen_qry->_pending_replies; _z_pending_reply_t *pen_rep = NULL; while (pen_rps != NULL) { @@ -167,9 +167,10 @@ int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, cons cb_reply = _z_reply_move(&reply); pen_qry->_callback(&cb_reply, pen_qry->_arg); _z_reply_clear(&cb_reply); + return ret; } - - if (ret != _Z_RES_OK) { + // Other cases + if (drop || (ret != _Z_RES_OK)) { _z_reply_clear(&reply); } From 16ac864cd4657d3eb9211537c4ea82c587d9f834 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Fri, 5 Jul 2024 15:30:57 +0200 Subject: [PATCH 07/20] fix: make refcount thread safe --- include/zenoh-pico/collections/refcount.h | 326 +++++++++++----------- 1 file changed, 169 insertions(+), 157 deletions(-) diff --git a/include/zenoh-pico/collections/refcount.h b/include/zenoh-pico/collections/refcount.h index c4cdbefda..30152e5ca 100644 --- a/include/zenoh-pico/collections/refcount.h +++ b/include/zenoh-pico/collections/refcount.h @@ -47,7 +47,7 @@ #define _ZP_RC_CNT_TYPE _z_atomic(unsigned int) #define _ZP_RC_OP_INIT_STRONG_CNT \ _z_atomic_store_explicit(&p.in->_strong_cnt, (unsigned int)1, _z_memory_order_relaxed); \ - _z_atomic_store_explicit(&p.in->_weak_cnt, (unsigned int)0, _z_memory_order_relaxed); + _z_atomic_store_explicit(&p.in->_weak_cnt, (unsigned int)1, _z_memory_order_relaxed); #define _ZP_RC_OP_INIT_WEAK_CNT \ _z_atomic_store_explicit(&p.in->_strong_cnt, (unsigned int)0, _z_memory_order_relaxed); \ _z_atomic_store_explicit(&p.in->_weak_cnt, (unsigned int)1, _z_memory_order_relaxed); @@ -55,11 +55,11 @@ _z_atomic_fetch_add_explicit(&p->in->_strong_cnt, (unsigned int)1, _z_memory_order_relaxed); #define _ZP_RC_OP_INCR_WEAK_CNT \ _z_atomic_fetch_add_explicit(&p->in->_weak_cnt, (unsigned int)1, _z_memory_order_relaxed); -#define _ZP_RC_OP_DECR_AND_CMP \ +#define _ZP_RC_OP_DECR_AND_CMP_STRONG \ _z_atomic_fetch_sub_explicit(&p->in->_strong_cnt, (unsigned int)1, _z_memory_order_release) > (unsigned int)1 -#define _ZP_RC_OP_DECR_WEAK_CNT \ - _z_atomic_fetch_sub_explicit(&p->in->_weak_cnt, (unsigned int)1, _z_memory_order_release); -#define _ZP_RC_OP_COMP_CNT(x, y) atomic_compare_exchange_strong(&x, &y, y) +#define _ZP_RC_OP_DECR_AND_CMP_WEAK \ + _z_atomic_fetch_sub_explicit(&p->in->_weak_cnt, (unsigned int)1, _z_memory_order_release) > (unsigned int)1 +#define _ZP_RC_OP_CHECK_STRONG_CNT atomic_compare_exchange_strong(&p->in->_strong_cnt, &cmp_val, cmp_val) #define _ZP_RC_OP_SYNC atomic_thread_fence(_z_memory_order_acquire); #else // ZENOH_C_STANDARD == 99 @@ -70,16 +70,17 @@ #define _ZP_RC_OP_INIT_STRONG_CNT \ __sync_fetch_and_and(&p.in->_strong_cnt, (unsigned int)0); \ __sync_fetch_and_add(&p.in->_strong_cnt, (unsigned int)1); \ - __sync_fetch_and_and(&p.in->_weak_cnt, (unsigned int)0); + __sync_fetch_and_and(&p.in->_weak_cnt, (unsigned int)0); \ + __sync_fetch_and_add(&p.in->_weak_cnt, (unsigned int)1); #define _ZP_RC_OP_INIT_WEAK_CNT \ __sync_fetch_and_and(&p.in->_weak_cnt, (unsigned int)0); \ __sync_fetch_and_add(&p.in->_weak_cnt, (unsigned int)1); \ __sync_fetch_and_and(&p.in->_strong_cnt, (unsigned int)0); #define _ZP_RC_OP_INCR_STRONG_CNT __sync_fetch_and_add(&p->in->_strong_cnt, (unsigned int)1); #define _ZP_RC_OP_INCR_WEAK_CNT __sync_fetch_and_add(&p->in->_weak_cnt, (unsigned int)1); -#define _ZP_RC_OP_DECR_AND_CMP __sync_fetch_and_sub(&p->in->_strong_cnt, (unsigned int)1) > (unsigned int)1 -#define _ZP_RC_OP_DECR_WEAK_CNT __sync_fetch_and_sub(&p->in->_weak_cnt, (unsigned int)1); -#define _ZP_RC_OP_COMP_CNT(x, y) __sync_bool_compare_and_swap(&x, y, y) +#define _ZP_RC_OP_DECR_AND_CMP_STRONG __sync_fetch_and_sub(&p->in->_strong_cnt, (unsigned int)1) > (unsigned int)1 +#define _ZP_RC_OP_DECR_AND_CMP_WEAK __sync_fetch_and_sub(&p->in->_weak_cnt, (unsigned int)1) > (unsigned int)1 +#define _ZP_RC_OP_CHECK_STRONG_CNT __sync_bool_compare_and_swap(&p->in->_strong_cnt, cmp_val, cmp_val) #define _ZP_RC_OP_SYNC __sync_synchronize(); #else // !ZENOH_COMPILER_GCC @@ -91,9 +92,9 @@ #define _ZP_RC_OP_INIT_WEAK_CNT #define _ZP_RC_OP_INCR_STRONG_CNT #define _ZP_RC_OP_INCR_WEAK_CNT -#define _ZP_RC_OP_DECR_AND_CMP true -#define _ZP_RC_OP_DECR_WEAK_CNT -#define _ZP_RC_OP_COMP_CNT(x, y) (x == y) +#define _ZP_RC_OP_DECR_AND_CMP_STRONG true +#define _ZP_RC_OP_DECR_AND_CMP_WEAK true +#define _ZP_RC_OP_CHECK_STRONG_CNT true #define _ZP_RC_OP_SYNC #endif // ZENOH_COMPILER_GCC @@ -104,161 +105,172 @@ #define _ZP_RC_CNT_TYPE unsigned int #define _ZP_RC_OP_INIT_STRONG_CNT \ p.in->_strong_cnt = (unsigned int)1; \ - p.in->_weak_cnt = (unsigned int)0; + p.in->_weak_cnt = (unsigned int)1; #define _ZP_RC_OP_INIT_WEAK_CNT \ p.in->_strong_cnt = (unsigned int)0; \ p.in->_weak_cnt = (unsigned int)1; #define _ZP_RC_OP_INCR_STRONG_CNT p->in->_strong_cnt += (unsigned int)1; #define _ZP_RC_OP_INCR_WEAK_CNT p->in->_weak_cnt += (unsigned int)1; -#define _ZP_RC_OP_DECR_AND_CMP p->in->_strong_cnt-- > (unsigned int)1 -#define _ZP_RC_OP_DECR_WEAK_CNT p->in->_weak_cnt--; -#define _ZP_RC_OP_COMP_CNT(x, y) (x == y) +#define _ZP_RC_OP_DECR_AND_CMP_STRONG p->in->_strong_cnt-- > (unsigned int)1 +#define _ZP_RC_OP_DECR_AND_CMP_WEAK p->in->_weak_cnt-- > (unsigned int)1 +#define _ZP_RC_OP_CHECK_STRONG_CNT (p->in->_strong_cnt == cmp_val) #define _ZP_RC_OP_SYNC #endif // Z_FEATURE_MULTI_THREAD == 1 /*------------------ Internal Array Macros ------------------*/ -#define _Z_REFCOUNT_DEFINE(name, type) \ - typedef struct name##_inner_rc_t { \ - type##_t val; \ - _ZP_RC_CNT_TYPE _strong_cnt; \ - _ZP_RC_CNT_TYPE _weak_cnt; \ - } name##_inner_rc_t; \ - typedef struct name##_rc_t { \ - name##_inner_rc_t *in; \ - } name##_rc_t; \ - typedef struct name##_weak_t { \ - name##_inner_rc_t *in; \ - } name##_weak_t; \ - static inline name##_rc_t name##_rc_null(void) { \ - name##_rc_t p; \ - p.in = NULL; \ - return p; \ - } \ - static inline name##_rc_t name##_rc_new(void) { \ - name##_rc_t p; \ - p.in = (name##_inner_rc_t *)z_malloc(sizeof(name##_inner_rc_t)); \ - if (p.in != NULL) { \ - memset(&p.in->val, 0, sizeof(type##_t)); \ - _ZP_RC_OP_INIT_STRONG_CNT \ - } \ - return p; \ - } \ - static inline name##_rc_t name##_rc_new_from_val(type##_t val) { \ - name##_rc_t p; \ - p.in = (name##_inner_rc_t *)z_malloc(sizeof(name##_inner_rc_t)); \ - if (p.in != NULL) { \ - p.in->val = val; \ - _ZP_RC_OP_INIT_STRONG_CNT \ - } \ - return p; \ - } \ - static inline name##_rc_t name##_rc_clone(const name##_rc_t *p) { \ - name##_rc_t c; \ - c.in = p->in; \ - _ZP_RC_OP_INCR_STRONG_CNT \ - return c; \ - } \ - static inline name##_rc_t *name##_rc_clone_as_ptr(const name##_rc_t *p) { \ - name##_rc_t *c = (name##_rc_t *)z_malloc(sizeof(name##_rc_t)); \ - if (c != NULL) { \ - c->in = p->in; \ - _ZP_RC_OP_INCR_STRONG_CNT \ - } \ - return c; \ - } \ - static inline name##_weak_t name##_rc_clone_as_weak(const name##_rc_t *p) { \ - name##_weak_t c; \ - c.in = p->in; \ - _ZP_RC_OP_INCR_WEAK_CNT \ - return c; \ - } \ - static inline name##_weak_t *name##_rc_clone_as_weak_ptr(const name##_rc_t *p) { \ - name##_weak_t *c = (name##_weak_t *)z_malloc(sizeof(name##_weak_t)); \ - if (c != NULL) { \ - c->in = p->in; \ - _ZP_RC_OP_INCR_WEAK_CNT \ - } \ - return c; \ - } \ - static inline void name##_rc_copy(name##_rc_t *dst, const name##_rc_t *p) { \ - dst->in = p->in; \ - _ZP_RC_OP_INCR_STRONG_CNT \ - } \ - static inline _Bool name##_rc_eq(const name##_rc_t *left, const name##_rc_t *right) { \ - return (left->in == right->in); \ - } \ - static inline _Bool name##_rc_drop(name##_rc_t *p) { \ - if ((p == NULL) || (p->in == NULL)) { \ - return false; \ - } \ - if (_ZP_RC_OP_DECR_AND_CMP) { \ - return false; \ - } \ - _ZP_RC_OP_SYNC \ - type##_clear(&p->in->val); \ - unsigned int cmp_val = 0; \ - if (_ZP_RC_OP_COMP_CNT(p->in->_strong_cnt, cmp_val) && _ZP_RC_OP_COMP_CNT(p->in->_weak_cnt, cmp_val)) { \ - z_free(p->in); \ - } \ - return true; \ - } \ - static inline name##_weak_t name##_weak_null(void) { \ - name##_weak_t p; \ - p.in = NULL; \ - return p; \ - } \ - static inline name##_weak_t name##_weak_new(void) { \ - name##_weak_t p; \ - p.in = (name##_inner_rc_t *)z_malloc(sizeof(name##_inner_rc_t)); \ - if (p.in != NULL) { \ - memset(&p.in->val, 0, sizeof(type##_t)); \ - _ZP_RC_OP_INIT_WEAK_CNT \ - } \ - return p; \ - } \ - static inline name##_weak_t name##_weak_new_from_val(type##_t val) { \ - name##_weak_t p; \ - p.in = (name##_inner_rc_t *)z_malloc(sizeof(name##_inner_rc_t)); \ - if (p.in != NULL) { \ - p.in->val = val; \ - _ZP_RC_OP_INIT_WEAK_CNT \ - } \ - return p; \ - } \ - static inline name##_weak_t name##_weak_clone(const name##_weak_t *p) { \ - name##_weak_t c; \ - c.in = p->in; \ - _ZP_RC_OP_INCR_WEAK_CNT \ - return c; \ - } \ - static inline void name##_weak_copy(name##_weak_t *dst, const name##_weak_t *p) { \ - dst->in = p->in; \ - _ZP_RC_OP_INCR_WEAK_CNT \ - } \ - static inline _Bool name##_weak_eq(const name##_weak_t *left, const name##_weak_t *right) { \ - return (left->in == right->in); \ - } \ - static inline _Bool name##_weak_check(const name##_weak_t *p) { \ - unsigned int cmp_val = 0; \ - return (!_ZP_RC_OP_COMP_CNT(p->in->_strong_cnt, cmp_val)); \ - } \ - static inline _Bool name##_weak_drop(name##_weak_t *p) { \ - if ((p == NULL) || (p->in == NULL)) { \ - return false; \ - } \ - _ZP_RC_OP_DECR_WEAK_CNT \ - unsigned int cmp_val = 0; \ - if (!name##_weak_check(p) && _ZP_RC_OP_COMP_CNT(p->in->_weak_cnt, cmp_val)) { \ - _ZP_RC_OP_SYNC \ - z_free(p->in); \ - return true; \ - } \ - return false; \ - } \ - static inline size_t name##_rc_size(name##_rc_t *p) { \ - _ZP_UNUSED(p); \ - return sizeof(name##_rc_t); \ +#define _Z_REFCOUNT_DEFINE(name, type) \ + typedef struct name##_inner_rc_t { \ + type##_t val; \ + _ZP_RC_CNT_TYPE _strong_cnt; \ + _ZP_RC_CNT_TYPE _weak_cnt; \ + } name##_inner_rc_t; \ + \ + typedef struct name##_rc_t { \ + name##_inner_rc_t *in; \ + } name##_rc_t; \ + \ + typedef struct name##_weak_t { \ + name##_inner_rc_t *in; \ + } name##_weak_t; \ + \ + static inline name##_rc_t name##_rc_null(void) { \ + name##_rc_t p; \ + p.in = NULL; \ + return p; \ + } \ + static inline name##_rc_t name##_rc_new(void) { \ + name##_rc_t p; \ + p.in = (name##_inner_rc_t *)z_malloc(sizeof(name##_inner_rc_t)); \ + if (p.in != NULL) { \ + memset(&p.in->val, 0, sizeof(type##_t)); \ + _ZP_RC_OP_INIT_STRONG_CNT \ + } \ + return p; \ + } \ + static inline name##_rc_t name##_rc_new_from_val(type##_t val) { \ + name##_rc_t p; \ + p.in = (name##_inner_rc_t *)z_malloc(sizeof(name##_inner_rc_t)); \ + if (p.in != NULL) { \ + p.in->val = val; \ + _ZP_RC_OP_INIT_STRONG_CNT \ + } \ + return p; \ + } \ + static inline name##_rc_t name##_rc_clone(const name##_rc_t *p) { \ + name##_rc_t c; \ + c.in = p->in; \ + _ZP_RC_OP_INCR_STRONG_CNT \ + _ZP_RC_OP_INCR_WEAK_CNT \ + return c; \ + } \ + static inline name##_rc_t *name##_rc_clone_as_ptr(const name##_rc_t *p) { \ + name##_rc_t *c = (name##_rc_t *)z_malloc(sizeof(name##_rc_t)); \ + if (c != NULL) { \ + c->in = p->in; \ + _ZP_RC_OP_INCR_STRONG_CNT \ + _ZP_RC_OP_INCR_WEAK_CNT \ + } \ + return c; \ + } \ + static inline name##_weak_t name##_rc_clone_as_weak(const name##_rc_t *p) { \ + name##_weak_t c; \ + c.in = p->in; \ + _ZP_RC_OP_INCR_WEAK_CNT \ + return c; \ + } \ + static inline name##_weak_t *name##_rc_clone_as_weak_ptr(const name##_rc_t *p) { \ + name##_weak_t *c = (name##_weak_t *)z_malloc(sizeof(name##_weak_t)); \ + if (c != NULL) { \ + c->in = p->in; \ + _ZP_RC_OP_INCR_WEAK_CNT \ + } \ + return c; \ + } \ + static inline void name##_rc_copy(name##_rc_t *dst, const name##_rc_t *p) { \ + dst->in = p->in; \ + _ZP_RC_OP_INCR_STRONG_CNT \ + _ZP_RC_OP_INCR_WEAK_CNT \ + } \ + static inline _Bool name##_rc_eq(const name##_rc_t *left, const name##_rc_t *right) { \ + return (left->in == right->in); \ + } \ + static inline _Bool name##_rc_drop(name##_rc_t *p) { \ + if ((p == NULL) || (p->in == NULL)) { \ + return false; \ + } \ + if (_ZP_RC_OP_DECR_AND_CMP_STRONG) { \ + if (_ZP_RC_OP_DECR_AND_CMP_WEAK) { \ + return false; \ + } \ + _ZP_RC_OP_SYNC \ + z_free(p->in); \ + return true; \ + } \ + _ZP_RC_OP_SYNC \ + type##_clear(&p->in->val); \ + \ + if (_ZP_RC_OP_DECR_AND_CMP_WEAK) { \ + return false; \ + } \ + _ZP_RC_OP_SYNC \ + z_free(p->in); \ + return true; \ + } \ + static inline name##_weak_t name##_weak_null(void) { \ + name##_weak_t p; \ + p.in = NULL; \ + return p; \ + } \ + static inline name##_weak_t name##_weak_new(void) { \ + name##_weak_t p; \ + p.in = (name##_inner_rc_t *)z_malloc(sizeof(name##_inner_rc_t)); \ + if (p.in != NULL) { \ + memset(&p.in->val, 0, sizeof(type##_t)); \ + _ZP_RC_OP_INIT_WEAK_CNT \ + } \ + return p; \ + } \ + static inline name##_weak_t name##_weak_new_from_val(type##_t val) { \ + name##_weak_t p; \ + p.in = (name##_inner_rc_t *)z_malloc(sizeof(name##_inner_rc_t)); \ + if (p.in != NULL) { \ + p.in->val = val; \ + _ZP_RC_OP_INIT_WEAK_CNT \ + } \ + return p; \ + } \ + static inline name##_weak_t name##_weak_clone(const name##_weak_t *p) { \ + name##_weak_t c; \ + c.in = p->in; \ + _ZP_RC_OP_INCR_WEAK_CNT \ + return c; \ + } \ + static inline void name##_weak_copy(name##_weak_t *dst, const name##_weak_t *p) { \ + dst->in = p->in; \ + _ZP_RC_OP_INCR_WEAK_CNT \ + } \ + static inline _Bool name##_weak_eq(const name##_weak_t *left, const name##_weak_t *right) { \ + return (left->in == right->in); \ + } \ + static inline _Bool name##_weak_check(const name##_weak_t *p) { \ + unsigned int cmp_val = 0; \ + return !_ZP_RC_OP_CHECK_STRONG_CNT; \ + } \ + static inline _Bool name##_weak_drop(name##_weak_t *p) { \ + if ((p == NULL) || (p->in == NULL)) { \ + return false; \ + } \ + if (_ZP_RC_OP_DECR_AND_CMP_WEAK) { \ + return false; \ + } \ + _ZP_RC_OP_SYNC \ + z_free(p->in); \ + return true; \ + } \ + static inline size_t name##_rc_size(name##_rc_t *p) { \ + _ZP_UNUSED(p); \ + return sizeof(name##_rc_t); \ } #endif /* ZENOH_PICO_COLLECTIONS_REFCOUNT_H */ From f5976b98681f9cdd139a6444a3fcb7bff1eec9bd Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Fri, 5 Jul 2024 16:53:11 +0200 Subject: [PATCH 08/20] fix: cong_ctrl not initialized in reply options default --- include/zenoh-pico/protocol/definitions/network.h | 2 +- src/api/api.c | 1 + src/protocol/codec/network.c | 5 ++--- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/include/zenoh-pico/protocol/definitions/network.h b/include/zenoh-pico/protocol/definitions/network.h index 1940a4660..dfaef9ce9 100644 --- a/include/zenoh-pico/protocol/definitions/network.h +++ b/include/zenoh-pico/protocol/definitions/network.h @@ -78,7 +78,7 @@ typedef _z_qos_t _z_n_qos_t; static inline _z_qos_t _z_n_qos_create(_Bool express, z_congestion_control_t congestion_control, z_priority_t priority) { _z_n_qos_t ret; - _Bool nodrop = congestion_control == Z_CONGESTION_CONTROL_DROP ? 0 : 1; + _Bool nodrop = (congestion_control == Z_CONGESTION_CONTROL_DROP ? false : true); ret._val = (uint8_t)((express << 4) | (nodrop << 3) | priority); return ret; } diff --git a/src/api/api.c b/src/api/api.c index 963ac7e36..25d4398d1 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -1294,6 +1294,7 @@ void z_query_reply_options_default(z_query_reply_options_t *options) { options->timestamp = NULL; options->is_express = false; options->attachment = NULL; + options->congestion_control = Z_CONGESTION_CONTROL_DEFAULT; } int8_t z_query_reply(const z_loaned_query_t *query, const z_loaned_keyexpr_t *keyexpr, z_owned_bytes_t *payload, diff --git a/src/protocol/codec/network.c b/src/protocol/codec/network.c index efee7bcfc..d57574e52 100644 --- a/src/protocol/codec/network.c +++ b/src/protocol/codec/network.c @@ -291,7 +291,6 @@ int8_t _z_response_encode(_z_wbuf_t *wbf, const _z_n_msg_response_t *msg) { _Z_RETURN_IF_ERR(_z_wbuf_write_bytes(wbf, msg->_ext_responder._zid.id, 0, zidlen)); _Z_RETURN_IF_ERR(_z_zsize_encode(wbf, msg->_ext_responder._eid)); } - switch (msg->_tag) { case _Z_RESPONSE_BODY_REPLY: { _Z_RETURN_IF_ERR(_z_reply_encode(wbf, &msg->_body._reply)); @@ -302,9 +301,9 @@ int8_t _z_response_encode(_z_wbuf_t *wbf, const _z_n_msg_response_t *msg) { break; } } - return ret; } + int8_t _z_response_decode_extension(_z_msg_ext_t *extension, void *ctx) { int8_t ret = _Z_RES_OK; _z_n_msg_response_t *msg = (_z_n_msg_response_t *)ctx; @@ -550,4 +549,4 @@ int8_t _z_network_message_decode(_z_network_message_t *msg, _z_zbuf_t *zbf) { default: return _Z_ERR_MESSAGE_DESERIALIZATION_FAILED; } -} +} \ No newline at end of file From dbc5f822c3ff54d1f6e1d9821b52e92c25529726 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Fri, 5 Jul 2024 16:55:37 +0200 Subject: [PATCH 09/20] fix: missing file newline eof --- src/protocol/codec/network.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/protocol/codec/network.c b/src/protocol/codec/network.c index d57574e52..9de920ae3 100644 --- a/src/protocol/codec/network.c +++ b/src/protocol/codec/network.c @@ -549,4 +549,4 @@ int8_t _z_network_message_decode(_z_network_message_t *msg, _z_zbuf_t *zbf) { default: return _Z_ERR_MESSAGE_DESERIALIZATION_FAILED; } -} \ No newline at end of file +} From 8503148226b2b1526e6d21bebbd43f7225e6ad2c Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Mon, 8 Jul 2024 14:41:34 +0200 Subject: [PATCH 10/20] feat: add refcount overflow protection --- include/zenoh-pico/collections/refcount.h | 35 +++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/include/zenoh-pico/collections/refcount.h b/include/zenoh-pico/collections/refcount.h index 30152e5ca..782b8f15c 100644 --- a/include/zenoh-pico/collections/refcount.h +++ b/include/zenoh-pico/collections/refcount.h @@ -60,6 +60,7 @@ #define _ZP_RC_OP_DECR_AND_CMP_WEAK \ _z_atomic_fetch_sub_explicit(&p->in->_weak_cnt, (unsigned int)1, _z_memory_order_release) > (unsigned int)1 #define _ZP_RC_OP_CHECK_STRONG_CNT atomic_compare_exchange_strong(&p->in->_strong_cnt, &cmp_val, cmp_val) +#define _ZP_RC_OP_CHECK_WEAK_CNT atomic_compare_exchange_strong(&p->in->_weak_cnt, &cmp_val, cmp_val) #define _ZP_RC_OP_SYNC atomic_thread_fence(_z_memory_order_acquire); #else // ZENOH_C_STANDARD == 99 @@ -81,6 +82,7 @@ #define _ZP_RC_OP_DECR_AND_CMP_STRONG __sync_fetch_and_sub(&p->in->_strong_cnt, (unsigned int)1) > (unsigned int)1 #define _ZP_RC_OP_DECR_AND_CMP_WEAK __sync_fetch_and_sub(&p->in->_weak_cnt, (unsigned int)1) > (unsigned int)1 #define _ZP_RC_OP_CHECK_STRONG_CNT __sync_bool_compare_and_swap(&p->in->_strong_cnt, cmp_val, cmp_val) +#define _ZP_RC_OP_CHECK_WEAK_CNT __sync_bool_compare_and_swap(&p->in->_weak_cnt, cmp_val, cmp_val) #define _ZP_RC_OP_SYNC __sync_synchronize(); #else // !ZENOH_COMPILER_GCC @@ -95,6 +97,7 @@ #define _ZP_RC_OP_DECR_AND_CMP_STRONG true #define _ZP_RC_OP_DECR_AND_CMP_WEAK true #define _ZP_RC_OP_CHECK_STRONG_CNT true +#define _ZP_RC_OP_CHECK_WEAK_CNT true #define _ZP_RC_OP_SYNC #endif // ZENOH_COMPILER_GCC @@ -114,6 +117,7 @@ #define _ZP_RC_OP_DECR_AND_CMP_STRONG p->in->_strong_cnt-- > (unsigned int)1 #define _ZP_RC_OP_DECR_AND_CMP_WEAK p->in->_weak_cnt-- > (unsigned int)1 #define _ZP_RC_OP_CHECK_STRONG_CNT (p->in->_strong_cnt == cmp_val) +#define _ZP_RC_OP_CHECK_WEAK_CNT (p->in->_weak_cnt == cmp_val) #define _ZP_RC_OP_SYNC #endif // Z_FEATURE_MULTI_THREAD == 1 @@ -159,12 +163,21 @@ } \ static inline name##_rc_t name##_rc_clone(const name##_rc_t *p) { \ name##_rc_t c; \ + unsigned int cmp_val = UINT32_MAX; \ + if (_ZP_RC_OP_CHECK_WEAK_CNT) { \ + c.in = NULL; \ + return c; \ + } \ c.in = p->in; \ _ZP_RC_OP_INCR_STRONG_CNT \ _ZP_RC_OP_INCR_WEAK_CNT \ return c; \ } \ static inline name##_rc_t *name##_rc_clone_as_ptr(const name##_rc_t *p) { \ + unsigned int cmp_val = UINT32_MAX; \ + if (_ZP_RC_OP_CHECK_WEAK_CNT) { \ + return NULL; \ + } \ name##_rc_t *c = (name##_rc_t *)z_malloc(sizeof(name##_rc_t)); \ if (c != NULL) { \ c->in = p->in; \ @@ -175,11 +188,20 @@ } \ static inline name##_weak_t name##_rc_clone_as_weak(const name##_rc_t *p) { \ name##_weak_t c; \ + unsigned int cmp_val = UINT32_MAX; \ + if (_ZP_RC_OP_CHECK_WEAK_CNT) { \ + c.in = NULL; \ + return c; \ + } \ c.in = p->in; \ _ZP_RC_OP_INCR_WEAK_CNT \ return c; \ } \ static inline name##_weak_t *name##_rc_clone_as_weak_ptr(const name##_rc_t *p) { \ + unsigned int cmp_val = UINT32_MAX; \ + if (_ZP_RC_OP_CHECK_WEAK_CNT) { \ + return NULL; \ + } \ name##_weak_t *c = (name##_weak_t *)z_malloc(sizeof(name##_weak_t)); \ if (c != NULL) { \ c->in = p->in; \ @@ -188,6 +210,10 @@ return c; \ } \ static inline void name##_rc_copy(name##_rc_t *dst, const name##_rc_t *p) { \ + unsigned int cmp_val = UINT32_MAX; \ + if (_ZP_RC_OP_CHECK_WEAK_CNT) { \ + return; \ + } \ dst->in = p->in; \ _ZP_RC_OP_INCR_STRONG_CNT \ _ZP_RC_OP_INCR_WEAK_CNT \ @@ -242,11 +268,20 @@ } \ static inline name##_weak_t name##_weak_clone(const name##_weak_t *p) { \ name##_weak_t c; \ + unsigned int cmp_val = UINT32_MAX; \ + if (_ZP_RC_OP_CHECK_WEAK_CNT) { \ + c.in = NULL; \ + return c; \ + } \ c.in = p->in; \ _ZP_RC_OP_INCR_WEAK_CNT \ return c; \ } \ static inline void name##_weak_copy(name##_weak_t *dst, const name##_weak_t *p) { \ + unsigned int cmp_val = UINT32_MAX; \ + if (_ZP_RC_OP_CHECK_WEAK_CNT) { \ + return; \ + } \ dst->in = p->in; \ _ZP_RC_OP_INCR_WEAK_CNT \ } \ From b9a51b19c377653e4a057b99c2b121008bbb4de6 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Mon, 8 Jul 2024 16:57:10 +0200 Subject: [PATCH 11/20] feat: remove weak_new functions --- include/zenoh-pico/collections/refcount.h | 41 ++++------------------- 1 file changed, 6 insertions(+), 35 deletions(-) diff --git a/include/zenoh-pico/collections/refcount.h b/include/zenoh-pico/collections/refcount.h index 782b8f15c..b99a37f99 100644 --- a/include/zenoh-pico/collections/refcount.h +++ b/include/zenoh-pico/collections/refcount.h @@ -45,12 +45,9 @@ // c11 atomic variant #define _ZP_RC_CNT_TYPE _z_atomic(unsigned int) -#define _ZP_RC_OP_INIT_STRONG_CNT \ +#define _ZP_RC_OP_INIT_CNT \ _z_atomic_store_explicit(&p.in->_strong_cnt, (unsigned int)1, _z_memory_order_relaxed); \ _z_atomic_store_explicit(&p.in->_weak_cnt, (unsigned int)1, _z_memory_order_relaxed); -#define _ZP_RC_OP_INIT_WEAK_CNT \ - _z_atomic_store_explicit(&p.in->_strong_cnt, (unsigned int)0, _z_memory_order_relaxed); \ - _z_atomic_store_explicit(&p.in->_weak_cnt, (unsigned int)1, _z_memory_order_relaxed); #define _ZP_RC_OP_INCR_STRONG_CNT \ _z_atomic_fetch_add_explicit(&p->in->_strong_cnt, (unsigned int)1, _z_memory_order_relaxed); #define _ZP_RC_OP_INCR_WEAK_CNT \ @@ -68,15 +65,11 @@ // c99 gcc sync builtin variant #define _ZP_RC_CNT_TYPE unsigned int -#define _ZP_RC_OP_INIT_STRONG_CNT \ +#define _ZP_RC_OP_INIT_CNT \ __sync_fetch_and_and(&p.in->_strong_cnt, (unsigned int)0); \ __sync_fetch_and_add(&p.in->_strong_cnt, (unsigned int)1); \ __sync_fetch_and_and(&p.in->_weak_cnt, (unsigned int)0); \ __sync_fetch_and_add(&p.in->_weak_cnt, (unsigned int)1); -#define _ZP_RC_OP_INIT_WEAK_CNT \ - __sync_fetch_and_and(&p.in->_weak_cnt, (unsigned int)0); \ - __sync_fetch_and_add(&p.in->_weak_cnt, (unsigned int)1); \ - __sync_fetch_and_and(&p.in->_strong_cnt, (unsigned int)0); #define _ZP_RC_OP_INCR_STRONG_CNT __sync_fetch_and_add(&p->in->_strong_cnt, (unsigned int)1); #define _ZP_RC_OP_INCR_WEAK_CNT __sync_fetch_and_add(&p->in->_weak_cnt, (unsigned int)1); #define _ZP_RC_OP_DECR_AND_CMP_STRONG __sync_fetch_and_sub(&p->in->_strong_cnt, (unsigned int)1) > (unsigned int)1 @@ -90,8 +83,7 @@ // None variant #error "Multi-thread refcount in C99 only exists for GCC, use GCC or C11 or deactivate multi-thread" #define _ZP_RC_CNT_TYPE unsigned int -#define _ZP_RC_OP_INIT_STRONG_CNT -#define _ZP_RC_OP_INIT_WEAK_CNT +#define _ZP_RC_OP_INIT_CNT #define _ZP_RC_OP_INCR_STRONG_CNT #define _ZP_RC_OP_INCR_WEAK_CNT #define _ZP_RC_OP_DECR_AND_CMP_STRONG true @@ -106,12 +98,9 @@ // Single thread variant #define _ZP_RC_CNT_TYPE unsigned int -#define _ZP_RC_OP_INIT_STRONG_CNT \ +#define _ZP_RC_OP_INIT_CNT \ p.in->_strong_cnt = (unsigned int)1; \ p.in->_weak_cnt = (unsigned int)1; -#define _ZP_RC_OP_INIT_WEAK_CNT \ - p.in->_strong_cnt = (unsigned int)0; \ - p.in->_weak_cnt = (unsigned int)1; #define _ZP_RC_OP_INCR_STRONG_CNT p->in->_strong_cnt += (unsigned int)1; #define _ZP_RC_OP_INCR_WEAK_CNT p->in->_weak_cnt += (unsigned int)1; #define _ZP_RC_OP_DECR_AND_CMP_STRONG p->in->_strong_cnt-- > (unsigned int)1 @@ -148,7 +137,7 @@ p.in = (name##_inner_rc_t *)z_malloc(sizeof(name##_inner_rc_t)); \ if (p.in != NULL) { \ memset(&p.in->val, 0, sizeof(type##_t)); \ - _ZP_RC_OP_INIT_STRONG_CNT \ + _ZP_RC_OP_INIT_CNT \ } \ return p; \ } \ @@ -157,7 +146,7 @@ p.in = (name##_inner_rc_t *)z_malloc(sizeof(name##_inner_rc_t)); \ if (p.in != NULL) { \ p.in->val = val; \ - _ZP_RC_OP_INIT_STRONG_CNT \ + _ZP_RC_OP_INIT_CNT \ } \ return p; \ } \ @@ -248,24 +237,6 @@ p.in = NULL; \ return p; \ } \ - static inline name##_weak_t name##_weak_new(void) { \ - name##_weak_t p; \ - p.in = (name##_inner_rc_t *)z_malloc(sizeof(name##_inner_rc_t)); \ - if (p.in != NULL) { \ - memset(&p.in->val, 0, sizeof(type##_t)); \ - _ZP_RC_OP_INIT_WEAK_CNT \ - } \ - return p; \ - } \ - static inline name##_weak_t name##_weak_new_from_val(type##_t val) { \ - name##_weak_t p; \ - p.in = (name##_inner_rc_t *)z_malloc(sizeof(name##_inner_rc_t)); \ - if (p.in != NULL) { \ - p.in->val = val; \ - _ZP_RC_OP_INIT_WEAK_CNT \ - } \ - return p; \ - } \ static inline name##_weak_t name##_weak_clone(const name##_weak_t *p) { \ name##_weak_t c; \ unsigned int cmp_val = UINT32_MAX; \ From 48a4f5b9ced3a03d17a297882d5685825ebc48aa Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Tue, 9 Jul 2024 11:17:13 +0200 Subject: [PATCH 12/20] feat: add weak_upgrade function --- include/zenoh-pico/collections/refcount.h | 54 ++++++++++++++++++++++- 1 file changed, 52 insertions(+), 2 deletions(-) diff --git a/include/zenoh-pico/collections/refcount.h b/include/zenoh-pico/collections/refcount.h index b99a37f99..0a15b3a29 100644 --- a/include/zenoh-pico/collections/refcount.h +++ b/include/zenoh-pico/collections/refcount.h @@ -29,6 +29,9 @@ #define _z_atomic_store_explicit atomic_store_explicit #define _z_atomic_fetch_add_explicit atomic_fetch_add_explicit #define _z_atomic_fetch_sub_explicit atomic_fetch_sub_explicit +#define _z_atomic_load_explicit atomic_load_explicit +#define _z_atomic_compare_exchange_strong atomic_compare_exchange_strong +#define _z_atomic_compare_exchange_weak_explicit atomic_compare_exchange_weak_explicit #define _z_memory_order_acquire memory_order_acquire #define _z_memory_order_release memory_order_release #define _z_memory_order_relaxed memory_order_relaxed @@ -38,6 +41,9 @@ #define _z_atomic_store_explicit std::atomic_store_explicit #define _z_atomic_fetch_add_explicit std::atomic_fetch_add_explicit #define _z_atomic_fetch_sub_explicit std::atomic_fetch_sub_explicit +#define _z_atomic_load_explicit std::atomic_load_explicit +#define _z_atomic_compare_exchange_strong std::atomic_compare_exchange_strong +#define _z_atomic_compare_exchange_weak_explicit std::atomic_compare_exchange_weak_explicit #define _z_memory_order_acquire std::memory_order_acquire #define _z_memory_order_release std::memory_order_release #define _z_memory_order_relaxed std::memory_order_relaxed @@ -56,9 +62,22 @@ _z_atomic_fetch_sub_explicit(&p->in->_strong_cnt, (unsigned int)1, _z_memory_order_release) > (unsigned int)1 #define _ZP_RC_OP_DECR_AND_CMP_WEAK \ _z_atomic_fetch_sub_explicit(&p->in->_weak_cnt, (unsigned int)1, _z_memory_order_release) > (unsigned int)1 -#define _ZP_RC_OP_CHECK_STRONG_CNT atomic_compare_exchange_strong(&p->in->_strong_cnt, &cmp_val, cmp_val) -#define _ZP_RC_OP_CHECK_WEAK_CNT atomic_compare_exchange_strong(&p->in->_weak_cnt, &cmp_val, cmp_val) +#define _ZP_RC_OP_CHECK_STRONG_CNT _z_atomic_compare_exchange_strong(&p->in->_strong_cnt, &cmp_val, cmp_val) +#define _ZP_RC_OP_CHECK_WEAK_CNT _z_atomic_compare_exchange_strong(&p->in->_weak_cnt, &cmp_val, cmp_val) #define _ZP_RC_OP_SYNC atomic_thread_fence(_z_memory_order_acquire); +#define _ZP_RC_OP_UPGRADE_CAS_LOOP \ + unsigned int prev = _z_atomic_load_explicit(&p->in->_strong_cnt, _z_memory_order_relaxed); \ + while ((prev != 0) && (prev < UINT32_MAX)) { \ + unsigned int next = prev + 1; \ + if (_z_atomic_compare_exchange_weak_explicit(&p->in->_strong_cnt, &prev, next, _z_memory_order_acquire, \ + _z_memory_order_relaxed)) { \ + _ZP_RC_OP_INCR_WEAK_CNT \ + c.in = p->in; \ + return c; \ + } else { \ + prev = _z_atomic_load_explicit(&p->in->_strong_cnt, _z_memory_order_relaxed); \ + } \ + } #else // ZENOH_C_STANDARD == 99 #ifdef ZENOH_COMPILER_GCC @@ -77,6 +96,18 @@ #define _ZP_RC_OP_CHECK_STRONG_CNT __sync_bool_compare_and_swap(&p->in->_strong_cnt, cmp_val, cmp_val) #define _ZP_RC_OP_CHECK_WEAK_CNT __sync_bool_compare_and_swap(&p->in->_weak_cnt, cmp_val, cmp_val) #define _ZP_RC_OP_SYNC __sync_synchronize(); +#define _ZP_RC_OP_UPGRADE_CAS_LOOP \ + unsigned int prev = __sync_fetch_and_add(&p->in->_strong_cnt, (unsigned int)0); \ + while ((prev != 0) && (prev < UINT32_MAX)) { \ + unsigned int next = prev + 1; \ + if (__sync_bool_compare_and_swap(&p->in->_strong_cnt, prev, next)) { \ + _ZP_RC_OP_INCR_WEAK_CNT \ + c.in = p->in; \ + return c; \ + } else { \ + prev = __sync_fetch_and_add(&p->in->_strong_cnt, (unsigned int)0); \ + } \ + } #else // !ZENOH_COMPILER_GCC @@ -91,6 +122,7 @@ #define _ZP_RC_OP_CHECK_STRONG_CNT true #define _ZP_RC_OP_CHECK_WEAK_CNT true #define _ZP_RC_OP_SYNC +#define _ZP_RC_OP_UPGRADE_CAS_LOOP #endif // ZENOH_COMPILER_GCC #endif // ZENOH_C_STANDARD != 99 @@ -108,6 +140,13 @@ #define _ZP_RC_OP_CHECK_STRONG_CNT (p->in->_strong_cnt == cmp_val) #define _ZP_RC_OP_CHECK_WEAK_CNT (p->in->_weak_cnt == cmp_val) #define _ZP_RC_OP_SYNC +#define _ZP_RC_OP_UPGRADE_CAS_LOOP \ + if ((p->in->_strong_cnt != 0) && (p->in->_strong_cnt < UINT32_MAX)) { \ + _ZP_RC_OP_INCR_STRONG_CNT \ + _ZP_RC_OP_INCR_WEAK_CNT \ + c.in = p->in; \ + return c; \ + } #endif // Z_FEATURE_MULTI_THREAD == 1 @@ -256,6 +295,17 @@ dst->in = p->in; \ _ZP_RC_OP_INCR_WEAK_CNT \ } \ + static inline name##_rc_t name##_weak_upgrade(name##_weak_t *p) { \ + name##_rc_t c; \ + unsigned int cmp_val = UINT32_MAX; \ + if (_ZP_RC_OP_CHECK_WEAK_CNT) { \ + c.in = NULL; \ + return c; \ + } \ + _ZP_RC_OP_UPGRADE_CAS_LOOP \ + c.in = NULL; \ + return c; \ + } \ static inline _Bool name##_weak_eq(const name##_weak_t *left, const name##_weak_t *right) { \ return (left->in == right->in); \ } \ From 985dcf5c19329fb9638c43ae13af4602b77133a7 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Tue, 9 Jul 2024 12:53:34 +0200 Subject: [PATCH 13/20] feat: add refcount test --- CMakeLists.txt | 3 + tests/z_refcount_test.c | 234 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 237 insertions(+) create mode 100644 tests/z_refcount_test.c diff --git a/CMakeLists.txt b/CMakeLists.txt index 1a3e01366..edeabe8e6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -379,6 +379,7 @@ if(UNIX OR MSVC) add_executable(z_api_bytes_test ${PROJECT_SOURCE_DIR}/tests/z_api_bytes_test.c) add_executable(z_api_encoding_test ${PROJECT_SOURCE_DIR}/tests/z_api_encoding_test.c) add_executable(z_api_config_test ${PROJECT_SOURCE_DIR}/tests/z_api_config_test.c) + add_executable(z_refcount_test ${PROJECT_SOURCE_DIR}/tests/z_refcount_test.c) target_link_libraries(z_data_struct_test ${Libname}) target_link_libraries(z_channels_test ${Libname}) @@ -397,6 +398,7 @@ if(UNIX OR MSVC) target_link_libraries(z_api_bytes_test ${Libname}) target_link_libraries(z_api_encoding_test ${Libname}) target_link_libraries(z_api_config_test ${Libname}) + target_link_libraries(z_refcount_test ${Libname}) configure_file(${PROJECT_SOURCE_DIR}/tests/modularity.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/modularity.py COPYONLY) configure_file(${PROJECT_SOURCE_DIR}/tests/raweth.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/raweth.py COPYONLY) @@ -418,6 +420,7 @@ if(UNIX OR MSVC) add_test(z_api_bytes_test ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/z_api_bytes_test) add_test(z_api_encoding_test ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/z_api_encoding_test) add_test(z_api_config_test ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/z_api_config_test) + add_test(z_refcount_test ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/z_refcount_test) endif() if(BUILD_MULTICAST) diff --git a/tests/z_refcount_test.c b/tests/z_refcount_test.c new file mode 100644 index 000000000..34f75b69e --- /dev/null +++ b/tests/z_refcount_test.c @@ -0,0 +1,234 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#include +#include +#include +#include + +#include "zenoh-pico/collections/refcount.h" +#include "zenoh-pico/system/platform-common.h" + +#undef NDEBUG +#include + +#define FOO_CLEARED_VALUE -1 + +typedef struct _dummy_t { + int foo; +} _dummy_t; + +void _dummy_clear(_dummy_t *val) { + val->foo = FOO_CLEARED_VALUE; + return; +} + +_Z_REFCOUNT_DEFINE(_dummy, _dummy) + +void test_rc_null(void) { + _dummy_rc_t drc = _dummy_rc_null(); + assert(drc.in == NULL); +} + +void test_rc_size(void) { assert(_dummy_rc_size(NULL) == sizeof(_dummy_rc_t)); } + +void test_rc_drop(void) { + _dummy_rc_t drc = _dummy_rc_null(); + assert(!_dummy_rc_drop(NULL)); + assert(!_dummy_rc_drop(&drc)); +} + +void test_rc_new(void) { + _dummy_rc_t drc = _dummy_rc_new(); + assert(drc.in != NULL); + assert(drc.in->_strong_cnt == 1); + assert(drc.in->_weak_cnt == 1); + assert(drc.in->val.foo == 0); + assert(_dummy_rc_drop(&drc)); +} + +void test_rc_new_from_val(void) { + _dummy_t val = {.foo = 42}; + _dummy_rc_t drc = _dummy_rc_new_from_val(val); + assert(drc.in != NULL); + assert(drc.in->_strong_cnt == 1); + assert(drc.in->_weak_cnt == 1); + assert(drc.in->val.foo == 42); + assert(_dummy_rc_drop(&drc)); +} + +void test_rc_clone(void) { + _dummy_t val = {.foo = 42}; + _dummy_rc_t drc1 = _dummy_rc_new_from_val(val); + assert(drc1.in->_strong_cnt == 1); + assert(drc1.in->_weak_cnt == 1); + + _dummy_rc_t drc2 = _dummy_rc_clone(&drc1); + assert(drc2.in != NULL); + assert(drc2.in->_strong_cnt == 2); + assert(drc2.in->_weak_cnt == 2); + assert(drc2.in->_strong_cnt == drc1.in->_strong_cnt); + assert(drc2.in->_weak_cnt == drc1.in->_weak_cnt); + assert(drc2.in->val.foo == drc1.in->val.foo); + + assert(!_dummy_rc_drop(&drc1)); + assert(drc2.in->_strong_cnt == 1); + assert(drc2.in->_weak_cnt == 1); + assert(drc2.in->val.foo == 42); + assert(_dummy_rc_drop(&drc2)); +} + +void test_rc_eq(void) { + _dummy_rc_t drc1 = _dummy_rc_new(); + _dummy_rc_t drc2 = _dummy_rc_clone(&drc1); + assert(_dummy_rc_eq(&drc1, &drc2)); + assert(!_dummy_rc_drop(&drc1)); + assert(_dummy_rc_drop(&drc2)); +} + +void test_rc_clone_as_ptr(void) { + _dummy_rc_t drc1 = _dummy_rc_new(); + _dummy_rc_t *drc2 = _dummy_rc_clone_as_ptr(&drc1); + assert(drc2 != NULL); + assert(drc2->in != NULL); + assert(drc2->in->_strong_cnt == 2); + assert(drc2->in->_weak_cnt == 2); + assert(_dummy_rc_eq(&drc1, drc2)); + assert(!_dummy_rc_drop(&drc1)); + assert(_dummy_rc_drop(drc2)); + z_free(drc2); +} + +void test_rc_copy(void) { + _dummy_rc_t drc1 = _dummy_rc_new(); + _dummy_rc_t drc2 = _dummy_rc_null(); + assert(!_dummy_rc_eq(&drc1, &drc2)); + _dummy_rc_copy(&drc2, &drc1); + assert(drc2.in->_strong_cnt == 2); + assert(drc2.in->_weak_cnt == 2); + assert(_dummy_rc_eq(&drc1, &drc2)); + assert(!_dummy_rc_drop(&drc2)); + assert(_dummy_rc_drop(&drc1)); +} + +void test_rc_clone_as_weak(void) { + _dummy_rc_t drc1 = _dummy_rc_new(); + _dummy_weak_t dwk1 = _dummy_rc_clone_as_weak(&drc1); + assert(dwk1.in != NULL); + assert(dwk1.in->_strong_cnt == 1); + assert(dwk1.in->_weak_cnt == 2); + assert(_dummy_weak_check(&dwk1)); + + assert(dwk1.in->val.foo == 0); + assert(!_dummy_rc_drop(&drc1)); + assert(dwk1.in->_strong_cnt == 0); + assert(dwk1.in->_weak_cnt == 1); + assert(!_dummy_weak_check(&dwk1)); + assert(dwk1.in->val.foo == FOO_CLEARED_VALUE); + assert(_dummy_weak_drop(&dwk1)); +} + +void test_rc_clone_as_weak_ptr(void) { + _dummy_rc_t drc1 = _dummy_rc_new(); + _dummy_weak_t *dwk1 = _dummy_rc_clone_as_weak_ptr(&drc1); + assert(dwk1 != NULL); + assert(dwk1->in != NULL); + assert(dwk1->in->_strong_cnt == 1); + assert(dwk1->in->_weak_cnt == 2); + assert(_dummy_weak_check(dwk1)); + + assert(!_dummy_rc_drop(&drc1)); + assert(dwk1->in->_strong_cnt == 0); + assert(dwk1->in->_weak_cnt == 1); + assert(!_dummy_weak_check(dwk1)); + assert(_dummy_weak_drop(dwk1)); + z_free(dwk1); +} + +void test_weak_null(void) { + _dummy_weak_t dwk = _dummy_weak_null(); + assert(dwk.in == NULL); +} + +void test_weak_clone(void) { + _dummy_rc_t drc1 = _dummy_rc_new(); + _dummy_weak_t dwk1 = _dummy_rc_clone_as_weak(&drc1); + assert(dwk1.in->_strong_cnt == 1); + assert(dwk1.in->_weak_cnt == 2); + + _dummy_weak_t dwk2 = _dummy_weak_clone(&dwk1); + assert(dwk2.in->_strong_cnt == 1); + assert(dwk2.in->_weak_cnt == 3); + + assert(!_dummy_rc_drop(&drc1)); + assert(dwk2.in->_strong_cnt == 0); + assert(dwk2.in->_weak_cnt == 2); + + assert(_dummy_weak_eq(&dwk1, &dwk2)); + assert(!_dummy_weak_drop(&dwk2)); + assert(_dummy_weak_drop(&dwk1)); +} + +void test_weak_copy(void) { + _dummy_rc_t drc1 = _dummy_rc_new(); + _dummy_weak_t dwk1 = _dummy_rc_clone_as_weak(&drc1); + _dummy_weak_t dwk2 = _dummy_weak_null(); + assert(!_dummy_weak_eq(&dwk1, &dwk2)); + + _dummy_weak_copy(&dwk2, &dwk1); + assert(_dummy_weak_eq(&dwk1, &dwk2)); + assert(dwk2.in->_strong_cnt == 1); + assert(dwk2.in->_weak_cnt == 3); + + assert(!_dummy_weak_drop(&dwk1)); + assert(!_dummy_weak_drop(&dwk2)); + assert(_dummy_rc_drop(&drc1)); +} + +void test_weak_upgrade(void) { + _dummy_rc_t drc1 = _dummy_rc_new(); + _dummy_weak_t dwk1 = _dummy_rc_clone_as_weak(&drc1); + + // Valid upgrade + _dummy_rc_t drc2 = _dummy_weak_upgrade(&dwk1); + assert(drc2.in != NULL); + assert(drc2.in->_strong_cnt == 2); + assert(drc2.in->_weak_cnt == 3); + assert(!_dummy_rc_drop(&drc1)); + assert(!_dummy_rc_drop(&drc2)); + + // Failed upgrade + _dummy_rc_t drc3 = _dummy_weak_upgrade(&dwk1); + assert(drc3.in == NULL); + assert(_dummy_weak_drop(&dwk1)); +} + +int main(void) { + test_rc_null(); + test_rc_size(); + test_rc_drop(); + test_rc_new(); + test_rc_new_from_val(); + test_rc_clone(); + test_rc_eq(); + test_rc_clone_as_ptr(); + test_rc_copy(); + test_rc_clone_as_weak(); + test_rc_clone_as_weak_ptr(); + test_weak_null(); + test_weak_clone(); + test_weak_copy(); + test_weak_upgrade(); + return 0; +} From e47da6c499830754199f504f0ed7734c210c9864 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Tue, 9 Jul 2024 13:09:04 +0200 Subject: [PATCH 14/20] fix: rebase shenanigans --- src/api/api.c | 1 - 1 file changed, 1 deletion(-) diff --git a/src/api/api.c b/src/api/api.c index 25d4398d1..963ac7e36 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -1294,7 +1294,6 @@ void z_query_reply_options_default(z_query_reply_options_t *options) { options->timestamp = NULL; options->is_express = false; options->attachment = NULL; - options->congestion_control = Z_CONGESTION_CONTROL_DEFAULT; } int8_t z_query_reply(const z_loaned_query_t *query, const z_loaned_keyexpr_t *keyexpr, z_owned_bytes_t *payload, From 8928c8d15669f1fbb0db1aab53155a21d5c98c62 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Tue, 9 Jul 2024 16:21:31 +0200 Subject: [PATCH 15/20] feat: make overflow check thread safe --- include/zenoh-pico/collections/refcount.h | 169 +++++++++++----------- 1 file changed, 84 insertions(+), 85 deletions(-) diff --git a/include/zenoh-pico/collections/refcount.h b/include/zenoh-pico/collections/refcount.h index 0a15b3a29..dd659e206 100644 --- a/include/zenoh-pico/collections/refcount.h +++ b/include/zenoh-pico/collections/refcount.h @@ -18,8 +18,11 @@ #include #include +#include "zenoh-pico/utils/logging.h" #include "zenoh-pico/utils/result.h" +#define _Z_RC_MAX_COUNT INT32_MAX // Based on Rust lazy overflow check + #if Z_FEATURE_MULTI_THREAD == 1 #if ZENOH_C_STANDARD != 99 @@ -56,27 +59,27 @@ _z_atomic_store_explicit(&p.in->_weak_cnt, (unsigned int)1, _z_memory_order_relaxed); #define _ZP_RC_OP_INCR_STRONG_CNT \ _z_atomic_fetch_add_explicit(&p->in->_strong_cnt, (unsigned int)1, _z_memory_order_relaxed); -#define _ZP_RC_OP_INCR_WEAK_CNT \ - _z_atomic_fetch_add_explicit(&p->in->_weak_cnt, (unsigned int)1, _z_memory_order_relaxed); -#define _ZP_RC_OP_DECR_AND_CMP_STRONG \ - _z_atomic_fetch_sub_explicit(&p->in->_strong_cnt, (unsigned int)1, _z_memory_order_release) > (unsigned int)1 -#define _ZP_RC_OP_DECR_AND_CMP_WEAK \ - _z_atomic_fetch_sub_explicit(&p->in->_weak_cnt, (unsigned int)1, _z_memory_order_release) > (unsigned int)1 -#define _ZP_RC_OP_CHECK_STRONG_CNT _z_atomic_compare_exchange_strong(&p->in->_strong_cnt, &cmp_val, cmp_val) -#define _ZP_RC_OP_CHECK_WEAK_CNT _z_atomic_compare_exchange_strong(&p->in->_weak_cnt, &cmp_val, cmp_val) +#define _ZP_RC_OP_INCR_AND_CMP_WEAK(x) \ + _z_atomic_fetch_add_explicit(&p->in->_weak_cnt, (unsigned int)1, _z_memory_order_relaxed) >= x +#define _ZP_RC_OP_DECR_AND_CMP_STRONG(x) \ + _z_atomic_fetch_sub_explicit(&p->in->_strong_cnt, (unsigned int)1, _z_memory_order_release) > (unsigned int)x +#define _ZP_RC_OP_DECR_AND_CMP_WEAK(x) \ + _z_atomic_fetch_sub_explicit(&p->in->_weak_cnt, (unsigned int)1, _z_memory_order_release) > (unsigned int)x +#define _ZP_RC_OP_CHECK_STRONG_CNT(x) _z_atomic_compare_exchange_strong(&p->in->_strong_cnt, &x, x) #define _ZP_RC_OP_SYNC atomic_thread_fence(_z_memory_order_acquire); -#define _ZP_RC_OP_UPGRADE_CAS_LOOP \ - unsigned int prev = _z_atomic_load_explicit(&p->in->_strong_cnt, _z_memory_order_relaxed); \ - while ((prev != 0) && (prev < UINT32_MAX)) { \ - unsigned int next = prev + 1; \ - if (_z_atomic_compare_exchange_weak_explicit(&p->in->_strong_cnt, &prev, next, _z_memory_order_acquire, \ - _z_memory_order_relaxed)) { \ - _ZP_RC_OP_INCR_WEAK_CNT \ - c.in = p->in; \ - return c; \ - } else { \ - prev = _z_atomic_load_explicit(&p->in->_strong_cnt, _z_memory_order_relaxed); \ - } \ +#define _ZP_RC_OP_UPGRADE_CAS_LOOP \ + unsigned int prev = _z_atomic_load_explicit(&p->in->_strong_cnt, _z_memory_order_relaxed); \ + while ((prev != 0) && (prev < UINT32_MAX)) { \ + if (_z_atomic_compare_exchange_weak_explicit(&p->in->_strong_cnt, &prev, prev + 1, _z_memory_order_acquire, \ + _z_memory_order_relaxed)) { \ + if (_ZP_RC_OP_INCR_AND_CMP_WEAK(_Z_RC_MAX_COUNT)) { \ + _Z_ERROR("Rc weak count overflow"); \ + c.in = NULL; \ + return c; \ + } \ + c.in = p->in; \ + return c; \ + } \ } #else // ZENOH_C_STANDARD == 99 @@ -90,18 +93,20 @@ __sync_fetch_and_and(&p.in->_weak_cnt, (unsigned int)0); \ __sync_fetch_and_add(&p.in->_weak_cnt, (unsigned int)1); #define _ZP_RC_OP_INCR_STRONG_CNT __sync_fetch_and_add(&p->in->_strong_cnt, (unsigned int)1); -#define _ZP_RC_OP_INCR_WEAK_CNT __sync_fetch_and_add(&p->in->_weak_cnt, (unsigned int)1); -#define _ZP_RC_OP_DECR_AND_CMP_STRONG __sync_fetch_and_sub(&p->in->_strong_cnt, (unsigned int)1) > (unsigned int)1 -#define _ZP_RC_OP_DECR_AND_CMP_WEAK __sync_fetch_and_sub(&p->in->_weak_cnt, (unsigned int)1) > (unsigned int)1 -#define _ZP_RC_OP_CHECK_STRONG_CNT __sync_bool_compare_and_swap(&p->in->_strong_cnt, cmp_val, cmp_val) -#define _ZP_RC_OP_CHECK_WEAK_CNT __sync_bool_compare_and_swap(&p->in->_weak_cnt, cmp_val, cmp_val) +#define _ZP_RC_OP_INCR_AND_CMP_WEAK(x) __sync_fetch_and_add(&p->in->_weak_cnt, (unsigned int)1) >= x +#define _ZP_RC_OP_DECR_AND_CMP_STRONG(x) __sync_fetch_and_sub(&p->in->_strong_cnt, (unsigned int)1) > (unsigned int)x +#define _ZP_RC_OP_DECR_AND_CMP_WEAK(x) __sync_fetch_and_sub(&p->in->_weak_cnt, (unsigned int)1) > (unsigned int)x +#define _ZP_RC_OP_CHECK_STRONG_CNT(x) __sync_bool_compare_and_swap(&p->in->_strong_cnt, x, x) #define _ZP_RC_OP_SYNC __sync_synchronize(); #define _ZP_RC_OP_UPGRADE_CAS_LOOP \ unsigned int prev = __sync_fetch_and_add(&p->in->_strong_cnt, (unsigned int)0); \ - while ((prev != 0) && (prev < UINT32_MAX)) { \ - unsigned int next = prev + 1; \ - if (__sync_bool_compare_and_swap(&p->in->_strong_cnt, prev, next)) { \ - _ZP_RC_OP_INCR_WEAK_CNT \ + while ((prev != 0) && (prev < _Z_RC_MAX_COUNT)) { \ + if (__sync_bool_compare_and_swap(&p->in->_strong_cnt, prev, prev + 1)) { \ + if (_ZP_RC_OP_INCR_AND_CMP_WEAK(_Z_RC_MAX_COUNT)) { \ + _Z_ERROR("Rc weak count overflow"); \ + c.in = NULL; \ + return c; \ + } \ c.in = p->in; \ return c; \ } else { \ @@ -116,13 +121,12 @@ #define _ZP_RC_CNT_TYPE unsigned int #define _ZP_RC_OP_INIT_CNT #define _ZP_RC_OP_INCR_STRONG_CNT -#define _ZP_RC_OP_INCR_WEAK_CNT -#define _ZP_RC_OP_DECR_AND_CMP_STRONG true -#define _ZP_RC_OP_DECR_AND_CMP_WEAK true -#define _ZP_RC_OP_CHECK_STRONG_CNT true -#define _ZP_RC_OP_CHECK_WEAK_CNT true +#define _ZP_RC_OP_INCR_AND_CMP_WEAK(x) (x == 0) +#define _ZP_RC_OP_DECR_AND_CMP_STRONG(x) (x == 0) +#define _ZP_RC_OP_DECR_AND_CMP_WEAK(x) (x == 0) +#define _ZP_RC_OP_CHECK_STRONG_CNT(x) (x == 0) && (p != NULL) #define _ZP_RC_OP_SYNC -#define _ZP_RC_OP_UPGRADE_CAS_LOOP +#define _ZP_RC_OP_UPGRADE_CAS_LOOP (void)p; #endif // ZENOH_COMPILER_GCC #endif // ZENOH_C_STANDARD != 99 @@ -133,19 +137,22 @@ #define _ZP_RC_OP_INIT_CNT \ p.in->_strong_cnt = (unsigned int)1; \ p.in->_weak_cnt = (unsigned int)1; -#define _ZP_RC_OP_INCR_STRONG_CNT p->in->_strong_cnt += (unsigned int)1; -#define _ZP_RC_OP_INCR_WEAK_CNT p->in->_weak_cnt += (unsigned int)1; -#define _ZP_RC_OP_DECR_AND_CMP_STRONG p->in->_strong_cnt-- > (unsigned int)1 -#define _ZP_RC_OP_DECR_AND_CMP_WEAK p->in->_weak_cnt-- > (unsigned int)1 -#define _ZP_RC_OP_CHECK_STRONG_CNT (p->in->_strong_cnt == cmp_val) -#define _ZP_RC_OP_CHECK_WEAK_CNT (p->in->_weak_cnt == cmp_val) +#define _ZP_RC_OP_INCR_STRONG_CNT p->in->_strong_cnt++; +#define _ZP_RC_OP_INCR_AND_CMP_WEAK(x) p->in->_weak_cnt++ >= x +#define _ZP_RC_OP_DECR_AND_CMP_STRONG(x) p->in->_strong_cnt-- > (unsigned int)x +#define _ZP_RC_OP_DECR_AND_CMP_WEAK(x) p->in->_weak_cnt-- > (unsigned int)x +#define _ZP_RC_OP_CHECK_STRONG_CNT(x) (p->in->_strong_cnt == x) #define _ZP_RC_OP_SYNC -#define _ZP_RC_OP_UPGRADE_CAS_LOOP \ - if ((p->in->_strong_cnt != 0) && (p->in->_strong_cnt < UINT32_MAX)) { \ - _ZP_RC_OP_INCR_STRONG_CNT \ - _ZP_RC_OP_INCR_WEAK_CNT \ - c.in = p->in; \ - return c; \ +#define _ZP_RC_OP_UPGRADE_CAS_LOOP \ + if ((p->in->_strong_cnt != 0) && (p->in->_strong_cnt < _Z_RC_MAX_COUNT)) { \ + if (_ZP_RC_OP_INCR_AND_CMP_WEAK(_Z_RC_MAX_COUNT)) { \ + _Z_ERROR("Rc weak count overflow"); \ + c.in = NULL; \ + return c; \ + } \ + _ZP_RC_OP_INCR_STRONG_CNT \ + c.in = p->in; \ + return c; \ } #endif // Z_FEATURE_MULTI_THREAD == 1 @@ -191,60 +198,58 @@ } \ static inline name##_rc_t name##_rc_clone(const name##_rc_t *p) { \ name##_rc_t c; \ - unsigned int cmp_val = UINT32_MAX; \ - if (_ZP_RC_OP_CHECK_WEAK_CNT) { \ + if (_ZP_RC_OP_INCR_AND_CMP_WEAK(_Z_RC_MAX_COUNT)) { \ + _Z_ERROR("Rc weak count overflow"); \ c.in = NULL; \ return c; \ } \ - c.in = p->in; \ _ZP_RC_OP_INCR_STRONG_CNT \ - _ZP_RC_OP_INCR_WEAK_CNT \ + c.in = p->in; \ return c; \ } \ static inline name##_rc_t *name##_rc_clone_as_ptr(const name##_rc_t *p) { \ - unsigned int cmp_val = UINT32_MAX; \ - if (_ZP_RC_OP_CHECK_WEAK_CNT) { \ - return NULL; \ - } \ name##_rc_t *c = (name##_rc_t *)z_malloc(sizeof(name##_rc_t)); \ if (c != NULL) { \ - c->in = p->in; \ + if (_ZP_RC_OP_INCR_AND_CMP_WEAK(_Z_RC_MAX_COUNT)) { \ + _Z_ERROR("Rc weak count overflow"); \ + z_free(c); \ + return NULL; \ + } \ _ZP_RC_OP_INCR_STRONG_CNT \ - _ZP_RC_OP_INCR_WEAK_CNT \ + c->in = p->in; \ } \ return c; \ } \ static inline name##_weak_t name##_rc_clone_as_weak(const name##_rc_t *p) { \ name##_weak_t c; \ - unsigned int cmp_val = UINT32_MAX; \ - if (_ZP_RC_OP_CHECK_WEAK_CNT) { \ + if (_ZP_RC_OP_INCR_AND_CMP_WEAK(_Z_RC_MAX_COUNT)) { \ + _Z_ERROR("Rc weak count overflow"); \ c.in = NULL; \ return c; \ } \ c.in = p->in; \ - _ZP_RC_OP_INCR_WEAK_CNT \ return c; \ } \ static inline name##_weak_t *name##_rc_clone_as_weak_ptr(const name##_rc_t *p) { \ - unsigned int cmp_val = UINT32_MAX; \ - if (_ZP_RC_OP_CHECK_WEAK_CNT) { \ - return NULL; \ - } \ name##_weak_t *c = (name##_weak_t *)z_malloc(sizeof(name##_weak_t)); \ if (c != NULL) { \ + if (_ZP_RC_OP_INCR_AND_CMP_WEAK(_Z_RC_MAX_COUNT)) { \ + _Z_ERROR("Rc weak count overflow"); \ + z_free(c); \ + return NULL; \ + } \ c->in = p->in; \ - _ZP_RC_OP_INCR_WEAK_CNT \ } \ return c; \ } \ static inline void name##_rc_copy(name##_rc_t *dst, const name##_rc_t *p) { \ - unsigned int cmp_val = UINT32_MAX; \ - if (_ZP_RC_OP_CHECK_WEAK_CNT) { \ + if (_ZP_RC_OP_INCR_AND_CMP_WEAK(_Z_RC_MAX_COUNT)) { \ + _Z_ERROR("Rc weak count overflow"); \ + dst->in = NULL; \ return; \ } \ - dst->in = p->in; \ _ZP_RC_OP_INCR_STRONG_CNT \ - _ZP_RC_OP_INCR_WEAK_CNT \ + dst->in = p->in; \ } \ static inline _Bool name##_rc_eq(const name##_rc_t *left, const name##_rc_t *right) { \ return (left->in == right->in); \ @@ -253,8 +258,8 @@ if ((p == NULL) || (p->in == NULL)) { \ return false; \ } \ - if (_ZP_RC_OP_DECR_AND_CMP_STRONG) { \ - if (_ZP_RC_OP_DECR_AND_CMP_WEAK) { \ + if (_ZP_RC_OP_DECR_AND_CMP_STRONG(1)) { \ + if (_ZP_RC_OP_DECR_AND_CMP_WEAK(1)) { \ return false; \ } \ _ZP_RC_OP_SYNC \ @@ -264,7 +269,7 @@ _ZP_RC_OP_SYNC \ type##_clear(&p->in->val); \ \ - if (_ZP_RC_OP_DECR_AND_CMP_WEAK) { \ + if (_ZP_RC_OP_DECR_AND_CMP_WEAK(1)) { \ return false; \ } \ _ZP_RC_OP_SYNC \ @@ -278,30 +283,24 @@ } \ static inline name##_weak_t name##_weak_clone(const name##_weak_t *p) { \ name##_weak_t c; \ - unsigned int cmp_val = UINT32_MAX; \ - if (_ZP_RC_OP_CHECK_WEAK_CNT) { \ + if (_ZP_RC_OP_INCR_AND_CMP_WEAK(_Z_RC_MAX_COUNT)) { \ + _Z_ERROR("Rc weak count overflow"); \ c.in = NULL; \ return c; \ } \ c.in = p->in; \ - _ZP_RC_OP_INCR_WEAK_CNT \ return c; \ } \ static inline void name##_weak_copy(name##_weak_t *dst, const name##_weak_t *p) { \ - unsigned int cmp_val = UINT32_MAX; \ - if (_ZP_RC_OP_CHECK_WEAK_CNT) { \ + if (_ZP_RC_OP_INCR_AND_CMP_WEAK(_Z_RC_MAX_COUNT)) { \ + _Z_ERROR("Rc weak count overflow"); \ + dst->in = NULL; \ return; \ } \ dst->in = p->in; \ - _ZP_RC_OP_INCR_WEAK_CNT \ } \ static inline name##_rc_t name##_weak_upgrade(name##_weak_t *p) { \ name##_rc_t c; \ - unsigned int cmp_val = UINT32_MAX; \ - if (_ZP_RC_OP_CHECK_WEAK_CNT) { \ - c.in = NULL; \ - return c; \ - } \ _ZP_RC_OP_UPGRADE_CAS_LOOP \ c.in = NULL; \ return c; \ @@ -311,13 +310,13 @@ } \ static inline _Bool name##_weak_check(const name##_weak_t *p) { \ unsigned int cmp_val = 0; \ - return !_ZP_RC_OP_CHECK_STRONG_CNT; \ + return !_ZP_RC_OP_CHECK_STRONG_CNT(cmp_val); \ } \ static inline _Bool name##_weak_drop(name##_weak_t *p) { \ if ((p == NULL) || (p->in == NULL)) { \ return false; \ } \ - if (_ZP_RC_OP_DECR_AND_CMP_WEAK) { \ + if (_ZP_RC_OP_DECR_AND_CMP_WEAK(1)) { \ return false; \ } \ _ZP_RC_OP_SYNC \ From 62d7e115346ef7fe4874bb58e5d87b4f358d0964 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Tue, 9 Jul 2024 16:21:43 +0200 Subject: [PATCH 16/20] test: add rc overflow test --- tests/z_refcount_test.c | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/tests/z_refcount_test.c b/tests/z_refcount_test.c index 34f75b69e..e08508ba4 100644 --- a/tests/z_refcount_test.c +++ b/tests/z_refcount_test.c @@ -214,6 +214,18 @@ void test_weak_upgrade(void) { assert(_dummy_weak_drop(&dwk1)); } +void test_overflow(void) { + _dummy_rc_t drc1 = _dummy_rc_new(); + // Artificially set weak count to max value + drc1.in->_weak_cnt = INT32_MAX; + + _dummy_rc_t drc2 = _dummy_rc_clone(&drc1); + assert(drc2.in == NULL); + + _dummy_weak_t dwk1 = _dummy_rc_clone_as_weak(&drc1); + assert(dwk1.in == NULL); +} + int main(void) { test_rc_null(); test_rc_size(); @@ -230,5 +242,6 @@ int main(void) { test_weak_clone(); test_weak_copy(); test_weak_upgrade(); + test_overflow(); return 0; } From d0e8a6a18118c35eec83f6919870300d74636137 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Tue, 9 Jul 2024 17:01:37 +0200 Subject: [PATCH 17/20] fix: remove qos ternary --- include/zenoh-pico/protocol/definitions/network.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/zenoh-pico/protocol/definitions/network.h b/include/zenoh-pico/protocol/definitions/network.h index dfaef9ce9..aa8f3664c 100644 --- a/include/zenoh-pico/protocol/definitions/network.h +++ b/include/zenoh-pico/protocol/definitions/network.h @@ -78,7 +78,7 @@ typedef _z_qos_t _z_n_qos_t; static inline _z_qos_t _z_n_qos_create(_Bool express, z_congestion_control_t congestion_control, z_priority_t priority) { _z_n_qos_t ret; - _Bool nodrop = (congestion_control == Z_CONGESTION_CONTROL_DROP ? false : true); + _Bool nodrop = (congestion_control != Z_CONGESTION_CONTROL_DROP); ret._val = (uint8_t)((express << 4) | (nodrop << 3) | priority); return ret; } From a9e4e3411ea8f40dda2362abcb31f629e13d3490 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Tue, 9 Jul 2024 17:13:28 +0200 Subject: [PATCH 18/20] feat: use upgrade instead of weak_check --- include/zenoh-pico/collections/refcount.h | 4 ---- src/api/api.c | 6 ++++-- src/net/query.c | 6 ++++-- tests/z_refcount_test.c | 4 ---- 4 files changed, 8 insertions(+), 12 deletions(-) diff --git a/include/zenoh-pico/collections/refcount.h b/include/zenoh-pico/collections/refcount.h index dd659e206..3c4f421e3 100644 --- a/include/zenoh-pico/collections/refcount.h +++ b/include/zenoh-pico/collections/refcount.h @@ -308,10 +308,6 @@ static inline _Bool name##_weak_eq(const name##_weak_t *left, const name##_weak_t *right) { \ return (left->in == right->in); \ } \ - static inline _Bool name##_weak_check(const name##_weak_t *p) { \ - unsigned int cmp_val = 0; \ - return !_ZP_RC_OP_CHECK_STRONG_CNT(cmp_val); \ - } \ static inline _Bool name##_weak_drop(name##_weak_t *p) { \ if ((p == NULL) || (p->in == NULL)) { \ return false; \ diff --git a/src/api/api.c b/src/api/api.c index 963ac7e36..af776109c 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -1298,8 +1298,9 @@ void z_query_reply_options_default(z_query_reply_options_t *options) { int8_t z_query_reply(const z_loaned_query_t *query, const z_loaned_keyexpr_t *keyexpr, z_owned_bytes_t *payload, const z_query_reply_options_t *options) { - // Check session as queries can't use session rc - if (!_z_session_weak_check(&query->in->val._zn)) { + // Try upgrading session weak to rc + _z_session_rc_t sess_rc = _z_session_weak_upgrade(&query->in->val._zn); + if (sess_rc.in == NULL) { return _Z_ERR_CONNECTION_CLOSED; } // Set options @@ -1321,6 +1322,7 @@ int8_t z_query_reply(const z_loaned_query_t *query, const z_loaned_keyexpr_t *ke z_bytes_drop(payload); } // Clean-up + _z_session_rc_drop(&sess_rc); z_encoding_drop(opts.encoding); z_bytes_drop(opts.attachment); return ret; diff --git a/src/net/query.c b/src/net/query.c index ae695fa81..08208e741 100644 --- a/src/net/query.c +++ b/src/net/query.c @@ -28,14 +28,16 @@ _z_query_t _z_query_null(void) { } void _z_query_clear(_z_query_t *q) { - // Check session as queries can't use session rc - if (_z_session_weak_check(&q->_zn)) { + // Try to upgrade session weak to rc + _z_session_rc_t sess_rc = _z_session_weak_upgrade(&q->_zn); + if (sess_rc.in != NULL) { // Send REPLY_FINAL message _z_zenoh_message_t z_msg = _z_n_msg_make_response_final(q->_request_id); if (_z_send_n_msg(&q->_zn.in->val, &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { _Z_ERROR("Query send REPLY_FINAL transport failure !"); } _z_msg_clear(&z_msg); + _z_session_rc_drop(&sess_rc); } // Clean up memory z_free(q->_parameters); diff --git a/tests/z_refcount_test.c b/tests/z_refcount_test.c index e08508ba4..e5ad98374 100644 --- a/tests/z_refcount_test.c +++ b/tests/z_refcount_test.c @@ -128,13 +128,11 @@ void test_rc_clone_as_weak(void) { assert(dwk1.in != NULL); assert(dwk1.in->_strong_cnt == 1); assert(dwk1.in->_weak_cnt == 2); - assert(_dummy_weak_check(&dwk1)); assert(dwk1.in->val.foo == 0); assert(!_dummy_rc_drop(&drc1)); assert(dwk1.in->_strong_cnt == 0); assert(dwk1.in->_weak_cnt == 1); - assert(!_dummy_weak_check(&dwk1)); assert(dwk1.in->val.foo == FOO_CLEARED_VALUE); assert(_dummy_weak_drop(&dwk1)); } @@ -146,12 +144,10 @@ void test_rc_clone_as_weak_ptr(void) { assert(dwk1->in != NULL); assert(dwk1->in->_strong_cnt == 1); assert(dwk1->in->_weak_cnt == 2); - assert(_dummy_weak_check(dwk1)); assert(!_dummy_rc_drop(&drc1)); assert(dwk1->in->_strong_cnt == 0); assert(dwk1->in->_weak_cnt == 1); - assert(!_dummy_weak_check(dwk1)); assert(_dummy_weak_drop(dwk1)); z_free(dwk1); } From edd90477d15afd85bbed8737b96a38d55e1083ec Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Wed, 10 Jul 2024 09:55:01 +0200 Subject: [PATCH 19/20] fix: add weak upgrade in query_reply_del --- include/zenoh-pico/collections/refcount.h | 2 +- src/api/api.c | 8 +++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/include/zenoh-pico/collections/refcount.h b/include/zenoh-pico/collections/refcount.h index 3c4f421e3..9b31d248b 100644 --- a/include/zenoh-pico/collections/refcount.h +++ b/include/zenoh-pico/collections/refcount.h @@ -69,7 +69,7 @@ #define _ZP_RC_OP_SYNC atomic_thread_fence(_z_memory_order_acquire); #define _ZP_RC_OP_UPGRADE_CAS_LOOP \ unsigned int prev = _z_atomic_load_explicit(&p->in->_strong_cnt, _z_memory_order_relaxed); \ - while ((prev != 0) && (prev < UINT32_MAX)) { \ + while ((prev != 0) && (prev < _Z_RC_MAX_COUNT)) { \ if (_z_atomic_compare_exchange_weak_explicit(&p->in->_strong_cnt, &prev, prev + 1, _z_memory_order_acquire, \ _z_memory_order_relaxed)) { \ if (_ZP_RC_OP_INCR_AND_CMP_WEAK(_Z_RC_MAX_COUNT)) { \ diff --git a/src/api/api.c b/src/api/api.c index af776109c..f7d616c4b 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -1338,6 +1338,11 @@ void z_query_reply_del_options_default(z_query_reply_del_options_t *options) { int8_t z_query_reply_del(const z_loaned_query_t *query, const z_loaned_keyexpr_t *keyexpr, const z_query_reply_del_options_t *options) { + // Try upgrading session weak to rc + _z_session_rc_t sess_rc = _z_session_weak_upgrade(&query->in->val._zn); + if (sess_rc.in == NULL) { + return _Z_ERR_CONNECTION_CLOSED; + } _z_keyexpr_t keyexpr_aliased = _z_keyexpr_alias_from_user_defined(*keyexpr, true); z_query_reply_del_options_t opts; if (options == NULL) { @@ -1351,7 +1356,8 @@ int8_t z_query_reply_del(const z_loaned_query_t *query, const z_loaned_keyexpr_t int8_t ret = _z_send_reply(&query->in->val, keyexpr_aliased, value, Z_SAMPLE_KIND_DELETE, opts.congestion_control, opts.priority, opts.is_express, opts.timestamp, _z_bytes_from_owned_bytes(opts.attachment)); - + // Clean-up + _z_session_rc_drop(&sess_rc); z_bytes_drop(opts.attachment); return ret; } From 329af6160a49c28d9a517a599bbb97ea243bba90 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Wed, 10 Jul 2024 11:18:27 +0200 Subject: [PATCH 20/20] fix: explicitly pass session rc --- include/zenoh-pico/net/primitives.h | 7 ++++--- src/api/api.c | 4 ++-- src/net/primitives.c | 8 ++++---- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/include/zenoh-pico/net/primitives.h b/include/zenoh-pico/net/primitives.h index 138725f6a..e950445e7 100644 --- a/include/zenoh-pico/net/primitives.h +++ b/include/zenoh-pico/net/primitives.h @@ -200,9 +200,10 @@ int8_t _z_undeclare_queryable(_z_queryable_t *qle); * kind: The type of operation. * attachment: An optional attachment to the reply. */ -int8_t _z_send_reply(const _z_query_t *query, const _z_keyexpr_t keyexpr, const _z_value_t payload, - const z_sample_kind_t kind, const z_congestion_control_t cong_ctrl, z_priority_t priority, - _Bool is_express, const _z_timestamp_t *timestamp, const _z_bytes_t attachment); +int8_t _z_send_reply(const _z_query_t *query, const _z_session_rc_t *zsrc, const _z_keyexpr_t keyexpr, + const _z_value_t payload, const z_sample_kind_t kind, const z_congestion_control_t cong_ctrl, + z_priority_t priority, _Bool is_express, const _z_timestamp_t *timestamp, + const _z_bytes_t attachment); #endif #if Z_FEATURE_QUERY == 1 diff --git a/src/api/api.c b/src/api/api.c index f7d616c4b..5ec9ba3f7 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -1316,7 +1316,7 @@ int8_t z_query_reply(const z_loaned_query_t *query, const z_loaned_keyexpr_t *ke .encoding = _z_encoding_from_owned(opts.encoding)}; int8_t ret = - _z_send_reply(&query->in->val, keyexpr_aliased, value, Z_SAMPLE_KIND_PUT, opts.congestion_control, + _z_send_reply(&query->in->val, &sess_rc, keyexpr_aliased, value, Z_SAMPLE_KIND_PUT, opts.congestion_control, opts.priority, opts.is_express, opts.timestamp, _z_bytes_from_owned_bytes(opts.attachment)); if (payload != NULL) { z_bytes_drop(payload); @@ -1354,7 +1354,7 @@ int8_t z_query_reply_del(const z_loaned_query_t *query, const z_loaned_keyexpr_t _z_value_t value = {.payload = _z_bytes_null(), .encoding = _z_encoding_null()}; int8_t ret = - _z_send_reply(&query->in->val, keyexpr_aliased, value, Z_SAMPLE_KIND_DELETE, opts.congestion_control, + _z_send_reply(&query->in->val, &sess_rc, keyexpr_aliased, value, Z_SAMPLE_KIND_DELETE, opts.congestion_control, opts.priority, opts.is_express, opts.timestamp, _z_bytes_from_owned_bytes(opts.attachment)); // Clean-up _z_session_rc_drop(&sess_rc); diff --git a/src/net/primitives.c b/src/net/primitives.c index 9413ae689..b8019b936 100644 --- a/src/net/primitives.c +++ b/src/net/primitives.c @@ -310,11 +310,11 @@ int8_t _z_undeclare_queryable(_z_queryable_t *qle) { return _Z_RES_OK; } -int8_t _z_send_reply(const _z_query_t *query, _z_keyexpr_t keyexpr, const _z_value_t payload, - const z_sample_kind_t kind, const z_congestion_control_t cong_ctrl, z_priority_t priority, - _Bool is_express, const _z_timestamp_t *timestamp, const _z_bytes_t att) { +int8_t _z_send_reply(const _z_query_t *query, const _z_session_rc_t *zsrc, _z_keyexpr_t keyexpr, + const _z_value_t payload, const z_sample_kind_t kind, const z_congestion_control_t cong_ctrl, + z_priority_t priority, _Bool is_express, const _z_timestamp_t *timestamp, const _z_bytes_t att) { int8_t ret = _Z_RES_OK; - _z_session_t *zn = &query->_zn.in->val; + _z_session_t *zn = &zsrc->in->val; _z_keyexpr_t q_ke; _z_keyexpr_t r_ke;