Skip to content

Commit

Permalink
feat: use lru cache for queryable cache
Browse files Browse the repository at this point in the history
  • Loading branch information
jean-roland committed Dec 19, 2024
1 parent 4c480aa commit 97777a5
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 78 deletions.
2 changes: 1 addition & 1 deletion include/zenoh-pico/net/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ typedef struct _z_session_t {
#if Z_FEATURE_QUERYABLE == 1
_z_session_queryable_rc_list_t *_local_queryable;
#if Z_FEATURE_RX_CACHE == 1
_z_queryable_cache_t _queryable_cache;
_z_queryable_lru_cache_t _queryable_cache;
#endif
#endif
#if Z_FEATURE_QUERY == 1
Expand Down
11 changes: 9 additions & 2 deletions include/zenoh-pico/session/queryable.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include <stdbool.h>
#include <zenoh-pico/session/session.h>

#include "zenoh-pico/collections/lru_cache.h"

// Forward declaration to avoid cyclical include
typedef struct _z_session_t _z_session_t;
typedef struct _z_session_rc_t _z_session_rc_t;
Expand All @@ -36,14 +38,19 @@ typedef struct {
_z_keyexpr_t ke_out;
_z_queryable_infos_svec_t infos;
size_t qle_nb;
} _z_queryable_cache_t;
} _z_queryable_cache_data_t;

int _z_queryable_cache_data_compare(const void *first, const void *second);

#if Z_FEATURE_QUERYABLE == 1
#define _Z_QUERYABLE_COMPLETE_DEFAULT false
#define _Z_QUERYABLE_DISTANCE_DEFAULT 0

#if Z_FEATURE_RX_CACHE == 1
_Z_LRU_CACHE_DEFINE(_z_queryable, _z_queryable_cache_data_t, _z_queryable_cache_data_compare)
#endif

