Skip to content

Commit

Permalink
add valgrind memory leaks check (#584)
Browse files Browse the repository at this point in the history
* added pub sub test for memory leaks

* added valgrind test

* format

* ci fix

* ci fix

* add queryable-get memory leaks test

* do not copy memory leaks script

* fixed memory leak in handlers

* docs update

* format

* format
  • Loading branch information
DenisBiryukov91 authored Aug 9, 2024
1 parent 320faf6 commit 5a82ecf
Show file tree
Hide file tree
Showing 14 changed files with 291 additions and 4 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ jobs:
cmake .. -DCMAKE_BUILD_TYPE=Release -DCMAKE_INSTALL_PREFIX=~/local
cmake --build . --target install --config Release
- name: Install valgrind
uses: taiki-e/install-action@valgrind
if: matrix.os == 'ubuntu-latest'

- name: Run cmake tests with zenoh-c as dynamic library
shell: bash
run: |
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ unwrap-infallible = "0.1.5"
const_format = "0.2.32"
zenoh = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "dev/1.0.0", default-features = false, features = ["internal"] }
zenoh-ext = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "dev/1.0.0" , optional = true }
zenoh-runtime = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "dev/1.0.0" }
zenoh-util = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "dev/1.0.0" }
flume = "*"

Expand Down
1 change: 1 addition & 0 deletions Cargo.toml.in
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ unwrap-infallible = "0.1.5"
const_format = "0.2.32"
zenoh = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "dev/1.0.0", default-features = false, features = ["internal"] }
zenoh-ext = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "dev/1.0.0" , optional = true }
zenoh-runtime = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "dev/1.0.0" }
zenoh-util = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "dev/1.0.0" }
flume = "*"

Expand Down
8 changes: 8 additions & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -893,3 +893,11 @@ Functions

.. doxygenfunction:: zc_init_logging
.. doxygenfunction:: zc_init_logging_with_callback


Other
=====

