Skip to content

Commit

Permalink
Adds compatability with older versions
Browse files Browse the repository at this point in the history
  • Loading branch information
roshkhatri committed Jun 13, 2024
1 parent 90d35a3 commit 903e8bd
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 43 deletions.
147 changes: 104 additions & 43 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,7 @@ void clusterUpdateMyselfFlags(void) {
int nofailover = server.cluster_slave_no_failover ? CLUSTER_NODE_NOFAILOVER : 0;
myself->flags &= ~CLUSTER_NODE_NOFAILOVER;
myself->flags |= nofailover;
myself->flags |= CLUSTER_NODE_LIGHT_PUBSUB_SUPPORTED;
if (myself->flags != oldflags) {
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE);
}
Expand Down Expand Up @@ -1001,6 +1002,7 @@ void clusterInit(void) {
* by the createClusterNode() function. */
myself = server.cluster->myself = createClusterNode(NULL, CLUSTER_NODE_MYSELF | CLUSTER_NODE_MASTER);
serverLog(LL_NOTICE, "No cluster configuration found, I'm %.40s", myself->name);
myself->flags |= CLUSTER_NODE_LIGHT_PUBSUB_SUPPORTED;
clusterAddNode(myself);
clusterAddNodeToShard(myself->shard_id, myself);
saveconf = 1;
Expand Down Expand Up @@ -1170,8 +1172,9 @@ static void clusterMsgSendBlockDecrRefCount(void *node) {
}
}

