diff --git a/pkg/output/xatu/exporter.go b/pkg/output/xatu/exporter.go index f48cc64f..5aac573c 100644 --- a/pkg/output/xatu/exporter.go +++ b/pkg/output/xatu/exporter.go @@ -112,13 +112,21 @@ func (e *ItemExporter) sendUpstream(ctx context.Context, items []*pb.DecoratedEv grpc.UseCompressor(gzip.Name), } + startTime := time.Now() + if e.config.Retry.Enabled { opts = append(opts, retry.WithOnRetryCallback(func(ctx context.Context, attempt uint, err error) { + duration := time.Since(startTime) + logCtx. WithField("attempt", attempt). WithError(err). + WithField("duration", duration). Warn("Failed to export events. Retrying...") + + // Reset the startTime to the current time + startTime = time.Now() }), retry.WithMax(uint(e.config.Retry.MaxAttempts)), retry.WithBackoff(retry.BackoffExponential(e.config.Retry.Scalar)), diff --git a/pkg/server/service/event-ingester/event/event.go b/pkg/server/service/event-ingester/event/event.go index 21a728fa..59ca0857 100644 --- a/pkg/server/service/event-ingester/event/event.go +++ b/pkg/server/service/event-ingester/event/event.go @@ -82,116 +82,191 @@ type Event interface { AppendServerMeta(ctx context.Context, meta *xatu.ServerMeta) *xatu.ServerMeta } -//nolint:gocyclo //not that complex -func New(eventType Type, log logrus.FieldLogger, event *xatu.DecoratedEvent, cache store.Cache, geoipProvider geoip.Provider) (Event, error) { +type EventRouter struct { + log logrus.FieldLogger + cache store.Cache + geoipProvider geoip.Provider + routes map[Type]func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) +} + +func NewEventRouter(log logrus.FieldLogger, cache store.Cache, geoipProvider geoip.Provider) *EventRouter { + router := &EventRouter{ + log: log, + cache: cache, + geoipProvider: geoipProvider, + routes: make(map[Type]func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error)), + } + + router.RegisterHandler(TypeBeaconETHV1EventsAttestationV2, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return v1.NewEventsAttestationV2(router.log, event), nil + }) + router.RegisterHandler(TypeLibP2PTraceGossipSubBeaconAttestation, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return libp2p.NewTraceGossipSubBeaconAttestation(router.log, event), nil + }) + router.RegisterHandler(TypeBeaconETHV1BeaconValidators, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return v1.NewBeaconValidators(router.log, event), nil + }) + router.RegisterHandler(TypeBeaconP2PAttestation, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return v1.NewBeaconP2PAttestation(router.log, event, router.geoipProvider), nil + }) + router.RegisterHandler(TypeBeaconETHV1EventsAttestation, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return v1.NewEventsAttestation(router.log, event), nil + }) + router.RegisterHandler(TypeBeaconETHV1EventsBlock, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return v1.NewEventsBlock(router.log, event), nil + }) + router.RegisterHandler(TypeBeaconETHV1EventsBlockV2, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return v1.NewEventsBlockV2(router.log, event), nil + }) + router.RegisterHandler(TypeBeaconETHV1EventsChainReorg, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return v1.NewEventsChainReorg(router.log, event), nil + }) + router.RegisterHandler(TypeBeaconETHV1EventsChainReorgV2, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return v1.NewEventsChainReorgV2(router.log, event), nil + }) + router.RegisterHandler(TypeBeaconETHV1EventsFinalizedCheckpoint, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return v1.NewEventsFinalizedCheckpoint(router.log, event), nil + }) + router.RegisterHandler(TypeBeaconETHV1EventsFinalizedCheckpointV2, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return v1.NewEventsFinalizedCheckpointV2(router.log, event), nil + }) + router.RegisterHandler(TypeBeaconETHV1EventsHead, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return v1.NewEventsHead(router.log, event), nil + }) + router.RegisterHandler(TypeBeaconETHV1EventsHeadV2, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return v1.NewEventsHeadV2(router.log, event), nil + }) + router.RegisterHandler(TypeBeaconETHV1EventsVoluntaryExit, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return v1.NewEventsVoluntaryExit(router.log, event), nil + }) + router.RegisterHandler(TypeBeaconETHV1EventsVoluntaryExitV2, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return v1.NewEventsVoluntaryExitV2(router.log, event), nil + }) + router.RegisterHandler(TypeBeaconETHV1EventsContributionAndProof, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return v1.NewEventsContributionAndProof(router.log, event), nil + }) + router.RegisterHandler(TypeBeaconETHV1EventsContributionAndProofV2, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return v1.NewEventsContributionAndProofV2(router.log, event), nil + }) + router.RegisterHandler(TypeMempoolTransaction, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return mempool.NewTransaction(router.log, event), nil + }) + router.RegisterHandler(TypeMempoolTransactionV2, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return mempool.NewTransactionV2(router.log, event), nil + }) + router.RegisterHandler(TypeBeaconETHV2BeaconBlock, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return v2.NewBeaconBlock(router.log, event, router.cache), nil + }) + router.RegisterHandler(TypeBeaconETHV2BeaconBlockV2, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return v2.NewBeaconBlockV2(router.log, event, router.cache), nil + }) + router.RegisterHandler(TypeDebugForkChoice, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return v1.NewDebugForkChoice(router.log, event), nil + }) + router.RegisterHandler(TypeDebugForkChoiceV2, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return v1.NewDebugForkChoiceV2(router.log, event), nil + }) + router.RegisterHandler(TypeDebugForkChoiceReorg, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return v1.NewDebugForkChoiceReorg(router.log, event), nil + }) + router.RegisterHandler(TypeDebugForkChoiceReorgV2, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return v1.NewDebugForkChoiceReorgV2(router.log, event), nil + }) + router.RegisterHandler(TypeBeaconEthV1BeaconCommittee, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return v1.NewBeaconCommittee(router.log, event), nil + }) + router.RegisterHandler(TypeBeaconEthV1ValidatorAttestationData, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return v1.NewValidatorAttestationData(router.log, event), nil + }) + router.RegisterHandler(TypeBeaconEthV2BeaconBlockAttesterSlashing, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return v2.NewBeaconBlockAttesterSlashing(router.log, event), nil + }) + router.RegisterHandler(TypeBeaconEthV2BeaconBlockProposerSlashing, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return v2.NewBeaconBlockProposerSlashing(router.log, event), nil + }) + router.RegisterHandler(TypeBeaconEthV2BeaconBlockVoluntaryExit, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return v2.NewBeaconBlockVoluntaryExit(router.log, event), nil + }) + router.RegisterHandler(TypeBeaconEthV2BeaconBlockDeposit, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return v2.NewBeaconBlockDeposit(router.log, event), nil + }) + router.RegisterHandler(TypeBeaconEthV2BeaconExecutionTransaction, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return v2.NewBeaconBlockExecutionTransaction(router.log, event), nil + }) + router.RegisterHandler(TypeBeaconEthV2BeaconBLSToExecutionChange, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return v2.NewBeaconBlockBLSToExecutionChange(router.log, event), nil + }) + router.RegisterHandler(TypeBeaconEthV2BeaconWithdrawal, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return v2.NewBeaconBlockWithdrawal(router.log, event), nil + }) + router.RegisterHandler(TypeBlockprintBlockClassification, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return blockprint.NewBlockClassification(router.log, event), nil + }) + router.RegisterHandler(TypeBeaconETHV1EventsBlobSidecar, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return v1.NewEventsBlobSidecar(router.log, event), nil + }) + router.RegisterHandler(TypeBeaconETHV1BeaconBlobSidecar, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return v1.NewBeaconBlobSidecar(router.log, event), nil + }) + router.RegisterHandler(TypeBeaconEthV1ProposerDuty, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return v1.NewBeaconProposerDuty(router.log, event), nil + }) + router.RegisterHandler(TypeBeaconEthV2BeaconElaboratedAttestation, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return v2.NewBeaconBlockElaboratedAttestation(router.log, event), nil + }) + router.RegisterHandler(TypeLibP2PTraceAddPeer, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return libp2p.NewTraceAddPeer(router.log, event), nil + }) + router.RegisterHandler(TypeLibP2PTraceConnected, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return libp2p.NewTraceConnected(router.log, event, router.geoipProvider), nil + }) + router.RegisterHandler(TypeLibP2PTraceJoin, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return libp2p.NewTraceJoin(router.log, event), nil + }) + router.RegisterHandler(TypeLibP2PTraceDisconnected, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return libp2p.NewTraceDisconnected(router.log, event, router.geoipProvider), nil + }) + router.RegisterHandler(TypeLibP2PTraceRemovePeer, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return libp2p.NewTraceRemovePeer(router.log, event), nil + }) + router.RegisterHandler(TypeLibP2PTraceRecvRPC, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return libp2p.NewTraceRecvRPC(router.log, event), nil + }) + router.RegisterHandler(TypeLibP2PTraceSendRPC, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return libp2p.NewTraceSendRPC(router.log, event), nil + }) + router.RegisterHandler(TypeLibP2PTraceHandleStatus, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return libp2p.NewTraceHandleStatus(router.log, event), nil + }) + router.RegisterHandler(TypeLibP2PTraceHandleMetadata, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return libp2p.NewTraceHandleMetadata(router.log, event), nil + }) + router.RegisterHandler(TypeLibP2PTraceGossipSubBeaconBlock, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return libp2p.NewTraceGossipSubBeaconBlock(router.log, event), nil + }) + router.RegisterHandler(TypeLibP2PTraceGossipSubBlobSidecar, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return libp2p.NewTraceGossipSubBlobSidecar(router.log, event), nil + }) + router.RegisterHandler(TypeMEVRelayBidTraceBuilderBlockSubmission, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) { + return mevrelay.NewBidTraceBuilderBlockSubmission(router.log, event), nil + }) + + return router +} + +func (er *EventRouter) RegisterHandler(eventType Type, handler func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error)) { + er.routes[eventType] = handler +} + +func (er *EventRouter) Route(eventType Type, event *xatu.DecoratedEvent) (Event, error) { if eventType == TypeUnknown { return nil, errors.New("event type is required") } - switch eventType { - case TypeBeaconETHV1EventsAttestationV2: - return v1.NewEventsAttestationV2(log, event), nil - case TypeLibP2PTraceGossipSubBeaconAttestation: - return libp2p.NewTraceGossipSubBeaconAttestation(log, event), nil - case TypeBeaconETHV1BeaconValidators: - return v1.NewBeaconValidators(log, event), nil - case TypeBeaconP2PAttestation: - return v1.NewBeaconP2PAttestation(log, event, geoipProvider), nil - case TypeBeaconETHV1EventsAttestation: - return v1.NewEventsAttestation(log, event), nil - case TypeBeaconETHV1EventsBlock: - return v1.NewEventsBlock(log, event), nil - case TypeBeaconETHV1EventsBlockV2: - return v1.NewEventsBlockV2(log, event), nil - case TypeBeaconETHV1EventsChainReorg: - return v1.NewEventsChainReorg(log, event), nil - case TypeBeaconETHV1EventsChainReorgV2: - return v1.NewEventsChainReorgV2(log, event), nil - case TypeBeaconETHV1EventsFinalizedCheckpoint: - return v1.NewEventsFinalizedCheckpoint(log, event), nil - case TypeBeaconETHV1EventsFinalizedCheckpointV2: - return v1.NewEventsFinalizedCheckpointV2(log, event), nil - case TypeBeaconETHV1EventsHead: - return v1.NewEventsHead(log, event), nil - case TypeBeaconETHV1EventsHeadV2: - return v1.NewEventsHeadV2(log, event), nil - case TypeBeaconETHV1EventsVoluntaryExit: - return v1.NewEventsVoluntaryExit(log, event), nil - case TypeBeaconETHV1EventsVoluntaryExitV2: - return v1.NewEventsVoluntaryExitV2(log, event), nil - case TypeBeaconETHV1EventsContributionAndProof: - return v1.NewEventsContributionAndProof(log, event), nil - case TypeBeaconETHV1EventsContributionAndProofV2: - return v1.NewEventsContributionAndProofV2(log, event), nil - case TypeMempoolTransaction: - return mempool.NewTransaction(log, event), nil - case TypeMempoolTransactionV2: - return mempool.NewTransactionV2(log, event), nil - case TypeBeaconETHV2BeaconBlock: - return v2.NewBeaconBlock(log, event, cache), nil - case TypeBeaconETHV2BeaconBlockV2: - return v2.NewBeaconBlockV2(log, event, cache), nil - case TypeDebugForkChoice: - return v1.NewDebugForkChoice(log, event), nil - case TypeDebugForkChoiceV2: - return v1.NewDebugForkChoiceV2(log, event), nil - case TypeDebugForkChoiceReorg: - return v1.NewDebugForkChoiceReorg(log, event), nil - case TypeDebugForkChoiceReorgV2: - return v1.NewDebugForkChoiceReorgV2(log, event), nil - case TypeBeaconEthV1BeaconCommittee: - return v1.NewBeaconCommittee(log, event), nil - case TypeBeaconEthV1ValidatorAttestationData: - return v1.NewValidatorAttestationData(log, event), nil - case TypeBeaconEthV2BeaconBlockAttesterSlashing: - return v2.NewBeaconBlockAttesterSlashing(log, event), nil - case TypeBeaconEthV2BeaconBlockProposerSlashing: - return v2.NewBeaconBlockProposerSlashing(log, event), nil - case TypeBeaconEthV2BeaconBlockVoluntaryExit: - return v2.NewBeaconBlockVoluntaryExit(log, event), nil - case TypeBeaconEthV2BeaconBlockDeposit: - return v2.NewBeaconBlockDeposit(log, event), nil - case TypeBeaconEthV2BeaconExecutionTransaction: - return v2.NewBeaconBlockExecutionTransaction(log, event), nil - case TypeBeaconEthV2BeaconBLSToExecutionChange: - return v2.NewBeaconBlockBLSToExecutionChange(log, event), nil - case TypeBeaconEthV2BeaconWithdrawal: - return v2.NewBeaconBlockWithdrawal(log, event), nil - case TypeBlockprintBlockClassification: - return blockprint.NewBlockClassification(log, event), nil - case TypeBeaconETHV1EventsBlobSidecar: - return v1.NewEventsBlobSidecar(log, event), nil - case TypeBeaconETHV1BeaconBlobSidecar: - return v1.NewBeaconBlobSidecar(log, event), nil - case TypeBeaconEthV1ProposerDuty: - return v1.NewBeaconProposerDuty(log, event), nil - case TypeBeaconEthV2BeaconElaboratedAttestation: - return v2.NewBeaconBlockElaboratedAttestation(log, event), nil - case TypeLibP2PTraceAddPeer: - return libp2p.NewTraceAddPeer(log, event), nil - case TypeLibP2PTraceConnected: - return libp2p.NewTraceConnected(log, event, geoipProvider), nil - case TypeLibP2PTraceJoin: - return libp2p.NewTraceJoin(log, event), nil - case TypeLibP2PTraceDisconnected: - return libp2p.NewTraceDisconnected(log, event, geoipProvider), nil - case TypeLibP2PTraceRemovePeer: - return libp2p.NewTraceRemovePeer(log, event), nil - case TypeLibP2PTraceRecvRPC: - return libp2p.NewTraceRecvRPC(log, event), nil - case TypeLibP2PTraceSendRPC: - return libp2p.NewTraceSendRPC(log, event), nil - case TypeLibP2PTraceHandleStatus: - return libp2p.NewTraceHandleStatus(log, event), nil - case TypeLibP2PTraceHandleMetadata: - return libp2p.NewTraceHandleMetadata(log, event), nil - case TypeLibP2PTraceGossipSubBeaconBlock: - return libp2p.NewTraceGossipSubBeaconBlock(log, event), nil - case TypeLibP2PTraceGossipSubBlobSidecar: - return libp2p.NewTraceGossipSubBlobSidecar(log, event), nil - case TypeMEVRelayBidTraceBuilderBlockSubmission: - return mevrelay.NewBidTraceBuilderBlockSubmission(log, event), nil - default: + handler, exists := er.routes[eventType] + if !exists { return nil, fmt.Errorf("event type %s is unknown", eventType) } + + return handler(event, er) } diff --git a/pkg/server/service/event-ingester/event/event_test.go b/pkg/server/service/event-ingester/event/event_test.go new file mode 100644 index 00000000..04c2f4f8 --- /dev/null +++ b/pkg/server/service/event-ingester/event/event_test.go @@ -0,0 +1,35 @@ +package event + +import ( + "testing" + + "github.com/ethpandaops/xatu/pkg/proto/xatu" + "github.com/stretchr/testify/assert" +) + +func TestEventRouter_AllTypesHaveHandlers(t *testing.T) { + // Create a new EventRouter instance + router := NewEventRouter(nil, nil, nil) + + // List of all event types from event_ingester.proto + eventTypes := xatu.Event_Name_name + + for _, eventType := range eventTypes { + if eventType == "BEACON_API_ETH_V1_EVENTS_UNKNOWN" { + continue + } + + if eventType == "LIBP2P_TRACE_UNKNOWN" { + continue + } + + if eventType == "LIBP2P_TRACE_DROP_RPC" { + // Not implemented yet + continue + } + + _, exists := router.routes[Type(eventType)] + + assert.True(t, exists, "Handler for event type %s does not exist", eventType) + } +} diff --git a/pkg/server/service/event-ingester/handler.go b/pkg/server/service/event-ingester/handler.go index fa9e9b53..7e1d5b0d 100644 --- a/pkg/server/service/event-ingester/handler.go +++ b/pkg/server/service/event-ingester/handler.go @@ -25,6 +25,8 @@ type Handler struct { cache store.Cache metrics *Metrics + + eventRouter *eventHandler.EventRouter } func NewHandler(log logrus.FieldLogger, clockDrift *time.Duration, geoipProvider geoip.Provider, cache store.Cache) *Handler { @@ -34,6 +36,7 @@ func NewHandler(log logrus.FieldLogger, clockDrift *time.Duration, geoipProvider geoipProvider: geoipProvider, cache: cache, metrics: NewMetrics("xatu_server_event_ingester"), + eventRouter: eventHandler.NewEventRouter(log, cache, geoipProvider), } } @@ -122,7 +125,7 @@ func (h *Handler) Events(ctx context.Context, clientID string, events []*xatu.De eventName := event.Event.Name.String() - e, err := eventHandler.New(eventHandler.Type(eventName), h.log, event, h.cache, h.geoipProvider) + e, err := h.eventRouter.Route(eventHandler.Type(eventName), event) if err != nil { h.log.WithError(err).WithField("event", eventName).Warn("failed to create event handler")