From 338c47d94bc902726d51cd79c1b1c1bf978d4fd4 Mon Sep 17 00:00:00 2001
From: Luke Marshall <52978038+mathgeekcoder@users.noreply.github.com>
Date: Thu, 12 Sep 2024 17:40:01 -0700
Subject: [PATCH] An attempt to fix potential deadlock issues in Windows.

Removes synchronization with worker threads on shutdown. Also removes the
"search" for the main executor in the worker threads. Instead we simply
pass the main executor to the thread as a parameter. We also pass the
underlying shared_ptr to avoid potential edge cases where reference count
drops to zero before some threads initialize.

I made the run_worker static to avoid any confusion about "this" vs
"executor->ptr", and so it uses the shared_ptr to reference the shared
memory. The last worker thread will delete the shared memory, via the
shared_ptr reference count.
---
 src/parallel/HighsTaskExecutor.h | 39 +++++++++++++-------------------
 1 file changed, 16 insertions(+), 23 deletions(-)

diff --git a/src/parallel/HighsTaskExecutor.h b/src/parallel/HighsTaskExecutor.h
index c0acab9e69..e8381329fb 100644
--- a/src/parallel/HighsTaskExecutor.h
+++ b/src/parallel/HighsTaskExecutor.h
@@ -85,31 +85,23 @@ class HighsTaskExecutor {
     return nullptr;
   }
 
-  void run_worker(int workerId) {
-    // spin until the global executor pointer is set up
-    ExecutorHandle* executor;
-    // Following yields warning C4706: assignment within conditional
-    // expression when building libhighs on Windows (/W4):
-    //
-    // while (!(executor = mainWorkerHandle.load(std::memory_order_acquire)))
-    //   HighsSpinMutex::yieldProcessor();
-    while (true) {
-      executor = mainWorkerHandle.load(std::memory_order_acquire);
-      if (executor != nullptr) break;
-      HighsSpinMutex::yieldProcessor();
-    }
+  static void run_worker(
+      int workerId,
+      ExecutorHandle* executor,
+      highs::cache_aligned::shared_ptr<HighsTaskExecutor> ref) {
+    // now acquire a reference count of the global executor
     threadLocalExecutorHandle() = *executor;
-    HighsSplitDeque* localDeque = workerDeques[workerId].get();
+    HighsSplitDeque* localDeque = ref->workerDeques[workerId].get();
     threadLocalWorkerDeque() = localDeque;
 
-    HighsTask* currentTask = workerBunk->waitForNewTask(localDeque);
+    HighsTask* currentTask = ref->workerBunk->waitForNewTask(localDeque);
     while (currentTask != nullptr) {
       localDeque->runStolenTask(currentTask);
 
-      currentTask = random_steal_loop(localDeque);
+      currentTask = ref->random_steal_loop(localDeque);
       if (currentTask != nullptr) continue;
 
-      currentTask = workerBunk->waitForNewTask(localDeque);
+      currentTask = ref->workerBunk->waitForNewTask(localDeque);
     }
   }
 
@@ -124,8 +116,12 @@
           workerBunk, workerDeques.data(), i, numThreads);
 
     threadLocalWorkerDeque() = workerDeques[0].get();
-    for (int i = 1; i < numThreads; ++i)
-      std::thread([&](int id) { run_worker(id); }, i).detach();
+  }
+
+  void init(ExecutorHandle* executor) {
+    for (int i = 1, numThreads = workerDeques.size(); i < numThreads; ++i) {
+      std::thread(&HighsTaskExecutor::run_worker, i, executor, executor->ptr).detach();
+    }
   }
 
   static HighsSplitDeque* getThisWorkerDeque() {
@@ -143,16 +139,13 @@
           cache_aligned::make_shared<HighsTaskExecutor>(numThreads);
       executorHandle.ptr->mainWorkerHandle.store(&executorHandle,
                                                  std::memory_order_release);
+      executorHandle.ptr->init(&executorHandle);
     }
   }
 
   static void shutdown(bool blocking = false) {
     auto& executorHandle = threadLocalExecutorHandle();
     if (executorHandle.ptr) {
-      // first spin until every worker has acquired its executor reference
-      while (executorHandle.ptr.use_count() !=
-             (long)executorHandle.ptr->workerDeques.size())
-        HighsSpinMutex::yieldProcessor();
       // set the active flag to false first with release ordering
       executorHandle.ptr->mainWorkerHandle.store(nullptr,
                                                  std::memory_order_release);