Skip to content

Commit

Permalink
fix: when sync event fails, update the event's attempt metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe authored and moshloop committed Sep 20, 2023
1 parent af965f5 commit 1b45c80
Showing 1 changed file with 35 additions and 11 deletions.
46 changes: 35 additions & 11 deletions events/event_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/flanksource/commons/logger"
"github.com/flanksource/commons/utils"
"github.com/flanksource/duty/upstream"
"github.com/flanksource/incident-commander/api"
"github.com/flanksource/incident-commander/events/eventconsumer"
Expand Down Expand Up @@ -195,32 +196,56 @@ func (t SyncEventConsumer) EventConsumer(db *gorm.DB, pool *pgxpool.Pool) *event
}

func (t *SyncEventConsumer) Handle(ctx *api.Context) (int, error) {
event, err := t.consumeOne(ctx)
if err != nil {
if event != nil {
event.Attempts++
event.Error = err.Error()
event.LastAttempt = utils.Ptr(time.Now())
if err := ctx.DB().Save(event).Error; err != nil {
logger.Errorf("error saving updates of a failed event: %v", err)
}
}

err = fmt.Errorf("error processing sync consumer: %w", err)
}

if event == nil {
return 0, err
}

return 1, err
}

// consumeOne fetches a single event and passes it to all the consumers in one single transaction.
func (t *SyncEventConsumer) consumeOne(ctx *api.Context) (*api.Event, error) {
tx := ctx.DB().Begin()
if tx.Error != nil {
return 0, fmt.Errorf("error initiating db tx: %w", tx.Error)
return nil, fmt.Errorf("error initiating db tx: %w", tx.Error)
}
defer tx.Rollback()

ctx = ctx.WithDB(tx)

events, err := fetchEvents(ctx, t.watchEvents, 1)
if err != nil {
return 0, fmt.Errorf("error fetching events: %w", err)
return nil, fmt.Errorf("error fetching events: %w", err)
}

if len(events) == 0 {
return 0, nil
return nil, nil
}

// sync consumers always fetch a single event at a time
event := events[0]

for _, syncConsumer := range t.consumers {
if err := syncConsumer(ctx, events[0]); err != nil {
return 0, fmt.Errorf("error processing sync consumer: %w", err)
if err := syncConsumer(ctx, event); err != nil {
return &event, err
}
}

// FIXME: When this fails we only roll it back and the attempt is never increased.
// Also, error is never saved.
return len(events), tx.Commit().Error
return &event, tx.Commit().Error
}

type AsyncEventConsumer struct {
Expand All @@ -245,13 +270,12 @@ func (t *AsyncEventConsumer) Handle(ctx *api.Context) (int, error) {
}

failedEvents := t.consumer(ctx, events)
lastAttempt := time.Now()

for i := range failedEvents {
e := &failedEvents[i]
e.Attempts += 1
e.LastAttempt = &lastAttempt
logger.Errorf("Failed to process event[%s]: %s", e.ID, e.Error)
e.LastAttempt = utils.Ptr(time.Now())
logger.Errorf("error processing async event (id=%s, name=%s): %s", e.ID, e.Name, e.Error)
}

if len(failedEvents) > 0 {
Expand Down

0 comments on commit 1b45c80

Please sign in to comment.