From 75ba868936a285991df3db16637b4f853c067d75 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Mon, 17 Jul 2023 14:04:45 +1000 Subject: [PATCH 1/2] fix: Flush sinks in grpc services when shutting down --- pkg/output/stdout/stdout.go | 2 +- pkg/server/service/event-ingester/ingester.go | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/output/stdout/stdout.go b/pkg/output/stdout/stdout.go index 91430e8a..5f3226bb 100644 --- a/pkg/output/stdout/stdout.go +++ b/pkg/output/stdout/stdout.go @@ -61,7 +61,7 @@ func (h *StdOut) Start(ctx context.Context) error { } func (h *StdOut) Stop(ctx context.Context) error { - return nil + return h.proc.Shutdown(ctx) } func (h *StdOut) HandleNewDecoratedEvent(ctx context.Context, event *xatu.DecoratedEvent) error { diff --git a/pkg/server/service/event-ingester/ingester.go b/pkg/server/service/event-ingester/ingester.go index a7a71e5c..2dcc2d70 100644 --- a/pkg/server/service/event-ingester/ingester.go +++ b/pkg/server/service/event-ingester/ingester.go @@ -52,7 +52,11 @@ func (e *Ingester) Start(ctx context.Context, grpcServer *grpc.Server) error { } func (e *Ingester) Stop(ctx context.Context) error { - e.log.Info("stopping module") + for _, sink := range e.sinks { + if err := sink.Stop(ctx); err != nil { + return err + } + } return nil } From eff38792d937fddd08c78563a2bfac47a90e0ea2 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Mon, 17 Jul 2023 14:06:02 +1000 Subject: [PATCH 2/2] chore: Add log message when stopping module --- pkg/server/service/event-ingester/ingester.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/server/service/event-ingester/ingester.go b/pkg/server/service/event-ingester/ingester.go index 2dcc2d70..9dcf0591 100644 --- a/pkg/server/service/event-ingester/ingester.go +++ b/pkg/server/service/event-ingester/ingester.go @@ -52,6 +52,8 @@ func (e *Ingester) Start(ctx context.Context, grpcServer *grpc.Server) error { } func (e *Ingester) Stop(ctx context.Context) error { + e.log.Info("stopping module") + for _, sink := range e.sinks { if err := sink.Stop(ctx); err != nil { return err