Skip to content

Commit

Permalink
Fix reconfiguring sub-replica causing data loss when myself change sh…
Browse files Browse the repository at this point in the history
…ard_id (#944)

When reconfiguring sub-replica, there may a case that the sub-replica will
use the old offset and win the election and cause the data loss if the old
primary went down.

In this case, sender is myself's primary, when executing updateShardId,
not only the sender's shard_id is updated, but also the shard_id of
myself is updated, casuing the subsequent areInSameShard check, that is,
the full_sync_required check to fail.

As part of the recent fix of #885, the sub-replica needs to decide whether
a full sync is required or not when switching shards. This shard membership
check is supposed to be done against sub-replica's current shard_id, which
however was lost in this code path. This then leads to sub-replica joining
the other shard with a completely different and incorrect replication history.

This is the only place where replicaof state can be updated on this path
so the most natural fix would be to pull the chain replication reduction
logic into this code block and before the updateShardId call.

This one follow #885 and closes #942.

Signed-off-by: Binbin <[email protected]>
Co-authored-by: Ping Xie <[email protected]>
  • Loading branch information
enjoy-binbin and PingXie authored Aug 29, 2024
1 parent 4a9b4f6 commit ecbfb6a
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 21 deletions.
54 changes: 34 additions & 20 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -3312,10 +3312,43 @@ int clusterProcessPacket(clusterLink *link) {
if (sender->replicaof) clusterNodeRemoveReplica(sender->replicaof, sender);
serverLog(LL_NOTICE, "Node %.40s (%s) is now a replica of node %.40s (%s) in shard %.40s",
sender->name, sender->human_nodename, sender_claimed_primary->name,
sender_claimed_primary->human_nodename, sender->shard_id);
sender_claimed_primary->human_nodename, sender_claimed_primary->shard_id);
clusterNodeAddReplica(sender_claimed_primary, sender);
sender->replicaof = sender_claimed_primary;

/* The chain reduction logic requires correctly establishing the replication relationship.
* A key decision when designating a new primary for 'myself' is determining whether
* 'myself' and the new primary belong to the same shard, which would imply shared
* replication history and allow a safe partial synchronization (psync).
*
* This decision hinges on the shard_id, a per-node property that helps verify if the
* two nodes share the same replication history. It's critical not to update myself's
* shard_id prematurely during this process. Doing so could incorrectly associate
* 'myself' with the sender's shard_id, leading the subsequent clusterSetPrimary call
* to falsely assume that 'myself' and the new primary have been in the same shard.
* This mistake could result in data loss by incorrectly permitting a psync.
*
* Therefore, it's essential to delay any shard_id updates until after the replication
* relationship has been properly established and verified. */
if (myself->replicaof && myself->replicaof->replicaof && myself->replicaof->replicaof != myself) {
/* Safeguard against sub-replicas.
*
* A replica's primary can turn itself into a replica if its last slot
* is removed. If no other node takes over the slot, there is nothing
* else to trigger replica migration. In this case, they are not in the
* same shard, so a full sync is required.
*
* Or a replica's primary can turn itself into a replica of its other
* replica during a failover. In this case, they are in the same shard,
* so we can try a psync. */
serverLog(LL_NOTICE, "I'm a sub-replica! Reconfiguring myself as a replica of %.40s from %.40s",
myself->replicaof->replicaof->name, myself->replicaof->name);
clusterSetPrimary(myself->replicaof->replicaof, 1,
!areInSameShard(myself->replicaof->replicaof, myself));
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE |
CLUSTER_TODO_FSYNC_CONFIG);
}

/* Update the shard_id when a replica is connected to its
* primary in the very first time. */
updateShardId(sender, sender_claimed_primary->shard_id);
Expand Down Expand Up @@ -3383,25 +3416,6 @@ int clusterProcessPacket(clusterLink *link) {
}
}

/* Explicitly check for a replication loop before attempting the replication
* chain folding logic. */
if (myself->replicaof && myself->replicaof->replicaof && myself->replicaof->replicaof != myself) {
/* Safeguard against sub-replicas.
*
* A replica's primary can turn itself into a replica if its last slot
* is removed. If no other node takes over the slot, there is nothing
* else to trigger replica migration. In this case, they are not in the
* same shard, so a full sync is required.
*
* Or a replica's primary can turn itself into a replica of its other
* replica during a failover. In this case, they are in the same shard,
* so we can try a psync. */
serverLog(LL_NOTICE, "I'm a sub-replica! Reconfiguring myself as a replica of %.40s from %.40s",
myself->replicaof->replicaof->name, myself->replicaof->name);
clusterSetPrimary(myself->replicaof->replicaof, 1, !areInSameShard(myself->replicaof->replicaof, myself));
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
}

