diff --git a/include/zenoh-pico/api/types.h b/include/zenoh-pico/api/types.h index e30f37767..eb0ac4fc9 100644 --- a/include/zenoh-pico/api/types.h +++ b/include/zenoh-pico/api/types.h @@ -220,10 +220,12 @@ typedef struct { * z_congestion_control_t congestion_control: The congestion control to apply when routing messages from this * publisher. * z_priority_t priority: The priority of messages issued by this publisher. + * _Bool is_express: If true, Zenoh will not wait to batch this operation with others to reduce the bandwith. */ typedef struct { z_congestion_control_t congestion_control; z_priority_t priority; + _Bool is_express; } z_publisher_options_t; /** diff --git a/include/zenoh-pico/net/primitives.h b/include/zenoh-pico/net/primitives.h index 38da173f5..138725f6a 100644 --- a/include/zenoh-pico/net/primitives.h +++ b/include/zenoh-pico/net/primitives.h @@ -88,7 +88,7 @@ int8_t _z_undeclare_resource(_z_session_t *zn, uint16_t rid); * The created :c:type:`_z_publisher_t` (in null state if the declaration failed).. */ _z_publisher_t _z_declare_publisher(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr, - z_congestion_control_t congestion_control, z_priority_t priority); + z_congestion_control_t congestion_control, z_priority_t priority, _Bool is_express); /** * Undeclare a :c:type:`_z_publisher_t`. diff --git a/include/zenoh-pico/net/publish.h b/include/zenoh-pico/net/publish.h index bf97f41a1..e2ac097b5 100644 --- a/include/zenoh-pico/net/publish.h +++ b/include/zenoh-pico/net/publish.h @@ -28,6 +28,7 @@ typedef struct _z_publisher_t { _z_session_rc_t _zn; z_congestion_control_t _congestion_control; z_priority_t _priority; + _Bool _is_express; #if Z_FEATURE_INTEREST == 1 _z_write_filter_t _filter; #endif diff --git a/src/api/api.c b/src/api/api.c index 7f8ca30a9..c8f027079 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -971,6 +971,7 @@ int8_t z_delete(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr, void z_publisher_options_default(z_publisher_options_t *options) { options->congestion_control = Z_CONGESTION_CONTROL_DEFAULT; options->priority = Z_PRIORITY_DEFAULT; + options->is_express = false; } int8_t z_declare_publisher(z_owned_publisher_t *pub, const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr, @@ -995,9 +996,10 @@ int8_t z_declare_publisher(z_owned_publisher_t *pub, const z_loaned_session_t *z if (options != NULL) { opt.congestion_control = options->congestion_control; opt.priority = options->priority; + opt.is_express = options->is_express; } // Set publisher - _z_publisher_t int_pub = _z_declare_publisher(zs, key, opt.congestion_control, opt.priority); + _z_publisher_t int_pub = _z_declare_publisher(zs, key, opt.congestion_control, opt.priority, opt.is_express); // Create write filter int8_t res = _z_write_filter_create(&int_pub); if (res != _Z_RES_OK) { @@ -1035,6 +1037,8 @@ int8_t z_publisher_put(const z_loaned_publisher_t *pub, z_owned_bytes_t *payload opt.is_express = options->is_express; opt.timestamp = options->timestamp; opt.attachment = options->attachment; + } else { + opt.is_express = pub->_is_express; } // Check if write filter is active before writing if (!_z_write_filter_active(pub)) { @@ -1060,6 +1064,8 @@ int8_t z_publisher_delete(const z_loaned_publisher_t *pub, const z_publisher_del if (options != NULL) { opt.is_express = options->is_express; opt.timestamp = options->timestamp; + } else { + opt.is_express = pub->_is_express; } return _z_write(&pub->_zn.in->val, pub->_key, _z_bytes_null(), _z_encoding_null(), Z_SAMPLE_KIND_DELETE, pub->_congestion_control, pub->_priority, opt.is_express, opt.timestamp, _z_bytes_null()); diff --git a/src/net/primitives.c b/src/net/primitives.c index 570866e75..a6b41c676 100644 --- a/src/net/primitives.c +++ b/src/net/primitives.c @@ -103,7 +103,8 @@ int8_t _z_undeclare_resource(_z_session_t *zn, uint16_t rid) { #if Z_FEATURE_PUBLICATION == 1 /*------------------ Publisher Declaration ------------------*/ _z_publisher_t _z_declare_publisher(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr, - z_congestion_control_t congestion_control, z_priority_t priority) { + z_congestion_control_t congestion_control, z_priority_t priority, + _Bool is_express) { // Allocate publisher _z_publisher_t ret; // Fill publisher @@ -111,6 +112,7 @@ _z_publisher_t _z_declare_publisher(const _z_session_rc_t *zn, _z_keyexpr_t keye ret._id = _z_get_entity_id(&zn->in->val); ret._congestion_control = congestion_control; ret._priority = priority; + ret._is_express = is_express; ret._zn = _z_session_rc_clone(zn); return ret; }