Skip to content

Commit

Permalink
Merge branch 'protocol_changes' into protocol_pull
Browse files Browse the repository at this point in the history
  • Loading branch information
Mallets authored Apr 5, 2024
2 parents 3559b3e + 084b087 commit 2c009d0
Show file tree
Hide file tree
Showing 12 changed files with 149 additions and 40 deletions.
1 change: 1 addition & 0 deletions include/zenoh-pico/net/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ typedef struct _z_session_t {
// Session interests
#if Z_FEATURE_INTEREST == 1
_z_session_interest_rc_list_t *_local_interests;
_z_declare_data_list_t *_remote_declares;
#endif
} _z_session_t;

Expand Down
8 changes: 5 additions & 3 deletions include/zenoh-pico/protocol/definitions/declarations.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ typedef struct {
_z_keyexpr_t _keyexpr;
uint32_t _id;
struct {
uint8_t _complete;
uint32_t _distance;
_Bool _complete;
uint16_t _distance;
} _ext_queryable_info;
} _z_decl_queryable_t;
_z_decl_queryable_t _z_decl_queryable_null(void);
Expand Down Expand Up @@ -95,6 +95,8 @@ typedef struct {
} _z_undecl_interest_t;
_z_undecl_interest_t _z_undecl_interest_null(void);

#define _Z_FLAG_INTEREST_ID (1 << 5)

