From efbd2681bd6bba08a8a2880eb7bf97b344c1596a Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Thu, 12 Sep 2024 08:02:25 +0200 Subject: [PATCH] actually add the required info to logstats Signed-off-by: Andres Taylor --- go/logstats/logstats.go | 8 +++- go/logstats/logstats_test.go | 14 +++---- .../vtgate/queries/aggregation/main_test.go | 14 +++---- go/vt/vtgate/executor.go | 9 +++-- go/vt/vtgate/executor_test.go | 4 +- go/vt/vtgate/querylogz_test.go | 2 +- go/vt/vtgate/vcursor_impl.go | 39 ++++++++++++++++--- 7 files changed, 61 insertions(+), 29 deletions(-) diff --git a/go/logstats/logstats.go b/go/logstats/logstats.go index 9d739e67a8c..8970f09bdb6 100644 --- a/go/logstats/logstats.go +++ b/go/logstats/logstats.go @@ -63,8 +63,8 @@ type ( // NewLogStats constructs a new LogStats with supplied Method and ctx // field values, and the StartTime field set to the present time. -func NewLogStats(ctx context.Context, methodName, sql, sessionUUID string, bindVars map[string]*querypb.BindVariable) *LogStats { - return &LogStats{ +func NewLogStats(ctx context.Context, methodName, sql, sessionUUID string, bindVars map[string]*querypb.BindVariable, opStats bool) *LogStats { + l := &LogStats{ Ctx: ctx, Method: methodName, SQL: sql, @@ -72,6 +72,10 @@ func NewLogStats(ctx context.Context, methodName, sql, sessionUUID string, bindV BindVariables: bindVars, StartTime: time.Now(), } + if opStats { + l.PrimitiveStats = make(map[int]PrimitiveStats) + } + return l } // SaveEndTime sets the end time of this request to now diff --git a/go/logstats/logstats_test.go b/go/logstats/logstats_test.go index ae3c01e0f0b..aa8b1c93998 100644 --- a/go/logstats/logstats_test.go +++ b/go/logstats/logstats_test.go @@ -59,7 +59,7 @@ func TestLogStatsFormat(t *testing.T) { streamlog.SetRedactDebugUIQueries(false) streamlog.SetQueryLogFormat("text") }() - logStats := NewLogStats(context.Background(), "test", "sql1", "suuid", nil) + logStats := NewLogStats(context.Background(), "test", "sql1", "suuid", nil, false) logStats.StartTime = time.Date(2017, time.January, 1, 1, 2, 3, 0, time.UTC) logStats.EndTime = time.Date(2017, time.January, 1, 1, 2, 4, 1234, time.UTC) logStats.TablesUsed = []string{"ks1.tbl1", "ks2.tbl2"} @@ -150,7 +150,7 @@ func TestLogStatsFormat(t *testing.T) { func TestLogStatsFilter(t *testing.T) { defer func() { streamlog.SetQueryLogFilterTag("") }() - logStats := NewLogStats(context.Background(), "test", "sql1 /* LOG_THIS_QUERY */", "", map[string]*querypb.BindVariable{"intVal": sqltypes.Int64BindVariable(1)}) + logStats := NewLogStats(context.Background(), "test", "sql1 /* LOG_THIS_QUERY */", "", map[string]*querypb.BindVariable{"intVal": sqltypes.Int64BindVariable(1)}, false) logStats.StartTime = time.Date(2017, time.January, 1, 1, 2, 3, 0, time.UTC) logStats.EndTime = time.Date(2017, time.January, 1, 1, 2, 4, 1234, time.UTC) params := map[string][]string{"full": {}} @@ -173,7 +173,7 @@ func TestLogStatsFilter(t *testing.T) { func TestLogStatsRowThreshold(t *testing.T) { defer func() { streamlog.SetQueryLogRowThreshold(0) }() - logStats := NewLogStats(context.Background(), "test", "sql1 /* LOG_THIS_QUERY */", "", map[string]*querypb.BindVariable{"intVal": sqltypes.Int64BindVariable(1)}) + logStats := NewLogStats(context.Background(), "test", "sql1 /* LOG_THIS_QUERY */", "", map[string]*querypb.BindVariable{"intVal": sqltypes.Int64BindVariable(1)}, false) logStats.StartTime = time.Date(2017, time.January, 1, 1, 2, 3, 0, time.UTC) logStats.EndTime = time.Date(2017, time.January, 1, 1, 2, 4, 1234, time.UTC) params := map[string][]string{"full": {}} @@ -197,14 +197,14 @@ func TestLogStatsContextHTML(t *testing.T) { Html: testconversions.MakeHTMLForTest(html), } ctx := callinfo.NewContext(context.Background(), callInfo) - logStats := NewLogStats(ctx, "test", "sql1", "", map[string]*querypb.BindVariable{}) + logStats := NewLogStats(ctx, "test", "sql1", "", map[string]*querypb.BindVariable{}, false) if logStats.ContextHTML().String() != html { t.Fatalf("expect to get html: %s, but got: %s", html, logStats.ContextHTML().String()) } } func TestLogStatsErrorStr(t *testing.T) { - logStats := NewLogStats(context.Background(), "test", "sql1", "", map[string]*querypb.BindVariable{}) + logStats := NewLogStats(context.Background(), "test", "sql1", "", map[string]*querypb.BindVariable{}, false) if logStats.ErrorStr() != "" { t.Fatalf("should not get error in stats, but got: %s", logStats.ErrorStr()) } @@ -216,7 +216,7 @@ func TestLogStatsErrorStr(t *testing.T) { } func TestLogStatsRemoteAddrUsername(t *testing.T) { - logStats := NewLogStats(context.Background(), "test", "sql1", "", map[string]*querypb.BindVariable{}) + logStats := NewLogStats(context.Background(), "test", "sql1", "", map[string]*querypb.BindVariable{}, false) addr, user := logStats.RemoteAddrUsername() if addr != "" { t.Fatalf("remote addr should be empty") @@ -232,7 +232,7 @@ func TestLogStatsRemoteAddrUsername(t *testing.T) { User: username, } ctx := callinfo.NewContext(context.Background(), callInfo) - logStats = NewLogStats(ctx, "test", "sql1", "", map[string]*querypb.BindVariable{}) + logStats = NewLogStats(ctx, "test", "sql1", "", map[string]*querypb.BindVariable{}, false) addr, user = logStats.RemoteAddrUsername() if addr != remoteAddr { t.Fatalf("expected to get remote addr: %s, but got: %s", remoteAddr, addr) diff --git a/go/test/endtoend/vtgate/queries/aggregation/main_test.go b/go/test/endtoend/vtgate/queries/aggregation/main_test.go index 02013a9b0e2..720fd56a9f9 100644 --- a/go/test/endtoend/vtgate/queries/aggregation/main_test.go +++ b/go/test/endtoend/vtgate/queries/aggregation/main_test.go @@ -19,10 +19,11 @@ package aggregation import ( _ "embed" "flag" - "fmt" "os" "testing" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/test/endtoend/utils" "vitess.io/vitess/go/mysql" @@ -54,7 +55,7 @@ func TestMain(m *testing.M) { // Start topo server err := clusterInstance.StartTopo() if err != nil { - return 1 + log.Fatalf("topo err: %v", err.Error()) } // Start keyspace @@ -63,18 +64,18 @@ func TestMain(m *testing.M) { SchemaSQL: schemaSQL, VSchema: vschema, } - clusterInstance.VtGateExtraArgs = []string{"--schema_change_signal"} + clusterInstance.VtGateExtraArgs = []string{"--schema_change_signal", "--log_operator_traffic=true"} clusterInstance.VtTabletExtraArgs = []string{"--queryserver-config-schema-change-signal"} err = clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 0, false) if err != nil { - return 1 + log.Fatalf("Error starting keyspace: %v", err.Error()) } clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs, "--enable_system_settings=true") // Start vtgate err = clusterInstance.StartVtgate() if err != nil { - return 1 + log.Fatalf("Error starting vtgate: %v", err.Error()) } vtParams = clusterInstance.GetVTParams(keyspaceName) @@ -82,8 +83,7 @@ func TestMain(m *testing.M) { // create mysql instance and connection parameters conn, closer, err := utils.NewMySQL(clusterInstance, keyspaceName, schemaSQL) if err != nil { - fmt.Println(err) - return 1 + log.Fatalf("Error creating mysql instance: %v", err) } defer closer() mysqlParams = conn diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index 37037b18595..3416d64c778 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -181,6 +181,7 @@ func NewExecutor( plans: plans, warmingReadsPercent: warmingReadsPercent, warmingReadsChannel: make(chan bool, warmingReadsConcurrency), + logOperatorTraffic: logOperatorTraffic, } vschemaacl.Init() @@ -227,7 +228,7 @@ func (e *Executor) Execute(ctx context.Context, mysqlCtx vtgateservice.MySQLConn trace.AnnotateSQL(span, sqlparser.Preview(sql)) defer span.Finish() - logStats := logstats.NewLogStats(ctx, method, sql, safeSession.GetSessionUUID(), bindVars) + logStats := logstats.NewLogStats(ctx, method, sql, safeSession.GetSessionUUID(), bindVars, e.logOperatorTraffic) stmtType, result, err := e.execute(ctx, mysqlCtx, safeSession, sql, bindVars, logStats) logStats.Error = err if result == nil { @@ -294,7 +295,7 @@ func (e *Executor) StreamExecute( trace.AnnotateSQL(span, sqlparser.Preview(sql)) defer span.Finish() - logStats := logstats.NewLogStats(ctx, method, sql, safeSession.GetSessionUUID(), bindVars) + logStats := logstats.NewLogStats(ctx, method, sql, safeSession.GetSessionUUID(), bindVars, e.logOperatorTraffic) srr := &streaminResultReceiver{callback: callback} var err error @@ -1353,7 +1354,7 @@ func isValidPayloadSize(query string) bool { // Prepare executes a prepare statements. func (e *Executor) Prepare(ctx context.Context, method string, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable) (fld []*querypb.Field, err error) { - logStats := logstats.NewLogStats(ctx, method, sql, safeSession.GetSessionUUID(), bindVars) + logStats := logstats.NewLogStats(ctx, method, sql, safeSession.GetSessionUUID(), bindVars, e.logOperatorTraffic) fld, err = e.prepare(ctx, safeSession, sql, bindVars, logStats) logStats.Error = err @@ -1591,7 +1592,7 @@ func (e *Executor) planPrepareStmt(ctx context.Context, vcursor *vcursorImpl, qu } // creating this log stats to not interfere with the original log stats. - lStats := logstats.NewLogStats(ctx, "prepare", query, vcursor.safeSession.SessionUUID, nil) + lStats := logstats.NewLogStats(ctx, "prepare", query, vcursor.safeSession.SessionUUID, nil, e.logOperatorTraffic) plan, err := e.getPlan( ctx, vcursor, diff --git a/go/vt/vtgate/executor_test.go b/go/vt/vtgate/executor_test.go index 2240abc7160..dee5cdc4892 100644 --- a/go/vt/vtgate/executor_test.go +++ b/go/vt/vtgate/executor_test.go @@ -1612,7 +1612,7 @@ func assertCacheContains(t *testing.T, e *Executor, vc *vcursorImpl, sql string) } func getPlanCached(t *testing.T, ctx context.Context, e *Executor, vcursor *vcursorImpl, sql string, comments sqlparser.MarginComments, bindVars map[string]*querypb.BindVariable, skipQueryPlanCache bool) (*engine.Plan, *logstats.LogStats) { - logStats := logstats.NewLogStats(ctx, "Test", "", "", nil) + logStats := logstats.NewLogStats(ctx, "Test", "", "", nil, false) vcursor.safeSession = &SafeSession{ Session: &vtgatepb.Session{ Options: &querypb.ExecuteOptions{SkipQueryPlanCache: skipQueryPlanCache}}, @@ -1782,7 +1782,7 @@ func TestGetPlanPriority(t *testing.T) { r, _, _, _, ctx := createExecutorEnv(t) r.normalize = true - logStats := logstats.NewLogStats(ctx, "Test", "", "", nil) + logStats := logstats.NewLogStats(ctx, "Test", "", "", nil, false) vCursor, err := newVCursorImpl(session, makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, false) assert.NoError(t, err) diff --git a/go/vt/vtgate/querylogz_test.go b/go/vt/vtgate/querylogz_test.go index b6efc9f1b9f..b4633dc3401 100644 --- a/go/vt/vtgate/querylogz_test.go +++ b/go/vt/vtgate/querylogz_test.go @@ -36,7 +36,7 @@ import ( func TestQuerylogzHandlerFormatting(t *testing.T) { req, _ := http.NewRequest("GET", "/querylogz?timeout=10&limit=1", nil) - logStats := logstats.NewLogStats(context.Background(), "Execute", "select name from test_table limit 1000", "suuid", nil) + logStats := logstats.NewLogStats(context.Background(), "Execute", "select name from test_table limit 1000", "suuid", nil, false) logStats.StmtType = "select" logStats.RowsAffected = 1000 logStats.ShardQueries = 1 diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go index 258a9364e66..1a4cf93c85e 100644 --- a/go/vt/vtgate/vcursor_impl.go +++ b/go/vt/vtgate/vcursor_impl.go @@ -516,16 +516,22 @@ func (vc *vcursorImpl) ExecutePrimitive(ctx context.Context, primitive engine.Pr if err != nil && vterrors.RootCause(err) == buffer.ShardMissingError { continue } - if vc.logOperatorTraffic { - stats := vc.logStats.PrimitiveStats[int(primitive.GetID())] - stats.NoOfCalls++ - stats.Rows = append(stats.Rows, len(res.Rows)) - } + vc.logOpTraffic(primitive, res) return res, err } return nil, vterrors.New(vtrpcpb.Code_UNAVAILABLE, "upstream shards are not available") } +func (vc *vcursorImpl) logOpTraffic(primitive engine.Primitive, res *sqltypes.Result) { + if vc.logOperatorTraffic { + key := int(primitive.GetID()) + stats := vc.logStats.PrimitiveStats[key] + stats.NoOfCalls++ + stats.Rows = append(stats.Rows, len(res.Rows)) + vc.logStats.PrimitiveStats[key] = stats + } +} + func (vc *vcursorImpl) ExecutePrimitiveStandalone(ctx context.Context, primitive engine.Primitive, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) { // clone the vcursorImpl with a new session. newVC := vc.cloneWithAutocommitSession() @@ -534,12 +540,26 @@ func (vc *vcursorImpl) ExecutePrimitiveStandalone(ctx context.Context, primitive if err != nil && vterrors.RootCause(err) == buffer.ShardMissingError { continue } + vc.logOpTraffic(primitive, res) return res, err } return nil, vterrors.New(vtrpcpb.Code_UNAVAILABLE, "upstream shards are not available") } +func (vc *vcursorImpl) wrapCallback(callback func(*sqltypes.Result) error, primitive engine.Primitive) func(*sqltypes.Result) error { + if !vc.logOperatorTraffic { + return callback + } + + return func(result *sqltypes.Result) error { + vc.logOpTraffic(primitive, result) + return callback(result) + } +} + func (vc *vcursorImpl) StreamExecutePrimitive(ctx context.Context, primitive engine.Primitive, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error { + callback = vc.wrapCallback(callback, primitive) + for try := 0; try < MaxBufferingRetries; try++ { err := primitive.TryStreamExecute(ctx, vc, bindVars, wantfields, callback) if err != nil && vterrors.RootCause(err) == buffer.ShardMissingError { @@ -551,6 +571,8 @@ func (vc *vcursorImpl) StreamExecutePrimitive(ctx context.Context, primitive eng } func (vc *vcursorImpl) StreamExecutePrimitiveStandalone(ctx context.Context, primitive engine.Primitive, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(result *sqltypes.Result) error) error { + callback = vc.wrapCallback(callback, primitive) + // clone the vcursorImpl with a new session. newVC := vc.cloneWithAutocommitSession() for try := 0; try < MaxBufferingRetries; try++ { @@ -617,12 +639,14 @@ func (vc *vcursorImpl) ExecuteMultiShard(ctx context.Context, primitive engine.P qr, errs := vc.executor.ExecuteMultiShard(ctx, primitive, rss, commentedShardQueries(queries, vc.marginComments), vc.safeSession, canAutocommit, vc.ignoreMaxMemoryRows, vc.resultsObserver) vc.setRollbackOnPartialExecIfRequired(len(errs) != len(rss), rollbackOnError) - + vc.logOpTraffic(primitive, qr) return qr, errs } // StreamExecuteMulti is the streaming version of ExecuteMultiShard. func (vc *vcursorImpl) StreamExecuteMulti(ctx context.Context, primitive engine.Primitive, query string, rss []*srvtopo.ResolvedShard, bindVars []map[string]*querypb.BindVariable, rollbackOnError bool, autocommit bool, callback func(reply *sqltypes.Result) error) []error { + callback = vc.wrapCallback(callback, primitive) + noOfShards := len(rss) atomic.AddUint64(&vc.logStats.ShardQueries, uint64(noOfShards)) err := vc.markSavepoint(ctx, rollbackOnError && (noOfShards > 1), map[string]*querypb.BindVariable{}) @@ -654,6 +678,7 @@ func (vc *vcursorImpl) ExecuteStandalone(ctx context.Context, primitive engine.P // The autocommit flag is always set to false because we currently don't // execute DMLs through ExecuteStandalone. qr, errs := vc.executor.ExecuteMultiShard(ctx, primitive, rss, bqs, NewAutocommitSession(vc.safeSession.Session), false /* autocommit */, vc.ignoreMaxMemoryRows, vc.resultsObserver) + vc.logOpTraffic(primitive, qr) return qr, vterrors.Aggregate(errs) } @@ -1432,6 +1457,7 @@ func (vc *vcursorImpl) CloneForReplicaWarming(ctx context.Context) engine.VCurso warnings: vc.warnings, pv: vc.pv, resultsObserver: nullResultsObserver{}, + logOperatorTraffic: false, } v.marginComments.Trailing += "/* warming read */" @@ -1464,6 +1490,7 @@ func (vc *vcursorImpl) CloneForMirroring(ctx context.Context) engine.VCursor { warnings: vc.warnings, pv: vc.pv, resultsObserver: nullResultsObserver{}, + logOperatorTraffic: false, } v.marginComments.Trailing += "/* mirror query */"