Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use replica queries when available #15808

Merged
merged 3 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/compose/fix_replication.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ function get_replication_status() {
function reset_replication() {
# Necessary before sql file can be imported
echo "Importing MysqlDump: $KEYSPACE.sql"
mysql -u$DB_USER -p$DB_PASS -h 127.0.0.1 -e "RESET MASTER;STOP SLAVE;CHANGE MASTER TO MASTER_AUTO_POSITION = 0;source $KEYSPACE.sql;START SLAVE;"
mysql -u$DB_USER -p$DB_PASS -h 127.0.0.1 -e "RESET MASTER;STOP REPLICA;CHANGE REPLICATION SOURCE TO SOURCE_AUTO_POSITION = 0;source $KEYSPACE.sql;START REPLICA;"
# Restore Master Auto Position
echo "Restoring Master Auto Setting"
mysql -u$DB_USER -p$DB_PASS -h 127.0.0.1 -e "STOP SLAVE;CHANGE MASTER TO MASTER_AUTO_POSITION = 1;START SLAVE;"
mysql -u$DB_USER -p$DB_PASS -h 127.0.0.1 -e "STOP REPLICA;CHANGE REPLICATION SOURCE TO SOURCE_AUTO_POSITION = 1;START REPLICA;"
}

# Retrieve replication status
Expand Down
15 changes: 7 additions & 8 deletions go/cmd/vtbackup/cli/vtbackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
return fmt.Errorf("can't reset replication: %v", err)
}
// We need to switch off super_read_only before we create the database.
resetFunc, err := mysqld.SetSuperReadOnly(false)
resetFunc, err := mysqld.SetSuperReadOnly(ctx, false)
if err != nil {
return fmt.Errorf("failed to disable super_read_only during backup: %v", err)
}
Expand Down Expand Up @@ -528,7 +528,7 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
}

