Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timestamp to the put/delete options #507

Merged
merged 1 commit into from
Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion include/zenoh-pico/api/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -256,12 +256,14 @@ typedef struct {
* z_owned_encoding_t *encoding: The encoding of the payload.
* z_congestion_control_t congestion_control: The congestion control to apply when routing this message.
* z_priority_t priority: The priority of this message when routed.
* z_timestamp_t *timestamp: The API level timestamp (e.g. of the data when it was created).
* z_owned_bytes_t *attachment: An optional attachment to the publication.
*/
typedef struct {
z_owned_encoding_t *encoding;
z_congestion_control_t congestion_control;
z_priority_t priority;
z_timestamp_t *timestamp;
z_owned_bytes_t *attachment;
} z_put_options_t;

Expand All @@ -271,10 +273,12 @@ typedef struct {
* Members:
* z_congestion_control_t congestion_control: The congestion control to apply when routing this message.
* z_priority_t priority: The priority of this message when router.
* z_timestamp_t *timestamp: The API level timestamp (e.g. of the data when it was created).
*/
typedef struct {
z_congestion_control_t congestion_control;
z_priority_t priority;
z_timestamp_t *timestamp;
} z_delete_options_t;

/**
Expand All @@ -283,19 +287,24 @@ typedef struct {
*
* Members:
* z_owned_encoding_t *encoding: The encoding of the payload.
* z_timestamp_t *timestamp: The API level timestamp (e.g. of the data when it was created).
* z_owned_bytes_t *attachment: An optional attachment to the publication.
*/
typedef struct {
z_owned_encoding_t *encoding;
z_timestamp_t *timestamp;
z_owned_bytes_t *attachment;
} z_publisher_put_options_t;

/**
* Represents the configuration used to configure a delete operation by a previously declared publisher,
* sent via :c:func:`z_publisher_delete`.
*
* Members:
* z_timestamp_t *timestamp: The API level timestamp (e.g. of the data when it was created).
*/
typedef struct {
uint8_t __dummy; // Just to avoid empty structures that might cause undefined behavior
z_timestamp_t *timestamp;
} z_publisher_delete_options_t;

/**
Expand Down
3 changes: 2 additions & 1 deletion include/zenoh-pico/net/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,14 @@ int8_t _z_undeclare_publisher(_z_publisher_t *pub);
* kind: The kind of the value.
* cong_ctrl: The congestion control of this write. Possible values defined
* in :c:type:`_z_congestion_control_t`.
* timestamp: The timestamp of this write. The API level timestamp (e.g. of the data when it was created).
* attachment: An optional attachment to this write.
* Returns:
* ``0`` in case of success, ``-1`` in case of failure.
*/
int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, _z_bytes_t payload, const _z_encoding_t encoding,
const z_sample_kind_t kind, const z_congestion_control_t cong_ctrl, z_priority_t priority,
const _z_bytes_t attachment);
const _z_timestamp_t *timestamp, const _z_bytes_t attachment);
#endif

#if Z_FEATURE_SUBSCRIPTION == 1
Expand Down
27 changes: 19 additions & 8 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -891,11 +891,13 @@ void z_put_options_default(z_put_options_t *options) {
options->congestion_control = Z_CONGESTION_CONTROL_DEFAULT;
options->priority = Z_PRIORITY_DEFAULT;
options->encoding = NULL;
options->timestamp = NULL;
options->attachment = NULL;
}

void z_delete_options_default(z_delete_options_t *options) {
options->congestion_control = Z_CONGESTION_CONTROL_DEFAULT;
options->timestamp = NULL;
options->priority = Z_PRIORITY_DEFAULT;
}

Expand All @@ -909,12 +911,13 @@ int8_t z_put(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr, z_
opt.congestion_control = options->congestion_control;
opt.encoding = options->encoding;
opt.priority = options->priority;
opt.timestamp = options->timestamp;
opt.attachment = options->attachment;
}

ret =
_z_write(&_Z_RC_IN_VAL(zs), *keyexpr, _z_bytes_from_owned_bytes(payload), _z_encoding_from_owned(opt.encoding),
Z_SAMPLE_KIND_PUT, opt.congestion_control, opt.priority, _z_bytes_from_owned_bytes(opt.attachment));
ret = _z_write(&_Z_RC_IN_VAL(zs), *keyexpr, _z_bytes_from_owned_bytes(payload),
_z_encoding_from_owned(opt.encoding), Z_SAMPLE_KIND_PUT, opt.congestion_control, opt.priority,
opt.timestamp, _z_bytes_from_owned_bytes(opt.attachment));

