Skip to content

Commit

Permalink
fix: persistent/fsm issues
Browse files Browse the repository at this point in the history
  • Loading branch information
bubbajoe committed Jun 13, 2024
1 parent d577368 commit 4ed0500
Show file tree
Hide file tree
Showing 16 changed files with 172 additions and 137 deletions.
6 changes: 3 additions & 3 deletions functional-tests/raft_tests/raft_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ dgate-cli -f domain create name=dm-$id \

dgate-cli -f service create \
name=svc-$id namespace=ns-$id \
urls="http://localhost:8888/$RANDOM"
urls="http://localhost:8081/$RANDOM"

dgate-cli -f route create \
name=rt-$id \
Expand All @@ -55,14 +55,14 @@ for i in {1..5}; do
done

if dgate-cli --admin $ADMIN_URL4 namespace create name=0; then
echo "Expected error when creating namespace"
echo "Expected error when creating namespace on non-voter"
exit 1
fi

export DGATE_ADMIN_API=$ADMIN_URL5

if dgate-cli --admin $ADMIN_URL5 namespace create name=0; then
echo "Expected error when creating namespace"
echo "Expected error when creating namespace on non-voter"
exit 1
fi

Expand Down
2 changes: 1 addition & 1 deletion functional-tests/raft_tests/test1.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
version: v1
log_level: info