/*------------------ Queryable ------------------*/
void _z_queryable_cache_clear(_z_queryable_cache_t *cache);
_z_session_queryable_rc_t *_z_get_session_queryable_by_id(_z_session_t *zn, const _z_zint_t id);
_z_session_queryable_rc_t *_z_register_session_queryable(_z_session_t *zn, _z_session_queryable_t *q);
z_result_t _z_trigger_queryables(_z_session_rc_t *zn, _z_msg_query_t *query, _z_keyexpr_t *q_key, uint32_t qid);
Expand Down
17 changes: 12 additions & 5 deletions src/protocol/keyexpr.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,19 @@ _z_keyexpr_t _z_rid_with_suffix(uint16_t rid, const char *suffix) {

int _z_keyexpr_compare(_z_keyexpr_t *first, _z_keyexpr_t *second) {
if ((first->_id != 0) && (second->_id != 0)) {
if (first->_id == second->_id) {
return 0;
} else if (first->_id > second->_id) {
return 1;
if (_z_keyexpr_mapping_id(first) == _z_keyexpr_mapping_id(second)) {
if (first->_id == second->_id) {
return 0;
} else if (first->_id > second->_id) {
return 1;
}
return -1;
} else {
if (_z_keyexpr_mapping_id(first) > _z_keyexpr_mapping_id(second)) {
return 1;
}
return -1;
}
return -1;
}
if (_z_keyexpr_has_suffix(first) && _z_keyexpr_has_suffix(second)) {
return _z_string_compare(&first->_suffix, &second->_suffix);
Expand Down
158 changes: 90 additions & 68 deletions src/session/queryable.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,56 +32,62 @@

#define _Z_QLEINFOS_VEC_SIZE 4 // Arbitrary initial size

#if Z_FEATURE_RX_CACHE == 1
static inline bool _z_queryable_get_from_cache(_z_session_t *zn, const _z_keyexpr_t *ke, _z_keyexpr_t *ke_val,
_z_queryable_infos_svec_t *infos_val, size_t *qle_nb) {
if (!_z_keyexpr_equals(ke, &zn->_queryable_cache.ke_in)) {
return false;
}
*ke_val = _z_keyexpr_alias(&zn->_queryable_cache.ke_out);
*infos_val = _z_queryable_infos_svec_alias(&zn->_queryable_cache.infos);
*qle_nb = zn->_queryable_cache.qle_nb;
return true;
}

static inline void _z_queryable_update_cache(_z_session_t *zn, const _z_keyexpr_t *ke_in, const _z_keyexpr_t *ke_out,
_z_queryable_infos_svec_t *infos) {
// Clear previous data
_z_queryable_cache_clear(&zn->_queryable_cache);
// Register new info
zn->_queryable_cache.ke_in = _z_keyexpr_duplicate(ke_in);
zn->_queryable_cache.ke_out = _z_keyexpr_duplicate(ke_out);
zn->_queryable_cache.infos = _z_queryable_infos_svec_alias(infos);
zn->_queryable_cache.qle_nb = _z_queryable_infos_svec_len(infos);
}

void _z_queryable_cache_clear(_z_queryable_cache_t *cache) {
_z_queryable_infos_svec_clear(&cache->infos);
_z_keyexpr_clear(&cache->ke_in);
_z_keyexpr_clear(&cache->ke_out);
}

#else
static inline bool _z_queryable_get_from_cache(_z_session_t *zn, const _z_keyexpr_t *ke, _z_keyexpr_t *ke_val,
_z_queryable_infos_svec_t *infos_val, size_t *sub_nb) {
_ZP_UNUSED(zn);
_ZP_UNUSED(ke);
_ZP_UNUSED(ke_val);
_ZP_UNUSED(infos_val);
_ZP_UNUSED(sub_nb);
return false;
static inline _z_queryable_cache_data_t _z_queryable_cache_data_null(void) {
_z_queryable_cache_data_t ret = {0};
return ret;
}

static inline void _z_queryable_update_cache(_z_session_t *zn, const _z_keyexpr_t *ke_in, const _z_keyexpr_t *ke_out,
_z_queryable_infos_svec_t *infos) {
_ZP_UNUSED(zn);
_ZP_UNUSED(ke_in);
_ZP_UNUSED(ke_out);
_ZP_UNUSED(infos);
return;
#if Z_FEATURE_RX_CACHE == 1
int _z_queryable_cache_data_compare(const void *first, const void *second) {
_z_queryable_cache_data_t *first_data = (_z_queryable_cache_data_t *)first;
_z_queryable_cache_data_t *second_data = (_z_queryable_cache_data_t *)second;
return _z_keyexpr_compare(&first_data->ke_in, &second_data->ke_in);
}
#endif // Z_FEATURE_RX_CACHE == 1

// #if Z_FEATURE_RX_CACHE == 1
// static inline bool _z_queryable_get_from_cache(_z_session_t *zn, const _z_keyexpr_t *ke, _z_keyexpr_t *ke_val,
// _z_queryable_infos_svec_t *infos_val, size_t *qle_nb) {
// if (!_z_keyexpr_equals(ke, &zn->_queryable_cache.ke_in)) {
// return false;
// }
// *ke_val = _z_keyexpr_alias(&zn->_queryable_cache.ke_out);
// *infos_val = _z_queryable_infos_svec_alias(&zn->_queryable_cache.infos);
// *qle_nb = zn->_queryable_cache.qle_nb;
// return true;
// }

// static inline void _z_queryable_update_cache(_z_session_t *zn, const _z_keyexpr_t *ke_in, const _z_keyexpr_t *ke_out,
// _z_queryable_infos_svec_t *infos) {
// // Clear previous data
// _z_queryable_cache_clear(&zn->_queryable_cache);
// // Register new info
// zn->_queryable_cache.ke_in = _z_keyexpr_duplicate(ke_in);
// zn->_queryable_cache.ke_out = _z_keyexpr_duplicate(ke_out);
// zn->_queryable_cache.infos = _z_queryable_infos_svec_alias(infos);
// zn->_queryable_cache.qle_nb = _z_queryable_infos_svec_len(infos);
// }
// #else
// static inline bool _z_queryable_get_from_cache(_z_session_t *zn, const _z_keyexpr_t *ke, _z_keyexpr_t *ke_val,
// _z_queryable_infos_svec_t *infos_val, size_t *sub_nb) {
// _ZP_UNUSED(zn);
// _ZP_UNUSED(ke);
// _ZP_UNUSED(ke_val);
// _ZP_UNUSED(infos_val);
// _ZP_UNUSED(sub_nb);
// return false;
// }

// static inline void _z_queryable_update_cache(_z_session_t *zn, const _z_keyexpr_t *ke_in, const _z_keyexpr_t *ke_out,
// _z_queryable_infos_svec_t *infos) {
// _ZP_UNUSED(zn);
// _ZP_UNUSED(ke_in);
// _ZP_UNUSED(ke_out);
// _ZP_UNUSED(infos);
// return;
// }
// #endif // Z_FEATURE_RX_CACHE == 1

bool _z_session_queryable_eq(const _z_session_queryable_t *one, const _z_session_queryable_t *two) {
return one->_id == two->_id;
}
Expand Down Expand Up @@ -174,28 +180,45 @@ _z_session_queryable_rc_t *_z_register_session_queryable(_z_session_t *zn, _z_se
return ret;
}

static z_result_t _z_session_queryable_get_infos(_z_session_t *zn, const _z_keyexpr_t *keyexpr, _z_keyexpr_t *key,
_z_queryable_infos_svec_t *qles, size_t *qle_nb) {
static z_result_t _z_session_queryable_get_infos(_z_session_t *zn, _z_queryable_cache_data_t *infos) {
_z_queryable_cache_data_t *cache_entry = NULL;
#if Z_FEATURE_RX_CACHE == 1
cache_entry = _z_queryable_lru_cache_get(&zn->_queryable_cache, infos);
#endif
// Check cache
if (!_z_queryable_get_from_cache(zn, keyexpr, key, qles, qle_nb)) {
_Z_DEBUG("Resolving %d - %.*s on mapping 0x%x", keyexpr->_id, (int)_z_string_len(&keyexpr->_suffix),
_z_string_data(&keyexpr->_suffix), _z_keyexpr_mapping_id(keyexpr));
if (cache_entry != NULL) {
// Note cache entry
infos->ke_out = _z_keyexpr_alias(&cache_entry->ke_out);
infos->infos = _z_queryable_infos_svec_alias(&cache_entry->infos);
infos->qle_nb = cache_entry->qle_nb;
} else {
// Build queryable data
_Z_DEBUG("Resolving %d - %.*s on mapping 0x%x", infos->ke_in._id, (int)_z_string_len(&infos->ke_in._suffix),
_z_string_data(&infos->ke_in._suffix), _z_keyexpr_mapping_id(&infos->ke_in));
_z_session_mutex_lock(zn);
*key = __unsafe_z_get_expanded_key_from_key(zn, keyexpr, true);
infos->ke_out = __unsafe_z_get_expanded_key_from_key(zn, &infos->ke_in, true);

if (!_z_keyexpr_has_suffix(key)) {
if (!_z_keyexpr_has_suffix(&infos->ke_out)) {
_z_session_mutex_unlock(zn);
return _Z_ERR_KEYEXPR_UNKNOWN;
}
// Get queryable list
z_result_t ret = __unsafe_z_get_session_queryable_by_key(zn, key, qles);
z_result_t ret = __unsafe_z_get_session_queryable_by_key(zn, &infos->ke_out, &infos->infos);
_z_session_mutex_unlock(zn);
if (ret != _Z_RES_OK) {
return ret;
}
*qle_nb = _z_queryable_infos_svec_len(qles);
infos->qle_nb = _z_queryable_infos_svec_len(&infos->infos);
#if Z_FEATURE_RX_CACHE == 1
// Update cache
_z_queryable_update_cache(zn, keyexpr, key, qles);
_z_queryable_cache_data_t cache_storage = {
.infos = _z_queryable_infos_svec_alias(&infos->infos),
.ke_in = _z_keyexpr_duplicate(&infos->ke_in),
.ke_out = _z_keyexpr_duplicate(&infos->ke_out),
.qle_nb = infos->qle_nb,
};
_z_queryable_lru_cache_insert(&zn->_queryable_cache, &cache_storage);
#endif
}
return _Z_RES_OK;
}
Expand All @@ -212,16 +235,15 @@ static inline void _z_queryable_query_steal_data(_z_query_t *query, _z_session_r
static z_result_t _z_trigger_queryables_inner(_z_session_rc_t *zsrc, _z_msg_query_t *msgq, _z_keyexpr_t *q_key,
uint32_t qid) {
_z_session_t *zn = _Z_RC_IN_VAL(zsrc);
_z_keyexpr_t key;
_z_queryable_infos_svec_t qles;
size_t qle_nb;
_z_queryable_cache_data_t qle_infos = _z_queryable_cache_data_null();
qle_infos.ke_in = _z_keyexpr_alias(q_key);
// Retrieve sub infos
_Z_RETURN_IF_ERR(_z_session_queryable_get_infos(zn, q_key, &key, &qles, &qle_nb));
_Z_RETURN_IF_ERR(_z_session_queryable_get_infos(zn, &qle_infos));
// Check if there are queryables
_Z_DEBUG("Triggering %ju queryables for key %d - %.*s", (uintmax_t)qle_nb, key._id,
(int)_z_string_len(&key._suffix), _z_string_data(&key._suffix));
if (qle_nb == 0) {
_z_keyexpr_clear(&key);
_Z_DEBUG("Triggering %ju queryables for key %d - %.*s", (uintmax_t)qle_infos.qle_nb, qle_infos.ke_out._id,
(int)_z_string_len(&qle_infos.ke_out._suffix), _z_string_data(&qle_infos.ke_out._suffix));
if (qle_infos.qle_nb == 0) {
_z_keyexpr_clear(&qle_infos.ke_out);
return _Z_RES_OK;
}
// Check anyke
Expand All @@ -241,17 +263,17 @@ static z_result_t _z_trigger_queryables_inner(_z_session_rc_t *zsrc, _z_msg_quer
if (_Z_RC_IS_NULL(&query)) {
return _Z_ERR_SYSTEM_OUT_OF_MEMORY;
}
_z_queryable_query_steal_data(q, zsrc, msgq, &key, qid, anyke);
_z_queryable_query_steal_data(q, zsrc, msgq, &qle_infos.ke_out, qid, anyke);
// Parse session_queryable svec
for (size_t i = 0; i < qle_nb; i++) {
_z_queryable_infos_t *qle_info = _z_queryable_infos_svec_get(&qles, i);
for (size_t i = 0; i < qle_infos.qle_nb; i++) {
_z_queryable_infos_t *qle_info = _z_queryable_infos_svec_get(&qle_infos.infos, i);
qle_info->callback(&query, qle_info->arg);
}
_z_query_rc_drop(&query);
// Clean up
_z_keyexpr_clear(&key);
_z_keyexpr_clear(&qle_infos.ke_out);
#if Z_FEATURE_RX_CACHE != 1
_z_queryable_infos_svec_release(&qles); // Otherwise it's released with cache
_z_queryable_infos_svec_release(&qle_infos.infos); // Otherwise it's released with cache
#endif
return _Z_RES_OK;
}
Expand Down
4 changes: 2 additions & 2 deletions src/session/utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ z_result_t _z_session_init(_z_session_rc_t *zsrc, _z_id_t *zid) {
#if Z_FEATURE_QUERYABLE == 1
zn->_local_queryable = NULL;
#if Z_FEATURE_RX_CACHE == 1
memset(&zn->_queryable_cache, 0, sizeof(zn->_queryable_cache));
zn->_queryable_cache = _z_queryable_lru_cache_init(Z_RX_CACHE_SIZE);
#endif
#endif
#if Z_FEATURE_QUERY == 1
Expand Down Expand Up @@ -132,7 +132,7 @@ void _z_session_clear(_z_session_t *zn) {
#if Z_FEATURE_QUERYABLE == 1
_z_flush_session_queryable(zn);
#if Z_FEATURE_RX_CACHE == 1
_z_queryable_cache_clear(&zn->_queryable_cache);
_z_queryable_lru_cache_delete(&zn->_queryable_cache);
#endif
#endif
#if Z_FEATURE_QUERY == 1
Expand Down

0 comments on commit 97777a5

Please sign in to comment.