Skip to content

Commit

Permalink
feat: trim down on extraneous calls to ConsumeUntilEmpty in event
Browse files Browse the repository at this point in the history
consumers
  • Loading branch information
adityathebe committed Oct 25, 2024
1 parent 6431e39 commit 0537818
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 8 deletions.
14 changes: 6 additions & 8 deletions events/event_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ const (
DefaultEventLogSize = 20
)

type Handler func(ctx context.Context, e models.Event) error

var SyncHandlers = utils.SyncedMap[string, func(ctx context.Context, e models.Event) error]{}
var SyncHandlers = utils.SyncedMap[string, postq.SyncEventHandlerFunc]{}
var AsyncHandlers = utils.SyncedMap[string, asyncHandlerData]{}

var consumers []*postq.PGConsumer
Expand All @@ -49,7 +47,7 @@ func RegisterAsyncHandler(fn func(ctx context.Context, e models.Events) models.E
}
}

func RegisterSyncHandler(fn Handler, events ...string) {
func RegisterSyncHandler(fn postq.SyncEventHandlerFunc, events ...string) {
for _, event := range events {
SyncHandlers.Append(event, fn)
}
Expand All @@ -74,11 +72,11 @@ func StartConsumers(ctx context.Context) {
notifyRouter := pg.NewNotifyRouter()
go notifyRouter.Run(ctx, eventQueueUpdateChannel)

SyncHandlers.Each(func(event string, handlers []func(ctx context.Context, e models.Event) error) {
SyncHandlers.Each(func(event string, handlers []postq.SyncEventHandlerFunc) {
log.Tracef("Registering %d sync event handlers for %s", len(handlers), event)
consumer := postq.SyncEventConsumer{
WatchEvents: []string{event},
Consumers: postq.SyncHandlers(handlers...),
Consumers: handlers,
ConsumerOption: &postq.ConsumerOption{
ErrorHandler: defaultLoggerErrorHandler,
},
Expand All @@ -87,7 +85,7 @@ func StartConsumers(ctx context.Context) {
if ec, err := consumer.EventConsumer(); err != nil {
log.Fatalf("failed to create event consumer: %s", err)
} else {
pgsyncNotifyChannel := notifyRouter.GetOrCreateChannel(event)
pgsyncNotifyChannel := notifyRouter.GetOrCreateBufferedChannel(0, event)
consumers = append(consumers, ec)
go ec.Listen(ctx, pgsyncNotifyChannel)
}
Expand Down Expand Up @@ -124,7 +122,7 @@ func StartConsumers(ctx context.Context) {
if ec, err := consumer.EventConsumer(); err != nil {
log.Fatalf("failed to create event consumer: %s", err)
} else {
pgasyncNotifyChannel := notifyRouter.GetOrCreateChannel(event)
pgasyncNotifyChannel := notifyRouter.GetOrCreateBufferedChannel(handler.numConsumers, event)
consumers = append(consumers, ec)
go ec.Listen(ctx, pgasyncNotifyChannel)
}
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,8 @@ require (
sigs.k8s.io/yaml v1.4.0
)

// replace github.com/flanksource/commons => /Users/moshe/go/src/github.com/flanksource/commons

// replace github.com/flanksource/duty => ../duty

// replace github.com/flanksource/gomplate/v3 => ../gomplate
Expand Down

0 comments on commit 0537818

Please sign in to comment.