Skip to content

Commit

Permalink
vtexplain: Ensure memory topo is set up for throttler (#15279)
Browse files Browse the repository at this point in the history
Signed-off-by: Dirkjan Bussink <d.bussink@gmail.com>
  • Loading branch information
dbussink committed Feb 19, 2024
1 parent d131230 commit c4afae2
Show file tree
Hide file tree
Showing 10 changed files with 92 additions and 44 deletions.
5 changes: 4 additions & 1 deletion go/cmd/vtexplain/cli/vtexplain.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"vitess.io/vitess/go/acl"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/vtenv"
"vitess.io/vitess/go/vt/vtexplain"
"vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext"
Expand Down Expand Up @@ -182,7 +183,9 @@ func parseAndRun() error {
if err != nil {
return err
}
vte, err := vtexplain.Init(context.Background(), env, vschema, schema, ksShardMap, opts)
ctx := context.Background()
ts := memorytopo.NewServer(ctx, vtexplain.Cell)
vte, err := vtexplain.Init(ctx, env, ts, vschema, schema, ksShardMap, opts)
if err != nil {
return err
}
Expand Down
11 changes: 7 additions & 4 deletions go/vt/srvtopo/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
)
Expand Down Expand Up @@ -204,8 +205,11 @@ func (entry *watchEntry) onErrorLocked(ctx context.Context, err error, init bool
entry.value = nil
}
} else {
entry.lastError = fmt.Errorf("ResilientWatch stream failed for %v: %w", entry.key, err)
log.Errorf("%v", entry.lastError)
if !topo.IsErrType(err, topo.Interrupted) {
// No need to log if we're explicitly interrupted.
entry.lastError = fmt.Errorf("ResilientWatch stream failed for %v: %w", entry.key, err)
log.Errorf("%v", entry.lastError)
}

// Even though we didn't get a new value, update the lastValueTime
// here since the watch was successfully running before and we want
Expand All @@ -224,8 +228,7 @@ func (entry *watchEntry) onErrorLocked(ctx context.Context, err error, init bool

if len(entry.listeners) > 0 && !topo.IsErrType(err, topo.Interrupted) {
go func() {
time.Sleep(entry.rw.cacheRefreshInterval)

_ = timer.SleepContext(ctx, entry.rw.cacheRefreshInterval)
entry.mutex.Lock()
entry.ensureWatchingLocked(ctx)
entry.mutex.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion go/vt/srvtopo/watch_srvkeyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (k *srvKeyspaceKey) String() string {
func NewSrvKeyspaceWatcher(ctx context.Context, topoServer *topo.Server, counts *stats.CountersWithSingleLabel, cacheRefresh, cacheTTL time.Duration) *SrvKeyspaceWatcher {
watch := func(entry *watchEntry) {
key := entry.key.(*srvKeyspaceKey)
requestCtx, requestCancel := context.WithCancel(context.Background())
requestCtx, requestCancel := context.WithCancel(ctx)
defer requestCancel()

current, changes, err := topoServer.WatchSrvKeyspace(requestCtx, key.cell, key.keyspace)
Expand Down
4 changes: 3 additions & 1 deletion go/vt/vtadmin/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vtadmin/cluster"
"vitess.io/vitess/go/vt/vtadmin/cluster/dynamic"
Expand Down Expand Up @@ -2382,7 +2383,8 @@ func (api *API) VTExplain(ctx context.Context, req *vtadminpb.VTExplainRequest)
return nil, er.Error()
}

vte, err := vtexplain.Init(ctx, api.env, srvVSchema, schema, shardMap, &vtexplain.Options{ReplicationMode: "ROW"})
ts := memorytopo.NewServer(ctx, vtexplain.Cell)
vte, err := vtexplain.Init(ctx, api.env, ts, srvVSchema, schema, shardMap, &vtexplain.Options{ReplicationMode: "ROW"})
if err != nil {
return nil, fmt.Errorf("error initilaizing vtexplain: %w", err)
}
Expand Down
7 changes: 4 additions & 3 deletions go/vt/vtexplain/vtexplain.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/spf13/pflag"

"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtenv"

"vitess.io/vitess/go/vt/discovery"
Expand All @@ -53,7 +54,7 @@ func init() {
}

const (
vtexplainCell = "explainCell"
Cell = "explainCell"

// ModeMulti is the default mode with autocommit implemented at vtgate
ModeMulti = "multi"
Expand Down Expand Up @@ -183,7 +184,7 @@ type TabletActions struct {
}

// Init sets up the fake execution environment
func Init(ctx context.Context, env *vtenv.Environment, vSchemaStr, sqlSchema, ksShardMapStr string, opts *Options) (*VTExplain, error) {
func Init(ctx context.Context, env *vtenv.Environment, ts *topo.Server, vSchemaStr, sqlSchema, ksShardMapStr string, opts *Options) (*VTExplain, error) {
// Verify options
if opts.ReplicationMode != "ROW" && opts.ReplicationMode != "STATEMENT" {
return nil, fmt.Errorf("invalid replication mode \"%s\"", opts.ReplicationMode)
Expand All @@ -206,7 +207,7 @@ func Init(ctx context.Context, env *vtenv.Environment, vSchemaStr, sqlSchema, ks
env: env,
}
vte.setGlobalTabletEnv(tabletEnv)
err = vte.initVtgateExecutor(ctx, vSchemaStr, ksShardMapStr, opts)
err = vte.initVtgateExecutor(ctx, ts, vSchemaStr, ksShardMapStr, opts)
if err != nil {
return nil, fmt.Errorf("initVtgateExecutor: %v", err.Error())
}
Expand Down
19 changes: 11 additions & 8 deletions go/vt/vtexplain/vtexplain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/vtenv"

"vitess.io/vitess/go/test/utils"
Expand All @@ -51,7 +52,7 @@ type testopts struct {
shardmap map[string]map[string]*topo.ShardInfo
}

func initTest(ctx context.Context, mode string, opts *Options, topts *testopts, t *testing.T) *VTExplain {
func initTest(ctx context.Context, ts *topo.Server, mode string, opts *Options, topts *testopts, t *testing.T) *VTExplain {
schema, err := os.ReadFile("testdata/test-schema.sql")
require.NoError(t, err)

Expand All @@ -67,7 +68,7 @@ func initTest(ctx context.Context, mode string, opts *Options, topts *testopts,
}

opts.ExecutionMode = mode
vte, err := Init(ctx, vtenv.NewTestEnv(), string(vSchema), string(schema), shardmap, opts)
vte, err := Init(ctx, vtenv.NewTestEnv(), ts, string(vSchema), string(schema), shardmap, opts)
require.NoError(t, err, "vtexplain Init error\n%s", string(schema))
return vte
}
Expand All @@ -90,7 +91,8 @@ func runTestCase(testcase, mode string, opts *Options, topts *testopts, t *testi
t.Run(testcase, func(t *testing.T) {
ctx := utils.LeakCheckContext(t)

vte := initTest(ctx, mode, opts, topts, t)
ts := memorytopo.NewServer(ctx, Cell)
vte := initTest(ctx, ts, mode, opts, topts, t)
defer vte.Stop()

sqlFile := fmt.Sprintf("testdata/%s-queries.sql", testcase)
Expand Down Expand Up @@ -156,8 +158,8 @@ func TestExplain(t *testing.T) {

func TestErrors(t *testing.T) {
ctx := utils.LeakCheckContext(t)

vte := initTest(ctx, ModeMulti, defaultTestOpts(), &testopts{}, t)
ts := memorytopo.NewServer(ctx, Cell)
vte := initTest(ctx, ts, ModeMulti, defaultTestOpts(), &testopts{}, t)
defer vte.Stop()

tests := []struct {
Expand Down Expand Up @@ -196,8 +198,8 @@ func TestErrors(t *testing.T) {

func TestJSONOutput(t *testing.T) {
ctx := utils.LeakCheckContext(t)

vte := initTest(ctx, ModeMulti, defaultTestOpts(), &testopts{}, t)
ts := memorytopo.NewServer(ctx, Cell)
vte := initTest(ctx, ts, ModeMulti, defaultTestOpts(), &testopts{}, t)
defer vte.Stop()
sql := "select 1 from user where id = 1"
explains, err := vte.Run(sql)
Expand Down Expand Up @@ -346,7 +348,8 @@ func TestInit(t *testing.T) {
}
}`
schema := "create table table_missing_primary_vindex (id int primary key)"
_, err := Init(ctx, vtenv.NewTestEnv(), vschema, schema, "", defaultTestOpts())
ts := memorytopo.NewServer(ctx, Cell)
_, err := Init(ctx, vtenv.NewTestEnv(), ts, vschema, schema, "", defaultTestOpts())
require.Error(t, err)
require.Contains(t, err.Error(), "missing primary col vindex")
}
Expand Down
60 changes: 43 additions & 17 deletions go/vt/vtexplain/vtexplain_vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,23 @@ package vtexplain
import (
"context"
"fmt"
"path"
"sort"
"strings"

"vitess.io/vitess/go/cache/theine"
"vitess.io/vitess/go/vt/vtgate/logstats"
"vitess.io/vitess/go/vt/vtgate/vindexes"

"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"

"vitess.io/vitess/go/vt/vterrors"

"vitess.io/vitess/go/json2"
"vitess.io/vitess/go/streamlog"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate"
"vitess.io/vitess/go/vt/vtgate/engine"
"vitess.io/vitess/go/vt/vtgate/logstats"
"vitess.io/vitess/go/vt/vtgate/vindexes"
"vitess.io/vitess/go/vt/vttablet/queryservice"

querypb "vitess.io/vitess/go/vt/proto/query"
Expand All @@ -50,14 +47,14 @@ import (
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
)

func (vte *VTExplain) initVtgateExecutor(ctx context.Context, vSchemaStr, ksShardMapStr string, opts *Options) error {
func (vte *VTExplain) initVtgateExecutor(ctx context.Context, ts *topo.Server, vSchemaStr, ksShardMapStr string, opts *Options) error {
vte.explainTopo = &ExplainTopo{NumShards: opts.NumShards}
vte.explainTopo.TopoServer = memorytopo.NewServer(ctx, vtexplainCell)
vte.explainTopo.TopoServer = ts
vte.healthCheck = discovery.NewFakeHealthCheck(nil)

resolver := vte.newFakeResolver(ctx, opts, vte.explainTopo, vtexplainCell)
resolver := vte.newFakeResolver(ctx, opts, vte.explainTopo, Cell)

err := vte.buildTopology(ctx, opts, vSchemaStr, ksShardMapStr, opts.NumShards)
err := vte.buildTopology(ctx, ts, opts, vSchemaStr, ksShardMapStr, opts.NumShards)
if err != nil {
return err
}
Expand All @@ -75,7 +72,7 @@ func (vte *VTExplain) initVtgateExecutor(ctx context.Context, vSchemaStr, ksShar
var schemaTracker vtgate.SchemaInfo // no schema tracker for these tests
queryLogBufferSize := 10
plans := theine.NewStore[vtgate.PlanCacheKey, *engine.Plan](4*1024*1024, false)
vte.vtgateExecutor = vtgate.NewExecutor(ctx, vte.env, vte.explainTopo, vtexplainCell, resolver, opts.Normalize, false, streamSize, plans, schemaTracker, false, opts.PlannerVersion, 0)
vte.vtgateExecutor = vtgate.NewExecutor(ctx, vte.env, vte.explainTopo, Cell, resolver, opts.Normalize, false, streamSize, plans, schemaTracker, false, opts.PlannerVersion, 0)
vte.vtgateExecutor.SetQueryLogger(streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize))

return nil
Expand All @@ -95,7 +92,7 @@ func (vte *VTExplain) newFakeResolver(ctx context.Context, opts *Options, serv s
return vtgate.NewResolver(srvResolver, serv, cell, sc)
}

func (vte *VTExplain) buildTopology(ctx context.Context, opts *Options, vschemaStr string, ksShardMapStr string, numShardsPerKeyspace int) error {
func (vte *VTExplain) buildTopology(ctx context.Context, ts *topo.Server, opts *Options, vschemaStr string, ksShardMapStr string, numShardsPerKeyspace int) error {
vte.explainTopo.Lock.Lock()
defer vte.explainTopo.Lock.Unlock()

Expand All @@ -120,6 +117,10 @@ func (vte *VTExplain) buildTopology(ctx context.Context, opts *Options, vschemaS
return err
}

conn, err := ts.ConnForCell(ctx, Cell)
if err != nil {
return err
}
vte.explainTopo.TabletConns = make(map[string]*explainTablet)
vte.explainTopo.KeyspaceShards = make(map[string]map[string]*topodatapb.ShardReference)
for ks, vschema := range vte.explainTopo.Keyspaces {
Expand All @@ -130,6 +131,32 @@ func (vte *VTExplain) buildTopology(ctx context.Context, opts *Options, vschemaS

vte.explainTopo.KeyspaceShards[ks] = make(map[string]*topodatapb.ShardReference)

srvPath := path.Join(topo.KeyspacesPath, ks, topo.SrvKeyspaceFile)
srvKeyspace := &topodatapb.SrvKeyspace{
Partitions: []*topodatapb.SrvKeyspace_KeyspacePartition{
{
ServedType: topodatapb.TabletType_PRIMARY,
ShardReferences: shards,
},
{
ServedType: topodatapb.TabletType_REPLICA,
ShardReferences: shards,
},
{
ServedType: topodatapb.TabletType_RDONLY,
ShardReferences: shards,
},
},
}
data, err := srvKeyspace.MarshalVT()
if err != nil {
return err
}
_, err = conn.Update(ctx, srvPath, data, nil)
if err != nil {
return err
}

for _, shard := range shards {
// If the topology is in the middle of a reshard, there can be two shards covering the same key range (e.g.
// both source shard 80- and target shard 80-c0 cover the keyrange 80-c0). For the purposes of explain, we
Expand All @@ -142,14 +169,13 @@ func (vte *VTExplain) buildTopology(ctx context.Context, opts *Options, vschemaS
hostname := fmt.Sprintf("%s/%s", ks, shard.Name)
log.Infof("registering test tablet %s for keyspace %s shard %s", hostname, ks, shard.Name)

tablet := vte.healthCheck.AddFakeTablet(vtexplainCell, hostname, 1, ks, shard.Name, topodatapb.TabletType_PRIMARY, true, 1, nil, func(t *topodatapb.Tablet) queryservice.QueryService {
return vte.newTablet(ctx, vte.env, opts, t)
tablet := vte.healthCheck.AddFakeTablet(Cell, hostname, 1, ks, shard.Name, topodatapb.TabletType_PRIMARY, true, 1, nil, func(t *topodatapb.Tablet) queryservice.QueryService {
return vte.newTablet(ctx, vte.env, opts, t, ts)
})
vte.explainTopo.TabletConns[hostname] = tablet.(*explainTablet)
vte.explainTopo.KeyspaceShards[ks][shard.Name] = shard
}
}

return err
}

Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtexplain/vtexplain_vttablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"sync"

"vitess.io/vitess/go/vt/sidecardb"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtenv"

"vitess.io/vitess/go/mysql"
Expand All @@ -35,7 +36,6 @@ import (
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vtgate/evalengine"

Expand Down Expand Up @@ -105,7 +105,7 @@ type explainTablet struct {

var _ queryservice.QueryService = (*explainTablet)(nil)

func (vte *VTExplain) newTablet(ctx context.Context, env *vtenv.Environment, opts *Options, t *topodatapb.Tablet) *explainTablet {
func (vte *VTExplain) newTablet(ctx context.Context, env *vtenv.Environment, opts *Options, t *topodatapb.Tablet, ts *topo.Server) *explainTablet {
db := fakesqldb.New(nil)
sidecardb.AddSchemaInitQueries(db, true, env.Parser())

Expand All @@ -120,7 +120,7 @@ func (vte *VTExplain) newTablet(ctx context.Context, env *vtenv.Environment, opt
config.EnableTableGC = false

// XXX much of this is cloned from the tabletserver tests
tsv := tabletserver.NewTabletServer(ctx, env, topoproto.TabletAliasString(t.Alias), config, memorytopo.NewServer(ctx, ""), t.Alias)
tsv := tabletserver.NewTabletServer(ctx, env, topoproto.TabletAliasString(t.Alias), config, ts, t.Alias)

tablet := explainTablet{db: db, tsv: tsv, vte: vte, collationEnv: env.CollationEnv()}
db.Handler = &tablet
Expand Down
18 changes: 13 additions & 5 deletions go/vt/vtexplain/vtexplain_vttablet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ import (
"context"
"encoding/json"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/vtenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/schema"

Expand Down Expand Up @@ -73,7 +75,8 @@ create table t2 (

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
vte, err := Init(ctx, vtenv.NewTestEnv(), testVSchema, testSchema, "", opts)
ts := memorytopo.NewServer(ctx, Cell)
vte, err := Init(ctx, vtenv.NewTestEnv(), ts, testVSchema, testSchema, "", opts)
require.NoError(t, err)
defer vte.Stop()

Expand Down Expand Up @@ -132,17 +135,22 @@ create table test_partitioned (
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

vte := initTest(ctx, ModeMulti, defaultTestOpts(), &testopts{}, t)
ts := memorytopo.NewServer(ctx, Cell)
vte := initTest(ctx, ts, ModeMulti, defaultTestOpts(), &testopts{}, t)
defer vte.Stop()

tabletEnv, _ := newTabletEnvironment(ddls, defaultTestOpts(), env.CollationEnv())
vte.setGlobalTabletEnv(tabletEnv)

tablet := vte.newTablet(ctx, env, defaultTestOpts(), &topodatapb.Tablet{
Keyspace: "test_keyspace",
Keyspace: "ks_sharded",
Shard: "-80",
Alias: &topodatapb.TabletAlias{},
})
Alias: &topodatapb.TabletAlias{
Cell: Cell,
},
}, ts)

time.Sleep(10 * time.Millisecond)
se := tablet.tsv.SchemaEngine()
tables := se.GetSchema()

Expand Down
Loading

0 comments on commit c4afae2

Please sign in to comment.