Skip to content

Commit

Permalink
fix reply functions to properly account for reply_err (#531)
Browse files Browse the repository at this point in the history
* fix reply functions to properly account for reply_err

* revert z_keyexpr_as_view_string_check

* revert z_keyexpr_as_view_string_check
  • Loading branch information
DenisBiryukov91 authored Jul 12, 2024
1 parent efbdd2d commit 72fdf90
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 57 deletions.
42 changes: 24 additions & 18 deletions include/zenoh-pico/net/reply.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,23 @@
#include "zenoh-pico/protocol/core.h"
#include "zenoh-pico/session/session.h"

/**
* Reply tag values.
*
* Enumerators:
* _Z_REPLY_TAG_DATA: Tag identifying that the reply contains some data.
* _Z_REPLY_TAG_FINAL: Tag identifying that the reply does not contain any data and that there will be no more
* replies for this query.
* _Z_REPLY_TAG_ERROR: Tag identifying that the reply contains error.
* _Z_REPLY_TAG_NONE: Tag identifying empty reply.
*/
typedef enum {
_Z_REPLY_TAG_DATA = 0,
_Z_REPLY_TAG_FINAL = 1,
_Z_REPLY_TAG_ERROR = 2,
_Z_REPLY_TAG_NONE = 3
} _z_reply_tag_t;

/**
* An reply to a :c:func:`z_query`.
*
Expand All @@ -32,9 +49,12 @@
*
*/
typedef struct _z_reply_data_t {
_z_value_t error;
_z_sample_t sample;
union {
_z_value_t error;
_z_sample_t sample;
} _result;
_z_id_t replier_id;
_z_reply_tag_t _tag;
} _z_reply_data_t;

void _z_reply_data_clear(_z_reply_data_t *rd);
Expand All @@ -43,17 +63,6 @@ int8_t _z_reply_data_copy(_z_reply_data_t *dst, const _z_reply_data_t *src);
_Z_ELEM_DEFINE(_z_reply_data, _z_reply_data_t, _z_noop_size, _z_reply_data_clear, _z_noop_copy)
_Z_LIST_DEFINE(_z_reply_data, _z_reply_data_t)

/**
* Reply tag values.
*
* Enumerators:
* _Z_REPLY_TAG_DATA: Tag identifying that the reply contains some data.
* _Z_REPLY_TAG_FINAL: Tag identifying that the reply does not contain any data and that there will be no more
* replies for this query.
* _Z_REPLY_TAG_ERROR: Tag identifying that the reply contains error
*/
typedef enum { _Z_REPLY_TAG_DATA = 0, _Z_REPLY_TAG_FINAL = 1, _Z_REPLY_TAG_ERROR = 2 } _z_reply_tag_t;

/**
* An reply to a :c:func:`z_query`.
*
Expand All @@ -65,19 +74,16 @@ typedef enum { _Z_REPLY_TAG_DATA = 0, _Z_REPLY_TAG_FINAL = 1, _Z_REPLY_TAG_ERROR
*/
typedef struct _z_reply_t {
_z_reply_data_t data;
_z_reply_tag_t _tag;
} _z_reply_t;

_z_reply_t _z_reply_move(_z_reply_t *src_reply);

_z_reply_t _z_reply_null(void);
_Bool _z_reply_check(const _z_reply_t *reply);
void _z_reply_clear(_z_reply_t *src);
void _z_reply_free(_z_reply_t **hello);
int8_t _z_reply_copy(_z_reply_t *dst, const _z_reply_t *src);
_z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, _z_reply_tag_t tag, _z_id_t id, const _z_bytes_t payload,
const _z_timestamp_t *timestamp, _z_encoding_t *encoding, z_sample_kind_t kind,
const _z_bytes_t attachment);
_z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, _z_id_t id, const _z_bytes_t payload, const _z_timestamp_t *timestamp,
_z_encoding_t *encoding, z_sample_kind_t kind, const _z_bytes_t attachment);
_z_reply_t _z_reply_err_create(const _z_bytes_t payload, _z_encoding_t *encoding);

