diff --git a/examples/compose/fix_replication.sh b/examples/compose/fix_replication.sh index a528b06be6f..6aa9648308a 100755 --- a/examples/compose/fix_replication.sh +++ b/examples/compose/fix_replication.sh @@ -26,15 +26,15 @@ cd "$(dirname "${BASH_SOURCE[0]}")" function get_replication_status() { # Get replication status - STATUS_LINE=$(mysql -u$DB_USER -p$DB_PASS -h 127.0.0.1 -e "SHOW SLAVE STATUS\G") + STATUS_LINE=$(mysql -u$DB_USER -p$DB_PASS -h 127.0.0.1 -e "SHOW REPLICA STATUS\G") LAST_ERRNO=$(grep "Last_IO_Errno:" <<< "$STATUS_LINE" | awk '{ print $2 }') - SLAVE_SQL_RUNNING=$(grep "Slave_SQL_Running:" <<< "$STATUS_LINE" | awk '{ print $2 }') - SLAVE_IO_RUNNING=$(grep "Slave_IO_Running:" <<< "$STATUS_LINE" | awk '{ print $2 }') - MASTER_HOST=$(grep "Master_Host:" <<< "$STATUS_LINE" | awk '{ print $2 }') - MASTER_PORT=$(grep "Master_Port:" <<< "$STATUS_LINE" | awk '{ print $2 }') + REPLICA_SQL_RUNNING=$(grep "Replica_SQL_Running:" <<< "$STATUS_LINE" | awk '{ print $2 }') + REPLICA_IO_RUNNING=$(grep "Replica_IO_Running:" <<< "$STATUS_LINE" | awk '{ print $2 }') + SOURCE_HOST=$(grep "Source_Host:" <<< "$STATUS_LINE" | awk '{ print $2 }') + SOURCE_PORT=$(grep "Source_Port:" <<< "$STATUS_LINE" | awk '{ print $2 }') - echo "Slave_SQL_Running: $SLAVE_SQL_RUNNING" - echo "Slave_IO_Running: $SLAVE_IO_RUNNING" + echo "Replica_SQL_Running: $REPLICA_SQL_RUNNING" + echo "Replica_IO_Running: $REPLICA_IO_RUNNING" echo "Last_IO_Errno: $LAST_ERRNO" } @@ -54,7 +54,7 @@ get_replication_status [ ${1:-''} != 'status' ] || exit 0; # Check if IO_Thread is running -if [[ $SLAVE_IO_RUNNING = "No" && $LAST_ERRNO = 1236 ]]; then +if [[ $REPLICA_IO_RUNNING = "No" && $LAST_ERRNO = 1236 ]]; then echo "Primary has purged bin logs that replica requires. Sync will require restore from mysqldump" if [[ -f $KEYSPACE.sql ]] ; then @@ -64,7 +64,7 @@ if [[ $SLAVE_IO_RUNNING = "No" && $LAST_ERRNO = 1236 ]]; then else echo "Starting mysqldump. This may take a while.." # Modify flags to user's requirements - if mysqldump -h $MASTER_HOST -P $MASTER_PORT -u$DB_USER -p$DB_PASS --databases $KEYSPACE \ + if mysqldump -h $SOURCE_HOST -P $SOURCE_PORT -u$DB_USER -p$DB_PASS --databases $KEYSPACE \ --triggers --routines --events --hex-blob --master-data=1 --quick --order-by-primary \ --no-autocommit --skip-comments --skip-add-drop-table --skip-add-locks \ --skip-disable-keys --single-transaction --set-gtid-purged=on --verbose > $KEYSPACE.sql; then diff --git a/go/mysql/constants.go b/go/mysql/constants.go index 816ae64778e..defcf37b871 100644 --- a/go/mysql/constants.go +++ b/go/mysql/constants.go @@ -289,6 +289,4 @@ func IsNum(typ uint8) bool { const ( readReplicationConnectionConfiguration = "SELECT * FROM performance_schema.replication_connection_configuration" - readReplicaNetTimeout = "select @@global.replica_net_timeout" - readSlaveNetTimeout = "select @@global.slave_net_timeout" ) diff --git a/go/mysql/endtoend/replication_test.go b/go/mysql/endtoend/replication_test.go index 441209b35f7..d3b9a6722ea 100644 --- a/go/mysql/endtoend/replication_test.go +++ b/go/mysql/endtoend/replication_test.go @@ -29,6 +29,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/mysql" @@ -46,15 +47,6 @@ func connectForReplication(t *testing.T, rbr bool) (*mysql.Conn, mysql.BinlogFor t.Fatal(err) } - // We need to know if this is MariaDB, to set the right flag. - if conn.IsMariaDB() { - // This flag is required to get GTIDs from MariaDB. - t.Log("MariaDB: sensing SET @mariadb_slave_capability=4") - if _, err := conn.ExecuteFetch("SET @mariadb_slave_capability=4", 0, false); err != nil { - t.Fatalf("failed to set @mariadb_slave_capability=4: %v", err) - } - } - // Switch server to RBR if needed. if rbr { if _, err := conn.ExecuteFetch("SET GLOBAL binlog_format='ROW'", 0, false); err != nil { @@ -63,25 +55,21 @@ func connectForReplication(t *testing.T, rbr bool) (*mysql.Conn, mysql.BinlogFor } // First we get the current binlog position. - result, err := conn.ExecuteFetch("SHOW MASTER STATUS", 1, true) - require.NoError(t, err, "SHOW MASTER STATUS failed: %v", err) + status, err := conn.ShowPrimaryStatus() + require.NoError(t, err, "retrieving primary status failed: %v", err) - if len(result.Fields) < 2 || result.Fields[0].Name != "File" || result.Fields[1].Name != "Position" || - len(result.Rows) != 1 { - t.Fatalf("SHOW MASTER STATUS returned unexpected result: %v", result) - } - file := result.Rows[0][0].ToString() - position, err := result.Rows[0][1].ToCastUint64() - require.NoError(t, err, "SHOW MASTER STATUS returned invalid position: %v", result.Rows[0][1]) + filePos := status.FilePosition.GTIDSet.(replication.FilePosGTID) + file := filePos.File + position := filePos.Pos // Tell the server that we understand the format of events // that will be used if binlog_checksum is enabled on the server. - if _, err := conn.ExecuteFetch("SET @master_binlog_checksum=@@global.binlog_checksum", 0, false); err != nil { - t.Fatalf("failed to set @master_binlog_checksum=@@global.binlog_checksum: %v", err) + if _, err := conn.ExecuteFetch("SET @source_binlog_checksum = @@global.binlog_checksum, @master_binlog_checksum=@@global.binlog_checksum", 0, false); err != nil { + t.Fatalf("failed to set @source_binlog_checksum=@@global.binlog_checksum: %v", err) } // Write ComBinlogDump packet with to start streaming events from here. - if err := conn.WriteComBinlogDump(1, file, uint32(position), 0); err != nil { + if err := conn.WriteComBinlogDump(1, file, position, 0); err != nil { t.Fatalf("WriteComBinlogDump failed: %v", err) } diff --git a/go/mysql/flavor.go b/go/mysql/flavor.go index 54b62ad2261..f732b1ccb88 100644 --- a/go/mysql/flavor.go +++ b/go/mysql/flavor.go @@ -123,11 +123,14 @@ type flavor interface { // as the new replication source (without changing any GTID position). setReplicationSourceCommand(params *ConnParams, host string, port int32, heartbeatInterval float64, connectRetry int) string + // resetBinaryLogsCommand returns the command to reset the binary logs. + resetBinaryLogsCommand() string + // status returns the result of the appropriate status command, // with parsed replication position. status(c *Conn) (replication.ReplicationStatus, error) - // primaryStatus returns the result of 'SHOW MASTER STATUS', + // primaryStatus returns the result of 'SHOW BINARY LOG STATUS', // with parsed executed position. primaryStatus(c *Conn) (replication.PrimaryStatus, error) @@ -185,15 +188,16 @@ func GetFlavor(serverVersion string, flavorFunc func() flavor) (f flavor, capabl f = mariadbFlavor102{mariadbFlavor{serverVersion: fmt.Sprintf("%f", mariadbVersion)}} } case strings.HasPrefix(serverVersion, mysql8VersionPrefix): - recent, _ := capabilities.MySQLVersionHasCapability(serverVersion, capabilities.ReplicaTerminologyCapability) - if recent { + if latest, _ := capabilities.ServerVersionAtLeast(serverVersion, 8, 2, 0); latest { + f = mysqlFlavor82{mysqlFlavor{serverVersion: serverVersion}} + } else if recent, _ := capabilities.MySQLVersionHasCapability(serverVersion, capabilities.ReplicaTerminologyCapability); recent { f = mysqlFlavor8{mysqlFlavor{serverVersion: serverVersion}} } else { - f = mysqlFlavor8Legacy{mysqlFlavor{serverVersion: serverVersion}} + f = mysqlFlavor8Legacy{mysqlFlavorLegacy{mysqlFlavor{serverVersion: serverVersion}}} } default: // If unknown, return the most basic flavor: MySQL 57. - f = mysqlFlavor57{mysqlFlavor{serverVersion: serverVersion}} + f = mysqlFlavor57{mysqlFlavorLegacy{mysqlFlavor{serverVersion: serverVersion}}} } return f, f.supportsCapability, canonicalVersion } @@ -400,7 +404,7 @@ func (c *Conn) ShowReplicationStatus() (replication.ReplicationStatus, error) { return c.flavor.status(c) } -// ShowPrimaryStatus executes the right SHOW MASTER STATUS command, +// ShowPrimaryStatus executes the right SHOW BINARY LOG STATUS command, // and returns a parsed executed Position, as well as file based Position. func (c *Conn) ShowPrimaryStatus() (replication.PrimaryStatus, error) { return c.flavor.primaryStatus(c) diff --git a/go/mysql/flavor_filepos.go b/go/mysql/flavor_filepos.go index e04a88759ec..5e766e81912 100644 --- a/go/mysql/flavor_filepos.go +++ b/go/mysql/flavor_filepos.go @@ -233,6 +233,11 @@ func (flv *filePosFlavor) setReplicationSourceCommand(params *ConnParams, host s return "unsupported" } +// resetBinaryLogsCommand is part of the Flavor interface. +func (flv *filePosFlavor) resetBinaryLogsCommand() string { + return "unsupported" +} + // status is part of the Flavor interface. func (flv *filePosFlavor) status(c *Conn) (replication.ReplicationStatus, error) { qr, err := c.ExecuteFetch("SHOW SLAVE STATUS", 100, true /* wantfields */) diff --git a/go/mysql/flavor_mariadb.go b/go/mysql/flavor_mariadb.go index 42792b4a01d..301ec2b0596 100644 --- a/go/mysql/flavor_mariadb.go +++ b/go/mysql/flavor_mariadb.go @@ -219,6 +219,10 @@ func (mariadbFlavor) setReplicationSourceCommand(params *ConnParams, host string return "CHANGE MASTER TO\n " + strings.Join(args, ",\n ") } +func (mariadbFlavor) resetBinaryLogsCommand() string { + return "RESET MASTER" +} + // status is part of the Flavor interface. func (mariadbFlavor) status(c *Conn) (replication.ReplicationStatus, error) { qr, err := c.ExecuteFetch("SHOW ALL SLAVES STATUS", 100, true /* wantfields */) @@ -288,7 +292,7 @@ func (mariadbFlavor) replicationConfiguration(c *Conn) (*replicationdata.Configu // replicationNetTimeout is part of the Flavor interface. func (mariadbFlavor) replicationNetTimeout(c *Conn) (int32, error) { - qr, err := c.ExecuteFetch(readSlaveNetTimeout, 1, false) + qr, err := c.ExecuteFetch("select @@global.slave_net_timeout", 1, false) if err != nil { return 0, err } diff --git a/go/mysql/flavor_mysql.go b/go/mysql/flavor_mysql.go index 8dba1c8f4dd..154de880f62 100644 --- a/go/mysql/flavor_mysql.go +++ b/go/mysql/flavor_mysql.go @@ -37,20 +37,17 @@ import ( type mysqlFlavor struct { serverVersion string } -type mysqlFlavor57 struct { - mysqlFlavor -} -type mysqlFlavor8Legacy struct { + +type mysqlFlavor8 struct { mysqlFlavor } -type mysqlFlavor8 struct { +type mysqlFlavor82 struct { mysqlFlavor } -var _ flavor = (*mysqlFlavor57)(nil) -var _ flavor = (*mysqlFlavor8Legacy)(nil) var _ flavor = (*mysqlFlavor8)(nil) +var _ flavor = (*mysqlFlavor82)(nil) // primaryGTIDSet is part of the Flavor interface. func (mysqlFlavor) primaryGTIDSet(c *Conn) (replication.GTIDSet, error) { @@ -103,51 +100,11 @@ func (mysqlFlavor) gtidMode(c *Conn) (string, error) { return qr.Rows[0][0].ToString(), nil } -func (mysqlFlavor) startReplicationCommand() string { - return "START SLAVE" -} - -func (mysqlFlavor) restartReplicationCommands() []string { - return []string{ - "STOP SLAVE", - "RESET SLAVE", - "START SLAVE", - } -} - -func (mysqlFlavor) startReplicationUntilAfter(pos replication.Position) string { - return fmt.Sprintf("START SLAVE UNTIL SQL_AFTER_GTIDS = '%s'", pos) -} - -func (mysqlFlavor) startSQLThreadUntilAfter(pos replication.Position) string { - return fmt.Sprintf("START SLAVE SQL_THREAD UNTIL SQL_AFTER_GTIDS = '%s'", pos) -} - -func (mysqlFlavor) stopReplicationCommand() string { - return "STOP SLAVE" -} - -func (mysqlFlavor) resetReplicationCommand() string { - return "RESET SLAVE ALL" -} - -func (mysqlFlavor) stopIOThreadCommand() string { - return "STOP SLAVE IO_THREAD" -} - -func (mysqlFlavor) stopSQLThreadCommand() string { - return "STOP SLAVE SQL_THREAD" -} - -func (mysqlFlavor) startSQLThreadCommand() string { - return "START SLAVE SQL_THREAD" -} - -func (f mysqlFlavor8) startReplicationCommand() string { +func (f mysqlFlavor) startReplicationCommand() string { return "START REPLICA" } -func (f mysqlFlavor8) restartReplicationCommands() []string { +func (f mysqlFlavor) restartReplicationCommands() []string { return []string{ "STOP REPLICA", "RESET REPLICA", @@ -155,40 +112,40 @@ func (f mysqlFlavor8) restartReplicationCommands() []string { } } -func (f mysqlFlavor8) startReplicationUntilAfter(pos replication.Position) string { +func (f mysqlFlavor) startReplicationUntilAfter(pos replication.Position) string { return fmt.Sprintf("START REPLICA UNTIL SQL_AFTER_GTIDS = '%s'", pos) } -func (f mysqlFlavor8) startSQLThreadUntilAfter(pos replication.Position) string { +func (f mysqlFlavor) startSQLThreadUntilAfter(pos replication.Position) string { return fmt.Sprintf("START REPLICA SQL_THREAD UNTIL SQL_AFTER_GTIDS = '%s'", pos) } -func (f mysqlFlavor8) stopReplicationCommand() string { +func (f mysqlFlavor) stopReplicationCommand() string { return "STOP REPLICA" } -func (f mysqlFlavor8) resetReplicationCommand() string { +func (f mysqlFlavor) resetReplicationCommand() string { return "RESET REPLICA ALL" } -func (f mysqlFlavor8) stopIOThreadCommand() string { +func (f mysqlFlavor) stopIOThreadCommand() string { return "STOP REPLICA IO_THREAD" } -func (f mysqlFlavor8) stopSQLThreadCommand() string { +func (f mysqlFlavor) stopSQLThreadCommand() string { return "STOP REPLICA SQL_THREAD" } -func (f mysqlFlavor8) startSQLThreadCommand() string { +func (f mysqlFlavor) startSQLThreadCommand() string { return "START REPLICA SQL_THREAD" } // resetReplicationCommands is part of the Flavor interface. -func (mysqlFlavor8) resetReplicationCommands(c *Conn) []string { +func (mysqlFlavor) resetReplicationCommands(c *Conn) []string { resetCommands := []string{ "STOP REPLICA", - "RESET REPLICA ALL", // "ALL" makes it forget source host:port. - "RESET MASTER", // This will also clear gtid_executed and gtid_purged. + "RESET REPLICA ALL", // "ALL" makes it forget source host:port. + "RESET BINARY LOGS AND GTIDS", // This will also clear gtid_executed and gtid_purged. } status, err := c.SemiSyncExtensionLoaded() if err != nil { @@ -205,32 +162,16 @@ func (mysqlFlavor8) resetReplicationCommands(c *Conn) []string { return resetCommands } -// resetReplicationParametersCommands is part of the Flavor interface. -func (mysqlFlavor8) resetReplicationParametersCommands(c *Conn) []string { - resetCommands := []string{ - "RESET REPLICA ALL", // "ALL" makes it forget source host:port. - } - return resetCommands -} - -// sendBinlogDumpCommand is part of the Flavor interface. -func (mysqlFlavor) sendBinlogDumpCommand(c *Conn, serverID uint32, binlogFilename string, startPos replication.Position) error { - gtidSet, ok := startPos.GTIDSet.(replication.Mysql56GTIDSet) - if !ok { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "startPos.GTIDSet is wrong type - expected Mysql56GTIDSet, got: %#v", startPos.GTIDSet) - } - - // Build the command. - sidBlock := gtidSet.SIDBlock() - return c.WriteComBinlogDumpGTID(serverID, binlogFilename, 4, 0, sidBlock) +func (mysqlFlavor) resetBinaryLogsCommand() string { + return "RESET BINARY LOGS AND GTIDS" } // resetReplicationCommands is part of the Flavor interface. -func (mysqlFlavor) resetReplicationCommands(c *Conn) []string { +func (mysqlFlavor8) resetReplicationCommands(c *Conn) []string { resetCommands := []string{ - "STOP SLAVE", - "RESET SLAVE ALL", // "ALL" makes it forget source host:port. - "RESET MASTER", // This will also clear gtid_executed and gtid_purged. + "STOP REPLICA", + "RESET REPLICA ALL", // "ALL" makes it forget source host:port. + "RESET MASTER", // This will also clear gtid_executed and gtid_purged. } status, err := c.SemiSyncExtensionLoaded() if err != nil { @@ -247,45 +188,69 @@ func (mysqlFlavor) resetReplicationCommands(c *Conn) []string { return resetCommands } +// resetReplicationCommands is part of the Flavor interface. +func (mysqlFlavor8) resetBinaryLogsCommand() string { + return "RESET MASTER" +} + // resetReplicationParametersCommands is part of the Flavor interface. func (mysqlFlavor) resetReplicationParametersCommands(c *Conn) []string { resetCommands := []string{ - "RESET SLAVE ALL", // "ALL" makes it forget source host:port. + "RESET REPLICA ALL", // "ALL" makes it forget source host:port. } return resetCommands } +// sendBinlogDumpCommand is part of the Flavor interface. +func (mysqlFlavor) sendBinlogDumpCommand(c *Conn, serverID uint32, binlogFilename string, startPos replication.Position) error { + gtidSet, ok := startPos.GTIDSet.(replication.Mysql56GTIDSet) + if !ok { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "startPos.GTIDSet is wrong type - expected Mysql56GTIDSet, got: %#v", startPos.GTIDSet) + } + + // Build the command. + sidBlock := gtidSet.SIDBlock() + return c.WriteComBinlogDumpGTID(serverID, binlogFilename, 4, 0, sidBlock) +} + // setReplicationPositionCommands is part of the Flavor interface. -func (mysqlFlavor) setReplicationPositionCommands(pos replication.Position) []string { +func (mysqlFlavor8) setReplicationPositionCommands(pos replication.Position) []string { return []string{ "RESET MASTER", // We must clear gtid_executed before setting gtid_purged. fmt.Sprintf("SET GLOBAL gtid_purged = '%s'", pos), } } -// status is part of the Flavor interface. -func (mysqlFlavor) status(c *Conn) (replication.ReplicationStatus, error) { - qr, err := c.ExecuteFetch("SHOW SLAVE STATUS", 100, true /* wantfields */) +// setReplicationPositionCommands is part of the Flavor interface. +func (mysqlFlavor) setReplicationPositionCommands(pos replication.Position) []string { + return []string{ + "RESET BINARY LOGS AND GTIDS", // We must clear gtid_executed before setting gtid_purged. + fmt.Sprintf("SET GLOBAL gtid_purged = '%s'", pos), + } +} + +// primaryStatus is part of the Flavor interface. +func (mysqlFlavor8) primaryStatus(c *Conn) (replication.PrimaryStatus, error) { + qr, err := c.ExecuteFetch("SHOW MASTER STATUS", 100, true /* wantfields */) if err != nil { - return replication.ReplicationStatus{}, err + return replication.PrimaryStatus{}, err } if len(qr.Rows) == 0 { - // The query returned no data, meaning the server - // is not configured as a replica. - return replication.ReplicationStatus{}, ErrNotReplica + // The query returned no data. We don't know how this could happen. + return replication.PrimaryStatus{}, ErrNoPrimaryStatus } resultMap, err := resultToMap(qr) if err != nil { - return replication.ReplicationStatus{}, err + return replication.PrimaryStatus{}, err } - return replication.ParseMysqlReplicationStatus(resultMap, false) + return replication.ParseMysqlPrimaryStatus(resultMap) } // primaryStatus is part of the Flavor interface. func (mysqlFlavor) primaryStatus(c *Conn) (replication.PrimaryStatus, error) { - qr, err := c.ExecuteFetch("SHOW MASTER STATUS", 100, true /* wantfields */) + qr, err := c.ExecuteFetch("SHOW BINARY LOG STATUS", 100, true /* wantfields */) if err != nil { return replication.PrimaryStatus{}, err } @@ -330,19 +295,7 @@ func (mysqlFlavor) replicationConfiguration(c *Conn) (*replicationdata.Configura // replicationNetTimeout is part of the Flavor interface. func (mysqlFlavor) replicationNetTimeout(c *Conn) (int32, error) { - qr, err := c.ExecuteFetch(readSlaveNetTimeout, 1, false) - if err != nil { - return 0, err - } - if len(qr.Rows) != 1 || len(qr.Rows[0]) != 1 { - return 0, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected result format for slave_net_timeout: %#v", qr) - } - return qr.Rows[0][0].ToInt32() -} - -// replicationNetTimeout is part of the Flavor interface. -func (mysqlFlavor8) replicationNetTimeout(c *Conn) (int32, error) { - qr, err := c.ExecuteFetch(readReplicaNetTimeout, 1, false) + qr, err := c.ExecuteFetch("select @@global.replica_net_timeout", 1, false) if err != nil { return 0, err } @@ -353,7 +306,7 @@ func (mysqlFlavor8) replicationNetTimeout(c *Conn) (int32, error) { } // status is part of the Flavor interface. -func (mysqlFlavor8) status(c *Conn) (replication.ReplicationStatus, error) { +func (mysqlFlavor) status(c *Conn) (replication.ReplicationStatus, error) { qr, err := c.ExecuteFetch("SHOW REPLICA STATUS", 100, true /* wantfields */) if err != nil { return replication.ReplicationStatus{}, err @@ -441,44 +394,6 @@ func (mysqlFlavor) baseShowTables() string { return "SELECT table_name, table_type, unix_timestamp(create_time), table_comment FROM information_schema.tables WHERE table_schema = database()" } -// TablesWithSize56 is a query to select table along with size for mysql 5.6 -const TablesWithSize56 = `SELECT table_name, - table_type, - UNIX_TIMESTAMP(create_time) AS uts_create_time, - table_comment, - SUM(data_length + index_length), - SUM(data_length + index_length) -FROM information_schema.tables -WHERE table_schema = database() -GROUP BY table_name, - table_type, - uts_create_time, - table_comment` - -// TablesWithSize57 is a query to select table along with size for mysql 5.7. -// -// It's a little weird, because the JOIN predicate only works if the table and databases do not contain weird characters. -// If the join does not return any data, we fall back to the same fields as used in the mysql 5.6 query. -// -// We join with a subquery that materializes the data from `information_schema.innodb_sys_tablespaces` -// early for performance reasons. This effectively causes only a single read of `information_schema.innodb_sys_tablespaces` -// per query. -const TablesWithSize57 = `SELECT t.table_name, - t.table_type, - UNIX_TIMESTAMP(t.create_time), - t.table_comment, - IFNULL(SUM(i.file_size), SUM(t.data_length + t.index_length)), - IFNULL(SUM(i.allocated_size), SUM(t.data_length + t.index_length)) -FROM information_schema.tables t -LEFT OUTER JOIN ( - SELECT space, file_size, allocated_size, name - FROM information_schema.innodb_sys_tablespaces - WHERE name LIKE CONCAT(database(), '/%') - GROUP BY space, file_size, allocated_size, name -) i ON i.name = CONCAT(t.table_schema, '/', t.table_name) or i.name LIKE CONCAT(t.table_schema, '/', t.table_name, '#p#%') -WHERE t.table_schema = database() -GROUP BY t.table_name, t.table_type, t.create_time, t.table_comment` - // TablesWithSize80 is a query to select table along with size for mysql 8.0 // // Note the following: @@ -510,61 +425,16 @@ func (mysqlFlavor57) baseShowTablesWithSizes() string { } // supportsCapability is part of the Flavor interface. -func (f mysqlFlavor57) supportsCapability(capability capabilities.FlavorCapability) (bool, error) { +func (f mysqlFlavor) supportsCapability(capability capabilities.FlavorCapability) (bool, error) { return capabilities.MySQLVersionHasCapability(f.serverVersion, capability) } // baseShowTablesWithSizes is part of the Flavor interface. -func (mysqlFlavor8Legacy) baseShowTablesWithSizes() string { +func (mysqlFlavor) baseShowTablesWithSizes() string { return TablesWithSize80 } -// baseShowTablesWithSizes is part of the Flavor interface. -func (mysqlFlavor8) baseShowTablesWithSizes() string { - return TablesWithSize80 -} - -// supportsCapability is part of the Flavor interface. -func (f mysqlFlavor8Legacy) supportsCapability(capability capabilities.FlavorCapability) (bool, error) { - return capabilities.MySQLVersionHasCapability(f.serverVersion, capability) -} - -// supportsCapability is part of the Flavor interface. -func (f mysqlFlavor8) supportsCapability(capability capabilities.FlavorCapability) (bool, error) { - return capabilities.MySQLVersionHasCapability(f.serverVersion, capability) -} - func (mysqlFlavor) setReplicationSourceCommand(params *ConnParams, host string, port int32, heartbeatInterval float64, connectRetry int) string { - args := []string{ - fmt.Sprintf("MASTER_HOST = '%s'", host), - fmt.Sprintf("MASTER_PORT = %d", port), - fmt.Sprintf("MASTER_USER = '%s'", params.Uname), - fmt.Sprintf("MASTER_PASSWORD = '%s'", params.Pass), - fmt.Sprintf("MASTER_CONNECT_RETRY = %d", connectRetry), - } - if params.SslEnabled() { - args = append(args, "MASTER_SSL = 1") - } - if params.SslCa != "" { - args = append(args, fmt.Sprintf("MASTER_SSL_CA = '%s'", params.SslCa)) - } - if params.SslCaPath != "" { - args = append(args, fmt.Sprintf("MASTER_SSL_CAPATH = '%s'", params.SslCaPath)) - } - if params.SslCert != "" { - args = append(args, fmt.Sprintf("MASTER_SSL_CERT = '%s'", params.SslCert)) - } - if params.SslKey != "" { - args = append(args, fmt.Sprintf("MASTER_SSL_KEY = '%s'", params.SslKey)) - } - if heartbeatInterval != 0 { - args = append(args, fmt.Sprintf("MASTER_HEARTBEAT_PERIOD = %v", heartbeatInterval)) - } - args = append(args, "MASTER_AUTO_POSITION = 1") - return "CHANGE MASTER TO\n " + strings.Join(args, ",\n ") -} - -func (mysqlFlavor8) setReplicationSourceCommand(params *ConnParams, host string, port int32, heartbeatInterval float64, connectRetry int) string { args := []string{ fmt.Sprintf("SOURCE_HOST = '%s'", host), fmt.Sprintf("SOURCE_PORT = %d", port), @@ -595,38 +465,6 @@ func (mysqlFlavor8) setReplicationSourceCommand(params *ConnParams, host string, } func (mysqlFlavor) catchupToGTIDCommands(params *ConnParams, replPos replication.Position) []string { - cmds := []string{ - "STOP SLAVE FOR CHANNEL '' ", - "STOP SLAVE IO_THREAD FOR CHANNEL ''", - } - - if params.SslCa != "" || params.SslCert != "" { - // We need to use TLS - cmd := fmt.Sprintf("CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='%s', MASTER_PASSWORD='%s', MASTER_AUTO_POSITION=1, MASTER_SSL=1", params.Host, params.Port, params.Uname, params.Pass) - if params.SslCa != "" { - cmd += fmt.Sprintf(", MASTER_SSL_CA='%s'", params.SslCa) - } - if params.SslCert != "" { - cmd += fmt.Sprintf(", MASTER_SSL_CERT='%s'", params.SslCert) - } - if params.SslKey != "" { - cmd += fmt.Sprintf(", MASTER_SSL_KEY='%s'", params.SslKey) - } - cmds = append(cmds, cmd+";") - } else { - // No TLS - cmds = append(cmds, fmt.Sprintf("CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='%s', MASTER_PASSWORD='%s', MASTER_AUTO_POSITION=1;", params.Host, params.Port, params.Uname, params.Pass)) - } - - if replPos.IsZero() { // when the there is no afterPos, that means need to replicate completely - cmds = append(cmds, "START SLAVE") - } else { - cmds = append(cmds, fmt.Sprintf("START SLAVE UNTIL SQL_BEFORE_GTIDS = '%s'", replPos.GTIDSet.Last())) - } - return cmds -} - -func (mysqlFlavor8) catchupToGTIDCommands(params *ConnParams, replPos replication.Position) []string { cmds := []string{ "STOP REPLICA FOR CHANNEL '' ", "STOP REPLICA IO_THREAD FOR CHANNEL ''", @@ -659,9 +497,5 @@ func (mysqlFlavor8) catchupToGTIDCommands(params *ConnParams, replPos replicatio } func (mysqlFlavor) binlogReplicatedUpdates() string { - return "@@global.log_slave_updates" -} - -func (mysqlFlavor8) binlogReplicatedUpdates() string { return "@@global.log_replica_updates" } diff --git a/go/mysql/flavor_mysql_legacy.go b/go/mysql/flavor_mysql_legacy.go new file mode 100644 index 00000000000..21fd3c89c8c --- /dev/null +++ b/go/mysql/flavor_mysql_legacy.go @@ -0,0 +1,259 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mysql + +import ( + "fmt" + "strings" + + "vitess.io/vitess/go/mysql/replication" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/vterrors" +) + +// mysqlFlavor implements the Flavor interface for Mysql. +type mysqlFlavorLegacy struct { + mysqlFlavor +} +type mysqlFlavor57 struct { + mysqlFlavorLegacy +} +type mysqlFlavor8Legacy struct { + mysqlFlavorLegacy +} + +var _ flavor = (*mysqlFlavor57)(nil) +var _ flavor = (*mysqlFlavor8Legacy)(nil) +var _ flavor = (*mysqlFlavor8)(nil) + +// TablesWithSize56 is a query to select table along with size for mysql 5.6 +const TablesWithSize56 = `SELECT table_name, + table_type, + UNIX_TIMESTAMP(create_time) AS uts_create_time, + table_comment, + SUM(data_length + index_length), + SUM(data_length + index_length) +FROM information_schema.tables +WHERE table_schema = database() +GROUP BY table_name, + table_type, + uts_create_time, + table_comment` + +// TablesWithSize57 is a query to select table along with size for mysql 5.7. +// +// It's a little weird, because the JOIN predicate only works if the table and databases do not contain weird characters. +// If the join does not return any data, we fall back to the same fields as used in the mysql 5.6 query. +// +// We join with a subquery that materializes the data from `information_schema.innodb_sys_tablespaces` +// early for performance reasons. This effectively causes only a single read of `information_schema.innodb_sys_tablespaces` +// per query. +const TablesWithSize57 = `SELECT t.table_name, + t.table_type, + UNIX_TIMESTAMP(t.create_time), + t.table_comment, + IFNULL(SUM(i.file_size), SUM(t.data_length + t.index_length)), + IFNULL(SUM(i.allocated_size), SUM(t.data_length + t.index_length)) +FROM information_schema.tables t +LEFT OUTER JOIN ( + SELECT space, file_size, allocated_size, name + FROM information_schema.innodb_sys_tablespaces + WHERE name LIKE CONCAT(database(), '/%') + GROUP BY space, file_size, allocated_size, name +) i ON i.name = CONCAT(t.table_schema, '/', t.table_name) or i.name LIKE CONCAT(t.table_schema, '/', t.table_name, '#p#%') +WHERE t.table_schema = database() +GROUP BY t.table_name, t.table_type, t.create_time, t.table_comment` + +func (mysqlFlavorLegacy) startReplicationCommand() string { + return "START SLAVE" +} + +func (mysqlFlavorLegacy) restartReplicationCommands() []string { + return []string{ + "STOP SLAVE", + "RESET SLAVE", + "START SLAVE", + } +} + +func (mysqlFlavorLegacy) startReplicationUntilAfter(pos replication.Position) string { + return fmt.Sprintf("START SLAVE UNTIL SQL_AFTER_GTIDS = '%s'", pos) +} + +func (mysqlFlavorLegacy) startSQLThreadUntilAfter(pos replication.Position) string { + return fmt.Sprintf("START SLAVE SQL_THREAD UNTIL SQL_AFTER_GTIDS = '%s'", pos) +} + +func (mysqlFlavorLegacy) stopReplicationCommand() string { + return "STOP SLAVE" +} + +func (mysqlFlavorLegacy) resetReplicationCommand() string { + return "RESET SLAVE ALL" +} + +func (mysqlFlavorLegacy) stopIOThreadCommand() string { + return "STOP SLAVE IO_THREAD" +} + +func (mysqlFlavorLegacy) stopSQLThreadCommand() string { + return "STOP SLAVE SQL_THREAD" +} + +func (mysqlFlavorLegacy) startSQLThreadCommand() string { + return "START SLAVE SQL_THREAD" +} + +// resetReplicationCommands is part of the Flavor interface. +func (mysqlFlavorLegacy) resetReplicationCommands(c *Conn) []string { + resetCommands := []string{ + "STOP SLAVE", + "RESET SLAVE ALL", // "ALL" makes it forget source host:port. + "RESET MASTER", // This will also clear gtid_executed and gtid_purged. + } + status, err := c.SemiSyncExtensionLoaded() + if err != nil { + return resetCommands + } + switch status { + case SemiSyncTypeSource: + resetCommands = append(resetCommands, "SET GLOBAL rpl_semi_sync_source_enabled = false, GLOBAL rpl_semi_sync_replica_enabled = false") // semi-sync will be enabled if needed when replica is started. + case SemiSyncTypeMaster: + resetCommands = append(resetCommands, "SET GLOBAL rpl_semi_sync_master_enabled = false, GLOBAL rpl_semi_sync_slave_enabled = false") // semi-sync will be enabled if needed when replica is started. + default: + // Nothing to do. + } + return resetCommands +} + +// resetReplicationParametersCommands is part of the Flavor interface. +func (mysqlFlavorLegacy) resetReplicationParametersCommands(c *Conn) []string { + resetCommands := []string{ + "RESET SLAVE ALL", // "ALL" makes it forget source host:port. + } + return resetCommands +} + +// status is part of the Flavor interface. +func (mysqlFlavorLegacy) status(c *Conn) (replication.ReplicationStatus, error) { + qr, err := c.ExecuteFetch("SHOW SLAVE STATUS", 100, true /* wantfields */) + if err != nil { + return replication.ReplicationStatus{}, err + } + if len(qr.Rows) == 0 { + // The query returned no data, meaning the server + // is not configured as a replica. + return replication.ReplicationStatus{}, ErrNotReplica + } + + resultMap, err := resultToMap(qr) + if err != nil { + return replication.ReplicationStatus{}, err + } + + return replication.ParseMysqlReplicationStatus(resultMap, false) +} + +// replicationNetTimeout is part of the Flavor interface. +func (mysqlFlavorLegacy) replicationNetTimeout(c *Conn) (int32, error) { + qr, err := c.ExecuteFetch("select @@global.slave_net_timeout", 1, false) + if err != nil { + return 0, err + } + if len(qr.Rows) != 1 || len(qr.Rows[0]) != 1 { + return 0, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected result format for slave_net_timeout: %#v", qr) + } + return qr.Rows[0][0].ToInt32() +} + +func (mysqlFlavorLegacy) catchupToGTIDCommands(params *ConnParams, replPos replication.Position) []string { + cmds := []string{ + "STOP SLAVE FOR CHANNEL '' ", + "STOP SLAVE IO_THREAD FOR CHANNEL ''", + } + + if params.SslCa != "" || params.SslCert != "" { + // We need to use TLS + cmd := fmt.Sprintf("CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='%s', MASTER_PASSWORD='%s', MASTER_AUTO_POSITION=1, MASTER_SSL=1", params.Host, params.Port, params.Uname, params.Pass) + if params.SslCa != "" { + cmd += fmt.Sprintf(", MASTER_SSL_CA='%s'", params.SslCa) + } + if params.SslCert != "" { + cmd += fmt.Sprintf(", MASTER_SSL_CERT='%s'", params.SslCert) + } + if params.SslKey != "" { + cmd += fmt.Sprintf(", MASTER_SSL_KEY='%s'", params.SslKey) + } + cmds = append(cmds, cmd+";") + } else { + // No TLS + cmds = append(cmds, fmt.Sprintf("CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='%s', MASTER_PASSWORD='%s', MASTER_AUTO_POSITION=1;", params.Host, params.Port, params.Uname, params.Pass)) + } + + if replPos.IsZero() { // when the there is no afterPos, that means need to replicate completely + cmds = append(cmds, "START SLAVE") + } else { + cmds = append(cmds, fmt.Sprintf("START SLAVE UNTIL SQL_BEFORE_GTIDS = '%s'", replPos.GTIDSet.Last())) + } + return cmds +} + +func (mysqlFlavorLegacy) setReplicationSourceCommand(params *ConnParams, host string, port int32, heartbeatInterval float64, connectRetry int) string { + args := []string{ + fmt.Sprintf("MASTER_HOST = '%s'", host), + fmt.Sprintf("MASTER_PORT = %d", port), + fmt.Sprintf("MASTER_USER = '%s'", params.Uname), + fmt.Sprintf("MASTER_PASSWORD = '%s'", params.Pass), + fmt.Sprintf("MASTER_CONNECT_RETRY = %d", connectRetry), + } + if params.SslEnabled() { + args = append(args, "MASTER_SSL = 1") + } + if params.SslCa != "" { + args = append(args, fmt.Sprintf("MASTER_SSL_CA = '%s'", params.SslCa)) + } + if params.SslCaPath != "" { + args = append(args, fmt.Sprintf("MASTER_SSL_CAPATH = '%s'", params.SslCaPath)) + } + if params.SslCert != "" { + args = append(args, fmt.Sprintf("MASTER_SSL_CERT = '%s'", params.SslCert)) + } + if params.SslKey != "" { + args = append(args, fmt.Sprintf("MASTER_SSL_KEY = '%s'", params.SslKey)) + } + if heartbeatInterval != 0 { + args = append(args, fmt.Sprintf("MASTER_HEARTBEAT_PERIOD = %v", heartbeatInterval)) + } + args = append(args, "MASTER_AUTO_POSITION = 1") + return "CHANGE MASTER TO\n " + strings.Join(args, ",\n ") +} + +func (mysqlFlavorLegacy) resetBinaryLogsCommand() string { + return "RESET MASTER" +} + +func (mysqlFlavorLegacy) binlogReplicatedUpdates() string { + return "@@global.log_slave_updates" +} + +// setReplicationPositionCommands is part of the Flavor interface. +func (mysqlFlavorLegacy) setReplicationPositionCommands(pos replication.Position) []string { + return []string{ + "RESET MASTER", // We must clear gtid_executed before setting gtid_purged. + fmt.Sprintf("SET GLOBAL gtid_purged = '%s'", pos), + } +} diff --git a/go/mysql/flavor_mysql_test.go b/go/mysql/flavor_mysql_test.go index ff135d529ab..a85c39e2807 100644 --- a/go/mysql/flavor_mysql_test.go +++ b/go/mysql/flavor_mysql_test.go @@ -20,73 +20,9 @@ import ( "testing" "github.com/stretchr/testify/assert" -) - -func TestMysql57SetReplicationSourceCommand(t *testing.T) { - params := &ConnParams{ - Uname: "username", - Pass: "password", - } - host := "localhost" - port := int32(123) - connectRetry := 1234 - want := `CHANGE MASTER TO - MASTER_HOST = 'localhost', - MASTER_PORT = 123, - MASTER_USER = 'username', - MASTER_PASSWORD = 'password', - MASTER_CONNECT_RETRY = 1234, - MASTER_AUTO_POSITION = 1` - - conn := &Conn{flavor: mysqlFlavor57{}} - got := conn.SetReplicationSourceCommand(params, host, port, 0, connectRetry) - assert.Equal(t, want, got, "mysqlFlavor.SetReplicationSourceCommand(%#v, %#v, %#v, %#v) = %#v, want %#v", params, host, port, connectRetry, got, want) - var heartbeatInterval float64 = 5.4 - want = `CHANGE MASTER TO - MASTER_HOST = 'localhost', - MASTER_PORT = 123, - MASTER_USER = 'username', - MASTER_PASSWORD = 'password', - MASTER_CONNECT_RETRY = 1234, - MASTER_HEARTBEAT_PERIOD = 5.4, - MASTER_AUTO_POSITION = 1` - - got = conn.SetReplicationSourceCommand(params, host, port, heartbeatInterval, connectRetry) - assert.Equal(t, want, got, "mysqlFlavor.SetReplicationSourceCommand(%#v, %#v, %#v, %#v, %#v) = %#v, want %#v", params, host, port, heartbeatInterval, connectRetry, got, want) - -} - -func TestMysql57SetReplicationSourceCommandSSL(t *testing.T) { - params := &ConnParams{ - Uname: "username", - Pass: "password", - SslCa: "ssl-ca", - SslCaPath: "ssl-ca-path", - SslCert: "ssl-cert", - SslKey: "ssl-key", - } - params.EnableSSL() - host := "localhost" - port := int32(123) - connectRetry := 1234 - want := `CHANGE MASTER TO - MASTER_HOST = 'localhost', - MASTER_PORT = 123, - MASTER_USER = 'username', - MASTER_PASSWORD = 'password', - MASTER_CONNECT_RETRY = 1234, - MASTER_SSL = 1, - MASTER_SSL_CA = 'ssl-ca', - MASTER_SSL_CAPATH = 'ssl-ca-path', - MASTER_SSL_CERT = 'ssl-cert', - MASTER_SSL_KEY = 'ssl-key', - MASTER_AUTO_POSITION = 1` - - conn := &Conn{flavor: mysqlFlavor57{}} - got := conn.SetReplicationSourceCommand(params, host, port, 0, connectRetry) - assert.Equal(t, want, got, "mysqlFlavor.SetReplicationSourceCommand(%#v, %#v, %#v, %#v) = %#v, want %#v", params, host, port, connectRetry, got, want) -} + "vitess.io/vitess/go/mysql/replication" +) func TestMysql8SetReplicationSourceCommand(t *testing.T) { params := &ConnParams{ @@ -152,3 +88,29 @@ func TestMysql8SetReplicationSourceCommandSSL(t *testing.T) { got := conn.SetReplicationSourceCommand(params, host, port, 0, connectRetry) assert.Equal(t, want, got, "mysqlFlavor.SetReplicationSourceCommand(%#v, %#v, %#v, %#v) = %#v, want %#v", params, host, port, connectRetry, got, want) } + +func TestMysql8SetReplicationPositionCommands(t *testing.T) { + pos := replication.Position{GTIDSet: replication.Mysql56GTIDSet{}} + conn := &Conn{flavor: mysqlFlavor8{}} + queries := conn.SetReplicationPositionCommands(pos) + assert.Equal(t, []string{"RESET MASTER", "SET GLOBAL gtid_purged = ''"}, queries) +} + +func TestMysql82SetReplicationPositionCommands(t *testing.T) { + pos := replication.Position{GTIDSet: replication.Mysql56GTIDSet{}} + conn := &Conn{flavor: mysqlFlavor82{}} + queries := conn.SetReplicationPositionCommands(pos) + assert.Equal(t, []string{"RESET BINARY LOGS AND GTIDS", "SET GLOBAL gtid_purged = ''"}, queries) +} + +func TestMysql8ResetReplicationParametersCommands(t *testing.T) { + conn := &Conn{flavor: mysqlFlavor8{}} + queries := conn.ResetReplicationParametersCommands() + assert.Equal(t, []string{"RESET REPLICA ALL"}, queries) +} + +func TestMysql82ResetReplicationParametersCommands(t *testing.T) { + conn := &Conn{flavor: mysqlFlavor82{}} + queries := conn.ResetReplicationParametersCommands() + assert.Equal(t, []string{"RESET REPLICA ALL"}, queries) +} diff --git a/go/mysql/flavor_mysqlgr.go b/go/mysql/flavor_mysqlgr.go index 8096774474c..df3dc060742 100644 --- a/go/mysql/flavor_mysqlgr.go +++ b/go/mysql/flavor_mysqlgr.go @@ -238,7 +238,7 @@ func fetchStatusForGroupReplication(c *Conn, query string, onResult func([]sqlty return onResult(qr.Rows[0]) } -// primaryStatus returns the result of 'SHOW MASTER STATUS', +// primaryStatus returns the result of 'SHOW BINARY LOG STATUS', // with parsed executed position. func (mysqlGRFlavor) primaryStatus(c *Conn) (replication.PrimaryStatus, error) { return mysqlFlavor{}.primaryStatus(c) diff --git a/go/mysql/replication.go b/go/mysql/replication.go index 9d046002555..08baaa169c8 100644 --- a/go/mysql/replication.go +++ b/go/mysql/replication.go @@ -197,3 +197,9 @@ func (c *Conn) BinlogInformation() (string, bool, bool, string, error) { } return binlogFormat, logBin == 1, logReplicaUpdates == 1, binlogRowImage, nil } + +// ResetBinaryLogsCommand returns the command used to reset the +// binary logs on the server. +func (c *Conn) ResetBinaryLogsCommand() string { + return c.flavor.resetBinaryLogsCommand() +} diff --git a/go/mysql/replication/primary_status.go b/go/mysql/replication/primary_status.go index 679b152f9d4..511777a5a4a 100644 --- a/go/mysql/replication/primary_status.go +++ b/go/mysql/replication/primary_status.go @@ -24,7 +24,7 @@ import ( "vitess.io/vitess/go/vt/vterrors" ) -// PrimaryStatus holds replication information from SHOW MASTER STATUS. +// PrimaryStatus holds replication information from SHOW BINARY LOG STATUS. type PrimaryStatus struct { // Position represents the server's GTID based position. Position Position @@ -52,7 +52,7 @@ func ParseMysqlPrimaryStatus(resultMap map[string]string) (PrimaryStatus, error) return status, nil } -// ParsePrimaryStatus parses the common fields of SHOW MASTER STATUS. +// ParsePrimaryStatus parses the common fields of SHOW BINARY LOG STATUS. func ParsePrimaryStatus(fields map[string]string) PrimaryStatus { status := PrimaryStatus{} diff --git a/go/test/endtoend/backup/vtbackup/backup_only_test.go b/go/test/endtoend/backup/vtbackup/backup_only_test.go index a562df4e79f..7dada7a77d2 100644 --- a/go/test/endtoend/backup/vtbackup/backup_only_test.go +++ b/go/test/endtoend/backup/vtbackup/backup_only_test.go @@ -327,13 +327,17 @@ func tearDown(t *testing.T, initMysql bool) { } caughtUp := waitForReplicationToCatchup([]cluster.Vttablet{*replica1, *replica2}) require.True(t, caughtUp, "Timed out waiting for all replicas to catch up") - promoteCommands := []string{"STOP SLAVE", "RESET SLAVE ALL", "RESET MASTER"} + + promoteCommands := []string{"STOP REPLICA", "RESET REPLICA ALL"} disableSemiSyncCommandsSource := []string{"SET GLOBAL rpl_semi_sync_source_enabled = false", " SET GLOBAL rpl_semi_sync_replica_enabled = false"} disableSemiSyncCommandsMaster := []string{"SET GLOBAL rpl_semi_sync_master_enabled = false", " SET GLOBAL rpl_semi_sync_slave_enabled = false"} for _, tablet := range []cluster.Vttablet{*primary, *replica1, *replica2} { - err := tablet.VttabletProcess.QueryTabletMultiple(promoteCommands, keyspaceName, true) + resetCmd, err := tablet.VttabletProcess.ResetBinaryLogsCommand() + require.NoError(t, err) + cmds := append(promoteCommands, resetCmd) + err = tablet.VttabletProcess.QueryTabletMultiple(cmds, keyspaceName, true) require.NoError(t, err) semisyncType, err := tablet.VttabletProcess.SemiSyncExtensionLoaded() require.NoError(t, err) diff --git a/go/test/endtoend/cluster/vttablet_process.go b/go/test/endtoend/cluster/vttablet_process.go index c2f20ec505a..e9dd4804e57 100644 --- a/go/test/endtoend/cluster/vttablet_process.go +++ b/go/test/endtoend/cluster/vttablet_process.go @@ -467,6 +467,16 @@ func (vttablet *VttabletProcess) SemiSyncExtensionLoaded() (mysql.SemiSyncType, return conn.SemiSyncExtensionLoaded() } +// SemiSyncExtensionLoaded returns what type of semi-sync extension is loaded +func (vttablet *VttabletProcess) ResetBinaryLogsCommand() (string, error) { + conn, err := vttablet.TabletConn("", false) + if err != nil { + return "", err + } + defer conn.Close() + return conn.ResetBinaryLogsCommand(), nil +} + // QueryTabletMultiple lets you execute multiple queries -- without any // results -- against the tablet. func (vttablet *VttabletProcess) QueryTabletMultiple(queries []string, keyspace string, useDb bool) error { diff --git a/go/test/endtoend/reparent/emergencyreparent/ers_test.go b/go/test/endtoend/reparent/emergencyreparent/ers_test.go index 0eaac97a4f2..584bccfdfb7 100644 --- a/go/test/endtoend/reparent/emergencyreparent/ers_test.go +++ b/go/test/endtoend/reparent/emergencyreparent/ers_test.go @@ -525,9 +525,9 @@ func TestReplicationStopped(t *testing.T) { tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[1], tablets[2], tablets[3]}) - err := clusterInstance.VtctldClientProcess.ExecuteCommand("ExecuteFetchAsDBA", tablets[1].Alias, `STOP SLAVE SQL_THREAD;`) + err := clusterInstance.VtctldClientProcess.ExecuteCommand("ExecuteFetchAsDBA", tablets[1].Alias, `STOP REPLICA SQL_THREAD;`) require.NoError(t, err) - err = clusterInstance.VtctldClientProcess.ExecuteCommand("ExecuteFetchAsDBA", tablets[2].Alias, `STOP SLAVE;`) + err = clusterInstance.VtctldClientProcess.ExecuteCommand("ExecuteFetchAsDBA", tablets[2].Alias, `STOP REPLICA;`) require.NoError(t, err) // Run an additional command in the current primary which will only be acked by tablets[3] and be in its relay log. insertedVal := utils.ConfirmReplication(t, tablets[0], nil) @@ -536,7 +536,7 @@ func TestReplicationStopped(t *testing.T) { require.Error(t, err, "ERS should fail with 2 replicas having replication stopped") // Start replication back on tablet[1] - err = clusterInstance.VtctldClientProcess.ExecuteCommand("ExecuteFetchAsDBA", tablets[1].Alias, `START SLAVE;`) + err = clusterInstance.VtctldClientProcess.ExecuteCommand("ExecuteFetchAsDBA", tablets[1].Alias, `START REPLICA;`) require.NoError(t, err) // Failover to tablets[3] again. This time it should succeed out, err := utils.Ers(clusterInstance, tablets[3], "60s", "30s") diff --git a/go/test/endtoend/reparent/plannedreparent/reparent_test.go b/go/test/endtoend/reparent/plannedreparent/reparent_test.go index d9d6cb06b79..ae9bd6bbc9b 100644 --- a/go/test/endtoend/reparent/plannedreparent/reparent_test.go +++ b/go/test/endtoend/reparent/plannedreparent/reparent_test.go @@ -231,8 +231,10 @@ func reparentFromOutside(t *testing.T, clusterInstance *cluster.LocalProcessClus _, gtID := cluster.GetPrimaryPosition(t, *tablets[1], utils.Hostname) // tablets[0] will now be a replica of tablets[1 + resetCmd, err := tablets[0].VttabletProcess.ResetBinaryLogsCommand() + require.NoError(t, err) changeReplicationSourceCommands := []string{ - "RESET MASTER", + resetCmd, "RESET REPLICA", fmt.Sprintf("SET GLOBAL gtid_purged = '%s'", gtID), fmt.Sprintf("CHANGE REPLICATION SOURCE TO SOURCE_HOST='%s', SOURCE_PORT=%d, SOURCE_USER='vt_repl', SOURCE_AUTO_POSITION = 1", utils.Hostname, tablets[1].MySQLPort), @@ -243,9 +245,11 @@ func reparentFromOutside(t *testing.T, clusterInstance *cluster.LocalProcessClus baseTime := time.Now().UnixNano() / 1000000000 // tablets[2 will be a replica of tablets[1 + resetCmd, err = tablets[2].VttabletProcess.ResetBinaryLogsCommand() + require.NoError(t, err) changeReplicationSourceCommands = []string{ "STOP REPLICA", - "RESET MASTER", + resetCmd, fmt.Sprintf("SET GLOBAL gtid_purged = '%s'", gtID), fmt.Sprintf("CHANGE REPLICATION SOURCE TO SOURCE_HOST='%s', SOURCE_PORT=%d, SOURCE_USER='vt_repl', SOURCE_AUTO_POSITION = 1", utils.Hostname, tablets[1].MySQLPort), "START REPLICA", @@ -262,7 +266,7 @@ func reparentFromOutside(t *testing.T, clusterInstance *cluster.LocalProcessClus } // update topology with the new server - err := clusterInstance.VtctldClientProcess.ExecuteCommand("TabletExternallyReparented", + err = clusterInstance.VtctldClientProcess.ExecuteCommand("TabletExternallyReparented", tablets[1].Alias) require.NoError(t, err) diff --git a/go/test/endtoend/tabletgateway/buffer/reparent/failover_buffer_test.go b/go/test/endtoend/tabletgateway/buffer/reparent/failover_buffer_test.go index 7bb139438f8..c8aaf0ae7fc 100644 --- a/go/test/endtoend/tabletgateway/buffer/reparent/failover_buffer_test.go +++ b/go/test/endtoend/tabletgateway/buffer/reparent/failover_buffer_test.go @@ -72,9 +72,11 @@ func failoverExternalReparenting(t *testing.T, clusterInstance *cluster.LocalPro // Use 'localhost' as hostname because Travis CI worker hostnames // are too long for MySQL replication. + resetCmd, err := oldPrimary.VttabletProcess.ResetBinaryLogsCommand() + require.NoError(t, err) changeSourceCommands := []string{ "STOP REPLICA", - "RESET MASTER", + resetCmd, fmt.Sprintf("SET GLOBAL gtid_purged = '%s'", gtID), fmt.Sprintf("CHANGE REPLICATION SOURCE TO SOURCE_HOST='%s', SOURCE_PORT=%d, SOURCE_USER='vt_repl', SOURCE_AUTO_POSITION = 1", "localhost", newPrimary.MySQLPort), "START REPLICA", diff --git a/go/test/endtoend/vtorc/utils/utils.go b/go/test/endtoend/vtorc/utils/utils.go index 7c645f96ee1..09da820d17e 100644 --- a/go/test/endtoend/vtorc/utils/utils.go +++ b/go/test/endtoend/vtorc/utils/utils.go @@ -330,7 +330,9 @@ func cleanAndStartVttablet(t *testing.T, clusterInfo *VTOrcClusterInfo, vttablet _, err = RunSQL(t, "STOP REPLICA", vttablet, "") require.NoError(t, err) // reset the binlog - _, err = RunSQL(t, "RESET MASTER", vttablet, "") + resetCmd, err := vttablet.VttabletProcess.ResetBinaryLogsCommand() + require.NoError(t, err) + _, err = RunSQL(t, resetCmd, vttablet, "") require.NoError(t, err) // set read-only to true _, err = RunSQL(t, "SET GLOBAL read_only = ON", vttablet, "") diff --git a/go/vt/binlog/binlog_connection.go b/go/vt/binlog/binlog_connection.go index f7c7acd8e9c..0fd13fd984f 100644 --- a/go/vt/binlog/binlog_connection.go +++ b/go/vt/binlog/binlog_connection.go @@ -91,8 +91,8 @@ func connectForReplication(cp dbconfigs.Connector) (*mysql.Conn, error) { } // Tell the server that we understand the format of events // that will be used if binlog_checksum is enabled on the server. - if _, err := conn.ExecuteFetch("SET @master_binlog_checksum=@@global.binlog_checksum", 0, false); err != nil { - return nil, fmt.Errorf("failed to set @master_binlog_checksum=@@global.binlog_checksum: %v", err) + if _, err := conn.ExecuteFetch("SET @source_binlog_checksum = @@global.binlog_checksum, @master_binlog_checksum=@@global.binlog_checksum", 0, false); err != nil { + return nil, fmt.Errorf("failed to set @source_binlog_checksum=@@global.binlog_checksum: %v", err) } return conn, nil diff --git a/go/vt/mysqlctl/backup.go b/go/vt/mysqlctl/backup.go index 0da2d18e06d..7052dcbdf87 100644 --- a/go/vt/mysqlctl/backup.go +++ b/go/vt/mysqlctl/backup.go @@ -351,13 +351,9 @@ func ensureRestoredGTIDPurgedMatchesManifest(ctx context.Context, manifest *Back } params.Logger.Infof("Restore: @@gtid_purged does not equal manifest's GTID position. Setting @@gtid_purged to %v", gtid) // This is not good. We want to apply a new @@gtid_purged value. - query := "RESET MASTER" // required dialect in 5.7 - if _, err := params.Mysqld.FetchSuperQuery(ctx, query); err != nil { - return vterrors.Wrapf(err, "error issuing %v", query) - } - query = fmt.Sprintf("SET GLOBAL gtid_purged='%s'", gtid) - if _, err := params.Mysqld.FetchSuperQuery(ctx, query); err != nil { - return vterrors.Wrapf(err, "failed to apply `%s` after restore", query) + err = params.Mysqld.SetReplicationPosition(ctx, manifest.Position) + if err != nil { + return vterrors.Wrap(err, "error setting replication position") } return nil } diff --git a/go/vt/proto/replicationdata/replicationdata.pb.go b/go/vt/proto/replicationdata/replicationdata.pb.go index 5c40ab20a8b..5d8256cfa2e 100644 --- a/go/vt/proto/replicationdata/replicationdata.pb.go +++ b/go/vt/proto/replicationdata/replicationdata.pb.go @@ -417,7 +417,7 @@ func (x *StopReplicationStatus) GetAfter() *Status { return nil } -// PrimaryStatus is the replication status for a MySQL primary (returned by 'show master status'). +// PrimaryStatus is the replication status for a MySQL primary (returned by 'show binary log status'). type PrimaryStatus struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache diff --git a/go/vt/proto/tabletmanagerdata/tabletmanagerdata.pb.go b/go/vt/proto/tabletmanagerdata/tabletmanagerdata.pb.go index 8a9a4d6569f..6ec17060dd3 100644 --- a/go/vt/proto/tabletmanagerdata/tabletmanagerdata.pb.go +++ b/go/vt/proto/tabletmanagerdata/tabletmanagerdata.pb.go @@ -3935,7 +3935,7 @@ type DemotePrimaryResponse struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - // PrimaryStatus represents the response from calling `SHOW MASTER STATUS` on a primary that has been demoted. + // PrimaryStatus represents the response from calling `SHOW BINARY LOG STATUS` on a primary that has been demoted. PrimaryStatus *replicationdata.PrimaryStatus `protobuf:"bytes,2,opt,name=primary_status,json=primaryStatus,proto3" json:"primary_status,omitempty"` } diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go index b2f2a01affe..3e745222092 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication.go @@ -76,7 +76,7 @@ func (tm *TabletManager) FullStatus(ctx context.Context) (*replicationdatapb.Ful replicationStatusProto = replication.ReplicationStatusToProto(replicationStatus) } - // Primary status - "SHOW MASTER STATUS" + // Primary status - "SHOW BINARY LOG STATUS" primaryStatus, err := tm.MysqlDaemon.PrimaryStatus(ctx) var primaryStatusProto *replicationdatapb.PrimaryStatus if err != nil && err != mysql.ErrNoPrimaryStatus { diff --git a/proto/replicationdata.proto b/proto/replicationdata.proto index 8c71669e80d..1a8b608f984 100644 --- a/proto/replicationdata.proto +++ b/proto/replicationdata.proto @@ -73,7 +73,7 @@ enum StopReplicationMode { IOTHREADONLY = 1; } -// PrimaryStatus is the replication status for a MySQL primary (returned by 'show master status'). +// PrimaryStatus is the replication status for a MySQL primary (returned by 'show binary log status'). message PrimaryStatus { string position = 1; string file_position = 2; diff --git a/proto/tabletmanagerdata.proto b/proto/tabletmanagerdata.proto index 2b782d847cc..f853e2e4ea8 100644 --- a/proto/tabletmanagerdata.proto +++ b/proto/tabletmanagerdata.proto @@ -424,7 +424,7 @@ message DemotePrimaryResponse { //string deprecated_position = 1 [deprecated = true]; reserved 1; - // PrimaryStatus represents the response from calling `SHOW MASTER STATUS` on a primary that has been demoted. + // PrimaryStatus represents the response from calling `SHOW BINARY LOG STATUS` on a primary that has been demoted. replicationdata.PrimaryStatus primary_status = 2; }