Skip to content

Commit

Permalink
feat: add rx cache invalidation
Browse files Browse the repository at this point in the history
  • Loading branch information
jean-roland committed Dec 20, 2024
1 parent 679e386 commit 8ea1e65
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 6 deletions.
2 changes: 2 additions & 0 deletions include/zenoh-pico/collections/lru_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ typedef struct _z_lru_cache_t {
_z_lru_cache_t _z_lru_cache_init(size_t capacity);
void *_z_lru_cache_get(_z_lru_cache_t *cache, void *value, _z_lru_val_cmp_f compare);
z_result_t _z_lru_cache_insert(_z_lru_cache_t *cache, void *value, size_t value_size, _z_lru_val_cmp_f compare);
void _z_lru_cache_clear(_z_lru_cache_t *cache);
void _z_lru_cache_delete(_z_lru_cache_t *cache);

#define _Z_LRU_CACHE_DEFINE(name, type, compare_f) \
Expand All @@ -56,6 +57,7 @@ void _z_lru_cache_delete(_z_lru_cache_t *cache);
static inline z_result_t name##_lru_cache_insert(name##_lru_cache_t *cache, type *val) { \
return _z_lru_cache_insert(cache, (void *)val, sizeof(type), compare_f); \
} \
static inline void name##_lru_cache_clear(name##_lru_cache_t *cache) { _z_lru_cache_clear(cache); } \
static inline void name##_lru_cache_delete(name##_lru_cache_t *cache) { _z_lru_cache_delete(cache); }

#ifdef __cplusplus
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/session/queryable.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ typedef struct {
size_t qle_nb;
} _z_queryable_cache_data_t;

void _z_queryable_cache_invalidate(_z_session_t *zn);
int _z_queryable_cache_data_compare(const void *first, const void *second);

#if Z_FEATURE_QUERYABLE == 1
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/session/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ typedef struct {
size_t sub_nb;
} _z_subscription_cache_data_t;

void _z_subscription_cache_invalidate(_z_session_t *zn);
int _z_subscription_cache_data_compare(const void *first, const void *second);

/*------------------ Subscription ------------------*/
Expand Down
18 changes: 18 additions & 0 deletions src/collections/lru_cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,24 @@ z_result_t _z_lru_cache_insert(_z_lru_cache_t *cache, void *value, size_t value_
return _Z_RES_OK;
}

void _z_lru_cache_clear(_z_lru_cache_t *cache) {
// Reset slist
if (cache->slist != NULL) {
memset(cache->slist, 0, cache->capacity * sizeof(void *));
}
// Remove nodes
_z_lru_cache_node_data_t *node = cache->head;
while (node != NULL) {
_z_lru_cache_node_t *tmp = node;
_z_lru_cache_node_data_t *node_data = _z_lru_cache_node_data(node);
node = node_data->next;
z_free(tmp);
}
cache->len = 0;
cache->head = NULL;
cache->tail = NULL;
}

void _z_lru_cache_delete(_z_lru_cache_t *cache) {
_z_lru_cache_node_data_t *node = cache->head;
z_free(cache->slist);
Expand Down
19 changes: 16 additions & 3 deletions src/net/primitives.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,15 @@ uint16_t _z_declare_resource(_z_session_t *zn, const _z_keyexpr_t *keyexpr) {
_z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0);
if (_z_send_decalre(zn, &n_msg) == _Z_RES_OK) {
ret = id;
// Invalidate cache
_z_subscription_cache_invalidate(zn);
_z_queryable_cache_invalidate(zn);
} else {
_z_unregister_resource(zn, id, _Z_KEYEXPR_MAPPING_LOCAL);
}
_z_n_msg_clear(&n_msg);
}
}

return ret;
}

Expand All @@ -118,8 +120,11 @@ z_result_t _z_undeclare_resource(_z_session_t *zn, uint16_t rid) {
_z_declaration_t declaration = _z_make_undecl_keyexpr(rid);
_z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0);
if (_z_send_undecalre(zn, &n_msg) == _Z_RES_OK) {
_z_unregister_resource(zn, rid,
_Z_KEYEXPR_MAPPING_LOCAL); // Only if message is send, local resource is removed
// Remove local resource
_z_unregister_resource(zn, rid, _Z_KEYEXPR_MAPPING_LOCAL);
// Invalidate cache
_z_subscription_cache_invalidate(zn);
_z_queryable_cache_invalidate(zn);
} else {
ret = _Z_ERR_TRANSPORT_TX_FAILED;
}
Expand Down Expand Up @@ -273,6 +278,8 @@ _z_subscriber_t _z_declare_subscriber(const _z_session_rc_t *zn, _z_keyexpr_t ke
// Fill subscriber
ret._entity_id = s._id;
ret._zn = _z_session_rc_clone_as_weak(zn);
// Invalidate cache
_z_subscription_cache_invalidate(_Z_RC_IN_VAL(zn));
return ret;
}

