Skip to content

Commit

Permalink
Tablet throttler: recent check diff fix (#16001)
Browse files Browse the repository at this point in the history
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
  • Loading branch information
shlomi-noach committed May 31, 2024
1 parent c5e6e9b commit bec711b
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 11 deletions.
39 changes: 30 additions & 9 deletions go/vt/vttablet/tabletserver/throttle/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ type Throttler struct {

recentCheckRateLimiter *timer.RateLimiter
recentCheckDormantDiff int64
recentCheckDiff int64

throttleTabletTypesMap map[topodatapb.TabletType]bool

Expand Down Expand Up @@ -216,10 +217,11 @@ type ThrottlerStatus struct {
Keyspace string
Shard string

IsLeader bool
IsOpen bool
IsEnabled bool
IsDormant bool
IsLeader bool
IsOpen bool
IsEnabled bool
IsDormant bool
IsRecentlyChecked bool

Query string
Threshold float64
Expand Down Expand Up @@ -268,6 +270,10 @@ func NewThrottler(env tabletenv.Env, srvTopoServer srvtopo.Server, ts *topo.Serv
throttler.throttledAppsSnapshotInterval = throttledAppsSnapshotInterval
throttler.dormantPeriod = dormantPeriod
throttler.recentCheckDormantDiff = int64(throttler.dormantPeriod / recentCheckRateLimiterInterval)
throttler.recentCheckDiff = int64(1 * time.Second / recentCheckRateLimiterInterval)
if throttler.recentCheckDiff < 1 {
throttler.recentCheckDiff = 1
}

throttler.StoreMetricsThreshold(defaultThrottleLagThreshold.Seconds()) //default
throttler.readSelfThrottleMetric = func(ctx context.Context, p *mysql.Probe) *mysql.MySQLThrottleMetric {
Expand Down Expand Up @@ -683,9 +689,20 @@ func (throttler *Throttler) ThrottledApps() (result []base.AppThrottle) {
// Instead of measuring actual time, we use the fact recentCheckRateLimiter ticks every second, and take
// a logical diff, counting the number of ticks since the last check. This is a good enough approximation.
func (throttler *Throttler) isDormant() bool {
if throttler.recentCheckRateLimiter == nil {
return false
}
return throttler.recentCheckRateLimiter.Diff() > throttler.recentCheckDormantDiff
}

// recentlyChecked returns true when this throttler was checked "just now" (whereabouts of once second or two)
func (throttler *Throttler) recentlyChecked() bool {
if throttler.recentCheckRateLimiter == nil {
return false
}
return throttler.recentCheckRateLimiter.Diff() <= throttler.recentCheckDiff
}

// Operate is the main entry point for the throttler operation and logic. It will
// run the probes, collect metrics, refresh inventory, etc.
func (throttler *Throttler) Operate(ctx context.Context, wg *sync.WaitGroup) {
Expand Down Expand Up @@ -775,7 +792,7 @@ func (throttler *Throttler) Operate(ctx context.Context, wg *sync.WaitGroup) {
})
}
//
if throttler.recentCheckRateLimiter.Diff() <= 1 { // recently checked
if throttler.recentlyChecked() {
if !throttler.isLeader.Load() {
// This is a replica, and has just recently been checked.
// We want to proactively "stimulate" the primary throttler to renew the heartbeat lease.
Expand Down Expand Up @@ -1264,6 +1281,9 @@ func (throttler *Throttler) checkStore(ctx context.Context, appName string, stor
checkResult.RecentlyChecked = true
statsThrottlerRecentlyChecked.Add(1)
}
if !checkResult.RecentlyChecked {
checkResult.RecentlyChecked = throttler.recentlyChecked()
}

return checkResult
}
Expand Down Expand Up @@ -1299,10 +1319,11 @@ func (throttler *Throttler) Status() *ThrottlerStatus {
Keyspace: throttler.keyspace,
Shard: throttler.shard,

IsLeader: throttler.isLeader.Load(),
IsOpen: throttler.isOpen.Load(),
IsEnabled: throttler.isEnabled.Load(),
IsDormant: throttler.isDormant(),
IsLeader: throttler.isLeader.Load(),
IsOpen: throttler.isOpen.Load(),
IsEnabled: throttler.isEnabled.Load(),
IsDormant: throttler.isDormant(),
IsRecentlyChecked: throttler.recentlyChecked(),

Query: throttler.GetMetricsQuery(),
Threshold: throttler.GetMetricsThreshold(),
Expand Down
26 changes: 24 additions & 2 deletions go/vt/vttablet/tabletserver/throttle/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ func newTestThrottler() *Throttler {
throttler.throttledAppsSnapshotInterval = 10 * time.Millisecond
throttler.dormantPeriod = 5 * time.Second
throttler.recentCheckDormantDiff = int64(throttler.dormantPeriod / recentCheckRateLimiterInterval)
throttler.recentCheckDiff = int64(3 * time.Second / recentCheckRateLimiterInterval)

throttler.readSelfThrottleMetric = func(ctx context.Context, p *mysql.Probe) *mysql.MySQLThrottleMetric {
return &mysql.MySQLThrottleMetric{
Expand All @@ -184,6 +185,13 @@ func newTestThrottler() *Throttler {
return throttler
}

func TestInitThrottler(t *testing.T) {
throttler := newTestThrottler()
assert.Equal(t, 5*time.Second, throttler.dormantPeriod)
assert.EqualValues(t, 5, throttler.recentCheckDormantDiff)
assert.EqualValues(t, 3, throttler.recentCheckDiff)
}

func TestIsAppThrottled(t *testing.T) {
throttler := Throttler{
throttledApps: cache.New(cache.NoExpiration, 0),
Expand Down Expand Up @@ -555,7 +563,11 @@ func TestReplica(t *testing.T) {
runThrottler(t, ctx, throttler, time.Minute, func(t *testing.T, ctx context.Context) {
assert.Empty(t, tmClient.AppNames())
flags := &CheckFlags{}
throttler.CheckByType(ctx, throttlerapp.VitessName.String(), "", flags, ThrottleCheckSelf)
{
checkResult := throttler.CheckByType(ctx, throttlerapp.VitessName.String(), "", flags, ThrottleCheckSelf)
assert.False(t, checkResult.RecentlyChecked) // "vitess" app does not mark the throttler as recently checked
assert.False(t, throttler.recentlyChecked()) // "vitess" app does not mark the throttler as recently checked
}
go func() {
select {
case <-ctx.Done():
Expand All @@ -573,7 +585,17 @@ func TestReplica(t *testing.T) {
assert.Containsf(t, appNames, throttlerapp.ThrottlerStimulatorName.String(), "%+v", appNames)
assert.Equalf(t, 1, len(appNames), "%+v", appNames)
}
throttler.CheckByType(ctx, throttlerapp.OnlineDDLName.String(), "", flags, ThrottleCheckSelf)
{
checkResult := throttler.CheckByType(ctx, throttlerapp.OnlineDDLName.String(), "", flags, ThrottleCheckSelf)
assert.True(t, checkResult.RecentlyChecked)
assert.True(t, throttler.recentlyChecked())
}
{
checkResult := throttler.CheckByType(ctx, throttlerapp.VitessName.String(), "", flags, ThrottleCheckSelf)
assert.True(t, checkResult.RecentlyChecked) // due to previous "online-ddl" check
assert.True(t, throttler.recentlyChecked()) // due to previous "online-ddl" check
}

select {
case <-ctx.Done():
require.FailNow(t, "context expired before testing completed")
Expand Down

0 comments on commit bec711b

Please sign in to comment.