Skip to content

Commit

Permalink
feat: query is not a rc and store a session rc
Browse files Browse the repository at this point in the history
  • Loading branch information
jean-roland committed Nov 10, 2024
1 parent 10a7c2a commit b59d97d
Show file tree
Hide file tree
Showing 10 changed files with 40 additions and 54 deletions.
7 changes: 3 additions & 4 deletions examples/unix/c11/z_get_attachment.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,11 @@ void reply_handler(z_loaned_reply_t *reply, void *ctx) {

// Check attachment
const z_loaned_bytes_t *attachment = z_sample_attachment(sample);
if (attachment == NULL) {
return;
}
ze_deserializer_t deserializer = ze_deserializer_from_bytes(attachment);
size_t attachment_len;
ze_deserializer_deserialize_sequence_length(&deserializer, &attachment_len);
if (ze_deserializer_deserialize_sequence_length(&deserializer, &attachment_len) != 0) {
return;
}
kv_pair_t *kvp = (kv_pair_t *)malloc(sizeof(kv_pair_t) * attachment_len);
for (size_t i = 0; i < attachment_len; ++i) {
ze_deserializer_deserialize_string(&deserializer, &kvp[i].key);
Expand Down
7 changes: 3 additions & 4 deletions examples/unix/c11/z_queryable_attachment.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,9 @@ void query_handler(z_loaned_query_t *query, void *ctx) {

// Check attachment
const z_loaned_bytes_t *attachment = z_query_attachment(query);
if (attachment != NULL) {
ze_deserializer_t deserializer = ze_deserializer_from_bytes(attachment);
size_t attachment_len;
ze_deserializer_deserialize_sequence_length(&deserializer, &attachment_len);
ze_deserializer_t deserializer = ze_deserializer_from_bytes(attachment);
size_t attachment_len;
if (ze_deserializer_deserialize_sequence_length(&deserializer, &attachment_len) == 0) {
kv_pair_t *kvp = (kv_pair_t *)malloc(sizeof(kv_pair_t) * attachment_len);
for (size_t i = 0; i < attachment_len; ++i) {
ze_deserializer_deserialize_string(&deserializer, &kvp[i].key);
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/api/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ _Z_OWNED_TYPE_VALUE(_z_queryable_t, queryable)
/**
* Represents a Zenoh Query entity, received by Zenoh Queryable entities.
*/
_Z_OWNED_TYPE_RC(_z_query_rc_t, query)
_Z_OWNED_TYPE_VALUE(_z_query_t, query)

/**
* Represents the encoding of a payload, in a MIME-like format.
Expand Down
10 changes: 6 additions & 4 deletions include/zenoh-pico/net/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,22 @@ typedef struct _z_query_t {
_z_value_t _value;
_z_keyexpr_t _key;
uint32_t _request_id;
_z_session_weak_t _zn; // Can't be an rc because of cross referencing
_z_session_rc_t _zn;
_z_bytes_t _attachment;
_z_string_t _parameters;
bool _anyke;
} _z_query_t;

// Warning: None of the sub-types require a non-0 initialization. Add a init function if it changes.
static inline _z_query_t _z_query_null(void) { return (_z_query_t){0}; }
static inline bool _z_query_check(const _z_query_t *query) {
return _z_keyexpr_check(&query->_key) || _z_value_check(&query->_value) || _z_bytes_check(&query->_attachment) ||
_z_string_check(&query->_parameters);
}
void _z_query_clear(_z_query_t *q);
z_result_t _z_query_copy(_z_query_t *dst, const _z_query_t *src);
void _z_query_free(_z_query_t **query);

_Z_REFCOUNT_DEFINE(_z_query, _z_query)

/**
* Return type when declaring a queryable.
*/
Expand All @@ -59,7 +61,7 @@ static inline _z_query_t _z_query_alias(_z_value_t *value, _z_keyexpr_t *key, co
bool anyke) {
return (_z_query_t){
._request_id = request_id,
._zn = _z_session_rc_clone_as_weak(zn),
._zn = *zn,
._parameters = _z_string_alias_slice(parameters),
._anyke = anyke,
._key = *key,
Expand Down
6 changes: 4 additions & 2 deletions include/zenoh-pico/net/reply.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ static inline _z_reply_t _z_reply_null(void) { return (_z_reply_t){0}; }
static inline _z_reply_t _z_reply_alias(_z_keyexpr_t *keyexpr, _z_id_t id, const _z_bytes_t *payload,
const _z_timestamp_t *timestamp, _z_encoding_t *encoding, z_sample_kind_t kind,
const _z_bytes_t *attachment) {
return (_z_reply_t){
_z_reply_t r = {
.data.replier_id = id,
.data._tag = _Z_REPLY_TAG_DATA,
.data._result.sample.keyexpr = *keyexpr,
Expand All @@ -93,13 +93,15 @@ static inline _z_reply_t _z_reply_alias(_z_keyexpr_t *keyexpr, _z_id_t id, const
.data._result.sample.attachment = *attachment,
.data._result.sample.encoding = *encoding,
};
return r;
}
static inline _z_reply_t _z_reply_err_alias(const _z_bytes_t *payload, _z_encoding_t *encoding) {
return (_z_reply_t){
_z_reply_t r = {
.data._tag = _Z_REPLY_TAG_ERROR,
.data._result.error.payload = *payload,
.data._result.error.encoding = *encoding,
};
return r;
}
_z_reply_t _z_reply_move(_z_reply_t *src_reply);
void _z_reply_clear(_z_reply_t *src);
Expand Down
3 changes: 3 additions & 0 deletions include/zenoh-pico/protocol/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ typedef struct {

// Warning: None of the sub-types require a non-0 initialization. Add a init function if it changes.
static inline _z_value_t _z_value_null(void) { return (_z_value_t){0}; }
static inline bool _z_value_check(const _z_value_t *value) {
return _z_bytes_check(&value->payload) || _z_encoding_check(&value->encoding);
}
_z_value_t _z_value_steal(_z_value_t *value);
z_result_t _z_value_copy(_z_value_t *dst, const _z_value_t *src);
void _z_value_move(_z_value_t *dst, _z_value_t *src);
Expand Down
4 changes: 2 additions & 2 deletions include/zenoh-pico/session/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,12 @@ typedef struct {
} _z_publication_t;

// Forward type declaration to avoid cyclical include
typedef struct _z_query_rc_t _z_query_rc_t;
typedef struct _z_query_t _z_query_t;

/**
* The callback signature of the functions handling query messages.
*/
typedef void (*_z_query_handler_t)(_z_query_rc_t *query, void *arg);
typedef void (*_z_query_handler_t)(_z_query_t *query, void *arg);

typedef struct {
_z_keyexpr_t _key;
Expand Down
40 changes: 13 additions & 27 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -417,17 +417,16 @@ z_query_consolidation_t z_query_consolidation_none(void) {
z_query_consolidation_t z_query_consolidation_default(void) { return z_query_consolidation_auto(); }

void z_query_parameters(const z_loaned_query_t *query, z_view_string_t *parameters) {
parameters->_val = _z_string_alias(_Z_RC_IN_VAL(query)->_parameters);
parameters->_val = _z_string_alias(query->_parameters);
}

const z_loaned_bytes_t *z_query_attachment(const z_loaned_query_t *query) { return &_Z_RC_IN_VAL(query)->_attachment; }
const z_loaned_bytes_t *z_query_attachment(const z_loaned_query_t *query) { return &query->_attachment; }

const z_loaned_keyexpr_t *z_query_keyexpr(const z_loaned_query_t *query) { return &_Z_RC_IN_VAL(query)->_key; }
const z_loaned_keyexpr_t *z_query_keyexpr(const z_loaned_query_t *query) { return &query->_key; }

const z_loaned_bytes_t *z_query_payload(const z_loaned_query_t *query) { return &_Z_RC_IN_VAL(query)->_value.payload; }
const z_loaned_encoding_t *z_query_encoding(const z_loaned_query_t *query) {
return &_Z_RC_IN_VAL(query)->_value.encoding;
}
const z_loaned_bytes_t *z_query_payload(const z_loaned_query_t *query) { return &query->_value.payload; }

const z_loaned_encoding_t *z_query_encoding(const z_loaned_query_t *query) { return &query->_value.encoding; }

void z_closure_sample_call(const z_loaned_closure_sample_t *closure, z_loaned_sample_t *sample) {
if (closure->call != NULL) {
Expand Down Expand Up @@ -470,9 +469,6 @@ _Z_OWNED_FUNCTIONS_VALUE_IMPL(_z_config_t, config, _z_config_check, _z_config_nu

_Z_OWNED_FUNCTIONS_VALUE_IMPL(_z_string_t, string, _z_string_check, _z_string_null, _z_string_copy, _z_string_clear)

bool _z_value_check(const _z_value_t *value) {
return _z_encoding_check(&value->encoding) || _z_bytes_check(&value->payload);
}
_Z_OWNED_FUNCTIONS_VALUE_IMPL(_z_value_t, reply_err, _z_value_check, _z_value_null, _z_value_copy, _z_value_clear)

_Z_OWNED_FUNCTIONS_VALUE_IMPL(_z_keyexpr_t, keyexpr, _z_keyexpr_check, _z_keyexpr_null, _z_keyexpr_copy,
Expand Down Expand Up @@ -1121,7 +1117,7 @@ bool z_reply_replier_id(const z_loaned_reply_t *reply, z_id_t *out_id) {
#endif

#if Z_FEATURE_QUERYABLE == 1
_Z_OWNED_FUNCTIONS_RC_IMPL(query)
_Z_OWNED_FUNCTIONS_VALUE_IMPL(_z_query_t, query, _z_query_check, _z_query_null, _z_query_copy, _z_query_clear)

void _z_queryable_drop(_z_queryable_t *queryable) {
_z_undeclare_queryable(queryable);
Expand Down Expand Up @@ -1191,9 +1187,7 @@ void z_query_reply_options_default(z_query_reply_options_t *options) {

z_result_t z_query_reply(const z_loaned_query_t *query, const z_loaned_keyexpr_t *keyexpr, z_moved_bytes_t *payload,
const z_query_reply_options_t *options) {
// Try upgrading session weak to rc
_z_session_rc_t sess_rc = _z_session_weak_upgrade_if_open(&_Z_RC_IN_VAL(query)->_zn);
if (_Z_RC_IS_NULL(&sess_rc)) {
if (_Z_RC_IS_NULL(&query->_zn)) {
return _Z_ERR_SESSION_CLOSED;
}
// Set options
Expand All @@ -1208,12 +1202,11 @@ z_result_t z_query_reply(const z_loaned_query_t *query, const z_loaned_keyexpr_t
_z_value_t value = {.payload = _z_bytes_from_owned_bytes(&payload->_this),
.encoding = _z_encoding_from_owned(&opts.encoding->_this)};

z_result_t ret = _z_send_reply(_Z_RC_IN_VAL(query), &sess_rc, keyexpr_aliased, value, Z_SAMPLE_KIND_PUT,
z_result_t ret = _z_send_reply(query, &query->_zn, keyexpr_aliased, value, Z_SAMPLE_KIND_PUT,
opts.congestion_control, opts.priority, opts.is_express, opts.timestamp,
_z_bytes_from_owned_bytes(&opts.attachment->_this));
z_bytes_drop(payload);
// Clean-up
_z_session_rc_drop(&sess_rc);
z_encoding_drop(opts.encoding);
z_bytes_drop(opts.attachment);
return ret;
Expand All @@ -1229,9 +1222,7 @@ void z_query_reply_del_options_default(z_query_reply_del_options_t *options) {

z_result_t z_query_reply_del(const z_loaned_query_t *query, const z_loaned_keyexpr_t *keyexpr,
const z_query_reply_del_options_t *options) {
// Try upgrading session weak to rc
_z_session_rc_t sess_rc = _z_session_weak_upgrade_if_open(&_Z_RC_IN_VAL(query)->_zn);
if (_Z_RC_IS_NULL(&sess_rc)) {
if (_Z_RC_IS_NULL(&query->_zn)) {
return _Z_ERR_SESSION_CLOSED;
}
_z_keyexpr_t keyexpr_aliased = _z_keyexpr_alias_from_user_defined(*keyexpr, true);
Expand All @@ -1244,11 +1235,10 @@ z_result_t z_query_reply_del(const z_loaned_query_t *query, const z_loaned_keyex

_z_value_t value = {.payload = _z_bytes_null(), .encoding = _z_encoding_null()};

z_result_t ret = _z_send_reply(_Z_RC_IN_VAL(query), &sess_rc, keyexpr_aliased, value, Z_SAMPLE_KIND_DELETE,
z_result_t ret = _z_send_reply(query, &query->_zn, keyexpr_aliased, value, Z_SAMPLE_KIND_DELETE,
opts.congestion_control, opts.priority, opts.is_express, opts.timestamp,
_z_bytes_from_owned_bytes(&opts.attachment->_this));
// Clean-up
_z_session_rc_drop(&sess_rc);
z_bytes_drop(opts.attachment);
return ret;
}
Expand All @@ -1257,9 +1247,7 @@ void z_query_reply_err_options_default(z_query_reply_err_options_t *options) { o

z_result_t z_query_reply_err(const z_loaned_query_t *query, z_moved_bytes_t *payload,
const z_query_reply_err_options_t *options) {
// Try upgrading session weak to rc
_z_session_rc_t sess_rc = _z_session_weak_upgrade_if_open(&_Z_RC_IN_VAL(query)->_zn);
if (_Z_RC_IS_NULL(&sess_rc)) {
if (_Z_RC_IS_NULL(&query->_zn)) {
return _Z_ERR_SESSION_CLOSED;
}
z_query_reply_err_options_t opts;
Expand All @@ -1271,9 +1259,7 @@ z_result_t z_query_reply_err(const z_loaned_query_t *query, z_moved_bytes_t *pay
// Set value
_z_value_t value = {.payload = _z_bytes_from_owned_bytes(&payload->_this),
.encoding = _z_encoding_from_owned(&opts.encoding->_this)};

z_result_t ret = _z_send_reply_err(_Z_RC_IN_VAL(query), &sess_rc, value);
_z_session_rc_drop(&sess_rc);
z_result_t ret = _z_send_reply_err(query, &query->_zn, value);
z_bytes_drop(payload);
// Clean-up
z_encoding_drop(opts.encoding);
Expand Down
9 changes: 3 additions & 6 deletions src/net/query.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,18 @@ void _z_query_clear_inner(_z_query_t *q) {
_z_value_clear(&q->_value);
_z_bytes_drop(&q->_attachment);
_z_string_clear(&q->_parameters);
_z_session_weak_drop(&q->_zn);
_z_session_rc_drop(&q->_zn);
}

void _z_query_clear(_z_query_t *q) {
// Try to upgrade session weak to rc
_z_session_rc_t sess_rc = _z_session_weak_upgrade_if_open(&q->_zn);
if (!_Z_RC_IS_NULL(&sess_rc)) {
if (!_Z_RC_IS_NULL(&q->_zn)) {
// Send REPLY_FINAL message
_z_zenoh_message_t z_msg = _z_n_msg_make_response_final(q->_request_id);
if (_z_send_n_msg(_Z_RC_IN_VAL(&q->_zn), &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) !=
_Z_RES_OK) {
_Z_ERROR("Query send REPLY_FINAL transport failure !");
}
_z_msg_clear(&z_msg);
_z_session_rc_drop(&sess_rc);
}
// Clean up memory
_z_query_clear_inner(q);
Expand All @@ -48,7 +45,7 @@ z_result_t _z_query_copy(_z_query_t *dst, const _z_query_t *src) {
_Z_CLEAN_RETURN_IF_ERR(_z_value_copy(&dst->_value, &src->_value), _z_query_clear_inner(dst));
_Z_CLEAN_RETURN_IF_ERR(_z_bytes_copy(&dst->_attachment, &src->_attachment), _z_query_clear_inner(dst));
_Z_CLEAN_RETURN_IF_ERR(_z_string_copy(&dst->_parameters, &src->_parameters), _z_query_clear_inner(dst));
_z_session_weak_copy(&dst->_zn, &src->_zn);
_z_session_rc_copy(&dst->_zn, &src->_zn);
if (_Z_RC_IS_NULL(&dst->_zn)) {
_z_query_clear_inner(dst);
return _Z_ERR_SYSTEM_OUT_OF_MEMORY;
Expand Down
6 changes: 2 additions & 4 deletions src/session/queryable.c
Original file line number Diff line number Diff line change
Expand Up @@ -224,17 +224,15 @@ static z_result_t _z_trigger_queryables_inner(_z_session_rc_t *zsrc, _z_msg_quer
}
}
// Build the z_query
_z_query_t q =
_z_query_t query =
_z_query_alias(&msgq->_ext_value, &key, &msgq->_parameters, zsrc, qid, &msgq->_ext_attachment, anyke);
_z_query_rc_t query = _z_query_rc_new_from_val(&q);
// Parse session_queryable svec
for (size_t i = 0; i < qle_nb; i++) {
_z_queryable_infos_t *qle_info = _z_queryable_infos_svec_get(&qles, i);
qle_info->callback(&query, qle_info->arg);
}
// Clean up
_z_keyexpr_clear(&key);
_z_query_rc_drop(&query);
#if Z_FEATURE_RX_CACHE != 1
_z_queryable_infos_svec_release(&qles); // Otherwise it's released with cache
#endif
Expand All @@ -246,7 +244,7 @@ z_result_t _z_trigger_queryables(_z_session_rc_t *zsrc, _z_msg_query_t *msgq, _z
// Clean up
_z_keyexpr_clear(q_key);
_z_encoding_clear(&msgq->_ext_value.encoding);
_z_bytes_aliased_drop(&msgq->_ext_value.payload);
_z_bytes_drop(&msgq->_ext_value.payload);
_z_bytes_drop(&msgq->_ext_attachment);
_z_slice_clear(&msgq->_parameters);
return ret;
Expand Down

0 comments on commit b59d97d

Please sign in to comment.