diff --git a/src/cluster.c b/src/cluster.c index 741b05c603..d8c5d7486c 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1312,24 +1312,6 @@ int clusterRedirectBlockedClientIfNeeded(client *c) { return 0; } -/* Returns an indication if the replica node is fully available - * and should be listed in CLUSTER SLOTS response. - * Returns 1 for available nodes, 0 for nodes that have - * not finished their initial sync, in failed state, or are - * otherwise considered not available to serve read commands. */ -static int isReplicaAvailable(clusterNode *node) { - if (clusterNodeIsFailing(node)) { - return 0; - } - long long repl_offset = clusterNodeReplOffset(node); - if (clusterNodeIsMyself(node)) { - /* Nodes do not update their own information - * in the cluster node list. */ - repl_offset = replicationGetSlaveOffset(); - } - return (repl_offset != 0); -} - void addNodeToNodeReply(client *c, clusterNode *node) { char* hostname = clusterNodeHostname(node); addReplyArrayLen(c, 4); @@ -1381,10 +1363,28 @@ void addNodeToNodeReply(client *c, clusterNode *node) { serverAssert(length == 0); } +/* Returns an indication if the node is fully available + * and should be listed in CLUSTER SLOTS response. + * Returns 1 for available nodes, 0 for nodes that have + * not finished their initial sync, in failed state, or are + * otherwise considered not available to serve read commands. */ +int isNodeAvailable(clusterNode *node) { + if (clusterNodeIsFailing(node)) { + return 0; + } + long long repl_offset = clusterNodeReplOffset(node); + if (clusterNodeIsMyself(node)) { + /* Nodes do not update their own information + * in the cluster node list. 
*/ + repl_offset = getNodeReplicationOffset(node); + } + return (repl_offset != 0); +} + void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, int end_slot) { int i, nested_elements = 3; /* slots (2) + master addr (1) */ for (i = 0; i < clusterNodeNumSlaves(node); i++) { - if (!isReplicaAvailable(clusterNodeGetSlave(node, i))) continue; + if (!isNodeAvailable(clusterNodeGetSlave(node, i))) continue; nested_elements++; } addReplyArrayLen(c, nested_elements); @@ -1396,27 +1396,27 @@ void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, in for (i = 0; i < clusterNodeNumSlaves(node); i++) { /* This loop is copy/pasted from clusterGenNodeDescription() * with modifications for per-slot node aggregation. */ - if (!isReplicaAvailable(clusterNodeGetSlave(node, i))) continue; + if (!isNodeAvailable(clusterNodeGetSlave(node, i))) continue; addNodeToNodeReply(c, clusterNodeGetSlave(node, i)); nested_elements--; } serverAssert(nested_elements == 3); /* Original 3 elements */ } -void clusterCommandSlots(client * c) { - /* Format: 1) 1) start slot - * 2) end slot - * 3) 1) master IP - * 2) master port - * 3) node ID - * 4) 1) replica IP - * 2) replica port - * 3) node ID - * ... continued until done - */ +void clearCachedClusterSlotsResponse(void) { + for (connTypeForCaching conn_type = CACHE_CONN_TCP; conn_type < CACHE_CONN_TYPE_MAX; conn_type++) { + if (server.cached_cluster_slot_info[conn_type]) { + sdsfree(server.cached_cluster_slot_info[conn_type]); + server.cached_cluster_slot_info[conn_type] = NULL; + } + } +} + +sds generateClusterSlotResponse(void) { + client *recording_client = createCachedResponseClient(); clusterNode *n = NULL; int num_masters = 0, start = -1; - void *slot_replylen = addReplyDeferredLen(c); + void *slot_replylen = addReplyDeferredLen(recording_client); for (int i = 0; i <= CLUSTER_SLOTS; i++) { /* Find start node and slot id. 
*/ @@ -1430,14 +1430,52 @@ void clusterCommandSlots(client * c) { /* Add cluster slots info when occur different node with start * or end of slot. */ if (i == CLUSTER_SLOTS || n != getNodeBySlot(i)) { - addNodeReplyForClusterSlot(c, n, start, i-1); + addNodeReplyForClusterSlot(recording_client, n, start, i-1); num_masters++; if (i == CLUSTER_SLOTS) break; n = getNodeBySlot(i); start = i; } } - setDeferredArrayLen(c, slot_replylen, num_masters); + setDeferredArrayLen(recording_client, slot_replylen, num_masters); + sds cluster_slot_response = aggregateClientOutputBuffer(recording_client); + deleteCachedResponseClient(recording_client); + return cluster_slot_response; +} + +int verifyCachedClusterSlotsResponse(sds cached_response) { + sds generated_response = generateClusterSlotResponse(); + int is_equal = !sdscmp(generated_response, cached_response); + /* Here, we use LL_WARNING so this gets printed when debug assertions are enabled and the system is about to crash. */ + if (!is_equal) serverLog(LL_WARNING,"\ngenerated_response:\n%s\n\ncached_response:\n%s", generated_response, cached_response); + sdsfree(generated_response); + return is_equal; +} + +void clusterCommandSlots(client *c) { + /* Format: 1) 1) start slot + * 2) end slot + * 3) 1) master IP + * 2) master port + * 3) node ID + * 4) 1) replica IP + * 2) replica port + * 3) node ID + * ... 
continued until done + */ + connTypeForCaching conn_type = connIsTLS(c->conn); + + if (detectAndUpdateCachedNodeHealth()) clearCachedClusterSlotsResponse(); + + sds cached_reply = server.cached_cluster_slot_info[conn_type]; + if (!cached_reply) { + cached_reply = generateClusterSlotResponse(); + server.cached_cluster_slot_info[conn_type] = cached_reply; + } else { + debugServerAssertWithInfo(c, NULL, verifyCachedClusterSlotsResponse(cached_reply) == 1); + } + + addReplyProto(c, cached_reply, sdslen(cached_reply)); } /* ----------------------------------------------------------------------------- diff --git a/src/cluster.h b/src/cluster.h index 481fcc9736..dc54e07052 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -100,6 +100,10 @@ const char *clusterNodePreferredEndpoint(clusterNode *n); long long clusterNodeReplOffset(clusterNode *node); clusterNode *clusterLookupNode(const char *name, int length); void clusterReplicateOpenSlots(void); +int detectAndUpdateCachedNodeHealth(void); +client *createCachedResponseClient(void); +void deleteCachedResponseClient(client *recording_client); +void clearCachedClusterSlotsResponse(void); /* functions with shared implementations */ int clusterNodeIsMyself(clusterNode *n); @@ -113,4 +117,7 @@ int isValidAuxString(char *s, unsigned int length); void migrateCommand(client *c); void clusterCommand(client *c); ConnectionType *connTypeOfCluster(void); +int isNodeAvailable(clusterNode *node); +long long getNodeReplicationOffset(clusterNode *node); +sds aggregateClientOutputBuffer(client *c); #endif /* __CLUSTER_H */ diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index b2e9690fd9..25b791b91b 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -762,6 +762,7 @@ void clusterSaveConfigOrDie(int do_fsync) { serverLog(LL_WARNING,"Fatal: can't update cluster config file."); exit(1); } + clearCachedClusterSlotsResponse(); } /* Lock the cluster config using flock(), and retain the file descriptor used to @@ -1039,6 
+1040,9 @@ void clusterInit(void) { server.cluster->mf_end = 0; server.cluster->mf_slave = NULL; + for (connTypeForCaching conn_type = CACHE_CONN_TCP; conn_type < CACHE_CONN_TYPE_MAX; conn_type++) { + server.cached_cluster_slot_info[conn_type] = NULL; + } resetManualFailover(); clusterUpdateMyselfFlags(); clusterUpdateMyselfIp(); @@ -1363,6 +1367,7 @@ clusterNode *createClusterNode(char *nodename, int flags) { node->repl_offset_time = 0; node->repl_offset = 0; listSetFreeMethod(node->fail_reports,zfree); + node->is_node_healthy = 0; return node; } @@ -5862,6 +5867,14 @@ void clusterUpdateSlots(client *c, unsigned char *slots, int del) { } } +long long getNodeReplicationOffset(clusterNode *node) { + if (node->flags & CLUSTER_NODE_MYSELF) { + return nodeIsSlave(node) ? replicationGetSlaveOffset() : server.master_repl_offset; + } else { + return node->repl_offset; + } +} + /* Add detailed information of a node to the output buffer of the given client. */ void addNodeDetailsToShardReply(client *c, clusterNode *node) { int reply_count = 0; @@ -5896,12 +5909,7 @@ void addNodeDetailsToShardReply(client *c, clusterNode *node) { reply_count++; } - long long node_offset; - if (node->flags & CLUSTER_NODE_MYSELF) { - node_offset = nodeIsSlave(node) ? replicationGetSlaveOffset() : server.master_repl_offset; - } else { - node_offset = node->repl_offset; - } + long long node_offset = getNodeReplicationOffset(node); addReplyBulkCString(c, "role"); addReplyBulkCString(c, nodeIsSlave(node) ? 
"replica" : "master"); @@ -6882,9 +6890,28 @@ void clusterPromoteSelfToMaster(void) { replicationUnsetMaster(); } +/* Refreshes every node's cached is_node_healthy flag from isNodeAvailable(). Returns 1 if any flag changed, 0 otherwise. */ +int detectAndUpdateCachedNodeHealth(void) { + dictIterator di; + dictInitSafeIterator(&di, server.cluster->nodes); + dictEntry *de; + clusterNode *node; + int overall_health_changed = 0; + while((de = dictNext(&di)) != NULL) { + node = dictGetVal(de); + int present_is_node_healthy = isNodeAvailable(node); + if (present_is_node_healthy != node->is_node_healthy) { + overall_health_changed = 1; + node->is_node_healthy = present_is_node_healthy; + } + } + dictResetIterator(&di); /* Release the safe iterator so the nodes dict can resume rehashing. */ + + return overall_health_changed; +} + /* Replicate migrating and importing slot states to all replicas */ -void clusterReplicateOpenSlots(void) -{ +void clusterReplicateOpenSlots(void) { if (!server.cluster_enabled) return; int argc = 5; diff --git a/src/cluster_legacy.h b/src/cluster_legacy.h index 68bddbbb9a..2a22557066 100644 --- a/src/cluster_legacy.h +++ b/src/cluster_legacy.h @@ -308,6 +308,8 @@ struct _clusterNode { clusterLink *link; /* TCP/IP link established toward this node */ clusterLink *inbound_link; /* TCP/IP link accepted from this node */ list *fail_reports; /* List of nodes signaling this as failing */ + int is_node_healthy; /* Boolean indicating the cached node health. + Update with detectAndUpdateCachedNodeHealth(). 
*/ }; struct clusterState { diff --git a/src/config.c b/src/config.c index 38e6d1d2fb..92816a30dc 100644 --- a/src/config.c +++ b/src/config.c @@ -2586,6 +2586,12 @@ static int updateOOMScoreAdj(const char **err) { return 1; } +int invalidateClusterSlotsResp(const char **err) { + UNUSED(err); + clearCachedClusterSlotsResponse(); + return 1; +} + int updateRequirePass(const char **err) { UNUSED(err); /* The old "requirepass" directive just translates to setting @@ -2649,6 +2655,7 @@ int updateClusterFlags(const char **err) { static int updateClusterAnnouncedPort(const char **err) { UNUSED(err); clusterUpdateMyselfAnnouncedPorts(); + clearCachedClusterSlotsResponse(); return 1; } @@ -3162,7 +3169,7 @@ standardConfig static_configs[] = { createEnumConfig("enable-protected-configs", NULL, IMMUTABLE_CONFIG, protected_action_enum, server.enable_protected_configs, PROTECTED_ACTION_ALLOWED_NO, NULL, NULL), createEnumConfig("enable-debug-command", NULL, IMMUTABLE_CONFIG, protected_action_enum, server.enable_debug_cmd, PROTECTED_ACTION_ALLOWED_NO, NULL, NULL), createEnumConfig("enable-module-command", NULL, IMMUTABLE_CONFIG, protected_action_enum, server.enable_module_cmd, PROTECTED_ACTION_ALLOWED_NO, NULL, NULL), - createEnumConfig("cluster-preferred-endpoint-type", NULL, MODIFIABLE_CONFIG, cluster_preferred_endpoint_type_enum, server.cluster_preferred_endpoint_type, CLUSTER_ENDPOINT_TYPE_IP, NULL, NULL), + createEnumConfig("cluster-preferred-endpoint-type", NULL, MODIFIABLE_CONFIG, cluster_preferred_endpoint_type_enum, server.cluster_preferred_endpoint_type, CLUSTER_ENDPOINT_TYPE_IP, NULL, invalidateClusterSlotsResp), createEnumConfig("propagation-error-behavior", NULL, MODIFIABLE_CONFIG, propagation_error_behavior_enum, server.propagation_error_behavior, PROPAGATION_ERR_BEHAVIOR_IGNORE, NULL, NULL), createEnumConfig("shutdown-on-sigint", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, shutdown_on_sig_enum, server.shutdown_on_sigint, 0, isValidShutdownOnSigFlags, NULL), 
createEnumConfig("shutdown-on-sigterm", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, shutdown_on_sig_enum, server.shutdown_on_sigterm, 0, isValidShutdownOnSigFlags, NULL), diff --git a/src/connection.h b/src/connection.h index f50cd89d1c..b44ef07782 100644 --- a/src/connection.h +++ b/src/connection.h @@ -62,6 +62,12 @@ typedef enum { #define CONN_TYPE_TLS "tls" #define CONN_TYPE_MAX 8 /* 8 is enough to be extendable */ +typedef enum connTypeForCaching { + CACHE_CONN_TCP, + CACHE_CONN_TLS, + CACHE_CONN_TYPE_MAX +} connTypeForCaching; + typedef void (*ConnectionCallbackFunc)(struct connection *conn); typedef struct ConnectionType { diff --git a/src/networking.c b/src/networking.c index 5aa02e8315..f5d6bad7f0 100644 --- a/src/networking.c +++ b/src/networking.c @@ -319,6 +319,44 @@ int prepareClientToWrite(client *c) { return C_OK; } +/* Returns everything in the client reply linked list in a SDS format. + * This should only be used with a caching client. */ +sds aggregateClientOutputBuffer(client *c) { + sds cmd_response = sdsempty(); + listIter li; + listNode *ln; + clientReplyBlock *val_block; + listRewind(c->reply,&li); + + /* Here, c->buf is not used, thus we confirm c->bufpos remains 0. */ + serverAssert(c->bufpos == 0); + while ((ln = listNext(&li)) != NULL) { + val_block = (clientReplyBlock *)listNodeValue(ln); + cmd_response = sdscatlen(cmd_response, val_block->buf,val_block->used); + } + return cmd_response; +} + +/* This function creates and returns a fake client for recording the command response + * to initiate caching of any command response. + * + * It needs to be paired with `deleteCachedResponseClient` function to stop caching. */ +client *createCachedResponseClient(void) { + struct client *recording_client = createClient(NULL); + /* Allocating the `conn` allows preparing the caching client before adding + * data to the client's output buffer by `prepareClientToWrite`. 
*/ + recording_client->conn = zcalloc(sizeof(connection)); + return recording_client; +} + +/* This function is used to stop caching of any command response after `createCachedResponseClient` is called. + * It frees the recording client and its placeholder connection; it does not return the response, so collect it with `aggregateClientOutputBuffer` first. */ +void deleteCachedResponseClient(client *recording_client) { + zfree(recording_client->conn); + recording_client->conn = NULL; + freeClient(recording_client); +} + /* ----------------------------------------------------------------------------- * Low level functions to add more data to output buffers. * -------------------------------------------------------------------------- */ diff --git a/src/server.h b/src/server.h index 23bcda81d7..788eb75e1f 100644 --- a/src/server.h +++ b/src/server.h @@ -2027,6 +2027,7 @@ struct valkeyServer { unsigned long long cluster_link_msg_queue_limit_bytes; /* Memory usage limit on individual link msg queue */ int cluster_drop_packet_filter; /* Debug config that allows tactically * dropping packets of a specific type */ + sds cached_cluster_slot_info[CACHE_CONN_TYPE_MAX]; /* Scripting */ mstime_t busy_reply_threshold; /* Script / module timeout in milliseconds */ int pre_command_oom_state; /* OOM before command (script?) was started */ @@ -2680,6 +2681,8 @@ void initThreadedIO(void); client *lookupClientByID(uint64_t id); int authRequired(client *c); void putClientInPendingWriteQueue(client *c); +client *createCachedResponseClient(void); +void deleteCachedResponseClient(client *recording_client); /* logreqres.c - logging of requests and responses */ void reqresReset(client *c, int free_buf); diff --git a/tests/cluster/tests/04-resharding.tcl b/tests/cluster/tests/04-resharding.tcl index e063dc0694..ff11119ccd 100644 --- a/tests/cluster/tests/04-resharding.tcl +++ b/tests/cluster/tests/04-resharding.tcl @@ -125,7 +125,7 @@ test "Cluster consistency during live resharding" { } else { fail "Resharding is not terminating after some time." 
} - + wait_for_cluster_propagation } test "Verify $numkeys keys for consistency with logical content" { diff --git a/tests/unit/cluster/announced-endpoints.tcl b/tests/unit/cluster/announced-endpoints.tcl index becba2270e..b44a2a0e95 100644 --- a/tests/unit/cluster/announced-endpoints.tcl +++ b/tests/unit/cluster/announced-endpoints.tcl @@ -9,12 +9,18 @@ start_cluster 2 2 {tags {external:skip cluster}} { set count [expr [llength $::servers] + 1] set used_port [find_available_port $baseport $count] + # We execute CLUSTER SLOTS command to trigger the `debugServerAssertWithInfo` in `clusterCommandSlots` function, ensuring + # that the cached response is invalidated upon updating any of cluster-announce-tls-port or cluster-announce-port. + R 0 CLUSTER SLOTS + R 1 CLUSTER SLOTS + R 0 config set cluster-announce-tls-port $used_port R 0 config set cluster-announce-port $used_port assert_match "*:$used_port@*" [R 0 CLUSTER NODES] + assert_match "*$used_port*" [R 0 CLUSTER SLOTS] wait_for_condition 50 100 { - [string match "*:$used_port@*" [R 1 CLUSTER NODES]] + ([string match "*:$used_port@*" [R 1 CLUSTER NODES]] && [string match "*$used_port*" [R 1 CLUSTER SLOTS]]) } else { fail "Cluster announced port was not propagated via gossip" } diff --git a/tests/unit/cluster/failover.tcl b/tests/unit/cluster/failover.tcl index b2c68db3d2..9e84aef4ce 100644 --- a/tests/unit/cluster/failover.tcl +++ b/tests/unit/cluster/failover.tcl @@ -66,6 +66,7 @@ test "Instance #0 gets converted into a slave" { } else { fail "Old master was not converted into slave" } + wait_for_cluster_propagation } } ;# start_cluster diff --git a/tests/unit/cluster/manual-failover.tcl b/tests/unit/cluster/manual-failover.tcl index 2d0a8921cb..2a9dff934b 100644 --- a/tests/unit/cluster/manual-failover.tcl +++ b/tests/unit/cluster/manual-failover.tcl @@ -60,6 +60,7 @@ test "Wait for failover" { } else { fail "No failover detected" } + wait_for_cluster_propagation } test "Cluster should eventually be up again" { 
@@ -87,6 +88,7 @@ test "Instance #0 gets converted into a slave" { } else { fail "Old master was not converted into slave" } + wait_for_cluster_propagation } } ;# start_cluster