Skip to content

Commit

Permalink
fix(cannon): Correctly return sink errors (#198)
Browse files Browse the repository at this point in the history
* fix(cannon): Correctly return sink errors

* refactor: Move exportWithTimeout function to separate method
  • Loading branch information
samcm committed Sep 20, 2023
1 parent 3648c6a commit 97c9f76
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 14 deletions.
7 changes: 2 additions & 5 deletions pkg/cannon/cannon.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/ethpandaops/xatu/pkg/proto/xatu"
"github.com/go-co-op/gocron"
"github.com/google/uuid"
perrors "github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -249,11 +250,7 @@ func (c *Cannon) syncClockDrift(ctx context.Context) error {
func (c *Cannon) handleNewDecoratedEvents(ctx context.Context, events []*xatu.DecoratedEvent) error {
for _, sink := range c.sinks {
if err := sink.HandleNewDecoratedEvents(ctx, events); err != nil {
c.log.
WithError(err).
WithField("sink", sink.Type()).
WithField("events", len(events)).
Error("Failed to send events to sink")
return perrors.Wrapf(err, "failed to handle new decorated events in sink %s", sink.Name())
}
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/output/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ func (h *HTTP) Start(ctx context.Context) error {
return nil
}

func (h *HTTP) Name() string {
return h.name
}

func (h *HTTP) Stop(ctx context.Context) error {
return h.proc.Shutdown(ctx)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/output/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Sink interface {
Start(ctx context.Context) error
Stop(ctx context.Context) error
Type() string
Name() string
HandleNewDecoratedEvent(ctx context.Context, event *xatu.DecoratedEvent) error
HandleNewDecoratedEvents(ctx context.Context, events []*xatu.DecoratedEvent) error
}
4 changes: 4 additions & 0 deletions pkg/output/stdout/stdout.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ func New(name string, config *Config, log logrus.FieldLogger, filterConfig *xatu
}, nil
}

func (h *StdOut) Name() string {
return h.name
}

func (h *StdOut) Type() string {
return SinkType
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/output/xatu/xatu.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ func (h *Xatu) Type() string {
return SinkType
}

func (h *Xatu) Name() string {
return h.name
}

func (h *Xatu) Start(ctx context.Context) error {
return nil
}
Expand Down
28 changes: 19 additions & 9 deletions pkg/processor/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,13 +187,6 @@ func (bvp *BatchItemProcessor[T]) ImmediatelyExportItems(ctx context.Context, it
bvp.batchMutex.Lock()
defer bvp.batchMutex.Unlock()

if bvp.o.ExportTimeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, bvp.o.ExportTimeout)

defer cancel()
}

if l := len(items); l > 0 {
countItemsToExport := len(items)

Expand All @@ -208,9 +201,9 @@ func (bvp *BatchItemProcessor[T]) ImmediatelyExportItems(ctx context.Context, it

bvp.log.WithFields(logrus.Fields{
"count": len(itemsBatch),
}).Debug("immediately exporting items")
}).Debug("Immediately exporting items")

err := bvp.e.ExportItems(ctx, itemsBatch)
err := bvp.exportWithTimeout(ctx, itemsBatch)

bvp.metrics.IncItemsExportedBy(float64(len(itemsBatch)))

Expand All @@ -223,6 +216,23 @@ func (bvp *BatchItemProcessor[T]) ImmediatelyExportItems(ctx context.Context, it
return nil
}

// exportWithTimeout exports the items with a timeout.
func (bvp *BatchItemProcessor[T]) exportWithTimeout(ctx context.Context, itemsBatch []*T) error {
if bvp.o.ExportTimeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, bvp.o.ExportTimeout)

defer cancel()
}

err := bvp.e.ExportItems(ctx, itemsBatch)
if err != nil {
return err
}

return nil
}

// Shutdown flushes the queue and waits until all items are processed.
// It only executes once. Subsequent call does nothing.
func (bvp *BatchItemProcessor[T]) Shutdown(ctx context.Context) error {
Expand Down

0 comments on commit 97c9f76

Please sign in to comment.