From f1fa115fadfe2d6f764c0e9f3926ce9d2897234b Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Thu, 5 Sep 2024 21:37:48 +0530 Subject: [PATCH] ddl to commit open transaction before sending ddl to vttablet on shard targeting Signed-off-by: Harshit Gangal --- go/test/endtoend/vtgate/misc_test.go | 12 ++++++++ go/vt/vtgate/engine/send.go | 18 ++++++++++++ go/vt/vtgate/planbuilder/ddl.go | 5 ++-- go/vt/vtgate/planbuilder/show.go | 2 +- go/vt/vttablet/endtoend/settings_test.go | 36 ++++++------------------ 5 files changed, 43 insertions(+), 30 deletions(-) diff --git a/go/test/endtoend/vtgate/misc_test.go b/go/test/endtoend/vtgate/misc_test.go index bcb4f68a935..6bad486f951 100644 --- a/go/test/endtoend/vtgate/misc_test.go +++ b/go/test/endtoend/vtgate/misc_test.go @@ -798,6 +798,18 @@ func TestRowCountExceed(t *testing.T) { utils.AssertContainsError(t, conn, "select id1 from t1 where id1 < 1000", `Row count exceeded 100`) } +func TestDDLTargeted(t *testing.T) { + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + require.NoError(t, err) + defer conn.Close() + + utils.Exec(t, conn, "use `ks/-80`") + utils.Exec(t, conn, `begin`) + utils.Exec(t, conn, `create table ddl_targeted (id bigint primary key)`) + utils.Exec(t, conn, `commit`) +} + func TestLookupErrorMetric(t *testing.T) { conn, closer := start(t) defer closer() diff --git a/go/vt/vtgate/engine/send.go b/go/vt/vtgate/engine/send.go index 31c9e9e0eb0..6867ff543af 100644 --- a/go/vt/vtgate/engine/send.go +++ b/go/vt/vtgate/engine/send.go @@ -47,6 +47,8 @@ type Send struct { // IsDML specifies how to deal with autocommit behaviour IsDML bool + IsDDL bool + // SingleShardOnly specifies that the query must be send to only single shard SingleShardOnly bool @@ -94,6 +96,10 @@ func (s *Send) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[str ctx, cancelFunc := addQueryTimeout(ctx, vcursor, s.QueryTimeout) defer cancelFunc() + if err := s.commitIfDDL(ctx, vcursor); err != nil { + return nil, err + } + rss, err := s.checkAndReturnShards(ctx, vcursor) if err != nil { return nil, err @@ -158,6 +164,10 @@ func copyBindVars(in map[string]*querypb.BindVariable) map[string]*querypb.BindV // TryStreamExecute implements Primitive interface func (s *Send) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error { + if err := s.commitIfDDL(ctx, vcursor); err != nil { + return err + } + rss, err := s.checkAndReturnShards(ctx, vcursor) if err != nil { return err @@ -204,3 +214,11 @@ func (s *Send) description() PrimitiveDescription { Other: other, } } + +// commitIfDDL commits any open transaction before executing the ddl query. +func (s *Send) commitIfDDL(ctx context.Context, vcursor VCursor) error { + if s.IsDDL { + return vcursor.Session().Commit(ctx) + } + return nil +} diff --git a/go/vt/vtgate/planbuilder/ddl.go b/go/vt/vtgate/planbuilder/ddl.go index 4c4b3791c20..f4b8ab6976f 100644 --- a/go/vt/vtgate/planbuilder/ddl.go +++ b/go/vt/vtgate/planbuilder/ddl.go @@ -45,7 +45,7 @@ func (fk *fkContraint) FkWalk(node sqlparser.SQLNode) (kontinue bool, err error) // and which chooses which of the two to invoke at runtime. func buildGeneralDDLPlan(ctx context.Context, sql string, ddlStatement sqlparser.DDLStatement, reservedVars *sqlparser.ReservedVars, vschema plancontext.VSchema, enableOnlineDDL, enableDirectDDL bool) (*planResult, error) { if vschema.Destination() != nil { - return buildByPassPlan(sql, vschema) + return buildByPassPlan(sql, vschema, true) } normalDDLPlan, onlineDDLPlan, err := buildDDLPlans(ctx, sql, ddlStatement, reservedVars, vschema, enableOnlineDDL, enableDirectDDL) if err != nil { @@ -80,7 +80,7 @@ func buildGeneralDDLPlan(ctx context.Context, sql string, ddlStatement sqlparser return newPlanResult(eddl, tc.getTables()...), nil } -func buildByPassPlan(sql string, vschema plancontext.VSchema) (*planResult, error) { +func buildByPassPlan(sql string, vschema plancontext.VSchema, isDDL bool) (*planResult, error) { keyspace, err := vschema.DefaultKeyspace() if err != nil { return nil, err @@ -89,6 +89,7 @@ func buildByPassPlan(sql string, vschema plancontext.VSchema) (*planResult, erro Keyspace: keyspace, TargetDestination: vschema.Destination(), Query: sql, + IsDDL: isDDL, } return newPlanResult(send), nil } diff --git a/go/vt/vtgate/planbuilder/show.go b/go/vt/vtgate/planbuilder/show.go index f5a5282ceab..82035adaa87 100644 --- a/go/vt/vtgate/planbuilder/show.go +++ b/go/vt/vtgate/planbuilder/show.go @@ -45,7 +45,7 @@ const ( func buildShowPlan(sql string, stmt *sqlparser.Show, _ *sqlparser.ReservedVars, vschema plancontext.VSchema) (*planResult, error) { if vschema.Destination() != nil { - return buildByPassPlan(sql, vschema) + return buildByPassPlan(sql, vschema, false) } var prim engine.Primitive diff --git a/go/vt/vttablet/endtoend/settings_test.go b/go/vt/vttablet/endtoend/settings_test.go index a459ad15844..0ccaf958737 100644 --- a/go/vt/vttablet/endtoend/settings_test.go +++ b/go/vt/vttablet/endtoend/settings_test.go @@ -138,16 +138,9 @@ func TestDDLNoConnectionReservationOnSettings(t *testing.T) { query := "create table temp(c_date datetime default '0000-00-00')" setting := "set sql_mode='TRADITIONAL'" - for _, withTx := range []bool{false, true} { - if withTx { - err := client.Begin(false) - require.NoError(t, err) - } - _, err := client.ReserveExecute(query, []string{setting}, nil) - require.Error(t, err, "create table should have failed with TRADITIONAL mode") - require.Contains(t, err.Error(), "Invalid default value") - assert.Zero(t, client.ReservedID()) - } + _, err := client.ReserveExecute(query, []string{setting}, nil) + assert.ErrorContains(t, err, "Invalid default value for 'c_date'", "create table should have failed with TRADITIONAL mode") + assert.Zero(t, client.ReservedID()) } func TestDMLNoConnectionReservationOnSettings(t *testing.T) { @@ -156,7 +149,10 @@ func TestDMLNoConnectionReservationOnSettings(t *testing.T) { _, err := client.Execute("create table temp(c_date datetime)", nil) require.NoError(t, err) - defer client.Execute("drop table temp", nil) + defer func() { + client.Rollback() + client.Execute("drop table temp", nil) + }() _, err = client.Execute("insert into temp values ('2022-08-25')", nil) require.NoError(t, err) @@ -211,9 +207,8 @@ func TestDDLNoConnectionReservationOnSettingsWithTx(t *testing.T) { query := "create table temp(c_date datetime default '0000-00-00')" setting := "set sql_mode='TRADITIONAL'" - _, err := client.ReserveBeginExecute(query, []string{setting}, nil, nil) - require.Error(t, err, "create table should have failed with TRADITIONAL mode") - require.Contains(t, err.Error(), "Invalid default value") + _, err := client.ReserveExecute(query, []string{setting}, nil) + require.ErrorContains(t, err, "Invalid default value for 'c_date'", "create table should have failed with TRADITIONAL mode") assert.Zero(t, client.ReservedID()) } @@ -297,12 +292,6 @@ func TestTempTableOnReserveExecute(t *testing.T) { require.NoError(t, client.Release()) - _, err = client.ReserveBeginExecute(tempTblQuery, nil, nil, nil) - require.NoError(t, err) - assert.NotZero(t, client.ReservedID()) - require.NoError(t, - client.Release()) - // drop the table _, err = client.Execute("drop table if exists temp", nil) require.NoError(t, err) @@ -318,13 +307,6 @@ func TestTempTableOnReserveExecute(t *testing.T) { assert.NotZero(t, client.ReservedID(), "as this goes through fallback path of reserving a connection due to temporary tables") require.NoError(t, client.Release()) - - _, err = client.ReserveBeginExecute(tempTblQuery, []string{setting}, nil, nil) - require.Error(t, err, "create table should have failed with TRADITIONAL mode") - require.Contains(t, err.Error(), "Invalid default value") - assert.NotZero(t, client.ReservedID(), "as this goes through fallback path of reserving a connection due to temporary tables") - require.NoError(t, - client.Release()) } func TestInfiniteSessions(t *testing.T) {