Skip to content

Commit

Permalink
VDiff: Add some stats (#15175)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Feb 14, 2024
1 parent 1cdb279 commit fa59f9d
Show file tree
Hide file tree
Showing 10 changed files with 320 additions and 26 deletions.
5 changes: 4 additions & 1 deletion codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ comment: # https://docs.codecov.com/docs/pull-request-comments

coverage:
status: # https://docs.codecov.com/docs/commit-status
patch:
default:
informational: true # Don't ever fail the codecov/patch test
project:
default:
informational: true # Don't ever fail the codecov/project or codecov/patch tests
informational: true # Don't ever fail the codecov/project test

55 changes: 54 additions & 1 deletion go/test/endtoend/vreplication/vdiff2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"math"
"runtime"
"strconv"
"strings"
"testing"
"time"
Expand All @@ -31,6 +32,7 @@ import (
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet"
Expand All @@ -56,6 +58,7 @@ type testCase struct {
testCLICreateWait bool // test CLI create and wait until done against this workflow (only needs to be done once)
testCLIFlagHandling bool // test vtctldclient flag handling from end-to-end
extraVDiffFlags map[string]string
vdiffCount int64 // Keep track of the number of vdiffs created to test the stats
}

const (
Expand Down Expand Up @@ -119,6 +122,14 @@ var testCases = []*testCase{
},
}

func checkVDiffCountStat(t *testing.T, tablet *cluster.VttabletProcess, expectedCount int64) {
countStr, err := getDebugVar(t, tablet.Port, []string{"VDiffCount"})
require.NoError(t, err, "failed to get VDiffCount stat from %s-%d tablet: %v", tablet.Cell, tablet.TabletUID, err)
count, err := strconv.Atoi(countStr)
require.NoError(t, err, "failed to convert VDiffCount stat string to int: %v", err)
require.Equal(t, expectedCount, int64(count), "expected VDiffCount stat to be %d but got %d", expectedCount, count)
}

func TestVDiff2(t *testing.T) {
cellNames := "zone5,zone1,zone2,zone3,zone4"
sourceKs := "product"
Expand Down Expand Up @@ -172,6 +183,21 @@ func TestVDiff2(t *testing.T) {
testWorkflow(t, vc, tc, tks, []*Cell{zone3, zone2, zone1})
})
}

statsTablet := vc.getPrimaryTablet(t, targetKs, targetShards[0])

// We diffed X rows so confirm that the global total is > 0.
countStr, err := getDebugVar(t, statsTablet.Port, []string{"VDiffRowsComparedTotal"})
require.NoError(t, err, "failed to get VDiffRowsComparedTotal stat from %s-%d tablet: %v", statsTablet.Cell, statsTablet.TabletUID, err)
count, err := strconv.Atoi(countStr)
require.NoError(t, err, "failed to convert VDiffRowsComparedTotal stat string to int: %v", err)
require.Greater(t, count, 0, "expected VDiffRowsComparedTotal stat to be greater than 0 but got %d", count)

// The VDiffs should all be cleaned up so the VDiffRowsCompared value, which
// is produced from controller info, should be empty.
vdrc, err := getDebugVar(t, statsTablet.Port, []string{"VDiffRowsCompared"})
require.NoError(t, err, "failed to get VDiffRowsCompared stat from %s-%d tablet: %v", statsTablet.Cell, statsTablet.TabletUID, err)
require.Equal(t, "{}", vdrc, "expected VDiffRowsCompared stat to be empty but got %s", vdrc)
}

func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace, cells []*Cell) {
Expand All @@ -183,6 +209,8 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace,

}
ksWorkflow := fmt.Sprintf("%s.%s", tc.targetKs, tc.workflow)
statsShard := arrTargetShards[0]
statsTablet := vc.getPrimaryTablet(t, tc.targetKs, statsShard)
var args []string
args = append(args, tc.typ, "--")
args = append(args, "--source", tc.sourceKs)
Expand Down Expand Up @@ -251,9 +279,21 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace,
}
// Wait for the workflow to catch up again on the deletes.
waitForShardsToCatchup()
tc.vdiffCount++ // We only did vtctldclient vdiff create
} else {
vdiff(t, tc.targetKs, tc.workflow, allCellNames, true, true, nil)
tc.vdiffCount += 2 // We did vtctlclient AND vtctldclient vdiff create
}
checkVDiffCountStat(t, statsTablet, tc.vdiffCount)

// Confirm that the VDiffRowsCompared stat -- which is a running count of the rows
// compared by vdiff per table at the controller level -- works as expected.
vdrc, err := getDebugVar(t, statsTablet.Port, []string{"VDiffRowsCompared"})
require.NoError(t, err, "failed to get VDiffRowsCompared stat from %s-%d tablet: %v", statsTablet.Cell, statsTablet.TabletUID, err)
uuid, jsout := performVDiff2Action(t, false, ksWorkflow, allCellNames, "show", "last", false, "--verbose")
expect := gjson.Get(jsout, fmt.Sprintf("Reports.customer.%s", statsShard)).Int()
got := gjson.Get(vdrc, fmt.Sprintf("%s.%s.%s", tc.workflow, uuid, "customer")).Int()
require.Equal(t, expect, got, "expected VDiffRowsCompared stat to be %d, but got %d", expect, got)

