Skip to content

Commit

Permalink
Address Feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Roshan Khatri <rvkhatri@amazon.com>
  • Loading branch information
roshkhatri committed Jul 23, 2024
1 parent 64ba736 commit 12ba927
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 24 deletions.
57 changes: 33 additions & 24 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ int auxTcpPortPresent(clusterNode *n);
int auxTlsPortSetter(clusterNode *n, void *value, size_t length);
sds auxTlsPortGetter(clusterNode *n, sds s);
int auxTlsPortPresent(clusterNode *n);
static void clusterBuildMessageHdrLight(clusterMsgLight *hdr, int type, size_t msglen);
static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen);
void freeClusterLink(clusterLink *link);
int verifyClusterNodeId(const char *name, int length);
Expand Down Expand Up @@ -421,7 +422,7 @@ typedef struct {
int refcount; /* Number of cluster link send msg queues containing the message */
union {
clusterMsg msg;
clusterMsgLight light_msg;
clusterMsgLight msg_light;
};
} clusterMsgSendBlock;

Expand Down Expand Up @@ -1268,7 +1269,11 @@ static clusterMsgSendBlock *createClusterMsgSendBlock(int type, uint32_t msglen)
msgblock->refcount = 1;
msgblock->totlen = blocklen;
server.stat_cluster_links_memory += blocklen;
clusterBuildMessageHdr(&msgblock->msg, type, msglen);
if IS_LIGHT_MESSAGE (type) {
clusterBuildMessageHdrLight(&msgblock->msg_light, type, msglen);
} else {
clusterBuildMessageHdr(&msgblock->msg, type, msglen);
}
return msgblock;
}

Expand Down Expand Up @@ -2895,25 +2900,25 @@ void processLightPacket(clusterLink *link, uint16_t type) {
}
}

inline int messageTypeSupportsLightHdr(uint16_t type) {
static inline int messageTypeSupportsLightHdr(uint16_t type) {
switch (type) {
case CLUSTERMSG_TYPE_PUBLISH: return 1;
case CLUSTERMSG_TYPE_PUBLISHSHARD: return 1;
}
serverLog(LL_NOTICE, "--- Packet of type %s does not support light cluster header",
clusterGetMessageTypeString(type));
return 0;
}


int clusterIsValidPacket(clusterLink *link) {
clusterMsg *hdr = (clusterMsg *)link->rcvbuf;
uint32_t totlen = ntohl(hdr->totlen);
uint16_t type = ntohs(hdr->type);
int is_light = IS_LIGHT_MESSAGE(type);
if (is_light) {
type &= ~CLUSTERMSG_LIGHT;
serverAssert(messageTypeSupportsLightHdr(type));
int is_light = IS_LIGHT_MESSAGE(ntohs(hdr->type));
uint16_t type = ntohs(hdr->type) & ~CLUSTERMSG_MODIFIER_MASK;

if (is_light && !messageTypeSupportsLightHdr(type)) {
serverLog(LL_NOTICE, "--- Packet of type %s does not support light cluster header",
clusterGetMessageTypeString(type));
return 0;
}

if (type < CLUSTERMSG_TYPE_COUNT) server.cluster->stats_bus_messages_received[type]++;
Expand Down Expand Up @@ -3028,12 +3033,11 @@ int clusterProcessPacket(clusterLink *link) {
}

clusterMsg *hdr = (clusterMsg *)link->rcvbuf;
uint16_t type = ntohs(hdr->type);
mstime_t now = mstime();
int is_light = IS_LIGHT_MESSAGE(type);
int is_light = IS_LIGHT_MESSAGE(ntohs(hdr->type));
uint16_t type = ntohs(hdr->type) & ~CLUSTERMSG_MODIFIER_MASK;

if (is_light) {
type &= ~CLUSTERMSG_LIGHT;
if (!link->node || nodeInHandshake(link->node)) {
freeClusterLink(link);
serverLog(
Expand Down Expand Up @@ -3717,24 +3721,21 @@ void clusterBroadcastMessage(clusterMsgSendBlock *msgblock) {
dictReleaseIterator(di);
}

/* Build the message header. hdr must point to a buffer at least
* sizeof(clusterMsg) in bytes. */
static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen) {
static void clusterBuildMessageHdrLight(clusterMsgLight *hdr, int type, size_t msglen) {
hdr->ver = htons(CLUSTER_PROTO_VER);
hdr->sig[0] = 'R';
hdr->sig[1] = 'C';
hdr->sig[2] = 'm';
hdr->sig[3] = 'b';
hdr->type = htons(type);
hdr->notused1 = 0;
hdr->notused2 = 0;
hdr->totlen = htonl(msglen);
}

if (IS_LIGHT_MESSAGE(type)) {
clusterMsgLight *hdr_light = (clusterMsgLight *)hdr;
hdr_light->notused1 = 0;
hdr_light->notused2 = 0;
return;
}

/* Build the message header. hdr must point to a buffer at least
* sizeof(clusterMsg) in bytes. */
static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen) {
uint64_t offset;
clusterNode *primary;

Expand All @@ -3744,6 +3745,12 @@ static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen) {
* in charge for this slots. */
primary = (nodeIsReplica(myself) && myself->replicaof) ? myself->replicaof : myself;

hdr->ver = htons(CLUSTER_PROTO_VER);
hdr->sig[0] = 'R';
hdr->sig[1] = 'C';
hdr->sig[2] = 'm';
hdr->sig[3] = 'b';
hdr->type = htons(type);
memcpy(hdr->sender, myself->name, CLUSTER_NAMELEN);

/* If cluster-announce-ip option is enabled, force the receivers of our
Expand Down Expand Up @@ -3785,6 +3792,8 @@ static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen) {

/* Set the message flags. */
if (clusterNodeIsPrimary(myself) && server.cluster->mf_end) hdr->mflags[0] |= CLUSTERMSG_FLAG0_PAUSED;

hdr->totlen = htonl(msglen);
}

/* Set the i-th entry of the gossip section in the message pointed by 'hdr'
Expand Down Expand Up @@ -4013,7 +4022,7 @@ clusterMsgSendBlock *clusterCreatePublishMsgBlock(robj *channel, robj *message,
clusterMsgSendBlock *msgblock = createClusterMsgSendBlock(type, msglen);
clusterMsgDataPublish *hdr_data_msg;
if (is_light) {
clusterMsgLight *hdr_light = &msgblock->light_msg;
clusterMsgLight *hdr_light = &msgblock->msg_light;
hdr_data_msg = &hdr_light->data.publish.msg;
} else {
clusterMsg *hdr = &msgblock->msg;
Expand Down
2 changes: 2 additions & 0 deletions src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ typedef struct clusterNodeFailReport {

#define CLUSTERMSG_LIGHT 0x8000 /* Modifier bit for message types that support light header */

#define CLUSTERMSG_MODIFIER_MASK (CLUSTERMSG_LIGHT) /* Modifier bit for message types that support light header */

/* We check for the modifier bit to determine if the message is sent using light header.*/
#define IS_LIGHT_MESSAGE(type) ((type) & CLUSTERMSG_LIGHT)

Expand Down

0 comments on commit 12ba927

Please sign in to comment.