From 8a1db828d9d36a82defbd4d6af2d1af4653b6d5d Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Fri, 14 Jun 2024 15:48:37 +0200 Subject: [PATCH] feat: add attachment examples --- examples/CMakeLists.txt | 2 + examples/unix/c11/z_pub_attachment.c | 184 +++++++++++++++++++++++++++ examples/unix/c11/z_sub_attachment.c | 178 ++++++++++++++++++++++++++ include/zenoh-pico/api/primitives.h | 2 +- 4 files changed, 365 insertions(+), 1 deletion(-) create mode 100644 examples/unix/c11/z_pub_attachment.c create mode 100644 examples/unix/c11/z_sub_attachment.c diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 7650b6cfa..b072b9283 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -36,9 +36,11 @@ if(UNIX) add_example(z_put unix/c11/z_put.c) add_example(z_pub unix/c11/z_pub.c) add_example(z_pub_st unix/c11/z_pub_st.c) + add_example(z_pub_attachment unix/c11/z_pub_attachment.c) add_example(z_sub unix/c11/z_sub.c) add_example(z_sub_channel unix/c11/z_sub_channel.c) add_example(z_sub_st unix/c11/z_sub_st.c) + add_example(z_sub_attachment unix/c11/z_sub_attachment.c) add_example(z_pull unix/c11/z_pull.c) add_example(z_get unix/c11/z_get.c) add_example(z_get_channel unix/c11/z_get_channel.c) diff --git a/examples/unix/c11/z_pub_attachment.c b/examples/unix/c11/z_pub_attachment.c new file mode 100644 index 000000000..50bc1c106 --- /dev/null +++ b/examples/unix/c11/z_pub_attachment.c @@ -0,0 +1,184 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#include +#include +#include +#include +#include +#include +#include + +#include "zenoh-pico/system/platform.h" + +typedef struct kv_pair_t { + const char *key; + const char *value; +} kv_pair_t; + +typedef struct kv_pairs_t { + const kv_pair_t *data; + uint32_t len; + uint32_t current_idx; +} kv_pairs_t; + +#if Z_FEATURE_PUBLICATION == 1 + +size_t kv_pairs_length(kv_pairs_t *kvp) { + size_t ret = 0; + for (size_t i = 0; i < kvp->len; i++) { + // Size fields + ret += 2 * sizeof(uint32_t); + // Data size + ret += strlen(kvp->data[i].key) + strlen(kvp->data[i].value); + } + return ret; +} + +_Bool create_attachment_iter(z_owned_bytes_t *kv_pair, void *context, size_t *curr_idx) { + kv_pairs_t *kvs = (kv_pairs_t *)(context); + z_owned_bytes_t k, v; + if (kvs->current_idx >= kvs->len) { + return false; + } else { + z_bytes_encode_from_string(&k, kvs->data[kvs->current_idx].key); + z_bytes_encode_from_string(&v, kvs->data[kvs->current_idx].value); + zp_bytes_encode_from_pair(kv_pair, z_move(k), z_move(v), curr_idx); + kvs->current_idx++; + return true; + } +} + +int main(int argc, char **argv) { + const char *keyexpr = "demo/example/zenoh-pico-pub"; + char *const default_value = "Pub from Pico!"; + char *value = default_value; + const char *mode = "client"; + char *clocator = NULL; + char *llocator = NULL; + int n = 2147483647; // max int value by default + + int opt; + while ((opt = getopt(argc, argv, "k:v:e:m:l:n:")) != -1) { + switch (opt) { + case 'k': + keyexpr = optarg; + break; + case 'v': + value = optarg; + break; + case 'e': + clocator = optarg; + break; + case 'm': + mode = optarg; + break; + case 'l': + llocator = optarg; + break; + case 'n': + n = atoi(optarg); + break; + case '?': + if (optopt == 'k' || optopt == 'v' || optopt == 'e' || optopt == 'm' || optopt == 'l' || + optopt == 'n') { + fprintf(stderr, "Option -%c requires an argument.\n", optopt); + } else { + fprintf(stderr, "Unknown option `-%c'.\n", optopt); + } + return 1; + default: + return -1; + } + } + + z_owned_config_t config; + z_config_default(&config); + zp_config_insert(z_loan_mut(config), Z_CONFIG_MODE_KEY, mode); + if (clocator != NULL) { + zp_config_insert(z_loan_mut(config), Z_CONFIG_CONNECT_KEY, clocator); + } + if (llocator != NULL) { + zp_config_insert(z_loan_mut(config), Z_CONFIG_LISTEN_KEY, llocator); + } + + printf("Opening session...\n"); + z_owned_session_t s; + if (z_open(&s, z_move(config)) < 0) { + printf("Unable to open session!\n"); + return -1; + } + + // Start read and lease tasks for zenoh-pico + if (zp_start_read_task(z_loan_mut(s), NULL) < 0 || zp_start_lease_task(z_loan_mut(s), NULL) < 0) { + printf("Unable to start read and lease tasks\n"); + z_close(z_move(s)); + return -1; + } + // Wait for joins in peer mode + if (strcmp(mode, "peer") == 0) { + printf("Waiting for joins...\n"); + sleep(3); + } + // Declare publisher + printf("Declaring publisher for '%s'...\n", keyexpr); + z_view_keyexpr_t ke; + z_view_keyexpr_from_string(&ke, keyexpr); + z_owned_publisher_t pub; + if (z_declare_publisher(&pub, z_loan(s), z_loan(ke), NULL) < 0) { + printf("Unable to declare publisher for key expression!\n"); + return -1; + } + + z_publisher_put_options_t options; + z_publisher_put_options_default(&options); + + // Allocate attachment + kv_pair_t kvs[2]; + kvs[0] = (kv_pair_t){.key = "source", .value = "C"}; + z_owned_bytes_t attachment; + + // Allocate buffer + char buf[256]; + char buf_ind[16]; + + // Publish data + printf("Press CTRL-C to quit...\n"); + for (int idx = 0; idx < n; ++idx) { + z_sleep_s(1); + + // Add attachment value + sprintf(buf_ind, "%d", idx); + kvs[1] = (kv_pair_t){.key = "index", .value = buf_ind}; + kv_pairs_t ctx = (kv_pairs_t){.data = kvs, .current_idx = 0, .len = 2}; + zp_bytes_encode_from_iter(&attachment, create_attachment_iter, (void *)&ctx, kv_pairs_length(&ctx)); + options.attachment = z_move(attachment); + + sprintf(buf, "[%4d] %s", idx, value); + printf("Putting Data ('%s': '%s')...\n", keyexpr, buf); + z_publisher_put(z_loan(pub), (const uint8_t *)buf, strlen(buf), &options); + } + // Clean up + z_undeclare_publisher(z_move(pub)); + zp_stop_read_task(z_loan_mut(s)); + zp_stop_lease_task(z_loan_mut(s)); + z_close(z_move(s)); + return 0; +} +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_PUBLICATION but this example requires it.\n"); + return -2; +} +#endif diff --git a/examples/unix/c11/z_sub_attachment.c b/examples/unix/c11/z_sub_attachment.c new file mode 100644 index 000000000..7c0a0cfef --- /dev/null +++ b/examples/unix/c11/z_sub_attachment.c @@ -0,0 +1,178 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#include +#include +#include +#include +#include +#include +#include + +typedef struct kv_pair_t { + z_owned_string_t key; + z_owned_string_t value; +} kv_pair_t; + +typedef struct kv_pairs_t { + kv_pair_t *data; + uint32_t len; + uint32_t current_idx; +} kv_pairs_t; + +#define KVP_LEN 16 + +#if Z_FEATURE_SUBSCRIPTION == 1 + +static int msg_nb = 0; + +void parse_attachment(kv_pairs_t *kvp, const z_loaned_bytes_t *attachment) { + size_t curr_idx = 0; + z_owned_bytes_t first, second; + while ((kvp->current_idx < kvp->len) && (zp_bytes_decode_into_pair(attachment, &first, &second, &curr_idx) == 0)) { + z_bytes_decode_into_string(z_loan(first), &kvp->data[kvp->current_idx].key); + z_bytes_decode_into_string(z_loan(second), &kvp->data[kvp->current_idx].value); + z_bytes_drop(&first); + z_bytes_drop(&second); + kvp->current_idx++; + } +} + +void print_attachment(kv_pairs_t *kvp) { + printf(" with attachment:\n"); + for (uint32_t i = 0; i < kvp->current_idx; i++) { + printf(" %d: %s, %s\n", i, z_string_data(z_loan(kvp->data[i].key)), + z_string_data(z_loan(kvp->data[i].value))); + } +} + +void drop_attachment(kv_pairs_t *kvp) { + for (size_t i = 0; i < kvp->current_idx; i++) { + z_string_drop(&kvp->data[i].key); + z_string_drop(&kvp->data[i].value); + } + z_free(kvp->data); +} + +void data_handler(const z_loaned_sample_t *sample, void *ctx) { + (void)(ctx); + z_owned_string_t keystr; + z_keyexpr_to_string(z_sample_keyexpr(sample), &keystr); + z_owned_string_t value; + z_bytes_decode_into_string(z_sample_payload(sample), &value); + printf(">> [Subscriber] Received ('%s': '%s')\n", z_string_data(z_loan(keystr)), z_string_data(z_loan(value))); + // Check attachment + kv_pairs_t kvp = {.current_idx = 0, .len = KVP_LEN, .data = (kv_pair_t *)malloc(KVP_LEN * sizeof(kv_pair_t))}; + parse_attachment(&kvp, z_sample_attachment(sample)); + if (kvp.current_idx > 0) { + print_attachment(&kvp); + } + drop_attachment(&kvp); + z_drop(z_move(keystr)); + z_drop(z_move(value)); + msg_nb++; +} + +int main(int argc, char **argv) { + const char *keyexpr = "demo/example/**"; + const char *mode = "client"; + char *clocator = NULL; + char *llocator = NULL; + int n = 0; + + int opt; + while ((opt = getopt(argc, argv, "k:e:m:l:n:")) != -1) { + switch (opt) { + case 'k': + keyexpr = optarg; + break; + case 'e': + clocator = optarg; + break; + case 'm': + mode = optarg; + break; + case 'l': + llocator = optarg; + break; + case 'n': + n = atoi(optarg); + break; + case '?': + if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'l' || optopt == 'n') { + fprintf(stderr, "Option -%c requires an argument.\n", optopt); + } else { + fprintf(stderr, "Unknown option `-%c'.\n", optopt); + } + return 1; + default: + return -1; + } + } + + z_owned_config_t config; + z_config_default(&config); + zp_config_insert(z_loan_mut(config), Z_CONFIG_MODE_KEY, mode); + if (clocator != NULL) { + zp_config_insert(z_loan_mut(config), Z_CONFIG_CONNECT_KEY, clocator); + } + if (llocator != NULL) { + zp_config_insert(z_loan_mut(config), Z_CONFIG_LISTEN_KEY, llocator); + } + + printf("Opening session...\n"); + z_owned_session_t s; + if (z_open(&s, z_move(config)) < 0) { + printf("Unable to open session!\n"); + return -1; + } + + // Start read and lease tasks for zenoh-pico + if (zp_start_read_task(z_loan_mut(s), NULL) < 0 || zp_start_lease_task(z_loan_mut(s), NULL) < 0) { + printf("Unable to start read and lease tasks\n"); + z_close(z_session_move(&s)); + return -1; + } + + z_owned_closure_sample_t callback; + z_closure(&callback, data_handler); + printf("Declaring Subscriber on '%s'...\n", keyexpr); + z_owned_subscriber_t sub; + z_view_keyexpr_t ke; + z_view_keyexpr_from_string(&ke, keyexpr); + if (z_declare_subscriber(&sub, z_loan(s), z_loan(ke), z_move(callback), NULL) < 0) { + printf("Unable to declare subscriber.\n"); + return -1; + } + + printf("Press CTRL-C to quit...\n"); + while (1) { + if ((n != 0) && (msg_nb >= n)) { + break; + } + sleep(1); + } + // Clean up + z_undeclare_subscriber(z_move(sub)); + zp_stop_read_task(z_loan_mut(s)); + zp_stop_lease_task(z_loan_mut(s)); + z_close(z_move(s)); + return 0; +} +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION but this example requires it.\n"); + return -2; +} +#endif diff --git a/include/zenoh-pico/api/primitives.h b/include/zenoh-pico/api/primitives.h index bdf754c92..5ea4f2557 100644 --- a/include/zenoh-pico/api/primitives.h +++ b/include/zenoh-pico/api/primitives.h @@ -795,7 +795,7 @@ int8_t zp_bytes_encode_from_iter(z_owned_bytes_t *bytes, * ``0`` if encode successful, ``negative value`` otherwise. */ int8_t zp_bytes_encode_from_pair(z_owned_bytes_t *bytes, z_owned_bytes_t *first, z_owned_bytes_t *second, - size_t *curr_idx); + size_t *curr_idx); /** * Checks validity of a timestamp