Skip to content

Commit

Permalink
Replicate slot migrating states via RDB aux fields
Browse files Browse the repository at this point in the history
Signed-off-by: Ping Xie <[email protected]>
  • Loading branch information
PingXie committed Jun 2, 2024
1 parent d16b4ec commit 8ac5fee
Show file tree
Hide file tree
Showing 11 changed files with 147 additions and 44 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,4 @@ redis.code-workspace
.cache
.cscope*
.swp
nodes.conf
1 change: 0 additions & 1 deletion src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ char *clusterNodeHostname(clusterNode *node);
const char *clusterNodePreferredEndpoint(clusterNode *n);
long long clusterNodeReplOffset(clusterNode *node);
clusterNode *clusterLookupNode(const char *name, int length);
void clusterReplicateOpenSlots(void);
int detectAndUpdateCachedNodeHealth(void);
client *createCachedResponseClient(void);
void deleteCachedResponseClient(client *recording_client);
Expand Down
81 changes: 65 additions & 16 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ int auxTlsPortPresent(clusterNode *n);
static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen);
void freeClusterLink(clusterLink *link);
int verifyClusterNodeId(const char *name, int length);
sds clusterEncodeOpenSlotsAuxField(int rdbflags);
int clusterDecodeOpenSlotsAuxField(int rdbflags, void *s);

