Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature COMMANDLOG to record slow execution and large request/reply #1294

Open
wants to merge 13 commits into
base: unstable
Choose a base branch
from
Open
2 changes: 1 addition & 1 deletion cmake/Modules/SourceFiles.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ set(VALKEY_SERVER_SRCS
${CMAKE_SOURCE_DIR}/src/cluster_slot_stats.c
${CMAKE_SOURCE_DIR}/src/crc16.c
${CMAKE_SOURCE_DIR}/src/endianconv.c
${CMAKE_SOURCE_DIR}/src/slowlog.c
${CMAKE_SOURCE_DIR}/src/commandlog.c
${CMAKE_SOURCE_DIR}/src/eval.c
${CMAKE_SOURCE_DIR}/src/bio.c
${CMAKE_SOURCE_DIR}/src/rio.c
Expand Down
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ endif
ENGINE_NAME=valkey
SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX)
ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX)
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o commandlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX)
ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX)
Expand Down
14 changes: 7 additions & 7 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
*/

#include "server.h"
#include "slowlog.h"
#include "commandlog.h"
#include "latency.h"
#include "monotonic.h"
#include "cluster_slot_stats.h"
Expand Down Expand Up @@ -107,17 +107,17 @@ void blockClient(client *c, int btype) {
* the command will not be reprocessed and we need to make stats update.
* This function will make updates to the commandstats, slot-stats, slowlog and monitors.*/
void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors) {
const ustime_t total_cmd_duration = c->duration + blocked_us + reply_us;
c->lastcmd->microseconds += total_cmd_duration;
clusterSlotStatsAddCpuDuration(c, total_cmd_duration);
c->duration += blocked_us + reply_us;
c->lastcmd->microseconds += c->duration;
clusterSlotStatsAddCpuDuration(c, c->duration);
c->lastcmd->calls++;
c->commands_processed++;
server.stat_numcommands++;
if (had_errors) c->lastcmd->failed_calls++;
if (server.latency_tracking_enabled)
updateCommandLatencyHistogram(&(c->lastcmd->latency_histogram), total_cmd_duration * 1000);
/* Log the command into the Slow log if needed. */
slowlogPushCurrentCommand(c, c->lastcmd, total_cmd_duration);
updateCommandLatencyHistogram(&(c->lastcmd->latency_histogram), c->duration * 1000);
/* Log the command into the commandlog if needed. */
commandlogPushCurrentCommand(c, c->lastcmd);
c->duration = 0;
/* Log the reply duration event. */
latencyAddSampleIfNeeded("command-unblocking", reply_us / 1000);
Expand Down
265 changes: 265 additions & 0 deletions src/commandlog.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
/* Commandlog implements a system that is able to remember the latest N
* queries that took more than M microseconds to execute, or consumed
* too much network bandwidth and memory for input/output buffers.
*
* The execution time to reach to be logged in the slow log is set
* using the 'slowlog-log-slower-than' config directive, that is also
* readable and writable using the CONFIG SET/GET command.
*
* Other configurations such as `large-request-larger-than` and
* `large-reply-larger-than` can be found with more detailed
* explanations in the config file.
*
* The command log is actually not "logged" in the server log file
* but is accessible thanks to the COMMANDLOG command.
*
* ----------------------------------------------------------------------------
*
* Copyright (c) 2009-2012, Redis Ltd.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#include "commandlog.h"

/* Create a new commandlog entry.
* Incrementing the ref count of all the objects retained is up to
* this function. */
commandlogEntry *commandlogCreateEntry(client *c, robj **argv, int argc, long long value, int type) {
commandlogEntry *ce = zmalloc(sizeof(*ce));
int j, slargc = argc;

if (slargc > COMMANDLOG_ENTRY_MAX_ARGC) slargc = COMMANDLOG_ENTRY_MAX_ARGC;
ce->argc = slargc;
ce->argv = zmalloc(sizeof(robj *) * slargc);
for (j = 0; j < slargc; j++) {
/* Logging too many arguments is a useless memory waste, so we stop
* at COMMANDLOG_ENTRY_MAX_ARGC, but use the last argument to specify
* how many remaining arguments there were in the original command. */
if (slargc != argc && j == slargc - 1) {
ce->argv[j] =
createObject(OBJ_STRING, sdscatprintf(sdsempty(), "... (%d more arguments)", argc - slargc + 1));
} else {
/* Trim too long strings as well... */
if (argv[j]->type == OBJ_STRING && sdsEncodedObject(argv[j]) &&
sdslen(argv[j]->ptr) > COMMANDLOG_ENTRY_MAX_STRING) {
sds s = sdsnewlen(argv[j]->ptr, COMMANDLOG_ENTRY_MAX_STRING);

s = sdscatprintf(s, "... (%lu more bytes)",
(unsigned long)sdslen(argv[j]->ptr) - COMMANDLOG_ENTRY_MAX_STRING);
ce->argv[j] = createObject(OBJ_STRING, s);
} else if (argv[j]->refcount == OBJ_SHARED_REFCOUNT) {
ce->argv[j] = argv[j];
} else {
/* Here we need to duplicate the string objects composing the
* argument vector of the command, because those may otherwise
* end shared with string objects stored into keys. Having
* shared objects between any part of the server, and the data
* structure holding the data, is a problem: FLUSHALL ASYNC
* may release the shared string object and create a race. */
ce->argv[j] = dupStringObject(argv[j]);
}
}
}
ce->time = time(NULL);
ce->value = value;
ce->id = server.commandlog[type].entry_id++;
ce->peerid = sdsnew(getClientPeerId(c));
ce->cname = c->name ? sdsnew(c->name->ptr) : sdsempty();
return ce;
}

/* Free a command log entry. The argument is void so that the prototype of this
* function matches the one of the 'free' method of adlist.c.
*
* This function will take care to release all the retained object. */
void commandlogFreeEntry(void *ceptr) {
commandlogEntry *ce = ceptr;
int j;

for (j = 0; j < ce->argc; j++) decrRefCount(ce->argv[j]);
zfree(ce->argv);
sdsfree(ce->peerid);
sdsfree(ce->cname);
zfree(ce);
}

/* Initialize the command log. This function should be called a single time
* at server startup. */
void commandlogInit(void) {
for (int i = 0; i < COMMANDLOG_TYPE_MAX; i++) {
server.commandlog[i].entries = listCreate();
server.commandlog[i].entry_id = 0;
listSetFreeMethod(server.commandlog[i].entries, commandlogFreeEntry);
}
}

/* Push a new entry into the command log.
* This function will make sure to trim the command log accordingly to the
* configured max length. */
void commandlogPushEntryIfNeeded(client *c, robj **argv, int argc, long long value, int type) {
if (server.commandlog[type].threshold < 0 || server.commandlog[type].max_len == 0) return; /* The corresponding commandlog disabled */
if (value >= server.commandlog[type].threshold)
listAddNodeHead(server.commandlog[type].entries, commandlogCreateEntry(c, argv, argc, value, type));

/* Remove old entries if needed. */
while (listLength(server.commandlog[type].entries) > server.commandlog[type].max_len) listDelNode(server.commandlog[type].entries, listLast(server.commandlog[type].entries));
}

/* Remove all the entries from the current command log of the specified type. */
void commandlogReset(int type) {
while (listLength(server.commandlog[type].entries) > 0) listDelNode(server.commandlog[type].entries, listLast(server.commandlog[type].entries));
}

/* Reply command logs to client. */
void commandlogGetReply(client *c, int type, long count) {
listIter li;
listNode *ln;
commandlogEntry *ce;

if (count > (long)listLength(server.commandlog[type].entries)) {
count = listLength(server.commandlog[type].entries);
}
addReplyArrayLen(c, count);
listRewind(server.commandlog[type].entries, &li);
while (count--) {
int j;

ln = listNext(&li);
ce = ln->value;
addReplyArrayLen(c, 6);
addReplyLongLong(c, ce->id);
addReplyLongLong(c, ce->time);
addReplyLongLong(c, ce->value);
addReplyArrayLen(c, ce->argc);
for (j = 0; j < ce->argc; j++) addReplyBulk(c, ce->argv[j]);
addReplyBulkCBuffer(c, ce->peerid, sdslen(ce->peerid));
addReplyBulkCBuffer(c, ce->cname, sdslen(ce->cname));
}
}

/* The SLOWLOG command. Implements all the subcommands needed to handle the
* slow log. */
void slowlogCommand(client *c) {
if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr, "help")) {
const char *help[] = {
"GET [<count>]",
" Return top <count> entries from the slowlog (default: 10, -1 mean all).",
" Entries are made of:",
" id, timestamp, time in microseconds, arguments array, client IP and port,",
" client name",
"LEN",
" Return the length of the slowlog.",
"RESET",
" Reset the slowlog.",
NULL,
};
addReplyHelp(c, help);
} else if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr, "reset")) {
commandlogReset(COMMANDLOG_TYPE_SLOW);
addReply(c, shared.ok);
} else if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr, "len")) {
addReplyLongLong(c, listLength(server.commandlog[COMMANDLOG_TYPE_SLOW].entries));
} else if ((c->argc == 2 || c->argc == 3) && !strcasecmp(c->argv[1]->ptr, "get")) {
long count = 10;

if (c->argc == 3) {
/* Consume count arg. */
if (getRangeLongFromObjectOrReply(c, c->argv[2], -1, LONG_MAX, &count,
"count should be greater than or equal to -1") != C_OK)
return;

if (count == -1) {
/* We treat -1 as a special value, which means to get all slow logs.
* Simply set count to the length of server.commandlog. */
count = listLength(server.commandlog[COMMANDLOG_TYPE_SLOW].entries);
}
}

commandlogGetReply(c, COMMANDLOG_TYPE_SLOW, count);
} else {
addReplySubcommandSyntaxError(c);
}
}