/* If our config epoch collides with the sender's try to fix
* the problem. */
if (sender && nodeIsPrimary(myself) && nodeIsPrimary(sender) &&
Expand Down
128 changes: 127 additions & 1 deletion tests/unit/cluster/replica-migration.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@ proc my_slot_allocation {masters replicas} {
R [expr $masters-1] cluster addslots 0
}

proc get_my_primary_peer {srv_idx} {
set role_response [R $srv_idx role]
set primary_ip [lindex $role_response 1]
set primary_port [lindex $role_response 2]
set primary_peer "$primary_ip:$primary_port"
return $primary_peer
}

proc test_migrated_replica {type} {
test "Migrated replica reports zero repl offset and rank, and fails to win election - $type" {
# Write some data to primary 0, slot 1, make a small repl_offset.
Expand Down Expand Up @@ -190,7 +198,7 @@ proc test_nonempty_replica {type} {
if {$type == "sigstop"} {
resume_process $primary0_pid

# Waiting the old primary go online and become a replica.
# Wait for the old primary to go online and become a replica.
wait_for_condition 1000 50 {
[s 0 role] eq {slave}
} else {
Expand All @@ -208,6 +216,124 @@ start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout
test_nonempty_replica "sigstop"
} my_slot_allocation cluster_allocate_replicas ;# start_cluster

proc test_sub_replica {type} {
test "Sub-replica reports zero repl offset and rank, and fails to win election - $type" {
# Write some data to primary 0, slot 1, make a small repl_offset.
for {set i 0} {$i < 1024} {incr i} {
R 0 incr key_991803
}
assert_equal {1024} [R 0 get key_991803]

# Write some data to primary 3, slot 0, make a big repl_offset.
for {set i 0} {$i < 10240} {incr i} {
R 3 incr key_977613
}
assert_equal {10240} [R 3 get key_977613]

R 3 config set cluster-replica-validity-factor 0
R 7 config set cluster-replica-validity-factor 0
R 3 config set cluster-allow-replica-migration yes
R 7 config set cluster-allow-replica-migration no

# 10s, make sure primary 0 will hang in the save.
R 0 config set rdb-key-save-delay 100000000

# Move slot 0 from primary 3 to primary 0.
set addr "[srv 0 host]:[srv 0 port]"
set myid [R 3 CLUSTER MYID]
set code [catch {
exec src/valkey-cli {*}[valkeycli_tls_config "./tests"] --cluster rebalance $addr --cluster-weight $myid=0
} result]
if {$code != 0} {
fail "valkey-cli --cluster rebalance returns non-zero exit code, output below:\n$result"
}

# Make sure server 3 and server 7 becomes a replica of primary 0.
wait_for_condition 1000 50 {
[get_my_primary_peer 3] eq $addr &&
[get_my_primary_peer 7] eq $addr
} else {
puts "R 3 role: [R 3 role]"
puts "R 7 role: [R 7 role]"
fail "Server 3 and 7 role response has not changed"
}

# Make sure server 7 got a sub-replica log.
verify_log_message -7 "*I'm a sub-replica!*" 0

if {$type == "shutdown"} {
# Shutdown primary 0.
catch {R 0 shutdown nosave}
} elseif {$type == "sigstop"} {
# Pause primary 0.
set primary0_pid [s 0 process_id]
pause_process $primary0_pid
}

# Wait for the replica to become a primary, and make sure
# the other primary become a replica.
wait_for_condition 1000 50 {
[s -4 role] eq {master} &&
[s -3 role] eq {slave} &&
[s -7 role] eq {slave}
} else {
puts "s -4 role: [s -4 role]"
puts "s -3 role: [s -3 role]"
puts "s -7 role: [s -7 role]"
fail "Failover does not happened"
}

# Make sure the offset of server 3 / 7 is 0.
verify_log_message -3 "*Start of election*offset 0*" 0
verify_log_message -7 "*Start of election*offset 0*" 0

# Wait for the cluster to be ok.
wait_for_condition 1000 50 {
[CI 3 cluster_state] eq "ok" &&
[CI 4 cluster_state] eq "ok" &&
[CI 7 cluster_state] eq "ok"
} else {
puts "R 3: [R 3 cluster info]"
puts "R 4: [R 4 cluster info]"
puts "R 7: [R 7 cluster info]"
fail "Cluster is down"
}

# Make sure the key exists and is consistent.
R 3 readonly
R 7 readonly
wait_for_condition 1000 50 {
[R 3 get key_991803] == 1024 && [R 3 get key_977613] == 10240 &&
[R 4 get key_991803] == 1024 && [R 4 get key_977613] == 10240 &&
[R 7 get key_991803] == 1024 && [R 7 get key_977613] == 10240
} else {
puts "R 3: [R 3 keys *]"
puts "R 4: [R 4 keys *]"
puts "R 7: [R 7 keys *]"
fail "Key not consistent"
}

if {$type == "sigstop"} {
resume_process $primary0_pid

# Wait for the old primary to go online and become a replica.
wait_for_condition 1000 50 {
[s 0 role] eq {slave}
} else {
fail "The old primary was not converted into replica"
}
}
}
}

start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout 1000 cluster-migration-barrier 999}} {
test_sub_replica "shutdown"
} my_slot_allocation cluster_allocate_replicas ;# start_cluster

start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout 1000 cluster-migration-barrier 999}} {
test_sub_replica "sigstop"
} my_slot_allocation cluster_allocate_replicas ;# start_cluster

start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout 1000 cluster-migration-barrier 999}} {
test "valkey-cli make source node ignores NOREPLICAS error when doing the last CLUSTER SETSLOT" {
R 3 config set cluster-allow-replica-migration no
Expand Down

0 comments on commit ecbfb6a

Please sign in to comment.