Skip to content

Commit

Permalink
Subscriber callbacks now receive a refcounted z_sample (#410)
Browse files Browse the repository at this point in the history
* refactor: move code to net/sample and net/reply files

* feat: make attachment a permanent sub function arg

* feat: make z_sample type an rc and add z_loaned_sample type

* feat: add z_sample accessors functions

* fix: update tests with new z_sample

* fix: update examples with new z_sample

* fix: undefined function call

* fix: example updates with new z_sample

* fix: bad dummy subscription function args

* fix: update tests with new z_sample

* fix: update remaining examples with new z_sample

* fix: missing brace initializer
  • Loading branch information
jean-roland authored May 15, 2024
1 parent 0555baf commit 382e3ab
Show file tree
Hide file tree
Showing 53 changed files with 647 additions and 400 deletions.
2 changes: 1 addition & 1 deletion examples/arduino/z_get.ino
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ void reply_dropper(void *ctx) {
void reply_handler(z_owned_reply_t *oreply, void *ctx) {
(void)(ctx);
if (z_reply_is_ok(oreply)) {
z_sample_t sample = z_reply_ok(oreply);
z_loaned_sample_t sample = z_reply_ok(oreply);
z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr);
std::string val((const char *)sample.payload.start, sample.payload.len);

Expand Down
6 changes: 4 additions & 2 deletions examples/arduino/z_sub.ino
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@
#define KEYEXPR "demo/example/**"

void data_handler(const z_sample_t *sample, void *arg) {
z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
std::string val((const char *)sample->payload.start, sample->payload.len);
z_keyexpr_t keyexpr = z_sample_keyexpr(sample);
z_bytes_t payload = z_sample_payload(sample);
z_owned_str_t keystr = z_keyexpr_to_string(keyexpr);
std::string val((const char *)payload.start, payload.len);

Serial.print(" >> [Subscription listener] Received (");
Serial.print(z_str_loan(&keystr));
Expand Down
2 changes: 1 addition & 1 deletion examples/espidf/z_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ void reply_dropper(void *ctx) { printf(" >> Received query final notification\n"

void reply_handler(z_owned_reply_t *oreply, void *ctx) {
if (z_reply_is_ok(oreply)) {
z_sample_t sample = z_reply_ok(oreply);
z_loaned_sample_t sample = z_reply_ok(oreply);
z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr);
printf(" >> Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, sample.payload.start);
z_drop(z_move(keystr));
Expand Down
7 changes: 4 additions & 3 deletions examples/espidf/z_sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,10 @@ void wifi_init_sta(void) {
}

void data_handler(const z_sample_t* sample, void* arg) {
z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
printf(" >> [Subscriber handler] Received ('%s': '%.*s')\n", z_str_loan(&keystr), (int)sample->payload.len,
sample->payload.start);
z_keyexpr_t keyexpr = z_sample_keyexpr(sample);
z_bytes_t payload = z_sample_payload(sample);
z_owned_str_t keystr = z_keyexpr_to_string(keyexpr);
printf(" >> [Subscriber handler] Received ('%s': '%.*s')\n", z_str_loan(&keystr), (int)payload.len, payload.start);
z_str_drop(z_str_move(&keystr));
}

Expand Down
2 changes: 1 addition & 1 deletion examples/freertos_plus_tcp/z_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ void reply_dropper(void *ctx) {
void reply_handler(z_owned_reply_t *reply, void *ctx) {
(void)(ctx);
if (z_reply_is_ok(reply)) {
z_sample_t sample = z_reply_ok(reply);
z_loaned_sample_t sample = z_reply_ok(reply);
z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr);
printf(">> Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, sample.payload.start);
z_drop(z_move(keystr));
Expand Down
7 changes: 4 additions & 3 deletions examples/freertos_plus_tcp/z_sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@

void data_handler(const z_sample_t *sample, void *ctx) {
(void)(ctx);
z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len,
sample->payload.start);
z_keyexpr_t keyexpr = z_sample_keyexpr(sample);
z_bytes_t payload = z_sample_payload(sample);
z_owned_str_t keystr = z_keyexpr_to_string(keyexpr);
printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)payload.len, payload.start);
z_drop(z_move(keystr));
}

Expand Down
7 changes: 4 additions & 3 deletions examples/freertos_plus_tcp/z_sub_st.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ int msg_nb = 0;

void data_handler(const z_sample_t *sample, void *ctx) {
(void)(ctx);
z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len,
sample->payload.start);
z_keyexpr_t keyexpr = z_sample_keyexpr(sample);
z_bytes_t payload = z_sample_payload(sample);
z_owned_str_t keystr = z_keyexpr_to_string(keyexpr);
printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)payload.len, payload.start);
z_drop(z_move(keystr));
msg_nb++;
}
Expand Down
2 changes: 1 addition & 1 deletion examples/mbed/z_get.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ void reply_dropper(void *ctx) { printf(" >> Received query final notification\n"

void reply_handler(z_owned_reply_t *oreply, void *ctx) {
if (z_reply_is_ok(oreply)) {
z_sample_t sample = z_reply_ok(oreply);
z_loaned_sample_t sample = z_reply_ok(oreply);
z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr);
printf(" >> Received ('%s': '%.*s')\n", z_str_loan(&keystr), (int)sample.payload.len, sample.payload.start);
z_str_drop(z_str_move(&keystr));
Expand Down
8 changes: 5 additions & 3 deletions examples/mbed/z_sub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@
#define KEYEXPR "demo/example/**"

void data_handler(const z_sample_t *sample, void *arg) {
z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
printf(" >> [Subscriber handler] Received ('%s': '%.*s')\n", z_str_loan(&keystr), (int)sample->payload.len,
sample->payload.start);
z_keyexpr_t keyexpr = z_sample_keyexpr(sample);
z_bytes_t payload = z_sample_payload(sample);
z_owned_str_t keystr = z_keyexpr_to_string(keyexpr);
printf(" >> [Subscriber handler] Received ('%s': '%.*s')\n", z_str_loan(&keystr), (int)payload.len,
payload.start);
z_str_drop(z_str_move(&keystr));
}

Expand Down
2 changes: 1 addition & 1 deletion examples/unix/c11/z_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ int8_t attachment_handler(z_bytes_t key, z_bytes_t att_value, void *ctx) {
void reply_handler(z_owned_reply_t *reply, void *ctx) {
(void)(ctx);
if (z_reply_is_ok(reply)) {
z_sample_t sample = z_reply_ok(reply);
z_loaned_sample_t sample = z_reply_ok(reply);
z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr);
printf(">> Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, sample.payload.start);
#if Z_FEATURE_ATTACHMENT == 1
Expand Down
2 changes: 1 addition & 1 deletion examples/unix/c11/z_get_channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ int main(int argc, char **argv) {
z_owned_reply_t reply = z_reply_null();
for (z_call(channel.recv, &reply); z_check(reply); z_call(channel.recv, &reply)) {
if (z_reply_is_ok(&reply)) {
z_sample_t sample = z_reply_ok(&reply);
z_loaned_sample_t sample = z_reply_ok(&reply);
z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr);
printf(">> Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, sample.payload.start);
z_drop(z_move(keystr));
Expand Down
3 changes: 2 additions & 1 deletion examples/unix/c11/z_pong.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
#if Z_FEATURE_SUBSCRIPTION == 1 && Z_FEATURE_PUBLICATION == 1
void callback(const z_sample_t* sample, void* context) {
z_publisher_t pub = z_loan(*(z_owned_publisher_t*)context);
z_publisher_put(pub, sample->payload.start, sample->payload.len, NULL);
z_bytes_t payload = z_sample_payload(sample);
z_publisher_put(pub, payload.start, payload.len, NULL);
}
void drop(void* context) {
z_owned_publisher_t* pub = (z_owned_publisher_t*)context;
Expand Down
8 changes: 5 additions & 3 deletions examples/unix/c11/z_pull.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,11 @@ int main(int argc, char **argv) {
z_owned_sample_t sample = z_sample_null();
while (true) {
for (z_call(channel.try_recv, &sample); z_check(sample); z_call(channel.try_recv, &sample)) {
z_owned_str_t keystr = z_keyexpr_to_string(z_loan(sample).keyexpr);
printf(">> [Subscriber] Pulled ('%s': '%.*s')\n", z_loan(keystr), (int)z_loan(sample).payload.len,
z_loan(sample).payload.start);
z_sample_t loaned_sample = z_loan(sample);
z_keyexpr_t sample_key = z_sample_keyexpr(&loaned_sample);
z_bytes_t payload = z_sample_payload(&loaned_sample);
z_owned_str_t keystr = z_keyexpr_to_string(sample_key);
printf(">> [Subscriber] Pulled ('%s': '%.*s')\n", z_loan(keystr), (int)payload.len, payload.start);
z_drop(z_move(keystr));
z_drop(z_move(sample));
}
Expand Down
12 changes: 7 additions & 5 deletions examples/unix/c11/z_sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@ int8_t attachment_handler(z_bytes_t key, z_bytes_t value, void *ctx) {

void data_handler(const z_sample_t *sample, void *ctx) {
(void)(ctx);
z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len,
sample->payload.start);
z_keyexpr_t keyexpr = z_sample_keyexpr(sample);
z_bytes_t payload = z_sample_payload(sample);
z_owned_str_t keystr = z_keyexpr_to_string(keyexpr);
printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)payload.len, payload.start);
#if Z_FEATURE_ATTACHMENT == 1
if (z_attachment_check(&sample->attachment)) {
z_attachment_t attachment = z_sample_attachment(sample);
if (z_attachment_check(&attachment)) {
printf("Attachement found\n");
z_attachment_iterate(sample->attachment, attachment_handler, NULL);
z_attachment_iterate(attachment, attachment_handler, NULL);
}
#endif
z_drop(z_move(keystr));
Expand Down
8 changes: 5 additions & 3 deletions examples/unix/c11/z_sub_channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,11 @@ int main(int argc, char **argv) {

z_owned_sample_t sample = z_sample_null();
for (z_call(channel.recv, &sample); z_check(sample); z_call(channel.recv, &sample)) {
z_owned_str_t keystr = z_keyexpr_to_string(z_loan(sample).keyexpr);
printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)z_loan(sample).payload.len,
z_loan(sample).payload.start);
z_sample_t loaned_sample = z_loan(sample);
z_keyexpr_t sample_key = z_sample_keyexpr(&loaned_sample);
z_bytes_t payload = z_sample_payload(&loaned_sample);
z_owned_str_t keystr = z_keyexpr_to_string(sample_key);
printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)payload.len, payload.start);
z_drop(z_move(keystr));
z_drop(z_move(sample));
sample = z_sample_null();
Expand Down
7 changes: 4 additions & 3 deletions examples/unix/c11/z_sub_st.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ static int msg_nb = 0;

void data_handler(const z_sample_t *sample, void *ctx) {
(void)(ctx);
z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len,
sample->payload.start);
z_keyexpr_t keyexpr = z_sample_keyexpr(sample);
z_bytes_t payload = z_sample_payload(sample);
z_owned_str_t keystr = z_keyexpr_to_string(keyexpr);
printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)payload.len, payload.start);
z_drop(z_move(keystr));
msg_nb++;
}
Expand Down
2 changes: 1 addition & 1 deletion examples/unix/c99/z_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ void reply_dropper(void *ctx) {
void reply_handler(z_owned_reply_t *reply, void *ctx) {
(void)(ctx);
if (z_reply_is_ok(reply)) {
z_sample_t sample = z_reply_ok(reply);
z_loaned_sample_t sample = z_reply_ok(reply);
z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr);
printf(">> Received ('%s': '%.*s')\n", z_str_loan(&keystr), (int)sample.payload.len, sample.payload.start);
z_str_drop(z_str_move(&keystr));
Expand Down
3 changes: 2 additions & 1 deletion examples/unix/c99/z_pong.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
#if Z_FEATURE_SUBSCRIPTION == 1 && Z_FEATURE_PUBLICATION == 1
void callback(const z_sample_t* sample, void* context) {
z_publisher_t pub = z_publisher_loan((z_owned_publisher_t*)context);
z_publisher_put(pub, sample->payload.start, sample->payload.len, NULL);
z_bytes_t payload = z_sample_payload(sample);
z_publisher_put(pub, payload.start, payload.len, NULL);
}
void drop(void* context) {
z_owned_publisher_t* pub = (z_owned_publisher_t*)context;
Expand Down
7 changes: 4 additions & 3 deletions examples/unix/c99/z_sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@
#if Z_FEATURE_SUBSCRIPTION == 1
void data_handler(const z_sample_t *sample, void *arg) {
(void)(arg);
z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_str_loan(&keystr), (int)sample->payload.len,
sample->payload.start);
z_keyexpr_t keyexpr = z_sample_keyexpr(sample);
z_bytes_t payload = z_sample_payload(sample);
z_owned_str_t keystr = z_keyexpr_to_string(keyexpr);
printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_str_loan(&keystr), (int)payload.len, payload.start);
z_str_drop(z_str_move(&keystr));
}

Expand Down
7 changes: 4 additions & 3 deletions examples/unix/c99/z_sub_st.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ static int msg_nb = 0;

void data_handler(const z_sample_t *sample, void *arg) {
(void)(arg);
z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_str_loan(&keystr), (int)sample->payload.len,
sample->payload.start);
z_keyexpr_t keyexpr = z_sample_keyexpr(sample);
z_bytes_t payload = z_sample_payload(sample);
z_owned_str_t keystr = z_keyexpr_to_string(keyexpr);
printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_str_loan(&keystr), (int)payload.len, payload.start);
z_str_drop(z_str_move(&keystr));
msg_nb++;
}
Expand Down
2 changes: 1 addition & 1 deletion examples/windows/z_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ void reply_dropper(void *ctx) {
void reply_handler(z_owned_reply_t *reply, void *ctx) {
(void)(ctx);
if (z_reply_is_ok(reply)) {
z_sample_t sample = z_reply_ok(reply);
z_loaned_sample_t sample = z_reply_ok(reply);
z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr);
printf(">> Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, sample.payload.start);
z_drop(z_move(keystr));
Expand Down
3 changes: 2 additions & 1 deletion examples/windows/z_pong.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
#if Z_FEATURE_SUBSCRIPTION == 1 && Z_FEATURE_PUBLICATION == 1
void callback(const z_sample_t* sample, void* context) {
z_publisher_t pub = z_loan(*(z_owned_publisher_t*)context);
z_publisher_put(pub, sample->payload.start, sample->payload.len, NULL);
z_bytes_t payload = z_sample_payload(sample);
z_publisher_put(pub, payload.start, payload.len, NULL);
}
void drop(void* context) {
z_owned_publisher_t* pub = (z_owned_publisher_t*)context;
Expand Down
7 changes: 4 additions & 3 deletions examples/windows/z_sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@
#if Z_FEATURE_SUBSCRIPTION == 1
void data_handler(const z_sample_t *sample, void *ctx) {
(void)(ctx);
z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len,
sample->payload.start);
z_keyexpr_t keyexpr = z_sample_keyexpr(sample);
z_bytes_t payload = z_sample_payload(sample);
z_owned_str_t keystr = z_keyexpr_to_string(keyexpr);
printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)payload.len, payload.start);
z_drop(z_move(keystr));
}

Expand Down
7 changes: 4 additions & 3 deletions examples/windows/z_sub_st.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ int msg_nb = 0;
#if Z_FEATURE_SUBSCRIPTION == 1
void data_handler(const z_sample_t *sample, void *ctx) {
(void)(ctx);
z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len,
sample->payload.start);
z_keyexpr_t keyexpr = z_sample_keyexpr(sample);
z_bytes_t payload = z_sample_payload(sample);
z_owned_str_t keystr = z_keyexpr_to_string(keyexpr);
printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)payload.len, payload.start);
z_drop(z_move(keystr));

msg_nb++;
Expand Down
2 changes: 1 addition & 1 deletion examples/zephyr/z_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ void reply_dropper(void *ctx) { printf(" >> Received query final notification\n"

void reply_handler(z_owned_reply_t *oreply, void *ctx) {
if (z_reply_is_ok(oreply)) {
z_sample_t sample = z_reply_ok(oreply);
z_loaned_sample_t sample = z_reply_ok(oreply);
z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr);
printf(" >> Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, sample.payload.start);
z_drop(z_move(keystr));
Expand Down
7 changes: 4 additions & 3 deletions examples/zephyr/z_sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@
#define KEYEXPR "demo/example/**"

void data_handler(const z_sample_t *sample, void *arg) {
z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
printf(" >> [Subscriber handler] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len,
sample->payload.start);
z_keyexpr_t keyexpr = z_sample_keyexpr(sample);
z_bytes_t payload = z_sample_payload(sample);
z_owned_str_t keystr = z_keyexpr_to_string(keyexpr);
printf(" >> [Subscriber handler] Received ('%s': '%.*s')\n", z_loan(keystr), (int)payload.len, payload.start);
z_drop(z_move(keystr));
}

Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/api/handlers.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

// -- Samples handler
void _z_owned_sample_move(z_owned_sample_t *dst, z_owned_sample_t *src);
z_owned_sample_t *_z_sample_to_owned_ptr(const _z_sample_t *src);
z_owned_sample_t *_z_sample_to_owned_ptr(const z_sample_t *src);

// -- Queries handler
void _z_owned_query_move(z_owned_query_t *dst, z_owned_query_t *src);
Expand Down
Loading

0 comments on commit 382e3ab

Please sign in to comment.