diff --git a/go/vt/vtgate/engine/fake_vcursor_test.go b/go/vt/vtgate/engine/fake_vcursor_test.go index e7b38658d68..a0d5dac2cf1 100644 --- a/go/vt/vtgate/engine/fake_vcursor_test.go +++ b/go/vt/vtgate/engine/fake_vcursor_test.go @@ -132,6 +132,10 @@ func (t *noopVCursor) UnresolvedTransactions(ctx context.Context, keyspace strin panic("implement me") } +func (t *noopVCursor) StartPrimitiveTrace() func() map[int]RowsReceived { + panic("implement me") +} + func (t *noopVCursor) SetExec(ctx context.Context, name string, value string) error { panic("implement me") } @@ -874,6 +878,10 @@ func (f *loggingVCursor) UnresolvedTransactions(_ context.Context, _ string) ([] return f.transactionStatusOutput, nil } +func (f *loggingVCursor) StartPrimitiveTrace() func() map[int]RowsReceived { + panic("implement me") +} + // SQLParser implements VCursor func (t *loggingVCursor) SQLParser() *sqlparser.Parser { if t.parser == nil { diff --git a/go/vt/vtgate/engine/plan_description.go b/go/vt/vtgate/engine/plan_description.go index 0da79723d22..bb196438d16 100644 --- a/go/vt/vtgate/engine/plan_description.go +++ b/go/vt/vtgate/engine/plan_description.go @@ -48,6 +48,8 @@ type PrimitiveDescription struct { ID PrimitiveID InputName string Inputs []PrimitiveDescription + + Stats RowsReceived } // MarshalJSON serializes the PlanDescription into a JSON representation. @@ -97,6 +99,14 @@ func (pd PrimitiveDescription) MarshalJSON() ([]byte, error) { return nil, err } } + if len(pd.Stats) > 0 { + if err := marshalAdd(prepend, buf, "NoOfCalls", len(pd.Stats)); err != nil { + return nil, err + } + if err := marshalAdd(prepend, buf, "Rows", pd.Stats); err != nil { + return nil, err + } + } err := addMap(pd.Other, buf) if err != nil { return nil, err diff --git a/go/vt/vtgate/engine/primitive.go b/go/vt/vtgate/engine/primitive.go index 2f6f1c46db2..f0ce8a3924f 100644 --- a/go/vt/vtgate/engine/primitive.go +++ b/go/vt/vtgate/engine/primitive.go @@ -140,6 +140,10 @@ type ( // UnresolvedTransactions reads the state of all the unresolved atomic transactions in the given keyspace. UnresolvedTransactions(ctx context.Context, keyspace string) ([]*querypb.TransactionMetadata, error) + + // StartPrimitiveTrace starts a trace for the given primitive, + // and returns a function to get the trace logs after the primitive execution. + StartPrimitiveTrace() func() map[int]RowsReceived } // SessionActions gives primitives ability to interact with the session state diff --git a/go/vt/vtgate/engine/trace.go b/go/vt/vtgate/engine/trace.go new file mode 100644 index 00000000000..37d61462049 --- /dev/null +++ b/go/vt/vtgate/engine/trace.go @@ -0,0 +1,134 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package engine + +import ( + "context" + "encoding/json" + + "vitess.io/vitess/go/mysql/collations" + + "vitess.io/vitess/go/sqltypes" + querypb "vitess.io/vitess/go/vt/proto/query" +) + +var _ Primitive = (*Trace)(nil) + +type Trace struct { + identifiablePrimitive + Inner Primitive +} + +type RowsReceived []int + +func (t *Trace) RouteType() string { + return t.Inner.RouteType() +} + +func (t *Trace) GetKeyspaceName() string { + return t.Inner.GetKeyspaceName() +} + +func (t *Trace) GetTableName() string { + return t.Inner.GetTableName() +} + +func getFields() []*querypb.Field { + return []*querypb.Field{{ + Name: "Trace", + Type: sqltypes.VarChar, + Charset: uint32(collations.SystemCollation.Collation), + Flags: uint32(querypb.MySqlFlag_NOT_NULL_FLAG), + }} +} + +func (t *Trace) GetFields(context.Context, VCursor, map[string]*querypb.BindVariable) (*sqltypes.Result, error) { + return &sqltypes.Result{Fields: getFields()}, nil +} + +func (t *Trace) NeedsTransaction() bool { + return t.Inner.NeedsTransaction() +} + +func preWalk(desc PrimitiveDescription, f func(PrimitiveDescription)) { + f(desc) + for _, input := range desc.Inputs { + preWalk(input, f) + } +} + +func (t *Trace) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) { + getOpStats := vcursor.StartPrimitiveTrace() + _, err := t.Inner.TryExecute(ctx, vcursor, bindVars, wantfields) + if err != nil { + return nil, err + } + + return t.getExplainTraceOutput(getOpStats) +} + +func (t *Trace) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error { + getOpsStats := vcursor.StartPrimitiveTrace() + noop := func(result *sqltypes.Result) error { return nil } + err := t.Inner.TryStreamExecute(ctx, vcursor, bindVars, wantfields, noop) + if err != nil { + return err + } + + res, err := t.getExplainTraceOutput(getOpsStats) + if err != nil { + return err + } + + return callback(res) +} + +func (t *Trace) getExplainTraceOutput(getOpStats func() map[int]RowsReceived) (*sqltypes.Result, error) { + description := PrimitiveToPlanDescription(t.Inner) + statsMap := getOpStats() + + // let's add the stats to the description + preWalk(description, func(desc PrimitiveDescription) { + stats, found := statsMap[int(desc.ID)] + if !found { + return + } + desc.Stats = stats + }) + + output, err := json.MarshalIndent(description, "", "\t") + if err != nil { + return nil, err + } + + return &sqltypes.Result{ + Fields: getFields(), + Rows: []sqltypes.Row{{ + sqltypes.NewVarChar(string(output)), + }}, + }, nil +} + +func (t *Trace) Inputs() ([]Primitive, []map[string]any) { + return []Primitive{t.Inner}, nil +} + +func (t *Trace) description() PrimitiveDescription { + return PrimitiveDescription{ + OperatorType: "Trace", + } +} diff --git a/go/vt/vtgate/planbuilder/vexplain.go b/go/vt/vtgate/planbuilder/vexplain.go index 21a35f02967..39ad61a03f7 100644 --- a/go/vt/vtgate/planbuilder/vexplain.go +++ b/go/vt/vtgate/planbuilder/vexplain.go @@ -38,6 +38,8 @@ func buildVExplainPlan(ctx context.Context, vexplainStmt *sqlparser.VExplainStmt return buildVExplainLoggingPlan(ctx, vexplainStmt, reservedVars, vschema, enableOnlineDDL, enableDirectDDL) case sqlparser.PlanVExplainType: return buildVExplainVtgatePlan(ctx, vexplainStmt.Statement, reservedVars, vschema, enableOnlineDDL, enableDirectDDL) + case sqlparser.TraceVExplainType: + return buildVExplainTracePlan(ctx, vexplainStmt, reservedVars, vschema, enableOnlineDDL, enableDirectDDL) } return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] unexpected vtexplain type: %s", vexplainStmt.Type.ToString()) } @@ -166,3 +168,21 @@ func explainPlan(explain *sqlparser.ExplainStmt, reservedVars *sqlparser.Reserve SingleShardOnly: true, }, tables...), nil } + +func buildVExplainTracePlan(ctx context.Context, explainStatement sqlparser.Statement, reservedVars *sqlparser.ReservedVars, vschema plancontext.VSchema, enableOnlineDDL, enableDirectDDL bool) (*planResult, error) { + innerInstruction, err := createInstructionFor(ctx, sqlparser.String(explainStatement), explainStatement, reservedVars, vschema, enableOnlineDDL, enableDirectDDL) + if err != nil { + return nil, err + } + + // We'll go over the primitive tree and assign unique IDs + id := 1 + engine.PreOrderTraverse(innerInstruction.primitive, func(primitive engine.Primitive) { + primitive.SetID(engine.PrimitiveID(id)) + id++ + }) + + // We'll set the trace engine as the root primitive + innerInstruction.primitive = &engine.Trace{Inner: innerInstruction.primitive} + return innerInstruction, nil +} diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go index a3c3f96b5a9..018228b6e7c 100644 --- a/go/vt/vtgate/vcursor_impl.go +++ b/go/vt/vtgate/vcursor_impl.go @@ -65,73 +65,79 @@ var ( _ vindexes.VCursor = (*vcursorImpl)(nil) ) -// vcursor_impl needs these facilities to be able to be able to execute queries for vindexes -type iExecute interface { - Execute(ctx context.Context, mysqlCtx vtgateservice.MySQLConnection, method string, session *SafeSession, s string, vars map[string]*querypb.BindVariable) (*sqltypes.Result, error) - ExecuteMultiShard(ctx context.Context, primitive engine.Primitive, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, session *SafeSession, autocommit bool, ignoreMaxMemoryRows bool, resultsObserver resultsObserver) (qr *sqltypes.Result, errs []error) - StreamExecuteMulti(ctx context.Context, primitive engine.Primitive, query string, rss []*srvtopo.ResolvedShard, vars []map[string]*querypb.BindVariable, session *SafeSession, autocommit bool, callback func(reply *sqltypes.Result) error, observer resultsObserver) []error - ExecuteLock(ctx context.Context, rs *srvtopo.ResolvedShard, query *querypb.BoundQuery, session *SafeSession, lockFuncType sqlparser.LockingFuncType) (*sqltypes.Result, error) - Commit(ctx context.Context, safeSession *SafeSession) error - ExecuteMessageStream(ctx context.Context, rss []*srvtopo.ResolvedShard, name string, callback func(*sqltypes.Result) error) error - ExecuteVStream(ctx context.Context, rss []*srvtopo.ResolvedShard, filter *binlogdatapb.Filter, gtid string, callback func(evs []*binlogdatapb.VEvent) error) error - ReleaseLock(ctx context.Context, session *SafeSession) error - - showVitessReplicationStatus(ctx context.Context, filter *sqlparser.ShowFilter) (*sqltypes.Result, error) - showShards(ctx context.Context, filter *sqlparser.ShowFilter, destTabletType topodatapb.TabletType) (*sqltypes.Result, error) - showTablets(filter *sqlparser.ShowFilter) (*sqltypes.Result, error) - showVitessMetadata(ctx context.Context, filter *sqlparser.ShowFilter) (*sqltypes.Result, error) - setVitessMetadata(ctx context.Context, name, value string) error - - // TODO: remove when resolver is gone - ParseDestinationTarget(targetString string) (string, topodatapb.TabletType, key.Destination, error) - VSchema() *vindexes.VSchema - planPrepareStmt(ctx context.Context, vcursor *vcursorImpl, query string) (*engine.Plan, sqlparser.Statement, error) - - environment() *vtenv.Environment - ReadTransaction(ctx context.Context, transactionID string) (*querypb.TransactionMetadata, error) - UnresolvedTransactions(ctx context.Context, targets []*querypb.Target) ([]*querypb.TransactionMetadata, error) -} - -// VSchemaOperator is an interface to Vschema Operations -type VSchemaOperator interface { - GetCurrentSrvVschema() *vschemapb.SrvVSchema - UpdateVSchema(ctx context.Context, ksName string, vschema *vschemapb.SrvVSchema) error -} - -// vcursorImpl implements the VCursor functionality used by dependent -// packages to call back into VTGate. -type vcursorImpl struct { - safeSession *SafeSession - keyspace string - tabletType topodatapb.TabletType - destination key.Destination - marginComments sqlparser.MarginComments - executor iExecute - resolver *srvtopo.Resolver - topoServer *topo.Server - logStats *logstats.LogStats - collation collations.ID - - // fkChecksState stores the state of foreign key checks variable. - // This state is meant to be the final fk checks state after consulting the - // session state, and the given query's comments for `SET_VAR` optimizer hints. - // A nil value represents that no foreign_key_checks value was provided. - fkChecksState *bool - ignoreMaxMemoryRows bool - vschema *vindexes.VSchema - vm VSchemaOperator - semTable *semantics.SemTable - warnShardedOnly bool // when using sharded only features, a warning will be warnings field - queryTimeout time.Duration - - warnings []*querypb.QueryWarning // any warnings that are accumulated during the planning phase are stored here - pv plancontext.PlannerVersion - - warmingReadsPercent int - warmingReadsChannel chan bool - - resultsObserver resultsObserver -} +type ( + // vcursor_impl needs these facilities to be able to be able to execute queries for vindexes + iExecute interface { + Execute(ctx context.Context, mysqlCtx vtgateservice.MySQLConnection, method string, session *SafeSession, s string, vars map[string]*querypb.BindVariable) (*sqltypes.Result, error) + ExecuteMultiShard(ctx context.Context, primitive engine.Primitive, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, session *SafeSession, autocommit bool, ignoreMaxMemoryRows bool, resultsObserver resultsObserver) (qr *sqltypes.Result, errs []error) + StreamExecuteMulti(ctx context.Context, primitive engine.Primitive, query string, rss []*srvtopo.ResolvedShard, vars []map[string]*querypb.BindVariable, session *SafeSession, autocommit bool, callback func(reply *sqltypes.Result) error, observer resultsObserver) []error + ExecuteLock(ctx context.Context, rs *srvtopo.ResolvedShard, query *querypb.BoundQuery, session *SafeSession, lockFuncType sqlparser.LockingFuncType) (*sqltypes.Result, error) + Commit(ctx context.Context, safeSession *SafeSession) error + ExecuteMessageStream(ctx context.Context, rss []*srvtopo.ResolvedShard, name string, callback func(*sqltypes.Result) error) error + ExecuteVStream(ctx context.Context, rss []*srvtopo.ResolvedShard, filter *binlogdatapb.Filter, gtid string, callback func(evs []*binlogdatapb.VEvent) error) error + ReleaseLock(ctx context.Context, session *SafeSession) error + + showVitessReplicationStatus(ctx context.Context, filter *sqlparser.ShowFilter) (*sqltypes.Result, error) + showShards(ctx context.Context, filter *sqlparser.ShowFilter, destTabletType topodatapb.TabletType) (*sqltypes.Result, error) + showTablets(filter *sqlparser.ShowFilter) (*sqltypes.Result, error) + showVitessMetadata(ctx context.Context, filter *sqlparser.ShowFilter) (*sqltypes.Result, error) + setVitessMetadata(ctx context.Context, name, value string) error + + // TODO: remove when resolver is gone + ParseDestinationTarget(targetString string) (string, topodatapb.TabletType, key.Destination, error) + VSchema() *vindexes.VSchema + planPrepareStmt(ctx context.Context, vcursor *vcursorImpl, query string) (*engine.Plan, sqlparser.Statement, error) + + environment() *vtenv.Environment + ReadTransaction(ctx context.Context, transactionID string) (*querypb.TransactionMetadata, error) + UnresolvedTransactions(ctx context.Context, targets []*querypb.Target) ([]*querypb.TransactionMetadata, error) + } + + // VSchemaOperator is an interface to Vschema Operations + VSchemaOperator interface { + GetCurrentSrvVschema() *vschemapb.SrvVSchema + UpdateVSchema(ctx context.Context, ksName string, vschema *vschemapb.SrvVSchema) error + } + + // vcursorImpl implements the VCursor functionality used by dependent + // packages to call back into VTGate. + vcursorImpl struct { + safeSession *SafeSession + keyspace string + tabletType topodatapb.TabletType + destination key.Destination + marginComments sqlparser.MarginComments + executor iExecute + resolver *srvtopo.Resolver + topoServer *topo.Server + logStats *logstats.LogStats + collation collations.ID + + // fkChecksState stores the state of foreign key checks variable. + // This state is meant to be the final fk checks state after consulting the + // session state, and the given query's comments for `SET_VAR` optimizer hints. + // A nil value represents that no foreign_key_checks value was provided. + fkChecksState *bool + ignoreMaxMemoryRows bool + vschema *vindexes.VSchema + vm VSchemaOperator + semTable *semantics.SemTable + warnShardedOnly bool // when using sharded only features, a warning will be warnings field + queryTimeout time.Duration + + warnings []*querypb.QueryWarning // any warnings that are accumulated during the planning phase are stored here + pv plancontext.PlannerVersion + + warmingReadsPercent int + warmingReadsChannel chan bool + + resultsObserver resultsObserver + + // this is a map of the number of rows that every primitive has returned + // if this field is nil, it means that we are not logging operator traffic + primitiveStats map[int]engine.RowsReceived + } +) // newVcursorImpl creates a vcursorImpl. Before creating this object, you have to separate out any marginComments that came with // the query and supply it here. Trailing comments are typically sent by the application for various reasons, @@ -279,6 +285,13 @@ func (vc *vcursorImpl) UnresolvedTransactions(ctx context.Context, keyspace stri return vc.executor.UnresolvedTransactions(ctx, targets) } +func (vc *vcursorImpl) StartPrimitiveTrace() func() map[int]engine.RowsReceived { + vc.primitiveStats = make(map[int]engine.RowsReceived) + return func() map[int]engine.RowsReceived { + return vc.primitiveStats + } +} + // FindTable finds the specified table. If the keyspace what specified in the input, it gets used as qualifier. // Otherwise, the keyspace from the request is used, if one was provided. func (vc *vcursorImpl) FindTable(name sqlparser.TableName) (*vindexes.Table, string, topodatapb.TabletType, key.Destination, error) { @@ -512,11 +525,21 @@ func (vc *vcursorImpl) ExecutePrimitive(ctx context.Context, primitive engine.Pr if err != nil && vterrors.RootCause(err) == buffer.ShardMissingError { continue } + vc.logOpTraffic(primitive, res) return res, err } return nil, vterrors.New(vtrpcpb.Code_UNAVAILABLE, "upstream shards are not available") } +func (vc *vcursorImpl) logOpTraffic(primitive engine.Primitive, res *sqltypes.Result) { + if vc.primitiveStats != nil { + key := int(primitive.GetID()) + rows := vc.primitiveStats[key] + rows = append(rows, len(res.Rows)) + vc.primitiveStats[key] = rows + } +} + func (vc *vcursorImpl) ExecutePrimitiveStandalone(ctx context.Context, primitive engine.Primitive, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) { // clone the vcursorImpl with a new session. newVC := vc.cloneWithAutocommitSession() @@ -525,12 +548,26 @@ func (vc *vcursorImpl) ExecutePrimitiveStandalone(ctx context.Context, primitive if err != nil && vterrors.RootCause(err) == buffer.ShardMissingError { continue } + vc.logOpTraffic(primitive, res) return res, err } return nil, vterrors.New(vtrpcpb.Code_UNAVAILABLE, "upstream shards are not available") } +func (vc *vcursorImpl) wrapCallback(callback func(*sqltypes.Result) error, primitive engine.Primitive) func(*sqltypes.Result) error { + if vc.primitiveStats == nil { + return callback + } + + return func(result *sqltypes.Result) error { + vc.logOpTraffic(primitive, result) + return callback(result) + } +} + func (vc *vcursorImpl) StreamExecutePrimitive(ctx context.Context, primitive engine.Primitive, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error { + callback = vc.wrapCallback(callback, primitive) + for try := 0; try < MaxBufferingRetries; try++ { err := primitive.TryStreamExecute(ctx, vc, bindVars, wantfields, callback) if err != nil && vterrors.RootCause(err) == buffer.ShardMissingError { @@ -542,6 +579,8 @@ func (vc *vcursorImpl) StreamExecutePrimitive(ctx context.Context, primitive eng } func (vc *vcursorImpl) StreamExecutePrimitiveStandalone(ctx context.Context, primitive engine.Primitive, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(result *sqltypes.Result) error) error { + callback = vc.wrapCallback(callback, primitive) + // clone the vcursorImpl with a new session. newVC := vc.cloneWithAutocommitSession() for try := 0; try < MaxBufferingRetries; try++ { @@ -608,12 +647,14 @@ func (vc *vcursorImpl) ExecuteMultiShard(ctx context.Context, primitive engine.P qr, errs := vc.executor.ExecuteMultiShard(ctx, primitive, rss, commentedShardQueries(queries, vc.marginComments), vc.safeSession, canAutocommit, vc.ignoreMaxMemoryRows, vc.resultsObserver) vc.setRollbackOnPartialExecIfRequired(len(errs) != len(rss), rollbackOnError) - + vc.logOpTraffic(primitive, qr) return qr, errs } // StreamExecuteMulti is the streaming version of ExecuteMultiShard. func (vc *vcursorImpl) StreamExecuteMulti(ctx context.Context, primitive engine.Primitive, query string, rss []*srvtopo.ResolvedShard, bindVars []map[string]*querypb.BindVariable, rollbackOnError bool, autocommit bool, callback func(reply *sqltypes.Result) error) []error { + callback = vc.wrapCallback(callback, primitive) + noOfShards := len(rss) atomic.AddUint64(&vc.logStats.ShardQueries, uint64(noOfShards)) err := vc.markSavepoint(ctx, rollbackOnError && (noOfShards > 1), map[string]*querypb.BindVariable{}) @@ -645,6 +686,7 @@ func (vc *vcursorImpl) ExecuteStandalone(ctx context.Context, primitive engine.P // The autocommit flag is always set to false because we currently don't // execute DMLs through ExecuteStandalone. qr, errs := vc.executor.ExecuteMultiShard(ctx, primitive, rss, bqs, NewAutocommitSession(vc.safeSession.Session), false /* autocommit */, vc.ignoreMaxMemoryRows, vc.resultsObserver) + vc.logOpTraffic(primitive, qr) return qr, vterrors.Aggregate(errs) }