Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add network-bytes-in and network-bytes-out metric support under CLUSTER SLOT-STATS command (#20) #720

Merged
merged 9 commits into from
Jul 26, 2024
3 changes: 2 additions & 1 deletion src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,8 @@ struct _clusterNode {
/* Struct used for storing slot statistics.
 * One instance exists per slot, held in clusterState.slot_stats[CLUSTER_SLOTS]. */
typedef struct slotStat {
/* Reported under the "cpu-usec" key of the CLUSTER SLOT-STATS reply. */
uint64_t cpu_usec;
/* Accumulated ingress bytes of the slot; reported under "network-bytes-in". */
uint64_t network_bytes_in;
/* Accumulated egress bytes of the slot; reported under "network-bytes-out". */
uint64_t network_bytes_out;
} slotStat;

struct clusterState {
Expand Down Expand Up @@ -385,5 +387,4 @@ struct clusterState {
slotStat slot_stats[CLUSTER_SLOTS];
};


#endif // CLUSTER_LEGACY_H
87 changes: 85 additions & 2 deletions src/cluster_slot_stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

#define UNASSIGNED_SLOT 0

typedef enum { KEY_COUNT, CPU_USEC, SLOT_STAT_COUNT, INVALID } slotStatTypes;
typedef enum { KEY_COUNT, CPU_USEC, NETWORK_BYTES_IN, NETWORK_BYTES_OUT, SLOT_STAT_COUNT, INVALID } slotStatTypes;

/* -----------------------------------------------------------------------------
* CLUSTER SLOT-STATS command
Expand Down Expand Up @@ -43,6 +43,8 @@ static uint64_t getSlotStat(int slot, int stat_type) {
uint64_t slot_stat = 0;
if (stat_type == KEY_COUNT) {
slot_stat = countKeysInSlot(slot);
} else if (stat_type == NETWORK_BYTES_IN) {
slot_stat = server.cluster->slot_stats[slot].network_bytes_in;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
} else if (stat_type == NETWORK_BYTES_IN) {
slot_stat = server.cluster->slot_stats[slot].network_bytes_in;
} else if (stat_type == NETWORK_BYTES_IN) {
slot_stat = server.cluster->slot_stats[slot].network_bytes_in;
} else if (stat_type == NETWORK_BYTES_OUT) {
slot_stat = server.cluster->slot_stats[slot].network_bytes_out;

We could use a switch statement here, so that the compiler emits a warning if we forget to handle a stat type.

} else if (stat_type == CPU_USEC) {
slot_stat = server.cluster->slot_stats[slot].cpu_usec;
}
Expand Down Expand Up @@ -96,6 +98,10 @@ static void addReplySlotStat(client *c, int slot) {
if (server.cluster_slot_stats_enabled) {
addReplyBulkCString(c, "cpu-usec");
addReplyLongLong(c, server.cluster->slot_stats[slot].cpu_usec);
addReplyBulkCString(c, "network-bytes-in");
addReplyLongLong(c, server.cluster->slot_stats[slot].network_bytes_in);
addReplyBulkCString(c, "network-bytes-out");
addReplyLongLong(c, server.cluster->slot_stats[slot].network_bytes_out);
}
}

Expand All @@ -119,6 +125,52 @@ static void addReplySortedSlotStats(client *c, slotStatForSort slot_stats[], lon
}
}

/* Tells whether egress bytes of client c may be attributed to a slot.
 * Requires per-slot statistics and cluster mode to both be enabled,
 * and c->slot to be populated (i.e. not -1).
 * Returns 1 when all conditions hold, 0 otherwise. */
static int canAddNetworkBytesOut(client *c) {
    if (!server.cluster_slot_stats_enabled) return 0;
    if (!server.cluster_enabled) return 0;
    return c->slot != -1;
}

/* Adds the egress bytes of the command currently tracked on client c
 * (c->net_output_bytes_curr_cmd) to the network_bytes_out counter of c->slot.
 * Nothing is accumulated when canAddNetworkBytesOut() rejects the client. */
void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c) {
    if (canAddNetworkBytesOut(c)) {
        int slot = c->slot;
        /* The slot is used as an array index below, so it must be in range. */
        serverAssert(slot >= 0 && slot < CLUSTER_SLOTS);
        server.cluster->slot_stats[slot].network_bytes_out += c->net_output_bytes_curr_cmd;
    }
}

/* Accumulates egress bytes upon sending replication stream. This only applies for primary nodes.
 *
 * The bytes are attributed to the slot of server.current_client, and `len` is
 * multiplied by the number of entries in server.replicas. Nothing is
 * accumulated when there is no current client or when
 * canAddNetworkBytesOut() rejects it. */
void clusterSlotStatsAddNetworkBytesOutForReplication(int len) {
    client *curr = server.current_client;
    if (!curr) return;
    if (!canAddNetworkBytesOut(curr)) return;

    /* The slot is used as an array index below, so it must be in range. */
    serverAssert(curr->slot >= 0 && curr->slot < CLUSTER_SLOTS);
    server.cluster->slot_stats[curr->slot].network_bytes_out += (len * listLength(server.replicas));
}

/* Upon SPUBLISH, two egress events are triggered.
 * 1) Internal propagation, for clients that are subscribed to the current node.
 * 2) External propagation, for other nodes within the same shard (could either be a primary or replica).
 *    This type is not aggregated, to stay consistent with server.stat_net_output_bytes aggregation.
 * This function covers the internal propagation component.
 *
 * The bytes tracked in c->net_output_bytes_curr_cmd are added to the
 * network_bytes_out counter of `slot`, after which that per-command counter
 * is cleared. c->slot holds the same value on return as it did on entry. */
void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(client *c, int slot) {
    /* For a blocked client, c->slot could be pre-filled, so it is saved here
     * and put back on every path once aggregation is done. */
    int saved_slot = c->slot;
    c->slot = slot;

    if (canAddNetworkBytesOut(c)) {
        serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
        server.cluster->slot_stats[c->slot].network_bytes_out += c->net_output_bytes_curr_cmd;

        /* For sharded pubsub, the client's network bytes metrics must be reset here,
         * as resetClient() is not called until subscription ends. */
        c->net_output_bytes_curr_cmd = 0;
    }

    /* c->slot is left untouched from the caller's point of view,
     * whether or not anything was aggregated. */
    c->slot = saved_slot;
}

/* Adds reply for the ORDERBY variant.
* Response is ordered based on the sort result. */
static void addReplyOrderBy(client *c, int order_by, long limit, int desc) {
Expand Down Expand Up @@ -167,8 +219,35 @@ void clusterSlotStatsInvalidateSlotIfApplicable(scriptRunCtx *ctx) {
ctx->original_client->slot = -1;
}

/* Tells whether ingress bytes of client c may be attributed to a slot.
 * Returns 1 when every condition below holds, 0 otherwise. */
static int canAddNetworkBytesIn(client *c) {
    /* Cluster mode must be enabled. */
    if (!server.cluster_enabled) return 0;
    /* Per-slot statistics must be enabled. */
    if (!server.cluster_slot_stats_enabled) return 0;
    /* The command should target a specific slot. */
    if (c->slot == -1) return 0;
    /* A blocked client is not aggregated, to avoid duplicate aggregation upon unblocking. */
    if (c->flag.blocked) return 0;
    /* The server must not be under a MULTI/EXEC transaction, to avoid duplicate aggregation of
     * EXEC's 14 bytes RESP upon nested call()'s afterCommand(). */
    return !server.in_exec;
}

/* Attributes the ingress bytes of the command in execution
 * (c->net_input_bytes_curr_cmd, computed earlier within the networking.c layer)
 * to the network_bytes_in counter of c->slot.
 *
 * Note: call this only after c->slot has been parsed; before that,
 * canAddNetworkBytesIn() fails and the aggregation is skipped.
 *
 * Side effect: for EXEC, c->net_input_bytes_curr_cmd itself is increased by
 * the size of the matching MULTI request before being aggregated. */
void clusterSlotStatsAddNetworkBytesInForUserClient(client *c) {
    if (canAddNetworkBytesIn(c)) {
        /* Accumulate its corresponding MULTI RESP; *1\r\n$5\r\nmulti\r\n (15 bytes). */
        if (c->cmd->proc == execCommand) c->net_input_bytes_curr_cmd += 15;

        server.cluster->slot_stats[c->slot].network_bytes_in += c->net_input_bytes_curr_cmd;
    }
}

void clusterSlotStatsCommand(client *c) {
if (server.cluster_enabled == 0) {
if (!server.cluster_enabled) {
addReplyError(c, "This instance has cluster support disabled");
return;
}
Expand Down Expand Up @@ -197,6 +276,10 @@ void clusterSlotStatsCommand(client *c) {
order_by = KEY_COUNT;
} else if (!strcasecmp(c->argv[3]->ptr, "cpu-usec") && server.cluster_slot_stats_enabled) {
order_by = CPU_USEC;
} else if (!strcasecmp(c->argv[3]->ptr, "network-bytes-in") && server.cluster_slot_stats_enabled) {
order_by = NETWORK_BYTES_IN;
} else if (!strcasecmp(c->argv[3]->ptr, "network-bytes-out") && server.cluster_slot_stats_enabled) {
order_by = NETWORK_BYTES_OUT;
} else {
addReplyError(c, "Unrecognized sort metric for ORDERBY.");
return;
Expand Down
13 changes: 13 additions & 0 deletions src/cluster_slot_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,20 @@
#include "script.h"
#include "cluster_legacy.h"

/* General use-cases. */
void clusterSlotStatReset(int slot);
void clusterSlotStatResetAll(void);

/* cpu-usec metric. */
void clusterSlotStatsAddCpuDuration(client *c, ustime_t duration);
void clusterSlotStatsInvalidateSlotIfApplicable(scriptRunCtx *ctx);

/* network-bytes-in metric. */
/* Adds c->net_input_bytes_curr_cmd to the network_bytes_in counter of c->slot, when eligible. */
void clusterSlotStatsAddNetworkBytesInForUserClient(client *c);
/* NOTE(review): the two definitions below are not visible from here; document once confirmed. */
void clusterSlotStatsSetClusterMsgLength(uint32_t len);
void clusterSlotStatsResetClusterMsgLength(void);

/* network-bytes-out metric. */
/* Adds c->net_output_bytes_curr_cmd to the network_bytes_out counter of c->slot, when eligible. */
void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c);
/* Adds len multiplied by the number of replicas to the slot of server.current_client, when eligible. */
void clusterSlotStatsAddNetworkBytesOutForReplication(int len);
/* Adds c->net_output_bytes_curr_cmd to the counter of `slot` and clears that per-command counter. */
void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(client *c, int slot);
6 changes: 6 additions & 0 deletions src/commands/cluster-slot-stats.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@
},
"cpu-usec": {
"type": "integer"
},
"network-bytes-in": {
madolson marked this conversation as resolved.
Show resolved Hide resolved
"type": "integer"
},
"network-bytes-out": {
"type": "integer"
}
}
}
Expand Down
12 changes: 10 additions & 2 deletions src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -235,11 +235,19 @@ int getKeySlot(sds key) {
* It only gets set during the execution of command under `call` method. Other flows requesting
* the key slot would fallback to calculateKeySlot.
*/
if (server.current_client && server.current_client->slot >= 0 && server.current_client->flag.executing_command) {
if (server.current_client && server.current_client->slot >= 0 && server.current_client->flag.executing_command &&
!server.current_client->flag.primary) {
madolson marked this conversation as resolved.
Show resolved Hide resolved
debugServerAssertWithInfo(server.current_client, NULL, calculateKeySlot(key) == server.current_client->slot);
return server.current_client->slot;
}
return calculateKeySlot(key);
int slot = calculateKeySlot(key);
/* For the case of replicated commands from primary, getNodeByQuery() never gets called,
* and thus c->slot never gets populated. That said, if this command ends up accessing a key,
* we are able to backfill c->slot here, where the key's hash calculation is made. */
if (server.current_client && server.current_client->flag.primary) {
server.current_client->slot = slot;
}
return slot;
madolson marked this conversation as resolved.
Show resolved Hide resolved
}

/* This is a special version of dbAdd() that is used only when loading
Expand Down
5 changes: 5 additions & 0 deletions src/multi.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
*/

#include "server.h"
#include "cluster_slot_stats.h"
madolson marked this conversation as resolved.
Show resolved Hide resolved

/* ================================ MULTI/EXEC ============================== */

Expand Down Expand Up @@ -91,6 +92,10 @@ void queueMultiCommand(client *c, uint64_t cmd_flags) {
c->argc = 0;
c->argv_len_sum = 0;
c->argv_len = 0;

/* Since afterCommand() is not reached upon queuing a command,
* below call is made explicitly to accumulate its network ingress bytes. */
clusterSlotStatsAddNetworkBytesInForUserClient(c);
}

void discardTransaction(client *c) {
Expand Down
71 changes: 68 additions & 3 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

#include "server.h"
#include "cluster.h"
#include "cluster_slot_stats.h"
#include "script.h"
#include "fpconv_dtoa.h"
#include "fmtargs.h"
Expand Down Expand Up @@ -231,7 +232,9 @@ client *createClient(connection *conn) {
if (conn) linkClient(c);
initClientMultiState(c);
c->net_input_bytes = 0;
c->net_input_bytes_curr_cmd = 0;
c->net_output_bytes = 0;
c->net_output_bytes_curr_cmd = 0;
c->commands_processed = 0;
return c;
}
Expand Down Expand Up @@ -449,6 +452,8 @@ void _addReplyToBufferOrList(client *c, const char *s, size_t len) {
return;
}

c->net_output_bytes_curr_cmd += len;

/* We call it here because this function may affect the reply
* buffer offset (see function comment) */
reqresSaveClientReplyOffset(c);
Expand Down Expand Up @@ -2479,10 +2484,12 @@ void resetClient(client *c) {
c->cur_script = NULL;
c->reqtype = 0;
c->multibulklen = 0;
c->net_input_bytes_curr_cmd = 0;
c->bulklen = -1;
c->slot = -1;
c->flag.executing_command = 0;
c->flag.replication_done = 0;
c->net_output_bytes_curr_cmd = 0;

/* Make sure the duration has been recorded to some command. */
serverAssert(c->duration == 0);
Expand Down Expand Up @@ -2625,6 +2632,21 @@ void processInlineBuffer(client *c) {
c->argv_len_sum += sdslen(argv[j]);
}
zfree(argv);

/* Per-slot network bytes-in calculation.
*
* We calculate and store the current command's ingress bytes under
* c->net_input_bytes_curr_cmd, for which its per-slot aggregation is deferred
* until c->slot is parsed later within processCommand().
*
* Calculation: For inline buffer, every whitespace is of length 1,
* with the exception of the trailing '\r\n' being length 2.
*
* For example;
* Command) SET key value
* Inline) SET key value\r\n
* */
c->net_input_bytes_curr_cmd = (c->argv_len_sum + (c->argc - 1) + 2);
c->read_flags |= READ_FLAGS_PARSING_COMPLETED;
}

Expand Down Expand Up @@ -2696,7 +2718,8 @@ void processMultibulkBuffer(client *c) {
/* We know for sure there is a whole line since newline != NULL,
* so go ahead and find out the multi bulk length. */
serverAssertWithInfo(c, NULL, c->querybuf[c->qb_pos] == '*');
ok = string2ll(c->querybuf + 1 + c->qb_pos, newline - (c->querybuf + 1 + c->qb_pos), &ll);
size_t multibulklen_slen = newline - (c->querybuf + 1 + c->qb_pos);
ok = string2ll(c->querybuf + 1 + c->qb_pos, multibulklen_slen, &ll);
if (!ok || ll > INT_MAX) {
c->read_flags |= READ_FLAGS_ERROR_INVALID_MULTIBULK_LEN;
return;
Expand All @@ -2719,6 +2742,39 @@ void processMultibulkBuffer(client *c) {
c->argv_len = min(c->multibulklen, 1024);
c->argv = zmalloc(sizeof(robj *) * c->argv_len);
c->argv_len_sum = 0;

/* Per-slot network bytes-in calculation.
*
* We calculate and store the current command's ingress bytes under
* c->net_input_bytes_curr_cmd, for which its per-slot aggregation is deferred
* until c->slot is parsed later within processCommand().
*
* Calculation: For multi bulk buffer, we accumulate four factors, namely;
*
* 1) multibulklen_slen + 1
* Cumulative string length (and not the value of) of multibulklen,
* including +1 from RESP first byte.
* 2) bulklen_slen + c->argc
* Cumulative string length (and not the value of) of bulklen,
* including +1 from RESP first byte per argument count.
* 3) c->argv_len_sum
* Cumulative string length of all argument vectors.
* 4) c->argc * 4 + 2
* Cumulative string length of all '\r\n' terminators, for which there exists a total of
* 4 bytes per argument, plus 2 bytes from the trailing '\r\n' that follows multibulklen.
*
* For example;
* Command) SET key value
* RESP) *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
*
* 1) String length of "*3" is 2, obtained from (multibulklen_slen + 1).
* 2) String length of "$3" "$3" "$5" is 6, obtained from (bulklen_slen + c->argc).
* 3) String length of "SET" "key" "value" is 11, obtained from (c->argv_len_sum).
* 4) String length of all white-spaces "\r\n" is 14, obtained from (c->argc * 4 + 2).
*
* The 1st component is calculated within the below line.
* */
c->net_input_bytes_curr_cmd += (multibulklen_slen + 1);
}

serverAssertWithInfo(c, NULL, c->multibulklen > 0);
Expand All @@ -2742,7 +2798,8 @@ void processMultibulkBuffer(client *c) {
return;
}

ok = string2ll(c->querybuf + c->qb_pos + 1, newline - (c->querybuf + c->qb_pos + 1), &ll);
size_t bulklen_slen = newline - (c->querybuf + c->qb_pos + 1);
ok = string2ll(c->querybuf + c->qb_pos + 1, bulklen_slen, &ll);
if (!ok || ll < 0 || (!(is_primary) && ll > server.proto_max_bulk_len)) {
c->read_flags |= READ_FLAGS_ERROR_MBULK_INVALID_BULK_LEN;
return;
Expand Down Expand Up @@ -2782,6 +2839,9 @@ void processMultibulkBuffer(client *c) {
}
}
c->bulklen = ll;
/* Per-slot network bytes-in calculation, 2nd component.
* c->argc portion is deferred, as it may not have been fully populated at this point. */
c->net_input_bytes_curr_cmd += bulklen_slen;
}

/* Read bulk argument */
Expand Down Expand Up @@ -2818,7 +2878,12 @@ void processMultibulkBuffer(client *c) {
}

/* We're done when c->multibulk == 0 */
if (c->multibulklen == 0) c->read_flags |= READ_FLAGS_PARSING_COMPLETED;
if (c->multibulklen == 0) {
/* Per-slot network bytes-in calculation, 3rd and 4th components.
* Here, the deferred c->argc from 2nd component is added, resulting in c->argc * 5 instead of * 4. */
c->net_input_bytes_curr_cmd += (c->argv_len_sum + (c->argc * 5 + 2));
c->read_flags |= READ_FLAGS_PARSING_COMPLETED;
}
}

/* Perform necessary tasks after a command was executed:
Expand Down
6 changes: 4 additions & 2 deletions src/pubsub.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

#include "server.h"
#include "cluster.h"
#include "cluster_slot_stats.h"

/* Structure to hold the pubsub related metadata. Currently used
* for pubsub and pubsubshard feature. */
Expand Down Expand Up @@ -475,20 +476,21 @@ int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type)
int receivers = 0;
dictEntry *de;
dictIterator *di;
unsigned int slot = 0;
int slot = -1;

/* Send to clients listening for that channel */
if (server.cluster_enabled && type.shard) {
slot = keyHashSlot(channel->ptr, sdslen(channel->ptr));
}
de = kvstoreDictFind(*type.serverPubSubChannels, slot, channel);
de = kvstoreDictFind(*type.serverPubSubChannels, (slot == -1) ? 0 : slot, channel);
if (de) {
dict *clients = dictGetVal(de);
dictEntry *entry;
dictIterator *iter = dictGetIterator(clients);
while ((entry = dictNext(iter)) != NULL) {
client *c = dictGetKey(entry);
addReplyPubsubMessage(c, channel, message, *type.messageBulk);
clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(c, slot);
updateClientMemUsageAndBucket(c);
receivers++;
}
Expand Down
Loading
Loading