Skip to content

Commit

Permalink
ExecuteFetch: error on multiple result sets (#14949)
Browse files Browse the repository at this point in the history
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
Signed-off-by: Dirkjan Bussink <d.bussink@gmail.com>
Signed-off-by: Manan Gupta <manan@planetscale.com>
Signed-off-by: Harshit Gangal <harshit@planetscale.com>
Signed-off-by: Vicent Marti <vmg@strn.cat>
Co-authored-by: Dirkjan Bussink <d.bussink@gmail.com>
Co-authored-by: Manan Gupta <manan@planetscale.com>
Co-authored-by: Harshit Gangal <harshit@planetscale.com>
Co-authored-by: Vicent Marti <vmg@strn.cat>
  • Loading branch information
5 people authored Feb 14, 2024
1 parent a0ce8bc commit 8960bc3
Show file tree
Hide file tree
Showing 26 changed files with 225 additions and 101 deletions.
13 changes: 12 additions & 1 deletion go/mysql/endtoend/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,11 @@ func doTestMultiResult(t *testing.T, disableClientDeprecateEOF bool) {
assert.EqualValues(t, 1, result.RowsAffected, "insert into returned RowsAffected")
}

qr, more, err = conn.ExecuteFetchMulti("update a set name = concat(name, ' updated'); select * from a; select count(*) from a", 300, true)
// Verify that a ExecuteFetchMultiDrain leaves the connection/packet in valid state.
err = conn.ExecuteFetchMultiDrain("update a set name = concat(name, ', multi drain 1'); select * from a; select count(*) from a")
expectNoError(t, err)
// If the previous command leaves packet in invalid state, this will fail.
qr, more, err = conn.ExecuteFetchMulti("update a set name = concat(name, ', fetch multi'); select * from a; select count(*) from a", 300, true)
expectNoError(t, err)
expectFlag(t, "ExecuteMultiFetch(multi result)", more, true)
assert.EqualValues(t, 255, qr.RowsAffected)
Expand All @@ -225,6 +229,13 @@ func doTestMultiResult(t *testing.T, disableClientDeprecateEOF bool) {
expectFlag(t, "ReadQueryResult(2)", more, false)
assert.EqualValues(t, 1, len(qr.Rows), "ReadQueryResult(1)")

// Verify that a ExecuteFetchMultiDrain is happy to operate again after all the above.
err = conn.ExecuteFetchMultiDrain("update a set name = concat(name, ', multi drain 2'); select * from a; select count(*) from a")
expectNoError(t, err)

err = conn.ExecuteFetchMultiDrain("update b set name = concat(name, ' nonexistent table'); select * from a; select count(*) from a")
require.Error(t, err)

_, err = conn.ExecuteFetch("drop table a", 10, true)
require.NoError(t, err)
}
Expand Down
44 changes: 43 additions & 1 deletion go/mysql/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package mysql

import (
"errors"
"fmt"
"math"
"strconv"
Expand All @@ -34,6 +35,17 @@ import (

// This file contains the methods related to queries.

var (
ErrExecuteFetchMultipleResults = vterrors.Errorf(vtrpc.Code_INTERNAL, "unexpected multiple results. Use ExecuteFetchMulti instead.")
)

const (
// Use as `maxrows` in `ExecuteFetch` and related functions, to indicate no rows should be fetched.
// This is different than specifying `0`, because `0` means "expect zero results", while this means
// "do not attempt to read any results into memory".
FETCH_NO_ROWS = math.MinInt
)

//
// Client side methods.
//
Expand Down Expand Up @@ -303,10 +315,35 @@ func (c *Conn) parseRow(data []byte, fields []*querypb.Field, reader func([]byte
// 2. if the server closes the connection when a command is in flight,
// readComQueryResponse will fail, and we'll return CRServerLost(2013).
func (c *Conn) ExecuteFetch(query string, maxrows int, wantfields bool) (result *sqltypes.Result, err error) {
result, _, err = c.ExecuteFetchMulti(query, maxrows, wantfields)
result, more, err := c.ExecuteFetchMulti(query, maxrows, wantfields)
if more {
// Multiple results are unexpected. Prioritize this "unexpected" error over whatever error we got from the first result.
err = errors.Join(ErrExecuteFetchMultipleResults, err)
}
// draining to make the connection clean.
err = c.drainMoreResults(more, err)
return result, err
}

// ExecuteFetchMultiDrain is for executing multiple statements in one call, but without
// caring for any results. The function returns an error if any of the statements fail.
// The function drains the query results of all statements, even if there's an error.
func (c *Conn) ExecuteFetchMultiDrain(query string) (err error) {
_, more, err := c.ExecuteFetchMulti(query, FETCH_NO_ROWS, false)
return c.drainMoreResults(more, err)
}

// drainMoreResults ensures to drain all query results, even if there's an error.
// We collect all errors until we consume all results.
func (c *Conn) drainMoreResults(more bool, err error) error {
for more {
var moreErr error
_, more, _, moreErr = c.ReadQueryResult(FETCH_NO_ROWS, false)
err = errors.Join(err, moreErr)
}
return err
}

// ExecuteFetchMulti is for fetching multiple results from a multi-statement result.
// It returns an additional 'more' flag. If it is set, you must fetch the additional
// results using ReadQueryResult.
Expand Down Expand Up @@ -460,6 +497,11 @@ func (c *Conn) ReadQueryResult(maxrows int, wantfields bool) (*sqltypes.Result,
return nil, false, 0, ParseErrorPacket(data)
}

if maxrows == FETCH_NO_ROWS {
c.recycleReadPacket()
continue
}

// Check we're not over the limit before we add more.
if len(result.Rows) == maxrows {
c.recycleReadPacket()
Expand Down
30 changes: 30 additions & 0 deletions go/test/endtoend/cluster/vttablet_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,16 @@ func (vttablet *VttabletProcess) QueryTabletWithDB(query string, dbname string)
return executeQuery(conn, query)
}

// MultiQueryTabletWithDB lets you execute multiple queries on a specific DB in this tablet.
func (vttablet *VttabletProcess) MultiQueryTabletWithDB(query string, dbname string) error {
conn, err := vttablet.defaultConn(dbname)
if err != nil {
return err
}
defer conn.Close()
return executeMultiQuery(conn, query)
}

// executeQuery will retry the query up to 10 times with a small sleep in between each try.
// This allows the tests to be more robust in the face of transient failures.
func executeQuery(dbConn *mysql.Conn, query string) (*sqltypes.Result, error) {
Expand All @@ -536,6 +546,26 @@ func executeQuery(dbConn *mysql.Conn, query string) (*sqltypes.Result, error) {
return result, err
}

// executeMultiQuery will retry the given multi query up to 10 times with a small sleep in between each try.
// This allows the tests to be more robust in the face of transient failures.
func executeMultiQuery(dbConn *mysql.Conn, query string) (err error) {
retries := 10
retryDelay := 1 * time.Second
for i := 0; i < retries; i++ {
if i > 0 {
// We only audit from 2nd attempt and onwards, otherwise this is just too verbose.
log.Infof("Executing query %s (attempt %d of %d)", query, (i + 1), retries)
}
err = dbConn.ExecuteFetchMultiDrain(query)
if err == nil {
break
}
time.Sleep(retryDelay)
}

return err
}

// GetDBVar returns first matching database variable's value
func (vttablet *VttabletProcess) GetDBVar(varName string, ksName string) (string, error) {
return vttablet.getDBSystemValues("variables", varName, ksName)
Expand Down
10 changes: 5 additions & 5 deletions go/test/endtoend/mysqlserver/mysql_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func TestTimeout(t *testing.T) {
require.Nilf(t, err, "unable to connect mysql: %v", err)
defer conn.Close()

_, err = conn.ExecuteFetch("SELECT SLEEP(5);", 1, false)
_, err = conn.ExecuteFetch("SELECT SLEEP(5)", 1, false)
require.NotNilf(t, err, "quiry timeout error expected")
mysqlErr, ok := err.(*sqlerror.SQLError)
require.Truef(t, ok, "invalid error type")
Expand All @@ -132,7 +132,7 @@ func TestInvalidField(t *testing.T) {
require.Nilf(t, err, "unable to connect mysql: %v", err)
defer conn.Close()

_, err = conn.ExecuteFetch("SELECT invalid_field from vt_insert_test;", 1, false)
_, err = conn.ExecuteFetch("SELECT invalid_field from vt_insert_test", 1, false)
require.NotNil(t, err, "invalid field error expected")
mysqlErr, ok := err.(*sqlerror.SQLError)
require.Truef(t, ok, "invalid error type")
Expand All @@ -153,7 +153,7 @@ func TestWarnings(t *testing.T) {
require.NoError(t, err)
assert.Empty(t, qr.Rows, "number of rows")

qr, err = conn.ExecuteFetch("SHOW WARNINGS;", 1, false)
qr, err = conn.ExecuteFetch("SHOW WARNINGS", 1, false)
require.NoError(t, err, "SHOW WARNINGS")
assert.EqualValues(t, 1, len(qr.Rows), "number of rows")
assert.Contains(t, qr.Rows[0][0].String(), "VARCHAR(\"Warning\")", qr.Rows)
Expand All @@ -164,7 +164,7 @@ func TestWarnings(t *testing.T) {
_, err = conn.ExecuteFetch("SELECT 1 from vt_insert_test limit 1", 1, false)
require.NoError(t, err)

qr, err = conn.ExecuteFetch("SHOW WARNINGS;", 1, false)
qr, err = conn.ExecuteFetch("SHOW WARNINGS", 1, false)
require.NoError(t, err)
assert.Empty(t, qr.Rows)

Expand All @@ -175,7 +175,7 @@ func TestWarnings(t *testing.T) {
_, err = conn.ExecuteFetch("SELECT 1 from vt_insert_test limit 1", 1, false)
require.NoError(t, err)

qr, err = conn.ExecuteFetch("SHOW WARNINGS;", 1, false)
qr, err = conn.ExecuteFetch("SHOW WARNINGS", 1, false)
require.NoError(t, err)
assert.Empty(t, qr.Rows)
}
Expand Down
4 changes: 2 additions & 2 deletions go/test/endtoend/reparent/newfeaturetest/reparent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ func TestChangeTypeWithoutSemiSync(t *testing.T) {
utils.RunSQL(ctx, t, "set global super_read_only = 0", tablet)
}

utils.RunSQL(ctx, t, "UNINSTALL PLUGIN rpl_semi_sync_slave;", tablet)
utils.RunSQL(ctx, t, "UNINSTALL PLUGIN rpl_semi_sync_master;", tablet)
utils.RunSQL(ctx, t, "UNINSTALL PLUGIN rpl_semi_sync_slave", tablet)
utils.RunSQL(ctx, t, "UNINSTALL PLUGIN rpl_semi_sync_master", tablet)
}

utils.ValidateTopology(t, clusterInstance, true)
Expand Down
6 changes: 3 additions & 3 deletions go/test/endtoend/reparent/plannedreparent/reparent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func TestPRSWithDrainedLaggingTablet(t *testing.T) {
utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[2], tablets[3]})

// assert that there is indeed only 1 row in tablets[1
res := utils.RunSQL(context.Background(), t, `select msg from vt_insert_test;`, tablets[1])
res := utils.RunSQL(context.Background(), t, `select msg from vt_insert_test`, tablets[1])
assert.Equal(t, 1, len(res.Rows))

// Perform a graceful reparent operation
Expand Down Expand Up @@ -217,8 +217,8 @@ func reparentFromOutside(t *testing.T, clusterInstance *cluster.LocalProcessClus

if !downPrimary {
// commands to stop the current primary
demoteCommands := "SET GLOBAL read_only = ON; FLUSH TABLES WITH READ LOCK; UNLOCK TABLES"
utils.RunSQL(ctx, t, demoteCommands, tablets[0])
demoteCommands := []string{"SET GLOBAL read_only = ON", "FLUSH TABLES WITH READ LOCK", "UNLOCK TABLES"}
utils.RunSQLs(ctx, t, demoteCommands, tablets[0])

//Get the position of the old primary and wait for the new one to catch up.
err := utils.WaitForReplicationPosition(t, tablets[0], tablets[1])
Expand Down
12 changes: 9 additions & 3 deletions go/test/endtoend/reparent/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,10 +258,16 @@ func getMysqlConnParam(tablet *cluster.Vttablet) mysql.ConnParams {
return connParams
}

// RunSQLs is used to run SQL commands directly on the MySQL instance of a vttablet
// RunSQLs is used to run SQL commands directly on the MySQL instance of a vttablet. All commands are
// run in a single connection.
func RunSQLs(ctx context.Context, t *testing.T, sqls []string, tablet *cluster.Vttablet) (results []*sqltypes.Result) {
tabletParams := getMysqlConnParam(tablet)
conn, err := mysql.Connect(ctx, &tabletParams)
require.Nil(t, err)
defer conn.Close()

for _, sql := range sqls {
result := RunSQL(ctx, t, sql, tablet)
result := execute(t, conn, sql)
results = append(results, result)
}
return results
Expand Down Expand Up @@ -704,7 +710,7 @@ func SetReplicationSourceFailed(tablet *cluster.Vttablet, prsOut string) bool {

// CheckReplicationStatus checks that the replication for sql and io threads is setup as expected
func CheckReplicationStatus(ctx context.Context, t *testing.T, tablet *cluster.Vttablet, sqlThreadRunning bool, ioThreadRunning bool) {
res := RunSQL(ctx, t, "show slave status;", tablet)
res := RunSQL(ctx, t, "show slave status", tablet)
if ioThreadRunning {
require.Equal(t, "Yes", res.Rows[0][10].ToString())
} else {
Expand Down
10 changes: 5 additions & 5 deletions go/test/endtoend/tabletgateway/buffer/buffer_test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ const (
type threadParams struct {
quit bool
rpcs int // Number of queries successfully executed.
errors int // Number of failed queries.
errors []error // Errors returned by the queries.
waitForNotification chan bool // Channel used to notify the main thread that this thread executed
notifyLock sync.Mutex // notifyLock guards the two fields notifyAfterNSuccessfulRpcs/rpcsSoFar.
notifyAfterNSuccessfulRpcs int // If 0, notifications are disabled
Expand All @@ -96,14 +96,14 @@ func (c *threadParams) threadRun(wg *sync.WaitGroup, vtParams *mysql.ConnParams)
if c.reservedConn {
_, err = conn.ExecuteFetch("set default_week_format = 1", 1000, true)
if err != nil {
c.errors++
c.errors = append(c.errors, err)
log.Errorf("error setting default_week_format: %v", err)
}
}
for !c.quit {
err = c.executeFunction(c, conn)
if err != nil {
c.errors++
c.errors = append(c.errors, err)
log.Errorf("error executing function %s: %v", c.typ, err)
}
c.rpcs++
Expand Down Expand Up @@ -343,8 +343,8 @@ func (bt *BufferingTest) Test(t *testing.T) {
updateThreadInstance.stop()

// Both threads must not see any error
assert.Zero(t, readThreadInstance.errors, "found errors in read queries")
assert.Zero(t, updateThreadInstance.errors, "found errors in tx queries")
assert.Empty(t, readThreadInstance.errors, "found errors in read queries")
assert.Empty(t, updateThreadInstance.errors, "found errors in tx queries")

//At least one thread should have been buffered.
//This may fail if a failover is too fast. Add retries then.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ import (
"vitess.io/vitess/go/vt/log"
)

const (
demoteQuery = "SET GLOBAL read_only = ON;FLUSH TABLES WITH READ LOCK;UNLOCK TABLES;"
promoteQuery = "STOP SLAVE;RESET SLAVE ALL;SET GLOBAL read_only = OFF;"
var (
demoteQueries = []string{"SET GLOBAL read_only = ON", "FLUSH TABLES WITH READ LOCK", "UNLOCK TABLES"}
promoteQueries = []string{"STOP SLAVE", "RESET SLAVE ALL", "SET GLOBAL read_only = OFF"}

hostname = "localhost"
)
Expand All @@ -48,7 +48,8 @@ func failoverExternalReparenting(t *testing.T, clusterInstance *cluster.LocalPro
replica := clusterInstance.Keyspaces[0].Shards[0].Vttablets[1]
oldPrimary := primary
newPrimary := replica
primary.VttabletProcess.QueryTablet(demoteQuery, keyspaceUnshardedName, true)
err := primary.VttabletProcess.QueryTabletMultiple(demoteQueries, keyspaceUnshardedName, true)
require.NoError(t, err)

// Wait for replica to catch up to primary.
cluster.WaitForReplicationPos(t, primary, replica, false, time.Minute)
Expand All @@ -62,19 +63,27 @@ func failoverExternalReparenting(t *testing.T, clusterInstance *cluster.LocalPro
}

// Promote replica to new primary.
replica.VttabletProcess.QueryTablet(promoteQuery, keyspaceUnshardedName, true)
err = replica.VttabletProcess.QueryTabletMultiple(promoteQueries, keyspaceUnshardedName, true)
require.NoError(t, err)

// Configure old primary to replicate from new primary.

_, gtID := cluster.GetPrimaryPosition(t, *newPrimary, hostname)

// Use 'localhost' as hostname because Travis CI worker hostnames
// are too long for MySQL replication.
changeSourceCommands := fmt.Sprintf("RESET SLAVE;SET GLOBAL gtid_slave_pos = '%s';CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d ,MASTER_USER='vt_repl', MASTER_USE_GTID = slave_pos;START SLAVE;", gtID, "localhost", newPrimary.MySQLPort)
oldPrimary.VttabletProcess.QueryTablet(changeSourceCommands, keyspaceUnshardedName, true)
changeSourceCommands := []string{
"STOP SLAVE",
"RESET MASTER",
fmt.Sprintf("SET GLOBAL gtid_purged = '%s'", gtID),
fmt.Sprintf("CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1", "localhost", newPrimary.MySQLPort),
"START SLAVE",
}
err = oldPrimary.VttabletProcess.QueryTabletMultiple(changeSourceCommands, keyspaceUnshardedName, true)
require.NoError(t, err)

// Notify the new vttablet primary about the reparent.
err := clusterInstance.VtctlclientProcess.ExecuteCommand("TabletExternallyReparented", newPrimary.Alias)
err = clusterInstance.VtctlclientProcess.ExecuteCommand("TabletExternallyReparented", newPrimary.Alias)
require.NoError(t, err)
}

Expand Down
9 changes: 5 additions & 4 deletions go/test/endtoend/tabletmanager/tablegc/tablegc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ var (
) Engine=InnoDB;
`
sqlCreateView = `
create or replace view v1 as select * from t1;
create or replace view v1 as select * from t1
`
sqlSchema = sqlCreateTable + sqlCreateView
sqlSchema = []string{sqlCreateTable, sqlCreateView}

vSchema = `
{
Expand Down Expand Up @@ -110,7 +110,7 @@ func TestMain(m *testing.M) {
// Start keyspace
keyspace := &cluster.Keyspace{
Name: keyspaceName,
SchemaSQL: sqlSchema,
SchemaSQL: strings.Join(sqlSchema, ";"),
VSchema: vSchema,
}

Expand Down Expand Up @@ -147,8 +147,9 @@ func checkTableRows(t *testing.T, tableName string, expect int64) {
}

func populateTable(t *testing.T) {
_, err := primaryTablet.VttabletProcess.QueryTablet(sqlSchema, keyspaceName, true)
err := primaryTablet.VttabletProcess.QueryTabletMultiple(sqlSchema, keyspaceName, true)
require.NoError(t, err)

_, err = primaryTablet.VttabletProcess.QueryTablet("delete from t1", keyspaceName, true)
require.NoError(t, err)
_, err = primaryTablet.VttabletProcess.QueryTablet("insert into t1 (id, value) values (null, md5(rand()))", keyspaceName, true)
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/tabletmanager/tablet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestResetReplicationParameters(t *testing.T) {
require.NoError(t, err)

// Set a replication source on the tablet and start replication
_, err = tablet.VttabletProcess.QueryTablet("stop slave;change master to master_host = 'localhost', master_port = 123;start slave;", keyspaceName, false)
err = tablet.VttabletProcess.QueryTabletMultiple([]string{"stop slave", "change master to master_host = 'localhost', master_port = 123", "start slave"}, keyspaceName, false)
require.NoError(t, err)

// Check the replica status.
Expand Down
Loading

0 comments on commit 8960bc3

Please sign in to comment.