Skip to content

Commit

Permalink
If a replica has not completed its sync, it cannot perform the failover
Browse files Browse the repository at this point in the history
  • Loading branch information
enjoy-binbin committed Aug 10, 2024
1 parent 7424620 commit 1e02cb7
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 18 deletions.
52 changes: 36 additions & 16 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ int clusterAddSlot(clusterNode *n, int slot);
int clusterDelSlot(int slot);
int clusterDelNodeSlots(clusterNode *node);
int clusterNodeSetSlotBit(clusterNode *n, int slot);
void clusterSetPrimary(clusterNode *n, int closeSlots);
void clusterSetPrimary(clusterNode *n, int closeSlots, int full_sync);
void clusterHandleReplicaFailover(void);
void clusterHandleReplicaMigration(int max_replicas);
int bitmapTestBit(unsigned char *bitmap, int pos);
Expand Down Expand Up @@ -2625,8 +2625,11 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
"as a replica of node %.40s (%s) in shard %.40s",
sender->name, sender->human_nodename, sender->shard_id);
/* Don't clear the migrating/importing states if this is a replica that
* just gets promoted to the new primary in the shard. */
clusterSetPrimary(sender, !areInSameShard(sender, myself));
* just gets promoted to the new primary in the shard.
*
* Make a full sync hint if sender and myself not in the same shard. */
int are_in_same_shard = areInSameShard(sender, myself);
clusterSetPrimary(sender, !are_in_same_shard, !are_in_same_shard);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
} else if ((sender_slots >= migrated_our_slots) && !areInSameShard(sender, myself)) {
/* When all our slots are lost to the sender and the sender belongs to
Expand Down Expand Up @@ -3388,7 +3391,8 @@ int clusterProcessPacket(clusterLink *link) {
* over the slot, there is nothing else to trigger replica migration. */
serverLog(LL_NOTICE, "I'm a sub-replica! Reconfiguring myself as a replica of %.40s from %.40s",
myself->replicaof->replicaof->name, myself->replicaof->name);
clusterSetPrimary(myself->replicaof->replicaof, 1);
int are_in_same_shard = areInSameShard(myself->replicaof->replicaof, myself);
clusterSetPrimary(myself->replicaof->replicaof, 1, !are_in_same_shard);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
}

