Skip to content

Commit

Permalink
[skip ci] Add sample channel (require z_owned_sample_t)
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Apr 12, 2024
1 parent 467b791 commit e93f4e0
Show file tree
Hide file tree
Showing 5 changed files with 267 additions and 32 deletions.
44 changes: 22 additions & 22 deletions examples/z_pull.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,30 +64,30 @@ int main(int argc, char **argv) {
}

printf("Pull functionality not implemented!\n");
// @TODO: implement z_owned_sample_channel_t and z_sample_channel_ring_new
// printf("Declaring Subscriber on '%s'...\n", keyexpr);
// z_owned_sample_channel_t channel = z_sample_channel_ring_new(size);
// z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(keyexpr), z_move(channel.send), NULL);
// if (!z_check(sub)) {
// printf("Unable to declare subscriber.\n");
// return -1;
// }
printf("Declaring Subscriber on '%s'...\n", keyexpr);
z_owned_sample_ring_channel_t channel = z_sample_channel_ring_new(size);
z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(keyexpr), z_move(channel.send), NULL);
if (!z_check(sub)) {
printf("Unable to declare subscriber.\n");
return -1;
}

// printf("Pulling data every %zu ms... Ring size: %zd\n", interval, size);
// z_owned_sample_t sample = z_sample_null();
// while (true) {
// for (z_call(channel.recv, &sample); z_check(sample); z_call(channel.recv, &sample)) {
// z_owned_str_t keystr = z_keyexpr_to_string(z_loan(sample.keyexpr));
// printf(">> [Subscriber] Pulled ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len,
// sample.payload.start);
// z_drop(z_move(keystr));
// z_drop(z_move(sample));
// }
// printf(">> [Subscriber] Nothing to pull... sleep for %zu ms\n", interval);
// z_sleep_ms(interval);
// }
printf("Pulling data every %zu ms... Ring size: %zd\n", interval, size);
z_owned_sample_t sample = z_sample_null();
while (true) {
for (z_call(channel.recv, &sample); z_check(sample); z_call(channel.recv, &sample)) {
z_owned_str_t keystr = z_keyexpr_to_string(z_loan(sample.keyexpr));
printf(">> [Subscriber] Pulled ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len,
sample.payload.start);
z_drop(z_move(keystr));
z_drop(z_move(sample));
}
printf(">> [Subscriber] Nothing to pull... sleep for %zu ms\n", interval);
z_sleep_ms(interval);
}

// z_undeclare_subscriber(z_move(sub));
z_undeclare_subscriber(z_move(sub));
z_drop(z_move(channel));

z_close(z_move(s));