if tc.autoRetryError {
testAutoRetryError(t, tc, allCellNames)
Expand All @@ -263,26 +303,37 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace,
testResume(t, tc, allCellNames)
}

checkVDiffCountStat(t, statsTablet, tc.vdiffCount)

// These are done here so that we have a valid workflow to test the commands against.
if tc.stop {
testStop(t, ksWorkflow, allCellNames)
tc.vdiffCount++ // We did either vtctlclient OR vtctldclient vdiff create
}
if tc.testCLICreateWait {
testCLICreateWait(t, ksWorkflow, allCellNames)
tc.vdiffCount++ // We did either vtctlclient OR vtctldclient vdiff create
}
if tc.testCLIErrors {
testCLIErrors(t, ksWorkflow, allCellNames)
}
if tc.testCLIFlagHandling {
testCLIFlagHandling(t, tc.targetKs, tc.workflow, cells[0])
tc.vdiffCount++ // We did either vtctlclient OR vtctldclient vdiff create
}

checkVDiffCountStat(t, statsTablet, tc.vdiffCount)

testDelete(t, ksWorkflow, allCellNames)
tc.vdiffCount = 0 // All vdiffs are deleted, so reset the count and check
checkVDiffCountStat(t, statsTablet, tc.vdiffCount)

// Create another VDiff record to confirm it gets deleted when the workflow is completed.
ts := time.Now()
uuid, _ := performVDiff2Action(t, false, ksWorkflow, allCellNames, "create", "", false)
uuid, _ = performVDiff2Action(t, false, ksWorkflow, allCellNames, "create", "", false)
waitForVDiff2ToComplete(t, false, ksWorkflow, allCellNames, uuid, ts)
tc.vdiffCount++
checkVDiffCountStat(t, statsTablet, tc.vdiffCount)

err = vc.VtctlClient.ExecuteCommand(tc.typ, "--", "SwitchTraffic", ksWorkflow)
require.NoError(t, err)
Expand All @@ -291,6 +342,8 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace,

// Confirm the VDiff data is deleted for the workflow.
testNoOrphanedData(t, tc.targetKs, tc.workflow, arrTargetShards)
tc.vdiffCount = 0 // All vdiffs are deleted, so reset the count and check
checkVDiffCountStat(t, statsTablet, tc.vdiffCount)
}

func testCLIErrors(t *testing.T, ksWorkflow, cells string) {
Expand Down
15 changes: 13 additions & 2 deletions go/vt/vttablet/tabletmanager/vdiff/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,23 @@ var (
}
)

func (vde *Engine) PerformVDiffAction(ctx context.Context, req *tabletmanagerdatapb.VDiffRequest) (*tabletmanagerdatapb.VDiffResponse, error) {
func (vde *Engine) PerformVDiffAction(ctx context.Context, req *tabletmanagerdatapb.VDiffRequest) (resp *tabletmanagerdatapb.VDiffResponse, err error) {
defer func() {
if err != nil {
globalStats.ErrorCount.Add(1)
}
}()
if req == nil {
return nil, vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT, "nil vdiff request")
}
if !vde.isOpen {
return nil, vterrors.New(vtrpcpb.Code_UNAVAILABLE, "vdiff engine is closed")
}
if vde.cancelRetry != nil {
return nil, vterrors.New(vtrpcpb.Code_UNAVAILABLE, "vdiff engine is still trying to open")
}

resp := &tabletmanagerdatapb.VDiffResponse{
resp = &tabletmanagerdatapb.VDiffResponse{
Id: 0,
Output: nil,
}
Expand Down Expand Up @@ -368,6 +376,9 @@ func (vde *Engine) handleDeleteAction(ctx context.Context, dbClient binlogplayer
}
controller.Stop()
delete(vde.controllers, controller.id)
globalStats.mu.Lock()
defer globalStats.mu.Unlock()
delete(globalStats.controllers, controller.id)
}

switch req.ActionArg {
Expand Down
11 changes: 11 additions & 0 deletions go/vt/vttablet/tabletmanager/vdiff/action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,13 @@ func TestPerformVDiffAction(t *testing.T) {
expectQueries []queryAndResult
wantErr error
}{
{
name: "nil request",
wantErr: vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT, "nil vdiff request"),
},
{
name: "engine not open",
req: &tabletmanagerdatapb.VDiffRequest{},
vde: &Engine{isOpen: false},
wantErr: vterrors.New(vtrpcpb.Code_UNAVAILABLE, "vdiff engine is closed"),
},
Expand Down Expand Up @@ -208,6 +213,7 @@ func TestPerformVDiffAction(t *testing.T) {
},
},
}
errCount := int64(0)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.preFunc != nil {
Expand All @@ -224,6 +230,9 @@ func TestPerformVDiffAction(t *testing.T) {
vdiffenv.dbClient.ExpectRequest(queryResult.query, queryResult.result, nil)
}
got, err := tt.vde.PerformVDiffAction(ctx, tt.req)
if err != nil {
errCount++
}
vdiffenv.dbClient.Wait()
if tt.wantErr != nil && !vterrors.Equals(err, tt.wantErr) {
t.Errorf("Engine.PerformVDiffAction() error = %v, wantErr %v", err, tt.wantErr)
Expand All @@ -239,6 +248,8 @@ func TestPerformVDiffAction(t *testing.T) {
// No VDiffs should be running anymore.
require.Equal(t, 0, len(vdiffenv.vde.controllers), "expected no controllers to be running, but found %d",
len(vdiffenv.vde.controllers))
require.Equal(t, int64(0), globalStats.numControllers(), "expected no controllers, but found %d")
})
require.Equal(t, errCount, globalStats.ErrorCount.Get(), "expected error count %d, got %d", errCount, globalStats.ErrorCount.Get())
}
}
29 changes: 19 additions & 10 deletions go/vt/vttablet/tabletmanager/vdiff/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/proto/tabletmanagerdata"
Expand Down Expand Up @@ -75,6 +76,11 @@ type controller struct {
sourceTimeZone, targetTimeZone string // Named time zones if conversions are necessary for datetime values

externalCluster string // For Mount+Migrate

// Information used in vdiff stats/metrics.
Errors *stats.CountersWithMultiLabels
TableDiffRowCounts *stats.CountersWithMultiLabels
TableDiffPhaseTimings *stats.Timings
}