Expand Down Expand Up @@ -3790,10 +3794,7 @@ static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen) {
hdr->configEpoch = htonu64(primary->configEpoch);

/* Set the replication offset. */
if (nodeIsReplica(myself))
offset = replicationGetReplicaOffset();
else
offset = server.primary_repl_offset;
offset = getNodeReplicationOffset(myself);
hdr->offset = htonu64(offset);

/* Set the message flags. */
Expand Down Expand Up @@ -4327,7 +4328,7 @@ int clusterGetReplicaRank(void) {
primary = myself->replicaof;
if (primary == NULL) return 0; /* Never called by replicas without primary. */

myoffset = replicationGetReplicaOffset();
myoffset = getNodeReplicationOffset(myself);
for (j = 0; j < primary->num_replicas; j++) {
if (primary->replicas[j] == myself) continue;
if (nodeCantFailover(primary->replicas[j])) continue;
Expand Down Expand Up @@ -4381,6 +4382,7 @@ void clusterLogCantFailover(int reason) {
case CLUSTER_CANT_FAILOVER_WAITING_DELAY: msg = "Waiting the delay before I can start a new failover."; break;
case CLUSTER_CANT_FAILOVER_EXPIRED: msg = "Failover attempt expired."; break;
case CLUSTER_CANT_FAILOVER_WAITING_VOTES: msg = "Waiting for votes, but majority still not reached."; break;
case CLUSTER_CANT_FAILOVER_FULL_SYNC: msg = "Replica has not completed sync and can not perform failover."; break;
default: msg = "Unknown reason code."; break;
}
lastlog_time = time(NULL);
Expand Down Expand Up @@ -4473,6 +4475,11 @@ void clusterHandleReplicaFailover(void) {
return;
}

if (myself->flags & CLUSTER_NODE_FULL_SYNC && !manual_failover) {
clusterLogCantFailover(CLUSTER_CANT_FAILOVER_FULL_SYNC);
return;
}

/* Set data_age to the number of milliseconds we are disconnected from
* the primary. */
if (server.repl_state == REPL_STATE_CONNECTED) {
Expand Down Expand Up @@ -4522,7 +4529,7 @@ void clusterHandleReplicaFailover(void) {
"Start of election delayed for %lld milliseconds "
"(rank #%d, offset %lld).",
server.cluster->failover_auth_time - mstime(), server.cluster->failover_auth_rank,
replicationGetReplicaOffset());
getNodeReplicationOffset(myself));
/* Now that we have a scheduled election, broadcast our offset
* to all the other replicas so that they'll updated their offsets
* if our offset is better. */
Expand Down Expand Up @@ -4692,7 +4699,8 @@ void clusterHandleReplicaMigration(int max_replicas) {
!(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER)) {
serverLog(LL_NOTICE, "Migrating to orphaned primary %.40s (%s) in shard %.40s", target->name,
target->human_nodename, target->shard_id);
clusterSetPrimary(target, 1);
int are_in_same_shard = areInSameShard(target, myself);
clusterSetPrimary(target, 1, !are_in_same_shard);
}
}

Expand Down Expand Up @@ -4762,7 +4770,7 @@ void clusterHandleManualFailover(void) {

if (server.cluster->mf_primary_offset == -1) return; /* Wait for offset... */

if (server.cluster->mf_primary_offset == replicationGetReplicaOffset()) {
if (server.cluster->mf_primary_offset == getNodeReplicationOffset(myself)) {
/* Our replication offset matches the primary replication offset
* announced after clients were paused. We can start the failover. */
server.cluster->mf_can_start = 1;
Expand Down Expand Up @@ -5398,7 +5406,7 @@ static inline void removeAllNotOwnedShardChannelSubscriptions(void) {

/* Set the specified node 'n' as primary for this node.
* If this node is currently a primary, it is turned into a replica. */
void clusterSetPrimary(clusterNode *n, int closeSlots) {
void clusterSetPrimary(clusterNode *n, int closeSlots, int full_sync) {
serverAssert(n != myself);
serverAssert(myself->numslots == 0);

Expand All @@ -5409,6 +5417,7 @@ void clusterSetPrimary(clusterNode *n, int closeSlots) {
if (myself->replicaof) clusterNodeRemoveReplica(myself->replicaof, myself);
}
if (closeSlots) clusterCloseAllSlots();
if (full_sync) myself->flags |= CLUSTER_NODE_FULL_SYNC;
myself->replicaof = n;
updateShardId(myself, n->shard_id);
clusterNodeAddReplica(n, myself);
Expand Down Expand Up @@ -5773,6 +5782,15 @@ void clusterUpdateSlots(client *c, unsigned char *slots, int del) {

long long getNodeReplicationOffset(clusterNode *node) {
if (node->flags & CLUSTER_NODE_MYSELF) {
if (nodeIsPrimary(node)) {
return server.primary_repl_offset;
} else {
/* The replica has not completed sync, offset is treated as 0. */
if (myself->flags & CLUSTER_NODE_FULL_SYNC) return 0;

return replicationGetReplicaOffset();
}

return nodeIsReplica(node) ? replicationGetReplicaOffset() : server.primary_repl_offset;
} else {
return node->repl_offset;
Expand Down Expand Up @@ -6118,7 +6136,7 @@ int clusterNodeIsFailing(clusterNode *node) {
}

int clusterNodeIsNoFailover(clusterNode *node) {
return node->flags & CLUSTER_NODE_NOFAILOVER;
return nodeCantFailover(node);
}

const char **clusterDebugCommandExtendedHelp(void) {
Expand Down Expand Up @@ -6343,7 +6361,8 @@ void clusterCommandSetSlot(client *c) {
"Lost my last slot during slot migration. Reconfiguring myself "
"as a replica of %.40s (%s) in shard %.40s",
n->name, n->human_nodename, n->shard_id);
clusterSetPrimary(n, 1);
int are_in_same_shard = areInSameShard(n, myself);
clusterSetPrimary(n, 1, !are_in_same_shard);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
}

Expand Down Expand Up @@ -6549,7 +6568,8 @@ int clusterCommandSpecial(client *c) {
}

/* Set the primary. */
clusterSetPrimary(n, 1);
int are_in_same_shard = areInSameShard(n, myself);
clusterSetPrimary(n, 1, !are_in_same_shard);
clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_SAVE_CONFIG);
addReply(c, shared.ok);
Expand Down
2 changes: 2 additions & 0 deletions src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#define CLUSTER_CANT_FAILOVER_WAITING_DELAY 2
#define CLUSTER_CANT_FAILOVER_EXPIRED 3
#define CLUSTER_CANT_FAILOVER_WAITING_VOTES 4
#define CLUSTER_CANT_FAILOVER_FULL_SYNC 5
#define CLUSTER_CANT_FAILOVER_RELOG_PERIOD 1 /* seconds. */

/* clusterState todo_before_sleep flags. */
Expand Down Expand Up @@ -53,6 +54,7 @@ typedef struct clusterLink {
#define CLUSTER_NODE_NOFAILOVER (1 << 9) /* Replica will not try to failover. */
#define CLUSTER_NODE_EXTENSIONS_SUPPORTED (1 << 10) /* This node supports extensions. */
#define CLUSTER_NODE_LIGHT_HDR_SUPPORTED (1 << 11) /* This node supports light pubsub message header. */
#define CLUSTER_NODE_FULL_SYNC (1 << 12) /* the node is about to do a sync and the sync is not complete. */
#define CLUSTER_NODE_NULL_NAME \
"\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" \
"\000\000\000\000\000\000\000\000\000\000\000\000"
Expand Down
2 changes: 2 additions & 0 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -2362,6 +2362,7 @@ void readSyncBulkPayload(connection *conn) {
} else {
replicationCreatePrimaryClient(server.repl_transfer_s, rsi.repl_stream_db);
server.repl_state = REPL_STATE_CONNECTED;
if (server.cluster_enabled) server.cluster->myself->flags &= ~CLUSTER_NODE_FULL_SYNC;
/* Send the initial ACK immediately to put this replica in online state. */
replicationSendAck();
}
Expand Down Expand Up @@ -4104,6 +4105,7 @@ void establishPrimaryConnection(void) {
server.primary->flag.authenticated = 1;
server.primary->last_interaction = server.unixtime;
server.repl_state = REPL_STATE_CONNECTED;
if (server.cluster_enabled) server.cluster->myself->flags &= ~CLUSTER_NODE_FULL_SYNC;
server.repl_down_since = 0;

/* Fire the primary link modules event. */
Expand Down
117 changes: 115 additions & 2 deletions tests/unit/cluster/slot-migration.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-allow-replica
assert_equal {OK} [R 1 CLUSTER SETSLOT 609 IMPORTING $R0_id]
# Validate that R0 is migrating slot 609 to R1
assert_equal [get_open_slots 0] "\[609->-$R1_id\]"
# Validate that R1 is importing slot 609 from R0
# Validate that R1 is importing slot 609 from R0
assert_equal [get_open_slots 1] "\[609-<-$R0_id\]"
# Validate final states
wait_for_slot_state 0 "\[609->-$R1_id\]"
Expand Down Expand Up @@ -244,7 +244,7 @@ start_cluster 3 5 {tags {external:skip cluster} overrides {cluster-allow-replica
assert_equal {OK} [R 6 CLUSTER SETSLOT 609 IMPORTING $R0_id]
# Validate that R0 is migrating slot 609 to R6
assert_equal [get_open_slots 0] "\[609->-$R6_id\]"
# Validate that R6 is importing slot 609 from R0
# Validate that R6 is importing slot 609 from R0
assert_equal [get_open_slots 6] "\[609-<-$R0_id\]"
# Validate final states
wait_for_slot_state 0 "\[609->-$R6_id\]"
Expand Down Expand Up @@ -435,3 +435,116 @@ start_cluster 2 0 {tags {tls:skip external:skip cluster regression} overrides {c
R 0 MIGRATE 127.0.0.1 [lindex [R 1 CONFIG GET port] 1] $stream_name 0 5000
}
}

# Allocate slot 0 to the last primary and evenly distribute the remaining
# slots (1..16383) across the other primaries.
#
# masters  - number of primary nodes in the cluster
# replicas - number of replicas (unused; kept for the allocator interface)
proc my_slot_allocation {masters replicas} {
    # Average slot count per primary, excluding the last one which
    # only receives slot 0. Braced exprs avoid double substitution
    # and are the idiomatic (and byte-compiled) Tcl form.
    set avg [expr {double(16384) / ($masters - 1)}]
    set slot_start 1
    for {set j 0} {$j < $masters - 1} {incr j} {
        # ceil() keeps ranges contiguous while spreading the remainder
        # over the earlier primaries.
        set slot_end [expr {int(ceil(($j + 1) * $avg) - 1)}]
        R $j cluster addslotsrange $slot_start $slot_end
        set slot_start [expr {$slot_end + 1}]
    }
    # The last primary owns only slot 0.
    R [expr {$masters - 1}] cluster addslots 0
}

start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout 1000}} {
    test "If a replica has not completed sync, it can not do the failover" {
        # Write some data to primary 4 (R 3), which owns slot 0.
        for {set i 0} {$i < 10240} {incr i} {
            R 3 incr key_977613
        }
        assert_equal {10240} [R 3 get key_977613]

        # Huge save delay (in microseconds) so that any full sync against
        # primary 1 (R 0) hangs in the RDB save and never completes.
        R 0 config set rdb-key-save-delay 100000000

        # Move slot 0 from primary 4 to primary 1.
        set addr "[srv 0 host]:[srv 0 port]"
        set myid [R 3 CLUSTER MYID]
        set code [catch {
            exec src/valkey-cli {*}[valkeycli_tls_config "./tests"] --cluster rebalance $addr --cluster-weight $myid=0
        } result]
        if {$code != 0} {
            puts "valkey-cli --cluster rebalance returns non-zero exit code, output below:\n$result"
        }

        R 3 config set cluster-replica-validity-factor 0
        R 7 config set cluster-replica-validity-factor 0
        R 3 config set cluster-allow-replica-migration yes
        R 7 config set cluster-allow-replica-migration yes

        # Shut down primary 1 so a failover election starts.
        catch {R 0 shutdown nosave}

        # Wait for a replica to become the primary, and make sure
        # the other former primary becomes a replica.
        wait_for_condition 1000 50 {
            [s -4 role] eq {master} &&
            [s -3 role] eq {slave} &&
            [s -7 role] eq {slave}
        } else {
            fail "Failover did not happen"
        }

        # Make sure the key exists and is consistent across nodes.
        R 3 readonly
        R 7 readonly
        wait_for_condition 1000 50 {
            [R 3 get key_977613] == 10240 &&
            [R 4 get key_977613] == 10240 &&
            [R 7 get key_977613] == 10240
        } else {
            fail "Key not consistent"
        }

        # The unsynced replicas must have logged that they refused to fail over.
        verify_log_message -3 "*Replica has not completed sync and can not perform failover*" 0
        verify_log_message -7 "*Replica has not completed sync and can not perform failover*" 0
    }
} my_slot_allocation cluster_allocate_replicas ;# start_cluster

start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout 1000}} {
    test "If a replica has not completed sync, it can not do the failover" {
        # Write some data to primary 0 (slot 1) to build a large repl_offset.
        for {set i 0} {$i < 10240} {incr i} {
            R 0 incr key_991803
        }
        assert_equal {10240} [R 0 get key_991803]

        # Huge save delay (in microseconds) so that any full sync against
        # primary 1 (R 0) hangs in the RDB save and never completes.
        R 0 config set rdb-key-save-delay 100000000

        # Repoint replica R 7 at primary 1; its full sync will stall.
        R 7 config set cluster-replica-validity-factor 0
        R 7 config set cluster-allow-replica-migration yes
        R 7 cluster replicate [R 0 cluster myid]

        catch {R 0 shutdown nosave}

        # Topology after the replicate call (primaries on the first row,
        # each column is one shard):
        # 0 1 2 3
        # 4 5 6
        # 7

        # Wait for the synced replica to become the primary.
        wait_for_condition 1000 50 {
            [s -4 role] eq {master} &&
            [s -7 role] eq {slave}
        } else {
            fail "Failover did not happen"
        }

        # Make sure the key exists and is consistent across nodes.
        R 7 readonly
        wait_for_condition 1000 50 {
            [R 4 get key_991803] == 10240 &&
            [R 7 get key_991803] == 10240
        } else {
            fail "Key not consistent"
        }

        # The unsynced replica must have logged that it refused to fail over.
        verify_log_message -7 "*Replica has not completed sync and can not perform failover*" 0
    }
} my_slot_allocation cluster_allocate_replicas ;# start_cluster

0 comments on commit 1e02cb7

Please sign in to comment.