From c371eaf0bc0757d0c4cca829f6023d1eddea94c5 Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Fri, 1 Sep 2023 16:14:48 +0545 Subject: [PATCH] refactor: event consumer (#520) * feat: separate consumer for sync and async * fix: process sync consumers in a separate transaction * fix: skip auth to /kratos/* and return 401 if session id is not a UUID. * feat: add new sync handlers for playbook events * chore: set custom pgnotify timeout for event consumer. --- api/event.go | 2 +- api/global.go | 5 + api/responders.go | 1 + auth/middleware.go | 9 +- cmd/server.go | 5 +- db/playbooks.go | 11 - events/checks.go | 8 + events/components.go | 14 ++ events/event_consumer.go | 150 -------------- events/event_queue.go | 268 +++++++++++++++++++++++++ events/eventconsumer/event_consumer.go | 135 +++++++++++++ events/events.go | 75 ------- events/notifications.go | 81 ++++---- events/playbook.go | 81 ++++++++ events/responder.go | 127 +++++++++--- events/teams.go | 28 +-- events/upstream_push.go | 14 +- events/upstream_test.go | 12 +- go.mod | 16 +- go.sum | 42 ++-- jobs/jobs.go | 4 +- notification/notification_test.go | 10 +- playbook/playbook_test.go | 13 +- playbook/queue_consumer.go | 157 --------------- playbook/run_consumer.go | 51 +++++ 25 files changed, 773 insertions(+), 546 deletions(-) create mode 100644 events/checks.go create mode 100644 events/components.go delete mode 100644 events/event_consumer.go create mode 100644 events/event_queue.go create mode 100644 events/eventconsumer/event_consumer.go delete mode 100644 events/events.go create mode 100644 events/playbook.go delete mode 100644 playbook/queue_consumer.go create mode 100644 playbook/run_consumer.go diff --git a/api/event.go b/api/event.go index 259a138d7..36fe12011 100644 --- a/api/event.go +++ b/api/event.go @@ -8,7 +8,7 @@ import ( ) type Event struct { - ID uuid.UUID `json:"id"` + ID uuid.UUID `gorm:"default:generate_ulid()"` Name string `json:"name"` Properties types.JSONStringMap `json:"properties"` Error string `json:"error"` diff --git a/api/global.go b/api/global.go index 8ee529f46..f5b07030f 100644 --- a/api/global.go +++ b/api/global.go @@ -67,6 +67,11 @@ func NewContext(db *gorm.DB, echoCtx EchoContext) *Context { return c } +func (c Context) WithDB(db *gorm.DB) *Context { + c.db = db + return &c +} + func (c *Context) DB() *gorm.DB { if c.db == nil { return nil diff --git a/api/responders.go b/api/responders.go index 6a16d2d98..1f9227334 100644 --- a/api/responders.go +++ b/api/responders.go @@ -21,6 +21,7 @@ type IncidentResponders struct { type Responder struct { ID uuid.UUID `json:"id,omitempty"` + Type string `json:"type"` Properties types.JSONStringMap `json:"properties" gorm:"type:jsonstringmap;<-:false"` ExternalID string `json:"external_id,omitempty"` IncidentID uuid.UUID `json:"incident_id,omitempty"` diff --git a/auth/middleware.go b/auth/middleware.go index e3c794959..90178c001 100644 --- a/auth/middleware.go +++ b/auth/middleware.go @@ -61,7 +61,7 @@ func (k *KratosHandler) KratosMiddleware() (*kratosMiddleware, error) { }, nil } -var skipAuthPaths = []string{"/health", "/metrics"} +var skipAuthPaths = []string{"/health", "/metrics", "/kratos/*"} func canSkipAuth(c echo.Context) bool { return collections.Contains(skipAuthPaths, c.Path()) @@ -102,7 +102,12 @@ func (k *kratosMiddleware) Session(next echo.HandlerFunc) echo.HandlerFunc { email = e } } - ctx.WithUser(&api.ContextUser{ID: uuid.MustParse(session.Identity.GetId()), Email: email}) + + if uid, err := uuid.Parse(session.Identity.GetId()); err != nil { + return c.String(http.StatusUnauthorized, "Unauthorized") + } else { + ctx.WithUser(&api.ContextUser{ID: uid, Email: email}) + } return next(ctx) } diff --git a/cmd/server.go b/cmd/server.go index 52b11a5c9..396a35855 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -231,12 +231,11 @@ var Serve = &cobra.Command{ go jobs.Start() - events.StartConsumers(db.Gorm, events.Config{ + events.StartConsumers(db.Gorm, db.Pool, events.Config{ UpstreamPush: api.UpstreamConf, }) - playbookRunConsumer := playbook.NewQueueConsumer(db.Gorm, db.Pool) - go playbookRunConsumer.Listen() + go playbook.StartPlaybookRunConsumer(db.Gorm, db.Pool) go launchKopper() diff --git a/db/playbooks.go b/db/playbooks.go index 79c82a5f0..f615f1f46 100644 --- a/db/playbooks.go +++ b/db/playbooks.go @@ -106,17 +106,6 @@ func FindPlaybooksForComponent(ctx *api.Context, configType string, tags map[str return playbooks, err } -// GetScheduledPlaybookRuns returns all the scheduled playbook runs that are scheduled to run now -// or before now. -func GetScheduledPlaybookRuns(ctx *api.Context, limit int, exceptions ...uuid.UUID) ([]models.PlaybookRun, error) { - var runs []models.PlaybookRun - if err := ctx.DB().Not(exceptions).Where("start_time <= NOW()").Where("status = ?", models.PlaybookRunStatusScheduled).Limit(limit).Order("start_time").Find(&runs).Error; err != nil { - return nil, err - } - - return runs, nil -} - func PersistPlaybookFromCRD(obj *v1.Playbook) error { specJSON, err := json.Marshal(obj.Spec) if err != nil { diff --git a/events/checks.go b/events/checks.go new file mode 100644 index 000000000..461339ab9 --- /dev/null +++ b/events/checks.go @@ -0,0 +1,8 @@ +package events + +func NewCheckConsumerSync() SyncEventConsumer { + return SyncEventConsumer{ + watchEvents: []string{EventCheckPassed, EventCheckFailed}, + consumers: []SyncEventHandlerFunc{addNotificationEvent, schedulePlaybookRun}, + } +} diff --git a/events/components.go b/events/components.go new file mode 100644 index 000000000..1842bbef9 --- /dev/null +++ b/events/components.go @@ -0,0 +1,14 @@ +package events + +func NewComponentConsumerSync() SyncEventConsumer { + return SyncEventConsumer{ + watchEvents: []string{ + EventComponentStatusError, + EventComponentStatusHealthy, + EventComponentStatusInfo, + EventComponentStatusUnhealthy, + EventComponentStatusWarning, + }, + consumers: []SyncEventHandlerFunc{addNotificationEvent, schedulePlaybookRun}, + } +} diff --git a/events/event_consumer.go b/events/event_consumer.go deleted file mode 100644 index d803c26ee..000000000 --- a/events/event_consumer.go +++ /dev/null @@ -1,150 +0,0 @@ -package events - -import ( - "errors" - "fmt" - "sync" - "time" - - "github.com/flanksource/commons/logger" - "github.com/flanksource/incident-commander/api" - "github.com/flanksource/incident-commander/db" - "github.com/flanksource/incident-commander/utils" - "gorm.io/gorm" -) - -type EventConsumer struct { - WatchEvents []string - // We process mutliple events and return the failed events - ProcessBatchFunc func(*api.Context, []api.Event) []api.Event - BatchSize int - Consumers int - DB *gorm.DB -} - -func (e EventConsumer) Validate() error { - if e.BatchSize <= 0 { - return fmt.Errorf("BatchSize:%d <= 0", e.BatchSize) - } - if e.Consumers <= 0 { - return fmt.Errorf("consumers:%d <= 0", e.BatchSize) - } - if len(e.WatchEvents) == 0 { - return fmt.Errorf("no events registered to watch:%d <= 0", len(e.WatchEvents)) - } - return nil -} - -func (t *EventConsumer) consumeEvents() error { - tx := t.DB.Begin() - if tx.Error != nil { - return fmt.Errorf("error initiating db tx: %w", tx.Error) - } - - ctx := api.NewContext(tx, nil) - - const selectEventsQuery = ` - DELETE FROM event_queue - WHERE id IN ( - SELECT id FROM event_queue - WHERE - attempts <= @maxAttempts AND - name IN @events AND - (last_attempt IS NULL OR last_attempt <= NOW() - INTERVAL '1 SECOND' * @baseDelay * POWER(attempts, @exponential)) - ORDER BY priority DESC, created_at ASC - FOR UPDATE SKIP LOCKED - LIMIT @batchSize - ) - RETURNING * - ` - - var events []api.Event - vals := map[string]any{ - "maxAttempts": eventMaxAttempts, - "events": t.WatchEvents, - "batchSize": t.BatchSize, - "baseDelay": 60, // in seconds - "exponential": 5, // along with baseDelay = 60, the retries are 1, 6, 31, 156 (in minutes) - } - err := tx.Raw(selectEventsQuery, vals).Scan(&events).Error - if err != nil { - // Rollback the transaction in case of errors to prevent - // creating dangling connections and to release the locks - tx.Rollback() - return err - } - - if len(events) == 0 { - // Commit the transaction in case of no records found to prevent - // creating dangling connections and to release the locks - tx.Commit() - return gorm.ErrRecordNotFound - } - - failedEvents := t.ProcessBatchFunc(ctx, events) - for i := range failedEvents { - e := &failedEvents[i] - e.Attempts += 1 - last_attempt := time.Now() - e.LastAttempt = &last_attempt - logger.Errorf("Failed to process event[%s]: %s", e.ID, e.Error) - } - - if len(failedEvents) > 0 { - if err := tx.Create(failedEvents).Error; err != nil { - // TODO: More robust way to handle failed event insertion failures - logger.Errorf("Error inserting into table:event_queue with error:%v. %v", err) - } - } - return tx.Commit().Error -} - -// ConsumeEventsUntilEmpty consumes events forever until the event queue is empty. -func (t *EventConsumer) ConsumeEventsUntilEmpty() { - consumerFunc := func(wg *sync.WaitGroup) { - for { - err := t.consumeEvents() - if err != nil { - if errors.Is(err, gorm.ErrRecordNotFound) { - wg.Done() - return - } else { - logger.Errorf("error processing event, waiting 60s to try again %v", err) - time.Sleep(waitDurationOnFailure) - } - } - } - } - - var wg sync.WaitGroup - for i := 0; i < t.Consumers; i++ { - wg.Add(1) - go consumerFunc(&wg) - } - wg.Wait() -} - -func (e *EventConsumer) Listen() { - logger.Infof("Started listening for database notify events: %v", e.WatchEvents) - - if err := e.Validate(); err != nil { - logger.Fatalf("Error starting event consumer: %v", err) - return - } - - // Consume pending events - e.ConsumeEventsUntilEmpty() - - pgNotify := make(chan string) - go utils.ListenToPostgresNotify(db.Pool, "event_queue_updates", dbReconnectMaxDuration, dbReconnectBackoffBaseDuration, pgNotify) - - for { - select { - case <-pgNotify: - e.ConsumeEventsUntilEmpty() - - case <-time.After(pgNotifyTimeout): - e.ConsumeEventsUntilEmpty() - } - } -} diff --git a/events/event_queue.go b/events/event_queue.go new file mode 100644 index 000000000..eb5d350fb --- /dev/null +++ b/events/event_queue.go @@ -0,0 +1,268 @@ +package events + +import ( + "fmt" + "time" + + "github.com/flanksource/commons/logger" + "github.com/flanksource/duty/upstream" + "github.com/flanksource/incident-commander/api" + "github.com/flanksource/incident-commander/events/eventconsumer" + "github.com/jackc/pgx/v5/pgxpool" + "gorm.io/gorm" +) + +const ( + // eventMaxAttempts is the maximum number of attempts to process an event in `event_queue` + eventMaxAttempts = 3 + + // eventQueueUpdateChannel is the channel on which new events on the `event_queue` table + // are notified. + eventQueueUpdateChannel = "event_queue_updates" +) + +type ( + // AsyncEventHandlerFunc processes multiple events and returns the failed ones + AsyncEventHandlerFunc func(*api.Context, []api.Event) []api.Event + + // SyncEventHandlerFunc processes a single event and ONLY makes db changes. + SyncEventHandlerFunc func(*api.Context, api.Event) error +) + +// List of all sync events in the `event_queue` table. +// +// These events are generated by the database in response to updates on some of the tables. +const ( + EventTeamUpdate = "team.update" + EventTeamDelete = "team.delete" + + EventCheckPassed = "check.passed" + EventCheckFailed = "check.failed" + + EventComponentStatusHealthy = "component.status.healthy" + EventComponentStatusUnhealthy = "component.status.unhealthy" + EventComponentStatusInfo = "component.status.info" + EventComponentStatusWarning = "component.status.warning" + EventComponentStatusError = "component.status.error" + + EventNotificationUpdate = "notification.update" + EventNotificationDelete = "notification.delete" + + EventPlaybookSpecApprovalUpdated = "playbook.spec.approval.updated" + + EventPlaybookApprovalInserted = "playbook.approval.inserted" + + EventIncidentCommentAdded = "incident.comment.added" + EventIncidentCreated = "incident.created" + EventIncidentDODAdded = "incident.dod.added" + EventIncidentDODPassed = "incident.dod.passed" + EventIncidentDODRegressed = "incident.dod.regressed" + EventIncidentResponderAdded = "incident.responder.added" + EventIncidentResponderRemoved = "incident.responder.removed" + EventIncidentStatusCancelled = "incident.status.cancelled" + EventIncidentStatusClosed = "incident.status.closed" + EventIncidentStatusInvestigating = "incident.status.investigating" + EventIncidentStatusMitigated = "incident.status.mitigated" + EventIncidentStatusOpen = "incident.status.open" + EventIncidentStatusResolved = "incident.status.resolved" +) + +// List of async events. +// +// Async events require the handler to talk to 3rd party services. +// They are not determinant and cannot be reliably rolled back and retried. +// +// They are mostly generated by the application itself from sync consumers in response +// to a sync event. +// Or, they could also be generated by the database. +const ( + EventPushQueueCreate = "push_queue.create" + + EventNotificationSend = "notification.send" + + EventJiraResponderAdded = "incident.responder.jira.added" + EventJiraCommentAdded = "incident.comment.jira.added" + + EventMSPlannerResponderAdded = "incident.responder.msplanner.added" + EventMSPlannerCommentAdded = "incident.comment.msplanner.added" +) + +type Config struct { + UpstreamPush upstream.UpstreamConfig +} + +func StartConsumers(gormDB *gorm.DB, pgpool *pgxpool.Pool, config Config) { + uniqEvents := make(map[string]struct{}) + allSyncHandlers := []SyncEventConsumer{ + NewTeamConsumerSync(), + NewCheckConsumerSync(), + NewComponentConsumerSync(), + NewResponderConsumerSync(), + NewCommentConsumerSync(), + NewNotificationSaveConsumerSync(), + NewNotificationUpdatesConsumerSync(), + NewPlaybookApprovalConsumerSync(), + NewPlaybookApprovalSpecUpdatedConsumerSync(), + } + + for i := range allSyncHandlers { + for _, event := range allSyncHandlers[i].watchEvents { + if _, ok := uniqEvents[event]; ok { + logger.Fatalf("Watch event %s is duplicated", event) + } + + uniqEvents[event] = struct{}{} + } + + go allSyncHandlers[i].EventConsumer(gormDB, pgpool).Listen() + } + + asyncConsumers := []AsyncEventConsumer{ + NewNotificationSendConsumerAsync(), + NewResponderConsumerAsync(), + } + if config.UpstreamPush.Valid() { + asyncConsumers = append(asyncConsumers, NewUpstreamPushConsumerAsync(config)) + } + + for i := range asyncConsumers { + for _, event := range asyncConsumers[i].watchEvents { + if _, ok := uniqEvents[event]; ok { + logger.Fatalf("Watch event %s is duplicated", event) + } + + uniqEvents[event] = struct{}{} + } + + go asyncConsumers[i].EventConsumer(gormDB, pgpool).Listen() + } +} + +// fetchEvents fetches given watch events from the `event_queue` table. +func fetchEvents(ctx *api.Context, watchEvents []string, batchSize int) ([]api.Event, error) { + const selectEventsQuery = ` + DELETE FROM event_queue + WHERE id IN ( + SELECT id FROM event_queue + WHERE + attempts <= @maxAttempts AND + name IN @events AND + (last_attempt IS NULL OR last_attempt <= NOW() - INTERVAL '1 SECOND' * @baseDelay * POWER(attempts, @exponential)) + ORDER BY priority DESC, created_at ASC + FOR UPDATE SKIP LOCKED + LIMIT @batchSize + ) + RETURNING * + ` + + var events []api.Event + vals := map[string]any{ + "maxAttempts": eventMaxAttempts, + "events": watchEvents, + "batchSize": batchSize, + "baseDelay": 60, // in seconds + "exponential": 5, // along with baseDelay = 60, the retries are 1, 6, 31, 156 (in minutes) + } + err := ctx.DB().Raw(selectEventsQuery, vals).Scan(&events).Error + if err != nil { + return nil, fmt.Errorf("error selecting events: %w", err) + } + + return events, nil +} + +type SyncEventConsumer struct { + watchEvents []string + consumers []SyncEventHandlerFunc + numConsumers int +} + +func (t SyncEventConsumer) EventConsumer(db *gorm.DB, pool *pgxpool.Pool) *eventconsumer.EventConsumer { + consumer := eventconsumer.New(db, pool, eventQueueUpdateChannel, t.Handle) + if t.numConsumers > 0 { + consumer = consumer.WithNumConsumers(t.numConsumers) + } + + return consumer +} + +func (t *SyncEventConsumer) Handle(ctx *api.Context) error { + tx := ctx.DB().Begin() + if tx.Error != nil { + return fmt.Errorf("error initiating db tx: %w", tx.Error) + } + defer tx.Rollback() + + ctx = ctx.WithDB(tx) + + events, err := fetchEvents(ctx, t.watchEvents, 1) + if err != nil { + return fmt.Errorf("error fetching events: %w", err) + } + + if len(events) == 0 { + return api.Errorf(api.ENOTFOUND, "No events found") + } + + for _, syncConsumer := range t.consumers { + if err := syncConsumer(ctx, events[0]); err != nil { + return fmt.Errorf("error processing sync consumer: %w", err) + } + } + + return tx.Commit().Error +} + +type AsyncEventConsumer struct { + watchEvents []string + batchSize int + consumer AsyncEventHandlerFunc + numConsumers int +} + +func (t *AsyncEventConsumer) Handle(ctx *api.Context) error { + tx := ctx.DB().Begin() + if tx.Error != nil { + return fmt.Errorf("error initiating db tx: %w", tx.Error) + } + defer tx.Rollback() + + ctx = ctx.WithDB(tx) + + events, err := fetchEvents(ctx, t.watchEvents, t.batchSize) + if err != nil { + return fmt.Errorf("error fetching events: %w", err) + } + + if len(events) == 0 { + return api.Errorf(api.ENOTFOUND, "No events found") + } + + failedEvents := t.consumer(ctx, events) + lastAttempt := time.Now() + + for i := range failedEvents { + e := &failedEvents[i] + e.Attempts += 1 + e.LastAttempt = &lastAttempt + logger.Errorf("Failed to process event[%s]: %s", e.ID, e.Error) + } + + if len(failedEvents) > 0 { + if err := tx.Create(failedEvents).Error; err != nil { + // TODO: More robust way to handle failed event insertion failures + logger.Errorf("Error inserting into table:event_queue with error:%v. %v", err) + } + } + + return tx.Commit().Error +} + +func (t AsyncEventConsumer) EventConsumer(db *gorm.DB, pool *pgxpool.Pool) *eventconsumer.EventConsumer { + consumer := eventconsumer.New(db, pool, eventQueueUpdateChannel, t.Handle) + if t.numConsumers > 0 { + consumer = consumer.WithNumConsumers(t.numConsumers) + } + + return consumer +} diff --git a/events/eventconsumer/event_consumer.go b/events/eventconsumer/event_consumer.go new file mode 100644 index 000000000..e22b368d7 --- /dev/null +++ b/events/eventconsumer/event_consumer.go @@ -0,0 +1,135 @@ +package eventconsumer + +import ( + "fmt" + "sync" + "time" + + "github.com/flanksource/commons/logger" + "github.com/flanksource/incident-commander/api" + "github.com/flanksource/incident-commander/utils" + "github.com/jackc/pgx/v5/pgxpool" + "gorm.io/gorm" +) + +const ( + // waitDurationOnFailure is the duration to wait before attempting to consume events again + // after an unexpected failure. + waitDurationOnFailure = time.Second * 5 + + defaultPgNotifyTimeout = time.Minute + + dbReconnectMaxDuration = time.Minute * 5 + dbReconnectBackoffBaseDuration = time.Second +) + +type EventConsumerFunc func(ctx *api.Context) error + +type EventConsumer struct { + db *gorm.DB + pgPool *pgxpool.Pool + + // Number of concurrent consumers + numConsumers int + + // pgNotifyTimeout is the timeout to consume events in case no Consume notification is received. + pgNotifyTimeout time.Duration + + // pgNotifyChannel is the channel to listen to for any new updates on the event queue + pgNotifyChannel string + + // consumerFunc is responsible in fetching the events for the given batch size and events. + // It should return a NotFound error if it cannot find any event to consume. + consumerFunc EventConsumerFunc +} + +// New returns a new EventConsumer +func New(DB *gorm.DB, PGPool *pgxpool.Pool, PgNotifyChannel string, ConsumerFunc EventConsumerFunc) *EventConsumer { + return &EventConsumer{ + numConsumers: 1, + db: DB, + pgPool: PGPool, + pgNotifyChannel: PgNotifyChannel, + consumerFunc: ConsumerFunc, + pgNotifyTimeout: defaultPgNotifyTimeout, + } +} + +func (e *EventConsumer) WithNumConsumers(numConsumers int) *EventConsumer { + e.numConsumers = numConsumers + return e +} + +func (e *EventConsumer) WithNotifyTimeout(timeout time.Duration) *EventConsumer { + e.pgNotifyTimeout = timeout + return e +} + +func (e EventConsumer) Validate() error { + if e.numConsumers <= 0 { + return fmt.Errorf("consumers:%d <= 0", e.numConsumers) + } + if e.pgNotifyChannel == "" { + return fmt.Errorf("pgNotifyChannel is empty") + } + if e.consumerFunc == nil { + return fmt.Errorf("consumerFunc is empty") + } + if e.db == nil { + return fmt.Errorf("DB is nil") + } + if e.pgPool == nil { + return fmt.Errorf("PGPool is nil") + } + return nil +} + +// ConsumeEventsUntilEmpty consumes events forever until the event queue is empty. +func (t *EventConsumer) ConsumeEventsUntilEmpty(ctx *api.Context) { + consumerFunc := func(wg *sync.WaitGroup) { + for { + err := t.consumerFunc(ctx) + if err != nil { + if api.ErrorCode(err) == api.ENOTFOUND { + wg.Done() + return + } + + logger.Errorf("error processing event, waiting %s to try again: %v", waitDurationOnFailure, err) + time.Sleep(waitDurationOnFailure) + } + } + } + + var wg sync.WaitGroup + for i := 0; i < t.numConsumers; i++ { + wg.Add(1) + go consumerFunc(&wg) + } + wg.Wait() +} + +func (e *EventConsumer) Listen() { + if err := e.Validate(); err != nil { + logger.Fatalf("error starting event consumer: %v", err) + return + } + + ctx := api.NewContext(e.db, nil) + + // Consume pending events + e.ConsumeEventsUntilEmpty(ctx) + + pgNotify := make(chan string) + go utils.ListenToPostgresNotify(e.pgPool, e.pgNotifyChannel, dbReconnectMaxDuration, dbReconnectBackoffBaseDuration, pgNotify) + + for { + select { + case <-pgNotify: + e.ConsumeEventsUntilEmpty(ctx) + + case <-time.After(e.pgNotifyTimeout): + e.ConsumeEventsUntilEmpty(ctx) + } + } +} diff --git a/events/events.go b/events/events.go deleted file mode 100644 index 4dd16a5bf..000000000 --- a/events/events.go +++ /dev/null @@ -1,75 +0,0 @@ -package events - -import ( - "time" - - "github.com/flanksource/commons/logger" - "github.com/flanksource/duty/upstream" - "gorm.io/gorm" - - "github.com/flanksource/commons/collections/set" -) - -const ( - EventTeamUpdate = "team.update" - EventTeamDelete = "team.delete" - EventNotificationSend = "notification.send" - - EventNotificationUpdate = "notification.update" - EventNotificationDelete = "notification.delete" - - EventCheckPassed = "check.passed" - EventCheckFailed = "check.failed" - - EventIncidentCreated = "incident.created" - EventIncidentResponderAdded = "incident.responder.added" - EventIncidentResponderRemoved = "incident.responder.removed" - EventIncidentCommentAdded = "incident.comment.added" - EventIncidentDODAdded = "incident.dod.added" - EventIncidentDODPassed = "incident.dod.passed" - EventIncidentDODRegressed = "incident.dod.regressed" - EventIncidentStatusOpen = "incident.status.open" - EventIncidentStatusClosed = "incident.status.closed" - EventIncidentStatusMitigated = "incident.status.mitigated" - EventIncidentStatusResolved = "incident.status.resolved" - EventIncidentStatusInvestigating = "incident.status.investigating" - EventIncidentStatusCancelled = "incident.status.cancelled" - - EventPushQueueCreate = "push_queue.create" -) - -const ( - eventMaxAttempts = 3 - waitDurationOnFailure = time.Minute - pgNotifyTimeout = time.Minute - - dbReconnectMaxDuration = time.Minute * 5 - dbReconnectBackoffBaseDuration = time.Second -) - -type Config struct { - UpstreamPush upstream.UpstreamConfig -} - -func StartConsumers(gormDB *gorm.DB, config Config) { - allConsumers := []EventConsumer{ - NewTeamConsumer(gormDB), - NewNotificationConsumer(gormDB), - NewNotificationSendConsumer(gormDB), - NewResponderConsumer(gormDB), - } - if config.UpstreamPush.Valid() { - allConsumers = append(allConsumers, NewUpstreamPushConsumer(gormDB, config)) - } - - uniqWatchEvents := set.New[string]() - for i, c := range allConsumers { - for _, we := range c.WatchEvents { - if uniqWatchEvents.Contains(we) { - logger.Fatalf("Error starting consumers: event[%s] has multiple consumers", we) - } - } - uniqWatchEvents.Add(c.WatchEvents...) - go allConsumers[i].Listen() - } -} diff --git a/events/notifications.go b/events/notifications.go index 3d5b02f6c..629448b30 100644 --- a/events/notifications.go +++ b/events/notifications.go @@ -12,42 +12,52 @@ import ( "github.com/flanksource/incident-commander/api" pkgNotification "github.com/flanksource/incident-commander/notification" "github.com/flanksource/incident-commander/teams" - "github.com/google/uuid" - "gorm.io/gorm" ) -func NewNotificationConsumer(db *gorm.DB) EventConsumer { - return EventConsumer{ - WatchEvents: []string{ - EventNotificationUpdate, EventNotificationDelete, +func NewNotificationUpdatesConsumerSync() SyncEventConsumer { + return SyncEventConsumer{ + watchEvents: []string{EventNotificationUpdate, EventNotificationDelete}, + consumers: []SyncEventHandlerFunc{ + handleNotificationUpdates, + }, + } +} + +func NewNotificationSaveConsumerSync() SyncEventConsumer { + return SyncEventConsumer{ + watchEvents: []string{ EventIncidentCreated, + EventIncidentDODAdded, + EventIncidentDODPassed, + EventIncidentDODRegressed, EventIncidentResponderRemoved, - EventIncidentDODAdded, EventIncidentDODPassed, EventIncidentDODRegressed, - EventIncidentStatusOpen, EventIncidentStatusClosed, EventIncidentStatusMitigated, - EventIncidentStatusResolved, EventIncidentStatusInvestigating, EventIncidentStatusCancelled, - EventCheckPassed, EventCheckFailed, + EventIncidentStatusCancelled, + EventIncidentStatusClosed, + EventIncidentStatusInvestigating, + EventIncidentStatusMitigated, + EventIncidentStatusOpen, + EventIncidentStatusResolved, + }, + numConsumers: 3, + consumers: []SyncEventHandlerFunc{ + addNotificationEvent, }, - ProcessBatchFunc: processNotificationEvents, - BatchSize: 1, - Consumers: 1, - DB: db, } } -func NewNotificationSendConsumer(db *gorm.DB) EventConsumer { - return EventConsumer{ - WatchEvents: []string{EventNotificationSend}, - ProcessBatchFunc: processNotificationEvents, - BatchSize: 1, - Consumers: 5, - DB: db, +func NewNotificationSendConsumerAsync() AsyncEventConsumer { + return AsyncEventConsumer{ + watchEvents: []string{EventNotificationSend}, + consumer: processNotificationEvents, + batchSize: 1, + numConsumers: 5, } } func processNotificationEvents(ctx *api.Context, events []api.Event) []api.Event { var failedEvents []api.Event for _, e := range events { - if err := handleNotificationEvent(ctx, e); err != nil { + if err := sendNotification(ctx, e); err != nil { e.Error = err.Error() failedEvents = append(failedEvents, e) } @@ -55,24 +65,6 @@ func processNotificationEvents(ctx *api.Context, events []api.Event) []api.Event return failedEvents } -func handleNotificationEvent(ctx *api.Context, event api.Event) error { - switch event.Name { - case EventNotificationDelete, EventNotificationUpdate: - return handleNotificationUpdates(ctx, event) - case EventNotificationSend: - return sendNotification(ctx, event) - case EventIncidentCreated, EventIncidentResponderRemoved, - EventIncidentDODAdded, EventIncidentDODPassed, - EventIncidentDODRegressed, EventIncidentStatusOpen, - EventIncidentStatusClosed, EventIncidentStatusMitigated, - EventIncidentStatusResolved, EventIncidentStatusInvestigating, EventIncidentStatusCancelled, - EventCheckFailed, EventCheckPassed: - return addNotificationEvent(ctx, event) - default: - return fmt.Errorf("Unrecognized event name: %s", event.Name) - } -} - type NotificationEventProperties struct { ID string `json:"id"` // Resource id. depends what it is based on the original event. EventName string `json:"event_name"` // The name of the original event this notification is for. @@ -234,8 +226,7 @@ func addNotificationEvent(ctx *api.Context, event api.Event) error { PersonID: n.PersonID.String(), } - newEvent := api.Event{ - ID: uuid.New(), + newEvent := &api.Event{ Name: EventNotificationSend, Properties: prop.AsMap(), } @@ -269,8 +260,7 @@ func addNotificationEvent(ctx *api.Context, event api.Event) error { NotificationName: cn.Name, } - newEvent := api.Event{ - ID: uuid.New(), + newEvent := &api.Event{ Name: EventNotificationSend, Properties: prop.AsMap(), } @@ -293,8 +283,7 @@ func addNotificationEvent(ctx *api.Context, event api.Event) error { NotificationName: cn.Name, } - newEvent := api.Event{ - ID: uuid.New(), + newEvent := &api.Event{ Name: EventNotificationSend, Properties: prop.AsMap(), } diff --git a/events/playbook.go b/events/playbook.go new file mode 100644 index 000000000..a73d750a0 --- /dev/null +++ b/events/playbook.go @@ -0,0 +1,81 @@ +package events + +import ( + "encoding/json" + + "github.com/flanksource/duty/models" + "github.com/flanksource/incident-commander/api" + v1 "github.com/flanksource/incident-commander/api/v1" + "github.com/flanksource/incident-commander/db" +) + +func NewPlaybookApprovalSpecUpdatedConsumerSync() SyncEventConsumer { + return SyncEventConsumer{ + watchEvents: []string{EventPlaybookSpecApprovalUpdated}, + consumers: []SyncEventHandlerFunc{onApprovalUpdated}, + } +} + +func NewPlaybookApprovalConsumerSync() SyncEventConsumer { + return SyncEventConsumer{ + watchEvents: []string{EventPlaybookApprovalInserted}, + consumers: []SyncEventHandlerFunc{onPlaybookRunNewApproval}, + } +} + +func schedulePlaybookRun(ctx *api.Context, event api.Event) error { + // TODO: + // See if any playbook is listening on this event. + // Match the filters + // If everything goes ok, save the playbook run. + return nil +} + +func onApprovalUpdated(ctx *api.Context, event api.Event) error { + playbookID := event.Properties["id"] + + var playbook models.Playbook + if err := ctx.DB().Where("id = ?", playbookID).First(&playbook).Error; err != nil { + return err + } + + var spec v1.PlaybookSpec + if err := json.Unmarshal(playbook.Spec, &spec); err != nil { + return err + } + + if spec.Approval == nil { + return nil + } + + return db.UpdatePlaybookRunStatusIfApproved(ctx, playbookID, *spec.Approval) +} + +func onPlaybookRunNewApproval(ctx *api.Context, event api.Event) error { + runID := event.Properties["run_id"] + + var run models.PlaybookRun + if err := ctx.DB().Where("id = ?", runID).First(&run).Error; err != nil { + return err + } + + if run.Status != models.PlaybookRunStatusPending { + return nil + } + + var playbook models.Playbook + if err := ctx.DB().Where("id = ?", run.PlaybookID).First(&playbook).Error; err != nil { + return err + } + + var spec v1.PlaybookSpec + if err := json.Unmarshal(playbook.Spec, &spec); err != nil { + return err + } + + if spec.Approval == nil { + return nil + } + + return db.UpdatePlaybookRunStatusIfApproved(ctx, playbook.ID.String(), *spec.Approval) +} diff --git a/events/responder.go b/events/responder.go index de7a43ff7..feda28dc5 100644 --- a/events/responder.go +++ b/events/responder.go @@ -8,23 +8,110 @@ import ( "github.com/flanksource/commons/logger" "github.com/flanksource/incident-commander/api" - "github.com/flanksource/incident-commander/responder" pkgResponder "github.com/flanksource/incident-commander/responder" ) -func NewResponderConsumer(db *gorm.DB) EventConsumer { - return EventConsumer{ - WatchEvents: []string{ - EventIncidentResponderAdded, - EventIncidentCommentAdded, - }, - ProcessBatchFunc: processResponderEvents, - BatchSize: 1, - Consumers: 1, - DB: db, +func NewResponderConsumerSync() SyncEventConsumer { + return SyncEventConsumer{ + watchEvents: []string{EventIncidentResponderAdded}, + consumers: []SyncEventHandlerFunc{addNotificationEvent, generateResponderAddedAsyncEvent}, } } +func NewCommentConsumerSync() SyncEventConsumer { + return SyncEventConsumer{ + watchEvents: []string{EventIncidentCommentAdded}, + consumers: []SyncEventHandlerFunc{addNotificationEvent, generateCommentAddedAsyncEvent}, + } +} + +func NewResponderConsumerAsync() AsyncEventConsumer { + return AsyncEventConsumer{ + watchEvents: []string{EventJiraResponderAdded, EventMSPlannerResponderAdded, EventMSPlannerCommentAdded, EventJiraCommentAdded}, + consumer: processResponderEvents, + batchSize: 1, + } +} + +// generateResponderAddedAsyncEvent generates async events for each of the configured responder clients +// in the associated team. +func generateResponderAddedAsyncEvent(ctx *api.Context, event api.Event) error { + responderID := event.Properties["id"] + + var responder api.Responder + err := ctx.DB().Where("id = ? AND external_id is NULL", responderID).Preload("Incident").Preload("Team").Find(&responder).Error + if err != nil { + return err + } + + spec, err := responder.Team.GetSpec() + if err != nil { + return err + } + + if spec.ResponderClients.Jira != nil { + if err := ctx.DB().Create(&api.Event{Name: EventJiraResponderAdded, Properties: map[string]string{"id": responderID}}).Error; err != nil { + return err + } + } + + if spec.ResponderClients.MSPlanner != nil { + if err := ctx.DB().Create(&api.Event{Name: EventMSPlannerResponderAdded, Properties: map[string]string{"id": responderID}}).Error; err != nil { + return err + } + } + + return nil +} + +// generateCommentAddedAsyncEvent generates comment.add async events for each of the configured responder clients. +func generateCommentAddedAsyncEvent(ctx *api.Context, event api.Event) error { + commentID := event.Properties["id"] + + var comment api.Comment + err := ctx.DB().Where("id = ? AND external_id IS NULL", commentID).First(&comment).Error + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + logger.Debugf("Skipping comment %s since it was added via responder", commentID) + return nil + } + + return err + } + + // Get all responders related to a comment + var responders []api.Responder + commentRespondersQuery := ` + SELECT * FROM responders WHERE incident_id IN ( + SELECT incident_id FROM comments WHERE id = ? + ) + ` + if err = ctx.DB().Raw(commentRespondersQuery, commentID).Preload("Team").Find(&responders).Error; err != nil { + return err + } + + for _, responder := range responders { + switch responder.Type { + case "jira": + if err := ctx.DB().Create(&api.Event{Name: EventJiraCommentAdded, Properties: map[string]string{ + "responder_id": responder.ID.String(), + "id": commentID, + }}).Error; err != nil { + return err + } + case "ms_planner": + if err := ctx.DB().Create(&api.Event{Name: EventMSPlannerCommentAdded, Properties: map[string]string{ + "responder_id": responder.ID.String(), + "id": commentID, + }}).Error; err != nil { + return err + } + } + } + + return nil +} + func processResponderEvents(ctx *api.Context, events []api.Event) []api.Event { var failedEvents []api.Event for _, e := range events { @@ -38,15 +125,16 @@ func processResponderEvents(ctx *api.Context, events []api.Event) []api.Event { func handleResponderEvent(ctx *api.Context, event api.Event) error { switch event.Name { - case EventIncidentResponderAdded: + case EventJiraResponderAdded, EventMSPlannerResponderAdded: return reconcileResponderEvent(ctx, event) - case EventIncidentCommentAdded: + case EventJiraCommentAdded, EventMSPlannerCommentAdded: return reconcileCommentEvent(ctx, event) default: - return fmt.Errorf("Unrecognized event name: %s", event.Name) + return fmt.Errorf("unrecognized event name: %s", event.Name) } } +// TODO: Modify this such that it only notifies the responder mentioned in the event. func reconcileResponderEvent(ctx *api.Context, event api.Event) error { responderID := event.Properties["id"] @@ -70,13 +158,10 @@ func reconcileResponderEvent(ctx *api.Context, event api.Event) error { return ctx.DB().Model(&api.Responder{}).Where("id = ?", responder.ID).Update("external_id", externalID).Error } - if err := addNotificationEvent(ctx, event); err != nil { - logger.Errorf("failed to add notification publish event for responder: %v", err) - } - return nil } +// TODO: Modify this such that it only adds the comment to the particular responder mentioned in the event. func reconcileCommentEvent(ctx *api.Context, event api.Event) error { commentID := event.Properties["id"] @@ -107,7 +192,7 @@ func reconcileCommentEvent(ctx *api.Context, event api.Event) error { // Reset externalID to avoid inserting previous iteration's ID externalID := "" - responder, err := responder.GetResponder(ctx, _responder.Team) + responder, err := pkgResponder.GetResponder(ctx, _responder.Team) if err != nil { return err } @@ -129,9 +214,5 @@ func reconcileCommentEvent(ctx *api.Context, event api.Event) error { } } - if err := addNotificationEvent(ctx, event); err != nil { - logger.Errorf("failed to add notification publish event for comment: %v", err) - } - return nil } diff --git a/events/teams.go b/events/teams.go index 658dd0d05..eacc569cc 100644 --- a/events/teams.go +++ b/events/teams.go @@ -8,31 +8,15 @@ import ( "github.com/flanksource/incident-commander/responder" "github.com/flanksource/incident-commander/teams" "github.com/google/uuid" - "gorm.io/gorm" ) -func NewTeamConsumer(db *gorm.DB) EventConsumer { - return EventConsumer{ - WatchEvents: []string{ - EventTeamUpdate, - EventTeamDelete, - }, - ProcessBatchFunc: processTeamEvents, - BatchSize: 1, - Consumers: 1, - DB: db, +func NewTeamConsumerSync() SyncEventConsumer { + consumer := SyncEventConsumer{ + watchEvents: []string{EventTeamUpdate, EventTeamDelete}, + consumers: []SyncEventHandlerFunc{handleTeamEvent}, } -} -func processTeamEvents(ctx *api.Context, events []api.Event) []api.Event { - var failedEvents []api.Event - for _, e := range events { - if err := handleTeamEvent(ctx, e); err != nil { - e.Error = err.Error() - failedEvents = append(failedEvents, e) - } - } - return failedEvents + return consumer } func handleTeamEvent(ctx *api.Context, event api.Event) error { @@ -42,7 +26,7 @@ func handleTeamEvent(ctx *api.Context, event api.Event) error { case EventTeamDelete: return handleTeamDelete(ctx, event) default: - return fmt.Errorf("Unrecognized event name: %s", event.Name) + return fmt.Errorf("unrecognized event name: %s", event.Name) } } diff --git a/events/upstream_push.go b/events/upstream_push.go index ada527aad..b4df3aeac 100644 --- a/events/upstream_push.go +++ b/events/upstream_push.go @@ -6,22 +6,20 @@ import ( "github.com/flanksource/commons/logger" "github.com/flanksource/duty/upstream" "github.com/flanksource/incident-commander/api" - "gorm.io/gorm" ) var upstreamPushEventHandler *pushToUpstreamEventHandler -func NewUpstreamPushConsumer(db *gorm.DB, config Config) EventConsumer { +func NewUpstreamPushConsumerAsync(config Config) AsyncEventConsumer { if config.UpstreamPush.Valid() { upstreamPushEventHandler = newPushToUpstreamEventHandler(config.UpstreamPush) } - return EventConsumer{ - WatchEvents: []string{EventPushQueueCreate}, - ProcessBatchFunc: handleUpstreamPushEvents, - BatchSize: 50, - Consumers: 5, - DB: db, + return AsyncEventConsumer{ + watchEvents: []string{EventPushQueueCreate}, + consumer: handleUpstreamPushEvents, + batchSize: 50, + numConsumers: 5, } } diff --git a/events/upstream_test.go b/events/upstream_test.go index 84f4c7ba9..531e2ab62 100644 --- a/events/upstream_test.go +++ b/events/upstream_test.go @@ -153,18 +153,18 @@ var _ = ginkgo.Describe("Push Mode", ginkgo.Ordered, func() { }, } - c := NewUpstreamPushConsumer(agentBob.db, eventHandlerConfig) - c.ConsumeEventsUntilEmpty() + c := NewUpstreamPushConsumerAsync(eventHandlerConfig).EventConsumer(agentBob.db, agentBob.pool) + c.ConsumeEventsUntilEmpty(api.NewContext(agentBob.db, nil)) // Agent James should also push everything in it's queue to the upstream eventHandlerConfig.UpstreamPush.AgentName = agentJames.name - c = NewUpstreamPushConsumer(agentJames.db, eventHandlerConfig) - c.ConsumeEventsUntilEmpty() + c = NewUpstreamPushConsumerAsync(eventHandlerConfig).EventConsumer(agentJames.db, agentJames.pool) + c.ConsumeEventsUntilEmpty(api.NewContext(agentJames.db, nil)) // Agent Ross should also push everything in it's queue to the upstream eventHandlerConfig.UpstreamPush.AgentName = agentRoss.name - c = NewUpstreamPushConsumer(agentRoss.db, eventHandlerConfig) - c.ConsumeEventsUntilEmpty() + c = NewUpstreamPushConsumerAsync(eventHandlerConfig).EventConsumer(agentRoss.db, agentRoss.pool) + c.ConsumeEventsUntilEmpty(api.NewContext(agentRoss.db, nil)) }) ginkgo.It("should have transferred all the components", func() { diff --git a/go.mod b/go.mod index f73708ab3..3b8dae8a0 100644 --- a/go.mod +++ b/go.mod @@ -11,8 +11,8 @@ require ( github.com/containrrr/shoutrrr v0.7.1 github.com/fergusstrange/embedded-postgres v1.23.0 github.com/flanksource/commons v1.11.0 - github.com/flanksource/duty v1.0.161 - github.com/flanksource/gomplate/v3 v3.20.11 + github.com/flanksource/duty v1.0.166 + github.com/flanksource/gomplate/v3 v3.20.12 github.com/flanksource/kopper v1.0.6 github.com/google/cel-go v0.17.6 github.com/google/go-cmp v0.5.9 @@ -38,7 +38,7 @@ require ( ) require ( - ariga.io/atlas v0.13.2 // indirect + ariga.io/atlas v0.13.3 // indirect cloud.google.com/go/compute v1.23.0 // indirect cloud.google.com/go/compute/metadata v0.2.3 // indirect cloud.google.com/go/iam v1.1.2 // indirect @@ -46,7 +46,7 @@ require ( github.com/Masterminds/semver/v3 v3.2.1 // indirect github.com/agext/levenshtein v1.2.3 // indirect github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df // indirect - github.com/apparentlymart/go-textseg/v13 v13.0.0 // indirect + github.com/apparentlymart/go-textseg/v15 v15.0.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect @@ -74,7 +74,7 @@ require ( github.com/googleapis/enterprise-certificate-proxy v0.2.5 // indirect github.com/gosimple/unidecode v1.0.1 // indirect github.com/hairyhenderson/yaml v0.0.0-20220618171115-2d35fca545ce // indirect - github.com/hashicorp/hcl/v2 v2.17.0 // indirect + github.com/hashicorp/hcl/v2 v2.18.0 // indirect github.com/itchyny/gojq v0.12.13 // indirect github.com/itchyny/timefmt-go v0.1.5 // indirect github.com/jackc/puddle/v2 v2.2.1 // indirect @@ -101,7 +101,7 @@ require ( github.com/tidwall/pretty v1.2.1 // indirect github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 // indirect github.com/yuin/gopher-lua v1.1.0 // indirect - github.com/zclconf/go-cty v1.13.3 // indirect + github.com/zclconf/go-cty v1.14.0 // indirect go.opentelemetry.io/otel v1.16.0 // indirect go.opentelemetry.io/otel/metric v1.16.0 // indirect go.opentelemetry.io/otel/trace v1.16.0 // indirect @@ -141,8 +141,8 @@ require ( github.com/AzureAD/microsoft-authentication-library-for-go v1.0.0 // indirect github.com/Masterminds/goutils v1.1.1 // indirect github.com/TomOnTime/utfutil v0.0.0-20230223141146-125e65197b36 - github.com/antonmedv/expr v1.14.3 // indirect - github.com/aws/aws-sdk-go v1.44.331 // indirect + github.com/antonmedv/expr v1.15.0 // indirect + github.com/aws/aws-sdk-go v1.45.0 // indirect github.com/bgentry/go-netrc v0.0.0-20140422174119-9fd32a8b3d3d // indirect github.com/cjlapao/common-go v0.0.39 // indirect github.com/davecgh/go-spew v1.1.1 // indirect diff --git a/go.sum b/go.sum index f29495742..8060cdd36 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,5 @@ -ariga.io/atlas v0.13.1 h1:oSkEYgI3qUnQZ6b6+teAEiIuizjBvkZ4YDbz0XWfCdQ= -ariga.io/atlas v0.13.1/go.mod h1:+TR129FJZ5Lvzms6dvCeGWh1yR6hMvmXBhug4hrNIGk= -ariga.io/atlas v0.13.2 h1:52uuqedNjRvuTLtpHJV2KNQve1iGmBNTibAPQTwpz80= -ariga.io/atlas v0.13.2/go.mod h1:+TR129FJZ5Lvzms6dvCeGWh1yR6hMvmXBhug4hrNIGk= +ariga.io/atlas v0.13.3 h1:L6Zz/0/yo86flQclZwIqJEKrHoJxQOkj3Z2JPvzZEHE= +ariga.io/atlas v0.13.3/go.mod h1:+TR129FJZ5Lvzms6dvCeGWh1yR6hMvmXBhug4hrNIGk= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU= @@ -654,13 +652,13 @@ github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kd github.com/antlr/antlr4/runtime/Go/antlr v1.4.10 h1:yL7+Jz0jTC6yykIK/Wh74gnTJnrGr5AyrNMXuA0gves= github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df h1:7RFfzj4SSt6nnvCPbCqijJi1nWCd+TqAT3bYCStRC18= github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df/go.mod h1:pSwJ0fSY5KhvocuWSx4fz3BA8OrA1bQn+K1Eli3BRwM= -github.com/antonmedv/expr v1.14.3 h1:GPrP7xKPWkFaLANPS7tPrgkNs7FMHpZdL72Dc5kFykg= -github.com/antonmedv/expr v1.14.3/go.mod h1:FPC8iWArxls7axbVLsW+kpg1mz29A1b2M6jt+hZfDkU= +github.com/antonmedv/expr v1.15.0 h1:sBHNMx1i+b1lZfkBFGhicvSLW6RLnca3R0B7jWrk8iM= +github.com/antonmedv/expr v1.15.0/go.mod h1:0E/6TxnOlRNp81GMzX9QfDPAmHo2Phg00y4JUv1ihsE= github.com/apache/arrow/go/v10 v10.0.1/go.mod h1:YvhnlEePVnBS4+0z3fhPfUy7W1Ikj0Ih0vcRo/gZ1M0= github.com/apache/arrow/go/v11 v11.0.0/go.mod h1:Eg5OsL5H+e299f7u5ssuXsuHQVEGC4xei5aX110hRiI= github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU= -github.com/apparentlymart/go-textseg/v13 v13.0.0 h1:Y+KvPE1NYz0xl601PVImeQfFyEy6iT90AvPUL1NNfNw= -github.com/apparentlymart/go-textseg/v13 v13.0.0/go.mod h1:ZK2fH7c4NqDTLtiYLvIkEghdlcqw7yxLeM89kiTRPUo= +github.com/apparentlymart/go-textseg/v15 v15.0.0 h1:uYvfpb3DyLSCGWnctWKGj857c6ew1u1fNQOlOtuGxQY= +github.com/apparentlymart/go-textseg/v15 v15.0.0/go.mod h1:K8XmNZdhEBkdlyDdvbmmsvpAG721bKi0joRfFdHIWJ4= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/armon/go-metrics v0.3.10/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc= @@ -670,10 +668,8 @@ github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgI github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= github.com/aws/aws-sdk-go v1.44.122/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo= -github.com/aws/aws-sdk-go v1.44.330 h1:kO41s8I4hRYtWSIuMc/O053wmEGfMTT8D4KtPSojUkA= -github.com/aws/aws-sdk-go v1.44.330/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= -github.com/aws/aws-sdk-go v1.44.331 h1:hEwdOTv6973uegCUY2EY8jyyq0OUg9INc0HOzcu2bjw= -github.com/aws/aws-sdk-go v1.44.331/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= +github.com/aws/aws-sdk-go v1.45.0 h1:qoVOQHuLacxJMO71T49KeE70zm+Tk3vtrl7XO4VUPZc= +github.com/aws/aws-sdk-go v1.45.0/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= @@ -769,13 +765,13 @@ github.com/fergusstrange/embedded-postgres v1.23.0 h1:ZYRD89nammxQDWDi6taJE2CYjD github.com/fergusstrange/embedded-postgres v1.23.0/go.mod h1:wL562t1V+iuFwq0UcgMi2e9rp8CROY9wxWZEfP8Y874= github.com/flanksource/commons v1.11.0 h1:ThP3hnX4Xh4thxVl2GjQ92WvQ93jq5VqzJh46jbW23A= github.com/flanksource/commons v1.11.0/go.mod h1:zYEhi6E2+diQ+loVcROUHo/Bgv+Tn61W2NYmrb5MgVI= -github.com/flanksource/duty v1.0.159 h1:CkSKDZ4HYQ7cSEy8ufg1WODqcghkWsiAlk2o4I4JkqY= -github.com/flanksource/duty v1.0.159/go.mod h1:RJ/kcZ7dbL8/52tem757szVIA3IomS8bOAZIK0xb4rk= -github.com/flanksource/duty v1.0.161 h1:fuEFH5A7kKAApxDP/FzFRaD2HuCrUJhwYjeuNxj/JVY= -github.com/flanksource/duty v1.0.161/go.mod h1:C3eT1PfdqTdefpGRDfUzLDVjSKuYjqZbgbIqX757FbA= +github.com/flanksource/duty v1.0.165 h1:Kmy9ER1kKdWcssBwzPXUk3i2TaioVn/uhc4YZY6bt1w= +github.com/flanksource/duty v1.0.165/go.mod h1:C3eT1PfdqTdefpGRDfUzLDVjSKuYjqZbgbIqX757FbA= +github.com/flanksource/duty v1.0.166 h1:R20ksqEKgnk+Q8RCOWpLIhLx5ofYd1kdUAfxDxJRsC0= +github.com/flanksource/duty v1.0.166/go.mod h1:C3eT1PfdqTdefpGRDfUzLDVjSKuYjqZbgbIqX757FbA= github.com/flanksource/gomplate/v3 v3.20.4/go.mod h1:27BNWhzzSjDed1z8YShO6W+z6G9oZXuxfNFGd/iGSdc= -github.com/flanksource/gomplate/v3 v3.20.11 h1:B83fXI0dqlOii2QE+qPNI0blPlcZTmfnPPU8zwVGUtA= -github.com/flanksource/gomplate/v3 v3.20.11/go.mod h1:1N1aptaAo0XUaGsyU5CWiwn9GMRpbIKX1AdsypfmZYo= +github.com/flanksource/gomplate/v3 v3.20.12 h1:SLo8eLaYkUTizHIuntZ4LxxLzbRfV0NvC6DTpu9fj94= +github.com/flanksource/gomplate/v3 v3.20.12/go.mod h1:1N1aptaAo0XUaGsyU5CWiwn9GMRpbIKX1AdsypfmZYo= github.com/flanksource/is-healthy v0.0.0-20230705092916-3b4cf510c5fc/go.mod h1:4pQhmF+TnVqJroQKY8wSnSp+T18oLson6YQ2M0qPHfQ= github.com/flanksource/is-healthy v0.0.0-20230713150444-ad2a5ef4bb37 h1:MHXg2Vo/oHB0rGLgsI0tkU9MGV7aDwqvO1lrbX7/shY= github.com/flanksource/is-healthy v0.0.0-20230713150444-ad2a5ef4bb37/go.mod h1:BH5gh9JyEAuuWVP6Q5y9h43VozS0RfKyjNpM9L4v4hw= @@ -1043,8 +1039,8 @@ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= -github.com/hashicorp/hcl/v2 v2.17.0 h1:z1XvSUyXd1HP10U4lrLg5e0JMVz6CPaJvAgxM0KNZVY= -github.com/hashicorp/hcl/v2 v2.17.0/go.mod h1:gJyW2PTShkJqQBKpAmPO3yxMxIuoXkOF2TpqXzrQyx4= +github.com/hashicorp/hcl/v2 v2.18.0 h1:wYnG7Lt31t2zYkcquwgKo6MWXzRUDIeIVU5naZwHLl8= +github.com/hashicorp/hcl/v2 v2.18.0/go.mod h1:ThLC89FV4p9MPW804KVbe/cEXoQ8NZEh+JtMeeGErHE= github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= github.com/hashicorp/mdns v1.0.4/go.mod h1:mtBihi+LeNXGtG8L9dX59gAEa12BDtBQSp4v/YAJqrc= github.com/hashicorp/memberlist v0.3.0/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE= @@ -1402,10 +1398,8 @@ github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1 github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/yuin/gopher-lua v1.1.0 h1:BojcDhfyDWgU2f2TOzYK/g5p2gxMrku8oupLDqlnSqE= github.com/yuin/gopher-lua v1.1.0/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= -github.com/zclconf/go-cty v1.13.2 h1:4GvrUxe/QUDYuJKAav4EYqdM47/kZa672LwmXFmEKT0= -github.com/zclconf/go-cty v1.13.2/go.mod h1:YKQzy/7pZ7iq2jNFzy5go57xdxdWoLLpaEp4u238AE0= -github.com/zclconf/go-cty v1.13.3 h1:m+b9q3YDbg6Bec5rr+KGy1MzEVzY/jC2X+YX4yqKtHI= -github.com/zclconf/go-cty v1.13.3/go.mod h1:YKQzy/7pZ7iq2jNFzy5go57xdxdWoLLpaEp4u238AE0= +github.com/zclconf/go-cty v1.14.0 h1:/Xrd39K7DXbHzlisFP9c4pHao4yyf+/Ug9LEz+Y/yhc= +github.com/zclconf/go-cty v1.14.0/go.mod h1:VvMs5i0vgZdhYawQNq5kePSpLAoz8u1xvZgrPIxfnZE= github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= go.etcd.io/etcd/api/v3 v3.5.5/go.mod h1:KFtNaxGDw4Yx/BA4iPPwevUTAuqcsPxzyX8PHydchN8= diff --git a/jobs/jobs.go b/jobs/jobs.go index a7bc83f52..d7fa931ca 100644 --- a/jobs/jobs.go +++ b/jobs/jobs.go @@ -36,7 +36,7 @@ func Start() { responder.SyncConfig() CleanupJobHistoryTable() if err := rules.Run(); err != nil { - logger.Errorf("error running incident rules: %w", err) + logger.Errorf("error running incident rules: %v", err) } if _, err := ScheduleFunc(TeamComponentOwnershipSchedule, TeamComponentOwnershipRun); err != nil { @@ -74,7 +74,7 @@ func Start() { logger.Infof("IncidentRulesSchedule %s", incidentRulesSchedule) if _, err := ScheduleFunc(incidentRulesSchedule, func() { if err := rules.Run(); err != nil { - logger.Errorf("error running incident rules: %w", err) + logger.Errorf("error running incident rules: %v", err) } }); err != nil { logger.Errorf("Failed to schedule job for incident rules: %v", err) diff --git a/notification/notification_test.go b/notification/notification_test.go index 0edb89bdb..afa5c028f 100644 --- a/notification/notification_test.go +++ b/notification/notification_test.go @@ -104,13 +104,15 @@ var _ = ginkgo.Describe("Test Notification on incident creation", ginkgo.Ordered }) ginkgo.It("should consume the event and send the notification", func() { - notifHandler := events.NewNotificationConsumer(db.Gorm) - sendHandler := events.NewNotificationSendConsumer(db.Gorm) + notificationHandler := events.NewNotificationSaveConsumerSync().EventConsumer(db.Gorm, db.Pool) + sendHandler := events.NewNotificationSendConsumerAsync().EventConsumer(db.Gorm, db.Pool) + + ctx := api.NewContext(db.Gorm, nil) // Order of consumption is important as incident.create event // produces a notification.send event - notifHandler.ConsumeEventsUntilEmpty() - sendHandler.ConsumeEventsUntilEmpty() + notificationHandler.ConsumeEventsUntilEmpty(ctx) + sendHandler.ConsumeEventsUntilEmpty(ctx) Expect(webhookPostdata).To(Not(BeNil())) Expect(webhookPostdata["message"]).To(Equal(fmt.Sprintf("Severity: %s", incident.Severity))) diff --git a/playbook/playbook_test.go b/playbook/playbook_test.go index 9f18380be..beada4909 100644 --- a/playbook/playbook_test.go +++ b/playbook/playbook_test.go @@ -13,6 +13,8 @@ import ( "github.com/flanksource/duty/models" "github.com/flanksource/incident-commander/api" v1 "github.com/flanksource/incident-commander/api/v1" + "github.com/flanksource/incident-commander/events" + "github.com/flanksource/incident-commander/events/eventconsumer" ginkgo "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "gorm.io/gorm/clause" @@ -22,7 +24,6 @@ var _ = ginkgo.Describe("Playbook runner", ginkgo.Ordered, func() { var ( playbook models.Playbook runResp RunResponse - consumer *queueConsumer ) ginkgo.It("should store dummy data", func() { @@ -32,8 +33,12 @@ var _ = ginkgo.Describe("Playbook runner", ginkgo.Ordered, func() { }) ginkgo.It("start the queue consumer in background", func() { - consumer = NewQueueConsumer(testDB, testDBPool) - go consumer.Listen() + go eventconsumer.New(testDB, testDBPool, "playbook_run_updates", EventConsumer). + WithNumConsumers(5). + WithNotifyTimeout(time.Second * 2). + Listen() + + go events.StartConsumers(testDB, testDBPool, events.Config{}) }) ginkgo.It("should create a new playbook", func() { @@ -211,7 +216,7 @@ var _ = ginkgo.Describe("Playbook runner", ginkgo.Ordered, func() { } attempts += 1 - if attempts > 20 { // wait for 2 seconds + if attempts > 50 { // wait for 5 seconds ginkgo.Fail(fmt.Sprintf("Timed out waiting for run to complete. Run status: %s", updatedRun.Status)) } } diff --git a/playbook/queue_consumer.go b/playbook/queue_consumer.go deleted file mode 100644 index ac6ce85b2..000000000 --- a/playbook/queue_consumer.go +++ /dev/null @@ -1,157 +0,0 @@ -package playbook - -import ( - "encoding/json" - "fmt" - "sync" - "time" - - "github.com/flanksource/commons/logger" - "github.com/flanksource/duty/models" - "github.com/flanksource/incident-commander/api" - v1 "github.com/flanksource/incident-commander/api/v1" - "github.com/flanksource/incident-commander/db" - "github.com/flanksource/incident-commander/utils" - "github.com/google/uuid" - "github.com/jackc/pgx/v5/pgxpool" - "gorm.io/gorm" -) - -type queueConsumer struct { - pool *pgxpool.Pool - db *gorm.DB - tickInterval time.Duration - dbReconnectMaxDuration time.Duration - dbReconnectBackoffBaseDuration time.Duration - - // registry stores the list of playbook run IDs - // that are currently being executed. - registry sync.Map -} - -func NewQueueConsumer(db *gorm.DB, pool *pgxpool.Pool) *queueConsumer { - return &queueConsumer{ - db: db, - pool: pool, - tickInterval: time.Minute, - dbReconnectMaxDuration: time.Minute, - dbReconnectBackoffBaseDuration: time.Second, - registry: sync.Map{}, - } -} - -func (t *queueConsumer) Listen() { - pgNotify := make(chan string) - go utils.ListenToPostgresNotify(t.pool, "playbook_run_updates", t.dbReconnectMaxDuration, t.dbReconnectBackoffBaseDuration, pgNotify) - - pgNotifyPlaybookSpecApprovalUpdated := make(chan string) - go utils.ListenToPostgresNotify(t.pool, "playbook_spec_approval_updated", t.dbReconnectMaxDuration, t.dbReconnectBackoffBaseDuration, pgNotifyPlaybookSpecApprovalUpdated) - - pgNotifyPlaybookApprovalsInserted := make(chan string) - go utils.ListenToPostgresNotify(t.pool, "playbook_approval_inserted", t.dbReconnectMaxDuration, t.dbReconnectBackoffBaseDuration, pgNotifyPlaybookApprovalsInserted) - - ctx := api.NewContext(t.db, nil) - for { - select { - case <-pgNotify: - if err := t.consumeAll(ctx); err != nil { - logger.Errorf("%v", err) - } - - case id := <-pgNotifyPlaybookSpecApprovalUpdated: - if err := t.onApprovalUpdated(ctx, id); err != nil { - logger.Errorf("%v", err) - } - - case id := <-pgNotifyPlaybookApprovalsInserted: - if err := t.onPlaybookRunNewApproval(ctx, id); err != nil { - logger.Errorf("%v", err) - } - - case <-time.After(t.tickInterval): - if err := t.consumeAll(ctx); err != nil { - logger.Errorf("%v", err) - } - } - } -} - -// onApprovalUpdated is called when the playbook spec approval is updated -func (t *queueConsumer) onApprovalUpdated(ctx *api.Context, playbookID string) error { - var playbook models.Playbook - if err := ctx.DB().Where("id = ?", playbookID).First(&playbook).Error; err != nil { - return err - } - - var spec v1.PlaybookSpec - if err := json.Unmarshal(playbook.Spec, &spec); err != nil { - return err - } - - if spec.Approval == nil { - return nil - } - - return db.UpdatePlaybookRunStatusIfApproved(ctx, playbookID, *spec.Approval) -} - -func (t *queueConsumer) onPlaybookRunNewApproval(ctx *api.Context, runID string) error { - var run models.PlaybookRun - if err := ctx.DB().Where("id = ?", runID).First(&run).Error; err != nil { - return err - } - - if run.Status != models.PlaybookRunStatusPending { - return nil - } - - var playbook models.Playbook - if err := ctx.DB().Where("id = ?", run.PlaybookID).First(&playbook).Error; err != nil { - return err - } - - var spec v1.PlaybookSpec - if err := json.Unmarshal(playbook.Spec, &spec); err != nil { - return err - } - - if spec.Approval == nil { - return nil - } - - return db.UpdatePlaybookRunStatusIfApproved(ctx, playbook.ID.String(), *spec.Approval) -} - -// TODO: Limit to X workers (5 workers, 1 batch size) -func (t *queueConsumer) consumeAll(ctx *api.Context) error { - runs, err := db.GetScheduledPlaybookRuns(ctx, 1, t.getRunIDsInRegistry()...) - if err != nil { - return fmt.Errorf("failed to get playbook runs: %w", err) - } - - if len(runs) == 0 { - return nil - } - - for _, r := range runs { - go func(run models.PlaybookRun) { - if _, loaded := t.registry.LoadOrStore(run.ID, nil); !loaded { - ExecuteRun(ctx, run) - } - - t.registry.Delete(run.ID) - }(r) - } - - return nil -} - -func (t *queueConsumer) getRunIDsInRegistry() []uuid.UUID { - var ids []uuid.UUID - t.registry.Range(func(k any, val any) bool { - ids = append(ids, k.(uuid.UUID)) - return true - }) - - return ids -} diff --git a/playbook/run_consumer.go b/playbook/run_consumer.go new file mode 100644 index 000000000..0ce62588c --- /dev/null +++ b/playbook/run_consumer.go @@ -0,0 +1,51 @@ +package playbook + +import ( + "fmt" + + "github.com/flanksource/duty/models" + "github.com/flanksource/incident-commander/api" + "github.com/flanksource/incident-commander/events/eventconsumer" + "github.com/jackc/pgx/v5/pgxpool" + "gorm.io/gorm" +) + +func StartPlaybookRunConsumer(db *gorm.DB, pool *pgxpool.Pool) { + eventconsumer.New(db, pool, "playbook_run_updates", EventConsumer). + WithNumConsumers(5). + Listen() +} + +func EventConsumer(ctx *api.Context) error { + tx := ctx.DB().Begin() + if tx.Error != nil { + return fmt.Errorf("error initiating db tx: %w", tx.Error) + } + defer tx.Rollback() + + ctx = ctx.WithDB(tx) + + query := ` + SELECT * + FROM playbook_runs + WHERE status = ? + AND start_time <= NOW() + ORDER BY start_time + FOR UPDATE SKIP LOCKED + ` + + var runs []models.PlaybookRun + if err := tx.Raw(query, models.PlaybookRunStatusScheduled).Find(&runs).Error; err != nil { + return err + } + + if len(runs) == 0 { + return api.Errorf(api.ENOTFOUND, "No events found") + } + + for i := range runs { + ExecuteRun(ctx, runs[i]) + } + + return tx.Commit().Error +}