Skip to content

Commit

Permalink
feat: use lru cache for subscription cache
Browse files Browse the repository at this point in the history
  • Loading branch information
jean-roland committed Dec 19, 2024
1 parent 4a3da58 commit 4c480aa
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 81 deletions.
2 changes: 1 addition & 1 deletion include/zenoh-pico/net/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ typedef struct _z_session_t {
_z_subscription_rc_list_t *_subscriptions;
_z_subscription_rc_list_t *_liveliness_subscriptions;
#if Z_FEATURE_RX_CACHE == 1
_z_subscription_cache_t _subscription_cache;
_z_subscription_lru_cache_t _subscription_cache;
#endif
#endif

Expand Down
7 changes: 5 additions & 2 deletions include/zenoh-pico/session/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#ifndef INCLUDE_ZENOH_PICO_SESSION_SUBSCRIPTION_H
#define INCLUDE_ZENOH_PICO_SESSION_SUBSCRIPTION_H

#include "zenoh-pico/collections/lru_cache.h"
#include "zenoh-pico/net/encoding.h"
#include "zenoh-pico/protocol/core.h"
#include "zenoh-pico/session/session.h"
Expand All @@ -40,7 +41,9 @@ typedef struct {
_z_keyexpr_t ke_out;
_z_subscription_infos_svec_t infos;
size_t sub_nb;
} _z_subscription_cache_t;
} _z_subscription_cache_data_t;

int _z_subscription_cache_data_compare(const void *first, const void *second);

