diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index b072b9283..ba3dd8139 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -44,8 +44,10 @@ if(UNIX) add_example(z_pull unix/c11/z_pull.c) add_example(z_get unix/c11/z_get.c) add_example(z_get_channel unix/c11/z_get_channel.c) + add_example(z_get_attachment unix/c11/z_get_attachment.c) add_example(z_queryable unix/c11/z_queryable.c) add_example(z_queryable_channel unix/c11/z_queryable_channel.c) + add_example(z_queryable_attachment unix/c11/z_queryable_attachment.c) add_example(z_info unix/c11/z_info.c) add_example(z_scout unix/c11/z_scout.c) add_example(z_ping unix/c11/z_ping.c) diff --git a/examples/unix/c11/z_get_attachment.c b/examples/unix/c11/z_get_attachment.c new file mode 100644 index 000000000..da158e7f9 --- /dev/null +++ b/examples/unix/c11/z_get_attachment.c @@ -0,0 +1,248 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#include +#include +#include +#include +#include +#include + +typedef struct kv_pair_t { + const char *key; + const char *value; +} kv_pair_t; + +typedef struct kv_pairs_tx_t { + const kv_pair_t *data; + uint32_t len; + uint32_t current_idx; +} kv_pairs_tx_t; + +typedef struct kv_pair_decoded_t { + z_owned_string_t key; + z_owned_string_t value; +} kv_pair_decoded_t; + +typedef struct kv_pairs_rx_t { + kv_pair_decoded_t *data; + uint32_t len; + uint32_t current_idx; +} kv_pairs_rx_t; + +#define KVP_LEN 16 + +#if Z_FEATURE_QUERY == 1 && Z_FEATURE_MULTI_THREAD == 1 +static z_condvar_t cond; +static z_mutex_t mutex; + +size_t kv_pairs_length(kv_pairs_tx_t *kvp) { + size_t ret = 0; + for (size_t i = 0; i < kvp->len; i++) { + // Size fields + ret += 2 * sizeof(uint32_t); + // Data size + ret += strlen(kvp->data[i].key) + strlen(kvp->data[i].value); + } + return ret; +} + +_Bool create_attachment_iter(z_owned_bytes_t *kv_pair, void *context, size_t *curr_idx) { + kv_pairs_tx_t *kvs = (kv_pairs_tx_t *)(context); + z_owned_bytes_t k, v; + if (kvs->current_idx >= kvs->len) { + return false; + } else { + z_bytes_encode_from_string(&k, kvs->data[kvs->current_idx].key); + z_bytes_encode_from_string(&v, kvs->data[kvs->current_idx].value); + zp_bytes_encode_from_pair(kv_pair, z_move(k), z_move(v), curr_idx); + kvs->current_idx++; + return true; + } +} + +void parse_attachment(kv_pairs_rx_t *kvp, const z_loaned_bytes_t *attachment) { + size_t curr_idx = 0; + z_owned_bytes_t first, second; + while ((kvp->current_idx < kvp->len) && (zp_bytes_decode_into_pair(attachment, &first, &second, &curr_idx) == 0)) { + z_bytes_decode_into_string(z_loan(first), &kvp->data[kvp->current_idx].key); + z_bytes_decode_into_string(z_loan(second), &kvp->data[kvp->current_idx].value); + z_bytes_drop(&first); + z_bytes_drop(&second); + kvp->current_idx++; + } +} + +void print_attachment(kv_pairs_rx_t *kvp) { + printf(" with attachment:\n"); + for (uint32_t i = 0; i < kvp->current_idx; i++) { + printf(" %d: %s, %s\n", i, z_string_data(z_loan(kvp->data[i].key)), + z_string_data(z_loan(kvp->data[i].value))); + } +} + +void drop_attachment(kv_pairs_rx_t *kvp) { + for (size_t i = 0; i < kvp->current_idx; i++) { + z_string_drop(&kvp->data[i].key); + z_string_drop(&kvp->data[i].value); + } + z_free(kvp->data); +} + +void reply_dropper(void *ctx) { + (void)(ctx); + printf(">> Received query final notification\n"); + z_condvar_signal(&cond); + z_condvar_free(&cond); +} + +void reply_handler(const z_loaned_reply_t *reply, void *ctx) { + (void)(ctx); + if (z_reply_is_ok(reply)) { + const z_loaned_sample_t *sample = z_reply_ok(reply); + z_owned_string_t keystr; + z_keyexpr_to_string(z_sample_keyexpr(sample), &keystr); + z_owned_string_t replystr; + z_bytes_decode_into_string(z_sample_payload(sample), &replystr); + + printf(">> Received ('%s': '%s')\n", z_string_data(z_loan(keystr)), z_string_data(z_loan(replystr))); + + // Check attachment + kv_pairs_rx_t kvp = { + .current_idx = 0, .len = KVP_LEN, .data = (kv_pair_decoded_t *)malloc(KVP_LEN * sizeof(kv_pair_decoded_t))}; + parse_attachment(&kvp, z_sample_attachment(sample)); + if (kvp.current_idx > 0) { + print_attachment(&kvp); + } + drop_attachment(&kvp); + + z_drop(z_move(keystr)); + z_drop(z_move(replystr)); + } else { + printf(">> Received an error\n"); + } +} + +int main(int argc, char **argv) { + const char *keyexpr = "demo/example/**"; + const char *mode = "client"; + const char *clocator = NULL; + const char *llocator = NULL; + const char *value = NULL; + + int opt; + while ((opt = getopt(argc, argv, "k:e:m:v:l:")) != -1) { + switch (opt) { + case 'k': + keyexpr = optarg; + break; + case 'e': + clocator = optarg; + break; + case 'm': + mode = optarg; + break; + case 'l': + llocator = optarg; + break; + case 'v': + value = optarg; + break; + case '?': + if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'v' || optopt == 'l') { + fprintf(stderr, "Option -%c requires an argument.\n", optopt); + } else { + fprintf(stderr, "Unknown option `-%c'.\n", optopt); + } + return 1; + default: + return -1; + } + } + + z_mutex_init(&mutex); + z_condvar_init(&cond); + + z_owned_config_t config; + z_config_default(&config); + zp_config_insert(z_loan_mut(config), Z_CONFIG_MODE_KEY, mode); + if (clocator != NULL) { + zp_config_insert(z_loan_mut(config), Z_CONFIG_CONNECT_KEY, clocator); + } + if (llocator != NULL) { + zp_config_insert(z_loan_mut(config), Z_CONFIG_LISTEN_KEY, llocator); + } + + printf("Opening session...\n"); + z_owned_session_t s; + if (z_open(&s, z_move(config)) < 0) { + printf("Unable to open session!\n"); + return -1; + } + + // Start read and lease tasks for zenoh-pico + if (zp_start_read_task(z_loan_mut(s), NULL) < 0 || zp_start_lease_task(z_loan_mut(s), NULL) < 0) { + printf("Unable to start read and lease tasks\n"); + z_close(z_session_move(&s)); + return -1; + } + + z_view_keyexpr_t ke; + if (z_view_keyexpr_from_string(&ke, keyexpr) < 0) { + printf("%s is not a valid key expression", keyexpr); + return -1; + } + + z_mutex_lock(&mutex); + printf("Sending Query '%s'...\n", keyexpr); + z_get_options_t opts; + z_get_options_default(&opts); + // Value encoding + z_owned_bytes_t payload; + if (value != NULL) { + z_bytes_encode_from_string(&payload, value); + opts.payload = &payload; + } + + // Add attachment value + kv_pair_t kvs[1]; + kvs[0] = (kv_pair_t){.key = "test_key", .value = "test_value"}; + kv_pairs_tx_t ctx = (kv_pairs_tx_t){.data = kvs, .current_idx = 0, .len = 1}; + z_owned_bytes_t attachment; + zp_bytes_encode_from_iter(&attachment, create_attachment_iter, (void *)&ctx, kv_pairs_length(&ctx)); + opts.attachment = z_move(attachment); + + z_owned_closure_reply_t callback; + z_closure(&callback, reply_handler, reply_dropper); + if (z_get(z_loan(s), z_loan(ke), "", z_move(callback), &opts) < 0) { + printf("Unable to send query.\n"); + return -1; + } + z_condvar_wait(&cond, &mutex); + z_mutex_unlock(&mutex); + + // Stop read and lease tasks for zenoh-pico + zp_stop_read_task(z_loan_mut(s)); + zp_stop_lease_task(z_loan_mut(s)); + + z_close(z_move(s)); + return 0; +} +#else +int main(void) { + printf( + "ERROR: Zenoh pico was compiled without Z_FEATURE_QUERY or Z_FEATURE_MULTI_THREAD but this example requires " + "them.\n"); + return -2; +} +#endif diff --git a/examples/unix/c11/z_queryable_attachment.c b/examples/unix/c11/z_queryable_attachment.c new file mode 100644 index 000000000..d6b41fac2 --- /dev/null +++ b/examples/unix/c11/z_queryable_attachment.c @@ -0,0 +1,254 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#include +#include +#include +#include +#include +#include + +typedef struct kv_pair_t { + const char *key; + const char *value; +} kv_pair_t; + +typedef struct kv_pairs_tx_t { + const kv_pair_t *data; + uint32_t len; + uint32_t current_idx; +} kv_pairs_tx_t; + +typedef struct kv_pair_decoded_t { + z_owned_string_t key; + z_owned_string_t value; +} kv_pair_decoded_t; + +typedef struct kv_pairs_rx_t { + kv_pair_decoded_t *data; + uint32_t len; + uint32_t current_idx; +} kv_pairs_rx_t; + +#define KVP_LEN 16 + +#if Z_FEATURE_QUERYABLE == 1 +const char *keyexpr = "demo/example/zenoh-pico-queryable"; +const char *value = "Queryable from Pico!"; +static int msg_nb = 0; + +size_t kv_pairs_length(kv_pairs_tx_t *kvp) { + size_t ret = 0; + for (size_t i = 0; i < kvp->len; i++) { + // Size fields + ret += 2 * sizeof(uint32_t); + // Data size + ret += strlen(kvp->data[i].key) + strlen(kvp->data[i].value); + } + return ret; +} + +_Bool create_attachment_iter(z_owned_bytes_t *kv_pair, void *context, size_t *curr_idx) { + kv_pairs_tx_t *kvs = (kv_pairs_tx_t *)(context); + z_owned_bytes_t k, v; + if (kvs->current_idx >= kvs->len) { + return false; + } else { + z_bytes_encode_from_string(&k, kvs->data[kvs->current_idx].key); + z_bytes_encode_from_string(&v, kvs->data[kvs->current_idx].value); + zp_bytes_encode_from_pair(kv_pair, z_move(k), z_move(v), curr_idx); + kvs->current_idx++; + return true; + } +} + +void parse_attachment(kv_pairs_rx_t *kvp, const z_loaned_bytes_t *attachment) { + size_t curr_idx = 0; + z_owned_bytes_t first, second; + while ((kvp->current_idx < kvp->len) && (zp_bytes_decode_into_pair(attachment, &first, &second, &curr_idx) == 0)) { + z_bytes_decode_into_string(z_loan(first), &kvp->data[kvp->current_idx].key); + z_bytes_decode_into_string(z_loan(second), &kvp->data[kvp->current_idx].value); + z_bytes_drop(&first); + z_bytes_drop(&second); + kvp->current_idx++; + } +} + +void print_attachment(kv_pairs_rx_t *kvp) { + printf(" with attachment:\n"); + for (uint32_t i = 0; i < kvp->current_idx; i++) { + printf(" %d: %s, %s\n", i, z_string_data(z_loan(kvp->data[i].key)), + z_string_data(z_loan(kvp->data[i].value))); + } +} + +void drop_attachment(kv_pairs_rx_t *kvp) { + for (size_t i = 0; i < kvp->current_idx; i++) { + z_string_drop(&kvp->data[i].key); + z_string_drop(&kvp->data[i].value); + } + z_free(kvp->data); +} + +void query_handler(const z_loaned_query_t *query, void *ctx) { + (void)(ctx); + z_owned_string_t keystr; + z_keyexpr_to_string(z_query_keyexpr(query), &keystr); + z_view_string_t params; + z_query_parameters(query, ¶ms); + printf(" >> [Queryable handler] Received Query '%s%.*s'\n", z_string_data(z_loan(keystr)), (int)z_loan(params)->len, + z_loan(params)->val); + // Process value + z_owned_string_t payload_string; + z_bytes_decode_into_string(z_value_payload(z_query_value(query)), &payload_string); + if (z_string_len(z_loan(payload_string)) > 1) { + printf(" with value '%s'\n", z_string_data(z_loan(payload_string))); + } + // Check attachment + kv_pairs_rx_t kvp = { + .current_idx = 0, .len = KVP_LEN, .data = (kv_pair_decoded_t *)malloc(KVP_LEN * sizeof(kv_pair_decoded_t))}; + parse_attachment(&kvp, z_query_attachment(query)); + if (kvp.current_idx > 0) { + print_attachment(&kvp); + } + drop_attachment(&kvp); + z_drop(z_move(payload_string)); + + // Create encoding + z_owned_encoding_t encoding; + zp_encoding_make(&encoding, Z_ENCODING_ID_TEXT_PLAIN, NULL); + + z_query_reply_options_t options; + z_query_reply_options_default(&options); + options.encoding = z_move(encoding); + + // Reply value encoding + z_owned_bytes_t reply_payload; + z_bytes_encode_from_string(&reply_payload, value); + + // Reply attachment + kv_pair_t kvs[1]; + kvs[0] = (kv_pair_t){.key = "reply_key", .value = "reply_value"}; + kv_pairs_tx_t kv_ctx = (kv_pairs_tx_t){.data = kvs, .current_idx = 0, .len = 1}; + z_owned_bytes_t attachment; + zp_bytes_encode_from_iter(&attachment, create_attachment_iter, (void *)&kv_ctx, kv_pairs_length(&kv_ctx)); + options.attachment = z_move(attachment); + + z_query_reply(query, z_query_keyexpr(query), z_move(reply_payload), &options); + z_drop(z_move(keystr)); + msg_nb++; +} + +int main(int argc, char **argv) { + const char *mode = "client"; + char *clocator = NULL; + char *llocator = NULL; + int n = 0; + + int opt; + while ((opt = getopt(argc, argv, "k:e:m:v:l:n:")) != -1) { + switch (opt) { + case 'k': + keyexpr = optarg; + break; + case 'e': + clocator = optarg; + break; + case 'm': + mode = optarg; + break; + case 'l': + llocator = optarg; + break; + case 'v': + value = optarg; + break; + case 'n': + n = atoi(optarg); + break; + case '?': + if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'v' || optopt == 'l' || + optopt == 'n') { + fprintf(stderr, "Option -%c requires an argument.\n", optopt); + } else { + fprintf(stderr, "Unknown option `-%c'.\n", optopt); + } + return 1; + default: + return -1; + } + } + + z_owned_config_t config; + z_config_default(&config); + zp_config_insert(z_loan_mut(config), Z_CONFIG_MODE_KEY, mode); + if (clocator != NULL) { + zp_config_insert(z_loan_mut(config), Z_CONFIG_CONNECT_KEY, clocator); + } + if (llocator != NULL) { + zp_config_insert(z_loan_mut(config), Z_CONFIG_LISTEN_KEY, llocator); + } + + printf("Opening session...\n"); + z_owned_session_t s; + if (z_open(&s, z_move(config)) < 0) { + printf("Unable to open session!\n"); + return -1; + } + + // Start read and lease tasks for zenoh-pico + if (zp_start_read_task(z_loan_mut(s), NULL) < 0 || zp_start_lease_task(z_loan_mut(s), NULL) < 0) { + printf("Unable to start read and lease tasks\n"); + z_close(z_session_move(&s)); + return -1; + } + + z_view_keyexpr_t ke; + if (z_view_keyexpr_from_string(&ke, keyexpr) < 0) { + printf("%s is not a valid key expression", keyexpr); + return -1; + } + + printf("Creating Queryable on '%s'...\n", keyexpr); + z_owned_closure_query_t callback; + z_closure(&callback, query_handler); + z_owned_queryable_t qable; + if (z_declare_queryable(&qable, z_loan(s), z_loan(ke), z_move(callback), NULL) < 0) { + printf("Unable to create queryable.\n"); + return -1; + } + + printf("Press CTRL-C to quit...\n"); + while (1) { + if ((n != 0) && (msg_nb >= n)) { + break; + } + sleep(1); + } + + z_undeclare_queryable(z_move(qable)); + + // Stop read and lease tasks for zenoh-pico + zp_stop_read_task(z_loan_mut(s)); + zp_stop_lease_task(z_loan_mut(s)); + + z_close(z_move(s)); + + return 0; +} +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_QUERYABLE but this example requires it.\n"); + return -2; +} +#endif