Skip to content

Commit

Permalink
mysql: Handle more deprecated SQL commands
Browse files Browse the repository at this point in the history
This deals with more deprecated SQL commands that now uses different
terminology on newer MySQL versions.

Also refactors things to reduce some duplication and to simplify some
logic. We now treat the latest syntax as default and keep older syntax
for older versions as needed explicitly.

Signed-off-by: Dirkjan Bussink <d.bussink@gmail.com>
  • Loading branch information
dbussink committed May 9, 2024
1 parent cbf89bd commit 43b33fb
Show file tree
Hide file tree
Showing 28 changed files with 493 additions and 388 deletions.
18 changes: 9 additions & 9 deletions examples/compose/fix_replication.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ cd "$(dirname "${BASH_SOURCE[0]}")"

function get_replication_status() {
# Get replication status
STATUS_LINE=$(mysql -u$DB_USER -p$DB_PASS -h 127.0.0.1 -e "SHOW SLAVE STATUS\G")
STATUS_LINE=$(mysql -u$DB_USER -p$DB_PASS -h 127.0.0.1 -e "SHOW REPLICA STATUS\G")
LAST_ERRNO=$(grep "Last_IO_Errno:" <<< "$STATUS_LINE" | awk '{ print $2 }')
SLAVE_SQL_RUNNING=$(grep "Slave_SQL_Running:" <<< "$STATUS_LINE" | awk '{ print $2 }')
SLAVE_IO_RUNNING=$(grep "Slave_IO_Running:" <<< "$STATUS_LINE" | awk '{ print $2 }')
MASTER_HOST=$(grep "Master_Host:" <<< "$STATUS_LINE" | awk '{ print $2 }')
MASTER_PORT=$(grep "Master_Port:" <<< "$STATUS_LINE" | awk '{ print $2 }')
REPLICA_SQL_RUNNING=$(grep "Replica_SQL_Running:" <<< "$STATUS_LINE" | awk '{ print $2 }')
REPLICA_IO_RUNNING=$(grep "Replica_IO_Running:" <<< "$STATUS_LINE" | awk '{ print $2 }')
SOURCE_HOST=$(grep "Source_Host:" <<< "$STATUS_LINE" | awk '{ print $2 }')
SOURCE_PORT=$(grep "Source_Port:" <<< "$STATUS_LINE" | awk '{ print $2 }')

echo "Slave_SQL_Running: $SLAVE_SQL_RUNNING"
echo "Slave_IO_Running: $SLAVE_IO_RUNNING"
echo "Replica_SQL_Running: $REPLICA_SQL_RUNNING"
echo "Replica_IO_Running: $REPLICA_IO_RUNNING"
echo "Last_IO_Errno: $LAST_ERRNO"
}

Expand All @@ -54,7 +54,7 @@ get_replication_status
[ ${1:-''} != 'status' ] || exit 0;

# Check if IO_Thread is running
if [[ $SLAVE_IO_RUNNING = "No" && $LAST_ERRNO = 1236 ]]; then
if [[ $REPLICA_IO_RUNNING = "No" && $LAST_ERRNO = 1236 ]]; then

