diff --git a/events/checks.go b/events/checks.go index 9d7de4aab..461339ab9 100644 --- a/events/checks.go +++ b/events/checks.go @@ -1,12 +1,8 @@ package events -import ( - "github.com/flanksource/incident-commander/events/eventconsumer" - "github.com/jackc/pgx/v5/pgxpool" - "gorm.io/gorm" -) - -func NewCheckConsumerSync(db *gorm.DB, pgPool *pgxpool.Pool) *eventconsumer.EventConsumer { - return eventconsumer.New(db, pgPool, eventQueueUpdateChannel, - newEventQueueSyncConsumerFunc(syncConsumerWatchEvents["check"], addNotificationEvent, SavePlaybookRun)) +func NewCheckConsumerSync() SyncEventConsumer { + return SyncEventConsumer{ + watchEvents: []string{EventCheckPassed, EventCheckFailed}, + consumers: []SyncEventHandlerFunc{addNotificationEvent, schedulePlaybookRun}, + } } diff --git a/events/components.go b/events/components.go index c9e41ec5f..1842bbef9 100644 --- a/events/components.go +++ b/events/components.go @@ -1,12 +1,14 @@ package events -import ( - "github.com/flanksource/incident-commander/events/eventconsumer" - "github.com/jackc/pgx/v5/pgxpool" - "gorm.io/gorm" -) - -func NewComponentConsumerSync(db *gorm.DB, pgPool *pgxpool.Pool) *eventconsumer.EventConsumer { - return eventconsumer.New(db, pgPool, eventQueueUpdateChannel, - newEventQueueSyncConsumerFunc(syncConsumerWatchEvents["component"], addNotificationEvent, SavePlaybookRun)) +func NewComponentConsumerSync() SyncEventConsumer { + return SyncEventConsumer{ + watchEvents: []string{ + EventComponentStatusError, + EventComponentStatusHealthy, + EventComponentStatusInfo, + EventComponentStatusUnhealthy, + EventComponentStatusWarning, + }, + consumers: []SyncEventHandlerFunc{addNotificationEvent, schedulePlaybookRun}, + } } diff --git a/events/event_queue.go b/events/event_queue.go index a41b837ac..eb5d350fb 100644 --- a/events/event_queue.go +++ b/events/event_queue.go @@ -4,7 +4,6 @@ import ( "fmt" "time" - "github.com/flanksource/commons/collections/set" "github.com/flanksource/commons/logger" "github.com/flanksource/duty/upstream" "github.com/flanksource/incident-commander/api" @@ -23,11 +22,11 @@ const ( ) type ( - // AsyncEventHandler processes multiple events and returns the failed ones - AsyncEventHandler func(*api.Context, []api.Event) []api.Event + // AsyncEventHandlerFunc processes multiple events and returns the failed ones + AsyncEventHandlerFunc func(*api.Context, []api.Event) []api.Event - // SyncEventHandler processes a single event and ONLY makes db changes. - SyncEventHandler func(*api.Context, api.Event) error + // SyncEventHandlerFunc processes a single event and ONLY makes db changes. + SyncEventHandlerFunc func(*api.Context, api.Event) error ) // List of all sync events in the `event_queue` table. @@ -49,6 +48,10 @@ const ( EventNotificationUpdate = "notification.update" EventNotificationDelete = "notification.delete" + EventPlaybookSpecApprovalUpdated = "playbook.spec.approval.updated" + + EventPlaybookApprovalInserted = "playbook.approval.inserted" + EventIncidentCommentAdded = "incident.comment.added" EventIncidentCreated = "incident.created" EventIncidentDODAdded = "incident.dod.added" @@ -88,88 +91,50 @@ type Config struct { UpstreamPush upstream.UpstreamConfig } -// syncConsumerWatchEvents keeps a registry of all the event_queue consumer and the events they watch. -// This helps in ensuring that a single event is not being consumed by multiple consumers. -var syncConsumerWatchEvents = map[string][]string{ - "team": {EventTeamUpdate, EventTeamDelete}, - "check": {EventCheckPassed, EventCheckFailed}, - "component": { - EventComponentStatusError, - EventComponentStatusHealthy, - EventComponentStatusInfo, - EventComponentStatusUnhealthy, - EventComponentStatusWarning, - }, - "incident.responder": {EventIncidentResponderAdded}, - "incident.comment": {EventIncidentCommentAdded}, - "notification_update": { - EventNotificationUpdate, EventNotificationDelete, - }, - "notification_add": { - EventIncidentCreated, - EventIncidentDODAdded, - EventIncidentDODPassed, - EventIncidentDODRegressed, - EventIncidentResponderRemoved, - EventIncidentStatusCancelled, - EventIncidentStatusClosed, - EventIncidentStatusInvestigating, - EventIncidentStatusMitigated, - EventIncidentStatusOpen, - EventIncidentStatusResolved, - }, -} - -var asyncConsumerWatchEvents = map[string][]string{ - "push_queue": {EventPushQueueCreate}, - "notification_send": {EventNotificationSend}, - "incident.responder": { - EventJiraResponderAdded, EventMSPlannerResponderAdded, - EventMSPlannerCommentAdded, EventJiraCommentAdded, - }, -} - func StartConsumers(gormDB *gorm.DB, pgpool *pgxpool.Pool, config Config) { - uniqWatchEvents := set.New[string]() - for _, v := range syncConsumerWatchEvents { - for _, e := range v { - if uniqWatchEvents.Contains(e) { - logger.Fatalf("Error starting consumers: event[%s] has multiple consumers", e) - } - - uniqWatchEvents.Add(e) - } + uniqEvents := make(map[string]struct{}) + allSyncHandlers := []SyncEventConsumer{ + NewTeamConsumerSync(), + NewCheckConsumerSync(), + NewComponentConsumerSync(), + NewResponderConsumerSync(), + NewCommentConsumerSync(), + NewNotificationSaveConsumerSync(), + NewNotificationUpdatesConsumerSync(), + NewPlaybookApprovalConsumerSync(), + NewPlaybookApprovalSpecUpdatedConsumerSync(), } - for _, v := range asyncConsumerWatchEvents { - for _, e := range v { - if uniqWatchEvents.Contains(e) { - logger.Fatalf("Error starting consumers: event[%s] has multiple consumers", e) + for i := range allSyncHandlers { + for _, event := range allSyncHandlers[i].watchEvents { + if _, ok := uniqEvents[event]; ok { + logger.Fatalf("Watch event %s is duplicated", event) } - uniqWatchEvents.Add(e) + uniqEvents[event] = struct{}{} } + + go allSyncHandlers[i].EventConsumer(gormDB, pgpool).Listen() } - allConsumers := []*eventconsumer.EventConsumer{ - NewTeamConsumerSync(gormDB, pgpool), - NewCheckConsumerSync(gormDB, pgpool), - NewComponentConsumerSync(gormDB, pgpool), - NewResponderConsumerSync(gormDB, pgpool), - NewCommentConsumerSync(gormDB, pgpool), - NewNotificationConsumerSync(gormDB, pgpool), - NewNotificationUpdatesConsumerSync(gormDB, pgpool), - - // Async consumers - NewNotificationSendConsumerAsync(gormDB, pgpool), - NewResponderConsumerAsync(gormDB, pgpool), + asyncConsumers := []AsyncEventConsumer{ + NewNotificationSendConsumerAsync(), + NewResponderConsumerAsync(), } if config.UpstreamPush.Valid() { - allConsumers = append(allConsumers, NewUpstreamPushConsumerAsync(gormDB, pgpool, config)) + asyncConsumers = append(asyncConsumers, NewUpstreamPushConsumerAsync(config)) } - for i := range allConsumers { - go allConsumers[i].Listen() + for i := range asyncConsumers { + for _, event := range asyncConsumers[i].watchEvents { + if _, ok := uniqEvents[event]; ok { + logger.Fatalf("Watch event %s is duplicated", event) + } + + uniqEvents[event] = struct{}{} + } + + go asyncConsumers[i].EventConsumer(gormDB, pgpool).Listen() } } @@ -206,96 +171,74 @@ func fetchEvents(ctx *api.Context, watchEvents []string, batchSize int) ([]api.E return events, nil } -// newEventQueueAsyncConsumerFunc returns a new event consumer for the `watchEvents` events in the `event_queue` table. -func newEventQueueAsyncConsumerFunc(watchEvents []string, handleEventsAsync AsyncEventHandler) eventconsumer.EventConsumerFunc { - return func(ctx *api.Context, batchSize int) error { - tx := ctx.DB().Begin() - if tx.Error != nil { - return fmt.Errorf("error initiating db tx: %w", tx.Error) - } - defer tx.Rollback() - - ctx = ctx.WithDB(tx) - - events, err := fetchEvents(ctx, watchEvents, batchSize) - if err != nil { - return fmt.Errorf("error fetching events: %w", err) - } - - if len(events) == 0 { - return api.Errorf(api.ENOTFOUND, "No events found") - } - - failedEvents := handleEventsAsync(ctx, events) - saveFailedEvents(tx, failedEvents) +type SyncEventConsumer struct { + watchEvents []string + consumers []SyncEventHandlerFunc + numConsumers int +} - return tx.Commit().Error +func (t SyncEventConsumer) EventConsumer(db *gorm.DB, pool *pgxpool.Pool) *eventconsumer.EventConsumer { + consumer := eventconsumer.New(db, pool, eventQueueUpdateChannel, t.Handle) + if t.numConsumers > 0 { + consumer = consumer.WithNumConsumers(t.numConsumers) } -} -// newEventQueueConsumerFunc returns a new sync event consumer for the `watchEvents` events in the `event_queue` table. -func newEventQueueSyncConsumerFunc(watchEvents []string, syncConsumers ...SyncEventHandler) eventconsumer.EventConsumerFunc { - return func(ctx *api.Context, batchSize int) error { - tx := ctx.DB().Begin() - if tx.Error != nil { - return fmt.Errorf("error initiating db tx: %w", tx.Error) - } - defer tx.Rollback() + return consumer +} - events, err := fetchEvents(ctx.WithDB(tx), watchEvents, batchSize) - if err != nil { - return fmt.Errorf("error fetching events: %w", err) - } +func (t *SyncEventConsumer) Handle(ctx *api.Context) error { + tx := ctx.DB().Begin() + if tx.Error != nil { + return fmt.Errorf("error initiating db tx: %w", tx.Error) + } + defer tx.Rollback() - if len(events) == 0 { - return api.Errorf(api.ENOTFOUND, "No events found") - } + ctx = ctx.WithDB(tx) - failedEvents := handleEventsSync(ctx, events, syncConsumers) - saveFailedEvents(tx, failedEvents) + events, err := fetchEvents(ctx, t.watchEvents, 1) + if err != nil { + return fmt.Errorf("error fetching events: %w", err) + } - return tx.Commit().Error + if len(events) == 0 { + return api.Errorf(api.ENOTFOUND, "No events found") } -} -// handleEventsSync runs all the sync consumers for the given events. -// -// Each event is handled in isolation (in a separate transaction). -func handleEventsSync(ctx *api.Context, events []api.Event, syncConsumers []SyncEventHandler) []api.Event { - failedEvents := make([]api.Event, 0, len(events)) - for i := range events { - if err := processSyncConsumers(ctx, events[i], syncConsumers); err != nil { - logger.Errorf("Failed to process event[%s]: %s", events[i].ID, err.Error()) - events[i].Error = err.Error() - failedEvents = append(failedEvents, events[i]) + for _, syncConsumer := range t.consumers { + if err := syncConsumer(ctx, events[0]); err != nil { + return fmt.Errorf("error processing sync consumer: %w", err) } } - return failedEvents + return tx.Commit().Error } -// processSyncConsumers runs all the sync consumers for the given event. -// -// All of the consumers are expected to succeed. -// Returns error even if a single consumer fails and any changes are rolled back. -func processSyncConsumers(ctx *api.Context, event api.Event, syncConsumers []SyncEventHandler) error { +type AsyncEventConsumer struct { + watchEvents []string + batchSize int + consumer AsyncEventHandlerFunc + numConsumers int +} + +func (t *AsyncEventConsumer) Handle(ctx *api.Context) error { tx := ctx.DB().Begin() if tx.Error != nil { return fmt.Errorf("error initiating db tx: %w", tx.Error) } defer tx.Rollback() - for _, syncConsumer := range syncConsumers { - if err := syncConsumer(ctx, event); err != nil { - return fmt.Errorf("error processing sync consumer: %w", err) - } + ctx = ctx.WithDB(tx) + + events, err := fetchEvents(ctx, t.watchEvents, t.batchSize) + if err != nil { + return fmt.Errorf("error fetching events: %w", err) } - return tx.Commit().Error -} + if len(events) == 0 { + return api.Errorf(api.ENOTFOUND, "No events found") + } -// saveFailedEvents saves failed events back to the `event_queue` table. -func saveFailedEvents(tx *gorm.DB, failedEvents []api.Event) { + failedEvents := t.consumer(ctx, events) lastAttempt := time.Now() for i := range failedEvents { @@ -311,4 +254,15 @@ func saveFailedEvents(tx *gorm.DB, failedEvents []api.Event) { logger.Errorf("Error inserting into table:event_queue with error:%v. %v", err) } } + + return tx.Commit().Error +} + +func (t AsyncEventConsumer) EventConsumer(db *gorm.DB, pool *pgxpool.Pool) *eventconsumer.EventConsumer { + consumer := eventconsumer.New(db, pool, eventQueueUpdateChannel, t.Handle) + if t.numConsumers > 0 { + consumer = consumer.WithNumConsumers(t.numConsumers) + } + + return consumer } diff --git a/events/eventconsumer/event_consumer.go b/events/eventconsumer/event_consumer.go index 3d5c97641..e22b368d7 100644 --- a/events/eventconsumer/event_consumer.go +++ b/events/eventconsumer/event_consumer.go @@ -17,25 +17,24 @@ const ( // after an unexpected failure. waitDurationOnFailure = time.Second * 5 - // pgNotifyTimeout is the timeout to consume events in case no Consume notification is received. - pgNotifyTimeout = time.Minute + defaultPgNotifyTimeout = time.Minute dbReconnectMaxDuration = time.Minute * 5 dbReconnectBackoffBaseDuration = time.Second ) -type EventConsumerFunc func(ctx *api.Context, batchSize int) error +type EventConsumerFunc func(ctx *api.Context) error type EventConsumer struct { db *gorm.DB pgPool *pgxpool.Pool - // Number of events to process at a time by a single consumer - batchSize int - // Number of concurrent consumers numConsumers int + // pgNotifyTimeout is the timeout to consume events in case no Consume notification is received. + pgNotifyTimeout time.Duration + // pgNotifyChannel is the channel to listen to for any new updates on the event queue pgNotifyChannel string @@ -47,31 +46,28 @@ type EventConsumer struct { // New returns a new EventConsumer func New(DB *gorm.DB, PGPool *pgxpool.Pool, PgNotifyChannel string, ConsumerFunc EventConsumerFunc) *EventConsumer { return &EventConsumer{ - batchSize: 1, numConsumers: 1, db: DB, pgPool: PGPool, pgNotifyChannel: PgNotifyChannel, consumerFunc: ConsumerFunc, + pgNotifyTimeout: defaultPgNotifyTimeout, } } -func (e *EventConsumer) WithBatchSize(batchSize int) *EventConsumer { - e.batchSize = batchSize +func (e *EventConsumer) WithNumConsumers(numConsumers int) *EventConsumer { + e.numConsumers = numConsumers return e } -func (e *EventConsumer) WithNumConsumers(numConsumers int) *EventConsumer { - e.numConsumers = numConsumers +func (e *EventConsumer) WithNotifyTimeout(timeout time.Duration) *EventConsumer { + e.pgNotifyTimeout = timeout return e } func (e EventConsumer) Validate() error { - if e.batchSize <= 0 { - return fmt.Errorf("BatchSize:%d <= 0", e.batchSize) - } if e.numConsumers <= 0 { - return fmt.Errorf("consumers:%d <= 0", e.batchSize) + return fmt.Errorf("consumers:%d <= 0", e.numConsumers) } if e.pgNotifyChannel == "" { return fmt.Errorf("pgNotifyChannel is empty") @@ -92,7 +88,7 @@ func (e EventConsumer) Validate() error { func (t *EventConsumer) ConsumeEventsUntilEmpty(ctx *api.Context) { consumerFunc := func(wg *sync.WaitGroup) { for { - err := t.consumerFunc(ctx, t.batchSize) + err := t.consumerFunc(ctx) if err != nil { if api.ErrorCode(err) == api.ENOTFOUND { wg.Done() @@ -132,7 +128,7 @@ func (e *EventConsumer) Listen() { case <-pgNotify: e.ConsumeEventsUntilEmpty(ctx) - case <-time.After(pgNotifyTimeout): + case <-time.After(e.pgNotifyTimeout): e.ConsumeEventsUntilEmpty(ctx) } } diff --git a/events/notifications.go b/events/notifications.go index 697b64416..629448b30 100644 --- a/events/notifications.go +++ b/events/notifications.go @@ -10,25 +10,48 @@ import ( "github.com/flanksource/commons/template" "github.com/flanksource/duty/models" "github.com/flanksource/incident-commander/api" - "github.com/flanksource/incident-commander/events/eventconsumer" pkgNotification "github.com/flanksource/incident-commander/notification" "github.com/flanksource/incident-commander/teams" - "github.com/jackc/pgx/v5/pgxpool" - "gorm.io/gorm" ) -func NewNotificationConsumerSync(db *gorm.DB, pool *pgxpool.Pool) *eventconsumer.EventConsumer { - return eventconsumer.New(db, pool, eventQueueUpdateChannel, newEventQueueSyncConsumerFunc(syncConsumerWatchEvents["notification_add"], addNotificationEvent)). - WithNumConsumers(3) +func NewNotificationUpdatesConsumerSync() SyncEventConsumer { + return SyncEventConsumer{ + watchEvents: []string{EventNotificationUpdate, EventNotificationDelete}, + consumers: []SyncEventHandlerFunc{ + handleNotificationUpdates, + }, + } } -func NewNotificationUpdatesConsumerSync(db *gorm.DB, pool *pgxpool.Pool) *eventconsumer.EventConsumer { - return eventconsumer.New(db, pool, eventQueueUpdateChannel, newEventQueueSyncConsumerFunc(syncConsumerWatchEvents["notification_update"], handleNotificationUpdates)) +func NewNotificationSaveConsumerSync() SyncEventConsumer { + return SyncEventConsumer{ + watchEvents: []string{ + EventIncidentCreated, + EventIncidentDODAdded, + EventIncidentDODPassed, + EventIncidentDODRegressed, + EventIncidentResponderRemoved, + EventIncidentStatusCancelled, + EventIncidentStatusClosed, + EventIncidentStatusInvestigating, + EventIncidentStatusMitigated, + EventIncidentStatusOpen, + EventIncidentStatusResolved, + }, + numConsumers: 3, + consumers: []SyncEventHandlerFunc{ + addNotificationEvent, + }, + } } -func NewNotificationSendConsumerAsync(db *gorm.DB, pool *pgxpool.Pool) *eventconsumer.EventConsumer { - return eventconsumer.New(db, pool, eventQueueUpdateChannel, newEventQueueAsyncConsumerFunc(asyncConsumerWatchEvents["notification_send"], processNotificationEvents)). - WithNumConsumers(5) +func NewNotificationSendConsumerAsync() AsyncEventConsumer { + return AsyncEventConsumer{ + watchEvents: []string{EventNotificationSend}, + consumer: processNotificationEvents, + batchSize: 1, + numConsumers: 5, + } } func processNotificationEvents(ctx *api.Context, events []api.Event) []api.Event { diff --git a/events/playbook.go b/events/playbook.go index ef1f9cac2..78a013cbe 100644 --- a/events/playbook.go +++ b/events/playbook.go @@ -1,6 +1,7 @@ package events import ( + "encoding/json" "fmt" "strconv" "strings" @@ -16,6 +17,20 @@ import ( "github.com/flanksource/incident-commander/playbook" ) +func NewPlaybookApprovalSpecUpdatedConsumerSync() SyncEventConsumer { + return SyncEventConsumer{ + watchEvents: []string{EventPlaybookSpecApprovalUpdated}, + consumers: []SyncEventHandlerFunc{onApprovalUpdated}, + } +} + +func NewPlaybookApprovalConsumerSync() SyncEventConsumer { + return SyncEventConsumer{ + watchEvents: []string{EventPlaybookApprovalInserted}, + consumers: []SyncEventHandlerFunc{onPlaybookRunNewApproval}, + } +} + type EventResource struct { Component *models.Component `json:"component,omitempty"` Check *models.Check `json:"check,omitempty"` @@ -32,7 +47,7 @@ func (t *EventResource) AsMap() map[string]any { } } -func SavePlaybookRun(ctx *api.Context, event api.Event) error { +func schedulePlaybookRun(ctx *api.Context, event api.Event) error { specEvent, ok := eventToSpecEvent[event.Name] if !ok { return nil @@ -182,3 +197,52 @@ var eventToSpecEvent = map[string]PlaybookSpecEvent{ EventComponentStatusWarning: {"component", "warning"}, EventComponentStatusError: {"component", "error"}, } + +func onApprovalUpdated(ctx *api.Context, event api.Event) error { + playbookID := event.Properties["id"] + + var playbook models.Playbook + if err := ctx.DB().Where("id = ?", playbookID).First(&playbook).Error; err != nil { + return err + } + + var spec v1.PlaybookSpec + if err := json.Unmarshal(playbook.Spec, &spec); err != nil { + return err + } + + if spec.Approval == nil { + return nil + } + + return db.UpdatePlaybookRunStatusIfApproved(ctx, playbookID, *spec.Approval) +} + +func onPlaybookRunNewApproval(ctx *api.Context, event api.Event) error { + runID := event.Properties["run_id"] + + var run models.PlaybookRun + if err := ctx.DB().Where("id = ?", runID).First(&run).Error; err != nil { + return err + } + + if run.Status != models.PlaybookRunStatusPending { + return nil + } + + var playbook models.Playbook + if err := ctx.DB().Where("id = ?", run.PlaybookID).First(&playbook).Error; err != nil { + return err + } + + var spec v1.PlaybookSpec + if err := json.Unmarshal(playbook.Spec, &spec); err != nil { + return err + } + + if spec.Approval == nil { + return nil + } + + return db.UpdatePlaybookRunStatusIfApproved(ctx, playbook.ID.String(), *spec.Approval) +} diff --git a/events/playbook_test.go b/events/playbook_test.go index 523e2a0d8..d309fd1a9 100644 --- a/events/playbook_test.go +++ b/events/playbook_test.go @@ -67,7 +67,7 @@ var _ = ginkgo.Describe("Should save playbook run on the correct event", ginkgo. }) ginkgo.It("Expect the event consumer to NOT save a playbook run", func() { - componentEventConsumer := NewComponentConsumerSync(playbookDB, playbookDBPool) + componentEventConsumer := NewComponentConsumerSync().EventConsumer(playbookDB, playbookDBPool) componentEventConsumer.ConsumeEventsUntilEmpty(api.NewContext(playbookDB, nil)) var playbooks []models.PlaybookRun @@ -84,7 +84,7 @@ var _ = ginkgo.Describe("Should save playbook run on the correct event", ginkgo. }) ginkgo.It("Expect the event consumer to save the playbook run", func() { - componentEventConsumer := NewComponentConsumerSync(playbookDB, playbookDBPool) + componentEventConsumer := NewComponentConsumerSync().EventConsumer(playbookDB, playbookDBPool) componentEventConsumer.ConsumeEventsUntilEmpty(api.NewContext(playbookDB, nil)) var playbook models.PlaybookRun diff --git a/events/responder.go b/events/responder.go index 11afc1ef8..feda28dc5 100644 --- a/events/responder.go +++ b/events/responder.go @@ -8,27 +8,29 @@ import ( "github.com/flanksource/commons/logger" "github.com/flanksource/incident-commander/api" - "github.com/flanksource/incident-commander/events/eventconsumer" pkgResponder "github.com/flanksource/incident-commander/responder" - "github.com/jackc/pgx/v5/pgxpool" ) -func NewResponderConsumerAsync(db *gorm.DB, pool *pgxpool.Pool) *eventconsumer.EventConsumer { - return eventconsumer.New(db, pool, eventQueueUpdateChannel, - newEventQueueAsyncConsumerFunc(asyncConsumerWatchEvents["incident.responder"], processResponderEvents), - ) +func NewResponderConsumerSync() SyncEventConsumer { + return SyncEventConsumer{ + watchEvents: []string{EventIncidentResponderAdded}, + consumers: []SyncEventHandlerFunc{addNotificationEvent, generateResponderAddedAsyncEvent}, + } } -func NewResponderConsumerSync(db *gorm.DB, pool *pgxpool.Pool) *eventconsumer.EventConsumer { - return eventconsumer.New(db, pool, eventQueueUpdateChannel, - newEventQueueSyncConsumerFunc(syncConsumerWatchEvents["incident.responder"], addNotificationEvent, generateResponderAddedAsyncEvent), - ) +func NewCommentConsumerSync() SyncEventConsumer { + return SyncEventConsumer{ + watchEvents: []string{EventIncidentCommentAdded}, + consumers: []SyncEventHandlerFunc{addNotificationEvent, generateCommentAddedAsyncEvent}, + } } -func NewCommentConsumerSync(db *gorm.DB, pool *pgxpool.Pool) *eventconsumer.EventConsumer { - return eventconsumer.New(db, pool, eventQueueUpdateChannel, - newEventQueueSyncConsumerFunc(syncConsumerWatchEvents["incident.comment"], addNotificationEvent, generateCommentAddedAsyncEvent), - ) +func NewResponderConsumerAsync() AsyncEventConsumer { + return AsyncEventConsumer{ + watchEvents: []string{EventJiraResponderAdded, EventMSPlannerResponderAdded, EventMSPlannerCommentAdded, EventJiraCommentAdded}, + consumer: processResponderEvents, + batchSize: 1, + } } // generateResponderAddedAsyncEvent generates async events for each of the configured responder clients diff --git a/events/teams.go b/events/teams.go index 199e41d9f..eacc569cc 100644 --- a/events/teams.go +++ b/events/teams.go @@ -5,16 +5,18 @@ import ( "github.com/flanksource/commons/logger" "github.com/flanksource/incident-commander/api" - "github.com/flanksource/incident-commander/events/eventconsumer" "github.com/flanksource/incident-commander/responder" "github.com/flanksource/incident-commander/teams" "github.com/google/uuid" - "github.com/jackc/pgx/v5/pgxpool" - "gorm.io/gorm" ) -func NewTeamConsumerSync(db *gorm.DB, pool *pgxpool.Pool) *eventconsumer.EventConsumer { - return eventconsumer.New(db, pool, eventQueueUpdateChannel, newEventQueueSyncConsumerFunc(syncConsumerWatchEvents["team"], handleTeamEvent)) +func NewTeamConsumerSync() SyncEventConsumer { + consumer := SyncEventConsumer{ + watchEvents: []string{EventTeamUpdate, EventTeamDelete}, + consumers: []SyncEventHandlerFunc{handleTeamEvent}, + } + + return consumer } func handleTeamEvent(ctx *api.Context, event api.Event) error { diff --git a/events/upstream_push.go b/events/upstream_push.go index 6334f373f..b4df3aeac 100644 --- a/events/upstream_push.go +++ b/events/upstream_push.go @@ -6,21 +6,21 @@ import ( "github.com/flanksource/commons/logger" "github.com/flanksource/duty/upstream" "github.com/flanksource/incident-commander/api" - "github.com/flanksource/incident-commander/events/eventconsumer" - "github.com/jackc/pgx/v5/pgxpool" - "gorm.io/gorm" ) var upstreamPushEventHandler *pushToUpstreamEventHandler -func NewUpstreamPushConsumerAsync(db *gorm.DB, pool *pgxpool.Pool, config Config) *eventconsumer.EventConsumer { +func NewUpstreamPushConsumerAsync(config Config) AsyncEventConsumer { if config.UpstreamPush.Valid() { upstreamPushEventHandler = newPushToUpstreamEventHandler(config.UpstreamPush) } - return eventconsumer.New(db, pool, eventQueueUpdateChannel, newEventQueueAsyncConsumerFunc(asyncConsumerWatchEvents["push_queue"], handleUpstreamPushEvents)). - WithBatchSize(50). - WithNumConsumers(5) + return AsyncEventConsumer{ + watchEvents: []string{EventPushQueueCreate}, + consumer: handleUpstreamPushEvents, + batchSize: 50, + numConsumers: 5, + } } func handleUpstreamPushEvents(ctx *api.Context, events []api.Event) []api.Event { diff --git a/events/upstream_test.go b/events/upstream_test.go index 024c5ec89..531e2ab62 100644 --- a/events/upstream_test.go +++ b/events/upstream_test.go @@ -153,17 +153,17 @@ var _ = ginkgo.Describe("Push Mode", ginkgo.Ordered, func() { }, } - c := NewUpstreamPushConsumerAsync(agentBob.db, agentBob.pool, eventHandlerConfig) + c := NewUpstreamPushConsumerAsync(eventHandlerConfig).EventConsumer(agentBob.db, agentBob.pool) c.ConsumeEventsUntilEmpty(api.NewContext(agentBob.db, nil)) // Agent James should also push everything in it's queue to the upstream eventHandlerConfig.UpstreamPush.AgentName = agentJames.name - c = NewUpstreamPushConsumerAsync(agentJames.db, agentJames.pool, eventHandlerConfig) + c = NewUpstreamPushConsumerAsync(eventHandlerConfig).EventConsumer(agentJames.db, agentJames.pool) c.ConsumeEventsUntilEmpty(api.NewContext(agentJames.db, nil)) // Agent Ross should also push everything in it's queue to the upstream eventHandlerConfig.UpstreamPush.AgentName = agentRoss.name - c = NewUpstreamPushConsumerAsync(agentRoss.db, agentRoss.pool, eventHandlerConfig) + c = NewUpstreamPushConsumerAsync(eventHandlerConfig).EventConsumer(agentRoss.db, agentRoss.pool) c.ConsumeEventsUntilEmpty(api.NewContext(agentRoss.db, nil)) }) diff --git a/go.mod b/go.mod index 3868b7c0f..ce3ee1b62 100644 --- a/go.mod +++ b/go.mod @@ -11,8 +11,8 @@ require ( github.com/containrrr/shoutrrr v0.7.1 github.com/fergusstrange/embedded-postgres v1.23.0 github.com/flanksource/commons v1.11.0 - github.com/flanksource/duty v1.0.161 - github.com/flanksource/gomplate/v3 v3.20.11 + github.com/flanksource/duty v1.0.166 + github.com/flanksource/gomplate/v3 v3.20.12 github.com/flanksource/kopper v1.0.6 github.com/google/cel-go v0.17.6 github.com/google/go-cmp v0.5.9 @@ -38,7 +38,7 @@ require ( ) require ( - ariga.io/atlas v0.13.2 // indirect + ariga.io/atlas v0.13.3 // indirect cloud.google.com/go/compute v1.23.0 // indirect cloud.google.com/go/compute/metadata v0.2.3 // indirect cloud.google.com/go/iam v1.1.2 // indirect @@ -46,7 +46,7 @@ require ( github.com/Masterminds/semver/v3 v3.2.1 // indirect github.com/agext/levenshtein v1.2.3 // indirect github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df // indirect - github.com/apparentlymart/go-textseg/v13 v13.0.0 // indirect + github.com/apparentlymart/go-textseg/v15 v15.0.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect @@ -74,7 +74,7 @@ require ( github.com/googleapis/enterprise-certificate-proxy v0.2.5 // indirect github.com/gosimple/unidecode v1.0.1 // indirect github.com/hairyhenderson/yaml v0.0.0-20220618171115-2d35fca545ce // indirect - github.com/hashicorp/hcl/v2 v2.17.0 // indirect + github.com/hashicorp/hcl/v2 v2.18.0 // indirect github.com/itchyny/gojq v0.12.13 // indirect github.com/itchyny/timefmt-go v0.1.5 // indirect github.com/jackc/puddle/v2 v2.2.1 // indirect @@ -101,7 +101,7 @@ require ( github.com/tidwall/pretty v1.2.1 // indirect github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 // indirect github.com/yuin/gopher-lua v1.1.0 // indirect - github.com/zclconf/go-cty v1.13.3 // indirect + github.com/zclconf/go-cty v1.14.0 // indirect go.opentelemetry.io/otel v1.16.0 // indirect go.opentelemetry.io/otel/metric v1.16.0 // indirect go.opentelemetry.io/otel/trace v1.16.0 // indirect @@ -141,8 +141,8 @@ require ( github.com/AzureAD/microsoft-authentication-library-for-go v1.0.0 // indirect github.com/Masterminds/goutils v1.1.1 // indirect github.com/TomOnTime/utfutil v0.0.0-20230223141146-125e65197b36 - github.com/antonmedv/expr v1.14.3 // indirect - github.com/aws/aws-sdk-go v1.44.331 // indirect + github.com/antonmedv/expr v1.15.0 // indirect + github.com/aws/aws-sdk-go v1.45.0 // indirect github.com/bgentry/go-netrc v0.0.0-20140422174119-9fd32a8b3d3d // indirect github.com/cjlapao/common-go v0.0.39 // indirect github.com/davecgh/go-spew v1.1.1 // indirect @@ -222,4 +222,3 @@ require ( sigs.k8s.io/yaml v1.3.0 // indirect ) -replace github.com/flanksource/duty => ../duty \ No newline at end of file diff --git a/go.sum b/go.sum index f29495742..d0b6b5e87 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,5 @@ -ariga.io/atlas v0.13.1 h1:oSkEYgI3qUnQZ6b6+teAEiIuizjBvkZ4YDbz0XWfCdQ= -ariga.io/atlas v0.13.1/go.mod h1:+TR129FJZ5Lvzms6dvCeGWh1yR6hMvmXBhug4hrNIGk= -ariga.io/atlas v0.13.2 h1:52uuqedNjRvuTLtpHJV2KNQve1iGmBNTibAPQTwpz80= -ariga.io/atlas v0.13.2/go.mod h1:+TR129FJZ5Lvzms6dvCeGWh1yR6hMvmXBhug4hrNIGk= +ariga.io/atlas v0.13.3 h1:L6Zz/0/yo86flQclZwIqJEKrHoJxQOkj3Z2JPvzZEHE= +ariga.io/atlas v0.13.3/go.mod h1:+TR129FJZ5Lvzms6dvCeGWh1yR6hMvmXBhug4hrNIGk= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU= @@ -651,16 +649,15 @@ github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHG github.com/andygrunwald/go-jira v1.16.0 h1:PU7C7Fkk5L96JvPc6vDVIrd99vdPnYudHu4ju2c2ikQ= github.com/andygrunwald/go-jira v1.16.0/go.mod h1:UQH4IBVxIYWbgagc0LF/k9FRs9xjIiQ8hIcC6HfLwFU= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= -github.com/antlr/antlr4/runtime/Go/antlr v1.4.10 h1:yL7+Jz0jTC6yykIK/Wh74gnTJnrGr5AyrNMXuA0gves= github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df h1:7RFfzj4SSt6nnvCPbCqijJi1nWCd+TqAT3bYCStRC18= github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df/go.mod h1:pSwJ0fSY5KhvocuWSx4fz3BA8OrA1bQn+K1Eli3BRwM= -github.com/antonmedv/expr v1.14.3 h1:GPrP7xKPWkFaLANPS7tPrgkNs7FMHpZdL72Dc5kFykg= -github.com/antonmedv/expr v1.14.3/go.mod h1:FPC8iWArxls7axbVLsW+kpg1mz29A1b2M6jt+hZfDkU= +github.com/antonmedv/expr v1.15.0 h1:sBHNMx1i+b1lZfkBFGhicvSLW6RLnca3R0B7jWrk8iM= +github.com/antonmedv/expr v1.15.0/go.mod h1:0E/6TxnOlRNp81GMzX9QfDPAmHo2Phg00y4JUv1ihsE= github.com/apache/arrow/go/v10 v10.0.1/go.mod h1:YvhnlEePVnBS4+0z3fhPfUy7W1Ikj0Ih0vcRo/gZ1M0= github.com/apache/arrow/go/v11 v11.0.0/go.mod h1:Eg5OsL5H+e299f7u5ssuXsuHQVEGC4xei5aX110hRiI= github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU= -github.com/apparentlymart/go-textseg/v13 v13.0.0 h1:Y+KvPE1NYz0xl601PVImeQfFyEy6iT90AvPUL1NNfNw= -github.com/apparentlymart/go-textseg/v13 v13.0.0/go.mod h1:ZK2fH7c4NqDTLtiYLvIkEghdlcqw7yxLeM89kiTRPUo= +github.com/apparentlymart/go-textseg/v15 v15.0.0 h1:uYvfpb3DyLSCGWnctWKGj857c6ew1u1fNQOlOtuGxQY= +github.com/apparentlymart/go-textseg/v15 v15.0.0/go.mod h1:K8XmNZdhEBkdlyDdvbmmsvpAG721bKi0joRfFdHIWJ4= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/armon/go-metrics v0.3.10/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc= @@ -670,10 +667,8 @@ github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgI github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= github.com/aws/aws-sdk-go v1.44.122/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo= -github.com/aws/aws-sdk-go v1.44.330 h1:kO41s8I4hRYtWSIuMc/O053wmEGfMTT8D4KtPSojUkA= -github.com/aws/aws-sdk-go v1.44.330/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= -github.com/aws/aws-sdk-go v1.44.331 h1:hEwdOTv6973uegCUY2EY8jyyq0OUg9INc0HOzcu2bjw= -github.com/aws/aws-sdk-go v1.44.331/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= +github.com/aws/aws-sdk-go v1.45.0 h1:qoVOQHuLacxJMO71T49KeE70zm+Tk3vtrl7XO4VUPZc= +github.com/aws/aws-sdk-go v1.45.0/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= @@ -769,13 +764,9 @@ github.com/fergusstrange/embedded-postgres v1.23.0 h1:ZYRD89nammxQDWDi6taJE2CYjD github.com/fergusstrange/embedded-postgres v1.23.0/go.mod h1:wL562t1V+iuFwq0UcgMi2e9rp8CROY9wxWZEfP8Y874= github.com/flanksource/commons v1.11.0 h1:ThP3hnX4Xh4thxVl2GjQ92WvQ93jq5VqzJh46jbW23A= github.com/flanksource/commons v1.11.0/go.mod h1:zYEhi6E2+diQ+loVcROUHo/Bgv+Tn61W2NYmrb5MgVI= -github.com/flanksource/duty v1.0.159 h1:CkSKDZ4HYQ7cSEy8ufg1WODqcghkWsiAlk2o4I4JkqY= -github.com/flanksource/duty v1.0.159/go.mod h1:RJ/kcZ7dbL8/52tem757szVIA3IomS8bOAZIK0xb4rk= -github.com/flanksource/duty v1.0.161 h1:fuEFH5A7kKAApxDP/FzFRaD2HuCrUJhwYjeuNxj/JVY= -github.com/flanksource/duty v1.0.161/go.mod h1:C3eT1PfdqTdefpGRDfUzLDVjSKuYjqZbgbIqX757FbA= github.com/flanksource/gomplate/v3 v3.20.4/go.mod h1:27BNWhzzSjDed1z8YShO6W+z6G9oZXuxfNFGd/iGSdc= -github.com/flanksource/gomplate/v3 v3.20.11 h1:B83fXI0dqlOii2QE+qPNI0blPlcZTmfnPPU8zwVGUtA= -github.com/flanksource/gomplate/v3 v3.20.11/go.mod h1:1N1aptaAo0XUaGsyU5CWiwn9GMRpbIKX1AdsypfmZYo= +github.com/flanksource/gomplate/v3 v3.20.12 h1:SLo8eLaYkUTizHIuntZ4LxxLzbRfV0NvC6DTpu9fj94= +github.com/flanksource/gomplate/v3 v3.20.12/go.mod h1:1N1aptaAo0XUaGsyU5CWiwn9GMRpbIKX1AdsypfmZYo= github.com/flanksource/is-healthy v0.0.0-20230705092916-3b4cf510c5fc/go.mod h1:4pQhmF+TnVqJroQKY8wSnSp+T18oLson6YQ2M0qPHfQ= github.com/flanksource/is-healthy v0.0.0-20230713150444-ad2a5ef4bb37 h1:MHXg2Vo/oHB0rGLgsI0tkU9MGV7aDwqvO1lrbX7/shY= github.com/flanksource/is-healthy v0.0.0-20230713150444-ad2a5ef4bb37/go.mod h1:BH5gh9JyEAuuWVP6Q5y9h43VozS0RfKyjNpM9L4v4hw= @@ -1041,10 +1032,9 @@ github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09 github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= -github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= -github.com/hashicorp/hcl/v2 v2.17.0 h1:z1XvSUyXd1HP10U4lrLg5e0JMVz6CPaJvAgxM0KNZVY= -github.com/hashicorp/hcl/v2 v2.17.0/go.mod h1:gJyW2PTShkJqQBKpAmPO3yxMxIuoXkOF2TpqXzrQyx4= +github.com/hashicorp/hcl/v2 v2.18.0 h1:wYnG7Lt31t2zYkcquwgKo6MWXzRUDIeIVU5naZwHLl8= +github.com/hashicorp/hcl/v2 v2.18.0/go.mod h1:ThLC89FV4p9MPW804KVbe/cEXoQ8NZEh+JtMeeGErHE= github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= github.com/hashicorp/mdns v1.0.4/go.mod h1:mtBihi+LeNXGtG8L9dX59gAEa12BDtBQSp4v/YAJqrc= github.com/hashicorp/memberlist v0.3.0/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE= @@ -1402,10 +1392,8 @@ github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1 github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/yuin/gopher-lua v1.1.0 h1:BojcDhfyDWgU2f2TOzYK/g5p2gxMrku8oupLDqlnSqE= github.com/yuin/gopher-lua v1.1.0/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= -github.com/zclconf/go-cty v1.13.2 h1:4GvrUxe/QUDYuJKAav4EYqdM47/kZa672LwmXFmEKT0= -github.com/zclconf/go-cty v1.13.2/go.mod h1:YKQzy/7pZ7iq2jNFzy5go57xdxdWoLLpaEp4u238AE0= -github.com/zclconf/go-cty v1.13.3 h1:m+b9q3YDbg6Bec5rr+KGy1MzEVzY/jC2X+YX4yqKtHI= -github.com/zclconf/go-cty v1.13.3/go.mod h1:YKQzy/7pZ7iq2jNFzy5go57xdxdWoLLpaEp4u238AE0= +github.com/zclconf/go-cty v1.14.0 h1:/Xrd39K7DXbHzlisFP9c4pHao4yyf+/Ug9LEz+Y/yhc= +github.com/zclconf/go-cty v1.14.0/go.mod h1:VvMs5i0vgZdhYawQNq5kePSpLAoz8u1xvZgrPIxfnZE= github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= go.etcd.io/etcd/api/v3 v3.5.5/go.mod h1:KFtNaxGDw4Yx/BA4iPPwevUTAuqcsPxzyX8PHydchN8= @@ -2221,20 +2209,14 @@ k8s.io/api v0.24.2/go.mod h1:AHqbSkTm6YrQ0ObxjO3Pmp/ubFF/KuM7jU+3khoBsOg= k8s.io/api v0.26.4/go.mod h1:WwKEXU3R1rgCZ77AYa7DFksd9/BAIKyOmRlbVxgvjCk= k8s.io/api v0.28.0 h1:3j3VPWmN9tTDI68NETBWlDiA9qOiGJ7sdKeufehBYsM= k8s.io/api v0.28.0/go.mod h1:0l8NZJzB0i/etuWnIXcwfIv+xnDOhL3lLW919AWYDuY= -k8s.io/api v0.28.1 h1:i+0O8k2NPBCPYaMB+uCkseEbawEt/eFaiRqUx8aB108= -k8s.io/api v0.28.1/go.mod h1:uBYwID+66wiL28Kn2tBjBYQdEU0Xk0z5qF8bIBqk/Dg= k8s.io/apiextensions-apiserver v0.27.4 h1:ie1yZG4nY/wvFMIR2hXBeSVq+HfNzib60FjnBYtPGSs= k8s.io/apiextensions-apiserver v0.27.4/go.mod h1:KHZaDr5H9IbGEnSskEUp/DsdXe1hMQ7uzpQcYUFt2bM= k8s.io/apimachinery v0.24.2/go.mod h1:82Bi4sCzVBdpYjyI4jY6aHX+YCUchUIrZrXKedjd2UM= k8s.io/apimachinery v0.26.4/go.mod h1:ats7nN1LExKHvJ9TmwootT00Yz05MuYqPXEXaVeOy5I= k8s.io/apimachinery v0.28.0 h1:ScHS2AG16UlYWk63r46oU3D5y54T53cVI5mMJwwqFNA= k8s.io/apimachinery v0.28.0/go.mod h1:X0xh/chESs2hP9koe+SdIAcXWcQ+RM5hy0ZynB+yEvw= -k8s.io/apimachinery v0.28.1 h1:EJD40og3GizBSV3mkIoXQBsws32okPOy+MkRyzh6nPY= -k8s.io/apimachinery v0.28.1/go.mod h1:X0xh/chESs2hP9koe+SdIAcXWcQ+RM5hy0ZynB+yEvw= k8s.io/client-go v0.28.0 h1:ebcPRDZsCjpj62+cMk1eGNX1QkMdRmQ6lmz5BLoFWeM= k8s.io/client-go v0.28.0/go.mod h1:0Asy9Xt3U98RypWJmU1ZrRAGKhP6NqDPmptlAzK2kMc= -k8s.io/client-go v0.28.1 h1:pRhMzB8HyLfVwpngWKE8hDcXRqifh1ga2Z/PU9SXVK8= -k8s.io/client-go v0.28.1/go.mod h1:pEZA3FqOsVkCc07pFVzK076R+P/eXqsgx5zuuRWukNE= k8s.io/component-base v0.27.4 h1:Wqc0jMKEDGjKXdae8hBXeskRP//vu1m6ypC+gwErj4c= k8s.io/component-base v0.27.4/go.mod h1:hoiEETnLc0ioLv6WPeDt8vD34DDeB35MfQnxCARq3kY= k8s.io/gengo v0.0.0-20210813121822-485abfe95c7c/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E= diff --git a/hack/generate-schemas/go.mod b/hack/generate-schemas/go.mod index 4b0ed3bea..69312a86f 100644 --- a/hack/generate-schemas/go.mod +++ b/hack/generate-schemas/go.mod @@ -10,7 +10,7 @@ require ( ) require ( - ariga.io/atlas v0.13.2 // indirect + ariga.io/atlas v0.13.3 // indirect cloud.google.com/go v0.110.7 // indirect cloud.google.com/go/compute v1.23.0 // indirect cloud.google.com/go/compute/metadata v0.2.3 // indirect @@ -20,15 +20,15 @@ require ( github.com/Masterminds/semver/v3 v3.2.1 // indirect github.com/agext/levenshtein v1.2.3 // indirect github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df // indirect - github.com/antonmedv/expr v1.14.3 // indirect - github.com/apparentlymart/go-textseg/v13 v13.0.0 // indirect - github.com/aws/aws-sdk-go v1.44.331 // indirect + github.com/antonmedv/expr v1.15.0 // indirect + github.com/apparentlymart/go-textseg/v15 v15.0.0 // indirect + github.com/aws/aws-sdk-go v1.45.0 // indirect github.com/bgentry/go-netrc v0.0.0-20140422174119-9fd32a8b3d3d // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect - github.com/flanksource/duty v1.0.161 // indirect - github.com/flanksource/gomplate/v3 v3.20.11 // indirect + github.com/flanksource/duty v1.0.166 // indirect + github.com/flanksource/gomplate/v3 v3.20.12 // indirect github.com/flanksource/is-healthy v0.0.0-20230713150444-ad2a5ef4bb37 // indirect github.com/flanksource/mapstructure v1.6.0 // indirect github.com/ghodss/yaml v1.0.0 // indirect @@ -57,7 +57,7 @@ require ( github.com/hashicorp/go-getter v1.7.2 // indirect github.com/hashicorp/go-safetemp v1.0.0 // indirect github.com/hashicorp/go-version v1.6.0 // indirect - github.com/hashicorp/hcl/v2 v2.17.0 // indirect + github.com/hashicorp/hcl/v2 v2.18.0 // indirect github.com/iancoleman/orderedmap v0.0.0-20190318233801-ac98e3ecb4b0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/itchyny/gojq v0.12.13 // indirect @@ -103,7 +103,7 @@ require ( github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasttemplate v1.2.2 // indirect github.com/yuin/gopher-lua v1.1.0 // indirect - github.com/zclconf/go-cty v1.13.3 // indirect + github.com/zclconf/go-cty v1.14.0 // indirect go.opencensus.io v0.24.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.25.0 // indirect diff --git a/hack/generate-schemas/go.sum b/hack/generate-schemas/go.sum index c0dba5106..3d68e308a 100644 --- a/hack/generate-schemas/go.sum +++ b/hack/generate-schemas/go.sum @@ -1,5 +1,5 @@ -ariga.io/atlas v0.13.2 h1:52uuqedNjRvuTLtpHJV2KNQve1iGmBNTibAPQTwpz80= -ariga.io/atlas v0.13.2/go.mod h1:+TR129FJZ5Lvzms6dvCeGWh1yR6hMvmXBhug4hrNIGk= +ariga.io/atlas v0.13.3 h1:L6Zz/0/yo86flQclZwIqJEKrHoJxQOkj3Z2JPvzZEHE= +ariga.io/atlas v0.13.3/go.mod h1:+TR129FJZ5Lvzms6dvCeGWh1yR6hMvmXBhug4hrNIGk= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU= @@ -629,18 +629,18 @@ github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHG github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df h1:7RFfzj4SSt6nnvCPbCqijJi1nWCd+TqAT3bYCStRC18= github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df/go.mod h1:pSwJ0fSY5KhvocuWSx4fz3BA8OrA1bQn+K1Eli3BRwM= -github.com/antonmedv/expr v1.14.3 h1:GPrP7xKPWkFaLANPS7tPrgkNs7FMHpZdL72Dc5kFykg= -github.com/antonmedv/expr v1.14.3/go.mod h1:FPC8iWArxls7axbVLsW+kpg1mz29A1b2M6jt+hZfDkU= +github.com/antonmedv/expr v1.15.0 h1:sBHNMx1i+b1lZfkBFGhicvSLW6RLnca3R0B7jWrk8iM= +github.com/antonmedv/expr v1.15.0/go.mod h1:0E/6TxnOlRNp81GMzX9QfDPAmHo2Phg00y4JUv1ihsE= github.com/apache/arrow/go/v10 v10.0.1/go.mod h1:YvhnlEePVnBS4+0z3fhPfUy7W1Ikj0Ih0vcRo/gZ1M0= github.com/apache/arrow/go/v11 v11.0.0/go.mod h1:Eg5OsL5H+e299f7u5ssuXsuHQVEGC4xei5aX110hRiI= github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU= -github.com/apparentlymart/go-textseg/v13 v13.0.0 h1:Y+KvPE1NYz0xl601PVImeQfFyEy6iT90AvPUL1NNfNw= -github.com/apparentlymart/go-textseg/v13 v13.0.0/go.mod h1:ZK2fH7c4NqDTLtiYLvIkEghdlcqw7yxLeM89kiTRPUo= +github.com/apparentlymart/go-textseg/v15 v15.0.0 h1:uYvfpb3DyLSCGWnctWKGj857c6ew1u1fNQOlOtuGxQY= +github.com/apparentlymart/go-textseg/v15 v15.0.0/go.mod h1:K8XmNZdhEBkdlyDdvbmmsvpAG721bKi0joRfFdHIWJ4= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= github.com/aws/aws-sdk-go v1.44.122/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo= -github.com/aws/aws-sdk-go v1.44.331 h1:hEwdOTv6973uegCUY2EY8jyyq0OUg9INc0HOzcu2bjw= -github.com/aws/aws-sdk-go v1.44.331/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= +github.com/aws/aws-sdk-go v1.45.0 h1:qoVOQHuLacxJMO71T49KeE70zm+Tk3vtrl7XO4VUPZc= +github.com/aws/aws-sdk-go v1.45.0/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= github.com/bgentry/go-netrc v0.0.0-20140422174119-9fd32a8b3d3d h1:xDfNPAt8lFiC1UJrqV3uuy861HCTo708pDMbjHHdCas= github.com/bgentry/go-netrc v0.0.0-20140422174119-9fd32a8b3d3d/go.mod h1:6QX/PXZ00z/TKoufEY6K/a0k6AhaJrQKdFe6OfVXsa4= @@ -704,11 +704,11 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv github.com/fergusstrange/embedded-postgres v1.23.0 h1:ZYRD89nammxQDWDi6taJE2CYjDuAoVc1TpEqRIYQryc= github.com/flanksource/commons v1.11.0 h1:ThP3hnX4Xh4thxVl2GjQ92WvQ93jq5VqzJh46jbW23A= github.com/flanksource/commons v1.11.0/go.mod h1:zYEhi6E2+diQ+loVcROUHo/Bgv+Tn61W2NYmrb5MgVI= -github.com/flanksource/duty v1.0.161 h1:fuEFH5A7kKAApxDP/FzFRaD2HuCrUJhwYjeuNxj/JVY= -github.com/flanksource/duty v1.0.161/go.mod h1:C3eT1PfdqTdefpGRDfUzLDVjSKuYjqZbgbIqX757FbA= +github.com/flanksource/duty v1.0.166 h1:R20ksqEKgnk+Q8RCOWpLIhLx5ofYd1kdUAfxDxJRsC0= +github.com/flanksource/duty v1.0.166/go.mod h1:C3eT1PfdqTdefpGRDfUzLDVjSKuYjqZbgbIqX757FbA= github.com/flanksource/gomplate/v3 v3.20.4/go.mod h1:27BNWhzzSjDed1z8YShO6W+z6G9oZXuxfNFGd/iGSdc= -github.com/flanksource/gomplate/v3 v3.20.11 h1:B83fXI0dqlOii2QE+qPNI0blPlcZTmfnPPU8zwVGUtA= -github.com/flanksource/gomplate/v3 v3.20.11/go.mod h1:1N1aptaAo0XUaGsyU5CWiwn9GMRpbIKX1AdsypfmZYo= +github.com/flanksource/gomplate/v3 v3.20.12 h1:SLo8eLaYkUTizHIuntZ4LxxLzbRfV0NvC6DTpu9fj94= +github.com/flanksource/gomplate/v3 v3.20.12/go.mod h1:1N1aptaAo0XUaGsyU5CWiwn9GMRpbIKX1AdsypfmZYo= github.com/flanksource/is-healthy v0.0.0-20230705092916-3b4cf510c5fc/go.mod h1:4pQhmF+TnVqJroQKY8wSnSp+T18oLson6YQ2M0qPHfQ= github.com/flanksource/is-healthy v0.0.0-20230713150444-ad2a5ef4bb37 h1:MHXg2Vo/oHB0rGLgsI0tkU9MGV7aDwqvO1lrbX7/shY= github.com/flanksource/is-healthy v0.0.0-20230713150444-ad2a5ef4bb37/go.mod h1:BH5gh9JyEAuuWVP6Q5y9h43VozS0RfKyjNpM9L4v4hw= @@ -904,8 +904,8 @@ github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mO github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= -github.com/hashicorp/hcl/v2 v2.17.0 h1:z1XvSUyXd1HP10U4lrLg5e0JMVz6CPaJvAgxM0KNZVY= -github.com/hashicorp/hcl/v2 v2.17.0/go.mod h1:gJyW2PTShkJqQBKpAmPO3yxMxIuoXkOF2TpqXzrQyx4= +github.com/hashicorp/hcl/v2 v2.18.0 h1:wYnG7Lt31t2zYkcquwgKo6MWXzRUDIeIVU5naZwHLl8= +github.com/hashicorp/hcl/v2 v2.18.0/go.mod h1:ThLC89FV4p9MPW804KVbe/cEXoQ8NZEh+JtMeeGErHE= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/iancoleman/orderedmap v0.0.0-20190318233801-ac98e3ecb4b0 h1:i462o439ZjprVSFSZLZxcsoAe592sZB1rci2Z8j4wdk= github.com/iancoleman/orderedmap v0.0.0-20190318233801-ac98e3ecb4b0/go.mod h1:N0Wam8K1arqPXNWjMo21EXnBPOPp36vB07FNRdD2geA= @@ -1128,8 +1128,8 @@ github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1 github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/yuin/gopher-lua v1.1.0 h1:BojcDhfyDWgU2f2TOzYK/g5p2gxMrku8oupLDqlnSqE= github.com/yuin/gopher-lua v1.1.0/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= -github.com/zclconf/go-cty v1.13.3 h1:m+b9q3YDbg6Bec5rr+KGy1MzEVzY/jC2X+YX4yqKtHI= -github.com/zclconf/go-cty v1.13.3/go.mod h1:YKQzy/7pZ7iq2jNFzy5go57xdxdWoLLpaEp4u238AE0= +github.com/zclconf/go-cty v1.14.0 h1:/Xrd39K7DXbHzlisFP9c4pHao4yyf+/Ug9LEz+Y/yhc= +github.com/zclconf/go-cty v1.14.0/go.mod h1:VvMs5i0vgZdhYawQNq5kePSpLAoz8u1xvZgrPIxfnZE= github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= diff --git a/notification/notification_test.go b/notification/notification_test.go index 2f9f2760b..afa5c028f 100644 --- a/notification/notification_test.go +++ b/notification/notification_test.go @@ -104,14 +104,14 @@ var _ = ginkgo.Describe("Test Notification on incident creation", ginkgo.Ordered }) ginkgo.It("should consume the event and send the notification", func() { - notifHandler := events.NewNotificationConsumerSync(db.Gorm, db.Pool) - sendHandler := events.NewNotificationSendConsumerAsync(db.Gorm, db.Pool) + notificationHandler := events.NewNotificationSaveConsumerSync().EventConsumer(db.Gorm, db.Pool) + sendHandler := events.NewNotificationSendConsumerAsync().EventConsumer(db.Gorm, db.Pool) ctx := api.NewContext(db.Gorm, nil) // Order of consumption is important as incident.create event // produces a notification.send event - notifHandler.ConsumeEventsUntilEmpty(ctx) + notificationHandler.ConsumeEventsUntilEmpty(ctx) sendHandler.ConsumeEventsUntilEmpty(ctx) Expect(webhookPostdata).To(Not(BeNil())) diff --git a/playbook/playbook_test.go b/playbook/playbook_test.go index 2efe34058..07fab4a69 100644 --- a/playbook/playbook_test.go +++ b/playbook/playbook_test.go @@ -13,6 +13,8 @@ import ( "github.com/flanksource/duty/models" "github.com/flanksource/incident-commander/api" v1 "github.com/flanksource/incident-commander/api/v1" + "github.com/flanksource/incident-commander/events" + "github.com/flanksource/incident-commander/events/eventconsumer" ginkgo "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "gorm.io/gorm/clause" @@ -33,9 +35,12 @@ var _ = ginkgo.Describe("Playbook runner", ginkgo.Ordered, func() { }) ginkgo.It("start the queue consumer in background", func() { - go StartPlaybookRunConsumer(testDB, testDBPool) + go eventconsumer.New(testDB, testDBPool, "playbook_run_updates", EventConsumer). + WithNumConsumers(5). + WithNotifyTimeout(time.Second * 2). + Listen() - go ListenPlaybookPGNotify(testDB, testDBPool) + go events.StartConsumers(testDB, testDBPool, events.Config{}) }) ginkgo.It("should create a new playbook", func() { @@ -278,7 +283,7 @@ var _ = ginkgo.Describe("Playbook runner", ginkgo.Ordered, func() { } attempts += 1 - if attempts > 20 { // wait for 2 seconds + if attempts > 50 { // wait for 5 seconds ginkgo.Fail(fmt.Sprintf("Timed out waiting for run to complete. Run status: %s", updatedRun.Status)) } } diff --git a/playbook/run_consumer.go b/playbook/run_consumer.go index 3114a054a..0ce62588c 100644 --- a/playbook/run_consumer.go +++ b/playbook/run_consumer.go @@ -16,7 +16,7 @@ func StartPlaybookRunConsumer(db *gorm.DB, pool *pgxpool.Pool) { Listen() } -func EventConsumer(ctx *api.Context, batchSize int) error { +func EventConsumer(ctx *api.Context) error { tx := ctx.DB().Begin() if tx.Error != nil { return fmt.Errorf("error initiating db tx: %w", tx.Error) @@ -31,12 +31,11 @@ func EventConsumer(ctx *api.Context, batchSize int) error { WHERE status = ? AND start_time <= NOW() ORDER BY start_time - LIMIT ? FOR UPDATE SKIP LOCKED ` var runs []models.PlaybookRun - if err := tx.Raw(query, models.PlaybookRunStatusScheduled, batchSize).Find(&runs).Error; err != nil { + if err := tx.Raw(query, models.PlaybookRunStatusScheduled).Find(&runs).Error; err != nil { return err }