Skip to content

Commit

Permalink
Merge pull request #431 from ZettaScaleLabs/query_reply_options_update
Browse files Browse the repository at this point in the history
Add missing properties to z_query_reply_options_t
  • Loading branch information
milyin authored Jun 7, 2024
2 parents 6c1df6c + f6aa07d commit 51affda
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 8 deletions.
16 changes: 16 additions & 0 deletions include/zenoh_commons.h
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,22 @@ typedef struct z_query_reply_options_t {
* The encoding of the reply payload.
*/
struct z_owned_encoding_t *encoding;
/**
* The congestion control to apply when routing the reply.
*/
enum z_congestion_control_t congestion_control;
/**
* The priority of the reply.
*/
enum z_priority_t priority;
/**
* If true, Zenoh will not wait to batch this operation with others to reduce the bandwith.
*/
bool is_express;
/**
* The timestamp of the reply.
*/
struct z_timestamp_t *timestamp;
/**
* The source info for the reply.
*/
Expand Down
34 changes: 30 additions & 4 deletions src/queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,22 @@ use crate::transmute::{
TransmuteUninitPtr,
};
use crate::{
errors, z_closure_query_call, z_closure_query_loan, z_loaned_bytes_t, z_loaned_keyexpr_t,
z_loaned_session_t, z_loaned_value_t, z_owned_bytes_t, z_owned_closure_query_t,
z_owned_encoding_t, z_owned_source_info_t, z_view_string_from_substring, z_view_string_t,
errors, z_closure_query_call, z_closure_query_loan, z_congestion_control_t, z_loaned_bytes_t,
z_loaned_keyexpr_t, z_loaned_session_t, z_loaned_value_t, z_owned_bytes_t,
z_owned_closure_query_t, z_owned_encoding_t, z_owned_source_info_t, z_priority_t,
z_timestamp_t, z_view_string_from_substring, z_view_string_t,
};
use std::mem::MaybeUninit;
use std::ptr::null_mut;
use zenoh::core::Wait;
use zenoh::encoding::Encoding;
use zenoh::prelude::SessionDeclarations;
use zenoh::publisher::CongestionControl;
use zenoh::publisher::Priority;
use zenoh::queryable::{Query, Queryable};
use zenoh::sample::{SampleBuilderTrait, ValueBuilderTrait};
use zenoh::sample::{
QoSBuilderTrait, SampleBuilderTrait, TimestampBuilderTrait, ValueBuilderTrait,
};

pub use crate::opaque_types::z_owned_queryable_t;
decl_transmute_owned!(Option<Queryable<'static, ()>>, z_owned_queryable_t);
Expand Down Expand Up @@ -110,6 +115,14 @@ pub extern "C" fn z_queryable_options_default(this: &mut z_queryable_options_t)
pub struct z_query_reply_options_t {
/// The encoding of the reply payload.
pub encoding: *mut z_owned_encoding_t,
/// The congestion control to apply when routing the reply.
pub congestion_control: z_congestion_control_t,
/// The priority of the reply.
pub priority: z_priority_t,
/// If true, Zenoh will not wait to batch this operation with others to reduce the bandwith.
pub is_express: bool,
/// The timestamp of the reply.
pub timestamp: *mut z_timestamp_t,
/// The source info for the reply.
pub source_info: *mut z_owned_source_info_t,
/// The attachment to this reply.
Expand All @@ -122,6 +135,10 @@ pub struct z_query_reply_options_t {
pub extern "C" fn z_query_reply_options_default(this: &mut z_query_reply_options_t) {
*this = z_query_reply_options_t {
encoding: null_mut(),
congestion_control: CongestionControl::default().into(),
priority: Priority::default().into(),
is_express: false,
timestamp: null_mut(),
source_info: null_mut(),
attachment: null_mut(),
};
Expand Down Expand Up @@ -256,6 +273,15 @@ pub extern "C" fn z_query_reply(
let attachment = attachment.transmute_mut().extract();
reply = reply.attachment(attachment);
}
if !options.timestamp.is_null() {
let timestamp = *unsafe { options.timestamp.as_mut() }
.unwrap()
.transmute_ref();
reply = reply.timestamp(Some(timestamp));
}
reply = reply.priority(options.priority.into());
reply = reply.congestion_control(options.congestion_control.into());
reply = reply.express(options.is_express);
}

if let Err(e) = reply.wait() {
Expand Down
68 changes: 64 additions & 4 deletions tests/z_int_queryable_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,40 @@ const char *const keyexpr = "test/key";
const char *const values[] = {"test_value_1", "test_value_2", "test_value_3"};
const size_t values_count = sizeof(values) / sizeof(values[0]);

const uint32_t TEST_EID = 42;
const uint64_t TEST_SN = 24;
const uint64_t TEST_TS = 401706000;
const uint8_t TEST_ID = 123;

void query_handler(const z_loaned_query_t *query, void *context) {
static int value_num = 0;

z_view_string_t params;
z_query_parameters(query, &params);
const z_loaned_value_t* payload_value = z_query_value(query);
const z_loaned_value_t *payload_value = z_query_value(query);

z_query_reply_options_t options;
z_query_reply_options_default(&options);


z_id_t self_id;
self_id.id[0] = TEST_ID;

z_entity_global_id_t entity_global_id;
z_entity_global_id_new(&entity_global_id, &self_id, TEST_EID);
z_owned_source_info_t source_info;
z_source_info_new(&source_info, &entity_global_id, TEST_SN);

z_timestamp_t ts;
z_timestamp_new(&ts, &self_id, TEST_TS + value_num);

options.source_info = &source_info;
options.timestamp = &ts;

z_owned_bytes_t payload;
z_bytes_encode_from_string(&payload, values[value_num]);

z_view_keyexpr_t reply_ke;
z_view_keyexpr_from_string(&reply_ke, (const char*)context);
z_view_keyexpr_from_string(&reply_ke, (const char *)context);
z_query_reply(query, z_loan(reply_ke), z_move(payload), &options);

if (++value_num == values_count) {
Expand Down Expand Up @@ -100,7 +119,7 @@ int run_get() {
for (z_recv(z_loan(handler), &reply); z_check(reply); z_recv(z_loan(handler), &reply)) {
assert(z_reply_is_ok(z_loan(reply)));

const z_loaned_sample_t* sample = z_reply_ok(z_loan(reply));
const z_loaned_sample_t *sample = z_reply_ok(z_loan(reply));
z_owned_string_t payload_string;
z_bytes_decode_into_string(z_sample_payload(sample), &payload_string);
if (strncmp(values[val_num], z_string_data(z_loan(payload_string)), z_string_len(z_loan(payload_string)))) {
Expand All @@ -109,6 +128,47 @@ int run_get() {
exit(-1);
}

const z_loaned_source_info_t *source_info = z_sample_source_info(sample);
if (source_info == NULL) {
perror("Unexpected null source_info");
exit(-1);
}
const uint64_t sn = z_source_info_sn(source_info);
if (sn != TEST_SN) {
perror("Unexpected sn value");
exit(-1);
}
const z_entity_global_id_t id = z_source_info_id(source_info);
uint32_t eid = z_entity_global_id_eid(&id);
if (eid != TEST_EID) {
perror("Unexpected eid value");
exit(-1);
}

const z_timestamp_t *ts = z_sample_timestamp(sample);
if (ts == NULL) {
perror("Unexpected null timestamp");
exit(-1);
}
const uint64_t time = z_timestamp_npt64_time(ts);
if (time != TEST_TS + val_num) {
perror("Unexpected timestamp value");
exit(-1);
}

z_id_t ts_id = z_timestamp_id(ts);
z_id_t gloabl_id = z_entity_global_id_zid(&id);

if (memcmp(ts_id.id, gloabl_id.id, sizeof(ts_id.id)) != 0) {
perror("Timestamp id and global id differ");
exit(-1);
}

if (ts_id.id[0] != TEST_ID) {
perror("Unexpected id value");
exit(-1);
}

z_drop(z_move(payload_string));
z_drop(z_move(reply));
}
Expand Down

0 comments on commit 51affda

Please sign in to comment.