diff --git a/include/zenoh-pico/protocol/definitions/transport.h b/include/zenoh-pico/protocol/definitions/transport.h index 046cd815b..7e4414681 100644 --- a/include/zenoh-pico/protocol/definitions/transport.h +++ b/include/zenoh-pico/protocol/definitions/transport.h @@ -91,6 +91,16 @@ extern "C" { // Z Extensions if Z==1 then Zenoh extensions are present #define _Z_FLAG_T_CLOSE_S 0x20 // 1 << 5 +/*=============================*/ +/* Patch */ +/*=============================*/ +/// Used to negotiate the patch version of the protocol +/// if not present (or 0), then protocol as released with 1.0.0 +/// if >= 1, then fragmentation start/stop marker +#define _Z_NO_PATCH 0x00 +#define _Z_CURRENT_PATCH 0x01 +#define _Z_PATCH_HAS_FRAGMENT_MARKERS(patch) (patch >= 1) + /*=============================*/ /* Transport Messages */ /*=============================*/ @@ -235,6 +245,9 @@ typedef struct { uint8_t _req_id_res; uint8_t _seq_num_res; uint8_t _version; +#if Z_FEATURE_FRAGMENTATION == 1 + uint8_t _patch; +#endif } _z_t_msg_join_t; void _z_t_msg_join_clear(_z_t_msg_join_t *msg); @@ -315,6 +328,9 @@ typedef struct { uint8_t _req_id_res; uint8_t _seq_num_res; uint8_t _version; +#if Z_FEATURE_FRAGMENTATION == 1 + uint8_t _patch; +#endif } _z_t_msg_init_t; void _z_t_msg_init_clear(_z_t_msg_init_t *msg); @@ -478,11 +494,11 @@ void _z_t_msg_frame_clear(_z_t_msg_frame_t *msg); typedef struct { _z_slice_t _payload; _z_zint_t _sn; + bool first; + bool drop; } _z_t_msg_fragment_t; void _z_t_msg_fragment_clear(_z_t_msg_fragment_t *msg); -#define _Z_FRAGMENT_HEADER_SIZE 12 - /*------------------ Transport Message ------------------*/ typedef union { _z_t_msg_join_t _join; @@ -514,9 +530,10 @@ _z_transport_message_t _z_t_msg_make_keep_alive(void); _z_transport_message_t _z_t_msg_make_frame(_z_zint_t sn, _z_network_message_svec_t messages, z_reliability_t reliability); _z_transport_message_t _z_t_msg_make_frame_header(_z_zint_t sn, z_reliability_t reliability); -_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last); +_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last, + bool first, bool drop); _z_transport_message_t _z_t_msg_make_fragment(_z_zint_t sn, _z_slice_t messages, z_reliability_t reliability, - bool is_last); + bool is_last, bool first, bool drop); /*------------------ Copy ------------------*/ void _z_t_msg_copy(_z_transport_message_t *clone, _z_transport_message_t *msg); diff --git a/include/zenoh-pico/protocol/ext.h b/include/zenoh-pico/protocol/ext.h index 989d7491d..2a8daf574 100644 --- a/include/zenoh-pico/protocol/ext.h +++ b/include/zenoh-pico/protocol/ext.h @@ -43,7 +43,11 @@ extern "C" { /*=============================*/ /* Extension IDs */ /*=============================*/ -// #define _Z_MSG_EXT_ID_FOO 0x00 // Hex(ENC|M|ID) +#define _Z_MSG_EXT_ID_JOIN_QOS (0x01 | _Z_MSG_EXT_FLAG_M | _Z_MSG_EXT_ENC_ZBUF) +#define _Z_MSG_EXT_ID_JOIN_PATCH (0x07 | _Z_MSG_EXT_ENC_ZINT) +#define _Z_MSG_EXT_ID_INIT_PATCH (0x07 | _Z_MSG_EXT_ENC_ZINT) +#define _Z_MSG_EXT_ID_FRAGMENT_FIRST (0x02 | _Z_MSG_EXT_ENC_UNIT) +#define _Z_MSG_EXT_ID_FRAGMENT_DROP (0x03 | _Z_MSG_EXT_ENC_UNIT) /*=============================*/ /* Extension Encodings */ @@ -58,6 +62,7 @@ extern "C" { #define _Z_MSG_EXT_FLAG_M 0x10 #define _Z_MSG_EXT_IS_MANDATORY(h) ((h & _Z_MSG_EXT_FLAG_M) != 0) #define _Z_MSG_EXT_FLAG_Z 0x80 +#define _Z_MSG_EXT_MORE(more) (more ? _Z_MSG_EXT_FLAG_Z : 0) typedef struct { uint8_t __dummy; // Just to avoid empty structures that might cause undefined behavior diff --git a/include/zenoh-pico/system/platform/zephyr.h b/include/zenoh-pico/system/platform/zephyr.h index 1f25e839c..addf67f31 100644 --- a/include/zenoh-pico/system/platform/zephyr.h +++ b/include/zenoh-pico/system/platform/zephyr.h @@ -19,10 +19,10 @@ #if KERNEL_VERSION_MAJOR == 2 #include -#elif KERNEL_VERSION_MAJOR == 3 +#elif KERNEL_VERSION_MAJOR == 3 || KERNEL_VERSION_MAJOR == 4 #include #else -#pragma "This Zephyr version might not be supported." +#pragma GCC warning "This Zephyr version might not be supported." #include #endif diff --git a/include/zenoh-pico/transport/common/tx.h b/include/zenoh-pico/transport/common/tx.h index 6809f20f3..4e452b046 100644 --- a/include/zenoh-pico/transport/common/tx.h +++ b/include/zenoh-pico/transport/common/tx.h @@ -27,7 +27,8 @@ void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability); void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability); /*This function is unsafe because it operates in potentially concurrent data.*Make sure that the following mutexes are locked before calling this function : *-ztu->mutex_tx */ -z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn); +z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn, + bool first); /*------------------ Transmission and Reception helpers ------------------*/ z_result_t _z_transport_tx_send_t_msg(_z_transport_common_t *ztc, const _z_transport_message_t *t_msg); diff --git a/include/zenoh-pico/transport/transport.h b/include/zenoh-pico/transport/transport.h index d4fcf7ea7..c2f87b6ee 100644 --- a/include/zenoh-pico/transport/transport.h +++ b/include/zenoh-pico/transport/transport.h @@ -57,6 +57,8 @@ typedef struct { uint8_t _state_best_effort; _z_wbuf_t _dbuf_reliable; _z_wbuf_t _dbuf_best_effort; + // Patch + uint8_t _patch; #endif } _z_transport_peer_entry_t; @@ -122,6 +124,8 @@ typedef struct { uint8_t _state_best_effort; _z_wbuf_t _dbuf_reliable; _z_wbuf_t _dbuf_best_effort; + // Patch + uint8_t _patch; #endif } _z_transport_unicast_t; @@ -161,6 +165,9 @@ typedef struct { uint8_t _req_id_res; uint8_t _seq_num_res; bool _is_qos; +#if Z_FEATURE_FRAGMENTATION == 1 + uint8_t _patch; +#endif } _z_transport_unicast_establish_param_t; typedef struct { diff --git a/include/zenoh-pico/transport/utils.h b/include/zenoh-pico/transport/utils.h index 62fa319b4..25862370b 100644 --- a/include/zenoh-pico/transport/utils.h +++ b/include/zenoh-pico/transport/utils.h @@ -29,6 +29,7 @@ _z_zint_t _z_sn_max(uint8_t bits); _z_zint_t _z_sn_half(_z_zint_t sn); _z_zint_t _z_sn_modulo_mask(uint8_t bits); bool _z_sn_precedes(const _z_zint_t sn_resolution, const _z_zint_t sn_left, const _z_zint_t sn_right); +bool _z_sn_consecutive(const _z_zint_t sn_resolution, const _z_zint_t sn_left, const _z_zint_t sn_right); _z_zint_t _z_sn_increment(const _z_zint_t sn_resolution, const _z_zint_t sn); _z_zint_t _z_sn_decrement(const _z_zint_t sn_resolution, const _z_zint_t sn); diff --git a/src/protocol/codec/transport.c b/src/protocol/codec/transport.c index 5e07d5fea..894fb41d8 100644 --- a/src/protocol/codec/transport.c +++ b/src/protocol/codec/transport.c @@ -64,9 +64,14 @@ z_result_t _z_join_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_join_t } _Z_RETURN_IF_ERR(_z_zsize_encode(wbf, msg->_next_sn._val._plain._reliable)); _Z_RETURN_IF_ERR(_z_zsize_encode(wbf, msg->_next_sn._val._plain._best_effort)); +#if Z_FEATURE_FRAGMENTATION == 1 + bool has_patch = msg->_patch != _Z_NO_PATCH; +#else + bool has_patch = false; +#endif if (msg->_next_sn._is_qos) { if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z)) { - _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ENC_ZBUF | _Z_MSG_EXT_FLAG_M | 1)); + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_JOIN_QOS | _Z_MSG_EXT_MORE(has_patch))); size_t len = 0; for (uint8_t i = 0; (i < Z_PRIORITIES_NUM) && (ret == _Z_RES_OK); i++) { len += _z_zint_len(msg->_next_sn._val._qos[i]._reliable) + @@ -82,6 +87,17 @@ z_result_t _z_join_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_join_t ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED; } } +#if Z_FEATURE_FRAGMENTATION == 1 + if (has_patch) { + if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z)) { + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_JOIN_PATCH)); + _Z_RETURN_IF_ERR(_z_zint64_encode(wbf, msg->_patch)); + } else { + _Z_DEBUG("Attempted to serialize Patch extension, but the header extension flag was unset"); + ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED; + } + } +#endif return ret; } @@ -89,14 +105,17 @@ z_result_t _z_join_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_join_t z_result_t _z_join_decode_ext(_z_msg_ext_t *extension, void *ctx) { z_result_t ret = _Z_RES_OK; _z_t_msg_join_t *msg = (_z_t_msg_join_t *)ctx; - if (_Z_EXT_FULL_ID(extension->_header) == - (_Z_MSG_EXT_ENC_ZBUF | _Z_MSG_EXT_FLAG_M | 1)) { // QOS: (enc=zbuf)(mandatory=true)(id=1) + if (_Z_EXT_FULL_ID(extension->_header) == _Z_MSG_EXT_ID_JOIN_QOS) { msg->_next_sn._is_qos = true; _z_zbuf_t zbf = _z_slice_as_zbuf(extension->_body._zbuf._val); for (int i = 0; (ret == _Z_RES_OK) && (i < Z_PRIORITIES_NUM); ++i) { ret |= _z_zsize_decode(&msg->_next_sn._val._qos[i]._reliable, &zbf); ret |= _z_zsize_decode(&msg->_next_sn._val._qos[i]._best_effort, &zbf); } +#if Z_FEATURE_FRAGMENTATION == 1 + } else if (_Z_EXT_FULL_ID(extension->_header) == _Z_MSG_EXT_ID_JOIN_PATCH) { + msg->_patch = (uint8_t)extension->_body._zint._val; +#endif } else if (_Z_MSG_EXT_IS_MANDATORY(extension->_header)) { ret = _Z_ERR_MESSAGE_EXTENSION_MANDATORY_AND_UNKNOWN; } @@ -148,6 +167,7 @@ z_result_t _z_join_decode(_z_t_msg_join_t *msg, _z_zbuf_t *zbf, uint8_t header) ret |= _z_zsize_decode(&msg->_next_sn._val._plain._reliable, zbf); ret |= _z_zsize_decode(&msg->_next_sn._val._plain._best_effort, zbf); } + msg->_patch = _Z_NO_PATCH; if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_T_Z)) { ret |= _z_msg_ext_decode_iter(zbf, _z_join_decode_ext, msg); } @@ -181,6 +201,32 @@ z_result_t _z_init_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_init_t _Z_RETURN_IF_ERR(_z_slice_encode(wbf, &msg->_cookie)) } +#if Z_FEATURE_FRAGMENTATION == 1 + if (msg->_patch != _Z_NO_PATCH) { + if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z)) { + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_JOIN_PATCH)); + _Z_RETURN_IF_ERR(_z_zint64_encode(wbf, msg->_patch)); + } else { + _Z_DEBUG("Attempted to serialize Patch extension, but the header extension flag was unset"); + ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED; + } + } +#endif + + return ret; +} + +z_result_t _z_init_decode_ext(_z_msg_ext_t *extension, void *ctx) { + z_result_t ret = _Z_RES_OK; + _z_t_msg_init_t *msg = (_z_t_msg_init_t *)ctx; + if (false) { +#if Z_FEATURE_FRAGMENTATION == 1 + } else if (_Z_EXT_FULL_ID(extension->_header) == _Z_MSG_EXT_ID_INIT_PATCH) { + msg->_patch = (uint8_t)extension->_body._zint._val; +#endif + } else if (_Z_MSG_EXT_IS_MANDATORY(extension->_header)) { + ret = _Z_ERR_MESSAGE_EXTENSION_MANDATORY_AND_UNKNOWN; + } return ret; } @@ -224,8 +270,9 @@ z_result_t _z_init_decode(_z_t_msg_init_t *msg, _z_zbuf_t *zbf, uint8_t header) msg->_cookie = _z_slice_null(); } - if ((ret == _Z_RES_OK) && (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true)) { - ret |= _z_msg_ext_skip_non_mandatories(zbf, 0x01); + msg->_patch = _Z_NO_PATCH; + if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_T_Z)) { + ret |= _z_msg_ext_decode_iter(zbf, _z_init_decode_ext, msg); } return ret; @@ -450,16 +497,42 @@ z_result_t _z_fragment_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_fra z_result_t ret = _Z_RES_OK; _Z_DEBUG("Encoding _Z_TRANSPORT_FRAGMENT"); _Z_RETURN_IF_ERR(_z_zsize_encode(wbf, msg->_sn)) - if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z)) { - ret = _Z_ERR_MESSAGE_SERIALIZATION_FAILED; + if (msg->first) { + if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) { + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_FRAGMENT_FIRST | _Z_MSG_EXT_MORE(msg->drop))); + } else { + _Z_DEBUG("Attempted to serialize Start extension, but the header extension flag was unset"); + ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED; + } + } + if (msg->drop) { + if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) { + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_FRAGMENT_DROP)); + } else { + _Z_DEBUG("Attempted to serialize Stop extension, but the header extension flag was unset"); + ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED; + } } - if (ret == _Z_RES_OK && _z_slice_check(&msg->_payload)) { + if (_z_slice_check(&msg->_payload)) { _Z_RETURN_IF_ERR(_z_wbuf_write_bytes(wbf, msg->_payload.start, 0, msg->_payload.len)); } return ret; } +z_result_t _z_fragment_decode_ext(_z_msg_ext_t *extension, void *ctx) { + z_result_t ret = _Z_RES_OK; + _z_t_msg_fragment_t *msg = (_z_t_msg_fragment_t *)ctx; + if (_Z_EXT_FULL_ID(extension->_header) == _Z_MSG_EXT_ID_FRAGMENT_FIRST) { + msg->first = true; + } else if (_Z_EXT_FULL_ID(extension->_header) == _Z_MSG_EXT_ID_FRAGMENT_DROP) { + msg->drop = true; + } else if (_Z_MSG_EXT_IS_MANDATORY(extension->_header)) { + ret = _Z_ERR_MESSAGE_EXTENSION_MANDATORY_AND_UNKNOWN; + } + return ret; +} + z_result_t _z_fragment_decode(_z_t_msg_fragment_t *msg, _z_zbuf_t *zbf, uint8_t header) { z_result_t ret = _Z_RES_OK; *msg = (_z_t_msg_fragment_t){0}; @@ -467,8 +540,10 @@ z_result_t _z_fragment_decode(_z_t_msg_fragment_t *msg, _z_zbuf_t *zbf, uint8_t _Z_DEBUG("Decoding _Z_TRANSPORT_FRAGMENT"); ret |= _z_zsize_decode(&msg->_sn, zbf); + msg->first = false; + msg->drop = false; if ((ret == _Z_RES_OK) && (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true)) { - ret |= _z_msg_ext_skip_non_mandatories(zbf, 0x05); + ret |= _z_msg_ext_decode_iter(zbf, _z_fragment_decode_ext, msg); } msg->_payload = _z_slice_alias_buf((uint8_t *)_z_zbuf_start(zbf), _z_zbuf_len(zbf)); zbf->_ios._r_pos = zbf->_ios._w_pos; diff --git a/src/protocol/definitions/transport.c b/src/protocol/definitions/transport.c index db0ff814c..cd74a8ded 100644 --- a/src/protocol/definitions/transport.c +++ b/src/protocol/definitions/transport.c @@ -107,6 +107,9 @@ _z_transport_message_t _z_t_msg_make_join(z_whatami_t whatami, _z_zint_t lease, msg._body._join._batch_size = Z_BATCH_MULTICAST_SIZE; msg._body._join._next_sn = next_sn; msg._body._join._zid = zid; +#if Z_FEATURE_FRAGMENTATION == 1 + msg._body._join._patch = _Z_CURRENT_PATCH; +#endif if ((lease % 1000) == 0) { _Z_SET_FLAG(msg._header, _Z_FLAG_T_JOIN_T); @@ -117,7 +120,12 @@ _z_transport_message_t _z_t_msg_make_join(z_whatami_t whatami, _z_zint_t lease, _Z_SET_FLAG(msg._header, _Z_FLAG_T_JOIN_S); } - if (next_sn._is_qos) { +#if Z_FEATURE_FRAGMENTATION == 1 + bool has_patch = msg._body._join._patch != _Z_NO_PATCH; +#else + bool has_patch = false; +#endif + if (next_sn._is_qos || has_patch) { _Z_SET_FLAG(msg._header, _Z_FLAG_T_Z); } @@ -136,6 +144,9 @@ _z_transport_message_t _z_t_msg_make_init_syn(z_whatami_t whatami, _z_id_t zid) msg._body._init._req_id_res = Z_REQ_RESOLUTION; msg._body._init._batch_size = Z_BATCH_UNICAST_SIZE; _z_slice_reset(&msg._body._init._cookie); +#if Z_FEATURE_FRAGMENTATION == 1 + msg._body._init._patch = _Z_CURRENT_PATCH; +#endif if ((msg._body._init._batch_size != _Z_DEFAULT_UNICAST_BATCH_SIZE) || (msg._body._init._seq_num_res != _Z_DEFAULT_RESOLUTION_SIZE) || @@ -143,6 +154,12 @@ _z_transport_message_t _z_t_msg_make_init_syn(z_whatami_t whatami, _z_id_t zid) _Z_SET_FLAG(msg._header, _Z_FLAG_T_INIT_S); } +#if Z_FEATURE_FRAGMENTATION == 1 + if (msg._body._init._patch != _Z_NO_PATCH) { + _Z_SET_FLAG(msg._header, _Z_FLAG_T_Z); + } +#endif + return msg; } @@ -158,6 +175,9 @@ _z_transport_message_t _z_t_msg_make_init_ack(z_whatami_t whatami, _z_id_t zid, msg._body._init._req_id_res = Z_REQ_RESOLUTION; msg._body._init._batch_size = Z_BATCH_UNICAST_SIZE; msg._body._init._cookie = cookie; +#if Z_FEATURE_FRAGMENTATION == 1 + msg._body._init._patch = _Z_CURRENT_PATCH; +#endif if ((msg._body._init._batch_size != _Z_DEFAULT_UNICAST_BATCH_SIZE) || (msg._body._init._seq_num_res != _Z_DEFAULT_RESOLUTION_SIZE) || @@ -165,6 +185,11 @@ _z_transport_message_t _z_t_msg_make_init_ack(z_whatami_t whatami, _z_id_t zid, _Z_SET_FLAG(msg._header, _Z_FLAG_T_INIT_S); } +#if Z_FEATURE_FRAGMENTATION == 1 + if (msg._body._init._patch != _Z_NO_PATCH) { + _Z_SET_FLAG(msg._header, _Z_FLAG_T_Z); + } +#endif return msg; } @@ -252,11 +277,13 @@ _z_transport_message_t _z_t_msg_make_frame_header(_z_zint_t sn, z_reliability_t } /*------------------ Fragment Message ------------------*/ -_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last) { +_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last, + bool first, bool drop) { return _z_t_msg_make_fragment(sn, _z_slice_null(), reliability, is_last); + return _z_t_msg_make_fragment(sn, _z_slice_empty(), reliability, is_last, first, drop); } _z_transport_message_t _z_t_msg_make_fragment(_z_zint_t sn, _z_slice_t payload, z_reliability_t reliability, - bool is_last) { + bool is_last, bool first, bool drop) { _z_transport_message_t msg; msg._header = _Z_MID_T_FRAGMENT; if (is_last == false) { @@ -268,12 +295,20 @@ _z_transport_message_t _z_t_msg_make_fragment(_z_zint_t sn, _z_slice_t payload, msg._body._fragment._sn = sn; msg._body._fragment._payload = payload; + if (first || drop) { + _Z_SET_FLAG(msg._header, _Z_FLAG_T_Z); + } + msg._body._fragment.first = first; + msg._body._fragment.drop = drop; return msg; } void _z_t_msg_copy_fragment(_z_t_msg_fragment_t *clone, _z_t_msg_fragment_t *msg) { + clone->_payload = msg->_payload; _z_slice_copy(&clone->_payload, &msg->_payload); + clone->first = msg->first; + clone->drop = msg->drop; } void _z_t_msg_copy_join(_z_t_msg_join_t *clone, _z_t_msg_join_t *msg) { @@ -284,6 +319,9 @@ void _z_t_msg_copy_join(_z_t_msg_join_t *clone, _z_t_msg_join_t *msg) { clone->_req_id_res = msg->_req_id_res; clone->_batch_size = msg->_batch_size; clone->_next_sn = msg->_next_sn; +#if Z_FEATURE_FRAGMENTATION == 1 + clone->_patch = msg->_patch; +#endif memcpy(clone->_zid.id, msg->_zid.id, 16); } @@ -295,6 +333,9 @@ void _z_t_msg_copy_init(_z_t_msg_init_t *clone, _z_t_msg_init_t *msg) { clone->_batch_size = msg->_batch_size; memcpy(clone->_zid.id, msg->_zid.id, 16); _z_slice_copy(&clone->_cookie, &msg->_cookie); +#if Z_FEATURE_FRAGMENTATION == 1 + clone->_patch = msg->_patch; +#endif } void _z_t_msg_copy_open(_z_t_msg_open_t *clone, _z_t_msg_open_t *msg) { diff --git a/src/system/zephyr/network.c b/src/system/zephyr/network.c index d60555aa2..b3a0505a4 100644 --- a/src/system/zephyr/network.c +++ b/src/system/zephyr/network.c @@ -65,8 +65,8 @@ z_result_t _z_open_tcp(_z_sys_net_socket_t *sock, const _z_sys_net_endpoint_t re tv.tv_sec = tout / (uint32_t)1000; tv.tv_usec = (tout % (uint32_t)1000) * (uint32_t)1000; if ((ret == _Z_RES_OK) && (setsockopt(sock->_fd, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof(tv)) < 0)) { - // FIXME: setting the setsockopt is consistently failing. Commenting it until further inspection. - // ret = _Z_ERR_GENERIC; + // FIXME: setting the setsockopt is consistently failing. Commenting it + // until further inspection. ret = _Z_ERR_GENERIC; } #if Z_FEATURE_TCP_NODELAY == 1 @@ -188,8 +188,8 @@ z_result_t _z_open_udp_unicast(_z_sys_net_socket_t *sock, const _z_sys_net_endpo tv.tv_sec = tout / (uint32_t)1000; tv.tv_usec = (tout % (uint32_t)1000) * (uint32_t)1000; if ((ret == _Z_RES_OK) && (setsockopt(sock->_fd, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof(tv)) < 0)) { - // FIXME: setting the setsockopt is consistently failing. Commenting it until further inspection. - // ret = _Z_ERR_GENERIC; + // FIXME: setting the setsockopt is consistently failing. Commenting it + // until further inspection. ret = _Z_ERR_GENERIC; } if (ret != _Z_RES_OK) { @@ -297,8 +297,8 @@ z_result_t _z_open_udp_multicast(_z_sys_net_socket_t *sock, const _z_sys_net_end tv.tv_sec = tout / (uint32_t)1000; tv.tv_usec = (tout % (uint32_t)1000) * (uint32_t)1000; if ((ret == _Z_RES_OK) && (setsockopt(sock->_fd, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof(tv)) < 0)) { - // FIXME: setting the setsockopt is consistently failing. Commenting it until further inspection. - // ret = _Z_ERR_GENERIC; + // FIXME: setting the setsockopt is consistently failing. Commenting it + // until further inspection. ret = _Z_ERR_GENERIC; } if ((ret == _Z_RES_OK) && (bind(sock->_fd, lsockaddr, addrlen) < 0)) { @@ -395,8 +395,8 @@ z_result_t _z_listen_udp_multicast(_z_sys_net_socket_t *sock, const _z_sys_net_e tv.tv_sec = tout / (uint32_t)1000; tv.tv_usec = (tout % (uint32_t)1000) * (uint32_t)1000; if ((ret == _Z_RES_OK) && (setsockopt(sock->_fd, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof(tv)) < 0)) { - // FIXME: setting the setsockopt is consistently failing. Commenting it until further inspection. - // ret = _Z_ERR_GENERIC; + // FIXME: setting the setsockopt is consistently failing. Commenting it + // until further inspection. ret = _Z_ERR_GENERIC; } if ((ret == _Z_RES_OK) && (bind(sock->_fd, lsockaddr, addrlen) < 0)) { @@ -416,7 +416,7 @@ z_result_t _z_listen_udp_multicast(_z_sys_net_socket_t *sock, const _z_sys_net_e if (!mcast) { ret = _Z_ERR_GENERIC; } -#if KERNEL_VERSION_MAJOR == 3 && KERNEL_VERSION_MINOR > 3 +#if KERNEL_VERSION_MAJOR == 3 && KERNEL_VERSION_MINOR > 3 || KERNEL_VERSION_MAJOR >= 4 net_if_ipv4_maddr_join(ifa, mcast); #else net_if_ipv4_maddr_join(mcast); @@ -427,7 +427,7 @@ z_result_t _z_listen_udp_multicast(_z_sys_net_socket_t *sock, const _z_sys_net_e if (!mcast) { ret = _Z_ERR_GENERIC; } -#if KERNEL_VERSION_MAJOR == 3 && KERNEL_VERSION_MINOR > 3 +#if KERNEL_VERSION_MAJOR == 3 && KERNEL_VERSION_MINOR > 3 || KERNEL_VERSION_MAJOR >= 4 net_if_ipv6_maddr_join(ifa, mcast); #else net_if_ipv6_maddr_join(mcast); @@ -460,7 +460,7 @@ void _z_close_udp_multicast(_z_sys_net_socket_t *sockrecv, _z_sys_net_socket_t * if (rep._iptcp->ai_family == AF_INET) { mcast = net_if_ipv4_maddr_add(ifa, &((struct sockaddr_in *)rep._iptcp->ai_addr)->sin_addr); if (mcast != NULL) { -#if KERNEL_VERSION_MAJOR == 3 && KERNEL_VERSION_MINOR > 3 +#if KERNEL_VERSION_MAJOR == 3 && KERNEL_VERSION_MINOR > 3 || KERNEL_VERSION_MAJOR >= 4 net_if_ipv4_maddr_leave(ifa, mcast); #else net_if_ipv4_maddr_leave(mcast); @@ -472,7 +472,7 @@ void _z_close_udp_multicast(_z_sys_net_socket_t *sockrecv, _z_sys_net_socket_t * } else if (rep._iptcp->ai_family == AF_INET6) { mcast = net_if_ipv6_maddr_add(ifa, &((struct sockaddr_in6 *)rep._iptcp->ai_addr)->sin6_addr); if (mcast != NULL) { -#if KERNEL_VERSION_MAJOR == 3 && KERNEL_VERSION_MINOR > 3 +#if KERNEL_VERSION_MAJOR == 3 && KERNEL_VERSION_MINOR > 3 || KERNEL_VERSION_MAJOR >= 4 net_if_ipv6_maddr_leave(ifa, mcast); #else net_if_ipv6_maddr_leave(mcast); @@ -508,7 +508,8 @@ size_t _z_read_udp_multicast(const _z_sys_net_socket_t sock, uint8_t *ptr, size_ struct sockaddr_in *a = ((struct sockaddr_in *)lep._iptcp->ai_addr); struct sockaddr_in *b = ((struct sockaddr_in *)&raddr); if (!((a->sin_port == b->sin_port) && (a->sin_addr.s_addr == b->sin_addr.s_addr))) { - // If addr is not NULL, it means that the raddr was requested by the upper-layers + // If addr is not NULL, it means that the raddr was requested by the + // upper-layers if (addr != NULL) { addr->len = sizeof(uint32_t) + sizeof(uint16_t); (void)memcpy((uint8_t *)addr->start, &b->sin_addr.s_addr, sizeof(uint32_t)); @@ -521,7 +522,8 @@ size_t _z_read_udp_multicast(const _z_sys_net_socket_t sock, uint8_t *ptr, size_ struct sockaddr_in6 *b = ((struct sockaddr_in6 *)&raddr); if (!((a->sin6_port == b->sin6_port) && (memcmp(a->sin6_addr.s6_addr, b->sin6_addr.s6_addr, sizeof(uint32_t) * 4UL) == 0))) { - // If addr is not NULL, it means that the raddr was requested by the upper-layers + // If addr is not NULL, it means that the raddr was requested by the + // upper-layers if (addr != NULL) { addr->len = (sizeof(uint32_t) * 4UL) + sizeof(uint16_t); (void)memcpy((uint8_t *)addr->start, &b->sin6_addr.s6_addr, sizeof(uint32_t) * 4UL); @@ -530,7 +532,8 @@ size_t _z_read_udp_multicast(const _z_sys_net_socket_t sock, uint8_t *ptr, size_ break; } } else { - continue; // FIXME: support error report on invalid packet to the upper layer + continue; // FIXME: support error report on invalid packet to the upper + // layer } } while (1); diff --git a/src/transport/common/tx.c b/src/transport/common/tx.c index 4b6d9bf32..aeebe1daa 100644 --- a/src/transport/common/tx.c +++ b/src/transport/common/tx.c @@ -364,7 +364,8 @@ z_result_t _z_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t return ret; } -z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn) { +z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn, + bool first) { z_result_t ret = _Z_RES_OK; // Assume first that this is not the final fragment @@ -373,7 +374,7 @@ z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z size_t w_pos = _z_wbuf_get_wpos(dst); // Mark the buffer for the writing operation _z_transport_message_t f_hdr = - _z_t_msg_make_fragment_header(sn, reliability == Z_RELIABILITY_RELIABLE, is_final); + _z_t_msg_make_fragment_header(sn, reliability == Z_RELIABILITY_RELIABLE, is_final, first, false); ret = _z_transport_message_encode(dst, &f_hdr); // Encode the frame header if (ret == _Z_RES_OK) { size_t space_left = _z_wbuf_space_left(dst); diff --git a/src/transport/multicast/rx.c b/src/transport/multicast/rx.c index dc64460e4..cb047bf54 100644 --- a/src/transport/multicast/rx.c +++ b/src/transport/multicast/rx.c @@ -186,6 +186,61 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, // Note that we receive data from the peer entry->_received = true; + bool consecutive; + _z_wbuf_t *dbuf; + // Check if the SN is correct and select the right defragmentation buffer + if (_Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAME_R)) { + // @TODO: amend once reliability is in place. For the time being only + // monotonic SNs are ensured + if (_z_sn_precedes(entry->_sn_res, entry->_sn_rx_sns._val._plain._reliable, + t_msg->_body._fragment._sn) == true) { + consecutive = _z_sn_consecutive(entry->_sn_res, entry->_sn_rx_sns._val._plain._reliable, + t_msg->_body._fragment._sn); + entry->_sn_rx_sns._val._plain._reliable = t_msg->_body._fragment._sn; + dbuf = &entry->_dbuf_reliable; + } else { + _z_wbuf_clear(&entry->_dbuf_reliable); + _Z_INFO("Reliable message dropped because it is out of order"); + break; + } + } else { + if (_z_sn_precedes(entry->_sn_res, entry->_sn_rx_sns._val._plain._best_effort, + t_msg->_body._fragment._sn)) { + consecutive = _z_sn_consecutive(entry->_sn_res, entry->_sn_rx_sns._val._plain._best_effort, + t_msg->_body._fragment._sn); + entry->_sn_rx_sns._val._plain._best_effort = t_msg->_body._fragment._sn; + dbuf = &entry->_dbuf_best_effort; + } else { + _z_wbuf_clear(&entry->_dbuf_best_effort); + _Z_INFO("Best effort message dropped because it is out of order"); + break; + } + } + if (!consecutive && _z_wbuf_len(dbuf) > 0) { + _Z_DEBUG("Non-consecutive fragments received"); + _z_wbuf_reset(dbuf); + break; + } + // Handle fragment markers + if (_Z_PATCH_HAS_FRAGMENT_MARKERS(entry->_patch)) { + if (t_msg->_body._fragment.first) { + _z_wbuf_reset(dbuf); + } else if (_z_wbuf_len(dbuf) == 0) { + _Z_DEBUG("First fragment received without the first marker"); + break; + } + if (t_msg->_body._fragment.drop) { + _z_wbuf_reset(dbuf); + break; + } + } + + bool drop = false; + if ((_z_wbuf_len(dbuf) + t_msg->_body._fragment._payload.len) > Z_FRAG_MAX_SIZE) { + // Filling the wbuf capacity as a way to signaling the last fragment to reset the dbuf + // Otherwise, last (smaller) fragments can be understood as a complete message + _z_wbuf_write_bytes(dbuf, t_msg->_body._fragment._payload.start, 0, _z_wbuf_space_left(dbuf)); + drop = true; _z_wbuf_t *dbuf; uint8_t *dbuf_state; z_reliability_t tmsg_reliability; @@ -309,6 +364,8 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_conduit_sn_list_decrement(entry->_sn_res, &entry->_sn_rx_sns); #if Z_FEATURE_FRAGMENTATION == 1 + entry->_patch = + t_msg->_body._join._patch < _Z_CURRENT_PATCH ? t_msg->_body._join._patch : _Z_CURRENT_PATCH; entry->_state_reliable = _Z_DBUF_STATE_NULL; entry->_state_best_effort = _Z_DBUF_STATE_NULL; entry->_dbuf_reliable = _z_wbuf_null(); diff --git a/src/transport/peer_entry.c b/src/transport/peer_entry.c index b5feffcdb..4dc1ceeaa 100644 --- a/src/transport/peer_entry.c +++ b/src/transport/peer_entry.c @@ -32,6 +32,8 @@ void _z_transport_peer_entry_copy(_z_transport_peer_entry_t *dst, const _z_trans dst->_state_best_effort = src->_state_best_effort; _z_wbuf_copy(&dst->_dbuf_reliable, &src->_dbuf_reliable); _z_wbuf_copy(&dst->_dbuf_best_effort, &src->_dbuf_best_effort); + + dst->_patch = src->_patch; #endif dst->_sn_res = src->_sn_res; diff --git a/src/transport/raweth/tx.c b/src/transport/raweth/tx.c index e44d5bdab..f78399e3a 100644 --- a/src/transport/raweth/tx.c +++ b/src/transport/raweth/tx.c @@ -274,7 +274,6 @@ z_result_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_ // Get the fragment sequence number sn = __unsafe_z_raweth_get_sn(ztm, reliability); } - is_first = false; // Reset wbuf _z_wbuf_reset(&ztm->_common._wbuf); // Prepare buff @@ -282,6 +281,7 @@ z_result_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_ // Serialize one fragment _Z_CLEAN_RETURN_IF_ERR(__unsafe_z_serialize_zenoh_fragment(&ztm->_common._wbuf, &fbf, reliability, sn), _z_transport_tx_mutex_unlock(&ztm->_common)); + _Z_CLEAN_RETURN_IF_ERR(__unsafe_z_serialize_zenoh_fragment(&ztm->_wbuf, &fbf, reliability, sn, is_first), // Write the eth header _Z_CLEAN_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_common._link, &ztm->_common._wbuf), _z_transport_tx_mutex_unlock(&ztm->_common)); @@ -290,6 +290,7 @@ z_result_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_ _z_transport_tx_mutex_unlock(&ztm->_common)); // Mark the session that we have transmitted data ztm->_common._transmitted = true; + is_first = false; } // Clear the expandable buffer _z_wbuf_clear(&fbf); diff --git a/src/transport/unicast/rx.c b/src/transport/unicast/rx.c index d90b59f38..515ee7091 100644 --- a/src/transport/unicast/rx.c +++ b/src/transport/unicast/rx.c @@ -140,6 +140,56 @@ z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_t case _Z_MID_T_FRAGMENT: { _Z_DEBUG("Received Z_FRAGMENT message"); #if Z_FEATURE_FRAGMENTATION == 1 + bool consecutive; + _z_wbuf_t *dbuf; + // Check if the SN is correct and select the right defragmentation buffer + if (_Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAME_R)) { + // @TODO: amend once reliability is in place. For the time being only + // monotonic SNs are ensured + if (_z_sn_precedes(ztu->_sn_res, ztu->_sn_rx_reliable, t_msg->_body._fragment._sn)) { + consecutive = _z_sn_consecutive(ztu->_sn_res, ztu->_sn_rx_reliable, t_msg->_body._fragment._sn); + ztu->_sn_rx_reliable = t_msg->_body._fragment._sn; + dbuf = &ztu->_dbuf_reliable; + } else { + _z_wbuf_clear(&ztu->_dbuf_reliable); + _Z_INFO("Reliable message dropped because it is out of order"); + break; + } + } else { + if (_z_sn_precedes(ztu->_sn_res, ztu->_sn_rx_best_effort, t_msg->_body._fragment._sn)) { + consecutive = _z_sn_consecutive(ztu->_sn_res, ztu->_sn_rx_best_effort, t_msg->_body._fragment._sn); + ztu->_sn_rx_best_effort = t_msg->_body._fragment._sn; + dbuf = &ztu->_dbuf_best_effort; + } else { + _z_wbuf_clear(&ztu->_dbuf_best_effort); + _Z_INFO("Best effort message dropped because it is out of order"); + break; + } + } + if (!consecutive && _z_wbuf_len(dbuf) > 0) { + _Z_DEBUG("Non-consecutive fragments received"); + _z_wbuf_reset(dbuf); + break; + } + // Handle fragment markers + if (_Z_PATCH_HAS_FRAGMENT_MARKERS(ztu->_patch)) { + if (t_msg->_body._fragment.first) { + _z_wbuf_reset(dbuf); + } else if (_z_wbuf_len(dbuf) == 0) { + _Z_DEBUG("First fragment received without the start marker"); + break; + } + if (t_msg->_body._fragment.drop) { + _z_wbuf_reset(dbuf); + break; + } + } + bool drop = false; + if ((_z_wbuf_len(dbuf) + t_msg->_body._fragment._payload.len) > Z_FRAG_MAX_SIZE) { + // Filling the wbuf capacity as a way to signal the last fragment to reset the dbuf + // Otherwise, last (smaller) fragments can be understood as a complete message + _z_wbuf_write_bytes(dbuf, t_msg->_body._fragment._payload.start, 0, _z_wbuf_space_left(dbuf)); + drop = true; _z_wbuf_t *dbuf; uint8_t *dbuf_state; z_reliability_t tmsg_reliability; diff --git a/src/transport/unicast/transport.c b/src/transport/unicast/transport.c index 64a1d6540..b6c62c46f 100644 --- a/src/transport/unicast/transport.c +++ b/src/transport/unicast/transport.c @@ -36,6 +36,10 @@ z_result_t _z_unicast_transport_create(_z_transport_t *zt, _z_link_t *zl, zt->_type = _Z_TRANSPORT_UNICAST_TYPE; _z_transport_unicast_t *ztu = &zt->_transport._unicast; +#if Z_FEATURE_FRAGMENTATION == 1 + // Patch + zt->_transport._unicast._patch = param->_patch; +#endif // Initialize batching data #if Z_FEATURE_BATCHING == 1 @@ -216,6 +220,13 @@ static z_result_t _z_unicast_handshake_client(_z_transport_unicast_establish_par _z_t_msg_clear(&oam); return _Z_RES_OK; } +#if Z_FEATURE_FRAGMENTATION == 1 + if (iam._body._init._patch > ism._body._init._patch) { + // TODO: Use a better error code? + ret = _Z_ERR_GENERIC; + } + param->_patch = iam._body._init._patch; +#endif // TODO: Activate if we add peer unicast support #if 0 diff --git a/src/transport/utils.c b/src/transport/utils.c index 510a0f19c..dc0697043 100644 --- a/src/transport/utils.c +++ b/src/transport/utils.c @@ -82,6 +82,11 @@ bool _z_sn_precedes(const _z_zint_t sn_resolution, const _z_zint_t sn_left, cons return ((distance <= _z_sn_half(sn_resolution)) && (distance != 0)); } +bool _z_sn_consecutive(const _z_zint_t sn_resolution, const _z_zint_t sn_left, const _z_zint_t sn_right) { + _z_zint_t distance = (sn_right - sn_left) & sn_resolution; + return distance == 1; +} + _z_zint_t _z_sn_increment(const _z_zint_t sn_resolution, const _z_zint_t sn) { _z_zint_t ret = sn + 1; return (ret &= sn_resolution); diff --git a/tests/z_msgcodec_test.c b/tests/z_msgcodec_test.c index 15bb696c9..744165534 100644 --- a/tests/z_msgcodec_test.c +++ b/tests/z_msgcodec_test.c @@ -1837,7 +1837,7 @@ void frame_message(void) { } _z_transport_message_t gen_fragment(void) { - return _z_t_msg_make_fragment(gen_uint32(), gen_slice(gen_uint8()), gen_bool(), gen_bool()); + return _z_t_msg_make_fragment(gen_uint32(), gen_slice(gen_uint8()), gen_bool(), gen_bool(), gen_bool(), gen_bool()); } void assert_eq_fragment(const _z_t_msg_fragment_t *left, const _z_t_msg_fragment_t *right) { assert(left->_sn == right->_sn);