Skip to content

Commit

Permalink
Feature COMMANDLOG to record slow execution and large request/reply (v…
Browse files Browse the repository at this point in the history
…alkey-io#1294)

As discussed in PR valkey-io#336.

We have different types of resources like CPU, memory, network, etc. The
`slowlog` can only record commands that eat lots of CPU during the processing
phase (which doesn't include read/write network time), but it cannot record
commands that eat too much memory and network bandwidth. For example:

1. Running the "SET key value(10 megabytes)" command would not be recorded in
the slowlog, because while processing it the SET command only inserts the
value's pointer into the db dict. But that command eats huge memory in the query
buffer and bandwidth from the network. In this case, just 1000 tps can cause
10GB/s of network flow.
2. Running the "GET key" command when the key's value is 10 megabytes long. The
GET command can eat huge memory in the output buffer and bandwidth to the
network.

This PR introduces a new command `COMMANDLOG`, to log commands that
consume significant network bandwidth, including both input and output.
Users can retrieve the results using `COMMANDLOG get <count>
large-request` and `COMMANDLOG get <count> large-reply`, all subcommands
for `COMMANDLOG` are:

* `COMMANDLOG HELP`
* `COMMANDLOG GET <count> <slow|large-request|large-reply>`
* `COMMANDLOG LEN <slow|large-request|large-reply>`
* `COMMANDLOG RESET <slow|large-request|large-reply>`

And the slowlog is also incorporated into the commandlog.

For each of these three types, additional configs have been added for
control:

* `commandlog-request-larger-than` and
`commandlog-large-request-max-len` represent the threshold for large
requests(the unit is Bytes) and the maximum number of commands that can
be recorded.
* `commandlog-reply-larger-than` and `commandlog-large-reply-max-len`
represent the threshold for large replies(the unit is Bytes) and the
maximum number of commands that can be recorded.
* `commandlog-execution-slower-than` and
`commandlog-slow-execution-max-len` represent the threshold for slow
executions(the unit is microseconds) and the maximum number of commands
that can be recorded.
* Additionally, `slowlog-log-slower-than` and `slowlog-max-len` are now
set as aliases for these two new configs.

---------

Signed-off-by: zhaozhao.zz <[email protected]>
Co-authored-by: Madelyn Olson <[email protected]>
Co-authored-by: Ping Xie <[email protected]>
  • Loading branch information
3 people authored and kronwerk committed Jan 27, 2025
1 parent 9df9d61 commit ca68e8a
Show file tree
Hide file tree
Showing 27 changed files with 1,138 additions and 292 deletions.
2 changes: 1 addition & 1 deletion cmake/Modules/SourceFiles.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ set(VALKEY_SERVER_SRCS
${CMAKE_SOURCE_DIR}/src/cluster_slot_stats.c
${CMAKE_SOURCE_DIR}/src/crc16.c
${CMAKE_SOURCE_DIR}/src/endianconv.c
${CMAKE_SOURCE_DIR}/src/slowlog.c
${CMAKE_SOURCE_DIR}/src/commandlog.c
${CMAKE_SOURCE_DIR}/src/eval.c
${CMAKE_SOURCE_DIR}/src/bio.c
${CMAKE_SOURCE_DIR}/src/rio.c
Expand Down
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ endif
ENGINE_NAME=valkey
SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX)
ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX)
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o hashtable.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o rdma.o scripting_engine.o
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o hashtable.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o commandlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o rdma.o scripting_engine.o
ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX)
ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX)
Expand Down
16 changes: 8 additions & 8 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
*/

