From 8960bc38e6e0e180c7f56f3ba9232258d8a6f24a Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 14 Feb 2024 11:44:59 +0200 Subject: [PATCH] `ExecuteFetch`: error on multiple result sets (#14949) Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Signed-off-by: Dirkjan Bussink Signed-off-by: Manan Gupta Signed-off-by: Harshit Gangal Signed-off-by: Vicent Marti Co-authored-by: Dirkjan Bussink Co-authored-by: Manan Gupta Co-authored-by: Harshit Gangal Co-authored-by: Vicent Marti --- go/mysql/endtoend/client_test.go | 13 +++++- go/mysql/query.go | 44 ++++++++++++++++++- go/test/endtoend/cluster/vttablet_process.go | 30 +++++++++++++ .../endtoend/mysqlserver/mysql_server_test.go | 10 ++--- .../reparent/newfeaturetest/reparent_test.go | 4 +- .../reparent/plannedreparent/reparent_test.go | 6 +-- go/test/endtoend/reparent/utils/utils.go | 12 +++-- .../buffer/buffer_test_helpers.go | 10 ++--- .../buffer/reparent/failover_buffer_test.go | 25 +++++++---- .../tabletmanager/tablegc/tablegc_test.go | 9 ++-- go/test/endtoend/tabletmanager/tablet_test.go | 2 +- go/test/endtoend/utils/mysql.go | 12 ++++- go/test/endtoend/utils/utils.go | 9 ++++ .../endtoend/vreplication/time_zone_test.go | 2 +- go/test/endtoend/vreplication/vstream_test.go | 2 +- .../vtgate/errors_as_warnings/main_test.go | 2 +- .../endtoend/vtgate/queries/kill/main_test.go | 3 +- .../endtoend/vtgate/unsharded/main_test.go | 30 ++++++------- .../primaryfailure/primary_failure_test.go | 26 +++++------ go/test/endtoend/vtorc/utils/utils.go | 31 ++++++------- go/vt/mysqlctl/mysqld.go | 2 +- go/vt/vttablet/endtoend/call_test.go | 5 ++- go/vt/vttablet/tabletmanager/tm_init_test.go | 8 ++-- .../vreplication/vplayer_flaky_test.go | 2 +- go/vt/vttablet/tabletserver/query_executor.go | 5 +++ .../vstreamer/uvstreamer_flaky_test.go | 22 +++++----- 26 files changed, 225 insertions(+), 101 deletions(-) diff --git a/go/mysql/endtoend/client_test.go b/go/mysql/endtoend/client_test.go index 6591c454e8a..ce01c57369d 100644 --- a/go/mysql/endtoend/client_test.go +++ b/go/mysql/endtoend/client_test.go @@ -210,7 +210,11 @@ func doTestMultiResult(t *testing.T, disableClientDeprecateEOF bool) { assert.EqualValues(t, 1, result.RowsAffected, "insert into returned RowsAffected") } - qr, more, err = conn.ExecuteFetchMulti("update a set name = concat(name, ' updated'); select * from a; select count(*) from a", 300, true) + // Verify that a ExecuteFetchMultiDrain leaves the connection/packet in valid state. + err = conn.ExecuteFetchMultiDrain("update a set name = concat(name, ', multi drain 1'); select * from a; select count(*) from a") + expectNoError(t, err) + // If the previous command leaves packet in invalid state, this will fail. + qr, more, err = conn.ExecuteFetchMulti("update a set name = concat(name, ', fetch multi'); select * from a; select count(*) from a", 300, true) expectNoError(t, err) expectFlag(t, "ExecuteMultiFetch(multi result)", more, true) assert.EqualValues(t, 255, qr.RowsAffected) @@ -225,6 +229,13 @@ func doTestMultiResult(t *testing.T, disableClientDeprecateEOF bool) { expectFlag(t, "ReadQueryResult(2)", more, false) assert.EqualValues(t, 1, len(qr.Rows), "ReadQueryResult(1)") + // Verify that a ExecuteFetchMultiDrain is happy to operate again after all the above. + err = conn.ExecuteFetchMultiDrain("update a set name = concat(name, ', multi drain 2'); select * from a; select count(*) from a") + expectNoError(t, err) + + err = conn.ExecuteFetchMultiDrain("update b set name = concat(name, ' nonexistent table'); select * from a; select count(*) from a") + require.Error(t, err) + _, err = conn.ExecuteFetch("drop table a", 10, true) require.NoError(t, err) } diff --git a/go/mysql/query.go b/go/mysql/query.go index 7aab6ee13f0..22299e5cc80 100644 --- a/go/mysql/query.go +++ b/go/mysql/query.go @@ -17,6 +17,7 @@ limitations under the License. package mysql import ( + "errors" "fmt" "math" "strconv" @@ -34,6 +35,17 @@ import ( // This file contains the methods related to queries. +var ( + ErrExecuteFetchMultipleResults = vterrors.Errorf(vtrpc.Code_INTERNAL, "unexpected multiple results. Use ExecuteFetchMulti instead.") +) + +const ( + // Use as `maxrows` in `ExecuteFetch` and related functions, to indicate no rows should be fetched. + // This is different than specifying `0`, because `0` means "expect zero results", while this means + // "do not attempt to read any results into memory". + FETCH_NO_ROWS = math.MinInt +) + // // Client side methods. // @@ -303,10 +315,35 @@ func (c *Conn) parseRow(data []byte, fields []*querypb.Field, reader func([]byte // 2. if the server closes the connection when a command is in flight, // readComQueryResponse will fail, and we'll return CRServerLost(2013). func (c *Conn) ExecuteFetch(query string, maxrows int, wantfields bool) (result *sqltypes.Result, err error) { - result, _, err = c.ExecuteFetchMulti(query, maxrows, wantfields) + result, more, err := c.ExecuteFetchMulti(query, maxrows, wantfields) + if more { + // Multiple results are unexpected. Prioritize this "unexpected" error over whatever error we got from the first result. + err = errors.Join(ErrExecuteFetchMultipleResults, err) + } + // draining to make the connection clean. + err = c.drainMoreResults(more, err) return result, err } +// ExecuteFetchMultiDrain is for executing multiple statements in one call, but without +// caring for any results. The function returns an error if any of the statements fail. +// The function drains the query results of all statements, even if there's an error. +func (c *Conn) ExecuteFetchMultiDrain(query string) (err error) { + _, more, err := c.ExecuteFetchMulti(query, FETCH_NO_ROWS, false) + return c.drainMoreResults(more, err) +} + +// drainMoreResults ensures to drain all query results, even if there's an error. +// We collect all errors until we consume all results. +func (c *Conn) drainMoreResults(more bool, err error) error { + for more { + var moreErr error + _, more, _, moreErr = c.ReadQueryResult(FETCH_NO_ROWS, false) + err = errors.Join(err, moreErr) + } + return err +} + // ExecuteFetchMulti is for fetching multiple results from a multi-statement result. // It returns an additional 'more' flag. If it is set, you must fetch the additional // results using ReadQueryResult. @@ -460,6 +497,11 @@ func (c *Conn) ReadQueryResult(maxrows int, wantfields bool) (*sqltypes.Result, return nil, false, 0, ParseErrorPacket(data) } + if maxrows == FETCH_NO_ROWS { + c.recycleReadPacket() + continue + } + // Check we're not over the limit before we add more. if len(result.Rows) == maxrows { c.recycleReadPacket() diff --git a/go/test/endtoend/cluster/vttablet_process.go b/go/test/endtoend/cluster/vttablet_process.go index f92382d5f2d..c98ed37afc0 100644 --- a/go/test/endtoend/cluster/vttablet_process.go +++ b/go/test/endtoend/cluster/vttablet_process.go @@ -512,6 +512,16 @@ func (vttablet *VttabletProcess) QueryTabletWithDB(query string, dbname string) return executeQuery(conn, query) } +// MultiQueryTabletWithDB lets you execute multiple queries on a specific DB in this tablet. +func (vttablet *VttabletProcess) MultiQueryTabletWithDB(query string, dbname string) error { + conn, err := vttablet.defaultConn(dbname) + if err != nil { + return err + } + defer conn.Close() + return executeMultiQuery(conn, query) +} + // executeQuery will retry the query up to 10 times with a small sleep in between each try. // This allows the tests to be more robust in the face of transient failures. func executeQuery(dbConn *mysql.Conn, query string) (*sqltypes.Result, error) { @@ -536,6 +546,26 @@ func executeQuery(dbConn *mysql.Conn, query string) (*sqltypes.Result, error) { return result, err } +// executeMultiQuery will retry the given multi query up to 10 times with a small sleep in between each try. +// This allows the tests to be more robust in the face of transient failures. +func executeMultiQuery(dbConn *mysql.Conn, query string) (err error) { + retries := 10 + retryDelay := 1 * time.Second + for i := 0; i < retries; i++ { + if i > 0 { + // We only audit from 2nd attempt and onwards, otherwise this is just too verbose. + log.Infof("Executing query %s (attempt %d of %d)", query, (i + 1), retries) + } + err = dbConn.ExecuteFetchMultiDrain(query) + if err == nil { + break + } + time.Sleep(retryDelay) + } + + return err +} + // GetDBVar returns first matching database variable's value func (vttablet *VttabletProcess) GetDBVar(varName string, ksName string) (string, error) { return vttablet.getDBSystemValues("variables", varName, ksName) diff --git a/go/test/endtoend/mysqlserver/mysql_server_test.go b/go/test/endtoend/mysqlserver/mysql_server_test.go index caed342688d..6b691582c66 100644 --- a/go/test/endtoend/mysqlserver/mysql_server_test.go +++ b/go/test/endtoend/mysqlserver/mysql_server_test.go @@ -116,7 +116,7 @@ func TestTimeout(t *testing.T) { require.Nilf(t, err, "unable to connect mysql: %v", err) defer conn.Close() - _, err = conn.ExecuteFetch("SELECT SLEEP(5);", 1, false) + _, err = conn.ExecuteFetch("SELECT SLEEP(5)", 1, false) require.NotNilf(t, err, "quiry timeout error expected") mysqlErr, ok := err.(*sqlerror.SQLError) require.Truef(t, ok, "invalid error type") @@ -132,7 +132,7 @@ func TestInvalidField(t *testing.T) { require.Nilf(t, err, "unable to connect mysql: %v", err) defer conn.Close() - _, err = conn.ExecuteFetch("SELECT invalid_field from vt_insert_test;", 1, false) + _, err = conn.ExecuteFetch("SELECT invalid_field from vt_insert_test", 1, false) require.NotNil(t, err, "invalid field error expected") mysqlErr, ok := err.(*sqlerror.SQLError) require.Truef(t, ok, "invalid error type") @@ -153,7 +153,7 @@ func TestWarnings(t *testing.T) { require.NoError(t, err) assert.Empty(t, qr.Rows, "number of rows") - qr, err = conn.ExecuteFetch("SHOW WARNINGS;", 1, false) + qr, err = conn.ExecuteFetch("SHOW WARNINGS", 1, false) require.NoError(t, err, "SHOW WARNINGS") assert.EqualValues(t, 1, len(qr.Rows), "number of rows") assert.Contains(t, qr.Rows[0][0].String(), "VARCHAR(\"Warning\")", qr.Rows) @@ -164,7 +164,7 @@ func TestWarnings(t *testing.T) { _, err = conn.ExecuteFetch("SELECT 1 from vt_insert_test limit 1", 1, false) require.NoError(t, err) - qr, err = conn.ExecuteFetch("SHOW WARNINGS;", 1, false) + qr, err = conn.ExecuteFetch("SHOW WARNINGS", 1, false) require.NoError(t, err) assert.Empty(t, qr.Rows) @@ -175,7 +175,7 @@ func TestWarnings(t *testing.T) { _, err = conn.ExecuteFetch("SELECT 1 from vt_insert_test limit 1", 1, false) require.NoError(t, err) - qr, err = conn.ExecuteFetch("SHOW WARNINGS;", 1, false) + qr, err = conn.ExecuteFetch("SHOW WARNINGS", 1, false) require.NoError(t, err) assert.Empty(t, qr.Rows) } diff --git a/go/test/endtoend/reparent/newfeaturetest/reparent_test.go b/go/test/endtoend/reparent/newfeaturetest/reparent_test.go index d5f37dc8604..b570509f1a7 100644 --- a/go/test/endtoend/reparent/newfeaturetest/reparent_test.go +++ b/go/test/endtoend/reparent/newfeaturetest/reparent_test.go @@ -131,8 +131,8 @@ func TestChangeTypeWithoutSemiSync(t *testing.T) { utils.RunSQL(ctx, t, "set global super_read_only = 0", tablet) } - utils.RunSQL(ctx, t, "UNINSTALL PLUGIN rpl_semi_sync_slave;", tablet) - utils.RunSQL(ctx, t, "UNINSTALL PLUGIN rpl_semi_sync_master;", tablet) + utils.RunSQL(ctx, t, "UNINSTALL PLUGIN rpl_semi_sync_slave", tablet) + utils.RunSQL(ctx, t, "UNINSTALL PLUGIN rpl_semi_sync_master", tablet) } utils.ValidateTopology(t, clusterInstance, true) diff --git a/go/test/endtoend/reparent/plannedreparent/reparent_test.go b/go/test/endtoend/reparent/plannedreparent/reparent_test.go index 014570d8439..6aa5972b928 100644 --- a/go/test/endtoend/reparent/plannedreparent/reparent_test.go +++ b/go/test/endtoend/reparent/plannedreparent/reparent_test.go @@ -104,7 +104,7 @@ func TestPRSWithDrainedLaggingTablet(t *testing.T) { utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[2], tablets[3]}) // assert that there is indeed only 1 row in tablets[1 - res := utils.RunSQL(context.Background(), t, `select msg from vt_insert_test;`, tablets[1]) + res := utils.RunSQL(context.Background(), t, `select msg from vt_insert_test`, tablets[1]) assert.Equal(t, 1, len(res.Rows)) // Perform a graceful reparent operation @@ -217,8 +217,8 @@ func reparentFromOutside(t *testing.T, clusterInstance *cluster.LocalProcessClus if !downPrimary { // commands to stop the current primary - demoteCommands := "SET GLOBAL read_only = ON; FLUSH TABLES WITH READ LOCK; UNLOCK TABLES" - utils.RunSQL(ctx, t, demoteCommands, tablets[0]) + demoteCommands := []string{"SET GLOBAL read_only = ON", "FLUSH TABLES WITH READ LOCK", "UNLOCK TABLES"} + utils.RunSQLs(ctx, t, demoteCommands, tablets[0]) //Get the position of the old primary and wait for the new one to catch up. err := utils.WaitForReplicationPosition(t, tablets[0], tablets[1]) diff --git a/go/test/endtoend/reparent/utils/utils.go b/go/test/endtoend/reparent/utils/utils.go index 0f48f2b3fa8..675648dcf37 100644 --- a/go/test/endtoend/reparent/utils/utils.go +++ b/go/test/endtoend/reparent/utils/utils.go @@ -258,10 +258,16 @@ func getMysqlConnParam(tablet *cluster.Vttablet) mysql.ConnParams { return connParams } -// RunSQLs is used to run SQL commands directly on the MySQL instance of a vttablet +// RunSQLs is used to run SQL commands directly on the MySQL instance of a vttablet. All commands are +// run in a single connection. func RunSQLs(ctx context.Context, t *testing.T, sqls []string, tablet *cluster.Vttablet) (results []*sqltypes.Result) { + tabletParams := getMysqlConnParam(tablet) + conn, err := mysql.Connect(ctx, &tabletParams) + require.Nil(t, err) + defer conn.Close() + for _, sql := range sqls { - result := RunSQL(ctx, t, sql, tablet) + result := execute(t, conn, sql) results = append(results, result) } return results @@ -704,7 +710,7 @@ func SetReplicationSourceFailed(tablet *cluster.Vttablet, prsOut string) bool { // CheckReplicationStatus checks that the replication for sql and io threads is setup as expected func CheckReplicationStatus(ctx context.Context, t *testing.T, tablet *cluster.Vttablet, sqlThreadRunning bool, ioThreadRunning bool) { - res := RunSQL(ctx, t, "show slave status;", tablet) + res := RunSQL(ctx, t, "show slave status", tablet) if ioThreadRunning { require.Equal(t, "Yes", res.Rows[0][10].ToString()) } else { diff --git a/go/test/endtoend/tabletgateway/buffer/buffer_test_helpers.go b/go/test/endtoend/tabletgateway/buffer/buffer_test_helpers.go index 979f33a11be..96cbab6fcd8 100644 --- a/go/test/endtoend/tabletgateway/buffer/buffer_test_helpers.go +++ b/go/test/endtoend/tabletgateway/buffer/buffer_test_helpers.go @@ -71,7 +71,7 @@ const ( type threadParams struct { quit bool rpcs int // Number of queries successfully executed. - errors int // Number of failed queries. + errors []error // Errors returned by the queries. waitForNotification chan bool // Channel used to notify the main thread that this thread executed notifyLock sync.Mutex // notifyLock guards the two fields notifyAfterNSuccessfulRpcs/rpcsSoFar. notifyAfterNSuccessfulRpcs int // If 0, notifications are disabled @@ -96,14 +96,14 @@ func (c *threadParams) threadRun(wg *sync.WaitGroup, vtParams *mysql.ConnParams) if c.reservedConn { _, err = conn.ExecuteFetch("set default_week_format = 1", 1000, true) if err != nil { - c.errors++ + c.errors = append(c.errors, err) log.Errorf("error setting default_week_format: %v", err) } } for !c.quit { err = c.executeFunction(c, conn) if err != nil { - c.errors++ + c.errors = append(c.errors, err) log.Errorf("error executing function %s: %v", c.typ, err) } c.rpcs++ @@ -343,8 +343,8 @@ func (bt *BufferingTest) Test(t *testing.T) { updateThreadInstance.stop() // Both threads must not see any error - assert.Zero(t, readThreadInstance.errors, "found errors in read queries") - assert.Zero(t, updateThreadInstance.errors, "found errors in tx queries") + assert.Empty(t, readThreadInstance.errors, "found errors in read queries") + assert.Empty(t, updateThreadInstance.errors, "found errors in tx queries") //At least one thread should have been buffered. //This may fail if a failover is too fast. Add retries then. diff --git a/go/test/endtoend/tabletgateway/buffer/reparent/failover_buffer_test.go b/go/test/endtoend/tabletgateway/buffer/reparent/failover_buffer_test.go index d3828eb8166..2be57120050 100644 --- a/go/test/endtoend/tabletgateway/buffer/reparent/failover_buffer_test.go +++ b/go/test/endtoend/tabletgateway/buffer/reparent/failover_buffer_test.go @@ -29,9 +29,9 @@ import ( "vitess.io/vitess/go/vt/log" ) -const ( - demoteQuery = "SET GLOBAL read_only = ON;FLUSH TABLES WITH READ LOCK;UNLOCK TABLES;" - promoteQuery = "STOP SLAVE;RESET SLAVE ALL;SET GLOBAL read_only = OFF;" +var ( + demoteQueries = []string{"SET GLOBAL read_only = ON", "FLUSH TABLES WITH READ LOCK", "UNLOCK TABLES"} + promoteQueries = []string{"STOP SLAVE", "RESET SLAVE ALL", "SET GLOBAL read_only = OFF"} hostname = "localhost" ) @@ -48,7 +48,8 @@ func failoverExternalReparenting(t *testing.T, clusterInstance *cluster.LocalPro replica := clusterInstance.Keyspaces[0].Shards[0].Vttablets[1] oldPrimary := primary newPrimary := replica - primary.VttabletProcess.QueryTablet(demoteQuery, keyspaceUnshardedName, true) + err := primary.VttabletProcess.QueryTabletMultiple(demoteQueries, keyspaceUnshardedName, true) + require.NoError(t, err) // Wait for replica to catch up to primary. cluster.WaitForReplicationPos(t, primary, replica, false, time.Minute) @@ -62,7 +63,8 @@ func failoverExternalReparenting(t *testing.T, clusterInstance *cluster.LocalPro } // Promote replica to new primary. - replica.VttabletProcess.QueryTablet(promoteQuery, keyspaceUnshardedName, true) + err = replica.VttabletProcess.QueryTabletMultiple(promoteQueries, keyspaceUnshardedName, true) + require.NoError(t, err) // Configure old primary to replicate from new primary. @@ -70,11 +72,18 @@ func failoverExternalReparenting(t *testing.T, clusterInstance *cluster.LocalPro // Use 'localhost' as hostname because Travis CI worker hostnames // are too long for MySQL replication. - changeSourceCommands := fmt.Sprintf("RESET SLAVE;SET GLOBAL gtid_slave_pos = '%s';CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d ,MASTER_USER='vt_repl', MASTER_USE_GTID = slave_pos;START SLAVE;", gtID, "localhost", newPrimary.MySQLPort) - oldPrimary.VttabletProcess.QueryTablet(changeSourceCommands, keyspaceUnshardedName, true) + changeSourceCommands := []string{ + "STOP SLAVE", + "RESET MASTER", + fmt.Sprintf("SET GLOBAL gtid_purged = '%s'", gtID), + fmt.Sprintf("CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1", "localhost", newPrimary.MySQLPort), + "START SLAVE", + } + err = oldPrimary.VttabletProcess.QueryTabletMultiple(changeSourceCommands, keyspaceUnshardedName, true) + require.NoError(t, err) // Notify the new vttablet primary about the reparent. - err := clusterInstance.VtctlclientProcess.ExecuteCommand("TabletExternallyReparented", newPrimary.Alias) + err = clusterInstance.VtctlclientProcess.ExecuteCommand("TabletExternallyReparented", newPrimary.Alias) require.NoError(t, err) } diff --git a/go/test/endtoend/tabletmanager/tablegc/tablegc_test.go b/go/test/endtoend/tabletmanager/tablegc/tablegc_test.go index cfe961c4558..028d9788e5e 100644 --- a/go/test/endtoend/tabletmanager/tablegc/tablegc_test.go +++ b/go/test/endtoend/tabletmanager/tablegc/tablegc_test.go @@ -52,9 +52,9 @@ var ( ) Engine=InnoDB; ` sqlCreateView = ` - create or replace view v1 as select * from t1; + create or replace view v1 as select * from t1 ` - sqlSchema = sqlCreateTable + sqlCreateView + sqlSchema = []string{sqlCreateTable, sqlCreateView} vSchema = ` { @@ -110,7 +110,7 @@ func TestMain(m *testing.M) { // Start keyspace keyspace := &cluster.Keyspace{ Name: keyspaceName, - SchemaSQL: sqlSchema, + SchemaSQL: strings.Join(sqlSchema, ";"), VSchema: vSchema, } @@ -147,8 +147,9 @@ func checkTableRows(t *testing.T, tableName string, expect int64) { } func populateTable(t *testing.T) { - _, err := primaryTablet.VttabletProcess.QueryTablet(sqlSchema, keyspaceName, true) + err := primaryTablet.VttabletProcess.QueryTabletMultiple(sqlSchema, keyspaceName, true) require.NoError(t, err) + _, err = primaryTablet.VttabletProcess.QueryTablet("delete from t1", keyspaceName, true) require.NoError(t, err) _, err = primaryTablet.VttabletProcess.QueryTablet("insert into t1 (id, value) values (null, md5(rand()))", keyspaceName, true) diff --git a/go/test/endtoend/tabletmanager/tablet_test.go b/go/test/endtoend/tabletmanager/tablet_test.go index 4fe5a70d125..6212b4a418b 100644 --- a/go/test/endtoend/tabletmanager/tablet_test.go +++ b/go/test/endtoend/tabletmanager/tablet_test.go @@ -82,7 +82,7 @@ func TestResetReplicationParameters(t *testing.T) { require.NoError(t, err) // Set a replication source on the tablet and start replication - _, err = tablet.VttabletProcess.QueryTablet("stop slave;change master to master_host = 'localhost', master_port = 123;start slave;", keyspaceName, false) + err = tablet.VttabletProcess.QueryTabletMultiple([]string{"stop slave", "change master to master_host = 'localhost', master_port = 123", "start slave"}, keyspaceName, false) require.NoError(t, err) // Check the replica status. diff --git a/go/test/endtoend/utils/mysql.go b/go/test/endtoend/utils/mysql.go index 1e770b87516..41a70e2dfa4 100644 --- a/go/test/endtoend/utils/mysql.go +++ b/go/test/endtoend/utils/mysql.go @@ -45,7 +45,17 @@ const mysqlShutdownTimeout = 1 * time.Minute // The mysql.ConnParams to connect to the new database is returned, along with a function to // teardown the database. func NewMySQL(cluster *cluster.LocalProcessCluster, dbName string, schemaSQL ...string) (mysql.ConnParams, func(), error) { - mysqlParam, _, closer, error := NewMySQLWithMysqld(cluster.GetAndReservePort(), cluster.Hostname, dbName, schemaSQL...) + // Even though we receive schemaSQL as a variadic argument, we ensure to further split it into singular statements. + parser := sqlparser.NewTestParser() + var sqls []string + for _, sql := range schemaSQL { + split, err := parser.SplitStatementToPieces(sql) + if err != nil { + return mysql.ConnParams{}, nil, err + } + sqls = append(sqls, split...) + } + mysqlParam, _, closer, error := NewMySQLWithMysqld(cluster.GetAndReservePort(), cluster.Hostname, dbName, sqls...) return mysqlParam, closer, error } diff --git a/go/test/endtoend/utils/utils.go b/go/test/endtoend/utils/utils.go index b6bf207decb..d9e94911e30 100644 --- a/go/test/endtoend/utils/utils.go +++ b/go/test/endtoend/utils/utils.go @@ -154,6 +154,15 @@ func Exec(t testing.TB, conn *mysql.Conn, query string) *sqltypes.Result { return qr } +// ExecMulti executes the given (potential multi) queries using the given connection. +// The test fails if any of the queries produces an error +func ExecMulti(t testing.TB, conn *mysql.Conn, query string) error { + t.Helper() + err := conn.ExecuteFetchMultiDrain(query) + require.NoError(t, err, "for query: "+query) + return err +} + // ExecCompareMySQL executes the given query against both Vitess and MySQL and compares // the two result set. If there is a mismatch, the difference will be printed and the // test will fail. If the query produces an error in either Vitess or MySQL, the test diff --git a/go/test/endtoend/vreplication/time_zone_test.go b/go/test/endtoend/vreplication/time_zone_test.go index ff334c593fe..2c0a9a4f5a5 100644 --- a/go/test/endtoend/vreplication/time_zone_test.go +++ b/go/test/endtoend/vreplication/time_zone_test.go @@ -56,7 +56,7 @@ func TestMoveTablesTZ(t *testing.T) { // it seems to take some time for the mysql server to load time zone info after the tables in mysql db have been populated loadTimeZoneInfo := func(tab *cluster.VttabletProcess, sql, timezone string) { - _, err := tab.QueryTabletWithDB(timeZoneSQL, "mysql") + err := tab.MultiQueryTabletWithDB(timeZoneSQL, "mysql") require.NoError(t, err) timer := time.NewTimer(1 * time.Minute) for { diff --git a/go/test/endtoend/vreplication/vstream_test.go b/go/test/endtoend/vreplication/vstream_test.go index 8b21cf6fb60..dee8243d5e9 100644 --- a/go/test/endtoend/vreplication/vstream_test.go +++ b/go/test/endtoend/vreplication/vstream_test.go @@ -219,7 +219,7 @@ func insertRow(keyspace, table string, id int) { if vtgateConn == nil { return } - vtgateConn.ExecuteFetch(fmt.Sprintf("use %s;", keyspace), 1000, false) + vtgateConn.ExecuteFetch(fmt.Sprintf("use %s", keyspace), 1000, false) vtgateConn.ExecuteFetch("begin", 1000, false) _, err := vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s (name) values ('%s%d')", table, table, id), 1000, false) if err != nil { diff --git a/go/test/endtoend/vtgate/errors_as_warnings/main_test.go b/go/test/endtoend/vtgate/errors_as_warnings/main_test.go index 24cade5b550..71f4a2353f7 100644 --- a/go/test/endtoend/vtgate/errors_as_warnings/main_test.go +++ b/go/test/endtoend/vtgate/errors_as_warnings/main_test.go @@ -153,7 +153,7 @@ func TestScatterErrsAsWarns(t *testing.T) { assertContainsOneOf(t, mode.conn, showQ, expectedWarnings...) // invalid_field should throw error and not warning - _, err = mode.conn.ExecuteFetch("SELECT /*vt+ PLANNER=Gen4 SCATTER_ERRORS_AS_WARNINGS */ invalid_field from t1;", 1, false) + _, err = mode.conn.ExecuteFetch("SELECT /*vt+ PLANNER=Gen4 SCATTER_ERRORS_AS_WARNINGS */ invalid_field from t1", 1, false) require.Error(t, err) serr := sqlerror.NewSQLErrorFromError(err).(*sqlerror.SQLError) require.Equal(t, sqlerror.ERBadFieldError, serr.Number(), serr.Error()) diff --git a/go/test/endtoend/vtgate/queries/kill/main_test.go b/go/test/endtoend/vtgate/queries/kill/main_test.go index 836603c91ee..cc74be24336 100644 --- a/go/test/endtoend/vtgate/queries/kill/main_test.go +++ b/go/test/endtoend/vtgate/queries/kill/main_test.go @@ -134,7 +134,8 @@ func dropData(t *testing.T) { defer conn.Close() utils.Exec(t, conn, "drop table if exists test") - utils.Exec(t, conn, schema) + utils.Exec(t, conn, "drop table if exists test_idx") + utils.ExecMulti(t, conn, schema) } func getRandomString(size int) string { diff --git a/go/test/endtoend/vtgate/unsharded/main_test.go b/go/test/endtoend/vtgate/unsharded/main_test.go index 7405a7dd87f..461a3c73b35 100644 --- a/go/test/endtoend/vtgate/unsharded/main_test.go +++ b/go/test/endtoend/vtgate/unsharded/main_test.go @@ -97,53 +97,53 @@ CREATE TABLE allDefaults ( } ` - createProcSQL = `use vt_customer; + createProcSQL = []string{` CREATE PROCEDURE sp_insert() BEGIN insert into allDefaults () values (); END; - +`, ` CREATE PROCEDURE sp_delete() BEGIN delete from allDefaults; END; - +`, ` CREATE PROCEDURE sp_multi_dml() BEGIN insert into allDefaults () values (); delete from allDefaults; END; - +`, ` CREATE PROCEDURE sp_variable() BEGIN insert into allDefaults () values (); SELECT min(id) INTO @myvar FROM allDefaults; DELETE FROM allDefaults WHERE id = @myvar; END; - +`, ` CREATE PROCEDURE sp_select() BEGIN SELECT * FROM allDefaults; END; - +`, ` CREATE PROCEDURE sp_all() BEGIN insert into allDefaults () values (); select * from allDefaults; delete from allDefaults; END; - +`, ` CREATE PROCEDURE in_parameter(IN val int) BEGIN insert into allDefaults(id) values(val); END; - +`, ` CREATE PROCEDURE out_parameter(OUT val int) BEGIN insert into allDefaults(id) values (128); select 128 into val from dual; END; -` +`} ) var enableSettingsPool bool @@ -196,7 +196,7 @@ func runAllTests(m *testing.M) int { } primaryTablet := clusterInstance.Keyspaces[0].Shards[0].PrimaryTablet().VttabletProcess - if _, err := primaryTablet.QueryTablet(createProcSQL, KeyspaceName, false); err != nil { + if err := primaryTablet.QueryTabletMultiple(createProcSQL, KeyspaceName, true); err != nil { log.Fatal(err.Error()) return 1 } @@ -332,13 +332,11 @@ func TestCallProcedure(t *testing.T) { utils.AssertMatches(t, conn, "show warnings", `[[VARCHAR("Warning") UINT16(1235) VARCHAR("'CALL' not supported in sharded mode")]]`) - _, err = conn.ExecuteFetch(`CALL sp_select()`, 1000, true) - require.Error(t, err) - require.Contains(t, err.Error(), "Multi-Resultset not supported in stored procedure") + err = conn.ExecuteFetchMultiDrain(`CALL sp_select()`) + require.ErrorContains(t, err, "Multi-Resultset not supported in stored procedure") - _, err = conn.ExecuteFetch(`CALL sp_all()`, 1000, true) - require.Error(t, err) - require.Contains(t, err.Error(), "Multi-Resultset not supported in stored procedure") + err = conn.ExecuteFetchMultiDrain(`CALL sp_all()`) + require.ErrorContains(t, err, "Multi-Resultset not supported in stored procedure") qr = utils.Exec(t, conn, `CALL sp_delete()`) require.GreaterOrEqual(t, 1, int(qr.RowsAffected)) diff --git a/go/test/endtoend/vtorc/primaryfailure/primary_failure_test.go b/go/test/endtoend/vtorc/primaryfailure/primary_failure_test.go index 180f367d7fb..e226e8d13ae 100644 --- a/go/test/endtoend/vtorc/primaryfailure/primary_failure_test.go +++ b/go/test/endtoend/vtorc/primaryfailure/primary_failure_test.go @@ -438,9 +438,8 @@ func TestLostRdonlyOnPrimaryFailure(t *testing.T) { // check that replication is setup correctly utils.CheckReplication(t, clusterInfo, curPrimary, []*cluster.Vttablet{rdonly, aheadRdonly, replica}, 15*time.Second) - // revoke super privileges from vtorc on replica and rdonly so that it is unable to repair the replication - utils.ChangePrivileges(t, `REVOKE SUPER ON *.* FROM 'orc_client_user'@'%'`, replica, "orc_client_user") - utils.ChangePrivileges(t, `REVOKE SUPER ON *.* FROM 'orc_client_user'@'%'`, rdonly, "orc_client_user") + // disable recoveries on vtorc so that it is unable to repair the replication + utils.DisableGlobalRecoveries(t, clusterInfo.ClusterInstance.VTOrcProcesses[0]) // stop replication on the replica and rdonly. err := clusterInfo.ClusterInstance.VtctlclientProcess.ExecuteCommand("StopReplication", replica.Alias) @@ -467,9 +466,8 @@ func TestLostRdonlyOnPrimaryFailure(t *testing.T) { utils.PermanentlyRemoveVttablet(clusterInfo, curPrimary) }() - // grant super privileges back to vtorc on replica and rdonly so that it can repair - utils.ChangePrivileges(t, `GRANT SUPER ON *.* TO 'orc_client_user'@'%'`, replica, "orc_client_user") - utils.ChangePrivileges(t, `GRANT SUPER ON *.* TO 'orc_client_user'@'%'`, rdonly, "orc_client_user") + // enable recoveries back on vtorc so that it can repair + utils.EnableGlobalRecoveries(t, clusterInfo.ClusterInstance.VTOrcProcesses[0]) // vtorc must promote the lagging replica and not the rdonly, since it has a MustNotPromoteRule promotion rule utils.CheckPrimaryTablet(t, clusterInfo, replica, true) @@ -667,8 +665,8 @@ func TestDownPrimaryPromotionRuleWithLag(t *testing.T) { // newly started tablet does not replicate from anyone yet, we will allow vtorc to fix this too utils.CheckReplication(t, clusterInfo, curPrimary, []*cluster.Vttablet{crossCellReplica, replica, rdonly}, 25*time.Second) - // revoke super privileges from vtorc on crossCellReplica so that it is unable to repair the replication - utils.ChangePrivileges(t, `REVOKE SUPER ON *.* FROM 'orc_client_user'@'%'`, crossCellReplica, "orc_client_user") + // disable recoveries for vtorc so that it is unable to repair the replication. + utils.DisableGlobalRecoveries(t, clusterInfo.ClusterInstance.VTOrcProcesses[0]) // stop replication on the crossCellReplica. err := clusterInfo.ClusterInstance.VtctlclientProcess.ExecuteCommand("StopReplication", crossCellReplica.Alias) @@ -684,8 +682,8 @@ func TestDownPrimaryPromotionRuleWithLag(t *testing.T) { err = clusterInfo.ClusterInstance.VtctlclientProcess.ExecuteCommand("StartReplication", crossCellReplica.Alias) require.NoError(t, err) - // grant super privileges back to vtorc on crossCellReplica so that it can repair - utils.ChangePrivileges(t, `GRANT SUPER ON *.* TO 'orc_client_user'@'%'`, crossCellReplica, "orc_client_user") + // enable recoveries back on vtorc so that it can repair + utils.EnableGlobalRecoveries(t, clusterInfo.ClusterInstance.VTOrcProcesses[0]) // assert that the crossCellReplica is indeed lagging and does not have the new insertion by checking the count of rows in the table out, err := utils.RunSQL(t, "SELECT * FROM vt_insert_test", crossCellReplica, "vt_ks") @@ -748,8 +746,8 @@ func TestDownPrimaryPromotionRuleWithLagCrossCenter(t *testing.T) { // newly started tablet does not replicate from anyone yet, we will allow vtorc to fix this too utils.CheckReplication(t, clusterInfo, curPrimary, []*cluster.Vttablet{crossCellReplica, replica, rdonly}, 25*time.Second) - // revoke super privileges from vtorc on replica so that it is unable to repair the replication - utils.ChangePrivileges(t, `REVOKE SUPER ON *.* FROM 'orc_client_user'@'%'`, replica, "orc_client_user") + // disable recoveries from vtorc so that it is unable to repair the replication + utils.DisableGlobalRecoveries(t, clusterInfo.ClusterInstance.VTOrcProcesses[0]) // stop replication on the replica. err := clusterInfo.ClusterInstance.VtctlclientProcess.ExecuteCommand("StopReplication", replica.Alias) @@ -765,8 +763,8 @@ func TestDownPrimaryPromotionRuleWithLagCrossCenter(t *testing.T) { err = clusterInfo.ClusterInstance.VtctlclientProcess.ExecuteCommand("StartReplication", replica.Alias) require.NoError(t, err) - // grant super privileges back to vtorc on replica so that it can repair - utils.ChangePrivileges(t, `GRANT SUPER ON *.* TO 'orc_client_user'@'%'`, replica, "orc_client_user") + // enable recoveries back on vtorc so that it can repair + utils.EnableGlobalRecoveries(t, clusterInfo.ClusterInstance.VTOrcProcesses[0]) // assert that the replica is indeed lagging and does not have the new insertion by checking the count of rows in the table out, err := utils.RunSQL(t, "SELECT * FROM vt_insert_test", replica, "vt_ks") diff --git a/go/test/endtoend/vtorc/utils/utils.go b/go/test/endtoend/vtorc/utils/utils.go index 07b5b016fcc..11294319658 100644 --- a/go/test/endtoend/vtorc/utils/utils.go +++ b/go/test/endtoend/vtorc/utils/utils.go @@ -683,21 +683,6 @@ func PermanentlyRemoveVttablet(clusterInfo *VTOrcClusterInfo, tablet *cluster.Vt } } -// ChangePrivileges is used to change the privileges of the given user. These commands are executed such that they are not replicated -func ChangePrivileges(t *testing.T, sql string, tablet *cluster.Vttablet, user string) { - _, err := RunSQL(t, "SET sql_log_bin = OFF;"+sql+";SET sql_log_bin = ON;", tablet, "") - require.NoError(t, err) - - res, err := RunSQL(t, fmt.Sprintf("SELECT id FROM INFORMATION_SCHEMA.PROCESSLIST WHERE user = '%s'", user), tablet, "") - require.NoError(t, err) - for _, row := range res.Rows { - id, err := row[0].ToInt64() - require.NoError(t, err) - _, err = RunSQL(t, fmt.Sprintf("kill %d", id), tablet, "") - require.NoError(t, err) - } -} - // ResetPrimaryLogs is used reset the binary logs func ResetPrimaryLogs(t *testing.T, curPrimary *cluster.Vttablet) { _, err := RunSQL(t, "FLUSH BINARY LOGS", curPrimary, "") @@ -1121,3 +1106,19 @@ func PrintVTOrcLogsOnFailure(t *testing.T, clusterInstance *cluster.LocalProcess log.Errorf("%s", string(content)) } } + +// EnableGlobalRecoveries enables global recoveries for the given VTOrc. +func EnableGlobalRecoveries(t *testing.T, vtorc *cluster.VTOrcProcess) { + status, resp, err := MakeAPICall(t, vtorc, "/api/enable-global-recoveries") + require.NoError(t, err) + assert.Equal(t, 200, status) + assert.Equal(t, "Global recoveries enabled\n", resp) +} + +// DisableGlobalRecoveries disables global recoveries for the given VTOrc. +func DisableGlobalRecoveries(t *testing.T, vtorc *cluster.VTOrcProcess) { + status, resp, err := MakeAPICall(t, vtorc, "/api/disable-global-recoveries") + require.NoError(t, err) + assert.Equal(t, 200, status) + assert.Equal(t, "Global recoveries disabled\n", resp) +} diff --git a/go/vt/mysqlctl/mysqld.go b/go/vt/mysqlctl/mysqld.go index 6de6425925e..5c11c6055e8 100644 --- a/go/vt/mysqlctl/mysqld.go +++ b/go/vt/mysqlctl/mysqld.go @@ -412,7 +412,7 @@ func (mysqld *Mysqld) startNoWait(cnf *Mycnf, mysqldArgs ...string) error { }() err = cmd.Start() if err != nil { - return err + return vterrors.Wrapf(err, "failed to start mysqld") } mysqld.mutex.Lock() diff --git a/go/vt/vttablet/endtoend/call_test.go b/go/vt/vttablet/endtoend/call_test.go index 477a099aa76..a1a2eae792a 100644 --- a/go/vt/vttablet/endtoend/call_test.go +++ b/go/vt/vttablet/endtoend/call_test.go @@ -75,12 +75,16 @@ func TestCallProcedure(t *testing.T) { wantErr bool } tcases := []testcases{{ + query: "call proc_dml()", + }, { query: "call proc_select1()", wantErr: true, }, { query: "call proc_select4()", wantErr: true, }, { + // Again, make sure the connection isn't dirty and does not contain leftover + // result sets from previous tests. query: "call proc_dml()", }} @@ -92,7 +96,6 @@ func TestCallProcedure(t *testing.T) { return } require.NoError(t, err) - }) } } diff --git a/go/vt/vttablet/tabletmanager/tm_init_test.go b/go/vt/vttablet/tabletmanager/tm_init_test.go index 0636e7db633..c44bb846eb3 100644 --- a/go/vt/vttablet/tabletmanager/tm_init_test.go +++ b/go/vt/vttablet/tabletmanager/tm_init_test.go @@ -907,7 +907,7 @@ func startMySQLAndCreateUser(t *testing.T, testUser string) (vttest.LocalCluster connParams := cluster.MySQLConnParams() conn, err := mysql.Connect(context.Background(), &connParams) require.NoError(t, err) - _, err = conn.ExecuteFetch(fmt.Sprintf(`CREATE USER '%v'@'localhost';`, testUser), 1000, false) + _, err = conn.ExecuteFetch(fmt.Sprintf(`CREATE USER '%v'@'localhost'`, testUser), 1000, false) conn.Close() return cluster, err @@ -917,11 +917,11 @@ func startMySQLAndCreateUser(t *testing.T, testUser string) (vttest.LocalCluster func grantAllPrivilegesToUser(t *testing.T, connParams mysql.ConnParams, testUser string) { conn, err := mysql.Connect(context.Background(), &connParams) require.NoError(t, err) - _, err = conn.ExecuteFetch(fmt.Sprintf(`GRANT ALL ON *.* TO '%v'@'localhost';`, testUser), 1000, false) + _, err = conn.ExecuteFetch(fmt.Sprintf(`GRANT ALL ON *.* TO '%v'@'localhost'`, testUser), 1000, false) require.NoError(t, err) - _, err = conn.ExecuteFetch(fmt.Sprintf(`GRANT GRANT OPTION ON *.* TO '%v'@'localhost';`, testUser), 1000, false) + _, err = conn.ExecuteFetch(fmt.Sprintf(`GRANT GRANT OPTION ON *.* TO '%v'@'localhost'`, testUser), 1000, false) require.NoError(t, err) - _, err = conn.ExecuteFetch("FLUSH PRIVILEGES;", 1000, false) + _, err = conn.ExecuteFetch("FLUSH PRIVILEGES", 1000, false) require.NoError(t, err) conn.Close() } diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go index 04738ee7857..dd51fb6c042 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go @@ -2901,7 +2901,7 @@ func TestPlayerInvalidDates(t *testing.T) { fmt.Sprintf("drop table %s.dst1", vrepldb), }) pos := primaryPosition(t) - execStatements(t, []string{"set sql_mode='';insert into src1 values(1, '0000-00-00');set sql_mode='STRICT_TRANS_TABLES';"}) + execStatements(t, []string{"set sql_mode=''", "insert into src1 values(1, '0000-00-00')", "set sql_mode='STRICT_TRANS_TABLES'"}) env.SchemaEngine.Reload(context.Background()) // default mysql flavor allows invalid dates: so disallow explicitly for this test diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index 844ce753152..5690c209ebb 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -18,12 +18,14 @@ package tabletserver import ( "context" + "errors" "fmt" "io" "strings" "sync" "time" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/pools/smartconnpool" @@ -881,6 +883,9 @@ func (qre *QueryExecutor) execCallProc() (*sqltypes.Result, error) { } qr, err := qre.execDBConn(conn.Conn, sql, true) + if errors.Is(err, mysql.ErrExecuteFetchMultipleResults) { + return nil, vterrors.New(vtrpcpb.Code_UNIMPLEMENTED, "Multi-Resultset not supported in stored procedure") + } if err != nil { return nil, rewriteOUTParamError(err) } diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go index 203052e981e..ea6e0fb76aa 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go @@ -240,17 +240,17 @@ func TestVStreamCopyCompleteFlow(t *testing.T) { insertRow(t, "t1", 1, numInitialRows+4) insertRow(t, "t2", 2, numInitialRows+3) // savepoints should not be sent in the event stream - execStatement(t, ` -begin; -insert into t3 (id31, id32) values (12, 360); -savepoint a; -insert into t3 (id31, id32) values (13, 390); -rollback work to savepoint a; -savepoint b; -insert into t3 (id31, id32) values (13, 390); -release savepoint b; -commit;" -`) + execStatements(t, []string{ + "begin", + "insert into t3 (id31, id32) values (12, 360)", + "savepoint a", + "insert into t3 (id31, id32) values (13, 390)", + "rollback work to savepoint a", + "savepoint b", + "insert into t3 (id31, id32) values (13, 390)", + "release savepoint b", + "commit", + }) } numCopyEvents := 3 /*t1,t2,t3*/ * (numInitialRows + 1 /*FieldEvent*/ + 1 /*LastPKEvent*/ + 1 /*TestEvent: Copy Start*/ + 2 /*begin,commit*/ + 3 /* LastPK Completed*/)