Skip to content

Commit

Permalink
improve commit callback profiling
Browse files Browse the repository at this point in the history
  • Loading branch information
kuron99 committed Oct 10, 2023
1 parent 04f7058 commit 7b4dc71
Show file tree
Hide file tree
Showing 11 changed files with 240 additions and 66 deletions.
9 changes: 9 additions & 0 deletions include/jogasaki/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,13 @@ class configuration {
return update_skips_deletion_;
}

void profile_commits(bool arg) noexcept {
profile_commits_ = arg;
}

[[nodiscard]] bool profile_commits() const noexcept {
return profile_commits_;
}
friend inline std::ostream& operator<<(std::ostream& out, configuration const& cfg) {
return out << std::boolalpha <<
"single_thread:" << cfg.single_thread() << " " <<
Expand Down Expand Up @@ -357,6 +364,7 @@ class configuration {
"worker_suspend_timeout:" << cfg.worker_suspend_timeout() << " " <<
"default_commit_response:" << cfg.default_commit_response() << " " <<
"update_skips_deletion:" << cfg.update_skips_deletion() << " " <<
"profile_commits:" << cfg.profile_commits() << " " <<
"";
}

Expand Down Expand Up @@ -391,6 +399,7 @@ class configuration {
std::size_t worker_suspend_timeout_ = 1000000;
commit_response_kind default_commit_response_{commit_response_kind::propagated};
bool update_skips_deletion_ = false;
bool profile_commits_ = false;
};

}
Expand Down
112 changes: 112 additions & 0 deletions src/jogasaki/api/impl/commit_stats.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright 2018-2023 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "commit_stats.h"

#include <memory>
#include "../../third_party/nlohmann/json.hpp"

#include <jogasaki/logging_helper.h>
#include <jogasaki/transaction_context.h>

namespace jogasaki::api::impl {

void commit_stats::dump() {
if(! enabled_) return;
using json = nlohmann::json;
try {
json j{};
j["count_commits_requested"] = commits_requested_.load();
j["count_precommit_cb_invoked"] = precommit_cb_invoked_.load();
j["count_durable_cb_invoked"] = durable_cb_invoked_.load();
j["count_commit_job_completed"] = commit_job_completed_.load();

j["min_duration_ns_precommit"] = min_precommit_duration_ns_.load();
j["max_duration_ns_precommit"] = max_precommit_duration_ns_.load();
j["avg_duration_ns_precommit"] = precommit_cb_invoked_.load() ? (sum_precommit_duration_ns_.load() / precommit_cb_invoked_.load()) : 0;

Check warning on line 38 in src/jogasaki/api/impl/commit_stats.cpp

View workflow job for this annotation

GitHub Actions / Clang-Tidy

readability-implicit-bool-conversion

implicit conversion 'std::__atomic_base<unsigned long>::__int_type' (aka 'unsigned long') -> bool

j["min_duration_ns_durability"] = min_durability_duration_ns_.load();
j["max_duration_ns_durability"] = max_durability_duration_ns_.load();
j["avg_duration_ns_durability"] = durable_cb_invoked_.load() ? (sum_durability_duration_ns_.load() / durable_cb_invoked_.load()) : 0;

Check warning on line 42 in src/jogasaki/api/impl/commit_stats.cpp

View workflow job for this annotation

GitHub Actions / Clang-Tidy

readability-implicit-bool-conversion

implicit conversion 'std::__atomic_base<unsigned long>::__int_type' (aka 'unsigned long') -> bool

j["min_duration_ns_notification"] = min_notification_duration_ns_.load();
j["max_duration_ns_notification"] = max_notification_duration_ns_.load();
j["avg_duration_ns_notification"] = commit_job_completed_.load() ? (sum_notification_duration_ns_.load() / commit_job_completed_.load()) : 0;

Check warning on line 46 in src/jogasaki/api/impl/commit_stats.cpp

View workflow job for this annotation

GitHub Actions / Clang-Tidy

readability-implicit-bool-conversion

implicit conversion 'std::__atomic_base<unsigned long>::__int_type' (aka 'unsigned long') -> bool
LOG_LP(INFO) << "commit_profile " << j.dump();
} catch (json::exception const& e) {
LOG_LP(INFO) << "json exception " << e.what();
}
}

void commit_stats::add(commit_profile const& arg) {
if(! enabled_) return;
if(arg.commit_requested_ != time_point{}) {
++commits_requested_;
}
if(arg.precommit_cb_invoked_ != time_point{}) {
++precommit_cb_invoked_;
auto e = std::chrono::duration_cast<std::chrono::nanoseconds>(arg.precommit_cb_invoked_ - arg.commit_requested_).count();
sum_precommit_duration_ns_ += e;
update_min(min_precommit_duration_ns_, e);
update_max(max_precommit_duration_ns_, e);
}
if(arg.durability_cb_invoked_ != time_point{}) {
++durable_cb_invoked_;
auto e = std::chrono::duration_cast<std::chrono::nanoseconds>(arg.durability_cb_invoked_ - arg.precommit_cb_invoked_).count();
sum_durability_duration_ns_ += e;
update_min(min_durability_duration_ns_, e);
update_max(max_durability_duration_ns_, e);
}
if(arg.commit_job_completed_ != time_point{}) {
++commit_job_completed_;
std::size_t e{};
if(arg.durability_cb_invoked_ != time_point{}) {
e = std::chrono::duration_cast<std::chrono::nanoseconds>(arg.commit_job_completed_ - arg.durability_cb_invoked_).count();
} else if(arg.precommit_cb_invoked_ != time_point{}) {
e = std::chrono::duration_cast<std::chrono::nanoseconds>(arg.commit_job_completed_ - arg.precommit_cb_invoked_).count();
}
sum_notification_duration_ns_ += e;
update_min(min_notification_duration_ns_, e);
update_max(max_notification_duration_ns_, e);
}
}

void commit_stats::update_min(std::atomic_size_t& target, std::size_t new_v) {
auto cur = target.load();
do {
if(new_v >= cur) {
return;
}
} while(target.compare_exchange_strong(cur, new_v));
}

void commit_stats::update_max(std::atomic_size_t& target, std::size_t new_v) {
auto cur = target.load();
do {
if(new_v <= cur) {
return;
}
} while(target.compare_exchange_strong(cur, new_v));
}

void commit_stats::enabled(bool arg) noexcept {
enabled_ = arg;
}

bool commit_stats::enabled() const noexcept {
return enabled_;
}
}

