From a303bda0d4da5c8e75af581799a04a73ad738ae5 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Fri, 7 Jun 2024 18:47:45 +0200 Subject: [PATCH] Move max-lag logic into cache Signed-off-by: Tim Vaillancourt --- go/vt/throttler/replication_lag_cache.go | 33 ++++++++++++++++++++++++ go/vt/throttler/throttler.go | 20 +++----------- 2 files changed, 36 insertions(+), 17 deletions(-) diff --git a/go/vt/throttler/replication_lag_cache.go b/go/vt/throttler/replication_lag_cache.go index c27316c7b3b..b07c7bbc26f 100644 --- a/go/vt/throttler/replication_lag_cache.go +++ b/go/vt/throttler/replication_lag_cache.go @@ -61,6 +61,12 @@ func newReplicationLagCache(historyCapacityPerReplica int) *replicationLagCache } } +func (c *replicationLagCache) getEntries() map[string]*replicationLagHistory { + c.mu.Lock() + defer c.mu.Unlock() + return c.entries +} + // add inserts or updates "r" in the cache for the replica with the key "r.Key". func (c *replicationLagCache) add(r replicationLagRecord) { c.mu.Lock() @@ -82,6 +88,30 @@ func (c *replicationLagCache) add(r replicationLagRecord) { entry.add(r) } +// maxLag returns the maximum replication lag for the entries in cache. +func (c *replicationLagCache) maxLag() (maxLag uint32) { + c.mu.Lock() + defer c.mu.Unlock() + + for key := range c.entries { + if c.isIgnored(key) { + continue + } + + entry, ok := c.entries[key] + if !ok { + continue + } + + latest := entry.latest() + if lag := latest.Stats.ReplicationLagSeconds; lag > maxLag { + maxLag = lag + } + } + + return maxLag +} + // latest returns the current lag record for the given LegacyTabletStats.Key string. // A zero record is returned if there is no latest entry. func (c *replicationLagCache) latest(key string) replicationLagRecord { @@ -155,6 +185,9 @@ func (a byLagAndTabletUID) Less(i, j int) bool { // this slow replica. // "key" refers to ReplicationLagRecord.LegacyTabletStats.Key. func (c *replicationLagCache) ignoreSlowReplica(key string) bool { + c.mu.Lock() + defer c.mu.Unlock() + if len(c.slowReplicas) == 0 { // No slow replicas at all. return false diff --git a/go/vt/throttler/throttler.go b/go/vt/throttler/throttler.go index 9854325f081..19b95559fed 100644 --- a/go/vt/throttler/throttler.go +++ b/go/vt/throttler/throttler.go @@ -229,24 +229,10 @@ func (t *Throttler) Throttle(threadID int) time.Duration { // the provided type, excluding ignored tablets. func (t *Throttler) MaxLag(tabletType topodata.TabletType) uint32 { cache := t.maxReplicationLagModule.lagCacheByType(tabletType) - cache.mu.Lock() - defer cache.mu.Unlock() - - var maxLag uint32 - cacheEntries := cache.entries - - for key := range cacheEntries { - if cache.isIgnored(key) { - continue - } - - lag := cache.latest(key).Stats.ReplicationLagSeconds - if lag > maxLag { - maxLag = lag - } + if cache == nil { + return 0 } - - return maxLag + return cache.maxLag() } // ThreadFinished marks threadID as finished and redistributes the thread's