diff --git a/src/jogasaki/scheduler/dag_controller_impl.cpp b/src/jogasaki/scheduler/dag_controller_impl.cpp index 4064cae6..01986036 100644 --- a/src/jogasaki/scheduler/dag_controller_impl.cpp +++ b/src/jogasaki/scheduler/dag_controller_impl.cpp @@ -299,7 +299,7 @@ void dag_controller::impl::check_and_generate_internal_events(step const& s) { case step_state_kind::deactivated: if(all_steps_deactivated(*graph_)) { graph_deactivated_ = true; - submit_teardown(*request_context_); + set_going_teardown_or_submit(*request_context_); } break; } diff --git a/src/jogasaki/scheduler/flat_task.cpp b/src/jogasaki/scheduler/flat_task.cpp index 60c9198d..cce053d3 100644 --- a/src/jogasaki/scheduler/flat_task.cpp +++ b/src/jogasaki/scheduler/flat_task.cpp @@ -43,6 +43,7 @@ #include #include #include +#include #include #include #include @@ -77,6 +78,22 @@ void flat_task::dag_schedule() { log_exit << *this; } +bool set_going_teardown_or_submit( + request_context& req_context, + bool force, + bool try_on_suspended_worker +) { + if(global::config_pool()->inplace_teardown() && thread_local_info_.is_worker_thread()) { + if(ready_to_finish(*req_context.job(), true)) { + req_context.job()->going_teardown().store(true); + return true; + } + } + submit_teardown(req_context, force, try_on_suspended_worker); + return false; +} + + bool check_or_submit_teardown( request_context& req_context, bool calling_from_task, @@ -266,7 +283,7 @@ void flat_task::operator()(tateyama::task_scheduler::context& ctx) { auto lk = (tctx && sticky_) ? std::unique_lock{tctx->mutex()} : std::unique_lock{}; - job_completes = execute(ctx); + job_completes = execute(ctx) || job()->going_teardown(); if (tctx && sticky_) { tctx->decrement_worker_count(); } diff --git a/src/jogasaki/scheduler/flat_task.h b/src/jogasaki/scheduler/flat_task.h index 4fa11c6c..0db2dc32 100644 --- a/src/jogasaki/scheduler/flat_task.h +++ b/src/jogasaki/scheduler/flat_task.h @@ -133,15 +133,6 @@ struct statement_context { error_info_stats_callback callback_{}; //NOLINT }; -void submit_teardown(request_context& req_context, bool force = false, bool try_on_suspended_worker = false); - -bool check_or_submit_teardown( - request_context& req_context, - bool calling_from_task = false, - bool force = false, - bool try_on_suspended_worker = false -); - /** * @brief common task object * @details The task object used commonly for the jogasaki::scheduler::task_scheduler. @@ -344,6 +335,30 @@ void finish_job(request_context& req_context); */ void dag_schedule(request_context& req_context); +/** + * @brief submit teardown task + */ +void submit_teardown(request_context& req_context, bool force = false, bool try_on_suspended_worker = false); + +/** + * @brief check if job is ready to finish, otherwise submit teardown task + */ +bool check_or_submit_teardown( + request_context& req_context, + bool calling_from_task = false, + bool force = false, + bool try_on_suspended_worker = false +); + +/** + * @brief set going_teardown flag if the current thread is in scheduler worker, otherwise submit teardown task + */ +bool set_going_teardown_or_submit( + request_context& req_context, + bool force = false, + bool try_on_suspended_worker = false +); + void print_task_diagnostic(flat_task const& t, std::ostream& os); } diff --git a/src/jogasaki/scheduler/job_context.cpp b/src/jogasaki/scheduler/job_context.cpp index 9d83031e..9e168669 100644 --- a/src/jogasaki/scheduler/job_context.cpp +++ b/src/jogasaki/scheduler/job_context.cpp @@ -61,6 +61,10 @@ std::atomic_bool &job_context::started() noexcept { return started_; } +std::atomic_bool& job_context::going_teardown() noexcept { + return going_teardown_; +} + void job_context::request(std::shared_ptr arg) noexcept { if(arg) { id_ = arg->id(); diff --git a/src/jogasaki/scheduler/job_context.h b/src/jogasaki/scheduler/job_context.h index 1a114a3f..6a32ead5 100644 --- a/src/jogasaki/scheduler/job_context.h +++ b/src/jogasaki/scheduler/job_context.h @@ -157,6 +157,13 @@ class cache_align job_context { return hybrid_execution_mode_; } + /** + * @brief accessor for the going_teardown flag used to mark whether the final flat_task is going to complete the job + * without submitting teardown task + * @return going_teardown flag + */ + [[nodiscard]] std::atomic_bool& going_teardown() noexcept; + /** * @brief dump the text representation of the value to output stream * @param out the target output stream @@ -177,6 +184,7 @@ class cache_align job_context { readiness_provider readiness_provider_{}; std::shared_ptr request_detail_{}; cache_align std::atomic hybrid_execution_mode_{hybrid_execution_mode_kind::undefined}; + cache_align std::atomic_bool going_teardown_{false}; static inline std::atomic_size_t id_src_{1UL << 32UL}; //NOLINT diff --git a/src/jogasaki/scheduler/stealing_task_scheduler.cpp b/src/jogasaki/scheduler/stealing_task_scheduler.cpp index 2afff6c1..3105cfb5 100644 --- a/src/jogasaki/scheduler/stealing_task_scheduler.cpp +++ b/src/jogasaki/scheduler/stealing_task_scheduler.cpp @@ -34,20 +34,23 @@ #include #include #include +#include +#include +#include +#include #include #include #include -#include "task_scheduler.h" -#include "thread_params.h" - namespace jogasaki::scheduler { using takatori::util::throw_exception; stealing_task_scheduler::stealing_task_scheduler(thread_params params) : scheduler_cfg_(create_scheduler_cfg(params)), - scheduler_(scheduler_cfg_) + scheduler_(scheduler_cfg_, [](std::size_t id) { + thread_local_info_ = thread_info{true, id}; + }) {} std::size_t determine_worker(transaction_context const& tx, std::size_t worker_count) { diff --git a/src/jogasaki/scheduler/thread_info.h b/src/jogasaki/scheduler/thread_info.h new file mode 100644 index 00000000..5844164e --- /dev/null +++ b/src/jogasaki/scheduler/thread_info.h @@ -0,0 +1,49 @@ +/* + * Copyright 2018-2023 Project Tsurugi. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +namespace jogasaki::scheduler { + +class thread_info { +public: + static constexpr std::size_t undefined = static_cast(-1); + + thread_info() = default; + + thread_info( + bool is_worker_thread, + std::size_t worker_index + ) : + is_worker_thread_(is_worker_thread), + worker_index_(worker_index) + {} + + [[nodiscard]] bool is_worker_thread() const noexcept { + return is_worker_thread_; + } + + [[nodiscard]] std::size_t worker_index() const noexcept { + return worker_index_; + } + +private: + bool is_worker_thread_{}; + std::size_t worker_index_{undefined}; +}; + +} // namespace diff --git a/src/jogasaki/scheduler/thread_local_info.h b/src/jogasaki/scheduler/thread_local_info.h new file mode 100644 index 00000000..5c8b21f8 --- /dev/null +++ b/src/jogasaki/scheduler/thread_local_info.h @@ -0,0 +1,27 @@ +/* + * Copyright 2018-2023 Project Tsurugi. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +namespace jogasaki::scheduler { + +/** + * @brief thread specific information to identify the worker thread + */ +inline thread_local thread_info thread_local_info_{}; //NOLINT(cppcoreguidelines-avoid-non-const-global-variables) + +} // namespace jogasaki::scheduler