Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the wrong woff when execute WAIT / WAITAOF in script #776

Merged
merged 5 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -3400,6 +3400,18 @@ void replicationRequestAckFromReplicas(void) {
server.get_ack_from_replicas = 1;
}

/* Return the write offset (woff) that applies to client 'c'.
 *
 * While a script is running, the client passed in is the script's fake
 * client, whose woff is always 0. In that case the woff of the client
 * returned by scriptGetCaller() is used instead. */
long long getClientWriteOffset(client *c) {
    client *target = c;

    if (scriptIsRunning()) {
        /* During script execution the only client expected here is the
         * script's own fake client. */
        serverAssert(scriptGetClient() == c);
        target = scriptGetCaller();
    }

    return target->woff;
}

/* Return the number of replicas that already acknowledged the specified
* replication offset. */
int replicationCountAcksByOffset(long long offset) {
Expand Down Expand Up @@ -3439,7 +3451,7 @@ int replicationCountAOFAcksByOffset(long long offset) {
void waitCommand(client *c) {
mstime_t timeout;
long numreplicas, ackreplicas;
long long offset = c->woff;
long long offset = getClientWriteOffset(c);

if (server.primary_host) {
addReplyError(
Expand All @@ -3453,7 +3465,7 @@ void waitCommand(client *c) {
if (getTimeoutFromObjectOrReply(c, c->argv[2], &timeout, UNIT_MILLISECONDS) != C_OK) return;

/* First try without blocking at all. */
ackreplicas = replicationCountAcksByOffset(c->woff);
ackreplicas = replicationCountAcksByOffset(offset);
if (ackreplicas >= numreplicas || c->flag.deny_blocking) {
addReplyLongLong(c, ackreplicas);
return;
Expand Down Expand Up @@ -3489,9 +3501,11 @@ void waitaofCommand(client *c) {
return;
}

long long offset = getClientWriteOffset(c);

/* First try without blocking at all. */
ackreplicas = replicationCountAOFAcksByOffset(c->woff);
acklocal = server.fsynced_reploff >= c->woff;
ackreplicas = replicationCountAOFAcksByOffset(offset);
acklocal = server.fsynced_reploff >= offset;
if ((ackreplicas >= numreplicas && acklocal >= numlocal) || c->flag.deny_blocking) {
addReplyArrayLen(c, 2);
addReplyLongLong(c, acklocal);
Expand All @@ -3501,7 +3515,7 @@ void waitaofCommand(client *c) {

/* Otherwise block the client and put it into our list of clients
* waiting for ack from replicas. */
blockClientForReplicaAck(c, timeout, c->woff, numreplicas, numlocal);
blockClientForReplicaAck(c, timeout, offset, numreplicas, numlocal);

/* Make sure that the server will send an ACK request to all the replicas
* before returning to the event loop. */
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/scripting.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ start_server {tags {"scripting"}} {

# Temporarily disable test for external until it is stabilized, see https://github.com/valkey-io/valkey/issues/770
zuiderkwast marked this conversation as resolved.
Show resolved Hide resolved
test {EVAL - Scripts do not block on waitaof} {
run_script {redis.call('incr', 'x') return redis.pcall('waitaof','0','1','0')} 0
run_script {return redis.pcall('waitaof','0','1','0')} 0
} {0 0} {external:skip}
zuiderkwast marked this conversation as resolved.
Show resolved Hide resolved

test {EVAL - Scripts do not block on XREAD with BLOCK option} {
Expand Down
29 changes: 29 additions & 0 deletions tests/unit/wait.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,35 @@ start_server {} {
$rd close
$rd2 close
}

start_server {} {
test {Setup a new replica} {
r replicaof $master_host $master_port
wait_for_ofs_sync $master r
wait_for_ofs_sync $master $slave
}

test {WAIT in script will work} {
# Pause the old replica so it cannot catch up to the offset.
pause_process $slave_pid

# The primary sets a new key and waits for the new replica to catch up to the offset.
$master set foo bar
wait_for_ofs_sync $master r

# Wait for the new replica to report the acked offset to the primary.
# Because the old replica is paused, WAIT can only return 1.
# In the old code it would return 2, because the fake client's woff is
# always 0, so WAIT would count all the replicas.
zuiderkwast marked this conversation as resolved.
Show resolved Hide resolved
wait_for_condition 50 100 {
[$master eval "return server.call('wait', '2', '0')" 0] eq 1
} else {
fail "WAIT in script does not work as expected."
}

resume_process $slave_pid
}
}
}}


Expand Down
Loading