diff --git a/.github/workflows/cluster_endtoend_ers_prs_newfeatures_heavy.yml b/.github/workflows/cluster_endtoend_ers_prs_newfeatures_heavy.yml index 7759e23bac6..ed867793859 100644 --- a/.github/workflows/cluster_endtoend_ers_prs_newfeatures_heavy.yml +++ b/.github/workflows/cluster_endtoend_ers_prs_newfeatures_heavy.yml @@ -137,7 +137,7 @@ jobs: # Increase our open file descriptor limit as we could hit this ulimit -n 65536 - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf innodb_buffer_pool_dump_at_shutdown=OFF innodb_buffer_pool_in_core_file=OFF innodb_buffer_pool_load_at_startup=OFF diff --git a/.github/workflows/cluster_endtoend_onlineddl_vrepl.yml b/.github/workflows/cluster_endtoend_onlineddl_vrepl.yml index 962e377cb43..c87febaa26b 100644 --- a/.github/workflows/cluster_endtoend_onlineddl_vrepl.yml +++ b/.github/workflows/cluster_endtoend_onlineddl_vrepl.yml @@ -136,7 +136,7 @@ jobs: set -exo pipefail - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf binlog-transaction-compression=ON EOF diff --git a/.github/workflows/cluster_endtoend_onlineddl_vrepl_stress.yml b/.github/workflows/cluster_endtoend_onlineddl_vrepl_stress.yml index 5060e3ca491..0fc4c116c32 100644 --- a/.github/workflows/cluster_endtoend_onlineddl_vrepl_stress.yml +++ b/.github/workflows/cluster_endtoend_onlineddl_vrepl_stress.yml @@ -136,7 +136,7 @@ jobs: set -exo pipefail - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf binlog-transaction-compression=ON EOF diff --git a/.github/workflows/cluster_endtoend_onlineddl_vrepl_stress_suite.yml b/.github/workflows/cluster_endtoend_onlineddl_vrepl_stress_suite.yml index c051b21dace..67daba59719 100644 --- a/.github/workflows/cluster_endtoend_onlineddl_vrepl_stress_suite.yml +++ b/.github/workflows/cluster_endtoend_onlineddl_vrepl_stress_suite.yml @@ -136,7 +136,7 @@ jobs: set -exo pipefail - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf binlog-transaction-compression=ON EOF diff --git a/.github/workflows/cluster_endtoend_onlineddl_vrepl_suite.yml b/.github/workflows/cluster_endtoend_onlineddl_vrepl_suite.yml index 028b97b389d..a30def564fd 100644 --- a/.github/workflows/cluster_endtoend_onlineddl_vrepl_suite.yml +++ b/.github/workflows/cluster_endtoend_onlineddl_vrepl_suite.yml @@ -136,7 +136,7 @@ jobs: set -exo pipefail - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf binlog-transaction-compression=ON EOF diff --git a/.github/workflows/cluster_endtoend_schemadiff_vrepl.yml b/.github/workflows/cluster_endtoend_schemadiff_vrepl.yml index 2b6dc2c80b4..3c756a7f733 100644 --- a/.github/workflows/cluster_endtoend_schemadiff_vrepl.yml +++ b/.github/workflows/cluster_endtoend_schemadiff_vrepl.yml @@ -136,7 +136,7 @@ jobs: set -exo pipefail - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf binlog-transaction-compression=ON EOF diff --git a/.github/workflows/cluster_endtoend_vreplication_across_db_versions.yml b/.github/workflows/cluster_endtoend_vreplication_across_db_versions.yml index 9565145f985..679b40deb32 100644 --- a/.github/workflows/cluster_endtoend_vreplication_across_db_versions.yml +++ b/.github/workflows/cluster_endtoend_vreplication_across_db_versions.yml @@ -137,7 +137,7 @@ jobs: # Increase our open file descriptor limit as we could hit this ulimit -n 65536 - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf innodb_buffer_pool_dump_at_shutdown=OFF innodb_buffer_pool_in_core_file=OFF innodb_buffer_pool_load_at_startup=OFF @@ -153,7 +153,7 @@ jobs: slow-query-log=OFF EOF - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf binlog-transaction-compression=ON EOF diff --git a/.github/workflows/cluster_endtoend_vreplication_basic.yml b/.github/workflows/cluster_endtoend_vreplication_basic.yml index 6dfbf5a7453..952a148186b 100644 --- a/.github/workflows/cluster_endtoend_vreplication_basic.yml +++ b/.github/workflows/cluster_endtoend_vreplication_basic.yml @@ -137,7 +137,7 @@ jobs: # Increase our open file descriptor limit as we could hit this ulimit -n 65536 - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf innodb_buffer_pool_dump_at_shutdown=OFF innodb_buffer_pool_in_core_file=OFF innodb_buffer_pool_load_at_startup=OFF @@ -153,7 +153,7 @@ jobs: slow-query-log=OFF EOF - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf binlog-transaction-compression=ON EOF diff --git a/.github/workflows/cluster_endtoend_vreplication_cellalias.yml b/.github/workflows/cluster_endtoend_vreplication_cellalias.yml index b3158028a43..415c3e920e4 100644 --- a/.github/workflows/cluster_endtoend_vreplication_cellalias.yml +++ b/.github/workflows/cluster_endtoend_vreplication_cellalias.yml @@ -137,7 +137,7 @@ jobs: # Increase our open file descriptor limit as we could hit this ulimit -n 65536 - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf innodb_buffer_pool_dump_at_shutdown=OFF innodb_buffer_pool_in_core_file=OFF innodb_buffer_pool_load_at_startup=OFF @@ -153,7 +153,7 @@ jobs: slow-query-log=OFF EOF - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf binlog-transaction-compression=ON EOF diff --git a/.github/workflows/cluster_endtoend_vreplication_foreign_key_stress.yml b/.github/workflows/cluster_endtoend_vreplication_foreign_key_stress.yml index 891117e4f2d..53d44b16375 100644 --- a/.github/workflows/cluster_endtoend_vreplication_foreign_key_stress.yml +++ b/.github/workflows/cluster_endtoend_vreplication_foreign_key_stress.yml @@ -137,7 +137,7 @@ jobs: # Increase our open file descriptor limit as we could hit this ulimit -n 65536 - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf innodb_buffer_pool_dump_at_shutdown=OFF innodb_buffer_pool_in_core_file=OFF innodb_buffer_pool_load_at_startup=OFF @@ -153,7 +153,7 @@ jobs: slow-query-log=OFF EOF - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf binlog-transaction-compression=ON EOF diff --git a/.github/workflows/cluster_endtoend_vreplication_migrate_vdiff2_convert_tz.yml b/.github/workflows/cluster_endtoend_vreplication_migrate_vdiff2_convert_tz.yml index adc9cdf3851..12108c3b63e 100644 --- a/.github/workflows/cluster_endtoend_vreplication_migrate_vdiff2_convert_tz.yml +++ b/.github/workflows/cluster_endtoend_vreplication_migrate_vdiff2_convert_tz.yml @@ -137,7 +137,7 @@ jobs: # Increase our open file descriptor limit as we could hit this ulimit -n 65536 - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf innodb_buffer_pool_dump_at_shutdown=OFF innodb_buffer_pool_in_core_file=OFF innodb_buffer_pool_load_at_startup=OFF @@ -153,7 +153,7 @@ jobs: slow-query-log=OFF EOF - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf binlog-transaction-compression=ON EOF diff --git a/.github/workflows/cluster_endtoend_vreplication_multi_tenant.yml b/.github/workflows/cluster_endtoend_vreplication_multi_tenant.yml index fa81b22e903..623af8aa1d3 100644 --- a/.github/workflows/cluster_endtoend_vreplication_multi_tenant.yml +++ b/.github/workflows/cluster_endtoend_vreplication_multi_tenant.yml @@ -137,7 +137,7 @@ jobs: # Increase our open file descriptor limit as we could hit this ulimit -n 65536 - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf innodb_buffer_pool_dump_at_shutdown=OFF innodb_buffer_pool_in_core_file=OFF innodb_buffer_pool_load_at_startup=OFF @@ -153,7 +153,7 @@ jobs: slow-query-log=OFF EOF - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf binlog-transaction-compression=ON EOF diff --git a/.github/workflows/cluster_endtoend_vreplication_partial_movetables_and_materialize.yml b/.github/workflows/cluster_endtoend_vreplication_partial_movetables_and_materialize.yml index bce39eea479..3d71efc0e0e 100644 --- a/.github/workflows/cluster_endtoend_vreplication_partial_movetables_and_materialize.yml +++ b/.github/workflows/cluster_endtoend_vreplication_partial_movetables_and_materialize.yml @@ -137,7 +137,7 @@ jobs: # Increase our open file descriptor limit as we could hit this ulimit -n 65536 - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf innodb_buffer_pool_dump_at_shutdown=OFF innodb_buffer_pool_in_core_file=OFF innodb_buffer_pool_load_at_startup=OFF @@ -153,7 +153,7 @@ jobs: slow-query-log=OFF EOF - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf binlog-transaction-compression=ON EOF diff --git a/.github/workflows/cluster_endtoend_vreplication_v2.yml b/.github/workflows/cluster_endtoend_vreplication_v2.yml index b6cddc7a66c..37a881993f2 100644 --- a/.github/workflows/cluster_endtoend_vreplication_v2.yml +++ b/.github/workflows/cluster_endtoend_vreplication_v2.yml @@ -137,7 +137,7 @@ jobs: # Increase our open file descriptor limit as we could hit this ulimit -n 65536 - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf innodb_buffer_pool_dump_at_shutdown=OFF innodb_buffer_pool_in_core_file=OFF innodb_buffer_pool_load_at_startup=OFF @@ -153,7 +153,7 @@ jobs: slow-query-log=OFF EOF - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf binlog-transaction-compression=ON EOF diff --git a/.github/workflows/cluster_endtoend_vtctlbackup_sharded_clustertest_heavy.yml b/.github/workflows/cluster_endtoend_vtctlbackup_sharded_clustertest_heavy.yml index 8d7f8e88ec6..6e1086515b5 100644 --- a/.github/workflows/cluster_endtoend_vtctlbackup_sharded_clustertest_heavy.yml +++ b/.github/workflows/cluster_endtoend_vtctlbackup_sharded_clustertest_heavy.yml @@ -137,7 +137,7 @@ jobs: # Increase our open file descriptor limit as we could hit this ulimit -n 65536 - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf innodb_buffer_pool_dump_at_shutdown=OFF innodb_buffer_pool_in_core_file=OFF innodb_buffer_pool_load_at_startup=OFF diff --git a/.github/workflows/cluster_endtoend_vtgate_general_heavy.yml b/.github/workflows/cluster_endtoend_vtgate_general_heavy.yml index a1281c6b85e..1167977e6b0 100644 --- a/.github/workflows/cluster_endtoend_vtgate_general_heavy.yml +++ b/.github/workflows/cluster_endtoend_vtgate_general_heavy.yml @@ -137,7 +137,7 @@ jobs: # Increase our open file descriptor limit as we could hit this ulimit -n 65536 - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf innodb_buffer_pool_dump_at_shutdown=OFF innodb_buffer_pool_in_core_file=OFF innodb_buffer_pool_load_at_startup=OFF diff --git a/.github/workflows/cluster_endtoend_vtgate_vindex_heavy.yml b/.github/workflows/cluster_endtoend_vtgate_vindex_heavy.yml index ee14f5903a1..e921e1c5129 100644 --- a/.github/workflows/cluster_endtoend_vtgate_vindex_heavy.yml +++ b/.github/workflows/cluster_endtoend_vtgate_vindex_heavy.yml @@ -137,7 +137,7 @@ jobs: # Increase our open file descriptor limit as we could hit this ulimit -n 65536 - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf innodb_buffer_pool_dump_at_shutdown=OFF innodb_buffer_pool_in_core_file=OFF innodb_buffer_pool_load_at_startup=OFF diff --git a/config/embed.go b/config/embed.go index b2a9333e6de..6660e749aa6 100644 --- a/config/embed.go +++ b/config/embed.go @@ -16,3 +16,9 @@ var MycnfMySQL57 string //go:embed mycnf/mysql80.cnf var MycnfMySQL80 string + +//go:embed mycnf/mysql8026.cnf +var MycnfMySQL8026 string + +//go:embed mycnf/mysql84.cnf +var MycnfMySQL84 string diff --git a/config/mycnf/mysql8026.cnf b/config/mycnf/mysql8026.cnf new file mode 100644 index 00000000000..c7755be488f --- /dev/null +++ b/config/mycnf/mysql8026.cnf @@ -0,0 +1,37 @@ +# This file is auto-included when MySQL 8.0.26 or later is detected. + +# MySQL 8.0 enables binlog by default with sync_binlog and TABLE info repositories +# It does not enable GTIDs or enforced GTID consistency + +gtid_mode = ON +enforce_gtid_consistency +relay_log_recovery = 1 +binlog_expire_logs_seconds = 259200 + +# disable mysqlx +mysqlx = 0 + +# 8.0 changes the default auth-plugin to caching_sha2_password +default_authentication_plugin = mysql_native_password + +# Semi-sync replication is required for automated unplanned failover +# (when the primary goes away). Here we just load the plugin so it's +# available if desired, but it's disabled at startup. +# +# VTTablet will enable semi-sync at the proper time when replication is set up, +# or when a primary is promoted or demoted based on the durability policy configured. +plugin-load = rpl_semi_sync_source=semisync_source.so;rpl_semi_sync_replica=semisync_replica.so + +# MySQL 8.0.26 and later will not load plugins during --initialize +# which makes these options unknown. Prefixing with --loose +# tells the server it's fine if they are not understood. +loose_rpl_semi_sync_source_timeout = 1000000000000000000 +loose_rpl_semi_sync_source_wait_no_replica = 1 + +# In order to protect against any errand GTIDs we will start the mysql instance +# in super-read-only mode. +super-read-only + +# Replication parameters to ensure reparents are fast. +replica_net_timeout = 8 + diff --git a/config/mycnf/mysql84.cnf b/config/mycnf/mysql84.cnf new file mode 100644 index 00000000000..90d7a535602 --- /dev/null +++ b/config/mycnf/mysql84.cnf @@ -0,0 +1,39 @@ +# This file is auto-included when MySQL 8.4.0 or later is detected. + +# MySQL 8.0 enables binlog by default with sync_binlog and TABLE info repositories +# It does not enable GTIDs or enforced GTID consistency + +gtid_mode = ON +enforce_gtid_consistency +relay_log_recovery = 1 +binlog_expire_logs_seconds = 259200 + +# disable mysqlx +mysqlx = 0 + +# 8.4 changes the default auth-plugin to caching_sha2_password and +# disables mysql_native_password by default. +mysql_native_password = ON +default_authentication_plugin = mysql_native_password + +# Semi-sync replication is required for automated unplanned failover +# (when the primary goes away). Here we just load the plugin so it's +# available if desired, but it's disabled at startup. +# +# VTTablet will enable semi-sync at the proper time when replication is set up, +# or when a primary is promoted or demoted based on the durability policy configured. +plugin-load = rpl_semi_sync_source=semisync_source.so;rpl_semi_sync_replica=semisync_replica.so + +# MySQL 8.0.26 and later will not load plugins during --initialize +# which makes these options unknown. Prefixing with --loose +# tells the server it's fine if they are not understood. +loose_rpl_semi_sync_source_timeout = 1000000000000000000 +loose_rpl_semi_sync_source_wait_no_replica = 1 + +# In order to protect against any errand GTIDs we will start the mysql instance +# in super-read-only mode. +super-read-only + +# Replication parameters to ensure reparents are fast. +replica_net_timeout = 8 + diff --git a/go/cmd/vtcombo/cli/main.go b/go/cmd/vtcombo/cli/main.go index d18c22ddfbb..e30f809b96b 100644 --- a/go/cmd/vtcombo/cli/main.go +++ b/go/cmd/vtcombo/cli/main.go @@ -31,6 +31,7 @@ import ( "github.com/spf13/cobra" "vitess.io/vitess/go/acl" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/dbconfigs" @@ -216,7 +217,7 @@ func run(cmd *cobra.Command, args []string) (err error) { return err } servenv.OnClose(func() { - ctx, cancel := context.WithTimeout(context.Background(), mysqlctl.DefaultShutdownTimeout+10*time.Second) + ctx, cancel := context.WithTimeout(cmd.Context(), mysqlctl.DefaultShutdownTimeout+10*time.Second) defer cancel() mysqld.Shutdown(ctx, cnf, true, mysqlctl.DefaultShutdownTimeout) }) @@ -240,7 +241,7 @@ func run(cmd *cobra.Command, args []string) (err error) { if err != nil { // ensure we start mysql in the event we fail here if startMysql { - ctx, cancel := context.WithTimeout(context.Background(), mysqlctl.DefaultShutdownTimeout+10*time.Second) + ctx, cancel := context.WithTimeout(cmd.Context(), mysqlctl.DefaultShutdownTimeout+10*time.Second) defer cancel() mysqld.Shutdown(ctx, cnf, true, mysqlctl.DefaultShutdownTimeout) } @@ -387,11 +388,11 @@ func (mysqld *vtcomboMysqld) StopReplication(hookExtraEnv map[string]string) err } // SetSemiSyncEnabled implements the MysqlDaemon interface -func (mysqld *vtcomboMysqld) SetSemiSyncEnabled(source, replica bool) error { +func (mysqld *vtcomboMysqld) SetSemiSyncEnabled(ctx context.Context, source, replica bool) error { return nil } // SemiSyncExtensionLoaded implements the MysqlDaemon interface -func (mysqld *vtcomboMysqld) SemiSyncExtensionLoaded() (bool, error) { - return true, nil +func (mysqld *vtcomboMysqld) SemiSyncExtensionLoaded(ctx context.Context) (mysql.SemiSyncType, error) { + return mysql.SemiSyncTypeSource, nil } diff --git a/go/mysql/flavor_mariadb.go b/go/mysql/flavor_mariadb.go index 2f77a71ea00..e9552b6097d 100644 --- a/go/mysql/flavor_mariadb.go +++ b/go/mysql/flavor_mariadb.go @@ -144,7 +144,8 @@ func (mariadbFlavor) resetReplicationCommands(c *Conn) []string { "RESET MASTER", "SET GLOBAL gtid_slave_pos = ''", } - if c.SemiSyncExtensionLoaded() { + semisyncType, _ := c.SemiSyncExtensionLoaded() + if semisyncType == SemiSyncTypeMaster { resetCommands = append(resetCommands, "SET GLOBAL rpl_semi_sync_master_enabled = false, GLOBAL rpl_semi_sync_slave_enabled = false") // semi-sync will be enabled if needed when replica is started. } return resetCommands diff --git a/go/mysql/flavor_mysql.go b/go/mysql/flavor_mysql.go index f413c8ef1fb..05fbdd25e46 100644 --- a/go/mysql/flavor_mysql.go +++ b/go/mysql/flavor_mysql.go @@ -154,8 +154,17 @@ func (mysqlFlavor) resetReplicationCommands(c *Conn) []string { "RESET SLAVE ALL", // "ALL" makes it forget source host:port. "RESET MASTER", // This will also clear gtid_executed and gtid_purged. } - if c.SemiSyncExtensionLoaded() { + status, err := c.SemiSyncExtensionLoaded() + if err != nil { + return resetCommands + } + switch status { + case SemiSyncTypeSource: + resetCommands = append(resetCommands, "SET GLOBAL rpl_semi_sync_source_enabled = false, GLOBAL rpl_semi_sync_replica_enabled = false") // semi-sync will be enabled if needed when replica is started. + case SemiSyncTypeMaster: resetCommands = append(resetCommands, "SET GLOBAL rpl_semi_sync_master_enabled = false, GLOBAL rpl_semi_sync_slave_enabled = false") // semi-sync will be enabled if needed when replica is started. + default: + // Nothing to do. } return resetCommands } diff --git a/go/mysql/replication.go b/go/mysql/replication.go index 399698d6a2a..ec30301e328 100644 --- a/go/mysql/replication.go +++ b/go/mysql/replication.go @@ -138,12 +138,29 @@ func (c *Conn) WriteBinlogEvent(ev BinlogEvent, semiSyncEnabled bool) error { return nil } +type SemiSyncType int8 + +const ( + SemiSyncTypeUnknown SemiSyncType = iota + SemiSyncTypeOff + SemiSyncTypeSource + SemiSyncTypeMaster +) + // SemiSyncExtensionLoaded checks if the semisync extension has been loaded. // It should work for both MariaDB and MySQL. -func (c *Conn) SemiSyncExtensionLoaded() bool { - qr, err := c.ExecuteFetch("SHOW GLOBAL VARIABLES LIKE 'rpl_semi_sync%'", 10, false) +func (c *Conn) SemiSyncExtensionLoaded() (SemiSyncType, error) { + qr, err := c.ExecuteFetch("SHOW VARIABLES LIKE 'rpl_semi_sync_%_enabled'", 10, false) if err != nil { - return false + return SemiSyncTypeUnknown, err + } + for _, row := range qr.Rows { + if row[0].ToString() == "rpl_semi_sync_source_enabled" { + return SemiSyncTypeSource, nil + } + if row[0].ToString() == "rpl_semi_sync_master_enabled" { + return SemiSyncTypeMaster, nil + } } - return len(qr.Rows) >= 1 + return SemiSyncTypeOff, nil } diff --git a/go/test/endtoend/backup/vtbackup/backup_only_test.go b/go/test/endtoend/backup/vtbackup/backup_only_test.go index ecb04741d7b..a562df4e79f 100644 --- a/go/test/endtoend/backup/vtbackup/backup_only_test.go +++ b/go/test/endtoend/backup/vtbackup/backup_only_test.go @@ -85,10 +85,10 @@ func TestTabletInitialBackup(t *testing.T) { // TabletExternallyReparented err = localCluster.VtctldClientProcess.ExecuteCommand( "SetWritable", primary.Alias, "true") - require.Nil(t, err) + require.NoError(t, err) err = localCluster.VtctldClientProcess.ExecuteCommand( "TabletExternallyReparented", primary.Alias) - require.Nil(t, err) + require.NoError(t, err) restore(t, replica1, "replica", "SERVING") // Run the entire backup test @@ -134,14 +134,14 @@ func firstBackupTest(t *testing.T, tabletType string) { // Store initial backup counts backups, err := listBackups(shardKsName) - require.Nil(t, err) + require.NoError(t, err) // insert data on primary, wait for replica to get it _, err = primary.VttabletProcess.QueryTablet(vtInsertTest, keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) // Add a single row with value 'test1' to the primary tablet _, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test1')", keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) // Check that the specified tablet has the expected number of rows cluster.VerifyRowsInTablet(t, replica1, keyspaceName, 1) @@ -158,7 +158,7 @@ func firstBackupTest(t *testing.T, tabletType string) { // insert more data on the primary _, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) cluster.VerifyRowsInTablet(t, replica1, keyspaceName, 2) // even though we change the value of compression it won't affect @@ -168,7 +168,7 @@ func firstBackupTest(t *testing.T, tabletType string) { defer func() { mysqlctl.CompressionEngineName = "pgzip" }() // now bring up the other replica, letting it restore from backup. err = localCluster.InitTablet(replica2, keyspaceName, shardName) - require.Nil(t, err) + require.NoError(t, err) restore(t, replica2, "replica", "SERVING") // Replica2 takes time to serve. Sleeping for 5 sec. time.Sleep(5 * time.Second) @@ -181,7 +181,7 @@ func firstBackupTest(t *testing.T, tabletType string) { func vtBackup(t *testing.T, initialBackup bool, restartBeforeBackup, disableRedoLog bool) *opentsdb.DataPointReader { mysqlSocket, err := os.CreateTemp("", "vtbackup_test_mysql.sock") - require.Nil(t, err) + require.NoError(t, err) defer os.Remove(mysqlSocket.Name()) // Prepare opentsdb stats file path. @@ -214,7 +214,7 @@ func vtBackup(t *testing.T, initialBackup bool, restartBeforeBackup, disableRedo log.Infof("starting backup tablet %s", time.Now()) err = localCluster.StartVtbackup(newInitDBFile, initialBackup, keyspaceName, shardName, cell, extraArgs...) - require.Nil(t, err) + require.NoError(t, err) f, err := os.OpenFile(statsPath, os.O_RDONLY, 0) require.NoError(t, err) @@ -223,7 +223,7 @@ func vtBackup(t *testing.T, initialBackup bool, restartBeforeBackup, disableRedo func verifyBackupCount(t *testing.T, shardKsName string, expected int) []string { backups, err := listBackups(shardKsName) - require.Nil(t, err) + require.NoError(t, err) assert.Equalf(t, expected, len(backups), "invalid number of backups") return backups } @@ -251,7 +251,7 @@ func listBackups(shardKsName string) ([]string, error) { func removeBackups(t *testing.T) { // Remove all the backups from the shard backups, err := listBackups(shardKsName) - require.Nil(t, err) + require.NoError(t, err) for _, backup := range backups { _, err := localCluster.VtctlProcess.ExecuteCommandWithOutput( "--backup_storage_implementation", "file", @@ -259,7 +259,7 @@ func removeBackups(t *testing.T) { path.Join(os.Getenv("VTDATAROOT"), "tmp", "backupstorage"), "RemoveBackup", shardKsName, backup, ) - require.Nil(t, err) + require.NoError(t, err) } } @@ -267,18 +267,18 @@ func initTablets(t *testing.T, startTablet bool, initShardPrimary bool) { // Initialize tablets for _, tablet := range []cluster.Vttablet{*primary, *replica1} { err := localCluster.InitTablet(&tablet, keyspaceName, shardName) - require.Nil(t, err) + require.NoError(t, err) if startTablet { err = tablet.VttabletProcess.Setup() - require.Nil(t, err) + require.NoError(t, err) } } if initShardPrimary { // choose primary and start replication err := localCluster.VtctldClientProcess.InitShardPrimary(keyspaceName, shardName, cell, primary.TabletUID) - require.Nil(t, err) + require.NoError(t, err) } } @@ -293,7 +293,7 @@ func restore(t *testing.T, tablet *cluster.Vttablet, tabletType string, waitForS tablet.VttabletProcess.ServingStatus = waitForState tablet.VttabletProcess.SupportsBackup = true err := tablet.VttabletProcess.Setup() - require.Nil(t, err) + require.NoError(t, err) } func resetTabletDirectory(t *testing.T, tablet cluster.Vttablet, initMysql bool) { @@ -302,11 +302,11 @@ func resetTabletDirectory(t *testing.T, tablet cluster.Vttablet, initMysql bool) // Teardown Tablet err := tablet.VttabletProcess.TearDown() - require.Nil(t, err) + require.NoError(t, err) // Shutdown Mysql err = tablet.MysqlctlProcess.Stop() - require.Nil(t, err) + require.NoError(t, err) // Clear out the previous data tablet.MysqlctlProcess.CleanupFiles(tablet.TabletUID) @@ -315,7 +315,7 @@ func resetTabletDirectory(t *testing.T, tablet cluster.Vttablet, initMysql bool) // Init the Mysql tablet.MysqlctlProcess.InitDBFile = newInitDBFile err = tablet.MysqlctlProcess.Start() - require.Nil(t, err) + require.NoError(t, err) } } @@ -323,24 +323,36 @@ func tearDown(t *testing.T, initMysql bool) { // reset replication for _, db := range []string{"_vt", "vt_insert_test"} { _, err := primary.VttabletProcess.QueryTablet(fmt.Sprintf("drop database if exists %s", db), keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) } caughtUp := waitForReplicationToCatchup([]cluster.Vttablet{*replica1, *replica2}) require.True(t, caughtUp, "Timed out waiting for all replicas to catch up") promoteCommands := []string{"STOP SLAVE", "RESET SLAVE ALL", "RESET MASTER"} - disableSemiSyncCommands := []string{"SET GLOBAL rpl_semi_sync_master_enabled = false", " SET GLOBAL rpl_semi_sync_slave_enabled = false"} + + disableSemiSyncCommandsSource := []string{"SET GLOBAL rpl_semi_sync_source_enabled = false", " SET GLOBAL rpl_semi_sync_replica_enabled = false"} + disableSemiSyncCommandsMaster := []string{"SET GLOBAL rpl_semi_sync_master_enabled = false", " SET GLOBAL rpl_semi_sync_slave_enabled = false"} + for _, tablet := range []cluster.Vttablet{*primary, *replica1, *replica2} { err := tablet.VttabletProcess.QueryTabletMultiple(promoteCommands, keyspaceName, true) - require.Nil(t, err) - err = tablet.VttabletProcess.QueryTabletMultiple(disableSemiSyncCommands, keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) + semisyncType, err := tablet.VttabletProcess.SemiSyncExtensionLoaded() + require.NoError(t, err) + + switch semisyncType { + case mysql.SemiSyncTypeSource: + err = tablet.VttabletProcess.QueryTabletMultiple(disableSemiSyncCommandsSource, keyspaceName, true) + require.NoError(t, err) + case mysql.SemiSyncTypeMaster: + err = tablet.VttabletProcess.QueryTabletMultiple(disableSemiSyncCommandsMaster, keyspaceName, true) + require.NoError(t, err) + } } for _, tablet := range []cluster.Vttablet{*primary, *replica1, *replica2} { resetTabletDirectory(t, tablet, initMysql) // DeleteTablet on a primary will cause tablet to shutdown, so should only call it after tablet is already shut down err := localCluster.VtctldClientProcess.ExecuteCommand("DeleteTablets", "--allow-primary", tablet.Alias) - require.Nil(t, err) + require.NoError(t, err) } } @@ -359,7 +371,7 @@ func verifyDisableEnableRedoLogs(ctx context.Context, t *testing.T, mysqlSocket // Check if server supports disable/enable redo log. qr, err := conn.ExecuteFetch("SELECT 1 FROM performance_schema.global_status WHERE variable_name = 'innodb_redo_log_enabled'", 1, false) - require.Nil(t, err) + require.NoError(t, err) // If not, there's nothing to test. if len(qr.Rows) == 0 { return @@ -368,7 +380,7 @@ func verifyDisableEnableRedoLogs(ctx context.Context, t *testing.T, mysqlSocket // MY-013600 // https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html#error_er_ib_wrn_redo_disabled qr, err = conn.ExecuteFetch("SELECT 1 FROM performance_schema.error_log WHERE error_code = 'MY-013600'", 1, false) - require.Nil(t, err) + require.NoError(t, err) if len(qr.Rows) != 1 { // Keep trying, possible we haven't disabled yet. continue @@ -377,7 +389,7 @@ func verifyDisableEnableRedoLogs(ctx context.Context, t *testing.T, mysqlSocket // MY-013601 // https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html#error_er_ib_wrn_redo_enabled qr, err = conn.ExecuteFetch("SELECT 1 FROM performance_schema.error_log WHERE error_code = 'MY-013601'", 1, false) - require.Nil(t, err) + require.NoError(t, err) if len(qr.Rows) != 1 { // Keep trying, possible we haven't disabled yet. continue diff --git a/go/test/endtoend/backup/vtctlbackup/backup_utils.go b/go/test/endtoend/backup/vtctlbackup/backup_utils.go index 14063a8daac..9227ce39516 100644 --- a/go/test/endtoend/backup/vtctlbackup/backup_utils.go +++ b/go/test/endtoend/backup/vtctlbackup/backup_utils.go @@ -34,9 +34,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "vitess.io/vitess/go/mysql/replication" - "vitess.io/vitess/go/json2" + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/utils" @@ -457,7 +457,7 @@ func primaryBackup(t *testing.T) { localCluster.VerifyBackupCount(t, shardKsName, 0) err = localCluster.VtctldClientProcess.ExecuteCommand("Backup", "--allow-primary", primary.Alias) - require.Nil(t, err) + require.NoError(t, err) // We'll restore this on the primary later to test restores using a backup timestamp firstBackupTimestamp := time.Now().UTC().Format(mysqlctl.BackupTimestampFormat) @@ -466,18 +466,18 @@ func primaryBackup(t *testing.T) { assert.Contains(t, backups[0], primary.Alias) _, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) restoreWaitForBackup(t, "replica", nil, true) err = replica2.VttabletProcess.WaitForTabletStatusesForTimeout([]string{"SERVING"}, timeout) - require.Nil(t, err) + require.NoError(t, err) // Verify that we have all the new data -- we should have 2 records now... // And only 1 record after we restore using the first backup timestamp cluster.VerifyRowsInTablet(t, replica2, keyspaceName, 2) err = localCluster.VtctldClientProcess.ExecuteCommand("Backup", "--allow-primary", primary.Alias) - require.Nil(t, err) + require.NoError(t, err) backups = localCluster.VerifyBackupCount(t, shardKsName, 2) assert.Contains(t, backups[1], primary.Alias) @@ -488,33 +488,33 @@ func primaryBackup(t *testing.T) { // data from after the older/first backup err = localCluster.VtctldClientProcess.ExecuteCommand("PlannedReparentShard", "--new-primary", replica2.Alias, shardKsName) - require.Nil(t, err) + require.NoError(t, err) // Delete the current primary tablet (replica2) so that the original primary tablet (primary) can be restored from the // older/first backup w/o it replicating the subsequent insert done after the first backup was taken err = localCluster.VtctldClientProcess.ExecuteCommand("DeleteTablets", "--allow-primary", replica2.Alias) - require.Nil(t, err) + require.NoError(t, err) err = replica2.VttabletProcess.TearDown() - require.Nil(t, err) + require.NoError(t, err) // Restore the older/first backup -- using the timestamp we saved -- on the original primary tablet (primary) err = localCluster.VtctldClientProcess.ExecuteCommand("RestoreFromBackup", "--backup-timestamp", firstBackupTimestamp, primary.Alias) - require.Nil(t, err) + require.NoError(t, err) verifyTabletRestoreStats(t, primary.VttabletProcess.GetVars()) // Re-init the shard -- making the original primary tablet (primary) primary again -- for subsequent tests err = localCluster.VtctldClientProcess.InitShardPrimary(keyspaceName, shardName, cell, primary.TabletUID) - require.Nil(t, err) + require.NoError(t, err) // Verify that we don't have the record created after the older/first backup cluster.VerifyRowsInTablet(t, primary, keyspaceName, 1) verifyAfterRemovingBackupNoBackupShouldBePresent(t, backups) - require.Nil(t, err) + require.NoError(t, err) _, err = primary.VttabletProcess.QueryTablet("DROP TABLE vt_insert_test", keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) } // Test a primary and replica from the same backup. @@ -527,18 +527,18 @@ func primaryReplicaSameBackup(t *testing.T) { // backup the replica err := localCluster.VtctldClientProcess.ExecuteCommand("Backup", replica1.Alias) - require.Nil(t, err) + require.NoError(t, err) verifyTabletBackupStats(t, replica1.VttabletProcess.GetVars()) // insert more data on the primary _, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) // now bring up the other replica, letting it restore from backup. restoreWaitForBackup(t, "replica", nil, true) err = replica2.VttabletProcess.WaitForTabletStatusesForTimeout([]string{"SERVING"}, timeout) - require.Nil(t, err) + require.NoError(t, err) // check the new replica has the data cluster.VerifyRowsInTablet(t, replica2, keyspaceName, 2) @@ -546,11 +546,11 @@ func primaryReplicaSameBackup(t *testing.T) { // Promote replica2 to primary err = localCluster.VtctldClientProcess.ExecuteCommand("PlannedReparentShard", "--new-primary", replica2.Alias, shardKsName) - require.Nil(t, err) + require.NoError(t, err) // insert more data on replica2 (current primary) _, err = replica2.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test3')", keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) // Force replica1 to restore from backup. verifyRestoreTablet(t, replica1, "SERVING") @@ -564,18 +564,18 @@ func primaryReplicaSameBackup(t *testing.T) { // // Take another backup on the replica. err = localCluster.VtctldClientProcess.ExecuteCommand("Backup", replica1.Alias) - require.Nil(t, err) + require.NoError(t, err) // Insert more data on replica2 (current primary). _, err = replica2.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test4')", keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) // Force replica1 to restore from backup. verifyRestoreTablet(t, replica1, "SERVING") cluster.VerifyRowsInTablet(t, replica1, keyspaceName, 4) err = replica2.VttabletProcess.TearDown() - require.Nil(t, err) + require.NoError(t, err) restartPrimaryAndReplica(t) } @@ -594,13 +594,13 @@ func primaryReplicaSameBackupModifiedCompressionEngine(t *testing.T) { // backup the replica err := localCluster.VtctldClientProcess.ExecuteCommand("Backup", replica1.Alias) - require.Nil(t, err) + require.NoError(t, err) verifyTabletBackupStats(t, replica1.VttabletProcess.GetVars()) // insert more data on the primary _, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) // now bring up the other replica, with change in compression engine // this is to verify that restore will read engine name from manifest instead of reading the new values @@ -612,7 +612,7 @@ func primaryReplicaSameBackupModifiedCompressionEngine(t *testing.T) { } restoreWaitForBackup(t, "replica", cDetails, false) err = replica2.VttabletProcess.WaitForTabletStatusesForTimeout([]string{"SERVING"}, timeout) - require.Nil(t, err) + require.NoError(t, err) // check the new replica has the data cluster.VerifyRowsInTablet(t, replica2, keyspaceName, 2) @@ -620,11 +620,11 @@ func primaryReplicaSameBackupModifiedCompressionEngine(t *testing.T) { // Promote replica2 to primary err = localCluster.VtctldClientProcess.ExecuteCommand("PlannedReparentShard", "--new-primary", replica2.Alias, shardKsName) - require.Nil(t, err) + require.NoError(t, err) // insert more data on replica2 (current primary) _, err = replica2.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test3')", keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) // Force replica1 to restore from backup. verifyRestoreTablet(t, replica1, "SERVING") @@ -635,18 +635,18 @@ func primaryReplicaSameBackupModifiedCompressionEngine(t *testing.T) { // Promote replica1 to primary err = localCluster.VtctldClientProcess.ExecuteCommand("PlannedReparentShard", "--new-primary", replica1.Alias, shardKsName) - require.Nil(t, err) + require.NoError(t, err) // Insert more data on replica1 (current primary). _, err = replica1.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test4')", keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) // wait for replica2 to catch up. cluster.VerifyRowsInTablet(t, replica2, keyspaceName, 4) // Now take replica2 backup with gzip (new compressor) err = localCluster.VtctldClientProcess.ExecuteCommand("Backup", replica2.Alias) - require.Nil(t, err) + require.NoError(t, err) verifyTabletBackupStats(t, replica2.VttabletProcess.GetVars()) @@ -654,7 +654,7 @@ func primaryReplicaSameBackupModifiedCompressionEngine(t *testing.T) { verifyRestoreTablet(t, replica2, "SERVING") cluster.VerifyRowsInTablet(t, replica2, keyspaceName, 4) err = replica2.VttabletProcess.TearDown() - require.Nil(t, err) + require.NoError(t, err) restartPrimaryAndReplica(t) } @@ -686,22 +686,22 @@ func testRestoreOldPrimary(t *testing.T, method restoreMethod) { // backup the replica err := localCluster.VtctldClientProcess.ExecuteCommand("Backup", replica1.Alias) - require.Nil(t, err) + require.NoError(t, err) verifyTabletBackupStats(t, replica1.VttabletProcess.GetVars()) // insert more data on the primary _, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) // reparent to replica1 err = localCluster.VtctldClientProcess.ExecuteCommand("PlannedReparentShard", "--new-primary", replica1.Alias, shardKsName) - require.Nil(t, err) + require.NoError(t, err) // insert more data to new primary _, err = replica1.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test3')", keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) // force the old primary to restore at the latest backup. method(t, primary) @@ -717,13 +717,13 @@ func testRestoreOldPrimary(t *testing.T, method restoreMethod) { func restoreUsingRestart(t *testing.T, tablet *cluster.Vttablet) { err := tablet.VttabletProcess.TearDown() - require.Nil(t, err) + require.NoError(t, err) verifyRestoreTablet(t, tablet, "SERVING") } func restoreInPlace(t *testing.T, tablet *cluster.Vttablet) { err := localCluster.VtctldClientProcess.ExecuteCommand("RestoreFromBackup", tablet.Alias) - require.Nil(t, err) + require.NoError(t, err) } func restartPrimaryAndReplica(t *testing.T) { @@ -748,12 +748,12 @@ func restartPrimaryAndReplica(t *testing.T) { } for _, tablet := range []*cluster.Vttablet{primary, replica1} { err := localCluster.InitTablet(tablet, keyspaceName, shardName) - require.Nil(t, err) + require.NoError(t, err) err = tablet.VttabletProcess.Setup() - require.Nil(t, err) + require.NoError(t, err) } err := localCluster.VtctldClientProcess.InitShardPrimary(keyspaceName, shardName, cell, primary.TabletUID) - require.Nil(t, err) + require.NoError(t, err) } func stopAllTablets() { @@ -793,23 +793,23 @@ func terminatedRestore(t *testing.T) { // backup the replica err := localCluster.VtctldClientProcess.ExecuteCommand("Backup", replica1.Alias) - require.Nil(t, err) + require.NoError(t, err) checkTabletType(t, replica1.Alias, topodata.TabletType_REPLICA) verifyTabletBackupStats(t, replica1.VttabletProcess.GetVars()) // insert more data on the primary _, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) // reparent to replica1 err = localCluster.VtctldClientProcess.ExecuteCommand("PlannedReparentShard", "--new-primary", replica1.Alias, shardKsName) - require.Nil(t, err) + require.NoError(t, err) // insert more data to new primary _, err = replica1.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test3')", keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) checkTabletType(t, primary.Alias, topodata.TabletType_REPLICA) terminateRestore(t) @@ -817,7 +817,7 @@ func terminatedRestore(t *testing.T) { checkTabletType(t, primary.Alias, topodata.TabletType_REPLICA) err = localCluster.VtctldClientProcess.ExecuteCommand("RestoreFromBackup", primary.Alias) - require.Nil(t, err) + require.NoError(t, err) checkTabletType(t, primary.Alias, topodata.TabletType_REPLICA) _, err = os.Stat(path.Join(primary.VttabletProcess.Directory, "restore_in_progress")) @@ -835,7 +835,7 @@ func checkTabletType(t *testing.T, alias string, tabletType topodata.TabletType) // for loop for 15 seconds to check if tablet type is correct for i := 0; i < 15; i++ { output, err := localCluster.VtctldClientProcess.ExecuteCommandWithOutput("GetTablet", alias) - require.Nil(t, err) + require.NoError(t, err) var tabletPB topodata.Tablet err = json2.Unmarshal([]byte(output), &tabletPB) require.NoError(t, err) @@ -859,7 +859,7 @@ func doNotDemoteNewlyPromotedPrimaryIfReparentingDuringBackup(t *testing.T) { // now backup err := localCluster.VtctldClientProcess.ExecuteCommand("Backup", replica1.Alias) - require.Nil(t, err) + require.NoError(t, err) }() // Perform a graceful reparent operation @@ -874,7 +874,7 @@ func doNotDemoteNewlyPromotedPrimaryIfReparentingDuringBackup(t *testing.T) { "--new-primary", replica1.Alias, fmt.Sprintf("%s/%s", keyspaceName, shardName), ) - require.Nil(t, err) + require.NoError(t, err) // check that we reparented checkTabletType(t, replica1.Alias, topodata.TabletType_PRIMARY) @@ -903,47 +903,47 @@ func vtctlBackup(t *testing.T, tabletType string) { // StopReplication on replica1. We verify that the replication works fine later in // verifyInitialReplication. So this will also check that VTOrc is running. err := localCluster.VtctldClientProcess.ExecuteCommand("StopReplication", replica1.Alias) - require.Nil(t, err) + require.NoError(t, err) verifyInitialReplication(t) restoreWaitForBackup(t, tabletType, nil, true) err = localCluster.VtctldClientProcess.ExecuteCommand("Backup", replica1.Alias) - require.Nil(t, err) + require.NoError(t, err) backups := localCluster.VerifyBackupCount(t, shardKsName, 1) verifyTabletBackupStats(t, replica1.VttabletProcess.GetVars()) _, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) err = replica2.VttabletProcess.WaitForTabletStatusesForTimeout([]string{"SERVING"}, 25*time.Second) - require.Nil(t, err) + require.NoError(t, err) cluster.VerifyRowsInTablet(t, replica2, keyspaceName, 2) verifyAfterRemovingBackupNoBackupShouldBePresent(t, backups) err = replica2.VttabletProcess.TearDown() - require.Nil(t, err) + require.NoError(t, err) err = localCluster.VtctldClientProcess.ExecuteCommand("DeleteTablets", replica2.Alias) - require.Nil(t, err) + require.NoError(t, err) _, err = primary.VttabletProcess.QueryTablet("DROP TABLE vt_insert_test", keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) } func InitTestTable(t *testing.T) { _, err := primary.VttabletProcess.QueryTablet("DROP TABLE IF EXISTS vt_insert_test", keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) _, err = primary.VttabletProcess.QueryTablet(vtInsertTest, keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) } // This will create schema in primary, insert some data to primary and verify the same data in replica func verifyInitialReplication(t *testing.T) { InitTestTable(t) _, err := primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test1')", keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) cluster.VerifyRowsInTablet(t, replica1, keyspaceName, 1) } @@ -968,12 +968,12 @@ func restoreWaitForBackup(t *testing.T, tabletType string, cDetails *Compression replica2.VttabletProcess.ExtraArgs = replicaTabletArgs replica2.VttabletProcess.ServingStatus = "" err := replica2.VttabletProcess.Setup() - require.Nil(t, err) + require.NoError(t, err) } func RemoveBackup(t *testing.T, backupName string) { err := localCluster.VtctldClientProcess.ExecuteCommand("RemoveBackup", shardKsName, backupName) - require.Nil(t, err) + require.NoError(t, err) } func verifyAfterRemovingBackupNoBackupShouldBePresent(t *testing.T, backups []string) { @@ -990,10 +990,10 @@ func verifyRestoreTablet(t *testing.T, tablet *cluster.Vttablet, status string) tablet.ValidateTabletRestart(t) tablet.VttabletProcess.ServingStatus = "" err := tablet.VttabletProcess.Setup() - require.Nil(t, err) + require.NoError(t, err) if status != "" { err = tablet.VttabletProcess.WaitForTabletStatusesForTimeout([]string{status}, 25*time.Second) - require.Nil(t, err) + require.NoError(t, err) } // We restart replication here because semi-sync will not be set correctly on tablet startup since // we deprecated enable_semi_sync. StartReplication RPC fixes the semi-sync settings by consulting the @@ -1011,12 +1011,24 @@ func verifyRestoreTablet(t *testing.T, tablet *cluster.Vttablet, status string) } func verifySemiSyncStatus(t *testing.T, vttablet *cluster.Vttablet, expectedStatus string) { - status, err := vttablet.VttabletProcess.GetDBVar("rpl_semi_sync_slave_enabled", keyspaceName) - require.Nil(t, err) - assert.Equal(t, status, expectedStatus) - status, err = vttablet.VttabletProcess.GetDBStatus("rpl_semi_sync_slave_status", keyspaceName) - require.Nil(t, err) - assert.Equal(t, status, expectedStatus) + semisyncType, err := vttablet.VttabletProcess.SemiSyncExtensionLoaded() + require.NoError(t, err) + switch semisyncType { + case mysql.SemiSyncTypeSource: + status, err := vttablet.VttabletProcess.GetDBVar("rpl_semi_sync_replica_enabled", keyspaceName) + require.NoError(t, err) + assert.Equal(t, status, expectedStatus) + status, err = vttablet.VttabletProcess.GetDBStatus("rpl_semi_sync_replica_status", keyspaceName) + require.NoError(t, err) + assert.Equal(t, status, expectedStatus) + case mysql.SemiSyncTypeMaster: + status, err := vttablet.VttabletProcess.GetDBVar("rpl_semi_sync_slave_enabled", keyspaceName) + require.NoError(t, err) + assert.Equal(t, status, expectedStatus) + status, err = vttablet.VttabletProcess.GetDBStatus("rpl_semi_sync_slave_status", keyspaceName) + require.NoError(t, err) + assert.Equal(t, status, expectedStatus) + } } func terminateBackup(t *testing.T, alias string) { @@ -1037,7 +1049,7 @@ func terminateBackup(t *testing.T, alias string) { reader, _ := tmpProcess.StdoutPipe() err := tmpProcess.Start() - require.Nil(t, err) + require.NoError(t, err) found := false scanner := bufio.NewScanner(reader) @@ -1071,7 +1083,7 @@ func terminateRestore(t *testing.T) { reader, _ := tmpProcess.StdoutPipe() err := tmpProcess.Start() - require.Nil(t, err) + require.NoError(t, err) found := false scanner := bufio.NewScanner(reader) @@ -1095,7 +1107,7 @@ func vtctlBackupReplicaNoDestroyNoWrites(t *testing.T, replicaIndex int) (backup numBackups := len(waitForNumBackups(t, -1)) err := localCluster.VtctldClientProcess.ExecuteCommand("Backup", replica.Alias) - require.Nil(t, err) + require.NoError(t, err) backups = waitForNumBackups(t, numBackups+1) require.NotEmpty(t, backups) diff --git a/go/test/endtoend/cluster/vttablet_process.go b/go/test/endtoend/cluster/vttablet_process.go index bd016b1c1c4..c2f20ec505a 100644 --- a/go/test/endtoend/cluster/vttablet_process.go +++ b/go/test/endtoend/cluster/vttablet_process.go @@ -457,6 +457,16 @@ func (vttablet *VttabletProcess) QueryTablet(query string, keyspace string, useD return executeQuery(conn, query) } +// SemiSyncExtensionLoaded returns what type of semi-sync extension is loaded +func (vttablet *VttabletProcess) SemiSyncExtensionLoaded() (mysql.SemiSyncType, error) { + conn, err := vttablet.TabletConn("", false) + if err != nil { + return mysql.SemiSyncTypeUnknown, err + } + defer conn.Close() + return conn.SemiSyncExtensionLoaded() +} + // QueryTabletMultiple lets you execute multiple queries -- without any // results -- against the tablet. func (vttablet *VttabletProcess) QueryTabletMultiple(queries []string, keyspace string, useDb bool) error { diff --git a/go/test/endtoend/reparent/emergencyreparent/ers_test.go b/go/test/endtoend/reparent/emergencyreparent/ers_test.go index f0cf4f2cd6a..95942b5f16b 100644 --- a/go/test/endtoend/reparent/emergencyreparent/ers_test.go +++ b/go/test/endtoend/reparent/emergencyreparent/ers_test.go @@ -24,6 +24,7 @@ import ( "github.com/stretchr/testify/require" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/reparent/utils" "vitess.io/vitess/go/vt/log" @@ -301,7 +302,14 @@ func TestPullFromRdonly(t *testing.T) { require.NoError(t, err) // stop semi-sync on the primary so that any transaction now added does not require an ack - utils.RunSQL(ctx, t, "SET GLOBAL rpl_semi_sync_master_enabled = false", tablets[0]) + semisyncType, err := utils.SemiSyncExtensionLoaded(ctx, tablets[0]) + require.NoError(t, err) + switch semisyncType { + case mysql.SemiSyncTypeSource: + utils.RunSQL(ctx, t, "SET GLOBAL rpl_semi_sync_source_enabled = false", tablets[0]) + case mysql.SemiSyncTypeMaster: + utils.RunSQL(ctx, t, "SET GLOBAL rpl_semi_sync_master_enabled = false", tablets[0]) + } // confirm that rdonly is able to replicate from our primary // This will also introduce a new transaction into the rdonly tablet which the other 2 replicas don't have diff --git a/go/test/endtoend/reparent/newfeaturetest/reparent_test.go b/go/test/endtoend/reparent/newfeaturetest/reparent_test.go index 44cd89a306a..9382a746385 100644 --- a/go/test/endtoend/reparent/newfeaturetest/reparent_test.go +++ b/go/test/endtoend/reparent/newfeaturetest/reparent_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/require" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/reparent/utils" ) @@ -131,8 +132,18 @@ func TestChangeTypeWithoutSemiSync(t *testing.T) { utils.RunSQL(ctx, t, "set global super_read_only = 0", tablet) } - utils.RunSQL(ctx, t, "UNINSTALL PLUGIN rpl_semi_sync_slave", tablet) - utils.RunSQL(ctx, t, "UNINSTALL PLUGIN rpl_semi_sync_master", tablet) + semisyncType, err := utils.SemiSyncExtensionLoaded(ctx, tablet) + require.NoError(t, err) + switch semisyncType { + case mysql.SemiSyncTypeSource: + utils.RunSQL(ctx, t, "UNINSTALL PLUGIN rpl_semi_sync_replica", tablet) + utils.RunSQL(ctx, t, "UNINSTALL PLUGIN rpl_semi_sync_source", tablet) + case mysql.SemiSyncTypeMaster: + utils.RunSQL(ctx, t, "UNINSTALL PLUGIN rpl_semi_sync_slave", tablet) + utils.RunSQL(ctx, t, "UNINSTALL PLUGIN rpl_semi_sync_master", tablet) + default: + require.Fail(t, "Unknown semi sync type") + } } utils.ValidateTopology(t, clusterInstance, true) diff --git a/go/test/endtoend/reparent/plannedreparent/reparent_test.go b/go/test/endtoend/reparent/plannedreparent/reparent_test.go index 07e488b52d0..de10e9921c1 100644 --- a/go/test/endtoend/reparent/plannedreparent/reparent_test.go +++ b/go/test/endtoend/reparent/plannedreparent/reparent_test.go @@ -26,11 +26,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - - "vitess.io/vitess/go/mysql/replication" - "google.golang.org/protobuf/encoding/protojson" + "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/reparent/utils" "vitess.io/vitess/go/vt/log" @@ -356,38 +354,38 @@ func TestChangeTypeSemiSync(t *testing.T) { // The flag is only an indication of the value to use next time // we turn replication on, so also check the status. // rdonly1 is not replicating, so its status is off. - utils.CheckDBvar(ctx, t, replica, "rpl_semi_sync_slave_enabled", "ON") - utils.CheckDBvar(ctx, t, rdonly1, "rpl_semi_sync_slave_enabled", "OFF") - utils.CheckDBvar(ctx, t, rdonly2, "rpl_semi_sync_slave_enabled", "OFF") - utils.CheckDBstatus(ctx, t, replica, "Rpl_semi_sync_slave_status", "ON") - utils.CheckDBstatus(ctx, t, rdonly1, "Rpl_semi_sync_slave_status", "OFF") - utils.CheckDBstatus(ctx, t, rdonly2, "Rpl_semi_sync_slave_status", "OFF") + utils.CheckSemisyncEnabled(ctx, t, replica, true) + utils.CheckSemisyncEnabled(ctx, t, rdonly1, false) + utils.CheckSemisyncEnabled(ctx, t, rdonly2, false) + utils.CheckSemisyncStatus(ctx, t, replica, true) + utils.CheckSemisyncStatus(ctx, t, rdonly1, false) + utils.CheckSemisyncStatus(ctx, t, rdonly2, false) // Change replica to rdonly while replicating, should turn off semi-sync, and restart replication. err = clusterInstance.VtctldClientProcess.ExecuteCommand("ChangeTabletType", replica.Alias, "rdonly") require.NoError(t, err) - utils.CheckDBvar(ctx, t, replica, "rpl_semi_sync_slave_enabled", "OFF") - utils.CheckDBstatus(ctx, t, replica, "Rpl_semi_sync_slave_status", "OFF") + utils.CheckSemisyncEnabled(ctx, t, replica, false) + utils.CheckSemisyncStatus(ctx, t, replica, false) // Change rdonly1 to replica, should turn on semi-sync, and not start replication. err = clusterInstance.VtctldClientProcess.ExecuteCommand("ChangeTabletType", rdonly1.Alias, "replica") require.NoError(t, err) - utils.CheckDBvar(ctx, t, rdonly1, "rpl_semi_sync_slave_enabled", "ON") - utils.CheckDBstatus(ctx, t, rdonly1, "Rpl_semi_sync_slave_status", "OFF") + utils.CheckSemisyncEnabled(ctx, t, rdonly1, true) + utils.CheckSemisyncStatus(ctx, t, rdonly1, false) utils.CheckReplicaStatus(ctx, t, rdonly1) // Now change from replica back to rdonly, make sure replication is still not enabled. err = clusterInstance.VtctldClientProcess.ExecuteCommand("ChangeTabletType", rdonly1.Alias, "rdonly") require.NoError(t, err) - utils.CheckDBvar(ctx, t, rdonly1, "rpl_semi_sync_slave_enabled", "OFF") - utils.CheckDBstatus(ctx, t, rdonly1, "Rpl_semi_sync_slave_status", "OFF") + utils.CheckSemisyncEnabled(ctx, t, rdonly1, false) + utils.CheckSemisyncStatus(ctx, t, rdonly1, false) utils.CheckReplicaStatus(ctx, t, rdonly1) // Change rdonly2 to replica, should turn on semi-sync, and restart replication. err = clusterInstance.VtctldClientProcess.ExecuteCommand("ChangeTabletType", rdonly2.Alias, "replica") require.NoError(t, err) - utils.CheckDBvar(ctx, t, rdonly2, "rpl_semi_sync_slave_enabled", "ON") - utils.CheckDBstatus(ctx, t, rdonly2, "Rpl_semi_sync_slave_status", "ON") + utils.CheckSemisyncEnabled(ctx, t, rdonly2, true) + utils.CheckSemisyncStatus(ctx, t, rdonly2, true) } // TestCrossCellDurability tests 2 things - diff --git a/go/test/endtoend/reparent/utils/utils.go b/go/test/endtoend/reparent/utils/utils.go index 91fa4c66e3c..39f29c789d7 100644 --- a/go/test/endtoend/reparent/utils/utils.go +++ b/go/test/endtoend/reparent/utils/utils.go @@ -476,9 +476,20 @@ func CheckInsertedValues(ctx context.Context, t *testing.T, tablet *cluster.Vtta } func CheckSemiSyncSetupCorrectly(t *testing.T, tablet *cluster.Vttablet, semiSyncVal string) { - dbVar, err := tablet.VttabletProcess.GetDBVar("rpl_semi_sync_slave_enabled", "") + semisyncType, err := tablet.VttabletProcess.SemiSyncExtensionLoaded() require.NoError(t, err) - require.Equal(t, semiSyncVal, dbVar) + switch semisyncType { + case mysql.SemiSyncTypeSource: + dbVar, err := tablet.VttabletProcess.GetDBVar("rpl_semi_sync_replica_enabled", "") + require.NoError(t, err) + require.Equal(t, semiSyncVal, dbVar) + case mysql.SemiSyncTypeMaster: + dbVar, err := tablet.VttabletProcess.GetDBVar("rpl_semi_sync_slave_enabled", "") + require.NoError(t, err) + require.Equal(t, semiSyncVal, dbVar) + default: + require.Fail(t, "Unknown semi sync type") + } } // CheckCountOfInsertedValues checks that the number of inserted values matches the given count on the given tablet @@ -679,30 +690,70 @@ func positionAtLeast(t *testing.T, tablet *cluster.Vttablet, a string, b string) return isAtleast } -// CheckDBvar checks the db var -func CheckDBvar(ctx context.Context, t *testing.T, tablet *cluster.Vttablet, variable string, status string) { +func CheckSemisyncEnabled(ctx context.Context, t *testing.T, tablet *cluster.Vttablet, enabled bool) { tabletParams := getMysqlConnParam(tablet) conn, err := mysql.Connect(ctx, &tabletParams) require.NoError(t, err) defer conn.Close() - qr := execute(t, conn, fmt.Sprintf("show variables like '%s'", variable)) - got := fmt.Sprintf("%v", qr.Rows) - want := fmt.Sprintf("[[VARCHAR(\"%s\") VARCHAR(\"%s\")]]", variable, status) - assert.Equal(t, want, got) + status := "OFF" + if enabled { + status = "ON" + } + + semisyncType, err := SemiSyncExtensionLoaded(ctx, tablet) + require.NoError(t, err) + switch semisyncType { + case mysql.SemiSyncTypeSource: + qr := execute(t, conn, "show variables like 'rpl_semi_sync_replica_enabled'") + got := fmt.Sprintf("%v", qr.Rows) + want := fmt.Sprintf("[[VARCHAR(\"%s\") VARCHAR(\"%s\")]]", "rpl_semi_sync_replica_enabled", status) + assert.Equal(t, want, got) + case mysql.SemiSyncTypeMaster: + qr := execute(t, conn, "show variables like 'rpl_semi_sync_slave_enabled'") + got := fmt.Sprintf("%v", qr.Rows) + want := fmt.Sprintf("[[VARCHAR(\"%s\") VARCHAR(\"%s\")]]", "rpl_semi_sync_slave_enabled", status) + assert.Equal(t, want, got) + } } -// CheckDBstatus checks the db status -func CheckDBstatus(ctx context.Context, t *testing.T, tablet *cluster.Vttablet, variable string, status string) { +func CheckSemisyncStatus(ctx context.Context, t *testing.T, tablet *cluster.Vttablet, enabled bool) { tabletParams := getMysqlConnParam(tablet) conn, err := mysql.Connect(ctx, &tabletParams) require.NoError(t, err) defer conn.Close() - qr := execute(t, conn, fmt.Sprintf("show status like '%s'", variable)) - got := fmt.Sprintf("%v", qr.Rows) - want := fmt.Sprintf("[[VARCHAR(\"%s\") VARCHAR(\"%s\")]]", variable, status) - assert.Equal(t, want, got) + status := "OFF" + if enabled { + status = "ON" + } + + semisyncType, err := SemiSyncExtensionLoaded(ctx, tablet) + require.NoError(t, err) + switch semisyncType { + case mysql.SemiSyncTypeSource: + qr := execute(t, conn, "show status like 'Rpl_semi_sync_replica_status'") + got := fmt.Sprintf("%v", qr.Rows) + want := fmt.Sprintf("[[VARCHAR(\"%s\") VARCHAR(\"%s\")]]", "Rpl_semi_sync_replica_status", status) + assert.Equal(t, want, got) + case mysql.SemiSyncTypeMaster: + qr := execute(t, conn, "show status like 'Rpl_semi_sync_slave_status'") + got := fmt.Sprintf("%v", qr.Rows) + want := fmt.Sprintf("[[VARCHAR(\"%s\") VARCHAR(\"%s\")]]", "Rpl_semi_sync_slave_status", status) + assert.Equal(t, want, got) + default: + assert.Fail(t, "unknown semi-sync type") + } +} + +func SemiSyncExtensionLoaded(ctx context.Context, tablet *cluster.Vttablet) (mysql.SemiSyncType, error) { + tabletParams := getMysqlConnParam(tablet) + conn, err := mysql.Connect(ctx, &tabletParams) + if err != nil { + return mysql.SemiSyncTypeUnknown, err + } + defer conn.Close() + return conn.SemiSyncExtensionLoaded() } // SetReplicationSourceFailed returns true if the given output from PRS had failed because the given tablet was diff --git a/go/test/endtoend/utils/mysql_test.go b/go/test/endtoend/utils/mysql_test.go index c8c09a3f979..f5984010924 100644 --- a/go/test/endtoend/utils/mysql_test.go +++ b/go/test/endtoend/utils/mysql_test.go @@ -373,17 +373,17 @@ func TestGetPreviousGTIDs(t *testing.T) { func TestSemiSyncEnabled(t *testing.T) { require.NotNil(t, mysqld) - err := mysqld.SetSemiSyncEnabled(true, false) + err := mysqld.SetSemiSyncEnabled(context.Background(), true, false) assert.NoError(t, err) - p, r := mysqld.SemiSyncEnabled() + p, r := mysqld.SemiSyncEnabled(context.Background()) assert.True(t, p) assert.False(t, r) - err = mysqld.SetSemiSyncEnabled(false, true) + err = mysqld.SetSemiSyncEnabled(context.Background(), false, true) assert.NoError(t, err) - p, r = mysqld.SemiSyncEnabled() + p, r = mysqld.SemiSyncEnabled(context.Background()) assert.False(t, p) assert.True(t, r) } diff --git a/go/test/endtoend/vtorc/general/vtorc_test.go b/go/test/endtoend/vtorc/general/vtorc_test.go index 38bc5f34df9..1adc091be59 100644 --- a/go/test/endtoend/vtorc/general/vtorc_test.go +++ b/go/test/endtoend/vtorc/general/vtorc_test.go @@ -25,6 +25,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/vtorc/utils" "vitess.io/vitess/go/vt/log" @@ -349,12 +350,44 @@ func TestSemiSync(t *testing.T) { // check that the replication is setup correctly utils.CheckReplication(t, newCluster, primary, []*cluster.Vttablet{rdonly, replica1, replica2}, 10*time.Second) - _, err := utils.RunSQL(t, "SET GLOBAL rpl_semi_sync_slave_enabled = 0", replica1, "") + semisyncType, err := utils.SemiSyncExtensionLoaded(t, replica1) require.NoError(t, err) - _, err = utils.RunSQL(t, "SET GLOBAL rpl_semi_sync_slave_enabled = 1", rdonly, "") + switch semisyncType { + case mysql.SemiSyncTypeSource: + _, err := utils.RunSQL(t, "SET GLOBAL rpl_semi_sync_replica_enabled = 0", replica1, "") + require.NoError(t, err) + case mysql.SemiSyncTypeMaster: + _, err := utils.RunSQL(t, "SET GLOBAL rpl_semi_sync_slave_enabled = 0", replica1, "") + require.NoError(t, err) + default: + require.Fail(t, "unexpected semi-sync type %v", semisyncType) + } + + semisyncType, err = utils.SemiSyncExtensionLoaded(t, rdonly) require.NoError(t, err) - _, err = utils.RunSQL(t, "SET GLOBAL rpl_semi_sync_master_enabled = 0", primary, "") + switch semisyncType { + case mysql.SemiSyncTypeSource: + _, err := utils.RunSQL(t, "SET GLOBAL rpl_semi_sync_replica_enabled = 0", rdonly, "") + require.NoError(t, err) + case mysql.SemiSyncTypeMaster: + _, err := utils.RunSQL(t, "SET GLOBAL rpl_semi_sync_slave_enabled = 0", rdonly, "") + require.NoError(t, err) + default: + require.Fail(t, "unexpected semi-sync type %v", semisyncType) + } + + semisyncType, err = utils.SemiSyncExtensionLoaded(t, primary) require.NoError(t, err) + switch semisyncType { + case mysql.SemiSyncTypeSource: + _, err := utils.RunSQL(t, "SET GLOBAL rpl_semi_sync_source_enabled = 0", primary, "") + require.NoError(t, err) + case mysql.SemiSyncTypeMaster: + _, err := utils.RunSQL(t, "SET GLOBAL rpl_semi_sync_master_enabled = 0", primary, "") + require.NoError(t, err) + default: + require.Fail(t, "unexpected semi-sync type %v", semisyncType) + } timeout := time.After(20 * time.Second) for { diff --git a/go/test/endtoend/vtorc/utils/utils.go b/go/test/endtoend/vtorc/utils/utils.go index 00f75740338..056276594d8 100644 --- a/go/test/endtoend/vtorc/utils/utils.go +++ b/go/test/endtoend/vtorc/utils/utils.go @@ -896,16 +896,40 @@ func AddSemiSyncKeyspace(t *testing.T, clusterInfo *VTOrcClusterInfo) { // IsSemiSyncSetupCorrectly checks that the semi-sync is setup correctly on the given vttablet func IsSemiSyncSetupCorrectly(t *testing.T, tablet *cluster.Vttablet, semiSyncVal string) bool { - dbVar, err := tablet.VttabletProcess.GetDBVar("rpl_semi_sync_slave_enabled", "") + semisyncType, err := tablet.VttabletProcess.SemiSyncExtensionLoaded() require.NoError(t, err) - return semiSyncVal == dbVar + switch semisyncType { + case mysql.SemiSyncTypeSource: + dbVar, err := tablet.VttabletProcess.GetDBVar("rpl_semi_sync_replica_enabled", "") + require.NoError(t, err) + return semiSyncVal == dbVar + case mysql.SemiSyncTypeMaster: + dbVar, err := tablet.VttabletProcess.GetDBVar("rpl_semi_sync_slave_enabled", "") + require.NoError(t, err) + return semiSyncVal == dbVar + default: + assert.Fail(t, "semisync extension not loaded") + return false + } } // IsPrimarySemiSyncSetupCorrectly checks that the priamry side semi-sync is setup correctly on the given vttablet func IsPrimarySemiSyncSetupCorrectly(t *testing.T, tablet *cluster.Vttablet, semiSyncVal string) bool { - dbVar, err := tablet.VttabletProcess.GetDBVar("rpl_semi_sync_master_enabled", "") + semisyncType, err := tablet.VttabletProcess.SemiSyncExtensionLoaded() require.NoError(t, err) - return semiSyncVal == dbVar + switch semisyncType { + case mysql.SemiSyncTypeSource: + dbVar, err := tablet.VttabletProcess.GetDBVar("rpl_semi_sync_source_enabled", "") + require.NoError(t, err) + return semiSyncVal == dbVar + case mysql.SemiSyncTypeMaster: + dbVar, err := tablet.VttabletProcess.GetDBVar("rpl_semi_sync_master_enabled", "") + require.NoError(t, err) + return semiSyncVal == dbVar + default: + assert.Fail(t, "semisync extension not loaded") + return false + } } // WaitForReadOnlyValue waits for the read_only global variable to reach the provided value @@ -1114,3 +1138,16 @@ func DisableGlobalRecoveries(t *testing.T, vtorc *cluster.VTOrcProcess) { assert.Equal(t, 200, status) assert.Equal(t, "Global recoveries disabled\n", resp) } + +// SemiSyncExtensionLoaded is used to check which semisync extension is loaded. +func SemiSyncExtensionLoaded(t *testing.T, tablet *cluster.Vttablet) (mysql.SemiSyncType, error) { + // Get Connection + tabletParams := getMysqlConnParam(tablet, "") + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + conn, err := mysql.Connect(ctx, &tabletParams) + require.Nil(t, err) + defer conn.Close() + + return conn.SemiSyncExtensionLoaded() +} diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go index fbc078c870b..0755d4c93c1 100644 --- a/go/vt/mysqlctl/builtinbackupengine.go +++ b/go/vt/mysqlctl/builtinbackupengine.go @@ -404,7 +404,7 @@ func (be *BuiltinBackupEngine) executeFullBackup(ctx context.Context, params Bac superReadOnly := true //nolint readOnly := true //nolint var replicationPosition replication.Position - semiSyncSource, semiSyncReplica := params.Mysqld.SemiSyncEnabled() + semiSyncSource, semiSyncReplica := params.Mysqld.SemiSyncEnabled(ctx) // See if we need to restart replication after backup. params.Logger.Infof("getting current replication status") @@ -519,7 +519,7 @@ func (be *BuiltinBackupEngine) executeFullBackup(ctx context.Context, params Bac // the plugin isn't even loaded, and the server variables don't exist. params.Logger.Infof("restoring semi-sync settings from before backup: primary=%v, replica=%v", semiSyncSource, semiSyncReplica) - err := params.Mysqld.SetSemiSyncEnabled(semiSyncSource, semiSyncReplica) + err := params.Mysqld.SetSemiSyncEnabled(ctx, semiSyncSource, semiSyncReplica) if err != nil { return backupResult, err } diff --git a/go/vt/mysqlctl/fakemysqldaemon.go b/go/vt/mysqlctl/fakemysqldaemon.go index e0d9a6c1252..c1be5f7ae24 100644 --- a/go/vt/mysqlctl/fakemysqldaemon.go +++ b/go/vt/mysqlctl/fakemysqldaemon.go @@ -26,6 +26,7 @@ import ( "sync/atomic" "time" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/mysql/fakesqldb" "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/sqltypes" @@ -171,9 +172,9 @@ type FakeMysqlDaemon struct { // FetchSuperQueryResults is used by FetchSuperQuery. FetchSuperQueryMap map[string]*sqltypes.Result - // SemiSyncPrimaryEnabled represents the state of rpl_semi_sync_master_enabled. + // SemiSyncPrimaryEnabled represents the state of rpl_semi_sync_source_enabled. SemiSyncPrimaryEnabled bool - // SemiSyncReplicaEnabled represents the state of rpl_semi_sync_slave_enabled. + // SemiSyncReplicaEnabled represents the state of rpl_semi_sync_replica_enabled. SemiSyncReplicaEnabled bool // TimeoutHook is a func that can be called at the beginning of @@ -667,19 +668,19 @@ func (fmd *FakeMysqlDaemon) GetAllPrivsConnection(ctx context.Context) (*dbconnp } // SetSemiSyncEnabled is part of the MysqlDaemon interface. -func (fmd *FakeMysqlDaemon) SetSemiSyncEnabled(primary, replica bool) error { +func (fmd *FakeMysqlDaemon) SetSemiSyncEnabled(ctx context.Context, primary, replica bool) error { fmd.SemiSyncPrimaryEnabled = primary fmd.SemiSyncReplicaEnabled = replica return nil } // SemiSyncEnabled is part of the MysqlDaemon interface. -func (fmd *FakeMysqlDaemon) SemiSyncEnabled() (primary, replica bool) { +func (fmd *FakeMysqlDaemon) SemiSyncEnabled(ctx context.Context) (primary, replica bool) { return fmd.SemiSyncPrimaryEnabled, fmd.SemiSyncReplicaEnabled } // SemiSyncStatus is part of the MysqlDaemon interface. -func (fmd *FakeMysqlDaemon) SemiSyncStatus() (bool, bool) { +func (fmd *FakeMysqlDaemon) SemiSyncStatus(ctx context.Context) (bool, bool) { // The fake assumes the status worked. if fmd.SemiSyncPrimaryEnabled { return true, false @@ -688,22 +689,22 @@ func (fmd *FakeMysqlDaemon) SemiSyncStatus() (bool, bool) { } // SemiSyncClients is part of the MysqlDaemon interface. -func (fmd *FakeMysqlDaemon) SemiSyncClients() uint32 { +func (fmd *FakeMysqlDaemon) SemiSyncClients(ctx context.Context) uint32 { return 0 } // SemiSyncExtensionLoaded is part of the MysqlDaemon interface. -func (fmd *FakeMysqlDaemon) SemiSyncExtensionLoaded() (bool, error) { - return true, nil +func (fmd *FakeMysqlDaemon) SemiSyncExtensionLoaded(ctx context.Context) (mysql.SemiSyncType, error) { + return mysql.SemiSyncTypeSource, nil } // SemiSyncSettings is part of the MysqlDaemon interface. -func (fmd *FakeMysqlDaemon) SemiSyncSettings() (timeout uint64, numReplicas uint32) { +func (fmd *FakeMysqlDaemon) SemiSyncSettings(ctx context.Context) (timeout uint64, numReplicas uint32) { return 10000000, 1 } // SemiSyncReplicationStatus is part of the MysqlDaemon interface. -func (fmd *FakeMysqlDaemon) SemiSyncReplicationStatus() (bool, error) { +func (fmd *FakeMysqlDaemon) SemiSyncReplicationStatus(ctx context.Context) (bool, error) { // The fake assumes the status worked. return fmd.SemiSyncReplicaEnabled, nil } diff --git a/go/vt/mysqlctl/mysql_daemon.go b/go/vt/mysqlctl/mysql_daemon.go index cb9882e7052..e396f128ee0 100644 --- a/go/vt/mysqlctl/mysql_daemon.go +++ b/go/vt/mysqlctl/mysql_daemon.go @@ -20,6 +20,7 @@ import ( "context" "time" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/dbconnpool" @@ -60,13 +61,13 @@ type MysqlDaemon interface { ReplicationStatus() (replication.ReplicationStatus, error) PrimaryStatus(ctx context.Context) (replication.PrimaryStatus, error) GetGTIDPurged(ctx context.Context) (replication.Position, error) - SetSemiSyncEnabled(source, replica bool) error - SemiSyncEnabled() (source, replica bool) - SemiSyncExtensionLoaded() (bool, error) - SemiSyncStatus() (source, replica bool) - SemiSyncClients() (count uint32) - SemiSyncSettings() (timeout uint64, numReplicas uint32) - SemiSyncReplicationStatus() (bool, error) + SetSemiSyncEnabled(ctx context.Context, source, replica bool) error + SemiSyncEnabled(ctx context.Context) (source, replica bool) + SemiSyncExtensionLoaded(ctx context.Context) (mysql.SemiSyncType, error) + SemiSyncStatus(ctx context.Context) (source, replica bool) + SemiSyncClients(ctx context.Context) (count uint32) + SemiSyncSettings(ctx context.Context) (timeout uint64, numReplicas uint32) + SemiSyncReplicationStatus(ctx context.Context) (bool, error) ResetReplicationParameters(ctx context.Context) error GetBinlogInformation(ctx context.Context) (binlogFormat string, logEnabled bool, logReplicaUpdate bool, binlogRowImage string, err error) GetGTIDMode(ctx context.Context) (gtidMode string, err error) diff --git a/go/vt/mysqlctl/mysqld.go b/go/vt/mysqlctl/mysqld.go index 4a18f0a9f3a..1ee7c72bf86 100644 --- a/go/vt/mysqlctl/mysqld.go +++ b/go/vt/mysqlctl/mysqld.go @@ -118,6 +118,8 @@ type Mysqld struct { mutex sync.Mutex onTermFuncs []func() cancelWaitCmd chan struct{} + + semiSyncType mysql.SemiSyncType } func init() { @@ -928,7 +930,13 @@ func (mysqld *Mysqld) getMycnfTemplate() string { log.Infof("this version of Vitess does not include built-in support for %v %v", mysqld.capabilities.flavor, mysqld.capabilities.version) } case 8: - versionConfig = config.MycnfMySQL80 + if mysqld.capabilities.version.Minor >= 4 { + versionConfig = config.MycnfMySQL84 + } else if mysqld.capabilities.version.Minor >= 1 || mysqld.capabilities.version.Patch >= 26 { + versionConfig = config.MycnfMySQL8026 + } else { + versionConfig = config.MycnfMySQL80 + } default: log.Infof("this version of Vitess does not include built-in support for %v %v", mysqld.capabilities.flavor, mysqld.capabilities.version) } diff --git a/go/vt/mysqlctl/replication.go b/go/vt/mysqlctl/replication.go index 8603b172606..47d0a727fed 100644 --- a/go/vt/mysqlctl/replication.go +++ b/go/vt/mysqlctl/replication.go @@ -612,9 +612,48 @@ func (mysqld *Mysqld) GetPreviousGTIDs(ctx context.Context, binlog string) (prev return previousGtids, nil } +var ErrNoSemiSync = errors.New("semi-sync plugin not loaded") + +func (mysqld *Mysqld) SemiSyncType(ctx context.Context) mysql.SemiSyncType { + if mysqld.semiSyncType == mysql.SemiSyncTypeUnknown { + mysqld.semiSyncType, _ = mysqld.SemiSyncExtensionLoaded(ctx) + } + return mysqld.semiSyncType +} + +func (mysqld *Mysqld) enableSemiSyncQuery(ctx context.Context) (string, error) { + switch mysqld.SemiSyncType(ctx) { + case mysql.SemiSyncTypeSource: + return "SET GLOBAL rpl_semi_sync_source_enabled = %v, GLOBAL rpl_semi_sync_replica_enabled = %v", nil + case mysql.SemiSyncTypeMaster: + return "SET GLOBAL rpl_semi_sync_master_enabled = %v, GLOBAL rpl_semi_sync_slave_enabled = %v", nil + } + return "", ErrNoSemiSync +} + +func (mysqld *Mysqld) semiSyncClientsQuery(ctx context.Context) (string, error) { + switch mysqld.SemiSyncType(ctx) { + case mysql.SemiSyncTypeSource: + return "SHOW STATUS LIKE 'Rpl_semi_sync_source_clients'", nil + case mysql.SemiSyncTypeMaster: + return "SHOW STATUS LIKE 'Rpl_semi_sync_master_clients'", nil + } + return "", ErrNoSemiSync +} + +func (mysqld *Mysqld) semiSyncReplicationStatusQuery(ctx context.Context) (string, error) { + switch mysqld.SemiSyncType(ctx) { + case mysql.SemiSyncTypeSource: + return "SHOW STATUS LIKE 'rpl_semi_sync_replica_status'", nil + case mysql.SemiSyncTypeMaster: + return "SHOW STATUS LIKE 'rpl_semi_sync_slave_status'", nil + } + return "", ErrNoSemiSync +} + // SetSemiSyncEnabled enables or disables semi-sync replication for // primary and/or replica mode. -func (mysqld *Mysqld) SetSemiSyncEnabled(primary, replica bool) error { +func (mysqld *Mysqld) SetSemiSyncEnabled(ctx context.Context, primary, replica bool) error { log.Infof("Setting semi-sync mode: primary=%v, replica=%v", primary, replica) // Convert bool to int. @@ -626,9 +665,11 @@ func (mysqld *Mysqld) SetSemiSyncEnabled(primary, replica bool) error { s = 1 } - err := mysqld.ExecuteSuperQuery(context.TODO(), fmt.Sprintf( - "SET GLOBAL rpl_semi_sync_master_enabled = %v, GLOBAL rpl_semi_sync_slave_enabled = %v", - p, s)) + query, err := mysqld.enableSemiSyncQuery(ctx) + if err != nil { + return err + } + err = mysqld.ExecuteSuperQuery(ctx, fmt.Sprintf(query, p, s)) if err != nil { return fmt.Errorf("can't set semi-sync mode: %v; make sure plugins are loaded in my.cnf", err) } @@ -637,30 +678,46 @@ func (mysqld *Mysqld) SetSemiSyncEnabled(primary, replica bool) error { // SemiSyncEnabled returns whether semi-sync is enabled for primary or replica. // If the semi-sync plugin is not loaded, we assume semi-sync is disabled. -func (mysqld *Mysqld) SemiSyncEnabled() (primary, replica bool) { - vars, err := mysqld.fetchVariables(context.TODO(), "rpl_semi_sync_%_enabled") +func (mysqld *Mysqld) SemiSyncEnabled(ctx context.Context) (primary, replica bool) { + vars, err := mysqld.fetchVariables(ctx, "rpl_semi_sync_%_enabled") if err != nil { return false, false } - primary = vars["rpl_semi_sync_master_enabled"] == "ON" - replica = vars["rpl_semi_sync_slave_enabled"] == "ON" + switch mysqld.SemiSyncType(ctx) { + case mysql.SemiSyncTypeSource: + primary = vars["rpl_semi_sync_source_enabled"] == "ON" + replica = vars["rpl_semi_sync_replica_enabled"] == "ON" + case mysql.SemiSyncTypeMaster: + primary = vars["rpl_semi_sync_master_enabled"] == "ON" + replica = vars["rpl_semi_sync_slave_enabled"] == "ON" + } return primary, replica } // SemiSyncStatus returns the current status of semi-sync for primary and replica. -func (mysqld *Mysqld) SemiSyncStatus() (primary, replica bool) { - vars, err := mysqld.fetchStatuses(context.TODO(), "Rpl_semi_sync_%_status") +func (mysqld *Mysqld) SemiSyncStatus(ctx context.Context) (primary, replica bool) { + vars, err := mysqld.fetchStatuses(ctx, "Rpl_semi_sync_%_status") if err != nil { return false, false } - primary = vars["Rpl_semi_sync_master_status"] == "ON" - replica = vars["Rpl_semi_sync_slave_status"] == "ON" + switch mysqld.SemiSyncType(ctx) { + case mysql.SemiSyncTypeSource: + primary = vars["Rpl_semi_sync_source_status"] == "ON" + replica = vars["Rpl_semi_sync_replica_status"] == "ON" + case mysql.SemiSyncTypeMaster: + primary = vars["Rpl_semi_sync_master_status"] == "ON" + replica = vars["Rpl_semi_sync_slave_status"] == "ON" + } return primary, replica } // SemiSyncClients returns the number of semi-sync clients for the primary. -func (mysqld *Mysqld) SemiSyncClients() uint32 { - qr, err := mysqld.FetchSuperQuery(context.TODO(), "SHOW STATUS LIKE 'Rpl_semi_sync_master_clients'") +func (mysqld *Mysqld) SemiSyncClients(ctx context.Context) uint32 { + query, err := mysqld.semiSyncClientsQuery(ctx) + if err != nil { + return 0 + } + qr, err := mysqld.FetchSuperQuery(ctx, query) if err != nil { return 0 } @@ -673,24 +730,35 @@ func (mysqld *Mysqld) SemiSyncClients() uint32 { } // SemiSyncSettings returns the settings of semi-sync which includes the timeout and the number of replicas to wait for. -func (mysqld *Mysqld) SemiSyncSettings() (timeout uint64, numReplicas uint32) { - vars, err := mysqld.fetchVariables(context.TODO(), "rpl_semi_sync_%") +func (mysqld *Mysqld) SemiSyncSettings(ctx context.Context) (timeout uint64, numReplicas uint32) { + vars, err := mysqld.fetchVariables(ctx, "rpl_semi_sync_%") if err != nil { return 0, 0 } - timeout, _ = strconv.ParseUint(vars["rpl_semi_sync_master_timeout"], 10, 64) - numReplicasUint, _ := strconv.ParseUint(vars["rpl_semi_sync_master_wait_for_slave_count"], 10, 32) + var numReplicasUint uint64 + switch mysqld.SemiSyncType(ctx) { + case mysql.SemiSyncTypeSource: + timeout, _ = strconv.ParseUint(vars["rpl_semi_sync_source_timeout"], 10, 64) + numReplicasUint, _ = strconv.ParseUint(vars["rpl_semi_sync_source_wait_for_replica_count"], 10, 32) + case mysql.SemiSyncTypeMaster: + timeout, _ = strconv.ParseUint(vars["rpl_semi_sync_master_timeout"], 10, 64) + numReplicasUint, _ = strconv.ParseUint(vars["rpl_semi_sync_master_wait_for_slave_count"], 10, 32) + } return timeout, uint32(numReplicasUint) } // SemiSyncReplicationStatus returns whether semi-sync is currently used by replication. -func (mysqld *Mysqld) SemiSyncReplicationStatus() (bool, error) { - qr, err := mysqld.FetchSuperQuery(context.TODO(), "SHOW STATUS LIKE 'rpl_semi_sync_slave_status'") +func (mysqld *Mysqld) SemiSyncReplicationStatus(ctx context.Context) (bool, error) { + query, err := mysqld.semiSyncReplicationStatusQuery(ctx) + if err != nil { + return false, err + } + qr, err := mysqld.FetchSuperQuery(ctx, query) if err != nil { return false, err } if len(qr.Rows) != 1 { - return false, errors.New("no rpl_semi_sync_slave_status variable in mysql") + return false, errors.New("no rpl_semi_sync_replica_status variable in mysql") } if qr.Rows[0][1].ToString() == "ON" { return true, nil @@ -699,14 +767,12 @@ func (mysqld *Mysqld) SemiSyncReplicationStatus() (bool, error) { } // SemiSyncExtensionLoaded returns whether semi-sync plugins are loaded. -func (mysqld *Mysqld) SemiSyncExtensionLoaded() (bool, error) { - qr, err := mysqld.FetchSuperQuery(context.Background(), "SELECT COUNT(*) > 0 AS plugin_loaded FROM information_schema.plugins WHERE plugin_name LIKE 'rpl_semi_sync%'") - if err != nil { - return false, err - } - pluginPresent, err := qr.Rows[0][0].ToBool() - if err != nil { - return false, err +func (mysqld *Mysqld) SemiSyncExtensionLoaded(ctx context.Context) (mysql.SemiSyncType, error) { + conn, connErr := getPoolReconnect(ctx, mysqld.dbaPool) + if connErr != nil { + return mysql.SemiSyncTypeUnknown, connErr } - return pluginPresent, nil + defer conn.Recycle() + + return conn.Conn.SemiSyncExtensionLoaded() } diff --git a/go/vt/mysqlctl/replication_test.go b/go/vt/mysqlctl/replication_test.go index d117a96ab89..cb84ea8ff04 100644 --- a/go/vt/mysqlctl/replication_test.go +++ b/go/vt/mysqlctl/replication_test.go @@ -26,6 +26,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/mysql/fakesqldb" "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/sqltypes" @@ -556,16 +557,16 @@ func TestSetSemiSyncEnabled(t *testing.T) { defer testMysqld.Close() // We expect this query to be executed - err := testMysqld.SetSemiSyncEnabled(true, true) - assert.ErrorContains(t, err, "SET GLOBAL rpl_semi_sync_master_enabled = 1, GLOBAL rpl_semi_sync_slave_enabled = 1") + err := testMysqld.SetSemiSyncEnabled(context.Background(), true, true) + assert.ErrorIs(t, err, ErrNoSemiSync) // We expect this query to be executed - err = testMysqld.SetSemiSyncEnabled(true, false) - assert.ErrorContains(t, err, "SET GLOBAL rpl_semi_sync_master_enabled = 1, GLOBAL rpl_semi_sync_slave_enabled = 0") + err = testMysqld.SetSemiSyncEnabled(context.Background(), true, false) + assert.ErrorIs(t, err, ErrNoSemiSync) // We expect this query to be executed - err = testMysqld.SetSemiSyncEnabled(false, true) - assert.ErrorContains(t, err, "SET GLOBAL rpl_semi_sync_master_enabled = 0, GLOBAL rpl_semi_sync_slave_enabled = 1") + err = testMysqld.SetSemiSyncEnabled(context.Background(), false, true) + assert.ErrorIs(t, err, ErrNoSemiSync) } func TestSemiSyncEnabled(t *testing.T) { @@ -577,12 +578,12 @@ func TestSemiSyncEnabled(t *testing.T) { dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") db.AddQuery("SELECT 1", &sqltypes.Result{}) - db.AddQuery("SHOW VARIABLES LIKE 'rpl_semi_sync_%_enabled'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1|field2", "varchar|varchar"), "rpl_semi_sync_master_enabled|OFF", "rpl_semi_sync_slave_enabled|ON")) + db.AddQuery("SHOW VARIABLES LIKE 'rpl_semi_sync_%_enabled'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1|field2", "varchar|varchar"), "rpl_semi_sync_source_enabled|OFF", "rpl_semi_sync_replica_enabled|ON")) testMysqld := NewMysqld(dbc) defer testMysqld.Close() - p, r := testMysqld.SemiSyncEnabled() + p, r := testMysqld.SemiSyncEnabled(context.Background()) assert.False(t, p) assert.True(t, r) } @@ -596,12 +597,13 @@ func TestSemiSyncStatus(t *testing.T) { dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") db.AddQuery("SELECT 1", &sqltypes.Result{}) - db.AddQuery("SHOW STATUS LIKE 'Rpl_semi_sync_%_status'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1|field2", "varchar|varchar"), "Rpl_semi_sync_master_status|ON", "Rpl_semi_sync_slave_status|OFF")) + db.AddQuery("SHOW VARIABLES LIKE 'rpl_semi_sync_%_enabled'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1|field2", "varchar|varchar"), "rpl_semi_sync_source_enabled|ON", "rpl_semi_sync_replica_enabled|ON")) + db.AddQuery("SHOW STATUS LIKE 'Rpl_semi_sync_%_status'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1|field2", "varchar|varchar"), "Rpl_semi_sync_source_status|ON", "Rpl_semi_sync_replica_status|OFF")) testMysqld := NewMysqld(dbc) defer testMysqld.Close() - p, r := testMysqld.SemiSyncStatus() + p, r := testMysqld.SemiSyncStatus(context.Background()) assert.True(t, p) assert.False(t, r) } @@ -615,12 +617,13 @@ func TestSemiSyncClients(t *testing.T) { dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") db.AddQuery("SELECT 1", &sqltypes.Result{}) - db.AddQuery("SHOW STATUS LIKE 'Rpl_semi_sync_master_clients'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1|field2", "varchar|uint64"), "val1|12")) + db.AddQuery("SHOW VARIABLES LIKE 'rpl_semi_sync_%_enabled'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1|field2", "varchar|varchar"), "rpl_semi_sync_source_enabled|ON", "rpl_semi_sync_replica_enabled|ON")) + db.AddQuery("SHOW STATUS LIKE 'Rpl_semi_sync_source_clients'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1|field2", "varchar|uint64"), "val1|12")) testMysqld := NewMysqld(dbc) defer testMysqld.Close() - res := testMysqld.SemiSyncClients() + res := testMysqld.SemiSyncClients(context.Background()) assert.Equal(t, uint32(12), res) } @@ -633,12 +636,13 @@ func TestSemiSyncSettings(t *testing.T) { dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") db.AddQuery("SELECT 1", &sqltypes.Result{}) - db.AddQuery("SHOW VARIABLES LIKE 'rpl_semi_sync_%'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1|field2", "varchar|uint64"), "rpl_semi_sync_master_timeout|123", "rpl_semi_sync_master_wait_for_slave_count|80")) + db.AddQuery("SHOW VARIABLES LIKE 'rpl_semi_sync_%_enabled'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1|field2", "varchar|varchar"), "rpl_semi_sync_source_enabled|ON", "rpl_semi_sync_replica_enabled|ON")) + db.AddQuery("SHOW VARIABLES LIKE 'rpl_semi_sync_%'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1|field2", "varchar|uint64"), "rpl_semi_sync_source_timeout|123", "rpl_semi_sync_source_wait_for_replica_count|80")) testMysqld := NewMysqld(dbc) defer testMysqld.Close() - timeout, replicas := testMysqld.SemiSyncSettings() + timeout, replicas := testMysqld.SemiSyncSettings(context.Background()) assert.Equal(t, uint64(123), timeout) assert.Equal(t, uint32(80), replicas) } @@ -652,18 +656,19 @@ func TestSemiSyncReplicationStatus(t *testing.T) { dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") db.AddQuery("SELECT 1", &sqltypes.Result{}) - db.AddQuery("SHOW STATUS LIKE 'rpl_semi_sync_slave_status'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1|field2", "varchar|uint64"), "rpl_semi_sync_slave_status|ON")) + db.AddQuery("SHOW VARIABLES LIKE 'rpl_semi_sync_%_enabled'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1|field2", "varchar|varchar"), "rpl_semi_sync_source_enabled|ON", "rpl_semi_sync_replica_enabled|ON")) + db.AddQuery("SHOW STATUS LIKE 'rpl_semi_sync_replica_status'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1|field2", "varchar|uint64"), "rpl_semi_sync_replica_status|ON")) testMysqld := NewMysqld(dbc) defer testMysqld.Close() - res, err := testMysqld.SemiSyncReplicationStatus() + res, err := testMysqld.SemiSyncReplicationStatus(context.Background()) assert.NoError(t, err) assert.True(t, res) - db.AddQuery("SHOW STATUS LIKE 'rpl_semi_sync_slave_status'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1|field2", "varchar|uint64"), "rpl_semi_sync_slave_status|OFF")) + db.AddQuery("SHOW STATUS LIKE 'rpl_semi_sync_replica_status'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1|field2", "varchar|uint64"), "rpl_semi_sync_replica_status|OFF")) - res, err = testMysqld.SemiSyncReplicationStatus() + res, err = testMysqld.SemiSyncReplicationStatus(context.Background()) assert.NoError(t, err) assert.False(t, res) } @@ -675,20 +680,22 @@ func TestSemiSyncExtensionLoaded(t *testing.T) { params := db.ConnParams() cp := *params dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() db.AddQuery("SELECT 1", &sqltypes.Result{}) - db.AddQuery("SELECT COUNT(*) > 0 AS plugin_loaded FROM information_schema.plugins WHERE plugin_name LIKE 'rpl_semi_sync%'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1", "int64"), "1")) + db.AddQuery("SHOW VARIABLES LIKE 'rpl_semi_sync_%_enabled'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1|field2", "varchar|varchar"), "rpl_semi_sync_source_enabled|ON", "rpl_semi_sync_replica_enabled|ON")) testMysqld := NewMysqld(dbc) defer testMysqld.Close() - res, err := testMysqld.SemiSyncExtensionLoaded() + res, err := testMysqld.SemiSyncExtensionLoaded(ctx) assert.NoError(t, err) - assert.True(t, res) + assert.Contains(t, []mysql.SemiSyncType{mysql.SemiSyncTypeSource, mysql.SemiSyncTypeMaster}, res) - db.AddQuery("SELECT COUNT(*) > 0 AS plugin_loaded FROM information_schema.plugins WHERE plugin_name LIKE 'rpl_semi_sync%'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1", "int64"), "0")) + db.AddQuery("SHOW VARIABLES LIKE 'rpl_semi_sync_%_enabled'", &sqltypes.Result{}) - res, err = testMysqld.SemiSyncExtensionLoaded() + res, err = testMysqld.SemiSyncExtensionLoaded(ctx) assert.NoError(t, err) - assert.False(t, res) + assert.Equal(t, mysql.SemiSyncTypeOff, res) } diff --git a/go/vt/vttablet/tabletmanager/rpc_actions.go b/go/vt/vttablet/tabletmanager/rpc_actions.go index 16d3513355c..bcf6aadf22e 100644 --- a/go/vt/vttablet/tabletmanager/rpc_actions.go +++ b/go/vt/vttablet/tabletmanager/rpc_actions.go @@ -21,6 +21,7 @@ import ( "fmt" "time" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/hook" @@ -82,7 +83,7 @@ func (tm *TabletManager) ChangeType(ctx context.Context, tabletType topodatapb.T } defer tm.unlock() - semiSyncAction, err := tm.convertBoolToSemiSyncAction(semiSync) + semiSyncAction, err := tm.convertBoolToSemiSyncAction(ctx, semiSync) if err != nil { return err } @@ -102,7 +103,7 @@ func (tm *TabletManager) changeTypeLocked(ctx context.Context, tabletType topoda } // Let's see if we need to fix semi-sync acking. - if err := tm.fixSemiSyncAndReplication(tm.Tablet().Type, semiSync); err != nil { + if err := tm.fixSemiSyncAndReplication(ctx, tm.Tablet().Type, semiSync); err != nil { return vterrors.Wrap(err, "fixSemiSyncAndReplication failed, may not ack correctly") } return nil @@ -147,19 +148,20 @@ func (tm *TabletManager) RunHealthCheck(ctx context.Context) { tm.QueryServiceControl.BroadcastHealth() } -func (tm *TabletManager) convertBoolToSemiSyncAction(semiSync bool) (SemiSyncAction, error) { - semiSyncExtensionLoaded, err := tm.MysqlDaemon.SemiSyncExtensionLoaded() +func (tm *TabletManager) convertBoolToSemiSyncAction(ctx context.Context, semiSync bool) (SemiSyncAction, error) { + semiSyncExtensionLoaded, err := tm.MysqlDaemon.SemiSyncExtensionLoaded(ctx) if err != nil { return SemiSyncActionNone, err } - if semiSyncExtensionLoaded { + switch semiSyncExtensionLoaded { + case mysql.SemiSyncTypeSource, mysql.SemiSyncTypeMaster: if semiSync { return SemiSyncActionSet, nil } else { return SemiSyncActionUnset, nil } - } else { + default: if semiSync { return SemiSyncActionNone, vterrors.VT09013() } else { diff --git a/go/vt/vttablet/tabletmanager/rpc_backup.go b/go/vt/vttablet/tabletmanager/rpc_backup.go index 9c361eac400..de49849ef53 100644 --- a/go/vt/vttablet/tabletmanager/rpc_backup.go +++ b/go/vt/vttablet/tabletmanager/rpc_backup.go @@ -136,7 +136,7 @@ func (tm *TabletManager) Backup(ctx context.Context, logger logutil.Logger, req } isSemiSync := reparentutil.IsReplicaSemiSync(durability, shardPrimary.Tablet, tabletInfo.Tablet) - semiSyncAction, err := tm.convertBoolToSemiSyncAction(isSemiSync) + semiSyncAction, err := tm.convertBoolToSemiSyncAction(bgCtx, isSemiSync) if err != nil { l.Errorf("Failed to convert bool to semisync action, error: %v", err) return diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go index ff8cb3a9b57..d202e13e2d9 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication.go @@ -134,16 +134,16 @@ func (tm *TabletManager) FullStatus(ctx context.Context) (*replicationdatapb.Ful } // Semi sync settings - "show global variables like 'rpl_semi_sync_%_enabled'" - primarySemiSync, replicaSemiSync := tm.MysqlDaemon.SemiSyncEnabled() + primarySemiSync, replicaSemiSync := tm.MysqlDaemon.SemiSyncEnabled(ctx) // Semi sync status - "show status like 'Rpl_semi_sync_%_status'" - primarySemiSyncStatus, replicaSemiSyncStatus := tm.MysqlDaemon.SemiSyncStatus() + primarySemiSyncStatus, replicaSemiSyncStatus := tm.MysqlDaemon.SemiSyncStatus(ctx) - // Semi sync clients count - "show status like 'semi_sync_primary_clients'" - semiSyncClients := tm.MysqlDaemon.SemiSyncClients() + // Semi sync clients count - "show status like 'semi_sync_source_clients'" + semiSyncClients := tm.MysqlDaemon.SemiSyncClients(ctx) // Semi sync settings - "show status like 'rpl_semi_sync_%' - semiSyncTimeout, semiSyncNumReplicas := tm.MysqlDaemon.SemiSyncSettings() + semiSyncTimeout, semiSyncNumReplicas := tm.MysqlDaemon.SemiSyncSettings(ctx) return &replicationdatapb.FullStatus{ ServerId: serverID, @@ -274,12 +274,12 @@ func (tm *TabletManager) StartReplication(ctx context.Context, semiSync bool) er } defer tm.unlock() - semiSyncAction, err := tm.convertBoolToSemiSyncAction(semiSync) + semiSyncAction, err := tm.convertBoolToSemiSyncAction(ctx, semiSync) if err != nil { return err } - if err := tm.fixSemiSync(tm.Tablet().Type, semiSyncAction); err != nil { + if err := tm.fixSemiSync(ctx, tm.Tablet().Type, semiSyncAction); err != nil { return err } return tm.MysqlDaemon.StartReplication(tm.hookExtraEnv()) @@ -363,7 +363,7 @@ func (tm *TabletManager) InitPrimary(ctx context.Context, semiSync bool) (string return "", err } - semiSyncAction, err := tm.convertBoolToSemiSyncAction(semiSync) + semiSyncAction, err := tm.convertBoolToSemiSyncAction(ctx, semiSync) if err != nil { return "", err } @@ -377,7 +377,7 @@ func (tm *TabletManager) InitPrimary(ctx context.Context, semiSync bool) (string // Enforce semi-sync after changing the tablet type to PRIMARY. Otherwise, the // primary will hang while trying to create the database. - if err := tm.fixSemiSync(topodatapb.TabletType_PRIMARY, semiSyncAction); err != nil { + if err := tm.fixSemiSync(ctx, topodatapb.TabletType_PRIMARY, semiSyncAction); err != nil { return "", err } @@ -413,7 +413,7 @@ func (tm *TabletManager) InitReplica(ctx context.Context, parent *topodatapb.Tab } defer tm.unlock() - semiSyncAction, err := tm.convertBoolToSemiSyncAction(semiSync) + semiSyncAction, err := tm.convertBoolToSemiSyncAction(ctx, semiSync) if err != nil { return err } @@ -443,7 +443,7 @@ func (tm *TabletManager) InitReplica(ctx context.Context, parent *topodatapb.Tab if tt == topodatapb.TabletType_PRIMARY { tt = topodatapb.TabletType_REPLICA } - if err := tm.fixSemiSync(tt, semiSyncAction); err != nil { + if err := tm.fixSemiSync(ctx, tt, semiSyncAction); err != nil { return err } @@ -547,15 +547,15 @@ func (tm *TabletManager) demotePrimary(ctx context.Context, revertPartialFailure // Here, we check if the primary side semi sync is enabled or not. If it isn't enabled then we do not need to take any action. // If it is enabled then we should turn it off and revert in case of failure. - if tm.isPrimarySideSemiSyncEnabled() { + if tm.isPrimarySideSemiSyncEnabled(ctx) { // If using semi-sync, we need to disable primary-side. - if err := tm.fixSemiSync(topodatapb.TabletType_REPLICA, SemiSyncActionSet); err != nil { + if err := tm.fixSemiSync(ctx, topodatapb.TabletType_REPLICA, SemiSyncActionSet); err != nil { return nil, err } defer func() { if finalErr != nil && revertPartialFailure && wasPrimary { // enable primary-side semi-sync again - if err := tm.fixSemiSync(topodatapb.TabletType_PRIMARY, SemiSyncActionSet); err != nil { + if err := tm.fixSemiSync(ctx, topodatapb.TabletType_PRIMARY, SemiSyncActionSet); err != nil { log.Warningf("fixSemiSync(PRIMARY) failed during revert: %v", err) } } @@ -583,13 +583,13 @@ func (tm *TabletManager) UndoDemotePrimary(ctx context.Context, semiSync bool) e } defer tm.unlock() - semiSyncAction, err := tm.convertBoolToSemiSyncAction(semiSync) + semiSyncAction, err := tm.convertBoolToSemiSyncAction(ctx, semiSync) if err != nil { return err } // If using semi-sync, we need to enable source-side. - if err := tm.fixSemiSync(topodatapb.TabletType_PRIMARY, semiSyncAction); err != nil { + if err := tm.fixSemiSync(ctx, topodatapb.TabletType_PRIMARY, semiSyncAction); err != nil { return err } @@ -655,7 +655,7 @@ func (tm *TabletManager) SetReplicationSource(ctx context.Context, parentAlias * } defer tm.unlock() - semiSyncAction, err := tm.convertBoolToSemiSyncAction(semiSync) + semiSyncAction, err := tm.convertBoolToSemiSyncAction(ctx, semiSync) if err != nil { return err } @@ -720,7 +720,7 @@ func (tm *TabletManager) setReplicationSourceLocked(ctx context.Context, parentA if tabletType == topodatapb.TabletType_PRIMARY { tabletType = topodatapb.TabletType_REPLICA } - if err := tm.fixSemiSync(tabletType, semiSync); err != nil { + if err := tm.fixSemiSync(ctx, tabletType, semiSync); err != nil { return err } // Update the primary/source address only if needed. @@ -909,13 +909,13 @@ func (tm *TabletManager) PromoteReplica(ctx context.Context, semiSync bool) (str return "", err } - semiSyncAction, err := tm.convertBoolToSemiSyncAction(semiSync) + semiSyncAction, err := tm.convertBoolToSemiSyncAction(ctx, semiSync) if err != nil { return "", err } // If using semi-sync, we need to enable it before going read-write. - if err := tm.fixSemiSync(topodatapb.TabletType_PRIMARY, semiSyncAction); err != nil { + if err := tm.fixSemiSync(ctx, topodatapb.TabletType_PRIMARY, semiSyncAction); err != nil { return "", err } @@ -934,27 +934,27 @@ func isPrimaryEligible(tabletType topodatapb.TabletType) bool { return false } -func (tm *TabletManager) fixSemiSync(tabletType topodatapb.TabletType, semiSync SemiSyncAction) error { +func (tm *TabletManager) fixSemiSync(ctx context.Context, tabletType topodatapb.TabletType, semiSync SemiSyncAction) error { switch semiSync { case SemiSyncActionNone: return nil case SemiSyncActionSet: // Always enable replica-side since it doesn't hurt to keep it on for a primary. // The primary-side needs to be off for a replica, or else it will get stuck. - return tm.MysqlDaemon.SetSemiSyncEnabled(tabletType == topodatapb.TabletType_PRIMARY, true) + return tm.MysqlDaemon.SetSemiSyncEnabled(ctx, tabletType == topodatapb.TabletType_PRIMARY, true) case SemiSyncActionUnset: - return tm.MysqlDaemon.SetSemiSyncEnabled(false, false) + return tm.MysqlDaemon.SetSemiSyncEnabled(ctx, false, false) default: return vterrors.Errorf(vtrpc.Code_INTERNAL, "Unknown SemiSyncAction - %v", semiSync) } } -func (tm *TabletManager) isPrimarySideSemiSyncEnabled() bool { - semiSyncEnabled, _ := tm.MysqlDaemon.SemiSyncEnabled() +func (tm *TabletManager) isPrimarySideSemiSyncEnabled(ctx context.Context) bool { + semiSyncEnabled, _ := tm.MysqlDaemon.SemiSyncEnabled(ctx) return semiSyncEnabled } -func (tm *TabletManager) fixSemiSyncAndReplication(tabletType topodatapb.TabletType, semiSync SemiSyncAction) error { +func (tm *TabletManager) fixSemiSyncAndReplication(ctx context.Context, tabletType topodatapb.TabletType, semiSync SemiSyncAction) error { if semiSync == SemiSyncActionNone { // Semi-sync handling is not required. return nil @@ -967,7 +967,7 @@ func (tm *TabletManager) fixSemiSyncAndReplication(tabletType topodatapb.TabletT return nil } - if err := tm.fixSemiSync(tabletType, semiSync); err != nil { + if err := tm.fixSemiSync(ctx, tabletType, semiSync); err != nil { return vterrors.Wrapf(err, "failed to fixSemiSync(%v)", tabletType) } @@ -986,7 +986,7 @@ func (tm *TabletManager) fixSemiSyncAndReplication(tabletType topodatapb.TabletT // shouldAck := semiSync == SemiSyncActionSet shouldAck := isPrimaryEligible(tabletType) - acking, err := tm.MysqlDaemon.SemiSyncReplicationStatus() + acking, err := tm.MysqlDaemon.SemiSyncReplicationStatus(ctx) if err != nil { return vterrors.Wrap(err, "failed to get SemiSyncReplicationStatus") } diff --git a/go/vt/vttablet/tabletmanager/tm_init.go b/go/vt/vttablet/tabletmanager/tm_init.go index 5a4d7bcdb16..4b71883974e 100644 --- a/go/vt/vttablet/tabletmanager/tm_init.go +++ b/go/vt/vttablet/tabletmanager/tm_init.go @@ -974,12 +974,12 @@ func (tm *TabletManager) initializeReplication(ctx context.Context, tabletType t tablet.Type = tabletType - semiSyncAction, err := tm.convertBoolToSemiSyncAction(reparentutil.IsReplicaSemiSync(durability, currentPrimary.Tablet, tablet)) + semiSyncAction, err := tm.convertBoolToSemiSyncAction(ctx, reparentutil.IsReplicaSemiSync(durability, currentPrimary.Tablet, tablet)) if err != nil { return nil, err } - if err := tm.fixSemiSync(tabletType, semiSyncAction); err != nil { + if err := tm.fixSemiSync(ctx, tabletType, semiSyncAction); err != nil { return nil, err } diff --git a/test/templates/cluster_endtoend_test.tpl b/test/templates/cluster_endtoend_test.tpl index ac423251977..35f987559b4 100644 --- a/test/templates/cluster_endtoend_test.tpl +++ b/test/templates/cluster_endtoend_test.tpl @@ -183,7 +183,7 @@ jobs: {{if .LimitResourceUsage}} # Increase our open file descriptor limit as we could hit this ulimit -n 65536 - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf innodb_buffer_pool_dump_at_shutdown=OFF innodb_buffer_pool_in_core_file=OFF innodb_buffer_pool_load_at_startup=OFF @@ -201,7 +201,7 @@ jobs: {{end}} {{if .EnableBinlogTransactionCompression}} - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf binlog-transaction-compression=ON EOF {{end}} diff --git a/test/vtop_example.sh b/test/vtop_example.sh index dfd3719e4da..5ff90a2be7e 100755 --- a/test/vtop_example.sh +++ b/test/vtop_example.sh @@ -36,7 +36,7 @@ unset VTROOT # ensure that the examples can run without VTROOT now. function checkSemiSyncSetup() { for vttablet in $(kubectl get pods --no-headers -o custom-columns=":metadata.name" | grep "vttablet") ; do echo "Checking semi-sync in $vttablet" - kubectl exec "$vttablet" -c mysqld -- mysql -S "/vt/socket/mysql.sock" -u root -e "show variables like 'rpl_semi_sync_slave_enabled'" | grep "OFF" + kubectl exec "$vttablet" -c mysqld -- mysql -S "/vt/socket/mysql.sock" -u root -e "show variables like 'rpl_semi_sync_replica_enabled'" | grep "OFF" if [ $? -ne 0 ]; then echo "Semi Sync setup on $vttablet" exit 1