From 84b11ad27019e6c503c936327f70906ae37aea1e Mon Sep 17 00:00:00 2001 From: Denis Biryukov Date: Thu, 15 Feb 2024 17:52:32 +0100 Subject: [PATCH] add support for qos settings in sample --- include/zenoh-pico/api/types.h | 10 ++++++++++ include/zenoh-pico/protocol/core.h | 15 +++++++++++++++ include/zenoh-pico/protocol/definitions/network.h | 7 +++++++ include/zenoh-pico/session/subscription.h | 5 +++-- src/api/api.c | 7 ++++--- src/protocol/definitions/network.c | 7 +++++++ src/session/push.c | 2 +- src/session/rx.c | 8 ++++---- src/session/subscription.c | 13 ++++++++----- tests/fragment.py | 2 +- tests/z_test_fragment_rx.c | 3 ++- tests/z_test_fragment_tx.c | 2 ++ 12 files changed, 64 insertions(+), 17 deletions(-) diff --git a/include/zenoh-pico/api/types.h b/include/zenoh-pico/api/types.h index d7bb29451..81874c5e4 100644 --- a/include/zenoh-pico/api/types.h +++ b/include/zenoh-pico/api/types.h @@ -413,6 +413,16 @@ typedef struct { uint8_t __dummy; // Just to avoid empty structures that might cause undefined behavior } zp_send_join_options_t; +/** + * QoS settings of zenoh message. + * + * Members: + * _z_priority_t priority: Priority of the message. + * _z_congestion_control_t congestion_control: Congestion control of the message. + * bool express: If true, the message is not batched during transmission, in order to reduce latency + */ +typedef _z_qos_t z_qos_t; + /** * Represents a data sample. * diff --git a/include/zenoh-pico/protocol/core.h b/include/zenoh-pico/protocol/core.h index 2eaa3e495..8307654b6 100644 --- a/include/zenoh-pico/protocol/core.h +++ b/include/zenoh-pico/protocol/core.h @@ -211,6 +211,20 @@ _z_keyexpr_t _z_rname(const char *rname); */ _z_keyexpr_t _z_rid_with_suffix(uint16_t rid, const char *suffix); +/** + * QoS settings of zenoh message. + * + * Members: + * _z_priority_t priority: Priority of the message. + * _z_congestion_control_t congestion_control: Congestion control of the message. + * bool express: If true, the message is not batched during transmission, in order to reduce latency + */ +typedef struct { + z_priority_t priority; + z_congestion_control_t congestion_control; + bool express; +} _z_qos_t; + /** * A zenoh-net data sample. * @@ -227,6 +241,7 @@ typedef struct { _z_timestamp_t timestamp; _z_encoding_t encoding; z_sample_kind_t kind; + _z_qos_t qos; #if Z_FEATURE_ATTACHMENT == 1 z_attachment_t attachment; #endif diff --git a/include/zenoh-pico/protocol/definitions/network.h b/include/zenoh-pico/protocol/definitions/network.h index 5c09247d3..0ff6f7fb9 100644 --- a/include/zenoh-pico/protocol/definitions/network.h +++ b/include/zenoh-pico/protocol/definitions/network.h @@ -64,6 +64,13 @@ typedef struct { #define _z_n_qos_make(express, nodrop, priority) \ (_z_n_qos_t) { ._val = (((express) << 4) | ((nodrop) << 3) | (priority)) } +inline _z_qos_t _z_n_qos_unmake(_z_n_qos_t n_qos) { + return (_z_qos_t){ + .priority = n_qos._val & 0b111u, + .congestion_control = (n_qos._val & 0b1000u) ? Z_CONGESTION_CONTROL_BLOCK : Z_CONGESTION_CONTROL_DROP, + .express = n_qos._val & 0b10000u}; +} +_z_qos_t _z_n_qos_unmake_public(_z_n_qos_t n_qos); #define _Z_N_QOS_DEFAULT _z_n_qos_make(0, 0, 5) // RESPONSE FINAL message flags: diff --git a/include/zenoh-pico/session/subscription.h b/include/zenoh-pico/session/subscription.h index f13ffe05f..77fb05af7 100644 --- a/include/zenoh-pico/session/subscription.h +++ b/include/zenoh-pico/session/subscription.h @@ -24,14 +24,15 @@ _z_subscription_rc_list_t *_z_get_subscriptions_by_key(_z_session_t *zn, uint8_t _z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_t *sub); void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *payload, - _z_zint_t payload_len + _z_zint_t payload_len, _z_n_qos_t qos #if Z_FEATURE_ATTACHMENT == 1 , z_attachment_t att #endif ); int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload, - const _z_encoding_t encoding, const _z_zint_t kind, const _z_timestamp_t timestamp + const _z_encoding_t encoding, const _z_zint_t kind, const _z_timestamp_t timestamp, + const _z_n_qos_t qos #if Z_FEATURE_ATTACHMENT == 1 , z_attachment_t att diff --git a/src/api/api.c b/src/api/api.c index 4bdcc3421..73350bd23 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -648,9 +648,10 @@ int8_t z_put(z_session_t zs, z_keyexpr_t keyexpr, const uint8_t *payload, z_zint ); // Trigger local subscriptions - _z_trigger_local_subscriptions(&zs._val.in->val, keyexpr, payload, payload_len + _z_trigger_local_subscriptions(&zs._val.in->val, keyexpr, payload, payload_len, + _z_n_qos_make(0, opt.congestion_control == Z_CONGESTION_CONTROL_BLOCK, opt.priority) #if Z_FEATURE_ATTACHMENT == 1 - , + , opt.attachment #endif ); @@ -747,7 +748,7 @@ int8_t z_publisher_put(const z_publisher_t pub, const uint8_t *payload, size_t l ); // Trigger local subscriptions - _z_trigger_local_subscriptions(&pub._val->_zn.in->val, pub._val->_key, payload, len + _z_trigger_local_subscriptions(&pub._val->_zn.in->val, pub._val->_key, payload, len, _Z_N_QOS_DEFAULT #if Z_FEATURE_ATTACHMENT == 1 , opt.attachment diff --git a/src/protocol/definitions/network.c b/src/protocol/definitions/network.c index af5192b14..f80b3fb37 100644 --- a/src/protocol/definitions/network.c +++ b/src/protocol/definitions/network.c @@ -276,3 +276,10 @@ void _z_msg_fix_mapping(_z_zenoh_message_t *msg, uint16_t mapping) { break; } } +_z_qos_t _z_n_qos_unmake_public(_z_n_qos_t n_qos) { + _z_qos_t qos = _z_n_qos_unmake(n_qos); + if (qos.priority == Z_PRIORITY_REAL_TIME) { + qos.priority = Z_PRIORITY_DEFAULT; + } + return qos; +} diff --git a/src/session/push.c b/src/session/push.c index 55187bae7..0fa85cb4c 100644 --- a/src/session/push.c +++ b/src/session/push.c @@ -32,7 +32,7 @@ int8_t _z_trigger_push(_z_session_t *zn, _z_n_msg_push_t *push) { #if Z_FEATURE_ATTACHMENT == 1 z_attachment_t att = _z_encoded_as_attachment(&push->_body._body._put._attachment); #endif - ret = _z_trigger_subscriptions(zn, push->_key, payload, encoding, kind, push->_timestamp + ret = _z_trigger_subscriptions(zn, push->_key, payload, encoding, kind, push->_timestamp, push->_qos #if Z_FEATURE_ATTACHMENT == 1 , att diff --git a/src/session/rx.c b/src/session/rx.c index 9b27a9fbd..5ba2bc6ed 100644 --- a/src/session/rx.c +++ b/src/session/rx.c @@ -104,7 +104,7 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint z_attachment_t att = _z_encoded_as_attachment(&put._attachment); #endif ret = _z_trigger_subscriptions(zn, req._key, put._payload, put._encoding, Z_SAMPLE_KIND_PUT, - put._commons._timestamp + put._commons._timestamp, req._ext_qos #if Z_FEATURE_ATTACHMENT == 1 , att @@ -122,7 +122,7 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint _z_msg_del_t del = req._body._del; #if Z_FEATURE_SUBSCRIPTION == 1 ret = _z_trigger_subscriptions(zn, req._key, _z_bytes_empty(), z_encoding_default(), - Z_SAMPLE_KIND_DELETE, del._commons._timestamp + Z_SAMPLE_KIND_DELETE, del._commons._timestamp, req._ext_qos #if Z_FEATURE_ATTACHMENT == 1 , z_attachment_null() @@ -166,7 +166,7 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint z_attachment_t att = _z_encoded_as_attachment(&put._attachment); #endif ret = _z_trigger_subscriptions(zn, response._key, put._payload, put._encoding, Z_SAMPLE_KIND_PUT, - put._commons._timestamp + put._commons._timestamp, response._ext_qos #if Z_FEATURE_ATTACHMENT == 1 , att @@ -178,7 +178,7 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint _z_msg_del_t del = response._body._del; #if Z_FEATURE_SUBSCRIPTION == 1 ret = _z_trigger_subscriptions(zn, response._key, _z_bytes_empty(), z_encoding_default(), - Z_SAMPLE_KIND_DELETE, del._commons._timestamp + Z_SAMPLE_KIND_DELETE, del._commons._timestamp, response._ext_qos #if Z_FEATURE_ATTACHMENT == 1 , z_attachment_null() diff --git a/src/session/subscription.c b/src/session/subscription.c index 0cd1050d9..103296562 100644 --- a/src/session/subscription.c +++ b/src/session/subscription.c @@ -153,7 +153,7 @@ _z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, uint8_t is_loca } void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *payload, - _z_zint_t payload_len + _z_zint_t payload_len, _z_n_qos_t qos #if Z_FEATURE_ATTACHMENT == 1 , z_attachment_t att @@ -161,9 +161,9 @@ void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr ) { _z_encoding_t encoding = {.prefix = Z_ENCODING_PREFIX_DEFAULT, .suffix = _z_bytes_wrap(NULL, 0)}; int8_t ret = _z_trigger_subscriptions(zn, keyexpr, _z_bytes_wrap(payload, payload_len), encoding, Z_SAMPLE_KIND_PUT, - _z_timestamp_null() + _z_timestamp_null(), qos #if Z_FEATURE_ATTACHMENT == 1 - , + , att #endif ); @@ -171,7 +171,8 @@ void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr } int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload, - const _z_encoding_t encoding, const _z_zint_t kind, const _z_timestamp_t timestamp + const _z_encoding_t encoding, const _z_zint_t kind, const _z_timestamp_t timestamp, + const _z_n_qos_t qos #if Z_FEATURE_ATTACHMENT == 1 , z_attachment_t att @@ -200,6 +201,7 @@ int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, co s.encoding = encoding; s.kind = kind; s.timestamp = timestamp; + s.qos = _z_n_qos_unmake(qos); #if Z_FEATURE_ATTACHMENT == 1 s.attachment = att; #endif @@ -256,11 +258,12 @@ void _z_flush_subscriptions(_z_session_t *zn) { #else // Z_FEATURE_SUBSCRIPTION == 0 void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *payload, - _z_zint_t payload_len) { + _z_zint_t payload_len, _z_n_qos_t qos) { _ZP_UNUSED(zn); _ZP_UNUSED(keyexpr); _ZP_UNUSED(payload); _ZP_UNUSED(payload_len); + _ZP_UNUSED(qos); } #endif // Z_FEATURE_SUBSCRIPTION == 1 diff --git a/tests/fragment.py b/tests/fragment.py index d29295f0f..7b2a6f8d5 100644 --- a/tests/fragment.py +++ b/tests/fragment.py @@ -14,7 +14,7 @@ def check_output(tx_status, tx_output, rx_status, rx_output): # Expected rx output & status z_rx_expected_status = 0 z_rx_expected_output = ( - "[rx]: Received packet on test/zenoh-pico-fragment, len: 10000, validity: 1") + "[rx]: Received packet on test/zenoh-pico-fragment, len: 10000, validity: 1, qos {priority: 4, cong_ctrl: 0}") # Check the exit status of tx if tx_status == z_tx_expected_status: diff --git a/tests/z_test_fragment_rx.c b/tests/z_test_fragment_rx.c index 133497694..4b884edc7 100644 --- a/tests/z_test_fragment_rx.c +++ b/tests/z_test_fragment_rx.c @@ -29,7 +29,8 @@ void data_handler(const z_sample_t *sample, void *ctx) { break; } } - printf("[rx]: Received packet on %s, len: %d, validity: %d\n", z_loan(keystr), (int)sample->payload.len, is_valid); + printf("[rx]: Received packet on %s, len: %d, validity: %d, qos {priority: %d, cong_ctrl: %d}\n", z_loan(keystr), + (int)sample->payload.len, is_valid, sample->qos.priority, sample->qos.congestion_control); z_drop(z_move(keystr)); } diff --git a/tests/z_test_fragment_tx.c b/tests/z_test_fragment_tx.c index b002a34d8..70b24968e 100644 --- a/tests/z_test_fragment_tx.c +++ b/tests/z_test_fragment_tx.c @@ -60,6 +60,8 @@ int main(int argc, char **argv) { // Put data z_put_options_t options = z_put_options_default(); options.encoding = z_encoding(Z_ENCODING_PREFIX_TEXT_PLAIN, NULL); + options.priority = Z_PRIORITY_DATA_HIGH; + options.congestion_control = Z_CONGESTION_CONTROL_BLOCK; for (int i = 0; i < 5; i++) { printf("[tx]: Sending packet on %s, len: %d\n", keyexpr, (int)size); if (z_put(z_loan(s), z_keyexpr(keyexpr), (const uint8_t *)value, size, &options) < 0) {