Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the TLS and REPS issues about CLUSTER SLOTS cache #581

Merged
merged 4 commits into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -1354,15 +1354,17 @@ void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, in

void clearCachedClusterSlotsResponse(void) {
for (connTypeForCaching conn_type = CACHE_CONN_TCP; conn_type < CACHE_CONN_TYPE_MAX; conn_type++) {
if (server.cached_cluster_slot_info[conn_type]) {
sdsfree(server.cached_cluster_slot_info[conn_type]);
server.cached_cluster_slot_info[conn_type] = NULL;
for (int resp = 0; resp <= 3; resp++) {
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved
if (server.cached_cluster_slot_info[conn_type][resp]) {
sdsfree(server.cached_cluster_slot_info[conn_type][resp]);
server.cached_cluster_slot_info[conn_type][resp] = NULL;
}
}
}
}

sds generateClusterSlotResponse(void) {
client *recording_client = createCachedResponseClient();
sds generateClusterSlotResponse(int resp) {
client *recording_client = createCachedResponseClient(resp);
clusterNode *n = NULL;
int num_masters = 0, start = -1;
void *slot_replylen = addReplyDeferredLen(recording_client);
Expand Down Expand Up @@ -1392,8 +1394,8 @@ sds generateClusterSlotResponse(void) {
return cluster_slot_response;
}

int verifyCachedClusterSlotsResponse(sds cached_response) {
sds generated_response = generateClusterSlotResponse();
int verifyCachedClusterSlotsResponse(sds cached_response, int resp) {
sds generated_response = generateClusterSlotResponse(resp);
int is_equal = !sdscmp(generated_response, cached_response);
/* Here, we use LL_WARNING so this gets printed when debug assertions are enabled and the system is about to crash. */
if (!is_equal)
Expand All @@ -1413,16 +1415,16 @@ void clusterCommandSlots(client *c) {
* 3) node ID
* ... continued until done
*/
connTypeForCaching conn_type = connIsTLS(c->conn);
connTypeForCaching conn_type = shouldReturnTlsInfo();
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved

if (detectAndUpdateCachedNodeHealth()) clearCachedClusterSlotsResponse();

sds cached_reply = server.cached_cluster_slot_info[conn_type];
sds cached_reply = server.cached_cluster_slot_info[conn_type][c->resp];
if (!cached_reply) {
cached_reply = generateClusterSlotResponse();
server.cached_cluster_slot_info[conn_type] = cached_reply;
cached_reply = generateClusterSlotResponse(c->resp);
server.cached_cluster_slot_info[conn_type][c->resp] = cached_reply;
} else {
debugServerAssertWithInfo(c, NULL, verifyCachedClusterSlotsResponse(cached_reply) == 1);
debugServerAssertWithInfo(c, NULL, verifyCachedClusterSlotsResponse(cached_reply, c->resp) == 1);
}

addReplyProto(c, cached_reply, sdslen(cached_reply));
Expand Down
2 changes: 1 addition & 1 deletion src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ long long clusterNodeReplOffset(clusterNode *node);
clusterNode *clusterLookupNode(const char *name, int length);
void clusterReplicateOpenSlots(void);
int detectAndUpdateCachedNodeHealth(void);
client *createCachedResponseClient(void);
client *createCachedResponseClient(int resp);
void deleteCachedResponseClient(client *recording_client);
void clearCachedClusterSlotsResponse(void);

Expand Down
4 changes: 3 additions & 1 deletion src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -1024,7 +1024,9 @@ void clusterInit(void) {
server.cluster->mf_end = 0;
server.cluster->mf_slave = NULL;
for (connTypeForCaching conn_type = CACHE_CONN_TCP; conn_type < CACHE_CONN_TYPE_MAX; conn_type++) {
server.cached_cluster_slot_info[conn_type] = NULL;
for (int resp = 0; resp <= 3; resp++) {
server.cached_cluster_slot_info[conn_type][resp] = NULL;
}
}
resetManualFailover();
clusterUpdateMyselfFlags();
Expand Down
3 changes: 2 additions & 1 deletion src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,9 @@ sds aggregateClientOutputBuffer(client *c) {
* to initiate caching of any command response.
*
* It needs be paired with `deleteCachedResponseClient` function to stop caching. */
client *createCachedResponseClient(void) {
client *createCachedResponseClient(int resp) {
struct client *recording_client = createClient(NULL);
recording_client->resp = resp;
/* Allocating the `conn` allows to prepare the caching client before adding
* data to the clients output buffer by `prepareClientToWrite`. */
recording_client->conn = zcalloc(sizeof(connection));
Expand Down
4 changes: 2 additions & 2 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2068,7 +2068,7 @@ struct valkeyServer {
unsigned long long cluster_link_msg_queue_limit_bytes; /* Memory usage limit on individual link msg queue */
int cluster_drop_packet_filter; /* Debug config that allows tactically
* dropping packets of a specific type */
sds cached_cluster_slot_info[CACHE_CONN_TYPE_MAX];
sds cached_cluster_slot_info[CACHE_CONN_TYPE_MAX][4]; /* Align to RESP3 */
soloestoy marked this conversation as resolved.
Show resolved Hide resolved
/* Scripting */
mstime_t busy_reply_threshold; /* Script / module timeout in milliseconds */
int pre_command_oom_state; /* OOM before command (script?) was started */
Expand Down Expand Up @@ -2725,7 +2725,7 @@ void initSharedQueryBuf(void);
client *lookupClientByID(uint64_t id);
int authRequired(client *c);
void putClientInPendingWriteQueue(client *c);
client *createCachedResponseClient(void);
client *createCachedResponseClient(int resp);
void deleteCachedResponseClient(client *recording_client);

/* logreqres.c - logging of requests and responses */
Expand Down
Loading