From 382e3abf72f3694ca7976b81ff316b76c3c8eecf Mon Sep 17 00:00:00 2001 From: Jean-Roland Gosse Date: Wed, 15 May 2024 16:46:55 +0200 Subject: [PATCH] Subscriber callbacks now receive a refcounted z_sample (#410) * refactor: move code to net/sample and net/reply files * feat: make attachment a permanent sub function arg * feat: make z_sample type an rc and add z_loaned_sample type * feat: add z_sample accessors functions * fix: update tests with new z_sample * fix: update examples with new z_sample * fix: undefined function call * fix: example updates with new z_sample * fix: bad dummy subscription function args * fix: update tests with new z_sample * fix: update remaining examples with new z_sample * fix: missing brace initializer --- examples/arduino/z_get.ino | 2 +- examples/arduino/z_sub.ino | 6 +- examples/espidf/z_get.c | 2 +- examples/espidf/z_sub.c | 7 +- examples/freertos_plus_tcp/z_get.c | 2 +- examples/freertos_plus_tcp/z_sub.c | 7 +- examples/freertos_plus_tcp/z_sub_st.c | 7 +- examples/mbed/z_get.cpp | 2 +- examples/mbed/z_sub.cpp | 8 +- examples/unix/c11/z_get.c | 2 +- examples/unix/c11/z_get_channel.c | 2 +- examples/unix/c11/z_pong.c | 3 +- examples/unix/c11/z_pull.c | 8 +- examples/unix/c11/z_sub.c | 12 ++- examples/unix/c11/z_sub_channel.c | 8 +- examples/unix/c11/z_sub_st.c | 7 +- examples/unix/c99/z_get.c | 2 +- examples/unix/c99/z_pong.c | 3 +- examples/unix/c99/z_sub.c | 7 +- examples/unix/c99/z_sub_st.c | 7 +- examples/windows/z_get.c | 2 +- examples/windows/z_pong.c | 3 +- examples/windows/z_sub.c | 7 +- examples/windows/z_sub_st.c | 7 +- examples/zephyr/z_get.c | 2 +- examples/zephyr/z_sub.c | 7 +- include/zenoh-pico/api/handlers.h | 2 +- include/zenoh-pico/api/primitives.h | 89 +++++++++++++++- include/zenoh-pico/api/types.h | 7 +- include/zenoh-pico/net/memory.h | 32 ------ include/zenoh-pico/net/reply.h | 73 +++++++++++++ include/zenoh-pico/net/sample.h | 64 +++++++++++ include/zenoh-pico/net/zenoh-pico.h | 2 +- include/zenoh-pico/protocol/core.h | 26 +---- include/zenoh-pico/session/session.h | 58 ++-------- include/zenoh-pico/session/subscription.h | 14 +-- src/api/api.c | 57 +++++++--- src/api/handlers.c | 11 +- src/net/memory.c | 80 +------------- src/net/primitives.c | 2 +- src/net/reply.c | 79 ++++++++++++++ src/net/sample.c | 124 ++++++++++++++++++++++ src/net/session.c | 2 +- src/session/push.c | 9 +- src/session/query.c | 42 +------- src/session/rx.c | 17 +-- src/session/subscription.c | 43 ++------ tests/z_api_alignment_test.c | 7 +- tests/z_channels_test.c | 44 ++++---- tests/z_client_test.c | 8 +- tests/z_peer_multicast_test.c | 6 +- tests/z_perf_rx.c | 7 +- tests/z_test_fragment_rx.c | 10 +- 53 files changed, 647 insertions(+), 400 deletions(-) delete mode 100644 include/zenoh-pico/net/memory.h create mode 100644 include/zenoh-pico/net/reply.h create mode 100644 include/zenoh-pico/net/sample.h create mode 100644 src/net/reply.c create mode 100644 src/net/sample.c diff --git a/examples/arduino/z_get.ino b/examples/arduino/z_get.ino index ddd607641..754d4a1fa 100644 --- a/examples/arduino/z_get.ino +++ b/examples/arduino/z_get.ino @@ -45,7 +45,7 @@ void reply_dropper(void *ctx) { void reply_handler(z_owned_reply_t *oreply, void *ctx) { (void)(ctx); if (z_reply_is_ok(oreply)) { - z_sample_t sample = z_reply_ok(oreply); + z_loaned_sample_t sample = z_reply_ok(oreply); z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); std::string val((const char *)sample.payload.start, sample.payload.len); diff --git a/examples/arduino/z_sub.ino b/examples/arduino/z_sub.ino index 9abc75df0..3d49f5fca 100644 --- a/examples/arduino/z_sub.ino +++ b/examples/arduino/z_sub.ino @@ -35,8 +35,10 @@ #define KEYEXPR "demo/example/**" void data_handler(const z_sample_t *sample, void *arg) { - z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); - std::string val((const char *)sample->payload.start, sample->payload.len); + z_keyexpr_t keyexpr = z_sample_keyexpr(sample); + z_bytes_t payload = z_sample_payload(sample); + z_owned_str_t keystr = z_keyexpr_to_string(keyexpr); + std::string val((const char *)payload.start, payload.len); Serial.print(" >> [Subscription listener] Received ("); Serial.print(z_str_loan(&keystr)); diff --git a/examples/espidf/z_get.c b/examples/espidf/z_get.c index a8e06549f..9c984012f 100644 --- a/examples/espidf/z_get.c +++ b/examples/espidf/z_get.c @@ -105,7 +105,7 @@ void reply_dropper(void *ctx) { printf(" >> Received query final notification\n" void reply_handler(z_owned_reply_t *oreply, void *ctx) { if (z_reply_is_ok(oreply)) { - z_sample_t sample = z_reply_ok(oreply); + z_loaned_sample_t sample = z_reply_ok(oreply); z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); printf(" >> Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, sample.payload.start); z_drop(z_move(keystr)); diff --git a/examples/espidf/z_sub.c b/examples/espidf/z_sub.c index 467160440..4863cb853 100644 --- a/examples/espidf/z_sub.c +++ b/examples/espidf/z_sub.c @@ -101,9 +101,10 @@ void wifi_init_sta(void) { } void data_handler(const z_sample_t* sample, void* arg) { - z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); - printf(" >> [Subscriber handler] Received ('%s': '%.*s')\n", z_str_loan(&keystr), (int)sample->payload.len, - sample->payload.start); + z_keyexpr_t keyexpr = z_sample_keyexpr(sample); + z_bytes_t payload = z_sample_payload(sample); + z_owned_str_t keystr = z_keyexpr_to_string(keyexpr); + printf(" >> [Subscriber handler] Received ('%s': '%.*s')\n", z_str_loan(&keystr), (int)payload.len, payload.start); z_str_drop(z_str_move(&keystr)); } diff --git a/examples/freertos_plus_tcp/z_get.c b/examples/freertos_plus_tcp/z_get.c index 09c1d3507..851ffb70a 100644 --- a/examples/freertos_plus_tcp/z_get.c +++ b/examples/freertos_plus_tcp/z_get.c @@ -39,7 +39,7 @@ void reply_dropper(void *ctx) { void reply_handler(z_owned_reply_t *reply, void *ctx) { (void)(ctx); if (z_reply_is_ok(reply)) { - z_sample_t sample = z_reply_ok(reply); + z_loaned_sample_t sample = z_reply_ok(reply); z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); printf(">> Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, sample.payload.start); z_drop(z_move(keystr)); diff --git a/examples/freertos_plus_tcp/z_sub.c b/examples/freertos_plus_tcp/z_sub.c index 9331619a6..1e3f312ce 100644 --- a/examples/freertos_plus_tcp/z_sub.c +++ b/examples/freertos_plus_tcp/z_sub.c @@ -30,9 +30,10 @@ void data_handler(const z_sample_t *sample, void *ctx) { (void)(ctx); - z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); - printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len, - sample->payload.start); + z_keyexpr_t keyexpr = z_sample_keyexpr(sample); + z_bytes_t payload = z_sample_payload(sample); + z_owned_str_t keystr = z_keyexpr_to_string(keyexpr); + printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)payload.len, payload.start); z_drop(z_move(keystr)); } diff --git a/examples/freertos_plus_tcp/z_sub_st.c b/examples/freertos_plus_tcp/z_sub_st.c index 20ff5d636..9c9e94707 100644 --- a/examples/freertos_plus_tcp/z_sub_st.c +++ b/examples/freertos_plus_tcp/z_sub_st.c @@ -33,9 +33,10 @@ int msg_nb = 0; void data_handler(const z_sample_t *sample, void *ctx) { (void)(ctx); - z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); - printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len, - sample->payload.start); + z_keyexpr_t keyexpr = z_sample_keyexpr(sample); + z_bytes_t payload = z_sample_payload(sample); + z_owned_str_t keystr = z_keyexpr_to_string(keyexpr); + printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)payload.len, payload.start); z_drop(z_move(keystr)); msg_nb++; } diff --git a/examples/mbed/z_get.cpp b/examples/mbed/z_get.cpp index 905ac4e1c..1784b6d4b 100644 --- a/examples/mbed/z_get.cpp +++ b/examples/mbed/z_get.cpp @@ -35,7 +35,7 @@ void reply_dropper(void *ctx) { printf(" >> Received query final notification\n" void reply_handler(z_owned_reply_t *oreply, void *ctx) { if (z_reply_is_ok(oreply)) { - z_sample_t sample = z_reply_ok(oreply); + z_loaned_sample_t sample = z_reply_ok(oreply); z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); printf(" >> Received ('%s': '%.*s')\n", z_str_loan(&keystr), (int)sample.payload.len, sample.payload.start); z_str_drop(z_str_move(&keystr)); diff --git a/examples/mbed/z_sub.cpp b/examples/mbed/z_sub.cpp index b60f82d44..45d9a1b6d 100644 --- a/examples/mbed/z_sub.cpp +++ b/examples/mbed/z_sub.cpp @@ -31,9 +31,11 @@ #define KEYEXPR "demo/example/**" void data_handler(const z_sample_t *sample, void *arg) { - z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); - printf(" >> [Subscriber handler] Received ('%s': '%.*s')\n", z_str_loan(&keystr), (int)sample->payload.len, - sample->payload.start); + z_keyexpr_t keyexpr = z_sample_keyexpr(sample); + z_bytes_t payload = z_sample_payload(sample); + z_owned_str_t keystr = z_keyexpr_to_string(keyexpr); + printf(" >> [Subscriber handler] Received ('%s': '%.*s')\n", z_str_loan(&keystr), (int)payload.len, + payload.start); z_str_drop(z_str_move(&keystr)); } diff --git a/examples/unix/c11/z_get.c b/examples/unix/c11/z_get.c index d6c4f8ead..7e76e51c7 100644 --- a/examples/unix/c11/z_get.c +++ b/examples/unix/c11/z_get.c @@ -40,7 +40,7 @@ int8_t attachment_handler(z_bytes_t key, z_bytes_t att_value, void *ctx) { void reply_handler(z_owned_reply_t *reply, void *ctx) { (void)(ctx); if (z_reply_is_ok(reply)) { - z_sample_t sample = z_reply_ok(reply); + z_loaned_sample_t sample = z_reply_ok(reply); z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); printf(">> Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, sample.payload.start); #if Z_FEATURE_ATTACHMENT == 1 diff --git a/examples/unix/c11/z_get_channel.c b/examples/unix/c11/z_get_channel.c index d40ca4379..d79d53488 100644 --- a/examples/unix/c11/z_get_channel.c +++ b/examples/unix/c11/z_get_channel.c @@ -100,7 +100,7 @@ int main(int argc, char **argv) { z_owned_reply_t reply = z_reply_null(); for (z_call(channel.recv, &reply); z_check(reply); z_call(channel.recv, &reply)) { if (z_reply_is_ok(&reply)) { - z_sample_t sample = z_reply_ok(&reply); + z_loaned_sample_t sample = z_reply_ok(&reply); z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); printf(">> Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, sample.payload.start); z_drop(z_move(keystr)); diff --git a/examples/unix/c11/z_pong.c b/examples/unix/c11/z_pong.c index dc958e1bb..b990e21c2 100644 --- a/examples/unix/c11/z_pong.c +++ b/examples/unix/c11/z_pong.c @@ -18,7 +18,8 @@ #if Z_FEATURE_SUBSCRIPTION == 1 && Z_FEATURE_PUBLICATION == 1 void callback(const z_sample_t* sample, void* context) { z_publisher_t pub = z_loan(*(z_owned_publisher_t*)context); - z_publisher_put(pub, sample->payload.start, sample->payload.len, NULL); + z_bytes_t payload = z_sample_payload(sample); + z_publisher_put(pub, payload.start, payload.len, NULL); } void drop(void* context) { z_owned_publisher_t* pub = (z_owned_publisher_t*)context; diff --git a/examples/unix/c11/z_pull.c b/examples/unix/c11/z_pull.c index 6bc0c824d..d64efe68d 100644 --- a/examples/unix/c11/z_pull.c +++ b/examples/unix/c11/z_pull.c @@ -83,9 +83,11 @@ int main(int argc, char **argv) { z_owned_sample_t sample = z_sample_null(); while (true) { for (z_call(channel.try_recv, &sample); z_check(sample); z_call(channel.try_recv, &sample)) { - z_owned_str_t keystr = z_keyexpr_to_string(z_loan(sample).keyexpr); - printf(">> [Subscriber] Pulled ('%s': '%.*s')\n", z_loan(keystr), (int)z_loan(sample).payload.len, - z_loan(sample).payload.start); + z_sample_t loaned_sample = z_loan(sample); + z_keyexpr_t sample_key = z_sample_keyexpr(&loaned_sample); + z_bytes_t payload = z_sample_payload(&loaned_sample); + z_owned_str_t keystr = z_keyexpr_to_string(sample_key); + printf(">> [Subscriber] Pulled ('%s': '%.*s')\n", z_loan(keystr), (int)payload.len, payload.start); z_drop(z_move(keystr)); z_drop(z_move(sample)); } diff --git a/examples/unix/c11/z_sub.c b/examples/unix/c11/z_sub.c index 185e3df63..aaff86582 100644 --- a/examples/unix/c11/z_sub.c +++ b/examples/unix/c11/z_sub.c @@ -34,13 +34,15 @@ int8_t attachment_handler(z_bytes_t key, z_bytes_t value, void *ctx) { void data_handler(const z_sample_t *sample, void *ctx) { (void)(ctx); - z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); - printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len, - sample->payload.start); + z_keyexpr_t keyexpr = z_sample_keyexpr(sample); + z_bytes_t payload = z_sample_payload(sample); + z_owned_str_t keystr = z_keyexpr_to_string(keyexpr); + printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)payload.len, payload.start); #if Z_FEATURE_ATTACHMENT == 1 - if (z_attachment_check(&sample->attachment)) { + z_attachment_t attachment = z_sample_attachment(sample); + if (z_attachment_check(&attachment)) { printf("Attachement found\n"); - z_attachment_iterate(sample->attachment, attachment_handler, NULL); + z_attachment_iterate(attachment, attachment_handler, NULL); } #endif z_drop(z_move(keystr)); diff --git a/examples/unix/c11/z_sub_channel.c b/examples/unix/c11/z_sub_channel.c index 43548d1ba..eb2b2f3f9 100644 --- a/examples/unix/c11/z_sub_channel.c +++ b/examples/unix/c11/z_sub_channel.c @@ -73,9 +73,11 @@ int main(int argc, char **argv) { z_owned_sample_t sample = z_sample_null(); for (z_call(channel.recv, &sample); z_check(sample); z_call(channel.recv, &sample)) { - z_owned_str_t keystr = z_keyexpr_to_string(z_loan(sample).keyexpr); - printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)z_loan(sample).payload.len, - z_loan(sample).payload.start); + z_sample_t loaned_sample = z_loan(sample); + z_keyexpr_t sample_key = z_sample_keyexpr(&loaned_sample); + z_bytes_t payload = z_sample_payload(&loaned_sample); + z_owned_str_t keystr = z_keyexpr_to_string(sample_key); + printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)payload.len, payload.start); z_drop(z_move(keystr)); z_drop(z_move(sample)); sample = z_sample_null(); diff --git a/examples/unix/c11/z_sub_st.c b/examples/unix/c11/z_sub_st.c index 8e825a06e..b187f640d 100644 --- a/examples/unix/c11/z_sub_st.c +++ b/examples/unix/c11/z_sub_st.c @@ -25,9 +25,10 @@ static int msg_nb = 0; void data_handler(const z_sample_t *sample, void *ctx) { (void)(ctx); - z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); - printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len, - sample->payload.start); + z_keyexpr_t keyexpr = z_sample_keyexpr(sample); + z_bytes_t payload = z_sample_payload(sample); + z_owned_str_t keystr = z_keyexpr_to_string(keyexpr); + printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)payload.len, payload.start); z_drop(z_move(keystr)); msg_nb++; } diff --git a/examples/unix/c99/z_get.c b/examples/unix/c99/z_get.c index 7bec8e497..8731d3903 100644 --- a/examples/unix/c99/z_get.c +++ b/examples/unix/c99/z_get.c @@ -32,7 +32,7 @@ void reply_dropper(void *ctx) { void reply_handler(z_owned_reply_t *reply, void *ctx) { (void)(ctx); if (z_reply_is_ok(reply)) { - z_sample_t sample = z_reply_ok(reply); + z_loaned_sample_t sample = z_reply_ok(reply); z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); printf(">> Received ('%s': '%.*s')\n", z_str_loan(&keystr), (int)sample.payload.len, sample.payload.start); z_str_drop(z_str_move(&keystr)); diff --git a/examples/unix/c99/z_pong.c b/examples/unix/c99/z_pong.c index 73f93d9a1..3b627218b 100644 --- a/examples/unix/c99/z_pong.c +++ b/examples/unix/c99/z_pong.c @@ -19,7 +19,8 @@ #if Z_FEATURE_SUBSCRIPTION == 1 && Z_FEATURE_PUBLICATION == 1 void callback(const z_sample_t* sample, void* context) { z_publisher_t pub = z_publisher_loan((z_owned_publisher_t*)context); - z_publisher_put(pub, sample->payload.start, sample->payload.len, NULL); + z_bytes_t payload = z_sample_payload(sample); + z_publisher_put(pub, payload.start, payload.len, NULL); } void drop(void* context) { z_owned_publisher_t* pub = (z_owned_publisher_t*)context; diff --git a/examples/unix/c99/z_sub.c b/examples/unix/c99/z_sub.c index 185c742ed..b50a58ccc 100644 --- a/examples/unix/c99/z_sub.c +++ b/examples/unix/c99/z_sub.c @@ -22,9 +22,10 @@ #if Z_FEATURE_SUBSCRIPTION == 1 void data_handler(const z_sample_t *sample, void *arg) { (void)(arg); - z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); - printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_str_loan(&keystr), (int)sample->payload.len, - sample->payload.start); + z_keyexpr_t keyexpr = z_sample_keyexpr(sample); + z_bytes_t payload = z_sample_payload(sample); + z_owned_str_t keystr = z_keyexpr_to_string(keyexpr); + printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_str_loan(&keystr), (int)payload.len, payload.start); z_str_drop(z_str_move(&keystr)); } diff --git a/examples/unix/c99/z_sub_st.c b/examples/unix/c99/z_sub_st.c index 765bf3d5a..d09033c38 100644 --- a/examples/unix/c99/z_sub_st.c +++ b/examples/unix/c99/z_sub_st.c @@ -25,9 +25,10 @@ static int msg_nb = 0; void data_handler(const z_sample_t *sample, void *arg) { (void)(arg); - z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); - printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_str_loan(&keystr), (int)sample->payload.len, - sample->payload.start); + z_keyexpr_t keyexpr = z_sample_keyexpr(sample); + z_bytes_t payload = z_sample_payload(sample); + z_owned_str_t keystr = z_keyexpr_to_string(keyexpr); + printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_str_loan(&keystr), (int)payload.len, payload.start); z_str_drop(z_str_move(&keystr)); msg_nb++; } diff --git a/examples/windows/z_get.c b/examples/windows/z_get.c index 597287afd..35cadd7be 100644 --- a/examples/windows/z_get.c +++ b/examples/windows/z_get.c @@ -31,7 +31,7 @@ void reply_dropper(void *ctx) { void reply_handler(z_owned_reply_t *reply, void *ctx) { (void)(ctx); if (z_reply_is_ok(reply)) { - z_sample_t sample = z_reply_ok(reply); + z_loaned_sample_t sample = z_reply_ok(reply); z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); printf(">> Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, sample.payload.start); z_drop(z_move(keystr)); diff --git a/examples/windows/z_pong.c b/examples/windows/z_pong.c index dc958e1bb..b990e21c2 100644 --- a/examples/windows/z_pong.c +++ b/examples/windows/z_pong.c @@ -18,7 +18,8 @@ #if Z_FEATURE_SUBSCRIPTION == 1 && Z_FEATURE_PUBLICATION == 1 void callback(const z_sample_t* sample, void* context) { z_publisher_t pub = z_loan(*(z_owned_publisher_t*)context); - z_publisher_put(pub, sample->payload.start, sample->payload.len, NULL); + z_bytes_t payload = z_sample_payload(sample); + z_publisher_put(pub, payload.start, payload.len, NULL); } void drop(void* context) { z_owned_publisher_t* pub = (z_owned_publisher_t*)context; diff --git a/examples/windows/z_sub.c b/examples/windows/z_sub.c index bde475e8b..3c1e154db 100644 --- a/examples/windows/z_sub.c +++ b/examples/windows/z_sub.c @@ -21,9 +21,10 @@ #if Z_FEATURE_SUBSCRIPTION == 1 void data_handler(const z_sample_t *sample, void *ctx) { (void)(ctx); - z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); - printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len, - sample->payload.start); + z_keyexpr_t keyexpr = z_sample_keyexpr(sample); + z_bytes_t payload = z_sample_payload(sample); + z_owned_str_t keystr = z_keyexpr_to_string(keyexpr); + printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)payload.len, payload.start); z_drop(z_move(keystr)); } diff --git a/examples/windows/z_sub_st.c b/examples/windows/z_sub_st.c index fa91d9e41..55ee53e09 100644 --- a/examples/windows/z_sub_st.c +++ b/examples/windows/z_sub_st.c @@ -24,9 +24,10 @@ int msg_nb = 0; #if Z_FEATURE_SUBSCRIPTION == 1 void data_handler(const z_sample_t *sample, void *ctx) { (void)(ctx); - z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); - printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len, - sample->payload.start); + z_keyexpr_t keyexpr = z_sample_keyexpr(sample); + z_bytes_t payload = z_sample_payload(sample); + z_owned_str_t keystr = z_keyexpr_to_string(keyexpr); + printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)payload.len, payload.start); z_drop(z_move(keystr)); msg_nb++; diff --git a/examples/zephyr/z_get.c b/examples/zephyr/z_get.c index ffe90520a..9e37ffdc2 100644 --- a/examples/zephyr/z_get.c +++ b/examples/zephyr/z_get.c @@ -35,7 +35,7 @@ void reply_dropper(void *ctx) { printf(" >> Received query final notification\n" void reply_handler(z_owned_reply_t *oreply, void *ctx) { if (z_reply_is_ok(oreply)) { - z_sample_t sample = z_reply_ok(oreply); + z_loaned_sample_t sample = z_reply_ok(oreply); z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); printf(" >> Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, sample.payload.start); z_drop(z_move(keystr)); diff --git a/examples/zephyr/z_sub.c b/examples/zephyr/z_sub.c index ce0a0e73d..c2e47869f 100644 --- a/examples/zephyr/z_sub.c +++ b/examples/zephyr/z_sub.c @@ -30,9 +30,10 @@ #define KEYEXPR "demo/example/**" void data_handler(const z_sample_t *sample, void *arg) { - z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); - printf(" >> [Subscriber handler] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len, - sample->payload.start); + z_keyexpr_t keyexpr = z_sample_keyexpr(sample); + z_bytes_t payload = z_sample_payload(sample); + z_owned_str_t keystr = z_keyexpr_to_string(keyexpr); + printf(" >> [Subscriber handler] Received ('%s': '%.*s')\n", z_loan(keystr), (int)payload.len, payload.start); z_drop(z_move(keystr)); } diff --git a/include/zenoh-pico/api/handlers.h b/include/zenoh-pico/api/handlers.h index 7d6d142fa..43bd1db21 100644 --- a/include/zenoh-pico/api/handlers.h +++ b/include/zenoh-pico/api/handlers.h @@ -25,7 +25,7 @@ // -- Samples handler void _z_owned_sample_move(z_owned_sample_t *dst, z_owned_sample_t *src); -z_owned_sample_t *_z_sample_to_owned_ptr(const _z_sample_t *src); +z_owned_sample_t *_z_sample_to_owned_ptr(const z_sample_t *src); // -- Queries handler void _z_owned_query_move(z_owned_query_t *dst, z_owned_query_t *src); diff --git a/include/zenoh-pico/api/primitives.h b/include/zenoh-pico/api/primitives.h index 51ee18c2d..60a676947 100644 --- a/include/zenoh-pico/api/primitives.h +++ b/include/zenoh-pico/api/primitives.h @@ -864,6 +864,91 @@ int8_t z_info_routers_zid(const z_session_t zs, z_owned_closure_zid_t *callback) */ z_id_t z_info_zid(const z_session_t zs); +/** + * Get a sample's keyexpr value by aliasing it. + * Note: This API has been marked as unstable: it works as advertised, but we may change it in a future release. + * + * Parameters: + * sample: Pointer to the sample to get the keyexpr from. + * + * Returns: + * Returns the keyexpr wrapped as a :c:type:`z_keyexpr_t`. + */ +z_keyexpr_t z_sample_keyexpr(const z_sample_t *sample); + +/** + * Get a sample's payload value by aliasing it. + * Note: This API has been marked as unstable: it works as advertised, but we may change it in a future release. + * + * Parameters: + * sample: Pointer to the sample to get the payload from. + * + * Returns: + * Returns the payload wrapped as a :c:type:`z_bytes_t`. + */ +z_bytes_t z_sample_payload(const z_sample_t *sample); + +/** + * Get a sample's timestamp value by aliasing it. + * Note: This API has been marked as unstable: it works as advertised, but we may change it in a future release. + * + * Parameters: + * sample: Pointer to the sample to get the timestamp from. + * + * Returns: + * Returns the timestamp wrapped as a :c:type:`z_timestamp_t`. + */ +z_timestamp_t z_sample_timestamp(const z_sample_t *sample); + +/** + * Get a sample's encoding value by aliasing it. + * Note: This API has been marked as unstable: it works as advertised, but we may change it in a future release. + * + * Parameters: + * sample: Pointer to the sample to get the encoding from. + * + * Returns: + * Returns the encoding wrapped as a :c:type:`z_encoding_t`. + */ +z_encoding_t z_sample_encoding(const z_sample_t *sample); + +/** + * Get a sample's kind by aliasing it. + * Note: This API has been marked as unstable: it works as advertised, but we may change it in a future release. + * + * Parameters: + * sample: Pointer to the sample to get the sample kind from. + * + * Returns: + * Returns the sample kind wrapped as a :c:type:`z_sample_kind_t`. + */ +z_sample_kind_t z_sample_kind(const z_sample_t *sample); + +/** + * Get a sample's qos value by aliasing it. + * Note: This API has been marked as unstable: it works as advertised, but we may change it in a future release. + * + * Parameters: + * sample: Pointer to the sample to get the qos from. + * + * Returns: + * Returns the qos wrapped as a :c:type:`z_qos_t`. + */ +z_qos_t z_sample_qos(const z_sample_t *sample); + +#if Z_FEATURE_ATTACHMENT == 1 +/** + * Get a sample's attachment value by aliasing it. + * Note: This API has been marked as unstable: it works as advertised, but we may change it in a future release. + * + * Parameters: + * sample: Pointer to the sample to get the attachment from. + * + * Returns: + * Returns the attachment wrapped as a :c:type:`z_attachment_t`. + */ +z_attachment_t z_sample_attachment(const z_sample_t *sample); +#endif #if Z_FEATURE_PUBLICATION == 1 /** * Constructs the default values for the put operation. @@ -1051,9 +1136,9 @@ _Bool z_reply_is_ok(const z_owned_reply_t *reply); * reply: Pointer to the received query reply. * * Returns: - * Returns the :c:type:`z_sample_t` wrapped in the query reply. + * Returns the :c:type:`z_loaned_sample_t` wrapped in the query reply. */ -z_sample_t z_reply_ok(const z_owned_reply_t *reply); +z_loaned_sample_t z_reply_ok(const z_owned_reply_t *reply); /** * Yields the contents of the reply by asserting it indicates a failure. diff --git a/include/zenoh-pico/api/types.h b/include/zenoh-pico/api/types.h index dd58ea809..bad8daefa 100644 --- a/include/zenoh-pico/api/types.h +++ b/include/zenoh-pico/api/types.h @@ -20,6 +20,8 @@ #include "zenoh-pico/collections/list.h" #include "zenoh-pico/net/publish.h" #include "zenoh-pico/net/query.h" +#include "zenoh-pico/net/reply.h" +#include "zenoh-pico/net/sample.h" #include "zenoh-pico/net/session.h" #include "zenoh-pico/net/subscribe.h" #include "zenoh-pico/protocol/core.h" @@ -426,8 +428,11 @@ static inline z_qos_t z_qos_default(void) { return _Z_N_QOS_DEFAULT; } * z_timestamp_t timestamp: The timestamp of this data sample. * z_qos_t qos: Quality of service settings used to deliver this sample. */ -typedef _z_sample_t z_sample_t; +typedef struct z_sample_t { + _z_sample_rc_t _rc; +} z_sample_t; _OWNED_TYPE_PTR(z_sample_t, sample) +typedef _z_sample_t z_loaned_sample_t; /** * Represents the content of a `hello` message returned by a zenoh entity as a reply to a `scout` message. diff --git a/include/zenoh-pico/net/memory.h b/include/zenoh-pico/net/memory.h deleted file mode 100644 index cf578faf3..000000000 --- a/include/zenoh-pico/net/memory.h +++ /dev/null @@ -1,32 +0,0 @@ -// -// Copyright (c) 2022 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, - -#ifndef ZENOH_PICO_MEMORY_NETAPI_H -#define ZENOH_PICO_MEMORY_NETAPI_H - -#include "zenoh-pico/protocol/core.h" -#include "zenoh-pico/session/session.h" - -/** - * Free a :c:type:`_z_sample_t`, including its internal fields. - * - * Parameters: - * sample: The :c:type:`_z_sample_t` to free. - */ -void _z_sample_move(_z_sample_t *dst, _z_sample_t *src); -void _z_sample_clear(_z_sample_t *sample); -void _z_sample_free(_z_sample_t **sample); -void _z_sample_copy(_z_sample_t *dst, const _z_sample_t *src); -_z_sample_t _z_sample_duplicate(const _z_sample_t *src); - -#endif /* ZENOH_PICO_MEMORY_NETAPI_H */ diff --git a/include/zenoh-pico/net/reply.h b/include/zenoh-pico/net/reply.h new file mode 100644 index 000000000..2f6425775 --- /dev/null +++ b/include/zenoh-pico/net/reply.h @@ -0,0 +1,73 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#ifndef ZENOH_PICO_REPLY_NETAPI_H +#define ZENOH_PICO_REPLY_NETAPI_H + +#include "zenoh-pico/collections/element.h" +#include "zenoh-pico/collections/list.h" +#include "zenoh-pico/collections/refcount.h" +#include "zenoh-pico/collections/string.h" +#include "zenoh-pico/net/sample.h" +#include "zenoh-pico/protocol/core.h" +#include "zenoh-pico/session/session.h" + +/** + * An reply to a :c:func:`z_query`. + * + * Members: + * _z_sample_t data: a :c:type:`_z_sample_t` containing the key and value of the reply. + * _z_bytes_t replier_id: The id of the replier that sent this reply. + * + */ +typedef struct _z_reply_data_t { + _z_sample_t sample; + _z_id_t replier_id; +} _z_reply_data_t; + +void _z_reply_data_clear(_z_reply_data_t *rd); +void _z_reply_data_copy(_z_reply_data_t *dst, _z_reply_data_t *src); +_z_reply_t *_z_reply_alloc_and_move(_z_reply_t *_reply); + +_Z_ELEM_DEFINE(_z_reply_data, _z_reply_data_t, _z_noop_size, _z_reply_data_clear, _z_noop_copy) +_Z_LIST_DEFINE(_z_reply_data, _z_reply_data_t) + +/** + * An reply to a :c:func:`z_query`. + * + * Members: + * _z_reply_t_Tag tag: Indicates if the reply contains data or if it's a FINAL reply. + * _z_reply_data_t data: The reply data if :c:member:`_z_reply_t.tag` equals + * :c:member:`_z_reply_t_Tag.Z_REPLY_TAG_DATA`. + * + */ +typedef struct _z_reply_t { + _z_reply_data_t data; + z_reply_tag_t _tag; +} _z_reply_t; +void _z_reply_clear(_z_reply_t *src); +void _z_reply_free(_z_reply_t **hello); +void _z_reply_copy(_z_reply_t *dst, _z_reply_t *src); + +typedef struct _z_pending_reply_t { + _z_reply_t _reply; + _z_timestamp_t _tstamp; +} _z_pending_reply_t; + +_Bool _z_pending_reply_eq(const _z_pending_reply_t *one, const _z_pending_reply_t *two); +void _z_pending_reply_clear(_z_pending_reply_t *res); + +_Z_ELEM_DEFINE(_z_pending_reply, _z_pending_reply_t, _z_noop_size, _z_pending_reply_clear, _z_noop_copy) +_Z_LIST_DEFINE(_z_pending_reply, _z_pending_reply_t) + +#endif /* ZENOH_PICO_REPLY_NETAPI_H */ diff --git a/include/zenoh-pico/net/sample.h b/include/zenoh-pico/net/sample.h new file mode 100644 index 000000000..1d0a50677 --- /dev/null +++ b/include/zenoh-pico/net/sample.h @@ -0,0 +1,64 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#ifndef ZENOH_PICO_SAMPLE_NETAPI_H +#define ZENOH_PICO_SAMPLE_NETAPI_H + +#include "zenoh-pico/protocol/core.h" +#include "zenoh-pico/session/session.h" + +/** + * A zenoh-net data sample. + * + * A sample is the value associated to a given resource at a given point in time. + * + * Members: + * _z_keyexpr_t key: The resource key of this data sample. + * _z_bytes_t value: The value of this data sample. + * _z_encoding_t encoding: The encoding for the value of this data sample. + */ +typedef struct _z_sample_t { + _z_keyexpr_t keyexpr; + _z_bytes_t payload; + _z_timestamp_t timestamp; + _z_encoding_t encoding; + z_sample_kind_t kind; + _z_qos_t qos; +#if Z_FEATURE_ATTACHMENT == 1 + z_attachment_t attachment; +#endif +} _z_sample_t; +void _z_sample_clear(_z_sample_t *sample); + +_Z_REFCOUNT_DEFINE(_z_sample, _z_sample) + +_z_sample_t _z_sample_null(void); +_Bool _z_sample_check(const _z_sample_t *sample); +void _z_sample_move(_z_sample_t *dst, _z_sample_t *src); + +/** + * Free a :c:type:`_z_sample_t`, including its internal fields. + * + * Parameters: + * sample: The :c:type:`_z_sample_t` to free. + */ +void _z_sample_free(_z_sample_t **sample); + +void _z_sample_copy(_z_sample_t *dst, const _z_sample_t *src); +_z_sample_t _z_sample_duplicate(const _z_sample_t *src); + +_z_sample_t _z_sample_create(const _z_keyexpr_t *key, const _z_bytes_t *payload, _z_timestamp_t timestamp, + const _z_encoding_t encoding, const z_sample_kind_t kind, const _z_qos_t qos, + const z_attachment_t att); + +#endif /* ZENOH_PICO_SAMPLE_NETAPI_H */ diff --git a/include/zenoh-pico/net/zenoh-pico.h b/include/zenoh-pico/net/zenoh-pico.h index b90fcf05a..ee4614ce5 100644 --- a/include/zenoh-pico/net/zenoh-pico.h +++ b/include/zenoh-pico/net/zenoh-pico.h @@ -16,10 +16,10 @@ #include "zenoh-pico/net/config.h" #include "zenoh-pico/net/logger.h" -#include "zenoh-pico/net/memory.h" #include "zenoh-pico/net/primitives.h" #include "zenoh-pico/net/publish.h" #include "zenoh-pico/net/query.h" +#include "zenoh-pico/net/sample.h" #include "zenoh-pico/net/session.h" #include "zenoh-pico/net/subscribe.h" diff --git a/include/zenoh-pico/protocol/core.h b/include/zenoh-pico/protocol/core.h index 133c30acc..4d9453fc2 100644 --- a/include/zenoh-pico/protocol/core.h +++ b/include/zenoh-pico/protocol/core.h @@ -22,6 +22,7 @@ #include "zenoh-pico/api/constants.h" #include "zenoh-pico/collections/bytes.h" #include "zenoh-pico/collections/element.h" +#include "zenoh-pico/collections/refcount.h" #include "zenoh-pico/collections/string.h" #include "zenoh-pico/config.h" #include "zenoh-pico/system/platform.h" @@ -225,31 +226,6 @@ typedef struct { uint8_t _val; } _z_qos_t; -/** - * A zenoh-net data sample. - * - * A sample is the value associated to a given resource at a given point in time. - * - * Members: - * _z_keyexpr_t key: The resource key of this data sample. - * _z_bytes_t value: The value of this data sample. - * _z_encoding_t encoding: The encoding for the value of this data sample. - */ -typedef struct { - _z_keyexpr_t keyexpr; - _z_bytes_t payload; - _z_timestamp_t timestamp; - _z_encoding_t encoding; - z_sample_kind_t kind; - _z_qos_t qos; -#if Z_FEATURE_ATTACHMENT == 1 - z_attachment_t attachment; -#endif -} _z_sample_t; -static inline bool _z_sample_check(const _z_sample_t *sample) { - return _z_keyexpr_check(sample->keyexpr) && _z_bytes_check(sample->payload); -} - /** * Represents a Zenoh value. * diff --git a/include/zenoh-pico/session/session.h b/include/zenoh-pico/session/session.h index 94f26759b..34ff93139 100644 --- a/include/zenoh-pico/session/session.h +++ b/include/zenoh-pico/session/session.h @@ -34,42 +34,6 @@ typedef void (*_z_drop_handler_t)(void *arg); #define _Z_RESOURCE_IS_REMOTE 0 #define _Z_RESOURCE_IS_LOCAL 1 -/** - * An reply to a :c:func:`z_query`. - * - * Members: - * _z_sample_t data: a :c:type:`_z_sample_t` containing the key and value of the reply. - * _z_bytes_t replier_id: The id of the replier that sent this reply. - * - */ -typedef struct { - _z_sample_t sample; - _z_id_t replier_id; -} _z_reply_data_t; - -void _z_reply_data_clear(_z_reply_data_t *rd); -void _z_reply_data_copy(_z_reply_data_t *dst, _z_reply_data_t *src); - -_Z_ELEM_DEFINE(_z_reply_data, _z_reply_data_t, _z_noop_size, _z_reply_data_clear, _z_noop_copy) -_Z_LIST_DEFINE(_z_reply_data, _z_reply_data_t) - -/** - * An reply to a :c:func:`z_query`. - * - * Members: - * _z_reply_t_Tag tag: Indicates if the reply contains data or if it's a FINAL reply. - * _z_reply_data_t data: The reply data if :c:member:`_z_reply_t.tag` equals - * :c:member:`_z_reply_t_Tag.Z_REPLY_TAG_DATA`. - * - */ -typedef struct { - _z_reply_data_t data; - z_reply_tag_t _tag; -} _z_reply_t; -void _z_reply_clear(_z_reply_t *src); -void _z_reply_free(_z_reply_t **hello); -void _z_reply_copy(_z_reply_t *dst, _z_reply_t *src); - typedef struct { _z_keyexpr_t _key; uint16_t _id; @@ -84,10 +48,13 @@ void _z_resource_free(_z_resource_t **res); _Z_ELEM_DEFINE(_z_resource, _z_resource_t, _z_noop_size, _z_resource_clear, _z_resource_copy) _Z_LIST_DEFINE(_z_resource, _z_resource_t) +// Forward declaration to avoid cyclical include +typedef struct z_sample_t z_sample_t; + /** * The callback signature of the functions handling data messages. */ -typedef void (*_z_data_handler_t)(const _z_sample_t *sample, void *arg); +typedef void (*_z_data_handler_t)(const z_sample_t *sample, void *arg); typedef struct { _z_keyexpr_t _key; @@ -137,18 +104,11 @@ _Z_ELEM_DEFINE(_z_session_queryable_rc, _z_session_queryable_rc_t, _z_noop_size, _z_noop_copy) _Z_LIST_DEFINE(_z_session_queryable_rc, _z_session_queryable_rc_t) -typedef struct { - _z_reply_t _reply; - _z_timestamp_t _tstamp; -} _z_pending_reply_t; - -_Bool _z_pending_reply_eq(const _z_pending_reply_t *one, const _z_pending_reply_t *two); -void _z_pending_reply_clear(_z_pending_reply_t *res); - -_Z_ELEM_DEFINE(_z_pending_reply, _z_pending_reply_t, _z_noop_size, _z_pending_reply_clear, _z_noop_copy) -_Z_LIST_DEFINE(_z_pending_reply, _z_pending_reply_t) - -struct __z_reply_handler_wrapper_t; // Forward declaration to be used in _z_reply_handler_t +// Forward declaration to avoid cyclical includes +typedef struct _z_reply_t _z_reply_t; +typedef _z_list_t _z_reply_data_list_t; +typedef _z_list_t _z_pending_reply_list_t; +struct __z_reply_handler_wrapper_t; /** * The callback signature of the functions handling query replies. */ diff --git a/include/zenoh-pico/session/subscription.h b/include/zenoh-pico/session/subscription.h index 3532a0c68..56ec11733 100644 --- a/include/zenoh-pico/session/subscription.h +++ b/include/zenoh-pico/session/subscription.h @@ -19,12 +19,7 @@ /*------------------ Subscription ------------------*/ void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *payload, - _z_zint_t payload_len, _z_n_qos_t qos -#if Z_FEATURE_ATTACHMENT == 1 - , - z_attachment_t att -#endif -); + _z_zint_t payload_len, const _z_n_qos_t qos, const z_attachment_t att); #if Z_FEATURE_SUBSCRIPTION == 1 _z_subscription_rc_t *_z_get_subscription_by_id(_z_session_t *zn, uint8_t is_local, const _z_zint_t id); @@ -33,12 +28,7 @@ _z_subscription_rc_list_t *_z_get_subscriptions_by_key(_z_session_t *zn, uint8_t _z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_t *sub); int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload, const _z_encoding_t encoding, const _z_zint_t kind, const _z_timestamp_t timestamp, - const _z_n_qos_t qos -#if Z_FEATURE_ATTACHMENT == 1 - , - z_attachment_t att -#endif -); + const _z_n_qos_t qos, const z_attachment_t att); void _z_unregister_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_rc_t *sub); void _z_flush_subscriptions(_z_session_t *zn); #endif diff --git a/src/api/api.c b/src/api/api.c index fa80589ec..d08d6c579 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -25,8 +25,8 @@ #include "zenoh-pico/net/config.h" #include "zenoh-pico/net/filtering.h" #include "zenoh-pico/net/logger.h" -#include "zenoh-pico/net/memory.h" #include "zenoh-pico/net/primitives.h" +#include "zenoh-pico/net/sample.h" #include "zenoh-pico/net/session.h" #include "zenoh-pico/protocol/core.h" #include "zenoh-pico/protocol/keyexpr.h" @@ -445,7 +445,26 @@ OWNED_FUNCTIONS_PTR_DROP(z_scouting_config_t, z_owned_scouting_config_t, scoutin OWNED_FUNCTIONS_PTR_INTERNAL(z_keyexpr_t, z_owned_keyexpr_t, keyexpr, _z_keyexpr_free, _z_keyexpr_copy) OWNED_FUNCTIONS_PTR_INTERNAL(z_hello_t, z_owned_hello_t, hello, _z_hello_free, _z_owner_noop_copy) OWNED_FUNCTIONS_PTR_INTERNAL(z_str_array_t, z_owned_str_array_t, str_array, _z_str_array_free, _z_owner_noop_copy) -OWNED_FUNCTIONS_PTR_INTERNAL(z_sample_t, z_owned_sample_t, sample, _z_sample_free, _z_sample_copy) + +// Owned sample functions +_Bool z_sample_check(const z_owned_sample_t *val) { return val->_value != ((void *)0); } +z_sample_t z_sample_loan(const z_owned_sample_t *val) { return *val->_value; } +z_owned_sample_t z_sample_null(void) { return (z_owned_sample_t){._value = ((void *)0)}; } +z_owned_sample_t *z_sample_move(z_owned_sample_t *val) { return val; } +z_owned_sample_t z_sample_clone(z_owned_sample_t *val) { + z_owned_sample_t ret; + ret._value = (z_sample_t *)z_malloc(sizeof(z_sample_t)); + if (ret._value != ((void *)0)) { + ret._value->_rc = _z_sample_rc_clone(&val->_value->_rc); + } + return ret; +} +void z_sample_drop(z_owned_sample_t *val) { + if (val->_value != ((void *)0)) { + _z_sample_rc_drop(&val->_value->_rc); + z_free(val->_value); + } +} _Bool z_session_check(const z_owned_session_t *val) { return val->_value.in != NULL; } z_session_t z_session_loan(const z_owned_session_t *val) { return (z_session_t){._val = val->_value}; } @@ -644,6 +663,16 @@ int8_t z_info_routers_zid(const z_session_t zs, z_owned_closure_zid_t *callback) z_id_t z_info_zid(const z_session_t zs) { return zs._val.in->val._local_zid; } +z_keyexpr_t z_sample_keyexpr(const z_sample_t *sample) { return sample->_rc.in->val.keyexpr; } +z_bytes_t z_sample_payload(const z_sample_t *sample) { return sample->_rc.in->val.payload; } +z_timestamp_t z_sample_timestamp(const z_sample_t *sample) { return sample->_rc.in->val.timestamp; } +z_encoding_t z_sample_encoding(const z_sample_t *sample) { return sample->_rc.in->val.encoding; } +z_sample_kind_t z_sample_kind(const z_sample_t *sample) { return sample->_rc.in->val.kind; } +z_qos_t z_sample_qos(const z_sample_t *sample) { return sample->_rc.in->val.qos; } +#if Z_FEATURE_ATTACHMENT == 1 +z_attachment_t z_sample_attachment(const z_sample_t *sample) { return sample->_rc.in->val.attachment; } +#endif + #if Z_FEATURE_PUBLICATION == 1 OWNED_FUNCTIONS_PTR_COMMON(z_publisher_t, z_owned_publisher_t, publisher) OWNED_FUNCTIONS_PTR_CLONE(z_publisher_t, z_owned_publisher_t, publisher, _z_owner_noop_copy) @@ -685,12 +714,15 @@ int8_t z_put(z_session_t zs, z_keyexpr_t keyexpr, const uint8_t *payload, z_zint ); // Trigger local subscriptions - _z_trigger_local_subscriptions(&zs._val.in->val, keyexpr, payload, payload_len, - _z_n_qos_make(0, opt.congestion_control == Z_CONGESTION_CONTROL_BLOCK, opt.priority) #if Z_FEATURE_ATTACHMENT == 1 - , - opt.attachment + z_attachment_t att = opt.attachment; +#else + z_attachment_t att = z_attachment_null(); #endif + _z_trigger_local_subscriptions(&zs._val.in->val, keyexpr, payload, payload_len, + _z_n_qos_make(0, opt.congestion_control == Z_CONGESTION_CONTROL_BLOCK, opt.priority), + att + ); return ret; @@ -800,13 +832,14 @@ int8_t z_publisher_put(const z_publisher_t pub, const uint8_t *payload, size_t l #endif ); } - // Trigger local subscriptions - _z_trigger_local_subscriptions(&pub._val->_zn.in->val, pub._val->_key, payload, len, _Z_N_QOS_DEFAULT #if Z_FEATURE_ATTACHMENT == 1 - , - opt.attachment + z_attachment_t att = opt.attachment; +#else + z_attachment_t att = z_attachment_null(); #endif - ); + + // Trigger local subscriptions + _z_trigger_local_subscriptions(&pub._val->_zn.in->val, pub._val->_key, payload, len, _Z_N_QOS_DEFAULT, att); return ret; } @@ -908,7 +941,7 @@ _Bool z_reply_is_ok(const z_owned_reply_t *reply) { return true; } -z_sample_t z_reply_ok(const z_owned_reply_t *reply) { return reply->_value->data.sample; } +z_loaned_sample_t z_reply_ok(const z_owned_reply_t *reply) { return reply->_value->data.sample; } z_value_t z_reply_err(const z_owned_reply_t *reply) { (void)(reply); diff --git a/src/api/handlers.c b/src/api/handlers.c index 0e9bea4bb..14b3f6960 100644 --- a/src/api/handlers.c +++ b/src/api/handlers.c @@ -14,7 +14,7 @@ #include "zenoh-pico/api/handlers.h" -#include "zenoh-pico/net/memory.h" +#include "zenoh-pico/net/sample.h" #include "zenoh-pico/system/platform.h" // -- Sample @@ -23,14 +23,17 @@ void _z_owned_sample_move(z_owned_sample_t *dst, z_owned_sample_t *src) { zp_free(src); } -z_owned_sample_t *_z_sample_to_owned_ptr(const _z_sample_t *src) { +z_owned_sample_t *_z_sample_to_owned_ptr(const z_sample_t *src) { z_owned_sample_t *dst = (z_owned_sample_t *)zp_malloc(sizeof(z_owned_sample_t)); if (dst == NULL) { return NULL; } if (src != NULL) { - dst->_value = (_z_sample_t *)zp_malloc(sizeof(_z_sample_t)); - _z_sample_copy(dst->_value, src); + dst->_value = (z_sample_t *)zp_malloc(sizeof(z_sample_t)); + if (dst->_value == NULL) { + return NULL; + } + _z_sample_rc_copy(&dst->_value->_rc, &src->_rc); } else { dst->_value = NULL; } diff --git a/src/net/memory.c b/src/net/memory.c index 553d47999..dcf095b27 100644 --- a/src/net/memory.c +++ b/src/net/memory.c @@ -11,68 +11,11 @@ // Contributors: // ZettaScale Zenoh Team, -#include "zenoh-pico/net/memory.h" - #include +#include "zenoh-pico/net/sample.h" #include "zenoh-pico/protocol/core.h" -void _z_sample_move(_z_sample_t *dst, _z_sample_t *src) { - dst->keyexpr._id = src->keyexpr._id; // FIXME: call the z_keyexpr_move - dst->keyexpr._suffix = src->keyexpr._suffix; // FIXME: call the z_keyexpr_move - src->keyexpr._suffix = NULL; // FIXME: call the z_keyexpr_move - - _z_bytes_move(&dst->payload, &src->payload); - - dst->encoding.id = src->encoding.id; // FIXME: call the z_encoding_move - _z_bytes_move(&dst->encoding.schema, &src->encoding.schema); // FIXME: call the z_encoding_move - - dst->timestamp.time = src->timestamp.time; // FIXME: call the z_timestamp_move - dst->timestamp.id = src->timestamp.id; // FIXME: call the z_timestamp_move -} - -void _z_sample_clear(_z_sample_t *sample) { - _z_keyexpr_clear(&sample->keyexpr); - _z_bytes_clear(&sample->payload); - _z_bytes_clear(&sample->encoding.schema); // FIXME: call the z_encoding_clear - _z_timestamp_clear(&sample->timestamp); -#if Z_FEATURE_ATTACHMENT == 1 - _z_attachment_drop(&sample->attachment); -#endif -} - -void _z_sample_free(_z_sample_t **sample) { - _z_sample_t *ptr = *sample; - - if (ptr != NULL) { - _z_sample_clear(ptr); - - z_free(ptr); - *sample = NULL; - } -} - -void _z_sample_copy(_z_sample_t *dst, const _z_sample_t *src) { - dst->keyexpr = _z_keyexpr_duplicate(src->keyexpr); - dst->payload = _z_bytes_duplicate(&src->payload); - dst->timestamp = _z_timestamp_duplicate(&src->timestamp); - - // TODO(sashacmc): should be changed after encoding rework - dst->encoding.id = src->encoding.id; - _z_bytes_copy(&dst->encoding.schema, &src->encoding.schema); - - dst->kind = src->kind; -#if Z_FEATURE_ATTACHMENT == 1 - dst->attachment = src->attachment; -#endif -} - -_z_sample_t _z_sample_duplicate(const _z_sample_t *src) { - _z_sample_t dst; - _z_sample_copy(&dst, src); - return dst; -} - void _z_hello_clear(_z_hello_t *hello) { if (hello->locators.len > 0) { _z_str_array_clear(&hello->locators); @@ -90,27 +33,6 @@ void _z_hello_free(_z_hello_t **hello) { } } -void _z_reply_data_clear(_z_reply_data_t *reply_data) { - _z_sample_clear(&reply_data->sample); - reply_data->replier_id = _z_id_empty(); -} - -void _z_reply_data_free(_z_reply_data_t **reply_data) { - _z_reply_data_t *ptr = *reply_data; - - if (ptr != NULL) { - _z_reply_data_clear(ptr); - - z_free(ptr); - *reply_data = NULL; - } -} - -void _z_reply_data_copy(_z_reply_data_t *dst, _z_reply_data_t *src) { - _z_sample_copy(&dst->sample, &src->sample); - dst->replier_id = src->replier_id; -} - void _z_value_clear(_z_value_t *value) { _z_bytes_clear(&value->encoding.schema); _z_bytes_clear(&value->payload); diff --git a/src/net/primitives.c b/src/net/primitives.c index a07cfe0b8..c4cc5d34d 100644 --- a/src/net/primitives.c +++ b/src/net/primitives.c @@ -22,7 +22,7 @@ #include "zenoh-pico/config.h" #include "zenoh-pico/net/filtering.h" #include "zenoh-pico/net/logger.h" -#include "zenoh-pico/net/memory.h" +#include "zenoh-pico/net/sample.h" #include "zenoh-pico/protocol/core.h" #include "zenoh-pico/protocol/definitions/declarations.h" #include "zenoh-pico/protocol/definitions/network.h" diff --git a/src/net/reply.c b/src/net/reply.c new file mode 100644 index 000000000..809d8cb1c --- /dev/null +++ b/src/net/reply.c @@ -0,0 +1,79 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#include "zenoh-pico/net/reply.h" + +#include "zenoh-pico/session/utils.h" +#include "zenoh-pico/utils/logging.h" + +#if Z_FEATURE_QUERY == 1 +void _z_reply_data_clear(_z_reply_data_t *reply_data) { + _z_sample_clear(&reply_data->sample); + reply_data->replier_id = _z_id_empty(); +} + +void _z_reply_data_free(_z_reply_data_t **reply_data) { + _z_reply_data_t *ptr = *reply_data; + + if (ptr != NULL) { + _z_reply_data_clear(ptr); + + z_free(ptr); + *reply_data = NULL; + } +} + +void _z_reply_data_copy(_z_reply_data_t *dst, _z_reply_data_t *src) { + _z_sample_copy(&dst->sample, &src->sample); + dst->replier_id = src->replier_id; +} + +_z_reply_t *_z_reply_alloc_and_move(_z_reply_t *_reply) { + _z_reply_t *reply = (_z_reply_t *)z_malloc(sizeof(_z_reply_t)); + if (reply != NULL) { + *reply = *_reply; + (void)memset(_reply, 0, sizeof(_z_reply_t)); + } + return reply; +} + +void _z_reply_clear(_z_reply_t *reply) { _z_reply_data_clear(&reply->data); } + +void _z_reply_free(_z_reply_t **reply) { + _z_reply_t *ptr = *reply; + + if (*reply != NULL) { + _z_reply_clear(ptr); + + z_free(ptr); + *reply = NULL; + } +} + +void _z_reply_copy(_z_reply_t *dst, _z_reply_t *src) { + _z_reply_data_copy(&dst->data, &src->data); + dst->_tag = src->_tag; +} + +_Bool _z_pending_reply_eq(const _z_pending_reply_t *one, const _z_pending_reply_t *two) { + return one->_tstamp.time == two->_tstamp.time; +} + +void _z_pending_reply_clear(_z_pending_reply_t *pr) { + // Free reply + _z_reply_clear(&pr->_reply); + + // Free the timestamp + _z_timestamp_clear(&pr->_tstamp); +} +#endif diff --git a/src/net/sample.c b/src/net/sample.c new file mode 100644 index 000000000..781008f64 --- /dev/null +++ b/src/net/sample.c @@ -0,0 +1,124 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#include "zenoh-pico/net/sample.h" + +#include "zenoh-pico/session/utils.h" +#include "zenoh-pico/utils/logging.h" + +_z_sample_t _z_sample_null(void) { + _z_sample_t s = { + .keyexpr = _z_keyexpr_null(), + .payload = _z_bytes_empty(), + .encoding = {.id = Z_ENCODING_PREFIX_DEFAULT, + .schema = _z_bytes_wrap(NULL, (size_t)0)}, // FIXME: call _z_encoding_null + .timestamp = _z_timestamp_null(), + .kind = 0, + .qos = {0}, +#if Z_FEATURE_ATTACHMENT == 1 + .attachment = z_attachment_null(), +#endif + }; + return s; +} + +_Bool _z_sample_check(const _z_sample_t *sample) { + return _z_keyexpr_check(sample->keyexpr) && _z_bytes_check(sample->payload); +} + +void _z_sample_move(_z_sample_t *dst, _z_sample_t *src) { + dst->keyexpr._id = src->keyexpr._id; // FIXME: call the z_keyexpr_move + dst->keyexpr._suffix = src->keyexpr._suffix; // FIXME: call the z_keyexpr_move + src->keyexpr._suffix = NULL; // FIXME: call the z_keyexpr_move + + _z_bytes_move(&dst->payload, &src->payload); + + dst->encoding.id = src->encoding.id; // FIXME: call the z_encoding_move + _z_bytes_move(&dst->encoding.schema, &src->encoding.schema); // FIXME: call the z_encoding_move + + dst->timestamp.time = src->timestamp.time; // FIXME: call the z_timestamp_move + dst->timestamp.id = src->timestamp.id; // FIXME: call the z_timestamp_move +} + +void _z_sample_clear(_z_sample_t *sample) { + _z_keyexpr_clear(&sample->keyexpr); + _z_bytes_clear(&sample->payload); + _z_bytes_clear(&sample->encoding.schema); // FIXME: call the z_encoding_clear + _z_timestamp_clear(&sample->timestamp); +#if Z_FEATURE_ATTACHMENT == 1 + _z_attachment_drop(&sample->attachment); +#endif +} + +void _z_sample_free(_z_sample_t **sample) { + _z_sample_t *ptr = *sample; + if (ptr != NULL) { + _z_sample_clear(ptr); + z_free(ptr); + *sample = NULL; + } +} + +void _z_sample_copy(_z_sample_t *dst, const _z_sample_t *src) { + dst->keyexpr = _z_keyexpr_duplicate(src->keyexpr); + dst->payload = _z_bytes_duplicate(&src->payload); + dst->timestamp = _z_timestamp_duplicate(&src->timestamp); + + // TODO(sashacmc): should be changed after encoding rework + dst->encoding.id = src->encoding.id; + _z_bytes_copy(&dst->encoding.schema, &src->encoding.schema); + + dst->kind = src->kind; +#if Z_FEATURE_ATTACHMENT == 1 + dst->attachment = src->attachment; +#endif +} + +_z_sample_t _z_sample_duplicate(const _z_sample_t *src) { + _z_sample_t dst; + _z_sample_copy(&dst, src); + return dst; +} + +#if Z_FEATURE_SUBSCRIPTION == 1 +_z_sample_t _z_sample_create(const _z_keyexpr_t *key, const _z_bytes_t *payload, const _z_timestamp_t timestamp, + const _z_encoding_t encoding, const z_sample_kind_t kind, const _z_qos_t qos, + const z_attachment_t att) { + _z_sample_t s = _z_sample_null(); + _z_keyexpr_copy(&s.keyexpr, key); + _z_bytes_copy(&s.payload, payload); + s.encoding = encoding; // FIXME: call z_encoding_move or copy + s.kind = kind; + s.timestamp = timestamp; + s.qos = qos; +#if Z_FEATURE_ATTACHMENT == 1 + s.attachment = att; // FIXME: call z_attachment_move or copy +#else + _ZP_UNUSED(att); +#endif + return s; +} +#else +_z_sample_t _z_sample_create(const _z_keyexpr_t *key, const _z_bytes_t *payload, const _z_timestamp_t timestamp, + const _z_encoding_t encoding, const z_sample_kind_t kind, const _z_qos_t qos, + const z_attachment_t att) { + _ZP_UNUSED(key); + _ZP_UNUSED(payload); + _ZP_UNUSED(timestamp); + _ZP_UNUSED(encoding); + _ZP_UNUSED(kind); + _ZP_UNUSED(qos); + _ZP_UNUSED(att); + return _z_sample_null(); +} +#endif diff --git a/src/net/session.c b/src/net/session.c index fd3d5997d..3b2af1097 100644 --- a/src/net/session.c +++ b/src/net/session.c @@ -21,7 +21,7 @@ #include "zenoh-pico/collections/bytes.h" #include "zenoh-pico/collections/string.h" #include "zenoh-pico/config.h" -#include "zenoh-pico/net/memory.h" +#include "zenoh-pico/net/sample.h" #include "zenoh-pico/protocol/core.h" #include "zenoh-pico/session/utils.h" #include "zenoh-pico/transport/common/lease.h" diff --git a/src/session/push.c b/src/session/push.c index 0fa85cb4c..9b676668a 100644 --- a/src/session/push.c +++ b/src/session/push.c @@ -31,13 +31,10 @@ int8_t _z_trigger_push(_z_session_t *zn, _z_n_msg_push_t *push) { #if Z_FEATURE_SUBSCRIPTION == 1 #if Z_FEATURE_ATTACHMENT == 1 z_attachment_t att = _z_encoded_as_attachment(&push->_body._body._put._attachment); +#else + z_attachment_t att = z_attachment_null(); #endif - ret = _z_trigger_subscriptions(zn, push->_key, payload, encoding, kind, push->_timestamp, push->_qos -#if Z_FEATURE_ATTACHMENT == 1 - , - att -#endif - ); + ret = _z_trigger_subscriptions(zn, push->_key, payload, encoding, kind, push->_timestamp, push->_qos, att); #endif return ret; } diff --git a/src/session/query.c b/src/session/query.c index fcfa7cc9f..a84cf7290 100644 --- a/src/session/query.c +++ b/src/session/query.c @@ -17,52 +17,14 @@ #include #include "zenoh-pico/config.h" -#include "zenoh-pico/net/memory.h" +#include "zenoh-pico/net/reply.h" +#include "zenoh-pico/net/sample.h" #include "zenoh-pico/protocol/keyexpr.h" #include "zenoh-pico/session/resource.h" #include "zenoh-pico/session/utils.h" #include "zenoh-pico/utils/logging.h" #if Z_FEATURE_QUERY == 1 -_z_reply_t *_z_reply_alloc_and_move(_z_reply_t *_reply) { - _z_reply_t *reply = (_z_reply_t *)z_malloc(sizeof(_z_reply_t)); - if (reply != NULL) { - *reply = *_reply; - (void)memset(_reply, 0, sizeof(_z_reply_t)); - } - return reply; -} - -void _z_reply_clear(_z_reply_t *reply) { _z_reply_data_clear(&reply->data); } - -void _z_reply_free(_z_reply_t **reply) { - _z_reply_t *ptr = *reply; - - if (*reply != NULL) { - _z_reply_clear(ptr); - - z_free(ptr); - *reply = NULL; - } -} - -void _z_reply_copy(_z_reply_t *dst, _z_reply_t *src) { - _z_reply_data_copy(&dst->data, &src->data); - dst->_tag = src->_tag; -} - -_Bool _z_pending_reply_eq(const _z_pending_reply_t *one, const _z_pending_reply_t *two) { - return one->_tstamp.time == two->_tstamp.time; -} - -void _z_pending_reply_clear(_z_pending_reply_t *pr) { - // Free reply - _z_reply_clear(&pr->_reply); - - // Free the timestamp - _z_timestamp_clear(&pr->_tstamp); -} - void _z_pending_query_clear(_z_pending_query_t *pen_qry) { if (pen_qry->_dropper != NULL) { pen_qry->_dropper(pen_qry->_drop_arg); diff --git a/src/session/rx.c b/src/session/rx.c index b9e126bd9..ba303c0f5 100644 --- a/src/session/rx.c +++ b/src/session/rx.c @@ -102,14 +102,11 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint #if Z_FEATURE_SUBSCRIPTION == 1 #if Z_FEATURE_ATTACHMENT == 1 z_attachment_t att = _z_encoded_as_attachment(&put._attachment); +#else + z_attachment_t att = z_attachment_null(); #endif ret = _z_trigger_subscriptions(zn, req._key, put._payload, put._encoding, Z_SAMPLE_KIND_PUT, - put._commons._timestamp, req._ext_qos -#if Z_FEATURE_ATTACHMENT == 1 - , - att -#endif - ); + put._commons._timestamp, req._ext_qos, att); #endif if (ret == _Z_RES_OK) { _z_network_message_t final = _z_n_msg_make_response_final(req._rid); @@ -120,12 +117,8 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint _z_msg_del_t del = req._body._del; #if Z_FEATURE_SUBSCRIPTION == 1 ret = _z_trigger_subscriptions(zn, req._key, _z_bytes_empty(), z_encoding_default(), - Z_SAMPLE_KIND_DELETE, del._commons._timestamp, req._ext_qos -#if Z_FEATURE_ATTACHMENT == 1 - , - z_attachment_null() -#endif - ); + Z_SAMPLE_KIND_DELETE, del._commons._timestamp, req._ext_qos, + z_attachment_null()); #endif if (ret == _Z_RES_OK) { _z_network_message_t final = _z_n_msg_make_response_final(req._rid); diff --git a/src/session/subscription.c b/src/session/subscription.c index 28670ba29..3d1ec99fe 100644 --- a/src/session/subscription.c +++ b/src/session/subscription.c @@ -17,7 +17,9 @@ #include #include +#include "zenoh-pico/api/types.h" #include "zenoh-pico/config.h" +#include "zenoh-pico/net/sample.h" #include "zenoh-pico/protocol/core.h" #include "zenoh-pico/protocol/definitions/network.h" #include "zenoh-pico/protocol/keyexpr.h" @@ -137,31 +139,16 @@ _z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, uint8_t is_loca } void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *payload, - _z_zint_t payload_len, _z_n_qos_t qos -#if Z_FEATURE_ATTACHMENT == 1 - , - z_attachment_t att -#endif -) { + _z_zint_t payload_len, const _z_n_qos_t qos, const z_attachment_t att) { _z_encoding_t encoding = {.id = Z_ENCODING_PREFIX_DEFAULT, .schema = _z_bytes_wrap(NULL, 0)}; int8_t ret = _z_trigger_subscriptions(zn, keyexpr, _z_bytes_wrap(payload, payload_len), encoding, Z_SAMPLE_KIND_PUT, - _z_timestamp_null(), qos -#if Z_FEATURE_ATTACHMENT == 1 - , - att -#endif - ); + _z_timestamp_null(), qos, att); (void)ret; } int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload, const _z_encoding_t encoding, const _z_zint_t kind, const _z_timestamp_t timestamp, - const _z_n_qos_t qos -#if Z_FEATURE_ATTACHMENT == 1 - , - z_attachment_t att -#endif -) { + const _z_n_qos_t qos, const z_attachment_t att) { int8_t ret = _Z_RES_OK; _zp_session_lock_mutex(zn); @@ -175,26 +162,18 @@ int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, co _zp_session_unlock_mutex(zn); // Build the sample - _z_sample_t s; - s.keyexpr = key; - s.payload = payload; - s.encoding = encoding; - s.kind = kind; - s.timestamp = timestamp; - s.qos = qos; -#if Z_FEATURE_ATTACHMENT == 1 - s.attachment = att; -#endif + z_sample_t sample = {._rc = _z_sample_rc_new()}; + sample._rc.in->val = _z_sample_create(&key, &payload, timestamp, encoding, kind, qos, att); + // Parse subscription list _z_subscription_rc_list_t *xs = subs; _Z_DEBUG("Triggering %ju subs", (uintmax_t)_z_subscription_rc_list_len(xs)); while (xs != NULL) { _z_subscription_rc_t *sub = _z_subscription_rc_list_head(xs); - sub->in->val._callback(&s, sub->in->val._arg); + sub->in->val._callback(&sample, sub->in->val._arg); xs = _z_subscription_rc_list_tail(xs); } -#if Z_FEEATURE_ATTACHMENT == 1 - _z_attachment_drop(&s.attachment); -#endif + // Clean up + _z_sample_rc_drop(&sample._rc); _z_keyexpr_clear(&key); _z_subscription_rc_list_free(&subs); } else { diff --git a/tests/z_api_alignment_test.c b/tests/z_api_alignment_test.c index ac208fa68..9ac4b220c 100644 --- a/tests/z_api_alignment_test.c +++ b/tests/z_api_alignment_test.c @@ -88,7 +88,7 @@ void reply_handler(z_owned_reply_t *reply, void *arg) { replies++; if (z_reply_is_ok(reply)) { - z_sample_t sample = z_reply_ok(reply); + z_loaned_sample_t sample = z_reply_ok(reply); z_owned_str_t k_str = z_keyexpr_to_string(sample.keyexpr); #ifdef ZENOH_PICO @@ -111,10 +111,11 @@ void data_handler(const z_sample_t *sample, void *arg) { printf("%s\n", __func__); datas++; - z_owned_str_t k_str = z_keyexpr_to_string(sample->keyexpr); + z_keyexpr_t keyexpr = z_sample_keyexpr(sample); + z_owned_str_t k_str = z_keyexpr_to_string(keyexpr); #ifdef ZENOH_PICO if (z_check(k_str) == false) { - k_str = zp_keyexpr_resolve(*(z_session_t *)arg, sample->keyexpr); + k_str = zp_keyexpr_resolve(*(z_session_t *)arg, keyexpr); } #endif z_drop(z_move(k_str)); diff --git a/tests/z_channels_test.c b/tests/z_channels_test.c index e8307d7b7..1751f8675 100644 --- a/tests/z_channels_test.c +++ b/tests/z_channels_test.c @@ -17,32 +17,36 @@ #include #include "zenoh-pico/api/handlers.h" +#include "zenoh-pico/net/sample.h" #undef NDEBUG #include -#define SEND(channel, v) \ - do { \ - z_sample_t sample; \ - sample.payload.start = (const uint8_t *)v; \ - sample.payload.len = strlen(v); \ - sample.keyexpr = _z_rname("key"); \ - sample.timestamp = _z_timestamp_null(); \ - sample.encoding = z_encoding_default(); \ - z_call(channel.send, &sample); \ +#define SEND(channel, v) \ + do { \ + _z_sample_t s = {.keyexpr = _z_rname("key"), \ + .payload = {.start = (const uint8_t *)v, .len = strlen(v)}, \ + .timestamp = _z_timestamp_null(), \ + .encoding = z_encoding_default(), \ + .kind = 0, \ + .qos = {0}}; \ + z_sample_t sample = {._rc = _z_sample_rc_new_from_val(s)}; \ + z_call(channel.send, &sample); \ } while (0); -#define _RECV(channel, method, buf) \ - do { \ - z_owned_sample_t sample = z_sample_null(); \ - z_call(channel.method, &sample); \ - if (z_check(sample)) { \ - strncpy(buf, (const char *)z_loan(sample).payload.start, (size_t)z_loan(sample).payload.len); \ - buf[z_loan(sample).payload.len] = '\0'; \ - z_drop(z_move(sample)); \ - } else { \ - buf[0] = '\0'; \ - } \ +#define _RECV(channel, method, buf) \ + do { \ + z_owned_sample_t sample = z_sample_null(); \ + z_call(channel.method, &sample); \ + if (z_check(sample)) { \ + z_sample_t loaned_sample = z_loan(sample); \ + z_bytes_t payload = z_sample_payload(&loaned_sample); \ + strncpy(buf, (const char *)payload.start, (size_t)payload.len); \ + buf[payload.len] = '\0'; \ + z_drop(z_move(sample)); \ + } else { \ + buf[0] = '\0'; \ + } \ } while (0); #define RECV(channel, buf) _RECV(channel, recv, buf) diff --git a/tests/z_client_test.c b/tests/z_client_test.c index 023798062..3564ec198 100644 --- a/tests/z_client_test.c +++ b/tests/z_client_test.c @@ -71,7 +71,7 @@ void reply_handler(z_owned_reply_t *reply, void *arg) { char *res = (char *)malloc(64); snprintf(res, 64, "%s%u", uri, *(unsigned int *)arg); if (z_reply_is_ok(reply)) { - z_sample_t sample = z_reply_ok(reply); + z_loaned_sample_t sample = z_reply_ok(reply); printf(">> Received reply data: %s\t(%u/%u)\n", res, replies, total); z_owned_str_t k_str = z_keyexpr_to_string(sample.keyexpr); @@ -93,8 +93,10 @@ void data_handler(const z_sample_t *sample, void *arg) { snprintf(res, 64, "%s%u", uri, *(unsigned int *)arg); printf(">> Received data: %s\t(%u/%u)\n", res, datas, total); - z_owned_str_t k_str = z_keyexpr_to_string(sample->keyexpr); - assert((sample->payload.len == MSG_LEN) || (sample->payload.len == FRAGMENT_MSG_LEN)); + z_keyexpr_t keyexpr = z_sample_keyexpr(sample); + z_bytes_t payload = z_sample_payload(sample); + z_owned_str_t k_str = z_keyexpr_to_string(keyexpr); + assert((payload.len == MSG_LEN) || (payload.len == FRAGMENT_MSG_LEN)); assert(_z_str_eq(z_loan(k_str), res) == true); datas++; diff --git a/tests/z_peer_multicast_test.c b/tests/z_peer_multicast_test.c index cd8b28f05..5965d5ab1 100644 --- a/tests/z_peer_multicast_test.c +++ b/tests/z_peer_multicast_test.c @@ -46,8 +46,10 @@ void data_handler(const z_sample_t *sample, void *arg) { snprintf(res, 64, "%s%u", uri, *(unsigned int *)arg); printf(">> Received data: %s\t(%u/%u)\n", res, datas, total); - z_owned_str_t k_str = z_keyexpr_to_string(sample->keyexpr); - assert(sample->payload.len == MSG_LEN); + z_keyexpr_t keyexpr = z_sample_keyexpr(sample); + z_bytes_t payload = z_sample_payload(sample); + z_owned_str_t k_str = z_keyexpr_to_string(keyexpr); + assert(payload.len == MSG_LEN); assert(strlen(z_loan(k_str)) == strlen(res)); assert(strncmp(res, z_loan(k_str), strlen(res)) == 0); (void)(sample); diff --git a/tests/z_perf_rx.c b/tests/z_perf_rx.c index 58d5959c1..b6ef2d291 100644 --- a/tests/z_perf_rx.c +++ b/tests/z_perf_rx.c @@ -41,13 +41,14 @@ void z_stats_stop(z_stats_t *stats) { void on_sample(const z_sample_t *sample, void *context) { z_stats_t *stats = (z_stats_t *)context; + z_bytes_t payload = z_sample_payload(sample); - if (stats->curr_len != sample->payload.len) { + if (stats->curr_len != payload.len) { // End previous measurement z_stats_stop(stats); // Check for end packet - stats->curr_len = (unsigned long)sample->payload.len; - if (sample->payload.len == 1) { + stats->curr_len = (unsigned long)payload.len; + if (payload.len == 1) { test_end = true; return; } diff --git a/tests/z_test_fragment_rx.c b/tests/z_test_fragment_rx.c index ae562a518..dac7f0883 100644 --- a/tests/z_test_fragment_rx.c +++ b/tests/z_test_fragment_rx.c @@ -20,16 +20,18 @@ #if Z_FEATURE_SUBSCRIPTION == 1 void data_handler(const z_sample_t *sample, void *ctx) { (void)(ctx); - z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); + z_keyexpr_t keyexpr = z_sample_keyexpr(sample); + z_owned_str_t keystr = z_keyexpr_to_string(keyexpr); bool is_valid = true; - const uint8_t *data = sample->payload.start; - for (size_t i = 0; i < sample->payload.len; i++) { + z_bytes_t payload = z_sample_payload(sample); + const uint8_t *data = payload.start; + for (size_t i = 0; i < payload.len; i++) { if (data[i] != (uint8_t)i) { is_valid = false; break; } } - printf("[rx]: Received packet on %s, len: %d, validity: %d\n", z_loan(keystr), (int)sample->payload.len, is_valid); + printf("[rx]: Received packet on %s, len: %d, validity: %d\n", z_loan(keystr), (int)payload.len, is_valid); z_drop(z_move(keystr)); }