Skip to content

Commit

Permalink
Brocast a PONG to all node in cluster when role changed (#1295)
Browse files Browse the repository at this point in the history
When a node role changes, we should brocast the change to notify other nodes.
For example, one primary and one replica, after a failover, the replica became
a new primary, the primary became a new replica.

And then we trigger a second cluster failover for the new replica, the
new replica will send a MFSTART to its primary, ie, the new primary.

But the new primary may reject the MFSTART due to this logic:
```
    } else if (type == CLUSTERMSG_TYPE_MFSTART) {
        if (!sender || sender->replicaof != myself) return 1;
```

In the new primary views, sender is still a primary, and sender->replicaof
is NULL, so we will return. Then the manual failover timedout.

Another possibility is that other primaries refuse to vote after receiving
the FAILOVER_AUTH_REQUEST, since in their's views, sender is still a primary,
so it refuse to vote, and then manual failover timedout.
```
void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
    ...
        if (clusterNodeIsPrimary(node)) {
            serverLog(LL_WARNING, "Failover auth denied to...
```

The reason is that, currently, we only update the node->replicaof information
when we receive a PING/PONG from the sender. For details, see clusterProcessPacket.
Therefore, in some scenarios, such as clusters with many nodes and a large
cluster-ping-interval (that is, cluster-node-timeout), the role change of the node
will be very delayed.

Added a DEBUG DISABLE-CLUSTER-RANDOM-PING command, send cluster ping
to a random node every second (see clusterCron).

Signed-off-by: Binbin <binloveplay1314@qq.com>
  • Loading branch information
enjoy-binbin authored Nov 22, 2024
1 parent 979f4c1 commit b9d2240
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 6 deletions.
19 changes: 13 additions & 6 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -2669,7 +2669,8 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
*
* If the sender and myself are in the same shard, try psync. */
clusterSetPrimary(sender, !are_in_same_shard, !are_in_same_shard);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG |
CLUSTER_TODO_BROADCAST_ALL);
} else if (nodeIsPrimary(myself) && (sender_slots >= migrated_our_slots) && !are_in_same_shard) {
/* When all our slots are lost to the sender and the sender belongs to
* a different shard, this is likely due to a client triggered slot
Expand Down Expand Up @@ -4538,7 +4539,7 @@ void clusterFailoverReplaceYourPrimary(void) {

/* 4) Pong all the other nodes so that they can update the state
* accordingly and detect that we switched to primary role. */
clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
clusterDoBeforeSleep(CLUSTER_TODO_BROADCAST_ALL);

/* 5) If there was a manual failover in progress, clear the state. */
resetManualFailover();
Expand Down Expand Up @@ -5029,7 +5030,7 @@ void clusterCron(void) {

/* Ping some random node 1 time every 10 iterations, so that we usually ping
* one random node every second. */
if (!(iteration % 10)) {
if (!server.debug_cluster_disable_random_ping && !(iteration % 10)) {
int j;

/* Check a few random nodes and ping the one with the oldest
Expand Down Expand Up @@ -5206,6 +5207,13 @@ void clusterBeforeSleep(void) {
int fsync = flags & CLUSTER_TODO_FSYNC_CONFIG;
clusterSaveConfigOrDie(fsync);
}

if (flags & CLUSTER_TODO_BROADCAST_ALL) {
/* Broadcast a pong to all known nodes. This is useful when something changes
* in the configuration and we want to make the cluster aware it before the
* regular ping. */
clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
}
}

void clusterDoBeforeSleep(int flags) {
Expand Down Expand Up @@ -6556,7 +6564,7 @@ void clusterCommandSetSlot(client *c) {
}
/* After importing this slot, let the other nodes know as
* soon as possible. */
clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
clusterDoBeforeSleep(CLUSTER_TODO_BROADCAST_ALL);
}
}
}
Expand Down Expand Up @@ -6748,8 +6756,7 @@ int clusterCommandSpecial(client *c) {
* If the instance is a replica, it had a totally different replication history.
* In these both cases, myself as a replica has to do a full sync. */
clusterSetPrimary(n, 1, 1);
clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_SAVE_CONFIG);
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_BROADCAST_ALL);
addReply(c, shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr, "count-failure-reports") && c->argc == 3) {
/* CLUSTER COUNT-FAILURE-REPORTS <NODE ID> */
Expand Down
1 change: 1 addition & 0 deletions src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#define CLUSTER_TODO_SAVE_CONFIG (1 << 2)
#define CLUSTER_TODO_FSYNC_CONFIG (1 << 3)
#define CLUSTER_TODO_HANDLE_MANUALFAILOVER (1 << 4)
#define CLUSTER_TODO_BROADCAST_ALL (1 << 5)

/* clusterLink encapsulates everything needed to talk with a remote node. */
typedef struct clusterLink {
Expand Down
5 changes: 5 additions & 0 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,8 @@ void debugCommand(client *c) {
"CLOSE-CLUSTER-LINK-ON-PACKET-DROP <0|1>",
" This is valid only when DROP-CLUSTER-PACKET-FILTER is set to a valid packet type.",
" When set to 1, the cluster link is closed after dropping a packet based on the filter.",
"DISABLE-CLUSTER-RANDOM-PING <0|1>",
" Disable sending cluster ping to a random node every second.",
"OOM",
" Crash the server simulating an out-of-memory error.",
"PANIC",
Expand Down Expand Up @@ -607,6 +609,9 @@ void debugCommand(client *c) {
} else if (!strcasecmp(c->argv[1]->ptr, "close-cluster-link-on-packet-drop") && c->argc == 3) {
server.debug_cluster_close_link_on_packet_drop = atoi(c->argv[2]->ptr);
addReply(c, shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr, "disable-cluster-random-ping") && c->argc == 3) {
server.debug_cluster_disable_random_ping = atoi(c->argv[2]->ptr);
addReply(c, shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr, "object") && (c->argc == 3 || c->argc == 4)) {
dictEntry *de;
robj *val;
Expand Down
1 change: 1 addition & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2693,6 +2693,7 @@ void initServer(void) {
server.blocking_op_nesting = 0;
server.thp_enabled = 0;
server.cluster_drop_packet_filter = -1;
server.debug_cluster_disable_random_ping = 0;
server.reply_buffer_peak_reset_time = REPLY_BUFFER_DEFAULT_PEAK_RESET_TIME;
server.reply_buffer_resizing_enabled = 1;
server.client_mem_usage_buckets = NULL;
Expand Down
2 changes: 2 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2194,6 +2194,8 @@ struct valkeyServer {
int cluster_slot_stats_enabled; /* Cluster slot usage statistics tracking enabled. */
/* Debug config that goes along with cluster_drop_packet_filter. When set, the link is closed on packet drop. */
uint32_t debug_cluster_close_link_on_packet_drop : 1;
/* Debug config to control the random ping. When set, we will disable the random ping in clusterCron. */
uint32_t debug_cluster_disable_random_ping : 1;
sds cached_cluster_slot_info[CACHE_CONN_TYPE_MAX]; /* Index in array is a bitwise or of CACHE_CONN_TYPE_* */
/* Scripting */
mstime_t busy_reply_threshold; /* Script / module timeout in milliseconds */
Expand Down
61 changes: 61 additions & 0 deletions tests/unit/cluster/manual-failover.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -313,3 +313,64 @@ start_cluster 3 1 {tags {external:skip cluster} overrides {cluster-ping-interval
verify_no_log_message -3 "*Manual failover timed out*" $loglines2
}
} ;# start_cluster

start_cluster 3 1 {tags {external:skip cluster} overrides {cluster-ping-interval 1000 cluster-node-timeout 1000}} {
test "Broadcast PONG to the cluster when the node role changes" {
# R0 is a primary and R3 is a replica, we will do multiple cluster failover
# and then check their role and flags.
set R0_nodeid [R 0 cluster myid]
set R3_nodeid [R 3 cluster myid]

# Make sure we don't send PINGs for a short period of time.
for {set j 0} {$j < [llength $::servers]} {incr j} {
R $j debug disable-cluster-random-ping 0
R $j config set cluster-ping-interval 300000
}

R 3 cluster failover
wait_for_condition 1000 50 {
[s 0 role] eq {slave} &&
[s -3 role] eq {master}
} else {
fail "Failover does not happened"
}

# Get the node information of R0 and R3 in my view from CLUSTER NODES
# R0 should be a replica and R3 should be a primary in all views.
for {set j 0} {$j < [llength $::servers]} {incr j} {
wait_for_condition 1000 50 {
[check_cluster_node_mark slave $j $R0_nodeid] &&
[check_cluster_node_mark master $j $R3_nodeid]
} else {
puts "R0_nodeid: $R0_nodeid"
puts "R3_nodeid: $R3_nodeid"
puts "R $j cluster nodes:"
puts [R $j cluster nodes]
fail "Node role does not changed in the first failover"
}
}

R 0 cluster failover
wait_for_condition 1000 50 {
[s 0 role] eq {master} &&
[s -3 role] eq {slave}
} else {
fail "The second failover does not happened"
}

# Get the node information of R0 and R3 in my view from CLUSTER NODES
# R0 should be a primary and R3 should be a replica in all views.
for {set j 0} {$j < [llength $::servers]} {incr j} {
wait_for_condition 1000 50 {
[check_cluster_node_mark master $j $R0_nodeid] &&
[check_cluster_node_mark slave $j $R3_nodeid]
} else {
puts "R0_nodeid: $R0_nodeid"
puts "R3_nodeid: $R3_nodeid"
puts "R $j cluster nodes:"
puts [R $j cluster nodes]
fail "Node role does not changed in the second failover"
}
}
}
} ;# start_cluster

0 comments on commit b9d2240

Please sign in to comment.