Skip to content

Commit

Permalink
Add flag for creating only light header for hoomogeneous cluster
Browse files Browse the repository at this point in the history
Signed-off-by: Roshan Khatri <[email protected]>
  • Loading branch information
roshkhatri committed Jul 11, 2024
1 parent fc4dd81 commit 125ffb7
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 18 deletions.
57 changes: 39 additions & 18 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,7 @@ void clusterInit(void) {
server.cluster->failover_auth_epoch = 0;
server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
server.cluster->lastVoteEpoch = 0;
server.cluster->is_light_hdr_supported = 1;

/* Initialize stats */
for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
Expand Down Expand Up @@ -3607,11 +3608,12 @@ void clusterBroadcastMessage(clusterMsgSendBlock *msgblock) {
uint16_t type = ntohs(msgblock->msg.type);
int is_msg_type_publish = (type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD ||
type == CLUSTERMSG_TYPE_PUBLISH_LIGHT || type == CLUSTERMSG_TYPE_PUBLISHSHARD_LIGHT);
int nodes_not_supporting_light_header = 0;

di = dictGetSafeIterator(server.cluster->nodes);
while ((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);

if (!nodeSupportsLightMsgHdr(node)) nodes_not_supporting_light_header++;
if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue;
if (!is_msg_type_publish) {
clusterSendMessage(node->link, msgblock);
Expand All @@ -3620,6 +3622,12 @@ void clusterBroadcastMessage(clusterMsgSendBlock *msgblock) {
}
}
dictReleaseIterator(di);

if (nodes_not_supporting_light_header) {
server.cluster->is_light_hdr_supported = 0;
} else {
server.cluster->is_light_hdr_supported = 1;
}
}

/* Build the message header. hdr must point to a buffer at least
Expand Down Expand Up @@ -4056,10 +4064,12 @@ void clusterPropagatePublish(robj *channel, robj **messages, int count, int shar
if (!sharded) {
clusterBroadcastMessage(msgblock_light);
clusterMsgSendBlockDecrRefCount(msgblock_light);
for (i = 0; i < count; i++) {
msgblock = clusterCreatePublishMsgBlock(channel, messages[i], CLUSTERMSG_TYPE_PUBLISH);
clusterBroadcastMessage(msgblock);
clusterMsgSendBlockDecrRefCount(msgblock);
if (!server.cluster->is_light_hdr_supported) {
for (i = 0; i < count; i++) {
msgblock = clusterCreatePublishMsgBlock(channel, messages[i], CLUSTERMSG_TYPE_PUBLISH);
clusterBroadcastMessage(msgblock);
clusterMsgSendBlockDecrRefCount(msgblock);
}
}
return;
}
Expand All @@ -4070,28 +4080,39 @@ void clusterPropagatePublish(robj *channel, robj **messages, int count, int shar
list *nodes_for_slot = clusterGetNodesInMyShard(server.cluster->myself);
serverAssert(nodes_for_slot != NULL);
listRewind(nodes_for_slot, &li);
int nodes_not_supporting_light_header = 0;
while ((ln = listNext(&li))) {
node = listNodeValue(ln);
if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue;
if (nodeSupportsLightMsgHdr(node))
if (nodeSupportsLightMsgHdr(node)) {
clusterSendMessage(node->link, msgblock_light);
else
} else {
nodes_not_supporting_light_header ++;
continue;
}
}
clusterMsgSendBlockDecrRefCount(msgblock_light);

for (i = 0; i < count; i++) {
listRewind(nodes_for_slot, &li);
msgblock = clusterCreatePublishMsgBlock(channel, messages[i], CLUSTERMSG_TYPE_PUBLISHSHARD);
while ((ln = listNext(&li))) {
node = listNodeValue(ln);
if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue;
if (nodeSupportsLightMsgHdr(node))
continue;
else
clusterSendMessage(node->link, msgblock);
if (nodes_not_supporting_light_header) {
server.cluster->is_light_hdr_supported = 0;
} else {
server.cluster->is_light_hdr_supported = 1;
}

if (!server.cluster->is_light_hdr_supported) {
for (i = 0; i < count; i++) {
listRewind(nodes_for_slot, &li);
msgblock = clusterCreatePublishMsgBlock(channel, messages[i], CLUSTERMSG_TYPE_PUBLISHSHARD);
while ((ln = listNext(&li))) {
node = listNodeValue(ln);
if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue;
if (nodeSupportsLightMsgHdr(node))
continue;
else
clusterSendMessage(node->link, msgblock);
}
clusterMsgSendBlockDecrRefCount(msgblock);
}
clusterMsgSendBlockDecrRefCount(msgblock);
}
}

Expand Down
1 change: 1 addition & 0 deletions src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ struct clusterState {
clusterNode *migrating_slots_to[CLUSTER_SLOTS];
clusterNode *importing_slots_from[CLUSTER_SLOTS];
clusterNode *slots[CLUSTER_SLOTS];
int is_light_hdr_supported;
/* The following fields are used to take the replica state on elections. */
mstime_t failover_auth_time; /* Time of previous or next election. */
int failover_auth_count; /* Number of votes received so far. */
Expand Down

0 comments on commit 125ffb7

Please sign in to comment.