Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added capability to return generated eventid if an event has been dispatched after applying filters in the dispatch method #2831

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions eventsources/common/webhook/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,14 @@ func activateRoute(router Router, controller *Controller) {
}

// manageRouteChannels consumes data from route's data channel and stops the processing when the event source is stopped/removed
func manageRouteChannels(router Router, dispatch func([]byte, ...eventsourcecommon.Option) error) {
func manageRouteChannels(router Router, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) {
route := router.GetRoute()
logger := route.Logger
for {
select {
case data := <-route.DataCh:
logger.Info("new event received, dispatching it...")
if err := dispatch(data); err != nil {
if _, err := dispatch(data); err != nil {
logger.Errorw("failed to send event", zap.Error(err))
route.Metrics.EventProcessingFailed(route.EventSourceName, route.EventName)
continue
Expand All @@ -199,7 +199,7 @@ func manageRouteChannels(router Router, dispatch func([]byte, ...eventsourcecomm
}

// ManagerRoute manages the lifecycle of a route
func ManageRoute(ctx context.Context, router Router, controller *Controller, dispatch func([]byte, ...eventsourcecommon.Option) error) error {
func ManageRoute(ctx context.Context, router Router, controller *Controller, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error {
route := router.GetRoute()

logger := route.Logger
Expand Down
21 changes: 10 additions & 11 deletions eventsources/eventing.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type EventingServer interface {
GetEventSourceType() apicommon.EventSourceType

// Function to start listening events.
StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error
StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error
}

// GetEventingServers returns the mapping of event source type and list of eventing servers
Expand Down Expand Up @@ -522,16 +522,16 @@ func (e *EventSourceAdaptor) run(ctx context.Context, servers map[apicommon.Even
Jitter: &jitter,
}
if err = common.DoWithRetry(&backoff, func() error {
return s.StartListening(ctx, func(data []byte, opts ...eventsourcecommon.Option) error {
return s.StartListening(ctx, func(data []byte, opts ...eventsourcecommon.Option) (string, error) {
if filter, ok := filters[s.GetEventName()]; ok {
proceed, err := filterEvent(data, filter)
if err != nil {
logger.Errorw("Failed to filter event", zap.Error(err))
return nil
return "", nil
}
if !proceed {
logger.Info("Filter condition not met, skip dispatching")
return nil
return "", nil
}
}

Expand All @@ -544,20 +544,20 @@ func (e *EventSourceAdaptor) run(ctx context.Context, servers map[apicommon.Even
for _, opt := range opts {
err := opt(&event)
if err != nil {
return err
return "", err
}
}
err := event.SetData(cloudevents.ApplicationJSON, data)
if err != nil {
return err
return "", err
}
eventBody, err := json.Marshal(event)
if err != nil {
return err
return "", err
}

if e.eventBusConn == nil || e.eventBusConn.IsClosed() {
return eventbuscommon.NewEventBusError(fmt.Errorf("failed to publish event, eventbus connection closed"))
return "", eventbuscommon.NewEventBusError(fmt.Errorf("failed to publish event, eventbus connection closed"))
}

msg := eventbuscommon.Message{
Expand All @@ -568,19 +568,18 @@ func (e *EventSourceAdaptor) run(ctx context.Context, servers map[apicommon.Even
},
Body: eventBody,
}

if err = common.DoWithRetry(&common.DefaultBackoff, func() error {
return e.eventBusConn.Publish(ctx, msg)
}); err != nil {
logger.Errorw("Failed to publish an event", zap.Error(err), zap.String(logging.LabelEventName,
s.GetEventName()), zap.Any(logging.LabelEventSourceType, s.GetEventSourceType()))
e.metrics.EventSentFailed(s.GetEventSourceName(), s.GetEventName())
return eventbuscommon.NewEventBusError(err)
return "", eventbuscommon.NewEventBusError(err)
}
logger.Infow("Succeeded to publish an event", zap.String(logging.LabelEventName,
s.GetEventName()), zap.Any(logging.LabelEventSourceType, s.GetEventSourceType()), zap.String("eventID", event.ID()))
e.metrics.EventSent(s.GetEventSourceName(), s.GetEventName())
return nil
return msg.MsgHeader.ID, nil
})
}); err != nil {
logger.Errorw("Failed to start listening eventsource", zap.Any(logging.LabelEventSourceType,
Expand Down
6 changes: 3 additions & 3 deletions eventsources/sources/amqp/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error {
log := logging.FromContext(ctx).
With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName())

Expand Down Expand Up @@ -156,7 +156,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
}
}

func (el *EventListener) handleOne(amqpEventSource *v1alpha1.AMQPEventSource, msg amqplib.Delivery, dispatch func([]byte, ...eventsourcecommon.Option) error, log *zap.SugaredLogger) error {
func (el *EventListener) handleOne(amqpEventSource *v1alpha1.AMQPEventSource, msg amqplib.Delivery, dispatch func([]byte, ...eventsourcecommon.Option) (string, error), log *zap.SugaredLogger) error {
defer func(start time.Time) {
el.Metrics.EventProcessingDuration(el.GetEventSourceName(), el.GetEventName(), float64(time.Since(start)/time.Millisecond))
}(time.Now())
Expand Down Expand Up @@ -190,7 +190,7 @@ func (el *EventListener) handleOne(amqpEventSource *v1alpha1.AMQPEventSource, ms
}

log.Info("dispatching event ...")
if err = dispatch(bodyBytes); err != nil {
if _, err = dispatch(bodyBytes); err != nil {
return fmt.Errorf("failed to dispatch AMQP event, %w", err)
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion eventsources/sources/awssns/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening starts an SNS event source
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error {
logger := logging.FromContext(ctx).
With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName())

Expand Down
11 changes: 8 additions & 3 deletions eventsources/sources/awssqs/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error {
log := logging.FromContext(ctx).
With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName())
log.Info("started processing the AWS SQS event source...")
Expand Down Expand Up @@ -141,7 +141,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
}
}

func (el *EventListener) processMessage(message *sqslib.Message, dispatch func([]byte, ...eventsourcecommon.Option) error, ack func(), log *zap.SugaredLogger) {
func (el *EventListener) processMessage(message *sqslib.Message, dispatch func([]byte, ...eventsourcecommon.Option) (string, error), ack func(), log *zap.SugaredLogger) {
defer func(start time.Time) {
el.Metrics.EventProcessingDuration(el.GetEventSourceName(), el.GetEventName(), float64(time.Since(start)/time.Millisecond))
}(time.Now())
Expand All @@ -167,10 +167,15 @@ func (el *EventListener) processMessage(message *sqslib.Message, dispatch func([
}
return
}
if err = dispatch(eventBytes); err != nil {
dispatchedEventId := ""
dispatchedEventId, err = dispatch(eventBytes)
if err != nil {
log.Errorw("failed to dispatch SQS event", zap.Error(err))
el.Metrics.EventProcessingFailed(el.GetEventSourceName(), el.GetEventName())
} else {
if dispatchedEventId != "" {
log.Infow("Successfully dispatched SQS Event", zap.Any("triggeredByEvent", dispatchedEventId), zap.Any("sqsMessageAttributes", data.MessageAttributes))
}
ack()
}
}
Expand Down
4 changes: 2 additions & 2 deletions eventsources/sources/azureeventshub/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error {
log := logging.FromContext(ctx).
With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName())
log.Info("started processing the Azure Events Hub event source...")
Expand Down Expand Up @@ -109,7 +109,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
}

log.Info("dispatching the event to eventbus...")
if err = dispatch(eventBytes); err != nil {
if _, err = dispatch(eventBytes); err != nil {
el.Metrics.EventProcessingFailed(el.GetEventSourceName(), el.GetEventName())
log.Errorw("failed to dispatch Azure EventHub event", zap.Error(err))
return err
Expand Down
6 changes: 3 additions & 3 deletions eventsources/sources/azurequeuestorage/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error {
log := logging.FromContext(ctx).
With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName())

Expand Down Expand Up @@ -118,7 +118,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
}
}

func (el *EventListener) processMessage(message *azqueue.DequeuedMessage, dispatch func([]byte, ...eventsourcecommon.Option) error, ack func(), log *zap.SugaredLogger) {
func (el *EventListener) processMessage(message *azqueue.DequeuedMessage, dispatch func([]byte, ...eventsourcecommon.Option) (string, error), ack func(), log *zap.SugaredLogger) {
defer func(start time.Time) {
el.Metrics.EventProcessingDuration(el.GetEventSourceName(), el.GetEventName(), float64(time.Since(start)/time.Millisecond))
}(time.Now())
Expand Down Expand Up @@ -155,7 +155,7 @@ func (el *EventListener) processMessage(message *azqueue.DequeuedMessage, dispat
}
return
}
if err = dispatch(eventBytes); err != nil {
if _, err = dispatch(eventBytes); err != nil {
log.Errorw("failed to dispatch azure queue storage event", zap.Error(err))
el.Metrics.EventProcessingFailed(el.GetEventSourceName(), el.GetEventName())
} else {
Expand Down
6 changes: 3 additions & 3 deletions eventsources/sources/azureservicebus/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ const (
)

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error {
log := logging.FromContext(ctx).
With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName())

Expand Down Expand Up @@ -155,7 +155,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
}
}

func (el *EventListener) handleOne(servicebusEventSource *v1alpha1.AzureServiceBusEventSource, message *servicebus.ReceivedMessage, dispatch func([]byte, ...eventsourcecommon.Option) error, log *zap.SugaredLogger) error {
func (el *EventListener) handleOne(servicebusEventSource *v1alpha1.AzureServiceBusEventSource, message *servicebus.ReceivedMessage, dispatch func([]byte, ...eventsourcecommon.Option) (string, error), log *zap.SugaredLogger) error {
defer func(start time.Time) {
el.Metrics.EventProcessingDuration(el.GetEventSourceName(), el.GetEventName(), float64(time.Since(start)/time.Millisecond))
}(time.Now())
Expand Down Expand Up @@ -186,7 +186,7 @@ func (el *EventListener) handleOne(servicebusEventSource *v1alpha1.AzureServiceB
}

log.Info("dispatching the event to eventbus...")
if err = dispatch(eventBytes); err != nil {
if _, err = dispatch(eventBytes); err != nil {
log.With("event_source", el.GetEventSourceName(), "event", el.GetEventName(), "message_id", message.MessageID).Errorw("failed to dispatch the event", zap.Error(err))
return err
}
Expand Down
2 changes: 1 addition & 1 deletion eventsources/sources/bitbucket/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (router *Router) PostInactivate() error {
}

// StartListening starts an event source
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error {
defer sources.Recover(el.GetEventName())

bitbucketEventSource := &el.BitbucketEventSource
Expand Down
2 changes: 1 addition & 1 deletion eventsources/sources/bitbucketserver/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (router *Router) PostInactivate() error {
}

// StartListening starts an event source
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error {
defer sources.Recover(el.GetEventName())

bitbucketserverEventSource := &el.BitbucketServerEventSource
Expand Down
4 changes: 2 additions & 2 deletions eventsources/sources/calendar/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (el *EventListener) getExecutionTime() (time.Time, error) {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error {
el.log = logging.FromContext(ctx).
With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName())
el.log.Info("started processing the calendar event source...")
Expand Down Expand Up @@ -206,7 +206,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
return fmt.Errorf("failed to marshal the event data for event source %s / %s, %w", el.GetEventSourceName(), el.GetEventName(), err)
}
el.log.Info("dispatching calendar event...")
err = dispatch(payload)
_, err = dispatch(payload)
if err != nil {
el.log.Errorw("failed to dispatch calendar event", zap.Error(err))
return fmt.Errorf("failed to dispatch calendar event, %w", err)
Expand Down
4 changes: 2 additions & 2 deletions eventsources/sources/emitter/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error {
log := logging.FromContext(ctx).
With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName())
log.Info("started processing the Emitter event source...")
Expand Down Expand Up @@ -131,7 +131,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
return
}
log.Info("dispatching event on data channel...")
if err = dispatch(eventBytes); err != nil {
if _, err = dispatch(eventBytes); err != nil {
log.Errorw("failed to dispatch event", zap.Error(err))
el.Metrics.EventProcessingFailed(el.GetEventSourceName(), el.GetEventName())
}
Expand Down
10 changes: 5 additions & 5 deletions eventsources/sources/file/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening starts listening events
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error {
log := logging.FromContext(ctx).
With(logging.LabelEventSourceType, el.GetEventSourceType(), logging.LabelEventName, el.GetEventName())
defer sources.Recover(el.GetEventName())
Expand All @@ -82,7 +82,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
}

// listenEvents listen to file related events.
func (el *EventListener) listenEvents(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error, log *zap.SugaredLogger) error {
func (el *EventListener) listenEvents(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error), log *zap.SugaredLogger) error {
fileEventSource := &el.FileEventSource

// create new fs watcher
Expand Down Expand Up @@ -123,7 +123,7 @@ func (el *EventListener) listenEvents(ctx context.Context, dispatch func([]byte,
return fmt.Errorf("failed to marshal the event to the fs event, %w", err)
}
log.Infow("dispatching file event on data channel...", zap.Any("event-type", event.Op.String()), zap.Any("descriptor-name", event.Name))
if err = dispatch(payload); err != nil {
if _, err = dispatch(payload); err != nil {
return fmt.Errorf("failed to dispatch a file event, %w", err)
}
return nil
Expand Down Expand Up @@ -162,7 +162,7 @@ func (el *EventListener) listenEvents(ctx context.Context, dispatch func([]byte,
}

// listenEvents listen to file related events using polling.
func (el *EventListener) listenEventsPolling(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error, log *zap.SugaredLogger) error {
func (el *EventListener) listenEventsPolling(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error), log *zap.SugaredLogger) error {
fileEventSource := &el.FileEventSource

// create new fs watcher
Expand Down Expand Up @@ -200,7 +200,7 @@ func (el *EventListener) listenEventsPolling(ctx context.Context, dispatch func(
return fmt.Errorf("failed to marshal the event to the fs event, %w", err)
}
log.Infow("dispatching file event on data channel...", zap.Any("event-type", event.Op.String()), zap.Any("descriptor-name", event.Name))
if err = dispatch(payload); err != nil {
if _, err = dispatch(payload); err != nil {
return fmt.Errorf("failed to dispatch file event, %w", err)
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions eventsources/sources/gcppubsub/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (el *EventListener) GetEventSourceType() apicommon.EventSourceType {
}

// StartListening listens to GCP PubSub events
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) error) error {
func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byte, ...eventsourcecommon.Option) (string, error)) error {
// In order to listen events from GCP PubSub,
// 1. Parse the event source that contains configuration to connect to GCP PubSub
// 2. Create a new PubSub client
Expand Down Expand Up @@ -128,7 +128,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
}

log.Info("dispatching event...")
if err = dispatch(eventBytes); err != nil {
if _, err = dispatch(eventBytes); err != nil {
log.Errorw("failed to dispatch GCP PubSub event", zap.Error(err))
el.Metrics.EventProcessingFailed(el.GetEventSourceName(), el.GetEventName())
m.Nack()
Expand Down
Loading
Loading