Skip to content

Commit

Permalink
Merge branch 'main' into feat/playbook-events
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe committed Sep 1, 2023
2 parents f1a1934 + c371eaf commit 03c4c53
Show file tree
Hide file tree
Showing 18 changed files with 318 additions and 294 deletions.
14 changes: 5 additions & 9 deletions events/checks.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
package events

import (
"github.com/flanksource/incident-commander/events/eventconsumer"
"github.com/jackc/pgx/v5/pgxpool"
"gorm.io/gorm"
)

func NewCheckConsumerSync(db *gorm.DB, pgPool *pgxpool.Pool) *eventconsumer.EventConsumer {
return eventconsumer.New(db, pgPool, eventQueueUpdateChannel,
newEventQueueSyncConsumerFunc(syncConsumerWatchEvents["check"], addNotificationEvent, SavePlaybookRun))
// NewCheckConsumerSync builds the synchronous consumer for health-check
// events. It reacts to check pass/fail transitions by queueing a
// notification event and scheduling any matching playbook runs.
func NewCheckConsumerSync() SyncEventConsumer {
	checkConsumer := SyncEventConsumer{
		watchEvents: []string{EventCheckPassed, EventCheckFailed},
		consumers:   []SyncEventHandlerFunc{addNotificationEvent, schedulePlaybookRun},
	}
	return checkConsumer
}
20 changes: 11 additions & 9 deletions events/components.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package events

import (
"github.com/flanksource/incident-commander/events/eventconsumer"
"github.com/jackc/pgx/v5/pgxpool"
"gorm.io/gorm"
)

func NewComponentConsumerSync(db *gorm.DB, pgPool *pgxpool.Pool) *eventconsumer.EventConsumer {
return eventconsumer.New(db, pgPool, eventQueueUpdateChannel,
newEventQueueSyncConsumerFunc(syncConsumerWatchEvents["component"], addNotificationEvent, SavePlaybookRun))
// NewComponentConsumerSync builds the synchronous consumer for component
// status-change events. Every status transition (error, healthy, info,
// unhealthy, warning) queues a notification event and schedules any
// matching playbook runs.
func NewComponentConsumerSync() SyncEventConsumer {
	statusEvents := []string{
		EventComponentStatusError,
		EventComponentStatusHealthy,
		EventComponentStatusInfo,
		EventComponentStatusUnhealthy,
		EventComponentStatusWarning,
	}
	return SyncEventConsumer{
		watchEvents: statusEvents,
		consumers:   []SyncEventHandlerFunc{addNotificationEvent, schedulePlaybookRun},
	}
}
238 changes: 96 additions & 142 deletions events/event_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"time"

"github.com/flanksource/commons/collections/set"
"github.com/flanksource/commons/logger"
"github.com/flanksource/duty/upstream"
"github.com/flanksource/incident-commander/api"
Expand All @@ -23,11 +22,11 @@ const (
)

type (
// AsyncEventHandler processes multiple events and returns the failed ones
AsyncEventHandler func(*api.Context, []api.Event) []api.Event
// AsyncEventHandlerFunc processes multiple events and returns the failed ones
AsyncEventHandlerFunc func(*api.Context, []api.Event) []api.Event

// SyncEventHandler processes a single event and ONLY makes db changes.
SyncEventHandler func(*api.Context, api.Event) error
// SyncEventHandlerFunc processes a single event and ONLY makes db changes.
SyncEventHandlerFunc func(*api.Context, api.Event) error
)

// List of all sync events in the `event_queue` table.
Expand All @@ -49,6 +48,10 @@ const (
EventNotificationUpdate = "notification.update"
EventNotificationDelete = "notification.delete"

EventPlaybookSpecApprovalUpdated = "playbook.spec.approval.updated"

EventPlaybookApprovalInserted = "playbook.approval.inserted"

EventIncidentCommentAdded = "incident.comment.added"
EventIncidentCreated = "incident.created"
EventIncidentDODAdded = "incident.dod.added"
Expand Down Expand Up @@ -88,88 +91,50 @@ type Config struct {
UpstreamPush upstream.UpstreamConfig
}

// syncConsumerWatchEvents keeps a registry of all the event_queue consumer and the events they watch.
// This helps in ensuring that a single event is not being consumed by multiple consumers.
var syncConsumerWatchEvents = map[string][]string{
"team": {EventTeamUpdate, EventTeamDelete},
"check": {EventCheckPassed, EventCheckFailed},
"component": {
EventComponentStatusError,
EventComponentStatusHealthy,
EventComponentStatusInfo,
EventComponentStatusUnhealthy,
EventComponentStatusWarning,
},
"incident.responder": {EventIncidentResponderAdded},
"incident.comment": {EventIncidentCommentAdded},
"notification_update": {
EventNotificationUpdate, EventNotificationDelete,
},
"notification_add": {
EventIncidentCreated,
EventIncidentDODAdded,
EventIncidentDODPassed,
EventIncidentDODRegressed,
EventIncidentResponderRemoved,
EventIncidentStatusCancelled,
EventIncidentStatusClosed,
EventIncidentStatusInvestigating,
EventIncidentStatusMitigated,
EventIncidentStatusOpen,
EventIncidentStatusResolved,
},
}

var asyncConsumerWatchEvents = map[string][]string{
"push_queue": {EventPushQueueCreate},
"notification_send": {EventNotificationSend},
"incident.responder": {
EventJiraResponderAdded, EventMSPlannerResponderAdded,
EventMSPlannerCommentAdded, EventJiraCommentAdded,
},
}

func StartConsumers(gormDB *gorm.DB, pgpool *pgxpool.Pool, config Config) {
uniqWatchEvents := set.New[string]()
for _, v := range syncConsumerWatchEvents {
for _, e := range v {
if uniqWatchEvents.Contains(e) {
logger.Fatalf("Error starting consumers: event[%s] has multiple consumers", e)
}

uniqWatchEvents.Add(e)
}
uniqEvents := make(map[string]struct{})
allSyncHandlers := []SyncEventConsumer{
NewTeamConsumerSync(),
NewCheckConsumerSync(),
NewComponentConsumerSync(),
NewResponderConsumerSync(),
NewCommentConsumerSync(),
NewNotificationSaveConsumerSync(),
NewNotificationUpdatesConsumerSync(),
NewPlaybookApprovalConsumerSync(),
NewPlaybookApprovalSpecUpdatedConsumerSync(),
}

for _, v := range asyncConsumerWatchEvents {
for _, e := range v {
if uniqWatchEvents.Contains(e) {
logger.Fatalf("Error starting consumers: event[%s] has multiple consumers", e)
for i := range allSyncHandlers {
for _, event := range allSyncHandlers[i].watchEvents {
if _, ok := uniqEvents[event]; ok {
logger.Fatalf("Watch event %s is duplicated", event)
}

uniqWatchEvents.Add(e)
uniqEvents[event] = struct{}{}
}

go allSyncHandlers[i].EventConsumer(gormDB, pgpool).Listen()
}

allConsumers := []*eventconsumer.EventConsumer{
NewTeamConsumerSync(gormDB, pgpool),
NewCheckConsumerSync(gormDB, pgpool),
NewComponentConsumerSync(gormDB, pgpool),
NewResponderConsumerSync(gormDB, pgpool),
NewCommentConsumerSync(gormDB, pgpool),
NewNotificationConsumerSync(gormDB, pgpool),
NewNotificationUpdatesConsumerSync(gormDB, pgpool),

// Async consumers
NewNotificationSendConsumerAsync(gormDB, pgpool),
NewResponderConsumerAsync(gormDB, pgpool),
asyncConsumers := []AsyncEventConsumer{
NewNotificationSendConsumerAsync(),
NewResponderConsumerAsync(),
}
if config.UpstreamPush.Valid() {
allConsumers = append(allConsumers, NewUpstreamPushConsumerAsync(gormDB, pgpool, config))
asyncConsumers = append(asyncConsumers, NewUpstreamPushConsumerAsync(config))
}

for i := range allConsumers {
go allConsumers[i].Listen()
for i := range asyncConsumers {
for _, event := range asyncConsumers[i].watchEvents {
if _, ok := uniqEvents[event]; ok {
logger.Fatalf("Watch event %s is duplicated", event)
}

uniqEvents[event] = struct{}{}
}

go asyncConsumers[i].EventConsumer(gormDB, pgpool).Listen()
}
}

Expand Down Expand Up @@ -206,96 +171,74 @@ func fetchEvents(ctx *api.Context, watchEvents []string, batchSize int) ([]api.E
return events, nil
}

// newEventQueueAsyncConsumerFunc returns a new event consumer for the `watchEvents` events in the `event_queue` table.
func newEventQueueAsyncConsumerFunc(watchEvents []string, handleEventsAsync AsyncEventHandler) eventconsumer.EventConsumerFunc {
return func(ctx *api.Context, batchSize int) error {
tx := ctx.DB().Begin()
if tx.Error != nil {
return fmt.Errorf("error initiating db tx: %w", tx.Error)
}
defer tx.Rollback()

ctx = ctx.WithDB(tx)

events, err := fetchEvents(ctx, watchEvents, batchSize)
if err != nil {
return fmt.Errorf("error fetching events: %w", err)
}

if len(events) == 0 {
return api.Errorf(api.ENOTFOUND, "No events found")
}

failedEvents := handleEventsAsync(ctx, events)
saveFailedEvents(tx, failedEvents)
// SyncEventConsumer describes a consumer that processes one event at a time,
// running every handler inside a single db transaction (see Handle).
type SyncEventConsumer struct {
	// watchEvents is the list of event_queue event names this consumer handles.
	watchEvents []string
	// consumers are run in order for each event; all must succeed or the
	// transaction is rolled back.
	consumers []SyncEventHandlerFunc
	// numConsumers, when > 0, overrides the event consumer's default
	// concurrency (see EventConsumer).
	numConsumers int
}

return tx.Commit().Error
func (t SyncEventConsumer) EventConsumer(db *gorm.DB, pool *pgxpool.Pool) *eventconsumer.EventConsumer {
consumer := eventconsumer.New(db, pool, eventQueueUpdateChannel, t.Handle)
if t.numConsumers > 0 {
consumer = consumer.WithNumConsumers(t.numConsumers)
}
}

// newEventQueueConsumerFunc returns a new sync event consumer for the `watchEvents` events in the `event_queue` table.
func newEventQueueSyncConsumerFunc(watchEvents []string, syncConsumers ...SyncEventHandler) eventconsumer.EventConsumerFunc {
return func(ctx *api.Context, batchSize int) error {
tx := ctx.DB().Begin()
if tx.Error != nil {
return fmt.Errorf("error initiating db tx: %w", tx.Error)
}
defer tx.Rollback()
return consumer
}

events, err := fetchEvents(ctx.WithDB(tx), watchEvents, batchSize)
if err != nil {
return fmt.Errorf("error fetching events: %w", err)
}
func (t *SyncEventConsumer) Handle(ctx *api.Context) error {
tx := ctx.DB().Begin()
if tx.Error != nil {
return fmt.Errorf("error initiating db tx: %w", tx.Error)
}
defer tx.Rollback()

if len(events) == 0 {
return api.Errorf(api.ENOTFOUND, "No events found")
}
ctx = ctx.WithDB(tx)

failedEvents := handleEventsSync(ctx, events, syncConsumers)
saveFailedEvents(tx, failedEvents)
events, err := fetchEvents(ctx, t.watchEvents, 1)
if err != nil {
return fmt.Errorf("error fetching events: %w", err)
}

return tx.Commit().Error
if len(events) == 0 {
return api.Errorf(api.ENOTFOUND, "No events found")
}
}

// handleEventsSync runs all the sync consumers for the given events.
//
// Each event is handled in isolation (in a separate transaction).
func handleEventsSync(ctx *api.Context, events []api.Event, syncConsumers []SyncEventHandler) []api.Event {
failedEvents := make([]api.Event, 0, len(events))
for i := range events {
if err := processSyncConsumers(ctx, events[i], syncConsumers); err != nil {
logger.Errorf("Failed to process event[%s]: %s", events[i].ID, err.Error())
events[i].Error = err.Error()
failedEvents = append(failedEvents, events[i])
for _, syncConsumer := range t.consumers {
if err := syncConsumer(ctx, events[0]); err != nil {
return fmt.Errorf("error processing sync consumer: %w", err)
}
}

return failedEvents
return tx.Commit().Error
}

// processSyncConsumers runs all the sync consumers for the given event.
//
// All of the consumers are expected to succeed.
// Returns error even if a single consumer fails and any changes are rolled back.
func processSyncConsumers(ctx *api.Context, event api.Event, syncConsumers []SyncEventHandler) error {
// AsyncEventConsumer describes a consumer that processes events in batches,
// with failed events saved back to the event_queue table (see Handle).
type AsyncEventConsumer struct {
	// watchEvents is the list of event_queue event names this consumer handles.
	watchEvents []string
	// batchSize is the number of events fetched per iteration.
	batchSize int
	// consumer receives the fetched batch and returns the events that failed.
	consumer AsyncEventHandlerFunc
	// numConsumers, when > 0, overrides the event consumer's default
	// concurrency (see EventConsumer).
	numConsumers int
}

func (t *AsyncEventConsumer) Handle(ctx *api.Context) error {
tx := ctx.DB().Begin()
if tx.Error != nil {
return fmt.Errorf("error initiating db tx: %w", tx.Error)
}
defer tx.Rollback()

for _, syncConsumer := range syncConsumers {
if err := syncConsumer(ctx, event); err != nil {
return fmt.Errorf("error processing sync consumer: %w", err)
}
ctx = ctx.WithDB(tx)

events, err := fetchEvents(ctx, t.watchEvents, t.batchSize)
if err != nil {
return fmt.Errorf("error fetching events: %w", err)
}

return tx.Commit().Error
}
if len(events) == 0 {
return api.Errorf(api.ENOTFOUND, "No events found")
}

// saveFailedEvents saves failed events back to the `event_queue` table.
func saveFailedEvents(tx *gorm.DB, failedEvents []api.Event) {
failedEvents := t.consumer(ctx, events)
lastAttempt := time.Now()

for i := range failedEvents {
Expand All @@ -311,4 +254,15 @@ func saveFailedEvents(tx *gorm.DB, failedEvents []api.Event) {
logger.Errorf("Error inserting into table:event_queue with error:%v. %v", err)
}
}

return tx.Commit().Error
}

// EventConsumer wraps this async consumer's Handle func in an
// eventconsumer.EventConsumer listening on the event-queue update channel.
// A positive numConsumers overrides the default concurrency.
func (t AsyncEventConsumer) EventConsumer(db *gorm.DB, pool *pgxpool.Pool) *eventconsumer.EventConsumer {
	ec := eventconsumer.New(db, pool, eventQueueUpdateChannel, t.Handle)
	if t.numConsumers <= 0 {
		return ec
	}
	return ec.WithNumConsumers(t.numConsumers)
}
Loading

0 comments on commit 03c4c53

Please sign in to comment.