Skip to content

Commit

Permalink
Only send MEET packet once every handshake timeout period
Browse files Browse the repository at this point in the history
Add meet_sent field in clusterNode indicating the last time we sent a
MEET packet. Use this field to only (re-)send a MEET packet once every
handshake timeout period.
Improve some logging messages to include human_nodename.
Add nodeExceedsHandshakeTimeout() function.
  • Loading branch information
pieturin committed Dec 13, 2024
1 parent 5f7fe9e commit de51f6d
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 25 deletions.
64 changes: 39 additions & 25 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ void freeClusterLink(clusterLink *link);
int verifyClusterNodeId(const char *name, int length);
sds clusterEncodeOpenSlotsAuxField(int rdbflags);
int clusterDecodeOpenSlotsAuxField(int rdbflags, sds s);
static int nodeExceedsHandshakeTimeout(clusterNode *node, mstime_t now);

/* Only primaries that own slots have voting rights.
* Returns 1 if the node has voting rights, otherwise returns 0. */
Expand Down Expand Up @@ -1336,9 +1337,10 @@ clusterLink *createClusterLink(clusterNode *node) {
* with this link will have the 'link' field set to NULL. */
void freeClusterLink(clusterLink *link) {
serverAssert(link != NULL);
serverLog(LL_DEBUG, "Freeing cluster link for node: %.40s:%s",
serverLog(LL_DEBUG, "Freeing cluster link for node: %.40s:%s (%s)",
link->node ? link->node->name : "<unknown>",
link->inbound ? "inbound" : "outbound");
link->inbound ? "inbound" : "outbound",
link->node ? link->node->human_nodename : "<unknown>");

if (link->conn) {
connClose(link->conn);
Expand Down Expand Up @@ -1492,6 +1494,7 @@ clusterNode *createClusterNode(char *nodename, int flags) {
node->last_in_ping_gossip = 0;
node->ping_sent = node->pong_received = 0;
node->data_received = 0;
node->meet_sent = 0;
node->fail_time = 0;
node->link = NULL;
node->inbound_link = NULL;
Expand Down Expand Up @@ -1703,7 +1706,7 @@ void clusterAddNode(clusterNode *node) {
*/
void clusterDelNode(clusterNode *delnode) {
serverAssert(delnode != NULL);
serverLog(LL_DEBUG, "Deleting node %.40s from cluster view", delnode->name);
serverLog(LL_DEBUG, "Deleting node %.40s (%s) from cluster view", delnode->name, delnode->human_nodename);

int j;
dictIterator *di;
Expand Down Expand Up @@ -3242,18 +3245,18 @@ int clusterProcessPacket(clusterLink *link) {
* the gossip section here since we have to trust the sender because
* of the message type. */
clusterProcessGossipSection(hdr, link);
} else if (sender->link && now - sender->ctime > server.cluster_node_timeout) {
} else if (sender->link && nodeExceedsHandshakeTimeout(sender, now)) {
/* The MEET packet is from a known node, after the handshake timeout, so the sender thinks that I do not
* know it.
* Freeing my outbound link to that node, to force a reconnect and sending a PING.
* Free my outbound link to that node, triggering a reconnect and a PING over the new link.
* Once that node receives our PING, it should recognize the new connection as an inbound link from me.
* We should only free the outbound link if the node is known for more time than the handshake timeout,
* since during this time, the other side might still be trying to complete the handshake. */

/* We should always receive a MEET packet on an inbound link. */
serverAssert(link != sender->link);
serverLog(LL_NOTICE, "Freeing outbound link to node %.40s after receiving a MEET packet from this known node",
sender->name);
serverLog(LL_NOTICE, "Freeing outbound link to node %.40s (%s) after receiving a MEET packet from this known node",
sender->name, sender->human_nodename);
freeClusterLink(sender->link);
}
}
Expand Down Expand Up @@ -4019,7 +4022,10 @@ void clusterSendPing(clusterLink *link, int type) {
clusterMsgSendBlock *msgblock = createClusterMsgSendBlock(type, estlen);
clusterMsg *hdr = &msgblock->msg;

if (!link->inbound && type == CLUSTERMSG_TYPE_PING) link->node->ping_sent = mstime();
if (!link->inbound) {
if (type == CLUSTERMSG_TYPE_PING) link->node->ping_sent = mstime();
else if (type == CLUSTERMSG_TYPE_MEET) link->node->meet_sent = mstime();
}

/* Populate the gossip fields */
int maxiterations = wanted * 3;
Expand Down Expand Up @@ -4957,10 +4963,23 @@ void clusterHandleManualFailover(void) {
* CLUSTER cron job
* -------------------------------------------------------------------------- */

static mstime_t getHandshakeTimeout(void) {
/* The handshake timeout is the time after which a handshake node that was
* not turned into a normal node is removed from the nodes. Usually it is
* just the cluster_node_timeout value, but when cluster_node_timeout is
* too small we use the value of 1 second. */
return max(server.cluster_node_timeout, 1000);
}

static int nodeExceedsHandshakeTimeout(clusterNode *node, mstime_t now) {
serverAssert(node != NULL);
return now - node->ctime > getHandshakeTimeout() ? 1 : 0;
}

/* Check if the node is disconnected and re-establish the connection.
* Also update a few stats while we are here, that can be used to make
* better decisions in other part of the code. */
static int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t handshake_timeout, mstime_t now) {
static int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t now) {
/* Not interested in reconnecting the link with myself or nodes
* for which we have no address. */
if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_NOADDR)) return 1;
Expand All @@ -4969,19 +4988,22 @@ static int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t handshake_

/* A Node in HANDSHAKE state has a limited lifespan equal to the
* configured node timeout. */
if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
serverLog(LL_WARNING, "Clusterbus handshake timeout %s:%d after %lldms", node->ip,
node->cport, handshake_timeout);
if (nodeInHandshake(node) && nodeExceedsHandshakeTimeout(node, now)) {
serverLog(LL_WARNING, "Clusterbus handshake timeout %s:%d", node->ip, node->cport);
clusterDelNode(node);
return 1;
}
if (node->link != NULL && node->inbound_link == NULL && nodeInNormalState(node) &&
now - node->inbound_link_freed_time > handshake_timeout) {
if (nodeInNormalState(node) && node->link != NULL && node->inbound_link == NULL &&
now - node->inbound_link_freed_time > getHandshakeTimeout() &&
now - node->meet_sent > getHandshakeTimeout()) {
/* Node has an outbound link, but no inbound link for more than the handshake timeout.
* This probably means this node does not know us yet, whereas we know it.
* So we send it a MEET packet to do a handshake with it and correct the inconsistent cluster view. */
* So we send it a MEET packet to do a handshake with it and correct the inconsistent cluster view.
* We make sure to not re-send a MEET packet more than once every handshake timeout period, so as to
* leave the other node time to complete the handshake. */
node->flags |= CLUSTER_NODE_MEET;
serverLog(LL_NOTICE, "Sending MEET packet to node %.40s because there is no inbound link for it", node->name);
serverLog(LL_NOTICE, "Sending MEET packet to node %.40s (%s) because there is no inbound link for it",
node->name, node->human_nodename);
clusterSendPing(node->link, CLUSTERMSG_TYPE_MEET);
}

Expand Down Expand Up @@ -5042,19 +5064,11 @@ void clusterCron(void) {
mstime_t min_pong = 0, now = mstime();
clusterNode *min_pong_node = NULL;
static unsigned long long iteration = 0;
mstime_t handshake_timeout;

iteration++; /* Number of times this function was called so far. */

clusterUpdateMyselfHostname();

/* The handshake timeout is the time after which a handshake node that was
* not turned into a normal node is removed from the nodes. Usually it is
* just the NODE_TIMEOUT value, but when NODE_TIMEOUT is too small we use
* the value of 1 second. */
handshake_timeout = server.cluster_node_timeout;
if (handshake_timeout < 1000) handshake_timeout = 1000;

/* Clear so clusterNodeCronHandleReconnect can count the number of nodes in PFAIL. */
server.cluster->stats_pfail_nodes = 0;
/* Run through some of the operations we want to do on each cluster node. */
Expand All @@ -5067,7 +5081,7 @@ void clusterCron(void) {
/* The protocol is that function(s) below return non-zero if the node was
* terminated.
*/
if (clusterNodeCronHandleReconnect(node, handshake_timeout, now)) continue;
if (clusterNodeCronHandleReconnect(node, now)) continue;
}
dictReleaseIterator(di);

Expand Down
1 change: 1 addition & 0 deletions src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ struct _clusterNode {
mstime_t ping_sent; /* Unix time we sent latest ping */
mstime_t pong_received; /* Unix time we received the pong */
mstime_t data_received; /* Unix time we received any data */
mstime_t meet_sent; /* Unix time we sent latest meet packet */
mstime_t fail_time; /* Unix time when FAIL flag was set */
mstime_t voted_time; /* Last time we voted for a replica of this primary in non manual
* failover scenarios. */
Expand Down

0 comments on commit de51f6d

Please sign in to comment.