Skip to content

Commit

Permalink
refactor: move event consumer to a separate package to prevent import
Browse files Browse the repository at this point in the history
cycle
  • Loading branch information
adityathebe committed Aug 30, 2023
1 parent ae54f01 commit efcd952
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 16 deletions.
5 changes: 3 additions & 2 deletions events/event_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/flanksource/commons/logger"
"github.com/flanksource/duty/upstream"
"github.com/flanksource/incident-commander/api"
"github.com/flanksource/incident-commander/events/eventconsumer"
"github.com/jackc/pgx/v5/pgxpool"
"gorm.io/gorm"
)
Expand Down Expand Up @@ -91,7 +92,7 @@ func StartConsumers(gormDB *gorm.DB, pgpool *pgxpool.Pool, config Config) {
}
}

allConsumers := []*EventConsumer{
allConsumers := []*eventconsumer.EventConsumer{
NewTeamConsumer(gormDB, pgpool),
NewNotificationConsumer(gormDB, pgpool),
NewNotificationSendConsumer(gormDB, pgpool),
Expand All @@ -108,7 +109,7 @@ func StartConsumers(gormDB *gorm.DB, pgpool *pgxpool.Pool, config Config) {

// newEventQueueConsumerFunc returns a new event consumer for the `event_queue` table
// based on the given watch events and process batch function.
func newEventQueueConsumerFunc(watchEvents []string, processBatchFunc ProcessBatchFunc) EventConsumerFunc {
func newEventQueueConsumerFunc(watchEvents []string, processBatchFunc ProcessBatchFunc) eventconsumer.EventConsumerFunc {
return func(ctx *api.Context, batchSize int) error {
tx := ctx.DB().Begin()
if tx.Error != nil {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package events
package eventconsumer

import (
"fmt"
Expand Down Expand Up @@ -44,7 +44,8 @@ type EventConsumer struct {
consumerFunc EventConsumerFunc
}

func NewEventConsumer(DB *gorm.DB, PGPool *pgxpool.Pool, PgNotifyChannel string, ConsumerFunc EventConsumerFunc) *EventConsumer {
// New returns a new EventConsumer
func New(DB *gorm.DB, PGPool *pgxpool.Pool, PgNotifyChannel string, ConsumerFunc EventConsumerFunc) *EventConsumer {
return &EventConsumer{
batchSize: 1,
numConsumers: 1,
Expand Down
9 changes: 5 additions & 4 deletions events/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,20 @@ import (
"github.com/flanksource/commons/template"
"github.com/flanksource/duty/models"
"github.com/flanksource/incident-commander/api"
"github.com/flanksource/incident-commander/events/eventconsumer"
pkgNotification "github.com/flanksource/incident-commander/notification"
"github.com/flanksource/incident-commander/teams"
"github.com/google/uuid"
"github.com/jackc/pgx/v5/pgxpool"
"gorm.io/gorm"
)

func NewNotificationConsumer(db *gorm.DB, pool *pgxpool.Pool) *EventConsumer {
return NewEventConsumer(db, pool, "event_queue_updates", newEventQueueConsumerFunc(consumerWatchEvents["notification"], processNotificationEvents))
func NewNotificationConsumer(db *gorm.DB, pool *pgxpool.Pool) *eventconsumer.EventConsumer {
return eventconsumer.New(db, pool, "event_queue_updates", newEventQueueConsumerFunc(consumerWatchEvents["notification"], processNotificationEvents))
}

func NewNotificationSendConsumer(db *gorm.DB, pool *pgxpool.Pool) *EventConsumer {
return NewEventConsumer(db, pool, "event_queue_updates", newEventQueueConsumerFunc(consumerWatchEvents["notification_send"], processNotificationEvents)).
func NewNotificationSendConsumer(db *gorm.DB, pool *pgxpool.Pool) *eventconsumer.EventConsumer {
return eventconsumer.New(db, pool, "event_queue_updates", newEventQueueConsumerFunc(consumerWatchEvents["notification_send"], processNotificationEvents)).
WithNumConsumers(5)
}

Expand Down
5 changes: 3 additions & 2 deletions events/responder.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ import (

"github.com/flanksource/commons/logger"
"github.com/flanksource/incident-commander/api"
"github.com/flanksource/incident-commander/events/eventconsumer"
pkgResponder "github.com/flanksource/incident-commander/responder"
"github.com/jackc/pgx/v5/pgxpool"
)

func NewResponderConsumer(db *gorm.DB, pool *pgxpool.Pool) *EventConsumer {
return NewEventConsumer(db, pool, "event_queue_updates", newEventQueueConsumerFunc(consumerWatchEvents["responder"], processResponderEvents))
func NewResponderConsumer(db *gorm.DB, pool *pgxpool.Pool) *eventconsumer.EventConsumer {
return eventconsumer.New(db, pool, "event_queue_updates", newEventQueueConsumerFunc(consumerWatchEvents["responder"], processResponderEvents))
}

func processResponderEvents(ctx *api.Context, events []api.Event) []api.Event {
Expand Down
5 changes: 3 additions & 2 deletions events/teams.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ import (

"github.com/flanksource/commons/logger"
"github.com/flanksource/incident-commander/api"
"github.com/flanksource/incident-commander/events/eventconsumer"
"github.com/flanksource/incident-commander/responder"
"github.com/flanksource/incident-commander/teams"
"github.com/google/uuid"
"github.com/jackc/pgx/v5/pgxpool"
"gorm.io/gorm"
)

func NewTeamConsumer(db *gorm.DB, pool *pgxpool.Pool) *EventConsumer {
return NewEventConsumer(db, pool, "event_queue_updates", newEventQueueConsumerFunc(consumerWatchEvents["team"], processTeamEvents))
func NewTeamConsumer(db *gorm.DB, pool *pgxpool.Pool) *eventconsumer.EventConsumer {
return eventconsumer.New(db, pool, "event_queue_updates", newEventQueueConsumerFunc(consumerWatchEvents["team"], processTeamEvents))
}

func processTeamEvents(ctx *api.Context, events []api.Event) []api.Event {
Expand Down
5 changes: 3 additions & 2 deletions events/upstream_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,19 @@ import (
"github.com/flanksource/commons/logger"
"github.com/flanksource/duty/upstream"
"github.com/flanksource/incident-commander/api"
"github.com/flanksource/incident-commander/events/eventconsumer"
"github.com/jackc/pgx/v5/pgxpool"
"gorm.io/gorm"
)

var upstreamPushEventHandler *pushToUpstreamEventHandler

func NewUpstreamPushConsumer(db *gorm.DB, pool *pgxpool.Pool, config Config) *EventConsumer {
func NewUpstreamPushConsumer(db *gorm.DB, pool *pgxpool.Pool, config Config) *eventconsumer.EventConsumer {
if config.UpstreamPush.Valid() {
upstreamPushEventHandler = newPushToUpstreamEventHandler(config.UpstreamPush)
}

return NewEventConsumer(db, pool, "event_queue_updates", newEventQueueConsumerFunc(consumerWatchEvents["push_queue"], handleUpstreamPushEvents)).
return eventconsumer.New(db, pool, "event_queue_updates", newEventQueueConsumerFunc(consumerWatchEvents["push_queue"], handleUpstreamPushEvents)).
WithBatchSize(50).
WithNumConsumers(5)
}
Expand Down
4 changes: 2 additions & 2 deletions playbook/run_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import (

"github.com/flanksource/duty/models"
"github.com/flanksource/incident-commander/api"
"github.com/flanksource/incident-commander/events"
"github.com/flanksource/incident-commander/events/eventconsumer"
"github.com/jackc/pgx/v5/pgxpool"
"gorm.io/gorm"
)

func StartPlaybookRunConsumer(db *gorm.DB, pool *pgxpool.Pool) {
events.NewEventConsumer(db, pool, "playbook_run_updates", EventConsumer).
eventconsumer.New(db, pool, "playbook_run_updates", EventConsumer).
WithNumConsumers(5).
Listen()
}
Expand Down

0 comments on commit efcd952

Please sign in to comment.