typedef struct _z_pending_reply_t {
Expand Down
16 changes: 12 additions & 4 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,7 @@ _Z_OWNED_FUNCTIONS_VALUE_NO_COPY_IMPL(_z_config_t, config, _z_config_check, _z_c
_Z_OWNED_FUNCTIONS_VALUE_IMPL(_z_string_t, string, _z_string_check, _z_string_null, _z_string_copy, _z_string_clear)

_Bool _z_value_check(const _z_value_t *value) {
return _z_string_check(&value->encoding.schema) && value->encoding.id == _Z_ENCODING_ID_DEFAULT;
return _z_encoding_check(&value->encoding) || _z_bytes_check(&value->payload);
}
_Z_OWNED_FUNCTIONS_VALUE_IMPL(_z_value_t, reply_err, _z_value_check, _z_value_null, _z_value_copy, _z_value_clear)

Expand Down Expand Up @@ -1168,6 +1168,14 @@ z_owned_keyexpr_t z_publisher_keyexpr(z_loaned_publisher_t *publisher) {
#endif

#if Z_FEATURE_QUERY == 1
_Bool _z_reply_check(const _z_reply_t *reply) {
if (reply->data._tag == _Z_REPLY_TAG_DATA) {
return _z_sample_check(&reply->data._result.sample);
} else if (reply->data._tag == _Z_REPLY_TAG_ERROR) {
return _z_value_check(&reply->data._result.error);
}
return false;
}
_Z_OWNED_FUNCTIONS_VALUE_IMPL(_z_reply_t, reply, _z_reply_check, _z_reply_null, _z_reply_copy, _z_reply_clear)

void z_get_options_default(z_get_options_t *options) {
Expand Down Expand Up @@ -1223,11 +1231,11 @@ int8_t z_get(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr, co
return ret;
}

_Bool z_reply_is_ok(const z_loaned_reply_t *reply) { return reply->_tag != _Z_REPLY_TAG_ERROR; }
_Bool z_reply_is_ok(const z_loaned_reply_t *reply) { return reply->data._tag != _Z_REPLY_TAG_ERROR; }

const z_loaned_sample_t *z_reply_ok(const z_loaned_reply_t *reply) { return &reply->data.sample; }
const z_loaned_sample_t *z_reply_ok(const z_loaned_reply_t *reply) { return &reply->data._result.sample; }

const z_loaned_reply_err_t *z_reply_err(const z_loaned_reply_t *reply) { return &reply->data.error; }
const z_loaned_reply_err_t *z_reply_err(const z_loaned_reply_t *reply) { return &reply->data._result.error; }

_Bool z_reply_replier_id(const z_loaned_reply_t *reply, z_id_t *out_id) {
if (_z_id_check(reply->data.replier_id)) {
Expand Down
63 changes: 32 additions & 31 deletions src/net/reply.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,22 @@
#include "zenoh-pico/utils/logging.h"

_z_reply_data_t _z_reply_data_null(void) {
return (_z_reply_data_t){
.replier_id = {.id = {0}},
.sample = _z_sample_null(),
};
return (_z_reply_data_t){.replier_id = {.id = {0}}, ._result.sample = _z_sample_null(), ._tag = _Z_REPLY_TAG_NONE};
}

_z_reply_t _z_reply_null(void) {
_z_reply_t r = {._tag = _Z_REPLY_TAG_DATA, .data = _z_reply_data_null()};
_z_reply_t r = {.data = _z_reply_data_null()};
return r;
}

_Bool _z_reply_check(const _z_reply_t *reply) { return _z_sample_check(&reply->data.sample); }

#if Z_FEATURE_QUERY == 1
void _z_reply_data_clear(_z_reply_data_t *reply_data) {
_z_sample_clear(&reply_data->sample);
if (reply_data->_tag == _Z_REPLY_TAG_DATA) {
_z_sample_clear(&reply_data->_result.sample);
} else if (reply_data->_tag == _Z_REPLY_TAG_ERROR) {
_z_value_clear(&reply_data->_result.error);
}
reply_data->_tag = _Z_REPLY_TAG_NONE;
reply_data->replier_id = _z_id_empty();
}

Expand All @@ -49,8 +49,13 @@ void _z_reply_data_free(_z_reply_data_t **reply_data) {

int8_t _z_reply_data_copy(_z_reply_data_t *dst, const _z_reply_data_t *src) {
*dst = _z_reply_data_null();
_Z_RETURN_IF_ERR(_z_sample_copy(&dst->sample, &src->sample));
if (src->_tag == _Z_REPLY_TAG_DATA) {
_Z_RETURN_IF_ERR(_z_sample_copy(&dst->_result.sample, &src->_result.sample));
} else if (src->_tag == _Z_REPLY_TAG_ERROR) {
_Z_RETURN_IF_ERR(_z_value_copy(&dst->_result.error, &src->_result.error));
}
dst->replier_id = src->replier_id;
dst->_tag = src->_tag;
return _Z_RES_OK;
}

Expand All @@ -76,7 +81,6 @@ void _z_reply_free(_z_reply_t **reply) {
int8_t _z_reply_copy(_z_reply_t *dst, const _z_reply_t *src) {
*dst = _z_reply_null();
_Z_RETURN_IF_ERR(_z_reply_data_copy(&dst->data, &src->data));
dst->_tag = src->_tag;
return _Z_RES_OK;
}

Expand All @@ -92,37 +96,34 @@ void _z_pending_reply_clear(_z_pending_reply_t *pr) {
_z_timestamp_clear(&pr->_tstamp);
}

_z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, _z_reply_tag_t tag, _z_id_t id, const _z_bytes_t payload,
const _z_timestamp_t *timestamp, _z_encoding_t *encoding, z_sample_kind_t kind,
const _z_bytes_t attachment) {
_z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, _z_id_t id, const _z_bytes_t payload, const _z_timestamp_t *timestamp,
_z_encoding_t *encoding, z_sample_kind_t kind, const _z_bytes_t attachment) {
_z_reply_t reply = _z_reply_null();
reply._tag = tag;
if (tag == _Z_REPLY_TAG_DATA) {
reply.data.replier_id = id;
// Create reply sample
reply.data.sample.keyexpr = _z_keyexpr_steal(&keyexpr);
reply.data.sample.kind = kind;
reply.data.sample.timestamp = _z_timestamp_duplicate(timestamp);
_z_bytes_copy(&reply.data.sample.payload, &payload);
_z_bytes_copy(&reply.data.sample.attachment, &attachment);
_z_encoding_move(&reply.data.sample.encoding, encoding);
}
reply.data._tag = _Z_REPLY_TAG_DATA;
reply.data.replier_id = id;

// Create reply sample
reply.data._result.sample.keyexpr = _z_keyexpr_steal(&keyexpr);
reply.data._result.sample.kind = kind;
reply.data._result.sample.timestamp = _z_timestamp_duplicate(timestamp);
_z_bytes_copy(&reply.data._result.sample.payload, &payload);
_z_bytes_copy(&reply.data._result.sample.attachment, &attachment);
_z_encoding_move(&reply.data._result.sample.encoding, encoding);

return reply;
}

_z_reply_t _z_reply_err_create(const _z_bytes_t payload, _z_encoding_t *encoding) {
_z_reply_t reply = _z_reply_null();
reply._tag = _Z_REPLY_TAG_ERROR;
_z_bytes_copy(&reply.data.error.payload, &payload);
_z_encoding_move(&reply.data.error.encoding, encoding);
reply.data._tag = _Z_REPLY_TAG_ERROR;
_z_bytes_copy(&reply.data._result.error.payload, &payload);
_z_encoding_move(&reply.data._result.error.encoding, encoding);
return reply;
}
#else
_z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, _z_reply_tag_t tag, _z_id_t id, const _z_bytes_t payload,
const _z_timestamp_t *timestamp, _z_encoding_t *encoding, z_sample_kind_t kind,
const _z_bytes_t attachment) {
_z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, _z_id_t id, const _z_bytes_t payload, const _z_timestamp_t *timestamp,
_z_encoding_t *encoding, z_sample_kind_t kind, const _z_bytes_t attachment) {
_ZP_UNUSED(keyexpr);
_ZP_UNUSED(tag);
_ZP_UNUSED(id);
_ZP_UNUSED(payload);
_ZP_UNUSED(timestamp);
Expand Down
10 changes: 6 additions & 4 deletions src/session/query.c
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, cons
}

// Build the reply
_z_reply_t reply = _z_reply_create(expanded_ke, _Z_REPLY_TAG_DATA, zn->_local_zid, msg->_payload,
&msg->_commons._timestamp, &msg->_encoding, kind, msg->_attachment);
_z_reply_t reply = _z_reply_create(expanded_ke, zn->_local_zid, msg->_payload, &msg->_commons._timestamp,
&msg->_encoding, kind, msg->_attachment);

_Bool drop = false;
// Verify if this is a newer reply, free the old one in case it is
Expand All @@ -125,7 +125,8 @@ int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, cons
pen_rep = _z_pending_reply_list_head(pen_rps);

// Check if this is the same resource key
if (_z_str_eq(pen_rep->_reply.data.sample.keyexpr._suffix, reply.data.sample.keyexpr._suffix) == true) {
if (_z_str_eq(pen_rep->_reply.data._result.sample.keyexpr._suffix,
reply.data._result.sample.keyexpr._suffix) == true) {
if (msg->_commons._timestamp.time <= pen_rep->_tstamp.time) {
drop = true;
} else {
Expand All @@ -146,7 +147,8 @@ int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, cons
_z_reply_t partial_reply;
(void)memset(&partial_reply, 0,
sizeof(_z_reply_t)); // Avoid warnings on uninitialized values on the reply
partial_reply.data.sample.keyexpr = _z_keyexpr_duplicate(reply.data.sample.keyexpr);
partial_reply.data._tag = _Z_REPLY_TAG_DATA;
partial_reply.data._result.sample.keyexpr = _z_keyexpr_duplicate(reply.data._result.sample.keyexpr);
pen_rep->_reply = partial_reply;
} else {
pen_rep->_reply = reply; // Store the whole reply in the latest mode
Expand Down

0 comments on commit 72fdf90

Please sign in to comment.