From 871b5e83335ecd61702d6f0dd62c08b5d65bcced Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Thu, 11 Apr 2024 21:36:45 +0200 Subject: [PATCH] Rework reply fifo channel --- Cargo.toml | 1 + Cargo.toml.in | 1 + examples/z_get.c | 4 +- examples/z_get_liveliness.c | 2 +- examples/z_non_blocking_get.c | 29 +++-- include/zenoh_commons.h | 88 ++++--------- include/zenoh_macros.h | 12 +- src/closures/mod.rs | 4 +- src/closures/reply_channel.rs | 64 ++++++++++ src/closures/response_channel.rs | 213 ------------------------------- 10 files changed, 110 insertions(+), 308 deletions(-) create mode 100644 src/closures/reply_channel.rs delete mode 100644 src/closures/response_channel.rs diff --git a/Cargo.toml b/Cargo.toml index 2735c3f86..288cf6823 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,6 +50,7 @@ libc = "0.2.139" log = "0.4.17" rand = "0.8.5" spin = "0.9.5" +crossbeam-channel = "0.5" # shared-memory enabled for zenoh even if zenoh-c "shared-memory" feature is disabled. This is to make "std::mem::transmute" work for `ZSLice` zenoh = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = ["shared-memory", "unstable"], default-features = false } zenoh-protocol = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = ["shared-memory"] } diff --git a/Cargo.toml.in b/Cargo.toml.in index f434df3a1..818752a39 100644 --- a/Cargo.toml.in +++ b/Cargo.toml.in @@ -50,6 +50,7 @@ libc = "0.2.139" log = "0.4.17" rand = "0.8.5" spin = "0.9.5" +crossbeam-channel = "0.5" # shared-memory enabled for zenoh even if zenoh-c "shared-memory" feature is disabled. This is to make "std::mem::transmute" work for `ZSLice` zenoh = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = ["shared-memory", "unstable"], default-features = false } zenoh-protocol = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = ["shared-memory"] } diff --git a/examples/z_get.c b/examples/z_get.c index 272bef89a..a47d23def 100644 --- a/examples/z_get.c +++ b/examples/z_get.c @@ -54,7 +54,7 @@ int main(int argc, char **argv) { } printf("Sending Query '%s'...\n", expr); - z_owned_reply_channel_t channel = zc_reply_fifo_new(16); + z_owned_reply_fifo_channel_t channel = z_reply_fifo_channel_new(16); z_get_options_t opts = z_get_options_default(); if (value != NULL) { opts.value.payload = z_bytes_from_str(value); @@ -76,4 +76,4 @@ int main(int argc, char **argv) { z_drop(z_move(channel)); z_close(z_move(s)); return 0; -} \ No newline at end of file +} diff --git a/examples/z_get_liveliness.c b/examples/z_get_liveliness.c index c667cb03a..ba8893ab1 100644 --- a/examples/z_get_liveliness.c +++ b/examples/z_get_liveliness.c @@ -47,7 +47,7 @@ int main(int argc, char **argv) { } printf("Sending liveliness query '%s'...\n", expr); - z_owned_reply_channel_t channel = zc_reply_fifo_new(16); + z_owned_reply_fifo_channel_t channel = z_reply_fifo_channel_new(16); zc_liveliness_get(z_loan(s), keyexpr, z_move(channel.send), NULL); z_owned_reply_t reply = z_reply_null(); for (z_call(channel.recv, &reply); z_check(reply); z_call(channel.recv, &reply)) { diff --git a/examples/z_non_blocking_get.c b/examples/z_non_blocking_get.c index c0b02a274..43df578d5 100644 --- a/examples/z_non_blocking_get.c +++ b/examples/z_non_blocking_get.c @@ -47,26 +47,27 @@ int main(int argc, char **argv) { printf("Sending Query '%s'...\n", expr); z_get_options_t opts = z_get_options_default(); opts.target = Z_QUERY_TARGET_ALL; - z_owned_reply_channel_t channel = zc_reply_non_blocking_fifo_new(16); + z_owned_reply_fifo_channel_t channel = z_reply_fifo_channel_new(16); z_get(z_loan(s), keyexpr, "", z_move(channel.send), &opts); // here, the send is moved and will be dropped by zenoh when adequate z_owned_reply_t reply = z_reply_null(); - for (bool call_success = z_call(channel.recv, &reply); !call_success || z_check(reply); - call_success = z_call(channel.recv, &reply)) { - if (!call_success) { - continue; - } - if (z_reply_is_ok(&reply)) { - z_sample_t sample = z_reply_ok(&reply); - z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); - printf(">> Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, sample.payload.start); - z_drop(z_move(keystr)); - } else { - printf("Received an error\n"); + while (true) { + for (z_call(channel.try_recv, &reply); z_check(reply); z_call(channel.try_recv, &reply)) { + if (z_reply_is_ok(&reply)) { + z_sample_t sample = z_reply_ok(&reply); + z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); + printf(">> Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, sample.payload.start); + z_drop(z_move(keystr)); + } else { + printf(">> Received an error\n"); + } } + printf(">> Nothing to get... sleep for 5 s\n"); + z_sleep_s(5); } + z_drop(z_move(reply)); z_drop(z_move(channel)); z_close(z_move(s)); return 0; -} \ No newline at end of file +} diff --git a/include/zenoh_commons.h b/include/zenoh_commons.h index 067170fd4..68eb424b6 100644 --- a/include/zenoh_commons.h +++ b/include/zenoh_commons.h @@ -778,31 +778,14 @@ typedef struct z_query_reply_options_t { struct z_encoding_t encoding; struct z_attachment_t attachment; } z_query_reply_options_t; -/** - * A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks: - * - `this` is a pointer to an arbitrary state. - * - `call` is the typical callback function. `this` will be passed as its last argument. - * - `drop` allows the callback's state to be freed. - * - * Closures are not guaranteed not to be called concurrently. - * - * We guarantee that: - * - `call` will never be called once `drop` has started. - * - `drop` will only be called ONCE, and AFTER EVERY `call` has ended. - * - The two previous guarantees imply that `call` and `drop` are never called concurrently. - */ -typedef struct z_owned_reply_channel_closure_t { - void *context; - bool (*call)(struct z_owned_reply_t*, void*); - void (*drop)(void*); -} z_owned_reply_channel_closure_t; /** * A pair of closures, the `send` one accepting */ -typedef struct z_owned_reply_channel_t { +typedef struct z_owned_reply_fifo_channel_t { struct z_owned_closure_reply_t send; - struct z_owned_reply_channel_closure_t recv; -} z_owned_reply_channel_t; + struct z_owned_closure_reply_t recv; + struct z_owned_closure_reply_t try_recv; +} z_owned_reply_fifo_channel_t; typedef struct z_owned_scouting_config_t { struct z_owned_config_t _config; unsigned long zc_timeout_ms; @@ -1913,25 +1896,6 @@ ZENOHC_API uint16_t z_random_u16(void); ZENOHC_API uint32_t z_random_u32(void); ZENOHC_API uint64_t z_random_u64(void); ZENOHC_API uint8_t z_random_u8(void); -/** - * Calls the closure. Calling an uninitialized closure is a no-op. - */ -ZENOHC_API -bool z_reply_channel_closure_call(const struct z_owned_reply_channel_closure_t *closure, - struct z_owned_reply_t *sample); -/** - * Drops the closure. Droping an uninitialized closure is a no-op. - */ -ZENOHC_API void z_reply_channel_closure_drop(struct z_owned_reply_channel_closure_t *closure); -/** - * Constructs a null safe-to-drop value of 'z_owned_reply_channel_closure_t' type - */ -ZENOHC_API struct z_owned_reply_channel_closure_t z_reply_channel_closure_null(void); -ZENOHC_API void z_reply_channel_drop(struct z_owned_reply_channel_t *channel); -/** - * Constructs a null safe-to-drop value of 'z_owned_reply_channel_t' type - */ -ZENOHC_API struct z_owned_reply_channel_t z_reply_channel_null(void); /** * Returns ``true`` if `reply_data` is valid. */ @@ -1947,6 +1911,24 @@ ZENOHC_API void z_reply_drop(struct z_owned_reply_t *reply_data); */ ZENOHC_API struct z_value_t z_reply_err(const struct z_owned_reply_t *reply); +ZENOHC_API void z_reply_fifo_channel_drop(struct z_owned_reply_fifo_channel_t *channel); +/** + * Creates a new blocking fifo channel, returned as a pair of closures. + * + * If `bound` is different from 0, that channel will be bound and apply back-pressure when full. + * + * The `send` end should be passed as callback to a `z_get` call. + * + * The `recv` end is a synchronous closure that will block until either a `z_owned_reply_t` is available, + * which it will then return; or until the `send` closure is dropped and all replies have been consumed, + * at which point it will return an invalidated `z_owned_reply_t`, and so will further calls. + */ +ZENOHC_API +struct z_owned_reply_fifo_channel_t z_reply_fifo_channel_new(size_t bound); +/** + * Constructs a null safe-to-drop value of 'z_owned_reply_fifo_channel_t' type + */ +ZENOHC_API struct z_owned_reply_fifo_channel_t z_reply_fifo_channel_null(void); /** * Returns ``true`` if the queryable answered with an OK, which allows this value to be treated as a sample. * @@ -2363,32 +2345,6 @@ struct z_owned_query_channel_t zc_query_fifo_new(size_t bound); */ ZENOHC_API struct z_owned_query_channel_t zc_query_non_blocking_fifo_new(size_t bound); -/** - * Creates a new blocking fifo channel, returned as a pair of closures. - * - * If `bound` is different from 0, that channel will be bound and apply back-pressure when full. - * - * The `send` end should be passed as callback to a `z_get` call. - * - * The `recv` end is a synchronous closure that will block until either a `z_owned_reply_t` is available, - * which it will then return; or until the `send` closure is dropped and all replies have been consumed, - * at which point it will return an invalidated `z_owned_reply_t`, and so will further calls. - */ -ZENOHC_API -struct z_owned_reply_channel_t zc_reply_fifo_new(size_t bound); -/** - * Creates a new non-blocking fifo channel, returned as a pair of closures. - * - * If `bound` is different from 0, that channel will be bound and apply back-pressure when full. - * - * The `send` end should be passed as callback to a `z_get` call. - * - * The `recv` end is a synchronous closure that will block until either a `z_owned_reply_t` is available, - * which it will then return; or until the `send` closure is dropped and all replies have been consumed, - * at which point it will return an invalidated `z_owned_reply_t`, and so will further calls. - */ -ZENOHC_API -struct z_owned_reply_channel_t zc_reply_non_blocking_fifo_new(size_t bound); /** * Clones the sample's payload by incrementing its backing refcount (this doesn't imply any copies). */ diff --git a/include/zenoh_macros.h b/include/zenoh_macros.h index 57a691d7a..3262d35f6 100644 --- a/include/zenoh_macros.h +++ b/include/zenoh_macros.h @@ -35,9 +35,8 @@ z_owned_closure_hello_t * : z_closure_hello_drop, \ z_owned_closure_zid_t * : z_closure_zid_drop, \ zcu_owned_closure_matching_status_t * : zcu_closure_matching_status_drop, \ - z_owned_reply_channel_closure_t * : z_reply_channel_closure_drop, \ z_owned_query_channel_closure_t * : z_query_channel_closure_drop, \ - z_owned_reply_channel_t * : z_reply_channel_drop, \ + z_owned_reply_fifo_channel_t * : z_reply_fifo_channel_drop, \ z_owned_query_channel_t * : z_query_channel_drop, \ z_owned_bytes_map_t * : z_bytes_map_drop, \ zc_owned_payload_t * : zc_payload_drop, \ @@ -67,8 +66,7 @@ z_owned_closure_hello_t * : z_closure_hello_null, \ z_owned_closure_zid_t * : z_closure_zid_null, \ zcu_owned_closure_matching_status_t * : zcu_closure_matching_status_null, \ - z_owned_reply_channel_closure_t * : z_reply_channel_closure_null, \ - z_owned_reply_channel_t * : z_reply_channel_null, \ + z_owned_reply_fifo_channel_t * : z_reply_fifo_channel_null, \ z_owned_bytes_map_t * : z_bytes_map_null, \ z_attachment_t * : z_attachment_null, \ zc_owned_payload_t * : zc_payload_null, \ @@ -111,7 +109,6 @@ z_owned_closure_hello_t : z_closure_hello_call, \ z_owned_closure_zid_t : z_closure_zid_call, \ zcu_owned_closure_matching_status_t : zcu_closure_matching_status_call, \ - z_owned_reply_channel_closure_t : z_reply_channel_closure_call, \ z_owned_query_channel_closure_t : z_query_channel_closure_call \ ) (&x, __VA_ARGS__) // clang-format on @@ -173,7 +170,6 @@ template<> struct zenoh_drop_type { typedef void type; template<> struct zenoh_drop_type { typedef void type; }; template<> struct zenoh_drop_type { typedef void type; }; template<> struct zenoh_drop_type { typedef void type; }; -template<> struct zenoh_drop_type { typedef void type; }; template<> struct zenoh_drop_type { typedef void type; }; template<> struct zenoh_drop_type { typedef void type; }; template<> struct zenoh_drop_type { typedef void type; }; @@ -201,7 +197,6 @@ template<> inline void z_drop(z_owned_closure_reply_t* v) { z_closure_reply_drop template<> inline void z_drop(z_owned_closure_hello_t* v) { z_closure_hello_drop(v); } template<> inline void z_drop(z_owned_closure_zid_t* v) { z_closure_zid_drop(v); } template<> inline void z_drop(zcu_owned_closure_matching_status_t* v) { zcu_closure_matching_status_drop(v); } -template<> inline void z_drop(z_owned_reply_channel_closure_t* v) { z_reply_channel_closure_drop(v); } template<> inline void z_drop(z_owned_reply_channel_t* v) { z_reply_channel_drop(v); } template<> inline void z_drop(z_owned_bytes_map_t* v) { z_bytes_map_drop(v); } template<> inline void z_drop(zc_owned_liveliness_token_t* v) { zc_liveliness_undeclare_token(v); } @@ -229,7 +224,6 @@ inline void z_null(z_owned_closure_reply_t& v) { v = z_closure_reply_null(); } inline void z_null(z_owned_closure_hello_t& v) { v = z_closure_hello_null(); } inline void z_null(z_owned_closure_zid_t& v) { v = z_closure_zid_null(); } inline void z_null(zcu_owned_closure_matching_status_t& v) { v = zcu_closure_matching_status_null(); } -inline void z_null(z_owned_reply_channel_closure_t& v) { v = z_reply_channel_closure_null(); } inline void z_null(z_owned_reply_channel_t& v) { v = z_reply_channel_null(); } inline void z_null(z_owned_bytes_map_t& v) { v = z_bytes_map_null(); } inline void z_null(zc_owned_liveliness_token_t& v) { v = zc_liveliness_token_null(); } @@ -271,8 +265,6 @@ inline void z_call(const struct z_owned_closure_zid_t &closure, const struct z_i { z_closure_zid_call(&closure, zid); } inline void z_call(const struct zcu_owned_closure_matching_status_t &closure, const struct zcu_matching_status_t *matching_status) { zcu_closure_matching_status_call(&closure, matching_status); } -inline bool z_call(const struct z_owned_reply_channel_closure_t &closure, struct z_owned_reply_t *sample) - { return z_reply_channel_closure_call(&closure, sample); } // clang-format on #define _z_closure_overloader(callback, droper, ctx, ...) \ diff --git a/src/closures/mod.rs b/src/closures/mod.rs index 74b2748aa..93d959fba 100644 --- a/src/closures/mod.rs +++ b/src/closures/mod.rs @@ -23,8 +23,8 @@ mod reply_closure; pub use zenohid_closure::*; mod zenohid_closure; -pub use response_channel::*; -mod response_channel; +pub use reply_channel::*; +mod reply_channel; pub use query_channel::*; mod query_channel; diff --git a/src/closures/reply_channel.rs b/src/closures/reply_channel.rs new file mode 100644 index 000000000..7e656d57c --- /dev/null +++ b/src/closures/reply_channel.rs @@ -0,0 +1,64 @@ +use crate::{z_closure_reply_drop, z_owned_closure_reply_t, z_owned_reply_t}; + +/// A pair of closures, the `send` one accepting +#[repr(C)] +pub struct z_owned_reply_fifo_channel_t { + pub send: z_owned_closure_reply_t, + pub recv: z_owned_closure_reply_t, + pub try_recv: z_owned_closure_reply_t, +} +#[no_mangle] +pub extern "C" fn z_reply_fifo_channel_drop(channel: &mut z_owned_reply_fifo_channel_t) { + z_closure_reply_drop(&mut channel.send); + z_closure_reply_drop(&mut channel.recv); + z_closure_reply_drop(&mut channel.try_recv); +} +/// Constructs a null safe-to-drop value of 'z_owned_reply_fifo_channel_t' type +#[no_mangle] +pub extern "C" fn z_reply_fifo_channel_null() -> z_owned_reply_fifo_channel_t { + z_owned_reply_fifo_channel_t { + send: z_owned_closure_reply_t::empty(), + recv: z_owned_closure_reply_t::empty(), + try_recv: z_owned_closure_reply_t::empty(), + } +} + +/// Creates a new blocking fifo channel, returned as a pair of closures. +/// +/// If `bound` is different from 0, that channel will be bound and apply back-pressure when full. +/// +/// The `send` end should be passed as callback to a `z_get` call. +/// +/// The `recv` end is a synchronous closure that will block until either a `z_owned_reply_t` is available, +/// which it will then return; or until the `send` closure is dropped and all replies have been consumed, +/// at which point it will return an invalidated `z_owned_reply_t`, and so will further calls. +#[no_mangle] +pub extern "C" fn z_reply_fifo_channel_new(bound: usize) -> z_owned_reply_fifo_channel_t { + let (tx, rx) = if bound == 0 { + crossbeam_channel::unbounded() + } else { + crossbeam_channel::bounded(bound) + }; + let rx_clone = rx.clone(); + z_owned_reply_fifo_channel_t { + send: From::from(move |reply: &mut z_owned_reply_t| { + if let Some(reply) = reply.take() { + if let Err(e) = tx.send(reply) { + log::error!("Attempted to push onto a closed reply_fifo: {}", e) + } + } + }), + recv: From::from(move |receptacle: &mut z_owned_reply_t| { + *receptacle = match rx.recv() { + Ok(val) => val.into(), + Err(_) => None.into(), + }; + }), + try_recv: From::from(move |receptacle: &mut z_owned_reply_t| { + *receptacle = match rx_clone.try_recv() { + Ok(val) => val.into(), + Err(_) => None.into(), + }; + }), + } +} diff --git a/src/closures/response_channel.rs b/src/closures/response_channel.rs deleted file mode 100644 index 0ea046d5d..000000000 --- a/src/closures/response_channel.rs +++ /dev/null @@ -1,213 +0,0 @@ -use crate::{z_closure_reply_drop, z_owned_closure_reply_t, z_owned_reply_t}; -use libc::c_void; -use std::sync::mpsc::TryRecvError; -/// A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks: -/// - `this` is a pointer to an arbitrary state. -/// - `call` is the typical callback function. `this` will be passed as its last argument. -/// - `drop` allows the callback's state to be freed. -/// -/// Closures are not guaranteed not to be called concurrently. -/// -/// We guarantee that: -/// - `call` will never be called once `drop` has started. -/// - `drop` will only be called ONCE, and AFTER EVERY `call` has ended. -/// - The two previous guarantees imply that `call` and `drop` are never called concurrently. -#[repr(C)] -pub struct z_owned_reply_channel_closure_t { - context: *mut c_void, - call: Option bool>, - drop: Option, -} - -/// A pair of closures, the `send` one accepting -#[repr(C)] -pub struct z_owned_reply_channel_t { - pub send: z_owned_closure_reply_t, - pub recv: z_owned_reply_channel_closure_t, -} -#[no_mangle] -pub extern "C" fn z_reply_channel_drop(channel: &mut z_owned_reply_channel_t) { - z_closure_reply_drop(&mut channel.send); - z_reply_channel_closure_drop(&mut channel.recv); -} -/// Constructs a null safe-to-drop value of 'z_owned_reply_channel_t' type -#[no_mangle] -pub extern "C" fn z_reply_channel_null() -> z_owned_reply_channel_t { - z_owned_reply_channel_t { - send: z_owned_closure_reply_t::empty(), - recv: z_owned_reply_channel_closure_t::empty(), - } -} - -/// Creates a new blocking fifo channel, returned as a pair of closures. -/// -/// If `bound` is different from 0, that channel will be bound and apply back-pressure when full. -/// -/// The `send` end should be passed as callback to a `z_get` call. -/// -/// The `recv` end is a synchronous closure that will block until either a `z_owned_reply_t` is available, -/// which it will then return; or until the `send` closure is dropped and all replies have been consumed, -/// at which point it will return an invalidated `z_owned_reply_t`, and so will further calls. -#[no_mangle] -pub extern "C" fn zc_reply_fifo_new(bound: usize) -> z_owned_reply_channel_t { - let (send, rx) = if bound == 0 { - let (tx, rx) = std::sync::mpsc::channel(); - ( - From::from(move |reply: &mut z_owned_reply_t| { - if let Some(reply) = reply.take() { - if let Err(e) = tx.send(reply) { - log::error!("Attempted to push onto a closed reply_fifo: {}", e) - } - } - }), - rx, - ) - } else { - let (tx, rx) = std::sync::mpsc::sync_channel(bound); - ( - From::from(move |reply: &mut z_owned_reply_t| { - if let Some(reply) = reply.take() { - if let Err(e) = tx.send(reply) { - log::error!("Attempted to push onto a closed reply_fifo: {}", e) - } - } - }), - rx, - ) - }; - z_owned_reply_channel_t { - send, - recv: From::from(move |receptacle: &mut z_owned_reply_t| { - *receptacle = match rx.recv() { - Ok(val) => val.into(), - Err(_) => None.into(), - }; - true - }), - } -} - -/// Creates a new non-blocking fifo channel, returned as a pair of closures. -/// -/// If `bound` is different from 0, that channel will be bound and apply back-pressure when full. -/// -/// The `send` end should be passed as callback to a `z_get` call. -/// -/// The `recv` end is a synchronous closure that will block until either a `z_owned_reply_t` is available, -/// which it will then return; or until the `send` closure is dropped and all replies have been consumed, -/// at which point it will return an invalidated `z_owned_reply_t`, and so will further calls. -#[no_mangle] -pub extern "C" fn zc_reply_non_blocking_fifo_new(bound: usize) -> z_owned_reply_channel_t { - let (send, rx) = if bound == 0 { - let (tx, rx) = std::sync::mpsc::channel(); - ( - From::from(move |reply: &mut z_owned_reply_t| { - if let Some(reply) = reply.take() { - if let Err(e) = tx.send(reply) { - log::error!("Attempted to push onto a closed reply_fifo: {}", e) - } - } - }), - rx, - ) - } else { - let (tx, rx) = std::sync::mpsc::sync_channel(bound); - ( - From::from(move |reply: &mut z_owned_reply_t| { - if let Some(reply) = reply.take() { - if let Err(e) = tx.send(reply) { - log::error!("Attempted to push onto a closed reply_fifo: {}", e) - } - } - }), - rx, - ) - }; - - z_owned_reply_channel_t { - send, - recv: From::from( - move |receptacle: &mut z_owned_reply_t| match rx.try_recv() { - Ok(val) => { - let mut tmp = z_owned_reply_t::from(val); - std::mem::swap(&mut tmp, receptacle); - true - } - Err(TryRecvError::Disconnected) => { - receptacle.take(); - true - } - Err(TryRecvError::Empty) => { - receptacle.take(); - false - } - }, - ), - } -} - -impl z_owned_reply_channel_closure_t { - pub fn empty() -> Self { - z_owned_reply_channel_closure_t { - context: std::ptr::null_mut(), - call: None, - drop: None, - } - } -} -unsafe impl Send for z_owned_reply_channel_closure_t {} -unsafe impl Sync for z_owned_reply_channel_closure_t {} -impl Drop for z_owned_reply_channel_closure_t { - fn drop(&mut self) { - if let Some(drop) = self.drop { - drop(self.context) - } - } -} - -/// Constructs a null safe-to-drop value of 'z_owned_reply_channel_closure_t' type -#[no_mangle] -pub extern "C" fn z_reply_channel_closure_null() -> z_owned_reply_channel_closure_t { - z_owned_reply_channel_closure_t::empty() -} - -/// Calls the closure. Calling an uninitialized closure is a no-op. -#[no_mangle] -pub extern "C" fn z_reply_channel_closure_call( - closure: &z_owned_reply_channel_closure_t, - sample: &mut z_owned_reply_t, -) -> bool { - match closure.call { - Some(call) => call(sample, closure.context), - None => { - log::error!("Attempted to call an uninitialized closure!"); - true - } - } -} -/// Drops the closure. Droping an uninitialized closure is a no-op. -#[no_mangle] -pub extern "C" fn z_reply_channel_closure_drop(closure: &mut z_owned_reply_channel_closure_t) { - let mut empty_closure = z_owned_reply_channel_closure_t::empty(); - std::mem::swap(&mut empty_closure, closure); -} -impl bool> From for z_owned_reply_channel_closure_t { - fn from(f: F) -> Self { - let this = Box::into_raw(Box::new(f)) as _; - extern "C" fn call bool>( - response: &mut z_owned_reply_t, - this: *mut c_void, - ) -> bool { - let this = unsafe { &*(this as *const F) }; - this(response) - } - extern "C" fn drop(this: *mut c_void) { - std::mem::drop(unsafe { Box::from_raw(this as *mut F) }) - } - z_owned_reply_channel_closure_t { - context: this, - call: Some(call::), - drop: Some(drop::), - } - } -}