Skip to content

Commit

Permalink
feat(cannon): Add BEACON_API_ETH_V1_BEACON_VALIDATORS
Browse files Browse the repository at this point in the history
  • Loading branch information
samcm committed Jun 7, 2024
1 parent 57d07e7 commit 786e0f9
Show file tree
Hide file tree
Showing 39 changed files with 3,889 additions and 2,661 deletions.
17 changes: 17 additions & 0 deletions deploy/local/docker-compose/vector-http-kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ transforms:
libp2p_trace_gossipsub_beacon_block: .event.name == "LIBP2P_TRACE_GOSSIPSUB_BEACON_BLOCK"
libp2p_trace_gossipsub_beacon_attestation: .event.name == "LIBP2P_TRACE_GOSSIPSUB_BEACON_ATTESTATION"
libp2p_trace_gossipsub_blob_sidecar: .event.name == "LIBP2P_TRACE_GOSSIPSUB_BLOB_SIDECAR"
beacon_api_eth_v1_beacon_validators: .event.name == "BEACON_API_ETH_V1_BEACON_VALIDATORS"
sinks:
metrics:
type: prometheus_exporter
Expand Down Expand Up @@ -873,3 +874,19 @@ sinks:
enabled: true
encoding:
codec: json
beacon_api_eth_v1_beacon_validators_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.beacon_api_eth_v1_beacon_validators
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: beacon-api-eth-v1-beacon-validators
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
65 changes: 65 additions & 0 deletions deploy/local/docker-compose/vector-kafka-clickhouse.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,18 @@ sources:
- "^beacon-api-eth-v1-proposer-.+"
librdkafka_options:
message.max.bytes: "10485760" # 10MB
beacon_api_eth_v1_beacon_validators_kafka:
type: kafka
bootstrap_servers: "${KAFKA_BROKERS}"
auto_offset_reset: earliest
group_id: xatu-vector-kafka-clickhouse-beacon-api-eth-v1-beacon-validators
key_field: "event.id"
decoding:
codec: json
topics:
- "beacon-api-eth-v1-beacon-validators"
librdkafka_options:
message.max.bytes: "10485760" # 10MB
transforms:
xatu_server_events_meta:
type: remap
Expand All @@ -122,6 +134,7 @@ transforms:
- beacon_api_eth_v1_beacon_blob_sidecar_kafka
- beacon_p2p_events_kafka
- beacon_api_eth_v1_proposer_kafka
- beacon_api_eth_v1_beacon_validators_kafka
source: |-
.meta_client_name = .meta.client.name
.meta_client_id = .meta.client.id
Expand Down Expand Up @@ -312,6 +325,7 @@ transforms:
canonical_beacon_block_execution_transaction: .event.name == "BEACON_API_ETH_V2_BEACON_BLOCK_EXECUTION_TRANSACTION"
canonical_beacon_block_proposer_slashing: .event.name == "BEACON_API_ETH_V2_BEACON_BLOCK_PROPOSER_SLASHING"
canonical_beacon_block_voluntary_exit: .event.name == "BEACON_API_ETH_V2_BEACON_BLOCK_VOLUNTARY_EXIT"
canonical_beacon_validators: .event.name == "BEACON_API_ETH_V1_BEACON_VALIDATORS"
canonical_beacon_block_withdrawal: .event.name == "BEACON_API_ETH_V2_BEACON_BLOCK_WITHDRAWAL"
canonical_beacon_block: .event.name == "BEACON_API_ETH_V2_BEACON_BLOCK_V2" && .meta.client.additional_data.finalized_when_requested == true
canonical_beacon_proposer_duty: .event.name == "BEACON_API_ETH_V1_PROPOSER_DUTY" && .meta.client.additional_data.state_id == "finalized"
Expand Down Expand Up @@ -343,6 +357,7 @@ transforms:
- xatu_server_events_router.beacon_p2p_attestation
- xatu_server_events_router.blockprint_block_classification
- xatu_server_events_router.canonical_beacon_blob_sidecar
- xatu_server_events_router.canonical_beacon_validators
- xatu_server_events_router.canonical_beacon_block
- xatu_server_events_router.canonical_beacon_block_attester_slashing
- xatu_server_events_router.canonical_beacon_block_elaborated_attestation
Expand Down Expand Up @@ -1455,6 +1470,56 @@ transforms:
del(.event)
del(.meta)
del(.data)
canonical_beacon_validators_formatted:
type: remap
inputs:
- xatu_server_events_router.canonical_beacon_validators
source: |-
event_date_time, err = parse_timestamp(.event.date_time, format: "%+");
if err == null {
.event_date_time = to_unix_timestamp(event_date_time, unit: "milliseconds")
} else {
.error = err
.error_description = "failed to parse event date time"
log(., level: "error", rate_limit_secs: 60)
}
.slot = .meta.client.additional_data.slot.number
slot_start_date_time, err = parse_timestamp(.meta.client.additional_data.slot.start_date_time, format: "%+");
if err == null {
.slot_start_date_time = to_unix_timestamp(slot_start_date_time)
} else {
.error = err
.error_description = "failed to parse slot start date time"
log(., level: "error", rate_limit_secs: 60)
}
.epoch = .meta.client.additional_data.epoch.number
epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.epoch.start_date_time, format: "%+");
if err == null {
.epoch_start_date_time = to_unix_timestamp(epoch_start_date_time)
} else {
.error = err
.error_description = "failed to parse epoch start date time"
log(., level: "error", rate_limit_secs: 60)
}
.block_root = .data.block_root
.block_parent_root = .data.block_parent_root
.versioned_hash = .meta.client.additional_data.versioned_hash
.kzg_commitment = .data.kzg_commitment
.kzg_proof = .data.kzg_proof
.proposer_index = .data.proposer_index
.blob_index = .data.index
.blob_size = .meta.client.additional_data.data_size
.blob_empty_size = .meta.client.additional_data.data_empty_size
key, err = .block_root + .versioned_hash + .blob_index
if err != null {
.error = err
.error_description = "failed to generate unique key"
}
.unique_key = seahash(key)
.updated_date_time = to_unix_timestamp(now())
del(.event)
del(.meta)
del(.data)
blockprint_block_classification_formatted:
type: remap
inputs:
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ services:
"libp2p-trace-gossipsub-beacon-block"
"libp2p-trace-gossipsub-beacon-attestation"
"libp2p-trace-gossipsub-blob-sidecar"
"beacon-api-eth-v1-beacon-validators"
)
for topic in "$${topics[@]}"; do
echo "Creating topic: $$topic";
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/chuckpreslar/emission v0.0.0-20170206194824-a7ddd980baf9
github.com/creasty/defaults v1.7.0
github.com/ethereum/go-ethereum v1.13.15
github.com/ethpandaops/beacon v0.35.0
github.com/ethpandaops/beacon v0.37.0
github.com/ethpandaops/ethcore v0.0.0-20240422023000-2a5727b18756
github.com/ethpandaops/ethwallclock v0.3.0
github.com/go-co-op/gocron v1.27.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,8 @@ github.com/ethereum/c-kzg-4844 v0.4.0 h1:3MS1s4JtA868KpJxroZoepdV0ZKBp3u/O5HcZ7R
github.com/ethereum/c-kzg-4844 v0.4.0/go.mod h1:VewdlzQmpT5QSrVhbBuGoCdFJkpaJlO1aQputP83wc0=
github.com/ethereum/go-ethereum v1.13.15 h1:U7sSGYGo4SPjP6iNIifNoyIAiNjrmQkz6EwQG+/EZWo=
github.com/ethereum/go-ethereum v1.13.15/go.mod h1:TN8ZiHrdJwSe8Cb6x+p0hs5CxhJZPbqB7hHkaUXcmIU=
github.com/ethpandaops/beacon v0.35.0 h1:ZkHfxm41N0wkv503Xdb6rFxLuEnIonClUQWUPFHS5VU=
github.com/ethpandaops/beacon v0.35.0/go.mod h1:B+SLxj1gnDd/Ia7cl/uuhzo1wyVf2p2puL6lmzPdPro=
github.com/ethpandaops/beacon v0.37.0 h1:T+F0IEjkSrevAbGA4zsqvqjnm4IRp+JKLsd8DyAO8ZQ=
github.com/ethpandaops/beacon v0.37.0/go.mod h1:B+SLxj1gnDd/Ia7cl/uuhzo1wyVf2p2puL6lmzPdPro=
github.com/ethpandaops/ethcore v0.0.0-20240422023000-2a5727b18756 h1:8JWjrRfP14m0oxOk03m11n/xgdY5ceyUf/ZxYdOs5gE=
github.com/ethpandaops/ethcore v0.0.0-20240422023000-2a5727b18756/go.mod h1:ZvKqL6CKxiraefdXPHeJurV2pDD/f2HF2uklDVdrry8=
github.com/ethpandaops/ethwallclock v0.3.0 h1:xF5fwtBf+bHFHZKBnwiPFEuelW3sMM7SD3ZNFq1lJY4=
Expand Down
17 changes: 17 additions & 0 deletions pkg/cannon/cannon.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,23 @@ func (c *Cannon) startBeaconBlockProcessor(ctx context.Context) error {
c.beacon,
clientMeta,
),
v1.NewBeaconValidatorsDeriver(
c.log,
&c.Config.Derivers.BeaconValidatorsConfig,
iterator.NewBackfillingCheckpoint(
c.log,
networkName,
networkID,
xatu.CannonType_BEACON_API_ETH_V1_BEACON_VALIDATORS,
c.coordinatorClient,
wallclock,
&backfillingCheckpointIteratorMetrics,
c.beacon,
finalizedCheckpoint,
),
c.beacon,
clientMeta,
),
}

c.eventDerivers = eventDerivers
Expand Down
Loading

0 comments on commit 786e0f9

Please sign in to comment.