Skip to content

Commit

Permalink
feat: add attachment examples
Browse files Browse the repository at this point in the history
  • Loading branch information
jean-roland committed Jun 14, 2024
1 parent 25e1986 commit 8a1db82
Show file tree
Hide file tree
Showing 4 changed files with 365 additions and 1 deletion.
2 changes: 2 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ if(UNIX)
add_example(z_put unix/c11/z_put.c)
add_example(z_pub unix/c11/z_pub.c)
add_example(z_pub_st unix/c11/z_pub_st.c)
add_example(z_pub_attachment unix/c11/z_pub_attachment.c)
add_example(z_sub unix/c11/z_sub.c)
add_example(z_sub_channel unix/c11/z_sub_channel.c)
add_example(z_sub_st unix/c11/z_sub_st.c)
add_example(z_sub_attachment unix/c11/z_sub_attachment.c)
add_example(z_pull unix/c11/z_pull.c)
add_example(z_get unix/c11/z_get.c)
add_example(z_get_channel unix/c11/z_get_channel.c)
Expand Down
184 changes: 184 additions & 0 deletions examples/unix/c11/z_pub_attachment.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
//
// Copyright (c) 2022 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

#include <ctype.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <zenoh-pico.h>

#include "zenoh-pico/system/platform.h"

typedef struct kv_pair_t {
const char *key;
const char *value;
} kv_pair_t;

typedef struct kv_pairs_t {
const kv_pair_t *data;
uint32_t len;
uint32_t current_idx;
} kv_pairs_t;

#if Z_FEATURE_PUBLICATION == 1

size_t kv_pairs_length(kv_pairs_t *kvp) {
size_t ret = 0;
for (size_t i = 0; i < kvp->len; i++) {
// Size fields
ret += 2 * sizeof(uint32_t);
// Data size
ret += strlen(kvp->data[i].key) + strlen(kvp->data[i].value);
}
return ret;
}

_Bool create_attachment_iter(z_owned_bytes_t *kv_pair, void *context, size_t *curr_idx) {
kv_pairs_t *kvs = (kv_pairs_t *)(context);
z_owned_bytes_t k, v;
if (kvs->current_idx >= kvs->len) {
return false;
} else {
z_bytes_encode_from_string(&k, kvs->data[kvs->current_idx].key);
z_bytes_encode_from_string(&v, kvs->data[kvs->current_idx].value);
zp_bytes_encode_from_pair(kv_pair, z_move(k), z_move(v), curr_idx);
kvs->current_idx++;
return true;
}
}

