Skip to content

Commit

Permalink
fix: check health on pending notifications
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe committed Nov 14, 2024
1 parent ebaff8d commit b0c3b71
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 11 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ require (
sigs.k8s.io/yaml v1.4.0
)

replace github.com/flanksource/duty => ../duty
// replace github.com/flanksource/duty => ../duty

// replace github.com/flanksource/gomplate/v3 => ../gomplate

Expand Down
50 changes: 46 additions & 4 deletions notification/job.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package notification

import (
"encoding/json"
"fmt"
"time"

"github.com/flanksource/duty/context"
"github.com/flanksource/duty/job"
"github.com/flanksource/duty/models"
"github.com/flanksource/incident-commander/api"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
Expand Down Expand Up @@ -68,23 +70,63 @@ func ProcessPendingNotification(ctx context.Context) (bool, error) {
var payload NotificationEventPayload
payload.FromMap(currentHistory.Payload)

{
// We need to re-evaluate the health of the resource.
// and ensure that the original event matches with the current health before we send out the notification.

// previousHealth is the health that triggered the notification event
originalEvent := models.Event{Name: payload.EventName, CreatedAt: payload.EventCreatedAt}
if len(payload.Properties) > 0 {
if err := json.Unmarshal(payload.Properties, &originalEvent.Properties); err != nil {
return fmt.Errorf("failed to unmarshal properties: %w", err)
}
}

celEnv, err := GetEnvForEvent(ctx, originalEvent)
if err != nil {
return fmt.Errorf("failed to get cel env: %w", err)
}

previousHealth := api.EventToHealth(payload.EventName)

currentHealth, err := celEnv.GetResourceHealth(ctx)
if err != nil {
return fmt.Errorf("failed to get resource health from cel env: %w", err)
}

notif, err := GetNotification(ctx, payload.NotificationID.String())
if err != nil {
return fmt.Errorf("failed to get notification: %w", err)
}

if !isHealthReportable(notif.Events, previousHealth, currentHealth) {
ctx.Logger.V(6).Infof("skipping notification[%s] as health change is not reportable", notif.ID)
if dberr := ctx.DB().Model(&models.NotificationSendHistory{}).Where("id = ?", currentHistory.ID).UpdateColumns(map[string]any{
"status": "skipped",
}).Error; dberr != nil {
return dberr
}

return nil
}
}

if err := sendPendingNotification(ctx, currentHistory, payload); err != nil {
if dberr := ctx.DB().Debug().Model(&models.NotificationSendHistory{}).Where("id = ?", currentHistory.ID).UpdateColumns(map[string]any{
"status": gorm.Expr("CASE WHEN retries >= ? THEN ? ELSE ? END", ctx.Properties().Int("notification.max-retries", 4)-1, models.NotificationStatusError, models.NotificationStatusPending),
"error": err.Error(),
"retries": gorm.Expr("retries + 1"),
}).Error; dberr != nil {
return err
return ctx.Oops().Join(dberr, err)
}

// return nil
// or else the transaction will be rolled back and there'll be no trace of a failed attempt.
// we return nil or else the transaction will be rolled back and there'll be no trace of a failed attempt.
return nil
} else {
if dberr := ctx.DB().Model(&models.NotificationSendHistory{}).Where("id = ?", currentHistory.ID).UpdateColumns(map[string]any{
"status": models.NotificationStatusSent,
}).Error; dberr != nil {
return err
return dberr
}
}

Expand Down
15 changes: 9 additions & 6 deletions notification/notification_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,23 +498,26 @@ var _ = ginkgo.Describe("Notifications", ginkgo.Ordered, func() {
Eventually(func() bool {
events.ConsumeAll(DefaultContext)

var count int64
err := DefaultContext.DB().Model(&models.Event{}).Where("properties->>'notification_id' = ?", n.ID.String()).Where("name = 'notification.send'").Count(&count).Error
var pending []models.NotificationSendHistory
err := DefaultContext.DB().Where("notification_id = ?", n.ID.String()).Where("status = ?", models.NotificationStatusPending).Find(&pending).Error
Expect(err).To(BeNil())

return count == 0
}, "15s", "1s").Should(BeTrue())
return len(pending) == 1
}, "15s", "1s", "should create a pending notification").Should(BeTrue())

{
_, err := notification.ProcessPendingNotification(DefaultContext)
Expect(err).To(BeNil())

var sendHistory []models.NotificationSendHistory
err := DefaultContext.DB().
err = DefaultContext.DB().
Where("notification_id = ?", n.ID.String()).
Where("resource_id = ?", config.ID.String()).
Where("source_event = ?", "config.warning").
Where("status = ?", "skipped").
Find(&sendHistory).Error
Expect(err).To(BeNil())
Expect(len(sendHistory)).To(Equal(1))

Check failure on line 520 in notification/notification_test.go

View workflow job for this annotation

GitHub Actions / test

It 11/14/24 05:52:26.454
Expect(sendHistory[0].Status).To(Equal(models.NotificationStatusPending))
}
})
})
Expand Down

0 comments on commit b0c3b71

Please sign in to comment.