diff --git a/src/blocked.c b/src/blocked.c index 56dc02dec0..268410d139 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -271,8 +271,9 @@ void replyToClientsBlockedOnShutdown(void) { * is called when a primary turns into a replica. * * The semantics is to send an -UNBLOCKED error to the client, disconnecting - * it at the same time. */ -void disconnectAllBlockedClients(void) { + * it at the same time. If a client has the redirect capability, we send a + * -REDIRECT error instead to point it to the primary, if possible. */ +void disconnectOrRedirectBlockedClients(void) { listNode *ln; listIter li; @@ -287,9 +288,14 @@ void disconnectAllBlockedClients(void) { * which the command is already in progress in a way. */ if (c->bstate.btype == BLOCKED_POSTPONE) continue; - unblockClientOnError(c, "-UNBLOCKED force unblock from blocking operation, " - "instance state changed (master -> replica?)"); - c->flag.close_after_reply = 1; + if (!server.cluster_enabled && c->capa & CLIENT_CAPA_REDIRECT && server.primary_host) { + unblockClientOnErrorSds( + c, sdscatprintf(sdsempty(), "-REDIRECT %s:%d", server.primary_host, server.primary_port)); + } else { + unblockClientOnError(c, "-UNBLOCKED force unblock from blocking operation, " + "instance state changed (master -> replica?)"); + c->flag.close_after_reply = 1; + } } } } @@ -705,6 +711,14 @@ void unblockClientOnError(client *c, const char *err_str) { unblockClient(c, 1); } +/* Unblock a client which is currently blocked with error. + * err will be used to reply to the blocked client. As a side + * effect the SDS string is freed. */ +void unblockClientOnErrorSds(client *c, sds err) { + addReplyErrorSds(c, err); + unblockClientOnError(c, NULL); +} + void blockedBeforeSleep(void) { /* Handle precise timeouts of blocked clients. */ handleBlockedClientsTimeout(); diff --git a/src/replication.c b/src/replication.c index 3aa80fda12..3690b1bce1 100644 --- a/src/replication.c +++ b/src/replication.c @@ -3731,7 +3731,7 @@ void replicationSetPrimary(char *ip, int port, int disconnect_blocked) { if (server.primary) { freeClient(server.primary); } - if (disconnect_blocked) disconnectAllBlockedClients(); /* Clients blocked in primary, now replica. */ + if (disconnect_blocked) disconnectOrRedirectBlockedClients(); /* Clients blocked in primary, now replica. */ /* Setting primary_host only after the call to freeClient since it calls * replicationHandlePrimaryDisconnection which can trigger a re-connect @@ -4739,7 +4739,7 @@ void clearFailoverState(int success) { server.target_replica_host = NULL; server.target_replica_port = 0; server.failover_state = NO_FAILOVER; - if (success) disconnectAllBlockedClients(); + if (success) disconnectOrRedirectBlockedClients(); unpauseActions(PAUSE_DURING_FAILOVER); } diff --git a/src/server.h b/src/server.h index 72986f5439..22cbfe5a70 100644 --- a/src/server.h +++ b/src/server.h @@ -3634,10 +3634,11 @@ void blockClient(client *c, int btype); void unblockClient(client *c, int queue_for_reprocessing); void unblockClientOnTimeout(client *c); void unblockClientOnError(client *c, const char *err_str); +void unblockClientOnErrorSds(client *c, sds err); void queueClientForReprocessing(client *c); void replyToBlockedClientTimedOut(client *c); int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit); -void disconnectAllBlockedClients(void); +void disconnectOrRedirectBlockedClients(void); void handleClientsBlockedOnKeys(void); void signalKeyAsReady(serverDb *db, robj *key, int type); void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, int unblock_on_nokey); diff --git a/tests/integration/replica-redirect.tcl b/tests/integration/replica-redirect.tcl index 0db51dd3ff..7b94610300 100644 --- a/tests/integration/replica-redirect.tcl +++ b/tests/integration/replica-redirect.tcl @@ -1,7 +1,11 @@ start_server {tags {needs:repl external:skip}} { start_server {} { + set primary [srv -1 client] set primary_host [srv -1 host] set primary_port [srv -1 port] + set replica [srv 0 client] + set replica_host [srv 0 host] + set replica_port [srv 0 port] r replicaof $primary_host $primary_port wait_for_condition 50 100 { @@ -32,5 +36,129 @@ start_server {tags {needs:repl external:skip}} { r readonly r get foo } {} + + test {responses in waiting-for-sync state} { + $primary replicaof no one + r replicaof $primary_host $primary_port + wait_for_condition 50 100 { + [s 0 master_link_status] eq {up} + } else { + fail "Replica not replicating from primary" + } + + $primary client capa redirect + + set rd_brpop_before [valkey_deferring_client -1] + $rd_brpop_before client capa redirect + $rd_brpop_before read ; # Consume the OK reply + $rd_brpop_before brpop list 0 + + wait_for_condition 50 100 { + [s -1 blocked_clients] == 1 + } else { + fail "All deferring clients should be blocked" + } + + pause_process [srv 0 pid] + + $primary set foo bar + $primary failover + + # The replica is not synced and sleeps. Primary is waiting for + # sync. + assert_equal "waiting-for-sync" [s -1 master_failover_state] + + set rd_brpop_after [valkey_deferring_client -1] + $rd_brpop_after client capa redirect + $rd_brpop_after read ; # Consume the OK reply + $rd_brpop_after brpop list 0 + + wait_for_condition 50 100 { + [s -1 blocked_clients] == 2 + } else { + fail "All deferring clients should be blocked" + } + + resume_process [srv 0 pid] + + wait_for_condition 50 100 { + [s -1 master_failover_state] == "no-failover" + } else { + fail "primary not in no-failover state" + } + + # REDIRECT response shall be sent when the new primary is ready, + # i.e. when unblocking all clients after failover + assert_error "REDIRECT $replica_host:$replica_port" {$rd_brpop_before read} + assert_error "REDIRECT $replica_host:$replica_port" {$rd_brpop_after read} + + $rd_brpop_before close + $rd_brpop_after close + } + + test {responses in failover-in-progress state} { + $primary replicaof no one + r replicaof $primary_host $primary_port + wait_for_condition 50 100 { + [s 0 master_link_status] eq {up} + } else { + fail "Replicas not replicating from primary" + } + + $primary client capa redirect + $primary wait 1 0 + + set rd_brpop_before [valkey_deferring_client -1] + $rd_brpop_before client capa redirect + $rd_brpop_before read ; # Consume the OK reply + $rd_brpop_before brpop list 0 + + wait_for_condition 50 100 { + [s -1 blocked_clients] == 1 + } else { + fail "All deferring clients should be blocked" + } + + pause_process [srv 0 pid] + + $primary failover to $replica_host $replica_port timeout 100 force + + # The paused replica is fully synced (and even if not, we force + # the failover-in-progress state). + wait_for_condition 50 100 { + [s -1 master_failover_state] == "failover-in-progress" + } else { + fail "primary not in failover-in-progress state" + } + + set rd_brpop_after [valkey_deferring_client -1] + $rd_brpop_after client capa redirect + $rd_brpop_after read ; # Consume the OK reply + $rd_brpop_after brpop list 0 + + #FIXME (#821): rd_brpop_after must still be blocked here! + wait_for_condition 50 100 { + [s -1 blocked_clients] == 1 + } else { + fail "All deferring clients should be blocked" + } + + resume_process [srv 0 pid] + + wait_for_condition 50 100 { + [s -1 master_failover_state] == "no-failover" + } else { + fail "primary not in no-failover state" + } + + # REDIRECT response shall be sent when the new primary is ready, + # i.e. when unblocking all clients after failover + assert_error "REDIRECT $replica_host:$replica_port" {$rd_brpop_before read} + assert_error "REDIRECT $replica_host:$replica_port" {$rd_brpop_after read} + + $rd_brpop_before close + $rd_brpop_after close + } + } }