Skip to content

Commit

Permalink
POC
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>
  • Loading branch information
timvaillancourt committed Jul 12, 2023
1 parent a4a8873 commit 0314024
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 44 deletions.
3 changes: 3 additions & 0 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ Usage of vttablet:
--enable-consolidator-replicas Synonym to -enable_consolidator_replicas
--enable-lag-throttler Synonym to -enable_lag_throttler
--enable-per-workload-table-metrics If true, query counts and query error metrics include a label that identifies the workload
--enable-query-throttler If true throttling on low-priority queries will be enabled.
--enable-tx-throttler Synonym to -enable_tx_throttler
--enable_consolidator This option enables the query consolidator. (default true)
--enable_consolidator_replicas This option enables the query consolidator only on replicas.
Expand Down Expand Up @@ -213,6 +214,8 @@ Usage of vttablet:
--publish_retry_interval duration how long vttablet waits to retry publishing the tablet record (default 30s)
--purge_logs_interval duration how often try to remove old logs (default 1h0m0s)
--query-log-stream-handler string URL handler for streaming queries log (default "/debug/querylog")
--query-throttler-default-priority int Default priority assigned to queries that lack priority information
--query-throttler-pool-threshold int query pool usage percent to trigger query throttling
--querylog-filter-tag string string that must be present in the query for it to be logged; if using a value as the tag, you need to disable query normalization
--querylog-format string format for query logs ("text" or "json") (default "text")
--querylog-row-threshold uint Number of rows a query has to return or affect before being logged; not useful for streaming queries. 0 means all queries will be logged.
Expand Down
11 changes: 8 additions & 3 deletions go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ var (
Type: sqltypes.Int64,
},
}
errTxThrottled = vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "Transaction throttled")
errQueryThrottled = vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "Query throttled")
errTxThrottled = vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "Transaction throttled")
)

