Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix reply functions to properly account for reply_err #531

Merged
merged 3 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 24 additions & 18 deletions include/zenoh-pico/net/reply.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,23 @@
#include "zenoh-pico/protocol/core.h"
#include "zenoh-pico/session/session.h"

/**
* Reply tag values.
*
* Enumerators:
* _Z_REPLY_TAG_DATA: Tag identifying that the reply contains some data.
* _Z_REPLY_TAG_FINAL: Tag identifying that the reply does not contain any data and that there will be no more
* replies for this query.
* _Z_REPLY_TAG_ERROR: Tag identifying that the reply contains error.
* _Z_REPLY_TAG_NONE: Tag identifying empty reply.
*/
typedef enum {
_Z_REPLY_TAG_DATA = 0,
_Z_REPLY_TAG_FINAL = 1,
_Z_REPLY_TAG_ERROR = 2,
_Z_REPLY_TAG_NONE = 3
} _z_reply_tag_t;

/**
* An reply to a :c:func:`z_query`.
*
Expand All @@ -32,9 +49,12 @@
*
*/
typedef struct _z_reply_data_t {
_z_value_t error;
_z_sample_t sample;
union {
_z_value_t error;
_z_sample_t sample;
} _result;
_z_id_t replier_id;
_z_reply_tag_t _tag;
} _z_reply_data_t;

void _z_reply_data_clear(_z_reply_data_t *rd);
Expand All @@ -43,17 +63,6 @@ int8_t _z_reply_data_copy(_z_reply_data_t *dst, const _z_reply_data_t *src);
_Z_ELEM_DEFINE(_z_reply_data, _z_reply_data_t, _z_noop_size, _z_reply_data_clear, _z_noop_copy)
_Z_LIST_DEFINE(_z_reply_data, _z_reply_data_t)

/**
* Reply tag values.
*
* Enumerators:
* _Z_REPLY_TAG_DATA: Tag identifying that the reply contains some data.
* _Z_REPLY_TAG_FINAL: Tag identifying that the reply does not contain any data and that there will be no more
* replies for this query.
* _Z_REPLY_TAG_ERROR: Tag identifying that the reply contains error
*/
typedef enum { _Z_REPLY_TAG_DATA = 0, _Z_REPLY_TAG_FINAL = 1, _Z_REPLY_TAG_ERROR = 2 } _z_reply_tag_t;

/**
* An reply to a :c:func:`z_query`.
*
Expand All @@ -65,19 +74,16 @@ typedef enum { _Z_REPLY_TAG_DATA = 0, _Z_REPLY_TAG_FINAL = 1, _Z_REPLY_TAG_ERROR
*/
typedef struct _z_reply_t {
_z_reply_data_t data;
_z_reply_tag_t _tag;
} _z_reply_t;

_z_reply_t _z_reply_move(_z_reply_t *src_reply);

_z_reply_t _z_reply_null(void);
_Bool _z_reply_check(const _z_reply_t *reply);
void _z_reply_clear(_z_reply_t *src);
void _z_reply_free(_z_reply_t **hello);
int8_t _z_reply_copy(_z_reply_t *dst, const _z_reply_t *src);
_z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, _z_reply_tag_t tag, _z_id_t id, const _z_bytes_t payload,
const _z_timestamp_t *timestamp, _z_encoding_t *encoding, z_sample_kind_t kind,
const _z_bytes_t attachment);
_z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, _z_id_t id, const _z_bytes_t payload, const _z_timestamp_t *timestamp,
_z_encoding_t *encoding, z_sample_kind_t kind, const _z_bytes_t attachment);
_z_reply_t _z_reply_err_create(const _z_bytes_t payload, _z_encoding_t *encoding);

