Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only (re-)send MEET packet once every handshake timeout period #1441

Open
wants to merge 2 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 41 additions & 25 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ void freeClusterLink(clusterLink *link);
int verifyClusterNodeId(const char *name, int length);
sds clusterEncodeOpenSlotsAuxField(int rdbflags);
int clusterDecodeOpenSlotsAuxField(int rdbflags, sds s);
static int nodeExceedsHandshakeTimeout(clusterNode *node, mstime_t now);

/* Only primaries that own slots have voting rights.
* Returns 1 if the node has voting rights, otherwise returns 0. */
Expand Down Expand Up @@ -1336,9 +1337,10 @@ clusterLink *createClusterLink(clusterNode *node) {
* with this link will have the 'link' field set to NULL. */
void freeClusterLink(clusterLink *link) {
serverAssert(link != NULL);
serverLog(LL_DEBUG, "Freeing cluster link for node: %.40s:%s",
serverLog(LL_DEBUG, "Freeing cluster link for node: %.40s:%s (%s)",
link->node ? link->node->name : "<unknown>",
link->inbound ? "inbound" : "outbound");
link->inbound ? "inbound" : "outbound",
link->node ? link->node->human_nodename : "<unknown>");

if (link->conn) {
connClose(link->conn);
Expand Down Expand Up @@ -1492,6 +1494,7 @@ clusterNode *createClusterNode(char *nodename, int flags) {
node->last_in_ping_gossip = 0;
node->ping_sent = node->pong_received = 0;
node->data_received = 0;
node->meet_sent = 0;
node->fail_time = 0;
node->link = NULL;
node->inbound_link = NULL;
Expand Down Expand Up @@ -1703,7 +1706,7 @@ void clusterAddNode(clusterNode *node) {
*/
void clusterDelNode(clusterNode *delnode) {
serverAssert(delnode != NULL);
serverLog(LL_DEBUG, "Deleting node %.40s from cluster view", delnode->name);
serverLog(LL_DEBUG, "Deleting node %.40s (%s) from cluster view", delnode->name, delnode->human_nodename);

int j;
dictIterator *di;
Expand Down Expand Up @@ -3242,18 +3245,18 @@ int clusterProcessPacket(clusterLink *link) {
* the gossip section here since we have to trust the sender because
* of the message type. */
clusterProcessGossipSection(hdr, link);
} else if (sender->link && now - sender->ctime > server.cluster_node_timeout) {
} else if (sender->link && nodeExceedsHandshakeTimeout(sender, now)) {
/* The MEET packet is from a known node, after the handshake timeout, so the sender thinks that I do not
* know it.
* Freeing my outbound link to that node, to force a reconnect and sending a PING.
* Free my outbound link to that node, triggering a reconnect and a PING over the new link.
* Once that node receives our PING, it should recognize the new connection as an inbound link from me.
* We should only free the outbound link if the node is known for more time than the handshake timeout,
* since during this time, the other side might still be trying to complete the handshake. */

/* We should always receive a MEET packet on an inbound link. */
serverAssert(link != sender->link);
serverLog(LL_NOTICE, "Freeing outbound link to node %.40s after receiving a MEET packet from this known node",
sender->name);
serverLog(LL_NOTICE, "Freeing outbound link to node %.40s (%s) after receiving a MEET packet from this known node",
sender->name, sender->human_nodename);
freeClusterLink(sender->link);
}
}
Expand Down Expand Up @@ -4019,7 +4022,12 @@ void clusterSendPing(clusterLink *link, int type) {
clusterMsgSendBlock *msgblock = createClusterMsgSendBlock(type, estlen);
clusterMsg *hdr = &msgblock->msg;

if (!link->inbound && type == CLUSTERMSG_TYPE_PING) link->node->ping_sent = mstime();
if (!link->inbound) {
if (type == CLUSTERMSG_TYPE_PING)
link->node->ping_sent = mstime();
else if (type == CLUSTERMSG_TYPE_MEET)
link->node->meet_sent = mstime();
}

/* Populate the gossip fields */
int maxiterations = wanted * 3;
Expand Down Expand Up @@ -4957,10 +4965,23 @@ void clusterHandleManualFailover(void) {
* CLUSTER cron job
* -------------------------------------------------------------------------- */

/* Return the handshake timeout, in milliseconds.
 *
 * This is the time after which a node still in handshake state (i.e. one
 * that was never promoted to a normal node) gets removed from the nodes
 * table. It normally equals cluster_node_timeout, but is never allowed to
 * drop below a floor of 1 second when cluster_node_timeout is too small. */
static mstime_t getHandshakeTimeout(void) {
    mstime_t timeout = server.cluster_node_timeout;
    if (timeout < 1000) timeout = 1000;
    return timeout;
}

/* Return 1 if the time elapsed between node->ctime and 'now' is strictly
 * greater than the handshake timeout, otherwise return 0.
 * 'node' must not be NULL (asserted). */
static int nodeExceedsHandshakeTimeout(clusterNode *node, mstime_t now) {
    serverAssert(node != NULL);
    mstime_t elapsed = now - node->ctime;
    if (elapsed > getHandshakeTimeout()) return 1;
    return 0;
}

/* Check if the node is disconnected and re-establish the connection.
* Also update a few stats while we are here, that can be used to make
* better decisions in other parts of the code. */
static int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t handshake_timeout, mstime_t now) {
static int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t now) {
/* Not interested in reconnecting the link with myself or nodes
* for which we have no address. */
if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_NOADDR)) return 1;
Expand All @@ -4969,19 +4990,22 @@ static int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t handshake_

/* A Node in HANDSHAKE state has a limited lifespan equal to the
* configured node timeout. */
if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
serverLog(LL_WARNING, "Clusterbus handshake timeout %s:%d after %lldms", node->ip,
node->cport, handshake_timeout);
if (nodeInHandshake(node) && nodeExceedsHandshakeTimeout(node, now)) {
serverLog(LL_WARNING, "Clusterbus handshake timeout %s:%d", node->ip, node->cport);
clusterDelNode(node);
return 1;
}
if (node->link != NULL && node->inbound_link == NULL && nodeInNormalState(node) &&
now - node->inbound_link_freed_time > handshake_timeout) {
if (nodeInNormalState(node) && node->link != NULL && node->inbound_link == NULL &&
now - node->inbound_link_freed_time > getHandshakeTimeout() &&
now - node->meet_sent > getHandshakeTimeout()) {
/* Node has an outbound link, but no inbound link for more than the handshake timeout.
* This probably means this node does not know us yet, whereas we know it.
* So we send it a MEET packet to do a handshake with it and correct the inconsistent cluster view. */
* So we send it a MEET packet to do a handshake with it and correct the inconsistent cluster view.
* We make sure to not re-send a MEET packet more than once every handshake timeout period, so as to
* leave the other node time to complete the handshake. */
node->flags |= CLUSTER_NODE_MEET;
serverLog(LL_NOTICE, "Sending MEET packet to node %.40s because there is no inbound link for it", node->name);
serverLog(LL_NOTICE, "Sending MEET packet to node %.40s (%s) because there is no inbound link for it",
node->name, node->human_nodename);
clusterSendPing(node->link, CLUSTERMSG_TYPE_MEET);
}

Expand Down Expand Up @@ -5042,19 +5066,11 @@ void clusterCron(void) {
mstime_t min_pong = 0, now = mstime();
clusterNode *min_pong_node = NULL;
static unsigned long long iteration = 0;
mstime_t handshake_timeout;

iteration++; /* Number of times this function was called so far. */

clusterUpdateMyselfHostname();

/* The handshake timeout is the time after which a handshake node that was
* not turned into a normal node is removed from the nodes. Usually it is
* just the NODE_TIMEOUT value, but when NODE_TIMEOUT is too small we use
* the value of 1 second. */
handshake_timeout = server.cluster_node_timeout;
if (handshake_timeout < 1000) handshake_timeout = 1000;

/* Clear so clusterNodeCronHandleReconnect can count the number of nodes in PFAIL. */
server.cluster->stats_pfail_nodes = 0;
/* Run through some of the operations we want to do on each cluster node. */
Expand All @@ -5067,7 +5083,7 @@ void clusterCron(void) {
/* The protocol is that function(s) below return non-zero if the node was
* terminated.
*/
if (clusterNodeCronHandleReconnect(node, handshake_timeout, now)) continue;
if (clusterNodeCronHandleReconnect(node, now)) continue;
}
dictReleaseIterator(di);

Expand Down
1 change: 1 addition & 0 deletions src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ struct _clusterNode {
mstime_t ping_sent; /* Unix time we sent latest ping */
mstime_t pong_received; /* Unix time we received the pong */
mstime_t data_received; /* Unix time we received any data */
mstime_t meet_sent; /* Unix time we sent latest meet packet */
mstime_t fail_time; /* Unix time when FAIL flag was set */
mstime_t voted_time; /* Last time we voted for a replica of this primary in non manual
* failover scenarios. */
Expand Down
Loading