int getNodeDefaultClientPort(clusterNode *n) {
return server.tls_cluster ? n->tls_port : n->tcp_port;
Expand Down Expand Up @@ -1017,6 +1019,10 @@ void clusterInit(void) {
exit(1);
}

/* Register our own rdb aux fields */
serverAssert(rdbRegisterAuxField("cluster-slot-states", clusterEncodeOpenSlotsAuxField,
clusterDecodeOpenSlotsAuxField) == C_OK);

/* Set myself->port/cport/pport to my listening ports, we'll just need to
* discover the IP address via MEET messages. */
deriveAnnouncedPorts(&myself->tcp_port, &myself->tls_port, &myself->cport);
Expand Down Expand Up @@ -6521,38 +6527,81 @@ int detectAndUpdateCachedNodeHealth(void) {
}

/* Replicate migrating and importing slot states to all replicas */
void clusterReplicateOpenSlots(void) {
if (!server.cluster_enabled) return;
sds clusterEncodeOpenSlotsAuxField(int rdbflags) {
if (!server.cluster_enabled) return NULL;

int argc = 5;
robj **argv = zmalloc(sizeof(robj *) * argc);
/* Open slots should not be persisted (nor loaded) */
if ((rdbflags & RDBFLAGS_REPLICATION) == 0) return NULL;

argv[0] = shared.cluster;
argv[1] = shared.setslot;
sds s = NULL;

for (int i = 0; i < 2; i++) {
clusterNode **nodes_ptr = NULL;
clusterNode **nodes_ptr;
if (i == 0) {
nodes_ptr = server.cluster->importing_slots_from;
argv[3] = shared.importing;
} else {
nodes_ptr = server.cluster->migrating_slots_to;
argv[3] = shared.migrating;
}

for (int j = 0; j < CLUSTER_SLOTS; j++) {
if (nodes_ptr[j] == NULL) continue;
if (s == NULL) s = sdsempty();
s = sdscatfmt(s, "%i%s", j, (i == 0) ? "<" : ">");
s = sdscatlen(s, nodes_ptr[j]->name, CLUSTER_NAMELEN);
s = sdscatlen(s, ",", 1);
}
}

return s;
}

argv[2] = createStringObjectFromLongLongForValue(j);
sds name = sdsnewlen(nodes_ptr[j]->name, sizeof(nodes_ptr[j]->name));
argv[4] = createObject(OBJ_STRING, name);
int clusterDecodeOpenSlotsAuxField(int rdbflags, void *p) {
if (!server.cluster_enabled || p == NULL) return C_OK;

replicationFeedSlaves(0, argv, argc);
/* Open slots should not be loaded (nor persisted) */
if ((rdbflags & RDBFLAGS_REPLICATION) == 0) return C_OK;

decrRefCount(argv[2]);
decrRefCount(argv[4]);
char *s = p;
while (*s) {
/* Extract slot number */
int slot = atoi(s);
if (slot < 0 || slot >= CLUSTER_SLOTS) return C_ERR;

while (*s && *s != '<' && *s != '>') s++;
if (*s != '<' && *s != '>') return C_ERR;

/* Determine if it's an importing or migrating slot */
int is_importing = (*s == '<');
s++;

/* Extract the node name */
char node_name[CLUSTER_NAMELEN];
int k = 0;
while (*s && *s != ',' && k < CLUSTER_NAMELEN) {
node_name[k++] = *s++;
}

/* Ensure the node name is of the correct length */
if (k != CLUSTER_NAMELEN || *s != ',') return C_ERR;

/* Move to the next slot */
s++;

/* Find the corresponding node */
clusterNode *node = clusterLookupNode(node_name, CLUSTER_NAMELEN);
if (!node) {
/* Create a new node if not found */
node = createClusterNode(node_name, 0);
clusterAddNode(node);
}

/* Set the slot state */
if (is_importing) {
server.cluster->importing_slots_from[slot] = node;
} else {
server.cluster->migrating_slots_to[slot] = node;
}
}

zfree(argv);
return C_OK;
}
2 changes: 0 additions & 2 deletions src/latency.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ uint64_t dictStringHash(const void *key) {
return dictGenHashFunction(key, strlen(key));
}

void dictVanillaFree(dict *d, void *val);

dictType latencyTimeSeriesDictType = {
dictStringHash, /* hash function */
NULL, /* key dup */
Expand Down
64 changes: 61 additions & 3 deletions src/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "functions.h"
#include "intset.h" /* Compact integer set structure */
#include "bio.h"
#include "zmalloc.h"

#include <math.h>
#include <fcntl.h>
Expand Down Expand Up @@ -107,6 +108,31 @@ void rdbReportError(int corruption_error, int linenum, char *reason, ...) {
exit(1);
}

typedef struct {
rdbAuxFieldEncoder encoder;
rdbAuxFieldDecoder decoder;
} rdbAuxFieldCodec;

dictType rdbAuxFieldDictType = {
dictSdsCaseHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
dictSdsKeyCaseCompare, /* key compare */
dictSdsDestructor, /* key destructor */
dictVanillaFree, /* val destructor */
NULL /* allow to expand */
};

dict *rdbAuxFields = NULL;

int rdbRegisterAuxField(char *auxfield, rdbAuxFieldEncoder encoder, rdbAuxFieldDecoder decoder) {
if (rdbAuxFields == NULL) rdbAuxFields = dictCreate(&rdbAuxFieldDictType);
rdbAuxFieldCodec *codec = zmalloc(sizeof(rdbAuxFieldCodec));
codec->encoder = encoder;
codec->decoder = decoder;
return dictAdd(rdbAuxFields, sdsnew(auxfield), (void *)codec) == DICT_OK ? C_OK : C_ERR;
}

ssize_t rdbWriteRaw(rio *rdb, void *p, size_t len) {
if (rdb && rioWrite(rdb, p, len) == 0) return -1;
return len;
Expand Down Expand Up @@ -1186,6 +1212,25 @@ int rdbSaveInfoAuxFields(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
if (rdbSaveAuxFieldStrInt(rdb, "repl-offset", server.master_repl_offset) == -1) return -1;
}
if (rdbSaveAuxFieldStrInt(rdb, "aof-base", aof_base) == -1) return -1;

/* Handle additional server aux fileds */
if (rdbAuxFields != NULL) {
dictIterator *di = dictGetIterator(rdbAuxFields);
dictEntry *de;
while ((de = dictNext(di)) != NULL) {
rdbAuxFieldCodec *codec = (rdbAuxFieldCodec *)dictGetVal(de);
sds s = codec->encoder(rdbflags);
if (s == NULL) continue;
if (rdbSaveAuxFieldStrStr(rdb, dictGetKey(de), s) == -1) {
sdsfree(s);
dictReleaseIterator(di);
return -1;
}
sdsfree(s);
}
dictReleaseIterator(di);
}

return 1;
}

Expand Down Expand Up @@ -3100,9 +3145,22 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
} else if (!strcasecmp(auxkey->ptr, "redis-bits")) {
/* Just ignored. */
} else {
/* We ignore fields we don't understand, as by AUX field
* contract. */
serverLog(LL_DEBUG, "Unrecognized RDB AUX field: '%s'", (char *)auxkey->ptr);
/* Check if this is a dynamic aux field */
int handled = 0;
if (rdbAuxFields != NULL) {
dictEntry *de = dictFind(rdbAuxFields, auxkey->ptr);
if (de != NULL) {
handled = 1;
rdbAuxFieldCodec *codec = (rdbAuxFieldCodec *)dictGetVal(de);
if (codec->decoder(rdbflags, auxval->ptr) < 0) goto eoferr;
}
}

if (!handled) {
/* We ignore fields we don't understand, as by AUX field
* contract. */
serverLog(LL_DEBUG, "Unrecognized RDB AUX field: '%s'", (char *)auxkey->ptr);
}
}

decrRefCount(auxkey);
Expand Down
3 changes: 0 additions & 3 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -1270,9 +1270,6 @@ int replicaPutOnline(client *slave) {
/* Fire the replica change modules event. */
moduleFireServerEvent(VALKEYMODULE_EVENT_REPLICA_CHANGE, VALKEYMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE, NULL);
serverLog(LL_NOTICE, "Synchronization with replica %s succeeded", replicationGetSlaveName(slave));

/* Replicate slot being migrated/imported to the new replica */
clusterReplicateOpenSlots();
return 1;
}

Expand Down
4 changes: 0 additions & 4 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -1930,10 +1930,6 @@ void createSharedObjects(void) {
shared.special_asterick = createStringObject("*", 1);
shared.special_equals = createStringObject("=", 1);
shared.redacted = makeObjectShared(createStringObject("(redacted)", 10));
shared.cluster = createStringObject("CLUSTER", 7);
shared.setslot = createStringObject("SETSLOT", 7);
shared.importing = createStringObject("IMPORTING", 9);
shared.migrating = createStringObject("MIGRATING", 9);

for (j = 0; j < OBJ_SHARED_INTEGERS; j++) {
shared.integers[j] = makeObjectShared(createObject(OBJ_STRING, (void *)(long)j));
Expand Down
18 changes: 12 additions & 6 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -701,10 +701,11 @@ typedef enum {
#define serverAssert(_e) (likely(_e) ? (void)0 : (_serverAssert(#_e, __FILE__, __LINE__), valkey_unreachable()))
#define serverPanic(...) _serverPanic(__FILE__, __LINE__, __VA_ARGS__), valkey_unreachable()

/* The following macro provides a conditional assertion that is only executed
/* The following macros provide a conditional assertion that is only executed
* when the server config 'enable-debug-assert' is true. This is useful for adding
* assertions that are too computationally expensive or risky to run in normal
* operation, but are valuable for debugging or testing. */
#define debugServerAssert(...) (server.enable_debug_assert ? serverAssert(__VA_ARGS__) : (void)0)
#define debugServerAssertWithInfo(...) (server.enable_debug_assert ? serverAssertWithInfo(__VA_ARGS__) : (void)0)

/* latency histogram per command init settings */
Expand Down Expand Up @@ -1031,6 +1032,9 @@ typedef struct rdbLoadingCtx {
functionsLibCtx *functions_lib_ctx;
} rdbLoadingCtx;

typedef sds (*rdbAuxFieldEncoder)(int flags);
typedef int (*rdbAuxFieldDecoder)(int flags, void *p);

/* Client MULTI/EXEC state */
typedef struct multiCmd {
robj **argv;
Expand Down Expand Up @@ -1367,11 +1371,11 @@ struct sharedObjectsStruct {
*xgroup, *xclaim, *script, *replconf, *eval, *persist, *set, *pexpireat, *pexpire, *time, *pxat, *absttl,
*retrycount, *force, *justid, *entriesread, *lastid, *ping, *setid, *keepttl, *load, *createconsumer, *getack,
*special_asterick, *special_equals, *default_username, *redacted, *ssubscribebulk, *sunsubscribebulk,
*smessagebulk, *cluster, *setslot, *importing, *migrating, *select[PROTO_SHARED_SELECT_CMDS],
*integers[OBJ_SHARED_INTEGERS], *mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */
*bulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "$<value>\r\n" */
*maphdr[OBJ_SHARED_BULKHDR_LEN], /* "%<value>\r\n" */
*sethdr[OBJ_SHARED_BULKHDR_LEN]; /* "~<value>\r\n" */
*smessagebulk, *select[PROTO_SHARED_SELECT_CMDS], *integers[OBJ_SHARED_INTEGERS],
*mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */
*bulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "$<value>\r\n" */
*maphdr[OBJ_SHARED_BULKHDR_LEN], /* "%<value>\r\n" */
*sethdr[OBJ_SHARED_BULKHDR_LEN]; /* "~<value>\r\n" */
sds minstring, maxstring;
};

Expand Down Expand Up @@ -2602,6 +2606,7 @@ int serverSetProcTitle(char *title);
int validateProcTitleTemplate(const char *template);
int serverCommunicateSystemd(const char *sd_notify_msg);
void serverSetCpuAffinity(const char *cpulist);
void dictVanillaFree(dict *d, void *val);

/* afterErrorReply flags */
#define ERR_REPLY_FLAG_NO_STATS_UPDATE \
Expand Down Expand Up @@ -2903,6 +2908,7 @@ void rebaseReplicationBuffer(long long base_repl_offset);
void showLatestBacklog(void);
void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
void rdbPipeWriteHandlerConnRemoved(struct connection *conn);
int rdbRegisterAuxField(char *auxfield, rdbAuxFieldEncoder encoder, rdbAuxFieldDecoder decoder);
void clearFailoverState(void);
void updateFailoverStatus(void);
void abortFailover(const char *err);
Expand Down
4 changes: 4 additions & 0 deletions tests/support/cluster_util.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -371,3 +371,7 @@ proc check_cluster_node_mark {flag ref_node_index instance_id_to_check} {
}
fail "Unable to find instance id in cluster nodes. ID: $instance_id_to_check"
}

proc get_slot_field {slot_output shard_id node_id attrib_id} {
return [lindex [lindex [lindex $slot_output $shard_id] $node_id] $attrib_id]
}
4 changes: 0 additions & 4 deletions tests/unit/cluster/hostnames.tcl
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
proc get_slot_field {slot_output shard_id node_id attrib_id} {
return [lindex [lindex [lindex $slot_output $shard_id] $node_id] $attrib_id]
}

# Start a cluster with 3 masters and 4 replicas.
# These tests rely on specific node ordering, so make sure no node fails over.
start_cluster 3 4 {tags {external:skip cluster} overrides {cluster-replica-no-failover yes}} {
Expand Down
9 changes: 4 additions & 5 deletions tests/unit/cluster/slot-migration.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ proc get_cluster_role {srv_idx} {
}

proc wait_for_role {srv_idx role} {
set node_timeout [lindex [R 0 CONFIG GET cluster-node-timeout] 1]
# wait for a gossip cycle for states to be propagated throughout the cluster
after $node_timeout
wait_for_condition 100 100 {
[lindex [split [R $srv_idx ROLE] " "] 0] eq $role
} else {
Expand All @@ -35,6 +32,8 @@ proc wait_for_slot_state {srv_idx pattern} {
wait_for_condition 100 100 {
[get_open_slots $srv_idx] eq $pattern
} else {
puts [R $srv_idx config get port]
gets stdin
fail "incorrect slot state on R $srv_idx: expected $pattern; got [get_open_slots $srv_idx]"
}
}
Expand Down Expand Up @@ -198,7 +197,7 @@ start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-allow-replica
assert_equal {OK} [R 3 CLUSTER REPLICATE $R0_id]
wait_for_role 3 slave
# Validate that R3 now sees slot 609 open
assert_equal [get_open_slots 3] "\[609->-$R1_id\]"
wait_for_slot_state 3 "\[609->-$R1_id\]"
}

test "New replica inherits importing slot" {
Expand All @@ -212,7 +211,7 @@ start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-allow-replica
assert_equal {OK} [R 4 CLUSTER REPLICATE $R1_id]
wait_for_role 4 slave
# Validate that R4 now sees slot 609 open
assert_equal [get_open_slots 4] "\[609-<-$R0_id\]"
wait_for_slot_state 4 "\[609-<-$R0_id\]"
}
}

Expand Down

0 comments on commit 8ac5fee

Please sign in to comment.