Skip to content

Commit

Permalink
When Migrate workflow is run on sharded keyspaces where the keyspace …
Browse files Browse the repository at this point in the history
…names are different, we need to scope the vindex used in the vreplication filter to the local name

Signed-off-by: Rohit Nayak <rohit@planetscale.com>
  • Loading branch information
rohit-nayak-ps committed Sep 12, 2024
1 parent 1db282a commit 6a55dcd
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 17 deletions.
84 changes: 83 additions & 1 deletion go/test/endtoend/vreplication/migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@ package vreplication

import (
"fmt"
"strings"
"testing"
"time"

"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/log"

"github.com/tidwall/gjson"

Expand Down Expand Up @@ -165,7 +170,7 @@ func TestVtctlMigrate(t *testing.T) {
// However now we need to create an external Vitess cluster. For this we need a different VTDATAROOT and
// hence the VTDATAROOT env variable gets overwritten.
// Each time we need to create vt processes in the "other" cluster we need to set the appropriate VTDATAROOT
func TestVtctldMigrate(t *testing.T) {
func TestVtctldMigrateUnsharded(t *testing.T) {
vc = NewVitessCluster(t, nil)

defaultReplicas = 0
Expand Down Expand Up @@ -299,3 +304,80 @@ func TestVtctldMigrate(t *testing.T) {
require.Errorf(t, err, "there is no vitess cluster named ext1")
})
}

// TestMigrateSharded validates a fix for a bug in Migrate workflows on sharded keyspaces where the target
// keyspace name doesn't match that of the source keyspace. The test first sets up the sharded keyspace
// customer in the main cluster (using a MoveTables workflow), then creates an "external" cluster with the
// sharded keyspace rating, mounts the main cluster's topo in it, and finally runs a Migrate workflow from
// customer to rating which is verified with a VDiff.
func TestMigrateSharded(t *testing.T) {
	setSidecarDBName("_vt")
	defaultRdonly = 1
	currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables
	vc = setupCluster(t)
	// Register the teardown right after the cluster is created so that it is cleaned up even if a later
	// setup step aborts the test. Deferred calls run in LIFO order, so registering it first also means the
	// vtgate connection below is closed before the cluster is torn down. The receiver is evaluated here, so
	// reassigning the global vc at the end of the test does not change which cluster gets torn down.
	defer vc.TearDown()
	vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
	defer vtgateConn.Close()
	setupCustomerKeyspace(t)
	createMoveTablesWorkflow(t, "customer,Lead,datze,customer2")
	tstWorkflowSwitchReadsAndWrites(t)
	tstWorkflowComplete(t)
	log.Infof("The sharded keyspace customer is setup")

	// Create the external cluster which hosts the target keyspace.
	extCell := "extcell1"
	extVc := NewVitessCluster(t, &clusterOptions{
		cells:         []string{extCell},
		clusterConfig: externalClusterConfig,
	})
	defer extVc.TearDown()

	// setupExtKeyspace already waits for both shards of the keyspace to be healthy, so there is no need
	// to wait for them again here.
	setupExtKeyspace(t, extVc, "rating", extCell)
	verifyClusterHealth(t, extVc)
	extVtgateConn := getConnection(t, extVc.ClusterConfig.hostname, extVc.ClusterConfig.vtgateMySQLPort)
	defer extVtgateConn.Close()
	log.Infof("The external keyspace rating is setup")

	currentWorkflowType = binlogdatapb.VReplicationWorkflowType_Migrate
	// Make the main cluster's topo server known to the external cluster under the name "external".
	output, err := extVc.VtctldClient.ExecuteCommandWithOutput("Mount", "register", "--name=external", "--topo-type=etcd2",
		fmt.Sprintf("--topo-server=localhost:%d", vc.ClusterConfig.topoPort), "--topo-root=/vitess/global")
	if err != nil {
		t.Fatalf("Mount command failed with %+v : %s\n", err, output)
	}

	ksWorkflow := "rating.e1"
	output, err = extVc.VtctldClient.ExecuteCommandWithOutput("Migrate",
		"--target-keyspace", "rating", "--workflow", "e1",
		"create", "--source-keyspace", "customer", "--mount-name", "external", "--all-tables", "--cells=zone1",
		"--tablet-types=primary,replica")
	if err != nil {
		t.Fatalf("Migrate command failed with %+v : %s\n", err, output)
	}
	waitForWorkflowState(t, extVc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
	// NOTE(review): presumably this pause lets the running workflow settle before the VDiff starts — confirm.
	time.Sleep(3 * time.Second)
	vc = extVc // this is because currently doVtctldclientVDiff is using the global vc :-(
	doVtctldclientVDiff(t, "rating", "e1", "zone1", nil)
}

// setupExtKeyspace creates the keyspace ksName, with the two shards -80 and 80-, in the cell cellName of
// the given cluster using the customer schema and vschema. It then waits for each shard to become healthy
// and, when replica or rdonly tablets were requested, for vtgate to report the expected number of them.
// Any failure aborts the test.
func setupExtKeyspace(t *testing.T, vc *VitessCluster, ksName, cellName string) {
	rdonly := 0
	shards := []string{"-80", "80-"}
	cell := vc.Cells[cellName]
	log.Infof("vc is %v, cell is %v", vc, cell)
	// Fail with a clear message rather than a nil pointer panic further down.
	require.NotNil(t, cell, "cell %s not found in the cluster", cellName)
	if _, err := vc.AddKeyspace(t, []*Cell{cell}, ksName, strings.Join(shards, ","),
		customerVSchema, customerSchema, defaultReplicas, rdonly, 1200, nil); err != nil {
		t.Fatal(err)
	}
	// The vtgate is looked up only after the keyspace has been added, as in the original ordering.
	vtgate := cell.Vtgates[0]
	for _, shard := range shards {
		require.NoError(t, cluster.WaitForHealthyShard(vc.VtctldClient, ksName, shard))
		if defaultReplicas > 0 {
			require.NoError(t, vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", ksName, shard), defaultReplicas, 30*time.Second))
		}
		if rdonly > 0 {
			// Wait for the number of rdonly tablets this keyspace was created with, not the global default.
			require.NoError(t, vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.rdonly", ksName, shard), rdonly, 30*time.Second))
		}
	}
}
15 changes: 14 additions & 1 deletion go/vt/vtctl/workflow/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,20 @@ func (mz *materializer) generateBinlogSources(ctx context.Context, targetShard *
for _, mappedCol := range mappedCols {
subExprs = append(subExprs, mappedCol)
}
vindexName := fmt.Sprintf("%s.%s", mz.ms.TargetKeyspace, cv.Name)
var vindexName string
if mz.workflowType == binlogdatapb.VReplicationWorkflowType_Migrate {
// For a Migrate, if the TargetKeyspace name is different from the SourceKeyspace name, we need to use the
// SourceKeyspace name to determine the vindex since the TargetKeyspace name is not known to the source.
// Note: it is expected that the source and target keyspaces have the same vindex name and data type.
keyspace := mz.ms.TargetKeyspace
if mz.ms.ExternalCluster != "" {
keyspace = mz.ms.SourceKeyspace
}
vindexName = fmt.Sprintf("%s.%s", keyspace, cv.Name)
} else {
vindexName = fmt.Sprintf("%s.%s", mz.ms.TargetKeyspace, cv.Name)
}

subExprs = append(subExprs, sqlparser.NewStrLiteral(vindexName))
subExprs = append(subExprs, sqlparser.NewStrLiteral(key.KeyRangeString(targetShard.KeyRange)))
inKeyRange := &sqlparser.FuncExpr{
Expand Down
19 changes: 6 additions & 13 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -962,7 +962,6 @@ func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowN
// Nothing left to do.
return ts, state, nil
}

var sourceKeyspace string

// We reverse writes by using the source_keyspace.workflowname_reverse workflow
Expand Down Expand Up @@ -1321,7 +1320,6 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
externalTopo *topo.Server
sourceTopo = s.ts
)

// When the source is an external cluster mounted using the Mount command.
if req.ExternalClusterName != "" {
externalTopo, err = s.ts.OpenExternalVitessClusterServer(ctx, req.ExternalClusterName)
Expand Down Expand Up @@ -1420,7 +1418,6 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
if req.DropForeignKeys {
createDDLMode = createDDLAsCopyDropForeignKeys
}

for _, table := range tables {
buf := sqlparser.NewTrackedBuffer(nil)
buf.Myprintf("select * from %v", sqlparser.NewIdentifierCS(table))
Expand Down Expand Up @@ -1462,7 +1459,6 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
return nil, err
}
sw := &switcher{s: s, ts: ts}

// When creating the workflow, locking the workflow and its target keyspace is sufficient.
lockName := fmt.Sprintf("%s/%s", ts.TargetKeyspaceName(), ts.WorkflowName())
ctx, workflowUnlock, lockErr := s.ts.LockName(ctx, lockName, "MoveTablesCreate")
Expand Down Expand Up @@ -1498,7 +1494,6 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
}
}
}()

// Now that the streams have been successfully created, let's put the associated
// routing rules and denied tables entries in place.
if externalTopo == nil {
Expand Down Expand Up @@ -1530,7 +1525,6 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
if err != nil {
return nil, err
}

migrationID, err := getMigrationID(targetKeyspace, tabletShards)
if err != nil {
return nil, err
Expand All @@ -1550,7 +1544,6 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
return nil, vterrors.New(vtrpcpb.Code_INTERNAL, msg)
}
}

if req.AutoStart {
if err := mz.startStreams(ctx); err != nil {
return nil, err
Expand Down Expand Up @@ -2129,12 +2122,10 @@ func (s *Server) WorkflowStatus(ctx context.Context, req *vtctldatapb.WorkflowSt
resp.TableCopyState[table].BytesPercentage = tableSizePct
}
}

workflow, err := s.GetWorkflow(ctx, req.Keyspace, req.Workflow, false, req.Shards)
if err != nil {
return nil, err
}

// The stream key is target keyspace/tablet alias, e.g. 0/test-0000000100.
// We sort the keys for intuitive and consistent output.
streamKeys := make([]string, 0, len(workflow.ShardStreams))
Expand Down Expand Up @@ -2186,13 +2177,16 @@ func (s *Server) WorkflowStatus(ctx context.Context, req *vtctldatapb.WorkflowSt
resp.ShardStreams[ksShard].Streams[i] = ts
}
}

return resp, nil
}

// GetCopyProgress returns the progress of all tables being copied in the
// workflow.
// GetCopyProgress returns the progress of all tables being copied in the workflow.
func (s *Server) GetCopyProgress(ctx context.Context, ts *trafficSwitcher, state *State) (*copyProgress, error) {
if ts.workflowType == binlogdatapb.VReplicationWorkflowType_Migrate {
// The logic below expects the source primaries to be in the same cluster as the target.
// For now we don't report progress for Migrate workflows.
return nil, nil
}
getTablesQuery := "select distinct table_name from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = %d"
getRowCountQuery := "select table_name, table_rows, data_length from information_schema.tables where table_schema = %s and table_name in (%s)"
tables := make(map[string]bool)
Expand Down Expand Up @@ -2734,7 +2728,6 @@ func (s *Server) buildTrafficSwitcher(ctx context.Context, targetKeyspace, workf
if err != nil {
return nil, err
}

sourceShards, targetShards := ts.getSourceAndTargetShardsNames()

ts.isPartialMigration, err = ts.isPartialMoveTables(sourceShards, targetShards)
Expand Down
14 changes: 13 additions & 1 deletion go/vt/wrangler/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1398,7 +1398,19 @@ func (mz *materializer) generateInserts(ctx context.Context, sourceShards []*top
for _, mappedCol := range mappedCols {
subExprs = append(subExprs, mappedCol)
}
vindexName := fmt.Sprintf("%s.%s", mz.ms.TargetKeyspace, cv.Name)
var vindexName string
if mz.getWorkflowType() == binlogdatapb.VReplicationWorkflowType_Migrate {
// For a Migrate, if the TargetKeyspace name is different from the SourceKeyspace name, we need to use the
// SourceKeyspace name to determine the vindex since the TargetKeyspace name is not known to the source.
// Note: it is expected that the source and target keyspaces have the same vindex name and data type.
keyspace := mz.ms.TargetKeyspace
if mz.ms.ExternalCluster != "" {
keyspace = mz.ms.SourceKeyspace
}
vindexName = fmt.Sprintf("%s.%s", keyspace, cv.Name)
} else {
vindexName = fmt.Sprintf("%s.%s", mz.ms.TargetKeyspace, cv.Name)
}
subExprs = append(subExprs, sqlparser.NewStrLiteral(vindexName))
subExprs = append(subExprs, sqlparser.NewStrLiteral("{{.keyrange}}"))
inKeyRange := &sqlparser.FuncExpr{
Expand Down
2 changes: 1 addition & 1 deletion test/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -1042,7 +1042,7 @@
},
"vreplication_materialize": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestMaterialize"],
"Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "Materialize"],
"Command": [],
"Manual": false,
"Shard": "vreplication_partial_movetables_and_materialize",
Expand Down

0 comments on commit 6a55dcd

Please sign in to comment.