-
Notifications
You must be signed in to change notification settings - Fork 452
/
pacer.go
192 lines (166 loc) · 6.07 KB
/
pacer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
// Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.
package pebble
import (
"sync"
"time"
"github.com/cockroachdb/crlib/crtime"
)
// deletionPacerInfo contains any info from the db necessary to make deletion
// pacing decisions (to limit background IO usage so that it does not contend
// with foreground traffic).
type deletionPacerInfo struct {
freeBytes uint64
obsoleteBytes uint64
liveBytes uint64
}
// deletionPacer rate limits deletions of obsolete files. This is necessary to
// prevent overloading the disk with too many deletions too quickly after a
// large compaction, or an iterator close. On some SSDs, disk performance can be
// negatively impacted if too many blocks are deleted very quickly, so this
// mechanism helps mitigate that.
type deletionPacer struct {
// If there are less than freeSpaceThreshold bytes of free space on
// disk, increase the pace of deletions such that we delete enough bytes to
// get back to the threshold within the freeSpaceTimeframe.
freeSpaceThreshold uint64
freeSpaceTimeframe time.Duration
// If the ratio of obsolete bytes to live bytes is greater than
// obsoleteBytesMaxRatio, increase the pace of deletions such that we delete
// enough bytes to get back to the threshold within the obsoleteBytesTimeframe.
obsoleteBytesMaxRatio float64
obsoleteBytesTimeframe time.Duration
mu struct {
sync.Mutex
// history keeps rack of recent deletion history; it used to increase the
// deletion rate to match the pace of deletions.
history history
}
targetByteDeletionRate int64
getInfo func() deletionPacerInfo
}
const deletePacerHistory = 5 * time.Minute
// newDeletionPacer instantiates a new deletionPacer for use when deleting
// obsolete files.
//
// targetByteDeletionRate is the rate (in bytes/sec) at which we want to
// normally limit deletes (when we are not falling behind or running out of
// space). A value of 0.0 disables pacing.
func newDeletionPacer(
now crtime.Mono, targetByteDeletionRate int64, getInfo func() deletionPacerInfo,
) *deletionPacer {
d := &deletionPacer{
freeSpaceThreshold: 16 << 30, // 16 GB
freeSpaceTimeframe: 10 * time.Second,
obsoleteBytesMaxRatio: 0.20,
obsoleteBytesTimeframe: 5 * time.Minute,
targetByteDeletionRate: targetByteDeletionRate,
getInfo: getInfo,
}
d.mu.history.Init(now, deletePacerHistory)
return d
}
// ReportDeletion is used to report a deletion to the pacer. The pacer uses it
// to keep track of the recent rate of deletions and potentially increase the
// deletion rate accordingly.
//
// ReportDeletion is thread-safe.
func (p *deletionPacer) ReportDeletion(now crtime.Mono, bytesToDelete uint64) {
p.mu.Lock()
defer p.mu.Unlock()
p.mu.history.Add(now, int64(bytesToDelete))
}
// PacingDelay returns the recommended pacing wait time (in seconds) for
// deleting the given number of bytes.
//
// PacingDelay is thread-safe.
func (p *deletionPacer) PacingDelay(now crtime.Mono, bytesToDelete uint64) (waitSeconds float64) {
if p.targetByteDeletionRate == 0 {
// Pacing disabled.
return 0.0
}
baseRate := float64(p.targetByteDeletionRate)
// If recent deletion rate is more than our target, use that so that we don't
// fall behind.
historicRate := func() float64 {
p.mu.Lock()
defer p.mu.Unlock()
return float64(p.mu.history.Sum(now)) / deletePacerHistory.Seconds()
}()
if historicRate > baseRate {
baseRate = historicRate
}
// Apply heuristics to increase the deletion rate.
var extraRate float64
info := p.getInfo()
if info.freeBytes <= p.freeSpaceThreshold {
// Increase the rate so that we can free up enough bytes within the timeframe.
extraRate = float64(p.freeSpaceThreshold-info.freeBytes) / p.freeSpaceTimeframe.Seconds()
}
if info.liveBytes == 0 {
// We don't know the obsolete bytes ratio. Disable pacing altogether.
return 0.0
}
obsoleteBytesRatio := float64(info.obsoleteBytes) / float64(info.liveBytes)
if obsoleteBytesRatio >= p.obsoleteBytesMaxRatio {
// Increase the rate so that we can free up enough bytes within the timeframe.
r := (obsoleteBytesRatio - p.obsoleteBytesMaxRatio) * float64(info.liveBytes) / p.obsoleteBytesTimeframe.Seconds()
if extraRate < r {
extraRate = r
}
}
return float64(bytesToDelete) / (baseRate + extraRate)
}
// history is a helper used to keep track of the recent history of a set of
// data points (in our case deleted bytes), at limited granularity.
// Specifically, we split the desired timeframe into 100 "epochs" and all times
// are effectively rounded down to the nearest epoch boundary.
type history struct {
epochDuration time.Duration
startTime crtime.Mono
// currEpoch is the epoch of the most recent operation.
currEpoch int64
// val contains the recent epoch values.
// val[currEpoch % historyEpochs] is the current epoch.
// val[(currEpoch + 1) % historyEpochs] is the oldest epoch.
val [historyEpochs]int64
// sum is always equal to the sum of values in val.
sum int64
}
const historyEpochs = 100
// Init the history helper to keep track of data over the given number of
// seconds.
func (h *history) Init(now crtime.Mono, timeframe time.Duration) {
*h = history{
epochDuration: timeframe / time.Duration(historyEpochs),
startTime: now,
currEpoch: 0,
sum: 0,
}
}
// Add adds a value for the current time.
func (h *history) Add(now crtime.Mono, val int64) {
h.advance(now)
h.val[h.currEpoch%historyEpochs] += val
h.sum += val
}
// Sum returns the sum of recent values. The result is approximate in that the
// cut-off time is within 1% of the exact one.
func (h *history) Sum(now crtime.Mono) int64 {
h.advance(now)
return h.sum
}
func (h *history) epoch(t crtime.Mono) int64 {
return int64(t.Sub(h.startTime) / h.epochDuration)
}
// advance advances the time to the given time.
func (h *history) advance(now crtime.Mono) {
epoch := h.epoch(now)
for h.currEpoch < epoch {
h.currEpoch++
// Forget the data for the oldest epoch.
h.sum -= h.val[h.currEpoch%historyEpochs]
h.val[h.currEpoch%historyEpochs] = 0
}
}