diff --git a/pkg/server/config.go b/pkg/server/config.go index 1ae7d24e..224528b3 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -10,6 +10,13 @@ import ( type Config struct { // The address to listen on. Addr string `yaml:"addr" default:":8080"` + // PreStopSleepSeconds is the number of seconds to sleep before stopping. + // Useful for giving kubernetes time to drain connections. + // This sleep will happen after a SIGTERM is received, and will + // delay the shutdown of the server and all of it's components. + // Note: Do not set this to a value greater than the kubernetes + // terminationGracePeriodSeconds. + PreStopSleepSeconds int `yaml:"preStopSleepSeconds" default:"0"` // MetricsAddr is the address to listen on for metrics. MetricsAddr string `yaml:"metricsAddr" default:":9090"` // PProfAddr is the address to listen on for pprof. diff --git a/pkg/server/server.go b/pkg/server/server.go index 9a230b7f..394467ec 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -167,16 +167,20 @@ func (x *Xatu) Start(ctx context.Context) error { } func (x *Xatu) stop(ctx context.Context) error { + x.log.WithField("pre_stop_sleep_seconds", x.config.PreStopSleepSeconds).Info("Stopping server") + + time.Sleep(time.Duration(x.config.PreStopSleepSeconds) * time.Second) + + if x.grpcServer != nil { + x.grpcServer.GracefulStop() + } + for _, s := range x.services { if err := s.Stop(ctx); err != nil { return err } } - if x.grpcServer != nil { - x.grpcServer.GracefulStop() - } - if x.config.Persistence.Enabled && x.persistence != nil { if err := x.persistence.Stop(ctx); err != nil { return err @@ -207,6 +211,8 @@ func (x *Xatu) stop(ctx context.Context) error { } } + x.log.Info("Server stopped") + return nil } diff --git a/pkg/server/service/coordinator/client.go b/pkg/server/service/coordinator/client.go index ca1ce9ac..09365ca3 100644 --- a/pkg/server/service/coordinator/client.go +++ b/pkg/server/service/coordinator/client.go @@ -63,7 +63,7 @@ func NewClient(ctx context.Context, log logrus.FieldLogger, conf *Config, p *per } func (c *Client) Start(ctx context.Context, grpcServer *grpc.Server) error { - c.log.Info("starting module") + c.log.Info("Starting module") xatu.RegisterCoordinatorServer(grpcServer, c) @@ -81,7 +81,7 @@ func (c *Client) Stop(ctx context.Context) error { } } - c.log.Info("module stopped") + c.log.Info("Module stopped") return nil } diff --git a/pkg/server/service/coordinator/node/record.go b/pkg/server/service/coordinator/node/record.go index 1a588464..e47e1d2f 100644 --- a/pkg/server/service/coordinator/node/record.go +++ b/pkg/server/service/coordinator/node/record.go @@ -55,7 +55,7 @@ func (r *Record) Stop(ctx context.Context) error { } } - r.log.Info("component stopped") + r.log.Info("Component stopped") return nil } diff --git a/pkg/server/service/event-ingester/ingester.go b/pkg/server/service/event-ingester/ingester.go index cbe9dc8d..3407808e 100644 --- a/pkg/server/service/event-ingester/ingester.go +++ b/pkg/server/service/event-ingester/ingester.go @@ -47,7 +47,7 @@ func NewIngester(ctx context.Context, log logrus.FieldLogger, conf *Config, cloc } func (e *Ingester) Start(ctx context.Context, grpcServer *grpc.Server) error { - e.log.Info("starting module") + e.log.Info("Starting module") xatu.RegisterEventIngesterServer(grpcServer, e) @@ -55,7 +55,7 @@ func (e *Ingester) Start(ctx context.Context, grpcServer *grpc.Server) error { } func (e *Ingester) Stop(ctx context.Context) error { - e.log.Info("stopping module") + e.log.Info("Stopping module") for _, sink := range e.sinks { if err := sink.Stop(ctx); err != nil {