Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: notification wait for #1607

Merged
merged 2 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/containrrr/shoutrrr v0.8.0
github.com/fergusstrange/embedded-postgres v1.25.0 // indirect
github.com/flanksource/commons v1.31.2
github.com/flanksource/duty v1.0.754
github.com/flanksource/duty v1.0.755
github.com/flanksource/gomplate/v3 v3.24.39
github.com/flanksource/kopper v1.0.10
github.com/gomarkdown/markdown v0.0.0-20240419095408-642f0ee99ae2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -924,8 +924,8 @@ github.com/flanksource/artifacts v1.0.14 h1:Vv70bccsae0MwGaf/uSPp34J5V1/PyKfct9z
github.com/flanksource/artifacts v1.0.14/go.mod h1:qHVCnQu5k50aWNJ5UhpcAKEl7pAzqUrFFKGSm147G70=
github.com/flanksource/commons v1.31.2 h1:VBhmhmvk6PjhJYuaK8LL+7700E3zPCY03VV/K1BxH64=
github.com/flanksource/commons v1.31.2/go.mod h1:X2txnbNGY6fKQuKLmc7x92FMYjB2MuaqNJOR6vEWDMs=
github.com/flanksource/duty v1.0.754 h1:n0W/4JA5DkR04PbOozqLjPWsm8swA2WMDOYVFH8d7ZI=
github.com/flanksource/duty v1.0.754/go.mod h1:sZY2NytdenrkqXoMD6Gn2C8xH6dm5HsqOeE0p74Z2VE=
github.com/flanksource/duty v1.0.755 h1:brHyLAqRtX865KdVXVtb5elLWh8A4c8dAftc+/UNeR8=
github.com/flanksource/duty v1.0.755/go.mod h1:sZY2NytdenrkqXoMD6Gn2C8xH6dm5HsqOeE0p74Z2VE=
github.com/flanksource/gomplate/v3 v3.20.4/go.mod h1:27BNWhzzSjDed1z8YShO6W+z6G9oZXuxfNFGd/iGSdc=
github.com/flanksource/gomplate/v3 v3.24.39 h1:O763lnNIcTELSMYeIO0dNDfcb3LoZvzU1fr62I4Yxqg=
github.com/flanksource/gomplate/v3 v3.24.39/go.mod h1:0wY/+UPvd7CxmiTBNmzZdWIEOUZAsRkpGY1j5R711O8=
Expand Down
50 changes: 46 additions & 4 deletions notification/job.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package notification

