diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index cb9276179..f633f9d71 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -25,7 +25,9 @@ jobs: name: Preparation runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 + - name: Clone this repository + uses: actions/checkout@v4 + - name: Environment setup id: env shell: bash @@ -34,18 +36,18 @@ jobs: echo "GITHUB_SHA=${GITHUB_SHA:0:8}" GIT_BRANCH=`[[ $GITHUB_REF =~ ^refs/heads/.* ]] && echo ${GITHUB_REF/refs\/heads\//} || true` echo "GIT_BRANCH=${GIT_BRANCH}" - echo ::set-output name=GIT_BRANCH::"${GIT_BRANCH}" + echo "GIT_BRANCH=${GIT_BRANCH}" >> $GITHUB_OUTPUT GIT_TAG=`[[ $GITHUB_REF =~ ^refs/tags/.* ]] && echo ${GITHUB_REF/refs\/tags\//} || true` echo "GIT_TAG=${GIT_TAG}" - echo ::set-output name=GIT_TAG::"${GIT_TAG}" + echo "GIT_TAG=${GIT_TAG}" >> $GITHUB_OUTPUT ZENOH_VERSION=$(sed -n 's/^project(libzenohpico VERSION \(.*\) LANGUAGES C)/\1/p' CMakeLists.txt | head -n1) echo "ZENOH_VERSION=${ZENOH_VERSION}" - echo ::set-output name=ZENOH_VERSION::"${ZENOH_VERSION}" + echo "ZENOH_VERSION=${ZENOH_VERSION}" >> $GITHUB_OUTPUT if [ -n "${GIT_TAG}" ]; then IS_RELEASE="true" echo "IS_RELEASE=${IS_RELEASE}" - echo ::set-output name=IS_RELEASE::"${IS_RELEASE}" + echo "IS_RELEASE=${IS_RELEASE}" >> $GITHUB_OUTPUT PKG_VERSION=${GIT_TAG} elif [ -n "${GIT_BRANCH}" ]; then PKG_VERSION=${GIT_BRANCH}-${GITHUB_SHA:0:8} @@ -53,13 +55,13 @@ jobs: PKG_VERSION=${ZENOH_VERSION}-${GITHUB_SHA:0:8} fi echo "PKG_VERSION=${PKG_VERSION}" - echo ::set-output name=PKG_VERSION::"${PKG_VERSION}" + echo "PKG_VERSION=${PKG_VERSION}" >> $GITHUB_OUTPUT CROSSBUILD_TARGETS=$(sed -n 's/^CROSSBUILD_TARGETS=\(.*\)/\1/p' GNUmakefile | head -n1) echo "CROSSBUILD_TARGETS=$CROSSBUILD_TARGETS" TARGET_MATRIX="{\"target\": [\"${CROSSBUILD_TARGETS// /\",\"}\"]}" echo "TARGET_MATRIX=$TARGET_MATRIX" - echo ::set-output name=TARGET_MATRIX::"${TARGET_MATRIX}" + echo "TARGET_MATRIX=$TARGET_MATRIX" >> $GITHUB_OUTPUT outputs: GIT_BRANCH: ${{ steps.env.outputs.GIT_BRANCH }} GIT_TAG: ${{ steps.env.outputs.GIT_TAG }} @@ -73,11 +75,14 @@ jobs: needs: preps runs-on: macos-latest steps: - - uses: actions/checkout@v4 + - name: Clone this repository + uses: actions/checkout@v4 + - name: MacOS build run: make env: BUILD_TYPE: RELEASE + - name: Packaging id: package shell: bash @@ -86,14 +91,15 @@ jobs: echo "Packaging ${LIB_PKG_NAME}:" cd build && zip -r ${LIB_PKG_NAME} lib && cd - zip -r ${LIB_PKG_NAME} include - echo ::set-output name=LIB_PKG_NAME::"${LIB_PKG_NAME}" + echo "LIB_PKG_NAME=${LIB_PKG_NAME}" >> $GITHUB_OUTPUT EXP_PKG_NAME=${PWD}/zenoh-pico-${{ needs.preps.outputs.PKG_VERSION }}-macos-x64-examples.zip echo "Packaging ${EXP_PKG_NAME}:" cd build/examples && zip ${EXP_PKG_NAME} * && cd - - echo ::set-output name=EXP_PKG_NAME::"${EXP_PKG_NAME}" + echo "EXP_PKG_NAME=${EXP_PKG_NAME}" >> $GITHUB_OUTPUT + - name: "Upload x86_64 macos package" - uses: actions/upload-artifact@master + uses: actions/upload-artifact@v3 with: name: macos-x64 path: | @@ -108,11 +114,14 @@ jobs: fail-fast: false matrix: ${{fromJson(needs.preps.outputs.TARGET_MATRIX)}} steps: - - uses: actions/checkout@v4 + - name: Clone this repository + uses: actions/checkout@v4 + - name: make for ${{ matrix.target }} env: BUILD_TYPE: RELEASE run: make ${{ matrix.target }} + - name: Packaging id: package shell: bash @@ -126,22 +135,23 @@ jobs: echo "Packaging ${LIB_PKG_NAME}:" cd crossbuilds/${TARGET} && zip -r ${LIB_PKG_NAME} lib && cd - zip -r ${LIB_PKG_NAME} include - echo ::set-output name=LIB_PKG_NAME::"${LIB_PKG_NAME}" + echo "LIB_PKG_NAME=${LIB_PKG_NAME}" >> $GITHUB_OUTPUT cd crossbuilds/${TARGET}/packages echo "Packaging ${DEB_PKG_NAME}:" zip ${DEB_PKG_NAME} *.deb - echo ::set-output name=DEB_PKG_NAME::"${DEB_PKG_NAME}" + echo "DEB_PKG_NAME=${DEB_PKG_NAME}" >> $GITHUB_OUTPUT echo "Packaging ${RPM_PKG_NAME}:" zip ${RPM_PKG_NAME} *.rpm - echo ::set-output name=RPM_PKG_NAME::"${RPM_PKG_NAME}" + echo "RPM_PKG_NAME=${RPM_PKG_NAME}" >> $GITHUB_OUTPUT cd - echo "Packaging ${EXP_PKG_NAME}:" cd crossbuilds/${TARGET}/examples && zip ${EXP_PKG_NAME} * && cd - - echo ::set-output name=EXP_PKG_NAME::"${EXP_PKG_NAME}" + echo "EXP_PKG_NAME=${EXP_PKG_NAME}" >> $GITHUB_OUTPUT + - name: "Upload packages" - uses: actions/upload-artifact@master + uses: actions/upload-artifact@v3 with: name: ${{ matrix.target }} path: | @@ -157,13 +167,15 @@ jobs: runs-on: ubuntu-latest steps: - name: Download result of previous builds - uses: actions/download-artifact@v2 + uses: actions/download-artifact@v3 with: path: ARTIFACTS + - name: Publish as github release uses: softprops/action-gh-release@v1 with: files: ARTIFACTS/*/*.* + - name: Publish to download.eclipse.org/zenoh env: SSH_TARGET: genie.zenoh@projects-storage.eclipse.org diff --git a/include/zenoh-pico/link/link.h b/include/zenoh-pico/link/link.h index 9a49f2862..32c9ebb42 100644 --- a/include/zenoh-pico/link/link.h +++ b/include/zenoh-pico/link/link.h @@ -43,24 +43,44 @@ #include "zenoh-pico/utils/result.h" /** - * Link capabilities values, defined as a bitmask. + * Link transport capability enum. * * Enumerators: - * Z_LINK_CAPABILITY_NONE: Bitmask to define that link has no capabilities. - * Z_LINK_CAPABILITY_RELIABLE: Bitmask to define and check if link is reliable. - * Z_LINK_CAPABILITY_STREAMED: Bitmask to define and check if link is streamed. - * Z_LINK_CAPABILITY_MULTICAST: Bitmask to define and check if link is multicast. + * Z_LINK_CAP_TRANSPORT_UNICAST: Link has unicast capabilities. + * Z_LINK_CAP_TRANSPORT_MULTICAST: Link has multicast capabilities. */ typedef enum { - Z_LINK_CAPABILITY_NONE = 0x00, // 0 - Z_LINK_CAPABILITY_RELIABLE = 0x01, // 1 << 0 - Z_LINK_CAPABILITY_STREAMED = 0x02, // 1 << 1 - Z_LINK_CAPABILITY_MULTICAST = 0x04 // 1 << 2 -} _z_link_capabilities_t; + Z_LINK_CAP_TRANSPORT_UNICAST = 0, + Z_LINK_CAP_TRANSPORT_MULTICAST = 1, +} _z_link_cap_transport_t; -#define _Z_LINK_IS_RELIABLE(X) ((X & Z_LINK_CAPABILITY_RELIABLE) == Z_LINK_CAPABILITY_RELIABLE) -#define _Z_LINK_IS_STREAMED(X) ((X & Z_LINK_CAPABILITY_STREAMED) == Z_LINK_CAPABILITY_STREAMED) -#define _Z_LINK_IS_MULTICAST(X) ((X & Z_LINK_CAPABILITY_MULTICAST) == Z_LINK_CAPABILITY_MULTICAST) +/** + * Link flow capability enum. + * + * Enumerators: + * Z_LINK_CAP_FLOW_STREAM: Link use datagrams. + * Z_LINK_CAP_FLOW_DATAGRAM: Link use byte stream. + */ +typedef enum { + Z_LINK_CAP_FLOW_DATAGRAM = 0, + Z_LINK_CAP_FLOW_STREAM = 1, +} _z_link_cap_flow_t; + +/** + * Link capabilities, stored as a register-like object. + * + * Fields: + * transport: 2 bits, see _z_link_cap_transport_t enum. + * flow: 1 bit, see _z_link_cap_flow_t enum. + * reliable: 1 bit, 1 if the link is reliable (network definition) + * reserved: 4 bits, reserved for futur use + */ +typedef struct _z_link_capabilities_t { + uint8_t _transport : 2; + uint8_t _flow : 1; + uint8_t _is_reliable : 1; + uint8_t _reserved : 4; +} _z_link_capabilities_t; struct _z_link_t; // Forward declaration to be used in _z_f_link_* @@ -104,7 +124,7 @@ typedef struct _z_link_t { _z_f_link_free _free_f; uint16_t _mtu; - uint8_t _capabilities; + _z_link_capabilities_t _cap; } _z_link_t; void _z_link_clear(_z_link_t *zl); diff --git a/include/zenoh-pico/transport/common/tx.h b/include/zenoh-pico/transport/common/tx.h index 33ba5593f..d7870c54a 100644 --- a/include/zenoh-pico/transport/common/tx.h +++ b/include/zenoh-pico/transport/common/tx.h @@ -19,8 +19,8 @@ #include "zenoh-pico/net/session.h" #include "zenoh-pico/transport/transport.h" -void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, _Bool is_streamed); -void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, _Bool is_streamed); +void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability); +void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability); /*This function is unsafe because it operates in potentially concurrent data.*Make sure that the following mutexes are locked before calling this function : *-ztu->mutex_tx */ int8_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn); diff --git a/src/link/link.c b/src/link/link.c index 1a5f89925..2888da55d 100644 --- a/src/link/link.c +++ b/src/link/link.c @@ -148,7 +148,17 @@ size_t _z_link_recv_exact_zbuf(const _z_link_t *link, _z_zbuf_t *zbf, size_t len int8_t _z_link_send_wbuf(const _z_link_t *link, const _z_wbuf_t *wbf) { int8_t ret = _Z_RES_OK; - _Bool link_is_streamed = _Z_LINK_IS_STREAMED(link->_capabilities); + _Bool link_is_streamed = false; + + switch (link->_cap._flow) { + case Z_LINK_CAP_FLOW_STREAM: + link_is_streamed = true; + break; + case Z_LINK_CAP_FLOW_DATAGRAM: + default: + link_is_streamed = false; + break; + } for (size_t i = 0; (i < _z_wbuf_len_iosli(wbf)) && (ret == _Z_RES_OK); i++) { _z_bytes_t bs = _z_iosli_to_bytes(_z_wbuf_get_iosli(wbf, i)); size_t n = bs.len; diff --git a/src/link/multicast/bt.c b/src/link/multicast/bt.c index 332e5f9fa..9c54b966c 100644 --- a/src/link/multicast/bt.c +++ b/src/link/multicast/bt.c @@ -115,7 +115,10 @@ uint16_t _z_get_link_mtu_bt(void) { return SPP_MAXIMUM_PAYLOAD; } int8_t _z_new_link_bt(_z_link_t *zl, _z_endpoint_t endpoint) { int8_t ret = _Z_RES_OK; - zl->_capabilities = Z_LINK_CAPABILITY_STREAMED | Z_LINK_CAPABILITY_MULTICAST; + zl->_cap._transport = Z_LINK_CAP_TRANSPORT_MULTICAST; + zl->_cap._flow = Z_LINK_CAP_FLOW_STREAM; + zl->_cap._is_reliable = false; + zl->_mtu = _z_get_link_mtu_bt(); zl->_endpoint = endpoint; diff --git a/src/link/multicast/udp.c b/src/link/multicast/udp.c index bcb2a4dc8..dd0cd108b 100644 --- a/src/link/multicast/udp.c +++ b/src/link/multicast/udp.c @@ -171,7 +171,10 @@ uint16_t _z_get_link_mtu_udp_multicast(void) { int8_t _z_new_link_udp_multicast(_z_link_t *zl, _z_endpoint_t endpoint) { int8_t ret = _Z_RES_OK; - zl->_capabilities = Z_LINK_CAPABILITY_MULTICAST; + zl->_cap._transport = Z_LINK_CAP_TRANSPORT_MULTICAST; + zl->_cap._flow = Z_LINK_CAP_FLOW_DATAGRAM; + zl->_cap._is_reliable = false; + zl->_mtu = _z_get_link_mtu_udp_multicast(); zl->_endpoint = endpoint; diff --git a/src/link/unicast/serial.c b/src/link/unicast/serial.c index eebbff12a..db034f07c 100644 --- a/src/link/unicast/serial.c +++ b/src/link/unicast/serial.c @@ -116,7 +116,10 @@ uint16_t _z_get_link_mtu_serial(void) { return _Z_SERIAL_MTU_SIZE; } int8_t _z_new_link_serial(_z_link_t *zl, _z_endpoint_t endpoint) { int8_t ret = _Z_RES_OK; - zl->_capabilities = Z_LINK_CAPABILITY_NONE; + zl->_cap._transport = Z_LINK_CAP_TRANSPORT_UNICAST; + zl->_cap._flow = Z_LINK_CAP_FLOW_DATAGRAM; + zl->_cap._is_reliable = false; + zl->_mtu = _z_get_link_mtu_serial(); zl->_endpoint = endpoint; diff --git a/src/link/unicast/tcp.c b/src/link/unicast/tcp.c index b7b78fa43..a6cf3de7a 100644 --- a/src/link/unicast/tcp.c +++ b/src/link/unicast/tcp.c @@ -156,7 +156,10 @@ uint16_t _z_get_link_mtu_tcp(void) { int8_t _z_new_link_tcp(_z_link_t *zl, _z_endpoint_t *endpoint) { int8_t ret = _Z_RES_OK; - zl->_capabilities = Z_LINK_CAPABILITY_RELIABLE | Z_LINK_CAPABILITY_STREAMED; + zl->_cap._transport = Z_LINK_CAP_TRANSPORT_UNICAST; + zl->_cap._flow = Z_LINK_CAP_FLOW_STREAM; + zl->_cap._is_reliable = true; + zl->_mtu = _z_get_link_mtu_tcp(); zl->_endpoint = *endpoint; diff --git a/src/link/unicast/udp.c b/src/link/unicast/udp.c index c87441709..2317c1fd4 100644 --- a/src/link/unicast/udp.c +++ b/src/link/unicast/udp.c @@ -162,7 +162,10 @@ uint16_t _z_get_link_mtu_udp_unicast(void) { int8_t _z_new_link_udp_unicast(_z_link_t *zl, _z_endpoint_t endpoint) { int8_t ret = _Z_RES_OK; - zl->_capabilities = Z_LINK_CAPABILITY_NONE; + zl->_cap._transport = Z_LINK_CAP_TRANSPORT_UNICAST; + zl->_cap._flow = Z_LINK_CAP_FLOW_DATAGRAM; + zl->_cap._is_reliable = false; + zl->_mtu = _z_get_link_mtu_udp_unicast(); zl->_endpoint = endpoint; diff --git a/src/link/unicast/ws.c b/src/link/unicast/ws.c index 426e5fa96..f59fde872 100644 --- a/src/link/unicast/ws.c +++ b/src/link/unicast/ws.c @@ -157,7 +157,10 @@ uint16_t _z_get_link_mtu_ws(void) { int8_t _z_new_link_ws(_z_link_t *zl, _z_endpoint_t *endpoint) { int8_t ret = _Z_RES_OK; - zl->_capabilities = Z_LINK_CAPABILITY_RELIABLE; + zl->_cap._transport = Z_LINK_CAP_TRANSPORT_UNICAST; + zl->_cap._flow = Z_LINK_CAP_FLOW_DATAGRAM; + zl->_cap._is_reliable = true; + zl->_mtu = _z_get_link_mtu_ws(); zl->_endpoint = *endpoint; diff --git a/src/transport/common/rx.c b/src/transport/common/rx.c index 67692eb3c..8f25fff0f 100644 --- a/src/transport/common/rx.c +++ b/src/transport/common/rx.c @@ -28,32 +28,37 @@ int8_t _z_link_recv_t_msg(_z_transport_message_t *t_msg, const _z_link_t *zl) { _z_zbuf_t zbf = _z_zbuf_make(Z_BATCH_UNICAST_SIZE); _z_zbuf_reset(&zbf); - if (_Z_LINK_IS_STREAMED(zl->_capabilities) == true) { - // Read the message length - if (_z_link_recv_exact_zbuf(zl, &zbf, _Z_MSG_LEN_ENC_SIZE, NULL) == _Z_MSG_LEN_ENC_SIZE) { - size_t len = 0; - for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { - len |= (size_t)(_z_zbuf_read(&zbf) << (i * (uint8_t)8)); - } + switch (zl->_cap._flow) { + case Z_LINK_CAP_FLOW_STREAM: + // Read the message length + if (_z_link_recv_exact_zbuf(zl, &zbf, _Z_MSG_LEN_ENC_SIZE, NULL) == _Z_MSG_LEN_ENC_SIZE) { + size_t len = 0; + for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { + len |= (size_t)(_z_zbuf_read(&zbf) << (i * (uint8_t)8)); + } - size_t writable = _z_zbuf_capacity(&zbf) - _z_zbuf_len(&zbf); - if (writable >= len) { - // Read enough bytes to decode the message - if (_z_link_recv_exact_zbuf(zl, &zbf, len, NULL) != len) { - ret = _Z_ERR_TRANSPORT_RX_FAILED; + size_t writable = _z_zbuf_capacity(&zbf) - _z_zbuf_len(&zbf); + if (writable >= len) { + // Read enough bytes to decode the message + if (_z_link_recv_exact_zbuf(zl, &zbf, len, NULL) != len) { + ret = _Z_ERR_TRANSPORT_RX_FAILED; + } + } else { + ret = _Z_ERR_TRANSPORT_NO_SPACE; } } else { - ret = _Z_ERR_TRANSPORT_NO_SPACE; + ret = _Z_ERR_TRANSPORT_RX_FAILED; } - } else { - ret = _Z_ERR_TRANSPORT_RX_FAILED; - } - } else { - if (_z_link_recv_zbuf(zl, &zbf, NULL) == SIZE_MAX) { - ret = _Z_ERR_TRANSPORT_RX_FAILED; - } + break; + case Z_LINK_CAP_FLOW_DATAGRAM: + if (_z_link_recv_zbuf(zl, &zbf, NULL) == SIZE_MAX) { + ret = _Z_ERR_TRANSPORT_RX_FAILED; + } + break; + default: + ret = _Z_ERR_GENERIC; + break; } - if (ret == _Z_RES_OK) { _z_transport_message_t l_t_msg; ret = _z_transport_message_decode(&l_t_msg, &zbf); diff --git a/src/transport/common/tx.c b/src/transport/common/tx.c index f0d68eb6c..6f38ac365 100644 --- a/src/transport/common/tx.c +++ b/src/transport/common/tx.c @@ -27,14 +27,21 @@ * Make sure that the following mutexes are locked before calling this function: * - ztu->mutex_tx */ -void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, _Bool is_streamed) { +void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability) { _z_wbuf_reset(buf); - if (is_streamed == true) { - for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { - _z_wbuf_put(buf, 0, i); - } - _z_wbuf_set_wpos(buf, _Z_MSG_LEN_ENC_SIZE); + switch (link_flow_capability) { + // Stream capable links + case Z_LINK_CAP_FLOW_STREAM: + for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { + _z_wbuf_put(buf, 0, i); + } + _z_wbuf_set_wpos(buf, _Z_MSG_LEN_ENC_SIZE); + break; + // Datagram capable links + case Z_LINK_CAP_FLOW_DATAGRAM: + default: + break; } } @@ -43,12 +50,20 @@ void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, _Bool is_streamed) { * Make sure that the following mutexes are locked before calling this function: * - ztu->mutex_tx */ -void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, _Bool is_streamed) { - if (is_streamed == true) { - size_t len = _z_wbuf_len(buf) - _Z_MSG_LEN_ENC_SIZE; - for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { - _z_wbuf_put(buf, (uint8_t)((len >> (uint8_t)8 * i) & (uint8_t)0xFF), i); +void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability) { + switch (link_flow_capability) { + // Stream capable links + case Z_LINK_CAP_FLOW_STREAM: { + size_t len = _z_wbuf_len(buf) - _Z_MSG_LEN_ENC_SIZE; + for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { + _z_wbuf_put(buf, (uint8_t)((len >> (uint8_t)8 * i) & (uint8_t)0xFF), i); + } + break; } + // Datagram capable links + case Z_LINK_CAP_FLOW_DATAGRAM: + default: + break; } } @@ -74,24 +89,38 @@ int8_t _z_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t *t_m // Create and prepare the buffer to serialize the message on uint16_t mtu = (zl->_mtu < Z_BATCH_UNICAST_SIZE) ? zl->_mtu : Z_BATCH_UNICAST_SIZE; _z_wbuf_t wbf = _z_wbuf_make(mtu, false); - if (_Z_LINK_IS_STREAMED(zl->_capabilities) == true) { - for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { - _z_wbuf_put(&wbf, 0, i); - } - _z_wbuf_set_wpos(&wbf, _Z_MSG_LEN_ENC_SIZE); - } + switch (zl->_cap._flow) { + case Z_LINK_CAP_FLOW_STREAM: + for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { + _z_wbuf_put(&wbf, 0, i); + } + _z_wbuf_set_wpos(&wbf, _Z_MSG_LEN_ENC_SIZE); + break; + case Z_LINK_CAP_FLOW_DATAGRAM: + break; + default: + ret = _Z_ERR_GENERIC; + break; + } // Encode the session message ret = _z_transport_message_encode(&wbf, t_msg); if (ret == _Z_RES_OK) { - // Write the message length in the reserved space if needed - if (_Z_LINK_IS_STREAMED(zl->_capabilities) == true) { - size_t len = _z_wbuf_len(&wbf) - _Z_MSG_LEN_ENC_SIZE; - for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { - _z_wbuf_put(&wbf, (uint8_t)((len >> (uint8_t)8 * i) & (uint8_t)0xFF), i); + switch (zl->_cap._flow) { + case Z_LINK_CAP_FLOW_STREAM: { + // Write the message length in the reserved space if needed + size_t len = _z_wbuf_len(&wbf) - _Z_MSG_LEN_ENC_SIZE; + for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { + _z_wbuf_put(&wbf, (uint8_t)((len >> (uint8_t)8 * i) & (uint8_t)0xFF), i); + } + break; } + case Z_LINK_CAP_FLOW_DATAGRAM: + break; + default: + ret = _Z_ERR_GENERIC; + break; } - // Send the wbuf on the socket ret = _z_link_send_wbuf(zl, &wbf); } diff --git a/src/transport/manager.c b/src/transport/manager.c index 06662e513..7d8f79811 100644 --- a/src/transport/manager.c +++ b/src/transport/manager.c @@ -31,22 +31,32 @@ int8_t _z_new_transport_client(_z_transport_t *zt, char *locator, _z_id_t *local return ret; } // Open transport - if (_Z_LINK_IS_MULTICAST(zl._capabilities)) { - _z_transport_multicast_establish_param_t tp_param; - ret = _z_multicast_open_client(&tp_param, &zl, local_zid); - if (ret != _Z_RES_OK) { - _z_link_clear(&zl); - return ret; + switch (zl._cap._transport) { + // Unicast transport + case Z_LINK_CAP_TRANSPORT_UNICAST: { + _z_transport_unicast_establish_param_t tp_param; + ret = _z_unicast_open_client(&tp_param, &zl, local_zid); + if (ret != _Z_RES_OK) { + _z_link_clear(&zl); + return ret; + } + ret = _z_unicast_transport_create(zt, &zl, &tp_param); + break; } - ret = _z_multicast_transport_create(zt, &zl, &tp_param); - } else { - _z_transport_unicast_establish_param_t tp_param; - ret = _z_unicast_open_client(&tp_param, &zl, local_zid); - if (ret != _Z_RES_OK) { - _z_link_clear(&zl); - return ret; + // Multicast transport + case Z_LINK_CAP_TRANSPORT_MULTICAST: { + _z_transport_multicast_establish_param_t tp_param; + ret = _z_multicast_open_client(&tp_param, &zl, local_zid); + if (ret != _Z_RES_OK) { + _z_link_clear(&zl); + return ret; + } + ret = _z_multicast_transport_create(zt, &zl, &tp_param); + break; } - ret = _z_unicast_transport_create(zt, &zl, &tp_param); + default: + ret = _Z_ERR_GENERIC; + break; } return ret; } @@ -61,22 +71,30 @@ int8_t _z_new_transport_peer(_z_transport_t *zt, char *locator, _z_id_t *local_z if (ret != _Z_RES_OK) { return ret; } - if (_Z_LINK_IS_MULTICAST(zl._capabilities)) { - _z_transport_multicast_establish_param_t tp_param; - ret = _z_multicast_open_peer(&tp_param, &zl, local_zid); - if (ret != _Z_RES_OK) { - _z_link_clear(&zl); - return ret; + switch (zl._cap._transport) { + case Z_LINK_CAP_TRANSPORT_UNICAST: { + _z_transport_unicast_establish_param_t tp_param; + ret = _z_unicast_open_peer(&tp_param, &zl, local_zid); + if (ret != _Z_RES_OK) { + _z_link_clear(&zl); + return ret; + } + ret = _z_unicast_transport_create(zt, &zl, &tp_param); + break; } - ret = _z_multicast_transport_create(zt, &zl, &tp_param); - } else { - _z_transport_unicast_establish_param_t tp_param; - ret = _z_unicast_open_peer(&tp_param, &zl, local_zid); - if (ret != _Z_RES_OK) { - _z_link_clear(&zl); - return ret; + case Z_LINK_CAP_TRANSPORT_MULTICAST: { + _z_transport_multicast_establish_param_t tp_param; + ret = _z_multicast_open_peer(&tp_param, &zl, local_zid); + if (ret != _Z_RES_OK) { + _z_link_clear(&zl); + return ret; + } + ret = _z_multicast_transport_create(zt, &zl, &tp_param); + break; } - ret = _z_unicast_transport_create(zt, &zl, &tp_param); + default: + ret = _Z_ERR_GENERIC; + break; } return ret; } diff --git a/src/transport/multicast/read.c b/src/transport/multicast/read.c index a0c483a6b..12073d0ce 100644 --- a/src/transport/multicast/read.c +++ b/src/transport/multicast/read.c @@ -72,36 +72,41 @@ void *_zp_multicast_read_task(void *ztm_arg) { while (ztm->_read_task_running == true) { // Read bytes from socket to the main buffer size_t to_read = 0; - if (_Z_LINK_IS_STREAMED(ztm->_link._capabilities) == true) { - if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { - _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, &addr); + + switch (ztm->_link._cap._flow) { + case Z_LINK_CAP_FLOW_STREAM: if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { - _z_bytes_clear(&addr); - _z_zbuf_compact(&ztm->_zbuf); - continue; + _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, &addr); + if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { + _z_bytes_clear(&addr); + _z_zbuf_compact(&ztm->_zbuf); + continue; + } } - } - for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { - to_read |= _z_zbuf_read(&ztm->_zbuf) << (i * (uint8_t)8); - } + for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { + to_read |= _z_zbuf_read(&ztm->_zbuf) << (i * (uint8_t)8); + } - if (_z_zbuf_len(&ztm->_zbuf) < to_read) { - _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, NULL); if (_z_zbuf_len(&ztm->_zbuf) < to_read) { - _z_zbuf_set_rpos(&ztm->_zbuf, _z_zbuf_get_rpos(&ztm->_zbuf) - _Z_MSG_LEN_ENC_SIZE); - _z_zbuf_compact(&ztm->_zbuf); + _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, NULL); + if (_z_zbuf_len(&ztm->_zbuf) < to_read) { + _z_zbuf_set_rpos(&ztm->_zbuf, _z_zbuf_get_rpos(&ztm->_zbuf) - _Z_MSG_LEN_ENC_SIZE); + _z_zbuf_compact(&ztm->_zbuf); + continue; + } + } + break; + case Z_LINK_CAP_FLOW_DATAGRAM: + _z_zbuf_compact(&ztm->_zbuf); + to_read = _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, &addr); + if (to_read == SIZE_MAX) { continue; } - } - } else { - _z_zbuf_compact(&ztm->_zbuf); - to_read = _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, &addr); - if (to_read == SIZE_MAX) { - continue; - } + break; + default: + break; } - // Wrap the main buffer for to_read bytes _z_zbuf_t zbuf = _z_zbuf_view(&ztm->_zbuf, to_read); diff --git a/src/transport/multicast/rx.c b/src/transport/multicast/rx.c index 6d73c10f8..5d43eb60c 100644 --- a/src/transport/multicast/rx.c +++ b/src/transport/multicast/rx.c @@ -59,35 +59,39 @@ int8_t _z_multicast_recv_t_msg_na(_z_transport_multicast_t *ztm, _z_transport_me size_t to_read = 0; do { - if (_Z_LINK_IS_STREAMED(ztm->_link._capabilities) == true) { - if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { - _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, addr); + switch (ztm->_link._cap._flow) { + case Z_LINK_CAP_FLOW_STREAM: if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { - _z_zbuf_compact(&ztm->_zbuf); - ret = _Z_ERR_TRANSPORT_NOT_ENOUGH_BYTES; - break; + _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, addr); + if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { + _z_zbuf_compact(&ztm->_zbuf); + ret = _Z_ERR_TRANSPORT_NOT_ENOUGH_BYTES; + break; + } + } + for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { + to_read |= _z_zbuf_read(&ztm->_zbuf) << (i * (uint8_t)8); } - } - - for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { - to_read |= _z_zbuf_read(&ztm->_zbuf) << (i * (uint8_t)8); - } - - if (_z_zbuf_len(&ztm->_zbuf) < to_read) { - _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, addr); if (_z_zbuf_len(&ztm->_zbuf) < to_read) { - _z_zbuf_set_rpos(&ztm->_zbuf, _z_zbuf_get_rpos(&ztm->_zbuf) - _Z_MSG_LEN_ENC_SIZE); - _z_zbuf_compact(&ztm->_zbuf); - ret = _Z_ERR_TRANSPORT_NOT_ENOUGH_BYTES; - break; + _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, addr); + if (_z_zbuf_len(&ztm->_zbuf) < to_read) { + _z_zbuf_set_rpos(&ztm->_zbuf, _z_zbuf_get_rpos(&ztm->_zbuf) - _Z_MSG_LEN_ENC_SIZE); + _z_zbuf_compact(&ztm->_zbuf); + ret = _Z_ERR_TRANSPORT_NOT_ENOUGH_BYTES; + break; + } } - } - } else { - _z_zbuf_compact(&ztm->_zbuf); - to_read = _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, addr); - if (to_read == SIZE_MAX) { - ret = _Z_ERR_TRANSPORT_RX_FAILED; - } + break; + // Datagram capable links + case Z_LINK_CAP_FLOW_DATAGRAM: + _z_zbuf_compact(&ztm->_zbuf); + to_read = _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, addr); + if (to_read == SIZE_MAX) { + ret = _Z_ERR_TRANSPORT_RX_FAILED; + } + break; + default: + break; } } while (false); // The 1-iteration loop to use continue to break the entire loop on error diff --git a/src/transport/multicast/tx.c b/src/transport/multicast/tx.c index c2f665a19..230c8c85a 100644 --- a/src/transport/multicast/tx.c +++ b/src/transport/multicast/tx.c @@ -50,13 +50,13 @@ int8_t _z_multicast_send_t_msg(_z_transport_multicast_t *ztm, const _z_transport #endif // Z_FEATURE_MULTI_THREAD == 1 // Prepare the buffer eventually reserving space for the message length - __unsafe_z_prepare_wbuf(&ztm->_wbuf, _Z_LINK_IS_STREAMED(ztm->_link._capabilities)); + __unsafe_z_prepare_wbuf(&ztm->_wbuf, ztm->_link._cap._flow); // Encode the session message ret = _z_transport_message_encode(&ztm->_wbuf, t_msg); if (ret == _Z_RES_OK) { // Write the message length in the reserved space if needed - __unsafe_z_finalize_wbuf(&ztm->_wbuf, _Z_LINK_IS_STREAMED(ztm->_link._capabilities)); + __unsafe_z_finalize_wbuf(&ztm->_wbuf, ztm->_link._cap._flow); // Send the wbuf on the socket ret = _z_link_send_wbuf(&ztm->_link, &ztm->_wbuf); if (ret == _Z_RES_OK) { @@ -97,7 +97,7 @@ int8_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_m if (drop == false) { // Prepare the buffer eventually reserving space for the message length - __unsafe_z_prepare_wbuf(&ztm->_wbuf, _Z_LINK_IS_STREAMED(ztm->_link._capabilities)); + __unsafe_z_prepare_wbuf(&ztm->_wbuf, ztm->_link._cap._flow); _z_zint_t sn = __unsafe_z_multicast_get_sn(ztm, reliability); // Get the next sequence number @@ -107,7 +107,7 @@ int8_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_m ret = _z_network_message_encode(&ztm->_wbuf, n_msg); // Encode the network message if (ret == _Z_RES_OK) { // Write the message length in the reserved space if needed - __unsafe_z_finalize_wbuf(&ztm->_wbuf, _Z_LINK_IS_STREAMED(ztm->_link._capabilities)); + __unsafe_z_finalize_wbuf(&ztm->_wbuf, ztm->_link._cap._flow); ret = _z_link_send_wbuf(&ztm->_link, &ztm->_wbuf); // Send the wbuf on the socket if (ret == _Z_RES_OK) { @@ -128,13 +128,13 @@ int8_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_m is_first = false; // Clear the buffer for serialization - __unsafe_z_prepare_wbuf(&ztm->_wbuf, _Z_LINK_IS_STREAMED(ztm->_link._capabilities)); + __unsafe_z_prepare_wbuf(&ztm->_wbuf, ztm->_link._cap._flow); // Serialize one fragment ret = __unsafe_z_serialize_zenoh_fragment(&ztm->_wbuf, &fbf, reliability, sn); if (ret == _Z_RES_OK) { // Write the message length in the reserved space if needed - __unsafe_z_finalize_wbuf(&ztm->_wbuf, _Z_LINK_IS_STREAMED(ztm->_link._capabilities)); + __unsafe_z_finalize_wbuf(&ztm->_wbuf, ztm->_link._cap._flow); ret = _z_link_send_wbuf(&ztm->_link, &ztm->_wbuf); // Send the wbuf on the socket if (ret == _Z_RES_OK) { diff --git a/src/transport/unicast/read.c b/src/transport/unicast/read.c index 4d0728eee..02b95ddef 100644 --- a/src/transport/unicast/read.c +++ b/src/transport/unicast/read.c @@ -68,35 +68,39 @@ void *_zp_unicast_read_task(void *ztu_arg) { while (ztu->_read_task_running == true) { // Read bytes from socket to the main buffer size_t to_read = 0; - if (_Z_LINK_IS_STREAMED(ztu->_link._capabilities) == true) { - if (_z_zbuf_len(&ztu->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { - _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); + switch (ztu->_link._cap._flow) { + case Z_LINK_CAP_FLOW_STREAM: if (_z_zbuf_len(&ztu->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { - _z_zbuf_compact(&ztu->_zbuf); - continue; + _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); + if (_z_zbuf_len(&ztu->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { + _z_zbuf_compact(&ztu->_zbuf); + continue; + } } - } - for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { - to_read |= _z_zbuf_read(&ztu->_zbuf) << (i * (uint8_t)8); - } + for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { + to_read |= _z_zbuf_read(&ztu->_zbuf) << (i * (uint8_t)8); + } - if (_z_zbuf_len(&ztu->_zbuf) < to_read) { - _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); if (_z_zbuf_len(&ztu->_zbuf) < to_read) { - _z_zbuf_set_rpos(&ztu->_zbuf, _z_zbuf_get_rpos(&ztu->_zbuf) - _Z_MSG_LEN_ENC_SIZE); - _z_zbuf_compact(&ztu->_zbuf); + _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); + if (_z_zbuf_len(&ztu->_zbuf) < to_read) { + _z_zbuf_set_rpos(&ztu->_zbuf, _z_zbuf_get_rpos(&ztu->_zbuf) - _Z_MSG_LEN_ENC_SIZE); + _z_zbuf_compact(&ztu->_zbuf); + continue; + } + } + break; + case Z_LINK_CAP_FLOW_DATAGRAM: + _z_zbuf_compact(&ztu->_zbuf); + to_read = _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); + if (to_read == SIZE_MAX) { continue; } - } - } else { - _z_zbuf_compact(&ztu->_zbuf); - to_read = _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); - if (to_read == SIZE_MAX) { - continue; - } + break; + default: + break; } - // Wrap the main buffer for to_read bytes _z_zbuf_t zbuf = _z_zbuf_view(&ztu->_zbuf, to_read); diff --git a/src/transport/unicast/rx.c b/src/transport/unicast/rx.c index 965ec58c1..c1e3faa2d 100644 --- a/src/transport/unicast/rx.c +++ b/src/transport/unicast/rx.c @@ -37,35 +37,40 @@ int8_t _z_unicast_recv_t_msg_na(_z_transport_unicast_t *ztu, _z_transport_messag size_t to_read = 0; do { - if (_Z_LINK_IS_STREAMED(ztu->_link._capabilities) == true) { - if (_z_zbuf_len(&ztu->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { - _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); + switch (ztu->_link._cap._flow) { + // Stream capable links + case Z_LINK_CAP_FLOW_STREAM: if (_z_zbuf_len(&ztu->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { - _z_zbuf_compact(&ztu->_zbuf); - ret = _Z_ERR_TRANSPORT_NOT_ENOUGH_BYTES; - continue; + _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); + if (_z_zbuf_len(&ztu->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { + _z_zbuf_compact(&ztu->_zbuf); + ret = _Z_ERR_TRANSPORT_NOT_ENOUGH_BYTES; + continue; + } + } + for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { + to_read |= _z_zbuf_read(&ztu->_zbuf) << (i * (uint8_t)8); } - } - - for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { - to_read |= _z_zbuf_read(&ztu->_zbuf) << (i * (uint8_t)8); - } - - if (_z_zbuf_len(&ztu->_zbuf) < to_read) { - _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); if (_z_zbuf_len(&ztu->_zbuf) < to_read) { - _z_zbuf_set_rpos(&ztu->_zbuf, _z_zbuf_get_rpos(&ztu->_zbuf) - _Z_MSG_LEN_ENC_SIZE); - _z_zbuf_compact(&ztu->_zbuf); - ret = _Z_ERR_TRANSPORT_NOT_ENOUGH_BYTES; - continue; + _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); + if (_z_zbuf_len(&ztu->_zbuf) < to_read) { + _z_zbuf_set_rpos(&ztu->_zbuf, _z_zbuf_get_rpos(&ztu->_zbuf) - _Z_MSG_LEN_ENC_SIZE); + _z_zbuf_compact(&ztu->_zbuf); + ret = _Z_ERR_TRANSPORT_NOT_ENOUGH_BYTES; + continue; + } } - } - } else { - _z_zbuf_compact(&ztu->_zbuf); - to_read = _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); - if (to_read == SIZE_MAX) { - ret = _Z_ERR_TRANSPORT_RX_FAILED; - } + break; + // Datagram capable links + case Z_LINK_CAP_FLOW_DATAGRAM: + _z_zbuf_compact(&ztu->_zbuf); + to_read = _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); + if (to_read == SIZE_MAX) { + ret = _Z_ERR_TRANSPORT_RX_FAILED; + } + break; + default: + break; } } while (false); // The 1-iteration loop to use continue to break the entire loop on error diff --git a/src/transport/unicast/transport.c b/src/transport/unicast/transport.c index 0aa951b5b..15f660bd8 100644 --- a/src/transport/unicast/transport.c +++ b/src/transport/unicast/transport.c @@ -50,8 +50,18 @@ int8_t _z_unicast_transport_create(_z_transport_t *zt, _z_link_t *zl, _z_transpo // Initialize the read and write buffers if (ret == _Z_RES_OK) { uint16_t mtu = (zl->_mtu < Z_BATCH_UNICAST_SIZE) ? zl->_mtu : Z_BATCH_UNICAST_SIZE; - _Bool expandable = _Z_LINK_IS_STREAMED(zl->_capabilities); size_t dbuf_size = 0; + _Bool expandable = false; + + switch (zl->_cap._flow) { + case Z_LINK_CAP_FLOW_STREAM: + expandable = true; + break; + case Z_LINK_CAP_FLOW_DATAGRAM: + default: + expandable = false; + break; + } #if Z_FEATURE_DYNAMIC_MEMORY_ALLOCATION == 0 expandable = false; diff --git a/src/transport/unicast/tx.c b/src/transport/unicast/tx.c index 6e7ea4f85..21694e1e9 100644 --- a/src/transport/unicast/tx.c +++ b/src/transport/unicast/tx.c @@ -53,13 +53,13 @@ int8_t _z_unicast_send_t_msg(_z_transport_unicast_t *ztu, const _z_transport_mes #endif // Z_FEATURE_MULTI_THREAD == 1 // Prepare the buffer eventually reserving space for the message length - __unsafe_z_prepare_wbuf(&ztu->_wbuf, _Z_LINK_IS_STREAMED(ztu->_link._capabilities)); + __unsafe_z_prepare_wbuf(&ztu->_wbuf, ztu->_link._cap._flow); // Encode the session message ret = _z_transport_message_encode(&ztu->_wbuf, t_msg); if (ret == _Z_RES_OK) { // Write the message length in the reserved space if needed - __unsafe_z_finalize_wbuf(&ztu->_wbuf, _Z_LINK_IS_STREAMED(ztu->_link._capabilities)); + __unsafe_z_finalize_wbuf(&ztu->_wbuf, ztu->_link._cap._flow); // Send the wbuf on the socket ret = _z_link_send_wbuf(&ztu->_link, &ztu->_wbuf); if (ret == _Z_RES_OK) { @@ -100,7 +100,7 @@ int8_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg if (drop == false) { // Prepare the buffer eventually reserving space for the message length - __unsafe_z_prepare_wbuf(&ztu->_wbuf, _Z_LINK_IS_STREAMED(ztu->_link._capabilities)); + __unsafe_z_prepare_wbuf(&ztu->_wbuf, ztu->_link._cap._flow); _z_zint_t sn = __unsafe_z_unicast_get_sn(ztu, reliability); // Get the next sequence number @@ -110,7 +110,7 @@ int8_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg ret = _z_network_message_encode(&ztu->_wbuf, n_msg); // Encode the network message if (ret == _Z_RES_OK) { // Write the message length in the reserved space if needed - __unsafe_z_finalize_wbuf(&ztu->_wbuf, _Z_LINK_IS_STREAMED(ztu->_link._capabilities)); + __unsafe_z_finalize_wbuf(&ztu->_wbuf, ztu->_link._cap._flow); if (ztu->_wbuf._ioss._len == 1) { ret = _z_link_send_wbuf(&ztu->_link, &ztu->_wbuf); // Send the wbuf on the socket @@ -137,13 +137,13 @@ int8_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg is_first = false; // Clear the buffer for serialization - __unsafe_z_prepare_wbuf(&ztu->_wbuf, _Z_LINK_IS_STREAMED(ztu->_link._capabilities)); + __unsafe_z_prepare_wbuf(&ztu->_wbuf, ztu->_link._cap._flow); // Serialize one fragment ret = __unsafe_z_serialize_zenoh_fragment(&ztu->_wbuf, &fbf, reliability, sn); if (ret == _Z_RES_OK) { // Write the message length in the reserved space if needed - __unsafe_z_finalize_wbuf(&ztu->_wbuf, _Z_LINK_IS_STREAMED(ztu->_link._capabilities)); + __unsafe_z_finalize_wbuf(&ztu->_wbuf, ztu->_link._cap._flow); ret = _z_link_send_wbuf(&ztu->_link, &ztu->_wbuf); // Send the wbuf on the socket if (ret == _Z_RES_OK) {