Skip to content

Commit

Permalink
feat(server): Create Event Router
Browse files Browse the repository at this point in the history
  • Loading branch information
samcm committed Sep 13, 2024
1 parent 8e63c80 commit ddd737d
Show file tree
Hide file tree
Showing 3 changed files with 220 additions and 107 deletions.
287 changes: 181 additions & 106 deletions pkg/server/service/event-ingester/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,116 +82,191 @@ type Event interface {
AppendServerMeta(ctx context.Context, meta *xatu.ServerMeta) *xatu.ServerMeta
}

//nolint:gocyclo //not that complex
func New(eventType Type, log logrus.FieldLogger, event *xatu.DecoratedEvent, cache store.Cache, geoipProvider geoip.Provider) (Event, error) {
type EventRouter struct {
log logrus.FieldLogger
cache store.Cache
geoipProvider geoip.Provider
routes map[Type]func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error)
}

func NewEventRouter(log logrus.FieldLogger, cache store.Cache, geoipProvider geoip.Provider) *EventRouter {
router := &EventRouter{
log: log,
cache: cache,
geoipProvider: geoipProvider,
routes: make(map[Type]func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error)),
}

router.RegisterHandler(TypeBeaconETHV1EventsAttestationV2, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return v1.NewEventsAttestationV2(router.log, event), nil
})
router.RegisterHandler(TypeLibP2PTraceGossipSubBeaconAttestation, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return libp2p.NewTraceGossipSubBeaconAttestation(router.log, event), nil
})
router.RegisterHandler(TypeBeaconETHV1BeaconValidators, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return v1.NewBeaconValidators(router.log, event), nil
})
router.RegisterHandler(TypeBeaconP2PAttestation, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return v1.NewBeaconP2PAttestation(router.log, event, router.geoipProvider), nil
})
router.RegisterHandler(TypeBeaconETHV1EventsAttestation, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return v1.NewEventsAttestation(router.log, event), nil
})
router.RegisterHandler(TypeBeaconETHV1EventsBlock, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return v1.NewEventsBlock(router.log, event), nil
})
router.RegisterHandler(TypeBeaconETHV1EventsBlockV2, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return v1.NewEventsBlockV2(router.log, event), nil
})
router.RegisterHandler(TypeBeaconETHV1EventsChainReorg, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return v1.NewEventsChainReorg(router.log, event), nil
})
router.RegisterHandler(TypeBeaconETHV1EventsChainReorgV2, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return v1.NewEventsChainReorgV2(router.log, event), nil
})
router.RegisterHandler(TypeBeaconETHV1EventsFinalizedCheckpoint, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return v1.NewEventsFinalizedCheckpoint(router.log, event), nil
})
router.RegisterHandler(TypeBeaconETHV1EventsFinalizedCheckpointV2, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return v1.NewEventsFinalizedCheckpointV2(router.log, event), nil
})
router.RegisterHandler(TypeBeaconETHV1EventsHead, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return v1.NewEventsHead(router.log, event), nil
})
router.RegisterHandler(TypeBeaconETHV1EventsHeadV2, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return v1.NewEventsHeadV2(router.log, event), nil
})
router.RegisterHandler(TypeBeaconETHV1EventsVoluntaryExit, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return v1.NewEventsVoluntaryExit(router.log, event), nil
})
router.RegisterHandler(TypeBeaconETHV1EventsVoluntaryExitV2, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return v1.NewEventsVoluntaryExitV2(router.log, event), nil
})
router.RegisterHandler(TypeBeaconETHV1EventsContributionAndProof, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return v1.NewEventsContributionAndProof(router.log, event), nil
})
router.RegisterHandler(TypeBeaconETHV1EventsContributionAndProofV2, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return v1.NewEventsContributionAndProofV2(router.log, event), nil
})
router.RegisterHandler(TypeMempoolTransaction, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return mempool.NewTransaction(router.log, event), nil
})
router.RegisterHandler(TypeMempoolTransactionV2, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return mempool.NewTransactionV2(router.log, event), nil
})
router.RegisterHandler(TypeBeaconETHV2BeaconBlock, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return v2.NewBeaconBlock(router.log, event, router.cache), nil
})
router.RegisterHandler(TypeBeaconETHV2BeaconBlockV2, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return v2.NewBeaconBlockV2(router.log, event, router.cache), nil
})
router.RegisterHandler(TypeDebugForkChoice, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return v1.NewDebugForkChoice(router.log, event), nil
})
router.RegisterHandler(TypeDebugForkChoiceV2, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return v1.NewDebugForkChoiceV2(router.log, event), nil
})
router.RegisterHandler(TypeDebugForkChoiceReorg, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return v1.NewDebugForkChoiceReorg(router.log, event), nil
})
router.RegisterHandler(TypeDebugForkChoiceReorgV2, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return v1.NewDebugForkChoiceReorgV2(router.log, event), nil
})
router.RegisterHandler(TypeBeaconEthV1BeaconCommittee, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return v1.NewBeaconCommittee(router.log, event), nil
})
router.RegisterHandler(TypeBeaconEthV1ValidatorAttestationData, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return v1.NewValidatorAttestationData(router.log, event), nil
})
router.RegisterHandler(TypeBeaconEthV2BeaconBlockAttesterSlashing, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return v2.NewBeaconBlockAttesterSlashing(router.log, event), nil
})
router.RegisterHandler(TypeBeaconEthV2BeaconBlockProposerSlashing, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return v2.NewBeaconBlockProposerSlashing(router.log, event), nil
})
router.RegisterHandler(TypeBeaconEthV2BeaconBlockVoluntaryExit, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return v2.NewBeaconBlockVoluntaryExit(router.log, event), nil
})
router.RegisterHandler(TypeBeaconEthV2BeaconBlockDeposit, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return v2.NewBeaconBlockDeposit(router.log, event), nil
})
router.RegisterHandler(TypeBeaconEthV2BeaconExecutionTransaction, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return v2.NewBeaconBlockExecutionTransaction(router.log, event), nil
})
router.RegisterHandler(TypeBeaconEthV2BeaconBLSToExecutionChange, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return v2.NewBeaconBlockBLSToExecutionChange(router.log, event), nil
})
router.RegisterHandler(TypeBeaconEthV2BeaconWithdrawal, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return v2.NewBeaconBlockWithdrawal(router.log, event), nil
})
router.RegisterHandler(TypeBlockprintBlockClassification, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return blockprint.NewBlockClassification(router.log, event), nil
})
router.RegisterHandler(TypeBeaconETHV1EventsBlobSidecar, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return v1.NewEventsBlobSidecar(router.log, event), nil
})
router.RegisterHandler(TypeBeaconETHV1BeaconBlobSidecar, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return v1.NewBeaconBlobSidecar(router.log, event), nil
})
router.RegisterHandler(TypeBeaconEthV1ProposerDuty, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return v1.NewBeaconProposerDuty(router.log, event), nil
})
router.RegisterHandler(TypeBeaconEthV2BeaconElaboratedAttestation, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return v2.NewBeaconBlockElaboratedAttestation(router.log, event), nil
})
router.RegisterHandler(TypeLibP2PTraceAddPeer, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return libp2p.NewTraceAddPeer(router.log, event), nil
})
router.RegisterHandler(TypeLibP2PTraceConnected, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return libp2p.NewTraceConnected(router.log, event, router.geoipProvider), nil
})
router.RegisterHandler(TypeLibP2PTraceJoin, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return libp2p.NewTraceJoin(router.log, event), nil
})
router.RegisterHandler(TypeLibP2PTraceDisconnected, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return libp2p.NewTraceDisconnected(router.log, event, router.geoipProvider), nil
})
router.RegisterHandler(TypeLibP2PTraceRemovePeer, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return libp2p.NewTraceRemovePeer(router.log, event), nil
})
router.RegisterHandler(TypeLibP2PTraceRecvRPC, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return libp2p.NewTraceRecvRPC(router.log, event), nil
})
router.RegisterHandler(TypeLibP2PTraceSendRPC, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return libp2p.NewTraceSendRPC(router.log, event), nil
})
router.RegisterHandler(TypeLibP2PTraceHandleStatus, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return libp2p.NewTraceHandleStatus(router.log, event), nil
})
router.RegisterHandler(TypeLibP2PTraceHandleMetadata, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return libp2p.NewTraceHandleMetadata(router.log, event), nil
})
router.RegisterHandler(TypeLibP2PTraceGossipSubBeaconBlock, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return libp2p.NewTraceGossipSubBeaconBlock(router.log, event), nil
})
router.RegisterHandler(TypeLibP2PTraceGossipSubBlobSidecar, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return libp2p.NewTraceGossipSubBlobSidecar(router.log, event), nil
})
router.RegisterHandler(TypeMEVRelayBidTraceBuilderBlockSubmission, func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error) {
return mevrelay.NewBidTraceBuilderBlockSubmission(router.log, event), nil
})

