Skip to content

Commit

Permalink
Move max-lag logic into cache
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>
  • Loading branch information
timvaillancourt committed Jun 7, 2024
1 parent 396151d commit a303bda
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 17 deletions.
33 changes: 33 additions & 0 deletions go/vt/throttler/replication_lag_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ func newReplicationLagCache(historyCapacityPerReplica int) *replicationLagCache
}
}

func (c *replicationLagCache) getEntries() map[string]*replicationLagHistory {
c.mu.Lock()
defer c.mu.Unlock()
return c.entries
}

// add inserts or updates "r" in the cache for the replica with the key "r.Key".
func (c *replicationLagCache) add(r replicationLagRecord) {
c.mu.Lock()
Expand All @@ -82,6 +88,30 @@ func (c *replicationLagCache) add(r replicationLagRecord) {
entry.add(r)
}

// maxLag returns the maximum replication lag for the entries in cache.
func (c *replicationLagCache) maxLag() (maxLag uint32) {
c.mu.Lock()
defer c.mu.Unlock()

for key := range c.entries {
if c.isIgnored(key) {
continue
}

entry, ok := c.entries[key]
if !ok {
continue
}

latest := entry.latest()
if lag := latest.Stats.ReplicationLagSeconds; lag > maxLag {
maxLag = lag
}
}

return maxLag
}

// latest returns the current lag record for the given LegacyTabletStats.Key string.
// A zero record is returned if there is no latest entry.
func (c *replicationLagCache) latest(key string) replicationLagRecord {
Expand Down Expand Up @@ -155,6 +185,9 @@ func (a byLagAndTabletUID) Less(i, j int) bool {
// this slow replica.
// "key" refers to ReplicationLagRecord.LegacyTabletStats.Key.
func (c *replicationLagCache) ignoreSlowReplica(key string) bool {
c.mu.Lock()
defer c.mu.Unlock()

if len(c.slowReplicas) == 0 {
// No slow replicas at all.
return false
Expand Down
20 changes: 3 additions & 17 deletions go/vt/throttler/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,24 +229,10 @@ func (t *Throttler) Throttle(threadID int) time.Duration {
// the provided type, excluding ignored tablets.
func (t *Throttler) MaxLag(tabletType topodata.TabletType) uint32 {
cache := t.maxReplicationLagModule.lagCacheByType(tabletType)
cache.mu.Lock()
defer cache.mu.Unlock()

var maxLag uint32
cacheEntries := cache.entries

for key := range cacheEntries {
if cache.isIgnored(key) {
continue
}

lag := cache.latest(key).Stats.ReplicationLagSeconds
if lag > maxLag {
maxLag = lag
}
if cache == nil {
return 0
}

return maxLag
return cache.maxLag()
}

// ThreadFinished marks threadID as finished and redistributes the thread's
Expand Down

0 comments on commit a303bda

Please sign in to comment.