Skip to content

Commit

Permalink
Workflow should create tables from the source's schema for reference …
Browse files Browse the repository at this point in the history
…tables. Self-review.

Signed-off-by: Rohit Nayak <rohit@planetscale.com>
  • Loading branch information
rohit-nayak-ps committed Sep 15, 2024
1 parent 35e8f8d commit 55d06cd
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 38 deletions.
6 changes: 4 additions & 2 deletions go/cmd/vtctldclient/command/vreplication/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,10 @@ var (
MySQLServerVersion string
TruncateUILen int
TruncateErrLen int
IsReference bool
Tables []string

// IsReference and Tables are used while materializing reference tables.
IsReference bool
Tables []string
}{}
)

Expand Down
12 changes: 8 additions & 4 deletions go/cmd/vtctldclient/command/vreplication/materialize/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
)

const missingCreateParams = "either --table-settings, for a regular Materialize workflow, or, --reference and --tables must be provided, if materializing reference tables"
const missingCreateParams = "either --table-settings (for a regular Materialize workflow) or (--reference and --tables, if materializing reference tables) must be specified"

var (
createOptions = struct {
Expand Down Expand Up @@ -90,11 +90,15 @@ should be copied as-is from the source keyspace. Here's an example value for tab
if common.CreateOptions.IsReference && len(common.CreateOptions.Tables) > 0 {
isReference = true
}
if !hasTableSettings && !isReference {
switch {
case !hasTableSettings && !isReference:
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, missingCreateParams)
}
if hasTableSettings && isReference {
case hasTableSettings && isReference:
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cannot specify both --table-settings and --reference/--tables")
case common.CreateOptions.IsReference && len(common.CreateOptions.Tables) == 0:
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cannot specify --reference without --tables")
case !common.CreateOptions.IsReference && len(common.CreateOptions.Tables) > 0:
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cannot specify --tables without --reference")
}
return nil
},
Expand Down
62 changes: 34 additions & 28 deletions go/test/endtoend/vreplication/materialize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,18 +239,17 @@ func TestMaterializeVtctldClient(t *testing.T) {

const (
refSchema = `
create table ref1 (
id bigint not null,
val varbinary(10) not null,
primary key (id)
) engine=InnoDB default charset=utf8mb4 collate=utf8mb4_unicode_ci;
create table ref2 (
id bigint not null,
id2 bigint not null,
primary key (id)
) engine=InnoDB default charset=utf8mb4 collate=utf8mb4_unicode_ci;
create table ref1 (
id bigint not null,
val varbinary(10) not null,
primary key (id)
) engine=InnoDB default charset=utf8mb4 collate=utf8mb4_unicode_ci;
create table ref2 (
id bigint not null,
id2 bigint not null,
primary key (id)
) engine=InnoDB default charset=utf8mb4 collate=utf8mb4_unicode_ci;
`

refSourceVSchema = `
{
"tables": {
Expand All @@ -265,22 +264,23 @@ const (
`
refTargetVSchema = `
{
"tables": {
"ref1": {
"type": "reference",
"source": "ks1.ref1"
},
"ref2": {
"type": "reference",
"source": "ks1.ref2"
}
}
"tables": {
"ref1": {
"type": "reference",
"source": "ks1.ref1"
},
"ref2": {
"type": "reference",
"source": "ks1.ref2"
}
}
}
`
initRef1DataQuery = `insert into ks1.ref1(id, val) values (1, 'abc'), (2, 'def'), (3, 'ghi')`
initRef2DataQuery = `insert into ks1.ref2(id, id2) values (1, 1), (2, 2), (3, 3)`
)

// TestReferenceTableMaterialize tests materializing reference tables.
func TestReferenceTableMaterialize(t *testing.T) {
vc = NewVitessCluster(t, nil)
require.NotNil(t, vc)
Expand All @@ -289,12 +289,14 @@ func TestReferenceTableMaterialize(t *testing.T) {
shards := []string{"-80", "80-"}
defer vc.TearDown()
defaultCell := vc.Cells[vc.CellNames[0]]
vc.AddKeyspace(t, []*Cell{defaultCell}, "ks1", "0", refSourceVSchema, refSchema, defaultReplicas, defaultRdonly, 100, nil)
vc.AddKeyspace(t, []*Cell{defaultCell}, "ks2", strings.Join(shards, ","), refTargetVSchema, refSchema, defaultReplicas, defaultRdonly, 200, nil)
_, err := vc.AddKeyspace(t, []*Cell{defaultCell}, "ks1", "0", refSourceVSchema, refSchema, defaultReplicas, defaultRdonly, 100, nil)
require.NoError(t, err)
_, err = vc.AddKeyspace(t, []*Cell{defaultCell}, "ks2", strings.Join(shards, ","), refTargetVSchema, "", defaultReplicas, defaultRdonly, 200, nil)
require.NoError(t, err)
vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()
verifyClusterHealth(t, vc)
_, err := vtgateConn.ExecuteFetch(initRef1DataQuery, 0, false)
_, err = vtgateConn.ExecuteFetch(initRef1DataQuery, 0, false)
require.NoError(t, err)
_, err = vtgateConn.ExecuteFetch(initRef2DataQuery, 0, false)
require.NoError(t, err)
Expand All @@ -318,15 +320,19 @@ func TestReferenceTableMaterialize(t *testing.T) {
vdiff(t, "ks2", "wf1", defaultCellName, false, true, nil)

queries := []string{
"insert into ks1.ref1(id, val) values (4, 'jkl'), (5, 'mno')",
"insert into ks1.ref2(id, id2) values (4, 4), (5, 5)",
"delete from ks1.ref1 where id=1",
"delete from ks1.ref2 where id=1",
"update ks1.ref1 set val='xyz'",
"update ks1.ref2 set id2=3 where id=2",
"delete from ks1.ref1 where id=1",
"delete from ks1.ref2 where id=1",
"insert into ks1.ref1(id, val) values (4, 'jkl'), (5, 'mno')",
"insert into ks1.ref2(id, id2) values (4, 4), (5, 5)",
}
for _, query := range queries {
execVtgateQuery(t, vtgateConn, "ks1", query)
}
for _, shard := range shards {
waitForRowCount(t, vtgateConn, "ks2:"+shard, "ref1", 4)
waitForRowCount(t, vtgateConn, "ks2:"+shard, "ref2", 4)
}
vdiff(t, "ks2", "wf1", defaultCellName, false, true, nil)
}
5 changes: 3 additions & 2 deletions go/vt/proto/vtctldata/vtctldata.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion go/vt/vtctl/workflow/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,6 @@ func (mz *materializer) deploySchema() error {
if ts.CreateDdl == "" {
return fmt.Errorf("target table %v does not exist and there is no create ddl defined", ts.TargetTable)
}

var err error
mu.Lock()
if len(sourceDDLs) == 0 {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1282,7 +1282,7 @@ func (s *Server) Materialize(ctx context.Context, ms *vtctldatapb.MaterializeSet
ms.TableSettings = append(ms.TableSettings, &vtctldatapb.TableMaterializeSettings{
TargetTable: table,
SourceExpression: fmt.Sprintf("select * from %s", table),
CreateDdl: "create table %s like " + table,
CreateDdl: createDDLAsCopyDropForeignKeys,
})
}
}
Expand Down
2 changes: 2 additions & 0 deletions proto/vtctldata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ message MaterializeSettings {
tabletmanagerdata.TabletSelectionPreference tablet_selection_preference = 15;
bool atomic_copy = 16;
WorkflowOptions workflow_options = 17;

// IsReference and Tables are set if the materialization is for reference tables.
bool is_reference = 18;
repeated string tables = 19;
}
Expand Down

0 comments on commit 55d06cd

Please sign in to comment.