-
Notifications
You must be signed in to change notification settings - Fork 58
/
store_api.go
393 lines (318 loc) · 12.9 KB
/
store_api.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
// Copyright 2016-Present Couchbase, Inc.
//
// Use of this software is governed by the Business Source License included
// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
// in that file, in accordance with the Business Source License, use of this
// software will be governed by the Apache License, Version 2.0, included in
// the file licenses/APL2.txt.
package moss
import (
"errors"
"sync"
"github.com/couchbase/ghistogram"
)
// ErrNoValidFooter is returned when a valid footer could not be found
// in a file.
var ErrNoValidFooter = errors.New("no-valid-footer")
// ErrNothingToCompact is an internal error returned when compact is
// called on a store that is already compacted.
var ErrNothingToCompact = errors.New("nothing-to-compact")
// --------------------------------------------------------
// Store represents data persisted in a directory.
type Store struct {
dir string
options *StoreOptions
m sync.Mutex // Protects the fields that follow.
refs int
footer *Footer
nextFNameSeq int64
totPersists uint64 // Total number of persists
totCompactions uint64 // Total number of compactions
totCompactionsPartial uint64 // Total number of partial compactions into same file
numLastCompactionBeforeBytes uint64 // File size before last compaction
numLastCompactionAfterBytes uint64 // File size after last compaction
totCompactionDecreaseBytes uint64 // File size decrease after all compactions
totCompactionIncreaseBytes uint64 // File size increase after all compactions
maxCompactionDecreaseBytes uint64 // Max file size decrease from any compaction
maxCompactionIncreaseBytes uint64 // Max file size increase from any compaction
totCompactionBeforeBytes uint64 // total bytes to be compacted
totCompactionWrittenBytes uint64 // total bytes written out by compaction
histograms ghistogram.Histograms // Histograms from store operations
fileRefMap map[string]*FileRef // Map to contain the FileRefs
abortCh chan struct{} // Forced close/abort channel
}
// StoreCloseExOptions represents store CloseEx options.
type StoreCloseExOptions struct {
// Abort means stop as soon as possible, even if data might be lost,
// such as any mutations not yet persisted.
Abort bool
}
// StoreOptions are provided to OpenStore().
type StoreOptions struct {
// CollectionOptions should be the same as used with
// NewCollection().
CollectionOptions CollectionOptions
// CompactionPercentage determines when a compaction will run when
// CompactionConcern is CompactionAllow. When the percentage of
// ops between the non-base level and the base level is greater
// than CompactionPercentage, then compaction will be run.
CompactionPercentage float64
// CompactionLevelMaxSegments determines the number of segments
// per level exceeding which partial or full compaction will run.
CompactionLevelMaxSegments int
// CompactionLevelMultiplier is the factor which determines the
// next level in terms of segment sizes.
CompactionLevelMultiplier int
// CompactionBufferPages is the number of pages to use for
// compaction, where writes are buffered before flushing to disk.
CompactionBufferPages int
// CompactionSync of true means perform a file sync at the end of
// compaction for additional safety.
CompactionSync bool
// CompactionSyncAfterBytes controls the number of bytes after
// which compaction is allowed to invoke an file sync, followed
// by an additional file sync at the end of compaction. A value
// that is < 0 annulls this behavior.
CompactionSyncAfterBytes int
// OpenFile allows apps to optionally provide their own file
// opening implementation. When nil, os.OpenFile() is used.
OpenFile OpenFile `json:"-"`
// Log is a callback invoked when store needs to log a debug
// message. Optional, may be nil.
Log func(format string, a ...interface{}) `json:"-"`
// KeepFiles means that unused, obsoleted files will not be
// removed during OpenStore(). Keeping old files might be useful
// when diagnosing file corruption cases.
KeepFiles bool
// Choose which Kind of segment to persist, if unspecified defaults
// to the value of DefaultPersistKind.
PersistKind string
// SegmentKeysIndexMaxBytes is the maximum size in bytes allowed for
// the segmentKeysIndex. Also, an index will not be built if the
// segment's total key bytes is less than this parameter.
SegmentKeysIndexMaxBytes int
// SegmentKeysIndexMinKeyBytes is the minimum size in bytes that the
// keys of a segment must reach before a segment key index is built.
SegmentKeysIndexMinKeyBytes int
}
// DefaultPersistKind determines which persistence Kind to choose when
// none is specified in StoreOptions.
var DefaultPersistKind = SegmentKindBasic
// DefaultStoreOptions are the default store options when the
// application hasn't provided a meaningful configuration value.
// Advanced applications can use these to fine tune performance.
var DefaultStoreOptions = StoreOptions{
CompactionPercentage: 0.65,
CompactionLevelMaxSegments: 4,
CompactionLevelMultiplier: 9,
CompactionBufferPages: 512,
CompactionSyncAfterBytes: 16000000,
SegmentKeysIndexMaxBytes: 100000,
SegmentKeysIndexMinKeyBytes: 10000000,
}
// StorePersistOptions are provided to Store.Persist().
type StorePersistOptions struct {
// NoSync means do not perform a file sync at the end of
// persistence (before returning from the Store.Persist() method).
// Using NoSync of true might provide better performance, but at
// the cost of data safety.
NoSync bool
// CompactionConcern controls whether compaction is allowed or
// forced as part of persistence.
CompactionConcern CompactionConcern
}
// CompactionConcern is a type representing various possible compaction
// behaviors associated with persistence.
type CompactionConcern int
// CompactionDisable means no compaction.
var CompactionDisable = CompactionConcern(0)
// CompactionAllow means compaction decision is automated and based on
// the configed policy and parameters, such as CompactionPercentage.
var CompactionAllow = CompactionConcern(1)
// CompactionForce means compaction should be performed immediately.
var CompactionForce = CompactionConcern(2)
// --------------------------------------------------------
// SegmentLoc represents a persisted segment.
type SegmentLoc struct {
Kind string // Used as the key for SegmentLoaders.
KvsOffset uint64 // Byte offset within the file.
KvsBytes uint64 // Number of bytes for the persisted segment.kvs.
BufOffset uint64 // Byte offset within the file.
BufBytes uint64 // Number of bytes for the persisted segment.buf.
TotOpsSet uint64
TotOpsDel uint64
TotKeyByte uint64
TotValByte uint64
mref *mmapRef // Immutable and ephemeral / non-persisted.
}
// TotOps returns number of ops in a segment loc.
func (sloc *SegmentLoc) TotOps() int { return int(sloc.KvsBytes / 8 / 2) }
// --------------------------------------------------------
// SegmentLocs represents a slice of SegmentLoc
type SegmentLocs []SegmentLoc
// AddRef increases the ref count on each SegmentLoc in this SegmentLocs
func (slocs SegmentLocs) AddRef() {
for _, sloc := range slocs {
if sloc.mref != nil {
sloc.mref.AddRef()
}
}
}
// DecRef decreases the ref count on each SegmentLoc in this SegmentLocs
func (slocs SegmentLocs) DecRef() {
for _, sloc := range slocs {
if sloc.mref != nil {
sloc.mref.DecRef()
}
}
}
// Close allows the SegmentLocs to implement the io.Closer interface.
// It actually just performs what should be the final DecRef() call
// which takes the reference count to 0.
func (slocs SegmentLocs) Close() error {
slocs.DecRef()
return nil
}
// --------------------------------------------------------
// A SegmentLoaderFunc is able to load a segment from a SegmentLoc.
type SegmentLoaderFunc func(
sloc *SegmentLoc) (Segment, error)
// SegmentLoaders is a registry of available segment loaders, which
// should be immutable after process init()'ialization. It is keyed
// by SegmentLoc.Kind.
var SegmentLoaders = map[string]SegmentLoaderFunc{}
// A SegmentPersisterFunc is able to persist a segment to a file,
// and return a SegmentLoc describing it.
type SegmentPersisterFunc func(
s Segment, f File, pos int64, options *StoreOptions) (SegmentLoc, error)
// SegmentPersisters is a registry of available segment persisters,
// which should be immutable after process init()'ialization. It is
// keyed by SegmentLoc.Kind.
var SegmentPersisters = map[string]SegmentPersisterFunc{}
// --------------------------------------------------------
// OpenStore returns a store instance for a directory. An empty
// directory results in an empty store.
func OpenStore(dir string, options StoreOptions) (*Store, error) {
return openStore(dir, options)
}
// Dir returns the directory for this store
func (s *Store) Dir() string {
return s.dir
}
// Options a copy of this Store's StoreOptions
func (s *Store) Options() StoreOptions {
return *s.options // Copy.
}
// Snapshot creates a Snapshot to access this Store
func (s *Store) Snapshot() (Snapshot, error) {
return s.snapshot()
}
func (s *Store) snapshot() (*Footer, error) {
s.m.Lock()
footer := s.footer
if footer != nil {
footer.AddRef()
}
s.m.Unlock()
return footer, nil
}
// AddRef increases the ref count on this store
func (s *Store) AddRef() {
s.m.Lock()
s.refs++
s.m.Unlock()
}
// Close decreases the ref count on this store, and if the count is 0
// proceeds to actually close the store.
func (s *Store) Close() error {
s.m.Lock()
defer s.m.Unlock()
s.refs--
if s.refs > 0 || s.footer == nil {
return nil
}
footer := s.footer
s.footer = nil
return footer.Close()
}
// CloseEx provides more advanced closing options.
func (s *Store) CloseEx(options StoreCloseExOptions) error {
if options.Abort {
close(s.abortCh)
}
return s.Close()
}
// IsAborted returns whether the store operations are aborted.
func (s *Store) IsAborted() bool {
select {
case <-s.abortCh:
return true
default:
return false
}
}
// --------------------------------------------------------
// Persist helps the store implement the lower-level-update func, and
// normally is not called directly by applications. The higher
// snapshot may be nil. Advanced users who wish to call Persist()
// directly MUST invoke it in single threaded manner only.
func (s *Store) Persist(higher Snapshot, persistOptions StorePersistOptions) (
Snapshot, error) {
return s.persist(higher, persistOptions)
}
// --------------------------------------------------------
// OpenStoreCollection returns collection based on a persisted store
// in a directory. Updates to the collection will be persisted. An
// empty directory starts an empty collection. Both the store and
// collection should be closed by the caller when done.
func OpenStoreCollection(dir string, options StoreOptions,
persistOptions StorePersistOptions) (*Store, Collection, error) {
store, err := OpenStore(dir, options)
if err != nil {
return nil, nil, err
}
coll, err := store.OpenCollection(options, persistOptions)
if err != nil {
store.Close()
return nil, nil, err
}
return store, coll, nil
}
// --------------------------------------------------------
// OpenCollection opens a collection based on a store. Applications
// should open at most a single collection per store for performing
// read/write work.
func (s *Store) OpenCollection(options StoreOptions,
persistOptions StorePersistOptions) (Collection, error) {
return s.openCollection(options, persistOptions)
}
// --------------------------------------------------------
// SnapshotPrevious returns the next older, previous snapshot based on
// a given snapshot, allowing the application to walk backwards into
// the history of a store at previous points in time. The given
// snapshot must come from the same store. A nil returned snapshot
// means no previous snapshot is available. Of note, store
// compactions will trim previous history from a store.
func (s *Store) SnapshotPrevious(ss Snapshot) (Snapshot, error) {
return s.snapshotPrevious(ss)
}
// --------------------------------------------------------
// SnapshotRevert atomically and durably brings the store back to the
// point-in-time as represented by the revertTo snapshot.
// SnapshotRevert() should only be passed a snapshot that came from
// the same store, such as from using Store.Snapshot() or
// Store.SnapshotPrevious().
//
// SnapshotRevert() must not be invoked concurrently with
// Store.Persist(), so it is recommended that SnapshotRevert() should
// be invoked only after the collection has been Close()'ed, which
// helps ensure that you are not racing with concurrent, background
// persistence goroutines.
//
// SnapshotRevert() can fail if the given snapshot is too old,
// especially w.r.t. compactions. For example, navigate back to an
// older snapshot X via SnapshotPrevious(). Then, do a full
// compaction. Then, SnapshotRevert(X) will give an error.
func (s *Store) SnapshotRevert(revertTo Snapshot) error {
return s.snapshotRevert(revertTo)
}