From 94e9f9f48424e5a7f9f068528711798e513464f8 Mon Sep 17 00:00:00 2001 From: Jean-Roland Gosse Date: Fri, 8 Nov 2024 17:51:49 +0100 Subject: [PATCH] Improve sub frame decode performance (#777) * feat: alias string decode * fix: alias keyexpr only on rx buffer * feat: add svec release function * feat: remove duplicate message clear * feat: remove unused subscription function * fix: missing const qualifier * feat: remove encoding null value * feat: improve sample create perf * feat: init string in encoding move * feat: replace rc sub list with sub info vec * feat: check before encoding clear * feat: avoid rechecking msg reliability * feat: add sub memoization * fix: alias cache if suffix * feat: move z_bytes instead of copy * feat: add svec move function * feat: add memoization config token * feat: add vec alias function * feat: switch sub cache to session level * feat: add svec expand function * fix: missing function args * fix: don't use _z_noop_move in svec * feat: svec ops return error instead of bool * feat: svec functions * feat: add use elem f in svec * feat: switch frame nmsg to svec * feat: add config value for initial frame size evaluation * feat: remove superfluous init * fix: rename cache token * fix: remove superfluous initialization --- CMakeLists.txt | 1 + include/zenoh-pico/collections/bytes.h | 7 +- include/zenoh-pico/collections/element.h | 5 +- include/zenoh-pico/collections/string.h | 8 +- include/zenoh-pico/collections/vec.h | 43 +++-- include/zenoh-pico/config.h | 6 + include/zenoh-pico/config.h.in | 6 + include/zenoh-pico/link/endpoint.h | 2 +- include/zenoh-pico/net/reply.h | 4 +- include/zenoh-pico/net/sample.h | 6 +- include/zenoh-pico/net/session.h | 4 + include/zenoh-pico/protocol/codec/transport.h | 2 - include/zenoh-pico/protocol/core.h | 2 +- .../zenoh-pico/protocol/definitions/network.h | 4 +- .../protocol/definitions/transport.h | 4 +- include/zenoh-pico/protocol/ext.h | 2 +- include/zenoh-pico/protocol/iobuf.h | 2 +- include/zenoh-pico/session/resource.h | 2 +- include/zenoh-pico/session/session.h | 20 +- include/zenoh-pico/session/subscription.h | 36 +++- include/zenoh-pico/system/link/raweth.h | 5 +- include/zenoh-pico/transport/transport.h | 4 +- src/api/api.c | 10 +- src/collections/bytes.c | 14 +- src/collections/vec.c | 106 +++++----- src/net/encoding.c | 8 +- src/net/sample.c | 36 ++-- src/protocol/codec.c | 12 +- src/protocol/codec/message.c | 6 - src/protocol/codec/network.c | 6 - src/protocol/codec/transport.c | 73 ++++--- src/protocol/core.c | 2 +- src/protocol/definitions/message.c | 1 + src/protocol/definitions/transport.c | 8 +- src/session/interest.c | 2 +- src/session/query.c | 2 +- src/session/queryable.c | 4 +- src/session/resource.c | 18 +- src/session/rx.c | 1 - src/session/subscription.c | 181 +++++++++++------- src/session/utils.c | 6 + src/transport/multicast/rx.c | 17 +- src/transport/unicast/rx.c | 16 +- tests/z_msgcodec_test.c | 46 ++--- 44 files changed, 435 insertions(+), 315 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index bc5f6f6da..8f2df3631 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -236,6 +236,7 @@ set(Z_FEATURE_TCP_NODELAY 1 CACHE STRING "Toggle TCP_NODELAY") set(Z_FEATURE_LOCAL_SUBSCRIBER 0 CACHE STRING "Toggle local subscriptions") set(Z_FEATURE_PUBLISHER_SESSION_CHECK 1 CACHE STRING "Toggle publisher session check") set(Z_FEATURE_BATCHING 1 CACHE STRING "Toggle batching") +set(Z_FEATURE_RX_CACHE 0 CACHE STRING "Toggle RX_CACHE") add_compile_definitions("Z_BUILD_DEBUG=$") message(STATUS "Building with feature confing:\n\ diff --git a/include/zenoh-pico/collections/bytes.h b/include/zenoh-pico/collections/bytes.h index 8afe8857f..051c5b0ca 100644 --- a/include/zenoh-pico/collections/bytes.h +++ b/include/zenoh-pico/collections/bytes.h @@ -31,11 +31,8 @@ inline size_t _z_arc_slice_size(const _z_arc_slice_t *s) { (void)s; return sizeof(_z_arc_slice_t); } -static inline void _z_arc_slice_elem_move(void *dst, void *src) { - _z_arc_slice_move((_z_arc_slice_t *)dst, (_z_arc_slice_t *)src); -} -_Z_ELEM_DEFINE(_z_arc_slice, _z_arc_slice_t, _z_arc_slice_size, _z_arc_slice_drop, _z_arc_slice_copy) -_Z_SVEC_DEFINE(_z_arc_slice, _z_arc_slice_t) +_Z_ELEM_DEFINE(_z_arc_slice, _z_arc_slice_t, _z_arc_slice_size, _z_arc_slice_drop, _z_arc_slice_copy, _z_arc_slice_move) +_Z_SVEC_DEFINE(_z_arc_slice, _z_arc_slice_t, true) /*-------- Bytes --------*/ /** diff --git a/include/zenoh-pico/collections/element.h b/include/zenoh-pico/collections/element.h index bd5701187..5657a63b9 100644 --- a/include/zenoh-pico/collections/element.h +++ b/include/zenoh-pico/collections/element.h @@ -34,7 +34,7 @@ typedef void (*z_element_move_f)(void *dst, void *src); typedef void *(*z_element_clone_f)(const void *e); typedef bool (*z_element_eq_f)(const void *left, const void *right); -#define _Z_ELEM_DEFINE(name, type, elem_size_f, elem_clear_f, elem_copy_f) \ +#define _Z_ELEM_DEFINE(name, type, elem_size_f, elem_clear_f, elem_copy_f, elem_move_f) \ typedef bool (*name##_eq_f)(const type *left, const type *right); \ static inline void name##_elem_clear(void *e) { elem_clear_f((type *)e); } \ static inline void name##_elem_free(void **e) { \ @@ -45,6 +45,7 @@ typedef bool (*z_element_eq_f)(const void *left, const void *right); *e = NULL; \ } \ } \ + static inline void name##_elem_move(void *dst, void *src) { elem_move_f((type *)dst, (type *)src); } \ static inline void name##_elem_copy(void *dst, const void *src) { elem_copy_f((type *)dst, (type *)src); } \ static inline void *name##_elem_clone(const void *src) { \ type *dst = (type *)z_malloc(elem_size_f((type *)src)); \ @@ -76,7 +77,7 @@ static inline void _z_noop_move(void *dst, void *src) { _ZP_UNUSED(src); } -_Z_ELEM_DEFINE(_z_noop, _z_noop_t, _z_noop_size, _z_noop_clear, _z_noop_copy) +_Z_ELEM_DEFINE(_z_noop, _z_noop_t, _z_noop_size, _z_noop_clear, _z_noop_copy, _z_noop_move) #ifdef __cplusplus } diff --git a/include/zenoh-pico/collections/string.h b/include/zenoh-pico/collections/string.h index fdce11179..e3a7a288d 100644 --- a/include/zenoh-pico/collections/string.h +++ b/include/zenoh-pico/collections/string.h @@ -37,7 +37,7 @@ bool _z_str_eq(const char *left, const char *right); size_t _z_str_size(const char *src); void _z_str_copy(char *dst, const char *src); void _z_str_n_copy(char *dst, const char *src, size_t size); -_Z_ELEM_DEFINE(_z_str, char, _z_str_size, _z_noop_clear, _z_str_copy) +_Z_ELEM_DEFINE(_z_str, char, _z_str_size, _z_noop_clear, _z_str_copy, _z_noop_move) _Z_VEC_DEFINE(_z_str, char) _Z_LIST_DEFINE(_z_str, char) @@ -103,10 +103,8 @@ _z_string_t _z_string_preallocate(const size_t len); char *_z_str_from_string_clone(const _z_string_t *str); -_Z_ELEM_DEFINE(_z_string, _z_string_t, _z_string_len, _z_string_clear, _z_string_copy) - -static inline void _z_string_elem_move(void *dst, void *src) { _z_string_move((_z_string_t *)dst, (_z_string_t *)src); } -_Z_SVEC_DEFINE(_z_string, _z_string_t) +_Z_ELEM_DEFINE(_z_string, _z_string_t, _z_string_len, _z_string_clear, _z_string_copy, _z_string_move) +_Z_SVEC_DEFINE(_z_string, _z_string_t, true) _Z_LIST_DEFINE(_z_string, _z_string_t) _Z_INT_MAP_DEFINE(_z_string, _z_string_t) diff --git a/include/zenoh-pico/collections/vec.h b/include/zenoh-pico/collections/vec.h index 18ddc11ea..6bde66222 100644 --- a/include/zenoh-pico/collections/vec.h +++ b/include/zenoh-pico/collections/vec.h @@ -34,9 +34,10 @@ typedef struct { } _z_vec_t; static inline _z_vec_t _z_vec_null(void) { return (_z_vec_t){0}; } +static inline _z_vec_t _z_vec_alias(const _z_vec_t *src) { return *src; } _z_vec_t _z_vec_make(size_t capacity); void _z_vec_copy(_z_vec_t *dst, const _z_vec_t *src, z_element_clone_f f); -void _z_vec_steal(_z_vec_t *dst, _z_vec_t *src); +void _z_vec_move(_z_vec_t *dst, _z_vec_t *src); size_t _z_vec_len(const _z_vec_t *v); bool _z_vec_is_empty(const _z_vec_t *v); @@ -65,7 +66,8 @@ void _z_vec_release(_z_vec_t *v); static inline void name##_vec_copy(name##_vec_t *dst, const name##_vec_t *src) { \ _z_vec_copy(dst, src, name##_elem_clone); \ } \ - static inline void name##_vec_steal(name##_vec_t *dst, name##_vec_t *src) { _z_vec_steal(dst, src); } \ + static inline name##_vec_t name##_vec_alias(const name##_vec_t *v) { return _z_vec_alias(v); } \ + static inline void name##_vec_move(name##_vec_t *dst, name##_vec_t *src) { _z_vec_move(dst, src); } \ static inline void name##_vec_reset(name##_vec_t *v) { _z_vec_reset(v, name##_elem_free); } \ static inline void name##_vec_clear(name##_vec_t *v) { _z_vec_clear(v, name##_elem_free); } \ static inline void name##_vec_free(name##_vec_t **v) { _z_vec_free(v, name##_elem_free); } \ @@ -81,44 +83,63 @@ typedef struct { void *_val; } _z_svec_t; +static inline _z_svec_t _z_svec_null(void) { return (_z_svec_t){0}; } +static inline _z_svec_t _z_svec_alias(const _z_svec_t *src) { return *src; } +void _z_svec_init(_z_svec_t *dst, size_t element_size); _z_svec_t _z_svec_make(size_t capacity, size_t element_size); -bool _z_svec_copy(_z_svec_t *dst, const _z_svec_t *src, z_element_copy_f copy, size_t element_size); +z_result_t _z_svec_copy(_z_svec_t *dst, const _z_svec_t *src, z_element_copy_f copy, size_t element_size, + bool use_elem_f); +void _z_svec_move(_z_svec_t *dst, _z_svec_t *src); size_t _z_svec_len(const _z_svec_t *v); bool _z_svec_is_empty(const _z_svec_t *v); -bool _z_svec_append(_z_svec_t *v, const void *e, z_element_move_f m, size_t element_size); +z_result_t _z_svec_expand(_z_svec_t *v, z_element_move_f move, size_t element_size, bool use_elem_f); +z_result_t _z_svec_append(_z_svec_t *v, const void *e, z_element_move_f m, size_t element_size, bool use_elem_f); void *_z_svec_get(const _z_svec_t *v, size_t pos, size_t element_size); +void *_z_svec_get_mut(_z_svec_t *v, size_t i, size_t element_size); void _z_svec_set(_z_svec_t *sv, size_t pos, void *e, z_element_clear_f f, size_t element_size); -void _z_svec_remove(_z_svec_t *sv, size_t pos, z_element_clear_f f, z_element_move_f m, size_t element_size); +void _z_svec_remove(_z_svec_t *sv, size_t pos, z_element_clear_f f, z_element_move_f m, size_t element_size, + bool use_elem_f); void _z_svec_reset(_z_svec_t *v, z_element_clear_f f, size_t element_size); void _z_svec_clear(_z_svec_t *v, z_element_clear_f f, size_t element_size); void _z_svec_free(_z_svec_t **v, z_element_clear_f f, size_t element_size); void _z_svec_release(_z_svec_t *v); -#define _Z_SVEC_DEFINE(name, type) \ +#define _Z_SVEC_DEFINE(name, type, use_elem_f) \ typedef _z_svec_t name##_svec_t; \ + static inline name##_svec_t name##_svec_null(void) { return _z_svec_null(); } \ static inline name##_svec_t name##_svec_make(size_t capacity) { return _z_svec_make(capacity, sizeof(type)); } \ + static inline void name##_svec_init(name##_svec_t *v) { _z_svec_init(v, sizeof(type)); } \ static inline size_t name##_svec_len(const name##_svec_t *v) { return _z_svec_len(v); } \ static inline bool name##_svec_is_empty(const name##_svec_t *v) { return _z_svec_is_empty(v); } \ - static inline bool name##_svec_append(name##_svec_t *v, type *e) { \ - return _z_svec_append(v, e, name##_elem_move, sizeof(type)); \ + static inline z_result_t name##_svec_expand(name##_svec_t *v) { \ + return _z_svec_expand(v, name##_elem_move, sizeof(type), use_elem_f); \ + } \ + static inline z_result_t name##_svec_append(name##_svec_t *v, const type *e) { \ + return _z_svec_append(v, e, name##_elem_move, sizeof(type), use_elem_f); \ } \ static inline type *name##_svec_get(const name##_svec_t *v, size_t pos) { \ return (type *)_z_svec_get(v, pos, sizeof(type)); \ } \ + static inline type *name##_svec_get_mut(name##_svec_t *v, size_t pos) { \ + return (type *)_z_svec_get_mut(v, pos, sizeof(type)); \ + } \ static inline void name##_svec_set(name##_svec_t *v, size_t pos, type *e) { \ _z_svec_set(v, pos, e, name##_elem_clear, sizeof(type)); \ } \ static inline void name##_svec_remove(name##_svec_t *v, size_t pos) { \ - _z_svec_remove(v, pos, name##_elem_clear, name##_elem_move, sizeof(type)); \ + _z_svec_remove(v, pos, name##_elem_clear, name##_elem_move, sizeof(type), use_elem_f); \ } \ - static inline bool name##_svec_copy(name##_svec_t *dst, const name##_svec_t *src) { \ - return _z_svec_copy(dst, src, name##_elem_copy, sizeof(type)); \ + static inline z_result_t name##_svec_copy(name##_svec_t *dst, const name##_svec_t *src) { \ + return _z_svec_copy(dst, src, name##_elem_copy, sizeof(type), use_elem_f); \ } \ + static inline name##_svec_t name##_svec_alias(const name##_svec_t *v) { return _z_svec_alias(v); } \ + static inline void name##_svec_move(name##_svec_t *dst, name##_svec_t *src) { _z_svec_move(dst, src); } \ static inline void name##_svec_reset(name##_svec_t *v) { _z_svec_reset(v, name##_elem_clear, sizeof(type)); } \ static inline void name##_svec_clear(name##_svec_t *v) { _z_svec_clear(v, name##_elem_clear, sizeof(type)); } \ + static inline void name##_svec_release(name##_svec_t *v) { _z_svec_release(v); } \ static inline void name##_svec_free(name##_svec_t **v) { _z_svec_free(v, name##_elem_clear, sizeof(type)); } #ifdef __cplusplus diff --git a/include/zenoh-pico/config.h b/include/zenoh-pico/config.h index e3a76d963..2449535fe 100644 --- a/include/zenoh-pico/config.h +++ b/include/zenoh-pico/config.h @@ -45,6 +45,7 @@ #define Z_FEATURE_LOCAL_SUBSCRIBER 0 #define Z_FEATURE_PUBLISHER_SESSION_CHECK 1 #define Z_FEATURE_BATCHING 1 +#define Z_FEATURE_RX_CACHE 0 // End of CMake generation /*------------------ Runtime configuration properties ------------------*/ @@ -176,6 +177,11 @@ */ #define Z_GET_TIMEOUT_DEFAULT 10000 +/** + * Average size of a frame message (bytes). Used to evaluate initial decoding frame size. + */ +#define Z_CONFIG_FRAME_AVG_MSG_SIZE 32 + /** * Default "nop" instruction */ diff --git a/include/zenoh-pico/config.h.in b/include/zenoh-pico/config.h.in index a8738d6a4..d6f43e3d3 100644 --- a/include/zenoh-pico/config.h.in +++ b/include/zenoh-pico/config.h.in @@ -45,6 +45,7 @@ #define Z_FEATURE_LOCAL_SUBSCRIBER @Z_FEATURE_LOCAL_SUBSCRIBER@ #define Z_FEATURE_PUBLISHER_SESSION_CHECK @Z_FEATURE_PUBLISHER_SESSION_CHECK@ #define Z_FEATURE_BATCHING @Z_FEATURE_BATCHING@ +#define Z_FEATURE_RX_CACHE @Z_FEATURE_RX_CACHE@ // End of CMake generation /*------------------ Runtime configuration properties ------------------*/ @@ -176,6 +177,11 @@ */ #define Z_GET_TIMEOUT_DEFAULT 10000 +/** + * Average size of a frame message (bytes). Used to evaluate initial decoding frame size. + */ +#define Z_CONFIG_FRAME_AVG_MSG_SIZE 32 + /** * Default "nop" instruction */ diff --git a/include/zenoh-pico/link/endpoint.h b/include/zenoh-pico/link/endpoint.h index d6b83e30a..8aa03390e 100644 --- a/include/zenoh-pico/link/endpoint.h +++ b/include/zenoh-pico/link/endpoint.h @@ -58,7 +58,7 @@ z_result_t _z_locator_from_string(_z_locator_t *lc, _z_string_t *s); size_t _z_locator_size(_z_locator_t *lc); void _z_locator_clear(_z_locator_t *lc); -_Z_ELEM_DEFINE(_z_locator, _z_locator_t, _z_locator_size, _z_locator_clear, _z_noop_copy) +_Z_ELEM_DEFINE(_z_locator, _z_locator_t, _z_locator_size, _z_locator_clear, _z_noop_copy, _z_noop_move) /*------------------ Locator array ------------------*/ _Z_ARRAY_DEFINE(_z_locator, _z_locator_t) diff --git a/include/zenoh-pico/net/reply.h b/include/zenoh-pico/net/reply.h index fc5b4bcb9..956620d67 100644 --- a/include/zenoh-pico/net/reply.h +++ b/include/zenoh-pico/net/reply.h @@ -71,7 +71,7 @@ static inline _z_reply_data_t _z_reply_data_init(void) { void _z_reply_data_clear(_z_reply_data_t *rd); z_result_t _z_reply_data_copy(_z_reply_data_t *dst, const _z_reply_data_t *src); -_Z_ELEM_DEFINE(_z_reply_data, _z_reply_data_t, _z_noop_size, _z_reply_data_clear, _z_noop_copy) +_Z_ELEM_DEFINE(_z_reply_data, _z_reply_data_t, _z_noop_size, _z_reply_data_clear, _z_noop_copy, _z_noop_move) _Z_LIST_DEFINE(_z_reply_data, _z_reply_data_t) /** @@ -106,7 +106,7 @@ typedef struct _z_pending_reply_t { bool _z_pending_reply_eq(const _z_pending_reply_t *one, const _z_pending_reply_t *two); void _z_pending_reply_clear(_z_pending_reply_t *res); -_Z_ELEM_DEFINE(_z_pending_reply, _z_pending_reply_t, _z_noop_size, _z_pending_reply_clear, _z_noop_copy) +_Z_ELEM_DEFINE(_z_pending_reply, _z_pending_reply_t, _z_noop_size, _z_pending_reply_clear, _z_noop_copy, _z_noop_move) _Z_LIST_DEFINE(_z_pending_reply, _z_pending_reply_t) #ifdef __cplusplus diff --git a/include/zenoh-pico/net/sample.h b/include/zenoh-pico/net/sample.h index a88ebf77c..04560f603 100644 --- a/include/zenoh-pico/net/sample.h +++ b/include/zenoh-pico/net/sample.h @@ -63,9 +63,9 @@ void _z_sample_free(_z_sample_t **sample); z_result_t _z_sample_copy(_z_sample_t *dst, const _z_sample_t *src); _z_sample_t _z_sample_duplicate(const _z_sample_t *src); -_z_sample_t _z_sample_create(_z_keyexpr_t *key, const _z_bytes_t *payload, const _z_timestamp_t *timestamp, - _z_encoding_t *encoding, const z_sample_kind_t kind, const _z_qos_t qos, - const _z_bytes_t *attachment, z_reliability_t reliability); +void _z_sample_create(_z_sample_t *s, _z_keyexpr_t *key, _z_bytes_t *payload, const _z_timestamp_t *timestamp, + _z_encoding_t *encoding, const z_sample_kind_t kind, const _z_qos_t qos, _z_bytes_t *attachment, + z_reliability_t reliability); #ifdef __cplusplus } #endif diff --git a/include/zenoh-pico/net/session.h b/include/zenoh-pico/net/session.h index d7ab4495a..0b8ff07fb 100644 --- a/include/zenoh-pico/net/session.h +++ b/include/zenoh-pico/net/session.h @@ -22,6 +22,7 @@ #include "zenoh-pico/config.h" #include "zenoh-pico/protocol/core.h" #include "zenoh-pico/session/session.h" +#include "zenoh-pico/session/subscription.h" #include "zenoh-pico/utils/config.h" #ifdef __cplusplus @@ -56,6 +57,9 @@ typedef struct _z_session_t { #if Z_FEATURE_SUBSCRIPTION == 1 _z_subscription_rc_list_t *_local_subscriptions; _z_subscription_rc_list_t *_remote_subscriptions; +#if Z_FEATURE_RX_CACHE == 1 + _z_subscription_cache_t _subscription_cache; +#endif #endif // Session queryables diff --git a/include/zenoh-pico/protocol/codec/transport.h b/include/zenoh-pico/protocol/codec/transport.h index d7b4320a5..dc9e9e6da 100644 --- a/include/zenoh-pico/protocol/codec/transport.h +++ b/include/zenoh-pico/protocol/codec/transport.h @@ -22,8 +22,6 @@ extern "C" { #endif -#define _ZENOH_PICO_FRAME_MESSAGES_VEC_SIZE 32 - z_result_t _z_scouting_message_encode(_z_wbuf_t *buf, const _z_scouting_message_t *msg); z_result_t _z_scouting_message_decode(_z_scouting_message_t *msg, _z_zbuf_t *buf); diff --git a/include/zenoh-pico/protocol/core.h b/include/zenoh-pico/protocol/core.h index 534cb3890..2219ac509 100644 --- a/include/zenoh-pico/protocol/core.h +++ b/include/zenoh-pico/protocol/core.h @@ -203,7 +203,7 @@ z_result_t _z_hello_copy(_z_hello_t *dst, const _z_hello_t *src); bool _z_hello_check(const _z_hello_t *hello); -_Z_ELEM_DEFINE(_z_hello, _z_hello_t, _z_noop_size, _z_hello_clear, _z_noop_copy) +_Z_ELEM_DEFINE(_z_hello, _z_hello_t, _z_noop_size, _z_hello_clear, _z_noop_copy, _z_noop_move) _Z_LIST_DEFINE(_z_hello, _z_hello_t) typedef struct { diff --git a/include/zenoh-pico/protocol/definitions/network.h b/include/zenoh-pico/protocol/definitions/network.h index ad52b74a3..8727d58fe 100644 --- a/include/zenoh-pico/protocol/definitions/network.h +++ b/include/zenoh-pico/protocol/definitions/network.h @@ -294,8 +294,8 @@ void _z_n_msg_clear(_z_network_message_t *m); void _z_n_msg_free(_z_network_message_t **m); inline static void _z_msg_clear(_z_zenoh_message_t *msg) { _z_n_msg_clear(msg); } inline static void _z_msg_free(_z_zenoh_message_t **msg) { _z_n_msg_free(msg); } -_Z_ELEM_DEFINE(_z_network_message, _z_network_message_t, _z_noop_size, _z_n_msg_clear, _z_noop_copy) -_Z_VEC_DEFINE(_z_network_message, _z_network_message_t) +_Z_ELEM_DEFINE(_z_network_message, _z_network_message_t, _z_noop_size, _z_n_msg_clear, _z_noop_copy, _z_noop_move) +_Z_SVEC_DEFINE(_z_network_message, _z_network_message_t, false) void _z_msg_fix_mapping(_z_zenoh_message_t *msg, uint16_t mapping); _z_network_message_t _z_msg_make_query(_Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_slice_t) parameters, _z_zint_t qid, diff --git a/include/zenoh-pico/protocol/definitions/transport.h b/include/zenoh-pico/protocol/definitions/transport.h index 261c61fc6..046cd815b 100644 --- a/include/zenoh-pico/protocol/definitions/transport.h +++ b/include/zenoh-pico/protocol/definitions/transport.h @@ -448,7 +448,7 @@ void _z_t_msg_keep_alive_clear(_z_t_msg_keep_alive_t *msg); // - if R==1 then the FRAME is sent on the reliable channel, best-effort otherwise. // typedef struct { - _z_network_message_vec_t _messages; + _z_network_message_svec_t _messages; _z_zint_t _sn; } _z_t_msg_frame_t; void _z_t_msg_frame_clear(_z_t_msg_frame_t *msg); @@ -511,7 +511,7 @@ _z_transport_message_t _z_t_msg_make_open_syn(_z_zint_t lease, _z_zint_t initial _z_transport_message_t _z_t_msg_make_open_ack(_z_zint_t lease, _z_zint_t initial_sn); _z_transport_message_t _z_t_msg_make_close(uint8_t reason, bool link_only); _z_transport_message_t _z_t_msg_make_keep_alive(void); -_z_transport_message_t _z_t_msg_make_frame(_z_zint_t sn, _z_network_message_vec_t messages, +_z_transport_message_t _z_t_msg_make_frame(_z_zint_t sn, _z_network_message_svec_t messages, z_reliability_t reliability); _z_transport_message_t _z_t_msg_make_frame_header(_z_zint_t sn, z_reliability_t reliability); _z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last); diff --git a/include/zenoh-pico/protocol/ext.h b/include/zenoh-pico/protocol/ext.h index eb70ede42..989d7491d 100644 --- a/include/zenoh-pico/protocol/ext.h +++ b/include/zenoh-pico/protocol/ext.h @@ -100,7 +100,7 @@ void _z_msg_ext_copy_unit(_z_msg_ext_unit_t *clone, const _z_msg_ext_unit_t *ext void _z_msg_ext_copy_zint(_z_msg_ext_zint_t *clone, const _z_msg_ext_zint_t *ext); void _z_msg_ext_copy_zbuf(_z_msg_ext_zbuf_t *clone, const _z_msg_ext_zbuf_t *ext); -_Z_ELEM_DEFINE(_z_msg_ext, _z_msg_ext_t, _z_noop_size, _z_msg_ext_clear, _z_msg_ext_copy) +_Z_ELEM_DEFINE(_z_msg_ext, _z_msg_ext_t, _z_noop_size, _z_msg_ext_clear, _z_msg_ext_copy, _z_noop_move) _Z_VEC_DEFINE(_z_msg_ext, _z_msg_ext_t) #ifdef __cplusplus diff --git a/include/zenoh-pico/protocol/iobuf.h b/include/zenoh-pico/protocol/iobuf.h index d3bca6872..bea659d34 100644 --- a/include/zenoh-pico/protocol/iobuf.h +++ b/include/zenoh-pico/protocol/iobuf.h @@ -64,7 +64,7 @@ void _z_iosli_free(_z_iosli_t **ios); void _z_iosli_copy(_z_iosli_t *dst, const _z_iosli_t *src); _z_iosli_t *_z_iosli_clone(const _z_iosli_t *src); -_Z_ELEM_DEFINE(_z_iosli, _z_iosli_t, _z_iosli_size, _z_iosli_clear, _z_iosli_copy) +_Z_ELEM_DEFINE(_z_iosli, _z_iosli_t, _z_iosli_size, _z_iosli_clear, _z_iosli_copy, _z_noop_move) _Z_VEC_DEFINE(_z_iosli, _z_iosli_t) /*------------------ ZBuf ------------------*/ diff --git a/include/zenoh-pico/session/resource.h b/include/zenoh-pico/session/resource.h index bedffac06..61af9989b 100644 --- a/include/zenoh-pico/session/resource.h +++ b/include/zenoh-pico/session/resource.h @@ -36,7 +36,7 @@ void _z_unregister_resource(_z_session_t *zn, uint16_t id, uint16_t mapping); void _z_unregister_resources_for_peer(_z_session_t *zn, uint16_t mapping); void _z_flush_resources(_z_session_t *zn); -_z_keyexpr_t __unsafe_z_get_expanded_key_from_key(_z_session_t *zn, const _z_keyexpr_t *keyexpr); +_z_keyexpr_t __unsafe_z_get_expanded_key_from_key(_z_session_t *zn, const _z_keyexpr_t *keyexpr, bool force_alias); _z_resource_t *__unsafe_z_get_resource_by_id(_z_session_t *zn, uint16_t mapping, _z_zint_t id); _z_resource_t *__unsafe_z_get_resource_matching_key(_z_session_t *zn, const _z_keyexpr_t *keyexpr); diff --git a/include/zenoh-pico/session/session.h b/include/zenoh-pico/session/session.h index bf1e801df..712d2e14a 100644 --- a/include/zenoh-pico/session/session.h +++ b/include/zenoh-pico/session/session.h @@ -49,7 +49,7 @@ void _z_resource_clear(_z_resource_t *res); void _z_resource_copy(_z_resource_t *dst, const _z_resource_t *src); void _z_resource_free(_z_resource_t **res); -_Z_ELEM_DEFINE(_z_resource, _z_resource_t, _z_noop_size, _z_resource_clear, _z_resource_copy) +_Z_ELEM_DEFINE(_z_resource, _z_resource_t, _z_noop_size, _z_resource_clear, _z_resource_copy, _z_noop_move) _Z_LIST_DEFINE(_z_resource, _z_resource_t) // Forward declaration to avoid cyclical include @@ -73,9 +73,9 @@ bool _z_subscription_eq(const _z_subscription_t *one, const _z_subscription_t *t void _z_subscription_clear(_z_subscription_t *sub); _Z_REFCOUNT_DEFINE(_z_subscription, _z_subscription) -_Z_ELEM_DEFINE(_z_subscriber, _z_subscription_t, _z_noop_size, _z_subscription_clear, _z_noop_copy) +_Z_ELEM_DEFINE(_z_subscriber, _z_subscription_t, _z_noop_size, _z_subscription_clear, _z_noop_copy, _z_noop_move) _Z_ELEM_DEFINE(_z_subscription_rc, _z_subscription_rc_t, _z_subscription_rc_size, _z_subscription_rc_drop, - _z_subscription_rc_copy) + _z_subscription_rc_copy, _z_noop_move) _Z_LIST_DEFINE(_z_subscription_rc, _z_subscription_rc_t) typedef struct { @@ -104,9 +104,10 @@ bool _z_session_queryable_eq(const _z_session_queryable_t *one, const _z_session void _z_session_queryable_clear(_z_session_queryable_t *res); _Z_REFCOUNT_DEFINE(_z_session_queryable, _z_session_queryable) -_Z_ELEM_DEFINE(_z_session_queryable, _z_session_queryable_t, _z_noop_size, _z_session_queryable_clear, _z_noop_copy) +_Z_ELEM_DEFINE(_z_session_queryable, _z_session_queryable_t, _z_noop_size, _z_session_queryable_clear, _z_noop_copy, + _z_noop_move) _Z_ELEM_DEFINE(_z_session_queryable_rc, _z_session_queryable_rc_t, _z_noop_size, _z_session_queryable_rc_drop, - _z_noop_copy) + _z_noop_copy, _z_noop_move) _Z_LIST_DEFINE(_z_session_queryable_rc, _z_session_queryable_rc_t) // Forward declaration to avoid cyclical includes @@ -137,7 +138,7 @@ typedef struct { bool _z_pending_query_eq(const _z_pending_query_t *one, const _z_pending_query_t *two); void _z_pending_query_clear(_z_pending_query_t *res); -_Z_ELEM_DEFINE(_z_pending_query, _z_pending_query_t, _z_noop_size, _z_pending_query_clear, _z_noop_copy) +_Z_ELEM_DEFINE(_z_pending_query, _z_pending_query_t, _z_noop_size, _z_pending_query_clear, _z_noop_copy, _z_noop_move) _Z_LIST_DEFINE(_z_pending_query, _z_pending_query_t) typedef struct { @@ -188,9 +189,10 @@ bool _z_session_interest_eq(const _z_session_interest_t *one, const _z_session_i void _z_session_interest_clear(_z_session_interest_t *res); _Z_REFCOUNT_DEFINE(_z_session_interest, _z_session_interest) -_Z_ELEM_DEFINE(_z_session_interest, _z_session_interest_t, _z_noop_size, _z_session_interest_clear, _z_noop_copy) +_Z_ELEM_DEFINE(_z_session_interest, _z_session_interest_t, _z_noop_size, _z_session_interest_clear, _z_noop_copy, + _z_noop_move) _Z_ELEM_DEFINE(_z_session_interest_rc, _z_session_interest_rc_t, _z_noop_size, _z_session_interest_rc_drop, - _z_noop_copy) + _z_noop_copy, _z_noop_move) _Z_LIST_DEFINE(_z_session_interest_rc, _z_session_interest_rc_t) typedef enum { @@ -206,7 +208,7 @@ typedef struct { } _z_declare_data_t; void _z_declare_data_clear(_z_declare_data_t *data); -_Z_ELEM_DEFINE(_z_declare_data, _z_declare_data_t, _z_noop_size, _z_declare_data_clear, _z_noop_copy) +_Z_ELEM_DEFINE(_z_declare_data, _z_declare_data_t, _z_noop_size, _z_declare_data_clear, _z_noop_copy, _z_noop_move) _Z_LIST_DEFINE(_z_declare_data, _z_declare_data_t) #ifdef __cplusplus diff --git a/include/zenoh-pico/session/subscription.h b/include/zenoh-pico/session/subscription.h index 62005b3b2..5932ec18b 100644 --- a/include/zenoh-pico/session/subscription.h +++ b/include/zenoh-pico/session/subscription.h @@ -16,25 +16,47 @@ #define INCLUDE_ZENOH_PICO_SESSION_SUBSCRIPTION_H #include "zenoh-pico/net/encoding.h" -#include "zenoh-pico/net/session.h" +#include "zenoh-pico/protocol/core.h" +#include "zenoh-pico/session/session.h" + +// Forward declaration to avoid cyclical include +typedef struct _z_session_t _z_session_t; + +// Subscription infos +typedef struct { + _z_sample_handler_t callback; + void *arg; +} _z_subscription_infos_t; + +_Z_ELEM_DEFINE(_z_subscription_infos, _z_subscription_infos_t, _z_noop_size, _z_noop_clear, _z_noop_copy, _z_noop_move) +_Z_SVEC_DEFINE(_z_subscription_infos, _z_subscription_infos_t, false) + +typedef struct { + _z_keyexpr_t ke_in; + _z_keyexpr_t ke_out; + _z_subscription_infos_svec_t infos; +} _z_subscription_cache_t; #ifdef __cplusplus extern "C" { #endif /*------------------ Subscription ------------------*/ -void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t *keyexpr, const _z_bytes_t *payload, +void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t *keyexpr, _z_bytes_t *payload, _z_encoding_t *encoding, const _z_n_qos_t qos, const _z_timestamp_t *timestamp, - const _z_bytes_t *attachment, z_reliability_t reliability); + _z_bytes_t *attachment, z_reliability_t reliability); #if Z_FEATURE_SUBSCRIPTION == 1 -_z_subscription_rc_t *_z_get_subscription_by_id(_z_session_t *zn, uint8_t is_local, const _z_zint_t id); -_z_subscription_rc_list_t *_z_get_subscriptions_by_key(_z_session_t *zn, uint8_t is_local, const _z_keyexpr_t *keyexpr); +#if Z_FEATURE_RX_CACHE == 1 +void _z_subscription_cache_clear(_z_subscription_cache_t *cache); +#endif + +_z_subscription_rc_t *_z_get_subscription_by_id(_z_session_t *zn, uint8_t is_local, const _z_zint_t id); _z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_t *sub); -z_result_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t *keyexpr, const _z_bytes_t *payload, +z_result_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t *keyexpr, _z_bytes_t *payload, _z_encoding_t *encoding, const _z_zint_t kind, const _z_timestamp_t *timestamp, - const _z_n_qos_t qos, const _z_bytes_t *attachment, z_reliability_t reliability); + const _z_n_qos_t qos, _z_bytes_t *attachment, z_reliability_t reliability); void _z_unregister_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_rc_t *sub); void _z_flush_subscriptions(_z_session_t *zn); #endif diff --git a/include/zenoh-pico/system/link/raweth.h b/include/zenoh-pico/system/link/raweth.h index f7b106571..b1fa878fe 100644 --- a/include/zenoh-pico/system/link/raweth.h +++ b/include/zenoh-pico/system/link/raweth.h @@ -47,14 +47,15 @@ typedef struct { void _z_raweth_clear_mapping_entry(_zp_raweth_mapping_entry_t *entry); _Z_ELEM_DEFINE(_zp_raweth_mapping, _zp_raweth_mapping_entry_t, _z_noop_size, _z_raweth_clear_mapping_entry, - _z_noop_copy) + _z_noop_copy, _z_noop_move) _Z_ARRAY_DEFINE(_zp_raweth_mapping, _zp_raweth_mapping_entry_t) typedef struct { uint8_t _mac[_ZP_MAC_ADDR_LENGTH]; } _zp_raweth_whitelist_entry_t; -_Z_ELEM_DEFINE(_zp_raweth_whitelist, _zp_raweth_whitelist_entry_t, _z_noop_size, _z_noop_clear, _z_noop_copy) +_Z_ELEM_DEFINE(_zp_raweth_whitelist, _zp_raweth_whitelist_entry_t, _z_noop_size, _z_noop_clear, _z_noop_copy, + _z_noop_move) _Z_ARRAY_DEFINE(_zp_raweth_whitelist, _zp_raweth_whitelist_entry_t) // Ethernet header structure type diff --git a/include/zenoh-pico/transport/transport.h b/include/zenoh-pico/transport/transport.h index 9cbfe7233..a66567791 100644 --- a/include/zenoh-pico/transport/transport.h +++ b/include/zenoh-pico/transport/transport.h @@ -65,7 +65,7 @@ void _z_transport_peer_entry_clear(_z_transport_peer_entry_t *src); void _z_transport_peer_entry_copy(_z_transport_peer_entry_t *dst, const _z_transport_peer_entry_t *src); bool _z_transport_peer_entry_eq(const _z_transport_peer_entry_t *left, const _z_transport_peer_entry_t *right); _Z_ELEM_DEFINE(_z_transport_peer_entry, _z_transport_peer_entry_t, _z_transport_peer_entry_size, - _z_transport_peer_entry_clear, _z_transport_peer_entry_copy) + _z_transport_peer_entry_clear, _z_transport_peer_entry_copy, _z_noop_move) _Z_LIST_DEFINE(_z_transport_peer_entry, _z_transport_peer_entry_t) _z_transport_peer_entry_list_t *_z_transport_peer_entry_list_insert(_z_transport_peer_entry_list_t *root, _z_transport_peer_entry_t *entry); @@ -143,7 +143,7 @@ typedef struct { enum { _Z_TRANSPORT_UNICAST_TYPE, _Z_TRANSPORT_MULTICAST_TYPE, _Z_TRANSPORT_RAWETH_TYPE, _Z_TRANSPORT_NONE } _type; } _z_transport_t; -_Z_ELEM_DEFINE(_z_transport, _z_transport_t, _z_noop_size, _z_noop_clear, _z_noop_copy) +_Z_ELEM_DEFINE(_z_transport, _z_transport_t, _z_noop_size, _z_noop_clear, _z_noop_copy, _z_noop_move) _Z_LIST_DEFINE(_z_transport, _z_transport_t) typedef struct { diff --git a/src/api/api.c b/src/api/api.c index 9fedbc62b..cb7d0fbc2 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -517,8 +517,7 @@ z_result_t z_whatami_to_view_string(z_whatami_t whatami, z_view_string_t *str_ou bool _z_string_array_check(const _z_string_svec_t *val) { return !_z_string_svec_is_empty(val); } z_result_t _z_string_array_copy(_z_string_svec_t *dst, const _z_string_svec_t *src) { - _z_string_svec_copy(dst, src); - return dst->_len == src->_len ? _Z_RES_OK : _Z_ERR_SYSTEM_OUT_OF_MEMORY; + return _z_string_svec_copy(dst, src); } _Z_OWNED_FUNCTIONS_VALUE_IMPL(_z_string_svec_t, string_array, _z_string_array_check, _z_string_array_null, _z_string_array_copy, _z_string_svec_clear) @@ -832,9 +831,10 @@ z_result_t z_put(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr // Trigger local subscriptions #if Z_FEATURE_LOCAL_SUBSCRIBER == 1 - _z_timestamp_t local_timestamp = ((opt.timestamp != NULL) ? *opt.timestamp : _z_timestamp_null()); + _z_timestamp_t local_timestamp = (opt.timestamp != NULL) ? *opt.timestamp : _z_timestamp_null(); + _z_encoding_t local_encoding = (opt.encoding != NULL) ? &opt.encoding->_this._val : _z_encoding_null(); _z_trigger_local_subscriptions( - _Z_RC_IN_VAL(zs), &keyexpr_aliased, &payload_bytes, opt.encoding == NULL ? NULL : &opt.encoding->_this._val, + _Z_RC_IN_VAL(zs), &keyexpr_aliased, &payload_bytes, &local_encoding, _z_n_qos_make(opt.is_express, opt.congestion_control == Z_CONGESTION_CONTROL_BLOCK, opt.priority), &local_timestamp, &attachment_bytes, reliability); #endif @@ -979,7 +979,7 @@ z_result_t z_publisher_put(const z_loaned_publisher_t *pub, z_moved_bytes_t *pay } // Trigger local subscriptions #if Z_FEATURE_LOCAL_SUBSCRIBER == 1 - _z_timestamp_t local_timestamp = ((opt.timestamp != NULL) ? *opt.timestamp : _z_timestamp_null()); + _z_timestamp_t local_timestamp = (opt.timestamp != NULL) ? *opt.timestamp : _z_timestamp_null(); _z_trigger_local_subscriptions( session, &pub_keyexpr, &payload_bytes, &encoding, _z_n_qos_make(pub->_is_express, pub->_congestion_control == Z_CONGESTION_CONTROL_BLOCK, pub->_priority), diff --git a/src/collections/bytes.c b/src/collections/bytes.c index e4328cd40..deb3f8252 100644 --- a/src/collections/bytes.c +++ b/src/collections/bytes.c @@ -28,12 +28,7 @@ bool _z_bytes_check(const _z_bytes_t *bytes) { return !_z_bytes_is_empty(bytes); } z_result_t _z_bytes_copy(_z_bytes_t *dst, const _z_bytes_t *src) { - _z_arc_slice_svec_copy(&dst->_slices, &src->_slices); - if (_z_arc_slice_svec_len(&dst->_slices) == _z_arc_slice_svec_len(&src->_slices)) { - return _Z_RES_OK; - } else { - return _Z_ERR_SYSTEM_OUT_OF_MEMORY; - } + return _z_arc_slice_svec_copy(&dst->_slices, &src->_slices); } _z_bytes_t _z_bytes_duplicate(const _z_bytes_t *src) { @@ -98,7 +93,7 @@ z_result_t _z_bytes_from_slice(_z_bytes_t *b, _z_slice_t s) { *b = _z_bytes_null(); _z_arc_slice_t arc_s = _z_arc_slice_wrap(s, 0, s.len); if (_z_arc_slice_len(&arc_s) != s.len) return _Z_ERR_SYSTEM_OUT_OF_MEMORY; - return _z_arc_slice_svec_append(&b->_slices, &arc_s) ? _Z_RES_OK : _Z_ERR_SYSTEM_OUT_OF_MEMORY; + return _z_arc_slice_svec_append(&b->_slices, &arc_s); } z_result_t _z_bytes_from_buf(_z_bytes_t *b, const uint8_t *src, size_t len) { @@ -130,8 +125,7 @@ z_result_t _z_bytes_to_slice(const _z_bytes_t *bytes, _z_slice_t *s) { } z_result_t _z_bytes_append_slice(_z_bytes_t *dst, _z_arc_slice_t *s) { - _Z_CLEAN_RETURN_IF_ERR(_z_arc_slice_svec_append(&dst->_slices, s) ? _Z_RES_OK : _Z_ERR_SYSTEM_OUT_OF_MEMORY, - _z_arc_slice_drop(s)); + _Z_CLEAN_RETURN_IF_ERR(_z_arc_slice_svec_append(&dst->_slices, s), _z_arc_slice_drop(s)); return _Z_RES_OK; } @@ -159,7 +153,7 @@ _z_slice_t _z_bytes_try_get_contiguous(const _z_bytes_t *bs) { } void _z_bytes_move(_z_bytes_t *dst, _z_bytes_t *src) { - dst->_slices = src->_slices; + *dst = *src; *src = _z_bytes_null(); } diff --git a/src/collections/vec.c b/src/collections/vec.c index d0f1a5c92..60ee8f4a1 100644 --- a/src/collections/vec.c +++ b/src/collections/vec.c @@ -41,7 +41,7 @@ void _z_vec_copy(_z_vec_t *dst, const _z_vec_t *src, z_element_clone_f d_f) { } } -void _z_vec_steal(_z_vec_t *dst, _z_vec_t *src) { +void _z_vec_move(_z_vec_t *dst, _z_vec_t *src) { *dst = *src; *src = _z_vec_null(); } @@ -145,40 +145,47 @@ _z_svec_t _z_svec_make(size_t capacity, size_t element_size) { return v; } -void __z_svec_copy_inner(void *dst, const void *src, z_element_copy_f copy, size_t num_elements, size_t element_size) { - if (copy == NULL) { - memcpy(dst, src, num_elements * element_size); - } else { - size_t offset = 0; - for (size_t i = 0; i < num_elements; i++) { - copy((uint8_t *)dst + offset, (uint8_t *)src + offset); - offset += element_size; - } - } -} +void _z_svec_init(_z_svec_t *dst, size_t element_size) { memset(dst->_val, 0, dst->_capacity * element_size); } -void __z_svec_move_inner(void *dst, void *src, z_element_move_f move, size_t num_elements, size_t element_size) { - if (move == NULL) { - memcpy(dst, src, num_elements * element_size); - } else { +static inline void __z_svec_move_inner(void *dst, void *src, z_element_move_f move, size_t num_elements, + size_t element_size, bool use_elem_f) { + if (use_elem_f) { size_t offset = 0; for (size_t i = 0; i < num_elements; i++) { move((uint8_t *)dst + offset, (uint8_t *)src + offset); offset += element_size; } + } else { + memcpy(dst, src, num_elements * element_size); } } -bool _z_svec_copy(_z_svec_t *dst, const _z_svec_t *src, z_element_copy_f copy, size_t element_size) { +void _z_svec_move(_z_svec_t *dst, _z_svec_t *src) { + *dst = *src; + *src = _z_svec_null(); +} + +z_result_t _z_svec_copy(_z_svec_t *dst, const _z_svec_t *src, z_element_copy_f copy, size_t element_size, + bool use_elem_f) { dst->_capacity = 0; dst->_len = 0; dst->_val = z_malloc(element_size * src->_capacity); - if (dst->_val != NULL) { - dst->_capacity = src->_capacity; - dst->_len = src->_len; - __z_svec_copy_inner(dst->_val, src->_val, copy, src->_len, element_size); + if (dst->_val == NULL) { + return _Z_ERR_SYSTEM_OUT_OF_MEMORY; + } + dst->_capacity = src->_capacity; + dst->_len = src->_len; + // Copy data to new vector + if (use_elem_f) { + size_t offset = 0; + for (size_t i = 0; i < src->_len; i++) { + copy((uint8_t *)dst->_val + offset, (uint8_t *)src->_val + offset); + offset += element_size; + } + } else { + memcpy(dst->_val, src->_val, src->_len * element_size); } - return dst->_len == src->_len; + return _Z_RES_OK; } void _z_svec_reset(_z_svec_t *v, z_element_clear_f clear, size_t element_size) { @@ -199,10 +206,7 @@ void _z_svec_clear(_z_svec_t *v, z_element_clear_f clear_f, size_t element_size) void _z_svec_release(_z_svec_t *v) { z_free(v->_val); - v->_val = NULL; - - v->_capacity = 0; - v->_len = 0; + *v = _z_svec_null(); } void _z_svec_free(_z_svec_t **v, z_element_clear_f clear, size_t element_size) { @@ -220,29 +224,30 @@ size_t _z_svec_len(const _z_svec_t *v) { return v->_len; } bool _z_svec_is_empty(const _z_svec_t *v) { return v->_len == 0; } -bool _z_svec_append(_z_svec_t *v, const void *e, z_element_move_f move, size_t element_size) { - if (v->_len == v->_capacity) { - // Allocate a new vector - size_t _capacity = v->_capacity == 0 ? 1 : (v->_capacity << 1); - void *_val = (void *)z_malloc(_capacity * element_size); - if (_val != NULL) { - __z_svec_move_inner(_val, v->_val, move, v->_len, element_size); - // Free the old data - z_free(v->_val); +z_result_t _z_svec_expand(_z_svec_t *v, z_element_move_f move, size_t element_size, bool use_elem_f) { + // Allocate a new vector + size_t _capacity = v->_capacity == 0 ? 1 : (v->_capacity << 1); + void *_val = (void *)z_malloc(_capacity * element_size); + if (_val == NULL) { + return _Z_ERR_SYSTEM_OUT_OF_MEMORY; + } + __z_svec_move_inner(_val, v->_val, move, v->_len, element_size, use_elem_f); + // Free the old data + z_free(v->_val); + // Update the current vector + v->_val = _val; + v->_capacity = _capacity; + return _Z_RES_OK; +} - // Update the current vector - v->_val = _val; - v->_capacity = _capacity; - memcpy((uint8_t *)v->_val + v->_len * element_size, e, element_size); - v->_len++; - } else { - return false; - } - } else { - memcpy((uint8_t *)v->_val + v->_len * element_size, e, element_size); - v->_len++; +z_result_t _z_svec_append(_z_svec_t *v, const void *e, z_element_move_f move, size_t element_size, bool use_elem_f) { + if (v->_len == v->_capacity) { + _Z_RETURN_IF_ERR(_z_svec_expand(v, move, element_size, use_elem_f)); } - return true; + // Append element + memcpy((uint8_t *)v->_val + v->_len * element_size, e, element_size); + v->_len++; + return _Z_RES_OK; } void *_z_svec_get(const _z_svec_t *v, size_t i, size_t element_size) { @@ -250,17 +255,20 @@ void *_z_svec_get(const _z_svec_t *v, size_t i, size_t element_size) { return (uint8_t *)v->_val + i * element_size; } +void *_z_svec_get_mut(_z_svec_t *v, size_t i, size_t element_size) { return (uint8_t *)v->_val + i * element_size; } + void _z_svec_set(_z_svec_t *v, size_t i, void *e, z_element_clear_f clear, size_t element_size) { assert(i < v->_len); clear((uint8_t *)v->_val + i * element_size); memcpy((uint8_t *)v->_val + i * element_size, e, element_size); } -void _z_svec_remove(_z_svec_t *v, size_t pos, z_element_clear_f clear, z_element_move_f move, size_t element_size) { +void _z_svec_remove(_z_svec_t *v, size_t pos, z_element_clear_f clear, z_element_move_f move, size_t element_size, + bool use_elem_f) { assert(pos < v->_len); clear((uint8_t *)v->_val + pos * element_size); __z_svec_move_inner((uint8_t *)v->_val + pos * element_size, (uint8_t *)v->_val + (pos + 1) * element_size, move, - (v->_len - pos - 1) * element_size, element_size); + (v->_len - pos - 1) * element_size, element_size, use_elem_f); v->_len--; } diff --git a/src/net/encoding.c b/src/net/encoding.c index 8bd1ec256..45b13e4d3 100644 --- a/src/net/encoding.c +++ b/src/net/encoding.c @@ -38,7 +38,11 @@ z_result_t _z_encoding_make(_z_encoding_t *encoding, uint16_t id, const char *sc return _Z_RES_OK; } -void _z_encoding_clear(_z_encoding_t *encoding) { _z_string_clear(&encoding->schema); } +void _z_encoding_clear(_z_encoding_t *encoding) { + if (_z_string_check(&encoding->schema)) { + _z_string_clear(&encoding->schema); + } +} z_result_t _z_encoding_copy(_z_encoding_t *dst, const _z_encoding_t *src) { dst->id = src->id; @@ -55,6 +59,8 @@ void _z_encoding_move(_z_encoding_t *dst, _z_encoding_t *src) { src->id = _Z_ENCODING_ID_DEFAULT; if (_z_string_check(&src->schema)) { _z_string_move(&dst->schema, &src->schema); + } else { + dst->schema = _z_string_null(); } } diff --git a/src/net/sample.c b/src/net/sample.c index 0921a3fc2..d158398c6 100644 --- a/src/net/sample.c +++ b/src/net/sample.c @@ -58,28 +58,22 @@ _z_sample_t _z_sample_duplicate(const _z_sample_t *src) { } #if Z_FEATURE_SUBSCRIPTION == 1 -_z_sample_t _z_sample_create(_z_keyexpr_t *key, const _z_bytes_t *payload, const _z_timestamp_t *timestamp, - _z_encoding_t *encoding, const z_sample_kind_t kind, const _z_qos_t qos, - const _z_bytes_t *attachment, z_reliability_t reliability) { - _z_sample_t s = _z_sample_null(); - s.keyexpr = _z_keyexpr_steal(key); - s.kind = kind; - if (_z_timestamp_check(timestamp)) { - s.timestamp = _z_timestamp_duplicate(timestamp); - } - s.qos = qos; - s.reliability = reliability; - _z_bytes_copy(&s.payload, payload); - _z_bytes_copy(&s.attachment, attachment); - if (encoding != NULL) { - _z_encoding_move(&s.encoding, encoding); - } - return s; +void _z_sample_create(_z_sample_t *s, _z_keyexpr_t *key, _z_bytes_t *payload, const _z_timestamp_t *timestamp, + _z_encoding_t *encoding, const z_sample_kind_t kind, const _z_qos_t qos, _z_bytes_t *attachment, + z_reliability_t reliability) { + s->kind = kind; + s->qos = qos; + s->reliability = reliability; + s->keyexpr = _z_keyexpr_steal(key); + s->timestamp = _z_timestamp_check(timestamp) ? _z_timestamp_duplicate(timestamp) : _z_timestamp_null(); + _z_encoding_move(&s->encoding, encoding); + _z_bytes_move(&s->attachment, attachment); + _z_bytes_move(&s->payload, payload); } #else -_z_sample_t _z_sample_create(_z_keyexpr_t *key, const _z_bytes_t *payload, const _z_timestamp_t *timestamp, - _z_encoding_t *encoding, const z_sample_kind_t kind, const _z_qos_t qos, - const _z_bytes_t *attachment, z_reliability_t reliability) { +void _z_sample_create(_z_sample_t *s, _z_keyexpr_t *key, _z_bytes_t *payload, const _z_timestamp_t *timestamp, + _z_encoding_t *encoding, const z_sample_kind_t kind, const _z_qos_t qos, _z_bytes_t *attachment, + z_reliability_t reliability) { _ZP_UNUSED(key); _ZP_UNUSED(payload); _ZP_UNUSED(timestamp); @@ -88,6 +82,6 @@ _z_sample_t _z_sample_create(_z_keyexpr_t *key, const _z_bytes_t *payload, const _ZP_UNUSED(qos); _ZP_UNUSED(attachment); _ZP_UNUSED(reliability); - return _z_sample_null(); + *s = _z_sample_null(); } #endif diff --git a/src/protocol/codec.c b/src/protocol/codec.c index 8574d2d89..64f22cade 100644 --- a/src/protocol/codec.c +++ b/src/protocol/codec.c @@ -282,7 +282,6 @@ z_result_t _z_slice_val_decode(_z_slice_t *bs, _z_zbuf_t *zbf) { return _z_slice z_result_t _z_slice_decode(_z_slice_t *bs, _z_zbuf_t *zbf) { return _z_slice_decode_na(bs, zbf); } z_result_t _z_bytes_decode(_z_bytes_t *bs, _z_zbuf_t *zbf) { - *bs = _z_bytes_null(); // Decode slice _z_slice_t s; _Z_RETURN_IF_ERR(_z_slice_decode(&s, zbf)); @@ -351,7 +350,6 @@ z_result_t _z_string_encode(_z_wbuf_t *wbf, const _z_string_t *s) { } z_result_t _z_string_decode(_z_string_t *str, _z_zbuf_t *zbf) { - *str = _z_string_null(); _z_zint_t len = 0; // Decode string length _Z_RETURN_IF_ERR(_z_zsize_decode(&len, zbf)); @@ -360,13 +358,9 @@ z_result_t _z_string_decode(_z_string_t *str, _z_zbuf_t *zbf) { _Z_INFO("Not enough bytes to read"); return _Z_ERR_MESSAGE_DESERIALIZATION_FAILED; } - // Allocate space for the string terminator - *str = _z_string_preallocate(len); - if (str->_slice.start == NULL) { - return _Z_ERR_SYSTEM_OUT_OF_MEMORY; - } - // Read bytes - _z_zbuf_read_bytes(zbf, (uint8_t *)_z_string_data(str), 0, len); + // Alias string + *str = _z_string_alias_substr((const char *)_z_zbuf_get_rptr(zbf), len); + _z_zbuf_set_rpos(zbf, _z_zbuf_get_rpos(zbf) + len); return _Z_RES_OK; } diff --git a/src/protocol/codec/message.c b/src/protocol/codec/message.c index b46dcaca0..35c8fa826 100644 --- a/src/protocol/codec/message.c +++ b/src/protocol/codec/message.c @@ -332,7 +332,6 @@ z_result_t _z_push_body_decode(_z_push_body_t *pshb, _z_zbuf_t *zbf, uint8_t hea switch (_Z_MID(header)) { case _Z_MID_Z_PUT: { pshb->_is_put = true; - pshb->_body._put = (_z_msg_put_t){0}; if (_Z_HAS_FLAG(header, _Z_FLAG_Z_P_T)) { _Z_RETURN_IF_ERR(_z_timestamp_decode(&pshb->_body._put._commons._timestamp, zbf)); } @@ -349,7 +348,6 @@ z_result_t _z_push_body_decode(_z_push_body_t *pshb, _z_zbuf_t *zbf, uint8_t hea } case _Z_MID_Z_DEL: { pshb->_is_put = false; - pshb->_body._del = (_z_msg_del_t){0}; if (_Z_HAS_FLAG(header, _Z_FLAG_Z_D_T)) { _Z_RETURN_IF_ERR(_z_timestamp_decode(&pshb->_body._put._commons._timestamp, zbf)); } @@ -470,7 +468,6 @@ z_result_t _z_query_decode_extensions(_z_msg_ext_t *extension, void *ctx) { z_result_t _z_query_decode(_z_msg_query_t *msg, _z_zbuf_t *zbf, uint8_t header) { _Z_DEBUG("Decoding _Z_MID_Z_QUERY"); - *msg = (_z_msg_query_t){0}; z_result_t ret = _Z_RES_OK; if (_Z_HAS_FLAG(header, _Z_FLAG_Z_Q_C)) { @@ -513,7 +510,6 @@ z_result_t _z_reply_decode_extension(_z_msg_ext_t *extension, void *ctx) { return ret; } z_result_t _z_reply_decode(_z_msg_reply_t *reply, _z_zbuf_t *zbf, uint8_t header) { - *reply = (_z_msg_reply_t){0}; if (_Z_HAS_FLAG(header, _Z_FLAG_Z_R_C)) { _Z_RETURN_IF_ERR(_z_uint8_decode((uint8_t *)&reply->_consolidation, zbf)); } else { @@ -574,8 +570,6 @@ z_result_t _z_err_decode_extension(_z_msg_ext_t *extension, void *ctx) { return ret; } z_result_t _z_err_decode(_z_msg_err_t *err, _z_zbuf_t *zbf, uint8_t header) { - *err = (_z_msg_err_t){0}; - if (_Z_HAS_FLAG(header, _Z_FLAG_Z_E_E)) { _Z_RETURN_IF_ERR(_z_encoding_decode(&err->_encoding, zbf)); } diff --git a/src/protocol/codec/network.c b/src/protocol/codec/network.c index 374db4564..de453c9ae 100644 --- a/src/protocol/codec/network.c +++ b/src/protocol/codec/network.c @@ -94,7 +94,6 @@ z_result_t _z_push_decode_ext_cb(_z_msg_ext_t *extension, void *ctx) { z_result_t _z_push_decode(_z_n_msg_push_t *msg, _z_zbuf_t *zbf, uint8_t header) { z_result_t ret = _Z_RES_OK; - *msg = (_z_n_msg_push_t){0}; msg->_qos = _Z_N_QOS_DEFAULT; ret |= _z_keyexpr_decode(&msg->_key, zbf, _Z_HAS_FLAG(header, _Z_FLAG_N_PUSH_N)); _z_keyexpr_set_mapping(&msg->_key, _Z_HAS_FLAG(header, _Z_FLAG_N_PUSH_M) ? _Z_KEYEXPR_MAPPING_UNKNOWN_REMOTE @@ -208,7 +207,6 @@ z_result_t _z_request_decode_extensions(_z_msg_ext_t *extension, void *ctx) { return _Z_RES_OK; } z_result_t _z_request_decode(_z_n_msg_request_t *msg, _z_zbuf_t *zbf, const uint8_t header) { - *msg = (_z_n_msg_request_t){0}; msg->_ext_qos = _Z_N_QOS_DEFAULT; _Z_RETURN_IF_ERR(_z_zsize_decode(&msg->_rid, zbf)); _Z_RETURN_IF_ERR(_z_keyexpr_decode(&msg->_key, zbf, _Z_HAS_FLAG(header, _Z_FLAG_N_REQUEST_N))); @@ -338,7 +336,6 @@ z_result_t _z_response_decode_extension(_z_msg_ext_t *extension, void *ctx) { z_result_t _z_response_decode(_z_n_msg_response_t *msg, _z_zbuf_t *zbf, uint8_t header) { _Z_DEBUG("Decoding _Z_MID_N_RESPONSE"); - *msg = (_z_n_msg_response_t){0}; msg->_ext_qos = _Z_N_QOS_DEFAULT; z_result_t ret = _Z_RES_OK; _z_keyexpr_set_mapping(&msg->_key, _Z_HAS_FLAG(header, _Z_FLAG_N_RESPONSE_M) ? _Z_KEYEXPR_MAPPING_UNKNOWN_REMOTE @@ -384,8 +381,6 @@ z_result_t _z_response_final_encode(_z_wbuf_t *wbf, const _z_n_msg_response_fina z_result_t _z_response_final_decode(_z_n_msg_response_final_t *msg, _z_zbuf_t *zbf, uint8_t header) { (void)(header); - - *msg = (_z_n_msg_response_final_t){0}; z_result_t ret = _Z_RES_OK; ret |= _z_zsize_decode(&msg->_request_id, zbf); if (_Z_HAS_FLAG(header, _Z_FLAG_Z_Z)) { @@ -444,7 +439,6 @@ z_result_t _z_declare_decode_extensions(_z_msg_ext_t *extension, void *ctx) { return _Z_RES_OK; } z_result_t _z_declare_decode(_z_n_msg_declare_t *decl, _z_zbuf_t *zbf, uint8_t header) { - *decl = (_z_n_msg_declare_t){0}; decl->_ext_qos = _Z_N_QOS_DEFAULT; // Retrieve interest id if (_Z_HAS_FLAG(header, _Z_FLAG_N_DECLARE_I)) { diff --git a/src/protocol/codec/transport.c b/src/protocol/codec/transport.c index e77e62f88..f09b5460f 100644 --- a/src/protocol/codec/transport.c +++ b/src/protocol/codec/transport.c @@ -28,6 +28,10 @@ #include "zenoh-pico/utils/logging.h" #include "zenoh-pico/utils/result.h" +#define _Z_FRAME_VEC_BASE_SIZE 8 // Abritrary small value +#define _Z_FRAME_VEC_SIZE_FROM_ZBUF_LEN(len) \ + (_Z_FRAME_VEC_BASE_SIZE + (len) / Z_CONFIG_FRAME_AVG_MSG_SIZE) // Approximate number of messages in frame + uint8_t _z_whatami_to_uint8(z_whatami_t whatami) { return (whatami >> 1) & 0x03; // get set bit index; only first 3 bits can be set } @@ -339,9 +343,9 @@ z_result_t _z_frame_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_frame_ ret = _Z_ERR_MESSAGE_SERIALIZATION_FAILED; } if (ret == _Z_RES_OK) { - size_t len = _z_network_message_vec_len(&msg->_messages); + size_t len = _z_network_message_svec_len(&msg->_messages); for (size_t i = 0; i < len; i++) { - _Z_RETURN_IF_ERR(_z_network_message_encode(wbf, _z_network_message_vec_get(&msg->_messages, i))) + _Z_RETURN_IF_ERR(_z_network_message_encode(wbf, _z_network_message_svec_get(&msg->_messages, i))) } } @@ -352,38 +356,45 @@ z_result_t _z_frame_decode(_z_t_msg_frame_t *msg, _z_zbuf_t *zbf, uint8_t header z_result_t ret = _Z_RES_OK; *msg = (_z_t_msg_frame_t){0}; - ret |= _z_zsize_decode(&msg->_sn, zbf); - if ((ret == _Z_RES_OK) && (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true)) { - ret |= _z_msg_ext_skip_non_mandatories(zbf, 0x04); - } - if (ret == _Z_RES_OK) { - msg->_messages = _z_network_message_vec_make(_ZENOH_PICO_FRAME_MESSAGES_VEC_SIZE); - while (_z_zbuf_len(zbf) > 0) { - // Mark the reading position of the iobfer - size_t r_pos = _z_zbuf_get_rpos(zbf); - _z_network_message_t *nm = (_z_network_message_t *)z_malloc(sizeof(_z_network_message_t)); - memset(nm, 0, sizeof(_z_network_message_t)); - ret |= _z_network_message_decode(nm, zbf); - if (ret == _Z_RES_OK) { - _z_network_message_vec_append(&msg->_messages, nm); - } else { - _z_n_msg_free(&nm); - _z_network_message_vec_clear(&msg->_messages); - - _z_zbuf_set_rpos(zbf, r_pos); // Restore the reading position of the iobfer - - // FIXME: Check for the return error, since not all of them means a decoding error - // in this particular case. As of now, we roll-back the reading position - // and return to the Zenoh transport-level decoder. - // https://github.com/eclipse-zenoh/zenoh-pico/pull/132#discussion_r1045593602 - if ((ret & _Z_ERR_MESSAGE_ZENOH_UNKNOWN) == _Z_ERR_MESSAGE_ZENOH_UNKNOWN) { - ret = _Z_RES_OK; - } - break; + _Z_RETURN_IF_ERR(_z_zsize_decode(&msg->_sn, zbf)); + if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z)) { + _Z_RETURN_IF_ERR(_z_msg_ext_skip_non_mandatories(zbf, 0x04)); + } + // Create message vector + size_t var_size = _Z_FRAME_VEC_SIZE_FROM_ZBUF_LEN(_z_zbuf_len(zbf)); + msg->_messages = _z_network_message_svec_make(var_size); + if (msg->_messages._capacity == 0) { + return _Z_ERR_SYSTEM_OUT_OF_MEMORY; + } + _z_network_message_svec_init(&msg->_messages); + size_t msg_idx = 0; + while (_z_zbuf_len(zbf) > 0) { + // Expand message vector if needed + if (msg_idx >= msg->_messages._capacity) { + _Z_RETURN_IF_ERR(_z_network_message_svec_expand(&msg->_messages)); + _z_network_message_svec_init(&msg->_messages); + } + // Mark the reading position of the iobfer + size_t r_pos = _z_zbuf_get_rpos(zbf); + _z_network_message_t *nm = _z_network_message_svec_get_mut(&msg->_messages, msg_idx); + ret = _z_network_message_decode(nm, zbf); + if (ret != _Z_RES_OK) { + _z_network_message_svec_clear(&msg->_messages); + _z_zbuf_set_rpos(zbf, r_pos); // Restore the reading position of the iobfer + + // FIXME: Check for the return error, since not all of them means a decoding error + // in this particular case. As of now, we roll-back the reading position + // and return to the Zenoh transport-level decoder. + // https://github.com/eclipse-zenoh/zenoh-pico/pull/132#discussion_r1045593602 + if ((ret & _Z_ERR_MESSAGE_ZENOH_UNKNOWN) == _Z_ERR_MESSAGE_ZENOH_UNKNOWN) { + ret = _Z_RES_OK; } + return ret; } + msg->_messages._len++; + msg_idx++; } - return ret; + return _Z_RES_OK; } /*------------------ Fragment Message ------------------*/ diff --git a/src/protocol/core.c b/src/protocol/core.c index 4dda7b57c..16842f708 100644 --- a/src/protocol/core.c +++ b/src/protocol/core.c @@ -63,7 +63,7 @@ z_result_t _z_value_copy(_z_value_t *dst, const _z_value_t *src) { z_result_t _z_hello_copy(_z_hello_t *dst, const _z_hello_t *src) { *dst = _z_hello_null(); - _Z_RETURN_IF_ERR(_z_string_svec_copy(&dst->_locators, &src->_locators) ? _Z_RES_OK : _Z_ERR_SYSTEM_OUT_OF_MEMORY); + _Z_RETURN_IF_ERR(_z_string_svec_copy(&dst->_locators, &src->_locators)); dst->_version = src->_version; dst->_whatami = src->_whatami; memcpy(&dst->_zid.id, &src->_zid.id, _Z_ID_LEN); diff --git a/src/protocol/definitions/message.c b/src/protocol/definitions/message.c index 3001b3372..0c3058e0a 100644 --- a/src/protocol/definitions/message.c +++ b/src/protocol/definitions/message.c @@ -23,6 +23,7 @@ void _z_msg_reply_clear(_z_msg_reply_t *msg) { _z_push_body_clear(&msg->_body); } void _z_msg_put_clear(_z_msg_put_t *msg) { + // TODO: systematically move everything so there's nothing to clear _z_bytes_drop(&msg->_payload); _z_bytes_drop(&msg->_attachment); _z_encoding_clear(&msg->_encoding); diff --git a/src/protocol/definitions/transport.c b/src/protocol/definitions/transport.c index 2af77cb97..448951e46 100644 --- a/src/protocol/definitions/transport.c +++ b/src/protocol/definitions/transport.c @@ -40,7 +40,7 @@ void _z_t_msg_close_clear(_z_t_msg_close_t *msg) { (void)(msg); } void _z_t_msg_keep_alive_clear(_z_t_msg_keep_alive_t *msg) { (void)(msg); } -void _z_t_msg_frame_clear(_z_t_msg_frame_t *msg) { _z_network_message_vec_clear(&msg->_messages); } +void _z_t_msg_frame_clear(_z_t_msg_frame_t *msg) { _z_network_message_svec_clear(&msg->_messages); } void _z_t_msg_fragment_clear(_z_t_msg_fragment_t *msg) { _z_slice_clear(&msg->_payload); } @@ -216,7 +216,7 @@ _z_transport_message_t _z_t_msg_make_keep_alive(void) { return msg; } -_z_transport_message_t _z_t_msg_make_frame(_z_zint_t sn, _z_network_message_vec_t messages, +_z_transport_message_t _z_t_msg_make_frame(_z_zint_t sn, _z_network_message_svec_t messages, z_reliability_t reliability) { _z_transport_message_t msg; msg._header = _Z_MID_T_FRAME; @@ -241,7 +241,7 @@ _z_transport_message_t _z_t_msg_make_frame_header(_z_zint_t sn, z_reliability_t _Z_SET_FLAG(msg._header, _Z_FLAG_T_FRAME_R); } - msg._body._frame._messages = _z_network_message_vec_make(0); + msg._body._frame._messages = _z_network_message_svec_null(); return msg; } @@ -307,7 +307,7 @@ void _z_t_msg_copy_keep_alive(_z_t_msg_keep_alive_t *clone, _z_t_msg_keep_alive_ void _z_t_msg_copy_frame(_z_t_msg_frame_t *clone, _z_t_msg_frame_t *msg) { clone->_sn = msg->_sn; - _z_network_message_vec_copy(&clone->_messages, &msg->_messages); + _z_network_message_svec_copy(&clone->_messages, &msg->_messages); } /*------------------ Transport Message ------------------*/ diff --git a/src/session/interest.c b/src/session/interest.c index 6bcdbed4c..8faa6cc2f 100644 --- a/src/session/interest.c +++ b/src/session/interest.c @@ -271,7 +271,7 @@ z_result_t _z_interest_process_declares(_z_session_t *zn, const _z_declaration_t } // Retrieve key _z_session_mutex_lock(zn); - _z_keyexpr_t key = __unsafe_z_get_expanded_key_from_key(zn, decl_key); + _z_keyexpr_t key = __unsafe_z_get_expanded_key_from_key(zn, decl_key, true); if (!_z_keyexpr_has_suffix(&key)) { _z_session_mutex_unlock(zn); return _Z_ERR_KEYEXPR_UNKNOWN; diff --git a/src/session/query.c b/src/session/query.c index 150d9c54f..241839551 100644 --- a/src/session/query.c +++ b/src/session/query.c @@ -122,7 +122,7 @@ z_result_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, ret = _Z_ERR_ENTITY_UNKNOWN; } - _z_keyexpr_t expanded_ke = __unsafe_z_get_expanded_key_from_key(zn, &keyexpr); + _z_keyexpr_t expanded_ke = __unsafe_z_get_expanded_key_from_key(zn, &keyexpr, true); if ((ret == _Z_RES_OK) && ((pen_qry->_anykey == false) && (_z_keyexpr_suffix_intersects(&pen_qry->_key, &keyexpr) == false))) { ret = _Z_ERR_QUERY_NOT_MATCH; diff --git a/src/session/queryable.c b/src/session/queryable.c index a7a25eaee..ba04debd7 100644 --- a/src/session/queryable.c +++ b/src/session/queryable.c @@ -106,7 +106,7 @@ _z_session_queryable_rc_t *_z_get_session_queryable_by_id(_z_session_t *zn, cons _z_session_queryable_rc_list_t *_z_get_session_queryable_by_key(_z_session_t *zn, const _z_keyexpr_t *keyexpr) { _z_session_mutex_lock(zn); - _z_keyexpr_t key = __unsafe_z_get_expanded_key_from_key(zn, keyexpr); + _z_keyexpr_t key = __unsafe_z_get_expanded_key_from_key(zn, keyexpr, false); _z_session_queryable_rc_list_t *qles = __unsafe_z_get_session_queryable_by_key(zn, key); _z_session_mutex_unlock(zn); @@ -139,7 +139,7 @@ z_result_t _z_trigger_queryables(_z_session_rc_t *zsrc, _z_msg_query_t *msgq, co _z_session_mutex_lock(zn); - _z_keyexpr_t key = __unsafe_z_get_expanded_key_from_key(zn, &q_key); + _z_keyexpr_t key = __unsafe_z_get_expanded_key_from_key(zn, &q_key, true); if (_z_keyexpr_has_suffix(&key)) { _z_session_queryable_rc_list_t *qles = __unsafe_z_get_session_queryable_by_key(zn, key); diff --git a/src/session/resource.c b/src/session/resource.c index 37c19411a..2413a1d21 100644 --- a/src/session/resource.c +++ b/src/session/resource.c @@ -87,7 +87,8 @@ _z_resource_t *__z_get_resource_by_key(_z_resource_list_t *rl, const _z_keyexpr_ return ret; } -_z_keyexpr_t __z_get_expanded_key_from_key(_z_resource_list_t *xs, const _z_keyexpr_t *keyexpr) { +static _z_keyexpr_t __z_get_expanded_key_from_key(_z_resource_list_t *xs, const _z_keyexpr_t *keyexpr, + bool force_alias) { _z_zint_t id = keyexpr->_id; // Check if ke is already expanded @@ -95,7 +96,12 @@ _z_keyexpr_t __z_get_expanded_key_from_key(_z_resource_list_t *xs, const _z_keye if (!_z_keyexpr_has_suffix(keyexpr)) { return _z_keyexpr_null(); } - return _z_keyexpr_duplicate(keyexpr); + // Keyexpr can be aliased from a rx buffer + if (force_alias) { + return _z_keyexpr_alias(*keyexpr); + } else { + return _z_keyexpr_duplicate(keyexpr); + } } // Need to build the complete resource name, by recursively look at RIDs @@ -169,9 +175,9 @@ _z_resource_t *__unsafe_z_get_resource_by_key(_z_session_t *zn, const _z_keyexpr * Make sure that the following mutexes are locked before calling this function: * - zn->_mutex_inner */ -_z_keyexpr_t __unsafe_z_get_expanded_key_from_key(_z_session_t *zn, const _z_keyexpr_t *keyexpr) { +_z_keyexpr_t __unsafe_z_get_expanded_key_from_key(_z_session_t *zn, const _z_keyexpr_t *keyexpr, bool force_alias) { _z_resource_list_t *decls = _z_keyexpr_is_local(keyexpr) ? zn->_local_resources : zn->_remote_resources; - return __z_get_expanded_key_from_key(decls, keyexpr); + return __z_get_expanded_key_from_key(decls, keyexpr, force_alias); } _z_resource_t *_z_get_resource_by_id(_z_session_t *zn, uint16_t mapping, _z_zint_t rid) { @@ -199,7 +205,7 @@ _z_resource_t *_z_get_resource_by_key(_z_session_t *zn, const _z_keyexpr_t *keye _z_keyexpr_t _z_get_expanded_key_from_key(_z_session_t *zn, const _z_keyexpr_t *keyexpr) { _z_session_mutex_lock(zn); - _z_keyexpr_t res = __unsafe_z_get_expanded_key_from_key(zn, keyexpr); + _z_keyexpr_t res = __unsafe_z_get_expanded_key_from_key(zn, keyexpr, false); _z_session_mutex_unlock(zn); @@ -219,7 +225,7 @@ uint16_t _z_register_resource(_z_session_t *zn, _z_keyexpr_t key, uint16_t id, u _z_resource_t *parent = __unsafe_z_get_resource_by_id(zn, parent_mapping, key._id); parent->_refcount++; } else { - key = __unsafe_z_get_expanded_key_from_key(zn, &key); + key = __unsafe_z_get_expanded_key_from_key(zn, &key, false); } } ret = key._id; diff --git a/src/session/rx.c b/src/session/rx.c index 54dc8983f..70213b898 100644 --- a/src/session/rx.c +++ b/src/session/rx.c @@ -159,6 +159,5 @@ z_result_t _z_handle_network_message(_z_session_rc_t *zsrc, _z_zenoh_message_t * } } } - _z_msg_clear(msg); return ret; } diff --git a/src/session/subscription.c b/src/session/subscription.c index db397367f..3e6297176 100644 --- a/src/session/subscription.c +++ b/src/session/subscription.c @@ -29,6 +29,57 @@ #include "zenoh-pico/utils/logging.h" #if Z_FEATURE_SUBSCRIPTION == 1 + +#define _Z_SUBINFOS_VEC_SIZE 4 // Arbitrary initial size + +#if Z_FEATURE_RX_CACHE == 1 +static inline bool _z_subscription_get_from_cache(_z_session_t *zn, const _z_keyexpr_t *ke, _z_keyexpr_t *ke_val, + _z_subscription_infos_svec_t *infos_val) { + if (!_z_keyexpr_equals(ke, &zn->_subscription_cache.ke_in)) { + return false; + } + *ke_val = _z_keyexpr_alias(zn->_subscription_cache.ke_out); + *infos_val = _z_subscription_infos_svec_alias(&zn->_subscription_cache.infos); + return true; +} + +static inline void _z_subscription_update_cache(_z_session_t *zn, const _z_keyexpr_t *ke_in, const _z_keyexpr_t *ke_out, + _z_subscription_infos_svec_t *infos) { + // Clear previous data + _z_subscription_cache_clear(&zn->_subscription_cache); + // Register new info + zn->_subscription_cache.ke_in = _z_keyexpr_duplicate(ke_in); + zn->_subscription_cache.ke_out = _z_keyexpr_duplicate(ke_out); + zn->_subscription_cache.infos = _z_subscription_infos_svec_alias(infos); +} + +void _z_subscription_cache_clear(_z_subscription_cache_t *cache) { + _z_subscription_infos_svec_clear(&cache->infos); + _z_keyexpr_clear(&cache->ke_in); + _z_keyexpr_clear(&cache->ke_out); +} + +#else +static inline bool _z_subscription_get_from_cache(_z_session_t *zn, const _z_keyexpr_t *ke, _z_keyexpr_t *ke_val, + _z_subscription_infos_svec_t *infos_val) { + _ZP_UNUSED(zn); + _ZP_UNUSED(ke); + _ZP_UNUSED(ke_val); + _ZP_UNUSED(infos_val); + return false; +} + +static inline void _z_subscription_update_cache(_z_session_t *zn, const _z_keyexpr_t *ke_in, const _z_keyexpr_t *ke_out, + _z_subscription_infos_svec_t *infos) { + _ZP_UNUSED(zn); + _ZP_UNUSED(ke_in); + _ZP_UNUSED(ke_out); + _ZP_UNUSED(infos); + return; +} +#endif // Z_FEATURE_RX_CACHE == 1 + +// Subscription bool _z_subscription_eq(const _z_subscription_t *other, const _z_subscription_t *this_) { return this_->_id == other->_id; } @@ -57,22 +108,6 @@ _z_subscription_rc_t *__z_get_subscription_by_id(_z_subscription_rc_list_t *subs return ret; } -_z_subscription_rc_list_t *__z_get_subscriptions_by_key(_z_subscription_rc_list_t *subs, const _z_keyexpr_t *key) { - _z_subscription_rc_list_t *ret = NULL; - - _z_subscription_rc_list_t *xs = subs; - while (xs != NULL) { - _z_subscription_rc_t *sub = _z_subscription_rc_list_head(xs); - if (_z_keyexpr_suffix_intersects(&_Z_RC_IN_VAL(sub)->_key, key) == true) { - ret = _z_subscription_rc_list_push(ret, _z_subscription_rc_clone_as_ptr(sub)); - } - - xs = _z_subscription_rc_list_tail(xs); - } - - return ret; -} - /** * This function is unsafe because it operates in potentially concurrent data. * Make sure that the following mutexes are locked before calling this function: @@ -89,11 +124,24 @@ _z_subscription_rc_t *__unsafe_z_get_subscription_by_id(_z_session_t *zn, uint8_ * Make sure that the following mutexes are locked before calling this function: * - zn->_mutex_inner */ -_z_subscription_rc_list_t *__unsafe_z_get_subscriptions_by_key(_z_session_t *zn, uint8_t is_local, - const _z_keyexpr_t *key) { +static z_result_t __unsafe_z_get_subscriptions_by_key(_z_session_t *zn, uint8_t is_local, const _z_keyexpr_t *key, + _z_subscription_infos_svec_t *sub_infos) { _z_subscription_rc_list_t *subs = (is_local == _Z_RESOURCE_IS_LOCAL) ? zn->_local_subscriptions : zn->_remote_subscriptions; - return __z_get_subscriptions_by_key(subs, key); + + *sub_infos = _z_subscription_infos_svec_make(_Z_SUBINFOS_VEC_SIZE); + _z_subscription_rc_list_t *xs = subs; + while (xs != NULL) { + // Parse subscription list + _z_subscription_rc_t *sub = _z_subscription_rc_list_head(xs); + if (_z_keyexpr_suffix_intersects(&_Z_RC_IN_VAL(sub)->_key, key)) { + _z_subscription_infos_t new_sub_info = {.arg = _Z_RC_IN_VAL(sub)->_arg, + .callback = _Z_RC_IN_VAL(sub)->_callback}; + _Z_RETURN_IF_ERR(_z_subscription_infos_svec_append(sub_infos, &new_sub_info)); + } + xs = _z_subscription_rc_list_tail(xs); + } + return _Z_RES_OK; } _z_subscription_rc_t *_z_get_subscription_by_id(_z_session_t *zn, uint8_t is_local, const _z_zint_t id) { @@ -106,16 +154,6 @@ _z_subscription_rc_t *_z_get_subscription_by_id(_z_session_t *zn, uint8_t is_loc return sub; } -_z_subscription_rc_list_t *_z_get_subscriptions_by_key(_z_session_t *zn, uint8_t is_local, const _z_keyexpr_t *key) { - _z_session_mutex_lock(zn); - - _z_subscription_rc_list_t *subs = __unsafe_z_get_subscriptions_by_key(zn, is_local, key); - - _z_session_mutex_unlock(zn); - - return subs; -} - _z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_t *s) { _Z_DEBUG(">>> Allocating sub decl for (%ju:%.*s)", (uintmax_t)s->_key._id, (int)_z_string_len(&s->_key._suffix), _z_string_data(&s->_key._suffix)); @@ -138,53 +176,60 @@ _z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, uint8_t is_loca return ret; } -void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t *keyexpr, const _z_bytes_t *payload, +void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t *keyexpr, _z_bytes_t *payload, _z_encoding_t *encoding, const _z_n_qos_t qos, const _z_timestamp_t *timestamp, - const _z_bytes_t *attachment, z_reliability_t reliability) { + _z_bytes_t *attachment, z_reliability_t reliability) { z_result_t ret = _z_trigger_subscriptions(zn, keyexpr, payload, encoding, Z_SAMPLE_KIND_PUT, timestamp, qos, attachment, reliability); (void)ret; } -z_result_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t *keyexpr, const _z_bytes_t *payload, +z_result_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t *keyexpr, _z_bytes_t *payload, _z_encoding_t *encoding, const _z_zint_t kind, const _z_timestamp_t *timestamp, - const _z_n_qos_t qos, const _z_bytes_t *attachment, z_reliability_t reliability) { - z_result_t ret = _Z_RES_OK; - - _z_session_mutex_lock(zn); - - _Z_DEBUG("Resolving %d - %.*s on mapping 0x%x", keyexpr->_id, (int)_z_string_len(&keyexpr->_suffix), - _z_string_data(&keyexpr->_suffix), _z_keyexpr_mapping_id(keyexpr)); - _z_keyexpr_t key = __unsafe_z_get_expanded_key_from_key(zn, keyexpr); - _Z_DEBUG("Triggering subs for %d - %.*s", key._id, (int)_z_string_len(&key._suffix), _z_string_data(&key._suffix)); - if (_z_keyexpr_has_suffix(&key)) { - _z_subscription_rc_list_t *subs = __unsafe_z_get_subscriptions_by_key(zn, _Z_RESOURCE_IS_LOCAL, &key); - _z_session_mutex_unlock(zn); - - // Check if there is subs - size_t sub_nb = _z_subscription_rc_list_len(subs); - if (sub_nb == 0) { - return _Z_RES_OK; - } - // Build the sample - _z_sample_t sample = _z_sample_create(&key, payload, timestamp, encoding, kind, qos, attachment, reliability); - // Parse subscription list - _z_subscription_rc_list_t *xs = subs; - _Z_DEBUG("Triggering %ju subs", (uintmax_t)sub_nb); - while (xs != NULL) { - _z_subscription_rc_t *sub = _z_subscription_rc_list_head(xs); - _Z_RC_IN_VAL(sub)->_callback(&sample, _Z_RC_IN_VAL(sub)->_arg); - xs = _z_subscription_rc_list_tail(xs); + const _z_n_qos_t qos, _z_bytes_t *attachment, z_reliability_t reliability) { + _z_sample_t sample; + _z_keyexpr_t key; + _z_subscription_infos_svec_t subs; + // Check cache + if (!_z_subscription_get_from_cache(zn, keyexpr, &key, &subs)) { + _Z_DEBUG("Resolving %d - %.*s on mapping 0x%x", keyexpr->_id, (int)_z_string_len(&keyexpr->_suffix), + _z_string_data(&keyexpr->_suffix), _z_keyexpr_mapping_id(keyexpr)); + _z_session_mutex_lock(zn); + key = __unsafe_z_get_expanded_key_from_key(zn, keyexpr, true); + + if (!_z_keyexpr_has_suffix(&key)) { + _z_session_mutex_unlock(zn); + return _Z_ERR_KEYEXPR_UNKNOWN; } - // Clean up - _z_sample_clear(&sample); - _z_subscription_rc_list_free(&subs); - } else { + // Get subscription list + z_result_t ret = __unsafe_z_get_subscriptions_by_key(zn, _Z_RESOURCE_IS_LOCAL, &key, &subs); _z_session_mutex_unlock(zn); - ret = _Z_ERR_KEYEXPR_UNKNOWN; + if (ret != _Z_RES_OK) { + return ret; + } + // Update cache + _z_subscription_update_cache(zn, keyexpr, &key, &subs); } - - return ret; + // Check if there is subs + size_t sub_nb = _z_subscription_infos_svec_len(&subs); + _Z_DEBUG("Triggering %ju subs for key %d - %.*s", (uintmax_t)sub_nb, key._id, (int)_z_string_len(&key._suffix), + _z_string_data(&key._suffix)); + if (sub_nb == 0) { + return _Z_RES_OK; + } + // Build the sample + _z_sample_create(&sample, &key, payload, timestamp, encoding, kind, qos, attachment, reliability); + // Parse subscription infos svec + for (size_t i = 0; i < sub_nb; i++) { + _z_subscription_infos_t *sub_info = _z_subscription_infos_svec_get(&subs, i); + sub_info->callback(&sample, sub_info->arg); + } + // Clean up + _z_sample_clear(&sample); +#if Z_FEATURE_RX_CACHE != 1 + _z_subscription_infos_svec_release(&subs); // Otherwise it's released with cache +#endif + return _Z_RES_OK; } void _z_unregister_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_rc_t *sub) { @@ -211,9 +256,9 @@ void _z_flush_subscriptions(_z_session_t *zn) { } #else // Z_FEATURE_SUBSCRIPTION == 0 -void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t *keyexpr, const _z_bytes_t *payload, +void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t *keyexpr, _z_bytes_t *payload, _z_encoding_t *encoding, const _z_n_qos_t qos, const _z_timestamp_t *timestamp, - const _z_bytes_t *attachment, z_reliability_t reliability) { + _z_bytes_t *attachment, z_reliability_t reliability) { _ZP_UNUSED(zn); _ZP_UNUSED(keyexpr); _ZP_UNUSED(payload); diff --git a/src/session/utils.c b/src/session/utils.c index 0a2cff984..dde52129c 100644 --- a/src/session/utils.c +++ b/src/session/utils.c @@ -60,6 +60,9 @@ z_result_t _z_session_init(_z_session_rc_t *zsrc, _z_id_t *zid) { #if Z_FEATURE_SUBSCRIPTION == 1 zn->_local_subscriptions = NULL; zn->_remote_subscriptions = NULL; +#if Z_FEATURE_RX_CACHE == 1 + memset(&zn->_subscription_cache, 0, sizeof(zn->_subscription_cache)); +#endif #endif #if Z_FEATURE_QUERYABLE == 1 zn->_local_queryable = NULL; @@ -111,6 +114,9 @@ void _z_session_clear(_z_session_t *zn) { _z_flush_resources(zn); #if Z_FEATURE_SUBSCRIPTION == 1 _z_flush_subscriptions(zn); +#if Z_FEATURE_RX_CACHE == 1 + _z_subscription_cache_clear(&zn->_subscription_cache); +#endif #endif #if Z_FEATURE_QUERYABLE == 1 _z_flush_session_queryable(zn); diff --git a/src/transport/multicast/rx.c b/src/transport/multicast/rx.c index d10d9709f..c1f438c38 100644 --- a/src/transport/multicast/rx.c +++ b/src/transport/multicast/rx.c @@ -130,9 +130,10 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, } // Note that we receive data from peer entry->_received = true; - + z_reliability_t tmsg_reliability; // Check if the SN is correct if (_Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAME_R) == true) { + tmsg_reliability = Z_RELIABILITY_RELIABLE; // @TODO: amend once reliability is in place. For the time being only // monotonic SNs are ensured if (_z_sn_precedes(entry->_sn_res, entry->_sn_rx_sns._val._plain._reliable, t_msg->_body._frame._sn) == @@ -147,6 +148,7 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, break; } } else { + tmsg_reliability = Z_RELIABILITY_BEST_EFFORT; if (_z_sn_precedes(entry->_sn_res, entry->_sn_rx_sns._val._plain._best_effort, t_msg->_body._frame._sn) == true) { entry->_sn_rx_sns._val._plain._best_effort = t_msg->_body._frame._sn; @@ -162,10 +164,10 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, // Handle all the zenoh message, one by one uint16_t mapping = entry->_peer_id; - size_t len = _z_vec_len(&t_msg->_body._frame._messages); + size_t len = _z_svec_len(&t_msg->_body._frame._messages); for (size_t i = 0; i < len; i++) { - _z_network_message_t *zm = _z_network_message_vec_get(&t_msg->_body._frame._messages, i); - zm->_reliability = _z_t_msg_get_reliability(t_msg); + _z_network_message_t *zm = _z_network_message_svec_get(&t_msg->_body._frame._messages, i); + zm->_reliability = tmsg_reliability; _z_msg_fix_mapping(zm, mapping); _z_handle_network_message(ztm->_common._session, zm, mapping); @@ -186,11 +188,14 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_wbuf_t *dbuf; uint8_t *dbuf_state; + z_reliability_t tmsg_reliability; // Select the right defragmentation buffer if (_Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAGMENT_R)) { + tmsg_reliability = Z_RELIABILITY_RELIABLE; dbuf = &entry->_dbuf_reliable; dbuf_state = &entry->_state_reliable; } else { + tmsg_reliability = Z_RELIABILITY_BEST_EFFORT; dbuf = &entry->_dbuf_best_effort; dbuf_state = &entry->_state_best_effort; } @@ -234,9 +239,9 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, break; } // Decode message - _z_zenoh_message_t zm; + _z_zenoh_message_t zm = {0}; ret = _z_network_message_decode(&zm, &zbf); - zm._reliability = _z_t_msg_get_reliability(t_msg); + zm._reliability = tmsg_reliability; if (ret == _Z_RES_OK) { uint16_t mapping = entry->_peer_id; _z_msg_fix_mapping(&zm, mapping); diff --git a/src/transport/unicast/rx.c b/src/transport/unicast/rx.c index ca9591bba..1d30e988f 100644 --- a/src/transport/unicast/rx.c +++ b/src/transport/unicast/rx.c @@ -96,8 +96,10 @@ z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_t switch (_Z_MID(t_msg->_header)) { case _Z_MID_T_FRAME: { _Z_DEBUG("Received Z_FRAME message"); + z_reliability_t tmsg_reliability; // Check if the SN is correct if (_Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAME_R) == true) { + tmsg_reliability = Z_RELIABILITY_RELIABLE; // @TODO: amend once reliability is in place. For the time being only // monotonic SNs are ensured if (_z_sn_precedes(ztu->_common._sn_res, ztu->_sn_rx_reliable, t_msg->_body._frame._sn) == true) { @@ -111,6 +113,7 @@ z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_t break; } } else { + tmsg_reliability = Z_RELIABILITY_BEST_EFFORT; if (_z_sn_precedes(ztu->_common._sn_res, ztu->_sn_rx_best_effort, t_msg->_body._frame._sn) == true) { ztu->_sn_rx_best_effort = t_msg->_body._frame._sn; } else { @@ -124,10 +127,10 @@ z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_t } // Handle all the zenoh message, one by one - size_t len = _z_vec_len(&t_msg->_body._frame._messages); + size_t len = _z_svec_len(&t_msg->_body._frame._messages); for (size_t i = 0; i < len; i++) { - _z_network_message_t *zm = _z_network_message_vec_get(&t_msg->_body._frame._messages, i); - zm->_reliability = _z_t_msg_get_reliability(t_msg); + _z_network_message_t *zm = _z_network_message_svec_get(&t_msg->_body._frame._messages, i); + zm->_reliability = tmsg_reliability; _z_handle_network_message(ztu->_common._session, zm, _Z_KEYEXPR_MAPPING_UNKNOWN_REMOTE); } @@ -139,11 +142,14 @@ z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_t #if Z_FEATURE_FRAGMENTATION == 1 _z_wbuf_t *dbuf; uint8_t *dbuf_state; + z_reliability_t tmsg_reliability; // Select the right defragmentation buffer if (_Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAGMENT_R)) { + tmsg_reliability = Z_RELIABILITY_RELIABLE; dbuf = &ztu->_dbuf_reliable; dbuf_state = &ztu->_state_reliable; } else { + tmsg_reliability = Z_RELIABILITY_BEST_EFFORT; dbuf = &ztu->_dbuf_best_effort; dbuf_state = &ztu->_state_best_effort; } @@ -187,9 +193,9 @@ z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_t break; } // Decode message - _z_zenoh_message_t zm; + _z_zenoh_message_t zm = {0}; ret = _z_network_message_decode(&zm, &zbf); - zm._reliability = _z_t_msg_get_reliability(t_msg); + zm._reliability = tmsg_reliability; if (ret == _Z_RES_OK) { _z_handle_network_message(ztu->_common._session, &zm, _Z_KEYEXPR_MAPPING_UNKNOWN_REMOTE); } else { diff --git a/tests/z_msgcodec_test.c b/tests/z_msgcodec_test.c index c881f8f4e..f3a92a77a 100644 --- a/tests/z_msgcodec_test.c +++ b/tests/z_msgcodec_test.c @@ -554,7 +554,7 @@ void payload_field(void) { // Decode _z_zbuf_t zbf = _z_wbuf_to_zbuf(&wbf); - _z_bytes_t d_pld; + _z_bytes_t d_pld = _z_bytes_null(); res = _z_bytes_decode(&d_pld, &zbf); assert(res == _Z_RES_OK); printf(" "); @@ -1140,7 +1140,7 @@ void interest_message(void) { // Encode assert(_z_n_interest_encode(&wbf, &expected._body._interest) == _Z_RES_OK); // Decode - _z_n_msg_interest_t decoded; + _z_n_msg_interest_t decoded = {0}; _z_zbuf_t zbf = _z_wbuf_to_zbuf(&wbf); uint8_t header = _z_zbuf_read(&zbf); assert(_z_n_interest_decode(&decoded, &zbf, header) == _Z_RES_OK); @@ -1236,7 +1236,7 @@ void query_message(void) { _z_wbuf_t wbf = gen_wbuf(UINT16_MAX); _z_msg_query_t expected = gen_query(); assert(_z_query_encode(&wbf, &expected) == _Z_RES_OK); - _z_msg_query_t decoded; + _z_msg_query_t decoded = {0}; _z_zbuf_t zbf = _z_wbuf_to_zbuf(&wbf); uint8_t header = _z_zbuf_read(&zbf); z_result_t res = _z_query_decode(&decoded, &zbf, header); @@ -1268,7 +1268,7 @@ void err_message(void) { _z_wbuf_t wbf = gen_wbuf(UINT16_MAX); _z_msg_err_t expected = gen_err(); assert(_z_err_encode(&wbf, &expected) == _Z_RES_OK); - _z_msg_err_t decoded; + _z_msg_err_t decoded = {0}; _z_zbuf_t zbf = _z_wbuf_to_zbuf(&wbf); uint8_t header = _z_zbuf_read(&zbf); assert(_Z_RES_OK == _z_err_decode(&decoded, &zbf, header)); @@ -1296,7 +1296,7 @@ void reply_message(void) { _z_wbuf_t wbf = gen_wbuf(UINT16_MAX); _z_msg_reply_t expected = gen_reply(); assert(_z_reply_encode(&wbf, &expected) == _Z_RES_OK); - _z_msg_reply_t decoded; + _z_msg_reply_t decoded = {0}; _z_zbuf_t zbf = _z_wbuf_to_zbuf(&wbf); uint8_t header = _z_zbuf_read(&zbf); assert(_Z_RES_OK == _z_reply_decode(&decoded, &zbf, header)); @@ -1328,7 +1328,7 @@ void push_message(void) { _z_wbuf_t wbf = gen_wbuf(UINT16_MAX); _z_n_msg_push_t expected = gen_push(); assert(_z_push_encode(&wbf, &expected) == _Z_RES_OK); - _z_n_msg_push_t decoded; + _z_n_msg_push_t decoded = {0}; _z_zbuf_t zbf = _z_wbuf_to_zbuf(&wbf); uint8_t header = _z_zbuf_read(&zbf); assert(_Z_RES_OK == _z_push_decode(&decoded, &zbf, header)); @@ -1398,7 +1398,7 @@ void request_message(void) { _z_wbuf_t wbf = gen_wbuf(UINT16_MAX); _z_n_msg_request_t expected = gen_request(); assert(_z_request_encode(&wbf, &expected) == _Z_RES_OK); - _z_n_msg_request_t decoded; + _z_n_msg_request_t decoded = {0}; _z_zbuf_t zbf = _z_wbuf_to_zbuf(&wbf); uint8_t header = _z_zbuf_read(&zbf); z_result_t ret = _z_request_decode(&decoded, &zbf, header); @@ -1456,7 +1456,7 @@ void response_message(void) { _z_wbuf_t wbf = gen_wbuf(UINT16_MAX); _z_n_msg_response_t expected = gen_response(); assert(_z_response_encode(&wbf, &expected) == _Z_RES_OK); - _z_n_msg_response_t decoded; + _z_n_msg_response_t decoded = {0}; _z_zbuf_t zbf = _z_wbuf_to_zbuf(&wbf); uint8_t header = _z_zbuf_read(&zbf); z_result_t ret = _z_response_decode(&decoded, &zbf, header); @@ -1477,7 +1477,7 @@ void response_final_message(void) { _z_wbuf_t wbf = gen_wbuf(UINT16_MAX); _z_n_msg_response_final_t expected = gen_response_final(); assert(_z_response_final_encode(&wbf, &expected) == _Z_RES_OK); - _z_n_msg_response_final_t decoded; + _z_n_msg_response_final_t decoded = {0}; _z_zbuf_t zbf = _z_wbuf_to_zbuf(&wbf); uint8_t header = _z_zbuf_read(&zbf); z_result_t ret = _z_response_final_decode(&decoded, &zbf, header); @@ -1526,7 +1526,7 @@ void join_message(void) { _z_wbuf_t wbf = gen_wbuf(UINT16_MAX); _z_transport_message_t expected = gen_join(); assert(_z_join_encode(&wbf, expected._header, &expected._body._join) == _Z_RES_OK); - _z_t_msg_join_t decoded; + _z_t_msg_join_t decoded = {0}; _z_zbuf_t zbf = _z_wbuf_to_zbuf(&wbf); z_result_t ret = _z_join_decode(&decoded, &zbf, expected._header); assert(_Z_RES_OK == ret); @@ -1558,7 +1558,7 @@ void init_message(void) { _z_wbuf_t wbf = gen_wbuf(UINT16_MAX); _z_transport_message_t expected = gen_init(); assert(_z_init_encode(&wbf, expected._header, &expected._body._init) == _Z_RES_OK); - _z_t_msg_init_t decoded; + _z_t_msg_init_t decoded = {0}; _z_zbuf_t zbf = _z_wbuf_to_zbuf(&wbf); z_result_t ret = _z_init_decode(&decoded, &zbf, expected._header); assert(_Z_RES_OK == ret); @@ -1586,7 +1586,7 @@ void open_message(void) { _z_wbuf_t wbf = gen_wbuf(UINT16_MAX); _z_transport_message_t expected = gen_open(); assert(_z_open_encode(&wbf, expected._header, &expected._body._open) == _Z_RES_OK); - _z_t_msg_open_t decoded; + _z_t_msg_open_t decoded = {0}; _z_zbuf_t zbf = _z_wbuf_to_zbuf(&wbf); z_result_t ret = _z_open_decode(&decoded, &zbf, expected._header); assert(_Z_RES_OK == ret); @@ -1606,7 +1606,7 @@ void close_message(void) { _z_wbuf_t wbf = gen_wbuf(UINT16_MAX); _z_transport_message_t expected = gen_close(); assert(_z_close_encode(&wbf, expected._header, &expected._body._close) == _Z_RES_OK); - _z_t_msg_close_t decoded; + _z_t_msg_close_t decoded = {0}; _z_zbuf_t zbf = _z_wbuf_to_zbuf(&wbf); z_result_t ret = _z_close_decode(&decoded, &zbf, expected._header); assert(_Z_RES_OK == ret); @@ -1627,7 +1627,7 @@ void keep_alive_message(void) { _z_wbuf_t wbf = gen_wbuf(UINT16_MAX); _z_transport_message_t expected = gen_keep_alive(); assert(_z_keep_alive_encode(&wbf, expected._header, &expected._body._keep_alive) == _Z_RES_OK); - _z_t_msg_keep_alive_t decoded; + _z_t_msg_keep_alive_t decoded = {0}; _z_zbuf_t zbf = _z_wbuf_to_zbuf(&wbf); z_result_t ret = _z_keep_alive_decode(&decoded, &zbf, expected._header); assert(_Z_RES_OK == ret); @@ -1688,13 +1688,12 @@ void assert_eq_net_msg(const _z_network_message_t *left, const _z_network_messag break; } } -_z_network_message_vec_t gen_net_msgs(size_t n) { - _z_network_message_vec_t ret = _z_network_message_vec_make(n); +_z_network_message_svec_t gen_net_msgs(size_t n) { + _z_network_message_svec_t ret = _z_network_message_svec_make(n); for (size_t i = 0; i < n; i++) { - _z_network_message_t *msg = (_z_network_message_t *)z_malloc(sizeof(_z_network_message_t)); + _z_network_message_t *msg = _z_network_message_svec_get_mut(&ret, i); memset(msg, 0, sizeof(_z_network_message_t)); *msg = gen_net_msg(); - _z_network_message_vec_append(&ret, msg); } return ret; } @@ -1706,7 +1705,8 @@ void assert_eq_frame(const _z_t_msg_frame_t *left, const _z_t_msg_frame_t *right assert(left->_sn == right->_sn); assert(left->_messages._len == right->_messages._len); for (size_t i = 0; i < left->_messages._len; i++) { - assert_eq_net_msg(left->_messages._val[i], right->_messages._val[i]); + assert_eq_net_msg(_z_network_message_svec_get(&left->_messages, i), + _z_network_message_svec_get(&right->_messages, i)); } } void frame_message(void) { @@ -1714,7 +1714,7 @@ void frame_message(void) { _z_wbuf_t wbf = gen_wbuf(UINT16_MAX); _z_transport_message_t expected = gen_frame(); assert(_z_frame_encode(&wbf, expected._header, &expected._body._frame) == _Z_RES_OK); - _z_t_msg_frame_t decoded; + _z_t_msg_frame_t decoded = {0}; _z_zbuf_t zbf = _z_wbuf_to_zbuf(&wbf); z_result_t ret = _z_frame_decode(&decoded, &zbf, expected._header); assert(_Z_RES_OK == ret); @@ -1737,7 +1737,7 @@ void fragment_message(void) { _z_wbuf_t wbf = gen_wbuf(UINT16_MAX); _z_transport_message_t expected = gen_fragment(); assert(_z_fragment_encode(&wbf, expected._header, &expected._body._fragment) == _Z_RES_OK); - _z_t_msg_fragment_t decoded; + _z_t_msg_fragment_t decoded = {0}; _z_zbuf_t zbf = _z_wbuf_to_zbuf(&wbf); z_result_t ret = _z_fragment_decode(&decoded, &zbf, expected._header); assert(_Z_RES_OK == ret); @@ -1807,7 +1807,7 @@ void transport_message(void) { _z_wbuf_t wbf = gen_wbuf(UINT16_MAX); _z_transport_message_t expected = gen_transport(); assert(_z_transport_message_encode(&wbf, &expected) == _Z_RES_OK); - _z_transport_message_t decoded; + _z_transport_message_t decoded = {0}; _z_zbuf_t zbf = _z_wbuf_to_zbuf(&wbf); z_result_t ret = _z_transport_message_decode(&decoded, &zbf); assert(_Z_RES_OK == ret); @@ -1849,7 +1849,7 @@ void scouting_message(void) { _z_wbuf_t wbf = gen_wbuf(UINT16_MAX); _z_scouting_message_t expected = gen_scouting(); assert(_z_scouting_message_encode(&wbf, &expected) == _Z_RES_OK); - _z_scouting_message_t decoded; + _z_scouting_message_t decoded = {0}; _z_zbuf_t zbf = _z_wbuf_to_zbuf(&wbf); z_result_t ret = _z_scouting_message_decode(&decoded, &zbf); assert(_Z_RES_OK == ret);