Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DDL allowed outside of transaction in vttablet #16661

Merged
merged 3 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions go/test/endtoend/vtgate/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,18 @@ func TestRowCountExceed(t *testing.T) {
utils.AssertContainsError(t, conn, "select id1 from t1 where id1 < 1000", `Row count exceeded 100`)
}

func TestDDLTargeted(t *testing.T) {
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer conn.Close()

utils.Exec(t, conn, "use `ks/-80`")
utils.Exec(t, conn, `begin`)
utils.Exec(t, conn, `create table ddl_targeted (id bigint primary key)`)
utils.Exec(t, conn, `commit`)
harshit-gangal marked this conversation as resolved.
Show resolved Hide resolved
}

func TestLookupErrorMetric(t *testing.T) {
conn, closer := start(t)
defer closer()
Expand Down
18 changes: 18 additions & 0 deletions go/vt/vtgate/engine/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ type Send struct {
// IsDML specifies how to deal with autocommit behaviour
IsDML bool

IsDDL bool

// SingleShardOnly specifies that the query must be send to only single shard
SingleShardOnly bool

Expand Down Expand Up @@ -94,6 +96,10 @@ func (s *Send) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[str
ctx, cancelFunc := addQueryTimeout(ctx, vcursor, s.QueryTimeout)
defer cancelFunc()

if err := s.commitIfDDL(ctx, vcursor); err != nil {
return nil, err
}

rss, err := s.checkAndReturnShards(ctx, vcursor)
if err != nil {
return nil, err
Expand Down Expand Up @@ -158,6 +164,10 @@ func copyBindVars(in map[string]*querypb.BindVariable) map[string]*querypb.BindV

// TryStreamExecute implements Primitive interface
func (s *Send) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
if err := s.commitIfDDL(ctx, vcursor); err != nil {
return err
}

rss, err := s.checkAndReturnShards(ctx, vcursor)
if err != nil {
return err
Expand Down Expand Up @@ -204,3 +214,11 @@ func (s *Send) description() PrimitiveDescription {
Other: other,
}
}

// commitIfDDL commits any open transaction before executing the ddl query.
func (s *Send) commitIfDDL(ctx context.Context, vcursor VCursor) error {
if s.IsDDL {
return vcursor.Session().Commit(ctx)
}
return nil
}
5 changes: 3 additions & 2 deletions go/vt/vtgate/planbuilder/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (fk *fkContraint) FkWalk(node sqlparser.SQLNode) (kontinue bool, err error)
// and which chooses which of the two to invoke at runtime.
func buildGeneralDDLPlan(ctx context.Context, sql string, ddlStatement sqlparser.DDLStatement, reservedVars *sqlparser.ReservedVars, vschema plancontext.VSchema, enableOnlineDDL, enableDirectDDL bool) (*planResult, error) {
if vschema.Destination() != nil {
return buildByPassPlan(sql, vschema)
return buildByPassPlan(sql, vschema, true)
}
normalDDLPlan, onlineDDLPlan, err := buildDDLPlans(ctx, sql, ddlStatement, reservedVars, vschema, enableOnlineDDL, enableDirectDDL)
if err != nil {
Expand Down Expand Up @@ -80,7 +80,7 @@ func buildGeneralDDLPlan(ctx context.Context, sql string, ddlStatement sqlparser
return newPlanResult(eddl, tc.getTables()...), nil
}

func buildByPassPlan(sql string, vschema plancontext.VSchema) (*planResult, error) {
func buildByPassPlan(sql string, vschema plancontext.VSchema, isDDL bool) (*planResult, error) {
keyspace, err := vschema.DefaultKeyspace()
if err != nil {
return nil, err
Expand All @@ -89,6 +89,7 @@ func buildByPassPlan(sql string, vschema plancontext.VSchema) (*planResult, erro
Keyspace: keyspace,
TargetDestination: vschema.Destination(),
Query: sql,
IsDDL: isDDL,
}
return newPlanResult(send), nil
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const (

func buildShowPlan(sql string, stmt *sqlparser.Show, _ *sqlparser.ReservedVars, vschema plancontext.VSchema) (*planResult, error) {
if vschema.Destination() != nil {
return buildByPassPlan(sql, vschema)
return buildByPassPlan(sql, vschema, false)
}

var prim engine.Primitive
Expand Down
36 changes: 9 additions & 27 deletions go/vt/vttablet/endtoend/settings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,16 +138,9 @@ func TestDDLNoConnectionReservationOnSettings(t *testing.T) {
query := "create table temp(c_date datetime default '0000-00-00')"
setting := "set sql_mode='TRADITIONAL'"

for _, withTx := range []bool{false, true} {
if withTx {
err := client.Begin(false)
require.NoError(t, err)
}
_, err := client.ReserveExecute(query, []string{setting}, nil)
require.Error(t, err, "create table should have failed with TRADITIONAL mode")
require.Contains(t, err.Error(), "Invalid default value")
assert.Zero(t, client.ReservedID())
}
_, err := client.ReserveExecute(query, []string{setting}, nil)
assert.ErrorContains(t, err, "Invalid default value for 'c_date'", "create table should have failed with TRADITIONAL mode")
assert.Zero(t, client.ReservedID())
}

func TestDMLNoConnectionReservationOnSettings(t *testing.T) {
Expand All @@ -156,7 +149,10 @@ func TestDMLNoConnectionReservationOnSettings(t *testing.T) {

_, err := client.Execute("create table temp(c_date datetime)", nil)
require.NoError(t, err)
defer client.Execute("drop table temp", nil)
defer func() {
client.Rollback()
client.Execute("drop table temp", nil)
}()

_, err = client.Execute("insert into temp values ('2022-08-25')", nil)
require.NoError(t, err)
Expand Down Expand Up @@ -211,9 +207,8 @@ func TestDDLNoConnectionReservationOnSettingsWithTx(t *testing.T) {
query := "create table temp(c_date datetime default '0000-00-00')"
setting := "set sql_mode='TRADITIONAL'"

_, err := client.ReserveBeginExecute(query, []string{setting}, nil, nil)
require.Error(t, err, "create table should have failed with TRADITIONAL mode")
require.Contains(t, err.Error(), "Invalid default value")
_, err := client.ReserveExecute(query, []string{setting}, nil)
require.ErrorContains(t, err, "Invalid default value for 'c_date'", "create table should have failed with TRADITIONAL mode")
assert.Zero(t, client.ReservedID())
}

Expand Down Expand Up @@ -297,12 +292,6 @@ func TestTempTableOnReserveExecute(t *testing.T) {
require.NoError(t,
client.Release())

_, err = client.ReserveBeginExecute(tempTblQuery, nil, nil, nil)
require.NoError(t, err)
assert.NotZero(t, client.ReservedID())
require.NoError(t,
client.Release())

// drop the table
_, err = client.Execute("drop table if exists temp", nil)
require.NoError(t, err)
Expand All @@ -318,13 +307,6 @@ func TestTempTableOnReserveExecute(t *testing.T) {
assert.NotZero(t, client.ReservedID(), "as this goes through fallback path of reserving a connection due to temporary tables")
require.NoError(t,
client.Release())

_, err = client.ReserveBeginExecute(tempTblQuery, []string{setting}, nil, nil)
require.Error(t, err, "create table should have failed with TRADITIONAL mode")
require.Contains(t, err.Error(), "Invalid default value")
assert.NotZero(t, client.ReservedID(), "as this goes through fallback path of reserving a connection due to temporary tables")
require.NoError(t,
client.Release())
}

func TestInfiniteSessions(t *testing.T) {
Expand Down
58 changes: 24 additions & 34 deletions go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@ import (
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/pools/smartconnpool"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/callerid"
"vitess.io/vitess/go/vt/callinfo"
"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/tableacl"
Expand All @@ -45,10 +47,7 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletserver/rules"
eschema "vitess.io/vitess/go/vt/vttablet/tabletserver/schema"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"

querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tx"
)

// QueryExecutor is used for executing a query request.
Expand Down Expand Up @@ -192,8 +191,10 @@ func (qre *QueryExecutor) Execute() (reply *sqltypes.Result, err error) {
return qr, nil
case p.PlanOtherRead, p.PlanOtherAdmin, p.PlanFlush, p.PlanSavepoint, p.PlanRelease, p.PlanSRollback:
return qre.execOther()
case p.PlanInsert, p.PlanUpdate, p.PlanDelete, p.PlanInsertMessage, p.PlanDDL, p.PlanLoad:
case p.PlanInsert, p.PlanUpdate, p.PlanDelete, p.PlanInsertMessage, p.PlanLoad:
return qre.execAutocommit(qre.txConnExec)
case p.PlanDDL:
return qre.execDDL(nil)
case p.PlanUpdateLimit, p.PlanDeleteLimit:
return qre.execAsTransaction(qre.txConnExec)
case p.PlanCallProc:
Expand Down Expand Up @@ -538,7 +539,7 @@ func (qre *QueryExecutor) checkAccess(authorized *tableacl.ACLResult, tableName
return nil
}

func (qre *QueryExecutor) execDDL(conn *StatefulConnection) (*sqltypes.Result, error) {
func (qre *QueryExecutor) execDDL(conn *StatefulConnection) (result *sqltypes.Result, err error) {
// Let's see if this is a normal DDL statement or an Online DDL statement.
// An Online DDL statement is identified by /*vt+ .. */ comment with expected directives, like uuid etc.
if onlineDDL, err := schema.OnlineDDLFromCommentedStatement(qre.plan.FullStmt); err == nil {
Expand All @@ -549,6 +550,21 @@ func (qre *QueryExecutor) execDDL(conn *StatefulConnection) (*sqltypes.Result, e
}
}

if conn == nil {
conn, err = qre.tsv.te.txPool.createConn(qre.ctx, qre.options, qre.setting)
if err != nil {
return nil, err
}
defer conn.Release(tx.ConnRelease)
}

// A DDL statement should commit the current transaction in the VTGate.
// The change was made in PR: https://github.com/vitessio/vitess/pull/14110 in v18.
mattlord marked this conversation as resolved.
Show resolved Hide resolved
// DDL statement received by vttablet will be outside of a transaction.
if conn.txProps != nil {
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "DDL statement executed inside a transaction")
}

isTemporaryTable := false
if ddlStmt, ok := qre.plan.FullStmt.(sqlparser.DDLStatement); ok {
isTemporaryTable = ddlStmt.IsTemporary()
Expand Down Expand Up @@ -580,19 +596,7 @@ func (qre *QueryExecutor) execDDL(conn *StatefulConnection) (*sqltypes.Result, e
return nil, err
}
}
result, err := qre.execStatefulConn(conn, sql, true)
if err != nil {
return nil, err
}
// Only perform this operation when the connection has transaction open.
// TODO: This actually does not retain the old transaction. We should see how to provide correct behaviour to client.
if conn.txProps != nil {
err = qre.BeginAgain(qre.ctx, conn)
if err != nil {
return nil, err
}
}
return result, nil
return qre.execStatefulConn(conn, sql, true)
}

func (qre *QueryExecutor) execLoad(conn *StatefulConnection) (*sqltypes.Result, error) {
Expand All @@ -603,20 +607,6 @@ func (qre *QueryExecutor) execLoad(conn *StatefulConnection) (*sqltypes.Result,
return result, nil
}

// BeginAgain commits the existing transaction and begins a new one
func (*QueryExecutor) BeginAgain(ctx context.Context, dc *StatefulConnection) error {
if dc.IsClosed() || dc.TxProperties().Autocommit {
return nil
}
if _, err := dc.Exec(ctx, "commit", 1, false); err != nil {
return err
}
if _, err := dc.Exec(ctx, "begin", 1, false); err != nil {
return err
}
return nil
}

func (qre *QueryExecutor) execNextval() (*sqltypes.Result, error) {
env := evalengine.NewExpressionEnv(qre.ctx, qre.bindVars, evalengine.NewEmptyVCursor(qre.tsv.Environment(), time.Local))
result, err := env.Evaluate(qre.plan.NextCount)
Expand Down
Loading
Loading