Skip to content

Commit

Permalink
fix(server): Gracefully handle shutdown (#195)
Browse files Browse the repository at this point in the history
* fix(server): Gracefully handle shutdown

* feat(config): Add PreStopSleepSeconds field

* refactor(server): Stop and start log messages
  • Loading branch information
samcm committed Sep 19, 2023
1 parent 3813611 commit b4438f9
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 9 deletions.
7 changes: 7 additions & 0 deletions pkg/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@ import (
type Config struct {
// The address to listen on.
Addr string `yaml:"addr" default:":8080"`
// PreStopSleepSeconds is the number of seconds to sleep before stopping.
// Useful for giving kubernetes time to drain connections.
// This sleep will happen after a SIGTERM is received, and will
// delay the shutdown of the server and all of it's components.
// Note: Do not set this to a value greater than the kubernetes
// terminationGracePeriodSeconds.
PreStopSleepSeconds int `yaml:"preStopSleepSeconds" default:"0"`
// MetricsAddr is the address to listen on for metrics.
MetricsAddr string `yaml:"metricsAddr" default:":9090"`
// PProfAddr is the address to listen on for pprof.
Expand Down
14 changes: 10 additions & 4 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,16 +167,20 @@ func (x *Xatu) Start(ctx context.Context) error {
}

func (x *Xatu) stop(ctx context.Context) error {
x.log.WithField("pre_stop_sleep_seconds", x.config.PreStopSleepSeconds).Info("Stopping server")

time.Sleep(time.Duration(x.config.PreStopSleepSeconds) * time.Second)

if x.grpcServer != nil {
x.grpcServer.GracefulStop()
}

for _, s := range x.services {
if err := s.Stop(ctx); err != nil {
return err
}
}

if x.grpcServer != nil {
x.grpcServer.GracefulStop()
}

if x.config.Persistence.Enabled && x.persistence != nil {
if err := x.persistence.Stop(ctx); err != nil {
return err
Expand Down Expand Up @@ -207,6 +211,8 @@ func (x *Xatu) stop(ctx context.Context) error {
}
}

x.log.Info("Server stopped")

return nil
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/server/service/coordinator/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func NewClient(ctx context.Context, log logrus.FieldLogger, conf *Config, p *per
}

func (c *Client) Start(ctx context.Context, grpcServer *grpc.Server) error {
c.log.Info("starting module")
c.log.Info("Starting module")

xatu.RegisterCoordinatorServer(grpcServer, c)

Expand All @@ -81,7 +81,7 @@ func (c *Client) Stop(ctx context.Context) error {
}
}

c.log.Info("module stopped")
c.log.Info("Module stopped")

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/service/coordinator/node/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (r *Record) Stop(ctx context.Context) error {
}
}

r.log.Info("component stopped")
r.log.Info("Component stopped")

return nil
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/server/service/event-ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ func NewIngester(ctx context.Context, log logrus.FieldLogger, conf *Config, cloc
}

func (e *Ingester) Start(ctx context.Context, grpcServer *grpc.Server) error {
e.log.Info("starting module")
e.log.Info("Starting module")

xatu.RegisterEventIngesterServer(grpcServer, e)

return nil
}

func (e *Ingester) Stop(ctx context.Context) error {
e.log.Info("stopping module")
e.log.Info("Stopping module")

for _, sink := range e.sinks {
if err := sink.Stop(ctx); err != nil {
Expand Down

0 comments on commit b4438f9

Please sign in to comment.