Skip to content

Commit

Permalink
implement in-place teardown for DML operations
Browse files Browse the repository at this point in the history
  • Loading branch information
kuron99 committed Oct 31, 2024
1 parent 402a4e2 commit 9f9788c
Show file tree
Hide file tree
Showing 8 changed files with 138 additions and 15 deletions.
2 changes: 1 addition & 1 deletion src/jogasaki/scheduler/dag_controller_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ void dag_controller::impl::check_and_generate_internal_events(step const& s) {
case step_state_kind::deactivated:
if(all_steps_deactivated(*graph_)) {
graph_deactivated_ = true;
submit_teardown(*request_context_);
set_going_teardown_or_submit(*request_context_);
}
break;
}
Expand Down
19 changes: 18 additions & 1 deletion src/jogasaki/scheduler/flat_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include <jogasaki/scheduler/statement_scheduler.h>
#include <jogasaki/scheduler/statement_scheduler_impl.h>
#include <jogasaki/scheduler/task_scheduler.h>
#include <jogasaki/scheduler/thread_local_info.h>
#include <jogasaki/utils/cancel_request.h>
#include <jogasaki/utils/hex.h>
#include <jogasaki/utils/latch.h>
Expand Down Expand Up @@ -77,6 +78,22 @@ void flat_task::dag_schedule() {
log_exit << *this;
}

bool set_going_teardown_or_submit(
request_context& req_context,
bool force,
bool try_on_suspended_worker
) {
if(global::config_pool()->inplace_teardown() && thread_local_info_.is_worker_thread()) {
if(ready_to_finish(*req_context.job(), true)) {
req_context.job()->going_teardown().store(true);
return true;
}
}
submit_teardown(req_context, force, try_on_suspended_worker);
return false;
}


bool check_or_submit_teardown(
request_context& req_context,
bool calling_from_task,
Expand Down Expand Up @@ -266,7 +283,7 @@ void flat_task::operator()(tateyama::task_scheduler::context& ctx) {
auto lk = (tctx && sticky_) ?
std::unique_lock{tctx->mutex()} :
std::unique_lock<transaction_context::mutex_type>{};
job_completes = execute(ctx);
job_completes = execute(ctx) || job()->going_teardown();
if (tctx && sticky_) {
tctx->decrement_worker_count();
}
Expand Down
33 changes: 24 additions & 9 deletions src/jogasaki/scheduler/flat_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,15 +133,6 @@ struct statement_context {
error_info_stats_callback callback_{}; //NOLINT
};

void submit_teardown(request_context& req_context, bool force = false, bool try_on_suspended_worker = false);

bool check_or_submit_teardown(
request_context& req_context,
bool calling_from_task = false,
bool force = false,
bool try_on_suspended_worker = false
);

/**
* @brief common task object
* @details The task object used commonly for the jogasaki::scheduler::task_scheduler.
Expand Down Expand Up @@ -344,6 +335,30 @@ void finish_job(request_context& req_context);
*/
void dag_schedule(request_context& req_context);

/**
* @brief submit teardown task
*/
void submit_teardown(request_context& req_context, bool force = false, bool try_on_suspended_worker = false);

/**
* @brief check if job is ready to finish, otherwise submit teardown task
*/
bool check_or_submit_teardown(
request_context& req_context,
bool calling_from_task = false,
bool force = false,
bool try_on_suspended_worker = false
);

/**
* @brief set going_teardown flag if the current thread is in scheduler worker, otherwise submit teardown task
*/
bool set_going_teardown_or_submit(
request_context& req_context,
bool force = false,
bool try_on_suspended_worker = false
);

void print_task_diagnostic(flat_task const& t, std::ostream& os);

}
4 changes: 4 additions & 0 deletions src/jogasaki/scheduler/job_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ std::atomic_bool &job_context::started() noexcept {
return started_;
}

std::atomic_bool& job_context::going_teardown() noexcept {
return going_teardown_;
}

void job_context::request(std::shared_ptr<request_detail> arg) noexcept {
if(arg) {
id_ = arg->id();
Expand Down
8 changes: 8 additions & 0 deletions src/jogasaki/scheduler/job_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,13 @@ class cache_align job_context {
return hybrid_execution_mode_;
}

/**
* @brief accessor for the going_teardown flag used to mark whether the final flat_task is going to complete the job
* without submitting teardown task
* @return going_teardown flag
*/
[[nodiscard]] std::atomic_bool& going_teardown() noexcept;

/**
* @brief dump the text representation of the value to output stream
* @param out the target output stream
Expand All @@ -177,6 +184,7 @@ class cache_align job_context {
readiness_provider readiness_provider_{};
std::shared_ptr<request_detail> request_detail_{};
cache_align std::atomic<hybrid_execution_mode_kind> hybrid_execution_mode_{hybrid_execution_mode_kind::undefined};
cache_align std::atomic_bool going_teardown_{false};

static inline std::atomic_size_t id_src_{1UL << 32UL}; //NOLINT

Expand Down
11 changes: 7 additions & 4 deletions src/jogasaki/scheduler/stealing_task_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,23 @@
#include <jogasaki/scheduler/flat_task.h>
#include <jogasaki/scheduler/job_context.h>
#include <jogasaki/scheduler/request_detail.h>
#include <jogasaki/scheduler/task_scheduler.h>
#include <jogasaki/scheduler/thread_info.h>
#include <jogasaki/scheduler/thread_local_info.h>
#include <jogasaki/scheduler/thread_params.h>
#include <jogasaki/transaction_context.h>
#include <jogasaki/utils/hex.h>
#include <jogasaki/utils/latch.h>

#include "task_scheduler.h"
#include "thread_params.h"

namespace jogasaki::scheduler {

using takatori::util::throw_exception;

stealing_task_scheduler::stealing_task_scheduler(thread_params params) :
scheduler_cfg_(create_scheduler_cfg(params)),
scheduler_(scheduler_cfg_)
scheduler_(scheduler_cfg_, [](std::size_t id) {
thread_local_info_ = thread_info{true, id};
})
{}

std::size_t determine_worker(transaction_context const& tx, std::size_t worker_count) {
Expand Down
49 changes: 49 additions & 0 deletions src/jogasaki/scheduler/thread_info.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2018-2023 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <cstddef>

namespace jogasaki::scheduler {

class thread_info {
public:
static constexpr std::size_t undefined = static_cast<std::size_t>(-1);

thread_info() = default;

thread_info(
bool is_worker_thread,
std::size_t worker_index
) :
is_worker_thread_(is_worker_thread),
worker_index_(worker_index)
{}

[[nodiscard]] bool is_worker_thread() const noexcept {
return is_worker_thread_;
}

[[nodiscard]] std::size_t worker_index() const noexcept {
return worker_index_;
}

private:
bool is_worker_thread_{};
std::size_t worker_index_{undefined};
};

} // namespace
27 changes: 27 additions & 0 deletions src/jogasaki/scheduler/thread_local_info.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2018-2023 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <jogasaki/scheduler/thread_info.h>

namespace jogasaki::scheduler {

/**
* @brief thread specific information to identify the worker thread
*/
inline thread_local thread_info thread_local_info_{}; //NOLINT(cppcoreguidelines-avoid-non-const-global-variables)

} // namespace jogasaki::scheduler

0 comments on commit 9f9788c

Please sign in to comment.