Commit
Cache CLUSTER SLOTS response for improving throughput and reduced latency. (#53)

This commit adds logic to cache the `CLUSTER SLOTS` response for reduced
latency, and updates the cache whenever a change in the cluster is
detected.

Historically, the `CLUSTER SLOTS` command was deprecated; however, all the
server clients still use `CLUSTER SLOTS` and have not migrated to
`CLUSTER SHARDS`. In the future, this logic can be extended to other
commands to improve the performance of the engine.

---------

Signed-off-by: Roshan Khatri <[email protected]>
roshkhatri authored May 22, 2024
1 parent 7253862 commit c478206
Show file tree
Hide file tree
Showing 12 changed files with 180 additions and 45 deletions.
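Before the per-file diff, the overall pattern is simple: keep one serialized `CLUSTER SLOTS` reply per connection type (TCP and TLS), build it on the first cache miss, serve the cached bytes on every later call, and drop the cache whenever the topology or a reply-affecting config changes. Below is a minimal, self-contained sketch of that pattern; the names (`conn_type_t`, `serve_slots`, `generate_reply`, `invalidate_cache`) are illustrative stand-ins, not the Valkey symbols added in the diff.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef enum { CONN_TCP, CONN_TLS, CONN_MAX } conn_type_t;

static char *cached_reply[CONN_MAX]; /* NULL means "no cached response yet" */

/* Stand-in for generateClusterSlotResponse(): the expensive reply builder. */
static char *generate_reply(conn_type_t t) {
    char buf[64];
    snprintf(buf, sizeof(buf), "slots-reply(%s)", t == CONN_TLS ? "tls" : "tcp");
    return strdup(buf);
}

/* Stand-in for clearCachedClusterSlotsResponse(): drop every cached copy. */
static void invalidate_cache(void) {
    for (int t = 0; t < CONN_MAX; t++) {
        free(cached_reply[t]);
        cached_reply[t] = NULL;
    }
}

/* Stand-in for clusterCommandSlots(): serve from cache, filling it on a miss. */
static const char *serve_slots(conn_type_t t) {
    if (!cached_reply[t]) cached_reply[t] = generate_reply(t);
    return cached_reply[t];
}

int main(void) {
    printf("%s\n", serve_slots(CONN_TCP)); /* miss: reply generated and cached */
    printf("%s\n", serve_slots(CONN_TCP)); /* hit: cached bytes served as-is */
    invalidate_cache();                    /* e.g. cluster topology changed */
    printf("%s\n", serve_slots(CONN_TCP)); /* miss again: reply regenerated */
    invalidate_cache();
    return 0;
}

The cache is keyed per connection type because the port reported in the reply can differ between TLS and plain TCP clients (cluster-announce-tls-port vs. cluster-announce-port), which is why the diff below introduces `connTypeForCaching` and the `cached_cluster_slot_info[CACHE_CONN_TYPE_MAX]` array.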
106 changes: 72 additions & 34 deletions src/cluster.c
@@ -1312,24 +1312,6 @@ int clusterRedirectBlockedClientIfNeeded(client *c) {
    return 0;
}

/* Returns an indication if the replica node is fully available
 * and should be listed in CLUSTER SLOTS response.
 * Returns 1 for available nodes, 0 for nodes that have
 * not finished their initial sync, in failed state, or are
 * otherwise considered not available to serve read commands. */
static int isReplicaAvailable(clusterNode *node) {
    if (clusterNodeIsFailing(node)) {
        return 0;
    }
    long long repl_offset = clusterNodeReplOffset(node);
    if (clusterNodeIsMyself(node)) {
        /* Nodes do not update their own information
         * in the cluster node list. */
        repl_offset = replicationGetSlaveOffset();
    }
    return (repl_offset != 0);
}

void addNodeToNodeReply(client *c, clusterNode *node) {
    char* hostname = clusterNodeHostname(node);
    addReplyArrayLen(c, 4);
@@ -1381,10 +1363,28 @@ void addNodeToNodeReply(client *c, clusterNode *node) {
    serverAssert(length == 0);
}

/* Returns an indication if the node is fully available
 * and should be listed in CLUSTER SLOTS response.
 * Returns 1 for available nodes, 0 for nodes that have
 * not finished their initial sync, in failed state, or are
 * otherwise considered not available to serve read commands. */
int isNodeAvailable(clusterNode *node) {
    if (clusterNodeIsFailing(node)) {
        return 0;
    }
    long long repl_offset = clusterNodeReplOffset(node);
    if (clusterNodeIsMyself(node)) {
        /* Nodes do not update their own information
         * in the cluster node list. */
        repl_offset = getNodeReplicationOffset(node);
    }
    return (repl_offset != 0);
}

void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, int end_slot) {
    int i, nested_elements = 3; /* slots (2) + master addr (1) */
    for (i = 0; i < clusterNodeNumSlaves(node); i++) {
        if (!isReplicaAvailable(clusterNodeGetSlave(node, i))) continue;
        if (!isNodeAvailable(clusterNodeGetSlave(node, i))) continue;
        nested_elements++;
    }
    addReplyArrayLen(c, nested_elements);
@@ -1396,27 +1396,27 @@ void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, in
    for (i = 0; i < clusterNodeNumSlaves(node); i++) {
        /* This loop is copy/pasted from clusterGenNodeDescription()
         * with modifications for per-slot node aggregation. */
        if (!isReplicaAvailable(clusterNodeGetSlave(node, i))) continue;
        if (!isNodeAvailable(clusterNodeGetSlave(node, i))) continue;
        addNodeToNodeReply(c, clusterNodeGetSlave(node, i));
        nested_elements--;
    }
    serverAssert(nested_elements == 3); /* Original 3 elements */
}

void clusterCommandSlots(client * c) {
    /* Format: 1) 1) start slot
     *            2) end slot
     *            3) 1) master IP
     *               2) master port
     *               3) node ID
     *            4) 1) replica IP
     *               2) replica port
     *               3) node ID
     *           ... continued until done
     */
void clearCachedClusterSlotsResponse(void) {
    for (connTypeForCaching conn_type = CACHE_CONN_TCP; conn_type < CACHE_CONN_TYPE_MAX; conn_type++) {
        if (server.cached_cluster_slot_info[conn_type]) {
            sdsfree(server.cached_cluster_slot_info[conn_type]);
            server.cached_cluster_slot_info[conn_type] = NULL;
        }
    }
}

sds generateClusterSlotResponse(void) {
    client *recording_client = createCachedResponseClient();
    clusterNode *n = NULL;
    int num_masters = 0, start = -1;
    void *slot_replylen = addReplyDeferredLen(c);
    void *slot_replylen = addReplyDeferredLen(recording_client);

    for (int i = 0; i <= CLUSTER_SLOTS; i++) {
        /* Find start node and slot id. */
@@ -1430,14 +1430,52 @@ void clusterCommandSlots(client * c) {
        /* Add cluster slots info when occur different node with start
         * or end of slot. */
        if (i == CLUSTER_SLOTS || n != getNodeBySlot(i)) {
            addNodeReplyForClusterSlot(c, n, start, i-1);
            addNodeReplyForClusterSlot(recording_client, n, start, i-1);
            num_masters++;
            if (i == CLUSTER_SLOTS) break;
            n = getNodeBySlot(i);
            start = i;
        }
    }
    setDeferredArrayLen(c, slot_replylen, num_masters);
    setDeferredArrayLen(recording_client, slot_replylen, num_masters);
    sds cluster_slot_response = aggregateClientOutputBuffer(recording_client);
    deleteCachedResponseClient(recording_client);
    return cluster_slot_response;
}

int verifyCachedClusterSlotsResponse(sds cached_response) {
    sds generated_response = generateClusterSlotResponse();
    int is_equal = !sdscmp(generated_response, cached_response);
    /* Here, we use LL_WARNING so this gets printed when debug assertions are enabled and the system is about to crash. */
    if (!is_equal) serverLog(LL_WARNING,"\ngenerated_response:\n%s\n\ncached_response:\n%s", generated_response, cached_response);
    sdsfree(generated_response);
    return is_equal;
}

void clusterCommandSlots(client *c) {
    /* Format: 1) 1) start slot
     *            2) end slot
     *            3) 1) master IP
     *               2) master port
     *               3) node ID
     *            4) 1) replica IP
     *               2) replica port
     *               3) node ID
     *           ... continued until done
     */
    connTypeForCaching conn_type = connIsTLS(c->conn);

    if (detectAndUpdateCachedNodeHealth()) clearCachedClusterSlotsResponse();

    sds cached_reply = server.cached_cluster_slot_info[conn_type];
    if (!cached_reply) {
        cached_reply = generateClusterSlotResponse();
        server.cached_cluster_slot_info[conn_type] = cached_reply;
    } else {
        debugServerAssertWithInfo(c, NULL, verifyCachedClusterSlotsResponse(cached_reply) == 1);
    }

    addReplyProto(c, cached_reply, sdslen(cached_reply));
}

/* -----------------------------------------------------------------------------
7 changes: 7 additions & 0 deletions src/cluster.h
@@ -100,6 +100,10 @@ const char *clusterNodePreferredEndpoint(clusterNode *n);
long long clusterNodeReplOffset(clusterNode *node);
clusterNode *clusterLookupNode(const char *name, int length);
void clusterReplicateOpenSlots(void);
int detectAndUpdateCachedNodeHealth(void);
client *createCachedResponseClient(void);
void deleteCachedResponseClient(client *recording_client);
void clearCachedClusterSlotsResponse(void);

/* functions with shared implementations */
int clusterNodeIsMyself(clusterNode *n);
@@ -113,4 +117,7 @@ int isValidAuxString(char *s, unsigned int length);
void migrateCommand(client *c);
void clusterCommand(client *c);
ConnectionType *connTypeOfCluster(void);
int isNodeAvailable(clusterNode *node);
long long getNodeReplicationOffset(clusterNode *node);
sds aggregateClientOutputBuffer(client *c);
#endif /* __CLUSTER_H */
41 changes: 33 additions & 8 deletions src/cluster_legacy.c
@@ -762,6 +762,7 @@ void clusterSaveConfigOrDie(int do_fsync) {
        serverLog(LL_WARNING,"Fatal: can't update cluster config file.");
        exit(1);
    }
    clearCachedClusterSlotsResponse();
}

/* Lock the cluster config using flock(), and retain the file descriptor used to
@@ -1039,6 +1040,9 @@ void clusterInit(void) {

    server.cluster->mf_end = 0;
    server.cluster->mf_slave = NULL;
    for (connTypeForCaching conn_type = CACHE_CONN_TCP; conn_type < CACHE_CONN_TYPE_MAX; conn_type++) {
        server.cached_cluster_slot_info[conn_type] = NULL;
    }
    resetManualFailover();
    clusterUpdateMyselfFlags();
    clusterUpdateMyselfIp();
@@ -1363,6 +1367,7 @@ clusterNode *createClusterNode(char *nodename, int flags) {
    node->repl_offset_time = 0;
    node->repl_offset = 0;
    listSetFreeMethod(node->fail_reports,zfree);
    node->is_node_healthy = 0;
    return node;
}

@@ -5862,6 +5867,14 @@ void clusterUpdateSlots(client *c, unsigned char *slots, int del) {
}
}

long long getNodeReplicationOffset(clusterNode *node) {
    if (node->flags & CLUSTER_NODE_MYSELF) {
        return nodeIsSlave(node) ? replicationGetSlaveOffset() : server.master_repl_offset;
    } else {
        return node->repl_offset;
    }
}

/* Add detailed information of a node to the output buffer of the given client. */
void addNodeDetailsToShardReply(client *c, clusterNode *node) {
    int reply_count = 0;
@@ -5896,12 +5909,7 @@ void addNodeDetailsToShardReply(client *c, clusterNode *node) {
        reply_count++;
    }

    long long node_offset;
    if (node->flags & CLUSTER_NODE_MYSELF) {
        node_offset = nodeIsSlave(node) ? replicationGetSlaveOffset() : server.master_repl_offset;
    } else {
        node_offset = node->repl_offset;
    }
    long long node_offset = getNodeReplicationOffset(node);

    addReplyBulkCString(c, "role");
    addReplyBulkCString(c, nodeIsSlave(node) ? "replica" : "master");
@@ -6882,9 +6890,26 @@ void clusterPromoteSelfToMaster(void) {
    replicationUnsetMaster();
}

int detectAndUpdateCachedNodeHealth(void) {
    dictIterator di;
    dictInitSafeIterator(&di, server.cluster->nodes);
    dictEntry *de;
    clusterNode *node;
    int overall_health_changed = 0;
    while((de = dictNext(&di)) != NULL) {
        node = dictGetVal(de);
        int present_is_node_healthy = isNodeAvailable(node);
        if (present_is_node_healthy != node->is_node_healthy) {
            overall_health_changed = 1;
            node->is_node_healthy = present_is_node_healthy;
        }
    }

    return overall_health_changed;
}

Review comment from srgsanky (Contributor), Jun 7, 2024, on the iterator above:
I came to know that if we use a stack-based iterator, we need to invoke dictResetIterator so that rehashing on the dictionary is re-enabled after the iteration. See valkey/src/dict.c, lines 960 to 967 at 54c9747:

void dictResetIterator(dictIterator *iter) {
    if (!(iter->index == -1 && iter->table == 0)) {
        if (iter->safe)
            dictResumeRehashing(iter->d);
        else
            assert(iter->fingerprint == dictFingerprint(iter->d));
    }
}

@roshkhatri @madolson

Reply from madolson (Member), Jun 7, 2024:
I missed this. We aren't supposed to use a safe iterator here. Normal iterators don't pause rehashing. We should maybe remove dictInitSafeIterator from the public API.
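
A rough sketch of the plain-iterator pattern the reviewers describe, not code from this commit: it assumes `dictInitIterator(&iter, d)` exists alongside `dictInitSafeIterator` with the same shape, that `dictResetIterator` behaves as quoted above, and the function name is hypothetical.

/* Sketch only: the same health scan using a plain stack-based iterator,
 * released with dictResetIterator() as the review suggests. */
int detectAndUpdateCachedNodeHealthSketch(void) {
    dictIterator di;
    dictEntry *de;
    int overall_health_changed = 0;

    dictInitIterator(&di, server.cluster->nodes); /* plain iterator: does not pause rehashing */
    while ((de = dictNext(&di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        int present_is_node_healthy = isNodeAvailable(node);
        if (present_is_node_healthy != node->is_node_healthy) {
            overall_health_changed = 1;
            node->is_node_healthy = present_is_node_healthy;
        }
    }
    /* For a safe iterator this re-enables rehashing; for a plain one it
     * validates the fingerprint. The loop only touches node fields, not
     * the dict itself, so the fingerprint check holds. */
    dictResetIterator(&di);

    return overall_health_changed;
}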

/* Replicate migrating and importing slot states to all replicas */
void clusterReplicateOpenSlots(void)
{
void clusterReplicateOpenSlots(void) {
    if (!server.cluster_enabled) return;

    int argc = 5;
2 changes: 2 additions & 0 deletions src/cluster_legacy.h
@@ -308,6 +308,8 @@ struct _clusterNode {
    clusterLink *link;          /* TCP/IP link established toward this node */
    clusterLink *inbound_link;  /* TCP/IP link accepted from this node */
    list *fail_reports;         /* List of nodes signaling this as failing */
    int is_node_healthy;        /* Boolean indicating the cached node health.
                                   Update with updateAndCountChangedNodeHealth(). */
};

struct clusterState {
9 changes: 8 additions & 1 deletion src/config.c
@@ -2586,6 +2586,12 @@ static int updateOOMScoreAdj(const char **err) {
    return 1;
}

int invalidateClusterSlotsResp(const char **err) {
    UNUSED(err);
    clearCachedClusterSlotsResponse();
    return 1;
}

int updateRequirePass(const char **err) {
    UNUSED(err);
    /* The old "requirepass" directive just translates to setting
@@ -2649,6 +2655,7 @@ int updateClusterFlags(const char **err) {
static int updateClusterAnnouncedPort(const char **err) {
    UNUSED(err);
    clusterUpdateMyselfAnnouncedPorts();
    clearCachedClusterSlotsResponse();
    return 1;
}

@@ -3162,7 +3169,7 @@ standardConfig static_configs[] = {
createEnumConfig("enable-protected-configs", NULL, IMMUTABLE_CONFIG, protected_action_enum, server.enable_protected_configs, PROTECTED_ACTION_ALLOWED_NO, NULL, NULL),
createEnumConfig("enable-debug-command", NULL, IMMUTABLE_CONFIG, protected_action_enum, server.enable_debug_cmd, PROTECTED_ACTION_ALLOWED_NO, NULL, NULL),
createEnumConfig("enable-module-command", NULL, IMMUTABLE_CONFIG, protected_action_enum, server.enable_module_cmd, PROTECTED_ACTION_ALLOWED_NO, NULL, NULL),
createEnumConfig("cluster-preferred-endpoint-type", NULL, MODIFIABLE_CONFIG, cluster_preferred_endpoint_type_enum, server.cluster_preferred_endpoint_type, CLUSTER_ENDPOINT_TYPE_IP, NULL, NULL),
createEnumConfig("cluster-preferred-endpoint-type", NULL, MODIFIABLE_CONFIG, cluster_preferred_endpoint_type_enum, server.cluster_preferred_endpoint_type, CLUSTER_ENDPOINT_TYPE_IP, NULL, invalidateClusterSlotsResp),
createEnumConfig("propagation-error-behavior", NULL, MODIFIABLE_CONFIG, propagation_error_behavior_enum, server.propagation_error_behavior, PROPAGATION_ERR_BEHAVIOR_IGNORE, NULL, NULL),
createEnumConfig("shutdown-on-sigint", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, shutdown_on_sig_enum, server.shutdown_on_sigint, 0, isValidShutdownOnSigFlags, NULL),
createEnumConfig("shutdown-on-sigterm", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, shutdown_on_sig_enum, server.shutdown_on_sigterm, 0, isValidShutdownOnSigFlags, NULL),
6 changes: 6 additions & 0 deletions src/connection.h
@@ -62,6 +62,12 @@ typedef enum {
#define CONN_TYPE_TLS "tls"
#define CONN_TYPE_MAX 8 /* 8 is enough to be extendable */

typedef enum connTypeForCaching {
    CACHE_CONN_TCP,
    CACHE_CONN_TLS,
    CACHE_CONN_TYPE_MAX
} connTypeForCaching;

typedef void (*ConnectionCallbackFunc)(struct connection *conn);

typedef struct ConnectionType {
38 changes: 38 additions & 0 deletions src/networking.c
@@ -319,6 +319,44 @@ int prepareClientToWrite(client *c) {
    return C_OK;
}

/* Returns everything in the client reply linked list in an SDS format.
 * This should only be used with a caching client. */
sds aggregateClientOutputBuffer(client *c) {
    sds cmd_response = sdsempty();
    listIter li;
    listNode *ln;
    clientReplyBlock *val_block;
    listRewind(c->reply,&li);

    /* Here, c->buf is not used, thus we confirm c->bufpos remains 0. */
    serverAssert(c->bufpos == 0);
    while ((ln = listNext(&li)) != NULL) {
        val_block = (clientReplyBlock *)listNodeValue(ln);
        cmd_response = sdscatlen(cmd_response, val_block->buf, val_block->used);
    }
    return cmd_response;
}

/* This function creates and returns a fake client for recording the command response
 * to initiate caching of any command response.
 *
 * It needs to be paired with the `deleteCachedResponseClient` function to stop caching. */
client *createCachedResponseClient(void) {
    struct client *recording_client = createClient(NULL);
    /* Allocating the `conn` allows us to prepare the caching client before adding
     * data to the client's output buffer with `prepareClientToWrite`. */
    recording_client->conn = zcalloc(sizeof(connection));
    return recording_client;
}

/* This function is used to stop caching of a command response after `createCachedResponseClient`
 * is called. It frees the recording client that was used to capture the response. */
void deleteCachedResponseClient(client *recording_client) {
    zfree(recording_client->conn);
    recording_client->conn = NULL;
    freeClient(recording_client);
}

/* -----------------------------------------------------------------------------
 * Low level functions to add more data to output buffers.
 * -------------------------------------------------------------------------- */
3 changes: 3 additions & 0 deletions src/server.h
@@ -2027,6 +2027,7 @@ struct valkeyServer {
    unsigned long long cluster_link_msg_queue_limit_bytes; /* Memory usage limit on individual link msg queue */
    int cluster_drop_packet_filter; /* Debug config that allows tactically
                                     * dropping packets of a specific type */
    sds cached_cluster_slot_info[CACHE_CONN_TYPE_MAX];
    /* Scripting */
    mstime_t busy_reply_threshold; /* Script / module timeout in milliseconds */
    int pre_command_oom_state; /* OOM before command (script?) was started */
@@ -2680,6 +2681,8 @@ void initThreadedIO(void);
client *lookupClientByID(uint64_t id);
int authRequired(client *c);
void putClientInPendingWriteQueue(client *c);
client *createCachedResponseClient(void);
void deleteCachedResponseClient(client *recording_client);

/* logreqres.c - logging of requests and responses */
void reqresReset(client *c, int free_buf);
2 changes: 1 addition & 1 deletion tests/cluster/tests/04-resharding.tcl
@@ -125,7 +125,7 @@ test "Cluster consistency during live resharding" {
    } else {
        fail "Resharding is not terminating after some time."
    }

    wait_for_cluster_propagation
}

test "Verify $numkeys keys for consistency with logical content" {
8 changes: 7 additions & 1 deletion tests/unit/cluster/announced-endpoints.tcl
@@ -9,12 +9,18 @@ start_cluster 2 2 {tags {external:skip cluster}} {
        set count [expr [llength $::servers] + 1]
        set used_port [find_available_port $baseport $count]

        # We execute CLUSTER SLOTS command to trigger the `debugServerAssertWithInfo` in `clusterCommandSlots` function, ensuring
        # that the cached response is invalidated upon updating any of cluster-announce-tls-port or cluster-announce-port.
        R 0 CLUSTER SLOTS
        R 1 CLUSTER SLOTS

        R 0 config set cluster-announce-tls-port $used_port
        R 0 config set cluster-announce-port $used_port

        assert_match "*:$used_port@*" [R 0 CLUSTER NODES]
        assert_match "*$used_port*" [R 0 CLUSTER SLOTS]
        wait_for_condition 50 100 {
            [string match "*:$used_port@*" [R 1 CLUSTER NODES]]
            ([string match "*:$used_port@*" [R 1 CLUSTER NODES]] && [string match "*$used_port*" [R 1 CLUSTER SLOTS]])
        } else {
            fail "Cluster announced port was not propagated via gossip"
        }