Skip to content

Commit

Permalink
feat: n msg svec is now a transport resource pool
Browse files Browse the repository at this point in the history
  • Loading branch information
jean-roland committed Nov 8, 2024
1 parent 87fbbb0 commit c78d437
Show file tree
Hide file tree
Showing 14 changed files with 46 additions and 45 deletions.
5 changes: 0 additions & 5 deletions include/zenoh-pico/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,6 @@
*/
#define Z_GET_TIMEOUT_DEFAULT 10000

/**
* Average size of a frame message (bytes). Used to evaluate initial decoding frame size.
*/
#define Z_CONFIG_FRAME_AVG_MSG_SIZE 32

/**
* Default "nop" instruction
*/
Expand Down
5 changes: 0 additions & 5 deletions include/zenoh-pico/config.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,6 @@
*/
#define Z_GET_TIMEOUT_DEFAULT 10000

/**
* Average size of a frame message (bytes). Used to evaluate initial decoding frame size.
*/
#define Z_CONFIG_FRAME_AVG_MSG_SIZE 32

/**
* Default "nop" instruction
*/
Expand Down
6 changes: 4 additions & 2 deletions include/zenoh-pico/protocol/codec/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ z_result_t _z_scouting_message_encode(_z_wbuf_t *buf, const _z_scouting_message_
z_result_t _z_scouting_message_decode(_z_scouting_message_t *msg, _z_zbuf_t *buf);

z_result_t _z_transport_message_encode(_z_wbuf_t *buf, const _z_transport_message_t *msg);
z_result_t _z_transport_message_decode(_z_transport_message_t *msg, _z_zbuf_t *buf, _z_arc_slice_svec_t *arc_pool);
z_result_t _z_transport_message_decode(_z_transport_message_t *msg, _z_zbuf_t *buf, _z_arc_slice_svec_t *arc_pool,
_z_network_message_svec_t *msg_pool);

z_result_t _z_join_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_join_t *msg);
z_result_t _z_join_decode(_z_t_msg_join_t *msg, _z_zbuf_t *zbf, uint8_t header);
Expand All @@ -40,7 +41,8 @@ z_result_t _z_keep_alive_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_k
z_result_t _z_keep_alive_decode(_z_t_msg_keep_alive_t *msg, _z_zbuf_t *zbf, uint8_t header);

z_result_t _z_frame_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_frame_t *msg);
z_result_t _z_frame_decode(_z_t_msg_frame_t *msg, _z_zbuf_t *zbf, uint8_t header, _z_arc_slice_svec_t *arc_pool);
z_result_t _z_frame_decode(_z_t_msg_frame_t *msg, _z_zbuf_t *zbf, uint8_t header, _z_arc_slice_svec_t *arc_pool,
_z_network_message_svec_t *msg_pool);

z_result_t _z_fragment_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_fragment_t *msg);
z_result_t _z_fragment_decode(_z_t_msg_fragment_t *msg, _z_zbuf_t *zbf, uint8_t header);
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/transport/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ typedef struct {
_z_zint_t _sn_tx_reliable;
_z_zint_t _sn_tx_best_effort;
_z_arc_slice_svec_t _arc_pool;
_z_network_message_svec_t _msg_pool;
volatile _z_zint_t _lease;
volatile bool _transmitted;
#if Z_FEATURE_MULTI_THREAD == 1
Expand Down
38 changes: 17 additions & 21 deletions src/protocol/codec/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@
#include "zenoh-pico/utils/logging.h"
#include "zenoh-pico/utils/result.h"

#define _Z_FRAME_VEC_BASE_SIZE 8 // Abritrary small value
#define _Z_FRAME_VEC_SIZE_FROM_ZBUF_LEN(len) \
(_Z_FRAME_VEC_BASE_SIZE + (len) / Z_CONFIG_FRAME_AVG_MSG_SIZE) // Approximate number of messages in frame

uint8_t _z_whatami_to_uint8(z_whatami_t whatami) {
return (whatami >> 1) & 0x03; // get set bit index; only first 3 bits can be set
}
Expand Down Expand Up @@ -352,41 +348,38 @@ z_result_t _z_frame_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_frame_
return ret;
}

