diff --git a/examples/unix/c11/z_put.c b/examples/unix/c11/z_put.c index 089615618..294e03ef5 100644 --- a/examples/unix/c11/z_put.c +++ b/examples/unix/c11/z_put.c @@ -14,11 +14,17 @@ #include #include +#include #include #include #include #include +#include "zenoh-pico/api/macros.h" +#include "zenoh-pico/api/types.h" +#include "zenoh-pico/collections/bytes.h" +#include "zenoh-pico/protocol/core.h" + #if Z_FEATURE_PUBLICATION == 1 int main(int argc, char **argv) { const char *keyexpr = "demo/example/zenoh-pico-put"; @@ -89,16 +95,19 @@ int main(int argc, char **argv) { printf("Putting Data ('%s': '%s')...\n", keyexpr, value); z_put_options_t options = z_put_options_default(); options.encoding = z_encoding(Z_ENCODING_PREFIX_TEXT_PLAIN, NULL); + z_owned_bytes_map_t map = z_bytes_map_new(); + z_bytes_map_insert_by_alias(&map, _z_bytes_wrap((uint8_t *)"hi", 2), _z_bytes_wrap((uint8_t *)"there", 5)); + options.attachment = z_bytes_map_as_attachment(&map); if (z_put(z_loan(s), z_keyexpr(keyexpr), (const uint8_t *)value, strlen(value), &options) < 0) { printf("Oh no! Put has failed...\n"); } + z_bytes_map_drop(&map); // z_undeclare_keyexpr(z_loan(s), z_move(ke)); // Stop read and lease tasks for zenoh-pico zp_stop_read_task(z_loan(s)); zp_stop_lease_task(z_loan(s)); - z_close(z_move(s)); return 0; } diff --git a/examples/unix/c11/z_sub.c b/examples/unix/c11/z_sub.c index af42950c3..8f263733a 100644 --- a/examples/unix/c11/z_sub.c +++ b/examples/unix/c11/z_sub.c @@ -14,17 +14,31 @@ #include #include +#include #include #include #include #include +#include "zenoh-pico/api/types.h" +#include "zenoh-pico/protocol/core.h" + #if Z_FEATURE_SUBSCRIPTION == 1 +int8_t attachment_handler(z_bytes_t key, z_bytes_t value, void *ctx) { + (void)ctx; + printf(">>> %.*s: %.*s\n", (int)key.len, key.start, (int)value.len, value.start); + return 0; +} + void data_handler(const z_sample_t *sample, void *ctx) { (void)(ctx); z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len, sample->payload.start); + if (z_attachment_check(&sample->attachment)) { + printf("Attachement found\n"); + z_attachment_iterate(sample->attachment, attachment_handler, NULL); + } z_drop(z_move(keystr)); } diff --git a/include/zenoh-pico/api/types.h b/include/zenoh-pico/api/types.h index 9b51f29e2..771312765 100644 --- a/include/zenoh-pico/api/types.h +++ b/include/zenoh-pico/api/types.h @@ -274,9 +274,11 @@ typedef struct { * * Members: * z_encoding_t encoding: The encoding of the payload. + * z_attachment_t attachment: an attachment to the response. */ typedef struct { z_encoding_t encoding; + z_attachment_t attachment; } z_query_reply_options_t; /** @@ -559,8 +561,8 @@ struct _z_bytes_pair_t { void _z_bytes_pair_clear(struct _z_bytes_pair_t *this); -_Z_ELEM_DEFINE(_z_bytes_pair, struct _z_bytes_pair_t, _z_noop_size, _z_bytes_pair_clear, _z_noop_copy); -_Z_LIST_DEFINE(_z_bytes_pair, struct _z_bytes_pair_t); +_Z_ELEM_DEFINE(_z_bytes_pair, struct _z_bytes_pair_t, _z_noop_size, _z_bytes_pair_clear, _z_noop_copy) +_Z_LIST_DEFINE(_z_bytes_pair, struct _z_bytes_pair_t) /** * A map of maybe-owned vector of bytes to maybe-owned vector of bytes. diff --git a/include/zenoh-pico/collections/bytes.h b/include/zenoh-pico/collections/bytes.h index 0002bc257..b7dff1edc 100644 --- a/include/zenoh-pico/collections/bytes.h +++ b/include/zenoh-pico/collections/bytes.h @@ -18,6 +18,7 @@ #include #include #include +#include /*-------- Bytes --------*/ /** diff --git a/include/zenoh-pico/net/primitives.h b/include/zenoh-pico/net/primitives.h index 3711d8ceb..0a253cc72 100644 --- a/include/zenoh-pico/net/primitives.h +++ b/include/zenoh-pico/net/primitives.h @@ -119,7 +119,7 @@ int8_t _z_undeclare_publisher(_z_publisher_t *pub); */ int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *payload, const size_t len, const _z_encoding_t encoding, const z_sample_kind_t kind, const z_congestion_control_t cong_ctrl, - z_priority_t priority); + z_priority_t priority, z_attachment_t attachment); #endif #if Z_FEATURE_SUBSCRIPTION == 1 @@ -228,7 +228,7 @@ int8_t _z_send_reply(const z_query_t *query, const _z_keyexpr_t keyexpr, const _ */ int8_t _z_query(_z_session_t *zn, _z_keyexpr_t keyexpr, const char *parameters, const z_query_target_t target, const z_consolidation_mode_t consolidation, const _z_value_t value, _z_reply_handler_t callback, - void *arg_call, _z_drop_handler_t dropper, void *arg_drop); + void *arg_call, _z_drop_handler_t dropper, void *arg_drop, z_attachment_t attachment); #endif #endif /* ZENOH_PICO_PRIMITIVES_NETAPI_H */ diff --git a/include/zenoh-pico/protocol/core.h b/include/zenoh-pico/protocol/core.h index 9ba46b566..25035372e 100644 --- a/include/zenoh-pico/protocol/core.h +++ b/include/zenoh-pico/protocol/core.h @@ -25,6 +25,7 @@ #include "zenoh-pico/collections/element.h" #include "zenoh-pico/collections/string.h" #include "zenoh-pico/config.h" +#include "zenoh-pico/protocol/iobuf.h" #include "zenoh-pico/system/platform.h" #define _Z_OPTIONAL @@ -98,6 +99,7 @@ typedef struct z_attachment_vtable_t { */ z_attachment_iter_driver_t iteration_driver; } z_attachment_vtable_t; + /** * A v-table based map of byte slice to byte slice. * @@ -109,13 +111,25 @@ typedef struct z_attachment_t { z_attachment_iter_driver_t iteration_driver; } z_attachment_t; -inline z_attachment_t z_attachment_null() { return (z_attachment_t){.data = NULL, .iteration_driver = NULL}; } -inline _Bool z_attachment_check(const z_attachment_t *attachment) { return attachment->iteration_driver != NULL; } -inline int8_t z_attachment_iterate(z_attachment_t this, z_attachment_iter_body_t body, void *ctx) { - return this.iteration_driver(this.data, body, ctx); -} +z_attachment_t z_attachment_null(void); +_Bool z_attachment_check(const z_attachment_t *attachment); +int8_t z_attachment_iterate(z_attachment_t this, z_attachment_iter_body_t body, void *ctx); _z_bytes_t z_attachment_get(z_attachment_t this, _z_bytes_t key); +typedef struct { + union { + z_attachment_t decoded; + _z_bytes_t encoded; + } body; + _Bool is_encoded; +} _z_owned_encoded_attachment_t; +/** + * Estimate the length of an attachment once encoded. + */ +size_t _z_attachment_estimate_length(z_attachment_t att); +z_attachment_t _z_encoded_as_attachment(const _z_owned_encoded_attachment_t *att); +void _z_encoded_attachment_drop(_z_owned_encoded_attachment_t *att); + _z_timestamp_t _z_timestamp_duplicate(const _z_timestamp_t *tstamp); _z_timestamp_t _z_timestamp_null(void); void _z_timestamp_clear(_z_timestamp_t *tstamp); diff --git a/include/zenoh-pico/protocol/definitions/message.h b/include/zenoh-pico/protocol/definitions/message.h index 703ed6172..819d9f293 100644 --- a/include/zenoh-pico/protocol/definitions/message.h +++ b/include/zenoh-pico/protocol/definitions/message.h @@ -67,6 +67,7 @@ typedef struct { _z_value_t _value; _z_source_info_t _ext_source_info; z_consolidation_mode_t _ext_consolidation; + _z_owned_encoded_attachment_t _ext_attachment; } _z_msg_reply_t; void _z_msg_reply_clear(_z_msg_reply_t *msg); #define _Z_FLAG_Z_R_T 0x20 @@ -149,6 +150,7 @@ typedef struct { _z_m_push_commons_t _commons; _z_bytes_t _payload; _z_encoding_t _encoding; + _z_owned_encoded_attachment_t _attachment; } _z_msg_put_t; void _z_msg_put_clear(_z_msg_put_t *); #define _Z_M_PUT_ID 0x01 @@ -172,11 +174,13 @@ typedef struct { _z_source_info_t _ext_info; _z_value_t _ext_value; z_consolidation_mode_t _ext_consolidation; + _z_owned_encoded_attachment_t _ext_attachment; } _z_msg_query_t; typedef struct { _Bool info; _Bool body; _Bool consolidation; + _Bool attachment; } _z_msg_query_reqexts_t; _z_msg_query_reqexts_t _z_msg_query_required_extensions(const _z_msg_query_t *msg); void _z_msg_query_clear(_z_msg_query_t *msg); diff --git a/include/zenoh-pico/protocol/definitions/network.h b/include/zenoh-pico/protocol/definitions/network.h index 69226b6e0..14661adc0 100644 --- a/include/zenoh-pico/protocol/definitions/network.h +++ b/include/zenoh-pico/protocol/definitions/network.h @@ -230,7 +230,8 @@ _Z_VEC_DEFINE(_z_network_message, _z_network_message_t) void _z_msg_fix_mapping(_z_zenoh_message_t *msg, uint16_t mapping); _z_network_message_t _z_msg_make_pull(_z_keyexpr_t key, _z_zint_t pull_id); _z_network_message_t _z_msg_make_query(_Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_bytes_t) parameters, _z_zint_t qid, - z_consolidation_mode_t consolidation, _Z_MOVE(_z_value_t) value); + z_consolidation_mode_t consolidation, _Z_MOVE(_z_value_t) value, + z_attachment_t attachment); _z_network_message_t _z_n_msg_make_reply(_z_zint_t rid, _Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_value_t) value); _z_network_message_t _z_n_msg_make_ack(_z_zint_t rid, _Z_MOVE(_z_keyexpr_t) key); _z_network_message_t _z_n_msg_make_response_final(_z_zint_t rid); diff --git a/include/zenoh-pico/session/subscription.h b/include/zenoh-pico/session/subscription.h index a05de96d9..e2b3400cb 100644 --- a/include/zenoh-pico/session/subscription.h +++ b/include/zenoh-pico/session/subscription.h @@ -25,7 +25,8 @@ _z_subscription_sptr_list_t *_z_get_subscriptions_by_key(_z_session_t *zn, uint8 _z_subscription_sptr_t *_z_register_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_t *sub); int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload, - const _z_encoding_t encoding, const _z_zint_t kind, const _z_timestamp_t timestamp); + const _z_encoding_t encoding, const _z_zint_t kind, const _z_timestamp_t timestamp, + z_attachment_t att); void _z_unregister_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_sptr_t *sub); void _z_flush_subscriptions(_z_session_t *zn); diff --git a/src/api/api.c b/src/api/api.c index ffb73e271..9cec36f90 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -610,7 +610,7 @@ int8_t z_put(z_session_t zs, z_keyexpr_t keyexpr, const uint8_t *payload, z_zint opt.priority = options->priority; } ret = _z_write(zs._val, keyexpr, (const uint8_t *)payload, payload_len, opt.encoding, Z_SAMPLE_KIND_PUT, - opt.congestion_control, opt.priority); + opt.congestion_control, opt.priority, options->attachment); return ret; } @@ -624,7 +624,7 @@ int8_t z_delete(z_session_t zs, z_keyexpr_t keyexpr, const z_delete_options_t *o opt.priority = options->priority; } ret = _z_write(zs._val, keyexpr, NULL, 0, z_encoding_default(), Z_SAMPLE_KIND_DELETE, opt.congestion_control, - opt.priority); + opt.priority, z_attachment_null()); return ret; } @@ -683,7 +683,7 @@ int8_t z_publisher_put(const z_publisher_t pub, const uint8_t *payload, size_t l } ret = _z_write(pub._val->_zn, pub._val->_key, payload, len, opt.encoding, Z_SAMPLE_KIND_PUT, - pub._val->_congestion_control, pub._val->_priority); + pub._val->_congestion_control, pub._val->_priority, options->attachment); return ret; } @@ -691,7 +691,7 @@ int8_t z_publisher_put(const z_publisher_t pub, const uint8_t *payload, size_t l int8_t z_publisher_delete(const z_publisher_t pub, const z_publisher_delete_options_t *options) { (void)(options); return _z_write(pub._val->_zn, pub._val->_key, NULL, 0, z_encoding_default(), Z_SAMPLE_KIND_DELETE, - pub._val->_congestion_control, pub._val->_priority); + pub._val->_congestion_control, pub._val->_priority, z_attachment_null()); } z_owned_keyexpr_t z_publisher_keyexpr(z_publisher_t publisher) { @@ -709,7 +709,8 @@ OWNED_FUNCTIONS_PTR_INTERNAL(z_reply_t, z_owned_reply_t, reply, _z_reply_free, _ z_get_options_t z_get_options_default(void) { return (z_get_options_t){.target = z_query_target_default(), .consolidation = z_query_consolidation_default(), - .value = {.encoding = z_encoding_default(), .payload = _z_bytes_empty()}}; + .value = {.encoding = z_encoding_default(), .payload = _z_bytes_empty()}, + .attachment = z_attachment_null()}; } typedef struct __z_reply_handler_wrapper_t { @@ -757,7 +758,7 @@ int8_t z_get(z_session_t zs, z_keyexpr_t keyexpr, const char *parameters, z_owne } ret = _z_query(zs._val, keyexpr, parameters, opt.target, opt.consolidation.mode, opt.value, __z_reply_handler, - wrapped_ctx, callback->drop, ctx); + wrapped_ctx, callback->drop, ctx, opt.attachment); return ret; } @@ -1158,7 +1159,7 @@ void z_bytes_map_insert_by_alias(const z_owned_bytes_map_t *this_, z_bytes_t key memset(insert, 0, sizeof(struct _z_bytes_pair_t)); insert->key = _z_bytes_wrap(key.start, key.len); insert->value = _z_bytes_wrap(value.start, value.len); - _z_bytes_pair_list_push(this_->_inner, insert); + ((z_owned_bytes_map_t *)this_)->_inner = _z_bytes_pair_list_push(this_->_inner, insert); } } void z_bytes_map_insert_by_copy(const z_owned_bytes_map_t *this_, z_bytes_t key, z_bytes_t value) { @@ -1182,7 +1183,7 @@ void z_bytes_map_insert_by_copy(const z_owned_bytes_map_t *this_, z_bytes_t key, memset(insert, 0, sizeof(struct _z_bytes_pair_t)); _z_bytes_copy(&insert->key, &key); _z_bytes_copy(&insert->value, &value); - _z_bytes_pair_list_push(this_->_inner, insert); + ((z_owned_bytes_map_t *)this_)->_inner = _z_bytes_pair_list_push(this_->_inner, insert); } } int8_t z_bytes_map_iter(const z_owned_bytes_map_t *this_, z_attachment_iter_body_t body, void *ctx) { diff --git a/src/collections/bytes.c b/src/collections/bytes.c index d2a8939b0..40e67bd36 100644 --- a/src/collections/bytes.c +++ b/src/collections/bytes.c @@ -105,3 +105,6 @@ _z_bytes_t _z_bytes_steal(_z_bytes_t *b) { *b = _z_bytes_empty(); return ret; } +_Bool _z_bytes_eq(const _z_bytes_t *left, const _z_bytes_t *right) { + return left->len == right->len && memcmp(left->start, right->start, left->len) == 0; +} diff --git a/src/net/primitives.c b/src/net/primitives.c index 7f0123695..de6f20d41 100644 --- a/src/net/primitives.c +++ b/src/net/primitives.c @@ -128,7 +128,7 @@ int8_t _z_undeclare_publisher(_z_publisher_t *pub) { /*------------------ Write ------------------*/ int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *payload, const size_t len, const _z_encoding_t encoding, const z_sample_kind_t kind, const z_congestion_control_t cong_ctrl, - z_priority_t priority) { + z_priority_t priority, z_attachment_t attachment) { int8_t ret = _Z_RES_OK; _z_network_message_t msg; switch (kind) { @@ -146,6 +146,7 @@ int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *pay ._commons = {._timestamp = _z_timestamp_null(), ._source_info = _z_source_info_null()}, ._payload = _z_bytes_wrap(payload, len), ._encoding = encoding, + ._attachment = {.is_encoded = false, .body.decoded = attachment}, }, }, }; @@ -383,7 +384,7 @@ int8_t _z_send_reply(const z_query_t *query, _z_keyexpr_t keyexpr, const _z_valu /*------------------ Query ------------------*/ int8_t _z_query(_z_session_t *zn, _z_keyexpr_t keyexpr, const char *parameters, const z_query_target_t target, const z_consolidation_mode_t consolidation, _z_value_t value, _z_reply_handler_t callback, - void *arg_call, _z_drop_handler_t dropper, void *arg_drop) { + void *arg_call, _z_drop_handler_t dropper, void *arg_drop, z_attachment_t attachment) { int8_t ret = _Z_RES_OK; // Create the pending query object @@ -404,7 +405,8 @@ int8_t _z_query(_z_session_t *zn, _z_keyexpr_t keyexpr, const char *parameters, ret = _z_register_pending_query(zn, pq); // Add the pending query to the current session if (ret == _Z_RES_OK) { _z_bytes_t params = _z_bytes_wrap((uint8_t *)pq->_parameters, strlen(pq->_parameters)); - _z_zenoh_message_t z_msg = _z_msg_make_query(&keyexpr, ¶ms, pq->_id, pq->_consolidation, &value); + _z_zenoh_message_t z_msg = + _z_msg_make_query(&keyexpr, ¶ms, pq->_id, pq->_consolidation, &value, attachment); if (_z_send_n_msg(zn, &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { _z_unregister_pending_query(zn, pq); diff --git a/src/protocol/codec/message.c b/src/protocol/codec/message.c index ad4279fe8..166e44822 100644 --- a/src/protocol/codec/message.c +++ b/src/protocol/codec/message.c @@ -242,18 +242,32 @@ int8_t _z_source_info_encode_ext(_z_wbuf_t *wbf, const _z_source_info_t *info) { return ret; } +int8_t _z_attachment_encode_ext_kv(_z_bytes_t key, _z_bytes_t value, void *ctx) { + _z_wbuf_t *wbf = (_z_wbuf_t *)ctx; + _Z_RETURN_IF_ERR(_z_bytes_encode(wbf, &key)); + _Z_RETURN_IF_ERR(_z_bytes_encode(wbf, &value)); + return 0; +} +int8_t _z_attachment_encode_ext(_z_wbuf_t *wbf, z_attachment_t att) { + size_t len = _z_attachment_estimate_length(att); + _Z_RETURN_IF_ERR(_z_zint_encode(wbf, len)); + _Z_RETURN_IF_ERR(z_attachment_iterate(att, _z_attachment_encode_ext_kv, wbf)); + return 0; +} + /*------------------ Push Body Field ------------------*/ int8_t _z_push_body_encode(_z_wbuf_t *wbf, const _z_push_body_t *pshb) { (void)(wbf); (void)(pshb); - int8_t ret = _Z_RES_OK; uint8_t header = pshb->_is_put ? _Z_MID_Z_PUT : _Z_MID_Z_DEL; _Bool has_source_info = _z_id_check(pshb->_body._put._commons._source_info._id) || pshb->_body._put._commons._source_info._source_sn != 0 || pshb->_body._put._commons._source_info._entity_id != 0; + z_attachment_t att = _z_encoded_as_attachment(&pshb->_body._put._attachment); + _Bool has_attachment = pshb->_is_put && z_attachment_check(&att); _Bool has_timestamp = _z_timestamp_check(&pshb->_body._put._commons._timestamp); _Bool has_encoding = false; - if (has_source_info) { + if (has_source_info || has_attachment) { header |= _Z_FLAG_Z_Z; } if (pshb->_is_put) { @@ -270,26 +284,31 @@ int8_t _z_push_body_encode(_z_wbuf_t *wbf, const _z_push_body_t *pshb) { header |= _Z_FLAG_Z_D_T; } } - ret = _z_uint8_encode(wbf, header); - if ((ret == _Z_RES_OK) && has_timestamp) { - ret = _z_timestamp_encode(wbf, &pshb->_body._put._commons._timestamp); + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, header)); + if (has_timestamp) { + _Z_RETURN_IF_ERR(_z_timestamp_encode(wbf, &pshb->_body._put._commons._timestamp)); + } + + if (has_encoding) { + _Z_RETURN_IF_ERR(_z_encoding_prefix_encode(wbf, pshb->_body._put._encoding.prefix)); + _Z_RETURN_IF_ERR(_z_bytes_encode(wbf, &pshb->_body._put._encoding.suffix)); } - if ((ret == _Z_RES_OK) && has_encoding) { - ret = _z_encoding_prefix_encode(wbf, pshb->_body._put._encoding.prefix); - ret |= _z_bytes_encode(wbf, &pshb->_body._put._encoding.suffix); + if (has_source_info) { + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ENC_ZBUF | 0x01 | (has_attachment ? _Z_FLAG_Z_Z : 0))); + _Z_RETURN_IF_ERR(_z_source_info_encode_ext(wbf, &pshb->_body._put._commons._source_info)); } - if ((ret == _Z_RES_OK) && has_source_info) { - ret = _z_uint8_encode(wbf, _Z_MSG_EXT_ENC_ZBUF | 0x01); - ret |= _z_source_info_encode_ext(wbf, &pshb->_body._put._commons._source_info); + if (has_attachment) { + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ENC_ZBUF | 0x03)); + _Z_RETURN_IF_ERR(_z_attachment_encode_ext(wbf, att)); } - if ((ret == _Z_RES_OK) && pshb->_is_put) { - ret = _z_bytes_encode(wbf, &pshb->_body._put._payload); + if (pshb->_is_put) { + _Z_RETURN_IF_ERR(_z_bytes_encode(wbf, &pshb->_body._put._payload)); } - return ret; + return 0; } int8_t _z_push_body_decode_extensions(_z_msg_ext_t *extension, void *ctx) { _z_push_body_t *pshb = (_z_push_body_t *)ctx; @@ -300,6 +319,13 @@ int8_t _z_push_body_decode_extensions(_z_msg_ext_t *extension, void *ctx) { ret = _z_source_info_decode(&pshb->_body._put._commons._source_info, &zbf); break; } + case _Z_MSG_EXT_ENC_ZBUF | 0x03: { + pshb->_body._put._attachment.is_encoded = true; + pshb->_body._put._attachment.body.encoded = extension->_body._zbuf._val._is_alloc + ? _z_bytes_steal(&extension->_body._zbuf._val) + : _z_bytes_duplicate(&extension->_body._zbuf._val); + break; + } default: if (_Z_HAS_FLAG(extension->_header, _Z_MSG_EXT_FLAG_M)) { ret = _z_msg_ext_unknown_error(extension, 0x08); @@ -384,7 +410,7 @@ int8_t _z_query_encode(_z_wbuf_t *wbf, const _z_msg_query_t *msg) { header |= _Z_FLAG_Z_P; } _z_msg_query_reqexts_t required_exts = _z_msg_query_required_extensions(msg); - if (required_exts.body || required_exts.consolidation || required_exts.info) { + if (required_exts.body || required_exts.consolidation || required_exts.info || required_exts.attachment) { header |= _Z_FLAG_Z_Z; } _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, header)); @@ -395,7 +421,7 @@ int8_t _z_query_encode(_z_wbuf_t *wbf, const _z_msg_query_t *msg) { if (required_exts.body) { uint8_t extheader = _Z_MSG_EXT_ENC_ZBUF | 0x03; - if (required_exts.consolidation || required_exts.info) { + if (required_exts.consolidation || required_exts.info || required_exts.attachment) { extheader |= _Z_FLAG_Z_Z; } _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, extheader)); @@ -408,7 +434,7 @@ int8_t _z_query_encode(_z_wbuf_t *wbf, const _z_msg_query_t *msg) { } if (required_exts.consolidation) { uint8_t extheader = _Z_MSG_EXT_ENC_ZINT | _Z_MSG_EXT_FLAG_M | 0x02; - if (required_exts.info) { + if (required_exts.info || required_exts.attachment) { extheader |= _Z_FLAG_Z_Z; } _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, extheader)); @@ -416,9 +442,17 @@ int8_t _z_query_encode(_z_wbuf_t *wbf, const _z_msg_query_t *msg) { } if (required_exts.info) { uint8_t extheader = _Z_MSG_EXT_ENC_ZBUF | 0x01; + if (required_exts.attachment) { + extheader |= _Z_FLAG_Z_Z; + } _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, extheader)); _Z_RETURN_IF_ERR(_z_source_info_encode_ext(wbf, &msg->_ext_info)); } + if (required_exts.attachment) { + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ENC_ZBUF | 0x05)); + z_attachment_t att = _z_encoded_as_attachment(&msg->_ext_attachment); + _Z_RETURN_IF_ERR(_z_attachment_encode_ext(wbf, att)); + } return ret; } @@ -444,6 +478,13 @@ int8_t _z_query_decode_extensions(_z_msg_ext_t *extension, void *ctx) { _z_bytes_copy(&msg->_ext_value.payload, &bytes); break; } + case _Z_MSG_EXT_ENC_ZBUF | 0x05: { + msg->_ext_attachment.is_encoded = true; + msg->_ext_attachment.body.encoded = extension->_body._zbuf._val._is_alloc + ? _z_bytes_steal(&extension->_body._zbuf._val) + : _z_bytes_duplicate(&extension->_body._zbuf._val); + break; + } default: if (_Z_HAS_FLAG(extension->_header, _Z_MSG_EXT_FLAG_M)) { ret = _z_msg_ext_unknown_error(extension, 0x09); @@ -479,9 +520,12 @@ int8_t _z_reply_encode(_z_wbuf_t *wbf, const _z_msg_reply_t *reply) { !_z_bytes_is_empty(&reply->_value.encoding.suffix)) { header |= _Z_FLAG_Z_R_E; } + z_attachment_t att = _z_encoded_as_attachment(&reply->_ext_attachment); + _Bool has_attachment = z_attachment_check(&att); _Bool has_sourceinfo = _z_id_check(reply->_ext_source_info._id) || reply->_ext_source_info._source_sn != 0 || reply->_ext_source_info._entity_id != 0; - if (reply->_ext_consolidation != Z_CONSOLIDATION_MODE_AUTO || has_sourceinfo) { + _Bool has_consolidation_ext = reply->_ext_consolidation != Z_CONSOLIDATION_MODE_AUTO; + if (has_consolidation_ext || has_sourceinfo || has_attachment) { header |= _Z_FLAG_Z_Z; } _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, header)); @@ -495,19 +539,26 @@ int8_t _z_reply_encode(_z_wbuf_t *wbf, const _z_msg_reply_t *reply) { _Z_RETURN_IF_ERR(_z_encoding_prefix_encode(wbf, reply->_value.encoding.prefix)); _Z_RETURN_IF_ERR(_z_bytes_encode(wbf, &reply->_value.encoding.suffix)); } - _Bool has_consolidation_ext = reply->_ext_consolidation != Z_CONSOLIDATION_MODE_AUTO; if (has_sourceinfo) { uint8_t extheader = _Z_MSG_EXT_ENC_ZBUF | 0x01; - if (has_consolidation_ext) { + if (has_consolidation_ext || has_attachment) { extheader |= _Z_MSG_EXT_FLAG_Z; } _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, extheader)); _Z_RETURN_IF_ERR(_z_source_info_encode_ext(wbf, &reply->_ext_source_info)); } if (has_consolidation_ext) { - _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ENC_ZINT | _Z_MSG_EXT_FLAG_M | 0x02)); + uint8_t extheader = _Z_MSG_EXT_ENC_ZINT | _Z_MSG_EXT_FLAG_M | 0x02; + if (has_attachment) { + extheader |= _Z_MSG_EXT_FLAG_Z; + } + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, extheader)); _Z_RETURN_IF_ERR(_z_zint_encode(wbf, reply->_ext_consolidation)); } + if (has_attachment) { + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ENC_ZBUF | 0x04)); + _Z_RETURN_IF_ERR(_z_attachment_encode_ext(wbf, att)); + } _Z_RETURN_IF_ERR(_z_bytes_encode(wbf, &reply->_value.payload)); return ret; } @@ -524,6 +575,13 @@ int8_t _z_reply_decode_extension(_z_msg_ext_t *extension, void *ctx) { reply->_ext_consolidation = extension->_body._zint._val; break; } + case _Z_MSG_EXT_ENC_ZBUF | 0x04: { + reply->_ext_attachment.is_encoded = true; + reply->_ext_attachment.body.encoded = extension->_body._zbuf._val._is_alloc + ? _z_bytes_steal(&extension->_body._zbuf._val) + : _z_bytes_duplicate(&extension->_body._zbuf._val); + break; + } default: if (_Z_HAS_FLAG(extension->_header, _Z_MSG_EXT_FLAG_M)) { ret = _z_msg_ext_unknown_error(extension, 0x0a); diff --git a/src/protocol/core.c b/src/protocol/core.c index b5d8405f8..ac4f47f8a 100644 --- a/src/protocol/core.c +++ b/src/protocol/core.c @@ -18,7 +18,11 @@ #include #include "zenoh-pico/api/primitives.h" +#include "zenoh-pico/api/types.h" #include "zenoh-pico/collections/bytes.h" +#include "zenoh-pico/protocol/codec/core.h" +#include "zenoh-pico/protocol/iobuf.h" +#include "zenoh-pico/utils/logging.h" uint8_t _z_id_len(_z_id_t id) { uint8_t len = 16; @@ -86,3 +90,46 @@ _z_bytes_t z_attachment_get(z_attachment_t this, _z_bytes_t key) { z_attachment_iterate(this, _z_attachment_get_seeker, (void *)&seeker); return seeker.value; } +int8_t _z_attachment_estimate_length_body(_z_bytes_t key, _z_bytes_t value, void *ctx) { + size_t *len = (size_t *)ctx; + *len += _z_zint_len(key.len) + key.len + _z_zint_len(value.len) + value.len; + return 0; +} +size_t _z_attachment_estimate_length(z_attachment_t att) { + size_t len = 0; + z_attachment_iterate(att, _z_attachment_estimate_length_body, &len); + return len; +} + +int8_t _z_encoded_attachment_iteration_driver(const void *this, z_attachment_iter_body_t body, void *ctx) { + _z_zbuf_t data = _z_zbytes_as_zbuf(*(_z_bytes_t *)this); + while (_z_zbuf_can_read(&data)) { + _z_bytes_t key = _z_bytes_empty(); + _z_bytes_t value = _z_bytes_empty(); + _z_bytes_decode(&key, &data); + _z_bytes_decode(&value, &data); + int8_t ret = body(key, value, ctx); + if (ret != 0) { + return ret; + } + } + return 0; +} + +z_attachment_t _z_encoded_as_attachment(const _z_owned_encoded_attachment_t *att) { + if (att->is_encoded) { + return (z_attachment_t){.data = &att->body.encoded, .iteration_driver = _z_encoded_attachment_iteration_driver}; + } else { + return att->body.decoded; + } +} +void _z_encoded_attachment_drop(_z_owned_encoded_attachment_t *att) { + if (att->is_encoded) { + _z_bytes_clear(&att->body.encoded); + } +} +inline _Bool z_attachment_check(const z_attachment_t *attachment) { return attachment->iteration_driver != NULL; } +inline int8_t z_attachment_iterate(z_attachment_t this, z_attachment_iter_body_t body, void *ctx) { + return this.iteration_driver(this.data, body, ctx); +} +z_attachment_t z_attachment_null(void) { return (z_attachment_t){.data = NULL, .iteration_driver = NULL}; } diff --git a/src/protocol/definitions/message.c b/src/protocol/definitions/message.c index e3865635f..10bd7b3ee 100644 --- a/src/protocol/definitions/message.c +++ b/src/protocol/definitions/message.c @@ -15,6 +15,7 @@ #include "zenoh-pico/protocol/definitions/message.h" #include "zenoh-pico/collections/bytes.h" +#include "zenoh-pico/protocol/core.h" void _z_msg_reply_clear(_z_msg_reply_t *msg) { _z_value_clear(&msg->_value); } @@ -25,11 +26,13 @@ void _z_msg_put_clear(_z_msg_put_t *msg) { } _z_msg_query_reqexts_t _z_msg_query_required_extensions(const _z_msg_query_t *msg) { + z_attachment_t att = _z_encoded_as_attachment(&msg->_ext_attachment); return (_z_msg_query_reqexts_t){ .body = msg->_ext_value.payload.start != NULL || msg->_ext_value.encoding.prefix != 0 || !_z_bytes_is_empty(&msg->_ext_value.encoding.suffix), .info = _z_id_check(msg->_ext_info._id) || msg->_ext_info._entity_id != 0 || msg->_ext_info._source_sn != 0, - .consolidation = msg->_ext_consolidation != Z_CONSOLIDATION_MODE_AUTO}; + .consolidation = msg->_ext_consolidation != Z_CONSOLIDATION_MODE_AUTO, + .attachment = z_attachment_check(&att)}; } void _z_msg_query_clear(_z_msg_query_t *msg) { diff --git a/src/protocol/definitions/network.c b/src/protocol/definitions/network.c index 56855657c..f6a41e378 100644 --- a/src/protocol/definitions/network.c +++ b/src/protocol/definitions/network.c @@ -168,7 +168,8 @@ _z_network_message_t _z_msg_make_pull(_z_keyexpr_t key, _z_zint_t pull_id) { return ret; } _z_zenoh_message_t _z_msg_make_query(_Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_bytes_t) parameters, _z_zint_t qid, - z_consolidation_mode_t consolidation, _Z_MOVE(_z_value_t) value) { + z_consolidation_mode_t consolidation, _Z_MOVE(_z_value_t) value, + z_attachment_t attachment) { return (_z_zenoh_message_t){ ._tag = _Z_N_REQUEST, ._body._request = @@ -176,13 +177,11 @@ _z_zenoh_message_t _z_msg_make_query(_Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_bytes ._rid = qid, ._key = _z_keyexpr_steal(key), ._tag = _Z_REQUEST_QUERY, - ._body._query = - { - ._parameters = _z_bytes_steal(parameters), - ._ext_consolidation = consolidation, - ._ext_value = _z_value_steal(value), - ._ext_info = _z_source_info_null(), - }, + ._body._query = {._parameters = _z_bytes_steal(parameters), + ._ext_consolidation = consolidation, + ._ext_value = _z_value_steal(value), + ._ext_info = _z_source_info_null(), + ._ext_attachment = attachment}, ._ext_budget = 0, ._ext_qos = _Z_N_QOS_DEFAULT, ._ext_target = Z_QUERY_TARGET_BEST_MATCHING, diff --git a/src/session/push.c b/src/session/push.c index 0028d4a83..e32e210c3 100644 --- a/src/session/push.c +++ b/src/session/push.c @@ -18,6 +18,7 @@ #include "zenoh-pico/api/primitives.h" #include "zenoh-pico/collections/bytes.h" #include "zenoh-pico/config.h" +#include "zenoh-pico/protocol/core.h" #include "zenoh-pico/session/subscription.h" #include "zenoh-pico/utils/logging.h" @@ -29,7 +30,8 @@ int8_t _z_trigger_push(_z_session_t *zn, _z_n_msg_push_t *push) { _z_encoding_t encoding = push->_body._is_put ? push->_body._body._put._encoding : z_encoding_default(); int kind = push->_body._is_put ? Z_SAMPLE_KIND_PUT : Z_SAMPLE_KIND_DELETE; #if Z_FEATURE_SUBSCRIPTION == 1 - ret = _z_trigger_subscriptions(zn, push->_key, payload, encoding, kind, push->_timestamp); + z_attachment_t att = _z_encoded_as_attachment(&push->_body._body._put._attachment); + ret = _z_trigger_subscriptions(zn, push->_key, payload, encoding, kind, push->_timestamp, att); #endif return ret; } \ No newline at end of file diff --git a/src/session/reply.c b/src/session/reply.c index 0ca562936..ae361a358 100644 --- a/src/session/reply.c +++ b/src/session/reply.c @@ -16,6 +16,7 @@ #include "zenoh-pico/api/constants.h" #include "zenoh-pico/config.h" +#include "zenoh-pico/protocol/core.h" #include "zenoh-pico/session/query.h" #include "zenoh-pico/utils/logging.h" diff --git a/src/session/rx.c b/src/session/rx.c index 28d225827..257c75a87 100644 --- a/src/session/rx.c +++ b/src/session/rx.c @@ -100,8 +100,9 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint case _Z_REQUEST_PUT: { _z_msg_put_t put = req._body._put; #if Z_FEATURE_SUBSCRIPTION == 1 + z_attachment_t att = _z_encoded_as_attachment(&put._attachment); ret = _z_trigger_subscriptions(zn, req._key, put._payload, put._encoding, Z_SAMPLE_KIND_PUT, - put._commons._timestamp); + put._commons._timestamp, att); #endif if (ret == _Z_RES_OK) { _z_network_message_t ack = _z_n_msg_make_ack(req._rid, &req._key); @@ -114,7 +115,7 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint _z_msg_del_t del = req._body._del; #if Z_FEATURE_SUBSCRIPTION == 1 ret = _z_trigger_subscriptions(zn, req._key, _z_bytes_empty(), z_encoding_default(), - Z_SAMPLE_KIND_DELETE, del._commons._timestamp); + Z_SAMPLE_KIND_DELETE, del._commons._timestamp, z_attachment_null()); #endif if (ret == _Z_RES_OK) { _z_network_message_t ack = _z_n_msg_make_ack(req._rid, &req._key); @@ -148,16 +149,17 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint } break; case _Z_RESPONSE_BODY_PUT: { _z_msg_put_t put = response._body._put; + z_attachment_t att = _z_encoded_as_attachment(&put._attachment); #if Z_FEATURE_SUBSCRIPTION == 1 ret = _z_trigger_subscriptions(zn, response._key, put._payload, put._encoding, Z_SAMPLE_KIND_PUT, - put._commons._timestamp); + put._commons._timestamp, att); #endif } break; case _Z_RESPONSE_BODY_DEL: { _z_msg_del_t del = response._body._del; #if Z_FEATURE_SUBSCRIPTION == 1 ret = _z_trigger_subscriptions(zn, response._key, _z_bytes_empty(), z_encoding_default(), - Z_SAMPLE_KIND_DELETE, del._commons._timestamp); + Z_SAMPLE_KIND_DELETE, del._commons._timestamp, z_attachment_null()); #endif } break; } diff --git a/src/session/subscription.c b/src/session/subscription.c index fbb52e6b0..4af37a619 100644 --- a/src/session/subscription.c +++ b/src/session/subscription.c @@ -153,7 +153,8 @@ _z_subscription_sptr_t *_z_register_subscription(_z_session_t *zn, uint8_t is_lo } int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload, - const _z_encoding_t encoding, const _z_zint_t kind, const _z_timestamp_t timestamp) { + const _z_encoding_t encoding, const _z_zint_t kind, const _z_timestamp_t timestamp, + z_attachment_t att) { int8_t ret = _Z_RES_OK; #if Z_FEATURE_MULTI_THREAD == 1 @@ -177,6 +178,7 @@ int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, co s.encoding = encoding; s.kind = kind; s.timestamp = timestamp; + s.attachment = att; _z_subscription_sptr_list_t *xs = subs; _Z_DEBUG("Triggering %ju subs\n", (uintmax_t)_z_subscription_sptr_list_len(xs)); while (xs != NULL) { diff --git a/zenohpico.pc b/zenohpico.pc index ec329193d..16c7f4e7c 100644 --- a/zenohpico.pc +++ b/zenohpico.pc @@ -3,6 +3,6 @@ prefix=/usr/local Name: zenohpico Description: URL: -Version: 0.11.20231213dev +Version: 0.11.20240115dev Cflags: -I${prefix}/include Libs: -L${prefix}/lib -lzenohpico