func returnStreamResult(result *sqltypes.Result) error {
Expand Down Expand Up @@ -170,6 +171,10 @@ func (qre *QueryExecutor) Execute() (reply *sqltypes.Result, err error) {

switch qre.plan.PlanID {
case p.PlanSelect, p.PlanSelectImpossible, p.PlanShow:
if qre.tsv.queryThrottler.Throttle(getPriorityFromOptions(qre.options, qre.tsv.config.QueryThrottlerDefaultPriority)) {
return nil, errQueryThrottled
}

maxrows := qre.getSelectLimit()
qre.bindVars["#maxLimit"] = sqltypes.Int64BindVariable(maxrows + 1)
if qre.bindVars[sqltypes.BvReplaceSchemaName] != nil {
Expand Down Expand Up @@ -221,7 +226,7 @@ func (qre *QueryExecutor) execAutocommit(f func(conn *StatefulConnection) (*sqlt
}
qre.options.TransactionIsolation = querypb.ExecuteOptions_AUTOCOMMIT

if qre.tsv.txThrottler.Throttle(qre.tsv.getPriorityFromOptions(qre.options)) {
if qre.tsv.txThrottler.Throttle(getPriorityFromOptions(qre.options, qre.tsv.config.TxThrottlerDefaultPriority)) {
return nil, errTxThrottled
}

Expand All @@ -236,7 +241,7 @@ func (qre *QueryExecutor) execAutocommit(f func(conn *StatefulConnection) (*sqlt
}

func (qre *QueryExecutor) execAsTransaction(f func(conn *StatefulConnection) (*sqltypes.Result, error)) (*sqltypes.Result, error) {
if qre.tsv.txThrottler.Throttle(qre.tsv.getPriorityFromOptions(qre.options)) {
if qre.tsv.txThrottler.Throttle(getPriorityFromOptions(qre.options, qre.tsv.config.TxThrottlerDefaultPriority)) {
return nil, errTxThrottled
}
conn, beginSQL, _, err := qre.tsv.te.txPool.Begin(qre.ctx, qre.options, false, 0, nil, qre.setting)
Expand Down
38 changes: 38 additions & 0 deletions go/vt/vttablet/tabletserver/query_throttler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package tabletserver

import (
"math/rand"

"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
)

type QueryThrottler interface {
Throttle(priority int) (result bool)
}

type queryThrottler struct {
config *tabletenv.TabletConfig
qe queryEngine

enabled bool
}

func NewQueryThrottler(env tabletenv.Env, qe queryEngine) QueryThrottler {
config := env.Config()
return &queryThrottler{
config: config,
enabled: config.QueryThrottlerPoolThreshold > 0,
qe: qe,
}
}

func (qt *queryThrottler) Throttle(priority int) (result bool) {
if !qt.enabled {
return false
}
if int(qt.qe.GetConnPoolUsagePercent()) > qt.config.QueryThrottlerPoolThreshold {
result = rand.Intn(sqlparser.MaxPriorityValue) < priority
}
return result
}
9 changes: 9 additions & 0 deletions go/vt/vttablet/tabletserver/tabletenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,11 @@ func registerTabletEnvFlags(fs *pflag.FlagSet) {
fs.BoolVar(&currentConfig.TwoPCEnable, "twopc_enable", defaultConfig.TwoPCEnable, "if the flag is on, 2pc is enabled. Other 2pc flags must be supplied.")
fs.StringVar(&currentConfig.TwoPCCoordinatorAddress, "twopc_coordinator_address", defaultConfig.TwoPCCoordinatorAddress, "address of the (VTGate) process(es) that will be used to notify of abandoned transactions.")
SecondsVar(fs, &currentConfig.TwoPCAbandonAge, "twopc_abandon_age", defaultConfig.TwoPCAbandonAge, "time in seconds. Any unresolved transaction older than this time will be sent to the coordinator to be resolved.")
// Query throttler config
fs.BoolVar(&currentConfig.EnableQueryThrottler, "enable-query-throttler", defaultConfig.EnableQueryThrottler, "If true throttling on low-priority queries will be enabled.")
fs.IntVar(&currentConfig.QueryThrottlerPoolThreshold, "query-throttler-pool-threshold", defaultConfig.QueryThrottlerPoolThreshold, "query pool usage percent to trigger query throttling")
fs.IntVar(&currentConfig.QueryThrottlerDefaultPriority, "query-throttler-default-priority", defaultConfig.QueryThrottlerDefaultPriority, "Default priority assigned to queries that lack priority information")

// Tx throttler config
flagutil.DualFormatBoolVar(fs, &currentConfig.EnableTxThrottler, "enable_tx_throttler", defaultConfig.EnableTxThrottler, "If true replication-lag-based throttling on transactions will be enabled.")
flagutil.DualFormatVar(fs, currentConfig.TxThrottlerConfig, "tx_throttler_config", "The configuration of the transaction throttler as a text-formatted throttlerdata.Configuration protocol buffer message.")
Expand Down Expand Up @@ -360,6 +365,10 @@ type TabletConfig struct {
TwoPCCoordinatorAddress string `json:"-"`
TwoPCAbandonAge Seconds `json:"-"`

EnableQueryThrottler bool `json:"-"`
QueryThrottlerPoolThreshold int `json:"-"`
QueryThrottlerDefaultPriority int `json:"-"`

EnableTxThrottler bool `json:"-"`
TxThrottlerConfig *TxThrottlerConfigFlag `json:"-"`
TxThrottlerHealthCheckCells []string `json:"-"`
Expand Down
86 changes: 45 additions & 41 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,21 +104,22 @@ type TabletServer struct {
topoServer *topo.Server

// These are sub-components of TabletServer.
statelessql *QueryList
statefulql *QueryList
olapql *QueryList
se *schema.Engine
rt *repltracker.ReplTracker
vstreamer *vstreamer.Engine
tracker *schema.Tracker
watcher *BinlogWatcher
qe *QueryEngine
txThrottler txthrottler.TxThrottler
te *TxEngine
messager *messager.Engine
hs *healthStreamer
lagThrottler *throttle.Throttler
tableGC *gc.TableGC
statelessql *QueryList
statefulql *QueryList
olapql *QueryList
se *schema.Engine
rt *repltracker.ReplTracker
vstreamer *vstreamer.Engine
tracker *schema.Tracker
watcher *BinlogWatcher
qe *QueryEngine
queryThrottler QueryThrottler
txThrottler txthrottler.TxThrottler
te *TxEngine
messager *messager.Engine
hs *healthStreamer
lagThrottler *throttle.Throttler
tableGC *gc.TableGC

// sm manages state transitions.
sm *stateManager
Expand Down Expand Up @@ -184,7 +185,7 @@ func NewTabletServer(name string, config *tabletenv.TabletConfig, topoServer *to
tsv.tracker = schema.NewTracker(tsv, tsv.vstreamer, tsv.se)
tsv.watcher = NewBinlogWatcher(tsv, tsv.vstreamer, tsv.config)
tsv.qe = NewQueryEngine(tsv, tsv.se)
tsv.queryThrottler = querythrottler.NewQueryThrottler(tsv)
tsv.queryThrottler = NewQueryThrottler(tsv, tsv.qe)
tsv.txThrottler = txthrottler.NewTxThrottler(tsv, topoServer)
tsv.te = NewTxEngine(tsv)
tsv.messager = messager.NewEngine(tsv, tsv.se, tsv.vstreamer)
Expand Down Expand Up @@ -495,7 +496,8 @@ func (tsv *TabletServer) begin(ctx context.Context, target *querypb.Target, save
target, options, false, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
startTime := time.Now()
if tsv.txThrottler.Throttle(tsv.getPriorityFromOptions(options)) {
priority := getPriorityFromOptions(options, tsv.config.TxThrottlerDefaultPriority)
if tsv.txThrottler.Throttle(priority) {
return errTxThrottled
}
var connSetting *pools.Setting
Expand Down Expand Up @@ -527,30 +529,6 @@ func (tsv *TabletServer) begin(ctx context.Context, target *querypb.Target, save
return state, err
}

func (tsv *TabletServer) getPriorityFromOptions(options *querypb.ExecuteOptions) int {
priority := tsv.config.TxThrottlerDefaultPriority
if options == nil {
return priority
}
if options.Priority == "" {
return priority
}

optionsPriority, err := strconv.Atoi(options.Priority)
// This should never error out, as the value for Priority has been validated in the vtgate already.
// Still, handle it just to make sure.
if err != nil {
log.Errorf(
"The value of the %s query directive could not be converted to integer, using the "+
"default value. Error was: %s",
sqlparser.DirectivePriority, priority, err)

return priority
}

return optionsPriority
}

// Commit commits the specified transaction.
func (tsv *TabletServer) Commit(ctx context.Context, target *querypb.Target, transactionID int64) (newReservedID int64, err error) {
err = tsv.execRequest(
Expand Down Expand Up @@ -2017,6 +1995,32 @@ func (tsv *TabletServer) ConsolidatorMode() string {
return tsv.qe.consolidatorMode.Load().(string)
}

// getPriorityFromOptions returns the priority of an operation as an integer between 0 and
// 100. The defaultPriority is returned if none is found in *querypb.ExecuteOptions.
func getPriorityFromOptions(options *querypb.ExecuteOptions, defaultPriority int) int {
priority := defaultPriority
if options == nil {
return priority
}
if options.Priority == "" {
return priority
}

optionsPriority, err := strconv.Atoi(options.Priority)
// This should never error out, as the value for Priority has been validated in the vtgate already.
// Still, handle it just to make sure.
if err != nil {
log.Errorf(
"The value of the %s query directive could not be converted to integer, using the "+
"default value. Error was: %s",
sqlparser.DirectivePriority, priority, err)

return priority
}

return optionsPriority
}

// queryAsString returns a readable normalized version of the query.
// If sanitize is false it also includes the bind variables.
// If truncateForLog is true, it truncates the sql query and the
Expand Down

0 comments on commit 0314024

Please sign in to comment.