Expand Down
33 changes: 26 additions & 7 deletions include/zenoh_macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@
z_owned_closure_hello_t * : z_closure_hello_drop, \
z_owned_closure_zid_t * : z_closure_zid_drop, \
zcu_owned_closure_matching_status_t * : zcu_closure_matching_status_drop, \
z_owned_reply_fifo_channel_t * : z_reply_fifo_channel_drop, \
z_owned_reply_ring_channel_t * : z_reply_ring_channel_drop, \
z_owned_sample_fifo_channel_t * : z_sample_fifo_channel_drop, \
z_owned_sample_ring_channel_t * : z_sample_ring_channel_drop, \
z_owned_query_fifo_channel_t * : z_query_fifo_channel_drop, \
z_owned_query_ring_channel_t * : z_query_ring_channel_drop, \
z_owned_reply_fifo_channel_t * : z_reply_fifo_channel_drop, \
z_owned_reply_ring_channel_t * : z_reply_ring_channel_drop, \
z_owned_bytes_map_t * : z_bytes_map_drop, \
zc_owned_payload_t * : zc_payload_drop, \
zc_owned_shmbuf_t * : zc_shmbuf_drop, \
Expand Down Expand Up @@ -67,10 +69,12 @@
z_owned_closure_hello_t * : z_closure_hello_null, \
z_owned_closure_zid_t * : z_closure_zid_null, \
zcu_owned_closure_matching_status_t * : zcu_closure_matching_status_null, \
z_owned_reply_fifo_channel_t * : z_reply_fifo_channel_null, \
z_owned_reply_ring_channel_t * : z_reply_ring_channel_null, \
z_owned_sample_fifo_channel_t * : z_sample_fifo_channel_null, \
z_owned_sample_ring_channel_t * : z_sample_ring_channel_null, \
z_owned_query_fifo_channel_t * : z_query_fifo_channel_null, \
z_owned_query_ring_channel_t * : z_query_ring_channel_null, \
z_owned_reply_fifo_channel_t * : z_reply_fifo_channel_null, \
z_owned_reply_ring_channel_t * : z_reply_ring_channel_null, \
z_owned_bytes_map_t * : z_bytes_map_null, \
z_attachment_t * : z_attachment_null, \
zc_owned_payload_t * : zc_payload_null, \
Expand Down Expand Up @@ -173,7 +177,12 @@ template<> struct zenoh_drop_type<z_owned_closure_reply_t> { typedef void type;
template<> struct zenoh_drop_type<z_owned_closure_hello_t> { typedef void type; };
template<> struct zenoh_drop_type<z_owned_closure_zid_t> { typedef void type; };
template<> struct zenoh_drop_type<zcu_owned_closure_matching_status_t> { typedef void type; };
template<> struct zenoh_drop_type<z_owned_reply_channel_t> { typedef void type; };
template<> struct zenoh_drop_type<z_owned_sample_fifo_channel_t> { typedef void type; };
template<> struct zenoh_drop_type<z_owned_sample_ring_channel_t> { typedef void type; };
template<> struct zenoh_drop_type<z_owned_query_fifo_channel_t> { typedef void type; };
template<> struct zenoh_drop_type<z_owned_query_ring_channel_t> { typedef void type; };
template<> struct zenoh_drop_type<z_owned_reply_fifo_channel_t> { typedef void type; };
template<> struct zenoh_drop_type<z_owned_reply_ring_channel_t> { typedef void type; };
template<> struct zenoh_drop_type<z_owned_bytes_map_t> { typedef void type; };
template<> struct zenoh_drop_type<zc_owned_liveliness_token_t> { typedef void type; };
template<> struct zenoh_drop_type<ze_owned_publication_cache_t> { typedef int8_t type; };
Expand All @@ -200,7 +209,12 @@ template<> inline void z_drop(z_owned_closure_reply_t* v) { z_closure_reply_drop
template<> inline void z_drop(z_owned_closure_hello_t* v) { z_closure_hello_drop(v); }
template<> inline void z_drop(z_owned_closure_zid_t* v) { z_closure_zid_drop(v); }
template<> inline void z_drop(zcu_owned_closure_matching_status_t* v) { zcu_closure_matching_status_drop(v); }
template<> inline void z_drop(z_owned_reply_channel_t* v) { z_reply_channel_drop(v); }
template<> inline void z_drop(z_owned_sample_fifo_channel_t* v) { z_sample_fifo_channel_drop(v); }
template<> inline void z_drop(z_owned_sample_ring_channel_t* v) { z_sample_ring_channel_drop(v); }
template<> inline void z_drop(z_owned_query_fifo_channel_t* v) { z_query_fifo_channel_drop(v); }
template<> inline void z_drop(z_owned_query_ring_channel_t* v) { z_query_ring_channel_drop(v); }
template<> inline void z_drop(z_owned_reply_fifo_channel_t* v) { z_reply_fifo_channel_drop(v); }
template<> inline void z_drop(z_owned_reply_ring_channel_t* v) { z_reply_ring_channel_drop(v); }
template<> inline void z_drop(z_owned_bytes_map_t* v) { z_bytes_map_drop(v); }
template<> inline void z_drop(zc_owned_liveliness_token_t* v) { zc_liveliness_undeclare_token(v); }
template<> inline int8_t z_drop(ze_owned_publication_cache_t* v) { return ze_undeclare_publication_cache(v); }
Expand All @@ -227,7 +241,12 @@ inline void z_null(z_owned_closure_reply_t& v) { v = z_closure_reply_null(); }
inline void z_null(z_owned_closure_hello_t& v) { v = z_closure_hello_null(); }
inline void z_null(z_owned_closure_zid_t& v) { v = z_closure_zid_null(); }
inline void z_null(zcu_owned_closure_matching_status_t& v) { v = zcu_closure_matching_status_null(); }
inline void z_null(z_owned_reply_channel_t& v) { v = z_reply_channel_null(); }
inline void z_null(z_owned_sample_fifo_channel_t& v) { v = z_sample_fifo_channel_null(); }
inline void z_null(z_owned_sample_ring_channel_t& v) { v = z_sample_ring_channel_null(); }
inline void z_null(z_owned_query_fifo_channel_t& v) { v = z_query_fifo_channel_null(); }
inline void z_null(z_owned_query_ring_channel_t& v) { v = z_query_ring_channel_null(); }
inline void z_null(z_owned_reply_fifo_channel_t& v) { v = z_reply_fifo_channel_null(); }
inline void z_null(z_owned_reply_ring_channel_t& v) { v = z_reply_ring_channel_null(); }
inline void z_null(z_owned_bytes_map_t& v) { v = z_bytes_map_null(); }
inline void z_null(zc_owned_liveliness_token_t& v) { v = zc_liveliness_token_null(); }
inline void z_null(ze_owned_publication_cache_t& v) { v = ze_publication_cache_null(); }
Expand Down
7 changes: 5 additions & 2 deletions src/closures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@ mod reply_closure;
pub use zenohid_closure::*;
mod zenohid_closure;

pub use reply_channel::*;
mod reply_channel;
pub use sample_channel::*;
mod sample_channel;

pub use query_channel::*;
mod query_channel;

pub use reply_channel::*;
mod reply_channel;

pub use hello_closure::*;
mod hello_closure;

Expand Down
132 changes: 132 additions & 0 deletions src/closures/sample_channel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
use crate::{
z_closure_owned_sample_drop, z_closure_sample_drop, z_owned_closure_owned_sample_t,
z_owned_closure_sample_t, z_owned_sample_t, z_sample_clone, z_sample_t,
};

#[repr(C)]
pub struct z_owned_sample_fifo_channel_t {
pub send: z_owned_closure_sample_t,
pub recv: z_owned_closure_owned_sample_t,
pub try_recv: z_owned_closure_owned_sample_t,
}
#[no_mangle]
pub extern "C" fn z_sample_fifo_channel_drop(channel: &mut z_owned_sample_fifo_channel_t) {
z_closure_sample_drop(&mut channel.send);
z_closure_owned_sample_drop(&mut channel.recv);
z_closure_owned_sample_drop(&mut channel.try_recv);
}
/// Constructs a null safe-to-drop value of 'z_owned_sample_fifo_channel_t' type
#[no_mangle]
pub extern "C" fn z_sample_fifo_channel_null() -> z_owned_sample_fifo_channel_t {
z_owned_sample_fifo_channel_t {
send: z_owned_closure_sample_t::empty(),
recv: z_owned_closure_owned_sample_t::empty(),
try_recv: z_owned_closure_owned_sample_t::empty(),
}
}

/// Creates a new blocking fifo channel, returned as a pair of closures.
///
/// If `bound` is different from 0, that channel will be bound and apply back-pressure when full.
///
/// The `send` end should be passed as callback to a `z_get` call.
///
/// The `recv` end is a synchronous closure that will block until either a `z_owned_sample_t` is available,
/// which it will then return; or until the `send` closure is dropped and all replies have been consumed,
/// at which point it will return an invalidated `z_owned_sample_t`, and so will further calls.
#[no_mangle]
pub extern "C" fn z_sample_fifo_channel_new(bound: usize) -> z_owned_sample_fifo_channel_t {
// TODO(sashacmc): switch to handlers::FifoChannel
let (tx, rx) = if bound == 0 {
crossbeam_channel::unbounded()
} else {
crossbeam_channel::bounded(bound)
};
let rx_clone = rx.clone();
z_owned_sample_fifo_channel_t {
send: From::from(move |sample: &z_sample_t| {
let mut osample = z_sample_clone(Some(sample));
if let Some(osample) = osample.take() {
if let Err(e) = tx.send(osample) {
log::error!("Attempted to push onto a closed sample_ring: {}", e)
}
}
}),
recv: From::from(move |receptacle: &mut z_owned_sample_t| {
*receptacle = match rx.recv() {
Ok(val) => val.into(),
Err(_) => None.into(),
};
}),
try_recv: From::from(move |receptacle: &mut z_owned_sample_t| {
*receptacle = match rx_clone.try_recv() {
Ok(val) => val.into(),
Err(_) => None.into(),
};
}),
}
}

#[repr(C)]
pub struct z_owned_sample_ring_channel_t {
pub send: z_owned_closure_sample_t,
pub recv: z_owned_closure_owned_sample_t,
pub try_recv: z_owned_closure_owned_sample_t,
}
#[no_mangle]
pub extern "C" fn z_sample_ring_channel_drop(channel: &mut z_owned_sample_ring_channel_t) {
z_closure_sample_drop(&mut channel.send);
z_closure_owned_sample_drop(&mut channel.recv);
z_closure_owned_sample_drop(&mut channel.try_recv);
}
/// Constructs a null safe-to-drop value of 'z_owned_sample_ring_channel_t' type
#[no_mangle]
pub extern "C" fn z_sample_ring_channel_null() -> z_owned_sample_ring_channel_t {
z_owned_sample_ring_channel_t {
send: z_owned_closure_sample_t::empty(),
recv: z_owned_closure_owned_sample_t::empty(),
try_recv: z_owned_closure_owned_sample_t::empty(),
}
}

/// Creates a new blocking ring channel, returned as a pair of closures.
///
/// If `bound` is different from 0, that channel will be bound and apply back-pressure when full.
///
/// The `send` end should be passed as callback to a `z_get` call.
///
/// The `recv` end is a synchronous closure that will block until either a `z_owned_sample_t` is available,
/// which it will then return; or until the `send` closure is dropped and all replies have been consumed,
/// at which point it will return an invalidated `z_owned_sample_t`, and so will further calls.
#[no_mangle]
pub extern "C" fn z_sample_ring_channel_new(bound: usize) -> z_owned_sample_ring_channel_t {
// TODO(sashacmc): switch to handlers::RingChannel
let (tx, rx) = if bound == 0 {
crossbeam_channel::unbounded()
} else {
crossbeam_channel::bounded(bound)
};
let rx_clone = rx.clone();
z_owned_sample_ring_channel_t {
send: From::from(move |sample: &z_sample_t| {
let mut osample = z_sample_clone(Some(sample));
if let Some(osample) = osample.take() {
if let Err(e) = tx.send(osample) {
log::error!("Attempted to push onto a closed sample_ring: {}", e)
}
}
}),
recv: From::from(move |receptacle: &mut z_owned_sample_t| {
*receptacle = match rx.recv() {
Ok(val) => val.into(),
Err(_) => None.into(),
};
}),
try_recv: From::from(move |receptacle: &mut z_owned_sample_t| {
*receptacle = match rx_clone.try_recv() {
Ok(val) => val.into(),
Err(_) => None.into(),
};
}),
}
}
83 changes: 82 additions & 1 deletion src/closures/sample_closure.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::z_sample_t;
use crate::{z_owned_sample_t, z_sample_t};
use libc::c_void;
/// A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks.
///
Expand Down Expand Up @@ -77,3 +77,84 @@ impl<F: Fn(&z_sample_t)> From<F> for z_owned_closure_sample_t {
}
}
}

