From 43b33fbcf387202517babc9404e6fe7a3d971ed3 Mon Sep 17 00:00:00 2001 From: Dirkjan Bussink Date: Thu, 9 May 2024 16:34:56 +0200 Subject: [PATCH 1/2] mysql: Handle more deprecated SQL commands This deals with more deprecated SQL commands that now uses different terminology on newer MySQL versions. Also refactors things to reduce some duplication and to simplify some logic. We now treat the latest syntax as default and keep older syntax for older versions as needed explicitly. Signed-off-by: Dirkjan Bussink --- examples/compose/fix_replication.sh | 18 +- go/mysql/constants.go | 2 - go/mysql/endtoend/replication_test.go | 30 +- go/mysql/flavor.go | 16 +- go/mysql/flavor_filepos.go | 5 + go/mysql/flavor_mariadb.go | 6 +- go/mysql/flavor_mysql.go | 290 ++++-------------- go/mysql/flavor_mysql_legacy.go | 278 +++++++++++++++++ go/mysql/flavor_mysql_test.go | 94 ++---- go/mysql/flavor_mysqlgr.go | 2 +- go/mysql/replication.go | 6 + go/mysql/replication/primary_status.go | 4 +- .../backup/vtbackup/backup_only_test.go | 8 +- go/test/endtoend/cluster/vttablet_process.go | 10 + .../reparent/emergencyreparent/ers_test.go | 6 +- .../reparent/plannedreparent/reparent_test.go | 10 +- .../buffer/reparent/failover_buffer_test.go | 4 +- go/test/endtoend/vtorc/utils/utils.go | 4 +- go/vt/binlog/binlog_connection.go | 4 +- go/vt/mysqlctl/backup.go | 10 +- go/vt/mysqlctl/fakemysqldaemon.go | 3 +- .../replicationdata/replicationdata.pb.go | 2 +- .../tabletmanagerdata/tabletmanagerdata.pb.go | 2 +- .../endtoend/init_shard_primary_test.go | 12 +- .../vttablet/tabletmanager/rpc_replication.go | 2 +- go/vt/wrangler/testlib/backup_test.go | 49 +-- proto/replicationdata.proto | 2 +- proto/tabletmanagerdata.proto | 2 +- 28 files changed, 493 insertions(+), 388 deletions(-) create mode 100644 go/mysql/flavor_mysql_legacy.go diff --git a/examples/compose/fix_replication.sh b/examples/compose/fix_replication.sh index a528b06be6f..6aa9648308a 100755 --- a/examples/compose/fix_replication.sh +++ b/examples/compose/fix_replication.sh @@ -26,15 +26,15 @@ cd "$(dirname "${BASH_SOURCE[0]}")" function get_replication_status() { # Get replication status - STATUS_LINE=$(mysql -u$DB_USER -p$DB_PASS -h 127.0.0.1 -e "SHOW SLAVE STATUS\G") + STATUS_LINE=$(mysql -u$DB_USER -p$DB_PASS -h 127.0.0.1 -e "SHOW REPLICA STATUS\G") LAST_ERRNO=$(grep "Last_IO_Errno:" <<< "$STATUS_LINE" | awk '{ print $2 }') - SLAVE_SQL_RUNNING=$(grep "Slave_SQL_Running:" <<< "$STATUS_LINE" | awk '{ print $2 }') - SLAVE_IO_RUNNING=$(grep "Slave_IO_Running:" <<< "$STATUS_LINE" | awk '{ print $2 }') - MASTER_HOST=$(grep "Master_Host:" <<< "$STATUS_LINE" | awk '{ print $2 }') - MASTER_PORT=$(grep "Master_Port:" <<< "$STATUS_LINE" | awk '{ print $2 }') + REPLICA_SQL_RUNNING=$(grep "Replica_SQL_Running:" <<< "$STATUS_LINE" | awk '{ print $2 }') + REPLICA_IO_RUNNING=$(grep "Replica_IO_Running:" <<< "$STATUS_LINE" | awk '{ print $2 }') + SOURCE_HOST=$(grep "Source_Host:" <<< "$STATUS_LINE" | awk '{ print $2 }') + SOURCE_PORT=$(grep "Source_Port:" <<< "$STATUS_LINE" | awk '{ print $2 }') - echo "Slave_SQL_Running: $SLAVE_SQL_RUNNING" - echo "Slave_IO_Running: $SLAVE_IO_RUNNING" + echo "Replica_SQL_Running: $REPLICA_SQL_RUNNING" + echo "Replica_IO_Running: $REPLICA_IO_RUNNING" echo "Last_IO_Errno: $LAST_ERRNO" } @@ -54,7 +54,7 @@ get_replication_status [ ${1:-''} != 'status' ] || exit 0; # Check if IO_Thread is running -if [[ $SLAVE_IO_RUNNING = "No" && $LAST_ERRNO = 1236 ]]; then +if [[ $REPLICA_IO_RUNNING = "No" && $LAST_ERRNO = 1236 ]]; then echo "Primary has purged bin logs that replica requires. Sync will require restore from mysqldump" if [[ -f $KEYSPACE.sql ]] ; then @@ -64,7 +64,7 @@ if [[ $SLAVE_IO_RUNNING = "No" && $LAST_ERRNO = 1236 ]]; then else echo "Starting mysqldump. This may take a while.." # Modify flags to user's requirements - if mysqldump -h $MASTER_HOST -P $MASTER_PORT -u$DB_USER -p$DB_PASS --databases $KEYSPACE \ + if mysqldump -h $SOURCE_HOST -P $SOURCE_PORT -u$DB_USER -p$DB_PASS --databases $KEYSPACE \ --triggers --routines --events --hex-blob --master-data=1 --quick --order-by-primary \ --no-autocommit --skip-comments --skip-add-drop-table --skip-add-locks \ --skip-disable-keys --single-transaction --set-gtid-purged=on --verbose > $KEYSPACE.sql; then diff --git a/go/mysql/constants.go b/go/mysql/constants.go index 816ae64778e..defcf37b871 100644 --- a/go/mysql/constants.go +++ b/go/mysql/constants.go @@ -289,6 +289,4 @@ func IsNum(typ uint8) bool { const ( readReplicationConnectionConfiguration = "SELECT * FROM performance_schema.replication_connection_configuration" - readReplicaNetTimeout = "select @@global.replica_net_timeout" - readSlaveNetTimeout = "select @@global.slave_net_timeout" ) diff --git a/go/mysql/endtoend/replication_test.go b/go/mysql/endtoend/replication_test.go index 441209b35f7..d3b9a6722ea 100644 --- a/go/mysql/endtoend/replication_test.go +++ b/go/mysql/endtoend/replication_test.go @@ -29,6 +29,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/mysql" @@ -46,15 +47,6 @@ func connectForReplication(t *testing.T, rbr bool) (*mysql.Conn, mysql.BinlogFor t.Fatal(err) } - // We need to know if this is MariaDB, to set the right flag. - if conn.IsMariaDB() { - // This flag is required to get GTIDs from MariaDB. - t.Log("MariaDB: sensing SET @mariadb_slave_capability=4") - if _, err := conn.ExecuteFetch("SET @mariadb_slave_capability=4", 0, false); err != nil { - t.Fatalf("failed to set @mariadb_slave_capability=4: %v", err) - } - } - // Switch server to RBR if needed. if rbr { if _, err := conn.ExecuteFetch("SET GLOBAL binlog_format='ROW'", 0, false); err != nil { @@ -63,25 +55,21 @@ func connectForReplication(t *testing.T, rbr bool) (*mysql.Conn, mysql.BinlogFor } // First we get the current binlog position. - result, err := conn.ExecuteFetch("SHOW MASTER STATUS", 1, true) - require.NoError(t, err, "SHOW MASTER STATUS failed: %v", err) + status, err := conn.ShowPrimaryStatus() + require.NoError(t, err, "retrieving primary status failed: %v", err) - if len(result.Fields) < 2 || result.Fields[0].Name != "File" || result.Fields[1].Name != "Position" || - len(result.Rows) != 1 { - t.Fatalf("SHOW MASTER STATUS returned unexpected result: %v", result) - } - file := result.Rows[0][0].ToString() - position, err := result.Rows[0][1].ToCastUint64() - require.NoError(t, err, "SHOW MASTER STATUS returned invalid position: %v", result.Rows[0][1]) + filePos := status.FilePosition.GTIDSet.(replication.FilePosGTID) + file := filePos.File + position := filePos.Pos // Tell the server that we understand the format of events // that will be used if binlog_checksum is enabled on the server. - if _, err := conn.ExecuteFetch("SET @master_binlog_checksum=@@global.binlog_checksum", 0, false); err != nil { - t.Fatalf("failed to set @master_binlog_checksum=@@global.binlog_checksum: %v", err) + if _, err := conn.ExecuteFetch("SET @source_binlog_checksum = @@global.binlog_checksum, @master_binlog_checksum=@@global.binlog_checksum", 0, false); err != nil { + t.Fatalf("failed to set @source_binlog_checksum=@@global.binlog_checksum: %v", err) } // Write ComBinlogDump packet with to start streaming events from here. - if err := conn.WriteComBinlogDump(1, file, uint32(position), 0); err != nil { + if err := conn.WriteComBinlogDump(1, file, position, 0); err != nil { t.Fatalf("WriteComBinlogDump failed: %v", err) } diff --git a/go/mysql/flavor.go b/go/mysql/flavor.go index 54b62ad2261..f732b1ccb88 100644 --- a/go/mysql/flavor.go +++ b/go/mysql/flavor.go @@ -123,11 +123,14 @@ type flavor interface { // as the new replication source (without changing any GTID position). setReplicationSourceCommand(params *ConnParams, host string, port int32, heartbeatInterval float64, connectRetry int) string + // resetBinaryLogsCommand returns the command to reset the binary logs. + resetBinaryLogsCommand() string + // status returns the result of the appropriate status command, // with parsed replication position. status(c *Conn) (replication.ReplicationStatus, error) - // primaryStatus returns the result of 'SHOW MASTER STATUS', + // primaryStatus returns the result of 'SHOW BINARY LOG STATUS', // with parsed executed position. primaryStatus(c *Conn) (replication.PrimaryStatus, error) @@ -185,15 +188,16 @@ func GetFlavor(serverVersion string, flavorFunc func() flavor) (f flavor, capabl f = mariadbFlavor102{mariadbFlavor{serverVersion: fmt.Sprintf("%f", mariadbVersion)}} } case strings.HasPrefix(serverVersion, mysql8VersionPrefix): - recent, _ := capabilities.MySQLVersionHasCapability(serverVersion, capabilities.ReplicaTerminologyCapability) - if recent { + if latest, _ := capabilities.ServerVersionAtLeast(serverVersion, 8, 2, 0); latest { + f = mysqlFlavor82{mysqlFlavor{serverVersion: serverVersion}} + } else if recent, _ := capabilities.MySQLVersionHasCapability(serverVersion, capabilities.ReplicaTerminologyCapability); recent { f = mysqlFlavor8{mysqlFlavor{serverVersion: serverVersion}} } else { - f = mysqlFlavor8Legacy{mysqlFlavor{serverVersion: serverVersion}} + f = mysqlFlavor8Legacy{mysqlFlavorLegacy{mysqlFlavor{serverVersion: serverVersion}}} } default: // If unknown, return the most basic flavor: MySQL 57. - f = mysqlFlavor57{mysqlFlavor{serverVersion: serverVersion}} + f = mysqlFlavor57{mysqlFlavorLegacy{mysqlFlavor{serverVersion: serverVersion}}} } return f, f.supportsCapability, canonicalVersion } @@ -400,7 +404,7 @@ func (c *Conn) ShowReplicationStatus() (replication.ReplicationStatus, error) { return c.flavor.status(c) } -// ShowPrimaryStatus executes the right SHOW MASTER STATUS command, +// ShowPrimaryStatus executes the right SHOW BINARY LOG STATUS command, // and returns a parsed executed Position, as well as file based Position. func (c *Conn) ShowPrimaryStatus() (replication.PrimaryStatus, error) { return c.flavor.primaryStatus(c) diff --git a/go/mysql/flavor_filepos.go b/go/mysql/flavor_filepos.go index e04a88759ec..5e766e81912 100644 --- a/go/mysql/flavor_filepos.go +++ b/go/mysql/flavor_filepos.go @@ -233,6 +233,11 @@ func (flv *filePosFlavor) setReplicationSourceCommand(params *ConnParams, host s return "unsupported" } +// resetBinaryLogsCommand is part of the Flavor interface. +func (flv *filePosFlavor) resetBinaryLogsCommand() string { + return "unsupported" +} + // status is part of the Flavor interface. func (flv *filePosFlavor) status(c *Conn) (replication.ReplicationStatus, error) { qr, err := c.ExecuteFetch("SHOW SLAVE STATUS", 100, true /* wantfields */) diff --git a/go/mysql/flavor_mariadb.go b/go/mysql/flavor_mariadb.go index 42792b4a01d..301ec2b0596 100644 --- a/go/mysql/flavor_mariadb.go +++ b/go/mysql/flavor_mariadb.go @@ -219,6 +219,10 @@ func (mariadbFlavor) setReplicationSourceCommand(params *ConnParams, host string return "CHANGE MASTER TO\n " + strings.Join(args, ",\n ") } +func (mariadbFlavor) resetBinaryLogsCommand() string { + return "RESET MASTER" +} + // status is part of the Flavor interface. func (mariadbFlavor) status(c *Conn) (replication.ReplicationStatus, error) { qr, err := c.ExecuteFetch("SHOW ALL SLAVES STATUS", 100, true /* wantfields */) @@ -288,7 +292,7 @@ func (mariadbFlavor) replicationConfiguration(c *Conn) (*replicationdata.Configu // replicationNetTimeout is part of the Flavor interface. func (mariadbFlavor) replicationNetTimeout(c *Conn) (int32, error) { - qr, err := c.ExecuteFetch(readSlaveNetTimeout, 1, false) + qr, err := c.ExecuteFetch("select @@global.slave_net_timeout", 1, false) if err != nil { return 0, err } diff --git a/go/mysql/flavor_mysql.go b/go/mysql/flavor_mysql.go index 8dba1c8f4dd..154de880f62 100644 --- a/go/mysql/flavor_mysql.go +++ b/go/mysql/flavor_mysql.go @@ -37,20 +37,17 @@ import ( type mysqlFlavor struct { serverVersion string } -type mysqlFlavor57 struct { - mysqlFlavor -} -type mysqlFlavor8Legacy struct { + +type mysqlFlavor8 struct { mysqlFlavor } -type mysqlFlavor8 struct { +type mysqlFlavor82 struct { mysqlFlavor } -var _ flavor = (*mysqlFlavor57)(nil) -var _ flavor = (*mysqlFlavor8Legacy)(nil) var _ flavor = (*mysqlFlavor8)(nil) +var _ flavor = (*mysqlFlavor82)(nil) // primaryGTIDSet is part of the Flavor interface. func (mysqlFlavor) primaryGTIDSet(c *Conn) (replication.GTIDSet, error) { @@ -103,51 +100,11 @@ func (mysqlFlavor) gtidMode(c *Conn) (string, error) { return qr.Rows[0][0].ToString(), nil } -func (mysqlFlavor) startReplicationCommand() string { - return "START SLAVE" -} - -func (mysqlFlavor) restartReplicationCommands() []string { - return []string{ - "STOP SLAVE", - "RESET SLAVE", - "START SLAVE", - } -} - -func (mysqlFlavor) startReplicationUntilAfter(pos replication.Position) string { - return fmt.Sprintf("START SLAVE UNTIL SQL_AFTER_GTIDS = '%s'", pos) -} - -func (mysqlFlavor) startSQLThreadUntilAfter(pos replication.Position) string { - return fmt.Sprintf("START SLAVE SQL_THREAD UNTIL SQL_AFTER_GTIDS = '%s'", pos) -} - -func (mysqlFlavor) stopReplicationCommand() string { - return "STOP SLAVE" -} - -func (mysqlFlavor) resetReplicationCommand() string { - return "RESET SLAVE ALL" -} - -func (mysqlFlavor) stopIOThreadCommand() string { - return "STOP SLAVE IO_THREAD" -} - -func (mysqlFlavor) stopSQLThreadCommand() string { - return "STOP SLAVE SQL_THREAD" -} - -func (mysqlFlavor) startSQLThreadCommand() string { - return "START SLAVE SQL_THREAD" -} - -func (f mysqlFlavor8) startReplicationCommand() string { +func (f mysqlFlavor) startReplicationCommand() string { return "START REPLICA" } -func (f mysqlFlavor8) restartReplicationCommands() []string { +func (f mysqlFlavor) restartReplicationCommands() []string { return []string{ "STOP REPLICA", "RESET REPLICA", @@ -155,40 +112,40 @@ func (f mysqlFlavor8) restartReplicationCommands() []string { } } -func (f mysqlFlavor8) startReplicationUntilAfter(pos replication.Position) string { +func (f mysqlFlavor) startReplicationUntilAfter(pos replication.Position) string { return fmt.Sprintf("START REPLICA UNTIL SQL_AFTER_GTIDS = '%s'", pos) } -func (f mysqlFlavor8) startSQLThreadUntilAfter(pos replication.Position) string { +func (f mysqlFlavor) startSQLThreadUntilAfter(pos replication.Position) string { return fmt.Sprintf("START REPLICA SQL_THREAD UNTIL SQL_AFTER_GTIDS = '%s'", pos) } -func (f mysqlFlavor8) stopReplicationCommand() string { +func (f mysqlFlavor) stopReplicationCommand() string { return "STOP REPLICA" } -func (f mysqlFlavor8) resetReplicationCommand() string { +func (f mysqlFlavor) resetReplicationCommand() string { return "RESET REPLICA ALL" } -func (f mysqlFlavor8) stopIOThreadCommand() string { +func (f mysqlFlavor) stopIOThreadCommand() string { return "STOP REPLICA IO_THREAD" } -func (f mysqlFlavor8) stopSQLThreadCommand() string { +func (f mysqlFlavor) stopSQLThreadCommand() string { return "STOP REPLICA SQL_THREAD" } -func (f mysqlFlavor8) startSQLThreadCommand() string { +func (f mysqlFlavor) startSQLThreadCommand() string { return "START REPLICA SQL_THREAD" } // resetReplicationCommands is part of the Flavor interface. -func (mysqlFlavor8) resetReplicationCommands(c *Conn) []string { +func (mysqlFlavor) resetReplicationCommands(c *Conn) []string { resetCommands := []string{ "STOP REPLICA", - "RESET REPLICA ALL", // "ALL" makes it forget source host:port. - "RESET MASTER", // This will also clear gtid_executed and gtid_purged. + "RESET REPLICA ALL", // "ALL" makes it forget source host:port. + "RESET BINARY LOGS AND GTIDS", // This will also clear gtid_executed and gtid_purged. } status, err := c.SemiSyncExtensionLoaded() if err != nil { @@ -205,32 +162,16 @@ func (mysqlFlavor8) resetReplicationCommands(c *Conn) []string { return resetCommands } -// resetReplicationParametersCommands is part of the Flavor interface. -func (mysqlFlavor8) resetReplicationParametersCommands(c *Conn) []string { - resetCommands := []string{ - "RESET REPLICA ALL", // "ALL" makes it forget source host:port. - } - return resetCommands -} - -// sendBinlogDumpCommand is part of the Flavor interface. -func (mysqlFlavor) sendBinlogDumpCommand(c *Conn, serverID uint32, binlogFilename string, startPos replication.Position) error { - gtidSet, ok := startPos.GTIDSet.(replication.Mysql56GTIDSet) - if !ok { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "startPos.GTIDSet is wrong type - expected Mysql56GTIDSet, got: %#v", startPos.GTIDSet) - } - - // Build the command. - sidBlock := gtidSet.SIDBlock() - return c.WriteComBinlogDumpGTID(serverID, binlogFilename, 4, 0, sidBlock) +func (mysqlFlavor) resetBinaryLogsCommand() string { + return "RESET BINARY LOGS AND GTIDS" } // resetReplicationCommands is part of the Flavor interface. -func (mysqlFlavor) resetReplicationCommands(c *Conn) []string { +func (mysqlFlavor8) resetReplicationCommands(c *Conn) []string { resetCommands := []string{ - "STOP SLAVE", - "RESET SLAVE ALL", // "ALL" makes it forget source host:port. - "RESET MASTER", // This will also clear gtid_executed and gtid_purged. + "STOP REPLICA", + "RESET REPLICA ALL", // "ALL" makes it forget source host:port. + "RESET MASTER", // This will also clear gtid_executed and gtid_purged. } status, err := c.SemiSyncExtensionLoaded() if err != nil { @@ -247,45 +188,69 @@ func (mysqlFlavor) resetReplicationCommands(c *Conn) []string { return resetCommands } +// resetReplicationCommands is part of the Flavor interface. +func (mysqlFlavor8) resetBinaryLogsCommand() string { + return "RESET MASTER" +} + // resetReplicationParametersCommands is part of the Flavor interface. func (mysqlFlavor) resetReplicationParametersCommands(c *Conn) []string { resetCommands := []string{ - "RESET SLAVE ALL", // "ALL" makes it forget source host:port. + "RESET REPLICA ALL", // "ALL" makes it forget source host:port. } return resetCommands } +// sendBinlogDumpCommand is part of the Flavor interface. +func (mysqlFlavor) sendBinlogDumpCommand(c *Conn, serverID uint32, binlogFilename string, startPos replication.Position) error { + gtidSet, ok := startPos.GTIDSet.(replication.Mysql56GTIDSet) + if !ok { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "startPos.GTIDSet is wrong type - expected Mysql56GTIDSet, got: %#v", startPos.GTIDSet) + } + + // Build the command. + sidBlock := gtidSet.SIDBlock() + return c.WriteComBinlogDumpGTID(serverID, binlogFilename, 4, 0, sidBlock) +} + // setReplicationPositionCommands is part of the Flavor interface. -func (mysqlFlavor) setReplicationPositionCommands(pos replication.Position) []string { +func (mysqlFlavor8) setReplicationPositionCommands(pos replication.Position) []string { return []string{ "RESET MASTER", // We must clear gtid_executed before setting gtid_purged. fmt.Sprintf("SET GLOBAL gtid_purged = '%s'", pos), } } -// status is part of the Flavor interface. -func (mysqlFlavor) status(c *Conn) (replication.ReplicationStatus, error) { - qr, err := c.ExecuteFetch("SHOW SLAVE STATUS", 100, true /* wantfields */) +// setReplicationPositionCommands is part of the Flavor interface. +func (mysqlFlavor) setReplicationPositionCommands(pos replication.Position) []string { + return []string{ + "RESET BINARY LOGS AND GTIDS", // We must clear gtid_executed before setting gtid_purged. + fmt.Sprintf("SET GLOBAL gtid_purged = '%s'", pos), + } +} + +// primaryStatus is part of the Flavor interface. +func (mysqlFlavor8) primaryStatus(c *Conn) (replication.PrimaryStatus, error) { + qr, err := c.ExecuteFetch("SHOW MASTER STATUS", 100, true /* wantfields */) if err != nil { - return replication.ReplicationStatus{}, err + return replication.PrimaryStatus{}, err } if len(qr.Rows) == 0 { - // The query returned no data, meaning the server - // is not configured as a replica. - return replication.ReplicationStatus{}, ErrNotReplica + // The query returned no data. We don't know how this could happen. + return replication.PrimaryStatus{}, ErrNoPrimaryStatus } resultMap, err := resultToMap(qr) if err != nil { - return replication.ReplicationStatus{}, err + return replication.PrimaryStatus{}, err } - return replication.ParseMysqlReplicationStatus(resultMap, false) + return replication.ParseMysqlPrimaryStatus(resultMap) } // primaryStatus is part of the Flavor interface. func (mysqlFlavor) primaryStatus(c *Conn) (replication.PrimaryStatus, error) { - qr, err := c.ExecuteFetch("SHOW MASTER STATUS", 100, true /* wantfields */) + qr, err := c.ExecuteFetch("SHOW BINARY LOG STATUS", 100, true /* wantfields */) if err != nil { return replication.PrimaryStatus{}, err } @@ -330,19 +295,7 @@ func (mysqlFlavor) replicationConfiguration(c *Conn) (*replicationdata.Configura // replicationNetTimeout is part of the Flavor interface. func (mysqlFlavor) replicationNetTimeout(c *Conn) (int32, error) { - qr, err := c.ExecuteFetch(readSlaveNetTimeout, 1, false) - if err != nil { - return 0, err - } - if len(qr.Rows) != 1 || len(qr.Rows[0]) != 1 { - return 0, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected result format for slave_net_timeout: %#v", qr) - } - return qr.Rows[0][0].ToInt32() -} - -// replicationNetTimeout is part of the Flavor interface. -func (mysqlFlavor8) replicationNetTimeout(c *Conn) (int32, error) { - qr, err := c.ExecuteFetch(readReplicaNetTimeout, 1, false) + qr, err := c.ExecuteFetch("select @@global.replica_net_timeout", 1, false) if err != nil { return 0, err } @@ -353,7 +306,7 @@ func (mysqlFlavor8) replicationNetTimeout(c *Conn) (int32, error) { } // status is part of the Flavor interface. -func (mysqlFlavor8) status(c *Conn) (replication.ReplicationStatus, error) { +func (mysqlFlavor) status(c *Conn) (replication.ReplicationStatus, error) { qr, err := c.ExecuteFetch("SHOW REPLICA STATUS", 100, true /* wantfields */) if err != nil { return replication.ReplicationStatus{}, err @@ -441,44 +394,6 @@ func (mysqlFlavor) baseShowTables() string { return "SELECT table_name, table_type, unix_timestamp(create_time), table_comment FROM information_schema.tables WHERE table_schema = database()" } -// TablesWithSize56 is a query to select table along with size for mysql 5.6 -const TablesWithSize56 = `SELECT table_name, - table_type, - UNIX_TIMESTAMP(create_time) AS uts_create_time, - table_comment, - SUM(data_length + index_length), - SUM(data_length + index_length) -FROM information_schema.tables -WHERE table_schema = database() -GROUP BY table_name, - table_type, - uts_create_time, - table_comment` - -// TablesWithSize57 is a query to select table along with size for mysql 5.7. -// -// It's a little weird, because the JOIN predicate only works if the table and databases do not contain weird characters. -// If the join does not return any data, we fall back to the same fields as used in the mysql 5.6 query. -// -// We join with a subquery that materializes the data from `information_schema.innodb_sys_tablespaces` -// early for performance reasons. This effectively causes only a single read of `information_schema.innodb_sys_tablespaces` -// per query. -const TablesWithSize57 = `SELECT t.table_name, - t.table_type, - UNIX_TIMESTAMP(t.create_time), - t.table_comment, - IFNULL(SUM(i.file_size), SUM(t.data_length + t.index_length)), - IFNULL(SUM(i.allocated_size), SUM(t.data_length + t.index_length)) -FROM information_schema.tables t -LEFT OUTER JOIN ( - SELECT space, file_size, allocated_size, name - FROM information_schema.innodb_sys_tablespaces - WHERE name LIKE CONCAT(database(), '/%') - GROUP BY space, file_size, allocated_size, name -) i ON i.name = CONCAT(t.table_schema, '/', t.table_name) or i.name LIKE CONCAT(t.table_schema, '/', t.table_name, '#p#%') -WHERE t.table_schema = database() -GROUP BY t.table_name, t.table_type, t.create_time, t.table_comment` - // TablesWithSize80 is a query to select table along with size for mysql 8.0 // // Note the following: @@ -510,61 +425,16 @@ func (mysqlFlavor57) baseShowTablesWithSizes() string { } // supportsCapability is part of the Flavor interface. -func (f mysqlFlavor57) supportsCapability(capability capabilities.FlavorCapability) (bool, error) { +func (f mysqlFlavor) supportsCapability(capability capabilities.FlavorCapability) (bool, error) { return capabilities.MySQLVersionHasCapability(f.serverVersion, capability) } // baseShowTablesWithSizes is part of the Flavor interface. -func (mysqlFlavor8Legacy) baseShowTablesWithSizes() string { +func (mysqlFlavor) baseShowTablesWithSizes() string { return TablesWithSize80 } -// baseShowTablesWithSizes is part of the Flavor interface. -func (mysqlFlavor8) baseShowTablesWithSizes() string { - return TablesWithSize80 -} - -// supportsCapability is part of the Flavor interface. -func (f mysqlFlavor8Legacy) supportsCapability(capability capabilities.FlavorCapability) (bool, error) { - return capabilities.MySQLVersionHasCapability(f.serverVersion, capability) -} - -// supportsCapability is part of the Flavor interface. -func (f mysqlFlavor8) supportsCapability(capability capabilities.FlavorCapability) (bool, error) { - return capabilities.MySQLVersionHasCapability(f.serverVersion, capability) -} - func (mysqlFlavor) setReplicationSourceCommand(params *ConnParams, host string, port int32, heartbeatInterval float64, connectRetry int) string { - args := []string{ - fmt.Sprintf("MASTER_HOST = '%s'", host), - fmt.Sprintf("MASTER_PORT = %d", port), - fmt.Sprintf("MASTER_USER = '%s'", params.Uname), - fmt.Sprintf("MASTER_PASSWORD = '%s'", params.Pass), - fmt.Sprintf("MASTER_CONNECT_RETRY = %d", connectRetry), - } - if params.SslEnabled() { - args = append(args, "MASTER_SSL = 1") - } - if params.SslCa != "" { - args = append(args, fmt.Sprintf("MASTER_SSL_CA = '%s'", params.SslCa)) - } - if params.SslCaPath != "" { - args = append(args, fmt.Sprintf("MASTER_SSL_CAPATH = '%s'", params.SslCaPath)) - } - if params.SslCert != "" { - args = append(args, fmt.Sprintf("MASTER_SSL_CERT = '%s'", params.SslCert)) - } - if params.SslKey != "" { - args = append(args, fmt.Sprintf("MASTER_SSL_KEY = '%s'", params.SslKey)) - } - if heartbeatInterval != 0 { - args = append(args, fmt.Sprintf("MASTER_HEARTBEAT_PERIOD = %v", heartbeatInterval)) - } - args = append(args, "MASTER_AUTO_POSITION = 1") - return "CHANGE MASTER TO\n " + strings.Join(args, ",\n ") -} - -func (mysqlFlavor8) setReplicationSourceCommand(params *ConnParams, host string, port int32, heartbeatInterval float64, connectRetry int) string { args := []string{ fmt.Sprintf("SOURCE_HOST = '%s'", host), fmt.Sprintf("SOURCE_PORT = %d", port), @@ -595,38 +465,6 @@ func (mysqlFlavor8) setReplicationSourceCommand(params *ConnParams, host string, } func (mysqlFlavor) catchupToGTIDCommands(params *ConnParams, replPos replication.Position) []string { - cmds := []string{ - "STOP SLAVE FOR CHANNEL '' ", - "STOP SLAVE IO_THREAD FOR CHANNEL ''", - } - - if params.SslCa != "" || params.SslCert != "" { - // We need to use TLS - cmd := fmt.Sprintf("CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='%s', MASTER_PASSWORD='%s', MASTER_AUTO_POSITION=1, MASTER_SSL=1", params.Host, params.Port, params.Uname, params.Pass) - if params.SslCa != "" { - cmd += fmt.Sprintf(", MASTER_SSL_CA='%s'", params.SslCa) - } - if params.SslCert != "" { - cmd += fmt.Sprintf(", MASTER_SSL_CERT='%s'", params.SslCert) - } - if params.SslKey != "" { - cmd += fmt.Sprintf(", MASTER_SSL_KEY='%s'", params.SslKey) - } - cmds = append(cmds, cmd+";") - } else { - // No TLS - cmds = append(cmds, fmt.Sprintf("CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='%s', MASTER_PASSWORD='%s', MASTER_AUTO_POSITION=1;", params.Host, params.Port, params.Uname, params.Pass)) - } - - if replPos.IsZero() { // when the there is no afterPos, that means need to replicate completely - cmds = append(cmds, "START SLAVE") - } else { - cmds = append(cmds, fmt.Sprintf("START SLAVE UNTIL SQL_BEFORE_GTIDS = '%s'", replPos.GTIDSet.Last())) - } - return cmds -} - -func (mysqlFlavor8) catchupToGTIDCommands(params *ConnParams, replPos replication.Position) []string { cmds := []string{ "STOP REPLICA FOR CHANNEL '' ", "STOP REPLICA IO_THREAD FOR CHANNEL ''", @@ -659,9 +497,5 @@ func (mysqlFlavor8) catchupToGTIDCommands(params *ConnParams, replPos replicatio } func (mysqlFlavor) binlogReplicatedUpdates() string { - return "@@global.log_slave_updates" -} - -func (mysqlFlavor8) binlogReplicatedUpdates() string { return "@@global.log_replica_updates" } diff --git a/go/mysql/flavor_mysql_legacy.go b/go/mysql/flavor_mysql_legacy.go new file mode 100644 index 00000000000..97d8059e7b6 --- /dev/null +++ b/go/mysql/flavor_mysql_legacy.go @@ -0,0 +1,278 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mysql + +import ( + "fmt" + "strings" + + "vitess.io/vitess/go/mysql/replication" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/vterrors" +) + +// mysqlFlavor implements the Flavor interface for Mysql. +type mysqlFlavorLegacy struct { + mysqlFlavor +} +type mysqlFlavor57 struct { + mysqlFlavorLegacy +} +type mysqlFlavor8Legacy struct { + mysqlFlavorLegacy +} + +var _ flavor = (*mysqlFlavor57)(nil) +var _ flavor = (*mysqlFlavor8Legacy)(nil) +var _ flavor = (*mysqlFlavor8)(nil) + +// TablesWithSize56 is a query to select table along with size for mysql 5.6 +const TablesWithSize56 = `SELECT table_name, + table_type, + UNIX_TIMESTAMP(create_time) AS uts_create_time, + table_comment, + SUM(data_length + index_length), + SUM(data_length + index_length) +FROM information_schema.tables +WHERE table_schema = database() +GROUP BY table_name, + table_type, + uts_create_time, + table_comment` + +// TablesWithSize57 is a query to select table along with size for mysql 5.7. +// +// It's a little weird, because the JOIN predicate only works if the table and databases do not contain weird characters. +// If the join does not return any data, we fall back to the same fields as used in the mysql 5.6 query. +// +// We join with a subquery that materializes the data from `information_schema.innodb_sys_tablespaces` +// early for performance reasons. This effectively causes only a single read of `information_schema.innodb_sys_tablespaces` +// per query. +const TablesWithSize57 = `SELECT t.table_name, + t.table_type, + UNIX_TIMESTAMP(t.create_time), + t.table_comment, + IFNULL(SUM(i.file_size), SUM(t.data_length + t.index_length)), + IFNULL(SUM(i.allocated_size), SUM(t.data_length + t.index_length)) +FROM information_schema.tables t +LEFT OUTER JOIN ( + SELECT space, file_size, allocated_size, name + FROM information_schema.innodb_sys_tablespaces + WHERE name LIKE CONCAT(database(), '/%') + GROUP BY space, file_size, allocated_size, name +) i ON i.name = CONCAT(t.table_schema, '/', t.table_name) or i.name LIKE CONCAT(t.table_schema, '/', t.table_name, '#p#%') +WHERE t.table_schema = database() +GROUP BY t.table_name, t.table_type, t.create_time, t.table_comment` + +func (mysqlFlavorLegacy) startReplicationCommand() string { + return "START SLAVE" +} + +func (mysqlFlavorLegacy) restartReplicationCommands() []string { + return []string{ + "STOP SLAVE", + "RESET SLAVE", + "START SLAVE", + } +} + +func (mysqlFlavorLegacy) startReplicationUntilAfter(pos replication.Position) string { + return fmt.Sprintf("START SLAVE UNTIL SQL_AFTER_GTIDS = '%s'", pos) +} + +func (mysqlFlavorLegacy) startSQLThreadUntilAfter(pos replication.Position) string { + return fmt.Sprintf("START SLAVE SQL_THREAD UNTIL SQL_AFTER_GTIDS = '%s'", pos) +} + +func (mysqlFlavorLegacy) stopReplicationCommand() string { + return "STOP SLAVE" +} + +func (mysqlFlavorLegacy) resetReplicationCommand() string { + return "RESET SLAVE ALL" +} + +func (mysqlFlavorLegacy) stopIOThreadCommand() string { + return "STOP SLAVE IO_THREAD" +} + +func (mysqlFlavorLegacy) stopSQLThreadCommand() string { + return "STOP SLAVE SQL_THREAD" +} + +func (mysqlFlavorLegacy) startSQLThreadCommand() string { + return "START SLAVE SQL_THREAD" +} + +// resetReplicationCommands is part of the Flavor interface. +func (mysqlFlavorLegacy) resetReplicationCommands(c *Conn) []string { + resetCommands := []string{ + "STOP SLAVE", + "RESET SLAVE ALL", // "ALL" makes it forget source host:port. + "RESET MASTER", // This will also clear gtid_executed and gtid_purged. + } + status, err := c.SemiSyncExtensionLoaded() + if err != nil { + return resetCommands + } + switch status { + case SemiSyncTypeSource: + resetCommands = append(resetCommands, "SET GLOBAL rpl_semi_sync_source_enabled = false, GLOBAL rpl_semi_sync_replica_enabled = false") // semi-sync will be enabled if needed when replica is started. + case SemiSyncTypeMaster: + resetCommands = append(resetCommands, "SET GLOBAL rpl_semi_sync_master_enabled = false, GLOBAL rpl_semi_sync_slave_enabled = false") // semi-sync will be enabled if needed when replica is started. + default: + // Nothing to do. + } + return resetCommands +} + +// resetReplicationParametersCommands is part of the Flavor interface. +func (mysqlFlavorLegacy) resetReplicationParametersCommands(c *Conn) []string { + resetCommands := []string{ + "RESET SLAVE ALL", // "ALL" makes it forget source host:port. + } + return resetCommands +} + +// status is part of the Flavor interface. +func (mysqlFlavorLegacy) status(c *Conn) (replication.ReplicationStatus, error) { + qr, err := c.ExecuteFetch("SHOW SLAVE STATUS", 100, true /* wantfields */) + if err != nil { + return replication.ReplicationStatus{}, err + } + if len(qr.Rows) == 0 { + // The query returned no data, meaning the server + // is not configured as a replica. + return replication.ReplicationStatus{}, ErrNotReplica + } + + resultMap, err := resultToMap(qr) + if err != nil { + return replication.ReplicationStatus{}, err + } + + return replication.ParseMysqlReplicationStatus(resultMap, false) +} + +// replicationNetTimeout is part of the Flavor interface. +func (mysqlFlavorLegacy) replicationNetTimeout(c *Conn) (int32, error) { + qr, err := c.ExecuteFetch("select @@global.slave_net_timeout", 1, false) + if err != nil { + return 0, err + } + if len(qr.Rows) != 1 || len(qr.Rows[0]) != 1 { + return 0, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected result format for slave_net_timeout: %#v", qr) + } + return qr.Rows[0][0].ToInt32() +} + +func (mysqlFlavorLegacy) catchupToGTIDCommands(params *ConnParams, replPos replication.Position) []string { + cmds := []string{ + "STOP SLAVE FOR CHANNEL '' ", + "STOP SLAVE IO_THREAD FOR CHANNEL ''", + } + + if params.SslCa != "" || params.SslCert != "" { + // We need to use TLS + cmd := fmt.Sprintf("CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='%s', MASTER_PASSWORD='%s', MASTER_AUTO_POSITION=1, MASTER_SSL=1", params.Host, params.Port, params.Uname, params.Pass) + if params.SslCa != "" { + cmd += fmt.Sprintf(", MASTER_SSL_CA='%s'", params.SslCa) + } + if params.SslCert != "" { + cmd += fmt.Sprintf(", MASTER_SSL_CERT='%s'", params.SslCert) + } + if params.SslKey != "" { + cmd += fmt.Sprintf(", MASTER_SSL_KEY='%s'", params.SslKey) + } + cmds = append(cmds, cmd+";") + } else { + // No TLS + cmds = append(cmds, fmt.Sprintf("CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='%s', MASTER_PASSWORD='%s', MASTER_AUTO_POSITION=1;", params.Host, params.Port, params.Uname, params.Pass)) + } + + if replPos.IsZero() { // when the there is no afterPos, that means need to replicate completely + cmds = append(cmds, "START SLAVE") + } else { + cmds = append(cmds, fmt.Sprintf("START SLAVE UNTIL SQL_BEFORE_GTIDS = '%s'", replPos.GTIDSet.Last())) + } + return cmds +} + +func (mysqlFlavorLegacy) setReplicationSourceCommand(params *ConnParams, host string, port int32, heartbeatInterval float64, connectRetry int) string { + args := []string{ + fmt.Sprintf("MASTER_HOST = '%s'", host), + fmt.Sprintf("MASTER_PORT = %d", port), + fmt.Sprintf("MASTER_USER = '%s'", params.Uname), + fmt.Sprintf("MASTER_PASSWORD = '%s'", params.Pass), + fmt.Sprintf("MASTER_CONNECT_RETRY = %d", connectRetry), + } + if params.SslEnabled() { + args = append(args, "MASTER_SSL = 1") + } + if params.SslCa != "" { + args = append(args, fmt.Sprintf("MASTER_SSL_CA = '%s'", params.SslCa)) + } + if params.SslCaPath != "" { + args = append(args, fmt.Sprintf("MASTER_SSL_CAPATH = '%s'", params.SslCaPath)) + } + if params.SslCert != "" { + args = append(args, fmt.Sprintf("MASTER_SSL_CERT = '%s'", params.SslCert)) + } + if params.SslKey != "" { + args = append(args, fmt.Sprintf("MASTER_SSL_KEY = '%s'", params.SslKey)) + } + if heartbeatInterval != 0 { + args = append(args, fmt.Sprintf("MASTER_HEARTBEAT_PERIOD = %v", heartbeatInterval)) + } + args = append(args, "MASTER_AUTO_POSITION = 1") + return "CHANGE MASTER TO\n " + strings.Join(args, ",\n ") +} + +func (mysqlFlavorLegacy) resetBinaryLogsCommand() string { + return "RESET MASTER" +} + +func (mysqlFlavorLegacy) binlogReplicatedUpdates() string { + return "@@global.log_slave_updates" +} + +// setReplicationPositionCommands is part of the Flavor interface. +func (mysqlFlavorLegacy) setReplicationPositionCommands(pos replication.Position) []string { + return []string{ + "RESET MASTER", // We must clear gtid_executed before setting gtid_purged. + fmt.Sprintf("SET GLOBAL gtid_purged = '%s'", pos), + } +} + +// primaryStatus is part of the Flavor interface. +func (mysqlFlavorLegacy) primaryStatus(c *Conn) (replication.PrimaryStatus, error) { + qr, err := c.ExecuteFetch("SHOW MASTER STATUS", 100, true /* wantfields */) + if err != nil { + return replication.PrimaryStatus{}, err + } + if len(qr.Rows) == 0 { + // The query returned no data. We don't know how this could happen. + return replication.PrimaryStatus{}, ErrNoPrimaryStatus + } + + resultMap, err := resultToMap(qr) + if err != nil { + return replication.PrimaryStatus{}, err + } + + return replication.ParseMysqlPrimaryStatus(resultMap) +} diff --git a/go/mysql/flavor_mysql_test.go b/go/mysql/flavor_mysql_test.go index ff135d529ab..a85c39e2807 100644 --- a/go/mysql/flavor_mysql_test.go +++ b/go/mysql/flavor_mysql_test.go @@ -20,73 +20,9 @@ import ( "testing" "github.com/stretchr/testify/assert" -) - -func TestMysql57SetReplicationSourceCommand(t *testing.T) { - params := &ConnParams{ - Uname: "username", - Pass: "password", - } - host := "localhost" - port := int32(123) - connectRetry := 1234 - want := `CHANGE MASTER TO - MASTER_HOST = 'localhost', - MASTER_PORT = 123, - MASTER_USER = 'username', - MASTER_PASSWORD = 'password', - MASTER_CONNECT_RETRY = 1234, - MASTER_AUTO_POSITION = 1` - - conn := &Conn{flavor: mysqlFlavor57{}} - got := conn.SetReplicationSourceCommand(params, host, port, 0, connectRetry) - assert.Equal(t, want, got, "mysqlFlavor.SetReplicationSourceCommand(%#v, %#v, %#v, %#v) = %#v, want %#v", params, host, port, connectRetry, got, want) - var heartbeatInterval float64 = 5.4 - want = `CHANGE MASTER TO - MASTER_HOST = 'localhost', - MASTER_PORT = 123, - MASTER_USER = 'username', - MASTER_PASSWORD = 'password', - MASTER_CONNECT_RETRY = 1234, - MASTER_HEARTBEAT_PERIOD = 5.4, - MASTER_AUTO_POSITION = 1` - - got = conn.SetReplicationSourceCommand(params, host, port, heartbeatInterval, connectRetry) - assert.Equal(t, want, got, "mysqlFlavor.SetReplicationSourceCommand(%#v, %#v, %#v, %#v, %#v) = %#v, want %#v", params, host, port, heartbeatInterval, connectRetry, got, want) - -} - -func TestMysql57SetReplicationSourceCommandSSL(t *testing.T) { - params := &ConnParams{ - Uname: "username", - Pass: "password", - SslCa: "ssl-ca", - SslCaPath: "ssl-ca-path", - SslCert: "ssl-cert", - SslKey: "ssl-key", - } - params.EnableSSL() - host := "localhost" - port := int32(123) - connectRetry := 1234 - want := `CHANGE MASTER TO - MASTER_HOST = 'localhost', - MASTER_PORT = 123, - MASTER_USER = 'username', - MASTER_PASSWORD = 'password', - MASTER_CONNECT_RETRY = 1234, - MASTER_SSL = 1, - MASTER_SSL_CA = 'ssl-ca', - MASTER_SSL_CAPATH = 'ssl-ca-path', - MASTER_SSL_CERT = 'ssl-cert', - MASTER_SSL_KEY = 'ssl-key', - MASTER_AUTO_POSITION = 1` - - conn := &Conn{flavor: mysqlFlavor57{}} - got := conn.SetReplicationSourceCommand(params, host, port, 0, connectRetry) - assert.Equal(t, want, got, "mysqlFlavor.SetReplicationSourceCommand(%#v, %#v, %#v, %#v) = %#v, want %#v", params, host, port, connectRetry, got, want) -} + "vitess.io/vitess/go/mysql/replication" +) func TestMysql8SetReplicationSourceCommand(t *testing.T) { params := &ConnParams{ @@ -152,3 +88,29 @@ func TestMysql8SetReplicationSourceCommandSSL(t *testing.T) { got := conn.SetReplicationSourceCommand(params, host, port, 0, connectRetry) assert.Equal(t, want, got, "mysqlFlavor.SetReplicationSourceCommand(%#v, %#v, %#v, %#v) = %#v, want %#v", params, host, port, connectRetry, got, want) } + +func TestMysql8SetReplicationPositionCommands(t *testing.T) { + pos := replication.Position{GTIDSet: replication.Mysql56GTIDSet{}} + conn := &Conn{flavor: mysqlFlavor8{}} + queries := conn.SetReplicationPositionCommands(pos) + assert.Equal(t, []string{"RESET MASTER", "SET GLOBAL gtid_purged = ''"}, queries) +} + +func TestMysql82SetReplicationPositionCommands(t *testing.T) { + pos := replication.Position{GTIDSet: replication.Mysql56GTIDSet{}} + conn := &Conn{flavor: mysqlFlavor82{}} + queries := conn.SetReplicationPositionCommands(pos) + assert.Equal(t, []string{"RESET BINARY LOGS AND GTIDS", "SET GLOBAL gtid_purged = ''"}, queries) +} + +func TestMysql8ResetReplicationParametersCommands(t *testing.T) { + conn := &Conn{flavor: mysqlFlavor8{}} + queries := conn.ResetReplicationParametersCommands() + assert.Equal(t, []string{"RESET REPLICA ALL"}, queries) +} + +func TestMysql82ResetReplicationParametersCommands(t *testing.T) { + conn := &Conn{flavor: mysqlFlavor82{}} + queries := conn.ResetReplicationParametersCommands() + assert.Equal(t, []string{"RESET REPLICA ALL"}, queries) +} diff --git a/go/mysql/flavor_mysqlgr.go b/go/mysql/flavor_mysqlgr.go index 8096774474c..df3dc060742 100644 --- a/go/mysql/flavor_mysqlgr.go +++ b/go/mysql/flavor_mysqlgr.go @@ -238,7 +238,7 @@ func fetchStatusForGroupReplication(c *Conn, query string, onResult func([]sqlty return onResult(qr.Rows[0]) } -// primaryStatus returns the result of 'SHOW MASTER STATUS', +// primaryStatus returns the result of 'SHOW BINARY LOG STATUS', // with parsed executed position. func (mysqlGRFlavor) primaryStatus(c *Conn) (replication.PrimaryStatus, error) { return mysqlFlavor{}.primaryStatus(c) diff --git a/go/mysql/replication.go b/go/mysql/replication.go index 9d046002555..08baaa169c8 100644 --- a/go/mysql/replication.go +++ b/go/mysql/replication.go @@ -197,3 +197,9 @@ func (c *Conn) BinlogInformation() (string, bool, bool, string, error) { } return binlogFormat, logBin == 1, logReplicaUpdates == 1, binlogRowImage, nil } + +// ResetBinaryLogsCommand returns the command used to reset the +// binary logs on the server. +func (c *Conn) ResetBinaryLogsCommand() string { + return c.flavor.resetBinaryLogsCommand() +} diff --git a/go/mysql/replication/primary_status.go b/go/mysql/replication/primary_status.go index 679b152f9d4..511777a5a4a 100644 --- a/go/mysql/replication/primary_status.go +++ b/go/mysql/replication/primary_status.go @@ -24,7 +24,7 @@ import ( "vitess.io/vitess/go/vt/vterrors" ) -// PrimaryStatus holds replication information from SHOW MASTER STATUS. +// PrimaryStatus holds replication information from SHOW BINARY LOG STATUS. type PrimaryStatus struct { // Position represents the server's GTID based position. Position Position @@ -52,7 +52,7 @@ func ParseMysqlPrimaryStatus(resultMap map[string]string) (PrimaryStatus, error) return status, nil } -// ParsePrimaryStatus parses the common fields of SHOW MASTER STATUS. +// ParsePrimaryStatus parses the common fields of SHOW BINARY LOG STATUS. func ParsePrimaryStatus(fields map[string]string) PrimaryStatus { status := PrimaryStatus{} diff --git a/go/test/endtoend/backup/vtbackup/backup_only_test.go b/go/test/endtoend/backup/vtbackup/backup_only_test.go index a562df4e79f..7dada7a77d2 100644 --- a/go/test/endtoend/backup/vtbackup/backup_only_test.go +++ b/go/test/endtoend/backup/vtbackup/backup_only_test.go @@ -327,13 +327,17 @@ func tearDown(t *testing.T, initMysql bool) { } caughtUp := waitForReplicationToCatchup([]cluster.Vttablet{*replica1, *replica2}) require.True(t, caughtUp, "Timed out waiting for all replicas to catch up") - promoteCommands := []string{"STOP SLAVE", "RESET SLAVE ALL", "RESET MASTER"} + + promoteCommands := []string{"STOP REPLICA", "RESET REPLICA ALL"} disableSemiSyncCommandsSource := []string{"SET GLOBAL rpl_semi_sync_source_enabled = false", " SET GLOBAL rpl_semi_sync_replica_enabled = false"} disableSemiSyncCommandsMaster := []string{"SET GLOBAL rpl_semi_sync_master_enabled = false", " SET GLOBAL rpl_semi_sync_slave_enabled = false"} for _, tablet := range []cluster.Vttablet{*primary, *replica1, *replica2} { - err := tablet.VttabletProcess.QueryTabletMultiple(promoteCommands, keyspaceName, true) + resetCmd, err := tablet.VttabletProcess.ResetBinaryLogsCommand() + require.NoError(t, err) + cmds := append(promoteCommands, resetCmd) + err = tablet.VttabletProcess.QueryTabletMultiple(cmds, keyspaceName, true) require.NoError(t, err) semisyncType, err := tablet.VttabletProcess.SemiSyncExtensionLoaded() require.NoError(t, err) diff --git a/go/test/endtoend/cluster/vttablet_process.go b/go/test/endtoend/cluster/vttablet_process.go index c2f20ec505a..e9dd4804e57 100644 --- a/go/test/endtoend/cluster/vttablet_process.go +++ b/go/test/endtoend/cluster/vttablet_process.go @@ -467,6 +467,16 @@ func (vttablet *VttabletProcess) SemiSyncExtensionLoaded() (mysql.SemiSyncType, return conn.SemiSyncExtensionLoaded() } +// SemiSyncExtensionLoaded returns what type of semi-sync extension is loaded +func (vttablet *VttabletProcess) ResetBinaryLogsCommand() (string, error) { + conn, err := vttablet.TabletConn("", false) + if err != nil { + return "", err + } + defer conn.Close() + return conn.ResetBinaryLogsCommand(), nil +} + // QueryTabletMultiple lets you execute multiple queries -- without any // results -- against the tablet. func (vttablet *VttabletProcess) QueryTabletMultiple(queries []string, keyspace string, useDb bool) error { diff --git a/go/test/endtoend/reparent/emergencyreparent/ers_test.go b/go/test/endtoend/reparent/emergencyreparent/ers_test.go index 0eaac97a4f2..584bccfdfb7 100644 --- a/go/test/endtoend/reparent/emergencyreparent/ers_test.go +++ b/go/test/endtoend/reparent/emergencyreparent/ers_test.go @@ -525,9 +525,9 @@ func TestReplicationStopped(t *testing.T) { tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[1], tablets[2], tablets[3]}) - err := clusterInstance.VtctldClientProcess.ExecuteCommand("ExecuteFetchAsDBA", tablets[1].Alias, `STOP SLAVE SQL_THREAD;`) + err := clusterInstance.VtctldClientProcess.ExecuteCommand("ExecuteFetchAsDBA", tablets[1].Alias, `STOP REPLICA SQL_THREAD;`) require.NoError(t, err) - err = clusterInstance.VtctldClientProcess.ExecuteCommand("ExecuteFetchAsDBA", tablets[2].Alias, `STOP SLAVE;`) + err = clusterInstance.VtctldClientProcess.ExecuteCommand("ExecuteFetchAsDBA", tablets[2].Alias, `STOP REPLICA;`) require.NoError(t, err) // Run an additional command in the current primary which will only be acked by tablets[3] and be in its relay log. insertedVal := utils.ConfirmReplication(t, tablets[0], nil) @@ -536,7 +536,7 @@ func TestReplicationStopped(t *testing.T) { require.Error(t, err, "ERS should fail with 2 replicas having replication stopped") // Start replication back on tablet[1] - err = clusterInstance.VtctldClientProcess.ExecuteCommand("ExecuteFetchAsDBA", tablets[1].Alias, `START SLAVE;`) + err = clusterInstance.VtctldClientProcess.ExecuteCommand("ExecuteFetchAsDBA", tablets[1].Alias, `START REPLICA;`) require.NoError(t, err) // Failover to tablets[3] again. This time it should succeed out, err := utils.Ers(clusterInstance, tablets[3], "60s", "30s") diff --git a/go/test/endtoend/reparent/plannedreparent/reparent_test.go b/go/test/endtoend/reparent/plannedreparent/reparent_test.go index d9d6cb06b79..ae9bd6bbc9b 100644 --- a/go/test/endtoend/reparent/plannedreparent/reparent_test.go +++ b/go/test/endtoend/reparent/plannedreparent/reparent_test.go @@ -231,8 +231,10 @@ func reparentFromOutside(t *testing.T, clusterInstance *cluster.LocalProcessClus _, gtID := cluster.GetPrimaryPosition(t, *tablets[1], utils.Hostname) // tablets[0] will now be a replica of tablets[1 + resetCmd, err := tablets[0].VttabletProcess.ResetBinaryLogsCommand() + require.NoError(t, err) changeReplicationSourceCommands := []string{ - "RESET MASTER", + resetCmd, "RESET REPLICA", fmt.Sprintf("SET GLOBAL gtid_purged = '%s'", gtID), fmt.Sprintf("CHANGE REPLICATION SOURCE TO SOURCE_HOST='%s', SOURCE_PORT=%d, SOURCE_USER='vt_repl', SOURCE_AUTO_POSITION = 1", utils.Hostname, tablets[1].MySQLPort), @@ -243,9 +245,11 @@ func reparentFromOutside(t *testing.T, clusterInstance *cluster.LocalProcessClus baseTime := time.Now().UnixNano() / 1000000000 // tablets[2 will be a replica of tablets[1 + resetCmd, err = tablets[2].VttabletProcess.ResetBinaryLogsCommand() + require.NoError(t, err) changeReplicationSourceCommands = []string{ "STOP REPLICA", - "RESET MASTER", + resetCmd, fmt.Sprintf("SET GLOBAL gtid_purged = '%s'", gtID), fmt.Sprintf("CHANGE REPLICATION SOURCE TO SOURCE_HOST='%s', SOURCE_PORT=%d, SOURCE_USER='vt_repl', SOURCE_AUTO_POSITION = 1", utils.Hostname, tablets[1].MySQLPort), "START REPLICA", @@ -262,7 +266,7 @@ func reparentFromOutside(t *testing.T, clusterInstance *cluster.LocalProcessClus } // update topology with the new server - err := clusterInstance.VtctldClientProcess.ExecuteCommand("TabletExternallyReparented", + err = clusterInstance.VtctldClientProcess.ExecuteCommand("TabletExternallyReparented", tablets[1].Alias) require.NoError(t, err) diff --git a/go/test/endtoend/tabletgateway/buffer/reparent/failover_buffer_test.go b/go/test/endtoend/tabletgateway/buffer/reparent/failover_buffer_test.go index 7bb139438f8..c8aaf0ae7fc 100644 --- a/go/test/endtoend/tabletgateway/buffer/reparent/failover_buffer_test.go +++ b/go/test/endtoend/tabletgateway/buffer/reparent/failover_buffer_test.go @@ -72,9 +72,11 @@ func failoverExternalReparenting(t *testing.T, clusterInstance *cluster.LocalPro // Use 'localhost' as hostname because Travis CI worker hostnames // are too long for MySQL replication. + resetCmd, err := oldPrimary.VttabletProcess.ResetBinaryLogsCommand() + require.NoError(t, err) changeSourceCommands := []string{ "STOP REPLICA", - "RESET MASTER", + resetCmd, fmt.Sprintf("SET GLOBAL gtid_purged = '%s'", gtID), fmt.Sprintf("CHANGE REPLICATION SOURCE TO SOURCE_HOST='%s', SOURCE_PORT=%d, SOURCE_USER='vt_repl', SOURCE_AUTO_POSITION = 1", "localhost", newPrimary.MySQLPort), "START REPLICA", diff --git a/go/test/endtoend/vtorc/utils/utils.go b/go/test/endtoend/vtorc/utils/utils.go index 7c645f96ee1..09da820d17e 100644 --- a/go/test/endtoend/vtorc/utils/utils.go +++ b/go/test/endtoend/vtorc/utils/utils.go @@ -330,7 +330,9 @@ func cleanAndStartVttablet(t *testing.T, clusterInfo *VTOrcClusterInfo, vttablet _, err = RunSQL(t, "STOP REPLICA", vttablet, "") require.NoError(t, err) // reset the binlog - _, err = RunSQL(t, "RESET MASTER", vttablet, "") + resetCmd, err := vttablet.VttabletProcess.ResetBinaryLogsCommand() + require.NoError(t, err) + _, err = RunSQL(t, resetCmd, vttablet, "") require.NoError(t, err) // set read-only to true _, err = RunSQL(t, "SET GLOBAL read_only = ON", vttablet, "") diff --git a/go/vt/binlog/binlog_connection.go b/go/vt/binlog/binlog_connection.go index f7c7acd8e9c..0fd13fd984f 100644 --- a/go/vt/binlog/binlog_connection.go +++ b/go/vt/binlog/binlog_connection.go @@ -91,8 +91,8 @@ func connectForReplication(cp dbconfigs.Connector) (*mysql.Conn, error) { } // Tell the server that we understand the format of events // that will be used if binlog_checksum is enabled on the server. - if _, err := conn.ExecuteFetch("SET @master_binlog_checksum=@@global.binlog_checksum", 0, false); err != nil { - return nil, fmt.Errorf("failed to set @master_binlog_checksum=@@global.binlog_checksum: %v", err) + if _, err := conn.ExecuteFetch("SET @source_binlog_checksum = @@global.binlog_checksum, @master_binlog_checksum=@@global.binlog_checksum", 0, false); err != nil { + return nil, fmt.Errorf("failed to set @source_binlog_checksum=@@global.binlog_checksum: %v", err) } return conn, nil diff --git a/go/vt/mysqlctl/backup.go b/go/vt/mysqlctl/backup.go index 0da2d18e06d..7052dcbdf87 100644 --- a/go/vt/mysqlctl/backup.go +++ b/go/vt/mysqlctl/backup.go @@ -351,13 +351,9 @@ func ensureRestoredGTIDPurgedMatchesManifest(ctx context.Context, manifest *Back } params.Logger.Infof("Restore: @@gtid_purged does not equal manifest's GTID position. Setting @@gtid_purged to %v", gtid) // This is not good. We want to apply a new @@gtid_purged value. - query := "RESET MASTER" // required dialect in 5.7 - if _, err := params.Mysqld.FetchSuperQuery(ctx, query); err != nil { - return vterrors.Wrapf(err, "error issuing %v", query) - } - query = fmt.Sprintf("SET GLOBAL gtid_purged='%s'", gtid) - if _, err := params.Mysqld.FetchSuperQuery(ctx, query); err != nil { - return vterrors.Wrapf(err, "failed to apply `%s` after restore", query) + err = params.Mysqld.SetReplicationPosition(ctx, manifest.Position) + if err != nil { + return vterrors.Wrap(err, "error setting replication position") } return nil } diff --git a/go/vt/mysqlctl/fakemysqldaemon.go b/go/vt/mysqlctl/fakemysqldaemon.go index f1682bf9427..f6447eda549 100644 --- a/go/vt/mysqlctl/fakemysqldaemon.go +++ b/go/vt/mysqlctl/fakemysqldaemon.go @@ -470,7 +470,8 @@ func (fmd *FakeMysqlDaemon) SetReplicationPosition(ctx context.Context, pos repl return fmt.Errorf("wrong pos for SetReplicationPosition: expected %v got %v", fmd.SetReplicationPositionPos, pos) } return fmd.ExecuteSuperQueryList(ctx, []string{ - "FAKE SET REPLICA POSITION", + "FAKE RESET BINARY LOGS AND GTIDS", + "FAKE SET GLOBAL gtid_purged", }) } diff --git a/go/vt/proto/replicationdata/replicationdata.pb.go b/go/vt/proto/replicationdata/replicationdata.pb.go index 5c40ab20a8b..5d8256cfa2e 100644 --- a/go/vt/proto/replicationdata/replicationdata.pb.go +++ b/go/vt/proto/replicationdata/replicationdata.pb.go @@ -417,7 +417,7 @@ func (x *StopReplicationStatus) GetAfter() *Status { return nil } -// PrimaryStatus is the replication status for a MySQL primary (returned by 'show master status'). +// PrimaryStatus is the replication status for a MySQL primary (returned by 'show binary log status'). type PrimaryStatus struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache diff --git a/go/vt/proto/tabletmanagerdata/tabletmanagerdata.pb.go b/go/vt/proto/tabletmanagerdata/tabletmanagerdata.pb.go index 8a9a4d6569f..6ec17060dd3 100644 --- a/go/vt/proto/tabletmanagerdata/tabletmanagerdata.pb.go +++ b/go/vt/proto/tabletmanagerdata/tabletmanagerdata.pb.go @@ -3935,7 +3935,7 @@ type DemotePrimaryResponse struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - // PrimaryStatus represents the response from calling `SHOW MASTER STATUS` on a primary that has been demoted. + // PrimaryStatus represents the response from calling `SHOW BINARY LOG STATUS` on a primary that has been demoted. PrimaryStatus *replicationdata.PrimaryStatus `protobuf:"bytes,2,opt,name=primary_status,json=primaryStatus,proto3" json:"primary_status,omitempty"` } diff --git a/go/vt/vtctl/grpcvtctldserver/endtoend/init_shard_primary_test.go b/go/vt/vtctl/grpcvtctldserver/endtoend/init_shard_primary_test.go index 91f2b1303a8..1f17782402f 100644 --- a/go/vt/vtctl/grpcvtctldserver/endtoend/init_shard_primary_test.go +++ b/go/vt/vtctl/grpcvtctldserver/endtoend/init_shard_primary_test.go @@ -70,7 +70,8 @@ func TestInitShardPrimary(t *testing.T) { "START REPLICA", // These come from InitShardPrimary "FAKE RESET ALL REPLICATION", - "FAKE SET REPLICA POSITION", + "FAKE RESET BINARY LOGS AND GTIDS", + "FAKE SET GLOBAL gtid_purged", "FAKE SET SOURCE", "START REPLICA", } @@ -81,7 +82,8 @@ func TestInitShardPrimary(t *testing.T) { "FAKE SET SOURCE", "START REPLICA", "FAKE RESET ALL REPLICATION", - "FAKE SET REPLICA POSITION", + "FAKE RESET BINARY LOGS AND GTIDS", + "FAKE SET GLOBAL gtid_purged", "FAKE SET SOURCE", "START REPLICA", } @@ -128,7 +130,8 @@ func TestInitShardPrimaryNoFormerPrimary(t *testing.T) { tablet2.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ "FAKE RESET ALL REPLICATION", - "FAKE SET REPLICA POSITION", + "FAKE RESET BINARY LOGS AND GTIDS", + "FAKE SET GLOBAL gtid_purged", "FAKE SET SOURCE", "START REPLICA", } @@ -136,7 +139,8 @@ func TestInitShardPrimaryNoFormerPrimary(t *testing.T) { tablet3.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ "FAKE RESET ALL REPLICATION", - "FAKE SET REPLICA POSITION", + "FAKE RESET BINARY LOGS AND GTIDS", + "FAKE SET GLOBAL gtid_purged", "FAKE SET SOURCE", "START REPLICA", } diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go index b2f2a01affe..3e745222092 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication.go @@ -76,7 +76,7 @@ func (tm *TabletManager) FullStatus(ctx context.Context) (*replicationdatapb.Ful replicationStatusProto = replication.ReplicationStatusToProto(replicationStatus) } - // Primary status - "SHOW MASTER STATUS" + // Primary status - "SHOW BINARY LOG STATUS" primaryStatus, err := tm.MysqlDaemon.PrimaryStatus(ctx) var primaryStatusProto *replicationdatapb.PrimaryStatus if err != nil && err != mysql.ErrNoPrimaryStatus { diff --git a/go/vt/wrangler/testlib/backup_test.go b/go/vt/wrangler/testlib/backup_test.go index e0a94033360..5e73d266705 100644 --- a/go/vt/wrangler/testlib/backup_test.go +++ b/go/vt/wrangler/testlib/backup_test.go @@ -196,9 +196,7 @@ func testBackupRestore(t *testing.T, cDetails *compressionDetails) error { "START REPLICA", } sourceTablet.FakeMysqlDaemon.FetchSuperQueryMap = map[string]*sqltypes.Result{ - "SHOW DATABASES": {}, - "RESET MASTER": {}, - "SET GLOBAL gtid_purged": {}, + "SHOW DATABASES": {}, } sourceTablet.StartActionLoop(t, wr) defer sourceTablet.StopActionLoop(t) @@ -237,17 +235,18 @@ func testBackupRestore(t *testing.T, cDetails *compressionDetails) error { "STOP REPLICA", "FAKE SET SOURCE", "START REPLICA", + "FAKE RESET BINARY LOGS AND GTIDS", + "FAKE SET GLOBAL gtid_purged", "STOP REPLICA", "FAKE RESET REPLICA ALL", - "FAKE SET REPLICA POSITION", + "FAKE RESET BINARY LOGS AND GTIDS", + "FAKE SET GLOBAL gtid_purged", "STOP REPLICA", "FAKE SET SOURCE", "START REPLICA", } destTablet.FakeMysqlDaemon.FetchSuperQueryMap = map[string]*sqltypes.Result{ - "SHOW DATABASES": {}, - "RESET MASTER": {}, - "SET GLOBAL gtid_purged": {}, + "SHOW DATABASES": {}, } destTablet.FakeMysqlDaemon.SetReplicationPositionPos = sourceTablet.FakeMysqlDaemon.CurrentPrimaryPosition destTablet.FakeMysqlDaemon.SetReplicationSourceInputs = append(destTablet.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(primary.Tablet)) @@ -289,14 +288,15 @@ func testBackupRestore(t *testing.T, cDetails *compressionDetails) error { } primary.FakeMysqlDaemon.FetchSuperQueryMap = map[string]*sqltypes.Result{ - "SHOW DATABASES": {}, - "RESET MASTER": {}, - "SET GLOBAL gtid_purged": {}, + "SHOW DATABASES": {}, } primary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ + "FAKE RESET BINARY LOGS AND GTIDS", + "FAKE SET GLOBAL gtid_purged", "STOP REPLICA", "FAKE RESET REPLICA ALL", - "FAKE SET REPLICA POSITION", + "FAKE RESET BINARY LOGS AND GTIDS", + "FAKE SET GLOBAL gtid_purged", "FAKE SET SOURCE", "START REPLICA", } @@ -491,17 +491,18 @@ func TestBackupRestoreLagged(t *testing.T) { "STOP REPLICA", "FAKE SET SOURCE", "START REPLICA", + "FAKE RESET BINARY LOGS AND GTIDS", + "FAKE SET GLOBAL gtid_purged", "STOP REPLICA", "FAKE RESET REPLICA ALL", - "FAKE SET REPLICA POSITION", + "FAKE RESET BINARY LOGS AND GTIDS", + "FAKE SET GLOBAL gtid_purged", "STOP REPLICA", "FAKE SET SOURCE", "START REPLICA", } destTablet.FakeMysqlDaemon.FetchSuperQueryMap = map[string]*sqltypes.Result{ - "SHOW DATABASES": {}, - "RESET MASTER": {}, - "SET GLOBAL gtid_purged": {}, + "SHOW DATABASES": {}, } destTablet.FakeMysqlDaemon.SetReplicationPositionPos = destTablet.FakeMysqlDaemon.CurrentPrimaryPosition destTablet.FakeMysqlDaemon.SetReplicationSourceInputs = append(destTablet.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(primary.Tablet)) @@ -681,17 +682,18 @@ func TestRestoreUnreachablePrimary(t *testing.T) { "STOP REPLICA", "FAKE SET SOURCE", "START REPLICA", + "FAKE RESET BINARY LOGS AND GTIDS", + "FAKE SET GLOBAL gtid_purged", "STOP REPLICA", "FAKE RESET REPLICA ALL", - "FAKE SET REPLICA POSITION", + "FAKE RESET BINARY LOGS AND GTIDS", + "FAKE SET GLOBAL gtid_purged", "STOP REPLICA", "FAKE SET SOURCE", "START REPLICA", } destTablet.FakeMysqlDaemon.FetchSuperQueryMap = map[string]*sqltypes.Result{ - "SHOW DATABASES": {}, - "RESET MASTER": {}, - "SET GLOBAL gtid_purged": {}, + "SHOW DATABASES": {}, } destTablet.FakeMysqlDaemon.SetReplicationPositionPos = sourceTablet.FakeMysqlDaemon.CurrentPrimaryPosition destTablet.FakeMysqlDaemon.SetReplicationSourceInputs = append(destTablet.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(primary.Tablet)) @@ -844,14 +846,15 @@ func TestDisableActiveReparents(t *testing.T) { }, } destTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ + "FAKE RESET BINARY LOGS AND GTIDS", + "FAKE SET GLOBAL gtid_purged", "STOP REPLICA", "FAKE RESET REPLICA ALL", - "FAKE SET REPLICA POSITION", + "FAKE RESET BINARY LOGS AND GTIDS", + "FAKE SET GLOBAL gtid_purged", } destTablet.FakeMysqlDaemon.FetchSuperQueryMap = map[string]*sqltypes.Result{ - "SHOW DATABASES": {}, - "RESET MASTER": {}, - "SET GLOBAL gtid_purged": {}, + "SHOW DATABASES": {}, } destTablet.FakeMysqlDaemon.SetReplicationPositionPos = sourceTablet.FakeMysqlDaemon.CurrentPrimaryPosition destTablet.FakeMysqlDaemon.SetReplicationSourceInputs = append(destTablet.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(primary.Tablet)) diff --git a/proto/replicationdata.proto b/proto/replicationdata.proto index 8c71669e80d..1a8b608f984 100644 --- a/proto/replicationdata.proto +++ b/proto/replicationdata.proto @@ -73,7 +73,7 @@ enum StopReplicationMode { IOTHREADONLY = 1; } -// PrimaryStatus is the replication status for a MySQL primary (returned by 'show master status'). +// PrimaryStatus is the replication status for a MySQL primary (returned by 'show binary log status'). message PrimaryStatus { string position = 1; string file_position = 2; diff --git a/proto/tabletmanagerdata.proto b/proto/tabletmanagerdata.proto index 2b782d847cc..f853e2e4ea8 100644 --- a/proto/tabletmanagerdata.proto +++ b/proto/tabletmanagerdata.proto @@ -424,7 +424,7 @@ message DemotePrimaryResponse { //string deprecated_position = 1 [deprecated = true]; reserved 1; - // PrimaryStatus represents the response from calling `SHOW MASTER STATUS` on a primary that has been demoted. + // PrimaryStatus represents the response from calling `SHOW BINARY LOG STATUS` on a primary that has been demoted. replicationdata.PrimaryStatus primary_status = 2; } From 6d7b1dca240294f91cec58cb2b262acd97bcb950 Mon Sep 17 00:00:00 2001 From: Dirkjan Bussink Date: Tue, 14 May 2024 13:47:01 +0200 Subject: [PATCH 2/2] Address review comments Signed-off-by: Dirkjan Bussink --- go/mysql/flavor_mysql.go | 11 ++++++++++- go/mysql/flavor_mysql_legacy.go | 14 ++++++++++++-- go/test/endtoend/cluster/vttablet_process.go | 2 +- 3 files changed, 23 insertions(+), 4 deletions(-) diff --git a/go/mysql/flavor_mysql.go b/go/mysql/flavor_mysql.go index 154de880f62..a1245257c74 100644 --- a/go/mysql/flavor_mysql.go +++ b/go/mysql/flavor_mysql.go @@ -33,15 +33,24 @@ import ( vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) -// mysqlFlavor implements the Flavor interface for Mysql. +// mysqlFlavor implements the Flavor interface for Mysql. This is +// the most up to date / recent flavor and uses the most modern +// replication commands and semantics. type mysqlFlavor struct { serverVersion string } +// mysqlFlavor8 is for later MySQL 8.0 versions. It's the same as +// the modern flavor, but overrides some specific commands that +// are only available on MySQL 8.2.0 and later. This is specifically +// commands like SHOW BINARY LOG STATUS. type mysqlFlavor8 struct { mysqlFlavor } +// mysqlFlavor82 is for MySQL 8.2.0 and later. It's the most modern +// flavor but has an explicit name so that it's clear it's explicitly +// for MySQL 8.2.0 and later. type mysqlFlavor82 struct { mysqlFlavor } diff --git a/go/mysql/flavor_mysql_legacy.go b/go/mysql/flavor_mysql_legacy.go index 97d8059e7b6..a5639cc944e 100644 --- a/go/mysql/flavor_mysql_legacy.go +++ b/go/mysql/flavor_mysql_legacy.go @@ -1,5 +1,5 @@ /* -Copyright 2019 The Vitess Authors. +Copyright 2024 The Vitess Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -25,13 +25,23 @@ import ( "vitess.io/vitess/go/vt/vterrors" ) -// mysqlFlavor implements the Flavor interface for Mysql. +// mysqlFlavorLegacy implements the Flavor interface for Mysql for +// older versions. This applies to MySQL 5.7 and early 8.0 versions from +// before the replication terminology deprecation. type mysqlFlavorLegacy struct { mysqlFlavor } + +// mysqlFlavor57 is the explicit flavor for MySQL 5.7. It's basically +// the same as the legacy flavor, but it has a separate name here to +// be explicit about the version. type mysqlFlavor57 struct { mysqlFlavorLegacy } + +// mysqlFlavor8 is the explicit flavor for MySQL 8.0. It's similarly to +// 5.7 the same as the legacy flavor, but has an explicit name to be +// clear it's used for early MySQL 8.0 versions. type mysqlFlavor8Legacy struct { mysqlFlavorLegacy } diff --git a/go/test/endtoend/cluster/vttablet_process.go b/go/test/endtoend/cluster/vttablet_process.go index e9dd4804e57..45db1dc4bd2 100644 --- a/go/test/endtoend/cluster/vttablet_process.go +++ b/go/test/endtoend/cluster/vttablet_process.go @@ -467,7 +467,7 @@ func (vttablet *VttabletProcess) SemiSyncExtensionLoaded() (mysql.SemiSyncType, return conn.SemiSyncExtensionLoaded() } -// SemiSyncExtensionLoaded returns what type of semi-sync extension is loaded +// ResetBinaryLogsCommand returns the commands to reset binary logs func (vttablet *VttabletProcess) ResetBinaryLogsCommand() (string, error) { conn, err := vttablet.TabletConn("", false) if err != nil {