From 896b7dd8b6e2a12e0c71b70046f128139ba0ca4c Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Wed, 10 Jan 2024 14:32:34 +0100 Subject: [PATCH 1/3] feat: trigger local subscriptions when performing a pub/put --- include/zenoh-pico/session/subscription.h | 2 ++ src/api/api.c | 7 +++++++ src/net/primitives.c | 1 - src/session/subscription.c | 8 ++++++++ 4 files changed, 17 insertions(+), 1 deletion(-) diff --git a/include/zenoh-pico/session/subscription.h b/include/zenoh-pico/session/subscription.h index a05de96d9..ac28f4c41 100644 --- a/include/zenoh-pico/session/subscription.h +++ b/include/zenoh-pico/session/subscription.h @@ -24,6 +24,8 @@ _z_subscription_sptr_list_t *_z_get_subscriptions_by_key(_z_session_t *zn, uint8 const _z_keyexpr_t *keyexpr); _z_subscription_sptr_t *_z_register_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_t *sub); +void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *payload, + _z_zint_t payload_len); int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload, const _z_encoding_t encoding, const _z_zint_t kind, const _z_timestamp_t timestamp); void _z_unregister_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_sptr_t *sub); diff --git a/src/api/api.c b/src/api/api.c index 9455a6a2e..27f6498a2 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -31,6 +31,7 @@ #include "zenoh-pico/protocol/keyexpr.h" #include "zenoh-pico/session/queryable.h" #include "zenoh-pico/session/resource.h" +#include "zenoh-pico/session/subscription.h" #include "zenoh-pico/session/utils.h" #include "zenoh-pico/system/platform.h" #include "zenoh-pico/transport/multicast.h" @@ -612,6 +613,9 @@ int8_t z_put(z_session_t zs, z_keyexpr_t keyexpr, const uint8_t *payload, z_zint ret = _z_write(zs._val, keyexpr, (const uint8_t *)payload, payload_len, opt.encoding, Z_SAMPLE_KIND_PUT, opt.congestion_control, opt.priority); + // Trigger local subscriptions + _z_trigger_local_subscriptions(zs._val, keyexpr, payload, payload_len); + return ret; } @@ -685,6 +689,9 @@ int8_t z_publisher_put(const z_publisher_t pub, const uint8_t *payload, size_t l ret = _z_write(pub._val->_zn, pub._val->_key, payload, len, opt.encoding, Z_SAMPLE_KIND_PUT, pub._val->_congestion_control, pub._val->_priority); + // Trigger local subscriptions + _z_trigger_local_subscriptions(pub._val->_zn, pub._val->_key, payload, len); + return ret; } diff --git a/src/net/primitives.c b/src/net/primitives.c index 1ad25d86a..a964bcf12 100644 --- a/src/net/primitives.c +++ b/src/net/primitives.c @@ -203,7 +203,6 @@ _z_subscriber_t *_z_declare_subscriber(_z_session_t *zn, _z_keyexpr_t keyexpr, _ &keyexpr, s._id, sub_info.reliability == Z_RELIABILITY_RELIABLE, sub_info.mode == Z_SUBMODE_PULL); _z_network_message_t n_msg = _z_n_msg_make_declare(declaration); if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { - // ret = _Z_ERR_TRANSPORT_TX_FAILED; _z_unregister_subscription(zn, _Z_RESOURCE_IS_LOCAL, sp_s); _z_subscriber_free(&ret); } diff --git a/src/session/subscription.c b/src/session/subscription.c index 487d927aa..cc9c19a10 100644 --- a/src/session/subscription.c +++ b/src/session/subscription.c @@ -152,6 +152,14 @@ _z_subscription_sptr_t *_z_register_subscription(_z_session_t *zn, uint8_t is_lo return ret; } +void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *payload, + _z_zint_t payload_len) { + _z_encoding_t encoding = {.prefix = Z_ENCODING_PREFIX_DEFAULT, .suffix = _z_bytes_wrap(NULL, 0)}; + int8_t ret = _z_trigger_subscriptions(zn, keyexpr, _z_bytes_wrap(payload, payload_len), encoding, Z_SAMPLE_KIND_PUT, + _z_timestamp_null()); + (void)ret; +} + int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload, const _z_encoding_t encoding, const _z_zint_t kind, const _z_timestamp_t timestamp) { int8_t ret = _Z_RES_OK; From f365b8c05289c7f2e5b0fdca12a39c533d2864aa Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Wed, 10 Jan 2024 14:44:34 +0100 Subject: [PATCH 2/3] feat: add dummy function when feature not present --- src/session/subscription.c | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/session/subscription.c b/src/session/subscription.c index cc9c19a10..40c07d42b 100644 --- a/src/session/subscription.c +++ b/src/session/subscription.c @@ -235,4 +235,14 @@ void _z_flush_subscriptions(_z_session_t *zn) { _z_mutex_unlock(&zn->_mutex_inner); #endif // Z_FEATURE_MULTI_THREAD == 1 } -#endif \ No newline at end of file +#else // Z_FEATURE_SUBSCRIPTION == 0 + +void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *payload, + _z_zint_t payload_len) { + _ZP_UNUSED(zn); + _ZP_UNUSED(keyexpr); + _ZP_UNUSED(payload); + _ZP_UNUSED(payload_len); +} + +#endif // Z_FEATURE_SUBSCRIPTION == 1 \ No newline at end of file From b4e2a2ea91242d7d74cac5dc1ec52e16eae58e65 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Wed, 10 Jan 2024 15:01:43 +0100 Subject: [PATCH 3/3] fix: missing end of file new line --- src/session/subscription.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/session/subscription.c b/src/session/subscription.c index 40c07d42b..ceaefd7c6 100644 --- a/src/session/subscription.c +++ b/src/session/subscription.c @@ -245,4 +245,4 @@ void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr _ZP_UNUSED(payload_len); } -#endif // Z_FEATURE_SUBSCRIPTION == 1 \ No newline at end of file +#endif // Z_FEATURE_SUBSCRIPTION == 1