From e78941523d38ea9e8b59cca655549e50e1eea840 Mon Sep 17 00:00:00 2001 From: bubbajoe Date: Fri, 14 Jun 2024 03:22:07 +0900 Subject: [PATCH] fix!: apply fixes from feat/fix-raft-bug branch --- .github/workflows/discord.yml | 20 ++++++ TODO.md | 22 ++---- internal/admin/admin_routes.go | 68 +++++++------------ .../changestate/testutil/change_state.go | 7 +- internal/admin/routes/collection_routes.go | 7 +- internal/admin/routes/domain_routes.go | 4 +- internal/admin/routes/misc_routes.go | 4 +- internal/admin/routes/module_routes.go | 4 +- internal/admin/routes/namespace_routes.go | 4 +- internal/admin/routes/route_routes.go | 4 +- internal/admin/routes/secret_routes.go | 4 +- internal/admin/routes/service_routes.go | 4 +- internal/config/config.go | 27 ++++---- internal/config/loader.go | 6 +- internal/proxy/change_log.go | 17 +---- internal/proxy/dynamic_proxy.go | 7 +- internal/proxy/proxy_handler.go | 18 ++++- internal/proxy/proxy_state.go | 67 +++++++++--------- pkg/dgclient/common.go | 2 +- pkg/spec/change_log.go | 14 ---- pkg/spec/response_writer_tracker.go | 11 +++ pkg/util/http.go | 21 ++++-- 22 files changed, 162 insertions(+), 180 deletions(-) create mode 100644 .github/workflows/discord.yml diff --git a/.github/workflows/discord.yml b/.github/workflows/discord.yml new file mode 100644 index 0000000..c209ce5 --- /dev/null +++ b/.github/workflows/discord.yml @@ -0,0 +1,20 @@ +on: + release: + types: [published] + + jobs: + github-releases-to-discord: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v3 + - name: Github Releases To Discord + uses: SethCohen/github-releases-to-discord@v1.13.1 + with: + webhook_url: ${{ secrets.DISCORD_WEBHOOK_URL }} + color: "2105893" + username: "Release Changelog" + avatar_url: "https://github.com/dgate-io.png" + content: "||@everyone||" + footer_title: "Changelog" + footer_timestamp: true \ No newline at end of file diff --git a/TODO.md b/TODO.md index 00ff125..5bb5d29 100644 --- a/TODO.md +++ b/TODO.md @@ -7,7 +7,6 @@ - server management (start-proxy, stop-proxy, restart, status, logs, stats, etc.) - cluster management (raft commands, replica commands, etc.) (low priority) - other commands (backup, restore, etc.) (low priority) - - replace k6 with wrk for performance tests ## Add Module Tests @@ -16,9 +15,6 @@ - [ ] - Add option to specify export variables when ambiguous (?) - [ ] - check how global variable conflicts are handled -## Start using Pkl - -replace dgate server config with pkl. ## dgate-cli declaritive config @@ -70,10 +66,6 @@ expose metrics for the following: - Add Transactions - [ ] - Add transactional support for admin API -## DGate Documentation (dgate.io/docs) - -Use Docusaurus to create the documentation for DGate. - ## DGate Admin Console (low priority) Admin Console is a web-based interface that can be used to manage the state of the cluster. Manage resource, view logs, stats, and more. It can also be used to develop and test modules directly in the browser. @@ -136,14 +128,6 @@ A good example of a bundle would be a bundle that adds support for OAuth2 authen Differing from common resource versioning, modules can have multiple versions that can be used at the same time. This can be used to test new versions of modules before deploying them to the cluster. -## DGate CLI - argument variable suggestions - -For example, if the user types an argument that is not recognized, the CLI can suggest the correct argument by search the available arguments and finding the closest match. -``` -dgate-cli ns mk my-ns nmae=my-ns -Variable 'nmae' is not recognized. Did you mean 'name'? -``` - ## DGate CLI - help command show required variables When the user runs the help command, the CLI should show the required variables for the command. For example, if the user runs `dgate-cli ns mk --help`, the CLI should show the required variables for the `ns mk` command. `name` is a required variable for the `ns mk` command. Also, the CLI should show non-required variables. @@ -159,4 +143,8 @@ Add stack tracing for typescript modules. Currently, Raft Implementation is tightly coupled with the Admin API. This makes it difficult to change the Raft Implementation without changing the Admin API. Decouple the Raft Implementation from the Admin API to make it easier to change the Raft Implementation. -## Add Telemetry (sentry, datadog, etc.) \ No newline at end of file +## Add Telemetry (sentry, datadog, etc.) + +## ResourceManager callback for resource changes + +Add a callback to the ResourceManager that is called when a resource is changed. This can be used to invalidate caches, update modules, and more. \ No newline at end of file diff --git a/internal/admin/admin_routes.go b/internal/admin/admin_routes.go index 871bdac..4539e60 100644 --- a/internal/admin/admin_routes.go +++ b/internal/admin/admin_routes.go @@ -3,7 +3,6 @@ package admin import ( "fmt" "log" - "net" "net/http" "strings" @@ -75,43 +74,20 @@ func configureRoutes( } return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if ipList.Len() > 0 { - remoteHost, _, err := net.SplitHostPort(r.RemoteAddr) - if err != nil { - remoteHost = r.RemoteAddr - } - allowed, err := ipList.Contains(remoteHost) - if !allowed && adminConfig.XForwardedForDepth > 0 { - xForwardedForIps := r.Header.Values("X-Forwarded-For") - count := min(adminConfig.XForwardedForDepth, len(xForwardedForIps)) - for i := 0; i < count; i++ { - allowed, err = ipList.Contains(xForwardedForIps[i]) - if err != nil { - logger.Error("error checking x-forwarded-for ip", - zap.Error(err), - ) - if conf.Debug { - http.Error(w, "Bad Request: could not parse x-forwarded-for IP address", http.StatusBadRequest) - } - http.Error(w, "Bad Request", http.StatusBadRequest) - return - } - if allowed { - break - } - } - } - + remoteIp := util.GetTrustedIP(r, + conf.AdminConfig.XForwardedForDepth) + allowed, err := ipList.Contains(remoteIp) if err != nil { if conf.Debug { http.Error(w, err.Error(), http.StatusInternalServerError) return } - http.Error(w, "Internal Server Error", http.StatusInternalServerError) + http.Error(w, "could not parse X-Forwarded-For IP", http.StatusBadRequest) return } if !allowed { if conf.Debug { - http.Error(w, "Unauthorized IP Address: "+remoteHost, http.StatusUnauthorized) + http.Error(w, "Unauthorized IP Address: "+remoteIp, http.StatusUnauthorized) return } http.Error(w, "Unauthorized", http.StatusUnauthorized) @@ -138,24 +114,26 @@ func configureRoutes( } else if adminConfig.KeyAuth.HeaderName != "" { key = r.Header.Get(adminConfig.KeyAuth.HeaderName) } else { - key = r.Header.Get("X-API-Key") + key = r.Header.Get("X-DGate-Key") } if _, keyFound := keyMap[key]; !keyFound { http.Error(w, "Unauthorized", http.StatusUnauthorized) return } } - raftInstance := cs.Raft() - if r.Method == http.MethodPut && raftInstance != nil { - leader := raftInstance.Leader() - if leader == "" { - util.JsonError(w, http.StatusServiceUnavailable, "raft: no leader") - return - } - if raftInstance.State() != raft.Leader { - r.URL.Host = string(leader) - http.Redirect(w, r, r.URL.String(), http.StatusTemporaryRedirect) - return + if raftInstance := cs.Raft(); raftInstance != nil { + if r.Method == http.MethodPut || r.Method == http.MethodDelete { + leader := raftInstance.Leader() + if leader == "" { + // TODO: add a way to wait for a leader with a timeout + util.JsonError(w, http.StatusServiceUnavailable, "raft: no leader") + return + } + if raftInstance.State() != raft.Leader { + r.URL.Host = string(leader) + http.Redirect(w, r, r.URL.String(), http.StatusTemporaryRedirect) + return + } } } @@ -165,10 +143,14 @@ func configureRoutes( server.Get("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/plain") - w.Header().Set("X-DGate-Raft", fmt.Sprintf("%t", cs.Raft() != nil)) w.Header().Set("X-DGate-WatchOnly", fmt.Sprintf("%t", adminConfig.WatchOnly)) w.Header().Set("X-DGate-ChangeHash", fmt.Sprintf("%d", cs.ChangeHash())) - w.Header().Set("X-DGate-AdminAPI", "true") + if raftInstance := cs.Raft(); raftInstance != nil { + w.Header().Set( + "X-DGate-Raft-State", + raftInstance.State().String(), + ) + } w.WriteHeader(http.StatusOK) w.Write([]byte("DGate Admin API")) })) diff --git a/internal/admin/changestate/testutil/change_state.go b/internal/admin/changestate/testutil/change_state.go index ea1626f..8ecc33b 100644 --- a/internal/admin/changestate/testutil/change_state.go +++ b/internal/admin/changestate/testutil/change_state.go @@ -65,13 +65,8 @@ func (m *MockChangeState) ReloadState(a bool, cls ...*spec.ChangeLog) error { return m.Called(a, cls).Error(0) } -// SetReady implements changestate.ChangeState. -func (m *MockChangeState) SetReady() { - m.Called() -} - // SetupRaft implements changestate.ChangeState. -func (m *MockChangeState) SetupRaft(*raft.Raft, *raft.Config) { +func (m *MockChangeState) SetupRaft(*raft.Raft, chan raft.Observation) { m.Called().Error(0) } diff --git a/internal/admin/routes/collection_routes.go b/internal/admin/routes/collection_routes.go index 255433d..5e449fc 100644 --- a/internal/admin/routes/collection_routes.go +++ b/internal/admin/routes/collection_routes.go @@ -4,7 +4,6 @@ import ( "encoding/json" "io" "net/http" - "time" "github.com/dgate-io/chi-router" "github.com/dgate-io/dgate/internal/admin/changestate" @@ -71,8 +70,7 @@ func ConfigureCollectionAPI(server chi.Router, logger *zap.Logger, cs changestat } if repl := cs.Raft(); repl != nil { - future := repl.Barrier(time.Second * 5) - if err := future.Error(); err != nil { + if err := cs.WaitForChanges(); err != nil { util.JsonError(w, http.StatusInternalServerError, err.Error()) return } @@ -279,8 +277,7 @@ func ConfigureCollectionAPI(server chi.Router, logger *zap.Logger, cs changestat } if repl := cs.Raft(); repl != nil { - future := repl.Barrier(time.Second * 5) - if err := future.Error(); err != nil { + if err := cs.WaitForChanges(); err != nil { util.JsonError(w, http.StatusInternalServerError, err.Error()) return } diff --git a/internal/admin/routes/domain_routes.go b/internal/admin/routes/domain_routes.go index 06879c3..cb5fc72 100644 --- a/internal/admin/routes/domain_routes.go +++ b/internal/admin/routes/domain_routes.go @@ -4,7 +4,6 @@ import ( "encoding/json" "io" "net/http" - "time" "github.com/dgate-io/chi-router" "github.com/dgate-io/dgate/internal/admin/changestate" @@ -49,8 +48,7 @@ func ConfigureDomainAPI(server chi.Router, logger *zap.Logger, cs changestate.Ch } if repl := cs.Raft(); repl != nil { - future := repl.Barrier(time.Second * 5) - if err := future.Error(); err != nil { + if err := cs.WaitForChanges(); err != nil { util.JsonError(w, http.StatusInternalServerError, err.Error()) return } diff --git a/internal/admin/routes/misc_routes.go b/internal/admin/routes/misc_routes.go index 4b236ce..7ed280f 100644 --- a/internal/admin/routes/misc_routes.go +++ b/internal/admin/routes/misc_routes.go @@ -3,7 +3,6 @@ package routes import ( "encoding/json" "net/http" - "time" "github.com/dgate-io/chi-router" "github.com/dgate-io/dgate/internal/admin/changestate" @@ -14,8 +13,7 @@ import ( func ConfigureChangeLogAPI(server chi.Router, cs changestate.ChangeState, appConfig *config.DGateConfig) { server.Get("/changelog/hash", func(w http.ResponseWriter, r *http.Request) { if repl := cs.Raft(); repl != nil { - future := repl.Barrier(time.Second * 5) - if err := future.Error(); err != nil { + if err := cs.WaitForChanges(); err != nil { util.JsonError(w, http.StatusInternalServerError, err.Error()) return } diff --git a/internal/admin/routes/module_routes.go b/internal/admin/routes/module_routes.go index 75bb1fa..62c6b35 100644 --- a/internal/admin/routes/module_routes.go +++ b/internal/admin/routes/module_routes.go @@ -4,7 +4,6 @@ import ( "encoding/json" "io" "net/http" - "time" "github.com/dgate-io/chi-router" "github.com/dgate-io/dgate/internal/admin/changestate" @@ -49,8 +48,7 @@ func ConfigureModuleAPI(server chi.Router, logger *zap.Logger, cs changestate.Ch return } if repl := cs.Raft(); repl != nil { - future := repl.Barrier(time.Second * 5) - if err := future.Error(); err != nil { + if err := cs.WaitForChanges(); err != nil { util.JsonError(w, http.StatusInternalServerError, err.Error()) return } diff --git a/internal/admin/routes/namespace_routes.go b/internal/admin/routes/namespace_routes.go index 0cc0563..63d4783 100644 --- a/internal/admin/routes/namespace_routes.go +++ b/internal/admin/routes/namespace_routes.go @@ -4,7 +4,6 @@ import ( "encoding/json" "io" "net/http" - "time" "github.com/dgate-io/chi-router" "github.com/dgate-io/dgate/internal/admin/changestate" @@ -43,8 +42,7 @@ func ConfigureNamespaceAPI(server chi.Router, logger *zap.Logger, cs changestate } if repl := cs.Raft(); repl != nil { - future := repl.Barrier(time.Second * 5) - if err := future.Error(); err != nil { + if err := cs.WaitForChanges(); err != nil { util.JsonError(w, http.StatusInternalServerError, err.Error()) return } diff --git a/internal/admin/routes/route_routes.go b/internal/admin/routes/route_routes.go index da76ac2..f366b73 100644 --- a/internal/admin/routes/route_routes.go +++ b/internal/admin/routes/route_routes.go @@ -4,7 +4,6 @@ import ( "encoding/json" "io" "net/http" - "time" "github.com/dgate-io/chi-router" "github.com/dgate-io/dgate/internal/admin/changestate" @@ -51,8 +50,7 @@ func ConfigureRouteAPI(server chi.Router, logger *zap.Logger, cs changestate.Cha } if repl := cs.Raft(); repl != nil { - future := repl.Barrier(time.Second * 5) - if err := future.Error(); err != nil { + if err := cs.WaitForChanges(); err != nil { util.JsonError(w, http.StatusInternalServerError, err.Error()) return } diff --git a/internal/admin/routes/secret_routes.go b/internal/admin/routes/secret_routes.go index 56156f4..a6c004e 100644 --- a/internal/admin/routes/secret_routes.go +++ b/internal/admin/routes/secret_routes.go @@ -5,7 +5,6 @@ import ( "encoding/json" "io" "net/http" - "time" "github.com/dgate-io/chi-router" "github.com/dgate-io/dgate/internal/admin/changestate" @@ -49,8 +48,7 @@ func ConfigureSecretAPI(server chi.Router, logger *zap.Logger, cs changestate.Ch return } if repl := cs.Raft(); repl != nil { - future := repl.Barrier(time.Second * 5) - if err := future.Error(); err != nil { + if err := cs.WaitForChanges(); err != nil { util.JsonError(w, http.StatusInternalServerError, err.Error()) return } diff --git a/internal/admin/routes/service_routes.go b/internal/admin/routes/service_routes.go index 082ec8f..4216291 100644 --- a/internal/admin/routes/service_routes.go +++ b/internal/admin/routes/service_routes.go @@ -5,7 +5,6 @@ import ( "fmt" "io" "net/http" - "time" "github.com/dgate-io/chi-router" "github.com/dgate-io/dgate/internal/admin/changestate" @@ -67,8 +66,7 @@ func ConfigureServiceAPI(server chi.Router, logger *zap.Logger, cs changestate.C if repl := cs.Raft(); repl != nil { logger.Debug("Waiting for raft barrier") - future := repl.Barrier(time.Second * 5) - if err := future.Error(); err != nil { + if err := cs.WaitForChanges(); err != nil { util.JsonError(w, http.StatusInternalServerError, err.Error()) return } diff --git a/internal/config/config.go b/internal/config/config.go index 1dcd90e..cf1c0c5 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -39,19 +39,22 @@ type ( } DGateProxyConfig struct { - Host string `koanf:"host"` - Port int `koanf:"port"` - TLS *DGateTLSConfig `koanf:"tls"` - EnableH2C bool `koanf:"enable_h2c"` - EnableHTTP2 bool `koanf:"enable_http2"` - EnableConsoleLogger bool `koanf:"enable_console_logger"` - RedirectHttpsDomains []string `koanf:"redirect_https"` - AllowedDomains []string `koanf:"allowed_domains"` - GlobalHeaders map[string]string `koanf:"global_headers"` - Transport DGateHttpTransportConfig `koanf:"client_transport"` + Host string `koanf:"host"` + Port int `koanf:"port"` + TLS *DGateTLSConfig `koanf:"tls"` + EnableH2C bool `koanf:"enable_h2c"` + EnableHTTP2 bool `koanf:"enable_http2"` + EnableConsoleLogger bool `koanf:"enable_console_logger"` + RedirectHttpsDomains []string `koanf:"redirect_https"` + AllowedDomains []string `koanf:"allowed_domains"` + GlobalHeaders map[string]string `koanf:"global_headers"` + Transport DGateHttpTransportConfig `koanf:"client_transport"` + DisableXForwardedHeaders bool `koanf:"disable_x_forwarded_headers"` + StrictMode bool `koanf:"strict_mode"` + XForwardedForDepth int `koanf:"x_forwarded_for_depth"` + // WARN: debug use only - InitResources *DGateResources `koanf:"init_resources"` - DisableXForwardedHeaders bool `koanf:"disable_x_forwarded_headers"` + InitResources *DGateResources `koanf:"init_resources"` } DGateTestServerConfig struct { diff --git a/internal/config/loader.go b/internal/config/loader.go index a85ff66..533f22c 100644 --- a/internal/config/loader.go +++ b/internal/config/loader.go @@ -182,6 +182,7 @@ func LoadConfig(dgateConfigPath string) (*DGateConfig, error) { } if k.Exists("admin") { kDefault(k, "admin.host", "127.0.0.1") + kDefault(k, "admin.x_forwarded_for_depth", -1) err = kRequireAll(k, "admin.port") if err != nil { return nil, err @@ -298,11 +299,10 @@ func (config *DGateReplicationConfig) LoadRaftConfig(defaultConfig *raft.Config) } if config.RaftID != "" { rc.LocalID = raft.ServerID(config.RaftID) - } else { - rc.LocalID = defaultConfig.LocalID } } - if err := raft.ValidateConfig(rc); err != nil { + err := raft.ValidateConfig(rc) + if err != nil { panic(err) } return rc diff --git a/internal/proxy/change_log.go b/internal/proxy/change_log.go index 6dd4c35..9ec21e8 100644 --- a/internal/proxy/change_log.go +++ b/internal/proxy/change_log.go @@ -97,7 +97,7 @@ func (ps *ProxyState) processChangeLog(cl *spec.ChangeLog, reload, store bool) ( if reload { if cl.Cmd.IsNoop() || cl.Cmd.Resource().IsRelatedTo(spec.Routes) { ps.logger.Debug("Registering change log", zap.Stringer("cmd", cl.Cmd)) - if err = ps.reconfigureState(false, cl); err != nil { + if err = ps.reconfigureState(false); err != nil { ps.logger.Error("Error registering change log", zap.Error(err)) return } @@ -265,19 +265,6 @@ func (ps *ProxyState) processSecret(scrt *spec.Secret, cl *spec.ChangeLog) (err return err } -// applyChange - apply a change to the proxy state, returns a channel that will receive an error when the state has been updated -func (ps *ProxyState) applyChange(changeLog *spec.ChangeLog) <-chan error { - done := make(chan error, 1) - if changeLog == nil { - changeLog = spec.NewNoopChangeLog() - } - changeLog.SetErrorChan(done) - if err := ps.processChangeLog(changeLog, true, true); err != nil { - done <- err - } - return done -} - func (ps *ProxyState) restoreFromChangeLogs(directApply bool) error { logs, err := ps.store.FetchChangeLogs() if err != nil { @@ -309,7 +296,7 @@ func (ps *ProxyState) restoreFromChangeLogs(directApply bool) error { return err } } else { - if err = ps.reconfigureState(false, nil); err != nil { + if err = ps.reconfigureState(false); err != nil { return nil } } diff --git a/internal/proxy/dynamic_proxy.go b/internal/proxy/dynamic_proxy.go index 036fceb..04d67d2 100644 --- a/internal/proxy/dynamic_proxy.go +++ b/internal/proxy/dynamic_proxy.go @@ -17,17 +17,16 @@ import ( "golang.org/x/net/http2/h2c" ) -func (ps *ProxyState) reconfigureState(init bool, log *spec.ChangeLog) (err error) { +func (ps *ProxyState) reconfigureState(init bool) (err error) { defer func() { if err != nil { - go ps.restartState(func(err error) { + ps.restartState(func(err error) { if err != nil { ps.logger.Error("Error restarting state", zap.Error(err)) go ps.Stop() } }) } - log.PushError(err) }() ps.proxyLock.Lock() @@ -309,7 +308,7 @@ func (ps *ProxyState) Stop() { ps.proxyLock.Lock() raftNode := ps.Raft() ps.proxyLock.Unlock() - + if raftNode != nil { ps.logger.Info("Stopping Raft node") if err := raftNode.Shutdown().Error(); err != nil { diff --git a/internal/proxy/proxy_handler.go b/internal/proxy/proxy_handler.go index 47a1a31..8c97626 100644 --- a/internal/proxy/proxy_handler.go +++ b/internal/proxy/proxy_handler.go @@ -25,12 +25,20 @@ func proxyHandler(ps *ProxyState, reqCtx *RequestContext) { With( zap.String("route", reqCtx.route.Name), zap.String("namespace", reqCtx.route.Namespace.Name), + zap.String("path", reqCtx.req.URL.Path), + zap.String("method", reqCtx.req.Method), + zap.String("query", reqCtx.req.URL.RawQuery), + zap.String("protocol", reqCtx.req.Proto), + zap.String("remote_address", reqCtx.req.RemoteAddr), + zap.String("user_agent", reqCtx.req.UserAgent()), + zap.Int64("content_length", reqCtx.req.ContentLength), + zap.String("content_type", reqCtx.req.Header.Get("Content-Type")), ) if reqCtx.route.Service != nil { event = event.With(zap.String("service", reqCtx.route.Service.Name)) } - event.Debug("Request Log") + event.Info("Request log") }() defer ps.metrics.MeasureProxyRequest(reqCtx, time.Now()) @@ -156,7 +164,7 @@ func handleServiceProxy(ps *ProxyState, reqCtx *RequestContext, modExt ModuleExt }). ErrorHandler(func(w http.ResponseWriter, r *http.Request, reqErr error) { upstreamErr = reqErr - ps.logger.Debug("Error proxying request", + ps.logger.Error("Error proxying request", zap.String("error", reqErr.Error()), zap.String("route", reqCtx.route.Name), zap.String("service", reqCtx.route.Service.Name), @@ -185,6 +193,12 @@ func handleServiceProxy(ps *ProxyState, reqCtx *RequestContext, modExt ModuleExt } } if !reqCtx.rw.HeadersSent() && reqCtx.rw.BytesWritten() == 0 { + ps.logger.Error("Writing error response", + zap.String("error", reqErr.Error()), + zap.String("route", reqCtx.route.Name), + zap.String("service", reqCtx.route.Service.Name), + zap.String("namespace", reqCtx.route.Namespace.Name), + ) util.WriteStatusCodeError(reqCtx.rw, http.StatusBadGateway) } }) diff --git a/internal/proxy/proxy_state.go b/internal/proxy/proxy_state.go index 8349fdb..5ca4df3 100644 --- a/internal/proxy/proxy_state.go +++ b/internal/proxy/proxy_state.go @@ -168,15 +168,6 @@ func (ps *ProxyState) ChangeHash() uint32 { return ps.changeHash } -func (ps *ProxyState) SetReady() { - if ps.replicationEnabled && !ps.raftReady.Load() { - ps.logger.Info("Replication status is now ready after " + - time.Since(ps.startTime).String()) - ps.raftReady.Store(true) - return - } -} - func (ps *ProxyState) Ready() bool { if ps.replicationEnabled { return ps.raftReady.Load() @@ -227,7 +218,10 @@ func (ps *ProxyState) SetupRaft(r *raft.Raft, oc chan raft.Observation) { func (ps *ProxyState) WaitForChanges() error { ps.proxyLock.RLock() defer ps.proxyLock.RUnlock() - return <-ps.applyChange(nil) + if rft := ps.Raft(); rft != nil { + return rft.Barrier(time.Second * 5).Error() + } + return nil } func (ps *ProxyState) ApplyChangeLog(log *spec.ChangeLog) error { @@ -288,30 +282,22 @@ func (ps *ProxyState) restartState(fn func(error)) { ps.routers.Clear() ps.sharedCache.Clear() ps.Scheduler().Stop() - if err := ps.initConfigResources(ps.config.ProxyConfig.InitResources); err != nil { fn(err) return } if ps.replicationEnabled { - for _, log := range ps.changeLogs { - if err := ps.processChangeLog(log, false, false); err != nil { - fn(err) - return - } - } - cl := spec.NewNoopChangeLog() - if err := ps.processChangeLog(cl, true, false); err != nil { - fn(err) - return - } - } else { - if err := ps.restoreFromChangeLogs(true); err != nil { + raft := ps.Raft() + err := raft.ReloadConfig(raft.ReloadableConfig()) + if err != nil { fn(err) return } } - + if err := ps.restoreFromChangeLogs(true); err != nil { + fn(err) + return + } ps.logger.Info("State successfully restarted") fn(nil) } @@ -329,7 +315,7 @@ func (ps *ProxyState) ReloadState(check bool, logs ...*spec.ChangeLog) error { } } if reload { - <-ps.applyChange(nil) + return ps.processChangeLog(nil, true, false) } return nil } @@ -338,8 +324,9 @@ func (ps *ProxyState) ProcessChangeLog(log *spec.ChangeLog, reload bool) error { err := ps.processChangeLog(log, reload, !ps.replicationEnabled) if err != nil { ps.logger.Error("processing error", zap.Error(err)) + return err } - return err + return nil } func (ps *ProxyState) DynamicTLSConfig(certFile, keyFile string) *tls.Config { @@ -437,8 +424,11 @@ func (ps *ProxyState) initConfigResources(resources *config.DGateResources) erro return err } if numChanges > 0 { - cl := spec.NewNoopChangeLog() - defer ps.processChangeLog(cl, false, false) + defer func() { + if err != nil { + err = ps.processChangeLog(nil, false, false) + } + }() } ps.logger.Info("Initializing resources") for _, ns := range resources.Namespaces { @@ -613,19 +603,30 @@ func (ps *ProxyState) ServeHTTP(w http.ResponseWriter, r *http.Request) { if router, ok := ps.routers.Find(ns.Name); ok { router.ServeHTTP(w, r) } else { - ps.logger.Debug("No router found for namespace", - zap.String("namespace", ns.Name), - ) util.WriteStatusCodeError(w, http.StatusNotFound) } } else { + if ps.config.ProxyConfig.StrictMode { + closeConnection(w) + return + } + trustedIp := util.GetTrustedIP(r, ps.config.ProxyConfig.XForwardedForDepth) ps.logger.Debug("No namespace found for request", zap.String("protocol", r.Proto), zap.String("host", r.Host), zap.String("path", r.URL.Path), zap.Bool("secure", r.TLS != nil), - zap.String("remote_addr", r.RemoteAddr), + zap.String("remote_addr", trustedIp), ) util.WriteStatusCodeError(w, http.StatusNotFound) } } + +func closeConnection(w http.ResponseWriter) { + if loot, ok := w.(http.Hijacker); ok { + if conn, _, err := loot.Hijack(); err == nil { + defer conn.Close() + return + } + } +} diff --git a/pkg/dgclient/common.go b/pkg/dgclient/common.go index bfe9b1f..681216d 100644 --- a/pkg/dgclient/common.go +++ b/pkg/dgclient/common.go @@ -150,5 +150,5 @@ func parseApiError(body io.Reader, wrapErr error) error { if err := json.NewDecoder(body).Decode(&apiError); err != nil || apiError.Error == "" { return wrapErr } - return fmt.Errorf("%d: %s", wrapErr, apiError.Error) + return fmt.Errorf("%s: %s", wrapErr, apiError.Error) } diff --git a/pkg/spec/change_log.go b/pkg/spec/change_log.go index f5f349d..d16899b 100644 --- a/pkg/spec/change_log.go +++ b/pkg/spec/change_log.go @@ -13,7 +13,6 @@ type ChangeLog struct { Namespace string `json:"namespace"` Item any `json:"item"` Version int `json:"version"` - errChan chan error } func NewNoopChangeLog() *ChangeLog { @@ -44,19 +43,6 @@ func NewChangeLog(item Named, namespace string, cmd Command) *ChangeLog { } } -func (cl *ChangeLog) SetErrorChan(errChan chan error) { - cl.errChan = errChan -} - -func (cl *ChangeLog) PushError(err error) { - if cl == nil { - return - } - if cl.errChan != nil { - cl.errChan <- err - } -} - type Command string type Action string diff --git a/pkg/spec/response_writer_tracker.go b/pkg/spec/response_writer_tracker.go index 26dcb08..6a872ee 100644 --- a/pkg/spec/response_writer_tracker.go +++ b/pkg/spec/response_writer_tracker.go @@ -1,6 +1,8 @@ package spec import ( + "bufio" + "net" "net/http" ) @@ -17,6 +19,7 @@ type rwTracker struct { bytesWritten int64 } +var _ http.Hijacker = (*rwTracker)(nil) var _ ResponseWriterTracker = (*rwTracker)(nil) func NewResponseWriterTracker(rw http.ResponseWriter) ResponseWriterTracker { @@ -61,3 +64,11 @@ func (t *rwTracker) HeadersSent() bool { func (t *rwTracker) BytesWritten() int64 { return t.bytesWritten } + +func (t *rwTracker) Hijack() (net.Conn, *bufio.ReadWriter, error) { + hijacker, ok := t.rw.(http.Hijacker) + if !ok { + return nil, nil, http.ErrNotSupported + } + return hijacker.Hijack() +} diff --git a/pkg/util/http.go b/pkg/util/http.go index 2f8e007..741f6af 100644 --- a/pkg/util/http.go +++ b/pkg/util/http.go @@ -1,7 +1,7 @@ package util import ( - "fmt" + "net" "net/http" ) @@ -9,7 +9,20 @@ func WriteStatusCodeError(w http.ResponseWriter, code int) { w.Header().Set("Content-Type", "text/plain; charset=utf-8") w.Header().Set("X-Content-Type-Options", "nosniff") w.WriteHeader(code) - w.Write([]byte( - fmt.Sprintf("DGate: %d %s", code, http.StatusText(code)), - )) +} + +// GetTrustedIP returns the trusted IP address of the client. It checks the +// X-Forwarded-For header first, and falls back to the RemoteAddr field of the +// request if the header is not present. depth is the number of proxies that +// the request has passed through. +func GetTrustedIP(r *http.Request, depth int) string { + ips := r.Header.Values("X-Forwarded-For") + if len(ips) == 0 || depth > len(ips) { + remoteHost, _, err := net.SplitHostPort(r.RemoteAddr) + if err != nil { + return r.RemoteAddr + } + return remoteHost + } + return ips[len(ips)-depth] }