Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add engine primitive to handle query timeout #16619

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion go/test/endtoend/vtgate/queries/timeout/timeout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,41 @@ func TestQueryTimeoutWithTables(t *testing.T) {
utils.Exec(t, mcmp.VtConn, "select /*vt+ QUERY_TIMEOUT_MS=500 */ sleep(0.1) from t1 where id1 = 1")
_, err = utils.ExecAllowError(t, mcmp.VtConn, "select /*vt+ QUERY_TIMEOUT_MS=20 */ sleep(0.1) from t1 where id1 = 1")
require.Error(t, err)
assert.Contains(t, err.Error(), "context deadline exceeded")
if utils.BinaryIsAtLeastAtVersion(21, "vtgate") {
assert.ErrorContains(t, err, "Query execution was interrupted, maximum statement execution time exceeded")
} else {
assert.Contains(t, err.Error(), "context deadline exceeded")
}
assert.Contains(t, err.Error(), "(errno 1317) (sqlstate 70100)")
}

// TestOverallQueryTimeout tests that the query timeout is applied to the overall execution of a query
// and not just individual routes.
func TestOverallQueryTimeout(t *testing.T) {
utils.SkipIfBinaryIsBelowVersion(t, 21, "vtgate")
mcmp, closer := start(t)
defer closer()

mcmp.Exec("insert into t1(id1, id2) values (2,2),(3,3)")
// After inserting the rows above, if we run the following query, we will end up doing join on vtgate
// that issues one select query on the left side and 2 on the right side. The queries on the right side
// take 2 and 3 seconds each to run. If we have an overall timeout for 4 seconds, then it should fail.

_, err := utils.ExecAllowError(t, mcmp.VtConn, "select /*vt+ QUERY_TIMEOUT_MS=4000 */ sleep(u2.id2), u1.id2 from t1 u1 join t1 u2 where u1.id2 = u2.id1")
assert.Error(t, err)
assert.ErrorContains(t, err, "Query execution was interrupted, maximum statement execution time exceeded")

// Let's also check that setting the session variable also works.
utils.Exec(t, mcmp.VtConn, "set query_timeout=4000")
_, err = utils.ExecAllowError(t, mcmp.VtConn, "select sleep(u2.id2), u1.id2 from t1 u1 join t1 u2 where u1.id2 = u2.id1")
assert.Error(t, err)
assert.ErrorContains(t, err, "Query execution was interrupted, maximum statement execution time exceeded")

// Increasing the timeout should pass the query.
utils.Exec(t, mcmp.VtConn, "set query_timeout=10000")
_ = utils.Exec(t, mcmp.VtConn, "select sleep(u2.id2), u1.id2 from t1 u1 join t1 u2 where u1.id2 = u2.id1")
}

