From 04f70588f7857600968ecab88a657c8a3db9eec3 Mon Sep 17 00:00:00 2001 From: Ryoji Kurosawa Date: Wed, 11 Oct 2023 01:22:13 +0900 Subject: [PATCH] add code to profile commit callback --- src/jogasaki/api/impl/commit_stats.h | 114 +++++++++++++++++++++++++++ src/jogasaki/api/impl/database.cpp | 3 + src/jogasaki/api/impl/database.h | 3 + src/jogasaki/durability_callback.cpp | 1 + src/jogasaki/executor/executor.cpp | 3 + src/jogasaki/transaction_context.h | 14 ++++ 6 files changed, 138 insertions(+) create mode 100644 src/jogasaki/api/impl/commit_stats.h diff --git a/src/jogasaki/api/impl/commit_stats.h b/src/jogasaki/api/impl/commit_stats.h new file mode 100644 index 000000000..c59cba505 --- /dev/null +++ b/src/jogasaki/api/impl/commit_stats.h @@ -0,0 +1,114 @@ +/* + * Copyright 2018-2023 Project Tsurugi. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include "../../third_party/nlohmann/json.hpp" + +#include +#include + +namespace jogasaki::api::impl { + +/** + * @brief error info object + * @details this object represents the error information of the API request + */ +class commit_stats { +public: + using time_point = commit_profile::time_point; + + /** + * @brief construct empty object + */ + commit_stats() = default; + + /** + * @brief copy construct + */ + commit_stats(commit_stats const&) = delete; + + /** + * @brief move construct + */ + commit_stats(commit_stats &&) = delete; + + /** + * @brief copy assign + */ + commit_stats& operator=(commit_stats const&) = delete; + + /** + * @brief move assign + */ + commit_stats& operator=(commit_stats &&) = delete; + + /** + * @brief destruct record + */ + ~commit_stats() = default; + + void add(commit_profile const& arg) { + if(arg.commit_requested_ != time_point{}) { + ++commits_requested_; + } + if(arg.precommit_cb_invoked_ != time_point{}) { + ++precommit_cb_invoked_; + precommit_duration_ns_ += std::chrono::duration_cast(arg.precommit_cb_invoked_ - arg.commit_requested_).count(); + } + if(arg.durability_cb_invoked_ != time_point{}) { + ++durable_cb_invoked_; + durability_duration_ns_ += std::chrono::duration_cast(arg.durability_cb_invoked_ - arg.precommit_cb_invoked_).count(); + } + if(arg.commit_job_completed_ != time_point{}) { + ++commit_job_completed_; + if(arg.durability_cb_invoked_ != time_point{}) { + notification_duration_ns_ += std::chrono::duration_cast(arg.commit_job_completed_ - arg.durability_cb_invoked_).count(); + } else if(arg.precommit_cb_invoked_ != time_point{}) { + notification_duration_ns_ += std::chrono::duration_cast(arg.commit_job_completed_ - arg.precommit_cb_invoked_).count(); + } + } + } + + void dump() { + using json = nlohmann::json; + try { + json j{}; + j["commits_requested"] = commits_requested_.load(); + j["precommit_cb_invoked"] = precommit_cb_invoked_.load(); + j["durable_cb_invoked"] = durable_cb_invoked_.load(); + j["commit_job_completed"] = commit_job_completed_.load(); + j["precommit_duration_ns"] = precommit_cb_invoked_.load() ? (precommit_duration_ns_.load() / precommit_cb_invoked_.load()) : 0; + j["durability_duration_ns"] = durable_cb_invoked_.load() ? (durability_duration_ns_.load() / durable_cb_invoked_.load()) : 0; + j["notification_duration_ns"] = commit_job_completed_.load() ? (notification_duration_ns_.load() / commit_job_completed_.load()) : 0; + LOG_LP(INFO) << "commit_profile " << j.dump(); + } catch (json::exception const& e) { + LOG_LP(INFO) << "json exception " << e.what(); + } + } +private: + std::atomic_size_t commits_requested_{}; + std::atomic_size_t precommit_cb_invoked_{}; + std::atomic_size_t durable_cb_invoked_{}; + std::atomic_size_t commit_job_completed_{}; + std::atomic_size_t precommit_duration_ns_{}; + std::atomic_size_t durability_duration_ns_{}; + std::atomic_size_t notification_duration_ns_{}; +}; + + +} + diff --git a/src/jogasaki/api/impl/database.cpp b/src/jogasaki/api/impl/database.cpp index 5f2051724..2e1f6c13e 100644 --- a/src/jogasaki/api/impl/database.cpp +++ b/src/jogasaki/api/impl/database.cpp @@ -195,6 +195,8 @@ status database::stop() { kvs_db_ = nullptr; } transactions_.clear(); + + commit_stats_->dump(); return status::ok; } @@ -602,6 +604,7 @@ status database::destroy_transaction( ) { decltype(transactions_)::accessor acc{}; if (transactions_.find(acc, handle)) { + commit_stats_->add(*acc->second->profile()); transactions_.erase(acc); } else { VLOG_LP(log_warning) << "invalid handle"; diff --git a/src/jogasaki/api/impl/database.h b/src/jogasaki/api/impl/database.h index 520d863fe..888ea2500 100644 --- a/src/jogasaki/api/impl/database.h +++ b/src/jogasaki/api/impl/database.h @@ -44,6 +44,7 @@ #include #include +#include "commit_stats.h" namespace jogasaki::scheduler { class task_scheduler; @@ -288,6 +289,8 @@ class database : public api::database { std::shared_ptr durability_manager_{std::make_shared()}; std::atomic_bool stop_requested_{false}; utils::use_counter requests_inprocess_{}; + std::shared_ptr commit_stats_{std::make_shared()}; + [[nodiscard]] status prepare_common( std::string_view sql, std::shared_ptr provider, diff --git a/src/jogasaki/durability_callback.cpp b/src/jogasaki/durability_callback.cpp index 2099dc6cc..e2bfb1295 100644 --- a/src/jogasaki/durability_callback.cpp +++ b/src/jogasaki/durability_callback.cpp @@ -45,6 +45,7 @@ void durability_callback::operator()(durability_callback::marker_type marker) { [marker](element_reference_type e){ VLOG(log_trace) << "/:jogasaki:durability_callback:operator() " << "--- current:" << marker << " txid:" << e->transaction()->transaction_id() << " marker:" << *e->transaction()->durability_marker(); + e->transaction()->profile()->durability_cb_invoked_ = commit_profile::clock::now(); scheduler::submit_teardown(*e, false, true); })) { scheduler::submit_teardown(*request_ctx); diff --git a/src/jogasaki/executor/executor.cpp b/src/jogasaki/executor/executor.cpp index ccff29d99..080d4a60e 100644 --- a/src/jogasaki/executor/executor.cpp +++ b/src/jogasaki/executor/executor.cpp @@ -568,12 +568,14 @@ scheduler::job_context::job_id_type commit_async( << txid << " job_id:" << utils::hex(jobid); + rctx->transaction()->profile()->commit_requested_ = commit_profile::clock::now(); [[maybe_unused]] auto b = rctx->transaction()->commit( [jobid, rctx, txid, &database, option]( ::sharksfin::StatusCode st, ::sharksfin::ErrorCode ec, ::sharksfin::durability_marker_type marker ){ + rctx->transaction()->profile()->precommit_cb_invoked_ = commit_profile::clock::now(); process_commit_callback(st, ec, marker, jobid, rctx, txid, database, option); }); return model::task_result::complete; @@ -587,6 +589,7 @@ scheduler::job_context::job_id_type commit_async( << txid << " status:" << (rctx->status_code() == status::ok ? "committed" : "aborted"); + rctx->transaction()->profile()->commit_job_completed_ = commit_profile::clock::now(); on_completion(rctx->status_code(), rctx->error_info()); }); std::weak_ptr wrctx{rctx}; diff --git a/src/jogasaki/transaction_context.h b/src/jogasaki/transaction_context.h index e940da3ed..e1da2effe 100644 --- a/src/jogasaki/transaction_context.h +++ b/src/jogasaki/transaction_context.h @@ -24,6 +24,16 @@ namespace jogasaki { +struct commit_profile { + using clock = std::chrono::steady_clock; + using time_point = std::chrono::time_point; + + time_point commit_requested_{}; + time_point precommit_cb_invoked_{}; + time_point durability_cb_invoked_{}; + time_point commit_job_completed_{}; +}; + namespace details { inline std::uint32_t upper(std::uint64_t arg) { @@ -251,6 +261,9 @@ class transaction_context { */ void durability_marker(std::optional arg) noexcept; + std::shared_ptr const& profile() const noexcept { + return profile_; + } private: std::shared_ptr transaction_{}; std::size_t id_{}; @@ -259,6 +272,7 @@ class transaction_context { std::shared_ptr error_info_{}; commit_response_kind commit_response_{commit_response_kind::undefined}; std::optional durability_marker_{}; + std::shared_ptr profile_{std::make_shared()}; static inline std::atomic_size_t id_source_{}; //NOLINT };