Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: owned queries now supported, allowing responses out of callback #213

Merged
merged 5 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ serde_yaml = "0.9.19"

[lib]
path="src/lib.rs"
name = "zenohc"
name = "zenohcd"
JEnoch marked this conversation as resolved.
Show resolved Hide resolved
crate-type = ["cdylib", "staticlib"]
doctest = false

Expand Down
39 changes: 39 additions & 0 deletions include/zenoh_commons.h
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,19 @@ typedef struct z_put_options_t {
enum z_congestion_control_t congestion_control;
enum z_priority_t priority;
} z_put_options_t;
/**
* Owned variant of a Query received by a Queryable.
*
* You may construct it by `z_query_clone`-ing a loaned query.
* When the last `z_owned_query_t` corresponding to a query is destroyed, or the callback that produced the query cloned to build them returns,
* the query will receive its termination signal.
*
* Holding onto an `z_owned_query_t` for too long (10s by default) will trigger a timeout error
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the way to change this 10s default timeout on Queryable side ?

* to be sent to the querier, and responding to the query will no longer work.
*/
typedef struct z_owned_query_t {
void *_0;
} z_owned_query_t;
/**
* Represents the set of options that can be applied to a query reply,
* sent via :c:func:`z_query_reply`.
Expand Down Expand Up @@ -1414,6 +1427,14 @@ int8_t z_put(struct z_session_t session,
* Constructs the default value for :c:type:`z_put_options_t`.
*/
ZENOHC_API struct z_put_options_t z_put_options_default(void);
/**
* Returns `false` if `this` is in a gravestone state, `true` otherwise.
JEnoch marked this conversation as resolved.
Show resolved Hide resolved
*
* This function may not be called with the null pointer, but can be called with the gravestone value.
*/
ZENOHC_API
bool z_query_check(const struct z_owned_query_t *this_);
ZENOHC_API struct z_owned_query_t z_query_clone(const struct z_query_t *query);
/**
* Automatic query consolidation strategy selection.
*
Expand Down Expand Up @@ -1441,10 +1462,28 @@ ZENOHC_API struct z_query_consolidation_t z_query_consolidation_monotonic(void);
* Disable consolidation.
*/
ZENOHC_API struct z_query_consolidation_t z_query_consolidation_none(void);
/**
* Destroys the query, setting `this` to its gravestone value to prevent double-frees.
*
* This function may not be called with the null pointer, but can be called with the gravestone value.
*/
ZENOHC_API
void z_query_drop(struct z_owned_query_t *this_);
/**
* Get a query's key by aliasing it.
*/
ZENOHC_API struct z_keyexpr_t z_query_keyexpr(const struct z_query_t *query);
/**
* Aliases the query.
*
* This function may not be called with the null pointer, but can be called with the gravestone value.
*/
ZENOHC_API
struct z_query_t z_query_loan(const struct z_owned_query_t *this_);
/**
* The gravestone value of `z_owned_query_t`.
*/
ZENOHC_API struct z_owned_query_t z_query_null(void);
/**
* Get a query's `value selector <https://github.com/eclipse-zenoh/roadmap/tree/main/rfcs/ALL/Selectors>`_ by aliasing it.
*/
Expand Down
5 changes: 4 additions & 1 deletion include/zenoh_concrete.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ typedef struct z_owned_session_t {
size_t _0;
} z_owned_session_t;
/**
* Structs received by a Queryable.
* Loaned variant of a Query received by a Queryable.
*
* Queries are atomically reference-counted, letting you extract them from the callback that handed them to you by cloning.
* `z_query_t`'s are valid as long as at least one corresponding `z_owned_query_t` exists, including the one owned by Zenoh until the callback returns.
*/
typedef struct z_query_t {
void *_0;
Expand Down
9 changes: 9 additions & 0 deletions include/zenoh_macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
z_owned_encoding_t : z_encoding_loan, \
z_owned_hello_t : z_hello_loan, \
z_owned_str_t : z_str_loan, \
z_owned_query_t : z_query_loan, \
ze_owned_querying_subscriber_t : ze_querying_subscriber_loan \
)(&x)

