Skip to content

Commit

Permalink
Move encoding value on receive side. (#490)
Browse files Browse the repository at this point in the history
* feat: change functions args to allow encoding move

* feat: add z_value_move

* feat: add string check in encoding_clear

* feat: switch to encoding/value move

* feat: pass pointer to encoding message field

* fix: revert encoding_clear changes
  • Loading branch information
jean-roland authored Jul 3, 2024
1 parent d0272ac commit 65f3953
Show file tree
Hide file tree
Showing 17 changed files with 68 additions and 55 deletions.
2 changes: 1 addition & 1 deletion include/zenoh-pico/net/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ typedef struct {
} _z_queryable_t;

#if Z_FEATURE_QUERYABLE == 1
_z_query_t _z_query_create(const _z_value_t *value, _z_keyexpr_t *key, const _z_slice_t *parameters, _z_session_t *zn,
_z_query_t _z_query_create(_z_value_t *value, _z_keyexpr_t *key, const _z_slice_t *parameters, _z_session_t *zn,
uint32_t request_id, const _z_bytes_t attachment);
void _z_queryable_clear(_z_queryable_t *qbl);
void _z_queryable_free(_z_queryable_t **qbl);
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/net/reply.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ void _z_reply_clear(_z_reply_t *src);
void _z_reply_free(_z_reply_t **hello);
int8_t _z_reply_copy(_z_reply_t *dst, const _z_reply_t *src);
_z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, z_reply_tag_t tag, _z_id_t id, const _z_bytes_t payload,
const _z_timestamp_t *timestamp, _z_encoding_t encoding, z_sample_kind_t kind,
const _z_timestamp_t *timestamp, _z_encoding_t *encoding, z_sample_kind_t kind,
const _z_bytes_t attachment);

typedef struct _z_pending_reply_t {
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/net/sample.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ int8_t _z_sample_copy(_z_sample_t *dst, const _z_sample_t *src);
_z_sample_t _z_sample_duplicate(const _z_sample_t *src);

_z_sample_t _z_sample_create(_z_keyexpr_t *key, const _z_bytes_t payload, _z_timestamp_t timestamp,
_z_encoding_t encoding, const z_sample_kind_t kind, const _z_qos_t qos,
_z_encoding_t *encoding, const z_sample_kind_t kind, const _z_qos_t qos,
const _z_bytes_t attachment);

#endif /* ZENOH_PICO_SAMPLE_NETAPI_H */
1 change: 1 addition & 0 deletions include/zenoh-pico/protocol/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ typedef struct {
_z_value_t _z_value_null(void);
_z_value_t _z_value_steal(_z_value_t *value);
int8_t _z_value_copy(_z_value_t *dst, const _z_value_t *src);
void _z_value_move(_z_value_t *dst, _z_value_t *src);
void _z_value_clear(_z_value_t *src);
void _z_value_free(_z_value_t **hello);

Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/session/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ _z_pending_query_t *_z_get_pending_query_by_id(_z_session_t *zn, const _z_zint_t

int8_t _z_register_pending_query(_z_session_t *zn, _z_pending_query_t *pq);
int8_t _z_trigger_query_reply_partial(_z_session_t *zn, _z_zint_t reply_context, const _z_keyexpr_t keyexpr,
const _z_msg_put_t *msg);
_z_msg_put_t *msg);
int8_t _z_trigger_query_reply_final(_z_session_t *zn, _z_zint_t id);
void _z_unregister_pending_query(_z_session_t *zn, _z_pending_query_t *pq);
void _z_flush_pending_queries(_z_session_t *zn);
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/session/queryable.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ _z_session_queryable_rc_t *_z_get_session_queryable_by_id(_z_session_t *zn, cons
_z_session_queryable_rc_list_t *_z_get_session_queryable_by_key(_z_session_t *zn, const _z_keyexpr_t key);

_z_session_queryable_rc_t *_z_register_session_queryable(_z_session_t *zn, _z_session_queryable_t *q);
int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *query, const _z_keyexpr_t q_key, uint32_t qid,
int8_t _z_trigger_queryables(_z_session_t *zn, _z_msg_query_t *query, const _z_keyexpr_t q_key, uint32_t qid,
const _z_bytes_t attachment);
void _z_unregister_session_queryable(_z_session_t *zn, _z_session_queryable_rc_t *q);
void _z_flush_session_queryable(_z_session_t *zn);
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/session/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ _z_subscription_rc_list_t *_z_get_subscriptions_by_key(_z_session_t *zn, uint8_t

_z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_t *sub);
int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload,
const _z_encoding_t encoding, const _z_zint_t kind, const _z_timestamp_t timestamp,
_z_encoding_t *encoding, const _z_zint_t kind, const _z_timestamp_t timestamp,
const _z_n_qos_t qos, const _z_bytes_t attachment);
void _z_unregister_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_rc_t *sub);
void _z_flush_subscriptions(_z_session_t *zn);
Expand Down
4 changes: 2 additions & 2 deletions src/net/query.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ void _z_query_free(_z_query_t **query) {
}

#if Z_FEATURE_QUERYABLE == 1
_z_query_t _z_query_create(const _z_value_t *value, _z_keyexpr_t *key, const _z_slice_t *parameters, _z_session_t *zn,
_z_query_t _z_query_create(_z_value_t *value, _z_keyexpr_t *key, const _z_slice_t *parameters, _z_session_t *zn,
uint32_t request_id, const _z_bytes_t attachment) {
_z_query_t q = _z_query_null();
q._request_id = request_id;
Expand All @@ -71,7 +71,7 @@ _z_query_t _z_query_create(const _z_value_t *value, _z_keyexpr_t *key, const _z_
q._anyke = (strstr(q._parameters, Z_SELECTOR_QUERY_MATCH) == NULL) ? false : true;
q._key = _z_keyexpr_steal(key);
_z_bytes_copy(&q.attachment, &attachment);
_z_value_copy(&q._value, value); // FIXME: Move encoding, Issue #482
_z_value_move(&q._value, value);
return q;
}

Expand Down
6 changes: 3 additions & 3 deletions src/net/reply.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ void _z_pending_reply_clear(_z_pending_reply_t *pr) {
}

_z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, z_reply_tag_t tag, _z_id_t id, const _z_bytes_t payload,
const _z_timestamp_t *timestamp, _z_encoding_t encoding, z_sample_kind_t kind,
const _z_timestamp_t *timestamp, _z_encoding_t *encoding, z_sample_kind_t kind,
const _z_bytes_t attachment) {
_z_reply_t reply = _z_reply_null();
reply._tag = tag;
Expand All @@ -105,13 +105,13 @@ _z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, z_reply_tag_t tag, _z_id_t id,
reply.data.sample.timestamp = _z_timestamp_duplicate(timestamp);
_z_bytes_copy(&reply.data.sample.payload, &payload);
_z_bytes_copy(&reply.data.sample.attachment, &attachment);
_z_encoding_copy(&reply.data.sample.encoding, &encoding); // FIXME: Move encoding, Issue #482
_z_encoding_move(&reply.data.sample.encoding, encoding);
}
return reply;
}
#else
_z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, z_reply_tag_t tag, _z_id_t id, const _z_bytes_t payload,
const _z_timestamp_t *timestamp, _z_encoding_t encoding, z_sample_kind_t kind,
const _z_timestamp_t *timestamp, _z_encoding_t *encoding, z_sample_kind_t kind,
const _z_bytes_t attachment) {
_ZP_UNUSED(keyexpr);
_ZP_UNUSED(tag);
Expand Down
6 changes: 3 additions & 3 deletions src/net/sample.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ _z_sample_t _z_sample_duplicate(const _z_sample_t *src) {

#if Z_FEATURE_SUBSCRIPTION == 1
_z_sample_t _z_sample_create(_z_keyexpr_t *key, const _z_bytes_t payload, const _z_timestamp_t timestamp,
_z_encoding_t encoding, const z_sample_kind_t kind, const _z_qos_t qos,
_z_encoding_t *encoding, const z_sample_kind_t kind, const _z_qos_t qos,
const _z_bytes_t attachment) {
_z_sample_t s = _z_sample_null();
s.keyexpr = _z_keyexpr_steal(key);
Expand All @@ -92,12 +92,12 @@ _z_sample_t _z_sample_create(_z_keyexpr_t *key, const _z_bytes_t payload, const
s.qos = qos;
_z_bytes_copy(&s.payload, &payload);
_z_bytes_copy(&s.attachment, &attachment);
_z_encoding_copy(&s.encoding, &encoding); // FIXME: Move encoding, Issue #482
_z_encoding_move(&s.encoding, encoding);
return s;
}
#else
_z_sample_t _z_sample_create(_z_keyexpr_t *key, const _z_bytes_t payload, const _z_timestamp_t timestamp,
_z_encoding_t encoding, const z_sample_kind_t kind, const _z_qos_t qos,
_z_encoding_t *encoding, const z_sample_kind_t kind, const _z_qos_t qos,
const _z_bytes_t attachment) {
_ZP_UNUSED(key);
_ZP_UNUSED(payload);
Expand Down
5 changes: 5 additions & 0 deletions src/protocol/core.c
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,8 @@ int8_t _z_hello_copy(_z_hello_t *dst, const _z_hello_t *src) {
_z_hello_t _z_hello_null(void) {
return (_z_hello_t){.zid = _z_id_empty(), .version = 0, .whatami = 0x0, .locators = _z_string_svec_make(0)};
}

void _z_value_move(_z_value_t *dst, _z_value_t *src) {
_z_encoding_move(&dst->encoding, &src->encoding);
_z_bytes_move(&dst->payload, &src->payload);
}
2 changes: 1 addition & 1 deletion src/protocol/definitions/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ void _z_msg_reply_clear(_z_msg_reply_t *msg) { _z_push_body_clear(&msg->_body);
void _z_msg_put_clear(_z_msg_put_t *msg) {
_z_bytes_drop(&msg->_payload);
_z_bytes_drop(&msg->_attachment);
_z_encoding_clear(&msg->_encoding); // FIXME: Remove when possible, Issue #482
_z_encoding_clear(&msg->_encoding);
_z_timestamp_clear(&msg->_commons._timestamp);
}

Expand Down
15 changes: 11 additions & 4 deletions src/session/push.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,18 @@ int8_t _z_trigger_push(_z_session_t *zn, _z_n_msg_push_t *push) {

// TODO check body to know where to dispatch
#if Z_FEATURE_SUBSCRIPTION == 1
_z_bytes_t payload = push->_body._is_put ? push->_body._body._put._payload : _z_bytes_null();
_z_encoding_t encoding = push->_body._is_put ? push->_body._body._put._encoding : _z_encoding_null();

size_t kind = push->_body._is_put ? Z_SAMPLE_KIND_PUT : Z_SAMPLE_KIND_DELETE;
ret = _z_trigger_subscriptions(zn, push->_key, payload, encoding, kind, push->_timestamp, push->_qos,
push->_body._body._put._attachment);
if (push->_body._is_put) {
ret =
_z_trigger_subscriptions(zn, push->_key, push->_body._body._put._payload, &push->_body._body._put._encoding,
kind, push->_timestamp, push->_qos, push->_body._body._put._attachment);
} else {
_z_encoding_t encoding = _z_encoding_null();
_z_bytes_t payload = _z_bytes_null();
ret = _z_trigger_subscriptions(zn, push->_key, payload, &encoding, kind, push->_timestamp, push->_qos,
push->_body._body._put._attachment);
}
#else
_ZP_UNUSED(zn);
_ZP_UNUSED(push);
Expand Down
4 changes: 2 additions & 2 deletions src/session/query.c
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ int8_t _z_register_pending_query(_z_session_t *zn, _z_pending_query_t *pen_qry)
}

int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, const _z_keyexpr_t keyexpr,
const _z_msg_put_t *msg) {
_z_msg_put_t *msg) {
int8_t ret = _Z_RES_OK;

_zp_session_lock_mutex(zn);
Expand All @@ -113,7 +113,7 @@ int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, cons

// Build the reply
_z_reply_t reply = _z_reply_create(expanded_ke, Z_REPLY_TAG_DATA, zn->_local_zid, msg->_payload,
&msg->_commons._timestamp, msg->_encoding, Z_SAMPLE_KIND_PUT, msg->_attachment);
&msg->_commons._timestamp, &msg->_encoding, Z_SAMPLE_KIND_PUT, msg->_attachment);

// Verify if this is a newer reply, free the old one in case it is
if ((ret == _Z_RES_OK) && ((pen_qry->_consolidation == Z_CONSOLIDATION_MODE_LATEST) ||
Expand Down
2 changes: 1 addition & 1 deletion src/session/queryable.c
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ _z_session_queryable_rc_t *_z_register_session_queryable(_z_session_t *zn, _z_se
return ret;
}

int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *msgq, const _z_keyexpr_t q_key, uint32_t qid,
int8_t _z_trigger_queryables(_z_session_t *zn, _z_msg_query_t *msgq, const _z_keyexpr_t q_key, uint32_t qid,
const _z_bytes_t attachment) {
int8_t ret = _Z_RES_OK;

Expand Down
62 changes: 31 additions & 31 deletions src/session/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,28 +41,28 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint
switch (msg->_tag) {
case _Z_N_DECLARE: {
_Z_DEBUG("Handling _Z_N_DECLARE");
_z_n_msg_declare_t decl = msg->_body._declare;
switch (decl._decl._tag) {
_z_n_msg_declare_t *decl = &msg->_body._declare;
switch (decl->_decl._tag) {
case _Z_DECL_KEXPR: {
if (_z_register_resource(zn, decl._decl._body._decl_kexpr._keyexpr,
decl._decl._body._decl_kexpr._id, local_peer_id) == 0) {
if (_z_register_resource(zn, decl->_decl._body._decl_kexpr._keyexpr,
decl->_decl._body._decl_kexpr._id, local_peer_id) == 0) {
ret = _Z_ERR_ENTITY_DECLARATION_FAILED;
}
} break;
case _Z_UNDECL_KEXPR: {
_z_unregister_resource(zn, decl._decl._body._undecl_kexpr._id, local_peer_id);
_z_unregister_resource(zn, decl->_decl._body._undecl_kexpr._id, local_peer_id);
} break;
case _Z_DECL_SUBSCRIBER: {
_z_interest_process_declares(zn, &decl._decl);
_z_interest_process_declares(zn, &decl->_decl);
} break;
case _Z_DECL_QUERYABLE: {
_z_interest_process_declares(zn, &decl._decl);
_z_interest_process_declares(zn, &decl->_decl);
} break;
case _Z_UNDECL_SUBSCRIBER: {
_z_interest_process_undeclares(zn, &decl._decl);
_z_interest_process_undeclares(zn, &decl->_decl);
} break;
case _Z_UNDECL_QUERYABLE: {
_z_interest_process_undeclares(zn, &decl._decl);
_z_interest_process_undeclares(zn, &decl->_decl);
} break;
case _Z_DECL_TOKEN: {
// TODO: add support or explicitly discard
Expand All @@ -72,10 +72,10 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint
} break;
case _Z_DECL_FINAL: {
// Check that interest id is valid
if (!decl.has_interest_id) {
if (!decl->has_interest_id) {
return _Z_ERR_MESSAGE_ZENOH_DECLARATION_UNKNOWN;
}
_z_interest_process_declare_final(zn, decl._interest_id);
_z_interest_process_declare_final(zn, decl->_interest_id);
} break;
}
} break;
Expand All @@ -86,56 +86,56 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint
} break;
case _Z_N_REQUEST: {
_Z_DEBUG("Handling _Z_N_REQUEST");
_z_n_msg_request_t req = msg->_body._request;
switch (req._tag) {
_z_n_msg_request_t *req = &msg->_body._request;
switch (req->_tag) {
case _Z_REQUEST_QUERY: {
#if Z_FEATURE_QUERYABLE == 1
_z_msg_query_t *query = &req._body._query;
ret = _z_trigger_queryables(zn, query, req._key, (uint32_t)req._rid,
req._body._query._ext_attachment);
_z_msg_query_t *query = &req->_body._query;
ret = _z_trigger_queryables(zn, query, req->_key, (uint32_t)req->_rid,
req->_body._query._ext_attachment);
#else
_Z_DEBUG("_Z_REQUEST_QUERY dropped, queryables not supported");
#endif
} break;
case _Z_REQUEST_PUT: {
#if Z_FEATURE_SUBSCRIPTION == 1
_z_msg_put_t put = req._body._put;
ret = _z_trigger_subscriptions(zn, req._key, put._payload, put._encoding, Z_SAMPLE_KIND_PUT,
put._commons._timestamp, req._ext_qos, put._attachment);
_z_msg_put_t put = req->_body._put;
ret = _z_trigger_subscriptions(zn, req->_key, put._payload, &put._encoding, Z_SAMPLE_KIND_PUT,
put._commons._timestamp, req->_ext_qos, put._attachment);
#endif
if (ret == _Z_RES_OK) {
_z_network_message_t final = _z_n_msg_make_response_final(req._rid);
_z_network_message_t final = _z_n_msg_make_response_final(req->_rid);
ret |= _z_send_n_msg(zn, &final, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK);
}
} break;
case _Z_REQUEST_DEL: {
#if Z_FEATURE_SUBSCRIPTION == 1
_z_msg_del_t del = req._body._del;
ret = _z_trigger_subscriptions(zn, req._key, _z_bytes_null(), _z_encoding_null(),
Z_SAMPLE_KIND_DELETE, del._commons._timestamp, req._ext_qos,
_z_bytes_null());
_z_msg_del_t del = req->_body._del;
_z_encoding_t encoding = _z_encoding_null();
ret = _z_trigger_subscriptions(zn, req->_key, _z_bytes_null(), &encoding, Z_SAMPLE_KIND_DELETE,
del._commons._timestamp, req->_ext_qos, _z_bytes_null());
#endif
if (ret == _Z_RES_OK) {
_z_network_message_t final = _z_n_msg_make_response_final(req._rid);
_z_network_message_t final = _z_n_msg_make_response_final(req->_rid);
ret |= _z_send_n_msg(zn, &final, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK);
}
} break;
}
} break;
case _Z_N_RESPONSE: {
_Z_DEBUG("Handling _Z_N_RESPONSE");
_z_n_msg_response_t response = msg->_body._response;
switch (response._tag) {
_z_n_msg_response_t *response = &msg->_body._response;
switch (response->_tag) {
case _Z_RESPONSE_BODY_REPLY: {
_z_msg_reply_t *reply = &response._body._reply;
ret = _z_trigger_reply_partial(zn, response._request_id, response._key, reply);
_z_msg_reply_t *reply = &response->_body._reply;
ret = _z_trigger_reply_partial(zn, response->_request_id, response->_key, reply);
} break;
case _Z_RESPONSE_BODY_ERR: {
// @TODO: expose zenoh errors to the user
_z_msg_err_t error = response._body._err;
_z_msg_err_t error = response->_body._err;
_z_slice_t payload = _z_bytes_try_get_contiguous(&error._payload);
_ZP_UNUSED(payload); // Unused when logs are deactivated
_Z_ERROR("Received Err for query %zu: message=%.*s", response._request_id, (int)payload.len,
_Z_ERROR("Received Err for query %zu: message=%.*s", response->_request_id, (int)payload.len,
payload.start);
} break;
}
Expand Down
4 changes: 2 additions & 2 deletions src/session/subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,13 @@ _z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, uint8_t is_loca
void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload,
const _z_n_qos_t qos, const _z_bytes_t attachment) {
_z_encoding_t encoding = _z_encoding_null();
int8_t ret = _z_trigger_subscriptions(zn, keyexpr, payload, encoding, Z_SAMPLE_KIND_PUT, _z_timestamp_null(), qos,
int8_t ret = _z_trigger_subscriptions(zn, keyexpr, payload, &encoding, Z_SAMPLE_KIND_PUT, _z_timestamp_null(), qos,
attachment);
(void)ret;
}

int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload,
const _z_encoding_t encoding, const _z_zint_t kind, const _z_timestamp_t timestamp,
_z_encoding_t *encoding, const _z_zint_t kind, const _z_timestamp_t timestamp,
const _z_n_qos_t qos, const _z_bytes_t attachment) {
int8_t ret = _Z_RES_OK;

Expand Down

0 comments on commit 65f3953

Please sign in to comment.