diff --git a/events/event_queue.go b/events/event_queue.go index e93d080f9..eb76b4846 100644 --- a/events/event_queue.go +++ b/events/event_queue.go @@ -27,9 +27,7 @@ const ( DefaultEventLogSize = 20 ) -type Handler func(ctx context.Context, e models.Event) error - -var SyncHandlers = utils.SyncedMap[string, func(ctx context.Context, e models.Event) error]{} +var SyncHandlers = utils.SyncedMap[string, postq.SyncEventHandlerFunc]{} var AsyncHandlers = utils.SyncedMap[string, asyncHandlerData]{} var consumers []*postq.PGConsumer @@ -49,7 +47,7 @@ func RegisterAsyncHandler(fn func(ctx context.Context, e models.Events) models.E } } -func RegisterSyncHandler(fn Handler, events ...string) { +func RegisterSyncHandler(fn postq.SyncEventHandlerFunc, events ...string) { for _, event := range events { SyncHandlers.Append(event, fn) } @@ -74,11 +72,11 @@ func StartConsumers(ctx context.Context) { notifyRouter := pg.NewNotifyRouter() go notifyRouter.Run(ctx, eventQueueUpdateChannel) - SyncHandlers.Each(func(event string, handlers []func(ctx context.Context, e models.Event) error) { + SyncHandlers.Each(func(event string, handlers []postq.SyncEventHandlerFunc) { log.Tracef("Registering %d sync event handlers for %s", len(handlers), event) consumer := postq.SyncEventConsumer{ WatchEvents: []string{event}, - Consumers: postq.SyncHandlers(handlers...), + Consumers: handlers, ConsumerOption: &postq.ConsumerOption{ ErrorHandler: defaultLoggerErrorHandler, }, @@ -87,7 +85,7 @@ func StartConsumers(ctx context.Context) { if ec, err := consumer.EventConsumer(); err != nil { log.Fatalf("failed to create event consumer: %s", err) } else { - pgsyncNotifyChannel := notifyRouter.GetOrCreateChannel(event) + pgsyncNotifyChannel := notifyRouter.GetOrCreateBufferedChannel(0, event) consumers = append(consumers, ec) go ec.Listen(ctx, pgsyncNotifyChannel) } @@ -124,7 +122,7 @@ func StartConsumers(ctx context.Context) { if ec, err := consumer.EventConsumer(); err != nil { log.Fatalf("failed to create event consumer: %s", err) } else { - pgasyncNotifyChannel := notifyRouter.GetOrCreateChannel(event) + pgasyncNotifyChannel := notifyRouter.GetOrCreateBufferedChannel(handler.numConsumers, event) consumers = append(consumers, ec) go ec.Listen(ctx, pgasyncNotifyChannel) } diff --git a/go.mod b/go.mod index e89430b3a..59a7b4b02 100644 --- a/go.mod +++ b/go.mod @@ -359,6 +359,8 @@ require ( sigs.k8s.io/yaml v1.4.0 ) +// replace github.com/flanksource/commons => /Users/moshe/go/src/github.com/flanksource/commons + // replace github.com/flanksource/duty => ../duty // replace github.com/flanksource/gomplate/v3 => ../gomplate