Expand All @@ -29,6 +30,7 @@
z_owned_reply_t * : z_reply_drop, \
z_owned_hello_t * : z_hello_drop, \
z_owned_str_t * : z_str_drop, \
z_owned_query_t * : z_query_drop, \
z_owned_closure_sample_t * : z_closure_sample_drop, \
z_owned_closure_query_t * : z_closure_query_drop, \
z_owned_closure_reply_t * : z_closure_reply_drop, \
Expand Down Expand Up @@ -57,6 +59,7 @@
z_owned_reply_t * : z_reply_null, \
z_owned_hello_t * : z_hello_null, \
z_owned_str_t * : z_str_null, \
z_owned_query_t * : z_query_null, \
z_owned_closure_sample_t * : z_closure_sample_null, \
z_owned_closure_query_t * : z_closure_query_null, \
z_owned_closure_reply_t * : z_closure_reply_null, \
Expand Down Expand Up @@ -85,6 +88,7 @@
z_owned_encoding_t : z_encoding_check, \
z_owned_reply_t : z_reply_check, \
z_owned_hello_t : z_hello_check, \
z_owned_query_t : z_query_check, \
z_owned_str_t : z_str_check, \
zc_owned_payload_t : zc_payload_check, \
zc_owned_shmbuf_t : zc_shmbuf_check, \
Expand Down Expand Up @@ -134,6 +138,7 @@ template<> inline z_subscriber_t z_loan(const z_owned_subscriber_t& x) { return
template<> inline z_pull_subscriber_t z_loan(const z_owned_pull_subscriber_t& x) { return z_pull_subscriber_loan(&x); }
template<> inline z_encoding_t z_loan(const z_owned_encoding_t& x) { return z_encoding_loan(&x); }
template<> inline z_hello_t z_loan(const z_owned_hello_t& x) { return z_hello_loan(&x); }
template<> inline z_query_t z_loan(const z_owned_query_t& x) { return z_query_loan(&x); }
template<> inline const char* z_loan(const z_owned_str_t& x) { return z_str_loan(&x); }
template<> inline ze_querying_subscriber_t z_loan(const ze_owned_querying_subscriber_t& x) { return ze_querying_subscriber_loan(&x); }

Expand All @@ -151,6 +156,7 @@ template<> struct zenoh_drop_type<z_owned_queryable_t> { typedef int8_t type; };
template<> struct zenoh_drop_type<z_owned_encoding_t> { typedef void type; };
template<> struct zenoh_drop_type<z_owned_reply_t> { typedef void type; };
template<> struct zenoh_drop_type<z_owned_hello_t> { typedef void type; };
template<> struct zenoh_drop_type<z_owned_query_t> { typedef void type; };
template<> struct zenoh_drop_type<z_owned_str_t> { typedef void type; };
template<> struct zenoh_drop_type<zc_owned_payload_t> { typedef void type; };
template<> struct zenoh_drop_type<zc_owned_shmbuf_t> { typedef void type; };
Expand All @@ -177,6 +183,7 @@ template<> inline int8_t z_drop(z_owned_queryable_t* v) { return z_undeclare_que
template<> inline void z_drop(z_owned_encoding_t* v) { z_encoding_drop(v); }
template<> inline void z_drop(z_owned_reply_t* v) { z_reply_drop(v); }
template<> inline void z_drop(z_owned_hello_t* v) { z_hello_drop(v); }
template<> inline void z_drop(z_owned_query_t* v) { z_query_drop(v); }
template<> inline void z_drop(z_owned_str_t* v) { z_str_drop(v); }
template<> inline void z_drop(zc_owned_payload_t* v) { zc_payload_drop(v); }
template<> inline void z_drop(zc_owned_shmbuf_t* v) { zc_shmbuf_drop(v); }
Expand All @@ -203,6 +210,7 @@ inline void z_null(z_owned_queryable_t& v) { v = z_queryable_null(); }
inline void z_null(z_owned_encoding_t& v) { v = z_encoding_null(); }
inline void z_null(z_owned_reply_t& v) { v = z_reply_null(); }
inline void z_null(z_owned_hello_t& v) { v = z_hello_null(); }
inline void z_null(z_owned_query_t& v) { v = z_query_null(); }
inline void z_null(z_owned_str_t& v) { v = z_str_null(); }
inline void z_null(zc_owned_payload_t& v) { v = zc_payload_null(); }
inline void z_null(zc_owned_shmbuf_t& v) { v = zc_shmbuf_null(); }
Expand Down Expand Up @@ -234,6 +242,7 @@ inline bool z_check(const z_owned_queryable_t& v) { return z_queryable_check(&v)
inline bool z_check(const z_owned_encoding_t& v) { return z_encoding_check(&v); }
inline bool z_check(const z_owned_reply_t& v) { return z_reply_check(&v); }
inline bool z_check(const z_owned_hello_t& v) { return z_hello_check(&v); }
inline bool z_check(const z_owned_query_t& v) { return z_query_check(&v); }
inline bool z_check(const z_owned_str_t& v) { return z_str_check(&v); }
inline bool z_check(const zc_owned_liveliness_token_t& v) { return zc_liveliness_token_check(&v); }
inline bool z_check(const ze_owned_publication_cache_t& v) { return ze_publication_cache_check(&v); }
Expand Down
76 changes: 72 additions & 4 deletions src/queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
LOG_INVALID_SESSION,
};
use libc::c_void;
use std::ops::Deref;
use std::ops::{Deref, DerefMut};
use zenoh::prelude::SessionDeclarations;
use zenoh::{
prelude::{Sample, SplitBuffer},
Expand Down Expand Up @@ -76,16 +76,74 @@ pub extern "C" fn z_queryable_null() -> z_owned_queryable_t {
z_owned_queryable_t::null()
}

/// Structs received by a Queryable.
/// Loaned variant of a Query received by a Queryable.
///
/// Queries are atomically reference-counted, letting you extract them from the callback that handed them to you by cloning.
/// `z_query_t`'s are valid as long as at least one corresponding `z_owned_query_t` exists, including the one owned by Zenoh until the callback returns.
#[allow(non_camel_case_types)]
#[repr(C)]
pub struct z_query_t(*mut c_void);
/// Owned variant of a Query received by a Queryable.
///
/// You may construct it by `z_query_clone`-ing a loaned query.
/// When the last `z_owned_query_t` corresponding to a query is destroyed, or the callback that produced the query cloned to build them returns,
/// the query will receive its termination signal.
///
/// Holding onto an `z_owned_query_t` for too long (10s by default) will trigger a timeout error
/// to be sent to the querier, and responding to the query will no longer work.
#[allow(non_camel_case_types)]
#[repr(C)]
pub struct z_owned_query_t(*mut c_void);
impl Deref for z_query_t {
type Target = Query;
type Target = Option<Query>;
fn deref(&self) -> &Self::Target {
unsafe { &*(self.0 as *const _) }
}
}
impl Deref for z_owned_query_t {
type Target = Option<Query>;
fn deref(&self) -> &Self::Target {
unsafe { &*(self.0 as *const _) }
}
}
impl DerefMut for z_owned_query_t {
fn deref_mut(&mut self) -> &mut Self::Target {
unsafe { &mut *(self.0 as *mut _) }
}
}
/// The gravestone value of `z_owned_query_t`.
#[no_mangle]
pub extern "C" fn z_query_null() -> z_owned_query_t {
unsafe { core::mem::transmute(None::<Query>) }
}
/// Returns `false` if `this` is in a gravestone state, `true` otherwise.
///
/// This function may not be called with the null pointer, but can be called with the gravestone value.
#[no_mangle]
pub extern "C" fn z_query_check(this: &z_owned_query_t) -> bool {
this.is_some()
}
/// Aliases the query.
///
/// This function may not be called with the null pointer, but can be called with the gravestone value.
#[no_mangle]
pub extern "C" fn z_query_loan(this: &z_owned_query_t) -> z_query_t {
unsafe { core::mem::transmute_copy(this) }
}
/// Destroys the query, setting `this` to its gravestone value to prevent double-frees.
///
/// This function may not be called with the null pointer, but can be called with the gravestone value.
#[no_mangle]
pub extern "C" fn z_query_drop(this: &mut z_owned_query_t) {
let _: Option<Query> = this.take();
}
#[no_mangle]
pub extern "C" fn z_query_clone(query: Option<&z_query_t>) -> z_owned_query_t {
match query {
Some(query) => unsafe { core::mem::transmute(query.as_ref().map(|q| q.clone())) },
None => z_query_null(),
}
}

/// Options passed to the :c:func:`z_declare_queryable` function.
///
Expand Down Expand Up @@ -210,6 +268,10 @@ pub unsafe extern "C" fn z_query_reply(
len: usize,
options: Option<&z_query_reply_options_t>,
) -> i8 {
let Some(query) = query.as_ref() else {
log::error!("Called `z_query_reply` with invalidated `query`");
return i8::MIN;
};
if let Some(key) = &*key {
let mut s = Sample::new(
key.clone().into_owned(),
Expand All @@ -232,13 +294,19 @@ pub unsafe extern "C" fn z_query_reply(
#[allow(clippy::missing_safety_doc)]
#[no_mangle]
pub extern "C" fn z_query_keyexpr(query: &z_query_t) -> z_keyexpr_t {
let Some(query) = query.as_ref() else {
return z_keyexpr_t::null();
};
query.key_expr().borrowing_clone().into()
}

/// Get a query's `value selector <https://github.com/eclipse-zenoh/roadmap/tree/main/rfcs/ALL/Selectors>`_ by aliasing it.
#[allow(clippy::missing_safety_doc)]
#[no_mangle]
pub extern "C" fn z_query_parameters(query: &z_query_t) -> z_bytes_t {
let Some(query) = query.as_ref() else {
return z_bytes_t::empty();
};
let complement = query.parameters();
z_bytes_t {
start: complement.as_ptr(),
Expand All @@ -252,7 +320,7 @@ pub extern "C" fn z_query_parameters(query: &z_query_t) -> z_bytes_t {
#[allow(clippy::missing_safety_doc)]
#[no_mangle]
pub unsafe extern "C" fn z_query_value(query: &z_query_t) -> z_value_t {
match query.value() {
match query.as_ref().and_then(|q| q.value()) {
Some(value) => {
#[allow(mutable_transmutes)]
if let std::borrow::Cow::Owned(payload) = value.payload.contiguous() {
Expand Down
Loading