From 1b45c8089211727fda2f7ea99a78eca3373fbac5 Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Wed, 20 Sep 2023 18:03:08 +0545 Subject: [PATCH] fix: when sync event fails, update the event's attempt metadata --- events/event_queue.go | 46 ++++++++++++++++++++++++++++++++----------- 1 file changed, 35 insertions(+), 11 deletions(-) diff --git a/events/event_queue.go b/events/event_queue.go index e2b8daf85..3d18c6618 100644 --- a/events/event_queue.go +++ b/events/event_queue.go @@ -5,6 +5,7 @@ import ( "time" "github.com/flanksource/commons/logger" + "github.com/flanksource/commons/utils" "github.com/flanksource/duty/upstream" "github.com/flanksource/incident-commander/api" "github.com/flanksource/incident-commander/events/eventconsumer" @@ -195,9 +196,32 @@ func (t SyncEventConsumer) EventConsumer(db *gorm.DB, pool *pgxpool.Pool) *event } func (t *SyncEventConsumer) Handle(ctx *api.Context) (int, error) { + event, err := t.consumeOne(ctx) + if err != nil { + if event != nil { + event.Attempts++ + event.Error = err.Error() + event.LastAttempt = utils.Ptr(time.Now()) + if err := ctx.DB().Save(event).Error; err != nil { + logger.Errorf("error saving updates of a failed event: %v", err) + } + } + + err = fmt.Errorf("error processing sync consumer: %w", err) + } + + if event == nil { + return 0, err + } + + return 1, err +} + +// consumeOne fetches a single event and passes it to all the consumers in one single transaction. +func (t *SyncEventConsumer) consumeOne(ctx *api.Context) (*api.Event, error) { tx := ctx.DB().Begin() if tx.Error != nil { - return 0, fmt.Errorf("error initiating db tx: %w", tx.Error) + return nil, fmt.Errorf("error initiating db tx: %w", tx.Error) } defer tx.Rollback() @@ -205,22 +229,23 @@ func (t *SyncEventConsumer) Handle(ctx *api.Context) (int, error) { events, err := fetchEvents(ctx, t.watchEvents, 1) if err != nil { - return 0, fmt.Errorf("error fetching events: %w", err) + return nil, fmt.Errorf("error fetching events: %w", err) } if len(events) == 0 { - return 0, nil + return nil, nil } + // sync consumers always fetch a single event at a time + event := events[0] + for _, syncConsumer := range t.consumers { - if err := syncConsumer(ctx, events[0]); err != nil { - return 0, fmt.Errorf("error processing sync consumer: %w", err) + if err := syncConsumer(ctx, event); err != nil { + return &event, err } } - // FIXME: When this fails we only roll it back and the attempt is never increased. - // Also, error is never saved. - return len(events), tx.Commit().Error + return &event, tx.Commit().Error } type AsyncEventConsumer struct { @@ -245,13 +270,12 @@ func (t *AsyncEventConsumer) Handle(ctx *api.Context) (int, error) { } failedEvents := t.consumer(ctx, events) - lastAttempt := time.Now() for i := range failedEvents { e := &failedEvents[i] e.Attempts += 1 - e.LastAttempt = &lastAttempt - logger.Errorf("Failed to process event[%s]: %s", e.ID, e.Error) + e.LastAttempt = utils.Ptr(time.Now()) + logger.Errorf("error processing async event (id=%s, name=%s): %s", e.ID, e.Name, e.Error) } if len(failedEvents) > 0 {