From 1ebd796b2bd1c090e8fb6677c954b02ad40c8931 Mon Sep 17 00:00:00 2001 From: Roshan Khatri Date: Mon, 1 Jul 2024 01:27:47 +0000 Subject: [PATCH] Fix format Signed-off-by: Roshan Khatri --- src/cluster_legacy.c | 64 +++++++++++++++++++++++++------------------- src/cluster_legacy.h | 18 ++++++------- src/pubsub.c | 4 +-- 3 files changed, 47 insertions(+), 39 deletions(-) diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index a7cb74ee2c..921cc39ab7 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -2784,7 +2784,8 @@ static clusterMsgDataPublishMessage *getInitialBulkData(clusterMsgDataPublishMul } static clusterMsgDataPublishMessage *getNextCursorBulkData(clusterMsgDataPublishMessage *message_cursor) { - clusterMsgDataPublishMessage *next = (clusterMsgDataPublishMessage *)((char *)message_cursor->message_data + ntohl(message_cursor->message_len)); + clusterMsgDataPublishMessage *next = + (clusterMsgDataPublishMessage *)((char *)message_cursor->message_data + ntohl(message_cursor->message_len)); return next; } @@ -2813,17 +2814,14 @@ int pubsubProcessLightPacket(clusterLink *link, uint16_t type) { uint64_t data_count; /* Don't bother creating useless objects if there are no - * Pub/Sub subscribers. */ + * Pub/Sub subscribers. */ if ((type == CLUSTERMSG_TYPE_PUBLISH_LIGHT && serverPubsubSubscriptionCount() > 0) || (type == CLUSTERMSG_TYPE_PUBLISHSHARD_LIGHT && serverPubsubShardSubscriptionCount() > 0)) { data_count = ntohu64(hdr->data.publish.msg.data_count); clusterMsgDataPublishMessage *cursor = getInitialBulkData(&hdr->data.publish.msg); channel = readBulkDataFromCursor(cursor); - while(--data_count){ + while (--data_count) { cursor = getNextCursorBulkData(cursor); - if (type == CLUSTERMSG_TYPE_PUBLISHSHARD_LIGHT) { - serverLog(LL_NOTICE, "type == CLUSTERMSG_TYPE_PUBLISHSHARD_LIGHT"); - } pubsubPublishMessage(channel, readBulkDataFromCursor(cursor), type == CLUSTERMSG_TYPE_PUBLISHSHARD_LIGHT); } decrRefCount(channel); @@ -2901,7 +2899,7 @@ int clusterIsValidPacket(clusterLink *link) { uint64_t data_count = ntohu64(hdr_pubsub->data.publish.msg.data_count); explen += ((data_count) * (sizeof(clusterMsgDataPublishMessage) - 8)); clusterMsgDataPublishMessage *msg_data = getInitialBulkData(&hdr_pubsub->data.publish.msg); - while(data_count--){ + while (data_count--) { uint32_t msglen = getPublishMsgLength(msg_data); explen += msglen; msg_data = getNextCursorBulkData(msg_data); @@ -2952,7 +2950,8 @@ int clusterProcessPacket(clusterLink *link) { * because of Pub/Sub. */ if (sender) sender->data_received = now; - if (sender && (type == CLUSTERMSG_TYPE_PUBLISH_LIGHT || type == CLUSTERMSG_TYPE_PUBLISHSHARD_LIGHT) && nodeSupportsLightMsgHdr(sender)) { + if (sender && (type == CLUSTERMSG_TYPE_PUBLISH_LIGHT || type == CLUSTERMSG_TYPE_PUBLISHSHARD_LIGHT) && + nodeSupportsLightMsgHdr(sender)) { return pubsubProcessLightPacket(link, type); } @@ -3514,9 +3513,10 @@ void clusterReadHandler(connection *conn) { /* Perform some sanity check on the message signature * and length. */ if (memcmp(hdr->sig, "RCmb", 4) != 0 || ntohl(hdr->totlen) < CLUSTERMSG_MIN_LEN) { - /* The minimum length for PUBLISH and PUBLISHSHARD will be clusterMsgLight_MIN_LEN + /* The minimum length for PUBLISH and PUBLISHSHARD will be clusterMsgLight_MIN_LEN * as we are using the `clusterMsgLight` hdr. */ - if ((type == CLUSTERMSG_TYPE_PUBLISH_LIGHT || type == CLUSTERMSG_TYPE_PUBLISHSHARD_LIGHT) && ((ntohl(hdr->totlen) >= CLUSTERMSG_LIGHT_MIN_LEN))) { + if ((type == CLUSTERMSG_TYPE_PUBLISH_LIGHT || type == CLUSTERMSG_TYPE_PUBLISHSHARD_LIGHT) && + ((ntohl(hdr->totlen) >= CLUSTERMSG_LIGHT_MIN_LEN))) { is_msg_valid = 1; } if (!is_msg_valid) { @@ -3524,12 +3524,12 @@ void clusterReadHandler(connection *conn) { int port; if (connAddrPeerName(conn, ip, sizeof(ip), &port) == -1) { serverLog(LL_WARNING, "Bad message length or signature received " - "on the Cluster bus."); + "on the Cluster bus."); } else { serverLog(LL_WARNING, - "Bad message length or signature received " - "on the Cluster bus from %s:%d", - ip, port); + "Bad message length or signature received " + "on the Cluster bus from %s:%d", + ip, port); } handleLinkIOError(link); return; @@ -3655,8 +3655,10 @@ void clusterBroadcastPublishMessage(clusterMsgSendBlock *msgblock) { clusterNode *node = dictGetVal(de); if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue; - if (nodeSupportsLightMsgHdr(node)) continue; - else clusterSendMessage(node->link, msgblock); + if (nodeSupportsLightMsgHdr(node)) + continue; + else + clusterSendMessage(node->link, msgblock); } dictReleaseIterator(di); } @@ -3670,8 +3672,10 @@ void clusterBroadcastPublishLightMessage(clusterMsgSendBlockLight *msgblock_ligh clusterNode *node = dictGetVal(de); if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue; - if (nodeSupportsLightMsgHdr(node)) clusterSendPublishMessage(node->link, msgblock_light); - else continue; + if (nodeSupportsLightMsgHdr(node)) + clusterSendPublishMessage(node->link, msgblock_light); + else + continue; } dictReleaseIterator(di); } @@ -3739,7 +3743,6 @@ static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen) { } static void clusterBuildPubsubMessageHdr(clusterMsgLight *hdr, int type, size_t msglen) { - hdr->ver = htons(CLUSTER_PROTO_VER); hdr->sig[0] = 'R'; hdr->sig[1] = 'C'; @@ -3993,7 +3996,7 @@ clusterMsgSendBlockLight *clusterCreatePublishLightMsgBlock(robj *channel, robj channel = getDecodedObject(channel); channel_len = sdslen(channel->ptr); - uint32_t aggregated_msg_len = 0 ; + uint32_t aggregated_msg_len = 0; aggregated_msg_len += channel_len; for (i = 0; i < count; i++) { messages[i] = getDecodedObject(messages[i]); @@ -4001,14 +4004,14 @@ clusterMsgSendBlockLight *clusterCreatePublishLightMsgBlock(robj *channel, robj aggregated_msg_len += message_len; } - size_t msglen = sizeof(clusterMsgLight) - sizeof(union clusterMsgDataLight) ; + size_t msglen = sizeof(clusterMsgLight) - sizeof(union clusterMsgDataLight); msglen += sizeof(clusterMsgDataPublishMulti); - msglen += ((count+1) * (sizeof(clusterMsgDataPublishMessage) - 8)); + msglen += ((count + 1) * (sizeof(clusterMsgDataPublishMessage) - 8)); msglen += aggregated_msg_len; clusterMsgSendBlockLight *msgblock = createPubsubClusterMsgSendBlock(type, msglen); clusterMsgLight *hdr = &msgblock->msg; - hdr->data.publish.msg.data_count = htonu64(count+1); + hdr->data.publish.msg.data_count = htonu64(count + 1); clusterMsgDataPublishMessage *cursor = getInitialBulkData(&hdr->data.publish.msg); writeDataToCursor(cursor, channel); for (i = 0; i < count; i++) { @@ -4118,7 +4121,8 @@ void clusterPropagatePublish(robj *channel, robj **message, int count, int shard clusterMsgSendBlockLight *msgblock_light; clusterMsgSendBlock *msgblock; int i; - msgblock_light = clusterCreatePublishLightMsgBlock(channel, message, count, sharded ? CLUSTERMSG_TYPE_PUBLISHSHARD_LIGHT : CLUSTERMSG_TYPE_PUBLISH_LIGHT); + msgblock_light = clusterCreatePublishLightMsgBlock( + channel, message, count, sharded ? CLUSTERMSG_TYPE_PUBLISHSHARD_LIGHT : CLUSTERMSG_TYPE_PUBLISH_LIGHT); if (!sharded) { clusterBroadcastPublishLightMessage(msgblock_light); clusterMsgSendBlockDecrRefCountPublish(msgblock_light); @@ -4139,8 +4143,10 @@ void clusterPropagatePublish(robj *channel, robj **message, int count, int shard while ((ln = listNext(&li))) { node = listNodeValue(ln); if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue; - if (nodeSupportsLightMsgHdr(node)) clusterSendPublishMessage(node->link, msgblock_light); - else continue; + if (nodeSupportsLightMsgHdr(node)) + clusterSendPublishMessage(node->link, msgblock_light); + else + continue; } clusterMsgSendBlockDecrRefCountPublish(msgblock_light); @@ -4150,8 +4156,10 @@ void clusterPropagatePublish(robj *channel, robj **message, int count, int shard while ((ln = listNext(&li))) { node = listNodeValue(ln); if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue; - if (nodeSupportsLightMsgHdr(node)) continue; - else clusterSendMessage(node->link, msgblock); + if (nodeSupportsLightMsgHdr(node)) + continue; + else + clusterSendMessage(node->link, msgblock); } clusterMsgSendBlockDecrRefCount(msgblock); } diff --git a/src/cluster_legacy.h b/src/cluster_legacy.h index ea3347ba38..66102b296d 100644 --- a/src/cluster_legacy.h +++ b/src/cluster_legacy.h @@ -299,19 +299,19 @@ static_assert(offsetof(clusterMsg, data) == 2256, "unexpected field offset"); #define CLUSTERMSG_FLAG0_EXT_DATA (1 << 2) /* Message contains extension data */ typedef struct { - char sig[4]; /* Signature "RCmb" (Cluster message bus). */ - uint32_t totlen; /* Total length of this message */ - uint16_t ver; /* Protocol version, currently set to CLUSTER_PROTO_VER. */ - uint16_t port; /* Primary port number (TCP or TLS). */ - uint16_t type; /* Message type */ + char sig[4]; /* Signature "RCmb" (Cluster message bus). */ + uint32_t totlen; /* Total length of this message */ + uint16_t ver; /* Protocol version, currently set to CLUSTER_PROTO_VER. */ + uint16_t port; /* Primary port number (TCP or TLS). */ + uint16_t type; /* Message type */ union clusterMsgDataLight data; } clusterMsgLight; static_assert(offsetof(clusterMsgLight, sig) == offsetof(clusterMsg, sig), "unexpected field offset"); -static_assert(offsetof(clusterMsgLight, totlen) == offsetof(clusterMsg, totlen),"unexpected field offset"); -static_assert(offsetof(clusterMsgLight, ver) == offsetof(clusterMsg, ver) , "unexpected field offset"); -static_assert(offsetof(clusterMsgLight, port) == offsetof(clusterMsg, port) , "unexpected field offset"); -static_assert(offsetof(clusterMsgLight, type) == offsetof(clusterMsg, type) , "unexpected field offset"); +static_assert(offsetof(clusterMsgLight, totlen) == offsetof(clusterMsg, totlen), "unexpected field offset"); +static_assert(offsetof(clusterMsgLight, ver) == offsetof(clusterMsg, ver), "unexpected field offset"); +static_assert(offsetof(clusterMsgLight, port) == offsetof(clusterMsg, port), "unexpected field offset"); +static_assert(offsetof(clusterMsgLight, type) == offsetof(clusterMsg, type), "unexpected field offset"); static_assert(offsetof(clusterMsgLight, data) == 16, "unexpected field offset"); #define CLUSTERMSG_LIGHT_MIN_LEN (sizeof(clusterMsgLight) - sizeof(union clusterMsgDataLight)) diff --git a/src/pubsub.c b/src/pubsub.c index 7fa2747a0f..7507df2a27 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -629,7 +629,7 @@ void publishCommand(client *c) { return; } - int receivers = pubsubPublishMessagesAndPropagateToCluster(c->argv[1], &c->argv[2],c->argc-2, 0); + int receivers = pubsubPublishMessagesAndPropagateToCluster(c->argv[1], &c->argv[2], c->argc - 2, 0); if (!server.cluster_enabled) forceCommandPropagation(c, PROPAGATE_REPL); addReplyLongLong(c, receivers); } @@ -715,7 +715,7 @@ void channelList(client *c, sds pat, kvstore *pubsub_channels) { /* SPUBLISH */ void spublishCommand(client *c) { - int receivers = pubsubPublishMessagesAndPropagateToCluster(c->argv[1], &c->argv[2], c->argc-2, 1); + int receivers = pubsubPublishMessagesAndPropagateToCluster(c->argv[1], &c->argv[2], c->argc - 2, 1); if (!server.cluster_enabled) forceCommandPropagation(c, PROPAGATE_REPL); addReplyLongLong(c, receivers); }