Skip to content

Commit

Permalink
mysql: Handle more deprecated SQL commands (#15907)
Browse files Browse the repository at this point in the history
Signed-off-by: Dirkjan Bussink <d.bussink@gmail.com>
  • Loading branch information
dbussink authored May 14, 2024
1 parent 473c49a commit 42e97a0
Show file tree
Hide file tree
Showing 28 changed files with 513 additions and 389 deletions.
18 changes: 9 additions & 9 deletions examples/compose/fix_replication.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ cd "$(dirname "${BASH_SOURCE[0]}")"

function get_replication_status() {
# Get replication status
STATUS_LINE=$(mysql -u$DB_USER -p$DB_PASS -h 127.0.0.1 -e "SHOW SLAVE STATUS\G")
STATUS_LINE=$(mysql -u$DB_USER -p$DB_PASS -h 127.0.0.1 -e "SHOW REPLICA STATUS\G")
LAST_ERRNO=$(grep "Last_IO_Errno:" <<< "$STATUS_LINE" | awk '{ print $2 }')
SLAVE_SQL_RUNNING=$(grep "Slave_SQL_Running:" <<< "$STATUS_LINE" | awk '{ print $2 }')
SLAVE_IO_RUNNING=$(grep "Slave_IO_Running:" <<< "$STATUS_LINE" | awk '{ print $2 }')
MASTER_HOST=$(grep "Master_Host:" <<< "$STATUS_LINE" | awk '{ print $2 }')
MASTER_PORT=$(grep "Master_Port:" <<< "$STATUS_LINE" | awk '{ print $2 }')
REPLICA_SQL_RUNNING=$(grep "Replica_SQL_Running:" <<< "$STATUS_LINE" | awk '{ print $2 }')
REPLICA_IO_RUNNING=$(grep "Replica_IO_Running:" <<< "$STATUS_LINE" | awk '{ print $2 }')
SOURCE_HOST=$(grep "Source_Host:" <<< "$STATUS_LINE" | awk '{ print $2 }')
SOURCE_PORT=$(grep "Source_Port:" <<< "$STATUS_LINE" | awk '{ print $2 }')

echo "Slave_SQL_Running: $SLAVE_SQL_RUNNING"
echo "Slave_IO_Running: $SLAVE_IO_RUNNING"
echo "Replica_SQL_Running: $REPLICA_SQL_RUNNING"
echo "Replica_IO_Running: $REPLICA_IO_RUNNING"
echo "Last_IO_Errno: $LAST_ERRNO"
}

Expand All @@ -54,7 +54,7 @@ get_replication_status
[ ${1:-''} != 'status' ] || exit 0;

# Check if IO_Thread is running
if [[ $SLAVE_IO_RUNNING = "No" && $LAST_ERRNO = 1236 ]]; then
if [[ $REPLICA_IO_RUNNING = "No" && $LAST_ERRNO = 1236 ]]; then

echo "Primary has purged bin logs that replica requires. Sync will require restore from mysqldump"
if [[ -f $KEYSPACE.sql ]] ; then
Expand All @@ -64,7 +64,7 @@ if [[ $SLAVE_IO_RUNNING = "No" && $LAST_ERRNO = 1236 ]]; then
else
echo "Starting mysqldump. This may take a while.."
# Modify flags to user's requirements
if mysqldump -h $MASTER_HOST -P $MASTER_PORT -u$DB_USER -p$DB_PASS --databases $KEYSPACE \
if mysqldump -h $SOURCE_HOST -P $SOURCE_PORT -u$DB_USER -p$DB_PASS --databases $KEYSPACE \
--triggers --routines --events --hex-blob --master-data=1 --quick --order-by-primary \
--no-autocommit --skip-comments --skip-add-drop-table --skip-add-locks \
--skip-disable-keys --single-transaction --set-gtid-purged=on --verbose > $KEYSPACE.sql; then
Expand Down
2 changes: 0 additions & 2 deletions go/mysql/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,4 @@ func IsNum(typ uint8) bool {

const (
readReplicationConnectionConfiguration = "SELECT * FROM performance_schema.replication_connection_configuration"
readReplicaNetTimeout = "select @@global.replica_net_timeout"
readSlaveNetTimeout = "select @@global.slave_net_timeout"
)
30 changes: 9 additions & 21 deletions go/mysql/endtoend/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/mysql/sqlerror"

"vitess.io/vitess/go/mysql"
Expand All @@ -46,15 +47,6 @@ func connectForReplication(t *testing.T, rbr bool) (*mysql.Conn, mysql.BinlogFor
t.Fatal(err)
}

// We need to know if this is MariaDB, to set the right flag.
if conn.IsMariaDB() {
// This flag is required to get GTIDs from MariaDB.
t.Log("MariaDB: sensing SET @mariadb_slave_capability=4")
if _, err := conn.ExecuteFetch("SET @mariadb_slave_capability=4", 0, false); err != nil {
t.Fatalf("failed to set @mariadb_slave_capability=4: %v", err)
}
}

// Switch server to RBR if needed.
if rbr {
if _, err := conn.ExecuteFetch("SET GLOBAL binlog_format='ROW'", 0, false); err != nil {
Expand All @@ -63,25 +55,21 @@ func connectForReplication(t *testing.T, rbr bool) (*mysql.Conn, mysql.BinlogFor
}

// First we get the current binlog position.
result, err := conn.ExecuteFetch("SHOW MASTER STATUS", 1, true)
require.NoError(t, err, "SHOW MASTER STATUS failed: %v", err)
status, err := conn.ShowPrimaryStatus()
require.NoError(t, err, "retrieving primary status failed: %v", err)

if len(result.Fields) < 2 || result.Fields[0].Name != "File" || result.Fields[1].Name != "Position" ||
len(result.Rows) != 1 {
t.Fatalf("SHOW MASTER STATUS returned unexpected result: %v", result)
}
file := result.Rows[0][0].ToString()
position, err := result.Rows[0][1].ToCastUint64()
require.NoError(t, err, "SHOW MASTER STATUS returned invalid position: %v", result.Rows[0][1])
filePos := status.FilePosition.GTIDSet.(replication.FilePosGTID)
file := filePos.File
position := filePos.Pos

// Tell the server that we understand the format of events
// that will be used if binlog_checksum is enabled on the server.
if _, err := conn.ExecuteFetch("SET @master_binlog_checksum=@@global.binlog_checksum", 0, false); err != nil {
t.Fatalf("failed to set @master_binlog_checksum=@@global.binlog_checksum: %v", err)
if _, err := conn.ExecuteFetch("SET @source_binlog_checksum = @@global.binlog_checksum, @master_binlog_checksum=@@global.binlog_checksum", 0, false); err != nil {
t.Fatalf("failed to set @source_binlog_checksum=@@global.binlog_checksum: %v", err)
}

// Write ComBinlogDump packet with to start streaming events from here.
if err := conn.WriteComBinlogDump(1, file, uint32(position), 0); err != nil {
if err := conn.WriteComBinlogDump(1, file, position, 0); err != nil {
t.Fatalf("WriteComBinlogDump failed: %v", err)
}

Expand Down
16 changes: 10 additions & 6 deletions go/mysql/flavor.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,14 @@ type flavor interface {
// as the new replication source (without changing any GTID position).
setReplicationSourceCommand(params *ConnParams, host string, port int32, heartbeatInterval float64, connectRetry int) string

// resetBinaryLogsCommand returns the command to reset the binary logs.
resetBinaryLogsCommand() string

// status returns the result of the appropriate status command,
// with parsed replication position.
status(c *Conn) (replication.ReplicationStatus, error)

// primaryStatus returns the result of 'SHOW MASTER STATUS',
// primaryStatus returns the result of 'SHOW BINARY LOG STATUS',
// with parsed executed position.
primaryStatus(c *Conn) (replication.PrimaryStatus, error)

Expand Down Expand Up @@ -185,15 +188,16 @@ func GetFlavor(serverVersion string, flavorFunc func() flavor) (f flavor, capabl
f = mariadbFlavor102{mariadbFlavor{serverVersion: fmt.Sprintf("%f", mariadbVersion)}}
}
case strings.HasPrefix(serverVersion, mysql8VersionPrefix):
recent, _ := capabilities.MySQLVersionHasCapability(serverVersion, capabilities.ReplicaTerminologyCapability)
if recent {
if latest, _ := capabilities.ServerVersionAtLeast(serverVersion, 8, 2, 0); latest {
f = mysqlFlavor82{mysqlFlavor{serverVersion: serverVersion}}
} else if recent, _ := capabilities.MySQLVersionHasCapability(serverVersion, capabilities.ReplicaTerminologyCapability); recent {
f = mysqlFlavor8{mysqlFlavor{serverVersion: serverVersion}}
} else {
f = mysqlFlavor8Legacy{mysqlFlavor{serverVersion: serverVersion}}
f = mysqlFlavor8Legacy{mysqlFlavorLegacy{mysqlFlavor{serverVersion: serverVersion}}}
}
default:
// If unknown, return the most basic flavor: MySQL 57.
f = mysqlFlavor57{mysqlFlavor{serverVersion: serverVersion}}
f = mysqlFlavor57{mysqlFlavorLegacy{mysqlFlavor{serverVersion: serverVersion}}}
}
return f, f.supportsCapability, canonicalVersion
}
Expand Down Expand Up @@ -400,7 +404,7 @@ func (c *Conn) ShowReplicationStatus() (replication.ReplicationStatus, error) {
return c.flavor.status(c)
}

// ShowPrimaryStatus executes the right SHOW MASTER STATUS command,
// ShowPrimaryStatus executes the right SHOW BINARY LOG STATUS command,
// and returns a parsed executed Position, as well as file based Position.
func (c *Conn) ShowPrimaryStatus() (replication.PrimaryStatus, error) {
return c.flavor.primaryStatus(c)
Expand Down
5 changes: 5 additions & 0 deletions go/mysql/flavor_filepos.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,11 @@ func (flv *filePosFlavor) setReplicationSourceCommand(params *ConnParams, host s
return "unsupported"
}

// resetBinaryLogsCommand is part of the Flavor interface.
func (flv *filePosFlavor) resetBinaryLogsCommand() string {
return "unsupported"
}

// status is part of the Flavor interface.
func (flv *filePosFlavor) status(c *Conn) (replication.ReplicationStatus, error) {
qr, err := c.ExecuteFetch("SHOW SLAVE STATUS", 100, true /* wantfields */)
Expand Down
6 changes: 5 additions & 1 deletion go/mysql/flavor_mariadb.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,10 @@ func (mariadbFlavor) setReplicationSourceCommand(params *ConnParams, host string
return "CHANGE MASTER TO\n " + strings.Join(args, ",\n ")
}

func (mariadbFlavor) resetBinaryLogsCommand() string {
return "RESET MASTER"
}

// status is part of the Flavor interface.
func (mariadbFlavor) status(c *Conn) (replication.ReplicationStatus, error) {
qr, err := c.ExecuteFetch("SHOW ALL SLAVES STATUS", 100, true /* wantfields */)
Expand Down Expand Up @@ -288,7 +292,7 @@ func (mariadbFlavor) replicationConfiguration(c *Conn) (*replicationdata.Configu

// replicationNetTimeout is part of the Flavor interface.
func (mariadbFlavor) replicationNetTimeout(c *Conn) (int32, error) {
qr, err := c.ExecuteFetch(readSlaveNetTimeout, 1, false)
qr, err := c.ExecuteFetch("select @@global.slave_net_timeout", 1, false)
if err != nil {
return 0, err
}
Expand Down
Loading

0 comments on commit 42e97a0

Please sign in to comment.