Skip to content

Commit

Permalink
Merge pull request #636 from jean-roland/ft_session_weak
Browse files Browse the repository at this point in the history
Use session weak in user owned entities
  • Loading branch information
milyin authored Sep 10, 2024
2 parents 9e926b5 + a64a0b8 commit 2ab9562
Show file tree
Hide file tree
Showing 10 changed files with 48 additions and 30 deletions.
2 changes: 1 addition & 1 deletion include/zenoh-pico/collections/refcount.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ size_t _z_rc_strong_count(void *cnt);
return c; \
} \
static inline void name##_weak_copy(name##_weak_t *dst, const name##_weak_t *p) { *dst = name##_weak_clone(p); } \
static inline name##_rc_t name##_weak_upgrade(name##_weak_t *p) { \
static inline name##_rc_t name##_weak_upgrade(const name##_weak_t *p) { \
name##_rc_t c = name##_rc_null(); \
if (_z_rc_weak_upgrade(p->_cnt) == _Z_RES_OK) { \
c._val = p->_val; \
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/net/publish.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
typedef struct _z_publisher_t {
_z_keyexpr_t _key;
_z_zint_t _id;
_z_session_rc_t _zn;
_z_session_weak_t _zn;
_z_encoding_t _encoding;
z_congestion_control_t _congestion_control;
z_priority_t _priority;
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/net/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ _Z_REFCOUNT_DEFINE(_z_query, _z_query)
*/
typedef struct {
uint32_t _entity_id;
_z_session_rc_t _zn;
_z_session_weak_t _zn;
} _z_queryable_t;

#if Z_FEATURE_QUERYABLE == 1
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/net/subscribe.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
*/
typedef struct {
uint32_t _entity_id;
_z_session_rc_t _zn;
_z_session_weak_t _zn;
} _z_subscriber_t;

#if Z_FEATURE_SUBSCRIPTION == 1
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/utils/result.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ typedef enum {
_Z_ERR_INVALID = -75,
Z_EINVAL = -75,
_Z_ERR_OVERFLOW = -74,
_Z_ERR_SESSION_CLOSED = -73,

_Z_ERR_GENERIC = -128
} _z_res_t;
Expand Down
51 changes: 34 additions & 17 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -1232,18 +1232,27 @@ int8_t z_publisher_put(const z_loaned_publisher_t *pub, z_moved_bytes_t *payload
// Remove potentially redundant ke suffix
_z_keyexpr_t pub_keyexpr = _z_keyexpr_alias_from_user_defined(pub->_key, true);

// Check if write filter is active before writing
if (!_z_write_filter_active(pub)) {
// Write value
ret = _z_write(_Z_RC_IN_VAL(&pub->_zn), pub_keyexpr, _z_bytes_from_owned_bytes(&payload->_this), &encoding,
Z_SAMPLE_KIND_PUT, pub->_congestion_control, pub->_priority, pub->_is_express, opt.timestamp,
_z_bytes_from_owned_bytes(&opt.attachment->_this), reliability);
// Try to upgrade session rc
_z_session_rc_t sess_rc = _z_session_weak_upgrade(&pub->_zn);
if (!_Z_RC_IS_NULL(&sess_rc)) {
// Check if write filter is active before writing
if (!_z_write_filter_active(pub)) {
// Write value
ret = _z_write(_Z_RC_IN_VAL(&sess_rc), pub_keyexpr, _z_bytes_from_owned_bytes(&payload->_this), &encoding,
Z_SAMPLE_KIND_PUT, pub->_congestion_control, pub->_priority, pub->_is_express, opt.timestamp,
_z_bytes_from_owned_bytes(&opt.attachment->_this), reliability);
}
// Trigger local subscriptions
_z_trigger_local_subscriptions(
_Z_RC_IN_VAL(&sess_rc), pub_keyexpr, _z_bytes_from_owned_bytes(&payload->_this), &encoding,
_z_n_qos_make(pub->_is_express, pub->_congestion_control == Z_CONGESTION_CONTROL_BLOCK, pub->_priority),
opt.timestamp, _z_bytes_from_owned_bytes(&opt.attachment->_this), reliability);

_z_session_rc_drop(&sess_rc);
} else {
ret = _Z_ERR_SESSION_CLOSED;
}
// Trigger local subscriptions
_z_trigger_local_subscriptions(
_Z_RC_IN_VAL(&pub->_zn), pub_keyexpr, _z_bytes_from_owned_bytes(&payload->_this), &encoding,
_z_n_qos_make(pub->_is_express, pub->_congestion_control == Z_CONGESTION_CONTROL_BLOCK, pub->_priority),
opt.timestamp, _z_bytes_from_owned_bytes(&opt.attachment->_this), reliability);

// Clean-up
_z_encoding_clear(&encoding);
z_bytes_drop(opt.attachment);
Expand All @@ -1265,9 +1274,17 @@ int8_t z_publisher_delete(const z_loaned_publisher_t *pub, const z_publisher_del
// Remove potentially redundant ke suffix
_z_keyexpr_t pub_keyexpr = _z_keyexpr_alias_from_user_defined(pub->_key, true);

return _z_write(_Z_RC_IN_VAL(&pub->_zn), pub_keyexpr, _z_bytes_null(), NULL, Z_SAMPLE_KIND_DELETE,
pub->_congestion_control, pub->_priority, pub->_is_express, opt.timestamp, _z_bytes_null(),
reliability);
// Try to upgrade session rc
_z_session_rc_t sess_rc = _z_session_weak_upgrade(&pub->_zn);
if (_Z_RC_IS_NULL(&sess_rc)) {
return _Z_ERR_SESSION_CLOSED;
}
int8_t ret = _z_write(_Z_RC_IN_VAL(&sess_rc), pub_keyexpr, _z_bytes_null(), NULL, Z_SAMPLE_KIND_DELETE,
pub->_congestion_control, pub->_priority, pub->_is_express, opt.timestamp, _z_bytes_null(),
reliability);
// Clean up
_z_session_rc_drop(&sess_rc);
return ret;
}

const z_loaned_keyexpr_t *z_publisher_keyexpr(const z_loaned_publisher_t *publisher) {
Expand Down Expand Up @@ -1429,7 +1446,7 @@ int8_t z_query_reply(const z_loaned_query_t *query, const z_loaned_keyexpr_t *ke
// Try upgrading session weak to rc
_z_session_rc_t sess_rc = _z_session_weak_upgrade(&_Z_RC_IN_VAL(query)->_zn);
if (_Z_RC_IS_NULL(&sess_rc)) {
return _Z_ERR_CONNECTION_CLOSED;
return _Z_ERR_SESSION_CLOSED;
}
// Set options
_z_keyexpr_t keyexpr_aliased = _z_keyexpr_alias_from_user_defined(*keyexpr, true);
Expand Down Expand Up @@ -1467,7 +1484,7 @@ int8_t z_query_reply_del(const z_loaned_query_t *query, const z_loaned_keyexpr_t
// Try upgrading session weak to rc
_z_session_rc_t sess_rc = _z_session_weak_upgrade(&_Z_RC_IN_VAL(query)->_zn);
if (_Z_RC_IS_NULL(&sess_rc)) {
return _Z_ERR_CONNECTION_CLOSED;
return _Z_ERR_SESSION_CLOSED;
}
_z_keyexpr_t keyexpr_aliased = _z_keyexpr_alias_from_user_defined(*keyexpr, true);
z_query_reply_del_options_t opts;
Expand Down Expand Up @@ -1495,7 +1512,7 @@ int8_t z_query_reply_err(const z_loaned_query_t *query, z_moved_bytes_t *payload
// Try upgrading session weak to rc
_z_session_rc_t sess_rc = _z_session_weak_upgrade(&_Z_RC_IN_VAL(query)->_zn);
if (_Z_RC_IS_NULL(&sess_rc)) {
return _Z_ERR_CONNECTION_CLOSED;
return _Z_ERR_SESSION_CLOSED;
}
z_query_reply_err_options_t opts;
if (options == NULL) {
Expand Down
12 changes: 6 additions & 6 deletions src/net/primitives.c
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ _z_publisher_t _z_declare_publisher(const _z_session_rc_t *zn, _z_keyexpr_t keye
ret._priority = priority;
ret._is_express = is_express;
ret.reliability = reliability;
ret._zn = _z_session_rc_clone(zn);
ret._zn = _z_session_rc_clone_as_weak(zn);
ret._encoding = encoding == NULL ? _z_encoding_null() : _z_encoding_steal(encoding);
return ret;
}
Expand All @@ -126,7 +126,7 @@ int8_t _z_undeclare_publisher(_z_publisher_t *pub) {
// Clear publisher
_z_write_filter_destroy(pub);
_z_undeclare_resource(_Z_RC_IN_VAL(&pub->_zn), pub->_key._id);
_z_session_rc_drop(&pub->_zn);
_z_session_weak_drop(&pub->_zn);
return _Z_RES_OK;
}

Expand Down Expand Up @@ -221,7 +221,7 @@ _z_subscriber_t _z_declare_subscriber(const _z_session_rc_t *zn, _z_keyexpr_t ke
_z_n_msg_clear(&n_msg);
// Fill subscriber
ret._entity_id = s._id;
ret._zn = _z_session_rc_clone(zn);
ret._zn = _z_session_rc_clone_as_weak(zn);
return ret;
}

Expand Down Expand Up @@ -250,7 +250,7 @@ int8_t _z_undeclare_subscriber(_z_subscriber_t *sub) {
// Only if message is successfully send, local subscription state can be removed
_z_undeclare_resource(_Z_RC_IN_VAL(&sub->_zn), _Z_RC_IN_VAL(s)->_key_id);
_z_unregister_subscription(_Z_RC_IN_VAL(&sub->_zn), _Z_RESOURCE_IS_LOCAL, s);
_z_session_rc_drop(&sub->_zn);
_z_session_weak_drop(&sub->_zn);
return _Z_RES_OK;
}
#endif
Expand Down Expand Up @@ -285,7 +285,7 @@ _z_queryable_t _z_declare_queryable(const _z_session_rc_t *zn, _z_keyexpr_t keye
_z_n_msg_clear(&n_msg);
// Fill queryable
ret._entity_id = q._id;
ret._zn = _z_session_rc_clone(zn);
ret._zn = _z_session_rc_clone_as_weak(zn);
return ret;
}

Expand Down Expand Up @@ -313,7 +313,7 @@ int8_t _z_undeclare_queryable(_z_queryable_t *qle) {
_z_n_msg_clear(&n_msg);
// Only if message is successfully send, local queryable state can be removed
_z_unregister_session_queryable(_Z_RC_IN_VAL(&qle->_zn), q);
_z_session_rc_drop(&qle->_zn);
_z_session_weak_drop(&qle->_zn);
return _Z_RES_OK;
}

Expand Down
2 changes: 1 addition & 1 deletion src/net/publish.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ _Bool _z_publisher_check(const _z_publisher_t *publisher) { return !_Z_RC_IS_NUL
_z_publisher_t _z_publisher_null(void) {
return (_z_publisher_t) {
._congestion_control = Z_CONGESTION_CONTROL_DEFAULT, ._id = 0, ._key = _z_keyexpr_null(),
._priority = Z_PRIORITY_DEFAULT, ._zn = _z_session_rc_null(),
._priority = Z_PRIORITY_DEFAULT, ._zn = _z_session_weak_null(),
#if Z_FEATURE_INTEREST == 1
._filter = (_z_write_filter_t) {
._interest_id = 0, .ctx = NULL
Expand Down
2 changes: 1 addition & 1 deletion src/net/query.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ _z_query_t _z_query_create(_z_value_t *value, _z_keyexpr_t *key, const _z_slice_
return q;
}

_z_queryable_t _z_queryable_null(void) { return (_z_queryable_t){._entity_id = 0, ._zn = _z_session_rc_null()}; }
_z_queryable_t _z_queryable_null(void) { return (_z_queryable_t){._entity_id = 0, ._zn = _z_session_weak_null()}; }

_Bool _z_queryable_check(const _z_queryable_t *queryable) { return !_Z_RC_IS_NULL(&queryable->_zn); }

Expand Down
2 changes: 1 addition & 1 deletion src/net/subscribe.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@ void _z_subscriber_free(_z_subscriber_t **sub) {
}

_Bool _z_subscriber_check(const _z_subscriber_t *subscriber) { return !_Z_RC_IS_NULL(&subscriber->_zn); }
_z_subscriber_t _z_subscriber_null(void) { return (_z_subscriber_t){._entity_id = 0, ._zn = _z_session_rc_null()}; }
_z_subscriber_t _z_subscriber_null(void) { return (_z_subscriber_t){._entity_id = 0, ._zn = _z_session_weak_null()}; }

#endif

0 comments on commit 2ab9562

Please sign in to comment.