Skip to content

Commit

Permalink
Address feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Roshan Khatri <rvkhatri@amazon.com>
  • Loading branch information
roshkhatri committed Jul 23, 2024
1 parent b11b014 commit 4427415
Showing 1 changed file with 11 additions and 8 deletions.
19 changes: 11 additions & 8 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,10 @@ static inline int defaultClientPort(void) {
(server.cluster->slots[slot] == NULL || bitmapTestBit(server.cluster->owner_not_claiming_slot, slot))

#define RCVBUF_INIT_LEN 1024
#define RCVBUF_MIN_READ_LEN 16
#define RCVBUF_MIN_READ_LEN 14
static_assert(offsetof(clusterMsg, type) + sizeof(uint16_t) == RCVBUF_MIN_READ_LEN,
"Incorrect lenght to read to identify type");

#define RCVBUF_MAX_PREALLOC (1 << 20) /* 1MB */

/* Fixed timeout value for cluster operations (milliseconds) */
Expand Down Expand Up @@ -1117,7 +1120,6 @@ void clusterInit(void) {
* by the createClusterNode() function. */
myself = server.cluster->myself = createClusterNode(NULL, CLUSTER_NODE_MYSELF | CLUSTER_NODE_PRIMARY);
serverLog(LL_NOTICE, "No cluster configuration found, I'm %.40s", myself->name);
clusterUpdateMyselfFlags();
clusterAddNode(myself);
clusterAddNodeToShard(myself->shard_id, myself);
saveconf = 1;
Expand Down Expand Up @@ -1269,7 +1271,7 @@ static clusterMsgSendBlock *createClusterMsgSendBlock(int type, uint32_t msglen)
msgblock->refcount = 1;
msgblock->totlen = blocklen;
server.stat_cluster_links_memory += blocklen;
if IS_LIGHT_MESSAGE (type) {
if (IS_LIGHT_MESSAGE(type)) {
clusterBuildMessageHdrLight(&msgblock->msg_light, type, msglen);
} else {
clusterBuildMessageHdr(&msgblock->msg, type, msglen);
Expand Down Expand Up @@ -2874,7 +2876,7 @@ static clusterNode *getNodeFromLinkAndMsg(clusterLink *link, clusterMsg *hdr) {
return sender;
}

void processPublishPacket(clusterMsgDataPublish *publish_data, uint16_t type) {
void clusterProcessPublishPacket(clusterMsgDataPublish *publish_data, uint16_t type) {
robj *channel, *message;
uint32_t channel_len, message_len;

Expand All @@ -2896,7 +2898,7 @@ void processLightPacket(clusterLink *link, uint16_t type) {
clusterMsgLight *hdr = (clusterMsgLight *)link->rcvbuf;

if (type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD) {
processPublishPacket(&hdr->data.publish.msg, type);
clusterProcessPublishPacket(&hdr->data.publish.msg, type);
}
}

Expand All @@ -2916,8 +2918,9 @@ int clusterIsValidPacket(clusterLink *link) {
uint16_t type = ntohs(hdr->type) & ~CLUSTERMSG_MODIFIER_MASK;

if (is_light && !messageTypeSupportsLightHdr(type)) {
serverLog(LL_NOTICE, "--- Packet of type %s does not support light cluster header",
clusterGetMessageTypeString(type));
serverLog(LL_NOTICE,
"Packet of type '%s' (%u) does not support light cluster header. Marking packet as invalid.",
clusterGetMessageTypeString(type), type);
return 0;
}

Expand Down Expand Up @@ -3418,7 +3421,7 @@ int clusterProcessPacket(clusterLink *link) {
}
} else if (type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD) {
if (!sender) return 1; /* We don't know that node. */
processPublishPacket(&hdr->data.publish.msg, type);
clusterProcessPublishPacket(&hdr->data.publish.msg, type);
} else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) {
if (!sender) return 1; /* We don't know that node. */
clusterSendFailoverAuthIfNeeded(sender, hdr);
Expand Down

0 comments on commit 4427415

Please sign in to comment.