diff --git a/include/zenoh-pico/system/link/raweth.h b/include/zenoh-pico/system/link/raweth.h index d2009144e..e4360a5df 100644 --- a/include/zenoh-pico/system/link/raweth.h +++ b/include/zenoh-pico/system/link/raweth.h @@ -36,6 +36,7 @@ typedef struct { uint8_t dmac[_ZP_MAC_ADDR_LENGTH]; // Destination mac address uint8_t smac[_ZP_MAC_ADDR_LENGTH]; // Source mac address uint16_t ethtype; // Ethertype of frame + uint16_t data_length; // Payload length } _zp_eth_header_t; typedef struct { @@ -44,6 +45,7 @@ typedef struct { uint16_t vlan_type; // Vlan ethtype uint16_t tag; // Vlan tag uint16_t ethtype; // Ethertype of frame + uint16_t data_length; // Payload length } _zp_eth_vlan_header_t; typedef struct { @@ -61,6 +63,8 @@ int8_t _z_open_raweth(_z_sys_net_socket_t *sock, const char *interface); size_t _z_send_raweth(const _z_sys_net_socket_t *sock, const void *buff, size_t buff_len); size_t _z_receive_raweth(const _z_sys_net_socket_t *sock, void *buff, size_t buff_len, _z_bytes_t *addr); int8_t _z_close_raweth(_z_sys_net_socket_t *sock); +size_t _z_raweth_ntohs(size_t val); +size_t _z_raweth_htons(size_t val); #endif diff --git a/src/system/unix/link/raweth.c b/src/system/unix/link/raweth.c index c64376594..78a8aee92 100644 --- a/src/system/unix/link/raweth.c +++ b/src/system/unix/link/raweth.c @@ -116,5 +116,9 @@ size_t _z_receive_raweth(const _z_sys_net_socket_t *sock, void *buff, size_t buf return bytesRead; } +size_t _z_raweth_ntohs(size_t val) { return ntohs(val); } + +size_t _z_raweth_htons(size_t val) { return htons(val); } + #endif // defined(__linux) #endif // Z_FEATURE_RAWETH_TRANSPORT == 1 diff --git a/src/transport/raweth/rx.c b/src/transport/raweth/rx.c index 1709ed67f..6deec0839 100644 --- a/src/transport/raweth/rx.c +++ b/src/transport/raweth/rx.c @@ -27,10 +27,6 @@ #if Z_FEATURE_RAWETH_TRANSPORT == 1 -void print_buf(_z_zbuf_t *buf) { - printf("Buff info: %ld, %ld, %ld\n", buf->_ios._r_pos, buf->_ios._w_pos, buf->_ios._capacity); -} - static size_t _z_raweth_link_recv_zbuf(const _z_link_t *link, _z_zbuf_t *zbf, _z_bytes_t *addr) { uint8_t *buff = _z_zbuf_get_wptr(zbf); size_t rb = _z_receive_raweth(&link->_socket._raweth._sock, buff, _z_zbuf_space_left(zbf), addr); @@ -48,14 +44,31 @@ static size_t _z_raweth_link_recv_zbuf(const _z_link_t *link, _z_zbuf_t *zbf, _z if (has_vlan && (rb < sizeof(_zp_eth_vlan_header_t))) { return SIZE_MAX; } - // Update buffer but skip eth header - _z_zbuf_set_wpos(zbf, _z_zbuf_get_wpos(zbf) + rb); + size_t data_length = 0; if (has_vlan) { + _zp_eth_vlan_header_t *header = (_zp_eth_vlan_header_t *)buff; + // Retrieve data length + data_length = _z_raweth_ntohs(header->data_length); + if (rb < (data_length + sizeof(_zp_eth_vlan_header_t))) { + // Invalid data_length + return SIZE_MAX; + } + // Skip header + _z_zbuf_set_wpos(zbf, _z_zbuf_get_wpos(zbf) + sizeof(_zp_eth_vlan_header_t) + data_length); _z_zbuf_set_rpos(zbf, _z_zbuf_get_rpos(zbf) + sizeof(_zp_eth_vlan_header_t)); } else { + _zp_eth_header_t *header = (_zp_eth_header_t *)buff; + // Retrieve data length + data_length = _z_raweth_ntohs(header->data_length); + if (rb < (data_length + sizeof(_zp_eth_header_t))) { + // Invalid data_length + return SIZE_MAX; + } + // Skip header + _z_zbuf_set_wpos(zbf, _z_zbuf_get_wpos(zbf) + sizeof(_zp_eth_header_t) + data_length); _z_zbuf_set_rpos(zbf, _z_zbuf_get_rpos(zbf) + sizeof(_zp_eth_header_t)); } - return rb; + return data_length; } /*------------------ Reception helper ------------------*/ diff --git a/src/transport/raweth/tx.c b/src/transport/raweth/tx.c index e48f7640b..4fb0b643d 100644 --- a/src/transport/raweth/tx.c +++ b/src/transport/raweth/tx.c @@ -85,6 +85,16 @@ static _z_zint_t __unsafe_z_raweth_get_sn(_z_transport_multicast_t *ztm, z_relia return sn; } +static void __unsafe_z_raweth_prepare_header(_z_link_t *zl, _z_wbuf_t *wbf) { + _z_raweth_socket_t *resocket = &zl->_socket._raweth; + // Reserve eth header in buffer + if (resocket->_has_vlan) { + _z_wbuf_set_wpos(wbf, sizeof(_zp_eth_vlan_header_t)); + } else { + _z_wbuf_set_wpos(wbf, sizeof(_zp_eth_header_t)); + } +} + /** * This function is unsafe because it operates in potentially concurrent data. * Make sure that the following mutexes are locked before calling this function: @@ -92,26 +102,38 @@ static _z_zint_t __unsafe_z_raweth_get_sn(_z_transport_multicast_t *ztm, z_relia */ static int8_t __unsafe_z_raweth_write_header(_z_link_t *zl, _z_wbuf_t *wbf) { _z_raweth_socket_t *resocket = &zl->_socket._raweth; - + size_t wpos = 0; // Write eth header in buffer if (resocket->_has_vlan) { _zp_eth_vlan_header_t header; + // Save buf position + wpos = _z_wbuf_len(wbf); + _z_wbuf_set_wpos(wbf, 0); + // Set header memset(&header, 0, sizeof(header)); memcpy(&header.dmac, &resocket->_dmac, _ZP_MAC_ADDR_LENGTH); memcpy(&header.smac, &resocket->_smac, _ZP_MAC_ADDR_LENGTH); header.vlan_type = _ZP_ETH_TYPE_VLAN; header.tag = resocket->_vlan; header.ethtype = _ZP_RAWETH_CFG_ETHTYPE; + header.data_length = _z_raweth_htons(wpos - sizeof(header)); // Write header _Z_RETURN_IF_ERR(_z_wbuf_write_bytes(wbf, (uint8_t *)&header, 0, sizeof(header))); } else { _zp_eth_header_t header; + // Save buf position + wpos = _z_wbuf_len(wbf); + _z_wbuf_set_wpos(wbf, 0); + // Set header memcpy(&header.dmac, &resocket->_dmac, _ZP_MAC_ADDR_LENGTH); memcpy(&header.smac, &resocket->_smac, _ZP_MAC_ADDR_LENGTH); header.ethtype = _ZP_RAWETH_CFG_ETHTYPE; + header.data_length = _z_raweth_htons(wpos - sizeof(header)); // Write header _Z_RETURN_IF_ERR(_z_wbuf_write_bytes(wbf, (uint8_t *)&header, 0, sizeof(header))); } + // Restore wpos + _z_wbuf_set_wpos(wbf, wpos); return _Z_RES_OK; } @@ -141,29 +163,17 @@ int8_t _z_raweth_link_send_t_msg(const _z_link_t *zl, const _z_transport_message uint16_t mtu = (zl->_mtu < Z_BATCH_UNICAST_SIZE) ? zl->_mtu : Z_BATCH_UNICAST_SIZE; _z_wbuf_t wbf = _z_wbuf_make(mtu, false); - switch (zl->_cap._flow) { - case Z_LINK_CAP_FLOW_DATAGRAM: - break; - default: - ret = _Z_ERR_GENERIC; - break; - } // Discard const qualifier _z_link_t *mzl = (_z_link_t *)zl; // Set socket info _Z_RETURN_IF_ERR(_zp_raweth_set_socket(NULL, &mzl->_socket._raweth)); - // Write the message header - _Z_RETURN_IF_ERR(__unsafe_z_raweth_write_header(mzl, &wbf)); + // Prepare buff + __unsafe_z_raweth_prepare_header(mzl, &wbf); // Encode the session message ret = _z_transport_message_encode(&wbf, t_msg); if (ret == _Z_RES_OK) { - switch (zl->_cap._flow) { - case Z_LINK_CAP_FLOW_DATAGRAM: - break; - default: - ret = _Z_ERR_GENERIC; - break; - } + // Write the message header + _Z_RETURN_IF_ERR(__unsafe_z_raweth_write_header(mzl, &wbf)); // Send the wbuf on the socket ret = _z_raweth_link_send_wbuf(zl, &wbf); } @@ -183,10 +193,12 @@ int8_t _z_raweth_send_t_msg(_z_transport_multicast_t *ztm, const _z_transport_me _z_wbuf_reset(&ztm->_wbuf); // Set socket info _Z_RETURN_IF_ERR(_zp_raweth_set_socket(NULL, &ztm->_link._socket._raweth)); - // Write the message header - _Z_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf)); + // Prepare buff + __unsafe_z_raweth_prepare_header(&ztm->_link, &ztm->_wbuf); // Encode the session message _Z_RETURN_IF_ERR(_z_transport_message_encode(&ztm->_wbuf, t_msg)); + // Write the message header + _Z_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf)); // Send the wbuf on the socket _Z_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf)); // Mark the session that we have transmitted data @@ -240,8 +252,8 @@ int8_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg, _z_wbuf_reset(&ztm->_wbuf); // Set socket info _Z_RETURN_IF_ERR(_zp_raweth_set_socket(keyexpr, &ztm->_link._socket._raweth)); - // Write the eth header - _Z_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf)); + // Prepare buff + __unsafe_z_raweth_prepare_header(&ztm->_link, &ztm->_wbuf); // Set the frame header _z_zint_t sn = __unsafe_z_raweth_get_sn(ztm, reliability); _z_transport_message_t t_msg = _z_t_msg_make_frame_header(sn, reliability); @@ -250,6 +262,8 @@ int8_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg, // Encode the network message ret = _z_network_message_encode(&ztm->_wbuf, n_msg); if (ret == _Z_RES_OK) { + // Write the eth header + _Z_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf)); // Send the wbuf on the socket _Z_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf)); // Mark the session that we have transmitted data @@ -269,10 +283,12 @@ int8_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg, is_first = false; // Reset wbuf _z_wbuf_reset(&ztm->_wbuf); - // Write the eth header - _Z_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf)); + // Prepare buff + __unsafe_z_raweth_prepare_header(&ztm->_link, &ztm->_wbuf); // Serialize one fragment _Z_RETURN_IF_ERR(__unsafe_z_serialize_zenoh_fragment(&ztm->_wbuf, &fbf, reliability, sn)); + // Write the eth header + _Z_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf)); // Send the wbuf on the socket _Z_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf)); // Mark the session that we have transmitted data