diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index ad0b78d71a..f8f60cf8c3 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -2370,7 +2370,7 @@ int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link, clusterMsg * /* Check if this is our primary and we have to change the * replication target as well. */ if (nodeIsReplica(myself) && myself->replicaof == node) - replicationSetPrimary(node->ip, getNodeDefaultReplicationPort(node)); + replicationSetPrimary(node->ip, getNodeDefaultReplicationPort(node), 1); return 1; } @@ -5016,7 +5016,7 @@ void clusterCron(void) { * enable it if we know the address of our primary and it appears to * be up. */ if (nodeIsReplica(myself) && server.primary_host == NULL && myself->replicaof && nodeHasAddr(myself->replicaof)) { - replicationSetPrimary(myself->replicaof->ip, getNodeDefaultReplicationPort(myself->replicaof)); + replicationSetPrimary(myself->replicaof->ip, getNodeDefaultReplicationPort(myself->replicaof), 1); } /* Abort a manual failover if the timeout is reached. */ @@ -5423,7 +5423,7 @@ void clusterSetPrimary(clusterNode *n, int closeSlots) { myself->replicaof = n; updateShardId(myself, n->shard_id); clusterNodeAddReplica(n, myself); - replicationSetPrimary(n->ip, getNodeDefaultReplicationPort(n)); + replicationSetPrimary(n->ip, getNodeDefaultReplicationPort(n), 1); removeAllNotOwnedShardChannelSubscriptions(); resetManualFailover(); diff --git a/src/replication.c b/src/replication.c index ce57e5f89a..3aa80fda12 100644 --- a/src/replication.c +++ b/src/replication.c @@ -3535,7 +3535,7 @@ void syncWithPrimary(connection *conn) { * could happen in edge cases. */ if (server.failover_state == FAILOVER_IN_PROGRESS) { if (psync_result == PSYNC_CONTINUE || psync_result == PSYNC_FULLRESYNC) { - clearFailoverState(); + clearFailoverState(1); } else { abortFailover("Failover target rejected psync request"); return; @@ -3723,7 +3723,7 @@ int cancelReplicationHandshake(int reconnect) { } /* Set replication to the specified primary address and port. */ -void replicationSetPrimary(char *ip, int port) { +void replicationSetPrimary(char *ip, int port, int disconnect_blocked) { int was_primary = server.primary_host == NULL; sdsfree(server.primary_host); @@ -3731,7 +3731,7 @@ void replicationSetPrimary(char *ip, int port) { if (server.primary) { freeClient(server.primary); } - disconnectAllBlockedClients(); /* Clients blocked in primary, now replica. */ + if (disconnect_blocked) disconnectAllBlockedClients(); /* Clients blocked in primary, now replica. */ /* Setting primary_host only after the call to freeClient since it calls * replicationHandlePrimaryDisconnection which can trigger a re-connect @@ -3893,7 +3893,7 @@ void replicaofCommand(client *c) { } /* There was no previous primary or the user specified a different one, * we can continue. */ - replicationSetPrimary(c->argv[1]->ptr, port); + replicationSetPrimary(c->argv[1]->ptr, port, 1); sds client = catClientInfoString(sdsempty(), c); serverLog(LL_NOTICE, "REPLICAOF %s:%d enabled (user request from '%s')", server.primary_host, server.primary_port, client); @@ -4732,13 +4732,14 @@ const char *getFailoverStateString(void) { /* Resets the internal failover configuration, this needs * to be called after a failover either succeeds or fails * as it includes the client unpause. */ -void clearFailoverState(void) { +void clearFailoverState(int success) { server.failover_end_time = 0; server.force_failover = 0; zfree(server.target_replica_host); server.target_replica_host = NULL; server.target_replica_port = 0; server.failover_state = NO_FAILOVER; + if (success) disconnectAllBlockedClients(); unpauseActions(PAUSE_DURING_FAILOVER); } @@ -4755,7 +4756,7 @@ void abortFailover(const char *err) { if (server.failover_state == FAILOVER_IN_PROGRESS) { replicationUnsetPrimary(); } - clearFailoverState(); + clearFailoverState(0); } /* @@ -4902,7 +4903,7 @@ void updateFailoverStatus(void) { server.target_replica_port); server.failover_state = FAILOVER_IN_PROGRESS; /* If timeout has expired force a failover if requested. */ - replicationSetPrimary(server.target_replica_host, server.target_replica_port); + replicationSetPrimary(server.target_replica_host, server.target_replica_port, 0); return; } else { /* Force was not requested, so timeout. */ @@ -4945,6 +4946,6 @@ void updateFailoverStatus(void) { serverLog(LL_NOTICE, "Failover target %s:%d is synced, failing over.", server.target_replica_host, server.target_replica_port); /* Designated replica is caught up, failover to it. */ - replicationSetPrimary(server.target_replica_host, server.target_replica_port); + replicationSetPrimary(server.target_replica_host, server.target_replica_port, 0); } } diff --git a/src/server.h b/src/server.h index ccdece20dd..72986f5439 100644 --- a/src/server.h +++ b/src/server.h @@ -3020,7 +3020,7 @@ void replicationStartPendingFork(void); void replicationHandlePrimaryDisconnection(void); void replicationCachePrimary(client *c); void resizeReplicationBacklog(void); -void replicationSetPrimary(char *ip, int port); +void replicationSetPrimary(char *ip, int port, int disconnect_blocked); void replicationUnsetPrimary(void); void refreshGoodReplicasCount(void); int checkGoodReplicasStatus(void); @@ -3046,7 +3046,7 @@ void showLatestBacklog(void); void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask); void rdbPipeWriteHandlerConnRemoved(struct connection *conn); int rdbRegisterAuxField(char *auxfield, rdbAuxFieldEncoder encoder, rdbAuxFieldDecoder decoder); -void clearFailoverState(void); +void clearFailoverState(int success); void updateFailoverStatus(void); void abortFailover(const char *err); const char *getFailoverStateString(void); diff --git a/tests/integration/failover.tcl b/tests/integration/failover.tcl index 3049cd0ca0..ab21cf6625 100644 --- a/tests/integration/failover.tcl +++ b/tests/integration/failover.tcl @@ -138,6 +138,16 @@ start_server {overrides {save {}}} { set initial_psyncs [s 0 sync_partial_ok] set initial_syncs [s 0 sync_full] + set rd_blocking [valkey_deferring_client -2] + + $rd_blocking BRPOP FOO_LIST 0 + + wait_for_condition 50 100 { + [s -2 blocked_clients] == 1 + } else { + fail "rd_blocking client should be blocked" + } + pause_process $node_0_pid # node 0 will never acknowledge this write $node_2 set case 2 @@ -155,6 +165,12 @@ start_server {overrides {save {}}} { assert_match *slave* [$node_1 role] assert_match *slave* [$node_2 role] + wait_for_condition 50 100 { + [s -2 blocked_clients] == 1 + } else { + fail "rd_blocking client should still be blocked in failover-in-progress" + } + resume_process $node_0_pid # Wait for failover to end @@ -163,6 +179,10 @@ start_server {overrides {save {}}} { } else { fail "Failover from node 2 to node 0 did not finish" } + + assert_error "UNBLOCKED force unblock from blocking operation, instance state changed (master -> replica?)" {$rd_blocking read} + $rd_blocking close + $node_1 replicaof $node_0_host $node_0_port wait_for_sync $node_1 @@ -253,19 +273,36 @@ start_server {overrides {save {}}} { # We block psync, so the failover will fail $node_1 acl setuser default -psync - # We pause the target long enough to send a write command - # during the pause. This write will not be interrupted. - pause_process [srv -1 pid] - set rd [valkey_deferring_client] - # wait for the client creation + # We send a BRPOP command and pause the target long enough + # to send an LPUSH and another BRPOP command during the pause. + # This push and the two pops (which are blocked for different + # reasons, respectively) will not be interrupted by the aborted + # failover. + pause_process $node_1_pid + set rd_lpush [valkey_deferring_client] + set rd_brpop_before [valkey_deferring_client] + set rd_brpop_after [valkey_deferring_client] + + $rd_brpop_before BRPOP FOO_LIST 0 wait_for_condition 50 100 { - [s connected_clients] == 2 + [s 0 blocked_clients] == 1 } else { - fail "Client creation failed" + fail "rd_brpop_before should be blocked" } - $rd SET FOO BAR + $node_0 failover to $node_1_host $node_1_port - resume_process [srv -1 pid] + $rd_brpop_after BRPOP FOO_LIST 0 + $rd_lpush LPUSH FOO_LIST BAR1 BAR2 + + wait_for_condition 50 100 { + [s 0 blocked_clients] == 3 + } else { + fail "All deferring clients should be blocked" + } + + assert_equal "waiting-for-sync" [s 0 master_failover_state] + + resume_process $node_1_pid # Wait for failover to end wait_for_condition 50 100 { @@ -274,8 +311,12 @@ start_server {overrides {save {}}} { fail "Failover from node_0 to replica did not finish" } - assert_equal [$rd read] "OK" - $rd close + assert_equal "2" [$rd_lpush read] + assert_equal "FOO_LIST BAR1" [$rd_brpop_before read] + assert_equal "FOO_LIST BAR2" [$rd_brpop_after read] + $rd_lpush close + $rd_brpop_before close + $rd_brpop_after close # restore access to psync $node_1 acl setuser default +psync