int commandlogGetTypeOrReply(client *c, robj *o) {
if (!strcasecmp(o->ptr, "slow")) return COMMANDLOG_TYPE_SLOW;
if (!strcasecmp(o->ptr, "large-request")) return COMMANDLOG_TYPE_LARGE_REQUEST;
if (!strcasecmp(o->ptr, "large-reply")) return COMMANDLOG_TYPE_LARGE_REPLY;
addReplyError(c, "type should be one of the following: slow, large-request, large-reply");
return -1;
}

/* The COMMANDLOG command. Implements all the subcommands needed to handle the
* command log. */
void commandlogCommand(client *c) {
int type;
if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr, "help")) {
const char *help[] = {
"GET <count> <type>",
" Return top <count> entries of the specified <type> from the commandlog (-1 mean all).",
" Entries are made of:",
" id, timestamp,",
" time in microseconds for type of slow,",
" or size in bytes for type of large-request,",
" or size in bytes for type of large-reply",
" arguments array, client IP and port,",
" client name",
"LEN <type>",
" Return the length of the specified type of commandlog.",
"RESET <type>",
" Reset the specified type of commandlog.",
NULL,
};
addReplyHelp(c, help);
} else if (c->argc == 3 && !strcasecmp(c->argv[1]->ptr, "reset")) {
if ((type = commandlogGetTypeOrReply(c, c->argv[2])) == -1) return;
commandlogReset(type);
addReply(c, shared.ok);
} else if (c->argc == 3 && !strcasecmp(c->argv[1]->ptr, "len")) {
if ((type = commandlogGetTypeOrReply(c, c->argv[2])) == -1) return;
addReplyLongLong(c, listLength(server.commandlog[type].entries));
} else if (c->argc == 4 && !strcasecmp(c->argv[1]->ptr, "get")) {
long count;

/* Consume count arg. */
if (getRangeLongFromObjectOrReply(c, c->argv[2], -1, LONG_MAX, &count,
"count should be greater than or equal to -1") != C_OK)
return;

if ((type = commandlogGetTypeOrReply(c, c->argv[3])) == -1) return;

if (count == -1) {
/* We treat -1 as a special value, which means to get all command logs.
* Simply set count to the length of server.commandlog. */
count = listLength(server.commandlog[type].entries);
}

commandlogGetReply(c, type, count);
} else {
addReplySubcommandSyntaxError(c);
}
}
30 changes: 15 additions & 15 deletions src/slowlog.h → src/commandlog.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,27 @@
* POSSIBILITY OF SUCH DAMAGE.
*/

