diff --git a/include/zenoh-pico/transport/common/rx.h b/include/zenoh-pico/transport/common/rx.h index 950f9dcb1..2ead25f7f 100644 --- a/include/zenoh-pico/transport/common/rx.h +++ b/include/zenoh-pico/transport/common/rx.h @@ -19,6 +19,7 @@ #include "zenoh-pico/transport/transport.h" /*------------------ Transmission and Reception helpers ------------------*/ +uint16_t _z_read_stream_size(_z_zbuf_t *zbuf); int8_t _z_link_recv_t_msg(_z_transport_message_t *t_msg, const _z_link_t *zl); #endif /* ZENOH_PICO_TRANSPORT_RX_H */ diff --git a/src/transport/common/rx.c b/src/transport/common/rx.c index 8f25fff0f..66ff11464 100644 --- a/src/transport/common/rx.c +++ b/src/transport/common/rx.c @@ -12,15 +12,26 @@ // ZettaScale Zenoh Team, // -#include "zenoh-pico/transport/multicast/rx.h" +#include "zenoh-pico/transport/common/rx.h" #include #include "zenoh-pico/protocol/codec/transport.h" +#include "zenoh-pico/transport/multicast/rx.h" #include "zenoh-pico/transport/unicast/rx.h" +#include "zenoh-pico/utils/endianness.h" #include "zenoh-pico/utils/logging.h" /*------------------ Reception helper ------------------*/ +uint16_t _z_read_stream_size(_z_zbuf_t *zbuf) { + uint8_t stream_size[_Z_MSG_LEN_ENC_SIZE]; + // Read the bytes from stream + for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { + stream_size[i] = _z_zbuf_read(zbuf); + } + return _z_host_le_load16(stream_size); +} + int8_t _z_link_recv_t_msg(_z_transport_message_t *t_msg, const _z_link_t *zl) { int8_t ret = _Z_RES_OK; diff --git a/src/transport/common/tx.c b/src/transport/common/tx.c index 3508493f4..5d1a818cb 100644 --- a/src/transport/common/tx.c +++ b/src/transport/common/tx.c @@ -21,6 +21,7 @@ #include "zenoh-pico/transport/multicast/tx.h" #include "zenoh-pico/transport/raweth/tx.h" #include "zenoh-pico/transport/unicast/tx.h" +#include "zenoh-pico/utils/endianness.h" #include "zenoh-pico/utils/logging.h" /*------------------ Transmission helper ------------------*/ @@ -57,9 +58,9 @@ void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability) { // Stream capable links case Z_LINK_CAP_FLOW_STREAM: { size_t len = _z_wbuf_len(buf) - _Z_MSG_LEN_ENC_SIZE; - for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { - _z_wbuf_put(buf, (uint8_t)((len >> (uint8_t)8 * i) & (uint8_t)0xFF), i); - } + // Encode the u16 size as little endian + _z_wbuf_put(buf, _z_host_u16_lsb(len), 0); + _z_wbuf_put(buf, _z_host_u16_msb(len), 1); break; } // Datagram capable links diff --git a/src/transport/multicast/read.c b/src/transport/multicast/read.c index 1082a579a..b4de50730 100644 --- a/src/transport/multicast/read.c +++ b/src/transport/multicast/read.c @@ -19,6 +19,7 @@ #include "zenoh-pico/config.h" #include "zenoh-pico/protocol/codec/transport.h" #include "zenoh-pico/protocol/iobuf.h" +#include "zenoh-pico/transport/common/rx.h" #include "zenoh-pico/transport/multicast/rx.h" #include "zenoh-pico/transport/unicast/rx.h" #include "zenoh-pico/utils/logging.h" @@ -71,11 +72,9 @@ void *_zp_multicast_read_task(void *ztm_arg) { continue; } } - - for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { - to_read |= _z_zbuf_read(&ztm->_zbuf) << (i * (uint8_t)8); - } - + // Get stream size + to_read = _z_read_stream_size(&ztm->_zbuf); + // Read data if (_z_zbuf_len(&ztm->_zbuf) < to_read) { _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, NULL); if (_z_zbuf_len(&ztm->_zbuf) < to_read) { diff --git a/src/transport/multicast/rx.c b/src/transport/multicast/rx.c index ffcbf0701..6e9c5a098 100644 --- a/src/transport/multicast/rx.c +++ b/src/transport/multicast/rx.c @@ -24,6 +24,7 @@ #include "zenoh-pico/protocol/definitions/transport.h" #include "zenoh-pico/protocol/iobuf.h" #include "zenoh-pico/session/utils.h" +#include "zenoh-pico/transport/common/rx.h" #include "zenoh-pico/transport/utils.h" #include "zenoh-pico/utils/logging.h" @@ -50,9 +51,9 @@ static int8_t _z_multicast_recv_t_msg_na(_z_transport_multicast_t *ztm, _z_trans break; } } - for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { - to_read |= _z_zbuf_read(&ztm->_zbuf) << (i * (uint8_t)8); - } + // Get stream size + to_read = _z_read_stream_size(&ztm->_zbuf); + // Read data if (_z_zbuf_len(&ztm->_zbuf) < to_read) { _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, addr); if (_z_zbuf_len(&ztm->_zbuf) < to_read) { diff --git a/src/transport/unicast/read.c b/src/transport/unicast/read.c index 80e35180c..eb19fca2c 100644 --- a/src/transport/unicast/read.c +++ b/src/transport/unicast/read.c @@ -18,6 +18,7 @@ #include "zenoh-pico/config.h" #include "zenoh-pico/protocol/codec/transport.h" +#include "zenoh-pico/transport/common/rx.h" #include "zenoh-pico/transport/unicast/rx.h" #include "zenoh-pico/utils/logging.h" @@ -66,11 +67,9 @@ void *_zp_unicast_read_task(void *ztu_arg) { continue; } } - - for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { - to_read |= _z_zbuf_read(&ztu->_zbuf) << (i * (uint8_t)8); - } - + // Get stream size + to_read = _z_read_stream_size(&ztu->_zbuf); + // Read data if (_z_zbuf_len(&ztu->_zbuf) < to_read) { _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); if (_z_zbuf_len(&ztu->_zbuf) < to_read) { diff --git a/src/transport/unicast/rx.c b/src/transport/unicast/rx.c index 33132b3ca..d8049f79d 100644 --- a/src/transport/unicast/rx.c +++ b/src/transport/unicast/rx.c @@ -22,6 +22,7 @@ #include "zenoh-pico/protocol/core.h" #include "zenoh-pico/protocol/iobuf.h" #include "zenoh-pico/session/utils.h" +#include "zenoh-pico/transport/common/rx.h" #include "zenoh-pico/transport/utils.h" #include "zenoh-pico/utils/logging.h" @@ -48,9 +49,9 @@ int8_t _z_unicast_recv_t_msg_na(_z_transport_unicast_t *ztu, _z_transport_messag continue; } } - for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { - to_read |= _z_zbuf_read(&ztu->_zbuf) << (i * (uint8_t)8); - } + // Get stream size + to_read = _z_read_stream_size(&ztu->_zbuf); + // Read data if (_z_zbuf_len(&ztu->_zbuf) < to_read) { _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); if (_z_zbuf_len(&ztu->_zbuf) < to_read) {