Skip to content

Commit

Permalink
feat(relay-monitor): Add MEV Relay Payload Delivered (#373)
Browse files Browse the repository at this point in the history
* feat(relay-monitor): Add MEV Relay Payload Delivered

* feat: Add MEV relay proposer payload delivered event type

* chore: Update Kafka configuration for mev relay payload

* refactor: Update http.NewRequestWithContext to use http.NoBody

* chore: Update config to fetch proposer payload delivered
  • Loading branch information
samcm committed Sep 16, 2024
1 parent 8273105 commit e4112e1
Show file tree
Hide file tree
Showing 17 changed files with 3,231 additions and 2,239 deletions.
19 changes: 18 additions & 1 deletion deploy/local/docker-compose/vector-http-kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ transforms:
libp2p_trace_gossipsub_blob_sidecar: .event.name == "LIBP2P_TRACE_GOSSIPSUB_BLOB_SIDECAR"
beacon_api_eth_v1_beacon_validators: .event.name == "BEACON_API_ETH_V1_BEACON_VALIDATORS"
mev_relay_bid_trace_builder_block_submission: .event.name == "MEV_RELAY_BID_TRACE_BUILDER_BLOCK_SUBMISSION"
mev_relay_proposer_payload_delivered: .event.name == "MEV_RELAY_PROPOSER_PAYLOAD_DELIVERED"
sinks:
metrics:
type: prometheus_exporter
Expand Down Expand Up @@ -889,4 +890,20 @@ sinks:
healthcheck:
enabled: true
encoding:
codec: json
codec: json
mev_relay_proposer_payload_delivered_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.mev_relay_proposer_payload_delivered
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: mev-relay-proposer-payload-delivered
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
109 changes: 106 additions & 3 deletions deploy/local/docker-compose/vector-kafka-clickhouse.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ transforms:
mempool_transaction_v2: .event.name == "MEMPOOL_TRANSACTION_V2"
mempool_transaction: .event.name == "MEMPOOL_TRANSACTION"
mev_relay_bid_trace_builder_block_submission: .event.name == "MEV_RELAY_BID_TRACE_BUILDER_BLOCK_SUBMISSION"
mev_relay_proposer_payload_delivered: .event.name == "MEV_RELAY_PROPOSER_PAYLOAD_DELIVERED"
xatu_server_events_router_matched:
type: log_to_metric
inputs:
Expand Down Expand Up @@ -394,6 +395,7 @@ transforms:
- xatu_server_events_router.mempool_transaction
- xatu_server_events_router.mempool_transaction_v2
- xatu_server_events_router.mev_relay_bid_trace_builder_block_submission
- xatu_server_events_router.mev_relay_proposer_payload_delivered
metrics:
- type: counter
field: event.name
Expand All @@ -414,6 +416,7 @@ transforms:
tags:
event: "{{event.name}}"
source: "xatu-kafka-clickhouse"

beacon_api_eth_v1_beacon_committee_formatted:
type: remap
inputs:
Expand Down Expand Up @@ -448,7 +451,6 @@ transforms:
log(., level: "error", rate_limit_secs: 60)
}
.updated_date_time = to_unix_timestamp(now())
del(.event)
del(.meta)
del(.data)
Expand Down Expand Up @@ -1620,7 +1622,7 @@ transforms:
del(.event)
del(.meta)
del(.data)
canonical_beacon_block_elatorated_attestation_formatted:
canonical_beacon_block_elaborated_attestation_formatted:
type: remap
inputs:
- xatu_server_events_router.canonical_beacon_block_elaborated_attestation
Expand Down Expand Up @@ -1837,6 +1839,87 @@ transforms:
del(.meta_consensus_implementation)
del(.meta_network_id)
del(.event)
del(.meta)
del(.data)
mev_relay_proposer_payload_delivered_formatted:
type: remap
inputs:
- xatu_server_events_router.mev_relay_proposer_payload_delivered
source: |-
event_date_time, err = parse_timestamp(.event.date_time, format: "%+");
if err == null {
.event_date_time = to_unix_timestamp(event_date_time, unit: "milliseconds")
} else {
.error = err
.error_description = "failed to parse event date time"
log(., level: "error", rate_limit_secs: 60)
}
.slot = .data.slot
.block_number = .data.block_number
.payload = .data.payload
slot_start_date_time, err = parse_timestamp(.meta.client.additional_data.slot.start_date_time, format: "%+");
if err == null {
.slot_start_date_time = to_unix_timestamp(slot_start_date_time)
} else {
.error = err
.error_description = "failed to parse slot start date time"
log(., level: "error", rate_limit_secs: 60)
}
.epoch = .meta.client.additional_data.epoch.number
epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.epoch.start_date_time, format: "%+");
if err == null {
.epoch_start_date_time = to_unix_timestamp(epoch_start_date_time)
} else {
.error = err
.error_description = "failed to parse epoch start date time"
log(., level: "error", rate_limit_secs: 60)
}
.wallclock_slot = .meta.client.additional_data.wallclock_slot.number
wallclock_slot_start_date_time, err = parse_timestamp(.meta.client.additional_data.wallclock_slot.start_date_time, format: "%+");
if err == null {
.wallclock_slot_start_date_time = to_unix_timestamp(wallclock_slot_start_date_time)
} else {
.error = err
.error_description = "failed to parse wallclock slot start date time"
log(., level: "error", rate_limit_secs: 60)
}
.wallclock_epoch = .meta.client.additional_data.wallclock_epoch.number
wallclock_epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.wallclock_epoch.start_date_time, format: "%+");
if err == null {
.wallclock_epoch_start_date_time = to_unix_timestamp(wallclock_epoch_start_date_time)
} else {
.error = err
.error_description = "failed to parse wallclock epoch start date time"
log(., level: "error", rate_limit_secs: 60)
}
.relay_name = .meta.client.additional_data.relay.name
.parent_hash = .data.parent_hash
.block_hash = .data.block_hash
.builder_pubkey = .data.builder_pubkey
.proposer_pubkey = .data.proposer_pubkey
.proposer_fee_recipient = .data.proposer_fee_recipient
.gas_limit = .data.gas_limit
.gas_used = .data.gas_used
.payload = .data.payload
.block_number = .data.block_number
.num_tx = .data.num_tx
.timestamp = .data.timestamp
.timestamp_ms = .data.timestamp_ms
.updated_date_time = to_unix_timestamp(now())
del(.meta_consensus_implementation)
del(.meta_network_id)
del(.event)
del(.meta)
del(.data)
Expand Down Expand Up @@ -2351,7 +2434,7 @@ sinks:
canonical_beacon_block_elaborated_attestation_clickhouse:
type: clickhouse
inputs:
- canonical_beacon_block_elatorated_attestation_formatted
- canonical_beacon_block_elaborated_attestation_formatted
database: default
endpoint: "${CLICKHOUSE_ENDPOINT}"
table: canonical_beacon_elaborated_attestation
Expand Down Expand Up @@ -2448,3 +2531,23 @@ sinks:
healthcheck:
enabled: true
skip_unknown_fields: true
mev_relay_proposer_payload_delivered_clickhouse:
type: clickhouse
inputs:
- mev_relay_proposer_payload_delivered_formatted
database: default
endpoint: "${CLICKHOUSE_ENDPOINT}"
table: mev_relay_proposer_payload_delivered
auth:
strategy: basic
user: "${CLICKHOUSE_USER}"
password: "${CLICKHOUSE_PASSWORD}"
batch:
max_bytes: 52428800
max_events: 200000
timeout_secs: 1
buffer:
max_events: 200000
healthcheck:
enabled: true
skip_unknown_fields: true
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
DROP TABLE IF EXISTS mev_relay_proposer_payload_delivered ON CLUSTER '{cluster}' SYNC;
DROP TABLE IF EXISTS mev_relay_proposer_payload_delivered_local ON CLUSTER '{cluster}' SYNC;
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
CREATE TABLE default.mev_relay_proposer_payload_delivered_local ON CLUSTER '{cluster}' (
`updated_date_time` DateTime COMMENT 'Timestamp when the record was last updated' CODEC(DoubleDelta, ZSTD(1)),
`event_date_time` DateTime64(3) COMMENT 'When the payload was delivered' CODEC(DoubleDelta, ZSTD(1)),
`slot` UInt32 COMMENT 'Slot number within the payload' CODEC(DoubleDelta, ZSTD(1)),
`slot_start_date_time` DateTime COMMENT 'The start time for the slot that the bid is for' CODEC(DoubleDelta, ZSTD(1)),
`epoch` UInt32 COMMENT 'Epoch number derived from the slot that the bid is for' CODEC(DoubleDelta, ZSTD(1)),
`epoch_start_date_time` DateTime COMMENT 'The start time for the epoch that the bid is for' CODEC(DoubleDelta, ZSTD(1)),
`wallclock_slot` UInt32 COMMENT 'The wallclock slot when the request was sent' CODEC(DoubleDelta, ZSTD(1)),
`wallclock_slot_start_date_time` DateTime COMMENT 'The start time for the slot when the request was sent' CODEC(DoubleDelta, ZSTD(1)),
`wallclock_epoch` UInt32 COMMENT 'The wallclock epoch when the request was sent' CODEC(DoubleDelta, ZSTD(1)),
`wallclock_epoch_start_date_time` DateTime COMMENT 'The start time for the wallclock epoch when the request was sent' CODEC(DoubleDelta, ZSTD(1)),
`block_number` UInt64 COMMENT 'The block number of the payload' CODEC(DoubleDelta, ZSTD(1)),
`relay_name` String COMMENT 'The relay that delivered the payload' CODEC(ZSTD(1)),
`block_hash` FixedString(66) COMMENT 'The block hash associated with the payload' CODEC(ZSTD(1)),
`proposer_pubkey` String COMMENT 'The proposer pubkey that received the payload' CODEC(ZSTD(1)),
`builder_pubkey` String COMMENT 'The builder pubkey that sent the payload' CODEC(ZSTD(1)),
`proposer_fee_recipient` FixedString(42) COMMENT 'The proposer fee recipient of the payload' CODEC(ZSTD(1)),
`gas_limit` UInt64 COMMENT 'The gas limit of the payload' CODEC(DoubleDelta, ZSTD(1)),
`gas_used` UInt64 COMMENT 'The gas used by the payload' CODEC(DoubleDelta, ZSTD(1)),
`num_tx` UInt32 COMMENT 'The number of transactions in the payload' CODEC(DoubleDelta, ZSTD(1)),
`meta_client_name` LowCardinality(String) COMMENT 'Name of the client that generated the event',
`meta_client_id` String COMMENT 'Unique Session ID of the client that generated the event. This changes every time the client is restarted.' CODEC(ZSTD(1)),
`meta_client_version` LowCardinality(String) COMMENT 'Version of the client that generated the event',
`meta_client_implementation` LowCardinality(String) COMMENT 'Implementation of the client that generated the event',
`meta_client_os` LowCardinality(String) COMMENT 'Operating system of the client that generated the event',
`meta_client_ip` Nullable(IPv6) COMMENT 'IP address of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_city` LowCardinality(String) COMMENT 'City of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_country` LowCardinality(String) COMMENT 'Country of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_country_code` LowCardinality(String) COMMENT 'Country code of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_continent_code` LowCardinality(String) COMMENT 'Continent code of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_longitude` Nullable(Float64) COMMENT 'Longitude of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_latitude` Nullable(Float64) COMMENT 'Latitude of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_autonomous_system_number` Nullable(UInt32) COMMENT 'Autonomous system number of the client that generated the event' CODEC(ZSTD(1)),
`meta_client_geo_autonomous_system_organization` Nullable(String) COMMENT 'Autonomous system organization of the client that generated the event' CODEC(ZSTD(1)),
`meta_network_name` LowCardinality(String) COMMENT 'Ethereum network name',
`meta_labels` Map(String, String) COMMENT 'Labels associated with the event' CODEC(ZSTD(1))
) ENGINE = ReplicatedReplacingMergeTree(
'/clickhouse/{installation}/{cluster}/{database}/tables/{table}/{shard}',
'{replica}',
updated_date_time
) PARTITION BY toStartOfMonth(slot_start_date_time)
ORDER BY
(
slot_start_date_time,
meta_network_name,
relay_name,
block_hash,
meta_client_name,
builder_pubkey,
proposer_pubkey,
) COMMENT 'Contains MEV relay proposer payload delivered data.';

CREATE TABLE default.mev_relay_proposer_payload_delivered ON CLUSTER '{cluster}' AS default.mev_relay_proposer_payload_delivered_local ENGINE = Distributed(
'{cluster}',
default,
mev_relay_proposer_payload_delivered_local,
cityHash64(
slot,
meta_network_name
)
);
3 changes: 2 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ services:
"libp2p-trace-gossipsub-blob-sidecar"
"beacon-api-eth-v1-beacon-validators"
"mev-relay-bid-trace-builder-block-submission"
"mev-relay-proposer-payload-delivered"
)
for topic in "$${topics[@]}"; do
echo "Creating topic: $$topic";
Expand Down Expand Up @@ -514,7 +515,7 @@ services:
image: nginx:1.27.1-bookworm
container_name: xatu-nginx
ports:
- "${NGINX_ADDRESS:-127.0.0.1}:${NGINX_PORT:-80}:80"
- "${NGINX_ADDRESS:-127.0.0.1}:${NGINX_PORT:-8044}:80"
environment:
- BASE_HOSTNAME=${BASE_HOSTNAME:-example.com}
volumes:
Expand Down
Loading

0 comments on commit e4112e1

Please sign in to comment.