Skip to content

Commit

Permalink
Rework reply fifo channel
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Apr 11, 2024
1 parent 817ca5f commit 871b5e8
Show file tree
Hide file tree
Showing 10 changed files with 110 additions and 308 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ libc = "0.2.139"
log = "0.4.17"
rand = "0.8.5"
spin = "0.9.5"
crossbeam-channel = "0.5"
# shared-memory enabled for zenoh even if zenoh-c "shared-memory" feature is disabled. This is to make "std::mem::transmute" work for `ZSLice`
zenoh = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = ["shared-memory", "unstable"], default-features = false }
zenoh-protocol = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = ["shared-memory"] }
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml.in
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ libc = "0.2.139"
log = "0.4.17"
rand = "0.8.5"
spin = "0.9.5"
crossbeam-channel = "0.5"
# shared-memory enabled for zenoh even if zenoh-c "shared-memory" feature is disabled. This is to make "std::mem::transmute" work for `ZSLice`
zenoh = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = ["shared-memory", "unstable"], default-features = false }
zenoh-protocol = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = ["shared-memory"] }
Expand Down
4 changes: 2 additions & 2 deletions examples/z_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ int main(int argc, char **argv) {
}

printf("Sending Query '%s'...\n", expr);
z_owned_reply_channel_t channel = zc_reply_fifo_new(16);
z_owned_reply_fifo_channel_t channel = z_reply_fifo_channel_new(16);
z_get_options_t opts = z_get_options_default();
if (value != NULL) {
opts.value.payload = z_bytes_from_str(value);
Expand All @@ -76,4 +76,4 @@ int main(int argc, char **argv) {
z_drop(z_move(channel));
z_close(z_move(s));
return 0;
}
}
2 changes: 1 addition & 1 deletion examples/z_get_liveliness.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ int main(int argc, char **argv) {
}

