Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cl-mimicry): Add Gossipsub_beacon_block events #315

Merged
merged 6 commits into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,6 @@ proto:
protoc --proto_path=./ --go_opt=module=github.com/ethpandaops/xatu/pkg/proto/eth/v2 --go_out=./pkg/proto/eth/v2/ pkg/proto/eth/v2/*.proto
protoc --proto_path=./ --proto_path=./pkg/proto/eth/v1 --proto_path=./pkg/proto/eth/v2 --go_opt=module=github.com/ethpandaops/xatu/pkg/proto/xatu --go-grpc_out=. --go-grpc_opt=paths=source_relative --go_out=./pkg/proto/xatu pkg/proto/xatu/*.proto
protoc --proto_path=./ --go_opt=module=github.com/ethpandaops/xatu/pkg/proto/blockprint --go_out=./pkg/proto/blockprint pkg/proto/blockprint/*.proto
protoc --proto_path=./ --go_opt=module=github.com/ethpandaops/xatu/pkg/proto/libp2p --go_out=./pkg/proto/libp2p pkg/proto/libp2p/*.proto
protoc --proto_path=./ --go_opt=module=github.com/ethpandaops/xatu/pkg/proto/libp2p --go_out=./pkg/proto/libp2p pkg/proto/libp2p/*.proto
protoc --proto_path=./ --go_opt=module=github.com/ethpandaops/xatu/pkg/proto/libp2p/gossipsub --go_out=./pkg/proto/libp2p/gossipsub pkg/proto/libp2p/gossipsub/*.proto

17 changes: 17 additions & 0 deletions deploy/local/docker-compose/vector-http-kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ transforms:
libp2p_trace_join: .event.name == "LIBP2P_TRACE_JOIN"
libp2p_trace_handle_metadata: .event.name == "LIBP2P_TRACE_HANDLE_METADATA"
libp2p_trace_handle_status: .event.name == "LIBP2P_TRACE_HANDLE_STATUS"
libp2p_trace_gossipsub_beacon_block : .event.name == "LIBP2P_TRACE_GOSSIPSUB_BEACON_BLOCK"
sinks:
metrics:
type: prometheus_exporter
Expand Down Expand Up @@ -822,3 +823,19 @@ sinks:
enabled: true
encoding:
codec: json
libp2p_trace_gossipsub_beacon_block_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.libp2p_trace_gossipsub_beacon_block
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: libp2p-trace-gossipsub-beacon-block
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
117 changes: 117 additions & 0 deletions deploy/local/docker-compose/vector-kafka-clickhouse.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ transforms:
libp2p_trace_join: .event.name == "LIBP2P_TRACE_JOIN"
libp2p_trace_handle_metadata: .event.name == "LIBP2P_TRACE_HANDLE_METADATA"
libp2p_trace_handle_status: .event.name == "LIBP2P_TRACE_HANDLE_STATUS"
libp2p_trace_gossipsub_beacon_block: .event.name == "LIBP2P_TRACE_GOSSIPSUB_BEACON_BLOCK"
xatu_server_events_router_matched:
type: log_to_metric
inputs:
Expand Down Expand Up @@ -406,6 +407,7 @@ transforms:
- xatu_server_events_router.libp2p_trace_join
- xatu_server_events_router.libp2p_trace_handle_metadata
- xatu_server_events_router.libp2p_trace_handle_status
- xatu_server_events_router.libp2p_trace_gossipsub_beacon_block
metrics:
- type: counter
field: event.name
Expand Down Expand Up @@ -2086,6 +2088,101 @@ transforms:
del(.meta)
del(.data)
del(.path)

libp2p_trace_gossipsub_beacon_block_formatted:
type: remap
inputs:
- xatu_server_events_router.libp2p_trace_gossipsub_beacon_block
source: |-
event_date_time, err = parse_timestamp(.event.date_time, format: "%+");
if err == null {
.event_date_time = to_unix_timestamp(event_date_time, unit: "milliseconds")
} else {
.error = err
.error_description = "failed to parse event date time"
log(., level: "error", rate_limit_secs: 60)
}

peer_id_key, err = .meta.client.additional_data.metadata.peer_id + .meta.ethereum.network.name
if err != null {
.error = err
.error_description = "failed to generate peer id unique key"
log(., level: "error", rate_limit_secs: 60)
}
.peer_id_unique_key = seahash(peer_id_key)
.unique_key = seahash(.event.id)

.proposer_index = .data.proposer_index


.propagation_slot_start_diff = .meta.client.additional_data.propagation.slot_start_diff
.block = .data.block

.slot = .data.slot
slot_start_date_time, err = parse_timestamp(.meta.client.additional_data.slot.start_date_time, format: "%+");
if err == null {
.slot_start_date_time = to_unix_timestamp(slot_start_date_time)
} else {
.error = err
.error_description = "failed to parse slot start date time"
log(., level: "error", rate_limit_secs: 60)
}
.epoch = .meta.client.additional_data.epoch.number
epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.epoch.start_date_time, format: "%+");
if err == null {
.epoch_start_date_time = to_unix_timestamp(epoch_start_date_time)
} else {
.error = err
.error_description = "failed to parse epoch start date time"
log(., level: "error", rate_limit_secs: 60)
}

.wallclock_slot = .meta.client.additional_data.wallclock_slot.number
wallclock_slot_start_date_time, err = parse_timestamp(.meta.client.additional_data.wallclock_slot.start_date_time, format: "%+");
if err == null {
.wallclock_slot_start_date_time = to_unix_timestamp(wallclock_slot_start_date_time)
} else {
.error = err
.error_description = "failed to parse wallclock slot start date time"
log(., level: "error", rate_limit_secs: 60)
}
.wallclock_epoch = .meta.client.additional_data.epoch.number
wallclock_epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.wallclock_epoch.start_date_time, format: "%+");
if err == null {
.wallclock_epoch_start_date_time = to_unix_timestamp(wallclock_epoch_start_date_time)
} else {
.error = err
.error_description = "failed to parse wallclock epoch start date time"
log(., level: "error", rate_limit_secs: 60)
}

topicParts, err = split(.meta.client.additional_data.topic, "/")
if err != null {
.error = err
.error_description = "failed to split topic"
} else {
if length(topicParts) != 5 {
errDebug = {
"topic": .meta.client.additional_data.topic,
}
.error_description = "failed to split topic"
}
}

.topic_layer = topicParts[1]
.topic_fork_digest_value = topicParts[2]
.topic_name = topicParts[3]
.topic_encoding = topicParts[4]

.message_size = .meta.client.additional_data.message_size
.message_id = .meta.client.additional_data.message_id

.updated_date_time = to_unix_timestamp(now())

del(.event)
del(.meta)
del(.data)
del(.path)
libp2p_trace_rpc_exploder:
type: remap
inputs:
Expand Down Expand Up @@ -3511,3 +3608,23 @@ sinks:
healthcheck:
enabled: true
skip_unknown_fields: false
libp2p_trace_gossipsub_beacon_block_clickhouse:
type: clickhouse
inputs:
- libp2p_trace_gossipsub_beacon_block_formatted
database: default
endpoint: "${CLICKHOUSE_ENDPOINT}"
table: libp2p_gossipsub_beacon_block
auth:
strategy: basic
user: "${CLICKHOUSE_USER}"
password: "${CLICKHOUSE_PASSWORD}"
batch:
max_bytes: 52428800
max_events: 200000
timeout_secs: 1
buffer:
max_events: 200000
healthcheck:
enabled: true
skip_unknown_fields: false
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
DROP TABLE IF EXISTS libp2p_gossipsub_beacon_block ON CLUSTER '{cluster}';
DROP TABLE IF EXISTS libp2p_gossipsub_beacon_block_local ON CLUSTER '{cluster}';
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
CREATE TABLE libp2p_gossipsub_beacon_block_local on cluster '{cluster}' (
event_date_time DateTime64(3) Codec(DoubleDelta, ZSTD(1)),
slot UInt32 Codec(DoubleDelta, ZSTD(1)),
slot_start_date_time DateTime Codec(DoubleDelta, ZSTD(1)),
epoch UInt32 Codec(DoubleDelta, ZSTD(1)),
epoch_start_date_time DateTime Codec(DoubleDelta, ZSTD(1)),
wallclock_slot UInt32 Codec(DoubleDelta, ZSTD(1)),
wallclock_slot_start_date_time DateTime Codec(DoubleDelta, ZSTD(1)),
wallclock_epoch UInt32 Codec(DoubleDelta, ZSTD(1)),
wallclock_epoch_start_date_time DateTime Codec(DoubleDelta, ZSTD(1)),
propagation_slot_start_diff UInt32 Codec(ZSTD(1)),
block FixedString(66) Codec(ZSTD(1)),
proposer_index UInt32 CODEC(ZSTD(1)),
peer_id_unique_key Int64,
message_id String CODEC(ZSTD(1)),
message_size UInt32 Codec(ZSTD(1)),
topic_layer LowCardinality(String),
topic_fork_digest_value LowCardinality(String),
topic_name LowCardinality(String),
topic_encoding LowCardinality(String),
meta_client_name LowCardinality(String),
meta_client_id String Codec(ZSTD(1)),
meta_client_version LowCardinality(String),
meta_client_implementation LowCardinality(String),
meta_client_os LowCardinality(String),
meta_client_ip Nullable(IPv6) Codec(ZSTD(1)),
meta_client_geo_city LowCardinality(String) Codec(ZSTD(1)),
meta_client_geo_country LowCardinality(String) Codec(ZSTD(1)),
meta_client_geo_country_code LowCardinality(String) Codec(ZSTD(1)),
meta_client_geo_continent_code LowCardinality(String) Codec(ZSTD(1)),
meta_client_geo_longitude Nullable(Float64) Codec(ZSTD(1)),
meta_client_geo_latitude Nullable(Float64) Codec(ZSTD(1)),
meta_client_geo_autonomous_system_number Nullable(UInt32) Codec(ZSTD(1)),
meta_client_geo_autonomous_system_organization Nullable(String) Codec(ZSTD(1)),
meta_network_id Int32 Codec(DoubleDelta, ZSTD(1)),
meta_network_name LowCardinality(String)
) Engine = ReplicatedMergeTree('/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}', '{replica}')
PARTITION BY toStartOfMonth(slot_start_date_time)
ORDER BY (slot_start_date_time, meta_network_name, meta_client_name);

ALTER TABLE libp2p_gossipsub_beacon_block_local ON CLUSTER '{cluster}'
samcm marked this conversation as resolved.
Show resolved Hide resolved
MODIFY COMMENT 'Table for libp2p gossipsub beacon block data.',
COMMENT COLUMN event_date_time 'Timestamp of the event with millisecond precision',
COMMENT COLUMN slot 'Slot number associated with the event',
COMMENT COLUMN slot_start_date_time 'Start date and time of the slot',
COMMENT COLUMN epoch 'Epoch number associated with the event',
COMMENT COLUMN epoch_start_date_time 'Start date and time of the epoch',
COMMENT COLUMN wallclock_slot 'Slot number of the wall clock when the event was received',
COMMENT COLUMN wallclock_slot_start_date_time 'Start date and time of the wall clock slot when the event was received',
COMMENT COLUMN wallclock_epoch 'Epoch number of the wall clock when the event was received',
COMMENT COLUMN wallclock_epoch_start_date_time 'Start date and time of the wall clock epoch when the event was received',
COMMENT COLUMN propagation_slot_start_diff 'Difference in slot start time for propagation',
COMMENT COLUMN proposer_index 'The proposer index of the beacon block',
COMMENT COLUMN block 'The beacon block root hash',
COMMENT COLUMN peer_id_unique_key 'Unique key associated with the identifier of the peer',
COMMENT COLUMN message_id 'Identifier of the message',
COMMENT COLUMN message_size 'Size of the message in bytes',
COMMENT COLUMN topic_layer 'Layer of the topic in the gossipsub protocol',
COMMENT COLUMN topic_fork_digest_value 'Fork digest value of the topic',
COMMENT COLUMN topic_name 'Name of the topic',
COMMENT COLUMN topic_encoding 'Encoding used for the topic',
COMMENT COLUMN meta_client_name 'Name of the client that generated the event',
COMMENT COLUMN meta_client_id 'Unique Session ID of the client that generated the event. This changes every time the client is restarted.',
COMMENT COLUMN meta_client_version 'Version of the client that generated the event',
COMMENT COLUMN meta_client_implementation 'Implementation of the client that generated the event',
COMMENT COLUMN meta_client_os 'Operating system of the client that generated the event',
COMMENT COLUMN meta_client_ip 'IP address of the client that generated the event',
COMMENT COLUMN meta_client_geo_city 'City of the client that generated the event',
COMMENT COLUMN meta_client_geo_country 'Country of the client that generated the event',
COMMENT COLUMN meta_client_geo_country_code 'Country code of the client that generated the event',
COMMENT COLUMN meta_client_geo_continent_code 'Continent code of the client that generated the event',
COMMENT COLUMN meta_client_geo_longitude 'Longitude of the client that generated the event',
COMMENT COLUMN meta_client_geo_latitude 'Latitude of the client that generated the event',
COMMENT COLUMN meta_client_geo_autonomous_system_number 'Autonomous system number of the client that generated the event',
COMMENT COLUMN meta_client_geo_autonomous_system_organization 'Autonomous system organization of the client that generated the event',
COMMENT COLUMN meta_network_id 'Network ID associated with the client',
COMMENT COLUMN meta_network_name 'Name of the network associated with the client';

CREATE TABLE libp2p_gossipsub_beacon_block on cluster '{cluster}' AS libp2p_gossipsub_beacon_block_local
ENGINE = Distributed('{cluster}', default, libp2p_gossipsub_beacon_block_local, rand());
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ services:
"libp2p-trace-join"
"libp2p-trace-handle-metadata"
"libp2p-trace-handle-status"
"libp2p-trace-gossipsub-beacon-block"
)
for topic in "$${topics[@]}"; do
echo "Creating topic: $$topic";
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module github.com/ethpandaops/xatu

go 1.22.0

replace github.com/probe-lab/hermes => github.com/ethpandaops/hermes v0.0.0-20240422024036-5bcb536550d4
replace github.com/probe-lab/hermes => github.com/ethpandaops/hermes v0.0.0-20240424055612-d87881163f3b

require (
github.com/IBM/sarama v1.43.0
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -262,14 +262,12 @@ github.com/ethereum/go-ethereum v1.13.14 h1:EwiY3FZP94derMCIam1iW4HFVrSgIcpsu0Hw
github.com/ethereum/go-ethereum v1.13.14/go.mod h1:TN8ZiHrdJwSe8Cb6x+p0hs5CxhJZPbqB7hHkaUXcmIU=
github.com/ethpandaops/beacon v0.35.0 h1:ZkHfxm41N0wkv503Xdb6rFxLuEnIonClUQWUPFHS5VU=
github.com/ethpandaops/beacon v0.35.0/go.mod h1:B+SLxj1gnDd/Ia7cl/uuhzo1wyVf2p2puL6lmzPdPro=
github.com/ethpandaops/ethcore v0.0.0-20240306031202-16f9e1926c0c h1:B/Auy5g6Nv4Pd8R9W9dC1IpysO7LzEmqb2C4iYM4j2A=
github.com/ethpandaops/ethcore v0.0.0-20240306031202-16f9e1926c0c/go.mod h1:ZvKqL6CKxiraefdXPHeJurV2pDD/f2HF2uklDVdrry8=
github.com/ethpandaops/ethcore v0.0.0-20240422023000-2a5727b18756 h1:8JWjrRfP14m0oxOk03m11n/xgdY5ceyUf/ZxYdOs5gE=
github.com/ethpandaops/ethcore v0.0.0-20240422023000-2a5727b18756/go.mod h1:ZvKqL6CKxiraefdXPHeJurV2pDD/f2HF2uklDVdrry8=
github.com/ethpandaops/ethwallclock v0.3.0 h1:xF5fwtBf+bHFHZKBnwiPFEuelW3sMM7SD3ZNFq1lJY4=
github.com/ethpandaops/ethwallclock v0.3.0/go.mod h1:y0Cu+mhGLlem19vnAV2x0hpFS5KZ7oOi2SWYayv9l24=
github.com/ethpandaops/hermes v0.0.0-20240422024036-5bcb536550d4 h1:y3MoajkTvlaNZFfEjGzI3U/EhBetpzDlntrEJjISPfU=
github.com/ethpandaops/hermes v0.0.0-20240422024036-5bcb536550d4/go.mod h1:WOtvdP1zl53vuoOX6PRZ9oAbbJUiDso50efmAjoUZzI=
github.com/ethpandaops/hermes v0.0.0-20240424055612-d87881163f3b h1:Iigwjof7KReOiEPHRAl3KWXFYDkVBN3GjxC8qwzdfwM=
github.com/ethpandaops/hermes v0.0.0-20240424055612-d87881163f3b/go.mod h1:WOtvdP1zl53vuoOX6PRZ9oAbbJUiDso50efmAjoUZzI=
github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
Expand Down
Loading
Loading