diff --git a/.github/workflows/build-check.yaml b/.github/workflows/build-check.yaml index 01ab37196..5ba583b28 100644 --- a/.github/workflows/build-check.yaml +++ b/.github/workflows/build-check.yaml @@ -95,6 +95,31 @@ jobs: env: Z_FEATURE_RAWETH_TRANSPORT: ${{ matrix.feature_reth }} + st_build: + name: Build and test in single thread on ubuntu-latest + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Run docker image + run: docker run --name zenoh_router --init --net host -d eclipse/zenoh:latest + + - name: Build project and run test + run: | + sudo apt install -y ninja-build + CMAKE_GENERATOR=Ninja make + python3 ./build/tests/single_thread.py + timeout-minutes: 5 + env: + Z_FEATURE_MULTI_THREAD: 0 + + - name: Stop docker image + if: always() + run: | + docker stop zenoh_router + docker rm zenoh_router + fragment_test: name: Test multicast and unicast fragmentation runs-on: ubuntu-latest diff --git a/CMakeLists.txt b/CMakeLists.txt index 426f85cc6..2398e80bf 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -112,17 +112,20 @@ if (BATCH_UNICAST_SIZE) endif() # Zenoh pico feature configuration options +set(Z_FEATURE_MULTI_THREAD 1 CACHE STRING "Toggle multithread feature") set(Z_FEATURE_PUBLICATION 1 CACHE STRING "Toggle publication feature") set(Z_FEATURE_SUBSCRIPTION 1 CACHE STRING "Toggle subscription feature") set(Z_FEATURE_QUERY 1 CACHE STRING "Toggle query feature") set(Z_FEATURE_QUERYABLE 1 CACHE STRING "Toggle queryable feature") set(Z_FEATURE_RAWETH_TRANSPORT 0 CACHE STRING "Toggle raw ethernet transport feature") +add_definition(Z_FEATURE_MULTI_THREAD=${Z_FEATURE_MULTI_THREAD}) add_definition(Z_FEATURE_PUBLICATION=${Z_FEATURE_PUBLICATION}) add_definition(Z_FEATURE_SUBSCRIPTION=${Z_FEATURE_SUBSCRIPTION}) add_definition(Z_FEATURE_QUERY=${Z_FEATURE_QUERY}) add_definition(Z_FEATURE_QUERYABLE=${Z_FEATURE_QUERYABLE}) add_definition(Z_FEATURE_RAWETH_TRANSPORT=${Z_FEATURE_RAWETH_TRANSPORT}) message(STATUS "Building with feature confing:\n\ +* MULTI-THREAD: ${Z_FEATURE_MULTI_THREAD}\n\ * PUBLICATION: ${Z_FEATURE_PUBLICATION}\n\ * SUBSCRIPTION: ${Z_FEATURE_SUBSCRIPTION}\n\ * QUERY: ${Z_FEATURE_QUERY}\n\ @@ -328,6 +331,7 @@ if(UNIX OR MSVC) configure_file(${PROJECT_SOURCE_DIR}/tests/modularity.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/modularity.py COPYONLY) configure_file(${PROJECT_SOURCE_DIR}/tests/raweth.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/raweth.py COPYONLY) configure_file(${PROJECT_SOURCE_DIR}/tests/fragment.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/fragment.py COPYONLY) + configure_file(${PROJECT_SOURCE_DIR}/tests/single_thread.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/single_thread.py COPYONLY) enable_testing() add_test(z_data_struct_test ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/z_data_struct_test) diff --git a/GNUmakefile b/GNUmakefile index f6ad15055..e8299b14c 100644 --- a/GNUmakefile +++ b/GNUmakefile @@ -47,6 +47,7 @@ ZENOH_DEBUG?=0 # Feature config toggle # Accepted values: 0, 1 +Z_FEATURE_MULTI_THREAD?=1 Z_FEATURE_PUBLICATION?=1 Z_FEATURE_SUBSCRIPTION?=1 Z_FEATURE_QUERY?=1 @@ -67,6 +68,7 @@ CROSSIMG_PREFIX=zenoh-pico_ # - ARM: old versions of dockcross/dockcross were creating some issues since they used an old GCC (4.8.3) which lacks (even using -std=gnu11) CMAKE_OPT=-DZENOH_DEBUG=$(ZENOH_DEBUG) -DBUILD_EXAMPLES=$(BUILD_EXAMPLES) -DCMAKE_BUILD_TYPE=$(BUILD_TYPE) -DBUILD_TESTING=$(BUILD_TESTING) -DBUILD_MULTICAST=$(BUILD_MULTICAST)\ + -DZ_FEATURE_MULTI_THREAD=$(Z_FEATURE_MULTI_THREAD) \ -DZ_FEATURE_PUBLICATION=$(Z_FEATURE_PUBLICATION) -DZ_FEATURE_SUBSCRIPTION=$(Z_FEATURE_SUBSCRIPTION) -DZ_FEATURE_QUERY=$(Z_FEATURE_QUERY) -DZ_FEATURE_QUERYABLE=$(Z_FEATURE_QUERYABLE)\ -DZ_FEATURE_RAWETH_TRANSPORT=$(Z_FEATURE_RAWETH_TRANSPORT) -DBUILD_INTEGRATION=$(BUILD_INTEGRATION) -DBUILD_TOOLS=$(BUILD_TOOLS) -DBUILD_SHARED_LIBS=$(BUILD_SHARED_LIBS) -H. diff --git a/examples/unix/c11/z_ping.c b/examples/unix/c11/z_ping.c index 6e0369574..cfd57744f 100644 --- a/examples/unix/c11/z_ping.c +++ b/examples/unix/c11/z_ping.c @@ -21,7 +21,7 @@ #include "zenoh-pico.h" #include "zenoh-pico/system/platform.h" -#if Z_FEATURE_SUBSCRIPTION == 1 && Z_FEATURE_PUBLICATION == 1 +#if Z_FEATURE_SUBSCRIPTION == 1 && Z_FEATURE_PUBLICATION == 1 && Z_FEATURE_MULTI_THREAD == 1 #define DEFAULT_PKT_SIZE 8 #define DEFAULT_PING_NB 100 @@ -175,8 +175,8 @@ struct args_t parse_args(int argc, char** argv) { #else int main(void) { printf( - "ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION or Z_FEATURE_PUBLICATION but this example " - "requires them.\n"); + "ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION or Z_FEATURE_PUBLICATION or " + "Z_FEATURE_MULTI_THREAD but this example requires them.\n"); return -2; } #endif diff --git a/examples/unix/c11/z_pub.c b/examples/unix/c11/z_pub.c index 5da6db5d8..2d0ea81ac 100644 --- a/examples/unix/c11/z_pub.c +++ b/examples/unix/c11/z_pub.c @@ -54,7 +54,8 @@ int main(int argc, char **argv) { n = atoi(optarg); break; case '?': - if (optopt == 'k' || optopt == 'v' || optopt == 'e' || optopt == 'm' || optopt == 'l') { + if (optopt == 'k' || optopt == 'v' || optopt == 'e' || optopt == 'm' || optopt == 'l' || + optopt == 'n') { fprintf(stderr, "Option -%c requires an argument.\n", optopt); } else { fprintf(stderr, "Unknown option `-%c'.\n", optopt); @@ -97,7 +98,6 @@ int main(int argc, char **argv) { for (int idx = 0; idx < n; ++idx) { sleep(1); (void)idx; - // snprintf(buf, 256, "[%4d] %s", idx, value); printf("Putting Data ('%s': '%s')...\n", keyexpr, value); z_publisher_put_options_t options = z_publisher_put_options_default(); diff --git a/examples/unix/c11/z_pub_st.c b/examples/unix/c11/z_pub_st.c index b53bc346e..225272ed7 100644 --- a/examples/unix/c11/z_pub_st.c +++ b/examples/unix/c11/z_pub_st.c @@ -22,13 +22,15 @@ #if Z_FEATURE_PUBLICATION == 1 int main(int argc, char **argv) { const char *keyexpr = "demo/example/zenoh-pico-pub"; - const char *value = "Pub from Pico!"; + char *const default_value = "Pub from Pico!"; + const char *value = default_value; const char *mode = "client"; char *clocator = NULL; char *llocator = NULL; + int n = 10; int opt; - while ((opt = getopt(argc, argv, "k:v:e:m:l:")) != -1) { + while ((opt = getopt(argc, argv, "k:v:e:m:l:n:")) != -1) { switch (opt) { case 'k': keyexpr = optarg; @@ -45,8 +47,12 @@ int main(int argc, char **argv) { case 'l': llocator = optarg; break; + case 'n': + n = atoi(optarg); + break; case '?': - if (optopt == 'k' || optopt == 'v' || optopt == 'e' || optopt == 'm' || optopt == 'l') { + if (optopt == 'k' || optopt == 'v' || optopt == 'e' || optopt == 'm' || optopt == 'l' || + optopt == 'n') { fprintf(stderr, "Option -%c requires an argument.\n", optopt); } else { fprintf(stderr, "Unknown option `-%c'.\n", optopt); @@ -79,29 +85,19 @@ int main(int argc, char **argv) { printf("Unable to declare publisher for key expression!\n"); return -1; } - - char *buf = (char *)malloc(256); - z_clock_t now = z_clock_now(); - for (int idx = 0; 1;) { - if (z_clock_elapsed_ms(&now) > 1000) { - snprintf(buf, 256, "[%4d] %s", idx, value); - printf("Putting Data ('%s': '%s')...\n", keyexpr, buf); - z_publisher_put(z_loan(pub), (const uint8_t *)buf, strlen(buf), NULL); - ++idx; - - now = z_clock_now(); - } + // Main loop + for (int idx = 0; idx < n; idx++) { + sleep(1); + (void)idx; + printf("Putting Data ('%s': '%s')...\n", keyexpr, value); + z_publisher_put(z_loan(pub), (const uint8_t *)value, strlen(value), NULL); zp_read(z_loan(s), NULL); zp_send_keep_alive(z_loan(s), NULL); zp_send_join(z_loan(s), NULL); } - z_undeclare_publisher(z_move(pub)); - z_close(z_move(s)); - - free(buf); return 0; } #else diff --git a/examples/unix/c11/z_sub_st.c b/examples/unix/c11/z_sub_st.c index 1afa9bed6..3bc53e4f5 100644 --- a/examples/unix/c11/z_sub_st.c +++ b/examples/unix/c11/z_sub_st.c @@ -20,12 +20,16 @@ #include #if Z_FEATURE_SUBSCRIPTION == 1 + +static int msg_nb = 0; + void data_handler(const z_sample_t *sample, void *ctx) { (void)(ctx); z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len, sample->payload.start); z_drop(z_move(keystr)); + msg_nb++; } int main(int argc, char **argv) { @@ -33,9 +37,10 @@ int main(int argc, char **argv) { const char *mode = "client"; char *clocator = NULL; char *llocator = NULL; + int n = -1; int opt; - while ((opt = getopt(argc, argv, "k:e:m:l:")) != -1) { + while ((opt = getopt(argc, argv, "k:e:m:l:n:")) != -1) { switch (opt) { case 'k': keyexpr = optarg; @@ -49,8 +54,11 @@ int main(int argc, char **argv) { case 'l': llocator = optarg; break; + case 'n': + n = atoi(optarg); + break; case '?': - if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'l') { + if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'l' || optopt == 'n') { fprintf(stderr, "Option -%c requires an argument.\n", optopt); } else { fprintf(stderr, "Unknown option `-%c'.\n", optopt); @@ -85,16 +93,13 @@ int main(int argc, char **argv) { return -1; } - while (1) { + while (msg_nb != n) { zp_read(z_loan(s), NULL); zp_send_keep_alive(z_loan(s), NULL); zp_send_join(z_loan(s), NULL); } - z_undeclare_subscriber(z_move(sub)); - z_close(z_move(s)); - return 0; } #else diff --git a/include/zenoh-pico/system/platform/emscripten.h b/include/zenoh-pico/system/platform/emscripten.h index fcb732aab..7ceeb5f3e 100644 --- a/include/zenoh-pico/system/platform/emscripten.h +++ b/include/zenoh-pico/system/platform/emscripten.h @@ -15,12 +15,13 @@ #ifndef ZENOH_PICO_SYSTEM_WASM_TYPES_H #define ZENOH_PICO_SYSTEM_WASM_TYPES_H -#include #include #include "zenoh-pico/config.h" #if Z_FEATURE_MULTI_THREAD == 1 +#include + typedef pthread_t _z_task_t; typedef pthread_attr_t _z_task_attr_t; typedef pthread_mutex_t _z_mutex_t; diff --git a/include/zenoh-pico/system/platform/espidf.h b/include/zenoh-pico/system/platform/espidf.h index 2ba7773d7..1a19ccdc6 100644 --- a/include/zenoh-pico/system/platform/espidf.h +++ b/include/zenoh-pico/system/platform/espidf.h @@ -18,11 +18,12 @@ #include #include #include -#include #include "zenoh-pico/config.h" #if Z_FEATURE_MULTI_THREAD == 1 +#include + typedef TaskHandle_t _z_task_t; typedef void *_z_task_attr_t; // Not used in ESP32 typedef pthread_mutex_t _z_mutex_t; diff --git a/include/zenoh-pico/system/platform/unix.h b/include/zenoh-pico/system/platform/unix.h index 577ef670e..615b02849 100644 --- a/include/zenoh-pico/system/platform/unix.h +++ b/include/zenoh-pico/system/platform/unix.h @@ -15,7 +15,6 @@ #ifndef ZENOH_PICO_SYSTEM_UNIX_TYPES_H #define ZENOH_PICO_SYSTEM_UNIX_TYPES_H -#include #include #include #include @@ -24,6 +23,8 @@ #include "zenoh-pico/config.h" #if Z_FEATURE_MULTI_THREAD == 1 +#include + typedef pthread_t _z_task_t; typedef pthread_attr_t _z_task_attr_t; typedef pthread_mutex_t _z_mutex_t; diff --git a/include/zenoh-pico/transport/multicast/lease.h b/include/zenoh-pico/transport/multicast/lease.h index a7896a3f5..c0a545f39 100644 --- a/include/zenoh-pico/transport/multicast/lease.h +++ b/include/zenoh-pico/transport/multicast/lease.h @@ -19,8 +19,13 @@ int8_t _zp_multicast_send_join(_z_transport_multicast_t *ztm); int8_t _zp_multicast_send_keep_alive(_z_transport_multicast_t *ztm); -int8_t _zp_multicast_start_lease_task(_z_transport_multicast_t *ztm, _z_task_attr_t *attr, _z_task_t *task); int8_t _zp_multicast_stop_lease_task(_z_transport_multicast_t *ztm); void *_zp_multicast_lease_task(void *ztm_arg); // The argument is void* to avoid incompatible pointer types in tasks +#if Z_FEATURE_MULTI_THREAD == 1 && (Z_FEATURE_MULTICAST_TRANSPORT == 1 || Z_FEATURE_RAWETH_TRANSPORT == 1) +int8_t _zp_multicast_start_lease_task(_z_transport_multicast_t *ztm, _z_task_attr_t *attr, _z_task_t *task); +#else +int8_t _zp_multicast_start_lease_task(_z_transport_multicast_t *ztm, void *attr, void *task); +#endif /* Z_FEATURE_MULTI_THREAD == 1 && (Z_FEATURE_MULTICAST_TRANSPORT == 1 || Z_FEATURE_RAWETH_TRANSPORT == 1) */ + #endif /* ZENOH_PICO_MULTICAST_LEASE_H */ diff --git a/include/zenoh-pico/transport/multicast/read.h b/include/zenoh-pico/transport/multicast/read.h index 308fe1249..1ac947b3a 100644 --- a/include/zenoh-pico/transport/multicast/read.h +++ b/include/zenoh-pico/transport/multicast/read.h @@ -18,8 +18,13 @@ #include "zenoh-pico/transport/transport.h" int8_t _zp_multicast_read(_z_transport_multicast_t *ztm); -int8_t _zp_multicast_start_read_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task); int8_t _zp_multicast_stop_read_task(_z_transport_t *zt); void *_zp_multicast_read_task(void *ztm_arg); // The argument is void* to avoid incompatible pointer types in tasks +#if Z_FEATURE_MULTI_THREAD == 1 && Z_FEATURE_MULTICAST_TRANSPORT == 1 +int8_t _zp_multicast_start_read_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task); +#else +int8_t _zp_multicast_start_read_task(_z_transport_t *zt, void *attr, void *task); +#endif /* #if Z_FEATURE_MULTI_THREAD == 1 && Z_FEATURE_MULTICAST_TRANSPORT == 1 */ + #endif /* ZENOH_PICO_MULTICAST_READ_H */ diff --git a/include/zenoh-pico/transport/raweth/read.h b/include/zenoh-pico/transport/raweth/read.h index fa881b4ba..d35d32fce 100644 --- a/include/zenoh-pico/transport/raweth/read.h +++ b/include/zenoh-pico/transport/raweth/read.h @@ -18,8 +18,13 @@ #include "zenoh-pico/transport/transport.h" int8_t _zp_raweth_read(_z_transport_multicast_t *ztm); -int8_t _zp_raweth_start_read_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task); int8_t _zp_raweth_stop_read_task(_z_transport_t *zt); void *_zp_raweth_read_task(void *ztm_arg); // The argument is void* to avoid incompatible pointer types in tasks +#if Z_FEATURE_MULTI_THREAD == 1 && Z_FEATURE_RAWETH_TRANSPORT == 1 +int8_t _zp_raweth_start_read_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task); +#else +int8_t _zp_raweth_start_read_task(_z_transport_t *zt, void *attr, void *task); +#endif /* Z_FEATURE_MULTI_THREAD == 1 && Z_FEATURE_RAWETH_TRANSPORT == 1 */ + #endif /* ZENOH_PICO_RAWETH_READ_H */ diff --git a/include/zenoh-pico/transport/unicast/lease.h b/include/zenoh-pico/transport/unicast/lease.h index fbe97e5f7..bd5c4bd3d 100644 --- a/include/zenoh-pico/transport/unicast/lease.h +++ b/include/zenoh-pico/transport/unicast/lease.h @@ -18,8 +18,13 @@ #include "zenoh-pico/transport/transport.h" int8_t _zp_unicast_send_keep_alive(_z_transport_unicast_t *ztu); -int8_t _zp_unicast_start_lease_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task); int8_t _zp_unicast_stop_lease_task(_z_transport_t *zt); void *_zp_unicast_lease_task(void *ztu_arg); // The argument is void* to avoid incompatible pointer types in tasks +#if Z_FEATURE_MULTI_THREAD == 1 && Z_FEATURE_UNICAST_TRANSPORT == 1 +int8_t _zp_unicast_start_lease_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task); +#else +int8_t _zp_unicast_start_lease_task(_z_transport_t *zt, void *attr, void *task); +#endif /* Z_FEATURE_MULTI_THREAD == 1 && Z_FEATURE_UNICAST_TRANSPORT == 1 */ + #endif /* ZENOH_PICO_TRANSPORT_LINK_TASK_LEASE_H */ diff --git a/include/zenoh-pico/transport/unicast/read.h b/include/zenoh-pico/transport/unicast/read.h index e5d4db852..1f40b8f20 100644 --- a/include/zenoh-pico/transport/unicast/read.h +++ b/include/zenoh-pico/transport/unicast/read.h @@ -18,8 +18,13 @@ #include "zenoh-pico/transport/transport.h" int8_t _zp_unicast_read(_z_transport_unicast_t *ztu); -int8_t _zp_unicast_start_read_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task); int8_t _zp_unicast_stop_read_task(_z_transport_t *zt); void *_zp_unicast_read_task(void *ztu_arg); // The argument is void* to avoid incompatible pointer types in tasks +#if Z_FEATURE_MULTI_THREAD == 1 && Z_FEATURE_UNICAST_TRANSPORT == 1 +int8_t _zp_unicast_start_read_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task); +#else +int8_t _zp_unicast_start_read_task(_z_transport_t *zt, void *attr, void *task); +#endif /* Z_FEATURE_MULTI_THREAD == 1 && Z_FEATURE_UNICAST_TRANSPORT == 1 */ + #endif /* ZENOH_PICO_UNICAST_READ_H */ diff --git a/src/system/unix/system.c b/src/system/unix/system.c index 990619dac..28223b83f 100644 --- a/src/system/unix/system.c +++ b/src/system/unix/system.c @@ -15,6 +15,7 @@ #include #include #include +#include #if defined(ZENOH_LINUX) #include diff --git a/src/transport/multicast/lease.c b/src/transport/multicast/lease.c index 4a56a9c39..8ab532bee 100644 --- a/src/transport/multicast/lease.c +++ b/src/transport/multicast/lease.c @@ -74,27 +74,21 @@ int8_t _zp_multicast_send_keep_alive(_z_transport_multicast_t *ztm) { return ztm->_send_f(ztm, &t_msg); } -int8_t _zp_multicast_start_lease_task(_z_transport_multicast_t *ztm, _z_task_attr_t *attr, _z_task_t *task) { - // Init memory - (void)memset(task, 0, sizeof(_z_task_t)); - // Attach task - ztm->_lease_task = task; - ztm->_lease_task_running = true; - // Init task - if (_z_task_init(task, attr, _zp_multicast_lease_task, ztm) != _Z_RES_OK) { - ztm->_lease_task_running = false; - return _Z_ERR_SYSTEM_TASK_FAILED; - } - return _Z_RES_OK; +#else +int8_t _zp_multicast_send_join(_z_transport_multicast_t *ztm) { + _ZP_UNUSED(ztm); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; } -int8_t _zp_multicast_stop_lease_task(_z_transport_multicast_t *ztm) { - ztm->_lease_task_running = false; - return _Z_RES_OK; +int8_t _zp_multicast_send_keep_alive(_z_transport_multicast_t *ztm) { + _ZP_UNUSED(ztm); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; } +#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 || Z_FEATURE_RAWETH_TRANSPORT == 1 + +#if Z_FEATURE_MULTI_THREAD == 1 && (Z_FEATURE_MULTICAST_TRANSPORT == 1 || Z_FEATURE_RAWETH_TRANSPORT == 1) void *_zp_multicast_lease_task(void *ztm_arg) { -#if Z_FEATURE_MULTI_THREAD == 1 _z_transport_multicast_t *ztm = (_z_transport_multicast_t *)ztm_arg; ztm->_transmitted = false; @@ -184,22 +178,35 @@ void *_zp_multicast_lease_task(void *ztm_arg) { _z_mutex_unlock(&ztm->_mutex_peer); } -#endif // Z_FEATURE_MULTI_THREAD == 1 - return 0; } -#else -int8_t _zp_multicast_send_join(_z_transport_multicast_t *ztm) { - _ZP_UNUSED(ztm); - return _Z_ERR_TRANSPORT_NOT_AVAILABLE; + +int8_t _zp_multicast_start_lease_task(_z_transport_multicast_t *ztm, _z_task_attr_t *attr, _z_task_t *task) { + // Init memory + (void)memset(task, 0, sizeof(_z_task_t)); + // Attach task + ztm->_lease_task = task; + ztm->_lease_task_running = true; + // Init task + if (_z_task_init(task, attr, _zp_multicast_lease_task, ztm) != _Z_RES_OK) { + ztm->_lease_task_running = false; + return _Z_ERR_SYSTEM_TASK_FAILED; + } + return _Z_RES_OK; } -int8_t _zp_multicast_send_keep_alive(_z_transport_multicast_t *ztm) { - _ZP_UNUSED(ztm); - return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +int8_t _zp_multicast_stop_lease_task(_z_transport_multicast_t *ztm) { + ztm->_lease_task_running = false; + return _Z_RES_OK; } +#else -int8_t _zp_multicast_start_lease_task(_z_transport_multicast_t *ztm, _z_task_attr_t *attr, _z_task_t *task) { +void *_zp_multicast_lease_task(void *ztm_arg) { + _ZP_UNUSED(ztm_arg); + return NULL; +} + +int8_t _zp_multicast_start_lease_task(_z_transport_multicast_t *ztm, void *attr, void *task) { _ZP_UNUSED(ztm); _ZP_UNUSED(attr); _ZP_UNUSED(task); @@ -210,9 +217,4 @@ int8_t _zp_multicast_stop_lease_task(_z_transport_multicast_t *ztm) { _ZP_UNUSED(ztm); return _Z_ERR_TRANSPORT_NOT_AVAILABLE; } - -void *_zp_multicast_lease_task(void *ztm_arg) { - _ZP_UNUSED(ztm_arg); - return NULL; -} -#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 || Z_FEATURE_RAWETH_TRANSPORT == 1 +#endif // Z_FEATURE_MULTI_THREAD == 1 && (Z_FEATURE_MULTICAST_TRANSPORT == 1 || Z_FEATURE_RAWETH_TRANSPORT == 1) diff --git a/src/transport/multicast/read.c b/src/transport/multicast/read.c index 12073d0ce..8081d1fcc 100644 --- a/src/transport/multicast/read.c +++ b/src/transport/multicast/read.c @@ -38,28 +38,16 @@ int8_t _zp_multicast_read(_z_transport_multicast_t *ztm) { return ret; } - -int8_t _zp_multicast_start_read_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task) { - // Init memory - (void)memset(task, 0, sizeof(_z_task_t)); - // Attach task - zt->_transport._multicast._read_task = task; - zt->_transport._multicast._read_task_running = true; - // Init task - if (_z_task_init(task, attr, _zp_multicast_read_task, &zt->_transport._multicast) != _Z_RES_OK) { - zt->_transport._multicast._read_task_running = false; - return _Z_ERR_SYSTEM_TASK_FAILED; - } - return _Z_RES_OK; +#else +int8_t _zp_multicast_read(_z_transport_multicast_t *ztm) { + _ZP_UNUSED(ztm); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; } +#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 -int8_t _zp_multicast_stop_read_task(_z_transport_t *zt) { - zt->_transport._multicast._read_task_running = false; - return _Z_RES_OK; -} +#if Z_FEATURE_MULTI_THREAD == 1 && Z_FEATURE_MULTICAST_TRANSPORT == 1 void *_zp_multicast_read_task(void *ztm_arg) { -#if Z_FEATURE_MULTI_THREAD == 1 _z_transport_multicast_t *ztm = (_z_transport_multicast_t *)ztm_arg; // Acquire and keep the lock @@ -136,19 +124,36 @@ void *_zp_multicast_read_task(void *ztm_arg) { // Move the read position of the read buffer _z_zbuf_set_rpos(&ztm->_zbuf, _z_zbuf_get_rpos(&ztm->_zbuf) + to_read); } - _z_mutex_unlock(&ztm->_mutex_rx); -#endif // Z_FEATURE_MULTI_THREAD == 1 - return NULL; } + +int8_t _zp_multicast_start_read_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task) { + // Init memory + (void)memset(task, 0, sizeof(_z_task_t)); + // Attach task + zt->_transport._multicast._read_task = task; + zt->_transport._multicast._read_task_running = true; + // Init task + if (_z_task_init(task, attr, _zp_multicast_read_task, &zt->_transport._multicast) != _Z_RES_OK) { + zt->_transport._multicast._read_task_running = false; + return _Z_ERR_SYSTEM_TASK_FAILED; + } + return _Z_RES_OK; +} + +int8_t _zp_multicast_stop_read_task(_z_transport_t *zt) { + zt->_transport._multicast._read_task_running = false; + return _Z_RES_OK; +} #else -int8_t _zp_multicast_read(_z_transport_multicast_t *ztm) { - _ZP_UNUSED(ztm); - return _Z_ERR_TRANSPORT_NOT_AVAILABLE; + +void *_zp_multicast_read_task(void *ztm_arg) { + _ZP_UNUSED(ztm_arg); + return NULL; } -int8_t _zp_multicast_start_read_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task) { +int8_t _zp_multicast_start_read_task(_z_transport_t *zt, void *attr, void *task) { _ZP_UNUSED(zt); _ZP_UNUSED(attr); _ZP_UNUSED(task); @@ -159,9 +164,4 @@ int8_t _zp_multicast_stop_read_task(_z_transport_t *zt) { _ZP_UNUSED(zt); return _Z_ERR_TRANSPORT_NOT_AVAILABLE; } - -void *_zp_multicast_read_task(void *ztm_arg) { - _ZP_UNUSED(ztm_arg); - return NULL; -} -#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 +#endif diff --git a/src/transport/raweth/read.c b/src/transport/raweth/read.c index 668b650f5..49680e6b0 100644 --- a/src/transport/raweth/read.c +++ b/src/transport/raweth/read.c @@ -38,28 +38,17 @@ int8_t _zp_raweth_read(_z_transport_multicast_t *ztm) { } return ret; } +#else -int8_t _zp_raweth_start_read_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task) { - // Init memory - (void)memset(task, 0, sizeof(_z_task_t)); - // Attach task - zt->_transport._raweth._read_task = task; - zt->_transport._raweth._read_task_running = true; - // Init task - if (_z_task_init(task, attr, _zp_raweth_read_task, &zt->_transport._raweth) != _Z_RES_OK) { - zt->_transport._raweth._read_task_running = false; - return _Z_ERR_SYSTEM_TASK_FAILED; - } - return _Z_RES_OK; +int8_t _zp_raweth_read(_z_transport_multicast_t *ztm) { + _ZP_UNUSED(ztm); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; } +#endif // Z_FEATURE_RAWETH_TRANSPORT == 1 -int8_t _zp_raweth_stop_read_task(_z_transport_t *zt) { - zt->_transport._raweth._read_task_running = false; - return _Z_RES_OK; -} +#if Z_FEATURE_MULTI_THREAD == 1 && Z_FEATURE_RAWETH_TRANSPORT == 1 void *_zp_raweth_read_task(void *ztm_arg) { -#if Z_FEATURE_MULTI_THREAD == 1 _z_transport_multicast_t *ztm = (_z_transport_multicast_t *)ztm_arg; _z_transport_message_t t_msg; _z_bytes_t addr = _z_bytes_wrap(NULL, 0); @@ -91,17 +80,34 @@ void *_zp_raweth_read_task(void *ztm_arg) { _z_t_msg_clear(&t_msg); _z_bytes_clear(&addr); } -#endif // Z_FEATURE_MULTI_THREAD == 1 - return NULL; } -#else -int8_t _zp_raweth_read(_z_transport_multicast_t *ztm) { - _ZP_UNUSED(ztm); - return _Z_ERR_TRANSPORT_NOT_AVAILABLE; -} int8_t _zp_raweth_start_read_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task) { + // Init memory + (void)memset(task, 0, sizeof(_z_task_t)); + // Attach task + zt->_transport._raweth._read_task = task; + zt->_transport._raweth._read_task_running = true; + // Init task + if (_z_task_init(task, attr, _zp_raweth_read_task, &zt->_transport._raweth) != _Z_RES_OK) { + zt->_transport._raweth._read_task_running = false; + return _Z_ERR_SYSTEM_TASK_FAILED; + } + return _Z_RES_OK; +} + +int8_t _zp_raweth_stop_read_task(_z_transport_t *zt) { + zt->_transport._raweth._read_task_running = false; + return _Z_RES_OK; +} +#else + +void *_zp_raweth_read_task(void *ztm_arg) { + _ZP_UNUSED(ztm_arg); + return NULL; +} +int8_t _zp_raweth_start_read_task(_z_transport_t *zt, void *attr, void *task) { _ZP_UNUSED(zt); _ZP_UNUSED(attr); _ZP_UNUSED(task); @@ -112,9 +118,4 @@ int8_t _zp_raweth_stop_read_task(_z_transport_t *zt) { _ZP_UNUSED(zt); return _Z_ERR_TRANSPORT_NOT_AVAILABLE; } - -void *_zp_raweth_read_task(void *ztm_arg) { - _ZP_UNUSED(ztm_arg); - return NULL; -} -#endif // Z_FEATURE_RAWETH_TRANSPORT == 1 +#endif diff --git a/src/transport/unicast/lease.c b/src/transport/unicast/lease.c index 8fde5868e..5ef50565b 100644 --- a/src/transport/unicast/lease.c +++ b/src/transport/unicast/lease.c @@ -28,28 +28,17 @@ int8_t _zp_unicast_send_keep_alive(_z_transport_unicast_t *ztu) { return ret; } +#else -int8_t _zp_unicast_start_lease_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task) { - // Init memory - (void)memset(task, 0, sizeof(_z_task_t)); - // Attach task - zt->_transport._unicast._lease_task = task; - zt->_transport._unicast._lease_task_running = true; - // Init task - if (_z_task_init(task, attr, _zp_unicast_lease_task, &zt->_transport._unicast) != _Z_RES_OK) { - zt->_transport._unicast._lease_task_running = false; - return _Z_ERR_SYSTEM_TASK_FAILED; - } - return _Z_RES_OK; +int8_t _zp_unicast_send_keep_alive(_z_transport_unicast_t *ztu) { + _ZP_UNUSED(ztu); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; } +#endif // Z_FEATURE_UNICAST_TRANSPORT == 1 -int8_t _zp_unicast_stop_lease_task(_z_transport_t *zt) { - zt->_transport._unicast._lease_task_running = false; - return _Z_RES_OK; -} +#if Z_FEATURE_MULTI_THREAD == 1 && Z_FEATURE_UNICAST_TRANSPORT == 1 void *_zp_unicast_lease_task(void *ztu_arg) { -#if Z_FEATURE_MULTI_THREAD == 1 _z_transport_unicast_t *ztu = (_z_transport_unicast_t *)ztu_arg; ztu->_received = false; @@ -103,17 +92,35 @@ void *_zp_unicast_lease_task(void *ztu_arg) { next_lease = next_lease - interval; next_keep_alive = next_keep_alive - interval; } -#endif // Z_FEATURE_MULTI_THREAD == 1 - return 0; } + +int8_t _zp_unicast_start_lease_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task) { + // Init memory + (void)memset(task, 0, sizeof(_z_task_t)); + // Attach task + zt->_transport._unicast._lease_task = task; + zt->_transport._unicast._lease_task_running = true; + // Init task + if (_z_task_init(task, attr, _zp_unicast_lease_task, &zt->_transport._unicast) != _Z_RES_OK) { + zt->_transport._unicast._lease_task_running = false; + return _Z_ERR_SYSTEM_TASK_FAILED; + } + return _Z_RES_OK; +} + +int8_t _zp_unicast_stop_lease_task(_z_transport_t *zt) { + zt->_transport._unicast._lease_task_running = false; + return _Z_RES_OK; +} #else -int8_t _zp_unicast_send_keep_alive(_z_transport_unicast_t *ztu) { - _ZP_UNUSED(ztu); - return _Z_ERR_TRANSPORT_NOT_AVAILABLE; + +void *_zp_unicast_lease_task(void *ztu_arg) { + _ZP_UNUSED(ztu_arg); + return NULL; } -int8_t _zp_unicast_start_lease_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task) { +int8_t _zp_unicast_start_lease_task(_z_transport_t *zt, void *attr, void *task) { _ZP_UNUSED(zt); _ZP_UNUSED(attr); _ZP_UNUSED(task); @@ -124,9 +131,4 @@ int8_t _zp_unicast_stop_lease_task(_z_transport_t *zt) { _ZP_UNUSED(zt); return _Z_ERR_TRANSPORT_NOT_AVAILABLE; } - -void *_zp_unicast_lease_task(void *ztu_arg) { - _ZP_UNUSED(ztu_arg); - return NULL; -} -#endif // Z_FEATURE_UNICAST_TRANSPORT == 1 +#endif diff --git a/src/transport/unicast/read.c b/src/transport/unicast/read.c index 02b95ddef..a62fa2474 100644 --- a/src/transport/unicast/read.c +++ b/src/transport/unicast/read.c @@ -35,28 +35,17 @@ int8_t _zp_unicast_read(_z_transport_unicast_t *ztu) { return ret; } +#else -int8_t _zp_unicast_start_read_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task) { - // Init memory - (void)memset(task, 0, sizeof(_z_task_t)); - // Attach task - zt->_transport._unicast._read_task = task; - zt->_transport._unicast._read_task_running = true; - // Init task - if (_z_task_init(task, attr, _zp_unicast_read_task, &zt->_transport._unicast) != _Z_RES_OK) { - zt->_transport._unicast._read_task_running = false; - return _Z_ERR_SYSTEM_TASK_FAILED; - } - return _Z_RES_OK; +int8_t _zp_unicast_read(_z_transport_unicast_t *ztu) { + _ZP_UNUSED(ztu); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; } +#endif // Z_FEATURE_UNICAST_TRANSPORT == 1 -int8_t _zp_unicast_stop_read_task(_z_transport_t *zt) { - zt->_transport._unicast._read_task_running = false; - return _Z_RES_OK; -} +#if Z_FEATURE_MULTI_THREAD == 1 && Z_FEATURE_UNICAST_TRANSPORT == 1 void *_zp_unicast_read_task(void *ztu_arg) { -#if Z_FEATURE_MULTI_THREAD == 1 _z_transport_unicast_t *ztu = (_z_transport_unicast_t *)ztu_arg; // Acquire and keep the lock @@ -128,19 +117,37 @@ void *_zp_unicast_read_task(void *ztu_arg) { // Move the read position of the read buffer _z_zbuf_set_rpos(&ztu->_zbuf, _z_zbuf_get_rpos(&ztu->_zbuf) + to_read); } - _z_mutex_unlock(&ztu->_mutex_rx); -#endif // Z_FEATURE_MULTI_THREAD == 1 - return NULL; } + +int8_t _zp_unicast_start_read_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task) { + // Init memory + (void)memset(task, 0, sizeof(_z_task_t)); + // Attach task + zt->_transport._unicast._read_task = task; + zt->_transport._unicast._read_task_running = true; + // Init task + if (_z_task_init(task, attr, _zp_unicast_read_task, &zt->_transport._unicast) != _Z_RES_OK) { + zt->_transport._unicast._read_task_running = false; + return _Z_ERR_SYSTEM_TASK_FAILED; + } + return _Z_RES_OK; +} + +int8_t _zp_unicast_stop_read_task(_z_transport_t *zt) { + zt->_transport._unicast._read_task_running = false; + return _Z_RES_OK; +} + #else -int8_t _zp_unicast_read(_z_transport_unicast_t *ztu) { - _ZP_UNUSED(ztu); - return _Z_ERR_TRANSPORT_NOT_AVAILABLE; + +void *_zp_unicast_read_task(void *ztu_arg) { + _ZP_UNUSED(ztu_arg); + return NULL; } -int8_t _zp_unicast_start_read_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task) { +int8_t _zp_unicast_start_read_task(_z_transport_t *zt, void *attr, void *task) { _ZP_UNUSED(zt); _ZP_UNUSED(attr); _ZP_UNUSED(task); @@ -151,9 +158,4 @@ int8_t _zp_unicast_stop_read_task(_z_transport_t *zt) { _ZP_UNUSED(zt); return _Z_ERR_TRANSPORT_NOT_AVAILABLE; } - -void *_zp_unicast_read_task(void *ztu_arg) { - _ZP_UNUSED(ztu_arg); - return NULL; -} -#endif // Z_FEATURE_UNICAST_TRANSPORT == 1 +#endif diff --git a/tests/single_thread.py b/tests/single_thread.py new file mode 100644 index 000000000..cffd7b82b --- /dev/null +++ b/tests/single_thread.py @@ -0,0 +1,117 @@ +import subprocess +import signal +import sys +import time + +# Specify the directory for the binaries +DIR_EXAMPLES = "build/examples" + +def pub_and_sub(): + print("*** Pub & sub test ***") + test_status = 0 + + # Expected z_pub output & status + z_pub_expected_status = 0 + z_pub_expected_output = '''Opening session... +Declaring publisher for 'demo/example/zenoh-pico-pub'... +Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')...''' + + # Expected z_sub output + z_sub_expected_status = 0 + z_sub_expected_output = '''Opening session... +Declaring Subscriber on 'demo/example/**'... +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!')''' + + print("Start subscriber") + # Start z_sub in the background + z_sub_command = f"./{DIR_EXAMPLES}/z_sub_st -n 10" + z_sub_process = subprocess.Popen(z_sub_command, + shell=True, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True) + + # Introduce a delay to ensure z_sub starts + time.sleep(2) + + print("Start publisher") + # Start z_pub + z_pub_command = f"./{DIR_EXAMPLES}/z_pub_st" + z_pub_process = subprocess.Popen(z_pub_command, + shell=True, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True) + + # Wait for z_pub to finish + z_pub_process.wait() + z_sub_process.wait() + + print("Check publisher status & output") + # Check the exit status of z_pub + z_pub_status = z_pub_process.returncode + if z_pub_status == z_pub_expected_status: + print("z_pub status valid") + else: + print(f"z_pub status invalid, expected: {z_pub_expected_status}, received: {z_pub_status}") + test_status = 1 + + # Check output of z_pub + z_pub_output = z_pub_process.stdout.read() + if z_pub_expected_output in z_pub_output: + print("z_pub output valid") + else: + print("z_pub output invalid:") + print(f"Expected: \"{z_pub_expected_output}\"") + print(f"Received: \"{z_pub_output}\"") + test_status = 1 + + print("Check subscriber status & output") + # Check the exit status of z_sub + z_sub_status = z_sub_process.returncode + if z_sub_status == z_sub_expected_status: + print("z_sub status valid") + else: + print(f"z_sub status invalid, expected: {z_sub_expected_status}, received: {z_sub_status}") + test_status = 1 + + # Check output of z_sub + z_sub_output = z_sub_process.stdout.read() + if z_sub_expected_output in z_sub_output: + print("z_sub output valid") + else: + print("z_sub output invalid:") + print(f"Expected: \"{z_sub_expected_output}\"") + print(f"Received: \"{z_sub_output}\"") + test_status = 1 + # Return value + return test_status + +if __name__ == "__main__": + EXIT_STATUS = 0 + + # Test pub and sub examples + if pub_and_sub() == 1: + EXIT_STATUS = 1 + # Exit + sys.exit(EXIT_STATUS)