Skip to content

Commit

Permalink
Fix memory leaks (#873)
Browse files Browse the repository at this point in the history
* feat: clean t message only when needed

* feat: svec_alias can take ownership

* feat: add clear data in lru cache

* fix: remove duplicate clear

* fix: reundant code in lru cache

* fix: add svec_transfer function
  • Loading branch information
jean-roland authored Jan 30, 2025
1 parent f1b548d commit bbb343c
Show file tree
Hide file tree
Showing 14 changed files with 115 additions and 38 deletions.
13 changes: 9 additions & 4 deletions include/zenoh-pico/collections/lru_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <stddef.h>
#include <stdint.h>

#include "zenoh-pico/collections/element.h"
#include "zenoh-pico/utils/result.h"

#ifdef __cplusplus
Expand Down Expand Up @@ -45,8 +46,8 @@ typedef struct _z_lru_cache_t {
_z_lru_cache_t _z_lru_cache_init(size_t capacity);
void *_z_lru_cache_get(_z_lru_cache_t *cache, void *value, _z_lru_val_cmp_f compare);
z_result_t _z_lru_cache_insert(_z_lru_cache_t *cache, void *value, size_t value_size, _z_lru_val_cmp_f compare);
void _z_lru_cache_clear(_z_lru_cache_t *cache);
void _z_lru_cache_delete(_z_lru_cache_t *cache);
void _z_lru_cache_clear(_z_lru_cache_t *cache, z_element_clear_f clear);
void _z_lru_cache_delete(_z_lru_cache_t *cache, z_element_clear_f clear);

#define _Z_LRU_CACHE_DEFINE(name, type, compare_f) \
typedef _z_lru_cache_t name##_lru_cache_t; \
Expand All @@ -57,8 +58,12 @@ void _z_lru_cache_delete(_z_lru_cache_t *cache);
static inline z_result_t name##_lru_cache_insert(name##_lru_cache_t *cache, type *val) { \
return _z_lru_cache_insert(cache, (void *)val, sizeof(type), compare_f); \
} \
static inline void name##_lru_cache_clear(name##_lru_cache_t *cache) { _z_lru_cache_clear(cache); } \
static inline void name##_lru_cache_delete(name##_lru_cache_t *cache) { _z_lru_cache_delete(cache); }
static inline void name##_lru_cache_clear(name##_lru_cache_t *cache) { \
_z_lru_cache_clear(cache, name##_elem_clear); \
} \
static inline void name##_lru_cache_delete(name##_lru_cache_t *cache) { \
_z_lru_cache_delete(cache, name##_elem_clear); \
}

#ifdef __cplusplus
}
Expand Down
11 changes: 8 additions & 3 deletions include/zenoh-pico/collections/vec.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,12 @@ typedef struct {
} _z_svec_t;