#include "server.h"
#include "slowlog.h"
#include "commandlog.h"
#include "latency.h"
#include "monotonic.h"
#include "cluster_slot_stats.h"
Expand Down Expand Up @@ -117,15 +117,15 @@ void blockClient(client *c, int btype) {
* he will attempt to reprocess the command which will update the statistics.
* However in case the client was timed out or in case of module blocked client is being unblocked
* the command will not be reprocessed and we need to make stats update.
* This function will make updates to the commandstats, slot-stats, slowlog and monitors.
* This function will make updates to the commandstats, slot-stats, commandlog and monitors.
* The failed_or_rejected parameter is an indication that the blocked command was either failed internally or
* rejected/aborted externally. In case the command was rejected the value ERROR_COMMAND_REJECTED should be passed.
* In case the command failed internally, ERROR_COMMAND_FAILED should be passed.
* A value of zero indicate no error was reported after the command was unblocked */
void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int failed_or_rejected) {
const ustime_t total_cmd_duration = c->duration + blocked_us + reply_us;
c->lastcmd->microseconds += total_cmd_duration;
clusterSlotStatsAddCpuDuration(c, total_cmd_duration);
c->duration += blocked_us + reply_us;
c->lastcmd->microseconds += c->duration;
clusterSlotStatsAddCpuDuration(c, c->duration);
c->lastcmd->calls++;
c->commands_processed++;
server.stat_numcommands++;
Expand All @@ -139,9 +139,9 @@ void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int failed_
debugServerAssertWithInfo(c, NULL, 0);
}
if (server.latency_tracking_enabled)
updateCommandLatencyHistogram(&(c->lastcmd->latency_histogram), total_cmd_duration * 1000);
/* Log the command into the Slow log if needed. */
slowlogPushCurrentCommand(c, c->lastcmd, total_cmd_duration);
updateCommandLatencyHistogram(&(c->lastcmd->latency_histogram), c->duration * 1000);
/* Log the command into the commandlog if needed. */
commandlogPushCurrentCommand(c, c->lastcmd);
c->duration = 0;
/* Log the reply duration event. */
latencyAddSampleIfNeeded("command-unblocking", reply_us / 1000);
Expand Down
265 changes: 265 additions & 0 deletions src/commandlog.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
/* Commandlog implements a system that is able to remember the latest N
* queries that took more than M microseconds to execute, or consumed
* too much network bandwidth and memory for input/output buffers.
*
* The execution time at or above which a command is logged in the slow log
* is set using the 'commandlog-execution-slower-than' config directive, which
* is also readable and writable using the CONFIG SET/GET command.
*
* Other configurations such as `commandlog-request-larger-than` and
* `commandlog-reply-larger-than` can be found with more detailed
* explanations in the config file.
*
* The command log is actually not "logged" in the server log file
* but is accessible thanks to the COMMANDLOG command.
*
* ----------------------------------------------------------------------------
*
* Copyright Valkey Contributors.
* All rights reserved.
* SPDX-License-Identifier: BSD 3-Clause
*/

#include "commandlog.h"
#include "script.h"

/* Build a commandlog entry describing one executed command.
 *
 * c     - client whose peer id and name are copied into the entry.
 * argv  - argument vector of the command being logged.
 * argc  - number of elements in argv.
 * value - measurement stored in the entry (its meaning depends on 'type').
 * type  - index of the commandlog whose entry id counter is consumed.
 *
 * The entry owns (or holds a reference to) every object it stores, so the
 * caller does not need to retain anything on its behalf. */
static commandlogEntry *commandlogCreateEntry(client *c, robj **argv, int argc, long long value, int type) {
    commandlogEntry *entry = zmalloc(sizeof(*entry));
    int kept = argc > COMMANDLOG_ENTRY_MAX_ARGC ? COMMANDLOG_ENTRY_MAX_ARGC : argc;
    int truncated = (kept != argc);

    entry->argc = kept;
    entry->argv = zmalloc(sizeof(robj *) * kept);

    for (int i = 0; i < kept; i++) {
        /* Storing an unbounded number of arguments would waste memory, so
         * when the vector was cut at COMMANDLOG_ENTRY_MAX_ARGC the final slot
         * is replaced by a note telling how many arguments were left out. */
        if (truncated && i == kept - 1) {
            sds note = sdscatprintf(sdsempty(), "... (%d more arguments)", argc - kept + 1);
            entry->argv[i] = createObject(OBJ_STRING, note);
            continue;
        }

        robj *arg = argv[i];

        /* Over-long string arguments are cut as well, with a suffix that
         * reports how many bytes were dropped. */
        if (arg->type == OBJ_STRING && sdsEncodedObject(arg) && sdslen(arg->ptr) > COMMANDLOG_ENTRY_MAX_STRING) {
            unsigned long dropped = (unsigned long)sdslen(arg->ptr) - COMMANDLOG_ENTRY_MAX_STRING;
            sds head = sdsnewlen(arg->ptr, COMMANDLOG_ENTRY_MAX_STRING);

            head = sdscatprintf(head, "... (%lu more bytes)", dropped);
            entry->argv[i] = createObject(OBJ_STRING, head);
            continue;
        }

        /* Objects carrying the shared refcount marker are stored as-is. */
        if (arg->refcount == OBJ_SHARED_REFCOUNT) {
            entry->argv[i] = arg;
            continue;
        }

        /* Everything else is duplicated: the original objects could end up
         * shared with string objects stored into keys, and sharing objects
         * between the log and the dataset is a problem, since FLUSHALL ASYNC
         * may release the shared string object and create a race. */
        entry->argv[i] = dupStringObject(arg);
    }

    entry->time = time(NULL);
    entry->value = value;
    entry->id = server.commandlog[type].entry_id++;
    entry->peerid = sdsnew(getClientPeerId(c));
    if (c->name) {
        entry->cname = sdsnew(c->name->ptr);
    } else {
        entry->cname = sdsempty();
    }
    return entry;
}

