Skip to content

Commit

Permalink
vfs: redesign MemFS strict mode
Browse files Browse the repository at this point in the history
Currently we use `MemFS` in "strict" mode to test crash recovery in
the following way:
 - at the desired crash point we call `SetIgnoreSyncs(true)`
 - after we close the database, we call `ResetToSyncedState()` and
   `SetIgnoreSyncs(false)` and proceed using the same filesystem.

This model is a bit fragile in the sense that both the previous
operation that we're simulating a crash of and the new operation use
the same filesystem. For example, a background operation that is
finishing up some cleanup could in principle interfere with the new
process.

We switch to a "crash clone" model, where we instead extract a
crash-consistent copy of the filesystem; further testing can proceed
on this independent copy. This allows for more usage patterns - e.g.
we can take multiple crash clones at various points and check them all
afterwards.

We also add functionality to randomly retain part of the unsynced
data (which is closer to what would happen in a real crash).
  • Loading branch information
RaduBerinde committed Aug 26, 2024
1 parent de41143 commit 5baaf71
Show file tree
Hide file tree
Showing 16 changed files with 285 additions and 244 deletions.
4 changes: 2 additions & 2 deletions checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func TestCheckpointCompaction(t *testing.T) {

func TestCheckpointFlushWAL(t *testing.T) {
const checkpointPath = "checkpoints/checkpoint"
fs := vfs.NewStrictMem()
fs := vfs.NewCrashableMem()
opts := &Options{FS: fs, Logger: testLogger{t: t}}
key, value := []byte("key"), []byte("value")

Expand All @@ -345,7 +345,7 @@ func TestCheckpointFlushWAL(t *testing.T) {
err = d.Checkpoint(checkpointPath, WithFlushedWAL())
require.NoError(t, err)
require.NoError(t, d.Close())
fs.ResetToSyncedState()
fs = fs.CrashClone(vfs.CrashCloneCfg{UnsyncedDataPercent: 0})
}

// Check that the WAL has been flushed in the checkpoint.
Expand Down
33 changes: 15 additions & 18 deletions error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,12 +352,13 @@ L6:
}

func TestDBWALRotationCrash(t *testing.T) {
memfs := vfs.NewStrictMem()
memfs := vfs.NewCrashableMem()

var crashFS *vfs.MemFS
var index atomic.Int32
inj := errorfs.InjectorFunc(func(op errorfs.Op) error {
if op.Kind.ReadOrWrite() == errorfs.OpIsWrite && index.Add(-1) == -1 {
memfs.SetIgnoreSyncs(true)
crashFS = memfs.CrashClone(vfs.CrashCloneCfg{UnsyncedDataPercent: 0})
}
return nil
})
Expand Down Expand Up @@ -392,8 +393,7 @@ func TestDBWALRotationCrash(t *testing.T) {

fs := errorfs.Wrap(memfs, inj)
for k := int32(0); ; k++ {
// Run, simulating a crash by ignoring syncs after the k-th write
// operation after Open.
// Run, simulating a crash after the k-th write operation after Open.
index.Store(math.MaxInt32)
err := run(fs, k)
if !triggered() {
Expand All @@ -409,8 +409,7 @@ func TestDBWALRotationCrash(t *testing.T) {

// Reset the filesystem to its state right before the simulated
// "crash", restore syncs, and run again without crashing.
memfs.ResetToSyncedState()
memfs.SetIgnoreSyncs(false)
memfs = crashFS
index.Store(math.MaxInt32)
require.NoError(t, run(fs, k))
}
Expand All @@ -428,15 +427,19 @@ func TestDBCompactionCrash(t *testing.T) {
// crashIndex holds the value of k at which the crash is induced and is
// decremented by the errorfs on each write operation.
var crashIndex atomic.Int32
mkFS := func() (vfs.FS, *vfs.MemFS) {
memfs := vfs.NewStrictMem()
var crashFS *vfs.MemFS
crashRNG := rand.New(rand.NewSource(seed))
mkFS := func() vfs.FS {
memfs := vfs.NewCrashableMem()
inj := errorfs.InjectorFunc(func(op errorfs.Op) error {
if op.Kind.ReadOrWrite() == errorfs.OpIsWrite && crashIndex.Add(-1) == -1 {
memfs.SetIgnoreSyncs(true)
// Allow an arbitrary subset of non-synced state to survive beyond the
// crash point.
crashFS = memfs.CrashClone(vfs.CrashCloneCfg{UnsyncedDataPercent: 10, RNG: crashRNG})
}
return nil
})
return errorfs.Wrap(memfs, inj), memfs
return errorfs.Wrap(memfs, inj)
}
triggered := func() bool { return crashIndex.Load() < 0 }

Expand Down Expand Up @@ -523,8 +526,7 @@ func TestDBCompactionCrash(t *testing.T) {
// Run, simulating a crash by ignoring syncs after the k-th write
// operation after Open.
crashIndex.Store(math.MaxInt32)
fs, memfs := mkFS()
i, err := run(t, fs, k, seed)
i, err := run(t, mkFS(), k, seed)
if !triggered() {
// Stop when we reach a value of k greater than the number of
// write operations performed during `run`.
Expand All @@ -540,13 +542,8 @@ func TestDBCompactionCrash(t *testing.T) {
// Reset the filesystem to its state right before the simulated
// "crash", restore syncs and run again without crashing. No errors
// should be encountered.
//
// TODO(jackson): Allow an arbitrary subset of synced state to
// survive beyond the crash point.
memfs.ResetToSyncedState()
memfs.SetIgnoreSyncs(false)
crashIndex.Store(math.MaxInt32)
_, err = run(t, fs, math.MaxInt32, seed)
_, err = run(t, crashFS, math.MaxInt32, seed)
require.False(t, triggered())
// TODO(jackson): Add assertions on the database keys.
require.NoError(t, err)
Expand Down
23 changes: 9 additions & 14 deletions format_major_version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func testBasicDB(d *DB) error {
func TestFormatMajorVersions(t *testing.T) {
for vers := FormatMinSupported; vers <= FormatNewest; vers++ {
t.Run(fmt.Sprintf("vers=%03d", vers), func(t *testing.T) {
fs := vfs.NewStrictMem()
fs := vfs.NewCrashableMem()
opts := (&Options{
FS: fs,
FormatMajorVersion: vers,
Expand All @@ -126,18 +126,15 @@ func TestFormatMajorVersions(t *testing.T) {
t.Run("upgrade-at-open", func(t *testing.T) {
for upgradeVers := vers + 1; upgradeVers <= FormatNewest; upgradeVers++ {
t.Run(fmt.Sprintf("upgrade-vers=%03d", upgradeVers), func(t *testing.T) {
// We use vfs.MemFS's option to ignore syncs so
// that we can perform an upgrade on the current
// database state in fs, and revert it when this
// subtest is complete.
fs.SetIgnoreSyncs(true)
defer fs.ResetToSyncedState()
// We use vfs.MemFS's CrashClone to perform an upgrade without
// affecting the original filesystem.
opts := opts.Clone()
opts.FS = fs.CrashClone(vfs.CrashCloneCfg{UnsyncedDataPercent: 0})

// Re-open the database, passing a higher format
// major version in the Options to automatically
// ratchet the format major version. Ensure some
// basic operations pass.
opts := opts.Clone()
opts.FormatMajorVersion = upgradeVers
d, err = Open("", opts)
require.NoError(t, err)
Expand All @@ -162,12 +159,10 @@ func TestFormatMajorVersions(t *testing.T) {
// options.
require.Equal(t, vers, opts.FormatMajorVersion)

// We use vfs.MemFS's option to ignore syncs so
// that we can perform an upgrade on the current
// database state in fs, and revert it when this
// subtest is complete.
fs.SetIgnoreSyncs(true)
defer fs.ResetToSyncedState()
// We use vfs.MemFS's CrashClone to perform an upgrade without
// affecting the original filesystem.
opts := opts.Clone()
opts.FS = fs.CrashClone(vfs.CrashCloneCfg{UnsyncedDataPercent: 0})

// Re-open the database, still at the current format
// major version. Perform some basic operations,
Expand Down
18 changes: 8 additions & 10 deletions ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,8 @@ func TestIngestLinkFallback(t *testing.T) {
func TestOverlappingIngestedSSTs(t *testing.T) {
dir := ""
var (
mem vfs.FS
mem *vfs.MemFS
crashClone *vfs.MemFS
d *DB
opts *Options
closed = false
Expand All @@ -446,7 +447,7 @@ func TestOverlappingIngestedSSTs(t *testing.T) {
blockFlush = false

if strictMem {
mem = vfs.NewStrictMem()
mem = vfs.NewCrashableMem()
} else {
mem = vfs.NewMem()
}
Expand Down Expand Up @@ -513,16 +514,13 @@ func TestOverlappingIngestedSSTs(t *testing.T) {
reset(td.HasArg("strictMem"))
return ""

case "ignoreSyncs":
var ignoreSyncs bool
if len(td.CmdArgs) == 1 && td.CmdArgs[0].String() == "true" {
ignoreSyncs = true
}
mem.(*vfs.MemFS).SetIgnoreSyncs(ignoreSyncs)
case "crash-clone":
crashClone = mem.CrashClone(vfs.CrashCloneCfg{UnsyncedDataPercent: 0})
return ""

case "resetToSynced":
mem.(*vfs.MemFS).ResetToSyncedState()
case "reset-to-crash-clone":
mem = crashClone
crashClone = nil
files, err := mem.List(dir)
sort.Strings(files)
require.NoError(t, err)
Expand Down
17 changes: 9 additions & 8 deletions iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2107,7 +2107,7 @@ func TestRangeKeyMaskingRandomized(t *testing.T) {
maxProcs := runtime.GOMAXPROCS(0)

opts1 := &Options{
FS: vfs.NewStrictMem(),
FS: vfs.NewCrashableMem(),
Comparer: testkeys.Comparer,
FormatMajorVersion: FormatNewest,
MaxConcurrentCompactions: func() int { return maxProcs/2 + 1 },
Expand All @@ -2120,7 +2120,7 @@ func TestRangeKeyMaskingRandomized(t *testing.T) {
require.NoError(t, err)

opts2 := &Options{
FS: vfs.NewStrictMem(),
FS: vfs.NewCrashableMem(),
Comparer: testkeys.Comparer,
FormatMajorVersion: FormatNewest,
MaxConcurrentCompactions: func() int { return maxProcs/2 + 1 },
Expand Down Expand Up @@ -2245,7 +2245,7 @@ func BenchmarkIterator_RangeKeyMasking(b *testing.B) {
keyBuf := make([]byte, prefixLen+testkeys.MaxSuffixLen)
valBuf := make([]byte, valueSize)

mem := vfs.NewStrictMem()
mem := vfs.NewMem()
maxProcs := runtime.GOMAXPROCS(0)
opts := &Options{
FS: mem,
Expand Down Expand Up @@ -2285,14 +2285,17 @@ func BenchmarkIterator_RangeKeyMasking(b *testing.B) {
d.mu.Unlock()
b.Log(d.Metrics().String())
require.NoError(b, d.Close())
// Set ignore syncs to true so that each subbenchmark may mutate state and
// then revert back to the original state.
mem.SetIgnoreSyncs(true)

// TODO(jackson): Benchmark lazy-combined iteration versus not.
// TODO(jackson): Benchmark seeks.
for _, rkSuffix := range []string{"@10", "@50", "@75", "@100"} {
b.Run(fmt.Sprintf("range-keys-suffixes=%s", rkSuffix), func(b *testing.B) {
// Clone the filesystem so that each subbenchmark may mutate state.
opts := opts.Clone()
opts.FS = vfs.NewMem()
ok, err := vfs.Clone(mem, opts.FS, "", "")
require.NoError(b, err)
require.True(b, ok)
d, err := Open("", opts)
require.NoError(b, err)
require.NoError(b, d.RangeKeySet([]byte("b"), []byte("e"), []byte(rkSuffix), nil, nil))
Expand Down Expand Up @@ -2367,10 +2370,8 @@ func BenchmarkIterator_RangeKeyMasking(b *testing.B) {
// range keys we wrote.
b.StopTimer()
require.NoError(b, d.Close())
mem.ResetToSyncedState()
})
}

}

func BenchmarkIteratorScan(b *testing.B) {
Expand Down
4 changes: 2 additions & 2 deletions metamorphic/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func parseOptions(
return true
case "TestOptions.strictfs":
opts.strictFS = true
opts.Opts.FS = vfs.NewStrictMem()
opts.Opts.FS = vfs.NewCrashableMem()
return true
case "TestOptions.ingest_using_apply":
opts.ingestUsingApply = true
Expand Down Expand Up @@ -755,7 +755,7 @@ func RandomOptions(
testOpts.Threads = rng.Intn(runtime.GOMAXPROCS(0)) + 1
if testOpts.strictFS {
opts.DisableWAL = false
opts.FS = vfs.NewStrictMem()
opts.FS = vfs.NewCrashableMem()
} else if !testOpts.useDisk {
opts.FS = vfs.NewMem()
}
Expand Down
13 changes: 7 additions & 6 deletions metamorphic/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,9 +286,8 @@ func (t *Test) restartDB(dbID objID) error {
return nil
}
t.opts.Cache.Ref()

memFS := vfs.Root(t.opts.FS).(*vfs.MemFS)
memFS.SetIgnoreSyncs(true)
fs := vfs.Root(t.opts.FS).(*vfs.MemFS)
crashFS := fs.CrashClone(vfs.CrashCloneCfg{UnsyncedDataPercent: 0})
if err := db.Close(); err != nil {
return err
}
Expand All @@ -300,9 +299,11 @@ func (t *Test) restartDB(dbID objID) error {
return err
}
}

memFS.ResetToSyncedState()
memFS.SetIgnoreSyncs(false)
t.opts.FS = crashFS
t.opts.WithFSDefaults()
if t.opts.WALFailover != nil {
t.opts.WALFailover.Secondary.FS = t.opts.FS
}

// TODO(jackson): Audit errorRate and ensure custom options' hooks semantics
// are well defined within the context of retries.
Expand Down
9 changes: 6 additions & 3 deletions objstorage/objstorageprovider/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@ func TestParallelSync(t *testing.T) {
name = "shared"
}
t.Run(name, func(t *testing.T) {
fs := vfs.NewStrictMem()
fs := vfs.NewCrashableMem()
st := DefaultSettings(fs, "")
st.Remote.StorageFactory = remote.MakeSimpleFactory(map[remote.Locator]remote.Storage{
"": remote.NewInMem(),
Expand Down Expand Up @@ -660,6 +660,7 @@ func TestParallelSync(t *testing.T) {
}
mustExist := make(map[base.DiskFileNum]struct{})
// "Crash" at a random time.
var crashFS *vfs.MemFS
time.AfterFunc(time.Duration(rand.Int63n(int64(10*time.Millisecond))), func() {
defer wg.Done()
if shared {
Expand All @@ -673,7 +674,7 @@ func TestParallelSync(t *testing.T) {
for n := range mustExistMu.m {
mustExist[n] = struct{}{}
}
fs.SetIgnoreSyncs(true)
crashFS = fs.CrashClone(vfs.CrashCloneCfg{})
mustExistMu.Unlock()
stop.Store(true)
})
Expand All @@ -682,8 +683,10 @@ func TestParallelSync(t *testing.T) {
// Now close the provider, reset the filesystem, and check that all files
// we expect to exist are there.
require.NoError(t, p.Close())
fs.ResetToSyncedState()

if !shared {
st.FS = crashFS
}
p, err = Open(st)
require.NoError(t, err)
// Check that all objects exist and can be opened.
Expand Down
Loading

0 comments on commit 5baaf71

Please sign in to comment.