Skip to content

Commit

Permalink
VDiff: properly split cell values in record when using TabletPicker (#…
Browse files Browse the repository at this point in the history
…14099)

Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord authored Sep 27, 2023
1 parent b089f78 commit 2f679aa
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 37 deletions.
1 change: 1 addition & 0 deletions go/cmd/vtctldclient/command/vreplication/vdiff/vdiff.go
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,7 @@ func registerVDiffCommands(root *cobra.Command) {
create.Flags().StringSliceVar(&createOptions.SourceCells, "source-cells", nil, "The source cell(s) to compare from; default is any available cell.")
create.Flags().StringSliceVar(&createOptions.TargetCells, "target-cells", nil, "The target cell(s) to compare with; default is any available cell.")
create.Flags().Var((*topoprotopb.TabletTypeListFlag)(&createOptions.TabletTypes), "tablet-types", "Tablet types to use on the source and target.")
create.Flags().BoolVar(&common.CreateOptions.TabletTypesInPreferenceOrder, "tablet-types-in-preference-order", true, "When performing source tablet selection, look for candidates in the type order as they are listed in the tablet-types flag.")
create.Flags().DurationVar(&createOptions.FilteredReplicationWaitTime, "filtered-replication-wait-time", 30*time.Second, "Specifies the maximum time to wait, in seconds, for replication to catch up when syncing tablet streams.")
create.Flags().Uint32Var(&createOptions.Limit, "limit", math.MaxUint32, "Max rows to stop comparing after.")
create.Flags().BoolVar(&createOptions.DebugQuery, "debug-query", false, "Adds a mysql query to the report that can be used for further debugging.")
Expand Down
29 changes: 18 additions & 11 deletions go/test/endtoend/vreplication/vdiff2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,23 +104,27 @@ var testCases = []*testCase{
}

func TestVDiff2(t *testing.T) {
allCellNames = "zone1"
defaultCellName := "zone1"
allCellNames = "zone5,zone1,zone2,zone3,zone4"
sourceKs := "product"
sourceShards := []string{"0"}
targetKs := "customer"
targetShards := []string{"-80", "80-"}
// This forces us to use multiple vstream packets even with small test tables
extraVTTabletArgs = []string{"--vstream_packet_size=1"}

vc = NewVitessCluster(t, "TestVDiff2", []string{allCellNames}, mainClusterConfig)
vc = NewVitessCluster(t, "TestVDiff2", strings.Split(allCellNames, ","), mainClusterConfig)
require.NotNil(t, vc)
defaultCell = vc.Cells[defaultCellName]
cells := []*Cell{defaultCell}
zone1 := vc.Cells["zone1"]
zone2 := vc.Cells["zone2"]
zone3 := vc.Cells["zone3"]
defaultCell = zone1

defer vc.TearDown(t)

vc.AddKeyspace(t, cells, sourceKs, strings.Join(sourceShards, ","), initialProductVSchema, initialProductSchema, 0, 0, 100, sourceKsOpts)
// The primary tablet is only added in the first cell.
// We ONLY add primary tablets in this test.
_, err := vc.AddKeyspace(t, []*Cell{zone2, zone1, zone3}, sourceKs, strings.Join(sourceShards, ","), initialProductVSchema, initialProductSchema, 0, 0, 100, sourceKsOpts)
require.NoError(t, err)

vtgate = defaultCell.Vtgates[0]
require.NotNil(t, vtgate)
Expand All @@ -140,23 +144,25 @@ func TestVDiff2(t *testing.T) {

generateMoreCustomers(t, sourceKs, 100)

_, err := vc.AddKeyspace(t, cells, targetKs, strings.Join(targetShards, ","), customerVSchema, customerSchema, 0, 0, 200, targetKsOpts)
// The primary tablet is only added in the first cell.
// We ONLY add primary tablets in this test.
tks, err := vc.AddKeyspace(t, []*Cell{zone3, zone1, zone2}, targetKs, strings.Join(targetShards, ","), customerVSchema, customerSchema, 0, 0, 200, targetKsOpts)
require.NoError(t, err)
for _, shard := range targetShards {
require.NoError(t, cluster.WaitForHealthyShard(vc.VtctldClient, targetKs, shard))
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
testWorkflow(t, vc, tc, cells)
// Primary tablets for any new shards are added in the first cell.
testWorkflow(t, vc, tc, tks, []*Cell{zone3, zone2, zone1})
})
}
}

