Skip to content

Commit

Permalink
Fix data loss when replica do a failover with a old history repl offset
Browse files Browse the repository at this point in the history
Our current replica can initiate a failover without restriction when
it detects that the primary node is offline. This is generally not a
problem. However, consider the following scenarios:

1. In slot migration, a primary loses its last slot and then becomes
a replica. When it is fully synchronized with the new primary, the new
primary downs.

2. In CLUSTER REPLICATE command, a replica becomes a replica of another
primary. When it is fully synchronized with the new primary, the new
primary downs.

In the above scenario, case 1 may cause the empty primary to be elected
as the new primary, resulting in primary data loss. Case 2 may cause the
non-empty replica to be elected as the new primary, resulting in data
loss and confusion.

The reason is that we have cached primary logic, which is used for psync.
In the above scenario, when clusterSetPrimary is called, myself will cache
server.primary in server.cached_primary for psync. In replicationGetReplicaOffset,
we get server.cached_primary->reploff for offset, gossip it and rank it,
which causes the replica to use the old historical offset to initiate
failover, and it get a good rank, initiates election first, and then is
elected as the new primary.

The main problem here is that when the replica has not completed full
sync, it may get the historical offset in replicationGetReplicaOffset.

The fix is to clear cached_primary in these places where full sync is
obviously needed, and let the replica use offset == 0 to participate
in the election. In this way, this unhealthy replica has a worse rank
and is not easy to be elected.

Of course, it is possible that it will be elected with offset == 0.
In the future, we may need to prohibit the replica with offset == 0
from having the right to initiate elections.

Signed-off-by: Binbin <[email protected]>
  • Loading branch information
enjoy-binbin committed Aug 11, 2024
1 parent 7424620 commit e41044f
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 25 deletions.
57 changes: 38 additions & 19 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ int clusterAddSlot(clusterNode *n, int slot);
int clusterDelSlot(int slot);
int clusterDelNodeSlots(clusterNode *node);
int clusterNodeSetSlotBit(clusterNode *n, int slot);
void clusterSetPrimary(clusterNode *n, int closeSlots);
void clusterSetPrimary(clusterNode *n, int closeSlots, int try_psync);
void clusterHandleReplicaFailover(void);
void clusterHandleReplicaMigration(int max_replicas);
int bitmapTestBit(unsigned char *bitmap, int pos);
Expand Down Expand Up @@ -2370,7 +2370,7 @@ int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link, clusterMsg *
/* Check if this is our primary and we have to change the
* replication target as well. */
if (nodeIsReplica(myself) && myself->replicaof == node)
replicationSetPrimary(node->ip, getNodeDefaultReplicationPort(node));
replicationSetPrimary(node->ip, getNodeDefaultReplicationPort(node), 1);
return 1;
}

Expand Down Expand Up @@ -2432,6 +2432,9 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
return;
}

/* Sender and myself in the same shard? */
int are_in_same_shard = areInSameShard(sender, myself);

