Skip to content

Commit

Permalink
[#3142] fixed deadlock
Browse files Browse the repository at this point in the history
  • Loading branch information
Razvan Becheriu committed Nov 20, 2023
1 parent bbf71a9 commit 1587fad
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 8 deletions.
6 changes: 3 additions & 3 deletions src/lib/util/tests/thread_pool_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ TEST_F(ThreadPoolTest, wait) {
checkState(thread_pool, 0, 0);

items_count = 16;
thread_count = 16;
thread_count = 256;
// prepare setup
reset(thread_count);

Expand All @@ -546,8 +546,8 @@ TEST_F(ThreadPoolTest, wait) {

// calling stop should clear all threads and should keep queued items
EXPECT_NO_THROW(thread_pool.stop());
// the thread count should be 0
ASSERT_EQ(thread_pool.size(), 0);
checkState(thread_pool, 0, 0);

// wait for all items to be processed
ASSERT_TRUE(thread_pool.wait(1));
checkState(thread_pool, 0, 0);
Expand Down
29 changes: 24 additions & 5 deletions src/lib/util/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ struct ThreadPool {
/// Creates the thread pool queue in 'disabled' state
ThreadPoolQueue()
: enabled_(false), paused_(false), max_queue_size_(0), working_(0),
stat10(0.), stat100(0.), stat1000(0.) {
unavailable_(0), stat10(0.), stat100(0.), stat1000(0.) {
}

/// @brief Destructor
Expand All @@ -289,6 +289,20 @@ struct ThreadPool {
clear();
}

/// @brief register thread so that it can be taken into account
void registerThread() {
std::lock_guard<std::mutex> lock(mutex_);
++working_;
--unavailable_;
}

/// @brief unregister thread so that it can be ignored
void unregisterThread() {
std::lock_guard<std::mutex> lock(mutex_);
--working_;
++unavailable_;
}

/// @brief set maximum number of work items in the queue
///
/// @return the maximum size (0 means unlimited)
Expand Down Expand Up @@ -377,7 +391,7 @@ struct ThreadPool {
std::unique_lock<std::mutex> lock(mutex_);
--working_;
// Signal thread waiting for threads to pause.
if (working_ == 0 && paused_) {
if (paused_ && working_ == 0 && unavailable_ == 0) {
wait_threads_cv_.notify_all();
}
// Signal thread waiting for tasks to finish.
Expand All @@ -386,10 +400,10 @@ struct ThreadPool {
}
// Wait for push or disable functions.
cv_.wait(lock, [&]() {return (!enabled_ || (!queue_.empty() && !paused_));});
++working_;
if (!enabled_) {
return (Item());
}
++working_;
size_t length = queue_.size();
stat10 = stat10 * CEXP10 + (1 - CEXP10) * length;
stat100 = stat100 * CEXP100 + (1 - CEXP100) * length;
Expand Down Expand Up @@ -444,7 +458,7 @@ struct ThreadPool {
paused_ = true;
if (wait) {
// Wait for working threads to finish.
wait_threads_cv_.wait(lock, [&]() {return (working_ == 0);});
wait_threads_cv_.wait(lock, [&]() {return (working_ == 0 && unavailable_ == 0);});
}
}

Expand Down Expand Up @@ -493,7 +507,7 @@ struct ThreadPool {
void enable(uint32_t thread_count) {
std::lock_guard<std::mutex> lock(mutex_);
enabled_ = true;
working_ = thread_count;
unavailable_ = thread_count;
}

/// @brief disable the queue
Expand Down Expand Up @@ -562,6 +576,9 @@ struct ThreadPool {
/// @brief number of threads currently doing work
uint32_t working_;

/// @brief number of threads not running
uint32_t unavailable_;

/// @brief queue length statistic for 10 packets
double stat10;

Expand All @@ -574,6 +591,7 @@ struct ThreadPool {

/// @brief run function of each thread
void run() {
queue_.registerThread();
for (bool work = true; work; work = queue_.enabled()) {
WorkItemPtr item = queue_.pop();
if (item) {
Expand All @@ -584,6 +602,7 @@ struct ThreadPool {
}
}
}
queue_.unregisterThread();
}

/// @brief list of worker threads
Expand Down

0 comments on commit 1587fad

Please sign in to comment.