func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, cells []*Cell) {
func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace, cells []*Cell) {
arrTargetShards := strings.Split(tc.targetShards, ",")
if tc.typ == "Reshard" {
tks := vc.Cells[cells[0].Name].Keyspaces[tc.targetKs]
require.NoError(t, vc.AddShards(t, cells, tks, tc.targetShards, 0, 0, tc.tabletBaseID, targetKsOpts))
for _, shard := range arrTargetShards {
require.NoError(t, cluster.WaitForHealthyShard(vc.VtctldClient, tc.targetKs, shard))
Expand All @@ -169,6 +175,7 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, cells []*Cell)
if tc.typ == "Reshard" {
args = append(args, "--source_shards", tc.sourceShards, "--target_shards", tc.targetShards)
}
args = append(args, "--cells", allCellNames)
args = append(args, "--tables", tc.tables)
args = append(args, "Create")
args = append(args, ksWorkflow)
Expand All @@ -180,7 +187,7 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, cells []*Cell)
catchup(t, tab, tc.workflow, tc.typ)
}

vdiff(t, tc.targetKs, tc.workflow, cells[0].Name, true, true, nil)
vdiff(t, tc.targetKs, tc.workflow, allCellNames, true, true, nil)

if tc.autoRetryError {
testAutoRetryError(t, tc, cells[0].Name)
Expand Down
12 changes: 10 additions & 2 deletions go/test/endtoend/vreplication/vdiff_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,11 @@ func performVDiff2Action(t *testing.T, useVtctlclient bool, ksWorkflow, cells, a
require.True(t, ok, "invalid keyspace.workflow value: %s", ksWorkflow)

if useVtctlclient {
args := []string{"VDiff", "--", "--tablet_types=primary", "--source_cell=" + cells, "--format=json"}
// This will always result in us using a PRIMARY tablet, which is all
// we start in many e2e tests, but it avoids the tablet picker logic
// where when you ONLY specify the PRIMARY type it then picks the
// shard's primary and ignores any cell settings.
args := []string{"VDiff", "--", "--tablet_types=in_order:primary,replica", "--source_cell=" + cells, "--format=json"}
if len(extraFlags) > 0 {
args = append(args, extraFlags...)
}
Expand All @@ -195,7 +199,11 @@ func performVDiff2Action(t *testing.T, useVtctlclient bool, ksWorkflow, cells, a
} else {
args := []string{"VDiff", "--target-keyspace", targetKeyspace, "--workflow", workflowName, "--format=json", action}
if strings.ToLower(action) == string(vdiff2.CreateAction) {
args = append(args, "--tablet-types=primary", "--source-cells="+cells)
// This will always result in us using a PRIMARY tablet, which is all
// we start in many e2e tests, but it avoids the tablet picker logic
// where when you ONLY specify the PRIMARY type it then picks the
// shard's primary and ignores any cell settings.
args = append(args, "--tablet-types=primary,replica", "--tablet-types-in-preference-order", "--source-cells="+cells)
}
if len(extraFlags) > 0 {
args = append(args, extraFlags...)
Expand Down
5 changes: 3 additions & 2 deletions go/vt/vttablet/tabletmanager/vdiff/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package vdiff
import (
"context"
"fmt"
"strings"
"testing"

"github.com/google/uuid"
Expand Down Expand Up @@ -107,8 +108,8 @@ func TestVDiff(t *testing.T) {
MaxRows: 100,
},
PickerOptions: &tabletmanagerdatapb.VDiffPickerOptions{
SourceCell: tstenv.Cells[0],
TargetCell: tstenv.Cells[0],
SourceCell: strings.Join(tstenv.Cells, ","),
TargetCell: strings.Join(tstenv.Cells, ","),
TabletTypes: "primary",
},
ReportOptions: &tabletmanagerdatapb.VDiffReportOptions{
Expand Down
51 changes: 29 additions & 22 deletions go/vt/vttablet/tabletmanager/vdiff/table_differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -120,7 +121,7 @@ func (td *tableDiffer) initialize(ctx context.Context) error {
}
}()

if err := td.selectTablets(ctx, td.wd.opts.PickerOptions.SourceCell, td.wd.opts.PickerOptions.TabletTypes); err != nil {
if err := td.selectTablets(ctx); err != nil {
return err
}
if err := td.syncSourceStreams(ctx); err != nil {
Expand Down Expand Up @@ -198,16 +199,22 @@ func (td *tableDiffer) forEachSource(cb func(source *migrationSource) error) err
return allErrors.AggrError(vterrors.Aggregate)
}

func (td *tableDiffer) selectTablets(ctx context.Context, cell, tabletTypes string) error {
var wg sync.WaitGroup
ct := td.wd.ct
var err1, err2 error
func (td *tableDiffer) selectTablets(ctx context.Context) error {
var (
wg sync.WaitGroup
sourceErr, targetErr error
targetTablet *topodata.Tablet
)

// The cells from the vdiff record are a comma separated list.
sourceCells := strings.Split(td.wd.opts.PickerOptions.SourceCell, ",")
targetCells := strings.Split(td.wd.opts.PickerOptions.TargetCell, ",")

// For Mount+Migrate, the source tablets will be in a different
// Vitess cluster with its own TopoServer.
sourceTopoServer := ct.ts
if ct.externalCluster != "" {
extTS, err := ct.ts.OpenExternalVitessClusterServer(ctx, ct.externalCluster)
sourceTopoServer := td.wd.ct.ts
if td.wd.ct.externalCluster != "" {
extTS, err := td.wd.ct.ts.OpenExternalVitessClusterServer(ctx, td.wd.ct.externalCluster)
if err != nil {
return err
}
Expand All @@ -216,39 +223,39 @@ func (td *tableDiffer) selectTablets(ctx context.Context, cell, tabletTypes stri
wg.Add(1)
go func() {
defer wg.Done()
err1 = td.forEachSource(func(source *migrationSource) error {
tablet, err := pickTablet(ctx, sourceTopoServer, cell, ct.vde.thisTablet.Alias.Cell, ct.sourceKeyspace, source.shard, tabletTypes)
sourceErr = td.forEachSource(func(source *migrationSource) error {
sourceTablet, err := pickTablet(ctx, sourceTopoServer, sourceCells, td.wd.ct.vde.thisTablet.Alias.Cell, td.wd.ct.sourceKeyspace, source.shard, td.wd.opts.PickerOptions.TabletTypes)
if err != nil {
return err
}
source.tablet = tablet
source.tablet = sourceTablet
return nil
})
}()

wg.Add(1)
go func() {
defer wg.Done()
tablet, err2 := pickTablet(ctx, ct.ts, td.wd.opts.PickerOptions.TargetCell, ct.vde.thisTablet.Alias.Cell, ct.vde.thisTablet.Keyspace,
ct.vde.thisTablet.Shard, td.wd.opts.PickerOptions.TabletTypes)
if err2 != nil {
targetTablet, targetErr = pickTablet(ctx, td.wd.ct.ts, targetCells, td.wd.ct.vde.thisTablet.Alias.Cell, td.wd.ct.vde.thisTablet.Keyspace,
td.wd.ct.vde.thisTablet.Shard, td.wd.opts.PickerOptions.TabletTypes)
if targetErr != nil {
return
}
ct.targetShardStreamer = &shardStreamer{
tablet: tablet,
shard: tablet.Shard,
td.wd.ct.targetShardStreamer = &shardStreamer{
tablet: targetTablet,
shard: targetTablet.Shard,
}
}()

wg.Wait()
if err1 != nil {
return err1
if sourceErr != nil {
return sourceErr
}
return err2
return targetErr
}

func pickTablet(ctx context.Context, ts *topo.Server, cell, localCell, keyspace, shard, tabletTypes string) (*topodata.Tablet, error) {
tp, err := discovery.NewTabletPicker(ctx, ts, []string{cell}, localCell, keyspace, shard, tabletTypes, discovery.TabletPickerOptions{})
func pickTablet(ctx context.Context, ts *topo.Server, cells []string, localCell, keyspace, shard, tabletTypes string) (*topodata.Tablet, error) {
tp, err := discovery.NewTabletPicker(ctx, ts, cells, localCell, keyspace, shard, tabletTypes, discovery.TabletPickerOptions{})
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 2f679aa

Please sign in to comment.