diff --git a/src/blocked.c b/src/blocked.c index ec1d377020..e4ec86779d 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -187,7 +187,8 @@ void unblockClient(client *c, int queue_for_reprocessing) { c->bstate.btype == BLOCKED_ZSET || c->bstate.btype == BLOCKED_STREAM) { unblockClientWaitingData(c); - } else if (c->bstate.btype == BLOCKED_WAIT || c->bstate.btype == BLOCKED_WAITAOF) { + } else if (c->bstate.btype == BLOCKED_WAIT || c->bstate.btype == BLOCKED_WAITAOF || + c->bstate.btype == BLOCKED_WAIT_PREREPL) { unblockClientWaitingReplicas(c); } else if (c->bstate.btype == BLOCKED_MODULE) { if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c); @@ -203,7 +204,8 @@ void unblockClient(client *c, int queue_for_reprocessing) { /* Reset the client for a new query, unless the client has pending command to process * or in case a shutdown operation was canceled and we are still in the processCommand sequence */ - if (!(c->flags & CLIENT_PENDING_COMMAND) && c->bstate.btype != BLOCKED_SHUTDOWN) { + if (!(c->flags & CLIENT_PENDING_COMMAND) && c->bstate.btype != BLOCKED_SHUTDOWN && + c->bstate.btype != BLOCKED_WAIT_PREREPL) { freeClientOriginalArgv(c); /* Clients that are not blocked on keys are not reprocessed so we must * call reqresAppendResponse here (for clients blocked on key, @@ -241,6 +243,8 @@ void replyToBlockedClientTimedOut(client *c) { addReplyLongLong(c,replicationCountAOFAcksByOffset(c->bstate.reploffset)); } else if (c->bstate.btype == BLOCKED_MODULE) { moduleBlockedClientTimedOut(c, 0); + } else if (c->bstate.btype == BLOCKED_WAIT_PREREPL) { + addReplyErrorObject(c, shared.noreplicaserr); } else { serverPanic("Unknown btype in replyToBlockedClientTimedOut()."); } @@ -598,23 +602,30 @@ static void handleClientsBlockedOnKey(readyList *rl) { } } -/* block a client due to wait command */ -void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas) { +/* block a client for replica acknowledgement */ +void blockClientForReplicaAck(client *c, mstime_t timeout, long long offset, long numreplicas, int btype, int numlocal) { c->bstate.timeout = timeout; c->bstate.reploffset = offset; c->bstate.numreplicas = numreplicas; - listAddNodeHead(server.clients_waiting_acks,c); - blockClient(c,BLOCKED_WAIT); + c->bstate.numlocal = numlocal; + listAddNodeHead(server.clients_waiting_acks, c); + blockClient(c, btype); +} + +/* block a client due to pre-replication */ +void blockForPreReplication(client *c, mstime_t timeout, long long offset, long numreplicas) { + blockClientForReplicaAck(c, timeout, offset, numreplicas, BLOCKED_WAIT_PREREPL, 0); + c->flags |= CLIENT_PENDING_COMMAND; +} + +/* block a client due to wait command */ +void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas) { + blockClientForReplicaAck(c, timeout, offset, numreplicas, BLOCKED_WAIT, 0); } /* block a client due to waitaof command */ void blockForAofFsync(client *c, mstime_t timeout, long long offset, int numlocal, long numreplicas) { - c->bstate.timeout = timeout; - c->bstate.reploffset = offset; - c->bstate.numreplicas = numreplicas; - c->bstate.numlocal = numlocal; - listAddNodeHead(server.clients_waiting_acks,c); - blockClient(c,BLOCKED_WAITAOF); + blockClientForReplicaAck(c, timeout, offset, numreplicas, BLOCKED_WAITAOF, numlocal); } /* Postpone client from executing a command. For example the server might be busy diff --git a/src/cluster.h b/src/cluster.h index a7211615dd..8a2b97cad0 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -103,6 +103,7 @@ char *clusterNodeHostname(clusterNode *node); const char *clusterNodePreferredEndpoint(clusterNode *n); long long clusterNodeReplOffset(clusterNode *node); clusterNode *clusterLookupNode(const char *name, int length); +void clusterReplicateOpenSlots(void); /* functions with shared implementations */ clusterNode *getNodeByQuery(client *c, struct serverCommand *cmd, robj **argv, int argc, int *hashslot, int *ask); diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 6754a0b8e0..f1a8d87b74 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -38,6 +38,7 @@ #include "endianconv.h" #include "connection.h" +#include #include #include #include @@ -65,10 +66,9 @@ list *clusterGetNodesInMyShard(clusterNode *node); int clusterNodeAddSlave(clusterNode *master, clusterNode *slave); int clusterAddSlot(clusterNode *n, int slot); int clusterDelSlot(int slot); -int clusterMoveNodeSlots(clusterNode *from_node, clusterNode *to_node); int clusterDelNodeSlots(clusterNode *node); int clusterNodeSetSlotBit(clusterNode *n, int slot); -void clusterSetMaster(clusterNode *n); +void clusterSetMaster(clusterNode *n, int closeSlots); void clusterHandleSlaveFailover(void); void clusterHandleSlaveMigration(int max_slaves); int bitmapTestBit(unsigned char *bitmap, int pos); @@ -133,6 +133,9 @@ static inline int defaultClientPort(void) { #define RCVBUF_INIT_LEN 1024 #define RCVBUF_MAX_PREALLOC (1<<20) /* 1MB */ +/* Fixed timeout value for cluster operations (milliseconds) */ +#define CLUSTER_OPERATION_TIMEOUT 2000 + /* Cluster nodes hash table, mapping nodes addresses 1.2.3.4:6379 to * clusterNode structures. */ dictType clusterNodesDictType = { @@ -942,6 +945,14 @@ static void updateShardId(clusterNode *node, const char *shard_id) { } } +static inline int areInSameShard(clusterNode *node1, clusterNode *node2) { + return memcmp(node1->shard_id, node2->shard_id, CLUSTER_NAMELEN) == 0; +} + +static inline uint64_t nodeEpoch(clusterNode *n) { + return n->slaveof ? n->slaveof->configEpoch : n->configEpoch; +} + /* Update my hostname based on server configuration values */ void clusterUpdateMyselfHostname(void) { if (!myself) return; @@ -1608,8 +1619,10 @@ void clusterRenameNode(clusterNode *node, char *newname) { int retval; sds s = sdsnewlen(node->name, CLUSTER_NAMELEN); - serverLog(LL_DEBUG,"Renaming node %.40s into %.40s", - node->name, newname); + serverLog(LL_DEBUG,"Renaming node %.40s (%s) into %.40s", + node->name, + node->human_nodename, + newname); retval = dictDelete(server.cluster->nodes, s); sdsfree(s); serverAssert(retval == DICT_OK); @@ -1945,7 +1958,7 @@ void clearNodeFailureIfNeeded(clusterNode *node) { } } -/* Return true if we already have a node in HANDSHAKE state matching the +/* Return 1 if we already have a node in HANDSHAKE state matching the * specified ip address and port number. This function is used in order to * avoid adding a new handshake node for the same address multiple times. */ int clusterHandshakeInProgress(char *ip, int port, int cport) { @@ -2360,19 +2373,27 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc continue; } - /* The slot is in importing state, it should be modified only - * manually via valkey-cli (example: a resharding is in progress - * and the migrating side slot was already closed and is advertising - * a new config. We still want the slot to be closed manually). */ - if (server.cluster->importing_slots_from[j]) continue; - - /* We rebind the slot to the new node claiming it if: - * 1) The slot was unassigned or the previous owner no longer owns the slot or - * the new node claims it with a greater configEpoch. - * 2) We are not currently importing the slot. */ + /* We rebind the slot to the new node claiming it if + * the slot was unassigned or the new node claims it with a + * greater configEpoch. */ if (isSlotUnclaimed(j) || server.cluster->slots[j]->configEpoch < senderConfigEpoch) { + if (!isSlotUnclaimed(j) && + !areInSameShard(server.cluster->slots[j], sender)) + { + serverLog(LL_NOTICE, + "Slot %d is migrated from node %.40s (%s) in shard %.40s" + " to node %.40s (%s) in shard %.40s.", + j, + server.cluster->slots[j]->name, + server.cluster->slots[j]->human_nodename, + server.cluster->slots[j]->shard_id, + sender->name, + sender->human_nodename, + sender->shard_id); + } + /* Was this slot mine, and still contains keys? Mark it as * a dirty slot. */ if (server.cluster->slots[j] == myself && @@ -2387,21 +2408,145 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc newmaster = sender; migrated_our_slots++; } + + /* If the sender who claims this slot is not in the same shard, + * it must be a result of deliberate operator actions. Therefore, + * we should honor it and clear the outstanding migrating_slots_to + * state for the slot. Otherwise, we are looking at a failover within + * the same shard and we should retain the migrating_slots_to state + * for the slot in question */ + if (server.cluster->migrating_slots_to[j] != NULL) { + if (!areInSameShard(sender, myself)) { + serverLog(LL_NOTICE, + "Slot %d is no longer being migrated to node %.40s (%s) in shard %.40s.", + j, + server.cluster->migrating_slots_to[j]->name, + server.cluster->migrating_slots_to[j]->human_nodename, + server.cluster->migrating_slots_to[j]->shard_id); + server.cluster->migrating_slots_to[j] = NULL; + } + } + + /* Handle the case where we are importing this slot and the ownership changes */ + if (server.cluster->importing_slots_from[j] != NULL && + server.cluster->importing_slots_from[j] != sender) + { + /* Update importing_slots_from to point to the sender, if it is in the + * same shard as the previous slot owner */ + if (areInSameShard(sender, server.cluster->importing_slots_from[j])) { + serverLog(LL_NOTICE, + "Failover occurred in migration source. Update importing " + "source for slot %d to node %.40s (%s) in shard %.40s.", + j, + sender->name, + sender->human_nodename, + sender->shard_id); + server.cluster->importing_slots_from[j] = sender; + } else { + /* If the sender is from a different shard, it must be a result + * of deliberate operator actions. We should clear the importing + * state to conform to the operator's will. */ + serverLog(LL_NOTICE, + "Slot %d is no longer being imported from node %.40s (%s) in shard %.40s.", + j, + server.cluster->importing_slots_from[j]->name, + server.cluster->importing_slots_from[j]->human_nodename, + server.cluster->importing_slots_from[j]->shard_id); + server.cluster->importing_slots_from[j] = NULL; + } + } + clusterDelSlot(j); clusterAddSlot(sender,j); + bitmapClearBit(server.cluster->owner_not_claiming_slot, j); clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| CLUSTER_TODO_UPDATE_STATE| CLUSTER_TODO_FSYNC_CONFIG); } - } else if (server.cluster->slots[j] == sender) { - /* The slot is currently bound to the sender but the sender is no longer - * claiming it. We don't want to unbind the slot yet as it can cause the cluster - * to move to FAIL state and also throw client error. Keeping the slot bound to - * the previous owner will cause a few client side redirects, but won't throw - * any errors. We will keep track of the uncertainty in ownership to avoid - * propagating misinformation about this slot's ownership using UPDATE - * messages. */ - bitmapSetBit(server.cluster->owner_not_claiming_slot, j); + } else { + if (server.cluster->slots[j] == sender) { + /* The slot is currently bound to the sender but the sender is no longer + * claiming it. We don't want to unbind the slot yet as it can cause the cluster + * to move to FAIL state and also throw client error. Keeping the slot bound to + * the previous owner will cause a few client side redirects, but won't throw + * any errors. We will keep track of the uncertainty in ownership to avoid + * propagating misinformation about this slot's ownership using UPDATE + * messages. */ + bitmapSetBit(server.cluster->owner_not_claiming_slot, j); + } + + /* If the sender doesn't claim the slot, check if we are migrating + * any slot to its shard and if there is a primaryship change in + * the shard. Update the migrating_slots_to state to point to the + * sender if it has just taken over the primary role. */ + if (server.cluster->migrating_slots_to[j] != NULL && + server.cluster->migrating_slots_to[j] != sender && + (server.cluster->migrating_slots_to[j]->configEpoch < senderConfigEpoch || + nodeIsSlave(server.cluster->migrating_slots_to[j])) && + areInSameShard(server.cluster->migrating_slots_to[j], sender)) + { + serverLog(LL_NOTICE, + "Failover occurred in migration target." + " Slot %d is now being migrated to node %.40s (%s) in shard %.40s.", + j, + sender->name, + sender->human_nodename, + sender->shard_id); + server.cluster->migrating_slots_to[j] = sender; + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| + CLUSTER_TODO_UPDATE_STATE| + CLUSTER_TODO_FSYNC_CONFIG); + } + + /* If the sender is no longer the owner of the slot, and I am a primary + * and I am still in the process of importing the slot from the sender, + * there are two possibilities: + * + * 1. I could be a replica of the target primary and missed the slot + * finalization step on my primary due to my primary crashing during + * the slot migration process. + * 2. I could be the original primary and missed the slot finalization + * step entirely. + * + * To ensure complete slot coverage in either case, the following steps + * will be taken: + * + * 1. Remove the importing state for the specific slot. + * 2. Finalize the slot's ownership, if I am not already the owner of + * the slot. */ + if (nodeIsMaster(myself) && + server.cluster->importing_slots_from[j] == sender) + { + serverLog(LL_NOTICE, + "Slot %d is no longer being imported from node %.40s (%s) in shard %.40s;" + " Clear my importing source for the slot.", + j, + sender->name, + sender->human_nodename, + sender->shard_id); + server.cluster->importing_slots_from[j] = NULL; + /* Take over the slot ownership if I am not the owner yet*/ + if (server.cluster->slots[j] != myself) { + /* A primary reason why we are here is likely due to my primary crashing during the + * slot finalization process, leading me to become the new primary without + * inheriting the slot ownership, while the source shard continued and relinquished + * theslot to its old primary. Under such circumstances, the node would undergo + * an election and have its config epoch increased with consensus. That said, we + * will still explicitly bump the config epoch here to be consistent with the + * existing practice. + * Nevertheless, there are scenarios where the source shard may have transferred slot + * to a different shard. In these cases, the bumping of the config epoch + * could result in that slot assignment getting reverted. However, we consider + * this as a very rare case and err on the side of being consistent with the current + * practice. */ + clusterDelSlot(j); + clusterAddSlot(myself,j); + clusterBumpConfigEpochWithoutConsensus(); + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| + CLUSTER_TODO_UPDATE_STATE| + CLUSTER_TODO_FSYNC_CONFIG); + } + } } } @@ -2411,40 +2556,66 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION) return; - /* If at least one slot was reassigned from a node to another node - * with a greater configEpoch, it is possible that: - * 1) We are a master left without slots. This means that we were - * failed over and we should turn into a replica of the new - * master. - * 2) We are a slave and our master is left without slots. We need - * to replicate to the new slots owner. */ - if (newmaster && curmaster->numslots == 0 && - (server.cluster_allow_replica_migration || - sender_slots == migrated_our_slots)) { - serverLog(LL_NOTICE, - "Configuration change detected. Reconfiguring myself " - "as a replica of %.40s (%s)", sender->name, sender->human_nodename); - clusterSetMaster(sender); - clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| - CLUSTER_TODO_UPDATE_STATE| - CLUSTER_TODO_FSYNC_CONFIG); - } else if (myself->slaveof && myself->slaveof->slaveof && - /* In some rare case when CLUSTER FAILOVER TAKEOVER is used, it - * can happen that myself is a replica of a replica of myself. If - * this happens, we do nothing to avoid a crash and wait for the - * admin to repair the cluster. */ - myself->slaveof->slaveof != myself) + /* Handle a special case where newmaster is not set but both sender + * and myself own no slots and in the same shard. Set the sender as + * the new primary if my current config epoch is lower than the + * sender's. */ + if (!newmaster && + myself->slaveof != sender && + sender_slots == 0 && + myself->numslots == 0 && + nodeEpoch(myself) < senderConfigEpoch && + areInSameShard(sender, myself)) { - /* Safeguard against sub-replicas. A replica's master can turn itself - * into a replica if its last slot is removed. If no other node takes - * over the slot, there is nothing else to trigger replica migration. */ - serverLog(LL_NOTICE, - "I'm a sub-replica! Reconfiguring myself as a replica of grandmaster %.40s (%s)", - myself->slaveof->slaveof->name, myself->slaveof->slaveof->human_nodename); - clusterSetMaster(myself->slaveof->slaveof); - clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| - CLUSTER_TODO_UPDATE_STATE| - CLUSTER_TODO_FSYNC_CONFIG); + newmaster = sender; + } + + /* If the shard to which this node (myself) belongs loses all of + * its slots, this node should become a replica of the sender if + * one of the following conditions is true: + * + * 1. cluster-allow-replication-migration is enabled + * 2. all the lost slots go to the sender and the sender belongs + * to this node's shard + * + * Note that case 2 can happen in one of the following scenarios: + * 1) we were a primary and the sender was a replica in the same + * shard but just became the primary after a failover + * 2) we were a replica and our primary lost all of its slots to + * the sender who was another replica in the same shard and has + * just become the primary after a failover + * + * It is also possible that the sender is a primary in a different + * shard and our primary just had its last slot migrated to the + * sender. In this case we don't reconfigure ourselves as a replica + * of the sender. */ + if (newmaster && curmaster->numslots == 0) { + if (server.cluster_allow_replica_migration || areInSameShard(sender, myself)) { + serverLog(LL_NOTICE, + "Configuration change detected. Reconfiguring myself " + "as a replica of node %.40s (%s) in shard %.40s", + sender->name, + sender->human_nodename, + sender->shard_id); + /* Don't clear the migrating/importing states if this is a replica that + * just gets promoted to the new primary in the shard. */ + clusterSetMaster(sender, !areInSameShard(sender, myself)); + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| + CLUSTER_TODO_UPDATE_STATE| + CLUSTER_TODO_FSYNC_CONFIG); + } else if ((sender_slots >= migrated_our_slots) && + !areInSameShard(sender, myself)) + { + /* When all our slots are lost to the sender and the sender belongs to + * a different shard, this is likely due to a client triggered slot + * migration. Don't reconfigure this node to migrate to the new shard + * in this case. */ + serverLog(LL_NOTICE, + "My last slot was migrated to node %.40s (%s) in shard %.40s. I am now an empty master.", + sender->name, + sender->human_nodename, + sender->shard_id); + } } else if (dirty_slots_count) { /* If we are here, we received an update message which removed * ownership for certain slots we still have keys about, but still @@ -2453,8 +2624,16 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc * * In order to maintain a consistent state between keys and slots * we need to remove all the keys from the slots we lost. */ - for (j = 0; j < dirty_slots_count; j++) + for (int j = 0; j < dirty_slots_count; j++) { + serverLog(LL_NOTICE, + "Deleting keys in dirty slot %d on node %.40s (%s) in shard %.40s", + dirty_slots[j], + myself->name, + myself->human_nodename, + myself->shard_id + ); delKeysInSlot(dirty_slots[j]); + } } } @@ -2691,49 +2870,38 @@ static clusterNode *getNodeFromLinkAndMsg(clusterLink *link, clusterMsg *hdr) { return sender; } -/* When this function is called, there is a packet to process starting - * at link->rcvbuf. Releasing the buffer is up to the caller, so this - * function should just handle the higher level stuff of processing the - * packet, modifying the cluster state if needed. - * - * The function returns 1 if the link is still valid after the packet - * was processed, otherwise 0 if the link was freed since the packet - * processing lead to some inconsistency error (for instance a PONG - * received from the wrong sender ID). */ -int clusterProcessPacket(clusterLink *link) { +int clusterIsValidPacket(clusterLink *link) { clusterMsg *hdr = (clusterMsg*) link->rcvbuf; uint32_t totlen = ntohl(hdr->totlen); uint16_t type = ntohs(hdr->type); - mstime_t now = mstime(); - if (type < CLUSTERMSG_TYPE_COUNT) - server.cluster->stats_bus_messages_received[type]++; - serverLog(LL_DEBUG,"--- Processing packet of type %s, %lu bytes", - clusterGetMessageTypeString(type), (unsigned long) totlen); + if (type < CLUSTERMSG_TYPE_COUNT) server.cluster->stats_bus_messages_received[type]++; + + serverLog(LL_DEBUG, + "--- Processing packet of type %s, %lu bytes", + clusterGetMessageTypeString(type), + (unsigned long) totlen); /* Perform sanity checks */ - if (totlen < 16) return 1; /* At least signature, version, totlen, count. */ - if (totlen > link->rcvbuf_len) return 1; + if (totlen < 16) return 0; /* At least signature, version, totlen, count. */ + if (totlen > link->rcvbuf_len) return 0; if (ntohs(hdr->ver) != CLUSTER_PROTO_VER) { /* Can't handle messages of different versions. */ - return 1; + return 0; } if (type == server.cluster_drop_packet_filter) { serverLog(LL_WARNING, "Dropping packet that matches debug drop filter"); - return 1; + return 0; } - uint16_t flags = ntohs(hdr->flags); - uint16_t extensions = ntohs(hdr->extensions); - uint64_t senderCurrentEpoch = 0, senderConfigEpoch = 0; uint32_t explen; /* expected length of this packet */ - clusterNode *sender; if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG || type == CLUSTERMSG_TYPE_MEET) { + uint16_t extensions = ntohs(hdr->extensions); uint16_t count = ntohs(hdr->count); explen = sizeof(clusterMsg)-sizeof(union clusterMsgData); @@ -2748,13 +2916,13 @@ int clusterProcessPacket(clusterLink *link) { if (extlen % 8 != 0) { serverLog(LL_WARNING, "Received a %s packet without proper padding (%d bytes)", clusterGetMessageTypeString(type), (int) extlen); - return 1; + return 0; } if ((totlen - explen) < extlen) { serverLog(LL_WARNING, "Received invalid %s packet with extension data that exceeds " "total packet length (%lld)", clusterGetMessageTypeString(type), (unsigned long long) totlen); - return 1; + return 0; } explen += extlen; ext = getNextPingExt(ext); @@ -2789,10 +2957,33 @@ int clusterProcessPacket(clusterLink *link) { if (totlen != explen) { serverLog(LL_WARNING, "Received invalid %s packet of length %lld but expected length %lld", clusterGetMessageTypeString(type), (unsigned long long) totlen, (unsigned long long) explen); - return 1; + return 0; } - sender = getNodeFromLinkAndMsg(link, hdr); + return 1; +} + +/* When this function is called, there is a packet to process starting + * at link->rcvbuf. Releasing the buffer is up to the caller, so this + * function should just handle the higher level stuff of processing the + * packet, modifying the cluster state if needed. + * + * The function returns 1 if the link is still valid after the packet + * was processed, otherwise 0 if the link was freed since the packet + * processing lead to some inconsistency error (for instance a PONG + * received from the wrong sender ID). */ +int clusterProcessPacket(clusterLink *link) { + + /* Validate that the packet is well-formed */ + if (!clusterIsValidPacket(link)) return 1; + + clusterMsg *hdr = (clusterMsg*) link->rcvbuf; + uint16_t type = ntohs(hdr->type); + mstime_t now = mstime(); + + uint16_t flags = ntohs(hdr->flags); + uint64_t senderCurrentEpoch = 0, senderConfigEpoch = 0; + clusterNode *sender = getNodeFromLinkAndMsg(link, hdr); if (sender && (hdr->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA)) { sender->flags |= CLUSTER_NODE_EXTENSIONS_SUPPORTED; @@ -2810,8 +3001,9 @@ int clusterProcessPacket(clusterLink *link) { senderConfigEpoch = ntohu64(hdr->configEpoch); if (senderCurrentEpoch > server.cluster->currentEpoch) server.cluster->currentEpoch = senderCurrentEpoch; - /* Update the sender configEpoch if it is publishing a newer one. */ - if (senderConfigEpoch > sender->configEpoch) { + /* Update the sender configEpoch if it is a primary publishing a newer one. */ + if (!memcmp(hdr->slaveof, CLUSTER_NODE_NULL_NAME, sizeof(hdr->slaveof)) && + senderConfigEpoch > sender->configEpoch) { sender->configEpoch = senderConfigEpoch; clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| CLUSTER_TODO_FSYNC_CONFIG); @@ -2929,8 +3121,11 @@ int clusterProcessPacket(clusterLink *link) { /* If the reply has a non matching node ID we * disconnect this node and set it as not having an associated * address. */ - serverLog(LL_DEBUG,"PONG contains mismatching sender ID. About node %.40s added %d ms ago, having flags %d", + serverLog(LL_DEBUG, + "PONG contains mismatching sender ID. About node %.40s (%s) in shard %.40s added %d ms ago, having flags %d", link->node->name, + link->node->human_nodename, + link->node->shard_id, (int)(now-(link->node->ctime)), link->node->flags); link->node->flags |= CLUSTER_NODE_NOADDR; @@ -2987,9 +3182,13 @@ int clusterProcessPacket(clusterLink *link) { /* Check for role switch: slave -> master or master -> slave. */ if (sender) { - if (!memcmp(hdr->slaveof,CLUSTER_NODE_NULL_NAME, - sizeof(hdr->slaveof))) - { + serverLog(LL_DEBUG, + "node %.40s (%s) announces that it is a %s in shard %.40s", + sender->name, + sender->human_nodename, + !memcmp(hdr->slaveof, CLUSTER_NODE_NULL_NAME, sizeof(hdr->slaveof)) ? "master" : "slave", + sender->shard_id); + if (!memcmp(hdr->slaveof, CLUSTER_NODE_NULL_NAME, sizeof(hdr->slaveof))) { /* Node is a master. */ clusterSetNodeAsMaster(sender); } else { @@ -2998,7 +3197,7 @@ int clusterProcessPacket(clusterLink *link) { if (clusterNodeIsMaster(sender)) { /* Master turned into a slave! Reconfigure the node. */ - if (master && !memcmp(master->shard_id, sender->shard_id, CLUSTER_NAMELEN)) { + if (master && areInSameShard(master, sender)) { /* `sender` was a primary and was in the same shard as `master`, its new primary */ if (sender->configEpoch > senderConfigEpoch) { serverLog(LL_NOTICE, @@ -3010,18 +3209,14 @@ int clusterProcessPacket(clusterLink *link) { (unsigned long long)senderConfigEpoch, (unsigned long long)sender->configEpoch); } else { - /* A failover occurred in the shard where `sender` belongs to and `sender` is no longer - * a primary. Update slot assignment to `master`, which is the new primary in the shard */ - int slots = clusterMoveNodeSlots(sender, master); /* `master` is still a `slave` in this observer node's view; update its role and configEpoch */ clusterSetNodeAsMaster(master); master->configEpoch = senderConfigEpoch; serverLog(LL_NOTICE, "A failover occurred in shard %.40s; node %.40s (%s)" - " lost %d slot(s) to node %.40s (%s) with a config epoch of %llu", + " failed over to node %.40s (%s) with a config epoch of %llu", sender->shard_id, sender->name, sender->human_nodename, - slots, master->name, master->human_nodename, (unsigned long long) master->configEpoch); @@ -3042,6 +3237,7 @@ int clusterProcessPacket(clusterLink *link) { master->shard_id); } } + sender->flags &= ~(CLUSTER_NODE_MASTER| CLUSTER_NODE_MIGRATE_TO); sender->flags |= CLUSTER_NODE_SLAVE; @@ -3055,6 +3251,13 @@ int clusterProcessPacket(clusterLink *link) { if (master && sender->slaveof != master) { if (sender->slaveof) clusterNodeRemoveSlave(sender->slaveof,sender); + serverLog(LL_NOTICE, + "Node %.40s (%s) is now a replica of node %.40s (%s) in shard %.40s", + sender->name, + sender->human_nodename, + master->name, + master->human_nodename, + sender->shard_id); clusterNodeAddSlave(master,sender); sender->slaveof = master; @@ -3083,16 +3286,70 @@ int clusterProcessPacket(clusterLink *link) { if (sender) { sender_master = clusterNodeIsMaster(sender) ? sender : sender->slaveof; if (sender_master) { - dirty_slots = memcmp(sender_master->slots, - hdr->myslots,sizeof(hdr->myslots)) != 0; + dirty_slots = memcmp(sender_master->slots, hdr->myslots, sizeof(hdr->myslots)) != 0; + + /* Force dirty when the sending shard owns no slots so that + * we have a chance to examine and repair slot migrating/importing + * states that involve empty shards. */ + dirty_slots |= sender_master->numslots == 0; } } /* 1) If the sender of the message is a master, and we detected that * the set of slots it claims changed, scan the slots to see if we * need to update our configuration. */ - if (sender && clusterNodeIsMaster(sender) && dirty_slots) - clusterUpdateSlotsConfigWith(sender,senderConfigEpoch,hdr->myslots); + if (sender_master && dirty_slots) + clusterUpdateSlotsConfigWith(sender_master, senderConfigEpoch, hdr->myslots); + + /* Explicitly check for a replication loop before attempting the replication + * chain folding logic. + * + * In some rare case, slot config updates (via either PING/PONG or UPDATE) + * can be delivered out of order as illustrated below. + * + * 1. To keep the discussion simple, let's assume we have 2 shards, shard a + * and shard b. Let's also assume there are two slots in total with shard + * a owning slot 1 and shard b owning slot 2. + * 2. Shard a has two nodes: primary A and replica A*; shard b has primary + * B and replica B*. + * 3. A manual failover was initiated on A* and A* just wins the election. + * 4. A* announces to the world that it now owns slot 1 using PING messages. + * These PING messages are queued in the outgoing buffer to every other + * node in the cluster, namely, A, B, and B*. + * 5. Keep in mind that there is no ordering in the delivery of these PING + * messages. For the stale PING message to appear, we need the following + * events in the exact order as they are laid out. + * a. An old PING message before A* becomes the new primary is still queued + * in A*'s outgoing buffer to A. This later becomes the stale message, + * which says A* is a replica of A. It is followed by A*'s election + * winning announcement PING message. + * b. B or B* processes A's election winning announcement PING message + * and sets slots[1]=A*. + * c. A sends a PING message to B (or B*). Since A hasn't learnt that A* + * wins the election, it claims that it owns slot 1 but with a lower + * epoch than B has on slot 1. This leads to B sending an UPDATE to + * A directly saying A* is the new owner of slot 1 with a higher epoch. + * d. A receives the UPDATE from B and executes clusterUpdateSlotsConfigWith. + * A now realizes that it is a replica of A* hence setting myself->slaveof + * to A*. + * e. Finally, the pre-failover PING message queued up in A*'s outgoing + * buffer to A is delivered and processed, out of order though, to A. + * f. This stale PING message creates the replication loop */ + if (myself->slaveof && + myself->slaveof->slaveof && + myself->slaveof->slaveof != myself) { + /* Safeguard against sub-replicas. A replica's master can turn itself + * into a replica if its last slot is removed. If no other node takes + * over the slot, there is nothing else to trigger replica migration. */ + serverLog(LL_NOTICE, + "I'm a sub-replica! Reconfiguring myself as a replica of %.40s from %.40s", + myself->slaveof->slaveof->name, + myself->slaveof->name); + clusterSetMaster(myself->slaveof->slaveof, 1); + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| + CLUSTER_TODO_UPDATE_STATE| + CLUSTER_TODO_FSYNC_CONFIG); + } /* 2) We also check for the reverse condition, that is, the sender * claims to serve slots we know are served by a master with a @@ -4227,7 +4484,7 @@ void clusterHandleSlaveFailover(void) { * Retry is two times the Timeout. */ auth_timeout = server.cluster_node_timeout*2; - if (auth_timeout < 2000) auth_timeout = 2000; + if (auth_timeout < CLUSTER_OPERATION_TIMEOUT) auth_timeout = CLUSTER_OPERATION_TIMEOUT; auth_retry_time = auth_timeout*2; /* Pre conditions to run the function, that must be met both in case @@ -4235,13 +4492,11 @@ void clusterHandleSlaveFailover(void) { * 1) We are a slave. * 2) Our master is flagged as FAIL, or this is a manual failover. * 3) We don't have the no failover configuration set, and this is - * not a manual failover. - * 4) It is serving slots. */ + * not a manual failover. */ if (clusterNodeIsMaster(myself) || myself->slaveof == NULL || (!nodeFailed(myself->slaveof) && !manual_failover) || - (server.cluster_slave_no_failover && !manual_failover) || - myself->slaveof->numslots == 0) + (server.cluster_slave_no_failover && !manual_failover)) { /* There are no reasons to failover, so we set the reason why we * are returning without failing over to NONE. */ @@ -4486,9 +4741,11 @@ void clusterHandleSlaveMigration(int max_slaves) { (mstime()-target->orphaned_time) > CLUSTER_SLAVE_MIGRATION_DELAY && !(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER)) { - serverLog(LL_NOTICE,"Migrating to orphaned master %.40s", - target->name); - clusterSetMaster(target); + serverLog(LL_NOTICE,"Migrating to orphaned master %.40s (%s) in shard %.40s", + target->name, + target->human_nodename, + target->shard_id); + clusterSetMaster(target, 1); } } @@ -4859,7 +5116,7 @@ void clusterBeforeSleep(void) { if (flags & CLUSTER_TODO_HANDLE_MANUALFAILOVER) { /* Handle manual failover as soon as possible so that won't have a 100ms * as it was handled only in clusterCron */ - if(nodeIsSlave(myself)) { + if (nodeIsSlave(myself)) { clusterHandleManualFailover(); if (!(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER)) clusterHandleSlaveFailover(); @@ -4976,6 +5233,7 @@ int clusterAddSlot(clusterNode *n, int slot) { if (server.cluster->slots[slot]) return C_ERR; clusterNodeSetSlotBit(n,slot); server.cluster->slots[slot] = n; + bitmapClearBit(server.cluster->owner_not_claiming_slot, slot); return C_OK; } @@ -4997,22 +5255,6 @@ int clusterDelSlot(int slot) { return C_OK; } -/* Transfer slots from `from_node` to `to_node`. - * Iterates over all cluster slots, transferring each slot covered by `from_node` to `to_node`. - * Counts and returns the number of slots transferred. */ -int clusterMoveNodeSlots(clusterNode *from_node, clusterNode *to_node) { - int processed = 0; - - for (int j = 0; j < CLUSTER_SLOTS; j++) { - if (clusterNodeCoversSlot(from_node, j)) { - clusterDelSlot(j); - clusterAddSlot(to_node, j); - processed++; - } - } - return processed; -} - /* Delete all the slots associated with the specified node. * The number of deleted slots is returned. */ int clusterDelNodeSlots(clusterNode *node) { @@ -5148,14 +5390,9 @@ void clusterUpdateState(void) { /* This function is called after the node startup in order to verify that data * loaded from disk is in agreement with the cluster configuration: * - * 1) If we find keys about hash slots we have no responsibility for, the - * following happens: - * A) If no other node is in charge according to the current cluster - * configuration, we add these slots to our node. - * B) If according to our config other nodes are already in charge for - * this slots, we set the slots as IMPORTING from our point of view - * in order to justify we have those slots, and in order to make - * valkey-cli aware of the issue, so that it can try to fix it. + * 1) If we find keys about hash slots we have no responsibility for and + * no other node is in charge according to the current cluster + * configuration, we add these slots to our node. * 2) If we find data in a DB different than DB0 we return C_ERR to * signal the caller it should quit the server with an error message * or take other actions. @@ -5200,16 +5437,27 @@ int verifyClusterConfigWithData(void) { * assigned to this slot. Fix this condition. */ update_config++; - /* Case A: slot is unassigned. Take responsibility for it. */ + /* slot is unassigned. Take responsibility for it. */ if (server.cluster->slots[j] == NULL) { serverLog(LL_NOTICE, "I have keys for unassigned slot %d. " "Taking responsibility for it.",j); clusterAddSlot(myself,j); - } else { - serverLog(LL_NOTICE, "I have keys for slot %d, but the slot is " - "assigned to another node. " - "Setting it to importing state.",j); - server.cluster->importing_slots_from[j] = server.cluster->slots[j]; + } else if (server.cluster->importing_slots_from[j] != server.cluster->slots[j]) { + if (server.cluster->importing_slots_from[j] == NULL) { + serverLog(LL_NOTICE, "I have keys for slot %d, but the slot is " + "assigned to another node. Deleting keys in the slot.", j); + } else { + serverLog(LL_NOTICE, "I am importing keys from node %.40s (%s) in shard %.40s to slot %d, " + "but the slot is now owned by node %.40s (%s) in shard %.40s. Deleting keys in the slot", + server.cluster->importing_slots_from[j]->name, + server.cluster->importing_slots_from[j]->human_nodename, + server.cluster->importing_slots_from[j]->shard_id, + j, + server.cluster->slots[j]->name, + server.cluster->slots[j]->human_nodename, + server.cluster->slots[j]->shard_id); + } + delKeysInSlot(j); } } if (update_config) clusterSaveConfigOrDie(1); @@ -5233,18 +5481,18 @@ static inline void removeAllNotOwnedShardChannelSubscriptions(void) { /* Set the specified node 'n' as master for this node. * If this node is currently a master, it is turned into a slave. */ -void clusterSetMaster(clusterNode *n) { +void clusterSetMaster(clusterNode *n, int closeSlots) { serverAssert(n != myself); serverAssert(myself->numslots == 0); if (clusterNodeIsMaster(myself)) { myself->flags &= ~(CLUSTER_NODE_MASTER|CLUSTER_NODE_MIGRATE_TO); myself->flags |= CLUSTER_NODE_SLAVE; - clusterCloseAllSlots(); } else { if (myself->slaveof) clusterNodeRemoveSlave(myself->slaveof,myself); } + if (closeSlots) clusterCloseAllSlots(); myself->slaveof = n; updateShardId(myself, n->shard_id); clusterNodeAddSlave(n,myself); @@ -5350,15 +5598,11 @@ sds clusterGenNodeDescription(client *c, clusterNode *node, int tls_primary) { else ci = sdscatlen(ci,"-",1); - unsigned long long nodeEpoch = node->configEpoch; - if (nodeIsSlave(node) && node->slaveof) { - nodeEpoch = node->slaveof->configEpoch; - } /* Latency from the POV of this node, config epoch, link status */ ci = sdscatfmt(ci," %I %I %U %s", (long long) node->ping_sent, (long long) node->pong_received, - nodeEpoch, + nodeEpoch(node), (node->link || node->flags & CLUSTER_NODE_MYSELF) ? "connected" : "disconnected"); @@ -5731,7 +5975,6 @@ sds genClusterInfoString(void) { sds info = sdsempty(); char *statestr[] = {"ok","fail"}; int slots_assigned = 0, slots_ok = 0, slots_pfail = 0, slots_fail = 0; - uint64_t myepoch; int j; for (j = 0; j < CLUSTER_SLOTS; j++) { @@ -5748,9 +5991,6 @@ sds genClusterInfoString(void) { } } - myepoch = (nodeIsSlave(myself) && myself->slaveof) ? - myself->slaveof->configEpoch : myself->configEpoch; - info = sdscatprintf(info, "cluster_state:%s\r\n" "cluster_slots_assigned:%d\r\n" @@ -5769,7 +6009,7 @@ sds genClusterInfoString(void) { dictSize(server.cluster->nodes), server.cluster->size, (unsigned long long) server.cluster->currentEpoch, - (unsigned long long) myepoch + (unsigned long long) nodeEpoch(myself) ); /* Show stats about messages sent and received. */ @@ -5986,6 +6226,245 @@ char *clusterNodeGetShardId(clusterNode *node) { return node->shard_id; } +int clusterParseSetSlotCommand(client *c, int *slot_out, clusterNode **node_out, int *timeout_out) { + int slot = -1; + clusterNode *n = NULL; + int timeout = 0; + + /* Allow primaries to replicate "CLUSTER SETSLOT" */ + if (!(c->flags & CLIENT_MASTER) && nodeIsSlave(myself)) { + addReplyError(c,"Please use SETSLOT only with masters."); + return 0; + } + + /* Process optional arguments */ + for (int i = 0; i < c->argc;) { + if (!strcasecmp(c->argv[i]->ptr, "timeout")) { + if(i+1 < c->argc) { + timeout = (int)strtol(c->argv[i+1]->ptr, NULL, 10); + decrRefCount(c->argv[i]); + decrRefCount(c->argv[i+1]); + memmove(&c->argv[i], &c->argv[i+2], c->argc-i-2); + c->argc -= 2; + continue; + } + addReplyError(c, "Missing timeout value."); + return 0; + } + i++; + } + + if ((slot = getSlotOrReply(c, c->argv[2])) == -1) return 0; + + if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc >= 5) { + /* Scope the check to primaries only */ + if (nodeIsMaster(myself) && server.cluster->slots[slot] != myself) { + addReplyErrorFormat(c,"I'm not the owner of hash slot %u", slot); + return 0; + } + n = clusterLookupNode(c->argv[4]->ptr, sdslen(c->argv[4]->ptr)); + if (n == NULL) { + addReplyErrorFormat(c,"I don't know about node %s", (char*)c->argv[4]->ptr); + return 0; + } + if (nodeIsSlave(n)) { + addReplyError(c,"Target node is not a master"); + return 0; + } + } else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc >= 5) { + if (server.cluster->slots[slot] == myself) { + addReplyErrorFormat(c, "I'm already the owner of hash slot %u", slot); + return 0; + } + n = clusterLookupNode(c->argv[4]->ptr, sdslen(c->argv[4]->ptr)); + if (n == NULL) { + addReplyErrorFormat(c,"I don't know about node %s", (char*)c->argv[4]->ptr); + return 0; + } + if (nodeIsSlave(n)) { + addReplyError(c,"Target node is not a master"); + return 0; + } + } else if (!strcasecmp(c->argv[3]->ptr,"stable") && c->argc >= 4) { + /* Do nothing */ + } else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc >= 5) { + /* CLUSTER SETSLOT NODE */ + n = clusterLookupNode(c->argv[4]->ptr, sdslen(c->argv[4]->ptr)); + if (!n) { + addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[4]->ptr); + return 0; + } + if (nodeIsSlave(n)) { + addReplyError(c,"Target node is not a master"); + return 0; + } + /* If this hash slot was served by 'myself' before to switch + * make sure there are no longer local keys for this hash slot. */ + if (server.cluster->slots[slot] == myself && n != myself) { + if (countKeysInSlot(slot) != 0) { + addReplyErrorFormat(c, + "Can't assign hashslot %d to a different node " + "while I still hold keys for this hash slot.", slot); + return 0; + } + } + } else { + addReplyError(c, + "Invalid CLUSTER SETSLOT action or number of arguments. Try CLUSTER HELP"); + return 0; + } + + *slot_out = slot; + *node_out = n; + *timeout_out = timeout; + return 1; +} + +void clusterSetSlotCommand(client *c) { + int slot; + int timeout_ms; + clusterNode *n; + + if (!clusterParseSetSlotCommand(c, &slot, &n, &timeout_ms)) return; + + /* Enhance cluster topology change resilience against primary failures by + * replicating SETSLOT before execution. + * + * Cluster topology changes such slot ownership and migrating states must + * be replicated to replicas before applying them to the primary. This + * guarantees that after a command is successfully executed, the new state + * won't be lost due to a primary node failure. The following example + * illustrates how a cluster state can be lost during slot ownership + * finalization: + * + * When finalizing the slot, the target primary node B might send a cluster + * PONG to the source primary node A before the SETSLOT command is replicated + * to replica node B'. If primary node B crashes at this point, B' will be in + * the importing state and the slot will have no owner. + * + * To mitigate this issue, the following order needs to be enforced for slot + * migration finalization such that the replicas finalize the slot ownership + * before the primary: + . * + * 1. Client C issues SETSLOT n NODE B against node B. + * 2. Primary B replicates `SETSLOT n NODE B` to all of its replicas (e.g., B', B''). + * 3. Upon replication completion, primary B executes `SETSLOT n NODE B` and + * returns success to client C. + * 4. The following steps can happen in parallel: + * a. Client C issues `SETSLOT n NODE B` against parimary A. + * b. Primary B gossips its new slot ownership to the cluster (including A, A', etc.). + * + * This ensures that all replicas have the latest topology information, enabling + * a reliable slot ownership transfer even if the primary node went down during + * the process. */ + if (nodeIsMaster(myself) && myself->numslaves != 0 && (c->flags & CLIENT_PREREPL_DONE) == 0) { + forceCommandPropagation(c, PROPAGATE_REPL); + /* We are a primary and this is the first time we see this `SETSLOT` + * command. Force-replicate the command to all of our replicas + * first and only on success will we handle the command. + * Note that + * 1. All replicas are expected to ack the replication within the given timeout + * 2. The repl offset target is set to the master's current repl offset + 1. + * There is no concern of partial replication because replicas always + * ack the repl offset at the command boundary. */ + if (timeout_ms == 0) { + timeout_ms = CLUSTER_OPERATION_TIMEOUT; + } + blockForPreReplication(c, mstime()+timeout_ms, server.master_repl_offset+1, myself->numslaves); + replicationRequestAckFromSlaves(); + return; + } + + /* Slot states have been updated on the replicas (if any). + * Now exuecte the command on the primary. */ + if (!strcasecmp(c->argv[3]->ptr,"migrating")) { + serverLog(LL_NOTICE, + "Migrating slot %d to node %.40s (%s)", + slot, + n->name, + n->human_nodename); + server.cluster->migrating_slots_to[slot] = n; + } else if (!strcasecmp(c->argv[3]->ptr,"importing")) { + serverLog(LL_NOTICE, + "Importing slot %d from node %.40s (%s)", + slot, + n->name, + n->human_nodename); + server.cluster->importing_slots_from[slot] = n; + } else if (!strcasecmp(c->argv[3]->ptr,"stable")) { + /* CLUSTER SETSLOT STABLE */ + serverLog(LL_NOTICE, "Marking slot %d stable", slot); + server.cluster->importing_slots_from[slot] = NULL; + server.cluster->migrating_slots_to[slot] = NULL; + } else if (!strcasecmp(c->argv[3]->ptr,"node")) { + /* CLUSTER SETSLOT NODE */ + serverLog(LL_NOTICE, "Assigning slot %d to node %.40s (%s) in shard %.40s", + slot, + n->name, + n->human_nodename, + n->shard_id); + + /* If this slot is in migrating status but we have no keys + * for it assigning the slot to another node will clear + * the migrating status. */ + if (countKeysInSlot(slot) == 0 && server.cluster->migrating_slots_to[slot]) { + server.cluster->migrating_slots_to[slot] = NULL; + } + + int slot_was_mine = server.cluster->slots[slot] == myself; + clusterDelSlot(slot); + clusterAddSlot(n, slot); + + /* If we are a master left without slots, we should turn into a + * replica of the new master. */ + if (slot_was_mine && + n != myself && + myself->numslots == 0 && + server.cluster_allow_replica_migration) { + serverLog(LL_NOTICE, + "Lost my last slot during slot migration. Reconfiguring myself " + "as a replica of %.40s (%s) in shard %.40s", + n->name, + n->human_nodename, + n->shard_id); + clusterSetMaster(n, 1); + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | + CLUSTER_TODO_UPDATE_STATE | + CLUSTER_TODO_FSYNC_CONFIG); + } + + /* If this node or this node's primary was importing this slot, + * assigning the slot to itself also clears the importing status. */ + if ((n == myself || n == myself->slaveof) && server.cluster->importing_slots_from[slot]) { + server.cluster->importing_slots_from[slot] = NULL; + + /* Only primary broadcasts the updates */ + if (n == myself) { + /* This slot was manually migrated, set this node configEpoch + * to a new epoch so that the new version can be propagated + * by the cluster. + * + * Note that if this ever results in a collision with another + * node getting the same configEpoch, for example because a + * failover happens at the same time we close the slot, the + * configEpoch collision resolution will fix it assigning + * a different epoch to each node. */ + if (clusterBumpConfigEpochWithoutConsensus() == C_OK) { + serverLog(LL_NOTICE, + "ConfigEpoch updated after importing slot %d", + slot); + } + /* After importing this slot, let the other nodes know as + * soon as possible. */ + clusterBroadcastPong(CLUSTER_BROADCAST_ALL); + } + } + } + + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE); + addReply(c,shared.ok); +} + int clusterCommandSpecial(client *c) { if (!strcasecmp(c->argv[1]->ptr,"meet") && (c->argc == 4 || c->argc == 5)) { /* CLUSTER MEET [cport] */ @@ -6096,130 +6575,7 @@ int clusterCommandSpecial(client *c) { /* SETSLOT 10 IMPORTING */ /* SETSLOT 10 STABLE */ /* SETSLOT 10 NODE */ - int slot; - clusterNode *n; - - if (nodeIsSlave(myself)) { - addReplyError(c,"Please use SETSLOT only with masters."); - return 1; - } - - if ((slot = getSlotOrReply(c, c->argv[2])) == -1) return 1; - - if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) { - if (server.cluster->slots[slot] != myself) { - addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot); - return 1; - } - n = clusterLookupNode(c->argv[4]->ptr, sdslen(c->argv[4]->ptr)); - if (n == NULL) { - addReplyErrorFormat(c,"I don't know about node %s", - (char*)c->argv[4]->ptr); - return 1; - } - if (nodeIsSlave(n)) { - addReplyError(c,"Target node is not a master"); - return 1; - } - server.cluster->migrating_slots_to[slot] = n; - } else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) { - if (server.cluster->slots[slot] == myself) { - addReplyErrorFormat(c, - "I'm already the owner of hash slot %u",slot); - return 1; - } - n = clusterLookupNode(c->argv[4]->ptr, sdslen(c->argv[4]->ptr)); - if (n == NULL) { - addReplyErrorFormat(c,"I don't know about node %s", - (char*)c->argv[4]->ptr); - return 1; - } - if (nodeIsSlave(n)) { - addReplyError(c,"Target node is not a master"); - return 1; - } - server.cluster->importing_slots_from[slot] = n; - } else if (!strcasecmp(c->argv[3]->ptr,"stable") && c->argc == 4) { - /* CLUSTER SETSLOT STABLE */ - server.cluster->importing_slots_from[slot] = NULL; - server.cluster->migrating_slots_to[slot] = NULL; - } else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) { - /* CLUSTER SETSLOT NODE */ - n = clusterLookupNode(c->argv[4]->ptr, sdslen(c->argv[4]->ptr)); - if (!n) { - addReplyErrorFormat(c,"Unknown node %s", - (char*)c->argv[4]->ptr); - return 1; - } - if (nodeIsSlave(n)) { - addReplyError(c,"Target node is not a master"); - return 1; - } - /* If this hash slot was served by 'myself' before to switch - * make sure there are no longer local keys for this hash slot. */ - if (server.cluster->slots[slot] == myself && n != myself) { - if (countKeysInSlot(slot) != 0) { - addReplyErrorFormat(c, - "Can't assign hashslot %d to a different node " - "while I still hold keys for this hash slot.", slot); - return 1; - } - } - /* If this slot is in migrating status but we have no keys - * for it assigning the slot to another node will clear - * the migrating status. */ - if (countKeysInSlot(slot) == 0 && - server.cluster->migrating_slots_to[slot]) - server.cluster->migrating_slots_to[slot] = NULL; - - int slot_was_mine = server.cluster->slots[slot] == myself; - clusterDelSlot(slot); - clusterAddSlot(n,slot); - - /* If we are a master left without slots, we should turn into a - * replica of the new master. */ - if (slot_was_mine && - n != myself && - myself->numslots == 0 && - server.cluster_allow_replica_migration) { - serverLog(LL_NOTICE, - "Configuration change detected. Reconfiguring myself " - "as a replica of %.40s (%s)", n->name, n->human_nodename); - clusterSetMaster(n); - clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | - CLUSTER_TODO_UPDATE_STATE | - CLUSTER_TODO_FSYNC_CONFIG); - } - - /* If this node was importing this slot, assigning the slot to - * itself also clears the importing status. */ - if (n == myself && - server.cluster->importing_slots_from[slot]) { - /* This slot was manually migrated, set this node configEpoch - * to a new epoch so that the new version can be propagated - * by the cluster. - * - * Note that if this ever results in a collision with another - * node getting the same configEpoch, for example because a - * failover happens at the same time we close the slot, the - * configEpoch collision resolution will fix it assigning - * a different epoch to each node. */ - if (clusterBumpConfigEpochWithoutConsensus() == C_OK) { - serverLog(LL_NOTICE, - "configEpoch updated after importing slot %d", slot); - } - server.cluster->importing_slots_from[slot] = NULL; - /* After importing this slot, let the other nodes know as - * soon as possible. */ - clusterBroadcastPong(CLUSTER_BROADCAST_ALL); - } - } else { - addReplyError(c, - "Invalid CLUSTER SETSLOT action or number of arguments. Try CLUSTER HELP"); - return 1; - } - clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE); - addReply(c,shared.ok); + clusterSetSlotCommand(c); } else if (!strcasecmp(c->argv[1]->ptr,"bumpepoch") && c->argc == 2) { /* CLUSTER BUMPEPOCH */ int retval = clusterBumpConfigEpochWithoutConsensus(); @@ -6291,7 +6647,8 @@ int clusterCommandSpecial(client *c) { } /* Set the master. */ - clusterSetMaster(n); + clusterSetMaster(n, 1); + clusterBroadcastPong(CLUSTER_BROADCAST_ALL); clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"count-failure-reports") && @@ -6532,3 +6889,41 @@ int clusterAllowFailoverCmd(client *c) { void clusterPromoteSelfToMaster(void) { replicationUnsetMaster(); } + +/* Replicate migrating and importing slot states to all replicas */ +void clusterReplicateOpenSlots(void) +{ + if (!server.cluster_enabled) return; + + int argc = 5; + robj **argv = zmalloc(sizeof(robj*)*argc); + + argv[0] = shared.cluster; + argv[1] = shared.setslot; + + for (int i = 0; i < 2; i++) { + clusterNode **nodes_ptr = NULL; + if (i == 0) { + nodes_ptr = server.cluster->importing_slots_from; + argv[3] = shared.importing; + } else { + nodes_ptr = server.cluster->migrating_slots_to; + argv[3] = shared.migrating; + } + + for (int j = 0; j < CLUSTER_SLOTS; j++) { + if (nodes_ptr[j] == NULL) continue; + + argv[2] = createStringObjectFromLongLongForValue(j); + sds name = sdsnewlen(nodes_ptr[j]->name, sizeof(nodes_ptr[j]->name)); + argv[4] = createObject(OBJ_STRING, name); + + replicationFeedSlaves(0, argv, argc); + + decrRefCount(argv[2]); + decrRefCount(argv[4]); + } + } + + zfree(argv); +} diff --git a/src/cluster_legacy.h b/src/cluster_legacy.h index 95af5d4e24..68bddbbb9a 100644 --- a/src/cluster_legacy.h +++ b/src/cluster_legacy.h @@ -54,6 +54,7 @@ typedef struct clusterLink { #define CLUSTER_NODE_EXTENSIONS_SUPPORTED 1024 /* This node supports extensions. */ #define CLUSTER_NODE_NULL_NAME "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" +#define nodeIsMaster(n) ((n)->flags & CLUSTER_NODE_MASTER) #define nodeIsSlave(n) ((n)->flags & CLUSTER_NODE_SLAVE) #define nodeInHandshake(n) ((n)->flags & CLUSTER_NODE_HANDSHAKE) #define nodeHasAddr(n) (!((n)->flags & CLUSTER_NODE_NOADDR)) diff --git a/src/commands.def b/src/commands.def index bd6ed38153..6afdf34b19 100644 --- a/src/commands.def +++ b/src/commands.def @@ -851,7 +851,9 @@ struct COMMAND_ARG CLUSTER_SET_CONFIG_EPOCH_Args[] = { #ifndef SKIP_CMD_HISTORY_TABLE /* CLUSTER SETSLOT history */ -#define CLUSTER_SETSLOT_History NULL +commandHistory CLUSTER_SETSLOT_History[] = { +{"8.0.0","Added the `TIMEOUT` option."}, +}; #endif #ifndef SKIP_CMD_TIPS_TABLE @@ -876,6 +878,7 @@ struct COMMAND_ARG CLUSTER_SETSLOT_subcommand_Subargs[] = { struct COMMAND_ARG CLUSTER_SETSLOT_Args[] = { {MAKE_ARG("slot",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)}, {MAKE_ARG("subcommand",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,4,NULL),.subargs=CLUSTER_SETSLOT_subcommand_Subargs}, +{MAKE_ARG("timeout",ARG_TYPE_INTEGER,-1,"TIMEOUT",NULL,"8.0.0",CMD_ARG_OPTIONAL,0,NULL),.display_text="timeout"}, }; /********** CLUSTER SHARDS ********************/ @@ -969,7 +972,7 @@ struct COMMAND_STRUCT CLUSTER_Subcommands[] = { {MAKE_CMD("reset","Resets a node.","O(N) where N is the number of known nodes. The command may execute a FLUSHALL as a side effect.","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_RESET_History,0,CLUSTER_RESET_Tips,0,clusterCommand,-2,CMD_ADMIN|CMD_STALE|CMD_NOSCRIPT,0,CLUSTER_RESET_Keyspecs,0,NULL,1),.args=CLUSTER_RESET_Args}, {MAKE_CMD("saveconfig","Forces a node to save the cluster configuration to disk.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SAVECONFIG_History,0,CLUSTER_SAVECONFIG_Tips,0,clusterCommand,2,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SAVECONFIG_Keyspecs,0,NULL,0)}, {MAKE_CMD("set-config-epoch","Sets the configuration epoch for a new node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SET_CONFIG_EPOCH_History,0,CLUSTER_SET_CONFIG_EPOCH_Tips,0,clusterCommand,3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SET_CONFIG_EPOCH_Keyspecs,0,NULL,1),.args=CLUSTER_SET_CONFIG_EPOCH_Args}, -{MAKE_CMD("setslot","Binds a hash slot to a node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SETSLOT_History,0,CLUSTER_SETSLOT_Tips,0,clusterCommand,-4,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SETSLOT_Keyspecs,0,NULL,2),.args=CLUSTER_SETSLOT_Args}, +{MAKE_CMD("setslot","Binds a hash slot to a node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SETSLOT_History,1,CLUSTER_SETSLOT_Tips,0,clusterCommand,-4,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE|CMD_MAY_REPLICATE,0,CLUSTER_SETSLOT_Keyspecs,0,NULL,3),.args=CLUSTER_SETSLOT_Args}, {MAKE_CMD("shards","Returns the mapping of cluster slots to shards.","O(N) where N is the total number of cluster nodes","7.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SHARDS_History,0,CLUSTER_SHARDS_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SHARDS_Keyspecs,0,NULL,0)}, {MAKE_CMD("slaves","Lists the replica nodes of a master node.","O(N) where N is the number of replicas.","3.0.0",CMD_DOC_DEPRECATED,"`CLUSTER REPLICAS`","5.0.0","cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLAVES_History,0,CLUSTER_SLAVES_Tips,1,clusterCommand,3,CMD_ADMIN|CMD_STALE,0,CLUSTER_SLAVES_Keyspecs,0,NULL,1),.args=CLUSTER_SLAVES_Args}, {MAKE_CMD("slots","Returns the mapping of cluster slots to nodes.","O(N) where N is the total number of Cluster nodes","3.0.0",CMD_DOC_DEPRECATED,"`CLUSTER SHARDS`","7.0.0","cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLOTS_History,2,CLUSTER_SLOTS_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SLOTS_Keyspecs,0,NULL,0)}, diff --git a/src/commands/cluster-setslot.json b/src/commands/cluster-setslot.json index d0d48193d3..c3e126074c 100644 --- a/src/commands/cluster-setslot.json +++ b/src/commands/cluster-setslot.json @@ -7,10 +7,17 @@ "arity": -4, "container": "CLUSTER", "function": "clusterCommand", - "command_flags": [ + "history": [ + [ + "8.0.0", + "Added the `TIMEOUT` option." + ] + ], + "command_flags": [ "NO_ASYNC_LOADING", "ADMIN", - "STALE" + "STALE", + "MAY_REPLICATE" ], "arguments": [ { @@ -45,6 +52,14 @@ "token": "STABLE" } ] + }, + { + "name": "timeout", + "display": "timeout", + "type": "integer", + "token": "TIMEOUT", + "optional": true, + "since": "8.0.0" } ], "reply_schema": { diff --git a/src/debug.c b/src/debug.c index 5fa9a70d5e..5327a231ac 100644 --- a/src/debug.c +++ b/src/debug.c @@ -873,8 +873,7 @@ NULL server.aof_flush_sleep = atoi(c->argv[2]->ptr); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"replicate") && c->argc >= 3) { - replicationFeedSlaves(server.slaves, -1, - c->argv + 2, c->argc - 2); + replicationFeedSlaves(-1, c->argv + 2, c->argc - 2); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"error") && c->argc == 3) { sds errstr = sdsnewlen("-",1); diff --git a/src/networking.c b/src/networking.c index d5df47234a..701b904aad 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2086,7 +2086,7 @@ void resetClient(client *c) { c->multibulklen = 0; c->bulklen = -1; c->slot = -1; - c->flags &= ~CLIENT_EXECUTING_COMMAND; + c->flags &= ~(CLIENT_EXECUTING_COMMAND | CLIENT_PREREPL_DONE); /* Make sure the duration has been recorded to some command. */ serverAssert(c->duration == 0); diff --git a/src/rdb.c b/src/rdb.c index 206cca6908..54f0b070cd 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -3310,7 +3310,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin robj *argv[2]; argv[0] = server.lazyfree_lazy_expire ? shared.unlink : shared.del; argv[1] = &keyobj; - replicationFeedSlaves(server.slaves,dbid,argv,2); + replicationFeedSlaves(dbid,argv,2); } sdsfree(key); decrRefCount(val); diff --git a/src/replication.c b/src/replication.c index 028c55716e..258ff789f0 100644 --- a/src/replication.c +++ b/src/replication.c @@ -434,7 +434,7 @@ void feedReplicationBuffer(char *s, size_t len) { * received by our clients in order to create the replication stream. * Instead if the instance is a replica and has sub-replicas attached, we use * replicationFeedStreamFromMasterStream() */ -void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { +void replicationFeedSlaves(int dictid, robj **argv, int argc) { int j, len; char llstr[LONG_STR_SIZE]; @@ -451,7 +451,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { /* If there aren't slaves, and there is no backlog buffer to populate, * we can return ASAP. */ - if (server.repl_backlog == NULL && listLength(slaves) == 0) { + if (server.repl_backlog == NULL && listLength(server.slaves) == 0) { /* We increment the repl_offset anyway, since we use that for tracking AOF fsyncs * even when there's no replication active. This code will not be reached if AOF * is also disabled. */ @@ -460,7 +460,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { } /* We can't have slaves attached and no backlog. */ - serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL)); + serverAssert(!(listLength(server.slaves) != 0 && server.repl_backlog == NULL)); /* Must install write handler for all replicas first before feeding * replication stream. */ @@ -1313,6 +1313,9 @@ int replicaPutOnline(client *slave) { NULL); serverLog(LL_NOTICE,"Synchronization with replica %s succeeded", replicationGetSlaveName(slave)); + + /* Replicate slot being migrated/imported to the new replica */ + clusterReplicateOpenSlots(); return 1; } @@ -3619,8 +3622,8 @@ void unblockClientWaitingReplicas(client *c) { updateStatsOnUnblock(c, 0, 0, 0); } -/* Check if there are clients blocked in WAIT or WAITAOF that can be unblocked - * since we received enough ACKs from slaves. */ +/* Check if there are clients blocked in WAIT, WAITAOF, or WAIT_PREREPL + * that can be unblocked since we received enough ACKs from replicas. */ void processClientsWaitingReplicas(void) { long long last_offset = 0; long long last_aof_offset = 0; @@ -3637,6 +3640,7 @@ void processClientsWaitingReplicas(void) { client *c = ln->value; int is_wait_aof = c->bstate.btype == BLOCKED_WAITAOF; + int is_wait_prerepl = c->bstate.btype == BLOCKED_WAIT_PREREPL; if (is_wait_aof && c->bstate.numlocal && !server.aof_enabled) { addReplyError(c, "WAITAOF cannot be used when numlocal is set but appendonly is disabled."); @@ -3686,6 +3690,8 @@ void processClientsWaitingReplicas(void) { addReplyArrayLen(c, 2); addReplyLongLong(c, numlocal); addReplyLongLong(c, numreplicas); + } else if (is_wait_prerepl) { + c->flags |= CLIENT_PREREPL_DONE; } else { addReplyLongLong(c, numreplicas); } @@ -3788,8 +3794,7 @@ void replicationCron(void) { if (!manual_failover_in_progress) { ping_argv[0] = shared.ping; - replicationFeedSlaves(server.slaves, -1, - ping_argv, 1); + replicationFeedSlaves(-1, ping_argv, 1); } } diff --git a/src/server.c b/src/server.c index 45eeea597a..0e97e3decf 100644 --- a/src/server.c +++ b/src/server.c @@ -1612,7 +1612,7 @@ static void sendGetackToReplicas(void) { argv[0] = shared.replconf; argv[1] = shared.getack; argv[2] = shared.special_asterick; /* Not used argument. */ - replicationFeedSlaves(server.slaves, -1, argv, 3); + replicationFeedSlaves(-1, argv, 3); } extern int ProcessingEventsWhileBlocked; @@ -1999,6 +1999,10 @@ void createSharedObjects(void) { shared.special_asterick = createStringObject("*",1); shared.special_equals = createStringObject("=",1); shared.redacted = makeObjectShared(createStringObject("(redacted)",10)); + shared.cluster = createStringObject("CLUSTER", 7); + shared.setslot = createStringObject("SETSLOT", 7); + shared.importing = createStringObject("IMPORTING", 9); + shared.migrating = createStringObject("MIGRATING", 9); for (j = 0; j < OBJ_SHARED_INTEGERS; j++) { shared.integers[j] = @@ -3314,7 +3318,7 @@ static void propagateNow(int dbid, robj **argv, int argc, int target) { if (server.aof_state != AOF_OFF && target & PROPAGATE_AOF) feedAppendOnlyFile(dbid,argv,argc); if (target & PROPAGATE_REPL) - replicationFeedSlaves(server.slaves,dbid,argv,argc); + replicationFeedSlaves(dbid,argv,argc); } /* Used inside commands to schedule the propagation of additional commands diff --git a/src/server.h b/src/server.h index d71d6276d9..98ac30b7d6 100644 --- a/src/server.h +++ b/src/server.h @@ -402,6 +402,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; #define CLIENT_MODULE_PREVENT_AOF_PROP (1ULL<<48) /* Module client do not want to propagate to AOF */ #define CLIENT_MODULE_PREVENT_REPL_PROP (1ULL<<49) /* Module client do not want to propagate to replica */ #define CLIENT_REPROCESSING_COMMAND (1ULL<<50) /* The client is re-processing the command. */ +#define CLIENT_PREREPL_DONE (1ULL<<51) /* Indicate that pre-replication has been done on the client */ /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ @@ -415,6 +416,7 @@ typedef enum blocking_type { BLOCKED_ZSET, /* BZPOP et al. */ BLOCKED_POSTPONE, /* Blocked by processCommand, re-try processing later. */ BLOCKED_SHUTDOWN, /* SHUTDOWN. */ + BLOCKED_WAIT_PREREPL, /* WAIT for pre-replication and then run the command. */ BLOCKED_NUM, /* Number of blocked states. */ BLOCKED_END /* End of enumeration */ } blocking_type; @@ -1334,7 +1336,7 @@ struct sharedObjectsStruct { *time, *pxat, *absttl, *retrycount, *force, *justid, *entriesread, *lastid, *ping, *setid, *keepttl, *load, *createconsumer, *getack, *special_asterick, *special_equals, *default_username, *redacted, - *ssubscribebulk,*sunsubscribebulk, *smessagebulk, + *ssubscribebulk,*sunsubscribebulk, *smessagebulk, *cluster, *setslot, *importing, *migrating, *select[PROTO_SHARED_SELECT_CMDS], *integers[OBJ_SHARED_INTEGERS], *mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*\r\n" */ @@ -2820,7 +2822,7 @@ ssize_t syncRead(int fd, char *ptr, ssize_t size, long long timeout); ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout); /* Replication */ -void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc); +void replicationFeedSlaves(int dictid, robj **argv, int argc); void replicationFeedStreamFromMasterStream(char *buf, size_t buflen); void resetReplicationBuffer(void); void feedReplicationBuffer(char *buf, size_t len); @@ -3433,7 +3435,9 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo void blockClientShutdown(client *c); void blockPostponeClient(client *c); void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas); +void blockForPreReplication(client *c, mstime_t timeout, long long offset, long numreplicas); void blockForAofFsync(client *c, mstime_t timeout, long long offset, int numlocal, long numreplicas); +void replicationRequestAckFromSlaves(void); void signalDeletedKeyAsReady(serverDb *db, robj *key, int type); void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors); void scanDatabaseForDeletedKeys(serverDb *emptied, serverDb *replaced_with); diff --git a/tests/cluster/tests/20-half-migrated-slot.tcl b/tests/cluster/tests/20-half-migrated-slot.tcl index 8049ca1be4..ede42613be 100644 --- a/tests/cluster/tests/20-half-migrated-slot.tcl +++ b/tests/cluster/tests/20-half-migrated-slot.tcl @@ -5,10 +5,6 @@ # 4. migration is half finished on "migrating" node # 5. migration is half finished on "importing" node -# TODO: Test is currently disabled until it is stabilized (fixing the test -# itself or real issues in the server). - -if {false} { source "../tests/includes/init-tests.tcl" source "../tests/includes/utils.tcl" @@ -95,4 +91,3 @@ test "Half-finish importing" { } config_set_all_nodes cluster-allow-replica-migration yes -} diff --git a/tests/cluster/tests/21-many-slot-migration.tcl b/tests/cluster/tests/21-many-slot-migration.tcl index 703cf58614..40dc498126 100644 --- a/tests/cluster/tests/21-many-slot-migration.tcl +++ b/tests/cluster/tests/21-many-slot-migration.tcl @@ -1,10 +1,5 @@ # Tests for many simultaneous migrations. -# TODO: Test is currently disabled until it is stabilized (fixing the test -# itself or real issues in the server). - -if {false} { - source "../tests/includes/init-tests.tcl" source "../tests/includes/utils.tcl" @@ -61,4 +56,3 @@ test "Keys are accessible" { } config_set_all_nodes cluster-allow-replica-migration yes -} diff --git a/tests/unit/cluster/cli.tcl b/tests/unit/cluster/cli.tcl index d61288bb55..62f0328352 100644 --- a/tests/unit/cluster/cli.tcl +++ b/tests/unit/cluster/cli.tcl @@ -317,13 +317,10 @@ test {Migrate the last slot away from a node using valkey-cli} { catch { $newnode_r get foo } e assert_equal "MOVED $slot $owner_host:$owner_port" $e - # Check that the empty node has turned itself into a replica of the new - # owner and that the new owner knows that. - wait_for_condition 1000 50 { - [string match "*slave*" [$owner_r CLUSTER REPLICAS $owner_id]] - } else { - fail "Empty node didn't turn itself into a replica." - } + # Check that the now empty primary node doesn't turn itself into + # a replica of any other nodes + wait_for_cluster_propagation + assert_match *master* [$owner_r role] } } diff --git a/tests/unit/cluster/hostnames.tcl b/tests/unit/cluster/hostnames.tcl index f08c9cfa84..7be4b42aa3 100644 --- a/tests/unit/cluster/hostnames.tcl +++ b/tests/unit/cluster/hostnames.tcl @@ -146,18 +146,18 @@ test "Verify the nodes configured with prefer hostname only show hostname for ne # to accept our isolated nodes connections. At this point they will # start showing up in cluster slots. wait_for_condition 50 100 { - [llength [R 6 CLUSTER SLOTS]] eq 2 + [llength [R 6 CLUSTER SLOTS]] eq 3 } else { fail "Node did not learn about the 2 shards it can talk to" } wait_for_condition 50 100 { - [lindex [get_slot_field [R 6 CLUSTER SLOTS] 0 2 3] 1] eq "shard-1.com" + [lindex [get_slot_field [R 6 CLUSTER SLOTS] 1 2 3] 1] eq "shard-1.com" } else { fail "hostname for shard-1 didn't reach node 6" } wait_for_condition 50 100 { - [lindex [get_slot_field [R 6 CLUSTER SLOTS] 1 2 3] 1] eq "shard-2.com" + [lindex [get_slot_field [R 6 CLUSTER SLOTS] 2 2 3] 1] eq "shard-2.com" } else { fail "hostname for shard-2 didn't reach node 6" } diff --git a/valkey.conf b/valkey.conf index c7fd020b17..7a3c4458cb 100644 --- a/valkey.conf +++ b/valkey.conf @@ -1672,8 +1672,10 @@ aof-timestamp-enabled no # cluster-migration-barrier 1 # Turning off this option allows to use less automatic cluster configuration. -# It both disables migration to orphaned masters and migration from masters -# that became empty. +# It disables migration of replicas to orphaned masters. Masters that become +# empty due to losing their last slots to another master will not automatically +# replicate from the master that took over their last slots. Instead, they will +# remain as empty masters without any slots. # # Default is 'yes' (allow automatic migrations). #