diff --git a/cpp/src/arrow/util/thread_pool.cc b/cpp/src/arrow/util/thread_pool.cc index 3cefa9b1bc383..73a94619808e3 100644 --- a/cpp/src/arrow/util/thread_pool.cc +++ b/cpp/src/arrow/util/thread_pool.cc @@ -72,7 +72,8 @@ struct QueuedTask { } // namespace struct SerialExecutor::State { - std::deque task_queue; + std::priority_queue task_queue; + uint64_t spawned_tasks_count = 0; std::mutex mutex; std::condition_variable wait_for_tasks; std::thread::id current_thread; @@ -170,8 +171,9 @@ Status SerialExecutor::SpawnReal(TaskHints hints, FnOnce task, "Attempt to schedule a task on a serial executor that has already finished or " "been abandoned"); } - state->task_queue.push_back( - Task{std::move(task), std::move(stop_token), std::move(stop_callback)}); + state->task_queue.push(QueuedTask{std::move(task), std::move(stop_token), + std::move(stop_callback), hints.priority, + state_->spawned_tasks_count++}); } state->wait_for_tasks.notify_one(); return Status::OK(); @@ -262,8 +264,8 @@ void SerialExecutor::RunLoop() { // because sometimes we will pause even with work leftover when processing // an async generator while (!state_->paused && !state_->task_queue.empty()) { - Task task = std::move(state_->task_queue.front()); - state_->task_queue.pop_front(); + Task task = std::move(const_cast(state_->task_queue.top().task)); + state_->task_queue.pop(); lk.unlock(); if (!task.stop_token.IsStopRequested()) { std::move(task.callable)();