From 4c480aa33ec85fc9f784128216b07acbaf904ef9 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Thu, 19 Dec 2024 11:57:06 +0100 Subject: [PATCH] feat: use lru cache for subscription cache --- include/zenoh-pico/net/session.h | 2 +- include/zenoh-pico/session/subscription.h | 7 +- src/collections/lru_cache.c | 1 + src/session/subscription.c | 130 +++++++++------------- src/session/utils.c | 4 +- 5 files changed, 63 insertions(+), 81 deletions(-) diff --git a/include/zenoh-pico/net/session.h b/include/zenoh-pico/net/session.h index 5ee1556dc..5b0edb48e 100644 --- a/include/zenoh-pico/net/session.h +++ b/include/zenoh-pico/net/session.h @@ -60,7 +60,7 @@ typedef struct _z_session_t { _z_subscription_rc_list_t *_subscriptions; _z_subscription_rc_list_t *_liveliness_subscriptions; #if Z_FEATURE_RX_CACHE == 1 - _z_subscription_cache_t _subscription_cache; + _z_subscription_lru_cache_t _subscription_cache; #endif #endif diff --git a/include/zenoh-pico/session/subscription.h b/include/zenoh-pico/session/subscription.h index b70871b30..a78d23fd8 100644 --- a/include/zenoh-pico/session/subscription.h +++ b/include/zenoh-pico/session/subscription.h @@ -15,6 +15,7 @@ #ifndef INCLUDE_ZENOH_PICO_SESSION_SUBSCRIPTION_H #define INCLUDE_ZENOH_PICO_SESSION_SUBSCRIPTION_H +#include "zenoh-pico/collections/lru_cache.h" #include "zenoh-pico/net/encoding.h" #include "zenoh-pico/protocol/core.h" #include "zenoh-pico/session/session.h" @@ -40,7 +41,9 @@ typedef struct { _z_keyexpr_t ke_out; _z_subscription_infos_svec_t infos; size_t sub_nb; -} _z_subscription_cache_t; +} _z_subscription_cache_data_t; + +int _z_subscription_cache_data_compare(const void *first, const void *second); /*------------------ Subscription ------------------*/ z_result_t _z_trigger_subscriptions_put(_z_session_t *zn, _z_keyexpr_t *keyexpr, _z_bytes_t *payload, @@ -59,7 +62,7 @@ z_result_t _z_trigger_liveliness_subscriptions_undeclare(_z_session_t *zn, _z_ke #if Z_FEATURE_SUBSCRIPTION == 1 #if Z_FEATURE_RX_CACHE == 1 -void _z_subscription_cache_clear(_z_subscription_cache_t *cache); +_Z_LRU_CACHE_DEFINE(_z_subscription, _z_subscription_cache_data_t, _z_subscription_cache_data_compare) #endif _z_subscription_rc_t *_z_get_subscription_by_id(_z_session_t *zn, _z_subscriber_kind_t kind, const _z_zint_t id); diff --git a/src/collections/lru_cache.c b/src/collections/lru_cache.c index 0bbcbf381..81505755e 100644 --- a/src/collections/lru_cache.c +++ b/src/collections/lru_cache.c @@ -297,6 +297,7 @@ void *_z_lru_cache_get(_z_lru_cache_t *cache, void *value, _z_lru_val_cmp_f comp } z_result_t _z_lru_cache_insert(_z_lru_cache_t *cache, void *value, size_t value_size, _z_lru_val_cmp_f compare) { + assert(cache->capacity > 0); // Create node _z_lru_cache_node_t *node = _z_lru_cache_node_create(value, value_size); if (node == NULL) { diff --git a/src/session/subscription.c b/src/session/subscription.c index 8ec1027aa..d4c7ffe40 100644 --- a/src/session/subscription.c +++ b/src/session/subscription.c @@ -33,57 +33,19 @@ #define _Z_SUBINFOS_VEC_SIZE 4 // Arbitrary initial size -#if Z_FEATURE_RX_CACHE == 1 -static inline bool _z_subscription_get_from_cache(_z_session_t *zn, const _z_keyexpr_t *ke, _z_keyexpr_t *ke_val, - _z_subscription_infos_svec_t *infos_val, size_t *sub_nb) { - if (!_z_keyexpr_equals(ke, &zn->_subscription_cache.ke_in)) { - return false; - } - *ke_val = _z_keyexpr_alias(&zn->_subscription_cache.ke_out); - *infos_val = _z_subscription_infos_svec_alias(&zn->_subscription_cache.infos); - *sub_nb = zn->_subscription_cache.sub_nb; - return true; -} - -static inline void _z_subscription_update_cache(_z_session_t *zn, const _z_keyexpr_t *ke_in, const _z_keyexpr_t *ke_out, - _z_subscription_infos_svec_t *infos) { - // Clear previous data - _z_subscription_cache_clear(&zn->_subscription_cache); - // Register new info - zn->_subscription_cache.ke_in = _z_keyexpr_duplicate(ke_in); - zn->_subscription_cache.ke_out = _z_keyexpr_duplicate(ke_out); - zn->_subscription_cache.infos = _z_subscription_infos_svec_alias(infos); - zn->_subscription_cache.sub_nb = _z_subscription_infos_svec_len(infos); -} - -void _z_subscription_cache_clear(_z_subscription_cache_t *cache) { - _z_subscription_infos_svec_clear(&cache->infos); - _z_keyexpr_clear(&cache->ke_in); - _z_keyexpr_clear(&cache->ke_out); -} - -#else -static inline bool _z_subscription_get_from_cache(_z_session_t *zn, const _z_keyexpr_t *ke, _z_keyexpr_t *ke_val, - _z_subscription_infos_svec_t *infos_val, size_t *sub_nb) { - _ZP_UNUSED(zn); - _ZP_UNUSED(ke); - _ZP_UNUSED(ke_val); - _ZP_UNUSED(infos_val); - _ZP_UNUSED(sub_nb); - return false; +static inline _z_subscription_cache_data_t _z_subscription_cache_data_null(void) { + _z_subscription_cache_data_t ret = {0}; + return ret; } -static inline void _z_subscription_update_cache(_z_session_t *zn, const _z_keyexpr_t *ke_in, const _z_keyexpr_t *ke_out, - _z_subscription_infos_svec_t *infos) { - _ZP_UNUSED(zn); - _ZP_UNUSED(ke_in); - _ZP_UNUSED(ke_out); - _ZP_UNUSED(infos); - return; +#if Z_FEATURE_RX_CACHE == 1 +int _z_subscription_cache_data_compare(const void *first, const void *second) { + _z_subscription_cache_data_t *first_data = (_z_subscription_cache_data_t *)first; + _z_subscription_cache_data_t *second_data = (_z_subscription_cache_data_t *)second; + return _z_keyexpr_compare(&first_data->ke_in, &second_data->ke_in); } #endif // Z_FEATURE_RX_CACHE == 1 -// Subscription bool _z_subscription_eq(const _z_subscription_t *other, const _z_subscription_t *this_) { return this_->_id == other->_id; } @@ -219,28 +181,45 @@ z_result_t _z_trigger_liveliness_subscriptions_undeclare(_z_session_t *zn, _z_ke Z_RELIABILITY_RELIABLE); } -static z_result_t _z_subscription_get_infos(_z_session_t *zn, _z_subscriber_kind_t kind, const _z_keyexpr_t *keyexpr, - _z_keyexpr_t *key, _z_subscription_infos_svec_t *subs, size_t *sub_nb) { +static z_result_t _z_subscription_get_infos(_z_session_t *zn, _z_subscriber_kind_t kind, + _z_subscription_cache_data_t *infos) { // Check cache - if (!_z_subscription_get_from_cache(zn, keyexpr, key, subs, sub_nb)) { - _Z_DEBUG("Resolving %d - %.*s on mapping 0x%x", keyexpr->_id, (int)_z_string_len(&keyexpr->_suffix), - _z_string_data(&keyexpr->_suffix), _z_keyexpr_mapping_id(keyexpr)); + _z_subscription_cache_data_t *cache_entry = NULL; +#if Z_FEATURE_RX_CACHE == 1 + cache_entry = _z_subscription_lru_cache_get(&zn->_subscription_cache, infos); +#endif + // Note cache entry + if (cache_entry != NULL) { + infos->ke_out = _z_keyexpr_alias(&cache_entry->ke_out); + infos->infos = _z_subscription_infos_svec_alias(&cache_entry->infos); + infos->sub_nb = cache_entry->sub_nb; + } else { // Construct data and add to cache + _Z_DEBUG("Resolving %d - %.*s on mapping 0x%x", infos->ke_in._id, (int)_z_string_len(&infos->ke_in._suffix), + _z_string_data(&infos->ke_in._suffix), _z_keyexpr_mapping_id(&infos->ke_in)); _z_session_mutex_lock(zn); - *key = __unsafe_z_get_expanded_key_from_key(zn, keyexpr, true); + infos->ke_out = __unsafe_z_get_expanded_key_from_key(zn, &infos->ke_in, true); - if (!_z_keyexpr_has_suffix(key)) { + if (!_z_keyexpr_has_suffix(&infos->ke_out)) { _z_session_mutex_unlock(zn); return _Z_ERR_KEYEXPR_UNKNOWN; } // Get subscription list - z_result_t ret = __unsafe_z_get_subscriptions_by_key(zn, kind, key, subs); + z_result_t ret = __unsafe_z_get_subscriptions_by_key(zn, kind, &infos->ke_out, &infos->infos); _z_session_mutex_unlock(zn); if (ret != _Z_RES_OK) { return ret; } - *sub_nb = _z_subscription_infos_svec_len(subs); - // Update cache - _z_subscription_update_cache(zn, keyexpr, key, subs); + infos->sub_nb = _z_subscription_infos_svec_len(&infos->infos); +#if Z_FEATURE_RX_CACHE == 1 + // Update cache, takes ownership of the data + _z_subscription_cache_data_t cache_storage = { + .infos = _z_subscription_infos_svec_alias(&infos->infos), + .ke_in = _z_keyexpr_duplicate(&infos->ke_in), + .ke_out = _z_keyexpr_duplicate(&infos->ke_out), + .sub_nb = infos->sub_nb, + }; + return _z_subscription_lru_cache_insert(&zn->_subscription_cache, &cache_storage); +#endif } return _Z_RES_OK; } @@ -250,35 +229,34 @@ static z_result_t _z_trigger_subscriptions_inner(_z_session_t *zn, _z_subscriber _z_encoding_t *encoding, const _z_zint_t sample_kind, const _z_timestamp_t *timestamp, const _z_n_qos_t qos, _z_bytes_t *attachment, z_reliability_t reliability) { - _z_keyexpr_t key; - _z_subscription_infos_svec_t subs; - size_t sub_nb; // Retrieve sub infos - _Z_RETURN_IF_ERR(_z_subscription_get_infos(zn, sub_kind, keyexpr, &key, &subs, &sub_nb)); + _z_subscription_cache_data_t sub_infos = _z_subscription_cache_data_null(); + sub_infos.ke_in = _z_keyexpr_alias(keyexpr); + _Z_RETURN_IF_ERR(_z_subscription_get_infos(zn, sub_kind, &sub_infos)); // Check if there are subs - _Z_DEBUG("Triggering %ju subs for key %d - %.*s", (uintmax_t)sub_nb, key._id, (int)_z_string_len(&key._suffix), - _z_string_data(&key._suffix)); - if (sub_nb == 0) { - _z_keyexpr_clear(&key); - _z_subscription_infos_svec_release(&subs); + _Z_DEBUG("Triggering %ju subs for key %d - %.*s", (uintmax_t)sub_infos.sub_nb, sub_infos.ke_out._id, + (int)_z_string_len(&sub_infos.ke_out._suffix), _z_string_data(&sub_infos.ke_out._suffix)); + if (sub_infos.sub_nb == 0) { + _z_keyexpr_clear(&sub_infos.ke_out); +#if Z_FEATURE_RX_CACHE == 0 + _z_subscription_infos_svec_release(&sub_infos.infos); // Otherwise it's released with cache +#endif return _Z_RES_OK; } // Create sample - _z_sample_t sample = _z_sample_alias(&key, payload, timestamp, encoding, sample_kind, qos, attachment, reliability); + _z_sample_t sample = + _z_sample_alias(&sub_infos.ke_out, payload, timestamp, encoding, sample_kind, qos, attachment, reliability); // Parse subscription infos svec - for (size_t i = 0; i < sub_nb; i++) { - _z_subscription_infos_t *sub_info = _z_subscription_infos_svec_get(&subs, i); + for (size_t i = 0; i < sub_infos.sub_nb; i++) { + _z_subscription_infos_t *sub_info = _z_subscription_infos_svec_get(&sub_infos.infos, i); sub_info->callback(&sample, sub_info->arg); } // Clean up - _z_keyexpr_clear(&key); - if (sub_kind == _Z_SUBSCRIBER_KIND_LIVELINESS_SUBSCRIBER) { - _z_subscription_infos_svec_release(&subs); - } else { -#if Z_FEATURE_RX_CACHE != 1 - _z_subscription_infos_svec_release(&subs); // Otherwise it's released with cache + _z_keyexpr_clear(&sub_infos.ke_out); +#if Z_FEATURE_RX_CACHE == 0 + _z_subscription_infos_svec_release(&sub_infos.infos); // Otherwise it's released with cache #endif - } + return _Z_RES_OK; } diff --git a/src/session/utils.c b/src/session/utils.c index 89d92cdd4..c5d1df408 100644 --- a/src/session/utils.c +++ b/src/session/utils.c @@ -63,7 +63,7 @@ z_result_t _z_session_init(_z_session_rc_t *zsrc, _z_id_t *zid) { zn->_subscriptions = NULL; zn->_liveliness_subscriptions = NULL; #if Z_FEATURE_RX_CACHE == 1 - memset(&zn->_subscription_cache, 0, sizeof(zn->_subscription_cache)); + zn->_subscription_cache = _z_subscription_lru_cache_init(Z_RX_CACHE_SIZE); #endif #endif #if Z_FEATURE_QUERYABLE == 1 @@ -126,7 +126,7 @@ void _z_session_clear(_z_session_t *zn) { #if Z_FEATURE_SUBSCRIPTION == 1 _z_flush_subscriptions(zn); #if Z_FEATURE_RX_CACHE == 1 - _z_subscription_cache_clear(&zn->_subscription_cache); + _z_subscription_lru_cache_delete(&zn->_subscription_cache); #endif #endif #if Z_FEATURE_QUERYABLE == 1