diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index b35b0b3b8e..a7d24761bc 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -69,7 +69,7 @@ int clusterAddSlot(clusterNode *n, int slot); int clusterDelSlot(int slot); int clusterDelNodeSlots(clusterNode *node); int clusterNodeSetSlotBit(clusterNode *n, int slot); -void clusterSetPrimary(clusterNode *n, int closeSlots); +void clusterSetPrimary(clusterNode *n, int closeSlots, int full_sync); void clusterHandleReplicaFailover(void); void clusterHandleReplicaMigration(int max_replicas); int bitmapTestBit(unsigned char *bitmap, int pos); @@ -2625,8 +2625,11 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc "as a replica of node %.40s (%s) in shard %.40s", sender->name, sender->human_nodename, sender->shard_id); /* Don't clear the migrating/importing states if this is a replica that - * just gets promoted to the new primary in the shard. */ - clusterSetPrimary(sender, !areInSameShard(sender, myself)); + * just gets promoted to the new primary in the shard. + * + * Make a full sync hint if sender and myself not in the same shard. */ + int are_in_same_shard = areInSameShard(sender, myself); + clusterSetPrimary(sender, !are_in_same_shard, !are_in_same_shard); clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG); } else if ((sender_slots >= migrated_our_slots) && !areInSameShard(sender, myself)) { /* When all our slots are lost to the sender and the sender belongs to @@ -3388,7 +3391,8 @@ int clusterProcessPacket(clusterLink *link) { * over the slot, there is nothing else to trigger replica migration. */ serverLog(LL_NOTICE, "I'm a sub-replica! Reconfiguring myself as a replica of %.40s from %.40s", myself->replicaof->replicaof->name, myself->replicaof->name); - clusterSetPrimary(myself->replicaof->replicaof, 1); + int are_in_same_shard = areInSameShard(myself->replicaof->replicaof, myself); + clusterSetPrimary(myself->replicaof->replicaof, 1, !are_in_same_shard); clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG); } @@ -3790,10 +3794,7 @@ static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen) { hdr->configEpoch = htonu64(primary->configEpoch); /* Set the replication offset. */ - if (nodeIsReplica(myself)) - offset = replicationGetReplicaOffset(); - else - offset = server.primary_repl_offset; + offset = getNodeReplicationOffset(myself); hdr->offset = htonu64(offset); /* Set the message flags. */ @@ -4327,7 +4328,7 @@ int clusterGetReplicaRank(void) { primary = myself->replicaof; if (primary == NULL) return 0; /* Never called by replicas without primary. */ - myoffset = replicationGetReplicaOffset(); + myoffset = getNodeReplicationOffset(myself); for (j = 0; j < primary->num_replicas; j++) { if (primary->replicas[j] == myself) continue; if (nodeCantFailover(primary->replicas[j])) continue; @@ -4381,6 +4382,7 @@ void clusterLogCantFailover(int reason) { case CLUSTER_CANT_FAILOVER_WAITING_DELAY: msg = "Waiting the delay before I can start a new failover."; break; case CLUSTER_CANT_FAILOVER_EXPIRED: msg = "Failover attempt expired."; break; case CLUSTER_CANT_FAILOVER_WAITING_VOTES: msg = "Waiting for votes, but majority still not reached."; break; + case CLUSTER_CANT_FAILOVER_FULL_SYNC: msg = "Replica has not completed sync and can not perform failover."; break; default: msg = "Unknown reason code."; break; } lastlog_time = time(NULL); @@ -4473,6 +4475,11 @@ void clusterHandleReplicaFailover(void) { return; } + if (myself->flags & CLUSTER_NODE_FULL_SYNC && !manual_failover) { + clusterLogCantFailover(CLUSTER_CANT_FAILOVER_FULL_SYNC); + return; + } + /* Set data_age to the number of milliseconds we are disconnected from * the primary. */ if (server.repl_state == REPL_STATE_CONNECTED) { @@ -4522,7 +4529,7 @@ void clusterHandleReplicaFailover(void) { "Start of election delayed for %lld milliseconds " "(rank #%d, offset %lld).", server.cluster->failover_auth_time - mstime(), server.cluster->failover_auth_rank, - replicationGetReplicaOffset()); + getNodeReplicationOffset(myself)); /* Now that we have a scheduled election, broadcast our offset * to all the other replicas so that they'll updated their offsets * if our offset is better. */ @@ -4692,7 +4699,8 @@ void clusterHandleReplicaMigration(int max_replicas) { !(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER)) { serverLog(LL_NOTICE, "Migrating to orphaned primary %.40s (%s) in shard %.40s", target->name, target->human_nodename, target->shard_id); - clusterSetPrimary(target, 1); + int are_in_same_shard = areInSameShard(target, myself); + clusterSetPrimary(target, 1, !are_in_same_shard); } } @@ -4762,7 +4770,7 @@ void clusterHandleManualFailover(void) { if (server.cluster->mf_primary_offset == -1) return; /* Wait for offset... */ - if (server.cluster->mf_primary_offset == replicationGetReplicaOffset()) { + if (server.cluster->mf_primary_offset == getNodeReplicationOffset(myself)) { /* Our replication offset matches the primary replication offset * announced after clients were paused. We can start the failover. */ server.cluster->mf_can_start = 1; @@ -5398,7 +5406,7 @@ static inline void removeAllNotOwnedShardChannelSubscriptions(void) { /* Set the specified node 'n' as primary for this node. * If this node is currently a primary, it is turned into a replica. */ -void clusterSetPrimary(clusterNode *n, int closeSlots) { +void clusterSetPrimary(clusterNode *n, int closeSlots, int full_sync) { serverAssert(n != myself); serverAssert(myself->numslots == 0); @@ -5409,6 +5417,7 @@ void clusterSetPrimary(clusterNode *n, int closeSlots) { if (myself->replicaof) clusterNodeRemoveReplica(myself->replicaof, myself); } if (closeSlots) clusterCloseAllSlots(); + if (full_sync) myself->flags |= CLUSTER_NODE_FULL_SYNC; myself->replicaof = n; updateShardId(myself, n->shard_id); clusterNodeAddReplica(n, myself); @@ -5773,6 +5782,15 @@ void clusterUpdateSlots(client *c, unsigned char *slots, int del) { long long getNodeReplicationOffset(clusterNode *node) { if (node->flags & CLUSTER_NODE_MYSELF) { + if (nodeIsPrimary(node)) { + return server.primary_repl_offset; + } else { + /* The replica has not completed sync, offset is treated as 0. */ + if (myself->flags & CLUSTER_NODE_FULL_SYNC) return 0; + + return replicationGetReplicaOffset(); + } + return nodeIsReplica(node) ? replicationGetReplicaOffset() : server.primary_repl_offset; } else { return node->repl_offset; @@ -6118,7 +6136,7 @@ int clusterNodeIsFailing(clusterNode *node) { } int clusterNodeIsNoFailover(clusterNode *node) { - return node->flags & CLUSTER_NODE_NOFAILOVER; + return nodeCantFailover(node); } const char **clusterDebugCommandExtendedHelp(void) { @@ -6343,7 +6361,8 @@ void clusterCommandSetSlot(client *c) { "Lost my last slot during slot migration. Reconfiguring myself " "as a replica of %.40s (%s) in shard %.40s", n->name, n->human_nodename, n->shard_id); - clusterSetPrimary(n, 1); + int are_in_same_shard = areInSameShard(n, myself); + clusterSetPrimary(n, 1, !are_in_same_shard); clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG); } @@ -6549,7 +6568,8 @@ int clusterCommandSpecial(client *c) { } /* Set the primary. */ - clusterSetPrimary(n, 1); + int are_in_same_shard = areInSameShard(n, myself); + clusterSetPrimary(n, 1, !are_in_same_shard); clusterBroadcastPong(CLUSTER_BROADCAST_ALL); clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_SAVE_CONFIG); addReply(c, shared.ok); diff --git a/src/cluster_legacy.h b/src/cluster_legacy.h index 7184a2b204..046dd89d04 100644 --- a/src/cluster_legacy.h +++ b/src/cluster_legacy.h @@ -17,6 +17,7 @@ #define CLUSTER_CANT_FAILOVER_WAITING_DELAY 2 #define CLUSTER_CANT_FAILOVER_EXPIRED 3 #define CLUSTER_CANT_FAILOVER_WAITING_VOTES 4 +#define CLUSTER_CANT_FAILOVER_FULL_SYNC 5 #define CLUSTER_CANT_FAILOVER_RELOG_PERIOD 1 /* seconds. */ /* clusterState todo_before_sleep flags. */ @@ -53,6 +54,7 @@ typedef struct clusterLink { #define CLUSTER_NODE_NOFAILOVER (1 << 9) /* Replica will not try to failover. */ #define CLUSTER_NODE_EXTENSIONS_SUPPORTED (1 << 10) /* This node supports extensions. */ #define CLUSTER_NODE_LIGHT_HDR_SUPPORTED (1 << 11) /* This node supports light pubsub message header. */ +#define CLUSTER_NODE_FULL_SYNC (1 << 12) /* the node is about to do a sync and the sync is not complete. */ #define CLUSTER_NODE_NULL_NAME \ "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" \ "\000\000\000\000\000\000\000\000\000\000\000\000" diff --git a/src/replication.c b/src/replication.c index 6be8d3f9d5..85696c088b 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2362,6 +2362,7 @@ void readSyncBulkPayload(connection *conn) { } else { replicationCreatePrimaryClient(server.repl_transfer_s, rsi.repl_stream_db); server.repl_state = REPL_STATE_CONNECTED; + if (server.cluster_enabled) server.cluster->myself->flags &= ~CLUSTER_NODE_FULL_SYNC; /* Send the initial ACK immediately to put this replica in online state. */ replicationSendAck(); } @@ -4104,6 +4105,7 @@ void establishPrimaryConnection(void) { server.primary->flag.authenticated = 1; server.primary->last_interaction = server.unixtime; server.repl_state = REPL_STATE_CONNECTED; + if (server.cluster_enabled) server.cluster->myself->flags &= ~CLUSTER_NODE_FULL_SYNC; server.repl_down_since = 0; /* Fire the primary link modules event. */ diff --git a/tests/unit/cluster/slot-migration.tcl b/tests/unit/cluster/slot-migration.tcl index 030404dfde..3f4e006d33 100644 --- a/tests/unit/cluster/slot-migration.tcl +++ b/tests/unit/cluster/slot-migration.tcl @@ -72,7 +72,7 @@ start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-allow-replica assert_equal {OK} [R 1 CLUSTER SETSLOT 609 IMPORTING $R0_id] # Validate that R0 is migrating slot 609 to R1 assert_equal [get_open_slots 0] "\[609->-$R1_id\]" - # Validate that R1 is importing slot 609 from R0 + # Validate that R1 is importing slot 609 from R0 assert_equal [get_open_slots 1] "\[609-<-$R0_id\]" # Validate final states wait_for_slot_state 0 "\[609->-$R1_id\]" @@ -244,7 +244,7 @@ start_cluster 3 5 {tags {external:skip cluster} overrides {cluster-allow-replica assert_equal {OK} [R 6 CLUSTER SETSLOT 609 IMPORTING $R0_id] # Validate that R0 is migrating slot 609 to R6 assert_equal [get_open_slots 0] "\[609->-$R6_id\]" - # Validate that R6 is importing slot 609 from R0 + # Validate that R6 is importing slot 609 from R0 assert_equal [get_open_slots 6] "\[609-<-$R0_id\]" # Validate final states wait_for_slot_state 0 "\[609->-$R6_id\]" @@ -435,3 +435,116 @@ start_cluster 2 0 {tags {tls:skip external:skip cluster regression} overrides {c R 0 MIGRATE 127.0.0.1 [lindex [R 1 CONFIG GET port] 1] $stream_name 0 5000 } } + +# Allocate slot 0 to the last primary and evenly distribute the remaining +# slots to the remaining primary. +proc my_slot_allocation {masters replicas} { + set avg [expr double(16384) / [expr $masters-1]] + set slot_start 1 + for {set j 0} {$j < $masters-1} {incr j} { + set slot_end [expr int(ceil(($j + 1) * $avg) - 1)] + R $j cluster addslotsrange $slot_start $slot_end + set slot_start [expr $slot_end + 1] + } + R [expr $masters-1] cluster addslots 0 +} + +start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout 1000}} { + test "If a replica has not completed sync, it can not do the failover" { + # Write some data to primary 4, slot 0 + for {set i 0} {$i < 10240} {incr i} { + R 3 incr key_977613 + } + assert_equal {10240} [R 3 get key_977613] + + # 10s, make sure primary 1 will hang in the save + R 0 config set rdb-key-save-delay 100000000 + + # Move the slot 0 from primary 4 to primary 1 + set addr "[srv 0 host]:[srv 0 port]" + set myid [R 3 CLUSTER MYID] + set code [catch { + exec src/valkey-cli {*}[valkeycli_tls_config "./tests"] --cluster rebalance $addr --cluster-weight $myid=0 + } result] + if {$code != 0} { + puts "valkey-cli --cluster rebalance returns non-zero exit code, output below:\n$result" + } + + R 3 config set cluster-replica-validity-factor 0 + R 7 config set cluster-replica-validity-factor 0 + R 3 config set cluster-allow-replica-migration yes + R 7 config set cluster-allow-replica-migration yes + + # Shutdown the primary 1 + catch {R 0 shutdown nosave} + + # Wait for the replica become a primary, and make sure + # the other primary become a replica. + wait_for_condition 1000 50 { + [s -4 role] eq {master} && + [s -3 role] eq {slave} && + [s -7 role] eq {slave} + } else { + fail "Failover does not happened" + } + + # Make sure the key is exists and consistent. + R 3 readonly + R 7 readonly + wait_for_condition 1000 50 { + [R 3 get key_977613] == 10240 && + [R 4 get key_977613] == 10240 && + [R 7 get key_977613] == 10240 + } else { + fail "Key not consistent" + } + + # Make sure we print the logs. + verify_log_message -3 "*Replica has not completed sync and can not perform failover*" 0 + verify_log_message -7 "*Replica has not completed sync and can not perform failover*" 0 + } +} my_slot_allocation cluster_allocate_replicas ;# start_cluster + +start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout 1000}} { + test "If a replica has not completed sync, it can not do the failover" { + # Write some data to primary 0, slot 1, make a big repl_offset. + for {set i 0} {$i < 10240} {incr i} { + R 0 incr key_991803 + } + assert_equal {10240} [R 0 get key_991803] + + # 10s, make sure primary 1 will hang in the save + R 0 config set rdb-key-save-delay 100000000 + + # Let the replica do the replicate with primary 1. + R 7 config set cluster-replica-validity-factor 0 + R 7 config set cluster-allow-replica-migration yes + R 7 cluster replicate [R 0 cluster myid] + + catch {R 0 shutdown nosave} + + # 0 1 2 3 + # 4 5 6 + # 7 + + # Wait for the replica become a primary + wait_for_condition 1000 50 { + [s -4 role] eq {master} && + [s -7 role] eq {slave} + } else { + fail "Failover does not happened" + } + + # Make sure the key is exists and consistence. + R 7 readonly + wait_for_condition 1000 50 { + [R 4 get key_991803] == 10240 && + [R 7 get key_991803] == 10240 + } else { + fail "Key not consistent" + } + + # Make sure we print the logs. + verify_log_message -7 "*Replica has not completed sync and can not perform failover*" 0 + } +} my_slot_allocation cluster_allocate_replicas ;# start_cluster