From 30d1bedcdc32d0d4bd122b195ebda5ea36501f1b Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Mon, 18 Dec 2023 17:34:00 +0100 Subject: [PATCH 1/6] test: add fragment test to integration testing --- .github/workflows/integration.yaml | 1 + tests/z_client_test.c | 35 +++++++++++++++++++++++++++++- 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/.github/workflows/integration.yaml b/.github/workflows/integration.yaml index d6b3a0205..175c103bf 100644 --- a/.github/workflows/integration.yaml +++ b/.github/workflows/integration.yaml @@ -51,6 +51,7 @@ jobs: - name: Test debug run: make test + timeout-minutes: 15 env: BUILD_TYPE: Debug # Workaround for Windows as it seems the previous step is being ignored BUILD_TESTING: OFF # Workaround for Windows as it seems the previous step is being ignored diff --git a/tests/z_client_test.c b/tests/z_client_test.c index 22475a352..1058e5b31 100644 --- a/tests/z_client_test.c +++ b/tests/z_client_test.c @@ -25,6 +25,8 @@ #define MSG 1000 #define MSG_LEN 1024 +#define FRAGMENT_MSG_NB 100 +#define FRAGMENT_MSG_LEN 100000 #define QRY 100 #define QRY_CLT 10 #define SET 100 @@ -92,7 +94,7 @@ void data_handler(const z_sample_t *sample, void *arg) { printf(">> Received data: %s\t(%u/%u)\n", res, datas, total); z_owned_str_t k_str = z_keyexpr_to_string(sample->keyexpr); - assert(sample->payload.len == MSG_LEN); + assert((sample->payload.len == MSG_LEN) || (sample->payload.len == FRAGMENT_MSG_LEN)); assert(_z_str_eq(z_loan(k_str), res) == true); datas++; @@ -233,6 +235,37 @@ int main(int argc, char **argv) { z_sleep_s(SLEEP); + // Write fragment data from first session + if (is_reliable) { + z_free((uint8_t *)payload); + len = FRAGMENT_MSG_LEN; + payload = (uint8_t *)z_malloc(len); + memset(payload, 1, FRAGMENT_MSG_LEN); + + total = FRAGMENT_MSG_NB * SET; + for (unsigned int n = 0; n < FRAGMENT_MSG_NB; n++) { + for (unsigned int i = 0; i < SET; i++) { + z_put_options_t opt = z_put_options_default(); + opt.congestion_control = Z_CONGESTION_CONTROL_BLOCK; + z_put(z_loan(s1), z_loan(rids1[i]), (const uint8_t *)payload, len, &opt); + printf("Wrote fragment data from session 1: %u %zu b\t(%u/%u)\n", z_loan(rids1[i])._id, len, + n * SET + (i + 1), total); + } + } + // Wait to receive all the data + now = z_clock_now(); + while (datas < total) { + assert(z_clock_elapsed_s(&now) < TIMEOUT); + printf("Waiting for fragment datas... %u/%u\n", datas, total); + z_sleep_s(SLEEP); + } + if (is_reliable == true) { + assert(datas == total); + } + datas = 0; + z_sleep_s(SLEEP); + } + // Query data from first session total = QRY * SET; for (unsigned int n = 0; n < QRY; n++) { From 2a371f005dab66ae57f075508efe257611aefad5 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Tue, 19 Dec 2023 10:50:33 +0100 Subject: [PATCH 2/6] fix: change perf test config --- tests/z_perf_rx.c | 15 ++++++++++++--- tests/z_perf_tx.c | 15 ++++++++++++--- 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/tests/z_perf_rx.c b/tests/z_perf_rx.c index 7a67e153a..391bcd231 100644 --- a/tests/z_perf_rx.c +++ b/tests/z_perf_rx.c @@ -60,21 +60,30 @@ void on_sample(const z_sample_t *sample, void *context) { int main(int argc, char **argv) { char *keyexpr = "test/thr"; - const char *mode = "client"; + const char *mode = NULL; char *llocator = NULL; + char *clocator = NULL; (void)argv; - // Check if peer mode + // Check if peer or client mode if (argc > 1) { mode = "peer"; llocator = "udp/224.0.0.224:7447#iface=lo"; + } else { + mode = "client"; + clocator = "tcp/127.0.0.1:7447"; } // Set config z_owned_config_t config = z_config_default(); - zp_config_insert(z_loan(config), Z_CONFIG_MODE_KEY, z_string_make(mode)); + if (mode != NULL) { + zp_config_insert(z_loan(config), Z_CONFIG_MODE_KEY, z_string_make(mode)); + } if (llocator != NULL) { zp_config_insert(z_loan(config), Z_CONFIG_LISTEN_KEY, z_string_make(llocator)); } + if (clocator != NULL) { + zp_config_insert(z_loan(config), Z_CONFIG_CONNECT_KEY, z_string_make(clocator)); + } // Open session z_owned_session_t s = z_open(z_move(config)); if (!z_check(s)) { diff --git a/tests/z_perf_tx.c b/tests/z_perf_tx.c index 454d72de2..277d1da14 100644 --- a/tests/z_perf_tx.c +++ b/tests/z_perf_tx.c @@ -41,21 +41,30 @@ int main(int argc, char **argv) { uint8_t *value = (uint8_t *)malloc(len_array[0]); memset(value, 1, len_array[0]); char *keyexpr = "test/thr"; - const char *mode = "client"; + const char *mode = NULL; char *llocator = NULL; + char *clocator = NULL; (void)argv; - // Check if peer mode + // Check if peer or client mode if (argc > 1) { mode = "peer"; llocator = "udp/224.0.0.224:7447#iface=lo"; + } else { + mode = "client"; + clocator = "tcp/127.0.0.1:7447"; } // Set config z_owned_config_t config = z_config_default(); - zp_config_insert(z_loan(config), Z_CONFIG_MODE_KEY, z_string_make(mode)); + if (mode != NULL) { + zp_config_insert(z_loan(config), Z_CONFIG_MODE_KEY, z_string_make(mode)); + } if (llocator != NULL) { zp_config_insert(z_loan(config), Z_CONFIG_LISTEN_KEY, z_string_make(llocator)); } + if (clocator != NULL) { + zp_config_insert(z_loan(config), Z_CONFIG_CONNECT_KEY, z_string_make(clocator)); + } // Open session z_owned_session_t s = z_open(z_move(config)); if (!z_check(s)) { From 409d96b1cebc2a69447df56a6f2aea042ce5f3f7 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Tue, 19 Dec 2023 11:24:52 +0100 Subject: [PATCH 3/6] feat: add throughput examples --- examples/CMakeLists.txt | 2 + examples/unix/c11/z_pub_thr.c | 75 +++++++++++++++++++++ examples/unix/c11/z_sub_thr.c | 122 ++++++++++++++++++++++++++++++++++ 3 files changed, 199 insertions(+) create mode 100644 examples/unix/c11/z_pub_thr.c create mode 100644 examples/unix/c11/z_sub_thr.c diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 837e41199..fb780510d 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -45,6 +45,8 @@ if(UNIX) add_example(z_scout unix/c11/z_scout.c) add_example(z_ping unix/c11/z_ping.c) add_example(z_pong unix/c11/z_pong.c) + add_example(z_pub_thr unix/c11/z_pub_thr.c) + add_example(z_sub_thr unix/c11/z_sub_thr.c) endif() elseif(MSVC) add_example(z_put windows/z_put.c) diff --git a/examples/unix/c11/z_pub_thr.c b/examples/unix/c11/z_pub_thr.c new file mode 100644 index 000000000..d1379c43c --- /dev/null +++ b/examples/unix/c11/z_pub_thr.c @@ -0,0 +1,75 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +#include +#include +#include +#include + +#include "zenoh-pico.h" + +#if Z_FEATURE_PUBLICATION == 1 +int main(int argc, char **argv) { + if (argc < 2) { + printf("USAGE:\n\tz_pub_thr []\n\n"); + exit(-1); + } + char *keyexpr = "test/thr"; + size_t len = atoi(argv[1]); + uint8_t *value = (uint8_t *)malloc(len); + memset(value, 1, len); + + // Set config + z_owned_config_t config = z_config_default(); + if (argc > 2) { + if (zp_config_insert(z_loan(config), Z_CONFIG_CONNECT_KEY, z_string_make(argv[2])) < 0) { + printf("Couldn't insert locator in config: %s\n", argv[2]); + exit(-1); + } + } + // Open session + z_owned_session_t s = z_open(z_move(config)); + if (!z_check(s)) { + printf("Unable to open session!\n"); + exit(-1); + } + // Start read and lease tasks for zenoh-pico + if (zp_start_read_task(z_loan(s), NULL) < 0 || zp_start_lease_task(z_loan(s), NULL) < 0) { + printf("Unable to start read and lease tasks"); + exit(-1); + } + // Declare publisher + z_owned_publisher_t pub = z_declare_publisher(z_loan(s), z_keyexpr(keyexpr), NULL); + if (!z_check(pub)) { + printf("Unable to declare publisher for key expression!\n"); + exit(-1); + } + + // Send packets + while (1) { + z_publisher_put(z_loan(pub), (const uint8_t *)value, len, NULL); + } + // Clean up + z_undeclare_publisher(z_move(pub)); + zp_stop_read_task(z_loan(s)); + zp_stop_lease_task(z_loan(s)); + z_close(z_move(s)); + free(value); + exit(0); +} +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_PUBLICATION but this example requires it.\n"); + return -2; +} +#endif diff --git a/examples/unix/c11/z_sub_thr.c b/examples/unix/c11/z_sub_thr.c new file mode 100644 index 000000000..17b126274 --- /dev/null +++ b/examples/unix/c11/z_sub_thr.c @@ -0,0 +1,122 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +#include +#include +#include +#include +#include + +#include "zenoh-pico.h" + +#define N 1000000 + +typedef struct { + volatile unsigned long count; + volatile unsigned long finished_rounds; + volatile clock_t start; + volatile clock_t stop; + volatile clock_t first_start; +} z_stats_t; + +#if Z_FEATURE_SUBSCRIPTION == 1 + +z_stats_t *z_stats_make(void) { + z_stats_t *stats = malloc(sizeof(z_stats_t)); + stats->count = 0; + stats->finished_rounds = 0; + stats->first_start = 0; + return stats; +} + +void on_sample(const z_sample_t *sample, void *context) { + (void)sample; + z_stats_t *stats = (z_stats_t *)context; + if (stats->count == 0) { + stats->start = clock(); + if (!stats->first_start) { + stats->first_start = stats->start; + } + stats->count++; + } else if (stats->count < N) { + stats->count++; + } else { + stats->stop = clock(); + stats->finished_rounds++; + printf("%f msg/s\n", N * (double)CLOCKS_PER_SEC / (double)(stats->stop - stats->start)); + stats->count = 0; + } +} + +void drop_stats(void *context) { + const clock_t end = clock(); + const z_stats_t *stats = (z_stats_t *)context; + const double elapsed = (double)(end - stats->first_start) / (double)CLOCKS_PER_SEC; + const unsigned long sent_messages = N * stats->finished_rounds + stats->count; + printf("Stats being dropped after unsubscribing: sent %ld messages over %f seconds (%f msg/s)\n", sent_messages, + elapsed, (double)sent_messages / elapsed); + free(context); +} + +int main(int argc, char **argv) { + char *keyexpr = "test/thr"; + z_owned_config_t config = z_config_default(); + + // Set config + if (argc > 1) { + if (zp_config_insert(z_loan(config), Z_CONFIG_CONNECT_KEY, z_string_make(argv[1])) < 0) { + printf("Failed to insert locator in config: %s\n", argv[1]); + exit(-1); + } + } + // Open session + z_owned_session_t s = z_open(z_move(config)); + if (!z_check(s)) { + printf("Unable to open session!\n"); + exit(-1); + } + // Start read and lease tasks for zenoh-pico + if (zp_start_read_task(z_loan(s), NULL) < 0 || zp_start_lease_task(z_loan(s), NULL) < 0) { + printf("Unable to start read and lease tasks"); + exit(-1); + } + // Declare Subscriber/resource + z_stats_t *context = z_stats_make(); + z_owned_closure_sample_t callback = z_closure(on_sample, drop_stats, (void *)context); + z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(keyexpr), z_move(callback), NULL); + if (!z_check(sub)) { + printf("Unable to create subscriber.\n"); + exit(-1); + } + // Listen until stopped + printf("Start listening.\n"); + char c = 0; + while (c != 'q') { + c = fgetc(stdin); + } + // Wait for everything to settle + printf("End of test\n"); + z_sleep_s(1); + // Clean up + z_undeclare_subscriber(z_move(sub)); + zp_stop_read_task(z_loan(s)); + zp_stop_lease_task(z_loan(s)); + z_close(z_move(s)); + exit(0); +} +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION but this example requires it.\n"); + return -2; +} +#endif From e772c644a2d146c908b5a1a8cf88ea019222ae0c Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Tue, 19 Dec 2023 14:44:31 +0100 Subject: [PATCH 4/6] fix: remove test for performance --- tests/z_perf_tx.c | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/z_perf_tx.c b/tests/z_perf_tx.c index 277d1da14..eac90d159 100644 --- a/tests/z_perf_tx.c +++ b/tests/z_perf_tx.c @@ -26,10 +26,7 @@ int send_packets(unsigned long pkt_len, z_owned_publisher_t *pub, uint8_t *value z_clock_t test_start = z_clock_now(); unsigned long elapsed_us = 0; while (elapsed_us < TEST_DURATION_US) { - if (z_publisher_put(z_loan(*pub), (const uint8_t *)value, pkt_len, NULL) != 0) { - printf("Put failed for pkt len: %lu\n", pkt_len); - return -1; - } + z_publisher_put(z_loan(*pub), (const uint8_t *)value, pkt_len, NULL); elapsed_us = z_clock_elapsed_us(&test_start); } return 0; From a49c9f6b956376beb44984a157c1403001758889 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Tue, 19 Dec 2023 16:12:08 +0100 Subject: [PATCH 5/6] fix: use monotonic time for throughput example --- examples/unix/c11/z_sub_thr.c | 39 +++++++++++++++++------------------ 1 file changed, 19 insertions(+), 20 deletions(-) diff --git a/examples/unix/c11/z_sub_thr.c b/examples/unix/c11/z_sub_thr.c index 17b126274..6cddc4acb 100644 --- a/examples/unix/c11/z_sub_thr.c +++ b/examples/unix/c11/z_sub_thr.c @@ -19,14 +19,13 @@ #include "zenoh-pico.h" -#define N 1000000 +#define PACKET_NB 1000000 typedef struct { volatile unsigned long count; volatile unsigned long finished_rounds; - volatile clock_t start; - volatile clock_t stop; - volatile clock_t first_start; + z_clock_t start; + z_clock_t first_start; } z_stats_t; #if Z_FEATURE_SUBSCRIPTION == 1 @@ -35,36 +34,36 @@ z_stats_t *z_stats_make(void) { z_stats_t *stats = malloc(sizeof(z_stats_t)); stats->count = 0; stats->finished_rounds = 0; - stats->first_start = 0; + stats->first_start.tv_nsec = 0; return stats; } void on_sample(const z_sample_t *sample, void *context) { (void)sample; z_stats_t *stats = (z_stats_t *)context; - if (stats->count == 0) { - stats->start = clock(); - if (!stats->first_start) { + stats->count++; + // Start set measurement + if (stats->count == 1) { + stats->start = z_clock_now(); + if (stats->first_start.tv_nsec == 0) { stats->first_start = stats->start; } - stats->count++; - } else if (stats->count < N) { - stats->count++; - } else { - stats->stop = clock(); + } else if (stats->count >= PACKET_NB) { + // Stop set measurement stats->finished_rounds++; - printf("%f msg/s\n", N * (double)CLOCKS_PER_SEC / (double)(stats->stop - stats->start)); + unsigned long elapsed_ms = z_clock_elapsed_ms(&stats->start); + printf("Received %d msg in %lu ms (%.1f msg/s)\n", PACKET_NB, elapsed_ms, + (double)(PACKET_NB * 1000 / elapsed_ms)); stats->count = 0; } } void drop_stats(void *context) { - const clock_t end = clock(); - const z_stats_t *stats = (z_stats_t *)context; - const double elapsed = (double)(end - stats->first_start) / (double)CLOCKS_PER_SEC; - const unsigned long sent_messages = N * stats->finished_rounds + stats->count; - printf("Stats being dropped after unsubscribing: sent %ld messages over %f seconds (%f msg/s)\n", sent_messages, - elapsed, (double)sent_messages / elapsed); + z_stats_t *stats = (z_stats_t *)context; + unsigned long elapsed_ms = z_clock_elapsed_ms(&stats->first_start); + const unsigned long sent_messages = PACKET_NB * stats->finished_rounds + stats->count; + printf("Stats after unsubscribing: received %ld messages over %lu miliseconds (%.1f msg/s)\n", sent_messages, + elapsed_ms, (double)(sent_messages * 1000 / elapsed_ms)); free(context); } From 66af3b280548660ef65fa2c3331c37a56523339f Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Wed, 20 Dec 2023 10:19:33 +0100 Subject: [PATCH 6/6] fix: convert type before divide operation --- examples/unix/c11/z_sub_thr.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/unix/c11/z_sub_thr.c b/examples/unix/c11/z_sub_thr.c index 6cddc4acb..676d37d9c 100644 --- a/examples/unix/c11/z_sub_thr.c +++ b/examples/unix/c11/z_sub_thr.c @@ -53,7 +53,7 @@ void on_sample(const z_sample_t *sample, void *context) { stats->finished_rounds++; unsigned long elapsed_ms = z_clock_elapsed_ms(&stats->start); printf("Received %d msg in %lu ms (%.1f msg/s)\n", PACKET_NB, elapsed_ms, - (double)(PACKET_NB * 1000 / elapsed_ms)); + (double)(PACKET_NB * 1000) / (double)elapsed_ms); stats->count = 0; } } @@ -63,7 +63,7 @@ void drop_stats(void *context) { unsigned long elapsed_ms = z_clock_elapsed_ms(&stats->first_start); const unsigned long sent_messages = PACKET_NB * stats->finished_rounds + stats->count; printf("Stats after unsubscribing: received %ld messages over %lu miliseconds (%.1f msg/s)\n", sent_messages, - elapsed_ms, (double)(sent_messages * 1000 / elapsed_ms)); + elapsed_ms, (double)(sent_messages * 1000) / (double)elapsed_ms); free(context); }