lastStatus = status
status, statusErr = mysqld.ReplicationStatus()
status, statusErr = mysqld.ReplicationStatus(ctx)
if statusErr != nil {
log.Warningf("Error getting replication status: %v", statusErr)
continue
Expand Down Expand Up @@ -560,12 +560,12 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
phase.Set(phaseNameCatchupReplication, int64(0))

// Stop replication and see where we are.
if err := mysqld.StopReplication(nil); err != nil {
if err := mysqld.StopReplication(ctx, nil); err != nil {
return fmt.Errorf("can't stop replication: %v", err)
}

// Did we make any progress?
status, statusErr = mysqld.ReplicationStatus()
status, statusErr = mysqld.ReplicationStatus(ctx)
if statusErr != nil {
return fmt.Errorf("can't get replication status: %v", err)
}
Expand Down Expand Up @@ -621,11 +621,10 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
}

func resetReplication(ctx context.Context, pos replication.Position, mysqld mysqlctl.MysqlDaemon) error {
cmds := []string{
"STOP SLAVE",
"RESET SLAVE ALL", // "ALL" makes it forget replication source host:port.
if err := mysqld.StopReplication(ctx, nil); err != nil {
return vterrors.Wrap(err, "failed to stop replication")
}
if err := mysqld.ExecuteSuperQueryList(ctx, cmds); err != nil {
if err := mysqld.ResetReplicationParameters(ctx); err != nil {
return vterrors.Wrap(err, "failed to reset replication")
}

Expand Down
8 changes: 4 additions & 4 deletions go/cmd/vtcombo/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func run(cmd *cobra.Command, args []string) (err error) {
mysqld.Shutdown(ctx, cnf, true, mysqlctl.DefaultShutdownTimeout)
})
// We want to ensure we can write to this database
mysqld.SetReadOnly(false)
mysqld.SetReadOnly(cmd.Context(), false)

} else {
dbconfigs.GlobalDBConfigs.InitWithSocket("", env.CollationEnv())
Expand Down Expand Up @@ -368,12 +368,12 @@ func (mysqld *vtcomboMysqld) SetReplicationSource(ctx context.Context, host stri
}

// StartReplication implements the MysqlDaemon interface
func (mysqld *vtcomboMysqld) StartReplication(hookExtraEnv map[string]string) error {
func (mysqld *vtcomboMysqld) StartReplication(ctx context.Context, hookExtraEnv map[string]string) error {
return nil
}

// RestartReplication implements the MysqlDaemon interface
func (mysqld *vtcomboMysqld) RestartReplication(hookExtraEnv map[string]string) error {
func (mysqld *vtcomboMysqld) RestartReplication(ctx context.Context, hookExtraEnv map[string]string) error {
return nil
}

Expand All @@ -383,7 +383,7 @@ func (mysqld *vtcomboMysqld) StartReplicationUntilAfter(ctx context.Context, pos
}

// StopReplication implements the MysqlDaemon interface
func (mysqld *vtcomboMysqld) StopReplication(hookExtraEnv map[string]string) error {
func (mysqld *vtcomboMysqld) StopReplication(ctx context.Context, hookExtraEnv map[string]string) error {
return nil
}

Expand Down
3 changes: 3 additions & 0 deletions go/mysql/capabilities/capability.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ const (
CheckConstraintsCapability // supported in MySQL 8.0.16 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-16.html
PerformanceSchemaDataLocksTableCapability // supported in MySQL 8.0.1 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-1.html
InstantDDLXtrabackupCapability // Supported in 8.0.32 and above, solving a MySQL-vs-Xtrabackup bug starting 8.0.29
ReplicaQueries // Supported in 8.0.22 and above, using SHOW REPLICA STATUS and all variations.
dbussink marked this conversation as resolved.
Show resolved Hide resolved
)

type CapableOf func(capability FlavorCapability) (bool, error)
Expand Down Expand Up @@ -112,6 +113,8 @@ func MySQLVersionHasCapability(serverVersion string, capability FlavorCapability
return atLeast(8, 0, 30)
case InstantDDLXtrabackupCapability:
return atLeast(8, 0, 32)
case ReplicaQueries:
return atLeast(8, 0, 22)
default:
return false, nil
}
Expand Down
70 changes: 32 additions & 38 deletions go/mysql/flavor.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,15 @@ const (
mariaDBReplicationHackPrefix = "5.5.5-"
// mariaDBVersionString is present in
mariaDBVersionString = "MariaDB"
// mysql57VersionPrefix is the prefix for 5.7 mysql version, such as 5.7.31-log
mysql57VersionPrefix = "5.7."
// mysql80VersionPrefix is the prefix for 8.0 mysql version, such as 8.0.19
mysql80VersionPrefix = "8.0."
// mysql8VersionPrefix is the prefix for 8.0 mysql version, such as 8.0.19
dbussink marked this conversation as resolved.
Show resolved Hide resolved
mysql8VersionPrefix = "8."
)

// flavor is the abstract interface for a flavor.
// Flavors are auto-detected upon connection using the server version.
// We have two major implementations (the main difference is the GTID
// handling):
// 1. Oracle MySQL 5.6, 5.7, 8.0, ...
// 1. Oracle MySQL 5.7, 8.0, ...
// 2. MariaDB 10.X
type flavor interface {
// primaryGTIDSet returns the current GTIDSet of a server.
Expand Down Expand Up @@ -88,6 +86,9 @@ type flavor interface {
// stopReplicationCommand returns the command to stop the replication.
stopReplicationCommand() string

// resetReplicationCommand returns the command to reset the replication.
resetReplicationCommand() string

// stopIOThreadCommand returns the command to stop the replica's IO thread only.
stopIOThreadCommand() string

Expand Down Expand Up @@ -116,9 +117,9 @@ type flavor interface {
// replication position at which the replica will resume.
setReplicationPositionCommands(pos replication.Position) []string

// changeReplicationSourceArg returns the specific parameter to add to
// a "change primary" command.
changeReplicationSourceArg() string
// setReplicationSourceCommand returns the command to use the provided host/port
// as the new replication source (without changing any GTID position).
setReplicationSourceCommand(params *ConnParams, host string, port int32, connectRetry int) string

// status returns the result of the appropriate status command,
// with parsed replication position.
Expand All @@ -132,6 +133,11 @@ type flavor interface {
// until the context expires. It returns an error if we did not
// succeed.
waitUntilPosition(ctx context.Context, c *Conn, pos replication.Position) error
// catchupToGTIDCommands returns the command to catch up to a given GTID.
catchupToGTIDCommands(params *ConnParams, pos replication.Position) []string

// binlogReplicaField returns the field to use to check replica updates.
binlogReplicaField() string
dbussink marked this conversation as resolved.
Show resolved Hide resolved

baseShowTables() string
baseShowTablesWithSizes() string
Expand Down Expand Up @@ -171,13 +177,16 @@ func GetFlavor(serverVersion string, flavorFunc func() flavor) (f flavor, capabl
} else {
f = mariadbFlavor102{mariadbFlavor{serverVersion: fmt.Sprintf("%f", mariadbVersion)}}
}
case strings.HasPrefix(serverVersion, mysql57VersionPrefix):
f = mysqlFlavor57{mysqlFlavor{serverVersion: serverVersion}}
case strings.HasPrefix(serverVersion, mysql80VersionPrefix):
f = mysqlFlavor80{mysqlFlavor{serverVersion: serverVersion}}
case strings.HasPrefix(serverVersion, mysql8VersionPrefix):
recent, _ := capabilities.ServerVersionAtLeast(serverVersion, 8, 0, 22)
if recent {
f = mysqlFlavor8{mysqlFlavor{serverVersion: serverVersion}}
} else {
f = mysqlFlavor8Legacy{mysqlFlavor{serverVersion: serverVersion}}
}
default:
// If unknown, return the most basic flavor: MySQL 56.
f = mysqlFlavor56{mysqlFlavor{serverVersion: serverVersion}}
// If unknown, return the most basic flavor: MySQL 57.
f = mysqlFlavor57{mysqlFlavor{serverVersion: serverVersion}}
}
return f, f.supportsCapability, canonicalVersion
}
Expand Down Expand Up @@ -299,6 +308,10 @@ func (c *Conn) StopReplicationCommand() string {
return c.flavor.stopReplicationCommand()
}

func (c *Conn) ResetReplicationCommand() string {
return c.flavor.resetReplicationCommand()
}

// StopIOThreadCommand returns the command to stop the replica's io thread.
func (c *Conn) StopIOThreadCommand() string {
return c.flavor.stopIOThreadCommand()
Expand Down Expand Up @@ -351,30 +364,7 @@ func (c *Conn) SetReplicationPositionCommands(pos replication.Position) []string
// It is guaranteed to be called with replication stopped.
// It should not start or stop replication.
func (c *Conn) SetReplicationSourceCommand(params *ConnParams, host string, port int32, connectRetry int) string {
args := []string{
fmt.Sprintf("MASTER_HOST = '%s'", host),
fmt.Sprintf("MASTER_PORT = %d", port),
fmt.Sprintf("MASTER_USER = '%s'", params.Uname),
fmt.Sprintf("MASTER_PASSWORD = '%s'", params.Pass),
fmt.Sprintf("MASTER_CONNECT_RETRY = %d", connectRetry),
}
if params.SslEnabled() {
args = append(args, "MASTER_SSL = 1")
}
if params.SslCa != "" {
args = append(args, fmt.Sprintf("MASTER_SSL_CA = '%s'", params.SslCa))
}
if params.SslCaPath != "" {
args = append(args, fmt.Sprintf("MASTER_SSL_CAPATH = '%s'", params.SslCaPath))
}
if params.SslCert != "" {
args = append(args, fmt.Sprintf("MASTER_SSL_CERT = '%s'", params.SslCert))
}
if params.SslKey != "" {
args = append(args, fmt.Sprintf("MASTER_SSL_KEY = '%s'", params.SslKey))
}
args = append(args, c.flavor.changeReplicationSourceArg())
return "CHANGE MASTER TO\n " + strings.Join(args, ",\n ")
return c.flavor.setReplicationSourceCommand(params, host, port, connectRetry)
}

// resultToMap is a helper function used by ShowReplicationStatus.
Expand Down Expand Up @@ -415,6 +405,10 @@ func (c *Conn) WaitUntilPosition(ctx context.Context, pos replication.Position)
return c.flavor.waitUntilPosition(ctx, c, pos)
}

func (c *Conn) CatchupToGTIDCommands(params *ConnParams, pos replication.Position) []string {
return c.flavor.catchupToGTIDCommands(params, pos)
}

// WaitUntilFilePosition waits until the given position is reached or until
// the context expires for the file position flavor. It returns an error if
// we did not succeed.
Expand Down
16 changes: 14 additions & 2 deletions go/mysql/flavor_filepos.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ func (flv *filePosFlavor) startReplicationCommand() string {
return "unsupported"
}

func (flv *filePosFlavor) resetReplicationCommand() string {
return "unsupported"
}

func (flv *filePosFlavor) restartReplicationCommands() []string {
return []string{"unsupported"}
}
Expand Down Expand Up @@ -223,8 +227,8 @@ func (flv *filePosFlavor) setReplicationPositionCommands(pos replication.Positio
}
}

// setReplicationPositionCommands is part of the Flavor interface.
func (flv *filePosFlavor) changeReplicationSourceArg() string {
// setReplicationSourceCommand is part of the Flavor interface.
func (flv *filePosFlavor) setReplicationSourceCommand(params *ConnParams, host string, port int32, connectRetry int) string {
return "unsupported"
}

Expand Down Expand Up @@ -342,3 +346,11 @@ func (*filePosFlavor) supportsCapability(capability capabilities.FlavorCapabilit
return false, nil
}
}

func (*filePosFlavor) catchupToGTIDCommands(_ *ConnParams, _ replication.Position) []string {
return []string{"unsupported"}
}

func (*filePosFlavor) binlogReplicaField() string {
return "@@global.log_slave_updates"
}
41 changes: 38 additions & 3 deletions go/mysql/flavor_mariadb.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"fmt"
"io"
"strings"
"time"

"vitess.io/vitess/go/mysql/capabilities"
Expand Down Expand Up @@ -97,6 +98,10 @@ func (mariadbFlavor) stopReplicationCommand() string {
return "STOP SLAVE"
}

func (mariadbFlavor) resetReplicationCommand() string {
return "RESET SLAVE ALL"
}

func (mariadbFlavor) stopIOThreadCommand() string {
return "STOP SLAVE IO_THREAD"
}
Expand Down Expand Up @@ -182,9 +187,31 @@ func (mariadbFlavor) setReplicationPositionCommands(pos replication.Position) []
}
}

// setReplicationPositionCommands is part of the Flavor interface.
func (mariadbFlavor) changeReplicationSourceArg() string {
return "MASTER_USE_GTID = current_pos"
func (mariadbFlavor) setReplicationSourceCommand(params *ConnParams, host string, port int32, connectRetry int) string {
args := []string{
fmt.Sprintf("MASTER_HOST = '%s'", host),
fmt.Sprintf("MASTER_PORT = %d", port),
fmt.Sprintf("MASTER_USER = '%s'", params.Uname),
fmt.Sprintf("MASTER_PASSWORD = '%s'", params.Pass),
fmt.Sprintf("MASTER_CONNECT_RETRY = %d", connectRetry),
}
if params.SslEnabled() {
args = append(args, "MASTER_SSL = 1")
}
if params.SslCa != "" {
args = append(args, fmt.Sprintf("MASTER_SSL_CA = '%s'", params.SslCa))
}
if params.SslCaPath != "" {
args = append(args, fmt.Sprintf("MASTER_SSL_CAPATH = '%s'", params.SslCaPath))
}
if params.SslCert != "" {
args = append(args, fmt.Sprintf("MASTER_SSL_CERT = '%s'", params.SslCert))
}
if params.SslKey != "" {
args = append(args, fmt.Sprintf("MASTER_SSL_KEY = '%s'", params.SslKey))
}
args = append(args, "MASTER_USE_GTID = current_pos")
return "CHANGE MASTER TO\n " + strings.Join(args, ",\n ")
}

// status is part of the Flavor interface.
Expand Down Expand Up @@ -296,3 +323,11 @@ func (mariadbFlavor) supportsCapability(capability capabilities.FlavorCapability
return false, nil
}
}

func (mariadbFlavor) catchupToGTIDCommands(_ *ConnParams, _ replication.Position) []string {
return []string{"unsupported"}
}

func (mariadbFlavor) binlogReplicaField() string {
return "@@global.log_slave_updates"
}
Loading
Loading