Skip to content

Commit

Permalink
fix(test): fix racy test
Browse files Browse the repository at this point in the history
Also renames a confusing mutex to make it clear what it protects. It protects
snapshots, not the meter slice.
  • Loading branch information
Stebalien committed Oct 24, 2019
1 parent 8ee3d59 commit 1de972e
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 28 deletions.
28 changes: 9 additions & 19 deletions flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package flow
import (
"math"
"sync"
"sync/atomic"
"testing"
"time"
)
Expand Down Expand Up @@ -103,8 +104,6 @@ func TestShared(t *testing.T) {
func TestUnregister(t *testing.T) {
var wg sync.WaitGroup
wg.Add(100 * 2)
pause := make(chan struct{})

for i := 0; i < 100; i++ {
m := new(Meter)
go func() {
Expand All @@ -116,8 +115,7 @@ func TestUnregister(t *testing.T) {
<-ticker.C
}

<-pause
time.Sleep(2 * time.Second)
time.Sleep(62 * time.Second)

for i := 0; i < 40; i++ {
m.Mark(2)
Expand All @@ -133,7 +131,10 @@ func TestUnregister(t *testing.T) {
t.Errorf("expected rate 10 (±1), got %f", actual.Rate)
}

<-pause
time.Sleep(60 * time.Second)
if atomic.LoadUint64(&m.accumulator) != 0 {
t.Error("expected meter to be paused")
}

actual = m.Snapshot()
if actual.Total != 40 {
Expand All @@ -150,24 +151,13 @@ func TestUnregister(t *testing.T) {
if actual.Total != 120 {
t.Errorf("expected total 120, got %d", actual.Total)
}
if atomic.LoadUint64(&m.accumulator) == 0 {
t.Error("expected meter to be active")
}
}()

}
time.Sleep(60 * time.Second)
globalSweeper.mutex.Lock()
if len(globalSweeper.meters) != 0 {
t.Errorf("expected all sweepers to be unregistered: %d", len(globalSweeper.meters))
}
globalSweeper.mutex.Unlock()
close(pause)

wg.Wait()

globalSweeper.mutex.Lock()
if len(globalSweeper.meters) != 100 {
t.Errorf("expected all sweepers to be registered: %d", len(globalSweeper.meters))
}
globalSweeper.mutex.Unlock()
}

func approxEq(a, b, err float64) bool {
Expand Down
4 changes: 2 additions & 2 deletions meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ func (m *Meter) Mark(count uint64) {

// Snapshot gets a snapshot of the total and rate.
func (m *Meter) Snapshot() Snapshot {
globalSweeper.mutex.RLock()
defer globalSweeper.mutex.RUnlock()
globalSweeper.snapshotMu.RLock()
defer globalSweeper.snapshotMu.RUnlock()
return m.snapshot
}

Expand Down
4 changes: 2 additions & 2 deletions registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ func (r *MeterRegistry) TrimIdle(since time.Time) (trimmed int) {
func (r *MeterRegistry) walkIdle(since time.Time, cb func(key interface{})) {
// Yes, this is a global lock. However, all taking this does is pause
// snapshotting.
globalSweeper.mutex.RLock()
defer globalSweeper.mutex.RUnlock()
globalSweeper.snapshotMu.RLock()
defer globalSweeper.snapshotMu.RUnlock()

r.meters.Range(func(k, v interface{}) bool {
// So, this _is_ slightly inaccurate.
Expand Down
12 changes: 7 additions & 5 deletions sweeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ var alpha = 1 - math.Exp(-1.0)
var globalSweeper sweeper

type sweeper struct {
sweepOnce sync.Once
meters []*Meter
mutex sync.RWMutex
sweepOnce sync.Once

snapshotMu sync.RWMutex
meters []*Meter

lastUpdateTime time.Time
registerChannel chan *Meter
}
Expand Down Expand Up @@ -72,8 +74,8 @@ func (sw *sweeper) runActive() {
}

func (sw *sweeper) update() {
sw.mutex.Lock()
defer sw.mutex.Unlock()
sw.snapshotMu.Lock()
defer sw.snapshotMu.Unlock()

now := time.Now()
tdiff := now.Sub(sw.lastUpdateTime)
Expand Down

0 comments on commit 1de972e

Please sign in to comment.