From 4cc8637676a11b95b1c362af5bb9e514b0ffdc40 Mon Sep 17 00:00:00 2001 From: Jackson Owens Date: Tue, 27 Aug 2024 16:46:27 -0400 Subject: [PATCH] db: add TestWALFailoverRandomized Add a randomized test of WAL failover and recovery of a DB from failover WAL logs. Close #3865. --- open_test.go | 170 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 170 insertions(+) diff --git a/open_test.go b/open_test.go index 0b2e0ba277..fff9024b4d 100644 --- a/open_test.go +++ b/open_test.go @@ -7,8 +7,11 @@ package pebble import ( "bytes" "context" + "encoding/binary" "fmt" "io" + "math" + "math/rand" "os" "path/filepath" "reflect" @@ -18,12 +21,15 @@ import ( "sort" "strconv" "strings" + "sync" "sync/atomic" "syscall" "testing" + "time" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/errors" + "github.com/cockroachdb/metamorphic" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/cache" "github.com/cockroachdb/pebble/internal/manifest" @@ -1525,3 +1531,167 @@ func TestMkdirAllAndSyncParents(t *testing.T) { } }) } + +// TestWALFailoverRandomized is a randomzied test exercising recovery in the +// presence of WAL failover. It repeatedly opens a database, writes a number of +// batches concurrently and simulates a hard crash using vfs.NewCrashableMem. It +// ensures that the resulting DB state opens successfully, and the contents of +// the DB match the expectations based on the keys written. +// +// This test is partially a regression test for #3865. +func TestWALFailoverRandomized(t *testing.T) { + seed := time.Now().UnixNano() + t.Logf("seed %d", seed) + mem := vfs.NewCrashableMem() + makeOptions := func(mem *vfs.MemFS) *Options { + failoverOpts := WALFailoverOptions{ + Secondary: wal.Dir{FS: mem, Dirname: "secondary"}, + FailoverOptions: wal.FailoverOptions{ + PrimaryDirProbeInterval: time.Microsecond, + HealthyProbeLatencyThreshold: 20 * time.Microsecond, + HealthyInterval: 10 * time.Microsecond, + UnhealthySamplingInterval: time.Microsecond, + UnhealthyOperationLatencyThreshold: func() (time.Duration, bool) { + return 10 * time.Microsecond, true + }, + ElevatedWriteStallThresholdLag: 50 * time.Microsecond, + }, + } + + mean := time.Duration(rand.ExpFloat64() * float64(time.Microsecond)) + p := rand.Float64() + t.Logf("Injecting mean %s of latency with p=%.3f", mean, p) + fs := errorfs.Wrap(mem, errorfs.RandomLatency(errorfs.Randomly(p, seed), mean, seed, time.Second)) + return &Options{ + FS: fs, + FormatMajorVersion: internalFormatNewest, + Logger: testLogger{t}, + MemTableSize: 128 << 10, // 128 KiB + MemTableStopWritesThreshold: 4, + WALFailover: &failoverOpts, + } + } + + // KV state tracking. + // + // This test uses all uint16 big-endian integers as a keyspace. Values are + // randomly sized but always contain the key in the first two bytes. We + // track the state of all KVs throughout the test (whether they're + // definitely set, maybe set or definitely unset). + // + // Note that the test may wrap around to the beginning of the keyspace. This + // may cause KVs left at kvMaybeSet to be written and be definitively set + // the second time around. + type kvState int8 + const ( + kvUnset kvState = 0 + kvMaybeSet kvState = 1 + kvSet kvState = 2 + ) + const keyspaceSize = math.MaxUint16 + 1 + var kvs struct { + sync.Mutex + states [keyspaceSize]kvState + count uint64 // [0, math.MaxUint16]; INVARIANT: states[count:] all zeroes + crashing bool + } + setIsCrashing := func(crashing bool) { + kvs.Lock() + defer kvs.Unlock() + kvs.crashing = crashing + } + // transitionState is called by goroutines responsible for committing + // batches to the engine. Note that 'i' is the index of the KV before + // wrapping around and needs to be modded by math.MaxUint16. + transitionState := func(i, count uint64, state kvState) { + kvs.Lock() + defer kvs.Unlock() + if kvs.crashing && state == kvSet { + // We're racing with a CrashClone call and it's indeterminate + // whether what we think we synced actually made the cut. Leave the + // kvs at the kvMaybeSet. + state = kvMaybeSet + } + for j := uint64(0); j < count; j++ { + idx := (i + j) % keyspaceSize + kvs.states[idx] = max(kvs.states[idx], state) + } + kvs.count = max(kvs.count, i+count, math.MaxUint16) + } + // validateState is called on recovery to ensure that engine state agrees + // with the tracked KV state. + validateState := func(d *DB) { + it, err := d.NewIter(nil) + require.NoError(t, err) + valid := it.First() + for i := 0; i < int(kvs.count); i++ { + var kvIsSet bool + if valid { + require.Len(t, it.Key(), 2) + require.Equal(t, it.Key(), it.Value()[:2]) + kvIsSet = binary.BigEndian.Uint16(it.Key()) == uint16(i) + } + if kvIsSet && kvs.states[i] == kvUnset { + t.Fatalf("key %04x is set; state says it should be unset", i) + } else if !kvIsSet && kvs.states[i] == kvSet { + t.Fatalf("key %04x is unset; state says it should be set", i) + } + if kvIsSet { + valid = it.Next() + } + } + require.NoError(t, it.Close()) + } + + d, err := Open("primary", makeOptions(mem)) + require.NoError(t, err) + rng := rand.New(rand.NewSource(seed)) + var wg sync.WaitGroup + var n uint64 + randomOps := metamorphic.Weighted[func()]{ + {Weight: 1, Item: func() { + time.Sleep(time.Microsecond * time.Duration(rand.Intn(30))) + t.Log("initiating hard crash") + setIsCrashing(true) + // Take a crash-consistent clone of the filesystem and use that going forward. + mem = mem.CrashClone(vfs.CrashCloneCfg{UnsyncedDataPercent: 50, RNG: rng}) + wg.Wait() // Wait for outstanding batch commits to finish. + _ = d.Close() + d, err = Open("primary", makeOptions(mem)) + require.NoError(t, err) + validateState(d) + setIsCrashing(false) + }}, + {Weight: 20, Item: func() { + count := rng.Intn(14) + 1 + var k [2]byte + var v [4096]byte + b := d.NewBatch() + for i := 0; i < count; i++ { + j := uint16((n + uint64(i)) % keyspaceSize) + binary.BigEndian.PutUint16(k[:], j) + vn := max(rng.Intn(cap(v)), 2) + binary.BigEndian.PutUint16(v[:], j) + require.NoError(t, b.Set(k[:], v[:vn], nil)) + } + maybeSync := NoSync + if rng.Intn(2) == 1 { + maybeSync = Sync + } + wg.Add(1) + go func(n, count uint64) { + defer wg.Done() + transitionState(n, count, kvMaybeSet) + require.NoError(t, b.Commit(maybeSync)) + if maybeSync == Sync { + transitionState(n, count, kvSet) + } + }(n, uint64(count)) + n += uint64(count) + }}, + } + nextRandomOp := randomOps.RandomDeck(rng) + for o := 0; o < 1000; o++ { + nextRandomOp()() + } +}