Skip to content

Commit

Permalink
Role change: Redirect suitable clients instead of disconnecting them
Browse files Browse the repository at this point in the history
In case of an instance role switch in standalone mode, clients with
blocking commands get disconnected sending an UNBLOCKED error to the
client.

Now that the REDIRECT error is available as an alternative (#325),
issue a REDIRECT (redirecting the client to the primary) for clients
that are supporting it. There is no need to disconnect the client in
this case. This is in-line with the semantics of the MOVED error in
cluster mode.

Signed-off-by: Simon Baatz <[email protected]>
  • Loading branch information
gmbnomis committed Jul 31, 2024
1 parent 9e35f7c commit 6e4762e
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 8 deletions.
24 changes: 19 additions & 5 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,9 @@ void replyToClientsBlockedOnShutdown(void) {
* is called when a primary turns into a replica.
*
* The semantics is to send an -UNBLOCKED error to the client, disconnecting
* it at the same time. */
void disconnectAllBlockedClients(void) {
* it at the same time. If a client has the redirect capability, we send a
* -REDIRECT error instead to point it to the primary, if possible. */
void disconnectOrRedirectBlockedClients(void) {
listNode *ln;
listIter li;

Expand All @@ -287,9 +288,14 @@ void disconnectAllBlockedClients(void) {
* which the command is already in progress in a way. */
if (c->bstate.btype == BLOCKED_POSTPONE) continue;

unblockClientOnError(c, "-UNBLOCKED force unblock from blocking operation, "
"instance state changed (master -> replica?)");
c->flag.close_after_reply = 1;
if (!server.cluster_enabled && c->capa & CLIENT_CAPA_REDIRECT && server.primary_host) {
unblockClientOnErrorSds(
c, sdscatprintf(sdsempty(), "-REDIRECT %s:%d", server.primary_host, server.primary_port));
} else {
unblockClientOnError(c, "-UNBLOCKED force unblock from blocking operation, "
"instance state changed (master -> replica?)");
c->flag.close_after_reply = 1;
}
}
}
}
Expand Down Expand Up @@ -705,6 +711,14 @@ void unblockClientOnError(client *c, const char *err_str) {
unblockClient(c, 1);
}

/* Unblock a client which is currently blocked with error.
* err will be used to reply to the blocked client. As a side
* effect the SDS string is freed. */
void unblockClientOnErrorSds(client *c, sds err) {
addReplyErrorSds(c, err);
unblockClientOnError(c, NULL);
}

void blockedBeforeSleep(void) {
/* Handle precise timeouts of blocked clients. */
handleBlockedClientsTimeout();
Expand Down
4 changes: 2 additions & 2 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -3731,7 +3731,7 @@ void replicationSetPrimary(char *ip, int port, int disconnect_blocked) {
if (server.primary) {
freeClient(server.primary);
}
if (disconnect_blocked) disconnectAllBlockedClients(); /* Clients blocked in primary, now replica. */
if (disconnect_blocked) disconnectOrRedirectBlockedClients(); /* Clients blocked in primary, now replica. */

/* Setting primary_host only after the call to freeClient since it calls
* replicationHandlePrimaryDisconnection which can trigger a re-connect
Expand Down Expand Up @@ -4739,7 +4739,7 @@ void clearFailoverState(int success) {
server.target_replica_host = NULL;
server.target_replica_port = 0;
server.failover_state = NO_FAILOVER;
if (success) disconnectAllBlockedClients();
if (success) disconnectOrRedirectBlockedClients();
unpauseActions(PAUSE_DURING_FAILOVER);
}

Expand Down
3 changes: 2 additions & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -3634,10 +3634,11 @@ void blockClient(client *c, int btype);
void unblockClient(client *c, int queue_for_reprocessing);
void unblockClientOnTimeout(client *c);
void unblockClientOnError(client *c, const char *err_str);
void unblockClientOnErrorSds(client *c, sds err);
void queueClientForReprocessing(client *c);
void replyToBlockedClientTimedOut(client *c);
int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit);
void disconnectAllBlockedClients(void);
void disconnectOrRedirectBlockedClients(void);
void handleClientsBlockedOnKeys(void);
void signalKeyAsReady(serverDb *db, robj *key, int type);
void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, int unblock_on_nokey);
Expand Down
128 changes: 128 additions & 0 deletions tests/integration/replica-redirect.tcl
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
start_server {tags {needs:repl external:skip}} {
start_server {} {
set primary [srv -1 client]
set primary_host [srv -1 host]
set primary_port [srv -1 port]
set replica [srv 0 client]
set replica_host [srv 0 host]
set replica_port [srv 0 port]

r replicaof $primary_host $primary_port
wait_for_condition 50 100 {
Expand Down Expand Up @@ -32,5 +36,129 @@ start_server {tags {needs:repl external:skip}} {
r readonly
r get foo
} {}

test {responses in waiting-for-sync state} {
$primary replicaof no one
r replicaof $primary_host $primary_port
wait_for_condition 50 100 {
[s 0 master_link_status] eq {up}
} else {
fail "Replica not replicating from primary"
}

$primary client capa redirect

set rd_brpop_before [valkey_deferring_client -1]
$rd_brpop_before client capa redirect
$rd_brpop_before read ; # Consume the OK reply
$rd_brpop_before brpop list 0

wait_for_condition 50 100 {
[s -1 blocked_clients] == 1
} else {
fail "All deferring clients should be blocked"
}

pause_process [srv 0 pid]

$primary set foo bar
$primary failover

# The replica is not synced and sleeps. Primary is waiting for
# sync.
assert_equal "waiting-for-sync" [s -1 master_failover_state]

set rd_brpop_after [valkey_deferring_client -1]
$rd_brpop_after client capa redirect
$rd_brpop_after read ; # Consume the OK reply
$rd_brpop_after brpop list 0

wait_for_condition 50 100 {
[s -1 blocked_clients] == 2
} else {
fail "All deferring clients should be blocked"
}

resume_process [srv 0 pid]

wait_for_condition 50 100 {
[s -1 master_failover_state] == "no-failover"
} else {
fail "primary not in no-failover state"
}

# REDIRECT response shall be sent when the new primary is ready,
# i.e. when unblocking all clients after failover
assert_error "REDIRECT $replica_host:$replica_port" {$rd_brpop_before read}
assert_error "REDIRECT $replica_host:$replica_port" {$rd_brpop_after read}

$rd_brpop_before close
$rd_brpop_after close
}

test {responses in failover-in-progress state} {
$primary replicaof no one
r replicaof $primary_host $primary_port
wait_for_condition 50 100 {
[s 0 master_link_status] eq {up}
} else {
fail "Replicas not replicating from primary"
}

$primary client capa redirect
$primary wait 1 0

set rd_brpop_before [valkey_deferring_client -1]
$rd_brpop_before client capa redirect
$rd_brpop_before read ; # Consume the OK reply
$rd_brpop_before brpop list 0

wait_for_condition 50 100 {
[s -1 blocked_clients] == 1
} else {
fail "All deferring clients should be blocked"
}

pause_process [srv 0 pid]

$primary failover to $replica_host $replica_port timeout 100 force

# The paused replica is fully synced (and even if not, we force
# the failover-in-progress state).
wait_for_condition 50 100 {
[s -1 master_failover_state] == "failover-in-progress"
} else {
fail "primary not in failover-in-progress state"
}

set rd_brpop_after [valkey_deferring_client -1]
$rd_brpop_after client capa redirect
$rd_brpop_after read ; # Consume the OK reply
$rd_brpop_after brpop list 0

#FIXME (#821): rd_brpop_after must still be blocked here!
wait_for_condition 50 100 {
[s -1 blocked_clients] == 1
} else {
fail "All deferring clients should be blocked"
}

resume_process [srv 0 pid]

wait_for_condition 50 100 {
[s -1 master_failover_state] == "no-failover"
} else {
fail "primary not in no-failover state"
}

# REDIRECT response shall be sent when the new primary is ready,
# i.e. when unblocking all clients after failover
assert_error "REDIRECT $replica_host:$replica_port" {$rd_brpop_before read}
assert_error "REDIRECT $replica_host:$replica_port" {$rd_brpop_after read}

$rd_brpop_before close
$rd_brpop_after close
}

}
}

0 comments on commit 6e4762e

Please sign in to comment.