Skip to content

Commit

Permalink
slack-15.0: pre-backport txthrottler crash fixes (#480)
Browse files Browse the repository at this point in the history
* `txthrottler`: move `ThrottlerInterface` to `go/vt/throttler`, use `slices` pkg, add stats (vitessio#16248)

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

* revert to `reflect`

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

* Support passing filters to `discovery.NewHealthCheck(...)`

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

* Update go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go

Co-authored-by: Matt Lord <mattalord@gmail.com>
Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

* Address some PR suggestions

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

* PR ctx suggestion

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

* fix test

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

* simplify updateHealthCheckCells signature

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

* Fix race in `replicationLagModule` of `go/vt/throttle`

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

---------

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>
Co-authored-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
2 people authored and makinje16 committed Aug 29, 2024
1 parent 100653d commit 8f78ade
Show file tree
Hide file tree
Showing 21 changed files with 453 additions and 211 deletions.
52 changes: 32 additions & 20 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"hash/crc32"
"html/template"
Expand Down Expand Up @@ -98,6 +99,9 @@ var (

// How much to sleep between each check.
waitAvailableTabletInterval = 100 * time.Millisecond

// errKeyspacesToWatchAndTabletFilters is an error for cases where incompatible filters are defined.
errKeyspacesToWatchAndTabletFilters = errors.New("only one of --keyspaces_to_watch and --tablet_filters may be specified at a time")
)

// See the documentation for NewHealthCheck below for an explanation of these parameters.
Expand Down Expand Up @@ -296,6 +300,27 @@ type HealthCheckImpl struct {
healthCheckDialSem *semaphore.Weighted
}

// NewVTGateHealthCheckFilters returns healthcheck filters for vtgate.
func NewVTGateHealthCheckFilters() (filters TabletFilters, err error) {
if len(tabletFilters) > 0 {
if len(KeyspacesToWatch) > 0 {
return nil, errKeyspacesToWatchAndTabletFilters
}

fbs, err := NewFilterByShard(tabletFilters)
if err != nil {
return nil, fmt.Errorf("failed to parse tablet_filters value %q: %v", strings.Join(tabletFilters, ","), err)
}
filters = append(filters, fbs)
} else if len(KeyspacesToWatch) > 0 {
filters = append(filters, NewFilterByKeyspace(KeyspacesToWatch))
}
if len(tabletFilterTags) > 0 {
filters = append(filters, NewFilterByTabletTags(tabletFilterTags))
}
return filters, nil
}

// NewHealthCheck creates a new HealthCheck object.
// Parameters:
// retryDelay.
Expand All @@ -317,10 +342,14 @@ type HealthCheckImpl struct {
//
// The localCell for this healthcheck
//
// callback.
// cellsToWatch.
//
// A function to call when there is a primary change. Used to notify vtgate's buffer to stop buffering.
func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Duration, topoServer *topo.Server, localCell, cellsToWatch string) *HealthCheckImpl {
// Is a list of cells to watch for tablets.
//
// filters.
//
// Is one or more filters to apply when determining what tablets we want to stream healthchecks from.
func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Duration, topoServer *topo.Server, localCell, cellsToWatch string, filters TabletFilter) *HealthCheckImpl {
log.Infof("loading tablets for cells: %v", cellsToWatch)

hc := &HealthCheckImpl{
Expand All @@ -342,27 +371,10 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
}

for _, c := range cells {
var filters TabletFilters
log.Infof("Setting up healthcheck for cell: %v", c)
if c == "" {
continue
}
if len(tabletFilters) > 0 {
if len(KeyspacesToWatch) > 0 {
log.Exitf("Only one of -keyspaces_to_watch and -tablet_filters may be specified at a time")
}

fbs, err := NewFilterByShard(tabletFilters)
if err != nil {
log.Exitf("Cannot parse tablet_filters parameter: %v", err)
}
filters = append(filters, fbs)
} else if len(KeyspacesToWatch) > 0 {
filters = append(filters, NewFilterByKeyspace(KeyspacesToWatch))
}
if len(tabletFilterTags) > 0 {
filters = append(filters, NewFilterByTabletTags(tabletFilterTags))
}
topoWatchers = append(topoWatchers, NewCellTabletsWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, topoReadConcurrency))
}

Expand Down
79 changes: 75 additions & 4 deletions go/vt/discovery/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,77 @@ func init() {
refreshInterval = time.Minute
}

func TestNewVTGateHealthCheckFilters(t *testing.T) {
defer func() {
KeyspacesToWatch = nil
tabletFilters = nil
tabletFilterTags = nil
}()

testCases := []struct {
name string
keyspacesToWatch []string
tabletFilters []string
tabletFilterTags map[string]string
expectedError string
expectedFilterTypes []any
}{
{
name: "noFilters",
},
{
name: "tabletFilters",
tabletFilters: []string{"ks1|-80"},
expectedFilterTypes: []any{&FilterByShard{}},
},
{
name: "keyspacesToWatch",
keyspacesToWatch: []string{"ks1"},
expectedFilterTypes: []any{&FilterByKeyspace{}},
},
{
name: "tabletFiltersAndTags",
tabletFilters: []string{"ks1|-80"},
tabletFilterTags: map[string]string{"test": "true"},
expectedFilterTypes: []any{&FilterByShard{}, &FilterByTabletTags{}},
},
{
name: "keyspacesToWatchAndTags",
tabletFilterTags: map[string]string{"test": "true"},
keyspacesToWatch: []string{"ks1"},
expectedFilterTypes: []any{&FilterByKeyspace{}, &FilterByTabletTags{}},
},
{
name: "failKeyspacesToWatchAndFilters",
tabletFilters: []string{"ks1|-80"},
keyspacesToWatch: []string{"ks1"},
expectedError: errKeyspacesToWatchAndTabletFilters.Error(),
},
{
name: "failInvalidTabletFilters",
tabletFilters: []string{"shouldfail!@#!"},
expectedError: "failed to parse tablet_filters value \"shouldfail!@#!\": invalid FilterByShard parameter: shouldfail!@#!",
},
}

for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
KeyspacesToWatch = testCase.keyspacesToWatch
tabletFilters = testCase.tabletFilters
tabletFilterTags = testCase.tabletFilterTags

filters, err := NewVTGateHealthCheckFilters()
if testCase.expectedError != "" {
assert.EqualError(t, err, testCase.expectedError)
}
assert.Len(t, filters, len(testCase.expectedFilterTypes))
for i, filter := range filters {
assert.IsType(t, testCase.expectedFilterTypes[i], filter)
}
})
}
}

