From c78d43729743c0a30ba925047700ba5b902a03af Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Fri, 8 Nov 2024 19:12:12 +0100 Subject: [PATCH] feat: n msg svec is now a transport resource pool --- include/zenoh-pico/config.h | 5 --- include/zenoh-pico/config.h.in | 5 --- include/zenoh-pico/protocol/codec/transport.h | 6 ++- include/zenoh-pico/transport/transport.h | 1 + src/protocol/codec/transport.c | 38 +++++++++---------- src/transport/common/rx.c | 4 +- src/transport/multicast/read.c | 3 +- src/transport/multicast/rx.c | 2 +- src/transport/multicast/transport.c | 7 +++- src/transport/raweth/rx.c | 2 +- src/transport/unicast/read.c | 3 +- src/transport/unicast/rx.c | 2 +- src/transport/unicast/transport.c | 7 +++- tests/z_msgcodec_test.c | 6 ++- 14 files changed, 46 insertions(+), 45 deletions(-) diff --git a/include/zenoh-pico/config.h b/include/zenoh-pico/config.h index 2449535fe..5b946505f 100644 --- a/include/zenoh-pico/config.h +++ b/include/zenoh-pico/config.h @@ -177,11 +177,6 @@ */ #define Z_GET_TIMEOUT_DEFAULT 10000 -/** - * Average size of a frame message (bytes). Used to evaluate initial decoding frame size. - */ -#define Z_CONFIG_FRAME_AVG_MSG_SIZE 32 - /** * Default "nop" instruction */ diff --git a/include/zenoh-pico/config.h.in b/include/zenoh-pico/config.h.in index d6f43e3d3..4e75ac0cb 100644 --- a/include/zenoh-pico/config.h.in +++ b/include/zenoh-pico/config.h.in @@ -177,11 +177,6 @@ */ #define Z_GET_TIMEOUT_DEFAULT 10000 -/** - * Average size of a frame message (bytes). Used to evaluate initial decoding frame size. - */ -#define Z_CONFIG_FRAME_AVG_MSG_SIZE 32 - /** * Default "nop" instruction */ diff --git a/include/zenoh-pico/protocol/codec/transport.h b/include/zenoh-pico/protocol/codec/transport.h index 490227f72..4a83857b1 100644 --- a/include/zenoh-pico/protocol/codec/transport.h +++ b/include/zenoh-pico/protocol/codec/transport.h @@ -22,7 +22,8 @@ z_result_t _z_scouting_message_encode(_z_wbuf_t *buf, const _z_scouting_message_ z_result_t _z_scouting_message_decode(_z_scouting_message_t *msg, _z_zbuf_t *buf); z_result_t _z_transport_message_encode(_z_wbuf_t *buf, const _z_transport_message_t *msg); -z_result_t _z_transport_message_decode(_z_transport_message_t *msg, _z_zbuf_t *buf, _z_arc_slice_svec_t *arc_pool); +z_result_t _z_transport_message_decode(_z_transport_message_t *msg, _z_zbuf_t *buf, _z_arc_slice_svec_t *arc_pool, + _z_network_message_svec_t *msg_pool); z_result_t _z_join_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_join_t *msg); z_result_t _z_join_decode(_z_t_msg_join_t *msg, _z_zbuf_t *zbf, uint8_t header); @@ -40,7 +41,8 @@ z_result_t _z_keep_alive_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_k z_result_t _z_keep_alive_decode(_z_t_msg_keep_alive_t *msg, _z_zbuf_t *zbf, uint8_t header); z_result_t _z_frame_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_frame_t *msg); -z_result_t _z_frame_decode(_z_t_msg_frame_t *msg, _z_zbuf_t *zbf, uint8_t header, _z_arc_slice_svec_t *arc_pool); +z_result_t _z_frame_decode(_z_t_msg_frame_t *msg, _z_zbuf_t *zbf, uint8_t header, _z_arc_slice_svec_t *arc_pool, + _z_network_message_svec_t *msg_pool); z_result_t _z_fragment_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_fragment_t *msg); z_result_t _z_fragment_decode(_z_t_msg_fragment_t *msg, _z_zbuf_t *zbf, uint8_t header); diff --git a/include/zenoh-pico/transport/transport.h b/include/zenoh-pico/transport/transport.h index 91291a06d..e78de9366 100644 --- a/include/zenoh-pico/transport/transport.h +++ b/include/zenoh-pico/transport/transport.h @@ -82,6 +82,7 @@ typedef struct { _z_zint_t _sn_tx_reliable; _z_zint_t _sn_tx_best_effort; _z_arc_slice_svec_t _arc_pool; + _z_network_message_svec_t _msg_pool; volatile _z_zint_t _lease; volatile bool _transmitted; #if Z_FEATURE_MULTI_THREAD == 1 diff --git a/src/protocol/codec/transport.c b/src/protocol/codec/transport.c index bf6c0aac3..a3d8c2c29 100644 --- a/src/protocol/codec/transport.c +++ b/src/protocol/codec/transport.c @@ -28,10 +28,6 @@ #include "zenoh-pico/utils/logging.h" #include "zenoh-pico/utils/result.h" -#define _Z_FRAME_VEC_BASE_SIZE 8 // Abritrary small value -#define _Z_FRAME_VEC_SIZE_FROM_ZBUF_LEN(len) \ - (_Z_FRAME_VEC_BASE_SIZE + (len) / Z_CONFIG_FRAME_AVG_MSG_SIZE) // Approximate number of messages in frame - uint8_t _z_whatami_to_uint8(z_whatami_t whatami) { return (whatami >> 1) & 0x03; // get set bit index; only first 3 bits can be set } @@ -352,7 +348,8 @@ z_result_t _z_frame_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_frame_ return ret; } -z_result_t _z_frame_decode(_z_t_msg_frame_t *msg, _z_zbuf_t *zbf, uint8_t header, _z_arc_slice_svec_t *arc_pool) { +z_result_t _z_frame_decode(_z_t_msg_frame_t *msg, _z_zbuf_t *zbf, uint8_t header, _z_arc_slice_svec_t *arc_pool, + _z_network_message_svec_t *msg_pool) { z_result_t ret = _Z_RES_OK; *msg = (_z_t_msg_frame_t){0}; @@ -360,19 +357,15 @@ z_result_t _z_frame_decode(_z_t_msg_frame_t *msg, _z_zbuf_t *zbf, uint8_t header if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z)) { _Z_RETURN_IF_ERR(_z_msg_ext_skip_non_mandatories(zbf, 0x04)); } - // Create message vector - size_t var_size = _Z_FRAME_VEC_SIZE_FROM_ZBUF_LEN(_z_zbuf_len(zbf)); - msg->_messages = _z_network_message_svec_make(var_size); - if (msg->_messages._capacity == 0) { - return _Z_ERR_SYSTEM_OUT_OF_MEMORY; - } - _z_network_message_svec_init(&msg->_messages); + // Init message vector + msg_pool->_len = 0; + _z_network_message_svec_init(msg_pool); size_t msg_idx = 0; while (_z_zbuf_len(zbf) > 0) { // Expand message vector if needed - if (msg_idx >= msg->_messages._capacity) { - _Z_RETURN_IF_ERR(_z_network_message_svec_expand(&msg->_messages)); - _z_network_message_svec_init(&msg->_messages); + if (msg_idx >= msg_pool->_capacity) { + _Z_RETURN_IF_ERR(_z_network_message_svec_expand(msg_pool)); + _z_network_message_svec_init(msg_pool); } // Expand arc pool if needed if (msg_idx >= arc_pool->_capacity) { @@ -380,13 +373,13 @@ z_result_t _z_frame_decode(_z_t_msg_frame_t *msg, _z_zbuf_t *zbf, uint8_t header } // Mark the reading position of the iobfer size_t r_pos = _z_zbuf_get_rpos(zbf); - // Retrieve storage in svecs - _z_network_message_t *nm = _z_network_message_svec_get_mut(&msg->_messages, msg_idx); + // Retrieve storage in resource pool + _z_network_message_t *nm = _z_network_message_svec_get_mut(msg_pool, msg_idx); _z_arc_slice_t *arcs = _z_arc_slice_svec_get_mut(arc_pool, msg_idx); // Decode message ret = _z_network_message_decode(nm, zbf, arcs); if (ret != _Z_RES_OK) { - _z_network_message_svec_clear(&msg->_messages); + _z_network_message_svec_clear(msg_pool); _z_zbuf_set_rpos(zbf, r_pos); // Restore the reading position of the iobfer // FIXME: Check for the return error, since not all of them means a decoding error @@ -398,9 +391,11 @@ z_result_t _z_frame_decode(_z_t_msg_frame_t *msg, _z_zbuf_t *zbf, uint8_t header } return ret; } - msg->_messages._len++; + msg_pool->_len++; msg_idx++; } + // Alias network message svec in frame struct + msg->_messages = _z_network_message_svec_alias(msg_pool); return _Z_RES_OK; } @@ -500,7 +495,8 @@ z_result_t _z_transport_message_encode(_z_wbuf_t *wbf, const _z_transport_messag return ret; } -z_result_t _z_transport_message_decode(_z_transport_message_t *msg, _z_zbuf_t *zbf, _z_arc_slice_svec_t *arc_pool) { +z_result_t _z_transport_message_decode(_z_transport_message_t *msg, _z_zbuf_t *zbf, _z_arc_slice_svec_t *arc_pool, + _z_network_message_svec_t *msg_pool) { z_result_t ret = _Z_RES_OK; ret |= _z_uint8_decode(&msg->_header, zbf); // Decode the header @@ -508,7 +504,7 @@ z_result_t _z_transport_message_decode(_z_transport_message_t *msg, _z_zbuf_t *z uint8_t mid = _Z_MID(msg->_header); switch (mid) { case _Z_MID_T_FRAME: { - ret |= _z_frame_decode(&msg->_body._frame, zbf, msg->_header, arc_pool); + ret |= _z_frame_decode(&msg->_body._frame, zbf, msg->_header, arc_pool, msg_pool); } break; case _Z_MID_T_FRAGMENT: { ret |= _z_fragment_decode(&msg->_body._fragment, zbf, msg->_header); diff --git a/src/transport/common/rx.c b/src/transport/common/rx.c index d6545f5c8..49a39b6cb 100644 --- a/src/transport/common/rx.c +++ b/src/transport/common/rx.c @@ -73,11 +73,13 @@ z_result_t _z_link_recv_t_msg(_z_transport_message_t *t_msg, const _z_link_t *zl if (ret == _Z_RES_OK) { _z_transport_message_t l_t_msg; _z_arc_slice_svec_t arc_pool = _z_arc_slice_svec_make(1); - ret = _z_transport_message_decode(&l_t_msg, &zbf, &arc_pool); + _z_network_message_svec_t msg_pool = _z_network_message_svec_make(1); + ret = _z_transport_message_decode(&l_t_msg, &zbf, &arc_pool, &msg_pool); if (ret == _Z_RES_OK) { _z_t_msg_copy(t_msg, &l_t_msg); } _z_arc_slice_svec_clear(&arc_pool); + _z_network_message_svec_clear(&msg_pool); } _z_zbuf_clear(&zbf); diff --git a/src/transport/multicast/read.c b/src/transport/multicast/read.c index a5a8bca02..bea0b82f3 100644 --- a/src/transport/multicast/read.c +++ b/src/transport/multicast/read.c @@ -107,7 +107,8 @@ void *_zp_multicast_read_task(void *ztm_arg) { while (_z_zbuf_len(&zbuf) > 0) { // Decode one session message _z_transport_message_t t_msg; - z_result_t ret = _z_transport_message_decode(&t_msg, &zbuf, &ztm->_common._arc_pool); + z_result_t ret = + _z_transport_message_decode(&t_msg, &zbuf, &ztm->_common._arc_pool, &ztm->_common._msg_pool); if (ret == _Z_RES_OK) { ret = _z_multicast_handle_transport_message(ztm, &t_msg, &addr); diff --git a/src/transport/multicast/rx.c b/src/transport/multicast/rx.c index 12c439906..1ab13f161 100644 --- a/src/transport/multicast/rx.c +++ b/src/transport/multicast/rx.c @@ -77,7 +77,7 @@ static z_result_t _z_multicast_recv_t_msg_na(_z_transport_multicast_t *ztm, _z_t if (ret == _Z_RES_OK) { _Z_DEBUG(">> \t transport_message_decode: %ju", (uintmax_t)_z_zbuf_len(&ztm->_common._zbuf)); - ret = _z_transport_message_decode(t_msg, &ztm->_common._zbuf, &ztm->_common._arc_pool); + ret = _z_transport_message_decode(t_msg, &ztm->_common._zbuf, &ztm->_common._arc_pool, &ztm->_common._msg_pool); } _z_transport_rx_mutex_unlock(&ztm->_common); return ret; diff --git a/src/transport/multicast/transport.c b/src/transport/multicast/transport.c index 5ba3bca4c..3f5f1c1bf 100644 --- a/src/transport/multicast/transport.c +++ b/src/transport/multicast/transport.c @@ -81,11 +81,13 @@ z_result_t _z_multicast_transport_create(_z_transport_t *zt, _z_link_t *zl, ztm->_common._wbuf = _z_wbuf_make(mtu, false); ztm->_common._zbuf = _z_zbuf_make(Z_BATCH_MULTICAST_SIZE); - // Initialize rx pool + // Initialize resource pool ztm->_common._arc_pool = _z_arc_slice_svec_make(_Z_RES_POOL_INIT_SIZE); + ztm->_common._msg_pool = _z_network_message_svec_make(_Z_RES_POOL_INIT_SIZE); // Clean up the buffers if one of them failed to be allocated - if ((ztm->_common._arc_pool._capacity == 0) || (_z_wbuf_capacity(&ztm->_common._wbuf) != mtu) || + if ((ztm->_common._msg_pool._capacity == 0) || (ztm->_common._arc_pool._capacity == 0) || + (_z_wbuf_capacity(&ztm->_common._wbuf) != mtu) || (_z_zbuf_capacity(&ztm->_common._zbuf) != Z_BATCH_MULTICAST_SIZE)) { ret = _Z_ERR_SYSTEM_OUT_OF_MEMORY; _Z_ERROR("Not enough memory to allocate transport tx rx buffers!"); @@ -213,6 +215,7 @@ void _z_multicast_transport_clear(_z_transport_t *zt) { _z_wbuf_clear(&ztm->_common._wbuf); _z_zbuf_clear(&ztm->_common._zbuf); _z_arc_slice_svec_release(&ztm->_common._arc_pool); + _z_network_message_svec_release(&ztm->_common._msg_pool); // Clean up peer list _z_transport_peer_entry_list_free(&ztm->_peers); diff --git a/src/transport/raweth/rx.c b/src/transport/raweth/rx.c index 613405ed1..fdec282f8 100644 --- a/src/transport/raweth/rx.c +++ b/src/transport/raweth/rx.c @@ -100,7 +100,7 @@ z_result_t _z_raweth_recv_t_msg_na(_z_transport_multicast_t *ztm, _z_transport_m // Decode message if (ret == _Z_RES_OK) { _Z_DEBUG(">> \t transport_message_decode: %ju", (uintmax_t)_z_zbuf_len(&ztm->_common._zbuf)); - ret = _z_transport_message_decode(t_msg, &ztm->_common._zbuf, &ztm->_common._arc_pool); + ret = _z_transport_message_decode(t_msg, &ztm->_common._zbuf, &ztm->_common._arc_pool, &ztm->_common._msg_pool); } _z_transport_rx_mutex_unlock(&ztm->_common); return ret; diff --git a/src/transport/unicast/read.c b/src/transport/unicast/read.c index acc1f70e6..5851f3146 100644 --- a/src/transport/unicast/read.c +++ b/src/transport/unicast/read.c @@ -102,7 +102,8 @@ void *_zp_unicast_read_task(void *ztu_arg) { while (_z_zbuf_len(&zbuf) > 0) { // Decode one session message _z_transport_message_t t_msg; - z_result_t ret = _z_transport_message_decode(&t_msg, &zbuf, &ztu->_common._arc_pool); + z_result_t ret = + _z_transport_message_decode(&t_msg, &zbuf, &ztu->_common._arc_pool, &ztu->_common._msg_pool); if (ret == _Z_RES_OK) { ret = _z_unicast_handle_transport_message(ztu, &t_msg); diff --git a/src/transport/unicast/rx.c b/src/transport/unicast/rx.c index a000b1624..628f2815d 100644 --- a/src/transport/unicast/rx.c +++ b/src/transport/unicast/rx.c @@ -75,7 +75,7 @@ z_result_t _z_unicast_recv_t_msg_na(_z_transport_unicast_t *ztu, _z_transport_me if (ret == _Z_RES_OK) { _Z_DEBUG(">> \t transport_message_decode"); - ret = _z_transport_message_decode(t_msg, &ztu->_common._zbuf, &ztu->_common._arc_pool); + ret = _z_transport_message_decode(t_msg, &ztu->_common._zbuf, &ztu->_common._arc_pool, &ztu->_common._msg_pool); // Mark the session that we have received data if (ret == _Z_RES_OK) { diff --git a/src/transport/unicast/transport.c b/src/transport/unicast/transport.c index 46d376cff..6f90b4397 100644 --- a/src/transport/unicast/transport.c +++ b/src/transport/unicast/transport.c @@ -64,11 +64,13 @@ z_result_t _z_unicast_transport_create(_z_transport_t *zt, _z_link_t *zl, ztu->_common._wbuf = _z_wbuf_make(wbuf_size, false); ztu->_common._zbuf = _z_zbuf_make(zbuf_size); - // Initialize rx pool + // Initialize resources pool ztu->_common._arc_pool = _z_arc_slice_svec_make(_Z_RES_POOL_INIT_SIZE); + ztu->_common._msg_pool = _z_network_message_svec_make(_Z_RES_POOL_INIT_SIZE); // Clean up the buffers if one of them failed to be allocated - if ((ztu->_common._arc_pool._capacity == 0) || (_z_wbuf_capacity(&ztu->_common._wbuf) != wbuf_size) || + if ((ztu->_common._msg_pool._capacity == 0) || (ztu->_common._arc_pool._capacity == 0) || + (_z_wbuf_capacity(&ztu->_common._wbuf) != wbuf_size) || (_z_zbuf_capacity(&ztu->_common._zbuf) != zbuf_size)) { ret = _Z_ERR_SYSTEM_OUT_OF_MEMORY; _Z_ERROR("Not enough memory to allocate transport tx rx buffers!"); @@ -343,6 +345,7 @@ void _z_unicast_transport_clear(_z_transport_t *zt) { _z_wbuf_clear(&ztu->_common._wbuf); _z_zbuf_clear(&ztu->_common._zbuf); _z_arc_slice_svec_release(&ztu->_common._arc_pool); + _z_network_message_svec_release(&ztu->_common._msg_pool); #if Z_FEATURE_FRAGMENTATION == 1 _z_wbuf_clear(&ztu->_dbuf_reliable); _z_wbuf_clear(&ztu->_dbuf_best_effort); diff --git a/tests/z_msgcodec_test.c b/tests/z_msgcodec_test.c index 2dc50c5a6..471087659 100644 --- a/tests/z_msgcodec_test.c +++ b/tests/z_msgcodec_test.c @@ -1724,8 +1724,9 @@ void frame_message(void) { assert(_z_frame_encode(&wbf, expected._header, &expected._body._frame) == _Z_RES_OK); _z_t_msg_frame_t decoded = {0}; _z_arc_slice_svec_t arcs = _z_arc_slice_svec_make(1); + _z_network_message_svec_t msg = _z_network_message_svec_make(1); _z_zbuf_t zbf = _z_wbuf_to_zbuf(&wbf); - z_result_t ret = _z_frame_decode(&decoded, &zbf, expected._header, &arcs); + z_result_t ret = _z_frame_decode(&decoded, &zbf, expected._header, &arcs, &msg); assert(_Z_RES_OK == ret); assert_eq_frame(&expected._body._frame, &decoded); _z_t_msg_frame_clear(&decoded); @@ -1818,8 +1819,9 @@ void transport_message(void) { assert(_z_transport_message_encode(&wbf, &expected) == _Z_RES_OK); _z_transport_message_t decoded = {0}; _z_arc_slice_svec_t arcs = _z_arc_slice_svec_make(1); + _z_network_message_svec_t msg = _z_network_message_svec_make(1); _z_zbuf_t zbf = _z_wbuf_to_zbuf(&wbf); - z_result_t ret = _z_transport_message_decode(&decoded, &zbf, &arcs); + z_result_t ret = _z_transport_message_decode(&decoded, &zbf, &arcs, &msg); assert(_Z_RES_OK == ret); assert_eq_transport(&expected, &decoded); _z_t_msg_clear(&decoded);