diff --git a/src/replication.c b/src/replication.c index 21ccb0e92d..65ffe65e93 100644 --- a/src/replication.c +++ b/src/replication.c @@ -3400,6 +3400,18 @@ void replicationRequestAckFromReplicas(void) { server.get_ack_from_replicas = 1; } +/* This function return client woff. If the script is currently running, + * returns the actual client woff */ +long long getClientWriteOffset(client *c) { + if (scriptIsRunning()) { + /* If a script is currently running, the client passed in is a fake + * client, and its woff is always 0. */ + serverAssert(scriptGetClient() == c); + c = scriptGetCaller(); + } + return c->woff; +} + /* Return the number of replicas that already acknowledged the specified * replication offset. */ int replicationCountAcksByOffset(long long offset) { @@ -3439,7 +3451,7 @@ int replicationCountAOFAcksByOffset(long long offset) { void waitCommand(client *c) { mstime_t timeout; long numreplicas, ackreplicas; - long long offset = c->woff; + long long offset = getClientWriteOffset(); if (server.primary_host) { addReplyError( @@ -3453,7 +3465,7 @@ void waitCommand(client *c) { if (getTimeoutFromObjectOrReply(c, c->argv[2], &timeout, UNIT_MILLISECONDS) != C_OK) return; /* First try without blocking at all. */ - ackreplicas = replicationCountAcksByOffset(c->woff); + ackreplicas = replicationCountAcksByOffset(offset); if (ackreplicas >= numreplicas || c->flag.deny_blocking) { addReplyLongLong(c, ackreplicas); return; @@ -3489,9 +3501,11 @@ void waitaofCommand(client *c) { return; } + long long offset = getClientWriteOffset(c); + /* First try without blocking at all. */ - ackreplicas = replicationCountAOFAcksByOffset(c->woff); - acklocal = server.fsynced_reploff >= c->woff; + ackreplicas = replicationCountAOFAcksByOffset(offset); + acklocal = server.fsynced_reploff >= offset; if ((ackreplicas >= numreplicas && acklocal >= numlocal) || c->flag.deny_blocking) { addReplyArrayLen(c, 2); addReplyLongLong(c, acklocal); @@ -3501,7 +3515,7 @@ void waitaofCommand(client *c) { /* Otherwise block the client and put it into our list of clients * waiting for ack from replicas. */ - blockClientForReplicaAck(c, timeout, c->woff, numreplicas, numlocal); + blockClientForReplicaAck(c, timeout, offset, numreplicas, numlocal); /* Make sure that the server will send an ACK request to all the replicas * before returning to the event loop. */ diff --git a/tests/unit/scripting.tcl b/tests/unit/scripting.tcl index b608d39195..1b635e8b68 100644 --- a/tests/unit/scripting.tcl +++ b/tests/unit/scripting.tcl @@ -295,7 +295,7 @@ start_server {tags {"scripting"}} { # Temporarily disable test for external until it is stabilized, see https://github.com/valkey-io/valkey/issues/770 test {EVAL - Scripts do not block on waitaof} { - run_script {redis.call('incr', 'x') return redis.pcall('waitaof','0','1','0')} 0 + run_script {return redis.pcall('waitaof','0','1','0')} 0 } {0 0} {external:skip} test {EVAL - Scripts do not block on XREAD with BLOCK option} {