diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 21bdd09919..352b4ff4e2 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -122,6 +122,10 @@ int verifyClusterNodeId(const char *name, int length); sds clusterEncodeOpenSlotsAuxField(int rdbflags); int clusterDecodeOpenSlotsAuxField(int rdbflags, sds s); +#define CLUSTER_BROADCAST_ALL 0 +#define CLUSTER_BROADCAST_LOCAL_REPLICAS 1 +void clusterBroadcastPong(int target); + /* Only primaries that own slots have voting rights. * Returns 1 if the node has voting rights, otherwise returns 0. */ static inline int clusterNodeIsVotingPrimary(clusterNode *n) { @@ -1830,6 +1834,7 @@ int clusterBumpConfigEpochWithoutConsensus(void) { if (myself->configEpoch == 0 || myself->configEpoch != maxEpoch) { server.cluster->currentEpoch++; myself->configEpoch = server.cluster->currentEpoch; + clusterBroadcastPong(CLUSTER_BROADCAST_ALL); clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_FSYNC_CONFIG); serverLog(LL_NOTICE, "New configEpoch set to %llu", (unsigned long long)myself->configEpoch); return C_OK; @@ -1880,6 +1885,14 @@ int clusterBumpConfigEpochWithoutConsensus(void) { * with the conflicting epoch (the 'sender' node), it will assign itself * the greatest configuration epoch currently detected among nodes plus 1. * + * The above is an optimistic scenario. It this node and the sender node + * are in the same shard, their conflict in configEpoch indicates that a + * node has experienced a partition. Or for example, the old primary node + * was down then up again, and the new primary node won the election. In + * this case, we need to take the replication offset into consideration, + * otherwise, if the old primary wins the collision, we will lose some of + * the new primary's data. + * * This means that even if there are multiple nodes colliding, the node * with the greatest Node ID never moves forward, so eventually all the nodes * end with a different configuration epoch. @@ -1888,14 +1901,34 @@ void clusterHandleConfigEpochCollision(clusterNode *sender) { /* Prerequisites: nodes have the same configEpoch and are both primaries. */ if (sender->configEpoch != myself->configEpoch || !clusterNodeIsPrimary(sender) || !clusterNodeIsPrimary(myself)) return; - /* Don't act if the colliding node has a smaller Node ID. */ - if (memcmp(sender->name, myself->name, CLUSTER_NAMELEN) <= 0) return; + + /* If sender and myself are in the same shard, the one with the + * bigger offset will win. Otherwise if sender and myself are not + * in the same shard, the one will the lexicographically small + * Node ID will win.*/ + if (areInSameShard(sender, myself)) { + long long sender_offset = getNodeReplicationOffset(sender); + long long myself_offset = getNodeReplicationOffset(myself); + if (sender_offset > myself_offset) { + /* Don't act if the colliding node has a bigger offset. */ + return; + } else if (sender_offset == myself_offset) { + /* If the offset are the same, we fall back to Node ID logic. + * Don't act if the colliding node has a smaller Node ID. */ + if (memcmp(sender->name, myself->name, CLUSTER_NAMELEN) <= 0) return; + } + } else { + /* Don't act if the colliding node has a smaller Node ID. */ + if (memcmp(sender->name, myself->name, CLUSTER_NAMELEN) <= 0) return; + } + /* Get the next ID available at the best of this node knowledge. */ server.cluster->currentEpoch++; myself->configEpoch = server.cluster->currentEpoch; - clusterSaveConfigOrDie(1); + clusterBroadcastPong(CLUSTER_BROADCAST_ALL); serverLog(LL_NOTICE, "configEpoch collision with node %.40s (%s). configEpoch set to %llu", sender->name, sender->human_nodename, (unsigned long long)myself->configEpoch); + clusterSaveConfigOrDie(1); } /* ----------------------------------------------------------------------------- @@ -4001,8 +4034,6 @@ void clusterSendPing(clusterLink *link, int type) { * CLUSTER_BROADCAST_ALL -> All known instances. * CLUSTER_BROADCAST_LOCAL_REPLICAS -> All replicas in my primary-replicas ring. */ -#define CLUSTER_BROADCAST_ALL 0 -#define CLUSTER_BROADCAST_LOCAL_REPLICAS 1 void clusterBroadcastPong(int target) { dictIterator *di; dictEntry *de; diff --git a/tests/unit/cluster/manual-takeover.tcl b/tests/unit/cluster/manual-takeover.tcl index a175f79342..dcbbcd1c59 100644 --- a/tests/unit/cluster/manual-takeover.tcl +++ b/tests/unit/cluster/manual-takeover.tcl @@ -22,8 +22,12 @@ set paused_pid [srv 0 pid] set paused_pid1 [srv -1 pid] set paused_pid2 [srv -2 pid] test "Killing majority of master nodes" { + # Bumping the epochs to increase the chance of conflicts. + R 0 cluster bumpepoch pause_process $paused_pid + R 1 cluster bumpepoch pause_process $paused_pid1 + R 2 cluster bumpepoch pause_process $paused_pid2 }