From b5483d79cb857d2d5c45fd6585d0d1c493991f3a Mon Sep 17 00:00:00 2001 From: Jean-Roland Gosse Date: Fri, 17 Nov 2023 16:37:27 +0100 Subject: [PATCH] Refactor link capabilities (#280) --- include/zenoh-pico/link/link.h | 48 ++++++++++----- include/zenoh-pico/transport/common/tx.h | 4 +- src/link/link.c | 12 +++- src/link/multicast/bt.c | 5 +- src/link/multicast/udp.c | 5 +- src/link/unicast/serial.c | 5 +- src/link/unicast/tcp.c | 5 +- src/link/unicast/udp.c | 5 +- src/link/unicast/ws.c | 5 +- src/transport/common/rx.c | 47 ++++++++------- src/transport/common/tx.c | 75 ++++++++++++++++-------- src/transport/manager.c | 74 ++++++++++++++--------- src/transport/multicast/read.c | 49 +++++++++------- src/transport/multicast/rx.c | 54 +++++++++-------- src/transport/multicast/tx.c | 12 ++-- src/transport/unicast/read.c | 46 ++++++++------- src/transport/unicast/rx.c | 55 +++++++++-------- src/transport/unicast/transport.c | 12 +++- src/transport/unicast/tx.c | 12 ++-- 19 files changed, 329 insertions(+), 201 deletions(-) diff --git a/include/zenoh-pico/link/link.h b/include/zenoh-pico/link/link.h index 9a49f2862..32c9ebb42 100644 --- a/include/zenoh-pico/link/link.h +++ b/include/zenoh-pico/link/link.h @@ -43,24 +43,44 @@ #include "zenoh-pico/utils/result.h" /** - * Link capabilities values, defined as a bitmask. + * Link transport capability enum. * * Enumerators: - * Z_LINK_CAPABILITY_NONE: Bitmask to define that link has no capabilities. - * Z_LINK_CAPABILITY_RELIABLE: Bitmask to define and check if link is reliable. - * Z_LINK_CAPABILITY_STREAMED: Bitmask to define and check if link is streamed. - * Z_LINK_CAPABILITY_MULTICAST: Bitmask to define and check if link is multicast. + * Z_LINK_CAP_TRANSPORT_UNICAST: Link has unicast capabilities. + * Z_LINK_CAP_TRANSPORT_MULTICAST: Link has multicast capabilities. */ typedef enum { - Z_LINK_CAPABILITY_NONE = 0x00, // 0 - Z_LINK_CAPABILITY_RELIABLE = 0x01, // 1 << 0 - Z_LINK_CAPABILITY_STREAMED = 0x02, // 1 << 1 - Z_LINK_CAPABILITY_MULTICAST = 0x04 // 1 << 2 -} _z_link_capabilities_t; + Z_LINK_CAP_TRANSPORT_UNICAST = 0, + Z_LINK_CAP_TRANSPORT_MULTICAST = 1, +} _z_link_cap_transport_t; -#define _Z_LINK_IS_RELIABLE(X) ((X & Z_LINK_CAPABILITY_RELIABLE) == Z_LINK_CAPABILITY_RELIABLE) -#define _Z_LINK_IS_STREAMED(X) ((X & Z_LINK_CAPABILITY_STREAMED) == Z_LINK_CAPABILITY_STREAMED) -#define _Z_LINK_IS_MULTICAST(X) ((X & Z_LINK_CAPABILITY_MULTICAST) == Z_LINK_CAPABILITY_MULTICAST) +/** + * Link flow capability enum. + * + * Enumerators: + * Z_LINK_CAP_FLOW_STREAM: Link use datagrams. + * Z_LINK_CAP_FLOW_DATAGRAM: Link use byte stream. + */ +typedef enum { + Z_LINK_CAP_FLOW_DATAGRAM = 0, + Z_LINK_CAP_FLOW_STREAM = 1, +} _z_link_cap_flow_t; + +/** + * Link capabilities, stored as a register-like object. + * + * Fields: + * transport: 2 bits, see _z_link_cap_transport_t enum. + * flow: 1 bit, see _z_link_cap_flow_t enum. + * reliable: 1 bit, 1 if the link is reliable (network definition) + * reserved: 4 bits, reserved for futur use + */ +typedef struct _z_link_capabilities_t { + uint8_t _transport : 2; + uint8_t _flow : 1; + uint8_t _is_reliable : 1; + uint8_t _reserved : 4; +} _z_link_capabilities_t; struct _z_link_t; // Forward declaration to be used in _z_f_link_* @@ -104,7 +124,7 @@ typedef struct _z_link_t { _z_f_link_free _free_f; uint16_t _mtu; - uint8_t _capabilities; + _z_link_capabilities_t _cap; } _z_link_t; void _z_link_clear(_z_link_t *zl); diff --git a/include/zenoh-pico/transport/common/tx.h b/include/zenoh-pico/transport/common/tx.h index 33ba5593f..d7870c54a 100644 --- a/include/zenoh-pico/transport/common/tx.h +++ b/include/zenoh-pico/transport/common/tx.h @@ -19,8 +19,8 @@ #include "zenoh-pico/net/session.h" #include "zenoh-pico/transport/transport.h" -void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, _Bool is_streamed); -void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, _Bool is_streamed); +void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability); +void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability); /*This function is unsafe because it operates in potentially concurrent data.*Make sure that the following mutexes are locked before calling this function : *-ztu->mutex_tx */ int8_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn); diff --git a/src/link/link.c b/src/link/link.c index 1a5f89925..2888da55d 100644 --- a/src/link/link.c +++ b/src/link/link.c @@ -148,7 +148,17 @@ size_t _z_link_recv_exact_zbuf(const _z_link_t *link, _z_zbuf_t *zbf, size_t len int8_t _z_link_send_wbuf(const _z_link_t *link, const _z_wbuf_t *wbf) { int8_t ret = _Z_RES_OK; - _Bool link_is_streamed = _Z_LINK_IS_STREAMED(link->_capabilities); + _Bool link_is_streamed = false; + + switch (link->_cap._flow) { + case Z_LINK_CAP_FLOW_STREAM: + link_is_streamed = true; + break; + case Z_LINK_CAP_FLOW_DATAGRAM: + default: + link_is_streamed = false; + break; + } for (size_t i = 0; (i < _z_wbuf_len_iosli(wbf)) && (ret == _Z_RES_OK); i++) { _z_bytes_t bs = _z_iosli_to_bytes(_z_wbuf_get_iosli(wbf, i)); size_t n = bs.len; diff --git a/src/link/multicast/bt.c b/src/link/multicast/bt.c index 332e5f9fa..9c54b966c 100644 --- a/src/link/multicast/bt.c +++ b/src/link/multicast/bt.c @@ -115,7 +115,10 @@ uint16_t _z_get_link_mtu_bt(void) { return SPP_MAXIMUM_PAYLOAD; } int8_t _z_new_link_bt(_z_link_t *zl, _z_endpoint_t endpoint) { int8_t ret = _Z_RES_OK; - zl->_capabilities = Z_LINK_CAPABILITY_STREAMED | Z_LINK_CAPABILITY_MULTICAST; + zl->_cap._transport = Z_LINK_CAP_TRANSPORT_MULTICAST; + zl->_cap._flow = Z_LINK_CAP_FLOW_STREAM; + zl->_cap._is_reliable = false; + zl->_mtu = _z_get_link_mtu_bt(); zl->_endpoint = endpoint; diff --git a/src/link/multicast/udp.c b/src/link/multicast/udp.c index bcb2a4dc8..dd0cd108b 100644 --- a/src/link/multicast/udp.c +++ b/src/link/multicast/udp.c @@ -171,7 +171,10 @@ uint16_t _z_get_link_mtu_udp_multicast(void) { int8_t _z_new_link_udp_multicast(_z_link_t *zl, _z_endpoint_t endpoint) { int8_t ret = _Z_RES_OK; - zl->_capabilities = Z_LINK_CAPABILITY_MULTICAST; + zl->_cap._transport = Z_LINK_CAP_TRANSPORT_MULTICAST; + zl->_cap._flow = Z_LINK_CAP_FLOW_DATAGRAM; + zl->_cap._is_reliable = false; + zl->_mtu = _z_get_link_mtu_udp_multicast(); zl->_endpoint = endpoint; diff --git a/src/link/unicast/serial.c b/src/link/unicast/serial.c index eebbff12a..db034f07c 100644 --- a/src/link/unicast/serial.c +++ b/src/link/unicast/serial.c @@ -116,7 +116,10 @@ uint16_t _z_get_link_mtu_serial(void) { return _Z_SERIAL_MTU_SIZE; } int8_t _z_new_link_serial(_z_link_t *zl, _z_endpoint_t endpoint) { int8_t ret = _Z_RES_OK; - zl->_capabilities = Z_LINK_CAPABILITY_NONE; + zl->_cap._transport = Z_LINK_CAP_TRANSPORT_UNICAST; + zl->_cap._flow = Z_LINK_CAP_FLOW_DATAGRAM; + zl->_cap._is_reliable = false; + zl->_mtu = _z_get_link_mtu_serial(); zl->_endpoint = endpoint; diff --git a/src/link/unicast/tcp.c b/src/link/unicast/tcp.c index b7b78fa43..a6cf3de7a 100644 --- a/src/link/unicast/tcp.c +++ b/src/link/unicast/tcp.c @@ -156,7 +156,10 @@ uint16_t _z_get_link_mtu_tcp(void) { int8_t _z_new_link_tcp(_z_link_t *zl, _z_endpoint_t *endpoint) { int8_t ret = _Z_RES_OK; - zl->_capabilities = Z_LINK_CAPABILITY_RELIABLE | Z_LINK_CAPABILITY_STREAMED; + zl->_cap._transport = Z_LINK_CAP_TRANSPORT_UNICAST; + zl->_cap._flow = Z_LINK_CAP_FLOW_STREAM; + zl->_cap._is_reliable = true; + zl->_mtu = _z_get_link_mtu_tcp(); zl->_endpoint = *endpoint; diff --git a/src/link/unicast/udp.c b/src/link/unicast/udp.c index c87441709..2317c1fd4 100644 --- a/src/link/unicast/udp.c +++ b/src/link/unicast/udp.c @@ -162,7 +162,10 @@ uint16_t _z_get_link_mtu_udp_unicast(void) { int8_t _z_new_link_udp_unicast(_z_link_t *zl, _z_endpoint_t endpoint) { int8_t ret = _Z_RES_OK; - zl->_capabilities = Z_LINK_CAPABILITY_NONE; + zl->_cap._transport = Z_LINK_CAP_TRANSPORT_UNICAST; + zl->_cap._flow = Z_LINK_CAP_FLOW_DATAGRAM; + zl->_cap._is_reliable = false; + zl->_mtu = _z_get_link_mtu_udp_unicast(); zl->_endpoint = endpoint; diff --git a/src/link/unicast/ws.c b/src/link/unicast/ws.c index 426e5fa96..f59fde872 100644 --- a/src/link/unicast/ws.c +++ b/src/link/unicast/ws.c @@ -157,7 +157,10 @@ uint16_t _z_get_link_mtu_ws(void) { int8_t _z_new_link_ws(_z_link_t *zl, _z_endpoint_t *endpoint) { int8_t ret = _Z_RES_OK; - zl->_capabilities = Z_LINK_CAPABILITY_RELIABLE; + zl->_cap._transport = Z_LINK_CAP_TRANSPORT_UNICAST; + zl->_cap._flow = Z_LINK_CAP_FLOW_DATAGRAM; + zl->_cap._is_reliable = true; + zl->_mtu = _z_get_link_mtu_ws(); zl->_endpoint = *endpoint; diff --git a/src/transport/common/rx.c b/src/transport/common/rx.c index 67692eb3c..8f25fff0f 100644 --- a/src/transport/common/rx.c +++ b/src/transport/common/rx.c @@ -28,32 +28,37 @@ int8_t _z_link_recv_t_msg(_z_transport_message_t *t_msg, const _z_link_t *zl) { _z_zbuf_t zbf = _z_zbuf_make(Z_BATCH_UNICAST_SIZE); _z_zbuf_reset(&zbf); - if (_Z_LINK_IS_STREAMED(zl->_capabilities) == true) { - // Read the message length - if (_z_link_recv_exact_zbuf(zl, &zbf, _Z_MSG_LEN_ENC_SIZE, NULL) == _Z_MSG_LEN_ENC_SIZE) { - size_t len = 0; - for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { - len |= (size_t)(_z_zbuf_read(&zbf) << (i * (uint8_t)8)); - } + switch (zl->_cap._flow) { + case Z_LINK_CAP_FLOW_STREAM: + // Read the message length + if (_z_link_recv_exact_zbuf(zl, &zbf, _Z_MSG_LEN_ENC_SIZE, NULL) == _Z_MSG_LEN_ENC_SIZE) { + size_t len = 0; + for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { + len |= (size_t)(_z_zbuf_read(&zbf) << (i * (uint8_t)8)); + } - size_t writable = _z_zbuf_capacity(&zbf) - _z_zbuf_len(&zbf); - if (writable >= len) { - // Read enough bytes to decode the message - if (_z_link_recv_exact_zbuf(zl, &zbf, len, NULL) != len) { - ret = _Z_ERR_TRANSPORT_RX_FAILED; + size_t writable = _z_zbuf_capacity(&zbf) - _z_zbuf_len(&zbf); + if (writable >= len) { + // Read enough bytes to decode the message + if (_z_link_recv_exact_zbuf(zl, &zbf, len, NULL) != len) { + ret = _Z_ERR_TRANSPORT_RX_FAILED; + } + } else { + ret = _Z_ERR_TRANSPORT_NO_SPACE; } } else { - ret = _Z_ERR_TRANSPORT_NO_SPACE; + ret = _Z_ERR_TRANSPORT_RX_FAILED; } - } else { - ret = _Z_ERR_TRANSPORT_RX_FAILED; - } - } else { - if (_z_link_recv_zbuf(zl, &zbf, NULL) == SIZE_MAX) { - ret = _Z_ERR_TRANSPORT_RX_FAILED; - } + break; + case Z_LINK_CAP_FLOW_DATAGRAM: + if (_z_link_recv_zbuf(zl, &zbf, NULL) == SIZE_MAX) { + ret = _Z_ERR_TRANSPORT_RX_FAILED; + } + break; + default: + ret = _Z_ERR_GENERIC; + break; } - if (ret == _Z_RES_OK) { _z_transport_message_t l_t_msg; ret = _z_transport_message_decode(&l_t_msg, &zbf); diff --git a/src/transport/common/tx.c b/src/transport/common/tx.c index f0d68eb6c..6f38ac365 100644 --- a/src/transport/common/tx.c +++ b/src/transport/common/tx.c @@ -27,14 +27,21 @@ * Make sure that the following mutexes are locked before calling this function: * - ztu->mutex_tx */ -void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, _Bool is_streamed) { +void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability) { _z_wbuf_reset(buf); - if (is_streamed == true) { - for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { - _z_wbuf_put(buf, 0, i); - } - _z_wbuf_set_wpos(buf, _Z_MSG_LEN_ENC_SIZE); + switch (link_flow_capability) { + // Stream capable links + case Z_LINK_CAP_FLOW_STREAM: + for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { + _z_wbuf_put(buf, 0, i); + } + _z_wbuf_set_wpos(buf, _Z_MSG_LEN_ENC_SIZE); + break; + // Datagram capable links + case Z_LINK_CAP_FLOW_DATAGRAM: + default: + break; } } @@ -43,12 +50,20 @@ void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, _Bool is_streamed) { * Make sure that the following mutexes are locked before calling this function: * - ztu->mutex_tx */ -void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, _Bool is_streamed) { - if (is_streamed == true) { - size_t len = _z_wbuf_len(buf) - _Z_MSG_LEN_ENC_SIZE; - for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { - _z_wbuf_put(buf, (uint8_t)((len >> (uint8_t)8 * i) & (uint8_t)0xFF), i); +void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability) { + switch (link_flow_capability) { + // Stream capable links + case Z_LINK_CAP_FLOW_STREAM: { + size_t len = _z_wbuf_len(buf) - _Z_MSG_LEN_ENC_SIZE; + for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { + _z_wbuf_put(buf, (uint8_t)((len >> (uint8_t)8 * i) & (uint8_t)0xFF), i); + } + break; } + // Datagram capable links + case Z_LINK_CAP_FLOW_DATAGRAM: + default: + break; } } @@ -74,24 +89,38 @@ int8_t _z_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t *t_m // Create and prepare the buffer to serialize the message on uint16_t mtu = (zl->_mtu < Z_BATCH_UNICAST_SIZE) ? zl->_mtu : Z_BATCH_UNICAST_SIZE; _z_wbuf_t wbf = _z_wbuf_make(mtu, false); - if (_Z_LINK_IS_STREAMED(zl->_capabilities) == true) { - for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { - _z_wbuf_put(&wbf, 0, i); - } - _z_wbuf_set_wpos(&wbf, _Z_MSG_LEN_ENC_SIZE); - } + switch (zl->_cap._flow) { + case Z_LINK_CAP_FLOW_STREAM: + for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { + _z_wbuf_put(&wbf, 0, i); + } + _z_wbuf_set_wpos(&wbf, _Z_MSG_LEN_ENC_SIZE); + break; + case Z_LINK_CAP_FLOW_DATAGRAM: + break; + default: + ret = _Z_ERR_GENERIC; + break; + } // Encode the session message ret = _z_transport_message_encode(&wbf, t_msg); if (ret == _Z_RES_OK) { - // Write the message length in the reserved space if needed - if (_Z_LINK_IS_STREAMED(zl->_capabilities) == true) { - size_t len = _z_wbuf_len(&wbf) - _Z_MSG_LEN_ENC_SIZE; - for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { - _z_wbuf_put(&wbf, (uint8_t)((len >> (uint8_t)8 * i) & (uint8_t)0xFF), i); + switch (zl->_cap._flow) { + case Z_LINK_CAP_FLOW_STREAM: { + // Write the message length in the reserved space if needed + size_t len = _z_wbuf_len(&wbf) - _Z_MSG_LEN_ENC_SIZE; + for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { + _z_wbuf_put(&wbf, (uint8_t)((len >> (uint8_t)8 * i) & (uint8_t)0xFF), i); + } + break; } + case Z_LINK_CAP_FLOW_DATAGRAM: + break; + default: + ret = _Z_ERR_GENERIC; + break; } - // Send the wbuf on the socket ret = _z_link_send_wbuf(zl, &wbf); } diff --git a/src/transport/manager.c b/src/transport/manager.c index 06662e513..7d8f79811 100644 --- a/src/transport/manager.c +++ b/src/transport/manager.c @@ -31,22 +31,32 @@ int8_t _z_new_transport_client(_z_transport_t *zt, char *locator, _z_id_t *local return ret; } // Open transport - if (_Z_LINK_IS_MULTICAST(zl._capabilities)) { - _z_transport_multicast_establish_param_t tp_param; - ret = _z_multicast_open_client(&tp_param, &zl, local_zid); - if (ret != _Z_RES_OK) { - _z_link_clear(&zl); - return ret; + switch (zl._cap._transport) { + // Unicast transport + case Z_LINK_CAP_TRANSPORT_UNICAST: { + _z_transport_unicast_establish_param_t tp_param; + ret = _z_unicast_open_client(&tp_param, &zl, local_zid); + if (ret != _Z_RES_OK) { + _z_link_clear(&zl); + return ret; + } + ret = _z_unicast_transport_create(zt, &zl, &tp_param); + break; } - ret = _z_multicast_transport_create(zt, &zl, &tp_param); - } else { - _z_transport_unicast_establish_param_t tp_param; - ret = _z_unicast_open_client(&tp_param, &zl, local_zid); - if (ret != _Z_RES_OK) { - _z_link_clear(&zl); - return ret; + // Multicast transport + case Z_LINK_CAP_TRANSPORT_MULTICAST: { + _z_transport_multicast_establish_param_t tp_param; + ret = _z_multicast_open_client(&tp_param, &zl, local_zid); + if (ret != _Z_RES_OK) { + _z_link_clear(&zl); + return ret; + } + ret = _z_multicast_transport_create(zt, &zl, &tp_param); + break; } - ret = _z_unicast_transport_create(zt, &zl, &tp_param); + default: + ret = _Z_ERR_GENERIC; + break; } return ret; } @@ -61,22 +71,30 @@ int8_t _z_new_transport_peer(_z_transport_t *zt, char *locator, _z_id_t *local_z if (ret != _Z_RES_OK) { return ret; } - if (_Z_LINK_IS_MULTICAST(zl._capabilities)) { - _z_transport_multicast_establish_param_t tp_param; - ret = _z_multicast_open_peer(&tp_param, &zl, local_zid); - if (ret != _Z_RES_OK) { - _z_link_clear(&zl); - return ret; + switch (zl._cap._transport) { + case Z_LINK_CAP_TRANSPORT_UNICAST: { + _z_transport_unicast_establish_param_t tp_param; + ret = _z_unicast_open_peer(&tp_param, &zl, local_zid); + if (ret != _Z_RES_OK) { + _z_link_clear(&zl); + return ret; + } + ret = _z_unicast_transport_create(zt, &zl, &tp_param); + break; } - ret = _z_multicast_transport_create(zt, &zl, &tp_param); - } else { - _z_transport_unicast_establish_param_t tp_param; - ret = _z_unicast_open_peer(&tp_param, &zl, local_zid); - if (ret != _Z_RES_OK) { - _z_link_clear(&zl); - return ret; + case Z_LINK_CAP_TRANSPORT_MULTICAST: { + _z_transport_multicast_establish_param_t tp_param; + ret = _z_multicast_open_peer(&tp_param, &zl, local_zid); + if (ret != _Z_RES_OK) { + _z_link_clear(&zl); + return ret; + } + ret = _z_multicast_transport_create(zt, &zl, &tp_param); + break; } - ret = _z_unicast_transport_create(zt, &zl, &tp_param); + default: + ret = _Z_ERR_GENERIC; + break; } return ret; } diff --git a/src/transport/multicast/read.c b/src/transport/multicast/read.c index a0c483a6b..12073d0ce 100644 --- a/src/transport/multicast/read.c +++ b/src/transport/multicast/read.c @@ -72,36 +72,41 @@ void *_zp_multicast_read_task(void *ztm_arg) { while (ztm->_read_task_running == true) { // Read bytes from socket to the main buffer size_t to_read = 0; - if (_Z_LINK_IS_STREAMED(ztm->_link._capabilities) == true) { - if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { - _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, &addr); + + switch (ztm->_link._cap._flow) { + case Z_LINK_CAP_FLOW_STREAM: if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { - _z_bytes_clear(&addr); - _z_zbuf_compact(&ztm->_zbuf); - continue; + _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, &addr); + if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { + _z_bytes_clear(&addr); + _z_zbuf_compact(&ztm->_zbuf); + continue; + } } - } - for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { - to_read |= _z_zbuf_read(&ztm->_zbuf) << (i * (uint8_t)8); - } + for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { + to_read |= _z_zbuf_read(&ztm->_zbuf) << (i * (uint8_t)8); + } - if (_z_zbuf_len(&ztm->_zbuf) < to_read) { - _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, NULL); if (_z_zbuf_len(&ztm->_zbuf) < to_read) { - _z_zbuf_set_rpos(&ztm->_zbuf, _z_zbuf_get_rpos(&ztm->_zbuf) - _Z_MSG_LEN_ENC_SIZE); - _z_zbuf_compact(&ztm->_zbuf); + _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, NULL); + if (_z_zbuf_len(&ztm->_zbuf) < to_read) { + _z_zbuf_set_rpos(&ztm->_zbuf, _z_zbuf_get_rpos(&ztm->_zbuf) - _Z_MSG_LEN_ENC_SIZE); + _z_zbuf_compact(&ztm->_zbuf); + continue; + } + } + break; + case Z_LINK_CAP_FLOW_DATAGRAM: + _z_zbuf_compact(&ztm->_zbuf); + to_read = _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, &addr); + if (to_read == SIZE_MAX) { continue; } - } - } else { - _z_zbuf_compact(&ztm->_zbuf); - to_read = _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, &addr); - if (to_read == SIZE_MAX) { - continue; - } + break; + default: + break; } - // Wrap the main buffer for to_read bytes _z_zbuf_t zbuf = _z_zbuf_view(&ztm->_zbuf, to_read); diff --git a/src/transport/multicast/rx.c b/src/transport/multicast/rx.c index 6d73c10f8..5d43eb60c 100644 --- a/src/transport/multicast/rx.c +++ b/src/transport/multicast/rx.c @@ -59,35 +59,39 @@ int8_t _z_multicast_recv_t_msg_na(_z_transport_multicast_t *ztm, _z_transport_me size_t to_read = 0; do { - if (_Z_LINK_IS_STREAMED(ztm->_link._capabilities) == true) { - if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { - _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, addr); + switch (ztm->_link._cap._flow) { + case Z_LINK_CAP_FLOW_STREAM: if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { - _z_zbuf_compact(&ztm->_zbuf); - ret = _Z_ERR_TRANSPORT_NOT_ENOUGH_BYTES; - break; + _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, addr); + if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { + _z_zbuf_compact(&ztm->_zbuf); + ret = _Z_ERR_TRANSPORT_NOT_ENOUGH_BYTES; + break; + } + } + for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { + to_read |= _z_zbuf_read(&ztm->_zbuf) << (i * (uint8_t)8); } - } - - for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { - to_read |= _z_zbuf_read(&ztm->_zbuf) << (i * (uint8_t)8); - } - - if (_z_zbuf_len(&ztm->_zbuf) < to_read) { - _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, addr); if (_z_zbuf_len(&ztm->_zbuf) < to_read) { - _z_zbuf_set_rpos(&ztm->_zbuf, _z_zbuf_get_rpos(&ztm->_zbuf) - _Z_MSG_LEN_ENC_SIZE); - _z_zbuf_compact(&ztm->_zbuf); - ret = _Z_ERR_TRANSPORT_NOT_ENOUGH_BYTES; - break; + _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, addr); + if (_z_zbuf_len(&ztm->_zbuf) < to_read) { + _z_zbuf_set_rpos(&ztm->_zbuf, _z_zbuf_get_rpos(&ztm->_zbuf) - _Z_MSG_LEN_ENC_SIZE); + _z_zbuf_compact(&ztm->_zbuf); + ret = _Z_ERR_TRANSPORT_NOT_ENOUGH_BYTES; + break; + } } - } - } else { - _z_zbuf_compact(&ztm->_zbuf); - to_read = _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, addr); - if (to_read == SIZE_MAX) { - ret = _Z_ERR_TRANSPORT_RX_FAILED; - } + break; + // Datagram capable links + case Z_LINK_CAP_FLOW_DATAGRAM: + _z_zbuf_compact(&ztm->_zbuf); + to_read = _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, addr); + if (to_read == SIZE_MAX) { + ret = _Z_ERR_TRANSPORT_RX_FAILED; + } + break; + default: + break; } } while (false); // The 1-iteration loop to use continue to break the entire loop on error diff --git a/src/transport/multicast/tx.c b/src/transport/multicast/tx.c index c2f665a19..230c8c85a 100644 --- a/src/transport/multicast/tx.c +++ b/src/transport/multicast/tx.c @@ -50,13 +50,13 @@ int8_t _z_multicast_send_t_msg(_z_transport_multicast_t *ztm, const _z_transport #endif // Z_FEATURE_MULTI_THREAD == 1 // Prepare the buffer eventually reserving space for the message length - __unsafe_z_prepare_wbuf(&ztm->_wbuf, _Z_LINK_IS_STREAMED(ztm->_link._capabilities)); + __unsafe_z_prepare_wbuf(&ztm->_wbuf, ztm->_link._cap._flow); // Encode the session message ret = _z_transport_message_encode(&ztm->_wbuf, t_msg); if (ret == _Z_RES_OK) { // Write the message length in the reserved space if needed - __unsafe_z_finalize_wbuf(&ztm->_wbuf, _Z_LINK_IS_STREAMED(ztm->_link._capabilities)); + __unsafe_z_finalize_wbuf(&ztm->_wbuf, ztm->_link._cap._flow); // Send the wbuf on the socket ret = _z_link_send_wbuf(&ztm->_link, &ztm->_wbuf); if (ret == _Z_RES_OK) { @@ -97,7 +97,7 @@ int8_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_m if (drop == false) { // Prepare the buffer eventually reserving space for the message length - __unsafe_z_prepare_wbuf(&ztm->_wbuf, _Z_LINK_IS_STREAMED(ztm->_link._capabilities)); + __unsafe_z_prepare_wbuf(&ztm->_wbuf, ztm->_link._cap._flow); _z_zint_t sn = __unsafe_z_multicast_get_sn(ztm, reliability); // Get the next sequence number @@ -107,7 +107,7 @@ int8_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_m ret = _z_network_message_encode(&ztm->_wbuf, n_msg); // Encode the network message if (ret == _Z_RES_OK) { // Write the message length in the reserved space if needed - __unsafe_z_finalize_wbuf(&ztm->_wbuf, _Z_LINK_IS_STREAMED(ztm->_link._capabilities)); + __unsafe_z_finalize_wbuf(&ztm->_wbuf, ztm->_link._cap._flow); ret = _z_link_send_wbuf(&ztm->_link, &ztm->_wbuf); // Send the wbuf on the socket if (ret == _Z_RES_OK) { @@ -128,13 +128,13 @@ int8_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_m is_first = false; // Clear the buffer for serialization - __unsafe_z_prepare_wbuf(&ztm->_wbuf, _Z_LINK_IS_STREAMED(ztm->_link._capabilities)); + __unsafe_z_prepare_wbuf(&ztm->_wbuf, ztm->_link._cap._flow); // Serialize one fragment ret = __unsafe_z_serialize_zenoh_fragment(&ztm->_wbuf, &fbf, reliability, sn); if (ret == _Z_RES_OK) { // Write the message length in the reserved space if needed - __unsafe_z_finalize_wbuf(&ztm->_wbuf, _Z_LINK_IS_STREAMED(ztm->_link._capabilities)); + __unsafe_z_finalize_wbuf(&ztm->_wbuf, ztm->_link._cap._flow); ret = _z_link_send_wbuf(&ztm->_link, &ztm->_wbuf); // Send the wbuf on the socket if (ret == _Z_RES_OK) { diff --git a/src/transport/unicast/read.c b/src/transport/unicast/read.c index 4d0728eee..02b95ddef 100644 --- a/src/transport/unicast/read.c +++ b/src/transport/unicast/read.c @@ -68,35 +68,39 @@ void *_zp_unicast_read_task(void *ztu_arg) { while (ztu->_read_task_running == true) { // Read bytes from socket to the main buffer size_t to_read = 0; - if (_Z_LINK_IS_STREAMED(ztu->_link._capabilities) == true) { - if (_z_zbuf_len(&ztu->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { - _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); + switch (ztu->_link._cap._flow) { + case Z_LINK_CAP_FLOW_STREAM: if (_z_zbuf_len(&ztu->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { - _z_zbuf_compact(&ztu->_zbuf); - continue; + _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); + if (_z_zbuf_len(&ztu->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { + _z_zbuf_compact(&ztu->_zbuf); + continue; + } } - } - for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { - to_read |= _z_zbuf_read(&ztu->_zbuf) << (i * (uint8_t)8); - } + for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { + to_read |= _z_zbuf_read(&ztu->_zbuf) << (i * (uint8_t)8); + } - if (_z_zbuf_len(&ztu->_zbuf) < to_read) { - _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); if (_z_zbuf_len(&ztu->_zbuf) < to_read) { - _z_zbuf_set_rpos(&ztu->_zbuf, _z_zbuf_get_rpos(&ztu->_zbuf) - _Z_MSG_LEN_ENC_SIZE); - _z_zbuf_compact(&ztu->_zbuf); + _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); + if (_z_zbuf_len(&ztu->_zbuf) < to_read) { + _z_zbuf_set_rpos(&ztu->_zbuf, _z_zbuf_get_rpos(&ztu->_zbuf) - _Z_MSG_LEN_ENC_SIZE); + _z_zbuf_compact(&ztu->_zbuf); + continue; + } + } + break; + case Z_LINK_CAP_FLOW_DATAGRAM: + _z_zbuf_compact(&ztu->_zbuf); + to_read = _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); + if (to_read == SIZE_MAX) { continue; } - } - } else { - _z_zbuf_compact(&ztu->_zbuf); - to_read = _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); - if (to_read == SIZE_MAX) { - continue; - } + break; + default: + break; } - // Wrap the main buffer for to_read bytes _z_zbuf_t zbuf = _z_zbuf_view(&ztu->_zbuf, to_read); diff --git a/src/transport/unicast/rx.c b/src/transport/unicast/rx.c index 965ec58c1..c1e3faa2d 100644 --- a/src/transport/unicast/rx.c +++ b/src/transport/unicast/rx.c @@ -37,35 +37,40 @@ int8_t _z_unicast_recv_t_msg_na(_z_transport_unicast_t *ztu, _z_transport_messag size_t to_read = 0; do { - if (_Z_LINK_IS_STREAMED(ztu->_link._capabilities) == true) { - if (_z_zbuf_len(&ztu->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { - _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); + switch (ztu->_link._cap._flow) { + // Stream capable links + case Z_LINK_CAP_FLOW_STREAM: if (_z_zbuf_len(&ztu->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { - _z_zbuf_compact(&ztu->_zbuf); - ret = _Z_ERR_TRANSPORT_NOT_ENOUGH_BYTES; - continue; + _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); + if (_z_zbuf_len(&ztu->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { + _z_zbuf_compact(&ztu->_zbuf); + ret = _Z_ERR_TRANSPORT_NOT_ENOUGH_BYTES; + continue; + } + } + for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { + to_read |= _z_zbuf_read(&ztu->_zbuf) << (i * (uint8_t)8); } - } - - for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { - to_read |= _z_zbuf_read(&ztu->_zbuf) << (i * (uint8_t)8); - } - - if (_z_zbuf_len(&ztu->_zbuf) < to_read) { - _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); if (_z_zbuf_len(&ztu->_zbuf) < to_read) { - _z_zbuf_set_rpos(&ztu->_zbuf, _z_zbuf_get_rpos(&ztu->_zbuf) - _Z_MSG_LEN_ENC_SIZE); - _z_zbuf_compact(&ztu->_zbuf); - ret = _Z_ERR_TRANSPORT_NOT_ENOUGH_BYTES; - continue; + _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); + if (_z_zbuf_len(&ztu->_zbuf) < to_read) { + _z_zbuf_set_rpos(&ztu->_zbuf, _z_zbuf_get_rpos(&ztu->_zbuf) - _Z_MSG_LEN_ENC_SIZE); + _z_zbuf_compact(&ztu->_zbuf); + ret = _Z_ERR_TRANSPORT_NOT_ENOUGH_BYTES; + continue; + } } - } - } else { - _z_zbuf_compact(&ztu->_zbuf); - to_read = _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); - if (to_read == SIZE_MAX) { - ret = _Z_ERR_TRANSPORT_RX_FAILED; - } + break; + // Datagram capable links + case Z_LINK_CAP_FLOW_DATAGRAM: + _z_zbuf_compact(&ztu->_zbuf); + to_read = _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); + if (to_read == SIZE_MAX) { + ret = _Z_ERR_TRANSPORT_RX_FAILED; + } + break; + default: + break; } } while (false); // The 1-iteration loop to use continue to break the entire loop on error diff --git a/src/transport/unicast/transport.c b/src/transport/unicast/transport.c index 0aa951b5b..15f660bd8 100644 --- a/src/transport/unicast/transport.c +++ b/src/transport/unicast/transport.c @@ -50,8 +50,18 @@ int8_t _z_unicast_transport_create(_z_transport_t *zt, _z_link_t *zl, _z_transpo // Initialize the read and write buffers if (ret == _Z_RES_OK) { uint16_t mtu = (zl->_mtu < Z_BATCH_UNICAST_SIZE) ? zl->_mtu : Z_BATCH_UNICAST_SIZE; - _Bool expandable = _Z_LINK_IS_STREAMED(zl->_capabilities); size_t dbuf_size = 0; + _Bool expandable = false; + + switch (zl->_cap._flow) { + case Z_LINK_CAP_FLOW_STREAM: + expandable = true; + break; + case Z_LINK_CAP_FLOW_DATAGRAM: + default: + expandable = false; + break; + } #if Z_FEATURE_DYNAMIC_MEMORY_ALLOCATION == 0 expandable = false; diff --git a/src/transport/unicast/tx.c b/src/transport/unicast/tx.c index 6e7ea4f85..21694e1e9 100644 --- a/src/transport/unicast/tx.c +++ b/src/transport/unicast/tx.c @@ -53,13 +53,13 @@ int8_t _z_unicast_send_t_msg(_z_transport_unicast_t *ztu, const _z_transport_mes #endif // Z_FEATURE_MULTI_THREAD == 1 // Prepare the buffer eventually reserving space for the message length - __unsafe_z_prepare_wbuf(&ztu->_wbuf, _Z_LINK_IS_STREAMED(ztu->_link._capabilities)); + __unsafe_z_prepare_wbuf(&ztu->_wbuf, ztu->_link._cap._flow); // Encode the session message ret = _z_transport_message_encode(&ztu->_wbuf, t_msg); if (ret == _Z_RES_OK) { // Write the message length in the reserved space if needed - __unsafe_z_finalize_wbuf(&ztu->_wbuf, _Z_LINK_IS_STREAMED(ztu->_link._capabilities)); + __unsafe_z_finalize_wbuf(&ztu->_wbuf, ztu->_link._cap._flow); // Send the wbuf on the socket ret = _z_link_send_wbuf(&ztu->_link, &ztu->_wbuf); if (ret == _Z_RES_OK) { @@ -100,7 +100,7 @@ int8_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg if (drop == false) { // Prepare the buffer eventually reserving space for the message length - __unsafe_z_prepare_wbuf(&ztu->_wbuf, _Z_LINK_IS_STREAMED(ztu->_link._capabilities)); + __unsafe_z_prepare_wbuf(&ztu->_wbuf, ztu->_link._cap._flow); _z_zint_t sn = __unsafe_z_unicast_get_sn(ztu, reliability); // Get the next sequence number @@ -110,7 +110,7 @@ int8_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg ret = _z_network_message_encode(&ztu->_wbuf, n_msg); // Encode the network message if (ret == _Z_RES_OK) { // Write the message length in the reserved space if needed - __unsafe_z_finalize_wbuf(&ztu->_wbuf, _Z_LINK_IS_STREAMED(ztu->_link._capabilities)); + __unsafe_z_finalize_wbuf(&ztu->_wbuf, ztu->_link._cap._flow); if (ztu->_wbuf._ioss._len == 1) { ret = _z_link_send_wbuf(&ztu->_link, &ztu->_wbuf); // Send the wbuf on the socket @@ -137,13 +137,13 @@ int8_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg is_first = false; // Clear the buffer for serialization - __unsafe_z_prepare_wbuf(&ztu->_wbuf, _Z_LINK_IS_STREAMED(ztu->_link._capabilities)); + __unsafe_z_prepare_wbuf(&ztu->_wbuf, ztu->_link._cap._flow); // Serialize one fragment ret = __unsafe_z_serialize_zenoh_fragment(&ztu->_wbuf, &fbf, reliability, sn); if (ret == _Z_RES_OK) { // Write the message length in the reserved space if needed - __unsafe_z_finalize_wbuf(&ztu->_wbuf, _Z_LINK_IS_STREAMED(ztu->_link._capabilities)); + __unsafe_z_finalize_wbuf(&ztu->_wbuf, ztu->_link._cap._flow); ret = _z_link_send_wbuf(&ztu->_link, &ztu->_wbuf); // Send the wbuf on the socket if (ret == _Z_RES_OK) {