68 changes: 22 additions & 46 deletions src/jogasaki/api/impl/commit_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,15 @@
#pragma once

#include <memory>
#include "../../third_party/nlohmann/json.hpp"
#include <atomic>
#include <limits>

#include <jogasaki/logging_helper.h>
#include <jogasaki/transaction_context.h>
#include <jogasaki/commit_profile.h>

namespace jogasaki::api::impl {

/**
* @brief error info object
* @details this object represents the error information of the API request
* @brief commit profiling statistics
*/
class commit_stats {
public:
Expand Down Expand Up @@ -61,52 +60,29 @@ class commit_stats {
*/
~commit_stats() = default;

void add(commit_profile const& arg) {
if(arg.commit_requested_ != time_point{}) {
++commits_requested_;
}
if(arg.precommit_cb_invoked_ != time_point{}) {
++precommit_cb_invoked_;
precommit_duration_ns_ += std::chrono::duration_cast<std::chrono::nanoseconds>(arg.precommit_cb_invoked_ - arg.commit_requested_).count();
}
if(arg.durability_cb_invoked_ != time_point{}) {
++durable_cb_invoked_;
durability_duration_ns_ += std::chrono::duration_cast<std::chrono::nanoseconds>(arg.durability_cb_invoked_ - arg.precommit_cb_invoked_).count();
}
if(arg.commit_job_completed_ != time_point{}) {
++commit_job_completed_;
if(arg.durability_cb_invoked_ != time_point{}) {
notification_duration_ns_ += std::chrono::duration_cast<std::chrono::nanoseconds>(arg.commit_job_completed_ - arg.durability_cb_invoked_).count();
} else if(arg.precommit_cb_invoked_ != time_point{}) {
notification_duration_ns_ += std::chrono::duration_cast<std::chrono::nanoseconds>(arg.commit_job_completed_ - arg.precommit_cb_invoked_).count();
}
}
}

void dump() {
using json = nlohmann::json;
try {
json j{};
j["commits_requested"] = commits_requested_.load();
j["precommit_cb_invoked"] = precommit_cb_invoked_.load();
j["durable_cb_invoked"] = durable_cb_invoked_.load();
j["commit_job_completed"] = commit_job_completed_.load();
j["precommit_duration_ns"] = precommit_cb_invoked_.load() ? (precommit_duration_ns_.load() / precommit_cb_invoked_.load()) : 0;
j["durability_duration_ns"] = durable_cb_invoked_.load() ? (durability_duration_ns_.load() / durable_cb_invoked_.load()) : 0;
j["notification_duration_ns"] = commit_job_completed_.load() ? (notification_duration_ns_.load() / commit_job_completed_.load()) : 0;
LOG_LP(INFO) << "commit_profile " << j.dump();
} catch (json::exception const& e) {
LOG_LP(INFO) << "json exception " << e.what();
}
}
void add(commit_profile const& arg);
void dump();
void enabled(bool arg) noexcept;
[[nodiscard]] bool enabled() const noexcept;

private:
bool enabled_{false};
std::atomic_size_t commits_requested_{};
std::atomic_size_t precommit_cb_invoked_{};
std::atomic_size_t durable_cb_invoked_{};
std::atomic_size_t commit_job_completed_{};
std::atomic_size_t precommit_duration_ns_{};
std::atomic_size_t durability_duration_ns_{};
std::atomic_size_t notification_duration_ns_{};
std::atomic_size_t sum_precommit_duration_ns_{};
std::atomic_size_t sum_durability_duration_ns_{};
std::atomic_size_t sum_notification_duration_ns_{};
std::atomic_size_t min_precommit_duration_ns_{std::numeric_limits<std::size_t>::max()};
std::atomic_size_t min_durability_duration_ns_{std::numeric_limits<std::size_t>::max()};
std::atomic_size_t min_notification_duration_ns_{std::numeric_limits<std::size_t>::max()};
std::atomic_size_t max_precommit_duration_ns_{};
std::atomic_size_t max_durability_duration_ns_{};
std::atomic_size_t max_notification_duration_ns_{};

void update_min(std::atomic_size_t& target, std::size_t new_v);
void update_max(std::atomic_size_t& target, std::size_t new_v);
};


Expand Down
6 changes: 5 additions & 1 deletion src/jogasaki/api/impl/database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ void dump_public_configurations(configuration const& cfg) {
LOGCFG << "(worker_suspend_timeout) " << cfg.worker_suspend_timeout() << " : duration(us) between worker thread suspends and resumes";
LOGCFG << "(commit_response) " << cfg.default_commit_response() << " : commit notification timing default";
LOGCFG << "(dev_update_skips_deletion) " << cfg.update_skips_deletion() << " : whether update statement skips unnecessary deletion when possible";
LOGCFG << "(dev_profile_commits) " << cfg.profile_commits() << " : whether to profile commit/durability callbacks";
}

status database::start() {
Expand Down Expand Up @@ -164,6 +165,7 @@ status database::start() {
task_scheduler_->start();
}

commit_stats_->enabled(cfg_->profile_commits());
kvs_db_->register_durability_callback(durability_callback{*this});

return status::ok;
Expand Down Expand Up @@ -604,7 +606,9 @@ status database::destroy_transaction(
) {
decltype(transactions_)::accessor acc{};
if (transactions_.find(acc, handle)) {
commit_stats_->add(*acc->second->profile());
if(cfg_->profile_commits()) {
commit_stats_->add(*acc->second->profile());
}
transactions_.erase(acc);
} else {
VLOG_LP(log_warning) << "invalid handle";
Expand Down
3 changes: 3 additions & 0 deletions src/jogasaki/api/resource/bridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,9 @@ std::shared_ptr<jogasaki::configuration> convert_config_internal(tateyama::api::
if (auto v = jogasaki_config->get<bool>("dev_update_skips_deletion")) {
ret->update_skips_deletion(v.value());
}
if (auto v = jogasaki_config->get<bool>("dev_profile_commits")) {
ret->profile_commits(v.value());
}
return ret;
}

Expand Down
65 changes: 65 additions & 0 deletions src/jogasaki/commit_profile.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2018-2023 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <atomic>
#include <memory>

#include <jogasaki/commit_response.h>
#include <jogasaki/error/error_info.h>
#include <jogasaki/kvs/database.h>

namespace jogasaki {

struct commit_profile {
using clock = std::chrono::steady_clock;
using time_point = std::chrono::time_point<clock, std::chrono::nanoseconds>;

void enabled(bool arg) noexcept {
enabled_ = arg;
}

[[nodiscard]] bool enabled() const noexcept {
return enabled_;
}

void set_commit_requested() noexcept {
if(! enabled_) return;
commit_requested_ = clock::now();
}

void set_precommit_cb_invoked() noexcept {
if(! enabled_) return;
precommit_cb_invoked_ = clock::now();
}
void set_durability_cb_invoked(time_point arg) noexcept {
if(! enabled_) return;
durability_cb_invoked_ = arg;
}
void set_commit_job_completed() noexcept {
if(! enabled_) return;
commit_job_completed_ = clock::now();
}

bool enabled_{};
time_point commit_requested_{};
time_point precommit_cb_invoked_{};
time_point durability_cb_invoked_{};
time_point commit_job_completed_{};
};

}

11 changes: 8 additions & 3 deletions src/jogasaki/durability_callback.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,21 @@ void durability_callback::operator()(durability_callback::marker_type marker) {
request_ctx->job()->callback([request_ctx](){
(void) request_ctx;
});

commit_profile::time_point durability_callback_invoked{};
if(db_->config()->profile_commits()) {
durability_callback_invoked = commit_profile::clock::now();
}
scheduler_->schedule_task(
scheduler::create_custom_task(
request_ctx.get(),
[mgr=manager_, marker, request_ctx=request_ctx.get()](){ // capture request_ctx pointer to avoid cyclic dependency
[mgr=manager_, marker, request_ctx=request_ctx.get(), durability_callback_invoked](){ // capture request_ctx pointer to avoid cyclic dependency
if(mgr->update_current_marker(
marker,
[marker](element_reference_type e){
[marker, durability_callback_invoked](element_reference_type e){
VLOG(log_trace) << "/:jogasaki:durability_callback:operator() "
<< "--- current:" << marker << " txid:" << e->transaction()->transaction_id() << " marker:" << *e->transaction()->durability_marker();
e->transaction()->profile()->durability_cb_invoked_ = commit_profile::clock::now();
e->transaction()->profile()->set_durability_cb_invoked(durability_callback_invoked);
scheduler::submit_teardown(*e, false, true);
})) {
scheduler::submit_teardown(*request_ctx);
Expand Down
7 changes: 4 additions & 3 deletions src/jogasaki/executor/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -568,14 +568,14 @@ scheduler::job_context::job_id_type commit_async(
<< txid
<< " job_id:"
<< utils::hex(jobid);
rctx->transaction()->profile()->commit_requested_ = commit_profile::clock::now();
rctx->transaction()->profile()->set_commit_requested();
[[maybe_unused]] auto b = rctx->transaction()->commit(
[jobid, rctx, txid, &database, option](
::sharksfin::StatusCode st,
::sharksfin::ErrorCode ec,
::sharksfin::durability_marker_type marker
){
rctx->transaction()->profile()->precommit_cb_invoked_ = commit_profile::clock::now();
rctx->transaction()->profile()->set_precommit_cb_invoked();
process_commit_callback(st, ec, marker, jobid, rctx, txid, database, option);
});
return model::task_result::complete;
Expand All @@ -589,7 +589,7 @@ scheduler::job_context::job_id_type commit_async(
<< txid
<< " status:"
<< (rctx->status_code() == status::ok ? "committed" : "aborted");
rctx->transaction()->profile()->commit_job_completed_ = commit_profile::clock::now();
rctx->transaction()->profile()->set_commit_job_completed();
on_completion(rctx->status_code(), rctx->error_info());
});
std::weak_ptr wrctx{rctx};
Expand Down Expand Up @@ -618,6 +618,7 @@ status create_transaction(
return res;
}
out = std::move(ret);
out->profile()->enabled(db.config()->profile_commits());
return status::ok;
}

Expand Down
Loading

0 comments on commit 7b4dc71

Please sign in to comment.