Skip to content

Commit

Permalink
Fix data loss when replica do a failover with a old history repl offs…
Browse files Browse the repository at this point in the history
…et (valkey-io#885)

Our current replica can initiate a failover without restriction when
it detects that the primary node is offline. This is generally not a
problem. However, consider the following scenarios:

1. In slot migration, a primary loses its last slot and then becomes
a replica. When it is fully synchronized with the new primary, the new
primary downs.

2. In CLUSTER REPLICATE command, a replica becomes a replica of another
primary. When it is fully synchronized with the new primary, the new
primary downs.

In the above scenario, case 1 may cause the empty primary to be elected
as the new primary, resulting in primary data loss. Case 2 may cause the
non-empty replica to be elected as the new primary, resulting in data
loss and confusion.

The reason is that we have cached primary logic, which is used for psync.
In the above scenario, when clusterSetPrimary is called, myself will cache
server.primary in server.cached_primary for psync. In replicationGetReplicaOffset,
we get server.cached_primary->reploff for offset, gossip it and rank it,
which causes the replica to use the old historical offset to initiate
failover, and it get a good rank, initiates election first, and then is
elected as the new primary.

The main problem here is that when the replica has not completed full
sync, it may get the historical offset in replicationGetReplicaOffset.

The fix is to clear cached_primary in these places where full sync is
obviously needed, and let the replica use offset == 0 to participate
in the election. In this way, this unhealthy replica has a worse rank
and is not easy to be elected.

Of course, it is possible that it will be elected with offset == 0.
In the future, we may need to prohibit the replica with offset == 0
from having the right to initiate elections.

Another point worth mentioning, in above cases:
1. In the ROLE command, the replica status will be handshake, and the
offset will be -1.
2. Before this PR, in the CLUSTER SHARD command, the replica status will
be online, and the offset will be the old cached value (which is wrong).
3. After this PR, in the CLUSTER SHARD, the replica status will be loading,
and the offset will be 0.

Signed-off-by: Binbin <[email protected]>
Signed-off-by: Ping Xie <[email protected]>
  • Loading branch information
PingXie committed Sep 14, 2024
1 parent 3fa4b60 commit f69085f
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 8 deletions.
26 changes: 19 additions & 7 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -3416,6 +3416,25 @@ int clusterProcessPacket(clusterLink *link) {
}
}

/* Explicitly check for a replication loop before attempting the replication
* chain folding logic. */
if (myself->replicaof && myself->replicaof->replicaof && myself->replicaof->replicaof != myself) {
/* Safeguard against sub-replicas.
*
* A replica's primary can turn itself into a replica if its last slot
* is removed. If no other node takes over the slot, there is nothing
* else to trigger replica migration. In this case, they are not in the
* same shard, so a full sync is required.
*
* Or a replica's primary can turn itself into a replica of its other
* replica during a failover. In this case, they are in the same shard,
* so we can try a psync. */
serverLog(LL_NOTICE, "I'm a sub-replica! Reconfiguring myself as a replica of %.40s from %.40s",
myself->replicaof->replicaof->name, myself->replicaof->name);
clusterSetPrimary(myself->replicaof->replicaof, 1, !areInSameShard(myself->replicaof->replicaof, myself));
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
}

