From 36fc36ed15fb1f552d5f723c3d5ce2ef061443a8 Mon Sep 17 00:00:00 2001 From: Surya V Date: Fri, 29 Sep 2023 10:25:35 +0530 Subject: [PATCH 1/5] Added capability to return true/false if an event has been dispatched by the dispatch method in the central evening logic. This helps to extend the concrete implementations to log details like sqs message attributes only when dispatched for correlation. Signed-off-by: Surya V --- eventsources/common/webhook/webhook.go | 6 +++--- eventsources/eventing.go | 20 +++++++++---------- eventsources/sources/amqp/start.go | 6 +++--- eventsources/sources/awssns/start.go | 2 +- eventsources/sources/awssqs/start.go | 11 +++++++--- eventsources/sources/azureeventshub/start.go | 4 ++-- .../sources/azurequeuestorage/start.go | 6 +++--- eventsources/sources/azureservicebus/start.go | 6 +++--- eventsources/sources/bitbucket/start.go | 2 +- eventsources/sources/bitbucketserver/start.go | 2 +- eventsources/sources/calendar/start.go | 4 ++-- eventsources/sources/emitter/start.go | 4 ++-- eventsources/sources/file/start.go | 10 +++++----- eventsources/sources/gcppubsub/start.go | 4 ++-- eventsources/sources/generic/start.go | 6 +++--- eventsources/sources/github/start.go | 2 +- eventsources/sources/gitlab/start.go | 2 +- eventsources/sources/hdfs/start.go | 6 +++--- eventsources/sources/kafka/start.go | 12 +++++------ eventsources/sources/minio/start.go | 6 +++--- eventsources/sources/mqtt/start.go | 4 ++-- eventsources/sources/nats/start.go | 4 ++-- eventsources/sources/nsq/start.go | 6 +++--- eventsources/sources/pulsar/start.go | 6 +++--- eventsources/sources/redis/start.go | 6 +++--- eventsources/sources/redis_stream/start.go | 6 +++--- eventsources/sources/resource/start.go | 4 ++-- eventsources/sources/sftp/start.go | 6 +++--- eventsources/sources/slack/start.go | 2 +- eventsources/sources/storagegrid/start.go | 2 +- eventsources/sources/stripe/start.go | 2 +- eventsources/sources/webhook/start.go | 2 +- 32 files changed, 88 insertions(+), 83 deletions(-) diff --git a/eventsources/common/webhook/webhook.go b/eventsources/common/webhook/webhook.go index 9dc7c1c87a..b1ea41782d 100644 --- a/eventsources/common/webhook/webhook.go +++ b/eventsources/common/webhook/webhook.go @@ -178,14 +178,14 @@ func activateRoute(router Router, controller *Controller) { } // manageRouteChannels consumes data from route's data channel and stops the processing when the event source is stopped/removed -func manageRouteChannels(router Router, dispatch func([]byte, ...eventsourcecommon.Option) error) { +func manageRouteChannels(router Router, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) { route := router.GetRoute() logger := route.Logger for { select { case data := <-route.DataCh: logger.Info("new event received, dispatching it...") - if err := dispatch(data); err != nil { + if _, err := dispatch(data); err != nil { logger.Errorw("failed to send event", zap.Error(err)) route.Metrics.EventProcessingFailed(route.EventSourceName, route.EventName) continue @@ -199,7 +199,7 @@ func manageRouteChannels(router Router, dispatch func([]byte, ...eventsourcecomm } // ManagerRoute manages the lifecycle of a route -func ManageRoute(ctx context.Context, router Router, controller *Controller, dispatch func([]byte, ...eventsourcecommon.Option) error) error { +func ManageRoute(ctx context.Context, router Router, controller *Controller, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { route := router.GetRoute() logger := route.Logger diff --git a/eventsources/eventing.go b/eventsources/eventing.go index 06a8f5ce1b..7864c48c81 100644 --- a/eventsources/eventing.go +++ b/eventsources/eventing.go @@ -70,7 +70,7 @@ type EventingServer interface { GetEventSourceType() apicommon.EventSourceType // Function to start listening events. - StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error + StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error } // GetEventingServers returns the mapping of event source type and list of eventing servers @@ -522,16 +522,16 @@ func (e *EventSourceAdaptor) run(ctx context.Context, servers map[apicommon.Even Jitter: &jitter, } if err = common.DoWithRetry(&backoff, func() error { - return s.StartListening(ctx, func(data []byte, opts ...eventsourcecommon.Option) error { + return s.StartListening(ctx, func(data []byte, opts ...eventsourcecommon.Option) (bool, error) { if filter, ok := filters[s.GetEventName()]; ok { proceed, err := filterEvent(data, filter) if err != nil { logger.Errorw("Failed to filter event", zap.Error(err)) - return nil + return false, nil } if !proceed { logger.Info("Filter condition not met, skip dispatching") - return nil + return false, nil } } @@ -544,20 +544,20 @@ func (e *EventSourceAdaptor) run(ctx context.Context, servers map[apicommon.Even for _, opt := range opts { err := opt(&event) if err != nil { - return err + return false, err } } err := event.SetData(cloudevents.ApplicationJSON, data) if err != nil { - return err + return false, err } eventBody, err := json.Marshal(event) if err != nil { - return err + return false, err } if e.eventBusConn == nil || e.eventBusConn.IsClosed() { - return eventbuscommon.NewEventBusError(fmt.Errorf("failed to publish event, eventbus connection closed")) + return false, eventbuscommon.NewEventBusError(fmt.Errorf("failed to publish event, eventbus connection closed")) } msg := eventbuscommon.Message{ @@ -575,12 +575,12 @@ func (e *EventSourceAdaptor) run(ctx context.Context, servers map[apicommon.Even logger.Errorw("Failed to publish an event", zap.Error(err), zap.String(logging.LabelEventName, s.GetEventName()), zap.Any(logging.LabelEventSourceType, s.GetEventSourceType())) e.metrics.EventSentFailed(s.GetEventSourceName(), s.GetEventName()) - return eventbuscommon.NewEventBusError(err) + return false, eventbuscommon.NewEventBusError(err) } logger.Infow("Succeeded to publish an event", zap.String(logging.LabelEventName, s.GetEventName()), zap.Any(logging.LabelEventSourceType, s.GetEventSourceType()), zap.String("eventID", event.ID())) e.metrics.EventSent(s.GetEventSourceName(), s.GetEventName()) - return nil + return true, nil }) }); err != nil { logger.Errorw("Failed to start listening eventsource", zap.Any(logging.LabelEventSourceType, diff --git a/eventsources/sources/amqp/start.go b/eventsources/sources/amqp/start.go index abfc7f87cc..fd74b00210 100644 --- a/eventsources/sources/amqp/start.go +++ b/eventsources/sources/amqp/start.go @@ -61,7 +61,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) @@ -156,7 +156,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt } } -func (el *EventListener) handleOne(amqpEventSource *v1alpha1.AMQPEventSource, msg amqplib.Delivery, dispatch func([]byte, ...eventsourcecommon.Option) error, log *zap.SugaredLogger) error { +func (el *EventListener) handleOne(amqpEventSource *v1alpha1.AMQPEventSource, msg amqplib.Delivery, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error), log *zap.SugaredLogger) error { defer func(start time.Time) { el.Metrics.EventProcessingDuration(el.GetEventSourceName(), el.GetEventName(), float64(time.Since(start)/time.Millisecond)) }(time.Now()) @@ -190,7 +190,7 @@ func (el *EventListener) handleOne(amqpEventSource *v1alpha1.AMQPEventSource, ms } log.Info("dispatching event ...") - if err = dispatch(bodyBytes); err != nil { + if _, err = dispatch(bodyBytes); err != nil { return fmt.Errorf("failed to dispatch AMQP event, %w", err) } return nil diff --git a/eventsources/sources/awssns/start.go b/eventsources/sources/awssns/start.go index bc91749c48..399312c912 100644 --- a/eventsources/sources/awssns/start.go +++ b/eventsources/sources/awssns/start.go @@ -276,7 +276,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening starts an SNS event source -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { logger := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) diff --git a/eventsources/sources/awssqs/start.go b/eventsources/sources/awssqs/start.go index e83c239ec1..3af77d3d70 100644 --- a/eventsources/sources/awssqs/start.go +++ b/eventsources/sources/awssqs/start.go @@ -62,7 +62,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) log.Info("started processing the AWS SQS event source...") @@ -141,7 +141,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt } } -func (el *EventListener) processMessage(message *sqslib.Message, dispatch func([]byte, ...eventsourcecommon.Option) error, ack func(), log *zap.SugaredLogger) { +func (el *EventListener) processMessage(message *sqslib.Message, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error), ack func(), log *zap.SugaredLogger) { defer func(start time.Time) { el.Metrics.EventProcessingDuration(el.GetEventSourceName(), el.GetEventName(), float64(time.Since(start)/time.Millisecond)) }(time.Now()) @@ -167,10 +167,15 @@ func (el *EventListener) processMessage(message *sqslib.Message, dispatch func([ } return } - if err = dispatch(eventBytes); err != nil { + dispatched := false + dispatched, err = dispatch(eventBytes) + if err != nil { log.Errorw("failed to dispatch SQS event", zap.Error(err)) el.Metrics.EventProcessingFailed(el.GetEventSourceName(), el.GetEventName()) } else { + if dispatched == true { + log.Infow("Successfully dispatched SQS Event", zap.Any("triggeredByEvent", data.MessageId), zap.Any("sqsMessageAttributes", data.MessageAttributes)) + } ack() } } diff --git a/eventsources/sources/azureeventshub/start.go b/eventsources/sources/azureeventshub/start.go index ac0e1cc407..7bceaf5565 100644 --- a/eventsources/sources/azureeventshub/start.go +++ b/eventsources/sources/azureeventshub/start.go @@ -59,7 +59,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) log.Info("started processing the Azure Events Hub event source...") @@ -109,7 +109,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt } log.Info("dispatching the event to eventbus...") - if err = dispatch(eventBytes); err != nil { + if _, err = dispatch(eventBytes); err != nil { el.Metrics.EventProcessingFailed(el.GetEventSourceName(), el.GetEventName()) log.Errorw("failed to dispatch Azure EventHub event", zap.Error(err)) return err diff --git a/eventsources/sources/azurequeuestorage/start.go b/eventsources/sources/azurequeuestorage/start.go index 6418360af8..1fe60d237f 100644 --- a/eventsources/sources/azurequeuestorage/start.go +++ b/eventsources/sources/azurequeuestorage/start.go @@ -45,7 +45,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) @@ -118,7 +118,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt } } -func (el *EventListener) processMessage(message *azqueue.DequeuedMessage, dispatch func([]byte, ...eventsourcecommon.Option) error, ack func(), log *zap.SugaredLogger) { +func (el *EventListener) processMessage(message *azqueue.DequeuedMessage, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error), ack func(), log *zap.SugaredLogger) { defer func(start time.Time) { el.Metrics.EventProcessingDuration(el.GetEventSourceName(), el.GetEventName(), float64(time.Since(start)/time.Millisecond)) }(time.Now()) @@ -155,7 +155,7 @@ func (el *EventListener) processMessage(message *azqueue.DequeuedMessage, dispat } return } - if err = dispatch(eventBytes); err != nil { + if _, err = dispatch(eventBytes); err != nil { log.Errorw("failed to dispatch azure queue storage event", zap.Error(err)) el.Metrics.EventProcessingFailed(el.GetEventSourceName(), el.GetEventName()) } else { diff --git a/eventsources/sources/azureservicebus/start.go b/eventsources/sources/azureservicebus/start.go index a7b9de9bef..9f1866f6a3 100644 --- a/eventsources/sources/azureservicebus/start.go +++ b/eventsources/sources/azureservicebus/start.go @@ -50,7 +50,7 @@ const ( ) // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) @@ -155,7 +155,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt } } -func (el *EventListener) handleOne(servicebusEventSource *v1alpha1.AzureServiceBusEventSource, message *servicebus.ReceivedMessage, dispatch func([]byte, ...eventsourcecommon.Option) error, log *zap.SugaredLogger) error { +func (el *EventListener) handleOne(servicebusEventSource *v1alpha1.AzureServiceBusEventSource, message *servicebus.ReceivedMessage, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error), log *zap.SugaredLogger) error { defer func(start time.Time) { el.Metrics.EventProcessingDuration(el.GetEventSourceName(), el.GetEventName(), float64(time.Since(start)/time.Millisecond)) }(time.Now()) @@ -186,7 +186,7 @@ func (el *EventListener) handleOne(servicebusEventSource *v1alpha1.AzureServiceB } log.Info("dispatching the event to eventbus...") - if err = dispatch(eventBytes); err != nil { + if _, err = dispatch(eventBytes); err != nil { log.With("event_source", el.GetEventSourceName(), "event", el.GetEventName(), "message_id", message.MessageID).Errorw("failed to dispatch the event", zap.Error(err)) return err } diff --git a/eventsources/sources/bitbucket/start.go b/eventsources/sources/bitbucket/start.go index 3a5a2babb7..60d068b8dc 100644 --- a/eventsources/sources/bitbucket/start.go +++ b/eventsources/sources/bitbucket/start.go @@ -139,7 +139,7 @@ func (router *Router) PostInactivate() error { } // StartListening starts an event source -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { defer sources.Recover(el.GetEventName()) bitbucketEventSource := &el.BitbucketEventSource diff --git a/eventsources/sources/bitbucketserver/start.go b/eventsources/sources/bitbucketserver/start.go index d5afc6eb3d..377644b4b3 100644 --- a/eventsources/sources/bitbucketserver/start.go +++ b/eventsources/sources/bitbucketserver/start.go @@ -178,7 +178,7 @@ func (router *Router) PostInactivate() error { } // StartListening starts an event source -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { defer sources.Recover(el.GetEventName()) bitbucketserverEventSource := &el.BitbucketServerEventSource diff --git a/eventsources/sources/calendar/start.go b/eventsources/sources/calendar/start.go index 2b4a0db1bb..92fc8aef65 100644 --- a/eventsources/sources/calendar/start.go +++ b/eventsources/sources/calendar/start.go @@ -134,7 +134,7 @@ func (el *EventListener) getExecutionTime() (time.Time, error) { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { el.log = logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) el.log.Info("started processing the calendar event source...") @@ -206,7 +206,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt return fmt.Errorf("failed to marshal the event data for event source %s / %s, %w", el.GetEventSourceName(), el.GetEventName(), err) } el.log.Info("dispatching calendar event...") - err = dispatch(payload) + _, err = dispatch(payload) if err != nil { el.log.Errorw("failed to dispatch calendar event", zap.Error(err)) return fmt.Errorf("failed to dispatch calendar event, %w", err) diff --git a/eventsources/sources/emitter/start.go b/eventsources/sources/emitter/start.go index bc6a434693..9867706c1e 100644 --- a/eventsources/sources/emitter/start.go +++ b/eventsources/sources/emitter/start.go @@ -59,7 +59,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) log.Info("started processing the Emitter event source...") @@ -131,7 +131,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt return } log.Info("dispatching event on data channel...") - if err = dispatch(eventBytes); err != nil { + if _, err = dispatch(eventBytes); err != nil { log.Errorw("failed to dispatch event", zap.Error(err)) el.Metrics.EventProcessingFailed(el.GetEventSourceName(), el.GetEventName()) } diff --git a/eventsources/sources/file/start.go b/eventsources/sources/file/start.go index caa4fa02fe..eaaf16f624 100644 --- a/eventsources/sources/file/start.go +++ b/eventsources/sources/file/start.go @@ -61,7 +61,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) defer sources.Recover(el.GetEventName()) @@ -82,7 +82,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt } // listenEvents listen to file related events. -func (el *EventListener) listenEvents(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error, log *zap.SugaredLogger) error { +func (el *EventListener) listenEvents(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error), log *zap.SugaredLogger) error { fileEventSource := &el.FileEventSource // create new fs watcher @@ -123,7 +123,7 @@ func (el *EventListener) listenEvents(ctx context.Context, dispatch func([]byte, return fmt.Errorf("failed to marshal the event to the fs event, %w", err) } log.Infow("dispatching file event on data channel...", zap.Any("event-type", event.Op.String()), zap.Any("descriptor-name", event.Name)) - if err = dispatch(payload); err != nil { + if _, err = dispatch(payload); err != nil { return fmt.Errorf("failed to dispatch a file event, %w", err) } return nil @@ -162,7 +162,7 @@ func (el *EventListener) listenEvents(ctx context.Context, dispatch func([]byte, } // listenEvents listen to file related events using polling. -func (el *EventListener) listenEventsPolling(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error, log *zap.SugaredLogger) error { +func (el *EventListener) listenEventsPolling(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error), log *zap.SugaredLogger) error { fileEventSource := &el.FileEventSource // create new fs watcher @@ -200,7 +200,7 @@ func (el *EventListener) listenEventsPolling(ctx context.Context, dispatch func( return fmt.Errorf("failed to marshal the event to the fs event, %w", err) } log.Infow("dispatching file event on data channel...", zap.Any("event-type", event.Op.String()), zap.Any("descriptor-name", event.Name)) - if err = dispatch(payload); err != nil { + if _, err = dispatch(payload); err != nil { return fmt.Errorf("failed to dispatch file event, %w", err) } return nil diff --git a/eventsources/sources/gcppubsub/start.go b/eventsources/sources/gcppubsub/start.go index ba452f6f47..5acd00443c 100644 --- a/eventsources/sources/gcppubsub/start.go +++ b/eventsources/sources/gcppubsub/start.go @@ -65,7 +65,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening listens to GCP PubSub events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { // In order to listen events from GCP PubSub, // 1. Parse the event source that contains configuration to connect to GCP PubSub // 2. Create a new PubSub client @@ -128,7 +128,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt } log.Info("dispatching event...") - if err = dispatch(eventBytes); err != nil { + if _, err = dispatch(eventBytes); err != nil { log.Errorw("failed to dispatch GCP PubSub event", zap.Error(err)) el.Metrics.EventProcessingFailed(el.GetEventSourceName(), el.GetEventName()) m.Nack() diff --git a/eventsources/sources/generic/start.go b/eventsources/sources/generic/start.go index beee0b2f6e..2fe5a242b5 100644 --- a/eventsources/sources/generic/start.go +++ b/eventsources/sources/generic/start.go @@ -48,7 +48,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening listens to generic events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { logger := logging.FromContext(ctx). With(zap.String(logging.LabelEventSourceType, string(el.GetEventSourceType())), zap.String(logging.LabelEventName, el.GetEventName()), @@ -94,7 +94,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt } } -func (el *EventListener) handleOne(event *Event, dispatch func([]byte, ...eventsourcecommon.Option) error, logger *zap.SugaredLogger) error { +func (el *EventListener) handleOne(event *Event, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error), logger *zap.SugaredLogger) error { defer func(start time.Time) { el.Metrics.EventProcessingDuration(el.GetEventSourceName(), el.GetEventName(), float64(time.Since(start)/time.Millisecond)) }(time.Now()) @@ -113,7 +113,7 @@ func (el *EventListener) handleOne(event *Event, dispatch func([]byte, ...events return fmt.Errorf("failed to marshal the event data, %w", err) } logger.Info("dispatching event...") - if err := dispatch(eventBytes); err != nil { + if _, err := dispatch(eventBytes); err != nil { return fmt.Errorf("failed to dispatch a Generic event, %w", err) } return nil diff --git a/eventsources/sources/github/start.go b/eventsources/sources/github/start.go index 2ac106d96f..21d548d1f1 100644 --- a/eventsources/sources/github/start.go +++ b/eventsources/sources/github/start.go @@ -217,7 +217,7 @@ func (router *Router) PostInactivate() error { } // StartListening starts an event source -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { logger := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) logger.Info("started processing the Github event source...") diff --git a/eventsources/sources/gitlab/start.go b/eventsources/sources/gitlab/start.go index 5c1fac493f..3d737e8548 100644 --- a/eventsources/sources/gitlab/start.go +++ b/eventsources/sources/gitlab/start.go @@ -156,7 +156,7 @@ func (router *Router) PostInactivate() error { } // StartListening starts an event source -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { logger := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) logger.Info("started processing the Gitlab event source...") diff --git a/eventsources/sources/hdfs/start.go b/eventsources/sources/hdfs/start.go index 0774f7c766..d844280b0e 100644 --- a/eventsources/sources/hdfs/start.go +++ b/eventsources/sources/hdfs/start.go @@ -65,7 +65,7 @@ func (w *WatchableHDFS) GetFileID(fi os.FileInfo) interface{} { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) log.Info("started processing the Emitter event source...") @@ -157,7 +157,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt } } -func (el *EventListener) handleOne(event fsevent.Event, dispatch func([]byte, ...eventsourcecommon.Option) error, log *zap.SugaredLogger) error { +func (el *EventListener) handleOne(event fsevent.Event, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error), log *zap.SugaredLogger) error { defer func(start time.Time) { el.Metrics.EventProcessingDuration(el.GetEventSourceName(), el.GetEventName(), float64(time.Since(start)/time.Millisecond)) }(time.Now()) @@ -174,7 +174,7 @@ func (el *EventListener) handleOne(event fsevent.Event, dispatch func([]byte, .. } logger.Info("dispatching event on data channel...") - if err = dispatch(payload); err != nil { + if _, err = dispatch(payload); err != nil { return fmt.Errorf("failed to dispatch an HDFS event, %w", err) } return nil diff --git a/eventsources/sources/kafka/start.go b/eventsources/sources/kafka/start.go index 4e73de0173..9f7988b86e 100644 --- a/eventsources/sources/kafka/start.go +++ b/eventsources/sources/kafka/start.go @@ -73,7 +73,7 @@ func verifyPartitionAvailable(part int32, partitions []int32) bool { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) defer sources.Recover(el.GetEventName()) @@ -88,7 +88,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt } } -func (el *EventListener) consumerGroupConsumer(ctx context.Context, log *zap.SugaredLogger, kafkaEventSource *v1alpha1.KafkaEventSource, dispatch func([]byte, ...eventsourcecommon.Option) error) error { +func (el *EventListener) consumerGroupConsumer(ctx context.Context, log *zap.SugaredLogger, kafkaEventSource *v1alpha1.KafkaEventSource, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { config, err := getSaramaConfig(kafkaEventSource, log) if err != nil { return err @@ -158,7 +158,7 @@ func (el *EventListener) consumerGroupConsumer(ctx context.Context, log *zap.Sug return nil } -func (el *EventListener) partitionConsumer(ctx context.Context, log *zap.SugaredLogger, kafkaEventSource *v1alpha1.KafkaEventSource, dispatch func([]byte, ...eventsourcecommon.Option) error) error { +func (el *EventListener) partitionConsumer(ctx context.Context, log *zap.SugaredLogger, kafkaEventSource *v1alpha1.KafkaEventSource, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { defer sources.Recover(el.GetEventName()) log.Info("start kafka event source...") @@ -244,7 +244,7 @@ func (el *EventListener) partitionConsumer(ctx context.Context, log *zap.Sugared kafkaID := genUniqueID(el.GetEventSourceName(), el.GetEventName(), kafkaEventSource.URL, msg.Topic, msg.Partition, msg.Offset) - if err = dispatch(eventBody, eventsourcecommon.WithID(kafkaID)); err != nil { + if _, err = dispatch(eventBody, eventsourcecommon.WithID(kafkaID)); err != nil { return fmt.Errorf("failed to dispatch a Kafka event, %w", err) } return nil @@ -334,7 +334,7 @@ func getSaramaConfig(kafkaEventSource *v1alpha1.KafkaEventSource, log *zap.Sugar // Consumer represents a Sarama consumer group consumer type Consumer struct { ready chan bool - dispatch func([]byte, ...eventsourcecommon.Option) error + dispatch func([]byte, ...eventsourcecommon.Option) (bool, error) logger *zap.SugaredLogger kafkaEventSource *v1alpha1.KafkaEventSource eventSourceName string @@ -418,7 +418,7 @@ func (consumer *Consumer) processOne(session sarama.ConsumerGroupSession, messag messageID := genUniqueID(consumer.eventSourceName, consumer.eventName, consumer.kafkaEventSource.URL, message.Topic, message.Partition, message.Offset) - if err = consumer.dispatch(eventBody, eventsourcecommon.WithID(messageID)); err != nil { + if _, err = consumer.dispatch(eventBody, eventsourcecommon.WithID(messageID)); err != nil { return fmt.Errorf("failed to dispatch a kafka event, %w", err) } session.MarkMessage(message, "") diff --git a/eventsources/sources/minio/start.go b/eventsources/sources/minio/start.go index 78b93d6b63..813c28b35f 100644 --- a/eventsources/sources/minio/start.go +++ b/eventsources/sources/minio/start.go @@ -60,7 +60,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName(), zap.String("bucketName", el.MinioEventSource.Bucket.Name)) @@ -106,7 +106,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt return nil } -func (el *EventListener) handleOne(notification notification.Info, dispatch func([]byte, ...eventsourcecommon.Option) error, log *zap.SugaredLogger) error { +func (el *EventListener) handleOne(notification notification.Info, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error), log *zap.SugaredLogger) error { defer func(start time.Time) { el.Metrics.EventProcessingDuration(el.GetEventSourceName(), el.GetEventName(), float64(time.Since(start)/time.Millisecond)) }(time.Now()) @@ -118,7 +118,7 @@ func (el *EventListener) handleOne(notification notification.Info, dispatch func } log.Info("dispatching the event on data channel...") - if err = dispatch(eventBytes); err != nil { + if _, err = dispatch(eventBytes); err != nil { return fmt.Errorf("failed to dispatch minio event, %w", err) } return nil diff --git a/eventsources/sources/mqtt/start.go b/eventsources/sources/mqtt/start.go index d6e2e3681d..9452ba0a3e 100644 --- a/eventsources/sources/mqtt/start.go +++ b/eventsources/sources/mqtt/start.go @@ -59,7 +59,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) defer sources.Recover(el.GetEventName()) @@ -97,7 +97,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt return } log.Info("dispatching event on the data channel...") - if err = dispatch(eventBody); err != nil { + if _, err = dispatch(eventBody); err != nil { log.Errorw("failed to dispatch MQTT event...", zap.Error(err)) el.Metrics.EventProcessingFailed(el.GetEventSourceName(), el.GetEventName()) } diff --git a/eventsources/sources/nats/start.go b/eventsources/sources/nats/start.go index 5e6286fdd1..596965969b 100644 --- a/eventsources/sources/nats/start.go +++ b/eventsources/sources/nats/start.go @@ -59,7 +59,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) defer sources.Recover(el.GetEventName()) @@ -153,7 +153,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt return } log.Info("dispatching the event on data channel...") - if err = dispatch(eventBody); err != nil { + if _, err = dispatch(eventBody); err != nil { log.Errorw("failed to dispatch a NATS event", zap.Error(err)) el.Metrics.EventProcessingFailed(el.GetEventSourceName(), el.GetEventName()) } diff --git a/eventsources/sources/nsq/start.go b/eventsources/sources/nsq/start.go index 7eabc85d38..4b69792751 100644 --- a/eventsources/sources/nsq/start.go +++ b/eventsources/sources/nsq/start.go @@ -63,14 +63,14 @@ type messageHandler struct { eventSourceName string eventName string metrics *metrics.Metrics - dispatch func([]byte, ...eventsourcecommon.Option) error + dispatch func([]byte, ...eventsourcecommon.Option) (bool, error) logger *zap.SugaredLogger isJSON bool metadata map[string]string } // StartListening listens NSQ events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) log.Info("started processing the NSQ event source...") @@ -144,7 +144,7 @@ func (h *messageHandler) HandleMessage(m *nsq.Message) error { } h.logger.Info("dispatching the event on the data channel...") - if err = h.dispatch(eventBody); err != nil { + if _, err = h.dispatch(eventBody); err != nil { h.metrics.EventProcessingFailed(h.eventSourceName, h.eventName) return err } diff --git a/eventsources/sources/pulsar/start.go b/eventsources/sources/pulsar/start.go index 18a2fcc94b..d3605f0cd2 100644 --- a/eventsources/sources/pulsar/start.go +++ b/eventsources/sources/pulsar/start.go @@ -58,7 +58,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening listens Pulsar events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) log.Info("started processing the Pulsar event source...") @@ -185,7 +185,7 @@ consumeMessages: return nil } -func (el *EventListener) handleOne(msg pulsar.Message, dispatch func([]byte, ...eventsourcecommon.Option) error, log *zap.SugaredLogger) error { +func (el *EventListener) handleOne(msg pulsar.Message, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error), log *zap.SugaredLogger) error { defer func(start time.Time) { el.Metrics.EventProcessingDuration(el.GetEventSourceName(), el.GetEventName(), float64(time.Since(start)/time.Millisecond)) }(time.Now()) @@ -208,7 +208,7 @@ func (el *EventListener) handleOne(msg pulsar.Message, dispatch func([]byte, ... } log.Infof("dispatching the message received on the topic %s to eventbus", msg.Topic()) - if err = dispatch(eventBody); err != nil { + if _, err = dispatch(eventBody); err != nil { return fmt.Errorf("failed to dispatch a Pulsar event, %w", err) } return nil diff --git a/eventsources/sources/redis/start.go b/eventsources/sources/redis/start.go index 6f0ddbbca5..600caf872a 100644 --- a/eventsources/sources/redis/start.go +++ b/eventsources/sources/redis/start.go @@ -59,7 +59,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening listens events published by redis -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) log.Info("started processing the Redis event source...") @@ -130,7 +130,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt } } -func (el *EventListener) handleOne(message *redis.Message, dispatch func([]byte, ...eventsourcecommon.Option) error, log *zap.SugaredLogger) error { +func (el *EventListener) handleOne(message *redis.Message, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error), log *zap.SugaredLogger) error { defer func(start time.Time) { el.Metrics.EventProcessingDuration(el.GetEventSourceName(), el.GetEventName(), float64(time.Since(start)/time.Millisecond)) }(time.Now()) @@ -152,7 +152,7 @@ func (el *EventListener) handleOne(message *redis.Message, dispatch func([]byte, return fmt.Errorf("failed to marshal the event data, rejecting the event, %w", err) } log.With("channel", message.Channel).Info("dispatching the event on the data channel...") - if err = dispatch(eventBody); err != nil { + if _, err = dispatch(eventBody); err != nil { return fmt.Errorf("failed dispatch a Redis event, %w", err) } return nil diff --git a/eventsources/sources/redis_stream/start.go b/eventsources/sources/redis_stream/start.go index df35f3437b..a56d7609aa 100644 --- a/eventsources/sources/redis_stream/start.go +++ b/eventsources/sources/redis_stream/start.go @@ -59,7 +59,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening listens for new data on specified redis streams -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) log.Info("started processing the Redis stream event source...") @@ -194,7 +194,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt } } -func (el *EventListener) handleOne(stream string, message redis.XMessage, dispatch func([]byte, ...eventsourcecommon.Option) error, log *zap.SugaredLogger) error { +func (el *EventListener) handleOne(stream string, message redis.XMessage, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error), log *zap.SugaredLogger) error { defer func(start time.Time) { el.Metrics.EventProcessingDuration(el.GetEventSourceName(), el.GetEventName(), float64(time.Since(start)/time.Millisecond)) }(time.Now()) @@ -211,7 +211,7 @@ func (el *EventListener) handleOne(stream string, message redis.XMessage, dispat return fmt.Errorf("failed to marshal the event data, rejecting the event, %w", err) } log.With("stream", stream).Info("dispatching the event on the data channel...") - if err = dispatch(eventBody); err != nil { + if _, err = dispatch(eventBody); err != nil { return fmt.Errorf("failed dispatch a Redis stream event, %w", err) } return nil diff --git a/eventsources/sources/resource/start.go b/eventsources/sources/resource/start.go index 90a8cb9fef..9c532f7dde 100644 --- a/eventsources/sources/resource/start.go +++ b/eventsources/sources/resource/start.go @@ -77,7 +77,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening watches resource updates and consume those events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) defer sources.Recover(el.GetEventName()) @@ -164,7 +164,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt return fmt.Errorf("failed to marshal the event. rejecting the event, %w", err) } - if err = dispatch(eventBody); err != nil { + if _, err = dispatch(eventBody); err != nil { return fmt.Errorf("failed to dispatch a resource event, %w", err) } return nil diff --git a/eventsources/sources/sftp/start.go b/eventsources/sources/sftp/start.go index e0ac2aa642..c2dfdf5401 100644 --- a/eventsources/sources/sftp/start.go +++ b/eventsources/sources/sftp/start.go @@ -64,7 +64,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) defer sources.Recover(el.GetEventName()) @@ -112,7 +112,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt } // listenEvents listen to sftp related events. -func (el *EventListener) listenEvents(ctx context.Context, sftpClient *sftp.Client, dispatch func([]byte, ...eventsourcecommon.Option) error, log *zap.SugaredLogger) error { +func (el *EventListener) listenEvents(ctx context.Context, sftpClient *sftp.Client, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error), log *zap.SugaredLogger) error { sftpEventSource := &el.SFTPEventSource log.Info("identifying new files in sftp...") @@ -146,7 +146,7 @@ func (el *EventListener) listenEvents(ctx context.Context, sftpClient *sftp.Clie return fmt.Errorf("failed to marshal the event to the fs event, %w", err) } log.Infow("dispatching sftp event on data channel...", zap.Any("event-type", event.Op.String()), zap.Any("descriptor-name", event.Name)) - if err = dispatch(payload); err != nil { + if _, err = dispatch(payload); err != nil { return fmt.Errorf("failed to dispatch an sftp event, %w", err) } return nil diff --git a/eventsources/sources/slack/start.go b/eventsources/sources/slack/start.go index c5393c342c..0515f97460 100644 --- a/eventsources/sources/slack/start.go +++ b/eventsources/sources/slack/start.go @@ -312,7 +312,7 @@ func (rc *Router) verifyRequest(request *http.Request) error { } // StartListening starts an event source -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) diff --git a/eventsources/sources/storagegrid/start.go b/eventsources/sources/storagegrid/start.go index db6aa02c24..bd5008b1a4 100644 --- a/eventsources/sources/storagegrid/start.go +++ b/eventsources/sources/storagegrid/start.go @@ -337,7 +337,7 @@ func (router *Router) PostInactivate() error { } // StartListening starts an event source -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) log.Info("started processing the Storage Grid event source...") diff --git a/eventsources/sources/stripe/start.go b/eventsources/sources/stripe/start.go index 92648e969a..cff091b8da 100644 --- a/eventsources/sources/stripe/start.go +++ b/eventsources/sources/stripe/start.go @@ -190,7 +190,7 @@ func filterEvent(event *stripe.Event, filters []string) bool { } // StartListening starts an event source -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) log.Info("started processing the Stripe event source...") diff --git a/eventsources/sources/webhook/start.go b/eventsources/sources/webhook/start.go index 2be2ad90dc..001eead07b 100644 --- a/eventsources/sources/webhook/start.go +++ b/eventsources/sources/webhook/start.go @@ -151,7 +151,7 @@ func (router *Router) PostInactivate() error { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) log.Info("started processing the webhook event source...") From 38b443689a3e26f73a4fe621f050807e1fa03247 Mon Sep 17 00:00:00 2001 From: Surya V Date: Fri, 29 Sep 2023 11:36:51 +0530 Subject: [PATCH 2/5] Fixed the link (if logic) Signed-off-by: Surya V --- eventsources/sources/awssqs/start.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eventsources/sources/awssqs/start.go b/eventsources/sources/awssqs/start.go index 3af77d3d70..77b5b2391f 100644 --- a/eventsources/sources/awssqs/start.go +++ b/eventsources/sources/awssqs/start.go @@ -173,7 +173,7 @@ func (el *EventListener) processMessage(message *sqslib.Message, dispatch func([ log.Errorw("failed to dispatch SQS event", zap.Error(err)) el.Metrics.EventProcessingFailed(el.GetEventSourceName(), el.GetEventName()) } else { - if dispatched == true { + if dispatched { log.Infow("Successfully dispatched SQS Event", zap.Any("triggeredByEvent", data.MessageId), zap.Any("sqsMessageAttributes", data.MessageAttributes)) } ack() From b52c74023d276314a1a77f7c66a87b684495ec0c Mon Sep 17 00:00:00 2001 From: Surya V Date: Fri, 29 Sep 2023 11:49:02 +0530 Subject: [PATCH 3/5] Updated unit test case Signed-off-by: Surya V --- eventsources/sources/redis/start_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/eventsources/sources/redis/start_test.go b/eventsources/sources/redis/start_test.go index 9c3dcaa399..62072615c8 100644 --- a/eventsources/sources/redis/start_test.go +++ b/eventsources/sources/redis/start_test.go @@ -28,8 +28,8 @@ func Test_HandleOne(t *testing.T) { Payload: `{"a": "b"}`, } - getDispatcher := func(isJson bool) func(d []byte, opts ...eventsourcecommon.Option) error { - return func(d []byte, opts ...eventsourcecommon.Option) error { + getDispatcher := func(isJson bool) func(d []byte, opts ...eventsourcecommon.Option) (bool, error) { + return func(d []byte, opts ...eventsourcecommon.Option) (bool, error) { eventData := &events.RedisEventData{} err := json.Unmarshal(d, eventData) assert.NoError(t, err) @@ -44,7 +44,7 @@ func Test_HandleOne(t *testing.T) { assert.True(t, ok) assert.Equal(t, "b", s["a"]) } - return nil + return true, nil } } From b6278b16d6d2ee6689f70b0c8a47a5c16b1698c7 Mon Sep 17 00:00:00 2001 From: Surya V Date: Sat, 30 Sep 2023 13:03:14 +0530 Subject: [PATCH 4/5] Changes made to return the generated event id for concrete implementation of event sources to log it for correlation Signed-off-by: Surya V --- eventsources/common/webhook/webhook.go | 4 ++-- eventsources/eventing.go | 22 +++++++++---------- eventsources/sources/amqp/start.go | 4 ++-- eventsources/sources/awssns/start.go | 2 +- eventsources/sources/awssqs/start.go | 12 +++++----- eventsources/sources/azureeventshub/start.go | 2 +- .../sources/azurequeuestorage/start.go | 4 ++-- eventsources/sources/azureservicebus/start.go | 4 ++-- eventsources/sources/bitbucket/start.go | 2 +- eventsources/sources/bitbucketserver/start.go | 2 +- eventsources/sources/calendar/start.go | 2 +- eventsources/sources/emitter/start.go | 2 +- eventsources/sources/file/start.go | 6 ++--- eventsources/sources/gcppubsub/start.go | 2 +- eventsources/sources/generic/start.go | 4 ++-- eventsources/sources/github/start.go | 2 +- eventsources/sources/gitlab/start.go | 2 +- eventsources/sources/hdfs/start.go | 4 ++-- eventsources/sources/kafka/start.go | 8 +++---- eventsources/sources/minio/start.go | 4 ++-- eventsources/sources/mqtt/start.go | 2 +- eventsources/sources/nats/start.go | 2 +- eventsources/sources/nsq/start.go | 4 ++-- eventsources/sources/pulsar/start.go | 4 ++-- eventsources/sources/redis/start.go | 4 ++-- eventsources/sources/redis/start_test.go | 6 ++--- eventsources/sources/redis_stream/start.go | 4 ++-- eventsources/sources/resource/start.go | 2 +- eventsources/sources/sftp/start.go | 4 ++-- eventsources/sources/slack/start.go | 2 +- eventsources/sources/storagegrid/start.go | 2 +- eventsources/sources/stripe/start.go | 2 +- eventsources/sources/webhook/start.go | 2 +- 33 files changed, 67 insertions(+), 67 deletions(-) diff --git a/eventsources/common/webhook/webhook.go b/eventsources/common/webhook/webhook.go index b1ea41782d..d72d54db46 100644 --- a/eventsources/common/webhook/webhook.go +++ b/eventsources/common/webhook/webhook.go @@ -178,7 +178,7 @@ func activateRoute(router Router, controller *Controller) { } // manageRouteChannels consumes data from route's data channel and stops the processing when the event source is stopped/removed -func manageRouteChannels(router Router, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) { +func manageRouteChannels(router Router, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) { route := router.GetRoute() logger := route.Logger for { @@ -199,7 +199,7 @@ func manageRouteChannels(router Router, dispatch func([]byte, ...eventsourcecomm } // ManagerRoute manages the lifecycle of a route -func ManageRoute(ctx context.Context, router Router, controller *Controller, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { +func ManageRoute(ctx context.Context, router Router, controller *Controller, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error { route := router.GetRoute() logger := route.Logger diff --git a/eventsources/eventing.go b/eventsources/eventing.go index 7864c48c81..f4cc3396ea 100644 --- a/eventsources/eventing.go +++ b/eventsources/eventing.go @@ -70,7 +70,7 @@ type EventingServer interface { GetEventSourceType() apicommon.EventSourceType // Function to start listening events. - StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error + StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error } // GetEventingServers returns the mapping of event source type and list of eventing servers @@ -522,16 +522,16 @@ func (e *EventSourceAdaptor) run(ctx context.Context, servers map[apicommon.Even Jitter: &jitter, } if err = common.DoWithRetry(&backoff, func() error { - return s.StartListening(ctx, func(data []byte, opts ...eventsourcecommon.Option) (bool, error) { + return s.StartListening(ctx, func(data []byte, opts ...eventsourcecommon.Option) (string, error) { if filter, ok := filters[s.GetEventName()]; ok { proceed, err := filterEvent(data, filter) if err != nil { logger.Errorw("Failed to filter event", zap.Error(err)) - return false, nil + return "", nil } if !proceed { logger.Info("Filter condition not met, skip dispatching") - return false, nil + return "", nil } } @@ -544,20 +544,20 @@ func (e *EventSourceAdaptor) run(ctx context.Context, servers map[apicommon.Even for _, opt := range opts { err := opt(&event) if err != nil { - return false, err + return "", err } } err := event.SetData(cloudevents.ApplicationJSON, data) if err != nil { - return false, err + return "", err } eventBody, err := json.Marshal(event) if err != nil { - return false, err + return "", err } if e.eventBusConn == nil || e.eventBusConn.IsClosed() { - return false, eventbuscommon.NewEventBusError(fmt.Errorf("failed to publish event, eventbus connection closed")) + return "", eventbuscommon.NewEventBusError(fmt.Errorf("failed to publish event, eventbus connection closed")) } msg := eventbuscommon.Message{ @@ -568,20 +568,20 @@ func (e *EventSourceAdaptor) run(ctx context.Context, servers map[apicommon.Even }, Body: eventBody, } - if err = common.DoWithRetry(&common.DefaultBackoff, func() error { return e.eventBusConn.Publish(ctx, msg) }); err != nil { logger.Errorw("Failed to publish an event", zap.Error(err), zap.String(logging.LabelEventName, s.GetEventName()), zap.Any(logging.LabelEventSourceType, s.GetEventSourceType())) e.metrics.EventSentFailed(s.GetEventSourceName(), s.GetEventName()) - return false, eventbuscommon.NewEventBusError(err) + return "", eventbuscommon.NewEventBusError(err) } logger.Infow("Succeeded to publish an event", zap.String(logging.LabelEventName, s.GetEventName()), zap.Any(logging.LabelEventSourceType, s.GetEventSourceType()), zap.String("eventID", event.ID())) e.metrics.EventSent(s.GetEventSourceName(), s.GetEventName()) - return true, nil + return msg.MsgHeader.ID, nil }) + }); err != nil { logger.Errorw("Failed to start listening eventsource", zap.Any(logging.LabelEventSourceType, s.GetEventSourceType()), zap.Any(logging.LabelEventName, s.GetEventName()), zap.Error(err)) diff --git a/eventsources/sources/amqp/start.go b/eventsources/sources/amqp/start.go index fd74b00210..3ff50a8b40 100644 --- a/eventsources/sources/amqp/start.go +++ b/eventsources/sources/amqp/start.go @@ -61,7 +61,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) @@ -156,7 +156,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt } } -func (el *EventListener) handleOne(amqpEventSource *v1alpha1.AMQPEventSource, msg amqplib.Delivery, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error), log *zap.SugaredLogger) error { +func (el *EventListener) handleOne(amqpEventSource *v1alpha1.AMQPEventSource, msg amqplib.Delivery, dispatch func([]byte, ...eventsourcecommon.Option) (string, error), log *zap.SugaredLogger) error { defer func(start time.Time) { el.Metrics.EventProcessingDuration(el.GetEventSourceName(), el.GetEventName(), float64(time.Since(start)/time.Millisecond)) }(time.Now()) diff --git a/eventsources/sources/awssns/start.go b/eventsources/sources/awssns/start.go index 399312c912..3030aaca0c 100644 --- a/eventsources/sources/awssns/start.go +++ b/eventsources/sources/awssns/start.go @@ -276,7 +276,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening starts an SNS event source -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error { logger := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) diff --git a/eventsources/sources/awssqs/start.go b/eventsources/sources/awssqs/start.go index 77b5b2391f..90fa7a7df4 100644 --- a/eventsources/sources/awssqs/start.go +++ b/eventsources/sources/awssqs/start.go @@ -62,7 +62,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) log.Info("started processing the AWS SQS event source...") @@ -141,7 +141,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt } } -func (el *EventListener) processMessage(message *sqslib.Message, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error), ack func(), log *zap.SugaredLogger) { +func (el *EventListener) processMessage(message *sqslib.Message, dispatch func([]byte, ...eventsourcecommon.Option) (string, error), ack func(), log *zap.SugaredLogger) { defer func(start time.Time) { el.Metrics.EventProcessingDuration(el.GetEventSourceName(), el.GetEventName(), float64(time.Since(start)/time.Millisecond)) }(time.Now()) @@ -167,14 +167,14 @@ func (el *EventListener) processMessage(message *sqslib.Message, dispatch func([ } return } - dispatched := false - dispatched, err = dispatch(eventBytes) + dispatchedEventId := "" + dispatchedEventId, err = dispatch(eventBytes) if err != nil { log.Errorw("failed to dispatch SQS event", zap.Error(err)) el.Metrics.EventProcessingFailed(el.GetEventSourceName(), el.GetEventName()) } else { - if dispatched { - log.Infow("Successfully dispatched SQS Event", zap.Any("triggeredByEvent", data.MessageId), zap.Any("sqsMessageAttributes", data.MessageAttributes)) + if dispatchedEventId != "" { + log.Infow("Successfully dispatched SQS Event", zap.Any("triggeredByEvent", dispatchedEventId), zap.Any("sqsMessageAttributes", data.MessageAttributes)) } ack() } diff --git a/eventsources/sources/azureeventshub/start.go b/eventsources/sources/azureeventshub/start.go index 7bceaf5565..0932310e7b 100644 --- a/eventsources/sources/azureeventshub/start.go +++ b/eventsources/sources/azureeventshub/start.go @@ -59,7 +59,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) log.Info("started processing the Azure Events Hub event source...") diff --git a/eventsources/sources/azurequeuestorage/start.go b/eventsources/sources/azurequeuestorage/start.go index 1fe60d237f..dd121e562a 100644 --- a/eventsources/sources/azurequeuestorage/start.go +++ b/eventsources/sources/azurequeuestorage/start.go @@ -45,7 +45,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) @@ -118,7 +118,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt } } -func (el *EventListener) processMessage(message *azqueue.DequeuedMessage, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error), ack func(), log *zap.SugaredLogger) { +func (el *EventListener) processMessage(message *azqueue.DequeuedMessage, dispatch func([]byte, ...eventsourcecommon.Option) (string, error), ack func(), log *zap.SugaredLogger) { defer func(start time.Time) { el.Metrics.EventProcessingDuration(el.GetEventSourceName(), el.GetEventName(), float64(time.Since(start)/time.Millisecond)) }(time.Now()) diff --git a/eventsources/sources/azureservicebus/start.go b/eventsources/sources/azureservicebus/start.go index 9f1866f6a3..4dd5ea8631 100644 --- a/eventsources/sources/azureservicebus/start.go +++ b/eventsources/sources/azureservicebus/start.go @@ -50,7 +50,7 @@ const ( ) // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) @@ -155,7 +155,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt } } -func (el *EventListener) handleOne(servicebusEventSource *v1alpha1.AzureServiceBusEventSource, message *servicebus.ReceivedMessage, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error), log *zap.SugaredLogger) error { +func (el *EventListener) handleOne(servicebusEventSource *v1alpha1.AzureServiceBusEventSource, message *servicebus.ReceivedMessage, dispatch func([]byte, ...eventsourcecommon.Option) (string, error), log *zap.SugaredLogger) error { defer func(start time.Time) { el.Metrics.EventProcessingDuration(el.GetEventSourceName(), el.GetEventName(), float64(time.Since(start)/time.Millisecond)) }(time.Now()) diff --git a/eventsources/sources/bitbucket/start.go b/eventsources/sources/bitbucket/start.go index 60d068b8dc..14f46d9141 100644 --- a/eventsources/sources/bitbucket/start.go +++ b/eventsources/sources/bitbucket/start.go @@ -139,7 +139,7 @@ func (router *Router) PostInactivate() error { } // StartListening starts an event source -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error { defer sources.Recover(el.GetEventName()) bitbucketEventSource := &el.BitbucketEventSource diff --git a/eventsources/sources/bitbucketserver/start.go b/eventsources/sources/bitbucketserver/start.go index 377644b4b3..26ffe24188 100644 --- a/eventsources/sources/bitbucketserver/start.go +++ b/eventsources/sources/bitbucketserver/start.go @@ -178,7 +178,7 @@ func (router *Router) PostInactivate() error { } // StartListening starts an event source -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error { defer sources.Recover(el.GetEventName()) bitbucketserverEventSource := &el.BitbucketServerEventSource diff --git a/eventsources/sources/calendar/start.go b/eventsources/sources/calendar/start.go index 92fc8aef65..0bc3bd4764 100644 --- a/eventsources/sources/calendar/start.go +++ b/eventsources/sources/calendar/start.go @@ -134,7 +134,7 @@ func (el *EventListener) getExecutionTime() (time.Time, error) { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error { el.log = logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) el.log.Info("started processing the calendar event source...") diff --git a/eventsources/sources/emitter/start.go b/eventsources/sources/emitter/start.go index 9867706c1e..5ffe45e425 100644 --- a/eventsources/sources/emitter/start.go +++ b/eventsources/sources/emitter/start.go @@ -59,7 +59,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) log.Info("started processing the Emitter event source...") diff --git a/eventsources/sources/file/start.go b/eventsources/sources/file/start.go index eaaf16f624..d0008bdfcd 100644 --- a/eventsources/sources/file/start.go +++ b/eventsources/sources/file/start.go @@ -61,7 +61,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) defer sources.Recover(el.GetEventName()) @@ -82,7 +82,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt } // listenEvents listen to file related events. -func (el *EventListener) listenEvents(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error), log *zap.SugaredLogger) error { +func (el *EventListener) listenEvents(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error), log *zap.SugaredLogger) error { fileEventSource := &el.FileEventSource // create new fs watcher @@ -162,7 +162,7 @@ func (el *EventListener) listenEvents(ctx context.Context, dispatch func([]byte, } // listenEvents listen to file related events using polling. -func (el *EventListener) listenEventsPolling(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error), log *zap.SugaredLogger) error { +func (el *EventListener) listenEventsPolling(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error), log *zap.SugaredLogger) error { fileEventSource := &el.FileEventSource // create new fs watcher diff --git a/eventsources/sources/gcppubsub/start.go b/eventsources/sources/gcppubsub/start.go index 5acd00443c..bc8209ef66 100644 --- a/eventsources/sources/gcppubsub/start.go +++ b/eventsources/sources/gcppubsub/start.go @@ -65,7 +65,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening listens to GCP PubSub events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error { // In order to listen events from GCP PubSub, // 1. Parse the event source that contains configuration to connect to GCP PubSub // 2. Create a new PubSub client diff --git a/eventsources/sources/generic/start.go b/eventsources/sources/generic/start.go index 2fe5a242b5..3231713c72 100644 --- a/eventsources/sources/generic/start.go +++ b/eventsources/sources/generic/start.go @@ -48,7 +48,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening listens to generic events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error { logger := logging.FromContext(ctx). With(zap.String(logging.LabelEventSourceType, string(el.GetEventSourceType())), zap.String(logging.LabelEventName, el.GetEventName()), @@ -94,7 +94,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt } } -func (el *EventListener) handleOne(event *Event, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error), logger *zap.SugaredLogger) error { +func (el *EventListener) handleOne(event *Event, dispatch func([]byte, ...eventsourcecommon.Option) (string, error), logger *zap.SugaredLogger) error { defer func(start time.Time) { el.Metrics.EventProcessingDuration(el.GetEventSourceName(), el.GetEventName(), float64(time.Since(start)/time.Millisecond)) }(time.Now()) diff --git a/eventsources/sources/github/start.go b/eventsources/sources/github/start.go index 21d548d1f1..2d041ad2bc 100644 --- a/eventsources/sources/github/start.go +++ b/eventsources/sources/github/start.go @@ -217,7 +217,7 @@ func (router *Router) PostInactivate() error { } // StartListening starts an event source -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error { logger := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) logger.Info("started processing the Github event source...") diff --git a/eventsources/sources/gitlab/start.go b/eventsources/sources/gitlab/start.go index 3d737e8548..09483fe75e 100644 --- a/eventsources/sources/gitlab/start.go +++ b/eventsources/sources/gitlab/start.go @@ -156,7 +156,7 @@ func (router *Router) PostInactivate() error { } // StartListening starts an event source -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error { logger := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) logger.Info("started processing the Gitlab event source...") diff --git a/eventsources/sources/hdfs/start.go b/eventsources/sources/hdfs/start.go index d844280b0e..381baa093a 100644 --- a/eventsources/sources/hdfs/start.go +++ b/eventsources/sources/hdfs/start.go @@ -65,7 +65,7 @@ func (w *WatchableHDFS) GetFileID(fi os.FileInfo) interface{} { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) log.Info("started processing the Emitter event source...") @@ -157,7 +157,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt } } -func (el *EventListener) handleOne(event fsevent.Event, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error), log *zap.SugaredLogger) error { +func (el *EventListener) handleOne(event fsevent.Event, dispatch func([]byte, ...eventsourcecommon.Option) (string, error), log *zap.SugaredLogger) error { defer func(start time.Time) { el.Metrics.EventProcessingDuration(el.GetEventSourceName(), el.GetEventName(), float64(time.Since(start)/time.Millisecond)) }(time.Now()) diff --git a/eventsources/sources/kafka/start.go b/eventsources/sources/kafka/start.go index 9f7988b86e..92961ecdcd 100644 --- a/eventsources/sources/kafka/start.go +++ b/eventsources/sources/kafka/start.go @@ -73,7 +73,7 @@ func verifyPartitionAvailable(part int32, partitions []int32) bool { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) defer sources.Recover(el.GetEventName()) @@ -88,7 +88,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt } } -func (el *EventListener) consumerGroupConsumer(ctx context.Context, log *zap.SugaredLogger, kafkaEventSource *v1alpha1.KafkaEventSource, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { +func (el *EventListener) consumerGroupConsumer(ctx context.Context, log *zap.SugaredLogger, kafkaEventSource *v1alpha1.KafkaEventSource, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error { config, err := getSaramaConfig(kafkaEventSource, log) if err != nil { return err @@ -158,7 +158,7 @@ func (el *EventListener) consumerGroupConsumer(ctx context.Context, log *zap.Sug return nil } -func (el *EventListener) partitionConsumer(ctx context.Context, log *zap.SugaredLogger, kafkaEventSource *v1alpha1.KafkaEventSource, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { +func (el *EventListener) partitionConsumer(ctx context.Context, log *zap.SugaredLogger, kafkaEventSource *v1alpha1.KafkaEventSource, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error { defer sources.Recover(el.GetEventName()) log.Info("start kafka event source...") @@ -334,7 +334,7 @@ func getSaramaConfig(kafkaEventSource *v1alpha1.KafkaEventSource, log *zap.Sugar // Consumer represents a Sarama consumer group consumer type Consumer struct { ready chan bool - dispatch func([]byte, ...eventsourcecommon.Option) (bool, error) + dispatch func([]byte, ...eventsourcecommon.Option) (string, error) logger *zap.SugaredLogger kafkaEventSource *v1alpha1.KafkaEventSource eventSourceName string diff --git a/eventsources/sources/minio/start.go b/eventsources/sources/minio/start.go index 813c28b35f..3dbc4a33b2 100644 --- a/eventsources/sources/minio/start.go +++ b/eventsources/sources/minio/start.go @@ -60,7 +60,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName(), zap.String("bucketName", el.MinioEventSource.Bucket.Name)) @@ -106,7 +106,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt return nil } -func (el *EventListener) handleOne(notification notification.Info, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error), log *zap.SugaredLogger) error { +func (el *EventListener) handleOne(notification notification.Info, dispatch func([]byte, ...eventsourcecommon.Option) (string, error), log *zap.SugaredLogger) error { defer func(start time.Time) { el.Metrics.EventProcessingDuration(el.GetEventSourceName(), el.GetEventName(), float64(time.Since(start)/time.Millisecond)) }(time.Now()) diff --git a/eventsources/sources/mqtt/start.go b/eventsources/sources/mqtt/start.go index 9452ba0a3e..9570c7f7b1 100644 --- a/eventsources/sources/mqtt/start.go +++ b/eventsources/sources/mqtt/start.go @@ -59,7 +59,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) defer sources.Recover(el.GetEventName()) diff --git a/eventsources/sources/nats/start.go b/eventsources/sources/nats/start.go index 596965969b..a0a5ce67cb 100644 --- a/eventsources/sources/nats/start.go +++ b/eventsources/sources/nats/start.go @@ -59,7 +59,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) defer sources.Recover(el.GetEventName()) diff --git a/eventsources/sources/nsq/start.go b/eventsources/sources/nsq/start.go index 4b69792751..6aec7baa19 100644 --- a/eventsources/sources/nsq/start.go +++ b/eventsources/sources/nsq/start.go @@ -63,14 +63,14 @@ type messageHandler struct { eventSourceName string eventName string metrics *metrics.Metrics - dispatch func([]byte, ...eventsourcecommon.Option) (bool, error) + dispatch func([]byte, ...eventsourcecommon.Option) (string, error) logger *zap.SugaredLogger isJSON bool metadata map[string]string } // StartListening listens NSQ events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) log.Info("started processing the NSQ event source...") diff --git a/eventsources/sources/pulsar/start.go b/eventsources/sources/pulsar/start.go index d3605f0cd2..8a9486a5e1 100644 --- a/eventsources/sources/pulsar/start.go +++ b/eventsources/sources/pulsar/start.go @@ -58,7 +58,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening listens Pulsar events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) log.Info("started processing the Pulsar event source...") @@ -185,7 +185,7 @@ consumeMessages: return nil } -func (el *EventListener) handleOne(msg pulsar.Message, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error), log *zap.SugaredLogger) error { +func (el *EventListener) handleOne(msg pulsar.Message, dispatch func([]byte, ...eventsourcecommon.Option) (string, error), log *zap.SugaredLogger) error { defer func(start time.Time) { el.Metrics.EventProcessingDuration(el.GetEventSourceName(), el.GetEventName(), float64(time.Since(start)/time.Millisecond)) }(time.Now()) diff --git a/eventsources/sources/redis/start.go b/eventsources/sources/redis/start.go index 600caf872a..fd78bf8137 100644 --- a/eventsources/sources/redis/start.go +++ b/eventsources/sources/redis/start.go @@ -59,7 +59,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening listens events published by redis -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) log.Info("started processing the Redis event source...") @@ -130,7 +130,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt } } -func (el *EventListener) handleOne(message *redis.Message, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error), log *zap.SugaredLogger) error { +func (el *EventListener) handleOne(message *redis.Message, dispatch func([]byte, ...eventsourcecommon.Option) (string, error), log *zap.SugaredLogger) error { defer func(start time.Time) { el.Metrics.EventProcessingDuration(el.GetEventSourceName(), el.GetEventName(), float64(time.Since(start)/time.Millisecond)) }(time.Now()) diff --git a/eventsources/sources/redis/start_test.go b/eventsources/sources/redis/start_test.go index 62072615c8..60e6b480e4 100644 --- a/eventsources/sources/redis/start_test.go +++ b/eventsources/sources/redis/start_test.go @@ -28,8 +28,8 @@ func Test_HandleOne(t *testing.T) { Payload: `{"a": "b"}`, } - getDispatcher := func(isJson bool) func(d []byte, opts ...eventsourcecommon.Option) (bool, error) { - return func(d []byte, opts ...eventsourcecommon.Option) (bool, error) { + getDispatcher := func(isJson bool) func(d []byte, opts ...eventsourcecommon.Option) (string, error) { + return func(d []byte, opts ...eventsourcecommon.Option) (string, error) { eventData := &events.RedisEventData{} err := json.Unmarshal(d, eventData) assert.NoError(t, err) @@ -44,7 +44,7 @@ func Test_HandleOne(t *testing.T) { assert.True(t, ok) assert.Equal(t, "b", s["a"]) } - return true, nil + return "Event-1", nil } } diff --git a/eventsources/sources/redis_stream/start.go b/eventsources/sources/redis_stream/start.go index a56d7609aa..8238d5b833 100644 --- a/eventsources/sources/redis_stream/start.go +++ b/eventsources/sources/redis_stream/start.go @@ -59,7 +59,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening listens for new data on specified redis streams -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) log.Info("started processing the Redis stream event source...") @@ -194,7 +194,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt } } -func (el *EventListener) handleOne(stream string, message redis.XMessage, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error), log *zap.SugaredLogger) error { +func (el *EventListener) handleOne(stream string, message redis.XMessage, dispatch func([]byte, ...eventsourcecommon.Option) (string, error), log *zap.SugaredLogger) error { defer func(start time.Time) { el.Metrics.EventProcessingDuration(el.GetEventSourceName(), el.GetEventName(), float64(time.Since(start)/time.Millisecond)) }(time.Now()) diff --git a/eventsources/sources/resource/start.go b/eventsources/sources/resource/start.go index 9c532f7dde..545e935b9d 100644 --- a/eventsources/sources/resource/start.go +++ b/eventsources/sources/resource/start.go @@ -77,7 +77,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening watches resource updates and consume those events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) defer sources.Recover(el.GetEventName()) diff --git a/eventsources/sources/sftp/start.go b/eventsources/sources/sftp/start.go index c2dfdf5401..14293ea891 100644 --- a/eventsources/sources/sftp/start.go +++ b/eventsources/sources/sftp/start.go @@ -64,7 +64,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) defer sources.Recover(el.GetEventName()) @@ -112,7 +112,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt } // listenEvents listen to sftp related events. -func (el *EventListener) listenEvents(ctx context.Context, sftpClient *sftp.Client, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error), log *zap.SugaredLogger) error { +func (el *EventListener) listenEvents(ctx context.Context, sftpClient *sftp.Client, dispatch func([]byte, ...eventsourcecommon.Option) (string, error), log *zap.SugaredLogger) error { sftpEventSource := &el.SFTPEventSource log.Info("identifying new files in sftp...") diff --git a/eventsources/sources/slack/start.go b/eventsources/sources/slack/start.go index 0515f97460..9a54ee0ca9 100644 --- a/eventsources/sources/slack/start.go +++ b/eventsources/sources/slack/start.go @@ -312,7 +312,7 @@ func (rc *Router) verifyRequest(request *http.Request) error { } // StartListening starts an event source -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) diff --git a/eventsources/sources/storagegrid/start.go b/eventsources/sources/storagegrid/start.go index bd5008b1a4..229b6817a1 100644 --- a/eventsources/sources/storagegrid/start.go +++ b/eventsources/sources/storagegrid/start.go @@ -337,7 +337,7 @@ func (router *Router) PostInactivate() error { } // StartListening starts an event source -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) log.Info("started processing the Storage Grid event source...") diff --git a/eventsources/sources/stripe/start.go b/eventsources/sources/stripe/start.go index cff091b8da..75d6a6502a 100644 --- a/eventsources/sources/stripe/start.go +++ b/eventsources/sources/stripe/start.go @@ -190,7 +190,7 @@ func filterEvent(event *stripe.Event, filters []string) bool { } // StartListening starts an event source -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) log.Info("started processing the Stripe event source...") diff --git a/eventsources/sources/webhook/start.go b/eventsources/sources/webhook/start.go index 001eead07b..cd698e3021 100644 --- a/eventsources/sources/webhook/start.go +++ b/eventsources/sources/webhook/start.go @@ -151,7 +151,7 @@ func (router *Router) PostInactivate() error { } // StartListening starts listening events -func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (bool, error)) error { +func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error { log := logging.FromContext(ctx). With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName()) log.Info("started processing the webhook event source...") From 98f210fd6b7a1801b2c3351ee4151de0ee09667e Mon Sep 17 00:00:00 2001 From: Surya V Date: Sat, 30 Sep 2023 13:55:47 +0530 Subject: [PATCH 5/5] Fixed Lint issue Signed-off-by: Surya V --- eventsources/eventing.go | 1 - 1 file changed, 1 deletion(-) diff --git a/eventsources/eventing.go b/eventsources/eventing.go index f4cc3396ea..d5415d80e5 100644 --- a/eventsources/eventing.go +++ b/eventsources/eventing.go @@ -581,7 +581,6 @@ func (e *EventSourceAdaptor) run(ctx context.Context, servers map[apicommon.Even e.metrics.EventSent(s.GetEventSourceName(), s.GetEventName()) return msg.MsgHeader.ID, nil }) - }); err != nil { logger.Errorw("Failed to start listening eventsource", zap.Any(logging.LabelEventSourceType, s.GetEventSourceType()), zap.Any(logging.LabelEventName, s.GetEventName()), zap.Error(err))