Skip to content

Commit

Permalink
refactor: process upstream messages individually if push fails
Browse files Browse the repository at this point in the history
  • Loading branch information
yashmehrotra committed Aug 5, 2023
1 parent 96acd1b commit e017e08
Showing 1 changed file with 16 additions and 3 deletions.
19 changes: 16 additions & 3 deletions events/upstream_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,25 @@ func (t *pushToUpstreamEventHandler) Run(ctx *api.Context, events []api.Event) [
}

upstreamMsg.ApplyLabels(t.conf.LabelsMap())
if err := upstream.Push(ctx, t.conf, upstreamMsg); err != nil {
err := upstream.Push(ctx, t.conf, upstreamMsg)
if err == nil {
return failedEvents
}

if len(events) == 1 {
errMsg := fmt.Errorf("failed to push to upstream: %w", err)
failedEvents = append(failedEvents, addErrorToFailedEvents(events, errMsg)...)
return failedEvents
} else {
// Error encountered while pushing could be an SQL or Application error
// Since we do not know which event in the bulk is failing
// Process each event individually since upsteam.Push is idempotent
var failedIndividualEvents []api.Event
for _, e := range events {
failedIndividualEvents = append(failedIndividualEvents, t.Run(ctx, []api.Event{e})...)
}
return failedIndividualEvents
}

return failedEvents
}

type GroupedPushEvents struct {
Expand Down

0 comments on commit e017e08

Please sign in to comment.