Skip to content

Commit

Permalink
add code to profile commit callback
Browse files Browse the repository at this point in the history
  • Loading branch information
kuron99 committed Oct 10, 2023
1 parent 7bcb507 commit 04f7058
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 0 deletions.
114 changes: 114 additions & 0 deletions src/jogasaki/api/impl/commit_stats.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright 2018-2023 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <memory>
#include "../../third_party/nlohmann/json.hpp"

#include <jogasaki/logging_helper.h>
#include <jogasaki/transaction_context.h>

namespace jogasaki::api::impl {

/**
* @brief error info object
* @details this object represents the error information of the API request
*/
class commit_stats {
public:
using time_point = commit_profile::time_point;

/**
* @brief construct empty object
*/
commit_stats() = default;

/**
* @brief copy construct
*/
commit_stats(commit_stats const&) = delete;

/**
* @brief move construct
*/
commit_stats(commit_stats &&) = delete;

/**
* @brief copy assign
*/
commit_stats& operator=(commit_stats const&) = delete;

/**
* @brief move assign
*/
commit_stats& operator=(commit_stats &&) = delete;

/**
* @brief destruct record
*/
~commit_stats() = default;

void add(commit_profile const& arg) {
if(arg.commit_requested_ != time_point{}) {
++commits_requested_;
}
if(arg.precommit_cb_invoked_ != time_point{}) {
++precommit_cb_invoked_;
precommit_duration_ns_ += std::chrono::duration_cast<std::chrono::nanoseconds>(arg.precommit_cb_invoked_ - arg.commit_requested_).count();
}
if(arg.durability_cb_invoked_ != time_point{}) {
++durable_cb_invoked_;
durability_duration_ns_ += std::chrono::duration_cast<std::chrono::nanoseconds>(arg.durability_cb_invoked_ - arg.precommit_cb_invoked_).count();
}
if(arg.commit_job_completed_ != time_point{}) {
++commit_job_completed_;
if(arg.durability_cb_invoked_ != time_point{}) {
notification_duration_ns_ += std::chrono::duration_cast<std::chrono::nanoseconds>(arg.commit_job_completed_ - arg.durability_cb_invoked_).count();
} else if(arg.precommit_cb_invoked_ != time_point{}) {
notification_duration_ns_ += std::chrono::duration_cast<std::chrono::nanoseconds>(arg.commit_job_completed_ - arg.precommit_cb_invoked_).count();
}
}
}

void dump() {
using json = nlohmann::json;
try {
json j{};
j["commits_requested"] = commits_requested_.load();
j["precommit_cb_invoked"] = precommit_cb_invoked_.load();
j["durable_cb_invoked"] = durable_cb_invoked_.load();
j["commit_job_completed"] = commit_job_completed_.load();
j["precommit_duration_ns"] = precommit_cb_invoked_.load() ? (precommit_duration_ns_.load() / precommit_cb_invoked_.load()) : 0;
j["durability_duration_ns"] = durable_cb_invoked_.load() ? (durability_duration_ns_.load() / durable_cb_invoked_.load()) : 0;
j["notification_duration_ns"] = commit_job_completed_.load() ? (notification_duration_ns_.load() / commit_job_completed_.load()) : 0;
LOG_LP(INFO) << "commit_profile " << j.dump();
} catch (json::exception const& e) {
LOG_LP(INFO) << "json exception " << e.what();
}
}
private:
std::atomic_size_t commits_requested_{};
std::atomic_size_t precommit_cb_invoked_{};
std::atomic_size_t durable_cb_invoked_{};
std::atomic_size_t commit_job_completed_{};
std::atomic_size_t precommit_duration_ns_{};
std::atomic_size_t durability_duration_ns_{};
std::atomic_size_t notification_duration_ns_{};
};


}

3 changes: 3 additions & 0 deletions src/jogasaki/api/impl/database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ status database::stop() {
kvs_db_ = nullptr;
}
transactions_.clear();

commit_stats_->dump();
return status::ok;
}

