Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscriber callbacks now receive a refcounted z_sample #410

Merged
merged 12 commits into from
May 15, 2024
Merged
2 changes: 1 addition & 1 deletion examples/arduino/z_get.ino
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ void reply_dropper(void *ctx) {
void reply_handler(z_owned_reply_t *oreply, void *ctx) {
(void)(ctx);
if (z_reply_is_ok(oreply)) {
z_sample_t sample = z_reply_ok(oreply);
z_loaned_sample_t sample = z_reply_ok(oreply);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert z_reply_ok result type renaming z_sample_t / z_loaned_sample_t
(z_reply_ok in the new API will return const z_loaned_sample_t *, I already did it and this renaming will only introduce additional conflicts.)

z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr);
std::string val((const char *)sample.payload.start, sample.payload.len);

Expand Down
6 changes: 4 additions & 2 deletions examples/arduino/z_sub.ino
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@
#define KEYEXPR "demo/example/**"

void data_handler(const z_sample_t *sample, void *arg) {
z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
std::string val((const char *)sample->payload.start, sample->payload.len);
z_keyexpr_t keyexpr = z_sample_keyexpr(sample);
z_bytes_t payload = z_sample_payload(sample);
z_owned_str_t keystr = z_keyexpr_to_string(keyexpr);
std::string val((const char *)payload.start, payload.len);

Serial.print(" >> [Subscription listener] Received (");
Serial.print(z_str_loan(&keystr));
Expand Down
2 changes: 1 addition & 1 deletion examples/espidf/z_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ void reply_dropper(void *ctx) { printf(" >> Received query final notification\n"

void reply_handler(z_owned_reply_t *oreply, void *ctx) {
if (z_reply_is_ok(oreply)) {
z_sample_t sample = z_reply_ok(oreply);
z_loaned_sample_t sample = z_reply_ok(oreply);
z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr);
printf(" >> Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, sample.payload.start);
z_drop(z_move(keystr));
Expand Down
7 changes: 4 additions & 3 deletions examples/espidf/z_sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,10 @@ void wifi_init_sta(void) {
}

void data_handler(const z_sample_t* sample, void* arg) {
z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
printf(" >> [Subscriber handler] Received ('%s': '%.*s')\n", z_str_loan(&keystr), (int)sample->payload.len,
sample->payload.start);
z_keyexpr_t keyexpr = z_sample_keyexpr(sample);
z_bytes_t payload = z_sample_payload(sample);
z_owned_str_t keystr = z_keyexpr_to_string(keyexpr);
printf(" >> [Subscriber handler] Received ('%s': '%.*s')\n", z_str_loan(&keystr), (int)payload.len, payload.start);
z_str_drop(z_str_move(&keystr));
}

Expand Down
2 changes: 1 addition & 1 deletion examples/freertos_plus_tcp/z_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ void reply_dropper(void *ctx) {
void reply_handler(z_owned_reply_t *reply, void *ctx) {
(void)(ctx);
if (z_reply_is_ok(reply)) {
z_sample_t sample = z_reply_ok(reply);
z_loaned_sample_t sample = z_reply_ok(reply);
z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr);
printf(">> Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, sample.payload.start);
z_drop(z_move(keystr));
Expand Down
7 changes: 4 additions & 3 deletions examples/freertos_plus_tcp/z_sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@

void data_handler(const z_sample_t *sample, void *ctx) {
(void)(ctx);
z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len,
sample->payload.start);
z_keyexpr_t keyexpr = z_sample_keyexpr(sample);
z_bytes_t payload = z_sample_payload(sample);
z_owned_str_t keystr = z_keyexpr_to_string(keyexpr);
printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)payload.len, payload.start);
z_drop(z_move(keystr));
}

Expand Down
7 changes: 4 additions & 3 deletions examples/freertos_plus_tcp/z_sub_st.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ int msg_nb = 0;

void data_handler(const z_sample_t *sample, void *ctx) {
(void)(ctx);
z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len,
sample->payload.start);
z_keyexpr_t keyexpr = z_sample_keyexpr(sample);
z_bytes_t payload = z_sample_payload(sample);
z_owned_str_t keystr = z_keyexpr_to_string(keyexpr);
printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)payload.len, payload.start);
z_drop(z_move(keystr));
msg_nb++;
}
Expand Down
2 changes: 1 addition & 1 deletion examples/mbed/z_get.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ void reply_dropper(void *ctx) { printf(" >> Received query final notification\n"

void reply_handler(z_owned_reply_t *oreply, void *ctx) {
if (z_reply_is_ok(oreply)) {
z_sample_t sample = z_reply_ok(oreply);
z_loaned_sample_t sample = z_reply_ok(oreply);
z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr);
printf(" >> Received ('%s': '%.*s')\n", z_str_loan(&keystr), (int)sample.payload.len, sample.payload.start);
z_str_drop(z_str_move(&keystr));
Expand Down
8 changes: 5 additions & 3 deletions examples/mbed/z_sub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@
#define KEYEXPR "demo/example/**"

void data_handler(const z_sample_t *sample, void *arg) {
z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
printf(" >> [Subscriber handler] Received ('%s': '%.*s')\n", z_str_loan(&keystr), (int)sample->payload.len,
sample->payload.start);
z_keyexpr_t keyexpr = z_sample_keyexpr(sample);
z_bytes_t payload = z_sample_payload(sample);
z_owned_str_t keystr = z_keyexpr_to_string(keyexpr);
printf(" >> [Subscriber handler] Received ('%s': '%.*s')\n", z_str_loan(&keystr), (int)payload.len,
payload.start);
z_str_drop(z_str_move(&keystr));
}

Expand Down
2 changes: 1 addition & 1 deletion examples/unix/c11/z_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ int8_t attachment_handler(z_bytes_t key, z_bytes_t att_value, void *ctx) {
void reply_handler(z_owned_reply_t *reply, void *ctx) {
(void)(ctx);
if (z_reply_is_ok(reply)) {
z_sample_t sample = z_reply_ok(reply);
z_loaned_sample_t sample = z_reply_ok(reply);
z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr);
printf(">> Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, sample.payload.start);
#if Z_FEATURE_ATTACHMENT == 1
Expand Down
2 changes: 1 addition & 1 deletion examples/unix/c11/z_get_channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ int main(int argc, char **argv) {
z_owned_reply_t reply = z_reply_null();
for (z_call(channel.recv, &reply); z_check(reply); z_call(channel.recv, &reply)) {
if (z_reply_is_ok(&reply)) {
z_sample_t sample = z_reply_ok(&reply);
z_loaned_sample_t sample = z_reply_ok(&reply);
z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr);
printf(">> Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, sample.payload.start);
z_drop(z_move(keystr));
Expand Down
3 changes: 2 additions & 1 deletion examples/unix/c11/z_pong.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
#if Z_FEATURE_SUBSCRIPTION == 1 && Z_FEATURE_PUBLICATION == 1
void callback(const z_sample_t* sample, void* context) {
z_publisher_t pub = z_loan(*(z_owned_publisher_t*)context);
z_publisher_put(pub, sample->payload.start, sample->payload.len, NULL);
z_bytes_t payload = z_sample_payload(sample);
z_publisher_put(pub, payload.start, payload.len, NULL);
}
void drop(void* context) {
z_owned_publisher_t* pub = (z_owned_publisher_t*)context;
Expand Down
8 changes: 5 additions & 3 deletions examples/unix/c11/z_pull.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,11 @@ int main(int argc, char **argv) {
z_owned_sample_t sample = z_sample_null();
while (true) {
for (z_call(channel.try_recv, &sample); z_check(sample); z_call(channel.try_recv, &sample)) {
z_owned_str_t keystr = z_keyexpr_to_string(z_loan(sample).keyexpr);
printf(">> [Subscriber] Pulled ('%s': '%.*s')\n", z_loan(keystr), (int)z_loan(sample).payload.len,
z_loan(sample).payload.start);
z_sample_t loaned_sample = z_loan(sample);
z_keyexpr_t sample_key = z_sample_keyexpr(&loaned_sample);
z_bytes_t payload = z_sample_payload(&loaned_sample);
z_owned_str_t keystr = z_keyexpr_to_string(sample_key);
printf(">> [Subscriber] Pulled ('%s': '%.*s')\n", z_loan(keystr), (int)payload.len, payload.start);
z_drop(z_move(keystr));
z_drop(z_move(sample));
}
Expand Down
12 changes: 7 additions & 5 deletions examples/unix/c11/z_sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@ int8_t attachment_handler(z_bytes_t key, z_bytes_t value, void *ctx) {

void data_handler(const z_sample_t *sample, void *ctx) {
(void)(ctx);
z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len,
sample->payload.start);
z_keyexpr_t keyexpr = z_sample_keyexpr(sample);
z_bytes_t payload = z_sample_payload(sample);
z_owned_str_t keystr = z_keyexpr_to_string(keyexpr);
printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)payload.len, payload.start);
#if Z_FEATURE_ATTACHMENT == 1
if (z_attachment_check(&sample->attachment)) {
z_attachment_t attachment = z_sample_attachment(sample);
if (z_attachment_check(&attachment)) {
printf("Attachement found\n");
z_attachment_iterate(sample->attachment, attachment_handler, NULL);
z_attachment_iterate(attachment, attachment_handler, NULL);
}
#endif
z_drop(z_move(keystr));
Expand Down
8 changes: 5 additions & 3 deletions examples/unix/c11/z_sub_channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,11 @@ int main(int argc, char **argv) {

z_owned_sample_t sample = z_sample_null();
for (z_call(channel.recv, &sample); z_check(sample); z_call(channel.recv, &sample)) {
z_owned_str_t keystr = z_keyexpr_to_string(z_loan(sample).keyexpr);
printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)z_loan(sample).payload.len,
z_loan(sample).payload.start);
z_sample_t loaned_sample = z_loan(sample);
z_keyexpr_t sample_key = z_sample_keyexpr(&loaned_sample);
z_bytes_t payload = z_sample_payload(&loaned_sample);
z_owned_str_t keystr = z_keyexpr_to_string(sample_key);
printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)payload.len, payload.start);
z_drop(z_move(keystr));
z_drop(z_move(sample));
sample = z_sample_null();
Expand Down
7 changes: 4 additions & 3 deletions examples/unix/c11/z_sub_st.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ static int msg_nb = 0;

void data_handler(const z_sample_t *sample, void *ctx) {
(void)(ctx);
z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len,
sample->payload.start);
z_keyexpr_t keyexpr = z_sample_keyexpr(sample);
z_bytes_t payload = z_sample_payload(sample);
z_owned_str_t keystr = z_keyexpr_to_string(keyexpr);
printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)payload.len, payload.start);
z_drop(z_move(keystr));
msg_nb++;
}
Expand Down
2 changes: 1 addition & 1 deletion examples/unix/c99/z_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ void reply_dropper(void *ctx) {
void reply_handler(z_owned_reply_t *reply, void *ctx) {
(void)(ctx);
if (z_reply_is_ok(reply)) {
z_sample_t sample = z_reply_ok(reply);
z_loaned_sample_t sample = z_reply_ok(reply);
z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr);
printf(">> Received ('%s': '%.*s')\n", z_str_loan(&keystr), (int)sample.payload.len, sample.payload.start);
z_str_drop(z_str_move(&keystr));
Expand Down
3 changes: 2 additions & 1 deletion examples/unix/c99/z_pong.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
#if Z_FEATURE_SUBSCRIPTION == 1 && Z_FEATURE_PUBLICATION == 1
void callback(const z_sample_t* sample, void* context) {
z_publisher_t pub = z_publisher_loan((z_owned_publisher_t*)context);
z_publisher_put(pub, sample->payload.start, sample->payload.len, NULL);
z_bytes_t payload = z_sample_payload(sample);
z_publisher_put(pub, payload.start, payload.len, NULL);
}
void drop(void* context) {
z_owned_publisher_t* pub = (z_owned_publisher_t*)context;
Expand Down
7 changes: 4 additions & 3 deletions examples/unix/c99/z_sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@
#if Z_FEATURE_SUBSCRIPTION == 1
void data_handler(const z_sample_t *sample, void *arg) {
(void)(arg);
z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_str_loan(&keystr), (int)sample->payload.len,
sample->payload.start);
z_keyexpr_t keyexpr = z_sample_keyexpr(sample);
z_bytes_t payload = z_sample_payload(sample);
z_owned_str_t keystr = z_keyexpr_to_string(keyexpr);
printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_str_loan(&keystr), (int)payload.len, payload.start);
z_str_drop(z_str_move(&keystr));
}

Expand Down
7 changes: 4 additions & 3 deletions examples/unix/c99/z_sub_st.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ static int msg_nb = 0;

void data_handler(const z_sample_t *sample, void *arg) {
(void)(arg);
z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_str_loan(&keystr), (int)sample->payload.len,
sample->payload.start);
z_keyexpr_t keyexpr = z_sample_keyexpr(sample);
z_bytes_t payload = z_sample_payload(sample);
z_owned_str_t keystr = z_keyexpr_to_string(keyexpr);
printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_str_loan(&keystr), (int)payload.len, payload.start);
z_str_drop(z_str_move(&keystr));
msg_nb++;
}
Expand Down
2 changes: 1 addition & 1 deletion examples/windows/z_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ void reply_dropper(void *ctx) {
void reply_handler(z_owned_reply_t *reply, void *ctx) {
(void)(ctx);
if (z_reply_is_ok(reply)) {
z_sample_t sample = z_reply_ok(reply);
z_loaned_sample_t sample = z_reply_ok(reply);
z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr);
printf(">> Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, sample.payload.start);
z_drop(z_move(keystr));
Expand Down
3 changes: 2 additions & 1 deletion examples/windows/z_pong.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
#if Z_FEATURE_SUBSCRIPTION == 1 && Z_FEATURE_PUBLICATION == 1
void callback(const z_sample_t* sample, void* context) {
z_publisher_t pub = z_loan(*(z_owned_publisher_t*)context);
z_publisher_put(pub, sample->payload.start, sample->payload.len, NULL);
z_bytes_t payload = z_sample_payload(sample);
z_publisher_put(pub, payload.start, payload.len, NULL);
}
void drop(void* context) {
z_owned_publisher_t* pub = (z_owned_publisher_t*)context;
Expand Down
7 changes: 4 additions & 3 deletions examples/windows/z_sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@
#if Z_FEATURE_SUBSCRIPTION == 1
void data_handler(const z_sample_t *sample, void *ctx) {
(void)(ctx);
z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len,
sample->payload.start);
z_keyexpr_t keyexpr = z_sample_keyexpr(sample);
z_bytes_t payload = z_sample_payload(sample);
z_owned_str_t keystr = z_keyexpr_to_string(keyexpr);
printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)payload.len, payload.start);
z_drop(z_move(keystr));
}

Expand Down
7 changes: 4 additions & 3 deletions examples/windows/z_sub_st.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ int msg_nb = 0;
#if Z_FEATURE_SUBSCRIPTION == 1
void data_handler(const z_sample_t *sample, void *ctx) {
(void)(ctx);
z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len,
sample->payload.start);
z_keyexpr_t keyexpr = z_sample_keyexpr(sample);
z_bytes_t payload = z_sample_payload(sample);
z_owned_str_t keystr = z_keyexpr_to_string(keyexpr);
printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)payload.len, payload.start);
z_drop(z_move(keystr));

msg_nb++;
Expand Down
2 changes: 1 addition & 1 deletion examples/zephyr/z_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ void reply_dropper(void *ctx) { printf(" >> Received query final notification\n"

void reply_handler(z_owned_reply_t *oreply, void *ctx) {
if (z_reply_is_ok(oreply)) {
z_sample_t sample = z_reply_ok(oreply);
z_loaned_sample_t sample = z_reply_ok(oreply);
z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr);
printf(" >> Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, sample.payload.start);
z_drop(z_move(keystr));
Expand Down
7 changes: 4 additions & 3 deletions examples/zephyr/z_sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@
#define KEYEXPR "demo/example/**"

void data_handler(const z_sample_t *sample, void *arg) {
z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
printf(" >> [Subscriber handler] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len,
sample->payload.start);
z_keyexpr_t keyexpr = z_sample_keyexpr(sample);
z_bytes_t payload = z_sample_payload(sample);
z_owned_str_t keystr = z_keyexpr_to_string(keyexpr);
printf(" >> [Subscriber handler] Received ('%s': '%.*s')\n", z_loan(keystr), (int)payload.len, payload.start);
z_drop(z_move(keystr));
}

Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/api/handlers.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

// -- Samples handler
void _z_owned_sample_move(z_owned_sample_t *dst, z_owned_sample_t *src);
z_owned_sample_t *_z_sample_to_owned_ptr(const _z_sample_t *src);
z_owned_sample_t *_z_sample_to_owned_ptr(const z_sample_t *src);

// -- Queries handler
void _z_owned_query_move(z_owned_query_t *dst, z_owned_query_t *src);
Expand Down
Loading
Loading