debug: true
tags:
- "dev"
- "internal"
Expand Down
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ require (
github.com/knadh/koanf/v2 v2.0.1
github.com/mitchellh/mapstructure v1.5.0
github.com/prometheus/client_golang v1.19.0
github.com/rs/zerolog v1.31.0
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1
github.com/spf13/pflag v1.0.5
github.com/stoewer/go-strcase v1.3.0
Expand All @@ -31,8 +30,8 @@ require (
go.opentelemetry.io/otel/exporters/prometheus v0.48.0
go.opentelemetry.io/otel/metric v1.26.0
go.opentelemetry.io/otel/sdk/metric v1.26.0
go.uber.org/zap v1.27.0
golang.org/x/net v0.21.0
golang.org/x/sync v0.6.0
golang.org/x/term v0.19.0
)

Expand Down Expand Up @@ -81,7 +80,6 @@ require (
go.opentelemetry.io/otel/sdk v1.26.0 // indirect
go.opentelemetry.io/otel/trace v1.26.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
Expand Down
10 changes: 2 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ github.com/clarkmcc/go-typescript v0.7.0 h1:3nVeaPYyTCWjX6Lf8GoEOTxME2bM5tLuWmwh
github.com/clarkmcc/go-typescript v0.7.0/go.mod h1:IZ/nzoVeydAmyfX7l6Jmp8lJDOEnae3jffoXwP4UyYg=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w=
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
Expand Down Expand Up @@ -78,7 +77,6 @@ github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre
github.com/go-sourcemap/sourcemap v2.1.3+incompatible h1:W1iEw64niKVGogNgBN3ePyLFfuisuzeidWPMPWmECqU=
github.com/go-sourcemap/sourcemap v2.1.3+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
Expand Down Expand Up @@ -228,9 +226,6 @@ github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3c
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rogpeppe/go-internal v1.11.1-0.20231026093722-fa6a31e0812c h1:fPpdjePK1atuOg28PXfNSqgwf9I/qD1Hlo39JFwKBXk=
github.com/rogpeppe/go-internal v1.11.1-0.20231026093722-fa6a31e0812c/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.31.0 h1:FcTR3NnLWW+NnTwwhFWiJSZr4ECLpqCm6QsEnyvbV4A=
github.com/rs/zerolog v1.31.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 h1:lZUw3E0/J3roVtGQ+SCrUrg3ON6NgVqpn3+iol9aGu4=
Expand Down Expand Up @@ -278,6 +273,8 @@ go.opentelemetry.io/otel/sdk/metric v1.26.0 h1:cWSks5tfriHPdWFnl+qpX3P681aAYqlZH
go.opentelemetry.io/otel/sdk/metric v1.26.0/go.mod h1:ClMFFknnThJCksebJwz7KIyEDHO+nTB6gK8obLy8RyE=
go.opentelemetry.io/otel/trace v1.26.0 h1:1ieeAUb4y0TE26jUFrCIXKpTuVK7uJGN9/Z/2LP5sQA=
go.opentelemetry.io/otel/trace v1.26.0/go.mod h1:4iDxvGDQuUkHve82hJJ8UqrwswHYsZuWCBllGV2U2y0=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
Expand Down Expand Up @@ -317,8 +314,6 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand All @@ -340,7 +335,6 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
Expand Down
87 changes: 24 additions & 63 deletions internal/admin/admin_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,39 +13,21 @@ import (
type dgateAdminFSM struct {
cs changestate.ChangeState
logger *zap.Logger
index uint64
}

var _ raft.BatchingFSM = (*dgateAdminFSM)(nil)

func newDGateAdminFSM(logger *zap.Logger, cs changestate.ChangeState) *dgateAdminFSM {
return &dgateAdminFSM{cs, logger}
return &dgateAdminFSM{cs, logger, 0}
}

func (fsm *dgateAdminFSM) isReplay(log *raft.Log) bool {
return !fsm.cs.Ready() &&
log.Index+1 >= fsm.cs.Raft().LastIndex() &&
log.Index+1 >= fsm.cs.Raft().AppliedIndex()
func (fsm *dgateAdminFSM) SetIndex(index uint64) {
fsm.index = index
}

func (fsm *dgateAdminFSM) checkLast(log *raft.Log) {
rft := fsm.cs.Raft()
if !fsm.cs.Ready() && fsm.isReplay(log) {
fsm.logger.Info("FSM is not ready, setting ready",
zap.Uint64("index", log.Index),
zap.Uint64("applied-index", rft.AppliedIndex()),
zap.Uint64("last-index", rft.LastIndex()),
)
defer func() {
if err := fsm.cs.ReloadState(false); err != nil {
fsm.logger.Error("Error processing change log in FSM", zap.Error(err))
} else {
fsm.cs.SetReady()
}
}()
}
}

func (fsm *dgateAdminFSM) applyLog(log *raft.Log) (*spec.ChangeLog, error) {
func (fsm *dgateAdminFSM) applyLog(log *raft.Log, replay bool) (*spec.ChangeLog, error) {
log.Index = fsm.index
switch log.Type {
case raft.LogCommand:
var cl spec.ChangeLog
Expand All @@ -58,10 +40,8 @@ func (fsm *dgateAdminFSM) applyLog(log *raft.Log) (*spec.ChangeLog, error) {
fsm.logger.Error("Change log ID is empty", zap.Error(err))
panic("change log ID is empty")
}
// find a way to apply only if latest index to save time
return &cl, fsm.cs.ProcessChangeLog(&cl, false)
case raft.LogNoop:
fsm.logger.Debug("Noop Log - current leader is still leader")
// find a way to only reload if latest index to save time
return &cl, fsm.cs.ProcessChangeLog(&cl, replay)
case raft.LogConfiguration:
servers := raft.DecodeConfiguration(log.Data).Servers
for i, server := range servers {
Expand All @@ -70,54 +50,35 @@ func (fsm *dgateAdminFSM) applyLog(log *raft.Log) (*spec.ChangeLog, error) {
zap.Int("index", i),
)
}
case raft.LogBarrier:
err := fsm.cs.WaitForChanges()
if err != nil {
fsm.logger.Error("Error waiting for changes", zap.Error(err))
}
default:
fsm.logger.Error("Unknown log type in FSM Apply")
}
return nil, nil
}

func (fsm *dgateAdminFSM) Apply(log *raft.Log) any {
defer fsm.checkLast(log)
_, err := fsm.applyLog(log)
_, err := fsm.applyLog(log, true)
return err
}

func (fsm *dgateAdminFSM) ApplyBatch(logs []*raft.Log) []any {
lastLog := logs[len(logs)-1]
if fsm.isReplay(lastLog) {
rft := fsm.cs.Raft()
fsm.logger.Info("applying log batch logs",
zap.Int("size", len(logs)),
zap.Uint64("current", lastLog.Index),
zap.Uint64("applied", rft.AppliedIndex()),
zap.Uint64("commit", rft.CommitIndex()),
zap.Uint64("last", rft.LastIndex()),
)
}
cls := make([]*spec.ChangeLog, 0, len(logs))
defer func() {
if !fsm.cs.Ready() {
fsm.checkLast(logs[len(logs)-1])
return
}

if err := fsm.cs.ReloadState(true, cls...); err != nil {
fsm.logger.Error("Error reloading state @ FSM ApplyBatch", zap.Error(err))
}
}()

rft := fsm.cs.Raft()
lastIndex := len(logs) - 1
fsm.logger.Debug("apply log batch",
zap.Uint64("applied", rft.AppliedIndex()),
zap.Uint64("commit", rft.CommitIndex()),
zap.Uint64("last", rft.LastIndex()),
zap.Uint64("fsmLastIndex", fsm.index),
zap.Uint64("log[0]", logs[0].Index),
zap.Uint64("log[-1]", logs[lastIndex].Index),
zap.Int("logs", len(logs)),
)
results := make([]any, len(logs))
for i, log := range logs {
var cl *spec.ChangeLog
cl, results[i] = fsm.applyLog(log)
if cl != nil {
cls = append(cls, cl)
}
// TODO: check to see if this can be optimized channels raft node provides
_, results[i] = fsm.applyLog(
log, lastIndex == i,
)
}
return results
}
Expand Down
50 changes: 34 additions & 16 deletions internal/admin/admin_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/dgate-io/dgate/internal/config"
"github.com/dgate-io/dgate/pkg/raftadmin"
"github.com/dgate-io/dgate/pkg/rafthttp"
"github.com/dgate-io/dgate/pkg/spec"
"github.com/dgate-io/dgate/pkg/storage"
"github.com/dgate-io/dgate/pkg/util/logadapter"
raftbadgerdb "github.com/dgate-io/raft-badger"
Expand Down Expand Up @@ -53,23 +54,18 @@ func setupRaft(
default:
panic(fmt.Errorf("invalid storage type: %s", conf.Storage.StorageType))
}
raftId := adminConfig.Replication.RaftID
if raftId == "" {
raftId = conf.NodeId
}

raftConfig := adminConfig.Replication.LoadRaftConfig(
&raft.Config{
ProtocolVersion: raft.ProtocolVersionMax,
LocalID: raft.ServerID(raftId),
LocalID: raft.ServerID(adminConfig.Replication.RaftID),
HeartbeatTimeout: time.Second * 4,
ElectionTimeout: time.Second * 5,
CommitTimeout: time.Second * 4,
BatchApplyCh: true,
MaxAppendEntries: 16,
BatchApplyCh: false,
MaxAppendEntries: 512,
LeaderLeaseTimeout: time.Second * 4,
// TODO: Support snapshots
SnapshotInterval: time.Hour * 24,
SnapshotInterval: time.Hour*2 ^ 32,
SnapshotThreshold: ^uint64(0),
Logger: logadapter.NewZap2HCLogAdapter(logger),
},
Expand All @@ -90,15 +86,21 @@ func setupRaft(
address, http.DefaultClient, raftHttpLogger,
adminConfig.Replication.AdvertScheme+"://(address)/raft",
)
fsmLogger := logger.Named("fsm")
snapstore := raft.NewInmemSnapshotStore()
fsm := newDGateAdminFSM(fsmLogger, cs)
raftNode, err := raft.NewRaft(
raftConfig, newDGateAdminFSM(logger.Named("fsm"), cs),
lstore, sstore, raft.NewInmemSnapshotStore(), transport,
raftConfig, fsm, lstore,
sstore, snapstore, transport,
)
if err != nil {
panic(err)
}

cs.SetupRaft(raftNode, raftConfig)
observerChan := make(chan raft.Observation, 10)
raftNode.RegisterObserver(raft.NewObserver(observerChan, false, nil))
cs.SetupRaft(raftNode, observerChan)

// Setup raft handler
server.Handle("/raft/*", transport)

Expand All @@ -120,16 +122,27 @@ func setupRaft(
})

configFuture := raftNode.GetConfiguration()

if err = configFuture.Error(); err != nil {
panic(err)
}
serverConfig := configFuture.Configuration()
raftId := string(raftConfig.LocalID)
logger.Info("replication config",
zap.String("raft_id", raftId),
zap.Any("config", serverConfig),
zap.Int("max_append_entries", raftConfig.MaxAppendEntries),
zap.Bool("batch_chan", raftConfig.BatchApplyCh),
zap.Duration("commit_timeout", raftConfig.CommitTimeout),
zap.Int("config_proto", int(raftConfig.ProtocolVersion)),
)

logger.Debug("Replication config",
zap.Any("config", serverConfig))
defer cs.ProcessChangeLog(spec.NewNoopChangeLog(), false)

if adminConfig.Replication.BootstrapCluster {
if adminConfig.Replication.BootstrapCluster && len(serverConfig.Servers) == 0 {
logger.Info("bootstrapping cluster",
zap.String("id", raftId),
zap.String("advert_addr", advertAddr),
)
raftNode.BootstrapCluster(raft.Configuration{
Servers: []raft.Server{
{
Expand Down Expand Up @@ -162,6 +175,11 @@ func setupRaft(
addresses = append(addresses, fmt.Sprintf("%s:%d", addr, adminConfig.Port))
}
}
logger.Info("no servers found in configuration, adding myself to cluster",
zap.String("id", raftId),
zap.String("advert_addr", advertAddr),
zap.Strings("cluster_addrs", addresses),
)

if adminConfig.Replication.ClusterAddrs != nil && len(adminConfig.Replication.ClusterAddrs) > 0 {
addresses = append(addresses, adminConfig.Replication.ClusterAddrs...)
Expand Down
3 changes: 1 addition & 2 deletions internal/admin/changestate/change_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@ type ChangeState interface {

// Readiness
Ready() bool
SetReady()

// Replication
SetupRaft(*raft.Raft, *raft.Config)
SetupRaft(*raft.Raft, chan raft.Observation)
Raft() *raft.Raft

// Resources
Expand Down
3 changes: 2 additions & 1 deletion internal/admin/routes/route_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ func ConfigureRouteAPI(server chi.Router, logger *zap.Logger, cs changestate.Cha
}
}

util.JsonResponse(w, http.StatusCreated, spec.TransformDGateRoutes(rm.GetRoutesByNamespace(route.NamespaceName)...))
util.JsonResponse(w, http.StatusCreated, spec.TransformDGateRoutes(
rm.GetRoutesByNamespace(route.NamespaceName)...))
})

server.Delete("/route", func(w http.ResponseWriter, r *http.Request) {
Expand Down
8 changes: 5 additions & 3 deletions internal/config/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func LoadConfig(dgateConfigPath string) (*DGateConfig, error) {
shell := "/bin/sh"
if shellEnv := os.Getenv("SHELL"); shellEnv != "" {
shell = shellEnv
}
}
resolveConfigStringPattern(data, CommandRegex, func(value string, results map[string]string) (string, error) {
cmdResult, err := exec.CommandContext(
ctx, shell, "-c", results["cmd"]).Output()
Expand Down Expand Up @@ -192,6 +192,7 @@ func LoadConfig(dgateConfigPath string) (*DGateConfig, error) {
}

if k.Exists("admin.replication") {
kDefault(k, "admin.replication.raft_id", k.Get("node_id"))
err = kRequireAll(k, "admin.host")
if err != nil {
return nil, err
Expand Down Expand Up @@ -297,10 +298,11 @@ func (config *DGateReplicationConfig) LoadRaftConfig(defaultConfig *raft.Config)
}
if config.RaftID != "" {
rc.LocalID = raft.ServerID(config.RaftID)
} else {
rc.LocalID = defaultConfig.LocalID
}
}
err := raft.ValidateConfig(rc)
if err != nil {
if err := raft.ValidateConfig(rc); err != nil {
panic(err)
}
return rc
Expand Down
Loading

0 comments on commit 4ed0500

Please sign in to comment.