diff --git a/pkg/cannon/cannon.go b/pkg/cannon/cannon.go index 6ee2ebf7..ffd68502 100644 --- a/pkg/cannon/cannon.go +++ b/pkg/cannon/cannon.go @@ -24,6 +24,7 @@ import ( "github.com/ethpandaops/xatu/pkg/proto/xatu" "github.com/go-co-op/gocron" "github.com/google/uuid" + perrors "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/sirupsen/logrus" ) @@ -249,11 +250,7 @@ func (c *Cannon) syncClockDrift(ctx context.Context) error { func (c *Cannon) handleNewDecoratedEvents(ctx context.Context, events []*xatu.DecoratedEvent) error { for _, sink := range c.sinks { if err := sink.HandleNewDecoratedEvents(ctx, events); err != nil { - c.log. - WithError(err). - WithField("sink", sink.Type()). - WithField("events", len(events)). - Error("Failed to send events to sink") + return perrors.Wrapf(err, "failed to handle new decorated events in sink %s", sink.Name()) } } diff --git a/pkg/output/http/http.go b/pkg/output/http/http.go index 6e6d49dc..58bb2104 100644 --- a/pkg/output/http/http.go +++ b/pkg/output/http/http.go @@ -68,6 +68,10 @@ func (h *HTTP) Start(ctx context.Context) error { return nil } +func (h *HTTP) Name() string { + return h.name +} + func (h *HTTP) Stop(ctx context.Context) error { return h.proc.Shutdown(ctx) } diff --git a/pkg/output/sink.go b/pkg/output/sink.go index 8bdc57c4..5568e2ff 100644 --- a/pkg/output/sink.go +++ b/pkg/output/sink.go @@ -22,6 +22,7 @@ type Sink interface { Start(ctx context.Context) error Stop(ctx context.Context) error Type() string + Name() string HandleNewDecoratedEvent(ctx context.Context, event *xatu.DecoratedEvent) error HandleNewDecoratedEvents(ctx context.Context, events []*xatu.DecoratedEvent) error } diff --git a/pkg/output/stdout/stdout.go b/pkg/output/stdout/stdout.go index b881410f..b1c2e376 100644 --- a/pkg/output/stdout/stdout.go +++ b/pkg/output/stdout/stdout.go @@ -57,6 +57,10 @@ func New(name string, config *Config, log logrus.FieldLogger, filterConfig *xatu }, nil } +func (h *StdOut) Name() string { + return h.name +} + func (h *StdOut) Type() string { return SinkType } diff --git a/pkg/output/xatu/xatu.go b/pkg/output/xatu/xatu.go index 047e7f54..9ec75fb2 100644 --- a/pkg/output/xatu/xatu.go +++ b/pkg/output/xatu/xatu.go @@ -64,6 +64,10 @@ func (h *Xatu) Type() string { return SinkType } +func (h *Xatu) Name() string { + return h.name +} + func (h *Xatu) Start(ctx context.Context) error { return nil } diff --git a/pkg/processor/batch.go b/pkg/processor/batch.go index c2698aa2..924a7195 100644 --- a/pkg/processor/batch.go +++ b/pkg/processor/batch.go @@ -187,13 +187,6 @@ func (bvp *BatchItemProcessor[T]) ImmediatelyExportItems(ctx context.Context, it bvp.batchMutex.Lock() defer bvp.batchMutex.Unlock() - if bvp.o.ExportTimeout > 0 { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, bvp.o.ExportTimeout) - - defer cancel() - } - if l := len(items); l > 0 { countItemsToExport := len(items) @@ -208,9 +201,9 @@ func (bvp *BatchItemProcessor[T]) ImmediatelyExportItems(ctx context.Context, it bvp.log.WithFields(logrus.Fields{ "count": len(itemsBatch), - }).Debug("immediately exporting items") + }).Debug("Immediately exporting items") - err := bvp.e.ExportItems(ctx, itemsBatch) + err := bvp.exportWithTimeout(ctx, itemsBatch) bvp.metrics.IncItemsExportedBy(float64(len(itemsBatch))) @@ -223,6 +216,23 @@ func (bvp *BatchItemProcessor[T]) ImmediatelyExportItems(ctx context.Context, it return nil } +// exportWithTimeout exports the items with a timeout. +func (bvp *BatchItemProcessor[T]) exportWithTimeout(ctx context.Context, itemsBatch []*T) error { + if bvp.o.ExportTimeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, bvp.o.ExportTimeout) + + defer cancel() + } + + err := bvp.e.ExportItems(ctx, itemsBatch) + if err != nil { + return err + } + + return nil +} + // Shutdown flushes the queue and waits until all items are processed. // It only executes once. Subsequent call does nothing. func (bvp *BatchItemProcessor[T]) Shutdown(ctx context.Context) error {