Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Standalone FAILOVER: Fix disconnect of blocked clients in standalone failover and support REDIRECT response #848

Open
wants to merge 2 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 28 additions & 5 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,9 @@ void replyToClientsBlockedOnShutdown(void) {
* is called when a primary turns into a replica.
*
* The semantics is to send an -UNBLOCKED error to the client, disconnecting
* it at the same time. */
void disconnectAllBlockedClients(void) {
* it at the same time. If a client has the redirect capability, we send a
* -REDIRECT error instead to point it to the primary, if possible. */
void disconnectOrRedirectBlockedClients(void) {
listNode *ln;
listIter li;

Expand All @@ -290,9 +291,23 @@ void disconnectAllBlockedClients(void) {
* which the command is already in progress in a way. */
if (c->bstate.btype == BLOCKED_POSTPONE) continue;

unblockClientOnError(c, "-UNBLOCKED force unblock from blocking operation, "
"instance state changed (master -> replica?)");
c->flag.close_after_reply = 1;
if (!server.cluster_enabled && c->capa & CLIENT_CAPA_REDIRECT && server.primary_host &&
(c->bstate.btype == BLOCKED_LIST || c->bstate.btype == BLOCKED_ZSET ||
c->bstate.btype == BLOCKED_STREAM || c->bstate.btype == BLOCKED_MODULE)) {
/* If the client is blocked on module, but not on a specific key,
* don't unblock it. */
if (c->bstate.btype == BLOCKED_MODULE && !moduleClientIsBlockedOnKeys(c)) continue;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand the general logic about key ownership and write commands. however module logic can vary greatly and I would hate placing assumptions on modules work. Maybe we can let modules opt it by registering API to query if the module wishes to be kept alive after role change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I realized that I should have added that I did not "invent" this code, it is following the redirect logic in Valkey cluster:

int clusterRedirectBlockedClientIfNeeded(client *c) {

This particular condition was introduced by redis/redis#9483. The discussion there centered on slot migration in cluster, but the MOVED will also be sent/not sent following the usual readwrite/readonly logic on a replica.

So, it looks applicable to the redirect case here, but, TBH, I don't know the module API very well. @soloestoy, as you were part of the discussion on redis/redis#9483, do you think we should do this here as well? (alternatively we could just continue to disconnect clients which are blocked on a module or we may need to come up with something more complex, like @ranshid suggested.)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gmbnomis actually, I agree. I forgot about the redirect logic taking the same decision, so I guess the modules are already capable of handling it (even though I think modules should have the ability to impact the decision). I would like to ask, though, why not just reuse the function (clusterRedirectBlockedClientIfNeeded) ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at both functions, the commonalities are quite limited because the redirect logic in cluster is much more complex and not applicable to the standalone case. We probably could extract the check whether we need to look at a client in more detail, i.e. something like:

int isRedirectCandidate(client *c) {
    if (c->flag.blocked && (c->bstate.btype == BLOCKED_LIST || c->bstate.btype == BLOCKED_ZSET ||
                            c->bstate.btype == BLOCKED_STREAM || c->bstate.btype == BLOCKED_MODULE)) {
        /* If the client is blocked on module, but not on a specific key,
         * don't unblock it. */
        if (c->bstate.btype != BLOCKED_MODULE || moduleClientIsBlockedOnKeys(c)) return 1;
    }
    return 0;
}

That's not much. Do you think this is worthwhile?

Copy link
Member

@PingXie PingXie Aug 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is worthwhile to create a small function or two, less about code reusing but more for abstraction. For instance, I see the following concepts worth capturing

  1. standalone redirection readiness
!server.cluster_enabled && server.primary_host && c->capa & CLIENT_CAPA_REDIRECT 

This is also used at

if (!server.cluster_enabled && c->capa & CLIENT_CAPA_REDIRECT && server.primary_host && !mustObeyClient(c) &&

  1. client conditions
c->flag.blocked && (c->bstate.btype == BLOCKED_LIST || c->bstate.btype == BLOCKED_ZSET ||
                    c->bstate.btype == BLOCKED_STREAM || c->bstate.btype == BLOCKED_MODULE)

Then L294-L296 could be rewritten as (in pseudo code)

if (redirection_is_enabled() && client_is_in_right_state(c)) {
...
}

Lastly, I am not sure about the module and read-only client cases. The changes look like leaving them in a perpetually blocked state; while for all other cases, we either redirect (new behavior) or drop the connection (old behavior).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is worthwhile to create a small function or two, less about code reusing but more for abstraction.

Yes, that makes sense. I will give it a try once we sort out what the logic should be.

Lastly, I am not sure about the module and read-only client cases. The changes look like leaving them in a perpetually blocked state; while for all other cases, we either redirect (new behavior) or drop the connection (old behavior).

I realized only today that CLUSTER FAILOVER does not result in MOVED errors for blocked clients satisfying these client conditions (like the documentation of the command and the comments for clusterRedirectBlockedClientIfNeeded may suggest).

Instead, we end up in exactly the function we are discussing about. That is to say, we return UNBLOCK and disconnect.

So, Valkey cluster seems to handle unblocking due to slot migration and unblocking due to primary/replica change differently. (I don't know whether this is by intention or not).

Thus, what I deemed to be "well-known territory" apparently isn't. Furthermore, replicationSetPrimary is not only called during failover but also by an arbitrary REPLICAOF command.

So, I think you are right and we should either redirect or disconnect. Shall I convert the current "do nothing" cases (i.e. continue) into disconnects? (because we are not sure whether redirecting the client is actually beneficial in these cases)

(Alternatively, we could come to the conclusion that we don't need to be "better than CLUSTER FAILOVER" and decide against doing this redirect optimization at all.)


/* if the client is read-only and attempting to read, don't unblock it. */
if (c->flag.readonly && !(c->lastcmd->flags & CMD_WRITE)) continue;

unblockClientOnErrorSds(
c, sdscatprintf(sdsempty(), "-REDIRECT %s:%d", server.primary_host, server.primary_port));
} else {
unblockClientOnError(c, "-UNBLOCKED force unblock from blocking operation, "
"instance state changed (master -> replica?)");
c->flag.close_after_reply = 1;
}
}
}
}
Expand Down Expand Up @@ -713,6 +728,14 @@ void unblockClientOnError(client *c, const char *err_str) {
unblockClient(c, 1);
}

/* Unblock a client which is currently blocked with error.
* err will be used to reply to the blocked client. As a side
* effect the SDS string is freed. */
void unblockClientOnErrorSds(client *c, sds err) {
addReplyErrorSds(c, err);
unblockClientOnError(c, NULL);
}

void blockedBeforeSleep(void) {
/* Handle precise timeouts of blocked clients. */
handleBlockedClientsTimeout();
Expand Down
4 changes: 2 additions & 2 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -3734,7 +3734,7 @@ void replicationSetPrimary(char *ip, int port, int disconnect_blocked) {
if (server.primary) {
freeClient(server.primary);
}
if (disconnect_blocked) disconnectAllBlockedClients(); /* Clients blocked in primary, now replica. */
if (disconnect_blocked) disconnectOrRedirectBlockedClients(); /* Clients blocked in primary, now replica. */

/* Setting primary_host only after the call to freeClient since it calls
* replicationHandlePrimaryDisconnection which can trigger a re-connect
Expand Down Expand Up @@ -4744,7 +4744,7 @@ void clearFailoverState(int success) {
server.target_replica_host = NULL;
server.target_replica_port = 0;
server.failover_state = NO_FAILOVER;
if (success) disconnectAllBlockedClients();
if (success) disconnectOrRedirectBlockedClients();
unpauseActions(PAUSE_DURING_FAILOVER);
}

Expand Down
3 changes: 2 additions & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -3637,10 +3637,11 @@ void blockClient(client *c, int btype);
void unblockClient(client *c, int queue_for_reprocessing);
void unblockClientOnTimeout(client *c);
void unblockClientOnError(client *c, const char *err_str);
void unblockClientOnErrorSds(client *c, sds err);
void queueClientForReprocessing(client *c);
void replyToBlockedClientTimedOut(client *c);
int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit);
void disconnectAllBlockedClients(void);
void disconnectOrRedirectBlockedClients(void);
void handleClientsBlockedOnKeys(void);
void signalKeyAsReady(serverDb *db, robj *key, int type);
void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, int unblock_on_nokey);
Expand Down
102 changes: 95 additions & 7 deletions tests/integration/replica-redirect.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,19 @@ start_server {tags {needs:repl external:skip}} {
r get foo
} {}

test {client paused during failover-in-progress} {
test {client paused before and during failover-in-progress} {
set rd_blocking [valkey_deferring_client -1]
$rd_blocking client capa redirect
assert_match "OK" [$rd_blocking read]
$rd_blocking brpop list 0

wait_for_blocked_clients_count 1 100 10 -1

pause_process $replica_pid
# replica will never acknowledge this write
r -1 set foo bar

r -1 failover to $replica_host $replica_port TIMEOUT 100 FORCE

# Wait for primary to give up on sync attempt and start failover
# Wait for primary to start failover
wait_for_condition 50 100 {
[s -1 master_failover_state] == "failover-in-progress"
} else {
Expand All @@ -51,10 +57,10 @@ start_server {tags {needs:repl external:skip}} {
set rd [valkey_deferring_client -1]
$rd client capa redirect
assert_match "OK" [$rd read]
$rd set foo bar
$rd get foo

# Client paused during failover-in-progress, see more details in PR #871
wait_for_blocked_clients_count 1 100 10 -1
# Reading and Writing clients paused during failover-in-progress, see more details in PR #871
wait_for_blocked_clients_count 2 100 10 -1

resume_process $replica_pid

Expand All @@ -69,7 +75,89 @@ start_server {tags {needs:repl external:skip}} {
assert_match *slave* [r -1 role]

assert_error "REDIRECT $replica_host:$replica_port" {$rd read}
assert_error "REDIRECT $replica_host:$replica_port" {$rd_blocking read}
$rd close
$rd_blocking close
}

test {responses in waiting-for-sync state} {
r -1 replicaof no one
r replicaof $primary_host $primary_port
wait_replica_online $primary

set rd_brpop_before [valkey_deferring_client -1]
$rd_brpop_before client capa redirect
assert_match "OK" [$rd_brpop_before read]
$rd_brpop_before brpop list 0

wait_for_blocked_clients_count 1 100 10 -1

pause_process [srv 0 pid]

set rd_wait [valkey_deferring_client -1]
$rd_wait client capa redirect
$rd_wait read ; # Consume the OK reply
$rd_wait set foo bar
$rd_wait read
$rd_wait wait 1 0 ; # Blocks as we can't sync

# XREAD is a reading command and thus, should
# not be redirected if the client is read only.
set rd_xread [valkey_deferring_client -1]
$rd_xread client capa redirect
assert_match "OK" [$rd_xread read]
$rd_xread readonly
assert_match "OK" [$rd_xread read]
$rd_xread xread block 0 streams k 0

wait_for_blocked_clients_count 3 100 10 -1

r -1 failover

# The replica is not synced and sleeps. Primary is waiting for
# sync.
assert_equal "waiting-for-sync" [s -1 master_failover_state]

set rd_brpop_after [valkey_deferring_client -1]
$rd_brpop_after client capa redirect
assert_match "OK" [$rd_brpop_after read]
$rd_brpop_after brpop list 0

wait_for_blocked_clients_count 4 100 10 -1

resume_process [srv 0 pid]

# Wait for failover to end
wait_for_condition 50 100 {
[s -1 master_failover_state] == "no-failover"
} else {
fail "primary not in no-failover state"
}

assert_match *master* [r role]
assert_match *slave* [r -1 role]

# REDIRECT response shall be sent when the new primary is ready,
# i.e. when unblocking all clients after failover
assert_error "REDIRECT $replica_host:$replica_port" {$rd_brpop_before read}
assert_error "REDIRECT $replica_host:$replica_port" {$rd_brpop_after read}

# WAIT is unblocked in the main loop. Thus, it must have succeeded by now.
assert_equal "1" [$rd_wait read]

$rd_brpop_before close
$rd_brpop_after close
$rd_wait close

# As a reading command for a readonly client, XREAD should still be blocked
wait_for_blocked_clients_count 1 100 10 -1

r XADD k * foo bar ; # replica is the primary now
set res [$rd_xread read]
set res [lindex [lindex [lindex [lindex $res 0] 1] 0] 1]
assert_equal "foo bar" $res

$rd_xread close
}
}
}
Loading