Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add source info support #422

Merged
merged 6 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,131 changes: 622 additions & 509 deletions Cargo.lock

Large diffs are not rendered by default.

504 changes: 243 additions & 261 deletions build-resources/opaque-types/Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions build-resources/opaque-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ edition = "2021"
# shared-memory enabled for zenoh even if zenoh-c "shared-memory" feature is disabled. This is to make "std::mem::transmute" work for `ZSLice`
zenoh = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "dev/1.0.0", features = ["shared-memory", "unstable"], default-features = false }
zenoh-ext = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "dev/1.0.0", features = ["unstable"] }
zenoh-protocol = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "dev/1.0.0", features = ["shared-memory"] }
const_format = "0.2.32"
flume = "*"
17 changes: 13 additions & 4 deletions build-resources/opaque-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,19 @@ use zenoh::handlers::DefaultHandler;
use zenoh::handlers::RingChannelHandler;
use zenoh::key_expr::KeyExpr;
use zenoh::liveliness::LivelinessToken;
use zenoh::publication::MatchingListener;
use zenoh::publication::Publisher;
use zenoh::publisher::MatchingListener;
use zenoh::publisher::Publisher;
use zenoh::query::Reply;
use zenoh::queryable::Query;
use zenoh::queryable::Queryable;
use zenoh::sample::Sample;
use zenoh::sample::SourceInfo;
use zenoh::scouting::Hello;
use zenoh::session::Session;
use zenoh::subscriber::Subscriber;
use zenoh::time::Timestamp;
use zenoh::value::Value;
use zenoh_protocol::core::EntityGlobalId;

