diff --git a/src/replication.c b/src/replication.c index 21ccb0e92d..7878c54362 100644 --- a/src/replication.c +++ b/src/replication.c @@ -3400,6 +3400,18 @@ void replicationRequestAckFromReplicas(void) { server.get_ack_from_replicas = 1; } +/* This function return client woff. If the script is currently running, + * returns the actual client woff */ +long long getClientWriteOffset(client *c) { + if (scriptIsRunning()) { + /* If a script is currently running, the client passed in is a fake + * client, and its woff is always 0. */ + serverAssert(scriptGetClient() == c); + c = scriptGetCaller(); + } + return c->woff; +} + /* Return the number of replicas that already acknowledged the specified * replication offset. */ int replicationCountAcksByOffset(long long offset) { @@ -3439,7 +3451,7 @@ int replicationCountAOFAcksByOffset(long long offset) { void waitCommand(client *c) { mstime_t timeout; long numreplicas, ackreplicas; - long long offset = c->woff; + long long offset = getClientWriteOffset(c); if (server.primary_host) { addReplyError( @@ -3453,7 +3465,7 @@ void waitCommand(client *c) { if (getTimeoutFromObjectOrReply(c, c->argv[2], &timeout, UNIT_MILLISECONDS) != C_OK) return; /* First try without blocking at all. */ - ackreplicas = replicationCountAcksByOffset(c->woff); + ackreplicas = replicationCountAcksByOffset(offset); if (ackreplicas >= numreplicas || c->flag.deny_blocking) { addReplyLongLong(c, ackreplicas); return; @@ -3489,9 +3501,11 @@ void waitaofCommand(client *c) { return; } + long long offset = getClientWriteOffset(c); + /* First try without blocking at all. */ - ackreplicas = replicationCountAOFAcksByOffset(c->woff); - acklocal = server.fsynced_reploff >= c->woff; + ackreplicas = replicationCountAOFAcksByOffset(offset); + acklocal = server.fsynced_reploff >= offset; if ((ackreplicas >= numreplicas && acklocal >= numlocal) || c->flag.deny_blocking) { addReplyArrayLen(c, 2); addReplyLongLong(c, acklocal); @@ -3501,7 +3515,7 @@ void waitaofCommand(client *c) { /* Otherwise block the client and put it into our list of clients * waiting for ack from replicas. */ - blockClientForReplicaAck(c, timeout, c->woff, numreplicas, numlocal); + blockClientForReplicaAck(c, timeout, offset, numreplicas, numlocal); /* Make sure that the server will send an ACK request to all the replicas * before returning to the event loop. */ diff --git a/tests/unit/scripting.tcl b/tests/unit/scripting.tcl index 4249ccf8c2..1a73b9d1ab 100644 --- a/tests/unit/scripting.tcl +++ b/tests/unit/scripting.tcl @@ -294,7 +294,7 @@ start_server {tags {"scripting"}} { } {0} test {EVAL - Scripts do not block on waitaof} { - run_script {redis.call('incr', 'x') return redis.pcall('waitaof','0','1','0')} 0 + run_script {return redis.pcall('waitaof','0','1','0')} 0 } {* 0} test {EVAL - Scripts do not block on XREAD with BLOCK option} { diff --git a/tests/unit/wait.tcl b/tests/unit/wait.tcl index 3ff97fc669..34bea8014a 100644 --- a/tests/unit/wait.tcl +++ b/tests/unit/wait.tcl @@ -98,6 +98,35 @@ start_server {} { $rd close $rd2 close } + + start_server {} { + test {Setup a new replica} { + r replicaof $master_host $master_port + wait_for_ofs_sync $master r + wait_for_ofs_sync $master $slave + } + + test {WAIT in script will work} { + # Pause the old replica so it can not catch up the offset. + pause_process $slave_pid + + # Primary set a new key and wait the new replica catch up the offset. + $master set foo bar + wait_for_ofs_sync $master r + + # Wait for the new replica to report the acked offset to the primary. + # Because the old replica is paused, so the WAIT can only return 1. + # In an earlier version it returned 2, because the fake client's woff + # is always 0 so WAIT counted all the replicas. + wait_for_condition 50 100 { + [$master eval "return server.call('wait', '2', '0')" 0] eq 1 + } else { + fail "WAIT in script does not work as expected." + } + + resume_process $slave_pid + } + } }}