printf("Sending liveliness query '%s'...\n", expr);
z_owned_reply_channel_t channel = zc_reply_fifo_new(16);
z_owned_reply_fifo_channel_t channel = z_reply_fifo_channel_new(16);
zc_liveliness_get(z_loan(s), keyexpr, z_move(channel.send), NULL);
z_owned_reply_t reply = z_reply_null();
for (z_call(channel.recv, &reply); z_check(reply); z_call(channel.recv, &reply)) {
Expand Down
29 changes: 15 additions & 14 deletions examples/z_non_blocking_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,26 +47,27 @@ int main(int argc, char **argv) {
printf("Sending Query '%s'...\n", expr);
z_get_options_t opts = z_get_options_default();
opts.target = Z_QUERY_TARGET_ALL;
z_owned_reply_channel_t channel = zc_reply_non_blocking_fifo_new(16);
z_owned_reply_fifo_channel_t channel = z_reply_fifo_channel_new(16);
z_get(z_loan(s), keyexpr, "", z_move(channel.send),
&opts); // here, the send is moved and will be dropped by zenoh when adequate
z_owned_reply_t reply = z_reply_null();
for (bool call_success = z_call(channel.recv, &reply); !call_success || z_check(reply);
call_success = z_call(channel.recv, &reply)) {
if (!call_success) {
continue;
}
if (z_reply_is_ok(&reply)) {
z_sample_t sample = z_reply_ok(&reply);
z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr);
printf(">> Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, sample.payload.start);
z_drop(z_move(keystr));
} else {
printf("Received an error\n");
while (true) {
for (z_call(channel.try_recv, &reply); z_check(reply); z_call(channel.try_recv, &reply)) {
if (z_reply_is_ok(&reply)) {
z_sample_t sample = z_reply_ok(&reply);
z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr);
printf(">> Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, sample.payload.start);
z_drop(z_move(keystr));
} else {
printf(">> Received an error\n");
}
}
printf(">> Nothing to get... sleep for 5 s\n");
z_sleep_s(5);
}

z_drop(z_move(reply));
z_drop(z_move(channel));
z_close(z_move(s));
return 0;
}
}
88 changes: 22 additions & 66 deletions include/zenoh_commons.h
Original file line number Diff line number Diff line change
Expand Up @@ -778,31 +778,14 @@ typedef struct z_query_reply_options_t {
struct z_encoding_t encoding;
struct z_attachment_t attachment;
} z_query_reply_options_t;
/**
* A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks:
* - `this` is a pointer to an arbitrary state.
* - `call` is the typical callback function. `this` will be passed as its last argument.
* - `drop` allows the callback's state to be freed.
*
* Closures are not guaranteed not to be called concurrently.
*
* We guarantee that:
* - `call` will never be called once `drop` has started.
* - `drop` will only be called ONCE, and AFTER EVERY `call` has ended.
* - The two previous guarantees imply that `call` and `drop` are never called concurrently.
*/
typedef struct z_owned_reply_channel_closure_t {
void *context;
bool (*call)(struct z_owned_reply_t*, void*);
void (*drop)(void*);
} z_owned_reply_channel_closure_t;
/**
* A pair of closures, the `send` one accepting
*/
typedef struct z_owned_reply_channel_t {
typedef struct z_owned_reply_fifo_channel_t {
struct z_owned_closure_reply_t send;
struct z_owned_reply_channel_closure_t recv;
} z_owned_reply_channel_t;
struct z_owned_closure_reply_t recv;
struct z_owned_closure_reply_t try_recv;
} z_owned_reply_fifo_channel_t;
typedef struct z_owned_scouting_config_t {
struct z_owned_config_t _config;
unsigned long zc_timeout_ms;
Expand Down Expand Up @@ -1913,25 +1896,6 @@ ZENOHC_API uint16_t z_random_u16(void);
ZENOHC_API uint32_t z_random_u32(void);
ZENOHC_API uint64_t z_random_u64(void);
ZENOHC_API uint8_t z_random_u8(void);
/**
* Calls the closure. Calling an uninitialized closure is a no-op.
*/
ZENOHC_API
bool z_reply_channel_closure_call(const struct z_owned_reply_channel_closure_t *closure,
struct z_owned_reply_t *sample);
/**
* Drops the closure. Droping an uninitialized closure is a no-op.
*/
ZENOHC_API void z_reply_channel_closure_drop(struct z_owned_reply_channel_closure_t *closure);
/**
* Constructs a null safe-to-drop value of 'z_owned_reply_channel_closure_t' type
*/
ZENOHC_API struct z_owned_reply_channel_closure_t z_reply_channel_closure_null(void);
ZENOHC_API void z_reply_channel_drop(struct z_owned_reply_channel_t *channel);
/**
* Constructs a null safe-to-drop value of 'z_owned_reply_channel_t' type
*/
ZENOHC_API struct z_owned_reply_channel_t z_reply_channel_null(void);
/**
* Returns ``true`` if `reply_data` is valid.
*/
Expand All @@ -1947,6 +1911,24 @@ ZENOHC_API void z_reply_drop(struct z_owned_reply_t *reply_data);
*/
ZENOHC_API
struct z_value_t z_reply_err(const struct z_owned_reply_t *reply);
ZENOHC_API void z_reply_fifo_channel_drop(struct z_owned_reply_fifo_channel_t *channel);
/**
* Creates a new blocking fifo channel, returned as a pair of closures.
*
* If `bound` is different from 0, that channel will be bound and apply back-pressure when full.
*
* The `send` end should be passed as callback to a `z_get` call.
*
* The `recv` end is a synchronous closure that will block until either a `z_owned_reply_t` is available,
* which it will then return; or until the `send` closure is dropped and all replies have been consumed,
* at which point it will return an invalidated `z_owned_reply_t`, and so will further calls.
*/
ZENOHC_API
struct z_owned_reply_fifo_channel_t z_reply_fifo_channel_new(size_t bound);
/**
* Constructs a null safe-to-drop value of 'z_owned_reply_fifo_channel_t' type
*/
ZENOHC_API struct z_owned_reply_fifo_channel_t z_reply_fifo_channel_null(void);
/**
* Returns ``true`` if the queryable answered with an OK, which allows this value to be treated as a sample.
*
Expand Down Expand Up @@ -2363,32 +2345,6 @@ struct z_owned_query_channel_t zc_query_fifo_new(size_t bound);
*/
ZENOHC_API
struct z_owned_query_channel_t zc_query_non_blocking_fifo_new(size_t bound);
/**
* Creates a new blocking fifo channel, returned as a pair of closures.
*
* If `bound` is different from 0, that channel will be bound and apply back-pressure when full.
*
* The `send` end should be passed as callback to a `z_get` call.
*
* The `recv` end is a synchronous closure that will block until either a `z_owned_reply_t` is available,
* which it will then return; or until the `send` closure is dropped and all replies have been consumed,
* at which point it will return an invalidated `z_owned_reply_t`, and so will further calls.
*/
ZENOHC_API
struct z_owned_reply_channel_t zc_reply_fifo_new(size_t bound);
/**
* Creates a new non-blocking fifo channel, returned as a pair of closures.
*
* If `bound` is different from 0, that channel will be bound and apply back-pressure when full.
*
* The `send` end should be passed as callback to a `z_get` call.
*
* The `recv` end is a synchronous closure that will block until either a `z_owned_reply_t` is available,
* which it will then return; or until the `send` closure is dropped and all replies have been consumed,
* at which point it will return an invalidated `z_owned_reply_t`, and so will further calls.
*/
ZENOHC_API
struct z_owned_reply_channel_t zc_reply_non_blocking_fifo_new(size_t bound);
/**
* Clones the sample's payload by incrementing its backing refcount (this doesn't imply any copies).
*/
Expand Down
12 changes: 2 additions & 10 deletions include/zenoh_macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,8 @@
z_owned_closure_hello_t * : z_closure_hello_drop, \
z_owned_closure_zid_t * : z_closure_zid_drop, \
zcu_owned_closure_matching_status_t * : zcu_closure_matching_status_drop, \
z_owned_reply_channel_closure_t * : z_reply_channel_closure_drop, \
z_owned_query_channel_closure_t * : z_query_channel_closure_drop, \
z_owned_reply_channel_t * : z_reply_channel_drop, \
z_owned_reply_fifo_channel_t * : z_reply_fifo_channel_drop, \
z_owned_query_channel_t * : z_query_channel_drop, \
z_owned_bytes_map_t * : z_bytes_map_drop, \
zc_owned_payload_t * : zc_payload_drop, \
Expand Down Expand Up @@ -67,8 +66,7 @@
z_owned_closure_hello_t * : z_closure_hello_null, \
z_owned_closure_zid_t * : z_closure_zid_null, \
zcu_owned_closure_matching_status_t * : zcu_closure_matching_status_null, \
z_owned_reply_channel_closure_t * : z_reply_channel_closure_null, \
z_owned_reply_channel_t * : z_reply_channel_null, \
z_owned_reply_fifo_channel_t * : z_reply_fifo_channel_null, \
z_owned_bytes_map_t * : z_bytes_map_null, \
z_attachment_t * : z_attachment_null, \
zc_owned_payload_t * : zc_payload_null, \
Expand Down Expand Up @@ -111,7 +109,6 @@
z_owned_closure_hello_t : z_closure_hello_call, \
z_owned_closure_zid_t : z_closure_zid_call, \
zcu_owned_closure_matching_status_t : zcu_closure_matching_status_call, \
z_owned_reply_channel_closure_t : z_reply_channel_closure_call, \
z_owned_query_channel_closure_t : z_query_channel_closure_call \
) (&x, __VA_ARGS__)
// clang-format on
Expand Down Expand Up @@ -173,7 +170,6 @@ template<> struct zenoh_drop_type<z_owned_closure_reply_t> { typedef void type;
template<> struct zenoh_drop_type<z_owned_closure_hello_t> { typedef void type; };
template<> struct zenoh_drop_type<z_owned_closure_zid_t> { typedef void type; };
template<> struct zenoh_drop_type<zcu_owned_closure_matching_status_t> { typedef void type; };
template<> struct zenoh_drop_type<z_owned_reply_channel_closure_t> { typedef void type; };
template<> struct zenoh_drop_type<z_owned_reply_channel_t> { typedef void type; };
template<> struct zenoh_drop_type<z_owned_bytes_map_t> { typedef void type; };
template<> struct zenoh_drop_type<zc_owned_liveliness_token_t> { typedef void type; };
Expand Down Expand Up @@ -201,7 +197,6 @@ template<> inline void z_drop(z_owned_closure_reply_t* v) { z_closure_reply_drop
template<> inline void z_drop(z_owned_closure_hello_t* v) { z_closure_hello_drop(v); }
template<> inline void z_drop(z_owned_closure_zid_t* v) { z_closure_zid_drop(v); }
template<> inline void z_drop(zcu_owned_closure_matching_status_t* v) { zcu_closure_matching_status_drop(v); }
template<> inline void z_drop(z_owned_reply_channel_closure_t* v) { z_reply_channel_closure_drop(v); }
template<> inline void z_drop(z_owned_reply_channel_t* v) { z_reply_channel_drop(v); }
template<> inline void z_drop(z_owned_bytes_map_t* v) { z_bytes_map_drop(v); }
template<> inline void z_drop(zc_owned_liveliness_token_t* v) { zc_liveliness_undeclare_token(v); }
Expand Down Expand Up @@ -229,7 +224,6 @@ inline void z_null(z_owned_closure_reply_t& v) { v = z_closure_reply_null(); }
inline void z_null(z_owned_closure_hello_t& v) { v = z_closure_hello_null(); }
inline void z_null(z_owned_closure_zid_t& v) { v = z_closure_zid_null(); }
inline void z_null(zcu_owned_closure_matching_status_t& v) { v = zcu_closure_matching_status_null(); }
inline void z_null(z_owned_reply_channel_closure_t& v) { v = z_reply_channel_closure_null(); }
inline void z_null(z_owned_reply_channel_t& v) { v = z_reply_channel_null(); }
inline void z_null(z_owned_bytes_map_t& v) { v = z_bytes_map_null(); }
inline void z_null(zc_owned_liveliness_token_t& v) { v = zc_liveliness_token_null(); }
Expand Down Expand Up @@ -271,8 +265,6 @@ inline void z_call(const struct z_owned_closure_zid_t &closure, const struct z_i
{ z_closure_zid_call(&closure, zid); }
inline void z_call(const struct zcu_owned_closure_matching_status_t &closure, const struct zcu_matching_status_t *matching_status)
{ zcu_closure_matching_status_call(&closure, matching_status); }
inline bool z_call(const struct z_owned_reply_channel_closure_t &closure, struct z_owned_reply_t *sample)
{ return z_reply_channel_closure_call(&closure, sample); }
// clang-format on

#define _z_closure_overloader(callback, droper, ctx, ...) \
Expand Down
4 changes: 2 additions & 2 deletions src/closures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ mod reply_closure;
pub use zenohid_closure::*;
mod zenohid_closure;

pub use response_channel::*;
mod response_channel;
pub use reply_channel::*;
mod reply_channel;

pub use query_channel::*;
mod query_channel;
Expand Down
64 changes: 64 additions & 0 deletions src/closures/reply_channel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use crate::{z_closure_reply_drop, z_owned_closure_reply_t, z_owned_reply_t};

/// A pair of closures, the `send` one accepting
#[repr(C)]
pub struct z_owned_reply_fifo_channel_t {
pub send: z_owned_closure_reply_t,
pub recv: z_owned_closure_reply_t,
pub try_recv: z_owned_closure_reply_t,
}
#[no_mangle]
pub extern "C" fn z_reply_fifo_channel_drop(channel: &mut z_owned_reply_fifo_channel_t) {
z_closure_reply_drop(&mut channel.send);
z_closure_reply_drop(&mut channel.recv);
z_closure_reply_drop(&mut channel.try_recv);
}
/// Constructs a null safe-to-drop value of 'z_owned_reply_fifo_channel_t' type
#[no_mangle]
pub extern "C" fn z_reply_fifo_channel_null() -> z_owned_reply_fifo_channel_t {
z_owned_reply_fifo_channel_t {
send: z_owned_closure_reply_t::empty(),
recv: z_owned_closure_reply_t::empty(),
try_recv: z_owned_closure_reply_t::empty(),
}
}

/// Creates a new blocking fifo channel, returned as a pair of closures.
///
/// If `bound` is different from 0, that channel will be bound and apply back-pressure when full.
///
/// The `send` end should be passed as callback to a `z_get` call.
///
/// The `recv` end is a synchronous closure that will block until either a `z_owned_reply_t` is available,
/// which it will then return; or until the `send` closure is dropped and all replies have been consumed,
/// at which point it will return an invalidated `z_owned_reply_t`, and so will further calls.
#[no_mangle]
pub extern "C" fn z_reply_fifo_channel_new(bound: usize) -> z_owned_reply_fifo_channel_t {
let (tx, rx) = if bound == 0 {
crossbeam_channel::unbounded()
} else {
crossbeam_channel::bounded(bound)
};
let rx_clone = rx.clone();
z_owned_reply_fifo_channel_t {
send: From::from(move |reply: &mut z_owned_reply_t| {
if let Some(reply) = reply.take() {
if let Err(e) = tx.send(reply) {
log::error!("Attempted to push onto a closed reply_fifo: {}", e)
}
}
}),
recv: From::from(move |receptacle: &mut z_owned_reply_t| {
*receptacle = match rx.recv() {
Ok(val) => val.into(),
Err(_) => None.into(),
};
}),
try_recv: From::from(move |receptacle: &mut z_owned_reply_t| {
*receptacle = match rx_clone.try_recv() {
Ok(val) => val.into(),
Err(_) => None.into(),
};
}),
}
}
Loading

0 comments on commit 871b5e8

Please sign in to comment.