Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…sdefense

Signed-off-by: Eduardo J. Ortega U <5791035+ejortegau@users.noreply.github.com>
  • Loading branch information
ejortegau committed Jul 25, 2023
1 parent bd682ad commit f162e7d
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 101 deletions.
12 changes: 12 additions & 0 deletions go/flagutil/flagutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"errors"
"flag"
"fmt"
"github.com/spf13/pflag"
"sort"
"strings"
)
Expand Down Expand Up @@ -180,3 +181,14 @@ func DualFormatBoolVar(p *bool, name string, value bool, usage string) {
flag.BoolVar(p, dashes, *p, fmt.Sprintf("Synonym to -%s", underscores))
}
}

// DualFormatVar creates a flag which supports both dashes and underscores
func DualFormatVar(val pflag.Value, name string, usage string) {
dashes := strings.Replace(name, "_", "-", -1)
underscores := strings.Replace(name, "-", "_", -1)

flag.Var(val, underscores, usage)
if dashes != underscores {
flag.Var(val, dashes, fmt.Sprintf("Synonym to -%s", underscores))
}
}
46 changes: 41 additions & 5 deletions go/vt/vttablet/tabletserver/tabletenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ import (
"vitess.io/vitess/go/streamlog"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/throttler"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"

throttlerdatapb "vitess.io/vitess/go/vt/proto/throttlerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)
Expand All @@ -49,7 +51,8 @@ const (

var (
currentConfig = TabletConfig{
DB: &dbconfigs.GlobalDBConfigs,
DB: &dbconfigs.GlobalDBConfigs,
TxThrottlerConfig: defaultTxThrottlerConfig(),
}

queryLogHandler = flag.String("query-log-stream-handler", "/debug/querylog", "URL handler for streaming queries log")
Expand Down Expand Up @@ -88,6 +91,24 @@ var (
enableReplicationReporter bool
)

type TxThrottlerConfigFlag struct {
*throttlerdatapb.Configuration
}

func NewTxThrottlerConfigFlag() *TxThrottlerConfigFlag {
return &TxThrottlerConfigFlag{&throttlerdatapb.Configuration{}}
}

func (t *TxThrottlerConfigFlag) Get() *throttlerdatapb.Configuration {
return t.Configuration
}

func (t *TxThrottlerConfigFlag) Set(arg string) error {
return prototext.Unmarshal([]byte(arg), t.Configuration)
}

func (t *TxThrottlerConfigFlag) Type() string { return "string" }

func init() {
flag.IntVar(&currentConfig.OltpReadPool.Size, "queryserver-config-pool-size", defaultConfig.OltpReadPool.Size, "query server read pool size, connection pool is used by regular queries (non streaming, not in a transaction)")
flag.IntVar(&currentConfig.OltpReadPool.PrefillParallelism, "queryserver-config-pool-prefill-parallelism", defaultConfig.OltpReadPool.PrefillParallelism, "query server read pool prefill parallelism, a non-zero value will prefill the pool using the specified parallism.")
Expand Down Expand Up @@ -137,7 +158,7 @@ func init() {
flag.StringVar(&currentConfig.TwoPCCoordinatorAddress, "twopc_coordinator_address", defaultConfig.TwoPCCoordinatorAddress, "address of the (VTGate) process(es) that will be used to notify of abandoned transactions.")
SecondsVar(&currentConfig.TwoPCAbandonAge, "twopc_abandon_age", defaultConfig.TwoPCAbandonAge, "time in seconds. Any unresolved transaction older than this time will be sent to the coordinator to be resolved.")
flagutil.DualFormatBoolVar(&currentConfig.EnableTxThrottler, "enable_tx_throttler", defaultConfig.EnableTxThrottler, "If true replication-lag-based throttling on transactions will be enabled.")
flagutil.DualFormatStringVar(&currentConfig.TxThrottlerConfig, "tx_throttler_config", defaultConfig.TxThrottlerConfig, "The configuration of the transaction throttler as a text formatted throttlerdata.Configuration protocol buffer message")
flagutil.DualFormatVar(currentConfig.TxThrottlerConfig, "tx_throttler_config", "The configuration of the transaction throttler as a text-formatted throttlerdata.Configuration protocol buffer message.")
flagutil.DualFormatStringListVar(&currentConfig.TxThrottlerHealthCheckCells, "tx_throttler_healthcheck_cells", defaultConfig.TxThrottlerHealthCheckCells, "A comma-separated list of cells. Only tabletservers running in these cells will be monitored for replication lag by the transaction throttler.")
flag.IntVar(&currentConfig.TxThrottlerDefaultPriority, "tx-throttler-default-priority", defaultConfig.TxThrottlerDefaultPriority, "Default priority assigned to queries that lack priority information.")
topoproto.TabletTypeListVar(&currentConfig.TxThrottlerTabletTypes, "tx-throttler-tablet-types", "A comma-separated list of tablet types. Only tablets of this type are monitored for replication lag by the transaction throttler. Supported types are replica and/or rdonly. (default replica)")
Expand Down Expand Up @@ -302,7 +323,7 @@ type TabletConfig struct {
TwoPCAbandonAge Seconds `json:"-"`

EnableTxThrottler bool `json:"-"`
TxThrottlerConfig string `json:"-"`
TxThrottlerConfig *TxThrottlerConfigFlag `json:"-"`
TxThrottlerHealthCheckCells []string `json:"-"`
TxThrottlerDefaultPriority int `json:"-"`
TxThrottlerTabletTypes []topodatapb.TabletType `json:"-"`
Expand Down Expand Up @@ -460,6 +481,21 @@ func (c *TabletConfig) verifyTransactionLimitConfig() error {

// verifyTxThrottlerConfig checks the TxThrottler related config for sanity.
func (c *TabletConfig) verifyTxThrottlerConfig() error {
if !c.EnableTxThrottler {
return nil
}

err := throttler.MaxReplicationLagModuleConfig{Configuration: c.TxThrottlerConfig.Get()}.Verify()
if err != nil {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "failed to parse throttlerdatapb.Configuration config: %v", err)
}

if len(c.TxThrottlerHealthCheckCells) == 0 {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "empty healthCheckCells given: %+v", c.TxThrottlerHealthCheckCells)
}
if v := c.TxThrottlerDefaultPriority; v > sqlparser.MaxPriorityValue || v < 0 {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "--tx-throttler-default-priority must be > 0 and < 100 (specified value: %d)", v)
}
if len(c.TxThrottlerTabletTypes) == 0 {
return vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "--tx-throttler-tablet-types must be defined when transaction throttler is enabled")
}
Expand Down Expand Up @@ -558,13 +594,13 @@ var defaultConfig = TabletConfig{
// object in text format. It uses the object returned by
// throttler.DefaultMaxReplicationLagModuleConfig().Configuration and overrides some of its
// fields. It panics on error.
func defaultTxThrottlerConfig() string {
func defaultTxThrottlerConfig() *TxThrottlerConfigFlag {
// Take throttler.DefaultMaxReplicationLagModuleConfig and override some fields.
config := throttler.DefaultMaxReplicationLagModuleConfig().Configuration
// TODO(erez): Make DefaultMaxReplicationLagModuleConfig() return a MaxReplicationLagSec of 10
// and remove this line.
config.MaxReplicationLagSec = 10
return prototext.Format(config)
return &TxThrottlerConfigFlag{config}
}

func defaultTransactionLimitConfig() TransactionLimitConfig {
Expand Down
46 changes: 39 additions & 7 deletions go/vt/vttablet/tabletserver/tabletenv/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,19 @@ package tabletenv
import (
"testing"
"time"

"vitess.io/vitess/go/test/utils"
"vitess.io/vitess/go/cache"
"vitess.io/vitess/go/vt/throttler"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/cache"
"vitess.io/vitess/go/test/utils"
"vitess.io/vitess/go/vt/dbconfigs"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/yaml2"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

func TestConfigParse(t *testing.T) {
Expand Down Expand Up @@ -225,7 +226,7 @@ func TestFlags(t *testing.T) {
TrackSchemaVersions: false,
MessagePostponeParallelism: 4,
CacheResultFields: true,
TxThrottlerConfig: "target_replication_lag_sec: 2\nmax_replication_lag_sec: 10\ninitial_rate: 100\nmax_increase: 1\nemergency_decrease: 0.5\nmin_duration_between_increases_sec: 40\nmax_duration_between_increases_sec: 62\nmin_duration_between_decreases_sec: 20\nspread_backlog_across_sec: 20\nage_bad_rate_after_sec: 180\nbad_rate_increase: 0.1\nmax_rate_approach_threshold: 0.9\n",
TxThrottlerConfig: defaultTxThrottlerConfig(),
TxThrottlerHealthCheckCells: []string{},
TransactionLimitConfig: TransactionLimitConfig{
TransactionLimitPerUser: 0.4,
Expand All @@ -241,7 +242,15 @@ func TestFlags(t *testing.T) {
},
}
assert.Equal(t, want.DB, currentConfig.DB)
assert.Equal(t, want, currentConfig)
// We compare the string representation instead of the values directly because there are multiple pointers involved
// that make the vars look different even though they aren't.
assert.Equal(t, want.TxThrottlerConfig.String(), currentConfig.TxThrottlerConfig.String())
// And for the rest we compare without the corresponding attrs
wantWithoutTxThrottlerConf := want
wantWithoutTxThrottlerConf.TxThrottlerConfig = nil
currentWithoutTxThrottlerConf := currentConfig
currentWithoutTxThrottlerConf.TxThrottlerConfig = nil
assert.Equal(t, wantWithoutTxThrottlerConf, currentWithoutTxThrottlerConf)

// HACK: expect default ("replica") only after .Init() because topoproto.TabletTypeListVar(...)
// defaults dont work and it's fixed later in .Init()
Expand Down Expand Up @@ -372,6 +381,26 @@ func TestFlags(t *testing.T) {
assert.Equal(t, want, currentConfig)
}

func TestTxThrottlerConfigFlag(t *testing.T) {
f := NewTxThrottlerConfigFlag()
defaultMaxReplicationLagModuleConfig := throttler.DefaultMaxReplicationLagModuleConfig().Configuration

{
assert.Nil(t, f.Set(defaultMaxReplicationLagModuleConfig.String()))
assert.Equal(t, defaultMaxReplicationLagModuleConfig.String(), f.String())
assert.Equal(t, "string", f.Type())
}
{
defaultMaxReplicationLagModuleConfig.TargetReplicationLagSec = 5
assert.Nil(t, f.Set(defaultMaxReplicationLagModuleConfig.String()))
assert.NotNil(t, f.Get())
assert.Equal(t, int64(5), f.Get().TargetReplicationLagSec)
}
{
assert.NotNil(t, f.Set("should not parse"))
}
}

func TestVerifyTxThrottlerConfig(t *testing.T) {
{
// default config (replica)
Expand All @@ -392,6 +421,8 @@ func TestVerifyTxThrottlerConfig(t *testing.T) {
// no tablet types
Init()
currentConfig.TxThrottlerTabletTypes = []topodatapb.TabletType{}
currentConfig.EnableTxThrottler = true
currentConfig.TxThrottlerConfig = defaultTxThrottlerConfig()
err := currentConfig.verifyTxThrottlerConfig()
assert.NotNil(t, err)
assert.Equal(t, vtrpcpb.Code_FAILED_PRECONDITION, vterrors.Code(err))
Expand All @@ -400,6 +431,7 @@ func TestVerifyTxThrottlerConfig(t *testing.T) {
// disallowed tablet type
Init()
currentConfig.TxThrottlerTabletTypes = []topodatapb.TabletType{topodatapb.TabletType_DRAINED}
currentConfig.TxThrottlerHealthCheckCells = []string{"cell1", "cell2"}
err := currentConfig.verifyTxThrottlerConfig()
assert.NotNil(t, err)
assert.Equal(t, vtrpcpb.Code_INVALID_ARGUMENT, vterrors.Code(err))
Expand Down
73 changes: 21 additions & 52 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@ package txthrottler

import (
"context"
"fmt"
"math/rand"
"sync"
"time"

"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/stats"
Expand Down Expand Up @@ -182,64 +180,35 @@ type txThrottlerState struct {
// This function calls tryCreateTxThrottler that does the actual creation work
// and returns an error if one occurred.
func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) TxThrottler {
txThrottler, err := tryCreateTxThrottler(env, topoServer)
if err != nil {
log.Errorf("Error creating transaction throttler. Transaction throttling will"+
" be disabled. Error: %v", err)
// newTxThrottler with disabled config never returns an error
txThrottler, _ = newTxThrottler(env, topoServer, &txThrottlerConfig{enabled: false})
} else {
log.Infof("Initialized transaction throttler with config: %+v", txThrottler.config)
}
return txThrottler
}

// InitDBConfig initializes the target parameters for the throttler.
func (t *txThrottler) InitDBConfig(target *querypb.Target) {
t.target = proto.Clone(target).(*querypb.Target)
}

func tryCreateTxThrottler(env tabletenv.Env, topoServer *topo.Server) (*txThrottler, error) {
if !env.Config().EnableTxThrottler {
return newTxThrottler(env, topoServer, &txThrottlerConfig{enabled: false})
}
throttlerConfig := &txThrottlerConfig{enabled: false}

if env.Config().EnableTxThrottler {
// Clone tsv.TxThrottlerHealthCheckCells so that we don't assume tsv.TxThrottlerHealthCheckCells
// is immutable.
healthCheckCells := env.Config().TxThrottlerHealthCheckCells

throttlerConfig = &txThrottlerConfig{
enabled: true,
tabletTypes: env.Config().TxThrottlerTabletTypes,
throttlerConfig: env.Config().TxThrottlerConfig.Get(),
healthCheckCells: healthCheckCells,
}

var throttlerConfig throttlerdatapb.Configuration
if err := prototext.Unmarshal([]byte(env.Config().TxThrottlerConfig), &throttlerConfig); err != nil {
return nil, err
defer log.Infof("Initialized transaction throttler with config: %+v", throttlerConfig)
}

// Clone tsv.TxThrottlerHealthCheckCells so that we don't assume tsv.TxThrottlerHealthCheckCells
// is immutable.
healthCheckCells := make([]string, len(env.Config().TxThrottlerHealthCheckCells))
copy(healthCheckCells, env.Config().TxThrottlerHealthCheckCells)

return newTxThrottler(env, topoServer, &txThrottlerConfig{
enabled: true,
tabletTypes: env.Config().TxThrottlerTabletTypes,
throttlerConfig: &throttlerConfig,
healthCheckCells: healthCheckCells,
})
}

func newTxThrottler(env tabletenv.Env, topoServer *topo.Server, config *txThrottlerConfig) (*txThrottler, error) {
if config.enabled {
// Verify config.
err := throttler.MaxReplicationLagModuleConfig{Configuration: config.throttlerConfig}.Verify()
if err != nil {
return nil, err
}
if len(config.healthCheckCells) == 0 {
return nil, fmt.Errorf("empty healthCheckCells given. %+v", config)
}
}
return &txThrottler{
config: config,
config: throttlerConfig,
topoServer: topoServer,
throttlerRunning: env.Exporter().NewGauge("TransactionThrottlerRunning", "transaction throttler running state"),
requestsTotal: env.Exporter().NewCounter("TransactionThrottlerRequests", "transaction throttler requests"),
requestsThrottled: env.Exporter().NewCounter("TransactionThrottlerThrottled", "transaction throttler requests throttled"),
}, nil
}
}

// InitDBConfig initializes the target parameters for the throttler.
func (t *txThrottler) InitDBConfig(target *querypb.Target) {
t.target = proto.Clone(target).(*querypb.Target)
}

// Open opens the transaction throttler. It must be called prior to 'Throttle'.
Expand Down
Loading

0 comments on commit f162e7d

Please sign in to comment.