Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Brocast a PONG to all node in cluster when role changed #1295

Merged
merged 9 commits into from
Nov 22, 2024
19 changes: 13 additions & 6 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -2669,7 +2669,8 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
*
* If the sender and myself are in the same shard, try psync. */
clusterSetPrimary(sender, !are_in_same_shard, !are_in_same_shard);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG |
CLUSTER_TODO_BROADCAST_ALL);
} else if (nodeIsPrimary(myself) && (sender_slots >= migrated_our_slots) && !are_in_same_shard) {
/* When all our slots are lost to the sender and the sender belongs to
* a different shard, this is likely due to a client triggered slot
Expand Down Expand Up @@ -4533,7 +4534,7 @@ void clusterFailoverReplaceYourPrimary(void) {

/* 4) Pong all the other nodes so that they can update the state
* accordingly and detect that we switched to primary role. */
clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
clusterDoBeforeSleep(CLUSTER_TODO_BROADCAST_ALL);

/* 5) If there was a manual failover in progress, clear the state. */
resetManualFailover();
Expand Down Expand Up @@ -5001,7 +5002,7 @@ void clusterCron(void) {

/* Ping some random node 1 time every 10 iterations, so that we usually ping
* one random node every second. */
if (!(iteration % 10)) {
if (server.debug_cluster_random_ping && !(iteration % 10)) {
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved
int j;

/* Check a few random nodes and ping the one with the oldest
Expand Down Expand Up @@ -5178,6 +5179,13 @@ void clusterBeforeSleep(void) {
int fsync = flags & CLUSTER_TODO_FSYNC_CONFIG;
clusterSaveConfigOrDie(fsync);
}

if (flags & CLUSTER_TODO_BROADCAST_ALL) {
/* Broadcast a pong to all known nodes. This is useful when something changes
* in the configuration and we want to make the cluster aware it before the
* regular ping. */
clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
}
}

void clusterDoBeforeSleep(int flags) {
Expand Down Expand Up @@ -6523,7 +6531,7 @@ void clusterCommandSetSlot(client *c) {
}
/* After importing this slot, let the other nodes know as
* soon as possible. */
clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
clusterDoBeforeSleep(CLUSTER_TODO_BROADCAST_ALL);
}
}
}
Expand Down Expand Up @@ -6715,8 +6723,7 @@ int clusterCommandSpecial(client *c) {
* If the instance is a replica, it had a totally different replication history.
* In these both cases, myself as a replica has to do a full sync. */
clusterSetPrimary(n, 1, 1);
clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_SAVE_CONFIG);
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_BROADCAST_ALL);
addReply(c, shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr, "count-failure-reports") && c->argc == 3) {
/* CLUSTER COUNT-FAILURE-REPORTS <NODE ID> */
Expand Down
1 change: 1 addition & 0 deletions src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#define CLUSTER_TODO_SAVE_CONFIG (1 << 2)
#define CLUSTER_TODO_FSYNC_CONFIG (1 << 3)
#define CLUSTER_TODO_HANDLE_MANUALFAILOVER (1 << 4)
#define CLUSTER_TODO_BROADCAST_ALL (1 << 5)

/* clusterLink encapsulates everything needed to talk with a remote node. */
typedef struct clusterLink {
Expand Down
5 changes: 5 additions & 0 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,8 @@ void debugCommand(client *c) {
"CLOSE-CLUSTER-LINK-ON-PACKET-DROP <0|1>",
" This is valid only when DROP-CLUSTER-PACKET-FILTER is set to a valid packet type.",
" When set to 1, the cluster link is closed after dropping a packet based on the filter.",
"CLUSTER-RANDOM-PING <0|1>",
" Send cluster ping to a random node every second. Enabled by default.",
"OOM",
" Crash the server simulating an out-of-memory error.",
"PANIC",
Expand Down Expand Up @@ -607,6 +609,9 @@ void debugCommand(client *c) {
} else if (!strcasecmp(c->argv[1]->ptr, "close-cluster-link-on-packet-drop") && c->argc == 3) {
server.debug_cluster_close_link_on_packet_drop = atoi(c->argv[2]->ptr);
addReply(c, shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr, "cluster-random-ping") && c->argc == 3) {
server.debug_cluster_random_ping = atoi(c->argv[2]->ptr);
addReply(c, shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr, "object") && (c->argc == 3 || c->argc == 4)) {
dictEntry *de;
robj *val;
Expand Down
1 change: 1 addition & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2705,6 +2705,7 @@ void initServer(void) {
server.blocking_op_nesting = 0;
server.thp_enabled = 0;
server.cluster_drop_packet_filter = -1;
server.debug_cluster_random_ping = 1;
server.reply_buffer_peak_reset_time = REPLY_BUFFER_DEFAULT_PEAK_RESET_TIME;
server.reply_buffer_resizing_enabled = 1;
server.client_mem_usage_buckets = NULL;
Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2189,6 +2189,7 @@ struct valkeyServer {
int cluster_slot_stats_enabled; /* Cluster slot usage statistics tracking enabled. */
/* Debug config that goes along with cluster_drop_packet_filter. When set, the link is closed on packet drop. */
uint32_t debug_cluster_close_link_on_packet_drop : 1;
uint32_t debug_cluster_random_ping : 1;
sds cached_cluster_slot_info[CACHE_CONN_TYPE_MAX]; /* Index in array is a bitwise or of CACHE_CONN_TYPE_* */
/* Scripting */
mstime_t busy_reply_threshold; /* Script / module timeout in milliseconds */
Expand Down
61 changes: 61 additions & 0 deletions tests/unit/cluster/manual-failover.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -183,3 +183,64 @@ test "Wait for instance #0 to return back alive" {
}

} ;# start_cluster

start_cluster 3 1 {tags {external:skip cluster} overrides {cluster-ping-interval 1000 cluster-node-timeout 1000}} {
test "Broadcast PONG to the cluster when the node role changes" {
# R0 is a primary and R3 is a replica, we will do multiple cluster failover
# and then check their role and flags.
set R0_nodeid [R 0 cluster myid]
set R3_nodeid [R 3 cluster myid]

# Make sure we don't send PINGs for a short period of time.
for {set j 0} {$j < [llength $::servers]} {incr j} {
R $j debug cluster-random-ping 0
R $j config set cluster-ping-interval 300000
}

R 3 cluster failover
wait_for_condition 1000 50 {
[s 0 role] eq {slave} &&
[s -3 role] eq {master}
} else {
fail "Failover does not happened"
}

# Get the node information of R0 and R3 in my view from CLUSTER NODES
# R0 should be a replica and R3 should be a primary in all views.
for {set j 0} {$j < [llength $::servers]} {incr j} {
wait_for_condition 1000 50 {
[check_cluster_node_mark slave $j $R0_nodeid] &&
[check_cluster_node_mark master $j $R3_nodeid]
} else {
puts "R0_nodeid: $R0_nodeid"
puts "R3_nodeid: $R3_nodeid"
puts "R $j cluster nodes:"
puts [R $j cluster nodes]
fail "Node role does not changed in the first failover"
}
}

R 0 cluster failover
wait_for_condition 1000 50 {
[s 0 role] eq {master} &&
[s -3 role] eq {slave}
} else {
fail "The second failover does not happened"
}

# Get the node information of R0 and R3 in my view from CLUSTER NODES
# R0 should be a primary and R3 should be a replica in all views.
for {set j 0} {$j < [llength $::servers]} {incr j} {
wait_for_condition 1000 50 {
[check_cluster_node_mark master $j $R0_nodeid] &&
[check_cluster_node_mark slave $j $R3_nodeid]
} else {
puts "R0_nodeid: $R0_nodeid"
puts "R3_nodeid: $R3_nodeid"
puts "R $j cluster nodes:"
puts [R $j cluster nodes]
fail "Node role does not changed in the second failover"
}
}
}
} ;# start_cluster
Loading