Skip to content

Commit

Permalink
ddl to commit open transaction before sending ddl to vttablet on shar…
Browse files Browse the repository at this point in the history
…d targeting

Signed-off-by: Harshit Gangal <harshit@planetscale.com>
  • Loading branch information
harshit-gangal committed Sep 5, 2024
1 parent 33f32b4 commit f1fa115
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 30 deletions.
12 changes: 12 additions & 0 deletions go/test/endtoend/vtgate/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,18 @@ func TestRowCountExceed(t *testing.T) {
utils.AssertContainsError(t, conn, "select id1 from t1 where id1 < 1000", `Row count exceeded 100`)
}

func TestDDLTargeted(t *testing.T) {
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer conn.Close()

utils.Exec(t, conn, "use `ks/-80`")
utils.Exec(t, conn, `begin`)
utils.Exec(t, conn, `create table ddl_targeted (id bigint primary key)`)
utils.Exec(t, conn, `commit`)
}

func TestLookupErrorMetric(t *testing.T) {
conn, closer := start(t)
defer closer()
Expand Down
18 changes: 18 additions & 0 deletions go/vt/vtgate/engine/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ type Send struct {
// IsDML specifies how to deal with autocommit behaviour
IsDML bool

IsDDL bool

// SingleShardOnly specifies that the query must be send to only single shard
SingleShardOnly bool

Expand Down Expand Up @@ -94,6 +96,10 @@ func (s *Send) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[str
ctx, cancelFunc := addQueryTimeout(ctx, vcursor, s.QueryTimeout)
defer cancelFunc()

if err := s.commitIfDDL(ctx, vcursor); err != nil {
return nil, err
}

rss, err := s.checkAndReturnShards(ctx, vcursor)
if err != nil {
return nil, err
Expand Down Expand Up @@ -158,6 +164,10 @@ func copyBindVars(in map[string]*querypb.BindVariable) map[string]*querypb.BindV

// TryStreamExecute implements Primitive interface
func (s *Send) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
if err := s.commitIfDDL(ctx, vcursor); err != nil {
return err
}

rss, err := s.checkAndReturnShards(ctx, vcursor)
if err != nil {
return err
Expand Down Expand Up @@ -204,3 +214,11 @@ func (s *Send) description() PrimitiveDescription {
Other: other,
}
}

// commitIfDDL commits any open transaction before executing the ddl query.
func (s *Send) commitIfDDL(ctx context.Context, vcursor VCursor) error {
if s.IsDDL {
return vcursor.Session().Commit(ctx)
}
return nil
}
5 changes: 3 additions & 2 deletions go/vt/vtgate/planbuilder/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (fk *fkContraint) FkWalk(node sqlparser.SQLNode) (kontinue bool, err error)
// and which chooses which of the two to invoke at runtime.
func buildGeneralDDLPlan(ctx context.Context, sql string, ddlStatement sqlparser.DDLStatement, reservedVars *sqlparser.ReservedVars, vschema plancontext.VSchema, enableOnlineDDL, enableDirectDDL bool) (*planResult, error) {
if vschema.Destination() != nil {
return buildByPassPlan(sql, vschema)
return buildByPassPlan(sql, vschema, true)
}
normalDDLPlan, onlineDDLPlan, err := buildDDLPlans(ctx, sql, ddlStatement, reservedVars, vschema, enableOnlineDDL, enableDirectDDL)
if err != nil {
Expand Down Expand Up @@ -80,7 +80,7 @@ func buildGeneralDDLPlan(ctx context.Context, sql string, ddlStatement sqlparser
return newPlanResult(eddl, tc.getTables()...), nil
}

func buildByPassPlan(sql string, vschema plancontext.VSchema) (*planResult, error) {
func buildByPassPlan(sql string, vschema plancontext.VSchema, isDDL bool) (*planResult, error) {
keyspace, err := vschema.DefaultKeyspace()
if err != nil {
return nil, err
Expand All @@ -89,6 +89,7 @@ func buildByPassPlan(sql string, vschema plancontext.VSchema) (*planResult, erro
Keyspace: keyspace,
TargetDestination: vschema.Destination(),
Query: sql,
IsDDL: isDDL,
}
return newPlanResult(send), nil
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const (

func buildShowPlan(sql string, stmt *sqlparser.Show, _ *sqlparser.ReservedVars, vschema plancontext.VSchema) (*planResult, error) {
if vschema.Destination() != nil {
return buildByPassPlan(sql, vschema)
return buildByPassPlan(sql, vschema, false)
}

var prim engine.Primitive
Expand Down
36 changes: 9 additions & 27 deletions go/vt/vttablet/endtoend/settings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,16 +138,9 @@ func TestDDLNoConnectionReservationOnSettings(t *testing.T) {
query := "create table temp(c_date datetime default '0000-00-00')"
setting := "set sql_mode='TRADITIONAL'"

for _, withTx := range []bool{false, true} {
if withTx {
err := client.Begin(false)
require.NoError(t, err)
}
_, err := client.ReserveExecute(query, []string{setting}, nil)
require.Error(t, err, "create table should have failed with TRADITIONAL mode")
require.Contains(t, err.Error(), "Invalid default value")
assert.Zero(t, client.ReservedID())
}
_, err := client.ReserveExecute(query, []string{setting}, nil)
assert.ErrorContains(t, err, "Invalid default value for 'c_date'", "create table should have failed with TRADITIONAL mode")
assert.Zero(t, client.ReservedID())
}

func TestDMLNoConnectionReservationOnSettings(t *testing.T) {
Expand All @@ -156,7 +149,10 @@ func TestDMLNoConnectionReservationOnSettings(t *testing.T) {

_, err := client.Execute("create table temp(c_date datetime)", nil)
require.NoError(t, err)
defer client.Execute("drop table temp", nil)
defer func() {
client.Rollback()
client.Execute("drop table temp", nil)
}()

_, err = client.Execute("insert into temp values ('2022-08-25')", nil)
require.NoError(t, err)
Expand Down Expand Up @@ -211,9 +207,8 @@ func TestDDLNoConnectionReservationOnSettingsWithTx(t *testing.T) {
query := "create table temp(c_date datetime default '0000-00-00')"
setting := "set sql_mode='TRADITIONAL'"

_, err := client.ReserveBeginExecute(query, []string{setting}, nil, nil)
require.Error(t, err, "create table should have failed with TRADITIONAL mode")
require.Contains(t, err.Error(), "Invalid default value")
_, err := client.ReserveExecute(query, []string{setting}, nil)
require.ErrorContains(t, err, "Invalid default value for 'c_date'", "create table should have failed with TRADITIONAL mode")
assert.Zero(t, client.ReservedID())
}

Expand Down Expand Up @@ -297,12 +292,6 @@ func TestTempTableOnReserveExecute(t *testing.T) {
require.NoError(t,
client.Release())

_, err = client.ReserveBeginExecute(tempTblQuery, nil, nil, nil)
require.NoError(t, err)
assert.NotZero(t, client.ReservedID())
require.NoError(t,
client.Release())

// drop the table
_, err = client.Execute("drop table if exists temp", nil)
require.NoError(t, err)
Expand All @@ -318,13 +307,6 @@ func TestTempTableOnReserveExecute(t *testing.T) {
assert.NotZero(t, client.ReservedID(), "as this goes through fallback path of reserving a connection due to temporary tables")
require.NoError(t,
client.Release())

_, err = client.ReserveBeginExecute(tempTblQuery, []string{setting}, nil, nil)
require.Error(t, err, "create table should have failed with TRADITIONAL mode")
require.Contains(t, err.Error(), "Invalid default value")
assert.NotZero(t, client.ReservedID(), "as this goes through fallback path of reserving a connection due to temporary tables")
require.NoError(t,
client.Release())
}

func TestInfiniteSessions(t *testing.T) {
Expand Down

0 comments on commit f1fa115

Please sign in to comment.