int main(int argc, char **argv) {
const char *keyexpr = "demo/example/zenoh-pico-pub";
char *const default_value = "Pub from Pico!";
char *value = default_value;
const char *mode = "client";
char *clocator = NULL;
char *llocator = NULL;
int n = 2147483647; // max int value by default

int opt;
while ((opt = getopt(argc, argv, "k:v:e:m:l:n:")) != -1) {
switch (opt) {
case 'k':
keyexpr = optarg;
break;
case 'v':
value = optarg;
break;
case 'e':
clocator = optarg;
break;
case 'm':
mode = optarg;
break;
case 'l':
llocator = optarg;
break;
case 'n':
n = atoi(optarg);
break;
case '?':
if (optopt == 'k' || optopt == 'v' || optopt == 'e' || optopt == 'm' || optopt == 'l' ||
optopt == 'n') {
fprintf(stderr, "Option -%c requires an argument.\n", optopt);
} else {
fprintf(stderr, "Unknown option `-%c'.\n", optopt);
}
return 1;
default:
return -1;
}
}

z_owned_config_t config;
z_config_default(&config);
zp_config_insert(z_loan_mut(config), Z_CONFIG_MODE_KEY, mode);
if (clocator != NULL) {
zp_config_insert(z_loan_mut(config), Z_CONFIG_CONNECT_KEY, clocator);
}
if (llocator != NULL) {
zp_config_insert(z_loan_mut(config), Z_CONFIG_LISTEN_KEY, llocator);
}

printf("Opening session...\n");
z_owned_session_t s;
if (z_open(&s, z_move(config)) < 0) {
printf("Unable to open session!\n");
return -1;
}

// Start read and lease tasks for zenoh-pico
if (zp_start_read_task(z_loan_mut(s), NULL) < 0 || zp_start_lease_task(z_loan_mut(s), NULL) < 0) {
printf("Unable to start read and lease tasks\n");
z_close(z_move(s));
return -1;
}
// Wait for joins in peer mode
if (strcmp(mode, "peer") == 0) {
printf("Waiting for joins...\n");
sleep(3);
}
// Declare publisher
printf("Declaring publisher for '%s'...\n", keyexpr);
z_view_keyexpr_t ke;
z_view_keyexpr_from_string(&ke, keyexpr);
z_owned_publisher_t pub;
if (z_declare_publisher(&pub, z_loan(s), z_loan(ke), NULL) < 0) {
printf("Unable to declare publisher for key expression!\n");
return -1;
}

z_publisher_put_options_t options;
z_publisher_put_options_default(&options);

// Allocate attachment
kv_pair_t kvs[2];
kvs[0] = (kv_pair_t){.key = "source", .value = "C"};
z_owned_bytes_t attachment;

// Allocate buffer
char buf[256];
char buf_ind[16];

// Publish data
printf("Press CTRL-C to quit...\n");
for (int idx = 0; idx < n; ++idx) {
z_sleep_s(1);

// Add attachment value
sprintf(buf_ind, "%d", idx);
kvs[1] = (kv_pair_t){.key = "index", .value = buf_ind};
kv_pairs_t ctx = (kv_pairs_t){.data = kvs, .current_idx = 0, .len = 2};
zp_bytes_encode_from_iter(&attachment, create_attachment_iter, (void *)&ctx, kv_pairs_length(&ctx));
options.attachment = z_move(attachment);

sprintf(buf, "[%4d] %s", idx, value);
printf("Putting Data ('%s': '%s')...\n", keyexpr, buf);
z_publisher_put(z_loan(pub), (const uint8_t *)buf, strlen(buf), &options);
}
// Clean up
z_undeclare_publisher(z_move(pub));
zp_stop_read_task(z_loan_mut(s));
zp_stop_lease_task(z_loan_mut(s));
z_close(z_move(s));
return 0;
}
#else
int main(void) {
printf("ERROR: Zenoh pico was compiled without Z_FEATURE_PUBLICATION but this example requires it.\n");
return -2;
}
#endif
178 changes: 178 additions & 0 deletions examples/unix/c11/z_sub_attachment.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
//
// Copyright (c) 2022 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

#include <ctype.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <zenoh-pico.h>

typedef struct kv_pair_t {
z_owned_string_t key;
z_owned_string_t value;
} kv_pair_t;

typedef struct kv_pairs_t {
kv_pair_t *data;
uint32_t len;
uint32_t current_idx;
} kv_pairs_t;

#define KVP_LEN 16

#if Z_FEATURE_SUBSCRIPTION == 1

static int msg_nb = 0;

void parse_attachment(kv_pairs_t *kvp, const z_loaned_bytes_t *attachment) {
size_t curr_idx = 0;
z_owned_bytes_t first, second;
while ((kvp->current_idx < kvp->len) && (zp_bytes_decode_into_pair(attachment, &first, &second, &curr_idx) == 0)) {
z_bytes_decode_into_string(z_loan(first), &kvp->data[kvp->current_idx].key);
z_bytes_decode_into_string(z_loan(second), &kvp->data[kvp->current_idx].value);
z_bytes_drop(&first);
z_bytes_drop(&second);
kvp->current_idx++;
}
}

void print_attachment(kv_pairs_t *kvp) {
printf(" with attachment:\n");
for (uint32_t i = 0; i < kvp->current_idx; i++) {
printf(" %d: %s, %s\n", i, z_string_data(z_loan(kvp->data[i].key)),
z_string_data(z_loan(kvp->data[i].value)));
}
}

void drop_attachment(kv_pairs_t *kvp) {
for (size_t i = 0; i < kvp->current_idx; i++) {
z_string_drop(&kvp->data[i].key);
z_string_drop(&kvp->data[i].value);
}
z_free(kvp->data);
}

