diff --git a/CMakeLists.txt b/CMakeLists.txt index ac9c77c5d..73db9fa37 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -186,7 +186,7 @@ elseif(WITH_FREERTOS_PLUS_TCP) file (GLOB Sources_Freertos_Plus_TCP "src/system/freertos_plus_tcp/*.c") list(APPEND Sources ${Sources_Freertos_Plus_TCP}) elseif(CMAKE_SYSTEM_NAME MATCHES "Linux" OR CMAKE_SYSTEM_NAME MATCHES "Darwin" OR CMAKE_SYSTEM_NAME MATCHES "BSD" OR POSIX_COMPATIBLE) - file (GLOB Sources_Unix "src/system/unix/*.c") + file (GLOB Sources_Unix "src/system/unix/*.c" "src/system/unix/link/*.c") list(APPEND Sources ${Sources_Unix}) elseif(CMAKE_SYSTEM_NAME MATCHES "Emscripten") file (GLOB Sources_Emscripten "src/system/emscripten/*.c") diff --git a/include/zenoh-pico/config.h b/include/zenoh-pico/config.h index fc7fe5f97..c1bb5ad83 100644 --- a/include/zenoh-pico/config.h +++ b/include/zenoh-pico/config.h @@ -218,6 +218,13 @@ #endif #endif +/** + * Enable raweth transport/link. + */ +#ifndef Z_FEATURE_RAWETH_TRANSPORT +#define Z_FEATURE_RAWETH_TRANSPORT 0 +#endif + /*------------------ Compile-time configuration properties ------------------*/ /** * Default length for Zenoh ID. Maximum size is 16 bytes. diff --git a/include/zenoh-pico/link/config/raweth.h b/include/zenoh-pico/link/config/raweth.h new file mode 100644 index 000000000..6682805ad --- /dev/null +++ b/include/zenoh-pico/link/config/raweth.h @@ -0,0 +1,32 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#ifndef ZENOH_PICO_LINK_CONFIG_RAWETH_H +#define ZENOH_PICO_LINK_CONFIG_RAWETH_H + +#include "zenoh-pico/collections/intmap.h" +#include "zenoh-pico/collections/string.h" +#include "zenoh-pico/config.h" +#include "zenoh-pico/link/link.h" + +#if Z_FEATURE_RAWETH_TRANSPORT == 1 + +#define RAWETH_SCHEMA "reth" + +int8_t _z_endpoint_raweth_valid(_z_endpoint_t *endpoint); + +int8_t _z_new_link_raweth(_z_link_t *zl, _z_endpoint_t endpoint); + +#endif /* Z_FEATURE_RAWETH_TRANSPORT */ +#endif /* ZENOH_PICO_LINK_CONFIG_RAWETH_H */ diff --git a/include/zenoh-pico/system/link/raweth.h b/include/zenoh-pico/system/link/raweth.h new file mode 100644 index 000000000..d2009144e --- /dev/null +++ b/include/zenoh-pico/system/link/raweth.h @@ -0,0 +1,67 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#ifndef ZENOH_PICO_SYSTEM_LINK_RAWETH_H +#define ZENOH_PICO_SYSTEM_LINK_RAWETH_H + +#include + +#include "zenoh-pico/collections/string.h" +#include "zenoh-pico/system/platform.h" + +#if Z_FEATURE_RAWETH_TRANSPORT == 1 + +// Ethernet types (big endian) +#define _ZP_ETH_TYPE_VLAN 0x0081 + +// Address Sizes +#define _ZP_MAC_ADDR_LENGTH 6 + +// Max frame size +#define _ZP_MAX_ETH_FRAME_SIZE 1500 + +// Ethernet header structure type +typedef struct { + uint8_t dmac[_ZP_MAC_ADDR_LENGTH]; // Destination mac address + uint8_t smac[_ZP_MAC_ADDR_LENGTH]; // Source mac address + uint16_t ethtype; // Ethertype of frame +} _zp_eth_header_t; + +typedef struct { + uint8_t dmac[_ZP_MAC_ADDR_LENGTH]; // Destination mac address + uint8_t smac[_ZP_MAC_ADDR_LENGTH]; // Source mac address + uint16_t vlan_type; // Vlan ethtype + uint16_t tag; // Vlan tag + uint16_t ethtype; // Ethertype of frame +} _zp_eth_vlan_header_t; + +typedef struct { + const char *_interface; + _z_sys_net_socket_t _sock; + uint16_t _vlan; + uint16_t ethtype; + uint8_t _dmac[_ZP_MAC_ADDR_LENGTH]; + uint8_t _smac[_ZP_MAC_ADDR_LENGTH]; + _Bool _has_vlan; +} _z_raweth_socket_t; + +int8_t _z_get_smac_raweth(_z_raweth_socket_t *resock); +int8_t _z_open_raweth(_z_sys_net_socket_t *sock, const char *interface); +size_t _z_send_raweth(const _z_sys_net_socket_t *sock, const void *buff, size_t buff_len); +size_t _z_receive_raweth(const _z_sys_net_socket_t *sock, void *buff, size_t buff_len, _z_bytes_t *addr); +int8_t _z_close_raweth(_z_sys_net_socket_t *sock); + +#endif + +#endif /* ZENOH_PICO_SYSTEM_LINK_RAWETH_H */ diff --git a/include/zenoh-pico/transport/multicast/read.h b/include/zenoh-pico/transport/multicast/read.h index b2d53f770..308fe1249 100644 --- a/include/zenoh-pico/transport/multicast/read.h +++ b/include/zenoh-pico/transport/multicast/read.h @@ -22,4 +22,4 @@ int8_t _zp_multicast_start_read_task(_z_transport_t *zt, _z_task_attr_t *attr, _ int8_t _zp_multicast_stop_read_task(_z_transport_t *zt); void *_zp_multicast_read_task(void *ztm_arg); // The argument is void* to avoid incompatible pointer types in tasks -#endif /* ZENOH_PICO_TRANSPORT_LINK_TASK_READ_H */ +#endif /* ZENOH_PICO_MULTICAST_READ_H */ diff --git a/include/zenoh-pico/transport/multicast/tx.h b/include/zenoh-pico/transport/multicast/tx.h index d8171b719..6b48266db 100644 --- a/include/zenoh-pico/transport/multicast/tx.h +++ b/include/zenoh-pico/transport/multicast/tx.h @@ -18,8 +18,6 @@ #include "zenoh-pico/net/session.h" #include "zenoh-pico/transport/transport.h" -int8_t _z_multicast_send_z_msg(_z_session_t *zn, _z_zenoh_message_t *z_msg, z_reliability_t reliability, - z_congestion_control_t cong_ctrl); int8_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *z_msg, z_reliability_t reliability, z_congestion_control_t cong_ctrl); int8_t _z_multicast_send_t_msg(_z_transport_multicast_t *ztm, const _z_transport_message_t *t_msg); diff --git a/include/zenoh-pico/transport/raweth/config.h b/include/zenoh-pico/transport/raweth/config.h new file mode 100644 index 000000000..c8d644747 --- /dev/null +++ b/include/zenoh-pico/transport/raweth/config.h @@ -0,0 +1,58 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#ifndef ZENOH_PICO_RAWETH_CONFIG_H +#define ZENOH_PICO_RAWETH_CONFIG_H + +#include + +#include "zenoh-pico/protocol/core.h" +#include "zenoh-pico/system/link/raweth.h" +#include "zenoh-pico/transport/transport.h" +#include "zenoh-pico/utils/result.h" + +#if Z_FEATURE_RAWETH_TRANSPORT == 1 + +typedef struct { + _z_keyexpr_t _keyexpr; + uint16_t _vlan; // vlan tag (pcp + dei + id), big endian + uint8_t _dmac[_ZP_MAC_ADDR_LENGTH]; + _Bool _has_vlan; +} _zp_raweth_cfg_entry; + +typedef struct { + uint8_t _mac[_ZP_MAC_ADDR_LENGTH]; +} _zp_raweth_cfg_whitelist_val; + +// Ethertype to use in frame +extern const uint16_t _ZP_RAWETH_CFG_ETHTYPE; + +// Interface to use +extern const char *_ZP_RAWETH_CFG_INTERFACE; + +// Source mac address +extern const uint8_t _ZP_RAWETH_CFG_SMAC[_ZP_MAC_ADDR_LENGTH]; + +// Main config array +extern const _zp_raweth_cfg_entry _ZP_RAWETH_CFG_ARRAY[]; + +// Mac address rx whitelist array +extern const _zp_raweth_cfg_whitelist_val _ZP_RAWETH_CFG_WHITELIST[]; + +// Array size +extern const size_t _ZP_RAWETH_CFG_SIZE; +extern const size_t _ZP_RAWETH_CFG_WHITELIST_SIZE; + +#endif // Z_FEATURE_RAWETH_TRANSPORT == 1 +#endif // ZENOH_PICO_RAWETH_CONFIG_H diff --git a/include/zenoh-pico/transport/raweth/join.h b/include/zenoh-pico/transport/raweth/join.h new file mode 100644 index 000000000..da8f875d9 --- /dev/null +++ b/include/zenoh-pico/transport/raweth/join.h @@ -0,0 +1,22 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#ifndef ZENOH_RAWETH_JOIN_H +#define ZENOH_RAWETH_JOIN_H + +#include "zenoh-pico/transport/transport.h" + +int8_t _zp_raweth_send_join(_z_transport_multicast_t *ztm); + +#endif /* ZENOH_RAWETH_JOIN_H */ diff --git a/include/zenoh-pico/transport/raweth/lease.h b/include/zenoh-pico/transport/raweth/lease.h new file mode 100644 index 000000000..924fd38a0 --- /dev/null +++ b/include/zenoh-pico/transport/raweth/lease.h @@ -0,0 +1,25 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#ifndef ZENOH_PICO_RAWETH_LEASE_H +#define ZENOH_PICO_RAWETH_LEASE_H + +#include "zenoh-pico/transport/transport.h" + +int8_t _zp_raweth_send_keep_alive(_z_transport_multicast_t *ztm); +int8_t _zp_raweth_start_lease_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task); +int8_t _zp_raweth_stop_lease_task(_z_transport_t *zt); +void *_zp_raweth_lease_task(void *ztm_arg); // The argument is void* to avoid incompatible pointer types in tasks + +#endif /* ZENOH_PICO_RAWETH_LEASE_H */ diff --git a/include/zenoh-pico/transport/raweth/read.h b/include/zenoh-pico/transport/raweth/read.h new file mode 100644 index 000000000..fa881b4ba --- /dev/null +++ b/include/zenoh-pico/transport/raweth/read.h @@ -0,0 +1,25 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#ifndef ZENOH_PICO_RAWETH_READ_H +#define ZENOH_PICO_RAWETH_READ_H + +#include "zenoh-pico/transport/transport.h" + +int8_t _zp_raweth_read(_z_transport_multicast_t *ztm); +int8_t _zp_raweth_start_read_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task); +int8_t _zp_raweth_stop_read_task(_z_transport_t *zt); +void *_zp_raweth_read_task(void *ztm_arg); // The argument is void* to avoid incompatible pointer types in tasks + +#endif /* ZENOH_PICO_RAWETH_READ_H */ diff --git a/include/zenoh-pico/transport/raweth/rx.h b/include/zenoh-pico/transport/raweth/rx.h new file mode 100644 index 000000000..a8f281369 --- /dev/null +++ b/include/zenoh-pico/transport/raweth/rx.h @@ -0,0 +1,23 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#ifndef ZENOH_PICO_RAWETH_RX_H +#define ZENOH_PICO_RAWETH_RX_H + +#include "zenoh-pico/transport/transport.h" + +int8_t _z_raweth_recv_t_msg(_z_transport_multicast_t *ztm, _z_transport_message_t *t_msg, _z_bytes_t *addr); +int8_t _z_raweth_recv_t_msg_na(_z_transport_multicast_t *ztm, _z_transport_message_t *t_msg, _z_bytes_t *addr); + +#endif /* ZENOH_PICO_RAWETH_RX_H */ diff --git a/include/zenoh-pico/transport/raweth/transport.h b/include/zenoh-pico/transport/raweth/transport.h new file mode 100644 index 000000000..a91eba18a --- /dev/null +++ b/include/zenoh-pico/transport/raweth/transport.h @@ -0,0 +1,28 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#ifndef ZENOH_PICO_RAWETH_TRANSPORT_H +#define ZENOH_PICO_RAWETH_TRANSPORT_H + +#include "zenoh-pico/api/types.h" + +int8_t _z_raweth_transport_create(_z_transport_t *zt, _z_link_t *zl, _z_transport_multicast_establish_param_t *param); +int8_t _z_raweth_open_peer(_z_transport_multicast_establish_param_t *param, const _z_link_t *zl, + const _z_id_t *local_zid); +int8_t _z_raweth_open_client(_z_transport_multicast_establish_param_t *param, const _z_link_t *zl, + const _z_id_t *local_zid); +int8_t _z_raweth_send_close(_z_transport_multicast_t *ztm, uint8_t reason, _Bool link_only); +int8_t _z_raweth_transport_close(_z_transport_multicast_t *ztm, uint8_t reason); +void _z_raweth_transport_clear(_z_transport_t *zt); +#endif /* ZENOH_PICO_RAWETH_TRANSPORT_H */ diff --git a/include/zenoh-pico/transport/raweth/tx.h b/include/zenoh-pico/transport/raweth/tx.h new file mode 100644 index 000000000..8146e38e4 --- /dev/null +++ b/include/zenoh-pico/transport/raweth/tx.h @@ -0,0 +1,26 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#ifndef ZENOH_PICO_RAWETH_TX_H +#define ZENOH_PICO_RAWETH_TX_H + +#include "zenoh-pico/net/session.h" +#include "zenoh-pico/transport/transport.h" + +int8_t _z_raweth_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t *t_msg); +int8_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *z_msg, z_reliability_t reliability, + z_congestion_control_t cong_ctrl); +int8_t _z_raweth_send_t_msg(_z_transport_multicast_t *ztm, const _z_transport_message_t *t_msg); + +#endif /* ZENOH_PICO_RAWETH_TX_H */ diff --git a/include/zenoh-pico/transport/unicast/tx.h b/include/zenoh-pico/transport/unicast/tx.h index ed42e0c73..d087dd696 100644 --- a/include/zenoh-pico/transport/unicast/tx.h +++ b/include/zenoh-pico/transport/unicast/tx.h @@ -18,8 +18,6 @@ #include "zenoh-pico/net/session.h" #include "zenoh-pico/transport/transport.h" -int8_t _z_unicast_send_z_msg(_z_session_t *zn, _z_zenoh_message_t *z_msg, z_reliability_t reliability, - z_congestion_control_t cong_ctrl); int8_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *z_msg, z_reliability_t reliability, z_congestion_control_t cong_ctrl); int8_t _z_unicast_send_t_msg(_z_transport_unicast_t *ztu, const _z_transport_message_t *t_msg); diff --git a/include/zenoh-pico/utils/result.h b/include/zenoh-pico/utils/result.h index bae8d7b17..514473947 100644 --- a/include/zenoh-pico/utils/result.h +++ b/include/zenoh-pico/utils/result.h @@ -16,6 +16,7 @@ #define ZENOH_PICO_UTILS_RESULT_H #define _ZP_UNUSED(x) (void)(x) +#define _ZP_ARRAY_SIZE(array) (sizeof(array) / sizeof(array[0])) #define _Z_ERR_MESSAGE_MASK 0x88 #define _Z_ERR_ENTITY_MASK 0x90 diff --git a/src/system/arduino/esp32/network.cpp b/src/system/arduino/esp32/network.cpp index c02557fac..c088f1ce1 100644 --- a/src/system/arduino/esp32/network.cpp +++ b/src/system/arduino/esp32/network.cpp @@ -858,4 +858,8 @@ size_t _z_send_serial(const _z_sys_net_socket_t sock, const uint8_t *ptr, size_t } #endif +#if Z_FEATURE_RAWETH_TRANSPORT == 1 +#error "Raw ethernet transport not supported yet on ESP32 port of Zenoh-Pico" +#endif + } // extern "C" diff --git a/src/system/arduino/opencr/network.cpp b/src/system/arduino/opencr/network.cpp index bdbb9939a..b99046729 100644 --- a/src/system/arduino/opencr/network.cpp +++ b/src/system/arduino/opencr/network.cpp @@ -347,4 +347,8 @@ size_t _z_send_udp_multicast(const _z_sys_net_socket_t sock, const uint8_t *ptr, #if Z_FEATURE_LINK_SERIAL == 1 #error "Serial not supported yet on OpenCR port of Zenoh-Pico" #endif + +#if Z_FEATURE_RAWETH_TRANSPORT == 1 +#error "Raw ethernet transport not supported yet on OpenCR port of Zenoh-Pico" +#endif } diff --git a/src/system/emscripten/network.c b/src/system/emscripten/network.c index 8b2f14caf..95c0d2f09 100644 --- a/src/system/emscripten/network.c +++ b/src/system/emscripten/network.c @@ -173,3 +173,7 @@ size_t _z_send_ws(const _z_sys_net_socket_t sock, const uint8_t *ptr, size_t len #if Z_FEATURE_LINK_SERIAL == 1 #error "Serial not supported yet on Emscripten port of Zenoh-Pico" #endif + +#if Z_FEATURE_RAWETH_TRANSPORT == 1 +#error "Raw ethernet transport not supported yet on Emscripten port of Zenoh-Pico" +#endif diff --git a/src/system/espidf/network.c b/src/system/espidf/network.c index 9a49bf157..ad40e5078 100644 --- a/src/system/espidf/network.c +++ b/src/system/espidf/network.c @@ -739,3 +739,7 @@ size_t _z_send_serial(const _z_sys_net_socket_t sock, const uint8_t *ptr, size_t #if Z_FEATURE_LINK_BLUETOOTH == 1 #error "Bluetooth not supported yet on ESP-IDF port of Zenoh-Pico" #endif + +#if Z_FEATURE_RAWETH_TRANSPORT == 1 +#error "Raw ethernet transport not supported yet on ESP-IDF port of Zenoh-Pico" +#endif diff --git a/src/system/freertos_plus_tcp/network.c b/src/system/freertos_plus_tcp/network.c index 60d717ddd..57057c58b 100644 --- a/src/system/freertos_plus_tcp/network.c +++ b/src/system/freertos_plus_tcp/network.c @@ -209,3 +209,15 @@ size_t _z_send_udp_unicast(const _z_sys_net_socket_t sock, const uint8_t *ptr, s #if Z_FEATURE_LINK_UDP_MULTICAST == 1 #error "UDP Multicast not supported yet on FreeRTOS-Plus-TCP port of Zenoh-Pico" #endif + +#if Z_FEATURE_LINK_BLUETOOTH == 1 +#error "Bluetooth not supported yet on FreeRTOS-Plus-TCP port of Zenoh-Pico" +#endif + +#if Z_FEATURE_LINK_SERIAL == 1 +#error "Serial not supported yet on FreeRTOS-Plus-TCP port of Zenoh-Pico" +#endif + +#if Z_FEATURE_RAWETH_TRANSPORT == 1 +#error "Raw ethernet transport not supported yet on FreeRTOS-Plus-TCP port of Zenoh-Pico" +#endif diff --git a/src/system/mbed/network.cpp b/src/system/mbed/network.cpp index 744b92df0..72cd3adc3 100644 --- a/src/system/mbed/network.cpp +++ b/src/system/mbed/network.cpp @@ -483,4 +483,8 @@ size_t _z_send_serial(const _z_sys_net_socket_t sock, const uint8_t *ptr, size_t #error "Bluetooth not supported yet on MBED port of Zenoh-Pico" #endif -} // extern "C" \ No newline at end of file +#if Z_FEATURE_RAWETH_TRANSPORT == 1 +#error "Raw ethernet transport not supported yet on MBED port of Zenoh-Pico" +#endif + +} // extern "C" diff --git a/src/system/unix/link/raweth.c b/src/system/unix/link/raweth.c new file mode 100644 index 000000000..c64376594 --- /dev/null +++ b/src/system/unix/link/raweth.c @@ -0,0 +1,120 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#include "zenoh-pico/system/link/raweth.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "zenoh-pico/collections/string.h" +#include "zenoh-pico/config.h" +#include "zenoh-pico/system/platform/unix.h" +#include "zenoh-pico/transport/raweth/config.h" +#include "zenoh-pico/utils/logging.h" +#include "zenoh-pico/utils/pointers.h" + +#if Z_FEATURE_RAWETH_TRANSPORT == 1 + +#if !defined(__linux) +#error "Raweth transport only supported on linux systems" +#else +#include + +int8_t _z_open_raweth(_z_sys_net_socket_t *sock, const char *interface) { + int8_t ret = _Z_RES_OK; + // Open a raw network socket in promiscuous mode + sock->_fd = socket(AF_PACKET, SOCK_RAW, htons(ETH_P_ALL)); + if (sock->_fd == -1) { + return _Z_ERR_GENERIC; + } + // Get the index of the interface to send on + struct ifreq if_idx; + memset(&if_idx, 0, sizeof(struct ifreq)); + strncpy(if_idx.ifr_name, interface, strlen(interface)); + if (ioctl(sock->_fd, SIOCGIFINDEX, &if_idx) < 0) { + return _Z_ERR_GENERIC; + } + // Bind the socket + struct sockaddr_ll addr; + memset(&addr, 0, sizeof(addr)); + addr.sll_family = AF_PACKET; + addr.sll_protocol = htons(ETH_P_ALL); + addr.sll_ifindex = if_idx.ifr_ifindex; + addr.sll_pkttype = PACKET_HOST | PACKET_BROADCAST | PACKET_MULTICAST; + + if (bind(sock->_fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) { + close(sock->_fd); + ret = _Z_ERR_GENERIC; + } + return ret; +} + +int8_t _z_close_raweth(_z_sys_net_socket_t *sock) { + int8_t ret = _Z_RES_OK; + if (close(sock->_fd) != 0) { + ret = _Z_ERR_GENERIC; + } + return ret; +} + +size_t _z_send_raweth(const _z_sys_net_socket_t *sock, const void *buff, size_t buff_len) { + // Send data + ssize_t wb = write(sock->_fd, buff, buff_len); + if (wb < 0) { + return SIZE_MAX; + } + return (size_t)wb; +} + +size_t _z_receive_raweth(const _z_sys_net_socket_t *sock, void *buff, size_t buff_len, _z_bytes_t *addr) { + // Read from socket + ssize_t bytesRead = recvfrom(sock->_fd, buff, buff_len, 0, NULL, NULL); + if ((bytesRead < 0) || (bytesRead < sizeof(_zp_eth_header_t))) { + return SIZE_MAX; + } + // Address filtering + _zp_eth_header_t *header = (_zp_eth_header_t *)buff; + _Bool is_valid = false; + for (size_t i = 0; i < _ZP_RAWETH_CFG_WHITELIST_SIZE; i++) { + if (memcmp(&header->smac, _ZP_RAWETH_CFG_WHITELIST[i]._mac, _ZP_MAC_ADDR_LENGTH) == 0) { // Test byte ordering + is_valid = true; + break; + } + } + // Ignore packet from unknown sources + if (!is_valid) { + return SIZE_MAX; + } + // Copy sender mac if needed + if (addr != NULL) { + *addr = _z_bytes_make(sizeof(ETH_ALEN)); + (void)memcpy((uint8_t *)addr->start, (buff + ETH_ALEN), sizeof(ETH_ALEN)); + } + return bytesRead; +} + +#endif // defined(__linux) +#endif // Z_FEATURE_RAWETH_TRANSPORT == 1 diff --git a/src/system/windows/network.c b/src/system/windows/network.c index 76d37027c..833e43a97 100644 --- a/src/system/windows/network.c +++ b/src/system/windows/network.c @@ -610,3 +610,7 @@ size_t _z_send_udp_multicast(const _z_sys_net_socket_t sock, const uint8_t *ptr, #if Z_FEATURE_LINK_SERIAL == 1 #error "Serial not supported yet on Windows port of Zenoh-Pico" #endif + +#if Z_FEATURE_RAWETH_TRANSPORT == 1 +#error "Raw ethernet transport not supported yet on Windows port of Zenoh-Pico" +#endif diff --git a/src/system/zephyr/network.c b/src/system/zephyr/network.c index e07381914..91a693e0a 100644 --- a/src/system/zephyr/network.c +++ b/src/system/zephyr/network.c @@ -728,3 +728,7 @@ size_t _z_send_serial(const _z_sys_net_socket_t sock, const uint8_t *ptr, size_t #if Z_FEATURE_LINK_BLUETOOTH == 1 #error "Bluetooth not supported yet on Zephyr port of Zenoh-Pico" #endif + +#if Z_FEATURE_RAWETH_TRANSPORT == 1 +#error "Raw ethernet transport not supported yet on Zephyr port of Zenoh-Pico" +#endif diff --git a/src/transport/multicast/tx.c b/src/transport/multicast/tx.c index 230c8c85a..e3e0c9b71 100644 --- a/src/transport/multicast/tx.c +++ b/src/transport/multicast/tx.c @@ -155,8 +155,8 @@ int8_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_m } #else -int8_t _z_multicast_send_t_msg(_z_transport_multicast_t *ztu, const _z_transport_message_t *t_msg) { - _ZP_UNUSED(ztu); +int8_t _z_multicast_send_t_msg(_z_transport_multicast_t *ztm, const _z_transport_message_t *t_msg) { + _ZP_UNUSED(ztm); _ZP_UNUSED(t_msg); return _Z_ERR_TRANSPORT_NOT_AVAILABLE; } diff --git a/src/transport/raweth/config.c b/src/transport/raweth/config.c new file mode 100644 index 000000000..08e4684e7 --- /dev/null +++ b/src/transport/raweth/config.c @@ -0,0 +1,47 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#include "zenoh-pico/transport/raweth/config.h" + +#if Z_FEATURE_RAWETH_TRANSPORT == 1 + +// Should be generated (big endian format) +const uint16_t _ZP_RAWETH_CFG_ETHTYPE = 0x72e0; + +// Should be generated +const char *_ZP_RAWETH_CFG_INTERFACE = "lo"; + +// Should be generated +const uint8_t _ZP_RAWETH_CFG_SMAC[_ZP_MAC_ADDR_LENGTH] = {0x30, 0x03, 0xc8, 0x37, 0x25, 0xa1}; + +// Should be generated +const _zp_raweth_cfg_entry _ZP_RAWETH_CFG_ARRAY[] = { + {{0, {0}, ""}, 0x0000, {0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff}, false}, // Default mac addr + {{0, {0}, "some/key/expr"}, 0x8c00, {0x00, 0x11, 0x22, 0x33, 0x44, 0x55}, true}, // entry1 + {{0, {0}, "demo/example/zenoh-pico-pub"}, 0xab00, {0x41, 0x55, 0xa8, 0x00, 0x9d, 0xc0}, true}, // entry2 + {{0, {0}, "another/keyexpr"}, 0x4300, {0x01, 0x23, 0x45, 0x67, 0x89, 0xab}, true}, // entry3 +}; + +// Should be generated +const _zp_raweth_cfg_whitelist_val _ZP_RAWETH_CFG_WHITELIST[] = { + {{0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff}}, + {{0x00, 0x11, 0x22, 0x33, 0x44, 0x55}}, + {{0x30, 0x03, 0xc8, 0x37, 0x25, 0xa1}}, +}; + +// Don't modify +const size_t _ZP_RAWETH_CFG_SIZE = _ZP_ARRAY_SIZE(_ZP_RAWETH_CFG_ARRAY); +const size_t _ZP_RAWETH_CFG_WHITELIST_SIZE = _ZP_ARRAY_SIZE(_ZP_RAWETH_CFG_WHITELIST); + +#endif // Z_FEATURE_RAWETH_TRANSPORT == 1 diff --git a/src/transport/raweth/join.c b/src/transport/raweth/join.c new file mode 100644 index 000000000..7ac9e7506 --- /dev/null +++ b/src/transport/raweth/join.c @@ -0,0 +1,38 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#include "zenoh-pico/transport/raweth/join.h" + +#include "zenoh-pico/session/utils.h" +#include "zenoh-pico/transport/raweth/tx.h" + +#if Z_FEATURE_RAWETH_TRANSPORT == 1 + +int8_t _zp_raweth_send_join(_z_transport_multicast_t *ztm) { + _z_conduit_sn_list_t next_sn; + next_sn._is_qos = false; + next_sn._val._plain._best_effort = ztm->_sn_tx_best_effort; + next_sn._val._plain._reliable = ztm->_sn_tx_reliable; + + _z_id_t zid = ((_z_session_t *)ztm->_session)->_local_zid; + _z_transport_message_t jsm = _z_t_msg_make_join(Z_WHATAMI_PEER, Z_TRANSPORT_LEASE, zid, next_sn); + + return _z_raweth_send_t_msg(ztm, &jsm); +} +#else +int8_t _zp_raweth_send_join(_z_transport_multicast_t *ztm) { + _ZP_UNUSED(ztm); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} +#endif // Z_FEATURE_RAWETH_TRANSPORT == 1 diff --git a/src/transport/raweth/lease.c b/src/transport/raweth/lease.c new file mode 100644 index 000000000..10fa08a8f --- /dev/null +++ b/src/transport/raweth/lease.c @@ -0,0 +1,208 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#include "zenoh-pico/transport/raweth/lease.h" + +#include + +#include "zenoh-pico/config.h" +#include "zenoh-pico/session/utils.h" +#include "zenoh-pico/transport/common/join.h" +#include "zenoh-pico/transport/raweth/join.h" +#include "zenoh-pico/transport/raweth/transport.h" +#include "zenoh-pico/transport/raweth/tx.h" +#include "zenoh-pico/utils/logging.h" + +#if Z_FEATURE_RAWETH_TRANSPORT == 1 + +static _z_zint_t _z_get_minimum_lease(_z_transport_peer_entry_list_t *peers, _z_zint_t local_lease) { + _z_zint_t ret = local_lease; + + _z_transport_peer_entry_list_t *it = peers; + while (it != NULL) { + _z_transport_peer_entry_t *val = _z_transport_peer_entry_list_head(it); + _z_zint_t lease = val->_lease; + if (lease < ret) { + ret = lease; + } + + it = _z_transport_peer_entry_list_tail(it); + } + + return ret; +} + +static _z_zint_t _z_get_next_lease(_z_transport_peer_entry_list_t *peers) { + _z_zint_t ret = SIZE_MAX; + + _z_transport_peer_entry_list_t *it = peers; + while (it != NULL) { + _z_transport_peer_entry_t *val = _z_transport_peer_entry_list_head(it); + _z_zint_t next_lease = val->_next_lease; + if (next_lease < ret) { + ret = next_lease; + } + + it = _z_transport_peer_entry_list_tail(it); + } + + return ret; +} + +int8_t _zp_raweth_send_keep_alive(_z_transport_multicast_t *ztm) { + int8_t ret = _Z_RES_OK; + + _z_transport_message_t t_msg = _z_t_msg_make_keep_alive(); + ret = _z_raweth_send_t_msg(ztm, &t_msg); + + return ret; +} + +int8_t _zp_raweth_start_lease_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task) { + // Init memory + (void)memset(task, 0, sizeof(_z_task_t)); + // Attach task + zt->_transport._raweth._lease_task = task; + zt->_transport._raweth._lease_task_running = true; + // Init task + if (_z_task_init(task, attr, _zp_raweth_lease_task, &zt->_transport._raweth) != _Z_RES_OK) { + zt->_transport._raweth._lease_task_running = false; + return _Z_ERR_SYSTEM_TASK_FAILED; + } + return _Z_RES_OK; +} + +int8_t _zp_raweth_stop_lease_task(_z_transport_t *zt) { + zt->_transport._raweth._lease_task_running = false; + return _Z_RES_OK; +} + +void *_zp_raweth_lease_task(void *ztm_arg) { +#if Z_FEATURE_MULTI_THREAD == 1 + _z_transport_multicast_t *ztm = (_z_transport_multicast_t *)ztm_arg; + ztm->_transmitted = false; + + // From all peers, get the next lease time (minimum) + _z_zint_t next_lease = _z_get_minimum_lease(ztm->_peers, ztm->_lease); + _z_zint_t next_keep_alive = (_z_zint_t)(next_lease / Z_TRANSPORT_LEASE_EXPIRE_FACTOR); + _z_zint_t next_join = Z_JOIN_INTERVAL; + + _z_transport_peer_entry_list_t *it = NULL; + while (ztm->_lease_task_running == true) { + _z_mutex_lock(&ztm->_mutex_peer); + + if (next_lease <= 0) { + it = ztm->_peers; + while (it != NULL) { + _z_transport_peer_entry_t *entry = _z_transport_peer_entry_list_head(it); + if (entry->_received == true) { + // Reset the lease parameters + entry->_received = false; + entry->_next_lease = entry->_lease; + it = _z_transport_peer_entry_list_tail(it); + } else { + _Z_INFO("Remove peer from know list because it has expired after %zums\n", entry->_lease); + ztm->_peers = + _z_transport_peer_entry_list_drop_filter(ztm->_peers, _z_transport_peer_entry_eq, entry); + it = ztm->_peers; + } + } + } + + if (next_join <= 0) { + _zp_raweth_send_join(ztm); + ztm->_transmitted = true; + + // Reset the join parameters + next_join = Z_JOIN_INTERVAL; + } + + if (next_keep_alive <= 0) { + // Check if need to send a keep alive + if (ztm->_transmitted == false) { + if (_zp_raweth_send_keep_alive(ztm) < 0) { + // TODO: Handle retransmission or error + } + } + + // Reset the keep alive parameters + ztm->_transmitted = false; + next_keep_alive = + (_z_zint_t)(_z_get_minimum_lease(ztm->_peers, ztm->_lease) / Z_TRANSPORT_LEASE_EXPIRE_FACTOR); + } + + // Compute the target interval to sleep + _z_zint_t interval; + if (next_lease > 0) { + interval = next_lease; + if (next_keep_alive < interval) { + interval = next_keep_alive; + } + if (next_join < interval) { + interval = next_join; + } + } else { + interval = next_keep_alive; + if (next_join < interval) { + interval = next_join; + } + } + + _z_mutex_unlock(&ztm->_mutex_peer); + + // The keep alive and lease intervals are expressed in milliseconds + z_sleep_ms(interval); + + // Decrement all intervals + _z_mutex_lock(&ztm->_mutex_peer); + + it = ztm->_peers; + while (it != NULL) { + _z_transport_peer_entry_t *entry = _z_transport_peer_entry_list_head(it); + entry->_next_lease = entry->_next_lease - interval; + it = _z_transport_peer_entry_list_tail(it); + } + next_lease = _z_get_next_lease(ztm->_peers); + next_keep_alive = next_keep_alive - interval; + next_join = next_join - interval; + + _z_mutex_unlock(&ztm->_mutex_peer); + } +#endif // Z_FEATURE_MULTI_THREAD == 1 + + return 0; +} +#else +int8_t _zp_raweth_send_keep_alive(_z_transport_multicast_t *ztm) { + _ZP_UNUSED(ztm); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} + +int8_t _zp_raweth_start_lease_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task) { + _ZP_UNUSED(zt); + _ZP_UNUSED(attr); + _ZP_UNUSED(task); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} + +int8_t _zp_raweth_stop_lease_task(_z_transport_t *zt) { + _ZP_UNUSED(zt); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} + +void *_zp_raweth_lease_task(void *ztm_arg) { + _ZP_UNUSED(ztm_arg); + return NULL; +} +#endif // Z_FEATURE_RAWETH_TRANSPORT == 1 diff --git a/src/transport/raweth/link.c b/src/transport/raweth/link.c new file mode 100644 index 000000000..b669e1f02 --- /dev/null +++ b/src/transport/raweth/link.c @@ -0,0 +1,164 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#include +#include +#include +#include + +#include "zenoh-pico/config.h" +#include "zenoh-pico/link/config/raweth.h" +#include "zenoh-pico/link/manager.h" +#include "zenoh-pico/system/link/raweth.h" +#include "zenoh-pico/system/platform.h" +#include "zenoh-pico/transport/raweth/config.h" +#include "zenoh-pico/utils/pointers.h" + +#if Z_FEATURE_RAWETH_TRANSPORT == 1 + +static _Bool __z_valid_address_raweth(const char *address) { + // Check if the string has the correct length + size_t len = strlen(address); + if (len != 17) { // 6 pairs of hexadecimal digits and 5 colons + return false; + } + // Check if the colons are at the correct positions + for (int i = 2; i < len; i += 3) { + if (address[i] != ':') { + return false; + } + } + // Check if each character is a valid hexadecimal digit + for (int i = 0; i < len; ++i) { + if (i % 3 != 2) { + char c = address[i]; + if (!((c >= '0' && c <= '9') || (c >= 'A' && c <= 'F') || (c >= 'a' && c <= 'f'))) { + return false; + } + } + } + return true; +} + +static uint8_t *__z_parse_address_raweth(const char *address) { + size_t len = strlen(address); + // Allocate data + uint8_t *ret = (uint8_t *)z_malloc(_ZP_MAC_ADDR_LENGTH); + if (ret == NULL) { + return ret; + } + for (int i = 0; i < _ZP_MAC_ADDR_LENGTH; ++i) { + // Extract a pair of hexadecimal digits from the MAC address string + char byteString[3]; + strncpy(byteString, address + i * 3, 2); + byteString[2] = '\0'; + // Convert the hexadecimal string to an integer + ret[i] = (unsigned char)strtol(byteString, NULL, 16); + } + return ret; +} + +static int8_t _z_f_link_open_raweth(_z_link_t *self) { + // Init socket smac + memcpy(&self->_socket._raweth._smac, _ZP_RAWETH_CFG_SMAC, _ZP_MAC_ADDR_LENGTH); + // Init socket interface + self->_socket._raweth._interface = _ZP_RAWETH_CFG_INTERFACE; + // Open raweth link + return _z_open_raweth(&self->_socket._raweth._sock, _ZP_RAWETH_CFG_INTERFACE); +} + +static int8_t _z_f_link_listen_raweth(_z_link_t *self) { return _z_f_link_open_raweth(self); } + +static void _z_f_link_close_raweth(_z_link_t *self) { _z_close_raweth(&self->_socket._raweth._sock); } + +static void _z_f_link_free_raweth(_z_link_t *self) { _ZP_UNUSED(self); } + +static size_t _z_f_link_write_raweth(const _z_link_t *self, const uint8_t *ptr, size_t len) { + _ZP_UNUSED(self); + _ZP_UNUSED(ptr); + _ZP_UNUSED(len); + return SIZE_MAX; +} + +static size_t _z_f_link_write_all_raweth(const _z_link_t *self, const uint8_t *ptr, size_t len) { + _ZP_UNUSED(self); + _ZP_UNUSED(ptr); + _ZP_UNUSED(len); + return SIZE_MAX; +} + +static size_t _z_f_link_read_raweth(const _z_link_t *self, uint8_t *ptr, size_t len, _z_bytes_t *addr) { + _ZP_UNUSED(self); + _ZP_UNUSED(ptr); + _ZP_UNUSED(len); + _ZP_UNUSED(addr); + return SIZE_MAX; +} + +static size_t _z_f_link_read_exact_raweth(const _z_link_t *self, uint8_t *ptr, size_t len, _z_bytes_t *addr) { + _ZP_UNUSED(self); + _ZP_UNUSED(ptr); + _ZP_UNUSED(len); + _ZP_UNUSED(addr); + return SIZE_MAX; +} + +static uint16_t _z_get_link_mtu_raweth(void) { return _ZP_MAX_ETH_FRAME_SIZE; } + +int8_t _z_endpoint_raweth_valid(_z_endpoint_t *endpoint) { + int8_t ret = _Z_RES_OK; + + // Check the root + if (!_z_str_eq(endpoint->_locator._protocol, RAWETH_SCHEMA)) { + ret = _Z_ERR_CONFIG_LOCATOR_INVALID; + } + return ret; +} + +int8_t _z_new_link_raweth(_z_link_t *zl, _z_endpoint_t endpoint) { + int8_t ret = _Z_RES_OK; + + zl->_cap._transport = Z_LINK_CAP_TRANSPORT_RAWETH; + zl->_cap._is_reliable = false; + zl->_mtu = _z_get_link_mtu_raweth(); + + zl->_endpoint = endpoint; + + // Init socket + memset(&zl->_socket._raweth, 0, sizeof(zl->_socket._raweth)); + + zl->_open_f = _z_f_link_open_raweth; + zl->_listen_f = _z_f_link_listen_raweth; + zl->_close_f = _z_f_link_close_raweth; + zl->_free_f = _z_f_link_free_raweth; + + zl->_write_f = _z_f_link_write_raweth; + zl->_write_all_f = _z_f_link_write_all_raweth; + zl->_read_f = _z_f_link_read_raweth; + zl->_read_exact_f = _z_f_link_read_exact_raweth; + + return ret; +} +#else +int8_t _z_endpoint_raweth_valid(_z_endpoint_t *endpoint) { + _ZP_UNUSED(endpoint); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} + +int8_t _z_new_link_raweth(_z_link_t *zl, _z_endpoint_t endpoint) { + _ZP_UNUSED(zl); + _ZP_UNUSED(endpoint); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} +#endif diff --git a/src/transport/raweth/read.c b/src/transport/raweth/read.c new file mode 100644 index 000000000..668b650f5 --- /dev/null +++ b/src/transport/raweth/read.c @@ -0,0 +1,120 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#include "zenoh-pico/transport/raweth/read.h" + +#include + +#include "zenoh-pico/config.h" +#include "zenoh-pico/protocol/codec/transport.h" +#include "zenoh-pico/protocol/iobuf.h" +#include "zenoh-pico/transport/multicast/rx.h" +#include "zenoh-pico/transport/raweth/rx.h" +#include "zenoh-pico/transport/unicast/rx.h" +#include "zenoh-pico/utils/logging.h" + +#if Z_FEATURE_RAWETH_TRANSPORT == 1 + +int8_t _zp_raweth_read(_z_transport_multicast_t *ztm) { + int8_t ret = _Z_RES_OK; + + _z_bytes_t addr; + _z_transport_message_t t_msg; + ret = _z_raweth_recv_t_msg(ztm, &t_msg, &addr); + if (ret == _Z_RES_OK) { + ret = _z_multicast_handle_transport_message(ztm, &t_msg, &addr); + _z_t_msg_clear(&t_msg); + } + return ret; +} + +int8_t _zp_raweth_start_read_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task) { + // Init memory + (void)memset(task, 0, sizeof(_z_task_t)); + // Attach task + zt->_transport._raweth._read_task = task; + zt->_transport._raweth._read_task_running = true; + // Init task + if (_z_task_init(task, attr, _zp_raweth_read_task, &zt->_transport._raweth) != _Z_RES_OK) { + zt->_transport._raweth._read_task_running = false; + return _Z_ERR_SYSTEM_TASK_FAILED; + } + return _Z_RES_OK; +} + +int8_t _zp_raweth_stop_read_task(_z_transport_t *zt) { + zt->_transport._raweth._read_task_running = false; + return _Z_RES_OK; +} + +void *_zp_raweth_read_task(void *ztm_arg) { +#if Z_FEATURE_MULTI_THREAD == 1 + _z_transport_multicast_t *ztm = (_z_transport_multicast_t *)ztm_arg; + _z_transport_message_t t_msg; + _z_bytes_t addr = _z_bytes_wrap(NULL, 0); + + // Task loop + while (ztm->_read_task_running == true) { + // Read message from link + int8_t ret = _z_raweth_recv_t_msg(ztm, &t_msg, &addr); + switch (ret) { + case _Z_RES_OK: + // Process message + break; + case _Z_ERR_TRANSPORT_RX_FAILED: + // Drop message + continue; + break; + default: + // Drop message & stop task + _Z_ERROR("Connection closed due to malformed message\n"); + ztm->_read_task_running = false; + continue; + break; + } + // Process message + if (_z_multicast_handle_transport_message(ztm, &t_msg, &addr) != _Z_RES_OK) { + ztm->_read_task_running = false; + continue; + } + _z_t_msg_clear(&t_msg); + _z_bytes_clear(&addr); + } +#endif // Z_FEATURE_MULTI_THREAD == 1 + + return NULL; +} +#else +int8_t _zp_raweth_read(_z_transport_multicast_t *ztm) { + _ZP_UNUSED(ztm); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} + +int8_t _zp_raweth_start_read_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task) { + _ZP_UNUSED(zt); + _ZP_UNUSED(attr); + _ZP_UNUSED(task); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} + +int8_t _zp_raweth_stop_read_task(_z_transport_t *zt) { + _ZP_UNUSED(zt); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} + +void *_zp_raweth_read_task(void *ztm_arg) { + _ZP_UNUSED(ztm_arg); + return NULL; +} +#endif // Z_FEATURE_RAWETH_TRANSPORT == 1 diff --git a/src/transport/raweth/rx.c b/src/transport/raweth/rx.c new file mode 100644 index 000000000..1709ed67f --- /dev/null +++ b/src/transport/raweth/rx.c @@ -0,0 +1,113 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +// #include "zenoh-pico/transport/link/rx.h" + +#include + +#include "zenoh-pico/config.h" +#include "zenoh-pico/protocol/codec/network.h" +#include "zenoh-pico/protocol/codec/transport.h" +#include "zenoh-pico/protocol/core.h" +#include "zenoh-pico/protocol/iobuf.h" +#include "zenoh-pico/session/utils.h" +#include "zenoh-pico/transport/utils.h" +#include "zenoh-pico/utils/logging.h" + +#if Z_FEATURE_RAWETH_TRANSPORT == 1 + +void print_buf(_z_zbuf_t *buf) { + printf("Buff info: %ld, %ld, %ld\n", buf->_ios._r_pos, buf->_ios._w_pos, buf->_ios._capacity); +} + +static size_t _z_raweth_link_recv_zbuf(const _z_link_t *link, _z_zbuf_t *zbf, _z_bytes_t *addr) { + uint8_t *buff = _z_zbuf_get_wptr(zbf); + size_t rb = _z_receive_raweth(&link->_socket._raweth._sock, buff, _z_zbuf_space_left(zbf), addr); + // Check validity + if ((rb == SIZE_MAX) || (rb < sizeof(_zp_eth_header_t))) { + return SIZE_MAX; + } + // Check if header has vlan + _Bool has_vlan = false; + _zp_eth_header_t *header = (_zp_eth_header_t *)buff; + if (header->ethtype == _ZP_ETH_TYPE_VLAN) { + has_vlan = true; + } + // Check validity + if (has_vlan && (rb < sizeof(_zp_eth_vlan_header_t))) { + return SIZE_MAX; + } + // Update buffer but skip eth header + _z_zbuf_set_wpos(zbf, _z_zbuf_get_wpos(zbf) + rb); + if (has_vlan) { + _z_zbuf_set_rpos(zbf, _z_zbuf_get_rpos(zbf) + sizeof(_zp_eth_vlan_header_t)); + } else { + _z_zbuf_set_rpos(zbf, _z_zbuf_get_rpos(zbf) + sizeof(_zp_eth_header_t)); + } + return rb; +} + +/*------------------ Reception helper ------------------*/ +int8_t _z_raweth_recv_t_msg_na(_z_transport_multicast_t *ztm, _z_transport_message_t *t_msg, _z_bytes_t *addr) { + _Z_DEBUG(">> recv session msg\n"); + int8_t ret = _Z_RES_OK; + +#if Z_FEATURE_MULTI_THREAD == 1 + // Acquire the lock + _z_mutex_lock(&ztm->_mutex_rx); +#endif // Z_FEATURE_MULTI_THREAD == 1 + + // Prepare the buffer + _z_zbuf_reset(&ztm->_zbuf); + + switch (ztm->_link._cap._flow) { + // Datagram capable links + case Z_LINK_CAP_FLOW_DATAGRAM: { + _z_zbuf_compact(&ztm->_zbuf); + // Read from link + size_t to_read = _z_raweth_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, addr); + if (to_read == SIZE_MAX) { + ret = _Z_ERR_TRANSPORT_RX_FAILED; + } + break; + } + default: + ret = _Z_ERR_GENERIC; + break; + } + // Decode message + if (ret == _Z_RES_OK) { + _Z_DEBUG(">> \t transport_message_decode: %ju\n", (uintmax_t)_z_zbuf_len(&ztm->_zbuf)); + ret = _z_transport_message_decode(t_msg, &ztm->_zbuf); + } + +#if Z_FEATURE_MULTI_THREAD == 1 + _z_mutex_unlock(&ztm->_mutex_rx); +#endif // Z_FEATURE_MULTI_THREAD == 1 + + return ret; +} + +int8_t _z_raweth_recv_t_msg(_z_transport_multicast_t *ztm, _z_transport_message_t *t_msg, _z_bytes_t *addr) { + return _z_raweth_recv_t_msg_na(ztm, t_msg, addr); +} + +#else +int8_t _z_raweth_recv_t_msg(_z_transport_multicast_t *ztm, _z_transport_message_t *t_msg, _z_bytes_t *addr) { + _ZP_UNUSED(ztm); + _ZP_UNUSED(t_msg); + _ZP_UNUSED(addr); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} +#endif // Z_FEATURE_RAWETH_TRANSPORT == 1 diff --git a/src/transport/raweth/transport.c b/src/transport/raweth/transport.c new file mode 100644 index 000000000..7dced540d --- /dev/null +++ b/src/transport/raweth/transport.c @@ -0,0 +1,227 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#include "zenoh-pico/transport/raweth/transport.h" + +#include +#include +#include +#include +#include + +#include "zenoh-pico/link/link.h" +#include "zenoh-pico/transport/common/lease.h" +#include "zenoh-pico/transport/common/read.h" +#include "zenoh-pico/transport/common/tx.h" +#include "zenoh-pico/transport/raweth/rx.h" +#include "zenoh-pico/transport/raweth/tx.h" +#include "zenoh-pico/transport/utils.h" +#include "zenoh-pico/utils/logging.h" + +#if Z_FEATURE_RAWETH_TRANSPORT == 1 + +int8_t _z_raweth_transport_create(_z_transport_t *zt, _z_link_t *zl, _z_transport_multicast_establish_param_t *param) { + int8_t ret = _Z_RES_OK; + + zt->_type = _Z_TRANSPORT_RAWETH_TYPE; + +#if Z_FEATURE_MULTI_THREAD == 1 + // Initialize the mutexes + ret = _z_mutex_init(&zt->_transport._raweth._mutex_tx); + if (ret == _Z_RES_OK) { + ret = _z_mutex_init(&zt->_transport._raweth._mutex_rx); + if (ret == _Z_RES_OK) { + ret = _z_mutex_init(&zt->_transport._raweth._mutex_peer); + if (ret != _Z_RES_OK) { + _z_mutex_free(&zt->_transport._raweth._mutex_tx); + _z_mutex_free(&zt->_transport._raweth._mutex_rx); + } + } else { + _z_mutex_free(&zt->_transport._raweth._mutex_tx); + } + } +#endif // Z_FEATURE_MULTI_THREAD == 1 + + // Initialize the read and write buffers + if (ret == _Z_RES_OK) { + uint16_t mtu = (zl->_mtu < Z_BATCH_MULTICAST_SIZE) ? zl->_mtu : Z_BATCH_MULTICAST_SIZE; + zt->_transport._raweth._wbuf = _z_wbuf_make(mtu, false); + zt->_transport._raweth._zbuf = _z_zbuf_make(Z_BATCH_MULTICAST_SIZE); + + // Clean up the buffers if one of them failed to be allocated + if ((_z_wbuf_capacity(&zt->_transport._raweth._wbuf) != mtu) || + (_z_zbuf_capacity(&zt->_transport._raweth._zbuf) != Z_BATCH_MULTICAST_SIZE)) { + ret = _Z_ERR_SYSTEM_OUT_OF_MEMORY; + +#if Z_FEATURE_MULTI_THREAD == 1 + _z_mutex_free(&zt->_transport._raweth._mutex_tx); + _z_mutex_free(&zt->_transport._raweth._mutex_rx); + _z_mutex_free(&zt->_transport._raweth._mutex_peer); +#endif // Z_FEATURE_MULTI_THREAD == 1 + + _z_wbuf_clear(&zt->_transport._raweth._wbuf); + _z_zbuf_clear(&zt->_transport._raweth._zbuf); + } + } + + if (ret == _Z_RES_OK) { + // Set default SN resolution + zt->_transport._raweth._sn_res = _z_sn_max(param->_seq_num_res); + + // The initial SN at TX side + zt->_transport._raweth._sn_tx_reliable = param->_initial_sn_tx._val._plain._reliable; + zt->_transport._raweth._sn_tx_best_effort = param->_initial_sn_tx._val._plain._best_effort; + + // Initialize peer list + zt->_transport._raweth._peers = _z_transport_peer_entry_list_new(); + +#if Z_FEATURE_MULTI_THREAD == 1 + // Tasks + zt->_transport._raweth._read_task_running = false; + zt->_transport._raweth._read_task = NULL; + zt->_transport._raweth._lease_task_running = false; + zt->_transport._raweth._lease_task = NULL; +#endif // Z_FEATURE_MULTI_THREAD == 1 + + zt->_transport._raweth._lease = Z_TRANSPORT_LEASE; + + // Notifiers + zt->_transport._raweth._transmitted = false; + + // Transport link for raweth + zt->_transport._raweth._link = *zl; + } + + return ret; +} + +int8_t _z_raweth_open_peer(_z_transport_multicast_establish_param_t *param, const _z_link_t *zl, + const _z_id_t *local_zid) { + int8_t ret = _Z_RES_OK; + + _z_zint_t initial_sn_tx = 0; + z_random_fill(&initial_sn_tx, sizeof(initial_sn_tx)); + initial_sn_tx = initial_sn_tx & !_z_sn_modulo_mask(Z_SN_RESOLUTION); + + _z_conduit_sn_list_t next_sn; + next_sn._is_qos = false; + next_sn._val._plain._best_effort = initial_sn_tx; + next_sn._val._plain._reliable = initial_sn_tx; + + _z_id_t zid = *local_zid; + _z_transport_message_t jsm = _z_t_msg_make_join(Z_WHATAMI_PEER, Z_TRANSPORT_LEASE, zid, next_sn); + + // Encode and send the message + _Z_INFO("Sending Z_JOIN message\n"); + ret = _z_raweth_link_send_t_msg(zl, &jsm); + _z_t_msg_clear(&jsm); + + if (ret == _Z_RES_OK) { + param->_seq_num_res = jsm._body._join._seq_num_res; + param->_initial_sn_tx = next_sn; + } + + return ret; +} + +int8_t _z_raweth_open_client(_z_transport_multicast_establish_param_t *param, const _z_link_t *zl, + const _z_id_t *local_zid) { + _ZP_UNUSED(param); + _ZP_UNUSED(zl); + _ZP_UNUSED(local_zid); + int8_t ret = _Z_ERR_CONFIG_UNSUPPORTED_CLIENT_MULTICAST; + // @TODO: not implemented + return ret; +} + +int8_t _z_raweth_send_close(_z_transport_multicast_t *ztm, uint8_t reason, _Bool link_only) { + int8_t ret = _Z_RES_OK; + // Send and clear message + _z_transport_message_t cm = _z_t_msg_make_close(reason, link_only); + ret = _z_raweth_send_t_msg(ztm, &cm); + _z_t_msg_clear(&cm); + return ret; +} + +int8_t _z_raweth_transport_close(_z_transport_multicast_t *ztm, uint8_t reason) { + return _z_raweth_send_close(ztm, reason, false); +} + +void _z_raweth_transport_clear(_z_transport_t *zt) { + _z_transport_multicast_t *ztm = &zt->_transport._raweth; +#if Z_FEATURE_MULTI_THREAD == 1 + // Clean up tasks + if (ztm->_read_task != NULL) { + _z_task_join(ztm->_read_task); + _z_task_free(&ztm->_read_task); + } + if (ztm->_lease_task != NULL) { + _z_task_join(ztm->_lease_task); + _z_task_free(&ztm->_lease_task); + } + // Clean up the mutexes + _z_mutex_free(&ztm->_mutex_tx); + _z_mutex_free(&ztm->_mutex_rx); + _z_mutex_free(&ztm->_mutex_peer); +#endif // Z_FEATURE_MULTI_THREAD == 1 + + // Clean up the buffers + _z_wbuf_clear(&ztm->_wbuf); + _z_zbuf_clear(&ztm->_zbuf); + + // Clean up peer list + _z_transport_peer_entry_list_free(&ztm->_peers); + _z_link_clear(&ztm->_link); +} + +#else + +int8_t _z_raweth_transport_create(_z_transport_t *zt, _z_link_t *zl, _z_transport_multicast_establish_param_t *param) { + _ZP_UNUSED(zt); + _ZP_UNUSED(zl); + _ZP_UNUSED(param); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} + +int8_t _z_raweth_open_peer(_z_transport_multicast_establish_param_t *param, const _z_link_t *zl, + const _z_id_t *local_zid) { + _ZP_UNUSED(param); + _ZP_UNUSED(zl); + _ZP_UNUSED(local_zid); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} + +int8_t _z_raweth_open_client(_z_transport_multicast_establish_param_t *param, const _z_link_t *zl, + const _z_id_t *local_zid) { + _ZP_UNUSED(param); + _ZP_UNUSED(zl); + _ZP_UNUSED(local_zid); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} + +int8_t _z_raweth_send_close(_z_transport_multicast_t *ztm, uint8_t reason, _Bool link_only) { + _ZP_UNUSED(ztm); + _ZP_UNUSED(reason); + _ZP_UNUSED(link_only); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} + +int8_t _z_raweth_transport_close(_z_transport_multicast_t *ztm, uint8_t reason) { + _ZP_UNUSED(ztm); + _ZP_UNUSED(reason); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; + ; +} + +void _z_raweth_transport_clear(_z_transport_t *zt) { _ZP_UNUSED(zt); } +#endif // Z_FEATURE_RAWETH_TRANSPORT == 1 diff --git a/src/transport/raweth/tx.c b/src/transport/raweth/tx.c new file mode 100644 index 000000000..bba3153aa --- /dev/null +++ b/src/transport/raweth/tx.c @@ -0,0 +1,303 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +// #include "zenoh-pico/transport/link/tx.h" + +#include "zenoh-pico/transport/common/tx.h" + +#include "zenoh-pico/api/primitives.h" +#include "zenoh-pico/config.h" +#include "zenoh-pico/protocol/codec/core.h" +#include "zenoh-pico/protocol/codec/network.h" +#include "zenoh-pico/protocol/codec/transport.h" +#include "zenoh-pico/protocol/iobuf.h" +#include "zenoh-pico/protocol/keyexpr.h" +#include "zenoh-pico/system/link/raweth.h" +#include "zenoh-pico/transport/raweth/config.h" +#include "zenoh-pico/transport/transport.h" +#include "zenoh-pico/transport/utils.h" +#include "zenoh-pico/utils/logging.h" + +#if Z_FEATURE_RAWETH_TRANSPORT == 1 + +int8_t _zp_raweth_set_socket(const _z_keyexpr_t *keyexpr, _z_raweth_socket_t *sock) { + int8_t ret = _Z_RES_OK; + + if (_ZP_RAWETH_CFG_SIZE < 1) { + return _Z_ERR_GENERIC; + } + if (keyexpr == NULL) { + // Store default value into socket + memcpy(&sock->_dmac, &_ZP_RAWETH_CFG_ARRAY[0]._dmac, _ZP_MAC_ADDR_LENGTH); + uint16_t vlan = _ZP_RAWETH_CFG_ARRAY[0]._vlan; + sock->_has_vlan = _ZP_RAWETH_CFG_ARRAY[0]._has_vlan; + if (sock->_has_vlan) { + memcpy(&sock->_vlan, &vlan, sizeof(vlan)); + } + } else { + // Find config entry (linear) + ret = _Z_ERR_GENERIC; // Key not found case + for (int i = 1; i < _ZP_RAWETH_CFG_SIZE; i++) { + // Find matching keyexpr + if (zp_keyexpr_intersect_null_terminated(keyexpr->_suffix, _ZP_RAWETH_CFG_ARRAY[i]._keyexpr._suffix) != + _Z_RES_OK) { + continue; + } + // Store data into socket + memcpy(&sock->_dmac, &_ZP_RAWETH_CFG_ARRAY[i]._dmac, _ZP_MAC_ADDR_LENGTH); + uint16_t vlan = _ZP_RAWETH_CFG_ARRAY[i]._vlan; + sock->_has_vlan = _ZP_RAWETH_CFG_ARRAY[i]._has_vlan; + if (sock->_has_vlan) { + memcpy(&sock->_vlan, &vlan, sizeof(vlan)); + } + ret = _Z_RES_OK; + break; + } + } + return ret; +} + +/** + * This function is unsafe because it operates in potentially concurrent data. + * Make sure that the following mutexes are locked before calling this function: + * - ztm->_mutex_inner + */ +static _z_zint_t __unsafe_z_raweth_get_sn(_z_transport_multicast_t *ztm, z_reliability_t reliability) { + _z_zint_t sn; + if (reliability == Z_RELIABILITY_RELIABLE) { + sn = ztm->_sn_tx_reliable; + ztm->_sn_tx_reliable = _z_sn_increment(ztm->_sn_res, ztm->_sn_tx_reliable); + } else { + sn = ztm->_sn_tx_best_effort; + ztm->_sn_tx_best_effort = _z_sn_increment(ztm->_sn_res, ztm->_sn_tx_best_effort); + } + return sn; +} + +/** + * This function is unsafe because it operates in potentially concurrent data. + * Make sure that the following mutexes are locked before calling this function: + * - ztm->_mutex_inner + */ +static int8_t __unsafe_z_raweth_write_header(_z_link_t *zl, _z_wbuf_t *wbf) { + _z_raweth_socket_t *resocket = &zl->_socket._raweth; + + // Write eth header in buffer + if (resocket->_has_vlan) { + _zp_eth_vlan_header_t header; + memset(&header, 0, sizeof(header)); + memcpy(&header.dmac, &resocket->_dmac, _ZP_MAC_ADDR_LENGTH); + memcpy(&header.smac, &resocket->_smac, _ZP_MAC_ADDR_LENGTH); + header.vlan_type = _ZP_ETH_TYPE_VLAN; + header.tag = resocket->_vlan; + header.ethtype = _ZP_RAWETH_CFG_ETHTYPE; + // Write header + _Z_RETURN_IF_ERR(_z_wbuf_write_bytes(wbf, (uint8_t *)&header, 0, sizeof(header))); + } else { + _zp_eth_header_t header; + memcpy(&header.dmac, &resocket->_dmac, _ZP_MAC_ADDR_LENGTH); + memcpy(&header.smac, &resocket->_smac, _ZP_MAC_ADDR_LENGTH); + header.ethtype = _ZP_RAWETH_CFG_ETHTYPE; + // Write header + _Z_RETURN_IF_ERR(_z_wbuf_write_bytes(wbf, (uint8_t *)&header, 0, sizeof(header))); + } + return _Z_RES_OK; +} + +static int8_t _z_raweth_link_send_wbuf(const _z_link_t *zl, const _z_wbuf_t *wbf) { + int8_t ret = _Z_RES_OK; + for (size_t i = 0; (i < _z_wbuf_len_iosli(wbf)) && (ret == _Z_RES_OK); i++) { + _z_bytes_t bs = _z_iosli_to_bytes(_z_wbuf_get_iosli(wbf, i)); + size_t n = bs.len; + + do { + // Retrieve addr from config + vlan tag above (locator) + size_t wb = _z_send_raweth(&zl->_socket._raweth._sock, bs.start, n); // Unix + if (wb == SIZE_MAX) { + return _Z_ERR_TRANSPORT_TX_FAILED; + } + n = n - wb; + bs.start = bs.start + (bs.len - n); + } while (n > (size_t)0); + } + return ret; +} + +int8_t _z_raweth_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t *t_msg) { + int8_t ret = _Z_RES_OK; + + // Create and prepare the buffer to serialize the message on + uint16_t mtu = (zl->_mtu < Z_BATCH_UNICAST_SIZE) ? zl->_mtu : Z_BATCH_UNICAST_SIZE; + _z_wbuf_t wbf = _z_wbuf_make(mtu, false); + + switch (zl->_cap._flow) { + case Z_LINK_CAP_FLOW_DATAGRAM: + break; + default: + ret = _Z_ERR_GENERIC; + break; + } + // Discard const qualifier + _z_link_t *mzl = (_z_link_t *)zl; + // Set socket info + _Z_RETURN_IF_ERR(_zp_raweth_set_socket(NULL, &mzl->_socket._raweth)); + // Write the message header + _Z_RETURN_IF_ERR(__unsafe_z_raweth_write_header(mzl, &wbf)); + // Encode the session message + ret = _z_transport_message_encode(&wbf, t_msg); + if (ret == _Z_RES_OK) { + switch (zl->_cap._flow) { + case Z_LINK_CAP_FLOW_DATAGRAM: + break; + default: + ret = _Z_ERR_GENERIC; + break; + } + // Send the wbuf on the socket + ret = _z_raweth_link_send_wbuf(zl, &wbf); + } + _z_wbuf_clear(&wbf); + + return ret; +} + +int8_t _z_raweth_send_t_msg(_z_transport_multicast_t *ztm, const _z_transport_message_t *t_msg) { + int8_t ret = _Z_RES_OK; + _Z_DEBUG(">> send session message\n"); + +#if Z_FEATURE_MULTI_THREAD == 1 + _z_mutex_lock(&ztm->_mutex_tx); +#endif + // Reset wbuf + _z_wbuf_reset(&ztm->_wbuf); + // Set socket info + _Z_RETURN_IF_ERR(_zp_raweth_set_socket(NULL, &ztm->_link._socket._raweth)); + // Write the message header + _Z_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf)); + // Encode the session message + _Z_RETURN_IF_ERR(_z_transport_message_encode(&ztm->_wbuf, t_msg)); + // Send the wbuf on the socket + _Z_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf)); + // Mark the session that we have transmitted data + ztm->_transmitted = true; + +#if Z_FEATURE_MULTI_THREAD == 1 + _z_mutex_unlock(&ztm->_mutex_tx); +#endif + + return ret; +} + +int8_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg, z_reliability_t reliability, + z_congestion_control_t cong_ctrl) { + int8_t ret = _Z_RES_OK; + _z_transport_multicast_t *ztm = &zn->_tp._transport._raweth; + _Z_DEBUG(">> send network message\n"); + + // Acquire the lock and drop the message if needed +#if Z_FEATURE_MULTI_THREAD == 1 + if (cong_ctrl == Z_CONGESTION_CONTROL_BLOCK) { + _z_mutex_lock(&ztm->_mutex_tx); + } else { + if (_z_mutex_trylock(&ztm->_mutex_tx) != (int8_t)0) { + _Z_INFO("Dropping zenoh message because of congestion control\n"); + // We failed to acquire the lock, drop the message + return ret; + } + } +#else + _ZP_UNUSED(cong_ctrl); +#endif // Z_FEATURE_MULTI_THREAD == 1 + + const z_keyexpr_t *keyexpr = NULL; + switch (n_msg->_tag) { + case _Z_N_PUSH: + keyexpr = &n_msg->_body._push._key; + break; + case _Z_N_REQUEST: + keyexpr = &n_msg->_body._request._key; + break; + case _Z_N_RESPONSE: + keyexpr = &n_msg->_body._response._key; + break; + case _Z_N_RESPONSE_FINAL: + case _Z_N_DECLARE: + default: + break; + } + // Reset wbuf + _z_wbuf_reset(&ztm->_wbuf); + // Set socket info + _Z_RETURN_IF_ERR(_zp_raweth_set_socket(keyexpr, &ztm->_link._socket._raweth)); + // Write the eth header + _Z_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf)); + // Set the frame header + _z_zint_t sn = __unsafe_z_raweth_get_sn(ztm, reliability); + _z_transport_message_t t_msg = _z_t_msg_make_frame_header(sn, reliability); + // Encode the frame header + _Z_RETURN_IF_ERR(_z_transport_message_encode(&ztm->_wbuf, &t_msg)); + // Encode the network message + ret = _z_network_message_encode(&ztm->_wbuf, n_msg); + if (ret == _Z_RES_OK) { + // Send the wbuf on the socket + _Z_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf)); + // Mark the session that we have transmitted data + ztm->_transmitted = true; + } else { // The message does not fit in the current batch, let's fragment it + // Create an expandable wbuf for fragmentation + _z_wbuf_t fbf = _z_wbuf_make(ztm->_wbuf._capacity - 12, true); + // Encode the message on the expandable wbuf + _Z_RETURN_IF_ERR(_z_network_message_encode(&fbf, n_msg)); + // Fragment and send the message + _Bool is_first = true; + while (_z_wbuf_len(&fbf) > 0) { + if (is_first) { + // Get the fragment sequence number + sn = __unsafe_z_raweth_get_sn(ztm, reliability); + } + is_first = false; + // Reset wbuf + _z_wbuf_reset(&ztm->_wbuf); + // Write the eth header + _Z_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf)); + // Serialize one fragment + _Z_RETURN_IF_ERR(__unsafe_z_serialize_zenoh_fragment(&ztm->_wbuf, &fbf, reliability, sn)); + // Send the wbuf on the socket + _Z_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf)); + // Mark the session that we have transmitted data + ztm->_transmitted = true; + } + } +#if Z_FEATURE_MULTI_THREAD == 1 + _z_mutex_unlock(&ztm->_mutex_tx); +#endif // Z_FEATURE_MULTI_THREAD == 1 + return ret; +} + +#else +int8_t _z_raweth_send_t_msg(_z_transport_multicast_t *ztm, const _z_transport_message_t *t_msg) { + _ZP_UNUSED(ztm); + _ZP_UNUSED(t_msg); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} + +int8_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg, z_reliability_t reliability, + z_congestion_control_t cong_ctrl) { + _ZP_UNUSED(zn); + _ZP_UNUSED(n_msg); + _ZP_UNUSED(reliability); + _ZP_UNUSED(cong_ctrl); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} +#endif // Z_FEATURE_RAWETH_TRANSPORT == 1