diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 19613181cc..a915821f89 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -4000,6 +4000,7 @@ void clusterPropagatePublish(robj *channel, robj *message, int sharded) { clusterNode *node = listNodeValue(ln); if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue; clusterSendMessage(node->link, msgblock); + clusterSlotStatsAddNetworkBytesOutForShardedPubSubExternalPropagation(msgblock->totlen); } clusterMsgSendBlockDecrRefCount(msgblock); } diff --git a/src/cluster_legacy.h b/src/cluster_legacy.h index 0c2b168ec7..bc8c7323d3 100644 --- a/src/cluster_legacy.h +++ b/src/cluster_legacy.h @@ -333,6 +333,7 @@ struct _clusterNode { typedef struct slotStat { uint64_t cpu_usec; uint64_t network_bytes_in; + uint64_t network_bytes_out; } slotStat; struct clusterState { diff --git a/src/cluster_slot_stats.c b/src/cluster_slot_stats.c index afcbd5b7bb..6af059e26a 100644 --- a/src/cluster_slot_stats.c +++ b/src/cluster_slot_stats.c @@ -8,7 +8,7 @@ #define UNASSIGNED_SLOT 0 -typedef enum { KEY_COUNT, CPU_USEC, NETWORK_BYTES_IN, SLOT_STAT_COUNT, INVALID } slotStatTypes; +typedef enum { KEY_COUNT, CPU_USEC, NETWORK_BYTES_IN, NETWORK_BYTES_OUT, SLOT_STAT_COUNT, INVALID } slotStatTypes; /* ----------------------------------------------------------------------------- * CLUSTER SLOT-STATS command @@ -106,6 +106,8 @@ static void addReplySlotStat(client *c, int slot) { addReplyLongLong(c, server.cluster->slot_stats[slot].cpu_usec); addReplyBulkCString(c, "network-bytes-in"); addReplyLongLong(c, server.cluster->slot_stats[slot].network_bytes_in); + addReplyBulkCString(c, "network-bytes-out"); + addReplyLongLong(c, server.cluster->slot_stats[slot].network_bytes_out); } } @@ -129,6 +131,63 @@ static void addReplySortedSlotStats(client *c, slotStatForSort slot_stats[], lon } } +static int canAddNetworkBytesOut(client *c) { + return server.cluster_slot_stats_enabled && server.cluster_enabled && c->slot != -1; +} + +/* Accumulates egress bytes upon sending RESP responses back to user clients. */ +void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c) { + if (!canAddNetworkBytesOut(c)) return; + + serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS); + server.cluster->slot_stats[c->slot].network_bytes_out += c->net_output_bytes_curr_cmd; +} + +/* Accumulates egress bytes upon sending replication stream. This only applies for primary nodes. */ +void clusterSlotStatsAddNetworkBytesOutForReplication(int len) { + client *c = server.current_client; + if (c == NULL || !canAddNetworkBytesOut(c)) return; + + serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS); + server.cluster->slot_stats[c->slot].network_bytes_out += (len * listLength(server.replicas)); +} + +/* Upon SPUBLISH, two egress events are triggered. + * 1) Internal propagation, for clients that are subscribed to the current node. + * 2) External propagation, for other nodes within the same shard (could either be a primary or replica). + * This function covers the internal propagation component. */ +void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(client *c, int slot) { + /* For a blocked client, c->slot could be pre-filled. + * Thus c->slot is backed-up for restoration after aggregation is completed. */ + int _slot = c->slot; + c->slot = slot; + if (!canAddNetworkBytesOut(c)) { + /* c->slot should be kept idempotent, regardless of the function's early return condition. */ + c->slot = _slot; + return + }; + + serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS); + server.cluster->slot_stats[c->slot].network_bytes_out += c->net_output_bytes_curr_cmd; + + /* For sharded pubsub, the client's network bytes metrics must be reset here, + * as resetClient() is not called until subscription ends. */ + c->net_output_bytes_curr_cmd = 0; + c->slot = _slot; +} + +/* Upon SPUBLISH, two egress events are triggered. + * 1) Internal propagation, for clients that are subscribed to the current node. + * 2) External propagation, for other nodes within the same shard (could either be a primary or replica). + * This function covers the external propagation component. */ +void clusterSlotStatsAddNetworkBytesOutForShardedPubSubExternalPropagation(size_t len) { + client *c = server.current_client; + if (c == NULL || !canAddNetworkBytesOut(c)) return; + + serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS); + server.cluster->slot_stats[c->slot].network_bytes_out += len; +} + /* Adds reply for the ORDERBY variant. * Response is ordered based on the sort result. */ static void addReplyOrderBy(client *c, int order_by, long limit, int desc) { @@ -205,7 +264,7 @@ void clusterSlotStatsSetClusterMsgLength(uint32_t len) { pubsub_state.len = len; } -void clusterSlotStatsResetClusterMsgLength() { +void clusterSlotStatsResetClusterMsgLength(void) { pubsub_state.len = 0; } @@ -249,6 +308,8 @@ void clusterSlotStatsCommand(client *c) { order_by = CPU_USEC; } else if (!strcasecmp(c->argv[3]->ptr, "network-bytes-in") && server.cluster_slot_stats_enabled) { order_by = NETWORK_BYTES_IN; + } else if (!strcasecmp(c->argv[3]->ptr, "network-bytes-out") && server.cluster_slot_stats_enabled) { + order_by = NETWORK_BYTES_OUT; } else { addReplyError(c, "Unrecognized sort metric for ORDERBY."); return; diff --git a/src/cluster_slot_stats.h b/src/cluster_slot_stats.h index 60aad6e387..4e16c28544 100644 --- a/src/cluster_slot_stats.h +++ b/src/cluster_slot_stats.h @@ -11,3 +11,8 @@ void clusterSlotStatsAddNetworkBytesIn(client *c); void clusterSlotStatsSetClusterMsgLength(uint32_t len); void clusterSlotStatsResetClusterMsgLength(void); void clusterSlotStatsAddNetworkBytesInForShardedPubSub(int slot); +void clusterSlotStatsAddNetworkBytesOut(client *c); +void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c); +void clusterSlotStatsAddNetworkBytesOutForReplication(int len); +void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(client *c, int slot); +void clusterSlotStatsAddNetworkBytesOutForShardedPubSubExternalPropagation(size_t len); diff --git a/src/networking.c b/src/networking.c index ae866e1e6e..df5489e50b 100644 --- a/src/networking.c +++ b/src/networking.c @@ -234,6 +234,7 @@ client *createClient(connection *conn) { c->net_input_bytes = 0; c->net_input_bytes_curr_cmd = 0; c->net_output_bytes = 0; + c->net_output_bytes_curr_cmd = 0; c->commands_processed = 0; return c; } @@ -451,6 +452,8 @@ void _addReplyToBufferOrList(client *c, const char *s, size_t len) { return; } + c->net_output_bytes_curr_cmd += len; + /* We call it here because this function may affect the reply * buffer offset (see function comment) */ reqresSaveClientReplyOffset(c); @@ -2486,6 +2489,7 @@ void resetClient(client *c) { c->slot = -1; c->flag.executing_command = 0; c->flag.replication_done = 0; + c->net_output_bytes_curr_cmd = 0; /* Make sure the duration has been recorded to some command. */ serverAssert(c->duration == 0); @@ -2631,7 +2635,7 @@ void processInlineBuffer(client *c) { /* Per-slot network bytes-in calculation. * - * We calculate and store the current command's ingress bytes under + * We calculate and store the current command's ingress bytes under * c->net_input_bytes_curr_cmd, for which its per-slot aggregation is deferred * until c->slot is parsed later within processCommand(). * diff --git a/src/pubsub.c b/src/pubsub.c index 26e3a1af88..04636257b4 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -476,14 +476,14 @@ int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type) int receivers = 0; dictEntry *de; dictIterator *di; - unsigned int slot = 0; + int slot = -1; /* Send to clients listening for that channel */ if (server.cluster_enabled && type.shard) { slot = keyHashSlot(channel->ptr, sdslen(channel->ptr)); clusterSlotStatsAddNetworkBytesInForShardedPubSub(slot); } - de = kvstoreDictFind(*type.serverPubSubChannels, slot, channel); + de = kvstoreDictFind(*type.serverPubSubChannels, (slot == -1) ? 0 : slot, channel); if (de) { dict *clients = dictGetVal(de); dictEntry *entry; @@ -491,6 +491,7 @@ int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type) while ((entry = dictNext(iter)) != NULL) { client *c = dictGetKey(entry); addReplyPubsubMessage(c, channel, message, *type.messageBulk); + clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(c, slot); updateClientMemUsageAndBucket(c); receivers++; } diff --git a/src/replication.c b/src/replication.c index b00da525bf..a39d56fa63 100644 --- a/src/replication.c +++ b/src/replication.c @@ -31,6 +31,7 @@ #include "server.h" #include "cluster.h" +#include "cluster_slot_stats.h" #include "bio.h" #include "functions.h" #include "connection.h" @@ -415,6 +416,8 @@ void feedReplicationBuffer(char *s, size_t len) { if (server.repl_backlog == NULL) return; + clusterSlotStatsAddNetworkBytesOutForReplication(len); + while (len > 0) { size_t start_pos = 0; /* The position of referenced block to start sending. */ listNode *start_node = NULL; /* Replica/backlog starts referenced node. */ @@ -568,6 +571,11 @@ void replicationFeedReplicas(int dictid, robj **argv, int argc) { feedReplicationBufferWithObject(selectcmd); + /* Although the SELECT command is not associated with any slot, + * its per-slot network-bytes-out accumulation is made by the above function call. + * To cancel-out this accumulation, below adjustment is made. */ + clusterSlotStatsAddNetworkBytesOutForReplication(-sdslen(selectcmd->ptr)); + if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS) decrRefCount(selectcmd); server.replicas_eldb = dictid; diff --git a/src/server.c b/src/server.c index b1eec97308..65066a4431 100644 --- a/src/server.c +++ b/src/server.c @@ -3694,6 +3694,8 @@ void afterCommand(client *c) { /* Flush pending tracking invalidations. */ trackingHandlePendingKeyInvalidations(); + clusterSlotStatsAddNetworkBytesOutForUserClient(c); + /* Flush other pending push messages. only when we are not in nested call. * So the messages are not interleaved with transaction response. */ if (!server.execution_nesting) listJoin(c->reply, server.pending_push_messages); diff --git a/src/server.h b/src/server.h index 3311110e02..c9b0791274 100644 --- a/src/server.h +++ b/src/server.h @@ -1372,6 +1372,8 @@ typedef struct client { * execution of this client's current command. */ unsigned long long net_output_bytes; /* Total network output bytes sent to this client. */ unsigned long long commands_processed; /* Total count of commands this client executed. */ + unsigned long long + net_output_bytes_curr_cmd; /* Total network output bytes sent to this client, by the current command. */ } client; /* ACL information */ diff --git a/tests/unit/cluster/slot-stats.tcl b/tests/unit/cluster/slot-stats.tcl index 1aaf7a1bee..3b369cbf41 100644 --- a/tests/unit/cluster/slot-stats.tcl +++ b/tests/unit/cluster/slot-stats.tcl @@ -487,6 +487,190 @@ start_cluster 1 1 {tags {external:skip cluster} overrides {cluster-slot-stats-en R 0 FLUSHALL } +# ----------------------------------------------------------------------------- +# Test cases for CLUSTER SLOT-STATS network-bytes-out correctness. +# ----------------------------------------------------------------------------- + +start_cluster 1 0 {tags {external:skip cluster}} { + # Define shared variables. + set key "FOO" + set key_slot [R 0 cluster keyslot $key] + set expected_slots_to_key_count [dict create $key_slot 1] + set metrics_to_assert [list network-bytes-out] + R 0 CONFIG SET cluster-slot-stats-enabled yes + + test "CLUSTER SLOT-STATS network-bytes-out, for non-slot specific commands." { + R 0 INFO + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + assert_empty_slot_stats $slot_stats $metrics_to_assert + } + R 0 CONFIG RESETSTAT + R 0 FLUSHALL + + test "CLUSTER SLOT-STATS network-bytes-out, for slot specific commands." { + R 0 SET $key value + # +OK\r\n --> 5 bytes + + set expected_slot_stats [ + dict create $key_slot [ + dict create network-bytes-out 5 + ] + ] + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert + } + R 0 CONFIG RESETSTAT + R 0 FLUSHALL + + test "CLUSTER SLOT-STATS network-bytes-out, blocking commands." { + set rd [valkey_deferring_client] + $rd BLPOP $key 0 + wait_for_blocked_clients_count 1 + + # Assert empty slot stats here, since COB is yet to be flushed due to the block. + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + assert_empty_slot_stats $slot_stats $metrics_to_assert + + # Unblock the command. + # LPUSH client) :1\r\n --> 4 bytes. + # BLPOP client) *2\r\n$3\r\nkey\r\n$5\r\nvalue\r\n --> 24 bytes, upon unblocking. + R 0 LPUSH $key value + wait_for_blocked_clients_count 0 + + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + set expected_slot_stats [ + dict create $key_slot [ + dict create network-bytes-out 28 ;# 4 + 24 bytes. + ] + ] + assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert + } + R 0 CONFIG RESETSTAT + R 0 FLUSHALL +} + +start_cluster 1 1 {tags {external:skip cluster}} { + + # Define shared variables. + set key "FOO" + set key_slot [R 0 CLUSTER KEYSLOT $key] + set metrics_to_assert [list network-bytes-out] + R 0 CONFIG SET cluster-slot-stats-enabled yes + + # Setup replication. + assert {[s -1 role] eq {slave}} + wait_for_condition 1000 50 { + [s -1 master_link_status] eq {up} + } else { + fail "Instance #1 master link status is not up" + } + R 1 readonly + + test "CLUSTER SLOT-STATS network-bytes-out, replication stream egress." { + assert_equal [R 0 SET $key VALUE] {OK} + # Local client) +OK\r\n --> 5 bytes. + # Replication stream) *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n --> 33 bytes. + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + set expected_slot_stats [ + dict create $key_slot [ + dict create network-bytes-out 38 ;# 5 + 33 bytes. + ] + ] + assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert + } +} + +start_cluster 1 1 {tags {external:skip cluster}} { + + # Define shared variables. + set channel "channel" + set key_slot [R 0 cluster keyslot $channel] + set channel_secondary "channel2" + set key_slot_secondary [R 0 cluster keyslot $channel_secondary] + set metrics_to_assert [list network-bytes-out] + R 0 CONFIG SET cluster-slot-stats-enabled yes + + test "CLUSTER SLOT-STATS network-bytes-out, sharded pub/sub, single channel." { + set slot [R 0 cluster keyslot $channel] + set publisher [Rn 0] + set subscriber [valkey_client] + set replica [valkey_deferring_client -1] + + # Subscriber client) *3\r\n$10\r\nssubscribe\r\n$7\r\nchannel\r\n:1\r\n --> 38 bytes + $subscriber SSUBSCRIBE $channel + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + set expected_slot_stats [ + dict create $key_slot [ + dict create network-bytes-out 38 + ] + ] + R 0 CONFIG RESETSTAT + + # Publisher client) :1\r\n --> 4 bytes. + # Subscriber client) *3\r\n$8\r\nsmessage\r\n$7\r\nchannel\r\n$5\r\nhello\r\n --> 42 bytes. + # Cluster propagation) sdslen(channel) + sdslen(hello) --> 12 bytes. + assert_equal 1 [$publisher SPUBLISH $channel hello] + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + set expected_slot_stats [ + dict create $key_slot [ + dict create network-bytes-out 2338 ;# 4 + 42 + 12 + 2280 bytes from clusterMsgSendBlock. + ] + ] + assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert + } + $subscriber QUIT + R 0 FLUSHALL + R 0 CONFIG RESETSTAT + + test "CLUSTER SLOT-STATS network-bytes-out, sharded pub/sub, cross-slot channels." { + set slot [R 0 cluster keyslot $channel] + set publisher [Rn 0] + set subscriber [valkey_client] + set replica [valkey_deferring_client -1] + + # Stack multi-slot subscriptions against a single client. + # For primary channel; + # Subscriber client) *3\r\n$10\r\nssubscribe\r\n$7\r\nchannel\r\n:1\r\n --> 38 bytes + # For secondary channel; + # Subscriber client) *3\r\n$10\r\nssubscribe\r\n$8\r\nchannel2\r\n:1\r\n --> 39 bytes + $subscriber SSUBSCRIBE $channel + $subscriber SSUBSCRIBE $channel_secondary + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + set expected_slot_stats [ + dict create \ + $key_slot [ \ + dict create network-bytes-out 38 + ] \ + $key_slot_secondary [ \ + dict create network-bytes-out 39 + ] + ] + R 0 CONFIG RESETSTAT + + # For primary channel; + # Publisher client) :1\r\n --> 4 bytes. + # Subscriber client) *3\r\n$8\r\nsmessage\r\n$7\r\nchannel\r\n$5\r\nhello\r\n --> 42 bytes. + # Cluster propagation) sdslen(channel) + sdslen(hello) --> 12 bytes. + # For secondary channel; + # Publisher client) :1\r\n --> 4 bytes. + # Subscriber client) *3\r\n$8\r\nsmessage\r\n$8\r\nchannel2\r\n$5\r\nhello\r\n --> 43 bytes. + # Cluster propagation) sdslen(channel2) + sdslen(hello) --> 13 bytes. + assert_equal 1 [$publisher SPUBLISH $channel hello] + assert_equal 1 [$publisher SPUBLISH $channel_secondary hello] + set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383] + set expected_slot_stats [ + dict create \ + $key_slot [ \ + dict create network-bytes-out 2338 ;# 4 + 42 + 12 + 2280 bytes from clusterMsgSendBlock. + ] \ + $key_slot_secondary [ \ + dict create network-bytes-out 2340 ;# 4 + 43 + 13 + 2280 bytes from clusterMsgSendBlock. + ] + ] + assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert + } +} + # ----------------------------------------------------------------------------- # Test cases for CLUSTER SLOT-STATS key-count metric correctness. # -----------------------------------------------------------------------------