/*------------------ Subscription ------------------*/
z_result_t _z_trigger_subscriptions_put(_z_session_t *zn, _z_keyexpr_t *keyexpr, _z_bytes_t *payload,
Expand All @@ -59,7 +62,7 @@ z_result_t _z_trigger_liveliness_subscriptions_undeclare(_z_session_t *zn, _z_ke
#if Z_FEATURE_SUBSCRIPTION == 1

#if Z_FEATURE_RX_CACHE == 1
void _z_subscription_cache_clear(_z_subscription_cache_t *cache);
_Z_LRU_CACHE_DEFINE(_z_subscription, _z_subscription_cache_data_t, _z_subscription_cache_data_compare)
#endif

_z_subscription_rc_t *_z_get_subscription_by_id(_z_session_t *zn, _z_subscriber_kind_t kind, const _z_zint_t id);
Expand Down
1 change: 1 addition & 0 deletions src/collections/lru_cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ void *_z_lru_cache_get(_z_lru_cache_t *cache, void *value, _z_lru_val_cmp_f comp
}

z_result_t _z_lru_cache_insert(_z_lru_cache_t *cache, void *value, size_t value_size, _z_lru_val_cmp_f compare) {
assert(cache->capacity > 0);
// Create node
_z_lru_cache_node_t *node = _z_lru_cache_node_create(value, value_size);
if (node == NULL) {
Expand Down
130 changes: 54 additions & 76 deletions src/session/subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,57 +33,19 @@

#define _Z_SUBINFOS_VEC_SIZE 4 // Arbitrary initial size

#if Z_FEATURE_RX_CACHE == 1
static inline bool _z_subscription_get_from_cache(_z_session_t *zn, const _z_keyexpr_t *ke, _z_keyexpr_t *ke_val,
_z_subscription_infos_svec_t *infos_val, size_t *sub_nb) {
if (!_z_keyexpr_equals(ke, &zn->_subscription_cache.ke_in)) {
return false;
}
*ke_val = _z_keyexpr_alias(&zn->_subscription_cache.ke_out);
*infos_val = _z_subscription_infos_svec_alias(&zn->_subscription_cache.infos);
*sub_nb = zn->_subscription_cache.sub_nb;
return true;
}

static inline void _z_subscription_update_cache(_z_session_t *zn, const _z_keyexpr_t *ke_in, const _z_keyexpr_t *ke_out,
_z_subscription_infos_svec_t *infos) {
// Clear previous data
_z_subscription_cache_clear(&zn->_subscription_cache);
// Register new info
zn->_subscription_cache.ke_in = _z_keyexpr_duplicate(ke_in);
zn->_subscription_cache.ke_out = _z_keyexpr_duplicate(ke_out);
zn->_subscription_cache.infos = _z_subscription_infos_svec_alias(infos);
zn->_subscription_cache.sub_nb = _z_subscription_infos_svec_len(infos);
}

void _z_subscription_cache_clear(_z_subscription_cache_t *cache) {
_z_subscription_infos_svec_clear(&cache->infos);
_z_keyexpr_clear(&cache->ke_in);
_z_keyexpr_clear(&cache->ke_out);
}

#else
static inline bool _z_subscription_get_from_cache(_z_session_t *zn, const _z_keyexpr_t *ke, _z_keyexpr_t *ke_val,
_z_subscription_infos_svec_t *infos_val, size_t *sub_nb) {
_ZP_UNUSED(zn);
_ZP_UNUSED(ke);
_ZP_UNUSED(ke_val);
_ZP_UNUSED(infos_val);
_ZP_UNUSED(sub_nb);
return false;
static inline _z_subscription_cache_data_t _z_subscription_cache_data_null(void) {
_z_subscription_cache_data_t ret = {0};
return ret;
}

static inline void _z_subscription_update_cache(_z_session_t *zn, const _z_keyexpr_t *ke_in, const _z_keyexpr_t *ke_out,
_z_subscription_infos_svec_t *infos) {
_ZP_UNUSED(zn);
_ZP_UNUSED(ke_in);
_ZP_UNUSED(ke_out);
_ZP_UNUSED(infos);
return;
#if Z_FEATURE_RX_CACHE == 1
int _z_subscription_cache_data_compare(const void *first, const void *second) {
_z_subscription_cache_data_t *first_data = (_z_subscription_cache_data_t *)first;
_z_subscription_cache_data_t *second_data = (_z_subscription_cache_data_t *)second;
return _z_keyexpr_compare(&first_data->ke_in, &second_data->ke_in);
}
#endif // Z_FEATURE_RX_CACHE == 1

// Subscription
bool _z_subscription_eq(const _z_subscription_t *other, const _z_subscription_t *this_) {
return this_->_id == other->_id;
}
Expand Down Expand Up @@ -219,28 +181,45 @@ z_result_t _z_trigger_liveliness_subscriptions_undeclare(_z_session_t *zn, _z_ke
Z_RELIABILITY_RELIABLE);
}

static z_result_t _z_subscription_get_infos(_z_session_t *zn, _z_subscriber_kind_t kind, const _z_keyexpr_t *keyexpr,
_z_keyexpr_t *key, _z_subscription_infos_svec_t *subs, size_t *sub_nb) {
static z_result_t _z_subscription_get_infos(_z_session_t *zn, _z_subscriber_kind_t kind,
_z_subscription_cache_data_t *infos) {
// Check cache
if (!_z_subscription_get_from_cache(zn, keyexpr, key, subs, sub_nb)) {
_Z_DEBUG("Resolving %d - %.*s on mapping 0x%x", keyexpr->_id, (int)_z_string_len(&keyexpr->_suffix),
_z_string_data(&keyexpr->_suffix), _z_keyexpr_mapping_id(keyexpr));
_z_subscription_cache_data_t *cache_entry = NULL;
#if Z_FEATURE_RX_CACHE == 1
cache_entry = _z_subscription_lru_cache_get(&zn->_subscription_cache, infos);
#endif
// Note cache entry
if (cache_entry != NULL) {
infos->ke_out = _z_keyexpr_alias(&cache_entry->ke_out);
infos->infos = _z_subscription_infos_svec_alias(&cache_entry->infos);
infos->sub_nb = cache_entry->sub_nb;
} else { // Construct data and add to cache
_Z_DEBUG("Resolving %d - %.*s on mapping 0x%x", infos->ke_in._id, (int)_z_string_len(&infos->ke_in._suffix),
_z_string_data(&infos->ke_in._suffix), _z_keyexpr_mapping_id(&infos->ke_in));
_z_session_mutex_lock(zn);
*key = __unsafe_z_get_expanded_key_from_key(zn, keyexpr, true);
infos->ke_out = __unsafe_z_get_expanded_key_from_key(zn, &infos->ke_in, true);

if (!_z_keyexpr_has_suffix(key)) {
if (!_z_keyexpr_has_suffix(&infos->ke_out)) {
_z_session_mutex_unlock(zn);
return _Z_ERR_KEYEXPR_UNKNOWN;
}
// Get subscription list
z_result_t ret = __unsafe_z_get_subscriptions_by_key(zn, kind, key, subs);
z_result_t ret = __unsafe_z_get_subscriptions_by_key(zn, kind, &infos->ke_out, &infos->infos);
_z_session_mutex_unlock(zn);
if (ret != _Z_RES_OK) {
return ret;
}
*sub_nb = _z_subscription_infos_svec_len(subs);
// Update cache
_z_subscription_update_cache(zn, keyexpr, key, subs);
infos->sub_nb = _z_subscription_infos_svec_len(&infos->infos);
#if Z_FEATURE_RX_CACHE == 1
// Update cache, takes ownership of the data
_z_subscription_cache_data_t cache_storage = {
.infos = _z_subscription_infos_svec_alias(&infos->infos),
.ke_in = _z_keyexpr_duplicate(&infos->ke_in),
.ke_out = _z_keyexpr_duplicate(&infos->ke_out),
.sub_nb = infos->sub_nb,
};
return _z_subscription_lru_cache_insert(&zn->_subscription_cache, &cache_storage);
#endif
}
return _Z_RES_OK;
}
Expand All @@ -250,35 +229,34 @@ static z_result_t _z_trigger_subscriptions_inner(_z_session_t *zn, _z_subscriber
_z_encoding_t *encoding, const _z_zint_t sample_kind,
const _z_timestamp_t *timestamp, const _z_n_qos_t qos,
_z_bytes_t *attachment, z_reliability_t reliability) {
_z_keyexpr_t key;
_z_subscription_infos_svec_t subs;
size_t sub_nb;
// Retrieve sub infos
_Z_RETURN_IF_ERR(_z_subscription_get_infos(zn, sub_kind, keyexpr, &key, &subs, &sub_nb));
_z_subscription_cache_data_t sub_infos = _z_subscription_cache_data_null();
sub_infos.ke_in = _z_keyexpr_alias(keyexpr);
_Z_RETURN_IF_ERR(_z_subscription_get_infos(zn, sub_kind, &sub_infos));
// Check if there are subs
_Z_DEBUG("Triggering %ju subs for key %d - %.*s", (uintmax_t)sub_nb, key._id, (int)_z_string_len(&key._suffix),
_z_string_data(&key._suffix));
if (sub_nb == 0) {
_z_keyexpr_clear(&key);
_z_subscription_infos_svec_release(&subs);
_Z_DEBUG("Triggering %ju subs for key %d - %.*s", (uintmax_t)sub_infos.sub_nb, sub_infos.ke_out._id,
(int)_z_string_len(&sub_infos.ke_out._suffix), _z_string_data(&sub_infos.ke_out._suffix));
if (sub_infos.sub_nb == 0) {
_z_keyexpr_clear(&sub_infos.ke_out);
#if Z_FEATURE_RX_CACHE == 0
_z_subscription_infos_svec_release(&sub_infos.infos); // Otherwise it's released with cache
#endif
return _Z_RES_OK;
}
// Create sample
_z_sample_t sample = _z_sample_alias(&key, payload, timestamp, encoding, sample_kind, qos, attachment, reliability);
_z_sample_t sample =
_z_sample_alias(&sub_infos.ke_out, payload, timestamp, encoding, sample_kind, qos, attachment, reliability);
// Parse subscription infos svec
for (size_t i = 0; i < sub_nb; i++) {
_z_subscription_infos_t *sub_info = _z_subscription_infos_svec_get(&subs, i);
for (size_t i = 0; i < sub_infos.sub_nb; i++) {
_z_subscription_infos_t *sub_info = _z_subscription_infos_svec_get(&sub_infos.infos, i);
sub_info->callback(&sample, sub_info->arg);
}
// Clean up
_z_keyexpr_clear(&key);
if (sub_kind == _Z_SUBSCRIBER_KIND_LIVELINESS_SUBSCRIBER) {
_z_subscription_infos_svec_release(&subs);
} else {
#if Z_FEATURE_RX_CACHE != 1
_z_subscription_infos_svec_release(&subs); // Otherwise it's released with cache
_z_keyexpr_clear(&sub_infos.ke_out);
#if Z_FEATURE_RX_CACHE == 0
_z_subscription_infos_svec_release(&sub_infos.infos); // Otherwise it's released with cache
#endif
}

return _Z_RES_OK;
}

Expand Down
4 changes: 2 additions & 2 deletions src/session/utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ z_result_t _z_session_init(_z_session_rc_t *zsrc, _z_id_t *zid) {
zn->_subscriptions = NULL;
zn->_liveliness_subscriptions = NULL;
#if Z_FEATURE_RX_CACHE == 1
memset(&zn->_subscription_cache, 0, sizeof(zn->_subscription_cache));
zn->_subscription_cache = _z_subscription_lru_cache_init(Z_RX_CACHE_SIZE);
#endif
#endif
#if Z_FEATURE_QUERYABLE == 1
Expand Down Expand Up @@ -126,7 +126,7 @@ void _z_session_clear(_z_session_t *zn) {
#if Z_FEATURE_SUBSCRIPTION == 1
_z_flush_subscriptions(zn);
#if Z_FEATURE_RX_CACHE == 1
_z_subscription_cache_clear(&zn->_subscription_cache);
_z_subscription_lru_cache_delete(&zn->_subscription_cache);
#endif
#endif
#if Z_FEATURE_QUERYABLE == 1
Expand Down

0 comments on commit 4c480aa

Please sign in to comment.