diff --git a/dgraphtest/config.go b/dgraphtest/config.go index 5cc7567950f..2b414d68d58 100644 --- a/dgraphtest/config.go +++ b/dgraphtest/config.go @@ -100,23 +100,25 @@ func AllUpgradeCombos(v20 bool) []UpgradeCombo { // ClusterConfig stores all config for setting up a dgraph cluster type ClusterConfig struct { - prefix string - numAlphas int - numZeros int - replicas int - verbosity int - acl bool - aclTTL time.Duration - aclAlg jwt.SigningMethod - encryption bool - version string - volumes map[string]string - refillInterval time.Duration - uidLease int - portOffset int // exposed port offset for grpc/http port for both alpha/zero - bulkOutDir string - featureFlags []string - customPlugins bool + prefix string + numAlphas int + numZeros int + replicas int + verbosity int + acl bool + aclTTL time.Duration + aclAlg jwt.SigningMethod + encryption bool + version string + volumes map[string]string + refillInterval time.Duration + uidLease int + portOffset int // exposed port offset for grpc/http port for both alpha/zero + bulkOutDir string + featureFlags []string + customPlugins bool + snapShotAfterEntries uint64 + snapshotAfterDuration time.Duration } // NewClusterConfig generates a default ClusterConfig @@ -237,3 +239,9 @@ func (cc ClusterConfig) WithCustomPlugins() ClusterConfig { func (cc ClusterConfig) GetClusterVolume(volume string) string { return cc.volumes[volume] } + +func (cc ClusterConfig) WithSnapshotConfig(snapShotAfterEntries uint64, snapshotAfterDuration time.Duration) ClusterConfig { + cc.snapShotAfterEntries = snapShotAfterEntries + cc.snapshotAfterDuration = snapshotAfterDuration + return cc +} diff --git a/dgraphtest/dgraph.go b/dgraphtest/dgraph.go index 80d615e4043..1cfbe5058a9 100644 --- a/dgraphtest/dgraph.go +++ b/dgraphtest/dgraph.go @@ -94,6 +94,7 @@ type dnode interface { assignURL(*LocalCluster) (string, error) alphaURL(*LocalCluster) (string, error) zeroURL(*LocalCluster) (string, error) + changeStatus(bool) } type zero struct { @@ -101,6 +102,7 @@ type zero struct { containerID string // container ID in docker world containerName string // something like test-1234_zero2 aliasName string // something like alpha0, zero1 + isRunning bool } func (z *zero) cname() string { @@ -175,6 +177,10 @@ func (z *zero) healthURL(c *LocalCluster) (string, error) { return "http://localhost:" + publicPort + "/health", nil } +func (z *zero) changeStatus(isRunning bool) { + z.isRunning = isRunning +} + func (z *zero) assignURL(c *LocalCluster) (string, error) { publicPort, err := publicPort(c.dcli, z, zeroHttpPort) if err != nil { @@ -200,6 +206,7 @@ type alpha struct { containerID string containerName string aliasName string + isRunning bool } func (a *alpha) cname() string { @@ -283,6 +290,12 @@ func (a *alpha) cmd(c *LocalCluster) []string { acmd = append(acmd, fmt.Sprintf("--custom_tokenizers=%s", c.customTokenizers)) } + if c.conf.snapShotAfterEntries != 0 { + acmd = append(acmd, fmt.Sprintf("--raft=%s", + fmt.Sprintf(`snapshot-after-entries=%v;snapshot-after-duration=%v;`, + c.conf.snapShotAfterEntries, c.conf.snapshotAfterDuration))) + } + return acmd } @@ -351,6 +364,10 @@ func (a *alpha) alphaURL(c *LocalCluster) (string, error) { return "localhost:" + publicPort + "", nil } +func (a *alpha) changeStatus(isRunning bool) { + a.isRunning = isRunning +} + func (a *alpha) zeroURL(c *LocalCluster) (string, error) { return "", errNotImplemented } diff --git a/dgraphtest/load.go b/dgraphtest/load.go index a65f916c8b0..ef2d917ce3e 100644 --- a/dgraphtest/load.go +++ b/dgraphtest/load.go @@ -33,6 +33,7 @@ import ( "github.com/pkg/errors" + "github.com/dgraph-io/dgo/v230/protos/api" "github.com/dgraph-io/dgraph/ee/enc" "github.com/dgraph-io/dgraph/x" ) @@ -493,3 +494,17 @@ func (c *LocalCluster) BulkLoad(opts BulkOpts) error { return nil } } + +// AddData will insert a total of end-start triples into the database. +func AddData(gc *GrpcClient, pred string, start, end int) error { + if err := gc.SetupSchema(fmt.Sprintf(`%v: string @index(exact) .`, pred)); err != nil { + return err + } + + rdf := "" + for i := start; i <= end; i++ { + rdf = rdf + fmt.Sprintf("_:a%v <%v> \"%v%v\" . \n", i, pred, pred, i) + } + _, err := gc.Mutate(&api.Mutation{SetNquads: []byte(rdf), CommitNow: true}) + return err +} diff --git a/dgraphtest/local_cluster.go b/dgraphtest/local_cluster.go index 375299b13d3..e11c1f0b470 100644 --- a/dgraphtest/local_cluster.go +++ b/dgraphtest/local_cluster.go @@ -352,6 +352,7 @@ func (c *LocalCluster) startContainer(dc dnode) error { if err := c.dcli.ContainerStart(ctx, dc.cid(), types.ContainerStartOptions{}); err != nil { return errors.Wrapf(err, "error starting container [%v]", dc.cname()) } + dc.changeStatus(true) return nil } @@ -398,6 +399,7 @@ func (c *LocalCluster) stopContainer(dc dnode) error { } return errors.Wrapf(err, "error stopping container [%v]", dc.cname()) } + dc.changeStatus(false) return nil } @@ -420,6 +422,9 @@ func (c *LocalCluster) killContainer(dc dnode) error { func (c *LocalCluster) HealthCheck(zeroOnly bool) error { log.Printf("[INFO] checking health of containers") for _, zo := range c.zeros { + if !zo.isRunning { + break + } url, err := zo.healthURL(c) if err != nil { return errors.Wrap(err, "error getting health URL") @@ -438,6 +443,9 @@ func (c *LocalCluster) HealthCheck(zeroOnly bool) error { } for _, aa := range c.alphas { + if !aa.isRunning { + break + } url, err := aa.healthURL(c) if err != nil { return errors.Wrap(err, "error getting health URL") @@ -688,8 +696,11 @@ func (c *LocalCluster) Client() (*GrpcClient, func(), error) { // TODO(aman): can we cache the connections? var apiClients []api.DgraphClient var conns []*grpc.ClientConn - for i := 0; i < c.conf.numAlphas; i++ { - url, err := c.alphas[i].alphaURL(c) + for _, aa := range c.alphas { + if !aa.isRunning { + break + } + url, err := aa.alphaURL(c) if err != nil { return nil, nil, errors.Wrap(err, "error getting health URL") } diff --git a/dgraphtest/snapshot.go b/dgraphtest/snapshot.go new file mode 100644 index 00000000000..356c1edcb07 --- /dev/null +++ b/dgraphtest/snapshot.go @@ -0,0 +1,73 @@ +/* + * Copyright 2023 Dgraph Labs, Inc. and Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dgraphtest + +import ( + "encoding/json" + "time" + + "github.com/pkg/errors" +) + +func (hc *HTTPClient) GetCurrentSnapshotTs(group uint64) (uint64, error) { + snapTsRequest := `query { + state { + groups { + id + snapshotTs + } + } + }` + params := GraphQLParams{ + Query: snapTsRequest, + } + resp, err := hc.RunGraphqlQuery(params, true) + if err != nil { + return 0, err + } + + var stateResp struct { + State struct { + Groups []struct { + SnapshotTs uint64 + } + } + } + + err = json.Unmarshal(resp, &stateResp) + if err != nil { + return 0, err + } + + return stateResp.State.Groups[group-1].SnapshotTs, nil +} + +func (hc *HTTPClient) WaitForSnapshot(group, prevSnapshotTs uint64) (uint64, error) { + + for i := 1; i <= 100; i++ { + currentSnapshotTs, err := hc.GetCurrentSnapshotTs(group) + if err != nil { + errors.Wrapf(err, "error while getting current snapshot timestamp: %v", err) + } + if currentSnapshotTs > prevSnapshotTs { + return currentSnapshotTs, nil + } + + time.Sleep(time.Second) + } + return 0, errors.New("timeout excedded") +} diff --git a/raftwal/storage.go b/raftwal/storage.go index 27036d8c911..3a282776f66 100644 --- a/raftwal/storage.go +++ b/raftwal/storage.go @@ -313,6 +313,9 @@ func (w *DiskStorage) CreateSnapshot(i uint64, cs *raftpb.ConfState, data []byte return nil } +// TODO(Aman): In the raft example here, we store the snapshot first, followed by entries +// and then, hard state. https://github.com/etcd-io/etcd/blob/main/contrib/raftexample/raft.go +// We should do the same here. // Save would write Entries, HardState and Snapshot to persistent storage in order, i.e. Entries // first, then HardState and Snapshot if they are not empty. If persistent storage supports atomic // writes then all of them can be written together. Note that when writing an Entry with Index i, diff --git a/systest/integration2/snapshot_test.go b/systest/integration2/snapshot_test.go new file mode 100644 index 00000000000..67de8184731 --- /dev/null +++ b/systest/integration2/snapshot_test.go @@ -0,0 +1,68 @@ +//go:build integration2 + +/* + * Copyright 2023 Dgraph Labs, Inc. and Contributors * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "context" + "testing" + "time" + + "github.com/dgraph-io/dgraph/dgraphtest" + "github.com/dgraph-io/dgraph/x" + "github.com/stretchr/testify/require" +) + +func TestSnapshotTranferAfterNewNodeJoins(t *testing.T) { + conf := dgraphtest.NewClusterConfig().WithNumAlphas(3).WithNumZeros(1). + WithACL(time.Hour).WithReplicas(3).WithSnapshotConfig(11, time.Second) + c, err := dgraphtest.NewLocalCluster(conf) + require.NoError(t, err) + defer func() { c.Cleanup(t.Failed()) }() + + // start zero + require.NoError(t, c.StartZero(0)) + require.NoError(t, c.HealthCheck(true)) + require.NoError(t, c.StartAlpha(0)) + require.NoError(t, c.HealthCheck(false)) + + hc, err := c.HTTPClient() + require.NoError(t, err) + hc.LoginIntoNamespace(dgraphtest.DefaultUser, dgraphtest.DefaultPassword, x.GalaxyNamespace) + + gc, cleanup, err := c.Client() + require.NoError(t, err) + defer cleanup() + require.NoError(t, gc.LoginIntoNamespace(context.Background(), + dgraphtest.DefaultUser, dgraphtest.DefaultPassword, x.GalaxyNamespace)) + + prevSnapshotTs, err := hc.GetCurrentSnapshotTs(1) + require.NoError(t, err) + + dgraphtest.AddData(gc, "name", 1, 20) + + _, err = hc.WaitForSnapshot(1, prevSnapshotTs) + require.NoError(t, err) + + require.NoError(t, c.StartAlpha(1)) + require.NoError(t, c.StartAlpha(2)) + + // Wait for the other alpha nodes to receive the snapshot from the leader alpha. + // If they are healthy, there should be no issues with the snapshot streaming + time.Sleep(time.Second) + require.NoError(t, c.HealthCheck(false)) +} diff --git a/systest/online-restore/namespace-aware/restore_test.go b/systest/online-restore/namespace-aware/restore_test.go index 18711c3376f..32fda468f4a 100644 --- a/systest/online-restore/namespace-aware/restore_test.go +++ b/systest/online-restore/namespace-aware/restore_test.go @@ -25,23 +25,22 @@ import ( "github.com/stretchr/testify/require" - "github.com/dgraph-io/dgo/v230/protos/api" "github.com/dgraph-io/dgraph/dgraphtest" "github.com/dgraph-io/dgraph/x" ) -func addData(gc *dgraphtest.GrpcClient, pred string, start, end int) error { - if err := gc.SetupSchema(fmt.Sprintf(`%v: string @index(exact) .`, pred)); err != nil { - return err - } +// func addData(gc *dgraphtest.GrpcClient, pred string, start, end int) error { +// if err := gc.SetupSchema(fmt.Sprintf(`%v: string @index(exact) .`, pred)); err != nil { +// return err +// } - rdf := "" - for i := start; i <= end; i++ { - rdf = rdf + fmt.Sprintf("_:a%v <%v> \"%v%v\" . \n", i, pred, pred, i) - } - _, err := gc.Mutate(&api.Mutation{SetNquads: []byte(rdf), CommitNow: true}) - return err -} +// rdf := "" +// for i := start; i <= end; i++ { +// rdf = rdf + fmt.Sprintf("_:a%v <%v> \"%v%v\" . \n", i, pred, pred, i) +// } +// _, err := gc.Mutate(&api.Mutation{SetNquads: []byte(rdf), CommitNow: true}) +// return err +// } func commonTest(t *testing.T, existingCluster, freshCluster *dgraphtest.LocalCluster) { hc, err := existingCluster.HTTPClient() @@ -54,14 +53,14 @@ func commonTest(t *testing.T, existingCluster, freshCluster *dgraphtest.LocalClu require.NoError(t, gc.Login(context.Background(), dgraphtest.DefaultUser, dgraphtest.DefaultPassword)) namespaces := []uint64{0} - require.NoError(t, addData(gc, "pred", 1, 100)) + require.NoError(t, dgraphtest.AddData(gc, "pred", 1, 100)) for i := 1; i <= 5; i++ { ns, err := hc.AddNamespace() require.NoError(t, err) namespaces = append(namespaces, ns) require.NoError(t, gc.LoginIntoNamespace(context.Background(), dgraphtest.DefaultUser, dgraphtest.DefaultPassword, ns)) - require.NoError(t, addData(gc, "pred", 1, 100+int(ns))) + require.NoError(t, dgraphtest.AddData(gc, "pred", 1, 100+int(ns))) } require.NoError(t, hc.LoginIntoNamespace(dgraphtest.DefaultUser, dgraphtest.DefaultPassword, x.GalaxyNamespace)) @@ -118,7 +117,7 @@ func commonIncRestoreTest(t *testing.T, existingCluster, freshCluster *dgraphtes require.NoError(t, gc.Login(context.Background(), dgraphtest.DefaultUser, dgraphtest.DefaultPassword)) require.NoError(t, gc.DropAll()) - require.NoError(t, addData(gc, "pred", 1, 100)) + require.NoError(t, dgraphtest.AddData(gc, "pred", 1, 100)) namespaces := []uint64{} for i := 1; i <= 5; i++ { @@ -133,7 +132,7 @@ func commonIncRestoreTest(t *testing.T, existingCluster, freshCluster *dgraphtes dgraphtest.DefaultUser, dgraphtest.DefaultPassword, ns)) start := i*20 + 1 end := (i + 1) * 20 - require.NoError(t, addData(gc, "pred", start, end)) + require.NoError(t, dgraphtest.AddData(gc, "pred", start, end)) } require.NoError(t, hc.LoginIntoNamespace(dgraphtest.DefaultUser, dgraphtest.DefaultPassword, x.GalaxyNamespace)) diff --git a/worker/draft.go b/worker/draft.go index e4a38aaa224..356e6fdf040 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -1252,6 +1252,13 @@ func (n *node) Run() { if leader { // Leader can send messages in parallel with writing to disk. for i := range rd.Messages { + // When there is a `raftpb.EntryConfChange` after creating the snapshot, + // then the confState included in the snapshot is out of date. so We need + // to update the confState before sending a snapshot to a follower. + if rd.Messages[i].Type == raftpb.MsgSnap { + rd.Messages[i].Snapshot.Metadata.ConfState = *n.ConfState() + } + // NOTE: We can do some optimizations here to drop messages. n.Send(&rd.Messages[i]) } @@ -1338,6 +1345,7 @@ func (n *node) Run() { raft.IsEmptySnap(rd.Snapshot), raft.IsEmptyHardState(rd.HardState)) } + for x.WorkerConfig.HardSync && rd.MustSync { if err := n.Store.Sync(); err != nil { glog.Errorf("Error while calling Store.Sync: %+v", err) @@ -1362,7 +1370,6 @@ func (n *node) Run() { switch { case entry.Type == raftpb.EntryConfChange: n.applyConfChange(entry) - // Not present in proposal map. n.Applied.Done(entry.Index) groups().triggerMembershipSync() case len(entry.Data) == 0: @@ -1405,6 +1412,12 @@ func (n *node) Run() { if !leader { // Followers should send messages later. for i := range rd.Messages { + // When there is a `raftpb.EntryConfChange` after creating the snapshot, + // then the confState included in the snapshot is out of date. so We need + // to update the confState before sending a snapshot to a follower. + if rd.Messages[i].Type == raftpb.MsgSnap { + rd.Messages[i].Snapshot.Metadata.ConfState = *n.ConfState() + } // NOTE: We can do some optimizations here to drop messages. n.Send(&rd.Messages[i]) }