Skip to content

Commit

Permalink
feat(cl-mimicry): Add Gossipsub_beacon_block events (#315)
Browse files Browse the repository at this point in the history
* feat(cl-mimicry): Add Gossipsub_beacon_block events

* feat: Add script to drop tables in ClickHouse

* fix: Comment before distributed table

* Merge master

* style: Remove unnecessary empty line in constructor function
  • Loading branch information
samcm authored Apr 26, 2024
1 parent d5d1f6b commit f1f7211
Show file tree
Hide file tree
Showing 24 changed files with 3,753 additions and 1,946 deletions.
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,6 @@ proto:
protoc --proto_path=./ --go_opt=module=github.com/ethpandaops/xatu/pkg/proto/eth/v2 --go_out=./pkg/proto/eth/v2/ pkg/proto/eth/v2/*.proto
protoc --proto_path=./ --proto_path=./pkg/proto/eth/v1 --proto_path=./pkg/proto/eth/v2 --go_opt=module=github.com/ethpandaops/xatu/pkg/proto/xatu --go-grpc_out=. --go-grpc_opt=paths=source_relative --go_out=./pkg/proto/xatu pkg/proto/xatu/*.proto
protoc --proto_path=./ --go_opt=module=github.com/ethpandaops/xatu/pkg/proto/blockprint --go_out=./pkg/proto/blockprint pkg/proto/blockprint/*.proto
protoc --proto_path=./ --go_opt=module=github.com/ethpandaops/xatu/pkg/proto/libp2p --go_out=./pkg/proto/libp2p pkg/proto/libp2p/*.proto
protoc --proto_path=./ --go_opt=module=github.com/ethpandaops/xatu/pkg/proto/libp2p --go_out=./pkg/proto/libp2p pkg/proto/libp2p/*.proto
protoc --proto_path=./ --go_opt=module=github.com/ethpandaops/xatu/pkg/proto/libp2p/gossipsub --go_out=./pkg/proto/libp2p/gossipsub pkg/proto/libp2p/gossipsub/*.proto

17 changes: 17 additions & 0 deletions deploy/local/docker-compose/vector-http-kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ transforms:
libp2p_trace_join: .event.name == "LIBP2P_TRACE_JOIN"
libp2p_trace_handle_metadata: .event.name == "LIBP2P_TRACE_HANDLE_METADATA"
libp2p_trace_handle_status: .event.name == "LIBP2P_TRACE_HANDLE_STATUS"
libp2p_trace_gossipsub_beacon_block : .event.name == "LIBP2P_TRACE_GOSSIPSUB_BEACON_BLOCK"
sinks:
metrics:
type: prometheus_exporter
Expand Down Expand Up @@ -822,3 +823,19 @@ sinks:
enabled: true
encoding:
codec: json
libp2p_trace_gossipsub_beacon_block_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.libp2p_trace_gossipsub_beacon_block
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: libp2p-trace-gossipsub-beacon-block
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
117 changes: 117 additions & 0 deletions deploy/local/docker-compose/vector-kafka-clickhouse.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ transforms:
libp2p_trace_join: .event.name == "LIBP2P_TRACE_JOIN"
libp2p_trace_handle_metadata: .event.name == "LIBP2P_TRACE_HANDLE_METADATA"
libp2p_trace_handle_status: .event.name == "LIBP2P_TRACE_HANDLE_STATUS"
libp2p_trace_gossipsub_beacon_block: .event.name == "LIBP2P_TRACE_GOSSIPSUB_BEACON_BLOCK"
xatu_server_events_router_matched:
type: log_to_metric
inputs:
Expand Down Expand Up @@ -406,6 +407,7 @@ transforms:
- xatu_server_events_router.libp2p_trace_join
- xatu_server_events_router.libp2p_trace_handle_metadata
- xatu_server_events_router.libp2p_trace_handle_status
- xatu_server_events_router.libp2p_trace_gossipsub_beacon_block
metrics:
- type: counter
field: event.name
Expand Down Expand Up @@ -2086,6 +2088,101 @@ transforms:
del(.meta)
del(.data)
del(.path)
libp2p_trace_gossipsub_beacon_block_formatted:
type: remap
inputs:
- xatu_server_events_router.libp2p_trace_gossipsub_beacon_block
source: |-
event_date_time, err = parse_timestamp(.event.date_time, format: "%+");
if err == null {
.event_date_time = to_unix_timestamp(event_date_time, unit: "milliseconds")
} else {
.error = err
.error_description = "failed to parse event date time"
log(., level: "error", rate_limit_secs: 60)
}
peer_id_key, err = .meta.client.additional_data.metadata.peer_id + .meta.ethereum.network.name
if err != null {
.error = err
.error_description = "failed to generate peer id unique key"
log(., level: "error", rate_limit_secs: 60)
}
.peer_id_unique_key = seahash(peer_id_key)
.unique_key = seahash(.event.id)
.proposer_index = .data.proposer_index
.propagation_slot_start_diff = .meta.client.additional_data.propagation.slot_start_diff
.block = .data.block
.slot = .data.slot
slot_start_date_time, err = parse_timestamp(.meta.client.additional_data.slot.start_date_time, format: "%+");
if err == null {
.slot_start_date_time = to_unix_timestamp(slot_start_date_time)
} else {
.error = err
.error_description = "failed to parse slot start date time"
log(., level: "error", rate_limit_secs: 60)
}
.epoch = .meta.client.additional_data.epoch.number
epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.epoch.start_date_time, format: "%+");
if err == null {
.epoch_start_date_time = to_unix_timestamp(epoch_start_date_time)
} else {
.error = err
.error_description = "failed to parse epoch start date time"
log(., level: "error", rate_limit_secs: 60)
}
.wallclock_slot = .meta.client.additional_data.wallclock_slot.number
wallclock_slot_start_date_time, err = parse_timestamp(.meta.client.additional_data.wallclock_slot.start_date_time, format: "%+");
if err == null {
.wallclock_slot_start_date_time = to_unix_timestamp(wallclock_slot_start_date_time)
} else {
.error = err
.error_description = "failed to parse wallclock slot start date time"
log(., level: "error", rate_limit_secs: 60)
}
.wallclock_epoch = .meta.client.additional_data.epoch.number
wallclock_epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.wallclock_epoch.start_date_time, format: "%+");
if err == null {
.wallclock_epoch_start_date_time = to_unix_timestamp(wallclock_epoch_start_date_time)
} else {
.error = err
.error_description = "failed to parse wallclock epoch start date time"
log(., level: "error", rate_limit_secs: 60)
}
topicParts, err = split(.meta.client.additional_data.topic, "/")
if err != null {
.error = err
.error_description = "failed to split topic"
} else {
if length(topicParts) != 5 {
errDebug = {
"topic": .meta.client.additional_data.topic,
}
.error_description = "failed to split topic"
}
}
.topic_layer = topicParts[1]
.topic_fork_digest_value = topicParts[2]
.topic_name = topicParts[3]
.topic_encoding = topicParts[4]
.message_size = .meta.client.additional_data.message_size
.message_id = .meta.client.additional_data.message_id
.updated_date_time = to_unix_timestamp(now())
del(.event)
del(.meta)
del(.data)
del(.path)
libp2p_trace_rpc_exploder:
type: remap
inputs:
Expand Down Expand Up @@ -3511,3 +3608,23 @@ sinks:
healthcheck:
enabled: true
skip_unknown_fields: false
libp2p_trace_gossipsub_beacon_block_clickhouse:
type: clickhouse
inputs:
- libp2p_trace_gossipsub_beacon_block_formatted
database: default
endpoint: "${CLICKHOUSE_ENDPOINT}"
table: libp2p_gossipsub_beacon_block
auth:
strategy: basic
user: "${CLICKHOUSE_USER}"
password: "${CLICKHOUSE_PASSWORD}"
batch:
max_bytes: 52428800
max_events: 200000
timeout_secs: 1
buffer:
max_events: 200000
healthcheck:
enabled: true
skip_unknown_fields: false
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
DROP TABLE IF EXISTS libp2p_gossipsub_beacon_block ON CLUSTER '{cluster}';
DROP TABLE IF EXISTS libp2p_gossipsub_beacon_block_local ON CLUSTER '{cluster}';
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
CREATE TABLE libp2p_gossipsub_beacon_block_local on cluster '{cluster}' (
event_date_time DateTime64(3) Codec(DoubleDelta, ZSTD(1)),
slot UInt32 Codec(DoubleDelta, ZSTD(1)),
slot_start_date_time DateTime Codec(DoubleDelta, ZSTD(1)),
epoch UInt32 Codec(DoubleDelta, ZSTD(1)),
epoch_start_date_time DateTime Codec(DoubleDelta, ZSTD(1)),
wallclock_slot UInt32 Codec(DoubleDelta, ZSTD(1)),
wallclock_slot_start_date_time DateTime Codec(DoubleDelta, ZSTD(1)),
wallclock_epoch UInt32 Codec(DoubleDelta, ZSTD(1)),
wallclock_epoch_start_date_time DateTime Codec(DoubleDelta, ZSTD(1)),
propagation_slot_start_diff UInt32 Codec(ZSTD(1)),
block FixedString(66) Codec(ZSTD(1)),
proposer_index UInt32 CODEC(ZSTD(1)),
peer_id_unique_key Int64,
message_id String CODEC(ZSTD(1)),
message_size UInt32 Codec(ZSTD(1)),
topic_layer LowCardinality(String),
topic_fork_digest_value LowCardinality(String),
topic_name LowCardinality(String),
topic_encoding LowCardinality(String),
meta_client_name LowCardinality(String),
meta_client_id String Codec(ZSTD(1)),
meta_client_version LowCardinality(String),
meta_client_implementation LowCardinality(String),
meta_client_os LowCardinality(String),
meta_client_ip Nullable(IPv6) Codec(ZSTD(1)),
meta_client_geo_city LowCardinality(String) Codec(ZSTD(1)),
meta_client_geo_country LowCardinality(String) Codec(ZSTD(1)),
meta_client_geo_country_code LowCardinality(String) Codec(ZSTD(1)),
meta_client_geo_continent_code LowCardinality(String) Codec(ZSTD(1)),
meta_client_geo_longitude Nullable(Float64) Codec(ZSTD(1)),
meta_client_geo_latitude Nullable(Float64) Codec(ZSTD(1)),
meta_client_geo_autonomous_system_number Nullable(UInt32) Codec(ZSTD(1)),
meta_client_geo_autonomous_system_organization Nullable(String) Codec(ZSTD(1)),
meta_network_id Int32 Codec(DoubleDelta, ZSTD(1)),
meta_network_name LowCardinality(String)
) Engine = ReplicatedMergeTree('/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}', '{replica}')
PARTITION BY toStartOfMonth(slot_start_date_time)
ORDER BY (slot_start_date_time, meta_network_name, meta_client_name);

ALTER TABLE libp2p_gossipsub_beacon_block_local ON CLUSTER '{cluster}'
MODIFY COMMENT 'Table for libp2p gossipsub beacon block data.',
COMMENT COLUMN event_date_time 'Timestamp of the event with millisecond precision',
COMMENT COLUMN slot 'Slot number associated with the event',
COMMENT COLUMN slot_start_date_time 'Start date and time of the slot',
COMMENT COLUMN epoch 'Epoch number associated with the event',
COMMENT COLUMN epoch_start_date_time 'Start date and time of the epoch',
COMMENT COLUMN wallclock_slot 'Slot number of the wall clock when the event was received',
COMMENT COLUMN wallclock_slot_start_date_time 'Start date and time of the wall clock slot when the event was received',
COMMENT COLUMN wallclock_epoch 'Epoch number of the wall clock when the event was received',
COMMENT COLUMN wallclock_epoch_start_date_time 'Start date and time of the wall clock epoch when the event was received',
COMMENT COLUMN propagation_slot_start_diff 'Difference in slot start time for propagation',
COMMENT COLUMN proposer_index 'The proposer index of the beacon block',
COMMENT COLUMN block 'The beacon block root hash',
COMMENT COLUMN peer_id_unique_key 'Unique key associated with the identifier of the peer',
COMMENT COLUMN message_id 'Identifier of the message',
COMMENT COLUMN message_size 'Size of the message in bytes',
COMMENT COLUMN topic_layer 'Layer of the topic in the gossipsub protocol',
COMMENT COLUMN topic_fork_digest_value 'Fork digest value of the topic',
COMMENT COLUMN topic_name 'Name of the topic',
COMMENT COLUMN topic_encoding 'Encoding used for the topic',
COMMENT COLUMN meta_client_name 'Name of the client that generated the event',
COMMENT COLUMN meta_client_id 'Unique Session ID of the client that generated the event. This changes every time the client is restarted.',
COMMENT COLUMN meta_client_version 'Version of the client that generated the event',
COMMENT COLUMN meta_client_implementation 'Implementation of the client that generated the event',
COMMENT COLUMN meta_client_os 'Operating system of the client that generated the event',
COMMENT COLUMN meta_client_ip 'IP address of the client that generated the event',
COMMENT COLUMN meta_client_geo_city 'City of the client that generated the event',
COMMENT COLUMN meta_client_geo_country 'Country of the client that generated the event',
COMMENT COLUMN meta_client_geo_country_code 'Country code of the client that generated the event',
COMMENT COLUMN meta_client_geo_continent_code 'Continent code of the client that generated the event',
COMMENT COLUMN meta_client_geo_longitude 'Longitude of the client that generated the event',
COMMENT COLUMN meta_client_geo_latitude 'Latitude of the client that generated the event',
COMMENT COLUMN meta_client_geo_autonomous_system_number 'Autonomous system number of the client that generated the event',
COMMENT COLUMN meta_client_geo_autonomous_system_organization 'Autonomous system organization of the client that generated the event',
COMMENT COLUMN meta_network_id 'Network ID associated with the client',
COMMENT COLUMN meta_network_name 'Name of the network associated with the client';

CREATE TABLE libp2p_gossipsub_beacon_block on cluster '{cluster}' AS libp2p_gossipsub_beacon_block_local
ENGINE = Distributed('{cluster}', default, libp2p_gossipsub_beacon_block_local, rand());
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ services:
"libp2p-trace-join"
"libp2p-trace-handle-metadata"
"libp2p-trace-handle-status"
"libp2p-trace-gossipsub-beacon-block"
)
for topic in "$${topics[@]}"; do
echo "Creating topic: $$topic";
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module github.com/ethpandaops/xatu

go 1.22.0

replace github.com/probe-lab/hermes => github.com/ethpandaops/hermes v0.0.0-20240422024036-5bcb536550d4
replace github.com/probe-lab/hermes => github.com/ethpandaops/hermes v0.0.0-20240424055612-d87881163f3b

require (
github.com/IBM/sarama v1.43.0
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -262,14 +262,12 @@ github.com/ethereum/go-ethereum v1.13.14 h1:EwiY3FZP94derMCIam1iW4HFVrSgIcpsu0Hw
github.com/ethereum/go-ethereum v1.13.14/go.mod h1:TN8ZiHrdJwSe8Cb6x+p0hs5CxhJZPbqB7hHkaUXcmIU=
github.com/ethpandaops/beacon v0.35.0 h1:ZkHfxm41N0wkv503Xdb6rFxLuEnIonClUQWUPFHS5VU=
github.com/ethpandaops/beacon v0.35.0/go.mod h1:B+SLxj1gnDd/Ia7cl/uuhzo1wyVf2p2puL6lmzPdPro=
github.com/ethpandaops/ethcore v0.0.0-20240306031202-16f9e1926c0c h1:B/Auy5g6Nv4Pd8R9W9dC1IpysO7LzEmqb2C4iYM4j2A=
github.com/ethpandaops/ethcore v0.0.0-20240306031202-16f9e1926c0c/go.mod h1:ZvKqL6CKxiraefdXPHeJurV2pDD/f2HF2uklDVdrry8=
github.com/ethpandaops/ethcore v0.0.0-20240422023000-2a5727b18756 h1:8JWjrRfP14m0oxOk03m11n/xgdY5ceyUf/ZxYdOs5gE=
github.com/ethpandaops/ethcore v0.0.0-20240422023000-2a5727b18756/go.mod h1:ZvKqL6CKxiraefdXPHeJurV2pDD/f2HF2uklDVdrry8=
github.com/ethpandaops/ethwallclock v0.3.0 h1:xF5fwtBf+bHFHZKBnwiPFEuelW3sMM7SD3ZNFq1lJY4=
github.com/ethpandaops/ethwallclock v0.3.0/go.mod h1:y0Cu+mhGLlem19vnAV2x0hpFS5KZ7oOi2SWYayv9l24=
github.com/ethpandaops/hermes v0.0.0-20240422024036-5bcb536550d4 h1:y3MoajkTvlaNZFfEjGzI3U/EhBetpzDlntrEJjISPfU=
github.com/ethpandaops/hermes v0.0.0-20240422024036-5bcb536550d4/go.mod h1:WOtvdP1zl53vuoOX6PRZ9oAbbJUiDso50efmAjoUZzI=
github.com/ethpandaops/hermes v0.0.0-20240424055612-d87881163f3b h1:Iigwjof7KReOiEPHRAl3KWXFYDkVBN3GjxC8qwzdfwM=
github.com/ethpandaops/hermes v0.0.0-20240424055612-d87881163f3b/go.mod h1:WOtvdP1zl53vuoOX6PRZ9oAbbJUiDso50efmAjoUZzI=
github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
Expand Down
Loading

0 comments on commit f1f7211

Please sign in to comment.