Skip to content

Commit

Permalink
more changes1
Browse files Browse the repository at this point in the history
Signed-off-by: Roshan Khatri <[email protected]>
  • Loading branch information
roshkhatri committed Jul 3, 2024
1 parent 5373938 commit 9e42ada
Showing 1 changed file with 19 additions and 51 deletions.
70 changes: 19 additions & 51 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -3595,23 +3595,16 @@ void clusterSendMessage(clusterLink *link, clusterMsgSendBlock *msgblock) {
if (type < CLUSTERMSG_TYPE_COUNT) server.cluster->stats_bus_messages_sent[type]++;
}

void clusterSendPublishMessage(clusterLink *link, clusterMsgSendBlockLight *msgblock) {
if (!link) {
return;
/* Helper function to send message to node depending on
* the header type supported by the node. */
void clusterSendPublishMessage(clusterNode *node, int type, clusterMsgSendBlock *msgblock) {
if ((type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD) & !nodeSupportsLightMsgHdr(node)) {
clusterSendMessage(node->link, msgblock);
} else if ((type == CLUSTERMSG_TYPE_PUBLISH_LIGHT || type == CLUSTERMSG_TYPE_PUBLISHSHARD_LIGHT) &
nodeSupportsLightMsgHdr(node)) {
clusterSendMessage(node->link, msgblock);
}
if (listLength(link->send_msg_queue) == 0 && msgblock->msg.totlen != 0)
connSetWriteHandlerWithBarrier(link->conn, clusterWriteHandler, 1);

listAddNodeTail(link->send_msg_queue, msgblock);
msgblock->refcount++;

/* Update memory tracking */
link->send_msg_queue_mem += sizeof(listNode) + msgblock->totlen;
server.stat_cluster_links_memory += sizeof(listNode);

/* Populate sent messages stats. */
uint16_t type = ntohs(msgblock->msg.type);
if (type < CLUSTERMSG_TYPE_COUNT) server.cluster->stats_bus_messages_sent[type]++;
return;
}

/* Send a message to all the nodes that are part of the cluster having
Expand All @@ -3624,49 +3617,24 @@ void clusterBroadcastMessage(clusterMsgSendBlock *msgblock) {
dictIterator *di;
dictEntry *de;

di = dictGetSafeIterator(server.cluster->nodes);
while ((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);

if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue;
clusterSendMessage(node->link, msgblock);
}
dictReleaseIterator(di);
}

void clusterBroadcastPublishMessage(clusterMsgSendBlock *msgblock) {
dictIterator *di;
dictEntry *de;
uint16_t type = ntohs(msgblock->msg.type);
int is_msg_type_publish = (type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD ||
type == CLUSTERMSG_TYPE_PUBLISH_LIGHT || type == CLUSTERMSG_TYPE_PUBLISHSHARD_LIGHT);

di = dictGetSafeIterator(server.cluster->nodes);
while ((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);

if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue;
if (nodeSupportsLightMsgHdr(node))
continue;
else
if (!is_msg_type_publish) {
clusterSendMessage(node->link, msgblock);
} else {
clusterSendPublishMessage(node, type, msgblock);
}
}
dictReleaseIterator(di);
}

void clusterBroadcastPublishLightMessage(clusterMsgSendBlockLight *msgblock_light) {
dictIterator *di;
dictEntry *de;

di = dictGetSafeIterator(server.cluster->nodes);
while ((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);

if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue;
if (nodeSupportsLightMsgHdr(node))
clusterSendPublishMessage(node->link, msgblock_light);
else
continue;
}
dictReleaseIterator(di);
}
/* Build the message header. hdr must point to a buffer at least
* sizeof(clusterMsg) in bytes. */
static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen) {
Expand Down Expand Up @@ -4100,11 +4068,11 @@ void clusterPropagatePublish(robj *channel, robj **messages, int count, int shar
msgblock_light = clusterCreatePublishLightMsgBlock(
channel, messages, count, sharded ? CLUSTERMSG_TYPE_PUBLISHSHARD_LIGHT : CLUSTERMSG_TYPE_PUBLISH_LIGHT);
if (!sharded) {
clusterBroadcastPublishLightMessage(msgblock_light);
clusterBroadcastMessage((clusterMsgSendBlock *)msgblock_light);
clusterMsgSendBlockDecrRefCount(msgblock_light);
for (i = 0; i < count; i++) {
msgblock = clusterCreatePublishMsgBlock(channel, messages[i], CLUSTERMSG_TYPE_PUBLISH);
clusterBroadcastPublishMessage(msgblock);
clusterBroadcastMessage(msgblock);
clusterMsgSendBlockDecrRefCount(msgblock);
}
return;
Expand All @@ -4120,7 +4088,7 @@ void clusterPropagatePublish(robj *channel, robj **messages, int count, int shar
node = listNodeValue(ln);
if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue;
if (nodeSupportsLightMsgHdr(node))
clusterSendPublishMessage(node->link, msgblock_light);
clusterSendMessage(node->link, (clusterMsgSendBlock *)msgblock_light);
else
continue;
}
Expand Down

0 comments on commit 9e42ada

Please sign in to comment.