Skip to content

Commit

Permalink
Fix format
Browse files Browse the repository at this point in the history
Signed-off-by: Roshan Khatri <[email protected]>
  • Loading branch information
roshkhatri committed Jul 1, 2024
1 parent d36be20 commit 1ebd796
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 39 deletions.
64 changes: 36 additions & 28 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -2784,7 +2784,8 @@ static clusterMsgDataPublishMessage *getInitialBulkData(clusterMsgDataPublishMul
}

static clusterMsgDataPublishMessage *getNextCursorBulkData(clusterMsgDataPublishMessage *message_cursor) {
clusterMsgDataPublishMessage *next = (clusterMsgDataPublishMessage *)((char *)message_cursor->message_data + ntohl(message_cursor->message_len));
clusterMsgDataPublishMessage *next =
(clusterMsgDataPublishMessage *)((char *)message_cursor->message_data + ntohl(message_cursor->message_len));
return next;
}

Expand Down Expand Up @@ -2813,17 +2814,14 @@ int pubsubProcessLightPacket(clusterLink *link, uint16_t type) {
uint64_t data_count;

/* Don't bother creating useless objects if there are no
* Pub/Sub subscribers. */
* Pub/Sub subscribers. */
if ((type == CLUSTERMSG_TYPE_PUBLISH_LIGHT && serverPubsubSubscriptionCount() > 0) ||
(type == CLUSTERMSG_TYPE_PUBLISHSHARD_LIGHT && serverPubsubShardSubscriptionCount() > 0)) {
data_count = ntohu64(hdr->data.publish.msg.data_count);
clusterMsgDataPublishMessage *cursor = getInitialBulkData(&hdr->data.publish.msg);
channel = readBulkDataFromCursor(cursor);
while(--data_count){
while (--data_count) {
cursor = getNextCursorBulkData(cursor);
if (type == CLUSTERMSG_TYPE_PUBLISHSHARD_LIGHT) {
serverLog(LL_NOTICE, "type == CLUSTERMSG_TYPE_PUBLISHSHARD_LIGHT");
}
pubsubPublishMessage(channel, readBulkDataFromCursor(cursor), type == CLUSTERMSG_TYPE_PUBLISHSHARD_LIGHT);
}
decrRefCount(channel);
Expand Down Expand Up @@ -2901,7 +2899,7 @@ int clusterIsValidPacket(clusterLink *link) {
uint64_t data_count = ntohu64(hdr_pubsub->data.publish.msg.data_count);
explen += ((data_count) * (sizeof(clusterMsgDataPublishMessage) - 8));
clusterMsgDataPublishMessage *msg_data = getInitialBulkData(&hdr_pubsub->data.publish.msg);
while(data_count--){
while (data_count--) {
uint32_t msglen = getPublishMsgLength(msg_data);
explen += msglen;
msg_data = getNextCursorBulkData(msg_data);
Expand Down Expand Up @@ -2952,7 +2950,8 @@ int clusterProcessPacket(clusterLink *link) {
* because of Pub/Sub. */
if (sender) sender->data_received = now;

if (sender && (type == CLUSTERMSG_TYPE_PUBLISH_LIGHT || type == CLUSTERMSG_TYPE_PUBLISHSHARD_LIGHT) && nodeSupportsLightMsgHdr(sender)) {
if (sender && (type == CLUSTERMSG_TYPE_PUBLISH_LIGHT || type == CLUSTERMSG_TYPE_PUBLISHSHARD_LIGHT) &&
nodeSupportsLightMsgHdr(sender)) {
return pubsubProcessLightPacket(link, type);
}

Expand Down Expand Up @@ -3514,22 +3513,23 @@ void clusterReadHandler(connection *conn) {
/* Perform some sanity check on the message signature
* and length. */
if (memcmp(hdr->sig, "RCmb", 4) != 0 || ntohl(hdr->totlen) < CLUSTERMSG_MIN_LEN) {
/* The minimum length for PUBLISH and PUBLISHSHARD will be clusterMsgLight_MIN_LEN
/* The minimum length for PUBLISH and PUBLISHSHARD will be clusterMsgLight_MIN_LEN
* as we are using the `clusterMsgLight` hdr. */
if ((type == CLUSTERMSG_TYPE_PUBLISH_LIGHT || type == CLUSTERMSG_TYPE_PUBLISHSHARD_LIGHT) && ((ntohl(hdr->totlen) >= CLUSTERMSG_LIGHT_MIN_LEN))) {
if ((type == CLUSTERMSG_TYPE_PUBLISH_LIGHT || type == CLUSTERMSG_TYPE_PUBLISHSHARD_LIGHT) &&
((ntohl(hdr->totlen) >= CLUSTERMSG_LIGHT_MIN_LEN))) {
is_msg_valid = 1;
}
if (!is_msg_valid) {
char ip[NET_IP_STR_LEN];
int port;
if (connAddrPeerName(conn, ip, sizeof(ip), &port) == -1) {
serverLog(LL_WARNING, "Bad message length or signature received "
"on the Cluster bus.");
"on the Cluster bus.");
} else {
serverLog(LL_WARNING,
"Bad message length or signature received "
"on the Cluster bus from %s:%d",
ip, port);
"Bad message length or signature received "
"on the Cluster bus from %s:%d",
ip, port);
}
handleLinkIOError(link);
return;
Expand Down Expand Up @@ -3655,8 +3655,10 @@ void clusterBroadcastPublishMessage(clusterMsgSendBlock *msgblock) {
clusterNode *node = dictGetVal(de);

if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue;
if (nodeSupportsLightMsgHdr(node)) continue;
else clusterSendMessage(node->link, msgblock);
if (nodeSupportsLightMsgHdr(node))
continue;
else
clusterSendMessage(node->link, msgblock);
}
dictReleaseIterator(di);
}
Expand All @@ -3670,8 +3672,10 @@ void clusterBroadcastPublishLightMessage(clusterMsgSendBlockLight *msgblock_ligh
clusterNode *node = dictGetVal(de);

if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue;
if (nodeSupportsLightMsgHdr(node)) clusterSendPublishMessage(node->link, msgblock_light);
else continue;
if (nodeSupportsLightMsgHdr(node))
clusterSendPublishMessage(node->link, msgblock_light);
else
continue;
}
dictReleaseIterator(di);
}
Expand Down Expand Up @@ -3739,7 +3743,6 @@ static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen) {
}

static void clusterBuildPubsubMessageHdr(clusterMsgLight *hdr, int type, size_t msglen) {

hdr->ver = htons(CLUSTER_PROTO_VER);
hdr->sig[0] = 'R';
hdr->sig[1] = 'C';
Expand Down Expand Up @@ -3993,22 +3996,22 @@ clusterMsgSendBlockLight *clusterCreatePublishLightMsgBlock(robj *channel, robj
channel = getDecodedObject(channel);
channel_len = sdslen(channel->ptr);

uint32_t aggregated_msg_len = 0 ;
uint32_t aggregated_msg_len = 0;
aggregated_msg_len += channel_len;
for (i = 0; i < count; i++) {
messages[i] = getDecodedObject(messages[i]);
message_len = sdslen(messages[i]->ptr);
aggregated_msg_len += message_len;
}

size_t msglen = sizeof(clusterMsgLight) - sizeof(union clusterMsgDataLight) ;
size_t msglen = sizeof(clusterMsgLight) - sizeof(union clusterMsgDataLight);
msglen += sizeof(clusterMsgDataPublishMulti);
msglen += ((count+1) * (sizeof(clusterMsgDataPublishMessage) - 8));
msglen += ((count + 1) * (sizeof(clusterMsgDataPublishMessage) - 8));
msglen += aggregated_msg_len;
clusterMsgSendBlockLight *msgblock = createPubsubClusterMsgSendBlock(type, msglen);

clusterMsgLight *hdr = &msgblock->msg;
hdr->data.publish.msg.data_count = htonu64(count+1);
hdr->data.publish.msg.data_count = htonu64(count + 1);
clusterMsgDataPublishMessage *cursor = getInitialBulkData(&hdr->data.publish.msg);
writeDataToCursor(cursor, channel);
for (i = 0; i < count; i++) {
Expand Down Expand Up @@ -4118,7 +4121,8 @@ void clusterPropagatePublish(robj *channel, robj **message, int count, int shard
clusterMsgSendBlockLight *msgblock_light;
clusterMsgSendBlock *msgblock;
int i;
msgblock_light = clusterCreatePublishLightMsgBlock(channel, message, count, sharded ? CLUSTERMSG_TYPE_PUBLISHSHARD_LIGHT : CLUSTERMSG_TYPE_PUBLISH_LIGHT);
msgblock_light = clusterCreatePublishLightMsgBlock(
channel, message, count, sharded ? CLUSTERMSG_TYPE_PUBLISHSHARD_LIGHT : CLUSTERMSG_TYPE_PUBLISH_LIGHT);
if (!sharded) {
clusterBroadcastPublishLightMessage(msgblock_light);
clusterMsgSendBlockDecrRefCountPublish(msgblock_light);
Expand All @@ -4139,8 +4143,10 @@ void clusterPropagatePublish(robj *channel, robj **message, int count, int shard
while ((ln = listNext(&li))) {
node = listNodeValue(ln);
if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue;
if (nodeSupportsLightMsgHdr(node)) clusterSendPublishMessage(node->link, msgblock_light);
else continue;
if (nodeSupportsLightMsgHdr(node))
clusterSendPublishMessage(node->link, msgblock_light);
else
continue;
}
clusterMsgSendBlockDecrRefCountPublish(msgblock_light);

Expand All @@ -4150,8 +4156,10 @@ void clusterPropagatePublish(robj *channel, robj **message, int count, int shard
while ((ln = listNext(&li))) {
node = listNodeValue(ln);
if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue;
if (nodeSupportsLightMsgHdr(node)) continue;
else clusterSendMessage(node->link, msgblock);
if (nodeSupportsLightMsgHdr(node))
continue;
else
clusterSendMessage(node->link, msgblock);
}
clusterMsgSendBlockDecrRefCount(msgblock);
}
Expand Down
18 changes: 9 additions & 9 deletions src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -299,19 +299,19 @@ static_assert(offsetof(clusterMsg, data) == 2256, "unexpected field offset");
#define CLUSTERMSG_FLAG0_EXT_DATA (1 << 2) /* Message contains extension data */

typedef struct {
char sig[4]; /* Signature "RCmb" (Cluster message bus). */
uint32_t totlen; /* Total length of this message */
uint16_t ver; /* Protocol version, currently set to CLUSTER_PROTO_VER. */
uint16_t port; /* Primary port number (TCP or TLS). */
uint16_t type; /* Message type */
char sig[4]; /* Signature "RCmb" (Cluster message bus). */
uint32_t totlen; /* Total length of this message */
uint16_t ver; /* Protocol version, currently set to CLUSTER_PROTO_VER. */
uint16_t port; /* Primary port number (TCP or TLS). */
uint16_t type; /* Message type */
union clusterMsgDataLight data;
} clusterMsgLight;

static_assert(offsetof(clusterMsgLight, sig) == offsetof(clusterMsg, sig), "unexpected field offset");
static_assert(offsetof(clusterMsgLight, totlen) == offsetof(clusterMsg, totlen),"unexpected field offset");
static_assert(offsetof(clusterMsgLight, ver) == offsetof(clusterMsg, ver) , "unexpected field offset");
static_assert(offsetof(clusterMsgLight, port) == offsetof(clusterMsg, port) , "unexpected field offset");
static_assert(offsetof(clusterMsgLight, type) == offsetof(clusterMsg, type) , "unexpected field offset");
static_assert(offsetof(clusterMsgLight, totlen) == offsetof(clusterMsg, totlen), "unexpected field offset");
static_assert(offsetof(clusterMsgLight, ver) == offsetof(clusterMsg, ver), "unexpected field offset");
static_assert(offsetof(clusterMsgLight, port) == offsetof(clusterMsg, port), "unexpected field offset");
static_assert(offsetof(clusterMsgLight, type) == offsetof(clusterMsg, type), "unexpected field offset");
static_assert(offsetof(clusterMsgLight, data) == 16, "unexpected field offset");

#define CLUSTERMSG_LIGHT_MIN_LEN (sizeof(clusterMsgLight) - sizeof(union clusterMsgDataLight))
Expand Down
4 changes: 2 additions & 2 deletions src/pubsub.c
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ void publishCommand(client *c) {
return;
}

int receivers = pubsubPublishMessagesAndPropagateToCluster(c->argv[1], &c->argv[2],c->argc-2, 0);
int receivers = pubsubPublishMessagesAndPropagateToCluster(c->argv[1], &c->argv[2], c->argc - 2, 0);
if (!server.cluster_enabled) forceCommandPropagation(c, PROPAGATE_REPL);
addReplyLongLong(c, receivers);
}
Expand Down Expand Up @@ -715,7 +715,7 @@ void channelList(client *c, sds pat, kvstore *pubsub_channels) {

/* SPUBLISH <shardchannel> <message> */
void spublishCommand(client *c) {
int receivers = pubsubPublishMessagesAndPropagateToCluster(c->argv[1], &c->argv[2], c->argc-2, 1);
int receivers = pubsubPublishMessagesAndPropagateToCluster(c->argv[1], &c->argv[2], c->argc - 2, 1);
if (!server.cluster_enabled) forceCommandPropagation(c, PROPAGATE_REPL);
addReplyLongLong(c, receivers);
}
Expand Down

0 comments on commit 1ebd796

Please sign in to comment.