From a2342d83ce68647005348d7ab72d4f9fa6b4a190 Mon Sep 17 00:00:00 2001 From: Dirkjan Bussink Date: Thu, 25 Apr 2024 11:40:08 +0200 Subject: [PATCH 1/2] Switch to use semisync source / replica plugins The plugins with the old terminology are deprecated and will be removed in the future. In MySQL 8.4.0 also the old terminology for regular replication is removed. This starts the move to use the new style. Since MySQL 8.0.26 the new semisync plugin is available, so we should start using it from that version on. This means we also need new default MySQL cnf files to set up things correctly. We add here a mysql8026.cnf and also a pre-emptive mysql84.cnf. The latter is needed to start the work in the future for MySQL 8.4.0. The main change here is that the deprecated mysql_native_password plugin needs to be explicitly enabled. Removing our usage of mysql_native_password is another separate significant effort. The main issue there is how we want to deal with certificates etc. which end up being required for replication if you want to use caching_sha2_password which adds a whole layer of complexity we've never had to deal with. Signed-off-by: Dirkjan Bussink --- ...ter_endtoend_ers_prs_newfeatures_heavy.yml | 2 +- .../cluster_endtoend_onlineddl_vrepl.yml | 2 +- ...luster_endtoend_onlineddl_vrepl_stress.yml | 2 +- ..._endtoend_onlineddl_vrepl_stress_suite.yml | 2 +- ...cluster_endtoend_onlineddl_vrepl_suite.yml | 2 +- .../cluster_endtoend_schemadiff_vrepl.yml | 2 +- ...dtoend_vreplication_across_db_versions.yml | 4 +- .../cluster_endtoend_vreplication_basic.yml | 4 +- ...luster_endtoend_vreplication_cellalias.yml | 4 +- ...dtoend_vreplication_foreign_key_stress.yml | 4 +- ...vreplication_migrate_vdiff2_convert_tz.yml | 4 +- ...ter_endtoend_vreplication_multi_tenant.yml | 4 +- ...ion_partial_movetables_and_materialize.yml | 4 +- .../cluster_endtoend_vreplication_v2.yml | 4 +- ..._vtctlbackup_sharded_clustertest_heavy.yml | 2 +- .../cluster_endtoend_vtgate_general_heavy.yml | 2 +- .../cluster_endtoend_vtgate_vindex_heavy.yml | 2 +- config/embed.go | 6 + config/mycnf/mysql8026.cnf | 37 +++++ config/mycnf/mysql84.cnf | 39 +++++ go/cmd/vtcombo/cli/main.go | 11 +- go/mysql/flavor_mariadb.go | 3 +- go/mysql/flavor_mysql.go | 11 +- go/mysql/replication.go | 25 ++- .../backup/vtbackup/backup_only_test.go | 68 ++++---- .../backup/vtctlbackup/backup_utils.go | 152 ++++++++++-------- go/test/endtoend/cluster/vttablet_process.go | 10 ++ .../reparent/emergencyreparent/ers_test.go | 10 +- .../reparent/newfeaturetest/reparent_test.go | 15 +- .../reparent/plannedreparent/reparent_test.go | 32 ++-- go/test/endtoend/reparent/utils/utils.go | 79 +++++++-- go/test/endtoend/utils/mysql_test.go | 8 +- go/test/endtoend/vtorc/general/vtorc_test.go | 39 ++++- go/test/endtoend/vtorc/utils/utils.go | 45 +++++- go/vt/mysqlctl/builtinbackupengine.go | 4 +- go/vt/mysqlctl/fakemysqldaemon.go | 21 +-- go/vt/mysqlctl/mysql_daemon.go | 15 +- go/vt/mysqlctl/mysqld.go | 10 +- go/vt/mysqlctl/replication.go | 126 +++++++++++---- go/vt/mysqlctl/replication_test.go | 55 ++++--- go/vt/vttablet/tabletmanager/rpc_actions.go | 14 +- go/vt/vttablet/tabletmanager/rpc_backup.go | 2 +- .../vttablet/tabletmanager/rpc_replication.go | 56 +++---- go/vt/vttablet/tabletmanager/tm_init.go | 4 +- test/templates/cluster_endtoend_test.tpl | 4 +- test/vtop_example.sh | 2 +- 46 files changed, 660 insertions(+), 293 deletions(-) create mode 100644 config/mycnf/mysql8026.cnf create mode 100644 config/mycnf/mysql84.cnf diff --git a/.github/workflows/cluster_endtoend_ers_prs_newfeatures_heavy.yml b/.github/workflows/cluster_endtoend_ers_prs_newfeatures_heavy.yml index 7759e23bac6..ed867793859 100644 --- a/.github/workflows/cluster_endtoend_ers_prs_newfeatures_heavy.yml +++ b/.github/workflows/cluster_endtoend_ers_prs_newfeatures_heavy.yml @@ -137,7 +137,7 @@ jobs: # Increase our open file descriptor limit as we could hit this ulimit -n 65536 - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf innodb_buffer_pool_dump_at_shutdown=OFF innodb_buffer_pool_in_core_file=OFF innodb_buffer_pool_load_at_startup=OFF diff --git a/.github/workflows/cluster_endtoend_onlineddl_vrepl.yml b/.github/workflows/cluster_endtoend_onlineddl_vrepl.yml index 962e377cb43..c87febaa26b 100644 --- a/.github/workflows/cluster_endtoend_onlineddl_vrepl.yml +++ b/.github/workflows/cluster_endtoend_onlineddl_vrepl.yml @@ -136,7 +136,7 @@ jobs: set -exo pipefail - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf binlog-transaction-compression=ON EOF diff --git a/.github/workflows/cluster_endtoend_onlineddl_vrepl_stress.yml b/.github/workflows/cluster_endtoend_onlineddl_vrepl_stress.yml index 5060e3ca491..0fc4c116c32 100644 --- a/.github/workflows/cluster_endtoend_onlineddl_vrepl_stress.yml +++ b/.github/workflows/cluster_endtoend_onlineddl_vrepl_stress.yml @@ -136,7 +136,7 @@ jobs: set -exo pipefail - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf binlog-transaction-compression=ON EOF diff --git a/.github/workflows/cluster_endtoend_onlineddl_vrepl_stress_suite.yml b/.github/workflows/cluster_endtoend_onlineddl_vrepl_stress_suite.yml index c051b21dace..67daba59719 100644 --- a/.github/workflows/cluster_endtoend_onlineddl_vrepl_stress_suite.yml +++ b/.github/workflows/cluster_endtoend_onlineddl_vrepl_stress_suite.yml @@ -136,7 +136,7 @@ jobs: set -exo pipefail - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf binlog-transaction-compression=ON EOF diff --git a/.github/workflows/cluster_endtoend_onlineddl_vrepl_suite.yml b/.github/workflows/cluster_endtoend_onlineddl_vrepl_suite.yml index 028b97b389d..a30def564fd 100644 --- a/.github/workflows/cluster_endtoend_onlineddl_vrepl_suite.yml +++ b/.github/workflows/cluster_endtoend_onlineddl_vrepl_suite.yml @@ -136,7 +136,7 @@ jobs: set -exo pipefail - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf binlog-transaction-compression=ON EOF diff --git a/.github/workflows/cluster_endtoend_schemadiff_vrepl.yml b/.github/workflows/cluster_endtoend_schemadiff_vrepl.yml index 2b6dc2c80b4..3c756a7f733 100644 --- a/.github/workflows/cluster_endtoend_schemadiff_vrepl.yml +++ b/.github/workflows/cluster_endtoend_schemadiff_vrepl.yml @@ -136,7 +136,7 @@ jobs: set -exo pipefail - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf binlog-transaction-compression=ON EOF diff --git a/.github/workflows/cluster_endtoend_vreplication_across_db_versions.yml b/.github/workflows/cluster_endtoend_vreplication_across_db_versions.yml index 9565145f985..679b40deb32 100644 --- a/.github/workflows/cluster_endtoend_vreplication_across_db_versions.yml +++ b/.github/workflows/cluster_endtoend_vreplication_across_db_versions.yml @@ -137,7 +137,7 @@ jobs: # Increase our open file descriptor limit as we could hit this ulimit -n 65536 - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf innodb_buffer_pool_dump_at_shutdown=OFF innodb_buffer_pool_in_core_file=OFF innodb_buffer_pool_load_at_startup=OFF @@ -153,7 +153,7 @@ jobs: slow-query-log=OFF EOF - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf binlog-transaction-compression=ON EOF diff --git a/.github/workflows/cluster_endtoend_vreplication_basic.yml b/.github/workflows/cluster_endtoend_vreplication_basic.yml index 6dfbf5a7453..952a148186b 100644 --- a/.github/workflows/cluster_endtoend_vreplication_basic.yml +++ b/.github/workflows/cluster_endtoend_vreplication_basic.yml @@ -137,7 +137,7 @@ jobs: # Increase our open file descriptor limit as we could hit this ulimit -n 65536 - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf innodb_buffer_pool_dump_at_shutdown=OFF innodb_buffer_pool_in_core_file=OFF innodb_buffer_pool_load_at_startup=OFF @@ -153,7 +153,7 @@ jobs: slow-query-log=OFF EOF - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf binlog-transaction-compression=ON EOF diff --git a/.github/workflows/cluster_endtoend_vreplication_cellalias.yml b/.github/workflows/cluster_endtoend_vreplication_cellalias.yml index b3158028a43..415c3e920e4 100644 --- a/.github/workflows/cluster_endtoend_vreplication_cellalias.yml +++ b/.github/workflows/cluster_endtoend_vreplication_cellalias.yml @@ -137,7 +137,7 @@ jobs: # Increase our open file descriptor limit as we could hit this ulimit -n 65536 - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf innodb_buffer_pool_dump_at_shutdown=OFF innodb_buffer_pool_in_core_file=OFF innodb_buffer_pool_load_at_startup=OFF @@ -153,7 +153,7 @@ jobs: slow-query-log=OFF EOF - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf binlog-transaction-compression=ON EOF diff --git a/.github/workflows/cluster_endtoend_vreplication_foreign_key_stress.yml b/.github/workflows/cluster_endtoend_vreplication_foreign_key_stress.yml index 891117e4f2d..53d44b16375 100644 --- a/.github/workflows/cluster_endtoend_vreplication_foreign_key_stress.yml +++ b/.github/workflows/cluster_endtoend_vreplication_foreign_key_stress.yml @@ -137,7 +137,7 @@ jobs: # Increase our open file descriptor limit as we could hit this ulimit -n 65536 - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf innodb_buffer_pool_dump_at_shutdown=OFF innodb_buffer_pool_in_core_file=OFF innodb_buffer_pool_load_at_startup=OFF @@ -153,7 +153,7 @@ jobs: slow-query-log=OFF EOF - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf binlog-transaction-compression=ON EOF diff --git a/.github/workflows/cluster_endtoend_vreplication_migrate_vdiff2_convert_tz.yml b/.github/workflows/cluster_endtoend_vreplication_migrate_vdiff2_convert_tz.yml index adc9cdf3851..12108c3b63e 100644 --- a/.github/workflows/cluster_endtoend_vreplication_migrate_vdiff2_convert_tz.yml +++ b/.github/workflows/cluster_endtoend_vreplication_migrate_vdiff2_convert_tz.yml @@ -137,7 +137,7 @@ jobs: # Increase our open file descriptor limit as we could hit this ulimit -n 65536 - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf innodb_buffer_pool_dump_at_shutdown=OFF innodb_buffer_pool_in_core_file=OFF innodb_buffer_pool_load_at_startup=OFF @@ -153,7 +153,7 @@ jobs: slow-query-log=OFF EOF - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf binlog-transaction-compression=ON EOF diff --git a/.github/workflows/cluster_endtoend_vreplication_multi_tenant.yml b/.github/workflows/cluster_endtoend_vreplication_multi_tenant.yml index fa81b22e903..623af8aa1d3 100644 --- a/.github/workflows/cluster_endtoend_vreplication_multi_tenant.yml +++ b/.github/workflows/cluster_endtoend_vreplication_multi_tenant.yml @@ -137,7 +137,7 @@ jobs: # Increase our open file descriptor limit as we could hit this ulimit -n 65536 - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf innodb_buffer_pool_dump_at_shutdown=OFF innodb_buffer_pool_in_core_file=OFF innodb_buffer_pool_load_at_startup=OFF @@ -153,7 +153,7 @@ jobs: slow-query-log=OFF EOF - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf binlog-transaction-compression=ON EOF diff --git a/.github/workflows/cluster_endtoend_vreplication_partial_movetables_and_materialize.yml b/.github/workflows/cluster_endtoend_vreplication_partial_movetables_and_materialize.yml index bce39eea479..3d71efc0e0e 100644 --- a/.github/workflows/cluster_endtoend_vreplication_partial_movetables_and_materialize.yml +++ b/.github/workflows/cluster_endtoend_vreplication_partial_movetables_and_materialize.yml @@ -137,7 +137,7 @@ jobs: # Increase our open file descriptor limit as we could hit this ulimit -n 65536 - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf innodb_buffer_pool_dump_at_shutdown=OFF innodb_buffer_pool_in_core_file=OFF innodb_buffer_pool_load_at_startup=OFF @@ -153,7 +153,7 @@ jobs: slow-query-log=OFF EOF - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf binlog-transaction-compression=ON EOF diff --git a/.github/workflows/cluster_endtoend_vreplication_v2.yml b/.github/workflows/cluster_endtoend_vreplication_v2.yml index b6cddc7a66c..37a881993f2 100644 --- a/.github/workflows/cluster_endtoend_vreplication_v2.yml +++ b/.github/workflows/cluster_endtoend_vreplication_v2.yml @@ -137,7 +137,7 @@ jobs: # Increase our open file descriptor limit as we could hit this ulimit -n 65536 - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf innodb_buffer_pool_dump_at_shutdown=OFF innodb_buffer_pool_in_core_file=OFF innodb_buffer_pool_load_at_startup=OFF @@ -153,7 +153,7 @@ jobs: slow-query-log=OFF EOF - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf binlog-transaction-compression=ON EOF diff --git a/.github/workflows/cluster_endtoend_vtctlbackup_sharded_clustertest_heavy.yml b/.github/workflows/cluster_endtoend_vtctlbackup_sharded_clustertest_heavy.yml index 8d7f8e88ec6..6e1086515b5 100644 --- a/.github/workflows/cluster_endtoend_vtctlbackup_sharded_clustertest_heavy.yml +++ b/.github/workflows/cluster_endtoend_vtctlbackup_sharded_clustertest_heavy.yml @@ -137,7 +137,7 @@ jobs: # Increase our open file descriptor limit as we could hit this ulimit -n 65536 - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf innodb_buffer_pool_dump_at_shutdown=OFF innodb_buffer_pool_in_core_file=OFF innodb_buffer_pool_load_at_startup=OFF diff --git a/.github/workflows/cluster_endtoend_vtgate_general_heavy.yml b/.github/workflows/cluster_endtoend_vtgate_general_heavy.yml index a1281c6b85e..1167977e6b0 100644 --- a/.github/workflows/cluster_endtoend_vtgate_general_heavy.yml +++ b/.github/workflows/cluster_endtoend_vtgate_general_heavy.yml @@ -137,7 +137,7 @@ jobs: # Increase our open file descriptor limit as we could hit this ulimit -n 65536 - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf innodb_buffer_pool_dump_at_shutdown=OFF innodb_buffer_pool_in_core_file=OFF innodb_buffer_pool_load_at_startup=OFF diff --git a/.github/workflows/cluster_endtoend_vtgate_vindex_heavy.yml b/.github/workflows/cluster_endtoend_vtgate_vindex_heavy.yml index ee14f5903a1..e921e1c5129 100644 --- a/.github/workflows/cluster_endtoend_vtgate_vindex_heavy.yml +++ b/.github/workflows/cluster_endtoend_vtgate_vindex_heavy.yml @@ -137,7 +137,7 @@ jobs: # Increase our open file descriptor limit as we could hit this ulimit -n 65536 - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf innodb_buffer_pool_dump_at_shutdown=OFF innodb_buffer_pool_in_core_file=OFF innodb_buffer_pool_load_at_startup=OFF diff --git a/config/embed.go b/config/embed.go index b2a9333e6de..6660e749aa6 100644 --- a/config/embed.go +++ b/config/embed.go @@ -16,3 +16,9 @@ var MycnfMySQL57 string //go:embed mycnf/mysql80.cnf var MycnfMySQL80 string + +//go:embed mycnf/mysql8026.cnf +var MycnfMySQL8026 string + +//go:embed mycnf/mysql84.cnf +var MycnfMySQL84 string diff --git a/config/mycnf/mysql8026.cnf b/config/mycnf/mysql8026.cnf new file mode 100644 index 00000000000..c7755be488f --- /dev/null +++ b/config/mycnf/mysql8026.cnf @@ -0,0 +1,37 @@ +# This file is auto-included when MySQL 8.0.26 or later is detected. + +# MySQL 8.0 enables binlog by default with sync_binlog and TABLE info repositories +# It does not enable GTIDs or enforced GTID consistency + +gtid_mode = ON +enforce_gtid_consistency +relay_log_recovery = 1 +binlog_expire_logs_seconds = 259200 + +# disable mysqlx +mysqlx = 0 + +# 8.0 changes the default auth-plugin to caching_sha2_password +default_authentication_plugin = mysql_native_password + +# Semi-sync replication is required for automated unplanned failover +# (when the primary goes away). Here we just load the plugin so it's +# available if desired, but it's disabled at startup. +# +# VTTablet will enable semi-sync at the proper time when replication is set up, +# or when a primary is promoted or demoted based on the durability policy configured. +plugin-load = rpl_semi_sync_source=semisync_source.so;rpl_semi_sync_replica=semisync_replica.so + +# MySQL 8.0.26 and later will not load plugins during --initialize +# which makes these options unknown. Prefixing with --loose +# tells the server it's fine if they are not understood. +loose_rpl_semi_sync_source_timeout = 1000000000000000000 +loose_rpl_semi_sync_source_wait_no_replica = 1 + +# In order to protect against any errand GTIDs we will start the mysql instance +# in super-read-only mode. +super-read-only + +# Replication parameters to ensure reparents are fast. +replica_net_timeout = 8 + diff --git a/config/mycnf/mysql84.cnf b/config/mycnf/mysql84.cnf new file mode 100644 index 00000000000..90d7a535602 --- /dev/null +++ b/config/mycnf/mysql84.cnf @@ -0,0 +1,39 @@ +# This file is auto-included when MySQL 8.4.0 or later is detected. + +# MySQL 8.0 enables binlog by default with sync_binlog and TABLE info repositories +# It does not enable GTIDs or enforced GTID consistency + +gtid_mode = ON +enforce_gtid_consistency +relay_log_recovery = 1 +binlog_expire_logs_seconds = 259200 + +# disable mysqlx +mysqlx = 0 + +# 8.4 changes the default auth-plugin to caching_sha2_password and +# disables mysql_native_password by default. +mysql_native_password = ON +default_authentication_plugin = mysql_native_password + +# Semi-sync replication is required for automated unplanned failover +# (when the primary goes away). Here we just load the plugin so it's +# available if desired, but it's disabled at startup. +# +# VTTablet will enable semi-sync at the proper time when replication is set up, +# or when a primary is promoted or demoted based on the durability policy configured. +plugin-load = rpl_semi_sync_source=semisync_source.so;rpl_semi_sync_replica=semisync_replica.so + +# MySQL 8.0.26 and later will not load plugins during --initialize +# which makes these options unknown. Prefixing with --loose +# tells the server it's fine if they are not understood. +loose_rpl_semi_sync_source_timeout = 1000000000000000000 +loose_rpl_semi_sync_source_wait_no_replica = 1 + +# In order to protect against any errand GTIDs we will start the mysql instance +# in super-read-only mode. +super-read-only + +# Replication parameters to ensure reparents are fast. +replica_net_timeout = 8 + diff --git a/go/cmd/vtcombo/cli/main.go b/go/cmd/vtcombo/cli/main.go index d18c22ddfbb..e30f809b96b 100644 --- a/go/cmd/vtcombo/cli/main.go +++ b/go/cmd/vtcombo/cli/main.go @@ -31,6 +31,7 @@ import ( "github.com/spf13/cobra" "vitess.io/vitess/go/acl" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/dbconfigs" @@ -216,7 +217,7 @@ func run(cmd *cobra.Command, args []string) (err error) { return err } servenv.OnClose(func() { - ctx, cancel := context.WithTimeout(context.Background(), mysqlctl.DefaultShutdownTimeout+10*time.Second) + ctx, cancel := context.WithTimeout(cmd.Context(), mysqlctl.DefaultShutdownTimeout+10*time.Second) defer cancel() mysqld.Shutdown(ctx, cnf, true, mysqlctl.DefaultShutdownTimeout) }) @@ -240,7 +241,7 @@ func run(cmd *cobra.Command, args []string) (err error) { if err != nil { // ensure we start mysql in the event we fail here if startMysql { - ctx, cancel := context.WithTimeout(context.Background(), mysqlctl.DefaultShutdownTimeout+10*time.Second) + ctx, cancel := context.WithTimeout(cmd.Context(), mysqlctl.DefaultShutdownTimeout+10*time.Second) defer cancel() mysqld.Shutdown(ctx, cnf, true, mysqlctl.DefaultShutdownTimeout) } @@ -387,11 +388,11 @@ func (mysqld *vtcomboMysqld) StopReplication(hookExtraEnv map[string]string) err } // SetSemiSyncEnabled implements the MysqlDaemon interface -func (mysqld *vtcomboMysqld) SetSemiSyncEnabled(source, replica bool) error { +func (mysqld *vtcomboMysqld) SetSemiSyncEnabled(ctx context.Context, source, replica bool) error { return nil } // SemiSyncExtensionLoaded implements the MysqlDaemon interface -func (mysqld *vtcomboMysqld) SemiSyncExtensionLoaded() (bool, error) { - return true, nil +func (mysqld *vtcomboMysqld) SemiSyncExtensionLoaded(ctx context.Context) (mysql.SemiSyncType, error) { + return mysql.SemiSyncTypeSource, nil } diff --git a/go/mysql/flavor_mariadb.go b/go/mysql/flavor_mariadb.go index 2f77a71ea00..e9552b6097d 100644 --- a/go/mysql/flavor_mariadb.go +++ b/go/mysql/flavor_mariadb.go @@ -144,7 +144,8 @@ func (mariadbFlavor) resetReplicationCommands(c *Conn) []string { "RESET MASTER", "SET GLOBAL gtid_slave_pos = ''", } - if c.SemiSyncExtensionLoaded() { + semisyncType, _ := c.SemiSyncExtensionLoaded() + if semisyncType == SemiSyncTypeMaster { resetCommands = append(resetCommands, "SET GLOBAL rpl_semi_sync_master_enabled = false, GLOBAL rpl_semi_sync_slave_enabled = false") // semi-sync will be enabled if needed when replica is started. } return resetCommands diff --git a/go/mysql/flavor_mysql.go b/go/mysql/flavor_mysql.go index f413c8ef1fb..05fbdd25e46 100644 --- a/go/mysql/flavor_mysql.go +++ b/go/mysql/flavor_mysql.go @@ -154,8 +154,17 @@ func (mysqlFlavor) resetReplicationCommands(c *Conn) []string { "RESET SLAVE ALL", // "ALL" makes it forget source host:port. "RESET MASTER", // This will also clear gtid_executed and gtid_purged. } - if c.SemiSyncExtensionLoaded() { + status, err := c.SemiSyncExtensionLoaded() + if err != nil { + return resetCommands + } + switch status { + case SemiSyncTypeSource: + resetCommands = append(resetCommands, "SET GLOBAL rpl_semi_sync_source_enabled = false, GLOBAL rpl_semi_sync_replica_enabled = false") // semi-sync will be enabled if needed when replica is started. + case SemiSyncTypeMaster: resetCommands = append(resetCommands, "SET GLOBAL rpl_semi_sync_master_enabled = false, GLOBAL rpl_semi_sync_slave_enabled = false") // semi-sync will be enabled if needed when replica is started. + default: + // Nothing to do. } return resetCommands } diff --git a/go/mysql/replication.go b/go/mysql/replication.go index 399698d6a2a..ec30301e328 100644 --- a/go/mysql/replication.go +++ b/go/mysql/replication.go @@ -138,12 +138,29 @@ func (c *Conn) WriteBinlogEvent(ev BinlogEvent, semiSyncEnabled bool) error { return nil } +type SemiSyncType int8 + +const ( + SemiSyncTypeUnknown SemiSyncType = iota + SemiSyncTypeOff + SemiSyncTypeSource + SemiSyncTypeMaster +) + // SemiSyncExtensionLoaded checks if the semisync extension has been loaded. // It should work for both MariaDB and MySQL. -func (c *Conn) SemiSyncExtensionLoaded() bool { - qr, err := c.ExecuteFetch("SHOW GLOBAL VARIABLES LIKE 'rpl_semi_sync%'", 10, false) +func (c *Conn) SemiSyncExtensionLoaded() (SemiSyncType, error) { + qr, err := c.ExecuteFetch("SHOW VARIABLES LIKE 'rpl_semi_sync_%_enabled'", 10, false) if err != nil { - return false + return SemiSyncTypeUnknown, err + } + for _, row := range qr.Rows { + if row[0].ToString() == "rpl_semi_sync_source_enabled" { + return SemiSyncTypeSource, nil + } + if row[0].ToString() == "rpl_semi_sync_master_enabled" { + return SemiSyncTypeMaster, nil + } } - return len(qr.Rows) >= 1 + return SemiSyncTypeOff, nil } diff --git a/go/test/endtoend/backup/vtbackup/backup_only_test.go b/go/test/endtoend/backup/vtbackup/backup_only_test.go index ecb04741d7b..a562df4e79f 100644 --- a/go/test/endtoend/backup/vtbackup/backup_only_test.go +++ b/go/test/endtoend/backup/vtbackup/backup_only_test.go @@ -85,10 +85,10 @@ func TestTabletInitialBackup(t *testing.T) { // TabletExternallyReparented err = localCluster.VtctldClientProcess.ExecuteCommand( "SetWritable", primary.Alias, "true") - require.Nil(t, err) + require.NoError(t, err) err = localCluster.VtctldClientProcess.ExecuteCommand( "TabletExternallyReparented", primary.Alias) - require.Nil(t, err) + require.NoError(t, err) restore(t, replica1, "replica", "SERVING") // Run the entire backup test @@ -134,14 +134,14 @@ func firstBackupTest(t *testing.T, tabletType string) { // Store initial backup counts backups, err := listBackups(shardKsName) - require.Nil(t, err) + require.NoError(t, err) // insert data on primary, wait for replica to get it _, err = primary.VttabletProcess.QueryTablet(vtInsertTest, keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) // Add a single row with value 'test1' to the primary tablet _, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test1')", keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) // Check that the specified tablet has the expected number of rows cluster.VerifyRowsInTablet(t, replica1, keyspaceName, 1) @@ -158,7 +158,7 @@ func firstBackupTest(t *testing.T, tabletType string) { // insert more data on the primary _, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) cluster.VerifyRowsInTablet(t, replica1, keyspaceName, 2) // even though we change the value of compression it won't affect @@ -168,7 +168,7 @@ func firstBackupTest(t *testing.T, tabletType string) { defer func() { mysqlctl.CompressionEngineName = "pgzip" }() // now bring up the other replica, letting it restore from backup. err = localCluster.InitTablet(replica2, keyspaceName, shardName) - require.Nil(t, err) + require.NoError(t, err) restore(t, replica2, "replica", "SERVING") // Replica2 takes time to serve. Sleeping for 5 sec. time.Sleep(5 * time.Second) @@ -181,7 +181,7 @@ func firstBackupTest(t *testing.T, tabletType string) { func vtBackup(t *testing.T, initialBackup bool, restartBeforeBackup, disableRedoLog bool) *opentsdb.DataPointReader { mysqlSocket, err := os.CreateTemp("", "vtbackup_test_mysql.sock") - require.Nil(t, err) + require.NoError(t, err) defer os.Remove(mysqlSocket.Name()) // Prepare opentsdb stats file path. @@ -214,7 +214,7 @@ func vtBackup(t *testing.T, initialBackup bool, restartBeforeBackup, disableRedo log.Infof("starting backup tablet %s", time.Now()) err = localCluster.StartVtbackup(newInitDBFile, initialBackup, keyspaceName, shardName, cell, extraArgs...) - require.Nil(t, err) + require.NoError(t, err) f, err := os.OpenFile(statsPath, os.O_RDONLY, 0) require.NoError(t, err) @@ -223,7 +223,7 @@ func vtBackup(t *testing.T, initialBackup bool, restartBeforeBackup, disableRedo func verifyBackupCount(t *testing.T, shardKsName string, expected int) []string { backups, err := listBackups(shardKsName) - require.Nil(t, err) + require.NoError(t, err) assert.Equalf(t, expected, len(backups), "invalid number of backups") return backups } @@ -251,7 +251,7 @@ func listBackups(shardKsName string) ([]string, error) { func removeBackups(t *testing.T) { // Remove all the backups from the shard backups, err := listBackups(shardKsName) - require.Nil(t, err) + require.NoError(t, err) for _, backup := range backups { _, err := localCluster.VtctlProcess.ExecuteCommandWithOutput( "--backup_storage_implementation", "file", @@ -259,7 +259,7 @@ func removeBackups(t *testing.T) { path.Join(os.Getenv("VTDATAROOT"), "tmp", "backupstorage"), "RemoveBackup", shardKsName, backup, ) - require.Nil(t, err) + require.NoError(t, err) } } @@ -267,18 +267,18 @@ func initTablets(t *testing.T, startTablet bool, initShardPrimary bool) { // Initialize tablets for _, tablet := range []cluster.Vttablet{*primary, *replica1} { err := localCluster.InitTablet(&tablet, keyspaceName, shardName) - require.Nil(t, err) + require.NoError(t, err) if startTablet { err = tablet.VttabletProcess.Setup() - require.Nil(t, err) + require.NoError(t, err) } } if initShardPrimary { // choose primary and start replication err := localCluster.VtctldClientProcess.InitShardPrimary(keyspaceName, shardName, cell, primary.TabletUID) - require.Nil(t, err) + require.NoError(t, err) } } @@ -293,7 +293,7 @@ func restore(t *testing.T, tablet *cluster.Vttablet, tabletType string, waitForS tablet.VttabletProcess.ServingStatus = waitForState tablet.VttabletProcess.SupportsBackup = true err := tablet.VttabletProcess.Setup() - require.Nil(t, err) + require.NoError(t, err) } func resetTabletDirectory(t *testing.T, tablet cluster.Vttablet, initMysql bool) { @@ -302,11 +302,11 @@ func resetTabletDirectory(t *testing.T, tablet cluster.Vttablet, initMysql bool) // Teardown Tablet err := tablet.VttabletProcess.TearDown() - require.Nil(t, err) + require.NoError(t, err) // Shutdown Mysql err = tablet.MysqlctlProcess.Stop() - require.Nil(t, err) + require.NoError(t, err) // Clear out the previous data tablet.MysqlctlProcess.CleanupFiles(tablet.TabletUID) @@ -315,7 +315,7 @@ func resetTabletDirectory(t *testing.T, tablet cluster.Vttablet, initMysql bool) // Init the Mysql tablet.MysqlctlProcess.InitDBFile = newInitDBFile err = tablet.MysqlctlProcess.Start() - require.Nil(t, err) + require.NoError(t, err) } } @@ -323,24 +323,36 @@ func tearDown(t *testing.T, initMysql bool) { // reset replication for _, db := range []string{"_vt", "vt_insert_test"} { _, err := primary.VttabletProcess.QueryTablet(fmt.Sprintf("drop database if exists %s", db), keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) } caughtUp := waitForReplicationToCatchup([]cluster.Vttablet{*replica1, *replica2}) require.True(t, caughtUp, "Timed out waiting for all replicas to catch up") promoteCommands := []string{"STOP SLAVE", "RESET SLAVE ALL", "RESET MASTER"} - disableSemiSyncCommands := []string{"SET GLOBAL rpl_semi_sync_master_enabled = false", " SET GLOBAL rpl_semi_sync_slave_enabled = false"} + + disableSemiSyncCommandsSource := []string{"SET GLOBAL rpl_semi_sync_source_enabled = false", " SET GLOBAL rpl_semi_sync_replica_enabled = false"} + disableSemiSyncCommandsMaster := []string{"SET GLOBAL rpl_semi_sync_master_enabled = false", " SET GLOBAL rpl_semi_sync_slave_enabled = false"} + for _, tablet := range []cluster.Vttablet{*primary, *replica1, *replica2} { err := tablet.VttabletProcess.QueryTabletMultiple(promoteCommands, keyspaceName, true) - require.Nil(t, err) - err = tablet.VttabletProcess.QueryTabletMultiple(disableSemiSyncCommands, keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) + semisyncType, err := tablet.VttabletProcess.SemiSyncExtensionLoaded() + require.NoError(t, err) + + switch semisyncType { + case mysql.SemiSyncTypeSource: + err = tablet.VttabletProcess.QueryTabletMultiple(disableSemiSyncCommandsSource, keyspaceName, true) + require.NoError(t, err) + case mysql.SemiSyncTypeMaster: + err = tablet.VttabletProcess.QueryTabletMultiple(disableSemiSyncCommandsMaster, keyspaceName, true) + require.NoError(t, err) + } } for _, tablet := range []cluster.Vttablet{*primary, *replica1, *replica2} { resetTabletDirectory(t, tablet, initMysql) // DeleteTablet on a primary will cause tablet to shutdown, so should only call it after tablet is already shut down err := localCluster.VtctldClientProcess.ExecuteCommand("DeleteTablets", "--allow-primary", tablet.Alias) - require.Nil(t, err) + require.NoError(t, err) } } @@ -359,7 +371,7 @@ func verifyDisableEnableRedoLogs(ctx context.Context, t *testing.T, mysqlSocket // Check if server supports disable/enable redo log. qr, err := conn.ExecuteFetch("SELECT 1 FROM performance_schema.global_status WHERE variable_name = 'innodb_redo_log_enabled'", 1, false) - require.Nil(t, err) + require.NoError(t, err) // If not, there's nothing to test. if len(qr.Rows) == 0 { return @@ -368,7 +380,7 @@ func verifyDisableEnableRedoLogs(ctx context.Context, t *testing.T, mysqlSocket // MY-013600 // https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html#error_er_ib_wrn_redo_disabled qr, err = conn.ExecuteFetch("SELECT 1 FROM performance_schema.error_log WHERE error_code = 'MY-013600'", 1, false) - require.Nil(t, err) + require.NoError(t, err) if len(qr.Rows) != 1 { // Keep trying, possible we haven't disabled yet. continue @@ -377,7 +389,7 @@ func verifyDisableEnableRedoLogs(ctx context.Context, t *testing.T, mysqlSocket // MY-013601 // https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html#error_er_ib_wrn_redo_enabled qr, err = conn.ExecuteFetch("SELECT 1 FROM performance_schema.error_log WHERE error_code = 'MY-013601'", 1, false) - require.Nil(t, err) + require.NoError(t, err) if len(qr.Rows) != 1 { // Keep trying, possible we haven't disabled yet. continue diff --git a/go/test/endtoend/backup/vtctlbackup/backup_utils.go b/go/test/endtoend/backup/vtctlbackup/backup_utils.go index 14063a8daac..9227ce39516 100644 --- a/go/test/endtoend/backup/vtctlbackup/backup_utils.go +++ b/go/test/endtoend/backup/vtctlbackup/backup_utils.go @@ -34,9 +34,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "vitess.io/vitess/go/mysql/replication" - "vitess.io/vitess/go/json2" + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/utils" @@ -457,7 +457,7 @@ func primaryBackup(t *testing.T) { localCluster.VerifyBackupCount(t, shardKsName, 0) err = localCluster.VtctldClientProcess.ExecuteCommand("Backup", "--allow-primary", primary.Alias) - require.Nil(t, err) + require.NoError(t, err) // We'll restore this on the primary later to test restores using a backup timestamp firstBackupTimestamp := time.Now().UTC().Format(mysqlctl.BackupTimestampFormat) @@ -466,18 +466,18 @@ func primaryBackup(t *testing.T) { assert.Contains(t, backups[0], primary.Alias) _, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) restoreWaitForBackup(t, "replica", nil, true) err = replica2.VttabletProcess.WaitForTabletStatusesForTimeout([]string{"SERVING"}, timeout) - require.Nil(t, err) + require.NoError(t, err) // Verify that we have all the new data -- we should have 2 records now... // And only 1 record after we restore using the first backup timestamp cluster.VerifyRowsInTablet(t, replica2, keyspaceName, 2) err = localCluster.VtctldClientProcess.ExecuteCommand("Backup", "--allow-primary", primary.Alias) - require.Nil(t, err) + require.NoError(t, err) backups = localCluster.VerifyBackupCount(t, shardKsName, 2) assert.Contains(t, backups[1], primary.Alias) @@ -488,33 +488,33 @@ func primaryBackup(t *testing.T) { // data from after the older/first backup err = localCluster.VtctldClientProcess.ExecuteCommand("PlannedReparentShard", "--new-primary", replica2.Alias, shardKsName) - require.Nil(t, err) + require.NoError(t, err) // Delete the current primary tablet (replica2) so that the original primary tablet (primary) can be restored from the // older/first backup w/o it replicating the subsequent insert done after the first backup was taken err = localCluster.VtctldClientProcess.ExecuteCommand("DeleteTablets", "--allow-primary", replica2.Alias) - require.Nil(t, err) + require.NoError(t, err) err = replica2.VttabletProcess.TearDown() - require.Nil(t, err) + require.NoError(t, err) // Restore the older/first backup -- using the timestamp we saved -- on the original primary tablet (primary) err = localCluster.VtctldClientProcess.ExecuteCommand("RestoreFromBackup", "--backup-timestamp", firstBackupTimestamp, primary.Alias) - require.Nil(t, err) + require.NoError(t, err) verifyTabletRestoreStats(t, primary.VttabletProcess.GetVars()) // Re-init the shard -- making the original primary tablet (primary) primary again -- for subsequent tests err = localCluster.VtctldClientProcess.InitShardPrimary(keyspaceName, shardName, cell, primary.TabletUID) - require.Nil(t, err) + require.NoError(t, err) // Verify that we don't have the record created after the older/first backup cluster.VerifyRowsInTablet(t, primary, keyspaceName, 1) verifyAfterRemovingBackupNoBackupShouldBePresent(t, backups) - require.Nil(t, err) + require.NoError(t, err) _, err = primary.VttabletProcess.QueryTablet("DROP TABLE vt_insert_test", keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) } // Test a primary and replica from the same backup. @@ -527,18 +527,18 @@ func primaryReplicaSameBackup(t *testing.T) { // backup the replica err := localCluster.VtctldClientProcess.ExecuteCommand("Backup", replica1.Alias) - require.Nil(t, err) + require.NoError(t, err) verifyTabletBackupStats(t, replica1.VttabletProcess.GetVars()) // insert more data on the primary _, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) // now bring up the other replica, letting it restore from backup. restoreWaitForBackup(t, "replica", nil, true) err = replica2.VttabletProcess.WaitForTabletStatusesForTimeout([]string{"SERVING"}, timeout) - require.Nil(t, err) + require.NoError(t, err) // check the new replica has the data cluster.VerifyRowsInTablet(t, replica2, keyspaceName, 2) @@ -546,11 +546,11 @@ func primaryReplicaSameBackup(t *testing.T) { // Promote replica2 to primary err = localCluster.VtctldClientProcess.ExecuteCommand("PlannedReparentShard", "--new-primary", replica2.Alias, shardKsName) - require.Nil(t, err) + require.NoError(t, err) // insert more data on replica2 (current primary) _, err = replica2.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test3')", keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) // Force replica1 to restore from backup. verifyRestoreTablet(t, replica1, "SERVING") @@ -564,18 +564,18 @@ func primaryReplicaSameBackup(t *testing.T) { // // Take another backup on the replica. err = localCluster.VtctldClientProcess.ExecuteCommand("Backup", replica1.Alias) - require.Nil(t, err) + require.NoError(t, err) // Insert more data on replica2 (current primary). _, err = replica2.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test4')", keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) // Force replica1 to restore from backup. verifyRestoreTablet(t, replica1, "SERVING") cluster.VerifyRowsInTablet(t, replica1, keyspaceName, 4) err = replica2.VttabletProcess.TearDown() - require.Nil(t, err) + require.NoError(t, err) restartPrimaryAndReplica(t) } @@ -594,13 +594,13 @@ func primaryReplicaSameBackupModifiedCompressionEngine(t *testing.T) { // backup the replica err := localCluster.VtctldClientProcess.ExecuteCommand("Backup", replica1.Alias) - require.Nil(t, err) + require.NoError(t, err) verifyTabletBackupStats(t, replica1.VttabletProcess.GetVars()) // insert more data on the primary _, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) // now bring up the other replica, with change in compression engine // this is to verify that restore will read engine name from manifest instead of reading the new values @@ -612,7 +612,7 @@ func primaryReplicaSameBackupModifiedCompressionEngine(t *testing.T) { } restoreWaitForBackup(t, "replica", cDetails, false) err = replica2.VttabletProcess.WaitForTabletStatusesForTimeout([]string{"SERVING"}, timeout) - require.Nil(t, err) + require.NoError(t, err) // check the new replica has the data cluster.VerifyRowsInTablet(t, replica2, keyspaceName, 2) @@ -620,11 +620,11 @@ func primaryReplicaSameBackupModifiedCompressionEngine(t *testing.T) { // Promote replica2 to primary err = localCluster.VtctldClientProcess.ExecuteCommand("PlannedReparentShard", "--new-primary", replica2.Alias, shardKsName) - require.Nil(t, err) + require.NoError(t, err) // insert more data on replica2 (current primary) _, err = replica2.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test3')", keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) // Force replica1 to restore from backup. verifyRestoreTablet(t, replica1, "SERVING") @@ -635,18 +635,18 @@ func primaryReplicaSameBackupModifiedCompressionEngine(t *testing.T) { // Promote replica1 to primary err = localCluster.VtctldClientProcess.ExecuteCommand("PlannedReparentShard", "--new-primary", replica1.Alias, shardKsName) - require.Nil(t, err) + require.NoError(t, err) // Insert more data on replica1 (current primary). _, err = replica1.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test4')", keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) // wait for replica2 to catch up. cluster.VerifyRowsInTablet(t, replica2, keyspaceName, 4) // Now take replica2 backup with gzip (new compressor) err = localCluster.VtctldClientProcess.ExecuteCommand("Backup", replica2.Alias) - require.Nil(t, err) + require.NoError(t, err) verifyTabletBackupStats(t, replica2.VttabletProcess.GetVars()) @@ -654,7 +654,7 @@ func primaryReplicaSameBackupModifiedCompressionEngine(t *testing.T) { verifyRestoreTablet(t, replica2, "SERVING") cluster.VerifyRowsInTablet(t, replica2, keyspaceName, 4) err = replica2.VttabletProcess.TearDown() - require.Nil(t, err) + require.NoError(t, err) restartPrimaryAndReplica(t) } @@ -686,22 +686,22 @@ func testRestoreOldPrimary(t *testing.T, method restoreMethod) { // backup the replica err := localCluster.VtctldClientProcess.ExecuteCommand("Backup", replica1.Alias) - require.Nil(t, err) + require.NoError(t, err) verifyTabletBackupStats(t, replica1.VttabletProcess.GetVars()) // insert more data on the primary _, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) // reparent to replica1 err = localCluster.VtctldClientProcess.ExecuteCommand("PlannedReparentShard", "--new-primary", replica1.Alias, shardKsName) - require.Nil(t, err) + require.NoError(t, err) // insert more data to new primary _, err = replica1.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test3')", keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) // force the old primary to restore at the latest backup. method(t, primary) @@ -717,13 +717,13 @@ func testRestoreOldPrimary(t *testing.T, method restoreMethod) { func restoreUsingRestart(t *testing.T, tablet *cluster.Vttablet) { err := tablet.VttabletProcess.TearDown() - require.Nil(t, err) + require.NoError(t, err) verifyRestoreTablet(t, tablet, "SERVING") } func restoreInPlace(t *testing.T, tablet *cluster.Vttablet) { err := localCluster.VtctldClientProcess.ExecuteCommand("RestoreFromBackup", tablet.Alias) - require.Nil(t, err) + require.NoError(t, err) } func restartPrimaryAndReplica(t *testing.T) { @@ -748,12 +748,12 @@ func restartPrimaryAndReplica(t *testing.T) { } for _, tablet := range []*cluster.Vttablet{primary, replica1} { err := localCluster.InitTablet(tablet, keyspaceName, shardName) - require.Nil(t, err) + require.NoError(t, err) err = tablet.VttabletProcess.Setup() - require.Nil(t, err) + require.NoError(t, err) } err := localCluster.VtctldClientProcess.InitShardPrimary(keyspaceName, shardName, cell, primary.TabletUID) - require.Nil(t, err) + require.NoError(t, err) } func stopAllTablets() { @@ -793,23 +793,23 @@ func terminatedRestore(t *testing.T) { // backup the replica err := localCluster.VtctldClientProcess.ExecuteCommand("Backup", replica1.Alias) - require.Nil(t, err) + require.NoError(t, err) checkTabletType(t, replica1.Alias, topodata.TabletType_REPLICA) verifyTabletBackupStats(t, replica1.VttabletProcess.GetVars()) // insert more data on the primary _, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) // reparent to replica1 err = localCluster.VtctldClientProcess.ExecuteCommand("PlannedReparentShard", "--new-primary", replica1.Alias, shardKsName) - require.Nil(t, err) + require.NoError(t, err) // insert more data to new primary _, err = replica1.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test3')", keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) checkTabletType(t, primary.Alias, topodata.TabletType_REPLICA) terminateRestore(t) @@ -817,7 +817,7 @@ func terminatedRestore(t *testing.T) { checkTabletType(t, primary.Alias, topodata.TabletType_REPLICA) err = localCluster.VtctldClientProcess.ExecuteCommand("RestoreFromBackup", primary.Alias) - require.Nil(t, err) + require.NoError(t, err) checkTabletType(t, primary.Alias, topodata.TabletType_REPLICA) _, err = os.Stat(path.Join(primary.VttabletProcess.Directory, "restore_in_progress")) @@ -835,7 +835,7 @@ func checkTabletType(t *testing.T, alias string, tabletType topodata.TabletType) // for loop for 15 seconds to check if tablet type is correct for i := 0; i < 15; i++ { output, err := localCluster.VtctldClientProcess.ExecuteCommandWithOutput("GetTablet", alias) - require.Nil(t, err) + require.NoError(t, err) var tabletPB topodata.Tablet err = json2.Unmarshal([]byte(output), &tabletPB) require.NoError(t, err) @@ -859,7 +859,7 @@ func doNotDemoteNewlyPromotedPrimaryIfReparentingDuringBackup(t *testing.T) { // now backup err := localCluster.VtctldClientProcess.ExecuteCommand("Backup", replica1.Alias) - require.Nil(t, err) + require.NoError(t, err) }() // Perform a graceful reparent operation @@ -874,7 +874,7 @@ func doNotDemoteNewlyPromotedPrimaryIfReparentingDuringBackup(t *testing.T) { "--new-primary", replica1.Alias, fmt.Sprintf("%s/%s", keyspaceName, shardName), ) - require.Nil(t, err) + require.NoError(t, err) // check that we reparented checkTabletType(t, replica1.Alias, topodata.TabletType_PRIMARY) @@ -903,47 +903,47 @@ func vtctlBackup(t *testing.T, tabletType string) { // StopReplication on replica1. We verify that the replication works fine later in // verifyInitialReplication. So this will also check that VTOrc is running. err := localCluster.VtctldClientProcess.ExecuteCommand("StopReplication", replica1.Alias) - require.Nil(t, err) + require.NoError(t, err) verifyInitialReplication(t) restoreWaitForBackup(t, tabletType, nil, true) err = localCluster.VtctldClientProcess.ExecuteCommand("Backup", replica1.Alias) - require.Nil(t, err) + require.NoError(t, err) backups := localCluster.VerifyBackupCount(t, shardKsName, 1) verifyTabletBackupStats(t, replica1.VttabletProcess.GetVars()) _, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) err = replica2.VttabletProcess.WaitForTabletStatusesForTimeout([]string{"SERVING"}, 25*time.Second) - require.Nil(t, err) + require.NoError(t, err) cluster.VerifyRowsInTablet(t, replica2, keyspaceName, 2) verifyAfterRemovingBackupNoBackupShouldBePresent(t, backups) err = replica2.VttabletProcess.TearDown() - require.Nil(t, err) + require.NoError(t, err) err = localCluster.VtctldClientProcess.ExecuteCommand("DeleteTablets", replica2.Alias) - require.Nil(t, err) + require.NoError(t, err) _, err = primary.VttabletProcess.QueryTablet("DROP TABLE vt_insert_test", keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) } func InitTestTable(t *testing.T) { _, err := primary.VttabletProcess.QueryTablet("DROP TABLE IF EXISTS vt_insert_test", keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) _, err = primary.VttabletProcess.QueryTablet(vtInsertTest, keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) } // This will create schema in primary, insert some data to primary and verify the same data in replica func verifyInitialReplication(t *testing.T) { InitTestTable(t) _, err := primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test1')", keyspaceName, true) - require.Nil(t, err) + require.NoError(t, err) cluster.VerifyRowsInTablet(t, replica1, keyspaceName, 1) } @@ -968,12 +968,12 @@ func restoreWaitForBackup(t *testing.T, tabletType string, cDetails *Compression replica2.VttabletProcess.ExtraArgs = replicaTabletArgs replica2.VttabletProcess.ServingStatus = "" err := replica2.VttabletProcess.Setup() - require.Nil(t, err) + require.NoError(t, err) } func RemoveBackup(t *testing.T, backupName string) { err := localCluster.VtctldClientProcess.ExecuteCommand("RemoveBackup", shardKsName, backupName) - require.Nil(t, err) + require.NoError(t, err) } func verifyAfterRemovingBackupNoBackupShouldBePresent(t *testing.T, backups []string) { @@ -990,10 +990,10 @@ func verifyRestoreTablet(t *testing.T, tablet *cluster.Vttablet, status string) tablet.ValidateTabletRestart(t) tablet.VttabletProcess.ServingStatus = "" err := tablet.VttabletProcess.Setup() - require.Nil(t, err) + require.NoError(t, err) if status != "" { err = tablet.VttabletProcess.WaitForTabletStatusesForTimeout([]string{status}, 25*time.Second) - require.Nil(t, err) + require.NoError(t, err) } // We restart replication here because semi-sync will not be set correctly on tablet startup since // we deprecated enable_semi_sync. StartReplication RPC fixes the semi-sync settings by consulting the @@ -1011,12 +1011,24 @@ func verifyRestoreTablet(t *testing.T, tablet *cluster.Vttablet, status string) } func verifySemiSyncStatus(t *testing.T, vttablet *cluster.Vttablet, expectedStatus string) { - status, err := vttablet.VttabletProcess.GetDBVar("rpl_semi_sync_slave_enabled", keyspaceName) - require.Nil(t, err) - assert.Equal(t, status, expectedStatus) - status, err = vttablet.VttabletProcess.GetDBStatus("rpl_semi_sync_slave_status", keyspaceName) - require.Nil(t, err) - assert.Equal(t, status, expectedStatus) + semisyncType, err := vttablet.VttabletProcess.SemiSyncExtensionLoaded() + require.NoError(t, err) + switch semisyncType { + case mysql.SemiSyncTypeSource: + status, err := vttablet.VttabletProcess.GetDBVar("rpl_semi_sync_replica_enabled", keyspaceName) + require.NoError(t, err) + assert.Equal(t, status, expectedStatus) + status, err = vttablet.VttabletProcess.GetDBStatus("rpl_semi_sync_replica_status", keyspaceName) + require.NoError(t, err) + assert.Equal(t, status, expectedStatus) + case mysql.SemiSyncTypeMaster: + status, err := vttablet.VttabletProcess.GetDBVar("rpl_semi_sync_slave_enabled", keyspaceName) + require.NoError(t, err) + assert.Equal(t, status, expectedStatus) + status, err = vttablet.VttabletProcess.GetDBStatus("rpl_semi_sync_slave_status", keyspaceName) + require.NoError(t, err) + assert.Equal(t, status, expectedStatus) + } } func terminateBackup(t *testing.T, alias string) { @@ -1037,7 +1049,7 @@ func terminateBackup(t *testing.T, alias string) { reader, _ := tmpProcess.StdoutPipe() err := tmpProcess.Start() - require.Nil(t, err) + require.NoError(t, err) found := false scanner := bufio.NewScanner(reader) @@ -1071,7 +1083,7 @@ func terminateRestore(t *testing.T) { reader, _ := tmpProcess.StdoutPipe() err := tmpProcess.Start() - require.Nil(t, err) + require.NoError(t, err) found := false scanner := bufio.NewScanner(reader) @@ -1095,7 +1107,7 @@ func vtctlBackupReplicaNoDestroyNoWrites(t *testing.T, replicaIndex int) (backup numBackups := len(waitForNumBackups(t, -1)) err := localCluster.VtctldClientProcess.ExecuteCommand("Backup", replica.Alias) - require.Nil(t, err) + require.NoError(t, err) backups = waitForNumBackups(t, numBackups+1) require.NotEmpty(t, backups) diff --git a/go/test/endtoend/cluster/vttablet_process.go b/go/test/endtoend/cluster/vttablet_process.go index bd016b1c1c4..c2f20ec505a 100644 --- a/go/test/endtoend/cluster/vttablet_process.go +++ b/go/test/endtoend/cluster/vttablet_process.go @@ -457,6 +457,16 @@ func (vttablet *VttabletProcess) QueryTablet(query string, keyspace string, useD return executeQuery(conn, query) } +// SemiSyncExtensionLoaded returns what type of semi-sync extension is loaded +func (vttablet *VttabletProcess) SemiSyncExtensionLoaded() (mysql.SemiSyncType, error) { + conn, err := vttablet.TabletConn("", false) + if err != nil { + return mysql.SemiSyncTypeUnknown, err + } + defer conn.Close() + return conn.SemiSyncExtensionLoaded() +} + // QueryTabletMultiple lets you execute multiple queries -- without any // results -- against the tablet. func (vttablet *VttabletProcess) QueryTabletMultiple(queries []string, keyspace string, useDb bool) error { diff --git a/go/test/endtoend/reparent/emergencyreparent/ers_test.go b/go/test/endtoend/reparent/emergencyreparent/ers_test.go index f0cf4f2cd6a..95942b5f16b 100644 --- a/go/test/endtoend/reparent/emergencyreparent/ers_test.go +++ b/go/test/endtoend/reparent/emergencyreparent/ers_test.go @@ -24,6 +24,7 @@ import ( "github.com/stretchr/testify/require" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/reparent/utils" "vitess.io/vitess/go/vt/log" @@ -301,7 +302,14 @@ func TestPullFromRdonly(t *testing.T) { require.NoError(t, err) // stop semi-sync on the primary so that any transaction now added does not require an ack - utils.RunSQL(ctx, t, "SET GLOBAL rpl_semi_sync_master_enabled = false", tablets[0]) + semisyncType, err := utils.SemiSyncExtensionLoaded(ctx, tablets[0]) + require.NoError(t, err) + switch semisyncType { + case mysql.SemiSyncTypeSource: + utils.RunSQL(ctx, t, "SET GLOBAL rpl_semi_sync_source_enabled = false", tablets[0]) + case mysql.SemiSyncTypeMaster: + utils.RunSQL(ctx, t, "SET GLOBAL rpl_semi_sync_master_enabled = false", tablets[0]) + } // confirm that rdonly is able to replicate from our primary // This will also introduce a new transaction into the rdonly tablet which the other 2 replicas don't have diff --git a/go/test/endtoend/reparent/newfeaturetest/reparent_test.go b/go/test/endtoend/reparent/newfeaturetest/reparent_test.go index 44cd89a306a..9382a746385 100644 --- a/go/test/endtoend/reparent/newfeaturetest/reparent_test.go +++ b/go/test/endtoend/reparent/newfeaturetest/reparent_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/require" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/reparent/utils" ) @@ -131,8 +132,18 @@ func TestChangeTypeWithoutSemiSync(t *testing.T) { utils.RunSQL(ctx, t, "set global super_read_only = 0", tablet) } - utils.RunSQL(ctx, t, "UNINSTALL PLUGIN rpl_semi_sync_slave", tablet) - utils.RunSQL(ctx, t, "UNINSTALL PLUGIN rpl_semi_sync_master", tablet) + semisyncType, err := utils.SemiSyncExtensionLoaded(ctx, tablet) + require.NoError(t, err) + switch semisyncType { + case mysql.SemiSyncTypeSource: + utils.RunSQL(ctx, t, "UNINSTALL PLUGIN rpl_semi_sync_replica", tablet) + utils.RunSQL(ctx, t, "UNINSTALL PLUGIN rpl_semi_sync_source", tablet) + case mysql.SemiSyncTypeMaster: + utils.RunSQL(ctx, t, "UNINSTALL PLUGIN rpl_semi_sync_slave", tablet) + utils.RunSQL(ctx, t, "UNINSTALL PLUGIN rpl_semi_sync_master", tablet) + default: + require.Fail(t, "Unknown semi sync type") + } } utils.ValidateTopology(t, clusterInstance, true) diff --git a/go/test/endtoend/reparent/plannedreparent/reparent_test.go b/go/test/endtoend/reparent/plannedreparent/reparent_test.go index 07e488b52d0..de10e9921c1 100644 --- a/go/test/endtoend/reparent/plannedreparent/reparent_test.go +++ b/go/test/endtoend/reparent/plannedreparent/reparent_test.go @@ -26,11 +26,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - - "vitess.io/vitess/go/mysql/replication" - "google.golang.org/protobuf/encoding/protojson" + "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/reparent/utils" "vitess.io/vitess/go/vt/log" @@ -356,38 +354,38 @@ func TestChangeTypeSemiSync(t *testing.T) { // The flag is only an indication of the value to use next time // we turn replication on, so also check the status. // rdonly1 is not replicating, so its status is off. - utils.CheckDBvar(ctx, t, replica, "rpl_semi_sync_slave_enabled", "ON") - utils.CheckDBvar(ctx, t, rdonly1, "rpl_semi_sync_slave_enabled", "OFF") - utils.CheckDBvar(ctx, t, rdonly2, "rpl_semi_sync_slave_enabled", "OFF") - utils.CheckDBstatus(ctx, t, replica, "Rpl_semi_sync_slave_status", "ON") - utils.CheckDBstatus(ctx, t, rdonly1, "Rpl_semi_sync_slave_status", "OFF") - utils.CheckDBstatus(ctx, t, rdonly2, "Rpl_semi_sync_slave_status", "OFF") + utils.CheckSemisyncEnabled(ctx, t, replica, true) + utils.CheckSemisyncEnabled(ctx, t, rdonly1, false) + utils.CheckSemisyncEnabled(ctx, t, rdonly2, false) + utils.CheckSemisyncStatus(ctx, t, replica, true) + utils.CheckSemisyncStatus(ctx, t, rdonly1, false) + utils.CheckSemisyncStatus(ctx, t, rdonly2, false) // Change replica to rdonly while replicating, should turn off semi-sync, and restart replication. err = clusterInstance.VtctldClientProcess.ExecuteCommand("ChangeTabletType", replica.Alias, "rdonly") require.NoError(t, err) - utils.CheckDBvar(ctx, t, replica, "rpl_semi_sync_slave_enabled", "OFF") - utils.CheckDBstatus(ctx, t, replica, "Rpl_semi_sync_slave_status", "OFF") + utils.CheckSemisyncEnabled(ctx, t, replica, false) + utils.CheckSemisyncStatus(ctx, t, replica, false) // Change rdonly1 to replica, should turn on semi-sync, and not start replication. err = clusterInstance.VtctldClientProcess.ExecuteCommand("ChangeTabletType", rdonly1.Alias, "replica") require.NoError(t, err) - utils.CheckDBvar(ctx, t, rdonly1, "rpl_semi_sync_slave_enabled", "ON") - utils.CheckDBstatus(ctx, t, rdonly1, "Rpl_semi_sync_slave_status", "OFF") + utils.CheckSemisyncEnabled(ctx, t, rdonly1, true) + utils.CheckSemisyncStatus(ctx, t, rdonly1, false) utils.CheckReplicaStatus(ctx, t, rdonly1) // Now change from replica back to rdonly, make sure replication is still not enabled. err = clusterInstance.VtctldClientProcess.ExecuteCommand("ChangeTabletType", rdonly1.Alias, "rdonly") require.NoError(t, err) - utils.CheckDBvar(ctx, t, rdonly1, "rpl_semi_sync_slave_enabled", "OFF") - utils.CheckDBstatus(ctx, t, rdonly1, "Rpl_semi_sync_slave_status", "OFF") + utils.CheckSemisyncEnabled(ctx, t, rdonly1, false) + utils.CheckSemisyncStatus(ctx, t, rdonly1, false) utils.CheckReplicaStatus(ctx, t, rdonly1) // Change rdonly2 to replica, should turn on semi-sync, and restart replication. err = clusterInstance.VtctldClientProcess.ExecuteCommand("ChangeTabletType", rdonly2.Alias, "replica") require.NoError(t, err) - utils.CheckDBvar(ctx, t, rdonly2, "rpl_semi_sync_slave_enabled", "ON") - utils.CheckDBstatus(ctx, t, rdonly2, "Rpl_semi_sync_slave_status", "ON") + utils.CheckSemisyncEnabled(ctx, t, rdonly2, true) + utils.CheckSemisyncStatus(ctx, t, rdonly2, true) } // TestCrossCellDurability tests 2 things - diff --git a/go/test/endtoend/reparent/utils/utils.go b/go/test/endtoend/reparent/utils/utils.go index 91fa4c66e3c..39f29c789d7 100644 --- a/go/test/endtoend/reparent/utils/utils.go +++ b/go/test/endtoend/reparent/utils/utils.go @@ -476,9 +476,20 @@ func CheckInsertedValues(ctx context.Context, t *testing.T, tablet *cluster.Vtta } func CheckSemiSyncSetupCorrectly(t *testing.T, tablet *cluster.Vttablet, semiSyncVal string) { - dbVar, err := tablet.VttabletProcess.GetDBVar("rpl_semi_sync_slave_enabled", "") + semisyncType, err := tablet.VttabletProcess.SemiSyncExtensionLoaded() require.NoError(t, err) - require.Equal(t, semiSyncVal, dbVar) + switch semisyncType { + case mysql.SemiSyncTypeSource: + dbVar, err := tablet.VttabletProcess.GetDBVar("rpl_semi_sync_replica_enabled", "") + require.NoError(t, err) + require.Equal(t, semiSyncVal, dbVar) + case mysql.SemiSyncTypeMaster: + dbVar, err := tablet.VttabletProcess.GetDBVar("rpl_semi_sync_slave_enabled", "") + require.NoError(t, err) + require.Equal(t, semiSyncVal, dbVar) + default: + require.Fail(t, "Unknown semi sync type") + } } // CheckCountOfInsertedValues checks that the number of inserted values matches the given count on the given tablet @@ -679,30 +690,70 @@ func positionAtLeast(t *testing.T, tablet *cluster.Vttablet, a string, b string) return isAtleast } -// CheckDBvar checks the db var -func CheckDBvar(ctx context.Context, t *testing.T, tablet *cluster.Vttablet, variable string, status string) { +func CheckSemisyncEnabled(ctx context.Context, t *testing.T, tablet *cluster.Vttablet, enabled bool) { tabletParams := getMysqlConnParam(tablet) conn, err := mysql.Connect(ctx, &tabletParams) require.NoError(t, err) defer conn.Close() - qr := execute(t, conn, fmt.Sprintf("show variables like '%s'", variable)) - got := fmt.Sprintf("%v", qr.Rows) - want := fmt.Sprintf("[[VARCHAR(\"%s\") VARCHAR(\"%s\")]]", variable, status) - assert.Equal(t, want, got) + status := "OFF" + if enabled { + status = "ON" + } + + semisyncType, err := SemiSyncExtensionLoaded(ctx, tablet) + require.NoError(t, err) + switch semisyncType { + case mysql.SemiSyncTypeSource: + qr := execute(t, conn, "show variables like 'rpl_semi_sync_replica_enabled'") + got := fmt.Sprintf("%v", qr.Rows) + want := fmt.Sprintf("[[VARCHAR(\"%s\") VARCHAR(\"%s\")]]", "rpl_semi_sync_replica_enabled", status) + assert.Equal(t, want, got) + case mysql.SemiSyncTypeMaster: + qr := execute(t, conn, "show variables like 'rpl_semi_sync_slave_enabled'") + got := fmt.Sprintf("%v", qr.Rows) + want := fmt.Sprintf("[[VARCHAR(\"%s\") VARCHAR(\"%s\")]]", "rpl_semi_sync_slave_enabled", status) + assert.Equal(t, want, got) + } } -// CheckDBstatus checks the db status -func CheckDBstatus(ctx context.Context, t *testing.T, tablet *cluster.Vttablet, variable string, status string) { +func CheckSemisyncStatus(ctx context.Context, t *testing.T, tablet *cluster.Vttablet, enabled bool) { tabletParams := getMysqlConnParam(tablet) conn, err := mysql.Connect(ctx, &tabletParams) require.NoError(t, err) defer conn.Close() - qr := execute(t, conn, fmt.Sprintf("show status like '%s'", variable)) - got := fmt.Sprintf("%v", qr.Rows) - want := fmt.Sprintf("[[VARCHAR(\"%s\") VARCHAR(\"%s\")]]", variable, status) - assert.Equal(t, want, got) + status := "OFF" + if enabled { + status = "ON" + } + + semisyncType, err := SemiSyncExtensionLoaded(ctx, tablet) + require.NoError(t, err) + switch semisyncType { + case mysql.SemiSyncTypeSource: + qr := execute(t, conn, "show status like 'Rpl_semi_sync_replica_status'") + got := fmt.Sprintf("%v", qr.Rows) + want := fmt.Sprintf("[[VARCHAR(\"%s\") VARCHAR(\"%s\")]]", "Rpl_semi_sync_replica_status", status) + assert.Equal(t, want, got) + case mysql.SemiSyncTypeMaster: + qr := execute(t, conn, "show status like 'Rpl_semi_sync_slave_status'") + got := fmt.Sprintf("%v", qr.Rows) + want := fmt.Sprintf("[[VARCHAR(\"%s\") VARCHAR(\"%s\")]]", "Rpl_semi_sync_slave_status", status) + assert.Equal(t, want, got) + default: + assert.Fail(t, "unknown semi-sync type") + } +} + +func SemiSyncExtensionLoaded(ctx context.Context, tablet *cluster.Vttablet) (mysql.SemiSyncType, error) { + tabletParams := getMysqlConnParam(tablet) + conn, err := mysql.Connect(ctx, &tabletParams) + if err != nil { + return mysql.SemiSyncTypeUnknown, err + } + defer conn.Close() + return conn.SemiSyncExtensionLoaded() } // SetReplicationSourceFailed returns true if the given output from PRS had failed because the given tablet was diff --git a/go/test/endtoend/utils/mysql_test.go b/go/test/endtoend/utils/mysql_test.go index c8c09a3f979..f5984010924 100644 --- a/go/test/endtoend/utils/mysql_test.go +++ b/go/test/endtoend/utils/mysql_test.go @@ -373,17 +373,17 @@ func TestGetPreviousGTIDs(t *testing.T) { func TestSemiSyncEnabled(t *testing.T) { require.NotNil(t, mysqld) - err := mysqld.SetSemiSyncEnabled(true, false) + err := mysqld.SetSemiSyncEnabled(context.Background(), true, false) assert.NoError(t, err) - p, r := mysqld.SemiSyncEnabled() + p, r := mysqld.SemiSyncEnabled(context.Background()) assert.True(t, p) assert.False(t, r) - err = mysqld.SetSemiSyncEnabled(false, true) + err = mysqld.SetSemiSyncEnabled(context.Background(), false, true) assert.NoError(t, err) - p, r = mysqld.SemiSyncEnabled() + p, r = mysqld.SemiSyncEnabled(context.Background()) assert.False(t, p) assert.True(t, r) } diff --git a/go/test/endtoend/vtorc/general/vtorc_test.go b/go/test/endtoend/vtorc/general/vtorc_test.go index 38bc5f34df9..1adc091be59 100644 --- a/go/test/endtoend/vtorc/general/vtorc_test.go +++ b/go/test/endtoend/vtorc/general/vtorc_test.go @@ -25,6 +25,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/vtorc/utils" "vitess.io/vitess/go/vt/log" @@ -349,12 +350,44 @@ func TestSemiSync(t *testing.T) { // check that the replication is setup correctly utils.CheckReplication(t, newCluster, primary, []*cluster.Vttablet{rdonly, replica1, replica2}, 10*time.Second) - _, err := utils.RunSQL(t, "SET GLOBAL rpl_semi_sync_slave_enabled = 0", replica1, "") + semisyncType, err := utils.SemiSyncExtensionLoaded(t, replica1) require.NoError(t, err) - _, err = utils.RunSQL(t, "SET GLOBAL rpl_semi_sync_slave_enabled = 1", rdonly, "") + switch semisyncType { + case mysql.SemiSyncTypeSource: + _, err := utils.RunSQL(t, "SET GLOBAL rpl_semi_sync_replica_enabled = 0", replica1, "") + require.NoError(t, err) + case mysql.SemiSyncTypeMaster: + _, err := utils.RunSQL(t, "SET GLOBAL rpl_semi_sync_slave_enabled = 0", replica1, "") + require.NoError(t, err) + default: + require.Fail(t, "unexpected semi-sync type %v", semisyncType) + } + + semisyncType, err = utils.SemiSyncExtensionLoaded(t, rdonly) require.NoError(t, err) - _, err = utils.RunSQL(t, "SET GLOBAL rpl_semi_sync_master_enabled = 0", primary, "") + switch semisyncType { + case mysql.SemiSyncTypeSource: + _, err := utils.RunSQL(t, "SET GLOBAL rpl_semi_sync_replica_enabled = 0", rdonly, "") + require.NoError(t, err) + case mysql.SemiSyncTypeMaster: + _, err := utils.RunSQL(t, "SET GLOBAL rpl_semi_sync_slave_enabled = 0", rdonly, "") + require.NoError(t, err) + default: + require.Fail(t, "unexpected semi-sync type %v", semisyncType) + } + + semisyncType, err = utils.SemiSyncExtensionLoaded(t, primary) require.NoError(t, err) + switch semisyncType { + case mysql.SemiSyncTypeSource: + _, err := utils.RunSQL(t, "SET GLOBAL rpl_semi_sync_source_enabled = 0", primary, "") + require.NoError(t, err) + case mysql.SemiSyncTypeMaster: + _, err := utils.RunSQL(t, "SET GLOBAL rpl_semi_sync_master_enabled = 0", primary, "") + require.NoError(t, err) + default: + require.Fail(t, "unexpected semi-sync type %v", semisyncType) + } timeout := time.After(20 * time.Second) for { diff --git a/go/test/endtoend/vtorc/utils/utils.go b/go/test/endtoend/vtorc/utils/utils.go index 00f75740338..056276594d8 100644 --- a/go/test/endtoend/vtorc/utils/utils.go +++ b/go/test/endtoend/vtorc/utils/utils.go @@ -896,16 +896,40 @@ func AddSemiSyncKeyspace(t *testing.T, clusterInfo *VTOrcClusterInfo) { // IsSemiSyncSetupCorrectly checks that the semi-sync is setup correctly on the given vttablet func IsSemiSyncSetupCorrectly(t *testing.T, tablet *cluster.Vttablet, semiSyncVal string) bool { - dbVar, err := tablet.VttabletProcess.GetDBVar("rpl_semi_sync_slave_enabled", "") + semisyncType, err := tablet.VttabletProcess.SemiSyncExtensionLoaded() require.NoError(t, err) - return semiSyncVal == dbVar + switch semisyncType { + case mysql.SemiSyncTypeSource: + dbVar, err := tablet.VttabletProcess.GetDBVar("rpl_semi_sync_replica_enabled", "") + require.NoError(t, err) + return semiSyncVal == dbVar + case mysql.SemiSyncTypeMaster: + dbVar, err := tablet.VttabletProcess.GetDBVar("rpl_semi_sync_slave_enabled", "") + require.NoError(t, err) + return semiSyncVal == dbVar + default: + assert.Fail(t, "semisync extension not loaded") + return false + } } // IsPrimarySemiSyncSetupCorrectly checks that the priamry side semi-sync is setup correctly on the given vttablet func IsPrimarySemiSyncSetupCorrectly(t *testing.T, tablet *cluster.Vttablet, semiSyncVal string) bool { - dbVar, err := tablet.VttabletProcess.GetDBVar("rpl_semi_sync_master_enabled", "") + semisyncType, err := tablet.VttabletProcess.SemiSyncExtensionLoaded() require.NoError(t, err) - return semiSyncVal == dbVar + switch semisyncType { + case mysql.SemiSyncTypeSource: + dbVar, err := tablet.VttabletProcess.GetDBVar("rpl_semi_sync_source_enabled", "") + require.NoError(t, err) + return semiSyncVal == dbVar + case mysql.SemiSyncTypeMaster: + dbVar, err := tablet.VttabletProcess.GetDBVar("rpl_semi_sync_master_enabled", "") + require.NoError(t, err) + return semiSyncVal == dbVar + default: + assert.Fail(t, "semisync extension not loaded") + return false + } } // WaitForReadOnlyValue waits for the read_only global variable to reach the provided value @@ -1114,3 +1138,16 @@ func DisableGlobalRecoveries(t *testing.T, vtorc *cluster.VTOrcProcess) { assert.Equal(t, 200, status) assert.Equal(t, "Global recoveries disabled\n", resp) } + +// SemiSyncExtensionLoaded is used to check which semisync extension is loaded. +func SemiSyncExtensionLoaded(t *testing.T, tablet *cluster.Vttablet) (mysql.SemiSyncType, error) { + // Get Connection + tabletParams := getMysqlConnParam(tablet, "") + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + conn, err := mysql.Connect(ctx, &tabletParams) + require.Nil(t, err) + defer conn.Close() + + return conn.SemiSyncExtensionLoaded() +} diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go index fbc078c870b..0755d4c93c1 100644 --- a/go/vt/mysqlctl/builtinbackupengine.go +++ b/go/vt/mysqlctl/builtinbackupengine.go @@ -404,7 +404,7 @@ func (be *BuiltinBackupEngine) executeFullBackup(ctx context.Context, params Bac superReadOnly := true //nolint readOnly := true //nolint var replicationPosition replication.Position - semiSyncSource, semiSyncReplica := params.Mysqld.SemiSyncEnabled() + semiSyncSource, semiSyncReplica := params.Mysqld.SemiSyncEnabled(ctx) // See if we need to restart replication after backup. params.Logger.Infof("getting current replication status") @@ -519,7 +519,7 @@ func (be *BuiltinBackupEngine) executeFullBackup(ctx context.Context, params Bac // the plugin isn't even loaded, and the server variables don't exist. params.Logger.Infof("restoring semi-sync settings from before backup: primary=%v, replica=%v", semiSyncSource, semiSyncReplica) - err := params.Mysqld.SetSemiSyncEnabled(semiSyncSource, semiSyncReplica) + err := params.Mysqld.SetSemiSyncEnabled(ctx, semiSyncSource, semiSyncReplica) if err != nil { return backupResult, err } diff --git a/go/vt/mysqlctl/fakemysqldaemon.go b/go/vt/mysqlctl/fakemysqldaemon.go index e0d9a6c1252..c1be5f7ae24 100644 --- a/go/vt/mysqlctl/fakemysqldaemon.go +++ b/go/vt/mysqlctl/fakemysqldaemon.go @@ -26,6 +26,7 @@ import ( "sync/atomic" "time" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/mysql/fakesqldb" "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/sqltypes" @@ -171,9 +172,9 @@ type FakeMysqlDaemon struct { // FetchSuperQueryResults is used by FetchSuperQuery. FetchSuperQueryMap map[string]*sqltypes.Result - // SemiSyncPrimaryEnabled represents the state of rpl_semi_sync_master_enabled. + // SemiSyncPrimaryEnabled represents the state of rpl_semi_sync_source_enabled. SemiSyncPrimaryEnabled bool - // SemiSyncReplicaEnabled represents the state of rpl_semi_sync_slave_enabled. + // SemiSyncReplicaEnabled represents the state of rpl_semi_sync_replica_enabled. SemiSyncReplicaEnabled bool // TimeoutHook is a func that can be called at the beginning of @@ -667,19 +668,19 @@ func (fmd *FakeMysqlDaemon) GetAllPrivsConnection(ctx context.Context) (*dbconnp } // SetSemiSyncEnabled is part of the MysqlDaemon interface. -func (fmd *FakeMysqlDaemon) SetSemiSyncEnabled(primary, replica bool) error { +func (fmd *FakeMysqlDaemon) SetSemiSyncEnabled(ctx context.Context, primary, replica bool) error { fmd.SemiSyncPrimaryEnabled = primary fmd.SemiSyncReplicaEnabled = replica return nil } // SemiSyncEnabled is part of the MysqlDaemon interface. -func (fmd *FakeMysqlDaemon) SemiSyncEnabled() (primary, replica bool) { +func (fmd *FakeMysqlDaemon) SemiSyncEnabled(ctx context.Context) (primary, replica bool) { return fmd.SemiSyncPrimaryEnabled, fmd.SemiSyncReplicaEnabled } // SemiSyncStatus is part of the MysqlDaemon interface. -func (fmd *FakeMysqlDaemon) SemiSyncStatus() (bool, bool) { +func (fmd *FakeMysqlDaemon) SemiSyncStatus(ctx context.Context) (bool, bool) { // The fake assumes the status worked. if fmd.SemiSyncPrimaryEnabled { return true, false @@ -688,22 +689,22 @@ func (fmd *FakeMysqlDaemon) SemiSyncStatus() (bool, bool) { } // SemiSyncClients is part of the MysqlDaemon interface. -func (fmd *FakeMysqlDaemon) SemiSyncClients() uint32 { +func (fmd *FakeMysqlDaemon) SemiSyncClients(ctx context.Context) uint32 { return 0 } // SemiSyncExtensionLoaded is part of the MysqlDaemon interface. -func (fmd *FakeMysqlDaemon) SemiSyncExtensionLoaded() (bool, error) { - return true, nil +func (fmd *FakeMysqlDaemon) SemiSyncExtensionLoaded(ctx context.Context) (mysql.SemiSyncType, error) { + return mysql.SemiSyncTypeSource, nil } // SemiSyncSettings is part of the MysqlDaemon interface. -func (fmd *FakeMysqlDaemon) SemiSyncSettings() (timeout uint64, numReplicas uint32) { +func (fmd *FakeMysqlDaemon) SemiSyncSettings(ctx context.Context) (timeout uint64, numReplicas uint32) { return 10000000, 1 } // SemiSyncReplicationStatus is part of the MysqlDaemon interface. -func (fmd *FakeMysqlDaemon) SemiSyncReplicationStatus() (bool, error) { +func (fmd *FakeMysqlDaemon) SemiSyncReplicationStatus(ctx context.Context) (bool, error) { // The fake assumes the status worked. return fmd.SemiSyncReplicaEnabled, nil } diff --git a/go/vt/mysqlctl/mysql_daemon.go b/go/vt/mysqlctl/mysql_daemon.go index cb9882e7052..e396f128ee0 100644 --- a/go/vt/mysqlctl/mysql_daemon.go +++ b/go/vt/mysqlctl/mysql_daemon.go @@ -20,6 +20,7 @@ import ( "context" "time" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/dbconnpool" @@ -60,13 +61,13 @@ type MysqlDaemon interface { ReplicationStatus() (replication.ReplicationStatus, error) PrimaryStatus(ctx context.Context) (replication.PrimaryStatus, error) GetGTIDPurged(ctx context.Context) (replication.Position, error) - SetSemiSyncEnabled(source, replica bool) error - SemiSyncEnabled() (source, replica bool) - SemiSyncExtensionLoaded() (bool, error) - SemiSyncStatus() (source, replica bool) - SemiSyncClients() (count uint32) - SemiSyncSettings() (timeout uint64, numReplicas uint32) - SemiSyncReplicationStatus() (bool, error) + SetSemiSyncEnabled(ctx context.Context, source, replica bool) error + SemiSyncEnabled(ctx context.Context) (source, replica bool) + SemiSyncExtensionLoaded(ctx context.Context) (mysql.SemiSyncType, error) + SemiSyncStatus(ctx context.Context) (source, replica bool) + SemiSyncClients(ctx context.Context) (count uint32) + SemiSyncSettings(ctx context.Context) (timeout uint64, numReplicas uint32) + SemiSyncReplicationStatus(ctx context.Context) (bool, error) ResetReplicationParameters(ctx context.Context) error GetBinlogInformation(ctx context.Context) (binlogFormat string, logEnabled bool, logReplicaUpdate bool, binlogRowImage string, err error) GetGTIDMode(ctx context.Context) (gtidMode string, err error) diff --git a/go/vt/mysqlctl/mysqld.go b/go/vt/mysqlctl/mysqld.go index 4a18f0a9f3a..1ee7c72bf86 100644 --- a/go/vt/mysqlctl/mysqld.go +++ b/go/vt/mysqlctl/mysqld.go @@ -118,6 +118,8 @@ type Mysqld struct { mutex sync.Mutex onTermFuncs []func() cancelWaitCmd chan struct{} + + semiSyncType mysql.SemiSyncType } func init() { @@ -928,7 +930,13 @@ func (mysqld *Mysqld) getMycnfTemplate() string { log.Infof("this version of Vitess does not include built-in support for %v %v", mysqld.capabilities.flavor, mysqld.capabilities.version) } case 8: - versionConfig = config.MycnfMySQL80 + if mysqld.capabilities.version.Minor >= 4 { + versionConfig = config.MycnfMySQL84 + } else if mysqld.capabilities.version.Minor >= 1 || mysqld.capabilities.version.Patch >= 26 { + versionConfig = config.MycnfMySQL8026 + } else { + versionConfig = config.MycnfMySQL80 + } default: log.Infof("this version of Vitess does not include built-in support for %v %v", mysqld.capabilities.flavor, mysqld.capabilities.version) } diff --git a/go/vt/mysqlctl/replication.go b/go/vt/mysqlctl/replication.go index 8603b172606..47d0a727fed 100644 --- a/go/vt/mysqlctl/replication.go +++ b/go/vt/mysqlctl/replication.go @@ -612,9 +612,48 @@ func (mysqld *Mysqld) GetPreviousGTIDs(ctx context.Context, binlog string) (prev return previousGtids, nil } +var ErrNoSemiSync = errors.New("semi-sync plugin not loaded") + +func (mysqld *Mysqld) SemiSyncType(ctx context.Context) mysql.SemiSyncType { + if mysqld.semiSyncType == mysql.SemiSyncTypeUnknown { + mysqld.semiSyncType, _ = mysqld.SemiSyncExtensionLoaded(ctx) + } + return mysqld.semiSyncType +} + +func (mysqld *Mysqld) enableSemiSyncQuery(ctx context.Context) (string, error) { + switch mysqld.SemiSyncType(ctx) { + case mysql.SemiSyncTypeSource: + return "SET GLOBAL rpl_semi_sync_source_enabled = %v, GLOBAL rpl_semi_sync_replica_enabled = %v", nil + case mysql.SemiSyncTypeMaster: + return "SET GLOBAL rpl_semi_sync_master_enabled = %v, GLOBAL rpl_semi_sync_slave_enabled = %v", nil + } + return "", ErrNoSemiSync +} + +func (mysqld *Mysqld) semiSyncClientsQuery(ctx context.Context) (string, error) { + switch mysqld.SemiSyncType(ctx) { + case mysql.SemiSyncTypeSource: + return "SHOW STATUS LIKE 'Rpl_semi_sync_source_clients'", nil + case mysql.SemiSyncTypeMaster: + return "SHOW STATUS LIKE 'Rpl_semi_sync_master_clients'", nil + } + return "", ErrNoSemiSync +} + +func (mysqld *Mysqld) semiSyncReplicationStatusQuery(ctx context.Context) (string, error) { + switch mysqld.SemiSyncType(ctx) { + case mysql.SemiSyncTypeSource: + return "SHOW STATUS LIKE 'rpl_semi_sync_replica_status'", nil + case mysql.SemiSyncTypeMaster: + return "SHOW STATUS LIKE 'rpl_semi_sync_slave_status'", nil + } + return "", ErrNoSemiSync +} + // SetSemiSyncEnabled enables or disables semi-sync replication for // primary and/or replica mode. -func (mysqld *Mysqld) SetSemiSyncEnabled(primary, replica bool) error { +func (mysqld *Mysqld) SetSemiSyncEnabled(ctx context.Context, primary, replica bool) error { log.Infof("Setting semi-sync mode: primary=%v, replica=%v", primary, replica) // Convert bool to int. @@ -626,9 +665,11 @@ func (mysqld *Mysqld) SetSemiSyncEnabled(primary, replica bool) error { s = 1 } - err := mysqld.ExecuteSuperQuery(context.TODO(), fmt.Sprintf( - "SET GLOBAL rpl_semi_sync_master_enabled = %v, GLOBAL rpl_semi_sync_slave_enabled = %v", - p, s)) + query, err := mysqld.enableSemiSyncQuery(ctx) + if err != nil { + return err + } + err = mysqld.ExecuteSuperQuery(ctx, fmt.Sprintf(query, p, s)) if err != nil { return fmt.Errorf("can't set semi-sync mode: %v; make sure plugins are loaded in my.cnf", err) } @@ -637,30 +678,46 @@ func (mysqld *Mysqld) SetSemiSyncEnabled(primary, replica bool) error { // SemiSyncEnabled returns whether semi-sync is enabled for primary or replica. // If the semi-sync plugin is not loaded, we assume semi-sync is disabled. -func (mysqld *Mysqld) SemiSyncEnabled() (primary, replica bool) { - vars, err := mysqld.fetchVariables(context.TODO(), "rpl_semi_sync_%_enabled") +func (mysqld *Mysqld) SemiSyncEnabled(ctx context.Context) (primary, replica bool) { + vars, err := mysqld.fetchVariables(ctx, "rpl_semi_sync_%_enabled") if err != nil { return false, false } - primary = vars["rpl_semi_sync_master_enabled"] == "ON" - replica = vars["rpl_semi_sync_slave_enabled"] == "ON" + switch mysqld.SemiSyncType(ctx) { + case mysql.SemiSyncTypeSource: + primary = vars["rpl_semi_sync_source_enabled"] == "ON" + replica = vars["rpl_semi_sync_replica_enabled"] == "ON" + case mysql.SemiSyncTypeMaster: + primary = vars["rpl_semi_sync_master_enabled"] == "ON" + replica = vars["rpl_semi_sync_slave_enabled"] == "ON" + } return primary, replica } // SemiSyncStatus returns the current status of semi-sync for primary and replica. -func (mysqld *Mysqld) SemiSyncStatus() (primary, replica bool) { - vars, err := mysqld.fetchStatuses(context.TODO(), "Rpl_semi_sync_%_status") +func (mysqld *Mysqld) SemiSyncStatus(ctx context.Context) (primary, replica bool) { + vars, err := mysqld.fetchStatuses(ctx, "Rpl_semi_sync_%_status") if err != nil { return false, false } - primary = vars["Rpl_semi_sync_master_status"] == "ON" - replica = vars["Rpl_semi_sync_slave_status"] == "ON" + switch mysqld.SemiSyncType(ctx) { + case mysql.SemiSyncTypeSource: + primary = vars["Rpl_semi_sync_source_status"] == "ON" + replica = vars["Rpl_semi_sync_replica_status"] == "ON" + case mysql.SemiSyncTypeMaster: + primary = vars["Rpl_semi_sync_master_status"] == "ON" + replica = vars["Rpl_semi_sync_slave_status"] == "ON" + } return primary, replica } // SemiSyncClients returns the number of semi-sync clients for the primary. -func (mysqld *Mysqld) SemiSyncClients() uint32 { - qr, err := mysqld.FetchSuperQuery(context.TODO(), "SHOW STATUS LIKE 'Rpl_semi_sync_master_clients'") +func (mysqld *Mysqld) SemiSyncClients(ctx context.Context) uint32 { + query, err := mysqld.semiSyncClientsQuery(ctx) + if err != nil { + return 0 + } + qr, err := mysqld.FetchSuperQuery(ctx, query) if err != nil { return 0 } @@ -673,24 +730,35 @@ func (mysqld *Mysqld) SemiSyncClients() uint32 { } // SemiSyncSettings returns the settings of semi-sync which includes the timeout and the number of replicas to wait for. -func (mysqld *Mysqld) SemiSyncSettings() (timeout uint64, numReplicas uint32) { - vars, err := mysqld.fetchVariables(context.TODO(), "rpl_semi_sync_%") +func (mysqld *Mysqld) SemiSyncSettings(ctx context.Context) (timeout uint64, numReplicas uint32) { + vars, err := mysqld.fetchVariables(ctx, "rpl_semi_sync_%") if err != nil { return 0, 0 } - timeout, _ = strconv.ParseUint(vars["rpl_semi_sync_master_timeout"], 10, 64) - numReplicasUint, _ := strconv.ParseUint(vars["rpl_semi_sync_master_wait_for_slave_count"], 10, 32) + var numReplicasUint uint64 + switch mysqld.SemiSyncType(ctx) { + case mysql.SemiSyncTypeSource: + timeout, _ = strconv.ParseUint(vars["rpl_semi_sync_source_timeout"], 10, 64) + numReplicasUint, _ = strconv.ParseUint(vars["rpl_semi_sync_source_wait_for_replica_count"], 10, 32) + case mysql.SemiSyncTypeMaster: + timeout, _ = strconv.ParseUint(vars["rpl_semi_sync_master_timeout"], 10, 64) + numReplicasUint, _ = strconv.ParseUint(vars["rpl_semi_sync_master_wait_for_slave_count"], 10, 32) + } return timeout, uint32(numReplicasUint) } // SemiSyncReplicationStatus returns whether semi-sync is currently used by replication. -func (mysqld *Mysqld) SemiSyncReplicationStatus() (bool, error) { - qr, err := mysqld.FetchSuperQuery(context.TODO(), "SHOW STATUS LIKE 'rpl_semi_sync_slave_status'") +func (mysqld *Mysqld) SemiSyncReplicationStatus(ctx context.Context) (bool, error) { + query, err := mysqld.semiSyncReplicationStatusQuery(ctx) + if err != nil { + return false, err + } + qr, err := mysqld.FetchSuperQuery(ctx, query) if err != nil { return false, err } if len(qr.Rows) != 1 { - return false, errors.New("no rpl_semi_sync_slave_status variable in mysql") + return false, errors.New("no rpl_semi_sync_replica_status variable in mysql") } if qr.Rows[0][1].ToString() == "ON" { return true, nil @@ -699,14 +767,12 @@ func (mysqld *Mysqld) SemiSyncReplicationStatus() (bool, error) { } // SemiSyncExtensionLoaded returns whether semi-sync plugins are loaded. -func (mysqld *Mysqld) SemiSyncExtensionLoaded() (bool, error) { - qr, err := mysqld.FetchSuperQuery(context.Background(), "SELECT COUNT(*) > 0 AS plugin_loaded FROM information_schema.plugins WHERE plugin_name LIKE 'rpl_semi_sync%'") - if err != nil { - return false, err - } - pluginPresent, err := qr.Rows[0][0].ToBool() - if err != nil { - return false, err +func (mysqld *Mysqld) SemiSyncExtensionLoaded(ctx context.Context) (mysql.SemiSyncType, error) { + conn, connErr := getPoolReconnect(ctx, mysqld.dbaPool) + if connErr != nil { + return mysql.SemiSyncTypeUnknown, connErr } - return pluginPresent, nil + defer conn.Recycle() + + return conn.Conn.SemiSyncExtensionLoaded() } diff --git a/go/vt/mysqlctl/replication_test.go b/go/vt/mysqlctl/replication_test.go index d117a96ab89..fdf6b99f6d9 100644 --- a/go/vt/mysqlctl/replication_test.go +++ b/go/vt/mysqlctl/replication_test.go @@ -26,6 +26,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/mysql/fakesqldb" "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/sqltypes" @@ -556,16 +557,16 @@ func TestSetSemiSyncEnabled(t *testing.T) { defer testMysqld.Close() // We expect this query to be executed - err := testMysqld.SetSemiSyncEnabled(true, true) - assert.ErrorContains(t, err, "SET GLOBAL rpl_semi_sync_master_enabled = 1, GLOBAL rpl_semi_sync_slave_enabled = 1") + err := testMysqld.SetSemiSyncEnabled(context.Background(), true, true) + assert.ErrorContains(t, err, "semisync plugin not loaded") // We expect this query to be executed - err = testMysqld.SetSemiSyncEnabled(true, false) - assert.ErrorContains(t, err, "SET GLOBAL rpl_semi_sync_master_enabled = 1, GLOBAL rpl_semi_sync_slave_enabled = 0") + err = testMysqld.SetSemiSyncEnabled(context.Background(), true, false) + assert.ErrorContains(t, err, "semisync plugin not loaded") // We expect this query to be executed - err = testMysqld.SetSemiSyncEnabled(false, true) - assert.ErrorContains(t, err, "SET GLOBAL rpl_semi_sync_master_enabled = 0, GLOBAL rpl_semi_sync_slave_enabled = 1") + err = testMysqld.SetSemiSyncEnabled(context.Background(), false, true) + assert.ErrorContains(t, err, "semisync plugin not loaded") } func TestSemiSyncEnabled(t *testing.T) { @@ -577,12 +578,12 @@ func TestSemiSyncEnabled(t *testing.T) { dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") db.AddQuery("SELECT 1", &sqltypes.Result{}) - db.AddQuery("SHOW VARIABLES LIKE 'rpl_semi_sync_%_enabled'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1|field2", "varchar|varchar"), "rpl_semi_sync_master_enabled|OFF", "rpl_semi_sync_slave_enabled|ON")) + db.AddQuery("SHOW VARIABLES LIKE 'rpl_semi_sync_%_enabled'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1|field2", "varchar|varchar"), "rpl_semi_sync_source_enabled|OFF", "rpl_semi_sync_replica_enabled|ON")) testMysqld := NewMysqld(dbc) defer testMysqld.Close() - p, r := testMysqld.SemiSyncEnabled() + p, r := testMysqld.SemiSyncEnabled(context.Background()) assert.False(t, p) assert.True(t, r) } @@ -596,12 +597,13 @@ func TestSemiSyncStatus(t *testing.T) { dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") db.AddQuery("SELECT 1", &sqltypes.Result{}) - db.AddQuery("SHOW STATUS LIKE 'Rpl_semi_sync_%_status'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1|field2", "varchar|varchar"), "Rpl_semi_sync_master_status|ON", "Rpl_semi_sync_slave_status|OFF")) + db.AddQuery("SHOW VARIABLES LIKE 'rpl_semi_sync_%_enabled'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1|field2", "varchar|varchar"), "rpl_semi_sync_source_enabled|ON", "rpl_semi_sync_replica_enabled|ON")) + db.AddQuery("SHOW STATUS LIKE 'Rpl_semi_sync_%_status'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1|field2", "varchar|varchar"), "Rpl_semi_sync_source_status|ON", "Rpl_semi_sync_replica_status|OFF")) testMysqld := NewMysqld(dbc) defer testMysqld.Close() - p, r := testMysqld.SemiSyncStatus() + p, r := testMysqld.SemiSyncStatus(context.Background()) assert.True(t, p) assert.False(t, r) } @@ -615,12 +617,13 @@ func TestSemiSyncClients(t *testing.T) { dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") db.AddQuery("SELECT 1", &sqltypes.Result{}) - db.AddQuery("SHOW STATUS LIKE 'Rpl_semi_sync_master_clients'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1|field2", "varchar|uint64"), "val1|12")) + db.AddQuery("SHOW VARIABLES LIKE 'rpl_semi_sync_%_enabled'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1|field2", "varchar|varchar"), "rpl_semi_sync_source_enabled|ON", "rpl_semi_sync_replica_enabled|ON")) + db.AddQuery("SHOW STATUS LIKE 'Rpl_semi_sync_source_clients'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1|field2", "varchar|uint64"), "val1|12")) testMysqld := NewMysqld(dbc) defer testMysqld.Close() - res := testMysqld.SemiSyncClients() + res := testMysqld.SemiSyncClients(context.Background()) assert.Equal(t, uint32(12), res) } @@ -633,12 +636,13 @@ func TestSemiSyncSettings(t *testing.T) { dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") db.AddQuery("SELECT 1", &sqltypes.Result{}) - db.AddQuery("SHOW VARIABLES LIKE 'rpl_semi_sync_%'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1|field2", "varchar|uint64"), "rpl_semi_sync_master_timeout|123", "rpl_semi_sync_master_wait_for_slave_count|80")) + db.AddQuery("SHOW VARIABLES LIKE 'rpl_semi_sync_%_enabled'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1|field2", "varchar|varchar"), "rpl_semi_sync_source_enabled|ON", "rpl_semi_sync_replica_enabled|ON")) + db.AddQuery("SHOW VARIABLES LIKE 'rpl_semi_sync_%'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1|field2", "varchar|uint64"), "rpl_semi_sync_source_timeout|123", "rpl_semi_sync_source_wait_for_replica_count|80")) testMysqld := NewMysqld(dbc) defer testMysqld.Close() - timeout, replicas := testMysqld.SemiSyncSettings() + timeout, replicas := testMysqld.SemiSyncSettings(context.Background()) assert.Equal(t, uint64(123), timeout) assert.Equal(t, uint32(80), replicas) } @@ -652,18 +656,19 @@ func TestSemiSyncReplicationStatus(t *testing.T) { dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") db.AddQuery("SELECT 1", &sqltypes.Result{}) - db.AddQuery("SHOW STATUS LIKE 'rpl_semi_sync_slave_status'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1|field2", "varchar|uint64"), "rpl_semi_sync_slave_status|ON")) + db.AddQuery("SHOW VARIABLES LIKE 'rpl_semi_sync_%_enabled'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1|field2", "varchar|varchar"), "rpl_semi_sync_source_enabled|ON", "rpl_semi_sync_replica_enabled|ON")) + db.AddQuery("SHOW STATUS LIKE 'rpl_semi_sync_replica_status'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1|field2", "varchar|uint64"), "rpl_semi_sync_replica_status|ON")) testMysqld := NewMysqld(dbc) defer testMysqld.Close() - res, err := testMysqld.SemiSyncReplicationStatus() + res, err := testMysqld.SemiSyncReplicationStatus(context.Background()) assert.NoError(t, err) assert.True(t, res) - db.AddQuery("SHOW STATUS LIKE 'rpl_semi_sync_slave_status'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1|field2", "varchar|uint64"), "rpl_semi_sync_slave_status|OFF")) + db.AddQuery("SHOW STATUS LIKE 'rpl_semi_sync_replica_status'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1|field2", "varchar|uint64"), "rpl_semi_sync_replica_status|OFF")) - res, err = testMysqld.SemiSyncReplicationStatus() + res, err = testMysqld.SemiSyncReplicationStatus(context.Background()) assert.NoError(t, err) assert.False(t, res) } @@ -675,20 +680,22 @@ func TestSemiSyncExtensionLoaded(t *testing.T) { params := db.ConnParams() cp := *params dbc := dbconfigs.NewTestDBConfigs(cp, cp, "fakesqldb") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() db.AddQuery("SELECT 1", &sqltypes.Result{}) - db.AddQuery("SELECT COUNT(*) > 0 AS plugin_loaded FROM information_schema.plugins WHERE plugin_name LIKE 'rpl_semi_sync%'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1", "int64"), "1")) + db.AddQuery("SHOW VARIABLES LIKE 'rpl_semi_sync_%_enabled'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1|field2", "varchar|varchar"), "rpl_semi_sync_source_enabled|ON", "rpl_semi_sync_replica_enabled|ON")) testMysqld := NewMysqld(dbc) defer testMysqld.Close() - res, err := testMysqld.SemiSyncExtensionLoaded() + res, err := testMysqld.SemiSyncExtensionLoaded(ctx) assert.NoError(t, err) - assert.True(t, res) + assert.Contains(t, []mysql.SemiSyncType{mysql.SemiSyncTypeSource, mysql.SemiSyncTypeMaster}, res) - db.AddQuery("SELECT COUNT(*) > 0 AS plugin_loaded FROM information_schema.plugins WHERE plugin_name LIKE 'rpl_semi_sync%'", sqltypes.MakeTestResult(sqltypes.MakeTestFields("field1", "int64"), "0")) + db.AddQuery("SHOW VARIABLES LIKE 'rpl_semi_sync_%_enabled'", &sqltypes.Result{}) - res, err = testMysqld.SemiSyncExtensionLoaded() + res, err = testMysqld.SemiSyncExtensionLoaded(ctx) assert.NoError(t, err) - assert.False(t, res) + assert.Equal(t, mysql.SemiSyncTypeOff, res) } diff --git a/go/vt/vttablet/tabletmanager/rpc_actions.go b/go/vt/vttablet/tabletmanager/rpc_actions.go index 16d3513355c..bcf6aadf22e 100644 --- a/go/vt/vttablet/tabletmanager/rpc_actions.go +++ b/go/vt/vttablet/tabletmanager/rpc_actions.go @@ -21,6 +21,7 @@ import ( "fmt" "time" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/hook" @@ -82,7 +83,7 @@ func (tm *TabletManager) ChangeType(ctx context.Context, tabletType topodatapb.T } defer tm.unlock() - semiSyncAction, err := tm.convertBoolToSemiSyncAction(semiSync) + semiSyncAction, err := tm.convertBoolToSemiSyncAction(ctx, semiSync) if err != nil { return err } @@ -102,7 +103,7 @@ func (tm *TabletManager) changeTypeLocked(ctx context.Context, tabletType topoda } // Let's see if we need to fix semi-sync acking. - if err := tm.fixSemiSyncAndReplication(tm.Tablet().Type, semiSync); err != nil { + if err := tm.fixSemiSyncAndReplication(ctx, tm.Tablet().Type, semiSync); err != nil { return vterrors.Wrap(err, "fixSemiSyncAndReplication failed, may not ack correctly") } return nil @@ -147,19 +148,20 @@ func (tm *TabletManager) RunHealthCheck(ctx context.Context) { tm.QueryServiceControl.BroadcastHealth() } -func (tm *TabletManager) convertBoolToSemiSyncAction(semiSync bool) (SemiSyncAction, error) { - semiSyncExtensionLoaded, err := tm.MysqlDaemon.SemiSyncExtensionLoaded() +func (tm *TabletManager) convertBoolToSemiSyncAction(ctx context.Context, semiSync bool) (SemiSyncAction, error) { + semiSyncExtensionLoaded, err := tm.MysqlDaemon.SemiSyncExtensionLoaded(ctx) if err != nil { return SemiSyncActionNone, err } - if semiSyncExtensionLoaded { + switch semiSyncExtensionLoaded { + case mysql.SemiSyncTypeSource, mysql.SemiSyncTypeMaster: if semiSync { return SemiSyncActionSet, nil } else { return SemiSyncActionUnset, nil } - } else { + default: if semiSync { return SemiSyncActionNone, vterrors.VT09013() } else { diff --git a/go/vt/vttablet/tabletmanager/rpc_backup.go b/go/vt/vttablet/tabletmanager/rpc_backup.go index 9c361eac400..de49849ef53 100644 --- a/go/vt/vttablet/tabletmanager/rpc_backup.go +++ b/go/vt/vttablet/tabletmanager/rpc_backup.go @@ -136,7 +136,7 @@ func (tm *TabletManager) Backup(ctx context.Context, logger logutil.Logger, req } isSemiSync := reparentutil.IsReplicaSemiSync(durability, shardPrimary.Tablet, tabletInfo.Tablet) - semiSyncAction, err := tm.convertBoolToSemiSyncAction(isSemiSync) + semiSyncAction, err := tm.convertBoolToSemiSyncAction(bgCtx, isSemiSync) if err != nil { l.Errorf("Failed to convert bool to semisync action, error: %v", err) return diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go index ff8cb3a9b57..d202e13e2d9 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication.go @@ -134,16 +134,16 @@ func (tm *TabletManager) FullStatus(ctx context.Context) (*replicationdatapb.Ful } // Semi sync settings - "show global variables like 'rpl_semi_sync_%_enabled'" - primarySemiSync, replicaSemiSync := tm.MysqlDaemon.SemiSyncEnabled() + primarySemiSync, replicaSemiSync := tm.MysqlDaemon.SemiSyncEnabled(ctx) // Semi sync status - "show status like 'Rpl_semi_sync_%_status'" - primarySemiSyncStatus, replicaSemiSyncStatus := tm.MysqlDaemon.SemiSyncStatus() + primarySemiSyncStatus, replicaSemiSyncStatus := tm.MysqlDaemon.SemiSyncStatus(ctx) - // Semi sync clients count - "show status like 'semi_sync_primary_clients'" - semiSyncClients := tm.MysqlDaemon.SemiSyncClients() + // Semi sync clients count - "show status like 'semi_sync_source_clients'" + semiSyncClients := tm.MysqlDaemon.SemiSyncClients(ctx) // Semi sync settings - "show status like 'rpl_semi_sync_%' - semiSyncTimeout, semiSyncNumReplicas := tm.MysqlDaemon.SemiSyncSettings() + semiSyncTimeout, semiSyncNumReplicas := tm.MysqlDaemon.SemiSyncSettings(ctx) return &replicationdatapb.FullStatus{ ServerId: serverID, @@ -274,12 +274,12 @@ func (tm *TabletManager) StartReplication(ctx context.Context, semiSync bool) er } defer tm.unlock() - semiSyncAction, err := tm.convertBoolToSemiSyncAction(semiSync) + semiSyncAction, err := tm.convertBoolToSemiSyncAction(ctx, semiSync) if err != nil { return err } - if err := tm.fixSemiSync(tm.Tablet().Type, semiSyncAction); err != nil { + if err := tm.fixSemiSync(ctx, tm.Tablet().Type, semiSyncAction); err != nil { return err } return tm.MysqlDaemon.StartReplication(tm.hookExtraEnv()) @@ -363,7 +363,7 @@ func (tm *TabletManager) InitPrimary(ctx context.Context, semiSync bool) (string return "", err } - semiSyncAction, err := tm.convertBoolToSemiSyncAction(semiSync) + semiSyncAction, err := tm.convertBoolToSemiSyncAction(ctx, semiSync) if err != nil { return "", err } @@ -377,7 +377,7 @@ func (tm *TabletManager) InitPrimary(ctx context.Context, semiSync bool) (string // Enforce semi-sync after changing the tablet type to PRIMARY. Otherwise, the // primary will hang while trying to create the database. - if err := tm.fixSemiSync(topodatapb.TabletType_PRIMARY, semiSyncAction); err != nil { + if err := tm.fixSemiSync(ctx, topodatapb.TabletType_PRIMARY, semiSyncAction); err != nil { return "", err } @@ -413,7 +413,7 @@ func (tm *TabletManager) InitReplica(ctx context.Context, parent *topodatapb.Tab } defer tm.unlock() - semiSyncAction, err := tm.convertBoolToSemiSyncAction(semiSync) + semiSyncAction, err := tm.convertBoolToSemiSyncAction(ctx, semiSync) if err != nil { return err } @@ -443,7 +443,7 @@ func (tm *TabletManager) InitReplica(ctx context.Context, parent *topodatapb.Tab if tt == topodatapb.TabletType_PRIMARY { tt = topodatapb.TabletType_REPLICA } - if err := tm.fixSemiSync(tt, semiSyncAction); err != nil { + if err := tm.fixSemiSync(ctx, tt, semiSyncAction); err != nil { return err } @@ -547,15 +547,15 @@ func (tm *TabletManager) demotePrimary(ctx context.Context, revertPartialFailure // Here, we check if the primary side semi sync is enabled or not. If it isn't enabled then we do not need to take any action. // If it is enabled then we should turn it off and revert in case of failure. - if tm.isPrimarySideSemiSyncEnabled() { + if tm.isPrimarySideSemiSyncEnabled(ctx) { // If using semi-sync, we need to disable primary-side. - if err := tm.fixSemiSync(topodatapb.TabletType_REPLICA, SemiSyncActionSet); err != nil { + if err := tm.fixSemiSync(ctx, topodatapb.TabletType_REPLICA, SemiSyncActionSet); err != nil { return nil, err } defer func() { if finalErr != nil && revertPartialFailure && wasPrimary { // enable primary-side semi-sync again - if err := tm.fixSemiSync(topodatapb.TabletType_PRIMARY, SemiSyncActionSet); err != nil { + if err := tm.fixSemiSync(ctx, topodatapb.TabletType_PRIMARY, SemiSyncActionSet); err != nil { log.Warningf("fixSemiSync(PRIMARY) failed during revert: %v", err) } } @@ -583,13 +583,13 @@ func (tm *TabletManager) UndoDemotePrimary(ctx context.Context, semiSync bool) e } defer tm.unlock() - semiSyncAction, err := tm.convertBoolToSemiSyncAction(semiSync) + semiSyncAction, err := tm.convertBoolToSemiSyncAction(ctx, semiSync) if err != nil { return err } // If using semi-sync, we need to enable source-side. - if err := tm.fixSemiSync(topodatapb.TabletType_PRIMARY, semiSyncAction); err != nil { + if err := tm.fixSemiSync(ctx, topodatapb.TabletType_PRIMARY, semiSyncAction); err != nil { return err } @@ -655,7 +655,7 @@ func (tm *TabletManager) SetReplicationSource(ctx context.Context, parentAlias * } defer tm.unlock() - semiSyncAction, err := tm.convertBoolToSemiSyncAction(semiSync) + semiSyncAction, err := tm.convertBoolToSemiSyncAction(ctx, semiSync) if err != nil { return err } @@ -720,7 +720,7 @@ func (tm *TabletManager) setReplicationSourceLocked(ctx context.Context, parentA if tabletType == topodatapb.TabletType_PRIMARY { tabletType = topodatapb.TabletType_REPLICA } - if err := tm.fixSemiSync(tabletType, semiSync); err != nil { + if err := tm.fixSemiSync(ctx, tabletType, semiSync); err != nil { return err } // Update the primary/source address only if needed. @@ -909,13 +909,13 @@ func (tm *TabletManager) PromoteReplica(ctx context.Context, semiSync bool) (str return "", err } - semiSyncAction, err := tm.convertBoolToSemiSyncAction(semiSync) + semiSyncAction, err := tm.convertBoolToSemiSyncAction(ctx, semiSync) if err != nil { return "", err } // If using semi-sync, we need to enable it before going read-write. - if err := tm.fixSemiSync(topodatapb.TabletType_PRIMARY, semiSyncAction); err != nil { + if err := tm.fixSemiSync(ctx, topodatapb.TabletType_PRIMARY, semiSyncAction); err != nil { return "", err } @@ -934,27 +934,27 @@ func isPrimaryEligible(tabletType topodatapb.TabletType) bool { return false } -func (tm *TabletManager) fixSemiSync(tabletType topodatapb.TabletType, semiSync SemiSyncAction) error { +func (tm *TabletManager) fixSemiSync(ctx context.Context, tabletType topodatapb.TabletType, semiSync SemiSyncAction) error { switch semiSync { case SemiSyncActionNone: return nil case SemiSyncActionSet: // Always enable replica-side since it doesn't hurt to keep it on for a primary. // The primary-side needs to be off for a replica, or else it will get stuck. - return tm.MysqlDaemon.SetSemiSyncEnabled(tabletType == topodatapb.TabletType_PRIMARY, true) + return tm.MysqlDaemon.SetSemiSyncEnabled(ctx, tabletType == topodatapb.TabletType_PRIMARY, true) case SemiSyncActionUnset: - return tm.MysqlDaemon.SetSemiSyncEnabled(false, false) + return tm.MysqlDaemon.SetSemiSyncEnabled(ctx, false, false) default: return vterrors.Errorf(vtrpc.Code_INTERNAL, "Unknown SemiSyncAction - %v", semiSync) } } -func (tm *TabletManager) isPrimarySideSemiSyncEnabled() bool { - semiSyncEnabled, _ := tm.MysqlDaemon.SemiSyncEnabled() +func (tm *TabletManager) isPrimarySideSemiSyncEnabled(ctx context.Context) bool { + semiSyncEnabled, _ := tm.MysqlDaemon.SemiSyncEnabled(ctx) return semiSyncEnabled } -func (tm *TabletManager) fixSemiSyncAndReplication(tabletType topodatapb.TabletType, semiSync SemiSyncAction) error { +func (tm *TabletManager) fixSemiSyncAndReplication(ctx context.Context, tabletType topodatapb.TabletType, semiSync SemiSyncAction) error { if semiSync == SemiSyncActionNone { // Semi-sync handling is not required. return nil @@ -967,7 +967,7 @@ func (tm *TabletManager) fixSemiSyncAndReplication(tabletType topodatapb.TabletT return nil } - if err := tm.fixSemiSync(tabletType, semiSync); err != nil { + if err := tm.fixSemiSync(ctx, tabletType, semiSync); err != nil { return vterrors.Wrapf(err, "failed to fixSemiSync(%v)", tabletType) } @@ -986,7 +986,7 @@ func (tm *TabletManager) fixSemiSyncAndReplication(tabletType topodatapb.TabletT // shouldAck := semiSync == SemiSyncActionSet shouldAck := isPrimaryEligible(tabletType) - acking, err := tm.MysqlDaemon.SemiSyncReplicationStatus() + acking, err := tm.MysqlDaemon.SemiSyncReplicationStatus(ctx) if err != nil { return vterrors.Wrap(err, "failed to get SemiSyncReplicationStatus") } diff --git a/go/vt/vttablet/tabletmanager/tm_init.go b/go/vt/vttablet/tabletmanager/tm_init.go index 5a4d7bcdb16..4b71883974e 100644 --- a/go/vt/vttablet/tabletmanager/tm_init.go +++ b/go/vt/vttablet/tabletmanager/tm_init.go @@ -974,12 +974,12 @@ func (tm *TabletManager) initializeReplication(ctx context.Context, tabletType t tablet.Type = tabletType - semiSyncAction, err := tm.convertBoolToSemiSyncAction(reparentutil.IsReplicaSemiSync(durability, currentPrimary.Tablet, tablet)) + semiSyncAction, err := tm.convertBoolToSemiSyncAction(ctx, reparentutil.IsReplicaSemiSync(durability, currentPrimary.Tablet, tablet)) if err != nil { return nil, err } - if err := tm.fixSemiSync(tabletType, semiSyncAction); err != nil { + if err := tm.fixSemiSync(ctx, tabletType, semiSyncAction); err != nil { return nil, err } diff --git a/test/templates/cluster_endtoend_test.tpl b/test/templates/cluster_endtoend_test.tpl index ac423251977..35f987559b4 100644 --- a/test/templates/cluster_endtoend_test.tpl +++ b/test/templates/cluster_endtoend_test.tpl @@ -183,7 +183,7 @@ jobs: {{if .LimitResourceUsage}} # Increase our open file descriptor limit as we could hit this ulimit -n 65536 - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf innodb_buffer_pool_dump_at_shutdown=OFF innodb_buffer_pool_in_core_file=OFF innodb_buffer_pool_load_at_startup=OFF @@ -201,7 +201,7 @@ jobs: {{end}} {{if .EnableBinlogTransactionCompression}} - cat <<-EOF>>./config/mycnf/mysql80.cnf + cat <<-EOF>>./config/mycnf/mysql8026.cnf binlog-transaction-compression=ON EOF {{end}} diff --git a/test/vtop_example.sh b/test/vtop_example.sh index dfd3719e4da..5ff90a2be7e 100755 --- a/test/vtop_example.sh +++ b/test/vtop_example.sh @@ -36,7 +36,7 @@ unset VTROOT # ensure that the examples can run without VTROOT now. function checkSemiSyncSetup() { for vttablet in $(kubectl get pods --no-headers -o custom-columns=":metadata.name" | grep "vttablet") ; do echo "Checking semi-sync in $vttablet" - kubectl exec "$vttablet" -c mysqld -- mysql -S "/vt/socket/mysql.sock" -u root -e "show variables like 'rpl_semi_sync_slave_enabled'" | grep "OFF" + kubectl exec "$vttablet" -c mysqld -- mysql -S "/vt/socket/mysql.sock" -u root -e "show variables like 'rpl_semi_sync_replica_enabled'" | grep "OFF" if [ $? -ne 0 ]; then echo "Semi Sync setup on $vttablet" exit 1 From 500a31fcf84db39a67b5c20a28a65bc1c665923f Mon Sep 17 00:00:00 2001 From: Dirkjan Bussink Date: Fri, 26 Apr 2024 08:16:29 +0200 Subject: [PATCH 2/2] Fix test assertions Signed-off-by: Dirkjan Bussink --- go/vt/mysqlctl/replication_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/go/vt/mysqlctl/replication_test.go b/go/vt/mysqlctl/replication_test.go index fdf6b99f6d9..cb84ea8ff04 100644 --- a/go/vt/mysqlctl/replication_test.go +++ b/go/vt/mysqlctl/replication_test.go @@ -558,15 +558,15 @@ func TestSetSemiSyncEnabled(t *testing.T) { // We expect this query to be executed err := testMysqld.SetSemiSyncEnabled(context.Background(), true, true) - assert.ErrorContains(t, err, "semisync plugin not loaded") + assert.ErrorIs(t, err, ErrNoSemiSync) // We expect this query to be executed err = testMysqld.SetSemiSyncEnabled(context.Background(), true, false) - assert.ErrorContains(t, err, "semisync plugin not loaded") + assert.ErrorIs(t, err, ErrNoSemiSync) // We expect this query to be executed err = testMysqld.SetSemiSyncEnabled(context.Background(), false, true) - assert.ErrorContains(t, err, "semisync plugin not loaded") + assert.ErrorIs(t, err, ErrNoSemiSync) } func TestSemiSyncEnabled(t *testing.T) {