Skip to content

Commit

Permalink
feat: add capability to vtorc to fix the replication misconfiguration
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <manan@planetscale.com>
  • Loading branch information
GuptaManan100 committed May 8, 2024
1 parent 12d13ae commit 7523d10
Show file tree
Hide file tree
Showing 39 changed files with 595 additions and 427 deletions.
2 changes: 1 addition & 1 deletion go/cmd/vtbackup/cli/vtbackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ func startReplication(ctx context.Context, mysqld mysqlctl.MysqlDaemon, topoServ
}

// Stop replication (in case we're restarting), set replication source, and start replication.
if err := mysqld.SetReplicationSource(ctx, ti.Tablet.MysqlHostname, ti.Tablet.MysqlPort, true /* stopReplicationBefore */, true /* startReplicationAfter */); err != nil {
if err := mysqld.SetReplicationSource(ctx, ti.Tablet.MysqlHostname, ti.Tablet.MysqlPort, 0, true, true); err != nil {
return vterrors.Wrap(err, "MysqlDaemon.SetReplicationSource failed")
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion go/cmd/vtcombo/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ type vtcomboMysqld struct {
}

// SetReplicationSource implements the MysqlDaemon interface
func (mysqld *vtcomboMysqld) SetReplicationSource(ctx context.Context, host string, port int32, stopReplicationBefore bool, startReplicationAfter bool) error {
func (mysqld *vtcomboMysqld) SetReplicationSource(ctx context.Context, host string, port int32, heartbeatInterval float64, stopReplicationBefore bool, startReplicationAfter bool) error {
return nil
}

Expand Down
6 changes: 3 additions & 3 deletions go/mysql/flavor.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ type flavor interface {

// setReplicationSourceCommand returns the command to use the provided host/port
// as the new replication source (without changing any GTID position).
setReplicationSourceCommand(params *ConnParams, host string, port int32, connectRetry int) string
setReplicationSourceCommand(params *ConnParams, host string, port int32, heartbeatInterval float64, connectRetry int) string

// status returns the result of the appropriate status command,
// with parsed replication position.
Expand Down Expand Up @@ -370,8 +370,8 @@ func (c *Conn) SetReplicationPositionCommands(pos replication.Position) []string
// as the new replication source (without changing any GTID position).
// It is guaranteed to be called with replication stopped.
// It should not start or stop replication.
func (c *Conn) SetReplicationSourceCommand(params *ConnParams, host string, port int32, connectRetry int) string {
return c.flavor.setReplicationSourceCommand(params, host, port, connectRetry)
func (c *Conn) SetReplicationSourceCommand(params *ConnParams, host string, port int32, heartbeatInterval float64, connectRetry int) string {
return c.flavor.setReplicationSourceCommand(params, host, port, heartbeatInterval, connectRetry)
}

// resultToMap is a helper function used by ShowReplicationStatus.
Expand Down
2 changes: 1 addition & 1 deletion go/mysql/flavor_filepos.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func (flv *filePosFlavor) setReplicationPositionCommands(pos replication.Positio
}

// setReplicationSourceCommand is part of the Flavor interface.
func (flv *filePosFlavor) setReplicationSourceCommand(params *ConnParams, host string, port int32, connectRetry int) string {
func (flv *filePosFlavor) setReplicationSourceCommand(params *ConnParams, host string, port int32, heartbeatInterval float64, connectRetry int) string {
return "unsupported"
}

Expand Down
5 changes: 4 additions & 1 deletion go/mysql/flavor_mariadb.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (mariadbFlavor) setReplicationPositionCommands(pos replication.Position) []
}
}

func (mariadbFlavor) setReplicationSourceCommand(params *ConnParams, host string, port int32, connectRetry int) string {
func (mariadbFlavor) setReplicationSourceCommand(params *ConnParams, host string, port int32, heartbeatInterval float64, connectRetry int) string {
args := []string{
fmt.Sprintf("MASTER_HOST = '%s'", host),
fmt.Sprintf("MASTER_PORT = %d", port),
Expand All @@ -212,6 +212,9 @@ func (mariadbFlavor) setReplicationSourceCommand(params *ConnParams, host string
if params.SslKey != "" {
args = append(args, fmt.Sprintf("MASTER_SSL_KEY = '%s'", params.SslKey))
}
if heartbeatInterval != 0 {
args = append(args, fmt.Sprintf("MASTER_HEARTBEAT_PERIOD = %v", heartbeatInterval))
}
args = append(args, "MASTER_USE_GTID = current_pos")
return "CHANGE MASTER TO\n " + strings.Join(args, ",\n ")
}
Expand Down
17 changes: 15 additions & 2 deletions go/mysql/flavor_mariadb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,22 @@ func TestMariadbSetReplicationSourceCommand(t *testing.T) {
MASTER_USE_GTID = current_pos`

conn := &Conn{flavor: mariadbFlavor101{}}
got := conn.SetReplicationSourceCommand(params, host, port, connectRetry)
got := conn.SetReplicationSourceCommand(params, host, port, 0, connectRetry)
assert.Equal(t, want, got, "mariadbFlavor.SetReplicationSourceCommand(%#v, %#v, %#v, %#v) = %#v, want %#v", params, host, port, connectRetry, got, want)

var heartbeatInterval float64 = 5.4
want = `CHANGE MASTER TO
MASTER_HOST = 'localhost',
MASTER_PORT = 123,
MASTER_USER = 'username',
MASTER_PASSWORD = 'password',
MASTER_CONNECT_RETRY = 1234,
MASTER_HEARTBEAT_PERIOD = 5.4,
MASTER_USE_GTID = current_pos`

got = conn.SetReplicationSourceCommand(params, host, port, heartbeatInterval, connectRetry)
assert.Equal(t, want, got, "mariadbFlavor.SetReplicationSourceCommand(%#v, %#v, %#v, %#v, %#v) = %#v, want %#v", params, host, port, heartbeatInterval, connectRetry, got, want)

}

func TestMariadbSetReplicationSourceCommandSSL(t *testing.T) {
Expand Down Expand Up @@ -71,7 +84,7 @@ func TestMariadbSetReplicationSourceCommandSSL(t *testing.T) {
MASTER_USE_GTID = current_pos`

conn := &Conn{flavor: mariadbFlavor101{}}
got := conn.SetReplicationSourceCommand(params, host, port, connectRetry)
got := conn.SetReplicationSourceCommand(params, host, port, 0, connectRetry)
assert.Equal(t, want, got, "mariadbFlavor.SetReplicationSourceCommand(%#v, %#v, %#v, %#v) = %#v, want %#v", params, host, port, connectRetry, got, want)

}
10 changes: 8 additions & 2 deletions go/mysql/flavor_mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ func (f mysqlFlavor8) supportsCapability(capability capabilities.FlavorCapabilit
return capabilities.MySQLVersionHasCapability(f.serverVersion, capability)
}

func (mysqlFlavor) setReplicationSourceCommand(params *ConnParams, host string, port int32, connectRetry int) string {
func (mysqlFlavor) setReplicationSourceCommand(params *ConnParams, host string, port int32, heartbeatInterval float64, connectRetry int) string {
args := []string{
fmt.Sprintf("MASTER_HOST = '%s'", host),
fmt.Sprintf("MASTER_PORT = %d", port),
Expand All @@ -557,11 +557,14 @@ func (mysqlFlavor) setReplicationSourceCommand(params *ConnParams, host string,
if params.SslKey != "" {
args = append(args, fmt.Sprintf("MASTER_SSL_KEY = '%s'", params.SslKey))
}
if heartbeatInterval != 0 {
args = append(args, fmt.Sprintf("MASTER_HEARTBEAT_PERIOD = %v", heartbeatInterval))
}
args = append(args, "MASTER_AUTO_POSITION = 1")
return "CHANGE MASTER TO\n " + strings.Join(args, ",\n ")
}

func (mysqlFlavor8) setReplicationSourceCommand(params *ConnParams, host string, port int32, connectRetry int) string {
func (mysqlFlavor8) setReplicationSourceCommand(params *ConnParams, host string, port int32, heartbeatInterval float64, connectRetry int) string {
args := []string{
fmt.Sprintf("SOURCE_HOST = '%s'", host),
fmt.Sprintf("SOURCE_PORT = %d", port),
Expand All @@ -584,6 +587,9 @@ func (mysqlFlavor8) setReplicationSourceCommand(params *ConnParams, host string,
if params.SslKey != "" {
args = append(args, fmt.Sprintf("SOURCE_SSL_KEY = '%s'", params.SslKey))
}
if heartbeatInterval != 0 {
args = append(args, fmt.Sprintf("SOURCE_HEARTBEAT_PERIOD = %v", heartbeatInterval))
}
args = append(args, "SOURCE_AUTO_POSITION = 1")
return "CHANGE REPLICATION SOURCE TO\n " + strings.Join(args, ",\n ")
}
Expand Down
34 changes: 30 additions & 4 deletions go/mysql/flavor_mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,22 @@ func TestMysql57SetReplicationSourceCommand(t *testing.T) {
MASTER_AUTO_POSITION = 1`

conn := &Conn{flavor: mysqlFlavor57{}}
got := conn.SetReplicationSourceCommand(params, host, port, connectRetry)
got := conn.SetReplicationSourceCommand(params, host, port, 0, connectRetry)
assert.Equal(t, want, got, "mysqlFlavor.SetReplicationSourceCommand(%#v, %#v, %#v, %#v) = %#v, want %#v", params, host, port, connectRetry, got, want)

var heartbeatInterval float64 = 5.4
want = `CHANGE MASTER TO
MASTER_HOST = 'localhost',
MASTER_PORT = 123,
MASTER_USER = 'username',
MASTER_PASSWORD = 'password',
MASTER_CONNECT_RETRY = 1234,
MASTER_HEARTBEAT_PERIOD = 5.4,
MASTER_AUTO_POSITION = 1`

got = conn.SetReplicationSourceCommand(params, host, port, heartbeatInterval, connectRetry)
assert.Equal(t, want, got, "mysqlFlavor.SetReplicationSourceCommand(%#v, %#v, %#v, %#v, %#v) = %#v, want %#v", params, host, port, heartbeatInterval, connectRetry, got, want)

}

func TestMysql57SetReplicationSourceCommandSSL(t *testing.T) {
Expand Down Expand Up @@ -71,7 +84,7 @@ func TestMysql57SetReplicationSourceCommandSSL(t *testing.T) {
MASTER_AUTO_POSITION = 1`

conn := &Conn{flavor: mysqlFlavor57{}}
got := conn.SetReplicationSourceCommand(params, host, port, connectRetry)
got := conn.SetReplicationSourceCommand(params, host, port, 0, connectRetry)
assert.Equal(t, want, got, "mysqlFlavor.SetReplicationSourceCommand(%#v, %#v, %#v, %#v) = %#v, want %#v", params, host, port, connectRetry, got, want)
}

Expand All @@ -92,8 +105,21 @@ func TestMysql8SetReplicationSourceCommand(t *testing.T) {
SOURCE_AUTO_POSITION = 1`

conn := &Conn{flavor: mysqlFlavor8{}}
got := conn.SetReplicationSourceCommand(params, host, port, connectRetry)
got := conn.SetReplicationSourceCommand(params, host, port, 0, connectRetry)
assert.Equal(t, want, got, "mysqlFlavor.SetReplicationSourceCommand(%#v, %#v, %#v, %#v) = %#v, want %#v", params, host, port, connectRetry, got, want)

var heartbeatInterval float64 = 5.4
want = `CHANGE REPLICATION SOURCE TO
SOURCE_HOST = 'localhost',
SOURCE_PORT = 123,
SOURCE_USER = 'username',
SOURCE_PASSWORD = 'password',
SOURCE_CONNECT_RETRY = 1234,
SOURCE_HEARTBEAT_PERIOD = 5.4,
SOURCE_AUTO_POSITION = 1`

got = conn.SetReplicationSourceCommand(params, host, port, heartbeatInterval, connectRetry)
assert.Equal(t, want, got, "mysqlFlavor.SetReplicationSourceCommand(%#v, %#v, %#v, %#v, %#v) = %#v, want %#v", params, host, port, heartbeatInterval, connectRetry, got, want)
}

func TestMysql8SetReplicationSourceCommandSSL(t *testing.T) {
Expand Down Expand Up @@ -123,6 +149,6 @@ func TestMysql8SetReplicationSourceCommandSSL(t *testing.T) {
SOURCE_AUTO_POSITION = 1`

conn := &Conn{flavor: mysqlFlavor8{}}
got := conn.SetReplicationSourceCommand(params, host, port, connectRetry)
got := conn.SetReplicationSourceCommand(params, host, port, 0, connectRetry)
assert.Equal(t, want, got, "mysqlFlavor.SetReplicationSourceCommand(%#v, %#v, %#v, %#v) = %#v, want %#v", params, host, port, connectRetry, got, want)
}
19 changes: 12 additions & 7 deletions go/test/endtoend/utils/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func TestReplicationStatus(t *testing.T) {
require.NoError(t, err)
host := "localhost"

q := conn.SetReplicationSourceCommand(&mysqlParams, host, port, int(port))
q := conn.SetReplicationSourceCommand(&mysqlParams, host, port, 0, int(port))
res := Exec(t, conn, q)
require.NotNil(t, res)

Expand Down Expand Up @@ -299,14 +299,19 @@ func TestSetAndResetReplication(t *testing.T) {
require.NoError(t, err)
host := "localhost"

err = mysqld.SetReplicationSource(context.Background(), host, port, true, true)
var heartbeatInterval float64 = 5.4
err = mysqld.SetReplicationSource(context.Background(), host, port, heartbeatInterval, true, true)
assert.NoError(t, err)

r, err := mysqld.ReplicationStatus(context.Background())
assert.NoError(t, err)
assert.Equal(t, port, r.SourcePort)
assert.Equal(t, host, r.SourceHost)

replConfig, err := mysqld.ReplicationConfiguration(context.Background())
require.NoError(t, err)
assert.EqualValues(t, heartbeatInterval, replConfig.HeartbeatInterval)

err = mysqld.ResetReplication(context.Background())
assert.NoError(t, err)

Expand All @@ -315,7 +320,7 @@ func TestSetAndResetReplication(t *testing.T) {
assert.Equal(t, "", r.SourceHost)
assert.Equal(t, int32(0), r.SourcePort)

err = mysqld.SetReplicationSource(context.Background(), host, port, true, true)
err = mysqld.SetReplicationSource(context.Background(), host, port, 0, true, true)
assert.NoError(t, err)

r, err = mysqld.ReplicationStatus(context.Background())
Expand Down Expand Up @@ -454,7 +459,7 @@ func TestWaitForReplicationStart(t *testing.T) {
require.NoError(t, err)
host := "localhost"

err = mysqld.SetReplicationSource(context.Background(), host, port, true, true)
err = mysqld.SetReplicationSource(context.Background(), host, port, 0, true, true)
assert.NoError(t, err)

err = mysqlctl.WaitForReplicationStart(context.Background(), mysqld, 1)
Expand All @@ -477,7 +482,7 @@ func TestStartReplication(t *testing.T) {
host := "localhost"

// Set startReplicationAfter to false as we want to test StartReplication here
err = mysqld.SetReplicationSource(context.Background(), host, port, true, false)
err = mysqld.SetReplicationSource(context.Background(), host, port, 0, true, false)
assert.NoError(t, err)

err = mysqld.StartReplication(context.Background(), map[string]string{})
Expand All @@ -496,7 +501,7 @@ func TestStopReplication(t *testing.T) {
require.NoError(t, err)
host := "localhost"

err = mysqld.SetReplicationSource(context.Background(), host, port, true, true)
err = mysqld.SetReplicationSource(context.Background(), host, port, 0, true, true)
assert.NoError(t, err)

r, err := mysqld.ReplicationStatus(context.Background())
Expand All @@ -522,7 +527,7 @@ func TestStopSQLThread(t *testing.T) {
require.NoError(t, err)
host := "localhost"

err = mysqld.SetReplicationSource(context.Background(), host, port, true, true)
err = mysqld.SetReplicationSource(context.Background(), host, port, 0, true, true)
assert.NoError(t, err)

r, err := mysqld.ReplicationStatus(context.Background())
Expand Down
12 changes: 12 additions & 0 deletions go/test/endtoend/vtorc/general/vtorc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,18 @@ func TestVTOrcRepairs(t *testing.T) {
utils.VerifyWritesSucceed(t, clusterInfo, curPrimary, []*cluster.Vttablet{replica, otherReplica}, 15*time.Second)
})

t.Run("Replication Misconfiguration", func(t *testing.T) {
_, err := utils.RunSQL(t, `SET @@global.replica_net_timeout=33`, replica, "")
require.NoError(t, err)

// wait until heart beat interval has been fixed by vtorc.
utils.CheckHeartbeatInterval(t, replica, 16.5, 15*time.Second)
utils.WaitForSuccessfulRecoveryCount(t, vtOrcProcess, logic.FixReplicaRecoveryName, 6)

// check that writes succeed
utils.VerifyWritesSucceed(t, clusterInfo, curPrimary, []*cluster.Vttablet{replica, otherReplica}, 15*time.Second)
})

t.Run("CircularReplication", func(t *testing.T) {
// change the replication source on the primary
changeReplicationSourceCommands := []string{
Expand Down
34 changes: 34 additions & 0 deletions go/test/endtoend/vtorc/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,40 @@ func CheckSourcePort(t *testing.T, replica *cluster.Vttablet, source *cluster.Vt
}
}

// CheckHeartbeatInterval is used to check that the replica has the given heartbeat interval set in its MySQL instance
func CheckHeartbeatInterval(t *testing.T, replica *cluster.Vttablet, heartbeatInterval float64, timeToWait time.Duration) {
timeout := time.After(timeToWait)
for {
select {
case <-timeout:
t.Fatal("timed out waiting for correct heartbeat interval to be setup")
return
default:
res, err := RunSQL(t, "select * from performance_schema.replication_connection_configuration", replica, "")
require.NoError(t, err)

if len(res.Rows) != 1 {
log.Warningf("no replication configuration yet, will retry")
break
}

for idx, field := range res.Fields {
if strings.EqualFold(field.Name, "HEARTBEAT_INTERVAL") {
readVal, err := res.Rows[0][idx].ToFloat64()
require.NoError(t, err)
if readVal == heartbeatInterval {
return
} else {
log.Warningf("heartbeat interval set to - %v", readVal)
}
}
}
log.Warningf("heartbeat interval not set correctly yet, will retry")
}
time.Sleep(300 * time.Millisecond)
}
}

// MakeAPICall is used make an API call given the url. It returns the status and the body of the response received
func MakeAPICall(t *testing.T, vtorc *cluster.VTOrcProcess, url string) (status int, response string, err error) {
t.Helper()
Expand Down
2 changes: 1 addition & 1 deletion go/vt/mysqlctl/fakemysqldaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ func (fmd *FakeMysqlDaemon) SetReplicationPosition(ctx context.Context, pos repl
}

// SetReplicationSource is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) SetReplicationSource(ctx context.Context, host string, port int32, stopReplicationBefore bool, startReplicationAfter bool) error {
func (fmd *FakeMysqlDaemon) SetReplicationSource(ctx context.Context, host string, port int32, heartbeatInterval float64, stopReplicationBefore bool, startReplicationAfter bool) error {
input := fmt.Sprintf("%v:%v", host, port)
found := false
for _, sourceInput := range fmd.SetReplicationSourceInputs {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/mysqlctl/mysql_daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ type MysqlDaemon interface {
SetReadOnly(ctx context.Context, on bool) error
SetSuperReadOnly(ctx context.Context, on bool) (ResetSuperReadOnlyFunc, error)
SetReplicationPosition(ctx context.Context, pos replication.Position) error
SetReplicationSource(ctx context.Context, host string, port int32, stopReplicationBefore bool, startReplicationAfter bool) error
SetReplicationSource(ctx context.Context, host string, port int32, heartbeatInterval float64, stopReplicationBefore bool, startReplicationAfter bool) error
WaitForReparentJournal(ctx context.Context, timeCreatedNS int64) error

WaitSourcePos(context.Context, replication.Position) error
Expand Down
4 changes: 2 additions & 2 deletions go/vt/mysqlctl/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ func (mysqld *Mysqld) SetReplicationPosition(ctx context.Context, pos replicatio

// SetReplicationSource makes the provided host / port the primary. It optionally
// stops replication before, and starts it after.
func (mysqld *Mysqld) SetReplicationSource(ctx context.Context, host string, port int32, stopReplicationBefore bool, startReplicationAfter bool) error {
func (mysqld *Mysqld) SetReplicationSource(ctx context.Context, host string, port int32, heartbeatInterval float64, stopReplicationBefore bool, startReplicationAfter bool) error {
params, err := mysqld.dbcfgs.ReplConnector().MysqlParams()
if err != nil {
return err
Expand All @@ -460,7 +460,7 @@ func (mysqld *Mysqld) SetReplicationSource(ctx context.Context, host string, por
if stopReplicationBefore {
cmds = append(cmds, conn.Conn.StopReplicationCommand())
}
smc := conn.Conn.SetReplicationSourceCommand(params, host, port, int(replicationConnectRetry.Seconds()))
smc := conn.Conn.SetReplicationSourceCommand(params, host, port, heartbeatInterval, int(replicationConnectRetry.Seconds()))
cmds = append(cmds, smc)
if startReplicationAfter {
cmds = append(cmds, conn.Conn.StartReplicationCommand())
Expand Down
2 changes: 1 addition & 1 deletion go/vt/mysqlctl/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ func TestSetReplicationSource(t *testing.T) {
ctx := context.Background()

// We expect query containing passed host and port to be executed
err := testMysqld.SetReplicationSource(ctx, "test_host", 2, true, true)
err := testMysqld.SetReplicationSource(ctx, "test_host", 2, 0, true, true)
assert.ErrorContains(t, err, `SOURCE_HOST = 'test_host'`)
assert.ErrorContains(t, err, `SOURCE_PORT = 2`)
assert.ErrorContains(t, err, `CHANGE REPLICATION SOURCE TO`)
Expand Down
Loading

0 comments on commit 7523d10

Please sign in to comment.