return router
}

func (er *EventRouter) RegisterHandler(eventType Type, handler func(event *xatu.DecoratedEvent, router *EventRouter) (Event, error)) {
er.routes[eventType] = handler
}

func (er *EventRouter) Route(eventType Type, event *xatu.DecoratedEvent) (Event, error) {
if eventType == TypeUnknown {
return nil, errors.New("event type is required")
}

switch eventType {
case TypeBeaconETHV1EventsAttestationV2:
return v1.NewEventsAttestationV2(log, event), nil
case TypeLibP2PTraceGossipSubBeaconAttestation:
return libp2p.NewTraceGossipSubBeaconAttestation(log, event), nil
case TypeBeaconETHV1BeaconValidators:
return v1.NewBeaconValidators(log, event), nil
case TypeBeaconP2PAttestation:
return v1.NewBeaconP2PAttestation(log, event, geoipProvider), nil
case TypeBeaconETHV1EventsAttestation:
return v1.NewEventsAttestation(log, event), nil
case TypeBeaconETHV1EventsBlock:
return v1.NewEventsBlock(log, event), nil
case TypeBeaconETHV1EventsBlockV2:
return v1.NewEventsBlockV2(log, event), nil
case TypeBeaconETHV1EventsChainReorg:
return v1.NewEventsChainReorg(log, event), nil
case TypeBeaconETHV1EventsChainReorgV2:
return v1.NewEventsChainReorgV2(log, event), nil
case TypeBeaconETHV1EventsFinalizedCheckpoint:
return v1.NewEventsFinalizedCheckpoint(log, event), nil
case TypeBeaconETHV1EventsFinalizedCheckpointV2:
return v1.NewEventsFinalizedCheckpointV2(log, event), nil
case TypeBeaconETHV1EventsHead:
return v1.NewEventsHead(log, event), nil
case TypeBeaconETHV1EventsHeadV2:
return v1.NewEventsHeadV2(log, event), nil
case TypeBeaconETHV1EventsVoluntaryExit:
return v1.NewEventsVoluntaryExit(log, event), nil
case TypeBeaconETHV1EventsVoluntaryExitV2:
return v1.NewEventsVoluntaryExitV2(log, event), nil
case TypeBeaconETHV1EventsContributionAndProof:
return v1.NewEventsContributionAndProof(log, event), nil
case TypeBeaconETHV1EventsContributionAndProofV2:
return v1.NewEventsContributionAndProofV2(log, event), nil
case TypeMempoolTransaction:
return mempool.NewTransaction(log, event), nil
case TypeMempoolTransactionV2:
return mempool.NewTransactionV2(log, event), nil
case TypeBeaconETHV2BeaconBlock:
return v2.NewBeaconBlock(log, event, cache), nil
case TypeBeaconETHV2BeaconBlockV2:
return v2.NewBeaconBlockV2(log, event, cache), nil
case TypeDebugForkChoice:
return v1.NewDebugForkChoice(log, event), nil
case TypeDebugForkChoiceV2:
return v1.NewDebugForkChoiceV2(log, event), nil
case TypeDebugForkChoiceReorg:
return v1.NewDebugForkChoiceReorg(log, event), nil
case TypeDebugForkChoiceReorgV2:
return v1.NewDebugForkChoiceReorgV2(log, event), nil
case TypeBeaconEthV1BeaconCommittee:
return v1.NewBeaconCommittee(log, event), nil
case TypeBeaconEthV1ValidatorAttestationData:
return v1.NewValidatorAttestationData(log, event), nil
case TypeBeaconEthV2BeaconBlockAttesterSlashing:
return v2.NewBeaconBlockAttesterSlashing(log, event), nil
case TypeBeaconEthV2BeaconBlockProposerSlashing:
return v2.NewBeaconBlockProposerSlashing(log, event), nil
case TypeBeaconEthV2BeaconBlockVoluntaryExit:
return v2.NewBeaconBlockVoluntaryExit(log, event), nil
case TypeBeaconEthV2BeaconBlockDeposit:
return v2.NewBeaconBlockDeposit(log, event), nil
case TypeBeaconEthV2BeaconExecutionTransaction:
return v2.NewBeaconBlockExecutionTransaction(log, event), nil
case TypeBeaconEthV2BeaconBLSToExecutionChange:
return v2.NewBeaconBlockBLSToExecutionChange(log, event), nil
case TypeBeaconEthV2BeaconWithdrawal:
return v2.NewBeaconBlockWithdrawal(log, event), nil
case TypeBlockprintBlockClassification:
return blockprint.NewBlockClassification(log, event), nil
case TypeBeaconETHV1EventsBlobSidecar:
return v1.NewEventsBlobSidecar(log, event), nil
case TypeBeaconETHV1BeaconBlobSidecar:
return v1.NewBeaconBlobSidecar(log, event), nil
case TypeBeaconEthV1ProposerDuty:
return v1.NewBeaconProposerDuty(log, event), nil
case TypeBeaconEthV2BeaconElaboratedAttestation:
return v2.NewBeaconBlockElaboratedAttestation(log, event), nil
case TypeLibP2PTraceAddPeer:
return libp2p.NewTraceAddPeer(log, event), nil
case TypeLibP2PTraceConnected:
return libp2p.NewTraceConnected(log, event, geoipProvider), nil
case TypeLibP2PTraceJoin:
return libp2p.NewTraceJoin(log, event), nil
case TypeLibP2PTraceDisconnected:
return libp2p.NewTraceDisconnected(log, event, geoipProvider), nil
case TypeLibP2PTraceRemovePeer:
return libp2p.NewTraceRemovePeer(log, event), nil
case TypeLibP2PTraceRecvRPC:
return libp2p.NewTraceRecvRPC(log, event), nil
case TypeLibP2PTraceSendRPC:
return libp2p.NewTraceSendRPC(log, event), nil
case TypeLibP2PTraceHandleStatus:
return libp2p.NewTraceHandleStatus(log, event), nil
case TypeLibP2PTraceHandleMetadata:
return libp2p.NewTraceHandleMetadata(log, event), nil
case TypeLibP2PTraceGossipSubBeaconBlock:
return libp2p.NewTraceGossipSubBeaconBlock(log, event), nil
case TypeLibP2PTraceGossipSubBlobSidecar:
return libp2p.NewTraceGossipSubBlobSidecar(log, event), nil
case TypeMEVRelayBidTraceBuilderBlockSubmission:
return mevrelay.NewBidTraceBuilderBlockSubmission(log, event), nil
default:
handler, exists := er.routes[eventType]
if !exists {
return nil, fmt.Errorf("event type %s is unknown", eventType)
}

return handler(event, er)
}
35 changes: 35 additions & 0 deletions pkg/server/service/event-ingester/event/event_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package event

import (
"testing"

"github.com/ethpandaops/xatu/pkg/proto/xatu"
"github.com/stretchr/testify/assert"
)

func TestEventRouter_AllTypesHaveHandlers(t *testing.T) {
// Create a new EventRouter instance
router := NewEventRouter(nil, nil, nil)

// List of all event types from event_ingester.proto
eventTypes := xatu.Event_Name_name

for _, eventType := range eventTypes {
if eventType == "BEACON_API_ETH_V1_EVENTS_UNKNOWN" {
continue
}

if eventType == "LIBP2P_TRACE_UNKNOWN" {
continue
}

if eventType == "LIBP2P_TRACE_DROP_RPC" {
// Not implemented yet
continue
}

_, exists := router.routes[Type(eventType)]

assert.True(t, exists, "Handler for event type %s does not exist", eventType)
}
}
5 changes: 4 additions & 1 deletion pkg/server/service/event-ingester/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type Handler struct {
cache store.Cache

metrics *Metrics

eventRouter *eventHandler.EventRouter
}

func NewHandler(log logrus.FieldLogger, clockDrift *time.Duration, geoipProvider geoip.Provider, cache store.Cache) *Handler {
Expand All @@ -34,6 +36,7 @@ func NewHandler(log logrus.FieldLogger, clockDrift *time.Duration, geoipProvider
geoipProvider: geoipProvider,
cache: cache,
metrics: NewMetrics("xatu_server_event_ingester"),
eventRouter: eventHandler.NewEventRouter(log, cache, geoipProvider),
}
}

Expand Down Expand Up @@ -122,7 +125,7 @@ func (h *Handler) Events(ctx context.Context, clientID string, events []*xatu.De

eventName := event.Event.Name.String()

e, err := eventHandler.New(eventHandler.Type(eventName), h.log, event, h.cache, h.geoipProvider)
e, err := h.eventRouter.Route(eventHandler.Type(eventName), event)
if err != nil {
h.log.WithError(err).WithField("event", eventName).Warn("failed to create event handler")

Expand Down

0 comments on commit ddd737d

Please sign in to comment.