Skip to content

Commit

Permalink
Fix the wrong woff when execute WAIT / WAITAOF in script
Browse files Browse the repository at this point in the history
When executing the script, the client passed in is a fake
client, and its woff is always 0.

Signed-off-by: Binbin <[email protected]>
  • Loading branch information
enjoy-binbin committed Jul 12, 2024
1 parent 9948f07 commit 0f9e388
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 6 deletions.
24 changes: 19 additions & 5 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -3400,6 +3400,18 @@ void replicationRequestAckFromReplicas(void) {
server.get_ack_from_replicas = 1;
}

/* This function return client woff. If the script is currently running,
* returns the actual client woff */
long long getClientWriteOffset(client *c) {
if (scriptIsRunning()) {
/* If a script is currently running, the client passed in is a fake
* client, and its woff is always 0. */
serverAssert(scriptGetClient() == c);
c = scriptGetCaller();
}
return c->woff;
}

/* Return the number of replicas that already acknowledged the specified
* replication offset. */
int replicationCountAcksByOffset(long long offset) {
Expand Down Expand Up @@ -3439,7 +3451,7 @@ int replicationCountAOFAcksByOffset(long long offset) {
void waitCommand(client *c) {
mstime_t timeout;
long numreplicas, ackreplicas;
long long offset = c->woff;
long long offset = getClientWriteOffset();

if (server.primary_host) {
addReplyError(
Expand All @@ -3453,7 +3465,7 @@ void waitCommand(client *c) {
if (getTimeoutFromObjectOrReply(c, c->argv[2], &timeout, UNIT_MILLISECONDS) != C_OK) return;

/* First try without blocking at all. */
ackreplicas = replicationCountAcksByOffset(c->woff);
ackreplicas = replicationCountAcksByOffset(offset);
if (ackreplicas >= numreplicas || c->flag.deny_blocking) {
addReplyLongLong(c, ackreplicas);
return;
Expand Down Expand Up @@ -3489,9 +3501,11 @@ void waitaofCommand(client *c) {
return;
}

long long offset = getClientWriteOffset(c);

/* First try without blocking at all. */
ackreplicas = replicationCountAOFAcksByOffset(c->woff);
acklocal = server.fsynced_reploff >= c->woff;
ackreplicas = replicationCountAOFAcksByOffset(offset);
acklocal = server.fsynced_reploff >= offset;
if ((ackreplicas >= numreplicas && acklocal >= numlocal) || c->flag.deny_blocking) {
addReplyArrayLen(c, 2);
addReplyLongLong(c, acklocal);
Expand All @@ -3501,7 +3515,7 @@ void waitaofCommand(client *c) {

/* Otherwise block the client and put it into our list of clients
* waiting for ack from replicas. */
blockClientForReplicaAck(c, timeout, c->woff, numreplicas, numlocal);
blockClientForReplicaAck(c, timeout, offset, numreplicas, numlocal);

/* Make sure that the server will send an ACK request to all the replicas
* before returning to the event loop. */
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/scripting.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ start_server {tags {"scripting"}} {

# Temporarily disable test for external until it is stabilized, see https://github.com/valkey-io/valkey/issues/770
test {EVAL - Scripts do not block on waitaof} {
run_script {redis.call('incr', 'x') return redis.pcall('waitaof','0','1','0')} 0
run_script {return redis.pcall('waitaof','0','1','0')} 0
} {0 0} {external:skip}

test {EVAL - Scripts do not block on XREAD with BLOCK option} {
Expand Down

0 comments on commit 0f9e388

Please sign in to comment.