/// A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks:
///
/// Members:
/// void *context: a pointer to an arbitrary state.
/// void *call(const struct z_sample_t*, const void *context): the typical callback function. `context` will be passed as its last argument.
/// void *drop(void*): allows the callback's state to be freed.
///
/// Closures are not guaranteed not to be called concurrently.
///
/// It is guaranteed that:
///
/// - `call` will never be called once `drop` has started.
/// - `drop` will only be called **once**, and **after every** `call` has ended.
/// - The two previous guarantees imply that `call` and `drop` are never called concurrently.
#[repr(C)]
pub struct z_owned_closure_owned_sample_t {
context: *mut c_void,
call: Option<extern "C" fn(&mut z_owned_sample_t, context: *mut c_void)>,
drop: Option<extern "C" fn(*mut c_void)>,
}
impl z_owned_closure_owned_sample_t {
pub fn empty() -> Self {
z_owned_closure_owned_sample_t {
context: std::ptr::null_mut(),
call: None,
drop: None,
}
}
}
unsafe impl Send for z_owned_closure_owned_sample_t {}
unsafe impl Sync for z_owned_closure_owned_sample_t {}
impl Drop for z_owned_closure_owned_sample_t {
fn drop(&mut self) {
if let Some(drop) = self.drop {
drop(self.context)
}
}
}
/// Constructs a null safe-to-drop value of 'z_owned_closure_sample_t' type
#[no_mangle]
pub extern "C" fn z_closure_owned_sample_null() -> z_owned_closure_owned_sample_t {
z_owned_closure_owned_sample_t::empty()
}
/// Calls the closure. Calling an uninitialized closure is a no-op.
#[no_mangle]
pub extern "C" fn z_closure_owned_sample_call(
closure: &z_owned_closure_owned_sample_t,
sample: &mut z_owned_sample_t,
) {
match closure.call {
Some(call) => call(sample, closure.context),
None => log::error!("Attempted to call an uninitialized closure!"),
}
}
/// Drops the closure. Droping an uninitialized closure is a no-op.
#[no_mangle]
pub extern "C" fn z_closure_owned_sample_drop(closure: &mut z_owned_closure_owned_sample_t) {
let mut empty_closure = z_owned_closure_owned_sample_t::empty();
std::mem::swap(&mut empty_closure, closure);
}
impl<F: Fn(&mut z_owned_sample_t)> From<F> for z_owned_closure_owned_sample_t {
fn from(f: F) -> Self {
let this = Box::into_raw(Box::new(f)) as _;
extern "C" fn call<F: Fn(&mut z_owned_sample_t)>(
sample: &mut z_owned_sample_t,
this: *mut c_void,
) {
let this = unsafe { &*(this as *const F) };
this(sample)
}
extern "C" fn drop<F>(this: *mut c_void) {
std::mem::drop(unsafe { Box::from_raw(this as *mut F) })
}
z_owned_closure_owned_sample_t {
context: this,
call: Some(call::<F>),
drop: Some(drop::<F>),
}
}
}

0 comments on commit e93f4e0

Please sign in to comment.