// Trigger local subscriptions
_z_trigger_local_subscriptions(&_Z_RC_IN_VAL(zs), *keyexpr, _z_bytes_from_owned_bytes(payload),
Expand All @@ -935,9 +938,10 @@ int8_t z_delete(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr,
if (options != NULL) {
opt.congestion_control = options->congestion_control;
opt.priority = options->priority;
opt.timestamp = options->timestamp;
}
ret = _z_write(&_Z_RC_IN_VAL(zs), *keyexpr, _z_bytes_null(), _z_encoding_null(), Z_SAMPLE_KIND_DELETE,
opt.congestion_control, opt.priority, _z_bytes_null());
opt.congestion_control, opt.priority, opt.timestamp, _z_bytes_null());

return ret;
}
Expand Down Expand Up @@ -988,9 +992,10 @@ int8_t z_undeclare_publisher(z_owned_publisher_t *pub) { return _z_undeclare_and
void z_publisher_put_options_default(z_publisher_put_options_t *options) {
options->encoding = NULL;
options->attachment = NULL;
options->timestamp = NULL;
}

void z_publisher_delete_options_default(z_publisher_delete_options_t *options) { options->__dummy = 0; }
void z_publisher_delete_options_default(z_publisher_delete_options_t *options) { options->timestamp = NULL; }

int8_t z_publisher_put(const z_loaned_publisher_t *pub, z_owned_bytes_t *payload,
const z_publisher_put_options_t *options) {
Expand All @@ -1000,14 +1005,15 @@ int8_t z_publisher_put(const z_loaned_publisher_t *pub, z_owned_bytes_t *payload
z_publisher_put_options_default(&opt);
if (options != NULL) {
opt.encoding = options->encoding;
opt.timestamp = options->timestamp;
opt.attachment = options->attachment;
}
// Check if write filter is active before writing
if (!_z_write_filter_active(pub)) {
// Write value
ret = _z_write(&pub->_zn.in->val, pub->_key, _z_bytes_from_owned_bytes(payload),
_z_encoding_from_owned(opt.encoding), Z_SAMPLE_KIND_PUT, pub->_congestion_control,
pub->_priority, _z_bytes_from_owned_bytes(opt.attachment));
pub->_priority, opt.timestamp, _z_bytes_from_owned_bytes(opt.attachment));
}
// Trigger local subscriptions
_z_trigger_local_subscriptions(&pub->_zn.in->val, pub->_key, _z_bytes_from_owned_bytes(payload), _Z_N_QOS_DEFAULT,
Expand All @@ -1020,9 +1026,14 @@ int8_t z_publisher_put(const z_loaned_publisher_t *pub, z_owned_bytes_t *payload
}

int8_t z_publisher_delete(const z_loaned_publisher_t *pub, const z_publisher_delete_options_t *options) {
(void)(options);
// Build options
z_publisher_delete_options_t opt;
z_publisher_delete_options_default(&opt);
if (options != NULL) {
opt.timestamp = options->timestamp;
}
return _z_write(&pub->_zn.in->val, pub->_key, _z_bytes_null(), _z_encoding_null(), Z_SAMPLE_KIND_DELETE,
pub->_congestion_control, pub->_priority, _z_bytes_null());
pub->_congestion_control, pub->_priority, opt.timestamp, _z_bytes_null());
}

z_owned_keyexpr_t z_publisher_keyexpr(z_loaned_publisher_t *publisher) {
Expand Down
8 changes: 5 additions & 3 deletions src/net/primitives.c
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ int8_t _z_undeclare_publisher(_z_publisher_t *pub) {
/*------------------ Write ------------------*/
int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload, const _z_encoding_t encoding,
const z_sample_kind_t kind, const z_congestion_control_t cong_ctrl, z_priority_t priority,
const _z_bytes_t attachment) {
const _z_timestamp_t *timestamp, const _z_bytes_t attachment) {
int8_t ret = _Z_RES_OK;
_z_network_message_t msg;
switch (kind) {
Expand All @@ -144,7 +144,8 @@ int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t p
._body._is_put = true,
._body._body._put =
{
._commons = {._timestamp = _z_timestamp_null(), ._source_info = _z_source_info_null()},
._commons = {._timestamp = ((timestamp != NULL) ? *timestamp : _z_timestamp_null()),
._source_info = _z_source_info_null()},
._payload = payload,
._encoding = encoding,
._attachment = attachment,
Expand All @@ -161,7 +162,8 @@ int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t p
._qos = _z_n_qos_make(0, cong_ctrl == Z_CONGESTION_CONTROL_BLOCK, priority),
._timestamp = _z_timestamp_null(),
._body._is_put = false,
._body._body._del = {._commons = {._timestamp = _z_timestamp_null(),
._body._body._del = {._commons = {._timestamp =
((timestamp != NULL) ? *timestamp : _z_timestamp_null()),
._source_info = _z_source_info_null()}},
},
};
Expand Down
Loading