Skip to content

Commit

Permalink
Add functionality to Materialize for copying reference tables to all …
Browse files Browse the repository at this point in the history
…shards in a target keyspace

Signed-off-by: Rohit Nayak <rohit@planetscale.com>
  • Loading branch information
rohit-nayak-ps committed Sep 13, 2024
1 parent 0bbea60 commit 8ae47bd
Show file tree
Hide file tree
Showing 10 changed files with 2,569 additions and 2,273 deletions.
2 changes: 2 additions & 0 deletions go/cmd/vtctldclient/command/vreplication/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ var (
MySQLServerVersion string
TruncateUILen int
TruncateErrLen int
IsReference bool
Tables []string
}{}
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ func commandCreate(cmd *cobra.Command, args []string) error {
Cell: strings.Join(common.CreateOptions.Cells, ","),
TabletTypes: topoproto.MakeStringTypeCSV(common.CreateOptions.TabletTypes),
TabletSelectionPreference: tsp,
IsReference: common.CreateOptions.IsReference,
Tables: common.CreateOptions.Tables,
}

createOptions.TableSettings.parser, err = sqlparser.New(sqlparser.Options{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,12 @@ func registerCommands(root *cobra.Command) {
create.Flags().StringVar(&createOptions.SourceKeyspace, "source-keyspace", "", "Keyspace where the tables queried in the 'source_expression' values within table-settings live.")
create.MarkFlagRequired("source-keyspace")
create.Flags().Var(&createOptions.TableSettings, "table-settings", "A JSON array defining what tables to materialize using what select statements. See the --help output for more details.")
create.MarkFlagRequired("table-settings")
create.Flags().BoolVar(&common.CreateOptions.StopAfterCopy, "stop-after-copy", false, "Stop the workflow after it's finished copying the existing rows and before it starts replicating changes.")
create.Flags().StringVar(&common.CreateOptions.MySQLServerVersion, "mysql_server_version", fmt.Sprintf("%s-Vitess", config.DefaultMySQLVersion), "Configure the MySQL version to use for example for the parser.")
create.Flags().IntVar(&common.CreateOptions.TruncateUILen, "sql-max-length-ui", 512, "truncate queries in debug UIs to the given length (default 512)")
create.Flags().IntVar(&common.CreateOptions.TruncateErrLen, "sql-max-length-errors", 0, "truncate queries in error logs to the given length (default unlimited)")
create.Flags().BoolVar(&common.CreateOptions.IsReference, "reference", false, "For specifying materialize of reference tables")
create.Flags().StringSliceVarP(&common.CreateOptions.Tables, "tables", "t", nil, "Used with --reference to specify the tables to materialize")
base.AddCommand(create)

// Generic workflow commands.
Expand Down
95 changes: 95 additions & 0 deletions go/test/endtoend/vreplication/materialize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package vreplication

import (
"strings"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -235,3 +236,97 @@ func TestMaterializeVtctldClient(t *testing.T) {
testShardedMaterialize(t, true)
})
}

const (
refSchema = `
create table ref1 (
id bigint not null,
val varbinary(10) not null,
primary key (id)
) engine=InnoDB default charset=utf8mb4 collate=utf8mb4_unicode_ci;
create table ref2 (
id bigint not null,
id2 bigint not null,
primary key (id)
) engine=InnoDB default charset=utf8mb4 collate=utf8mb4_unicode_ci;
`

refSourceVSchema = `
{
"tables": {
"ref1": {
"type": "reference"
},
"ref2": {
"type": "reference"
}
}
}
`
refTargetVSchema = `
{
"tables": {
"ref1": {
"type": "reference",
"source": "ks1.ref1"
},
"ref2": {
"type": "reference",
"source": "ks1.ref2"
}
}
}
`
initRef1DataQuery = `insert into ks1.ref1(id, val) values (1, 'abc'), (2, 'def'), (3, 'ghi')`
initRef2DataQuery = `insert into ks1.ref2(id, id2) values (1, 1), (2, 2), (3, 3)`
)

func TestReferenceTableMaterialize(t *testing.T) {
vc = NewVitessCluster(t, nil)
require.NotNil(t, vc)
defaultReplicas = 0 // because of CI resource constraints we can only run this test with primary tablets
defer func() { defaultReplicas = 1 }()
shards := []string{"-80", "80-"}
defer vc.TearDown()
defaultCell := vc.Cells[vc.CellNames[0]]
vc.AddKeyspace(t, []*Cell{defaultCell}, "ks1", "0", refSourceVSchema, refSchema, defaultReplicas, defaultRdonly, 100, nil)
vc.AddKeyspace(t, []*Cell{defaultCell}, "ks2", strings.Join(shards, ","), refTargetVSchema, refSchema, defaultReplicas, defaultRdonly, 200, nil)
vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()
verifyClusterHealth(t, vc)
_, err := vtgateConn.ExecuteFetch(initRef1DataQuery, 0, false)
require.NoError(t, err)
_, err = vtgateConn.ExecuteFetch(initRef2DataQuery, 0, false)
require.NoError(t, err)

err = vc.VtctldClient.ExecuteCommand("Materialize", "--target-keyspace", "ks2", "--workflow", "wf1", "create",
"--source-keyspace", "ks1", "--reference", "--tables", "ref1,ref2")
require.NoError(t, err, "Materialize")
for _, shard := range shards {
tab := vc.getPrimaryTablet(t, "ks2", shard)
catchup(t, tab, "wf1", "Materialize")
}

for _, shard := range shards {
waitForRowCount(t, vtgateConn, "ks2:"+shard, "ref1", 3)
waitForQueryResult(t, vtgateConn, "ks2:"+shard, "select id, val from ref1",
`[[INT64(1) VARBINARY("abc")] [INT64(2) VARBINARY("def")] [INT64(3) VARBINARY("ghi")]]`)
waitForRowCount(t, vtgateConn, "ks2:"+shard, "ref2", 3)
waitForQueryResult(t, vtgateConn, "ks2:"+shard, "select id, id2 from ref2",
`[[INT64(1) INT64(1)] [INT64(2) INT64(2)] [INT64(3) INT64(3)]]`)
}
vdiff(t, "ks2", "wf1", defaultCellName, false, true, nil)

queries := []string{
"insert into ks1.ref1(id, val) values (4, 'jkl'), (5, 'mno')",
"insert into ks1.ref2(id, id2) values (4, 4), (5, 5)",
"delete from ks1.ref1 where id=1",
"delete from ks1.ref2 where id=1",
"update ks1.ref1 set val='xyz'",
"update ks1.ref2 set id2=3 where id=2",
}
for _, query := range queries {
execVtgateQuery(t, vtgateConn, "ks1", query)
}
vdiff(t, "ks2", "wf1", defaultCellName, false, true, nil)
}
Loading

0 comments on commit 8ae47bd

Please sign in to comment.