diff --git a/go/test/endtoend/vreplication/migrate_test.go b/go/test/endtoend/vreplication/migrate_test.go index 591d4878150..043fffcd67a 100644 --- a/go/test/endtoend/vreplication/migrate_test.go +++ b/go/test/endtoend/vreplication/migrate_test.go @@ -20,13 +20,11 @@ import ( "fmt" "strings" "testing" - "time" - - "vitess.io/vitess/go/test/endtoend/cluster" - "vitess.io/vitess/go/vt/log" "github.com/tidwall/gjson" + "vitess.io/vitess/go/test/endtoend/cluster" + "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql" @@ -320,7 +318,6 @@ func TestMigrateSharded(t *testing.T) { createMoveTablesWorkflow(t, "customer,Lead,datze,customer2") tstWorkflowSwitchReadsAndWrites(t) tstWorkflowComplete(t) - log.Infof("The sharded keyspace customer is setup") var err error // create external cluster @@ -340,7 +337,6 @@ func TestMigrateSharded(t *testing.T) { verifyClusterHealth(t, extVc) extVtgateConn := getConnection(t, extVc.ClusterConfig.hostname, extVc.ClusterConfig.vtgateMySQLPort) defer extVtgateConn.Close() - log.Infof("The external keyspace rating is setup") currentWorkflowType = binlogdatapb.VReplicationWorkflowType_Migrate var output string @@ -356,15 +352,14 @@ func TestMigrateSharded(t *testing.T) { t.Fatalf("Migrate command failed with %+v : %s\n", err, output) } waitForWorkflowState(t, extVc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String()) - time.Sleep(3 * time.Second) - vc = extVc // this is because currently doVtctldclientVDiff is using the global vc :-( + // this is because currently doVtctldclientVDiff is using the global vc :-( and we want to run a diff on the extVc cluster + vc = extVc doVtctldclientVDiff(t, "rating", "e1", "zone1", nil) } func setupExtKeyspace(t *testing.T, vc *VitessCluster, ksName, cellName string) { rdonly := 0 shards := []string{"-80", "80-"} - log.Infof("vc is %v, cell is %v", vc, vc.Cells[cellName]) if _, err := vc.AddKeyspace(t, []*Cell{vc.Cells[cellName]}, ksName, strings.Join(shards, ","), customerVSchema, customerSchema, defaultReplicas, rdonly, 1200, nil); err != nil { t.Fatal(err) @@ -374,10 +369,10 @@ func setupExtKeyspace(t *testing.T, vc *VitessCluster, ksName, cellName string) err := cluster.WaitForHealthyShard(vc.VtctldClient, ksName, shard) require.NoError(t, err) if defaultReplicas > 0 { - require.NoError(t, vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", ksName, shard), defaultReplicas, 30*time.Second)) + require.NoError(t, vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", ksName, shard), defaultReplicas, waitTimeout)) } if rdonly > 0 { - require.NoError(t, vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.rdonly", ksName, shard), defaultRdonly, 30*time.Second)) + require.NoError(t, vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.rdonly", ksName, shard), defaultRdonly, waitTimeout)) } } } diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index f55661245c2..2b9ed8d93d7 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -962,6 +962,7 @@ func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowN // Nothing left to do. return ts, state, nil } + var sourceKeyspace string // We reverse writes by using the source_keyspace.workflowname_reverse workflow @@ -1320,6 +1321,7 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl externalTopo *topo.Server sourceTopo = s.ts ) + // When the source is an external cluster mounted using the Mount command. if req.ExternalClusterName != "" { externalTopo, err = s.ts.OpenExternalVitessClusterServer(ctx, req.ExternalClusterName) @@ -1418,6 +1420,7 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl if req.DropForeignKeys { createDDLMode = createDDLAsCopyDropForeignKeys } + for _, table := range tables { buf := sqlparser.NewTrackedBuffer(nil) buf.Myprintf("select * from %v", sqlparser.NewIdentifierCS(table)) @@ -1459,6 +1462,7 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl return nil, err } sw := &switcher{s: s, ts: ts} + // When creating the workflow, locking the workflow and its target keyspace is sufficient. lockName := fmt.Sprintf("%s/%s", ts.TargetKeyspaceName(), ts.WorkflowName()) ctx, workflowUnlock, lockErr := s.ts.LockName(ctx, lockName, "MoveTablesCreate") @@ -1494,6 +1498,7 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl } } }() + // Now that the streams have been successfully created, let's put the associated // routing rules and denied tables entries in place. if externalTopo == nil { @@ -1525,6 +1530,7 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl if err != nil { return nil, err } + migrationID, err := getMigrationID(targetKeyspace, tabletShards) if err != nil { return nil, err @@ -1544,6 +1550,7 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl return nil, vterrors.New(vtrpcpb.Code_INTERNAL, msg) } } + if req.AutoStart { if err := mz.startStreams(ctx); err != nil { return nil, err @@ -2122,6 +2129,7 @@ func (s *Server) WorkflowStatus(ctx context.Context, req *vtctldatapb.WorkflowSt resp.TableCopyState[table].BytesPercentage = tableSizePct } } + workflow, err := s.GetWorkflow(ctx, req.Keyspace, req.Workflow, false, req.Shards) if err != nil { return nil, err @@ -2177,6 +2185,7 @@ func (s *Server) WorkflowStatus(ctx context.Context, req *vtctldatapb.WorkflowSt resp.ShardStreams[ksShard].Streams[i] = ts } } + return resp, nil } @@ -2728,6 +2737,7 @@ func (s *Server) buildTrafficSwitcher(ctx context.Context, targetKeyspace, workf if err != nil { return nil, err } + sourceShards, targetShards := ts.getSourceAndTargetShardsNames() ts.isPartialMigration, err = ts.isPartialMoveTables(sourceShards, targetShards)