Expand Down Expand Up @@ -301,6 +308,8 @@ z_result_t _z_undeclare_subscriber(_z_subscriber_t *sub) {
// Only if message is successfully send, local subscription state can be removed
_z_undeclare_resource(_Z_RC_IN_VAL(&sub->_zn), _Z_RC_IN_VAL(s)->_key_id);
_z_unregister_subscription(_Z_RC_IN_VAL(&sub->_zn), _Z_SUBSCRIBER_KIND_SUBSCRIBER, s);
// Invalidate cache
_z_subscription_cache_invalidate(_Z_RC_IN_VAL(&sub->_zn));
return _Z_RES_OK;
}
#endif
Expand Down Expand Up @@ -336,6 +345,8 @@ _z_queryable_t _z_declare_queryable(const _z_session_rc_t *zn, _z_keyexpr_t keye
// Fill queryable
ret._entity_id = q._id;
ret._zn = _z_session_rc_clone_as_weak(zn);
// Invalidate cache
_z_queryable_cache_invalidate(_Z_RC_IN_VAL(zn));
return ret;
}

Expand All @@ -362,6 +373,8 @@ z_result_t _z_undeclare_queryable(_z_queryable_t *qle) {
_z_n_msg_clear(&n_msg);
// Only if message is successfully send, local queryable state can be removed
_z_unregister_session_queryable(_Z_RC_IN_VAL(&qle->_zn), q);
// Invalidate cache
_z_queryable_cache_invalidate(_Z_RC_IN_VAL(&qle->_zn));
return _Z_RES_OK;
}

Expand Down
8 changes: 8 additions & 0 deletions src/session/queryable.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ static inline _z_queryable_cache_data_t _z_queryable_cache_data_null(void) {
return ret;
}

void _z_queryable_cache_invalidate(_z_session_t *zn) {
#if Z_FEATURE_RX_CACHE == 1
_z_queryable_lru_cache_clear(&zn->_queryable_cache);
#else
_ZP_UNUSED(zn);
#endif
}

#if Z_FEATURE_RX_CACHE == 1
int _z_queryable_cache_data_compare(const void *first, const void *second) {
_z_queryable_cache_data_t *first_data = (_z_queryable_cache_data_t *)first;
Expand Down
8 changes: 8 additions & 0 deletions src/session/subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ static inline _z_subscription_cache_data_t _z_subscription_cache_data_null(void)
return ret;
}

void _z_subscription_cache_invalidate(_z_session_t *zn) {
#if Z_FEATURE_RX_CACHE == 1
_z_subscription_lru_cache_clear(&zn->_subscription_cache);
#else
_ZP_UNUSED(zn);
#endif
}

#if Z_FEATURE_RX_CACHE == 1
int _z_subscription_cache_data_compare(const void *first, const void *second) {
_z_subscription_cache_data_t *first_data = (_z_subscription_cache_data_t *)first;
Expand Down
27 changes: 24 additions & 3 deletions tests/z_lru_cache_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,17 @@ void test_lru_init(void) {
assert(dcache.len == 0);
assert(dcache.head == NULL);
assert(dcache.tail == NULL);
#if Z_FEATURE_CACHE_TREE == 1
assert(dcache.root == NULL);
#endif
assert(dcache.slist == NULL);
}

void test_lru_cache_insert(void) {
_dummy_lru_cache_t dcache = _dummy_lru_cache_init(CACHE_CAPACITY);

_dummy_t v0 = {0};
assert(dcache.slist == NULL);
assert(_dummy_lru_cache_get(&dcache, &v0) == NULL);
assert(_dummy_lru_cache_insert(&dcache, &v0) == 0);
assert(dcache.slist != NULL);
_dummy_t *res = _dummy_lru_cache_get(&dcache, &v0);
assert(res != NULL);
assert(res->foo == v0.foo);
Expand All @@ -77,6 +77,26 @@ void test_lru_cache_insert(void) {
_dummy_lru_cache_delete(&dcache);
}

void test_lru_cache_clear(void) {
_dummy_lru_cache_t dcache = _dummy_lru_cache_init(CACHE_CAPACITY);

_dummy_t data[CACHE_CAPACITY] = {0};
for (size_t i = 0; i < CACHE_CAPACITY; i++) {
data[i].foo = (int)i;
assert(_dummy_lru_cache_insert(&dcache, &data[i]) == 0);
}
_dummy_lru_cache_clear(&dcache);
assert(dcache.capacity == CACHE_CAPACITY);
assert(dcache.len == 0);
assert(dcache.slist != NULL);
assert(dcache.head == NULL);
assert(dcache.tail == NULL);
for (size_t i = 0; i < CACHE_CAPACITY; i++) {
assert(_dummy_lru_cache_get(&dcache, &data[i]) == NULL);
}
_dummy_lru_cache_delete(&dcache);
}

void test_lru_cache_deletion(void) {
_dummy_lru_cache_t dcache = _dummy_lru_cache_init(CACHE_CAPACITY);

Expand Down Expand Up @@ -221,6 +241,7 @@ void test_search_benchmark(void) {
int main(void) {
test_lru_init();
test_lru_cache_insert();
test_lru_cache_clear();
test_lru_cache_deletion();
test_lru_cache_update();
test_lru_cache_random_val();
Expand Down

0 comments on commit 8ea1e65

Please sign in to comment.