Skip to content

Commit

Permalink
Merge commit '85f6c655564fb73e1151987430cb95318f4eca26'
Browse files Browse the repository at this point in the history
Conflicts:
	Cargo.toml.in
  • Loading branch information
yellowhatter committed May 27, 2024
2 parents a513342 + 85f6c65 commit 92ac140
Show file tree
Hide file tree
Showing 12 changed files with 49 additions and 50 deletions.
4 changes: 2 additions & 2 deletions src/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::z_query_target_t;
use crate::{
z_closure_reply_call, z_loaned_keyexpr_t, z_loaned_session_t, z_owned_closure_reply_t,
};
use zenoh::prelude::SyncResolve;
use::zenoh::core::Wait;

pub use crate::opaque_types::z_owned_reply_t;
decl_transmute_owned!(Option<Reply>, z_owned_reply_t);
Expand Down Expand Up @@ -177,7 +177,7 @@ pub unsafe extern "C" fn z_get(
match get
.callback(move |response|
z_closure_reply_call(z_closure_reply_loan(&closure), response.transmute_handle()))
.res_sync()
.wait()
{
Ok(()) => errors::Z_OK,
Err(e) => {
Expand Down
10 changes: 5 additions & 5 deletions src/info.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::transmute::{TransmuteCopy, TransmuteFromHandle};
//
// Copyright (c) 2017, 2022 ZettaScale Technology.
//
Expand All @@ -12,11 +11,12 @@ use crate::transmute::{TransmuteCopy, TransmuteFromHandle};
// Contributors:
// ZettaScale Zenoh team, <[email protected]>
//
use crate::transmute::{TransmuteCopy, TransmuteFromHandle};
use crate::{errors, z_closure_zid_call, z_closure_zid_loan, z_loaned_session_t, z_owned_closure_zid_t};
use std::mem::MaybeUninit;
use zenoh::config::ZenohId;
use zenoh::prelude::sync::SyncResolve;
use zenoh::session::SessionDeclarations;
use zenoh::core::Wait;

pub use crate::opaque_types::z_id_t;
decl_transmute_copy!(ZenohId, z_id_t);
Expand All @@ -36,7 +36,7 @@ impl From<[u8; 16]> for z_id_t {
#[no_mangle]
pub unsafe extern "C" fn z_info_zid(session: &z_loaned_session_t) -> z_id_t {
let session = session.transmute_ref();
session.info().zid().res_sync().transmute_copy()
session.info().zid().wait().transmute_copy()
}

/// Fetches the Zenoh IDs of all connected peers.
Expand All @@ -54,7 +54,7 @@ pub unsafe extern "C" fn z_info_peers_zid(
let mut closure = z_owned_closure_zid_t::empty();
std::mem::swap(&mut closure, callback);
let session = session.transmute_ref();
for id in session.info().peers_zid().res_sync() {
for id in session.info().peers_zid().wait() {
z_closure_zid_call(z_closure_zid_loan(&closure), &id.transmute_copy());
}
errors::Z_OK
Expand All @@ -75,7 +75,7 @@ pub unsafe extern "C" fn z_info_routers_zid(
let mut closure = z_owned_closure_zid_t::empty();
std::mem::swap(&mut closure, callback);
let session = session.transmute_ref();
for id in session.info().routers_zid().res_sync() {
for id in session.info().routers_zid().wait() {
z_closure_zid_call(z_closure_zid_loan(&closure), &id.transmute_copy());
}
errors::Z_OK
Expand Down
7 changes: 3 additions & 4 deletions src/keyexpr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
// Contributors:
// ZettaScale Zenoh team, <[email protected]>
//

use std::mem::MaybeUninit;

use crate::errors;
Expand All @@ -28,11 +27,11 @@ use crate::z_view_str_from_substring;
use crate::z_view_str_t;
use libc::c_char;
use std::error::Error;
use zenoh::core::SyncResolve;
use zenoh::key_expr::SetIntersectionLevel;
use zenoh::prelude::keyexpr;
use zenoh::prelude::KeyExpr;
use zenoh_protocol::core::key_expr::canon::Canonizable;
use zenoh::core::Wait;

pub use crate::opaque_types::z_owned_keyexpr_t;
pub use crate::opaque_types::z_view_keyexpr_t;
Expand Down Expand Up @@ -447,7 +446,7 @@ pub extern "C" fn z_declare_keyexpr(
let this = this.transmute_uninit_ptr();
let key_expr = key_expr.transmute_ref();
let session = session.transmute_ref();
match session.declare_keyexpr(key_expr).res_sync() {
match session.declare_keyexpr(key_expr).wait() {
Ok(id) => {
Inplace::init(this, Some(id.into_owned()));
errors::Z_OK
Expand All @@ -473,7 +472,7 @@ pub extern "C" fn z_undeclare_keyexpr(
return errors::Z_EINVAL;
};
let session = session.transmute_ref();
match session.undeclare(kexpr).res() {
match session.undeclare(kexpr).wait() {
Ok(()) => errors::Z_OK,
Err(e) => {
log::debug!("{}", e);
Expand Down
10 changes: 5 additions & 5 deletions src/liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
//

use std::mem::MaybeUninit;
use zenoh::prelude::SyncResolve;
use zenoh::{
liveliness::{Liveliness, LivelinessToken},
prelude::SessionDeclarations,
Expand All @@ -27,6 +26,7 @@ use crate::{
z_closure_reply_call, z_closure_sample_call, z_loaned_keyexpr_t, z_loaned_session_t,
z_owned_closure_reply_t, z_owned_closure_sample_t, z_owned_subscriber_t,
};
use zenoh::core::Wait;

use crate::opaque_types::zc_loaned_liveliness_token_t;
use crate::opaque_types::zc_owned_liveliness_token_t;
Expand Down Expand Up @@ -89,7 +89,7 @@ pub extern "C" fn zc_liveliness_declare_token(
let this = this.transmute_uninit_ptr();
let session = session.transmute_ref();
let key_expr = key_expr.transmute_ref();
match session.liveliness().declare_token(key_expr).res() {
match session.liveliness().declare_token(key_expr).wait() {
Ok(token) => {
Inplace::init(this, Some(token));
errors::Z_OK
Expand All @@ -109,7 +109,7 @@ pub extern "C" fn zc_liveliness_undeclare_token(
) -> errors::z_error_t {
let this = this.transmute_mut();
if let Some(token) = this.extract().take() {
if let Err(e) = token.undeclare().res() {
if let Err(e) = token.undeclare().wait() {
log::error!("Failed to undeclare token: {e}");
return errors::Z_EGENERIC;
}
Expand Down Expand Up @@ -159,7 +159,7 @@ pub extern "C" fn zc_liveliness_declare_subscriber(
let sample = sample.transmute_handle();
z_closure_sample_call(z_closure_sample_loan(&callback), sample)
})
.res()
.wait()
{
Ok(subscriber) => {
Inplace::init(this, Some(subscriber));
Expand Down Expand Up @@ -208,7 +208,7 @@ pub extern "C" fn zc_liveliness_get(
if let Some(options) = options {
builder = builder.timeout(core::time::Duration::from_millis(options.timeout_ms as u64));
}
match builder.res() {
match builder.wait() {
Ok(()) => errors::Z_OK,
Err(e) => {
log::error!("Failed to subscribe to liveliness: {e}");
Expand Down
6 changes: 3 additions & 3 deletions src/publication_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@

use std::mem::MaybeUninit;
use std::ptr::null;
use zenoh::prelude::SyncResolve;

use zenoh_ext::SessionExt;

use crate::transmute::{Inplace, TransmuteFromHandle, TransmuteRef, TransmuteUninitPtr};
use crate::{errors, z_loaned_keyexpr_t, z_loaned_session_t, zcu_locality_default, zcu_locality_t};
use zenoh::core::Wait;

/// Options passed to the `ze_declare_publication_cache()` function.
#[repr(C)]
Expand Down Expand Up @@ -93,7 +93,7 @@ pub extern "C" fn ze_declare_publication_cache(
p = p.queryable_prefix(queryable_prefix.clone());
}
}
match p.res_sync() {
match p.wait() {
Ok(publication_cache) => {
Inplace::init(this, Some(publication_cache));
errors::Z_OK
Expand Down Expand Up @@ -129,7 +129,7 @@ pub extern "C" fn ze_undeclare_publication_cache(
this: &mut ze_owned_publication_cache_t,
) -> errors::z_error_t {
if let Some(p) = this.transmute_mut().extract().take() {
if let Err(e) = p.close().res_sync() {
if let Err(e) = p.close().wait() {
log::error!("{}", e);
return errors::Z_EGENERIC;
}
Expand Down
13 changes: 6 additions & 7 deletions src/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ use zenoh::sample::QoSBuilderTrait;
use zenoh::sample::SampleBuilderTrait;
use zenoh::sample::ValueBuilderTrait;
use zenoh::{prelude::Priority, publication::MatchingListener, publication::Publisher};

use zenoh::prelude::SyncResolve;
use zenoh::core::Wait;

use crate::{
z_congestion_control_t, z_loaned_keyexpr_t, z_loaned_session_t, z_owned_bytes_t, z_priority_t,
Expand Down Expand Up @@ -96,7 +95,7 @@ pub extern "C" fn z_declare_publisher(
.priority(options.priority.into())
.express(options.is_express);
}
match p.res_sync() {
match p.wait() {
Err(e) => {
log::error!("{}", e);
Inplace::empty(this);
Expand Down Expand Up @@ -190,7 +189,7 @@ pub unsafe extern "C" fn z_publisher_put(
}
}

if let Err(e) = put.res_sync() {
if let Err(e) = put.wait() {
log::error!("{}", e);
errors::Z_EGENERIC
} else {
Expand Down Expand Up @@ -221,7 +220,7 @@ pub extern "C" fn z_publisher_delete(
_options: Option<&z_publisher_delete_options_t>,
) -> errors::z_error_t {
let publisher = publisher.transmute_ref();
if let Err(e) = publisher.delete().res_sync() {
if let Err(e) = publisher.delete().wait() {
log::error!("{}", e);
errors::Z_EGENERIC
} else {
Expand Down Expand Up @@ -277,7 +276,7 @@ pub extern "C" fn zcu_publisher_matching_listener_callback(
};
zcu_closure_matching_status_call(zcu_closure_matching_status_loan(&closure), &status);
})
.res();
.wait();
match listener {
Ok(_) => {
Inplace::empty(this);
Expand All @@ -297,7 +296,7 @@ pub extern "C" fn zcu_publisher_matching_listener_callback(
#[allow(clippy::missing_safety_doc)]
pub extern "C" fn z_undeclare_publisher(this: &mut z_owned_publisher_t) -> errors::z_error_t {
if let Some(p) = this.transmute_mut().extract().take() {
if let Err(e) = p.undeclare().res_sync() {
if let Err(e) = p.undeclare().wait() {
log::error!("{}", e);
return errors::Z_EGENERIC;
}
Expand Down
7 changes: 4 additions & 3 deletions src/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ use crate::transmute::TransmuteFromHandle;
use crate::transmute::TransmuteRef;
use crate::z_loaned_session_t;
use crate::z_owned_bytes_t;
use zenoh::prelude::{sync::SyncResolve, Priority};
use zenoh::prelude::Priority;
use zenoh::publication::CongestionControl;
use zenoh::sample::QoSBuilderTrait;
use zenoh::sample::SampleBuilderTrait;
use zenoh::sample::ValueBuilderTrait;
use zenoh::core::Wait;

/// Options passed to the `z_put()` function.
#[repr(C)]
Expand Down Expand Up @@ -97,7 +98,7 @@ pub extern "C" fn z_put(
put = put.express(options.is_express);
}

if let Err(e) = put.res_sync() {
if let Err(e) = put.wait() {
log::error!("{}", e);
errors::Z_EGENERIC
} else {
Expand Down Expand Up @@ -152,7 +153,7 @@ pub extern "C" fn z_delete(
.express(options.is_express);
}

match del.res_sync() {
match del.wait() {
Err(e) => {
log::error!("{}", e);
errors::Z_EGENERIC
Expand Down
10 changes: 5 additions & 5 deletions src/queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ use std::mem::MaybeUninit;
use std::ptr::null_mut;
use zenoh::encoding::Encoding;
use zenoh::prelude::SessionDeclarations;
use zenoh::prelude::SyncResolve;
use zenoh::prelude::{Query, Queryable};
use zenoh::sample::{SampleBuilderTrait, ValueBuilderTrait};
use zenoh::value::Value;
use zenoh::core::Wait;

pub use crate::opaque_types::z_owned_queryable_t;
decl_transmute_owned!(Option<Queryable<'static, ()>>, z_owned_queryable_t);
Expand Down Expand Up @@ -170,7 +170,7 @@ pub extern "C" fn z_declare_queryable(
}
let queryable = builder
.callback(move |query| z_closure_query_call(z_closure_query_loan(&closure), query.transmute_handle()))
.res_sync();
.wait();
match queryable {
Ok(q) => {
Inplace::init(this, Some(q));
Expand All @@ -190,7 +190,7 @@ pub extern "C" fn z_declare_queryable(
#[no_mangle]
pub extern "C" fn z_undeclare_queryable(this: &mut z_owned_queryable_t) -> errors::z_error_t {
if let Some(qable) = this.transmute_mut().extract().take() {
if let Err(e) = qable.undeclare().res_sync() {
if let Err(e) = qable.undeclare().wait() {
log::error!("{}", e);
return errors::Z_EGENERIC;
}
Expand Down Expand Up @@ -255,7 +255,7 @@ pub unsafe extern "C" fn z_query_reply(
}
}

if let Err(e) = reply.res_sync() {
if let Err(e) = reply.wait() {
log::error!("{}", e);
return errors::Z_EGENERIC;
}
Expand Down Expand Up @@ -299,7 +299,7 @@ pub unsafe extern "C" fn z_query_reply_err(
);
let reply = query.reply_err(value);

if let Err(e) = reply.res_sync() {
if let Err(e) = reply.wait() {
log::error!("{}", e);
return errors::Z_EGENERIC;
}
Expand Down
12 changes: 6 additions & 6 deletions src/querying_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ use crate::{
z_query_consolidation_t, z_query_target_default, z_query_target_t, zcu_locality_default,
zcu_locality_t, zcu_reply_keyexpr_default, zcu_reply_keyexpr_t,
};
use zenoh::prelude::sync::SyncResolve;
use zenoh::prelude::SessionDeclarations;
use zenoh::session::Session;
use zenoh::subscriber::Reliability;
use zenoh_ext::*;
use zenoh::core::Wait;

use crate::opaque_types::ze_loaned_querying_subscriber_t;
use crate::opaque_types::ze_owned_querying_subscriber_t;
Expand Down Expand Up @@ -145,7 +145,7 @@ pub unsafe extern "C" fn ze_declare_querying_subscriber(
let sample = sample.transmute_handle();
z_closure_sample_call(z_closure_sample_loan(&closure), sample);
});
match sub.res() {
match sub.wait() {
Ok(sub) => {
Inplace::init(this, Some((sub, session)));
errors::Z_OK
Expand Down Expand Up @@ -182,11 +182,11 @@ pub unsafe extern "C" fn ze_querying_subscriber_get(
.consolidation(options.consolidation)
.timeout(std::time::Duration::from_millis(options.timeout_ms))
.callback(cb)
.res_sync(),
None => session.get(selector).callback(cb).res_sync(),
.wait(),
None => session.get(selector).callback(cb).wait(),
}
})
.res()
.wait()
{
log::debug!("{}", e);
return errors::Z_EGENERIC;
Expand All @@ -202,7 +202,7 @@ pub extern "C" fn ze_undeclare_querying_subscriber(
this: &mut ze_owned_querying_subscriber_t,
) -> errors::z_error_t {
if let Some(s) = this.transmute_mut().extract().take() {
if let Err(e) = s.0.close().res_sync() {
if let Err(e) = s.0.close().wait() {
log::error!("{}", e);
return errors::Z_EGENERIC;
}
Expand Down
2 changes: 0 additions & 2 deletions src/scouting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use libc::c_ulong;
use std::mem::MaybeUninit;
use zenoh::scouting::Hello;
use zenoh_protocol::core::{whatami::WhatAmIMatcher, WhatAmI};
use zenoh_util::core::AsyncResolve;

pub use crate::opaque_types::z_loaned_hello_t;
pub use crate::opaque_types::z_owned_hello_t;
Expand Down Expand Up @@ -167,7 +166,6 @@ pub extern "C" fn z_scout(
task::block_on(async move {
let scout = zenoh::scout(what, config)
.callback(move |h| z_closure_hello_call(z_closure_hello_loan(&closure), h.transmute_handle()))
.res_async()
.await
.unwrap();
async_std::task::sleep(std::time::Duration::from_millis(timeout)).await;
Expand Down
Loading

0 comments on commit 92ac140

Please sign in to comment.