Skip to content

Commit

Permalink
Make CLUSTER SETSLOT with TIMEOUT 0 block indefinitely (valkey-io#556)
Browse files Browse the repository at this point in the history
This aligns the behaviour with established Valkey commands with a
TIMEOUT argument, such as BLPOP.

Fix valkey-io#422

Signed-off-by: Ping Xie <[email protected]>
  • Loading branch information
PingXie authored May 27, 2024
1 parent 5d0f4bc commit e4ead94
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 26 deletions.
56 changes: 30 additions & 26 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -5878,38 +5878,29 @@ char *clusterNodeGetShardId(clusterNode *node) {
return node->shard_id;
}

int clusterParseSetSlotCommand(client *c, int *slot_out, clusterNode **node_out, int *timeout_out) {
/* clusterParseSetSlotCommand validates the arguments of the CLUSTER SETSLOT command,
* extracts the target slot number (slot_out), and determines the target node (node_out)
* if applicable. It also calculates a timeout value (timeout_out) based on an optional
* timeout argument. If provided, the timeout is added to the current time to obtain an
* absolute timestamp; if omitted, the default timeout CLUSTER_OPERATION_TIMEOUT is used;
* if set to 0, it indicates no timeout. The function returns 1 if successful, and 0
* otherwise, after sending an error message to the client. */
int clusterParseSetSlotCommand(client *c, int *slot_out, clusterNode **node_out, mstime_t *timeout_out) {
int slot = -1;
clusterNode *n = NULL;
int timeout = 0;
mstime_t timeout = commandTimeSnapshot() + CLUSTER_OPERATION_TIMEOUT;
int optarg_pos = 0;

/* Allow primaries to replicate "CLUSTER SETSLOT" */
if (!(c->flags & CLIENT_MASTER) && nodeIsSlave(myself)) {
addReplyError(c, "Please use SETSLOT only with masters.");
return 0;
}

/* Process optional arguments */
for (int i = 0; i < c->argc;) {
if (!strcasecmp(c->argv[i]->ptr, "timeout")) {
if (i + 1 < c->argc) {
timeout = (int)strtol(c->argv[i + 1]->ptr, NULL, 10);
decrRefCount(c->argv[i]);
decrRefCount(c->argv[i + 1]);
memmove(&c->argv[i], &c->argv[i + 2], c->argc - i - 2);
c->argc -= 2;
continue;
}
addReplyError(c, "Missing timeout value.");
return 0;
}
i++;
}

if ((slot = getSlotOrReply(c, c->argv[2])) == -1) return 0;

if (!strcasecmp(c->argv[3]->ptr, "migrating") && c->argc >= 5) {
/* Scope the check to primaries only */
/* CLUSTER SETSLOT <SLOT> MIGRATING <NODE> */
if (nodeIsMaster(myself) && server.cluster->slots[slot] != myself) {
addReplyErrorFormat(c, "I'm not the owner of hash slot %u", slot);
return 0;
Expand All @@ -5923,7 +5914,9 @@ int clusterParseSetSlotCommand(client *c, int *slot_out, clusterNode **node_out,
addReplyError(c, "Target node is not a master");
return 0;
}
if (c->argc > 5) optarg_pos = 5;
} else if (!strcasecmp(c->argv[3]->ptr, "importing") && c->argc >= 5) {
/* CLUSTER SETSLOT <SLOT> IMPORTING <NODE> */
if (server.cluster->slots[slot] == myself) {
addReplyErrorFormat(c, "I'm already the owner of hash slot %u", slot);
return 0;
Expand All @@ -5937,8 +5930,10 @@ int clusterParseSetSlotCommand(client *c, int *slot_out, clusterNode **node_out,
addReplyError(c, "Target node is not a master");
return 0;
}
if (c->argc > 5) optarg_pos = 5;
} else if (!strcasecmp(c->argv[3]->ptr, "stable") && c->argc >= 4) {
/* Do nothing */
/* CLUSTER SETSLOT <SLOT> STABLE */
if (c->argc > 4) optarg_pos = 4;
} else if (!strcasecmp(c->argv[3]->ptr, "node") && c->argc >= 5) {
/* CLUSTER SETSLOT <SLOT> NODE <NODE ID> */
n = clusterLookupNode(c->argv[4]->ptr, sdslen(c->argv[4]->ptr));
Expand All @@ -5961,11 +5956,23 @@ int clusterParseSetSlotCommand(client *c, int *slot_out, clusterNode **node_out,
return 0;
}
}
if (c->argc > 5) optarg_pos = 5;
} else {
addReplyError(c, "Invalid CLUSTER SETSLOT action or number of arguments. Try CLUSTER HELP");
return 0;
}

/* Process optional arguments */
for (int i = optarg_pos; i < c->argc; i++) {
if (!strcasecmp(c->argv[i]->ptr, "timeout")) {
if (i + 1 >= c->argc) {
addReplyError(c, "Missing timeout value");
return 0;
}
if (getTimeoutFromObjectOrReply(c, c->argv[i + 1], &timeout, UNIT_MILLISECONDS) != C_OK) return 0;
}
}

*slot_out = slot;
*node_out = n;
*timeout_out = timeout;
Expand All @@ -5974,7 +5981,7 @@ int clusterParseSetSlotCommand(client *c, int *slot_out, clusterNode **node_out,

void clusterCommandSetSlot(client *c) {
int slot;
int timeout_ms;
mstime_t timeout_ms;
clusterNode *n;

if (!clusterParseSetSlotCommand(c, &slot, &n, &timeout_ms)) return;
Expand Down Expand Up @@ -6019,10 +6026,7 @@ void clusterCommandSetSlot(client *c) {
* 2. The repl offset target is set to the master's current repl offset + 1.
* There is no concern of partial replication because replicas always
* ack the repl offset at the command boundary. */
if (timeout_ms == 0) {
timeout_ms = CLUSTER_OPERATION_TIMEOUT;
}
blockForPreReplication(c, mstime() + timeout_ms, server.master_repl_offset + 1, myself->numslaves);
blockForPreReplication(c, timeout_ms, server.master_repl_offset + 1, myself->numslaves);
replicationRequestAckFromSlaves();
return;
}
Expand Down
21 changes: 21 additions & 0 deletions tests/unit/cluster/slot-migration.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,27 @@ start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-allow-replica
}
}

start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-allow-replica-migration no cluster-node-timeout 1000} } {
set R1_id [R 1 CLUSTER MYID]

test "CLUSTER SETSLOT with invalid timeouts" {
catch {R 0 CLUSTER SETSLOT 609 MIGRATING $R1_id TIMEOUT} e
assert_equal $e "ERR Missing timeout value"

catch {R 0 CLUSTER SETSLOT 609 MIGRATING $R1_id TIMEOUT -1} e
assert_equal $e "ERR timeout is negative"

catch {R 0 CLUSTER SETSLOT 609 MIGRATING $R1_id TIMEOUT 99999999999999999999} e
assert_equal $e "ERR timeout is not an integer or out of range"

catch {R 0 CLUSTER SETSLOT 609 MIGRATING $R1_id TIMEOUT abc} e
assert_equal $e "ERR timeout is not an integer or out of range"

catch {R 0 CLUSTER SETSLOT 609 TIMEOUT 100 MIGRATING $R1_id} e
assert_equal $e "ERR Invalid CLUSTER SETSLOT action or number of arguments. Try CLUSTER HELP"
}
}

start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-allow-replica-migration no cluster-node-timeout 1000} } {
set R1_id [R 1 CLUSTER MYID]

Expand Down

0 comments on commit e4ead94

Please sign in to comment.