Skip to content

Commit

Permalink
wal: prevent negative durations
Browse files Browse the repository at this point in the history
The WAL failover monitor tracks the duration spent writing to the primary and
secondary WAL directories. Previously it was possible for updates to these
durations to be negative (despite implicitly using the time.Time's monotonic
time), because the time was retrieved before acquiring the mutex guarding the
accumulated durations. A goroutine that measured the current time first was not
necessarily the first to enter the critical section and update the monitor's
lastAccumulateIntoDurations field.

This commit moves these time measurements under the mutex and adds an assertion
that the monitor's understanding of time moves monotonically forward.

Informs cockroachdb/cockroach#136317.
  • Loading branch information
jbowens committed Nov 27, 2024
1 parent f5057c9 commit 21866e8
Showing 1 changed file with 8 additions and 5 deletions.
13 changes: 8 additions & 5 deletions wal/failover_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/pebble/vfs"
)

Expand Down Expand Up @@ -261,11 +262,10 @@ func newFailoverMonitor(opts failoverMonitorOptions) *failoverMonitor {

// Called when previous writer is closed
func (m *failoverMonitor) noWriter() {
now := m.opts.timeSource.now()
m.mu.Lock()
defer m.mu.Unlock()
m.mu.writer = nil
m.accumulateDurationLocked(now)
m.accumulateDurationLocked(m.opts.timeSource.now())
}

// writerCreateFunc is allowed to return nil, if there is an error. It is not
Expand All @@ -292,6 +292,10 @@ func (m *failoverMonitor) elevateWriteStallThresholdForFailover() bool {

func (m *failoverMonitor) accumulateDurationLocked(now time.Time) {
dur := now.Sub(m.mu.lastAccumulateIntoDurations)
if invariants.Enabled && dur < 0 {
panic(errors.AssertionFailedf("time regressed: last accumulated %s; now is %s",
m.mu.lastAccumulateIntoDurations, now))
}
m.mu.lastAccumulateIntoDurations = now
if m.mu.dirIndex == primaryDirIndex {
m.mu.primaryWriteDuration += dur
Expand All @@ -301,10 +305,9 @@ func (m *failoverMonitor) accumulateDurationLocked(now time.Time) {
}

func (m *failoverMonitor) stats() FailoverStats {
now := m.opts.timeSource.now()
m.mu.Lock()
defer m.mu.Unlock()
m.accumulateDurationLocked(now)
m.accumulateDurationLocked(m.opts.timeSource.now())
return FailoverStats{
DirSwitchCount: m.mu.dirSwitchCount,
PrimaryWriteDuration: m.mu.primaryWriteDuration,
Expand Down Expand Up @@ -401,8 +404,8 @@ func (m *failoverMonitor) monitorLoop(shouldQuiesce <-chan struct{}) {
dirIndex = secondaryDirIndex
}
dir := m.opts.dirs[dirIndex]
now := m.opts.timeSource.now()
m.mu.Lock()
now := m.opts.timeSource.now()
m.accumulateDurationLocked(now)
m.mu.dirIndex = dirIndex
m.mu.dirSwitchCount++
Expand Down

0 comments on commit 21866e8

Please sign in to comment.