Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: Improve workflow cancel/delete #15977

Merged
merged 37 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
0b12d82
Workflow Delete: remove workflow before artifacts
mattlord May 20, 2024
4401a21
Add unit test
mattlord May 20, 2024
51b75c1
Add timeouts to vx.CallbackContext usage
mattlord May 21, 2024
82005fb
Don't fail workflow cancel/cleanup if there's nothing to do
mattlord May 22, 2024
7dd69c2
Improve and unify logging for tablet reference
mattlord May 22, 2024
371c90c
Put initial denied tables entries in place on MoveTables create
mattlord May 22, 2024
7b87e59
Get target keyspace lock before adding denied tables entries
mattlord May 22, 2024
8650d95
Comment out broken unit test for now
mattlord May 22, 2024
4b381f5
Adjust e2e tests
mattlord May 22, 2024
6fa20cf
Merge remote-tracking branch 'origin/main' into vtctldclient_wf_delete
mattlord May 22, 2024
9fe930f
Adjust test after merging origin/main
mattlord May 22, 2024
4ea4dbe
Get vrepl e2e tests working
mattlord May 22, 2024
dddf1c2
Fix backup tests
mattlord May 23, 2024
3758430
Correct updatePrimaryTabletControl and test
mattlord May 29, 2024
0c6e12a
Complete/Cancel fixups
mattlord May 29, 2024
862bdba
Uncomment test
mattlord May 30, 2024
5b4c288
Unify denied table rules management when doing traffic switches
mattlord Jun 4, 2024
e501711
Add unit test framework and test cases (scaffolding)
mattlord Jun 4, 2024
5750a3c
Merge remote-tracking branch 'origin/main' into vtctldclient_wf_delete
mattlord Jun 4, 2024
e5f8acd
Ongoing improvements
mattlord Jun 4, 2024
c80f8a3
Add scaffolding for MoveTables traffic switching unit test
mattlord Jun 5, 2024
4cf58c6
Finish up new unit tests
mattlord Jun 5, 2024
0a945ef
Move basic test to vtctldclient for static client uses
mattlord Jun 6, 2024
2004173
Merge remote-tracking branch 'origin/main' into vtctldclient_wf_delete
mattlord Jun 6, 2024
aa572b8
Changes from self review
mattlord Jun 7, 2024
0c8a9ab
Correct warning message
mattlord Jun 7, 2024
1955b9a
Sort all the things in dry run switcher
mattlord Jun 7, 2024
392c8f6
Add Traffic Switching DryRun unit test
mattlord Jun 7, 2024
0550ea0
Improve dry run unit test
mattlord Jun 7, 2024
f86792c
Dry run unit test improvements part II
mattlord Jun 7, 2024
46e8b1f
Be consistent with test framework mutex
mattlord Jun 8, 2024
ae34e94
Use std parameter name
mattlord Jun 8, 2024
b308ed7
Remove extraneous changes
mattlord Jun 8, 2024
93613ac
Correct log message
mattlord Jun 8, 2024
f66d6c2
Add routing rules helper to test framework
mattlord Jun 9, 2024
4fc0037
Trim down PR
mattlord Jun 10, 2024
904ba09
Correct comment
mattlord Jun 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions go/test/endtoend/cluster/vttablet_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type VttabletProcess struct {
SupportsBackup bool
ExplicitServingStatus bool
ServingStatus string
DbName string
DbPassword string
DbPort int
DbFlavor string
Expand Down Expand Up @@ -148,6 +149,8 @@ func (vttablet *VttabletProcess) Setup() (err error) {
return
}

vttablet.DbName = "vt_" + vttablet.Keyspace

vttablet.exit = make(chan error)
go func() {
if vttablet.proc != nil {
Expand Down Expand Up @@ -442,8 +445,11 @@ func (vttablet *VttabletProcess) TearDownWithTimeout(timeout time.Duration) erro

// CreateDB creates the database for keyspace
func (vttablet *VttabletProcess) CreateDB(keyspace string) error {
_, _ = vttablet.QueryTablet(fmt.Sprintf("drop database IF EXISTS vt_%s", keyspace), keyspace, false)
_, err := vttablet.QueryTablet(fmt.Sprintf("create database IF NOT EXISTS vt_%s", keyspace), keyspace, false)
if vttablet.DbName == "" {
vttablet.DbName = "vt_" + keyspace
}
_, _ = vttablet.QueryTablet(fmt.Sprintf("drop database IF EXISTS %s", vttablet.DbName), keyspace, false)
_, err := vttablet.QueryTablet(fmt.Sprintf("create database IF NOT EXISTS %s", vttablet.DbName), keyspace, false)
return err
}

Expand Down
39 changes: 15 additions & 24 deletions go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func execQueryWithRetry(t *testing.T, conn *mysql.Conn, query string, timeout ti
select {
case <-ctx.Done():
require.FailNow(t, fmt.Sprintf("query %q did not succeed before the timeout of %s; last seen result: %v",
query, timeout, qr.Rows))
query, timeout, qr))
case <-ticker.C:
log.Infof("query %q failed with error %v, retrying in %ds", query, err, defaultTick)
}
Expand Down Expand Up @@ -147,19 +147,6 @@ func execVtgateQuery(t *testing.T, conn *mysql.Conn, database string, query stri
return qr
}

func execVtgateQueryWithRetry(t *testing.T, conn *mysql.Conn, database string, query string, timeout time.Duration) *sqltypes.Result {
if strings.TrimSpace(query) == "" {
return nil
}
if database != "" {
execQuery(t, conn, "use `"+database+"`;")
}
execQuery(t, conn, "begin")
qr := execQueryWithRetry(t, conn, query, timeout)
execQuery(t, conn, "commit")
return qr
}

func checkHealth(t *testing.T, url string) bool {
resp, err := http.Get(url)
require.NoError(t, err)
Expand Down Expand Up @@ -516,20 +503,24 @@ func validateDryRunResults(t *testing.T, output string, want []string) {
require.NotEmpty(t, output)
gotDryRun := strings.Split(output, "\n")
require.True(t, len(gotDryRun) > 3)
startRow := 3
if strings.Contains(gotDryRun[0], "deprecated") {
var startRow int
if strings.HasPrefix(gotDryRun[1], "Parameters:") { // vtctlclient
startRow = 3
} else if strings.Contains(gotDryRun[0], "deprecated") {
startRow = 4
} else {
startRow = 2
}
gotDryRun = gotDryRun[startRow : len(gotDryRun)-1]
if len(want) != len(gotDryRun) {
t.Fatalf("want and got: lengths don't match, \nwant\n%s\n\ngot\n%s", strings.Join(want, "\n"), strings.Join(gotDryRun, "\n"))
require.Fail(t, "invalid dry run results", "want and got: lengths don't match, \nwant\n%s\n\ngot\n%s", strings.Join(want, "\n"), strings.Join(gotDryRun, "\n"))
}
var match, fail bool
fail = false
for i, w := range want {
w = strings.TrimSpace(w)
g := strings.TrimSpace(gotDryRun[i])
if w[0] == '/' {
if len(w) > 0 && w[0] == '/' {
w = strings.TrimSpace(w[1:])
result := strings.HasPrefix(g, w)
match = result
Expand All @@ -538,11 +529,11 @@ func validateDryRunResults(t *testing.T, output string, want []string) {
}
if !match {
fail = true
t.Fatalf("want %s, got %s\n", w, gotDryRun[i])
require.Fail(t, "invalid dry run results", "want %s, got %s\n", w, gotDryRun[i])
}
}
if fail {
t.Fatalf("Dry run results don't match, want %s, got %s", want, gotDryRun)
require.Fail(t, "invalid dry run results", "Dry run results don't match, want %s, got %s", want, gotDryRun)
}
}

Expand Down Expand Up @@ -578,7 +569,7 @@ func isTableInDenyList(t *testing.T, vc *VitessCluster, ksShard string, table st
var err error
found := false
if output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetShard", ksShard); err != nil {
t.Fatalf("%v %v", err, output)
require.Fail(t, "GetShard error", "%v %v", err, output)
return false, err
}
jsonparser.ArrayEach([]byte(output), func(value []byte, dataType jsonparser.ValueType, offset int, err error) {
Expand All @@ -602,8 +593,8 @@ func expectNumberOfStreams(t *testing.T, vtgateConn *mysql.Conn, name string, wo
waitForQueryResult(t, vtgateConn, database, query, fmt.Sprintf(`[[INT64(%d)]]`, want))
}

// confirmAllStreamsRunning confirms that all of the migrated streams are running
// after a Reshard.
// confirmAllStreamsRunning confirms that all of the workflow's streams are
// in the running state.
func confirmAllStreamsRunning(t *testing.T, vtgateConn *mysql.Conn, database string) {
query := sqlparser.BuildParsedQuery("select count(*) from %s.vreplication where state != '%s'",
sidecarDBIdentifier, binlogdatapb.VReplicationWorkflowState_Running.String()).Query
Expand Down Expand Up @@ -801,7 +792,7 @@ func isBinlogRowImageNoBlob(t *testing.T, tablet *cluster.VttabletProcess) bool

func getRowCount(t *testing.T, vtgateConn *mysql.Conn, table string) int {
query := fmt.Sprintf("select count(*) from %s", table)
qr := execVtgateQuery(t, vtgateConn, "", query)
qr := execQuery(t, vtgateConn, query)
numRows, _ := qr.Rows[0][0].ToInt()
return numRows
}
Expand Down
51 changes: 32 additions & 19 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ func testVreplicationWorkflows(t *testing.T, limited bool, binlogRowImage string
expectNumberOfStreams(t, vtgateConn, "Customer3to2", "sales", "product:0", 3)
reshardCustomer3to1Merge(t)
confirmAllStreamsRunning(t, vtgateConn, "customer:0")

expectNumberOfStreams(t, vtgateConn, "Customer3to1", "sales", "product:0", 1)

t.Run("Verify CopyState Is Optimized Afterwards", func(t *testing.T) {
Expand Down Expand Up @@ -605,7 +606,7 @@ func TestCellAliasVreplicationWorkflow(t *testing.T) {
vc.AddKeyspace(t, []*Cell{cell1, cell2}, keyspace, shard, initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100, sourceKsOpts)

// Add cell alias containing only zone2
result, err := vc.VtctlClient.ExecuteCommandWithOutput("AddCellsAlias", "--", "--cells", "zone2", "alias")
result, err := vc.VtctldClient.ExecuteCommandWithOutput("AddCellsAlias", "--cells", "zone2", "alias")
require.NoError(t, err, "command failed with output: %v", result)

verifyClusterHealth(t, vc)
Expand Down Expand Up @@ -722,10 +723,13 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
execVtgateQuery(t, vtgateConn, sourceKs, "update json_tbl set j1 = null, j2 = 'null', j3 = '\"null\"'")
execVtgateQuery(t, vtgateConn, sourceKs, "insert into json_tbl(id, j1, j2, j3) values (7, null, 'null', '\"null\"')")
waitForNoWorkflowLag(t, vc, targetKs, workflow)
for _, shard := range []string{"-80", "80-"} {
shardTarget := fmt.Sprintf("%s:%s", targetKs, shard)
if res := execVtgateQuery(t, vtgateConn, shardTarget, "select cid from customer"); len(res.Rows) > 0 {
waitForQueryResult(t, vtgateConn, shardTarget, "select distinct dec80 from customer", `[[DECIMAL(0)]]`)
for _, tablet := range []*cluster.VttabletProcess{customerTab1, customerTab2} {
// Query the tablet's mysqld directly as the targets will have denied table entries.
dbc, err := tablet.TabletConn(targetKs, true)
require.NoError(t, err)
defer dbc.Close()
if res := execQuery(t, dbc, "select cid from customer"); len(res.Rows) > 0 {
waitForQueryResult(t, dbc, tablet.DbName, "select distinct dec80 from customer", `[[DECIMAL(0)]]`)
dec80Replicated = true
}
}
Expand Down Expand Up @@ -833,7 +837,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
printShardPositions(vc, ksShards)
switchWrites(t, workflowType, ksWorkflow, true)

output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", ksWorkflow, "show")
output, err := vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", targetKs, "show", "--workflow", workflow)
require.NoError(t, err)
require.Contains(t, output, "'customer.reverse_bits'")
require.Contains(t, output, "'customer.bmd5'")
Expand Down Expand Up @@ -942,7 +946,7 @@ func reshardMerchant2to3SplitMerge(t *testing.T) {
var err error

for _, shard := range strings.Split("-80,80-", ",") {
output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetShard", "merchant:"+shard)
output, err = vc.VtctldClient.ExecuteCommandWithOutput("GetShard", "merchant:"+shard)
if err == nil {
t.Fatal("GetShard merchant:-80 failed")
}
Expand All @@ -951,7 +955,7 @@ func reshardMerchant2to3SplitMerge(t *testing.T) {

for _, shard := range strings.Split("-40,40-c0,c0-", ",") {
ksShard := fmt.Sprintf("%s:%s", merchantKeyspace, shard)
output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetShard", ksShard)
output, err = vc.VtctldClient.ExecuteCommandWithOutput("GetShard", ksShard)
if err != nil {
t.Fatalf("GetShard merchant failed for: %s: %v", shard, err)
}
Expand Down Expand Up @@ -1400,7 +1404,7 @@ func waitForLowLag(t *testing.T, keyspace, workflow string) {
waitDuration := 500 * time.Millisecond
duration := maxWait
for duration > 0 {
output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", fmt.Sprintf("%s.%s", keyspace, workflow), "Show")
output, err := vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", keyspace, "show", "--workflow", workflow)
require.NoError(t, err)
lagSeconds, err = jsonparser.GetInt([]byte(output), "MaxVReplicationTransactionLag")

Expand Down Expand Up @@ -1483,7 +1487,7 @@ func reshardAction(t *testing.T, action, workflow, keyspaceName, sourceShards, t
}

func applyVSchema(t *testing.T, vschema, keyspace string) {
err := vc.VtctlClient.ExecuteCommand("ApplyVSchema", "--", "--vschema", vschema, keyspace)
err := vc.VtctldClient.ExecuteCommand("ApplyVSchema", "--vschema", vschema, keyspace)
require.NoError(t, err)
}

Expand All @@ -1494,19 +1498,24 @@ func switchReadsDryRun(t *testing.T, workflowType, cells, ksWorkflow string, dry
"workflow type specified: %s", workflowType)
}
ensureCanSwitch(t, workflowType, cells, ksWorkflow)
output, err := vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--cells="+cells, "--tablet_types=rdonly,replica",
"--dry_run", "SwitchTraffic", ksWorkflow)
ks, wf, ok := strings.Cut(ksWorkflow, ".")
require.True(t, ok)
output, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", wf, "--target-keyspace", ks, "SwitchTraffic",
"--cells="+cells, "--tablet-types=rdonly,replica", "--dry-run")
require.NoError(t, err, fmt.Sprintf("Switching Reads DryRun Error: %s: %s", err, output))
if dryRunResults != nil {
validateDryRunResults(t, output, dryRunResults)
}
}

func ensureCanSwitch(t *testing.T, workflowType, cells, ksWorkflow string) {
ks, wf, ok := strings.Cut(ksWorkflow, ".")
require.True(t, ok)
timer := time.NewTimer(defaultTimeout)
defer timer.Stop()
for {
_, err := vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--cells="+cells, "--dry_run", "SwitchTraffic", ksWorkflow)
_, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", wf, "--target-keyspace", ks, "SwitchTraffic",
"--cells="+cells, "--dry-run")
if err == nil {
return
}
Expand All @@ -1532,11 +1541,13 @@ func switchReads(t *testing.T, workflowType, cells, ksWorkflow string, reverse b
command = "ReverseTraffic"
}
ensureCanSwitch(t, workflowType, cells, ksWorkflow)
output, err = vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--cells="+cells, "--tablet_types=rdonly",
command, ksWorkflow)
ks, wf, ok := strings.Cut(ksWorkflow, ".")
require.True(t, ok)
output, err = vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", wf, "--target-keyspace", ks, command,
"--cells="+cells, "--tablet-types=rdonly")
require.NoError(t, err, fmt.Sprintf("%s Error: %s: %s", command, err, output))
output, err = vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--cells="+cells, "--tablet_types=replica",
command, ksWorkflow)
output, err = vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", wf, "--target-keyspace", ks, command,
"--cells="+cells, "--tablet-types=replica")
require.NoError(t, err, fmt.Sprintf("%s Error: %s: %s", command, err, output))
}

Expand Down Expand Up @@ -1575,8 +1586,10 @@ func switchWritesDryRun(t *testing.T, workflowType, ksWorkflow string, dryRunRes
require.FailNowf(t, "Invalid workflow type for SwitchTraffic, must be MoveTables or Reshard",
"workflow type specified: %s", workflowType)
}
output, err := vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--tablet_types=primary", "--dry_run",
"SwitchTraffic", ksWorkflow)
ks, wf, ok := strings.Cut(ksWorkflow, ".")
require.True(t, ok)
output, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", wf, "--target-keyspace", ks,
"SwitchTraffic", "--tablet-types=primary", "--dry-run")
require.NoError(t, err, fmt.Sprintf("Switch writes DryRun Error: %s: %s", err, output))
validateDryRunResults(t, output, dryRunResults)
}
Expand Down
23 changes: 10 additions & 13 deletions go/test/endtoend/vreplication/vreplication_test_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,28 @@ package vreplication
var dryRunResultsSwitchWritesCustomerShard = []string{
"Lock keyspace product",
"Lock keyspace customer",
"Stop writes on keyspace product, tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order]:",
"/ Keyspace product, Shard 0 at Position",
"Wait for VReplication on stopped streams to catchup for up to 30s",
"Create reverse replication workflow p2c_reverse",
"/Stop writes on keyspace product for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order]: [keyspace:product;shard:0;position:",
"Wait for vreplication on stopped streams to catchup for up to 30s",
"Create reverse vreplication workflow p2c_reverse",
"Create journal entries on source databases",
"Enable writes on keyspace customer tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order]",
"Enable writes on keyspace customer for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order]",
"Switch routing from keyspace product to keyspace customer",
"Routing rules for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order] will be updated",
"Switch writes completed, freeze and delete vreplication streams on:",
" tablet 200 ",
" tablet 300 ",
"Start reverse replication streams on:",
" tablet 100 ",
"Mark vreplication streams frozen on:",
" Keyspace customer, Shard -80, Tablet 200, Workflow p2c, DbName vt_customer",
" Keyspace customer, Shard 80-, Tablet 300, Workflow p2c, DbName vt_customer",
"Switch writes completed, freeze and delete vreplication streams on: [tablet:200,tablet:300]",
"Start reverse vreplication streams on: [tablet:100]",
"Mark vreplication streams frozen on: [keyspace:customer;shard:-80;tablet:200;workflow:p2c;dbname:vt_customer,keyspace:customer;shard:80-;tablet:300;workflow:p2c;dbname:vt_customer]",
"Unlock keyspace customer",
"Unlock keyspace product",
"", // Additional empty newline in the output
}

var dryRunResultsReadCustomerShard = []string{
"Lock keyspace product",
"Switch reads for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order] to keyspace customer for tablet types [RDONLY,REPLICA]",
"Routing rules for tables [Lead,Lead-1,blüb_tbl,customer,db_order_test,geom_tbl,json_tbl,loadtest,reftable,vdiff_order] will be updated",
"Serving VSchema will be rebuilt for the customer keyspace",
"Unlock keyspace product",
"", // Additional empty newline in the output
}

var dryRunResultsSwitchWritesM2m3 = []string{
Expand Down
Loading
Loading