diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index d80144d3fc..e895bfca6f 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -69,7 +69,7 @@ int clusterAddSlot(clusterNode *n, int slot); int clusterDelSlot(int slot); int clusterDelNodeSlots(clusterNode *node); int clusterNodeSetSlotBit(clusterNode *n, int slot); -void clusterSetPrimary(clusterNode *n, int closeSlots, int try_psync); +static void clusterSetPrimary(clusterNode *n, int closeSlots, int full_sync_required); void clusterHandleReplicaFailover(void); void clusterHandleReplicaMigration(int max_replicas); int bitmapTestBit(unsigned char *bitmap, int pos); @@ -2370,7 +2370,7 @@ int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link, clusterMsg * /* Check if this is our primary and we have to change the * replication target as well. */ if (nodeIsReplica(myself) && myself->replicaof == node) - replicationSetPrimary(node->ip, getNodeDefaultReplicationPort(node), 1); + replicationSetPrimary(node->ip, getNodeDefaultReplicationPort(node), 0); return 1; } @@ -2631,7 +2631,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc * just gets promoted to the new primary in the shard. * * If the sender and myself are in the same shard, try psync. */ - clusterSetPrimary(sender, !are_in_same_shard, are_in_same_shard); + clusterSetPrimary(sender, !are_in_same_shard, !are_in_same_shard); clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG); } else if ((sender_slots >= migrated_our_slots) && !are_in_same_shard) { /* When all our slots are lost to the sender and the sender belongs to @@ -3400,7 +3400,7 @@ int clusterProcessPacket(clusterLink *link) { * so we can try a psync. */ serverLog(LL_NOTICE, "I'm a sub-replica! Reconfiguring myself as a replica of %.40s from %.40s", myself->replicaof->replicaof->name, myself->replicaof->name); - clusterSetPrimary(myself->replicaof->replicaof, 1, areInSameShard(myself->replicaof->replicaof, myself)); + clusterSetPrimary(myself->replicaof->replicaof, 1, !areInSameShard(myself->replicaof->replicaof, myself)); clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG); } @@ -4704,9 +4704,9 @@ void clusterHandleReplicaMigration(int max_replicas) { !(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER)) { serverLog(LL_NOTICE, "Migrating to orphaned primary %.40s (%s) in shard %.40s", target->name, target->human_nodename, target->shard_id); - /* We are migrating to a different primary, a total different replication - * history, so a full sync is required. */ - clusterSetPrimary(target, 1, 0); + /* We are migrating to a different shard that has a completely different + * replication history, so a full sync is required. */ + clusterSetPrimary(target, 1, 1); } } @@ -5019,7 +5019,7 @@ void clusterCron(void) { * enable it if we know the address of our primary and it appears to * be up. */ if (nodeIsReplica(myself) && server.primary_host == NULL && myself->replicaof && nodeHasAddr(myself->replicaof)) { - replicationSetPrimary(myself->replicaof->ip, getNodeDefaultReplicationPort(myself->replicaof), 1); + replicationSetPrimary(myself->replicaof->ip, getNodeDefaultReplicationPort(myself->replicaof), 0); } /* Abort a manual failover if the timeout is reached. */ @@ -5412,7 +5412,7 @@ static inline void removeAllNotOwnedShardChannelSubscriptions(void) { /* Set the specified node 'n' as primary for this node. * If this node is currently a primary, it is turned into a replica. */ -void clusterSetPrimary(clusterNode *n, int closeSlots, int try_psync) { +static void clusterSetPrimary(clusterNode *n, int closeSlots, int full_sync_required) { serverAssert(n != myself); serverAssert(myself->numslots == 0); @@ -5426,7 +5426,7 @@ void clusterSetPrimary(clusterNode *n, int closeSlots, int try_psync) { myself->replicaof = n; updateShardId(myself, n->shard_id); clusterNodeAddReplica(n, myself); - replicationSetPrimary(n->ip, getNodeDefaultReplicationPort(n), try_psync); + replicationSetPrimary(n->ip, getNodeDefaultReplicationPort(n), full_sync_required); removeAllNotOwnedShardChannelSubscriptions(); resetManualFailover(); @@ -6357,9 +6357,9 @@ void clusterCommandSetSlot(client *c) { "Lost my last slot during slot migration. Reconfiguring myself " "as a replica of %.40s (%s) in shard %.40s", n->name, n->human_nodename, n->shard_id); - /* We are migrating to a different primary, a total different replication - * history, so a full sync is required. */ - clusterSetPrimary(n, 1, 0); + /* We are migrating to a different shard that has a completely different + * replication history, so a full sync is required. */ + clusterSetPrimary(n, 1, 1); clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG); } @@ -6568,7 +6568,7 @@ int clusterCommandSpecial(client *c) { * If the instance is a primary, it is an empty primary. * If the instance is a replica, it had a totally different replication history. * In these both cases, myself as a replica has to do a full sync. */ - clusterSetPrimary(n, 1, 0); + clusterSetPrimary(n, 1, 1); clusterBroadcastPong(CLUSTER_BROADCAST_ALL); clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_SAVE_CONFIG); addReply(c, shared.ok); diff --git a/src/replication.c b/src/replication.c index 76fde030ab..2e353df1d1 100644 --- a/src/replication.c +++ b/src/replication.c @@ -3723,7 +3723,7 @@ int cancelReplicationHandshake(int reconnect) { } /* Set replication to the specified primary address and port. */ -void replicationSetPrimary(char *ip, int port, int try_psync) { +void replicationSetPrimary(char *ip, int port, int full_sync_required) { int was_primary = server.primary_host == NULL; sdsfree(server.primary_host); @@ -3752,14 +3752,16 @@ void replicationSetPrimary(char *ip, int port, int try_psync) { /* Before destroying our primary state, create a cached primary using * our own parameters, to later PSYNC with the new primary. */ - if (was_primary && try_psync) { + if (was_primary && !full_sync_required) { replicationDiscardCachedPrimary(); replicationCachePrimaryUsingMyself(); } - /* If try_psync is 0, it means the caller requested that psync not be - * performed, dropping the cached primary. */ - if (!try_psync) { + /* If full sync is required, drop the cached primary. Doing so increases + * this replica node's election rank (delay) and reduces its chance of + * winning the election. If a replica requiring a full sync wins the + * election, it will flush valid data in the shard, causing data loss. */ + if (full_sync_required) { replicationDiscardCachedPrimary(); } @@ -3900,7 +3902,7 @@ void replicaofCommand(client *c) { } /* There was no previous primary or the user specified a different one, * we can continue. */ - replicationSetPrimary(c->argv[1]->ptr, port, 1); + replicationSetPrimary(c->argv[1]->ptr, port, 0); sds client = catClientInfoString(sdsempty(), c); serverLog(LL_NOTICE, "REPLICAOF %s:%d enabled (user request from '%s')", server.primary_host, server.primary_port, client); @@ -4909,7 +4911,7 @@ void updateFailoverStatus(void) { server.target_replica_port); server.failover_state = FAILOVER_IN_PROGRESS; /* If timeout has expired force a failover if requested. */ - replicationSetPrimary(server.target_replica_host, server.target_replica_port, 1); + replicationSetPrimary(server.target_replica_host, server.target_replica_port, 0); return; } else { /* Force was not requested, so timeout. */ @@ -4952,6 +4954,6 @@ void updateFailoverStatus(void) { serverLog(LL_NOTICE, "Failover target %s:%d is synced, failing over.", server.target_replica_host, server.target_replica_port); /* Designated replica is caught up, failover to it. */ - replicationSetPrimary(server.target_replica_host, server.target_replica_port, 1); + replicationSetPrimary(server.target_replica_host, server.target_replica_port, 0); } } diff --git a/src/server.h b/src/server.h index 2acf78e1e1..6ee4d5e05e 100644 --- a/src/server.h +++ b/src/server.h @@ -3020,7 +3020,7 @@ void replicationStartPendingFork(void); void replicationHandlePrimaryDisconnection(void); void replicationCachePrimary(client *c); void resizeReplicationBacklog(void); -void replicationSetPrimary(char *ip, int port, int try_psync); +void replicationSetPrimary(char *ip, int port, int full_sync_required); void replicationUnsetPrimary(void); void refreshGoodReplicasCount(void); int checkGoodReplicasStatus(void); diff --git a/tests/unit/cluster/replica_migration.tcl b/tests/unit/cluster/replica_migration.tcl index e3927027f9..0c81fe54bf 100644 --- a/tests/unit/cluster/replica_migration.tcl +++ b/tests/unit/cluster/replica_migration.tcl @@ -19,18 +19,18 @@ start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout assert_equal {1024} [R 0 get key_991803] } - test "Write some data to primary 4, slot 0, make a big repl_offset" { + test "Write some data to primary 3, slot 0, make a big repl_offset" { for {set i 0} {$i < 10240} {incr i} { R 3 incr key_977613 } assert_equal {10240} [R 3 get key_977613] } - test "If a replica has not completed sync, it can not do the failover" { - # 10s, make sure primary 1 will hang in the save + test "If a replica has not completed full sync, the offset is 0 and the rank is the lowest" { + # 10s, make sure primary 0 will hang in the save. R 0 config set rdb-key-save-delay 100000000 - # Move the slot 0 from primary 4 to primary 1 + # Move the slot 0 from primary 3 to primary 0 set addr "[srv 0 host]:[srv 0 port]" set myid [R 3 CLUSTER MYID] set code [catch { @@ -40,15 +40,15 @@ start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout fail "valkey-cli --cluster rebalance returns non-zero exit code, output below:\n$result" } - # Let primary 4's primary and replica can convert to replicas when - # they lost the last slot. + # Validate that shard 3's primary and replica can convert to replicas after + # they lose the last slot. R 3 config set cluster-replica-validity-factor 0 R 7 config set cluster-replica-validity-factor 0 R 3 config set cluster-allow-replica-migration yes R 7 config set cluster-allow-replica-migration yes - # Shutdown the primary 1 - catch {R 0 shutdown nosave} + # Shutdown the primary 0. + catch {R 0 shutdown} # Wait for the replica become a primary, and make sure # the other primary become a replica. @@ -63,14 +63,14 @@ start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout fail "Failover does not happened" } - # Make sure 3 / 7 get the lower rank and offset is 0. + # Make sure 3 / 7 get the lower rank and the offset is 0. verify_log_message -3 "*Start of election*offset 0*" 0 verify_log_message -7 "*Start of election*offset 0*" 0 # Make sure the right replica get the higher rank. verify_log_message -4 "*Start of election*rank #0*" 0 - # Make sure the key is exists and consistent. + # Make sure the key exists and is consistent. R 3 readonly R 7 readonly wait_for_condition 1000 50 { @@ -94,7 +94,7 @@ start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout assert_equal {1024} [R 0 get key_991803] } - test "Write some data to primary 4, slot 0, make a big repl_offset" { + test "Write some data to primary 3, slot 0, make a big repl_offset" { for {set i 0} {$i < 10240} {incr i} { R 3 incr key_977613 } @@ -102,16 +102,16 @@ start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout } test "A old replica use CLUSTER REPLICA get a zero offset before the full sync is completed" { - # 10s, make sure primary 1 will hang in the save + # 10s, make sure primary 0 will hang in the save. R 0 config set rdb-key-save-delay 100000000 - # Let the replica do the replicate with primary 1. + # Let the replica do the replicate with primary 0. R 7 config set cluster-replica-validity-factor 0 R 7 config set cluster-allow-replica-migration yes R 7 cluster replicate [R 0 cluster myid] - # Shutdown the primary 1. - catch {R 0 shutdown nosave} + # Shutdown the primary 0. + catch {R 0 shutdown} # Wait for the replica become a primary. wait_for_condition 1000 50 { @@ -127,7 +127,7 @@ start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout verify_log_message -4 "*Start of election*rank #0*" 0 verify_log_message -7 "*Start of election*offset 0*" 0 - # Make sure the key is exists and consistence. + # Make sure the key exists and is consistent. R 7 readonly wait_for_condition 1000 50 { [R 4 get key_991803] == 1024 &&