From 903e8bd18a2de520dff9a5edae8e3c364e6ee8f3 Mon Sep 17 00:00:00 2001 From: Roshan Khatri Date: Thu, 13 Jun 2024 20:24:07 +0000 Subject: [PATCH] Adds compatability with older versions --- src/cluster_legacy.c | 147 ++++++++++++++++++++++++++++++------------- src/cluster_legacy.h | 2 + 2 files changed, 106 insertions(+), 43 deletions(-) diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 32e3c6dd36..61e7cb0dcc 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -842,6 +842,7 @@ void clusterUpdateMyselfFlags(void) { int nofailover = server.cluster_slave_no_failover ? CLUSTER_NODE_NOFAILOVER : 0; myself->flags &= ~CLUSTER_NODE_NOFAILOVER; myself->flags |= nofailover; + myself->flags |= CLUSTER_NODE_LIGHT_PUBSUB_SUPPORTED; if (myself->flags != oldflags) { clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE); } @@ -1001,6 +1002,7 @@ void clusterInit(void) { * by the createClusterNode() function. */ myself = server.cluster->myself = createClusterNode(NULL, CLUSTER_NODE_MYSELF | CLUSTER_NODE_MASTER); serverLog(LL_NOTICE, "No cluster configuration found, I'm %.40s", myself->name); + myself->flags |= CLUSTER_NODE_LIGHT_PUBSUB_SUPPORTED; clusterAddNode(myself); clusterAddNodeToShard(myself->shard_id, myself); saveconf = 1; @@ -1170,8 +1172,9 @@ static void clusterMsgSendBlockDecrRefCount(void *node) { } } -static void clusterMsgSendBlockDecrRefCountPublish(void *node) { - clusterMsgSendBlockPubsub *msgblock = (clusterMsgSendBlockPubsub *)node; +static void clusterMsgSendBlockDecrRefCountPublish(void *node_light, void *node) { + clusterMsgSendBlockDecrRefCount(node); + clusterMsgSendBlockPubsub *msgblock = (clusterMsgSendBlockPubsub *)node_light; msgblock->refcount--; serverAssert(msgblock->refcount >= 0); if (msgblock->refcount == 0) { @@ -2776,6 +2779,27 @@ static clusterNode *getNodeFromLinkAndMsg(clusterLink *link, clusterMsg *hdr) { return sender; } +int pubsubProcessLightPacket(clusterLink *link, uint16_t type) { + clusterMsgPubsub *hdr_pubsub = (clusterMsgPubsub *)link->rcvbuf; + robj *channel, *message; + uint32_t channel_len, message_len; + + /* Don't bother creating useless objects if there are no + * Pub/Sub subscribers. */ + if ((type == CLUSTERMSG_TYPE_PUBLISH && serverPubsubSubscriptionCount() > 0) || + (type == CLUSTERMSG_TYPE_PUBLISHSHARD && serverPubsubShardSubscriptionCount() > 0)) { + channel_len = ntohl(hdr_pubsub->data.publish.msg.channel_len); + message_len = ntohl(hdr_pubsub->data.publish.msg.message_len); + channel = createStringObject((char *)hdr_pubsub->data.publish.msg.bulk_data, channel_len); + message = createStringObject((char *)hdr_pubsub->data.publish.msg.bulk_data + channel_len, message_len); + pubsubPublishMessage(channel, message, type == CLUSTERMSG_TYPE_PUBLISHSHARD); + decrRefCount(channel); + decrRefCount(message); + } + return 1; +} + + int clusterIsValidPacket(clusterLink *link) { clusterMsg *hdr = (clusterMsg *)link->rcvbuf; uint32_t totlen = ntohl(hdr->totlen); @@ -2835,10 +2859,17 @@ int clusterIsValidPacket(clusterLink *link) { explen = sizeof(clusterMsg) - sizeof(union clusterMsgData); explen += sizeof(clusterMsgDataFail); } else if (type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD) { - clusterMsgPubsub *hdr_pubsub = (clusterMsgPubsub *)link->rcvbuf; - explen = sizeof(clusterMsgPubsub) - sizeof(union clusterMsgData); - explen += sizeof(clusterMsgDataPublish) - 8 + ntohl(hdr_pubsub->data.publish.msg.channel_len) + - ntohl(hdr_pubsub->data.publish.msg.message_len); + clusterNode *sender = getNodeFromLinkAndMsg(link, hdr); + if (sender && nodeSupportsPubsubMsgHdr(sender)) { + clusterMsgPubsub *hdr_pubsub = (clusterMsgPubsub *)link->rcvbuf; + explen = sizeof(clusterMsgPubsub) - sizeof(union clusterMsgData); + explen += sizeof(clusterMsgDataPublish) - 8 + ntohl(hdr_pubsub->data.publish.msg.channel_len) + + ntohl(hdr_pubsub->data.publish.msg.message_len); + } else { + explen = sizeof(clusterMsg) - sizeof(union clusterMsgData); + explen += sizeof(clusterMsgDataPublish) - 8 + ntohl(hdr->data.publish.msg.channel_len) + + ntohl(hdr->data.publish.msg.message_len); + } } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST || type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK || type == CLUSTERMSG_TYPE_MFSTART) { explen = sizeof(clusterMsg) - sizeof(union clusterMsgData); @@ -2878,38 +2909,17 @@ int clusterProcessPacket(clusterLink *link) { clusterMsg *hdr = (clusterMsg *)link->rcvbuf; uint16_t type = ntohs(hdr->type); mstime_t now = mstime(); - clusterNode *sender; - - if (type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD) { - clusterMsgPubsub *hdr_pubsub = (clusterMsgPubsub *)link->rcvbuf; - robj *channel, *message; - serverAssert(link->node && !nodeInHandshake(link->node)); - sender = link->node; - sender->data_received = now; - uint32_t channel_len, message_len; - - /* Don't bother creating useless objects if there are no - * Pub/Sub subscribers. */ - if ((type == CLUSTERMSG_TYPE_PUBLISH && serverPubsubSubscriptionCount() > 0) || - (type == CLUSTERMSG_TYPE_PUBLISHSHARD && serverPubsubShardSubscriptionCount() > 0)) { - channel_len = ntohl(hdr_pubsub->data.publish.msg.channel_len); - message_len = ntohl(hdr_pubsub->data.publish.msg.message_len); - channel = createStringObject((char *)hdr_pubsub->data.publish.msg.bulk_data, channel_len); - message = createStringObject((char *)hdr_pubsub->data.publish.msg.bulk_data + channel_len, message_len); - pubsubPublishMessage(channel, message, type == CLUSTERMSG_TYPE_PUBLISHSHARD); - decrRefCount(channel); - decrRefCount(message); - } - return 1; - } - - sender = getNodeFromLinkAndMsg(link, hdr); - + clusterNode *sender = getNodeFromLinkAndMsg(link, hdr); /* Update the last time we saw any data from this node. We * use this in order to avoid detecting a timeout from a node that * is just sending a lot of data in the cluster bus, for instance * because of Pub/Sub. */ if (sender) sender->data_received = now; + + if (sender && (type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD) && nodeSupportsPubsubMsgHdr(sender)) { + return pubsubProcessLightPacket(link, type); + } + uint16_t flags = ntohs(hdr->flags); uint64_t senderCurrentEpoch = 0, senderConfigEpoch = 0; @@ -2917,6 +2927,10 @@ int clusterProcessPacket(clusterLink *link) { sender->flags |= CLUSTER_NODE_EXTENSIONS_SUPPORTED; } + if (sender && (!nodeSupportsPubsubMsgHdr(link->node)) && (flags & CLUSTER_NODE_LIGHT_PUBSUB_SUPPORTED)) { + sender->flags |= CLUSTER_NODE_LIGHT_PUBSUB_SUPPORTED; + } + if (sender && !nodeInHandshake(sender)) { /* Update our currentEpoch if we see a newer epoch in the cluster. */ senderCurrentEpoch = ntohu64(hdr->currentEpoch); @@ -3292,6 +3306,24 @@ int clusterProcessPacket(clusterLink *link) { serverLog(LL_NOTICE, "Ignoring FAIL message from unknown node %.40s about %.40s", hdr->sender, hdr->data.fail.about.nodename); } + } else if (type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD) { + if (!sender) return 1; /* We don't know that node. */ + + robj *channel, *message; + uint32_t channel_len, message_len; + + /* Don't bother creating useless objects if there are no + * Pub/Sub subscribers. */ + if ((type == CLUSTERMSG_TYPE_PUBLISH && serverPubsubSubscriptionCount() > 0) || + (type == CLUSTERMSG_TYPE_PUBLISHSHARD && serverPubsubShardSubscriptionCount() > 0)) { + channel_len = ntohl(hdr->data.publish.msg.channel_len); + message_len = ntohl(hdr->data.publish.msg.message_len); + channel = createStringObject((char *)hdr->data.publish.msg.bulk_data, channel_len); + message = createStringObject((char *)hdr->data.publish.msg.bulk_data + channel_len, message_len); + pubsubPublishMessage(channel, message, type == CLUSTERMSG_TYPE_PUBLISHSHARD); + decrRefCount(channel); + decrRefCount(message); + } } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) { if (!sender) return 1; /* We don't know that node. */ clusterSendFailoverAuthIfNeeded(sender, hdr); @@ -3513,7 +3545,6 @@ void clusterReadHandler(connection *conn) { /* Finally read the full message. */ hdr = (clusterMsg *)link->rcvbuf; uint16_t type = ntohs(hdr->type); - serverLog(LL_WARNING, "TYPE:%hu",type); if (rcvbuflen == RCVBUF_MIN_READ_LEN) { int is_msg_valid = 0; /* Perform some sanity check on the message signature @@ -3650,7 +3681,7 @@ void clusterBroadcastMessage(clusterMsgSendBlock *msgblock) { dictReleaseIterator(di); } -void clusterBroadcastPublishMessage(clusterMsgSendBlockPubsub *msgblock) { +void clusterBroadcastPublishMessage(clusterMsgSendBlockPubsub *msgblock_light, clusterMsgSendBlock *msgblock) { dictIterator *di; dictEntry *de; @@ -3659,7 +3690,8 @@ void clusterBroadcastPublishMessage(clusterMsgSendBlockPubsub *msgblock) { clusterNode *node = dictGetVal(de); if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue; - clusterSendPublishMessage(node->link, msgblock); + if (nodeSupportsPubsubMsgHdr(node)) clusterSendPublishMessage(node->link, msgblock_light); + else clusterSendMessage(node->link, msgblock); } dictReleaseIterator(di); } @@ -3951,7 +3983,31 @@ void clusterBroadcastPong(int target) { * the 'bulk_data', sanitizer generates an out-of-bounds error which is a false * positive in this context. */ VALKEY_NO_SANITIZE("bounds") -clusterMsgSendBlockPubsub *clusterCreatePublishMsgBlock(robj *channel, robj *message, uint16_t type) { +clusterMsgSendBlock *clusterCreatePublishMsgBlock(robj *channel, robj *message, uint16_t type) { + uint32_t channel_len, message_len; + + channel = getDecodedObject(channel); + message = getDecodedObject(message); + channel_len = sdslen(channel->ptr); + message_len = sdslen(message->ptr); + + size_t msglen = sizeof(clusterMsg) - sizeof(union clusterMsgData); + msglen += sizeof(clusterMsgDataPublish) - 8 + channel_len + message_len; + clusterMsgSendBlock *msgblock = createClusterMsgSendBlock(type, msglen); + + clusterMsg *hdr = &msgblock->msg; + hdr->data.publish.msg.channel_len = htonl(channel_len); + hdr->data.publish.msg.message_len = htonl(message_len); + memcpy(hdr->data.publish.msg.bulk_data, channel->ptr, sdslen(channel->ptr)); + memcpy(hdr->data.publish.msg.bulk_data + sdslen(channel->ptr), message->ptr, sdslen(message->ptr)); + + decrRefCount(channel); + decrRefCount(message); + + return msgblock; +} + +clusterMsgSendBlockPubsub *clusterCreatePublishLightMsgBlock(robj *channel, robj *message, uint16_t type) { uint32_t channel_len, message_len; channel = getDecodedObject(channel); @@ -4069,12 +4125,15 @@ int clusterSendModuleMessageToTarget(const char *target, * Publish this message across the slot (primary/replica). * -------------------------------------------------------------------------- */ void clusterPropagatePublish(robj *channel, robj *message, int sharded) { - clusterMsgSendBlockPubsub *msgblock; + clusterMsgSendBlockPubsub *msgblock_light; + clusterMsgSendBlock *msgblock; if (!sharded) { + msgblock_light = clusterCreatePublishLightMsgBlock(channel, message, CLUSTERMSG_TYPE_PUBLISH); msgblock = clusterCreatePublishMsgBlock(channel, message, CLUSTERMSG_TYPE_PUBLISH); - clusterBroadcastPublishMessage(msgblock); - clusterMsgSendBlockDecrRefCountPublish(msgblock); + + clusterBroadcastPublishMessage(msgblock_light, msgblock); + clusterMsgSendBlockDecrRefCountPublish(msgblock_light, msgblock); return; } @@ -4083,13 +4142,15 @@ void clusterPropagatePublish(robj *channel, robj *message, int sharded) { list *nodes_for_slot = clusterGetNodesInMyShard(server.cluster->myself); serverAssert(nodes_for_slot != NULL); listRewind(nodes_for_slot, &li); - msgblock = clusterCreatePublishMsgBlock(channel, message, CLUSTERMSG_TYPE_PUBLISHSHARD); + msgblock_light = clusterCreatePublishLightMsgBlock(channel, message, CLUSTERMSG_TYPE_PUBLISH); + msgblock = clusterCreatePublishMsgBlock(channel, message, CLUSTERMSG_TYPE_PUBLISH); while ((ln = listNext(&li))) { clusterNode *node = listNodeValue(ln); if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue; - clusterSendPublishMessage(node->link, msgblock); + if nodeSupportsPubsubMsgHdr(node) clusterSendPublishMessage(node->link, msgblock_light); + else clusterSendMessage(node->link, msgblock); } - clusterMsgSendBlockDecrRefCountPublish(msgblock); + clusterMsgSendBlockDecrRefCountPublish(msgblock_light, msgblock); } /* ----------------------------------------------------------------------------- diff --git a/src/cluster_legacy.h b/src/cluster_legacy.h index b304de580c..145455adcb 100644 --- a/src/cluster_legacy.h +++ b/src/cluster_legacy.h @@ -52,6 +52,7 @@ typedef struct clusterLink { #define CLUSTER_NODE_MIGRATE_TO 256 /* Master eligible for replica migration. */ #define CLUSTER_NODE_NOFAILOVER 512 /* Slave will not try to failover. */ #define CLUSTER_NODE_EXTENSIONS_SUPPORTED 1024 /* This node supports extensions. */ +#define CLUSTER_NODE_LIGHT_PUBSUB_SUPPORTED 2048 /* This node supports light pubsub message header. */ #define CLUSTER_NODE_NULL_NAME \ "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" \ "\000\000\000\000\000\000\000\000\000\000\000\000" @@ -64,6 +65,7 @@ typedef struct clusterLink { #define nodeFailed(n) ((n)->flags & CLUSTER_NODE_FAIL) #define nodeCantFailover(n) ((n)->flags & CLUSTER_NODE_NOFAILOVER) #define nodeSupportsExtensions(n) ((n)->flags & CLUSTER_NODE_EXTENSIONS_SUPPORTED) +#define nodeSupportsPubsubMsgHdr(n) ((n)->flags & CLUSTER_NODE_LIGHT_PUBSUB_SUPPORTED) /* This structure represent elements of node->fail_reports. */ typedef struct clusterNodeFailReport {