From b4438f926f3519136d3c131fab192de898463df9 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Tue, 19 Sep 2023 13:53:53 +1000 Subject: [PATCH] fix(server): Gracefully handle shutdown (#195) * fix(server): Gracefully handle shutdown * feat(config): Add PreStopSleepSeconds field * refactor(server): Stop and start log messages --- pkg/server/config.go | 7 +++++++ pkg/server/server.go | 14 ++++++++++---- pkg/server/service/coordinator/client.go | 4 ++-- pkg/server/service/coordinator/node/record.go | 2 +- pkg/server/service/event-ingester/ingester.go | 4 ++-- 5 files changed, 22 insertions(+), 9 deletions(-) diff --git a/pkg/server/config.go b/pkg/server/config.go index 1ae7d24e..224528b3 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -10,6 +10,13 @@ import ( type Config struct { // The address to listen on. Addr string `yaml:"addr" default:":8080"` + // PreStopSleepSeconds is the number of seconds to sleep before stopping. + // Useful for giving kubernetes time to drain connections. + // This sleep will happen after a SIGTERM is received, and will + // delay the shutdown of the server and all of it's components. + // Note: Do not set this to a value greater than the kubernetes + // terminationGracePeriodSeconds. + PreStopSleepSeconds int `yaml:"preStopSleepSeconds" default:"0"` // MetricsAddr is the address to listen on for metrics. MetricsAddr string `yaml:"metricsAddr" default:":9090"` // PProfAddr is the address to listen on for pprof. diff --git a/pkg/server/server.go b/pkg/server/server.go index 9a230b7f..394467ec 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -167,16 +167,20 @@ func (x *Xatu) Start(ctx context.Context) error { } func (x *Xatu) stop(ctx context.Context) error { + x.log.WithField("pre_stop_sleep_seconds", x.config.PreStopSleepSeconds).Info("Stopping server") + + time.Sleep(time.Duration(x.config.PreStopSleepSeconds) * time.Second) + + if x.grpcServer != nil { + x.grpcServer.GracefulStop() + } + for _, s := range x.services { if err := s.Stop(ctx); err != nil { return err } } - if x.grpcServer != nil { - x.grpcServer.GracefulStop() - } - if x.config.Persistence.Enabled && x.persistence != nil { if err := x.persistence.Stop(ctx); err != nil { return err @@ -207,6 +211,8 @@ func (x *Xatu) stop(ctx context.Context) error { } } + x.log.Info("Server stopped") + return nil } diff --git a/pkg/server/service/coordinator/client.go b/pkg/server/service/coordinator/client.go index ca1ce9ac..09365ca3 100644 --- a/pkg/server/service/coordinator/client.go +++ b/pkg/server/service/coordinator/client.go @@ -63,7 +63,7 @@ func NewClient(ctx context.Context, log logrus.FieldLogger, conf *Config, p *per } func (c *Client) Start(ctx context.Context, grpcServer *grpc.Server) error { - c.log.Info("starting module") + c.log.Info("Starting module") xatu.RegisterCoordinatorServer(grpcServer, c) @@ -81,7 +81,7 @@ func (c *Client) Stop(ctx context.Context) error { } } - c.log.Info("module stopped") + c.log.Info("Module stopped") return nil } diff --git a/pkg/server/service/coordinator/node/record.go b/pkg/server/service/coordinator/node/record.go index 1a588464..e47e1d2f 100644 --- a/pkg/server/service/coordinator/node/record.go +++ b/pkg/server/service/coordinator/node/record.go @@ -55,7 +55,7 @@ func (r *Record) Stop(ctx context.Context) error { } } - r.log.Info("component stopped") + r.log.Info("Component stopped") return nil } diff --git a/pkg/server/service/event-ingester/ingester.go b/pkg/server/service/event-ingester/ingester.go index cbe9dc8d..3407808e 100644 --- a/pkg/server/service/event-ingester/ingester.go +++ b/pkg/server/service/event-ingester/ingester.go @@ -47,7 +47,7 @@ func NewIngester(ctx context.Context, log logrus.FieldLogger, conf *Config, cloc } func (e *Ingester) Start(ctx context.Context, grpcServer *grpc.Server) error { - e.log.Info("starting module") + e.log.Info("Starting module") xatu.RegisterEventIngesterServer(grpcServer, e) @@ -55,7 +55,7 @@ func (e *Ingester) Start(ctx context.Context, grpcServer *grpc.Server) error { } func (e *Ingester) Stop(ctx context.Context) error { - e.log.Info("stopping module") + e.log.Info("Stopping module") for _, sink := range e.sinks { if err := sink.Stop(ctx); err != nil {