Skip to content

Commit

Permalink
Move basic test to vtctldclient for static client uses
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Jun 6, 2024
1 parent 4cf58c6 commit 0a945ef
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 38 deletions.
18 changes: 11 additions & 7 deletions go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,20 +504,24 @@ func validateDryRunResults(t *testing.T, output string, want []string) {
require.NotEmpty(t, output)
gotDryRun := strings.Split(output, "\n")
require.True(t, len(gotDryRun) > 3)
startRow := 3
if strings.Contains(gotDryRun[0], "deprecated") {
var startRow int
if strings.HasPrefix(gotDryRun[1], "Parameters:") { // vtctlclient
startRow = 3
} else if strings.Contains(gotDryRun[0], "deprecated") {
startRow = 4
} else {
startRow = 2
}
gotDryRun = gotDryRun[startRow : len(gotDryRun)-1]
if len(want) != len(gotDryRun) {
t.Fatalf("want and got: lengths don't match, \nwant\n%s\n\ngot\n%s", strings.Join(want, "\n"), strings.Join(gotDryRun, "\n"))
require.Fail(t, "invalid dry run results", "want and got: lengths don't match, \nwant\n%s\n\ngot\n%s", strings.Join(want, "\n"), strings.Join(gotDryRun, "\n"))
}
var match, fail bool
fail = false
for i, w := range want {
w = strings.TrimSpace(w)
g := strings.TrimSpace(gotDryRun[i])
if w[0] == '/' {
if len(w) > 0 && w[0] == '/' {
w = strings.TrimSpace(w[1:])
result := strings.HasPrefix(g, w)
match = result
Expand All @@ -526,11 +530,11 @@ func validateDryRunResults(t *testing.T, output string, want []string) {
}
if !match {
fail = true
t.Fatalf("want %s, got %s\n", w, gotDryRun[i])
require.Fail(t, "invlaid dry run results", "want %s, got %s\n", w, gotDryRun[i])
}
}
if fail {
t.Fatalf("Dry run results don't match, want %s, got %s", want, gotDryRun)
require.Fail(t, "invalid dry run results", "Dry run results don't match, want %s, got %s", want, gotDryRun)
}
}

Expand Down Expand Up @@ -566,7 +570,7 @@ func isTableInDenyList(t *testing.T, vc *VitessCluster, ksShard string, table st
var err error
found := false
if output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetShard", ksShard); err != nil {
t.Fatalf("%v %v", err, output)
require.Fail(t, "GetShard error", "%v %v", err, output)
return false, err
}
jsonparser.ArrayEach([]byte(output), func(value []byte, dataType jsonparser.ValueType, offset int, err error) {
Expand Down
35 changes: 20 additions & 15 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ func TestCellAliasVreplicationWorkflow(t *testing.T) {
vc.AddKeyspace(t, []*Cell{cell1, cell2}, keyspace, shard, initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100, sourceKsOpts)

// Add cell alias containing only zone2
result, err := vc.VtctlClient.ExecuteCommandWithOutput("AddCellsAlias", "--", "--cells", "zone2", "alias")
result, err := vc.VtctldClient.ExecuteCommandWithOutput("AddCellsAlias", "--cells", "zone2", "alias")
require.NoError(t, err, "command failed with output: %v", result)

verifyClusterHealth(t, vc)
Expand Down Expand Up @@ -836,7 +836,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
printShardPositions(vc, ksShards)
switchWrites(t, workflowType, ksWorkflow, true)

output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", ksWorkflow, "show")
output, err := vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", targetKs, "show", "--workflow", workflow)
require.NoError(t, err)
require.Contains(t, output, "'customer.reverse_bits'")
require.Contains(t, output, "'customer.bmd5'")
Expand Down Expand Up @@ -945,7 +945,7 @@ func reshardMerchant2to3SplitMerge(t *testing.T) {
var err error

for _, shard := range strings.Split("-80,80-", ",") {
output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetShard", "merchant:"+shard)
output, err = vc.VtctldClient.ExecuteCommandWithOutput("GetShard", "merchant:"+shard)
if err == nil {
t.Fatal("GetShard merchant:-80 failed")
}
Expand All @@ -954,7 +954,7 @@ func reshardMerchant2to3SplitMerge(t *testing.T) {

for _, shard := range strings.Split("-40,40-c0,c0-", ",") {
ksShard := fmt.Sprintf("%s:%s", merchantKeyspace, shard)
output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetShard", ksShard)
output, err = vc.VtctldClient.ExecuteCommandWithOutput("GetShard", ksShard)
if err != nil {
t.Fatalf("GetShard merchant failed for: %s: %v", shard, err)
}
Expand Down Expand Up @@ -1403,7 +1403,7 @@ func waitForLowLag(t *testing.T, keyspace, workflow string) {
waitDuration := 500 * time.Millisecond
duration := maxWait
for duration > 0 {
output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", fmt.Sprintf("%s.%s", keyspace, workflow), "Show")
output, err := vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", "show", "--workflow", workflow)
require.NoError(t, err)
lagSeconds, err = jsonparser.GetInt([]byte(output), "MaxVReplicationTransactionLag")

Expand Down Expand Up @@ -1486,7 +1486,7 @@ func reshardAction(t *testing.T, action, workflow, keyspaceName, sourceShards, t
}

func applyVSchema(t *testing.T, vschema, keyspace string) {
err := vc.VtctlClient.ExecuteCommand("ApplyVSchema", "--", "--vschema", vschema, keyspace)
err := vc.VtctldClient.ExecuteCommand("ApplyVSchema", "--vschema", vschema, keyspace)
require.NoError(t, err)
}

Expand All @@ -1497,19 +1497,23 @@ func switchReadsDryRun(t *testing.T, workflowType, cells, ksWorkflow string, dry
"workflow type specified: %s", workflowType)
}
ensureCanSwitch(t, workflowType, cells, ksWorkflow)
output, err := vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--cells="+cells, "--tablet_types=rdonly,replica",
"--dry_run", "SwitchTraffic", ksWorkflow)
ks, wf, ok := strings.Cut(ksWorkflow, ".")
require.True(t, ok)
output, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", wf, "--target-keyspace", ks, "SwitchTraffic", "--cells="+cells, "--tablet-types=rdonly,replica",
"--dry-run")
require.NoError(t, err, fmt.Sprintf("Switching Reads DryRun Error: %s: %s", err, output))
if dryRunResults != nil {
validateDryRunResults(t, output, dryRunResults)
}
}

func ensureCanSwitch(t *testing.T, workflowType, cells, ksWorkflow string) {
ks, wf, ok := strings.Cut(ksWorkflow, ".")
require.True(t, ok)
timer := time.NewTimer(defaultTimeout)
defer timer.Stop()
for {
_, err := vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--cells="+cells, "--dry_run", "SwitchTraffic", ksWorkflow)
_, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", wf, "--target-keyspace", ks, "SwitchTraffic", "--cells="+cells, "--dry-run")
if err == nil {
return
}
Expand All @@ -1535,11 +1539,11 @@ func switchReads(t *testing.T, workflowType, cells, ksWorkflow string, reverse b
command = "ReverseTraffic"
}
ensureCanSwitch(t, workflowType, cells, ksWorkflow)
output, err = vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--cells="+cells, "--tablet_types=rdonly",
command, ksWorkflow)
ks, wf, ok := strings.Cut(ksWorkflow, ".")
require.True(t, ok)
output, err = vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", wf, "--target-keyspace", ks, command, "--cells="+cells, "--tablet-types=rdonly")
require.NoError(t, err, fmt.Sprintf("%s Error: %s: %s", command, err, output))
output, err = vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--cells="+cells, "--tablet_types=replica",
command, ksWorkflow)
output, err = vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", wf, "--target-keyspace", ks, command, "--cells="+cells, "--tablet-types=replica")
require.NoError(t, err, fmt.Sprintf("%s Error: %s: %s", command, err, output))
}

Expand Down Expand Up @@ -1578,8 +1582,9 @@ func switchWritesDryRun(t *testing.T, workflowType, ksWorkflow string, dryRunRes
require.FailNowf(t, "Invalid workflow type for SwitchTraffic, must be MoveTables or Reshard",
"workflow type specified: %s", workflowType)
}
output, err := vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--tablet_types=primary", "--dry_run",
"SwitchTraffic", ksWorkflow)
ks, wf, ok := strings.Cut(ksWorkflow, ".")
require.True(t, ok)
output, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", wf, "--target-keyspace", ks, "SwitchTraffic", "--tablet-types=primary", "--dry-run")
require.NoError(t, err, fmt.Sprintf("Switch writes DryRun Error: %s: %s", err, output))
validateDryRunResults(t, output, dryRunResults)
}
Expand Down
23 changes: 10 additions & 13 deletions go/test/endtoend/vreplication/vreplication_test_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,28 @@ package vreplication
var dryRunResultsSwitchWritesCustomerShard = []string{
"Lock keyspace product",
"Lock keyspace customer",
"Stop writes on keyspace product, tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order]:",
"/ Keyspace product, Shard 0 at Position",
"Wait for VReplication on stopped streams to catchup for up to 30s",
"Create reverse replication workflow p2c_reverse",
"/Stop writes on keyspace product for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order]: [keyspace:product;shard:0;position:",
"Wait for vreplication on stopped streams to catchup for up to 30s",
"Create reverse vreplication workflow p2c_reverse",
"Create journal entries on source databases",
"Enable writes on keyspace customer tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order]",
"Enable writes on keyspace customer for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order]",
"Switch routing from keyspace product to keyspace customer",
"Routing rules for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order] will be updated",
"Switch writes completed, freeze and delete vreplication streams on:",
" tablet 200 ",
" tablet 300 ",
"Start reverse replication streams on:",
" tablet 100 ",
"Mark vreplication streams frozen on:",
" Keyspace customer, Shard -80, Tablet 200, Workflow p2c, DbName vt_customer",
" Keyspace customer, Shard 80-, Tablet 300, Workflow p2c, DbName vt_customer",
"Switch writes completed, freeze and delete vreplication streams on: [tablet:200,tablet:300]",
"Start reverse vreplication streams on: [tablet:100]",
"Mark vreplication streams frozen on: [keyspace:customer;shard:-80;tablet:200;workflow:p2c;dbname:vt_customer,keyspace:customer;shard:80-;tablet:300;workflow:p2c;dbname:vt_customer]",
"Unlock keyspace customer",
"Unlock keyspace product",
"", // Additional empty newline in the output
}

var dryRunResultsReadCustomerShard = []string{
"Lock keyspace product",
"Switch reads for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order] to keyspace customer for tablet types [RDONLY,REPLICA]",
"Routing rules for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order] will be updated",
"Serving VSchema will be rebuilt for the customer keyspace",
"Unlock keyspace product",
"", // Additional empty newline in the output
}

var dryRunResultsSwitchWritesM2m3 = []string{
Expand Down
21 changes: 18 additions & 3 deletions go/vt/vtctl/workflow/switcher_dry_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,20 @@ func (dr *switcherDryRun) changeRouting(ctx context.Context) error {
}
deleteLogs = nil
addLogs = nil
for _, source := range dr.ts.Sources() {
sources := maps.Values(dr.ts.Sources())
// Sort the slice for deterministic output.
sort.Slice(sources, func(i, j int) bool {
return sources[i].GetPrimary().Alias.Uid < sources[j].GetPrimary().Alias.Uid
})
for _, source := range sources {
deleteLogs = append(deleteLogs, fmt.Sprintf("shard:%s;tablet:%d", source.GetShard().ShardName(), source.GetShard().PrimaryAlias.Uid))
}
for _, target := range dr.ts.Targets() {
targets := maps.Values(dr.ts.Targets())
// Sort the slice for deterministic output.
sort.Slice(targets, func(i, j int) bool {
return targets[i].GetPrimary().Alias.Uid < targets[j].GetPrimary().Alias.Uid
})
for _, target := range targets {
addLogs = append(addLogs, fmt.Sprintf("shard:%s;tablet:%d", target.GetShard().ShardName(), target.GetShard().PrimaryAlias.Uid))
}
if len(deleteLogs) > 0 {
Expand All @@ -150,7 +160,12 @@ func (dr *switcherDryRun) changeRouting(ctx context.Context) error {

func (dr *switcherDryRun) streamMigraterfinalize(ctx context.Context, ts *trafficSwitcher, workflows []string) error {
logs := make([]string, 0)
for _, t := range ts.Targets() {
targets := maps.Values(ts.Targets())
// Sort the slice for deterministic output.
sort.Slice(targets, func(i, j int) bool {
return targets[i].GetPrimary().Alias.Uid < targets[j].GetPrimary().Alias.Uid
})
for _, t := range targets {
logs = append(logs, fmt.Sprintf("tablet:%d", t.GetPrimary().Alias.Uid))
}
dr.drLog.Logf("Switch writes completed, freeze and delete vreplication streams on: [%s]", strings.Join(logs, ","))
Expand Down

0 comments on commit 0a945ef

Please sign in to comment.