diff --git a/examples/z_pull.c b/examples/z_pull.c index 6115b50ac..b17d069a9 100644 --- a/examples/z_pull.c +++ b/examples/z_pull.c @@ -64,30 +64,30 @@ int main(int argc, char **argv) { } printf("Pull functionality not implemented!\n"); - // @TODO: implement z_owned_sample_channel_t and z_sample_channel_ring_new - // printf("Declaring Subscriber on '%s'...\n", keyexpr); - // z_owned_sample_channel_t channel = z_sample_channel_ring_new(size); - // z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(keyexpr), z_move(channel.send), NULL); - // if (!z_check(sub)) { - // printf("Unable to declare subscriber.\n"); - // return -1; - // } + printf("Declaring Subscriber on '%s'...\n", keyexpr); + z_owned_sample_ring_channel_t channel = z_sample_channel_ring_new(size); + z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(keyexpr), z_move(channel.send), NULL); + if (!z_check(sub)) { + printf("Unable to declare subscriber.\n"); + return -1; + } - // printf("Pulling data every %zu ms... Ring size: %zd\n", interval, size); - // z_owned_sample_t sample = z_sample_null(); - // while (true) { - // for (z_call(channel.recv, &sample); z_check(sample); z_call(channel.recv, &sample)) { - // z_owned_str_t keystr = z_keyexpr_to_string(z_loan(sample.keyexpr)); - // printf(">> [Subscriber] Pulled ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, - // sample.payload.start); - // z_drop(z_move(keystr)); - // z_drop(z_move(sample)); - // } - // printf(">> [Subscriber] Nothing to pull... sleep for %zu ms\n", interval); - // z_sleep_ms(interval); - // } + printf("Pulling data every %zu ms... Ring size: %zd\n", interval, size); + z_owned_sample_t sample = z_sample_null(); + while (true) { + for (z_call(channel.recv, &sample); z_check(sample); z_call(channel.recv, &sample)) { + z_owned_str_t keystr = z_keyexpr_to_string(z_loan(sample.keyexpr)); + printf(">> [Subscriber] Pulled ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, + sample.payload.start); + z_drop(z_move(keystr)); + z_drop(z_move(sample)); + } + printf(">> [Subscriber] Nothing to pull... sleep for %zu ms\n", interval); + z_sleep_ms(interval); + } - // z_undeclare_subscriber(z_move(sub)); + z_undeclare_subscriber(z_move(sub)); + z_drop(z_move(channel)); z_close(z_move(s)); diff --git a/include/zenoh_macros.h b/include/zenoh_macros.h index 0f0dbd176..617e99cb9 100644 --- a/include/zenoh_macros.h +++ b/include/zenoh_macros.h @@ -35,10 +35,12 @@ z_owned_closure_hello_t * : z_closure_hello_drop, \ z_owned_closure_zid_t * : z_closure_zid_drop, \ zcu_owned_closure_matching_status_t * : zcu_closure_matching_status_drop, \ - z_owned_reply_fifo_channel_t * : z_reply_fifo_channel_drop, \ - z_owned_reply_ring_channel_t * : z_reply_ring_channel_drop, \ + z_owned_sample_fifo_channel_t * : z_sample_fifo_channel_drop, \ + z_owned_sample_ring_channel_t * : z_sample_ring_channel_drop, \ z_owned_query_fifo_channel_t * : z_query_fifo_channel_drop, \ z_owned_query_ring_channel_t * : z_query_ring_channel_drop, \ + z_owned_reply_fifo_channel_t * : z_reply_fifo_channel_drop, \ + z_owned_reply_ring_channel_t * : z_reply_ring_channel_drop, \ z_owned_bytes_map_t * : z_bytes_map_drop, \ zc_owned_payload_t * : zc_payload_drop, \ zc_owned_shmbuf_t * : zc_shmbuf_drop, \ @@ -67,10 +69,12 @@ z_owned_closure_hello_t * : z_closure_hello_null, \ z_owned_closure_zid_t * : z_closure_zid_null, \ zcu_owned_closure_matching_status_t * : zcu_closure_matching_status_null, \ - z_owned_reply_fifo_channel_t * : z_reply_fifo_channel_null, \ - z_owned_reply_ring_channel_t * : z_reply_ring_channel_null, \ + z_owned_sample_fifo_channel_t * : z_sample_fifo_channel_null, \ + z_owned_sample_ring_channel_t * : z_sample_ring_channel_null, \ z_owned_query_fifo_channel_t * : z_query_fifo_channel_null, \ z_owned_query_ring_channel_t * : z_query_ring_channel_null, \ + z_owned_reply_fifo_channel_t * : z_reply_fifo_channel_null, \ + z_owned_reply_ring_channel_t * : z_reply_ring_channel_null, \ z_owned_bytes_map_t * : z_bytes_map_null, \ z_attachment_t * : z_attachment_null, \ zc_owned_payload_t * : zc_payload_null, \ @@ -173,7 +177,12 @@ template<> struct zenoh_drop_type { typedef void type; template<> struct zenoh_drop_type { typedef void type; }; template<> struct zenoh_drop_type { typedef void type; }; template<> struct zenoh_drop_type { typedef void type; }; -template<> struct zenoh_drop_type { typedef void type; }; +template<> struct zenoh_drop_type { typedef void type; }; +template<> struct zenoh_drop_type { typedef void type; }; +template<> struct zenoh_drop_type { typedef void type; }; +template<> struct zenoh_drop_type { typedef void type; }; +template<> struct zenoh_drop_type { typedef void type; }; +template<> struct zenoh_drop_type { typedef void type; }; template<> struct zenoh_drop_type { typedef void type; }; template<> struct zenoh_drop_type { typedef void type; }; template<> struct zenoh_drop_type { typedef int8_t type; }; @@ -200,7 +209,12 @@ template<> inline void z_drop(z_owned_closure_reply_t* v) { z_closure_reply_drop template<> inline void z_drop(z_owned_closure_hello_t* v) { z_closure_hello_drop(v); } template<> inline void z_drop(z_owned_closure_zid_t* v) { z_closure_zid_drop(v); } template<> inline void z_drop(zcu_owned_closure_matching_status_t* v) { zcu_closure_matching_status_drop(v); } -template<> inline void z_drop(z_owned_reply_channel_t* v) { z_reply_channel_drop(v); } +template<> inline void z_drop(z_owned_sample_fifo_channel_t* v) { z_sample_fifo_channel_drop(v); } +template<> inline void z_drop(z_owned_sample_ring_channel_t* v) { z_sample_ring_channel_drop(v); } +template<> inline void z_drop(z_owned_query_fifo_channel_t* v) { z_query_fifo_channel_drop(v); } +template<> inline void z_drop(z_owned_query_ring_channel_t* v) { z_query_ring_channel_drop(v); } +template<> inline void z_drop(z_owned_reply_fifo_channel_t* v) { z_reply_fifo_channel_drop(v); } +template<> inline void z_drop(z_owned_reply_ring_channel_t* v) { z_reply_ring_channel_drop(v); } template<> inline void z_drop(z_owned_bytes_map_t* v) { z_bytes_map_drop(v); } template<> inline void z_drop(zc_owned_liveliness_token_t* v) { zc_liveliness_undeclare_token(v); } template<> inline int8_t z_drop(ze_owned_publication_cache_t* v) { return ze_undeclare_publication_cache(v); } @@ -227,7 +241,12 @@ inline void z_null(z_owned_closure_reply_t& v) { v = z_closure_reply_null(); } inline void z_null(z_owned_closure_hello_t& v) { v = z_closure_hello_null(); } inline void z_null(z_owned_closure_zid_t& v) { v = z_closure_zid_null(); } inline void z_null(zcu_owned_closure_matching_status_t& v) { v = zcu_closure_matching_status_null(); } -inline void z_null(z_owned_reply_channel_t& v) { v = z_reply_channel_null(); } +inline void z_null(z_owned_sample_fifo_channel_t& v) { v = z_sample_fifo_channel_null(); } +inline void z_null(z_owned_sample_ring_channel_t& v) { v = z_sample_ring_channel_null(); } +inline void z_null(z_owned_query_fifo_channel_t& v) { v = z_query_fifo_channel_null(); } +inline void z_null(z_owned_query_ring_channel_t& v) { v = z_query_ring_channel_null(); } +inline void z_null(z_owned_reply_fifo_channel_t& v) { v = z_reply_fifo_channel_null(); } +inline void z_null(z_owned_reply_ring_channel_t& v) { v = z_reply_ring_channel_null(); } inline void z_null(z_owned_bytes_map_t& v) { v = z_bytes_map_null(); } inline void z_null(zc_owned_liveliness_token_t& v) { v = zc_liveliness_token_null(); } inline void z_null(ze_owned_publication_cache_t& v) { v = ze_publication_cache_null(); } diff --git a/src/closures/mod.rs b/src/closures/mod.rs index 93d959fba..6e4524d6c 100644 --- a/src/closures/mod.rs +++ b/src/closures/mod.rs @@ -23,12 +23,15 @@ mod reply_closure; pub use zenohid_closure::*; mod zenohid_closure; -pub use reply_channel::*; -mod reply_channel; +pub use sample_channel::*; +mod sample_channel; pub use query_channel::*; mod query_channel; +pub use reply_channel::*; +mod reply_channel; + pub use hello_closure::*; mod hello_closure; diff --git a/src/closures/sample_channel.rs b/src/closures/sample_channel.rs new file mode 100644 index 000000000..d63cddf30 --- /dev/null +++ b/src/closures/sample_channel.rs @@ -0,0 +1,132 @@ +use crate::{ + z_closure_owned_sample_drop, z_closure_sample_drop, z_owned_closure_owned_sample_t, + z_owned_closure_sample_t, z_owned_sample_t, z_sample_clone, z_sample_t, +}; + +#[repr(C)] +pub struct z_owned_sample_fifo_channel_t { + pub send: z_owned_closure_sample_t, + pub recv: z_owned_closure_owned_sample_t, + pub try_recv: z_owned_closure_owned_sample_t, +} +#[no_mangle] +pub extern "C" fn z_sample_fifo_channel_drop(channel: &mut z_owned_sample_fifo_channel_t) { + z_closure_sample_drop(&mut channel.send); + z_closure_owned_sample_drop(&mut channel.recv); + z_closure_owned_sample_drop(&mut channel.try_recv); +} +/// Constructs a null safe-to-drop value of 'z_owned_sample_fifo_channel_t' type +#[no_mangle] +pub extern "C" fn z_sample_fifo_channel_null() -> z_owned_sample_fifo_channel_t { + z_owned_sample_fifo_channel_t { + send: z_owned_closure_sample_t::empty(), + recv: z_owned_closure_owned_sample_t::empty(), + try_recv: z_owned_closure_owned_sample_t::empty(), + } +} + +/// Creates a new blocking fifo channel, returned as a pair of closures. +/// +/// If `bound` is different from 0, that channel will be bound and apply back-pressure when full. +/// +/// The `send` end should be passed as callback to a `z_get` call. +/// +/// The `recv` end is a synchronous closure that will block until either a `z_owned_sample_t` is available, +/// which it will then return; or until the `send` closure is dropped and all replies have been consumed, +/// at which point it will return an invalidated `z_owned_sample_t`, and so will further calls. +#[no_mangle] +pub extern "C" fn z_sample_fifo_channel_new(bound: usize) -> z_owned_sample_fifo_channel_t { + // TODO(sashacmc): switch to handlers::FifoChannel + let (tx, rx) = if bound == 0 { + crossbeam_channel::unbounded() + } else { + crossbeam_channel::bounded(bound) + }; + let rx_clone = rx.clone(); + z_owned_sample_fifo_channel_t { + send: From::from(move |sample: &z_sample_t| { + let mut osample = z_sample_clone(Some(sample)); + if let Some(osample) = osample.take() { + if let Err(e) = tx.send(osample) { + log::error!("Attempted to push onto a closed sample_ring: {}", e) + } + } + }), + recv: From::from(move |receptacle: &mut z_owned_sample_t| { + *receptacle = match rx.recv() { + Ok(val) => val.into(), + Err(_) => None.into(), + }; + }), + try_recv: From::from(move |receptacle: &mut z_owned_sample_t| { + *receptacle = match rx_clone.try_recv() { + Ok(val) => val.into(), + Err(_) => None.into(), + }; + }), + } +} + +#[repr(C)] +pub struct z_owned_sample_ring_channel_t { + pub send: z_owned_closure_sample_t, + pub recv: z_owned_closure_owned_sample_t, + pub try_recv: z_owned_closure_owned_sample_t, +} +#[no_mangle] +pub extern "C" fn z_sample_ring_channel_drop(channel: &mut z_owned_sample_ring_channel_t) { + z_closure_sample_drop(&mut channel.send); + z_closure_owned_sample_drop(&mut channel.recv); + z_closure_owned_sample_drop(&mut channel.try_recv); +} +/// Constructs a null safe-to-drop value of 'z_owned_sample_ring_channel_t' type +#[no_mangle] +pub extern "C" fn z_sample_ring_channel_null() -> z_owned_sample_ring_channel_t { + z_owned_sample_ring_channel_t { + send: z_owned_closure_sample_t::empty(), + recv: z_owned_closure_owned_sample_t::empty(), + try_recv: z_owned_closure_owned_sample_t::empty(), + } +} + +/// Creates a new blocking ring channel, returned as a pair of closures. +/// +/// If `bound` is different from 0, that channel will be bound and apply back-pressure when full. +/// +/// The `send` end should be passed as callback to a `z_get` call. +/// +/// The `recv` end is a synchronous closure that will block until either a `z_owned_sample_t` is available, +/// which it will then return; or until the `send` closure is dropped and all replies have been consumed, +/// at which point it will return an invalidated `z_owned_sample_t`, and so will further calls. +#[no_mangle] +pub extern "C" fn z_sample_ring_channel_new(bound: usize) -> z_owned_sample_ring_channel_t { + // TODO(sashacmc): switch to handlers::RingChannel + let (tx, rx) = if bound == 0 { + crossbeam_channel::unbounded() + } else { + crossbeam_channel::bounded(bound) + }; + let rx_clone = rx.clone(); + z_owned_sample_ring_channel_t { + send: From::from(move |sample: &z_sample_t| { + let mut osample = z_sample_clone(Some(sample)); + if let Some(osample) = osample.take() { + if let Err(e) = tx.send(osample) { + log::error!("Attempted to push onto a closed sample_ring: {}", e) + } + } + }), + recv: From::from(move |receptacle: &mut z_owned_sample_t| { + *receptacle = match rx.recv() { + Ok(val) => val.into(), + Err(_) => None.into(), + }; + }), + try_recv: From::from(move |receptacle: &mut z_owned_sample_t| { + *receptacle = match rx_clone.try_recv() { + Ok(val) => val.into(), + Err(_) => None.into(), + }; + }), + } +} diff --git a/src/closures/sample_closure.rs b/src/closures/sample_closure.rs index 6f0e55707..65cb97a49 100644 --- a/src/closures/sample_closure.rs +++ b/src/closures/sample_closure.rs @@ -1,4 +1,4 @@ -use crate::z_sample_t; +use crate::{z_owned_sample_t, z_sample_t}; use libc::c_void; /// A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks. /// @@ -77,3 +77,84 @@ impl From for z_owned_closure_sample_t { } } } + +/// A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks: +/// +/// Members: +/// void *context: a pointer to an arbitrary state. +/// void *call(const struct z_sample_t*, const void *context): the typical callback function. `context` will be passed as its last argument. +/// void *drop(void*): allows the callback's state to be freed. +/// +/// Closures are not guaranteed not to be called concurrently. +/// +/// It is guaranteed that: +/// +/// - `call` will never be called once `drop` has started. +/// - `drop` will only be called **once**, and **after every** `call` has ended. +/// - The two previous guarantees imply that `call` and `drop` are never called concurrently. +#[repr(C)] +pub struct z_owned_closure_owned_sample_t { + context: *mut c_void, + call: Option, + drop: Option, +} +impl z_owned_closure_owned_sample_t { + pub fn empty() -> Self { + z_owned_closure_owned_sample_t { + context: std::ptr::null_mut(), + call: None, + drop: None, + } + } +} +unsafe impl Send for z_owned_closure_owned_sample_t {} +unsafe impl Sync for z_owned_closure_owned_sample_t {} +impl Drop for z_owned_closure_owned_sample_t { + fn drop(&mut self) { + if let Some(drop) = self.drop { + drop(self.context) + } + } +} +/// Constructs a null safe-to-drop value of 'z_owned_closure_sample_t' type +#[no_mangle] +pub extern "C" fn z_closure_owned_sample_null() -> z_owned_closure_owned_sample_t { + z_owned_closure_owned_sample_t::empty() +} +/// Calls the closure. Calling an uninitialized closure is a no-op. +#[no_mangle] +pub extern "C" fn z_closure_owned_sample_call( + closure: &z_owned_closure_owned_sample_t, + sample: &mut z_owned_sample_t, +) { + match closure.call { + Some(call) => call(sample, closure.context), + None => log::error!("Attempted to call an uninitialized closure!"), + } +} +/// Drops the closure. Droping an uninitialized closure is a no-op. +#[no_mangle] +pub extern "C" fn z_closure_owned_sample_drop(closure: &mut z_owned_closure_owned_sample_t) { + let mut empty_closure = z_owned_closure_owned_sample_t::empty(); + std::mem::swap(&mut empty_closure, closure); +} +impl From for z_owned_closure_owned_sample_t { + fn from(f: F) -> Self { + let this = Box::into_raw(Box::new(f)) as _; + extern "C" fn call( + sample: &mut z_owned_sample_t, + this: *mut c_void, + ) { + let this = unsafe { &*(this as *const F) }; + this(sample) + } + extern "C" fn drop(this: *mut c_void) { + std::mem::drop(unsafe { Box::from_raw(this as *mut F) }) + } + z_owned_closure_owned_sample_t { + context: this, + call: Some(call::), + drop: Some(drop::), + } + } +}