diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 65d8081978..469530adcc 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -6224,6 +6224,9 @@ int clusterParseSetSlotCommand(client *c, int *slot_out, clusterNode **node_out, return 0; } + /* If 'myself' is a replica, 'c' must be the primary client. */ + serverAssert(!nodeIsReplica(myself) || c == server.primary); + if ((slot = getSlotOrReply(c, c->argv[2])) == -1) return 0; if (!strcasecmp(c->argv[3]->ptr, "migrating") && c->argc >= 5) { @@ -6412,20 +6415,27 @@ void clusterCommandSetSlot(client *c) { server.cluster->migrating_slots_to[slot] = NULL; } - int slot_was_mine = server.cluster->slots[slot] == myself; + clusterNode *my_primary = clusterNodeGetPrimary(myself); + int slot_was_mine = server.cluster->slots[slot] == my_primary; clusterDelSlot(slot); clusterAddSlot(n, slot); - /* If we are a primary left without slots, we should turn into a - * replica of the new primary. */ - if (slot_was_mine && n != myself && myself->numslots == 0 && server.cluster_allow_replica_migration) { + /* If replica migration is allowed, check if the primary of this shard + * loses its last slot and the shard becomes empty. In this case, we + * should turn into a replica of the new primary. */ + if (server.cluster_allow_replica_migration && slot_was_mine && my_primary->numslots == 0) { + serverAssert(n != my_primary); serverLog(LL_NOTICE, "Lost my last slot during slot migration. Reconfiguring myself " "as a replica of %.40s (%s) in shard %.40s", n->name, n->human_nodename, n->shard_id); + /* `c` is the primary client if `myself` is a replica, prevent it + * from being freed by clusterSetPrimary. */ + if (nodeIsReplica(myself)) protectClient(c); /* We are migrating to a different shard that has a completely different * replication history, so a full sync is required. */ clusterSetPrimary(n, 1, 1); + if (nodeIsReplica(myself)) unprotectClient(c); clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG); } diff --git a/src/networking.c b/src/networking.c index 8f8db33888..44a94087c9 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1671,7 +1671,7 @@ void freeClient(client *c) { * some unexpected state, by checking its flags. */ if (server.primary && c->flag.primary) { serverLog(LL_NOTICE, "Connection with primary lost."); - if (!(c->flag.protocol_error || c->flag.blocked)) { + if (!c->flag.dont_cache_primary && !(c->flag.protocol_error || c->flag.blocked)) { c->flag.close_asap = 0; c->flag.close_after_reply = 0; replicationCachePrimary(c); @@ -2553,6 +2553,7 @@ void freeSharedQueryBuf(void) { * * * DEBUG RELOAD and similar. * * When a Lua script is in -BUSY state. + * * A cluster replica executing CLUSTER SETSLOT during slot migration. * * So the function will protect the client by doing two things: * diff --git a/src/replication.c b/src/replication.c index a7040e2261..3b7b3b1709 100644 --- a/src/replication.c +++ b/src/replication.c @@ -3740,6 +3740,12 @@ void replicationSetPrimary(char *ip, int port, int full_sync_required) { sdsfree(server.primary_host); server.primary_host = NULL; if (server.primary) { + /* When joining 'myself' to a new primary, set the dont_cache_primary flag + * if a full sync is required. This happens when 'myself' was previously + * part of a different shard from the new primary. Since 'myself' does not + * have the replication history of the shard it is joining, clearing the + * cached primary is necessary to ensure proper replication behavior. */ + server.primary->flag.dont_cache_primary = full_sync_required; freeClient(server.primary); } disconnectAllBlockedClients(); /* Clients blocked in primary, now replica. */ @@ -3768,14 +3774,6 @@ void replicationSetPrimary(char *ip, int port, int full_sync_required) { replicationCachePrimaryUsingMyself(); } - /* If full sync is required, drop the cached primary. Doing so increases - * this replica node's election rank (delay) and reduces its chance of - * winning the election. If a replica requiring a full sync wins the - * election, it will flush valid data in the shard, causing data loss. */ - if (full_sync_required) { - replicationDiscardCachedPrimary(); - } - /* Fire the role change modules event. */ moduleFireServerEvent(VALKEYMODULE_EVENT_REPLICATION_ROLE_CHANGED, VALKEYMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA, NULL); diff --git a/src/server.h b/src/server.h index fbe57917c8..a5cee03055 100644 --- a/src/server.h +++ b/src/server.h @@ -1239,7 +1239,10 @@ typedef struct ClientFlags { * By using this flag, we ensure that the RDB client remains intact until the replica * \ has successfully initiated PSYNC. */ uint64_t repl_rdb_channel : 1; /* Dual channel replication sync: track a connection which is used for rdb snapshot */ - uint64_t reserved : 7; /* Reserved for future use */ + uint64_t dont_cache_primary : 1; /* In some cases we don't want to cache the primary. For example, the replica + * knows that it does not need the cache and required a full sync. With this + * flag, we won't cache the primary in freeClient. */ + uint64_t reserved : 6; /* Reserved for future use */ } ClientFlags; typedef struct client { diff --git a/tests/unit/cluster/replica-migration.tcl b/tests/unit/cluster/replica-migration.tcl index 8053859c69..b40bfcec22 100644 --- a/tests/unit/cluster/replica-migration.tcl +++ b/tests/unit/cluster/replica-migration.tcl @@ -351,11 +351,17 @@ start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout test_sub_replica "sigstop" } my_slot_allocation cluster_allocate_replicas ;# start_cluster -start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout 1000 cluster-migration-barrier 999}} { - test "valkey-cli make source node ignores NOREPLICAS error when doing the last CLUSTER SETSLOT" { +proc test_cluster_setslot {type} { + test "valkey-cli make source node ignores NOREPLICAS error when doing the last CLUSTER SETSLOT - $type" { R 3 config set cluster-allow-replica-migration no R 7 config set cluster-allow-replica-migration yes + if {$type == "setslot"} { + # Make R 7 drop the PING message so that we have a higher + # chance to trigger the migration from CLUSTER SETSLOT. + R 7 DEBUG DROP-CLUSTER-PACKET-FILTER 1 + } + # Move slot 0 from primary 3 to primary 0. set addr "[srv 0 host]:[srv 0 port]" set myid [R 3 CLUSTER MYID] @@ -366,6 +372,13 @@ start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout fail "valkey-cli --cluster rebalance returns non-zero exit code, output below:\n$result" } + # Wait for R 3 to report that it is an empty replica (cluster-allow-replica-migration no) + wait_for_log_messages -3 {"*I am now an empty primary*"} 0 1000 50 + + if {$type == "setslot"} { + R 7 DEBUG DROP-CLUSTER-PACKET-FILTER -1 + } + # Make sure server 3 lost its replica (server 7) and server 7 becomes a replica of primary 0. wait_for_condition 1000 50 { [s -3 role] eq {master} && @@ -378,4 +391,12 @@ start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout fail "Server 3 and 7 role response has not changed" } } +} + +start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout 1000 cluster-migration-barrier 999}} { + test_cluster_setslot "gossip" +} my_slot_allocation cluster_allocate_replicas ;# start_cluster + +start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout 1000 cluster-migration-barrier 999}} { + test_cluster_setslot "setslot" } my_slot_allocation cluster_allocate_replicas ;# start_cluster