Skip to content

Commit

Permalink
add support for qos settings in sample
Browse files Browse the repository at this point in the history
  • Loading branch information
DenisBiryukov91 committed Feb 15, 2024
1 parent 9bb6f8c commit 84b11ad
Show file tree
Hide file tree
Showing 12 changed files with 64 additions and 17 deletions.
10 changes: 10 additions & 0 deletions include/zenoh-pico/api/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,16 @@ typedef struct {
uint8_t __dummy; // Just to avoid empty structures that might cause undefined behavior
} zp_send_join_options_t;

/**
* QoS settings of zenoh message.
*
* Members:
* _z_priority_t priority: Priority of the message.
* _z_congestion_control_t congestion_control: Congestion control of the message.
* bool express: If true, the message is not batched during transmission, in order to reduce latency
*/
typedef _z_qos_t z_qos_t;

/**
* Represents a data sample.
*
Expand Down
15 changes: 15 additions & 0 deletions include/zenoh-pico/protocol/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,20 @@ _z_keyexpr_t _z_rname(const char *rname);
*/
_z_keyexpr_t _z_rid_with_suffix(uint16_t rid, const char *suffix);

/**
* QoS settings of zenoh message.
*
* Members:
* _z_priority_t priority: Priority of the message.
* _z_congestion_control_t congestion_control: Congestion control of the message.
* bool express: If true, the message is not batched during transmission, in order to reduce latency
*/
typedef struct {
z_priority_t priority;
z_congestion_control_t congestion_control;
bool express;
} _z_qos_t;

