diff --git a/go/vt/vtctl/workflow/server_test.go b/go/vt/vtctl/workflow/server_test.go index 618bdac292c..96811da0e85 100644 --- a/go/vt/vtctl/workflow/server_test.go +++ b/go/vt/vtctl/workflow/server_test.go @@ -479,13 +479,9 @@ func TestMoveTablesTrafficSwitching(t *testing.T) { testcases := []struct { name string sourceKeyspace, targetKeyspace *testKeyspace - preFunc func(t *testing.T, env *testEnv) req *vtctldatapb.WorkflowSwitchTrafficRequest - expectedSourceQueries []*queryResult - expectedTargetQueries []*queryResult want *vtctldatapb.WorkflowSwitchTrafficResponse wantErr bool - postFunc func(t *testing.T, env *testEnv) }{ { name: "basic forward", @@ -540,15 +536,6 @@ func TestMoveTablesTrafficSwitching(t *testing.T) { env := newTestEnv(t, ctx, defaultCellName, tc.sourceKeyspace, tc.targetKeyspace) defer env.close() env.tmc.schema = schema - if tc.preFunc != nil { - tc.preFunc(t, env) - } - if tc.expectedSourceQueries != nil { - require.NotNil(t, env.tablets[tc.sourceKeyspace.KeyspaceName]) - for _, eq := range tc.expectedSourceQueries { - env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, eq) - } - } if tc.req.Direction == int32(DirectionForward) { env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, copyTableQR) env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, cutoverQR) @@ -601,51 +588,218 @@ func TestMoveTablesTrafficSwitching(t *testing.T) { env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, createJournalQR) env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, freezeReverseWFQR) } - if tc.expectedTargetQueries != nil { - require.NotNil(t, env.tablets[tc.targetKeyspace.KeyspaceName]) - for _, eq := range tc.expectedTargetQueries { - env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, eq) - } - } got, err := env.ws.WorkflowSwitchTraffic(ctx, tc.req) if (err != nil) != tc.wantErr { require.Fail(t, "unexpected error value", "Server.WorkflowSwitchTraffic() error = %v, wantErr %v", err, tc.wantErr) return } require.Equal(t, tc.want.String(), got.String(), "Server.WorkflowSwitchTraffic() = %v, want %v", got, tc.want) - if tc.postFunc != nil { - tc.postFunc(t, env) - } else { // Default post checks - // Confirm that we have the expected routing rules. - rr, err := env.ts.GetRoutingRules(ctx) - require.NoError(t, err) - to := fmt.Sprintf("%s.%s", tc.targetKeyspace.KeyspaceName, tableName) - if tc.req.Direction == int32(DirectionBackward) { - to = fmt.Sprintf("%s.%s", tc.sourceKeyspace.KeyspaceName, tableName) + + // Confirm that we have the expected routing rules. + rr, err := env.ts.GetRoutingRules(ctx) + require.NoError(t, err) + to := fmt.Sprintf("%s.%s", tc.targetKeyspace.KeyspaceName, tableName) + if tc.req.Direction == int32(DirectionBackward) { + to = fmt.Sprintf("%s.%s", tc.sourceKeyspace.KeyspaceName, tableName) + } + for _, rr := range rr.Rules { + for _, tt := range rr.ToTables { + require.Equal(t, to, tt) } - for _, rr := range rr.Rules { - for _, tt := range rr.ToTables { - require.Equal(t, to, tt) + } + // Confirm that we have the expected denied tables entires. + for _, keyspace := range []*testKeyspace{tc.sourceKeyspace, tc.targetKeyspace} { + for _, shardName := range keyspace.ShardNames { + si, err := env.ts.GetShard(ctx, keyspace.KeyspaceName, shardName) + require.NoError(t, err) + switch { + case keyspace == tc.sourceKeyspace && tc.req.Direction == int32(DirectionForward): + require.True(t, hasDeniedTableEntry(si)) + case keyspace == tc.sourceKeyspace && tc.req.Direction == int32(DirectionBackward): + require.False(t, hasDeniedTableEntry(si)) + case keyspace == tc.targetKeyspace && tc.req.Direction == int32(DirectionForward): + require.False(t, hasDeniedTableEntry(si)) + case keyspace == tc.targetKeyspace && tc.req.Direction == int32(DirectionBackward): + require.True(t, hasDeniedTableEntry(si)) } } - // Confirm that we have the expected denied tables entires. - for _, keyspace := range []*testKeyspace{tc.sourceKeyspace, tc.targetKeyspace} { - for _, shardName := range keyspace.ShardNames { - si, err := env.ts.GetShard(ctx, keyspace.KeyspaceName, shardName) - require.NoError(t, err) - switch { - case keyspace == tc.sourceKeyspace && tc.req.Direction == int32(DirectionForward): - require.True(t, hasDeniedTableEntry(si)) - case keyspace == tc.sourceKeyspace && tc.req.Direction == int32(DirectionBackward): - require.False(t, hasDeniedTableEntry(si)) - case keyspace == tc.targetKeyspace && tc.req.Direction == int32(DirectionForward): - require.False(t, hasDeniedTableEntry(si)) - case keyspace == tc.targetKeyspace && tc.req.Direction == int32(DirectionBackward): - require.True(t, hasDeniedTableEntry(si)) - } + } + }) + } +} + +func TestMoveTablesTrafficSwitchingDryRun(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + workflowName := "wf1" + tableName := "t1" + sourceKeyspaceName := "sourceks" + targetKeyspaceName := "targetks" + vrID := 1 + tabletTypes := []topodatapb.TabletType{ + topodatapb.TabletType_PRIMARY, + topodatapb.TabletType_REPLICA, + topodatapb.TabletType_RDONLY, + } + schema := map[string]*tabletmanagerdatapb.SchemaDefinition{ + "t1": { + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ + { + Name: tableName, + Schema: fmt.Sprintf("CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))", tableName), + }, + }, + }, + } + copyTableQR := &queryResult{ + query: fmt.Sprintf("select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in (%d) and id in (select max(id) from _vt.copy_state where vrepl_id in (%d) group by vrepl_id, table_name)", + vrID, vrID), + result: &querypb.QueryResult{}, + } + journalQR := &queryResult{ + query: "/select val from _vt.resharding_journal.*", + result: &querypb.QueryResult{}, + } + lockTableQR := &queryResult{ + query: fmt.Sprintf("LOCK TABLES `%s` READ", tableName), + result: &querypb.QueryResult{}, + } + + testcases := []struct { + name string + sourceKeyspace, targetKeyspace *testKeyspace + req *vtctldatapb.WorkflowSwitchTrafficRequest + want []string + }{ + { + name: "basic forward", + sourceKeyspace: &testKeyspace{ + KeyspaceName: sourceKeyspaceName, + ShardNames: []string{"0"}, + }, + targetKeyspace: &testKeyspace{ + KeyspaceName: targetKeyspaceName, + ShardNames: []string{"-80", "80-"}, + }, + req: &vtctldatapb.WorkflowSwitchTrafficRequest{ + Keyspace: targetKeyspaceName, + Workflow: workflowName, + Direction: int32(DirectionForward), + TabletTypes: tabletTypes, + DryRun: true, + }, + want: []string{ + fmt.Sprintf("Lock keyspace %s", sourceKeyspaceName), + fmt.Sprintf("Switch reads for tables [%s] to keyspace %s for tablet types [REPLICA,RDONLY]", tableName, targetKeyspaceName), + fmt.Sprintf("Routing rules for tables [%s] will be updated", tableName), + fmt.Sprintf("Unlock keyspace %s", sourceKeyspaceName), + fmt.Sprintf("Lock keyspace %s", sourceKeyspaceName), + fmt.Sprintf("Lock keyspace %s", targetKeyspaceName), + fmt.Sprintf("Stop writes on keyspace %s for tables [%s]: [keyspace:%s;shard:0;position:%s]", sourceKeyspaceName, tableName, sourceKeyspaceName, position), + "Wait for vreplication on stopped streams to catchup for up to 30s", + fmt.Sprintf("Create reverse vreplication workflow %s", ReverseWorkflowName(workflowName)), + "Create journal entries on source databases", + fmt.Sprintf("Enable writes on keyspace %s for tables [%s]", targetKeyspaceName, tableName), + fmt.Sprintf("Switch routing from keyspace %s to keyspace %s", sourceKeyspaceName, targetKeyspaceName), + fmt.Sprintf("Routing rules for tables [%s] will be updated", tableName), + fmt.Sprintf("Switch writes completed, freeze and delete vreplication streams on: [tablet:%d,tablet:%d]", startingTargetTabletUID, startingTargetTabletUID+tabletUIDStep), + fmt.Sprintf("Mark vreplication streams frozen on: [keyspace:%s;shard:-80;tablet:%d;workflow:%s;dbname:vt_%s,keyspace:%s;shard:80-;tablet:%d;workflow:%s;dbname:vt_%s]", + targetKeyspaceName, startingTargetTabletUID, workflowName, targetKeyspaceName, targetKeyspaceName, startingTargetTabletUID+tabletUIDStep, workflowName, targetKeyspaceName), + fmt.Sprintf("Unlock keyspace %s", targetKeyspaceName), + fmt.Sprintf("Unlock keyspace %s", sourceKeyspaceName), + }, + }, + { + name: "basic backward", + sourceKeyspace: &testKeyspace{ + KeyspaceName: sourceKeyspaceName, + ShardNames: []string{"0"}, + }, + targetKeyspace: &testKeyspace{ + KeyspaceName: targetKeyspaceName, + ShardNames: []string{"-80", "80-"}, + }, + req: &vtctldatapb.WorkflowSwitchTrafficRequest{ + Keyspace: targetKeyspaceName, + Workflow: workflowName, + Direction: int32(DirectionBackward), + TabletTypes: tabletTypes, + DryRun: true, + }, + want: []string{ + fmt.Sprintf("Lock keyspace %s", targetKeyspaceName), + fmt.Sprintf("Switch reads for tables [%s] to keyspace %s for tablet types [REPLICA,RDONLY]", tableName, targetKeyspaceName), + fmt.Sprintf("Routing rules for tables [%s] will be updated", tableName), + fmt.Sprintf("Unlock keyspace %s", targetKeyspaceName), + fmt.Sprintf("Lock keyspace %s", targetKeyspaceName), + fmt.Sprintf("Lock keyspace %s", sourceKeyspaceName), + fmt.Sprintf("Stop writes on keyspace %s for tables [%s]: [keyspace:%s;shard:-80;position:%s,keyspace:%s;shard:80-;position:%s]", + targetKeyspaceName, tableName, targetKeyspaceName, position, targetKeyspaceName, position), + "Wait for vreplication on stopped streams to catchup for up to 30s", + fmt.Sprintf("Create reverse vreplication workflow %s", workflowName), + "Create journal entries on source databases", + fmt.Sprintf("Enable writes on keyspace %s for tables [%s]", sourceKeyspaceName, tableName), + fmt.Sprintf("Switch routing from keyspace %s to keyspace %s", targetKeyspaceName, sourceKeyspaceName), + fmt.Sprintf("Routing rules for tables [%s] will be updated", tableName), + fmt.Sprintf("Switch writes completed, freeze and delete vreplication streams on: [tablet:%d]", startingSourceTabletUID), + fmt.Sprintf("Mark vreplication streams frozen on: [keyspace:%s;shard:0;tablet:%d;workflow:%s;dbname:vt_%s]", + sourceKeyspaceName, startingSourceTabletUID, ReverseWorkflowName(workflowName), sourceKeyspaceName), + fmt.Sprintf("Unlock keyspace %s", sourceKeyspaceName), + fmt.Sprintf("Unlock keyspace %s", targetKeyspaceName), + }, + }, + } + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + require.NotNil(t, tc.sourceKeyspace) + require.NotNil(t, tc.targetKeyspace) + require.NotNil(t, tc.req) + env := newTestEnv(t, ctx, defaultCellName, tc.sourceKeyspace, tc.targetKeyspace) + defer env.close() + env.tmc.schema = schema + if tc.req.Direction == int32(DirectionForward) { + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, copyTableQR) + for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, journalQR) + } + for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, lockTableQR) + } + } else { + env.tmc.reverse.Store(true) + // Setup the routing rules as they would be after having previously done SwitchTraffic. + ks := env.targetKeyspace.KeyspaceName + toTarget := []string{ks + "." + tableName} + rules := make(map[string][]string) + for _, tabletType := range tabletTypes { + tt := strings.ToLower(tabletType.String()) + if tabletType == topodatapb.TabletType_PRIMARY { + rules[tableName] = toTarget + rules[ks+"."+tableName] = toTarget + rules[env.sourceKeyspace.KeyspaceName+"."+tableName] = toTarget + } else { + rules[tableName+"@"+tt] = toTarget + rules[ks+"."+tableName+"@"+tt] = toTarget + rules[env.sourceKeyspace.KeyspaceName+"."+tableName+"@"+tt] = toTarget } } + err := topotools.SaveRoutingRules(ctx, env.ts, rules) + require.NoError(t, err) + err = env.ts.RebuildSrvVSchema(ctx, nil) + require.NoError(t, err) + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, copyTableQR) + for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, journalQR) + } + for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, lockTableQR) + } } + got, err := env.ws.WorkflowSwitchTraffic(ctx, tc.req) + require.NoError(t, err) + + require.EqualValues(t, tc.want, got.DryRunResults, "Server.WorkflowSwitchTraffic(DryRun:true) = %v, want %v", got.DryRunResults, tc.want) }) } }