Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make cluster meet reliable under link failures #461

Merged
merged 11 commits into from
Jun 17, 2024
32 changes: 24 additions & 8 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -2975,7 +2975,13 @@ int clusterIsValidPacket(clusterLink *link) {
int clusterProcessPacket(clusterLink *link) {

/* Validate that the packet is well-formed */
if (!clusterIsValidPacket(link)) return 1;
if (!clusterIsValidPacket(link)) {
if (server.cluster_close_link_on_packet_drop) {
freeClusterLink(link);
return 0;
}
return 1;
}

clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
uint16_t type = ntohs(hdr->type);
Expand Down Expand Up @@ -3088,6 +3094,12 @@ int clusterProcessPacket(clusterLink *link) {
serverLog(LL_DEBUG,"%s packet received: %.40s",
clusterGetMessageTypeString(type),
link->node ? link->node->name : "NULL");

if (sender && (sender->flags & CLUSTER_NODE_MEET)) {
/* Once we get a response for MEET from the sender, we can stop sending more MEET. */
srgsanky marked this conversation as resolved.
Show resolved Hide resolved
sender->flags &= ~CLUSTER_NODE_MEET;
}

if (!link->inbound) {
if (nodeInHandshake(link->node)) {
/* If we already have this node, try to change the
Expand Down Expand Up @@ -3613,13 +3625,17 @@ void clusterLinkConnectHandler(connection *conn) {
* replaced by the clusterSendPing() call. */
node->ping_sent = old_ping_sent;
}
/* We can clear the flag after the first packet is sent.
* If we'll never receive a PONG, we'll never send new packets
* to this node. Instead after the PONG is received and we
* are no longer in meet/handshake status, we want to send
* normal PING packets. */
node->flags &= ~CLUSTER_NODE_MEET;

/* NOTE: We cannot clear the MEET flag from the node until we get a response
madolson marked this conversation as resolved.
Show resolved Hide resolved
* from the other node. If the MEET packet is not accepted by the other node
madolson marked this conversation as resolved.
Show resolved Hide resolved
* due to link failure, we want to continue sending MEET. If we don't
* continue sending MEET, this node will know about the other node, but the
* other node will never add this node. Every node always responds to PINGs
* from unknown nodes with a PONG, so this node will know about the other
* node and continue sending PINGs. But the other node won't add this node
* until it sees a MEET (or it gets to know about this node from a trusted
* third node). In this case, clearing the MEET flag here leads to asymmetry
* in the cluster membership.
*/
serverLog(LL_DEBUG,"Connecting with Node %.40s at %s:%d",
node->name, node->ip, node->cport);
}
Expand Down
6 changes: 6 additions & 0 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,9 @@ void debugCommand(client *c) {
" Show low level info about `key` and associated value.",
"DROP-CLUSTER-PACKET-FILTER <packet-type>",
" Drop all packets that match the filtered type. Set to -1 allow all packets.",
"CLOSE-CLUSTER-LINK-ON-PACKET-DROP <0|1>",
" This is valid only when DROP-CLUSTER-PACKET-FILTER is set to a valid packet type."
" When set to 1, the cluster link is closed after dropping a packet based on the filter."
srgsanky marked this conversation as resolved.
Show resolved Hide resolved
"OOM",
" Crash the server simulating an out-of-memory error.",
"PANIC",
Expand Down Expand Up @@ -603,6 +606,9 @@ NULL
return;
server.cluster_drop_packet_filter = packet_type;
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr, "close-cluster-link-on-packet-drop") && c->argc == 3) {
server.cluster_close_link_on_packet_drop = atoi(c->argv[2]->ptr);
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
dictEntry *de;
robj *val;
Expand Down
2 changes: 2 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2027,6 +2027,8 @@ struct valkeyServer {
unsigned long long cluster_link_msg_queue_limit_bytes; /* Memory usage limit on individual link msg queue */
int cluster_drop_packet_filter; /* Debug config that allows tactically
* dropping packets of a specific type */
int cluster_close_link_on_packet_drop; /* Debug config that goes along with cluster_drop_packet_filter.
srgsanky marked this conversation as resolved.
Show resolved Hide resolved
When set, the link is closed on packet drop. */
srgsanky marked this conversation as resolved.
Show resolved Hide resolved
/* Scripting */
mstime_t busy_reply_threshold; /* Script / module timeout in milliseconds */
int pre_command_oom_state; /* OOM before command (script?) was started */
Expand Down
71 changes: 71 additions & 0 deletions tests/unit/cluster/cluster-reliable-meet.tcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# make sure the test infra won't use SELECT
set old_singledb $::singledb
set ::singledb 1

tags {tls:skip external:skip cluster} {
set base_conf [list cluster-enabled yes]
start_multiple_servers 2 [list overrides $base_conf] {
test "Cluster nodes are reachable" {
for {set id 0} {$id < [llength $::servers]} {incr id} {
# Every node should be reachable.
wait_for_condition 1000 50 {
([catch {R $id ping} ping_reply] == 0) &&
($ping_reply eq {PONG})
} else {
catch {R $id ping} err
fail "Node #$id keeps replying '$err' to PING."
}
}
}

test "Before slots allocation, all nodes report cluster failure" {
wait_for_cluster_state fail
}

set CLUSTER_PACKET_TYPE_MEET 2
set CLUSTER_PACKET_TYPE_NONE -1

test "Cluster nodes haven't met each other" {
assert {[llength [get_cluster_nodes 1]] == 1}
assert {[llength [get_cluster_nodes 0]] == 1}
}

test "Allocate slots" {
cluster_allocate_slots 2 0
}

test "MEET is reliabile when target drops the initial MEETs" {
srgsanky marked this conversation as resolved.
Show resolved Hide resolved
# Make 0 drop the initial MEET messages due to link failure
R 0 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_MEET
R 0 DEBUG CLOSE-CLUSTER-LINK-ON-PACKET-DROP 1

R 1 CLUSTER MEET 127.0.0.1 [srv 0 port]

# Wait for at least a few MEETs to be sent so that we are sure that 0 is
# dropping them.
wait_for_condition 1000 50 {
[CI 0 cluster_stats_messages_meet_received] >= 3
} else {
fail "Cluster node $a never sent multiple MEETs to $b"
}

# Make sure the nodes still don't know about each other
assert {[llength [get_cluster_nodes 1 connected]] == 1}
assert {[llength [get_cluster_nodes 0 connected]] == 1}

R 0 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_NONE

# If the MEET is reliable, both a and b will turn to cluster state ok
wait_for_condition 1000 50 {
[CI 1 cluster_state] eq {ok} && [CI 0 cluster_state] eq {ok} &&
[CI 0 cluster_stats_messages_meet_received] >= 4 &&
[CI 1 cluster_stats_messages_meet_sent] == [CI 0 cluster_stats_messages_meet_received]
} else {
fail "1 cluster_state:[CI 1 cluster_state], 0 cluster_state: [CI 0 cluster_state]"
}
}
} ;# stop servers
} ;# tags

set ::singledb $old_singledb

Loading