From 7f3926c6691101bc806a76f92e1f1d7b9f3bf196 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Fri, 20 Dec 2024 11:25:29 +0100 Subject: [PATCH] feat: add rx cache invalidation --- include/zenoh-pico/collections/lru_cache.h | 2 ++ include/zenoh-pico/session/queryable.h | 1 + include/zenoh-pico/session/subscription.h | 1 + src/collections/lru_cache.c | 18 +++++++++++++++ src/net/primitives.c | 19 ++++++++++++--- src/session/queryable.c | 8 +++++++ src/session/subscription.c | 8 +++++++ tests/z_lru_cache_test.c | 27 +++++++++++++++++++--- 8 files changed, 78 insertions(+), 6 deletions(-) diff --git a/include/zenoh-pico/collections/lru_cache.h b/include/zenoh-pico/collections/lru_cache.h index 1bed89b27..9fb783b8e 100644 --- a/include/zenoh-pico/collections/lru_cache.h +++ b/include/zenoh-pico/collections/lru_cache.h @@ -45,6 +45,7 @@ typedef struct _z_lru_cache_t { _z_lru_cache_t _z_lru_cache_init(size_t capacity); void *_z_lru_cache_get(_z_lru_cache_t *cache, void *value, _z_lru_val_cmp_f compare); z_result_t _z_lru_cache_insert(_z_lru_cache_t *cache, void *value, size_t value_size, _z_lru_val_cmp_f compare); +void _z_lru_cache_clear(_z_lru_cache_t *cache); void _z_lru_cache_delete(_z_lru_cache_t *cache); #define _Z_LRU_CACHE_DEFINE(name, type, compare_f) \ @@ -56,6 +57,7 @@ void _z_lru_cache_delete(_z_lru_cache_t *cache); static inline z_result_t name##_lru_cache_insert(name##_lru_cache_t *cache, type *val) { \ return _z_lru_cache_insert(cache, (void *)val, sizeof(type), compare_f); \ } \ + static inline void name##_lru_cache_clear(name##_lru_cache_t *cache) { _z_lru_cache_clear(cache); } \ static inline void name##_lru_cache_delete(name##_lru_cache_t *cache) { _z_lru_cache_delete(cache); } #ifdef __cplusplus diff --git a/include/zenoh-pico/session/queryable.h b/include/zenoh-pico/session/queryable.h index c92527859..019ce6e1e 100644 --- a/include/zenoh-pico/session/queryable.h +++ b/include/zenoh-pico/session/queryable.h @@ -40,6 +40,7 @@ typedef struct { size_t qle_nb; } _z_queryable_cache_data_t; +void _z_queryable_cache_invalidate(_z_session_t *zn); int _z_queryable_cache_data_compare(const void *first, const void *second); #if Z_FEATURE_QUERYABLE == 1 diff --git a/include/zenoh-pico/session/subscription.h b/include/zenoh-pico/session/subscription.h index a78d23fd8..07c037992 100644 --- a/include/zenoh-pico/session/subscription.h +++ b/include/zenoh-pico/session/subscription.h @@ -43,6 +43,7 @@ typedef struct { size_t sub_nb; } _z_subscription_cache_data_t; +void _z_subscription_cache_invalidate(_z_session_t *zn); int _z_subscription_cache_data_compare(const void *first, const void *second); /*------------------ Subscription ------------------*/ diff --git a/src/collections/lru_cache.c b/src/collections/lru_cache.c index f83b079d6..c306c3d6d 100644 --- a/src/collections/lru_cache.c +++ b/src/collections/lru_cache.c @@ -223,6 +223,24 @@ z_result_t _z_lru_cache_insert(_z_lru_cache_t *cache, void *value, size_t value_ return _Z_RES_OK; } +void _z_lru_cache_clear(_z_lru_cache_t *cache) { + // Reset slist + if (cache->slist != NULL) { + memset(cache->slist, 0, cache->capacity * sizeof(void *)); + } + // Remove nodes + _z_lru_cache_node_data_t *node = cache->head; + while (node != NULL) { + _z_lru_cache_node_t *tmp = node; + _z_lru_cache_node_data_t *node_data = _z_lru_cache_node_data(node); + node = node_data->next; + z_free(tmp); + } + cache->len = 0; + cache->head = NULL; + cache->tail = NULL; +} + void _z_lru_cache_delete(_z_lru_cache_t *cache) { _z_lru_cache_node_data_t *node = cache->head; z_free(cache->slist); diff --git a/src/net/primitives.c b/src/net/primitives.c index ee372a7ac..2014d10f9 100644 --- a/src/net/primitives.c +++ b/src/net/primitives.c @@ -70,13 +70,15 @@ uint16_t _z_declare_resource(_z_session_t *zn, const _z_keyexpr_t *keyexpr) { _z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0); if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) == _Z_RES_OK) { ret = id; + // Invalidate cache + _z_subscription_cache_invalidate(zn); + _z_queryable_cache_invalidate(zn); } else { _z_unregister_resource(zn, id, _Z_KEYEXPR_MAPPING_LOCAL); } _z_n_msg_clear(&n_msg); } } - return ret; } @@ -89,8 +91,11 @@ z_result_t _z_undeclare_resource(_z_session_t *zn, uint16_t rid) { _z_declaration_t declaration = _z_make_undecl_keyexpr(rid); _z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0); if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) == _Z_RES_OK) { - _z_unregister_resource(zn, rid, - _Z_KEYEXPR_MAPPING_LOCAL); // Only if message is send, local resource is removed + // Remove local resource + _z_unregister_resource(zn, rid, _Z_KEYEXPR_MAPPING_LOCAL); + // Invalidate cache + _z_subscription_cache_invalidate(zn); + _z_queryable_cache_invalidate(zn); } else { ret = _Z_ERR_TRANSPORT_TX_FAILED; } @@ -244,6 +249,8 @@ _z_subscriber_t _z_declare_subscriber(const _z_session_rc_t *zn, _z_keyexpr_t ke // Fill subscriber ret._entity_id = s._id; ret._zn = _z_session_rc_clone_as_weak(zn); + // Invalidate cache + _z_subscription_cache_invalidate(_Z_RC_IN_VAL(zn)); return ret; } @@ -273,6 +280,8 @@ z_result_t _z_undeclare_subscriber(_z_subscriber_t *sub) { // Only if message is successfully send, local subscription state can be removed _z_undeclare_resource(_Z_RC_IN_VAL(&sub->_zn), _Z_RC_IN_VAL(s)->_key_id); _z_unregister_subscription(_Z_RC_IN_VAL(&sub->_zn), _Z_SUBSCRIBER_KIND_SUBSCRIBER, s); + // Invalidate cache + _z_subscription_cache_invalidate(_Z_RC_IN_VAL(&sub->_zn)); return _Z_RES_OK; } #endif @@ -308,6 +317,8 @@ _z_queryable_t _z_declare_queryable(const _z_session_rc_t *zn, _z_keyexpr_t keye // Fill queryable ret._entity_id = q._id; ret._zn = _z_session_rc_clone_as_weak(zn); + // Invalidate cache + _z_queryable_cache_invalidate(_Z_RC_IN_VAL(zn)); return ret; } @@ -335,6 +346,8 @@ z_result_t _z_undeclare_queryable(_z_queryable_t *qle) { _z_n_msg_clear(&n_msg); // Only if message is successfully send, local queryable state can be removed _z_unregister_session_queryable(_Z_RC_IN_VAL(&qle->_zn), q); + // Invalidate cache + _z_queryable_cache_invalidate(_Z_RC_IN_VAL(&qle->_zn)); return _Z_RES_OK; } diff --git a/src/session/queryable.c b/src/session/queryable.c index a4c7106a0..641afba42 100644 --- a/src/session/queryable.c +++ b/src/session/queryable.c @@ -37,6 +37,14 @@ static inline _z_queryable_cache_data_t _z_queryable_cache_data_null(void) { return ret; } +void _z_queryable_cache_invalidate(_z_session_t *zn) { +#if Z_FEATURE_RX_CACHE == 1 + _z_queryable_lru_cache_clear(&zn->_queryable_cache); +#else + _ZP_UNUSED(zn); +#endif +} + #if Z_FEATURE_RX_CACHE == 1 int _z_queryable_cache_data_compare(const void *first, const void *second) { _z_queryable_cache_data_t *first_data = (_z_queryable_cache_data_t *)first; diff --git a/src/session/subscription.c b/src/session/subscription.c index d4c7ffe40..6c007c148 100644 --- a/src/session/subscription.c +++ b/src/session/subscription.c @@ -38,6 +38,14 @@ static inline _z_subscription_cache_data_t _z_subscription_cache_data_null(void) return ret; } +void _z_subscription_cache_invalidate(_z_session_t *zn) { +#if Z_FEATURE_RX_CACHE == 1 + _z_subscription_lru_cache_clear(&zn->_subscription_cache); +#else + _ZP_UNUSED(zn); +#endif +} + #if Z_FEATURE_RX_CACHE == 1 int _z_subscription_cache_data_compare(const void *first, const void *second) { _z_subscription_cache_data_t *first_data = (_z_subscription_cache_data_t *)first; diff --git a/tests/z_lru_cache_test.c b/tests/z_lru_cache_test.c index daa9a5345..329ec185a 100644 --- a/tests/z_lru_cache_test.c +++ b/tests/z_lru_cache_test.c @@ -49,17 +49,17 @@ void test_lru_init(void) { assert(dcache.len == 0); assert(dcache.head == NULL); assert(dcache.tail == NULL); -#if Z_FEATURE_CACHE_TREE == 1 - assert(dcache.root == NULL); -#endif + assert(dcache.slist == NULL); } void test_lru_cache_insert(void) { _dummy_lru_cache_t dcache = _dummy_lru_cache_init(CACHE_CAPACITY); _dummy_t v0 = {0}; + assert(dcache.slist == NULL); assert(_dummy_lru_cache_get(&dcache, &v0) == NULL); assert(_dummy_lru_cache_insert(&dcache, &v0) == 0); + assert(dcache.slist != NULL); _dummy_t *res = _dummy_lru_cache_get(&dcache, &v0); assert(res != NULL); assert(res->foo == v0.foo); @@ -77,6 +77,26 @@ void test_lru_cache_insert(void) { _dummy_lru_cache_delete(&dcache); } +void test_lru_cache_clear(void) { + _dummy_lru_cache_t dcache = _dummy_lru_cache_init(CACHE_CAPACITY); + + _dummy_t data[CACHE_CAPACITY] = {0}; + for (size_t i = 0; i < CACHE_CAPACITY; i++) { + data[i].foo = (int)i; + assert(_dummy_lru_cache_insert(&dcache, &data[i]) == 0); + } + _dummy_lru_cache_clear(&dcache); + assert(dcache.capacity == CACHE_CAPACITY); + assert(dcache.len == 0); + assert(dcache.slist != NULL); + assert(dcache.head == NULL); + assert(dcache.tail == NULL); + for (size_t i = 0; i < CACHE_CAPACITY; i++) { + assert(_dummy_lru_cache_get(&dcache, &data[i]) == NULL); + } + _dummy_lru_cache_delete(&dcache); +} + void test_lru_cache_deletion(void) { _dummy_lru_cache_t dcache = _dummy_lru_cache_init(CACHE_CAPACITY); @@ -221,6 +241,7 @@ void test_search_benchmark(void) { int main(void) { test_lru_init(); test_lru_cache_insert(); + test_lru_cache_clear(); test_lru_cache_deletion(); test_lru_cache_update(); test_lru_cache_random_val();