Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: use crtime.NowMono instead of time.Now #4124

Merged
merged 1 commit into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"time"
"unsafe"

"github.com/cockroachdb/crlib/crtime"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/batchrepr"
"github.com/cockroachdb/pebble/internal/base"
Expand Down Expand Up @@ -1655,12 +1656,12 @@ func (b *Batch) Reader() batchrepr.Reader {

// SyncWait is to be used in conjunction with DB.ApplyNoSyncWait.
func (b *Batch) SyncWait() error {
now := time.Now()
now := crtime.NowMono()
b.fsyncWait.Wait()
if b.commitErr != nil {
b.db = nil // prevent batch reuse on error
}
waitDuration := time.Since(now)
waitDuration := now.Elapsed()
b.commitStats.CommitWaitDuration += waitDuration
b.commitStats.TotalDuration += waitDuration
return b.commitErr
Expand Down
12 changes: 6 additions & 6 deletions commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"runtime"
"sync"
"sync/atomic"
"time"

"github.com/cockroachdb/crlib/crtime"
"github.com/cockroachdb/pebble/batchrepr"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/record"
Expand Down Expand Up @@ -300,13 +300,13 @@ func (p *commitPipeline) Commit(b *Batch, syncWAL bool, noSyncWait bool) error {
return nil
}

commitStartTime := time.Now()
commitStartTime := crtime.NowMono()
// Acquire semaphores.
p.commitQueueSem <- struct{}{}
if syncWAL {
p.logSyncQSem <- struct{}{}
}
b.commitStats.SemaphoreWaitDuration = time.Since(commitStartTime)
b.commitStats.SemaphoreWaitDuration = commitStartTime.Elapsed()

// Prepare the batch for committing: enqueuing the batch in the pending
// queue, determining the batch sequence number and writing the data to the
Expand Down Expand Up @@ -348,7 +348,7 @@ func (p *commitPipeline) Commit(b *Batch, syncWAL bool, noSyncWait bool) error {
// b.commitErr. We will read b.commitErr in Batch.SyncWait after the
// LogWriter is done writing.

b.commitStats.TotalDuration = time.Since(commitStartTime)
b.commitStats.TotalDuration = commitStartTime.Elapsed()

return err
}
Expand Down Expand Up @@ -488,9 +488,9 @@ func (p *commitPipeline) publish(b *Batch) {
if t == nil {
// Wait for another goroutine to publish us. We might also be waiting for
// the WAL sync to finish.
now := time.Now()
now := crtime.NowMono()
b.commit.Wait()
b.commitStats.CommitWaitDuration += time.Since(now)
b.commitStats.CommitWaitDuration += now.Elapsed()
break
}
if !t.applied.Load() {
Expand Down
5 changes: 3 additions & 2 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"sync/atomic"
"time"

"github.com/cockroachdb/crlib/crtime"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/compact"
Expand Down Expand Up @@ -1174,7 +1175,7 @@ func (d *DB) maybeScheduleDelayedFlush(tbl *memTable, dur time.Duration) {

func (d *DB) flush() {
pprof.Do(context.Background(), flushLabels, func(context.Context) {
flushingWorkStart := time.Now()
flushingWorkStart := crtime.NowMono()
d.mu.Lock()
defer d.mu.Unlock()
idleDuration := flushingWorkStart.Sub(d.mu.compact.noOngoingFlushStartTime)
Expand All @@ -1185,7 +1186,7 @@ func (d *DB) flush() {
d.opts.EventListener.BackgroundError(err)
}
d.mu.compact.flushing = false
d.mu.compact.noOngoingFlushStartTime = time.Now()
d.mu.compact.noOngoingFlushStartTime = crtime.NowMono()
workDuration := d.mu.compact.noOngoingFlushStartTime.Sub(flushingWorkStart)
d.mu.compact.flushWriteThroughput.Bytes += int64(bytesFlushed)
d.mu.compact.flushWriteThroughput.WorkDuration += workDuration
Expand Down
15 changes: 8 additions & 7 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"
"unsafe"

"github.com/cockroachdb/crlib/crtime"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/arenaskl"
"github.com/cockroachdb/pebble/internal/base"
Expand Down Expand Up @@ -456,7 +457,7 @@ type DB struct {
flushWriteThroughput ThroughputMetric
// The idle start time for the flush "loop", i.e., when the flushing
// bool above transitions to false.
noOngoingFlushStartTime time.Time
noOngoingFlushStartTime crtime.Mono
}

// Non-zero when file cleaning is disabled. The disabled count acts as a
Expand Down Expand Up @@ -2477,10 +2478,10 @@ func (d *DB) maybeInduceWriteStall(b *Batch) {
Reason: "memtable count limit reached",
})
}
now := time.Now()
beforeWait := crtime.NowMono()
d.mu.compact.cond.Wait()
if b != nil {
b.commitStats.MemTableWriteStallDuration += time.Since(now)
b.commitStats.MemTableWriteStallDuration += beforeWait.Elapsed()
}
continue
}
Expand All @@ -2493,10 +2494,10 @@ func (d *DB) maybeInduceWriteStall(b *Batch) {
Reason: "L0 file count limit exceeded",
})
}
now := time.Now()
beforeWait := crtime.NowMono()
d.mu.compact.cond.Wait()
if b != nil {
b.commitStats.L0ReadAmpWriteStallDuration += time.Since(now)
b.commitStats.L0ReadAmpWriteStallDuration += beforeWait.Elapsed()
}
continue
}
Expand Down Expand Up @@ -2530,10 +2531,10 @@ func (d *DB) makeRoomForWrite(b *Batch) error {
var newLogNum base.DiskFileNum
var prevLogSize uint64
if !d.opts.DisableWAL {
now := time.Now()
beforeRotate := crtime.NowMono()
newLogNum, prevLogSize = d.rotateWAL()
if b != nil {
b.commitStats.WALRotationDuration += time.Since(now)
b.commitStats.WALRotationDuration += beforeRotate.Elapsed()
}
}
immMem := d.mu.mem.mutable
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ require (
github.com/DataDog/zstd v1.5.6-0.20230824185856-869dae002e5e
github.com/HdrHistogram/hdrhistogram-go v1.1.2
github.com/cespare/xxhash/v2 v2.2.0
github.com/cockroachdb/crlib v0.0.0-20241015224233-894974b3ad94
github.com/cockroachdb/crlib v0.0.0-20241030175859-ddcdee82a927
github.com/cockroachdb/datadriven v1.0.3-0.20240530155848-7682d40af056
github.com/cockroachdb/errors v1.11.3
github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5P
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cockroachdb/crlib v0.0.0-20241015224233-894974b3ad94 h1:bvJv505UUfjzbaIPdNS4AEkHreDqQk6yuNpsdRHpwFA=
github.com/cockroachdb/crlib v0.0.0-20241015224233-894974b3ad94/go.mod h1:Gq51ZeKaFCXk6QwuGM0w1dnaOqc/F5zKT2zA9D6Xeac=
github.com/cockroachdb/crlib v0.0.0-20241030175859-ddcdee82a927 h1:KxHSDcEqwtVRnknERc+ao26hlGQuReg7hfsdQeNoomE=
github.com/cockroachdb/crlib v0.0.0-20241030175859-ddcdee82a927/go.mod h1:Gq51ZeKaFCXk6QwuGM0w1dnaOqc/F5zKT2zA9D6Xeac=
github.com/cockroachdb/datadriven v1.0.3-0.20240530155848-7682d40af056 h1:slXychO2uDM6hYRu4c0pD0udNI8uObfeKN6UInWViS8=
github.com/cockroachdb/datadriven v1.0.3-0.20240530155848-7682d40af056/go.mod h1:a9RdTaap04u637JoCzcUoIcDmvwSUtcUFtT/C3kJlTU=
github.com/cockroachdb/errors v1.11.3 h1:5bA+k2Y6r+oz/6Z/RFlNeVCesGARKuC6YymtcDrbC/I=
Expand Down
3 changes: 1 addition & 2 deletions internal/batchskl/skl.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ import (
"fmt"
"math"
"math/rand/v2"
"time"
"unsafe"

"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -173,7 +172,7 @@ func (s *Skiplist) Init(storage *[]byte, cmp base.Compare, abbreviatedKey base.A
nodes: s.nodes[:0],
height: 1,
}
s.rand.Seed(0, uint64(time.Now().UnixNano()))
s.rand.Seed(0, rand.Uint64())

const initBufSize = 256
if cap(s.nodes) < initBufSize {
Expand Down
9 changes: 3 additions & 6 deletions internal/randvar/rand.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,11 @@

package randvar

import (
"math/rand/v2"
"time"
)
import "math/rand/v2"

// NewRand creates a new random number generator seeded with the current time.
// NewRand creates a new random number generator with a random seed.
func NewRand() *rand.Rand {
return rand.New(rand.NewPCG(0, uint64(time.Now().UnixNano())))
return rand.New(rand.NewPCG(0, rand.Uint64()))
}

func ensureRand(rng *rand.Rand) *rand.Rand {
Expand Down
7 changes: 4 additions & 3 deletions obsolete_files.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"sync"
"time"

"github.com/cockroachdb/crlib/crtime"
"github.com/cockroachdb/errors/oserror"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/objstorage"
Expand Down Expand Up @@ -87,7 +88,7 @@ func openCleanupManager(
opts: opts,
objProvider: objProvider,
onTableDeleteFn: onTableDeleteFn,
deletePacer: newDeletionPacer(time.Now(), int64(opts.TargetByteDeletionRate), getDeletePacerInfo),
deletePacer: newDeletionPacer(crtime.NowMono(), int64(opts.TargetByteDeletionRate), getDeletePacerInfo),
jobsCh: make(chan *cleanupJob, jobsQueueDepth),
}
cm.mu.completedJobsCond.L = &cm.mu.Mutex
Expand Down Expand Up @@ -127,7 +128,7 @@ func (cm *cleanupManager) EnqueueJob(jobID JobID, obsoleteFiles []obsoleteFile)
}
}
if pacingBytes > 0 {
cm.deletePacer.ReportDeletion(time.Now(), pacingBytes)
cm.deletePacer.ReportDeletion(crtime.NowMono(), pacingBytes)
}

cm.mu.Lock()
Expand Down Expand Up @@ -208,7 +209,7 @@ func (cm *cleanupManager) maybePace(
return
}

tokens := cm.deletePacer.PacingDelay(time.Now(), fileSize)
tokens := cm.deletePacer.PacingDelay(crtime.NowMono(), fileSize)
if tokens == 0.0 {
// The token bucket might be in debt; it could make us wait even for 0
// tokens. We don't want that if the pacer decided throttling should be
Expand Down
3 changes: 2 additions & 1 deletion open.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"sync/atomic"
"time"

"github.com/cockroachdb/crlib/crtime"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/errors/oserror"
"github.com/cockroachdb/pebble/batchrepr"
Expand Down Expand Up @@ -280,7 +281,7 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
}
d.mu.compact.cond.L = &d.mu.Mutex
d.mu.compact.inProgress = make(map[*compaction]struct{})
d.mu.compact.noOngoingFlushStartTime = time.Now()
d.mu.compact.noOngoingFlushStartTime = crtime.NowMono()
d.mu.snapshots.init()
// logSeqNum is the next sequence number that will be assigned.
// Start assigning sequence numbers from base.SeqNumStart to leave
Expand Down
20 changes: 11 additions & 9 deletions pacer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ package pebble
import (
"sync"
"time"

"github.com/cockroachdb/crlib/crtime"
)

// deletionPacerInfo contains any info from the db necessary to make deletion
Expand Down Expand Up @@ -58,7 +60,7 @@ const deletePacerHistory = 5 * time.Minute
// normally limit deletes (when we are not falling behind or running out of
// space). A value of 0.0 disables pacing.
func newDeletionPacer(
now time.Time, targetByteDeletionRate int64, getInfo func() deletionPacerInfo,
now crtime.Mono, targetByteDeletionRate int64, getInfo func() deletionPacerInfo,
) *deletionPacer {
d := &deletionPacer{
freeSpaceThreshold: 16 << 30, // 16 GB
Expand All @@ -79,7 +81,7 @@ func newDeletionPacer(
// deletion rate accordingly.
//
// ReportDeletion is thread-safe.
func (p *deletionPacer) ReportDeletion(now time.Time, bytesToDelete uint64) {
func (p *deletionPacer) ReportDeletion(now crtime.Mono, bytesToDelete uint64) {
p.mu.Lock()
defer p.mu.Unlock()
p.mu.history.Add(now, int64(bytesToDelete))
Expand All @@ -89,7 +91,7 @@ func (p *deletionPacer) ReportDeletion(now time.Time, bytesToDelete uint64) {
// deleting the given number of bytes.
//
// PacingDelay is thread-safe.
func (p *deletionPacer) PacingDelay(now time.Time, bytesToDelete uint64) (waitSeconds float64) {
func (p *deletionPacer) PacingDelay(now crtime.Mono, bytesToDelete uint64) (waitSeconds float64) {
if p.targetByteDeletionRate == 0 {
// Pacing disabled.
return 0.0
Expand Down Expand Up @@ -136,7 +138,7 @@ func (p *deletionPacer) PacingDelay(now time.Time, bytesToDelete uint64) (waitSe
// are effectively rounded down to the nearest epoch boundary.
type history struct {
epochDuration time.Duration
startTime time.Time
startTime crtime.Mono
// currEpoch is the epoch of the most recent operation.
currEpoch int64
// val contains the recent epoch values.
Expand All @@ -151,7 +153,7 @@ const historyEpochs = 100

// Init the history helper to keep track of data over the given number of
// seconds.
func (h *history) Init(now time.Time, timeframe time.Duration) {
func (h *history) Init(now crtime.Mono, timeframe time.Duration) {
*h = history{
epochDuration: timeframe / time.Duration(historyEpochs),
startTime: now,
Expand All @@ -161,25 +163,25 @@ func (h *history) Init(now time.Time, timeframe time.Duration) {
}

// Add adds a value for the current time.
func (h *history) Add(now time.Time, val int64) {
func (h *history) Add(now crtime.Mono, val int64) {
h.advance(now)
h.val[h.currEpoch%historyEpochs] += val
h.sum += val
}

// Sum returns the sum of recent values. The result is approximate in that the
// cut-off time is within 1% of the exact one.
func (h *history) Sum(now time.Time) int64 {
func (h *history) Sum(now crtime.Mono) int64 {
h.advance(now)
return h.sum
}

func (h *history) epoch(t time.Time) int64 {
func (h *history) epoch(t crtime.Mono) int64 {
return int64(t.Sub(h.startTime) / h.epochDuration)
}

// advance advances the time to the given time.
func (h *history) advance(now time.Time) {
func (h *history) advance(now crtime.Mono) {
epoch := h.epoch(now)
for h.currEpoch < epoch {
h.currEpoch++
Expand Down
Loading
Loading