/* Release a commandlog entry together with everything it retains.
 *
 * The parameter is an untyped pointer so that this function can be installed
 * as the 'free' method of an adlist.c list. */
static void commandlogFreeEntry(void *ceptr) {
    commandlogEntry *entry = ceptr;

    /* Drop the reference held on each logged argument, then the vector. */
    for (int i = 0; i < entry->argc; i++) {
        decrRefCount(entry->argv[i]);
    }
    zfree(entry->argv);

    /* Client identification strings are owned by the entry. */
    sdsfree(entry->peerid);
    sdsfree(entry->cname);

    zfree(entry);
}

/* Set up every commandlog type: an empty entry list whose nodes are released
 * with commandlogFreeEntry, and an entry id counter starting at zero.
 *
 * Meant to be called exactly once, at server startup. */
void commandlogInit(void) {
    int type = 0;

    while (type < COMMANDLOG_TYPE_NUM) {
        server.commandlog[type].entry_id = 0;
        server.commandlog[type].entries = listCreate();
        listSetFreeMethod(server.commandlog[type].entries, commandlogFreeEntry);
        type++;
    }
}

/* Record a command in the commandlog of the given type when 'value' reaches
 * that log's threshold, then trim the log down to its configured maximum
 * length by discarding entries from the tail.
 *
 * Nothing happens when the log is disabled, i.e. when its threshold is
 * negative or its maximum length is zero. */
static void commandlogPushEntryIfNeeded(client *c, robj **argv, int argc, long long value, int type) {
    /* The corresponding commandlog disabled */
    if (server.commandlog[type].threshold < 0) return;
    if (server.commandlog[type].max_len == 0) return;

    if (value >= server.commandlog[type].threshold) {
        commandlogEntry *entry = commandlogCreateEntry(c, argv, argc, value, type);
        listAddNodeHead(server.commandlog[type].entries, entry);
    }

    /* New entries go to the head, so the oldest ones sit at the tail. */
    for (;;) {
        if (listLength(server.commandlog[type].entries) <= server.commandlog[type].max_len) break;
        listDelNode(server.commandlog[type].entries, listLast(server.commandlog[type].entries));
    }
}

/* Empty the commandlog of the specified type, deleting entries one at a time
 * from the tail until none are left. */
static void commandlogReset(int type) {
    for (;;) {
        if (listLength(server.commandlog[type].entries) == 0) break;
        listDelNode(server.commandlog[type].entries, listLast(server.commandlog[type].entries));
    }
}

/* Send up to 'count' entries of the given commandlog type to the client,
 * walking the list from its head.
 *
 * The reply is an array with one element per entry; each element is itself a
 * six-item array: id, timestamp, value, argument array, peer id, client
 * name. 'count' is clamped to the current length of the log. */
static void commandlogGetReply(client *c, int type, long count) {
    listIter iter;

    long available = (long)listLength(server.commandlog[type].entries);
    if (count > available) {
        count = listLength(server.commandlog[type].entries);
    }

    addReplyArrayLen(c, count);
    listRewind(server.commandlog[type].entries, &iter);

    for (; count != 0; count--) {
        listNode *node = listNext(&iter);
        commandlogEntry *entry = node->value;

        addReplyArrayLen(c, 6);
        addReplyLongLong(c, entry->id);
        addReplyLongLong(c, entry->time);
        addReplyLongLong(c, entry->value);

        /* Logged arguments, possibly already trimmed at creation time. */
        addReplyArrayLen(c, entry->argc);
        int i = 0;
        while (i < entry->argc) {
            addReplyBulk(c, entry->argv[i]);
            i++;
        }

        addReplyBulkCBuffer(c, entry->peerid, sdslen(entry->peerid));
        addReplyBulkCBuffer(c, entry->cname, sdslen(entry->cname));
    }
}

