Skip to content

Commit

Permalink
Fix early unblock/disconnect of clients during failover
Browse files Browse the repository at this point in the history
When entering the `FAILOVER_IN_PROGRESS` state during a failover,
`disconnectAllBlockedClients` is called (as part of
`replicationSetPrimary`) to disconnect all blocked clients (except
those that are postponed due to a client pause).

While this makes sense in case of a role change in general, this is
happening too early in case of failover:
1. The role switch hasn't happened in `FAILOVER_IN_PROGRESS` yet,
   thus, there is no new primary to connect to.
2. The failover may still fail. In this case, there is no need to
   unblock/disconnect clients at all.

Therefore, move the `disconnectAllBlockedClients` call to the point
where we unpause the clients after the failover completes. Because
the failover may still fail (point 2), this call must only happen if
the failover succeeds.

Extend two tests to verify the changed behavior:

1. "failover to a replica with force works": Issue a blocking command
   before the failover and verify that it is unblocked after the
   failover finished.

2. "failover aborts if target rejects sync request": Neither a
   blocking command issued before the failover nor one issued during
   the failover is interrupted when the failover eventually aborts.
   (Note that the location of the previous `SET FOO BAR` command in
   the test was wrong: it did not block and thus wouldn't have been
   "interrupted" by the failover in any case.)

Signed-off-by: Simon Baatz <[email protected]>
  • Loading branch information
gmbnomis committed Jul 30, 2024
1 parent fa238dc commit 9e35f7c
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 24 deletions.
6 changes: 3 additions & 3 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -2370,7 +2370,7 @@ int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link, clusterMsg *
/* Check if this is our primary and we have to change the
* replication target as well. */
if (nodeIsReplica(myself) && myself->replicaof == node)
replicationSetPrimary(node->ip, getNodeDefaultReplicationPort(node));
replicationSetPrimary(node->ip, getNodeDefaultReplicationPort(node), 1);
return 1;
}

