From 8ac5fee8d3629cbf00ed725f8a4688dcbeed9b28 Mon Sep 17 00:00:00 2001 From: Ping Xie Date: Sat, 1 Jun 2024 22:04:26 -0700 Subject: [PATCH] Replicate slot migrating states via RDB aux fields Signed-off-by: Ping Xie --- .gitignore | 1 + src/cluster.h | 1 - src/cluster_legacy.c | 81 +++++++++++++++++++++------ src/latency.c | 2 - src/rdb.c | 64 ++++++++++++++++++++- src/replication.c | 3 - src/server.c | 4 -- src/server.h | 18 ++++-- tests/support/cluster_util.tcl | 4 ++ tests/unit/cluster/hostnames.tcl | 4 -- tests/unit/cluster/slot-migration.tcl | 9 ++- 11 files changed, 147 insertions(+), 44 deletions(-) diff --git a/.gitignore b/.gitignore index e745f76a04..b22948a3c6 100644 --- a/.gitignore +++ b/.gitignore @@ -45,3 +45,4 @@ redis.code-workspace .cache .cscope* .swp +nodes.conf diff --git a/src/cluster.h b/src/cluster.h index de58486440..fec63c2da9 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -103,7 +103,6 @@ char *clusterNodeHostname(clusterNode *node); const char *clusterNodePreferredEndpoint(clusterNode *n); long long clusterNodeReplOffset(clusterNode *node); clusterNode *clusterLookupNode(const char *name, int length); -void clusterReplicateOpenSlots(void); int detectAndUpdateCachedNodeHealth(void); client *createCachedResponseClient(void); void deleteCachedResponseClient(client *recording_client); diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 0de6351e90..2788ff3917 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -113,6 +113,8 @@ int auxTlsPortPresent(clusterNode *n); static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen); void freeClusterLink(clusterLink *link); int verifyClusterNodeId(const char *name, int length); +sds clusterEncodeOpenSlotsAuxField(int rdbflags); +int clusterDecodeOpenSlotsAuxField(int rdbflags, void *s); int getNodeDefaultClientPort(clusterNode *n) { return server.tls_cluster ? n->tls_port : n->tcp_port; @@ -1017,6 +1019,10 @@ void clusterInit(void) { exit(1); } + /* Register our own rdb aux fields */ + serverAssert(rdbRegisterAuxField("cluster-slot-states", clusterEncodeOpenSlotsAuxField, + clusterDecodeOpenSlotsAuxField) == C_OK); + /* Set myself->port/cport/pport to my listening ports, we'll just need to * discover the IP address via MEET messages. */ deriveAnnouncedPorts(&myself->tcp_port, &myself->tls_port, &myself->cport); @@ -6521,38 +6527,81 @@ int detectAndUpdateCachedNodeHealth(void) { } /* Replicate migrating and importing slot states to all replicas */ -void clusterReplicateOpenSlots(void) { - if (!server.cluster_enabled) return; +sds clusterEncodeOpenSlotsAuxField(int rdbflags) { + if (!server.cluster_enabled) return NULL; - int argc = 5; - robj **argv = zmalloc(sizeof(robj *) * argc); + /* Open slots should not be persisted (nor loaded) */ + if ((rdbflags & RDBFLAGS_REPLICATION) == 0) return NULL; - argv[0] = shared.cluster; - argv[1] = shared.setslot; + sds s = NULL; for (int i = 0; i < 2; i++) { - clusterNode **nodes_ptr = NULL; + clusterNode **nodes_ptr; if (i == 0) { nodes_ptr = server.cluster->importing_slots_from; - argv[3] = shared.importing; } else { nodes_ptr = server.cluster->migrating_slots_to; - argv[3] = shared.migrating; } for (int j = 0; j < CLUSTER_SLOTS; j++) { if (nodes_ptr[j] == NULL) continue; + if (s == NULL) s = sdsempty(); + s = sdscatfmt(s, "%i%s", j, (i == 0) ? "<" : ">"); + s = sdscatlen(s, nodes_ptr[j]->name, CLUSTER_NAMELEN); + s = sdscatlen(s, ",", 1); + } + } + + return s; +} - argv[2] = createStringObjectFromLongLongForValue(j); - sds name = sdsnewlen(nodes_ptr[j]->name, sizeof(nodes_ptr[j]->name)); - argv[4] = createObject(OBJ_STRING, name); +int clusterDecodeOpenSlotsAuxField(int rdbflags, void *p) { + if (!server.cluster_enabled || p == NULL) return C_OK; - replicationFeedSlaves(0, argv, argc); + /* Open slots should not be loaded (nor persisted) */ + if ((rdbflags & RDBFLAGS_REPLICATION) == 0) return C_OK; - decrRefCount(argv[2]); - decrRefCount(argv[4]); + char *s = p; + while (*s) { + /* Extract slot number */ + int slot = atoi(s); + if (slot < 0 || slot >= CLUSTER_SLOTS) return C_ERR; + + while (*s && *s != '<' && *s != '>') s++; + if (*s != '<' && *s != '>') return C_ERR; + + /* Determine if it's an importing or migrating slot */ + int is_importing = (*s == '<'); + s++; + + /* Extract the node name */ + char node_name[CLUSTER_NAMELEN]; + int k = 0; + while (*s && *s != ',' && k < CLUSTER_NAMELEN) { + node_name[k++] = *s++; + } + + /* Ensure the node name is of the correct length */ + if (k != CLUSTER_NAMELEN || *s != ',') return C_ERR; + + /* Move to the next slot */ + s++; + + /* Find the corresponding node */ + clusterNode *node = clusterLookupNode(node_name, CLUSTER_NAMELEN); + if (!node) { + /* Create a new node if not found */ + node = createClusterNode(node_name, 0); + clusterAddNode(node); + } + + /* Set the slot state */ + if (is_importing) { + server.cluster->importing_slots_from[slot] = node; + } else { + server.cluster->migrating_slots_to[slot] = node; } } - zfree(argv); + return C_OK; } diff --git a/src/latency.c b/src/latency.c index 78f3cc3edd..c31c88c817 100644 --- a/src/latency.c +++ b/src/latency.c @@ -46,8 +46,6 @@ uint64_t dictStringHash(const void *key) { return dictGenHashFunction(key, strlen(key)); } -void dictVanillaFree(dict *d, void *val); - dictType latencyTimeSeriesDictType = { dictStringHash, /* hash function */ NULL, /* key dup */ diff --git a/src/rdb.c b/src/rdb.c index fe297cb7a9..ff6040af7e 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -36,6 +36,7 @@ #include "functions.h" #include "intset.h" /* Compact integer set structure */ #include "bio.h" +#include "zmalloc.h" #include #include @@ -107,6 +108,31 @@ void rdbReportError(int corruption_error, int linenum, char *reason, ...) { exit(1); } +typedef struct { + rdbAuxFieldEncoder encoder; + rdbAuxFieldDecoder decoder; +} rdbAuxFieldCodec; + +dictType rdbAuxFieldDictType = { + dictSdsCaseHash, /* hash function */ + NULL, /* key dup */ + NULL, /* val dup */ + dictSdsKeyCaseCompare, /* key compare */ + dictSdsDestructor, /* key destructor */ + dictVanillaFree, /* val destructor */ + NULL /* allow to expand */ +}; + +dict *rdbAuxFields = NULL; + +int rdbRegisterAuxField(char *auxfield, rdbAuxFieldEncoder encoder, rdbAuxFieldDecoder decoder) { + if (rdbAuxFields == NULL) rdbAuxFields = dictCreate(&rdbAuxFieldDictType); + rdbAuxFieldCodec *codec = zmalloc(sizeof(rdbAuxFieldCodec)); + codec->encoder = encoder; + codec->decoder = decoder; + return dictAdd(rdbAuxFields, sdsnew(auxfield), (void *)codec) == DICT_OK ? C_OK : C_ERR; +} + ssize_t rdbWriteRaw(rio *rdb, void *p, size_t len) { if (rdb && rioWrite(rdb, p, len) == 0) return -1; return len; @@ -1186,6 +1212,25 @@ int rdbSaveInfoAuxFields(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { if (rdbSaveAuxFieldStrInt(rdb, "repl-offset", server.master_repl_offset) == -1) return -1; } if (rdbSaveAuxFieldStrInt(rdb, "aof-base", aof_base) == -1) return -1; + + /* Handle additional server aux fileds */ + if (rdbAuxFields != NULL) { + dictIterator *di = dictGetIterator(rdbAuxFields); + dictEntry *de; + while ((de = dictNext(di)) != NULL) { + rdbAuxFieldCodec *codec = (rdbAuxFieldCodec *)dictGetVal(de); + sds s = codec->encoder(rdbflags); + if (s == NULL) continue; + if (rdbSaveAuxFieldStrStr(rdb, dictGetKey(de), s) == -1) { + sdsfree(s); + dictReleaseIterator(di); + return -1; + } + sdsfree(s); + } + dictReleaseIterator(di); + } + return 1; } @@ -3100,9 +3145,22 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin } else if (!strcasecmp(auxkey->ptr, "redis-bits")) { /* Just ignored. */ } else { - /* We ignore fields we don't understand, as by AUX field - * contract. */ - serverLog(LL_DEBUG, "Unrecognized RDB AUX field: '%s'", (char *)auxkey->ptr); + /* Check if this is a dynamic aux field */ + int handled = 0; + if (rdbAuxFields != NULL) { + dictEntry *de = dictFind(rdbAuxFields, auxkey->ptr); + if (de != NULL) { + handled = 1; + rdbAuxFieldCodec *codec = (rdbAuxFieldCodec *)dictGetVal(de); + if (codec->decoder(rdbflags, auxval->ptr) < 0) goto eoferr; + } + } + + if (!handled) { + /* We ignore fields we don't understand, as by AUX field + * contract. */ + serverLog(LL_DEBUG, "Unrecognized RDB AUX field: '%s'", (char *)auxkey->ptr); + } } decrRefCount(auxkey); diff --git a/src/replication.c b/src/replication.c index 375b637f61..93883d3227 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1270,9 +1270,6 @@ int replicaPutOnline(client *slave) { /* Fire the replica change modules event. */ moduleFireServerEvent(VALKEYMODULE_EVENT_REPLICA_CHANGE, VALKEYMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE, NULL); serverLog(LL_NOTICE, "Synchronization with replica %s succeeded", replicationGetSlaveName(slave)); - - /* Replicate slot being migrated/imported to the new replica */ - clusterReplicateOpenSlots(); return 1; } diff --git a/src/server.c b/src/server.c index bf4967c106..00be622bb6 100644 --- a/src/server.c +++ b/src/server.c @@ -1930,10 +1930,6 @@ void createSharedObjects(void) { shared.special_asterick = createStringObject("*", 1); shared.special_equals = createStringObject("=", 1); shared.redacted = makeObjectShared(createStringObject("(redacted)", 10)); - shared.cluster = createStringObject("CLUSTER", 7); - shared.setslot = createStringObject("SETSLOT", 7); - shared.importing = createStringObject("IMPORTING", 9); - shared.migrating = createStringObject("MIGRATING", 9); for (j = 0; j < OBJ_SHARED_INTEGERS; j++) { shared.integers[j] = makeObjectShared(createObject(OBJ_STRING, (void *)(long)j)); diff --git a/src/server.h b/src/server.h index 8f273db1e8..29d77d30f4 100644 --- a/src/server.h +++ b/src/server.h @@ -701,10 +701,11 @@ typedef enum { #define serverAssert(_e) (likely(_e) ? (void)0 : (_serverAssert(#_e, __FILE__, __LINE__), valkey_unreachable())) #define serverPanic(...) _serverPanic(__FILE__, __LINE__, __VA_ARGS__), valkey_unreachable() -/* The following macro provides a conditional assertion that is only executed +/* The following macros provide a conditional assertion that is only executed * when the server config 'enable-debug-assert' is true. This is useful for adding * assertions that are too computationally expensive or risky to run in normal * operation, but are valuable for debugging or testing. */ +#define debugServerAssert(...) (server.enable_debug_assert ? serverAssert(__VA_ARGS__) : (void)0) #define debugServerAssertWithInfo(...) (server.enable_debug_assert ? serverAssertWithInfo(__VA_ARGS__) : (void)0) /* latency histogram per command init settings */ @@ -1031,6 +1032,9 @@ typedef struct rdbLoadingCtx { functionsLibCtx *functions_lib_ctx; } rdbLoadingCtx; +typedef sds (*rdbAuxFieldEncoder)(int flags); +typedef int (*rdbAuxFieldDecoder)(int flags, void *p); + /* Client MULTI/EXEC state */ typedef struct multiCmd { robj **argv; @@ -1367,11 +1371,11 @@ struct sharedObjectsStruct { *xgroup, *xclaim, *script, *replconf, *eval, *persist, *set, *pexpireat, *pexpire, *time, *pxat, *absttl, *retrycount, *force, *justid, *entriesread, *lastid, *ping, *setid, *keepttl, *load, *createconsumer, *getack, *special_asterick, *special_equals, *default_username, *redacted, *ssubscribebulk, *sunsubscribebulk, - *smessagebulk, *cluster, *setslot, *importing, *migrating, *select[PROTO_SHARED_SELECT_CMDS], - *integers[OBJ_SHARED_INTEGERS], *mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*\r\n" */ - *bulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "$\r\n" */ - *maphdr[OBJ_SHARED_BULKHDR_LEN], /* "%\r\n" */ - *sethdr[OBJ_SHARED_BULKHDR_LEN]; /* "~\r\n" */ + *smessagebulk, *select[PROTO_SHARED_SELECT_CMDS], *integers[OBJ_SHARED_INTEGERS], + *mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*\r\n" */ + *bulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "$\r\n" */ + *maphdr[OBJ_SHARED_BULKHDR_LEN], /* "%\r\n" */ + *sethdr[OBJ_SHARED_BULKHDR_LEN]; /* "~\r\n" */ sds minstring, maxstring; }; @@ -2602,6 +2606,7 @@ int serverSetProcTitle(char *title); int validateProcTitleTemplate(const char *template); int serverCommunicateSystemd(const char *sd_notify_msg); void serverSetCpuAffinity(const char *cpulist); +void dictVanillaFree(dict *d, void *val); /* afterErrorReply flags */ #define ERR_REPLY_FLAG_NO_STATS_UPDATE \ @@ -2903,6 +2908,7 @@ void rebaseReplicationBuffer(long long base_repl_offset); void showLatestBacklog(void); void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask); void rdbPipeWriteHandlerConnRemoved(struct connection *conn); +int rdbRegisterAuxField(char *auxfield, rdbAuxFieldEncoder encoder, rdbAuxFieldDecoder decoder); void clearFailoverState(void); void updateFailoverStatus(void); void abortFailover(const char *err); diff --git a/tests/support/cluster_util.tcl b/tests/support/cluster_util.tcl index ebca69d9ca..5708dfac7e 100644 --- a/tests/support/cluster_util.tcl +++ b/tests/support/cluster_util.tcl @@ -371,3 +371,7 @@ proc check_cluster_node_mark {flag ref_node_index instance_id_to_check} { } fail "Unable to find instance id in cluster nodes. ID: $instance_id_to_check" } + +proc get_slot_field {slot_output shard_id node_id attrib_id} { + return [lindex [lindex [lindex $slot_output $shard_id] $node_id] $attrib_id] +} diff --git a/tests/unit/cluster/hostnames.tcl b/tests/unit/cluster/hostnames.tcl index 98a6385c6f..04b32e380b 100644 --- a/tests/unit/cluster/hostnames.tcl +++ b/tests/unit/cluster/hostnames.tcl @@ -1,7 +1,3 @@ -proc get_slot_field {slot_output shard_id node_id attrib_id} { - return [lindex [lindex [lindex $slot_output $shard_id] $node_id] $attrib_id] -} - # Start a cluster with 3 masters and 4 replicas. # These tests rely on specific node ordering, so make sure no node fails over. start_cluster 3 4 {tags {external:skip cluster} overrides {cluster-replica-no-failover yes}} { diff --git a/tests/unit/cluster/slot-migration.tcl b/tests/unit/cluster/slot-migration.tcl index d4f0d43b3b..82818b31ab 100644 --- a/tests/unit/cluster/slot-migration.tcl +++ b/tests/unit/cluster/slot-migration.tcl @@ -15,9 +15,6 @@ proc get_cluster_role {srv_idx} { } proc wait_for_role {srv_idx role} { - set node_timeout [lindex [R 0 CONFIG GET cluster-node-timeout] 1] - # wait for a gossip cycle for states to be propagated throughout the cluster - after $node_timeout wait_for_condition 100 100 { [lindex [split [R $srv_idx ROLE] " "] 0] eq $role } else { @@ -35,6 +32,8 @@ proc wait_for_slot_state {srv_idx pattern} { wait_for_condition 100 100 { [get_open_slots $srv_idx] eq $pattern } else { + puts [R $srv_idx config get port] + gets stdin fail "incorrect slot state on R $srv_idx: expected $pattern; got [get_open_slots $srv_idx]" } } @@ -198,7 +197,7 @@ start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-allow-replica assert_equal {OK} [R 3 CLUSTER REPLICATE $R0_id] wait_for_role 3 slave # Validate that R3 now sees slot 609 open - assert_equal [get_open_slots 3] "\[609->-$R1_id\]" + wait_for_slot_state 3 "\[609->-$R1_id\]" } test "New replica inherits importing slot" { @@ -212,7 +211,7 @@ start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-allow-replica assert_equal {OK} [R 4 CLUSTER REPLICATE $R1_id] wait_for_role 4 slave # Validate that R4 now sees slot 609 open - assert_equal [get_open_slots 4] "\[609-<-$R0_id\]" + wait_for_slot_state 4 "\[609-<-$R0_id\]" } }