/**
* A zenoh-net data sample.
*
Expand All @@ -227,6 +241,7 @@ typedef struct {
_z_timestamp_t timestamp;
_z_encoding_t encoding;
z_sample_kind_t kind;
_z_qos_t qos;
#if Z_FEATURE_ATTACHMENT == 1
z_attachment_t attachment;
#endif
Expand Down
7 changes: 7 additions & 0 deletions include/zenoh-pico/protocol/definitions/network.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ typedef struct {

#define _z_n_qos_make(express, nodrop, priority) \
(_z_n_qos_t) { ._val = (((express) << 4) | ((nodrop) << 3) | (priority)) }
inline _z_qos_t _z_n_qos_unmake(_z_n_qos_t n_qos) {
return (_z_qos_t){
.priority = n_qos._val & 0b111u,
.congestion_control = (n_qos._val & 0b1000u) ? Z_CONGESTION_CONTROL_BLOCK : Z_CONGESTION_CONTROL_DROP,
.express = n_qos._val & 0b10000u};
}
_z_qos_t _z_n_qos_unmake_public(_z_n_qos_t n_qos);
#define _Z_N_QOS_DEFAULT _z_n_qos_make(0, 0, 5)

// RESPONSE FINAL message flags:
Expand Down
5 changes: 3 additions & 2 deletions include/zenoh-pico/session/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@ _z_subscription_rc_list_t *_z_get_subscriptions_by_key(_z_session_t *zn, uint8_t

_z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_t *sub);
void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *payload,
_z_zint_t payload_len
_z_zint_t payload_len, _z_n_qos_t qos
#if Z_FEATURE_ATTACHMENT == 1
,
z_attachment_t att
#endif
);
int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload,
const _z_encoding_t encoding, const _z_zint_t kind, const _z_timestamp_t timestamp
const _z_encoding_t encoding, const _z_zint_t kind, const _z_timestamp_t timestamp,
const _z_n_qos_t qos
#if Z_FEATURE_ATTACHMENT == 1
,
z_attachment_t att
Expand Down
7 changes: 4 additions & 3 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -648,9 +648,10 @@ int8_t z_put(z_session_t zs, z_keyexpr_t keyexpr, const uint8_t *payload, z_zint
);

// Trigger local subscriptions
_z_trigger_local_subscriptions(&zs._val.in->val, keyexpr, payload, payload_len
_z_trigger_local_subscriptions(&zs._val.in->val, keyexpr, payload, payload_len,
_z_n_qos_make(0, opt.congestion_control == Z_CONGESTION_CONTROL_BLOCK, opt.priority)
#if Z_FEATURE_ATTACHMENT == 1
,
,
opt.attachment
#endif
);
Expand Down Expand Up @@ -747,7 +748,7 @@ int8_t z_publisher_put(const z_publisher_t pub, const uint8_t *payload, size_t l
);

// Trigger local subscriptions
_z_trigger_local_subscriptions(&pub._val->_zn.in->val, pub._val->_key, payload, len
_z_trigger_local_subscriptions(&pub._val->_zn.in->val, pub._val->_key, payload, len, _Z_N_QOS_DEFAULT
#if Z_FEATURE_ATTACHMENT == 1
,
opt.attachment
Expand Down
7 changes: 7 additions & 0 deletions src/protocol/definitions/network.c
Original file line number Diff line number Diff line change
Expand Up @@ -276,3 +276,10 @@ void _z_msg_fix_mapping(_z_zenoh_message_t *msg, uint16_t mapping) {
break;
}
}
_z_qos_t _z_n_qos_unmake_public(_z_n_qos_t n_qos) {
_z_qos_t qos = _z_n_qos_unmake(n_qos);
if (qos.priority == Z_PRIORITY_REAL_TIME) {
qos.priority = Z_PRIORITY_DEFAULT;
}
return qos;
}
2 changes: 1 addition & 1 deletion src/session/push.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ int8_t _z_trigger_push(_z_session_t *zn, _z_n_msg_push_t *push) {
#if Z_FEATURE_ATTACHMENT == 1
z_attachment_t att = _z_encoded_as_attachment(&push->_body._body._put._attachment);
#endif
ret = _z_trigger_subscriptions(zn, push->_key, payload, encoding, kind, push->_timestamp
ret = _z_trigger_subscriptions(zn, push->_key, payload, encoding, kind, push->_timestamp, push->_qos
#if Z_FEATURE_ATTACHMENT == 1
,
att
Expand Down
8 changes: 4 additions & 4 deletions src/session/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint
z_attachment_t att = _z_encoded_as_attachment(&put._attachment);
#endif
ret = _z_trigger_subscriptions(zn, req._key, put._payload, put._encoding, Z_SAMPLE_KIND_PUT,
put._commons._timestamp
put._commons._timestamp, req._ext_qos
#if Z_FEATURE_ATTACHMENT == 1
,
att
Expand All @@ -122,7 +122,7 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint
_z_msg_del_t del = req._body._del;
#if Z_FEATURE_SUBSCRIPTION == 1
ret = _z_trigger_subscriptions(zn, req._key, _z_bytes_empty(), z_encoding_default(),
Z_SAMPLE_KIND_DELETE, del._commons._timestamp
Z_SAMPLE_KIND_DELETE, del._commons._timestamp, req._ext_qos
#if Z_FEATURE_ATTACHMENT == 1
,
z_attachment_null()
Expand Down Expand Up @@ -166,7 +166,7 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint
z_attachment_t att = _z_encoded_as_attachment(&put._attachment);
#endif
ret = _z_trigger_subscriptions(zn, response._key, put._payload, put._encoding, Z_SAMPLE_KIND_PUT,
put._commons._timestamp
put._commons._timestamp, response._ext_qos
#if Z_FEATURE_ATTACHMENT == 1
,
att
Expand All @@ -178,7 +178,7 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint
_z_msg_del_t del = response._body._del;
#if Z_FEATURE_SUBSCRIPTION == 1
ret = _z_trigger_subscriptions(zn, response._key, _z_bytes_empty(), z_encoding_default(),
Z_SAMPLE_KIND_DELETE, del._commons._timestamp
Z_SAMPLE_KIND_DELETE, del._commons._timestamp, response._ext_qos
#if Z_FEATURE_ATTACHMENT == 1
,
z_attachment_null()
Expand Down
13 changes: 8 additions & 5 deletions src/session/subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -153,25 +153,26 @@ _z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, uint8_t is_loca
}

void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *payload,
_z_zint_t payload_len
_z_zint_t payload_len, _z_n_qos_t qos
#if Z_FEATURE_ATTACHMENT == 1
,
z_attachment_t att
#endif
) {
_z_encoding_t encoding = {.prefix = Z_ENCODING_PREFIX_DEFAULT, .suffix = _z_bytes_wrap(NULL, 0)};
int8_t ret = _z_trigger_subscriptions(zn, keyexpr, _z_bytes_wrap(payload, payload_len), encoding, Z_SAMPLE_KIND_PUT,
_z_timestamp_null()
_z_timestamp_null(), qos
#if Z_FEATURE_ATTACHMENT == 1
,
,
att
#endif
);
(void)ret;
}

int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload,
const _z_encoding_t encoding, const _z_zint_t kind, const _z_timestamp_t timestamp
const _z_encoding_t encoding, const _z_zint_t kind, const _z_timestamp_t timestamp,
const _z_n_qos_t qos
#if Z_FEATURE_ATTACHMENT == 1
,
z_attachment_t att
Expand Down Expand Up @@ -200,6 +201,7 @@ int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, co
s.encoding = encoding;
s.kind = kind;
s.timestamp = timestamp;
s.qos = _z_n_qos_unmake(qos);
#if Z_FEATURE_ATTACHMENT == 1
s.attachment = att;
#endif
Expand Down Expand Up @@ -256,11 +258,12 @@ void _z_flush_subscriptions(_z_session_t *zn) {
#else // Z_FEATURE_SUBSCRIPTION == 0

void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *payload,
_z_zint_t payload_len) {
_z_zint_t payload_len, _z_n_qos_t qos) {
_ZP_UNUSED(zn);
_ZP_UNUSED(keyexpr);
_ZP_UNUSED(payload);
_ZP_UNUSED(payload_len);
_ZP_UNUSED(qos);
}

#endif // Z_FEATURE_SUBSCRIPTION == 1
2 changes: 1 addition & 1 deletion tests/fragment.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def check_output(tx_status, tx_output, rx_status, rx_output):
# Expected rx output & status
z_rx_expected_status = 0
z_rx_expected_output = (
"[rx]: Received packet on test/zenoh-pico-fragment, len: 10000, validity: 1")
"[rx]: Received packet on test/zenoh-pico-fragment, len: 10000, validity: 1, qos {priority: 4, cong_ctrl: 0}")

# Check the exit status of tx
if tx_status == z_tx_expected_status:
Expand Down
3 changes: 2 additions & 1 deletion tests/z_test_fragment_rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ void data_handler(const z_sample_t *sample, void *ctx) {
break;
}
}
printf("[rx]: Received packet on %s, len: %d, validity: %d\n", z_loan(keystr), (int)sample->payload.len, is_valid);
printf("[rx]: Received packet on %s, len: %d, validity: %d, qos {priority: %d, cong_ctrl: %d}\n", z_loan(keystr),
(int)sample->payload.len, is_valid, sample->qos.priority, sample->qos.congestion_control);
z_drop(z_move(keystr));
}

Expand Down
2 changes: 2 additions & 0 deletions tests/z_test_fragment_tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ int main(int argc, char **argv) {
// Put data
z_put_options_t options = z_put_options_default();
options.encoding = z_encoding(Z_ENCODING_PREFIX_TEXT_PLAIN, NULL);
options.priority = Z_PRIORITY_DATA_HIGH;
options.congestion_control = Z_CONGESTION_CONTROL_BLOCK;
for (int i = 0; i < 5; i++) {
printf("[tx]: Sending packet on %s, len: %d\n", keyexpr, (int)size);
if (z_put(z_loan(s), z_keyexpr(keyexpr), (const uint8_t *)value, size, &options) < 0) {
Expand Down

0 comments on commit 84b11ad

Please sign in to comment.