Expand Down Expand Up @@ -5016,7 +5016,7 @@ void clusterCron(void) {
* enable it if we know the address of our primary and it appears to
* be up. */
if (nodeIsReplica(myself) && server.primary_host == NULL && myself->replicaof && nodeHasAddr(myself->replicaof)) {
replicationSetPrimary(myself->replicaof->ip, getNodeDefaultReplicationPort(myself->replicaof));
replicationSetPrimary(myself->replicaof->ip, getNodeDefaultReplicationPort(myself->replicaof), 1);
}

/* Abort a manual failover if the timeout is reached. */
Expand Down Expand Up @@ -5423,7 +5423,7 @@ void clusterSetPrimary(clusterNode *n, int closeSlots) {
myself->replicaof = n;
updateShardId(myself, n->shard_id);
clusterNodeAddReplica(n, myself);
replicationSetPrimary(n->ip, getNodeDefaultReplicationPort(n));
replicationSetPrimary(n->ip, getNodeDefaultReplicationPort(n), 1);
removeAllNotOwnedShardChannelSubscriptions();
resetManualFailover();

Expand Down
17 changes: 9 additions & 8 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -3535,7 +3535,7 @@ void syncWithPrimary(connection *conn) {
* could happen in edge cases. */
if (server.failover_state == FAILOVER_IN_PROGRESS) {
if (psync_result == PSYNC_CONTINUE || psync_result == PSYNC_FULLRESYNC) {
clearFailoverState();
clearFailoverState(1);
} else {
abortFailover("Failover target rejected psync request");
return;
Expand Down Expand Up @@ -3723,15 +3723,15 @@ int cancelReplicationHandshake(int reconnect) {
}

/* Set replication to the specified primary address and port. */
void replicationSetPrimary(char *ip, int port) {
void replicationSetPrimary(char *ip, int port, int disconnect_blocked) {
int was_primary = server.primary_host == NULL;

sdsfree(server.primary_host);
server.primary_host = NULL;
if (server.primary) {
freeClient(server.primary);
}
disconnectAllBlockedClients(); /* Clients blocked in primary, now replica. */
if (disconnect_blocked) disconnectAllBlockedClients(); /* Clients blocked in primary, now replica. */

/* Setting primary_host only after the call to freeClient since it calls
* replicationHandlePrimaryDisconnection which can trigger a re-connect
Expand Down Expand Up @@ -3893,7 +3893,7 @@ void replicaofCommand(client *c) {
}
/* There was no previous primary or the user specified a different one,
* we can continue. */
replicationSetPrimary(c->argv[1]->ptr, port);
replicationSetPrimary(c->argv[1]->ptr, port, 1);
sds client = catClientInfoString(sdsempty(), c);
serverLog(LL_NOTICE, "REPLICAOF %s:%d enabled (user request from '%s')", server.primary_host,
server.primary_port, client);
Expand Down Expand Up @@ -4732,13 +4732,14 @@ const char *getFailoverStateString(void) {
/* Resets the internal failover configuration, this needs
* to be called after a failover either succeeds or fails
* as it includes the client unpause.
*
* 'success' tells whether the failover completed: syncWithPrimary passes 1
* once the target accepted the psync request, abortFailover passes 0.
* Only when it is non-zero are all blocked clients disconnected, and this
* happens before the clients paused for the failover are unpaused. */
void clearFailoverState(void) {
void clearFailoverState(int success) {
server.failover_end_time = 0;
server.force_failover = 0;
zfree(server.target_replica_host);
server.target_replica_host = NULL;
server.target_replica_port = 0;
server.failover_state = NO_FAILOVER;
/* An aborted failover (success == 0) leaves blocked clients untouched. */
if (success) disconnectAllBlockedClients();
unpauseActions(PAUSE_DURING_FAILOVER);
}

Expand All @@ -4755,7 +4756,7 @@ void abortFailover(const char *err) {
if (server.failover_state == FAILOVER_IN_PROGRESS) {
replicationUnsetPrimary();
}
clearFailoverState();
clearFailoverState(0);
}

/*
Expand Down Expand Up @@ -4902,7 +4903,7 @@ void updateFailoverStatus(void) {
server.target_replica_port);
server.failover_state = FAILOVER_IN_PROGRESS;
/* If timeout has expired force a failover if requested. */
replicationSetPrimary(server.target_replica_host, server.target_replica_port);
replicationSetPrimary(server.target_replica_host, server.target_replica_port, 0);
return;
} else {
/* Force was not requested, so timeout. */
Expand Down Expand Up @@ -4945,6 +4946,6 @@ void updateFailoverStatus(void) {
serverLog(LL_NOTICE, "Failover target %s:%d is synced, failing over.", server.target_replica_host,
server.target_replica_port);
/* Designated replica is caught up, failover to it. */
replicationSetPrimary(server.target_replica_host, server.target_replica_port);
replicationSetPrimary(server.target_replica_host, server.target_replica_port, 0);
}
}
4 changes: 2 additions & 2 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -3020,7 +3020,7 @@ void replicationStartPendingFork(void);
void replicationHandlePrimaryDisconnection(void);
void replicationCachePrimary(client *c);
void resizeReplicationBacklog(void);
void replicationSetPrimary(char *ip, int port);
void replicationSetPrimary(char *ip, int port, int disconnect_blocked);
void replicationUnsetPrimary(void);
void refreshGoodReplicasCount(void);
int checkGoodReplicasStatus(void);
Expand All @@ -3046,7 +3046,7 @@ void showLatestBacklog(void);
void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
void rdbPipeWriteHandlerConnRemoved(struct connection *conn);
int rdbRegisterAuxField(char *auxfield, rdbAuxFieldEncoder encoder, rdbAuxFieldDecoder decoder);
void clearFailoverState(void);
void clearFailoverState(int success);
void updateFailoverStatus(void);
void abortFailover(const char *err);
const char *getFailoverStateString(void);
Expand Down
63 changes: 52 additions & 11 deletions tests/integration/failover.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,16 @@ start_server {overrides {save {}}} {
set initial_psyncs [s 0 sync_partial_ok]
set initial_syncs [s 0 sync_full]

set rd_blocking [valkey_deferring_client -2]

$rd_blocking BRPOP FOO_LIST 0

wait_for_condition 50 100 {
[s -2 blocked_clients] == 1
} else {
fail "rd_blocking client should be blocked"
}

pause_process $node_0_pid
# node 0 will never acknowledge this write
$node_2 set case 2
Expand All @@ -155,6 +165,12 @@ start_server {overrides {save {}}} {
assert_match *slave* [$node_1 role]
assert_match *slave* [$node_2 role]

wait_for_condition 50 100 {
[s -2 blocked_clients] == 1
} else {
fail "rd_blocking client should still be blocked in failover-in-progress"
}

resume_process $node_0_pid

# Wait for failover to end
Expand All @@ -163,6 +179,10 @@ start_server {overrides {save {}}} {
} else {
fail "Failover from node 2 to node 0 did not finish"
}

assert_error "UNBLOCKED force unblock from blocking operation, instance state changed (master -> replica?)" {$rd_blocking read}
$rd_blocking close

$node_1 replicaof $node_0_host $node_0_port

wait_for_sync $node_1
Expand Down Expand Up @@ -253,19 +273,36 @@ start_server {overrides {save {}}} {
# We block psync, so the failover will fail
$node_1 acl setuser default -psync

# We pause the target long enough to send a write command
# during the pause. This write will not be interrupted.
pause_process [srv -1 pid]
set rd [valkey_deferring_client]
# wait for the client creation
# We send a BRPOP command and pause the target long enough
# to send an LPUSH and another BRPOP command during the pause.
# This push and the two pops (which are blocked for different
# reasons, respectively) will not be interrupted by the aborted
# failover.
pause_process $node_1_pid
set rd_lpush [valkey_deferring_client]
set rd_brpop_before [valkey_deferring_client]
set rd_brpop_after [valkey_deferring_client]

$rd_brpop_before BRPOP FOO_LIST 0
wait_for_condition 50 100 {
[s connected_clients] == 2
[s 0 blocked_clients] == 1
} else {
fail "Client creation failed"
fail "rd_brpop_before should be blocked"
}
$rd SET FOO BAR

$node_0 failover to $node_1_host $node_1_port
resume_process [srv -1 pid]
$rd_brpop_after BRPOP FOO_LIST 0
$rd_lpush LPUSH FOO_LIST BAR1 BAR2

wait_for_condition 50 100 {
[s 0 blocked_clients] == 3
} else {
fail "All deferring clients should be blocked"
}

assert_equal "waiting-for-sync" [s 0 master_failover_state]

resume_process $node_1_pid

# Wait for failover to end
wait_for_condition 50 100 {
Expand All @@ -274,8 +311,12 @@ start_server {overrides {save {}}} {
fail "Failover from node_0 to replica did not finish"
}

assert_equal [$rd read] "OK"
$rd close
assert_equal "2" [$rd_lpush read]
assert_equal "FOO_LIST BAR1" [$rd_brpop_before read]
assert_equal "FOO_LIST BAR2" [$rd_brpop_after read]
$rd_lpush close
$rd_brpop_before close
$rd_brpop_after close

# restore access to psync
$node_1 acl setuser default +psync
Expand Down

0 comments on commit 9e35f7c

Please sign in to comment.