Expand Down Expand Up @@ -602,6 +604,7 @@ status database::destroy_transaction(
) {
decltype(transactions_)::accessor acc{};
if (transactions_.find(acc, handle)) {
commit_stats_->add(*acc->second->profile());
transactions_.erase(acc);
} else {
VLOG_LP(log_warning) << "invalid handle";
Expand Down
3 changes: 3 additions & 0 deletions src/jogasaki/api/impl/database.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include <jogasaki/proto/metadata/storage.pb.h>

#include <tateyama/status.h>
#include "commit_stats.h"

namespace jogasaki::scheduler {
class task_scheduler;
Expand Down Expand Up @@ -288,6 +289,8 @@ class database : public api::database {
std::shared_ptr<durability_manager> durability_manager_{std::make_shared<durability_manager>()};
std::atomic_bool stop_requested_{false};
utils::use_counter requests_inprocess_{};
std::shared_ptr<commit_stats> commit_stats_{std::make_shared<commit_stats>()};

[[nodiscard]] status prepare_common(
std::string_view sql,
std::shared_ptr<yugawara::variable::configurable_provider> provider,
Expand Down
1 change: 1 addition & 0 deletions src/jogasaki/durability_callback.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ void durability_callback::operator()(durability_callback::marker_type marker) {
[marker](element_reference_type e){
VLOG(log_trace) << "/:jogasaki:durability_callback:operator() "
<< "--- current:" << marker << " txid:" << e->transaction()->transaction_id() << " marker:" << *e->transaction()->durability_marker();
e->transaction()->profile()->durability_cb_invoked_ = commit_profile::clock::now();
scheduler::submit_teardown(*e, false, true);
})) {
scheduler::submit_teardown(*request_ctx);
Expand Down
3 changes: 3 additions & 0 deletions src/jogasaki/executor/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -568,12 +568,14 @@ scheduler::job_context::job_id_type commit_async(
<< txid
<< " job_id:"
<< utils::hex(jobid);
rctx->transaction()->profile()->commit_requested_ = commit_profile::clock::now();
[[maybe_unused]] auto b = rctx->transaction()->commit(
[jobid, rctx, txid, &database, option](
::sharksfin::StatusCode st,
::sharksfin::ErrorCode ec,
::sharksfin::durability_marker_type marker
){
rctx->transaction()->profile()->precommit_cb_invoked_ = commit_profile::clock::now();
process_commit_callback(st, ec, marker, jobid, rctx, txid, database, option);
});
return model::task_result::complete;
Expand All @@ -587,6 +589,7 @@ scheduler::job_context::job_id_type commit_async(
<< txid
<< " status:"
<< (rctx->status_code() == status::ok ? "committed" : "aborted");
rctx->transaction()->profile()->commit_job_completed_ = commit_profile::clock::now();
on_completion(rctx->status_code(), rctx->error_info());
});
std::weak_ptr wrctx{rctx};
Expand Down
14 changes: 14 additions & 0 deletions src/jogasaki/transaction_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@

namespace jogasaki {

struct commit_profile {
using clock = std::chrono::steady_clock;
using time_point = std::chrono::time_point<clock, std::chrono::nanoseconds>;

time_point commit_requested_{};
time_point precommit_cb_invoked_{};
time_point durability_cb_invoked_{};
time_point commit_job_completed_{};
};

namespace details {

inline std::uint32_t upper(std::uint64_t arg) {
Expand Down Expand Up @@ -251,6 +261,9 @@ class transaction_context {
*/
void durability_marker(std::optional<durability_marker_type> arg) noexcept;

std::shared_ptr<commit_profile> const& profile() const noexcept {
return profile_;
}
private:
std::shared_ptr<kvs::transaction> transaction_{};
std::size_t id_{};
Expand All @@ -259,6 +272,7 @@ class transaction_context {
std::shared_ptr<error::error_info> error_info_{};
commit_response_kind commit_response_{commit_response_kind::undefined};
std::optional<durability_marker_type> durability_marker_{};
std::shared_ptr<commit_profile> profile_{std::make_shared<commit_profile>()};

static inline std::atomic_size_t id_source_{}; //NOLINT
};
Expand Down

0 comments on commit 04f7058

Please sign in to comment.