static void clusterMsgSendBlockDecrRefCountPublish(void *node) {
clusterMsgSendBlockPubsub *msgblock = (clusterMsgSendBlockPubsub *)node;
static void clusterMsgSendBlockDecrRefCountPublish(void *node_light, void *node) {
clusterMsgSendBlockDecrRefCount(node);
clusterMsgSendBlockPubsub *msgblock = (clusterMsgSendBlockPubsub *)node_light;
msgblock->refcount--;
serverAssert(msgblock->refcount >= 0);
if (msgblock->refcount == 0) {
Expand Down Expand Up @@ -2776,6 +2779,27 @@ static clusterNode *getNodeFromLinkAndMsg(clusterLink *link, clusterMsg *hdr) {
return sender;
}

int pubsubProcessLightPacket(clusterLink *link, uint16_t type) {
clusterMsgPubsub *hdr_pubsub = (clusterMsgPubsub *)link->rcvbuf;
robj *channel, *message;
uint32_t channel_len, message_len;

/* Don't bother creating useless objects if there are no
* Pub/Sub subscribers. */
if ((type == CLUSTERMSG_TYPE_PUBLISH && serverPubsubSubscriptionCount() > 0) ||
(type == CLUSTERMSG_TYPE_PUBLISHSHARD && serverPubsubShardSubscriptionCount() > 0)) {
channel_len = ntohl(hdr_pubsub->data.publish.msg.channel_len);
message_len = ntohl(hdr_pubsub->data.publish.msg.message_len);
channel = createStringObject((char *)hdr_pubsub->data.publish.msg.bulk_data, channel_len);
message = createStringObject((char *)hdr_pubsub->data.publish.msg.bulk_data + channel_len, message_len);
pubsubPublishMessage(channel, message, type == CLUSTERMSG_TYPE_PUBLISHSHARD);
decrRefCount(channel);
decrRefCount(message);
}
return 1;
}


int clusterIsValidPacket(clusterLink *link) {
clusterMsg *hdr = (clusterMsg *)link->rcvbuf;
uint32_t totlen = ntohl(hdr->totlen);
Expand Down Expand Up @@ -2835,10 +2859,17 @@ int clusterIsValidPacket(clusterLink *link) {
explen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
explen += sizeof(clusterMsgDataFail);
} else if (type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD) {
clusterMsgPubsub *hdr_pubsub = (clusterMsgPubsub *)link->rcvbuf;
explen = sizeof(clusterMsgPubsub) - sizeof(union clusterMsgData);
explen += sizeof(clusterMsgDataPublish) - 8 + ntohl(hdr_pubsub->data.publish.msg.channel_len) +
ntohl(hdr_pubsub->data.publish.msg.message_len);
clusterNode *sender = getNodeFromLinkAndMsg(link, hdr);
if (sender && nodeSupportsPubsubMsgHdr(sender)) {
clusterMsgPubsub *hdr_pubsub = (clusterMsgPubsub *)link->rcvbuf;
explen = sizeof(clusterMsgPubsub) - sizeof(union clusterMsgData);
explen += sizeof(clusterMsgDataPublish) - 8 + ntohl(hdr_pubsub->data.publish.msg.channel_len) +
ntohl(hdr_pubsub->data.publish.msg.message_len);
} else {
explen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
explen += sizeof(clusterMsgDataPublish) - 8 + ntohl(hdr->data.publish.msg.channel_len) +
ntohl(hdr->data.publish.msg.message_len);
}
} else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST || type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK ||
type == CLUSTERMSG_TYPE_MFSTART) {
explen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
Expand Down Expand Up @@ -2878,45 +2909,28 @@ int clusterProcessPacket(clusterLink *link) {
clusterMsg *hdr = (clusterMsg *)link->rcvbuf;
uint16_t type = ntohs(hdr->type);
mstime_t now = mstime();
clusterNode *sender;

if (type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD) {
clusterMsgPubsub *hdr_pubsub = (clusterMsgPubsub *)link->rcvbuf;
robj *channel, *message;
serverAssert(link->node && !nodeInHandshake(link->node));
sender = link->node;
sender->data_received = now;
uint32_t channel_len, message_len;

/* Don't bother creating useless objects if there are no
* Pub/Sub subscribers. */
if ((type == CLUSTERMSG_TYPE_PUBLISH && serverPubsubSubscriptionCount() > 0) ||
(type == CLUSTERMSG_TYPE_PUBLISHSHARD && serverPubsubShardSubscriptionCount() > 0)) {
channel_len = ntohl(hdr_pubsub->data.publish.msg.channel_len);
message_len = ntohl(hdr_pubsub->data.publish.msg.message_len);
channel = createStringObject((char *)hdr_pubsub->data.publish.msg.bulk_data, channel_len);
message = createStringObject((char *)hdr_pubsub->data.publish.msg.bulk_data + channel_len, message_len);
pubsubPublishMessage(channel, message, type == CLUSTERMSG_TYPE_PUBLISHSHARD);
decrRefCount(channel);
decrRefCount(message);
}
return 1;
}

sender = getNodeFromLinkAndMsg(link, hdr);

clusterNode *sender = getNodeFromLinkAndMsg(link, hdr);
/* Update the last time we saw any data from this node. We
* use this in order to avoid detecting a timeout from a node that
* is just sending a lot of data in the cluster bus, for instance
* because of Pub/Sub. */
if (sender) sender->data_received = now;

if (sender && (type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD) && nodeSupportsPubsubMsgHdr(sender)) {
return pubsubProcessLightPacket(link, type);
}

uint16_t flags = ntohs(hdr->flags);
uint64_t senderCurrentEpoch = 0, senderConfigEpoch = 0;

if (sender && (hdr->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA)) {
sender->flags |= CLUSTER_NODE_EXTENSIONS_SUPPORTED;
}

if (sender && (!nodeSupportsPubsubMsgHdr(link->node)) && (flags & CLUSTER_NODE_LIGHT_PUBSUB_SUPPORTED)) {
sender->flags |= CLUSTER_NODE_LIGHT_PUBSUB_SUPPORTED;
}

if (sender && !nodeInHandshake(sender)) {
/* Update our currentEpoch if we see a newer epoch in the cluster. */
senderCurrentEpoch = ntohu64(hdr->currentEpoch);
Expand Down Expand Up @@ -3292,6 +3306,24 @@ int clusterProcessPacket(clusterLink *link) {
serverLog(LL_NOTICE, "Ignoring FAIL message from unknown node %.40s about %.40s", hdr->sender,
hdr->data.fail.about.nodename);
}
} else if (type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD) {
if (!sender) return 1; /* We don't know that node. */

robj *channel, *message;
uint32_t channel_len, message_len;

/* Don't bother creating useless objects if there are no
* Pub/Sub subscribers. */
if ((type == CLUSTERMSG_TYPE_PUBLISH && serverPubsubSubscriptionCount() > 0) ||
(type == CLUSTERMSG_TYPE_PUBLISHSHARD && serverPubsubShardSubscriptionCount() > 0)) {
channel_len = ntohl(hdr->data.publish.msg.channel_len);
message_len = ntohl(hdr->data.publish.msg.message_len);
channel = createStringObject((char *)hdr->data.publish.msg.bulk_data, channel_len);
message = createStringObject((char *)hdr->data.publish.msg.bulk_data + channel_len, message_len);
pubsubPublishMessage(channel, message, type == CLUSTERMSG_TYPE_PUBLISHSHARD);
decrRefCount(channel);
decrRefCount(message);
}
} else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) {
if (!sender) return 1; /* We don't know that node. */
clusterSendFailoverAuthIfNeeded(sender, hdr);
Expand Down Expand Up @@ -3513,7 +3545,6 @@ void clusterReadHandler(connection *conn) {
/* Finally read the full message. */
hdr = (clusterMsg *)link->rcvbuf;
uint16_t type = ntohs(hdr->type);
serverLog(LL_WARNING, "TYPE:%hu",type);
if (rcvbuflen == RCVBUF_MIN_READ_LEN) {
int is_msg_valid = 0;
/* Perform some sanity check on the message signature
Expand Down Expand Up @@ -3650,7 +3681,7 @@ void clusterBroadcastMessage(clusterMsgSendBlock *msgblock) {
dictReleaseIterator(di);
}

void clusterBroadcastPublishMessage(clusterMsgSendBlockPubsub *msgblock) {
void clusterBroadcastPublishMessage(clusterMsgSendBlockPubsub *msgblock_light, clusterMsgSendBlock *msgblock) {
dictIterator *di;
dictEntry *de;

Expand All @@ -3659,7 +3690,8 @@ void clusterBroadcastPublishMessage(clusterMsgSendBlockPubsub *msgblock) {
clusterNode *node = dictGetVal(de);

if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue;
clusterSendPublishMessage(node->link, msgblock);
if (nodeSupportsPubsubMsgHdr(node)) clusterSendPublishMessage(node->link, msgblock_light);
else clusterSendMessage(node->link, msgblock);
}
dictReleaseIterator(di);
}
Expand Down Expand Up @@ -3951,7 +3983,31 @@ void clusterBroadcastPong(int target) {
* the 'bulk_data', sanitizer generates an out-of-bounds error which is a false
* positive in this context. */
VALKEY_NO_SANITIZE("bounds")
clusterMsgSendBlockPubsub *clusterCreatePublishMsgBlock(robj *channel, robj *message, uint16_t type) {
clusterMsgSendBlock *clusterCreatePublishMsgBlock(robj *channel, robj *message, uint16_t type) {
uint32_t channel_len, message_len;

channel = getDecodedObject(channel);
message = getDecodedObject(message);
channel_len = sdslen(channel->ptr);
message_len = sdslen(message->ptr);

size_t msglen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
msglen += sizeof(clusterMsgDataPublish) - 8 + channel_len + message_len;
clusterMsgSendBlock *msgblock = createClusterMsgSendBlock(type, msglen);

clusterMsg *hdr = &msgblock->msg;
hdr->data.publish.msg.channel_len = htonl(channel_len);
hdr->data.publish.msg.message_len = htonl(message_len);
memcpy(hdr->data.publish.msg.bulk_data, channel->ptr, sdslen(channel->ptr));
memcpy(hdr->data.publish.msg.bulk_data + sdslen(channel->ptr), message->ptr, sdslen(message->ptr));

decrRefCount(channel);
decrRefCount(message);

return msgblock;
}

clusterMsgSendBlockPubsub *clusterCreatePublishLightMsgBlock(robj *channel, robj *message, uint16_t type) {
uint32_t channel_len, message_len;

channel = getDecodedObject(channel);
Expand Down Expand Up @@ -4069,12 +4125,15 @@ int clusterSendModuleMessageToTarget(const char *target,
* Publish this message across the slot (primary/replica).
* -------------------------------------------------------------------------- */
void clusterPropagatePublish(robj *channel, robj *message, int sharded) {
clusterMsgSendBlockPubsub *msgblock;
clusterMsgSendBlockPubsub *msgblock_light;
clusterMsgSendBlock *msgblock;

if (!sharded) {
msgblock_light = clusterCreatePublishLightMsgBlock(channel, message, CLUSTERMSG_TYPE_PUBLISH);
msgblock = clusterCreatePublishMsgBlock(channel, message, CLUSTERMSG_TYPE_PUBLISH);
clusterBroadcastPublishMessage(msgblock);
clusterMsgSendBlockDecrRefCountPublish(msgblock);

clusterBroadcastPublishMessage(msgblock_light, msgblock);
clusterMsgSendBlockDecrRefCountPublish(msgblock_light, msgblock);
return;
}

Expand All @@ -4083,13 +4142,15 @@ void clusterPropagatePublish(robj *channel, robj *message, int sharded) {
list *nodes_for_slot = clusterGetNodesInMyShard(server.cluster->myself);
serverAssert(nodes_for_slot != NULL);
listRewind(nodes_for_slot, &li);
msgblock = clusterCreatePublishMsgBlock(channel, message, CLUSTERMSG_TYPE_PUBLISHSHARD);
msgblock_light = clusterCreatePublishLightMsgBlock(channel, message, CLUSTERMSG_TYPE_PUBLISH);
msgblock = clusterCreatePublishMsgBlock(channel, message, CLUSTERMSG_TYPE_PUBLISH);
while ((ln = listNext(&li))) {
clusterNode *node = listNodeValue(ln);
if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue;
clusterSendPublishMessage(node->link, msgblock);
if nodeSupportsPubsubMsgHdr(node) clusterSendPublishMessage(node->link, msgblock_light);
else clusterSendMessage(node->link, msgblock);
}
clusterMsgSendBlockDecrRefCountPublish(msgblock);
clusterMsgSendBlockDecrRefCountPublish(msgblock_light, msgblock);
}

/* -----------------------------------------------------------------------------
Expand Down
2 changes: 2 additions & 0 deletions src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ typedef struct clusterLink {
#define CLUSTER_NODE_MIGRATE_TO 256 /* Master eligible for replica migration. */
#define CLUSTER_NODE_NOFAILOVER 512 /* Slave will not try to failover. */
#define CLUSTER_NODE_EXTENSIONS_SUPPORTED 1024 /* This node supports extensions. */
#define CLUSTER_NODE_LIGHT_PUBSUB_SUPPORTED 2048 /* This node supports light pubsub message header. */
#define CLUSTER_NODE_NULL_NAME \
"\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" \
"\000\000\000\000\000\000\000\000\000\000\000\000"
Expand All @@ -64,6 +65,7 @@ typedef struct clusterLink {
#define nodeFailed(n) ((n)->flags & CLUSTER_NODE_FAIL)
#define nodeCantFailover(n) ((n)->flags & CLUSTER_NODE_NOFAILOVER)
#define nodeSupportsExtensions(n) ((n)->flags & CLUSTER_NODE_EXTENSIONS_SUPPORTED)
#define nodeSupportsPubsubMsgHdr(n) ((n)->flags & CLUSTER_NODE_LIGHT_PUBSUB_SUPPORTED)

/* This structure represent elements of node->fail_reports. */
typedef struct clusterNodeFailReport {
Expand Down

0 comments on commit 903e8bd

Please sign in to comment.