Skip to content

Commit

Permalink
actually add the required info to logstats
Browse files Browse the repository at this point in the history
Signed-off-by: Andres Taylor <andres@planetscale.com>
  • Loading branch information
systay committed Sep 12, 2024
1 parent 6994eb4 commit efbd268
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 29 deletions.
8 changes: 6 additions & 2 deletions go/logstats/logstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,19 @@ type (

// NewLogStats constructs a new LogStats with supplied Method and ctx
// field values, and the StartTime field set to the present time.
func NewLogStats(ctx context.Context, methodName, sql, sessionUUID string, bindVars map[string]*querypb.BindVariable) *LogStats {
return &LogStats{
func NewLogStats(ctx context.Context, methodName, sql, sessionUUID string, bindVars map[string]*querypb.BindVariable, opStats bool) *LogStats {
l := &LogStats{
Ctx: ctx,
Method: methodName,
SQL: sql,
SessionUUID: sessionUUID,
BindVariables: bindVars,
StartTime: time.Now(),
}
if opStats {
l.PrimitiveStats = make(map[int]PrimitiveStats)
}
return l
}

// SaveEndTime sets the end time of this request to now
Expand Down
14 changes: 7 additions & 7 deletions go/logstats/logstats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestLogStatsFormat(t *testing.T) {
streamlog.SetRedactDebugUIQueries(false)
streamlog.SetQueryLogFormat("text")
}()
logStats := NewLogStats(context.Background(), "test", "sql1", "suuid", nil)
logStats := NewLogStats(context.Background(), "test", "sql1", "suuid", nil, false)
logStats.StartTime = time.Date(2017, time.January, 1, 1, 2, 3, 0, time.UTC)
logStats.EndTime = time.Date(2017, time.January, 1, 1, 2, 4, 1234, time.UTC)
logStats.TablesUsed = []string{"ks1.tbl1", "ks2.tbl2"}
Expand Down Expand Up @@ -150,7 +150,7 @@ func TestLogStatsFormat(t *testing.T) {
func TestLogStatsFilter(t *testing.T) {
defer func() { streamlog.SetQueryLogFilterTag("") }()

logStats := NewLogStats(context.Background(), "test", "sql1 /* LOG_THIS_QUERY */", "", map[string]*querypb.BindVariable{"intVal": sqltypes.Int64BindVariable(1)})
logStats := NewLogStats(context.Background(), "test", "sql1 /* LOG_THIS_QUERY */", "", map[string]*querypb.BindVariable{"intVal": sqltypes.Int64BindVariable(1)}, false)
logStats.StartTime = time.Date(2017, time.January, 1, 1, 2, 3, 0, time.UTC)
logStats.EndTime = time.Date(2017, time.January, 1, 1, 2, 4, 1234, time.UTC)
params := map[string][]string{"full": {}}
Expand All @@ -173,7 +173,7 @@ func TestLogStatsFilter(t *testing.T) {
func TestLogStatsRowThreshold(t *testing.T) {
defer func() { streamlog.SetQueryLogRowThreshold(0) }()

logStats := NewLogStats(context.Background(), "test", "sql1 /* LOG_THIS_QUERY */", "", map[string]*querypb.BindVariable{"intVal": sqltypes.Int64BindVariable(1)})
logStats := NewLogStats(context.Background(), "test", "sql1 /* LOG_THIS_QUERY */", "", map[string]*querypb.BindVariable{"intVal": sqltypes.Int64BindVariable(1)}, false)
logStats.StartTime = time.Date(2017, time.January, 1, 1, 2, 3, 0, time.UTC)
logStats.EndTime = time.Date(2017, time.January, 1, 1, 2, 4, 1234, time.UTC)
params := map[string][]string{"full": {}}
Expand All @@ -197,14 +197,14 @@ func TestLogStatsContextHTML(t *testing.T) {
Html: testconversions.MakeHTMLForTest(html),
}
ctx := callinfo.NewContext(context.Background(), callInfo)
logStats := NewLogStats(ctx, "test", "sql1", "", map[string]*querypb.BindVariable{})
logStats := NewLogStats(ctx, "test", "sql1", "", map[string]*querypb.BindVariable{}, false)
if logStats.ContextHTML().String() != html {
t.Fatalf("expect to get html: %s, but got: %s", html, logStats.ContextHTML().String())
}
}

func TestLogStatsErrorStr(t *testing.T) {
logStats := NewLogStats(context.Background(), "test", "sql1", "", map[string]*querypb.BindVariable{})
logStats := NewLogStats(context.Background(), "test", "sql1", "", map[string]*querypb.BindVariable{}, false)
if logStats.ErrorStr() != "" {
t.Fatalf("should not get error in stats, but got: %s", logStats.ErrorStr())
}
Expand All @@ -216,7 +216,7 @@ func TestLogStatsErrorStr(t *testing.T) {
}

func TestLogStatsRemoteAddrUsername(t *testing.T) {
logStats := NewLogStats(context.Background(), "test", "sql1", "", map[string]*querypb.BindVariable{})
logStats := NewLogStats(context.Background(), "test", "sql1", "", map[string]*querypb.BindVariable{}, false)
addr, user := logStats.RemoteAddrUsername()
if addr != "" {
t.Fatalf("remote addr should be empty")
Expand All @@ -232,7 +232,7 @@ func TestLogStatsRemoteAddrUsername(t *testing.T) {
User: username,
}
ctx := callinfo.NewContext(context.Background(), callInfo)
logStats = NewLogStats(ctx, "test", "sql1", "", map[string]*querypb.BindVariable{})
logStats = NewLogStats(ctx, "test", "sql1", "", map[string]*querypb.BindVariable{}, false)
addr, user = logStats.RemoteAddrUsername()
if addr != remoteAddr {
t.Fatalf("expected to get remote addr: %s, but got: %s", remoteAddr, addr)
Expand Down
14 changes: 7 additions & 7 deletions go/test/endtoend/vtgate/queries/aggregation/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ package aggregation
import (
_ "embed"
"flag"
"fmt"
"os"
"testing"

"vitess.io/vitess/go/vt/log"

"vitess.io/vitess/go/test/endtoend/utils"

"vitess.io/vitess/go/mysql"
Expand Down Expand Up @@ -54,7 +55,7 @@ func TestMain(m *testing.M) {
// Start topo server
err := clusterInstance.StartTopo()
if err != nil {
return 1
log.Fatalf("topo err: %v", err.Error())
}

// Start keyspace
Expand All @@ -63,27 +64,26 @@ func TestMain(m *testing.M) {
SchemaSQL: schemaSQL,
VSchema: vschema,
}
clusterInstance.VtGateExtraArgs = []string{"--schema_change_signal"}
clusterInstance.VtGateExtraArgs = []string{"--schema_change_signal", "--log_operator_traffic=true"}
clusterInstance.VtTabletExtraArgs = []string{"--queryserver-config-schema-change-signal"}
err = clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 0, false)
if err != nil {
return 1
log.Fatalf("Error starting keyspace: %v", err.Error())
}

clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs, "--enable_system_settings=true")
// Start vtgate
err = clusterInstance.StartVtgate()
if err != nil {
return 1
log.Fatalf("Error starting vtgate: %v", err.Error())
}

vtParams = clusterInstance.GetVTParams(keyspaceName)

// create mysql instance and connection parameters
conn, closer, err := utils.NewMySQL(clusterInstance, keyspaceName, schemaSQL)
if err != nil {
fmt.Println(err)
return 1
log.Fatalf("Error creating mysql instance: %v", err)
}
defer closer()
mysqlParams = conn
Expand Down
9 changes: 5 additions & 4 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ func NewExecutor(
plans: plans,
warmingReadsPercent: warmingReadsPercent,
warmingReadsChannel: make(chan bool, warmingReadsConcurrency),
logOperatorTraffic: logOperatorTraffic,
}

vschemaacl.Init()
Expand Down Expand Up @@ -227,7 +228,7 @@ func (e *Executor) Execute(ctx context.Context, mysqlCtx vtgateservice.MySQLConn
trace.AnnotateSQL(span, sqlparser.Preview(sql))
defer span.Finish()

logStats := logstats.NewLogStats(ctx, method, sql, safeSession.GetSessionUUID(), bindVars)
logStats := logstats.NewLogStats(ctx, method, sql, safeSession.GetSessionUUID(), bindVars, e.logOperatorTraffic)
stmtType, result, err := e.execute(ctx, mysqlCtx, safeSession, sql, bindVars, logStats)
logStats.Error = err
if result == nil {
Expand Down Expand Up @@ -294,7 +295,7 @@ func (e *Executor) StreamExecute(
trace.AnnotateSQL(span, sqlparser.Preview(sql))
defer span.Finish()

logStats := logstats.NewLogStats(ctx, method, sql, safeSession.GetSessionUUID(), bindVars)
logStats := logstats.NewLogStats(ctx, method, sql, safeSession.GetSessionUUID(), bindVars, e.logOperatorTraffic)
srr := &streaminResultReceiver{callback: callback}
var err error

Expand Down Expand Up @@ -1353,7 +1354,7 @@ func isValidPayloadSize(query string) bool {

// Prepare executes a prepare statements.
func (e *Executor) Prepare(ctx context.Context, method string, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable) (fld []*querypb.Field, err error) {
logStats := logstats.NewLogStats(ctx, method, sql, safeSession.GetSessionUUID(), bindVars)
logStats := logstats.NewLogStats(ctx, method, sql, safeSession.GetSessionUUID(), bindVars, e.logOperatorTraffic)
fld, err = e.prepare(ctx, safeSession, sql, bindVars, logStats)
logStats.Error = err

Expand Down Expand Up @@ -1591,7 +1592,7 @@ func (e *Executor) planPrepareStmt(ctx context.Context, vcursor *vcursorImpl, qu
}

// creating this log stats to not interfere with the original log stats.
lStats := logstats.NewLogStats(ctx, "prepare", query, vcursor.safeSession.SessionUUID, nil)
lStats := logstats.NewLogStats(ctx, "prepare", query, vcursor.safeSession.SessionUUID, nil, e.logOperatorTraffic)
plan, err := e.getPlan(
ctx,
vcursor,
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1612,7 +1612,7 @@ func assertCacheContains(t *testing.T, e *Executor, vc *vcursorImpl, sql string)
}

func getPlanCached(t *testing.T, ctx context.Context, e *Executor, vcursor *vcursorImpl, sql string, comments sqlparser.MarginComments, bindVars map[string]*querypb.BindVariable, skipQueryPlanCache bool) (*engine.Plan, *logstats.LogStats) {
logStats := logstats.NewLogStats(ctx, "Test", "", "", nil)
logStats := logstats.NewLogStats(ctx, "Test", "", "", nil, false)
vcursor.safeSession = &SafeSession{
Session: &vtgatepb.Session{
Options: &querypb.ExecuteOptions{SkipQueryPlanCache: skipQueryPlanCache}},
Expand Down Expand Up @@ -1782,7 +1782,7 @@ func TestGetPlanPriority(t *testing.T) {
r, _, _, _, ctx := createExecutorEnv(t)

r.normalize = true
logStats := logstats.NewLogStats(ctx, "Test", "", "", nil)
logStats := logstats.NewLogStats(ctx, "Test", "", "", nil, false)
vCursor, err := newVCursorImpl(session, makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, false)
assert.NoError(t, err)

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/querylogz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (

func TestQuerylogzHandlerFormatting(t *testing.T) {
req, _ := http.NewRequest("GET", "/querylogz?timeout=10&limit=1", nil)
logStats := logstats.NewLogStats(context.Background(), "Execute", "select name from test_table limit 1000", "suuid", nil)
logStats := logstats.NewLogStats(context.Background(), "Execute", "select name from test_table limit 1000", "suuid", nil, false)
logStats.StmtType = "select"
logStats.RowsAffected = 1000
logStats.ShardQueries = 1
Expand Down
39 changes: 33 additions & 6 deletions go/vt/vtgate/vcursor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,16 +516,22 @@ func (vc *vcursorImpl) ExecutePrimitive(ctx context.Context, primitive engine.Pr
if err != nil && vterrors.RootCause(err) == buffer.ShardMissingError {
continue
}
if vc.logOperatorTraffic {
stats := vc.logStats.PrimitiveStats[int(primitive.GetID())]
stats.NoOfCalls++
stats.Rows = append(stats.Rows, len(res.Rows))
}
vc.logOpTraffic(primitive, res)
return res, err
}
return nil, vterrors.New(vtrpcpb.Code_UNAVAILABLE, "upstream shards are not available")
}

func (vc *vcursorImpl) logOpTraffic(primitive engine.Primitive, res *sqltypes.Result) {
if vc.logOperatorTraffic {
key := int(primitive.GetID())
stats := vc.logStats.PrimitiveStats[key]
stats.NoOfCalls++
stats.Rows = append(stats.Rows, len(res.Rows))
vc.logStats.PrimitiveStats[key] = stats
}
}

func (vc *vcursorImpl) ExecutePrimitiveStandalone(ctx context.Context, primitive engine.Primitive, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
// clone the vcursorImpl with a new session.
newVC := vc.cloneWithAutocommitSession()
Expand All @@ -534,12 +540,26 @@ func (vc *vcursorImpl) ExecutePrimitiveStandalone(ctx context.Context, primitive
if err != nil && vterrors.RootCause(err) == buffer.ShardMissingError {
continue
}
vc.logOpTraffic(primitive, res)
return res, err
}
return nil, vterrors.New(vtrpcpb.Code_UNAVAILABLE, "upstream shards are not available")
}

func (vc *vcursorImpl) wrapCallback(callback func(*sqltypes.Result) error, primitive engine.Primitive) func(*sqltypes.Result) error {
if !vc.logOperatorTraffic {
return callback
}

return func(result *sqltypes.Result) error {
vc.logOpTraffic(primitive, result)
return callback(result)
}
}

func (vc *vcursorImpl) StreamExecutePrimitive(ctx context.Context, primitive engine.Primitive, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
callback = vc.wrapCallback(callback, primitive)

for try := 0; try < MaxBufferingRetries; try++ {
err := primitive.TryStreamExecute(ctx, vc, bindVars, wantfields, callback)
if err != nil && vterrors.RootCause(err) == buffer.ShardMissingError {
Expand All @@ -551,6 +571,8 @@ func (vc *vcursorImpl) StreamExecutePrimitive(ctx context.Context, primitive eng
}

func (vc *vcursorImpl) StreamExecutePrimitiveStandalone(ctx context.Context, primitive engine.Primitive, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(result *sqltypes.Result) error) error {
callback = vc.wrapCallback(callback, primitive)

// clone the vcursorImpl with a new session.
newVC := vc.cloneWithAutocommitSession()
for try := 0; try < MaxBufferingRetries; try++ {
Expand Down Expand Up @@ -617,12 +639,14 @@ func (vc *vcursorImpl) ExecuteMultiShard(ctx context.Context, primitive engine.P

qr, errs := vc.executor.ExecuteMultiShard(ctx, primitive, rss, commentedShardQueries(queries, vc.marginComments), vc.safeSession, canAutocommit, vc.ignoreMaxMemoryRows, vc.resultsObserver)
vc.setRollbackOnPartialExecIfRequired(len(errs) != len(rss), rollbackOnError)

vc.logOpTraffic(primitive, qr)
return qr, errs
}

// StreamExecuteMulti is the streaming version of ExecuteMultiShard.
func (vc *vcursorImpl) StreamExecuteMulti(ctx context.Context, primitive engine.Primitive, query string, rss []*srvtopo.ResolvedShard, bindVars []map[string]*querypb.BindVariable, rollbackOnError bool, autocommit bool, callback func(reply *sqltypes.Result) error) []error {
callback = vc.wrapCallback(callback, primitive)

noOfShards := len(rss)
atomic.AddUint64(&vc.logStats.ShardQueries, uint64(noOfShards))
err := vc.markSavepoint(ctx, rollbackOnError && (noOfShards > 1), map[string]*querypb.BindVariable{})
Expand Down Expand Up @@ -654,6 +678,7 @@ func (vc *vcursorImpl) ExecuteStandalone(ctx context.Context, primitive engine.P
// The autocommit flag is always set to false because we currently don't
// execute DMLs through ExecuteStandalone.
qr, errs := vc.executor.ExecuteMultiShard(ctx, primitive, rss, bqs, NewAutocommitSession(vc.safeSession.Session), false /* autocommit */, vc.ignoreMaxMemoryRows, vc.resultsObserver)
vc.logOpTraffic(primitive, qr)
return qr, vterrors.Aggregate(errs)
}

Expand Down Expand Up @@ -1432,6 +1457,7 @@ func (vc *vcursorImpl) CloneForReplicaWarming(ctx context.Context) engine.VCurso
warnings: vc.warnings,
pv: vc.pv,
resultsObserver: nullResultsObserver{},
logOperatorTraffic: false,
}

v.marginComments.Trailing += "/* warming read */"
Expand Down Expand Up @@ -1464,6 +1490,7 @@ func (vc *vcursorImpl) CloneForMirroring(ctx context.Context) engine.VCursor {
warnings: vc.warnings,
pv: vc.pv,
resultsObserver: nullResultsObserver{},
logOperatorTraffic: false,
}

v.marginComments.Trailing += "/* mirror query */"
Expand Down

0 comments on commit efbd268

Please sign in to comment.