// TestQueryTimeoutWithShardTargeting tests the query timeout with shard targeting.
func TestQueryTimeoutWithShardTargeting(t *testing.T) {
mcmp, closer := start(t)
Expand Down
8 changes: 8 additions & 0 deletions go/test/vschemawrapper/vschema_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type VSchemaWrapper struct {
Dest key.Destination
SysVarEnabled bool
ForeignKeyChecksState *bool
SessionQueryTimeout int
Version plancontext.PlannerVersion
EnableViews bool
TestBuilder func(query string, vschema plancontext.VSchema, keyspace string) (*engine.Plan, error)
Expand Down Expand Up @@ -132,6 +133,13 @@ func (vw *VSchemaWrapper) Environment() *vtenv.Environment {
return vw.Env
}

func (vw *VSchemaWrapper) GetQueryTimeout(queryTimeoutFromComments int) int {
if queryTimeoutFromComments != 0 {
return queryTimeoutFromComments
}
return vw.SessionQueryTimeout
}

func (vw *VSchemaWrapper) PlannerWarning(_ string) {
}

Expand Down
2 changes: 2 additions & 0 deletions go/vt/vterrors/code.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ var (
VT14004 = errorWithoutState("VT14004", vtrpcpb.Code_UNAVAILABLE, "cannot find keyspace for: %s", "The specified keyspace could not be found.")
VT14005 = errorWithoutState("VT14005", vtrpcpb.Code_UNAVAILABLE, "cannot lookup sidecar database for keyspace: %s", "Failed to read sidecar database identifier.")

VT15001 = errorWithoutState("VT15001", vtrpcpb.Code_DEADLINE_EXCEEDED, "Query execution was interrupted, maximum statement execution time exceeded", "Query execution was interrupted, maximum statement execution time exceeded")

// Errors is a list of errors that must match all the variables
// defined above to enable auto-documentation of error codes.
Errors = []func(args ...any) *VitessError{
Expand Down
10 changes: 10 additions & 0 deletions go/vt/vtgate/engine/fake_primitive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"reflect"
"strings"
"testing"
"time"

"golang.org/x/sync/errgroup"

Expand All @@ -41,6 +42,9 @@ type fakePrimitive struct {

log []string

// sleepTime is the time for which the fake primitive sleeps before returning the results.
sleepTime time.Duration

allResultsInOneCall bool

async bool
Expand Down Expand Up @@ -71,6 +75,9 @@ func (f *fakePrimitive) GetTableName() string {

func (f *fakePrimitive) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
f.log = append(f.log, fmt.Sprintf("Execute %v %v", printBindVars(bindVars), wantfields))
if f.sleepTime != 0 {
time.Sleep(f.sleepTime)
}
if f.results == nil {
return nil, f.sendErr
}
Expand All @@ -85,6 +92,9 @@ func (f *fakePrimitive) TryExecute(ctx context.Context, vcursor VCursor, bindVar

func (f *fakePrimitive) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
f.log = append(f.log, fmt.Sprintf("StreamExecute %v %v", printBindVars(bindVars), wantfields))
if f.sleepTime != 0 {
time.Sleep(f.sleepTime)
}
if f.results == nil {
return f.sendErr
}
Expand Down
8 changes: 6 additions & 2 deletions go/vt/vtgate/engine/fake_vcursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ var _ SessionActions = (*noopVCursor)(nil)

// noopVCursor is used to build other vcursors.
type noopVCursor struct {
inTx bool
inTx bool
queryTimeout int
}

// MySQLVersion implements VCursor.
Expand Down Expand Up @@ -298,7 +299,10 @@ func (t *noopVCursor) SetQueryTimeout(maxExecutionTime int64) {
}

func (t *noopVCursor) GetQueryTimeout(queryTimeoutFromComments int) int {
return queryTimeoutFromComments
if queryTimeoutFromComments != 0 {
return queryTimeoutFromComments
}
return t.queryTimeout
}

func (t *noopVCursor) SetSkipQueryPlanCache(context.Context, bool) error {
Expand Down
104 changes: 104 additions & 0 deletions go/vt/vtgate/engine/timeout_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package engine

import (
"context"
"time"

"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/vterrors"
)

// TimeoutHandler is a primitive that adds a timeout to the execution of a query.
type TimeoutHandler struct {
Timeout int
Input Primitive
}

var _ Primitive = (*TimeoutHandler)(nil)

// NewTimeoutHandler creates a new timeout handler.
func NewTimeoutHandler(input Primitive, timeout int) *TimeoutHandler {
return &TimeoutHandler{
Timeout: timeout,
Input: input,
}
}

// RouteType is part of the Primitive interface
func (t *TimeoutHandler) RouteType() string {
return t.Input.RouteType()
}

// GetKeyspaceName is part of the Primitive interface
func (t *TimeoutHandler) GetKeyspaceName() string {
return t.Input.GetKeyspaceName()
}

// GetTableName is part of the Primitive interface
func (t *TimeoutHandler) GetTableName() string {
return t.Input.GetTableName()
}

// GetFields is part of the Primitive interface
func (t *TimeoutHandler) GetFields(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
return t.Input.GetFields(ctx, vcursor, bindVars)
}

// NeedsTransaction is part of the Primitive interface
func (t *TimeoutHandler) NeedsTransaction() bool {
return t.Input.NeedsTransaction()
}

// TryExecute is part of the Primitive interface
func (t *TimeoutHandler) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (res *sqltypes.Result, err error) {
ctx, cancel := context.WithTimeout(ctx, time.Duration(t.Timeout)*time.Millisecond)
defer cancel()

complete := make(chan any)
go func() {
res, err = t.Input.TryExecute(ctx, vcursor, bindVars, wantfields)
close(complete)
}()

select {
case <-ctx.Done():
return nil, vterrors.VT15001()
case <-complete:
return res, err
}
}

// TryStreamExecute is part of the Primitive interface
func (t *TimeoutHandler) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) (err error) {
ctx, cancel := context.WithTimeout(ctx, time.Duration(t.Timeout)*time.Millisecond)
defer cancel()

complete := make(chan any)
go func() {
err = t.Input.TryStreamExecute(ctx, vcursor, bindVars, wantfields, callback)
close(complete)
}()

select {
case <-ctx.Done():
return vterrors.VT15001()
case <-complete:
return err
}
}

// Inputs is part of the Primitive interface
func (t *TimeoutHandler) Inputs() ([]Primitive, []map[string]any) {
return []Primitive{t.Input}, nil
}

// description is part of the Primitive interface
func (t *TimeoutHandler) description() PrimitiveDescription {
return PrimitiveDescription{
OperatorType: "TimeoutHandler",
Other: map[string]any{
"Timeout": t.Timeout,
},
}
}
59 changes: 59 additions & 0 deletions go/vt/vtgate/engine/timeout_handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package engine

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/sqltypes"
)

// TestTimeoutHandler tests timeout handler primitive.
func TestTimeoutHandler(t *testing.T) {
tests := []struct {
name string
sleepTime time.Duration
timeout int
wantErr string
}{
{
name: "Timeout without failure",
sleepTime: 100 * time.Millisecond,
timeout: 1000,
wantErr: "",
}, {
name: "Timeout with failure",
sleepTime: 2 * time.Second,
timeout: 100,
wantErr: "VT15001: Query execution was interrupted, maximum statement execution time exceeded",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, err := createTimeoutHandlerForTesting(tt.timeout, tt.sleepTime).TryExecute(context.Background(), &noopVCursor{}, nil, false)
if tt.wantErr != "" {
require.EqualError(t, err, tt.wantErr)
} else {
require.NoError(t, err)
}
err = createTimeoutHandlerForTesting(tt.timeout, tt.sleepTime).TryStreamExecute(context.Background(), &noopVCursor{}, nil, false, func(result *sqltypes.Result) error {
return nil
})
if tt.wantErr != "" {
require.EqualError(t, err, tt.wantErr)
} else {
require.NoError(t, err)
}
})
}
}

// createTimeoutHandlerForTesting creates a TimeoutHandler for testing that has a fakePrimitive as an input.
func createTimeoutHandlerForTesting(timeout int, sleepTime time.Duration) *TimeoutHandler {
return NewTimeoutHandler(&fakePrimitive{
results: nil,
sleepTime: sleepTime,
}, timeout)
}
8 changes: 1 addition & 7 deletions go/vt/vtgate/planbuilder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,7 @@ func buildRoutePlan(stmt sqlparser.Statement, reservedVars *sqlparser.ReservedVa

func createInstructionFor(ctx context.Context, query string, stmt sqlparser.Statement, reservedVars *sqlparser.ReservedVars, vschema plancontext.VSchema, enableOnlineDDL, enableDirectDDL bool) (*planResult, error) {
switch stmt := stmt.(type) {
case *sqlparser.Select, *sqlparser.Insert, *sqlparser.Update, *sqlparser.Delete:
configuredPlanner, err := getConfiguredPlanner(vschema, stmt, query)
if err != nil {
return nil, err
}
return buildRoutePlan(stmt, reservedVars, vschema, configuredPlanner)
case *sqlparser.Union:
case *sqlparser.Select, *sqlparser.Insert, *sqlparser.Update, *sqlparser.Delete, *sqlparser.Union:
configuredPlanner, err := getConfiguredPlanner(vschema, stmt, query)
if err != nil {
return nil, err
Expand Down
12 changes: 11 additions & 1 deletion go/vt/vtgate/planbuilder/plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,16 @@ func (s *planTestSuite) addPKsProvided(vschema *vindexes.VSchema, ks string, tbl
}
}

func (s *planTestSuite) TestQueryTimeout() {
vschemaWrapper := &vschemawrapper.VSchemaWrapper{
V: loadSchema(s.T(), "vschemas/schema.json", true),
Env: vtenv.NewTestEnv(),
SessionQueryTimeout: 200,
}

s.testFile("query_timeout_cases.json", vschemaWrapper, false)
}

func (s *planTestSuite) TestSystemTables57() {
// first we move everything to use 5.7 logic
env, err := vtenv.New(vtenv.Options{
Expand Down Expand Up @@ -683,7 +693,7 @@ func (s *planTestSuite) testFile(filename string, vschema *vschemawrapper.VSchem
if tcase.Skip {
t.Skip(message)
} else {
t.Errorf(message)
t.Error(message)
}
} else if tcase.Skip {
t.Errorf("query is correct even though it is skipped:\n %s", tcase.Query)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,10 @@ func (v *vschema) Environment() *vtenv.Environment {
return vtenv.NewTestEnv()
}

func (v *vschema) GetQueryTimeout(queryTimeoutFromComments int) int {
return queryTimeoutFromComments
}

func (v *vschema) ErrorIfShardedF(keyspace *vindexes.Keyspace, warn, errFmt string, params ...any) error {
// TODO implement me
panic("implement me")
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/planbuilder/plancontext/vschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type VSchema interface {
SetPlannerVersion(pv PlannerVersion)
ConnCollation() collations.ID
Environment() *vtenv.Environment
GetQueryTimeout(queryTimeoutFromComments int) int

// ErrorIfShardedF will return an error if the keyspace is sharded,
// and produce a warning if the vtgate if configured to do so
Expand Down
14 changes: 14 additions & 0 deletions go/vt/vtgate/planbuilder/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,23 @@ func newBuildSelectPlan(
return nil, nil, err
}

plan = handleTimeout(plan, queryTimeout(selStmt.GetParsedComments().Directives()), vschema)

return plan, operators.TablesUsed(op), nil
}

// handleTimeout checks if there is a timeout that needs to be added to the query. If there is one
// then we wrap the plan in a TimeoutHandler primitive. If there is no timeout, we return the plan as is.
// Because we are accessing the query timeout stored in the session state, we have to also add this value to the plan key
// so that we don't end up using the same plan when the session variable changes.
func handleTimeout(plan engine.Primitive, queryTimeoutComment int, vschema plancontext.VSchema) engine.Primitive {
finalQueryTimeout := vschema.GetQueryTimeout(queryTimeoutComment)
if finalQueryTimeout == 0 {
return plan
}
return engine.NewTimeoutHandler(plan, finalQueryTimeout)
}

func createSelectOperator(ctx *plancontext.PlanningContext, selStmt sqlparser.SelectStatement, reservedVars *sqlparser.ReservedVars) (operators.Operator, error) {
err := queryRewrite(ctx, selStmt)
if err != nil {
Expand Down
Loading
Loading