Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Standalone FAILOVER: Fix disconnect of blocked clients in standalone failover and support REDIRECT response #848

Open
wants to merge 2 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 28 additions & 5 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,9 @@ void replyToClientsBlockedOnShutdown(void) {
* is called when a primary turns into a replica.
*
* The semantics is to send an -UNBLOCKED error to the client, disconnecting
* it at the same time. */
void disconnectAllBlockedClients(void) {
* it at the same time. If a client has the redirect capability, we send a
* -REDIRECT error instead to point it to the primary, if possible. */
void disconnectOrRedirectBlockedClients(void) {
listNode *ln;
listIter li;

Expand All @@ -290,9 +291,23 @@ void disconnectAllBlockedClients(void) {
* which the command is already in progress in a way. */
if (c->bstate.btype == BLOCKED_POSTPONE) continue;

unblockClientOnError(c, "-UNBLOCKED force unblock from blocking operation, "
"instance state changed (master -> replica?)");
c->flag.close_after_reply = 1;
if (!server.cluster_enabled && c->capa & CLIENT_CAPA_REDIRECT && server.primary_host &&
(c->bstate.btype == BLOCKED_LIST || c->bstate.btype == BLOCKED_ZSET ||
c->bstate.btype == BLOCKED_STREAM || c->bstate.btype == BLOCKED_MODULE)) {
/* If the client is blocked on module, but not on a specific key,
* don't unblock it. */
if (c->bstate.btype == BLOCKED_MODULE && !moduleClientIsBlockedOnKeys(c)) continue;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand the general logic about key ownership and write commands. however module logic can vary greatly and I would hate placing assumptions on modules work. Maybe we can let modules opt it by registering API to query if the module wishes to be kept alive after role change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I realized that I should have added that I did not "invent" this code, it is following the redirect logic in Valkey cluster:

int clusterRedirectBlockedClientIfNeeded(client *c) {

This particular condition was introduced by redis/redis#9483. The discussion there centered on slot migration in cluster, but the MOVED will also be sent/not sent following the usual readwrite/readonly logic on a replica.

So, it looks applicable to the redirect case here, but, TBH, I don't know the module API very well. @soloestoy, as you were part of the discussion on redis/redis#9483, do you think we should do this here as well? (alternatively we could just continue to disconnect clients which are blocked on a module or we may need to come up with something more complex, like @ranshid suggested.)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gmbnomis actually, I agree. I forgot about the redirect logic taking the same decision, so I guess the modules are already capable of handling it (even though I think modules should have the ability to impact the decision). I would like to ask, though, why not just reuse the function (clusterRedirectBlockedClientIfNeeded) ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at both functions, the commonalities are quite limited because the redirect logic in cluster is much more complex and not applicable to the standalone case. We probably could extract the check whether we need to look at a client in more detail, i.e. something like:

int isRedirectCandidate(client *c) {
    if (c->flag.blocked && (c->bstate.btype == BLOCKED_LIST || c->bstate.btype == BLOCKED_ZSET ||
                            c->bstate.btype == BLOCKED_STREAM || c->bstate.btype == BLOCKED_MODULE)) {
        /* If the client is blocked on module, but not on a specific key,
         * don't unblock it. */
        if (c->bstate.btype != BLOCKED_MODULE || moduleClientIsBlockedOnKeys(c)) return 1;
    }
    return 0;
}

That's not much. Do you think this is worthwhile?

Copy link
Member

@PingXie PingXie Aug 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is worthwhile to create a small function or two, less about code reusing but more for abstraction. For instance, I see the following concepts worth capturing

  1. standalone redirection readiness
!server.cluster_enabled && server.primary_host && c->capa & CLIENT_CAPA_REDIRECT 

This is also used at

if (!server.cluster_enabled && c->capa & CLIENT_CAPA_REDIRECT && server.primary_host && !mustObeyClient(c) &&

  1. client conditions
c->flag.blocked && (c->bstate.btype == BLOCKED_LIST || c->bstate.btype == BLOCKED_ZSET ||
                    c->bstate.btype == BLOCKED_STREAM || c->bstate.btype == BLOCKED_MODULE)

Then L294-L296 could be rewritten as (in pseudo code)

if (redirection_is_enabled() && client_is_in_right_state(c)) {
...
}

Lastly, I am not sure about the module and read-only client cases. The changes look like leaving them in a perpetually blocked state; while for all other cases, we either redirect (new behavior) or drop the connection (old behavior).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is worthwhile to create a small function or two, less about code reusing but more for abstraction.

Yes, that makes sense. I will give it a try once we sort out what the logic should be.

Lastly, I am not sure about the module and read-only client cases. The changes look like leaving them in a perpetually blocked state; while for all other cases, we either redirect (new behavior) or drop the connection (old behavior).

I realized only today that CLUSTER FAILOVER does not result in MOVED errors for blocked clients satisfying these client conditions (like the documentation of the command and the comments for clusterRedirectBlockedClientIfNeeded may suggest).

Instead, we end up in exactly the function we are discussing about. That is to say, we return UNBLOCK and disconnect.

So, Valkey cluster seems to handle unblocking due to slot migration and unblocking due to primary/replica change differently. (I don't know whether this is by intention or not).

Thus, what I deemed to be "well-known territory" apparently isn't. Furthermore, replicationSetPrimary is not only called during failover but also by an arbitrary REPLICAOF command.

So, I think you are right and we should either redirect or disconnect. Shall I convert the current "do nothing" cases (i.e. continue) into disconnects? (because we are not sure whether redirecting the client is actually beneficial in these cases)

(Alternatively, we could come to the conclusion that we don't need to be "better than CLUSTER FAILOVER" and decide against doing this redirect optimization at all.)


/* if the client is read-only and attempting to read, don't unblock it. */
if (c->flag.readonly && !(c->lastcmd->flags & CMD_WRITE)) continue;

unblockClientOnErrorSds(
c, sdscatprintf(sdsempty(), "-REDIRECT %s:%d", server.primary_host, server.primary_port));
} else {
unblockClientOnError(c, "-UNBLOCKED force unblock from blocking operation, "
"instance state changed (master -> replica?)");
c->flag.close_after_reply = 1;
}
}
}
}
Expand Down Expand Up @@ -713,6 +728,14 @@ void unblockClientOnError(client *c, const char *err_str) {
unblockClient(c, 1);
}

/* Unblock a client which is currently blocked with error.
* err will be used to reply to the blocked client. As a side
* effect the SDS string is freed. */
void unblockClientOnErrorSds(client *c, sds err) {
addReplyErrorSds(c, err);
unblockClientOnError(c, NULL);
}

void blockedBeforeSleep(void) {
/* Handle precise timeouts of blocked clients. */
handleBlockedClientsTimeout();
Expand Down
6 changes: 3 additions & 3 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -2370,7 +2370,7 @@ int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link, clusterMsg *
/* Check if this is our primary and we have to change the
* replication target as well. */
if (nodeIsReplica(myself) && myself->replicaof == node)
replicationSetPrimary(node->ip, getNodeDefaultReplicationPort(node));
replicationSetPrimary(node->ip, getNodeDefaultReplicationPort(node), 1);
return 1;
}

Expand Down Expand Up @@ -5005,7 +5005,7 @@ void clusterCron(void) {
* enable it if we know the address of our primary and it appears to
* be up. */
if (nodeIsReplica(myself) && server.primary_host == NULL && myself->replicaof && nodeHasAddr(myself->replicaof)) {
replicationSetPrimary(myself->replicaof->ip, getNodeDefaultReplicationPort(myself->replicaof));
replicationSetPrimary(myself->replicaof->ip, getNodeDefaultReplicationPort(myself->replicaof), 1);
}

/* Abort a manual failover if the timeout is reached. */
Expand Down Expand Up @@ -5412,7 +5412,7 @@ void clusterSetPrimary(clusterNode *n, int closeSlots) {
myself->replicaof = n;
updateShardId(myself, n->shard_id);
clusterNodeAddReplica(n, myself);
replicationSetPrimary(n->ip, getNodeDefaultReplicationPort(n));
replicationSetPrimary(n->ip, getNodeDefaultReplicationPort(n), 1);
removeAllNotOwnedShardChannelSubscriptions();
resetManualFailover();

Expand Down
17 changes: 9 additions & 8 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -3538,7 +3538,7 @@ void syncWithPrimary(connection *conn) {
* could happen in edge cases. */
if (server.failover_state == FAILOVER_IN_PROGRESS) {
if (psync_result == PSYNC_CONTINUE || psync_result == PSYNC_FULLRESYNC) {
clearFailoverState();
clearFailoverState(1);
} else {
abortFailover("Failover target rejected psync request");
return;
Expand Down Expand Up @@ -3726,15 +3726,15 @@ int cancelReplicationHandshake(int reconnect) {
}

/* Set replication to the specified primary address and port. */
void replicationSetPrimary(char *ip, int port) {
void replicationSetPrimary(char *ip, int port, int disconnect_blocked) {
int was_primary = server.primary_host == NULL;

sdsfree(server.primary_host);
server.primary_host = NULL;
if (server.primary) {
freeClient(server.primary);
}
disconnectAllBlockedClients(); /* Clients blocked in primary, now replica. */
if (disconnect_blocked) disconnectOrRedirectBlockedClients(); /* Clients blocked in primary, now replica. */

/* Setting primary_host only after the call to freeClient since it calls
* replicationHandlePrimaryDisconnection which can trigger a re-connect
Expand Down Expand Up @@ -3896,7 +3896,7 @@ void replicaofCommand(client *c) {
}
/* There was no previous primary or the user specified a different one,
* we can continue. */
replicationSetPrimary(c->argv[1]->ptr, port);
replicationSetPrimary(c->argv[1]->ptr, port, 1);
sds client = catClientInfoString(sdsempty(), c);
serverLog(LL_NOTICE, "REPLICAOF %s:%d enabled (user request from '%s')", server.primary_host,
server.primary_port, client);
Expand Down Expand Up @@ -4737,13 +4737,14 @@ const char *getFailoverStateString(void) {
/* Resets the internal failover configuration, this needs
* to be called after a failover either succeeds or fails
* as it includes the client unpause. */
void clearFailoverState(void) {
void clearFailoverState(int success) {
server.failover_end_time = 0;
server.force_failover = 0;
zfree(server.target_replica_host);
server.target_replica_host = NULL;
server.target_replica_port = 0;
server.failover_state = NO_FAILOVER;
if (success) disconnectOrRedirectBlockedClients();
unpauseActions(PAUSE_DURING_FAILOVER);
}

Expand All @@ -4760,7 +4761,7 @@ void abortFailover(const char *err) {
if (server.failover_state == FAILOVER_IN_PROGRESS) {
replicationUnsetPrimary();
}
clearFailoverState();
clearFailoverState(0);
}

/*
Expand Down Expand Up @@ -4907,7 +4908,7 @@ void updateFailoverStatus(void) {
server.target_replica_port);
server.failover_state = FAILOVER_IN_PROGRESS;
/* If timeout has expired force a failover if requested. */
replicationSetPrimary(server.target_replica_host, server.target_replica_port);
replicationSetPrimary(server.target_replica_host, server.target_replica_port, 0);
return;
} else {
/* Force was not requested, so timeout. */
Expand Down Expand Up @@ -4950,6 +4951,6 @@ void updateFailoverStatus(void) {
serverLog(LL_NOTICE, "Failover target %s:%d is synced, failing over.", server.target_replica_host,
server.target_replica_port);
/* Designated replica is caught up, failover to it. */
replicationSetPrimary(server.target_replica_host, server.target_replica_port);
replicationSetPrimary(server.target_replica_host, server.target_replica_port, 0);
}
}
7 changes: 4 additions & 3 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -3023,7 +3023,7 @@ void replicationStartPendingFork(void);
void replicationHandlePrimaryDisconnection(void);
void replicationCachePrimary(client *c);
void resizeReplicationBacklog(void);
void replicationSetPrimary(char *ip, int port);
void replicationSetPrimary(char *ip, int port, int disconnect_blocked);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, the only case we do not want to disco blocked clients is only in the case of 'server.failover_state = FAILOVER_IN_PROGRESS' right? can we then just check for this condition inside replicationSetPrimary instead of passing on an external condition?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that should be possible, but I am reluctant to make replicationSetPrimary (which isn't specific to failover) behave differently based on some sub-state of the the failover state machine. I am not against it, but I think the intention gets clearer if the caller of replicationSetPrimary states "I will take care of blocked clients myself".

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO passing this flag to the replicationSetPrimary and having it handle clients disconnects is already extending the function scope. I am fine with letting it handle the state logic and encapsulate the decision into a single place.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @gmbnomis that we shouldn't introduce this dependency (server.failover_state = FAILOVER_IN_PROGRESS) to replicationSetPrimary. There's no inherent order between setting the failover state to in-progress and fixing the replication states. Introducing a failover state check adds an unrelated concern to the function, which in turn forces the caller to perform these operations in a specific sequence, an unnecessary constraint IMO and also not resilient to future changes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

O.K. I agree this is also a good reasoning.

void replicationUnsetPrimary(void);
void refreshGoodReplicasCount(void);
int checkGoodReplicasStatus(void);
Expand All @@ -3049,7 +3049,7 @@ void showLatestBacklog(void);
void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
void rdbPipeWriteHandlerConnRemoved(struct connection *conn);
int rdbRegisterAuxField(char *auxfield, rdbAuxFieldEncoder encoder, rdbAuxFieldDecoder decoder);
void clearFailoverState(void);
void clearFailoverState(int success);
void updateFailoverStatus(void);
void abortFailover(const char *err);
const char *getFailoverStateString(void);
Expand Down Expand Up @@ -3637,10 +3637,11 @@ void blockClient(client *c, int btype);
void unblockClient(client *c, int queue_for_reprocessing);
void unblockClientOnTimeout(client *c);
void unblockClientOnError(client *c, const char *err_str);
void unblockClientOnErrorSds(client *c, sds err);
void queueClientForReprocessing(client *c);
void replyToBlockedClientTimedOut(client *c);
int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit);
void disconnectAllBlockedClients(void);
void disconnectOrRedirectBlockedClients(void);
void handleClientsBlockedOnKeys(void);
void signalKeyAsReady(serverDb *db, robj *key, int type);
void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, int unblock_on_nokey);
Expand Down
63 changes: 52 additions & 11 deletions tests/integration/failover.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,16 @@ start_server {overrides {save {}}} {
set initial_psyncs [s 0 sync_partial_ok]
set initial_syncs [s 0 sync_full]

set rd_blocking [valkey_deferring_client -2]

$rd_blocking BRPOP FOO_LIST 0

wait_for_condition 50 100 {
[s -2 blocked_clients] == 1
} else {
fail "rd_blocking client should be blocked"
}

pause_process $node_0_pid
# node 0 will never acknowledge this write
$node_2 set case 2
Expand All @@ -155,6 +165,12 @@ start_server {overrides {save {}}} {
assert_match *slave* [$node_1 role]
assert_match *slave* [$node_2 role]

wait_for_condition 50 100 {
[s -2 blocked_clients] == 1
} else {
fail "rd_blocking client should still be blocked in failover-in-progress"
}

resume_process $node_0_pid

# Wait for failover to end
Expand All @@ -163,6 +179,10 @@ start_server {overrides {save {}}} {
} else {
fail "Failover from node 2 to node 0 did not finish"
}

assert_error "UNBLOCKED force unblock from blocking operation, instance state changed (master -> replica?)" {$rd_blocking read}
$rd_blocking close

$node_1 replicaof $node_0_host $node_0_port

wait_for_sync $node_1
Expand Down Expand Up @@ -253,19 +273,36 @@ start_server {overrides {save {}}} {
# We block psync, so the failover will fail
$node_1 acl setuser default -psync

# We pause the target long enough to send a write command
# during the pause. This write will not be interrupted.
pause_process [srv -1 pid]
set rd [valkey_deferring_client]
# wait for the client creation
# We send a BRPOP command and pause the target long enough
# to send an LPUSH and another BRPOP command during the pause.
# This push and the two pops (which are blocked for different
# reasons, respectively) will not be interrupted by the aborted
# failover.
pause_process $node_1_pid
set rd_lpush [valkey_deferring_client]
set rd_brpop_before [valkey_deferring_client]
set rd_brpop_after [valkey_deferring_client]

$rd_brpop_before BRPOP FOO_LIST 0
wait_for_condition 50 100 {
[s connected_clients] == 2
[s 0 blocked_clients] == 1
} else {
fail "Client creation failed"
fail "rd_brpop_before should be blocked"
}
$rd SET FOO BAR

$node_0 failover to $node_1_host $node_1_port
resume_process [srv -1 pid]
$rd_brpop_after BRPOP FOO_LIST 0
$rd_lpush LPUSH FOO_LIST BAR1 BAR2

wait_for_condition 50 100 {
[s 0 blocked_clients] == 3
} else {
fail "All deferring clients should be blocked"
}

assert_equal "waiting-for-sync" [s 0 master_failover_state]

resume_process $node_1_pid

# Wait for failover to end
wait_for_condition 50 100 {
Expand All @@ -274,8 +311,12 @@ start_server {overrides {save {}}} {
fail "Failover from node_0 to replica did not finish"
}

assert_equal [$rd read] "OK"
$rd close
assert_equal "2" [$rd_lpush read]
assert_equal "FOO_LIST BAR1" [$rd_brpop_before read]
assert_equal "FOO_LIST BAR2" [$rd_brpop_after read]
$rd_lpush close
$rd_brpop_before close
$rd_brpop_after close

# restore access to psync
$node_1 acl setuser default +psync
Expand Down
Loading
Loading