Skip to content

Commit

Permalink
Enhances the reliability and automation of the Valkey cluster re-sharding process,
Browse files Browse the repository at this point in the history

specifically during slot migrations in the face of primary failures.

Signed-off-by: Ping Xie <[email protected]>
  • Loading branch information
PingXie committed May 7, 2024
1 parent 93f8a19 commit 091303a
Show file tree
Hide file tree
Showing 17 changed files with 757 additions and 331 deletions.
35 changes: 23 additions & 12 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ void unblockClient(client *c, int queue_for_reprocessing) {
c->bstate.btype == BLOCKED_ZSET ||
c->bstate.btype == BLOCKED_STREAM) {
unblockClientWaitingData(c);
} else if (c->bstate.btype == BLOCKED_WAIT || c->bstate.btype == BLOCKED_WAITAOF) {
} else if (c->bstate.btype == BLOCKED_WAIT || c->bstate.btype == BLOCKED_WAITAOF ||
c->bstate.btype == BLOCKED_WAIT_PREREPL) {
unblockClientWaitingReplicas(c);
} else if (c->bstate.btype == BLOCKED_MODULE) {
if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c);
Expand All @@ -203,7 +204,8 @@ void unblockClient(client *c, int queue_for_reprocessing) {

/* Reset the client for a new query, unless the client has pending command to process
* or in case a shutdown operation was canceled and we are still in the processCommand sequence */
if (!(c->flags & CLIENT_PENDING_COMMAND) && c->bstate.btype != BLOCKED_SHUTDOWN) {
if (!(c->flags & CLIENT_PENDING_COMMAND) && c->bstate.btype != BLOCKED_SHUTDOWN &&
c->bstate.btype != BLOCKED_WAIT_PREREPL) {
freeClientOriginalArgv(c);
/* Clients that are not blocked on keys are not reprocessed so we must
* call reqresAppendResponse here (for clients blocked on key,
Expand Down Expand Up @@ -241,6 +243,8 @@ void replyToBlockedClientTimedOut(client *c) {
addReplyLongLong(c,replicationCountAOFAcksByOffset(c->bstate.reploffset));
} else if (c->bstate.btype == BLOCKED_MODULE) {
moduleBlockedClientTimedOut(c, 0);
} else if (c->bstate.btype == BLOCKED_WAIT_PREREPL) {
addReplyErrorObject(c, shared.noreplicaserr);
} else {
serverPanic("Unknown btype in replyToBlockedClientTimedOut().");
}
Expand Down Expand Up @@ -598,23 +602,30 @@ static void handleClientsBlockedOnKey(readyList *rl) {
}
}

/* Block a client for replica acknowledgement.
 *
 * Shared helper behind the WAIT, WAITAOF and pre-replication blocking entry
 * points: it records what the client is waiting for in c->bstate, registers
 * the client in server.clients_waiting_acks, and blocks it.
 *
 *   c           - client to block.
 *   timeout     - stored verbatim in c->bstate.timeout.
 *   offset      - replication offset stored in c->bstate.reploffset.
 *   numreplicas - number of replica acks stored in c->bstate.numreplicas.
 *   btype       - blocking type handed to blockClient() (BLOCKED_WAIT,
 *                 BLOCKED_WAITAOF or BLOCKED_WAIT_PREREPL).
 *   numlocal    - stored in c->bstate.numlocal.
 *
 * All bstate fields are filled in before the client is queued and blocked,
 * and the client is queued and blocked exactly once, with the caller's
 * 'btype' rather than a hard-coded BLOCKED_WAIT. */
void blockClientForReplicaAck(client *c, mstime_t timeout, long long offset, long numreplicas, int btype, int numlocal) {
    c->bstate.timeout = timeout;
    c->bstate.reploffset = offset;
    c->bstate.numreplicas = numreplicas;
    c->bstate.numlocal = numlocal;
    listAddNodeHead(server.clients_waiting_acks, c);
    blockClient(c, btype);
}

/* Block a client for pre-replication: the client waits with block type
 * BLOCKED_WAIT_PREREPL until 'numreplicas' replicas have acknowledged
 * 'offset' or 'timeout' is reached.
 *
 * The client is also flagged CLIENT_PENDING_COMMAND after being blocked, so
 * that it still counts as having a command to process when it is unblocked. */
void blockForPreReplication(client *c, mstime_t timeout, long long offset, long numreplicas) {
    const int numlocal = 0; /* pre-replication always passes 0 for numlocal */

    blockClientForReplicaAck(c, timeout, offset, numreplicas, BLOCKED_WAIT_PREREPL, numlocal);
    c->flags = c->flags | CLIENT_PENDING_COMMAND;
}

/* Block a client on behalf of the WAIT command: the client waits with block
 * type BLOCKED_WAIT until 'numreplicas' replicas have acknowledged 'offset'
 * or 'timeout' is reached. */
void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas) {
    const int numlocal = 0; /* WAIT always passes 0 for numlocal */

    blockClientForReplicaAck(c, timeout, offset, numreplicas, BLOCKED_WAIT, numlocal);
}

/* Block a client due to the WAITAOF command.
 *
 *   c           - client to block.
 *   timeout     - forwarded to the shared helper as the blocking timeout.
 *   offset      - replication offset the client is waiting on.
 *   numlocal    - forwarded as the helper's 'numlocal'.
 *   numreplicas - number of replica acknowledgements to wait for.
 *
 * All bookkeeping (bstate fields, registration in server.clients_waiting_acks
 * and the blockClient() call) is delegated to blockClientForReplicaAck(), so
 * the client is queued and blocked exactly once.
 *
 * Note the argument order: this function takes (numlocal, numreplicas) while
 * the helper takes (numreplicas, btype, numlocal). */
void blockForAofFsync(client *c, mstime_t timeout, long long offset, int numlocal, long numreplicas) {
    blockClientForReplicaAck(c, timeout, offset, numreplicas, BLOCKED_WAITAOF, numlocal);
}

/* Postpone client from executing a command. For example the server might be busy
Expand Down
1 change: 1 addition & 0 deletions src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ char *clusterNodeHostname(clusterNode *node);
const char *clusterNodePreferredEndpoint(clusterNode *n);
long long clusterNodeReplOffset(clusterNode *node);
clusterNode *clusterLookupNode(const char *name, int length);
void clusterReplicateOpenSlots(void);

/* functions with shared implementations */
clusterNode *getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int *hashslot, int *ask);
Expand Down
949 changes: 672 additions & 277 deletions src/cluster_legacy.c

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ typedef struct clusterLink {
#define CLUSTER_NODE_EXTENSIONS_SUPPORTED 1024 /* This node supports extensions. */
#define CLUSTER_NODE_NULL_NAME "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000"

#define nodeIsMaster(n) ((n)->flags & CLUSTER_NODE_MASTER)
#define nodeIsSlave(n) ((n)->flags & CLUSTER_NODE_SLAVE)
#define nodeInHandshake(n) ((n)->flags & CLUSTER_NODE_HANDSHAKE)
#define nodeHasAddr(n) (!((n)->flags & CLUSTER_NODE_NOADDR))
Expand Down
7 changes: 5 additions & 2 deletions src/commands.def
Original file line number Diff line number Diff line change
Expand Up @@ -851,7 +851,9 @@ struct COMMAND_ARG CLUSTER_SET_CONFIG_EPOCH_Args[] = {

#ifndef SKIP_CMD_HISTORY_TABLE
/* CLUSTER SETSLOT history.
 * One entry per behavior change: {version, description}. The "setslot" entry
 * in the CLUSTER subcommand table passes this array together with a history
 * count of 1, so that count has to be kept in sync with the entries here.
 * There must be no object-like macro named CLUSTER_SETSLOT_History in scope,
 * otherwise the preprocessor would rewrite the array name below. */
commandHistory CLUSTER_SETSLOT_History[] = {
{"8.0.0","Added the `TIMEOUT` option."},
};
#endif

#ifndef SKIP_CMD_TIPS_TABLE
Expand All @@ -876,6 +878,7 @@ struct COMMAND_ARG CLUSTER_SETSLOT_subcommand_Subargs[] = {
/* Argument table for CLUSTER SETSLOT, in order:
 *   - "slot":       integer argument.
 *   - "subcommand": one-of argument whose alternatives are listed in
 *                   CLUSTER_SETSLOT_subcommand_Subargs.
 *   - "timeout":    optional integer argument introduced by the TIMEOUT token,
 *                   marked as added in 8.0.0.
 * NOTE(review): the number of entries here presumably has to match the
 * argument count passed to MAKE_CMD for "setslot" -- confirm against the
 * CLUSTER subcommand table. */
struct COMMAND_ARG CLUSTER_SETSLOT_Args[] = {
{MAKE_ARG("slot",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("subcommand",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,4,NULL),.subargs=CLUSTER_SETSLOT_subcommand_Subargs},
{MAKE_ARG("timeout",ARG_TYPE_INTEGER,-1,"TIMEOUT",NULL,"8.0.0",CMD_ARG_OPTIONAL,0,NULL),.display_text="timeout"},
};

/********** CLUSTER SHARDS ********************/
Expand Down Expand Up @@ -969,7 +972,7 @@ struct COMMAND_STRUCT CLUSTER_Subcommands[] = {
{MAKE_CMD("reset","Resets a node.","O(N) where N is the number of known nodes. The command may execute a FLUSHALL as a side effect.","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_RESET_History,0,CLUSTER_RESET_Tips,0,clusterCommand,-2,CMD_ADMIN|CMD_STALE|CMD_NOSCRIPT,0,CLUSTER_RESET_Keyspecs,0,NULL,1),.args=CLUSTER_RESET_Args},
{MAKE_CMD("saveconfig","Forces a node to save the cluster configuration to disk.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SAVECONFIG_History,0,CLUSTER_SAVECONFIG_Tips,0,clusterCommand,2,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SAVECONFIG_Keyspecs,0,NULL,0)},
{MAKE_CMD("set-config-epoch","Sets the configuration epoch for a new node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SET_CONFIG_EPOCH_History,0,CLUSTER_SET_CONFIG_EPOCH_Tips,0,clusterCommand,3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SET_CONFIG_EPOCH_Keyspecs,0,NULL,1),.args=CLUSTER_SET_CONFIG_EPOCH_Args},
{MAKE_CMD("setslot","Binds a hash slot to a node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SETSLOT_History,0,CLUSTER_SETSLOT_Tips,0,clusterCommand,-4,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SETSLOT_Keyspecs,0,NULL,2),.args=CLUSTER_SETSLOT_Args},
{MAKE_CMD("setslot","Binds a hash slot to a node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SETSLOT_History,1,CLUSTER_SETSLOT_Tips,0,clusterCommand,-4,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE|CMD_MAY_REPLICATE,0,CLUSTER_SETSLOT_Keyspecs,0,NULL,3),.args=CLUSTER_SETSLOT_Args},
{MAKE_CMD("shards","Returns the mapping of cluster slots to shards.","O(N) where N is the total number of cluster nodes","7.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SHARDS_History,0,CLUSTER_SHARDS_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SHARDS_Keyspecs,0,NULL,0)},
{MAKE_CMD("slaves","Lists the replica nodes of a master node.","O(N) where N is the number of replicas.","3.0.0",CMD_DOC_DEPRECATED,"`CLUSTER REPLICAS`","5.0.0","cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLAVES_History,0,CLUSTER_SLAVES_Tips,1,clusterCommand,3,CMD_ADMIN|CMD_STALE,0,CLUSTER_SLAVES_Keyspecs,0,NULL,1),.args=CLUSTER_SLAVES_Args},
{MAKE_CMD("slots","Returns the mapping of cluster slots to nodes.","O(N) where N is the total number of Cluster nodes","3.0.0",CMD_DOC_DEPRECATED,"`CLUSTER SHARDS`","7.0.0","cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLOTS_History,2,CLUSTER_SLOTS_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SLOTS_Keyspecs,0,NULL,0)},
Expand Down
19 changes: 17 additions & 2 deletions src/commands/cluster-setslot.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,17 @@
"arity": -4,
"container": "CLUSTER",
"function": "clusterCommand",
"command_flags": [
"history": [
[
"8.0.0",
"Added the `TIMEOUT` option."
]
],
"command_flags": [
"NO_ASYNC_LOADING",
"ADMIN",
"STALE"
"STALE",
"MAY_REPLICATE"
],
"arguments": [
{
Expand Down Expand Up @@ -45,6 +52,14 @@
"token": "STABLE"
}
]
},
{
"name": "timeout",
"display": "timeout",
"type": "integer",
"token": "TIMEOUT",
"optional": true,
"since": "8.0.0"
}
],
"reply_schema": {
Expand Down
3 changes: 1 addition & 2 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -873,8 +873,7 @@ NULL
server.aof_flush_sleep = atoi(c->argv[2]->ptr);
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"replicate") && c->argc >= 3) {
replicationFeedSlaves(server.slaves, -1,
c->argv + 2, c->argc - 2);
replicationFeedSlaves(-1, c->argv + 2, c->argc - 2);
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"error") && c->argc == 3) {
sds errstr = sdsnewlen("-",1);
Expand Down
2 changes: 1 addition & 1 deletion src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -2086,7 +2086,7 @@ void resetClient(client *c) {
c->multibulklen = 0;
c->bulklen = -1;
c->slot = -1;
c->flags &= ~CLIENT_EXECUTING_COMMAND;
c->flags &= ~(CLIENT_EXECUTING_COMMAND | CLIENT_PREREPL_DONE);

/* Make sure the duration has been recorded to some command. */
serverAssert(c->duration == 0);
Expand Down
2 changes: 1 addition & 1 deletion src/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -3310,7 +3310,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
robj *argv[2];
argv[0] = server.lazyfree_lazy_expire ? shared.unlink : shared.del;
argv[1] = &keyobj;
replicationFeedSlaves(server.slaves,dbid,argv,2);
replicationFeedSlaves(dbid,argv,2);
}
sdsfree(key);
decrRefCount(val);
Expand Down
19 changes: 12 additions & 7 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ void feedReplicationBuffer(char *s, size_t len) {
* received by our clients in order to create the replication stream.
* Instead if the instance is a replica and has sub-replicas attached, we use
* replicationFeedStreamFromMasterStream() */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
void replicationFeedSlaves(int dictid, robj **argv, int argc) {
int j, len;
char llstr[LONG_STR_SIZE];

Expand All @@ -451,7 +451,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {

/* If there aren't slaves, and there is no backlog buffer to populate,
* we can return ASAP. */
if (server.repl_backlog == NULL && listLength(slaves) == 0) {
if (server.repl_backlog == NULL && listLength(server.slaves) == 0) {
/* We increment the repl_offset anyway, since we use that for tracking AOF fsyncs
* even when there's no replication active. This code will not be reached if AOF
* is also disabled. */
Expand All @@ -460,7 +460,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
}

/* We can't have slaves attached and no backlog. */
serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
serverAssert(!(listLength(server.slaves) != 0 && server.repl_backlog == NULL));

/* Must install write handler for all replicas first before feeding
* replication stream. */
Expand Down Expand Up @@ -1313,6 +1313,9 @@ int replicaPutOnline(client *slave) {
NULL);
serverLog(LL_NOTICE,"Synchronization with replica %s succeeded",
replicationGetSlaveName(slave));

/* Replicate slot being migrated/imported to the new replica */
clusterReplicateOpenSlots();
return 1;
}

Expand Down Expand Up @@ -3619,8 +3622,8 @@ void unblockClientWaitingReplicas(client *c) {
updateStatsOnUnblock(c, 0, 0, 0);
}

/* Check if there are clients blocked in WAIT or WAITAOF that can be unblocked
* since we received enough ACKs from slaves. */
/* Check if there are clients blocked in WAIT, WAITAOF, or WAIT_PREREPL
* that can be unblocked since we received enough ACKs from replicas. */
void processClientsWaitingReplicas(void) {
long long last_offset = 0;
long long last_aof_offset = 0;
Expand All @@ -3637,6 +3640,7 @@ void processClientsWaitingReplicas(void) {

client *c = ln->value;
int is_wait_aof = c->bstate.btype == BLOCKED_WAITAOF;
int is_wait_prerepl = c->bstate.btype == BLOCKED_WAIT_PREREPL;

if (is_wait_aof && c->bstate.numlocal && !server.aof_enabled) {
addReplyError(c, "WAITAOF cannot be used when numlocal is set but appendonly is disabled.");
Expand Down Expand Up @@ -3686,6 +3690,8 @@ void processClientsWaitingReplicas(void) {
addReplyArrayLen(c, 2);
addReplyLongLong(c, numlocal);
addReplyLongLong(c, numreplicas);
} else if (is_wait_prerepl) {
c->flags |= CLIENT_PREREPL_DONE;
} else {
addReplyLongLong(c, numreplicas);
}
Expand Down Expand Up @@ -3788,8 +3794,7 @@ void replicationCron(void) {

if (!manual_failover_in_progress) {
ping_argv[0] = shared.ping;
replicationFeedSlaves(server.slaves, -1,
ping_argv, 1);
replicationFeedSlaves(-1, ping_argv, 1);
}
}

Expand Down
8 changes: 6 additions & 2 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -1612,7 +1612,7 @@ static void sendGetackToReplicas(void) {
argv[0] = shared.replconf;
argv[1] = shared.getack;
argv[2] = shared.special_asterick; /* Not used argument. */
replicationFeedSlaves(server.slaves, -1, argv, 3);
replicationFeedSlaves(-1, argv, 3);
}

extern int ProcessingEventsWhileBlocked;
Expand Down Expand Up @@ -1999,6 +1999,10 @@ void createSharedObjects(void) {
shared.special_asterick = createStringObject("*",1);
shared.special_equals = createStringObject("=",1);
shared.redacted = makeObjectShared(createStringObject("(redacted)",10));
shared.cluster = createStringObject("CLUSTER", 7);
shared.setslot = createStringObject("SETSLOT", 7);
shared.importing = createStringObject("IMPORTING", 9);
shared.migrating = createStringObject("MIGRATING", 9);

for (j = 0; j < OBJ_SHARED_INTEGERS; j++) {
shared.integers[j] =
Expand Down Expand Up @@ -3314,7 +3318,7 @@ static void propagateNow(int dbid, robj **argv, int argc, int target) {
if (server.aof_state != AOF_OFF && target & PROPAGATE_AOF)
feedAppendOnlyFile(dbid,argv,argc);
if (target & PROPAGATE_REPL)
replicationFeedSlaves(server.slaves,dbid,argv,argc);
replicationFeedSlaves(dbid,argv,argc);
}

/* Used inside commands to schedule the propagation of additional commands
Expand Down
8 changes: 6 additions & 2 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
#define CLIENT_MODULE_PREVENT_AOF_PROP (1ULL<<48) /* Module client do not want to propagate to AOF */
#define CLIENT_MODULE_PREVENT_REPL_PROP (1ULL<<49) /* Module client do not want to propagate to replica */
#define CLIENT_REPROCESSING_COMMAND (1ULL<<50) /* The client is re-processing the command. */
#define CLIENT_PREREPL_DONE (1ULL<<51) /* Indicate that pre-replication has been done on the client */

/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */
Expand All @@ -415,6 +416,7 @@ typedef enum blocking_type {
BLOCKED_ZSET, /* BZPOP et al. */
BLOCKED_POSTPONE, /* Blocked by processCommand, re-try processing later. */
BLOCKED_SHUTDOWN, /* SHUTDOWN. */
BLOCKED_WAIT_PREREPL, /* WAIT for pre-replication and then run the command. */
BLOCKED_NUM, /* Number of blocked states. */
BLOCKED_END /* End of enumeration */
} blocking_type;
Expand Down Expand Up @@ -1334,7 +1336,7 @@ struct sharedObjectsStruct {
*time, *pxat, *absttl, *retrycount, *force, *justid, *entriesread,
*lastid, *ping, *setid, *keepttl, *load, *createconsumer,
*getack, *special_asterick, *special_equals, *default_username, *redacted,
*ssubscribebulk,*sunsubscribebulk, *smessagebulk,
*ssubscribebulk,*sunsubscribebulk, *smessagebulk, *cluster, *setslot, *importing, *migrating,
*select[PROTO_SHARED_SELECT_CMDS],
*integers[OBJ_SHARED_INTEGERS],
*mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */
Expand Down Expand Up @@ -2820,7 +2822,7 @@ ssize_t syncRead(int fd, char *ptr, ssize_t size, long long timeout);
ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout);

/* Replication */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
void replicationFeedSlaves(int dictid, robj **argv, int argc);
void replicationFeedStreamFromMasterStream(char *buf, size_t buflen);
void resetReplicationBuffer(void);
void feedReplicationBuffer(char *buf, size_t len);
Expand Down Expand Up @@ -3433,7 +3435,9 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
void blockClientShutdown(client *c);
void blockPostponeClient(client *c);
void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas);
void blockForPreReplication(client *c, mstime_t timeout, long long offset, long numreplicas);
void blockForAofFsync(client *c, mstime_t timeout, long long offset, int numlocal, long numreplicas);
void replicationRequestAckFromSlaves(void);
void signalDeletedKeyAsReady(serverDb *db, robj *key, int type);
void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors);
void scanDatabaseForDeletedKeys(serverDb *emptied, serverDb *replaced_with);
Expand Down
5 changes: 0 additions & 5 deletions tests/cluster/tests/20-half-migrated-slot.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@
# 4. migration is half finished on "migrating" node
# 5. migration is half finished on "importing" node

# TODO: Test is currently disabled until it is stabilized (fixing the test
# itself or real issues in the server).

if {false} {
source "../tests/includes/init-tests.tcl"
source "../tests/includes/utils.tcl"

Expand Down Expand Up @@ -95,4 +91,3 @@ test "Half-finish importing" {
}

config_set_all_nodes cluster-allow-replica-migration yes
}
6 changes: 0 additions & 6 deletions tests/cluster/tests/21-many-slot-migration.tcl
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
# Tests for many simultaneous migrations.

# TODO: Test is currently disabled until it is stabilized (fixing the test
# itself or real issues in the server).

if {false} {

source "../tests/includes/init-tests.tcl"
source "../tests/includes/utils.tcl"

Expand Down Expand Up @@ -61,4 +56,3 @@ test "Keys are accessible" {
}

config_set_all_nodes cluster-allow-replica-migration yes
}
11 changes: 4 additions & 7 deletions tests/unit/cluster/cli.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -317,13 +317,10 @@ test {Migrate the last slot away from a node using valkey-cli} {
catch { $newnode_r get foo } e
assert_equal "MOVED $slot $owner_host:$owner_port" $e

# Check that the empty node has turned itself into a replica of the new
# owner and that the new owner knows that.
wait_for_condition 1000 50 {
[string match "*slave*" [$owner_r CLUSTER REPLICAS $owner_id]]
} else {
fail "Empty node didn't turn itself into a replica."
}
# Check that the now empty primary node doesn't turn itself into
# a replica of any other nodes
wait_for_cluster_propagation
assert_match *master* [$owner_r role]
}
}

Expand Down
6 changes: 3 additions & 3 deletions tests/unit/cluster/hostnames.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -146,18 +146,18 @@ test "Verify the nodes configured with prefer hostname only show hostname for ne
# to accept our isolated nodes connections. At this point they will
# start showing up in cluster slots.
wait_for_condition 50 100 {
[llength [R 6 CLUSTER SLOTS]] eq 2
[llength [R 6 CLUSTER SLOTS]] eq 3
} else {
fail "Node did not learn about the 2 shards it can talk to"
}
wait_for_condition 50 100 {
[lindex [get_slot_field [R 6 CLUSTER SLOTS] 0 2 3] 1] eq "shard-1.com"
[lindex [get_slot_field [R 6 CLUSTER SLOTS] 1 2 3] 1] eq "shard-1.com"
} else {
fail "hostname for shard-1 didn't reach node 6"
}

wait_for_condition 50 100 {
[lindex [get_slot_field [R 6 CLUSTER SLOTS] 1 2 3] 1] eq "shard-2.com"
[lindex [get_slot_field [R 6 CLUSTER SLOTS] 2 2 3] 1] eq "shard-2.com"
} else {
fail "hostname for shard-2 didn't reach node 6"
}
Expand Down
Loading

0 comments on commit 091303a

Please sign in to comment.