Skip to content

Commit

Permalink
Add network-bytes-in metric support under CLUSTER SLOT-STATS command (#…
Browse files Browse the repository at this point in the history
…20).

The metric tracks network ingress bytes under per-slot context,
by reverse calculation of c->argv_len_sum and c->argc, stored
under a newly introduced field c->net_input_bytes_curr_cmd.

Signed-off-by: Kyle Kim <[email protected]>
  • Loading branch information
kyle-yh-kim committed Jul 1, 2024
1 parent 2420881 commit 018d698
Show file tree
Hide file tree
Showing 8 changed files with 189 additions and 12 deletions.
4 changes: 4 additions & 0 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "server.h"
#include "cluster.h"
#include "cluster_legacy.h"
#include "cluster_slot_stats.h"
#include "endianconv.h"
#include "connection.h"

Expand Down Expand Up @@ -1042,6 +1043,7 @@ void clusterInit(void) {
clusterUpdateMyselfIp();
clusterUpdateMyselfHostname();
clusterUpdateMyselfHumanNodename();
clusterSlotStatsReset();
}

void clusterInitLast(void) {
Expand Down Expand Up @@ -4943,6 +4945,7 @@ int clusterAddSlot(clusterNode *n, int slot) {
clusterNodeSetSlotBit(n, slot);
server.cluster->slots[slot] = n;
bitmapClearBit(server.cluster->owner_not_claiming_slot, slot);
clusterSlotStatReset(slot);
return C_OK;
}

Expand All @@ -4961,6 +4964,7 @@ int clusterDelSlot(int slot) {
server.cluster->slots[slot] = NULL;
/* Make owner_not_claiming_slot flag consistent with slot ownership information. */
bitmapClearBit(server.cluster->owner_not_claiming_slot, slot);
clusterSlotStatReset(slot);
return C_OK;
}

Expand Down
54 changes: 49 additions & 5 deletions src/cluster_slot_stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
* SPDX-License-Identifier: BSD 3-Clause
*/

#include "server.h"
#include "cluster.h"
#include "cluster_slot_stats.h"

#define UNASSIGNED_SLOT 0

typedef enum {
KEY_COUNT,
INVALID,
KEY_COUNT,
NETWORK_BYTES_IN,
} slotStatTypes;

/* -----------------------------------------------------------------------------
Expand All @@ -24,6 +24,14 @@ typedef struct {
uint64_t stat;
} slotStatForSort;

/* Struct used for storing slot statistics. */
typedef struct slotStat {
uint64_t network_bytes_in;
} slotStat;

/* Struct used for storing slot statistics, for all slots owned by the current shard. */
struct slotStat cluster_slot_stats[CLUSTER_SLOTS];

static int doesSlotBelongToMyShard(int slot) {
clusterNode *myself = getMyClusterNode();
clusterNode *primary = clusterNodeGetPrimary(myself);
Expand All @@ -47,6 +55,8 @@ static uint64_t getSlotStat(int slot, int stat_type) {
uint64_t slot_stat = 0;
if (stat_type == KEY_COUNT) {
slot_stat = countKeysInSlot(slot);
} else if (stat_type == NETWORK_BYTES_IN) {
slot_stat = cluster_slot_stats[slot].network_bytes_in;
}
return slot_stat;
}
Expand Down Expand Up @@ -88,9 +98,11 @@ static void addReplySlotStat(client *c, int slot) {
addReplyArrayLen(c, 2); /* Array of size 2, where 0th index represents (int) slot,
* and 1st index represents (map) usage statistics. */
addReplyLongLong(c, slot);
addReplyMapLen(c, 1); /* Nested map representing slot usage statistics. */
addReplyMapLen(c, 2); /* Nested map representing slot usage statistics. */
addReplyBulkCString(c, "key-count");
addReplyLongLong(c, countKeysInSlot(slot));
addReplyBulkCString(c, "network-bytes-in");
addReplyLongLong(c, cluster_slot_stats[slot].network_bytes_in);
}

/* Adds reply for the SLOTSRANGE variant.
Expand Down Expand Up @@ -121,6 +133,35 @@ static void addReplyOrderBy(client *c, int order_by, long limit, int desc) {
addReplySortedSlotStats(c, slot_stats, limit);
}

static int canAddNetworkBytes(client *c) {
/* First, cluster mode must be enabled.
* Second, command should target a specific slot.
* Third, blocked client is not aggregated, to avoid duplicate aggregation upon unblocking. */
return server.cluster_enabled && c->slot != -1 && !(c->flag.blocked);
}

/* Resets applicable slot statistics. */
void clusterSlotStatReset(int slot) {
/* key-count is exempt, as it is queried separately through countKeysInSlot(). */
cluster_slot_stats[slot].network_bytes_in = 0;
}

void clusterSlotStatsReset(void) {
memset(cluster_slot_stats, 0, sizeof(cluster_slot_stats));
}

/* Adds network ingress bytes of the current command in execution,
* calculated earlier within networking.c layer.
*
* Note: Below function should only be called once c->slot is parsed.
* Otherwise, the aggregation will be skipped due to canAddNetworkBytes() check failure.
* */
void clusterSlotStatsAddNetworkBytesIn(client *c) {
if (!canAddNetworkBytes(c)) return;

cluster_slot_stats[c->slot].network_bytes_in += c->net_input_bytes_curr_cmd;
}

void clusterSlotStatsCommand(client *c) {
if (server.cluster_enabled == 0) {
addReplyError(c, "This instance has cluster support disabled");
Expand Down Expand Up @@ -149,8 +190,11 @@ void clusterSlotStatsCommand(client *c) {
int desc = 1, order_by = INVALID;
if (!strcasecmp(c->argv[3]->ptr, "key-count")) {
order_by = KEY_COUNT;
} else if (!strcasecmp(c->argv[3]->ptr, "network-bytes-in")) {
order_by = NETWORK_BYTES_IN;
} else {
addReplyError(c, "Unrecognized sort metric for ORDER BY. The supported metrics are: key-count.");
addReplyError(c, "Unrecognized sort metric for ORDER BY. The supported "
"metrics are: key-count and cpu-usec.");
return;
}
int i = 4; /* Next argument index, following ORDERBY */
Expand Down
12 changes: 12 additions & 0 deletions src/cluster_slot_stats.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright Valkey Contributors.
* All rights reserved.
* SPDX-License-Identifier: BSD 3-Clause
*/

#include "server.h"
#include "cluster.h"

void clusterSlotStatReset(int slot);
void clusterSlotStatsReset(void);
void clusterSlotStatsAddNetworkBytesIn(client *c);
3 changes: 3 additions & 0 deletions src/commands/cluster-slot-stats.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
"properties": {
"key-count": {
"type": "integer"
},
"memory-bytes-in": {
"type": "integer"
}
}
}
Expand Down
65 changes: 62 additions & 3 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

#include "server.h"
#include "cluster.h"
#include "cluster_slot_stats.h"
#include "script.h"
#include "fpconv_dtoa.h"
#include "fmtargs.h"
Expand Down Expand Up @@ -215,6 +216,7 @@ client *createClient(connection *conn) {
if (conn) linkClient(c);
initClientMultiState(c);
c->net_input_bytes = 0;
c->net_input_bytes_curr_cmd = 0;
c->net_output_bytes = 0;
c->commands_processed = 0;
return c;
Expand Down Expand Up @@ -2084,6 +2086,7 @@ void resetClient(client *c) {
c->cur_script = NULL;
c->reqtype = 0;
c->multibulklen = 0;
c->net_input_bytes_curr_cmd = 0;
c->bulklen = -1;
c->slot = -1;
c->flag.executing_command = 0;
Expand Down Expand Up @@ -2268,6 +2271,21 @@ int processInlineBuffer(client *c) {
c->argv_len_sum += sdslen(argv[j]);
}
zfree(argv);

/* Per-slot network bytes-in calculation.
*
* Within networking.c, we calculate and store the current command's ingress bytes
* under c->net_input_bytes_curr_cmd, for which its per-slot aggregation is deferred
* until c->slot is parsed later within processCommand().
*
* Calculation: For inline buffer, every whitespace is of length 1,
* with the exception of the trailing '\r\n' being length 2.
*
* For example;
* Command) SET key value
* Inline) SET key value\r\n
* */
c->net_input_bytes_curr_cmd = (c->argv_len_sum + (c->argc - 1) + 2);
return C_OK;
}

Expand Down Expand Up @@ -2341,7 +2359,8 @@ int processMultibulkBuffer(client *c) {
/* We know for sure there is a whole line since newline != NULL,
* so go ahead and find out the multi bulk length. */
serverAssertWithInfo(c, NULL, c->querybuf[c->qb_pos] == '*');
ok = string2ll(c->querybuf + 1 + c->qb_pos, newline - (c->querybuf + 1 + c->qb_pos), &ll);
size_t multibulklen_slen = newline - (c->querybuf + 1 + c->qb_pos);
ok = string2ll(c->querybuf + 1 + c->qb_pos, multibulklen_slen, &ll);
if (!ok || ll > INT_MAX) {
addReplyError(c, "Protocol error: invalid multibulk length");
setProtocolError("invalid mbulk count", c);
Expand All @@ -2363,6 +2382,39 @@ int processMultibulkBuffer(client *c) {
c->argv_len = min(c->multibulklen, 1024);
c->argv = zmalloc(sizeof(robj *) * c->argv_len);
c->argv_len_sum = 0;

/* Per-slot network bytes-in calculation.
*
* Within networking.c, we calculate and store the current command's ingress bytes
* under c->net_input_bytes_curr_cmd, for which its per-slot aggregation is deferred
* until c->slot is parsed later within processCommand().
*
* Calculation: For multi bulk buffer, we accumulate four factors, namely;
*
* 1) multibulklen_slen + 1
* Cumulative string length (and not the value of) of multibulklen,
* including +1 from RESP first byte.
* 2) bulklen_slen + c->argc
* Cumulative string length (and not the value of) of bulklen,
* including +1 from RESP first byte per argument count.
* 3) c->argv_len_sum
* Cumulative string length of all argument vectors.
* 4) c->argc * 4 + 2
* Cumulative string length of all white-spaces, for which there exists a total of
* 4 bytes per argument, plus 2 bytes from the leading '\r\n' from multibulklen.
*
* For example;
* Command) SET key value
* RESP) *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
*
* 1) String length of "*3" is 2, obtained from (multibulklen_slen + 1).
* 2) String length of "$3" "$3" "$5" is 6, obtained from (bulklen_slen + c->argc).
* 3) String length of "SET" "key" "value" is 11, obtained from (c->argv_len_sum).
* 4) String length of all white-spaces "\r\n" is 14, obtained from (c->argc * 4 + 2).
*
* The 1st component is calculated within the below line.
* */
c->net_input_bytes_curr_cmd += (multibulklen_slen + 1);
}

serverAssertWithInfo(c, NULL, c->multibulklen > 0);
Expand All @@ -2388,7 +2440,8 @@ int processMultibulkBuffer(client *c) {
return C_ERR;
}

ok = string2ll(c->querybuf + c->qb_pos + 1, newline - (c->querybuf + c->qb_pos + 1), &ll);
size_t bulklen_slen = newline - (c->querybuf + c->qb_pos + 1);
ok = string2ll(c->querybuf + c->qb_pos + 1, bulklen_slen, &ll);
if (!ok || ll < 0 || (!c->flag.primary && ll > server.proto_max_bulk_len)) {
addReplyError(c, "Protocol error: invalid bulk length");
setProtocolError("invalid bulk length", c);
Expand Down Expand Up @@ -2430,6 +2483,8 @@ int processMultibulkBuffer(client *c) {
}
}
c->bulklen = ll;
/* Per-slot network bytes-in calculation, 2nd component. */
c->net_input_bytes_curr_cmd += (bulklen_slen + c->argc);
}

/* Read bulk argument */
Expand Down Expand Up @@ -2466,7 +2521,11 @@ int processMultibulkBuffer(client *c) {
}

/* We're done when c->multibulk == 0 */
if (c->multibulklen == 0) return C_OK;
if (c->multibulklen == 0) {
/* Per-slot network bytes-in calculation, 3rd and 4th components. */
c->net_input_bytes_curr_cmd += (c->argv_len_sum + (c->argc * 4 + 2));
return C_OK;
}

/* Still not ready to process the command */
return C_ERR;
Expand Down
5 changes: 5 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "server.h"
#include "monotonic.h"
#include "cluster.h"
#include "cluster_slot_stats.h"
#include "slowlog.h"
#include "bio.h"
#include "latency.h"
Expand Down Expand Up @@ -2499,6 +2500,7 @@ void resetServerStats(void) {
memset(server.duration_stats, 0, sizeof(durationStats) * EL_DURATION_TYPE_NUM);
server.el_cmd_cnt_max = 0;
lazyfreeResetStats();
clusterSlotStatsReset();
}

/* Make the thread killable at any time, so that kill threads functions
Expand Down Expand Up @@ -3869,6 +3871,9 @@ int processCommand(client *c) {
}
}

/* Now that c->slot has been parsed, accumulate the buffered network bytes-in. */
clusterSlotStatsAddNetworkBytesIn(c);

if (!server.cluster_enabled && c->capa & CLIENT_CAPA_REDIRECT && server.primary_host && !mustObeyClient(c) &&
(is_write_command || (is_read_command && !c->flag.readonly))) {
addReplyErrorSds(c, sdscatprintf(sdsempty(), "-REDIRECT %s:%d", server.primary_host, server.primary_port));
Expand Down
8 changes: 5 additions & 3 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1303,9 +1303,11 @@ typedef struct client {
#ifdef LOG_REQ_RES
clientReqResInfo reqres;
#endif
unsigned long long net_input_bytes; /* Total network input bytes read from this client. */
unsigned long long net_output_bytes; /* Total network output bytes sent to this client. */
unsigned long long commands_processed; /* Total count of commands this client executed. */
unsigned long long net_input_bytes; /* Total network input bytes read from this client. */
unsigned long long net_input_bytes_curr_cmd; /* Total network input bytes read for the
* execution of this client's current command. */
unsigned long long net_output_bytes; /* Total network output bytes sent to this client. */
unsigned long long commands_processed; /* Total count of commands this client executed. */
} client;

/* ACL information */
Expand Down
50 changes: 49 additions & 1 deletion tests/unit/cluster/slot-stats.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,13 @@ proc assert_empty_slot_stats_with_exception {slot_stats exception_slots} {
set slot_stats [convert_array_into_dict $slot_stats]
dict for {slot stats} $slot_stats {
if {[dict exists $exception_slots $slot]} {
set expected_key_count [dict get $exception_slots $slot]
set expected_key_count [dict get $exception_slots $slot key-count]
set expected_network_bytes_in [dict get $exception_slots $slot network-bytes-in]
assert {[dict get $stats key-count] == $expected_key_count}
assert {[dict get $stats network-bytes-in] == $expected_network_bytes_in}
} else {
assert {[dict get $stats key-count] == 0}
assert {[dict get $stats network-bytes-in] == 0}
}
}
}
Expand Down Expand Up @@ -114,6 +117,51 @@ proc wait_for_replica_key_exists {key key_count} {
}
}

# -----------------------------------------------------------------------------
# Test cases for CLUSTER SLOT-STATS network-bytes-in.
# -----------------------------------------------------------------------------

start_cluster 1 0 {tags {external:skip cluster}} {

# Define shared variables.
set key "key"
set key_slot [R 0 cluster keyslot $key]

test "CLUSTER SLOT-STATS network-bytes-in, multi bulk buffer processing." {
# Command) SET key value
# RESP) *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
R 0 SET $key value

set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
set expected_slot_stats [
dict create $key_slot [
dict create key-count 1 network-bytes-in 33
]
]

assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats
}
R 0 CONFIG RESETSTAT
R 0 FLUSHALL

test "CLUSTER SLOT-STATS network-bytes-in, in-line buffer processing." {
# Command) SET key value
# Inline) SET key value\r\n
set rd [valkey_deferring_client]
$rd write "SET $key value\r\n"
$rd flush

set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
set expected_slot_stats [
dict create $key_slot [
dict create key-count 1 network-bytes-in 15
]
]

assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats
}
}

# -----------------------------------------------------------------------------
# Test cases for CLUSTER SLOT-STATS correctness, without additional arguments.
# -----------------------------------------------------------------------------
Expand Down

0 comments on commit 018d698

Please sign in to comment.