Skip to content

Commit

Permalink
chore: Update config to fetch proposer payload delivered
Browse files Browse the repository at this point in the history
  • Loading branch information
samcm committed Sep 16, 2024
1 parent 58ea9ba commit c8727bf
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ ORDER BY
(
slot_start_date_time,
meta_network_name,
slot,
relay_name,
block_hash,
meta_client_name,
builder_pubkey,
proposer_pubkey
proposer_pubkey,
) COMMENT 'Contains MEV relay proposer payload delivered data.';

CREATE TABLE default.mev_relay_proposer_payload_delivered ON CLUSTER '{cluster}' AS default.mev_relay_proposer_payload_delivered_local ENGINE = Distributed(
Expand Down
2 changes: 2 additions & 0 deletions pkg/relaymonitor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type Config struct {
Schedule Schedule `yaml:"schedule"`

Relays []relay.Config `yaml:"relays"`

FetchProposerPayloadDelivered bool `yaml:"fetchProposerPayloadDelivered" default:"true"`
}

func (c *Config) Validate() error {
Expand Down
6 changes: 0 additions & 6 deletions pkg/relaymonitor/poll.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,6 @@ func (r *RelayMonitor) fetchProposerPayloadDelivered(ctx context.Context, client
responseAt := time.Now()

for _, payload := range payloads {
if r.bidCache.Has(client.Name(), slot, payload.BlockHash.GetValue()) {
continue
}

r.bidCache.Set(client.Name(), slot, payload.BlockHash.GetValue())

event, err := r.createNewPayloadDeliveredDecoratedEvent(ctx, client, slot, payload, requestedAt, responseAt)
if err != nil {
return errors.Wrap(err, "failed to create new decorated event")
Expand Down
2 changes: 2 additions & 0 deletions pkg/relaymonitor/relay/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,8 @@ func (c *Client) GetProposerPayloadDelivered(ctx context.Context, params url.Val
payloads := make([]*mevrelay.ProposerPayloadDelivered, len(rawPayloads))

for i, rawPayload := range rawPayloads {
c.metrics.IncProposerPayloadDelivered(c.name, c.networkName, 1)

slot, _ := strconv.ParseUint(rawPayload.Slot, 10, 64)
gasLimit, _ := strconv.ParseUint(rawPayload.GasLimit, 10, 64)
gasUsed, _ := strconv.ParseUint(rawPayload.GasUsed, 10, 64)
Expand Down
17 changes: 14 additions & 3 deletions pkg/relaymonitor/relay/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import (
)

type Metrics struct {
apiRequestsTotal *prometheus.CounterVec
bidsReceived *prometheus.CounterVec
apiFailuresTotal *prometheus.CounterVec
apiRequestsTotal *prometheus.CounterVec
bidsReceived *prometheus.CounterVec
apiFailuresTotal *prometheus.CounterVec
proposerPayloadDelivered *prometheus.CounterVec
}

var (
Expand Down Expand Up @@ -38,6 +39,11 @@ func newMetrics(namespace string) *Metrics {
Name: "bids_received_total",
Help: "Total number of bids received from the relay",
}, []string{"relay", "network"}),
proposerPayloadDelivered: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "proposer_payload_delivered_total",
Help: "Total number of proposer payload delivered from the relay",
}, []string{"relay", "network"}),
apiFailuresTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "api_failures_total",
Expand All @@ -49,6 +55,7 @@ func newMetrics(namespace string) *Metrics {
m.apiRequestsTotal,
m.bidsReceived,
m.apiFailuresTotal,
m.proposerPayloadDelivered,
)

return m
Expand All @@ -65,3 +72,7 @@ func (m *Metrics) IncBidsReceived(relay, network string, count int) {
func (m *Metrics) IncAPIFailures(relay, endpoint, network string) {
m.apiFailuresTotal.WithLabelValues(relay, endpoint, network).Inc()
}

func (m *Metrics) IncProposerPayloadDelivered(relay, network string, count int) {
m.proposerPayloadDelivered.WithLabelValues(relay, network).Add(float64(count))
}
9 changes: 6 additions & 3 deletions pkg/relaymonitor/relay_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ func New(ctx context.Context, log logrus.FieldLogger, config *Config) (*RelayMon
relays := make([]*relay.Client, 0, len(config.Relays))

for _, r := range config.Relays {
//
relayClient, err := relay.NewClient(namespace, r, config.Ethereum.Network)
if err != nil {
return nil, perrors.Wrap(err, "failed to create relay client")
Expand Down Expand Up @@ -260,8 +259,12 @@ func (r *RelayMonitor) startCrons(ctx context.Context) error {
}
}

for _, relayClient := range r.relays {
r.scheduleProposerPayloadDeliveredFetching(ctx, relayClient)
if !r.Config.FetchProposerPayloadDelivered {
r.log.Warn("Proposer payload delivered fetching is disabled")
} else {
for _, relayClient := range r.relays {
r.scheduleProposerPayloadDeliveredFetching(ctx, relayClient)
}
}

c.StartAsync()
Expand Down
6 changes: 6 additions & 0 deletions pkg/server/service/event-ingester/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,12 @@ func (er *EventRouter) RegisterHandler(eventType Type, handler func(event *xatu.
er.routes[eventType] = handler
}

func (er *EventRouter) HasRoute(eventType Type) bool {
_, exists := er.routes[eventType]

return exists
}

func (er *EventRouter) Route(eventType Type, event *xatu.DecoratedEvent) (Event, error) {
if eventType == TypeUnknown {
return nil, errors.New("event type is required")
Expand Down
7 changes: 4 additions & 3 deletions pkg/server/service/event-ingester/event/event_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package event
package event_test

import (
"testing"

"github.com/ethpandaops/xatu/pkg/proto/xatu"
"github.com/ethpandaops/xatu/pkg/server/service/event-ingester/event"
"github.com/stretchr/testify/assert"
)

func TestEventRouter_AllTypesHaveHandlers(t *testing.T) {
// Create a new EventRouter instance
router := NewEventRouter(nil, nil, nil)
router := event.NewEventRouter(nil, nil, nil)

// List of all event types from event_ingester.proto
eventTypes := xatu.Event_Name_name
Expand All @@ -28,7 +29,7 @@ func TestEventRouter_AllTypesHaveHandlers(t *testing.T) {
continue
}

_, exists := router.routes[Type(eventType)]
exists := router.HasRoute(event.Type(eventType))

assert.True(t, exists, "Handler for event type %s does not exist", eventType)
}
Expand Down

0 comments on commit c8727bf

Please sign in to comment.