Skip to content

Commit

Permalink
Create a v2 snapshot when running etcdutl migrate command
Browse files Browse the repository at this point in the history
Also added test to cover the etcdutl migrate command

Signed-off-by: Benjamin Wang <benjamin.ahrtr@gmail.com>
  • Loading branch information
ahrtr committed Jan 15, 2025
1 parent 2dbb689 commit 1180f3a
Show file tree
Hide file tree
Showing 5 changed files with 377 additions and 29 deletions.
90 changes: 90 additions & 0 deletions etcdutl/etcdutl/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,19 @@ package etcdutl

import (
"errors"
"fmt"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.etcd.io/etcd/client/pkg/v3/logutil"
"go.etcd.io/etcd/pkg/v3/cobrautl"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/datadir"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.etcd.io/etcd/server/v3/storage/wal"
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
"go.etcd.io/raft/v3/raftpb"
Expand Down Expand Up @@ -68,3 +73,88 @@ func getLatestV2Snapshot(lg *zap.Logger, dataDir string) (*raftpb.Snapshot, erro

return snapshot, nil
}

func createV2SnapshotFromV3Store(dataDir string, be backend.Backend) error {
var (
lg = GetLogger()

snapDir = datadir.ToSnapDir(dataDir)
walDir = datadir.ToWALDir(dataDir)
)

ci, term := schema.ReadConsistentIndex(be.ReadTx())

cl := membership.NewCluster(lg)
cl.SetBackend(schema.NewMembershipBackend(lg, be))
cl.UnsafeLoad()

latestWALSnap, err := getLatestWALSnap(lg, dataDir)
if err != nil {
return err

Check warning on line 93 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L93

Added line #L93 was not covered by tests
}

// Each time before creating the v2 snapshot, etcdserve always flush
// the backend storage (bbolt db), so the consistent index should never
// less than the Index or term of the latest snapshot.
if ci < latestWALSnap.Index || term < latestWALSnap.Term {
// This should never happen
return fmt.Errorf("consistent_index [Index: %d, Term: %d] is less than the latest snapshot [Index: %d, Term: %d]", ci, term, latestWALSnap.Index, latestWALSnap.Term)
}

voters, learners := getVotersAndLearners(cl)
confState := raftpb.ConfState{
Voters: voters,
Learners: learners,
}

// create the v2 snaspshot file
raftSnap := raftpb.Snapshot{
Data: etcdserver.GetMembershipInfoInV2Format(lg, cl),
Metadata: raftpb.SnapshotMetadata{
Index: ci,
Term: term,
ConfState: confState,
},
}
sn := snap.New(lg, snapDir)
if err = sn.SaveSnap(raftSnap); err != nil {
return err

Check warning on line 121 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L121

Added line #L121 was not covered by tests
}

// save WAL snapshot record
w, err := wal.Open(lg, walDir, latestWALSnap)
if err != nil {
return err

Check warning on line 127 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L127

Added line #L127 was not covered by tests
}
defer w.Close()
// We must read all records to locate the tail of the last valid WAL file.
_, st, _, err := w.ReadAll()
if err != nil {
return err

Check warning on line 133 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L133

Added line #L133 was not covered by tests
}

if err := w.SaveSnapshot(walpb.Snapshot{Index: ci, Term: term, ConfState: &confState}); err != nil {
return err

Check warning on line 137 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L137

Added line #L137 was not covered by tests
}
if err := w.Save(raftpb.HardState{Term: term, Commit: ci, Vote: st.Vote}, nil); err != nil {
return err

Check warning on line 140 in etcdutl/etcdutl/common.go

View check run for this annotation

Codecov / codecov/patch

etcdutl/etcdutl/common.go#L140

Added line #L140 was not covered by tests
}
return w.Sync()
}

// getVotersAndLearners splits the members of cl into two ID lists: the first
// return value holds the IDs of non-learner (voting) members, the second the
// IDs of learners. A list with no entries is returned as nil.
func getVotersAndLearners(cl *membership.RaftCluster) ([]uint64, []uint64) {
	var voterIDs, learnerIDs []uint64

	for _, member := range cl.Members() {
		id := uint64(member.ID)
		if member.IsLearner {
			learnerIDs = append(learnerIDs, id)
		} else {
			voterIDs = append(voterIDs, id)
		}
	}

	return voterIDs, learnerIDs
}
191 changes: 191 additions & 0 deletions etcdutl/etcdutl/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,26 @@
package etcdutl

import (
"path"
"path/filepath"
"testing"

"github.com/coreos/go-semver/semver"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/client/pkg/v3/fileutil"
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/pkg/v3/pbutil"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/datadir"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.etcd.io/etcd/server/v3/storage/wal"
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
"go.etcd.io/raft/v3/raftpb"
Expand Down Expand Up @@ -141,3 +151,184 @@ func TestGetLatestWalSnap(t *testing.T) {
})
}
}

