From adc72033ca232d9a9b236ccd530e402e7aac518f Mon Sep 17 00:00:00 2001 From: s7v7nislands Date: Thu, 20 Apr 2023 15:36:54 +0800 Subject: [PATCH] metrics: use atomic type (#27121) --- metrics/counter.go | 14 +++++++------- metrics/ewma.go | 8 ++++---- metrics/gauge.go | 12 ++++++------ metrics/meter.go | 25 ++++++++++++++----------- 4 files changed, 31 insertions(+), 28 deletions(-) diff --git a/metrics/counter.go b/metrics/counter.go index 2f78c90d5c..55e1c59540 100644 --- a/metrics/counter.go +++ b/metrics/counter.go @@ -38,13 +38,13 @@ func NewCounter() Counter { if !Enabled { return NilCounter{} } - return &StandardCounter{0} + return &StandardCounter{} } // NewCounterForced constructs a new StandardCounter and returns it no matter if // the global switch is enabled or not. func NewCounterForced() Counter { - return &StandardCounter{0} + return &StandardCounter{} } // NewRegisteredCounter constructs and registers a new StandardCounter. @@ -115,27 +115,27 @@ func (NilCounter) Snapshot() Counter { return NilCounter{} } // StandardCounter is the standard implementation of a Counter and uses the // sync/atomic package to manage a single int64 value. type StandardCounter struct { - count int64 + count atomic.Int64 } // Clear sets the counter to zero. func (c *StandardCounter) Clear() { - atomic.StoreInt64(&c.count, 0) + c.count.Store(0) } // Count returns the current count. func (c *StandardCounter) Count() int64 { - return atomic.LoadInt64(&c.count) + return c.count.Load() } // Dec decrements the counter by the given amount. func (c *StandardCounter) Dec(i int64) { - atomic.AddInt64(&c.count, -i) + c.count.Add(-i) } // Inc increments the counter by the given amount. func (c *StandardCounter) Inc(i int64) { - atomic.AddInt64(&c.count, i) + c.count.Add(i) } // Snapshot returns a read-only copy of the counter. diff --git a/metrics/ewma.go b/metrics/ewma.go index 039286493e..ed95cba19b 100644 --- a/metrics/ewma.go +++ b/metrics/ewma.go @@ -75,7 +75,7 @@ func (NilEWMA) Update(n int64) {} // of uncounted events and processes them on each tick. It uses the // sync/atomic package to manage uncounted events. type StandardEWMA struct { - uncounted int64 // /!\ this should be the first member to ensure 64-bit alignment + uncounted atomic.Int64 alpha float64 rate float64 init bool @@ -97,8 +97,8 @@ func (a *StandardEWMA) Snapshot() EWMA { // Tick ticks the clock to update the moving average. It assumes it is called // every five seconds. func (a *StandardEWMA) Tick() { - count := atomic.LoadInt64(&a.uncounted) - atomic.AddInt64(&a.uncounted, -count) + count := a.uncounted.Load() + a.uncounted.Add(-count) instantRate := float64(count) / float64(5*time.Second) a.mutex.Lock() defer a.mutex.Unlock() @@ -112,5 +112,5 @@ func (a *StandardEWMA) Tick() { // Update adds n uncounted events. func (a *StandardEWMA) Update(n int64) { - atomic.AddInt64(&a.uncounted, n) + a.uncounted.Add(n) } diff --git a/metrics/gauge.go b/metrics/gauge.go index b6b2758b0d..81137d7f7c 100644 --- a/metrics/gauge.go +++ b/metrics/gauge.go @@ -25,7 +25,7 @@ func NewGauge() Gauge { if !Enabled { return NilGauge{} } - return &StandardGauge{0} + return &StandardGauge{} } // NewRegisteredGauge constructs and registers a new StandardGauge. @@ -101,7 +101,7 @@ func (NilGauge) Value() int64 { return 0 } // StandardGauge is the standard implementation of a Gauge and uses the // sync/atomic package to manage a single int64 value. type StandardGauge struct { - value int64 + value atomic.Int64 } // Snapshot returns a read-only copy of the gauge. @@ -111,22 +111,22 @@ func (g *StandardGauge) Snapshot() Gauge { // Update updates the gauge's value. func (g *StandardGauge) Update(v int64) { - atomic.StoreInt64(&g.value, v) + g.value.Store(v) } // Value returns the gauge's current value. func (g *StandardGauge) Value() int64 { - return atomic.LoadInt64(&g.value) + return g.value.Load() } // Dec decrements the gauge's current value by the given amount. func (g *StandardGauge) Dec(i int64) { - atomic.AddInt64(&g.value, -i) + g.value.Add(-i) } // Inc increments the gauge's current value by the given amount. func (g *StandardGauge) Inc(i int64) { - atomic.AddInt64(&g.value, i) + g.value.Add(i) } // FunctionalGauge returns value from given function diff --git a/metrics/meter.go b/metrics/meter.go index 60ae919d04..e8564d6a5e 100644 --- a/metrics/meter.go +++ b/metrics/meter.go @@ -101,11 +101,7 @@ func NewRegisteredMeterForced(name string, r Registry) Meter { // MeterSnapshot is a read-only copy of another Meter. type MeterSnapshot struct { - // WARNING: The `temp` field is accessed atomically. - // On 32 bit platforms, only 64-bit aligned fields can be atomic. The struct is - // guaranteed to be so aligned, so take advantage of that. For more information, - // see https://golang.org/pkg/sync/atomic/#pkg-note-BUG. - temp int64 + temp atomic.Int64 count int64 rate1, rate5, rate15, rateMean float64 } @@ -173,7 +169,7 @@ type StandardMeter struct { snapshot *MeterSnapshot a1, a5, a15 EWMA startTime time.Time - stopped uint32 + stopped atomic.Bool } func newStandardMeter() *StandardMeter { @@ -188,8 +184,8 @@ func newStandardMeter() *StandardMeter { // Stop stops the meter, Mark() will be a no-op if you use it after being stopped. func (m *StandardMeter) Stop() { - stopped := atomic.SwapUint32(&m.stopped, 1) - if stopped != 1 { + stopped := m.stopped.Swap(true) + if !stopped { arbiter.Lock() delete(arbiter.meters, m) arbiter.Unlock() @@ -207,7 +203,7 @@ func (m *StandardMeter) Count() int64 { // Mark records the occurrence of n events. func (m *StandardMeter) Mark(n int64) { - atomic.AddInt64(&m.snapshot.temp, n) + m.snapshot.temp.Add(n) } // Rate1 returns the one-minute moving average rate of events per second. @@ -241,7 +237,14 @@ func (m *StandardMeter) RateMean() float64 { // Snapshot returns a read-only copy of the meter. func (m *StandardMeter) Snapshot() Meter { m.lock.RLock() - snapshot := *m.snapshot + snapshot := MeterSnapshot{ + count: m.snapshot.count, + rate1: m.snapshot.rate1, + rate5: m.snapshot.rate5, + rate15: m.snapshot.rate15, + rateMean: m.snapshot.rateMean, + } + snapshot.temp.Store(m.snapshot.temp.Load()) m.lock.RUnlock() return &snapshot } @@ -257,7 +260,7 @@ func (m *StandardMeter) updateSnapshot() { func (m *StandardMeter) updateMeter() { // should only run with write lock held on m.lock - n := atomic.SwapInt64(&m.snapshot.temp, 0) + n := m.snapshot.temp.Swap(0) m.snapshot.count += n m.a1.Update(n) m.a5.Update(n)