echo "Primary has purged bin logs that replica requires. Sync will require restore from mysqldump"
if [[ -f $KEYSPACE.sql ]] ; then
Expand All @@ -64,7 +64,7 @@ if [[ $SLAVE_IO_RUNNING = "No" && $LAST_ERRNO = 1236 ]]; then
else
echo "Starting mysqldump. This may take a while.."
# Modify flags to user's requirements
if mysqldump -h $MASTER_HOST -P $MASTER_PORT -u$DB_USER -p$DB_PASS --databases $KEYSPACE \
if mysqldump -h $SOURCE_HOST -P $SOURCE_PORT -u$DB_USER -p$DB_PASS --databases $KEYSPACE \
--triggers --routines --events --hex-blob --master-data=1 --quick --order-by-primary \
--no-autocommit --skip-comments --skip-add-drop-table --skip-add-locks \
--skip-disable-keys --single-transaction --set-gtid-purged=on --verbose > $KEYSPACE.sql; then
Expand Down
2 changes: 0 additions & 2 deletions go/mysql/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,4 @@ func IsNum(typ uint8) bool {

const (
readReplicationConnectionConfiguration = "SELECT * FROM performance_schema.replication_connection_configuration"
readReplicaNetTimeout = "select @@global.replica_net_timeout"
readSlaveNetTimeout = "select @@global.slave_net_timeout"
)
30 changes: 9 additions & 21 deletions go/mysql/endtoend/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/mysql/sqlerror"

"vitess.io/vitess/go/mysql"
Expand All @@ -46,15 +47,6 @@ func connectForReplication(t *testing.T, rbr bool) (*mysql.Conn, mysql.BinlogFor
t.Fatal(err)
}

// We need to know if this is MariaDB, to set the right flag.
if conn.IsMariaDB() {
// This flag is required to get GTIDs from MariaDB.
t.Log("MariaDB: sensing SET @mariadb_slave_capability=4")
if _, err := conn.ExecuteFetch("SET @mariadb_slave_capability=4", 0, false); err != nil {
t.Fatalf("failed to set @mariadb_slave_capability=4: %v", err)
}
}

// Switch server to RBR if needed.
if rbr {
if _, err := conn.ExecuteFetch("SET GLOBAL binlog_format='ROW'", 0, false); err != nil {
Expand All @@ -63,25 +55,21 @@ func connectForReplication(t *testing.T, rbr bool) (*mysql.Conn, mysql.BinlogFor
}

// First we get the current binlog position.
result, err := conn.ExecuteFetch("SHOW MASTER STATUS", 1, true)
require.NoError(t, err, "SHOW MASTER STATUS failed: %v", err)
status, err := conn.ShowPrimaryStatus()
require.NoError(t, err, "retrieving primary status failed: %v", err)

if len(result.Fields) < 2 || result.Fields[0].Name != "File" || result.Fields[1].Name != "Position" ||
len(result.Rows) != 1 {
t.Fatalf("SHOW MASTER STATUS returned unexpected result: %v", result)
}
file := result.Rows[0][0].ToString()
position, err := result.Rows[0][1].ToCastUint64()
require.NoError(t, err, "SHOW MASTER STATUS returned invalid position: %v", result.Rows[0][1])
filePos := status.FilePosition.GTIDSet.(replication.FilePosGTID)
file := filePos.File
position := filePos.Pos

// Tell the server that we understand the format of events
// that will be used if binlog_checksum is enabled on the server.
if _, err := conn.ExecuteFetch("SET @master_binlog_checksum=@@global.binlog_checksum", 0, false); err != nil {
t.Fatalf("failed to set @master_binlog_checksum=@@global.binlog_checksum: %v", err)
if _, err := conn.ExecuteFetch("SET @source_binlog_checksum = @@global.binlog_checksum, @master_binlog_checksum=@@global.binlog_checksum", 0, false); err != nil {
t.Fatalf("failed to set @source_binlog_checksum=@@global.binlog_checksum: %v", err)
}

// Write ComBinlogDump packet with to start streaming events from here.
if err := conn.WriteComBinlogDump(1, file, uint32(position), 0); err != nil {
if err := conn.WriteComBinlogDump(1, file, position, 0); err != nil {
t.Fatalf("WriteComBinlogDump failed: %v", err)
}

Expand Down
16 changes: 10 additions & 6 deletions go/mysql/flavor.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,14 @@ type flavor interface {
// as the new replication source (without changing any GTID position).
setReplicationSourceCommand(params *ConnParams, host string, port int32, heartbeatInterval float64, connectRetry int) string

// resetBinaryLogsCommand returns the command to reset the binary logs.
resetBinaryLogsCommand() string

// status returns the result of the appropriate status command,
// with parsed replication position.
status(c *Conn) (replication.ReplicationStatus, error)

// primaryStatus returns the result of 'SHOW MASTER STATUS',
// primaryStatus returns the result of 'SHOW BINARY LOG STATUS',
// with parsed executed position.
primaryStatus(c *Conn) (replication.PrimaryStatus, error)

Expand Down Expand Up @@ -185,15 +188,16 @@ func GetFlavor(serverVersion string, flavorFunc func() flavor) (f flavor, capabl
f = mariadbFlavor102{mariadbFlavor{serverVersion: fmt.Sprintf("%f", mariadbVersion)}}
}
case strings.HasPrefix(serverVersion, mysql8VersionPrefix):
recent, _ := capabilities.MySQLVersionHasCapability(serverVersion, capabilities.ReplicaTerminologyCapability)
if recent {
if latest, _ := capabilities.ServerVersionAtLeast(serverVersion, 8, 2, 0); latest {
f = mysqlFlavor82{mysqlFlavor{serverVersion: serverVersion}}
} else if recent, _ := capabilities.MySQLVersionHasCapability(serverVersion, capabilities.ReplicaTerminologyCapability); recent {
f = mysqlFlavor8{mysqlFlavor{serverVersion: serverVersion}}
} else {
f = mysqlFlavor8Legacy{mysqlFlavor{serverVersion: serverVersion}}
f = mysqlFlavor8Legacy{mysqlFlavorLegacy{mysqlFlavor{serverVersion: serverVersion}}}
}
default:
// If unknown, return the most basic flavor: MySQL 57.
f = mysqlFlavor57{mysqlFlavor{serverVersion: serverVersion}}
f = mysqlFlavor57{mysqlFlavorLegacy{mysqlFlavor{serverVersion: serverVersion}}}
}
return f, f.supportsCapability, canonicalVersion
}
Expand Down Expand Up @@ -400,7 +404,7 @@ func (c *Conn) ShowReplicationStatus() (replication.ReplicationStatus, error) {
return c.flavor.status(c)
}

// ShowPrimaryStatus executes the right SHOW MASTER STATUS command,
// ShowPrimaryStatus executes the right SHOW BINARY LOG STATUS command,
// and returns a parsed executed Position, as well as file based Position.
func (c *Conn) ShowPrimaryStatus() (replication.PrimaryStatus, error) {
return c.flavor.primaryStatus(c)
Expand Down
5 changes: 5 additions & 0 deletions go/mysql/flavor_filepos.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,11 @@ func (flv *filePosFlavor) setReplicationSourceCommand(params *ConnParams, host s
return "unsupported"
}

// resetBinaryLogsCommand is part of the Flavor interface.
func (flv *filePosFlavor) resetBinaryLogsCommand() string {
return "unsupported"
}

// status is part of the Flavor interface.
func (flv *filePosFlavor) status(c *Conn) (replication.ReplicationStatus, error) {
qr, err := c.ExecuteFetch("SHOW SLAVE STATUS", 100, true /* wantfields */)
Expand Down
6 changes: 5 additions & 1 deletion go/mysql/flavor_mariadb.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,10 @@ func (mariadbFlavor) setReplicationSourceCommand(params *ConnParams, host string
return "CHANGE MASTER TO\n " + strings.Join(args, ",\n ")
}

func (mariadbFlavor) resetBinaryLogsCommand() string {
return "RESET MASTER"
}

// status is part of the Flavor interface.
func (mariadbFlavor) status(c *Conn) (replication.ReplicationStatus, error) {
qr, err := c.ExecuteFetch("SHOW ALL SLAVES STATUS", 100, true /* wantfields */)
Expand Down Expand Up @@ -288,7 +292,7 @@ func (mariadbFlavor) replicationConfiguration(c *Conn) (*replicationdata.Configu

// replicationNetTimeout is part of the Flavor interface.
func (mariadbFlavor) replicationNetTimeout(c *Conn) (int32, error) {
qr, err := c.ExecuteFetch(readSlaveNetTimeout, 1, false)
qr, err := c.ExecuteFetch("select @@global.slave_net_timeout", 1, false)
if err != nil {
return 0, err
}
Expand Down
Loading

0 comments on commit 43b33fb

Please sign in to comment.