From f4f945f50492eb5ebfa943ea7cf85618760794a6 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Mon, 9 Sep 2024 16:48:57 +0200 Subject: [PATCH 1/5] fix: weak_upgrade take const pointer --- include/zenoh-pico/collections/refcount.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/zenoh-pico/collections/refcount.h b/include/zenoh-pico/collections/refcount.h index 167ca734a..28f48d25d 100644 --- a/include/zenoh-pico/collections/refcount.h +++ b/include/zenoh-pico/collections/refcount.h @@ -143,7 +143,7 @@ size_t _z_rc_strong_count(void *cnt); return c; \ } \ static inline void name##_weak_copy(name##_weak_t *dst, const name##_weak_t *p) { *dst = name##_weak_clone(p); } \ - static inline name##_rc_t name##_weak_upgrade(name##_weak_t *p) { \ + static inline name##_rc_t name##_weak_upgrade(const name##_weak_t *p) { \ name##_rc_t c = name##_rc_null(); \ if (_z_rc_weak_upgrade(p->_cnt) == _Z_RES_OK) { \ c._val = p->_val; \ From d8b5c778728b32bd8082a8a76b40d6c4713b6d15 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Mon, 9 Sep 2024 16:49:41 +0200 Subject: [PATCH 2/5] feat: switch publisher to weak session rc --- include/zenoh-pico/net/publish.h | 2 +- src/api/api.c | 45 ++++++++++++++++++++++---------- src/net/primitives.c | 4 +-- src/net/publish.c | 2 +- 4 files changed, 35 insertions(+), 18 deletions(-) diff --git a/include/zenoh-pico/net/publish.h b/include/zenoh-pico/net/publish.h index 4be537dd3..58c632d38 100644 --- a/include/zenoh-pico/net/publish.h +++ b/include/zenoh-pico/net/publish.h @@ -25,7 +25,7 @@ typedef struct _z_publisher_t { _z_keyexpr_t _key; _z_zint_t _id; - _z_session_rc_t _zn; + _z_session_weak_t _zn; _z_encoding_t _encoding; z_congestion_control_t _congestion_control; z_priority_t _priority; diff --git a/src/api/api.c b/src/api/api.c index f49ca2f7d..415e5f505 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -1232,18 +1232,27 @@ int8_t z_publisher_put(const z_loaned_publisher_t *pub, z_moved_bytes_t *payload // Remove potentially redundant ke suffix _z_keyexpr_t pub_keyexpr = _z_keyexpr_alias_from_user_defined(pub->_key, true); - // Check if write filter is active before writing - if (!_z_write_filter_active(pub)) { - // Write value - ret = _z_write(_Z_RC_IN_VAL(&pub->_zn), pub_keyexpr, _z_bytes_from_owned_bytes(&payload->_this), &encoding, - Z_SAMPLE_KIND_PUT, pub->_congestion_control, pub->_priority, pub->_is_express, opt.timestamp, - _z_bytes_from_owned_bytes(&opt.attachment->_this), reliability); + // Try to upgrade session rc + _z_session_rc_t sess_rc = _z_session_weak_upgrade(&pub->_zn); + if (!_Z_RC_IS_NULL(&sess_rc)) { + // Check if write filter is active before writing + if (!_z_write_filter_active(pub)) { + // Write value + ret = _z_write(_Z_RC_IN_VAL(&sess_rc), pub_keyexpr, _z_bytes_from_owned_bytes(&payload->_this), &encoding, + Z_SAMPLE_KIND_PUT, pub->_congestion_control, pub->_priority, pub->_is_express, opt.timestamp, + _z_bytes_from_owned_bytes(&opt.attachment->_this), reliability); + } + // Trigger local subscriptions + _z_trigger_local_subscriptions( + _Z_RC_IN_VAL(&sess_rc), pub_keyexpr, _z_bytes_from_owned_bytes(&payload->_this), &encoding, + _z_n_qos_make(pub->_is_express, pub->_congestion_control == Z_CONGESTION_CONTROL_BLOCK, pub->_priority), + opt.timestamp, _z_bytes_from_owned_bytes(&opt.attachment->_this), reliability); + + _z_session_rc_drop(&sess_rc); + } else { + ret = _Z_ERR_CONNECTION_CLOSED; } - // Trigger local subscriptions - _z_trigger_local_subscriptions( - _Z_RC_IN_VAL(&pub->_zn), pub_keyexpr, _z_bytes_from_owned_bytes(&payload->_this), &encoding, - _z_n_qos_make(pub->_is_express, pub->_congestion_control == Z_CONGESTION_CONTROL_BLOCK, pub->_priority), - opt.timestamp, _z_bytes_from_owned_bytes(&opt.attachment->_this), reliability); + // Clean-up _z_encoding_clear(&encoding); z_bytes_drop(opt.attachment); @@ -1265,9 +1274,17 @@ int8_t z_publisher_delete(const z_loaned_publisher_t *pub, const z_publisher_del // Remove potentially redundant ke suffix _z_keyexpr_t pub_keyexpr = _z_keyexpr_alias_from_user_defined(pub->_key, true); - return _z_write(_Z_RC_IN_VAL(&pub->_zn), pub_keyexpr, _z_bytes_null(), NULL, Z_SAMPLE_KIND_DELETE, - pub->_congestion_control, pub->_priority, pub->_is_express, opt.timestamp, _z_bytes_null(), - reliability); + // Try to upgrade session rc + _z_session_rc_t sess_rc = _z_session_weak_upgrade(&pub->_zn); + if (_Z_RC_IS_NULL(&sess_rc)) { + return _Z_ERR_CONNECTION_CLOSED; + } + int8_t ret = _z_write(_Z_RC_IN_VAL(&sess_rc), pub_keyexpr, _z_bytes_null(), NULL, Z_SAMPLE_KIND_DELETE, + pub->_congestion_control, pub->_priority, pub->_is_express, opt.timestamp, _z_bytes_null(), + reliability); + // Clean up + _z_session_rc_drop(&sess_rc); + return ret; } const z_loaned_keyexpr_t *z_publisher_keyexpr(const z_loaned_publisher_t *publisher) { diff --git a/src/net/primitives.c b/src/net/primitives.c index 2fd2ea072..3489fc1b7 100644 --- a/src/net/primitives.c +++ b/src/net/primitives.c @@ -114,7 +114,7 @@ _z_publisher_t _z_declare_publisher(const _z_session_rc_t *zn, _z_keyexpr_t keye ret._priority = priority; ret._is_express = is_express; ret.reliability = reliability; - ret._zn = _z_session_rc_clone(zn); + ret._zn = _z_session_rc_clone_as_weak(zn); ret._encoding = encoding == NULL ? _z_encoding_null() : _z_encoding_steal(encoding); return ret; } @@ -126,7 +126,7 @@ int8_t _z_undeclare_publisher(_z_publisher_t *pub) { // Clear publisher _z_write_filter_destroy(pub); _z_undeclare_resource(_Z_RC_IN_VAL(&pub->_zn), pub->_key._id); - _z_session_rc_drop(&pub->_zn); + _z_session_weak_drop(&pub->_zn); return _Z_RES_OK; } diff --git a/src/net/publish.c b/src/net/publish.c index bde9b7298..9d4500a21 100644 --- a/src/net/publish.c +++ b/src/net/publish.c @@ -37,7 +37,7 @@ _Bool _z_publisher_check(const _z_publisher_t *publisher) { return !_Z_RC_IS_NUL _z_publisher_t _z_publisher_null(void) { return (_z_publisher_t) { ._congestion_control = Z_CONGESTION_CONTROL_DEFAULT, ._id = 0, ._key = _z_keyexpr_null(), - ._priority = Z_PRIORITY_DEFAULT, ._zn = _z_session_rc_null(), + ._priority = Z_PRIORITY_DEFAULT, ._zn = _z_session_weak_null(), #if Z_FEATURE_INTEREST == 1 ._filter = (_z_write_filter_t) { ._interest_id = 0, .ctx = NULL From ff925961a9606176069221e85c1016b6d017a8da Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Mon, 9 Sep 2024 17:00:53 +0200 Subject: [PATCH 3/5] chore: clang-format --- include/zenoh-pico/collections/refcount.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/zenoh-pico/collections/refcount.h b/include/zenoh-pico/collections/refcount.h index 28f48d25d..df6b2eadb 100644 --- a/include/zenoh-pico/collections/refcount.h +++ b/include/zenoh-pico/collections/refcount.h @@ -143,7 +143,7 @@ size_t _z_rc_strong_count(void *cnt); return c; \ } \ static inline void name##_weak_copy(name##_weak_t *dst, const name##_weak_t *p) { *dst = name##_weak_clone(p); } \ - static inline name##_rc_t name##_weak_upgrade(const name##_weak_t *p) { \ + static inline name##_rc_t name##_weak_upgrade(const name##_weak_t *p) { \ name##_rc_t c = name##_rc_null(); \ if (_z_rc_weak_upgrade(p->_cnt) == _Z_RES_OK) { \ c._val = p->_val; \ From e9f0752ed1531f5bbae012d4e46615f009c06e31 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Tue, 10 Sep 2024 10:42:18 +0200 Subject: [PATCH 4/5] feat: use session weak for subscribers and queryables --- include/zenoh-pico/net/query.h | 2 +- include/zenoh-pico/net/subscribe.h | 2 +- src/net/primitives.c | 8 ++++---- src/net/query.c | 2 +- src/net/subscribe.c | 2 +- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/include/zenoh-pico/net/query.h b/include/zenoh-pico/net/query.h index d55af1c26..6800d8b0c 100644 --- a/include/zenoh-pico/net/query.h +++ b/include/zenoh-pico/net/query.h @@ -46,7 +46,7 @@ _Z_REFCOUNT_DEFINE(_z_query, _z_query) */ typedef struct { uint32_t _entity_id; - _z_session_rc_t _zn; + _z_session_weak_t _zn; } _z_queryable_t; #if Z_FEATURE_QUERYABLE == 1 diff --git a/include/zenoh-pico/net/subscribe.h b/include/zenoh-pico/net/subscribe.h index 581eb1894..2edf7370f 100644 --- a/include/zenoh-pico/net/subscribe.h +++ b/include/zenoh-pico/net/subscribe.h @@ -25,7 +25,7 @@ */ typedef struct { uint32_t _entity_id; - _z_session_rc_t _zn; + _z_session_weak_t _zn; } _z_subscriber_t; #if Z_FEATURE_SUBSCRIPTION == 1 diff --git a/src/net/primitives.c b/src/net/primitives.c index 3489fc1b7..e042680eb 100644 --- a/src/net/primitives.c +++ b/src/net/primitives.c @@ -221,7 +221,7 @@ _z_subscriber_t _z_declare_subscriber(const _z_session_rc_t *zn, _z_keyexpr_t ke _z_n_msg_clear(&n_msg); // Fill subscriber ret._entity_id = s._id; - ret._zn = _z_session_rc_clone(zn); + ret._zn = _z_session_rc_clone_as_weak(zn); return ret; } @@ -250,7 +250,7 @@ int8_t _z_undeclare_subscriber(_z_subscriber_t *sub) { // Only if message is successfully send, local subscription state can be removed _z_undeclare_resource(_Z_RC_IN_VAL(&sub->_zn), _Z_RC_IN_VAL(s)->_key_id); _z_unregister_subscription(_Z_RC_IN_VAL(&sub->_zn), _Z_RESOURCE_IS_LOCAL, s); - _z_session_rc_drop(&sub->_zn); + _z_session_weak_drop(&sub->_zn); return _Z_RES_OK; } #endif @@ -285,7 +285,7 @@ _z_queryable_t _z_declare_queryable(const _z_session_rc_t *zn, _z_keyexpr_t keye _z_n_msg_clear(&n_msg); // Fill queryable ret._entity_id = q._id; - ret._zn = _z_session_rc_clone(zn); + ret._zn = _z_session_rc_clone_as_weak(zn); return ret; } @@ -313,7 +313,7 @@ int8_t _z_undeclare_queryable(_z_queryable_t *qle) { _z_n_msg_clear(&n_msg); // Only if message is successfully send, local queryable state can be removed _z_unregister_session_queryable(_Z_RC_IN_VAL(&qle->_zn), q); - _z_session_rc_drop(&qle->_zn); + _z_session_weak_drop(&qle->_zn); return _Z_RES_OK; } diff --git a/src/net/query.c b/src/net/query.c index d26d24265..f9054b39e 100644 --- a/src/net/query.c +++ b/src/net/query.c @@ -99,7 +99,7 @@ _z_query_t _z_query_create(_z_value_t *value, _z_keyexpr_t *key, const _z_slice_ return q; } -_z_queryable_t _z_queryable_null(void) { return (_z_queryable_t){._entity_id = 0, ._zn = _z_session_rc_null()}; } +_z_queryable_t _z_queryable_null(void) { return (_z_queryable_t){._entity_id = 0, ._zn = _z_session_weak_null()}; } _Bool _z_queryable_check(const _z_queryable_t *queryable) { return !_Z_RC_IS_NULL(&queryable->_zn); } diff --git a/src/net/subscribe.c b/src/net/subscribe.c index e213d01a9..277f49b0f 100644 --- a/src/net/subscribe.c +++ b/src/net/subscribe.c @@ -33,6 +33,6 @@ void _z_subscriber_free(_z_subscriber_t **sub) { } _Bool _z_subscriber_check(const _z_subscriber_t *subscriber) { return !_Z_RC_IS_NULL(&subscriber->_zn); } -_z_subscriber_t _z_subscriber_null(void) { return (_z_subscriber_t){._entity_id = 0, ._zn = _z_session_rc_null()}; } +_z_subscriber_t _z_subscriber_null(void) { return (_z_subscriber_t){._entity_id = 0, ._zn = _z_session_weak_null()}; } #endif From a64a0b80706aa6b20df845c6606da9be4e1ee058 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Tue, 10 Sep 2024 15:47:39 +0200 Subject: [PATCH 5/5] feat: add dedicated session closed error code --- include/zenoh-pico/utils/result.h | 1 + src/api/api.c | 10 +++++----- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/include/zenoh-pico/utils/result.h b/include/zenoh-pico/utils/result.h index 87ff8308f..ad313830a 100644 --- a/include/zenoh-pico/utils/result.h +++ b/include/zenoh-pico/utils/result.h @@ -78,6 +78,7 @@ typedef enum { _Z_ERR_INVALID = -75, Z_EINVAL = -75, _Z_ERR_OVERFLOW = -74, + _Z_ERR_SESSION_CLOSED = -73, _Z_ERR_GENERIC = -128 } _z_res_t; diff --git a/src/api/api.c b/src/api/api.c index 415e5f505..8ab883013 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -1250,7 +1250,7 @@ int8_t z_publisher_put(const z_loaned_publisher_t *pub, z_moved_bytes_t *payload _z_session_rc_drop(&sess_rc); } else { - ret = _Z_ERR_CONNECTION_CLOSED; + ret = _Z_ERR_SESSION_CLOSED; } // Clean-up @@ -1277,7 +1277,7 @@ int8_t z_publisher_delete(const z_loaned_publisher_t *pub, const z_publisher_del // Try to upgrade session rc _z_session_rc_t sess_rc = _z_session_weak_upgrade(&pub->_zn); if (_Z_RC_IS_NULL(&sess_rc)) { - return _Z_ERR_CONNECTION_CLOSED; + return _Z_ERR_SESSION_CLOSED; } int8_t ret = _z_write(_Z_RC_IN_VAL(&sess_rc), pub_keyexpr, _z_bytes_null(), NULL, Z_SAMPLE_KIND_DELETE, pub->_congestion_control, pub->_priority, pub->_is_express, opt.timestamp, _z_bytes_null(), @@ -1446,7 +1446,7 @@ int8_t z_query_reply(const z_loaned_query_t *query, const z_loaned_keyexpr_t *ke // Try upgrading session weak to rc _z_session_rc_t sess_rc = _z_session_weak_upgrade(&_Z_RC_IN_VAL(query)->_zn); if (_Z_RC_IS_NULL(&sess_rc)) { - return _Z_ERR_CONNECTION_CLOSED; + return _Z_ERR_SESSION_CLOSED; } // Set options _z_keyexpr_t keyexpr_aliased = _z_keyexpr_alias_from_user_defined(*keyexpr, true); @@ -1484,7 +1484,7 @@ int8_t z_query_reply_del(const z_loaned_query_t *query, const z_loaned_keyexpr_t // Try upgrading session weak to rc _z_session_rc_t sess_rc = _z_session_weak_upgrade(&_Z_RC_IN_VAL(query)->_zn); if (_Z_RC_IS_NULL(&sess_rc)) { - return _Z_ERR_CONNECTION_CLOSED; + return _Z_ERR_SESSION_CLOSED; } _z_keyexpr_t keyexpr_aliased = _z_keyexpr_alias_from_user_defined(*keyexpr, true); z_query_reply_del_options_t opts; @@ -1512,7 +1512,7 @@ int8_t z_query_reply_err(const z_loaned_query_t *query, z_moved_bytes_t *payload // Try upgrading session weak to rc _z_session_rc_t sess_rc = _z_session_weak_upgrade(&_Z_RC_IN_VAL(query)->_zn); if (_Z_RC_IS_NULL(&sess_rc)) { - return _Z_ERR_CONNECTION_CLOSED; + return _Z_ERR_SESSION_CLOSED; } z_query_reply_err_options_t opts; if (options == NULL) {