/* Log the last command a client executed into the commandlog.
 *
 * c   - the client that executed the command. Its argument vector and its
 *       per-command metrics (duration, network input bytes, network output
 *       bytes) are what gets offered to the logs.
 * cmd - the executed command; commands flagged CMD_SKIP_COMMANDLOG are never
 *       logged.
 *
 * One entry is offered to each commandlog type (slow, large-request,
 * large-reply); every log applies its own threshold and max length. */
void commandlogPushCurrentCommand(client *c, struct serverCommand *cmd) {
    /* Some commands may contain sensitive data that should not be available
     * in the commandlog. */
    if (cmd->flags & CMD_SKIP_COMMANDLOG) return;

    /* If command argument vector was rewritten, use the original
     * arguments. */
    robj **argv = c->original_argv ? c->original_argv : c->argv;
    int argc = c->original_argv ? c->original_argc : c->argc;

    /* Read the metrics from the client that actually executed the command,
     * i.e. the same client the argument vector above comes from. They must be
     * captured before switching to the script caller below, otherwise the
     * logged arguments would be paired with measurements belonging to a
     * different client. */
    long long duration = c->duration;
    long long input_bytes = c->net_input_bytes_curr_cmd;
    long long output_bytes = c->net_output_bytes_curr_cmd;

    /* If a script is currently running, the client passed in is a
     * fake client. Or the client passed in is the original client
     * if this is a EVAL or alike, doesn't matter. In this case,
     * use the original client to get the client information
     * (peer id and name) only. */
    client *origin = scriptIsRunning() ? scriptGetCaller() : c;

    commandlogPushEntryIfNeeded(origin, argv, argc, duration, COMMANDLOG_TYPE_SLOW);
    commandlogPushEntryIfNeeded(origin, argv, argc, input_bytes, COMMANDLOG_TYPE_LARGE_REQUEST);
    commandlogPushEntryIfNeeded(origin, argv, argc, output_bytes, COMMANDLOG_TYPE_LARGE_REPLY);
}

/* SLOWLOG command entry point.
 *
 * Dispatches the HELP, RESET, LEN and GET subcommands, all of which operate
 * on the slow-execution commandlog. Any other form gets a subcommand syntax
 * error reply. */
void slowlogCommand(client *c) {
    if (c->argc == 2 && strcasecmp(c->argv[1]->ptr, "help") == 0) {
        const char *help[] = {
            "GET [<count>]",
            " Return top <count> entries from the slowlog (default: 10, -1 mean all).",
            " Entries are made of:",
            " id, timestamp, time in microseconds, arguments array, client IP and port,",
            " client name",
            "LEN",
            " Return the length of the slowlog.",
            "RESET",
            " Reset the slowlog.",
            NULL,
        };
        addReplyHelp(c, help);
        return;
    }

    if (c->argc == 2 && strcasecmp(c->argv[1]->ptr, "reset") == 0) {
        commandlogReset(COMMANDLOG_TYPE_SLOW);
        addReply(c, shared.ok);
        return;
    }

    if (c->argc == 2 && strcasecmp(c->argv[1]->ptr, "len") == 0) {
        addReplyLongLong(c, listLength(server.commandlog[COMMANDLOG_TYPE_SLOW].entries));
        return;
    }

    /* Only GET, with or without an explicit count, is left. */
    if ((c->argc != 2 && c->argc != 3) || strcasecmp(c->argv[1]->ptr, "get") != 0) {
        addReplySubcommandSyntaxError(c);
        return;
    }

    long count = 10;
    if (c->argc == 3) {
        /* Consume count arg. */
        int parsed = getRangeLongFromObjectOrReply(c, c->argv[2], -1, LONG_MAX, &count,
                                                   "count should be greater than or equal to -1");
        if (parsed != C_OK) return;

        /* -1 is a special value meaning "all slow logs": replace it with
         * the current length of the slow commandlog. */
        if (count == -1) {
            count = listLength(server.commandlog[COMMANDLOG_TYPE_SLOW].entries);
        }
    }

    commandlogGetReply(c, COMMANDLOG_TYPE_SLOW, count);
}

/* Map a type name given by the client ("slow", "large-request" or
 * "large-reply", matched case-insensitively) to its COMMANDLOG_TYPE_*
 * constant.
 *
 * Returns the matching type; if the name is not recognized, an error is
 * sent to the client and -1 is returned. */
