diff --git a/go/test/endtoend/cluster/vttablet_process.go b/go/test/endtoend/cluster/vttablet_process.go index 45db1dc4bd2..6bd60b63191 100644 --- a/go/test/endtoend/cluster/vttablet_process.go +++ b/go/test/endtoend/cluster/vttablet_process.go @@ -74,6 +74,7 @@ type VttabletProcess struct { SupportsBackup bool ExplicitServingStatus bool ServingStatus string + DbName string DbPassword string DbPort int DbFlavor string @@ -148,6 +149,8 @@ func (vttablet *VttabletProcess) Setup() (err error) { return } + vttablet.DbName = "vt_" + vttablet.Keyspace + vttablet.exit = make(chan error) go func() { if vttablet.proc != nil { @@ -442,8 +445,11 @@ func (vttablet *VttabletProcess) TearDownWithTimeout(timeout time.Duration) erro // CreateDB creates the database for keyspace func (vttablet *VttabletProcess) CreateDB(keyspace string) error { - _, _ = vttablet.QueryTablet(fmt.Sprintf("drop database IF EXISTS vt_%s", keyspace), keyspace, false) - _, err := vttablet.QueryTablet(fmt.Sprintf("create database IF NOT EXISTS vt_%s", keyspace), keyspace, false) + if vttablet.DbName == "" { + vttablet.DbName = "vt_" + keyspace + } + _, _ = vttablet.QueryTablet(fmt.Sprintf("drop database IF EXISTS %s", vttablet.DbName), keyspace, false) + _, err := vttablet.QueryTablet(fmt.Sprintf("create database IF NOT EXISTS %s", vttablet.DbName), keyspace, false) return err } diff --git a/go/test/endtoend/vreplication/helper_test.go b/go/test/endtoend/vreplication/helper_test.go index 4764b213ad6..eca4c312ae7 100644 --- a/go/test/endtoend/vreplication/helper_test.go +++ b/go/test/endtoend/vreplication/helper_test.go @@ -92,7 +92,7 @@ func execQueryWithRetry(t *testing.T, conn *mysql.Conn, query string, timeout ti select { case <-ctx.Done(): require.FailNow(t, fmt.Sprintf("query %q did not succeed before the timeout of %s; last seen result: %v", - query, timeout, qr.Rows)) + query, timeout, qr)) case <-ticker.C: log.Infof("query %q failed with error %v, retrying in %ds", query, err, defaultTick) } @@ -147,19 +147,6 @@ func execVtgateQuery(t *testing.T, conn *mysql.Conn, database string, query stri return qr } -func execVtgateQueryWithRetry(t *testing.T, conn *mysql.Conn, database string, query string, timeout time.Duration) *sqltypes.Result { - if strings.TrimSpace(query) == "" { - return nil - } - if database != "" { - execQuery(t, conn, "use `"+database+"`;") - } - execQuery(t, conn, "begin") - qr := execQueryWithRetry(t, conn, query, timeout) - execQuery(t, conn, "commit") - return qr -} - func checkHealth(t *testing.T, url string) bool { resp, err := http.Get(url) require.NoError(t, err) @@ -516,20 +503,24 @@ func validateDryRunResults(t *testing.T, output string, want []string) { require.NotEmpty(t, output) gotDryRun := strings.Split(output, "\n") require.True(t, len(gotDryRun) > 3) - startRow := 3 - if strings.Contains(gotDryRun[0], "deprecated") { + var startRow int + if strings.HasPrefix(gotDryRun[1], "Parameters:") { // vtctlclient + startRow = 3 + } else if strings.Contains(gotDryRun[0], "deprecated") { startRow = 4 + } else { + startRow = 2 } gotDryRun = gotDryRun[startRow : len(gotDryRun)-1] if len(want) != len(gotDryRun) { - t.Fatalf("want and got: lengths don't match, \nwant\n%s\n\ngot\n%s", strings.Join(want, "\n"), strings.Join(gotDryRun, "\n")) + require.Fail(t, "invalid dry run results", "want and got: lengths don't match, \nwant\n%s\n\ngot\n%s", strings.Join(want, "\n"), strings.Join(gotDryRun, "\n")) } var match, fail bool fail = false for i, w := range want { w = strings.TrimSpace(w) g := 
strings.TrimSpace(gotDryRun[i])
-		if w[0] == '/' {
+		if len(w) > 0 && w[0] == '/' {
 			w = strings.TrimSpace(w[1:])
 			result := strings.HasPrefix(g, w)
 			match = result
@@ -538,11 +529,11 @@ func validateDryRunResults(t *testing.T, output string, want []string) {
 		}
 		if !match {
 			fail = true
-			t.Fatalf("want %s, got %s\n", w, gotDryRun[i])
+			require.Fail(t, "invalid dry run results", "want %s, got %s\n", w, gotDryRun[i])
 		}
 	}
 	if fail {
-		t.Fatalf("Dry run results don't match, want %s, got %s", want, gotDryRun)
+		require.Fail(t, "invalid dry run results", "Dry run results don't match, want %s, got %s", want, gotDryRun)
 	}
 }
@@ -578,7 +569,7 @@ func isTableInDenyList(t *testing.T, vc *VitessCluster, ksShard string, table st
 	var err error
 	found := false
 	if output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetShard", ksShard); err != nil {
-		t.Fatalf("%v %v", err, output)
+		require.Fail(t, "GetShard error", "%v %v", err, output)
 		return false, err
 	}
 	jsonparser.ArrayEach([]byte(output), func(value []byte, dataType jsonparser.ValueType, offset int, err error) {
@@ -602,8 +593,8 @@ func expectNumberOfStreams(t *testing.T, vtgateConn *mysql.Conn, name string, wo
 	waitForQueryResult(t, vtgateConn, database, query, fmt.Sprintf(`[[INT64(%d)]]`, want))
 }
 
-// confirmAllStreamsRunning confirms that all of the migrated streams are running
-// after a Reshard.
+// confirmAllStreamsRunning confirms that all of the workflow's streams are
+// in the running state.
 func confirmAllStreamsRunning(t *testing.T, vtgateConn *mysql.Conn, database string) {
 	query := sqlparser.BuildParsedQuery("select count(*) from %s.vreplication where state != '%s'",
 		sidecarDBIdentifier, binlogdatapb.VReplicationWorkflowState_Running.String()).Query
@@ -801,7 +792,7 @@ func isBinlogRowImageNoBlob(t *testing.T, tablet *cluster.VttabletProcess) bool
 
 func getRowCount(t *testing.T, vtgateConn *mysql.Conn, table string) int {
 	query := fmt.Sprintf("select count(*) from %s", table)
-	qr := execVtgateQuery(t, vtgateConn, "", query)
+	qr := execQuery(t, vtgateConn, query)
 	numRows, _ := qr.Rows[0][0].ToInt()
 	return numRows
 }
diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go
index c06489006f8..db58f2880c2 100644
--- a/go/test/endtoend/vreplication/vreplication_test.go
+++ b/go/test/endtoend/vreplication/vreplication_test.go
@@ -367,6 +367,7 @@ func testVreplicationWorkflows(t *testing.T, limited bool, binlogRowImage string
 		expectNumberOfStreams(t, vtgateConn, "Customer3to2", "sales", "product:0", 3)
 		reshardCustomer3to1Merge(t)
 		confirmAllStreamsRunning(t, vtgateConn, "customer:0")
+		expectNumberOfStreams(t, vtgateConn, "Customer3to1", "sales", "product:0", 1)
 
 		t.Run("Verify CopyState Is Optimized Afterwards", func(t *testing.T) {
@@ -605,7 +606,7 @@ func TestCellAliasVreplicationWorkflow(t *testing.T) {
 	vc.AddKeyspace(t, []*Cell{cell1, cell2}, keyspace, shard, initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100, sourceKsOpts)
 
 	// Add cell alias containing only zone2
-	result, err := vc.VtctlClient.ExecuteCommandWithOutput("AddCellsAlias", "--", "--cells", "zone2", "alias")
+	result, err := vc.VtctldClient.ExecuteCommandWithOutput("AddCellsAlias", "--cells", "zone2", "alias")
 	require.NoError(t, err, "command failed with output: %v", result)
 
 	verifyClusterHealth(t, vc)
@@ -722,10 +723,13 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
 	execVtgateQuery(t, vtgateConn, sourceKs, "update json_tbl set j1 = null, j2 = 'null', j3 = '\"null\"'")
 	execVtgateQuery(t, vtgateConn, sourceKs, "insert into json_tbl(id, j1, j2, j3) values (7, null, 'null', '\"null\"')")
 	waitForNoWorkflowLag(t, vc, targetKs, workflow)
-	for _, shard := range []string{"-80", "80-"} {
-		shardTarget := fmt.Sprintf("%s:%s", targetKs, shard)
-		if res := execVtgateQuery(t, vtgateConn, shardTarget, "select cid from customer"); len(res.Rows) > 0 {
-			waitForQueryResult(t, vtgateConn, shardTarget, "select distinct dec80 from customer", `[[DECIMAL(0)]]`)
+	for _, tablet := range []*cluster.VttabletProcess{customerTab1, customerTab2} {
+		// Query the tablet's mysqld directly as the targets will have denied table entries.
+		dbc, err := tablet.TabletConn(targetKs, true)
+		require.NoError(t, err)
+		defer dbc.Close()
+		if res := execQuery(t, dbc, "select cid from customer"); len(res.Rows) > 0 {
+			waitForQueryResult(t, dbc, tablet.DbName, "select distinct dec80 from customer", `[[DECIMAL(0)]]`)
 			dec80Replicated = true
 		}
 	}
@@ -833,7 +837,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
 	printShardPositions(vc, ksShards)
 	switchWrites(t, workflowType, ksWorkflow, true)
-	output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", ksWorkflow, "show")
+	output, err := vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", targetKs, "show", "--workflow", workflow)
 	require.NoError(t, err)
 	require.Contains(t, output, "'customer.reverse_bits'")
 	require.Contains(t, output, "'customer.bmd5'")
@@ -942,7 +946,7 @@ func reshardMerchant2to3SplitMerge(t *testing.T) {
 	var err error
 	for _, shard := range strings.Split("-80,80-", ",") {
-		output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetShard", "merchant:"+shard)
+		output, err = vc.VtctldClient.ExecuteCommandWithOutput("GetShard", "merchant:"+shard)
 		if err == nil {
 			t.Fatal("GetShard merchant:-80 failed")
 		}
@@ -951,7 +955,7 @@
 	for _, shard := range strings.Split("-40,40-c0,c0-", ",") {
 		ksShard := fmt.Sprintf("%s:%s", merchantKeyspace, shard)
-		output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetShard", ksShard)
+		output, err = vc.VtctldClient.ExecuteCommandWithOutput("GetShard", ksShard)
 		if err != nil {
 			t.Fatalf("GetShard merchant failed for: %s: %v", shard, err)
 		}
@@ -1400,7 +1404,7 @@ func waitForLowLag(t *testing.T, keyspace, workflow string) {
 	waitDuration := 500 * time.Millisecond
 	duration := maxWait
 	for duration > 0 {
-		output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", fmt.Sprintf("%s.%s", keyspace, workflow), "Show")
+		output, err := vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", keyspace, "show", "--workflow", workflow)
 		require.NoError(t, err)
 		lagSeconds, err = jsonparser.GetInt([]byte(output), "MaxVReplicationTransactionLag")
@@ -1483,7 +1487,7 @@ func reshardAction(t *testing.T, action, workflow, keyspaceName, sourceShards, t
 }
 
 func applyVSchema(t *testing.T, vschema, keyspace string) {
-	err := vc.VtctlClient.ExecuteCommand("ApplyVSchema", "--", "--vschema", vschema, keyspace)
+	err := vc.VtctldClient.ExecuteCommand("ApplyVSchema", "--vschema", vschema, keyspace)
 	require.NoError(t, err)
 }
@@ -1494,8 +1498,10 @@ func switchReadsDryRun(t *testing.T, workflowType, cells, ksWorkflow string, dry
 			"workflow type specified: %s", workflowType)
 	}
 	ensureCanSwitch(t, workflowType, cells, ksWorkflow)
-	output, err := vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--cells="+cells, "--tablet_types=rdonly,replica",
-		"--dry_run", "SwitchTraffic", ksWorkflow)
+	ks, wf, ok := 
strings.Cut(ksWorkflow, ".") + require.True(t, ok) + output, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", wf, "--target-keyspace", ks, "SwitchTraffic", + "--cells="+cells, "--tablet-types=rdonly,replica", "--dry-run") require.NoError(t, err, fmt.Sprintf("Switching Reads DryRun Error: %s: %s", err, output)) if dryRunResults != nil { validateDryRunResults(t, output, dryRunResults) @@ -1503,10 +1509,13 @@ func switchReadsDryRun(t *testing.T, workflowType, cells, ksWorkflow string, dry } func ensureCanSwitch(t *testing.T, workflowType, cells, ksWorkflow string) { + ks, wf, ok := strings.Cut(ksWorkflow, ".") + require.True(t, ok) timer := time.NewTimer(defaultTimeout) defer timer.Stop() for { - _, err := vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--cells="+cells, "--dry_run", "SwitchTraffic", ksWorkflow) + _, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", wf, "--target-keyspace", ks, "SwitchTraffic", + "--cells="+cells, "--dry-run") if err == nil { return } @@ -1532,11 +1541,13 @@ func switchReads(t *testing.T, workflowType, cells, ksWorkflow string, reverse b command = "ReverseTraffic" } ensureCanSwitch(t, workflowType, cells, ksWorkflow) - output, err = vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--cells="+cells, "--tablet_types=rdonly", - command, ksWorkflow) + ks, wf, ok := strings.Cut(ksWorkflow, ".") + require.True(t, ok) + output, err = vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", wf, "--target-keyspace", ks, command, + "--cells="+cells, "--tablet-types=rdonly") require.NoError(t, err, fmt.Sprintf("%s Error: %s: %s", command, err, output)) - output, err = vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--cells="+cells, "--tablet_types=replica", - command, ksWorkflow) + output, err = vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", wf, "--target-keyspace", ks, command, + "--cells="+cells, "--tablet-types=replica") require.NoError(t, err, fmt.Sprintf("%s Error: %s: %s", command, err, output)) } @@ -1575,8 +1586,10 @@ func switchWritesDryRun(t *testing.T, workflowType, ksWorkflow string, dryRunRes require.FailNowf(t, "Invalid workflow type for SwitchTraffic, must be MoveTables or Reshard", "workflow type specified: %s", workflowType) } - output, err := vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--tablet_types=primary", "--dry_run", - "SwitchTraffic", ksWorkflow) + ks, wf, ok := strings.Cut(ksWorkflow, ".") + require.True(t, ok) + output, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", wf, "--target-keyspace", ks, + "SwitchTraffic", "--tablet-types=primary", "--dry-run") require.NoError(t, err, fmt.Sprintf("Switch writes DryRun Error: %s: %s", err, output)) validateDryRunResults(t, output, dryRunResults) } diff --git a/go/test/endtoend/vreplication/vreplication_test_env.go b/go/test/endtoend/vreplication/vreplication_test_env.go index 4500a98868c..6073cfac6ab 100644 --- a/go/test/endtoend/vreplication/vreplication_test_env.go +++ b/go/test/endtoend/vreplication/vreplication_test_env.go @@ -19,31 +19,28 @@ package vreplication var dryRunResultsSwitchWritesCustomerShard = []string{ "Lock keyspace product", "Lock keyspace customer", - "Stop writes on keyspace product, tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order]:", - "/ Keyspace product, Shard 0 at Position", - "Wait for VReplication on stopped streams to catchup for up to 30s", - "Create 
reverse replication workflow p2c_reverse", + "/Stop writes on keyspace product for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order]: [keyspace:product;shard:0;position:", + "Wait for vreplication on stopped streams to catchup for up to 30s", + "Create reverse vreplication workflow p2c_reverse", "Create journal entries on source databases", - "Enable writes on keyspace customer tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order]", + "Enable writes on keyspace customer for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order]", "Switch routing from keyspace product to keyspace customer", "Routing rules for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order] will be updated", - "Switch writes completed, freeze and delete vreplication streams on:", - " tablet 200 ", - " tablet 300 ", - "Start reverse replication streams on:", - " tablet 100 ", - "Mark vreplication streams frozen on:", - " Keyspace customer, Shard -80, Tablet 200, Workflow p2c, DbName vt_customer", - " Keyspace customer, Shard 80-, Tablet 300, Workflow p2c, DbName vt_customer", + "Switch writes completed, freeze and delete vreplication streams on: [tablet:200,tablet:300]", + "Start reverse vreplication streams on: [tablet:100]", + "Mark vreplication streams frozen on: [keyspace:customer;shard:-80;tablet:200;workflow:p2c;dbname:vt_customer,keyspace:customer;shard:80-;tablet:300;workflow:p2c;dbname:vt_customer]", "Unlock keyspace customer", "Unlock keyspace product", + "", // Additional empty newline in the output } var dryRunResultsReadCustomerShard = []string{ "Lock keyspace product", "Switch reads for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order] to keyspace customer for tablet types [RDONLY,REPLICA]", "Routing rules for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order] will be updated", + "Serving VSchema will be rebuilt for the customer keyspace", "Unlock keyspace product", + "", // Additional empty newline in the output } var dryRunResultsSwitchWritesM2m3 = []string{ diff --git a/go/vt/topo/shard.go b/go/vt/topo/shard.go index b9554bf789f..77983f20d2d 100644 --- a/go/vt/topo/shard.go +++ b/go/vt/topo/shard.go @@ -21,7 +21,7 @@ import ( "encoding/hex" "fmt" "path" - "reflect" + "slices" "sort" "strings" "sync" @@ -44,8 +44,8 @@ import ( ) const ( - dlTablesAlreadyPresent = "one or more tables are already present in the denylist" - dlTablesNotPresent = "cannot remove tables since one or more do not exist in the denylist" + dlTablesAlreadyPresent = "one or more tables were already present in the denylist" + dlTablesNotPresent = "one or more tables did not exist in the denylist" dlNoCellsForPrimary = "you cannot specify cells for a primary's tablet control" ) @@ -397,16 +397,15 @@ func (si *ShardInfo) UpdateDeniedTables(ctx context.Context, tabletType topodata } tc := si.GetTabletControl(tabletType) if tc == nil { - - // handle the case where the TabletControl object is new + // Handle the case where the TabletControl object is new. if remove { - // we try to remove from something that doesn't exist, - // log, but we're done. + // We tried to remove something that doesn't exist, log a warning. + // But we know that our work is done. 
log.Warningf("Trying to remove TabletControl.DeniedTables for missing type %v in shard %v/%v", tabletType, si.keyspace, si.shardName) return nil } - // trying to add more constraints with no existing record + // Add constraints to the new record. si.TabletControls = append(si.TabletControls, &topodatapb.Shard_TabletControl{ TabletType: tabletType, Cells: cells, @@ -422,16 +421,16 @@ func (si *ShardInfo) UpdateDeniedTables(ctx context.Context, tabletType topodata return nil } - // we have an existing record, check table lists matches and + // We have an existing record, update the table lists. if remove { si.removeCellsFromTabletControl(tc, tabletType, cells) } else { - if !reflect.DeepEqual(tc.DeniedTables, tables) { + if !slices.Equal(tc.DeniedTables, tables) { return vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "trying to use two different sets of denied tables for shard %v/%v: %v and %v", si.keyspace, si.shardName, tc.DeniedTables, tables) } - tc.Cells = addCells(tc.Cells, cells) } + return nil } @@ -451,7 +450,8 @@ func (si *ShardInfo) updatePrimaryTabletControl(tc *topodatapb.Shard_TabletContr } if remove { if len(newTables) != 0 { - return vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, dlTablesNotPresent) + // These tables did not exist in the denied list so we don't need to remove them. + log.Warningf("%s:%s", dlTablesNotPresent, strings.Join(newTables, ",")) } var newDenyList []string if len(tables) != 0 { // legacy uses @@ -475,7 +475,16 @@ func (si *ShardInfo) updatePrimaryTabletControl(tc *topodatapb.Shard_TabletContr return nil } if len(newTables) != len(tables) { - return vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, dlTablesAlreadyPresent) + // Some of the tables already existed in the DeniedTables list so we don't + // need to add them. + log.Warningf("%s:%s", dlTablesAlreadyPresent, strings.Join(tables, ",")) + // We do need to merge the lists, however. + tables = append(tables, newTables...) + tc.DeniedTables = append(tc.DeniedTables, tables...) + // And be sure to remove any duplicates. + slices.Sort(tc.DeniedTables) + tc.DeniedTables = slices.Compact(tc.DeniedTables) + return nil } tc.DeniedTables = append(tc.DeniedTables, tables...) 
return nil diff --git a/go/vt/topo/shard_test.go b/go/vt/topo/shard_test.go index 9afc8d0ea78..5e30a8aad2a 100644 --- a/go/vt/topo/shard_test.go +++ b/go/vt/topo/shard_test.go @@ -25,6 +25,7 @@ import ( "github.com/stretchr/testify/require" "vitess.io/vitess/go/test/utils" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) @@ -121,14 +122,14 @@ func TestUpdateSourcePrimaryDeniedTables(t *testing.T) { require.NoError(t, addToDenyList(ctx, si, primary, nil, tables2)) validateDenyList(t, si, primary, nil, append(tables1, tables2...)) - require.Error(t, addToDenyList(ctx, si, primary, nil, tables2), dlTablesAlreadyPresent) - require.Error(t, addToDenyList(ctx, si, primary, nil, []string{t1}), dlTablesAlreadyPresent) + require.NoError(t, addToDenyList(ctx, si, primary, nil, tables2)) + require.NoError(t, addToDenyList(ctx, si, primary, nil, []string{t1})) require.NoError(t, removeFromDenyList(ctx, si, primary, nil, tables2)) validateDenyList(t, si, primary, nil, tables1) - require.Error(t, removeFromDenyList(ctx, si, primary, nil, tables2), dlTablesNotPresent) - require.Error(t, removeFromDenyList(ctx, si, primary, nil, []string{t3}), dlTablesNotPresent) + require.NoError(t, removeFromDenyList(ctx, si, primary, nil, tables2)) + require.NoError(t, removeFromDenyList(ctx, si, primary, nil, []string{t3})) validateDenyList(t, si, primary, nil, tables1) require.NoError(t, removeFromDenyList(ctx, si, primary, nil, []string{t1})) diff --git a/go/vt/vtctl/workflow/framework_test.go b/go/vt/vtctl/workflow/framework_test.go new file mode 100644 index 00000000000..73b34015338 --- /dev/null +++ b/go/vt/vtctl/workflow/framework_test.go @@ -0,0 +1,458 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package workflow + +import ( + "context" + "fmt" + "os" + "regexp" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" + "golang.org/x/exp/maps" + "google.golang.org/protobuf/proto" + + "vitess.io/vitess/go/protoutil" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/mysqlctl/tmutils" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/memorytopo" + "vitess.io/vitess/go/vt/topotools" + "vitess.io/vitess/go/vt/vtenv" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/tmclient" + + _flag "vitess.io/vitess/go/internal/flag" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" +) + +const ( + defaultCellName = "cell" + startingSourceTabletUID = 100 + startingTargetTabletUID = 200 + tabletUIDStep = 10 +) + +type testKeyspace struct { + KeyspaceName string + ShardNames []string +} + +type queryResult struct { + query string + result *querypb.QueryResult +} + +func TestMain(m *testing.M) { + _flag.ParseFlagsForTest() + os.Exit(m.Run()) +} + +type testEnv struct { + ws *Server + ts *topo.Server + tmc *testTMClient + sourceKeyspace, targetKeyspace *testKeyspace + // Keyed first by keyspace name, then tablet UID. + tablets map[string]map[int]*topodatapb.Tablet + cell string +} + +func newTestEnv(t *testing.T, ctx context.Context, cell string, sourceKeyspace, targetKeyspace *testKeyspace) *testEnv { + t.Helper() + env := &testEnv{ + ts: memorytopo.NewServer(ctx, cell), + sourceKeyspace: sourceKeyspace, + targetKeyspace: targetKeyspace, + tablets: make(map[string]map[int]*topodatapb.Tablet), + cell: cell, + } + venv := vtenv.NewTestEnv() + env.tmc = newTestTMClient(env) + env.ws = NewServer(venv, env.ts, env.tmc) + + tabletID := startingSourceTabletUID + for _, shardName := range sourceKeyspace.ShardNames { + _ = env.addTablet(t, ctx, tabletID, sourceKeyspace.KeyspaceName, shardName, topodatapb.TabletType_PRIMARY) + tabletID += tabletUIDStep + } + if sourceKeyspace.KeyspaceName != targetKeyspace.KeyspaceName { + tabletID = startingTargetTabletUID + for _, shardName := range targetKeyspace.ShardNames { + _ = env.addTablet(t, ctx, tabletID, targetKeyspace.KeyspaceName, shardName, topodatapb.TabletType_PRIMARY) + tabletID += tabletUIDStep + } + } + err := env.ts.RebuildSrvVSchema(ctx, nil) + require.NoError(t, err) + + return env +} + +func (env *testEnv) close() { + for _, k := range maps.Values(env.tablets) { + for _, t := range maps.Values(k) { + env.deleteTablet(t) + } + } +} + +func (env *testEnv) addTablet(t *testing.T, ctx context.Context, id int, keyspace, shard string, tabletType topodatapb.TabletType) *topodatapb.Tablet { + tablet := &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: env.cell, + Uid: uint32(id), + }, + Keyspace: keyspace, + Shard: shard, + KeyRange: &topodatapb.KeyRange{}, + Type: tabletType, + PortMap: map[string]int32{ + "test": int32(id), + }, + } + if env.tablets[keyspace] == nil { + env.tablets[keyspace] = make(map[int]*topodatapb.Tablet) + } + env.tablets[keyspace][id] = tablet + err := env.ws.ts.InitTablet(ctx, tablet, false /* allowPrimaryOverride */, true /* createShardAndKeyspace */, false /* allowUpdate */) + require.NoError(t, err) + if tabletType == topodatapb.TabletType_PRIMARY { + _, err = env.ws.ts.UpdateShardFields(ctx, keyspace, shard, 
func(si *topo.ShardInfo) error { + si.PrimaryAlias = tablet.Alias + si.IsPrimaryServing = true + return nil + }) + require.NoError(t, err) + } + return tablet +} + +// addTableRoutingRules adds routing rules from the test env's source keyspace to +// its target keyspace for the given tablet types and tables. +func (env *testEnv) addTableRoutingRules(t *testing.T, ctx context.Context, tabletTypes []topodatapb.TabletType, tables []string) { + ks := env.targetKeyspace.KeyspaceName + rules := make(map[string][]string, len(tables)*(len(tabletTypes)*3)) + for _, tabletType := range tabletTypes { + for _, tableName := range tables { + toTarget := []string{ks + "." + tableName} + tt := strings.ToLower(tabletType.String()) + if tabletType == topodatapb.TabletType_PRIMARY { + rules[tableName] = toTarget + rules[ks+"."+tableName] = toTarget + rules[env.sourceKeyspace.KeyspaceName+"."+tableName] = toTarget + } else { + rules[tableName+"@"+tt] = toTarget + rules[ks+"."+tableName+"@"+tt] = toTarget + rules[env.sourceKeyspace.KeyspaceName+"."+tableName+"@"+tt] = toTarget + } + } + } + err := topotools.SaveRoutingRules(ctx, env.ts, rules) + require.NoError(t, err) + err = env.ts.RebuildSrvVSchema(ctx, nil) + require.NoError(t, err) +} + +func (env *testEnv) deleteTablet(tablet *topodatapb.Tablet) { + _ = env.ts.DeleteTablet(context.Background(), tablet.Alias) + delete(env.tablets[tablet.Keyspace], int(tablet.Alias.Uid)) +} + +type testTMClient struct { + tmclient.TabletManagerClient + schema map[string]*tabletmanagerdatapb.SchemaDefinition + + mu sync.Mutex + vrQueries map[int][]*queryResult + createVReplicationWorkflowRequests map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest + readVReplicationWorkflowRequests map[uint32]*tabletmanagerdatapb.ReadVReplicationWorkflowRequest + + env *testEnv // For access to the env config from tmc methods. + reverse atomic.Bool // Are we reversing traffic? 
+} + +func newTestTMClient(env *testEnv) *testTMClient { + return &testTMClient{ + schema: make(map[string]*tabletmanagerdatapb.SchemaDefinition), + vrQueries: make(map[int][]*queryResult), + createVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest), + readVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.ReadVReplicationWorkflowRequest), + env: env, + } +} + +func (tmc *testTMClient) CreateVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) (*tabletmanagerdatapb.CreateVReplicationWorkflowResponse, error) { + tmc.mu.Lock() + defer tmc.mu.Unlock() + + if expect := tmc.createVReplicationWorkflowRequests[tablet.Alias.Uid]; expect != nil { + if !proto.Equal(expect, req) { + return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected CreateVReplicationWorkflow request: got %+v, want %+v", req, expect) + } + } + res := sqltypes.MakeTestResult(sqltypes.MakeTestFields("rowsaffected", "int64"), "1") + return &tabletmanagerdatapb.CreateVReplicationWorkflowResponse{Result: sqltypes.ResultToProto3(res)}, nil +} + +func (tmc *testTMClient) ReadVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.ReadVReplicationWorkflowRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowResponse, error) { + tmc.mu.Lock() + defer tmc.mu.Unlock() + + if expect := tmc.readVReplicationWorkflowRequests[tablet.Alias.Uid]; expect != nil { + if !proto.Equal(expect, req) { + return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected ReadVReplicationWorkflow request: got %+v, want %+v", req, expect) + } + } + workflowType := binlogdatapb.VReplicationWorkflowType_MoveTables + if strings.Contains(req.Workflow, "lookup") { + workflowType = binlogdatapb.VReplicationWorkflowType_CreateLookupIndex + } + res := &tabletmanagerdatapb.ReadVReplicationWorkflowResponse{ + Workflow: req.Workflow, + WorkflowType: workflowType, + Streams: make([]*tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream, 0, 2), + } + rules := make([]*binlogdatapb.Rule, len(tmc.schema)) + for i, table := range maps.Keys(tmc.schema) { + rules[i] = &binlogdatapb.Rule{ + Match: table, + Filter: fmt.Sprintf("select * from %s", table), + } + } + blsKs := tmc.env.sourceKeyspace + if tmc.reverse.Load() && tablet.Keyspace == tmc.env.sourceKeyspace.KeyspaceName { + blsKs = tmc.env.targetKeyspace + } + for i, shard := range blsKs.ShardNames { + stream := &tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream{ + Id: int32(i + 1), + Bls: &binlogdatapb.BinlogSource{ + Keyspace: blsKs.KeyspaceName, + Shard: shard, + Tables: maps.Keys(tmc.schema), + Filter: &binlogdatapb.Filter{ + Rules: rules, + }, + }, + } + res.Streams = append(res.Streams, stream) + } + + return res, nil +} + +func (tmc *testTMClient) DeleteVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.DeleteVReplicationWorkflowRequest) (response *tabletmanagerdatapb.DeleteVReplicationWorkflowResponse, err error) { + return &tabletmanagerdatapb.DeleteVReplicationWorkflowResponse{ + Result: &querypb.QueryResult{ + RowsAffected: 1, + }, + }, nil +} + +func (tmc *testTMClient) GetSchema(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.GetSchemaRequest) (*tabletmanagerdatapb.SchemaDefinition, error) { + tmc.mu.Lock() + defer tmc.mu.Unlock() + + schemaDefn := &tabletmanagerdatapb.SchemaDefinition{} + for _, table := range req.Tables { + if 
table == "/.*/" { + // Special case of all tables in keyspace. + for key, tableDefn := range tmc.schema { + if strings.HasPrefix(key, tablet.Keyspace+".") { + schemaDefn.TableDefinitions = append(schemaDefn.TableDefinitions, tableDefn.TableDefinitions...) + } + } + break + } + + key := tablet.Keyspace + "." + table + tableDefn := tmc.schema[key] + if tableDefn == nil { + continue + } + schemaDefn.TableDefinitions = append(schemaDefn.TableDefinitions, tableDefn.TableDefinitions...) + } + return schemaDefn, nil +} + +func (tmc *testTMClient) expectVRQuery(tabletID int, query string, result *sqltypes.Result) { + tmc.mu.Lock() + defer tmc.mu.Unlock() + + tmc.vrQueries[tabletID] = append(tmc.vrQueries[tabletID], &queryResult{ + query: query, + result: sqltypes.ResultToProto3(result), + }) +} + +func (tmc *testTMClient) expectVRQueryResultOnKeyspaceTablets(keyspace string, queryResult *queryResult) { + tmc.mu.Lock() + defer tmc.mu.Unlock() + + for uid := range tmc.env.tablets[keyspace] { + tmc.vrQueries[uid] = append(tmc.vrQueries[uid], queryResult) + } +} + +func (tmc *testTMClient) expectCreateVReplicationWorkflowRequest(tabletID uint32, req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) { + tmc.mu.Lock() + defer tmc.mu.Unlock() + + tmc.createVReplicationWorkflowRequests[tabletID] = req +} + +func (tmc *testTMClient) VReplicationExec(ctx context.Context, tablet *topodatapb.Tablet, query string) (*querypb.QueryResult, error) { + tmc.mu.Lock() + defer tmc.mu.Unlock() + + qrs := tmc.vrQueries[int(tablet.Alias.Uid)] + if len(qrs) == 0 { + return nil, fmt.Errorf("tablet %v does not expect any more queries: %s", tablet, query) + } + matched := false + if qrs[0].query[0] == '/' { + matched = regexp.MustCompile(qrs[0].query[1:]).MatchString(query) + } else { + matched = query == qrs[0].query + } + if !matched { + return nil, fmt.Errorf("tablet %v:\nunexpected query\n%s\nwant:\n%s", tablet, query, qrs[0].query) + } + tmc.vrQueries[int(tablet.Alias.Uid)] = qrs[1:] + return qrs[0].result, nil +} + +func (tmc *testTMClient) ExecuteFetchAsDba(ctx context.Context, tablet *topodatapb.Tablet, usePool bool, req *tabletmanagerdatapb.ExecuteFetchAsDbaRequest) (*querypb.QueryResult, error) { + // Reuse VReplicationExec. + return tmc.VReplicationExec(ctx, tablet, string(req.Query)) +} + +func (tmc *testTMClient) ExecuteFetchAsAllPrivs(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.ExecuteFetchAsAllPrivsRequest) (*querypb.QueryResult, error) { + return nil, nil +} + +// Note: ONLY breaks up change.SQL into individual statements and executes it. Does NOT fully implement ApplySchema. 
+func (tmc *testTMClient) ApplySchema(ctx context.Context, tablet *topodatapb.Tablet, change *tmutils.SchemaChange) (*tabletmanagerdatapb.SchemaChangeResult, error) { + stmts := strings.Split(change.SQL, ";") + + for _, stmt := range stmts { + _, err := tmc.ExecuteFetchAsDba(ctx, tablet, false, &tabletmanagerdatapb.ExecuteFetchAsDbaRequest{ + Query: []byte(stmt), + MaxRows: 0, + ReloadSchema: true, + }) + if err != nil { + return nil, err + } + } + + return nil, nil +} + +func (tmc *testTMClient) VDiff(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.VDiffRequest) (*tabletmanagerdatapb.VDiffResponse, error) { + return &tabletmanagerdatapb.VDiffResponse{ + Id: 1, + VdiffUuid: req.VdiffUuid, + Output: &querypb.QueryResult{ + RowsAffected: 1, + }, + }, nil +} + +func (tmc *testTMClient) HasVReplicationWorkflows(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.HasVReplicationWorkflowsRequest) (*tabletmanagerdatapb.HasVReplicationWorkflowsResponse, error) { + return &tabletmanagerdatapb.HasVReplicationWorkflowsResponse{ + Has: false, + }, nil +} + +func (tmc *testTMClient) ReadVReplicationWorkflows(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.ReadVReplicationWorkflowsRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse, error) { + tmc.mu.Lock() + defer tmc.mu.Unlock() + + workflowType := binlogdatapb.VReplicationWorkflowType_MoveTables + if len(req.IncludeWorkflows) > 0 { + for _, wf := range req.IncludeWorkflows { + if strings.Contains(wf, "lookup") { + workflowType = binlogdatapb.VReplicationWorkflowType_CreateLookupIndex + } + } + ks := tmc.env.sourceKeyspace + if tmc.reverse.Load() { + ks = tmc.env.targetKeyspace + } + return &tabletmanagerdatapb.ReadVReplicationWorkflowsResponse{ + Workflows: []*tabletmanagerdatapb.ReadVReplicationWorkflowResponse{ + { + Workflow: req.IncludeWorkflows[0], + WorkflowType: workflowType, + Streams: []*tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream{ + { + Id: 1, + State: binlogdatapb.VReplicationWorkflowState_Running, + Bls: &binlogdatapb.BinlogSource{ + Keyspace: ks.KeyspaceName, + Shard: ks.ShardNames[0], + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: "/.*/", + }, + }, + }, + }, + Pos: "MySQL56/" + position, + TimeUpdated: protoutil.TimeToProto(time.Now()), + TimeHeartbeat: protoutil.TimeToProto(time.Now()), + }, + }, + }, + }, + }, nil + } else { + return &tabletmanagerdatapb.ReadVReplicationWorkflowsResponse{}, nil + } +} + +func (tmc *testTMClient) UpdateVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.UpdateVReplicationWorkflowRequest) (*tabletmanagerdatapb.UpdateVReplicationWorkflowResponse, error) { + return &tabletmanagerdatapb.UpdateVReplicationWorkflowResponse{ + Result: &querypb.QueryResult{ + RowsAffected: 1, + }, + }, nil +} + +func (tmc *testTMClient) PrimaryPosition(ctx context.Context, tablet *topodatapb.Tablet) (string, error) { + return position, nil +} + +func (tmc *testTMClient) WaitForPosition(ctx context.Context, tablet *topodatapb.Tablet, pos string) error { + return nil +} + +func (tmc *testTMClient) VReplicationWaitForPos(ctx context.Context, tablet *topodatapb.Tablet, id int32, pos string) error { + return nil +} diff --git a/go/vt/vtctl/workflow/materializer_env_test.go b/go/vt/vtctl/workflow/materializer_env_test.go index 587712f620c..fe49b24a10d 100644 --- a/go/vt/vtctl/workflow/materializer_env_test.go +++ b/go/vt/vtctl/workflow/materializer_env_test.go @@ 
-19,7 +19,6 @@ package workflow import ( "context" "fmt" - "os" "regexp" "strings" "sync" @@ -37,7 +36,6 @@ import ( "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/tmclient" - _flag "vitess.io/vitess/go/internal/flag" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" @@ -46,11 +44,6 @@ import ( vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) -type queryResult struct { - query string - result *querypb.QueryResult -} - type testMaterializerEnv struct { ws *Server ms *vtctldatapb.MaterializeSettings @@ -65,11 +58,6 @@ type testMaterializerEnv struct { //---------------------------------------------- // testMaterializerEnv -func TestMain(m *testing.M) { - _flag.ParseFlagsForTest() - os.Exit(m.Run()) -} - func newTestMaterializerEnv(t *testing.T, ctx context.Context, ms *vtctldatapb.MaterializeSettings, sources, targets []string) *testMaterializerEnv { t.Helper() env := &testMaterializerEnv{ @@ -77,8 +65,8 @@ func newTestMaterializerEnv(t *testing.T, ctx context.Context, ms *vtctldatapb.M sources: sources, targets: targets, tablets: make(map[int]*topodatapb.Tablet), - topoServ: memorytopo.NewServer(ctx, "cell"), - cell: "cell", + topoServ: memorytopo.NewServer(ctx, defaultCellName), + cell: defaultCellName, tmc: newTestMaterializerTMClient(), } venv := vtenv.NewTestEnv() diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 17b01736a77..587caff3c8c 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -22,7 +22,6 @@ import ( "errors" "fmt" "math" - "reflect" "slices" "sort" "strings" @@ -1452,13 +1451,31 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl return nil, err } + isStandardMoveTables := func() bool { + return !mz.IsMultiTenantMigration() && !mz.isPartial + } + + ts, err := s.buildTrafficSwitcher(ctx, req.GetTargetKeyspace(), req.GetWorkflow()) + if err != nil { + return nil, err + } + sw := &switcher{s: s, ts: ts} + lockCtx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "MoveTablesCreate") + if lockErr != nil { + ts.Logger().Errorf("Locking target keyspace %s failed: %v", ts.TargetKeyspaceName(), lockErr) + return nil, lockErr + } + defer targetUnlock(&err) + ctx = lockCtx + // If we get an error after this point, where the vreplication streams/records // have been created, then we clean up the workflow's artifacts. defer func() { if err != nil { - ts, cerr := s.buildTrafficSwitcher(ctx, ms.TargetKeyspace, ms.Workflow) - if cerr != nil { - err = vterrors.Wrapf(err, "failed to cleanup workflow artifacts: %v", cerr) + if isStandardMoveTables() { // Non-standard ones do not use shard scoped mechanisms + if cerr := ts.dropTargetDeniedTables(ctx); cerr != nil { + err = vterrors.Wrapf(err, "failed to cleanup denied table entries: %v", cerr) + } } if cerr := s.dropArtifacts(ctx, false, &switcher{s: s, ts: ts}); cerr != nil { err = vterrors.Wrapf(err, "failed to cleanup workflow artifacts: %v", cerr) @@ -1473,9 +1490,9 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl }() // Now that the streams have been successfully created, let's put the associated - // routing rules in place. + // routing rules and denied tables entries in place. 
if externalTopo == nil { - if err := s.setupInitialRoutingRules(ctx, req, mz, tables, vschema); err != nil { + if err := s.setupInitialRoutingRules(ctx, req, mz, tables); err != nil { return nil, err } @@ -1484,6 +1501,11 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl return nil, err } } + if isStandardMoveTables() { // Non-standard ones do not use shard scoped mechanisms + if err := s.setupInitialDeniedTables(ctx, ts); err != nil { + return nil, vterrors.Wrapf(err, "failed to put initial denied tables entries in place on the target shards") + } + } if err := s.ts.RebuildSrvVSchema(ctx, nil); err != nil { return nil, err } @@ -1547,7 +1569,24 @@ func (s *Server) validateRoutingRuleFlags(req *vtctldatapb.MoveTablesCreateReque return nil } -func (s *Server) setupInitialRoutingRules(ctx context.Context, req *vtctldatapb.MoveTablesCreateRequest, mz *materializer, tables []string, vschema *vschemapb.Keyspace) error { +func (s *Server) setupInitialDeniedTables(ctx context.Context, ts *trafficSwitcher) error { + if ts.MigrationType() != binlogdatapb.MigrationType_TABLES { + return nil + } + return ts.ForAllTargets(func(target *MigrationTarget) error { + if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.TargetKeyspaceName(), target.GetShard().ShardName(), func(si *topo.ShardInfo) error { + return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, false, ts.Tables()) + }); err != nil { + return err + } + strCtx, cancel := context.WithTimeout(ctx, shardTabletRefreshTimeout) + defer cancel() + _, _, err := topotools.RefreshTabletsByShard(strCtx, ts.TopoServer(), ts.TabletManagerClient(), target.GetShard(), nil, ts.Logger()) + return err + }) +} + +func (s *Server) setupInitialRoutingRules(ctx context.Context, req *vtctldatapb.MoveTablesCreateRequest, mz *materializer, tables []string) error { if err := s.validateRoutingRuleFlags(req, mz); err != nil { return err } @@ -1616,7 +1655,7 @@ func (s *Server) MoveTablesComplete(ctx context.Context, req *vtctldatapb.MoveTa span, ctx := trace.NewSpan(ctx, "workflow.Server.MoveTablesComplete") defer span.Finish() - ts, state, err := s.getWorkflowState(ctx, req.TargetKeyspace, req.Workflow) + ts, state, err := s.getWorkflowState(ctx, req.GetTargetKeyspace(), req.GetWorkflow()) if err != nil { return nil, err } @@ -1630,8 +1669,7 @@ func (s *Server) MoveTablesComplete(ctx context.Context, req *vtctldatapb.MoveTa var dryRunResults *[]string if state.WorkflowType == TypeMigrate { - dryRunResults, err = s.finalizeMigrateWorkflow(ctx, req.TargetKeyspace, req.Workflow, strings.Join(ts.tables, ","), - false, req.KeepData, req.KeepRoutingRules, req.DryRun) + dryRunResults, err = s.finalizeMigrateWorkflow(ctx, ts, strings.Join(ts.tables, ","), false, req.KeepData, req.KeepRoutingRules, req.DryRun) if err != nil { return nil, vterrors.Wrapf(err, "failed to finalize the %s workflow in the %s keyspace", req.Workflow, req.TargetKeyspace) @@ -1970,11 +2008,21 @@ func (s *Server) WorkflowDelete(ctx context.Context, req *vtctldatapb.WorkflowDe span.Annotate("keep_routing_rules", req.KeepRoutingRules) span.Annotate("shards", req.Shards) - // Cleanup related data and artifacts. 
- if _, err := s.DropTargets(ctx, req.Keyspace, req.Workflow, req.KeepData, req.KeepRoutingRules, false); err != nil { - if topo.IsErrType(err, topo.NoNode) { - return nil, vterrors.Wrapf(err, "%s keyspace does not exist", req.Keyspace) + ts, state, err := s.getWorkflowState(ctx, req.GetKeyspace(), req.GetWorkflow()) + if err != nil { + log.Errorf("failed to get VReplication workflow state for %s.%s: %v", req.GetKeyspace(), req.GetWorkflow(), err) + return nil, err + } + + if ts.workflowType != binlogdatapb.VReplicationWorkflowType_CreateLookupIndex { + // Return an error if the workflow traffic is partially switched. + if state.WritesSwitched || len(state.ReplicaCellsSwitched) > 0 || len(state.RdonlyCellsSwitched) > 0 { + return nil, ErrWorkflowPartiallySwitched } + } + + if state.WorkflowType == TypeMigrate { + _, err := s.finalizeMigrateWorkflow(ctx, ts, "", true, req.GetKeepData(), req.GetKeepRoutingRules(), false) return nil, err } @@ -1993,7 +2041,9 @@ func (s *Server) WorkflowDelete(ctx context.Context, req *vtctldatapb.WorkflowDe s.optimizeCopyStateTable(tablet.Tablet) return res.Result, err } - res, err := vx.CallbackContext(ctx, callback) + delCtx, delCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout*2) + defer delCancel() + res, err := vx.CallbackContext(delCtx, callback) if err != nil { return nil, err } @@ -2002,6 +2052,16 @@ func (s *Server) WorkflowDelete(ctx context.Context, req *vtctldatapb.WorkflowDe return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "the %s workflow does not exist in the %s keyspace", req.Workflow, req.Keyspace) } + // Cleanup related data and artifacts. There are none for a LookupVindex workflow. + if ts.workflowType != binlogdatapb.VReplicationWorkflowType_CreateLookupIndex { + if _, err := s.DropTargets(delCtx, ts, req.GetKeepData(), req.GetKeepRoutingRules(), false); err != nil { + if topo.IsErrType(err, topo.NoNode) { + return nil, vterrors.Wrapf(err, "%s keyspace does not exist", req.GetKeyspace()) + } + return nil, err + } + } + response := &vtctldatapb.WorkflowDeleteResponse{} response.Summary = fmt.Sprintf("Successfully cancelled the %s workflow in the %s keyspace", req.Workflow, req.Keyspace) details := make([]*vtctldatapb.WorkflowDeleteResponse_TabletInfo, 0, len(res)) @@ -2012,6 +2072,9 @@ func (s *Server) WorkflowDelete(ctx context.Context, req *vtctldatapb.WorkflowDe } details = append(details, result) } + sort.Slice(details, func(i, j int) bool { // Ensure deterministic output + return topoproto.TabletAliasString(details[i].Tablet) < topoproto.TabletAliasString(details[j].Tablet) + }) response.Details = details return response, nil } @@ -2269,7 +2332,9 @@ func (s *Server) WorkflowUpdate(ctx context.Context, req *vtctldatapb.WorkflowUp } return res.Result, err } - res, err := vx.CallbackContext(ctx, callback) + updCtx, updCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout*2) + defer updCancel() + res, err := vx.CallbackContext(updCtx, callback) if err != nil { if topo.IsErrType(err, topo.NoNode) { return nil, vterrors.Wrapf(err, "%s keyspace does not exist", req.Keyspace) @@ -2497,28 +2562,8 @@ func (s *Server) optimizeCopyStateTable(tablet *topodatapb.Tablet) { // DropTargets cleans up target tables, shards and denied tables if a MoveTables/Reshard // is cancelled. 
-func (s *Server) DropTargets(ctx context.Context, targetKeyspace, workflow string, keepData, keepRoutingRules, dryRun bool) (*[]string, error) { - ts, state, err := s.getWorkflowState(ctx, targetKeyspace, workflow) - if err != nil { - log.Errorf("Failed to get VReplication workflow state for %s.%s: %v", targetKeyspace, workflow, err) - return nil, err - } - - // There is nothing to drop for a LookupVindex workflow. - if ts.workflowType == binlogdatapb.VReplicationWorkflowType_CreateLookupIndex { - return nil, nil - } - - // Return an error if the workflow traffic is partially switched. - if state.WritesSwitched || len(state.ReplicaCellsSwitched) > 0 || len(state.RdonlyCellsSwitched) > 0 { - return nil, ErrWorkflowPartiallySwitched - } - - if state.WorkflowType == TypeMigrate { - _, err := s.finalizeMigrateWorkflow(ctx, targetKeyspace, workflow, "", true, keepData, keepRoutingRules, dryRun) - return nil, err - } - +func (s *Server) DropTargets(ctx context.Context, ts *trafficSwitcher, keepData, keepRoutingRules, dryRun bool) (*[]string, error) { + var err error ts.keepRoutingRules = keepRoutingRules var sw iswitcher if dryRun { @@ -2629,7 +2674,7 @@ func (s *Server) buildTrafficSwitcher(ctx context.Context, targetKeyspace, workf tables = append(tables, rule.Match) } sort.Strings(tables) - if !reflect.DeepEqual(ts.tables, tables) { + if !slices.Equal(ts.tables, tables) { return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "table lists are mismatched across streams: %v vs %v", ts.tables, tables) } } @@ -2803,8 +2848,8 @@ func (s *Server) DeleteShard(ctx context.Context, keyspace, shard string, recurs shardInfo, err := s.ts.GetShard(ctx, keyspace, shard) if err != nil { if topo.IsErrType(err, topo.NoNode) { - log.Infof("Shard %v/%v doesn't seem to exist, cleaning up any potential leftover", keyspace, shard) - return s.ts.DeleteShard(ctx, keyspace, shard) + log.Warningf("Shard %v/%v did not exist when attempting to remove it", keyspace, shard) + return nil } return err } @@ -2942,13 +2987,11 @@ func (s *Server) refreshPrimaryTablets(ctx context.Context, shards []*topo.Shard // finalizeMigrateWorkflow deletes the streams for the Migrate workflow. // We only cleanup the target for external sources. 
-func (s *Server) finalizeMigrateWorkflow(ctx context.Context, targetKeyspace, workflow, tableSpecs string, cancel, keepData, keepRoutingRules, dryRun bool) (*[]string, error) { - ts, err := s.buildTrafficSwitcher(ctx, targetKeyspace, workflow) - if err != nil { - ts.Logger().Errorf("buildTrafficSwitcher failed: %v", err) - return nil, err - } - var sw iswitcher +func (s *Server) finalizeMigrateWorkflow(ctx context.Context, ts *trafficSwitcher, tableSpecs string, cancel, keepData, keepRoutingRules, dryRun bool) (*[]string, error) { + var ( + sw iswitcher + err error + ) if dryRun { sw = &switcherDryRun{ts: ts, drLog: NewLogRecorder()} } else { @@ -2966,7 +3009,7 @@ func (s *Server) finalizeMigrateWorkflow(ctx context.Context, targetKeyspace, wo return nil, err } if !cancel { - if err := sw.addParticipatingTablesToKeyspace(ctx, targetKeyspace, tableSpecs); err != nil { + if err := sw.addParticipatingTablesToKeyspace(ctx, ts.targetKeyspace, tableSpecs); err != nil { return nil, err } if err := ts.TopoServer().RebuildSrvVSchema(ctx, nil); err != nil { @@ -3072,7 +3115,7 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor resp.Summary = fmt.Sprintf("%s dry run results for workflow %s.%s at %v", cmd, req.Keyspace, req.Workflow, time.Now().UTC().Format(time.RFC822)) resp.DryRunResults = dryRunResults } else { - log.Infof("SwitchTraffic done for workflow %s.%s", req.Keyspace, req.Workflow) + log.Infof("%s done for workflow %s.%s", cmd, req.Keyspace, req.Workflow) resp.Summary = fmt.Sprintf("%s was successful for workflow %s.%s", cmd, req.Keyspace, req.Workflow) // Reload the state after the SwitchTraffic operation // and return that as a string. @@ -3090,7 +3133,7 @@ func (s *Server) WorkflowSwitchTraffic(ctx context.Context, req *vtctldatapb.Wor } else { resp.CurrentState = currentState.String() } - log.Infof("SwitchTraffic done for workflow %s.%s, returning response %v", req.Keyspace, req.Workflow, resp) + log.Infof("%s done for workflow %s.%s, returning response %v", cmd, req.Keyspace, req.Workflow, resp) } return resp, nil } diff --git a/go/vt/vtctl/workflow/server_test.go b/go/vt/vtctl/workflow/server_test.go index 174cc2aaf6a..fb432403155 100644 --- a/go/vt/vtctl/workflow/server_test.go +++ b/go/vt/vtctl/workflow/server_test.go @@ -19,7 +19,11 @@ package workflow import ( "context" "fmt" + "slices" + "sort" + "strings" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -27,6 +31,7 @@ import ( "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/test/utils" + "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vtenv" @@ -34,6 +39,7 @@ import ( binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" ) @@ -199,3 +205,578 @@ func TestVDiffCreate(t *testing.T) { }) } } + +func TestWorkflowDelete(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + workflowName := "wf1" + tableName := "t1" + sourceKeyspaceName := "sourceks" + targetKeyspaceName := "targetks" + schema := map[string]*tabletmanagerdatapb.SchemaDefinition{ + "t1": { + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ + { + Name: tableName, + Schema: fmt.Sprintf("CREATE TABLE %s (id BIGINT, name 
VARCHAR(64), PRIMARY KEY (id))", tableName), + }, + }, + }, + } + + testcases := []struct { + name string + sourceKeyspace, targetKeyspace *testKeyspace + preFunc func(t *testing.T, env *testEnv) + req *vtctldatapb.WorkflowDeleteRequest + expectedSourceQueries []*queryResult + expectedTargetQueries []*queryResult + want *vtctldatapb.WorkflowDeleteResponse + wantErr bool + postFunc func(t *testing.T, env *testEnv) + }{ + { + name: "basic", + sourceKeyspace: &testKeyspace{ + KeyspaceName: sourceKeyspaceName, + ShardNames: []string{"0"}, + }, + targetKeyspace: &testKeyspace{ + KeyspaceName: targetKeyspaceName, + ShardNames: []string{"-80", "80-"}, + }, + req: &vtctldatapb.WorkflowDeleteRequest{ + Keyspace: targetKeyspaceName, + Workflow: workflowName, + }, + expectedSourceQueries: []*queryResult{ + { + query: fmt.Sprintf("delete from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'", + sourceKeyspaceName, ReverseWorkflowName(workflowName)), + result: &querypb.QueryResult{}, + }, + }, + expectedTargetQueries: []*queryResult{ + { + query: fmt.Sprintf("drop table `vt_%s`.`%s`", targetKeyspaceName, tableName), + result: &querypb.QueryResult{}, + }, + }, + want: &vtctldatapb.WorkflowDeleteResponse{ + Summary: fmt.Sprintf("Successfully cancelled the %s workflow in the %s keyspace", + workflowName, targetKeyspaceName), + Details: []*vtctldatapb.WorkflowDeleteResponse_TabletInfo{ + { + Tablet: &topodatapb.TabletAlias{Cell: defaultCellName, Uid: startingTargetTabletUID}, + Deleted: true, + }, + { + Tablet: &topodatapb.TabletAlias{Cell: defaultCellName, Uid: startingTargetTabletUID + tabletUIDStep}, + Deleted: true, + }, + }, + }, + }, + { + name: "basic with existing denied table entries", + sourceKeyspace: &testKeyspace{ + KeyspaceName: sourceKeyspaceName, + ShardNames: []string{"0"}, + }, + targetKeyspace: &testKeyspace{ + KeyspaceName: targetKeyspaceName, + ShardNames: []string{"-80", "80-"}, + }, + preFunc: func(t *testing.T, env *testEnv) { + lockCtx, targetUnlock, lockErr := env.ts.LockKeyspace(ctx, targetKeyspaceName, "test") + require.NoError(t, lockErr) + var err error + defer require.NoError(t, err) + defer targetUnlock(&err) + for _, shard := range env.targetKeyspace.ShardNames { + _, err := env.ts.UpdateShardFields(lockCtx, targetKeyspaceName, shard, func(si *topo.ShardInfo) error { + err := si.UpdateDeniedTables(lockCtx, topodatapb.TabletType_PRIMARY, nil, false, []string{tableName, "t2", "t3"}) + return err + }) + require.NoError(t, err) + } + }, + req: &vtctldatapb.WorkflowDeleteRequest{ + Keyspace: targetKeyspaceName, + Workflow: workflowName, + }, + expectedSourceQueries: []*queryResult{ + { + query: fmt.Sprintf("delete from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'", + sourceKeyspaceName, ReverseWorkflowName(workflowName)), + result: &querypb.QueryResult{}, + }, + }, + expectedTargetQueries: []*queryResult{ + { + query: fmt.Sprintf("drop table `vt_%s`.`%s`", targetKeyspaceName, tableName), + result: &querypb.QueryResult{}, + }, + }, + want: &vtctldatapb.WorkflowDeleteResponse{ + Summary: fmt.Sprintf("Successfully cancelled the %s workflow in the %s keyspace", + workflowName, targetKeyspaceName), + Details: []*vtctldatapb.WorkflowDeleteResponse_TabletInfo{ + { + Tablet: &topodatapb.TabletAlias{Cell: defaultCellName, Uid: startingTargetTabletUID}, + Deleted: true, + }, + { + Tablet: &topodatapb.TabletAlias{Cell: defaultCellName, Uid: startingTargetTabletUID + tabletUIDStep}, + Deleted: true, + }, + }, + }, + postFunc: func(t *testing.T, env *testEnv) { + 
for _, shard := range env.targetKeyspace.ShardNames { + si, err := env.ts.GetShard(ctx, targetKeyspaceName, shard) + require.NoError(t, err) + require.NotNil(t, si) + tc := si.GetTabletControl(topodatapb.TabletType_PRIMARY) + require.NotNil(t, tc) + require.EqualValues(t, []string{"t2", "t3"}, tc.DeniedTables) + } + }, + }, + } + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + require.NotNil(t, tc.sourceKeyspace) + require.NotNil(t, tc.targetKeyspace) + require.NotNil(t, tc.req) + env := newTestEnv(t, ctx, defaultCellName, tc.sourceKeyspace, tc.targetKeyspace) + defer env.close() + env.tmc.schema = schema + if tc.expectedSourceQueries != nil { + require.NotNil(t, env.tablets[tc.sourceKeyspace.KeyspaceName]) + for _, eq := range tc.expectedSourceQueries { + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, eq) + } + } + if tc.expectedTargetQueries != nil { + require.NotNil(t, env.tablets[tc.targetKeyspace.KeyspaceName]) + for _, eq := range tc.expectedTargetQueries { + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, eq) + } + } + if tc.preFunc != nil { + tc.preFunc(t, env) + } + got, err := env.ws.WorkflowDelete(ctx, tc.req) + if (err != nil) != tc.wantErr { + require.Fail(t, "unexpected error value", "Server.WorkflowDelete() error = %v, wantErr %v", err, tc.wantErr) + return + } + require.EqualValues(t, got, tc.want, "Server.WorkflowDelete() = %v, want %v", got, tc.want) + if tc.postFunc != nil { + tc.postFunc(t, env) + } else { // Default post checks + // Confirm that we have no routing rules. + rr, err := env.ts.GetRoutingRules(ctx) + require.NoError(t, err) + require.Zero(t, rr.Rules) + + // Confirm that we have no shard tablet controls, which is where + // DeniedTables live. 
+ for _, keyspace := range []*testKeyspace{tc.sourceKeyspace, tc.targetKeyspace} { + for _, shardName := range keyspace.ShardNames { + si, err := env.ts.GetShard(ctx, keyspace.KeyspaceName, shardName) + require.NoError(t, err) + require.Zero(t, si.Shard.TabletControls) + } + } + } + }) + } +} + +func TestMoveTablesTrafficSwitching(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + workflowName := "wf1" + tableName := "t1" + sourceKeyspaceName := "sourceks" + targetKeyspaceName := "targetks" + vrID := 1 + tabletTypes := []topodatapb.TabletType{ + topodatapb.TabletType_PRIMARY, + topodatapb.TabletType_REPLICA, + topodatapb.TabletType_RDONLY, + } + schema := map[string]*tabletmanagerdatapb.SchemaDefinition{ + tableName: { + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ + { + Name: tableName, + Schema: fmt.Sprintf("CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))", tableName), + }, + }, + }, + } + copyTableQR := &queryResult{ + query: fmt.Sprintf("select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in (%d) and id in (select max(id) from _vt.copy_state where vrepl_id in (%d) group by vrepl_id, table_name)", + vrID, vrID), + result: &querypb.QueryResult{}, + } + journalQR := &queryResult{ + query: "/select val from _vt.resharding_journal.*", + result: &querypb.QueryResult{}, + } + lockTableQR := &queryResult{ + query: fmt.Sprintf("LOCK TABLES `%s` READ", tableName), + result: &querypb.QueryResult{}, + } + cutoverQR := &queryResult{ + query: "/update _vt.vreplication set state='Stopped', message='stopped for cutover' where id=.*", + result: &querypb.QueryResult{}, + } + createWFQR := &queryResult{ + query: "/insert into _vt.vreplication.*", + result: &querypb.QueryResult{}, + } + deleteWFQR := &queryResult{ + query: fmt.Sprintf("delete from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'", targetKeyspaceName, workflowName), + result: &querypb.QueryResult{}, + } + deleteReverseWFQR := &queryResult{ + query: fmt.Sprintf("delete from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'", sourceKeyspaceName, ReverseWorkflowName(workflowName)), + result: &querypb.QueryResult{}, + } + createReverseWFQR := &queryResult{ + query: "/insert into _vt.vreplication.*_reverse.*", + result: &querypb.QueryResult{}, + } + createJournalQR := &queryResult{ + query: "/insert into _vt.resharding_journal.*", + result: &querypb.QueryResult{}, + } + freezeWFQR := &queryResult{ + query: fmt.Sprintf("update _vt.vreplication set message = 'FROZEN' where db_name='vt_%s' and workflow='%s'", targetKeyspaceName, workflowName), + result: &querypb.QueryResult{}, + } + freezeReverseWFQR := &queryResult{ + query: fmt.Sprintf("update _vt.vreplication set message = 'FROZEN' where db_name='vt_%s' and workflow='%s'", sourceKeyspaceName, ReverseWorkflowName(workflowName)), + result: &querypb.QueryResult{}, + } + + hasDeniedTableEntry := func(si *topo.ShardInfo) bool { + if si == nil || len(si.TabletControls) == 0 { + return false + } + for _, tc := range si.Shard.TabletControls { + return slices.Equal(tc.DeniedTables, []string{tableName}) + } + return false + } + + testcases := []struct { + name string + sourceKeyspace, targetKeyspace *testKeyspace + req *vtctldatapb.WorkflowSwitchTrafficRequest + want *vtctldatapb.WorkflowSwitchTrafficResponse + wantErr bool + }{ + { + name: "basic forward", + sourceKeyspace: &testKeyspace{ + KeyspaceName: sourceKeyspaceName, + ShardNames: []string{"0"}, + }, + 
targetKeyspace: &testKeyspace{ + KeyspaceName: targetKeyspaceName, + ShardNames: []string{"-80", "80-"}, + }, + req: &vtctldatapb.WorkflowSwitchTrafficRequest{ + Keyspace: targetKeyspaceName, + Workflow: workflowName, + Direction: int32(DirectionForward), + TabletTypes: tabletTypes, + }, + want: &vtctldatapb.WorkflowSwitchTrafficResponse{ + Summary: fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s", targetKeyspaceName, workflowName), + StartState: "Reads Not Switched. Writes Not Switched", + CurrentState: "All Reads Switched. Writes Switched", + }, + }, + { + name: "basic backward", + sourceKeyspace: &testKeyspace{ + KeyspaceName: sourceKeyspaceName, + ShardNames: []string{"0"}, + }, + targetKeyspace: &testKeyspace{ + KeyspaceName: targetKeyspaceName, + ShardNames: []string{"-80", "80-"}, + }, + req: &vtctldatapb.WorkflowSwitchTrafficRequest{ + Keyspace: targetKeyspaceName, + Workflow: workflowName, + Direction: int32(DirectionBackward), + TabletTypes: tabletTypes, + }, + want: &vtctldatapb.WorkflowSwitchTrafficResponse{ + Summary: fmt.Sprintf("ReverseTraffic was successful for workflow %s.%s", targetKeyspaceName, workflowName), + StartState: "All Reads Switched. Writes Switched", + CurrentState: "Reads Not Switched. Writes Not Switched", + }, + }, + } + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + require.NotNil(t, tc.sourceKeyspace) + require.NotNil(t, tc.targetKeyspace) + require.NotNil(t, tc.req) + env := newTestEnv(t, ctx, defaultCellName, tc.sourceKeyspace, tc.targetKeyspace) + defer env.close() + env.tmc.schema = schema + if tc.req.Direction == int32(DirectionForward) { + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, copyTableQR) + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, cutoverQR) + for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, journalQR) + } + for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, lockTableQR) + } + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, deleteReverseWFQR) + for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, createReverseWFQR) + } + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, createJournalQR) + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, freezeWFQR) + } else { + env.tmc.reverse.Store(true) + // Setup the routing rules as they would be after having previously done SwitchTraffic. 
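+ // That is, reads and writes for the table already resolve to the target keyspace, giving the backward run something to switch back to the source.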
+ env.addTableRoutingRules(t, ctx, tabletTypes, []string{tableName}) + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, copyTableQR) + for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, cutoverQR) + } + for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, journalQR) + } + for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, lockTableQR) + } + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, deleteWFQR) + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, createWFQR) + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, createJournalQR) + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, freezeReverseWFQR) + } + got, err := env.ws.WorkflowSwitchTraffic(ctx, tc.req) + if (err != nil) != tc.wantErr { + require.Fail(t, "unexpected error value", "Server.WorkflowSwitchTraffic() error = %v, wantErr %v", err, tc.wantErr) + return + } + require.Equal(t, tc.want.String(), got.String(), "Server.WorkflowSwitchTraffic() = %v, want %v", got, tc.want) + + // Confirm that we have the expected routing rules. + rr, err := env.ts.GetRoutingRules(ctx) + require.NoError(t, err) + to := fmt.Sprintf("%s.%s", tc.targetKeyspace.KeyspaceName, tableName) + if tc.req.Direction == int32(DirectionBackward) { + to = fmt.Sprintf("%s.%s", tc.sourceKeyspace.KeyspaceName, tableName) + } + for _, rr := range rr.Rules { + for _, tt := range rr.ToTables { + require.Equal(t, to, tt) + } + } + // Confirm that we have the expected denied tables entries.
+ for _, keyspace := range []*testKeyspace{tc.sourceKeyspace, tc.targetKeyspace} { + for _, shardName := range keyspace.ShardNames { + si, err := env.ts.GetShard(ctx, keyspace.KeyspaceName, shardName) + require.NoError(t, err) + switch { + case keyspace == tc.sourceKeyspace && tc.req.Direction == int32(DirectionForward): + require.True(t, hasDeniedTableEntry(si)) + case keyspace == tc.sourceKeyspace && tc.req.Direction == int32(DirectionBackward): + require.False(t, hasDeniedTableEntry(si)) + case keyspace == tc.targetKeyspace && tc.req.Direction == int32(DirectionForward): + require.False(t, hasDeniedTableEntry(si)) + case keyspace == tc.targetKeyspace && tc.req.Direction == int32(DirectionBackward): + require.True(t, hasDeniedTableEntry(si)) + } + } + } + }) + } +} + +func TestMoveTablesTrafficSwitchingDryRun(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + workflowName := "wf1" + table1Name := "t1" + table2Name := "a1" + tables := []string{table1Name, table2Name} + sort.Strings(tables) + tablesStr := strings.Join(tables, ",") + sourceKeyspaceName := "sourceks" + targetKeyspaceName := "targetks" + vrID := 1 + tabletTypes := []topodatapb.TabletType{ + topodatapb.TabletType_PRIMARY, + topodatapb.TabletType_REPLICA, + topodatapb.TabletType_RDONLY, + } + schema := map[string]*tabletmanagerdatapb.SchemaDefinition{ + table1Name: { + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ + { + Name: table1Name, + Schema: fmt.Sprintf("CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))", table1Name), + }, + }, + }, + table2Name: { + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ + { + Name: table2Name, + Schema: fmt.Sprintf("CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))", table2Name), + }, + }, + }, + } + copyTableQR := &queryResult{ + query: fmt.Sprintf("select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in (%d) and id in (select max(id) from _vt.copy_state where vrepl_id in (%d) group by vrepl_id, table_name)", + vrID, vrID), + result: &querypb.QueryResult{}, + } + journalQR := &queryResult{ + query: "/select val from _vt.resharding_journal.*", + result: &querypb.QueryResult{}, + } + lockTableQR := &queryResult{ + query: fmt.Sprintf("LOCK TABLES `%s` READ,`%s` READ", table2Name, table1Name), + result: &querypb.QueryResult{}, + } + + testcases := []struct { + name string + sourceKeyspace, targetKeyspace *testKeyspace + req *vtctldatapb.WorkflowSwitchTrafficRequest + want []string + }{ + { + name: "basic forward", + sourceKeyspace: &testKeyspace{ + KeyspaceName: sourceKeyspaceName, + ShardNames: []string{"-80", "80-"}, + }, + targetKeyspace: &testKeyspace{ + KeyspaceName: targetKeyspaceName, + ShardNames: []string{"-80", "80-"}, + }, + req: &vtctldatapb.WorkflowSwitchTrafficRequest{ + Keyspace: targetKeyspaceName, + Workflow: workflowName, + Direction: int32(DirectionForward), + TabletTypes: tabletTypes, + DryRun: true, + }, + want: []string{ + fmt.Sprintf("Lock keyspace %s", sourceKeyspaceName), + fmt.Sprintf("Switch reads for tables [%s] to keyspace %s for tablet types [REPLICA,RDONLY]", tablesStr, targetKeyspaceName), + fmt.Sprintf("Routing rules for tables [%s] will be updated", tablesStr), + fmt.Sprintf("Unlock keyspace %s", sourceKeyspaceName), + fmt.Sprintf("Lock keyspace %s", sourceKeyspaceName), + fmt.Sprintf("Lock keyspace %s", targetKeyspaceName), + fmt.Sprintf("Stop writes on keyspace %s for tables [%s]: 
[keyspace:%s;shard:-80;position:%s,keyspace:%s;shard:80-;position:%s]", + sourceKeyspaceName, tablesStr, sourceKeyspaceName, position, sourceKeyspaceName, position), + "Wait for vreplication on stopped streams to catchup for up to 30s", + fmt.Sprintf("Create reverse vreplication workflow %s", ReverseWorkflowName(workflowName)), + "Create journal entries on source databases", + fmt.Sprintf("Enable writes on keyspace %s for tables [%s]", targetKeyspaceName, tablesStr), + fmt.Sprintf("Switch routing from keyspace %s to keyspace %s", sourceKeyspaceName, targetKeyspaceName), + fmt.Sprintf("Routing rules for tables [%s] will be updated", tablesStr), + fmt.Sprintf("Switch writes completed, freeze and delete vreplication streams on: [tablet:%d,tablet:%d]", startingTargetTabletUID, startingTargetTabletUID+tabletUIDStep), + fmt.Sprintf("Mark vreplication streams frozen on: [keyspace:%s;shard:-80;tablet:%d;workflow:%s;dbname:vt_%s,keyspace:%s;shard:80-;tablet:%d;workflow:%s;dbname:vt_%s]", + targetKeyspaceName, startingTargetTabletUID, workflowName, targetKeyspaceName, targetKeyspaceName, startingTargetTabletUID+tabletUIDStep, workflowName, targetKeyspaceName), + fmt.Sprintf("Unlock keyspace %s", targetKeyspaceName), + fmt.Sprintf("Unlock keyspace %s", sourceKeyspaceName), + }, + }, + { + name: "basic backward", + sourceKeyspace: &testKeyspace{ + KeyspaceName: sourceKeyspaceName, + ShardNames: []string{"-80", "80-"}, + }, + targetKeyspace: &testKeyspace{ + KeyspaceName: targetKeyspaceName, + ShardNames: []string{"-80", "80-"}, + }, + req: &vtctldatapb.WorkflowSwitchTrafficRequest{ + Keyspace: targetKeyspaceName, + Workflow: workflowName, + Direction: int32(DirectionBackward), + TabletTypes: tabletTypes, + DryRun: true, + }, + want: []string{ + fmt.Sprintf("Lock keyspace %s", targetKeyspaceName), + fmt.Sprintf("Switch reads for tables [%s] to keyspace %s for tablet types [REPLICA,RDONLY]", tablesStr, targetKeyspaceName), + fmt.Sprintf("Routing rules for tables [%s] will be updated", tablesStr), + fmt.Sprintf("Unlock keyspace %s", targetKeyspaceName), + fmt.Sprintf("Lock keyspace %s", targetKeyspaceName), + fmt.Sprintf("Lock keyspace %s", sourceKeyspaceName), + fmt.Sprintf("Stop writes on keyspace %s for tables [%s]: [keyspace:%s;shard:-80;position:%s,keyspace:%s;shard:80-;position:%s]", + targetKeyspaceName, tablesStr, targetKeyspaceName, position, targetKeyspaceName, position), + "Wait for vreplication on stopped streams to catchup for up to 30s", + fmt.Sprintf("Create reverse vreplication workflow %s", workflowName), + "Create journal entries on source databases", + fmt.Sprintf("Enable writes on keyspace %s for tables [%s]", sourceKeyspaceName, tablesStr), + fmt.Sprintf("Switch routing from keyspace %s to keyspace %s", targetKeyspaceName, sourceKeyspaceName), + fmt.Sprintf("Routing rules for tables [%s] will be updated", tablesStr), + fmt.Sprintf("Switch writes completed, freeze and delete vreplication streams on: [tablet:%d,tablet:%d]", startingSourceTabletUID, startingSourceTabletUID+tabletUIDStep), + fmt.Sprintf("Mark vreplication streams frozen on: [keyspace:%s;shard:-80;tablet:%d;workflow:%s;dbname:vt_%s,keyspace:%s;shard:80-;tablet:%d;workflow:%s;dbname:vt_%s]", + sourceKeyspaceName, startingSourceTabletUID, ReverseWorkflowName(workflowName), sourceKeyspaceName, sourceKeyspaceName, startingSourceTabletUID+tabletUIDStep, ReverseWorkflowName(workflowName), sourceKeyspaceName), + fmt.Sprintf("Unlock keyspace %s", sourceKeyspaceName), + fmt.Sprintf("Unlock keyspace %s", targetKeyspaceName), + }, + 
}, + } + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + require.NotNil(t, tc.sourceKeyspace) + require.NotNil(t, tc.targetKeyspace) + require.NotNil(t, tc.req) + env := newTestEnv(t, ctx, defaultCellName, tc.sourceKeyspace, tc.targetKeyspace) + defer env.close() + env.tmc.schema = schema + if tc.req.Direction == int32(DirectionForward) { + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, copyTableQR) + for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, journalQR) + } + for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, lockTableQR) + } + } else { + env.tmc.reverse.Store(true) + // Setup the routing rules as they would be after having previously done SwitchTraffic. + env.addTableRoutingRules(t, ctx, tabletTypes, tables) + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.sourceKeyspace.KeyspaceName, copyTableQR) + for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, journalQR) + } + for i := 0; i < len(tc.targetKeyspace.ShardNames); i++ { // Per stream + env.tmc.expectVRQueryResultOnKeyspaceTablets(tc.targetKeyspace.KeyspaceName, lockTableQR) + } + } + got, err := env.ws.WorkflowSwitchTraffic(ctx, tc.req) + require.NoError(t, err) + + require.EqualValues(t, tc.want, got.DryRunResults, "Server.WorkflowSwitchTraffic(DryRun:true) = %v, want %v", got.DryRunResults, tc.want) + }) + } +} diff --git a/go/vt/vtctl/workflow/switcher_dry_run.go b/go/vt/vtctl/workflow/switcher_dry_run.go index 03faa4c4ca2..b8b1369bdf7 100644 --- a/go/vt/vtctl/workflow/switcher_dry_run.go +++ b/go/vt/vtctl/workflow/switcher_dry_run.go @@ -82,6 +82,7 @@ func (dr *switcherDryRun) switchShardReads(ctx context.Context, cells []string, for _, target := range dr.ts.Targets() { targetShards = append(targetShards, target.GetShard().ShardName()) } + // Sort the slices for deterministic output. 
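The dry-run hunks in this file repeatedly apply the same fix: Go map iteration order is randomized, so wherever the logger walks dr.ts.Sources(), dr.ts.Targets(), or sm.Streams(), the keys or values are first copied into a slice and sorted before being logged. A minimal, self-contained sketch of that pattern follows; the target struct, its Uid field, and the shard keys are placeholders rather than this package's real types.

package main

import (
	"fmt"
	"sort"

	"golang.org/x/exp/maps"
)

type target struct{ Uid uint32 }

func main() {
	targetsByShard := map[string]*target{"80-": {Uid: 210}, "-80": {Uid: 200}}
	// maps.Values returns the values in unspecified order; sort by tablet UID so
	// repeated dry runs always log the tablets in the same order.
	targets := maps.Values(targetsByShard)
	sort.Slice(targets, func(i, j int) bool { return targets[i].Uid < targets[j].Uid })
	for _, t := range targets {
		fmt.Printf("tablet:%d\n", t.Uid) // always 200, then 210
	}
}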
sort.Strings(sourceShards) sort.Strings(targetShards) if direction == DirectionForward { @@ -103,6 +104,7 @@ func (dr *switcherDryRun) switchTableReads(ctx context.Context, cells []string, for _, servedType := range servedTypes { tabletTypes = append(tabletTypes, servedType.String()) } + sort.Strings(dr.ts.Tables()) // For deterministic output tables := strings.Join(dr.ts.Tables(), ",") dr.drLog.Logf("Switch reads for tables [%s] to keyspace %s for tablet types [%s]", tables, ks, strings.Join(tabletTypes, ",")) dr.drLog.Logf("Routing rules for tables [%s] will be updated", tables) @@ -114,6 +116,7 @@ func (dr *switcherDryRun) switchTableReads(ctx context.Context, cells []string, func (dr *switcherDryRun) createJournals(ctx context.Context, sourceWorkflows []string) error { dr.drLog.Log("Create journal entries on source databases") + sort.Strings(sourceWorkflows) // For deterministic output if len(sourceWorkflows) > 0 { dr.drLog.Logf("Source workflows found: [%s]", strings.Join(sourceWorkflows, ",")) } @@ -121,6 +124,7 @@ func (dr *switcherDryRun) createJournals(ctx context.Context, sourceWorkflows [] } func (dr *switcherDryRun) allowTargetWrites(ctx context.Context) error { + sort.Strings(dr.ts.Tables()) // For deterministic output dr.drLog.Logf("Enable writes on keyspace %s for tables [%s]", dr.ts.TargetKeyspaceName(), strings.Join(dr.ts.Tables(), ",")) return nil } @@ -129,16 +133,27 @@ func (dr *switcherDryRun) changeRouting(ctx context.Context) error { dr.drLog.Logf("Switch routing from keyspace %s to keyspace %s", dr.ts.SourceKeyspaceName(), dr.ts.TargetKeyspaceName()) var deleteLogs, addLogs []string if dr.ts.MigrationType() == binlogdatapb.MigrationType_TABLES { + sort.Strings(dr.ts.Tables()) // For deterministic output tables := strings.Join(dr.ts.Tables(), ",") dr.drLog.Logf("Routing rules for tables [%s] will be updated", tables) return nil } deleteLogs = nil addLogs = nil - for _, source := range dr.ts.Sources() { + sources := maps.Values(dr.ts.Sources()) + // Sort the slice for deterministic output. + sort.Slice(sources, func(i, j int) bool { + return sources[i].GetPrimary().Alias.Uid < sources[j].GetPrimary().Alias.Uid + }) + for _, source := range sources { deleteLogs = append(deleteLogs, fmt.Sprintf("shard:%s;tablet:%d", source.GetShard().ShardName(), source.GetShard().PrimaryAlias.Uid)) } - for _, target := range dr.ts.Targets() { + targets := maps.Values(dr.ts.Targets()) + // Sort the slice for deterministic output. + sort.Slice(targets, func(i, j int) bool { + return targets[i].GetPrimary().Alias.Uid < targets[j].GetPrimary().Alias.Uid + }) + for _, target := range targets { addLogs = append(addLogs, fmt.Sprintf("shard:%s;tablet:%d", target.GetShard().ShardName(), target.GetShard().PrimaryAlias.Uid)) } if len(deleteLogs) > 0 { @@ -150,7 +165,12 @@ func (dr *switcherDryRun) changeRouting(ctx context.Context) error { func (dr *switcherDryRun) streamMigraterfinalize(ctx context.Context, ts *trafficSwitcher, workflows []string) error { logs := make([]string, 0) - for _, t := range ts.Targets() { + targets := maps.Values(ts.Targets()) + // Sort the slice for deterministic output. 
+ sort.Slice(targets, func(i, j int) bool { + return targets[i].GetPrimary().Alias.Uid < targets[j].GetPrimary().Alias.Uid + }) + for _, t := range targets { logs = append(logs, fmt.Sprintf("tablet:%d", t.GetPrimary().Alias.Uid)) } dr.drLog.Logf("Switch writes completed, freeze and delete vreplication streams on: [%s]", strings.Join(logs, ",")) @@ -159,7 +179,12 @@ func (dr *switcherDryRun) streamMigraterfinalize(ctx context.Context, ts *traffi func (dr *switcherDryRun) startReverseVReplication(ctx context.Context) error { logs := make([]string, 0) - for _, t := range dr.ts.Sources() { + sources := maps.Values(dr.ts.Sources()) + // Sort the slice for deterministic output. + sort.Slice(sources, func(i, j int) bool { + return sources[i].GetPrimary().Alias.Uid < sources[j].GetPrimary().Alias.Uid + }) + for _, t := range sources { logs = append(logs, fmt.Sprintf("tablet:%d", t.GetPrimary().Alias.Uid)) } dr.drLog.Logf("Start reverse vreplication streams on: [%s]", strings.Join(logs, ",")) @@ -180,17 +205,33 @@ func (dr *switcherDryRun) migrateStreams(ctx context.Context, sm *StreamMigrator logs := make([]string, 0) dr.drLog.Logf("Migrate streams to %s:", dr.ts.TargetKeyspaceName()) - for key, streams := range sm.Streams() { - for _, stream := range streams { - logs = append(logs, fmt.Sprintf("shard:%s;id:%d;workflow:%s;position:%s;binlogsource:%v", key, stream.ID, stream.Workflow, replication.EncodePosition(stream.Position), stream.BinlogSource)) + allStreams := sm.Streams() + // Sort the keys and slices for deterministic output. + shards := maps.Keys(sm.Streams()) + sort.Strings(shards) + for _, shard := range shards { + shardStreams := allStreams[shard] + sort.Slice(shardStreams, func(i, j int) bool { + return shardStreams[i].ID < shardStreams[j].ID + }) + for _, stream := range shardStreams { + logs = append(logs, fmt.Sprintf("shard:%s;id:%d;workflow:%s;position:%s;binlogsource:%v", shard, stream.ID, stream.Workflow, replication.EncodePosition(stream.Position), stream.BinlogSource)) } } if len(logs) > 0 { dr.drLog.Logf("Migrate source streams: [%s]", strings.Join(logs, ",")) logs = nil } - for _, target := range dr.ts.Targets() { + // Sort the keys and slices for deterministic output. + targets := maps.Values(dr.ts.Targets()) + sort.Slice(targets, func(i, j int) bool { + return targets[i].GetPrimary().Alias.Uid < targets[j].GetPrimary().Alias.Uid + }) + for _, target := range targets { tabletStreams := templates + sort.Slice(tabletStreams, func(i, j int) bool { + return tabletStreams[i].ID < tabletStreams[j].ID + }) for _, vrs := range tabletStreams { logs = append(logs, fmt.Sprintf("keyspace:%s;shard:%s;tablet:%d;workflow:%s;id:%d,position:%v;binlogsource:%s", vrs.BinlogSource.Keyspace, vrs.BinlogSource.Shard, target.GetPrimary().Alias.Uid, vrs.Workflow, vrs.ID, replication.EncodePosition(vrs.Position), vrs.BinlogSource)) @@ -209,10 +250,16 @@ func (dr *switcherDryRun) waitForCatchup(ctx context.Context, filteredReplicatio func (dr *switcherDryRun) stopSourceWrites(ctx context.Context) error { logs := make([]string, 0) - for _, source := range dr.ts.Sources() { + sources := maps.Values(dr.ts.Sources()) + // Sort the slice for deterministic output. 
+ sort.Slice(sources, func(i, j int) bool { + return sources[i].GetPrimary().Alias.Uid < sources[j].GetPrimary().Alias.Uid + }) + for _, source := range sources { position, _ := dr.ts.TabletManagerClient().PrimaryPosition(ctx, source.GetPrimary().Tablet) logs = append(logs, fmt.Sprintf("keyspace:%s;shard:%s;position:%s", dr.ts.SourceKeyspaceName(), source.GetShard().ShardName(), position)) } + sort.Strings(dr.ts.Tables()) // For deterministic output if len(logs) > 0 { dr.drLog.Logf("Stop writes on keyspace %s for tables [%s]: [%s]", dr.ts.SourceKeyspaceName(), strings.Join(dr.ts.Tables(), ","), strings.Join(logs, ",")) @@ -222,8 +269,16 @@ func (dr *switcherDryRun) stopSourceWrites(ctx context.Context) error { func (dr *switcherDryRun) stopStreams(ctx context.Context, sm *StreamMigrator) ([]string, error) { logs := make([]string, 0) - for _, streams := range sm.Streams() { - for _, stream := range streams { + allStreams := sm.Streams() + // Sort the keys and slices for deterministic output. + shards := maps.Keys(sm.Streams()) + sort.Strings(shards) + for _, shard := range shards { + shardStreams := allStreams[shard] + sort.Slice(shardStreams, func(i, j int) bool { + return shardStreams[i].ID < shardStreams[j].ID + }) + for _, stream := range shardStreams { logs = append(logs, fmt.Sprintf("id:%d;keyspace:%s;shard:%s;rules:%s;position:%v", stream.ID, stream.BinlogSource.Keyspace, stream.BinlogSource.Shard, stream.BinlogSource.Filter, stream.Position)) } @@ -247,7 +302,13 @@ func (dr *switcherDryRun) lockKeyspace(ctx context.Context, keyspace, _ string) func (dr *switcherDryRun) removeSourceTables(ctx context.Context, removalType TableRemovalType) error { logs := make([]string, 0) - for _, source := range dr.ts.Sources() { + sort.Strings(dr.ts.Tables()) // For deterministic output + sources := maps.Values(dr.ts.Sources()) + // Sort the slice for deterministic output. + sort.Slice(sources, func(i, j int) bool { + return sources[i].GetPrimary().Alias.Uid < sources[j].GetPrimary().Alias.Uid + }) + for _, source := range sources { for _, tableName := range dr.ts.Tables() { logs = append(logs, fmt.Sprintf("keyspace:%s;shard:%s;dbname:%s;tablet:%d;table:%s", source.GetPrimary().Keyspace, source.GetPrimary().Shard, source.GetPrimary().DbName(), source.GetPrimary().Alias.Uid, tableName)) @@ -267,7 +328,12 @@ func (dr *switcherDryRun) removeSourceTables(ctx context.Context, removalType Ta func (dr *switcherDryRun) dropSourceShards(ctx context.Context) error { logs := make([]string, 0) tabletsList := make(map[string][]string) - for _, si := range dr.ts.SourceShards() { + // Sort the slice for deterministic output. 
+ sourceShards := dr.ts.SourceShards() + sort.Slice(sourceShards, func(i, j int) bool { + return sourceShards[i].PrimaryAlias.Uid < sourceShards[j].PrimaryAlias.Uid + }) + for _, si := range sourceShards { tabletAliases, err := dr.ts.TopoServer().FindAllTabletAliasesInShard(ctx, si.Keyspace(), si.ShardName()) if err != nil { return err @@ -276,7 +342,7 @@ func (dr *switcherDryRun) dropSourceShards(ctx context.Context) error { for _, t := range tabletAliases { tabletsList[si.ShardName()] = append(tabletsList[si.ShardName()], fmt.Sprintf("%d", t.Uid)) } - sort.Strings(tabletsList[si.ShardName()]) + sort.Strings(tabletsList[si.ShardName()]) // For deterministic output logs = append(logs, fmt.Sprintf("cell:%s;keyspace:%s;shards:[%s]", si.Shard.PrimaryAlias.Cell, si.Keyspace(), si.ShardName()), strings.Join(tabletsList[si.ShardName()], ",")) } @@ -293,7 +359,12 @@ func (dr *switcherDryRun) validateWorkflowHasCompleted(ctx context.Context) erro func (dr *switcherDryRun) dropTargetVReplicationStreams(ctx context.Context) error { logs := make([]string, 0) - for _, t := range dr.ts.Targets() { + // Sort the keys and slices for deterministic output. + targets := maps.Values(dr.ts.Targets()) + sort.Slice(targets, func(i, j int) bool { + return targets[i].GetPrimary().Alias.Uid < targets[j].GetPrimary().Alias.Uid + }) + for _, t := range targets { logs = append(logs, fmt.Sprintf("keyspace:%s;shard:%s;workflow:%s;dbname:%s;tablet:%d", t.GetShard().Keyspace(), t.GetShard().ShardName(), dr.ts.WorkflowName(), t.GetPrimary().DbName(), t.GetPrimary().Alias.Uid)) } @@ -303,7 +374,12 @@ func (dr *switcherDryRun) dropTargetVReplicationStreams(ctx context.Context) err func (dr *switcherDryRun) dropSourceReverseVReplicationStreams(ctx context.Context) error { logs := make([]string, 0) - for _, t := range dr.ts.Sources() { + sources := maps.Values(dr.ts.Sources()) + // Sort the slice for deterministic output. + sort.Slice(sources, func(i, j int) bool { + return sources[i].GetPrimary().Alias.Uid < sources[j].GetPrimary().Alias.Uid + }) + for _, t := range sources { logs = append(logs, fmt.Sprintf("keyspace:%s;shard:%s;workflow:%s;dbname:%s;tablet:%d", t.GetShard().Keyspace(), t.GetShard().ShardName(), ReverseWorkflowName(dr.ts.WorkflowName()), t.GetPrimary().DbName(), t.GetPrimary().Alias.Uid)) } @@ -313,7 +389,12 @@ func (dr *switcherDryRun) dropSourceReverseVReplicationStreams(ctx context.Conte func (dr *switcherDryRun) freezeTargetVReplication(ctx context.Context) error { logs := make([]string, 0) - for _, target := range dr.ts.Targets() { + // Sort the keys and slices for deterministic output. + targets := maps.Values(dr.ts.Targets()) + sort.Slice(targets, func(i, j int) bool { + return targets[i].GetPrimary().Alias.Uid < targets[j].GetPrimary().Alias.Uid + }) + for _, target := range targets { logs = append(logs, fmt.Sprintf("keyspace:%s;shard:%s;tablet:%d;workflow:%s;dbname:%s", target.GetPrimary().Keyspace, target.GetPrimary().Shard, target.GetPrimary().Alias.Uid, dr.ts.WorkflowName(), target.GetPrimary().DbName())) } @@ -325,7 +406,12 @@ func (dr *switcherDryRun) freezeTargetVReplication(ctx context.Context) error { func (dr *switcherDryRun) dropSourceDeniedTables(ctx context.Context) error { logs := make([]string, 0) - for _, si := range dr.ts.SourceShards() { + // Sort the slice for deterministic output. 
+ sourceShards := dr.ts.SourceShards() + sort.Slice(sourceShards, func(i, j int) bool { + return sourceShards[i].PrimaryAlias.Uid < sourceShards[j].PrimaryAlias.Uid + }) + for _, si := range sourceShards { logs = append(logs, fmt.Sprintf("keyspace:%s;shard:%s;tablet:%d", si.Keyspace(), si.ShardName(), si.PrimaryAlias.Uid)) } if len(logs) > 0 { @@ -336,7 +422,12 @@ func (dr *switcherDryRun) dropSourceDeniedTables(ctx context.Context) error { func (dr *switcherDryRun) dropTargetDeniedTables(ctx context.Context) error { logs := make([]string, 0) - for _, si := range dr.ts.TargetShards() { + // Sort the slice for deterministic output. + targetShards := dr.ts.TargetShards() + sort.Slice(targetShards, func(i, j int) bool { + return targetShards[i].PrimaryAlias.Uid < targetShards[j].PrimaryAlias.Uid + }) + for _, si := range targetShards { logs = append(logs, fmt.Sprintf("keyspace:%s;shard:%s;tablet:%d", si.Keyspace(), si.ShardName(), si.PrimaryAlias.Uid)) } if len(logs) > 0 { @@ -351,7 +442,13 @@ func (dr *switcherDryRun) logs() *[]string { func (dr *switcherDryRun) removeTargetTables(ctx context.Context) error { logs := make([]string, 0) - for _, target := range dr.ts.Targets() { + sort.Strings(dr.ts.Tables()) // For deterministic output + // Sort the keys and slices for deterministic output. + targets := maps.Values(dr.ts.Targets()) + sort.Slice(targets, func(i, j int) bool { + return targets[i].GetPrimary().Alias.Uid < targets[j].GetPrimary().Alias.Uid + }) + for _, target := range targets { for _, tableName := range dr.ts.Tables() { logs = append(logs, fmt.Sprintf("keyspace:%s;shard:%s;dbname:%s;tablet:%d;table:%s", target.GetPrimary().Keyspace, target.GetPrimary().Shard, target.GetPrimary().DbName(), target.GetPrimary().Alias.Uid, tableName)) @@ -367,7 +464,13 @@ func (dr *switcherDryRun) removeTargetTables(ctx context.Context) error { func (dr *switcherDryRun) dropTargetShards(ctx context.Context) error { logs := make([]string, 0) tabletsList := make(map[string][]string) - for _, si := range dr.ts.TargetShards() { + sort.Strings(dr.ts.Tables()) // For deterministic output + // Sort the slice for deterministic output. + targetShards := dr.ts.TargetShards() + sort.Slice(targetShards, func(i, j int) bool { + return targetShards[i].PrimaryAlias.Uid < targetShards[j].PrimaryAlias.Uid + }) + for _, si := range targetShards { tabletAliases, err := dr.ts.TopoServer().FindAllTabletAliasesInShard(ctx, si.Keyspace(), si.ShardName()) if err != nil { return err @@ -376,7 +479,7 @@ func (dr *switcherDryRun) dropTargetShards(ctx context.Context) error { for _, t := range tabletAliases { tabletsList[si.ShardName()] = append(tabletsList[si.ShardName()], fmt.Sprintf("%d", t.Uid)) } - sort.Strings(tabletsList[si.ShardName()]) + sort.Strings(tabletsList[si.ShardName()]) // For deterministic output logs = append(logs, fmt.Sprintf("cell:%s;keyspace:%s;shards:[%s]", si.Shard.PrimaryAlias.Cell, si.Keyspace(), si.ShardName()), strings.Join(tabletsList[si.ShardName()], ",")) } @@ -401,6 +504,7 @@ func (dr *switcherDryRun) resetSequences(ctx context.Context) error { } func (dr *switcherDryRun) initializeTargetSequences(ctx context.Context, sequencesByBackingTable map[string]*sequenceMetadata) error { + // Sort keys for deterministic output. 
sortedBackingTableNames := maps.Keys(sequencesByBackingTable) slices.Sort(sortedBackingTableNames) dr.drLog.Log(fmt.Sprintf("The following sequence backing tables used by tables being moved will be initialized: %s", diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index 9ea1c8b609b..42f097f35b0 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -26,12 +26,11 @@ import ( "sync" "time" - vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" - "golang.org/x/exp/maps" "golang.org/x/sync/errgroup" "vitess.io/vitess/go/json2" + "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/sqlescape" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/binlog/binlogplayer" @@ -53,6 +52,7 @@ import ( tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" vschemapb "vitess.io/vitess/go/vt/proto/vschema" + vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) @@ -104,6 +104,13 @@ const ( // TrafficSwitchDirection specifies the switching direction. type TrafficSwitchDirection int +func (tsd TrafficSwitchDirection) String() string { + if tsd == DirectionForward { + return "forward" + } + return "backward" +} + // TableRemovalType specifies the way the a table will be removed during a // DropSource for a MoveTables workflow. type TableRemovalType int @@ -428,6 +435,10 @@ func (ts *trafficSwitcher) deleteShardRoutingRules(ctx context.Context) error { } srr, err := topotools.GetShardRoutingRules(ctx, ts.TopoServer()) if err != nil { + if topo.IsErrType(err, topo.NoNode) { + log.Warningf("No shard routing rules found when attempting to delete the ones for the %s keyspace", ts.targetKeyspace) + return nil + } return err } for _, si := range ts.TargetShards() { @@ -520,14 +531,14 @@ func (ts *trafficSwitcher) removeSourceTables(ctx context.Context, removalType T query := fmt.Sprintf("drop table %s.%s", primaryDbName, tableNameEscaped) if removalType == DropTable { ts.Logger().Infof("%s: Dropping table %s.%s\n", - source.GetPrimary().String(), source.GetPrimary().DbName(), tableName) + topoproto.TabletAliasString(source.GetPrimary().GetAlias()), source.GetPrimary().DbName(), tableName) } else { renameName, err := sqlescape.EnsureEscaped(getRenameFileName(tableName)) if err != nil { return err } ts.Logger().Infof("%s: Renaming table %s.%s to %s.%s\n", - source.GetPrimary().String(), source.GetPrimary().DbName(), tableName, source.GetPrimary().DbName(), renameName) + topoproto.TabletAliasString(source.GetPrimary().GetAlias()), source.GetPrimary().DbName(), tableName, source.GetPrimary().DbName(), renameName) query = fmt.Sprintf("rename table %s.%s TO %s.%s", primaryDbName, tableNameEscaped, primaryDbName, renameName) } _, err = ts.ws.tmc.ExecuteFetchAsDba(ctx, source.GetPrimary().Tablet, false, &tabletmanagerdatapb.ExecuteFetchAsDbaRequest{ @@ -537,10 +548,14 @@ func (ts *trafficSwitcher) removeSourceTables(ctx context.Context, removalType T DisableForeignKeyChecks: true, }) if err != nil { - ts.Logger().Errorf("%s: Error removing table %s: %v", source.GetPrimary().String(), tableName, err) + if mysqlErr, ok := err.(*sqlerror.SQLError); ok && mysqlErr.Num == sqlerror.ERNoSuchTable { + ts.Logger().Warningf("%s: Table %s did not exist when attempting to remove it", topoproto.TabletAliasString(source.GetPrimary().GetAlias()), tableName) + return nil + } + ts.Logger().Errorf("%s: Error removing table %s: %v", 
topoproto.TabletAliasString(source.GetPrimary().GetAlias()), tableName, err) return err } - ts.Logger().Infof("%s: Removed table %s.%s\n", source.GetPrimary().String(), source.GetPrimary().DbName(), tableName) + ts.Logger().Infof("%s: Removed table %s.%s\n", topoproto.TabletAliasString(source.GetPrimary().GetAlias()), source.GetPrimary().DbName(), tableName) } return nil @@ -598,7 +613,7 @@ func (ts *trafficSwitcher) switchShardReads(ctx context.Context, cells []string, } func (ts *trafficSwitcher) switchTableReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, rebuildSrvVSchema bool, direction TrafficSwitchDirection) error { - log.Infof("switchTableReads: cells: %s, tablet types: %+v, direction %d", strings.Join(cells, ","), servedTypes, direction) + log.Infof("switchTableReads: cells: %s, tablet types: %+v, direction: %s", strings.Join(cells, ","), servedTypes, direction) rules, err := topotools.GetRoutingRules(ctx, ts.TopoServer()) if err != nil { return err @@ -615,11 +630,6 @@ func (ts *trafficSwitcher) switchTableReads(ctx context.Context, cells []string, tt := strings.ToLower(servedType.String()) for _, table := range ts.Tables() { - if direction == DirectionForward { - log.Infof("Route direction forward") - } else { - log.Infof("Route direction backwards") - } toTarget := []string{ts.TargetKeyspaceName() + "." + table} rules[table+"@"+tt] = toTarget rules[ts.TargetKeyspaceName()+"."+table+"@"+tt] = toTarget @@ -639,7 +649,7 @@ func (ts *trafficSwitcher) startReverseVReplication(ctx context.Context) error { return ts.ForAllSources(func(source *MigrationSource) error { query := fmt.Sprintf("update _vt.vreplication set state='Running', message='' where db_name=%s and workflow=%s", encodeString(source.GetPrimary().DbName()), encodeString(ts.ReverseWorkflowName())) - _, err := ts.VReplicationExec(ctx, source.GetPrimary().Alias, query) + _, err := ts.VReplicationExec(ctx, source.GetPrimary().GetAlias(), query) return err }) } @@ -704,25 +714,11 @@ func (ts *trafficSwitcher) changeShardsAccess(ctx context.Context, keyspace stri func (ts *trafficSwitcher) allowTargetWrites(ctx context.Context) error { if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { - return ts.allowTableTargetWrites(ctx) + return ts.switchDeniedTables(ctx) } return ts.changeShardsAccess(ctx, ts.TargetKeyspaceName(), ts.TargetShards(), allowWrites) } -func (ts *trafficSwitcher) allowTableTargetWrites(ctx context.Context) error { - return ts.ForAllTargets(func(target *MigrationTarget) error { - if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.TargetKeyspaceName(), target.GetShard().ShardName(), func(si *topo.ShardInfo) error { - return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, true, ts.Tables()) - }); err != nil { - return err - } - rtbsCtx, cancel := context.WithTimeout(ctx, shardTabletRefreshTimeout) - defer cancel() - _, _, err := topotools.RefreshTabletsByShard(rtbsCtx, ts.TopoServer(), ts.TabletManagerClient(), target.GetShard(), nil, ts.Logger()) - return err - }) -} - func (ts *trafficSwitcher) changeRouting(ctx context.Context) error { if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { return ts.changeWriteRoute(ctx) @@ -833,6 +829,7 @@ func (ts *trafficSwitcher) deleteReverseVReplication(ctx context.Context) error return ts.ForAllSources(func(source *MigrationSource) error { query := fmt.Sprintf(sqlDeleteWorkflow, encodeString(source.GetPrimary().DbName()), encodeString(ts.reverseWorkflow)) if _, err := 
ts.TabletManagerClient().VReplicationExec(ctx, source.GetPrimary().Tablet, query); err != nil { + // vreplication.exec returns no error on delete if the rows do not exist. return err } ts.ws.deleteWorkflowVDiffData(ctx, source.GetPrimary().Tablet, ts.reverseWorkflow) @@ -926,8 +923,8 @@ func (ts *trafficSwitcher) createReverseVReplication(ctx context.Context) error }) } log.Infof("Creating reverse workflow vreplication stream on tablet %s: workflow %s, startPos %s", - source.GetPrimary().Alias, ts.ReverseWorkflowName(), target.Position) - _, err = ts.VReplicationExec(ctx, source.GetPrimary().Alias, + source.GetPrimary().GetAlias(), ts.ReverseWorkflowName(), target.Position) + _, err = ts.VReplicationExec(ctx, source.GetPrimary().GetAlias(), binlogplayer.CreateVReplicationState(ts.ReverseWorkflowName(), reverseBls, target.Position, binlogdatapb.VReplicationWorkflowState_Stopped, source.GetPrimary().DbName(), ts.workflowType, ts.workflowSubType)) if err != nil { @@ -939,11 +936,11 @@ func (ts *trafficSwitcher) createReverseVReplication(ctx context.Context) error if err != nil { return err } - updateQuery := ts.getReverseVReplicationUpdateQuery(target.GetPrimary().Alias.Cell, - source.GetPrimary().Alias.Cell, source.GetPrimary().DbName(), string(optionsJSON)) + updateQuery := ts.getReverseVReplicationUpdateQuery(target.GetPrimary().GetAlias().GetCell(), + source.GetPrimary().GetAlias().GetCell(), source.GetPrimary().DbName(), string(optionsJSON)) if updateQuery != "" { - log.Infof("Updating vreplication stream entry on %s with: %s", source.GetPrimary().Alias, updateQuery) - _, err = ts.VReplicationExec(ctx, source.GetPrimary().Alias, updateQuery) + log.Infof("Updating vreplication stream entry on %s with: %s", source.GetPrimary().GetAlias(), updateQuery) + _, err = ts.VReplicationExec(ctx, source.GetPrimary().GetAlias(), updateQuery) return err } return nil @@ -984,7 +981,7 @@ func (ts *trafficSwitcher) waitForCatchup(ctx context.Context, filteredReplicati // Source writes have been stopped, wait for all streams on targets to catch up. 
if err := ts.ForAllUIDs(func(target *MigrationTarget, uid int32) error { ts.Logger().Infof("Before Catchup: uid: %d, target primary %s, target position %s, shard %s", uid, - target.GetPrimary().AliasString(), target.Position, target.GetShard().String()) + topoproto.TabletAliasString(target.GetPrimary().GetAlias()), target.Position, target.GetShard().String()) bls := target.Sources[uid] source := ts.Sources()[bls.Shard] ts.Logger().Infof("Before Catchup: waiting for keyspace:shard: %v:%v to reach source position %v, uid %d", @@ -997,7 +994,7 @@ func (ts *trafficSwitcher) waitForCatchup(ctx context.Context, filteredReplicati ts.Logger().Infof("After catchup: position for keyspace:shard: %v:%v reached, uid %d", ts.TargetKeyspaceName(), target.GetShard().ShardName(), uid) if _, err := ts.TabletManagerClient().VReplicationExec(ctx, target.GetPrimary().Tablet, binlogplayer.StopVReplication(uid, "stopped for cutover")); err != nil { - log.Infof("Error marking stopped for cutover on %s, uid %d", target.GetPrimary().AliasString(), uid) + log.Infof("Error marking stopped for cutover on %s, uid %d", topoproto.TabletAliasString(target.GetPrimary().GetAlias()), uid) return err } return nil @@ -1008,7 +1005,7 @@ func (ts *trafficSwitcher) waitForCatchup(ctx context.Context, filteredReplicati return ts.ForAllTargets(func(target *MigrationTarget) error { var err error target.Position, err = ts.TabletManagerClient().PrimaryPosition(ctx, target.GetPrimary().Tablet) - ts.Logger().Infof("After catchup, position for target primary %s, %v", target.GetPrimary().AliasString(), target.Position) + ts.Logger().Infof("After catchup, position for target primary %s, %v", topoproto.TabletAliasString(target.GetPrimary().GetAlias()), target.Position) return err }) } @@ -1016,7 +1013,7 @@ func (ts *trafficSwitcher) waitForCatchup(ctx context.Context, filteredReplicati func (ts *trafficSwitcher) stopSourceWrites(ctx context.Context) error { var err error if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { - err = ts.changeTableSourceWrites(ctx, disallowWrites) + err = ts.switchDeniedTables(ctx) } else { err = ts.changeShardsAccess(ctx, ts.SourceKeyspaceName(), ts.SourceShards(), disallowWrites) } @@ -1036,36 +1033,60 @@ func (ts *trafficSwitcher) stopSourceWrites(ctx context.Context) error { }) } -func (ts *trafficSwitcher) changeTableSourceWrites(ctx context.Context, access accessType) error { - err := ts.ForAllSources(func(source *MigrationSource) error { - if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.SourceKeyspaceName(), source.GetShard().ShardName(), func(si *topo.ShardInfo) error { - return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, access == allowWrites /* remove */, ts.Tables()) - }); err != nil { +// switchDeniedTables switches the denied tables rules for the traffic switch. +// They are removed on the source side and added on the target side. 
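The new switchDeniedTables that follows fans the source-side and target-side updates out concurrently. A stripped-down sketch of the errgroup pattern it relies on, with a placeholder update function standing in for the real topo writes and tablet refreshes:

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// updateDeniedTables stands in for the per-keyspace work done by the real code:
// update the shard records in the topo server, then refresh the shard's tablets.
func updateDeniedTables(ctx context.Context, side string) error {
	if err := ctx.Err(); err != nil {
		return err // the other goroutine failed and the group context was canceled
	}
	fmt.Println("updated denied tables on the", side, "side")
	return nil
}

func main() {
	egrp, ectx := errgroup.WithContext(context.Background())
	egrp.Go(func() error { return updateDeniedTables(ectx, "source") })
	egrp.Go(func() error { return updateDeniedTables(ectx, "target") })
	if err := egrp.Wait(); err != nil { // Wait returns the first non-nil error
		fmt.Println("switching denied tables failed:", err)
	}
}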
+func (ts *trafficSwitcher) switchDeniedTables(ctx context.Context) error { + if ts.MigrationType() != binlogdatapb.MigrationType_TABLES { + return nil + } + + egrp, ectx := errgroup.WithContext(ctx) + egrp.Go(func() error { + return ts.ForAllSources(func(source *MigrationSource) error { + if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.SourceKeyspaceName(), source.GetShard().ShardName(), func(si *topo.ShardInfo) error { + return si.UpdateDeniedTables(ectx, topodatapb.TabletType_PRIMARY, nil, false, ts.Tables()) + }); err != nil { + return err + } + rtbsCtx, cancel := context.WithTimeout(ectx, shardTabletRefreshTimeout) + defer cancel() + isPartial, partialDetails, err := topotools.RefreshTabletsByShard(rtbsCtx, ts.TopoServer(), ts.TabletManagerClient(), source.GetShard(), nil, ts.Logger()) + if isPartial { + err = fmt.Errorf("failed to successfully refresh all tablets in the %s/%s source shard (%v):\n %v", + source.GetShard().Keyspace(), source.GetShard().ShardName(), err, partialDetails) + } return err - } - rtbsCtx, cancel := context.WithTimeout(ctx, shardTabletRefreshTimeout) - defer cancel() - isPartial, partialDetails, err := topotools.RefreshTabletsByShard(rtbsCtx, ts.TopoServer(), ts.TabletManagerClient(), source.GetShard(), nil, ts.Logger()) - if isPartial { - err = fmt.Errorf("failed to successfully refresh all tablets in the %s/%s source shard (%v):\n %v", - source.GetShard().Keyspace(), source.GetShard().ShardName(), err, partialDetails) - } - return err + }) }) - if err != nil { - log.Warningf("Error in changeTableSourceWrites: %s", err) + egrp.Go(func() error { + return ts.ForAllTargets(func(target *MigrationTarget) error { + if _, err := ts.TopoServer().UpdateShardFields(ectx, ts.TargetKeyspaceName(), target.GetShard().ShardName(), func(si *topo.ShardInfo) error { + return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, true, ts.Tables()) + }); err != nil { + return err + } + rtbsCtx, cancel := context.WithTimeout(ectx, shardTabletRefreshTimeout) + defer cancel() + isPartial, partialDetails, err := topotools.RefreshTabletsByShard(rtbsCtx, ts.TopoServer(), ts.TabletManagerClient(), target.GetShard(), nil, ts.Logger()) + if isPartial { + err = fmt.Errorf("failed to successfully refresh all tablets in the %s/%s target shard (%v):\n %v", + target.GetShard().Keyspace(), target.GetShard().ShardName(), err, partialDetails) + } + return err + }) + }) + if err := egrp.Wait(); err != nil { + log.Warningf("Error in switchDeniedTables: %s", err) return err } - // Note that the denied tables, which are being updated in this method, are not part of the SrvVSchema in the topo. - // However, we are using the notification of a SrvVSchema change in VTGate to recompute the state of a - // MoveTables workflow (which also looks up denied tables from the topo). So we need to trigger a SrvVSchema change here. 
- return ts.TopoServer().RebuildSrvVSchema(ctx, nil) + + return nil } func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrator) { var err error if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { - err = ts.changeTableSourceWrites(ctx, allowWrites) + err = ts.switchDeniedTables(ctx) } else { err = ts.changeShardsAccess(ctx, ts.SourceKeyspaceName(), ts.SourceShards(), allowWrites) } @@ -1087,7 +1108,7 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat err = ts.deleteReverseVReplication(ctx) if err != nil { - ts.Logger().Errorf("Cancel migration failed: could not delete revers vreplication entries: %v", err) + ts.Logger().Errorf("Cancel migration failed: could not delete reverse vreplication streams: %v", err) } } @@ -1112,6 +1133,7 @@ func (ts *trafficSwitcher) dropTargetVReplicationStreams(ctx context.Context) er ts.Logger().Infof("Deleting target streams and related data for workflow %s db_name %s", ts.WorkflowName(), target.GetPrimary().DbName()) query := fmt.Sprintf(sqlDeleteWorkflow, encodeString(target.GetPrimary().DbName()), encodeString(ts.WorkflowName())) if _, err := ts.TabletManagerClient().VReplicationExec(ctx, target.GetPrimary().Tablet, query); err != nil { + // vreplication.exec returns no error on delete if the rows do not exist. return err } ts.ws.deleteWorkflowVDiffData(ctx, target.GetPrimary().Tablet, ts.WorkflowName()) @@ -1125,6 +1147,7 @@ func (ts *trafficSwitcher) dropSourceReverseVReplicationStreams(ctx context.Cont ts.Logger().Infof("Deleting reverse streams and related data for workflow %s db_name %s", ts.WorkflowName(), source.GetPrimary().DbName()) query := fmt.Sprintf(sqlDeleteWorkflow, encodeString(source.GetPrimary().DbName()), encodeString(ReverseWorkflowName(ts.WorkflowName()))) if _, err := ts.TabletManagerClient().VReplicationExec(ctx, source.GetPrimary().Tablet, query); err != nil { + // vreplication.exec returns no error on delete if the rows do not exist. return err } ts.ws.deleteWorkflowVDiffData(ctx, source.GetPrimary().Tablet, ReverseWorkflowName(ts.WorkflowName())) @@ -1147,7 +1170,7 @@ func (ts *trafficSwitcher) removeTargetTables(ctx context.Context) error { } query := fmt.Sprintf("drop table %s.%s", primaryDbName, tableName) ts.Logger().Infof("%s: Dropping table %s.%s\n", - target.GetPrimary().String(), target.GetPrimary().DbName(), tableName) + topoproto.TabletAliasString(target.GetPrimary().GetAlias()), target.GetPrimary().DbName(), tableName) res, err := ts.ws.tmc.ExecuteFetchAsDba(ctx, target.GetPrimary().Tablet, false, &tabletmanagerdatapb.ExecuteFetchAsDbaRequest{ Query: []byte(query), MaxRows: 1, @@ -1156,12 +1179,16 @@ func (ts *trafficSwitcher) removeTargetTables(ctx context.Context) error { }) log.Infof("Removed target table with result: %+v", res) if err != nil { - ts.Logger().Errorf("%s: Error removing table %s: %v", - target.GetPrimary().String(), tableName, err) + if mysqlErr, ok := err.(*sqlerror.SQLError); ok && mysqlErr.Num == sqlerror.ERNoSuchTable { + // The table was already gone, so we can ignore the error. 
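+ // (the MySQL error number is checked via sqlerror so that a retried removal, where the table has already been dropped, is treated as a no-op rather than a failure)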
+ ts.Logger().Warningf("%s: Table %s did not exist when attempting to remove it", topoproto.TabletAliasString(target.GetPrimary().GetAlias()), tableName) + return nil + } + ts.Logger().Errorf("%s: Error removing table %s: %v", topoproto.TabletAliasString(target.GetPrimary().GetAlias()), tableName, err) return err } ts.Logger().Infof("%s: Removed table %s.%s\n", - target.GetPrimary().String(), target.GetPrimary().DbName(), tableName) + topoproto.TabletAliasString(target.GetPrimary().GetAlias()), target.GetPrimary().DbName(), tableName) } return nil @@ -1626,7 +1653,7 @@ func (ts *trafficSwitcher) resetSequences(ctx context.Context) error { } return ts.ForAllSources(func(source *MigrationSource) error { ts.Logger().Infof("Resetting sequences for source shard %s.%s on tablet %s", - source.GetShard().Keyspace(), source.GetShard().ShardName(), source.GetPrimary().String()) + source.GetShard().Keyspace(), source.GetShard().ShardName(), topoproto.TabletAliasString(source.GetPrimary().GetAlias())) return ts.TabletManagerClient().ResetSequences(ctx, source.GetPrimary().Tablet, ts.Tables()) }) } diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go index 42e5129b40e..789319a2a53 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go @@ -412,6 +412,13 @@ func TestMoveTables(t *testing.T) { ), fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|0|Stopped||%s|1||0|0|0||0|1", vreplID, bls, position, targetKs), ), nil) + ftc.vrdbClient.ExpectRequest(fmt.Sprintf(readWorkflow, wf, tenv.dbName), sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "id|source|pos|stop_pos|max_tps|max_replication_lag|cell|tablet_types|time_updated|transaction_timestamp|state|message|db_name|rows_copied|tags|time_heartbeat|workflow_type|time_throttled|component_throttled|workflow_sub_type|defer_secondary_keys", + "int64|varchar|blob|varchar|int64|int64|varchar|varchar|int64|int64|varchar|varchar|varchar|int64|varchar|int64|int64|int64|varchar|int64|int64", + ), + fmt.Sprintf("%d|%s|%s|NULL|0|0|||1686577659|0|Stopped||%s|1||0|0|0||0|1", vreplID, bls, position, targetKs), + ), nil) ftc.vrdbClient.ExpectRequest(fmt.Sprintf(readWorkflowConfig, wf), sqltypes.MakeTestResult( sqltypes.MakeTestFields( "id|source|cell|tablet_types|state|message",