Skip to content

Commit

Permalink
db: ensure checkpointed database with WAL failover is openable
Browse files Browse the repository at this point in the history
This commit adapts Checkpoint to omit the [WAL Failover] section of the OPTIONS
file from the checkpoint's OPTIONS file. The WAL failover configuration is
specific to the original database. We want our checkpoints to be fully
encapsulated and complete, so we copy WAL files from both the primary WAL
directory and the failover secondary. This means a database opening the
checkpoint does not need the secondary directory to be provided as a
WALRecoveryDir.

With this commit, Checkpoint parses the old OPTIONS file and copies its
contents verbatim to a new file in the checkpointed directory. If the old
OPTIONS file had a WAL Failover configuration, its entire section is commented
out within the checkpoint.
  • Loading branch information
jbowens committed Nov 4, 2024
1 parent 29fdaf9 commit 1af22dc
Show file tree
Hide file tree
Showing 9 changed files with 623 additions and 68 deletions.
57 changes: 55 additions & 2 deletions checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package pebble

import (
"bytes"
"io"
"os"

Expand Down Expand Up @@ -239,10 +240,10 @@ func (d *DB) Checkpoint(
}

{
// Link or copy the OPTIONS.
// Copy the OPTIONS.
srcPath := base.MakeFilepath(fs, d.dirname, fileTypeOptions, optionsFileNum)
destPath := fs.PathJoin(destDir, fs.PathBase(srcPath))
ckErr = vfs.LinkOrCopy(fs, srcPath, destPath)
ckErr = copyCheckpointOptions(fs, srcPath, destPath)
if ckErr != nil {
return ckErr
}
Expand Down Expand Up @@ -373,6 +374,58 @@ func (d *DB) Checkpoint(
return ckErr
}

// copyCheckpointOptions copies an OPTIONS file, commenting out some options
// that existed on the original database but no longer apply to the checkpointed
// database. For example, the entire [WAL Failover] stanza is commented out
// because Checkpoint will copy all WAL segment files from both the primary and
// secondary WAL directories into the checkpoint.
func copyCheckpointOptions(fs vfs.FS, srcPath, dstPath string) error {
var buf bytes.Buffer
f, err := fs.Open(srcPath)
if err != nil {
return err
}
defer f.Close()
b, err := io.ReadAll(f)
if err != nil {
return err
}
// Copy the OPTIONS file verbatim, but commenting out the [WAL Failover]
// section.
err = parseOptions(string(b), parseOptionsFuncs{
visitNewSection: func(startOff, endOff int, section string) error {
if section == "WAL Failover" {
buf.WriteString("# ")
}
buf.Write(b[startOff:endOff])
return nil
},
visitKeyValue: func(startOff, endOff int, section, key, value string) error {
if section == "WAL Failover" {
buf.WriteString("# ")
}
buf.Write(b[startOff:endOff])
return nil
},
visitCommentOrWhitespace: func(startOff, endOff int, line string) error {
buf.Write(b[startOff:endOff])
return nil
},
})
if err != nil {
return err
}
nf, err := fs.Create(dstPath, vfs.WriteCategoryUnspecified)
if err != nil {
return err
}
_, err = io.Copy(nf, &buf)
if err != nil {
return err
}
return errors.CombineErrors(nf.Sync(), nf.Close())
}

func (d *DB) writeCheckpointManifest(
fs vfs.FS,
formatVers FormatMajorVersion,
Expand Down
89 changes: 65 additions & 24 deletions checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"bytes"
"context"
"fmt"
"io"
"math/rand/v2"
"runtime"
"sort"
Expand Down Expand Up @@ -35,22 +36,25 @@ func testCheckpointImpl(t *testing.T, ddFile string, createOnShared bool) {
mem := vfs.NewMem()
var memLog base.InMemLogger
remoteMem := remote.NewInMem()
opts := &Options{
FS: vfs.WithLogging(mem, memLog.Infof),
FormatMajorVersion: internalFormatNewest,
L0CompactionThreshold: 10,
DisableAutomaticCompactions: true,
Logger: testLogger{t},
}
opts.Experimental.EnableColumnarBlocks = func() bool { return true }
opts.Experimental.RemoteStorage = remote.MakeSimpleFactory(map[remote.Locator]remote.Storage{
"": remoteMem,
})
if createOnShared {
opts.Experimental.CreateOnShared = remote.CreateOnSharedAll
makeOptions := func() *Options {
opts := &Options{
FS: vfs.WithLogging(mem, memLog.Infof),
FormatMajorVersion: internalFormatNewest,
L0CompactionThreshold: 10,
DisableAutomaticCompactions: true,
Logger: testLogger{t},
}
opts.Experimental.EnableColumnarBlocks = func() bool { return true }
opts.Experimental.RemoteStorage = remote.MakeSimpleFactory(map[remote.Locator]remote.Storage{
"": remoteMem,
})
if createOnShared {
opts.Experimental.CreateOnShared = remote.CreateOnSharedAll
}
opts.DisableTableStats = true
opts.private.testingAlwaysWaitForCleanup = true
return opts
}
opts.DisableTableStats = true
opts.private.testingAlwaysWaitForCleanup = true

datadriven.RunTest(t, ddFile, func(t *testing.T, td *datadriven.TestData) string {
switch td.Cmd {
Expand All @@ -70,7 +74,7 @@ func testCheckpointImpl(t *testing.T, ddFile string, createOnShared bool) {
return memLog.String()

case "checkpoint":
if !(len(td.CmdArgs) == 2 || (len(td.CmdArgs) == 3 && td.CmdArgs[2].Key == "restrict")) {
if len(td.CmdArgs) < 2 {
return "checkpoint <db> <dir> [restrict=(start-end, ...)]"
}
var opts []CheckpointOption
Expand All @@ -93,6 +97,10 @@ func testCheckpointImpl(t *testing.T, ddFile string, createOnShared bool) {
if err := d.Checkpoint(td.CmdArgs[1].String(), opts...); err != nil {
return err.Error()
}
if td.HasArg("nondeterministic") {
memLog.Reset()
return ""
}
return memLog.String()

case "ingest-and-excise":
Expand Down Expand Up @@ -184,19 +192,19 @@ func testCheckpointImpl(t *testing.T, ddFile string, createOnShared bool) {
return fmt.Sprintf("%s\n", strings.Join(paths, "\n"))

case "open":
if len(td.CmdArgs) != 1 && len(td.CmdArgs) != 2 {
if len(td.CmdArgs) < 1 {
return "open <dir> [readonly]"
}
opts.ReadOnly = false
if len(td.CmdArgs) == 2 {
if td.CmdArgs[1].String() != "readonly" {
return "open <dir> [readonly]"
}
opts.ReadOnly = true
}
opts := makeOptions()
require.NoError(t, parseDBOptionsArgs(opts, td.CmdArgs[1:]))

memLog.Reset()
dir := td.CmdArgs[0].String()
if _, ok := dbs[dir]; ok {
require.NoError(t, dbs[dir].Close())
dbs[dir] = nil
}

d, err := Open(dir, opts)
if err != nil {
return err.Error()
Expand All @@ -208,6 +216,12 @@ func testCheckpointImpl(t *testing.T, ddFile string, createOnShared bool) {
return err.Error()
}
}
waitForCompactionsAndTableStats(d)

if td.HasArg("nondeterministic") {
memLog.Reset()
return ""
}
return memLog.String()

case "scan":
Expand All @@ -232,6 +246,33 @@ func testCheckpointImpl(t *testing.T, ddFile string, createOnShared bool) {
})
}

func TestCopyCheckpointOptions(t *testing.T) {
fs := vfs.NewMem()
datadriven.RunTest(t, "testdata/copy_checkpoint_options", func(t *testing.T, td *datadriven.TestData) string {
switch td.Cmd {
case "copy":
f, err := fs.Create("old", vfs.WriteCategoryUnspecified)
require.NoError(t, err)
_, err = io.WriteString(f, td.Input)
require.NoError(t, err)
require.NoError(t, f.Close())

if err := copyCheckpointOptions(fs, "old", "new"); err != nil {
return err.Error()
}

f, err = fs.Open("new")
require.NoError(t, err)
newFile, err := io.ReadAll(f)
require.NoError(t, err)
require.NoError(t, f.Close())
return string(newFile)
default:
panic(fmt.Sprintf("unrecognized command %q", td.Cmd))
}
})
}

func TestCheckpoint(t *testing.T) {
t.Run("shared=false", func(t *testing.T) {
testCheckpointImpl(t, "testdata/checkpoint", false /* createOnShared */)
Expand Down
12 changes: 12 additions & 0 deletions data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/pebble/vfs"
"github.com/cockroachdb/pebble/vfs/errorfs"
"github.com/cockroachdb/pebble/wal"
"github.com/ghemawat/stream"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -1473,6 +1474,8 @@ func parseDBOptionsArgs(opts *Options, args []datadriven.CmdArg) error {
default:
return errors.Newf("unrecognized Merger %q\n", cmdArg.Vals[0])
}
case "readonly":
opts.ReadOnly = true
case "target-file-sizes":
if len(opts.Levels) < len(cmdArg.Vals) {
opts.Levels = slices.Grow(opts.Levels, len(cmdArg.Vals)-len(opts.Levels))[0:len(cmdArg.Vals)]
Expand All @@ -1484,6 +1487,15 @@ func parseDBOptionsArgs(opts *Options, args []datadriven.CmdArg) error {
}
opts.Levels[i].TargetFileSize = size
}
case "wal-failover":
if v := cmdArg.Vals[0]; v == "off" || v == "disabled" {
opts.WALFailover = nil
continue
}
opts.WALFailover = &WALFailoverOptions{
Secondary: wal.Dir{FS: opts.FS, Dirname: cmdArg.Vals[0]},
}
opts.WALFailover.EnsureDefaults()
}
}
return nil
Expand Down
22 changes: 12 additions & 10 deletions open.go
Original file line number Diff line number Diff line change
Expand Up @@ -743,17 +743,19 @@ func GetVersion(dir string, fs vfs.FS) (string, error) {
if err != nil {
return "", err
}
err = parseOptions(string(data), func(section, key, value string) error {
switch {
case section == "Version":
switch key {
case "pebble_version":
version = value
case "rocksdb_version":
version = fmt.Sprintf("rocksdb v%s", value)
err = parseOptions(string(data), parseOptionsFuncs{
visitKeyValue: func(i, j int, section, key, value string) error {
switch {
case section == "Version":
switch key {
case "pebble_version":
version = value
case "rocksdb_version":
version = fmt.Sprintf("rocksdb v%s", value)
}
}
}
return nil
return nil
},
})
if err != nil {
return "", err
Expand Down
Loading

0 comments on commit 1af22dc

Please sign in to comment.