func newController(ctx context.Context, row sqltypes.RowNamedValues, dbClientFactory func() binlogplayer.DBClient,
Expand All @@ -84,16 +90,19 @@ func newController(ctx context.Context, row sqltypes.RowNamedValues, dbClientFac
id, _ := row["id"].ToInt64()

ct := &controller{
id: id,
uuid: row["vdiff_uuid"].ToString(),
workflow: row["workflow"].ToString(),
dbClientFactory: dbClientFactory,
ts: ts,
vde: vde,
done: make(chan struct{}),
tmc: vde.tmClientFactory(),
sources: make(map[string]*migrationSource),
options: options,
id: id,
uuid: row["vdiff_uuid"].ToString(),
workflow: row["workflow"].ToString(),
dbClientFactory: dbClientFactory,
ts: ts,
vde: vde,
done: make(chan struct{}),
tmc: vde.tmClientFactory(),
sources: make(map[string]*migrationSource),
options: options,
Errors: stats.NewCountersWithMultiLabels("", "", []string{"Error"}),
TableDiffRowCounts: stats.NewCountersWithMultiLabels("", "", []string{"Rows"}),
TableDiffPhaseTimings: stats.NewTimings("", "", "", "TablePhase"),
}
ctx, ct.cancel = context.WithCancel(ctx)
go ct.run(ctx)
Expand Down
27 changes: 21 additions & 6 deletions go/vt/vttablet/tabletmanager/vdiff/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,16 @@ import (

"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/vt/proto/tabletmanagerdata"
"vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
"vitess.io/vitess/go/vt/vttablet/tmclient"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/proto/tabletmanagerdata"
"vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
"vitess.io/vitess/go/vt/vttablet/tmclient"
)

type Engine struct {
Expand Down Expand Up @@ -159,6 +158,7 @@ func (vde *Engine) openLocked(ctx context.Context) error {
if err := vde.initControllers(rows); err != nil {
return err
}
vde.updateStats()

// At this point we've fully and successfully opened so begin
// retrying error'd VDiffs until the engine is closed.
Expand Down Expand Up @@ -218,6 +218,9 @@ func (vde *Engine) addController(row sqltypes.RowNamedValues, options *tabletman
row, vde.thisTablet.Alias)
}
vde.controllers[ct.id] = ct
globalStats.mu.Lock()
defer globalStats.mu.Unlock()
globalStats.controllers[ct.id] = ct
return nil
}

Expand Down Expand Up @@ -392,4 +395,16 @@ func (vde *Engine) resetControllers() {
ct.Stop()
}
vde.controllers = make(map[int64]*controller)
vde.updateStats()
}

// updateStats must only be called while holding the engine lock.
func (vre *Engine) updateStats() {
globalStats.mu.Lock()
defer globalStats.mu.Unlock()

globalStats.controllers = make(map[int64]*controller, len(vre.controllers))
for id, ct := range vre.controllers {
globalStats.controllers[id] = ct
}
}
Loading

0 comments on commit fa59f9d

Please sign in to comment.