z_result_t _z_frame_decode(_z_t_msg_frame_t *msg, _z_zbuf_t *zbf, uint8_t header, _z_arc_slice_svec_t *arc_pool) {
z_result_t _z_frame_decode(_z_t_msg_frame_t *msg, _z_zbuf_t *zbf, uint8_t header, _z_arc_slice_svec_t *arc_pool,
_z_network_message_svec_t *msg_pool) {
z_result_t ret = _Z_RES_OK;
*msg = (_z_t_msg_frame_t){0};

_Z_RETURN_IF_ERR(_z_zsize_decode(&msg->_sn, zbf));
if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z)) {
_Z_RETURN_IF_ERR(_z_msg_ext_skip_non_mandatories(zbf, 0x04));
}
// Create message vector
size_t var_size = _Z_FRAME_VEC_SIZE_FROM_ZBUF_LEN(_z_zbuf_len(zbf));
msg->_messages = _z_network_message_svec_make(var_size);
if (msg->_messages._capacity == 0) {
return _Z_ERR_SYSTEM_OUT_OF_MEMORY;
}
_z_network_message_svec_init(&msg->_messages);
// Init message vector
msg_pool->_len = 0;
_z_network_message_svec_init(msg_pool);
size_t msg_idx = 0;
while (_z_zbuf_len(zbf) > 0) {
// Expand message vector if needed
if (msg_idx >= msg->_messages._capacity) {
_Z_RETURN_IF_ERR(_z_network_message_svec_expand(&msg->_messages));
_z_network_message_svec_init(&msg->_messages);
if (msg_idx >= msg_pool->_capacity) {
_Z_RETURN_IF_ERR(_z_network_message_svec_expand(msg_pool));
_z_network_message_svec_init(msg_pool);
}
// Expand arc pool if needed
if (msg_idx >= arc_pool->_capacity) {
_Z_RETURN_IF_ERR(_z_arc_slice_svec_expand(arc_pool));
}
// Mark the reading position of the iobfer
size_t r_pos = _z_zbuf_get_rpos(zbf);
// Retrieve storage in svecs
_z_network_message_t *nm = _z_network_message_svec_get_mut(&msg->_messages, msg_idx);
// Retrieve storage in resource pool
_z_network_message_t *nm = _z_network_message_svec_get_mut(msg_pool, msg_idx);
_z_arc_slice_t *arcs = _z_arc_slice_svec_get_mut(arc_pool, msg_idx);
// Decode message
ret = _z_network_message_decode(nm, zbf, arcs);
if (ret != _Z_RES_OK) {
_z_network_message_svec_clear(&msg->_messages);
_z_network_message_svec_clear(msg_pool);
_z_zbuf_set_rpos(zbf, r_pos); // Restore the reading position of the iobfer

// FIXME: Check for the return error, since not all of them means a decoding error
Expand All @@ -398,9 +391,11 @@ z_result_t _z_frame_decode(_z_t_msg_frame_t *msg, _z_zbuf_t *zbf, uint8_t header
}
return ret;
}
msg->_messages._len++;
msg_pool->_len++;
msg_idx++;
}
// Alias network message svec in frame struct
msg->_messages = _z_network_message_svec_alias(msg_pool);
return _Z_RES_OK;
}

Expand Down Expand Up @@ -500,15 +495,16 @@ z_result_t _z_transport_message_encode(_z_wbuf_t *wbf, const _z_transport_messag
return ret;
}

