Skip to content

Commit

Permalink
feat: trigger scrape event on wait-for health re-evaluation
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe committed Nov 14, 2024
1 parent e720219 commit d812505
Showing 1 changed file with 57 additions and 4 deletions.
61 changes: 57 additions & 4 deletions notification/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,46 @@ func ProcessPendingNotificationsJob(ctx context.Context) *job.Job {
}
}

func ScrapeAndGetHealth(ctx context.Context, configID string) (models.Health, error) {
event := models.Event{
Name: "config-db.incremental-scrape",
Properties: map[string]string{
"config_id": configID,
},
}
if err := ctx.DB().Create(&event).Error; err != nil {
return models.HealthUnknown, err
}

ticker := time.NewTicker(time.Second * 2)
timeout := time.NewTimer(time.Second * 30)

var lastRecordedHealth models.Health
for {
select {
case <-ticker.C:
var result struct {
Health models.Health `json:"health"`
WasUpdated bool `json:"was_updated"`
}
if err := ctx.DB().
Raw(`SELECT health, NOW() - last_scraped_time <= INTERVAL '10 SECONDS' as was_updated FROM config_items WHERE id = ?`, configID).
Find(&result).Error; err != nil {
return models.HealthUnhealthy, err
}

if result.WasUpdated {
return result.Health, nil
} else {
lastRecordedHealth = result.Health
}

case <-timeout.C:
return lastRecordedHealth, nil
}
}
}

func ProcessPendingNotification(ctx context.Context) (bool, error) {
var noMorePending bool

Expand Down Expand Up @@ -89,17 +129,30 @@ func ProcessPendingNotification(ctx context.Context) (bool, error) {

previousHealth := api.EventToHealth(payload.EventName)

currentHealth, err := celEnv.GetResourceHealth(ctx)
if err != nil {
return fmt.Errorf("failed to get resource health from cel env: %w", err)
var currentHealth *models.Health

if celEnv.ConfigItem != nil {
if ch, err := ScrapeAndGetHealth(ctx, celEnv.ConfigItem.ID.String()); err != nil {
return fmt.Errorf("failed to get resource health from cel env: %w", err)
} else {
currentHealth = &ch
}
}

if currentHealth == nil {
if ch, err := celEnv.GetResourceHealth(ctx); err != nil {
return fmt.Errorf("failed to get resource health from cel env: %w", err)
} else {
currentHealth = &ch
}
}

notif, err := GetNotification(ctx, payload.NotificationID.String())
if err != nil {
return fmt.Errorf("failed to get notification: %w", err)
}

if !isHealthReportable(notif.Events, previousHealth, currentHealth) {
if !isHealthReportable(notif.Events, previousHealth, *currentHealth) {
ctx.Logger.V(6).Infof("skipping notification[%s] as health change is not reportable", notif.ID)
if dberr := ctx.DB().Model(&models.NotificationSendHistory{}).Where("id = ?", currentHistory.ID).UpdateColumns(map[string]any{
"status": models.NotificationStatusSkipped,
Expand Down

0 comments on commit d812505

Please sign in to comment.