// TestCreateV2SnapshotFromV3Store verifies createV2SnapshotFromV3Store.
//
// Each case prepares a data directory containing a WAL and a v2 snapshot,
// both at Index 2 / Term 2, plus a bbolt db populated with the case's
// consistent index, term, cluster version and membership. It then runs
// createV2SnapshotFromV3Store and checks either the expected error, or that
// the newly created v2 snapshot carries the expected metadata, cluster
// version, members and removed members.
func TestCreateV2SnapshotFromV3Store(t *testing.T) {
	testCases := []struct {
		name            string
		consistentIndex uint64
		term            uint64
		clusterVersion  string
		members         []uint64
		learners        []uint64
		removedMembers  []uint64
		expectedErrMsg  string
	}{
		{
			name:            "unexpected term: less than the last snapshot.term",
			consistentIndex: 3,
			term:            1,
			expectedErrMsg:  "less than the latest snapshot",
		},
		{
			name:            "unexpected consistent index: less than the last snapshot.index",
			consistentIndex: 1,
			term:            3,
			expectedErrMsg:  "less than the latest snapshot",
		},
		{
			name:            "normal case",
			consistentIndex: 32,
			term:            4,
			clusterVersion:  "3.5.0",
			members:         []uint64{100, 200},
			learners:        []uint64{300},
			removedMembers:  []uint64{400, 500},
		},
		{
			name:            "empty cluster version",
			consistentIndex: 45,
			term:            4,
			clusterVersion:  "",
			members:         []uint64{110, 200},
			learners:        []uint64{350},
			removedMembers:  []uint64{450, 500},
		},
		{
			name:            "no learner",
			consistentIndex: 7,
			term:            5,
			clusterVersion:  "3.5.0",
			members:         []uint64{150, 200},
			removedMembers:  []uint64{450, 550},
		},
		{
			name:            "no removed members",
			consistentIndex: 34,
			term:            6,
			clusterVersion:  "3.7.0",
			members:         []uint64{160, 200},
			learners:        []uint64{300},
		},
		{
			name:            "no learner and removed members",
			consistentIndex: 19,
			term:            5,
			clusterVersion:  "3.6.0",
			members:         []uint64{120, 220},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			dataDir := t.TempDir()
			lg := zap.NewNop()

			require.NoError(t, fileutil.TouchDirAll(lg, datadir.ToMemberDir(dataDir)))
			require.NoError(t, fileutil.TouchDirAll(lg, datadir.ToWALDir(dataDir)))
			require.NoError(t, fileutil.TouchDirAll(lg, datadir.ToSnapDir(dataDir)))

			t.Log("Populate the wal file")
			w, err := wal.Create(lg, datadir.ToWALDir(dataDir), pbutil.MustMarshal(
				&etcdserverpb.Metadata{
					NodeID:    1,
					ClusterID: 2,
				},
			))
			require.NoError(t, err)
			err = w.SaveSnapshot(walpb.Snapshot{Index: 2, Term: 2, ConfState: &raftpb.ConfState{Voters: []uint64{1}}})
			require.NoError(t, err)
			err = w.Save(raftpb.HardState{Term: 2, Commit: 2, Vote: 1}, nil)
			require.NoError(t, err)
			err = w.Close()
			require.NoError(t, err)

			t.Log("Generate a v2 snapshot file")
			ss := snap.New(lg, datadir.ToSnapDir(dataDir))
			err = ss.SaveSnap(raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 2, Term: 2, ConfState: raftpb.ConfState{Voters: []uint64{1}}}})
			require.NoError(t, err)

			t.Log("Load and verify the latest v2 snapshot file")
			oldV2Snap, err := getLatestV2Snapshot(lg, dataDir)
			require.NoError(t, err)
			require.Equal(t, raftpb.SnapshotMetadata{Index: 2, Term: 2, ConfState: raftpb.ConfState{Voters: []uint64{1}}}, oldV2Snap.Metadata)

			t.Log("Prepare the bbolt db")
			be := backend.NewDefaultBackend(lg, filepath.Join(dataDir, "member/snap/db"))
			schema.CreateMetaBucket(be.BatchTx())
			schema.NewMembershipBackend(lg, be).MustCreateBackendBuckets()

			if len(tc.clusterVersion) > 0 {
				t.Logf("Populate the cluster version: %s", tc.clusterVersion)
				schema.NewMembershipBackend(lg, be).MustSaveClusterVersionToBackend(semver.New(tc.clusterVersion))
			} else {
				t.Log("Skip populating cluster version due to not provided")
			}

			tx := be.BatchTx()
			tx.LockOutsideApply()
			t.Log("Populate the consistent index and term")
			ci := cindex.NewConsistentIndex(be)
			ci.SetConsistentIndex(tc.consistentIndex, tc.term)
			ci.UnsafeSave(tx)
			tx.Unlock()

			t.Logf("Populate members: %d", len(tc.members))
			memberBackend := schema.NewMembershipBackend(lg, be)
			for _, mID := range tc.members {
				memberBackend.MustSaveMemberToBackend(&membership.Member{ID: types.ID(mID)})
			}

			t.Logf("Populate learner: %d", len(tc.learners))
			for _, mID := range tc.learners {
				memberBackend.MustSaveMemberToBackend(&membership.Member{ID: types.ID(mID), RaftAttributes: membership.RaftAttributes{IsLearner: true}})
			}

			t.Logf("Populate removed members: %d", len(tc.removedMembers))
			for _, mID := range tc.removedMembers {
				memberBackend.MustDeleteMemberFromBackend(types.ID(mID))
			}

			t.Log("Committing bbolt db")
			be.ForceCommit()
			require.NoError(t, be.Close())

			t.Log("Creating a new v2 snapshot file based on the v3 store")
			// Reopen the db for the function under test and make sure it is
			// closed again on every exit path (including the expected-error
			// early return), so the db file is released before the temp dir
			// is removed.
			v3Backend := backend.NewDefaultBackend(lg, filepath.Join(dataDir, "member/snap/db"))
			defer v3Backend.Close()
			err = createV2SnapshotFromV3Store(dataDir, v3Backend)
			if len(tc.expectedErrMsg) > 0 {
				require.ErrorContains(t, err, tc.expectedErrMsg)
				return
			}
			require.NoError(t, err)

			t.Log("Loading & verifying the new latest v2 snapshot file")
			newV2Snap, err := getLatestV2Snapshot(lg, dataDir)
			require.NoError(t, err)
			require.Equal(t, raftpb.SnapshotMetadata{Index: tc.consistentIndex, Term: tc.term, ConfState: raftpb.ConfState{Voters: tc.members, Learners: tc.learners}}, newV2Snap.Metadata)

			// Recover a v2 store from the snapshot data to inspect its content.
			st := v2store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix)
			require.NoError(t, st.Recovery(newV2Snap.Data))

			cv, err := st.Get(path.Join(etcdserver.StoreClusterPrefix, "version"), false, false)
			if len(tc.clusterVersion) > 0 {
				require.NoError(t, err)
				if !semver.New(*cv.Node.Value).Equal(*semver.New(tc.clusterVersion)) {
					t.Fatalf("Unexpected cluster version, got %s, want %s", semver.New(*cv.Node.Value).String(), tc.clusterVersion)
				}
			} else {
				require.ErrorContains(t, err, "Key not found")
			}

			// Learners are stored alongside voters under "members".
			members, err := st.Get(path.Join(etcdserver.StoreClusterPrefix, "members"), true, true)
			require.NoError(t, err)
			require.Len(t, members.Node.Nodes, len(tc.members)+len(tc.learners))

			removedMembers, err := st.Get(path.Join(etcdserver.StoreClusterPrefix, "removed_members"), true, true)
			if len(tc.removedMembers) > 0 {
				require.NoError(t, err)
				require.Len(t, removedMembers.Node.Nodes, len(tc.removedMembers))
			} else {
				require.ErrorContains(t, err, "Key not found")
			}
		})
	}
}
Loading

0 comments on commit 1180f3a

Please sign in to comment.