Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance SENTINEL FAILOVER to use the FAILOVER command to avoid data loss #1238

Closed
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion src/commands.def
Original file line number Diff line number Diff line change
Expand Up @@ -5561,9 +5561,16 @@ struct COMMAND_ARG SENTINEL_DEBUG_Args[] = {
#define SENTINEL_FAILOVER_Keyspecs NULL
#endif

/* SENTINEL FAILOVER failover_type argument table */
struct COMMAND_ARG SENTINEL_FAILOVER_failover_type_Subargs[] = {
{MAKE_ARG("legacy",ARG_TYPE_PURE_TOKEN,-1,"LEGACY",NULL,NULL,CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("pause",ARG_TYPE_PURE_TOKEN,-1,"PAUSE",NULL,NULL,CMD_ARG_NONE,0,NULL)},
};

/* SENTINEL FAILOVER argument table */
struct COMMAND_ARG SENTINEL_FAILOVER_Args[] = {
{MAKE_ARG("primary-name",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("failover-type",ARG_TYPE_ONEOF,-1,"TYPE",NULL,"8.1.0",CMD_ARG_OPTIONAL,2,NULL),.subargs=SENTINEL_FAILOVER_failover_type_Subargs},
};

/********** SENTINEL FLUSHCONFIG ********************/
Expand Down Expand Up @@ -6026,7 +6033,7 @@ struct COMMAND_STRUCT SENTINEL_Subcommands[] = {
{MAKE_CMD("ckquorum","Checks for a Sentinel quorum.",NULL,"2.8.4",CMD_DOC_NONE,NULL,NULL,"sentinel",COMMAND_GROUP_SENTINEL,SENTINEL_CKQUORUM_History,0,SENTINEL_CKQUORUM_Tips,0,sentinelCommand,3,CMD_ADMIN|CMD_SENTINEL|CMD_ONLY_SENTINEL,0,SENTINEL_CKQUORUM_Keyspecs,0,NULL,1),.args=SENTINEL_CKQUORUM_Args},
{MAKE_CMD("config","Configures Sentinel.","O(N) when N is the number of configuration parameters provided","6.2.0",CMD_DOC_NONE,NULL,NULL,"sentinel",COMMAND_GROUP_SENTINEL,SENTINEL_CONFIG_History,1,SENTINEL_CONFIG_Tips,0,sentinelCommand,-4,CMD_ADMIN|CMD_SENTINEL|CMD_ONLY_SENTINEL,0,SENTINEL_CONFIG_Keyspecs,0,NULL,1),.args=SENTINEL_CONFIG_Args},
{MAKE_CMD("debug","Lists or updates the current configurable parameters of Sentinel.","O(N) where N is the number of configurable parameters","7.0.0",CMD_DOC_NONE,NULL,NULL,"sentinel",COMMAND_GROUP_SENTINEL,SENTINEL_DEBUG_History,0,SENTINEL_DEBUG_Tips,0,sentinelCommand,-2,CMD_ADMIN|CMD_SENTINEL|CMD_ONLY_SENTINEL,0,SENTINEL_DEBUG_Keyspecs,0,NULL,1),.args=SENTINEL_DEBUG_Args},
{MAKE_CMD("failover","Forces a Sentinel failover.",NULL,"2.8.4",CMD_DOC_NONE,NULL,NULL,"sentinel",COMMAND_GROUP_SENTINEL,SENTINEL_FAILOVER_History,0,SENTINEL_FAILOVER_Tips,0,sentinelCommand,3,CMD_ADMIN|CMD_SENTINEL|CMD_ONLY_SENTINEL,0,SENTINEL_FAILOVER_Keyspecs,0,NULL,1),.args=SENTINEL_FAILOVER_Args},
{MAKE_CMD("failover","Forces a Sentinel failover.",NULL,"2.8.4",CMD_DOC_NONE,NULL,NULL,"sentinel",COMMAND_GROUP_SENTINEL,SENTINEL_FAILOVER_History,0,SENTINEL_FAILOVER_Tips,0,sentinelCommand,-3,CMD_ADMIN|CMD_SENTINEL|CMD_ONLY_SENTINEL,0,SENTINEL_FAILOVER_Keyspecs,0,NULL,2),.args=SENTINEL_FAILOVER_Args},
{MAKE_CMD("flushconfig","Rewrites the Sentinel configuration file.","O(1)","2.8.4",CMD_DOC_NONE,NULL,NULL,"sentinel",COMMAND_GROUP_SENTINEL,SENTINEL_FLUSHCONFIG_History,0,SENTINEL_FLUSHCONFIG_Tips,0,sentinelCommand,2,CMD_ADMIN|CMD_SENTINEL|CMD_ONLY_SENTINEL,0,SENTINEL_FLUSHCONFIG_Keyspecs,0,NULL,0)},
{MAKE_CMD("get-master-addr-by-name","Returns the port and address of a primary instance.","O(1)","2.8.4",CMD_DOC_DEPRECATED,"`SENTINEL GET-PRIMARY-ADDR-BY-NAME`","8.0.0","sentinel",COMMAND_GROUP_SENTINEL,SENTINEL_GET_MASTER_ADDR_BY_NAME_History,0,SENTINEL_GET_MASTER_ADDR_BY_NAME_Tips,0,sentinelCommand,3,CMD_ADMIN|CMD_SENTINEL|CMD_ONLY_SENTINEL,0,SENTINEL_GET_MASTER_ADDR_BY_NAME_Keyspecs,0,NULL,1),.args=SENTINEL_GET_MASTER_ADDR_BY_NAME_Args},
{MAKE_CMD("get-primary-addr-by-name","Returns the port and address of a primary instance.","O(1)","8.0.0",CMD_DOC_NONE,NULL,NULL,"sentinel",COMMAND_GROUP_SENTINEL,SENTINEL_GET_PRIMARY_ADDR_BY_NAME_History,0,SENTINEL_GET_PRIMARY_ADDR_BY_NAME_Tips,0,sentinelCommand,3,CMD_ADMIN|CMD_SENTINEL|CMD_ONLY_SENTINEL,0,SENTINEL_GET_PRIMARY_ADDR_BY_NAME_Keyspecs,0,NULL,1),.args=SENTINEL_GET_PRIMARY_ADDR_BY_NAME_Args},
Expand Down
21 changes: 20 additions & 1 deletion src/commands/sentinel-failover.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"summary": "Forces a Sentinel failover.",
"group": "sentinel",
"since": "2.8.4",
"arity": 3,
"arity": -3,
"container": "SENTINEL",
"function": "sentinelCommand",
"command_flags": [
Expand All @@ -19,6 +19,25 @@
{
"name": "primary-name",
"type": "string"
},
{
"token": "TYPE",
"name": "failover-type",
"type": "oneof",
"optional": true,
"since": "8.1.0",
"arguments": [
{
"name": "legacy",
"type": "pure-token",
"token": "legacy"
},
{
"name": "pause",
"type": "pure-token",
"token": "pause"
}
]
}
]
}
Expand Down
104 changes: 99 additions & 5 deletions src/sentinel.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ typedef struct sentinelAddr {
#define SRI_FORCE_FAILOVER (1 << 11) /* Force failover with primary up. */
#define SRI_SCRIPT_KILL_SENT (1 << 12) /* SCRIPT KILL already sent on -BUSY */
#define SRI_PRIMARY_REBOOT (1 << 13) /* Primary was detected as rebooting */
#define SRI_SUPPORT_FAILOVER (1 << 14) /* Primary and replica support FAILOVER command. */
/* Note: when adding new flags, please check the flags section in addReplySentinelValkeyInstance. */

/* Note: times are in milliseconds. */
Expand Down Expand Up @@ -114,6 +115,7 @@ static mstime_t sentinel_default_failover_timeout = 60 * 3 * 1000;
#define SENTINEL_FAILOVER_STATE_WAIT_PROMOTION 4 /* Wait replica to change role */
#define SENTINEL_FAILOVER_STATE_RECONF_REPLICAS 5 /* REPLICAOF newprimary */
#define SENTINEL_FAILOVER_STATE_UPDATE_CONFIG 6 /* Monitor promoted replica. */
#define SENTINEL_FAILOVER_STATE_SEND_FAILOVER 7 /* Send FAILOVER Command to primary. */

#define SENTINEL_PRIMARY_LINK_STATUS_UP 0
#define SENTINEL_PRIMARY_LINK_STATUS_DOWN 1
Expand Down Expand Up @@ -3269,6 +3271,7 @@ void addReplySentinelValkeyInstance(client *c, sentinelValkeyInstance *ri) {
if (ri->flags & SRI_FORCE_FAILOVER) flags = sdscat(flags, "force_failover,");
if (ri->flags & SRI_SCRIPT_KILL_SENT) flags = sdscat(flags, "script_kill_sent,");
if (ri->flags & SRI_PRIMARY_REBOOT) flags = sdscat(flags, "master_reboot,");
if (ri->flags & SRI_SUPPORT_FAILOVER) flags = sdscat(flags, "support_failover,");

if (sdslen(flags) != 0) sdsrange(flags, 0, -2); /* remove last "," */
addReplyBulkCString(c, flags);
Expand Down Expand Up @@ -3837,11 +3840,31 @@ void sentinelCommand(client *c) {
addReplyBulkLongLong(c, addr->port);
}
} else if (!strcasecmp(c->argv[1]->ptr, "failover")) {
/* SENTINEL FAILOVER <primary-name> */
/* SENTINEL FAILOVER <primary-name> [type <legacy | pause>] */
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved
sentinelValkeyInstance *ri;
/* 0: No parameters passed default is legacy. 1: legacy. 2: pause. */
int failover_type = 0;

if (c->argc != 3) goto numargserr;
if (c->argc != 3 && c->argc != 5) goto numargserr;
if ((ri = sentinelGetPrimaryByNameOrReplyError(c, c->argv[2])) == NULL) return;
for (int i = 3; i < c->argc; i++) {
int moreargs = c->argc > i + 1;
if (!strcasecmp(c->argv[i]->ptr, "type") && moreargs && failover_type == 0) {
if (!strcasecmp(c->argv[i + 1]->ptr, "legacy")) {
failover_type = 1;
} else if (!strcasecmp(c->argv[i + 1]->ptr, "pause")) {
failover_type = 2;
} else {
addReplyErrorFormat(c, "Unknown failover type '%s'", (char *)c->argv[i + 1]->ptr);
return;
}
i++;
} else {
addReplyErrorObject(c, shared.syntaxerr);
return;
}
}

if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
addReplyError(c, "-INPROG Failover already in progress");
return;
Expand All @@ -3853,6 +3876,7 @@ void sentinelCommand(client *c) {
serverLog(LL_NOTICE, "Executing user requested FAILOVER of '%s'", ri->name);
sentinelStartFailover(ri);
ri->flags |= SRI_FORCE_FAILOVER;
if (failover_type == 2) ri->flags |= SRI_SUPPORT_FAILOVER;
addReply(c, shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr, "pending-scripts")) {
/* SENTINEL PENDING-SCRIPTS */
Expand Down Expand Up @@ -4635,6 +4659,41 @@ char *sentinelGetLeader(sentinelValkeyInstance *primary, uint64_t epoch) {
return winner;
}

void sentinelFailoverReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
sentinelValkeyInstance *ri = privdata;
instanceLink *link = c->data;
redisReply *r;

if (!reply || !link) return;
link->pending_commands--;
r = reply;

/* Primary does not support FAILOVER, fallback to legacy type. */
if (r->type == REDIS_REPLY_ERROR) {
sentinelEvent(LL_NOTICE, "+failover-state-send-slaveof-noone", ri->promoted_replica, "%@");
ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_REPLICAOF_NOONE;
ri->failover_state_change_time = mstime();
}
}

/* Send FAILOVER to the specified primary instance, the replica addr passed in
* at the same time will be used as the TO parameter. */
int sentinelSendFailover(sentinelValkeyInstance *ri, const sentinelAddr *addr) {
char portstr[32];
const char *host;
int retval;

host = announceSentinelAddr(addr);
ll2string(portstr, sizeof(portstr), addr->port);

retval = redisAsyncCommand(ri->link->cc, sentinelFailoverReplyCallback, ri, "%s TO %s %s TIMEOUT %lld",
sentinelInstanceMapCommand(ri, "FAILOVER"), host, portstr, ri->failover_timeout);
if (retval == C_ERR) return retval;
ri->link->pending_commands++;

return C_OK;
}

/* Send REPLICAOF to the specified instance, always followed by a
* CONFIG REWRITE command in order to store the new configuration on disk
* when possible (that is, if the instance is recent enough to support
Expand Down Expand Up @@ -4901,12 +4960,46 @@ void sentinelFailoverSelectReplica(sentinelValkeyInstance *ri) {
sentinelEvent(LL_WARNING, "+selected-slave", replica, "%@");
replica->flags |= SRI_PROMOTED;
ri->promoted_replica = replica;
ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_REPLICAOF_NOONE;
if (ri->flags & SRI_SUPPORT_FAILOVER) {
sentinelEvent(LL_NOTICE, "+failover-state-send-failover", replica, "%@");
ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_FAILOVER;
} else {
sentinelEvent(LL_NOTICE, "+failover-state-send-slaveof-noone", replica, "%@");
ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_REPLICAOF_NOONE;
}
ri->failover_state_change_time = mstime();
sentinelEvent(LL_NOTICE, "+failover-state-send-slaveof-noone", replica, "%@");
}
}

void sentinelFailoverSendFailover(sentinelValkeyInstance *ri) {
/* We can't send the command to the promoted replica if it is now
* disconnected. Retry again and again with this state until the timeout
* is reached, then abort the failover. */
if (ri->promoted_replica->link->disconnected) {
if (mstime() - ri->failover_state_change_time > ri->failover_timeout) {
sentinelEvent(LL_WARNING, "-failover-abort-slave-timeout", ri, "%@");
sentinelAbortFailover(ri);
}
return;
}

/* We will first try to use SHUTDOWN to coordinate a failover between the primary
* and promoted replica to avoid data loss. */
if ((ri->flags & (SRI_S_DOWN | SRI_O_DOWN)) == 0 && !ri->link->disconnected) {
if (sentinelSendFailover(ri, ri->promoted_replica->addr) == C_OK) {
sentinelEvent(LL_NOTICE, "+failover-state-wait-promotion", ri->promoted_replica, "%@");
ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
ri->failover_state_change_time = mstime();
return;
}
}

/* Fallback to legacy type. */
sentinelEvent(LL_NOTICE, "+failover-state-send-slaveof-noone", ri->promoted_replica, "%@");
ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_REPLICAOF_NOONE;
ri->failover_state_change_time = mstime();
}

void sentinelFailoverSendReplicaOfNoOne(sentinelValkeyInstance *ri) {
int retval;

Expand Down Expand Up @@ -5078,6 +5171,7 @@ void sentinelFailoverStateMachine(sentinelValkeyInstance *ri) {
switch (ri->failover_state) {
case SENTINEL_FAILOVER_STATE_WAIT_START: sentinelFailoverWaitStart(ri); break;
case SENTINEL_FAILOVER_STATE_SELECT_REPLICA: sentinelFailoverSelectReplica(ri); break;
case SENTINEL_FAILOVER_STATE_SEND_FAILOVER: sentinelFailoverSendFailover(ri); break;
case SENTINEL_FAILOVER_STATE_SEND_REPLICAOF_NOONE: sentinelFailoverSendReplicaOfNoOne(ri); break;
case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION: sentinelFailoverWaitPromotion(ri); break;
case SENTINEL_FAILOVER_STATE_RECONF_REPLICAS: sentinelFailoverReconfNextReplica(ri); break;
Expand All @@ -5093,7 +5187,7 @@ void sentinelAbortFailover(sentinelValkeyInstance *ri) {
serverAssert(ri->flags & SRI_FAILOVER_IN_PROGRESS);
serverAssert(ri->failover_state <= SENTINEL_FAILOVER_STATE_WAIT_PROMOTION);

ri->flags &= ~(SRI_FAILOVER_IN_PROGRESS | SRI_FORCE_FAILOVER);
ri->flags &= ~(SRI_FAILOVER_IN_PROGRESS | SRI_FORCE_FAILOVER | SRI_SUPPORT_FAILOVER);
ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
ri->failover_state_change_time = mstime();
if (ri->promoted_replica) {
Expand Down
58 changes: 50 additions & 8 deletions tests/sentinel/tests/05-manual.tcl
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
# Test manual failover

source "../tests/includes/init-tests.tcl"
proc test_sentinel_failover {type master_id} {

foreach_sentinel_id id {
S $id sentinel debug info-period 2000
S $id sentinel debug default-down-after 6000
S $id sentinel debug publish-period 1000
}

test "Manual failover works" {
set val 0

test "Manual failover works - $type" {
R $master_id del foo
set old_port [RPort $master_id]
set addr [S 0 SENTINEL GET-PRIMARY-ADDR-BY-NAME mymaster]
assert {[lindex $addr 1] == $old_port}
Expand All @@ -17,16 +20,26 @@ test "Manual failover works" {
# sentinel - replica may not have enough time to exchange INFO and update
# the replica's info-period, so the test may get a NOGOODSLAVE.
wait_for_condition 300 50 {
[catch {S 0 SENTINEL FAILOVER mymaster}] == 0
[catch {S 0 SENTINEL FAILOVER mymaster type $type}] == 0
} else {
catch {S 0 SENTINEL FAILOVER mymaster} reply
catch {S 0 SENTINEL FAILOVER mymaster type $type} reply
puts [S 0 SENTINEL REPLICAS mymaster]
fail "Sentinel manual failover did not work, got: $reply"
}

catch {S 0 SENTINEL FAILOVER mymaster} reply
catch {S 0 SENTINEL FAILOVER mymaster type $type} reply
assert_match {*INPROG*} $reply ;# Failover already in progress

# After sending sentinel failover, continue writing to the primary
# to observe the final data consistency.
for {set j 0} {$j < 1000000} {incr j} {
catch {R $master_id incr foo} err
if {[string match "READONLY*" $err]} {
break
}
set val $err
}

foreach_sentinel_id id {
wait_for_condition 1000 50 {
[lindex [S $id SENTINEL GET-PRIMARY-ADDR-BY-NAME mymaster] 1] != $old_port
Expand All @@ -38,11 +51,11 @@ test "Manual failover works" {
set master_id [get_instance_id_by_port valkey [lindex $addr 1]]
}

test "New primary [join $addr {:}] role matches" {
test "New primary [join $addr {:}] role matches - $type" {
assert {[RI $master_id role] eq {master}}
}

test "All the other slaves now point to the new primary" {
test "All the other slaves now point to the new primary - $type" {
foreach_valkey_id id {
if {$id != $master_id && $id != 0} {
wait_for_condition 1000 50 {
Expand All @@ -54,14 +67,43 @@ test "All the other slaves now point to the new primary" {
}
}

test "The old primary eventually gets reconfigured as a slave" {
test "The old primary eventually gets reconfigured as a replica - $type" {
wait_for_condition 1000 50 {
[RI 0 master_port] == [lindex $addr 1]
} else {
fail "Old master not reconfigured as slave of new master"
}
}

test "Check data consistency - $type" {
if {$type == "legacy"} {
# In legacy type, there is a good chance that data will be lost eventually.
foreach_valkey_id id {
wait_for_condition 1000 50 {
[R $id get foo] != $val
} else {
fail "Data is consistency in legacy type"
}
}
} elseif {$type == "pause"} {
foreach_valkey_id id {
wait_for_condition 1000 50 {
[R $id get foo] == $val
} else {
fail "Data is not consistency in pause type"
}
}
}
}

} ;# end proc test_sentinel_failover

source "../tests/includes/init-tests.tcl"
test_sentinel_failover "legacy" $master_id

source "../tests/includes/init-tests.tcl"
test_sentinel_failover "pause" $master_id
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved

foreach flag {crash-after-election crash-after-promotion} {
# Before each SIMULATE-FAILURE test, re-source init-tests to get a clean environment
source "../tests/includes/init-tests.tcl"
Expand Down
Loading