z_result_t _z_transport_message_decode(_z_transport_message_t *msg, _z_zbuf_t *zbf, _z_arc_slice_svec_t *arc_pool) {
z_result_t _z_transport_message_decode(_z_transport_message_t *msg, _z_zbuf_t *zbf, _z_arc_slice_svec_t *arc_pool,
_z_network_message_svec_t *msg_pool) {
z_result_t ret = _Z_RES_OK;

ret |= _z_uint8_decode(&msg->_header, zbf); // Decode the header
if (ret == _Z_RES_OK) {
uint8_t mid = _Z_MID(msg->_header);
switch (mid) {
case _Z_MID_T_FRAME: {
ret |= _z_frame_decode(&msg->_body._frame, zbf, msg->_header, arc_pool);
ret |= _z_frame_decode(&msg->_body._frame, zbf, msg->_header, arc_pool, msg_pool);
} break;
case _Z_MID_T_FRAGMENT: {
ret |= _z_fragment_decode(&msg->_body._fragment, zbf, msg->_header);
Expand Down
4 changes: 3 additions & 1 deletion src/transport/common/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,13 @@ z_result_t _z_link_recv_t_msg(_z_transport_message_t *t_msg, const _z_link_t *zl
if (ret == _Z_RES_OK) {
_z_transport_message_t l_t_msg;
_z_arc_slice_svec_t arc_pool = _z_arc_slice_svec_make(1);
ret = _z_transport_message_decode(&l_t_msg, &zbf, &arc_pool);
_z_network_message_svec_t msg_pool = _z_network_message_svec_make(1);
ret = _z_transport_message_decode(&l_t_msg, &zbf, &arc_pool, &msg_pool);
if (ret == _Z_RES_OK) {
_z_t_msg_copy(t_msg, &l_t_msg);
}
_z_arc_slice_svec_clear(&arc_pool);
_z_network_message_svec_clear(&msg_pool);
}
_z_zbuf_clear(&zbf);

Expand Down
3 changes: 2 additions & 1 deletion src/transport/multicast/read.c
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ void *_zp_multicast_read_task(void *ztm_arg) {
while (_z_zbuf_len(&zbuf) > 0) {
// Decode one session message
_z_transport_message_t t_msg;
z_result_t ret = _z_transport_message_decode(&t_msg, &zbuf, &ztm->_common._arc_pool);
z_result_t ret =
_z_transport_message_decode(&t_msg, &zbuf, &ztm->_common._arc_pool, &ztm->_common._msg_pool);
if (ret == _Z_RES_OK) {
ret = _z_multicast_handle_transport_message(ztm, &t_msg, &addr);

Expand Down
2 changes: 1 addition & 1 deletion src/transport/multicast/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ static z_result_t _z_multicast_recv_t_msg_na(_z_transport_multicast_t *ztm, _z_t

if (ret == _Z_RES_OK) {
_Z_DEBUG(">> \t transport_message_decode: %ju", (uintmax_t)_z_zbuf_len(&ztm->_common._zbuf));
ret = _z_transport_message_decode(t_msg, &ztm->_common._zbuf, &ztm->_common._arc_pool);
ret = _z_transport_message_decode(t_msg, &ztm->_common._zbuf, &ztm->_common._arc_pool, &ztm->_common._msg_pool);
}
_z_transport_rx_mutex_unlock(&ztm->_common);
return ret;
Expand Down
7 changes: 5 additions & 2 deletions src/transport/multicast/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,13 @@ z_result_t _z_multicast_transport_create(_z_transport_t *zt, _z_link_t *zl,
ztm->_common._wbuf = _z_wbuf_make(mtu, false);
ztm->_common._zbuf = _z_zbuf_make(Z_BATCH_MULTICAST_SIZE);

// Initialize rx pool
// Initialize resource pool
ztm->_common._arc_pool = _z_arc_slice_svec_make(_Z_RES_POOL_INIT_SIZE);
ztm->_common._msg_pool = _z_network_message_svec_make(_Z_RES_POOL_INIT_SIZE);

// Clean up the buffers if one of them failed to be allocated
if ((ztm->_common._arc_pool._capacity == 0) || (_z_wbuf_capacity(&ztm->_common._wbuf) != mtu) ||
if ((ztm->_common._msg_pool._capacity == 0) || (ztm->_common._arc_pool._capacity == 0) ||
(_z_wbuf_capacity(&ztm->_common._wbuf) != mtu) ||
(_z_zbuf_capacity(&ztm->_common._zbuf) != Z_BATCH_MULTICAST_SIZE)) {
ret = _Z_ERR_SYSTEM_OUT_OF_MEMORY;
_Z_ERROR("Not enough memory to allocate transport tx rx buffers!");
Expand Down Expand Up @@ -213,6 +215,7 @@ void _z_multicast_transport_clear(_z_transport_t *zt) {
_z_wbuf_clear(&ztm->_common._wbuf);
_z_zbuf_clear(&ztm->_common._zbuf);
_z_arc_slice_svec_release(&ztm->_common._arc_pool);
_z_network_message_svec_release(&ztm->_common._msg_pool);

// Clean up peer list
_z_transport_peer_entry_list_free(&ztm->_peers);
Expand Down
2 changes: 1 addition & 1 deletion src/transport/raweth/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ z_result_t _z_raweth_recv_t_msg_na(_z_transport_multicast_t *ztm, _z_transport_m
// Decode message
if (ret == _Z_RES_OK) {
_Z_DEBUG(">> \t transport_message_decode: %ju", (uintmax_t)_z_zbuf_len(&ztm->_common._zbuf));
ret = _z_transport_message_decode(t_msg, &ztm->_common._zbuf, &ztm->_common._arc_pool);
ret = _z_transport_message_decode(t_msg, &ztm->_common._zbuf, &ztm->_common._arc_pool, &ztm->_common._msg_pool);
}
_z_transport_rx_mutex_unlock(&ztm->_common);
return ret;
Expand Down
3 changes: 2 additions & 1 deletion src/transport/unicast/read.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ void *_zp_unicast_read_task(void *ztu_arg) {
while (_z_zbuf_len(&zbuf) > 0) {
// Decode one session message
_z_transport_message_t t_msg;
z_result_t ret = _z_transport_message_decode(&t_msg, &zbuf, &ztu->_common._arc_pool);
z_result_t ret =
_z_transport_message_decode(&t_msg, &zbuf, &ztu->_common._arc_pool, &ztu->_common._msg_pool);

if (ret == _Z_RES_OK) {
ret = _z_unicast_handle_transport_message(ztu, &t_msg);
Expand Down
2 changes: 1 addition & 1 deletion src/transport/unicast/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ z_result_t _z_unicast_recv_t_msg_na(_z_transport_unicast_t *ztu, _z_transport_me

if (ret == _Z_RES_OK) {
_Z_DEBUG(">> \t transport_message_decode");
ret = _z_transport_message_decode(t_msg, &ztu->_common._zbuf, &ztu->_common._arc_pool);
ret = _z_transport_message_decode(t_msg, &ztu->_common._zbuf, &ztu->_common._arc_pool, &ztu->_common._msg_pool);

// Mark the session that we have received data
if (ret == _Z_RES_OK) {
Expand Down
7 changes: 5 additions & 2 deletions src/transport/unicast/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,13 @@ z_result_t _z_unicast_transport_create(_z_transport_t *zt, _z_link_t *zl,
ztu->_common._wbuf = _z_wbuf_make(wbuf_size, false);
ztu->_common._zbuf = _z_zbuf_make(zbuf_size);

// Initialize rx pool
// Initialize resources pool
ztu->_common._arc_pool = _z_arc_slice_svec_make(_Z_RES_POOL_INIT_SIZE);
ztu->_common._msg_pool = _z_network_message_svec_make(_Z_RES_POOL_INIT_SIZE);

// Clean up the buffers if one of them failed to be allocated
if ((ztu->_common._arc_pool._capacity == 0) || (_z_wbuf_capacity(&ztu->_common._wbuf) != wbuf_size) ||
if ((ztu->_common._msg_pool._capacity == 0) || (ztu->_common._arc_pool._capacity == 0) ||
(_z_wbuf_capacity(&ztu->_common._wbuf) != wbuf_size) ||
(_z_zbuf_capacity(&ztu->_common._zbuf) != zbuf_size)) {
ret = _Z_ERR_SYSTEM_OUT_OF_MEMORY;
_Z_ERROR("Not enough memory to allocate transport tx rx buffers!");
Expand Down Expand Up @@ -343,6 +345,7 @@ void _z_unicast_transport_clear(_z_transport_t *zt) {
_z_wbuf_clear(&ztu->_common._wbuf);
_z_zbuf_clear(&ztu->_common._zbuf);
_z_arc_slice_svec_release(&ztu->_common._arc_pool);
_z_network_message_svec_release(&ztu->_common._msg_pool);
#if Z_FEATURE_FRAGMENTATION == 1
_z_wbuf_clear(&ztu->_dbuf_reliable);
_z_wbuf_clear(&ztu->_dbuf_best_effort);
Expand Down
6 changes: 4 additions & 2 deletions tests/z_msgcodec_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -1724,8 +1724,9 @@ void frame_message(void) {
assert(_z_frame_encode(&wbf, expected._header, &expected._body._frame) == _Z_RES_OK);
_z_t_msg_frame_t decoded = {0};
_z_arc_slice_svec_t arcs = _z_arc_slice_svec_make(1);
_z_network_message_svec_t msg = _z_network_message_svec_make(1);
_z_zbuf_t zbf = _z_wbuf_to_zbuf(&wbf);
z_result_t ret = _z_frame_decode(&decoded, &zbf, expected._header, &arcs);
z_result_t ret = _z_frame_decode(&decoded, &zbf, expected._header, &arcs, &msg);
assert(_Z_RES_OK == ret);
assert_eq_frame(&expected._body._frame, &decoded);
_z_t_msg_frame_clear(&decoded);
Expand Down Expand Up @@ -1818,8 +1819,9 @@ void transport_message(void) {
assert(_z_transport_message_encode(&wbf, &expected) == _Z_RES_OK);
_z_transport_message_t decoded = {0};
_z_arc_slice_svec_t arcs = _z_arc_slice_svec_make(1);
_z_network_message_svec_t msg = _z_network_message_svec_make(1);
_z_zbuf_t zbf = _z_wbuf_to_zbuf(&wbf);
z_result_t ret = _z_transport_message_decode(&decoded, &zbf, &arcs);
z_result_t ret = _z_transport_message_decode(&decoded, &zbf, &arcs, &msg);
assert(_Z_RES_OK == ret);
assert_eq_transport(&expected, &decoded);
_z_t_msg_clear(&decoded);
Expand Down

0 comments on commit c78d437

Please sign in to comment.