Functions
---------
.. doxygenfunction:: zc_stop_z_runtime
8 changes: 8 additions & 0 deletions include/zenoh_commons.h
Original file line number Diff line number Diff line change
Expand Up @@ -4599,6 +4599,14 @@ ZENOHC_API void zc_shm_client_list_new(zc_owned_shm_client_list_t *this_);
#if (defined(SHARED_MEMORY) && defined(UNSTABLE))
ZENOHC_API void zc_shm_client_list_null(zc_owned_shm_client_list_t *this_);
#endif
/**
* Stops all Zenoh tasks and drops all related static variables.
* All Zenoh-related structures should be properly dropped/undeclared PRIOR to this call.
* None of Zenoh functionality can be used after this call.
* Useful to suppress memory leaks messages due to Zenoh static variables (since they are never destroyed due to Rust language design).
*/
ZENOHC_API
void zc_stop_z_runtime(void);
/**
* Constructs and declares a publication cache.
*
Expand Down
2 changes: 1 addition & 1 deletion src/closures/query_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ extern "C" fn __z_handler_query_send(query: &z_loaned_query_t, context: *mut c_v

extern "C" fn __z_handler_query_drop(context: *mut c_void) {
unsafe {
let f = (context as *mut Arc<dyn Fn(Query) + Send + Sync>).read();
let f = Box::from_raw(context as *mut Arc<dyn Fn(Query) + Send + Sync>);
std::mem::drop(f);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/closures/response_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ extern "C" fn __z_handler_reply_send(reply: &z_loaned_reply_t, context: *mut c_v

extern "C" fn __z_handler_reply_drop(context: *mut c_void) {
unsafe {
let f = (context as *mut Arc<dyn Fn(Reply) + Send + Sync>).read();
let f = Box::from_raw(context as *mut Arc<dyn Fn(Reply) + Send + Sync>);
std::mem::drop(f);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/closures/sample_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ extern "C" fn __z_handler_sample_send(sample: &z_loaned_sample_t, context: *mut

extern "C" fn __z_handler_sample_drop(context: *mut c_void) {
unsafe {
let f = (context as *mut Arc<dyn Fn(Sample) + Send + Sync>).read();
let f = Box::from_raw(context as *mut Arc<dyn Fn(Sample) + Send + Sync>);
std::mem::drop(f);
}
}
Expand Down
9 changes: 9 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,3 +162,12 @@ impl CopyableToCArray for &str {
self.as_bytes().copy_to_c_array(buf, len)
}
}

/// Stops all Zenoh tasks and drops all related static variables.
/// All Zenoh-related structures should be properly dropped/undeclared PRIOR to this call.
/// None of Zenoh functionality can be used after this call.
/// Useful to suppress memory leaks messages due to Zenoh static variables (since they are never destroyed due to Rust language design).
#[no_mangle]
pub extern "C" fn zc_stop_z_runtime() {
let _z = zenoh_runtime::ZRuntimePoolGuard;
}
9 changes: 8 additions & 1 deletion tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ foreach(file ${files})
set(test_type "integration")
elseif (${file} MATCHES "^.*z_build_.*$")
set(test_type "build")
elseif (${file} MATCHES "^.*z_leak_.*$")
set(test_type "leak")
else()
message(FATAL_ERROR "Test file ${file} does not match any known type (z_api_ or z_int_ or z_build)")
endif()
Expand All @@ -39,6 +41,11 @@ foreach(file ${files})
target_link_libraries(${target} PRIVATE zenohc::lib Threads::Threads)
copy_dlls(${target})
set_property(TARGET ${target} PROPERTY C_STANDARD 11)
add_test(NAME "${test_type}_${target}" COMMAND ${target})
find_program(VALGRIND valgrind)
if (NOT(test_type STREQUAL leak))
add_test(NAME "${test_type}_${target}" COMMAND ${target})
elseif(VALGRIND)
add_test(NAME "${test_type}_${target}" COMMAND bash ${PROJECT_SOURCE_DIR}/tests/run_leak_check.sh ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/${target})
endif()
endforeach()

19 changes: 19 additions & 0 deletions tests/run_leak_check.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/usr/bin/env bash
set -e
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )

function check_leaks {
echo "Checking $1 for memory leaks"
valgrind --leak-check=full --num-callers=50 --log-file="$1.leaks.log" $1
num_leaks=$(grep 'ERROR SUMMARY: [0-9]+' -Eo "$1.leaks.log" | grep '[0-9]+' -Eo)
echo "Detected $num_leaks memory leaks"
if (( num_leaks == 0 ))
then
return 0
else
cat $1.leaks.log
return -1
fi
}

check_leaks $1
98 changes: 98 additions & 0 deletions tests/z_leak_pub_sub_test.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>

#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

#include "zenoh.h"

#undef NDEBUG
#include <assert.h>

const char *PUB_KEY_EXPR = "test/valgrind/data";
const char *SUB_KEY_EXPR = "test/valgrind/**";

void data_handler(const z_loaned_sample_t *sample, void *context) {
(void)context;
z_view_string_t key_string;
z_keyexpr_as_view_string(z_sample_keyexpr(sample), &key_string);

z_owned_string_t payload_string;
z_bytes_deserialize_into_string(z_sample_payload(sample), &payload_string);

printf(">> [Subscriber] Received ('%.*s': '%.*s')\n", (int)z_string_len(z_loan(key_string)),
z_string_data(z_loan(key_string)), (int)z_string_len(z_loan(payload_string)),
z_string_data(z_loan(payload_string)));
z_drop(z_move(payload_string));
}

int main(int argc, char **argv) {
printf("Declaring Publisher on %s\n", PUB_KEY_EXPR);

z_owned_keyexpr_t pub_keyexpr;
z_keyexpr_from_str(&pub_keyexpr, PUB_KEY_EXPR);

z_owned_config_t pub_config;
z_config_default(&pub_config);

z_owned_session_t pub_session;
z_open(&pub_session, z_move(pub_config));

z_owned_publisher_t publisher;
z_declare_publisher(&publisher, z_loan(pub_session), z_loan(pub_keyexpr), NULL);

printf("Declaring Subscriber on %s\n", SUB_KEY_EXPR);

z_view_keyexpr_t sub_keyexpr;
z_view_keyexpr_from_str(&sub_keyexpr, SUB_KEY_EXPR);

z_owned_config_t sub_config;
z_config_default(&sub_config);

z_owned_session_t sub_session;
z_open(&sub_session, z_move(sub_config));

z_owned_closure_sample_t callback;
z_closure(&callback, data_handler, NULL, NULL);

z_owned_subscriber_t subscriber;
z_declare_subscriber(&subscriber, z_loan(sub_session), z_loan(sub_keyexpr), z_move(callback), NULL);

z_sleep_s(1);

char buf[32] = {0};
for (int i = 0; i < 5; ++i) {
sprintf(buf, "data [%4d]", i);
printf("Putting Data ('%s': '%s')...\n", PUB_KEY_EXPR, buf);
z_publisher_put_options_t options;
z_publisher_put_options_default(&options);

z_owned_bytes_t payload;
z_bytes_serialize_from_str(&payload, buf);

z_publisher_put(z_loan(publisher), z_move(payload), &options);
z_sleep_s(1);
}

z_undeclare_publisher(z_move(publisher));
z_undeclare_subscriber(z_move(subscriber));
z_close(z_move(pub_session));
z_close(z_move(sub_session));
z_drop(z_move(pub_keyexpr));

zc_stop_z_runtime();

return 0;
}
131 changes: 131 additions & 0 deletions tests/z_leak_queryable_get_test.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>

#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

#include "zenoh.h"

#undef NDEBUG
#include <assert.h>

const char *GET_KEY_EXPR = "test/valgrind/data";
const char *QUERYABLE_KEY_EXPR = "test/valgrind/**";

void query_handler(const z_loaned_query_t *query, void *context) {
(void)context;
z_view_string_t key_string;
z_keyexpr_as_view_string(z_query_keyexpr(query), &key_string);

z_view_string_t params;
z_query_parameters(query, &params);

const z_loaned_bytes_t *payload = z_query_payload(query);

z_owned_string_t payload_string;
z_bytes_deserialize_into_string(payload, &payload_string);

printf(">> [Queryable ] Received Query '%.*s' with value '%.*s'\n", (int)z_string_len(z_loan(key_string)),
z_string_data(z_loan(key_string)), (int)z_string_len(z_loan(payload_string)),
z_string_data(z_loan(payload_string)));
z_drop(z_move(payload_string));
z_query_reply_options_t options;
z_query_reply_options_default(&options);

z_owned_bytes_t reply_payload;
z_bytes_clone(&reply_payload, payload);

z_query_reply(query, z_query_keyexpr(query), z_move(reply_payload), &options);
}

int main(int argc, char **argv) {
printf("Declaring Queryable on %s\n", QUERYABLE_KEY_EXPR);

z_owned_keyexpr_t queryable_keyexpr;
z_keyexpr_from_str(&queryable_keyexpr, QUERYABLE_KEY_EXPR);

z_owned_config_t queryable_config;
z_config_default(&queryable_config);

z_owned_session_t queryable_session;
z_open(&queryable_session, z_move(queryable_config));

z_owned_closure_query_t callback;
z_closure(&callback, query_handler, NULL, NULL);
z_owned_queryable_t queryable;
z_declare_queryable(&queryable, z_loan(queryable_session), z_loan(queryable_keyexpr), z_move(callback), NULL);

z_view_keyexpr_t get_keyexpr;
z_view_keyexpr_from_str(&get_keyexpr, GET_KEY_EXPR);

z_owned_config_t get_config;
z_config_default(&get_config);

z_owned_session_t get_session;
z_open(&get_session, z_move(get_config));

z_sleep_s(1);

size_t received_replies = 0;
char buf[32] = {0};
for (int i = 0; i < 5; ++i) {
sprintf(buf, "data [%4d]", i);
printf("Get with Data ('%s': '%s')...\n", GET_KEY_EXPR, buf);
z_get_options_t options;
z_get_options_default(&options);

z_owned_bytes_t payload;
z_bytes_serialize_from_str(&payload, buf);

options.payload = z_move(payload);

z_owned_fifo_handler_reply_t handler;
z_owned_closure_reply_t closure;
z_fifo_channel_reply_new(&closure, &handler, 16);

z_get(z_loan(get_session), z_loan(get_keyexpr), "", z_move(closure), &options);

z_owned_reply_t reply;
while (z_recv(z_loan(handler), &reply) == Z_OK) {
received_replies++;
const z_loaned_sample_t *sample = z_reply_ok(z_loan(reply));
assert(sample != NULL);

z_view_string_t key_str;
z_keyexpr_as_view_string(z_sample_keyexpr(sample), &key_str);

z_owned_string_t reply_str;
z_bytes_deserialize_into_string(z_sample_payload(sample), &reply_str);

printf(">> Received ('%.*s': '%.*s')\n", (int)z_string_len(z_loan(key_str)), z_string_data(z_loan(key_str)),
(int)z_string_len(z_loan(reply_str)), z_string_data(z_loan(reply_str)));
z_drop(z_move(reply_str));
z_drop(z_move(reply));
}

z_drop(z_move(handler));
z_sleep_s(1);
}
assert(received_replies == 5);

z_undeclare_queryable(z_move(queryable));
z_close(z_move(get_session));
z_close(z_move(queryable_session));
z_drop(z_move(queryable_keyexpr));

zc_stop_z_runtime();

return 0;
}

0 comments on commit 5a82ecf

Please sign in to comment.