for (j = 0; j < CLUSTER_SLOTS; j++) {
if (bitmapTestBit(slots, j)) {
sender_slots++;
Expand Down Expand Up @@ -2474,7 +2477,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
* the same shard and we should retain the migrating_slots_to state
* for the slot in question */
if (server.cluster->migrating_slots_to[j] != NULL) {
if (!areInSameShard(sender, myself)) {
if (!are_in_same_shard) {
serverLog(LL_NOTICE, "Slot %d is no longer being migrated to node %.40s (%s) in shard %.40s.",
j, server.cluster->migrating_slots_to[j]->name,
server.cluster->migrating_slots_to[j]->human_nodename,
Expand Down Expand Up @@ -2595,7 +2598,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
* the new primary if my current config epoch is lower than the
* sender's. */
if (!new_primary && myself->replicaof != sender && sender_slots == 0 && myself->numslots == 0 &&
nodeEpoch(myself) < senderConfigEpoch && areInSameShard(sender, myself)) {
nodeEpoch(myself) < senderConfigEpoch && are_in_same_shard) {
new_primary = sender;
}

Expand All @@ -2619,16 +2622,18 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
* sender. In this case we don't reconfigure ourselves as a replica
* of the sender. */
if (new_primary && cur_primary->numslots == 0) {
if (server.cluster_allow_replica_migration || areInSameShard(sender, myself)) {
if (server.cluster_allow_replica_migration || are_in_same_shard) {
serverLog(LL_NOTICE,
"Configuration change detected. Reconfiguring myself "
"as a replica of node %.40s (%s) in shard %.40s",
sender->name, sender->human_nodename, sender->shard_id);
/* Don't clear the migrating/importing states if this is a replica that
* just gets promoted to the new primary in the shard. */
clusterSetPrimary(sender, !areInSameShard(sender, myself));
* just gets promoted to the new primary in the shard.
*
* If the sender and myself are in the same shard, try psync. */
clusterSetPrimary(sender, !are_in_same_shard, are_in_same_shard);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
} else if ((sender_slots >= migrated_our_slots) && !areInSameShard(sender, myself)) {
} else if ((sender_slots >= migrated_our_slots) && !are_in_same_shard) {
/* When all our slots are lost to the sender and the sender belongs to
* a different shard, this is likely due to a client triggered slot
* migration. Don't reconfigure this node to migrate to the new shard
Expand Down Expand Up @@ -3383,12 +3388,19 @@ int clusterProcessPacket(clusterLink *link) {
/* Explicitly check for a replication loop before attempting the replication
* chain folding logic. */
if (myself->replicaof && myself->replicaof->replicaof && myself->replicaof->replicaof != myself) {
/* Safeguard against sub-replicas. A replica's primary can turn itself
* into a replica if its last slot is removed. If no other node takes
* over the slot, there is nothing else to trigger replica migration. */
/* Safeguard against sub-replicas.
*
* A replica's primary can turn itself into a replica if its last slot
* is removed. If no other node takes over the slot, there is nothing
* else to trigger replica migration. In this case, they are not in the
* same shard, so a full sync is required.
*
* Or a replica's primary can turn itself into a replica of its other
* replica during a failover. In this case, they are in the same shard,
* so we can try a psync. */
serverLog(LL_NOTICE, "I'm a sub-replica! Reconfiguring myself as a replica of %.40s from %.40s",
myself->replicaof->replicaof->name, myself->replicaof->name);
clusterSetPrimary(myself->replicaof->replicaof, 1);
clusterSetPrimary(myself->replicaof->replicaof, 1, areInSameShard(myself->replicaof->replicaof, myself));
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
}

Expand Down Expand Up @@ -4692,7 +4704,9 @@ void clusterHandleReplicaMigration(int max_replicas) {
!(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER)) {
serverLog(LL_NOTICE, "Migrating to orphaned primary %.40s (%s) in shard %.40s", target->name,
target->human_nodename, target->shard_id);
clusterSetPrimary(target, 1);
/* We are migrating to a different primary, a total different replication
* history, so a full sync is required. */
clusterSetPrimary(target, 1, 0);
}
}

Expand Down Expand Up @@ -5005,7 +5019,7 @@ void clusterCron(void) {
* enable it if we know the address of our primary and it appears to
* be up. */
if (nodeIsReplica(myself) && server.primary_host == NULL && myself->replicaof && nodeHasAddr(myself->replicaof)) {
replicationSetPrimary(myself->replicaof->ip, getNodeDefaultReplicationPort(myself->replicaof));
replicationSetPrimary(myself->replicaof->ip, getNodeDefaultReplicationPort(myself->replicaof), 1);
}

/* Abort a manual failover if the timeout is reached. */
Expand Down Expand Up @@ -5398,7 +5412,7 @@ static inline void removeAllNotOwnedShardChannelSubscriptions(void) {

/* Set the specified node 'n' as primary for this node.
* If this node is currently a primary, it is turned into a replica. */
void clusterSetPrimary(clusterNode *n, int closeSlots) {
void clusterSetPrimary(clusterNode *n, int closeSlots, int try_psync) {
serverAssert(n != myself);
serverAssert(myself->numslots == 0);

Expand All @@ -5412,7 +5426,7 @@ void clusterSetPrimary(clusterNode *n, int closeSlots) {
myself->replicaof = n;
updateShardId(myself, n->shard_id);
clusterNodeAddReplica(n, myself);
replicationSetPrimary(n->ip, getNodeDefaultReplicationPort(n));
replicationSetPrimary(n->ip, getNodeDefaultReplicationPort(n), try_psync);
removeAllNotOwnedShardChannelSubscriptions();
resetManualFailover();

Expand Down Expand Up @@ -6343,7 +6357,9 @@ void clusterCommandSetSlot(client *c) {
"Lost my last slot during slot migration. Reconfiguring myself "
"as a replica of %.40s (%s) in shard %.40s",
n->name, n->human_nodename, n->shard_id);
clusterSetPrimary(n, 1);
/* We are migrating to a different primary, a total different replication
* history, so a full sync is required. */
clusterSetPrimary(n, 1, 0);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
}

Expand Down Expand Up @@ -6548,8 +6564,11 @@ int clusterCommandSpecial(client *c) {
return 1;
}

/* Set the primary. */
clusterSetPrimary(n, 1);
/* Set the primary.
* If the instance is a primary, it is an empty primary.
* If the instance is a replica, it had a totally different replication history.
* In these both cases, myself as a replica has to do a full sync. */
clusterSetPrimary(n, 1, 0);
clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_SAVE_CONFIG);
addReply(c, shared.ok);
Expand Down
17 changes: 12 additions & 5 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -3723,7 +3723,7 @@ int cancelReplicationHandshake(int reconnect) {
}

/* Set replication to the specified primary address and port. */
void replicationSetPrimary(char *ip, int port) {
void replicationSetPrimary(char *ip, int port, int try_psync) {
int was_primary = server.primary_host == NULL;

sdsfree(server.primary_host);
Expand All @@ -3749,13 +3749,20 @@ void replicationSetPrimary(char *ip, int port) {
* sync with new primary. */

cancelReplicationHandshake(0);

/* Before destroying our primary state, create a cached primary using
* our own parameters, to later PSYNC with the new primary. */
if (was_primary) {
if (was_primary && try_psync) {
replicationDiscardCachedPrimary();
replicationCachePrimaryUsingMyself();
}

/* If try_psync is 0, it means the caller requested that psync not be
* performed, dropping the cached primary. */
if (!try_psync) {
replicationDiscardCachedPrimary();
}

/* Fire the role change modules event. */
moduleFireServerEvent(VALKEYMODULE_EVENT_REPLICATION_ROLE_CHANGED, VALKEYMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA,
NULL);
Expand Down Expand Up @@ -3893,7 +3900,7 @@ void replicaofCommand(client *c) {
}
/* There was no previous primary or the user specified a different one,
* we can continue. */
replicationSetPrimary(c->argv[1]->ptr, port);
replicationSetPrimary(c->argv[1]->ptr, port, 1);
sds client = catClientInfoString(sdsempty(), c);
serverLog(LL_NOTICE, "REPLICAOF %s:%d enabled (user request from '%s')", server.primary_host,
server.primary_port, client);
Expand Down Expand Up @@ -4902,7 +4909,7 @@ void updateFailoverStatus(void) {
server.target_replica_port);
server.failover_state = FAILOVER_IN_PROGRESS;
/* If timeout has expired force a failover if requested. */
replicationSetPrimary(server.target_replica_host, server.target_replica_port);
replicationSetPrimary(server.target_replica_host, server.target_replica_port, 1);
return;
} else {
/* Force was not requested, so timeout. */
Expand Down Expand Up @@ -4945,6 +4952,6 @@ void updateFailoverStatus(void) {
serverLog(LL_NOTICE, "Failover target %s:%d is synced, failing over.", server.target_replica_host,
server.target_replica_port);
/* Designated replica is caught up, failover to it. */
replicationSetPrimary(server.target_replica_host, server.target_replica_port);
replicationSetPrimary(server.target_replica_host, server.target_replica_port, 1);
}
}
2 changes: 1 addition & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -3020,7 +3020,7 @@ void replicationStartPendingFork(void);
void replicationHandlePrimaryDisconnection(void);
void replicationCachePrimary(client *c);
void resizeReplicationBacklog(void);
void replicationSetPrimary(char *ip, int port);
void replicationSetPrimary(char *ip, int port, int try_psync);
void replicationUnsetPrimary(void);
void refreshGoodReplicasCount(void);
int checkGoodReplicasStatus(void);
Expand Down
141 changes: 141 additions & 0 deletions tests/unit/cluster/replica_migration.tcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
# Allocate slot 0 to the last primary and evenly distribute the remaining
# slots to the remaining primary.
proc my_slot_allocation {masters replicas} {
set avg [expr double(16384) / [expr $masters-1]]
set slot_start 1
for {set j 0} {$j < $masters-1} {incr j} {
set slot_end [expr int(ceil(($j + 1) * $avg) - 1)]
R $j cluster addslotsrange $slot_start $slot_end
set slot_start [expr $slot_end + 1]
}
R [expr $masters-1] cluster addslots 0
}

start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout 1000 cluster-migration-barrier 999}} {
test "Write some data to primary 0, slot 1, make a small repl_offset" {
for {set i 0} {$i < 1024} {incr i} {
R 0 incr key_991803
}
assert_equal {1024} [R 0 get key_991803]
}

test "Write some data to primary 4, slot 0, make a big repl_offset" {
for {set i 0} {$i < 10240} {incr i} {
R 3 incr key_977613
}
assert_equal {10240} [R 3 get key_977613]
}

test "If a replica has not completed sync, it can not do the failover" {
# 10s, make sure primary 1 will hang in the save
R 0 config set rdb-key-save-delay 100000000

# Move the slot 0 from primary 4 to primary 1
set addr "[srv 0 host]:[srv 0 port]"
set myid [R 3 CLUSTER MYID]
set code [catch {
exec src/valkey-cli {*}[valkeycli_tls_config "./tests"] --cluster rebalance $addr --cluster-weight $myid=0
} result]
if {$code != 0} {
fail "valkey-cli --cluster rebalance returns non-zero exit code, output below:\n$result"
}

# Let primary 4's primary and replica can convert to replicas when
# they lost the last slot.
R 3 config set cluster-replica-validity-factor 0
R 7 config set cluster-replica-validity-factor 0
R 3 config set cluster-allow-replica-migration yes
R 7 config set cluster-allow-replica-migration yes

# Shutdown the primary 1
catch {R 0 shutdown nosave}

# Wait for the replica become a primary, and make sure
# the other primary become a replica.
wait_for_condition 1000 50 {
[s -4 role] eq {master} &&
[s -3 role] eq {slave} &&
[s -7 role] eq {slave}
} else {
puts "s -4 role: [s -4 role]"
puts "s -3 role: [s -3 role]"
puts "s -7 role: [s -7 role]"
fail "Failover does not happened"
}

# Make sure 3 / 7 get the lower rank and offset is 0.
verify_log_message -3 "*Start of election*offset 0*" 0
verify_log_message -7 "*Start of election*offset 0*" 0

# Make sure the right replica get the higher rank.
verify_log_message -4 "*Start of election*rank #0*" 0

# Make sure the key is exists and consistent.
R 3 readonly
R 7 readonly
wait_for_condition 1000 50 {
[R 3 get key_991803] == 1024 && [R 3 get key_977613] == 10240 &&
[R 4 get key_991803] == 1024 && [R 4 get key_977613] == 10240 &&
[R 7 get key_991803] == 1024 && [R 7 get key_977613] == 10240
} else {
puts "R 3: [R 3 keys *]"
puts "R 4: [R 4 keys *]"
puts "R 7: [R 7 keys *]"
fail "Key not consistent"
}
}
} my_slot_allocation cluster_allocate_replicas ;# start_cluster

start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout 1000 cluster-migration-barrier 999}} {
test "Write some data to primary 0, slot 1, make a small repl_offset" {
for {set i 0} {$i < 1024} {incr i} {
R 0 incr key_991803
}
assert_equal {1024} [R 0 get key_991803]
}

test "Write some data to primary 4, slot 0, make a big repl_offset" {
for {set i 0} {$i < 10240} {incr i} {
R 3 incr key_977613
}
assert_equal {10240} [R 3 get key_977613]
}

test "A old replica use CLUSTER REPLICA get a zero offset before the full sync is completed" {
# 10s, make sure primary 1 will hang in the save
R 0 config set rdb-key-save-delay 100000000

# Let the replica do the replicate with primary 1.
R 7 config set cluster-replica-validity-factor 0
R 7 config set cluster-allow-replica-migration yes
R 7 cluster replicate [R 0 cluster myid]

# Shutdown the primary 1.
catch {R 0 shutdown nosave}

# Wait for the replica become a primary.
wait_for_condition 1000 50 {
[s -4 role] eq {master} &&
[s -7 role] eq {slave}
} else {
puts "s -4 role: [s -4 role]"
puts "s -7 role: [s -7 role]"
fail "Failover does not happened"
}

# Make sure 7 get the lower rank and it's offset is 0.
verify_log_message -4 "*Start of election*rank #0*" 0
verify_log_message -7 "*Start of election*offset 0*" 0

# Make sure the key is exists and consistence.
R 7 readonly
wait_for_condition 1000 50 {
[R 4 get key_991803] == 1024 &&
[R 7 get key_991803] == 1024
} else {
puts "R 4: [R 4 get key_991803]"
puts "R 7: [R 7 get key_991803]"
fail "Key not consistent"
}
}
} my_slot_allocation cluster_allocate_replicas ;# start_cluster

0 comments on commit e41044f

Please sign in to comment.