Skip to content

Commit

Permalink
Role change: Redirect suitable clients instead of disconnecting them
Browse files Browse the repository at this point in the history
In case of an instance role switch in standalone mode, clients that are
blocked in blocking commands are sent an UNBLOCKED error and are then
disconnected.

Now that the REDIRECT error is available as an alternative (valkey-io#325),
issue a REDIRECT (redirecting the client to the primary) for clients
that support it. If a client is read-only, it may remain blocked in some
situations, e.g. when it is blocked on a read command. There is no need
to disconnect the client in either case. This is in line with the
semantics of the MOVED error in cluster mode.

Signed-off-by: Simon Baatz <[email protected]>
  • Loading branch information
gmbnomis committed Aug 15, 2024
1 parent 9067828 commit e9f30de
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 15 deletions.
33 changes: 28 additions & 5 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,9 @@ void replyToClientsBlockedOnShutdown(void) {
* is called when a primary turns into a replica.
*
* The semantics is to send an -UNBLOCKED error to the client, disconnecting
* it at the same time. */
void disconnectAllBlockedClients(void) {
* it at the same time. If a client has the redirect capability, we send a
* -REDIRECT error instead to point it to the primary, if possible. */
void disconnectOrRedirectBlockedClients(void) {
listNode *ln;
listIter li;

Expand All @@ -287,9 +288,23 @@ void disconnectAllBlockedClients(void) {
* which the command is already in progress in a way. */
if (c->bstate.btype == BLOCKED_POSTPONE) continue;

unblockClientOnError(c, "-UNBLOCKED force unblock from blocking operation, "
"instance state changed (master -> replica?)");
c->flag.close_after_reply = 1;
if (!server.cluster_enabled && c->capa & CLIENT_CAPA_REDIRECT && server.primary_host &&
(c->bstate.btype == BLOCKED_LIST || c->bstate.btype == BLOCKED_ZSET ||
c->bstate.btype == BLOCKED_STREAM || c->bstate.btype == BLOCKED_MODULE)) {
/* If the client is blocked on module, but not on a specific key,
* don't unblock it. */
if (c->bstate.btype == BLOCKED_MODULE && !moduleClientIsBlockedOnKeys(c)) continue;

/* if the client is read-only and attempting to read, don't unblock it. */
if (c->flag.readonly && !(c->lastcmd->flags & CMD_WRITE)) continue;

unblockClientOnErrorSds(
c, sdscatprintf(sdsempty(), "-REDIRECT %s:%d", server.primary_host, server.primary_port));
} else {
unblockClientOnError(c, "-UNBLOCKED force unblock from blocking operation, "
"instance state changed (master -> replica?)");
c->flag.close_after_reply = 1;
}
}
}
}
Expand Down Expand Up @@ -705,6 +720,14 @@ void unblockClientOnError(client *c, const char *err_str) {
unblockClient(c, 1);
}

/* Unblock a currently blocked client and reply to it with an error.
*
* c   - the blocked client to unblock.
* err - SDS string holding the error reply for the client. As a side
*       effect the SDS string is freed, so the caller must not use or
*       free it after this call.
*
* This is the SDS counterpart of unblockClientOnError(), for callers that
* build the error message dynamically. */
void unblockClientOnErrorSds(client *c, sds err) {
/* Queue the error reply first; this hands the SDS string over. */
addReplyErrorSds(c, err);
/* The reply has already been added above, so no message is passed here.
* NOTE(review): presumably unblockClientOnError() skips adding a reply
* when err_str is NULL -- confirm against its definition. */
unblockClientOnError(c, NULL);
}

void blockedBeforeSleep(void) {
/* Handle precise timeouts of blocked clients. */
handleBlockedClientsTimeout();
Expand Down
4 changes: 2 additions & 2 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -3734,7 +3734,7 @@ void replicationSetPrimary(char *ip, int port, int disconnect_blocked) {
if (server.primary) {
freeClient(server.primary);
}
if (disconnect_blocked) disconnectAllBlockedClients(); /* Clients blocked in primary, now replica. */
if (disconnect_blocked) disconnectOrRedirectBlockedClients(); /* Clients blocked in primary, now replica. */

/* Setting primary_host only after the call to freeClient since it calls
* replicationHandlePrimaryDisconnection which can trigger a re-connect
Expand Down Expand Up @@ -4744,7 +4744,7 @@ void clearFailoverState(int success) {
server.target_replica_host = NULL;
server.target_replica_port = 0;
server.failover_state = NO_FAILOVER;
if (success) disconnectAllBlockedClients();
if (success) disconnectOrRedirectBlockedClients();
unpauseActions(PAUSE_DURING_FAILOVER);
}

Expand Down
3 changes: 2 additions & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -3635,10 +3635,11 @@ void blockClient(client *c, int btype);
void unblockClient(client *c, int queue_for_reprocessing);
void unblockClientOnTimeout(client *c);
void unblockClientOnError(client *c, const char *err_str);
void unblockClientOnErrorSds(client *c, sds err);
void queueClientForReprocessing(client *c);
void replyToBlockedClientTimedOut(client *c);
int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit);
void disconnectAllBlockedClients(void);
void disconnectOrRedirectBlockedClients(void);
void handleClientsBlockedOnKeys(void);
void signalKeyAsReady(serverDb *db, robj *key, int type);
void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, int unblock_on_nokey);
Expand Down
114 changes: 107 additions & 7 deletions tests/integration/replica-redirect.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,23 @@ start_server {tags {needs:repl external:skip}} {
r get foo
} {}

test {client paused during failover-in-progress} {
test {client paused before and during failover-in-progress} {
# Ensure that replica is online and in sync
r -1 set foo baz
r -1 wait 1 0

set rd_blocking [valkey_deferring_client -1]
$rd_blocking client capa redirect
assert_match "OK" [$rd_blocking read]
$rd_blocking brpop list 0

wait_for_blocked_clients_count 1 100 10 -1

pause_process $replica_pid
# replica will never acknowledge this write
r -1 set foo bar

r -1 failover to $replica_host $replica_port TIMEOUT 100 FORCE

# Wait for primary to give up on sync attempt and start failover
# Wait for primary to start failover
wait_for_condition 50 100 {
[s -1 master_failover_state] == "failover-in-progress"
} else {
Expand All @@ -54,10 +64,10 @@ start_server {tags {needs:repl external:skip}} {
set rd [valkey_deferring_client -1]
$rd client capa redirect
assert_match "OK" [$rd read]
$rd set foo bar
$rd get foo

# Client paused during failover-in-progress, see more details in PR #871
wait_for_blocked_clients_count 1 100 10 -1
# Reading and Writing clients paused during failover-in-progress, see more details in PR #871
wait_for_blocked_clients_count 2 100 10 -1

resume_process $replica_pid

Expand All @@ -72,7 +82,97 @@ start_server {tags {needs:repl external:skip}} {
assert_match *slave* [r -1 role]

assert_error "REDIRECT $replica_host:$replica_port" {$rd read}
assert_error "REDIRECT $replica_host:$replica_port" {$rd_blocking read}
$rd close
$rd_blocking close
}

# Checks how blocked clients on server -1 are answered when a failover is
# requested while the replica process is paused, i.e. while server -1
# reports the waiting-for-sync failover state:
#  - BRPOP clients with the redirect capability receive a REDIRECT error,
#  - a client blocked in WAIT receives its regular reply,
#  - a readonly client blocked in XREAD stays blocked and is served later.
test {responses in waiting-for-sync state} {
# Make server -1 a primary again and attach server 0 as a replica.
# NOTE(review): presumably $primary_host:$primary_port is the address of
# server -1 -- confirm against the enclosing start_server block.
r -1 replicaof no one
r replicaof $primary_host $primary_port
wait_for_condition 50 100 {
[s 0 master_link_status] eq {up}
} else {
fail "Replica not replicating from primary"
}

# Ensure that replica is online and in sync
r -1 set foo baz
r -1 wait 1 0

# Redirect-capable client that blocks on BRPOP before the failover is
# requested.
set rd_brpop_before [valkey_deferring_client -1]
$rd_brpop_before client capa redirect
assert_match "OK" [$rd_brpop_before read]
$rd_brpop_before brpop list 0

wait_for_blocked_clients_count 1 100 10 -1

# Pause the process of server 0; writes on server -1 can no longer be
# acknowledged from here on.
pause_process [srv 0 pid]

# Redirect-capable client that writes and then blocks in WAIT.
set rd_wait [valkey_deferring_client -1]
$rd_wait client capa redirect
$rd_wait read ; # Consume the OK reply
$rd_wait set foo bar
# Consume the reply to the SET above.
$rd_wait read
$rd_wait wait 1 0 ; # Blocks as we can't sync

# XREAD is a reading command and thus should
# not be redirected if the client is read-only.
set rd_xread [valkey_deferring_client -1]
$rd_xread client capa redirect
assert_match "OK" [$rd_xread read]
$rd_xread readonly
assert_match "OK" [$rd_xread read]
$rd_xread xread block 0 streams k 0

# Three clients are blocked now: BRPOP, WAIT and XREAD.
wait_for_blocked_clients_count 3 100 10 -1

# Request a failover without any further options.
r -1 failover

# The replica is not synced and sleeps. Primary is waiting for
# sync.
assert_equal "waiting-for-sync" [s -1 master_failover_state]

# A second redirect-capable BRPOP client, this time blocking after the
# failover has been requested.
set rd_brpop_after [valkey_deferring_client -1]
$rd_brpop_after client capa redirect
assert_match "OK" [$rd_brpop_after read]
$rd_brpop_after brpop list 0

# The new BRPOP client is the fourth blocked client.
wait_for_blocked_clients_count 4 100 10 -1

resume_process [srv 0 pid]

# Wait for failover to end
wait_for_condition 50 100 {
[s -1 master_failover_state] == "no-failover"
} else {
fail "primary not in no-failover state"
}

# The roles are swapped now: server 0 is the primary, server -1 the replica.
assert_match *master* [r role]
assert_match *slave* [r -1 role]

# REDIRECT response shall be sent when the new primary is ready,
# i.e. when unblocking all clients after failover
assert_error "REDIRECT $replica_host:$replica_port" {$rd_brpop_before read}
assert_error "REDIRECT $replica_host:$replica_port" {$rd_brpop_after read}

# WAIT is unblocked in the main loop. Thus, it must have succeeded by now.
assert_equal "1" [$rd_wait read]

$rd_brpop_before close
$rd_brpop_after close
$rd_wait close

# As a reading command for a readonly client, XREAD should still be blocked
wait_for_blocked_clients_count 1 100 10 -1

# Adding an entry to the stream on the new primary lets the blocked XREAD
# on server -1 return.
r XADD k * foo bar ; # replica is the primary now
set res [$rd_xread read]
# Drill down into the nested XREAD reply: first stream, its entry list,
# the first entry, and finally that entry's field-value list.
set res [lindex [lindex [lindex [lindex $res 0] 1] 0] 1]
assert_equal "foo bar" $res

$rd_xread close
}
}
}

0 comments on commit e9f30de

Please sign in to comment.