Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-20.0] VReplication: Improve workflow cancel/delete (#15977) #16131

Merged
merged 1 commit into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions go/test/endtoend/cluster/vttablet_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type VttabletProcess struct {
SupportsBackup bool
ExplicitServingStatus bool
ServingStatus string
DbName string
DbPassword string
DbPort int
DbFlavor string
Expand Down Expand Up @@ -148,6 +149,8 @@ func (vttablet *VttabletProcess) Setup() (err error) {
return
}

vttablet.DbName = "vt_" + vttablet.Keyspace

vttablet.exit = make(chan error)
go func() {
if vttablet.proc != nil {
Expand Down Expand Up @@ -442,8 +445,11 @@ func (vttablet *VttabletProcess) TearDownWithTimeout(timeout time.Duration) erro

// CreateDB creates the database for keyspace
func (vttablet *VttabletProcess) CreateDB(keyspace string) error {
_, _ = vttablet.QueryTablet(fmt.Sprintf("drop database IF EXISTS vt_%s", keyspace), keyspace, false)
_, err := vttablet.QueryTablet(fmt.Sprintf("create database IF NOT EXISTS vt_%s", keyspace), keyspace, false)
if vttablet.DbName == "" {
vttablet.DbName = "vt_" + keyspace
}
_, _ = vttablet.QueryTablet(fmt.Sprintf("drop database IF EXISTS %s", vttablet.DbName), keyspace, false)
_, err := vttablet.QueryTablet(fmt.Sprintf("create database IF NOT EXISTS %s", vttablet.DbName), keyspace, false)
return err
}

Expand Down
39 changes: 15 additions & 24 deletions go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func execQueryWithRetry(t *testing.T, conn *mysql.Conn, query string, timeout ti
select {
case <-ctx.Done():
require.FailNow(t, fmt.Sprintf("query %q did not succeed before the timeout of %s; last seen result: %v",
query, timeout, qr.Rows))
query, timeout, qr))
case <-ticker.C:
log.Infof("query %q failed with error %v, retrying in %ds", query, err, defaultTick)
}
Expand Down Expand Up @@ -147,19 +147,6 @@ func execVtgateQuery(t *testing.T, conn *mysql.Conn, database string, query stri
return qr
}

func execVtgateQueryWithRetry(t *testing.T, conn *mysql.Conn, database string, query string, timeout time.Duration) *sqltypes.Result {
if strings.TrimSpace(query) == "" {
return nil
}
if database != "" {
execQuery(t, conn, "use `"+database+"`;")
}
execQuery(t, conn, "begin")
qr := execQueryWithRetry(t, conn, query, timeout)
execQuery(t, conn, "commit")
return qr
}

func checkHealth(t *testing.T, url string) bool {
resp, err := http.Get(url)
require.NoError(t, err)
Expand Down Expand Up @@ -516,20 +503,24 @@ func validateDryRunResults(t *testing.T, output string, want []string) {
require.NotEmpty(t, output)
gotDryRun := strings.Split(output, "\n")
require.True(t, len(gotDryRun) > 3)
startRow := 3
if strings.Contains(gotDryRun[0], "deprecated") {
var startRow int
if strings.HasPrefix(gotDryRun[1], "Parameters:") { // vtctlclient
startRow = 3
} else if strings.Contains(gotDryRun[0], "deprecated") {
startRow = 4
} else {
startRow = 2
}
gotDryRun = gotDryRun[startRow : len(gotDryRun)-1]
if len(want) != len(gotDryRun) {
t.Fatalf("want and got: lengths don't match, \nwant\n%s\n\ngot\n%s", strings.Join(want, "\n"), strings.Join(gotDryRun, "\n"))
require.Fail(t, "invalid dry run results", "want and got: lengths don't match, \nwant\n%s\n\ngot\n%s", strings.Join(want, "\n"), strings.Join(gotDryRun, "\n"))
}
var match, fail bool
fail = false
for i, w := range want {
w = strings.TrimSpace(w)
g := strings.TrimSpace(gotDryRun[i])
if w[0] == '/' {
if len(w) > 0 && w[0] == '/' {
w = strings.TrimSpace(w[1:])
result := strings.HasPrefix(g, w)
match = result
Expand All @@ -538,11 +529,11 @@ func validateDryRunResults(t *testing.T, output string, want []string) {
}
if !match {
fail = true
t.Fatalf("want %s, got %s\n", w, gotDryRun[i])
require.Fail(t, "invlaid dry run results", "want %s, got %s\n", w, gotDryRun[i])
}
}
if fail {
t.Fatalf("Dry run results don't match, want %s, got %s", want, gotDryRun)
require.Fail(t, "invalid dry run results", "Dry run results don't match, want %s, got %s", want, gotDryRun)
}
}

Expand Down Expand Up @@ -578,7 +569,7 @@ func isTableInDenyList(t *testing.T, vc *VitessCluster, ksShard string, table st
var err error
found := false
if output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetShard", ksShard); err != nil {
t.Fatalf("%v %v", err, output)
require.Fail(t, "GetShard error", "%v %v", err, output)
return false, err
}
jsonparser.ArrayEach([]byte(output), func(value []byte, dataType jsonparser.ValueType, offset int, err error) {
Expand All @@ -602,8 +593,8 @@ func expectNumberOfStreams(t *testing.T, vtgateConn *mysql.Conn, name string, wo
waitForQueryResult(t, vtgateConn, database, query, fmt.Sprintf(`[[INT64(%d)]]`, want))
}

// confirmAllStreamsRunning confirms that all of the migrated streams are running
// after a Reshard.
// confirmAllStreamsRunning confirms that all of the workflow's streams are
// in the running state.
func confirmAllStreamsRunning(t *testing.T, vtgateConn *mysql.Conn, database string) {
query := sqlparser.BuildParsedQuery("select count(*) from %s.vreplication where state != '%s'",
sidecarDBIdentifier, binlogdatapb.VReplicationWorkflowState_Running.String()).Query
Expand Down Expand Up @@ -801,7 +792,7 @@ func isBinlogRowImageNoBlob(t *testing.T, tablet *cluster.VttabletProcess) bool

func getRowCount(t *testing.T, vtgateConn *mysql.Conn, table string) int {
query := fmt.Sprintf("select count(*) from %s", table)
qr := execVtgateQuery(t, vtgateConn, "", query)
qr := execQuery(t, vtgateConn, query)
numRows, _ := qr.Rows[0][0].ToInt()
return numRows
}
Expand Down
51 changes: 32 additions & 19 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ func testVreplicationWorkflows(t *testing.T, limited bool, binlogRowImage string
expectNumberOfStreams(t, vtgateConn, "Customer3to2", "sales", "product:0", 3)
reshardCustomer3to1Merge(t)
confirmAllStreamsRunning(t, vtgateConn, "customer:0")

expectNumberOfStreams(t, vtgateConn, "Customer3to1", "sales", "product:0", 1)

t.Run("Verify CopyState Is Optimized Afterwards", func(t *testing.T) {
Expand Down Expand Up @@ -605,7 +606,7 @@ func TestCellAliasVreplicationWorkflow(t *testing.T) {
vc.AddKeyspace(t, []*Cell{cell1, cell2}, keyspace, shard, initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100, sourceKsOpts)

// Add cell alias containing only zone2
result, err := vc.VtctlClient.ExecuteCommandWithOutput("AddCellsAlias", "--", "--cells", "zone2", "alias")
result, err := vc.VtctldClient.ExecuteCommandWithOutput("AddCellsAlias", "--cells", "zone2", "alias")
require.NoError(t, err, "command failed with output: %v", result)

verifyClusterHealth(t, vc)
Expand Down Expand Up @@ -722,10 +723,13 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
execVtgateQuery(t, vtgateConn, sourceKs, "update json_tbl set j1 = null, j2 = 'null', j3 = '\"null\"'")
execVtgateQuery(t, vtgateConn, sourceKs, "insert into json_tbl(id, j1, j2, j3) values (7, null, 'null', '\"null\"')")
waitForNoWorkflowLag(t, vc, targetKs, workflow)
for _, shard := range []string{"-80", "80-"} {
shardTarget := fmt.Sprintf("%s:%s", targetKs, shard)
if res := execVtgateQuery(t, vtgateConn, shardTarget, "select cid from customer"); len(res.Rows) > 0 {
waitForQueryResult(t, vtgateConn, shardTarget, "select distinct dec80 from customer", `[[DECIMAL(0)]]`)
for _, tablet := range []*cluster.VttabletProcess{customerTab1, customerTab2} {
// Query the tablet's mysqld directly as the targets will have denied table entries.
dbc, err := tablet.TabletConn(targetKs, true)
require.NoError(t, err)
defer dbc.Close()
if res := execQuery(t, dbc, "select cid from customer"); len(res.Rows) > 0 {
waitForQueryResult(t, dbc, tablet.DbName, "select distinct dec80 from customer", `[[DECIMAL(0)]]`)
dec80Replicated = true
}
}
Expand Down Expand Up @@ -833,7 +837,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
printShardPositions(vc, ksShards)
switchWrites(t, workflowType, ksWorkflow, true)

output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", ksWorkflow, "show")
output, err := vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", targetKs, "show", "--workflow", workflow)
require.NoError(t, err)
require.Contains(t, output, "'customer.reverse_bits'")
require.Contains(t, output, "'customer.bmd5'")
Expand Down Expand Up @@ -942,7 +946,7 @@ func reshardMerchant2to3SplitMerge(t *testing.T) {
var err error

for _, shard := range strings.Split("-80,80-", ",") {
output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetShard", "merchant:"+shard)
output, err = vc.VtctldClient.ExecuteCommandWithOutput("GetShard", "merchant:"+shard)
if err == nil {
t.Fatal("GetShard merchant:-80 failed")
}
Expand All @@ -951,7 +955,7 @@ func reshardMerchant2to3SplitMerge(t *testing.T) {

for _, shard := range strings.Split("-40,40-c0,c0-", ",") {
ksShard := fmt.Sprintf("%s:%s", merchantKeyspace, shard)
output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetShard", ksShard)
output, err = vc.VtctldClient.ExecuteCommandWithOutput("GetShard", ksShard)
if err != nil {
t.Fatalf("GetShard merchant failed for: %s: %v", shard, err)
}
Expand Down Expand Up @@ -1400,7 +1404,7 @@ func waitForLowLag(t *testing.T, keyspace, workflow string) {
waitDuration := 500 * time.Millisecond
duration := maxWait
for duration > 0 {
output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", fmt.Sprintf("%s.%s", keyspace, workflow), "Show")
output, err := vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", "show", "--workflow", workflow)
require.NoError(t, err)
lagSeconds, err = jsonparser.GetInt([]byte(output), "MaxVReplicationTransactionLag")

Expand Down Expand Up @@ -1483,7 +1487,7 @@ func reshardAction(t *testing.T, action, workflow, keyspaceName, sourceShards, t
}

func applyVSchema(t *testing.T, vschema, keyspace string) {
err := vc.VtctlClient.ExecuteCommand("ApplyVSchema", "--", "--vschema", vschema, keyspace)
err := vc.VtctldClient.ExecuteCommand("ApplyVSchema", "--vschema", vschema, keyspace)
require.NoError(t, err)
}

Expand All @@ -1494,19 +1498,24 @@ func switchReadsDryRun(t *testing.T, workflowType, cells, ksWorkflow string, dry
"workflow type specified: %s", workflowType)
}
ensureCanSwitch(t, workflowType, cells, ksWorkflow)
output, err := vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--cells="+cells, "--tablet_types=rdonly,replica",
"--dry_run", "SwitchTraffic", ksWorkflow)
ks, wf, ok := strings.Cut(ksWorkflow, ".")
require.True(t, ok)
output, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", wf, "--target-keyspace", ks, "SwitchTraffic",
"--cells="+cells, "--tablet-types=rdonly,replica", "--dry-run")
require.NoError(t, err, fmt.Sprintf("Switching Reads DryRun Error: %s: %s", err, output))
if dryRunResults != nil {
validateDryRunResults(t, output, dryRunResults)
}
}

func ensureCanSwitch(t *testing.T, workflowType, cells, ksWorkflow string) {
ks, wf, ok := strings.Cut(ksWorkflow, ".")
require.True(t, ok)
timer := time.NewTimer(defaultTimeout)
defer timer.Stop()
for {
_, err := vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--cells="+cells, "--dry_run", "SwitchTraffic", ksWorkflow)
_, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", wf, "--target-keyspace", ks, "SwitchTraffic",
"--cells="+cells, "--dry-run")
if err == nil {
return
}
Expand All @@ -1532,11 +1541,13 @@ func switchReads(t *testing.T, workflowType, cells, ksWorkflow string, reverse b
command = "ReverseTraffic"
}
ensureCanSwitch(t, workflowType, cells, ksWorkflow)
output, err = vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--cells="+cells, "--tablet_types=rdonly",
command, ksWorkflow)
ks, wf, ok := strings.Cut(ksWorkflow, ".")
require.True(t, ok)
output, err = vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", wf, "--target-keyspace", ks, command,
"--cells="+cells, "--tablet-types=rdonly")
require.NoError(t, err, fmt.Sprintf("%s Error: %s: %s", command, err, output))
output, err = vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--cells="+cells, "--tablet_types=replica",
command, ksWorkflow)
output, err = vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", wf, "--target-keyspace", ks, command,
"--cells="+cells, "--tablet-types=replica")
require.NoError(t, err, fmt.Sprintf("%s Error: %s: %s", command, err, output))
}

Expand Down Expand Up @@ -1575,8 +1586,10 @@ func switchWritesDryRun(t *testing.T, workflowType, ksWorkflow string, dryRunRes
require.FailNowf(t, "Invalid workflow type for SwitchTraffic, must be MoveTables or Reshard",
"workflow type specified: %s", workflowType)
}
output, err := vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--tablet_types=primary", "--dry_run",
"SwitchTraffic", ksWorkflow)
ks, wf, ok := strings.Cut(ksWorkflow, ".")
require.True(t, ok)
output, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", wf, "--target-keyspace", ks,
"SwitchTraffic", "--tablet-types=primary", "--dry-run")
require.NoError(t, err, fmt.Sprintf("Switch writes DryRun Error: %s: %s", err, output))
validateDryRunResults(t, output, dryRunResults)
}
Expand Down
23 changes: 10 additions & 13 deletions go/test/endtoend/vreplication/vreplication_test_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,28 @@ package vreplication
var dryRunResultsSwitchWritesCustomerShard = []string{
"Lock keyspace product",
"Lock keyspace customer",
"Stop writes on keyspace product, tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order]:",
"/ Keyspace product, Shard 0 at Position",
"Wait for VReplication on stopped streams to catchup for up to 30s",
"Create reverse replication workflow p2c_reverse",
"/Stop writes on keyspace product for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order]: [keyspace:product;shard:0;position:",
"Wait for vreplication on stopped streams to catchup for up to 30s",
"Create reverse vreplication workflow p2c_reverse",
"Create journal entries on source databases",
"Enable writes on keyspace customer tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order]",
"Enable writes on keyspace customer for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order]",
"Switch routing from keyspace product to keyspace customer",
"Routing rules for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order] will be updated",
"Switch writes completed, freeze and delete vreplication streams on:",
" tablet 200 ",
" tablet 300 ",
"Start reverse replication streams on:",
" tablet 100 ",
"Mark vreplication streams frozen on:",
" Keyspace customer, Shard -80, Tablet 200, Workflow p2c, DbName vt_customer",
" Keyspace customer, Shard 80-, Tablet 300, Workflow p2c, DbName vt_customer",
"Switch writes completed, freeze and delete vreplication streams on: [tablet:200,tablet:300]",
"Start reverse vreplication streams on: [tablet:100]",
"Mark vreplication streams frozen on: [keyspace:customer;shard:-80;tablet:200;workflow:p2c;dbname:vt_customer,keyspace:customer;shard:80-;tablet:300;workflow:p2c;dbname:vt_customer]",
"Unlock keyspace customer",
"Unlock keyspace product",
"", // Additional empty newline in the output
}

var dryRunResultsReadCustomerShard = []string{
"Lock keyspace product",
"Switch reads for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order] to keyspace customer for tablet types [RDONLY,REPLICA]",
"Routing rules for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order] will be updated",
"Serving VSchema will be rebuilt for the customer keyspace",
"Unlock keyspace product",
"", // Additional empty newline in the output
}

var dryRunResultsSwitchWritesM2m3 = []string{
Expand Down
Loading
Loading