From eb901d9d2616288e0800c2529da714fd37c2bbf9 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Sun, 7 Jul 2024 18:14:05 +0200 Subject: [PATCH 01/10] Test coverage for vreplication_test. 47.2% => 47.6% Signed-off-by: Rohit Nayak --- .../workflow/vreplication_stream_test.go | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) create mode 100644 go/vt/vtctl/workflow/vreplication_stream_test.go diff --git a/go/vt/vtctl/workflow/vreplication_stream_test.go b/go/vt/vtctl/workflow/vreplication_stream_test.go new file mode 100644 index 00000000000..d393ebc50c3 --- /dev/null +++ b/go/vt/vtctl/workflow/vreplication_stream_test.go @@ -0,0 +1,51 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workflow + +import ( + "fmt" + "reflect" + "testing" +) + +func TestVReplicationStreams(t *testing.T) { + var streams VReplicationStreams + for i := 1; i <= 3; i++ { + streams = append(streams, &VReplicationStream{ID: int32(i), Workflow: fmt.Sprintf("workflow%d", i)}) + } + + tests := []struct { + name string + funcUnderTest func(VReplicationStreams) interface{} + expectedResult interface{} + }{ + {"Test IDs", func(s VReplicationStreams) interface{} { return s.IDs() }, []int32{1, 2, 3}}, + {"Test Values", func(s VReplicationStreams) interface{} { return s.Values() }, "(1, 2, 3)"}, + {"Test Workflows", func(s VReplicationStreams) interface{} { return s.Workflows() }, []string{"workflow1", "workflow2", "workflow3"}}, + {"Test Copy", func(s VReplicationStreams) interface{} { return s.Copy() }, streams.Copy()}, + {"Test ToSlice", func(s VReplicationStreams) interface{} { return s.ToSlice() }, []*VReplicationStream{streams[0], streams[1], streams[2]}}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := tt.funcUnderTest(streams) + if !reflect.DeepEqual(result, tt.expectedResult) { + t.Errorf("Failed %s: expected %v, got %v", tt.name, tt.expectedResult, result) + } + }) + } +} From 526a59c2190ed161af30a44042f2b457edf39a85 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Sun, 7 Jul 2024 18:45:02 +0200 Subject: [PATCH 02/10] TestCreateDefaultShardRoutingRules: 47.6 => 47.9 Signed-off-by: Rohit Nayak --- go/vt/vtctl/workflow/utils_test.go | 47 ++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/go/vt/vtctl/workflow/utils_test.go b/go/vt/vtctl/workflow/utils_test.go index d79c4710b77..0c347845a15 100644 --- a/go/vt/vtctl/workflow/utils_test.go +++ b/go/vt/vtctl/workflow/utils_test.go @@ -7,6 +7,7 @@ import ( "math/rand/v2" "os" "os/exec" + "reflect" "sync" "testing" "time" @@ -16,12 +17,58 @@ import ( "vitess.io/vitess/go/testfiles" "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/proto/vtctldata" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/etcd2topo" "vitess.io/vitess/go/vt/topo/memorytopo" "vitess.io/vitess/go/vt/topotools" ) +func TestCreateDefaultShardRoutingRules(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + sourceShard := "0" + sourceKeyspace := &testKeyspace{ + KeyspaceName: "sourceks", + ShardNames: []string{sourceShard}, + } + targetKeyspace := + &testKeyspace{ + KeyspaceName: "targetks", + ShardNames: []string{"-80", "80-"}, + } + env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) + defer env.close() + ms := &vtctldata.MaterializeSettings{ + Workflow: "wf1", + SourceKeyspace: sourceKeyspace.KeyspaceName, + TargetKeyspace: targetKeyspace.KeyspaceName, + TableSettings: []*vtctldata.TableMaterializeSettings{ + { + TargetTable: "t1", + SourceExpression: "select * from t1", + }, + { + TargetTable: "t2", + SourceExpression: "select * from t2", + }, + }, + Cell: "zone1", + SourceShards: []string{sourceShard}, + } + err := createDefaultShardRoutingRules(ctx, ms, env.ts) + require.NoError(t, err) + rules, err := topotools.GetShardRoutingRules(ctx, env.ts) + require.NoError(t, err) + require.Len(t, rules, 1) + want := map[string]string{ + fmt.Sprintf("%s.%s", targetKeyspace.KeyspaceName, sourceShard): sourceKeyspace.KeyspaceName, + } + if !reflect.DeepEqual(want, rules) { + require.FailNow(t, "unexpected rules", rules) + } +} + // TestUpdateKeyspaceRoutingRule confirms that the keyspace routing rules are updated correctly. func TestUpdateKeyspaceRoutingRule(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) From db590c70d9b5e5fa9b4354a4e33633b29e6bb1c4 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Fri, 26 Jul 2024 22:40:50 +0530 Subject: [PATCH 03/10] TestMount Signed-off-by: Rohit Nayak --- go/vt/vtctl/workflow/mount_test.go | 76 ++++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100644 go/vt/vtctl/workflow/mount_test.go diff --git a/go/vt/vtctl/workflow/mount_test.go b/go/vt/vtctl/workflow/mount_test.go new file mode 100644 index 00000000000..972b37c62ed --- /dev/null +++ b/go/vt/vtctl/workflow/mount_test.go @@ -0,0 +1,76 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workflow + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" + "vitess.io/vitess/go/vt/topo/memorytopo" + "vitess.io/vitess/go/vt/vtenv" +) + +func TestMount(t *testing.T) { + const ( + extCluster = "extcluster" + topoType = "etcd2" + topoServer = "localhost:2379" + topoRoot = "/vitess/global" + ) + ctx := context.Background() + ts := memorytopo.NewServer(ctx, "cell") + tmc := &fakeTMC{} + s := NewServer(vtenv.NewTestEnv(), ts, tmc) + + resp, err := s.MountRegister(ctx, &vtctldatapb.MountRegisterRequest{ + Name: extCluster, + TopoType: topoType, + TopoServer: topoServer, + TopoRoot: topoRoot, + }) + require.NoError(t, err) + require.NotNil(t, resp) + + respList, err := s.MountList(ctx, &vtctldatapb.MountListRequest{}) + require.NoError(t, err) + require.NotNil(t, respList) + require.Equal(t, []string{extCluster}, respList.Names) + + respShow, err := s.MountShow(ctx, &vtctldatapb.MountShowRequest{ + Name: extCluster, + }) + require.NoError(t, err) + require.NotNil(t, respShow) + require.Equal(t, extCluster, respShow.Name) + require.Equal(t, topoType, respShow.TopoType) + require.Equal(t, topoServer, respShow.TopoServer) + require.Equal(t, topoRoot, respShow.TopoRoot) + + respUnregister, err := s.MountUnregister(ctx, &vtctldatapb.MountUnregisterRequest{ + Name: extCluster, + }) + require.NoError(t, err) + require.NotNil(t, respUnregister) + + respList, err = s.MountList(ctx, &vtctldatapb.MountListRequest{}) + require.NoError(t, err) + require.NotNil(t, respList) + require.Nil(t, respList.Names) +} From 88a24da80034029de3eb4e2b16a3e850b1f9b339 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Tue, 30 Jul 2024 10:32:59 +0200 Subject: [PATCH 04/10] Add initial unit test for StreamMigrator Signed-off-by: Rohit Nayak --- go/vt/vtctl/workflow/framework_test.go | 33 +++- go/vt/vtctl/workflow/materializer_env_test.go | 4 +- go/vt/vtctl/workflow/materializer_test.go | 14 +- go/vt/vtctl/workflow/resharder_test.go | 4 +- go/vt/vtctl/workflow/stream_migrator_test.go | 160 ++++++++++++++++++ 5 files changed, 207 insertions(+), 8 deletions(-) diff --git a/go/vt/vtctl/workflow/framework_test.go b/go/vt/vtctl/workflow/framework_test.go index 1d25aafa75f..b5d0a308261 100644 --- a/go/vt/vtctl/workflow/framework_test.go +++ b/go/vt/vtctl/workflow/framework_test.go @@ -256,6 +256,9 @@ type testTMClient struct { readVReplicationWorkflowRequests map[uint32]*tabletmanagerdatapb.ReadVReplicationWorkflowRequest primaryPositions map[uint32]string + // Stack of ReadVReplicationWorkflowsResponse to return, in order, for each shard + readVReplicationWorkflowsResponses map[string][]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse + env *testEnv // For access to the env config from tmc methods. reverse atomic.Bool // Are we reversing traffic? frozen atomic.Bool // Are the workflows frozen? @@ -267,6 +270,7 @@ func newTestTMClient(env *testEnv) *testTMClient { vrQueries: make(map[int][]*queryResult), createVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest), readVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.ReadVReplicationWorkflowRequest), + readVReplicationWorkflowsResponses: make(map[string][]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse), primaryPositions: make(map[uint32]string), env: env, } @@ -285,6 +289,10 @@ func (tmc *testTMClient) CreateVReplicationWorkflow(ctx context.Context, tablet return &tabletmanagerdatapb.CreateVReplicationWorkflowResponse{Result: sqltypes.ResultToProto3(res)}, nil } +func (tmc *testTMClient) GetWorkflowKey(keyspace, shard string) string { + return fmt.Sprintf("%s/%s", keyspace, shard) +} + func (tmc *testTMClient) ReadVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.ReadVReplicationWorkflowRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowResponse, error) { tmc.mu.Lock() defer tmc.mu.Unlock() @@ -463,6 +471,10 @@ func (tmc *testTMClient) ReadVReplicationWorkflows(ctx context.Context, tablet * tmc.mu.Lock() defer tmc.mu.Unlock() + workflowKey := tmc.GetWorkflowKey(tablet.Keyspace, tablet.Shard) + if resp := tmc.getVReplicationWorkflowsResponse(workflowKey); resp != nil { + return resp, nil + } workflowType := binlogdatapb.VReplicationWorkflowType_MoveTables if len(req.IncludeWorkflows) > 0 { for _, wf := range req.IncludeWorkflows { @@ -494,7 +506,7 @@ func (tmc *testTMClient) ReadVReplicationWorkflows(ctx context.Context, tablet * }, }, }, - Pos: "MySQL56/" + position, + Pos: position, TimeUpdated: protoutil.TimeToProto(time.Now()), TimeHeartbeat: protoutil.TimeToProto(time.Now()), }, @@ -541,6 +553,25 @@ func (tmc *testTMClient) VReplicationWaitForPos(ctx context.Context, tablet *top return nil } +func (tmc *testTMClient) AddVReplicationWorkflowsResponse(key string, resp *tabletmanagerdatapb.ReadVReplicationWorkflowsResponse) { + tmc.mu.Lock() + defer tmc.mu.Unlock() + tmc.readVReplicationWorkflowsResponses[key] = append(tmc.readVReplicationWorkflowsResponses[key], resp) +} + +func (tmc *testTMClient) getVReplicationWorkflowsResponse(key string) *tabletmanagerdatapb.ReadVReplicationWorkflowsResponse { + if len(tmc.readVReplicationWorkflowsResponses) == 0 { + return nil + } + responses, ok := tmc.readVReplicationWorkflowsResponses[key] + if !ok || len(responses) == 0 { + return nil + } + resp := tmc.readVReplicationWorkflowsResponses[key][0] + tmc.readVReplicationWorkflowsResponses[key] = tmc.readVReplicationWorkflowsResponses[key][1:] + return resp +} + // // Utility / helper functions. // diff --git a/go/vt/vtctl/workflow/materializer_env_test.go b/go/vt/vtctl/workflow/materializer_env_test.go index 569651f85ca..aada59c244d 100644 --- a/go/vt/vtctl/workflow/materializer_env_test.go +++ b/go/vt/vtctl/workflow/materializer_env_test.go @@ -61,7 +61,7 @@ type testMaterializerEnv struct { venv *vtenv.Environment } -//---------------------------------------------- +// ---------------------------------------------- // testMaterializerEnv func newTestMaterializerEnv(t *testing.T, ctx context.Context, ms *vtctldatapb.MaterializeSettings, sourceShards, targetShards []string) *testMaterializerEnv { @@ -426,7 +426,7 @@ func (tmc *testMaterializerTMClient) ReadVReplicationWorkflows(ctx context.Conte }, }, }, - Pos: "MySQL56/" + position, + Pos: position, TimeUpdated: protoutil.TimeToProto(time.Now()), TimeHeartbeat: protoutil.TimeToProto(time.Now()), } diff --git a/go/vt/vtctl/workflow/materializer_test.go b/go/vt/vtctl/workflow/materializer_test.go index 51a7d22d5eb..763dd7c04d3 100644 --- a/go/vt/vtctl/workflow/materializer_test.go +++ b/go/vt/vtctl/workflow/materializer_test.go @@ -44,7 +44,7 @@ import ( ) const ( - position = "9d10e6ec-07a0-11ee-ae73-8e53f4cf3083:1-97" + position = "MySQL56/9d10e6ec-07a0-11ee-ae73-8e53f4cf3083:1-97" mzSelectFrozenQuery = "select 1 from _vt.vreplication where db_name='vt_targetks' and message='FROZEN' and workflow_sub_type != 1" mzCheckJournal = "/select val from _vt.resharding_journal where id=" mzGetCopyState = "select distinct table_name from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = 1" @@ -56,6 +56,14 @@ var ( defaultOnDDL = binlogdatapb.OnDDLAction_IGNORE.String() ) +func gtid(position string) string { + arr := strings.Split(position, "/") + if len(arr) != 2 { + return "" + } + return arr[1] +} + func TestStripForeignKeys(t *testing.T) { tcs := []struct { desc string @@ -577,7 +585,7 @@ func TestMoveTablesDDLFlag(t *testing.T) { sourceShard, err := env.topoServ.GetShardNames(ctx, ms.SourceKeyspace) require.NoError(t, err) want := fmt.Sprintf("shard_streams:{key:\"%s/%s\" value:{streams:{id:1 tablet:{cell:\"%s\" uid:200} source_shard:\"%s/%s\" position:\"%s\" status:\"Running\" info:\"VStream Lag: 0s\"}}} traffic_state:\"Reads Not Switched. Writes Not Switched\"", - ms.TargetKeyspace, targetShard[0], env.cell, ms.SourceKeyspace, sourceShard[0], position) + ms.TargetKeyspace, targetShard[0], env.cell, ms.SourceKeyspace, sourceShard[0], gtid(position)) res, err := env.ws.MoveTablesCreate(ctx, &vtctldatapb.MoveTablesCreateRequest{ Workflow: ms.Workflow, @@ -636,7 +644,7 @@ func TestMoveTablesNoRoutingRules(t *testing.T) { Uid: 200, }, SourceShard: fmt.Sprintf("%s/%s", ms.SourceKeyspace, sourceShard[0]), - Position: position, + Position: gtid(position), Status: binlogdatapb.VReplicationWorkflowState_Running.String(), Info: "VStream Lag: 0s", }, diff --git a/go/vt/vtctl/workflow/resharder_test.go b/go/vt/vtctl/workflow/resharder_test.go index 1bb2f065e0f..6353f36db9f 100644 --- a/go/vt/vtctl/workflow/resharder_test.go +++ b/go/vt/vtctl/workflow/resharder_test.go @@ -84,7 +84,7 @@ func TestReshardCreate(t *testing.T) { { Id: 1, Tablet: &topodatapb.TabletAlias{Cell: defaultCellName, Uid: startingTargetTabletUID}, - SourceShard: "targetks/0", Position: position, Status: "Running", Info: "VStream Lag: 0s", + SourceShard: "targetks/0", Position: gtid(position), Status: "Running", Info: "VStream Lag: 0s", }, }, }, @@ -93,7 +93,7 @@ func TestReshardCreate(t *testing.T) { { Id: 1, Tablet: &topodatapb.TabletAlias{Cell: defaultCellName, Uid: startingTargetTabletUID + tabletUIDStep}, - SourceShard: "targetks/0", Position: position, Status: "Running", Info: "VStream Lag: 0s", + SourceShard: "targetks/0", Position: gtid(position), Status: "Running", Info: "VStream Lag: 0s", }, }, }, diff --git a/go/vt/vtctl/workflow/stream_migrator_test.go b/go/vt/vtctl/workflow/stream_migrator_test.go index 38ae10280f7..f27e4a161e7 100644 --- a/go/vt/vtctl/workflow/stream_migrator_test.go +++ b/go/vt/vtctl/workflow/stream_migrator_test.go @@ -21,6 +21,12 @@ import ( "encoding/json" "testing" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/key" + "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/topo" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -347,3 +353,157 @@ func stringifyVRS(streams []*VReplicationStream) string { b, _ := json.Marshal(converted) return string(b) } + +func TestBuildStreamMigrator(t *testing.T) { + ctx := context.Background() + sourceKeyspaceName := "sourceks" + targetKeyspaceName := "sourceks" + sourceKeyspace := &testKeyspace{ + KeyspaceName: sourceKeyspaceName, + ShardNames: []string{"0"}, + } + targetKeyspace := &testKeyspace{ + KeyspaceName: targetKeyspaceName, + ShardNames: []string{"-80", "80-"}, + } + env := newTestEnv(t, ctx, "cell1", sourceKeyspace, targetKeyspace) + defer env.close() + vs := &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "xxhash": { + Type: "xxhash", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Columns: []string{"c1"}, + Name: "xxhash", + }}, + }, + "t2": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Columns: []string{"c1"}, + Name: "xxhash", + }}, + }, + "ref": { + Type: vindexes.TypeReference, + }, + }, + } + initialKeyRange, err := key.ParseShardingSpec("-") + if err != nil || len(initialKeyRange) != 1 { + t.Fatalf("ParseShardingSpec failed. Expected non error and only one element. Got err: %v, len(%v)", err, len(initialKeyRange)) + } + leftKeyRange, err := key.ParseShardingSpec("-80") + if err != nil || len(leftKeyRange) != 1 { + t.Fatalf("ParseShardingSpec failed. Expected non error and only one element. Got err: %v, len(%v)", err, len(leftKeyRange)) + } + + rightKeyRange, err := key.ParseShardingSpec("80-") + if err != nil || len(rightKeyRange) != 1 { + t.Fatalf("ParseShardingSpec failed. Expected non error and only one element. Got err: %v, len(%v)", err, len(rightKeyRange)) + } + ksschema, err := vindexes.BuildKeyspaceSchema(vs, "ks", sqlparser.NewTestParser()) + require.NoError(t, err, "could not create test keyspace %+v", vs) + sources := make(map[string]*MigrationSource, len(sourceKeyspace.ShardNames)) + targets := make(map[string]*MigrationTarget, len(targetKeyspace.ShardNames)) + for i, shard := range sourceKeyspace.ShardNames { + tablet := env.tablets[sourceKeyspace.KeyspaceName][startingSourceTabletUID+(i*tabletUIDStep)] + kr, _ := key.ParseShardingSpec(shard) + sources[shard] = &MigrationSource{ + si: topo.NewShardInfo(sourceKeyspaceName, shard, &topodatapb.Shard{KeyRange: kr[0]}, nil), + primary: &topo.TabletInfo{ + Tablet: tablet, + }, + } + } + for i, shard := range targetKeyspace.ShardNames { + tablet := env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID+(i*tabletUIDStep)] + kr, _ := key.ParseShardingSpec(shard) + targets[shard] = &MigrationTarget{ + si: topo.NewShardInfo(targetKeyspaceName, shard, &topodatapb.Shard{KeyRange: kr[0]}, nil), + primary: &topo.TabletInfo{ + Tablet: tablet, + }, + } + } + ts := &testTrafficSwitcher{ + trafficSwitcher: trafficSwitcher{ + migrationType: binlogdatapb.MigrationType_SHARDS, + workflow: "wf1", + id: 1, + sources: sources, + targets: targets, + sourceKeyspace: sourceKeyspaceName, + targetKeyspace: targetKeyspaceName, + sourceKSSchema: ksschema, + workflowType: binlogdatapb.VReplicationWorkflowType_Reshard, + ws: env.ws, + }, + sourceKeyspaceSchema: ksschema, + } + parser := sqlparser.NewTestParser() + + var wfs tabletmanagerdata.ReadVReplicationWorkflowsResponse + wfName := "wfMat1" + wfs.Workflows = append(wfs.Workflows, &tabletmanagerdata.ReadVReplicationWorkflowResponse{ + Workflow: wfName, + WorkflowType: binlogdatapb.VReplicationWorkflowType_Materialize, + }) + id := int32(1) + wfs.Workflows[0].Streams = append(wfs.Workflows[0].Streams, &tabletmanagerdata.ReadVReplicationWorkflowResponse_Stream{ + Id: id, + Bls: &binlogdatapb.BinlogSource{ + Keyspace: sourceKeyspaceName, + Shard: "0", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + {Match: "t1", Filter: "select * from t1"}, + }, + }, + }, + Pos: "MySQL56/16b1039f-22b6-11ed-b765-0a43f95f28a3:1-615", + State: binlogdatapb.VReplicationWorkflowState_Running, + }) + workflowKey := env.tmc.GetWorkflowKey(sourceKeyspaceName, "0") + workflowResponses := []*tabletmanagerdata.ReadVReplicationWorkflowsResponse{ + nil, &wfs, &wfs, &wfs, + } + for _, resp := range workflowResponses { + env.tmc.AddVReplicationWorkflowsResponse(workflowKey, resp) + } + sourceTabletIds := []int{100} + + for _, id := range sourceTabletIds { + queries := []string{ + "select distinct vrepl_id from _vt.copy_state where vrepl_id in (1)", + "update _vt.vreplication set state='Stopped', message='for cutover' where id in (1)", + } + for _, q := range queries { + env.tmc.expectVRQuery(id, q, &sqltypes.Result{}) + } + } + + // Shard -80 + env.tmc.expectVRQuery(200, "delete from _vt.vreplication where db_name='vt_sourceks' and workflow in ('wfMat1')", &sqltypes.Result{}) + // FIXME: Note: currently it is not optimal: we create two streams for each shard from all the shards even if the key ranges don't intersect. TBD + env.tmc.expectVRQuery(200, "/insert into _vt.vreplication.*shard:\"-80\".*in_keyrange.*c1.*-80.*shard:\"80-\".*in_keyrange.*c1.*-80.*", &sqltypes.Result{}) + + // Shard 80- + env.tmc.expectVRQuery(210, "delete from _vt.vreplication where db_name='vt_sourceks' and workflow in ('wfMat1')", &sqltypes.Result{}) + env.tmc.expectVRQuery(210, "/insert into _vt.vreplication.*shard:\"-80\".*in_keyrange.*c1.*80-.*shard:\"80-\".*in_keyrange.*c1.*80-.*", &sqltypes.Result{}) + + sm, err := BuildStreamMigrator(ctx, ts, false, parser) + require.NoError(t, err) + require.NotNil(t, sm) + require.NotNil(t, sm.streams) + require.Equal(t, 1, len(sm.streams)) + + workflows, err := sm.StopStreams(ctx) + require.NoError(t, err) + require.Equal(t, 1, len(workflows)) + require.NoError(t, sm.MigrateStreams(ctx)) +} From 5b09dfad3516aea26d63cbc97612020b16e7b5b8 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Tue, 30 Jul 2024 12:50:49 +0200 Subject: [PATCH 05/10] Refactor TestCreateDefaultShardRoutingRules Signed-off-by: Rohit Nayak --- go/vt/vtctl/workflow/utils_test.go | 97 +++++++++++++++++++----------- 1 file changed, 63 insertions(+), 34 deletions(-) diff --git a/go/vt/vtctl/workflow/utils_test.go b/go/vt/vtctl/workflow/utils_test.go index 0c347845a15..1f35b3720b1 100644 --- a/go/vt/vtctl/workflow/utils_test.go +++ b/go/vt/vtctl/workflow/utils_test.go @@ -24,48 +24,77 @@ import ( "vitess.io/vitess/go/vt/topotools" ) +// TestCreateDefaultShardRoutingRules confirms that the default shard routing rules are created correctly for sharded +// and unsharded keyspaces. func TestCreateDefaultShardRoutingRules(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - sourceShard := "0" - sourceKeyspace := &testKeyspace{ + + ks1 := &testKeyspace{ KeyspaceName: "sourceks", - ShardNames: []string{sourceShard}, } - targetKeyspace := - &testKeyspace{ - KeyspaceName: "targetks", - ShardNames: []string{"-80", "80-"}, + ks2 := &testKeyspace{ + KeyspaceName: "targetks", + } + + type testCase struct { + name string + sourceKeyspace *testKeyspace + targetKeyspace *testKeyspace + shards []string + want map[string]string + } + getExpectedRules := func(sourceKeyspace, targetKeyspace *testKeyspace) map[string]string { + rules := make(map[string]string) + for _, targetShard := range targetKeyspace.ShardNames { + rules[fmt.Sprintf("%s.%s", targetKeyspace.KeyspaceName, targetShard)] = sourceKeyspace.KeyspaceName } - env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) - defer env.close() - ms := &vtctldata.MaterializeSettings{ - Workflow: "wf1", - SourceKeyspace: sourceKeyspace.KeyspaceName, - TargetKeyspace: targetKeyspace.KeyspaceName, - TableSettings: []*vtctldata.TableMaterializeSettings{ - { - TargetTable: "t1", - SourceExpression: "select * from t1", - }, - { - TargetTable: "t2", - SourceExpression: "select * from t2", - }, - }, - Cell: "zone1", - SourceShards: []string{sourceShard}, + return rules + } - err := createDefaultShardRoutingRules(ctx, ms, env.ts) - require.NoError(t, err) - rules, err := topotools.GetShardRoutingRules(ctx, env.ts) - require.NoError(t, err) - require.Len(t, rules, 1) - want := map[string]string{ - fmt.Sprintf("%s.%s", targetKeyspace.KeyspaceName, sourceShard): sourceKeyspace.KeyspaceName, + testCases := []testCase{ + { + name: "unsharded", + sourceKeyspace: ks1, + targetKeyspace: ks2, + shards: []string{"0"}, + }, + { + name: "sharded", + sourceKeyspace: ks2, + targetKeyspace: ks1, + shards: []string{"-80", "80-"}, + }, } - if !reflect.DeepEqual(want, rules) { - require.FailNow(t, "unexpected rules", rules) + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + tc.sourceKeyspace.ShardNames = tc.shards + tc.targetKeyspace.ShardNames = tc.shards + env := newTestEnv(t, ctx, defaultCellName, tc.sourceKeyspace, tc.targetKeyspace) + defer env.close() + ms := &vtctldata.MaterializeSettings{ + Workflow: "wf1", + SourceKeyspace: tc.sourceKeyspace.KeyspaceName, + TargetKeyspace: tc.targetKeyspace.KeyspaceName, + TableSettings: []*vtctldata.TableMaterializeSettings{ + { + TargetTable: "t1", + SourceExpression: "select * from t1", + }, + }, + Cell: "zone1", + SourceShards: tc.sourceKeyspace.ShardNames, + } + err := createDefaultShardRoutingRules(ctx, ms, env.ts) + require.NoError(t, err) + rules, err := topotools.GetShardRoutingRules(ctx, env.ts) + require.NoError(t, err) + require.Len(t, rules, len(tc.shards)) + want := getExpectedRules(tc.sourceKeyspace, tc.targetKeyspace) + if !reflect.DeepEqual(want, rules) { + require.FailNowf(t, "unexpected rules", "got: %v, want: %v", rules, tc.want) + } + }) } } From b8a20f8cb0c0b2dc5bbdcbb80ee66bf05e7d4f3f Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Tue, 30 Jul 2024 16:45:52 +0200 Subject: [PATCH 06/10] Refactored test for stream migrator Signed-off-by: Rohit Nayak --- go/vt/vtctl/workflow/mount_test.go | 1 + go/vt/vtctl/workflow/stream_migrator_test.go | 192 ++++++++++-------- .../workflow/vreplication_stream_test.go | 1 + 3 files changed, 111 insertions(+), 83 deletions(-) diff --git a/go/vt/vtctl/workflow/mount_test.go b/go/vt/vtctl/workflow/mount_test.go index 972b37c62ed..2fec275e4cb 100644 --- a/go/vt/vtctl/workflow/mount_test.go +++ b/go/vt/vtctl/workflow/mount_test.go @@ -27,6 +27,7 @@ import ( "vitess.io/vitess/go/vt/vtenv" ) +// TestMount tests various Mount-related methods. func TestMount(t *testing.T) { const ( extCluster = "extcluster" diff --git a/go/vt/vtctl/workflow/stream_migrator_test.go b/go/vt/vtctl/workflow/stream_migrator_test.go index f27e4a161e7..2ee4fc66bbb 100644 --- a/go/vt/vtctl/workflow/stream_migrator_test.go +++ b/go/vt/vtctl/workflow/stream_migrator_test.go @@ -19,23 +19,22 @@ package workflow import ( "context" "encoding/json" + "fmt" "testing" - "vitess.io/vitess/go/sqltypes" - "vitess.io/vitess/go/vt/key" - "vitess.io/vitess/go/vt/proto/tabletmanagerdata" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" - "vitess.io/vitess/go/vt/topo" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/key" + "vitess.io/vitess/go/vt/proto/tabletmanagerdata" "vitess.io/vitess/go/vt/sqlparser" - + "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vtgate/vindexes" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" vschemapb "vitess.io/vitess/go/vt/proto/vschema" ) @@ -354,81 +353,87 @@ func stringifyVRS(streams []*VReplicationStream) string { return string(b) } -func TestBuildStreamMigrator(t *testing.T) { - ctx := context.Background() - sourceKeyspaceName := "sourceks" - targetKeyspaceName := "sourceks" - sourceKeyspace := &testKeyspace{ - KeyspaceName: sourceKeyspaceName, - ShardNames: []string{"0"}, - } - targetKeyspace := &testKeyspace{ - KeyspaceName: targetKeyspaceName, - ShardNames: []string{"-80", "80-"}, - } - env := newTestEnv(t, ctx, "cell1", sourceKeyspace, targetKeyspace) - defer env.close() - vs := &vschemapb.Keyspace{ - Sharded: true, - Vindexes: map[string]*vschemapb.Vindex{ - "xxhash": { - Type: "xxhash", - }, +var testVSchema = &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "xxhash": { + Type: "xxhash", }, - Tables: map[string]*vschemapb.Table{ - "t1": { - ColumnVindexes: []*vschemapb.ColumnVindex{{ - Columns: []string{"c1"}, - Name: "xxhash", - }}, - }, - "t2": { - ColumnVindexes: []*vschemapb.ColumnVindex{{ - Columns: []string{"c1"}, - Name: "xxhash", - }}, - }, - "ref": { - Type: vindexes.TypeReference, - }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Columns: []string{"c1"}, + Name: "xxhash", + }}, + }, + "t2": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Columns: []string{"c1"}, + Name: "xxhash", + }}, }, + "ref": { + Type: vindexes.TypeReference, + }, + }, +} + +var ( + commerceKeyspace = &testKeyspace{ + KeyspaceName: "commerce", + ShardNames: []string{"0"}, } - initialKeyRange, err := key.ParseShardingSpec("-") - if err != nil || len(initialKeyRange) != 1 { - t.Fatalf("ParseShardingSpec failed. Expected non error and only one element. Got err: %v, len(%v)", err, len(initialKeyRange)) + customerUnshardedKeyspace = &testKeyspace{ + KeyspaceName: "customer", + ShardNames: []string{"0"}, } - leftKeyRange, err := key.ParseShardingSpec("-80") - if err != nil || len(leftKeyRange) != 1 { - t.Fatalf("ParseShardingSpec failed. Expected non error and only one element. Got err: %v, len(%v)", err, len(leftKeyRange)) + customerShardedKeyspace = &testKeyspace{ + KeyspaceName: "customer", + ShardNames: []string{"-80", "80-"}, } +) - rightKeyRange, err := key.ParseShardingSpec("80-") - if err != nil || len(rightKeyRange) != 1 { - t.Fatalf("ParseShardingSpec failed. Expected non error and only one element. Got err: %v, len(%v)", err, len(rightKeyRange)) - } - ksschema, err := vindexes.BuildKeyspaceSchema(vs, "ks", sqlparser.NewTestParser()) - require.NoError(t, err, "could not create test keyspace %+v", vs) +type streamMigratorEnv struct { + tenv *testEnv + ts *testTrafficSwitcher + sourceTabletIds []int + targetTabletIds []int +} + +func (env *streamMigratorEnv) close() { + env.tenv.close() +} + +func newStreamMigratorEnv(ctx context.Context, t *testing.T, sourceKeyspace, targetKeyspace *testKeyspace) *streamMigratorEnv { + tenv := newTestEnv(t, ctx, "cell1", sourceKeyspace, targetKeyspace) + env := &streamMigratorEnv{tenv: tenv} + + ksschema, err := vindexes.BuildKeyspaceSchema(testVSchema, "ks", sqlparser.NewTestParser()) + require.NoError(t, err, "could not create test keyspace %+v", testVSchema) sources := make(map[string]*MigrationSource, len(sourceKeyspace.ShardNames)) targets := make(map[string]*MigrationTarget, len(targetKeyspace.ShardNames)) for i, shard := range sourceKeyspace.ShardNames { - tablet := env.tablets[sourceKeyspace.KeyspaceName][startingSourceTabletUID+(i*tabletUIDStep)] + tablet := tenv.tablets[sourceKeyspace.KeyspaceName][startingSourceTabletUID+(i*tabletUIDStep)] kr, _ := key.ParseShardingSpec(shard) sources[shard] = &MigrationSource{ - si: topo.NewShardInfo(sourceKeyspaceName, shard, &topodatapb.Shard{KeyRange: kr[0]}, nil), + si: topo.NewShardInfo(sourceKeyspace.KeyspaceName, shard, &topodatapb.Shard{KeyRange: kr[0]}, nil), primary: &topo.TabletInfo{ Tablet: tablet, }, } + env.sourceTabletIds = append(env.sourceTabletIds, int(tablet.Alias.Uid)) } for i, shard := range targetKeyspace.ShardNames { - tablet := env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID+(i*tabletUIDStep)] + tablet := tenv.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID+(i*tabletUIDStep)] kr, _ := key.ParseShardingSpec(shard) targets[shard] = &MigrationTarget{ - si: topo.NewShardInfo(targetKeyspaceName, shard, &topodatapb.Shard{KeyRange: kr[0]}, nil), + si: topo.NewShardInfo(targetKeyspace.KeyspaceName, shard, &topodatapb.Shard{KeyRange: kr[0]}, nil), primary: &topo.TabletInfo{ Tablet: tablet, }, } + env.targetTabletIds = append(env.targetTabletIds, int(tablet.Alias.Uid)) } ts := &testTrafficSwitcher{ trafficSwitcher: trafficSwitcher{ @@ -437,66 +442,86 @@ func TestBuildStreamMigrator(t *testing.T) { id: 1, sources: sources, targets: targets, - sourceKeyspace: sourceKeyspaceName, - targetKeyspace: targetKeyspaceName, + sourceKeyspace: sourceKeyspace.KeyspaceName, + targetKeyspace: targetKeyspace.KeyspaceName, sourceKSSchema: ksschema, workflowType: binlogdatapb.VReplicationWorkflowType_Reshard, - ws: env.ws, + ws: tenv.ws, }, sourceKeyspaceSchema: ksschema, } - parser := sqlparser.NewTestParser() + env.ts = ts + + return env +} +func addMaterializeWorkflow(t *testing.T, env *streamMigratorEnv, id int32, sourceShard string) { var wfs tabletmanagerdata.ReadVReplicationWorkflowsResponse wfName := "wfMat1" wfs.Workflows = append(wfs.Workflows, &tabletmanagerdata.ReadVReplicationWorkflowResponse{ Workflow: wfName, WorkflowType: binlogdatapb.VReplicationWorkflowType_Materialize, }) - id := int32(1) wfs.Workflows[0].Streams = append(wfs.Workflows[0].Streams, &tabletmanagerdata.ReadVReplicationWorkflowResponse_Stream{ Id: id, Bls: &binlogdatapb.BinlogSource{ - Keyspace: sourceKeyspaceName, - Shard: "0", + Keyspace: env.tenv.sourceKeyspace.KeyspaceName, + Shard: sourceShard, Filter: &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{ {Match: "t1", Filter: "select * from t1"}, }, }, }, - Pos: "MySQL56/16b1039f-22b6-11ed-b765-0a43f95f28a3:1-615", + Pos: position, State: binlogdatapb.VReplicationWorkflowState_Running, }) - workflowKey := env.tmc.GetWorkflowKey(sourceKeyspaceName, "0") + workflowKey := env.tenv.tmc.GetWorkflowKey(env.tenv.sourceKeyspace.KeyspaceName, sourceShard) workflowResponses := []*tabletmanagerdata.ReadVReplicationWorkflowsResponse{ nil, &wfs, &wfs, &wfs, } for _, resp := range workflowResponses { - env.tmc.AddVReplicationWorkflowsResponse(workflowKey, resp) + env.tenv.tmc.AddVReplicationWorkflowsResponse(workflowKey, resp) } - sourceTabletIds := []int{100} - - for _, id := range sourceTabletIds { + for _, id := range env.sourceTabletIds { + queries := []string{ + fmt.Sprintf("select distinct vrepl_id from _vt.copy_state where vrepl_id in (%d)", id), + fmt.Sprintf("update _vt.vreplication set state='Stopped', message='for cutover' where id in (%d)", id), + } + for _, q := range queries { + env.tenv.tmc.expectVRQuery(id, q, &sqltypes.Result{}) + } + } + for _, id := range env.targetTabletIds { queries := []string{ - "select distinct vrepl_id from _vt.copy_state where vrepl_id in (1)", - "update _vt.vreplication set state='Stopped', message='for cutover' where id in (1)", + fmt.Sprintf("delete from _vt.vreplication where db_name='vt_%s' and workflow in ('%s')", + env.tenv.sourceKeyspace.KeyspaceName, wfName), } for _, q := range queries { - env.tmc.expectVRQuery(id, q, &sqltypes.Result{}) + env.tenv.tmc.expectVRQuery(id, q, &sqltypes.Result{}) } } +} - // Shard -80 - env.tmc.expectVRQuery(200, "delete from _vt.vreplication where db_name='vt_sourceks' and workflow in ('wfMat1')", &sqltypes.Result{}) - // FIXME: Note: currently it is not optimal: we create two streams for each shard from all the shards even if the key ranges don't intersect. TBD - env.tmc.expectVRQuery(200, "/insert into _vt.vreplication.*shard:\"-80\".*in_keyrange.*c1.*-80.*shard:\"80-\".*in_keyrange.*c1.*-80.*", &sqltypes.Result{}) +func TestBuildStreamMigrator(t *testing.T) { + ctx := context.Background() + env := newStreamMigratorEnv(ctx, t, customerUnshardedKeyspace, customerShardedKeyspace) + defer env.close() + tmc := env.tenv.tmc + + addMaterializeWorkflow(t, env, 100, "0") - // Shard 80- - env.tmc.expectVRQuery(210, "delete from _vt.vreplication where db_name='vt_sourceks' and workflow in ('wfMat1')", &sqltypes.Result{}) - env.tmc.expectVRQuery(210, "/insert into _vt.vreplication.*shard:\"-80\".*in_keyrange.*c1.*80-.*shard:\"80-\".*in_keyrange.*c1.*80-.*", &sqltypes.Result{}) + // FIXME: Note: currently it is not optimal: we create two streams for each shard from all the shards even if the key ranges don't intersect. TBD + getInsert := func(shard string) string { + s := "/insert into _vt.vreplication.*" + s += fmt.Sprintf("shard:\"-80\".*in_keyrange.*c1.*%s.*", shard) + s += fmt.Sprintf("shard:\"80-\".*in_keyrange.*c1.*%s.*", shard) + return s + } + tmc.expectVRQuery(200, getInsert("-80"), &sqltypes.Result{}) + tmc.expectVRQuery(210, getInsert("80-"), &sqltypes.Result{}) - sm, err := BuildStreamMigrator(ctx, ts, false, parser) + sm, err := BuildStreamMigrator(ctx, env.ts, false, sqlparser.NewTestParser()) require.NoError(t, err) require.NotNil(t, sm) require.NotNil(t, sm.streams) @@ -506,4 +531,5 @@ func TestBuildStreamMigrator(t *testing.T) { require.NoError(t, err) require.Equal(t, 1, len(workflows)) require.NoError(t, sm.MigrateStreams(ctx)) + require.Len(t, sm.templates, 1) } diff --git a/go/vt/vtctl/workflow/vreplication_stream_test.go b/go/vt/vtctl/workflow/vreplication_stream_test.go index d393ebc50c3..6269cfa978e 100644 --- a/go/vt/vtctl/workflow/vreplication_stream_test.go +++ b/go/vt/vtctl/workflow/vreplication_stream_test.go @@ -22,6 +22,7 @@ import ( "testing" ) +// TestVReplicationStreams tests various methods of VReplicationStreams. func TestVReplicationStreams(t *testing.T) { var streams VReplicationStreams for i := 1; i <= 3; i++ { From 93a7d825dd74892a6d9702b33affbc85c5fb9b1b Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Tue, 30 Jul 2024 16:52:56 +0200 Subject: [PATCH 07/10] Add test for no migrating streams Signed-off-by: Rohit Nayak --- go/vt/vtctl/workflow/stream_migrator_test.go | 26 +++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/go/vt/vtctl/workflow/stream_migrator_test.go b/go/vt/vtctl/workflow/stream_migrator_test.go index 2ee4fc66bbb..7b581f13d1c 100644 --- a/go/vt/vtctl/workflow/stream_migrator_test.go +++ b/go/vt/vtctl/workflow/stream_migrator_test.go @@ -478,7 +478,8 @@ func addMaterializeWorkflow(t *testing.T, env *streamMigratorEnv, id int32, sour }) workflowKey := env.tenv.tmc.GetWorkflowKey(env.tenv.sourceKeyspace.KeyspaceName, sourceShard) workflowResponses := []*tabletmanagerdata.ReadVReplicationWorkflowsResponse{ - nil, &wfs, &wfs, &wfs, + nil, // this is the response for getting stopped workflows + &wfs, &wfs, &wfs, // return the full list for subsequent GetWorkflows calls } for _, resp := range workflowResponses { env.tenv.tmc.AddVReplicationWorkflowsResponse(workflowKey, resp) @@ -503,7 +504,7 @@ func addMaterializeWorkflow(t *testing.T, env *streamMigratorEnv, id int32, sour } } -func TestBuildStreamMigrator(t *testing.T) { +func TestBuildStreamMigratorOneMaterialize(t *testing.T) { ctx := context.Background() env := newStreamMigratorEnv(ctx, t, customerUnshardedKeyspace, customerShardedKeyspace) defer env.close() @@ -511,7 +512,8 @@ func TestBuildStreamMigrator(t *testing.T) { addMaterializeWorkflow(t, env, 100, "0") - // FIXME: Note: currently it is not optimal: we create two streams for each shard from all the shards even if the key ranges don't intersect. TBD + // FIXME: Note: currently it is not optimal: we create two streams for each shard from all the + // shards even if the key ranges don't intersect. TBD getInsert := func(shard string) string { s := "/insert into _vt.vreplication.*" s += fmt.Sprintf("shard:\"-80\".*in_keyrange.*c1.*%s.*", shard) @@ -533,3 +535,21 @@ func TestBuildStreamMigrator(t *testing.T) { require.NoError(t, sm.MigrateStreams(ctx)) require.Len(t, sm.templates, 1) } + +func TestBuildStreamMigratorNoStreams(t *testing.T) { + ctx := context.Background() + env := newStreamMigratorEnv(ctx, t, customerUnshardedKeyspace, customerShardedKeyspace) + defer env.close() + + sm, err := BuildStreamMigrator(ctx, env.ts, false, sqlparser.NewTestParser()) + require.NoError(t, err) + require.NotNil(t, sm) + require.NotNil(t, sm.streams) + require.Equal(t, 0, len(sm.streams)) + + workflows, err := sm.StopStreams(ctx) + require.NoError(t, err) + require.Equal(t, 0, len(workflows)) + require.NoError(t, sm.MigrateStreams(ctx)) + require.Len(t, sm.templates, 0) +} From 8ca8200cc071012688d8526664e22d1c4351bf2b Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Tue, 30 Jul 2024 17:17:13 +0200 Subject: [PATCH 08/10] More refactor Signed-off-by: Rohit Nayak --- go/vt/vtctl/workflow/stream_migrator_test.go | 87 +++++++++++++++++++- 1 file changed, 86 insertions(+), 1 deletion(-) diff --git a/go/vt/vtctl/workflow/stream_migrator_test.go b/go/vt/vtctl/workflow/stream_migrator_test.go index 7b581f13d1c..ffae171199d 100644 --- a/go/vt/vtctl/workflow/stream_migrator_test.go +++ b/go/vt/vtctl/workflow/stream_migrator_test.go @@ -405,6 +405,22 @@ func (env *streamMigratorEnv) close() { env.tenv.close() } +func (env *streamMigratorEnv) addSourceQueries(queries []string) { + for _, id := range env.sourceTabletIds { + for _, q := range queries { + env.tenv.tmc.expectVRQuery(id, q, &sqltypes.Result{}) + } + } +} + +func (env *streamMigratorEnv) addTargetQueries(queries []string) { + for _, id := range env.targetTabletIds { + for _, q := range queries { + env.tenv.tmc.expectVRQuery(id, q, &sqltypes.Result{}) + } + } +} + func newStreamMigratorEnv(ctx context.Context, t *testing.T, sourceKeyspace, targetKeyspace *testKeyspace) *streamMigratorEnv { tenv := newTestEnv(t, ctx, "cell1", sourceKeyspace, targetKeyspace) env := &streamMigratorEnv{tenv: tenv} @@ -484,6 +500,50 @@ func addMaterializeWorkflow(t *testing.T, env *streamMigratorEnv, id int32, sour for _, resp := range workflowResponses { env.tenv.tmc.AddVReplicationWorkflowsResponse(workflowKey, resp) } + queries := []string{ + fmt.Sprintf("select distinct vrepl_id from _vt.copy_state where vrepl_id in (%d)", id), + fmt.Sprintf("update _vt.vreplication set state='Stopped', message='for cutover' where id in (%d)", id), + fmt.Sprintf("delete from _vt.vreplication where db_name='vt_%s' and workflow in ('%s')", + env.tenv.sourceKeyspace.KeyspaceName, wfName), + } + env.addSourceQueries(queries) + queries = []string{ + fmt.Sprintf("delete from _vt.vreplication where db_name='vt_%s' and workflow in ('%s')", + env.tenv.sourceKeyspace.KeyspaceName, wfName), + } + env.addTargetQueries(queries) + +} + +func addReferenceWorkflow(t *testing.T, env *streamMigratorEnv, id int32, sourceShard string) { + var wfs tabletmanagerdata.ReadVReplicationWorkflowsResponse + wfName := "wfRef1" + wfs.Workflows = append(wfs.Workflows, &tabletmanagerdata.ReadVReplicationWorkflowResponse{ + Workflow: wfName, + WorkflowType: binlogdatapb.VReplicationWorkflowType_Materialize, + }) + wfs.Workflows[0].Streams = append(wfs.Workflows[0].Streams, &tabletmanagerdata.ReadVReplicationWorkflowResponse_Stream{ + Id: id, + Bls: &binlogdatapb.BinlogSource{ + Keyspace: env.tenv.sourceKeyspace.KeyspaceName, + Shard: sourceShard, + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + {Match: "ref", Filter: "select * from ref"}, + }, + }, + }, + Pos: position, + State: binlogdatapb.VReplicationWorkflowState_Running, + }) + workflowKey := env.tenv.tmc.GetWorkflowKey(env.tenv.sourceKeyspace.KeyspaceName, sourceShard) + workflowResponses := []*tabletmanagerdata.ReadVReplicationWorkflowsResponse{ + nil, // this is the response for getting stopped workflows + &wfs, &wfs, &wfs, // return the full list for subsequent GetWorkflows calls + } + for _, resp := range workflowResponses { + env.tenv.tmc.AddVReplicationWorkflowsResponse(workflowKey, resp) + } for _, id := range env.sourceTabletIds { queries := []string{ fmt.Sprintf("select distinct vrepl_id from _vt.copy_state where vrepl_id in (%d)", id), @@ -496,7 +556,7 @@ func addMaterializeWorkflow(t *testing.T, env *streamMigratorEnv, id int32, sour for _, id := range env.targetTabletIds { queries := []string{ fmt.Sprintf("delete from _vt.vreplication where db_name='vt_%s' and workflow in ('%s')", - env.tenv.sourceKeyspace.KeyspaceName, wfName), + env.tenv.targetKeyspace.KeyspaceName, wfName), } for _, q := range queries { env.tenv.tmc.expectVRQuery(id, q, &sqltypes.Result{}) @@ -534,6 +594,11 @@ func TestBuildStreamMigratorOneMaterialize(t *testing.T) { require.Equal(t, 1, len(workflows)) require.NoError(t, sm.MigrateStreams(ctx)) require.Len(t, sm.templates, 1) + env.addTargetQueries([]string{ + fmt.Sprintf("update _vt.vreplication set state='Running' where db_name='vt_%s' and workflow in ('%s')", + env.tenv.sourceKeyspace.KeyspaceName, "wfMat1"), + }) + require.NoError(t, StreamMigratorFinalize(ctx, env.ts, []string{"wfMat1"})) } func TestBuildStreamMigratorNoStreams(t *testing.T) { @@ -553,3 +618,23 @@ func TestBuildStreamMigratorNoStreams(t *testing.T) { require.NoError(t, sm.MigrateStreams(ctx)) require.Len(t, sm.templates, 0) } + +func TestBuildStreamMigratorRefStream(t *testing.T) { + ctx := context.Background() + env := newStreamMigratorEnv(ctx, t, customerUnshardedKeyspace, customerShardedKeyspace) + defer env.close() + + addReferenceWorkflow(t, env, 100, "0") + + sm, err := BuildStreamMigrator(ctx, env.ts, false, sqlparser.NewTestParser()) + require.NoError(t, err) + require.NotNil(t, sm) + require.NotNil(t, sm.streams) + require.Equal(t, 0, len(sm.streams)) + + workflows, err := sm.StopStreams(ctx) + require.NoError(t, err) + require.Equal(t, 0, len(workflows)) + require.NoError(t, sm.MigrateStreams(ctx)) + require.Len(t, sm.templates, 0) +} From 6b7688abaa09c89a8c41a2359adbeceb8f26a390 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Tue, 30 Jul 2024 17:18:42 +0200 Subject: [PATCH 09/10] Cleanup reference test Signed-off-by: Rohit Nayak --- go/vt/vtctl/workflow/stream_migrator_test.go | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/go/vt/vtctl/workflow/stream_migrator_test.go b/go/vt/vtctl/workflow/stream_migrator_test.go index ffae171199d..5e9c2a79038 100644 --- a/go/vt/vtctl/workflow/stream_migrator_test.go +++ b/go/vt/vtctl/workflow/stream_migrator_test.go @@ -544,24 +544,6 @@ func addReferenceWorkflow(t *testing.T, env *streamMigratorEnv, id int32, source for _, resp := range workflowResponses { env.tenv.tmc.AddVReplicationWorkflowsResponse(workflowKey, resp) } - for _, id := range env.sourceTabletIds { - queries := []string{ - fmt.Sprintf("select distinct vrepl_id from _vt.copy_state where vrepl_id in (%d)", id), - fmt.Sprintf("update _vt.vreplication set state='Stopped', message='for cutover' where id in (%d)", id), - } - for _, q := range queries { - env.tenv.tmc.expectVRQuery(id, q, &sqltypes.Result{}) - } - } - for _, id := range env.targetTabletIds { - queries := []string{ - fmt.Sprintf("delete from _vt.vreplication where db_name='vt_%s' and workflow in ('%s')", - env.tenv.targetKeyspace.KeyspaceName, wfName), - } - for _, q := range queries { - env.tenv.tmc.expectVRQuery(id, q, &sqltypes.Result{}) - } - } } func TestBuildStreamMigratorOneMaterialize(t *testing.T) { From a1aba96875c9435574b991a1c822617d2dcf09a2 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Sun, 18 Aug 2024 19:35:38 +0200 Subject: [PATCH 10/10] Address review comments Signed-off-by: Rohit Nayak --- go/vt/vtctl/workflow/utils_test.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/go/vt/vtctl/workflow/utils_test.go b/go/vt/vtctl/workflow/utils_test.go index 1f35b3720b1..b315e1aa991 100644 --- a/go/vt/vtctl/workflow/utils_test.go +++ b/go/vt/vtctl/workflow/utils_test.go @@ -7,7 +7,6 @@ import ( "math/rand/v2" "os" "os/exec" - "reflect" "sync" "testing" "time" @@ -91,9 +90,7 @@ func TestCreateDefaultShardRoutingRules(t *testing.T) { require.NoError(t, err) require.Len(t, rules, len(tc.shards)) want := getExpectedRules(tc.sourceKeyspace, tc.targetKeyspace) - if !reflect.DeepEqual(want, rules) { - require.FailNowf(t, "unexpected rules", "got: %v, want: %v", rules, tc.want) - } + require.EqualValues(t, want, rules) }) } }