From 6697d0ddaeb5f2ef0e898b7e7e22e8efdbc8f852 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Wed, 23 Oct 2024 15:34:51 +0200 Subject: [PATCH] Warn if there are unstopped threads (#304) One of the goals with UCXX is to, as much as possible, prevent the user from having to manually cleanup resources, which allows for a simpler programming model and prevents memory leaks. However, there are some resources that are notably difficult to ensure they _will_ be cleaned up appropriately, threads for example. With this PR the user will now be warned about running threads when a worker is being destroyed, but will nevertheless attempt to stop them. The expectation is that with this change the user can be told how to resolve such problems so that it can be done manually to guarantee there's no leakage of resources in the running thread. What currently happens is, when a certain codeblock causes a `ucxx::Worker` to go out-of-scope, if care is not taken to ensure, for example, the progress thread has already completed processing all pending tasks (e.g., `ucxx::Request`s), the surviving thread will end up destroying the `ucxx::Worker`, and subsequently itself, from the running progress thread which is an invalid pattern and will cause deadlocks. This is currently observed intermittently in some C++ tests, where `std::system_error` may be raised: ``` terminate called after throwing an instance of 'std::system_error' what(): Resource deadlock avoided timeout: the monitored command dumped core ``` The tests are also modified to apply this pattern of calling `ucxx::Worker::stopProgressThread()` at teardown, and thus prevent errors like above from occurring. It may still be possible to handle this issue more gracefully, but for the moment it's best to ensure the user takes care of it while a more resilient solution can be worked on. Authors: - Peter Andreas Entschev (https://github.com/pentschev) Approvers: - Lawrence Mitchell (https://github.com/wence-) URL: https://github.com/rapidsai/ucxx/pull/304 --- cpp/include/ucxx/notifier.h | 7 +++++++ cpp/include/ucxx/worker_progress_thread.h | 5 +++++ cpp/python/include/ucxx/python/notifier.h | 7 +++++++ cpp/python/src/notifier.cpp | 6 ++++++ cpp/src/worker.cpp | 14 ++++++++++++-- cpp/tests/request.cpp | 7 +++++++ cpp/tests/worker.cpp | 7 +++++++ 7 files changed, 51 insertions(+), 2 deletions(-) diff --git a/cpp/include/ucxx/notifier.h b/cpp/include/ucxx/notifier.h index b1a29a25..554271be 100644 --- a/cpp/include/ucxx/notifier.h +++ b/cpp/include/ucxx/notifier.h @@ -97,6 +97,13 @@ class Notifier { * it should stop and exit. */ virtual void stopRequestNotifierThread() = 0; + + /** + * @brief Returns whether the thread is running. + * + * @returns Whether the thread is running. + */ + [[nodiscard]] virtual bool isRunning() const = 0; }; } // namespace ucxx diff --git a/cpp/include/ucxx/worker_progress_thread.h b/cpp/include/ucxx/worker_progress_thread.h index 70700cb0..19ce1012 100644 --- a/cpp/include/ucxx/worker_progress_thread.h +++ b/cpp/include/ucxx/worker_progress_thread.h @@ -162,6 +162,11 @@ class WorkerProgressThread { */ [[nodiscard]] std::thread::id getId() const; + /** + * @brief Returns whether the thread is running. + * + * @returns Whether the thread is running. + */ [[nodiscard]] bool isRunning() const; void stop(); diff --git a/cpp/python/include/ucxx/python/notifier.h b/cpp/python/include/ucxx/python/notifier.h index 4f2d577c..2e72ebc8 100644 --- a/cpp/python/include/ucxx/python/notifier.h +++ b/cpp/python/include/ucxx/python/notifier.h @@ -139,6 +139,13 @@ class Notifier : public ::ucxx::Notifier { * that it should stop and exit. */ void stopRequestNotifierThread() override; + + /** + * @brief Returns whether the thread is running. + * + * @returns Whether the thread is running. + */ + [[nodiscard]] bool isRunning() const override; }; } // namespace python diff --git a/cpp/python/src/notifier.cpp b/cpp/python/src/notifier.cpp index 0fd2d200..16a97b8c 100644 --- a/cpp/python/src/notifier.cpp +++ b/cpp/python/src/notifier.cpp @@ -122,6 +122,12 @@ void Notifier::stopRequestNotifierThread() _notifierThreadConditionVariable.notify_all(); } +bool Notifier::isRunning() const +{ + return _notifierThreadFutureStatusReady || + _notifierThreadFutureStatusFinished == RequestNotifierThreadState::Running; +} + } // namespace python } // namespace ucxx diff --git a/cpp/src/worker.cpp b/cpp/src/worker.cpp index 614f5521..bc80d609 100644 --- a/cpp/src/worker.cpp +++ b/cpp/src/worker.cpp @@ -164,8 +164,18 @@ Worker::~Worker() _handle, canceled); - stopProgressThreadNoWarn(); - if (_notifier) _notifier->stopRequestNotifierThread(); + if (_progressThread.isRunning()) { + ucxx_warn( + "The progress thread should be explicitly stopped with `stopProgressThread()` to prevent " + "unintended effects, such as destructors being called from that thread."); + stopProgressThreadNoWarn(); + } + if (_notifier && _notifier->isRunning()) { + ucxx_warn( + "The notifier thread should be explicitly stopped with `stopNotifierThread()` to prevent " + "unintended effects, such as destructors being called from that thread."); + _notifier->stopRequestNotifierThread(); + } drainWorkerTagRecv(); diff --git a/cpp/tests/request.cpp b/cpp/tests/request.cpp index 75ca48ec..c5dbd85c 100644 --- a/cpp/tests/request.cpp +++ b/cpp/tests/request.cpp @@ -89,6 +89,13 @@ class RequestTest : public ::testing::TestWithParam< _ep = _worker->createEndpointFromWorkerAddress(_worker->getAddress()); } + void TearDown() + { + if (_progressMode == ProgressMode::ThreadPolling || + _progressMode == ProgressMode::ThreadBlocking) + _worker->stopProgressThread(); + } + void allocate(const size_t numBuffers = 1, const bool allocateRecvBuffer = true) { _numBuffers = numBuffers; diff --git a/cpp/tests/worker.cpp b/cpp/tests/worker.cpp index 96d662b8..77667045 100644 --- a/cpp/tests/worker.cpp +++ b/cpp/tests/worker.cpp @@ -65,6 +65,13 @@ class WorkerProgressTest : public WorkerTest, _progressWorker = getProgressFunction(_worker, _progressMode); } + + void TearDown() + { + if (_progressMode == ProgressMode::ThreadPolling || + _progressMode == ProgressMode::ThreadBlocking) + _worker->stopProgressThread(); + } }; TEST_F(WorkerTest, HandleIsValid) { ASSERT_TRUE(_worker->getHandle() != nullptr); }