#[macro_export]
macro_rules! get_opaque_type_data {
Expand Down Expand Up @@ -243,7 +245,6 @@ get_opaque_type_data!(Option<Hello>, z_owned_hello_t);
/// A loaned hello message.
get_opaque_type_data!(Hello, z_loaned_hello_t);


/// An owned Zenoh fifo sample handler.
get_opaque_type_data!(Option<flume::Receiver<Sample>>, z_owned_fifo_handler_sample_t);
/// An loaned Zenoh fifo sample handler.
Expand Down Expand Up @@ -272,4 +273,12 @@ get_opaque_type_data!(flume::Receiver<Reply>, z_loaned_fifo_handler_reply_t);
/// An owned Zenoh ring reply handler.
get_opaque_type_data!(Option<RingChannelHandler<Reply>>, z_owned_ring_handler_reply_t);
/// An loaned Zenoh ring reply handler.
get_opaque_type_data!(RingChannelHandler<Reply>, z_loaned_ring_handler_reply_t);
get_opaque_type_data!(RingChannelHandler<Reply>, z_loaned_ring_handler_reply_t);

/// An owned Zenoh-allocated source info`.
get_opaque_type_data!(Option<SourceInfo>, z_owned_source_info_t);
/// A loaned source info.
get_opaque_type_data!(SourceInfo, z_loaned_source_info_t);

/// An entity gloabal id.
get_opaque_type_data!(EntityGlobalId, z_entity_global_id_t);
69 changes: 69 additions & 0 deletions include/zenoh_commons.h
Original file line number Diff line number Diff line change
Expand Up @@ -387,12 +387,24 @@ typedef struct z_delete_options_t {
*/
bool is_express;
} z_delete_options_t;
/**
* An entity gloabal id.
*/
typedef struct ALIGN(4) z_entity_global_id_t {
uint8_t _0[20];
} z_entity_global_id_t;
/**
* The replies consolidation strategy to apply on replies to a `z_get()`.
*/
typedef struct z_query_consolidation_t {
enum z_consolidation_mode_t mode;
} z_query_consolidation_t;
/**
* An owned Zenoh-allocated source info`.
*/
typedef struct ALIGN(8) z_owned_source_info_t {
uint8_t _0[40];
} z_owned_source_info_t;
/**
* Options passed to the `z_get()` function.
*/
Expand All @@ -413,6 +425,10 @@ typedef struct z_get_options_t {
* An optional encoding of the query payload and or attachment.
*/
struct z_owned_encoding_t *encoding;
/**
* The source info for the query.
*/
struct z_owned_source_info_t *source_info;
/**
* An optional attachment to attach to the query.
*/
Expand All @@ -437,6 +453,10 @@ typedef struct z_publisher_put_options_t {
* The encoding of the data to publish.
*/
struct z_owned_encoding_t *encoding;
/**
* The source info for the publication.
*/
struct z_owned_source_info_t *source_info;
/**
* The attachment to attach to the publication.
*/
Expand All @@ -462,6 +482,10 @@ typedef struct z_put_options_t {
* If true, Zenoh will not wait to batch this operation with others to reduce the bandwith.
*/
bool is_express;
/**
* The source info for the message.
*/
struct z_owned_source_info_t *source_info;
/**
* The attachment to this message.
*/
Expand All @@ -476,6 +500,10 @@ typedef struct z_query_reply_options_t {
* The encoding of the reply payload.
*/
struct z_owned_encoding_t *encoding;
/**
* The source info for the reply.
*/
struct z_owned_source_info_t *source_info;
/**
* The attachment to this reply.
*/
Expand All @@ -491,6 +519,12 @@ typedef struct z_query_reply_err_options_t {
*/
struct z_owned_encoding_t *encoding;
} z_query_reply_err_options_t;
/**
* A loaned source info.
*/
typedef struct ALIGN(8) z_loaned_source_info_t {
uint8_t _0[40];
} z_loaned_source_info_t;
/**
* Options to pass to `z_scout()`.
*/
Expand Down Expand Up @@ -1316,6 +1350,21 @@ ZENOHC_API void z_encoding_null(struct z_owned_encoding_t *this_);
ZENOHC_API
void z_encoding_to_string(const struct z_loaned_encoding_t *this_,
struct z_owned_string_t *out_str);
/**
* Returns the entity id of the entity global id.
*/
ZENOHC_API uint32_t z_entity_global_id_eid(const struct z_entity_global_id_t *this_);
/**
* Create entity global id
*/
ZENOHC_API
z_error_t z_entity_global_id_new(struct z_entity_global_id_t *this_,
const struct z_id_t *zid,
uint32_t eid);
/**
* Returns the zenoh id of entity global id.
*/
ZENOHC_API struct z_id_t z_entity_global_id_zid(const struct z_entity_global_id_t *this_);
/**
* Constructs send and recieve ends of the fifo channel
*/
Expand Down Expand Up @@ -2161,6 +2210,11 @@ ZENOHC_API const struct z_loaned_bytes_t *z_sample_payload(const struct z_loaned
* Returns sample qos priority value.
*/
ZENOHC_API enum z_priority_t z_sample_priority(const struct z_loaned_sample_t *this_);
/**
* Returns the sample source_info.
*/
ZENOHC_API
const struct z_loaned_source_info_t *z_sample_source_info(const struct z_loaned_sample_t *this_);
/**
* Returns the sample timestamp.
*
Expand Down Expand Up @@ -2336,6 +2390,21 @@ ZENOHC_API void z_slice_null(struct z_owned_slice_t *this_);
* @return -1 if `start == NULL` and `len > 0` (creating an empty slice), 0 otherwise.
*/
ZENOHC_API z_error_t z_slice_wrap(struct z_owned_slice_t *this_, const uint8_t *start, size_t len);
/**
* Returns the source_id of the source info.
*/
ZENOHC_API struct z_entity_global_id_t z_source_info_id(const struct z_loaned_source_info_t *this_);
/**
* Create source info
*/
ZENOHC_API
z_error_t z_source_info_new(struct z_owned_source_info_t *this_,
const struct z_entity_global_id_t *source_id,
uint64_t source_sn);
/**
* Returns the source_sn of the source info.
*/
ZENOHC_API uint64_t z_source_info_sn(const struct z_loaned_source_info_t *this_);
/**
* @return ``true`` if the string array is valid, ``false`` if it is in a gravestone state.
*/
Expand Down
73 changes: 73 additions & 0 deletions src/commons.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@ use zenoh::query::ReplyKeyExpr;
use zenoh::sample::Locality;
use zenoh::sample::Sample;
use zenoh::sample::SampleKind;
use zenoh::sample::SourceInfo;
use zenoh::time::Timestamp;
use zenoh::value::Value;
use zenoh_protocol::core::EntityGlobalId;
use zenoh_protocol::zenoh::Consolidation;

/// A zenoh unsigned integer
Expand Down Expand Up @@ -140,6 +142,12 @@ pub extern "C" fn z_sample_attachment(this: &z_loaned_sample_t) -> *const z_loan
}
}

/// Returns the sample source_info.
#[no_mangle]
pub extern "C" fn z_sample_source_info(this: &z_loaned_sample_t) -> &z_loaned_source_info_t {
this.transmute_ref().source_info().transmute_handle()
}

pub use crate::opaque_types::z_owned_sample_t;
decl_transmute_owned!(Option<Sample>, z_owned_sample_t);

Expand Down Expand Up @@ -571,3 +579,68 @@ impl From<z_congestion_control_t> for CongestionControl {
}
}
}

use crate::z_entity_global_id_t;
decl_transmute_copy!(EntityGlobalId, z_entity_global_id_t);

/// Create entity global id
#[no_mangle]
pub extern "C" fn z_entity_global_id_new(
this: &mut z_entity_global_id_t,
zid: &z_id_t,
eid: u32,
) -> errors::z_error_t {
let entity_global_id = EntityGlobalId {
zid: zid.transmute_copy(),
eid,
};
*this = entity_global_id.transmute_copy();
errors::Z_OK
}

/// Returns the zenoh id of entity global id.
#[no_mangle]
pub extern "C" fn z_entity_global_id_zid(this: &z_entity_global_id_t) -> z_id_t {
this.transmute_ref().zid.transmute_copy()
}
/// Returns the entity id of the entity global id.
#[no_mangle]
pub extern "C" fn z_entity_global_id_eid(this: &z_entity_global_id_t) -> u32 {
this.transmute_ref().eid
}
pub use crate::opaque_types::z_loaned_source_info_t;
decl_transmute_handle!(SourceInfo, z_loaned_source_info_t);
pub use crate::opaque_types::z_owned_source_info_t;
decl_transmute_owned!(SourceInfo, z_owned_source_info_t);

validate_equivalence!(z_owned_source_info_t, z_loaned_source_info_t);

/// Create source info
#[no_mangle]
pub extern "C" fn z_source_info_new(
this: *mut MaybeUninit<z_owned_source_info_t>,
source_id: &z_entity_global_id_t,
source_sn: u64,
) -> errors::z_error_t {
let this = this.transmute_uninit_ptr();
let source_info = SourceInfo {
source_id: Some(source_id.transmute_copy()),
source_sn: Some(source_sn),
};
Inplace::init(this, source_info);
errors::Z_OK
}

/// Returns the source_id of the source info.
#[no_mangle]
pub extern "C" fn z_source_info_id(this: &z_loaned_source_info_t) -> z_entity_global_id_t {
match this.transmute_ref().source_id {
Some(source_id) => source_id.transmute_copy(),
None => EntityGlobalId::default().transmute_copy(),
}
}
/// Returns the source_sn of the source info.
#[no_mangle]
pub extern "C" fn z_source_info_sn(this: &z_loaned_source_info_t) -> u64 {
this.transmute_ref().source_sn.unwrap_or(0)
}
11 changes: 11 additions & 0 deletions src/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use crate::z_loaned_sample_t;
use crate::z_loaned_value_t;
use crate::z_owned_bytes_t;
use crate::z_owned_encoding_t;
use crate::z_owned_source_info_t;
use crate::z_query_target_t;
use crate::{
z_closure_reply_call, z_loaned_keyexpr_t, z_loaned_session_t, z_owned_closure_reply_t,
Expand Down Expand Up @@ -103,6 +104,8 @@ pub struct z_get_options_t {
pub payload: *mut z_owned_bytes_t,
/// An optional encoding of the query payload and or attachment.
pub encoding: *mut z_owned_encoding_t,
/// The source info for the query.
pub source_info: *mut z_owned_source_info_t,
/// An optional attachment to attach to the query.
pub attachment: *mut z_owned_bytes_t,
/// The timeout for the query in milliseconds. 0 means default query timeout from zenoh configuration.
Expand All @@ -118,6 +121,7 @@ pub extern "C" fn z_get_options_default(this: &mut z_get_options_t) {
timeout_ms: 0,
payload: null_mut(),
encoding: null_mut(),
source_info: null_mut(),
attachment: null_mut(),
};
}
Expand Down Expand Up @@ -168,6 +172,13 @@ pub unsafe extern "C" fn z_get(
.extract();
get = get.encoding(encoding);
}
if !options.source_info.is_null() {
let source_info = unsafe { options.source_info.as_mut() }
.unwrap()
.transmute_mut()
.extract();
get = get.source_info(source_info);
}
if !options.attachment.is_null() {
let attachment = unsafe { options.attachment.as_mut() }
.unwrap()
Expand Down
11 changes: 11 additions & 0 deletions src/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::transmute::TransmuteIntoHandle;
use crate::transmute::TransmuteRef;
use crate::transmute::TransmuteUninitPtr;
use crate::z_owned_encoding_t;
use crate::z_owned_source_info_t;
use crate::zcu_closure_matching_status_call;
use crate::zcu_closure_matching_status_loan;
use crate::zcu_owned_closure_matching_status_t;
Expand Down Expand Up @@ -136,6 +137,8 @@ pub extern "C" fn z_publisher_loan(this: &z_owned_publisher_t) -> &z_loaned_publ
pub struct z_publisher_put_options_t {
/// The encoding of the data to publish.
pub encoding: *mut z_owned_encoding_t,
/// The source info for the publication.
pub source_info: *mut z_owned_source_info_t,
/// The attachment to attach to the publication.
pub attachment: *mut z_owned_bytes_t,
}
Expand All @@ -146,6 +149,7 @@ pub struct z_publisher_put_options_t {
pub extern "C" fn z_publisher_put_options_default(this: &mut z_publisher_put_options_t) {
*this = z_publisher_put_options_t {
encoding: ptr::null_mut(),
source_info: ptr::null_mut(),
attachment: ptr::null_mut(),
}
}
Expand Down Expand Up @@ -186,6 +190,13 @@ pub unsafe extern "C" fn z_publisher_put(
.extract();
put = put.encoding(encoding);
};
if !options.source_info.is_null() {
let source_info = unsafe { options.source_info.as_mut() }
.unwrap()
.transmute_mut()
.extract();
put = put.source_info(source_info);
}
if !options.attachment.is_null() {
let attachment = unsafe { options.attachment.as_mut() }
.unwrap()
Expand Down
10 changes: 10 additions & 0 deletions src/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ pub struct z_put_options_t {
pub priority: z_priority_t,
/// If true, Zenoh will not wait to batch this operation with others to reduce the bandwith.
pub is_express: bool,
/// The source info for the message.
pub source_info: *mut z_owned_source_info_t,
/// The attachment to this message.
pub attachment: *mut z_owned_bytes_t,
}
Expand All @@ -53,6 +55,7 @@ pub extern "C" fn z_put_options_default(this: &mut z_put_options_t) {
congestion_control: CongestionControl::default().into(),
priority: Priority::default().into(),
is_express: false,
source_info: null_mut(),
attachment: null_mut(),
};
}
Expand Down Expand Up @@ -92,6 +95,13 @@ pub extern "C" fn z_put(
.extract();
put = put.encoding(encoding);
};
if !options.source_info.is_null() {
let source_info = unsafe { options.source_info.as_mut() }
.unwrap()
.transmute_mut()
.extract();
put = put.source_info(source_info);
};
if !options.attachment.is_null() {
let attachment = unsafe { options.attachment.as_mut() }
.unwrap()
Expand Down
Loading
Loading