diff --git a/go/test/endtoend/vreplication/migrate_test.go b/go/test/endtoend/vreplication/migrate_test.go index 1f365c47600..57ec8238d2b 100644 --- a/go/test/endtoend/vreplication/migrate_test.go +++ b/go/test/endtoend/vreplication/migrate_test.go @@ -201,6 +201,8 @@ func TestVtctldMigrate(t *testing.T) { extVtgateConn := getConnection(t, extVc.ClusterConfig.hostname, extVc.ClusterConfig.vtgateMySQLPort) insertInitialDataIntoExternalCluster(t, extVtgateConn) + targetPrimary := vc.getPrimaryTablet(t, "product", "0") + var output, expected string t.Run("mount external cluster", func(t *testing.T) { @@ -232,12 +234,12 @@ func TestVtctldMigrate(t *testing.T) { } waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String()) expectNumberOfStreams(t, vtgateConn, "migrate", "e1", "product:0", 1) - waitForRowCount(t, vtgateConn, "product:0", "rating", 2) - waitForRowCount(t, vtgateConn, "product:0", "review", 3) + waitForRowCountInTablet(t, targetPrimary, "product", "rating", 2) + waitForRowCountInTablet(t, targetPrimary, "product", "review", 3) execVtgateQuery(t, extVtgateConn, "rating", "insert into review(rid, pid, review) values(4, 1, 'review4');") execVtgateQuery(t, extVtgateConn, "rating", "insert into rating(gid, pid, rating) values(3, 1, 3);") - waitForRowCount(t, vtgateConn, "product:0", "rating", 3) - waitForRowCount(t, vtgateConn, "product:0", "review", 4) + waitForRowCountInTablet(t, targetPrimary, "product", "rating", 3) + waitForRowCountInTablet(t, targetPrimary, "product", "review", 4) vdiffSideBySide(t, ksWorkflow, "extcell1") output, err = vc.VtctldClient.ExecuteCommandWithOutput("Migrate", @@ -268,8 +270,8 @@ func TestVtctldMigrate(t *testing.T) { require.NoError(t, err, "Migrate command failed with %s", output) expectNumberOfStreams(t, vtgateConn, "migrate", "e1", "product:0", 1, binlogdatapb.VReplicationWorkflowState_Stopped.String()) - waitForRowCount(t, vtgateConn, "product:0", "rating", 0) - waitForRowCount(t, vtgateConn, "product:0", "review", 0) + waitForRowCountInTablet(t, targetPrimary, "product", "rating", 0) + waitForRowCountInTablet(t, targetPrimary, "product", "review", 0) output, err = vc.VtctldClient.ExecuteCommandWithOutput("Migrate", "--target-keyspace", "product", "--workflow", "e1", "cancel") require.NoError(t, err, "Migrate command failed with %s", output) diff --git a/go/test/endtoend/vreplication/vdiff2_test.go b/go/test/endtoend/vreplication/vdiff2_test.go index 08f5bb8926d..fb8ed7c8787 100644 --- a/go/test/endtoend/vreplication/vdiff2_test.go +++ b/go/test/endtoend/vreplication/vdiff2_test.go @@ -306,10 +306,12 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace, checkVDiffCountStat(t, statsTablet, tc.vdiffCount) // These are done here so that we have a valid workflow to test the commands against. + if tc.stop { testStop(t, ksWorkflow, allCellNames) tc.vdiffCount++ // We did either vtctlclient OR vtctldclient vdiff create } + if tc.testCLICreateWait { testCLICreateWait(t, ksWorkflow, allCellNames) tc.vdiffCount++ // We did either vtctlclient OR vtctldclient vdiff create @@ -519,14 +521,16 @@ func testResume(t *testing.T, tc *testCase, cells string) { func testStop(t *testing.T, ksWorkflow, cells string) { t.Run("Stop", func(t *testing.T) { - // create a new VDiff and immediately stop it + // Create a new VDiff and immediately stop it. uuid, _ := performVDiff2Action(t, false, ksWorkflow, cells, "create", "", false) _, _ = performVDiff2Action(t, false, ksWorkflow, cells, "stop", uuid, false) - // confirm the VDiff is in the expected stopped state + // Confirm the VDiff is in the expected state. _, output := performVDiff2Action(t, false, ksWorkflow, cells, "show", uuid, false) jsonOutput := getVDiffInfo(output) - require.Equal(t, "stopped", jsonOutput.State) - // confirm that the context cancelled error was also cleared + // It may have been able to complete before we could stop it (there's virtually no data + // to diff). There's no way to avoid this potential race so don't consider that a failure. + require.True(t, (jsonOutput.State == "stopped" || jsonOutput.State == "completed"), "expected vdiff state to be stopped or completed but it was %s", jsonOutput.State) + // Confirm that the context cancelled error was also cleared. require.False(t, strings.Contains(output, `"Errors":`)) }) } diff --git a/go/test/endtoend/vreplication/vdiff_helper_test.go b/go/test/endtoend/vreplication/vdiff_helper_test.go index 53e19e56731..561edfe8b7e 100644 --- a/go/test/endtoend/vreplication/vdiff_helper_test.go +++ b/go/test/endtoend/vreplication/vdiff_helper_test.go @@ -26,10 +26,12 @@ import ( "github.com/stretchr/testify/require" "github.com/tidwall/gjson" + "vitess.io/vitess/go/json2" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/log" - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" vdiff2 "vitess.io/vitess/go/vt/vttablet/tabletmanager/vdiff" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) const ( @@ -329,16 +331,7 @@ type vdiffInfo struct { func getVDiffInfo(json string) *vdiffInfo { var info vdiffInfo - info.Workflow = gjson.Get(json, "Workflow").String() - info.Keyspace = gjson.Get(json, "Keyspace").String() - info.State = gjson.Get(json, "State").String() - info.Shards = gjson.Get(json, "Shards").String() - info.RowsCompared = gjson.Get(json, "RowsCompared").Int() - info.StartedAt = gjson.Get(json, "StartedAt").String() - info.CompletedAt = gjson.Get(json, "CompletedAt").String() - info.HasMismatch = gjson.Get(json, "HasMismatch").Bool() - info.Progress.Percentage = gjson.Get(json, "Progress.Percentage").Float() - info.Progress.ETA = gjson.Get(json, "Progress.ETA").String() + _ = json2.Unmarshal([]byte(json), &info) return &info }