Skip to content

Commit

Permalink
Add fragmentation feature token (#326)
Browse files Browse the repository at this point in the history
* feat: add fragmentation feature token

* fix: add error log if buffer are not allocated
  • Loading branch information
jean-roland authored Jan 25, 2024
1 parent b52ea11 commit 702dfeb
Show file tree
Hide file tree
Showing 10 changed files with 82 additions and 9 deletions.
7 changes: 7 additions & 0 deletions include/zenoh-pico/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,13 @@
#define Z_FEATURE_RAWETH_TRANSPORT 0
#endif

/**
* Enable message fragmentation.
*/
#ifndef Z_FEATURE_FRAGMENTATION
#define Z_FEATURE_FRAGMENTATION 1
#endif

/*------------------ Compile-time configuration properties ------------------*/
/**
* Default length for Zenoh ID. Maximum size is 16 bytes.
Expand Down
12 changes: 9 additions & 3 deletions include/zenoh-pico/transport/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
#include "zenoh-pico/protocol/definitions/transport.h"

typedef struct {
#if Z_FEATURE_FRAGMENTATION == 1
// Defragmentation buffers
_z_wbuf_t _dbuf_reliable;
_z_wbuf_t _dbuf_best_effort;
#endif

_z_id_t _remote_zid;
_z_bytes_t _remote_addr;
Expand Down Expand Up @@ -73,9 +75,13 @@ typedef struct {

_z_link_t _link;

// Buffers
_z_wbuf_t _dbuf_reliable; // Defragmentation buffer
_z_wbuf_t _dbuf_best_effort; // Defragmentation buffer
#if Z_FEATURE_FRAGMENTATION == 1
// Defragmentation buffer
_z_wbuf_t _dbuf_reliable;
_z_wbuf_t _dbuf_best_effort;
#endif

// Regular Buffers
_z_wbuf_t _wbuf;
_z_zbuf_t _zbuf;

Expand Down
16 changes: 15 additions & 1 deletion src/transport/multicast/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,9 @@ int8_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_t
true) {
entry->_sn_rx_sns._val._plain._reliable = t_msg->_body._frame._sn;
} else {
#if Z_FEATURE_FRAGMENTATION == 1
_z_wbuf_clear(&entry->_dbuf_reliable);
#endif
_Z_INFO("Reliable message dropped because it is out of order");
break;
}
Expand All @@ -155,7 +157,9 @@ int8_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_t
t_msg->_body._frame._sn) == true) {
entry->_sn_rx_sns._val._plain._best_effort = t_msg->_body._frame._sn;
} else {
#if Z_FEATURE_FRAGMENTATION == 1
_z_wbuf_clear(&entry->_dbuf_best_effort);
#endif
_Z_INFO("Best effort message dropped because it is out of order");
break;
}
Expand All @@ -175,6 +179,7 @@ int8_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_t

case _Z_MID_T_FRAGMENT: {
_Z_INFO("Received Z_FRAGMENT message");
#if Z_FEATURE_FRAGMENTATION == 1
if (entry == NULL) {
break;
}
Expand Down Expand Up @@ -218,6 +223,9 @@ int8_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_t
// Reset the defragmentation buffer
_z_wbuf_reset(dbuf);
}
#else
_Z_INFO("Fragment dropped because fragmentation feature is deactivated");
#endif
break;
}

Expand Down Expand Up @@ -267,14 +275,20 @@ int8_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_t
_z_conduit_sn_list_copy(&entry->_sn_rx_sns, &t_msg->_body._join._next_sn);
_z_conduit_sn_list_decrement(entry->_sn_res, &entry->_sn_rx_sns);

#if Z_FEATURE_FRAGMENTATION == 1
#if Z_FEATURE_DYNAMIC_MEMORY_ALLOCATION == 1
entry->_dbuf_reliable = _z_wbuf_make(0, true);
entry->_dbuf_best_effort = _z_wbuf_make(0, true);
#else
entry->_dbuf_reliable = _z_wbuf_make(Z_FRAG_MAX_SIZE, false);
entry->_dbuf_best_effort = _z_wbuf_make(Z_FRAG_MAX_SIZE, false);
#endif

if ((_z_wbuf_capacity(&entry->_dbuf_reliable) != Z_FRAG_MAX_SIZE) ||
(_z_wbuf_capacity(&entry->_dbuf_best_effort) != Z_FRAG_MAX_SIZE)) {
_Z_ERROR("Not enough memory to allocate peer defragmentation buffers!");
}
#endif
#endif
// Update lease time (set as ms during)
entry->_lease = t_msg->_body._join._lease;
entry->_next_lease = entry->_lease;
Expand Down
1 change: 1 addition & 0 deletions src/transport/multicast/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ int8_t _z_multicast_transport_create(_z_transport_t *zt, _z_link_t *zl,
// Clean up the buffers if one of them failed to be allocated
if ((_z_wbuf_capacity(&ztm->_wbuf) != mtu) || (_z_zbuf_capacity(&ztm->_zbuf) != Z_BATCH_MULTICAST_SIZE)) {
ret = _Z_ERR_SYSTEM_OUT_OF_MEMORY;
_Z_ERROR("Not enough memory to allocate transport tx rx buffers!");

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_free(&ztm->_mutex_tx);
Expand Down
5 changes: 4 additions & 1 deletion src/transport/multicast/tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ int8_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_m
ztm->_transmitted = true; // Mark the session that we have transmitted data
}
} else {
#if Z_FEATURE_FRAGMENTATION == 1
// The message does not fit in the current batch, let's fragment it
// Create an expandable wbuf for fragmentation
_z_wbuf_t fbf = _z_wbuf_make(_Z_FRAG_BUFF_BASE_SIZE, true);
Expand Down Expand Up @@ -143,9 +144,11 @@ int8_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_m
}
}
}

// Clear the buffer as it's no longer required
_z_wbuf_clear(&fbf);
#else
_Z_INFO("Sending the message required fragmentation feature that is deactivated.");
#endif
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/transport/peer_entry.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,20 @@
#include "zenoh-pico/transport/utils.h"

void _z_transport_peer_entry_clear(_z_transport_peer_entry_t *src) {
#if Z_FEATURE_FRAGMENTATION == 1
_z_wbuf_clear(&src->_dbuf_reliable);
_z_wbuf_clear(&src->_dbuf_best_effort);
#endif

src->_remote_zid = _z_id_empty();
_z_bytes_clear(&src->_remote_addr);
}

void _z_transport_peer_entry_copy(_z_transport_peer_entry_t *dst, const _z_transport_peer_entry_t *src) {
#if Z_FEATURE_FRAGMENTATION == 1
_z_wbuf_copy(&dst->_dbuf_reliable, &src->_dbuf_reliable);
_z_wbuf_copy(&dst->_dbuf_best_effort, &src->_dbuf_best_effort);
#endif

dst->_sn_res = src->_sn_res;
_z_conduit_sn_list_copy(&dst->_sn_rx_sns, &src->_sn_rx_sns);
Expand Down
4 changes: 4 additions & 0 deletions src/transport/raweth/tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ int8_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg,
// Mark the session that we have transmitted data
ztm->_transmitted = true;
} else { // The message does not fit in the current batch, let's fragment it
#if Z_FEATURE_FRAGMENTATION == 1
// Create an expandable wbuf for fragmentation
_z_wbuf_t fbf = _z_wbuf_make(_Z_FRAG_BUFF_BASE_SIZE, true);
// Encode the message on the expandable wbuf
Expand Down Expand Up @@ -307,6 +308,9 @@ int8_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg,
}
// Clear the expandable buffer
_z_wbuf_clear(&fbf);
#else
_Z_INFO("Sending the message required fragmentation feature that is deactivated.");
#endif
}
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&ztm->_mutex_tx);
Expand Down
9 changes: 9 additions & 0 deletions src/transport/unicast/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,19 @@ int8_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_trans
if (_z_sn_precedes(ztu->_sn_res, ztu->_sn_rx_reliable, t_msg->_body._frame._sn) == true) {
ztu->_sn_rx_reliable = t_msg->_body._frame._sn;
} else {
#if Z_FEATURE_FRAGMENTATION == 1
_z_wbuf_clear(&ztu->_dbuf_reliable);
#endif
_Z_INFO("Reliable message dropped because it is out of order");
break;
}
} else {
if (_z_sn_precedes(ztu->_sn_res, ztu->_sn_rx_best_effort, t_msg->_body._frame._sn) == true) {
ztu->_sn_rx_best_effort = t_msg->_body._frame._sn;
} else {
#if Z_FEATURE_FRAGMENTATION == 1
_z_wbuf_clear(&ztu->_dbuf_best_effort);
#endif
_Z_INFO("Best effort message dropped because it is out of order");
break;
}
Expand All @@ -134,6 +138,8 @@ int8_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_trans
}

case _Z_MID_T_FRAGMENT: {
_Z_INFO("Received Z_FRAGMENT message");
#if Z_FEATURE_FRAGMENTATION == 1
_z_wbuf_t *dbuf = _Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAGMENT_R)
? &ztu->_dbuf_reliable
: &ztu->_dbuf_best_effort; // Select the right defragmentation buffer
Expand Down Expand Up @@ -172,6 +178,9 @@ int8_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_trans
// Reset the defragmentation buffer
_z_wbuf_reset(dbuf);
}
#else
_Z_INFO("Fragment dropped because fragmentation feature is deactivated");
#endif
break;
}

Expand Down
29 changes: 25 additions & 4 deletions src/transport/unicast/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,33 @@ int8_t _z_unicast_transport_create(_z_transport_t *zt, _z_link_t *zl, _z_transpo
expandable = false;
dbuf_size = Z_FRAG_MAX_SIZE;
#endif

// Initialize tx rx buffers
zt->_transport._unicast._wbuf = _z_wbuf_make(wbuf_size, false);
zt->_transport._unicast._zbuf = _z_zbuf_make(zbuf_size);

// Clean up the buffers if one of them failed to be allocated
if ((_z_wbuf_capacity(&zt->_transport._unicast._wbuf) != wbuf_size) ||
(_z_zbuf_capacity(&zt->_transport._unicast._zbuf) != zbuf_size)) {
ret = _Z_ERR_SYSTEM_OUT_OF_MEMORY;
_Z_ERROR("Not enough memory to allocate transport tx rx buffers!");

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_free(&zt->_transport._unicast._mutex_tx);
zp_mutex_free(&zt->_transport._unicast._mutex_rx);
#endif // Z_FEATURE_MULTI_THREAD == 1

_z_wbuf_clear(&zt->_transport._unicast._wbuf);
_z_zbuf_clear(&zt->_transport._unicast._zbuf);
}

#if Z_FEATURE_FRAGMENTATION == 1
// Initialize the defragmentation buffers
zt->_transport._unicast._dbuf_reliable = _z_wbuf_make(dbuf_size, expandable);
zt->_transport._unicast._dbuf_best_effort = _z_wbuf_make(dbuf_size, expandable);

// Clean up the buffers if one of them failed to be allocated
if ((_z_wbuf_capacity(&zt->_transport._unicast._wbuf) != wbuf_size) ||
(_z_zbuf_capacity(&zt->_transport._unicast._zbuf) != zbuf_size) ||
if (
#if Z_FEATURE_DYNAMIC_MEMORY_ALLOCATION == 0
(_z_wbuf_capacity(&zt->_transport._unicast._dbuf_reliable) != dbuf_size) ||
(_z_wbuf_capacity(&zt->_transport._unicast._dbuf_best_effort) != dbuf_size)) {
Expand All @@ -93,6 +109,10 @@ int8_t _z_unicast_transport_create(_z_transport_t *zt, _z_link_t *zl, _z_transpo
(_z_wbuf_capacity(&zt->_transport._unicast._dbuf_best_effort) != Z_IOSLICE_SIZE)) {
#endif
ret = _Z_ERR_SYSTEM_OUT_OF_MEMORY;
_Z_ERROR("Not enough memory to allocate transport defragmentation buffers!");

_z_wbuf_clear(&zt->_transport._unicast._dbuf_reliable);
_z_wbuf_clear(&zt->_transport._unicast._dbuf_best_effort);

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_free(&zt->_transport._unicast._mutex_tx);
Expand All @@ -101,9 +121,8 @@ int8_t _z_unicast_transport_create(_z_transport_t *zt, _z_link_t *zl, _z_transpo

_z_wbuf_clear(&zt->_transport._unicast._wbuf);
_z_zbuf_clear(&zt->_transport._unicast._zbuf);
_z_wbuf_clear(&zt->_transport._unicast._dbuf_reliable);
_z_wbuf_clear(&zt->_transport._unicast._dbuf_best_effort);
}
#endif
}

if (ret == _Z_RES_OK) {
Expand Down Expand Up @@ -284,8 +303,10 @@ void _z_unicast_transport_clear(_z_transport_t *zt) {
// Clean up the buffers
_z_wbuf_clear(&ztu->_wbuf);
_z_zbuf_clear(&ztu->_zbuf);
#if Z_FEATURE_FRAGMENTATION == 1
_z_wbuf_clear(&ztu->_dbuf_reliable);
_z_wbuf_clear(&ztu->_dbuf_best_effort);
#endif

// Clean up PIDs
ztu->_remote_zid = _z_id_empty();
Expand Down
4 changes: 4 additions & 0 deletions src/transport/unicast/tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ int8_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg
ztu->_transmitted = true; // Mark the session that we have transmitted data
}
} else {
#if Z_FEATURE_FRAGMENTATION == 1
// The message does not fit in the current batch, let's fragment it
// Create an expandable wbuf for fragmentation
_z_wbuf_t fbf = _z_wbuf_make(_Z_FRAG_BUFF_BASE_SIZE, true);
Expand Down Expand Up @@ -155,6 +156,9 @@ int8_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg

// Clear the buffer as it's no longer required
_z_wbuf_clear(&fbf);
#else
_Z_INFO("Sending the message required fragmentation feature that is deactivated.");
#endif
}
}

Expand Down

0 comments on commit 702dfeb

Please sign in to comment.