Skip to content

Commit

Permalink
Address feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Roshan Khatri <rvkhatri@amazon.com>
  • Loading branch information
roshkhatri committed May 20, 2024
1 parent 6c97434 commit 390ade1
Show file tree
Hide file tree
Showing 8 changed files with 24 additions and 17 deletions.
13 changes: 8 additions & 5 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -1376,7 +1376,7 @@ int isNodeAvailable(clusterNode *node) {
if (clusterNodeIsMyself(node)) {
/* Nodes do not update their own information
* in the cluster node list. */
repl_offset = replicationGetSlaveOffset();
repl_offset = getNodeReplicationOffset(node);
}
return (repl_offset != 0);
}
Expand Down Expand Up @@ -1438,15 +1438,18 @@ sds generateClusterSlotResponse(void) {
}
}
setDeferredArrayLen(recording_client, slot_replylen, num_masters);
return stopCaching(recording_client);
sds cluster_slot_respose = aggregateClientOutputBuffer(recording_client);
deleteCachedResponseClient(recording_client);
return cluster_slot_respose;
}

int verifyCachedClusterSlotsResponse(sds cached_response) {
sds generated_response = generateClusterSlotResponse();
int result = !sdscmp(generated_response, cached_response);
if (!result) serverLog(LL_WARNING,"\ngenerated_response:\n%s\n\ncached_response:\n%s", generated_response, cached_response);
int is_equal = !sdscmp(generated_response, cached_response);
/* Here, we use LL_WARNING so this gets printed when debug assertions are enabled and the system is about to crash. */
if (!is_equal) serverLog(LL_WARNING,"\ngenerated_response:\n%s\n\ncached_response:\n%s", generated_response, cached_response);
sdsfree(generated_response);
return result;
return is_equal;
}

void clusterCommandSlots(client *c) {
Expand Down
4 changes: 3 additions & 1 deletion src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ clusterNode *clusterLookupNode(const char *name, int length);
void clusterReplicateOpenSlots(void);
int detectAndUpdateCachedNodeHealth(void);
client *createCachedResponseClient(void);
sds stopCaching(client *recording_client);
void deleteCachedResponseClient(client *recording_client);
void clearCachedClusterSlotsResponse(void);

/* functions with shared implementations */
Expand All @@ -121,4 +121,6 @@ void migrateCommand(client *c);
void clusterCommand(client *c);
ConnectionType *connTypeOfCluster(void);
int isNodeAvailable(clusterNode *node);
long long getNodeReplicationOffset(clusterNode *node);
sds aggregateClientOutputBuffer(client *c);
#endif /* __CLUSTER_H */
4 changes: 2 additions & 2 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -5882,7 +5882,7 @@ void clusterUpdateSlots(client *c, unsigned char *slots, int del) {
}
}

long long getNodeOffset(clusterNode *node) {
long long getNodeReplicationOffset(clusterNode *node) {
if (node->flags & CLUSTER_NODE_MYSELF) {
return nodeIsSlave(node) ? replicationGetSlaveOffset() : server.master_repl_offset;
} else {
Expand Down Expand Up @@ -5924,7 +5924,7 @@ void addNodeDetailsToShardReply(client *c, clusterNode *node) {
reply_count++;
}

long long node_offset = getNodeOffset(node);
long long node_offset = getNodeReplicationOffset(node);

addReplyBulkCString(c, "role");
addReplyBulkCString(c, nodeIsSlave(node) ? "replica" : "master");
Expand Down
4 changes: 2 additions & 2 deletions src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,8 @@ struct _clusterNode {
clusterLink *link; /* TCP/IP link established toward this node */
clusterLink *inbound_link; /* TCP/IP link accepted from this node */
list *fail_reports; /* List of nodes signaling this as failing */
int is_node_healthy; /* Boolean last updated node health used for validating
cached response, can be stale. Update by calling detectAndUpdateCachedNodeHealth() */
int is_node_healthy; /* Boolean indicating the cached node health.
Update with updateAndCountChangedNodeHealth(). */
};

struct clusterState {
Expand Down
1 change: 1 addition & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -2655,6 +2655,7 @@ int updateClusterFlags(const char **err) {
static int updateClusterAnnouncedPort(const char **err) {
UNUSED(err);
clusterUpdateMyselfAnnouncedPorts();
clearCachedClusterSlotsResponse();
return 1;
}

Expand Down
8 changes: 3 additions & 5 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ int prepareClientToWrite(client *c) {

/* Returns everything in the client reply linked list in a SDS format.
* This should only be used only with a caching client. */
static sds getClientOutputBuffer(client *c) {
sds aggregateClientOutputBuffer(client *c) {
sds cmd_response = sdsempty();
listIter li;
listNode *ln;
Expand All @@ -340,7 +340,7 @@ static sds getClientOutputBuffer(client *c) {
/* This function creates and returns a fake client for recording the command response
* to initiate caching of any command response.
*
* It needs be paired with `stopCaching` function to stop caching. */
* It needs be paired with `deleteCachedResponseClient` function to stop caching. */
client *createCachedResponseClient(void) {
struct client *recording_client = createClient(NULL);
/* Allocating the `conn` allows to prepare the caching client before adding
Expand All @@ -351,12 +351,10 @@ client *createCachedResponseClient(void) {

/* This function is used to stop caching of any command response after `createCachedResponseClient` is called.
* It returns the command response as SDS from the recording_client's reply buffer. */
sds stopCaching(client *recording_client) {
void deleteCachedResponseClient(client *recording_client) {
zfree(recording_client->conn);
recording_client->conn = NULL;
sds output_buff = getClientOutputBuffer(recording_client);
freeClient(recording_client);
return output_buff;
}

/* -----------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2682,7 +2682,7 @@ client *lookupClientByID(uint64_t id);
int authRequired(client *c);
void putClientInPendingWriteQueue(client *c);
client *createCachedResponseClient(void);
sds stopCaching(client *recording_client);
void deleteCachedResponseClient(client *recording_client);

/* logreqres.c - logging of requests and responses */
void reqresReset(client *c, int free_buf);
Expand Down
5 changes: 4 additions & 1 deletion tests/unit/cluster/announced-endpoints.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ start_cluster 2 2 {tags {external:skip cluster}} {
}
set count [expr [llength $::servers] + 1]
set used_port [find_available_port $baseport $count]
R 0 CLUSTER SLOTS
R 1 CLUSTER SLOTS

R 0 config set cluster-announce-tls-port $used_port
R 0 config set cluster-announce-port $used_port

assert_match "*:$used_port@*" [R 0 CLUSTER NODES]
assert_match "*$used_port*" [R 0 CLUSTER SLOTS]
wait_for_condition 50 100 {
[string match "*:$used_port@*" [R 1 CLUSTER NODES]]
([string match "*:$used_port@*" [R 1 CLUSTER NODES]] && [string match "*$used_port*" [R 1 CLUSTER SLOTS]])
} else {
fail "Cluster announced port was not propagated via gossip"
}
Expand Down

0 comments on commit 390ade1

Please sign in to comment.