import (
"encoding/json"
"fmt"
"time"

"github.com/flanksource/duty/context"
"github.com/flanksource/duty/job"
"github.com/flanksource/duty/models"
"github.com/flanksource/incident-commander/api"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
Expand Down Expand Up @@ -68,23 +70,63 @@ func ProcessPendingNotification(ctx context.Context) (bool, error) {
var payload NotificationEventPayload
payload.FromMap(currentHistory.Payload)

{
// We need to re-evaluate the health of the resource.
// and ensure that the original event matches with the current health before we send out the notification.

// previousHealth is the health that triggered the notification event
originalEvent := models.Event{Name: payload.EventName, CreatedAt: payload.EventCreatedAt}
if len(payload.Properties) > 0 {
if err := json.Unmarshal(payload.Properties, &originalEvent.Properties); err != nil {
return fmt.Errorf("failed to unmarshal properties: %w", err)
}
}

celEnv, err := GetEnvForEvent(ctx, originalEvent)
if err != nil {
return fmt.Errorf("failed to get cel env: %w", err)
}

previousHealth := api.EventToHealth(payload.EventName)

currentHealth, err := celEnv.GetResourceHealth(ctx)
if err != nil {
return fmt.Errorf("failed to get resource health from cel env: %w", err)
}

notif, err := GetNotification(ctx, payload.NotificationID.String())
if err != nil {
return fmt.Errorf("failed to get notification: %w", err)
}

if !isHealthReportable(notif.Events, previousHealth, currentHealth) {
ctx.Logger.V(6).Infof("skipping notification[%s] as health change is not reportable", notif.ID)
if dberr := ctx.DB().Model(&models.NotificationSendHistory{}).Where("id = ?", currentHistory.ID).UpdateColumns(map[string]any{
"status": models.NotificationStatusSkipped,
}).Error; dberr != nil {
return dberr
}

return nil
}
}

if err := sendPendingNotification(ctx, currentHistory, payload); err != nil {
if dberr := ctx.DB().Debug().Model(&models.NotificationSendHistory{}).Where("id = ?", currentHistory.ID).UpdateColumns(map[string]any{
"status": gorm.Expr("CASE WHEN retries >= ? THEN ? ELSE ? END", ctx.Properties().Int("notification.max-retries", 4)-1, models.NotificationStatusError, models.NotificationStatusPending),
"error": err.Error(),
"retries": gorm.Expr("retries + 1"),
}).Error; dberr != nil {
return err
return ctx.Oops().Join(dberr, err)
}

// return nil
// or else the transaction will be rolled back and there'll be no trace of a failed attempt.
// we return nil or else the transaction will be rolled back and there'll be no trace of a failed attempt.
return nil
} else {
if dberr := ctx.DB().Model(&models.NotificationSendHistory{}).Where("id = ?", currentHistory.ID).UpdateColumns(map[string]any{
"status": models.NotificationStatusSent,
}).Error; dberr != nil {
return err
return dberr
}
}

Expand Down
51 changes: 50 additions & 1 deletion notification/notification_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ var _ = ginkgo.Describe("Notifications", ginkgo.Ordered, func() {
n = models.Notification{
ID: uuid.New(),
Name: "wait-for-test",
Events: pq.StringArray([]string{"config.healthy", "config.unhealthy"}),
Events: pq.StringArray([]string{"config.healthy", "config.warning", "config.unhealthy"}),
Source: models.SourceCRD,
Title: "Dummy",
Template: "dummy",
Expand Down Expand Up @@ -471,6 +471,55 @@ var _ = ginkgo.Describe("Notifications", ginkgo.Ordered, func() {
return len(pending) == 0
}, "15s", "1s").Should(BeTrue())
})

ginkgo.It("should have sent out a notification", func() {
var sendHistory []models.NotificationSendHistory
err := DefaultContext.DB().
Where("notification_id = ?", n.ID.String()).
Where("resource_id = ?", config.ID.String()).
Where("source_event = ?", "config.unhealthy").
Find(&sendHistory).Error
Expect(err).To(BeNil())
Expect(len(sendHistory)).To(Equal(1))
})

ginkgo.It("`should not send out a notification`", func() {
{
// Change health to warning & then back to unknown
// This should create 1 notification.send event for the 'warning' health.
// since the health is changed immediately, we shouldn't receive a notification for it.
err := DefaultContext.DB().Model(&models.ConfigItem{}).Where("id = ?", config.ID).Update("health", models.HealthWarning).Error
Expect(err).To(BeNil())

err = DefaultContext.DB().Model(&models.ConfigItem{}).Where("id = ?", config.ID).Update("health", models.HealthUnknown).Error
Expect(err).To(BeNil())
}

Eventually(func() bool {
events.ConsumeAll(DefaultContext)

var pending []models.NotificationSendHistory
err := DefaultContext.DB().Where("notification_id = ?", n.ID.String()).Where("status = ?", models.NotificationStatusPending).Find(&pending).Error
Expect(err).To(BeNil())

return len(pending) == 1
}, "15s", "1s", "should create a pending notification").Should(BeTrue())

Eventually(func() int {
_, err := notification.ProcessPendingNotification(DefaultContext)
Expect(err).To(BeNil())

var sendHistory []models.NotificationSendHistory
err = DefaultContext.DB().
Where("notification_id = ?", n.ID.String()).
Where("resource_id = ?", config.ID.String()).
Where("source_event = ?", "config.warning").
Where("status = ?", "skipped").
Find(&sendHistory).Error
Expect(err).To(BeNil())
return len(sendHistory)
}, "15s", "1s").Should(Equal(1))
})
})

var _ = ginkgo.Describe("template vailidity", func() {
Expand Down
Loading