diff --git a/codecov.yml b/codecov.yml index 61232ae9197..978fc499df1 100644 --- a/codecov.yml +++ b/codecov.yml @@ -44,7 +44,10 @@ comment: # https://docs.codecov.com/docs/pull-request-comments coverage: status: # https://docs.codecov.com/docs/commit-status + patch: + default: + informational: true # Don't ever fail the codecov/patch test project: default: - informational: true # Don't ever fail the codecov/project or codecov/patch tests + informational: true # Don't ever fail the codecov/project test diff --git a/go/test/endtoend/vreplication/vdiff2_test.go b/go/test/endtoend/vreplication/vdiff2_test.go index 7f881c1f21b..08f5bb8926d 100644 --- a/go/test/endtoend/vreplication/vdiff2_test.go +++ b/go/test/endtoend/vreplication/vdiff2_test.go @@ -20,6 +20,7 @@ import ( "fmt" "math" "runtime" + "strconv" "strings" "testing" "time" @@ -31,6 +32,7 @@ import ( "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" + "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vttablet" @@ -56,6 +58,7 @@ type testCase struct { testCLICreateWait bool // test CLI create and wait until done against this workflow (only needs to be done once) testCLIFlagHandling bool // test vtctldclient flag handling from end-to-end extraVDiffFlags map[string]string + vdiffCount int64 // Keep track of the number of vdiffs created to test the stats } const ( @@ -119,6 +122,14 @@ var testCases = []*testCase{ }, } +func checkVDiffCountStat(t *testing.T, tablet *cluster.VttabletProcess, expectedCount int64) { + countStr, err := getDebugVar(t, tablet.Port, []string{"VDiffCount"}) + require.NoError(t, err, "failed to get VDiffCount stat from %s-%d tablet: %v", tablet.Cell, tablet.TabletUID, err) + count, err := strconv.Atoi(countStr) + require.NoError(t, err, "failed to convert VDiffCount stat string to int: %v", err) + require.Equal(t, expectedCount, int64(count), "expected VDiffCount stat to be %d but got %d", expectedCount, count) +} + func TestVDiff2(t *testing.T) { cellNames := "zone5,zone1,zone2,zone3,zone4" sourceKs := "product" @@ -172,6 +183,21 @@ func TestVDiff2(t *testing.T) { testWorkflow(t, vc, tc, tks, []*Cell{zone3, zone2, zone1}) }) } + + statsTablet := vc.getPrimaryTablet(t, targetKs, targetShards[0]) + + // We diffed X rows so confirm that the global total is > 0. + countStr, err := getDebugVar(t, statsTablet.Port, []string{"VDiffRowsComparedTotal"}) + require.NoError(t, err, "failed to get VDiffRowsComparedTotal stat from %s-%d tablet: %v", statsTablet.Cell, statsTablet.TabletUID, err) + count, err := strconv.Atoi(countStr) + require.NoError(t, err, "failed to convert VDiffRowsComparedTotal stat string to int: %v", err) + require.Greater(t, count, 0, "expected VDiffRowsComparedTotal stat to be greater than 0 but got %d", count) + + // The VDiffs should all be cleaned up so the VDiffRowsCompared value, which + // is produced from controller info, should be empty. + vdrc, err := getDebugVar(t, statsTablet.Port, []string{"VDiffRowsCompared"}) + require.NoError(t, err, "failed to get VDiffRowsCompared stat from %s-%d tablet: %v", statsTablet.Cell, statsTablet.TabletUID, err) + require.Equal(t, "{}", vdrc, "expected VDiffRowsCompared stat to be empty but got %s", vdrc) } func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace, cells []*Cell) { @@ -183,6 +209,8 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace, } ksWorkflow := fmt.Sprintf("%s.%s", tc.targetKs, tc.workflow) + statsShard := arrTargetShards[0] + statsTablet := vc.getPrimaryTablet(t, tc.targetKs, statsShard) var args []string args = append(args, tc.typ, "--") args = append(args, "--source", tc.sourceKs) @@ -251,9 +279,21 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace, } // Wait for the workflow to catch up again on the deletes. waitForShardsToCatchup() + tc.vdiffCount++ // We only did vtctldclient vdiff create } else { vdiff(t, tc.targetKs, tc.workflow, allCellNames, true, true, nil) + tc.vdiffCount += 2 // We did vtctlclient AND vtctldclient vdiff create } + checkVDiffCountStat(t, statsTablet, tc.vdiffCount) + + // Confirm that the VDiffRowsCompared stat -- which is a running count of the rows + // compared by vdiff per table at the controller level -- works as expected. + vdrc, err := getDebugVar(t, statsTablet.Port, []string{"VDiffRowsCompared"}) + require.NoError(t, err, "failed to get VDiffRowsCompared stat from %s-%d tablet: %v", statsTablet.Cell, statsTablet.TabletUID, err) + uuid, jsout := performVDiff2Action(t, false, ksWorkflow, allCellNames, "show", "last", false, "--verbose") + expect := gjson.Get(jsout, fmt.Sprintf("Reports.customer.%s", statsShard)).Int() + got := gjson.Get(vdrc, fmt.Sprintf("%s.%s.%s", tc.workflow, uuid, "customer")).Int() + require.Equal(t, expect, got, "expected VDiffRowsCompared stat to be %d, but got %d", expect, got) if tc.autoRetryError { testAutoRetryError(t, tc, allCellNames) @@ -263,26 +303,37 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace, testResume(t, tc, allCellNames) } + checkVDiffCountStat(t, statsTablet, tc.vdiffCount) + // These are done here so that we have a valid workflow to test the commands against. if tc.stop { testStop(t, ksWorkflow, allCellNames) + tc.vdiffCount++ // We did either vtctlclient OR vtctldclient vdiff create } if tc.testCLICreateWait { testCLICreateWait(t, ksWorkflow, allCellNames) + tc.vdiffCount++ // We did either vtctlclient OR vtctldclient vdiff create } if tc.testCLIErrors { testCLIErrors(t, ksWorkflow, allCellNames) } if tc.testCLIFlagHandling { testCLIFlagHandling(t, tc.targetKs, tc.workflow, cells[0]) + tc.vdiffCount++ // We did either vtctlclient OR vtctldclient vdiff create } + checkVDiffCountStat(t, statsTablet, tc.vdiffCount) + testDelete(t, ksWorkflow, allCellNames) + tc.vdiffCount = 0 // All vdiffs are deleted, so reset the count and check + checkVDiffCountStat(t, statsTablet, tc.vdiffCount) // Create another VDiff record to confirm it gets deleted when the workflow is completed. ts := time.Now() - uuid, _ := performVDiff2Action(t, false, ksWorkflow, allCellNames, "create", "", false) + uuid, _ = performVDiff2Action(t, false, ksWorkflow, allCellNames, "create", "", false) waitForVDiff2ToComplete(t, false, ksWorkflow, allCellNames, uuid, ts) + tc.vdiffCount++ + checkVDiffCountStat(t, statsTablet, tc.vdiffCount) err = vc.VtctlClient.ExecuteCommand(tc.typ, "--", "SwitchTraffic", ksWorkflow) require.NoError(t, err) @@ -291,6 +342,8 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace, // Confirm the VDiff data is deleted for the workflow. testNoOrphanedData(t, tc.targetKs, tc.workflow, arrTargetShards) + tc.vdiffCount = 0 // All vdiffs are deleted, so reset the count and check + checkVDiffCountStat(t, statsTablet, tc.vdiffCount) } func testCLIErrors(t *testing.T, ksWorkflow, cells string) { diff --git a/go/vt/vttablet/tabletmanager/vdiff/action.go b/go/vt/vttablet/tabletmanager/vdiff/action.go index ded232bf3c7..0b9dd6f45ed 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/action.go +++ b/go/vt/vttablet/tabletmanager/vdiff/action.go @@ -63,7 +63,15 @@ var ( } ) -func (vde *Engine) PerformVDiffAction(ctx context.Context, req *tabletmanagerdatapb.VDiffRequest) (*tabletmanagerdatapb.VDiffResponse, error) { +func (vde *Engine) PerformVDiffAction(ctx context.Context, req *tabletmanagerdatapb.VDiffRequest) (resp *tabletmanagerdatapb.VDiffResponse, err error) { + defer func() { + if err != nil { + globalStats.ErrorCount.Add(1) + } + }() + if req == nil { + return nil, vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT, "nil vdiff request") + } if !vde.isOpen { return nil, vterrors.New(vtrpcpb.Code_UNAVAILABLE, "vdiff engine is closed") } @@ -71,7 +79,7 @@ func (vde *Engine) PerformVDiffAction(ctx context.Context, req *tabletmanagerdat return nil, vterrors.New(vtrpcpb.Code_UNAVAILABLE, "vdiff engine is still trying to open") } - resp := &tabletmanagerdatapb.VDiffResponse{ + resp = &tabletmanagerdatapb.VDiffResponse{ Id: 0, Output: nil, } @@ -368,6 +376,9 @@ func (vde *Engine) handleDeleteAction(ctx context.Context, dbClient binlogplayer } controller.Stop() delete(vde.controllers, controller.id) + globalStats.mu.Lock() + defer globalStats.mu.Unlock() + delete(globalStats.controllers, controller.id) } switch req.ActionArg { diff --git a/go/vt/vttablet/tabletmanager/vdiff/action_test.go b/go/vt/vttablet/tabletmanager/vdiff/action_test.go index 1049bc8607d..4676238cf69 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/action_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/action_test.go @@ -56,8 +56,13 @@ func TestPerformVDiffAction(t *testing.T) { expectQueries []queryAndResult wantErr error }{ + { + name: "nil request", + wantErr: vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT, "nil vdiff request"), + }, { name: "engine not open", + req: &tabletmanagerdatapb.VDiffRequest{}, vde: &Engine{isOpen: false}, wantErr: vterrors.New(vtrpcpb.Code_UNAVAILABLE, "vdiff engine is closed"), }, @@ -208,6 +213,7 @@ func TestPerformVDiffAction(t *testing.T) { }, }, } + errCount := int64(0) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { if tt.preFunc != nil { @@ -224,6 +230,9 @@ func TestPerformVDiffAction(t *testing.T) { vdiffenv.dbClient.ExpectRequest(queryResult.query, queryResult.result, nil) } got, err := tt.vde.PerformVDiffAction(ctx, tt.req) + if err != nil { + errCount++ + } vdiffenv.dbClient.Wait() if tt.wantErr != nil && !vterrors.Equals(err, tt.wantErr) { t.Errorf("Engine.PerformVDiffAction() error = %v, wantErr %v", err, tt.wantErr) @@ -239,6 +248,8 @@ func TestPerformVDiffAction(t *testing.T) { // No VDiffs should be running anymore. require.Equal(t, 0, len(vdiffenv.vde.controllers), "expected no controllers to be running, but found %d", len(vdiffenv.vde.controllers)) + require.Equal(t, int64(0), globalStats.numControllers(), "expected no controllers, but found %d") }) + require.Equal(t, errCount, globalStats.ErrorCount.Get(), "expected error count %d, got %d", errCount, globalStats.ErrorCount.Get()) } } diff --git a/go/vt/vttablet/tabletmanager/vdiff/controller.go b/go/vt/vttablet/tabletmanager/vdiff/controller.go index 1c50c0597ef..0265e8a0a35 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/controller.go +++ b/go/vt/vttablet/tabletmanager/vdiff/controller.go @@ -27,6 +27,7 @@ import ( "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/proto/tabletmanagerdata" @@ -75,6 +76,11 @@ type controller struct { sourceTimeZone, targetTimeZone string // Named time zones if conversions are necessary for datetime values externalCluster string // For Mount+Migrate + + // Information used in vdiff stats/metrics. + Errors *stats.CountersWithMultiLabels + TableDiffRowCounts *stats.CountersWithMultiLabels + TableDiffPhaseTimings *stats.Timings } func newController(ctx context.Context, row sqltypes.RowNamedValues, dbClientFactory func() binlogplayer.DBClient, @@ -84,16 +90,19 @@ func newController(ctx context.Context, row sqltypes.RowNamedValues, dbClientFac id, _ := row["id"].ToInt64() ct := &controller{ - id: id, - uuid: row["vdiff_uuid"].ToString(), - workflow: row["workflow"].ToString(), - dbClientFactory: dbClientFactory, - ts: ts, - vde: vde, - done: make(chan struct{}), - tmc: vde.tmClientFactory(), - sources: make(map[string]*migrationSource), - options: options, + id: id, + uuid: row["vdiff_uuid"].ToString(), + workflow: row["workflow"].ToString(), + dbClientFactory: dbClientFactory, + ts: ts, + vde: vde, + done: make(chan struct{}), + tmc: vde.tmClientFactory(), + sources: make(map[string]*migrationSource), + options: options, + Errors: stats.NewCountersWithMultiLabels("", "", []string{"Error"}), + TableDiffRowCounts: stats.NewCountersWithMultiLabels("", "", []string{"Rows"}), + TableDiffPhaseTimings: stats.NewTimings("", "", "", "TablePhase"), } ctx, ct.cancel = context.WithCancel(ctx) go ct.run(ctx) diff --git a/go/vt/vttablet/tabletmanager/vdiff/engine.go b/go/vt/vttablet/tabletmanager/vdiff/engine.go index 16e8a89d90e..b2285a070fa 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/engine.go +++ b/go/vt/vttablet/tabletmanager/vdiff/engine.go @@ -26,17 +26,16 @@ import ( "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/mysql/sqlerror" - "vitess.io/vitess/go/vt/proto/tabletmanagerdata" - "vitess.io/vitess/go/vt/proto/topodata" - "vitess.io/vitess/go/vt/sqlparser" - "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" - "vitess.io/vitess/go/vt/vttablet/tmclient" - "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" + "vitess.io/vitess/go/vt/vttablet/tmclient" ) type Engine struct { @@ -159,6 +158,7 @@ func (vde *Engine) openLocked(ctx context.Context) error { if err := vde.initControllers(rows); err != nil { return err } + vde.updateStats() // At this point we've fully and successfully opened so begin // retrying error'd VDiffs until the engine is closed. @@ -218,6 +218,9 @@ func (vde *Engine) addController(row sqltypes.RowNamedValues, options *tabletman row, vde.thisTablet.Alias) } vde.controllers[ct.id] = ct + globalStats.mu.Lock() + defer globalStats.mu.Unlock() + globalStats.controllers[ct.id] = ct return nil } @@ -392,4 +395,16 @@ func (vde *Engine) resetControllers() { ct.Stop() } vde.controllers = make(map[int64]*controller) + vde.updateStats() +} + +// updateStats must only be called while holding the engine lock. +func (vre *Engine) updateStats() { + globalStats.mu.Lock() + defer globalStats.mu.Unlock() + + globalStats.controllers = make(map[int64]*controller, len(vre.controllers)) + for id, ct := range vre.controllers { + globalStats.controllers[id] = ct + } } diff --git a/go/vt/vttablet/tabletmanager/vdiff/stats.go b/go/vt/vttablet/tabletmanager/vdiff/stats.go index 3d984366bc9..b68e1f86556 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/stats.go +++ b/go/vt/vttablet/tabletmanager/vdiff/stats.go @@ -17,6 +17,7 @@ limitations under the License. package vdiff import ( + "fmt" "sync" "vitess.io/vitess/go/stats" @@ -34,21 +35,40 @@ func init() { // vdiffStats exports the stats for Engine. It's a separate structure to // prevent potential deadlocks with the mutex in Engine. type vdiffStats struct { - mu sync.Mutex + mu sync.Mutex + controllers map[int64]*controller + Count *stats.Gauge + ErrorCount *stats.Counter RestartedTableDiffs *stats.CountersWithSingleLabel + RowsDiffedCount *stats.Counter } -func (st *vdiffStats) register() { +func (vds *vdiffStats) register() { + globalStats.Count = stats.NewGauge("", "") + globalStats.ErrorCount = stats.NewCounter("", "") globalStats.RestartedTableDiffs = stats.NewCountersWithSingleLabel("", "", "Table", "") + globalStats.RowsDiffedCount = stats.NewCounter("", "") + + stats.NewGaugeFunc("VDiffCount", "Number of current vdiffs", vds.numControllers) + + stats.NewCounterFunc( + "VDiffErrorCountTotal", + "Number of errors encountered across all vdiff actions", + func() int64 { + vds.mu.Lock() + defer vds.mu.Unlock() + return globalStats.ErrorCount.Get() + }, + ) stats.NewGaugesFuncWithMultiLabels( "VDiffRestartedTableDiffsCount", - "vdiff table diffs restarted due to max-diff-duration counts per table", + "Table diffs restarted due to --max-diff-duration counts by table", []string{"table_name"}, func() map[string]int64 { - st.mu.Lock() - defer st.mu.Unlock() + vds.mu.Lock() + defer vds.mu.Unlock() result := make(map[string]int64) for label, count := range globalStats.RestartedTableDiffs.Counts() { if label == "" { @@ -59,4 +79,71 @@ func (st *vdiffStats) register() { return result }, ) + + stats.NewCounterFunc( + "VDiffRowsComparedTotal", + "Number of rows compared across all vdiffs", + func() int64 { + vds.mu.Lock() + defer vds.mu.Unlock() + return globalStats.RowsDiffedCount.Get() + }, + ) + + stats.NewGaugesFuncWithMultiLabels( + "VDiffRowsCompared", + "Live number of rows compared per vdiff by table", + []string{"workflow", "uuid", "table"}, + func() map[string]int64 { + vds.mu.Lock() + defer vds.mu.Unlock() + result := make(map[string]int64, len(vds.controllers)) + for _, ct := range vds.controllers { + for key, val := range ct.TableDiffRowCounts.Counts() { + result[fmt.Sprintf("%s.%s.%s", ct.workflow, ct.uuid, key)] = val + } + } + return result + }, + ) + + stats.NewCountersFuncWithMultiLabels( + "VDiffErrors", + "Count of specific errors seen during the lifetime of a vdiff", + []string{"workflow", "uuid", "error"}, + func() map[string]int64 { + vds.mu.Lock() + defer vds.mu.Unlock() + result := make(map[string]int64, len(vds.controllers)) + for _, ct := range vds.controllers { + for key, val := range ct.Errors.Counts() { + result[fmt.Sprintf("%s.%s.%s", ct.workflow, ct.uuid, key)] = val + } + } + return result + }, + ) + + stats.NewGaugesFuncWithMultiLabels( + "VDiffPhaseTimings", + "VDiff phase timings", + []string{"workflow", "uuid", "table", "phase"}, + func() map[string]int64 { + vds.mu.Lock() + defer vds.mu.Unlock() + result := make(map[string]int64, len(vds.controllers)) + for _, ct := range vds.controllers { + for tablePhase, h := range ct.TableDiffPhaseTimings.Histograms() { + result[fmt.Sprintf("%s.%s.%s", ct.workflow, ct.uuid, tablePhase)] = h.Total() + } + } + return result + }, + ) +} + +func (vds *vdiffStats) numControllers() int64 { + vds.mu.Lock() + defer vds.mu.Unlock() + return int64(len(vds.controllers)) } diff --git a/go/vt/vttablet/tabletmanager/vdiff/stats_test.go b/go/vt/vttablet/tabletmanager/vdiff/stats_test.go new file mode 100644 index 00000000000..8b1a174466f --- /dev/null +++ b/go/vt/vttablet/tabletmanager/vdiff/stats_test.go @@ -0,0 +1,76 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vdiff + +import ( + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/stats" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" +) + +func TestVDiffStats(t *testing.T) { + testStats := &vdiffStats{ + ErrorCount: stats.NewCounter("", ""), + RestartedTableDiffs: stats.NewCountersWithSingleLabel("", "", "Table", ""), + RowsDiffedCount: stats.NewCounter("", ""), + } + id := int64(1) + testStats.controllers = map[int64]*controller{ + id: { + id: id, + workflow: "testwf", + workflowType: binlogdatapb.VReplicationWorkflowType_MoveTables, + uuid: uuid.New().String(), + Errors: stats.NewCountersWithMultiLabels("", "", []string{"Error"}), + TableDiffRowCounts: stats.NewCountersWithMultiLabels("", "", []string{"Rows"}), + TableDiffPhaseTimings: stats.NewTimings("", "", "", "TablePhase"), + }, + } + + require.Equal(t, int64(1), testStats.numControllers()) + + sleepTime := 1 * time.Millisecond + record := func(phase string) { + defer testStats.controllers[id].TableDiffPhaseTimings.Record(phase, time.Now()) + time.Sleep(sleepTime) + } + want := int64(1.2 * float64(sleepTime)) // Allow 20% overhead for recording timing + record(string(initializing)) + require.Greater(t, want, testStats.controllers[id].TableDiffPhaseTimings.Histograms()[string(initializing)].Total()) + record(string(pickingTablets)) + require.Greater(t, want, testStats.controllers[id].TableDiffPhaseTimings.Histograms()[string(pickingTablets)].Total()) + record(string(diffingTable)) + require.Greater(t, want, testStats.controllers[id].TableDiffPhaseTimings.Histograms()[string(diffingTable)].Total()) + + testStats.ErrorCount.Set(11) + require.Equal(t, int64(11), testStats.ErrorCount.Get()) + + testStats.controllers[id].Errors.Add([]string{"test error"}, int64(12)) + require.Equal(t, int64(12), testStats.controllers[id].Errors.Counts()["test error"]) + + testStats.RestartedTableDiffs.Add("t1", int64(5)) + require.Equal(t, int64(5), testStats.RestartedTableDiffs.Counts()["t1"]) + + testStats.RowsDiffedCount.Add(512) + require.Equal(t, int64(512), testStats.RowsDiffedCount.Get()) +} diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go index b3634e37ab5..7ad83a3ad4b 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go @@ -49,6 +49,19 @@ import ( vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) +type tableDiffPhase string + +const ( + initializing = tableDiffPhase("initializing") + pickingTablets = tableDiffPhase("picking_streaming_tablets") + syncingSources = tableDiffPhase("syncing_source_streams") + syncingTargets = tableDiffPhase("syncing_target_streams") + startingSources = tableDiffPhase("starting_source_data_streams") + startingTargets = tableDiffPhase("starting_target_data_streams") + restartingVreplication = tableDiffPhase("restarting_vreplication_streams") + diffingTable = tableDiffPhase("diffing_table") +) + // how long to wait for background operations to complete var BackgroundOperationTimeout = topo.RemoteOperationTimeout * 4 @@ -90,6 +103,7 @@ func newTableDiffer(wd *workflowDiffer, table *tabletmanagerdatapb.TableDefiniti // initialize func (td *tableDiffer) initialize(ctx context.Context) error { + defer td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.%s", td.table.Name, initializing), time.Now()) vdiffEngine := td.wd.ct.vde vdiffEngine.snapshotMu.Lock() defer vdiffEngine.snapshotMu.Unlock() @@ -212,6 +226,7 @@ func (td *tableDiffer) forEachSource(cb func(source *migrationSource) error) err } func (td *tableDiffer) selectTablets(ctx context.Context) error { + defer td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.%s", td.table.Name, pickingTablets), time.Now()) var ( wg sync.WaitGroup sourceErr, targetErr error @@ -287,6 +302,7 @@ func (td *tableDiffer) pickTablet(ctx context.Context, ts *topo.Server, cells [] } func (td *tableDiffer) syncSourceStreams(ctx context.Context) error { + defer td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.%s", td.table.Name, syncingSources), time.Now()) // source can be replica, wait for them to at least reach max gtid of all target streams ct := td.wd.ct waitCtx, cancel := context.WithTimeout(ctx, time.Duration(ct.options.CoreOptions.TimeoutSeconds*int64(time.Second))) @@ -305,6 +321,7 @@ func (td *tableDiffer) syncSourceStreams(ctx context.Context) error { } func (td *tableDiffer) syncTargetStreams(ctx context.Context) error { + defer td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.%s", td.table.Name, syncingTargets), time.Now()) ct := td.wd.ct waitCtx, cancel := context.WithTimeout(ctx, time.Duration(ct.options.CoreOptions.TimeoutSeconds*int64(time.Second))) defer cancel() @@ -327,6 +344,7 @@ func (td *tableDiffer) syncTargetStreams(ctx context.Context) error { } func (td *tableDiffer) startTargetDataStream(ctx context.Context) error { + defer td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.%s", td.table.Name, startingTargets), time.Now()) ct := td.wd.ct gtidch := make(chan string, 1) ct.targetShardStreamer.result = make(chan *sqltypes.Result, 1) @@ -341,6 +359,7 @@ func (td *tableDiffer) startTargetDataStream(ctx context.Context) error { } func (td *tableDiffer) startSourceDataStreams(ctx context.Context) error { + defer td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.%s", td.table.Name, startingSources), time.Now()) if err := td.forEachSource(func(source *migrationSource) error { gtidch := make(chan string, 1) source.result = make(chan *sqltypes.Result, 1) @@ -359,6 +378,7 @@ func (td *tableDiffer) startSourceDataStreams(ctx context.Context) error { } func (td *tableDiffer) restartTargetVReplicationStreams(ctx context.Context) error { + defer td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.%s", td.table.Name, restartingVreplication), time.Now()) ct := td.wd.ct query := fmt.Sprintf("update _vt.vreplication set state='Running', message='', stop_pos='' where db_name=%s and workflow=%s", encodeString(ct.vde.dbName), encodeString(ct.workflow)) @@ -467,6 +487,7 @@ func (td *tableDiffer) setupRowSorters() { } func (td *tableDiffer) diff(ctx context.Context, rowsToCompare int64, debug, onlyPks bool, maxExtraRowsToCompare int64, maxReportSampleRows int64, stop <-chan time.Time) (*DiffReport, error) { + defer td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.%s", td.table.Name, diffingTable), time.Now()) dbClient := td.wd.ct.dbClientFactory() if err := dbClient.Connect(); err != nil { return nil, err @@ -515,6 +536,7 @@ func (td *tableDiffer) diff(ctx context.Context, rowsToCompare int64, debug, onl if err := td.updateTableProgress(dbClient, dr, lastProcessedRow); err != nil { log.Errorf("Failed to update vdiff progress on %s table: %v", td.table.Name, err) } + globalStats.RowsDiffedCount.Add(dr.ProcessedRows) }() for { @@ -741,6 +763,7 @@ func (td *tableDiffer) updateTableProgress(dbClient binlogplayer.DBClient, dr *D if _, err := dbClient.ExecuteFetch(query, 1); err != nil { return err } + td.wd.ct.TableDiffRowCounts.Add([]string{td.table.Name}, dr.ProcessedRows) return nil } diff --git a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go index f477e88406e..97d2bd387cb 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go @@ -235,7 +235,13 @@ func (wd *workflowDiffer) diffTable(ctx context.Context, dbClient binlogplayer.D return nil } -func (wd *workflowDiffer) diff(ctx context.Context) error { +func (wd *workflowDiffer) diff(ctx context.Context) (err error) { + defer func() { + if err != nil { + globalStats.ErrorCount.Add(1) + wd.ct.Errors.Add([]string{err.Error()}, 1) + } + }() dbClient := wd.ct.dbClientFactory() if err := dbClient.Connect(); err != nil { return err