Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace session pointer with refcount #322

Merged
merged 36 commits into from
Jan 18, 2024
Merged
Changes from 1 commit
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
0f36124
Fix c99 compilation (#317)
jean-roland Jan 12, 2024
b399310
Merge branch 'eclipse-zenoh:main' into main
jean-roland Jan 17, 2024
c1e7388
Merge branch 'eclipse-zenoh:main' into main
jean-roland Jan 18, 2024
59a40a0
fix: set correct session pointer type
jean-roland Jan 12, 2024
ecbc147
refactor: rename sptr to rc
jean-roland Jan 12, 2024
e1de7d4
feat: add compiler based definition
jean-roland Jan 12, 2024
790d666
fix: allocate int instead of int pointer
jean-roland Jan 12, 2024
42f8136
fix: align implementation counter size
jean-roland Jan 12, 2024
3281c90
fix: require single thread config to use single thread implem
jean-roland Jan 12, 2024
0d84724
feat: add c99 gcc rc implementation
jean-roland Jan 12, 2024
fa0bcfb
refactor: rename derived sptr types to rc
jean-roland Jan 12, 2024
5320e0a
feat: name anon struct for forward declaration
jean-roland Jan 12, 2024
06a0846
feat: add _z_session refcounter type
jean-roland Jan 12, 2024
4dc768b
chore: clang-format
jean-roland Jan 12, 2024
76e17c4
chore: update gitignore
jean-roland Jan 15, 2024
5f5c228
build: add compiler id log
jean-roland Jan 15, 2024
104fff5
build: add compiler symbol on platformio builds
jean-roland Jan 15, 2024
dc67d97
feat: remove superfluous function
jean-roland Jan 15, 2024
f04c5a5
feat: go through indirect session pointer
jean-roland Jan 15, 2024
9161fce
fix: use correct type type for session pointers
jean-roland Jan 16, 2024
b119691
feat: use session ref counter in z_session
jean-roland Jan 16, 2024
9e293bd
fix: prevent segfault when clearing empty session
jean-roland Jan 16, 2024
253073e
fix: drop owned config all the time
jean-roland Jan 16, 2024
974c87e
feat: add new_from_val rc function
jean-roland Jan 17, 2024
0a834ff
feat: switch publisher sesssion ptr to rc
jean-roland Jan 17, 2024
50c5e79
feat: switch subscriber session ptr to rc
jean-roland Jan 17, 2024
1a37eec
fix: remove unnecessary dependency
jean-roland Jan 17, 2024
fe05e71
fix: add necessary dependency
jean-roland Jan 17, 2024
de19579
feat: switch queryable session ptr to rc
jean-roland Jan 17, 2024
f72df5b
refactor: flatten queryable primitives
jean-roland Jan 17, 2024
8232fc2
refactor: flatten subscriber primitives
jean-roland Jan 17, 2024
34378c1
refactor: flatten publisher primitives
jean-roland Jan 17, 2024
6b6f20e
feat: change rc to use a single inner pointer
jean-roland Jan 18, 2024
a48cdd3
feat: mutualize rc code between implems
jean-roland Jan 18, 2024
d4fb72b
chore: clang-format
jean-roland Jan 18, 2024
7e9441a
fix: compilation issues
jean-roland Jan 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: switch subscriber session ptr to rc
  • Loading branch information
jean-roland committed Jan 18, 2024
commit 50c5e796cc9f34f456568d46b3e6dc3a05d3d37a
2 changes: 1 addition & 1 deletion include/zenoh-pico/net/primitives.h
Original file line number Diff line number Diff line change
@@ -143,7 +143,7 @@ int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *pay
* Returns:
* The created :c:type:`_z_subscriber_t` or null if the declaration failed.
*/
_z_subscriber_t *_z_declare_subscriber(_z_session_t *zn, _z_keyexpr_t keyexpr, _z_subinfo_t sub_info,
_z_subscriber_t *_z_declare_subscriber(_z_session_rc_t *zn, _z_keyexpr_t keyexpr, _z_subinfo_t sub_info,
_z_data_handler_t callback, _z_drop_handler_t dropper, void *arg);

/**
2 changes: 1 addition & 1 deletion include/zenoh-pico/net/subscribe.h
Original file line number Diff line number Diff line change
@@ -25,7 +25,7 @@
*/
typedef struct {
uint32_t _entity_id;
_z_session_t *_zn;
_z_session_rc_t _zn;
} _z_subscriber_t;

typedef _z_subscriber_t _z_pull_subscriber_t;
6 changes: 3 additions & 3 deletions src/api/api.c
Original file line number Diff line number Diff line change
@@ -1011,7 +1011,7 @@ z_owned_subscriber_t z_declare_subscriber(z_session_t zs, z_keyexpr_t keyexpr, z
if (options != NULL) {
subinfo.reliability = options->reliability;
}
_z_subscriber_t *sub = _z_declare_subscriber(zs._val.ptr, key, subinfo, callback->call, callback->drop, ctx);
_z_subscriber_t *sub = _z_declare_subscriber(&zs._val, key, subinfo, callback->call, callback->drop, ctx);
if (suffix != NULL) {
zp_free(suffix);
}
@@ -1040,7 +1040,7 @@ z_owned_pull_subscriber_t z_declare_pull_subscriber(z_session_t zs, z_keyexpr_t
}

return (z_owned_pull_subscriber_t){
._value = _z_declare_subscriber(zs._val.ptr, key, subinfo, callback->call, callback->drop, ctx)};
._value = _z_declare_subscriber(&zs._val, key, subinfo, callback->call, callback->drop, ctx)};
}

int8_t z_undeclare_subscriber(z_owned_subscriber_t *sub) {
@@ -1067,7 +1067,7 @@ z_owned_keyexpr_t z_subscriber_keyexpr(z_subscriber_t sub) {
z_owned_keyexpr_t ret = z_keyexpr_null();
uint32_t lookup = sub._val->_entity_id;
if (sub._val != NULL) {
_z_subscription_rc_list_t *tail = sub._val->_zn->_local_subscriptions;
_z_subscription_rc_list_t *tail = sub._val->_zn.ptr->_local_subscriptions;
while (tail != NULL && !z_keyexpr_check(&ret)) {
_z_subscription_rc_t *head = _z_subscription_rc_list_head(tail);
if (head->ptr->_id == lookup) {
31 changes: 16 additions & 15 deletions src/net/primitives.c
Original file line number Diff line number Diff line change
@@ -185,32 +185,32 @@ int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *pay

#if Z_FEATURE_SUBSCRIPTION == 1
/*------------------ Subscriber Declaration ------------------*/
_z_subscriber_t *_z_declare_subscriber(_z_session_t *zn, _z_keyexpr_t keyexpr, _z_subinfo_t sub_info,
_z_subscriber_t *_z_declare_subscriber(_z_session_rc_t *zn, _z_keyexpr_t keyexpr, _z_subinfo_t sub_info,
_z_data_handler_t callback, _z_drop_handler_t dropper, void *arg) {
_z_subscription_t s;
s._id = _z_get_entity_id(zn);
s._id = _z_get_entity_id(zn->ptr);
s._key_id = keyexpr._id;
s._key = _z_get_expanded_key_from_key(zn, &keyexpr);
s._key = _z_get_expanded_key_from_key(zn->ptr, &keyexpr);
s._info = sub_info;
s._callback = callback;
s._dropper = dropper;
s._arg = arg;

_z_subscriber_t *ret = (_z_subscriber_t *)zp_malloc(sizeof(_z_subscriber_t));
if (ret != NULL) {
ret->_zn = zn;
ret->_zn = _z_session_rc_clone(zn);
ret->_entity_id = s._id;

_z_subscription_rc_t *sp_s = _z_register_subscription(
zn, _Z_RESOURCE_IS_LOCAL, &s); // This a pointer to the entry stored at session-level.
// Do not drop it by the end of this function.
zn->ptr, _Z_RESOURCE_IS_LOCAL, &s); // This a pointer to the entry stored at session-level.
// Do not drop it by the end of this function.
if (sp_s != NULL) {
// Build the declare message to send on the wire
_z_declaration_t declaration = _z_make_decl_subscriber(
&keyexpr, s._id, sub_info.reliability == Z_RELIABILITY_RELIABLE, sub_info.mode == Z_SUBMODE_PULL);
_z_network_message_t n_msg = _z_n_msg_make_declare(declaration);
if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) {
_z_unregister_subscription(zn, _Z_RESOURCE_IS_LOCAL, sp_s);
if (_z_send_n_msg(zn->ptr, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) {
_z_unregister_subscription(zn->ptr, _Z_RESOURCE_IS_LOCAL, sp_s);
_z_subscriber_free(&ret);
}
_z_n_msg_clear(&n_msg);
@@ -228,15 +228,16 @@ int8_t _z_undeclare_subscriber(_z_subscriber_t *sub) {
int8_t ret = _Z_ERR_GENERIC;

if (sub != NULL) {
_z_subscription_rc_t *s = _z_get_subscription_by_id(sub->_zn, _Z_RESOURCE_IS_LOCAL, sub->_entity_id);
_z_subscription_rc_t *s = _z_get_subscription_by_id(sub->_zn.ptr, _Z_RESOURCE_IS_LOCAL, sub->_entity_id);
if (s != NULL) {
// Build the declare message to send on the wire
_z_declaration_t declaration = _z_make_undecl_subscriber(sub->_entity_id, &s->ptr->_key);
_z_network_message_t n_msg = _z_n_msg_make_declare(declaration);
if (_z_send_n_msg(sub->_zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) == _Z_RES_OK) {
if (_z_send_n_msg(sub->_zn.ptr, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) == _Z_RES_OK) {
// Only if message is successfully send, local subscription state can be removed
_z_undeclare_resource(sub->_zn, s->ptr->_key_id);
_z_unregister_subscription(sub->_zn, _Z_RESOURCE_IS_LOCAL, s);
_z_undeclare_resource(sub->_zn.ptr, s->ptr->_key_id);
_z_unregister_subscription(sub->_zn.ptr, _Z_RESOURCE_IS_LOCAL, s);
_z_session_rc_drop(&sub->_zn);
} else {
ret = _Z_ERR_ENTITY_UNKNOWN;
}
@@ -255,11 +256,11 @@ int8_t _z_undeclare_subscriber(_z_subscriber_t *sub) {
int8_t _z_subscriber_pull(const _z_subscriber_t *sub) {
int8_t ret = _Z_RES_OK;

_z_subscription_rc_t *s = _z_get_subscription_by_id(sub->_zn, _Z_RESOURCE_IS_LOCAL, sub->_entity_id);
_z_subscription_rc_t *s = _z_get_subscription_by_id(sub->_zn.ptr, _Z_RESOURCE_IS_LOCAL, sub->_entity_id);
if (s != NULL) {
_z_zint_t pull_id = _z_get_pull_id(sub->_zn);
_z_zint_t pull_id = _z_get_pull_id(sub->_zn.ptr);
_z_zenoh_message_t z_msg = _z_msg_make_pull(_z_keyexpr_alias(s->ptr->_key), pull_id);
if (_z_send_n_msg(sub->_zn, &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) {
if (_z_send_n_msg(sub->_zn.ptr, &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) {
ret = _Z_ERR_TRANSPORT_TX_FAILED;
}
} else {