Skip to content

Commit

Permalink
Add mutex to prevent map race
Browse files: browse the repository at this point in the history
Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>
  • Loading branch information
timvaillancourt committed Jun 6, 2024
1 parent b83f057 commit 15e9317
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 24 deletions.
13 changes: 13 additions & 0 deletions go/vt/throttler/replication_lag_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package throttler

import (
"sort"
"sync"
"time"

"vitess.io/vitess/go/vt/discovery"
Expand All @@ -30,6 +31,8 @@ type replicationLagCache struct {
// The map key is replicationLagRecord.LegacyTabletStats.Key.
entries map[string]*replicationLagHistory

mu sync.Mutex

// slowReplicas is a set of slow replicas.
// The map key is replicationLagRecord.LegacyTabletStats.Key.
// This map will always be recomputed by sortByLag() and must not be modified
Expand Down Expand Up @@ -60,6 +63,9 @@ func newReplicationLagCache(historyCapacityPerReplica int) *replicationLagCache

// add inserts or updates "r" in the cache for the replica with the key "r.Key".
func (c *replicationLagCache) add(r replicationLagRecord) {
c.mu.Lock()
defer c.mu.Unlock()

if !r.Serving {
// Tablet is down. Stop tracking it.
delete(c.entries, discovery.TabletToMapKey(r.Tablet))
Expand All @@ -79,6 +85,8 @@ func (c *replicationLagCache) add(r replicationLagRecord) {
// latest returns the current lag record for the given LegacyTabletStats.Key string.
// A zero record is returned if there is no latest entry.
func (c *replicationLagCache) latest(key string) replicationLagRecord {
c.mu.Lock()
defer c.mu.Unlock()
entry, ok := c.entries[key]
if !ok {
return replicationLagRecord{}
Expand All @@ -90,6 +98,8 @@ func (c *replicationLagCache) latest(key string) replicationLagRecord {
// or just after it.
// If there is no such record, a zero record is returned.
func (c *replicationLagCache) atOrAfter(key string, at time.Time) replicationLagRecord {
c.mu.Lock()
defer c.mu.Unlock()
entry, ok := c.entries[key]
if !ok {
return replicationLagRecord{}
Expand All @@ -100,6 +110,9 @@ func (c *replicationLagCache) atOrAfter(key string, at time.Time) replicationLag
// sortByLag sorts all replicas by their latest replication lag value and
// tablet uid and updates the c.slowReplicas set.
func (c *replicationLagCache) sortByLag(ignoreNSlowestReplicas int, minimumReplicationLag int64) {
c.mu.Lock()
defer c.mu.Unlock()

// Reset the current list of ignored replicas.
c.slowReplicas = make(map[string]bool)

Expand Down
7 changes: 4 additions & 3 deletions go/vt/throttler/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,11 @@ func (t *Throttler) Throttle(threadID int) time.Duration {
func (t *Throttler) MaxLag(tabletType topodata.TabletType) uint32 {
cache := t.maxReplicationLagModule.lagCacheByType(tabletType)

var maxLag uint32
cacheEntries := cache.entries
cache.mu.Lock()
defer cache.mu.Unlock()

for key := range cacheEntries {
var maxLag uint32
for key := range cache.entries {
if cache.isIgnored(key) {
continue
}
Expand Down
49 changes: 28 additions & 21 deletions go/vt/throttler/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ func TestThrottlerMaxLag(t *testing.T) {
defer throttler.Close()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go func(ctx context.Context, throttler *Throttler) {
for {
Expand All @@ -427,28 +428,34 @@ func TestThrottlerMaxLag(t *testing.T) {
}
}(ctx, throttler)

go func(cancel context.CancelFunc, throttler *Throttler) {
defer cancel()
for _, tabletType := range []topodata.TabletType{
topodata.TabletType_REPLICA,
topodata.TabletType_RDONLY,
} {
cache := throttler.maxReplicationLagModule.lagCacheByType(tabletType)
cache.add(replicationLagRecord{
time: time.Now(),
TabletHealth: discovery.TabletHealth{
Serving: true,
Stats: &query.RealtimeStats{
ReplicationLagSeconds: 5,
},
Tablet: &topodata.Tablet{
Hostname: t.Name(),
PortMap: map[string]int32{
"test": 15999,
go func(ctx context.Context, throttler *Throttler) {
select {
case <-ctx.Done():
return
default:
for _, tabletType := range []topodata.TabletType{
topodata.TabletType_REPLICA,
topodata.TabletType_RDONLY,
} {
cache := throttler.maxReplicationLagModule.lagCacheByType(tabletType)
cache.add(replicationLagRecord{
time: time.Now(),
TabletHealth: discovery.TabletHealth{
Serving: true,
Stats: &query.RealtimeStats{
ReplicationLagSeconds: 5,
},
Tablet: &topodata.Tablet{
Hostname: t.Name(),
PortMap: map[string]int32{
"test": 15999,
},
},
},
},
})
})
}
}
}(cancel, throttler)
}(ctx, throttler)

time.Sleep(time.Second)
}

0 comments on commit 15e9317

Please sign in to comment.