diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d3566327b..c7032a88a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -49,6 +49,10 @@ jobs: cmake .. -DCMAKE_BUILD_TYPE=Release -DCMAKE_INSTALL_PREFIX=~/local cmake --build . --target install --config Release + - name: Install valgrind + uses: taiki-e/install-action@valgrind + if: matrix.os == 'ubuntu-latest' + - name: Run cmake tests with zenoh-c as dynamic library shell: bash run: | diff --git a/Cargo.lock b/Cargo.lock index 5b820a761..bdd61aca9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3611,6 +3611,7 @@ dependencies = [ "unwrap-infallible", "zenoh", "zenoh-ext", + "zenoh-runtime", "zenoh-util", ] diff --git a/Cargo.toml b/Cargo.toml index e794d8262..367ff9ed2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,6 +54,7 @@ unwrap-infallible = "0.1.5" const_format = "0.2.32" zenoh = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "dev/1.0.0", default-features = false, features = ["internal"] } zenoh-ext = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "dev/1.0.0" , optional = true } +zenoh-runtime = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "dev/1.0.0" } zenoh-util = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "dev/1.0.0" } flume = "*" diff --git a/Cargo.toml.in b/Cargo.toml.in index 42a2b0eb3..7f60c486b 100644 --- a/Cargo.toml.in +++ b/Cargo.toml.in @@ -54,6 +54,7 @@ unwrap-infallible = "0.1.5" const_format = "0.2.32" zenoh = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "dev/1.0.0", default-features = false, features = ["internal"] } zenoh-ext = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "dev/1.0.0" , optional = true } +zenoh-runtime = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "dev/1.0.0" } zenoh-util = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "dev/1.0.0" } flume = "*" diff --git a/docs/api.rst b/docs/api.rst index 9364c81a6..1f6a6a0b8 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -896,3 +896,11 @@ Functions .. doxygenfunction:: zc_init_logging .. doxygenfunction:: zc_init_logging_with_callback + + +Other +===== + +Functions +--------- +.. doxygenfunction:: zc_stop_z_runtime \ No newline at end of file diff --git a/include/zenoh_commons.h b/include/zenoh_commons.h index 8f0ce542f..24bfc5817 100644 --- a/include/zenoh_commons.h +++ b/include/zenoh_commons.h @@ -4599,6 +4599,14 @@ ZENOHC_API void zc_shm_client_list_new(zc_owned_shm_client_list_t *this_); #if (defined(SHARED_MEMORY) && defined(UNSTABLE)) ZENOHC_API void zc_shm_client_list_null(zc_owned_shm_client_list_t *this_); #endif +/** + * Stops all Zenoh tasks and drops all related static variables. + * All Zenoh-related structures should be properly dropped/undeclared PRIOR to this call. + * None of Zenoh functionality can be used after this call. + * Useful to suppress memory leaks messages due to Zenoh static variables (since they are never destroyed due to Rust language design). + */ +ZENOHC_API +void zc_stop_z_runtime(void); /** * Constructs and declares a publication cache. * diff --git a/src/closures/query_channel.rs b/src/closures/query_channel.rs index f24f7033b..1b3c9cccb 100644 --- a/src/closures/query_channel.rs +++ b/src/closures/query_channel.rs @@ -61,7 +61,7 @@ extern "C" fn __z_handler_query_send(query: &z_loaned_query_t, context: *mut c_v extern "C" fn __z_handler_query_drop(context: *mut c_void) { unsafe { - let f = (context as *mut Arc).read(); + let f = Box::from_raw(context as *mut Arc); std::mem::drop(f); } } diff --git a/src/closures/response_channel.rs b/src/closures/response_channel.rs index f106a56a2..518d3eb7b 100644 --- a/src/closures/response_channel.rs +++ b/src/closures/response_channel.rs @@ -61,7 +61,7 @@ extern "C" fn __z_handler_reply_send(reply: &z_loaned_reply_t, context: *mut c_v extern "C" fn __z_handler_reply_drop(context: *mut c_void) { unsafe { - let f = (context as *mut Arc).read(); + let f = Box::from_raw(context as *mut Arc); std::mem::drop(f); } } diff --git a/src/closures/sample_channel.rs b/src/closures/sample_channel.rs index b74fc9b28..6f84f3b3d 100644 --- a/src/closures/sample_channel.rs +++ b/src/closures/sample_channel.rs @@ -63,7 +63,7 @@ extern "C" fn __z_handler_sample_send(sample: &z_loaned_sample_t, context: *mut extern "C" fn __z_handler_sample_drop(context: *mut c_void) { unsafe { - let f = (context as *mut Arc).read(); + let f = Box::from_raw(context as *mut Arc); std::mem::drop(f); } } diff --git a/src/lib.rs b/src/lib.rs index 2a7633e19..3ff20ae0f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -162,3 +162,12 @@ impl CopyableToCArray for &str { self.as_bytes().copy_to_c_array(buf, len) } } + +/// Stops all Zenoh tasks and drops all related static variables. +/// All Zenoh-related structures should be properly dropped/undeclared PRIOR to this call. +/// None of Zenoh functionality can be used after this call. +/// Useful to suppress memory leaks messages due to Zenoh static variables (since they are never destroyed due to Rust language design). +#[no_mangle] +pub extern "C" fn zc_stop_z_runtime() { + let _z = zenoh_runtime::ZRuntimePoolGuard; +} diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 69e7e1530..64f2bbc5c 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -29,6 +29,8 @@ foreach(file ${files}) set(test_type "integration") elseif (${file} MATCHES "^.*z_build_.*$") set(test_type "build") + elseif (${file} MATCHES "^.*z_leak_.*$") + set(test_type "leak") else() message(FATAL_ERROR "Test file ${file} does not match any known type (z_api_ or z_int_ or z_build)") endif() @@ -39,6 +41,11 @@ foreach(file ${files}) target_link_libraries(${target} PRIVATE zenohc::lib Threads::Threads) copy_dlls(${target}) set_property(TARGET ${target} PROPERTY C_STANDARD 11) - add_test(NAME "${test_type}_${target}" COMMAND ${target}) + find_program(VALGRIND valgrind) + if (NOT(test_type STREQUAL leak)) + add_test(NAME "${test_type}_${target}" COMMAND ${target}) + elseif(VALGRIND) + add_test(NAME "${test_type}_${target}" COMMAND bash ${PROJECT_SOURCE_DIR}/tests/run_leak_check.sh ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/${target}) + endif() endforeach() diff --git a/tests/run_leak_check.sh b/tests/run_leak_check.sh new file mode 100755 index 000000000..7ce533bd8 --- /dev/null +++ b/tests/run_leak_check.sh @@ -0,0 +1,19 @@ +#!/usr/bin/env bash +set -e +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) + +function check_leaks { + echo "Checking $1 for memory leaks" + valgrind --leak-check=full --num-callers=50 --log-file="$1.leaks.log" $1 + num_leaks=$(grep 'ERROR SUMMARY: [0-9]+' -Eo "$1.leaks.log" | grep '[0-9]+' -Eo) + echo "Detected $num_leaks memory leaks" + if (( num_leaks == 0 )) + then + return 0 + else + cat $1.leaks.log + return -1 + fi +} + +check_leaks $1 diff --git a/tests/z_leak_pub_sub_test.c b/tests/z_leak_pub_sub_test.c new file mode 100644 index 000000000..231d8b39e --- /dev/null +++ b/tests/z_leak_pub_sub_test.c @@ -0,0 +1,98 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#include +#include +#include +#include + +#include "zenoh.h" + +#undef NDEBUG +#include + +const char *PUB_KEY_EXPR = "test/valgrind/data"; +const char *SUB_KEY_EXPR = "test/valgrind/**"; + +void data_handler(const z_loaned_sample_t *sample, void *context) { + (void)context; + z_view_string_t key_string; + z_keyexpr_as_view_string(z_sample_keyexpr(sample), &key_string); + + z_owned_string_t payload_string; + z_bytes_deserialize_into_string(z_sample_payload(sample), &payload_string); + + printf(">> [Subscriber] Received ('%.*s': '%.*s')\n", (int)z_string_len(z_loan(key_string)), + z_string_data(z_loan(key_string)), (int)z_string_len(z_loan(payload_string)), + z_string_data(z_loan(payload_string))); + z_drop(z_move(payload_string)); +} + +int main(int argc, char **argv) { + printf("Declaring Publisher on %s\n", PUB_KEY_EXPR); + + z_owned_keyexpr_t pub_keyexpr; + z_keyexpr_from_str(&pub_keyexpr, PUB_KEY_EXPR); + + z_owned_config_t pub_config; + z_config_default(&pub_config); + + z_owned_session_t pub_session; + z_open(&pub_session, z_move(pub_config)); + + z_owned_publisher_t publisher; + z_declare_publisher(&publisher, z_loan(pub_session), z_loan(pub_keyexpr), NULL); + + printf("Declaring Subscriber on %s\n", SUB_KEY_EXPR); + + z_view_keyexpr_t sub_keyexpr; + z_view_keyexpr_from_str(&sub_keyexpr, SUB_KEY_EXPR); + + z_owned_config_t sub_config; + z_config_default(&sub_config); + + z_owned_session_t sub_session; + z_open(&sub_session, z_move(sub_config)); + + z_owned_closure_sample_t callback; + z_closure(&callback, data_handler, NULL, NULL); + + z_owned_subscriber_t subscriber; + z_declare_subscriber(&subscriber, z_loan(sub_session), z_loan(sub_keyexpr), z_move(callback), NULL); + + z_sleep_s(1); + + char buf[32] = {0}; + for (int i = 0; i < 5; ++i) { + sprintf(buf, "data [%4d]", i); + printf("Putting Data ('%s': '%s')...\n", PUB_KEY_EXPR, buf); + z_publisher_put_options_t options; + z_publisher_put_options_default(&options); + + z_owned_bytes_t payload; + z_bytes_serialize_from_str(&payload, buf); + + z_publisher_put(z_loan(publisher), z_move(payload), &options); + z_sleep_s(1); + } + + z_undeclare_publisher(z_move(publisher)); + z_undeclare_subscriber(z_move(subscriber)); + z_close(z_move(pub_session)); + z_close(z_move(sub_session)); + z_drop(z_move(pub_keyexpr)); + + zc_stop_z_runtime(); + + return 0; +} diff --git a/tests/z_leak_queryable_get_test.c b/tests/z_leak_queryable_get_test.c new file mode 100644 index 000000000..58f755950 --- /dev/null +++ b/tests/z_leak_queryable_get_test.c @@ -0,0 +1,131 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#include +#include +#include +#include + +#include "zenoh.h" + +#undef NDEBUG +#include + +const char *GET_KEY_EXPR = "test/valgrind/data"; +const char *QUERYABLE_KEY_EXPR = "test/valgrind/**"; + +void query_handler(const z_loaned_query_t *query, void *context) { + (void)context; + z_view_string_t key_string; + z_keyexpr_as_view_string(z_query_keyexpr(query), &key_string); + + z_view_string_t params; + z_query_parameters(query, ¶ms); + + const z_loaned_bytes_t *payload = z_query_payload(query); + + z_owned_string_t payload_string; + z_bytes_deserialize_into_string(payload, &payload_string); + + printf(">> [Queryable ] Received Query '%.*s' with value '%.*s'\n", (int)z_string_len(z_loan(key_string)), + z_string_data(z_loan(key_string)), (int)z_string_len(z_loan(payload_string)), + z_string_data(z_loan(payload_string))); + z_drop(z_move(payload_string)); + z_query_reply_options_t options; + z_query_reply_options_default(&options); + + z_owned_bytes_t reply_payload; + z_bytes_clone(&reply_payload, payload); + + z_query_reply(query, z_query_keyexpr(query), z_move(reply_payload), &options); +} + +int main(int argc, char **argv) { + printf("Declaring Queryable on %s\n", QUERYABLE_KEY_EXPR); + + z_owned_keyexpr_t queryable_keyexpr; + z_keyexpr_from_str(&queryable_keyexpr, QUERYABLE_KEY_EXPR); + + z_owned_config_t queryable_config; + z_config_default(&queryable_config); + + z_owned_session_t queryable_session; + z_open(&queryable_session, z_move(queryable_config)); + + z_owned_closure_query_t callback; + z_closure(&callback, query_handler, NULL, NULL); + z_owned_queryable_t queryable; + z_declare_queryable(&queryable, z_loan(queryable_session), z_loan(queryable_keyexpr), z_move(callback), NULL); + + z_view_keyexpr_t get_keyexpr; + z_view_keyexpr_from_str(&get_keyexpr, GET_KEY_EXPR); + + z_owned_config_t get_config; + z_config_default(&get_config); + + z_owned_session_t get_session; + z_open(&get_session, z_move(get_config)); + + z_sleep_s(1); + + size_t received_replies = 0; + char buf[32] = {0}; + for (int i = 0; i < 5; ++i) { + sprintf(buf, "data [%4d]", i); + printf("Get with Data ('%s': '%s')...\n", GET_KEY_EXPR, buf); + z_get_options_t options; + z_get_options_default(&options); + + z_owned_bytes_t payload; + z_bytes_serialize_from_str(&payload, buf); + + options.payload = z_move(payload); + + z_owned_fifo_handler_reply_t handler; + z_owned_closure_reply_t closure; + z_fifo_channel_reply_new(&closure, &handler, 16); + + z_get(z_loan(get_session), z_loan(get_keyexpr), "", z_move(closure), &options); + + z_owned_reply_t reply; + while (z_recv(z_loan(handler), &reply) == Z_OK) { + received_replies++; + const z_loaned_sample_t *sample = z_reply_ok(z_loan(reply)); + assert(sample != NULL); + + z_view_string_t key_str; + z_keyexpr_as_view_string(z_sample_keyexpr(sample), &key_str); + + z_owned_string_t reply_str; + z_bytes_deserialize_into_string(z_sample_payload(sample), &reply_str); + + printf(">> Received ('%.*s': '%.*s')\n", (int)z_string_len(z_loan(key_str)), z_string_data(z_loan(key_str)), + (int)z_string_len(z_loan(reply_str)), z_string_data(z_loan(reply_str))); + z_drop(z_move(reply_str)); + z_drop(z_move(reply)); + } + + z_drop(z_move(handler)); + z_sleep_s(1); + } + assert(received_replies == 5); + + z_undeclare_queryable(z_move(queryable)); + z_close(z_move(get_session)); + z_close(z_move(queryable_session)); + z_drop(z_move(queryable_keyexpr)); + + zc_stop_z_runtime(); + + return 0; +}