diff --git a/include/zenoh-pico/protocol/definitions/declarations.h b/include/zenoh-pico/protocol/definitions/declarations.h index 6c3144e7b..1df9c60f7 100644 --- a/include/zenoh-pico/protocol/definitions/declarations.h +++ b/include/zenoh-pico/protocol/definitions/declarations.h @@ -75,9 +75,11 @@ _z_undecl_token_t _z_undecl_token_null(void); #define _Z_INTEREST_FLAG_SUBSCRIBERS (1 << 1) #define _Z_INTEREST_FLAG_QUERYABLES (1 << 2) #define _Z_INTEREST_FLAG_TOKENS (1 << 3) +#define _Z_INTEREST_FLAG_RESTRICTED (1 << 4) #define _Z_INTEREST_FLAG_CURRENT (1 << 5) #define _Z_INTEREST_FLAG_FUTURE (1 << 6) #define _Z_INTEREST_FLAG_AGGREGATE (1 << 7) + typedef struct { _z_keyexpr_t _keyexpr; uint32_t _id; diff --git a/src/net/filtering.c b/src/net/filtering.c index ae0a8d898..9d4cb4d83 100644 --- a/src/net/filtering.c +++ b/src/net/filtering.c @@ -73,8 +73,8 @@ static void _z_write_filter_callback(const _z_interest_msg_t *msg, void *arg) { } int8_t _z_write_filter_create(_z_publisher_t *pub) { - uint8_t flags = _Z_INTEREST_FLAG_KEYEXPRS | _Z_INTEREST_FLAG_SUBSCRIBERS | _Z_INTEREST_FLAG_CURRENT | - _Z_INTEREST_FLAG_FUTURE | _Z_INTEREST_FLAG_AGGREGATE; + uint8_t flags = _Z_INTEREST_FLAG_KEYEXPRS | _Z_INTEREST_FLAG_SUBSCRIBERS | _Z_INTEREST_FLAG_RESTRICTED | + _Z_INTEREST_FLAG_CURRENT | _Z_INTEREST_FLAG_FUTURE | _Z_INTEREST_FLAG_AGGREGATE; _z_writer_filter_ctx_t *ctx = (_z_writer_filter_ctx_t *)zp_malloc(sizeof(_z_writer_filter_ctx_t)); if (ctx == NULL) { diff --git a/src/protocol/codec/declarations.c b/src/protocol/codec/declarations.c index 554d84a59..092480ec2 100644 --- a/src/protocol/codec/declarations.c +++ b/src/protocol/codec/declarations.c @@ -130,9 +130,38 @@ int8_t _z_undecl_token_encode(_z_wbuf_t *wbf, const _z_undecl_token_t *decl) { } int8_t _z_decl_interest_encode(_z_wbuf_t *wbf, const _z_decl_interest_t *decl) { + // Set header uint8_t header = _Z_DECL_INTEREST_MID; - _Z_RETURN_IF_ERR(_z_decl_commons_encode(wbf, header, false, decl->_id, decl->_keyexpr)); - _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, decl->interest_flags)); + if (_Z_HAS_FLAG(decl->interest_flags, _Z_INTEREST_FLAG_CURRENT)) { + header |= _Z_INTEREST_FLAG_CURRENT; + } + if (_Z_HAS_FLAG(decl->interest_flags, _Z_INTEREST_FLAG_FUTURE)) { + header |= _Z_INTEREST_FLAG_FUTURE; + } + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, header)); + // Set id + _Z_RETURN_IF_ERR(_z_zint_encode(wbf, decl->_id)); + // Copy flags but clear double use ones. + uint8_t interest_flags = decl->interest_flags; + interest_flags &= ~_Z_DECL_SUBSCRIBER_FLAG_N; + interest_flags &= ~_Z_DECL_SUBSCRIBER_FLAG_M; + // Process restricted flag + if (_Z_HAS_FLAG(interest_flags, _Z_INTEREST_FLAG_RESTRICTED)) { + // Set Named & Mapping flags + _Bool has_kesuffix = _z_keyexpr_has_suffix(decl->_keyexpr); + if (has_kesuffix) { + interest_flags |= _Z_DECL_SUBSCRIBER_FLAG_N; + } + if (_z_keyexpr_is_local(&decl->_keyexpr)) { + interest_flags |= _Z_DECL_SUBSCRIBER_FLAG_M; + } + // Set decl flags & keyexpr + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, decl->interest_flags)); + _Z_RETURN_IF_ERR(_z_keyexpr_encode(wbf, has_kesuffix, &decl->_keyexpr)); + } else { + // Set decl flags + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, decl->interest_flags)); + } return _Z_RES_OK; } @@ -337,11 +366,48 @@ int8_t _z_undecl_token_decode(_z_undecl_token_t *decl, _z_zbuf_t *zbf, uint8_t h return _z_undecl_trivial_decode(zbf, &decl->_ext_keyexpr, &decl->_id, header); } int8_t _z_decl_interest_decode(_z_decl_interest_t *decl, _z_zbuf_t *zbf, uint8_t header) { - _Bool has_ext; *decl = _z_decl_interest_null(); - _Z_RETURN_IF_ERR(_z_decl_commons_decode(zbf, header, &has_ext, &decl->_id, &decl->_keyexpr)); + // Decode id + _Z_RETURN_IF_ERR(_z_zint32_decode(&decl->_id, zbf)); + // Decode interest flags _Z_RETURN_IF_ERR(_z_uint8_decode(&decl->interest_flags, zbf)); - if (has_ext) { + // Process restricted flag + if (_Z_HAS_FLAG(decl->interest_flags, _Z_INTEREST_FLAG_RESTRICTED)) { + uint16_t mapping = _Z_HAS_FLAG(decl->interest_flags, _Z_DECL_SUBSCRIBER_FLAG_M) + ? _Z_KEYEXPR_MAPPING_UNKNOWN_REMOTE + : _Z_KEYEXPR_MAPPING_LOCAL; + // Decode ke id + _Z_RETURN_IF_ERR(_z_zint16_decode(&decl->_keyexpr._id, zbf)); + // Decode ke suffix + if (_Z_HAS_FLAG(decl->interest_flags, _Z_DECL_SUBSCRIBER_FLAG_N)) { + _z_zint_t len; + _Z_RETURN_IF_ERR(_z_zint_decode(&len, zbf)); + if (_z_zbuf_len(zbf) < len) { + return _Z_ERR_MESSAGE_DESERIALIZATION_FAILED; + } + decl->_keyexpr._suffix = zp_malloc(len + 1); + if (decl->_keyexpr._suffix == NULL) { + return _Z_ERR_SYSTEM_OUT_OF_MEMORY; + } + decl->_keyexpr._mapping = _z_keyexpr_mapping(mapping, true); + _z_zbuf_read_bytes(zbf, (uint8_t *)decl->_keyexpr._suffix, 0, len); + decl->_keyexpr._suffix[len] = 0; + } else { + decl->_keyexpr._suffix = NULL; + decl->_keyexpr._mapping = _z_keyexpr_mapping(mapping, false); + } + } + // Replace named & mapping by current & future flags + decl->interest_flags &= ~_Z_DECL_SUBSCRIBER_FLAG_M; + decl->interest_flags &= ~_Z_DECL_SUBSCRIBER_FLAG_N; + if (_Z_HAS_FLAG(header, _Z_INTEREST_FLAG_CURRENT)) { + decl->interest_flags |= _Z_INTEREST_FLAG_CURRENT; + } + if (_Z_HAS_FLAG(header, _Z_INTEREST_FLAG_FUTURE)) { + decl->interest_flags |= _Z_INTEREST_FLAG_FUTURE; + } + // Decode extention + if (_Z_HAS_FLAG(header, _Z_FLAG_Z_Z)) { _Z_RETURN_IF_ERR(_z_msg_ext_skip_non_mandatories(zbf, 0x13)); } return _Z_RES_OK; diff --git a/src/session/interest.c b/src/session/interest.c index 219077b0f..222c7eb94 100644 --- a/src/session/interest.c +++ b/src/session/interest.c @@ -291,6 +291,7 @@ int8_t _z_interest_process_undeclare_interest(_z_session_t *zn, uint32_t id) { } int8_t _z_interest_process_declare_interest(_z_session_t *zn, _z_keyexpr_t key, uint32_t id, uint8_t flags) { + // TODO process restricted flag & key _ZP_UNUSED(key); _ZP_UNUSED(id); // Check transport type @@ -298,21 +299,21 @@ int8_t _z_interest_process_declare_interest(_z_session_t *zn, _z_keyexpr_t key, return _Z_RES_OK; // Nothing to do on unicast } // Current flags process - if ((flags & _Z_INTEREST_FLAG_CURRENT) != 0) { + if (_Z_HAS_FLAG(flags, _Z_INTEREST_FLAG_CURRENT)) { // Send all declare - if ((flags & _Z_INTEREST_FLAG_KEYEXPRS) != 0) { + if (_Z_HAS_FLAG(flags, _Z_INTEREST_FLAG_KEYEXPRS)) { _Z_DEBUG("Sending declare resources"); _Z_RETURN_IF_ERR(_z_send_resource_interest(zn)); } - if ((flags & _Z_INTEREST_FLAG_SUBSCRIBERS) != 0) { + if (_Z_HAS_FLAG(flags, _Z_INTEREST_FLAG_SUBSCRIBERS)) { _Z_DEBUG("Sending declare subscribers"); _Z_RETURN_IF_ERR(_z_send_subscriber_interest(zn)); } - if ((flags & _Z_INTEREST_FLAG_QUERYABLES) != 0) { + if (_Z_HAS_FLAG(flags, _Z_INTEREST_FLAG_QUERYABLES)) { _Z_DEBUG("Sending declare queryables"); _Z_RETURN_IF_ERR(_z_send_queryable_interest(zn)); } - if ((flags & _Z_INTEREST_FLAG_TOKENS) != 0) { + if (_Z_HAS_FLAG(flags, _Z_INTEREST_FLAG_TOKENS)) { // Zenoh pico doesn't support liveliness token for now } // Send final declare