Skip to content

Commit

Permalink
feat: new declare interest packet format
Browse files Browse the repository at this point in the history
  • Loading branch information
jean-roland committed Mar 6, 2024
1 parent ebdb01e commit 997cf12
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 12 deletions.
2 changes: 2 additions & 0 deletions include/zenoh-pico/protocol/definitions/declarations.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,11 @@ _z_undecl_token_t _z_undecl_token_null(void);
#define _Z_INTEREST_FLAG_SUBSCRIBERS (1 << 1)
#define _Z_INTEREST_FLAG_QUERYABLES (1 << 2)
#define _Z_INTEREST_FLAG_TOKENS (1 << 3)
#define _Z_INTEREST_FLAG_RESTRICTED (1 << 4)
#define _Z_INTEREST_FLAG_CURRENT (1 << 5)
#define _Z_INTEREST_FLAG_FUTURE (1 << 6)
#define _Z_INTEREST_FLAG_AGGREGATE (1 << 7)

typedef struct {
_z_keyexpr_t _keyexpr;
uint32_t _id;
Expand Down
4 changes: 2 additions & 2 deletions src/net/filtering.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ static void _z_write_filter_callback(const _z_interest_msg_t *msg, void *arg) {
}

int8_t _z_write_filter_create(_z_publisher_t *pub) {
uint8_t flags = _Z_INTEREST_FLAG_KEYEXPRS | _Z_INTEREST_FLAG_SUBSCRIBERS | _Z_INTEREST_FLAG_CURRENT |
_Z_INTEREST_FLAG_FUTURE | _Z_INTEREST_FLAG_AGGREGATE;
uint8_t flags = _Z_INTEREST_FLAG_KEYEXPRS | _Z_INTEREST_FLAG_SUBSCRIBERS | _Z_INTEREST_FLAG_RESTRICTED |
_Z_INTEREST_FLAG_CURRENT | _Z_INTEREST_FLAG_FUTURE | _Z_INTEREST_FLAG_AGGREGATE;
_z_writer_filter_ctx_t *ctx = (_z_writer_filter_ctx_t *)zp_malloc(sizeof(_z_writer_filter_ctx_t));

if (ctx == NULL) {
Expand Down
76 changes: 71 additions & 5 deletions src/protocol/codec/declarations.c
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,38 @@ int8_t _z_undecl_token_encode(_z_wbuf_t *wbf, const _z_undecl_token_t *decl) {
}

int8_t _z_decl_interest_encode(_z_wbuf_t *wbf, const _z_decl_interest_t *decl) {
// Set header
uint8_t header = _Z_DECL_INTEREST_MID;
_Z_RETURN_IF_ERR(_z_decl_commons_encode(wbf, header, false, decl->_id, decl->_keyexpr));
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, decl->interest_flags));
if (_Z_HAS_FLAG(decl->interest_flags, _Z_INTEREST_FLAG_CURRENT)) {
header |= _Z_INTEREST_FLAG_CURRENT;
}
if (_Z_HAS_FLAG(decl->interest_flags, _Z_INTEREST_FLAG_FUTURE)) {
header |= _Z_INTEREST_FLAG_FUTURE;
}
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, header));
// Set id
_Z_RETURN_IF_ERR(_z_zint_encode(wbf, decl->_id));
// Copy flags but clear double use ones.
uint8_t interest_flags = decl->interest_flags;
interest_flags &= ~_Z_DECL_SUBSCRIBER_FLAG_N;
interest_flags &= ~_Z_DECL_SUBSCRIBER_FLAG_M;
// Process restricted flag
if (_Z_HAS_FLAG(interest_flags, _Z_INTEREST_FLAG_RESTRICTED)) {
// Set Named & Mapping flags
_Bool has_kesuffix = _z_keyexpr_has_suffix(decl->_keyexpr);
if (has_kesuffix) {
interest_flags |= _Z_DECL_SUBSCRIBER_FLAG_N;
}
if (_z_keyexpr_is_local(&decl->_keyexpr)) {
interest_flags |= _Z_DECL_SUBSCRIBER_FLAG_M;
}
// Set decl flags & keyexpr
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, decl->interest_flags));
_Z_RETURN_IF_ERR(_z_keyexpr_encode(wbf, has_kesuffix, &decl->_keyexpr));
} else {
// Set decl flags
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, decl->interest_flags));
}
return _Z_RES_OK;
}

