From 7c724c3806c3a7cce07ad8bb69f68c0a12277055 Mon Sep 17 00:00:00 2001 From: oteffahi <70609372+oteffahi@users.noreply.github.com> Date: Thu, 21 Mar 2024 14:29:49 +0100 Subject: [PATCH] Align examples and remove reading from stdin (#359) * Remove reading from stdin, align example implementations * Update examples tests * Adjust z_sub_thr output * Fix z_get single query examples * Replace pthread uses with z_mutex and z_condvar * Add multi-thread feature condition to z_get examples * Update error message for features absence * Update z_get expected output in modularity test * Update sample count for freertos single-thread examples --- examples/arduino/z_pub.ino | 3 +- examples/arduino/z_pull.ino | 6 +- examples/arduino/z_queryable.ino | 2 +- examples/arduino/z_sub.ino | 2 +- examples/espidf/z_pull.c | 5 +- examples/espidf/z_queryable.c | 2 +- examples/espidf/z_sub.c | 2 +- examples/freertos_plus_tcp/z_pub_st.c | 3 +- examples/freertos_plus_tcp/z_pull.c | 5 +- examples/freertos_plus_tcp/z_queryable.c | 2 +- examples/freertos_plus_tcp/z_sub.c | 2 +- examples/freertos_plus_tcp/z_sub_st.c | 7 +- examples/mbed/z_pull.cpp | 5 +- examples/mbed/z_queryable.cpp | 2 +- examples/mbed/z_sub.cpp | 2 +- examples/unix/c11/z_get.c | 46 ++++----- examples/unix/c11/z_pub.c | 10 +- examples/unix/c11/z_pub_st.c | 10 +- examples/unix/c11/z_pull.c | 12 +-- examples/unix/c11/z_queryable.c | 9 +- examples/unix/c11/z_sub.c | 9 +- examples/unix/c11/z_sub_st.c | 5 +- examples/unix/c11/z_sub_thr.c | 15 ++- examples/unix/c99/z_get.c | 46 ++++----- examples/unix/c99/z_pub.c | 10 +- examples/unix/c99/z_pub_st.c | 14 ++- examples/unix/c99/z_pull.c | 12 +-- examples/unix/c99/z_queryable.c | 9 +- examples/unix/c99/z_sub.c | 9 +- examples/unix/c99/z_sub_st.c | 18 ++-- examples/windows/z_get.c | 45 +++++---- examples/windows/z_pub.c | 1 + examples/windows/z_pub_st.c | 5 +- examples/windows/z_pull.c | 11 +-- examples/windows/z_queryable.c | 8 +- examples/windows/z_sub.c | 8 +- examples/windows/z_sub_st.c | 8 +- examples/zephyr/z_pull.c | 5 +- examples/zephyr/z_queryable.c | 2 +- examples/zephyr/z_sub.c | 2 +- tests/modularity.py | 116 +++++++++-------------- tests/raweth.py | 61 ++++++------ tests/single_thread.py | 44 +++++---- 43 files changed, 298 insertions(+), 302 deletions(-) diff --git a/examples/arduino/z_pub.ino b/examples/arduino/z_pub.ino index c7e60d62e..ce0a14621 100644 --- a/examples/arduino/z_pub.ino +++ b/examples/arduino/z_pub.ino @@ -94,6 +94,7 @@ void setup() { } void loop() { + delay(1000); char buf[256]; sprintf(buf, "[%4d] %s", idx++, VALUE); Serial.print("Writing Data ('"); @@ -105,8 +106,6 @@ void loop() { if (z_publisher_put(z_publisher_loan(&pub), (const uint8_t *)buf, strlen(buf), NULL) < 0) { Serial.println("Error while publishing data"); } - - delay(1000); } #else void setup() { diff --git a/examples/arduino/z_pull.ino b/examples/arduino/z_pull.ino index 7dc5600e8..5dc150adf 100644 --- a/examples/arduino/z_pull.ino +++ b/examples/arduino/z_pull.ino @@ -35,6 +35,7 @@ #define KEYEXPR "demo/example/**" z_owned_pull_subscriber_t sub; +int idx = 0; void data_handler(const z_sample_t *sample, void *arg) { z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); @@ -106,7 +107,10 @@ void setup() { } void loop() { - delay(5000); + delay(1000); + char buf[256]; + sprintf(buf, "[%4d] Pulling...", idx++); + Serial.println(buf); z_subscriber_pull(z_pull_subscriber_loan(&sub)); } #else diff --git a/examples/arduino/z_queryable.ino b/examples/arduino/z_queryable.ino index 772f60a0a..b6e3c7ed5 100644 --- a/examples/arduino/z_queryable.ino +++ b/examples/arduino/z_queryable.ino @@ -106,7 +106,7 @@ void setup() { delay(300); } -void loop() { delay(5000); } +void loop() { delay(1000); } #else void setup() { diff --git a/examples/arduino/z_sub.ino b/examples/arduino/z_sub.ino index 1a3302f55..9abc75df0 100644 --- a/examples/arduino/z_sub.ino +++ b/examples/arduino/z_sub.ino @@ -104,7 +104,7 @@ void setup() { delay(300); } -void loop() { delay(5000); } +void loop() { delay(1000); } #else void setup() { diff --git a/examples/espidf/z_pull.c b/examples/espidf/z_pull.c index 5f42b2852..e686190fd 100644 --- a/examples/espidf/z_pull.c +++ b/examples/espidf/z_pull.c @@ -153,9 +153,10 @@ void app_main() { } printf("OK!\n"); + int idx = 0; while (1) { - sleep(5); - printf("Pulling data from '%s'...\n", KEYEXPR); + sleep(1); + printf("[%4d] Pulling...\n", idx++); z_subscriber_pull(z_loan(sub)); } diff --git a/examples/espidf/z_queryable.c b/examples/espidf/z_queryable.c index 4448b3b2b..15b4e9afd 100644 --- a/examples/espidf/z_queryable.c +++ b/examples/espidf/z_queryable.c @@ -159,7 +159,7 @@ void app_main() { printf("Zenoh setup finished!\n"); while (1) { - sleep(5); + sleep(1); } printf("Closing Zenoh Session..."); diff --git a/examples/espidf/z_sub.c b/examples/espidf/z_sub.c index 33f01548b..467160440 100644 --- a/examples/espidf/z_sub.c +++ b/examples/espidf/z_sub.c @@ -154,7 +154,7 @@ void app_main() { printf("OK!\n"); while (1) { - sleep(5); + sleep(1); } printf("Closing Zenoh Session..."); diff --git a/examples/freertos_plus_tcp/z_pub_st.c b/examples/freertos_plus_tcp/z_pub_st.c index 03311e25c..5803030c4 100644 --- a/examples/freertos_plus_tcp/z_pub_st.c +++ b/examples/freertos_plus_tcp/z_pub_st.c @@ -30,6 +30,7 @@ #define KEYEXPR "demo/example/zenoh-pico-pub" #define VALUE "[FreeRTOS-Plus-TCP] Pub from Zenoh-Pico!" +#define N 2147483647 // max int value by default void app_main(void) { z_owned_config_t config = z_config_default(); @@ -54,7 +55,7 @@ void app_main(void) { char *buf = (char *)pvPortMalloc(256); z_clock_t now = z_clock_now(); - for (int idx = 0; 1;) { + for (int idx = 0; idx < N;) { if (z_clock_elapsed_ms(&now) > 1000) { snprintf(buf, 256, "[%4d] %s", idx, VALUE); printf("Putting Data ('%s': '%s')...\n", KEYEXPR, buf); diff --git a/examples/freertos_plus_tcp/z_pull.c b/examples/freertos_plus_tcp/z_pull.c index af7839902..3e8a04d6e 100644 --- a/examples/freertos_plus_tcp/z_pull.c +++ b/examples/freertos_plus_tcp/z_pull.c @@ -65,9 +65,10 @@ void app_main(void) { return; } + int idx = 0; while (1) { - z_sleep_s(5); - printf("Pulling data from '%s'...\n", KEYEXPR); + z_sleep_s(1); + printf("[%4d] Pulling...\n", idx++); z_subscriber_pull(z_loan(sub)); } diff --git a/examples/freertos_plus_tcp/z_queryable.c b/examples/freertos_plus_tcp/z_queryable.c index 49a925ced..6e4a36b30 100644 --- a/examples/freertos_plus_tcp/z_queryable.c +++ b/examples/freertos_plus_tcp/z_queryable.c @@ -80,7 +80,7 @@ void app_main(void) { } while (1) { - z_sleep_s(5); + z_sleep_s(1); } z_undeclare_queryable(z_move(qable)); diff --git a/examples/freertos_plus_tcp/z_sub.c b/examples/freertos_plus_tcp/z_sub.c index 3bae862b4..9331619a6 100644 --- a/examples/freertos_plus_tcp/z_sub.c +++ b/examples/freertos_plus_tcp/z_sub.c @@ -66,7 +66,7 @@ void app_main(void) { } while (1) { - z_sleep_s(5); + z_sleep_s(1); } z_undeclare_subscriber(z_move(sub)); diff --git a/examples/freertos_plus_tcp/z_sub_st.c b/examples/freertos_plus_tcp/z_sub_st.c index 811a49877..20ff5d636 100644 --- a/examples/freertos_plus_tcp/z_sub_st.c +++ b/examples/freertos_plus_tcp/z_sub_st.c @@ -27,6 +27,9 @@ #endif #define KEYEXPR "demo/example/**" +#define N 2147483647 // max int value by default + +int msg_nb = 0; void data_handler(const z_sample_t *sample, void *ctx) { (void)(ctx); @@ -34,6 +37,7 @@ void data_handler(const z_sample_t *sample, void *ctx) { printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len, sample->payload.start); z_drop(z_move(keystr)); + msg_nb++; } void app_main(void) { @@ -58,7 +62,8 @@ void app_main(void) { return; } - while (1) { + printf("Running until %d messages are received...\n", N); + while (msg_nb < N) { zp_read(z_loan(s), NULL); zp_send_keep_alive(z_loan(s), NULL); zp_send_join(z_loan(s), NULL); diff --git a/examples/mbed/z_pull.cpp b/examples/mbed/z_pull.cpp index d6b82c1d0..fa9ece75f 100644 --- a/examples/mbed/z_pull.cpp +++ b/examples/mbed/z_pull.cpp @@ -74,9 +74,10 @@ int main(int argc, char **argv) { } printf("OK!\n"); + int idx = 0; while (1) { - z_sleep_s(5); - printf("Pulling data from '%s'...\n", KEYEXPR); + z_sleep_s(1); + printf("[%4d] Pulling...\n", idx++); z_subscriber_pull(z_pull_subscriber_loan(&sub)); } diff --git a/examples/mbed/z_queryable.cpp b/examples/mbed/z_queryable.cpp index 08ade9084..b0b9358bf 100644 --- a/examples/mbed/z_queryable.cpp +++ b/examples/mbed/z_queryable.cpp @@ -80,7 +80,7 @@ int main(int argc, char **argv) { printf("Zenoh setup finished!\n"); while (1) { - z_sleep_s(5); + z_sleep_s(1); } printf("Closing Zenoh Session..."); diff --git a/examples/mbed/z_sub.cpp b/examples/mbed/z_sub.cpp index 0b3cbdd47..b60f82d44 100644 --- a/examples/mbed/z_sub.cpp +++ b/examples/mbed/z_sub.cpp @@ -75,7 +75,7 @@ int main(int argc, char **argv) { printf("OK!\n"); while (1) { - z_sleep_s(5); + z_sleep_s(1); } printf("Closing Zenoh Session..."); diff --git a/examples/unix/c11/z_get.c b/examples/unix/c11/z_get.c index ca45ec493..f98426a15 100644 --- a/examples/unix/c11/z_get.c +++ b/examples/unix/c11/z_get.c @@ -18,10 +18,15 @@ #include #include -#if Z_FEATURE_QUERY == 1 +#if Z_FEATURE_QUERY == 1 && Z_FEATURE_MULTI_THREAD == 1 +static z_condvar_t cond; +static z_mutex_t mutex; + void reply_dropper(void *ctx) { (void)(ctx); printf(">> Received query final notification\n"); + z_condvar_signal(&cond); + z_condvar_free(&cond); } void reply_handler(z_owned_reply_t *reply, void *ctx) { @@ -73,6 +78,9 @@ int main(int argc, char **argv) { } } + z_mutex_init(&mutex); + z_condvar_init(&cond); + z_owned_config_t config = z_config_default(); zp_config_insert(z_loan(config), Z_CONFIG_MODE_KEY, z_string_make(mode)); if (clocator != NULL) { @@ -102,27 +110,19 @@ int main(int argc, char **argv) { return -1; } - printf("Enter any key to pull data or 'q' to quit...\n"); - char c = '\0'; - while (1) { - fflush(stdin); - int ret = scanf("%c", &c); - (void)ret; // Remove unused result warning - if (c == 'q') { - break; - } - - printf("Sending Query '%s'...\n", keyexpr); - z_get_options_t opts = z_get_options_default(); - if (value != NULL) { - opts.value.payload = _z_bytes_wrap((const uint8_t *)value, strlen(value)); - } - z_owned_closure_reply_t callback = z_closure(reply_handler, reply_dropper); - if (z_get(z_loan(s), ke, "", z_move(callback), &opts) < 0) { - printf("Unable to send query.\n"); - return -1; - } + z_mutex_lock(&mutex); + printf("Sending Query '%s'...\n", keyexpr); + z_get_options_t opts = z_get_options_default(); + if (value != NULL) { + opts.value.payload = _z_bytes_wrap((const uint8_t *)value, strlen(value)); + } + z_owned_closure_reply_t callback = z_closure(reply_handler, reply_dropper); + if (z_get(z_loan(s), ke, "", z_move(callback), &opts) < 0) { + printf("Unable to send query.\n"); + return -1; } + z_condvar_wait(&cond, &mutex); + z_mutex_unlock(&mutex); // Stop read and lease tasks for zenoh-pico zp_stop_read_task(z_loan(s)); @@ -134,7 +134,9 @@ int main(int argc, char **argv) { } #else int main(void) { - printf("ERROR: Zenoh pico was compiled without Z_FEATURE_QUERY but this example requires it.\n"); + printf( + "ERROR: Zenoh pico was compiled without Z_FEATURE_QUERY or Z_FEATURE_MULTI_THREAD but this example requires " + "them.\n"); return -2; } #endif diff --git a/examples/unix/c11/z_pub.c b/examples/unix/c11/z_pub.c index 064b53677..786ed58b5 100644 --- a/examples/unix/c11/z_pub.c +++ b/examples/unix/c11/z_pub.c @@ -30,7 +30,7 @@ int main(int argc, char **argv) { const char *mode = "client"; char *clocator = NULL; char *llocator = NULL; - int n = 10; + int n = 2147483647; // max int value by default int opt; while ((opt = getopt(argc, argv, "k:v:e:m:l:n:")) != -1) { @@ -96,14 +96,16 @@ int main(int argc, char **argv) { return -1; } + printf("Press CTRL-C to quit...\n"); + char buf[256]; for (int idx = 0; idx < n; ++idx) { sleep(1); - (void)idx; - printf("Putting Data ('%s': '%s')...\n", keyexpr, value); + sprintf(buf, "[%4d] %s", idx, value); + printf("Putting Data ('%s': '%s')...\n", keyexpr, buf); z_publisher_put_options_t options = z_publisher_put_options_default(); options.encoding = z_encoding(Z_ENCODING_PREFIX_TEXT_PLAIN, NULL); - z_publisher_put(z_loan(pub), (const uint8_t *)value, strlen(value), &options); + z_publisher_put(z_loan(pub), (const uint8_t *)buf, strlen(buf), &options); } z_undeclare_publisher(z_move(pub)); diff --git a/examples/unix/c11/z_pub_st.c b/examples/unix/c11/z_pub_st.c index 225272ed7..deb9e1f60 100644 --- a/examples/unix/c11/z_pub_st.c +++ b/examples/unix/c11/z_pub_st.c @@ -27,7 +27,7 @@ int main(int argc, char **argv) { const char *mode = "client"; char *clocator = NULL; char *llocator = NULL; - int n = 10; + int n = 2147483647; // max int value by default int opt; while ((opt = getopt(argc, argv, "k:v:e:m:l:n:")) != -1) { @@ -86,11 +86,13 @@ int main(int argc, char **argv) { return -1; } // Main loop + printf("Press CTRL-C to quit...\n"); + char buf[256]; for (int idx = 0; idx < n; idx++) { sleep(1); - (void)idx; - printf("Putting Data ('%s': '%s')...\n", keyexpr, value); - z_publisher_put(z_loan(pub), (const uint8_t *)value, strlen(value), NULL); + sprintf(buf, "[%4d] %s", idx, value); + printf("Putting Data ('%s': '%s')...\n", keyexpr, buf); + z_publisher_put(z_loan(pub), (const uint8_t *)buf, strlen(buf), NULL); zp_read(z_loan(s), NULL); zp_send_keep_alive(z_loan(s), NULL); diff --git a/examples/unix/c11/z_pull.c b/examples/unix/c11/z_pull.c index c6f710bfb..8f1414622 100644 --- a/examples/unix/c11/z_pull.c +++ b/examples/unix/c11/z_pull.c @@ -79,15 +79,11 @@ int main(int argc, char **argv) { return -1; } - printf("Enter any key to pull data or 'q' to quit...\n"); - char c = '\0'; + printf("Press CTRL-C to quit...\n"); + int idx = 0; while (1) { - fflush(stdin); - int ret = scanf("%c", &c); - (void)ret; // Remove unused result warning - if (c == 'q') { - break; - } + sleep(1); + printf("[%4d] Pulling...\n", idx++); z_subscriber_pull(z_loan(sub)); } diff --git a/examples/unix/c11/z_queryable.c b/examples/unix/c11/z_queryable.c index e9d1b95e5..47001ec1f 100644 --- a/examples/unix/c11/z_queryable.c +++ b/examples/unix/c11/z_queryable.c @@ -109,12 +109,9 @@ int main(int argc, char **argv) { return -1; } - printf("Enter 'q' to quit...\n"); - char c = '\0'; - while (c != 'q') { - fflush(stdin); - int ret = scanf("%c", &c); - (void)ret; // Remove unused result warning + printf("Press CTRL-C to quit...\n"); + while (1) { + sleep(1); } z_undeclare_queryable(z_move(qable)); diff --git a/examples/unix/c11/z_sub.c b/examples/unix/c11/z_sub.c index e35fc1427..8bfedcade 100644 --- a/examples/unix/c11/z_sub.c +++ b/examples/unix/c11/z_sub.c @@ -108,12 +108,9 @@ int main(int argc, char **argv) { return -1; } - printf("Enter 'q' to quit...\n"); - char c = '\0'; - while (c != 'q') { - fflush(stdin); - int ret = scanf("%c", &c); - (void)ret; // Remove unused result warning + printf("Press CTRL-C to quit...\n"); + while (1) { + sleep(1); } z_undeclare_subscriber(z_move(sub)); diff --git a/examples/unix/c11/z_sub_st.c b/examples/unix/c11/z_sub_st.c index 3bc53e4f5..8e825a06e 100644 --- a/examples/unix/c11/z_sub_st.c +++ b/examples/unix/c11/z_sub_st.c @@ -37,7 +37,7 @@ int main(int argc, char **argv) { const char *mode = "client"; char *clocator = NULL; char *llocator = NULL; - int n = -1; + int n = 2147483647; // max int value by default int opt; while ((opt = getopt(argc, argv, "k:e:m:l:n:")) != -1) { @@ -93,7 +93,8 @@ int main(int argc, char **argv) { return -1; } - while (msg_nb != n) { + printf("Press CTRL-C to quit...\n"); + while (msg_nb < n) { zp_read(z_loan(s), NULL); zp_send_keep_alive(z_loan(s), NULL); zp_send_join(z_loan(s), NULL); diff --git a/examples/unix/c11/z_sub_thr.c b/examples/unix/c11/z_sub_thr.c index 485ba9575..788b25b03 100644 --- a/examples/unix/c11/z_sub_thr.c +++ b/examples/unix/c11/z_sub_thr.c @@ -16,6 +16,7 @@ #include #include #include +#include #include "zenoh-pico.h" @@ -52,8 +53,7 @@ void on_sample(const z_sample_t *sample, void *context) { // Stop set measurement stats->finished_rounds++; unsigned long elapsed_ms = z_clock_elapsed_ms(&stats->start); - printf("Received %d msg in %lu ms (%.1f msg/s)\n", PACKET_NB, elapsed_ms, - (double)(PACKET_NB * 1000) / (double)elapsed_ms); + printf("%f msg/s\n", (double)(PACKET_NB * 1000) / (double)elapsed_ms); stats->count = 0; } } @@ -99,14 +99,11 @@ int main(int argc, char **argv) { exit(-1); } // Listen until stopped - printf("Start listening.\n"); - char c = 0; - while (c != 'q') { - c = (char)fgetc(stdin); + printf("Press CTRL-C to quit...\n"); + while (1) { + sleep(1); } - // Wait for everything to settle - printf("End of test\n"); - z_sleep_s(1); + // Clean up z_undeclare_subscriber(z_move(sub)); zp_stop_read_task(z_loan(s)); diff --git a/examples/unix/c99/z_get.c b/examples/unix/c99/z_get.c index 46121e59e..7bec8e497 100644 --- a/examples/unix/c99/z_get.c +++ b/examples/unix/c99/z_get.c @@ -18,10 +18,15 @@ #include #include -#if Z_FEATURE_QUERY == 1 +#if Z_FEATURE_QUERY == 1 && Z_FEATURE_MULTI_THREAD == 1 +z_condvar_t cond; +z_mutex_t mutex; + void reply_dropper(void *ctx) { (void)(ctx); printf(">> Received query final notification\n"); + z_condvar_signal(&cond); + z_condvar_free(&cond); } void reply_handler(z_owned_reply_t *reply, void *ctx) { @@ -73,6 +78,9 @@ int main(int argc, char **argv) { } } + z_mutex_init(&mutex); + z_condvar_init(&cond); + z_owned_config_t config = z_config_default(); zp_config_insert(z_config_loan(&config), Z_CONFIG_MODE_KEY, z_string_make(mode)); if (clocator != NULL) { @@ -102,27 +110,19 @@ int main(int argc, char **argv) { return -1; } - printf("Enter any key to pull data or 'q' to quit...\n"); - char c = '\0'; - while (1) { - fflush(stdin); - int ret = scanf("%c", &c); - (void)ret; // Clear unused result warning - if (c == 'q') { - break; - } - - printf("Sending Query '%s'...\n", keyexpr); - z_get_options_t opts = z_get_options_default(); - if (value != NULL) { - opts.value.payload = _z_bytes_wrap((const uint8_t *)value, strlen(value)); - } - z_owned_closure_reply_t callback = z_closure_reply(reply_handler, reply_dropper, NULL); - if (z_get(z_session_loan(&s), ke, "", z_closure_reply_move(&callback), &opts) < 0) { - printf("Unable to send query.\n"); - return -1; - } + z_mutex_lock(&mutex); + printf("Sending Query '%s'...\n", keyexpr); + z_get_options_t opts = z_get_options_default(); + if (value != NULL) { + opts.value.payload = _z_bytes_wrap((const uint8_t *)value, strlen(value)); + } + z_owned_closure_reply_t callback = z_closure_reply(reply_handler, reply_dropper, NULL); + if (z_get(z_session_loan(&s), ke, "", z_closure_reply_move(&callback), &opts) < 0) { + printf("Unable to send query.\n"); + return -1; } + z_condvar_wait(&cond, &mutex); + z_mutex_unlock(&mutex); // Stop read and lease tasks for zenoh-pico zp_stop_read_task(z_session_loan(&s)); @@ -134,7 +134,9 @@ int main(int argc, char **argv) { } #else int main(void) { - printf("ERROR: Zenoh pico was compiled without Z_FEATURE_QUERY but this example requires it.\n"); + printf( + "ERROR: Zenoh pico was compiled without Z_FEATURE_QUERY or Z_FEATURE_MULTI_THREAD but this example requires " + "them.\n"); return -2; } #endif diff --git a/examples/unix/c99/z_pub.c b/examples/unix/c99/z_pub.c index 36483fae3..a81065c9e 100644 --- a/examples/unix/c99/z_pub.c +++ b/examples/unix/c99/z_pub.c @@ -26,7 +26,7 @@ int main(int argc, char **argv) { const char *mode = "client"; char *clocator = NULL; char *llocator = NULL; - int n = 10; + int n = 2147483647; // max int value by default int opt; while ((opt = getopt(argc, argv, "k:v:e:m:l:n:")) != -1) { @@ -92,14 +92,16 @@ int main(int argc, char **argv) { return -1; } + printf("Press CTRL-C to quit...\n"); + char *buf = (char *)malloc(256); for (int idx = 0; idx < n; ++idx) { sleep(1); - (void)idx; - printf("Putting Data ('%s': '%s')...\n", keyexpr, value); + snprintf(buf, 256, "[%4d] %s", idx, value); + printf("Putting Data ('%s': '%s')...\n", keyexpr, buf); z_publisher_put_options_t options = z_publisher_put_options_default(); options.encoding = z_encoding(Z_ENCODING_PREFIX_TEXT_PLAIN, NULL); - z_publisher_put(z_publisher_loan(&pub), (const uint8_t *)value, strlen(value), &options); + z_publisher_put(z_publisher_loan(&pub), (const uint8_t *)buf, strlen(buf), &options); } // Clean up z_undeclare_publisher(z_publisher_move(&pub)); diff --git a/examples/unix/c99/z_pub_st.c b/examples/unix/c99/z_pub_st.c index 44592adc8..f39b3753b 100644 --- a/examples/unix/c99/z_pub_st.c +++ b/examples/unix/c99/z_pub_st.c @@ -26,9 +26,10 @@ int main(int argc, char **argv) { const char *mode = "client"; char *clocator = NULL; char *llocator = NULL; + int n = 2147483647; // max int value by default int opt; - while ((opt = getopt(argc, argv, "k:v:e:m:l:")) != -1) { + while ((opt = getopt(argc, argv, "k:v:e:m:l:n:")) != -1) { switch (opt) { case 'k': keyexpr = optarg; @@ -45,8 +46,12 @@ int main(int argc, char **argv) { case 'l': llocator = optarg; break; + case 'n': + n = atoi(optarg); + break; case '?': - if (optopt == 'k' || optopt == 'v' || optopt == 'e' || optopt == 'm' || optopt == 'l') { + if (optopt == 'k' || optopt == 'v' || optopt == 'e' || optopt == 'm' || optopt == 'l' || + optopt == 'n') { fprintf(stderr, "Option -%c requires an argument.\n", optopt); } else { fprintf(stderr, "Unknown option `-%c'.\n", optopt); @@ -80,9 +85,10 @@ int main(int argc, char **argv) { return -1; } + printf("Press CTRL-C to quit...\n"); char *buf = (char *)malloc(256); z_clock_t now = z_clock_now(); - for (int idx = 0; 1;) { + for (int idx = 0; idx < n;) { if (z_clock_elapsed_ms(&now) > 1000) { snprintf(buf, 256, "[%4d] %s", idx, value); printf("Putting Data ('%s': '%s')...\n", keyexpr, buf); @@ -98,9 +104,7 @@ int main(int argc, char **argv) { } z_undeclare_publisher(z_publisher_move(&pub)); - z_close(z_session_move(&s)); - free(buf); return 0; } diff --git a/examples/unix/c99/z_pull.c b/examples/unix/c99/z_pull.c index efe2ba8bf..323ad0155 100644 --- a/examples/unix/c99/z_pull.c +++ b/examples/unix/c99/z_pull.c @@ -80,15 +80,11 @@ int main(int argc, char **argv) { return -1; } - printf("Enter any key to pull data or 'q' to quit...\n"); - char c = '\0'; + printf("Press CTRL-C to quit...\n"); + int idx = 0; while (1) { - fflush(stdin); - int ret = scanf("%c", &c); - (void)ret; // Clear unused result warning - if (c == 'q') { - break; - } + sleep(1); + printf("[%4d] Pulling...\n", idx++); z_subscriber_pull(z_pull_subscriber_loan(&sub)); } diff --git a/examples/unix/c99/z_queryable.c b/examples/unix/c99/z_queryable.c index 8460aef16..c6ccf555c 100644 --- a/examples/unix/c99/z_queryable.c +++ b/examples/unix/c99/z_queryable.c @@ -105,12 +105,9 @@ int main(int argc, char **argv) { return -1; } - printf("Enter 'q' to quit...\n"); - char c = '\0'; - while (c != 'q') { - fflush(stdin); - int ret = scanf("%c", &c); - (void)ret; // Clear unused result warning + printf("Press CTRL-C to quit...\n"); + while (1) { + sleep(1); } z_undeclare_queryable(z_queryable_move(&qable)); diff --git a/examples/unix/c99/z_sub.c b/examples/unix/c99/z_sub.c index 8d25dbb74..185c742ed 100644 --- a/examples/unix/c99/z_sub.c +++ b/examples/unix/c99/z_sub.c @@ -93,12 +93,9 @@ int main(int argc, char **argv) { return -1; } - printf("Enter 'q' to quit...\n"); - char c = '\0'; - while (c != 'q') { - fflush(stdin); - int ret = scanf("%c", &c); - (void)ret; // Clear unused result warning + printf("Press CTRL-C to quit...\n"); + while (1) { + sleep(1); } z_undeclare_subscriber(z_subscriber_move(&sub)); diff --git a/examples/unix/c99/z_sub_st.c b/examples/unix/c99/z_sub_st.c index 326f5fcbc..765bf3d5a 100644 --- a/examples/unix/c99/z_sub_st.c +++ b/examples/unix/c99/z_sub_st.c @@ -20,12 +20,16 @@ #include #if Z_FEATURE_SUBSCRIPTION == 1 + +static int msg_nb = 0; + void data_handler(const z_sample_t *sample, void *arg) { (void)(arg); z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_str_loan(&keystr), (int)sample->payload.len, sample->payload.start); z_str_drop(z_str_move(&keystr)); + msg_nb++; } int main(int argc, char **argv) { @@ -33,9 +37,10 @@ int main(int argc, char **argv) { const char *mode = "client"; char *clocator = NULL; char *llocator = NULL; + int n = 2147483647; // max int value by default int opt; - while ((opt = getopt(argc, argv, "k:e:m:l:")) != -1) { + while ((opt = getopt(argc, argv, "k:e:m:l:n:")) != -1) { switch (opt) { case 'k': keyexpr = optarg; @@ -49,8 +54,11 @@ int main(int argc, char **argv) { case 'l': llocator = optarg; break; + case 'n': + n = atoi(optarg); + break; case '?': - if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'l') { + if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'l' || optopt == 'n') { fprintf(stderr, "Option -%c requires an argument.\n", optopt); } else { fprintf(stderr, "Unknown option `-%c'.\n", optopt); @@ -86,16 +94,14 @@ int main(int argc, char **argv) { return -1; } - while (1) { + printf("Press CTRL-C to quit...\n"); + while (msg_nb < n) { zp_read(z_session_loan(&s), NULL); zp_send_keep_alive(z_session_loan(&s), NULL); zp_send_join(z_session_loan(&s), NULL); } - z_undeclare_subscriber(z_subscriber_move(&sub)); - z_close(z_session_move(&s)); - return 0; } #else diff --git a/examples/windows/z_get.c b/examples/windows/z_get.c index 2815656dc..597287afd 100644 --- a/examples/windows/z_get.c +++ b/examples/windows/z_get.c @@ -17,10 +17,15 @@ #include #include -#if Z_FEATURE_QUERY == 1 +#if Z_FEATURE_QUERY == 1 && Z_FEATURE_MULTI_THREAD == 1 +z_condvar_t cond; +z_mutex_t mutex; + void reply_dropper(void *ctx) { (void)(ctx); printf(">> Received query final notification\n"); + z_condvar_signal(&cond); + z_condvar_free(&cond); } void reply_handler(z_owned_reply_t *reply, void *ctx) { @@ -42,6 +47,9 @@ int main(int argc, char **argv) { const char *locator = NULL; const char *value = NULL; + z_mutex_init(&mutex); + z_condvar_init(&cond); + z_owned_config_t config = z_config_default(); if (locator != NULL) { zp_config_insert(z_loan(config), Z_CONFIG_CONNECT_KEY, z_string_make(locator)); @@ -67,26 +75,19 @@ int main(int argc, char **argv) { return -1; } - printf("Enter any key to pull data or 'q' to quit...\n"); - char c = '\0'; - while (1) { - fflush(stdin); - scanf("%c", &c); - if (c == 'q') { - break; - } - - printf("Sending Query '%s'...\n", keyexpr); - z_get_options_t opts = z_get_options_default(); - if (value != NULL) { - opts.value.payload = _z_bytes_wrap((const uint8_t *)value, strlen(value)); - } - z_owned_closure_reply_t callback = z_closure(reply_handler, reply_dropper); - if (z_get(z_loan(s), ke, "", z_move(callback), &opts) < 0) { - printf("Unable to send query.\n"); - return -1; - } + z_mutex_lock(&mutex); + printf("Sending Query '%s'...\n", keyexpr); + z_get_options_t opts = z_get_options_default(); + if (value != NULL) { + opts.value.payload = _z_bytes_wrap((const uint8_t *)value, strlen(value)); + } + z_owned_closure_reply_t callback = z_closure(reply_handler, reply_dropper); + if (z_get(z_loan(s), ke, "", z_move(callback), &opts) < 0) { + printf("Unable to send query.\n"); + return -1; } + z_condvar_wait(&cond, &mutex); + z_mutex_unlock(&mutex); // Stop read and lease tasks for zenoh-pico zp_stop_read_task(z_loan(s)); @@ -98,7 +99,9 @@ int main(int argc, char **argv) { } #else int main(void) { - printf("ERROR: Zenoh pico was compiled without Z_FEATURE_QUERY but this example requires it.\n"); + printf( + "ERROR: Zenoh pico was compiled without Z_FEATURE_QUERY or Z_FEATURE_MULTI_THREAD but this example requires " + "them.\n"); return -2; } #endif diff --git a/examples/windows/z_pub.c b/examples/windows/z_pub.c index 8c17d7f7d..ce62da93d 100644 --- a/examples/windows/z_pub.c +++ b/examples/windows/z_pub.c @@ -54,6 +54,7 @@ int main(int argc, char **argv) { return -1; } + printf("Press CTRL-C to quit...\n"); char *buf = (char *)malloc(256); for (int idx = 0; 1; ++idx) { Sleep(1); diff --git a/examples/windows/z_pub_st.c b/examples/windows/z_pub_st.c index cc6f41361..6d6f59d87 100644 --- a/examples/windows/z_pub_st.c +++ b/examples/windows/z_pub_st.c @@ -18,6 +18,8 @@ #include #include +#define N 2147483647 // max int value by default + #if Z_FEATURE_PUBLICATION == 1 int main(int argc, char **argv) { (void)(argc); @@ -47,9 +49,10 @@ int main(int argc, char **argv) { return -1; } + printf("Press CTRL-C to quit...\n"); char *buf = (char *)malloc(256); z_clock_t now = z_clock_now(); - for (int idx = 0; 1;) { + for (int idx = 0; idx < N;) { if (z_clock_elapsed_ms(&now) > 1000) { snprintf(buf, 256, "[%4d] %s", idx, value); printf("Putting Data ('%s': '%s')...\n", keyexpr, buf); diff --git a/examples/windows/z_pull.c b/examples/windows/z_pull.c index 512b2bb79..0ec172471 100644 --- a/examples/windows/z_pull.c +++ b/examples/windows/z_pull.c @@ -59,14 +59,11 @@ int main(int argc, char **argv) { return -1; } - printf("Enter any key to pull data or 'q' to quit...\n"); - char c = '\0'; + printf("Press CTRL-C to quit...\n"); + int idx = 0; while (1) { - fflush(stdin); - scanf("%c", &c); - if (c == 'q') { - break; - } + Sleep(1); + printf("[%4d] Pulling...\n", idx++); z_subscriber_pull(z_loan(sub)); } diff --git a/examples/windows/z_queryable.c b/examples/windows/z_queryable.c index 7a2d7771c..ea504d0e7 100644 --- a/examples/windows/z_queryable.c +++ b/examples/windows/z_queryable.c @@ -74,11 +74,9 @@ int main(int argc, char **argv) { return -1; } - printf("Enter 'q' to quit...\n"); - char c = '\0'; - while (c != 'q') { - fflush(stdin); - scanf("%c", &c); + printf("Press CTRL-C to quit...\n"); + while (1) { + Sleep(1); } z_undeclare_queryable(z_move(qable)); diff --git a/examples/windows/z_sub.c b/examples/windows/z_sub.c index 0ffd26568..bde475e8b 100644 --- a/examples/windows/z_sub.c +++ b/examples/windows/z_sub.c @@ -62,11 +62,9 @@ int main(int argc, char **argv) { return -1; } - printf("Enter 'q' to quit...\n"); - char c = '\0'; - while (c != 'q') { - fflush(stdin); - scanf("%c", &c); + printf("Press CTRL-C to quit...\n"); + while (1) { + Sleep(1); } z_undeclare_subscriber(z_move(sub)); diff --git a/examples/windows/z_sub_st.c b/examples/windows/z_sub_st.c index 9b41877fa..fa91d9e41 100644 --- a/examples/windows/z_sub_st.c +++ b/examples/windows/z_sub_st.c @@ -18,6 +18,9 @@ #include #include +#define N 2147483647 // max int value by default +int msg_nb = 0; + #if Z_FEATURE_SUBSCRIPTION == 1 void data_handler(const z_sample_t *sample, void *ctx) { (void)(ctx); @@ -25,6 +28,8 @@ void data_handler(const z_sample_t *sample, void *ctx) { printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len, sample->payload.start); z_drop(z_move(keystr)); + + msg_nb++; } int main(int argc, char **argv) { @@ -55,7 +60,8 @@ int main(int argc, char **argv) { return -1; } - while (1) { + printf("Press CTRL-C to quit...\n"); + while (msg_nb < N) { zp_read(z_loan(s), NULL); zp_send_keep_alive(z_loan(s), NULL); zp_send_join(z_loan(s), NULL); diff --git a/examples/zephyr/z_pull.c b/examples/zephyr/z_pull.c index 61bed75d0..c96baff84 100644 --- a/examples/zephyr/z_pull.c +++ b/examples/zephyr/z_pull.c @@ -68,9 +68,10 @@ int main(int argc, char **argv) { } printf("OK!\n"); + int idx = 0; while (1) { - sleep(5); - printf("Pulling data from '%s'...\n", KEYEXPR); + sleep(1); + printf("[%4d] Pulling...\n", idx++); z_subscriber_pull(z_loan(sub)); } diff --git a/examples/zephyr/z_queryable.c b/examples/zephyr/z_queryable.c index 044664bb2..febdaabe2 100644 --- a/examples/zephyr/z_queryable.c +++ b/examples/zephyr/z_queryable.c @@ -75,7 +75,7 @@ int main(int argc, char **argv) { printf("Zenoh setup finished!\n"); while (1) { - sleep(5); + sleep(1); } printf("Closing Zenoh Session..."); diff --git a/examples/zephyr/z_sub.c b/examples/zephyr/z_sub.c index ac307a47a..ce0a0e73d 100644 --- a/examples/zephyr/z_sub.c +++ b/examples/zephyr/z_sub.c @@ -69,7 +69,7 @@ int main(int argc, char **argv) { printf("OK!\n"); while (1) { - sleep(5); + sleep(1); } printf("Closing Zenoh Session..."); diff --git a/tests/modularity.py b/tests/modularity.py index 662236a1d..000b6b90a 100644 --- a/tests/modularity.py +++ b/tests/modularity.py @@ -1,4 +1,6 @@ import argparse +import os +from signal import SIGINT import subprocess import sys import time @@ -15,16 +17,17 @@ def pub_and_sub(args): z_pub_expected_status = 0 z_pub_expected_output = '''Opening session... Declaring publisher for 'demo/example/zenoh-pico-pub'... -Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... -Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... -Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... -Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... -Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... -Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... -Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... -Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... -Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... -Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')...''' +Press CTRL-C to quit... +Putting Data ('demo/example/zenoh-pico-pub': '[ 0] Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': '[ 1] Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': '[ 2] Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': '[ 3] Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': '[ 4] Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': '[ 5] Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': '[ 6] Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': '[ 7] Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': '[ 8] Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': '[ 9] Pub from Pico!')...''' else : z_pub_expected_status = 254 z_pub_expected_output = ("ERROR: Zenoh pico was compiled without " @@ -32,25 +35,25 @@ def pub_and_sub(args): # Expected z_sub output & status if args.sub == 1: - z_sub_expected_status = 0 + z_sub_expected_status = -2 if args.pub == 1: z_sub_expected_output = '''Opening session... Declaring Subscriber on 'demo/example/**'... -Enter 'q' to quit... ->> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') ->> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') ->> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') ->> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') ->> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') ->> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') ->> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') ->> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') ->> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') ->> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!')''' +Press CTRL-C to quit... +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': '[ 0] Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': '[ 1] Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': '[ 2] Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': '[ 3] Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': '[ 4] Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': '[ 5] Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': '[ 6] Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': '[ 7] Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': '[ 8] Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': '[ 9] Pub from Pico!')''' else: z_sub_expected_output = '''Opening session... Declaring Subscriber on 'demo/example/**'... -Enter 'q' to quit...''' +Press CTRL-C to quit...''' else : z_sub_expected_status = 254 z_sub_expected_output = ("ERROR: Zenoh pico was compiled without " @@ -58,19 +61,21 @@ def pub_and_sub(args): print("Start subscriber") # Start z_sub in the background - z_sub_command = f"./{DIR_EXAMPLES}/z_sub" + z_sub_command = f"stdbuf -oL -eL ./{DIR_EXAMPLES}/z_sub" z_sub_process = subprocess.Popen(z_sub_command, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, - stderr=subprocess.PIPE, text=True) + stderr=subprocess.PIPE, + start_new_session=True, + text=True) # Introduce a delay to ensure z_sub starts time.sleep(2) print("Start publisher") # Start z_pub - z_pub_command = f"./{DIR_EXAMPLES}/z_pub" + z_pub_command = f"stdbuf -oL -eL ./{DIR_EXAMPLES}/z_pub -n 10" z_pub_process = subprocess.Popen(z_pub_command, shell=True, stdin=subprocess.PIPE, @@ -83,9 +88,9 @@ def pub_and_sub(args): print("Stop subscriber") if z_sub_process.poll() is None: - # Send "q" command to z_sub to stop it - z_sub_process.stdin.write("q\n") - z_sub_process.stdin.flush() + # send SIGINT to group + z_sub_process_gid = os.getpgid(z_sub_process.pid) + os.killpg(z_sub_process_gid, SIGINT) # Wait for z_sub to finish z_sub_process.wait() @@ -139,44 +144,30 @@ def query_and_queryable(args): z_query_expected_status = 0 if args.queryable == 1: z_query_expected_output = '''Opening session... -Enter any key to pull data or 'q' to quit... -Sending Query 'demo/example/**'... ->> Received ('demo/example/zenoh-pico-queryable': 'Queryable from Pico!') ->> Received query final notification -Sending Query 'demo/example/**'... ->> Received ('demo/example/zenoh-pico-queryable': 'Queryable from Pico!') ->> Received query final notification Sending Query 'demo/example/**'... >> Received ('demo/example/zenoh-pico-queryable': 'Queryable from Pico!') >> Received query final notification''' else: z_query_expected_output = '''Opening session... -Enter any key to pull data or 'q' to quit... -Sending Query 'demo/example/**'... ->> Received query final notification -Sending Query 'demo/example/**'... ->> Received query final notification Sending Query 'demo/example/**'... >> Received query final notification''' else : z_query_expected_status = 254 z_query_expected_output = ("ERROR: Zenoh pico was compiled without " - "Z_FEATURE_QUERY but this example requires it.") + "Z_FEATURE_QUERY or Z_FEATURE_MULTI_THREAD but this example requires them.") # Expected z_queryable output & status if args.queryable == 1: - z_queryable_expected_status = 0 + z_queryable_expected_status = -2 if args.query == 1: z_queryable_expected_output = '''Opening session... Creating Queryable on 'demo/example/zenoh-pico-queryable'... -Enter 'q' to quit... - >> [Queryable handler] Received Query 'demo/example/**?' - >> [Queryable handler] Received Query 'demo/example/**?' +Press CTRL-C to quit... >> [Queryable handler] Received Query 'demo/example/**?''' else: z_queryable_expected_output = '''Opening session... Creating Queryable on 'demo/example/zenoh-pico-queryable'... -Enter 'q' to quit...''' +Press CTRL-C to quit...''' else : z_queryable_expected_status = 254 z_queryable_expected_output = ("ERROR: Zenoh pico was compiled without " @@ -184,12 +175,13 @@ def query_and_queryable(args): print("Start queryable") # Start z_queryable in the background - z_queryable_command = f"./{DIR_EXAMPLES}/z_queryable" + z_queryable_command = f"stdbuf -oL -eL ./{DIR_EXAMPLES}/z_queryable" z_queryable_process = subprocess.Popen(z_queryable_command, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, + start_new_session=True, text=True) # Introduce a delay to ensure z_queryable starts @@ -197,7 +189,7 @@ def query_and_queryable(args): print("Start query") # Start z_query - z_query_command = f"./{DIR_EXAMPLES}/z_get" + z_query_command = f"stdbuf -oL -eL ./{DIR_EXAMPLES}/z_get" z_query_process = subprocess.Popen(z_query_command, shell=True, stdin=subprocess.PIPE, @@ -205,34 +197,14 @@ def query_and_queryable(args): stderr=subprocess.PIPE, text=True) - # Introduce a delay to ensure z_query starts - time.sleep(2) - - print("Send requests") - if z_query_process.poll() is None: - z_query_process.stdin.write("\n") - z_query_process.stdin.flush() - time.sleep(1) - z_query_process.stdin.write("\n") - z_query_process.stdin.flush() - time.sleep(1) - z_query_process.stdin.write("\n") - z_query_process.stdin.flush() - time.sleep(1) - - print("Stop query") - if z_query_process.poll() is None: - z_query_process.stdin.write("q\n") - z_query_process.stdin.flush() - # Wait for z_query to finish z_query_process.wait() print("Stop queryable") if z_queryable_process.poll() is None: - # Send "q" command to z_sub to stop it - z_queryable_process.stdin.write("q\n") - z_queryable_process.stdin.flush() + # send SIGINT to group + z_quaryable_process_gid = os.getpgid(z_queryable_process.pid) + os.killpg(z_quaryable_process_gid, SIGINT) # Wait for z_queryable to finish z_queryable_process.wait() diff --git a/tests/raweth.py b/tests/raweth.py index 8f14e3b78..930b20908 100644 --- a/tests/raweth.py +++ b/tests/raweth.py @@ -1,4 +1,6 @@ import argparse +import os +from signal import SIGINT import subprocess import sys import time @@ -15,16 +17,17 @@ def pub_and_sub(args): z_pub_expected_status = 0 z_pub_expected_output = '''Opening session... Declaring publisher for 'demo/example/zenoh-pico-pub'... -Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... -Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... -Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... -Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... -Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... -Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... -Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... -Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... -Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... -Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')...''' +Press CTRL-C to quit... +Putting Data ('demo/example/zenoh-pico-pub': '[ 0] Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': '[ 1] Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': '[ 2] Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': '[ 3] Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': '[ 4] Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': '[ 5] Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': '[ 6] Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': '[ 7] Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': '[ 8] Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': '[ 9] Pub from Pico!')...''' else : z_pub_expected_status = 255 z_pub_expected_output = '''Opening session... @@ -32,20 +35,20 @@ def pub_and_sub(args): # Expected z_sub output & status if args.reth == 1: - z_sub_expected_status = 0 + z_sub_expected_status = -2 z_sub_expected_output = '''Opening session... Declaring Subscriber on 'demo/example/**'... -Enter 'q' to quit... ->> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') ->> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') ->> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') ->> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') ->> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') ->> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') ->> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') ->> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') ->> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') ->> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!')''' +Press CTRL-C to quit... +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': '[ 0] Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': '[ 1] Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': '[ 2] Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': '[ 3] Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': '[ 4] Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': '[ 5] Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': '[ 6] Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': '[ 7] Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': '[ 8] Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': '[ 9] Pub from Pico!')''' else : z_sub_expected_status = 255 z_sub_expected_output = '''Opening session... @@ -53,19 +56,21 @@ def pub_and_sub(args): print("Start subscriber") # Start z_sub in the background - z_sub_command = f"sudo ./{DIR_EXAMPLES}/z_sub -m \"peer\" -l \"reth/0\"s" + z_sub_command = f"sudo stdbuf -oL -eL ./{DIR_EXAMPLES}/z_sub -m \"peer\" -l \"reth/0\"s" z_sub_process = subprocess.Popen(z_sub_command, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, - stderr=subprocess.PIPE, text=True) + stderr=subprocess.PIPE, + start_new_session=True, + text=True) # Introduce a delay to ensure z_sub starts time.sleep(2) print("Start publisher") # Start z_pub - z_pub_command = f"sudo ./{DIR_EXAMPLES}/z_pub -m \"peer\" -l \"reth/0\"s" + z_pub_command = f"sudo stdbuf -oL -eL ./{DIR_EXAMPLES}/z_pub -n 10 -m \"peer\" -l \"reth/0\"s" z_pub_process = subprocess.Popen(z_pub_command, shell=True, stdin=subprocess.PIPE, @@ -78,9 +83,9 @@ def pub_and_sub(args): print("Stop subscriber") if z_sub_process.poll() is None: - # Send "q" command to z_sub to stop it - z_sub_process.stdin.write("q\n") - z_sub_process.stdin.flush() + # send SIGINT to group + z_sub_process_gid = os.getpgid(z_sub_process.pid) + os.killpg(z_sub_process_gid, SIGINT) # Wait for z_sub to finish z_sub_process.wait() diff --git a/tests/single_thread.py b/tests/single_thread.py index cffd7b82b..00a4955b8 100644 --- a/tests/single_thread.py +++ b/tests/single_thread.py @@ -14,31 +14,33 @@ def pub_and_sub(): z_pub_expected_status = 0 z_pub_expected_output = '''Opening session... Declaring publisher for 'demo/example/zenoh-pico-pub'... -Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... -Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... -Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... -Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... -Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... -Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... -Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... -Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... -Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... -Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')...''' +Press CTRL-C to quit... +Putting Data ('demo/example/zenoh-pico-pub': '[ 0] Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': '[ 1] Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': '[ 2] Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': '[ 3] Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': '[ 4] Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': '[ 5] Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': '[ 6] Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': '[ 7] Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': '[ 8] Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': '[ 9] Pub from Pico!')...''' # Expected z_sub output z_sub_expected_status = 0 z_sub_expected_output = '''Opening session... Declaring Subscriber on 'demo/example/**'... ->> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') ->> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') ->> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') ->> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') ->> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') ->> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') ->> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') ->> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') ->> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') ->> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!')''' +Press CTRL-C to quit... +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': '[ 0] Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': '[ 1] Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': '[ 2] Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': '[ 3] Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': '[ 4] Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': '[ 5] Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': '[ 6] Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': '[ 7] Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': '[ 8] Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': '[ 9] Pub from Pico!')''' print("Start subscriber") # Start z_sub in the background @@ -55,7 +57,7 @@ def pub_and_sub(): print("Start publisher") # Start z_pub - z_pub_command = f"./{DIR_EXAMPLES}/z_pub_st" + z_pub_command = f"./{DIR_EXAMPLES}/z_pub_st -n 10" z_pub_process = subprocess.Popen(z_pub_command, shell=True, stdin=subprocess.PIPE,