Skip to content

Commit

Permalink
Merge pull request #9 from libp2p/feat/idle-triming
Browse files Browse the repository at this point in the history
add a feature for tracking idle timers
  • Loading branch information
Stebalien authored Oct 30, 2019
2 parents b1aae8c + 1de972e commit 99ae915
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 33 deletions.
32 changes: 11 additions & 21 deletions flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package flow
import (
"math"
"sync"
"sync/atomic"
"testing"
"time"
)
Expand All @@ -22,8 +23,8 @@ func TestBasic(t *testing.T) {
<-ticker.C
}
actual := m.Snapshot()
if !approxEq(actual.Rate, 25000, 500) {
t.Errorf("expected rate 25000 (±500), got %f", actual.Rate)
if !approxEq(actual.Rate, 25000, 1000) {
t.Errorf("expected rate 25000 (±1000), got %f", actual.Rate)
}

for i := 0; i < 200; i++ {
Expand Down Expand Up @@ -103,8 +104,6 @@ func TestShared(t *testing.T) {
func TestUnregister(t *testing.T) {
var wg sync.WaitGroup
wg.Add(100 * 2)
pause := make(chan struct{})

for i := 0; i < 100; i++ {
m := new(Meter)
go func() {
Expand All @@ -116,8 +115,7 @@ func TestUnregister(t *testing.T) {
<-ticker.C
}

<-pause
time.Sleep(2 * time.Second)
time.Sleep(62 * time.Second)

for i := 0; i < 40; i++ {
m.Mark(2)
Expand All @@ -133,7 +131,10 @@ func TestUnregister(t *testing.T) {
t.Errorf("expected rate 10 (±1), got %f", actual.Rate)
}

<-pause
time.Sleep(60 * time.Second)
if atomic.LoadUint64(&m.accumulator) != 0 {
t.Error("expected meter to be paused")
}

actual = m.Snapshot()
if actual.Total != 40 {
Expand All @@ -150,24 +151,13 @@ func TestUnregister(t *testing.T) {
if actual.Total != 120 {
t.Errorf("expected total 120, got %d", actual.Total)
}
if atomic.LoadUint64(&m.accumulator) == 0 {
t.Error("expected meter to be active")
}
}()

}
time.Sleep(60 * time.Second)
globalSweeper.mutex.Lock()
if len(globalSweeper.meters) != 0 {
t.Errorf("expected all sweepers to be unregistered: %d", len(globalSweeper.meters))
}
globalSweeper.mutex.Unlock()
close(pause)

wg.Wait()

globalSweeper.mutex.Lock()
if len(globalSweeper.meters) != 100 {
t.Errorf("expected all sweepers to be registered: %d", len(globalSweeper.meters))
}
globalSweeper.mutex.Unlock()
}

func approxEq(a, b, err float64) bool {
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
module github.com/libp2p/go-flow-metrics

go 1.12
24 changes: 19 additions & 5 deletions meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,26 @@ package flow
import (
"fmt"
"sync/atomic"
"time"
)

// Snapshot is a rate/total snapshot.
type Snapshot struct {
Rate float64
Total uint64
Rate float64
Total uint64
LastUpdate time.Time
}

// NewMeter returns a new Meter with the correct idle time.
//
// While zero-value Meters can be used, their "last update" time will start at
// the program start instead of when the meter was created.
func NewMeter() *Meter {
return &Meter{
snapshot: Snapshot{
LastUpdate: time.Now(),
},
}
}

func (s Snapshot) String() string {
Expand All @@ -32,10 +46,10 @@ func (m *Meter) Mark(count uint64) {
}
}

// Snapshot gets a consistent snapshot of the total and rate.
// Snapshot gets a snapshot of the total and rate.
func (m *Meter) Snapshot() Snapshot {
globalSweeper.mutex.RLock()
defer globalSweeper.mutex.RUnlock()
globalSweeper.snapshotMu.RLock()
defer globalSweeper.snapshotMu.RUnlock()
return m.snapshot
}

Expand Down
41 changes: 40 additions & 1 deletion registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package flow

import (
"sync"
"time"
)

// MeterRegistry is a registry for named meters.
Expand All @@ -14,10 +15,48 @@ func (r *MeterRegistry) Get(name string) *Meter {
if m, ok := r.meters.Load(name); ok {
return m.(*Meter)
}
m, _ := r.meters.LoadOrStore(name, new(Meter))
m, _ := r.meters.LoadOrStore(name, NewMeter())
return m.(*Meter)
}

// FindIdle finds all meters that haven't been used since the given time.
func (r *MeterRegistry) FindIdle(since time.Time) []string {
var idle []string
r.walkIdle(since, func(key interface{}) {
idle = append(idle, key.(string))
})
return idle
}

// TrimIdle trims that haven't been updated since the given time. Returns the
// number of timers trimmed.
func (r *MeterRegistry) TrimIdle(since time.Time) (trimmed int) {
// keep these as interfaces to avoid allocating when calling delete.
var idle []interface{}
r.walkIdle(since, func(key interface{}) {
idle = append(idle, since)
})
for _, i := range idle {
r.meters.Delete(i)
}
return len(idle)
}

func (r *MeterRegistry) walkIdle(since time.Time, cb func(key interface{})) {
// Yes, this is a global lock. However, all taking this does is pause
// snapshotting.
globalSweeper.snapshotMu.RLock()
defer globalSweeper.snapshotMu.RUnlock()

r.meters.Range(func(k, v interface{}) bool {
// So, this _is_ slightly inaccurate.
if v.(*Meter).snapshot.LastUpdate.Before(since) {
cb(k)
}
return true
})
}

// Remove removes the named meter from the registry.
//
// Note: The only reason to do this is to save a bit of memory. Unused meters
Expand Down
23 changes: 23 additions & 0 deletions registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ func TestRegistry(t *testing.T) {
r := new(MeterRegistry)
m1 := r.Get("first")
m2 := r.Get("second")

m1Update := m1.Snapshot().LastUpdate

m1.Mark(10)
m2.Mark(30)

Expand All @@ -21,6 +24,10 @@ func TestRegistry(t *testing.T) {
t.Errorf("expected second total to be 30, got %d", total)
}

if !m1.Snapshot().LastUpdate.After(m1Update) {
t.Error("expected the last update to have been updated")
}

expectedMeters := map[string]*Meter{
"first": m1,
"second": m2,
Expand Down Expand Up @@ -74,4 +81,20 @@ func TestRegistry(t *testing.T) {
if len(expectedMeters) != 0 {
t.Errorf("missing meters: '%v'", expectedMeters)
}

before := time.Now()
m3.Mark(1)
time.Sleep(2 * time.Second)
after := time.Now()
if len(r.FindIdle(before)) != 1 {
t.Error("expected 1 idle timer")
}
if len(r.FindIdle(after)) != 2 {
t.Error("expected 2 idle timers")
}

count := r.TrimIdle(after)
if count != 2 {
t.Error("expected to trim 2 idle timers")
}
}
19 changes: 13 additions & 6 deletions sweeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ var alpha = 1 - math.Exp(-1.0)
var globalSweeper sweeper

type sweeper struct {
sweepOnce sync.Once
meters []*Meter
mutex sync.RWMutex
sweepOnce sync.Once

snapshotMu sync.RWMutex
meters []*Meter

lastUpdateTime time.Time
registerChannel chan *Meter
}
Expand Down Expand Up @@ -72,8 +74,8 @@ func (sw *sweeper) runActive() {
}

func (sw *sweeper) update() {
sw.mutex.Lock()
defer sw.mutex.Unlock()
sw.snapshotMu.Lock()
defer sw.snapshotMu.Unlock()

now := time.Now()
tdiff := now.Sub(sw.lastUpdateTime)
Expand All @@ -87,7 +89,12 @@ func (sw *sweeper) update() {

for i, m := range sw.meters {
total := atomic.LoadUint64(&m.accumulator)
instant := timeMultiplier * float64(total-m.snapshot.Total)
diff := total - m.snapshot.Total
instant := timeMultiplier * float64(diff)

if diff > 0 {
m.snapshot.LastUpdate = now
}

if m.snapshot.Rate == 0 {
m.snapshot.Rate = instant
Expand Down

0 comments on commit 99ae915

Please sign in to comment.