Skip to content

Commit

Permalink
remove force arg. from submit_teardown()
Browse files Browse the repository at this point in the history
  • Loading branch information
kuron99 committed Nov 1, 2024
1 parent 604c7b7 commit 81908ff
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 29 deletions.
2 changes: 1 addition & 1 deletion src/jogasaki/api/impl/database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1205,7 +1205,7 @@ scheduler::job_context::job_id_type database::do_create_transaction_async(
if(*canceled) {
set_cancel_status(*rctx);
}
scheduler::submit_teardown(*rctx, false, true);
scheduler::submit_teardown(*rctx, true);
},
}
);
Expand Down
2 changes: 1 addition & 1 deletion src/jogasaki/durability_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ void submit_commit_response(
} else {
rctx->commit_ctx()->on_response()(kind);
}
scheduler::submit_teardown(*rctx, false, teardown_try_on_suspended_worker);
scheduler::submit_teardown(*rctx, teardown_try_on_suspended_worker);
return model::task_result::complete;
}, false)
);
Expand Down
38 changes: 16 additions & 22 deletions src/jogasaki/scheduler/flat_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,56 +80,50 @@ void flat_task::dag_schedule() {

bool set_going_teardown_or_submit(
request_context& req_context,
bool force,
bool try_on_suspended_worker
) {
// note that this function can be called multiple times
// once going_teardown is set to true, it should never go back to false.

if(! global::config_pool()->inplace_teardown() || ! thread_local_info_.is_worker_thread()) {
submit_teardown(req_context, try_on_suspended_worker);
return false;
}
auto& job = *req_context.job();
if(job.completing().load()) {
// possibly programming error
VLOG_LP(log_trace_fine) << "warning - " << job
<< " set_going_teardown_or_submit() for job with completing=true";
auto completing = job.completing().load();
if(completing) {
// teardown task is scheduled or going_teardown is set
return false;
};
if(global::config_pool()->inplace_teardown() && thread_local_info_.is_worker_thread()) {
if(ready_to_finish(*req_context.job(), true)) {
req_context.job()->going_teardown().store(true);
}
if(ready_to_finish(job, true)) {
if (job.completing().compare_exchange_strong(completing, true)) {
job.going_teardown().store(true);
return true;
}
}
submit_teardown(req_context, force, try_on_suspended_worker);
submit_teardown(req_context, try_on_suspended_worker);
return false;
}


bool check_or_submit_teardown(
request_context& req_context,
bool calling_from_task,
bool force,
bool try_on_suspended_worker
) {
if(global::config_pool()->inplace_teardown()) {
if(ready_to_finish(*req_context.job(), calling_from_task)) {
return true;
}
}
submit_teardown(req_context, force, try_on_suspended_worker);
submit_teardown(req_context, try_on_suspended_worker);
return false;
}

void submit_teardown(request_context& req_context, bool force, bool try_on_suspended_worker) {
void submit_teardown(request_context& req_context, bool try_on_suspended_worker) {
// make sure teardown task is submitted only once
auto& ts = *req_context.scheduler();
auto& job = *req_context.job();
if(job.going_teardown()) {
// possibly programming error
VLOG_LP(log_trace_fine) << "warning - " << job << " submit_teardown() for job with going_teardown=true";
return;
}
auto completing = job.completing().load();
if (force || (!completing && job.completing().compare_exchange_strong(completing, true))) {
if (! completing && job.completing().compare_exchange_strong(completing, true)) {
ts.schedule_task(
flat_task{task_enum_tag<flat_task_kind::teardown>, std::addressof(req_context)},
schedule_option{
Expand Down Expand Up @@ -322,7 +316,7 @@ void flat_task::operator()(tateyama::task_scheduler::context& ctx) {

// Submitting teardown should be done at the end of the task since otherwise new teardown finish fast
// and start destroying job context, which can be touched by this task.
submit_teardown(*req_context_, true);
resubmit(*req_context_);
return;
}
finish_job(*req_context_);
Expand Down
22 changes: 17 additions & 5 deletions src/jogasaki/scheduler/flat_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -319,14 +319,17 @@ class cache_align flat_task {

/**
* @brief function to check job is ready to finish
* @param job the job context to check
* @param calling_from_task whether the function is called from task (i.e. one of worker threads on task scheduler)
* @return true if there is no other tasks for the job and completion is ready
* @return false otherwise
*/
bool ready_to_finish(job_context& job, bool calling_from_task);

/**
* @brief finish the job
* @details this function doesn't check any condition for teardown, so use only when you are sure the job is ready to finish
* @details this function doesn't check any condition for teardown, so use only when you are sure the job is ready
* to finish (e.g. by checking with `ready_to_finish()`)
*/
void finish_job(request_context& req_context);

Expand All @@ -337,28 +340,37 @@ void dag_schedule(request_context& req_context);

/**
* @brief submit teardown task
* @details check job_context::completing() flag. If the flag is not set, set it and submit teardown task.
* Otherwise do nothing.
* @param req_context the request context where the task belongs
* @param try_on_suspended_worker whether to try to submit the task on suspended worker
*/
void submit_teardown(request_context& req_context, bool force = false, bool try_on_suspended_worker = false);
void submit_teardown(request_context& req_context, bool try_on_suspended_worker = false);

/**
* @brief check if job is ready to finish, otherwise submit teardown task
* @details if the job is ready to finish, return true. Otherwise submit teardown task and return false.
* @param req_context the request context where the task belongs
* @param calling_from_task whether the function is called from task (i.e. one of worker threads on task scheduler)
* @param try_on_suspended_worker whether to try to submit the task on suspended worker
*/
bool check_or_submit_teardown(
request_context& req_context,
bool calling_from_task = false,
bool force = false,
bool try_on_suspended_worker = false
);

/**
* @brief set going_teardown flag if the current thread is in scheduler worker, otherwise submit teardown task
* @details by setting going_teardown flag, the task run by current thread completes the job at the end of the task
* @param req_context the request context where the task belongs
* @param try_on_suspended_worker whether to try to submit the task on suspended worker
*/
bool set_going_teardown_or_submit(
request_context& req_context,
bool force = false,
bool try_on_suspended_worker = false
);

void print_task_diagnostic(flat_task const& t, std::ostream& os);

}
} // namespace jogasaki::scheduler

0 comments on commit 81908ff

Please sign in to comment.