diff --git a/Cargo.toml b/Cargo.toml index 2735c3f86..288cf6823 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,6 +50,7 @@ libc = "0.2.139" log = "0.4.17" rand = "0.8.5" spin = "0.9.5" +crossbeam-channel = "0.5" # shared-memory enabled for zenoh even if zenoh-c "shared-memory" feature is disabled. This is to make "std::mem::transmute" work for `ZSLice` zenoh = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = ["shared-memory", "unstable"], default-features = false } zenoh-protocol = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = ["shared-memory"] } diff --git a/Cargo.toml.in b/Cargo.toml.in index f434df3a1..818752a39 100644 --- a/Cargo.toml.in +++ b/Cargo.toml.in @@ -50,6 +50,7 @@ libc = "0.2.139" log = "0.4.17" rand = "0.8.5" spin = "0.9.5" +crossbeam-channel = "0.5" # shared-memory enabled for zenoh even if zenoh-c "shared-memory" feature is disabled. This is to make "std::mem::transmute" work for `ZSLice` zenoh = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = ["shared-memory", "unstable"], default-features = false } zenoh-protocol = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = ["shared-memory"] } diff --git a/examples/z_get.c b/examples/z_get.c index 272bef89a..a47d23def 100644 --- a/examples/z_get.c +++ b/examples/z_get.c @@ -54,7 +54,7 @@ int main(int argc, char **argv) { } printf("Sending Query '%s'...\n", expr); - z_owned_reply_channel_t channel = zc_reply_fifo_new(16); + z_owned_reply_fifo_channel_t channel = z_reply_fifo_channel_new(16); z_get_options_t opts = z_get_options_default(); if (value != NULL) { opts.value.payload = z_bytes_from_str(value); @@ -76,4 +76,4 @@ int main(int argc, char **argv) { z_drop(z_move(channel)); z_close(z_move(s)); return 0; -} \ No newline at end of file +} diff --git a/examples/z_get_liveliness.c b/examples/z_get_liveliness.c index c667cb03a..ba8893ab1 100644 --- a/examples/z_get_liveliness.c +++ b/examples/z_get_liveliness.c @@ -47,7 +47,7 @@ int main(int argc, char **argv) { } printf("Sending liveliness query '%s'...\n", expr); - z_owned_reply_channel_t channel = zc_reply_fifo_new(16); + z_owned_reply_fifo_channel_t channel = z_reply_fifo_channel_new(16); zc_liveliness_get(z_loan(s), keyexpr, z_move(channel.send), NULL); z_owned_reply_t reply = z_reply_null(); for (z_call(channel.recv, &reply); z_check(reply); z_call(channel.recv, &reply)) { diff --git a/examples/z_non_blocking_get.c b/examples/z_non_blocking_get.c index c0b02a274..43df578d5 100644 --- a/examples/z_non_blocking_get.c +++ b/examples/z_non_blocking_get.c @@ -47,26 +47,27 @@ int main(int argc, char **argv) { printf("Sending Query '%s'...\n", expr); z_get_options_t opts = z_get_options_default(); opts.target = Z_QUERY_TARGET_ALL; - z_owned_reply_channel_t channel = zc_reply_non_blocking_fifo_new(16); + z_owned_reply_fifo_channel_t channel = z_reply_fifo_channel_new(16); z_get(z_loan(s), keyexpr, "", z_move(channel.send), &opts); // here, the send is moved and will be dropped by zenoh when adequate z_owned_reply_t reply = z_reply_null(); - for (bool call_success = z_call(channel.recv, &reply); !call_success || z_check(reply); - call_success = z_call(channel.recv, &reply)) { - if (!call_success) { - continue; - } - if (z_reply_is_ok(&reply)) { - z_sample_t sample = z_reply_ok(&reply); - z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); - printf(">> Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, sample.payload.start); - z_drop(z_move(keystr)); - } else { - printf("Received an error\n"); + while (true) { + for (z_call(channel.try_recv, &reply); z_check(reply); z_call(channel.try_recv, &reply)) { + if (z_reply_is_ok(&reply)) { + z_sample_t sample = z_reply_ok(&reply); + z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); + printf(">> Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, sample.payload.start); + z_drop(z_move(keystr)); + } else { + printf(">> Received an error\n"); + } } + printf(">> Nothing to get... sleep for 5 s\n"); + z_sleep_s(5); } + z_drop(z_move(reply)); z_drop(z_move(channel)); z_close(z_move(s)); return 0; -} \ No newline at end of file +} diff --git a/examples/z_pull.c b/examples/z_pull.c index 6115b50ac..b17d069a9 100644 --- a/examples/z_pull.c +++ b/examples/z_pull.c @@ -64,30 +64,30 @@ int main(int argc, char **argv) { } printf("Pull functionality not implemented!\n"); - // @TODO: implement z_owned_sample_channel_t and z_sample_channel_ring_new - // printf("Declaring Subscriber on '%s'...\n", keyexpr); - // z_owned_sample_channel_t channel = z_sample_channel_ring_new(size); - // z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(keyexpr), z_move(channel.send), NULL); - // if (!z_check(sub)) { - // printf("Unable to declare subscriber.\n"); - // return -1; - // } + printf("Declaring Subscriber on '%s'...\n", keyexpr); + z_owned_sample_ring_channel_t channel = z_sample_channel_ring_new(size); + z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(keyexpr), z_move(channel.send), NULL); + if (!z_check(sub)) { + printf("Unable to declare subscriber.\n"); + return -1; + } - // printf("Pulling data every %zu ms... Ring size: %zd\n", interval, size); - // z_owned_sample_t sample = z_sample_null(); - // while (true) { - // for (z_call(channel.recv, &sample); z_check(sample); z_call(channel.recv, &sample)) { - // z_owned_str_t keystr = z_keyexpr_to_string(z_loan(sample.keyexpr)); - // printf(">> [Subscriber] Pulled ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, - // sample.payload.start); - // z_drop(z_move(keystr)); - // z_drop(z_move(sample)); - // } - // printf(">> [Subscriber] Nothing to pull... sleep for %zu ms\n", interval); - // z_sleep_ms(interval); - // } + printf("Pulling data every %zu ms... Ring size: %zd\n", interval, size); + z_owned_sample_t sample = z_sample_null(); + while (true) { + for (z_call(channel.recv, &sample); z_check(sample); z_call(channel.recv, &sample)) { + z_owned_str_t keystr = z_keyexpr_to_string(z_loan(sample.keyexpr)); + printf(">> [Subscriber] Pulled ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, + sample.payload.start); + z_drop(z_move(keystr)); + z_drop(z_move(sample)); + } + printf(">> [Subscriber] Nothing to pull... sleep for %zu ms\n", interval); + z_sleep_ms(interval); + } - // z_undeclare_subscriber(z_move(sub)); + z_undeclare_subscriber(z_move(sub)); + z_drop(z_move(channel)); z_close(z_move(s)); diff --git a/examples/z_queryable_with_channels.c b/examples/z_queryable_with_channels.c index 672ddd607..ad201915b 100644 --- a/examples/z_queryable_with_channels.c +++ b/examples/z_queryable_with_channels.c @@ -14,18 +14,13 @@ #include #include #include + #include "zenoh.h" const char *expr = "demo/example/zenoh-c-queryable"; const char *value = "Queryable from C!"; z_keyexpr_t keyexpr; -void query_handler(const z_query_t *query, void *context) { - z_owned_closure_owned_query_t *channel = (z_owned_closure_owned_query_t *)context; - z_owned_query_t oquery = z_query_clone(query); - z_call(*channel, &oquery); -} - int main(int argc, char **argv) { if (argc > 1) { expr = argv[1]; @@ -54,9 +49,8 @@ int main(int argc, char **argv) { } printf("Declaring Queryable on '%s'...\n", expr); - z_owned_query_channel_t channel = zc_query_fifo_new(16); - z_owned_closure_query_t callback = z_closure(query_handler, NULL, &channel.send); - z_owned_queryable_t qable = z_declare_queryable(z_loan(s), keyexpr, z_move(callback), NULL); + z_owned_query_fifo_channel_t channel = z_query_fifo_channel_new(16); + z_owned_queryable_t qable = z_declare_queryable(z_loan(s), keyexpr, z_move(channel.send), NULL); if (!z_check(qable)) { printf("Unable to create queryable.\n"); exit(-1); diff --git a/include/zenoh_commons.h b/include/zenoh_commons.h index 067170fd4..d2d75bb4d 100644 --- a/include/zenoh_commons.h +++ b/include/zenoh_commons.h @@ -315,6 +315,44 @@ typedef struct z_owned_closure_owned_query_t { void (*call)(struct z_owned_query_t*, void *context); void (*drop)(void*); } z_owned_closure_owned_query_t; +/** + * Owned variant of a z_sample_t. + * + * You may construct it by `z_sample_clone`-ing a loaned sample. + * When the last `z_owned_sample_t` corresponding to a sample is destroyed, or the callback that produced the sample cloned to build them returns, + * the sample will receive its termination signal. + */ +#if !defined(TARGET_ARCH_ARM) +typedef struct ALIGN(8) z_owned_sample_t { + uint64_t _0[17]; +} z_owned_sample_t; +#endif +#if defined(TARGET_ARCH_ARM) +typedef struct ALIGN(8) z_owned_sample_t { + uint64_t _0[15]; +} z_owned_sample_t; +#endif +/** + * A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks: + * + * Members: + * void *context: a pointer to an arbitrary state. + * void *call(const struct z_sample_t*, const void *context): the typical callback function. `context` will be passed as its last argument. + * void *drop(void*): allows the callback's state to be freed. + * + * Closures are not guaranteed not to be called concurrently. + * + * It is guaranteed that: + * + * - `call` will never be called once `drop` has started. + * - `drop` will only be called **once**, and **after every** `call` has ended. + * - The two previous guarantees imply that `call` and `drop` are never called concurrently. + */ +typedef struct z_owned_closure_owned_sample_t { + void *context; + void (*call)(struct z_owned_sample_t*, void *context); + void (*drop)(void*); +} z_owned_closure_owned_sample_t; /** * A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks: * @@ -741,31 +779,11 @@ typedef struct z_put_options_t { enum z_priority_t priority; struct z_attachment_t attachment; } z_put_options_t; -/** - * A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks: - * - `this` is a pointer to an arbitrary state. - * - `call` is the typical callback function. `this` will be passed as its last argument. - * - `drop` allows the callback's state to be freed. - * - * Closures are not guaranteed not to be called concurrently. - * - * We guarantee that: - * - `call` will never be called once `drop` has started. - * - `drop` will only be called ONCE, and AFTER EVERY `call` has ended. - * - The two previous guarantees imply that `call` and `drop` are never called concurrently. - */ -typedef struct z_owned_query_channel_closure_t { - void *context; - bool (*call)(struct z_owned_query_t*, void*); - void (*drop)(void*); -} z_owned_query_channel_closure_t; -/** - * A pair of closures - */ -typedef struct z_owned_query_channel_t { - struct z_owned_closure_owned_query_t send; - struct z_owned_query_channel_closure_t recv; -} z_owned_query_channel_t; +typedef struct z_owned_query_fifo_channel_t { + struct z_owned_closure_query_t send; + struct z_owned_closure_owned_query_t recv; + struct z_owned_closure_owned_query_t try_recv; +} z_owned_query_fifo_channel_t; /** * Represents the set of options that can be applied to a query reply, * sent via :c:func:`z_query_reply`. @@ -778,31 +796,31 @@ typedef struct z_query_reply_options_t { struct z_encoding_t encoding; struct z_attachment_t attachment; } z_query_reply_options_t; -/** - * A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks: - * - `this` is a pointer to an arbitrary state. - * - `call` is the typical callback function. `this` will be passed as its last argument. - * - `drop` allows the callback's state to be freed. - * - * Closures are not guaranteed not to be called concurrently. - * - * We guarantee that: - * - `call` will never be called once `drop` has started. - * - `drop` will only be called ONCE, and AFTER EVERY `call` has ended. - * - The two previous guarantees imply that `call` and `drop` are never called concurrently. - */ -typedef struct z_owned_reply_channel_closure_t { - void *context; - bool (*call)(struct z_owned_reply_t*, void*); - void (*drop)(void*); -} z_owned_reply_channel_closure_t; -/** - * A pair of closures, the `send` one accepting - */ -typedef struct z_owned_reply_channel_t { +typedef struct z_owned_query_ring_channel_t { + struct z_owned_closure_query_t send; + struct z_owned_closure_owned_query_t recv; + struct z_owned_closure_owned_query_t try_recv; +} z_owned_query_ring_channel_t; +typedef struct z_owned_reply_fifo_channel_t { + struct z_owned_closure_reply_t send; + struct z_owned_closure_reply_t recv; + struct z_owned_closure_reply_t try_recv; +} z_owned_reply_fifo_channel_t; +typedef struct z_owned_reply_ring_channel_t { struct z_owned_closure_reply_t send; - struct z_owned_reply_channel_closure_t recv; -} z_owned_reply_channel_t; + struct z_owned_closure_reply_t recv; + struct z_owned_closure_reply_t try_recv; +} z_owned_reply_ring_channel_t; +typedef struct z_owned_sample_fifo_channel_t { + struct z_owned_closure_sample_t send; + struct z_owned_closure_owned_sample_t recv; + struct z_owned_closure_owned_sample_t try_recv; +} z_owned_sample_fifo_channel_t; +typedef struct z_owned_sample_ring_channel_t { + struct z_owned_closure_sample_t send; + struct z_owned_closure_owned_sample_t recv; + struct z_owned_closure_owned_sample_t try_recv; +} z_owned_sample_ring_channel_t; typedef struct z_owned_scouting_config_t { struct z_owned_config_t _config; unsigned long zc_timeout_ms; @@ -1203,6 +1221,20 @@ ZENOHC_API struct z_owned_closure_owned_query_t z_closure_owned_query_null(void) * Calls the closure. Calling an uninitialized closure is a no-op. */ ZENOHC_API +void z_closure_owned_sample_call(const struct z_owned_closure_owned_sample_t *closure, + struct z_owned_sample_t *sample); +/** + * Drops the closure. Droping an uninitialized closure is a no-op. + */ +ZENOHC_API void z_closure_owned_sample_drop(struct z_owned_closure_owned_sample_t *closure); +/** + * Constructs a null safe-to-drop value of 'z_owned_closure_sample_t' type + */ +ZENOHC_API struct z_owned_closure_owned_sample_t z_closure_owned_sample_null(void); +/** + * Calls the closure. Calling an uninitialized closure is a no-op. + */ +ZENOHC_API void z_closure_query_call(const struct z_owned_closure_query_t *closure, const struct z_query_t *query); /** @@ -1773,25 +1805,6 @@ ZENOHC_API enum z_priority_t z_qos_get_priority(struct z_qos_t qos); * `z_check(return_value) == false` if there was no attachment to the query. */ ZENOHC_API struct z_attachment_t z_query_attachment(const struct z_query_t *query); -/** - * Calls the closure. Calling an uninitialized closure is a no-op. - */ -ZENOHC_API -bool z_query_channel_closure_call(const struct z_owned_query_channel_closure_t *closure, - struct z_owned_query_t *sample); -/** - * Drops the closure. Droping an uninitialized closure is a no-op. - */ -ZENOHC_API void z_query_channel_closure_drop(struct z_owned_query_channel_closure_t *closure); -/** - * Constructs a null safe-to-drop value of 'z_owned_query_channel_closure_t' type - */ -ZENOHC_API struct z_owned_query_channel_closure_t z_query_channel_closure_null(void); -ZENOHC_API void z_query_channel_drop(struct z_owned_query_channel_t *channel); -/** - * Constructs a null safe-to-drop value of 'z_owned_query_channel_t' type - */ -ZENOHC_API struct z_owned_query_channel_t z_query_channel_null(void); /** * Returns `false` if `this` is in a gravestone state, `true` otherwise. * @@ -1840,6 +1853,24 @@ ZENOHC_API struct z_query_consolidation_t z_query_consolidation_none(void); */ ZENOHC_API void z_query_drop(struct z_owned_query_t *this_); +ZENOHC_API void z_query_fifo_channel_drop(struct z_owned_query_fifo_channel_t *channel); +/** + * Creates a new blocking fifo channel, returned as a pair of closures. + * + * If `bound` is different from 0, that channel will be bound and apply back-pressure when full. + * + * The `send` end should be passed as callback to a `z_get` call. + * + * The `recv` end is a synchronous closure that will block until either a `z_owned_query_t` is available, + * which it will then return; or until the `send` closure is dropped and all replies have been consumed, + * at which point it will return an invalidated `z_owned_query_t`, and so will further calls. + */ +ZENOHC_API +struct z_owned_query_fifo_channel_t z_query_fifo_channel_new(size_t bound); +/** + * Constructs a null safe-to-drop value of 'z_owned_query_fifo_channel_t' type + */ +ZENOHC_API struct z_owned_query_fifo_channel_t z_query_fifo_channel_null(void); /** * Get a query's key by aliasing it. */ @@ -1885,6 +1916,24 @@ int8_t z_query_reply(const struct z_query_t *query, * Constructs the default value for :c:type:`z_query_reply_options_t`. */ ZENOHC_API struct z_query_reply_options_t z_query_reply_options_default(void); +ZENOHC_API void z_query_ring_channel_drop(struct z_owned_query_ring_channel_t *channel); +/** + * Creates a new blocking ring channel, returned as a pair of closures. + * + * If `bound` is different from 0, that channel will be bound and apply back-pressure when full. + * + * The `send` end should be passed as callback to a `z_get` call. + * + * The `recv` end is a synchronous closure that will block until either a `z_owned_query_t` is available, + * which it will then return; or until the `send` closure is dropped and all replies have been consumed, + * at which point it will return an invalidated `z_owned_query_t`, and so will further calls. + */ +ZENOHC_API +struct z_owned_query_ring_channel_t z_query_ring_channel_new(size_t bound); +/** + * Constructs a null safe-to-drop value of 'z_owned_query_ring_channel_t' type + */ +ZENOHC_API struct z_owned_query_ring_channel_t z_query_ring_channel_null(void); /** * Create a default :c:type:`z_query_target_t`. */ @@ -1913,25 +1962,6 @@ ZENOHC_API uint16_t z_random_u16(void); ZENOHC_API uint32_t z_random_u32(void); ZENOHC_API uint64_t z_random_u64(void); ZENOHC_API uint8_t z_random_u8(void); -/** - * Calls the closure. Calling an uninitialized closure is a no-op. - */ -ZENOHC_API -bool z_reply_channel_closure_call(const struct z_owned_reply_channel_closure_t *closure, - struct z_owned_reply_t *sample); -/** - * Drops the closure. Droping an uninitialized closure is a no-op. - */ -ZENOHC_API void z_reply_channel_closure_drop(struct z_owned_reply_channel_closure_t *closure); -/** - * Constructs a null safe-to-drop value of 'z_owned_reply_channel_closure_t' type - */ -ZENOHC_API struct z_owned_reply_channel_closure_t z_reply_channel_closure_null(void); -ZENOHC_API void z_reply_channel_drop(struct z_owned_reply_channel_t *channel); -/** - * Constructs a null safe-to-drop value of 'z_owned_reply_channel_t' type - */ -ZENOHC_API struct z_owned_reply_channel_t z_reply_channel_null(void); /** * Returns ``true`` if `reply_data` is valid. */ @@ -1947,6 +1977,24 @@ ZENOHC_API void z_reply_drop(struct z_owned_reply_t *reply_data); */ ZENOHC_API struct z_value_t z_reply_err(const struct z_owned_reply_t *reply); +ZENOHC_API void z_reply_fifo_channel_drop(struct z_owned_reply_fifo_channel_t *channel); +/** + * Creates a new blocking fifo channel, returned as a pair of closures. + * + * If `bound` is different from 0, that channel will be bound and apply back-pressure when full. + * + * The `send` end should be passed as callback to a `z_get` call. + * + * The `recv` end is a synchronous closure that will block until either a `z_owned_reply_t` is available, + * which it will then return; or until the `send` closure is dropped and all replies have been consumed, + * at which point it will return an invalidated `z_owned_reply_t`, and so will further calls. + */ +ZENOHC_API +struct z_owned_reply_fifo_channel_t z_reply_fifo_channel_new(size_t bound); +/** + * Constructs a null safe-to-drop value of 'z_owned_reply_fifo_channel_t' type + */ +ZENOHC_API struct z_owned_reply_fifo_channel_t z_reply_fifo_channel_null(void); /** * Returns ``true`` if the queryable answered with an OK, which allows this value to be treated as a sample. * @@ -1971,6 +2019,92 @@ ZENOHC_API struct z_owned_reply_t z_reply_null(void); */ ZENOHC_API struct z_sample_t z_reply_ok(const struct z_owned_reply_t *reply); +ZENOHC_API void z_reply_ring_channel_drop(struct z_owned_reply_ring_channel_t *channel); +/** + * Creates a new blocking ring channel, returned as a pair of closures. + * + * If `bound` is different from 0, that channel will be bound and apply back-pressure when full. + * + * The `send` end should be passed as callback to a `z_get` call. + * + * The `recv` end is a synchronous closure that will block until either a `z_owned_reply_t` is available, + * which it will then return; or until the `send` closure is dropped and all replies have been consumed, + * at which point it will return an invalidated `z_owned_reply_t`, and so will further calls. + */ +ZENOHC_API +struct z_owned_reply_ring_channel_t z_reply_ring_channel_new(size_t bound); +/** + * Constructs a null safe-to-drop value of 'z_owned_reply_ring_channel_t' type + */ +ZENOHC_API struct z_owned_reply_ring_channel_t z_reply_ring_channel_null(void); +/** + * Returns `false` if `this` is in a gravestone state, `true` otherwise. + * + * This function may not be called with the null pointer, but can be called with the gravestone value. + */ +ZENOHC_API +bool z_sample_check(const struct z_owned_sample_t *this_); +/** + * Clones the sample, allowing to keep it in an "open" state past the callback's return. + * + * This operation is infallible, but may return a gravestone value if `sample` itself was a gravestone value (which cannot be the case in a callback). + */ +ZENOHC_API +struct z_owned_sample_t z_sample_clone(const struct z_sample_t *sample); +/** + * Destroys the sample, setting `this` to its gravestone value to prevent double-frees. + * + * This function may not be called with the null pointer, but can be called with the gravestone value. + */ +ZENOHC_API +void z_sample_drop(struct z_owned_sample_t *this_); +ZENOHC_API void z_sample_fifo_channel_drop(struct z_owned_sample_fifo_channel_t *channel); +/** + * Creates a new blocking fifo channel, returned as a pair of closures. + * + * If `bound` is different from 0, that channel will be bound and apply back-pressure when full. + * + * The `send` end should be passed as callback to a `z_get` call. + * + * The `recv` end is a synchronous closure that will block until either a `z_owned_sample_t` is available, + * which it will then return; or until the `send` closure is dropped and all replies have been consumed, + * at which point it will return an invalidated `z_owned_sample_t`, and so will further calls. + */ +ZENOHC_API +struct z_owned_sample_fifo_channel_t z_sample_fifo_channel_new(size_t bound); +/** + * Constructs a null safe-to-drop value of 'z_owned_sample_fifo_channel_t' type + */ +ZENOHC_API struct z_owned_sample_fifo_channel_t z_sample_fifo_channel_null(void); +/** + * Aliases the sample. + * + * This function may not be called with the null pointer, but can be called with the gravestone value. + */ +ZENOHC_API +struct z_sample_t z_sample_loan(const struct z_owned_sample_t *this_); +/** + * The gravestone value of `z_owned_sample_t`. + */ +ZENOHC_API struct z_owned_sample_t z_sample_null(void); +ZENOHC_API void z_sample_ring_channel_drop(struct z_owned_sample_ring_channel_t *channel); +/** + * Creates a new blocking ring channel, returned as a pair of closures. + * + * If `bound` is different from 0, that channel will be bound and apply back-pressure when full. + * + * The `send` end should be passed as callback to a `z_get` call. + * + * The `recv` end is a synchronous closure that will block until either a `z_owned_sample_t` is available, + * which it will then return; or until the `send` closure is dropped and all replies have been consumed, + * at which point it will return an invalidated `z_owned_sample_t`, and so will further calls. + */ +ZENOHC_API +struct z_owned_sample_ring_channel_t z_sample_ring_channel_new(size_t bound); +/** + * Constructs a null safe-to-drop value of 'z_owned_sample_ring_channel_t' type + */ +ZENOHC_API struct z_owned_sample_ring_channel_t z_sample_ring_channel_null(void); /** * Scout for routers and/or peers. * @@ -2337,58 +2471,6 @@ int8_t zc_put_owned(struct z_session_t session, struct z_keyexpr_t keyexpr, struct zc_owned_payload_t *payload, const struct z_put_options_t *opts); -/** - * Creates a new blocking fifo channel, returned as a pair of closures. - * - * If `bound` is different from 0, that channel will be bound and apply back-pressure when full. - * - * The `send` end should be passed as callback to a `z_get` call. - * - * The `recv` end is a synchronous closure that will block until either a `z_owned_query_t` is available, - * which it will then return; or until the `send` closure is dropped and all queries have been consumed, - * at which point it will return an invalidated `z_owned_query_t`, and so will further calls. - */ -ZENOHC_API -struct z_owned_query_channel_t zc_query_fifo_new(size_t bound); -/** - * Creates a new non-blocking fifo channel, returned as a pair of closures. - * - * If `bound` is different from 0, that channel will be bound and apply back-pressure when full. - * - * The `send` end should be passed as callback to a `z_get` call. - * - * The `recv` end is a synchronous closure that will block until either a `z_owned_query_t` is available, - * which it will then return; or until the `send` closure is dropped and all queries have been consumed, - * at which point it will return an invalidated `z_owned_query_t`, and so will further calls. - */ -ZENOHC_API -struct z_owned_query_channel_t zc_query_non_blocking_fifo_new(size_t bound); -/** - * Creates a new blocking fifo channel, returned as a pair of closures. - * - * If `bound` is different from 0, that channel will be bound and apply back-pressure when full. - * - * The `send` end should be passed as callback to a `z_get` call. - * - * The `recv` end is a synchronous closure that will block until either a `z_owned_reply_t` is available, - * which it will then return; or until the `send` closure is dropped and all replies have been consumed, - * at which point it will return an invalidated `z_owned_reply_t`, and so will further calls. - */ -ZENOHC_API -struct z_owned_reply_channel_t zc_reply_fifo_new(size_t bound); -/** - * Creates a new non-blocking fifo channel, returned as a pair of closures. - * - * If `bound` is different from 0, that channel will be bound and apply back-pressure when full. - * - * The `send` end should be passed as callback to a `z_get` call. - * - * The `recv` end is a synchronous closure that will block until either a `z_owned_reply_t` is available, - * which it will then return; or until the `send` closure is dropped and all replies have been consumed, - * at which point it will return an invalidated `z_owned_reply_t`, and so will further calls. - */ -ZENOHC_API -struct z_owned_reply_channel_t zc_reply_non_blocking_fifo_new(size_t bound); /** * Clones the sample's payload by incrementing its backing refcount (this doesn't imply any copies). */ diff --git a/include/zenoh_macros.h b/include/zenoh_macros.h index 57a691d7a..617e99cb9 100644 --- a/include/zenoh_macros.h +++ b/include/zenoh_macros.h @@ -35,10 +35,12 @@ z_owned_closure_hello_t * : z_closure_hello_drop, \ z_owned_closure_zid_t * : z_closure_zid_drop, \ zcu_owned_closure_matching_status_t * : zcu_closure_matching_status_drop, \ - z_owned_reply_channel_closure_t * : z_reply_channel_closure_drop, \ - z_owned_query_channel_closure_t * : z_query_channel_closure_drop, \ - z_owned_reply_channel_t * : z_reply_channel_drop, \ - z_owned_query_channel_t * : z_query_channel_drop, \ + z_owned_sample_fifo_channel_t * : z_sample_fifo_channel_drop, \ + z_owned_sample_ring_channel_t * : z_sample_ring_channel_drop, \ + z_owned_query_fifo_channel_t * : z_query_fifo_channel_drop, \ + z_owned_query_ring_channel_t * : z_query_ring_channel_drop, \ + z_owned_reply_fifo_channel_t * : z_reply_fifo_channel_drop, \ + z_owned_reply_ring_channel_t * : z_reply_ring_channel_drop, \ z_owned_bytes_map_t * : z_bytes_map_drop, \ zc_owned_payload_t * : zc_payload_drop, \ zc_owned_shmbuf_t * : zc_shmbuf_drop, \ @@ -67,8 +69,12 @@ z_owned_closure_hello_t * : z_closure_hello_null, \ z_owned_closure_zid_t * : z_closure_zid_null, \ zcu_owned_closure_matching_status_t * : zcu_closure_matching_status_null, \ - z_owned_reply_channel_closure_t * : z_reply_channel_closure_null, \ - z_owned_reply_channel_t * : z_reply_channel_null, \ + z_owned_sample_fifo_channel_t * : z_sample_fifo_channel_null, \ + z_owned_sample_ring_channel_t * : z_sample_ring_channel_null, \ + z_owned_query_fifo_channel_t * : z_query_fifo_channel_null, \ + z_owned_query_ring_channel_t * : z_query_ring_channel_null, \ + z_owned_reply_fifo_channel_t * : z_reply_fifo_channel_null, \ + z_owned_reply_ring_channel_t * : z_reply_ring_channel_null, \ z_owned_bytes_map_t * : z_bytes_map_null, \ z_attachment_t * : z_attachment_null, \ zc_owned_payload_t * : zc_payload_null, \ @@ -110,9 +116,7 @@ z_owned_closure_reply_t : z_closure_reply_call, \ z_owned_closure_hello_t : z_closure_hello_call, \ z_owned_closure_zid_t : z_closure_zid_call, \ - zcu_owned_closure_matching_status_t : zcu_closure_matching_status_call, \ - z_owned_reply_channel_closure_t : z_reply_channel_closure_call, \ - z_owned_query_channel_closure_t : z_query_channel_closure_call \ + zcu_owned_closure_matching_status_t : zcu_closure_matching_status_call \ ) (&x, __VA_ARGS__) // clang-format on @@ -173,8 +177,12 @@ template<> struct zenoh_drop_type { typedef void type; template<> struct zenoh_drop_type { typedef void type; }; template<> struct zenoh_drop_type { typedef void type; }; template<> struct zenoh_drop_type { typedef void type; }; -template<> struct zenoh_drop_type { typedef void type; }; -template<> struct zenoh_drop_type { typedef void type; }; +template<> struct zenoh_drop_type { typedef void type; }; +template<> struct zenoh_drop_type { typedef void type; }; +template<> struct zenoh_drop_type { typedef void type; }; +template<> struct zenoh_drop_type { typedef void type; }; +template<> struct zenoh_drop_type { typedef void type; }; +template<> struct zenoh_drop_type { typedef void type; }; template<> struct zenoh_drop_type { typedef void type; }; template<> struct zenoh_drop_type { typedef void type; }; template<> struct zenoh_drop_type { typedef int8_t type; }; @@ -201,8 +209,12 @@ template<> inline void z_drop(z_owned_closure_reply_t* v) { z_closure_reply_drop template<> inline void z_drop(z_owned_closure_hello_t* v) { z_closure_hello_drop(v); } template<> inline void z_drop(z_owned_closure_zid_t* v) { z_closure_zid_drop(v); } template<> inline void z_drop(zcu_owned_closure_matching_status_t* v) { zcu_closure_matching_status_drop(v); } -template<> inline void z_drop(z_owned_reply_channel_closure_t* v) { z_reply_channel_closure_drop(v); } -template<> inline void z_drop(z_owned_reply_channel_t* v) { z_reply_channel_drop(v); } +template<> inline void z_drop(z_owned_sample_fifo_channel_t* v) { z_sample_fifo_channel_drop(v); } +template<> inline void z_drop(z_owned_sample_ring_channel_t* v) { z_sample_ring_channel_drop(v); } +template<> inline void z_drop(z_owned_query_fifo_channel_t* v) { z_query_fifo_channel_drop(v); } +template<> inline void z_drop(z_owned_query_ring_channel_t* v) { z_query_ring_channel_drop(v); } +template<> inline void z_drop(z_owned_reply_fifo_channel_t* v) { z_reply_fifo_channel_drop(v); } +template<> inline void z_drop(z_owned_reply_ring_channel_t* v) { z_reply_ring_channel_drop(v); } template<> inline void z_drop(z_owned_bytes_map_t* v) { z_bytes_map_drop(v); } template<> inline void z_drop(zc_owned_liveliness_token_t* v) { zc_liveliness_undeclare_token(v); } template<> inline int8_t z_drop(ze_owned_publication_cache_t* v) { return ze_undeclare_publication_cache(v); } @@ -229,8 +241,12 @@ inline void z_null(z_owned_closure_reply_t& v) { v = z_closure_reply_null(); } inline void z_null(z_owned_closure_hello_t& v) { v = z_closure_hello_null(); } inline void z_null(z_owned_closure_zid_t& v) { v = z_closure_zid_null(); } inline void z_null(zcu_owned_closure_matching_status_t& v) { v = zcu_closure_matching_status_null(); } -inline void z_null(z_owned_reply_channel_closure_t& v) { v = z_reply_channel_closure_null(); } -inline void z_null(z_owned_reply_channel_t& v) { v = z_reply_channel_null(); } +inline void z_null(z_owned_sample_fifo_channel_t& v) { v = z_sample_fifo_channel_null(); } +inline void z_null(z_owned_sample_ring_channel_t& v) { v = z_sample_ring_channel_null(); } +inline void z_null(z_owned_query_fifo_channel_t& v) { v = z_query_fifo_channel_null(); } +inline void z_null(z_owned_query_ring_channel_t& v) { v = z_query_ring_channel_null(); } +inline void z_null(z_owned_reply_fifo_channel_t& v) { v = z_reply_fifo_channel_null(); } +inline void z_null(z_owned_reply_ring_channel_t& v) { v = z_reply_ring_channel_null(); } inline void z_null(z_owned_bytes_map_t& v) { v = z_bytes_map_null(); } inline void z_null(zc_owned_liveliness_token_t& v) { v = zc_liveliness_token_null(); } inline void z_null(ze_owned_publication_cache_t& v) { v = ze_publication_cache_null(); } @@ -271,8 +287,6 @@ inline void z_call(const struct z_owned_closure_zid_t &closure, const struct z_i { z_closure_zid_call(&closure, zid); } inline void z_call(const struct zcu_owned_closure_matching_status_t &closure, const struct zcu_matching_status_t *matching_status) { zcu_closure_matching_status_call(&closure, matching_status); } -inline bool z_call(const struct z_owned_reply_channel_closure_t &closure, struct z_owned_reply_t *sample) - { return z_reply_channel_closure_call(&closure, sample); } // clang-format on #define _z_closure_overloader(callback, droper, ctx, ...) \ diff --git a/src/closures/mod.rs b/src/closures/mod.rs index 74b2748aa..6e4524d6c 100644 --- a/src/closures/mod.rs +++ b/src/closures/mod.rs @@ -23,12 +23,15 @@ mod reply_closure; pub use zenohid_closure::*; mod zenohid_closure; -pub use response_channel::*; -mod response_channel; +pub use sample_channel::*; +mod sample_channel; pub use query_channel::*; mod query_channel; +pub use reply_channel::*; +mod reply_channel; + pub use hello_closure::*; mod hello_closure; diff --git a/src/closures/query_channel.rs b/src/closures/query_channel.rs index b7a51c9c0..837393239 100644 --- a/src/closures/query_channel.rs +++ b/src/closures/query_channel.rs @@ -1,42 +1,27 @@ -use crate::{z_closure_owned_query_drop, z_owned_closure_owned_query_t, z_owned_query_t}; -use libc::c_void; -use std::sync::mpsc::TryRecvError; +use crate::{ + z_closure_owned_query_drop, z_closure_query_drop, z_owned_closure_owned_query_t, + z_owned_closure_query_t, z_owned_query_t, z_query_clone, z_query_t, +}; -/// A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks: -/// - `this` is a pointer to an arbitrary state. -/// - `call` is the typical callback function. `this` will be passed as its last argument. -/// - `drop` allows the callback's state to be freed. -/// -/// Closures are not guaranteed not to be called concurrently. -/// -/// We guarantee that: -/// - `call` will never be called once `drop` has started. -/// - `drop` will only be called ONCE, and AFTER EVERY `call` has ended. -/// - The two previous guarantees imply that `call` and `drop` are never called concurrently. #[repr(C)] -pub struct z_owned_query_channel_closure_t { - context: *mut c_void, - call: Option bool>, - drop: Option, -} - -/// A pair of closures -#[repr(C)] -pub struct z_owned_query_channel_t { - pub send: z_owned_closure_owned_query_t, - pub recv: z_owned_query_channel_closure_t, +pub struct z_owned_query_fifo_channel_t { + pub send: z_owned_closure_query_t, + pub recv: z_owned_closure_owned_query_t, + pub try_recv: z_owned_closure_owned_query_t, } #[no_mangle] -pub extern "C" fn z_query_channel_drop(channel: &mut z_owned_query_channel_t) { - z_closure_owned_query_drop(&mut channel.send); - z_query_channel_closure_drop(&mut channel.recv); +pub extern "C" fn z_query_fifo_channel_drop(channel: &mut z_owned_query_fifo_channel_t) { + z_closure_query_drop(&mut channel.send); + z_closure_owned_query_drop(&mut channel.recv); + z_closure_owned_query_drop(&mut channel.try_recv); } -/// Constructs a null safe-to-drop value of 'z_owned_query_channel_t' type +/// Constructs a null safe-to-drop value of 'z_owned_query_fifo_channel_t' type #[no_mangle] -pub extern "C" fn z_query_channel_null() -> z_owned_query_channel_t { - z_owned_query_channel_t { - send: z_owned_closure_owned_query_t::empty(), - recv: z_owned_query_channel_closure_t::empty(), +pub extern "C" fn z_query_fifo_channel_null() -> z_owned_query_fifo_channel_t { + z_owned_query_fifo_channel_t { + send: z_owned_closure_query_t::empty(), + recv: z_owned_closure_owned_query_t::empty(), + try_recv: z_owned_closure_owned_query_t::empty(), } } @@ -47,167 +32,101 @@ pub extern "C" fn z_query_channel_null() -> z_owned_query_channel_t { /// The `send` end should be passed as callback to a `z_get` call. /// /// The `recv` end is a synchronous closure that will block until either a `z_owned_query_t` is available, -/// which it will then return; or until the `send` closure is dropped and all queries have been consumed, +/// which it will then return; or until the `send` closure is dropped and all replies have been consumed, /// at which point it will return an invalidated `z_owned_query_t`, and so will further calls. #[no_mangle] -pub extern "C" fn zc_query_fifo_new(bound: usize) -> z_owned_query_channel_t { - let (send, rx) = if bound == 0 { - let (tx, rx) = std::sync::mpsc::channel(); - ( - From::from(move |query: &mut z_owned_query_t| { - if let Some(query) = query.take() { - if let Err(e) = tx.send(query) { - log::error!("Attempted to push onto a closed query_fifo: {}", e) - } - } - }), - rx, - ) +pub extern "C" fn z_query_fifo_channel_new(bound: usize) -> z_owned_query_fifo_channel_t { + // TODO(sashacmc): switch to handlers::FifoChannel + let (tx, rx) = if bound == 0 { + crossbeam_channel::unbounded() } else { - let (tx, rx) = std::sync::mpsc::sync_channel(bound); - ( - From::from(move |query: &mut z_owned_query_t| { - if let Some(query) = query.take() { - if let Err(e) = tx.send(query) { - log::error!("Attempted to push onto a closed query_fifo: {}", e) - } - } - }), - rx, - ) + crossbeam_channel::bounded(bound) }; - z_owned_query_channel_t { - send, + let rx_clone = rx.clone(); + z_owned_query_fifo_channel_t { + send: From::from(move |query: &z_query_t| { + let mut oquery = z_query_clone(Some(query)); + if let Some(oquery) = oquery.take() { + if let Err(e) = tx.send(oquery) { + log::error!("Attempted to push onto a closed query_ring: {}", e) + } + } + }), recv: From::from(move |receptacle: &mut z_owned_query_t| { *receptacle = match rx.recv() { Ok(val) => val.into(), Err(_) => None.into(), }; - true + }), + try_recv: From::from(move |receptacle: &mut z_owned_query_t| { + *receptacle = match rx_clone.try_recv() { + Ok(val) => val.into(), + Err(_) => None.into(), + }; }), } } -/// Creates a new non-blocking fifo channel, returned as a pair of closures. +#[repr(C)] +pub struct z_owned_query_ring_channel_t { + pub send: z_owned_closure_query_t, + pub recv: z_owned_closure_owned_query_t, + pub try_recv: z_owned_closure_owned_query_t, +} +#[no_mangle] +pub extern "C" fn z_query_ring_channel_drop(channel: &mut z_owned_query_ring_channel_t) { + z_closure_query_drop(&mut channel.send); + z_closure_owned_query_drop(&mut channel.recv); + z_closure_owned_query_drop(&mut channel.try_recv); +} +/// Constructs a null safe-to-drop value of 'z_owned_query_ring_channel_t' type +#[no_mangle] +pub extern "C" fn z_query_ring_channel_null() -> z_owned_query_ring_channel_t { + z_owned_query_ring_channel_t { + send: z_owned_closure_query_t::empty(), + recv: z_owned_closure_owned_query_t::empty(), + try_recv: z_owned_closure_owned_query_t::empty(), + } +} + +/// Creates a new blocking ring channel, returned as a pair of closures. /// /// If `bound` is different from 0, that channel will be bound and apply back-pressure when full. /// /// The `send` end should be passed as callback to a `z_get` call. /// /// The `recv` end is a synchronous closure that will block until either a `z_owned_query_t` is available, -/// which it will then return; or until the `send` closure is dropped and all queries have been consumed, +/// which it will then return; or until the `send` closure is dropped and all replies have been consumed, /// at which point it will return an invalidated `z_owned_query_t`, and so will further calls. #[no_mangle] -pub extern "C" fn zc_query_non_blocking_fifo_new(bound: usize) -> z_owned_query_channel_t { - let (send, rx) = if bound == 0 { - let (tx, rx) = std::sync::mpsc::channel(); - ( - From::from(move |query: &mut z_owned_query_t| { - if let Some(query) = query.take() { - if let Err(e) = tx.send(query) { - log::error!("Attempted to push onto a closed query_fifo: {}", e) - } - } - }), - rx, - ) +pub extern "C" fn z_query_ring_channel_new(bound: usize) -> z_owned_query_ring_channel_t { + // TODO(sashacmc): switch to handlers::RingChannel + let (tx, rx) = if bound == 0 { + crossbeam_channel::unbounded() } else { - let (tx, rx) = std::sync::mpsc::sync_channel(bound); - ( - From::from(move |query: &mut z_owned_query_t| { - if let Some(query) = query.take() { - if let Err(e) = tx.send(query) { - log::error!("Attempted to push onto a closed query_fifo: {}", e) - } - } - }), - rx, - ) + crossbeam_channel::bounded(bound) }; - z_owned_query_channel_t { - send, - recv: From::from( - move |receptacle: &mut z_owned_query_t| match rx.try_recv() { - Ok(val) => { - let mut tmp = z_owned_query_t::from(val); - std::mem::swap(&mut tmp, receptacle); - true + let rx_clone = rx.clone(); + z_owned_query_ring_channel_t { + send: From::from(move |query: &z_query_t| { + let mut oquery = z_query_clone(Some(query)); + if let Some(oquery) = oquery.take() { + if let Err(e) = tx.send(oquery) { + log::error!("Attempted to push onto a closed query_ring: {}", e) } - Err(TryRecvError::Disconnected) => { - receptacle.take(); - true - } - Err(TryRecvError::Empty) => { - receptacle.take(); - false - } - }, - ), - } -} - -impl z_owned_query_channel_closure_t { - pub fn empty() -> Self { - z_owned_query_channel_closure_t { - context: std::ptr::null_mut(), - call: None, - drop: None, - } - } -} -unsafe impl Send for z_owned_query_channel_closure_t {} -unsafe impl Sync for z_owned_query_channel_closure_t {} -impl Drop for z_owned_query_channel_closure_t { - fn drop(&mut self) { - if let Some(drop) = self.drop { - drop(self.context) - } - } -} - -/// Constructs a null safe-to-drop value of 'z_owned_query_channel_closure_t' type -#[no_mangle] -pub extern "C" fn z_query_channel_closure_null() -> z_owned_query_channel_closure_t { - z_owned_query_channel_closure_t::empty() -} - -/// Calls the closure. Calling an uninitialized closure is a no-op. -#[no_mangle] -pub extern "C" fn z_query_channel_closure_call( - closure: &z_owned_query_channel_closure_t, - sample: &mut z_owned_query_t, -) -> bool { - match closure.call { - Some(call) => call(sample, closure.context), - None => { - log::error!("Attempted to call an uninitialized closure!"); - true - } - } -} -/// Drops the closure. Droping an uninitialized closure is a no-op. -#[no_mangle] -pub extern "C" fn z_query_channel_closure_drop(closure: &mut z_owned_query_channel_closure_t) { - let mut empty_closure = z_owned_query_channel_closure_t::empty(); - std::mem::swap(&mut empty_closure, closure); -} -impl bool> From for z_owned_query_channel_closure_t { - fn from(f: F) -> Self { - let this = Box::into_raw(Box::new(f)) as _; - extern "C" fn call bool>( - query: &mut z_owned_query_t, - this: *mut c_void, - ) -> bool { - let this = unsafe { &*(this as *const F) }; - this(query) - } - extern "C" fn drop(this: *mut c_void) { - std::mem::drop(unsafe { Box::from_raw(this as *mut F) }) - } - z_owned_query_channel_closure_t { - context: this, - call: Some(call::), - drop: Some(drop::), - } + } + }), + recv: From::from(move |receptacle: &mut z_owned_query_t| { + *receptacle = match rx.recv() { + Ok(val) => val.into(), + Err(_) => None.into(), + }; + }), + try_recv: From::from(move |receptacle: &mut z_owned_query_t| { + *receptacle = match rx_clone.try_recv() { + Ok(val) => val.into(), + Err(_) => None.into(), + }; + }), } } diff --git a/src/closures/reply_channel.rs b/src/closures/reply_channel.rs new file mode 100644 index 000000000..6e88e4fd7 --- /dev/null +++ b/src/closures/reply_channel.rs @@ -0,0 +1,127 @@ +use crate::{z_closure_reply_drop, z_owned_closure_reply_t, z_owned_reply_t}; + +#[repr(C)] +pub struct z_owned_reply_fifo_channel_t { + pub send: z_owned_closure_reply_t, + pub recv: z_owned_closure_reply_t, + pub try_recv: z_owned_closure_reply_t, +} +#[no_mangle] +pub extern "C" fn z_reply_fifo_channel_drop(channel: &mut z_owned_reply_fifo_channel_t) { + z_closure_reply_drop(&mut channel.send); + z_closure_reply_drop(&mut channel.recv); + z_closure_reply_drop(&mut channel.try_recv); +} +/// Constructs a null safe-to-drop value of 'z_owned_reply_fifo_channel_t' type +#[no_mangle] +pub extern "C" fn z_reply_fifo_channel_null() -> z_owned_reply_fifo_channel_t { + z_owned_reply_fifo_channel_t { + send: z_owned_closure_reply_t::empty(), + recv: z_owned_closure_reply_t::empty(), + try_recv: z_owned_closure_reply_t::empty(), + } +} + +/// Creates a new blocking fifo channel, returned as a pair of closures. +/// +/// If `bound` is different from 0, that channel will be bound and apply back-pressure when full. +/// +/// The `send` end should be passed as callback to a `z_get` call. +/// +/// The `recv` end is a synchronous closure that will block until either a `z_owned_reply_t` is available, +/// which it will then return; or until the `send` closure is dropped and all replies have been consumed, +/// at which point it will return an invalidated `z_owned_reply_t`, and so will further calls. +#[no_mangle] +pub extern "C" fn z_reply_fifo_channel_new(bound: usize) -> z_owned_reply_fifo_channel_t { + // TODO(sashacmc): switch to handlers::FifoChannel + let (tx, rx) = if bound == 0 { + crossbeam_channel::unbounded() + } else { + crossbeam_channel::bounded(bound) + }; + let rx_clone = rx.clone(); + z_owned_reply_fifo_channel_t { + send: From::from(move |reply: &mut z_owned_reply_t| { + if let Some(reply) = reply.take() { + if let Err(e) = tx.send(reply) { + log::error!("Attempted to push onto a closed reply_fifo: {}", e) + } + } + }), + recv: From::from(move |receptacle: &mut z_owned_reply_t| { + *receptacle = match rx.recv() { + Ok(val) => val.into(), + Err(_) => None.into(), + }; + }), + try_recv: From::from(move |receptacle: &mut z_owned_reply_t| { + *receptacle = match rx_clone.try_recv() { + Ok(val) => val.into(), + Err(_) => None.into(), + }; + }), + } +} + +#[repr(C)] +pub struct z_owned_reply_ring_channel_t { + pub send: z_owned_closure_reply_t, + pub recv: z_owned_closure_reply_t, + pub try_recv: z_owned_closure_reply_t, +} +#[no_mangle] +pub extern "C" fn z_reply_ring_channel_drop(channel: &mut z_owned_reply_ring_channel_t) { + z_closure_reply_drop(&mut channel.send); + z_closure_reply_drop(&mut channel.recv); + z_closure_reply_drop(&mut channel.try_recv); +} +/// Constructs a null safe-to-drop value of 'z_owned_reply_ring_channel_t' type +#[no_mangle] +pub extern "C" fn z_reply_ring_channel_null() -> z_owned_reply_ring_channel_t { + z_owned_reply_ring_channel_t { + send: z_owned_closure_reply_t::empty(), + recv: z_owned_closure_reply_t::empty(), + try_recv: z_owned_closure_reply_t::empty(), + } +} + +/// Creates a new blocking ring channel, returned as a pair of closures. +/// +/// If `bound` is different from 0, that channel will be bound and apply back-pressure when full. +/// +/// The `send` end should be passed as callback to a `z_get` call. +/// +/// The `recv` end is a synchronous closure that will block until either a `z_owned_reply_t` is available, +/// which it will then return; or until the `send` closure is dropped and all replies have been consumed, +/// at which point it will return an invalidated `z_owned_reply_t`, and so will further calls. +#[no_mangle] +pub extern "C" fn z_reply_ring_channel_new(bound: usize) -> z_owned_reply_ring_channel_t { + // TODO(sashacmc): switch to handlers::RingChannel + let (tx, rx) = if bound == 0 { + crossbeam_channel::unbounded() + } else { + crossbeam_channel::bounded(bound) + }; + let rx_clone = rx.clone(); + z_owned_reply_ring_channel_t { + send: From::from(move |reply: &mut z_owned_reply_t| { + if let Some(reply) = reply.take() { + if let Err(e) = tx.send(reply) { + log::error!("Attempted to push onto a closed reply_ring: {}", e) + } + } + }), + recv: From::from(move |receptacle: &mut z_owned_reply_t| { + *receptacle = match rx.recv() { + Ok(val) => val.into(), + Err(_) => None.into(), + }; + }), + try_recv: From::from(move |receptacle: &mut z_owned_reply_t| { + *receptacle = match rx_clone.try_recv() { + Ok(val) => val.into(), + Err(_) => None.into(), + }; + }), + } +} diff --git a/src/closures/response_channel.rs b/src/closures/response_channel.rs deleted file mode 100644 index 0ea046d5d..000000000 --- a/src/closures/response_channel.rs +++ /dev/null @@ -1,213 +0,0 @@ -use crate::{z_closure_reply_drop, z_owned_closure_reply_t, z_owned_reply_t}; -use libc::c_void; -use std::sync::mpsc::TryRecvError; -/// A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks: -/// - `this` is a pointer to an arbitrary state. -/// - `call` is the typical callback function. `this` will be passed as its last argument. -/// - `drop` allows the callback's state to be freed. -/// -/// Closures are not guaranteed not to be called concurrently. -/// -/// We guarantee that: -/// - `call` will never be called once `drop` has started. -/// - `drop` will only be called ONCE, and AFTER EVERY `call` has ended. -/// - The two previous guarantees imply that `call` and `drop` are never called concurrently. -#[repr(C)] -pub struct z_owned_reply_channel_closure_t { - context: *mut c_void, - call: Option bool>, - drop: Option, -} - -/// A pair of closures, the `send` one accepting -#[repr(C)] -pub struct z_owned_reply_channel_t { - pub send: z_owned_closure_reply_t, - pub recv: z_owned_reply_channel_closure_t, -} -#[no_mangle] -pub extern "C" fn z_reply_channel_drop(channel: &mut z_owned_reply_channel_t) { - z_closure_reply_drop(&mut channel.send); - z_reply_channel_closure_drop(&mut channel.recv); -} -/// Constructs a null safe-to-drop value of 'z_owned_reply_channel_t' type -#[no_mangle] -pub extern "C" fn z_reply_channel_null() -> z_owned_reply_channel_t { - z_owned_reply_channel_t { - send: z_owned_closure_reply_t::empty(), - recv: z_owned_reply_channel_closure_t::empty(), - } -} - -/// Creates a new blocking fifo channel, returned as a pair of closures. -/// -/// If `bound` is different from 0, that channel will be bound and apply back-pressure when full. -/// -/// The `send` end should be passed as callback to a `z_get` call. -/// -/// The `recv` end is a synchronous closure that will block until either a `z_owned_reply_t` is available, -/// which it will then return; or until the `send` closure is dropped and all replies have been consumed, -/// at which point it will return an invalidated `z_owned_reply_t`, and so will further calls. -#[no_mangle] -pub extern "C" fn zc_reply_fifo_new(bound: usize) -> z_owned_reply_channel_t { - let (send, rx) = if bound == 0 { - let (tx, rx) = std::sync::mpsc::channel(); - ( - From::from(move |reply: &mut z_owned_reply_t| { - if let Some(reply) = reply.take() { - if let Err(e) = tx.send(reply) { - log::error!("Attempted to push onto a closed reply_fifo: {}", e) - } - } - }), - rx, - ) - } else { - let (tx, rx) = std::sync::mpsc::sync_channel(bound); - ( - From::from(move |reply: &mut z_owned_reply_t| { - if let Some(reply) = reply.take() { - if let Err(e) = tx.send(reply) { - log::error!("Attempted to push onto a closed reply_fifo: {}", e) - } - } - }), - rx, - ) - }; - z_owned_reply_channel_t { - send, - recv: From::from(move |receptacle: &mut z_owned_reply_t| { - *receptacle = match rx.recv() { - Ok(val) => val.into(), - Err(_) => None.into(), - }; - true - }), - } -} - -/// Creates a new non-blocking fifo channel, returned as a pair of closures. -/// -/// If `bound` is different from 0, that channel will be bound and apply back-pressure when full. -/// -/// The `send` end should be passed as callback to a `z_get` call. -/// -/// The `recv` end is a synchronous closure that will block until either a `z_owned_reply_t` is available, -/// which it will then return; or until the `send` closure is dropped and all replies have been consumed, -/// at which point it will return an invalidated `z_owned_reply_t`, and so will further calls. -#[no_mangle] -pub extern "C" fn zc_reply_non_blocking_fifo_new(bound: usize) -> z_owned_reply_channel_t { - let (send, rx) = if bound == 0 { - let (tx, rx) = std::sync::mpsc::channel(); - ( - From::from(move |reply: &mut z_owned_reply_t| { - if let Some(reply) = reply.take() { - if let Err(e) = tx.send(reply) { - log::error!("Attempted to push onto a closed reply_fifo: {}", e) - } - } - }), - rx, - ) - } else { - let (tx, rx) = std::sync::mpsc::sync_channel(bound); - ( - From::from(move |reply: &mut z_owned_reply_t| { - if let Some(reply) = reply.take() { - if let Err(e) = tx.send(reply) { - log::error!("Attempted to push onto a closed reply_fifo: {}", e) - } - } - }), - rx, - ) - }; - - z_owned_reply_channel_t { - send, - recv: From::from( - move |receptacle: &mut z_owned_reply_t| match rx.try_recv() { - Ok(val) => { - let mut tmp = z_owned_reply_t::from(val); - std::mem::swap(&mut tmp, receptacle); - true - } - Err(TryRecvError::Disconnected) => { - receptacle.take(); - true - } - Err(TryRecvError::Empty) => { - receptacle.take(); - false - } - }, - ), - } -} - -impl z_owned_reply_channel_closure_t { - pub fn empty() -> Self { - z_owned_reply_channel_closure_t { - context: std::ptr::null_mut(), - call: None, - drop: None, - } - } -} -unsafe impl Send for z_owned_reply_channel_closure_t {} -unsafe impl Sync for z_owned_reply_channel_closure_t {} -impl Drop for z_owned_reply_channel_closure_t { - fn drop(&mut self) { - if let Some(drop) = self.drop { - drop(self.context) - } - } -} - -/// Constructs a null safe-to-drop value of 'z_owned_reply_channel_closure_t' type -#[no_mangle] -pub extern "C" fn z_reply_channel_closure_null() -> z_owned_reply_channel_closure_t { - z_owned_reply_channel_closure_t::empty() -} - -/// Calls the closure. Calling an uninitialized closure is a no-op. -#[no_mangle] -pub extern "C" fn z_reply_channel_closure_call( - closure: &z_owned_reply_channel_closure_t, - sample: &mut z_owned_reply_t, -) -> bool { - match closure.call { - Some(call) => call(sample, closure.context), - None => { - log::error!("Attempted to call an uninitialized closure!"); - true - } - } -} -/// Drops the closure. Droping an uninitialized closure is a no-op. -#[no_mangle] -pub extern "C" fn z_reply_channel_closure_drop(closure: &mut z_owned_reply_channel_closure_t) { - let mut empty_closure = z_owned_reply_channel_closure_t::empty(); - std::mem::swap(&mut empty_closure, closure); -} -impl bool> From for z_owned_reply_channel_closure_t { - fn from(f: F) -> Self { - let this = Box::into_raw(Box::new(f)) as _; - extern "C" fn call bool>( - response: &mut z_owned_reply_t, - this: *mut c_void, - ) -> bool { - let this = unsafe { &*(this as *const F) }; - this(response) - } - extern "C" fn drop(this: *mut c_void) { - std::mem::drop(unsafe { Box::from_raw(this as *mut F) }) - } - z_owned_reply_channel_closure_t { - context: this, - call: Some(call::), - drop: Some(drop::), - } - } -} diff --git a/src/closures/sample_channel.rs b/src/closures/sample_channel.rs new file mode 100644 index 000000000..d63cddf30 --- /dev/null +++ b/src/closures/sample_channel.rs @@ -0,0 +1,132 @@ +use crate::{ + z_closure_owned_sample_drop, z_closure_sample_drop, z_owned_closure_owned_sample_t, + z_owned_closure_sample_t, z_owned_sample_t, z_sample_clone, z_sample_t, +}; + +#[repr(C)] +pub struct z_owned_sample_fifo_channel_t { + pub send: z_owned_closure_sample_t, + pub recv: z_owned_closure_owned_sample_t, + pub try_recv: z_owned_closure_owned_sample_t, +} +#[no_mangle] +pub extern "C" fn z_sample_fifo_channel_drop(channel: &mut z_owned_sample_fifo_channel_t) { + z_closure_sample_drop(&mut channel.send); + z_closure_owned_sample_drop(&mut channel.recv); + z_closure_owned_sample_drop(&mut channel.try_recv); +} +/// Constructs a null safe-to-drop value of 'z_owned_sample_fifo_channel_t' type +#[no_mangle] +pub extern "C" fn z_sample_fifo_channel_null() -> z_owned_sample_fifo_channel_t { + z_owned_sample_fifo_channel_t { + send: z_owned_closure_sample_t::empty(), + recv: z_owned_closure_owned_sample_t::empty(), + try_recv: z_owned_closure_owned_sample_t::empty(), + } +} + +/// Creates a new blocking fifo channel, returned as a pair of closures. +/// +/// If `bound` is different from 0, that channel will be bound and apply back-pressure when full. +/// +/// The `send` end should be passed as callback to a `z_get` call. +/// +/// The `recv` end is a synchronous closure that will block until either a `z_owned_sample_t` is available, +/// which it will then return; or until the `send` closure is dropped and all replies have been consumed, +/// at which point it will return an invalidated `z_owned_sample_t`, and so will further calls. +#[no_mangle] +pub extern "C" fn z_sample_fifo_channel_new(bound: usize) -> z_owned_sample_fifo_channel_t { + // TODO(sashacmc): switch to handlers::FifoChannel + let (tx, rx) = if bound == 0 { + crossbeam_channel::unbounded() + } else { + crossbeam_channel::bounded(bound) + }; + let rx_clone = rx.clone(); + z_owned_sample_fifo_channel_t { + send: From::from(move |sample: &z_sample_t| { + let mut osample = z_sample_clone(Some(sample)); + if let Some(osample) = osample.take() { + if let Err(e) = tx.send(osample) { + log::error!("Attempted to push onto a closed sample_ring: {}", e) + } + } + }), + recv: From::from(move |receptacle: &mut z_owned_sample_t| { + *receptacle = match rx.recv() { + Ok(val) => val.into(), + Err(_) => None.into(), + }; + }), + try_recv: From::from(move |receptacle: &mut z_owned_sample_t| { + *receptacle = match rx_clone.try_recv() { + Ok(val) => val.into(), + Err(_) => None.into(), + }; + }), + } +} + +#[repr(C)] +pub struct z_owned_sample_ring_channel_t { + pub send: z_owned_closure_sample_t, + pub recv: z_owned_closure_owned_sample_t, + pub try_recv: z_owned_closure_owned_sample_t, +} +#[no_mangle] +pub extern "C" fn z_sample_ring_channel_drop(channel: &mut z_owned_sample_ring_channel_t) { + z_closure_sample_drop(&mut channel.send); + z_closure_owned_sample_drop(&mut channel.recv); + z_closure_owned_sample_drop(&mut channel.try_recv); +} +/// Constructs a null safe-to-drop value of 'z_owned_sample_ring_channel_t' type +#[no_mangle] +pub extern "C" fn z_sample_ring_channel_null() -> z_owned_sample_ring_channel_t { + z_owned_sample_ring_channel_t { + send: z_owned_closure_sample_t::empty(), + recv: z_owned_closure_owned_sample_t::empty(), + try_recv: z_owned_closure_owned_sample_t::empty(), + } +} + +/// Creates a new blocking ring channel, returned as a pair of closures. +/// +/// If `bound` is different from 0, that channel will be bound and apply back-pressure when full. +/// +/// The `send` end should be passed as callback to a `z_get` call. +/// +/// The `recv` end is a synchronous closure that will block until either a `z_owned_sample_t` is available, +/// which it will then return; or until the `send` closure is dropped and all replies have been consumed, +/// at which point it will return an invalidated `z_owned_sample_t`, and so will further calls. +#[no_mangle] +pub extern "C" fn z_sample_ring_channel_new(bound: usize) -> z_owned_sample_ring_channel_t { + // TODO(sashacmc): switch to handlers::RingChannel + let (tx, rx) = if bound == 0 { + crossbeam_channel::unbounded() + } else { + crossbeam_channel::bounded(bound) + }; + let rx_clone = rx.clone(); + z_owned_sample_ring_channel_t { + send: From::from(move |sample: &z_sample_t| { + let mut osample = z_sample_clone(Some(sample)); + if let Some(osample) = osample.take() { + if let Err(e) = tx.send(osample) { + log::error!("Attempted to push onto a closed sample_ring: {}", e) + } + } + }), + recv: From::from(move |receptacle: &mut z_owned_sample_t| { + *receptacle = match rx.recv() { + Ok(val) => val.into(), + Err(_) => None.into(), + }; + }), + try_recv: From::from(move |receptacle: &mut z_owned_sample_t| { + *receptacle = match rx_clone.try_recv() { + Ok(val) => val.into(), + Err(_) => None.into(), + }; + }), + } +} diff --git a/src/closures/sample_closure.rs b/src/closures/sample_closure.rs index 6f0e55707..65cb97a49 100644 --- a/src/closures/sample_closure.rs +++ b/src/closures/sample_closure.rs @@ -1,4 +1,4 @@ -use crate::z_sample_t; +use crate::{z_owned_sample_t, z_sample_t}; use libc::c_void; /// A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks. /// @@ -77,3 +77,84 @@ impl From for z_owned_closure_sample_t { } } } + +/// A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks: +/// +/// Members: +/// void *context: a pointer to an arbitrary state. +/// void *call(const struct z_sample_t*, const void *context): the typical callback function. `context` will be passed as its last argument. +/// void *drop(void*): allows the callback's state to be freed. +/// +/// Closures are not guaranteed not to be called concurrently. +/// +/// It is guaranteed that: +/// +/// - `call` will never be called once `drop` has started. +/// - `drop` will only be called **once**, and **after every** `call` has ended. +/// - The two previous guarantees imply that `call` and `drop` are never called concurrently. +#[repr(C)] +pub struct z_owned_closure_owned_sample_t { + context: *mut c_void, + call: Option, + drop: Option, +} +impl z_owned_closure_owned_sample_t { + pub fn empty() -> Self { + z_owned_closure_owned_sample_t { + context: std::ptr::null_mut(), + call: None, + drop: None, + } + } +} +unsafe impl Send for z_owned_closure_owned_sample_t {} +unsafe impl Sync for z_owned_closure_owned_sample_t {} +impl Drop for z_owned_closure_owned_sample_t { + fn drop(&mut self) { + if let Some(drop) = self.drop { + drop(self.context) + } + } +} +/// Constructs a null safe-to-drop value of 'z_owned_closure_sample_t' type +#[no_mangle] +pub extern "C" fn z_closure_owned_sample_null() -> z_owned_closure_owned_sample_t { + z_owned_closure_owned_sample_t::empty() +} +/// Calls the closure. Calling an uninitialized closure is a no-op. +#[no_mangle] +pub extern "C" fn z_closure_owned_sample_call( + closure: &z_owned_closure_owned_sample_t, + sample: &mut z_owned_sample_t, +) { + match closure.call { + Some(call) => call(sample, closure.context), + None => log::error!("Attempted to call an uninitialized closure!"), + } +} +/// Drops the closure. Droping an uninitialized closure is a no-op. +#[no_mangle] +pub extern "C" fn z_closure_owned_sample_drop(closure: &mut z_owned_closure_owned_sample_t) { + let mut empty_closure = z_owned_closure_owned_sample_t::empty(); + std::mem::swap(&mut empty_closure, closure); +} +impl From for z_owned_closure_owned_sample_t { + fn from(f: F) -> Self { + let this = Box::into_raw(Box::new(f)) as _; + extern "C" fn call( + sample: &mut z_owned_sample_t, + this: *mut c_void, + ) { + let this = unsafe { &*(this as *const F) }; + this(sample) + } + extern "C" fn drop(this: *mut c_void) { + std::mem::drop(unsafe { Box::from_raw(this as *mut F) }) + } + z_owned_closure_owned_sample_t { + context: this, + call: Some(call::), + drop: Some(drop::), + } + } +} diff --git a/src/commons.rs b/src/commons.rs index 8ac21cb11..2969d3913 100644 --- a/src/commons.rs +++ b/src/commons.rs @@ -20,6 +20,7 @@ use crate::z_priority_t; use crate::{impl_guarded_transmute, GuardedTransmute}; use libc::c_void; use libc::{c_char, c_ulong}; +use std::ops::{Deref, DerefMut}; use zenoh::buffers::ZBuf; use zenoh::prelude::SampleKind; use zenoh::prelude::SplitBuffer; @@ -280,6 +281,82 @@ impl<'a> z_sample_t<'a> { } } +/// Owned variant of a z_sample_t. +/// +/// You may construct it by `z_sample_clone`-ing a loaned sample. +/// When the last `z_owned_sample_t` corresponding to a sample is destroyed, or the callback that produced the sample cloned to build them returns, +/// the sample will receive its termination signal. +#[cfg(not(target_arch = "arm"))] +#[repr(C, align(8))] +pub struct z_owned_sample_t([u64; 17]); + +#[cfg(target_arch = "arm")] +#[repr(C, align(8))] +pub struct z_owned_sample_t([u64; 15]); + +impl From>> for z_owned_sample_t { + fn from(value: Option) -> Self { + unsafe { core::mem::transmute(value) } + } +} +impl From> for z_owned_sample_t { + fn from(value: z_sample_t<'_>) -> Self { + Some(value).into() + } +} +/* +impl Deref for z_owned_sample_t { + type Target = Option; + fn deref(&self) -> &Self::Target { + unsafe { core::mem::transmute(self) } + } +} +impl DerefMut for z_owned_sample_t { + fn deref_mut(&mut self) -> &mut Self::Target { + unsafe { core::mem::transmute(self) } + } +} +*/ +impl Drop for z_owned_sample_t { + fn drop(&mut self) { + let _: Option = self.take(); + } +} + +/// The gravestone value of `z_owned_sample_t`. +#[no_mangle] +pub extern "C" fn z_sample_null() -> z_owned_sample_t { + unsafe { core::mem::transmute(None::) } +} +/// Returns `false` if `this` is in a gravestone state, `true` otherwise. +/// +/// This function may not be called with the null pointer, but can be called with the gravestone value. +#[no_mangle] +pub extern "C" fn z_sample_check(this: &z_owned_sample_t) -> bool { + this.is_some() +} +/// Aliases the sample. +/// +/// This function may not be called with the null pointer, but can be called with the gravestone value. +#[no_mangle] +pub extern "C" fn z_sample_loan(this: &z_owned_sample_t) -> z_sample_t { + this.as_ref().into() +} +/// Destroys the sample, setting `this` to its gravestone value to prevent double-frees. +/// +/// This function may not be called with the null pointer, but can be called with the gravestone value. +#[no_mangle] +pub extern "C" fn z_sample_drop(this: &mut z_owned_sample_t) { + let _: Option = this.take(); +} +/// Clones the sample, allowing to keep it in an "open" state past the callback's return. +/// +/// This operation is infallible, but may return a gravestone value if `sample` itself was a gravestone value (which cannot be the case in a callback). +#[no_mangle] +pub extern "C" fn z_sample_clone(sample: Option<&z_sample_t>) -> z_owned_sample_t { + sample.and_then(|q| q.cloned()).into() +} + /// Clones the sample's payload by incrementing its backing refcount (this doesn't imply any copies). #[no_mangle] pub extern "C" fn zc_sample_payload_rcinc(sample: Option<&z_sample_t>) -> zc_owned_payload_t { diff --git a/tests/z_api_null_drop_test.c b/tests/z_api_null_drop_test.c index 96eb102ce..69bb1a9f4 100644 --- a/tests/z_api_null_drop_test.c +++ b/tests/z_api_null_drop_test.c @@ -39,8 +39,7 @@ int main(int argc, char **argv) { z_owned_closure_reply_t closure_reply_null_1 = z_closure_reply_null(); z_owned_closure_hello_t closure_hello_null_1 = z_closure_hello_null(); z_owned_closure_zid_t closure_zid_null_1 = z_closure_zid_null(); - z_owned_reply_channel_closure_t reply_channel_closure_null_1 = z_reply_channel_closure_null(); - z_owned_reply_channel_t reply_channel_null_1 = z_reply_channel_null(); + z_owned_reply_fifo_channel_t reply_channel_null_1 = z_reply_fifo_channel_null(); z_owned_str_t str_null_1 = z_str_null(); zc_owned_payload_t payload_null_1 = zc_payload_null(); zc_owned_shmbuf_t shmbuf_null_1 = zc_shmbuf_null(); @@ -82,8 +81,7 @@ int main(int argc, char **argv) { z_owned_closure_reply_t closure_reply_null_2; z_owned_closure_hello_t closure_hello_null_2; z_owned_closure_zid_t closure_zid_null_2; - z_owned_reply_channel_closure_t reply_channel_closure_null_2; - z_owned_reply_channel_t reply_channel_null_2; + z_owned_reply_fifo_channel_t reply_channel_null_2; z_owned_str_t str_null_2; zc_owned_payload_t payload_null_2; zc_owned_shmbuf_t shmbuf_null_2; @@ -104,7 +102,6 @@ int main(int argc, char **argv) { z_null(&closure_reply_null_2); z_null(&closure_hello_null_2); z_null(&closure_zid_null_2); - z_null(&reply_channel_closure_null_2); z_null(&reply_channel_null_2); z_null(&str_null_2); z_null(&payload_null_2); @@ -148,7 +145,6 @@ int main(int argc, char **argv) { z_drop(z_move(closure_reply_null_1)); z_drop(z_move(closure_hello_null_1)); z_drop(z_move(closure_zid_null_1)); - z_drop(z_move(reply_channel_closure_null_1)); z_drop(z_move(reply_channel_null_1)); z_drop(z_move(str_null_1)); z_drop(z_move(payload_null_1)); @@ -170,7 +166,6 @@ int main(int argc, char **argv) { z_drop(z_move(closure_reply_null_2)); z_drop(z_move(closure_hello_null_2)); z_drop(z_move(closure_zid_null_2)); - z_drop(z_move(reply_channel_closure_null_2)); z_drop(z_move(reply_channel_null_2)); z_drop(z_move(str_null_2)); z_drop(z_move(payload_null_2)); diff --git a/tests/z_int_queryable_attachment_test.c b/tests/z_int_queryable_attachment_test.c index ce1ac9f42..b2171e900 100644 --- a/tests/z_int_queryable_attachment_test.c +++ b/tests/z_int_queryable_attachment_test.c @@ -102,7 +102,7 @@ int run_get() { for (int val_num = 0; val_num < values_count; ++val_num) { z_bytes_map_insert_by_copy(&map, z_bytes_from_str(K_VAR), z_bytes_from_str(values[val_num])); - z_owned_reply_channel_t channel = zc_reply_fifo_new(16); + z_owned_reply_fifo_channel_t channel = z_reply_fifo_channel_new(16); z_get(z_loan(s), z_keyexpr(keyexpr), "", z_move(channel.send), &opts); z_owned_reply_t reply = z_reply_null(); for (z_call(channel.recv, &reply); z_check(reply); z_call(channel.recv, &reply)) { diff --git a/tests/z_int_queryable_test.c b/tests/z_int_queryable_test.c index 2999451c4..8a5b7f961 100644 --- a/tests/z_int_queryable_test.c +++ b/tests/z_int_queryable_test.c @@ -77,7 +77,7 @@ int run_get() { } for (int val_num = 0; val_num < values_count; ++val_num) { - z_owned_reply_channel_t channel = zc_reply_fifo_new(16); + z_owned_reply_fifo_channel_t channel = z_reply_fifo_channel_new(16); z_get_options_t opts = z_get_options_default(); z_get(z_loan(s), z_keyexpr(keyexpr), "", z_move(channel.send), &opts); z_owned_reply_t reply = z_reply_null();