diff --git a/go/vt/vtgate/engine/fake_vcursor_test.go b/go/vt/vtgate/engine/fake_vcursor_test.go index 3cdf0165554..f0b11b7a3ac 100644 --- a/go/vt/vtgate/engine/fake_vcursor_test.go +++ b/go/vt/vtgate/engine/fake_vcursor_test.go @@ -132,7 +132,7 @@ func (t *noopVCursor) UnresolvedTransactions(ctx context.Context, keyspace strin panic("implement me") } -func (t *noopVCursor) StartPrimitiveTrace() func() map[Primitive]RowsReceived { +func (t *noopVCursor) StartPrimitiveTrace() func() Stats { panic("implement me") } diff --git a/go/vt/vtgate/engine/plan_description.go b/go/vt/vtgate/engine/plan_description.go index edc5dd14f4c..dfcad4e5e6b 100644 --- a/go/vt/vtgate/engine/plan_description.go +++ b/go/vt/vtgate/engine/plan_description.go @@ -48,7 +48,8 @@ type PrimitiveDescription struct { InputName string Inputs []PrimitiveDescription - Stats RowsReceived + RowsReceived RowsReceived + ShardsQueried *ShardsQueried } // MarshalJSON serializes the PlanDescription into a JSON representation. @@ -92,15 +93,20 @@ func (pd PrimitiveDescription) MarshalJSON() ([]byte, error) { return nil, err } } - if len(pd.Stats) > 0 { - if err := marshalAdd(prepend, buf, "NoOfCalls", len(pd.Stats)); err != nil { + if len(pd.RowsReceived) > 0 { + if err := marshalAdd(prepend, buf, "NoOfCalls", len(pd.RowsReceived)); err != nil { return nil, err } - if err := marshalAdd(prepend, buf, "AvgNumberOfRows", average(pd.Stats)); err != nil { + if err := marshalAdd(prepend, buf, "AvgNumberOfRows", average(pd.RowsReceived)); err != nil { return nil, err } - if err := marshalAdd(prepend, buf, "MedianNumberOfRows", median(pd.Stats)); err != nil { + if err := marshalAdd(prepend, buf, "MedianNumberOfRows", median(pd.RowsReceived)); err != nil { + return nil, err + } + } + if pd.ShardsQueried != nil { + if err := marshalAdd(prepend, buf, "ShardsQueried", pd.ShardsQueried); err != nil { return nil, err } } @@ -219,10 +225,16 @@ func marshalAdd(prepend string, buf *bytes.Buffer, name string, obj any) error { // PrimitiveToPlanDescription transforms a primitive tree into a corresponding PlanDescription tree // If stats is not nil, it will be used to populate the stats field of the PlanDescription -func PrimitiveToPlanDescription(in Primitive, stats map[Primitive]RowsReceived) PrimitiveDescription { +func PrimitiveToPlanDescription(in Primitive, stats *Stats) PrimitiveDescription { this := in.description() if stats != nil { - this.Stats = stats[in] + this.RowsReceived = stats.InterOpStats[in] + + // Only applies to Route primitive + v, ok := stats.ShardsStats[in] + if ok { + this.ShardsQueried = &v + } } inputs, infos := in.Inputs() diff --git a/go/vt/vtgate/engine/primitive.go b/go/vt/vtgate/engine/primitive.go index b1bc55a61ce..76d0a28e516 100644 --- a/go/vt/vtgate/engine/primitive.go +++ b/go/vt/vtgate/engine/primitive.go @@ -143,7 +143,7 @@ type ( // StartPrimitiveTrace starts a trace for the given primitive, // and returns a function to get the trace logs after the primitive execution. - StartPrimitiveTrace() func() map[Primitive]RowsReceived + StartPrimitiveTrace() func() Stats } // SessionActions gives primitives ability to interact with the session state diff --git a/go/vt/vtgate/engine/vexplain.go b/go/vt/vtgate/engine/vexplain.go index 0aed4493ddc..78941e8160f 100644 --- a/go/vt/vtgate/engine/vexplain.go +++ b/go/vt/vtgate/engine/vexplain.go @@ -44,7 +44,13 @@ type ( Type sqlparser.VExplainType } - RowsReceived []int + ShardsQueried int + RowsReceived []int + + Stats struct { + InterOpStats map[Primitive]RowsReceived + ShardsStats map[Primitive]ShardsQueried + } ) var _ Primitive = (*VExplain)(nil) @@ -111,7 +117,7 @@ func (v *VExplain) NeedsTransaction() bool { // TryExecute implements the Primitive interface func (v *VExplain) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) { - var stats func() map[Primitive]RowsReceived + var stats func() Stats if v.Type == sqlparser.TraceVExplainType { stats = vcursor.StartPrimitiveTrace() } else { @@ -130,7 +136,7 @@ func noOpCallback(*sqltypes.Result) error { // TryStreamExecute implements the Primitive interface func (v *VExplain) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error { - var stats func() map[Primitive]RowsReceived + var stats func() Stats if v.Type == sqlparser.TraceVExplainType { stats = vcursor.StartPrimitiveTrace() } else { @@ -148,7 +154,7 @@ func (v *VExplain) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVa return callback(result) } -func (v *VExplain) convertToResult(ctx context.Context, vcursor VCursor, stats func() map[Primitive]RowsReceived) (*sqltypes.Result, error) { +func (v *VExplain) convertToResult(ctx context.Context, vcursor VCursor, stats func() Stats) (*sqltypes.Result, error) { switch v.Type { case sqlparser.QueriesVExplainType: result := convertToVExplainQueriesResult(vcursor.Session().GetVExplainLogs()) @@ -163,8 +169,9 @@ func (v *VExplain) convertToResult(ctx context.Context, vcursor VCursor, stats f } } -func (v *VExplain) getExplainTraceOutput(getOpStats func() map[Primitive]RowsReceived) (*sqltypes.Result, error) { - description := PrimitiveToPlanDescription(v.Input, getOpStats()) +func (v *VExplain) getExplainTraceOutput(getOpStats func() Stats) (*sqltypes.Result, error) { + stats := getOpStats() + description := PrimitiveToPlanDescription(v.Input, &stats) output, err := json.MarshalIndent(description, "", "\t") if err != nil { diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go index 375f930a50f..4f616f77fc8 100644 --- a/go/vt/vtgate/vcursor_impl.go +++ b/go/vt/vtgate/vcursor_impl.go @@ -135,7 +135,8 @@ type ( // this is a map of the number of rows that every primitive has returned // if this field is nil, it means that we are not logging operator traffic - primitiveStats map[engine.Primitive]engine.RowsReceived + interOpStats map[engine.Primitive]engine.RowsReceived + shardsStats map[engine.Primitive]engine.ShardsQueried } ) @@ -285,10 +286,14 @@ func (vc *vcursorImpl) UnresolvedTransactions(ctx context.Context, keyspace stri return vc.executor.UnresolvedTransactions(ctx, targets) } -func (vc *vcursorImpl) StartPrimitiveTrace() func() map[engine.Primitive]engine.RowsReceived { - vc.primitiveStats = make(map[engine.Primitive]engine.RowsReceived) - return func() map[engine.Primitive]engine.RowsReceived { - return vc.primitiveStats +func (vc *vcursorImpl) StartPrimitiveTrace() func() engine.Stats { + vc.interOpStats = make(map[engine.Primitive]engine.RowsReceived) + vc.shardsStats = make(map[engine.Primitive]engine.ShardsQueried) + return func() engine.Stats { + return engine.Stats{ + InterOpStats: vc.interOpStats, + ShardsStats: vc.shardsStats, + } } } @@ -532,14 +537,20 @@ func (vc *vcursorImpl) ExecutePrimitive(ctx context.Context, primitive engine.Pr } func (vc *vcursorImpl) logOpTraffic(primitive engine.Primitive, res *sqltypes.Result) { - if vc.primitiveStats != nil { - rows := vc.primitiveStats[primitive] + if vc.interOpStats != nil { + rows := vc.interOpStats[primitive] if res == nil { rows = append(rows, 0) } else { rows = append(rows, len(res.Rows)) } - vc.primitiveStats[primitive] = rows + vc.interOpStats[primitive] = rows + } +} + +func (vc *vcursorImpl) logShardsQueried(primitive engine.Primitive, shardsNb int) { + if vc.shardsStats != nil { + vc.shardsStats[primitive] += engine.ShardsQueried(shardsNb) } } @@ -558,7 +569,7 @@ func (vc *vcursorImpl) ExecutePrimitiveStandalone(ctx context.Context, primitive } func (vc *vcursorImpl) wrapCallback(callback func(*sqltypes.Result) error, primitive engine.Primitive) func(*sqltypes.Result) error { - if vc.primitiveStats == nil { + if vc.interOpStats == nil { return callback } @@ -650,7 +661,7 @@ func (vc *vcursorImpl) ExecuteMultiShard(ctx context.Context, primitive engine.P qr, errs := vc.executor.ExecuteMultiShard(ctx, primitive, rss, commentedShardQueries(queries, vc.marginComments), vc.safeSession, canAutocommit, vc.ignoreMaxMemoryRows, vc.resultsObserver) vc.setRollbackOnPartialExecIfRequired(len(errs) != len(rss), rollbackOnError) - vc.logOpTraffic(primitive, qr) + vc.logShardsQueried(primitive, len(rss)) return qr, errs } @@ -689,7 +700,7 @@ func (vc *vcursorImpl) ExecuteStandalone(ctx context.Context, primitive engine.P // The autocommit flag is always set to false because we currently don't // execute DMLs through ExecuteStandalone. qr, errs := vc.executor.ExecuteMultiShard(ctx, primitive, rss, bqs, NewAutocommitSession(vc.safeSession.Session), false /* autocommit */, vc.ignoreMaxMemoryRows, vc.resultsObserver) - vc.logOpTraffic(primitive, qr) + vc.logShardsQueried(primitive, len(rss)) return qr, vterrors.Aggregate(errs) }