From 6e4762ebdee28bfd260f80271dc76a98a0044c2b Mon Sep 17 00:00:00 2001 From: Simon Baatz Date: Tue, 30 Jul 2024 23:45:55 +0200 Subject: [PATCH] Role change: Redirect suitable clients instead of disconnecting them In case of an instance role switch in standalone mode, clients with blocking commands get disconnected sending an UNBLOCKED error to the client. Now that the REDIRECT error is available as an alternative (#325), issue a REDIRECT (redirecting the client to the primary) for clients that are supporting it. There is no need to disconnect the client in this case. This is in-line with the semantics of the MOVED error in cluster mode. Signed-off-by: Simon Baatz --- src/blocked.c | 24 ++++- src/replication.c | 4 +- src/server.h | 3 +- tests/integration/replica-redirect.tcl | 128 +++++++++++++++++++++++++ 4 files changed, 151 insertions(+), 8 deletions(-) diff --git a/src/blocked.c b/src/blocked.c index 56dc02dec0..268410d139 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -271,8 +271,9 @@ void replyToClientsBlockedOnShutdown(void) { * is called when a primary turns into a replica. * * The semantics is to send an -UNBLOCKED error to the client, disconnecting - * it at the same time. */ -void disconnectAllBlockedClients(void) { + * it at the same time. If a client has the redirect capability, we send a + * -REDIRECT error instead to point it to the primary, if possible. */ +void disconnectOrRedirectBlockedClients(void) { listNode *ln; listIter li; @@ -287,9 +288,14 @@ void disconnectAllBlockedClients(void) { * which the command is already in progress in a way. */ if (c->bstate.btype == BLOCKED_POSTPONE) continue; - unblockClientOnError(c, "-UNBLOCKED force unblock from blocking operation, " - "instance state changed (master -> replica?)"); - c->flag.close_after_reply = 1; + if (!server.cluster_enabled && c->capa & CLIENT_CAPA_REDIRECT && server.primary_host) { + unblockClientOnErrorSds( + c, sdscatprintf(sdsempty(), "-REDIRECT %s:%d", server.primary_host, server.primary_port)); + } else { + unblockClientOnError(c, "-UNBLOCKED force unblock from blocking operation, " + "instance state changed (master -> replica?)"); + c->flag.close_after_reply = 1; + } } } } @@ -705,6 +711,14 @@ void unblockClientOnError(client *c, const char *err_str) { unblockClient(c, 1); } +/* Unblock a client which is currently blocked with error. + * err will be used to reply to the blocked client. As a side + * effect the SDS string is freed. */ +void unblockClientOnErrorSds(client *c, sds err) { + addReplyErrorSds(c, err); + unblockClientOnError(c, NULL); +} + void blockedBeforeSleep(void) { /* Handle precise timeouts of blocked clients. */ handleBlockedClientsTimeout(); diff --git a/src/replication.c b/src/replication.c index 3aa80fda12..3690b1bce1 100644 --- a/src/replication.c +++ b/src/replication.c @@ -3731,7 +3731,7 @@ void replicationSetPrimary(char *ip, int port, int disconnect_blocked) { if (server.primary) { freeClient(server.primary); } - if (disconnect_blocked) disconnectAllBlockedClients(); /* Clients blocked in primary, now replica. */ + if (disconnect_blocked) disconnectOrRedirectBlockedClients(); /* Clients blocked in primary, now replica. */ /* Setting primary_host only after the call to freeClient since it calls * replicationHandlePrimaryDisconnection which can trigger a re-connect @@ -4739,7 +4739,7 @@ void clearFailoverState(int success) { server.target_replica_host = NULL; server.target_replica_port = 0; server.failover_state = NO_FAILOVER; - if (success) disconnectAllBlockedClients(); + if (success) disconnectOrRedirectBlockedClients(); unpauseActions(PAUSE_DURING_FAILOVER); } diff --git a/src/server.h b/src/server.h index 72986f5439..22cbfe5a70 100644 --- a/src/server.h +++ b/src/server.h @@ -3634,10 +3634,11 @@ void blockClient(client *c, int btype); void unblockClient(client *c, int queue_for_reprocessing); void unblockClientOnTimeout(client *c); void unblockClientOnError(client *c, const char *err_str); +void unblockClientOnErrorSds(client *c, sds err); void queueClientForReprocessing(client *c); void replyToBlockedClientTimedOut(client *c); int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit); -void disconnectAllBlockedClients(void); +void disconnectOrRedirectBlockedClients(void); void handleClientsBlockedOnKeys(void); void signalKeyAsReady(serverDb *db, robj *key, int type); void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, int unblock_on_nokey); diff --git a/tests/integration/replica-redirect.tcl b/tests/integration/replica-redirect.tcl index 0db51dd3ff..7b94610300 100644 --- a/tests/integration/replica-redirect.tcl +++ b/tests/integration/replica-redirect.tcl @@ -1,7 +1,11 @@ start_server {tags {needs:repl external:skip}} { start_server {} { + set primary [srv -1 client] set primary_host [srv -1 host] set primary_port [srv -1 port] + set replica [srv 0 client] + set replica_host [srv 0 host] + set replica_port [srv 0 port] r replicaof $primary_host $primary_port wait_for_condition 50 100 { @@ -32,5 +36,129 @@ start_server {tags {needs:repl external:skip}} { r readonly r get foo } {} + + test {responses in waiting-for-sync state} { + $primary replicaof no one + r replicaof $primary_host $primary_port + wait_for_condition 50 100 { + [s 0 master_link_status] eq {up} + } else { + fail "Replica not replicating from primary" + } + + $primary client capa redirect + + set rd_brpop_before [valkey_deferring_client -1] + $rd_brpop_before client capa redirect + $rd_brpop_before read ; # Consume the OK reply + $rd_brpop_before brpop list 0 + + wait_for_condition 50 100 { + [s -1 blocked_clients] == 1 + } else { + fail "All deferring clients should be blocked" + } + + pause_process [srv 0 pid] + + $primary set foo bar + $primary failover + + # The replica is not synced and sleeps. Primary is waiting for + # sync. + assert_equal "waiting-for-sync" [s -1 master_failover_state] + + set rd_brpop_after [valkey_deferring_client -1] + $rd_brpop_after client capa redirect + $rd_brpop_after read ; # Consume the OK reply + $rd_brpop_after brpop list 0 + + wait_for_condition 50 100 { + [s -1 blocked_clients] == 2 + } else { + fail "All deferring clients should be blocked" + } + + resume_process [srv 0 pid] + + wait_for_condition 50 100 { + [s -1 master_failover_state] == "no-failover" + } else { + fail "primary not in no-failover state" + } + + # REDIRECT response shall be sent when the new primary is ready, + # i.e. when unblocking all clients after failover + assert_error "REDIRECT $replica_host:$replica_port" {$rd_brpop_before read} + assert_error "REDIRECT $replica_host:$replica_port" {$rd_brpop_after read} + + $rd_brpop_before close + $rd_brpop_after close + } + + test {responses in failover-in-progress state} { + $primary replicaof no one + r replicaof $primary_host $primary_port + wait_for_condition 50 100 { + [s 0 master_link_status] eq {up} + } else { + fail "Replicas not replicating from primary" + } + + $primary client capa redirect + $primary wait 1 0 + + set rd_brpop_before [valkey_deferring_client -1] + $rd_brpop_before client capa redirect + $rd_brpop_before read ; # Consume the OK reply + $rd_brpop_before brpop list 0 + + wait_for_condition 50 100 { + [s -1 blocked_clients] == 1 + } else { + fail "All deferring clients should be blocked" + } + + pause_process [srv 0 pid] + + $primary failover to $replica_host $replica_port timeout 100 force + + # The paused replica is fully synced (and even if not, we force + # the failover-in-progress state). + wait_for_condition 50 100 { + [s -1 master_failover_state] == "failover-in-progress" + } else { + fail "primary not in failover-in-progress state" + } + + set rd_brpop_after [valkey_deferring_client -1] + $rd_brpop_after client capa redirect + $rd_brpop_after read ; # Consume the OK reply + $rd_brpop_after brpop list 0 + + #FIXME (#821): rd_brpop_after must still be blocked here! + wait_for_condition 50 100 { + [s -1 blocked_clients] == 1 + } else { + fail "All deferring clients should be blocked" + } + + resume_process [srv 0 pid] + + wait_for_condition 50 100 { + [s -1 master_failover_state] == "no-failover" + } else { + fail "primary not in no-failover state" + } + + # REDIRECT response shall be sent when the new primary is ready, + # i.e. when unblocking all clients after failover + assert_error "REDIRECT $replica_host:$replica_port" {$rd_brpop_before read} + assert_error "REDIRECT $replica_host:$replica_port" {$rd_brpop_after read} + + $rd_brpop_before close + $rd_brpop_after close + } + } }