diff --git a/events/event_queue.go b/events/event_queue.go index 427d40a35..e2b8daf85 100644 --- a/events/event_queue.go +++ b/events/event_queue.go @@ -194,10 +194,10 @@ func (t SyncEventConsumer) EventConsumer(db *gorm.DB, pool *pgxpool.Pool) *event return consumer } -func (t *SyncEventConsumer) Handle(ctx *api.Context) error { +func (t *SyncEventConsumer) Handle(ctx *api.Context) (int, error) { tx := ctx.DB().Begin() if tx.Error != nil { - return fmt.Errorf("error initiating db tx: %w", tx.Error) + return 0, fmt.Errorf("error initiating db tx: %w", tx.Error) } defer tx.Rollback() @@ -205,22 +205,22 @@ func (t *SyncEventConsumer) Handle(ctx *api.Context) error { events, err := fetchEvents(ctx, t.watchEvents, 1) if err != nil { - return fmt.Errorf("error fetching events: %w", err) + return 0, fmt.Errorf("error fetching events: %w", err) } if len(events) == 0 { - return api.Errorf(api.ENOTFOUND, "No events found") + return 0, nil } for _, syncConsumer := range t.consumers { if err := syncConsumer(ctx, events[0]); err != nil { - return fmt.Errorf("error processing sync consumer: %w", err) + return 0, fmt.Errorf("error processing sync consumer: %w", err) } } // FIXME: When this fails we only roll it back and the attempt is never increased. // Also, error is never saved. - return tx.Commit().Error + return len(events), tx.Commit().Error } type AsyncEventConsumer struct { @@ -230,10 +230,10 @@ type AsyncEventConsumer struct { numConsumers int } -func (t *AsyncEventConsumer) Handle(ctx *api.Context) error { +func (t *AsyncEventConsumer) Handle(ctx *api.Context) (int, error) { tx := ctx.DB().Begin() if tx.Error != nil { - return fmt.Errorf("error initiating db tx: %w", tx.Error) + return 0, fmt.Errorf("error initiating db tx: %w", tx.Error) } defer tx.Rollback() @@ -241,11 +241,7 @@ func (t *AsyncEventConsumer) Handle(ctx *api.Context) error { events, err := fetchEvents(ctx, t.watchEvents, t.batchSize) if err != nil { - return fmt.Errorf("error fetching events: %w", err) - } - - if len(events) == 0 { - return api.Errorf(api.ENOTFOUND, "No events found") + return 0, fmt.Errorf("error fetching events: %w", err) } failedEvents := t.consumer(ctx, events) @@ -265,7 +261,7 @@ func (t *AsyncEventConsumer) Handle(ctx *api.Context) error { } } - return tx.Commit().Error + return len(events), tx.Commit().Error } func (t AsyncEventConsumer) EventConsumer(db *gorm.DB, pool *pgxpool.Pool) *eventconsumer.EventConsumer { diff --git a/events/eventconsumer/event_consumer.go b/events/eventconsumer/event_consumer.go index 3827921f2..a4c06be9d 100644 --- a/events/eventconsumer/event_consumer.go +++ b/events/eventconsumer/event_consumer.go @@ -18,7 +18,7 @@ const ( defaultPgNotifyTimeout = time.Minute ) -type EventConsumerFunc func(ctx *api.Context) error +type EventConsumerFunc func(ctx *api.Context) (count int, err error) type EventConsumer struct { db *gorm.DB @@ -30,8 +30,8 @@ type EventConsumer struct { // pgNotifyTimeout is the timeout to consume events in case no Consume notification is received. pgNotifyTimeout time.Duration - // consumerFunc is responsible in fetching the events for the given batch size and events. - // It should return a NotFound error if it cannot find any event to consume. + // consumerFunc is responsible in fetching & consuming the events for the given batch size and events. + // It returns the number of events it fetched. consumerFunc EventConsumerFunc } @@ -75,14 +75,12 @@ func (e EventConsumer) Validate() error { // ConsumeEventsUntilEmpty consumes events in a loop until the event queue is empty. func (t *EventConsumer) ConsumeEventsUntilEmpty(ctx *api.Context) { for { - err := t.consumerFunc(ctx) + count, err := t.consumerFunc(ctx) if err != nil { - if api.ErrorCode(err) == api.ENOTFOUND { - return - } - logger.Errorf("error processing event, waiting %s to try again: %v", waitDurationOnFailure, err) time.Sleep(waitDurationOnFailure) + } else if count == 0 { + return } } } diff --git a/playbook/run_consumer.go b/playbook/run_consumer.go index 49bd7b3c1..19b7a6875 100644 --- a/playbook/run_consumer.go +++ b/playbook/run_consumer.go @@ -26,10 +26,10 @@ func StartPlaybookRunConsumer(db *gorm.DB, pool *pgxpool.Pool) { Listen(pgNotifyChannel) } -func EventConsumer(ctx *api.Context) error { +func EventConsumer(ctx *api.Context) (int, error) { tx := ctx.DB().Begin() if tx.Error != nil { - return fmt.Errorf("error initiating db tx: %w", tx.Error) + return 0, fmt.Errorf("error initiating db tx: %w", tx.Error) } defer tx.Rollback() @@ -46,16 +46,12 @@ func EventConsumer(ctx *api.Context) error { var runs []models.PlaybookRun if err := tx.Raw(query, models.PlaybookRunStatusScheduled).Find(&runs).Error; err != nil { - return err - } - - if len(runs) == 0 { - return api.Errorf(api.ENOTFOUND, "No events found") + return 0, err } for i := range runs { ExecuteRun(ctx, runs[i]) } - return tx.Commit().Error + return len(runs), tx.Commit().Error }