Skip to content

Commit

Permalink
VReplication: Qualify and SQL escape tables in created AutoIncrement …
Browse files Browse the repository at this point in the history
…VSchema definitions (#17174)

Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord authored Nov 7, 2024
1 parent 9946ce8 commit 31b956b
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 12 deletions.
21 changes: 18 additions & 3 deletions go/vt/vtctl/workflow/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,17 +386,31 @@ func (mz *materializer) deploySchema() error {
}

if removeAutoInc {
var replaceFunc func(columnName string)
var replaceFunc func(columnName string) error
if mz.ms.GetWorkflowOptions().ShardedAutoIncrementHandling == vtctldatapb.ShardedAutoIncrementHandling_REPLACE {
replaceFunc = func(columnName string) {
replaceFunc = func(columnName string) error {
mu.Lock()
defer mu.Unlock()
// At this point we've already confirmed that the table exists in the target
// vschema.
table := targetVSchema.Tables[ts.TargetTable]
// Don't override or redo anything that already exists.
if table != nil && table.AutoIncrement == nil {
seqTableName := fmt.Sprintf(autoSequenceTableFormat, ts.TargetTable)
tableName, err := sqlescape.UnescapeID(ts.TargetTable)
if err != nil {
return err
}
seqTableName, err := sqlescape.EnsureEscaped(fmt.Sprintf(autoSequenceTableFormat, tableName))
if err != nil {
return err
}
if mz.ms.GetWorkflowOptions().GlobalKeyspace != "" {
seqKeyspace, err := sqlescape.EnsureEscaped(mz.ms.WorkflowOptions.GlobalKeyspace)
if err != nil {
return err
}
seqTableName = fmt.Sprintf("%s.%s", seqKeyspace, seqTableName)
}
// Create a Vitess AutoIncrement definition -- which uses a sequence -- to
// replace the MySQL auto_increment definition that we removed.
table.AutoIncrement = &vschemapb.AutoIncrement{
Expand All @@ -405,6 +419,7 @@ func (mz *materializer) deploySchema() error {
}
updatedVSchema = true
}
return nil
}
}
ddl, err = stripAutoIncrement(ddl, mz.env.Parser(), replaceFunc)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/workflow/materializer_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func newTestMaterializerEnv(t *testing.T, ctx context.Context, ms *vtctldatapb.M
tableName := ts.TargetTable
table, err := venv.Parser().TableFromStatement(ts.SourceExpression)
if err == nil {
tableName = table.Name.String()
tableName = sqlparser.String(table.Name)
}
var (
cols []string
Expand Down
10 changes: 5 additions & 5 deletions go/vt/vtctl/workflow/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,12 +613,12 @@ func TestMoveTablesDDLFlag(t *testing.T) {
// 2. REMOVE the tables' MySQL auto_increment clauses
// 3. REPLACE the table's MySQL auto_increment clauses with Vitess sequences
func TestShardedAutoIncHandling(t *testing.T) {
tableName := "t1"
tableName := "`t-1`"
tableDDL := fmt.Sprintf("create table %s (id int not null auto_increment primary key, c1 varchar(10))", tableName)
validateEmptyTableQuery := fmt.Sprintf("select 1 from `%s` limit 1", tableName)
validateEmptyTableQuery := fmt.Sprintf("select 1 from %s limit 1", tableName)
ms := &vtctldatapb.MaterializeSettings{
Workflow: "workflow",
SourceKeyspace: "sourceks",
SourceKeyspace: "source-ks",
TargetKeyspace: "targetks",
TableSettings: []*vtctldatapb.TableMaterializeSettings{{
TargetTable: tableName,
Expand Down Expand Up @@ -863,7 +863,7 @@ func TestShardedAutoIncHandling(t *testing.T) {
},
AutoIncrement: &vschemapb.AutoIncrement{ // AutoIncrement definition added
Column: "id",
Sequence: fmt.Sprintf(autoSequenceTableFormat, tableName),
Sequence: fmt.Sprintf("`%s`.`%s`", ms.SourceKeyspace, fmt.Sprintf(autoSequenceTableFormat, strings.ReplaceAll(tableName, "`", ""))),
},
},
},
Expand Down Expand Up @@ -931,7 +931,7 @@ func TestShardedAutoIncHandling(t *testing.T) {
if tc.wantTargetVSchema != nil {
targetVSchema, err := env.ws.ts.GetVSchema(ctx, ms.TargetKeyspace)
require.NoError(t, err)
require.True(t, proto.Equal(targetVSchema, tc.wantTargetVSchema))
require.True(t, proto.Equal(targetVSchema, tc.wantTargetVSchema), "got: %v, want: %v", targetVSchema, tc.wantTargetVSchema)
}
}
})
Expand Down
12 changes: 9 additions & 3 deletions go/vt/vtctl/workflow/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,24 +222,30 @@ func stripTableForeignKeys(ddl string, parser *sqlparser.Parser) (string, error)
// table definition. If an optional replace function is specified then that
// callback will be used to e.g. replace the MySQL clause with a Vitess
// VSchema AutoIncrement definition.
func stripAutoIncrement(ddl string, parser *sqlparser.Parser, replace func(columnName string)) (string, error) {
func stripAutoIncrement(ddl string, parser *sqlparser.Parser, replace func(columnName string) error) (string, error) {
newDDL, err := parser.ParseStrictDDL(ddl)
if err != nil {
return "", err
}

_ = sqlparser.Walk(func(node sqlparser.SQLNode) (kontinue bool, err error) {
err = sqlparser.Walk(func(node sqlparser.SQLNode) (kontinue bool, err error) {
switch node := node.(type) {
case *sqlparser.ColumnDefinition:
if node.Type.Options.Autoincrement {
node.Type.Options.Autoincrement = false
if replace != nil {
replace(sqlparser.String(node.Name))
if err := replace(sqlparser.String(node.Name)); err != nil {
return false, vterrors.Wrapf(err, "failed to replace auto_increment column %q in %q", sqlparser.String(node.Name), ddl)
}

}
}
}
return true, nil
}, newDDL)
if err != nil {
return "", err
}

return sqlparser.String(newDDL), nil
}
Expand Down

0 comments on commit 31b956b

Please sign in to comment.