Warn if there are unstopped threads (#304)
One of the goals of UCXX is to spare the user, as much as possible, from having to manually clean up resources, which allows for a simpler programming model and prevents memory leaks. However, for some resources it is notably difficult to guarantee that they _will_ be cleaned up appropriately, threads for example.

With this PR the user is now warned about running threads when a worker is being destroyed, and the destructor nevertheless still attempts to stop them. The expectation is that the warning tells the user how to resolve such problems manually, which guarantees that no resources leak in the running thread.

What currently happens is this: when a code block causes a `ucxx::Worker` to go out of scope and care has not been taken to ensure that, for example, the progress thread has already completed processing all pending tasks (e.g., `ucxx::Request`s), the surviving progress thread ends up destroying the `ucxx::Worker`, and subsequently itself, from within that same thread. This is an invalid pattern and causes deadlocks. It is currently observed intermittently in some C++ tests, where `std::system_error` may be raised:

```
terminate called after throwing an instance of 'std::system_error'
  what():  Resource deadlock avoided
timeout: the monitored command dumped core
```
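The failure above can be reproduced outside UCXX. The following is a minimal sketch, assuming only the C++ standard library; `selfJoinError` is a hypothetical helper introduced here for illustration and is not part of UCXX. It makes a thread attempt to join itself, which is roughly what happens when a destructor that joins the progress thread runs on that very thread, and returns the error code carried by the resulting `std::system_error`.

```cpp
#include <future>
#include <system_error>
#include <thread>

// Hypothetical helper (not part of UCXX): makes a thread try to join itself
// and returns the error code carried by the resulting std::system_error.
std::error_code selfJoinError()
{
  std::error_code observed;
  std::promise<void> handleAssigned;
  std::promise<void> attemptDone;
  std::future<void> assigned  = handleAssigned.get_future();
  std::future<void> attempted = attemptDone.get_future();
  std::thread t;

  t = std::thread([&] {
    assigned.wait();  // wait until `t` refers to this very thread
    try {
      t.join();  // self-join: can never succeed
    } catch (const std::system_error& e) {
      observed = e.code();
    }
    attemptDone.set_value();
  });

  handleAssigned.set_value();
  attempted.wait();  // let the self-join attempt finish first
  t.join();          // joining from a different thread is valid
  return observed;
}
```

The returned code should compare equal to `std::errc::resource_deadlock_would_occur`. The exact `what()` text, such as "Resource deadlock avoided", depends on the platform.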

The tests are also modified to call `ucxx::Worker::stopProgressThread()` at teardown, which prevents errors like the one above from occurring.
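The warn-then-stop behavior and the explicit stop at teardown can be sketched with a toy class. This is a sketch under stated assumptions, not the UCXX implementation: `ToyWorker` and `destructorWarned` are hypothetical names, and the progress loop is a placeholder that only spins until asked to stop.

```cpp
#include <atomic>
#include <iostream>
#include <thread>

// Hypothetical stand-in for an object that owns a progress thread; not the UCXX API.
class ToyWorker {
 public:
  // `warned` is set to true if the destructor had to stop the thread itself.
  explicit ToyWorker(bool& warned) : _warned(warned) {}

  ~ToyWorker()
  {
    if (isRunning()) {
      std::cerr << "warning: progress thread should be stopped explicitly\n";
      _warned = true;
      stopProgressThread();  // still attempt to stop it, as a fallback
    }
  }

  void startProgressThread()
  {
    if (isRunning()) return;
    _stop   = false;
    _thread = std::thread([this] {
      while (!_stop.load()) std::this_thread::yield();  // placeholder progress loop
    });
  }

  void stopProgressThread()
  {
    if (!isRunning()) return;
    _stop = true;
    _thread.join();
  }

  [[nodiscard]] bool isRunning() const { return _thread.joinable(); }

 private:
  bool& _warned;
  std::atomic<bool> _stop{false};
  std::thread _thread;
};

// Returns true if destroying the worker triggered the warning.
bool destructorWarned(bool stopExplicitly)
{
  bool warned = false;
  {
    ToyWorker worker(warned);
    worker.startProgressThread();
    if (stopExplicitly) worker.stopProgressThread();  // what the tests do at teardown
  }
  return warned;
}
```

Calling `destructorWarned(true)` mirrors the updated tests and produces no warning, whereas `destructorWarned(false)` leaves the cleanup to the destructor, which warns before stopping the thread.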

It may still be possible to handle this issue more gracefully, but for the moment it's best to ensure the user takes care of it while a more resilient solution is worked on.

Authors:
  - Peter Andreas Entschev (https://github.com/pentschev)

Approvers:
  - Lawrence Mitchell (https://github.com/wence-)

URL: #304
pentschev authored Oct 23, 2024
1 parent 90c05dc commit 6697d0d
Showing 7 changed files with 51 additions and 2 deletions.
7 changes: 7 additions & 0 deletions cpp/include/ucxx/notifier.h
@@ -97,6 +97,13 @@ class Notifier {
    * it should stop and exit.
    */
   virtual void stopRequestNotifierThread() = 0;
+
+  /**
+   * @brief Returns whether the thread is running.
+   *
+   * @returns Whether the thread is running.
+   */
+  [[nodiscard]] virtual bool isRunning() const = 0;
 };

 }  // namespace ucxx
5 changes: 5 additions & 0 deletions cpp/include/ucxx/worker_progress_thread.h
@@ -162,6 +162,11 @@ class WorkerProgressThread {
    */
   [[nodiscard]] std::thread::id getId() const;

+  /**
+   * @brief Returns whether the thread is running.
+   *
+   * @returns Whether the thread is running.
+   */
   [[nodiscard]] bool isRunning() const;

   void stop();
7 changes: 7 additions & 0 deletions cpp/python/include/ucxx/python/notifier.h
@@ -139,6 +139,13 @@ class Notifier : public ::ucxx::Notifier {
    * that it should stop and exit.
    */
   void stopRequestNotifierThread() override;
+
+  /**
+   * @brief Returns whether the thread is running.
+   *
+   * @returns Whether the thread is running.
+   */
+  [[nodiscard]] bool isRunning() const override;
 };

 }  // namespace python
6 changes: 6 additions & 0 deletions cpp/python/src/notifier.cpp
@@ -122,6 +122,12 @@ void Notifier::stopRequestNotifierThread()
   _notifierThreadConditionVariable.notify_all();
 }

+bool Notifier::isRunning() const
+{
+  return _notifierThreadFutureStatusReady ||
+         _notifierThreadFutureStatusFinished == RequestNotifierThreadState::Running;
+}
+
 }  // namespace python

 }  // namespace ucxx
14 changes: 12 additions & 2 deletions cpp/src/worker.cpp
@@ -164,8 +164,18 @@ Worker::~Worker()
              _handle,
              canceled);

-  stopProgressThreadNoWarn();
-  if (_notifier) _notifier->stopRequestNotifierThread();
+  if (_progressThread.isRunning()) {
+    ucxx_warn(
+      "The progress thread should be explicitly stopped with `stopProgressThread()` to prevent "
+      "unintended effects, such as destructors being called from that thread.");
+    stopProgressThreadNoWarn();
+  }
+  if (_notifier && _notifier->isRunning()) {
+    ucxx_warn(
+      "The notifier thread should be explicitly stopped with `stopNotifierThread()` to prevent "
+      "unintended effects, such as destructors being called from that thread.");
+    _notifier->stopRequestNotifierThread();
+  }

   drainWorkerTagRecv();

7 changes: 7 additions & 0 deletions cpp/tests/request.cpp
@@ -89,6 +89,13 @@ class RequestTest : public ::testing::TestWithParam<
     _ep = _worker->createEndpointFromWorkerAddress(_worker->getAddress());
   }

+  void TearDown()
+  {
+    if (_progressMode == ProgressMode::ThreadPolling ||
+        _progressMode == ProgressMode::ThreadBlocking)
+      _worker->stopProgressThread();
+  }
+
   void allocate(const size_t numBuffers = 1, const bool allocateRecvBuffer = true)
   {
     _numBuffers = numBuffers;
7 changes: 7 additions & 0 deletions cpp/tests/worker.cpp
@@ -65,6 +65,13 @@ class WorkerProgressTest : public WorkerTest,

     _progressWorker = getProgressFunction(_worker, _progressMode);
   }
+
+  void TearDown()
+  {
+    if (_progressMode == ProgressMode::ThreadPolling ||
+        _progressMode == ProgressMode::ThreadBlocking)
+      _worker->stopProgressThread();
+  }
 };

 TEST_F(WorkerTest, HandleIsValid) { ASSERT_TRUE(_worker->getHandle() != nullptr); }