Skip to content

Commit

Permalink
style: check_or_submit_teardown to check completing flag for future use
Browse files Browse the repository at this point in the history
  • Loading branch information
kuron99 committed Nov 7, 2024
1 parent f31955f commit 08e0eab
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 12 deletions.
17 changes: 10 additions & 7 deletions src/jogasaki/scheduler/flat_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,30 +78,29 @@ void flat_task::dag_schedule() {
log_exit << *this;
}

bool set_going_teardown_or_submit(
void set_going_teardown_or_submit(
request_context& req_context,
bool try_on_suspended_worker
) {
// note that this function can be called multiple times
// once going_teardown is set to true, it should never go back to false.
if(! global::config_pool()->inplace_teardown() || ! thread_local_info_.is_worker_thread()) {
submit_teardown(req_context, try_on_suspended_worker);
return false;
return;
}
auto& job = *req_context.job();
auto completing = job.completing().load();
if(completing) {
// teardown task is scheduled or going_teardown is set
return false;
return;
}
if(ready_to_finish(job, true)) {
if (job.completing().compare_exchange_strong(completing, true)) {
job.going_teardown().store(true);
return true;
return;
}
}
submit_teardown(req_context, try_on_suspended_worker);
return false;
}

bool check_or_submit_teardown(
Expand All @@ -110,8 +109,12 @@ bool check_or_submit_teardown(
bool try_on_suspended_worker
) {
if(global::config_pool()->inplace_teardown()) {
if(ready_to_finish(*req_context.job(), calling_from_task)) {
return true;
auto& job = *req_context.job();
if(ready_to_finish(job, calling_from_task)) {
auto completing = job.completing().load();
if (! completing && job.completing().compare_exchange_strong(completing, true)) {
return true;
}
}
}
submit_teardown(req_context, try_on_suspended_worker);
Expand Down
15 changes: 10 additions & 5 deletions src/jogasaki/scheduler/flat_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,10 @@ void dag_schedule(request_context& req_context);
void submit_teardown(request_context& req_context, bool try_on_suspended_worker = false);

/**
* @brief check if job is ready to finish, otherwise submit teardown task
* @details if the job is ready to finish, return true. Otherwise submit teardown task and return false.
* @brief check if job is ready to finish, or submit teardown task
* @details if the job is ready to finish, return true. Otherwise, submit teardown task and return false.
* In both cases, check job_context::completing() flag and if it is already true, do nothing to prevent finishing
* job twice.
* @param req_context the request context where the task belongs
* @param calling_from_task whether the function is called from task (i.e. one of worker threads on task scheduler)
* @param try_on_suspended_worker whether to try to submit the task on suspended worker
Expand All @@ -361,12 +363,15 @@ bool check_or_submit_teardown(
);

/**
* @brief set going_teardown flag if the current thread is in scheduler worker, otherwise submit teardown task
* @details by setting going_teardown flag, the task run by current thread completes the job at the end of the task
* @brief set going_teardown flag or submit teardown task
* @details set going_teardown flag if the current thread is in scheduler worker and job is ready to finish,
* otherwise submit teardown task. In both cases, check job_context::completing() flag and if it is already true,
* do nothing to prevent finishing job twice.
* By setting going_teardown flag, the task run by current thread completes the job at the end of the task.
* @param req_context the request context where the task belongs
* @param try_on_suspended_worker whether to try to submit the task on suspended worker
*/
bool set_going_teardown_or_submit(
void set_going_teardown_or_submit(
request_context& req_context,
bool try_on_suspended_worker = false
);
Expand Down

0 comments on commit 08e0eab

Please sign in to comment.