diff --git a/src/blocked.c b/src/blocked.c index 5f4c5d4494..a50b80e79a 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -271,8 +271,9 @@ void replyToClientsBlockedOnShutdown(void) { * is called when a primary turns into a replica. * * The semantics is to send an -UNBLOCKED error to the client, disconnecting - * it at the same time. */ -void disconnectAllBlockedClients(void) { + * it at the same time. If a client has the redirect capability, we send a + * -REDIRECT error instead to point it to the primary, if possible. */ +void disconnectOrRedirectBlockedClients(void) { listNode *ln; listIter li; @@ -287,9 +288,23 @@ void disconnectAllBlockedClients(void) { * which the command is already in progress in a way. */ if (c->bstate.btype == BLOCKED_POSTPONE) continue; - unblockClientOnError(c, "-UNBLOCKED force unblock from blocking operation, " - "instance state changed (master -> replica?)"); - c->flag.close_after_reply = 1; + if (!server.cluster_enabled && c->capa & CLIENT_CAPA_REDIRECT && server.primary_host && + (c->bstate.btype == BLOCKED_LIST || c->bstate.btype == BLOCKED_ZSET || + c->bstate.btype == BLOCKED_STREAM || c->bstate.btype == BLOCKED_MODULE)) { + /* If the client is blocked on module, but not on a specific key, + * don't unblock it. */ + if (c->bstate.btype == BLOCKED_MODULE && !moduleClientIsBlockedOnKeys(c)) continue; + + /* if the client is read-only and attempting to read, don't unblock it. */ + if (c->flag.readonly && !(c->lastcmd->flags & CMD_WRITE)) continue; + + unblockClientOnErrorSds( + c, sdscatprintf(sdsempty(), "-REDIRECT %s:%d", server.primary_host, server.primary_port)); + } else { + unblockClientOnError(c, "-UNBLOCKED force unblock from blocking operation, " + "instance state changed (master -> replica?)"); + c->flag.close_after_reply = 1; + } } } } @@ -705,6 +720,14 @@ void unblockClientOnError(client *c, const char *err_str) { unblockClient(c, 1); } +/* Unblock a client which is currently blocked with error. + * err will be used to reply to the blocked client. As a side + * effect the SDS string is freed. */ +void unblockClientOnErrorSds(client *c, sds err) { + addReplyErrorSds(c, err); + unblockClientOnError(c, NULL); +} + void blockedBeforeSleep(void) { /* Handle precise timeouts of blocked clients. */ handleBlockedClientsTimeout(); diff --git a/src/replication.c b/src/replication.c index 9a2f192771..59db7c1a27 100644 --- a/src/replication.c +++ b/src/replication.c @@ -3734,7 +3734,7 @@ void replicationSetPrimary(char *ip, int port, int disconnect_blocked) { if (server.primary) { freeClient(server.primary); } - if (disconnect_blocked) disconnectAllBlockedClients(); /* Clients blocked in primary, now replica. */ + if (disconnect_blocked) disconnectOrRedirectBlockedClients(); /* Clients blocked in primary, now replica. */ /* Setting primary_host only after the call to freeClient since it calls * replicationHandlePrimaryDisconnection which can trigger a re-connect @@ -4744,7 +4744,7 @@ void clearFailoverState(int success) { server.target_replica_host = NULL; server.target_replica_port = 0; server.failover_state = NO_FAILOVER; - if (success) disconnectAllBlockedClients(); + if (success) disconnectOrRedirectBlockedClients(); unpauseActions(PAUSE_DURING_FAILOVER); } diff --git a/src/server.h b/src/server.h index 3e67436e61..03da403c6d 100644 --- a/src/server.h +++ b/src/server.h @@ -3635,10 +3635,11 @@ void blockClient(client *c, int btype); void unblockClient(client *c, int queue_for_reprocessing); void unblockClientOnTimeout(client *c); void unblockClientOnError(client *c, const char *err_str); +void unblockClientOnErrorSds(client *c, sds err); void queueClientForReprocessing(client *c); void replyToBlockedClientTimedOut(client *c); int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit); -void disconnectAllBlockedClients(void); +void disconnectOrRedirectBlockedClients(void); void handleClientsBlockedOnKeys(void); void signalKeyAsReady(serverDb *db, robj *key, int type); void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, int unblock_on_nokey); diff --git a/tests/integration/replica-redirect.tcl b/tests/integration/replica-redirect.tcl index 050cf0368c..c6d9de7dda 100644 --- a/tests/integration/replica-redirect.tcl +++ b/tests/integration/replica-redirect.tcl @@ -38,13 +38,23 @@ start_server {tags {needs:repl external:skip}} { r get foo } {} - test {client paused during failover-in-progress} { + test {client paused before and during failover-in-progress} { + # Ensure that replica is online and in sync + r -1 set foo baz + r -1 wait 1 0 + + set rd_blocking [valkey_deferring_client -1] + $rd_blocking client capa redirect + assert_match "OK" [$rd_blocking read] + $rd_blocking brpop list 0 + + wait_for_blocked_clients_count 1 100 10 -1 + pause_process $replica_pid - # replica will never acknowledge this write - r -1 set foo bar + r -1 failover to $replica_host $replica_port TIMEOUT 100 FORCE - # Wait for primary to give up on sync attempt and start failover + # Wait for primary to start failover wait_for_condition 50 100 { [s -1 master_failover_state] == "failover-in-progress" } else { @@ -54,10 +64,10 @@ start_server {tags {needs:repl external:skip}} { set rd [valkey_deferring_client -1] $rd client capa redirect assert_match "OK" [$rd read] - $rd set foo bar + $rd get foo - # Client paused during failover-in-progress, see more details in PR #871 - wait_for_blocked_clients_count 1 100 10 -1 + # Reading and Writing clients paused during failover-in-progress, see more details in PR #871 + wait_for_blocked_clients_count 2 100 10 -1 resume_process $replica_pid @@ -72,7 +82,97 @@ start_server {tags {needs:repl external:skip}} { assert_match *slave* [r -1 role] assert_error "REDIRECT $replica_host:$replica_port" {$rd read} + assert_error "REDIRECT $replica_host:$replica_port" {$rd_blocking read} $rd close + $rd_blocking close + } + + test {responses in waiting-for-sync state} { + r -1 replicaof no one + r replicaof $primary_host $primary_port + wait_for_condition 50 100 { + [s 0 master_link_status] eq {up} + } else { + fail "Replica not replicating from primary" + } + + # Ensure that replica is online and in sync + r -1 set foo baz + r -1 wait 1 0 + + set rd_brpop_before [valkey_deferring_client -1] + $rd_brpop_before client capa redirect + assert_match "OK" [$rd_brpop_before read] + $rd_brpop_before brpop list 0 + + wait_for_blocked_clients_count 1 100 10 -1 + + pause_process [srv 0 pid] + + set rd_wait [valkey_deferring_client -1] + $rd_wait client capa redirect + $rd_wait read ; # Consume the OK reply + $rd_wait set foo bar + $rd_wait read + $rd_wait wait 1 0 ; # Blocks as we can't sync + + # XREAD is a reading command and thus, should + # not be redirected if the client is read only. + set rd_xread [valkey_deferring_client -1] + $rd_xread client capa redirect + assert_match "OK" [$rd_xread read] + $rd_xread readonly + assert_match "OK" [$rd_xread read] + $rd_xread xread block 0 streams k 0 + + wait_for_blocked_clients_count 3 100 10 -1 + + r -1 failover + + # The replica is not synced and sleeps. Primary is waiting for + # sync. + assert_equal "waiting-for-sync" [s -1 master_failover_state] + + set rd_brpop_after [valkey_deferring_client -1] + $rd_brpop_after client capa redirect + assert_match "OK" [$rd_brpop_after read] + $rd_brpop_after brpop list 0 + + wait_for_blocked_clients_count 4 100 10 -1 + + resume_process [srv 0 pid] + + # Wait for failover to end + wait_for_condition 50 100 { + [s -1 master_failover_state] == "no-failover" + } else { + fail "primary not in no-failover state" + } + + assert_match *master* [r role] + assert_match *slave* [r -1 role] + + # REDIRECT response shall be sent when the new primary is ready, + # i.e. when unblocking all clients after failover + assert_error "REDIRECT $replica_host:$replica_port" {$rd_brpop_before read} + assert_error "REDIRECT $replica_host:$replica_port" {$rd_brpop_after read} + + # WAIT is unblocked in the main loop. Thus, it must have succeeded by now. + assert_equal "1" [$rd_wait read] + + $rd_brpop_before close + $rd_brpop_after close + $rd_wait close + + # As a reading command for a readonly client, XREAD should still be blocked + wait_for_blocked_clients_count 1 100 10 -1 + + r XADD k * foo bar ; # replica is the primary now + set res [$rd_xread read] + set res [lindex [lindex [lindex [lindex $res 0] 1] 0] 1] + assert_equal "foo bar" $res + + $rd_xread close } } }