typedef struct _z_pending_reply_t {
Expand Down
16 changes: 12 additions & 4 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,7 @@ _Z_OWNED_FUNCTIONS_VALUE_NO_COPY_IMPL(_z_config_t, config, _z_config_check, _z_c
_Z_OWNED_FUNCTIONS_VALUE_IMPL(_z_string_t, string, _z_string_check, _z_string_null, _z_string_copy, _z_string_clear)

_Bool _z_value_check(const _z_value_t *value) {
return _z_string_check(&value->encoding.schema) && value->encoding.id == _Z_ENCODING_ID_DEFAULT;
return _z_encoding_check(&value->encoding) || _z_bytes_check(&value->payload);
}
_Z_OWNED_FUNCTIONS_VALUE_IMPL(_z_value_t, reply_err, _z_value_check, _z_value_null, _z_value_copy, _z_value_clear)

Expand Down Expand Up @@ -1168,6 +1168,14 @@ z_owned_keyexpr_t z_publisher_keyexpr(z_loaned_publisher_t *publisher) {
#endif

#if Z_FEATURE_QUERY == 1
_Bool _z_reply_check(const _z_reply_t *reply) {
if (reply->data._tag == _Z_REPLY_TAG_DATA) {
return _z_sample_check(&reply->data._result.sample);
} else if (reply->data._tag == _Z_REPLY_TAG_ERROR) {
return _z_value_check(&reply->data._result.error);
}
return false;
}
_Z_OWNED_FUNCTIONS_VALUE_IMPL(_z_reply_t, reply, _z_reply_check, _z_reply_null, _z_reply_copy, _z_reply_clear)

void z_get_options_default(z_get_options_t *options) {
Expand Down Expand Up @@ -1223,11 +1231,11 @@ int8_t z_get(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr, co
return ret;
}

_Bool z_reply_is_ok(const z_loaned_reply_t *reply) { return reply->_tag != _Z_REPLY_TAG_ERROR; }
_Bool z_reply_is_ok(const z_loaned_reply_t *reply) { return reply->data._tag != _Z_REPLY_TAG_ERROR; }

const z_loaned_sample_t *z_reply_ok(const z_loaned_reply_t *reply) { return &reply->data.sample; }
const z_loaned_sample_t *z_reply_ok(const z_loaned_reply_t *reply) { return &reply->data._result.sample; }

const z_loaned_reply_err_t *z_reply_err(const z_loaned_reply_t *reply) { return &reply->data.error; }
const z_loaned_reply_err_t *z_reply_err(const z_loaned_reply_t *reply) { return &reply->data._result.error; }

_Bool z_reply_replier_id(const z_loaned_reply_t *reply, z_id_t *out_id) {
if (_z_id_check(reply->data.replier_id)) {
Expand Down
63 changes: 32 additions & 31 deletions src/net/reply.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,22 @@
#include "zenoh-pico/utils/logging.h"

_z_reply_data_t _z_reply_data_null(void) {
return (_z_reply_data_t){
.replier_id = {.id = {0}},
.sample = _z_sample_null(),
};
return (_z_reply_data_t){.replier_id = {.id = {0}}, ._result.sample = _z_sample_null(), ._tag = _Z_REPLY_TAG_NONE};
}

_z_reply_t _z_reply_null(void) {
_z_reply_t r = {._tag = _Z_REPLY_TAG_DATA, .data = _z_reply_data_null()};
_z_reply_t r = {.data = _z_reply_data_null()};
return r;
}

_Bool _z_reply_check(const _z_reply_t *reply) { return _z_sample_check(&reply->data.sample); }

#if Z_FEATURE_QUERY == 1
void _z_reply_data_clear(_z_reply_data_t *reply_data) {
_z_sample_clear(&reply_data->sample);
if (reply_data->_tag == _Z_REPLY_TAG_DATA) {
_z_sample_clear(&reply_data->_result.sample);
} else if (reply_data->_tag == _Z_REPLY_TAG_ERROR) {
_z_value_clear(&reply_data->_result.error);
}
reply_data->_tag = _Z_REPLY_TAG_NONE;
reply_data->replier_id = _z_id_empty();
}

Expand All @@ -49,8 +49,13 @@ void _z_reply_data_free(_z_reply_data_t **reply_data) {

int8_t _z_reply_data_copy(_z_reply_data_t *dst, const _z_reply_data_t *src) {
*dst = _z_reply_data_null();
_Z_RETURN_IF_ERR(_z_sample_copy(&dst->sample, &src->sample));
if (src->_tag == _Z_REPLY_TAG_DATA) {
_Z_RETURN_IF_ERR(_z_sample_copy(&dst->_result.sample, &src->_result.sample));
} else if (src->_tag == _Z_REPLY_TAG_ERROR) {
_Z_RETURN_IF_ERR(_z_value_copy(&dst->_result.error, &src->_result.error));
}
dst->replier_id = src->replier_id;
dst->_tag = src->_tag;
return _Z_RES_OK;
}

Expand All @@ -76,7 +81,6 @@ void _z_reply_free(_z_reply_t **reply) {
int8_t _z_reply_copy(_z_reply_t *dst, const _z_reply_t *src) {
*dst = _z_reply_null();
_Z_RETURN_IF_ERR(_z_reply_data_copy(&dst->data, &src->data));
dst->_tag = src->_tag;
return _Z_RES_OK;
}

Expand All @@ -92,37 +96,34 @@ void _z_pending_reply_clear(_z_pending_reply_t *pr) {
_z_timestamp_clear(&pr->_tstamp);
}

_z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, _z_reply_tag_t tag, _z_id_t id, const _z_bytes_t payload,
const _z_timestamp_t *timestamp, _z_encoding_t *encoding, z_sample_kind_t kind,
const _z_bytes_t attachment) {
_z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, _z_id_t id, const _z_bytes_t payload, const _z_timestamp_t *timestamp,
_z_encoding_t *encoding, z_sample_kind_t kind, const _z_bytes_t attachment) {
_z_reply_t reply = _z_reply_null();
reply._tag = tag;
if (tag == _Z_REPLY_TAG_DATA) {
reply.data.replier_id = id;
// Create reply sample
reply.data.sample.keyexpr = _z_keyexpr_steal(&keyexpr);
reply.data.sample.kind = kind;
reply.data.sample.timestamp = _z_timestamp_duplicate(timestamp);
_z_bytes_copy(&reply.data.sample.payload, &payload);
_z_bytes_copy(&reply.data.sample.attachment, &attachment);
_z_encoding_move(&reply.data.sample.encoding, encoding);
}
reply.data._tag = _Z_REPLY_TAG_DATA;
reply.data.replier_id = id;

// Create reply sample
reply.data._result.sample.keyexpr = _z_keyexpr_steal(&keyexpr);
reply.data._result.sample.kind = kind;
reply.data._result.sample.timestamp = _z_timestamp_duplicate(timestamp);
_z_bytes_copy(&reply.data._result.sample.payload, &payload);
_z_bytes_copy(&reply.data._result.sample.attachment, &attachment);
_z_encoding_move(&reply.data._result.sample.encoding, encoding);

return reply;
}

_z_reply_t _z_reply_err_create(const _z_bytes_t payload, _z_encoding_t *encoding) {
_z_reply_t reply = _z_reply_null();
reply._tag = _Z_REPLY_TAG_ERROR;
_z_bytes_copy(&reply.data.error.payload, &payload);
_z_encoding_move(&reply.data.error.encoding, encoding);
reply.data._tag = _Z_REPLY_TAG_ERROR;
_z_bytes_copy(&reply.data._result.error.payload, &payload);
_z_encoding_move(&reply.data._result.error.encoding, encoding);
return reply;
}
#else
_z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, _z_reply_tag_t tag, _z_id_t id, const _z_bytes_t payload,
const _z_timestamp_t *timestamp, _z_encoding_t *encoding, z_sample_kind_t kind,
const _z_bytes_t attachment) {
_z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, _z_id_t id, const _z_bytes_t payload, const _z_timestamp_t *timestamp,
_z_encoding_t *encoding, z_sample_kind_t kind, const _z_bytes_t attachment) {
_ZP_UNUSED(keyexpr);
_ZP_UNUSED(tag);
_ZP_UNUSED(id);
_ZP_UNUSED(payload);
_ZP_UNUSED(timestamp);
Expand Down
10 changes: 6 additions & 4 deletions src/session/query.c
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, cons
}

// Build the reply
_z_reply_t reply = _z_reply_create(expanded_ke, _Z_REPLY_TAG_DATA, zn->_local_zid, msg->_payload,
&msg->_commons._timestamp, &msg->_encoding, kind, msg->_attachment);
_z_reply_t reply = _z_reply_create(expanded_ke, zn->_local_zid, msg->_payload, &msg->_commons._timestamp,
&msg->_encoding, kind, msg->_attachment);

_Bool drop = false;
// Verify if this is a newer reply, free the old one in case it is
Expand All @@ -125,7 +125,8 @@ int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, cons
pen_rep = _z_pending_reply_list_head(pen_rps);

// Check if this is the same resource key
if (_z_str_eq(pen_rep->_reply.data.sample.keyexpr._suffix, reply.data.sample.keyexpr._suffix) == true) {
if (_z_str_eq(pen_rep->_reply.data._result.sample.keyexpr._suffix,
reply.data._result.sample.keyexpr._suffix) == true) {
if (msg->_commons._timestamp.time <= pen_rep->_tstamp.time) {
drop = true;
} else {
Expand All @@ -146,7 +147,8 @@ int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, cons
_z_reply_t partial_reply;
(void)memset(&partial_reply, 0,
sizeof(_z_reply_t)); // Avoid warnings on uninitialized values on the reply
partial_reply.data.sample.keyexpr = _z_keyexpr_duplicate(reply.data.sample.keyexpr);
partial_reply.data._tag = _Z_REPLY_TAG_DATA;
partial_reply.data._result.sample.keyexpr = _z_keyexpr_duplicate(reply.data._result.sample.keyexpr);
pen_rep->_reply = partial_reply;
} else {
pen_rep->_reply = reply; // Store the whole reply in the latest mode
Expand Down
Loading