func TestHealthCheck(t *testing.T) {
// reset error counters
hcErrorCounters.ResetAll()
Expand Down Expand Up @@ -943,7 +1014,7 @@ func TestGetHealthyTablets(t *testing.T) {

func TestPrimaryInOtherCell(t *testing.T) {
ts := memorytopo.NewServer("cell1", "cell2")
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2")
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2", nil)
defer hc.Close()

// add a tablet as primary in different cell
Expand Down Expand Up @@ -1000,7 +1071,7 @@ func TestPrimaryInOtherCell(t *testing.T) {

func TestReplicaInOtherCell(t *testing.T) {
ts := memorytopo.NewServer("cell1", "cell2")
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2")
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2", nil)
defer hc.Close()

// add a tablet as replica
Expand Down Expand Up @@ -1102,7 +1173,7 @@ func TestReplicaInOtherCell(t *testing.T) {

func TestCellAliases(t *testing.T) {
ts := memorytopo.NewServer("cell1", "cell2")
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2")
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2", nil)
defer hc.Close()

cellsAlias := &topodatapb.CellsAlias{
Expand Down Expand Up @@ -1248,7 +1319,7 @@ func tabletDialer(tablet *topodatapb.Tablet, _ grpcclient.FailFast) (queryservic
}

func createTestHc(ts *topo.Server) *HealthCheckImpl {
return NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell", "")
return NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell", "", nil)
}

type fakeConn struct {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/discovery/keyspace_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestSrvKeyspaceWithNilNewKeyspace(t *testing.T) {
factory.AddCell(cell)
ts := faketopo.NewFakeTopoServer(factory)
ts2 := &fakeTopoServer{}
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, cell, "")
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, cell, "", nil)
defer hc.Close()
kew := NewKeyspaceEventWatcher(context.Background(), ts2, hc, cell)
kss := &keyspaceState{
Expand Down Expand Up @@ -82,7 +82,7 @@ func TestKeyspaceEventTypes(t *testing.T) {
factory.AddCell(cell)
ts := faketopo.NewFakeTopoServer(factory)
ts2 := &fakeTopoServer{}
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, cell, "")
hc := NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, cell, "", nil)
defer hc.Close()
kew := NewKeyspaceEventWatcher(context.Background(), ts2, hc, cell)

Expand Down
6 changes: 3 additions & 3 deletions go/vt/throttler/demo/throttler_demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ type replica struct {

// throttler is used to enforce the maximum rate at which replica applies
// transactions. It must not be confused with the client's throttler.
throttler *throttler.Throttler
throttler throttler.Throttler
lastHealthUpdate time.Time
lagUpdateInterval time.Duration

Expand Down Expand Up @@ -224,7 +224,7 @@ type client struct {
primary *primary

healthCheck discovery.HealthCheck
throttler *throttler.Throttler
throttler throttler.Throttler

stopChan chan struct{}
wg sync.WaitGroup
Expand All @@ -237,7 +237,7 @@ func newClient(primary *primary, replica *replica, ts *topo.Server) *client {
log.Fatal(err)
}

healthCheck := discovery.NewHealthCheck(context.Background(), 5*time.Second, 1*time.Minute, ts, "cell1", "")
healthCheck := discovery.NewHealthCheck(context.Background(), 5*time.Second, 1*time.Minute, ts, "cell1", "", nil)
c := &client{
primary: primary,
healthCheck: healthCheck,
Expand Down
10 changes: 5 additions & 5 deletions go/vt/throttler/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,16 @@ type managerImpl struct {
// mu guards all fields in this group.
mu sync.Mutex
// throttlers tracks all running throttlers (by their name).
throttlers map[string]*Throttler
throttlers map[string]Throttler
}

func newManager() *managerImpl {
return &managerImpl{
throttlers: make(map[string]*Throttler),
throttlers: make(map[string]Throttler),
}
}

func (m *managerImpl) registerThrottler(name string, throttler *Throttler) error {
func (m *managerImpl) registerThrottler(name string, throttler Throttler) error {
m.mu.Lock()
defer m.mu.Unlock()

Expand Down Expand Up @@ -207,7 +207,7 @@ func (m *managerImpl) throttlerNamesLocked() []string {

// log returns the most recent changes of the MaxReplicationLag module.
// There will be one result for each processed replication lag record.
func (m *managerImpl) log(throttlerName string) ([]result, error) {
func (m *managerImpl) log(throttlerName string) ([]Result, error) {
m.mu.Lock()
defer m.mu.Unlock()

Expand All @@ -216,5 +216,5 @@ func (m *managerImpl) log(throttlerName string) ([]result, error) {
return nil, fmt.Errorf("throttler: %v does not exist", throttlerName)
}

return t.log(), nil
return t.Log(), nil
}
2 changes: 1 addition & 1 deletion go/vt/throttler/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ var (

type managerTestFixture struct {
m *managerImpl
t1, t2 *Throttler
t1, t2 Throttler
}

func (f *managerTestFixture) setUp() error {
Expand Down
20 changes: 10 additions & 10 deletions go/vt/throttler/max_replication_lag_module.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func (m *MaxReplicationLagModule) recalculateRate(lagRecordNow replicationLagRec

m.memory.ageBadRate(now)

r := result{
r := Result{
Now: now,
RateChange: unchangedRate,
lastRateChange: m.lastRateChange,
Expand Down Expand Up @@ -445,7 +445,7 @@ func stateGreater(a, b state) bool {
// and we should not skip the current replica ("lagRecordNow").
// Even if it's the same replica we may skip it and return false because
// we want to wait longer for the propagation of the current rate change.
func (m *MaxReplicationLagModule) isReplicaUnderTest(r *result, now time.Time, testedState state, lagRecordNow replicationLagRecord) bool {
func (m *MaxReplicationLagModule) isReplicaUnderTest(r *Result, now time.Time, testedState state, lagRecordNow replicationLagRecord) bool {
if m.replicaUnderTest == nil {
return true
}
Expand All @@ -471,7 +471,7 @@ func (m *MaxReplicationLagModule) isReplicaUnderTest(r *result, now time.Time, t
return true
}

func (m *MaxReplicationLagModule) increaseRate(r *result, now time.Time, lagRecordNow replicationLagRecord) {
func (m *MaxReplicationLagModule) increaseRate(r *Result, now time.Time, lagRecordNow replicationLagRecord) {
m.markCurrentRateAsBadOrGood(r, now, stateIncreaseRate, unknown)

oldRate := m.rate.Get()
Expand Down Expand Up @@ -559,7 +559,7 @@ func (m *MaxReplicationLagModule) minTestDurationUntilNextIncrease(increase floa
return minDuration
}

func (m *MaxReplicationLagModule) decreaseAndGuessRate(r *result, now time.Time, lagRecordNow replicationLagRecord) {
func (m *MaxReplicationLagModule) decreaseAndGuessRate(r *Result, now time.Time, lagRecordNow replicationLagRecord) {
// Guess replication rate based on the difference in the replication lag of this
// particular replica.
lagRecordBefore := m.lagCache(lagRecordNow).atOrAfter(discovery.TabletToMapKey(lagRecordNow.Tablet), m.lastRateChange)
Expand Down Expand Up @@ -630,7 +630,7 @@ func (m *MaxReplicationLagModule) decreaseAndGuessRate(r *result, now time.Time,
// guessReplicationRate guesses the actual replication rate based on the new bac
// Note that "lagDifference" can be positive (lag increased) or negative (lag
// decreased).
func (m *MaxReplicationLagModule) guessReplicationRate(r *result, avgPrimaryRate float64, lagBefore, lagNow int64, lagDifference, d time.Duration) (int64, string) {
func (m *MaxReplicationLagModule) guessReplicationRate(r *Result, avgPrimaryRate float64, lagBefore, lagNow int64, lagDifference, d time.Duration) (int64, string) {
// avgReplicationRate is the average rate (per second) at which the replica
// applied transactions from the replication stream. We infer the value
// from the relative change in the replication lag.
Expand Down Expand Up @@ -675,14 +675,14 @@ func (m *MaxReplicationLagModule) guessReplicationRate(r *result, avgPrimaryRate
return int64(newRate), reason
}

func (m *MaxReplicationLagModule) emergency(r *result, now time.Time, lagRecordNow replicationLagRecord) {
func (m *MaxReplicationLagModule) emergency(r *Result, now time.Time, lagRecordNow replicationLagRecord) {
m.markCurrentRateAsBadOrGood(r, now, stateEmergency, unknown)

decreaseReason := fmt.Sprintf("replication lag went beyond max: %d > %d", lagRecordNow.lag(), m.config.MaxReplicationLagSec)
m.decreaseRateByPercentage(r, now, lagRecordNow, stateEmergency, m.config.EmergencyDecrease, decreaseReason)
}

func (m *MaxReplicationLagModule) decreaseRateByPercentage(r *result, now time.Time, lagRecordNow replicationLagRecord, newState state, decrease float64, decreaseReason string) {
func (m *MaxReplicationLagModule) decreaseRateByPercentage(r *Result, now time.Time, lagRecordNow replicationLagRecord, newState state, decrease float64, decreaseReason string) {
oldRate := m.rate.Get()
rate := int64(float64(oldRate) - float64(oldRate)*decrease)
if rate == 0 {
Expand All @@ -694,7 +694,7 @@ func (m *MaxReplicationLagModule) decreaseRateByPercentage(r *result, now time.T
m.updateRate(r, newState, rate, reason, now, lagRecordNow, m.config.MinDurationBetweenDecreases())
}

func (m *MaxReplicationLagModule) updateRate(r *result, newState state, rate int64, reason string, now time.Time, lagRecordNow replicationLagRecord, testDuration time.Duration) {
func (m *MaxReplicationLagModule) updateRate(r *Result, newState state, rate int64, reason string, now time.Time, lagRecordNow replicationLagRecord, testDuration time.Duration) {
oldRate := m.rate.Get()

m.currentState = newState
Expand Down Expand Up @@ -722,7 +722,7 @@ func (m *MaxReplicationLagModule) updateRate(r *result, newState state, rate int

// markCurrentRateAsBadOrGood determines the actual rate between the last rate
// change and "now" and determines if that rate was bad or good.
func (m *MaxReplicationLagModule) markCurrentRateAsBadOrGood(r *result, now time.Time, newState state, replicationLagChange replicationLagChange) {
func (m *MaxReplicationLagModule) markCurrentRateAsBadOrGood(r *Result, now time.Time, newState state, replicationLagChange replicationLagChange) {
if m.lastRateChange.IsZero() {
// Module was just started. We don't have any data points yet.
r.GoodOrBad = ignoredRate
Expand Down Expand Up @@ -796,6 +796,6 @@ func (m *MaxReplicationLagModule) markCurrentRateAsBadOrGood(r *result, now time
}
}

func (m *MaxReplicationLagModule) log() []result {
func (m *MaxReplicationLagModule) log() []Result {
return m.results.latestValues()
}
Loading

0 comments on commit 8f78ade

Please sign in to comment.