Skip to content

Commit

Permalink
fix: error handling in pending notifications processor
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe authored and moshloop committed Nov 22, 2024
1 parent 4b8a226 commit a7b420b
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 59 deletions.
113 changes: 57 additions & 56 deletions notification/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func ProcessPendingNotificationsJob(ctx context.Context) *job.Job {
Schedule: "@every 30s",
Fn: func(ctx job.JobRuntime) error {
for {
done, err := ProcessPendingNotification(ctx.Context)
done, err := ProcessPendingNotifications(ctx.Context)
if err != nil {
ctx.History.AddErrorf("failed to send pending notification: %v", err)
time.Sleep(2 * time.Second) // prevent spinning on db errors
Expand All @@ -43,7 +43,7 @@ func ProcessPendingNotificationsJob(ctx context.Context) *job.Job {
}
}

func ProcessPendingNotification(ctx context.Context) (bool, error) {
func ProcessPendingNotifications(ctx context.Context) (bool, error) {
var noMorePending bool

err := ctx.DB().Transaction(func(tx *gorm.DB) error {
Expand All @@ -66,72 +66,73 @@ func ProcessPendingNotification(ctx context.Context) (bool, error) {
}

currentHistory := pending[0]
if err := processPendingNotification(ctx, currentHistory); err != nil {
if dberr := ctx.DB().Debug().Model(&models.NotificationSendHistory{}).Where("id = ?", currentHistory.ID).UpdateColumns(map[string]any{
"status": gorm.Expr("CASE WHEN retries >= ? THEN ? ELSE ? END", ctx.Properties().Int("notification.max-retries", 4)-1, models.NotificationStatusError, models.NotificationStatusPending),
"error": err.Error(),
"retries": gorm.Expr("retries + 1"),
}).Error; dberr != nil {
return ctx.Oops().Join(dberr, err)
}
}

var payload NotificationEventPayload
payload.FromMap(currentHistory.Payload)
// we return nil or else the transaction will be rolled back and there'll be no trace of a failed attempt.
return nil
})

{
// We need to re-evaluate the health of the resource.
// and ensure that the original event matches with the current health before we send out the notification.
return noMorePending, err
}

// previousHealth is the health that triggered the notification event
originalEvent := models.Event{Name: payload.EventName, CreatedAt: payload.EventCreatedAt}
if len(payload.Properties) > 0 {
if err := json.Unmarshal(payload.Properties, &originalEvent.Properties); err != nil {
return fmt.Errorf("failed to unmarshal properties: %w", err)
}
}
func processPendingNotification(ctx context.Context, currentHistory models.NotificationSendHistory) error {
var payload NotificationEventPayload
payload.FromMap(currentHistory.Payload)

celEnv, err := GetEnvForEvent(ctx, originalEvent)
if err != nil {
return fmt.Errorf("failed to get cel env: %w", err)
}
// We need to re-evaluate the health of the resource
// and ensure that the original event matches the current health before we send out the notification.

previousHealth := api.EventToHealth(payload.EventName)
originalEvent := models.Event{Name: payload.EventName, CreatedAt: payload.EventCreatedAt}
if len(payload.Properties) > 0 {
if err := json.Unmarshal(payload.Properties, &originalEvent.Properties); err != nil {
return fmt.Errorf("failed to unmarshal properties: %w", err)
}
}

currentHealth, err := celEnv.GetResourceHealth(ctx)
if err != nil {
return fmt.Errorf("failed to get resource health from cel env: %w", err)
}
celEnv, err := GetEnvForEvent(ctx, originalEvent)
if err != nil {
return fmt.Errorf("failed to get cel env: %w", err)
}

notif, err := GetNotification(ctx, payload.NotificationID.String())
if err != nil {
return fmt.Errorf("failed to get notification: %w", err)
}
// previousHealth is the health that triggered the notification event
previousHealth := api.EventToHealth(payload.EventName)

if !isHealthReportable(notif.Events, previousHealth, currentHealth) {
ctx.Logger.V(6).Infof("skipping notification[%s] as health change is not reportable", notif.ID)
if dberr := ctx.DB().Model(&models.NotificationSendHistory{}).Where("id = ?", currentHistory.ID).UpdateColumns(map[string]any{
"status": models.NotificationStatusSkipped,
}).Error; dberr != nil {
return dberr
}

return nil
}
}
currentHealth, err := celEnv.GetResourceHealth(ctx)
if err != nil {
return fmt.Errorf("failed to get resource health from cel env: %w", err)
}

if err := sendPendingNotification(ctx, currentHistory, payload); err != nil {
if dberr := ctx.DB().Debug().Model(&models.NotificationSendHistory{}).Where("id = ?", currentHistory.ID).UpdateColumns(map[string]any{
"status": gorm.Expr("CASE WHEN retries >= ? THEN ? ELSE ? END", ctx.Properties().Int("notification.max-retries", 4)-1, models.NotificationStatusError, models.NotificationStatusPending),
"error": err.Error(),
"retries": gorm.Expr("retries + 1"),
}).Error; dberr != nil {
return ctx.Oops().Join(dberr, err)
}
notif, err := GetNotification(ctx, payload.NotificationID.String())
if err != nil {
return fmt.Errorf("failed to get notification: %w", err)
}

// we return nil or else the transaction will be rolled back and there'll be no trace of a failed attempt.
return nil
} else {
if dberr := ctx.DB().Model(&models.NotificationSendHistory{}).Where("id = ?", currentHistory.ID).UpdateColumns(map[string]any{
"status": models.NotificationStatusSent,
}).Error; dberr != nil {
return dberr
}
if !isHealthReportable(notif.Events, previousHealth, currentHealth) {
ctx.Logger.V(6).Infof("skipping notification[%s] as health change is not reportable", notif.ID)
if dberr := ctx.DB().Model(&models.NotificationSendHistory{}).Where("id = ?", currentHistory.ID).UpdateColumns(map[string]any{
"status": models.NotificationStatusSkipped,
}).Error; dberr != nil {
return fmt.Errorf("failed to save notification status as skipped: %w", dberr)
}

return nil
})
}

return noMorePending, err
if err := sendPendingNotification(ctx, currentHistory, payload); err != nil {
return fmt.Errorf("failed to send notification: %w", err)
} else if dberr := ctx.DB().Model(&models.NotificationSendHistory{}).Where("id = ?", currentHistory.ID).UpdateColumns(map[string]any{
"status": models.NotificationStatusSent,
}).Error; dberr != nil {
return fmt.Errorf("failed to save notification status as sent: %w", dberr)
}

return nil
}
6 changes: 3 additions & 3 deletions notification/notification_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ var _ = ginkgo.Describe("Notifications", ginkgo.Ordered, func() {

ginkgo.It("should not consume the event within the delay period", func() {
for i := 0; i < 5; i++ {
_, err := notification.ProcessPendingNotification(DefaultContext)
_, err := notification.ProcessPendingNotifications(DefaultContext)
Expect(err).To(BeNil())

var history models.NotificationSendHistory
Expand All @@ -462,7 +462,7 @@ var _ = ginkgo.Describe("Notifications", ginkgo.Ordered, func() {
Eventually(func() bool {
DefaultContext.Logger.V(0).Infof("checking if the delayed notification.send event has been consumed")

_, err := notification.ProcessPendingNotification(DefaultContext)
_, err := notification.ProcessPendingNotifications(DefaultContext)
Expect(err).To(BeNil())

var pending []models.NotificationSendHistory
Expand Down Expand Up @@ -506,7 +506,7 @@ var _ = ginkgo.Describe("Notifications", ginkgo.Ordered, func() {
}, "15s", "1s", "should create a pending notification").Should(BeTrue())

Eventually(func() int {
_, err := notification.ProcessPendingNotification(DefaultContext)
_, err := notification.ProcessPendingNotifications(DefaultContext)
Expect(err).To(BeNil())

var sendHistory []models.NotificationSendHistory
Expand Down

0 comments on commit a7b420b

Please sign in to comment.