Skip to content

Commit

Permalink
refactor: return deleted event count instead of NotFound error
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe committed Sep 20, 2023
1 parent 1452295 commit 0bc9407
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 30 deletions.
24 changes: 10 additions & 14 deletions events/event_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,33 +194,33 @@ func (t SyncEventConsumer) EventConsumer(db *gorm.DB, pool *pgxpool.Pool) *event
return consumer
}

func (t *SyncEventConsumer) Handle(ctx *api.Context) error {
func (t *SyncEventConsumer) Handle(ctx *api.Context) (int, error) {
tx := ctx.DB().Begin()
if tx.Error != nil {
return fmt.Errorf("error initiating db tx: %w", tx.Error)
return 0, fmt.Errorf("error initiating db tx: %w", tx.Error)
}
defer tx.Rollback()

ctx = ctx.WithDB(tx)

events, err := fetchEvents(ctx, t.watchEvents, 1)
if err != nil {
return fmt.Errorf("error fetching events: %w", err)
return 0, fmt.Errorf("error fetching events: %w", err)
}

if len(events) == 0 {
return api.Errorf(api.ENOTFOUND, "No events found")
return 0, nil
}

for _, syncConsumer := range t.consumers {
if err := syncConsumer(ctx, events[0]); err != nil {
return fmt.Errorf("error processing sync consumer: %w", err)
return 0, fmt.Errorf("error processing sync consumer: %w", err)
}
}

// FIXME: When this fails we only roll it back and the attempt is never increased.
// Also, error is never saved.
return tx.Commit().Error
return len(events), tx.Commit().Error
}

type AsyncEventConsumer struct {
Expand All @@ -230,22 +230,18 @@ type AsyncEventConsumer struct {
numConsumers int
}

func (t *AsyncEventConsumer) Handle(ctx *api.Context) error {
func (t *AsyncEventConsumer) Handle(ctx *api.Context) (int, error) {
tx := ctx.DB().Begin()
if tx.Error != nil {
return fmt.Errorf("error initiating db tx: %w", tx.Error)
return 0, fmt.Errorf("error initiating db tx: %w", tx.Error)
}
defer tx.Rollback()

ctx = ctx.WithDB(tx)

events, err := fetchEvents(ctx, t.watchEvents, t.batchSize)
if err != nil {
return fmt.Errorf("error fetching events: %w", err)
}

if len(events) == 0 {
return api.Errorf(api.ENOTFOUND, "No events found")
return 0, fmt.Errorf("error fetching events: %w", err)
}

failedEvents := t.consumer(ctx, events)
Expand All @@ -265,7 +261,7 @@ func (t *AsyncEventConsumer) Handle(ctx *api.Context) error {
}
}

return tx.Commit().Error
return len(events), tx.Commit().Error
}

func (t AsyncEventConsumer) EventConsumer(db *gorm.DB, pool *pgxpool.Pool) *eventconsumer.EventConsumer {
Expand Down
14 changes: 6 additions & 8 deletions events/eventconsumer/event_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const (
defaultPgNotifyTimeout = time.Minute
)

type EventConsumerFunc func(ctx *api.Context) error
type EventConsumerFunc func(ctx *api.Context) (count int, err error)

type EventConsumer struct {
db *gorm.DB
Expand All @@ -30,8 +30,8 @@ type EventConsumer struct {
// pgNotifyTimeout is the timeout to consume events in case no Consume notification is received.
pgNotifyTimeout time.Duration

// consumerFunc is responsible in fetching the events for the given batch size and events.
// It should return a NotFound error if it cannot find any event to consume.
// consumerFunc is responsible in fetching & consuming the events for the given batch size and events.
// It returns the number of events it fetched.
consumerFunc EventConsumerFunc
}

Expand Down Expand Up @@ -75,14 +75,12 @@ func (e EventConsumer) Validate() error {
// ConsumeEventsUntilEmpty consumes events in a loop until the event queue is empty.
func (t *EventConsumer) ConsumeEventsUntilEmpty(ctx *api.Context) {
for {
err := t.consumerFunc(ctx)
count, err := t.consumerFunc(ctx)
if err != nil {
if api.ErrorCode(err) == api.ENOTFOUND {
return
}

logger.Errorf("error processing event, waiting %s to try again: %v", waitDurationOnFailure, err)
time.Sleep(waitDurationOnFailure)
} else if count == 0 {
return
}
}
}
Expand Down
12 changes: 4 additions & 8 deletions playbook/run_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ func StartPlaybookRunConsumer(db *gorm.DB, pool *pgxpool.Pool) {
Listen(pgNotifyChannel)
}

func EventConsumer(ctx *api.Context) error {
func EventConsumer(ctx *api.Context) (int, error) {
tx := ctx.DB().Begin()
if tx.Error != nil {
return fmt.Errorf("error initiating db tx: %w", tx.Error)
return 0, fmt.Errorf("error initiating db tx: %w", tx.Error)
}
defer tx.Rollback()

Expand All @@ -46,16 +46,12 @@ func EventConsumer(ctx *api.Context) error {

var runs []models.PlaybookRun
if err := tx.Raw(query, models.PlaybookRunStatusScheduled).Find(&runs).Error; err != nil {
return err
}

if len(runs) == 0 {
return api.Errorf(api.ENOTFOUND, "No events found")
return 0, err
}

for i := range runs {
ExecuteRun(ctx, runs[i])
}

return tx.Commit().Error
return len(runs), tx.Commit().Error
}

0 comments on commit 0bc9407

Please sign in to comment.