Skip to content

Commit

Permalink
add log to operator flag for vtgate
Browse files Browse the repository at this point in the history
Signed-off-by: Andres Taylor <andres@planetscale.com>
  • Loading branch information
systay committed Sep 11, 2024
1 parent 7d1e4b3 commit d955e51
Show file tree
Hide file tree
Showing 20 changed files with 110 additions and 52 deletions.
4 changes: 3 additions & 1 deletion go/cmd/vtcombo/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ In particular, it contains:
}
schemaDir string
startMysql bool
logOpTraffic bool
mysqlPort = 3306
externalTopoServer bool
plannerName string
Expand Down Expand Up @@ -106,6 +107,7 @@ func init() {

Main.Flags().StringVar(&schemaDir, "schema_dir", schemaDir, "Schema base directory. Should contain one directory per keyspace, with a vschema.json file if necessary.")
Main.Flags().BoolVar(&startMysql, "start_mysql", startMysql, "Should vtcombo also start mysql")
Main.Flags().BoolVar(&logOpTraffic, "log_operator_traffic", false, "Logs all inter-operator communication in vtgate, including call counts and data flow statistics.")
Main.Flags().IntVar(&mysqlPort, "mysql_port", mysqlPort, "mysql port")
Main.Flags().BoolVar(&externalTopoServer, "external_topo_server", externalTopoServer, "Should vtcombo use an external topology server instead of starting its own in-memory topology server. "+
"If true, vtcombo will use the flags defined in topo/server.go to open topo server")
Expand Down Expand Up @@ -333,7 +335,7 @@ func run(cmd *cobra.Command, args []string) (err error) {
vtgate.QueryzHandler = "/debug/vtgate/queryz"

// pass nil for healthcheck, it will get created
vtg := vtgate.Init(ctx, env, nil, resilientServer, tpb.Cells[0], tabletTypes, plannerVersion)
vtg := vtgate.Init(ctx, env, nil, resilientServer, tpb.Cells[0], tabletTypes, plannerVersion, logOpTraffic)

// vtctld configuration and init
err = vtctld.InitVtctld(env, ts)
Expand Down
4 changes: 3 additions & 1 deletion go/cmd/vtgate/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ var (
cell string
tabletTypesToWait []topodatapb.TabletType
plannerName string
logOpTraffic bool
resilientServer *srvtopo.ResilientServer

Main = &cobra.Command{
Expand Down Expand Up @@ -182,7 +183,7 @@ func run(cmd *cobra.Command, args []string) error {
}

// pass nil for HealthCheck and it will be created
vtg := vtgate.Init(ctx, env, nil, resilientServer, cell, tabletTypes, plannerVersion)
vtg := vtgate.Init(ctx, env, nil, resilientServer, cell, tabletTypes, plannerVersion, logOpTraffic)

servenv.OnRun(func() {
// Flags are parsed now. Parse the template using the actual flag value and overwrite the current template.
Expand Down Expand Up @@ -210,6 +211,7 @@ func init() {
Main.Flags().StringVar(&cell, "cell", cell, "cell to use")
Main.Flags().Var((*topoproto.TabletTypeListFlag)(&tabletTypesToWait), "tablet_types_to_wait", "Wait till connected for specified tablet types during Gateway initialization. Should be provided as a comma-separated set of tablet types.")
Main.Flags().StringVar(&plannerName, "planner-version", plannerName, "Sets the default planner to use when the session has not changed it. Valid values are: Gen4, Gen4Greedy, Gen4Left2Right")
Main.Flags().BoolVar(&logOpTraffic, "log_operator_traffic", false, "Logs all inter-operator communication in vtgate, including call counts and data flow statistics.")

Main.MarkFlagRequired("tablet_types_to_wait")
}
1 change: 1 addition & 0 deletions go/cmd/vttestserver/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ func New() (cmd *cobra.Command) {
cmd.Flags().StringVar(&config.Charset, "charset", "utf8mb4", "MySQL charset")

cmd.Flags().StringVar(&config.PlannerVersion, "planner-version", "", "Sets the default planner to use when the session has not changed it. Valid values are: Gen4, Gen4Greedy, Gen4Left2Right")
cmd.Flags().BoolVar(&config.LogOperatorTraffic, "log_operator_traffic", false, "Logs all inter-operator communication in vtgate, including call counts and data flow statistics.")

cmd.Flags().StringVar(&config.SnapshotFile, "snapshot_file", "",
"A MySQL DB snapshot file")
Expand Down
5 changes: 5 additions & 0 deletions go/test/vschemawrapper/vschema_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type VSchemaWrapper struct {
EnableViews bool
TestBuilder func(query string, vschema plancontext.VSchema, keyspace string) (*engine.Plan, error)
Env *vtenv.Environment
LogOpTraffic bool
}

func (vw *VSchemaWrapper) GetPrepareData(stmtName string) *vtgatepb.PrepareData {
Expand Down Expand Up @@ -354,3 +355,7 @@ func (vs *VSchemaWrapper) FindMirrorRule(tab sqlparser.TableName) (*vindexes.Mir
}
return mirrorRule, err
}

func (vs *VSchemaWrapper) LogOperatorTraffic() bool {
return vs.LogOpTraffic
}
17 changes: 16 additions & 1 deletion go/vt/vtexplain/vtexplain_vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,22 @@ func (vte *VTExplain) initVtgateExecutor(ctx context.Context, ts *topo.Server, v
var schemaTracker vtgate.SchemaInfo // no schema tracker for these tests
queryLogBufferSize := 10
plans := theine.NewStore[vtgate.PlanCacheKey, *engine.Plan](4*1024*1024, false)
vte.vtgateExecutor = vtgate.NewExecutor(ctx, vte.env, vte.explainTopo, Cell, resolver, opts.Normalize, false, streamSize, plans, schemaTracker, false, opts.PlannerVersion, 0)
vte.vtgateExecutor = vtgate.NewExecutor(
ctx,
vte.env,
vte.explainTopo,
Cell,
resolver,
opts.Normalize,
false,
streamSize,
plans,
schemaTracker,
false,
opts.PlannerVersion,
0,
false, /*logOperatorTraffic*/
)
vte.vtgateExecutor.SetQueryLogger(streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize))

return nil
Expand Down
8 changes: 8 additions & 0 deletions go/vt/vtgate/engine/fake_vcursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ func (t *noopVCursor) UnresolvedTransactions(ctx context.Context, keyspace strin
panic("implement me")
}

func (t *noopVCursor) LogOperatorTraffic() bool {
return false
}

func (t *noopVCursor) SetExec(ctx context.Context, name string, value string) error {
panic("implement me")
}
Expand Down Expand Up @@ -874,6 +878,10 @@ func (f *loggingVCursor) UnresolvedTransactions(_ context.Context, _ string) ([]
return f.transactionStatusOutput, nil
}

func (f *loggingVCursor) LogOperatorTraffic() bool {
return false
}

// SQLParser implements VCursor
func (t *loggingVCursor) SQLParser() *sqlparser.Parser {
if t.parser == nil {
Expand Down
4 changes: 3 additions & 1 deletion go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ type Executor struct {

warmingReadsPercent int
warmingReadsChannel chan bool
logOperatorTraffic bool
}

var executorOnce sync.Once
Expand Down Expand Up @@ -161,6 +162,7 @@ func NewExecutor(
noScatter bool,
pv plancontext.PlannerVersion,
warmingReadsPercent int,
logOperatorTraffic bool,
) *Executor {
e := &Executor{
env: env,
Expand Down Expand Up @@ -1406,7 +1408,7 @@ func (e *Executor) prepare(ctx context.Context, safeSession *SafeSession, sql st

func (e *Executor) handlePrepare(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, logStats *logstats.LogStats) ([]*querypb.Field, error) {
query, comments := sqlparser.SplitMarginComments(sql)
vcursor, _ := newVCursorImpl(safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver, e.serv, e.warnShardedOnly, e.pv)
vcursor, _ := newVCursorImpl(safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver, e.serv, e.warnShardedOnly, e.pv, e.logOperatorTraffic)

stmt, reservedVars, err := parseAndValidateQuery(query, e.env.Parser())
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtgate/executor_framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func createExecutorEnvCallback(t testing.TB, eachShard func(shard, ks string, ta
// one-off queries from thrashing the cache. Disable the doorkeeper in the tests to prevent flakiness.
plans := theine.NewStore[PlanCacheKey, *engine.Plan](queryPlanCacheMemory, false)

executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0)
executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, false)
executor.SetQueryLogger(queryLogger)

key.AnyShardPicker = DestinationAnyShardPickerFirstShard{}
Expand Down Expand Up @@ -230,7 +230,7 @@ func createCustomExecutor(t testing.TB, vschema string, mysqlVersion string) (ex
plans := DefaultPlanCache()
env, err := vtenv.New(vtenv.Options{MySQLServerVersion: mysqlVersion})
require.NoError(t, err)
executor = NewExecutor(ctx, env, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0)
executor = NewExecutor(ctx, env, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, false)
executor.SetQueryLogger(queryLogger)

t.Cleanup(func() {
Expand Down Expand Up @@ -267,7 +267,7 @@ func createCustomExecutorSetValues(t testing.TB, vschema string, values []*sqlty
sbclookup = hc.AddTestTablet(cell, "0", 1, KsTestUnsharded, "0", topodatapb.TabletType_PRIMARY, true, 1, nil)
queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize)
plans := DefaultPlanCache()
executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0)
executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, false)
executor.SetQueryLogger(queryLogger)

t.Cleanup(func() {
Expand All @@ -292,7 +292,7 @@ func createExecutorEnvWithPrimaryReplicaConn(t testing.TB, ctx context.Context,
replica = hc.AddTestTablet(cell, "0-replica", 1, KsTestUnsharded, "0", topodatapb.TabletType_REPLICA, true, 1, nil)

queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize)
executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, DefaultPlanCache(), nil, false, querypb.ExecuteOptions_Gen4, warmingReadsPercent)
executor = NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, DefaultPlanCache(), nil, false, querypb.ExecuteOptions_Gen4, warmingReadsPercent, false)
executor.SetQueryLogger(queryLogger)

t.Cleanup(func() {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/executor_select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1648,7 +1648,7 @@ func TestSelectListArg(t *testing.T) {
func createExecutor(ctx context.Context, serv *sandboxTopo, cell string, resolver *Resolver) *Executor {
queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize)
plans := DefaultPlanCache()
ex := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0)
ex := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, false)
ex.SetQueryLogger(queryLogger)
return ex
}
Expand Down Expand Up @@ -3269,7 +3269,7 @@ func TestStreamOrderByLimitWithMultipleResults(t *testing.T) {
}
queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize)
plans := DefaultPlanCache()
executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, true, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0)
executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, true, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, false)
executor.SetQueryLogger(queryLogger)
defer executor.Close()
// some sleep for all goroutines to start
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/executor_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestStreamSQLSharded(t *testing.T) {
queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize)
plans := DefaultPlanCache()

executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0)
executor := NewExecutor(ctx, vtenv.NewTestEnv(), serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0, false)
executor.SetQueryLogger(queryLogger)

defer executor.Close()
Expand Down
26 changes: 13 additions & 13 deletions go/vt/vtgate/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1547,8 +1547,8 @@ var pv = querypb.ExecuteOptions_Gen4
func TestGetPlanUnnormalized(t *testing.T) {
r, _, _, _, ctx := createExecutorEnv(t)

emptyvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv)
unshardedvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv)
emptyvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, false)
unshardedvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, false)

query1 := "select * from music_user_map where id = 1"
plan1, logStats1 := getPlanCached(t, ctx, r, emptyvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false)
Expand Down Expand Up @@ -1631,7 +1631,7 @@ func TestGetPlanCacheUnnormalized(t *testing.T) {
t.Run("Cache", func(t *testing.T) {
r, _, _, _, ctx := createExecutorEnv(t)

emptyvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv)
emptyvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, false)
query1 := "select * from music_user_map where id = 1"

_, logStats1 := getPlanCached(t, ctx, r, emptyvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, true)
Expand All @@ -1655,7 +1655,7 @@ func TestGetPlanCacheUnnormalized(t *testing.T) {
// Skip cache using directive
r, _, _, _, ctx := createExecutorEnv(t)

unshardedvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv)
unshardedvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, false)

query1 := "insert /*vt+ SKIP_QUERY_PLAN_CACHE=1 */ into user(id) values (1), (2)"
getPlanCached(t, ctx, r, unshardedvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false)
Expand All @@ -1666,12 +1666,12 @@ func TestGetPlanCacheUnnormalized(t *testing.T) {
assertCacheSize(t, r.plans, 1)

// the target string will be resolved and become part of the plan cache key, which adds a new entry
ksIDVc1, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[deadbeef]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv)
ksIDVc1, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[deadbeef]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, false)
getPlanCached(t, ctx, r, ksIDVc1, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false)
assertCacheSize(t, r.plans, 2)

// the target string will be resolved and become part of the plan cache key, as it's an unsharded ks, it will be the same entry as above
ksIDVc2, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[beefdead]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv)
ksIDVc2, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[beefdead]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, false)
getPlanCached(t, ctx, r, ksIDVc2, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false)
assertCacheSize(t, r.plans, 2)
})
Expand All @@ -1681,7 +1681,7 @@ func TestGetPlanCacheNormalized(t *testing.T) {
t.Run("Cache", func(t *testing.T) {
r, _, _, _, ctx := createExecutorEnv(t)
r.normalize = true
emptyvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv)
emptyvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, false)

query1 := "select * from music_user_map where id = 1"
_, logStats1 := getPlanCached(t, ctx, r, emptyvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, true /* skipQueryPlanCache */)
Expand All @@ -1698,7 +1698,7 @@ func TestGetPlanCacheNormalized(t *testing.T) {
// Skip cache using directive
r, _, _, _, ctx := createExecutorEnv(t)
r.normalize = true
unshardedvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv)
unshardedvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, false)

query1 := "insert /*vt+ SKIP_QUERY_PLAN_CACHE=1 */ into user(id) values (1), (2)"
getPlanCached(t, ctx, r, unshardedvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false)
Expand All @@ -1709,12 +1709,12 @@ func TestGetPlanCacheNormalized(t *testing.T) {
assertCacheSize(t, r.plans, 1)

// the target string will be resolved and become part of the plan cache key, which adds a new entry
ksIDVc1, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[deadbeef]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv)
ksIDVc1, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[deadbeef]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, false)
getPlanCached(t, ctx, r, ksIDVc1, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false)
assertCacheSize(t, r.plans, 2)

// the target string will be resolved and become part of the plan cache key, as it's an unsharded ks, it will be the same entry as above
ksIDVc2, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[beefdead]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv)
ksIDVc2, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[beefdead]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, false)
getPlanCached(t, ctx, r, ksIDVc2, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false)
assertCacheSize(t, r.plans, 2)
})
Expand All @@ -1724,8 +1724,8 @@ func TestGetPlanNormalized(t *testing.T) {
r, _, _, _, ctx := createExecutorEnv(t)

r.normalize = true
emptyvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv)
unshardedvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv)
emptyvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, false)
unshardedvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, false)

query1 := "select * from music_user_map where id = 1"
query2 := "select * from music_user_map where id = 2"
Expand Down Expand Up @@ -1782,7 +1782,7 @@ func TestGetPlanPriority(t *testing.T) {

r.normalize = true
logStats := logstats.NewLogStats(ctx, "Test", "", "", nil)
vCursor, err := newVCursorImpl(session, makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv)
vCursor, err := newVCursorImpl(session, makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, false)
assert.NoError(t, err)

stmt, err := sqlparser.NewTestParser().Parse(testCase.sql)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/plan_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (e *Executor) newExecute(
}
}

vcursor, err := newVCursorImpl(safeSession, comments, e, logStats, e.vm, vs, e.resolver.resolver, e.serv, e.warnShardedOnly, e.pv)
vcursor, err := newVCursorImpl(safeSession, comments, e, logStats, e.vm, vs, e.resolver.resolver, e.serv, e.warnShardedOnly, e.pv, e.logOperatorTraffic)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit d955e51

Please sign in to comment.