void data_handler(const z_loaned_sample_t *sample, void *ctx) {
(void)(ctx);
z_owned_string_t keystr;
z_keyexpr_to_string(z_sample_keyexpr(sample), &keystr);
z_owned_string_t value;
z_bytes_decode_into_string(z_sample_payload(sample), &value);
printf(">> [Subscriber] Received ('%s': '%s')\n", z_string_data(z_loan(keystr)), z_string_data(z_loan(value)));
// Check attachment
kv_pairs_t kvp = {.current_idx = 0, .len = KVP_LEN, .data = (kv_pair_t *)malloc(KVP_LEN * sizeof(kv_pair_t))};
parse_attachment(&kvp, z_sample_attachment(sample));
if (kvp.current_idx > 0) {
print_attachment(&kvp);
}
drop_attachment(&kvp);
z_drop(z_move(keystr));
z_drop(z_move(value));
msg_nb++;
}

int main(int argc, char **argv) {
const char *keyexpr = "demo/example/**";
const char *mode = "client";
char *clocator = NULL;
char *llocator = NULL;
int n = 0;

int opt;
while ((opt = getopt(argc, argv, "k:e:m:l:n:")) != -1) {
switch (opt) {
case 'k':
keyexpr = optarg;
break;
case 'e':
clocator = optarg;
break;
case 'm':
mode = optarg;
break;
case 'l':
llocator = optarg;
break;
case 'n':
n = atoi(optarg);
break;
case '?':
if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'l' || optopt == 'n') {
fprintf(stderr, "Option -%c requires an argument.\n", optopt);
} else {
fprintf(stderr, "Unknown option `-%c'.\n", optopt);
}
return 1;
default:
return -1;
}
}

z_owned_config_t config;
z_config_default(&config);
zp_config_insert(z_loan_mut(config), Z_CONFIG_MODE_KEY, mode);
if (clocator != NULL) {
zp_config_insert(z_loan_mut(config), Z_CONFIG_CONNECT_KEY, clocator);
}
if (llocator != NULL) {
zp_config_insert(z_loan_mut(config), Z_CONFIG_LISTEN_KEY, llocator);
}

printf("Opening session...\n");
z_owned_session_t s;
if (z_open(&s, z_move(config)) < 0) {
printf("Unable to open session!\n");
return -1;
}

// Start read and lease tasks for zenoh-pico
if (zp_start_read_task(z_loan_mut(s), NULL) < 0 || zp_start_lease_task(z_loan_mut(s), NULL) < 0) {
printf("Unable to start read and lease tasks\n");
z_close(z_session_move(&s));
return -1;
}

z_owned_closure_sample_t callback;
z_closure(&callback, data_handler);
printf("Declaring Subscriber on '%s'...\n", keyexpr);
z_owned_subscriber_t sub;
z_view_keyexpr_t ke;
z_view_keyexpr_from_string(&ke, keyexpr);
if (z_declare_subscriber(&sub, z_loan(s), z_loan(ke), z_move(callback), NULL) < 0) {
printf("Unable to declare subscriber.\n");
return -1;
}

printf("Press CTRL-C to quit...\n");
while (1) {
if ((n != 0) && (msg_nb >= n)) {
break;
}
sleep(1);
}
// Clean up
z_undeclare_subscriber(z_move(sub));
zp_stop_read_task(z_loan_mut(s));
zp_stop_lease_task(z_loan_mut(s));
z_close(z_move(s));
return 0;
}
#else
int main(void) {
printf("ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION but this example requires it.\n");
return -2;
}
#endif
2 changes: 1 addition & 1 deletion include/zenoh-pico/api/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -795,7 +795,7 @@ int8_t zp_bytes_encode_from_iter(z_owned_bytes_t *bytes,
* ``0`` if encode successful, ``negative value`` otherwise.
*/
int8_t zp_bytes_encode_from_pair(z_owned_bytes_t *bytes, z_owned_bytes_t *first, z_owned_bytes_t *second,
size_t *curr_idx);
size_t *curr_idx);

/**
* Checks validity of a timestamp
Expand Down

0 comments on commit 8a1db82

Please sign in to comment.