diff --git a/cpp/include/ucxx/notifier.h b/cpp/include/ucxx/notifier.h index b1a29a25..554271be 100644 --- a/cpp/include/ucxx/notifier.h +++ b/cpp/include/ucxx/notifier.h @@ -97,6 +97,13 @@ class Notifier { * it should stop and exit. */ virtual void stopRequestNotifierThread() = 0; + + /** + * @brief Returns whether the thread is running. + * + * @returns Whether the thread is running. + */ + [[nodiscard]] virtual bool isRunning() const = 0; }; } // namespace ucxx diff --git a/cpp/include/ucxx/worker_progress_thread.h b/cpp/include/ucxx/worker_progress_thread.h index 70700cb0..19ce1012 100644 --- a/cpp/include/ucxx/worker_progress_thread.h +++ b/cpp/include/ucxx/worker_progress_thread.h @@ -162,6 +162,11 @@ class WorkerProgressThread { */ [[nodiscard]] std::thread::id getId() const; + /** + * @brief Returns whether the thread is running. + * + * @returns Whether the thread is running. + */ [[nodiscard]] bool isRunning() const; void stop(); diff --git a/cpp/python/include/ucxx/python/notifier.h b/cpp/python/include/ucxx/python/notifier.h index 4f2d577c..2e72ebc8 100644 --- a/cpp/python/include/ucxx/python/notifier.h +++ b/cpp/python/include/ucxx/python/notifier.h @@ -139,6 +139,13 @@ class Notifier : public ::ucxx::Notifier { * that it should stop and exit. */ void stopRequestNotifierThread() override; + + /** + * @brief Returns whether the thread is running. + * + * @returns Whether the thread is running. + */ + [[nodiscard]] bool isRunning() const override; }; } // namespace python diff --git a/cpp/python/src/notifier.cpp b/cpp/python/src/notifier.cpp index 0fd2d200..16a97b8c 100644 --- a/cpp/python/src/notifier.cpp +++ b/cpp/python/src/notifier.cpp @@ -122,6 +122,12 @@ void Notifier::stopRequestNotifierThread() _notifierThreadConditionVariable.notify_all(); } +bool Notifier::isRunning() const +{ + return _notifierThreadFutureStatusReady || + _notifierThreadFutureStatusFinished == RequestNotifierThreadState::Running; +} + } // namespace python } // namespace ucxx diff --git a/cpp/src/worker.cpp b/cpp/src/worker.cpp index 614f5521..bc80d609 100644 --- a/cpp/src/worker.cpp +++ b/cpp/src/worker.cpp @@ -164,8 +164,18 @@ Worker::~Worker() _handle, canceled); - stopProgressThreadNoWarn(); - if (_notifier) _notifier->stopRequestNotifierThread(); + if (_progressThread.isRunning()) { + ucxx_warn( + "The progress thread should be explicitly stopped with `stopProgressThread()` to prevent " + "unintended effects, such as destructors being called from that thread."); + stopProgressThreadNoWarn(); + } + if (_notifier && _notifier->isRunning()) { + ucxx_warn( + "The notifier thread should be explicitly stopped with `stopNotifierThread()` to prevent " + "unintended effects, such as destructors being called from that thread."); + _notifier->stopRequestNotifierThread(); + } drainWorkerTagRecv(); diff --git a/cpp/tests/request.cpp b/cpp/tests/request.cpp index 75ca48ec..c5dbd85c 100644 --- a/cpp/tests/request.cpp +++ b/cpp/tests/request.cpp @@ -89,6 +89,13 @@ class RequestTest : public ::testing::TestWithParam< _ep = _worker->createEndpointFromWorkerAddress(_worker->getAddress()); } + void TearDown() + { + if (_progressMode == ProgressMode::ThreadPolling || + _progressMode == ProgressMode::ThreadBlocking) + _worker->stopProgressThread(); + } + void allocate(const size_t numBuffers = 1, const bool allocateRecvBuffer = true) { _numBuffers = numBuffers; diff --git a/cpp/tests/worker.cpp b/cpp/tests/worker.cpp index 96d662b8..77667045 100644 --- a/cpp/tests/worker.cpp +++ b/cpp/tests/worker.cpp @@ -65,6 +65,13 @@ class WorkerProgressTest : public WorkerTest, _progressWorker = getProgressFunction(_worker, _progressMode); } + + void TearDown() + { + if (_progressMode == ProgressMode::ThreadPolling || + _progressMode == ProgressMode::ThreadBlocking) + _worker->stopProgressThread(); + } }; TEST_F(WorkerTest, HandleIsValid) { ASSERT_TRUE(_worker->getHandle() != nullptr); }