From 2116d63abfbb8462ad2caa255821ae21e8d7101c Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Fri, 25 Oct 2024 19:15:57 +0545 Subject: [PATCH] feat: trim down on extraneous calls to ConsumeUntilEmpty in event consumers --- events/event_queue.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/events/event_queue.go b/events/event_queue.go index e93d080f9..eb76b4846 100644 --- a/events/event_queue.go +++ b/events/event_queue.go @@ -27,9 +27,7 @@ const ( DefaultEventLogSize = 20 ) -type Handler func(ctx context.Context, e models.Event) error - -var SyncHandlers = utils.SyncedMap[string, func(ctx context.Context, e models.Event) error]{} +var SyncHandlers = utils.SyncedMap[string, postq.SyncEventHandlerFunc]{} var AsyncHandlers = utils.SyncedMap[string, asyncHandlerData]{} var consumers []*postq.PGConsumer @@ -49,7 +47,7 @@ func RegisterAsyncHandler(fn func(ctx context.Context, e models.Events) models.E } } -func RegisterSyncHandler(fn Handler, events ...string) { +func RegisterSyncHandler(fn postq.SyncEventHandlerFunc, events ...string) { for _, event := range events { SyncHandlers.Append(event, fn) } @@ -74,11 +72,11 @@ func StartConsumers(ctx context.Context) { notifyRouter := pg.NewNotifyRouter() go notifyRouter.Run(ctx, eventQueueUpdateChannel) - SyncHandlers.Each(func(event string, handlers []func(ctx context.Context, e models.Event) error) { + SyncHandlers.Each(func(event string, handlers []postq.SyncEventHandlerFunc) { log.Tracef("Registering %d sync event handlers for %s", len(handlers), event) consumer := postq.SyncEventConsumer{ WatchEvents: []string{event}, - Consumers: postq.SyncHandlers(handlers...), + Consumers: handlers, ConsumerOption: &postq.ConsumerOption{ ErrorHandler: defaultLoggerErrorHandler, }, @@ -87,7 +85,7 @@ func StartConsumers(ctx context.Context) { if ec, err := consumer.EventConsumer(); err != nil { log.Fatalf("failed to create event consumer: %s", err) } else { - pgsyncNotifyChannel := notifyRouter.GetOrCreateChannel(event) + pgsyncNotifyChannel := notifyRouter.GetOrCreateBufferedChannel(0, event) consumers = append(consumers, ec) go ec.Listen(ctx, pgsyncNotifyChannel) } @@ -124,7 +122,7 @@ func StartConsumers(ctx context.Context) { if ec, err := consumer.EventConsumer(); err != nil { log.Fatalf("failed to create event consumer: %s", err) } else { - pgasyncNotifyChannel := notifyRouter.GetOrCreateChannel(event) + pgasyncNotifyChannel := notifyRouter.GetOrCreateBufferedChannel(handler.numConsumers, event) consumers = append(consumers, ec) go ec.Listen(ctx, pgasyncNotifyChannel) }