/* If our config epoch collides with the sender's try to fix
* the problem. */
if (sender && nodeIsPrimary(myself) && nodeIsPrimary(sender) &&
Expand Down Expand Up @@ -6584,13 +6603,6 @@ int clusterCommandSpecial(client *c) {
return 1;
}

/* If `n` is already my primary, there is no need to re-establish the
* replication connection. */
if (myself->replicaof == n) {
addReply(c, shared.ok);
return 1;
}

/* Set the primary.
* If the instance is a primary, it is an empty primary.
* If the instance is a replica, it had a totally different replication history.
Expand Down
2 changes: 1 addition & 1 deletion src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -3913,7 +3913,7 @@ void replicaofCommand(client *c) {
/* There was no previous primary or the user specified a different one,
* we can continue. */
replicationSetPrimary(c->argv[1]->ptr, port, 0);
sds client = catClientInfoString(sdsempty(), c, server.hide_user_data_from_log);
sds client = catClientInfoString(sdsempty(), c);
serverLog(LL_NOTICE, "REPLICAOF %s:%d enabled (user request from '%s')", server.primary_host,
server.primary_port, client);
sdsfree(client);
Expand Down
159 changes: 159 additions & 0 deletions tests/unit/cluster/replica_migration.tcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
# Allocate slot 0 to the last primary and evenly distribute the remaining
# slots to the remaining primaries.
proc my_slot_allocation {masters replicas} {
set avg [expr double(16384) / [expr $masters-1]]
set slot_start 1
for {set j 0} {$j < $masters-1} {incr j} {
set slot_end [expr int(ceil(($j + 1) * $avg) - 1)]
R $j cluster addslotsrange $slot_start $slot_end
set slot_start [expr $slot_end + 1]
}
R [expr $masters-1] cluster addslots 0
}

start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout 1000 cluster-migration-barrier 999}} {
test "Migrated replica reports zero repl offset and rank, and fails to win election" {
# Write some data to primary 0, slot 1, make a small repl_offset.
for {set i 0} {$i < 1024} {incr i} {
R 0 incr key_991803
}
assert_equal {1024} [R 0 get key_991803]

# Write some data to primary 3, slot 0, make a big repl_offset.
for {set i 0} {$i < 10240} {incr i} {
R 3 incr key_977613
}
assert_equal {10240} [R 3 get key_977613]

# 10s, make sure primary 0 will hang in the save.
R 0 config set rdb-key-save-delay 100000000

# Move the slot 0 from primary 3 to primary 0
set addr "[srv 0 host]:[srv 0 port]"
set myid [R 3 CLUSTER MYID]
set code [catch {
exec src/valkey-cli {*}[valkeycli_tls_config "./tests"] --cluster rebalance $addr --cluster-weight $myid=0
} result]
if {$code != 0} {
fail "valkey-cli --cluster rebalance returns non-zero exit code, output below:\n$result"
}

# Validate that shard 3's primary and replica can convert to replicas after
# they lose the last slot.
R 3 config set cluster-replica-validity-factor 0
R 7 config set cluster-replica-validity-factor 0
R 3 config set cluster-allow-replica-migration yes
R 7 config set cluster-allow-replica-migration yes

# Shutdown primary 0.
catch {R 0 shutdown nosave}

# Wait for the replica to become a primary, and make sure
# the other primary become a replica.
wait_for_condition 1000 50 {
[s -4 role] eq {master} &&
[s -3 role] eq {slave} &&
[s -7 role] eq {slave}
} else {
puts "s -4 role: [s -4 role]"
puts "s -3 role: [s -3 role]"
puts "s -7 role: [s -7 role]"
fail "Failover does not happened"
}

# Make sure the offset of server 3 / 7 is 0.
verify_log_message -3 "*Start of election*offset 0*" 0
verify_log_message -7 "*Start of election*offset 0*" 0

# Make sure the right replica gets the higher rank.
verify_log_message -4 "*Start of election*rank #0*" 0

# Wait for the cluster to be ok.
wait_for_condition 1000 50 {
[CI 3 cluster_state] eq "ok" &&
[CI 4 cluster_state] eq "ok" &&
[CI 7 cluster_state] eq "ok"
} else {
puts "R 3: [R 3 cluster info]"
puts "R 4: [R 4 cluster info]"
puts "R 7: [R 7 cluster info]"
fail "Cluster is down"
}

# Make sure the key exists and is consistent.
R 3 readonly
R 7 readonly
wait_for_condition 1000 50 {
[R 3 get key_991803] == 1024 && [R 3 get key_977613] == 10240 &&
[R 4 get key_991803] == 1024 && [R 4 get key_977613] == 10240 &&
[R 7 get key_991803] == 1024 && [R 7 get key_977613] == 10240
} else {
puts "R 3: [R 3 keys *]"
puts "R 4: [R 4 keys *]"
puts "R 7: [R 7 keys *]"
fail "Key not consistent"
}
}
} my_slot_allocation cluster_allocate_replicas ;# start_cluster

start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout 1000 cluster-migration-barrier 999}} {
test "New non-empty replica reports zero repl offset and rank, and fails to win election" {
# Write some data to primary 0, slot 1, make a small repl_offset.
for {set i 0} {$i < 1024} {incr i} {
R 0 incr key_991803
}
assert_equal {1024} [R 0 get key_991803]

# Write some data to primary 3, slot 0, make a big repl_offset.
for {set i 0} {$i < 10240} {incr i} {
R 3 incr key_977613
}
assert_equal {10240} [R 3 get key_977613]

# 10s, make sure primary 0 will hang in the save.
R 0 config set rdb-key-save-delay 100000000

# Make server 7 a replica of server 0.
R 7 config set cluster-replica-validity-factor 0
R 7 config set cluster-allow-replica-migration yes
R 7 cluster replicate [R 0 cluster myid]

# Shutdown primary 0.
catch {R 0 shutdown nosave}

# Wait for the replica to become a primary.
wait_for_condition 1000 50 {
[s -4 role] eq {master} &&
[s -7 role] eq {slave}
} else {
puts "s -4 role: [s -4 role]"
puts "s -7 role: [s -7 role]"
fail "Failover does not happened"
}

# Make sure server 7 gets the lower rank and it's offset is 0.
verify_log_message -4 "*Start of election*rank #0*" 0
verify_log_message -7 "*Start of election*offset 0*" 0

# Wait for the cluster to be ok.
wait_for_condition 1000 50 {
[CI 4 cluster_state] eq "ok" &&
[CI 7 cluster_state] eq "ok"
} else {
puts "R 4: [R 4 cluster info]"
puts "R 7: [R 7 cluster info]"
fail "Cluster is down"
}

# Make sure the key exists and is consistent.
R 7 readonly
wait_for_condition 1000 50 {
[R 4 get key_991803] == 1024 &&
[R 7 get key_991803] == 1024
} else {
puts "R 4: [R 4 get key_991803]"
puts "R 7: [R 7 get key_991803]"
fail "Key not consistent"
}
}
} my_slot_allocation cluster_allocate_replicas ;# start_cluster

0 comments on commit f69085f

Please sign in to comment.