Skip to content

Commit

Permalink
Self-review
Browse files Browse the repository at this point in the history
Signed-off-by: Rohit Nayak <rohit@planetscale.com>
  • Loading branch information
rohit-nayak-ps committed Sep 13, 2024
1 parent 6a55dcd commit 04684af
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 11 deletions.
17 changes: 6 additions & 11 deletions go/test/endtoend/vreplication/migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,11 @@ import (
"fmt"
"strings"
"testing"
"time"

"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/log"

"github.com/tidwall/gjson"

"vitess.io/vitess/go/test/endtoend/cluster"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
Expand Down Expand Up @@ -320,7 +318,6 @@ func TestMigrateSharded(t *testing.T) {
createMoveTablesWorkflow(t, "customer,Lead,datze,customer2")
tstWorkflowSwitchReadsAndWrites(t)
tstWorkflowComplete(t)
log.Infof("The sharded keyspace customer is setup")

var err error
// create external cluster
Expand All @@ -340,7 +337,6 @@ func TestMigrateSharded(t *testing.T) {
verifyClusterHealth(t, extVc)
extVtgateConn := getConnection(t, extVc.ClusterConfig.hostname, extVc.ClusterConfig.vtgateMySQLPort)
defer extVtgateConn.Close()
log.Infof("The external keyspace rating is setup")

currentWorkflowType = binlogdatapb.VReplicationWorkflowType_Migrate
var output string
Expand All @@ -356,15 +352,14 @@ func TestMigrateSharded(t *testing.T) {
t.Fatalf("Migrate command failed with %+v : %s\n", err, output)
}
waitForWorkflowState(t, extVc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
time.Sleep(3 * time.Second)
vc = extVc // this is because currently doVtctldclientVDiff is using the global vc :-(
// this is because currently doVtctldclientVDiff is using the global vc :-( and we want to run a diff on the extVc cluster
vc = extVc
doVtctldclientVDiff(t, "rating", "e1", "zone1", nil)
}

func setupExtKeyspace(t *testing.T, vc *VitessCluster, ksName, cellName string) {
rdonly := 0
shards := []string{"-80", "80-"}
log.Infof("vc is %v, cell is %v", vc, vc.Cells[cellName])
if _, err := vc.AddKeyspace(t, []*Cell{vc.Cells[cellName]}, ksName, strings.Join(shards, ","),
customerVSchema, customerSchema, defaultReplicas, rdonly, 1200, nil); err != nil {
t.Fatal(err)
Expand All @@ -374,10 +369,10 @@ func setupExtKeyspace(t *testing.T, vc *VitessCluster, ksName, cellName string)
err := cluster.WaitForHealthyShard(vc.VtctldClient, ksName, shard)
require.NoError(t, err)
if defaultReplicas > 0 {
require.NoError(t, vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", ksName, shard), defaultReplicas, 30*time.Second))
require.NoError(t, vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", ksName, shard), defaultReplicas, waitTimeout))
}
if rdonly > 0 {
require.NoError(t, vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.rdonly", ksName, shard), defaultRdonly, 30*time.Second))
require.NoError(t, vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.rdonly", ksName, shard), defaultRdonly, waitTimeout))
}
}
}
10 changes: 10 additions & 0 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -962,6 +962,7 @@ func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowN
// Nothing left to do.
return ts, state, nil
}

var sourceKeyspace string

// We reverse writes by using the source_keyspace.workflowname_reverse workflow
Expand Down Expand Up @@ -1320,6 +1321,7 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
externalTopo *topo.Server
sourceTopo = s.ts
)

// When the source is an external cluster mounted using the Mount command.
if req.ExternalClusterName != "" {
externalTopo, err = s.ts.OpenExternalVitessClusterServer(ctx, req.ExternalClusterName)
Expand Down Expand Up @@ -1418,6 +1420,7 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
if req.DropForeignKeys {
createDDLMode = createDDLAsCopyDropForeignKeys
}

for _, table := range tables {
buf := sqlparser.NewTrackedBuffer(nil)
buf.Myprintf("select * from %v", sqlparser.NewIdentifierCS(table))
Expand Down Expand Up @@ -1459,6 +1462,7 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
return nil, err
}
sw := &switcher{s: s, ts: ts}

// When creating the workflow, locking the workflow and its target keyspace is sufficient.
lockName := fmt.Sprintf("%s/%s", ts.TargetKeyspaceName(), ts.WorkflowName())
ctx, workflowUnlock, lockErr := s.ts.LockName(ctx, lockName, "MoveTablesCreate")
Expand Down Expand Up @@ -1494,6 +1498,7 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
}
}
}()

// Now that the streams have been successfully created, let's put the associated
// routing rules and denied tables entries in place.
if externalTopo == nil {
Expand Down Expand Up @@ -1525,6 +1530,7 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
if err != nil {
return nil, err
}

migrationID, err := getMigrationID(targetKeyspace, tabletShards)
if err != nil {
return nil, err
Expand All @@ -1544,6 +1550,7 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
return nil, vterrors.New(vtrpcpb.Code_INTERNAL, msg)
}
}

if req.AutoStart {
if err := mz.startStreams(ctx); err != nil {
return nil, err
Expand Down Expand Up @@ -2122,6 +2129,7 @@ func (s *Server) WorkflowStatus(ctx context.Context, req *vtctldatapb.WorkflowSt
resp.TableCopyState[table].BytesPercentage = tableSizePct
}
}

workflow, err := s.GetWorkflow(ctx, req.Keyspace, req.Workflow, false, req.Shards)
if err != nil {
return nil, err
Expand Down Expand Up @@ -2177,6 +2185,7 @@ func (s *Server) WorkflowStatus(ctx context.Context, req *vtctldatapb.WorkflowSt
resp.ShardStreams[ksShard].Streams[i] = ts
}
}

return resp, nil
}

Expand Down Expand Up @@ -2728,6 +2737,7 @@ func (s *Server) buildTrafficSwitcher(ctx context.Context, targetKeyspace, workf
if err != nil {
return nil, err
}

sourceShards, targetShards := ts.getSourceAndTargetShardsNames()

ts.isPartialMigration, err = ts.isPartialMoveTables(sourceShards, targetShards)
Expand Down

0 comments on commit 04684af

Please sign in to comment.