Skip to content

Commit

Permalink
Implement queuing for SerialExecutor too
Browse files Browse the repository at this point in the history
  • Loading branch information
Tom-Newton committed Dec 12, 2024
1 parent e3caae7 commit 7573d11
Showing 1 changed file with 7 additions and 5 deletions.
12 changes: 7 additions & 5 deletions cpp/src/arrow/util/thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ struct QueuedTask {
} // namespace

struct SerialExecutor::State {
std::deque<Task> task_queue;
std::priority_queue<QueuedTask> task_queue;
uint64_t spawned_tasks_count = 0;
std::mutex mutex;
std::condition_variable wait_for_tasks;
std::thread::id current_thread;
Expand Down Expand Up @@ -170,8 +171,9 @@ Status SerialExecutor::SpawnReal(TaskHints hints, FnOnce<void()> task,
"Attempt to schedule a task on a serial executor that has already finished or "
"been abandoned");
}
state->task_queue.push_back(
Task{std::move(task), std::move(stop_token), std::move(stop_callback)});
state->task_queue.push(QueuedTask{std::move(task), std::move(stop_token),
std::move(stop_callback), hints.priority,
state_->spawned_tasks_count++});
}
state->wait_for_tasks.notify_one();
return Status::OK();
Expand Down Expand Up @@ -262,8 +264,8 @@ void SerialExecutor::RunLoop() {
// because sometimes we will pause even with work leftover when processing
// an async generator
while (!state_->paused && !state_->task_queue.empty()) {
Task task = std::move(state_->task_queue.front());
state_->task_queue.pop_front();
Task task = std::move(const_cast<Task&>(state_->task_queue.top().task));
state_->task_queue.pop();
lk.unlock();
if (!task.stop_token.IsStopRequested()) {
std::move(task.callable)();
Expand Down

0 comments on commit 7573d11

Please sign in to comment.