static int commandlogGetTypeOrReply(client *c, robj *o) {
    const struct {
        const char *name;
        int type;
    } known[] = {
        {"slow", COMMANDLOG_TYPE_SLOW},
        {"large-request", COMMANDLOG_TYPE_LARGE_REQUEST},
        {"large-reply", COMMANDLOG_TYPE_LARGE_REPLY},
    };
    const int nknown = (int)(sizeof(known) / sizeof(known[0]));

    for (int i = 0; i < nknown; i++) {
        if (strcasecmp(o->ptr, known[i].name) == 0) return known[i].type;
    }

    addReplyError(c, "type should be one of the following: slow, large-request, large-reply");
    return -1;
}

/* COMMANDLOG command entry point.
 *
 * Dispatches the HELP, RESET <type>, LEN <type> and GET <count> <type>
 * subcommands. Any other form gets a subcommand syntax error reply. When a
 * type name is not recognized, commandlogGetTypeOrReply has already replied
 * with an error and the command simply returns. */
void commandlogCommand(client *c) {
    int type;

    if (c->argc == 2 && strcasecmp(c->argv[1]->ptr, "help") == 0) {
        const char *help[] = {
            "GET <count> <type>",
            " Return top <count> entries of the specified <type> from the commandlog (-1 mean all).",
            " Entries are made of:",
            " id, timestamp,",
            " time in microseconds for type of slow,",
            " or size in bytes for type of large-request,",
            " or size in bytes for type of large-reply",
            " arguments array, client IP and port,",
            " client name",
            "LEN <type>",
            " Return the length of the specified type of commandlog.",
            "RESET <type>",
            " Reset the specified type of commandlog.",
            NULL,
        };
        addReplyHelp(c, help);
        return;
    }

    if (c->argc == 3 && strcasecmp(c->argv[1]->ptr, "reset") == 0) {
        type = commandlogGetTypeOrReply(c, c->argv[2]);
        if (type == -1) return;
        commandlogReset(type);
        addReply(c, shared.ok);
        return;
    }

    if (c->argc == 3 && strcasecmp(c->argv[1]->ptr, "len") == 0) {
        type = commandlogGetTypeOrReply(c, c->argv[2]);
        if (type == -1) return;
        addReplyLongLong(c, listLength(server.commandlog[type].entries));
        return;
    }

    if (c->argc != 4 || strcasecmp(c->argv[1]->ptr, "get") != 0) {
        addReplySubcommandSyntaxError(c);
        return;
    }

    /* GET: the count argument is validated first, then the type. */
    long count;
    int parsed = getRangeLongFromObjectOrReply(c, c->argv[2], -1, LONG_MAX, &count,
                                               "count should be greater than or equal to -1");
    if (parsed != C_OK) return;

    type = commandlogGetTypeOrReply(c, c->argv[3]);
    if (type == -1) return;

    /* -1 is a special value meaning "all command logs": replace it with the
     * current length of the selected commandlog. */
    if (count == -1) {
        count = listLength(server.commandlog[type].entries);
    }

    commandlogGetReply(c, type, count);
}
29 changes: 14 additions & 15 deletions src/slowlog.h → src/commandlog.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,26 @@
* POSSIBILITY OF SUCH DAMAGE.
*/

#ifndef __SLOWLOG_H__
#define __SLOWLOG_H__
#ifndef __COMMANDLOG_H__
#define __COMMANDLOG_H__

#include "server.h"

#define SLOWLOG_ENTRY_MAX_ARGC 32
#define SLOWLOG_ENTRY_MAX_STRING 128
#define COMMANDLOG_ENTRY_MAX_ARGC 32
#define COMMANDLOG_ENTRY_MAX_STRING 128

/* This structure defines an entry inside the slow log list */
typedef struct slowlogEntry {
/* This structure defines an entry inside the command log list */
typedef struct commandlogEntry {
robj **argv;
int argc;
long long id; /* Unique entry identifier. */
long long duration; /* Time spent by the query, in microseconds. */
time_t time; /* Unix time at which the query was executed. */
sds cname; /* Client name. */
sds peerid; /* Client network address. */
} slowlogEntry;
long long id; /* Unique entry identifier. */
long long value; /* The meaning is determined by the type of command log. */
time_t time; /* Unix time at which the query was executed. */
sds cname; /* Client name. */
sds peerid; /* Client network address. */
} commandlogEntry;

/* Exported API */
void slowlogInit(void);
void slowlogPushEntryIfNeeded(client *c, robj **argv, int argc, long long duration);
void commandlogInit(void);

#endif /* __SLOWLOG_H__ */
#endif /* __COMMANDLOG_H__ */
Loading

0 comments on commit ca68e8a

Please sign in to comment.