Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add z_query_reply_del #515

Merged
merged 5 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion examples/unix/c11/z_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
static z_owned_condvar_t cond;
static z_owned_mutex_t mutex;

const char *kind_to_str(z_sample_kind_t kind);

void reply_dropper(void *ctx) {
(void)(ctx);
printf(">> Received query final notification\n");
Expand All @@ -38,7 +40,8 @@ void reply_handler(const z_loaned_reply_t *reply, void *ctx) {
z_owned_string_t replystr;
z_bytes_deserialize_into_string(z_sample_payload(sample), &replystr);

printf(">> Received ('%s': '%s')\n", z_string_data(z_loan(keystr)), z_string_data(z_loan(replystr)));
printf(">> Received %s ('%s': '%s')\n", kind_to_str(z_sample_kind(sample)), z_string_data(z_loan(keystr)),
z_string_data(z_loan(replystr)));
z_drop(z_move(keystr));
z_drop(z_move(replystr));
} else {
Expand Down Expand Up @@ -143,6 +146,17 @@ int main(int argc, char **argv) {
z_close(z_move(s));
return 0;
}

const char *kind_to_str(z_sample_kind_t kind) {
switch (kind) {
case Z_SAMPLE_KIND_PUT:
return "PUT";
case Z_SAMPLE_KIND_DELETE:
return "DELETE";
default:
return "UNKNOWN";
}
}
#else
int main(void) {
printf(
Expand Down
18 changes: 16 additions & 2 deletions examples/unix/c11/z_queryable.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
const char *keyexpr = "demo/example/zenoh-pico-queryable";
const char *value = "Queryable from Pico!";
static int msg_nb = 0;
static z_sample_kind_t reply_kind = Z_SAMPLE_KIND_PUT;

void query_handler(const z_loaned_query_t *query, void *ctx) {
(void)(ctx);
Expand All @@ -43,7 +44,17 @@ void query_handler(const z_loaned_query_t *query, void *ctx) {
z_owned_bytes_t reply_payload;
z_bytes_serialize_from_str(&reply_payload, value);

z_query_reply(query, z_query_keyexpr(query), z_move(reply_payload), NULL);
switch (reply_kind) {
case Z_SAMPLE_KIND_PUT:
z_query_reply(query, z_query_keyexpr(query), z_move(reply_payload), NULL);
break;
case Z_SAMPLE_KIND_DELETE:
z_query_reply_del(query, z_query_keyexpr(query), NULL);
break;
default:
printf("Unknown reply kind\n");
break;
}
z_drop(z_move(keystr));
msg_nb++;
}
Expand All @@ -55,7 +66,7 @@ int main(int argc, char **argv) {
int n = 0;

int opt;
while ((opt = getopt(argc, argv, "k:e:m:v:l:n:")) != -1) {
while ((opt = getopt(argc, argv, "k:e:m:v:l:n:d")) != -1) {
switch (opt) {
case 'k':
keyexpr = optarg;
Expand All @@ -75,6 +86,9 @@ int main(int argc, char **argv) {
case 'n':
n = atoi(optarg);
break;
case 'd':
reply_kind = Z_SAMPLE_KIND_DELETE;
break;
case '?':
if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'v' || optopt == 'l' ||
optopt == 'n') {
Expand Down
34 changes: 31 additions & 3 deletions include/zenoh-pico/api/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -1639,14 +1639,40 @@ void z_query_reply_options_default(z_query_reply_options_t *options);
* query: Pointer to a :c:type:`z_loaned_query_t` to reply.
* keyexpr: Pointer to a :c:type:`z_loaned_keyexpr_t` to bind the reply with.
* payload: Pointer to the reply data.
* payload_len: The length of the payload.
* options: Pointer to a :c:type:`z_query_reply_options_t` to configure the reply.
*
* Return:
* ``0`` if reply operation successful, ``negative value`` otherwise.
*/
int8_t z_query_reply(const z_loaned_query_t *query, const z_loaned_keyexpr_t *keyexpr, z_owned_bytes_t *payload,
const z_query_reply_options_t *options);

/**
* Builds a :c:type:`z_query_reply_del_options_t` with default values.
*
* Parameters:
* options: Pointer to an uninitialized :c:type:`z_query_reply_del_options_t`.
*/
void z_query_reply_del_options_default(z_query_reply_del_options_t *options);

/**
* Sends a reply delete to a query.
*
* This function must be called inside of a :c:type:`z_owned_closure_query_t` callback associated to the
* :c:type:`z_owned_queryable_t`, passing the received query as parameters of the callback function. This function can
* be called multiple times to send multiple replies to a query. The reply will be considered complete when the callback
* returns.
*
* Parameters:
* query: Pointer to a :c:type:`z_loaned_query_t` to reply.
* keyexpr: Pointer to a :c:type:`z_loaned_keyexpr_t` to bind the reply with.
* options: Pointer to a :c:type:`z_query_reply_del_options_t` to configure the reply.
*
* Return:
* ``0`` if reply operation successful, ``negative value`` otherwise.
*/
int8_t z_query_reply_del(const z_loaned_query_t *query, const z_loaned_keyexpr_t *keyexpr,
const z_query_reply_del_options_t *options);
#endif

/**
Expand Down Expand Up @@ -1751,7 +1777,8 @@ void zp_task_read_options_default(zp_task_read_options_t *options);
/**
* Starts a task to read from the network and process the received messages.
*
* Note that the task can be implemented in form of thread, process, etc. and its implementation is platform-dependent.
* Note that the task can be implemented in form of thread, process, etc. and its implementation is
* platform-dependent.
*
* Parameters:
* zs: Pointer to a :c:type:`z_loaned_session_t` to start the task from.
Expand Down Expand Up @@ -1788,7 +1815,8 @@ void zp_task_lease_options_default(zp_task_lease_options_t *options);
*
* This task will send ``KeepAlive`` messages when needed and will close the session when the lease is expired.
* When operating over a multicast transport, it also periodically sends the ``Join`` messages.
* Note that the task can be implemented in form of thread, process, etc. and its implementation is platform-dependent.
* Note that the task can be implemented in form of thread, process, etc. and its implementation is
* platform-dependent.
*
* Parameters:
* zs: Pointer to a :c:type:`z_loaned_session_t` to start the task from.
Expand Down
18 changes: 18 additions & 0 deletions include/zenoh-pico/api/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,24 @@ typedef struct {
z_owned_bytes_t *attachment;
} z_query_reply_options_t;

/**
* Represents the configuration used to configure a query reply delete sent via :c:func:`z_query_reply_del.
*
* Members:
* z_congestion_control_t congestion_control: The congestion control to apply when routing this message.
* z_priority_t priority: The priority of this message when routed.
* z_timestamp_t *timestamp: The API level timestamp (e.g. of the data when it was created).
* _Bool is_express: If true, Zenoh will not wait to batch this operation with others to reduce the bandwith.
* z_owned_bytes_t *attachment: An optional attachment to the response.
*/
typedef struct {
z_congestion_control_t congestion_control;
z_priority_t priority;
z_timestamp_t *timestamp;
_Bool is_express;
z_owned_bytes_t *attachment;
} z_query_reply_del_options_t;

/**
* Represents the configuration used to configure a put operation sent via via :c:func:`z_put`.
*
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/protocol/definitions/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ typedef struct {

typedef struct {
_z_m_push_commons_t _commons;
_z_bytes_t _attachment;
} _z_msg_del_t;
static inline void _z_msg_del_clear(_z_msg_del_t *del) { (void)del; }
#define _Z_M_DEL_ID 0x02
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/session/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ _z_pending_query_t *_z_get_pending_query_by_id(_z_session_t *zn, const _z_zint_t

int8_t _z_register_pending_query(_z_session_t *zn, _z_pending_query_t *pq);
int8_t _z_trigger_query_reply_partial(_z_session_t *zn, _z_zint_t reply_context, const _z_keyexpr_t keyexpr,
_z_msg_put_t *msg);
_z_msg_put_t *msg, z_sample_kind_t kind);
int8_t _z_trigger_query_reply_final(_z_session_t *zn, _z_zint_t id);
void _z_unregister_pending_query(_z_session_t *zn, _z_pending_query_t *pq);
void _z_flush_pending_queries(_z_session_t *zn);
Expand Down
30 changes: 29 additions & 1 deletion src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -1205,8 +1205,8 @@ int8_t z_undeclare_queryable(z_owned_queryable_t *queryable) {

void z_query_reply_options_default(z_query_reply_options_t *options) {
options->encoding = NULL;
options->congestion_control = Z_CONGESTION_CONTROL_DEFAULT;
options->priority = Z_PRIORITY_DEFAULT;
options->encoding = NULL;
options->timestamp = NULL;
options->is_express = false;
options->attachment = NULL;
Expand Down Expand Up @@ -1236,6 +1236,34 @@ int8_t z_query_reply(const z_loaned_query_t *query, const z_loaned_keyexpr_t *ke
z_bytes_drop(opts.attachment);
return ret;
}

void z_query_reply_del_options_default(z_query_reply_del_options_t *options) {
options->congestion_control = Z_CONGESTION_CONTROL_DEFAULT;
options->priority = Z_PRIORITY_DEFAULT;
options->timestamp = NULL;
options->is_express = false;
options->attachment = NULL;
}

int8_t z_query_reply_del(const z_loaned_query_t *query, const z_loaned_keyexpr_t *keyexpr,
const z_query_reply_del_options_t *options) {
_z_keyexpr_t keyexpr_aliased = _z_keyexpr_alias_from_user_defined(*keyexpr, true);
z_query_reply_del_options_t opts;
if (options == NULL) {
z_query_reply_del_options_default(&opts);
} else {
opts = *options;
}

_z_value_t value = {.payload = _z_bytes_null(), .encoding = _z_encoding_null()};

int8_t ret =
_z_send_reply(&query->in->val, keyexpr_aliased, value, Z_SAMPLE_KIND_DELETE, opts.congestion_control,
opts.priority, opts.is_express, opts.timestamp, _z_bytes_from_owned_bytes(opts.attachment));

z_bytes_drop(opts.attachment);
return ret;
}
#endif

int8_t z_keyexpr_from_str(z_owned_keyexpr_t *key, const char *name) {
Expand Down
82 changes: 62 additions & 20 deletions src/net/primitives.c
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t p
},
};
break;
default:
return _Z_ERR_GENERIC;
}

if (_z_send_n_msg(zn, &msg, Z_RELIABILITY_RELIABLE, cong_ctrl) != _Z_RES_OK) {
Expand Down Expand Up @@ -331,28 +333,68 @@ int8_t _z_send_reply(const _z_query_t *query, _z_keyexpr_t keyexpr, const _z_val
_z_keyexpr_t ke = _z_keyexpr_alias(keyexpr);
_z_zenoh_message_t z_msg;
switch (kind) {
default:
return _Z_ERR_GENERIC;
break;
case Z_SAMPLE_KIND_PUT:
z_msg._tag = _Z_N_RESPONSE;
z_msg._body._response._request_id = query->_request_id;
z_msg._body._response._key = ke;
z_msg._body._response._ext_responder._zid = zid;
z_msg._body._response._ext_responder._eid = 0;
z_msg._body._response._ext_qos =
_z_n_qos_make(is_express, cong_ctrl == Z_CONGESTION_CONTROL_BLOCK, priority);
z_msg._body._response._ext_timestamp = _z_timestamp_null();
z_msg._body._response._tag = _Z_RESPONSE_BODY_REPLY;
z_msg._body._response._body._reply._consolidation = Z_CONSOLIDATION_MODE_DEFAULT;
z_msg._body._response._body._reply._body._is_put = true;
z_msg._body._response._body._reply._body._body._put._payload = payload.payload;
z_msg._body._response._body._reply._body._body._put._encoding = payload.encoding;
z_msg._body._response._body._reply._body._body._put._commons._timestamp =
(timestamp != NULL) ? *timestamp : _z_timestamp_null();
z_msg._body._response._body._reply._body._body._put._commons._source_info = _z_source_info_null();
z_msg._body._response._body._reply._body._body._put._attachment = att;
z_msg = (_z_zenoh_message_t){
._tag = _Z_N_RESPONSE,
._body._response =
{
._request_id = query->_request_id,
._key = ke,
._ext_responder = {._zid = zid, ._eid = 0},
._ext_qos = _z_n_qos_make(is_express, cong_ctrl == Z_CONGESTION_CONTROL_BLOCK, priority),
._ext_timestamp = _z_timestamp_null(),
._tag = _Z_RESPONSE_BODY_REPLY,
._body._reply =
{
._consolidation = Z_CONSOLIDATION_MODE_DEFAULT,
._body._is_put = true,
._body._body._put =
{
._payload = payload.payload,
._encoding = payload.encoding,
._commons =
{
._timestamp =
(timestamp != NULL) ? *timestamp : _z_timestamp_null(),
._source_info = _z_source_info_null(),
},
._attachment = att,
},
},
},
};
break;
case Z_SAMPLE_KIND_DELETE:
z_msg = (_z_zenoh_message_t){
._tag = _Z_N_RESPONSE,
._body._response =
{
._request_id = query->_request_id,
._key = ke,
._ext_responder = {._zid = zid, ._eid = 0},
._ext_qos = _z_n_qos_make(is_express, cong_ctrl == Z_CONGESTION_CONTROL_BLOCK, priority),
._ext_timestamp = _z_timestamp_null(),
._tag = _Z_RESPONSE_BODY_REPLY,
._body._reply =
{
._consolidation = Z_CONSOLIDATION_MODE_DEFAULT,
._body._is_put = false,
._body._body._del =
{
._commons =
{
._timestamp =
(timestamp != NULL) ? *timestamp : _z_timestamp_null(),
._source_info = _z_source_info_null(),
},
._attachment = att,
},
},
},
};
break;
default:
return _Z_ERR_GENERIC;
}
if (_z_send_n_msg(query->_zn, &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) {
ret = _Z_ERR_TRANSPORT_TX_FAILED;
Expand Down
4 changes: 2 additions & 2 deletions src/session/query.c
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ int8_t _z_register_pending_query(_z_session_t *zn, _z_pending_query_t *pen_qry)
}

int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, const _z_keyexpr_t keyexpr,
_z_msg_put_t *msg) {
_z_msg_put_t *msg, z_sample_kind_t kind) {
int8_t ret = _Z_RES_OK;

_zp_session_lock_mutex(zn);
Expand All @@ -113,7 +113,7 @@ int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, cons

// Build the reply
_z_reply_t reply = _z_reply_create(expanded_ke, Z_REPLY_TAG_DATA, zn->_local_zid, msg->_payload,
&msg->_commons._timestamp, &msg->_encoding, Z_SAMPLE_KIND_PUT, msg->_attachment);
&msg->_commons._timestamp, &msg->_encoding, kind, msg->_attachment);

// Verify if this is a newer reply, free the old one in case it is
if ((ret == _Z_RES_OK) && ((pen_qry->_consolidation == Z_CONSOLIDATION_MODE_LATEST) ||
Expand Down
7 changes: 2 additions & 5 deletions src/session/reply.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,8 @@ int8_t _z_trigger_reply_partial(_z_session_t *zn, _z_zint_t id, _z_keyexpr_t key
// TODO check id to know where to dispatch

#if Z_FEATURE_QUERY == 1
if (reply->_body._is_put) {
ret = _z_trigger_query_reply_partial(zn, id, key, &reply->_body._body._put);
} else {
ret = _Z_ERR_GENERIC;
}
ret = _z_trigger_query_reply_partial(zn, id, key, &reply->_body._body._put,
(reply->_body._is_put ? Z_SAMPLE_KIND_PUT : Z_SAMPLE_KIND_DELETE));
#else
_ZP_UNUSED(zn);
_ZP_UNUSED(id);
Expand Down
2 changes: 1 addition & 1 deletion src/session/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint
_z_msg_del_t del = req->_body._del;
_z_encoding_t encoding = _z_encoding_null();
ret = _z_trigger_subscriptions(zn, req->_key, _z_bytes_null(), &encoding, Z_SAMPLE_KIND_DELETE,
del._commons._timestamp, req->_ext_qos, _z_bytes_null());
del._commons._timestamp, req->_ext_qos, del._attachment);
#endif
if (ret == _Z_RES_OK) {
_z_network_message_t final = _z_n_msg_make_response_final(req->_rid);
Expand Down
2 changes: 1 addition & 1 deletion tests/modularity.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ def query_and_queryable(args):
if args.queryable == 1:
z_query_expected_output = """Opening session...
Sending Query 'demo/example/**'...
>> Received ('demo/example/**': 'Queryable from Pico!')
>> Received PUT ('demo/example/**': 'Queryable from Pico!')
>> Received query final notification"""
else:
z_query_expected_output = """Opening session...
Expand Down
Loading