Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support histograms for command latency statistics #2721

Open
wants to merge 17 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ option(ENABLE_STATIC_LIBSTDCXX "link kvrocks with static library of libstd++ ins
option(ENABLE_LUAJIT "enable use of luaJIT instead of lua" ON)
option(ENABLE_OPENSSL "enable openssl to support tls connection" OFF)
option(ENABLE_IPO "enable interprocedural optimization" ON)
option(ENABLE_HISTOGRAMS "enable histograms to view the operation latencies" OFF)
rabunkosar-dd marked this conversation as resolved.
Show resolved Hide resolved
set(SYMBOLIZE_BACKEND "" CACHE STRING "symbolization backend library for cpptrace (libbacktrace, libdwarf, or empty)")
set(PORTABLE 0 CACHE STRING "build a portable binary (disable arch-specific optimizations)")
# TODO: set ENABLE_NEW_ENCODING to ON when we are ready
Expand Down Expand Up @@ -288,6 +289,11 @@ if(ENABLE_IPO)
endif()
endif()

if(ENABLE_HISTOGRAMS)
target_compile_definitions(kvrocks_objs PUBLIC ENABLE_HISTOGRAMS)
endif()


# kvrocks main target
add_executable(kvrocks src/cli/main.cc)
target_link_libraries(kvrocks PRIVATE kvrocks_objs ${EXTERNAL_LIBS})
Expand Down
12 changes: 12 additions & 0 deletions kvrocks.conf
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,18 @@ json-storage-format json
# Default: no
txn-context-enabled no

# Define the histogram bucket values.
#
# If enabled, those values will be used to store the command execution latency values
# in buckets defined below. The values should be integers and must be sorted.
# An implicit bucket (+Inf in prometheus jargon) will be added to track the highest values
# that are beyond the bucket limits.

# NOTE: This is an experimental feature. There might be some performance overhead when using this
# feature, please be aware.
# Default: 10,20,40,60,80,100,150,250,350,500,750,1000,1500,2000,4000,8000
# histogram-bucket-boundaries 10,20,40,60,80,100,150,250,350,500,750,1000,1500,2000,4000,8000

################################## TLS ###################################

# By default, TLS/SSL is disabled, i.e. `tls-port` is set to 0.
Expand Down
27 changes: 26 additions & 1 deletion src/config/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,10 @@ Config::Config() {
new EnumField<JsonStorageFormat>(&json_storage_format, json_storage_formats, JsonStorageFormat::JSON)},
{"txn-context-enabled", true, new YesNoField(&txn_context_enabled, false)},
{"skip-block-cache-deallocation-on-close", false, new YesNoField(&skip_block_cache_deallocation_on_close, false)},

#ifdef ENABLE_HISTOGRAMS
{"histogram-bucket-boundaries", true, new StringField(&histogram_bucket_boundaries_str_,
"10,20,40,60,80,100,150,250,350,500,750,1000,1500,2000,4000,8000")},
#endif
/* rocksdb options */
{"rocksdb.compression", false,
new EnumField<rocksdb::CompressionType>(&rocks_db.compression, compression_types,
Expand Down Expand Up @@ -754,6 +757,28 @@ void Config::initFieldCallback() {
{"tls-session-caching", set_tls_option},
{"tls-session-cache-size", set_tls_option},
{"tls-session-cache-timeout", set_tls_option},
#endif
#ifdef ENABLE_HISTOGRAMS
{"histogram-bucket-boundaries",
[this]([[maybe_unused]] Server *srv, [[maybe_unused]] const std::string &k, const std::string &v) -> Status {
std::vector<std::string> buckets = util::Split(v, ",");
histogram_bucket_boundaries.clear();
if (buckets.size() < 1) {
return {Status::NotOK, "Please provide at least 1 bucket value for histogram"};
}
std::transform(buckets.begin(), buckets.end(), std::back_inserter(histogram_bucket_boundaries), [](const std::string& val)
{
return std::stod(val);
});
if (histogram_bucket_boundaries.size() != buckets.size()) {
return {Status::NotOK, "All values for the bucket must be double or integer values"};
}

if (!std::is_sorted(histogram_bucket_boundaries.begin(), histogram_bucket_boundaries.end())) {
return {Status::NotOK, "The values for the histogram must be sorted"};
}
return Status::OK();
}},
#endif
};
for (const auto &iter : callbacks) {
Expand Down
8 changes: 8 additions & 0 deletions src/config/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ struct Config {

bool skip_block_cache_deallocation_on_close = false;

#ifdef ENABLE_HISTOGRAMS
std::vector<double> histogram_bucket_boundaries;
#endif

struct RocksDB {
int block_size;
bool cache_index_and_filter_blocks;
Expand Down Expand Up @@ -260,6 +264,10 @@ struct Config {
std::string profiling_sample_commands_str_;
std::map<std::string, std::unique_ptr<ConfigField>> fields_;
std::vector<std::string> rename_command_;
#ifdef ENABLE_HISTOGRAMS
std::string histogram_bucket_boundaries_str_;
#endif


void initFieldValidator();
void initFieldCallback();
Expand Down
38 changes: 37 additions & 1 deletion src/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,11 @@
#include "worker.h"

Server::Server(engine::Storage *storage, Config *config)
: storage(storage),
:
#ifdef ENABLE_HISTOGRAMS
stats(config),
#endif
storage(storage),
indexer(storage),
index_mgr(&indexer, storage),
start_time_secs_(util::GetTimeStamp()),
Expand All @@ -63,6 +67,17 @@ Server::Server(engine::Storage *storage, Config *config)
for (const auto &iter : *commands) {
stats.commands_stats[iter.first].calls = 0;
stats.commands_stats[iter.first].latency = 0;

#ifdef ENABLE_HISTOGRAMS
//NB: Extra index for the last bucket (Inf)
for (std::size_t i{0}; i <= stats.bucket_boundaries.size(); ++i) {
auto bucket_ptr = std::shared_ptr<std::atomic<uint64_t>>(new std::atomic<uint64_t>(0));

stats.commands_histogram[iter.first].buckets.push_back(bucket_ptr);
}
stats.commands_histogram[iter.first].calls = 0;
stats.commands_histogram[iter.first].sum = 0;
#endif
}

// init cursor_dict_
Expand Down Expand Up @@ -1165,6 +1180,27 @@ void Server::GetCommandsStatsInfo(std::string *info) {
<< ",usec_per_call=" << static_cast<float>(latency / calls) << "\r\n";
}

#ifdef ENABLE_HISTOGRAMS
for (const auto &cmd_hist : stats.commands_histogram) {
auto command_name = cmd_hist.first;
auto calls = stats.commands_histogram[command_name].calls.load();
if (calls == 0) continue;

auto sum = stats.commands_histogram[command_name].sum.load();
string_stream << "cmdstathist_" << command_name << ":";
for (std::size_t i{0}; i < stats.commands_histogram[command_name].buckets.size(); ++i) {
auto bucket_value = stats.commands_histogram[command_name].buckets[i]->load();
auto bucket_bound = std::numeric_limits<double>::infinity();
if (i < stats.bucket_boundaries.size()) {
bucket_bound = stats.bucket_boundaries[i];
}

string_stream << bucket_bound << "=" << bucket_value << ",";
}
string_stream << "sum=" << sum << ",count=" << calls << "\r\n";
}
#endif

*info = string_stream.str();
}

Expand Down
28 changes: 28 additions & 0 deletions src/stats/stats.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,23 @@
#include "fmt/format.h"
#include "time_util.h"


#ifdef ENABLE_HISTOGRAMS
Stats::Stats(Config *config)
: config_(config) {
for (int i = 0; i < STATS_METRIC_COUNT; i++) {
InstMetric im;
im.last_sample_time_ms = 0;
im.last_sample_count = 0;
im.idx = 0;
for (uint64_t &sample : im.samples) {
sample = 0;
}
inst_metrics.push_back(im);
}
bucket_boundaries = config_->histogram_bucket_boundaries;
}
#else
Stats::Stats() {
for (int i = 0; i < STATS_METRIC_COUNT; i++) {
InstMetric im;
Expand All @@ -38,6 +55,7 @@ Stats::Stats() {
inst_metrics.push_back(im);
}
}
#endif

#if defined(__APPLE__)
#include <mach/mach_init.h>
Expand Down Expand Up @@ -86,10 +104,20 @@ int64_t Stats::GetMemoryRSS() {
void Stats::IncrCalls(const std::string &command_name) {
total_calls.fetch_add(1, std::memory_order_relaxed);
commands_stats[command_name].calls.fetch_add(1, std::memory_order_relaxed);
#ifdef ENABLE_HISTOGRAMS
commands_histogram[command_name].calls.fetch_add(1, std::memory_order_relaxed);
#endif
}

void Stats::IncrLatency(uint64_t latency, const std::string &command_name) {
commands_stats[command_name].latency.fetch_add(latency, std::memory_order_relaxed);
#ifdef ENABLE_HISTOGRAMS
commands_histogram[command_name].sum.fetch_add(latency, std::memory_order_relaxed);

const auto bucket_index = static_cast<std::size_t>(std::distance(
bucket_boundaries.begin(), std::lower_bound(bucket_boundaries.begin(), bucket_boundaries.end(), latency)));
commands_histogram[command_name].buckets[bucket_index]->fetch_add(1, std::memory_order_relaxed);
#endif
}

void Stats::TrackInstantaneousMetric(int metric, uint64_t current_reading) {
Expand Down
27 changes: 27 additions & 0 deletions src/stats/stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
#include <shared_mutex>
#include <string>
#include <vector>
#ifdef ENABLE_HISTOGRAMS
#include <algorithm>
#include "config/config.h"
rabunkosar-dd marked this conversation as resolved.
Show resolved Hide resolved
#endif

enum StatsMetricFlags {
STATS_METRIC_COMMAND = 0, // Number of commands executed
Expand All @@ -43,6 +47,15 @@ enum StatsMetricFlags {

constexpr int STATS_METRIC_SAMPLES = 16; // Number of samples per metric

#ifdef ENABLE_HISTOGRAMS
// Experimental part to support histograms for cmd statistics
struct CommandHistogram {
std::vector<std::shared_ptr<std::atomic<uint64_t>>> buckets;
std::atomic<uint64_t> calls;
std::atomic<uint64_t> sum;
};
#endif

struct CommandStat {
std::atomic<uint64_t> calls;
std::atomic<uint64_t> latency;
Expand All @@ -57,6 +70,14 @@ struct InstMetric {

class Stats {
public:
#ifdef ENABLE_HISTOGRAMS
using BucketBoundaries = std::vector<double>;
BucketBoundaries bucket_boundaries;
std::map<std::string, CommandHistogram> commands_histogram;

Config *config_ = nullptr;
#endif

std::atomic<uint64_t> total_calls = {0};
std::atomic<uint64_t> in_bytes = {0};
std::atomic<uint64_t> out_bytes = {0};
Expand All @@ -69,7 +90,13 @@ class Stats {
std::atomic<uint64_t> psync_ok_count = {0};
std::map<std::string, CommandStat> commands_stats;


#ifdef ENABLE_HISTOGRAMS
explicit Stats(Config *config);
#else
Stats();
#endif

void IncrCalls(const std::string &command_name);
void IncrLatency(uint64_t latency, const std::string &command_name);
void IncrInboundBytes(uint64_t bytes) { in_bytes.fetch_add(bytes, std::memory_order_relaxed); }
Expand Down
4 changes: 4 additions & 0 deletions tests/cppunit/config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ TEST(Config, GetAndSet) {
{"rocksdb.rate_limiter_auto_tuned", "yes"},
{"rocksdb.compression_level", "32767"},
{"rocksdb.wal_compression", "no"},
#ifdef ENABLE_HISTOGRAMS
{"histogram-bucket-boundaries", "10,100,1000,10000"},
#endif

};
for (const auto &iter : immutable_cases) {
s = config.Set(nullptr, iter.first, iter.second);
Expand Down
23 changes: 22 additions & 1 deletion tests/gocase/unit/info/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"context"
"fmt"
"strconv"
"strings"
"testing"
"time"

Expand All @@ -33,7 +34,10 @@ import (
)

func TestInfo(t *testing.T) {
srv0 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
srv0 := util.StartServer(t, map[string]string{
"cluster-enabled": "yes",
"histogram-bucket-boundaries": "10,20,30,50",
})
defer func() { srv0.Close() }()
rdb0 := srv0.NewClient()
defer func() { require.NoError(t, rdb0.Close()) }()
Expand Down Expand Up @@ -102,6 +106,23 @@ func TestInfo(t *testing.T) {
t.Run("get cluster information by INFO - cluster enabled", func(t *testing.T) {
require.Equal(t, "1", util.FindInfoEntry(rdb0, "cluster_enabled", "cluster"))
})

t.Run("get command latencies via histogram INFO - histogram-bucket-boundaries", func(t *testing.T) {
output := util.FindInfoEntry(rdb0, "cmdstathist", "cmdstathist_info")
if len(output) == 0 {
t.SkipNow()
}

splitValues := strings.FieldsFunc(output, func(r rune) bool {
return r == '=' || r == ','
})

// expected: 10=..,20=..,30=..,50=..,inf=..,sum=...,count=..
require.GreaterOrEqual(t, len(splitValues), 15)
require.Contains(t, splitValues, "sum")
require.Contains(t, splitValues, "count")
require.Contains(t, splitValues, "info")
})
}

func TestKeyspaceHitMiss(t *testing.T) {
Expand Down
Loading