#ifndef __SLOWLOG_H__
#define __SLOWLOG_H__
#ifndef __COMMANDLOG_H__
#define __COMMANDLOG_H__

#include "server.h"

#define SLOWLOG_ENTRY_MAX_ARGC 32
#define SLOWLOG_ENTRY_MAX_STRING 128
#define COMMANDLOG_ENTRY_MAX_ARGC 32
#define COMMANDLOG_ENTRY_MAX_STRING 128

/* This structure defines an entry inside the slow log list */
typedef struct slowlogEntry {
/* This structure defines an entry inside the command log list */
typedef struct commandlogEntry {
robj **argv;
int argc;
long long id; /* Unique entry identifier. */
long long duration; /* Time spent by the query, in microseconds. */
time_t time; /* Unix time at which the query was executed. */
sds cname; /* Client name. */
sds peerid; /* Client network address. */
} slowlogEntry;
long long id; /* Unique entry identifier. */
long long value; /* The meaning is determined by the type of command log. */
time_t time; /* Unix time at which the query was executed. */
sds cname; /* Client name. */
sds peerid; /* Client network address. */
} commandlogEntry;

/* Exported API */
void slowlogInit(void);
void slowlogPushEntryIfNeeded(client *c, robj **argv, int argc, long long duration);
void commandlogInit(void);
void commandlogPushEntryIfNeeded(client *c, robj **argv, int argc, long long value, int type);

#endif /* __SLOWLOG_H__ */
#endif /* __COMMANDLOG_H__ */
Loading
Loading