diff --git a/include/zenoh-pico/protocol/definitions/transport.h b/include/zenoh-pico/protocol/definitions/transport.h index d5ef7d295..346c6eda9 100644 --- a/include/zenoh-pico/protocol/definitions/transport.h +++ b/include/zenoh-pico/protocol/definitions/transport.h @@ -530,7 +530,8 @@ _z_transport_message_t _z_t_msg_make_keep_alive(void); _z_transport_message_t _z_t_msg_make_frame(_z_zint_t sn, _z_network_message_vec_t messages, z_reliability_t reliability); _z_transport_message_t _z_t_msg_make_frame_header(_z_zint_t sn, z_reliability_t reliability); -_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last, bool start, bool stop); +_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last, + bool start, bool stop); _z_transport_message_t _z_t_msg_make_fragment(_z_zint_t sn, _z_slice_t messages, z_reliability_t reliability, bool is_last, bool start, bool stop); diff --git a/include/zenoh-pico/transport/common/tx.h b/include/zenoh-pico/transport/common/tx.h index a77fd1a28..9f344b1e6 100644 --- a/include/zenoh-pico/transport/common/tx.h +++ b/include/zenoh-pico/transport/common/tx.h @@ -27,7 +27,8 @@ void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability); void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability); /*This function is unsafe because it operates in potentially concurrent data.*Make sure that the following mutexes are locked before calling this function : *-ztu->mutex_tx */ -z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn, bool start); +z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn, + bool start); /*------------------ Transmission and Reception helpers ------------------*/ z_result_t _z_send_t_msg(_z_transport_t *zt, const _z_transport_message_t *t_msg); diff --git a/src/protocol/codec/transport.c b/src/protocol/codec/transport.c index 7cd5e0d0a..575e2f356 100644 --- a/src/protocol/codec/transport.c +++ b/src/protocol/codec/transport.c @@ -200,17 +200,17 @@ z_result_t _z_init_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_init_t _Z_RETURN_IF_ERR(_z_slice_encode(wbf, &msg->_cookie)) } - #if Z_FEATURE_FRAGMENTATION == 1 - if (msg->_patch != _Z_CURRENT_PATCH) { - if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) { - _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_JOIN_PATCH)); - _Z_RETURN_IF_ERR(_z_zint64_encode(wbf, msg->_patch)); - } else { - _Z_DEBUG("Attempted to serialize Patch extension, but the header extension flag was unset"); - ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED; - } +#if Z_FEATURE_FRAGMENTATION == 1 + if (msg->_patch != _Z_CURRENT_PATCH) { + if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) { + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_JOIN_PATCH)); + _Z_RETURN_IF_ERR(_z_zint64_encode(wbf, msg->_patch)); + } else { + _Z_DEBUG("Attempted to serialize Patch extension, but the header extension flag was unset"); + ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED; } - #endif + } +#endif return ret; } diff --git a/src/protocol/definitions/transport.c b/src/protocol/definitions/transport.c index 08bb6e9d3..fc458a755 100644 --- a/src/protocol/definitions/transport.c +++ b/src/protocol/definitions/transport.c @@ -279,7 +279,8 @@ _z_transport_message_t _z_t_msg_make_frame_header(_z_zint_t sn, z_reliability_t } /*------------------ Fragment Message ------------------*/ -_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last, bool start, bool stop) { +_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last, + bool start, bool stop) { return _z_t_msg_make_fragment(sn, _z_slice_empty(), reliability, is_last, start, stop); } _z_transport_message_t _z_t_msg_make_fragment(_z_zint_t sn, _z_slice_t payload, z_reliability_t reliability, diff --git a/src/transport/common/tx.c b/src/transport/common/tx.c index 4f2d66ce0..c333fdb12 100644 --- a/src/transport/common/tx.c +++ b/src/transport/common/tx.c @@ -135,7 +135,8 @@ z_result_t _z_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t return ret; } -z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn, bool start) { +z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn, + bool start) { z_result_t ret = _Z_RES_OK; // Assume first that this is not the final fragment diff --git a/src/transport/multicast/rx.c b/src/transport/multicast/rx.c index 78ec6fe31..7b3f0ace7 100644 --- a/src/transport/multicast/rx.c +++ b/src/transport/multicast/rx.c @@ -193,8 +193,10 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, if (_Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAME_R) == true) { // @TODO: amend once reliability is in place. For the time being only // monotonic SNs are ensured - if (_z_sn_precedes(entry->_sn_res, entry->_sn_rx_sns._val._plain._reliable, t_msg->_body._frame._sn) == true) { - bool consecutive = _z_sn_consecutive(entry->_sn_res, entry->_sn_rx_sns._val._plain._reliable, t_msg->_body._fragment._sn); + if (_z_sn_precedes(entry->_sn_res, entry->_sn_rx_sns._val._plain._reliable, t_msg->_body._frame._sn) == + true) { + bool consecutive = _z_sn_consecutive(entry->_sn_res, entry->_sn_rx_sns._val._plain._reliable, + t_msg->_body._fragment._sn); entry->_sn_rx_sns._val._plain._reliable = t_msg->_body._frame._sn; dbuf = &entry->_dbuf_reliable; if (consecutive == false) { @@ -208,8 +210,10 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, break; } } else { - if (_z_sn_precedes(entry->_sn_res, entry->_sn_rx_sns._val._plain._best_effort, t_msg->_body._frame._sn) == true) { - bool consecutive = _z_sn_consecutive(entry->_sn_res, entry->_sn_rx_sns._val._plain._best_effort, t_msg->_body._fragment._sn); + if (_z_sn_precedes(entry->_sn_res, entry->_sn_rx_sns._val._plain._best_effort, + t_msg->_body._frame._sn) == true) { + bool consecutive = _z_sn_consecutive(entry->_sn_res, entry->_sn_rx_sns._val._plain._best_effort, + t_msg->_body._fragment._sn); entry->_sn_rx_sns._val._plain._best_effort = t_msg->_body._frame._sn; dbuf = &entry->_dbuf_best_effort; if (consecutive == false) { @@ -325,7 +329,8 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_conduit_sn_list_decrement(entry->_sn_res, &entry->_sn_rx_sns); #if Z_FEATURE_FRAGMENTATION == 1 - entry->_patch = t_msg->_body._join._patch < _Z_CURRENT_PATCH ? t_msg->_body._join._patch : _Z_CURRENT_PATCH; + entry->_patch = + t_msg->_body._join._patch < _Z_CURRENT_PATCH ? t_msg->_body._join._patch : _Z_CURRENT_PATCH; #if Z_FEATURE_DYNAMIC_MEMORY_ALLOCATION == 1 entry->_dbuf_reliable = _z_wbuf_make(0, true); entry->_dbuf_best_effort = _z_wbuf_make(0, true); diff --git a/src/transport/unicast/rx.c b/src/transport/unicast/rx.c index 8e6540cdc..b5b0e8bc3 100644 --- a/src/transport/unicast/rx.c +++ b/src/transport/unicast/rx.c @@ -147,7 +147,8 @@ z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_t // @TODO: amend once reliability is in place. For the time being only // monotonic SNs are ensured if (_z_sn_precedes(ztu->_sn_res, ztu->_sn_rx_reliable, t_msg->_body._frame._sn) == true) { - bool consecutive = _z_sn_consecutive(ztu->_sn_res, ztu->_sn_rx_reliable, t_msg->_body._fragment._sn); + bool consecutive = + _z_sn_consecutive(ztu->_sn_res, ztu->_sn_rx_reliable, t_msg->_body._fragment._sn); ztu->_sn_rx_reliable = t_msg->_body._frame._sn; dbuf = &ztu->_dbuf_reliable; if (consecutive == false) { @@ -162,7 +163,8 @@ z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_t } } else { if (_z_sn_precedes(ztu->_sn_res, ztu->_sn_rx_best_effort, t_msg->_body._frame._sn) == true) { - bool consecutive = _z_sn_consecutive(ztu->_sn_res, ztu->_sn_rx_best_effort, t_msg->_body._fragment._sn); + bool consecutive = + _z_sn_consecutive(ztu->_sn_res, ztu->_sn_rx_best_effort, t_msg->_body._fragment._sn); ztu->_sn_rx_best_effort = t_msg->_body._frame._sn; dbuf = &ztu->_dbuf_best_effort; if (consecutive == false) {