Skip to content

Commit

Permalink
feat: set reliability as unstable
Browse files Browse the repository at this point in the history
  • Loading branch information
jean-roland committed Sep 9, 2024
1 parent 7fe43d9 commit d3e5135
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 26 deletions.
4 changes: 3 additions & 1 deletion include/zenoh-pico/api/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -1610,15 +1610,17 @@ const z_loaned_encoding_t *z_sample_encoding(const z_loaned_sample_t *sample);
z_sample_kind_t z_sample_kind(const z_loaned_sample_t *sample);

/**
* Gets the reliability a sample was received with.
* (unstable) Gets the reliability a sample was received with.
*
* Parameters:
* sample: Pointer to a :c:type:`z_loaned_sample_t` to get the reliability from.
*
* Return:
* The reliability wrapped as a :c:type:`z_reliability_t`.
*/
#if Z_FEATURE_UNSTABLE_API == 1
z_reliability_t z_sample_reliability(const z_loaned_sample_t *sample);
#endif

/**
* Got sample qos congestion control value.
Expand Down
9 changes: 9 additions & 0 deletions include/zenoh-pico/api/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,13 +206,16 @@ typedef struct {
* publisher.
* z_priority_t priority: The priority of messages issued by this publisher.
* _Bool is_express: If true, Zenoh will not wait to batch this operation with others to reduce the bandwidth.
* (unstable) z_reliability_t reliability: The reliability that should be used to transmit the data.
*/
typedef struct {
z_moved_encoding_t *encoding;
z_congestion_control_t congestion_control;
z_priority_t priority;
_Bool is_express;
#if Z_FEATURE_UNSTABLE_API == 1
z_reliability_t reliability;
#endif
} z_publisher_options_t;

