diff --git a/go.mod b/go.mod index 5533eac9..6f57c7c8 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/containrrr/shoutrrr v0.8.0 github.com/fergusstrange/embedded-postgres v1.25.0 // indirect github.com/flanksource/commons v1.31.2 - github.com/flanksource/duty v1.0.754 + github.com/flanksource/duty v1.0.755 github.com/flanksource/gomplate/v3 v3.24.39 github.com/flanksource/kopper v1.0.10 github.com/gomarkdown/markdown v0.0.0-20240419095408-642f0ee99ae2 diff --git a/go.sum b/go.sum index 7f6c0634..9c68e499 100644 --- a/go.sum +++ b/go.sum @@ -924,8 +924,8 @@ github.com/flanksource/artifacts v1.0.14 h1:Vv70bccsae0MwGaf/uSPp34J5V1/PyKfct9z github.com/flanksource/artifacts v1.0.14/go.mod h1:qHVCnQu5k50aWNJ5UhpcAKEl7pAzqUrFFKGSm147G70= github.com/flanksource/commons v1.31.2 h1:VBhmhmvk6PjhJYuaK8LL+7700E3zPCY03VV/K1BxH64= github.com/flanksource/commons v1.31.2/go.mod h1:X2txnbNGY6fKQuKLmc7x92FMYjB2MuaqNJOR6vEWDMs= -github.com/flanksource/duty v1.0.754 h1:n0W/4JA5DkR04PbOozqLjPWsm8swA2WMDOYVFH8d7ZI= -github.com/flanksource/duty v1.0.754/go.mod h1:sZY2NytdenrkqXoMD6Gn2C8xH6dm5HsqOeE0p74Z2VE= +github.com/flanksource/duty v1.0.755 h1:brHyLAqRtX865KdVXVtb5elLWh8A4c8dAftc+/UNeR8= +github.com/flanksource/duty v1.0.755/go.mod h1:sZY2NytdenrkqXoMD6Gn2C8xH6dm5HsqOeE0p74Z2VE= github.com/flanksource/gomplate/v3 v3.20.4/go.mod h1:27BNWhzzSjDed1z8YShO6W+z6G9oZXuxfNFGd/iGSdc= github.com/flanksource/gomplate/v3 v3.24.39 h1:O763lnNIcTELSMYeIO0dNDfcb3LoZvzU1fr62I4Yxqg= github.com/flanksource/gomplate/v3 v3.24.39/go.mod h1:0wY/+UPvd7CxmiTBNmzZdWIEOUZAsRkpGY1j5R711O8= diff --git a/notification/job.go b/notification/job.go index 66b486f7..e94d0b42 100644 --- a/notification/job.go +++ b/notification/job.go @@ -1,12 +1,14 @@ package notification import ( + "encoding/json" "fmt" "time" "github.com/flanksource/duty/context" "github.com/flanksource/duty/job" "github.com/flanksource/duty/models" + "github.com/flanksource/incident-commander/api" "gorm.io/gorm" "gorm.io/gorm/clause" ) @@ -68,23 +70,63 @@ func ProcessPendingNotification(ctx context.Context) (bool, error) { var payload NotificationEventPayload payload.FromMap(currentHistory.Payload) + { + // We need to re-evaluate the health of the resource. + // and ensure that the original event matches with the current health before we send out the notification. + + // previousHealth is the health that triggered the notification event + originalEvent := models.Event{Name: payload.EventName, CreatedAt: payload.EventCreatedAt} + if len(payload.Properties) > 0 { + if err := json.Unmarshal(payload.Properties, &originalEvent.Properties); err != nil { + return fmt.Errorf("failed to unmarshal properties: %w", err) + } + } + + celEnv, err := GetEnvForEvent(ctx, originalEvent) + if err != nil { + return fmt.Errorf("failed to get cel env: %w", err) + } + + previousHealth := api.EventToHealth(payload.EventName) + + currentHealth, err := celEnv.GetResourceHealth(ctx) + if err != nil { + return fmt.Errorf("failed to get resource health from cel env: %w", err) + } + + notif, err := GetNotification(ctx, payload.NotificationID.String()) + if err != nil { + return fmt.Errorf("failed to get notification: %w", err) + } + + if !isHealthReportable(notif.Events, previousHealth, currentHealth) { + ctx.Logger.V(6).Infof("skipping notification[%s] as health change is not reportable", notif.ID) + if dberr := ctx.DB().Model(&models.NotificationSendHistory{}).Where("id = ?", currentHistory.ID).UpdateColumns(map[string]any{ + "status": models.NotificationStatusSkipped, + }).Error; dberr != nil { + return dberr + } + + return nil + } + } + if err := sendPendingNotification(ctx, currentHistory, payload); err != nil { if dberr := ctx.DB().Debug().Model(&models.NotificationSendHistory{}).Where("id = ?", currentHistory.ID).UpdateColumns(map[string]any{ "status": gorm.Expr("CASE WHEN retries >= ? THEN ? ELSE ? END", ctx.Properties().Int("notification.max-retries", 4)-1, models.NotificationStatusError, models.NotificationStatusPending), "error": err.Error(), "retries": gorm.Expr("retries + 1"), }).Error; dberr != nil { - return err + return ctx.Oops().Join(dberr, err) } - // return nil - // or else the transaction will be rolled back and there'll be no trace of a failed attempt. + // we return nil or else the transaction will be rolled back and there'll be no trace of a failed attempt. return nil } else { if dberr := ctx.DB().Model(&models.NotificationSendHistory{}).Where("id = ?", currentHistory.ID).UpdateColumns(map[string]any{ "status": models.NotificationStatusSent, }).Error; dberr != nil { - return err + return dberr } } diff --git a/notification/notification_test.go b/notification/notification_test.go index b2ab4176..9e002e8d 100644 --- a/notification/notification_test.go +++ b/notification/notification_test.go @@ -396,7 +396,7 @@ var _ = ginkgo.Describe("Notifications", ginkgo.Ordered, func() { n = models.Notification{ ID: uuid.New(), Name: "wait-for-test", - Events: pq.StringArray([]string{"config.healthy", "config.unhealthy"}), + Events: pq.StringArray([]string{"config.healthy", "config.warning", "config.unhealthy"}), Source: models.SourceCRD, Title: "Dummy", Template: "dummy", @@ -471,6 +471,55 @@ var _ = ginkgo.Describe("Notifications", ginkgo.Ordered, func() { return len(pending) == 0 }, "15s", "1s").Should(BeTrue()) }) + + ginkgo.It("should have sent out a notification", func() { + var sendHistory []models.NotificationSendHistory + err := DefaultContext.DB(). + Where("notification_id = ?", n.ID.String()). + Where("resource_id = ?", config.ID.String()). + Where("source_event = ?", "config.unhealthy"). + Find(&sendHistory).Error + Expect(err).To(BeNil()) + Expect(len(sendHistory)).To(Equal(1)) + }) + + ginkgo.It("`should not send out a notification`", func() { + { + // Change health to warning & then back to unknown + // This should create 1 notification.send event for the 'warning' health. + // since the health is changed immediately, we shouldn't receive a notification for it. + err := DefaultContext.DB().Model(&models.ConfigItem{}).Where("id = ?", config.ID).Update("health", models.HealthWarning).Error + Expect(err).To(BeNil()) + + err = DefaultContext.DB().Model(&models.ConfigItem{}).Where("id = ?", config.ID).Update("health", models.HealthUnknown).Error + Expect(err).To(BeNil()) + } + + Eventually(func() bool { + events.ConsumeAll(DefaultContext) + + var pending []models.NotificationSendHistory + err := DefaultContext.DB().Where("notification_id = ?", n.ID.String()).Where("status = ?", models.NotificationStatusPending).Find(&pending).Error + Expect(err).To(BeNil()) + + return len(pending) == 1 + }, "15s", "1s", "should create a pending notification").Should(BeTrue()) + + Eventually(func() int { + _, err := notification.ProcessPendingNotification(DefaultContext) + Expect(err).To(BeNil()) + + var sendHistory []models.NotificationSendHistory + err = DefaultContext.DB(). + Where("notification_id = ?", n.ID.String()). + Where("resource_id = ?", config.ID.String()). + Where("source_event = ?", "config.warning"). + Where("status = ?", "skipped"). + Find(&sendHistory).Error + Expect(err).To(BeNil()) + return len(sendHistory) + }, "15s", "1s").Should(Equal(1)) + }) }) var _ = ginkgo.Describe("template vailidity", func() {