typedef struct {
enum {
_Z_DECL_KEXPR,
Expand Down Expand Up @@ -132,7 +134,7 @@ _z_declaration_t _z_make_undecl_keyexpr(uint16_t id);
_z_declaration_t _z_make_decl_subscriber(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, _Bool reliable);
_z_declaration_t _z_make_undecl_subscriber(uint32_t id, _Z_OPTIONAL const _z_keyexpr_t* key);

_z_declaration_t _z_make_decl_queryable(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, uint32_t distance, uint8_t complete);
_z_declaration_t _z_make_decl_queryable(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, uint16_t distance, _Bool complete);
_z_declaration_t _z_make_undecl_queryable(uint32_t id, _Z_OPTIONAL const _z_keyexpr_t* key);

_z_declaration_t _z_make_decl_token(_Z_MOVE(_z_keyexpr_t) key, uint32_t id);
Expand Down
3 changes: 2 additions & 1 deletion include/zenoh-pico/session/interest.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@
_z_session_interest_rc_t *_z_get_interest_by_id(_z_session_t *zn, const _z_zint_t id);
_z_session_interest_rc_t *_z_register_interest(_z_session_t *zn, _z_session_interest_t *intr);
void _z_unregister_interest(_z_session_t *zn, _z_session_interest_rc_t *intr);
void _z_flush_interest(_z_session_t *zn);
#endif // Z_FEATURE_INTEREST == 1

void _z_flush_interest(_z_session_t *zn);
int8_t _z_interest_process_declares(_z_session_t *zn, const _z_declaration_t *decl);
int8_t _z_interest_process_undeclares(_z_session_t *zn, const _z_declaration_t *decl);
int8_t _z_interest_process_final_interest(_z_session_t *zn, uint32_t id);
int8_t _z_interest_process_undeclare_interest(_z_session_t *zn, uint32_t id);
int8_t _z_interest_process_declare_interest(_z_session_t *zn, _z_keyexpr_t key, uint32_t id, uint8_t flags);
Expand Down
28 changes: 22 additions & 6 deletions include/zenoh-pico/session/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,12 +190,12 @@ int8_t _z_session_generate_zid(_z_id_t *bs, uint8_t size);

typedef enum {
_Z_INTEREST_MSG_TYPE_FINAL = 0,
_Z_INTEREST_MSG_TYPE_DECL_SUBSCRIBER,
_Z_INTEREST_MSG_TYPE_DECL_QUERYABLE,
_Z_INTEREST_MSG_TYPE_DECL_TOKEN,
_Z_INTEREST_MSG_TYPE_UNDECL_SUBSCRIBER,
_Z_INTEREST_MSG_TYPE_UNDECL_QUERYABLE,
_Z_INTEREST_MSG_TYPE_UNDECL_TOKEN,
_Z_INTEREST_MSG_TYPE_DECL_SUBSCRIBER = 1,
_Z_INTEREST_MSG_TYPE_DECL_QUERYABLE = 2,
_Z_INTEREST_MSG_TYPE_DECL_TOKEN = 3,
_Z_INTEREST_MSG_TYPE_UNDECL_SUBSCRIBER = 4,
_Z_INTEREST_MSG_TYPE_UNDECL_QUERYABLE = 5,
_Z_INTEREST_MSG_TYPE_UNDECL_TOKEN = 6,
} _z_interest_msg_type_t;

typedef struct _z_interest_msg_t {
Expand Down Expand Up @@ -225,4 +225,20 @@ _Z_ELEM_DEFINE(_z_session_interest_rc, _z_session_interest_rc_t, _z_noop_size, _
_z_noop_copy)
_Z_LIST_DEFINE(_z_session_interest_rc, _z_session_interest_rc_t)

typedef enum {
_Z_DECLARE_TYPE_SUBSCRIBER = 0,
_Z_DECLARE_TYPE_QUERYABLE = 1,
_Z_DECLARE_TYPE_TOKEN = 2,
} _z_declare_type_t;

typedef struct {
_z_keyexpr_t _key;
uint32_t _id;
uint8_t _type;
} _z_declare_data_t;

void _z_declare_data_clear(_z_declare_data_t *data);
_Z_ELEM_DEFINE(_z_declare_data, _z_declare_data_t, _z_noop_size, _z_declare_data_clear, _z_noop_copy)
_Z_LIST_DEFINE(_z_declare_data, _z_declare_data_t)

#endif /* INCLUDE_ZENOH_PICO_SESSION_SESSION_H */
20 changes: 14 additions & 6 deletions src/protocol/codec/declarations.c
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,16 @@ int8_t _z_undecl_subscriber_encode(_z_wbuf_t *wbf, const _z_undecl_subscriber_t
}
int8_t _z_decl_queryable_encode(_z_wbuf_t *wbf, const _z_decl_queryable_t *decl) {
uint8_t header = _Z_DECL_QUERYABLE_MID;
_Bool has_info_ext = (decl->_ext_queryable_info._complete != 0) || (decl->_ext_queryable_info._distance != 0);
_Bool has_info_ext = decl->_ext_queryable_info._complete || (decl->_ext_queryable_info._distance != 0);
_Z_RETURN_IF_ERR(_z_decl_commons_encode(wbf, header, has_info_ext, decl->_id, decl->_keyexpr));
if (has_info_ext) {
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ENC_ZINT | 0x01));
_Z_RETURN_IF_ERR(_z_zint64_encode(
wbf, ((uint64_t)decl->_ext_queryable_info._distance << 8) | decl->_ext_queryable_info._complete));
uint8_t flags = 0;
if (decl->_ext_queryable_info._complete) {
flags |= 0x01;
}
uint64_t value = (uint64_t)flags | (uint64_t)decl->_ext_queryable_info._distance << 8;
_Z_RETURN_IF_ERR(_z_zint64_encode(wbf, value));
}
return _Z_RES_OK;
}
Expand Down Expand Up @@ -328,8 +332,8 @@ int8_t _z_decl_queryable_decode_extensions(_z_msg_ext_t *extension, void *ctx) {
switch (extension->_header) {
case _Z_MSG_EXT_ENC_ZINT | 0x01: {
uint64_t val = extension->_body._zint._val;
decl->_ext_queryable_info._complete = val & 0xff;
decl->_ext_queryable_info._distance = (uint32_t)(val >> 8);
decl->_ext_queryable_info._complete = _Z_HAS_FLAG(val, 0x01);
decl->_ext_queryable_info._distance = (uint16_t)(val >> 8);
} break;
default:
if (_Z_HAS_FLAG(extension->_header, _Z_MSG_EXT_FLAG_M)) {
Expand Down Expand Up @@ -423,9 +427,13 @@ int8_t _z_undecl_interest_decode(_z_undecl_interest_t *decl, _z_zbuf_t *zbf, uin
return _z_undecl_trivial_decode(zbf, &decl->_ext_keyexpr, &decl->_id, header);
}
int8_t _z_declaration_decode(_z_declaration_t *decl, _z_zbuf_t *zbf) {
int8_t ret;
uint8_t header;
_Z_RETURN_IF_ERR(_z_uint8_decode(&header, zbf));
if (_Z_HAS_FLAG(header, _Z_FLAG_INTEREST_ID)) {
return _Z_ERR_MESSAGE_FLAG_UNEXPECTED;
}

int8_t ret;
switch (_Z_MID(header)) {
case _Z_DECL_KEXPR_MID: {
decl->_tag = _Z_DECL_KEXPR;
Expand Down
2 changes: 1 addition & 1 deletion src/protocol/definitions/declarations.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ _z_declaration_t _z_make_undecl_subscriber(uint32_t id, _Z_OPTIONAL const _z_key
._id = id, ._ext_keyexpr = (key == NULL) ? _z_keyexpr_null() : _z_keyexpr_duplicate(*key)}}};
}

_z_declaration_t _z_make_decl_queryable(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, uint32_t distance, uint8_t complete) {
_z_declaration_t _z_make_decl_queryable(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, uint16_t distance, _Bool complete) {
return (_z_declaration_t){
._tag = _Z_DECL_QUERYABLE,
._body = {._decl_queryable = {._id = id,
Expand Down
102 changes: 95 additions & 7 deletions src/session/interest.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@
#include "zenoh-pico/utils/logging.h"

#if Z_FEATURE_INTEREST == 1
void _z_declare_data_clear(_z_declare_data_t *data) { _z_keyexpr_clear(&data->_key); }

_Bool _z_declare_data_eq(const _z_declare_data_t *left, const _z_declare_data_t *right) {
return ((left->_id == right->_id) && (left->_type == right->_type));
}

_Bool _z_session_interest_eq(const _z_session_interest_t *one, const _z_session_interest_t *two) {
return one->_id == two->_id;
}
Expand Down Expand Up @@ -58,7 +64,7 @@ static _z_session_interest_rc_list_t *__z_get_interest_by_key_and_flags(_z_sessi
_z_session_interest_rc_t *intr = _z_session_interest_rc_list_head(xs);
if ((intr->in->val._flags & flags) != 0) {
if (_z_keyexpr_intersects(intr->in->val._key._suffix, strlen(intr->in->val._key._suffix), key._suffix,
strlen(key._suffix)) == true) {
strlen(key._suffix))) {
ret = _z_session_interest_rc_list_push(ret, _z_session_interest_rc_clone_as_ptr(intr));
}
}
Expand Down Expand Up @@ -120,8 +126,7 @@ static int8_t _z_send_subscriber_interest(_z_session_t *zn) {
// Build the declare message to send on the wire
_z_keyexpr_t key = _z_keyexpr_alias(sub->in->val._key);
_z_declaration_t declaration =
_z_make_decl_subscriber(&key, sub->in->val._id, sub->in->val._info.reliability == Z_RELIABILITY_RELIABLE,
sub->in->val._info.mode == Z_SUBMODE_PULL);
_z_make_decl_subscriber(&key, sub->in->val._id, sub->in->val._info.reliability == Z_RELIABILITY_RELIABLE);
_z_network_message_t n_msg = _z_n_msg_make_declare(declaration);
if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) {
return _Z_ERR_TRANSPORT_TX_FAILED;
Expand Down Expand Up @@ -199,46 +204,126 @@ _z_session_interest_rc_t *_z_register_interest(_z_session_t *zn, _z_session_inte
return ret;
}

static int8_t _unsafe_z_register_declare(_z_session_t *zn, const _z_keyexpr_t *key, uint32_t id, uint8_t type) {
_z_declare_data_t *decl = NULL;
decl = (_z_declare_data_t *)zp_malloc(sizeof(_z_declare_data_t));
if (decl == NULL) {
return _Z_ERR_SYSTEM_OUT_OF_MEMORY;
}
_z_keyexpr_copy(&decl->_key, key);
decl->_id = id;
decl->_type = type;
zn->_remote_declares = _z_declare_data_list_push(zn->_remote_declares, decl);
return _Z_RES_OK;
}

static _z_keyexpr_t _unsafe_z_get_key_from_declare(_z_session_t *zn, uint32_t id, uint8_t type) {
_z_declare_data_list_t *xs = zn->_remote_declares;
_z_declare_data_t comp = {
._key = _z_keyexpr_null(),
._id = id,
._type = type,
};
while (xs != NULL) {
_z_declare_data_t *decl = _z_declare_data_list_head(xs);
if (_z_declare_data_eq(&comp, decl)) {
return _z_keyexpr_duplicate(decl->_key);
}
xs = _z_declare_data_list_tail(xs);
}
return _z_keyexpr_null();
}

static int8_t _unsafe_z_unregister_declare(_z_session_t *zn, uint32_t id, uint8_t type) {
_z_declare_data_t decl = {
._key = _z_keyexpr_null(),
._id = id,
._type = type,
};
zn->_remote_declares = _z_declare_data_list_drop_filter(zn->_remote_declares, _z_declare_data_eq, &decl);
return _Z_RES_OK;
}

int8_t _z_interest_process_declares(_z_session_t *zn, const _z_declaration_t *decl) {
const _z_keyexpr_t *decl_key = NULL;
_z_interest_msg_t msg;
uint8_t flags = 0;
uint8_t decl_type = 0;
switch (decl->_tag) {
case _Z_DECL_SUBSCRIBER:
msg.type = _Z_INTEREST_MSG_TYPE_DECL_SUBSCRIBER;
msg.id = decl->_body._decl_subscriber._id;
decl_key = &decl->_body._decl_subscriber._keyexpr;
decl_type = _Z_DECLARE_TYPE_SUBSCRIBER;
flags = _Z_INTEREST_FLAG_SUBSCRIBERS;
break;
case _Z_DECL_QUERYABLE:
msg.type = _Z_INTEREST_MSG_TYPE_DECL_QUERYABLE;
msg.id = decl->_body._decl_queryable._id;
decl_key = &decl->_body._decl_queryable._keyexpr;
decl_type = _Z_DECLARE_TYPE_QUERYABLE;
flags = _Z_INTEREST_FLAG_QUERYABLES;
break;
default:
return _Z_ERR_MESSAGE_ZENOH_DECLARATION_UNKNOWN;
}
// Retrieve key
_zp_session_lock_mutex(zn);
_z_keyexpr_t key = __unsafe_z_get_expanded_key_from_key(zn, decl_key);
if (key._suffix == NULL) {
_zp_session_unlock_mutex(zn);
return _Z_ERR_KEYEXPR_UNKNOWN;
}
// Register declare
_unsafe_z_register_declare(zn, &key, msg.id, decl_type);
// Retrieve interests
_z_session_interest_rc_list_t *intrs = __unsafe_z_get_interest_by_key_and_flags(zn, flags, key);
_zp_session_unlock_mutex(zn);
// Parse session_interest list
_z_session_interest_rc_list_t *xs = intrs;
while (xs != NULL) {
_z_session_interest_rc_t *intr = _z_session_interest_rc_list_head(xs);
if (intr->in->val._callback != NULL) {
intr->in->val._callback(&msg, intr->in->val._arg);
}
xs = _z_session_interest_rc_list_tail(xs);
}
// Clean up
_z_keyexpr_clear(&key);
_z_session_interest_rc_list_free(&intrs);
return _Z_RES_OK;
}

int8_t _z_interest_process_undeclares(_z_session_t *zn, const _z_declaration_t *decl) {
_z_interest_msg_t msg;
uint8_t flags = 0;
uint8_t decl_type = 0;
switch (decl->_tag) {
case _Z_UNDECL_SUBSCRIBER:
msg.type = _Z_INTEREST_MSG_TYPE_UNDECL_SUBSCRIBER;
msg.id = decl->_body._undecl_subscriber._id;
decl_key = &decl->_body._undecl_subscriber._ext_keyexpr;
decl_type = _Z_DECLARE_TYPE_SUBSCRIBER;
flags = _Z_INTEREST_FLAG_SUBSCRIBERS;
break;
case _Z_UNDECL_QUERYABLE:
msg.type = _Z_INTEREST_MSG_TYPE_UNDECL_QUERYABLE;
msg.id = decl->_body._undecl_queryable._id;
decl_key = &decl->_body._undecl_queryable._ext_keyexpr;
decl_type = _Z_DECLARE_TYPE_QUERYABLE;
flags = _Z_INTEREST_FLAG_QUERYABLES;
break;
default:
return _Z_ERR_MESSAGE_ZENOH_DECLARATION_UNKNOWN;
}
_zp_session_lock_mutex(zn);
_z_keyexpr_t key = __unsafe_z_get_expanded_key_from_key(zn, decl_key);
// Retrieve declare data
_z_keyexpr_t key = _unsafe_z_get_key_from_declare(zn, msg.id, decl_type);
if (key._suffix == NULL) {
_z_keyexpr_clear(&key);
_zp_session_unlock_mutex(zn);
return _Z_ERR_KEYEXPR_UNKNOWN;
}
_z_session_interest_rc_list_t *intrs = __unsafe_z_get_interest_by_key_and_flags(zn, flags, key);
// Remove declare
_unsafe_z_unregister_declare(zn, msg.id, decl_type);
_zp_session_unlock_mutex(zn);

// Parse session_interest list
Expand Down Expand Up @@ -266,6 +351,7 @@ void _z_unregister_interest(_z_session_t *zn, _z_session_interest_rc_t *intr) {
void _z_flush_interest(_z_session_t *zn) {
_zp_session_lock_mutex(zn);
_z_session_interest_rc_list_free(&zn->_local_interests);
_z_declare_data_list_free(&zn->_remote_declares);
_zp_session_unlock_mutex(zn);
}

Expand Down Expand Up @@ -322,6 +408,8 @@ int8_t _z_interest_process_declare_interest(_z_session_t *zn, _z_keyexpr_t key,
}

#else
void _z_flush_interest(_z_session_t *zn) { _ZP_UNUSED(zn); }

int8_t _z_interest_process_declares(_z_session_t *zn, const _z_declaration_t *decl) {
_ZP_UNUSED(zn);
_ZP_UNUSED(decl);
Expand Down
9 changes: 2 additions & 7 deletions src/session/queryable.c
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,7 @@ int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *msgq, const
if (key._suffix != NULL) {
_z_session_queryable_rc_list_t *qles = __unsafe_z_get_session_queryable_by_key(zn, key);

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_unlock_mutex(zn);

// Build the z_query
z_query_t query = {._val = {._rc = _z_query_rc_new()}};
Expand All @@ -160,10 +158,7 @@ int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *msgq, const
_z_keyexpr_clear(&key);
_z_session_queryable_rc_list_free(&qles);
} else {
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1

_zp_session_unlock_mutex(zn);
ret = _Z_ERR_KEYEXPR_UNKNOWN;
}

Expand Down
4 changes: 2 additions & 2 deletions src/session/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint
_z_interest_process_declares(zn, &decl._decl);
} break;
case _Z_UNDECL_SUBSCRIBER: {
_z_interest_process_declares(zn, &decl._decl);
_z_interest_process_undeclares(zn, &decl._decl);
} break;
case _Z_UNDECL_QUERYABLE: {
_z_interest_process_declares(zn, &decl._decl);
_z_interest_process_undeclares(zn, &decl._decl);
} break;
case _Z_DECL_TOKEN: {
// TODO: add support or explicitly discard
Expand Down
8 changes: 2 additions & 6 deletions src/session/subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,7 @@ int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, co
if (key._suffix != NULL) {
_z_subscription_rc_list_t *subs = __unsafe_z_get_subscriptions_by_key(zn, _Z_RESOURCE_IS_LOCAL, key);

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_unlock_mutex(zn);

// Build the sample
_z_sample_t s;
Expand All @@ -195,9 +193,7 @@ int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, co
_z_keyexpr_clear(&key);
_z_subscription_rc_list_free(&subs);
} else {
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_unlock_mutex(zn);
ret = _Z_ERR_KEYEXPR_UNKNOWN;
}

Expand Down
2 changes: 2 additions & 0 deletions src/session/utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "zenoh-pico/config.h"
#include "zenoh-pico/protocol/core.h"
#include "zenoh-pico/session/interest.h"
#include "zenoh-pico/session/query.h"
#include "zenoh-pico/session/queryable.h"
#include "zenoh-pico/session/resource.h"
Expand Down Expand Up @@ -115,6 +116,7 @@ void _z_session_clear(_z_session_t *zn) {
#if Z_FEATURE_QUERY == 1
_z_flush_pending_queries(zn);
#endif
_z_flush_interest(zn);

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_free(&zn->_mutex_inner);
Expand Down
Loading

0 comments on commit 2c009d0

Please sign in to comment.