/**
Expand Down Expand Up @@ -283,6 +286,7 @@ typedef struct {
* z_timestamp_t *timestamp: The API level timestamp (e.g. of the data when it was created).
* _Bool is_express: If true, Zenoh will not wait to batch this operation with others to reduce the bandwidth.
* z_moved_bytes_t* attachment: An optional attachment to the publication.
* (unstable) z_reliability_t reliability: The reliability that should be used to transmit the data.
*/
typedef struct {
z_moved_encoding_t *encoding;
Expand All @@ -291,7 +295,9 @@ typedef struct {
z_timestamp_t *timestamp;
_Bool is_express;
z_moved_bytes_t *attachment;
#if Z_FEATURE_UNSTABLE_API == 1
z_reliability_t reliability;
#endif
} z_put_options_t;

/**
Expand All @@ -302,13 +308,16 @@ typedef struct {
* z_priority_t priority: The priority of this message when router.
* _Bool is_express: If true, Zenoh will not wait to batch this operation with others to reduce the bandwidth.
* z_timestamp_t *timestamp: The API level timestamp (e.g. of the data when it was created).
* (unstable) z_reliability_t reliability: The reliability that should be used to transmit the data.
*/
typedef struct {
z_congestion_control_t congestion_control;
z_priority_t priority;
_Bool is_express;
z_timestamp_t *timestamp;
#if Z_FEATURE_UNSTABLE_API == 1
z_reliability_t reliability;
#endif
} z_delete_options_t;

/**
Expand Down
68 changes: 43 additions & 25 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -981,7 +981,9 @@ z_result_t z_id_to_string(z_id_t *id, z_owned_string_t *str) {

const z_loaned_keyexpr_t *z_sample_keyexpr(const z_loaned_sample_t *sample) { return &sample->keyexpr; }
z_sample_kind_t z_sample_kind(const z_loaned_sample_t *sample) { return sample->kind; }
#if Z_FEATURE_UNSTABLE_API == 1
z_reliability_t z_sample_reliability(const z_loaned_sample_t *sample) { return sample->reliability; }
#endif
const z_loaned_bytes_t *z_sample_payload(const z_loaned_sample_t *sample) { return &sample->payload; }
const z_timestamp_t *z_sample_timestamp(const z_loaned_sample_t *sample) {
if (_z_timestamp_check(&sample->timestamp)) {
Expand Down Expand Up @@ -1047,15 +1049,19 @@ void z_put_options_default(z_put_options_t *options) {
options->is_express = false;
options->timestamp = NULL;
options->attachment = NULL;
#if Z_FEATURE_UNSTABLE_API == 1
options->reliability = Z_RELIABILITY_DEFAULT;
#endif
}

void z_delete_options_default(z_delete_options_t *options) {
options->congestion_control = Z_CONGESTION_CONTROL_DEFAULT;
options->is_express = false;
options->timestamp = NULL;
options->priority = Z_PRIORITY_DEFAULT;
#if Z_FEATURE_UNSTABLE_API == 1
options->reliability = Z_RELIABILITY_DEFAULT;
#endif
}

int8_t z_put(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr, z_moved_bytes_t *payload,
Expand All @@ -1065,27 +1071,25 @@ int8_t z_put(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr, z_
z_put_options_t opt;
z_put_options_default(&opt);
if (options != NULL) {
opt.congestion_control = options->congestion_control;
opt.encoding = options->encoding;
opt.priority = options->priority;
opt.is_express = options->is_express;
opt.timestamp = options->timestamp;
opt.attachment = options->attachment;
opt.reliability = options->reliability;
opt = *options;
}
z_reliability_t reliability = Z_RELIABILITY_DEFAULT;
#if Z_FEATURE_UNSTABLE_API == 1
reliability = opt.reliability;
#endif

_z_keyexpr_t keyexpr_aliased = _z_keyexpr_alias_from_user_defined(*keyexpr, true);
ret = _z_write(_Z_RC_IN_VAL(zs), keyexpr_aliased, _z_bytes_from_owned_bytes(&payload->_this),
opt.encoding == NULL ? NULL : &opt.encoding->_this._val, Z_SAMPLE_KIND_PUT, opt.congestion_control,
opt.priority, opt.is_express, opt.timestamp, _z_bytes_from_owned_bytes(&opt.attachment->_this),
opt.reliability);
reliability);

// Trigger local subscriptions
_z_trigger_local_subscriptions(
_Z_RC_IN_VAL(zs), keyexpr_aliased, _z_bytes_from_owned_bytes(&payload->_this),
opt.encoding == NULL ? NULL : &opt.encoding->_this._val,
_z_n_qos_make(opt.is_express, opt.congestion_control == Z_CONGESTION_CONTROL_BLOCK, opt.priority),
opt.timestamp, _z_bytes_from_owned_bytes(&opt.attachment->_this), opt.reliability);
opt.timestamp, _z_bytes_from_owned_bytes(&opt.attachment->_this), reliability);
// Clean-up
z_encoding_drop(opt.encoding);
z_bytes_drop(opt.attachment);
Expand All @@ -1099,14 +1103,15 @@ int8_t z_delete(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr,
z_delete_options_t opt;
z_delete_options_default(&opt);
if (options != NULL) {
opt.congestion_control = options->congestion_control;
opt.priority = options->priority;
opt.is_express = options->is_express;
opt.timestamp = options->timestamp;
opt.reliability = options->reliability;
opt = *options;
}
z_reliability_t reliability = Z_RELIABILITY_DEFAULT;
#if Z_FEATURE_UNSTABLE_API == 1
reliability = opt.reliability;
#endif

ret = _z_write(_Z_RC_IN_VAL(zs), *keyexpr, _z_bytes_null(), NULL, Z_SAMPLE_KIND_DELETE, opt.congestion_control,
opt.priority, opt.is_express, opt.timestamp, _z_bytes_null(), opt.reliability);
opt.priority, opt.is_express, opt.timestamp, _z_bytes_null(), reliability);

return ret;
}
Expand All @@ -1116,7 +1121,9 @@ void z_publisher_options_default(z_publisher_options_t *options) {
options->congestion_control = Z_CONGESTION_CONTROL_DEFAULT;
options->priority = Z_PRIORITY_DEFAULT;
options->is_express = false;
#if Z_FEATURE_UNSTABLE_API == 1
options->reliability = Z_RELIABILITY_DEFAULT;
#endif
}

int8_t z_declare_publisher(z_owned_publisher_t *pub, const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr,
Expand All @@ -1141,10 +1148,14 @@ int8_t z_declare_publisher(z_owned_publisher_t *pub, const z_loaned_session_t *z
if (options != NULL) {
opt = *options;
}
z_reliability_t reliability = Z_RELIABILITY_DEFAULT;
#if Z_FEATURE_UNSTABLE_API == 1
reliability = opt.reliability;
#endif

// Set publisher
_z_publisher_t int_pub =
_z_declare_publisher(zs, key, opt.encoding == NULL ? NULL : &opt.encoding->_this._val, opt.congestion_control,
opt.priority, opt.is_express, opt.reliability);
_z_publisher_t int_pub = _z_declare_publisher(zs, key, opt.encoding == NULL ? NULL : &opt.encoding->_this._val,
opt.congestion_control, opt.priority, opt.is_express, reliability);
// Create write filter
int8_t res = _z_write_filter_create(&int_pub);
if (res != _Z_RES_OK) {
Expand Down Expand Up @@ -1174,10 +1185,13 @@ int8_t z_publisher_put(const z_loaned_publisher_t *pub, z_moved_bytes_t *payload
z_publisher_put_options_t opt;
z_publisher_put_options_default(&opt);
if (options != NULL) {
opt.encoding = options->encoding;
opt.timestamp = options->timestamp;
opt.attachment = options->attachment;
opt = *options;
}
z_reliability_t reliability = Z_RELIABILITY_DEFAULT;
#if Z_FEATURE_UNSTABLE_API == 1
reliability = pub->reliability;
#endif

_z_encoding_t encoding;
if (opt.encoding == NULL) {
_Z_RETURN_IF_ERR(_z_encoding_copy(&encoding, &pub->_encoding));
Expand All @@ -1192,13 +1206,13 @@ int8_t z_publisher_put(const z_loaned_publisher_t *pub, z_moved_bytes_t *payload
// Write value
ret = _z_write(_Z_RC_IN_VAL(&pub->_zn), pub_keyexpr, _z_bytes_from_owned_bytes(&payload->_this), &encoding,
Z_SAMPLE_KIND_PUT, pub->_congestion_control, pub->_priority, pub->_is_express, opt.timestamp,
_z_bytes_from_owned_bytes(&opt.attachment->_this), pub->reliability);
_z_bytes_from_owned_bytes(&opt.attachment->_this), reliability);
}
// Trigger local subscriptions
_z_trigger_local_subscriptions(
_Z_RC_IN_VAL(&pub->_zn), pub_keyexpr, _z_bytes_from_owned_bytes(&payload->_this), &encoding,
_z_n_qos_make(pub->_is_express, pub->_congestion_control == Z_CONGESTION_CONTROL_BLOCK, pub->_priority),
opt.timestamp, _z_bytes_from_owned_bytes(&opt.attachment->_this), pub->reliability);
opt.timestamp, _z_bytes_from_owned_bytes(&opt.attachment->_this), reliability);
// Clean-up
_z_encoding_clear(&encoding);
z_bytes_drop(opt.attachment);
Expand All @@ -1211,14 +1225,18 @@ int8_t z_publisher_delete(const z_loaned_publisher_t *pub, const z_publisher_del
z_publisher_delete_options_t opt;
z_publisher_delete_options_default(&opt);
if (options != NULL) {
opt.timestamp = options->timestamp;
opt = *options;
}
z_reliability_t reliability = Z_RELIABILITY_DEFAULT;
#if Z_FEATURE_UNSTABLE_API == 1
reliability = pub->reliability;
#endif
// Remove potentially redundant ke suffix
_z_keyexpr_t pub_keyexpr = _z_keyexpr_alias_from_user_defined(pub->_key, true);

return _z_write(_Z_RC_IN_VAL(&pub->_zn), pub_keyexpr, _z_bytes_null(), NULL, Z_SAMPLE_KIND_DELETE,
pub->_congestion_control, pub->_priority, pub->_is_express, opt.timestamp, _z_bytes_null(),
pub->reliability);
reliability);
}

const z_loaned_keyexpr_t *z_publisher_keyexpr(const z_loaned_publisher_t *publisher) {
Expand Down

0 comments on commit d3e5135

Please sign in to comment.