From 627c7d9a71b9fe34ae9ce3bd68cf157314451880 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Fri, 20 Oct 2023 10:48:06 +0200 Subject: [PATCH 01/24] feat: add modular test --- CMakeLists.txt | 2 + tests/z_modular_test.c | 285 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 287 insertions(+) create mode 100644 tests/z_modular_test.c diff --git a/CMakeLists.txt b/CMakeLists.txt index ac9c77c5d..d84f080e4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -290,6 +290,7 @@ if(UNIX OR MSVC) add_executable(z_keyexpr_test ${PROJECT_SOURCE_DIR}/tests/z_keyexpr_test.c) add_executable(z_api_null_drop_test ${PROJECT_SOURCE_DIR}/tests/z_api_null_drop_test.c) add_executable(z_api_double_drop_test ${PROJECT_SOURCE_DIR}/tests/z_api_double_drop_test.c) + add_executable(z_modular_test ${PROJECT_SOURCE_DIR}/tests/z_modular_test.c) target_link_libraries(z_data_struct_test ${Libname}) target_link_libraries(z_endpoint_test ${Libname}) @@ -298,6 +299,7 @@ if(UNIX OR MSVC) target_link_libraries(z_keyexpr_test ${Libname}) target_link_libraries(z_api_null_drop_test ${Libname}) target_link_libraries(z_api_double_drop_test ${Libname}) + target_link_libraries(z_modular_test ${Libname}) configure_file(${PROJECT_SOURCE_DIR}/tests/modularity.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/modularity.py COPYONLY) diff --git a/tests/z_modular_test.c b/tests/z_modular_test.c new file mode 100644 index 000000000..e5ea32b26 --- /dev/null +++ b/tests/z_modular_test.c @@ -0,0 +1,285 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#include +#include +#include +#include + +#include "zenoh-pico.h" + +#undef NDEBUG +#include + +static const char *ARG_LIST[] = {"Z_FEATURE_PUBLICATION", "Z_FEATURE_SUBSCRIPTION", "Z_FEATURE_QUERYABLE", + "Z_FEATURE_QUERY"}; +#define ARG_NB (sizeof(ARG_LIST) / sizeof(ARG_LIST[0])) + +int test_publication(void) { +#if Z_FEATURE_PUBLICATION == 1 + const char *keyexpr = "demo/example/zenoh-pico-pub"; + const char *value = "Pub from Pico!"; + const char *mode = "client"; + + // Set up config + z_owned_config_t config = z_config_default(); + zp_config_insert(z_loan(config), Z_CONFIG_MODE_KEY, z_string_make(mode)); + // Open session + printf("Opening session...\n"); + z_owned_session_t s = z_open(z_move(config)); + if (!z_check(s)) { + printf("Unable to open session!\n"); + return -1; + } + // Start read and lease tasks for zenoh-pico + if (zp_start_read_task(z_loan(s), NULL) < 0 || zp_start_lease_task(z_loan(s), NULL) < 0) { + printf("Unable to start read and lease tasks"); + return -1; + } + // Declare publisher + printf("Declaring publisher for '%s'...\n", keyexpr); + z_owned_publisher_t pub = z_declare_publisher(z_loan(s), z_keyexpr(keyexpr), NULL); + if (!z_check(pub)) { + printf("Unable to declare publisher for key expression!\n"); + return -1; + } + // Put data + printf("Putting Data ('%s': '%s')...\n", keyexpr, value); + z_publisher_put_options_t options = z_publisher_put_options_default(); + options.encoding = z_encoding(Z_ENCODING_PREFIX_TEXT_PLAIN, NULL); + z_publisher_put(z_loan(pub), (const uint8_t *)value, strlen(value), &options); + + // Clean-up + z_undeclare_publisher(z_move(pub)); + zp_stop_read_task(z_loan(s)); + zp_stop_lease_task(z_loan(s)); + z_close(z_move(s)); + return 1; +#else + return 0; +#endif +} + +#if Z_FEATURE_SUBSCRIPTION == 1 +static void subscription_data_handler(const z_sample_t *sample, void *ctx) { + (void)(ctx); + z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); + printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len, + sample->payload.start); + z_drop(z_move(keystr)); +} +#endif + +int test_subscription(void) { +#if Z_FEATURE_SUBSCRIPTION == 1 + const char *keyexpr = "demo/example/**"; + const char *mode = "client"; + + // Set up config + z_owned_config_t config = z_config_default(); + zp_config_insert(z_loan(config), Z_CONFIG_MODE_KEY, z_string_make(mode)); + // Open session + printf("Opening session...\n"); + z_owned_session_t s = z_open(z_move(config)); + if (!z_check(s)) { + printf("Unable to open session!\n"); + return -1; + } + // Start read and lease tasks for zenoh-pico + if (zp_start_read_task(z_loan(s), NULL) < 0 || zp_start_lease_task(z_loan(s), NULL) < 0) { + printf("Unable to start read and lease tasks"); + return -1; + } + // Declare subscriber + z_owned_closure_sample_t callback = z_closure(subscription_data_handler); + printf("Declaring Subscriber on '%s'...\n", keyexpr); + z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(keyexpr), z_move(callback), NULL); + if (!z_check(sub)) { + printf("Unable to declare subscriber.\n"); + return -1; + } + // Clean-up + z_undeclare_subscriber(z_move(sub)); + zp_stop_read_task(z_loan(s)); + zp_stop_lease_task(z_loan(s)); + z_close(z_move(s)); + return 1; +#else + return 0; +#endif +} + +#if Z_FEATURE_QUERYABLE == 1 +static const char *queryable_keyexpr = "demo/example/zenoh-pico-queryable"; +static const char *queryable_value = "Queryable from Pico!"; + +void query_handler(const z_query_t *query, void *ctx) { + (void)(ctx); + z_owned_str_t keystr = z_keyexpr_to_string(z_query_keyexpr(query)); + z_bytes_t pred = z_query_parameters(query); + z_value_t payload_value = z_query_value(query); + printf(" >> [Queryable handler] Received Query '%s?%.*s'\n", z_loan(keystr), (int)pred.len, pred.start); + if (payload_value.payload.len > 0) { + printf(" with value '%.*s'\n", (int)payload_value.payload.len, payload_value.payload.start); + } + z_query_reply_options_t options = z_query_reply_options_default(); + options.encoding = z_encoding(Z_ENCODING_PREFIX_TEXT_PLAIN, NULL); + z_query_reply(query, z_keyexpr(queryable_keyexpr), (const unsigned char *)queryable_value, strlen(queryable_value), &options); + z_drop(z_move(keystr)); +} +#endif + +int test_queryable(void) { +#if Z_FEATURE_QUERYABLE == 1 + const char *mode = "client"; + + z_keyexpr_t ke = z_keyexpr(queryable_keyexpr); + if (!z_check(ke)) { + printf("%s is not a valid key expression", queryable_keyexpr); + return -1; + } + // Set up config + z_owned_config_t config = z_config_default(); + zp_config_insert(z_loan(config), Z_CONFIG_MODE_KEY, z_string_make(mode)); + // Open session + printf("Opening session...\n"); + z_owned_session_t s = z_open(z_move(config)); + if (!z_check(s)) { + printf("Unable to open session!\n"); + return -1; + } + // Start read and lease tasks for zenoh-pico + if (zp_start_read_task(z_loan(s), NULL) < 0 || zp_start_lease_task(z_loan(s), NULL) < 0) { + printf("Unable to start read and lease tasks"); + return -1; + } + // Declare queryable + printf("Creating Queryable on '%s'...\n", queryable_keyexpr); + z_owned_closure_query_t callback = z_closure(query_handler); + z_owned_queryable_t qable = z_declare_queryable(z_loan(s), ke, z_move(callback), NULL); + if (!z_check(qable)) { + printf("Unable to create queryable.\n"); + return -1; + } + // Clean-up + z_undeclare_queryable(z_move(qable)); + zp_stop_read_task(z_loan(s)); + zp_stop_lease_task(z_loan(s)); + z_close(z_move(s)); + + return 1; +#else + return 0; +#endif +} + +#if Z_FEATURE_QUERY == 1 +void reply_dropper(void *ctx) { + (void)(ctx); + printf(">> Received query final notification\n"); +} + +void reply_handler(z_owned_reply_t *reply, void *ctx) { + (void)(ctx); + if (z_reply_is_ok(reply)) { + z_sample_t sample = z_reply_ok(reply); + z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); + printf(">> Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, sample.payload.start); + z_drop(z_move(keystr)); + } else { + printf(">> Received an error\n"); + } +} +#endif + +int test_query(void) { +#if Z_FEATURE_QUERY == 1 + const char *keyexpr = "demo/example/**"; + const char *mode = "client"; + + z_keyexpr_t ke = z_keyexpr(keyexpr); + if (!z_check(ke)) { + printf("%s is not a valid key expression", keyexpr); + return -1; + } + // Set up config + z_owned_config_t config = z_config_default(); + zp_config_insert(z_loan(config), Z_CONFIG_MODE_KEY, z_string_make(mode)); + // Open session + printf("Opening session...\n"); + z_owned_session_t s = z_open(z_move(config)); + if (!z_check(s)) { + printf("Unable to open session!\n"); + return -1; + } + // Start read and lease tasks for zenoh-pico + if (zp_start_read_task(z_loan(s), NULL) < 0 || zp_start_lease_task(z_loan(s), NULL) < 0) { + printf("Unable to start read and lease tasks"); + return -1; + } + // Send query + printf("Sending Query '%s'...\n", keyexpr); + z_get_options_t opts = z_get_options_default(); + z_owned_closure_reply_t callback = z_closure(reply_handler, reply_dropper); + if (z_get(z_loan(s), ke, "", z_move(callback), &opts) < 0) { + printf("Unable to send query.\n"); + return -1; + } + // Clean-up + zp_stop_read_task(z_loan(s)); + zp_stop_lease_task(z_loan(s)); + z_close(z_move(s)); + + return 1; +#else + return 0; +#endif +} + +// Send feature config as int list, and compare with compiled feature +int main(int argc, char **argv) { + if (argc < (int)(ARG_NB + 1)) { + printf("To start this test you must give the state of the feature config as argument\n"); + printf("Arg order: "); + for (size_t i = 0; i < ARG_NB; i++) { + printf("%s ", ARG_LIST[i]); + } + printf("\n"); + return -1; + } + if (test_publication() != atoi(argv[1])) { + printf("Problem during publication testing\n"); + return -1; + } else { + printf("Publication status ok\n"); + } + if (test_subscription() != atoi(argv[2])) { + printf("Problem during subscription testing\n"); + return -1; + } else { + printf("Subscription status ok\n"); + } + if (test_queryable() != atoi(argv[3])) { + printf("Problem during queryable testing\n"); + return -1; + } else { + printf("Queryable status ok\n"); + } + if (test_query() != atoi(argv[4])) { + printf("Problem during query testing\n"); + return -1; + } else { + printf("Query status ok\n"); + } + return 0; +} From c0afb69501f2cd5034e78a068807e657e47a4f27 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Tue, 24 Oct 2023 10:26:43 +0200 Subject: [PATCH 02/24] build: add cmake generator option for build --- GNUmakefile | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/GNUmakefile b/GNUmakefile index 67932a3bc..ff4029495 100644 --- a/GNUmakefile +++ b/GNUmakefile @@ -52,6 +52,9 @@ Z_FEATURE_SUBSCRIPTION?=1 Z_FEATURE_QUERY?=1 Z_FEATURE_QUERYABLE?=1 +# Generator +CMAKE_GENERATOR?="Unix Makefiles" + # zenoh-pico/ directory ROOT_DIR:=$(shell dirname $(realpath $(firstword $(MAKEFILE_LIST)))) @@ -74,7 +77,7 @@ all: make $(BUILD_DIR)/Makefile: mkdir -p $(BUILD_DIR) echo $(CMAKE_OPT) - cmake $(CMAKE_OPT) -B $(BUILD_DIR) + cmake $(CMAKE_OPT) -B $(BUILD_DIR) -G $(CMAKE_GENERATOR) make: $(BUILD_DIR)/Makefile cmake --build $(BUILD_DIR) From a1c44f703d7fb1f060c65eb84479aadadb250afe Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Tue, 24 Oct 2023 11:15:11 +0200 Subject: [PATCH 03/24] style: run clang-format --- tests/z_modular_test.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/z_modular_test.c b/tests/z_modular_test.c index e5ea32b26..87a7b005c 100644 --- a/tests/z_modular_test.c +++ b/tests/z_modular_test.c @@ -134,7 +134,8 @@ void query_handler(const z_query_t *query, void *ctx) { } z_query_reply_options_t options = z_query_reply_options_default(); options.encoding = z_encoding(Z_ENCODING_PREFIX_TEXT_PLAIN, NULL); - z_query_reply(query, z_keyexpr(queryable_keyexpr), (const unsigned char *)queryable_value, strlen(queryable_value), &options); + z_query_reply(query, z_keyexpr(queryable_keyexpr), (const unsigned char *)queryable_value, strlen(queryable_value), + &options); z_drop(z_move(keystr)); } #endif From f8e809c0497b72f9ae926c4f021fdfe2ec24bf79 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Tue, 24 Oct 2023 14:45:46 +0200 Subject: [PATCH 04/24] fix: keep a command with default cmake generator --- GNUmakefile | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/GNUmakefile b/GNUmakefile index ff4029495..e3f7e1000 100644 --- a/GNUmakefile +++ b/GNUmakefile @@ -77,7 +77,7 @@ all: make $(BUILD_DIR)/Makefile: mkdir -p $(BUILD_DIR) echo $(CMAKE_OPT) - cmake $(CMAKE_OPT) -B $(BUILD_DIR) -G $(CMAKE_GENERATOR) + cmake $(CMAKE_OPT) -B $(BUILD_DIR) make: $(BUILD_DIR)/Makefile cmake --build $(BUILD_DIR) @@ -88,6 +88,9 @@ install: $(BUILD_DIR)/Makefile test: make ctest --verbose --test-dir build +set_gen: + cmake $(CMAKE_OPT) -B $(BUILD_DIR) -G $(CMAKE_GENERATOR) + crossbuilds: $(CROSSBUILD_TARGETS) DOCKER_OK := $(shell docker version 2> /dev/null) From 2d0bce837241c608cc07fe2fff092730923306e9 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Wed, 25 Oct 2023 15:05:36 +0200 Subject: [PATCH 05/24] build: remove unnecessary target --- GNUmakefile | 6 ------ 1 file changed, 6 deletions(-) diff --git a/GNUmakefile b/GNUmakefile index e3f7e1000..67932a3bc 100644 --- a/GNUmakefile +++ b/GNUmakefile @@ -52,9 +52,6 @@ Z_FEATURE_SUBSCRIPTION?=1 Z_FEATURE_QUERY?=1 Z_FEATURE_QUERYABLE?=1 -# Generator -CMAKE_GENERATOR?="Unix Makefiles" - # zenoh-pico/ directory ROOT_DIR:=$(shell dirname $(realpath $(firstword $(MAKEFILE_LIST)))) @@ -88,9 +85,6 @@ install: $(BUILD_DIR)/Makefile test: make ctest --verbose --test-dir build -set_gen: - cmake $(CMAKE_OPT) -B $(BUILD_DIR) -G $(CMAKE_GENERATOR) - crossbuilds: $(CROSSBUILD_TARGETS) DOCKER_OK := $(shell docker version 2> /dev/null) From 3b2c23a5e574f1916e29f43e1e3aca5e80de2702 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Mon, 6 Nov 2023 11:50:39 +0100 Subject: [PATCH 06/24] fix: remove old modular test --- CMakeLists.txt | 2 - tests/z_modular_test.c | 286 ----------------------------------------- 2 files changed, 288 deletions(-) delete mode 100644 tests/z_modular_test.c diff --git a/CMakeLists.txt b/CMakeLists.txt index d84f080e4..ac9c77c5d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -290,7 +290,6 @@ if(UNIX OR MSVC) add_executable(z_keyexpr_test ${PROJECT_SOURCE_DIR}/tests/z_keyexpr_test.c) add_executable(z_api_null_drop_test ${PROJECT_SOURCE_DIR}/tests/z_api_null_drop_test.c) add_executable(z_api_double_drop_test ${PROJECT_SOURCE_DIR}/tests/z_api_double_drop_test.c) - add_executable(z_modular_test ${PROJECT_SOURCE_DIR}/tests/z_modular_test.c) target_link_libraries(z_data_struct_test ${Libname}) target_link_libraries(z_endpoint_test ${Libname}) @@ -299,7 +298,6 @@ if(UNIX OR MSVC) target_link_libraries(z_keyexpr_test ${Libname}) target_link_libraries(z_api_null_drop_test ${Libname}) target_link_libraries(z_api_double_drop_test ${Libname}) - target_link_libraries(z_modular_test ${Libname}) configure_file(${PROJECT_SOURCE_DIR}/tests/modularity.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/modularity.py COPYONLY) diff --git a/tests/z_modular_test.c b/tests/z_modular_test.c deleted file mode 100644 index 87a7b005c..000000000 --- a/tests/z_modular_test.c +++ /dev/null @@ -1,286 +0,0 @@ -// -// Copyright (c) 2022 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, - -#include -#include -#include -#include - -#include "zenoh-pico.h" - -#undef NDEBUG -#include - -static const char *ARG_LIST[] = {"Z_FEATURE_PUBLICATION", "Z_FEATURE_SUBSCRIPTION", "Z_FEATURE_QUERYABLE", - "Z_FEATURE_QUERY"}; -#define ARG_NB (sizeof(ARG_LIST) / sizeof(ARG_LIST[0])) - -int test_publication(void) { -#if Z_FEATURE_PUBLICATION == 1 - const char *keyexpr = "demo/example/zenoh-pico-pub"; - const char *value = "Pub from Pico!"; - const char *mode = "client"; - - // Set up config - z_owned_config_t config = z_config_default(); - zp_config_insert(z_loan(config), Z_CONFIG_MODE_KEY, z_string_make(mode)); - // Open session - printf("Opening session...\n"); - z_owned_session_t s = z_open(z_move(config)); - if (!z_check(s)) { - printf("Unable to open session!\n"); - return -1; - } - // Start read and lease tasks for zenoh-pico - if (zp_start_read_task(z_loan(s), NULL) < 0 || zp_start_lease_task(z_loan(s), NULL) < 0) { - printf("Unable to start read and lease tasks"); - return -1; - } - // Declare publisher - printf("Declaring publisher for '%s'...\n", keyexpr); - z_owned_publisher_t pub = z_declare_publisher(z_loan(s), z_keyexpr(keyexpr), NULL); - if (!z_check(pub)) { - printf("Unable to declare publisher for key expression!\n"); - return -1; - } - // Put data - printf("Putting Data ('%s': '%s')...\n", keyexpr, value); - z_publisher_put_options_t options = z_publisher_put_options_default(); - options.encoding = z_encoding(Z_ENCODING_PREFIX_TEXT_PLAIN, NULL); - z_publisher_put(z_loan(pub), (const uint8_t *)value, strlen(value), &options); - - // Clean-up - z_undeclare_publisher(z_move(pub)); - zp_stop_read_task(z_loan(s)); - zp_stop_lease_task(z_loan(s)); - z_close(z_move(s)); - return 1; -#else - return 0; -#endif -} - -#if Z_FEATURE_SUBSCRIPTION == 1 -static void subscription_data_handler(const z_sample_t *sample, void *ctx) { - (void)(ctx); - z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); - printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len, - sample->payload.start); - z_drop(z_move(keystr)); -} -#endif - -int test_subscription(void) { -#if Z_FEATURE_SUBSCRIPTION == 1 - const char *keyexpr = "demo/example/**"; - const char *mode = "client"; - - // Set up config - z_owned_config_t config = z_config_default(); - zp_config_insert(z_loan(config), Z_CONFIG_MODE_KEY, z_string_make(mode)); - // Open session - printf("Opening session...\n"); - z_owned_session_t s = z_open(z_move(config)); - if (!z_check(s)) { - printf("Unable to open session!\n"); - return -1; - } - // Start read and lease tasks for zenoh-pico - if (zp_start_read_task(z_loan(s), NULL) < 0 || zp_start_lease_task(z_loan(s), NULL) < 0) { - printf("Unable to start read and lease tasks"); - return -1; - } - // Declare subscriber - z_owned_closure_sample_t callback = z_closure(subscription_data_handler); - printf("Declaring Subscriber on '%s'...\n", keyexpr); - z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(keyexpr), z_move(callback), NULL); - if (!z_check(sub)) { - printf("Unable to declare subscriber.\n"); - return -1; - } - // Clean-up - z_undeclare_subscriber(z_move(sub)); - zp_stop_read_task(z_loan(s)); - zp_stop_lease_task(z_loan(s)); - z_close(z_move(s)); - return 1; -#else - return 0; -#endif -} - -#if Z_FEATURE_QUERYABLE == 1 -static const char *queryable_keyexpr = "demo/example/zenoh-pico-queryable"; -static const char *queryable_value = "Queryable from Pico!"; - -void query_handler(const z_query_t *query, void *ctx) { - (void)(ctx); - z_owned_str_t keystr = z_keyexpr_to_string(z_query_keyexpr(query)); - z_bytes_t pred = z_query_parameters(query); - z_value_t payload_value = z_query_value(query); - printf(" >> [Queryable handler] Received Query '%s?%.*s'\n", z_loan(keystr), (int)pred.len, pred.start); - if (payload_value.payload.len > 0) { - printf(" with value '%.*s'\n", (int)payload_value.payload.len, payload_value.payload.start); - } - z_query_reply_options_t options = z_query_reply_options_default(); - options.encoding = z_encoding(Z_ENCODING_PREFIX_TEXT_PLAIN, NULL); - z_query_reply(query, z_keyexpr(queryable_keyexpr), (const unsigned char *)queryable_value, strlen(queryable_value), - &options); - z_drop(z_move(keystr)); -} -#endif - -int test_queryable(void) { -#if Z_FEATURE_QUERYABLE == 1 - const char *mode = "client"; - - z_keyexpr_t ke = z_keyexpr(queryable_keyexpr); - if (!z_check(ke)) { - printf("%s is not a valid key expression", queryable_keyexpr); - return -1; - } - // Set up config - z_owned_config_t config = z_config_default(); - zp_config_insert(z_loan(config), Z_CONFIG_MODE_KEY, z_string_make(mode)); - // Open session - printf("Opening session...\n"); - z_owned_session_t s = z_open(z_move(config)); - if (!z_check(s)) { - printf("Unable to open session!\n"); - return -1; - } - // Start read and lease tasks for zenoh-pico - if (zp_start_read_task(z_loan(s), NULL) < 0 || zp_start_lease_task(z_loan(s), NULL) < 0) { - printf("Unable to start read and lease tasks"); - return -1; - } - // Declare queryable - printf("Creating Queryable on '%s'...\n", queryable_keyexpr); - z_owned_closure_query_t callback = z_closure(query_handler); - z_owned_queryable_t qable = z_declare_queryable(z_loan(s), ke, z_move(callback), NULL); - if (!z_check(qable)) { - printf("Unable to create queryable.\n"); - return -1; - } - // Clean-up - z_undeclare_queryable(z_move(qable)); - zp_stop_read_task(z_loan(s)); - zp_stop_lease_task(z_loan(s)); - z_close(z_move(s)); - - return 1; -#else - return 0; -#endif -} - -#if Z_FEATURE_QUERY == 1 -void reply_dropper(void *ctx) { - (void)(ctx); - printf(">> Received query final notification\n"); -} - -void reply_handler(z_owned_reply_t *reply, void *ctx) { - (void)(ctx); - if (z_reply_is_ok(reply)) { - z_sample_t sample = z_reply_ok(reply); - z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); - printf(">> Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, sample.payload.start); - z_drop(z_move(keystr)); - } else { - printf(">> Received an error\n"); - } -} -#endif - -int test_query(void) { -#if Z_FEATURE_QUERY == 1 - const char *keyexpr = "demo/example/**"; - const char *mode = "client"; - - z_keyexpr_t ke = z_keyexpr(keyexpr); - if (!z_check(ke)) { - printf("%s is not a valid key expression", keyexpr); - return -1; - } - // Set up config - z_owned_config_t config = z_config_default(); - zp_config_insert(z_loan(config), Z_CONFIG_MODE_KEY, z_string_make(mode)); - // Open session - printf("Opening session...\n"); - z_owned_session_t s = z_open(z_move(config)); - if (!z_check(s)) { - printf("Unable to open session!\n"); - return -1; - } - // Start read and lease tasks for zenoh-pico - if (zp_start_read_task(z_loan(s), NULL) < 0 || zp_start_lease_task(z_loan(s), NULL) < 0) { - printf("Unable to start read and lease tasks"); - return -1; - } - // Send query - printf("Sending Query '%s'...\n", keyexpr); - z_get_options_t opts = z_get_options_default(); - z_owned_closure_reply_t callback = z_closure(reply_handler, reply_dropper); - if (z_get(z_loan(s), ke, "", z_move(callback), &opts) < 0) { - printf("Unable to send query.\n"); - return -1; - } - // Clean-up - zp_stop_read_task(z_loan(s)); - zp_stop_lease_task(z_loan(s)); - z_close(z_move(s)); - - return 1; -#else - return 0; -#endif -} - -// Send feature config as int list, and compare with compiled feature -int main(int argc, char **argv) { - if (argc < (int)(ARG_NB + 1)) { - printf("To start this test you must give the state of the feature config as argument\n"); - printf("Arg order: "); - for (size_t i = 0; i < ARG_NB; i++) { - printf("%s ", ARG_LIST[i]); - } - printf("\n"); - return -1; - } - if (test_publication() != atoi(argv[1])) { - printf("Problem during publication testing\n"); - return -1; - } else { - printf("Publication status ok\n"); - } - if (test_subscription() != atoi(argv[2])) { - printf("Problem during subscription testing\n"); - return -1; - } else { - printf("Subscription status ok\n"); - } - if (test_queryable() != atoi(argv[3])) { - printf("Problem during queryable testing\n"); - return -1; - } else { - printf("Queryable status ok\n"); - } - if (test_query() != atoi(argv[4])) { - printf("Problem during query testing\n"); - return -1; - } else { - printf("Query status ok\n"); - } - return 0; -} From c29a1c2d278acfecf0ea07bbae1d69b576e0a611 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Wed, 8 Nov 2023 17:13:51 +0100 Subject: [PATCH 07/24] build: update actions deps --- .github/workflows/codacy-analysis.yml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/.github/workflows/codacy-analysis.yml b/.github/workflows/codacy-analysis.yml index 056e81353..29ffb6378 100644 --- a/.github/workflows/codacy-analysis.yml +++ b/.github/workflows/codacy-analysis.yml @@ -25,7 +25,8 @@ jobs: # Execute Codacy Analysis CLI and generate a SARIF output with the security issues identified during the analysis - name: Run Codacy Analysis CLI - uses: codacy/codacy-analysis-cli-action@1.1.0 + uses: codacy/codacy-analysis-cli-action@v4.3.0 + with: # Check https://github.com/codacy/codacy-analysis-cli#project-token to get your project token from your Codacy repository # You can also omit the token and run the tools that support default configurations @@ -41,6 +42,6 @@ jobs: # Upload the SARIF file generated in the previous step - name: Upload SARIF results file - uses: github/codeql-action/upload-sarif@v1 + uses: github/codeql-action/upload-sarif@v2.22.5 with: sarif_file: results.sarif From 64d90bc21e0a67b527a689807f2a6883441eb47b Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Wed, 8 Nov 2023 17:23:02 +0100 Subject: [PATCH 08/24] fix: revert codacy workflow changes --- .github/workflows/codacy-analysis.yml | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/.github/workflows/codacy-analysis.yml b/.github/workflows/codacy-analysis.yml index 29ffb6378..056e81353 100644 --- a/.github/workflows/codacy-analysis.yml +++ b/.github/workflows/codacy-analysis.yml @@ -25,8 +25,7 @@ jobs: # Execute Codacy Analysis CLI and generate a SARIF output with the security issues identified during the analysis - name: Run Codacy Analysis CLI - uses: codacy/codacy-analysis-cli-action@v4.3.0 - + uses: codacy/codacy-analysis-cli-action@1.1.0 with: # Check https://github.com/codacy/codacy-analysis-cli#project-token to get your project token from your Codacy repository # You can also omit the token and run the tools that support default configurations @@ -42,6 +41,6 @@ jobs: # Upload the SARIF file generated in the previous step - name: Upload SARIF results file - uses: github/codeql-action/upload-sarif@v2.22.5 + uses: github/codeql-action/upload-sarif@v1 with: sarif_file: results.sarif From 23ab70df65e777764ff3f7fcd6dc195e4cf30c55 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Tue, 14 Nov 2023 12:01:26 +0100 Subject: [PATCH 09/24] feat: switch capabilities from bitfield to enum --- include/zenoh-pico/link/link.h | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/include/zenoh-pico/link/link.h b/include/zenoh-pico/link/link.h index 9a49f2862..8caef96e2 100644 --- a/include/zenoh-pico/link/link.h +++ b/include/zenoh-pico/link/link.h @@ -46,22 +46,17 @@ * Link capabilities values, defined as a bitmask. * * Enumerators: - * Z_LINK_CAPABILITY_NONE: Bitmask to define that link has no capabilities. - * Z_LINK_CAPABILITY_RELIABLE: Bitmask to define and check if link is reliable. - * Z_LINK_CAPABILITY_STREAMED: Bitmask to define and check if link is streamed. - * Z_LINK_CAPABILITY_MULTICAST: Bitmask to define and check if link is multicast. + * Z_LINK_CAP_UNICAST_STREAM: Link has unicast stream capabilities. + * Z_LINK_CAP_UNICAST_DATAGRAM: Link has unicast datagram capabilities. + * Z_LINK_CAP_MULTICAST_DATAGRAM: Link has multicast datagram capabilities. */ typedef enum { - Z_LINK_CAPABILITY_NONE = 0x00, // 0 - Z_LINK_CAPABILITY_RELIABLE = 0x01, // 1 << 0 - Z_LINK_CAPABILITY_STREAMED = 0x02, // 1 << 1 - Z_LINK_CAPABILITY_MULTICAST = 0x04 // 1 << 2 + Z_LINK_CAP_UNICAST_STREAM = 0, + Z_LINK_CAP_UNICAST_DATAGRAM, + Z_LINK_CAP_MULTICAST_STREAM, + Z_LINK_CAP_MULTICAST_DATAGRAM, } _z_link_capabilities_t; -#define _Z_LINK_IS_RELIABLE(X) ((X & Z_LINK_CAPABILITY_RELIABLE) == Z_LINK_CAPABILITY_RELIABLE) -#define _Z_LINK_IS_STREAMED(X) ((X & Z_LINK_CAPABILITY_STREAMED) == Z_LINK_CAPABILITY_STREAMED) -#define _Z_LINK_IS_MULTICAST(X) ((X & Z_LINK_CAPABILITY_MULTICAST) == Z_LINK_CAPABILITY_MULTICAST) - struct _z_link_t; // Forward declaration to be used in _z_f_link_* typedef int8_t (*_z_f_link_open)(struct _z_link_t *self); @@ -105,6 +100,7 @@ typedef struct _z_link_t { uint16_t _mtu; uint8_t _capabilities; + bool _is_reliable; } _z_link_t; void _z_link_clear(_z_link_t *zl); From b27a6c2b5b24a07af5f6e7f38b30fa2fbe45ea1e Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Tue, 14 Nov 2023 12:03:12 +0100 Subject: [PATCH 10/24] feat: change unsafe wbuf signatures --- include/zenoh-pico/transport/common/tx.h | 4 +-- src/transport/common/tx.c | 42 +++++++++++++++++------- src/transport/multicast/tx.c | 12 +++---- src/transport/unicast/tx.c | 12 +++---- 4 files changed, 44 insertions(+), 26 deletions(-) diff --git a/include/zenoh-pico/transport/common/tx.h b/include/zenoh-pico/transport/common/tx.h index 33ba5593f..634b38e2e 100644 --- a/include/zenoh-pico/transport/common/tx.h +++ b/include/zenoh-pico/transport/common/tx.h @@ -19,8 +19,8 @@ #include "zenoh-pico/net/session.h" #include "zenoh-pico/transport/transport.h" -void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, _Bool is_streamed); -void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, _Bool is_streamed); +void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, uint8_t link_capabilities); +void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, uint8_t link_capabilities); /*This function is unsafe because it operates in potentially concurrent data.*Make sure that the following mutexes are locked before calling this function : *-ztu->mutex_tx */ int8_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn); diff --git a/src/transport/common/tx.c b/src/transport/common/tx.c index f0d68eb6c..02fc5e3d2 100644 --- a/src/transport/common/tx.c +++ b/src/transport/common/tx.c @@ -27,14 +27,23 @@ * Make sure that the following mutexes are locked before calling this function: * - ztu->mutex_tx */ -void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, _Bool is_streamed) { +void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, uint8_t link_capabilities) { _z_wbuf_reset(buf); - if (is_streamed == true) { - for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { - _z_wbuf_put(buf, 0, i); - } - _z_wbuf_set_wpos(buf, _Z_MSG_LEN_ENC_SIZE); + switch (link_capabilities) { + // Stream capable links + case Z_LINK_CAP_UNICAST_STREAM: + case Z_LINK_CAP_MULTICAST_STREAM: + for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { + _z_wbuf_put(buf, 0, i); + } + _z_wbuf_set_wpos(buf, _Z_MSG_LEN_ENC_SIZE); + break; + // Datagram capable links + case Z_LINK_CAP_UNICAST_DATAGRAM: + case Z_LINK_CAP_MULTICAST_DATAGRAM: + default: + break; } } @@ -43,12 +52,21 @@ void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, _Bool is_streamed) { * Make sure that the following mutexes are locked before calling this function: * - ztu->mutex_tx */ -void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, _Bool is_streamed) { - if (is_streamed == true) { - size_t len = _z_wbuf_len(buf) - _Z_MSG_LEN_ENC_SIZE; - for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { - _z_wbuf_put(buf, (uint8_t)((len >> (uint8_t)8 * i) & (uint8_t)0xFF), i); - } +void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, uint8_t link_capabilities) { + switch (link_capabilities) { + // Stream capable links + case Z_LINK_CAP_UNICAST_STREAM: + case Z_LINK_CAP_MULTICAST_STREAM: + size_t len = _z_wbuf_len(buf) - _Z_MSG_LEN_ENC_SIZE; + for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { + _z_wbuf_put(buf, (uint8_t)((len >> (uint8_t)8 * i) & (uint8_t)0xFF), i); + } + break; + // Datagram capable links + case Z_LINK_CAP_UNICAST_DATAGRAM: + case Z_LINK_CAP_MULTICAST_DATAGRAM: + default: + break; } } diff --git a/src/transport/multicast/tx.c b/src/transport/multicast/tx.c index c2f665a19..94ff3c333 100644 --- a/src/transport/multicast/tx.c +++ b/src/transport/multicast/tx.c @@ -50,13 +50,13 @@ int8_t _z_multicast_send_t_msg(_z_transport_multicast_t *ztm, const _z_transport #endif // Z_FEATURE_MULTI_THREAD == 1 // Prepare the buffer eventually reserving space for the message length - __unsafe_z_prepare_wbuf(&ztm->_wbuf, _Z_LINK_IS_STREAMED(ztm->_link._capabilities)); + __unsafe_z_prepare_wbuf(&ztm->_wbuf, ztm->_link._capabilities); // Encode the session message ret = _z_transport_message_encode(&ztm->_wbuf, t_msg); if (ret == _Z_RES_OK) { // Write the message length in the reserved space if needed - __unsafe_z_finalize_wbuf(&ztm->_wbuf, _Z_LINK_IS_STREAMED(ztm->_link._capabilities)); + __unsafe_z_finalize_wbuf(&ztm->_wbuf, ztm->_link._capabilities); // Send the wbuf on the socket ret = _z_link_send_wbuf(&ztm->_link, &ztm->_wbuf); if (ret == _Z_RES_OK) { @@ -97,7 +97,7 @@ int8_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_m if (drop == false) { // Prepare the buffer eventually reserving space for the message length - __unsafe_z_prepare_wbuf(&ztm->_wbuf, _Z_LINK_IS_STREAMED(ztm->_link._capabilities)); + __unsafe_z_prepare_wbuf(&ztm->_wbuf, ztm->_link._capabilities); _z_zint_t sn = __unsafe_z_multicast_get_sn(ztm, reliability); // Get the next sequence number @@ -107,7 +107,7 @@ int8_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_m ret = _z_network_message_encode(&ztm->_wbuf, n_msg); // Encode the network message if (ret == _Z_RES_OK) { // Write the message length in the reserved space if needed - __unsafe_z_finalize_wbuf(&ztm->_wbuf, _Z_LINK_IS_STREAMED(ztm->_link._capabilities)); + __unsafe_z_finalize_wbuf(&ztm->_wbuf, ztm->_link._capabilities); ret = _z_link_send_wbuf(&ztm->_link, &ztm->_wbuf); // Send the wbuf on the socket if (ret == _Z_RES_OK) { @@ -128,13 +128,13 @@ int8_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_m is_first = false; // Clear the buffer for serialization - __unsafe_z_prepare_wbuf(&ztm->_wbuf, _Z_LINK_IS_STREAMED(ztm->_link._capabilities)); + __unsafe_z_prepare_wbuf(&ztm->_wbuf, ztm->_link._capabilities); // Serialize one fragment ret = __unsafe_z_serialize_zenoh_fragment(&ztm->_wbuf, &fbf, reliability, sn); if (ret == _Z_RES_OK) { // Write the message length in the reserved space if needed - __unsafe_z_finalize_wbuf(&ztm->_wbuf, _Z_LINK_IS_STREAMED(ztm->_link._capabilities)); + __unsafe_z_finalize_wbuf(&ztm->_wbuf, ztm->_link._capabilities); ret = _z_link_send_wbuf(&ztm->_link, &ztm->_wbuf); // Send the wbuf on the socket if (ret == _Z_RES_OK) { diff --git a/src/transport/unicast/tx.c b/src/transport/unicast/tx.c index 6e7ea4f85..93997048a 100644 --- a/src/transport/unicast/tx.c +++ b/src/transport/unicast/tx.c @@ -53,13 +53,13 @@ int8_t _z_unicast_send_t_msg(_z_transport_unicast_t *ztu, const _z_transport_mes #endif // Z_FEATURE_MULTI_THREAD == 1 // Prepare the buffer eventually reserving space for the message length - __unsafe_z_prepare_wbuf(&ztu->_wbuf, _Z_LINK_IS_STREAMED(ztu->_link._capabilities)); + __unsafe_z_prepare_wbuf(&ztu->_wbuf, ztu->_link._capabilities); // Encode the session message ret = _z_transport_message_encode(&ztu->_wbuf, t_msg); if (ret == _Z_RES_OK) { // Write the message length in the reserved space if needed - __unsafe_z_finalize_wbuf(&ztu->_wbuf, _Z_LINK_IS_STREAMED(ztu->_link._capabilities)); + __unsafe_z_finalize_wbuf(&ztu->_wbuf, ztu->_link._capabilities); // Send the wbuf on the socket ret = _z_link_send_wbuf(&ztu->_link, &ztu->_wbuf); if (ret == _Z_RES_OK) { @@ -100,7 +100,7 @@ int8_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg if (drop == false) { // Prepare the buffer eventually reserving space for the message length - __unsafe_z_prepare_wbuf(&ztu->_wbuf, _Z_LINK_IS_STREAMED(ztu->_link._capabilities)); + __unsafe_z_prepare_wbuf(&ztu->_wbuf, ztu->_link._capabilities); _z_zint_t sn = __unsafe_z_unicast_get_sn(ztu, reliability); // Get the next sequence number @@ -110,7 +110,7 @@ int8_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg ret = _z_network_message_encode(&ztu->_wbuf, n_msg); // Encode the network message if (ret == _Z_RES_OK) { // Write the message length in the reserved space if needed - __unsafe_z_finalize_wbuf(&ztu->_wbuf, _Z_LINK_IS_STREAMED(ztu->_link._capabilities)); + __unsafe_z_finalize_wbuf(&ztu->_wbuf, ztu->_link._capabilities); if (ztu->_wbuf._ioss._len == 1) { ret = _z_link_send_wbuf(&ztu->_link, &ztu->_wbuf); // Send the wbuf on the socket @@ -137,13 +137,13 @@ int8_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg is_first = false; // Clear the buffer for serialization - __unsafe_z_prepare_wbuf(&ztu->_wbuf, _Z_LINK_IS_STREAMED(ztu->_link._capabilities)); + __unsafe_z_prepare_wbuf(&ztu->_wbuf, ztu->_link._capabilities); // Serialize one fragment ret = __unsafe_z_serialize_zenoh_fragment(&ztu->_wbuf, &fbf, reliability, sn); if (ret == _Z_RES_OK) { // Write the message length in the reserved space if needed - __unsafe_z_finalize_wbuf(&ztu->_wbuf, _Z_LINK_IS_STREAMED(ztu->_link._capabilities)); + __unsafe_z_finalize_wbuf(&ztu->_wbuf, ztu->_link._capabilities); ret = _z_link_send_wbuf(&ztu->_link, &ztu->_wbuf); // Send the wbuf on the socket if (ret == _Z_RES_OK) { From da4aaecaef502f7f781df63324946eb3ace208a6 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Tue, 14 Nov 2023 12:05:02 +0100 Subject: [PATCH 11/24] feat: update link capabilities init --- src/link/multicast/bt.c | 3 ++- src/link/multicast/udp.c | 3 ++- src/link/unicast/serial.c | 3 ++- src/link/unicast/tcp.c | 3 ++- src/link/unicast/udp.c | 3 ++- src/link/unicast/ws.c | 3 ++- 6 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/link/multicast/bt.c b/src/link/multicast/bt.c index 332e5f9fa..3ece71c40 100644 --- a/src/link/multicast/bt.c +++ b/src/link/multicast/bt.c @@ -115,7 +115,8 @@ uint16_t _z_get_link_mtu_bt(void) { return SPP_MAXIMUM_PAYLOAD; } int8_t _z_new_link_bt(_z_link_t *zl, _z_endpoint_t endpoint) { int8_t ret = _Z_RES_OK; - zl->_capabilities = Z_LINK_CAPABILITY_STREAMED | Z_LINK_CAPABILITY_MULTICAST; + zl->_capabilities = Z_LINK_CAP_MULTICAST_STREAM; + zl->_is_reliable = false; zl->_mtu = _z_get_link_mtu_bt(); zl->_endpoint = endpoint; diff --git a/src/link/multicast/udp.c b/src/link/multicast/udp.c index bcb2a4dc8..6385fa1a6 100644 --- a/src/link/multicast/udp.c +++ b/src/link/multicast/udp.c @@ -171,7 +171,8 @@ uint16_t _z_get_link_mtu_udp_multicast(void) { int8_t _z_new_link_udp_multicast(_z_link_t *zl, _z_endpoint_t endpoint) { int8_t ret = _Z_RES_OK; - zl->_capabilities = Z_LINK_CAPABILITY_MULTICAST; + zl->_capabilities = Z_LINK_CAP_MULTICAST_DATAGRAM; + zl->_is_reliable = false; zl->_mtu = _z_get_link_mtu_udp_multicast(); zl->_endpoint = endpoint; diff --git a/src/link/unicast/serial.c b/src/link/unicast/serial.c index eebbff12a..cafb2d681 100644 --- a/src/link/unicast/serial.c +++ b/src/link/unicast/serial.c @@ -116,7 +116,8 @@ uint16_t _z_get_link_mtu_serial(void) { return _Z_SERIAL_MTU_SIZE; } int8_t _z_new_link_serial(_z_link_t *zl, _z_endpoint_t endpoint) { int8_t ret = _Z_RES_OK; - zl->_capabilities = Z_LINK_CAPABILITY_NONE; + zl->_capabilities = Z_LINK_CAP_UNICAST_DATAGRAM; + zl->_is_reliable = false; zl->_mtu = _z_get_link_mtu_serial(); zl->_endpoint = endpoint; diff --git a/src/link/unicast/tcp.c b/src/link/unicast/tcp.c index b7b78fa43..a6794df4f 100644 --- a/src/link/unicast/tcp.c +++ b/src/link/unicast/tcp.c @@ -156,7 +156,8 @@ uint16_t _z_get_link_mtu_tcp(void) { int8_t _z_new_link_tcp(_z_link_t *zl, _z_endpoint_t *endpoint) { int8_t ret = _Z_RES_OK; - zl->_capabilities = Z_LINK_CAPABILITY_RELIABLE | Z_LINK_CAPABILITY_STREAMED; + zl->_capabilities = Z_LINK_CAP_UNICAST_STREAM; + zl->_is_reliable = true; zl->_mtu = _z_get_link_mtu_tcp(); zl->_endpoint = *endpoint; diff --git a/src/link/unicast/udp.c b/src/link/unicast/udp.c index c87441709..83289a6d8 100644 --- a/src/link/unicast/udp.c +++ b/src/link/unicast/udp.c @@ -162,7 +162,8 @@ uint16_t _z_get_link_mtu_udp_unicast(void) { int8_t _z_new_link_udp_unicast(_z_link_t *zl, _z_endpoint_t endpoint) { int8_t ret = _Z_RES_OK; - zl->_capabilities = Z_LINK_CAPABILITY_NONE; + zl->_capabilities = Z_LINK_CAP_UNICAST_DATAGRAM; + zl->_is_reliable = false; zl->_mtu = _z_get_link_mtu_udp_unicast(); zl->_endpoint = endpoint; diff --git a/src/link/unicast/ws.c b/src/link/unicast/ws.c index 426e5fa96..1f5b92d5c 100644 --- a/src/link/unicast/ws.c +++ b/src/link/unicast/ws.c @@ -157,7 +157,8 @@ uint16_t _z_get_link_mtu_ws(void) { int8_t _z_new_link_ws(_z_link_t *zl, _z_endpoint_t *endpoint) { int8_t ret = _Z_RES_OK; - zl->_capabilities = Z_LINK_CAPABILITY_RELIABLE; + zl->_capabilities = Z_LINK_CAP_UNICAST_DATAGRAM; + zl->is_reliable = true; zl->_mtu = _z_get_link_mtu_ws(); zl->_endpoint = *endpoint; From 7ab167c99efd85642bac09c41b946ccd44458c12 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Tue, 14 Nov 2023 13:21:55 +0100 Subject: [PATCH 12/24] feat: update capabilities in unicast --- src/transport/unicast/read.c | 50 +++++++++++++++------------ src/transport/unicast/rx.c | 57 +++++++++++++++++-------------- src/transport/unicast/transport.c | 14 +++++++- 3 files changed, 74 insertions(+), 47 deletions(-) diff --git a/src/transport/unicast/read.c b/src/transport/unicast/read.c index 4d0728eee..d3fcbbdad 100644 --- a/src/transport/unicast/read.c +++ b/src/transport/unicast/read.c @@ -68,35 +68,43 @@ void *_zp_unicast_read_task(void *ztu_arg) { while (ztu->_read_task_running == true) { // Read bytes from socket to the main buffer size_t to_read = 0; - if (_Z_LINK_IS_STREAMED(ztu->_link._capabilities) == true) { - if (_z_zbuf_len(&ztu->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { - _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); + switch (ztu->_link._capabilities) { + // Stream capable links + case Z_LINK_CAP_UNICAST_STREAM: + case Z_LINK_CAP_MULTICAST_STREAM: if (_z_zbuf_len(&ztu->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { - _z_zbuf_compact(&ztu->_zbuf); - continue; + _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); + if (_z_zbuf_len(&ztu->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { + _z_zbuf_compact(&ztu->_zbuf); + continue; + } } - } - for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { - to_read |= _z_zbuf_read(&ztu->_zbuf) << (i * (uint8_t)8); - } + for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { + to_read |= _z_zbuf_read(&ztu->_zbuf) << (i * (uint8_t)8); + } - if (_z_zbuf_len(&ztu->_zbuf) < to_read) { - _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); if (_z_zbuf_len(&ztu->_zbuf) < to_read) { - _z_zbuf_set_rpos(&ztu->_zbuf, _z_zbuf_get_rpos(&ztu->_zbuf) - _Z_MSG_LEN_ENC_SIZE); - _z_zbuf_compact(&ztu->_zbuf); + _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); + if (_z_zbuf_len(&ztu->_zbuf) < to_read) { + _z_zbuf_set_rpos(&ztu->_zbuf, _z_zbuf_get_rpos(&ztu->_zbuf) - _Z_MSG_LEN_ENC_SIZE); + _z_zbuf_compact(&ztu->_zbuf); + continue; + } + } + break; + // Datagram capable links + case Z_LINK_CAP_UNICAST_DATAGRAM: + case Z_LINK_CAP_MULTICAST_DATAGRAM: + _z_zbuf_compact(&ztu->_zbuf); + to_read = _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); + if (to_read == SIZE_MAX) { continue; } - } - } else { - _z_zbuf_compact(&ztu->_zbuf); - to_read = _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); - if (to_read == SIZE_MAX) { - continue; - } + break; + default: + break; } - // Wrap the main buffer for to_read bytes _z_zbuf_t zbuf = _z_zbuf_view(&ztu->_zbuf, to_read); diff --git a/src/transport/unicast/rx.c b/src/transport/unicast/rx.c index 965ec58c1..c5ee7748e 100644 --- a/src/transport/unicast/rx.c +++ b/src/transport/unicast/rx.c @@ -37,35 +37,42 @@ int8_t _z_unicast_recv_t_msg_na(_z_transport_unicast_t *ztu, _z_transport_messag size_t to_read = 0; do { - if (_Z_LINK_IS_STREAMED(ztu->_link._capabilities) == true) { - if (_z_zbuf_len(&ztu->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { - _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); + switch (ztu->_link._capabilities) { + // Stream capable links + case Z_LINK_CAP_UNICAST_STREAM: + case Z_LINK_CAP_MULTICAST_STREAM: if (_z_zbuf_len(&ztu->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { - _z_zbuf_compact(&ztu->_zbuf); - ret = _Z_ERR_TRANSPORT_NOT_ENOUGH_BYTES; - continue; + _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); + if (_z_zbuf_len(&ztu->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { + _z_zbuf_compact(&ztu->_zbuf); + ret = _Z_ERR_TRANSPORT_NOT_ENOUGH_BYTES; + continue; + } + } + for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { + to_read |= _z_zbuf_read(&ztu->_zbuf) << (i * (uint8_t)8); } - } - - for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { - to_read |= _z_zbuf_read(&ztu->_zbuf) << (i * (uint8_t)8); - } - - if (_z_zbuf_len(&ztu->_zbuf) < to_read) { - _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); if (_z_zbuf_len(&ztu->_zbuf) < to_read) { - _z_zbuf_set_rpos(&ztu->_zbuf, _z_zbuf_get_rpos(&ztu->_zbuf) - _Z_MSG_LEN_ENC_SIZE); - _z_zbuf_compact(&ztu->_zbuf); - ret = _Z_ERR_TRANSPORT_NOT_ENOUGH_BYTES; - continue; + _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); + if (_z_zbuf_len(&ztu->_zbuf) < to_read) { + _z_zbuf_set_rpos(&ztu->_zbuf, _z_zbuf_get_rpos(&ztu->_zbuf) - _Z_MSG_LEN_ENC_SIZE); + _z_zbuf_compact(&ztu->_zbuf); + ret = _Z_ERR_TRANSPORT_NOT_ENOUGH_BYTES; + continue; + } } - } - } else { - _z_zbuf_compact(&ztu->_zbuf); - to_read = _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); - if (to_read == SIZE_MAX) { - ret = _Z_ERR_TRANSPORT_RX_FAILED; - } + break; + // Datagram capable links + case Z_LINK_CAP_UNICAST_DATAGRAM: + case Z_LINK_CAP_MULTICAST_DATAGRAM: + _z_zbuf_compact(&ztu->_zbuf); + to_read = _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); + if (to_read == SIZE_MAX) { + ret = _Z_ERR_TRANSPORT_RX_FAILED; + } + break; + default: + break; } } while (false); // The 1-iteration loop to use continue to break the entire loop on error diff --git a/src/transport/unicast/transport.c b/src/transport/unicast/transport.c index 0aa951b5b..046b41630 100644 --- a/src/transport/unicast/transport.c +++ b/src/transport/unicast/transport.c @@ -50,8 +50,20 @@ int8_t _z_unicast_transport_create(_z_transport_t *zt, _z_link_t *zl, _z_transpo // Initialize the read and write buffers if (ret == _Z_RES_OK) { uint16_t mtu = (zl->_mtu < Z_BATCH_UNICAST_SIZE) ? zl->_mtu : Z_BATCH_UNICAST_SIZE; - _Bool expandable = _Z_LINK_IS_STREAMED(zl->_capabilities); size_t dbuf_size = 0; + _Bool expandable = false; + + switch (zl->_capabilities) { + case Z_LINK_CAP_UNICAST_STREAM: + case Z_LINK_CAP_MULTICAST_STREAM: + expandable = true; + break; + case Z_LINK_CAP_UNICAST_DATAGRAM: + case Z_LINK_CAP_MULTICAST_DATAGRAM: + default: + expandable = false; + break; + } #if Z_FEATURE_DYNAMIC_MEMORY_ALLOCATION == 0 expandable = false; From a53f7f2841902646e391d9f6da5c9868c320c1ec Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Tue, 14 Nov 2023 13:22:08 +0100 Subject: [PATCH 13/24] feat: update capabilities in multicast --- src/transport/multicast/read.c | 53 ++++++++++++++++++------------- src/transport/multicast/rx.c | 57 +++++++++++++++++++--------------- 2 files changed, 63 insertions(+), 47 deletions(-) diff --git a/src/transport/multicast/read.c b/src/transport/multicast/read.c index a0c483a6b..ce221fe64 100644 --- a/src/transport/multicast/read.c +++ b/src/transport/multicast/read.c @@ -72,36 +72,45 @@ void *_zp_multicast_read_task(void *ztm_arg) { while (ztm->_read_task_running == true) { // Read bytes from socket to the main buffer size_t to_read = 0; - if (_Z_LINK_IS_STREAMED(ztm->_link._capabilities) == true) { - if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { - _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, &addr); + + switch (ztm->_link._capabilities) { + // Stream capable links + case Z_LINK_CAP_UNICAST_STREAM: + case Z_LINK_CAP_MULTICAST_STREAM: if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { - _z_bytes_clear(&addr); - _z_zbuf_compact(&ztm->_zbuf); - continue; + _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, &addr); + if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { + _z_bytes_clear(&addr); + _z_zbuf_compact(&ztm->_zbuf); + continue; + } } - } - for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { - to_read |= _z_zbuf_read(&ztm->_zbuf) << (i * (uint8_t)8); - } + for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { + to_read |= _z_zbuf_read(&ztm->_zbuf) << (i * (uint8_t)8); + } - if (_z_zbuf_len(&ztm->_zbuf) < to_read) { - _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, NULL); if (_z_zbuf_len(&ztm->_zbuf) < to_read) { - _z_zbuf_set_rpos(&ztm->_zbuf, _z_zbuf_get_rpos(&ztm->_zbuf) - _Z_MSG_LEN_ENC_SIZE); - _z_zbuf_compact(&ztm->_zbuf); + _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, NULL); + if (_z_zbuf_len(&ztm->_zbuf) < to_read) { + _z_zbuf_set_rpos(&ztm->_zbuf, _z_zbuf_get_rpos(&ztm->_zbuf) - _Z_MSG_LEN_ENC_SIZE); + _z_zbuf_compact(&ztm->_zbuf); + continue; + } + } + break; + // Datagram capable links + case Z_LINK_CAP_UNICAST_DATAGRAM: + case Z_LINK_CAP_MULTICAST_DATAGRAM: + _z_zbuf_compact(&ztm->_zbuf); + to_read = _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, &addr); + if (to_read == SIZE_MAX) { continue; } - } - } else { - _z_zbuf_compact(&ztm->_zbuf); - to_read = _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, &addr); - if (to_read == SIZE_MAX) { - continue; - } + break; + default: + break; } - // Wrap the main buffer for to_read bytes _z_zbuf_t zbuf = _z_zbuf_view(&ztm->_zbuf, to_read); diff --git a/src/transport/multicast/rx.c b/src/transport/multicast/rx.c index 6d73c10f8..0893b4307 100644 --- a/src/transport/multicast/rx.c +++ b/src/transport/multicast/rx.c @@ -59,35 +59,42 @@ int8_t _z_multicast_recv_t_msg_na(_z_transport_multicast_t *ztm, _z_transport_me size_t to_read = 0; do { - if (_Z_LINK_IS_STREAMED(ztm->_link._capabilities) == true) { - if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { - _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, addr); + switch (ztm->_link._capabilities) { + // Stream capable links + case Z_LINK_CAP_UNICAST_STREAM: + case Z_LINK_CAP_MULTICAST_STREAM: if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { - _z_zbuf_compact(&ztm->_zbuf); - ret = _Z_ERR_TRANSPORT_NOT_ENOUGH_BYTES; - break; + _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, addr); + if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { + _z_zbuf_compact(&ztm->_zbuf); + ret = _Z_ERR_TRANSPORT_NOT_ENOUGH_BYTES; + break; + } + } + for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { + to_read |= _z_zbuf_read(&ztm->_zbuf) << (i * (uint8_t)8); } - } - - for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { - to_read |= _z_zbuf_read(&ztm->_zbuf) << (i * (uint8_t)8); - } - - if (_z_zbuf_len(&ztm->_zbuf) < to_read) { - _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, addr); if (_z_zbuf_len(&ztm->_zbuf) < to_read) { - _z_zbuf_set_rpos(&ztm->_zbuf, _z_zbuf_get_rpos(&ztm->_zbuf) - _Z_MSG_LEN_ENC_SIZE); - _z_zbuf_compact(&ztm->_zbuf); - ret = _Z_ERR_TRANSPORT_NOT_ENOUGH_BYTES; - break; + _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, addr); + if (_z_zbuf_len(&ztm->_zbuf) < to_read) { + _z_zbuf_set_rpos(&ztm->_zbuf, _z_zbuf_get_rpos(&ztm->_zbuf) - _Z_MSG_LEN_ENC_SIZE); + _z_zbuf_compact(&ztm->_zbuf); + ret = _Z_ERR_TRANSPORT_NOT_ENOUGH_BYTES; + break; + } } - } - } else { - _z_zbuf_compact(&ztm->_zbuf); - to_read = _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, addr); - if (to_read == SIZE_MAX) { - ret = _Z_ERR_TRANSPORT_RX_FAILED; - } + break; + // Datagram capable links + case Z_LINK_CAP_UNICAST_DATAGRAM: + case Z_LINK_CAP_MULTICAST_DATAGRAM: + _z_zbuf_compact(&ztm->_zbuf); + to_read = _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, addr); + if (to_read == SIZE_MAX) { + ret = _Z_ERR_TRANSPORT_RX_FAILED; + } + break; + default: + break; } } while (false); // The 1-iteration loop to use continue to break the entire loop on error From d86a1bbd7e1ddd0a7716a0e00c2f11fc5d25bb22 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Tue, 14 Nov 2023 13:22:23 +0100 Subject: [PATCH 14/24] feat: update capabilities in common transport --- src/transport/common/rx.c | 51 +++++++++++++++---------- src/transport/common/tx.c | 47 ++++++++++++++++------- src/transport/manager.c | 80 +++++++++++++++++++++++++-------------- 3 files changed, 116 insertions(+), 62 deletions(-) diff --git a/src/transport/common/rx.c b/src/transport/common/rx.c index 67692eb3c..2476a9b9a 100644 --- a/src/transport/common/rx.c +++ b/src/transport/common/rx.c @@ -28,32 +28,41 @@ int8_t _z_link_recv_t_msg(_z_transport_message_t *t_msg, const _z_link_t *zl) { _z_zbuf_t zbf = _z_zbuf_make(Z_BATCH_UNICAST_SIZE); _z_zbuf_reset(&zbf); - if (_Z_LINK_IS_STREAMED(zl->_capabilities) == true) { - // Read the message length - if (_z_link_recv_exact_zbuf(zl, &zbf, _Z_MSG_LEN_ENC_SIZE, NULL) == _Z_MSG_LEN_ENC_SIZE) { - size_t len = 0; - for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { - len |= (size_t)(_z_zbuf_read(&zbf) << (i * (uint8_t)8)); - } + switch (zl->_capabilities) { + // Stream capable links + case Z_LINK_CAP_UNICAST_STREAM: + case Z_LINK_CAP_MULTICAST_STREAM: + // Read the message length + if (_z_link_recv_exact_zbuf(zl, &zbf, _Z_MSG_LEN_ENC_SIZE, NULL) == _Z_MSG_LEN_ENC_SIZE) { + size_t len = 0; + for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { + len |= (size_t)(_z_zbuf_read(&zbf) << (i * (uint8_t)8)); + } - size_t writable = _z_zbuf_capacity(&zbf) - _z_zbuf_len(&zbf); - if (writable >= len) { - // Read enough bytes to decode the message - if (_z_link_recv_exact_zbuf(zl, &zbf, len, NULL) != len) { - ret = _Z_ERR_TRANSPORT_RX_FAILED; + size_t writable = _z_zbuf_capacity(&zbf) - _z_zbuf_len(&zbf); + if (writable >= len) { + // Read enough bytes to decode the message + if (_z_link_recv_exact_zbuf(zl, &zbf, len, NULL) != len) { + ret = _Z_ERR_TRANSPORT_RX_FAILED; + } + } else { + ret = _Z_ERR_TRANSPORT_NO_SPACE; } } else { - ret = _Z_ERR_TRANSPORT_NO_SPACE; + ret = _Z_ERR_TRANSPORT_RX_FAILED; } - } else { - ret = _Z_ERR_TRANSPORT_RX_FAILED; - } - } else { - if (_z_link_recv_zbuf(zl, &zbf, NULL) == SIZE_MAX) { - ret = _Z_ERR_TRANSPORT_RX_FAILED; - } + break; + // Datagram capable links + case Z_LINK_CAP_UNICAST_DATAGRAM: + case Z_LINK_CAP_MULTICAST_DATAGRAM: + if (_z_link_recv_zbuf(zl, &zbf, NULL) == SIZE_MAX) { + ret = _Z_ERR_TRANSPORT_RX_FAILED; + } + break; + default: + ret = _Z_ERR_GENERIC; + break; } - if (ret == _Z_RES_OK) { _z_transport_message_t l_t_msg; ret = _z_transport_message_decode(&l_t_msg, &zbf); diff --git a/src/transport/common/tx.c b/src/transport/common/tx.c index 02fc5e3d2..81f2b1b3f 100644 --- a/src/transport/common/tx.c +++ b/src/transport/common/tx.c @@ -92,24 +92,45 @@ int8_t _z_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t *t_m // Create and prepare the buffer to serialize the message on uint16_t mtu = (zl->_mtu < Z_BATCH_UNICAST_SIZE) ? zl->_mtu : Z_BATCH_UNICAST_SIZE; _z_wbuf_t wbf = _z_wbuf_make(mtu, false); - if (_Z_LINK_IS_STREAMED(zl->_capabilities) == true) { - for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { - _z_wbuf_put(&wbf, 0, i); - } - _z_wbuf_set_wpos(&wbf, _Z_MSG_LEN_ENC_SIZE); - } + switch (zl->_capabilities) { + // Stream capable links + case Z_LINK_CAP_UNICAST_STREAM: + case Z_LINK_CAP_MULTICAST_STREAM: + for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { + _z_wbuf_put(&wbf, 0, i); + } + _z_wbuf_set_wpos(&wbf, _Z_MSG_LEN_ENC_SIZE); + break; + // Datagram capable links + case Z_LINK_CAP_UNICAST_DATAGRAM: + case Z_LINK_CAP_MULTICAST_DATAGRAM: + break; + default: + ret = _Z_ERR_GENERIC; + break; + } // Encode the session message ret = _z_transport_message_encode(&wbf, t_msg); if (ret == _Z_RES_OK) { - // Write the message length in the reserved space if needed - if (_Z_LINK_IS_STREAMED(zl->_capabilities) == true) { - size_t len = _z_wbuf_len(&wbf) - _Z_MSG_LEN_ENC_SIZE; - for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { - _z_wbuf_put(&wbf, (uint8_t)((len >> (uint8_t)8 * i) & (uint8_t)0xFF), i); - } + switch (zl->_capabilities) { + // Stream capable links + case Z_LINK_CAP_UNICAST_STREAM: + case Z_LINK_CAP_MULTICAST_STREAM: + // Write the message length in the reserved space if needed + size_t len = _z_wbuf_len(&wbf) - _Z_MSG_LEN_ENC_SIZE; + for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { + _z_wbuf_put(&wbf, (uint8_t)((len >> (uint8_t)8 * i) & (uint8_t)0xFF), i); + } + break; + // Datagram capable links + case Z_LINK_CAP_UNICAST_DATAGRAM: + case Z_LINK_CAP_MULTICAST_DATAGRAM: + break; + default: + ret = _Z_ERR_GENERIC; + break; } - // Send the wbuf on the socket ret = _z_link_send_wbuf(zl, &wbf); } diff --git a/src/transport/manager.c b/src/transport/manager.c index 06662e513..bc85a8dbc 100644 --- a/src/transport/manager.c +++ b/src/transport/manager.c @@ -31,22 +31,34 @@ int8_t _z_new_transport_client(_z_transport_t *zt, char *locator, _z_id_t *local return ret; } // Open transport - if (_Z_LINK_IS_MULTICAST(zl._capabilities)) { - _z_transport_multicast_establish_param_t tp_param; - ret = _z_multicast_open_client(&tp_param, &zl, local_zid); - if (ret != _Z_RES_OK) { - _z_link_clear(&zl); - return ret; + switch (zl._capabilities) { + // Unicast transport + case Z_LINK_CAP_UNICAST_STREAM: + case Z_LINK_CAP_UNICAST_DATAGRAM: { + _z_transport_unicast_establish_param_t tp_param; + ret = _z_unicast_open_client(&tp_param, &zl, local_zid); + if (ret != _Z_RES_OK) { + _z_link_clear(&zl); + return ret; + } + ret = _z_unicast_transport_create(zt, &zl, &tp_param); + break; } - ret = _z_multicast_transport_create(zt, &zl, &tp_param); - } else { - _z_transport_unicast_establish_param_t tp_param; - ret = _z_unicast_open_client(&tp_param, &zl, local_zid); - if (ret != _Z_RES_OK) { - _z_link_clear(&zl); - return ret; + // Multicast transport + case Z_LINK_CAP_MULTICAST_STREAM: + case Z_LINK_CAP_MULTICAST_DATAGRAM: { + _z_transport_multicast_establish_param_t tp_param; + ret = _z_multicast_open_client(&tp_param, &zl, local_zid); + if (ret != _Z_RES_OK) { + _z_link_clear(&zl); + return ret; + } + ret = _z_multicast_transport_create(zt, &zl, &tp_param); + break; } - ret = _z_unicast_transport_create(zt, &zl, &tp_param); + default: + ret = _Z_ERR_GENERIC; + break; } return ret; } @@ -61,22 +73,34 @@ int8_t _z_new_transport_peer(_z_transport_t *zt, char *locator, _z_id_t *local_z if (ret != _Z_RES_OK) { return ret; } - if (_Z_LINK_IS_MULTICAST(zl._capabilities)) { - _z_transport_multicast_establish_param_t tp_param; - ret = _z_multicast_open_peer(&tp_param, &zl, local_zid); - if (ret != _Z_RES_OK) { - _z_link_clear(&zl); - return ret; + switch (zl._capabilities) { + // Unicast capable links + case Z_LINK_CAP_UNICAST_STREAM: + case Z_LINK_CAP_UNICAST_DATAGRAM: { + _z_transport_unicast_establish_param_t tp_param; + ret = _z_unicast_open_peer(&tp_param, &zl, local_zid); + if (ret != _Z_RES_OK) { + _z_link_clear(&zl); + return ret; + } + ret = _z_unicast_transport_create(zt, &zl, &tp_param); + break; } - ret = _z_multicast_transport_create(zt, &zl, &tp_param); - } else { - _z_transport_unicast_establish_param_t tp_param; - ret = _z_unicast_open_peer(&tp_param, &zl, local_zid); - if (ret != _Z_RES_OK) { - _z_link_clear(&zl); - return ret; + // Multicast capable links + case Z_LINK_CAP_MULTICAST_STREAM: + case Z_LINK_CAP_MULTICAST_DATAGRAM: { + _z_transport_multicast_establish_param_t tp_param; + ret = _z_multicast_open_peer(&tp_param, &zl, local_zid); + if (ret != _Z_RES_OK) { + _z_link_clear(&zl); + return ret; + } + ret = _z_multicast_transport_create(zt, &zl, &tp_param); + break; } - ret = _z_unicast_transport_create(zt, &zl, &tp_param); + default: + ret = _Z_ERR_GENERIC; + break; } return ret; } From 0b247b0a2cee22388e7ad6345e848425d28ee6b2 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Tue, 14 Nov 2023 13:22:48 +0100 Subject: [PATCH 15/24] feat: update capabilities in link --- src/link/link.c | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/link/link.c b/src/link/link.c index 1a5f89925..b240d606e 100644 --- a/src/link/link.c +++ b/src/link/link.c @@ -148,7 +148,19 @@ size_t _z_link_recv_exact_zbuf(const _z_link_t *link, _z_zbuf_t *zbf, size_t len int8_t _z_link_send_wbuf(const _z_link_t *link, const _z_wbuf_t *wbf) { int8_t ret = _Z_RES_OK; - _Bool link_is_streamed = _Z_LINK_IS_STREAMED(link->_capabilities); + _Bool link_is_streamed = false; + + switch (link->_capabilities) { + case Z_LINK_CAP_UNICAST_STREAM: + case Z_LINK_CAP_MULTICAST_STREAM: + link_is_streamed = true; + break; + case Z_LINK_CAP_UNICAST_DATAGRAM: + case Z_LINK_CAP_MULTICAST_DATAGRAM: + default: + link_is_streamed = false; + break; + } for (size_t i = 0; (i < _z_wbuf_len_iosli(wbf)) && (ret == _Z_RES_OK); i++) { _z_bytes_t bs = _z_iosli_to_bytes(_z_wbuf_get_iosli(wbf, i)); size_t n = bs.len; From 18645d9969f07b384a9cae7423a2a937175216b7 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Tue, 14 Nov 2023 13:24:15 +0100 Subject: [PATCH 16/24] doc: add missing capabilities enum value --- include/zenoh-pico/link/link.h | 1 + 1 file changed, 1 insertion(+) diff --git a/include/zenoh-pico/link/link.h b/include/zenoh-pico/link/link.h index 8caef96e2..bcf4e0348 100644 --- a/include/zenoh-pico/link/link.h +++ b/include/zenoh-pico/link/link.h @@ -48,6 +48,7 @@ * Enumerators: * Z_LINK_CAP_UNICAST_STREAM: Link has unicast stream capabilities. * Z_LINK_CAP_UNICAST_DATAGRAM: Link has unicast datagram capabilities. + * Z_LINK_CAP_MULTICAST_STREAM: Link has multicast stream capabilities. * Z_LINK_CAP_MULTICAST_DATAGRAM: Link has multicast datagram capabilities. */ typedef enum { From cf76ee368d18ce372ed4c4acee49195649f49039 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Tue, 14 Nov 2023 13:34:56 +0100 Subject: [PATCH 17/24] fix: add needed blocks to switch --- src/transport/common/tx.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/transport/common/tx.c b/src/transport/common/tx.c index 81f2b1b3f..ca17089c3 100644 --- a/src/transport/common/tx.c +++ b/src/transport/common/tx.c @@ -56,12 +56,13 @@ void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, uint8_t link_capabilities) { switch (link_capabilities) { // Stream capable links case Z_LINK_CAP_UNICAST_STREAM: - case Z_LINK_CAP_MULTICAST_STREAM: + case Z_LINK_CAP_MULTICAST_STREAM: { size_t len = _z_wbuf_len(buf) - _Z_MSG_LEN_ENC_SIZE; for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { _z_wbuf_put(buf, (uint8_t)((len >> (uint8_t)8 * i) & (uint8_t)0xFF), i); } break; + } // Datagram capable links case Z_LINK_CAP_UNICAST_DATAGRAM: case Z_LINK_CAP_MULTICAST_DATAGRAM: @@ -116,13 +117,14 @@ int8_t _z_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t *t_m switch (zl->_capabilities) { // Stream capable links case Z_LINK_CAP_UNICAST_STREAM: - case Z_LINK_CAP_MULTICAST_STREAM: + case Z_LINK_CAP_MULTICAST_STREAM: { // Write the message length in the reserved space if needed size_t len = _z_wbuf_len(&wbf) - _Z_MSG_LEN_ENC_SIZE; for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { _z_wbuf_put(&wbf, (uint8_t)((len >> (uint8_t)8 * i) & (uint8_t)0xFF), i); } break; + } // Datagram capable links case Z_LINK_CAP_UNICAST_DATAGRAM: case Z_LINK_CAP_MULTICAST_DATAGRAM: From b68033267bc097a937b21ea4eac09f14bd6b90ba Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Tue, 14 Nov 2023 13:43:15 +0100 Subject: [PATCH 18/24] fix: typo --- src/link/unicast/ws.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/link/unicast/ws.c b/src/link/unicast/ws.c index 1f5b92d5c..f5c91ce1b 100644 --- a/src/link/unicast/ws.c +++ b/src/link/unicast/ws.c @@ -158,7 +158,7 @@ int8_t _z_new_link_ws(_z_link_t *zl, _z_endpoint_t *endpoint) { int8_t ret = _Z_RES_OK; zl->_capabilities = Z_LINK_CAP_UNICAST_DATAGRAM; - zl->is_reliable = true; + zl->_is_reliable = true; zl->_mtu = _z_get_link_mtu_ws(); zl->_endpoint = *endpoint; From d73b3d1dbc5ccd4d169a6a485af96df273c69a97 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Thu, 16 Nov 2023 14:39:19 +0100 Subject: [PATCH 19/24] fix: define all enum values --- include/zenoh-pico/link/link.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/include/zenoh-pico/link/link.h b/include/zenoh-pico/link/link.h index bcf4e0348..b0384e06d 100644 --- a/include/zenoh-pico/link/link.h +++ b/include/zenoh-pico/link/link.h @@ -53,9 +53,9 @@ */ typedef enum { Z_LINK_CAP_UNICAST_STREAM = 0, - Z_LINK_CAP_UNICAST_DATAGRAM, - Z_LINK_CAP_MULTICAST_STREAM, - Z_LINK_CAP_MULTICAST_DATAGRAM, + Z_LINK_CAP_UNICAST_DATAGRAM = 1, + Z_LINK_CAP_MULTICAST_STREAM = 2, + Z_LINK_CAP_MULTICAST_DATAGRAM = 3, } _z_link_capabilities_t; struct _z_link_t; // Forward declaration to be used in _z_f_link_* From c525a188f59fd8b84aff2dfd6f177985ec7f68e8 Mon Sep 17 00:00:00 2001 From: Jean-Roland Gosse Date: Fri, 17 Nov 2023 15:07:04 +0100 Subject: [PATCH 20/24] feat: switch to register-like structure --- include/zenoh-pico/link/link.h | 45 +++++++++++++++++++++++++--------- 1 file changed, 34 insertions(+), 11 deletions(-) diff --git a/include/zenoh-pico/link/link.h b/include/zenoh-pico/link/link.h index b0384e06d..8d24d50f0 100644 --- a/include/zenoh-pico/link/link.h +++ b/include/zenoh-pico/link/link.h @@ -43,19 +43,43 @@ #include "zenoh-pico/utils/result.h" /** - * Link capabilities values, defined as a bitmask. + * Link transport capability enum. * * Enumerators: - * Z_LINK_CAP_UNICAST_STREAM: Link has unicast stream capabilities. - * Z_LINK_CAP_UNICAST_DATAGRAM: Link has unicast datagram capabilities. - * Z_LINK_CAP_MULTICAST_STREAM: Link has multicast stream capabilities. - * Z_LINK_CAP_MULTICAST_DATAGRAM: Link has multicast datagram capabilities. + * Z_LINK_CAP_TRANSPORT_UNICAST: Link has unicast capabilities. + * Z_LINK_CAP_TRANSPORT_MULTICAST: Link has multicast capabilities. */ typedef enum { - Z_LINK_CAP_UNICAST_STREAM = 0, - Z_LINK_CAP_UNICAST_DATAGRAM = 1, - Z_LINK_CAP_MULTICAST_STREAM = 2, - Z_LINK_CAP_MULTICAST_DATAGRAM = 3, + Z_LINK_CAP_TRANSPORT_UNICAST = 0, + Z_LINK_CAP_TRANSPORT_MULTICAST = 1, +} _z_link_cap_transport_t; + +/** + * Link flow capability enum. + * + * Enumerators: + * Z_LINK_CAP_FLOW_STREAM: Link use datagrams. + * Z_LINK_CAP_FLOW_DATAGRAM: Link use byte stream. + */ +typedef enum { + Z_LINK_CAP_FLOW_DATAGRAM = 0, + Z_LINK_CAP_FLOW_STREAM = 1, +} _z_link_cap_flow_t; + +/** + * Link capabilities, stored as a register-like object. + * + * Fields: + * transport: 2 bits, see _z_link_cap_transport_t enum. + * flow: 1 bit, see _z_link_cap_flow_t enum. + * reliable: 1 bit, 1 if the link is reliable (network definition) + * reserved: 4 bits, reserved for futur use + */ +typedef struct _z_link_capabilities_t { + _z_link_cap_transport_t _transport: 2; + _z_link_cap_flow_t _flow: 1; + _Bool _is_reliable: 1; + uint8_t _reserved: 4; } _z_link_capabilities_t; struct _z_link_t; // Forward declaration to be used in _z_f_link_* @@ -100,8 +124,7 @@ typedef struct _z_link_t { _z_f_link_free _free_f; uint16_t _mtu; - uint8_t _capabilities; - bool _is_reliable; + _z_link_capabilities_t _cap; } _z_link_t; void _z_link_clear(_z_link_t *zl); From b29b95f2c6d8b39500132c6f9993aed53dbd60d9 Mon Sep 17 00:00:00 2001 From: Jean-Roland Gosse Date: Fri, 17 Nov 2023 15:07:35 +0100 Subject: [PATCH 21/24] feat: update link files --- src/link/link.c | 8 +++----- src/link/multicast/bt.c | 6 ++++-- src/link/multicast/udp.c | 6 ++++-- src/link/unicast/serial.c | 6 ++++-- src/link/unicast/tcp.c | 6 ++++-- src/link/unicast/udp.c | 6 ++++-- src/link/unicast/ws.c | 6 ++++-- 7 files changed, 27 insertions(+), 17 deletions(-) diff --git a/src/link/link.c b/src/link/link.c index b240d606e..2888da55d 100644 --- a/src/link/link.c +++ b/src/link/link.c @@ -150,13 +150,11 @@ int8_t _z_link_send_wbuf(const _z_link_t *link, const _z_wbuf_t *wbf) { int8_t ret = _Z_RES_OK; _Bool link_is_streamed = false; - switch (link->_capabilities) { - case Z_LINK_CAP_UNICAST_STREAM: - case Z_LINK_CAP_MULTICAST_STREAM: + switch (link->_cap._flow) { + case Z_LINK_CAP_FLOW_STREAM: link_is_streamed = true; break; - case Z_LINK_CAP_UNICAST_DATAGRAM: - case Z_LINK_CAP_MULTICAST_DATAGRAM: + case Z_LINK_CAP_FLOW_DATAGRAM: default: link_is_streamed = false; break; diff --git a/src/link/multicast/bt.c b/src/link/multicast/bt.c index 3ece71c40..f5483d932 100644 --- a/src/link/multicast/bt.c +++ b/src/link/multicast/bt.c @@ -115,8 +115,10 @@ uint16_t _z_get_link_mtu_bt(void) { return SPP_MAXIMUM_PAYLOAD; } int8_t _z_new_link_bt(_z_link_t *zl, _z_endpoint_t endpoint) { int8_t ret = _Z_RES_OK; - zl->_capabilities = Z_LINK_CAP_MULTICAST_STREAM; - zl->_is_reliable = false; + zl->_cap._transport = Z_LINK_CAP_TRANSPORT_MULTICAST; + zl->_cap._flow = Z_LINK_CAP_FLOW_STREAM; + zl->_cap._is_reliable = false; + zl->_mtu = _z_get_link_mtu_bt(); zl->_endpoint = endpoint; diff --git a/src/link/multicast/udp.c b/src/link/multicast/udp.c index 6385fa1a6..dd0cd108b 100644 --- a/src/link/multicast/udp.c +++ b/src/link/multicast/udp.c @@ -171,8 +171,10 @@ uint16_t _z_get_link_mtu_udp_multicast(void) { int8_t _z_new_link_udp_multicast(_z_link_t *zl, _z_endpoint_t endpoint) { int8_t ret = _Z_RES_OK; - zl->_capabilities = Z_LINK_CAP_MULTICAST_DATAGRAM; - zl->_is_reliable = false; + zl->_cap._transport = Z_LINK_CAP_TRANSPORT_MULTICAST; + zl->_cap._flow = Z_LINK_CAP_FLOW_DATAGRAM; + zl->_cap._is_reliable = false; + zl->_mtu = _z_get_link_mtu_udp_multicast(); zl->_endpoint = endpoint; diff --git a/src/link/unicast/serial.c b/src/link/unicast/serial.c index cafb2d681..db034f07c 100644 --- a/src/link/unicast/serial.c +++ b/src/link/unicast/serial.c @@ -116,8 +116,10 @@ uint16_t _z_get_link_mtu_serial(void) { return _Z_SERIAL_MTU_SIZE; } int8_t _z_new_link_serial(_z_link_t *zl, _z_endpoint_t endpoint) { int8_t ret = _Z_RES_OK; - zl->_capabilities = Z_LINK_CAP_UNICAST_DATAGRAM; - zl->_is_reliable = false; + zl->_cap._transport = Z_LINK_CAP_TRANSPORT_UNICAST; + zl->_cap._flow = Z_LINK_CAP_FLOW_DATAGRAM; + zl->_cap._is_reliable = false; + zl->_mtu = _z_get_link_mtu_serial(); zl->_endpoint = endpoint; diff --git a/src/link/unicast/tcp.c b/src/link/unicast/tcp.c index a6794df4f..a6cf3de7a 100644 --- a/src/link/unicast/tcp.c +++ b/src/link/unicast/tcp.c @@ -156,8 +156,10 @@ uint16_t _z_get_link_mtu_tcp(void) { int8_t _z_new_link_tcp(_z_link_t *zl, _z_endpoint_t *endpoint) { int8_t ret = _Z_RES_OK; - zl->_capabilities = Z_LINK_CAP_UNICAST_STREAM; - zl->_is_reliable = true; + zl->_cap._transport = Z_LINK_CAP_TRANSPORT_UNICAST; + zl->_cap._flow = Z_LINK_CAP_FLOW_STREAM; + zl->_cap._is_reliable = true; + zl->_mtu = _z_get_link_mtu_tcp(); zl->_endpoint = *endpoint; diff --git a/src/link/unicast/udp.c b/src/link/unicast/udp.c index 83289a6d8..2317c1fd4 100644 --- a/src/link/unicast/udp.c +++ b/src/link/unicast/udp.c @@ -162,8 +162,10 @@ uint16_t _z_get_link_mtu_udp_unicast(void) { int8_t _z_new_link_udp_unicast(_z_link_t *zl, _z_endpoint_t endpoint) { int8_t ret = _Z_RES_OK; - zl->_capabilities = Z_LINK_CAP_UNICAST_DATAGRAM; - zl->_is_reliable = false; + zl->_cap._transport = Z_LINK_CAP_TRANSPORT_UNICAST; + zl->_cap._flow = Z_LINK_CAP_FLOW_DATAGRAM; + zl->_cap._is_reliable = false; + zl->_mtu = _z_get_link_mtu_udp_unicast(); zl->_endpoint = endpoint; diff --git a/src/link/unicast/ws.c b/src/link/unicast/ws.c index f5c91ce1b..f59fde872 100644 --- a/src/link/unicast/ws.c +++ b/src/link/unicast/ws.c @@ -157,8 +157,10 @@ uint16_t _z_get_link_mtu_ws(void) { int8_t _z_new_link_ws(_z_link_t *zl, _z_endpoint_t *endpoint) { int8_t ret = _Z_RES_OK; - zl->_capabilities = Z_LINK_CAP_UNICAST_DATAGRAM; - zl->_is_reliable = true; + zl->_cap._transport = Z_LINK_CAP_TRANSPORT_UNICAST; + zl->_cap._flow = Z_LINK_CAP_FLOW_DATAGRAM; + zl->_cap._is_reliable = true; + zl->_mtu = _z_get_link_mtu_ws(); zl->_endpoint = *endpoint; From 6e7a3c973a71a3b8cb9ef8a25de7e0dc148d498a Mon Sep 17 00:00:00 2001 From: Jean-Roland Gosse Date: Fri, 17 Nov 2023 15:09:09 +0100 Subject: [PATCH 22/24] feat: update transport files --- include/zenoh-pico/transport/common/tx.h | 4 +-- src/transport/common/rx.c | 10 ++---- src/transport/common/tx.c | 40 +++++++++--------------- src/transport/manager.c | 18 ++++------- src/transport/multicast/read.c | 10 ++---- src/transport/multicast/rx.c | 9 ++---- src/transport/multicast/tx.c | 12 +++---- src/transport/unicast/read.c | 10 ++---- src/transport/unicast/rx.c | 8 ++--- src/transport/unicast/transport.c | 8 ++--- src/transport/unicast/tx.c | 12 +++---- 11 files changed, 52 insertions(+), 89 deletions(-) diff --git a/include/zenoh-pico/transport/common/tx.h b/include/zenoh-pico/transport/common/tx.h index 634b38e2e..d11c1c80e 100644 --- a/include/zenoh-pico/transport/common/tx.h +++ b/include/zenoh-pico/transport/common/tx.h @@ -19,8 +19,8 @@ #include "zenoh-pico/net/session.h" #include "zenoh-pico/transport/transport.h" -void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, uint8_t link_capabilities); -void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, uint8_t link_capabilities); +void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, _z_link_cap_flow_t flow); +void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, _z_link_cap_flow_t flow); /*This function is unsafe because it operates in potentially concurrent data.*Make sure that the following mutexes are locked before calling this function : *-ztu->mutex_tx */ int8_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn); diff --git a/src/transport/common/rx.c b/src/transport/common/rx.c index 2476a9b9a..8f25fff0f 100644 --- a/src/transport/common/rx.c +++ b/src/transport/common/rx.c @@ -28,10 +28,8 @@ int8_t _z_link_recv_t_msg(_z_transport_message_t *t_msg, const _z_link_t *zl) { _z_zbuf_t zbf = _z_zbuf_make(Z_BATCH_UNICAST_SIZE); _z_zbuf_reset(&zbf); - switch (zl->_capabilities) { - // Stream capable links - case Z_LINK_CAP_UNICAST_STREAM: - case Z_LINK_CAP_MULTICAST_STREAM: + switch (zl->_cap._flow) { + case Z_LINK_CAP_FLOW_STREAM: // Read the message length if (_z_link_recv_exact_zbuf(zl, &zbf, _Z_MSG_LEN_ENC_SIZE, NULL) == _Z_MSG_LEN_ENC_SIZE) { size_t len = 0; @@ -52,9 +50,7 @@ int8_t _z_link_recv_t_msg(_z_transport_message_t *t_msg, const _z_link_t *zl) { ret = _Z_ERR_TRANSPORT_RX_FAILED; } break; - // Datagram capable links - case Z_LINK_CAP_UNICAST_DATAGRAM: - case Z_LINK_CAP_MULTICAST_DATAGRAM: + case Z_LINK_CAP_FLOW_DATAGRAM: if (_z_link_recv_zbuf(zl, &zbf, NULL) == SIZE_MAX) { ret = _Z_ERR_TRANSPORT_RX_FAILED; } diff --git a/src/transport/common/tx.c b/src/transport/common/tx.c index ca17089c3..12fc69e24 100644 --- a/src/transport/common/tx.c +++ b/src/transport/common/tx.c @@ -27,21 +27,19 @@ * Make sure that the following mutexes are locked before calling this function: * - ztu->mutex_tx */ -void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, uint8_t link_capabilities) { +void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, _z_link_cap_flow_t flow) { _z_wbuf_reset(buf); - switch (link_capabilities) { + switch (flow) { // Stream capable links - case Z_LINK_CAP_UNICAST_STREAM: - case Z_LINK_CAP_MULTICAST_STREAM: + case Z_LINK_CAP_FLOW_STREAM: for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { _z_wbuf_put(buf, 0, i); } _z_wbuf_set_wpos(buf, _Z_MSG_LEN_ENC_SIZE); break; // Datagram capable links - case Z_LINK_CAP_UNICAST_DATAGRAM: - case Z_LINK_CAP_MULTICAST_DATAGRAM: + case Z_LINK_CAP_FLOW_DATAGRAM: default: break; } @@ -52,11 +50,10 @@ void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, uint8_t link_capabilities) { * Make sure that the following mutexes are locked before calling this function: * - ztu->mutex_tx */ -void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, uint8_t link_capabilities) { - switch (link_capabilities) { +void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, _z_link_cap_flow_t flow) { + switch (flow) { // Stream capable links - case Z_LINK_CAP_UNICAST_STREAM: - case Z_LINK_CAP_MULTICAST_STREAM: { + case Z_LINK_CAP_FLOW_STREAM: { size_t len = _z_wbuf_len(buf) - _Z_MSG_LEN_ENC_SIZE; for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { _z_wbuf_put(buf, (uint8_t)((len >> (uint8_t)8 * i) & (uint8_t)0xFF), i); @@ -64,8 +61,7 @@ void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, uint8_t link_capabilities) { break; } // Datagram capable links - case Z_LINK_CAP_UNICAST_DATAGRAM: - case Z_LINK_CAP_MULTICAST_DATAGRAM: + case Z_LINK_CAP_FLOW_DATAGRAM: default: break; } @@ -94,18 +90,14 @@ int8_t _z_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t *t_m uint16_t mtu = (zl->_mtu < Z_BATCH_UNICAST_SIZE) ? zl->_mtu : Z_BATCH_UNICAST_SIZE; _z_wbuf_t wbf = _z_wbuf_make(mtu, false); - switch (zl->_capabilities) { - // Stream capable links - case Z_LINK_CAP_UNICAST_STREAM: - case Z_LINK_CAP_MULTICAST_STREAM: + switch (zl->_cap._flow) { + case Z_LINK_CAP_FLOW_STREAM: for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { _z_wbuf_put(&wbf, 0, i); } _z_wbuf_set_wpos(&wbf, _Z_MSG_LEN_ENC_SIZE); break; - // Datagram capable links - case Z_LINK_CAP_UNICAST_DATAGRAM: - case Z_LINK_CAP_MULTICAST_DATAGRAM: + case Z_LINK_CAP_FLOW_DATAGRAM: break; default: ret = _Z_ERR_GENERIC; @@ -114,10 +106,8 @@ int8_t _z_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t *t_m // Encode the session message ret = _z_transport_message_encode(&wbf, t_msg); if (ret == _Z_RES_OK) { - switch (zl->_capabilities) { - // Stream capable links - case Z_LINK_CAP_UNICAST_STREAM: - case Z_LINK_CAP_MULTICAST_STREAM: { + switch (zl->_cap._flow) { + case Z_LINK_CAP_FLOW_STREAM: { // Write the message length in the reserved space if needed size_t len = _z_wbuf_len(&wbf) - _Z_MSG_LEN_ENC_SIZE; for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { @@ -125,9 +115,7 @@ int8_t _z_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t *t_m } break; } - // Datagram capable links - case Z_LINK_CAP_UNICAST_DATAGRAM: - case Z_LINK_CAP_MULTICAST_DATAGRAM: + case Z_LINK_CAP_FLOW_DATAGRAM: break; default: ret = _Z_ERR_GENERIC; diff --git a/src/transport/manager.c b/src/transport/manager.c index bc85a8dbc..7d8f79811 100644 --- a/src/transport/manager.c +++ b/src/transport/manager.c @@ -31,10 +31,9 @@ int8_t _z_new_transport_client(_z_transport_t *zt, char *locator, _z_id_t *local return ret; } // Open transport - switch (zl._capabilities) { + switch (zl._cap._transport) { // Unicast transport - case Z_LINK_CAP_UNICAST_STREAM: - case Z_LINK_CAP_UNICAST_DATAGRAM: { + case Z_LINK_CAP_TRANSPORT_UNICAST: { _z_transport_unicast_establish_param_t tp_param; ret = _z_unicast_open_client(&tp_param, &zl, local_zid); if (ret != _Z_RES_OK) { @@ -45,8 +44,7 @@ int8_t _z_new_transport_client(_z_transport_t *zt, char *locator, _z_id_t *local break; } // Multicast transport - case Z_LINK_CAP_MULTICAST_STREAM: - case Z_LINK_CAP_MULTICAST_DATAGRAM: { + case Z_LINK_CAP_TRANSPORT_MULTICAST: { _z_transport_multicast_establish_param_t tp_param; ret = _z_multicast_open_client(&tp_param, &zl, local_zid); if (ret != _Z_RES_OK) { @@ -73,10 +71,8 @@ int8_t _z_new_transport_peer(_z_transport_t *zt, char *locator, _z_id_t *local_z if (ret != _Z_RES_OK) { return ret; } - switch (zl._capabilities) { - // Unicast capable links - case Z_LINK_CAP_UNICAST_STREAM: - case Z_LINK_CAP_UNICAST_DATAGRAM: { + switch (zl._cap._transport) { + case Z_LINK_CAP_TRANSPORT_UNICAST: { _z_transport_unicast_establish_param_t tp_param; ret = _z_unicast_open_peer(&tp_param, &zl, local_zid); if (ret != _Z_RES_OK) { @@ -86,9 +82,7 @@ int8_t _z_new_transport_peer(_z_transport_t *zt, char *locator, _z_id_t *local_z ret = _z_unicast_transport_create(zt, &zl, &tp_param); break; } - // Multicast capable links - case Z_LINK_CAP_MULTICAST_STREAM: - case Z_LINK_CAP_MULTICAST_DATAGRAM: { + case Z_LINK_CAP_TRANSPORT_MULTICAST: { _z_transport_multicast_establish_param_t tp_param; ret = _z_multicast_open_peer(&tp_param, &zl, local_zid); if (ret != _Z_RES_OK) { diff --git a/src/transport/multicast/read.c b/src/transport/multicast/read.c index ce221fe64..12073d0ce 100644 --- a/src/transport/multicast/read.c +++ b/src/transport/multicast/read.c @@ -73,10 +73,8 @@ void *_zp_multicast_read_task(void *ztm_arg) { // Read bytes from socket to the main buffer size_t to_read = 0; - switch (ztm->_link._capabilities) { - // Stream capable links - case Z_LINK_CAP_UNICAST_STREAM: - case Z_LINK_CAP_MULTICAST_STREAM: + switch (ztm->_link._cap._flow) { + case Z_LINK_CAP_FLOW_STREAM: if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, &addr); if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { @@ -99,9 +97,7 @@ void *_zp_multicast_read_task(void *ztm_arg) { } } break; - // Datagram capable links - case Z_LINK_CAP_UNICAST_DATAGRAM: - case Z_LINK_CAP_MULTICAST_DATAGRAM: + case Z_LINK_CAP_FLOW_DATAGRAM: _z_zbuf_compact(&ztm->_zbuf); to_read = _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, &addr); if (to_read == SIZE_MAX) { diff --git a/src/transport/multicast/rx.c b/src/transport/multicast/rx.c index 0893b4307..5d43eb60c 100644 --- a/src/transport/multicast/rx.c +++ b/src/transport/multicast/rx.c @@ -59,10 +59,8 @@ int8_t _z_multicast_recv_t_msg_na(_z_transport_multicast_t *ztm, _z_transport_me size_t to_read = 0; do { - switch (ztm->_link._capabilities) { - // Stream capable links - case Z_LINK_CAP_UNICAST_STREAM: - case Z_LINK_CAP_MULTICAST_STREAM: + switch (ztm->_link._cap._flow) { + case Z_LINK_CAP_FLOW_STREAM: if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, addr); if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { @@ -85,8 +83,7 @@ int8_t _z_multicast_recv_t_msg_na(_z_transport_multicast_t *ztm, _z_transport_me } break; // Datagram capable links - case Z_LINK_CAP_UNICAST_DATAGRAM: - case Z_LINK_CAP_MULTICAST_DATAGRAM: + case Z_LINK_CAP_FLOW_DATAGRAM: _z_zbuf_compact(&ztm->_zbuf); to_read = _z_link_recv_zbuf(&ztm->_link, &ztm->_zbuf, addr); if (to_read == SIZE_MAX) { diff --git a/src/transport/multicast/tx.c b/src/transport/multicast/tx.c index 94ff3c333..230c8c85a 100644 --- a/src/transport/multicast/tx.c +++ b/src/transport/multicast/tx.c @@ -50,13 +50,13 @@ int8_t _z_multicast_send_t_msg(_z_transport_multicast_t *ztm, const _z_transport #endif // Z_FEATURE_MULTI_THREAD == 1 // Prepare the buffer eventually reserving space for the message length - __unsafe_z_prepare_wbuf(&ztm->_wbuf, ztm->_link._capabilities); + __unsafe_z_prepare_wbuf(&ztm->_wbuf, ztm->_link._cap._flow); // Encode the session message ret = _z_transport_message_encode(&ztm->_wbuf, t_msg); if (ret == _Z_RES_OK) { // Write the message length in the reserved space if needed - __unsafe_z_finalize_wbuf(&ztm->_wbuf, ztm->_link._capabilities); + __unsafe_z_finalize_wbuf(&ztm->_wbuf, ztm->_link._cap._flow); // Send the wbuf on the socket ret = _z_link_send_wbuf(&ztm->_link, &ztm->_wbuf); if (ret == _Z_RES_OK) { @@ -97,7 +97,7 @@ int8_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_m if (drop == false) { // Prepare the buffer eventually reserving space for the message length - __unsafe_z_prepare_wbuf(&ztm->_wbuf, ztm->_link._capabilities); + __unsafe_z_prepare_wbuf(&ztm->_wbuf, ztm->_link._cap._flow); _z_zint_t sn = __unsafe_z_multicast_get_sn(ztm, reliability); // Get the next sequence number @@ -107,7 +107,7 @@ int8_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_m ret = _z_network_message_encode(&ztm->_wbuf, n_msg); // Encode the network message if (ret == _Z_RES_OK) { // Write the message length in the reserved space if needed - __unsafe_z_finalize_wbuf(&ztm->_wbuf, ztm->_link._capabilities); + __unsafe_z_finalize_wbuf(&ztm->_wbuf, ztm->_link._cap._flow); ret = _z_link_send_wbuf(&ztm->_link, &ztm->_wbuf); // Send the wbuf on the socket if (ret == _Z_RES_OK) { @@ -128,13 +128,13 @@ int8_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_m is_first = false; // Clear the buffer for serialization - __unsafe_z_prepare_wbuf(&ztm->_wbuf, ztm->_link._capabilities); + __unsafe_z_prepare_wbuf(&ztm->_wbuf, ztm->_link._cap._flow); // Serialize one fragment ret = __unsafe_z_serialize_zenoh_fragment(&ztm->_wbuf, &fbf, reliability, sn); if (ret == _Z_RES_OK) { // Write the message length in the reserved space if needed - __unsafe_z_finalize_wbuf(&ztm->_wbuf, ztm->_link._capabilities); + __unsafe_z_finalize_wbuf(&ztm->_wbuf, ztm->_link._cap._flow); ret = _z_link_send_wbuf(&ztm->_link, &ztm->_wbuf); // Send the wbuf on the socket if (ret == _Z_RES_OK) { diff --git a/src/transport/unicast/read.c b/src/transport/unicast/read.c index d3fcbbdad..02b95ddef 100644 --- a/src/transport/unicast/read.c +++ b/src/transport/unicast/read.c @@ -68,10 +68,8 @@ void *_zp_unicast_read_task(void *ztu_arg) { while (ztu->_read_task_running == true) { // Read bytes from socket to the main buffer size_t to_read = 0; - switch (ztu->_link._capabilities) { - // Stream capable links - case Z_LINK_CAP_UNICAST_STREAM: - case Z_LINK_CAP_MULTICAST_STREAM: + switch (ztu->_link._cap._flow) { + case Z_LINK_CAP_FLOW_STREAM: if (_z_zbuf_len(&ztu->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); if (_z_zbuf_len(&ztu->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { @@ -93,9 +91,7 @@ void *_zp_unicast_read_task(void *ztu_arg) { } } break; - // Datagram capable links - case Z_LINK_CAP_UNICAST_DATAGRAM: - case Z_LINK_CAP_MULTICAST_DATAGRAM: + case Z_LINK_CAP_FLOW_DATAGRAM: _z_zbuf_compact(&ztu->_zbuf); to_read = _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); if (to_read == SIZE_MAX) { diff --git a/src/transport/unicast/rx.c b/src/transport/unicast/rx.c index c5ee7748e..c1e3faa2d 100644 --- a/src/transport/unicast/rx.c +++ b/src/transport/unicast/rx.c @@ -37,10 +37,9 @@ int8_t _z_unicast_recv_t_msg_na(_z_transport_unicast_t *ztu, _z_transport_messag size_t to_read = 0; do { - switch (ztu->_link._capabilities) { + switch (ztu->_link._cap._flow) { // Stream capable links - case Z_LINK_CAP_UNICAST_STREAM: - case Z_LINK_CAP_MULTICAST_STREAM: + case Z_LINK_CAP_FLOW_STREAM: if (_z_zbuf_len(&ztu->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); if (_z_zbuf_len(&ztu->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { @@ -63,8 +62,7 @@ int8_t _z_unicast_recv_t_msg_na(_z_transport_unicast_t *ztu, _z_transport_messag } break; // Datagram capable links - case Z_LINK_CAP_UNICAST_DATAGRAM: - case Z_LINK_CAP_MULTICAST_DATAGRAM: + case Z_LINK_CAP_FLOW_DATAGRAM: _z_zbuf_compact(&ztu->_zbuf); to_read = _z_link_recv_zbuf(&ztu->_link, &ztu->_zbuf, NULL); if (to_read == SIZE_MAX) { diff --git a/src/transport/unicast/transport.c b/src/transport/unicast/transport.c index 046b41630..15f660bd8 100644 --- a/src/transport/unicast/transport.c +++ b/src/transport/unicast/transport.c @@ -53,13 +53,11 @@ int8_t _z_unicast_transport_create(_z_transport_t *zt, _z_link_t *zl, _z_transpo size_t dbuf_size = 0; _Bool expandable = false; - switch (zl->_capabilities) { - case Z_LINK_CAP_UNICAST_STREAM: - case Z_LINK_CAP_MULTICAST_STREAM: + switch (zl->_cap._flow) { + case Z_LINK_CAP_FLOW_STREAM: expandable = true; break; - case Z_LINK_CAP_UNICAST_DATAGRAM: - case Z_LINK_CAP_MULTICAST_DATAGRAM: + case Z_LINK_CAP_FLOW_DATAGRAM: default: expandable = false; break; diff --git a/src/transport/unicast/tx.c b/src/transport/unicast/tx.c index 93997048a..21694e1e9 100644 --- a/src/transport/unicast/tx.c +++ b/src/transport/unicast/tx.c @@ -53,13 +53,13 @@ int8_t _z_unicast_send_t_msg(_z_transport_unicast_t *ztu, const _z_transport_mes #endif // Z_FEATURE_MULTI_THREAD == 1 // Prepare the buffer eventually reserving space for the message length - __unsafe_z_prepare_wbuf(&ztu->_wbuf, ztu->_link._capabilities); + __unsafe_z_prepare_wbuf(&ztu->_wbuf, ztu->_link._cap._flow); // Encode the session message ret = _z_transport_message_encode(&ztu->_wbuf, t_msg); if (ret == _Z_RES_OK) { // Write the message length in the reserved space if needed - __unsafe_z_finalize_wbuf(&ztu->_wbuf, ztu->_link._capabilities); + __unsafe_z_finalize_wbuf(&ztu->_wbuf, ztu->_link._cap._flow); // Send the wbuf on the socket ret = _z_link_send_wbuf(&ztu->_link, &ztu->_wbuf); if (ret == _Z_RES_OK) { @@ -100,7 +100,7 @@ int8_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg if (drop == false) { // Prepare the buffer eventually reserving space for the message length - __unsafe_z_prepare_wbuf(&ztu->_wbuf, ztu->_link._capabilities); + __unsafe_z_prepare_wbuf(&ztu->_wbuf, ztu->_link._cap._flow); _z_zint_t sn = __unsafe_z_unicast_get_sn(ztu, reliability); // Get the next sequence number @@ -110,7 +110,7 @@ int8_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg ret = _z_network_message_encode(&ztu->_wbuf, n_msg); // Encode the network message if (ret == _Z_RES_OK) { // Write the message length in the reserved space if needed - __unsafe_z_finalize_wbuf(&ztu->_wbuf, ztu->_link._capabilities); + __unsafe_z_finalize_wbuf(&ztu->_wbuf, ztu->_link._cap._flow); if (ztu->_wbuf._ioss._len == 1) { ret = _z_link_send_wbuf(&ztu->_link, &ztu->_wbuf); // Send the wbuf on the socket @@ -137,13 +137,13 @@ int8_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg is_first = false; // Clear the buffer for serialization - __unsafe_z_prepare_wbuf(&ztu->_wbuf, ztu->_link._capabilities); + __unsafe_z_prepare_wbuf(&ztu->_wbuf, ztu->_link._cap._flow); // Serialize one fragment ret = __unsafe_z_serialize_zenoh_fragment(&ztu->_wbuf, &fbf, reliability, sn); if (ret == _Z_RES_OK) { // Write the message length in the reserved space if needed - __unsafe_z_finalize_wbuf(&ztu->_wbuf, ztu->_link._capabilities); + __unsafe_z_finalize_wbuf(&ztu->_wbuf, ztu->_link._cap._flow); ret = _z_link_send_wbuf(&ztu->_link, &ztu->_wbuf); // Send the wbuf on the socket if (ret == _Z_RES_OK) { From 16d5e3f2a60be0cc5ef2e2d62698b4f582d9e5a9 Mon Sep 17 00:00:00 2001 From: Jean-Roland Gosse Date: Fri, 17 Nov 2023 15:19:23 +0100 Subject: [PATCH 23/24] fix: run clang-format --- include/zenoh-pico/link/link.h | 8 ++++---- src/link/multicast/bt.c | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/include/zenoh-pico/link/link.h b/include/zenoh-pico/link/link.h index 8d24d50f0..568d9c68e 100644 --- a/include/zenoh-pico/link/link.h +++ b/include/zenoh-pico/link/link.h @@ -76,10 +76,10 @@ typedef enum { * reserved: 4 bits, reserved for futur use */ typedef struct _z_link_capabilities_t { - _z_link_cap_transport_t _transport: 2; - _z_link_cap_flow_t _flow: 1; - _Bool _is_reliable: 1; - uint8_t _reserved: 4; + _z_link_cap_transport_t _transport : 2; + _z_link_cap_flow_t _flow : 1; + _Bool _is_reliable : 1; + uint8_t _reserved : 4; } _z_link_capabilities_t; struct _z_link_t; // Forward declaration to be used in _z_f_link_* diff --git a/src/link/multicast/bt.c b/src/link/multicast/bt.c index f5483d932..9c54b966c 100644 --- a/src/link/multicast/bt.c +++ b/src/link/multicast/bt.c @@ -118,7 +118,7 @@ int8_t _z_new_link_bt(_z_link_t *zl, _z_endpoint_t endpoint) { zl->_cap._transport = Z_LINK_CAP_TRANSPORT_MULTICAST; zl->_cap._flow = Z_LINK_CAP_FLOW_STREAM; zl->_cap._is_reliable = false; - + zl->_mtu = _z_get_link_mtu_bt(); zl->_endpoint = endpoint; From 52cdb36c0d9f67939adb3be9376fbda414d7e776 Mon Sep 17 00:00:00 2001 From: Jean-Roland Gosse Date: Fri, 17 Nov 2023 15:30:37 +0100 Subject: [PATCH 24/24] fix: use same time in the struct to reduce size --- include/zenoh-pico/link/link.h | 6 +++--- include/zenoh-pico/transport/common/tx.h | 4 ++-- src/transport/common/tx.c | 8 ++++---- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/include/zenoh-pico/link/link.h b/include/zenoh-pico/link/link.h index 568d9c68e..32c9ebb42 100644 --- a/include/zenoh-pico/link/link.h +++ b/include/zenoh-pico/link/link.h @@ -76,9 +76,9 @@ typedef enum { * reserved: 4 bits, reserved for futur use */ typedef struct _z_link_capabilities_t { - _z_link_cap_transport_t _transport : 2; - _z_link_cap_flow_t _flow : 1; - _Bool _is_reliable : 1; + uint8_t _transport : 2; + uint8_t _flow : 1; + uint8_t _is_reliable : 1; uint8_t _reserved : 4; } _z_link_capabilities_t; diff --git a/include/zenoh-pico/transport/common/tx.h b/include/zenoh-pico/transport/common/tx.h index d11c1c80e..d7870c54a 100644 --- a/include/zenoh-pico/transport/common/tx.h +++ b/include/zenoh-pico/transport/common/tx.h @@ -19,8 +19,8 @@ #include "zenoh-pico/net/session.h" #include "zenoh-pico/transport/transport.h" -void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, _z_link_cap_flow_t flow); -void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, _z_link_cap_flow_t flow); +void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability); +void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability); /*This function is unsafe because it operates in potentially concurrent data.*Make sure that the following mutexes are locked before calling this function : *-ztu->mutex_tx */ int8_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn); diff --git a/src/transport/common/tx.c b/src/transport/common/tx.c index 12fc69e24..6f38ac365 100644 --- a/src/transport/common/tx.c +++ b/src/transport/common/tx.c @@ -27,10 +27,10 @@ * Make sure that the following mutexes are locked before calling this function: * - ztu->mutex_tx */ -void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, _z_link_cap_flow_t flow) { +void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability) { _z_wbuf_reset(buf); - switch (flow) { + switch (link_flow_capability) { // Stream capable links case Z_LINK_CAP_FLOW_STREAM: for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { @@ -50,8 +50,8 @@ void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, _z_link_cap_flow_t flow) { * Make sure that the following mutexes are locked before calling this function: * - ztu->mutex_tx */ -void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, _z_link_cap_flow_t flow) { - switch (flow) { +void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability) { + switch (link_flow_capability) { // Stream capable links case Z_LINK_CAP_FLOW_STREAM: { size_t len = _z_wbuf_len(buf) - _Z_MSG_LEN_ENC_SIZE;