diff --git a/src/jogasaki/api/impl/database.cpp b/src/jogasaki/api/impl/database.cpp index a72beac3..8411d738 100644 --- a/src/jogasaki/api/impl/database.cpp +++ b/src/jogasaki/api/impl/database.cpp @@ -1205,7 +1205,7 @@ scheduler::job_context::job_id_type database::do_create_transaction_async( if(*canceled) { set_cancel_status(*rctx); } - scheduler::submit_teardown(*rctx, false, true); + scheduler::submit_teardown(*rctx, true); }, } ); diff --git a/src/jogasaki/durability_common.cpp b/src/jogasaki/durability_common.cpp index 74b3f4cd..bae746f1 100644 --- a/src/jogasaki/durability_common.cpp +++ b/src/jogasaki/durability_common.cpp @@ -60,7 +60,7 @@ void submit_commit_response( } else { rctx->commit_ctx()->on_response()(kind); } - scheduler::submit_teardown(*rctx, false, teardown_try_on_suspended_worker); + scheduler::submit_teardown(*rctx, teardown_try_on_suspended_worker); return model::task_result::complete; }, false) ); diff --git a/src/jogasaki/scheduler/flat_task.cpp b/src/jogasaki/scheduler/flat_task.cpp index 54e3fc7f..92578f9c 100644 --- a/src/jogasaki/scheduler/flat_task.cpp +++ b/src/jogasaki/scheduler/flat_task.cpp @@ -80,34 +80,33 @@ void flat_task::dag_schedule() { bool set_going_teardown_or_submit( request_context& req_context, - bool force, bool try_on_suspended_worker ) { // note that this function can be called multiple times // once going_teardown is set to true, it should never go back to false. - + if(! global::config_pool()->inplace_teardown() || ! thread_local_info_.is_worker_thread()) { + submit_teardown(req_context, try_on_suspended_worker); + return false; + } auto& job = *req_context.job(); - if(job.completing().load()) { - // possibly programming error - VLOG_LP(log_trace_fine) << "warning - " << job - << " set_going_teardown_or_submit() for job with completing=true"; + auto completing = job.completing().load(); + if(completing) { + // teardown task is scheduled or going_teardown is set return false; - }; - if(global::config_pool()->inplace_teardown() && thread_local_info_.is_worker_thread()) { - if(ready_to_finish(*req_context.job(), true)) { - req_context.job()->going_teardown().store(true); + } + if(ready_to_finish(job, true)) { + if (job.completing().compare_exchange_strong(completing, true)) { + job.going_teardown().store(true); return true; } } - submit_teardown(req_context, force, try_on_suspended_worker); + submit_teardown(req_context, try_on_suspended_worker); return false; } - bool check_or_submit_teardown( request_context& req_context, bool calling_from_task, - bool force, bool try_on_suspended_worker ) { if(global::config_pool()->inplace_teardown()) { @@ -115,21 +114,16 @@ bool check_or_submit_teardown( return true; } } - submit_teardown(req_context, force, try_on_suspended_worker); + submit_teardown(req_context, try_on_suspended_worker); return false; } -void submit_teardown(request_context& req_context, bool force, bool try_on_suspended_worker) { +void submit_teardown(request_context& req_context, bool try_on_suspended_worker) { // make sure teardown task is submitted only once auto& ts = *req_context.scheduler(); auto& job = *req_context.job(); - if(job.going_teardown()) { - // possibly programming error - VLOG_LP(log_trace_fine) << "warning - " << job << " submit_teardown() for job with going_teardown=true"; - return; - } auto completing = job.completing().load(); - if (force || (!completing && job.completing().compare_exchange_strong(completing, true))) { + if (! completing && job.completing().compare_exchange_strong(completing, true)) { ts.schedule_task( flat_task{task_enum_tag, std::addressof(req_context)}, schedule_option{ @@ -322,7 +316,7 @@ void flat_task::operator()(tateyama::task_scheduler::context& ctx) { // Submitting teardown should be done at the end of the task since otherwise new teardown finish fast // and start destroying job context, which can be touched by this task. - submit_teardown(*req_context_, true); + resubmit(*req_context_); return; } finish_job(*req_context_); diff --git a/src/jogasaki/scheduler/flat_task.h b/src/jogasaki/scheduler/flat_task.h index 0db2dc32..578e2dfb 100644 --- a/src/jogasaki/scheduler/flat_task.h +++ b/src/jogasaki/scheduler/flat_task.h @@ -319,6 +319,8 @@ class cache_align flat_task { /** * @brief function to check job is ready to finish + * @param job the job context to check + * @param calling_from_task whether the function is called from task (i.e. one of worker threads on task scheduler) * @return true if there is no other tasks for the job and completion is ready * @return false otherwise */ @@ -326,7 +328,8 @@ bool ready_to_finish(job_context& job, bool calling_from_task); /** * @brief finish the job - * @details this function doesn't check any condition for teardown, so use only when you are sure the job is ready to finish + * @details this function doesn't check any condition for teardown, so use only when you are sure the job is ready + * to finish (e.g. by checking with `ready_to_finish()`) */ void finish_job(request_context& req_context); @@ -337,28 +340,37 @@ void dag_schedule(request_context& req_context); /** * @brief submit teardown task + * @details check job_context::completing() flag. If the flag is not set, set it and submit teardown task. + * Otherwise do nothing. + * @param req_context the request context where the task belongs + * @param try_on_suspended_worker whether to try to submit the task on suspended worker */ -void submit_teardown(request_context& req_context, bool force = false, bool try_on_suspended_worker = false); +void submit_teardown(request_context& req_context, bool try_on_suspended_worker = false); /** * @brief check if job is ready to finish, otherwise submit teardown task + * @details if the job is ready to finish, return true. Otherwise submit teardown task and return false. + * @param req_context the request context where the task belongs + * @param calling_from_task whether the function is called from task (i.e. one of worker threads on task scheduler) + * @param try_on_suspended_worker whether to try to submit the task on suspended worker */ bool check_or_submit_teardown( request_context& req_context, bool calling_from_task = false, - bool force = false, bool try_on_suspended_worker = false ); /** * @brief set going_teardown flag if the current thread is in scheduler worker, otherwise submit teardown task + * @details by setting going_teardown flag, the task run by current thread completes the job at the end of the task + * @param req_context the request context where the task belongs + * @param try_on_suspended_worker whether to try to submit the task on suspended worker */ bool set_going_teardown_or_submit( request_context& req_context, - bool force = false, bool try_on_suspended_worker = false ); void print_task_diagnostic(flat_task const& t, std::ostream& os); -} +} // namespace jogasaki::scheduler