Expand Down Expand Up @@ -337,11 +366,48 @@ int8_t _z_undecl_token_decode(_z_undecl_token_t *decl, _z_zbuf_t *zbf, uint8_t h
return _z_undecl_trivial_decode(zbf, &decl->_ext_keyexpr, &decl->_id, header);
}
int8_t _z_decl_interest_decode(_z_decl_interest_t *decl, _z_zbuf_t *zbf, uint8_t header) {
_Bool has_ext;
*decl = _z_decl_interest_null();
_Z_RETURN_IF_ERR(_z_decl_commons_decode(zbf, header, &has_ext, &decl->_id, &decl->_keyexpr));
// Decode id
_Z_RETURN_IF_ERR(_z_zint32_decode(&decl->_id, zbf));
// Decode interest flags
_Z_RETURN_IF_ERR(_z_uint8_decode(&decl->interest_flags, zbf));
if (has_ext) {
// Process restricted flag
if (_Z_HAS_FLAG(decl->interest_flags, _Z_INTEREST_FLAG_RESTRICTED)) {
uint16_t mapping = _Z_HAS_FLAG(decl->interest_flags, _Z_DECL_SUBSCRIBER_FLAG_M)
? _Z_KEYEXPR_MAPPING_UNKNOWN_REMOTE
: _Z_KEYEXPR_MAPPING_LOCAL;
// Decode ke id
_Z_RETURN_IF_ERR(_z_zint16_decode(&decl->_keyexpr._id, zbf));
// Decode ke suffix
if (_Z_HAS_FLAG(decl->interest_flags, _Z_DECL_SUBSCRIBER_FLAG_N)) {
_z_zint_t len;
_Z_RETURN_IF_ERR(_z_zint_decode(&len, zbf));
if (_z_zbuf_len(zbf) < len) {
return _Z_ERR_MESSAGE_DESERIALIZATION_FAILED;
}
decl->_keyexpr._suffix = zp_malloc(len + 1);
if (decl->_keyexpr._suffix == NULL) {
return _Z_ERR_SYSTEM_OUT_OF_MEMORY;
}
decl->_keyexpr._mapping = _z_keyexpr_mapping(mapping, true);
_z_zbuf_read_bytes(zbf, (uint8_t *)decl->_keyexpr._suffix, 0, len);
decl->_keyexpr._suffix[len] = 0;
} else {
decl->_keyexpr._suffix = NULL;
decl->_keyexpr._mapping = _z_keyexpr_mapping(mapping, false);
}
}
// Replace named & mapping by current & future flags
decl->interest_flags &= ~_Z_DECL_SUBSCRIBER_FLAG_M;
decl->interest_flags &= ~_Z_DECL_SUBSCRIBER_FLAG_N;
if (_Z_HAS_FLAG(header, _Z_INTEREST_FLAG_CURRENT)) {
decl->interest_flags |= _Z_INTEREST_FLAG_CURRENT;
}
if (_Z_HAS_FLAG(header, _Z_INTEREST_FLAG_FUTURE)) {
decl->interest_flags |= _Z_INTEREST_FLAG_FUTURE;
}
// Decode extention
if (_Z_HAS_FLAG(header, _Z_FLAG_Z_Z)) {
_Z_RETURN_IF_ERR(_z_msg_ext_skip_non_mandatories(zbf, 0x13));
}
return _Z_RES_OK;
Expand Down
11 changes: 6 additions & 5 deletions src/session/interest.c
Original file line number Diff line number Diff line change
Expand Up @@ -291,28 +291,29 @@ int8_t _z_interest_process_undeclare_interest(_z_session_t *zn, uint32_t id) {
}

int8_t _z_interest_process_declare_interest(_z_session_t *zn, _z_keyexpr_t key, uint32_t id, uint8_t flags) {
// TODO process restricted flag & key
_ZP_UNUSED(key);
_ZP_UNUSED(id);
// Check transport type
if (zn->_tp._type == _Z_TRANSPORT_UNICAST_TYPE) {
return _Z_RES_OK; // Nothing to do on unicast
}
// Current flags process
if ((flags & _Z_INTEREST_FLAG_CURRENT) != 0) {
if (_Z_HAS_FLAG(flags, _Z_INTEREST_FLAG_CURRENT)) {

Check notice

Code scanning / Cppcheck (reported by Codacy)

MISRA 14.4 rule Note

MISRA 14.4 rule
// Send all declare
if ((flags & _Z_INTEREST_FLAG_KEYEXPRS) != 0) {
if (_Z_HAS_FLAG(flags, _Z_INTEREST_FLAG_KEYEXPRS)) {

Check notice

Code scanning / Cppcheck (reported by Codacy)

MISRA 14.4 rule Note

MISRA 14.4 rule
_Z_DEBUG("Sending declare resources");
_Z_RETURN_IF_ERR(_z_send_resource_interest(zn));
}
if ((flags & _Z_INTEREST_FLAG_SUBSCRIBERS) != 0) {
if (_Z_HAS_FLAG(flags, _Z_INTEREST_FLAG_SUBSCRIBERS)) {

Check notice

Code scanning / Cppcheck (reported by Codacy)

MISRA 14.4 rule Note

MISRA 14.4 rule
_Z_DEBUG("Sending declare subscribers");
_Z_RETURN_IF_ERR(_z_send_subscriber_interest(zn));
}
if ((flags & _Z_INTEREST_FLAG_QUERYABLES) != 0) {
if (_Z_HAS_FLAG(flags, _Z_INTEREST_FLAG_QUERYABLES)) {

Check notice

Code scanning / Cppcheck (reported by Codacy)

MISRA 14.4 rule Note

MISRA 14.4 rule
_Z_DEBUG("Sending declare queryables");
_Z_RETURN_IF_ERR(_z_send_queryable_interest(zn));
}
if ((flags & _Z_INTEREST_FLAG_TOKENS) != 0) {
if (_Z_HAS_FLAG(flags, _Z_INTEREST_FLAG_TOKENS)) {

Check notice

Code scanning / Cppcheck (reported by Codacy)

MISRA 14.4 rule Note

MISRA 14.4 rule
// Zenoh pico doesn't support liveliness token for now
}
// Send final declare
Expand Down

0 comments on commit 997cf12

Please sign in to comment.