Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Count how many shards were hit by routes when using vexplain #16802

Merged
merged 2 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/fake_vcursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (t *noopVCursor) UnresolvedTransactions(ctx context.Context, keyspace strin
panic("implement me")
}

func (t *noopVCursor) StartPrimitiveTrace() func() map[Primitive]RowsReceived {
func (t *noopVCursor) StartPrimitiveTrace() func() Stats {
panic("implement me")
}

Expand Down
26 changes: 19 additions & 7 deletions go/vt/vtgate/engine/plan_description.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ type PrimitiveDescription struct {
InputName string
Inputs []PrimitiveDescription

Stats RowsReceived
RowsReceived RowsReceived
ShardsQueried *ShardsQueried
}

// MarshalJSON serializes the PlanDescription into a JSON representation.
Expand Down Expand Up @@ -92,15 +93,20 @@ func (pd PrimitiveDescription) MarshalJSON() ([]byte, error) {
return nil, err
}
}
if len(pd.Stats) > 0 {
if err := marshalAdd(prepend, buf, "NoOfCalls", len(pd.Stats)); err != nil {
if len(pd.RowsReceived) > 0 {
if err := marshalAdd(prepend, buf, "NoOfCalls", len(pd.RowsReceived)); err != nil {
return nil, err
}

if err := marshalAdd(prepend, buf, "AvgNumberOfRows", average(pd.Stats)); err != nil {
if err := marshalAdd(prepend, buf, "AvgNumberOfRows", average(pd.RowsReceived)); err != nil {
return nil, err
}
if err := marshalAdd(prepend, buf, "MedianNumberOfRows", median(pd.Stats)); err != nil {
if err := marshalAdd(prepend, buf, "MedianNumberOfRows", median(pd.RowsReceived)); err != nil {
return nil, err
}
}
if pd.ShardsQueried != nil {
if err := marshalAdd(prepend, buf, "ShardsQueried", pd.ShardsQueried); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -219,10 +225,16 @@ func marshalAdd(prepend string, buf *bytes.Buffer, name string, obj any) error {

// PrimitiveToPlanDescription transforms a primitive tree into a corresponding PlanDescription tree
// If stats is not nil, it will be used to populate the stats field of the PlanDescription
func PrimitiveToPlanDescription(in Primitive, stats map[Primitive]RowsReceived) PrimitiveDescription {
func PrimitiveToPlanDescription(in Primitive, stats *Stats) PrimitiveDescription {
this := in.description()
if stats != nil {
this.Stats = stats[in]
this.RowsReceived = stats.InterOpStats[in]

// Only applies to Route primitive
v, ok := stats.ShardsStats[in]
if ok {
this.ShardsQueried = &v
}
}

inputs, infos := in.Inputs()
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ type (

// StartPrimitiveTrace starts a trace for the given primitive,
// and returns a function to get the trace logs after the primitive execution.
StartPrimitiveTrace() func() map[Primitive]RowsReceived
StartPrimitiveTrace() func() Stats
}

// SessionActions gives primitives ability to interact with the session state
Expand Down
19 changes: 13 additions & 6 deletions go/vt/vtgate/engine/vexplain.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,13 @@ type (
Type sqlparser.VExplainType
}

RowsReceived []int
ShardsQueried int
RowsReceived []int

Stats struct {
InterOpStats map[Primitive]RowsReceived
ShardsStats map[Primitive]ShardsQueried
}
)

var _ Primitive = (*VExplain)(nil)
Expand Down Expand Up @@ -111,7 +117,7 @@ func (v *VExplain) NeedsTransaction() bool {

// TryExecute implements the Primitive interface
func (v *VExplain) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
var stats func() map[Primitive]RowsReceived
var stats func() Stats
if v.Type == sqlparser.TraceVExplainType {
stats = vcursor.StartPrimitiveTrace()
} else {
Expand All @@ -130,7 +136,7 @@ func noOpCallback(*sqltypes.Result) error {

// TryStreamExecute implements the Primitive interface
func (v *VExplain) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
var stats func() map[Primitive]RowsReceived
var stats func() Stats
if v.Type == sqlparser.TraceVExplainType {
stats = vcursor.StartPrimitiveTrace()
} else {
Expand All @@ -148,7 +154,7 @@ func (v *VExplain) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVa
return callback(result)
}

func (v *VExplain) convertToResult(ctx context.Context, vcursor VCursor, stats func() map[Primitive]RowsReceived) (*sqltypes.Result, error) {
func (v *VExplain) convertToResult(ctx context.Context, vcursor VCursor, stats func() Stats) (*sqltypes.Result, error) {
switch v.Type {
case sqlparser.QueriesVExplainType:
result := convertToVExplainQueriesResult(vcursor.Session().GetVExplainLogs())
Expand All @@ -163,8 +169,9 @@ func (v *VExplain) convertToResult(ctx context.Context, vcursor VCursor, stats f
}
}

func (v *VExplain) getExplainTraceOutput(getOpStats func() map[Primitive]RowsReceived) (*sqltypes.Result, error) {
description := PrimitiveToPlanDescription(v.Input, getOpStats())
func (v *VExplain) getExplainTraceOutput(getOpStats func() Stats) (*sqltypes.Result, error) {
stats := getOpStats()
description := PrimitiveToPlanDescription(v.Input, &stats)

output, err := json.MarshalIndent(description, "", "\t")
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vtgate/executor_vexplain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,10 @@ func TestSimpleVexplainTrace(t *testing.T) {
"Name": "TestExecutor",
"Sharded": true
},
"NoOfCalls": 2,
"NoOfCalls": 1,
"AvgNumberOfRows": 16,
"MedianNumberOfRows": 16,
"ShardsQueried": 8,
"FieldQuery": "select count(*), col2, weight_string(col2) from music where 1 != 1 group by col2, weight_string(col2)",
"OrderBy": "(1|2) ASC",
"Query": "select count(*), col2, weight_string(col2) from music group by col2, weight_string(col2) order by col2 asc",
Expand Down
33 changes: 22 additions & 11 deletions go/vt/vtgate/vcursor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ type (

// this is a map of the number of rows that every primitive has returned
// if this field is nil, it means that we are not logging operator traffic
primitiveStats map[engine.Primitive]engine.RowsReceived
interOpStats map[engine.Primitive]engine.RowsReceived
shardsStats map[engine.Primitive]engine.ShardsQueried
}
)

Expand Down Expand Up @@ -285,10 +286,14 @@ func (vc *vcursorImpl) UnresolvedTransactions(ctx context.Context, keyspace stri
return vc.executor.UnresolvedTransactions(ctx, targets)
}

func (vc *vcursorImpl) StartPrimitiveTrace() func() map[engine.Primitive]engine.RowsReceived {
vc.primitiveStats = make(map[engine.Primitive]engine.RowsReceived)
return func() map[engine.Primitive]engine.RowsReceived {
return vc.primitiveStats
func (vc *vcursorImpl) StartPrimitiveTrace() func() engine.Stats {
vc.interOpStats = make(map[engine.Primitive]engine.RowsReceived)
vc.shardsStats = make(map[engine.Primitive]engine.ShardsQueried)
return func() engine.Stats {
return engine.Stats{
InterOpStats: vc.interOpStats,
ShardsStats: vc.shardsStats,
}
}
}

Expand Down Expand Up @@ -532,14 +537,20 @@ func (vc *vcursorImpl) ExecutePrimitive(ctx context.Context, primitive engine.Pr
}

func (vc *vcursorImpl) logOpTraffic(primitive engine.Primitive, res *sqltypes.Result) {
if vc.primitiveStats != nil {
rows := vc.primitiveStats[primitive]
if vc.interOpStats != nil {
rows := vc.interOpStats[primitive]
if res == nil {
rows = append(rows, 0)
} else {
rows = append(rows, len(res.Rows))
}
vc.primitiveStats[primitive] = rows
vc.interOpStats[primitive] = rows
}
}

func (vc *vcursorImpl) logShardsQueried(primitive engine.Primitive, shardsNb int) {
if vc.shardsStats != nil {
vc.shardsStats[primitive] += engine.ShardsQueried(shardsNb)
}
}

Expand All @@ -558,7 +569,7 @@ func (vc *vcursorImpl) ExecutePrimitiveStandalone(ctx context.Context, primitive
}

func (vc *vcursorImpl) wrapCallback(callback func(*sqltypes.Result) error, primitive engine.Primitive) func(*sqltypes.Result) error {
if vc.primitiveStats == nil {
if vc.interOpStats == nil {
return callback
}

Expand Down Expand Up @@ -650,7 +661,7 @@ func (vc *vcursorImpl) ExecuteMultiShard(ctx context.Context, primitive engine.P

qr, errs := vc.executor.ExecuteMultiShard(ctx, primitive, rss, commentedShardQueries(queries, vc.marginComments), vc.safeSession, canAutocommit, vc.ignoreMaxMemoryRows, vc.resultsObserver)
vc.setRollbackOnPartialExecIfRequired(len(errs) != len(rss), rollbackOnError)
vc.logOpTraffic(primitive, qr)
vc.logShardsQueried(primitive, len(rss))
return qr, errs
}

Expand Down Expand Up @@ -689,7 +700,7 @@ func (vc *vcursorImpl) ExecuteStandalone(ctx context.Context, primitive engine.P
// The autocommit flag is always set to false because we currently don't
// execute DMLs through ExecuteStandalone.
qr, errs := vc.executor.ExecuteMultiShard(ctx, primitive, rss, bqs, NewAutocommitSession(vc.safeSession.Session), false /* autocommit */, vc.ignoreMaxMemoryRows, vc.resultsObserver)
vc.logOpTraffic(primitive, qr)
vc.logShardsQueried(primitive, len(rss))
return qr, vterrors.Aggregate(errs)
}

Expand Down
Loading