static inline _z_svec_t _z_svec_null(void) { return (_z_svec_t){0}; }
static inline _z_svec_t _z_svec_alias(const _z_svec_t *src) {
static inline _z_svec_t _z_svec_alias(const _z_svec_t *src, bool ownership) {
_z_svec_t ret;
ret._capacity = src->_capacity;
ret._len = src->_len;
ret._val = src->_val;
ret._aliased = true;
ret._aliased = !ownership;
return ret;
}
static inline _z_svec_t _z_svec_alias_element(void *element) {
Expand Down Expand Up @@ -151,7 +151,12 @@ void _z_svec_release(_z_svec_t *v);
static inline z_result_t name##_svec_copy(name##_svec_t *dst, const name##_svec_t *src, bool use_elem_f) { \
return _z_svec_copy(dst, src, name##_elem_copy, sizeof(type), use_elem_f); \
} \
static inline name##_svec_t name##_svec_alias(const name##_svec_t *v) { return _z_svec_alias(v); } \
static inline name##_svec_t name##_svec_alias(const name##_svec_t *v) { return _z_svec_alias(v, false); } \
static inline name##_svec_t name##_svec_transfer(name##_svec_t *v) { \
name##_svec_t ret = _z_svec_alias(v, true); \
v->_aliased = true; \
return ret; \
} \
static inline name##_svec_t name##_svec_alias_element(type *e) { return _z_svec_alias_element((void *)e); } \
static inline void name##_svec_move(name##_svec_t *dst, name##_svec_t *src) { _z_svec_move(dst, src); } \
static inline void name##_svec_reset(name##_svec_t *v) { _z_svec_reset(v, name##_elem_clear, sizeof(type)); } \
Expand Down
3 changes: 3 additions & 0 deletions include/zenoh-pico/session/queryable.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,15 @@ typedef struct {

void _z_queryable_cache_invalidate(_z_session_t *zn);
int _z_queryable_cache_data_compare(const void *first, const void *second);
void _z_queryable_cache_data_clear(_z_queryable_cache_data_t *val);

#if Z_FEATURE_QUERYABLE == 1
#define _Z_QUERYABLE_COMPLETE_DEFAULT false
#define _Z_QUERYABLE_DISTANCE_DEFAULT 0

#if Z_FEATURE_RX_CACHE == 1
_Z_ELEM_DEFINE(_z_queryable, _z_queryable_cache_data_t, _z_noop_size, _z_queryable_cache_data_clear, _z_noop_copy,
_z_noop_move)
_Z_LRU_CACHE_DEFINE(_z_queryable, _z_queryable_cache_data_t, _z_queryable_cache_data_compare)
#endif

Expand Down
3 changes: 3 additions & 0 deletions include/zenoh-pico/session/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ typedef struct {

void _z_subscription_cache_invalidate(_z_session_t *zn);
int _z_subscription_cache_data_compare(const void *first, const void *second);
void _z_subscription_cache_data_clear(_z_subscription_cache_data_t *val);

/*------------------ Subscription ------------------*/
z_result_t _z_trigger_subscriptions_put(_z_session_t *zn, _z_keyexpr_t *keyexpr, _z_bytes_t *payload,
Expand All @@ -63,6 +64,8 @@ z_result_t _z_trigger_liveliness_subscriptions_undeclare(_z_session_t *zn, _z_ke
#if Z_FEATURE_SUBSCRIPTION == 1

#if Z_FEATURE_RX_CACHE == 1
_Z_ELEM_DEFINE(_z_subscription, _z_subscription_cache_data_t, _z_noop_size, _z_subscription_cache_data_clear,
_z_noop_copy, _z_noop_move)
_Z_LRU_CACHE_DEFINE(_z_subscription, _z_subscription_cache_data_t, _z_subscription_cache_data_compare)
#endif

Expand Down
37 changes: 19 additions & 18 deletions src/collections/lru_cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,18 @@ static void _z_lru_cache_update_list(_z_lru_cache_t *cache, _z_lru_cache_node_t
_z_lru_cache_insert_list_node(cache, node);
}

static void _z_lru_cache_clear_list(_z_lru_cache_t *cache, z_element_clear_f clear) {
_z_lru_cache_node_data_t *node = cache->head;
while (node != NULL) {
_z_lru_cache_node_t *tmp = node;
_z_lru_cache_node_data_t *node_data = _z_lru_cache_node_data(node);
void *node_value = _z_lru_cache_node_value(node);
node = node_data->next;
clear(node_value);
z_free(tmp);
}
}

// Sorted list function
static _z_lru_cache_node_t *_z_lru_cache_search_slist(_z_lru_cache_t *cache, void *value, _z_lru_val_cmp_f compare,
size_t *idx) {
Expand Down Expand Up @@ -223,32 +235,21 @@ z_result_t _z_lru_cache_insert(_z_lru_cache_t *cache, void *value, size_t value_
return _Z_RES_OK;
}

void _z_lru_cache_clear(_z_lru_cache_t *cache) {
void _z_lru_cache_clear(_z_lru_cache_t *cache, z_element_clear_f clear) {
// Reset slist
if (cache->slist != NULL) {
memset(cache->slist, 0, cache->capacity * sizeof(void *));
}
// Remove nodes
_z_lru_cache_node_data_t *node = cache->head;
while (node != NULL) {
_z_lru_cache_node_t *tmp = node;
_z_lru_cache_node_data_t *node_data = _z_lru_cache_node_data(node);
node = node_data->next;
z_free(tmp);
}
// Clear list
_z_lru_cache_clear_list(cache, clear);
// Reset cacge
cache->len = 0;
cache->head = NULL;
cache->tail = NULL;
}

void _z_lru_cache_delete(_z_lru_cache_t *cache) {
_z_lru_cache_node_data_t *node = cache->head;
void _z_lru_cache_delete(_z_lru_cache_t *cache, z_element_clear_f clear) {
_z_lru_cache_clear(cache, clear);
z_free(cache->slist);
// Parse list
while (node != NULL) {
_z_lru_cache_node_t *tmp = node;
_z_lru_cache_node_data_t *node_data = _z_lru_cache_node_data(node);
node = node_data->next;
z_free(tmp);
}
cache->slist = NULL;
}
2 changes: 2 additions & 0 deletions src/protocol/definitions/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ void _z_t_msg_keep_alive_clear(_z_t_msg_keep_alive_t *msg) { (void)(msg); }
void _z_t_msg_frame_clear(_z_t_msg_frame_t *msg) {
if (!msg->_messages._aliased) {
_z_network_message_svec_clear(&msg->_messages);
} else {
_z_network_message_svec_reset(&msg->_messages);
}
}

Expand Down
8 changes: 7 additions & 1 deletion src/session/queryable.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ int _z_queryable_cache_data_compare(const void *first, const void *second) {
_z_queryable_cache_data_t *second_data = (_z_queryable_cache_data_t *)second;
return _z_keyexpr_compare(&first_data->ke_in, &second_data->ke_in);
}

void _z_queryable_cache_data_clear(_z_queryable_cache_data_t *val) {
_z_queryable_infos_svec_clear(&val->infos);
_z_keyexpr_clear(&val->ke_in);
_z_keyexpr_clear(&val->ke_out);
}
#endif // Z_FEATURE_RX_CACHE == 1

bool _z_session_queryable_eq(const _z_session_queryable_t *one, const _z_session_queryable_t *two) {
Expand Down Expand Up @@ -177,7 +183,7 @@ static z_result_t _z_session_queryable_get_infos(_z_session_t *zn, _z_queryable_
#if Z_FEATURE_RX_CACHE == 1
// Update cache
_z_queryable_cache_data_t cache_storage = {
.infos = _z_queryable_infos_svec_alias(&infos->infos),
.infos = _z_queryable_infos_svec_transfer(&infos->infos),
.ke_in = _z_keyexpr_duplicate(&infos->ke_in),
.ke_out = _z_keyexpr_duplicate(&infos->ke_out),
.qle_nb = infos->qle_nb,
Expand Down
15 changes: 13 additions & 2 deletions src/session/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,15 @@ z_result_t _z_handle_network_message(_z_session_rc_t *zsrc, _z_zenoh_message_t *
_z_interest_process_declare_final(zn, decl->_interest_id);
} break;
}
_z_n_msg_declare_clear(&msg->_body._declare);
} break;

case _Z_N_PUSH: {
_Z_DEBUG("Handling _Z_N_PUSH");
_z_n_msg_push_t *push = &msg->_body._push;
ret = _z_trigger_push(zn, push, msg->_reliability);
} break;

case _Z_N_REQUEST: {
_Z_DEBUG("Handling _Z_N_REQUEST");
_z_n_msg_request_t *req = &msg->_body._request;
Expand Down Expand Up @@ -132,6 +135,7 @@ z_result_t _z_handle_network_message(_z_session_rc_t *zsrc, _z_zenoh_message_t *
} break;
}
} break;

case _Z_N_RESPONSE: {
_Z_DEBUG("Handling _Z_N_RESPONSE");
_z_n_msg_response_t *response = &msg->_body._response;
Expand All @@ -146,9 +150,11 @@ z_result_t _z_handle_network_message(_z_session_rc_t *zsrc, _z_zenoh_message_t *
} break;
}
} break;

case _Z_N_RESPONSE_FINAL: {
_Z_DEBUG("Handling _Z_N_RESPONSE_FINAL");
ret = _z_trigger_reply_final(zn, &msg->_body._response_final);
_z_n_msg_response_final_clear(&msg->_body._response_final);
} break;

case _Z_N_INTEREST: {
Expand All @@ -162,8 +168,13 @@ z_result_t _z_handle_network_message(_z_session_rc_t *zsrc, _z_zenoh_message_t *
} else {
_z_interest_process_interest_final(zn, interest->_interest._id);
}
}
_z_n_msg_interest_clear(&msg->_body._interest);
} break;

default:
_Z_ERROR("Unknown network message ID");
_z_n_msg_clear(msg);
break;
}
_z_msg_clear(msg);
return ret;
}
8 changes: 7 additions & 1 deletion src/session/subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ int _z_subscription_cache_data_compare(const void *first, const void *second) {
_z_subscription_cache_data_t *second_data = (_z_subscription_cache_data_t *)second;
return _z_keyexpr_compare(&first_data->ke_in, &second_data->ke_in);
}

void _z_subscription_cache_data_clear(_z_subscription_cache_data_t *val) {
_z_subscription_infos_svec_clear(&val->infos);
_z_keyexpr_clear(&val->ke_in);
_z_keyexpr_clear(&val->ke_out);
}
#endif // Z_FEATURE_RX_CACHE == 1

bool _z_subscription_eq(const _z_subscription_t *other, const _z_subscription_t *this_) {
Expand Down Expand Up @@ -221,7 +227,7 @@ static z_result_t _z_subscription_get_infos(_z_session_t *zn, _z_subscriber_kind
#if Z_FEATURE_RX_CACHE == 1
// Update cache, takes ownership of the data
_z_subscription_cache_data_t cache_storage = {
.infos = _z_subscription_infos_svec_alias(&infos->infos),
.infos = _z_subscription_infos_svec_transfer(&infos->infos),
.ke_in = _z_keyexpr_duplicate(&infos->ke_in),
.ke_out = _z_keyexpr_duplicate(&infos->ke_out),
.sub_nb = infos->sub_nb,
Expand Down
4 changes: 1 addition & 3 deletions src/transport/multicast/read.c
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,7 @@ void *_zp_multicast_read_task(void *ztm_arg) {
if (ret == _Z_RES_OK) {
ret = _z_multicast_handle_transport_message(ztm, &t_msg, &addr);

if (ret == _Z_RES_OK) {
_z_t_msg_clear(&t_msg);
} else {
if (ret != _Z_RES_OK) {
_Z_ERROR("Dropping message due to processing error: %d", ret);
continue;
}
Expand Down
Loading

0 comments on commit bbb343c

Please sign in to comment.