Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cannon): split out validator pubkey & creds #333

Merged
merged 1 commit into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 149 additions & 4 deletions deploy/local/docker-compose/vector-kafka-clickhouse.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1499,7 +1499,6 @@ transforms:
for_each(array!(.data.validators)) -> |_index, validator| {
events = push(events, {
"key": .event.id,
"event_date_time": .event_date_time,
"updated_date_time": .updated_date_time,
"meta_client_name": .meta_client_name,
Expand Down Expand Up @@ -1527,9 +1526,115 @@ transforms:
"activation_epoch": validator.data.activation_epoch,
"effective_balance": validator.data.effective_balance,
"exit_epoch": validator.data.exit_epoch,
"pubkey": validator.data.pubkey,
"slashed": validator.data.slashed,
"withdrawable_epoch": validator.data.withdrawable_epoch,
"withdrawable_epoch": validator.data.withdrawable_epoch
})
}
. = events
canonical_beacon_validators_pubkeys_formatted:
type: remap
inputs:
- xatu_server_events_router.canonical_beacon_validators
source: |-
event_date_time, err = parse_timestamp(.event.date_time, format: "%+");
if err == null {
.event_date_time = to_unix_timestamp(event_date_time, unit: "milliseconds")
} else {
.error = err
.error_description = "failed to parse event date time"
log(., level: "error", rate_limit_secs: 60)
}
epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.epoch.start_date_time, format: "%+");
if err == null {
.epoch_start_date_time = to_unix_timestamp(epoch_start_date_time)
} else {
.error = err
.error_description = "failed to parse epoch start date time"
log(., level: "error", rate_limit_secs: 60)
}
events = []
.updated_date_time = to_unix_timestamp(now())
for_each(array!(.data.validators)) -> |_index, validator| {
events = push(events, {
"event_date_time": .event_date_time,
"updated_date_time": .updated_date_time,
"meta_client_name": .meta_client_name,
"meta_client_id": .meta_client_id,
"meta_client_version": .meta_client_version,
"meta_client_implementation": .meta_client_implementation,
"meta_client_os": .meta_client_os,
"meta_client_ip": .meta_client_ip,
"meta_network_id": .meta_network_id,
"meta_network_name": .meta_network_name,
"meta_client_geo_city": .meta_client_geo_city,
"meta_client_geo_country": .meta_client_geo_country,
"meta_client_geo_country_code": .meta_client_geo_country_code,
"meta_client_geo_continent_code": .meta_client_geo_continent_code,
"meta_client_geo_longitude": .meta_client_geo_longitude,
"meta_client_geo_latitude": .meta_client_geo_latitude,
"meta_client_geo_autonomous_system_number": .meta_client_geo_autonomous_system_number,
"meta_client_geo_autonomous_system_organization": .meta_client_geo_autonomous_system_organization,
"epoch": .meta.client.additional_data.epoch.number,
"epoch_start_date_time": .epoch_start_date_time,
"index": validator.index,
"pubkey": validator.data.pubkey
})
}
. = events
canonical_beacon_validators_withdrawal_credentials_formatted:
type: remap
inputs:
- xatu_server_events_router.canonical_beacon_validators
source: |-
event_date_time, err = parse_timestamp(.event.date_time, format: "%+");
if err == null {
.event_date_time = to_unix_timestamp(event_date_time, unit: "milliseconds")
} else {
.error = err
.error_description = "failed to parse event date time"
log(., level: "error", rate_limit_secs: 60)
}
epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.epoch.start_date_time, format: "%+");
if err == null {
.epoch_start_date_time = to_unix_timestamp(epoch_start_date_time)
} else {
.error = err
.error_description = "failed to parse epoch start date time"
log(., level: "error", rate_limit_secs: 60)
}
events = []
.updated_date_time = to_unix_timestamp(now())
for_each(array!(.data.validators)) -> |_index, validator| {
events = push(events, {
"event_date_time": .event_date_time,
"updated_date_time": .updated_date_time,
"meta_client_name": .meta_client_name,
"meta_client_id": .meta_client_id,
"meta_client_version": .meta_client_version,
"meta_client_implementation": .meta_client_implementation,
"meta_client_os": .meta_client_os,
"meta_client_ip": .meta_client_ip,
"meta_network_id": .meta_network_id,
"meta_network_name": .meta_network_name,
"meta_client_geo_city": .meta_client_geo_city,
"meta_client_geo_country": .meta_client_geo_country,
"meta_client_geo_country_code": .meta_client_geo_country_code,
"meta_client_geo_continent_code": .meta_client_geo_continent_code,
"meta_client_geo_longitude": .meta_client_geo_longitude,
"meta_client_geo_latitude": .meta_client_geo_latitude,
"meta_client_geo_autonomous_system_number": .meta_client_geo_autonomous_system_number,
"meta_client_geo_autonomous_system_organization": .meta_client_geo_autonomous_system_organization,
"epoch": .meta.client.additional_data.epoch.number,
"epoch_start_date_time": .epoch_start_date_time,
"index": validator.index,
"withdrawal_credentials": validator.data.withdrawal_credentials
})
}
Expand Down Expand Up @@ -2386,4 +2491,44 @@ sinks:
max_events: 200000
healthcheck:
enabled: true
skip_unknown_fields: false
skip_unknown_fields: false
canonical_beacon_validators_pubkeys_clickhouse:
type: clickhouse
inputs:
- canonical_beacon_validators_pubkeys_formatted
database: default
endpoint: "${CLICKHOUSE_ENDPOINT}"
table: canonical_beacon_validators_pubkeys
auth:
strategy: basic
user: "${CLICKHOUSE_USER}"
password: "${CLICKHOUSE_PASSWORD}"
batch:
max_bytes: 52428800
max_events: 200000
timeout_secs: 1
buffer:
max_events: 200000
healthcheck:
enabled: true
skip_unknown_fields: false
canonical_beacon_validators_withdrawal_credentials_clickhouse:
type: clickhouse
inputs:
- canonical_beacon_validators_withdrawal_credentials_formatted
database: default
endpoint: "${CLICKHOUSE_ENDPOINT}"
table: canonical_beacon_validators_withdrawal_credentials
auth:
strategy: basic
user: "${CLICKHOUSE_USER}"
password: "${CLICKHOUSE_PASSWORD}"
batch:
max_bytes: 52428800
max_events: 200000
timeout_secs: 1
buffer:
max_events: 200000
healthcheck:
enabled: true
skip_unknown_fields: false
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
DROP TABLE IF EXISTS canonical_beacon_validators on cluster '{cluster}' SYNC;
DROP TABLE IF EXISTS canonical_beacon_validators_local on cluster '{cluster}' SYNC;

DROP TABLE IF EXISTS canonical_beacon_validators_pubkeys on cluster '{cluster}' SYNC;
DROP TABLE IF EXISTS canonical_beacon_validators_pubkeys_local on cluster '{cluster}' SYNC;

DROP TABLE IF EXISTS canonical_beacon_validators_withdrawal_credentials on cluster '{cluster}' SYNC;
DROP TABLE IF EXISTS canonical_beacon_validators_withdrawal_credentials_local on cluster '{cluster}' SYNC;

CREATE TABLE default.canonical_beacon_validators_local on cluster '{cluster}'
(
updated_date_time DateTime CODEC(DoubleDelta, ZSTD(1)),
event_date_time DateTime64(3) CODEC(DoubleDelta, ZSTD(1)),
epoch UInt32 CODEC(DoubleDelta, ZSTD(1)),
epoch_start_date_time DateTime CODEC(DoubleDelta, ZSTD(1)),
`index` UInt32 CODEC(ZSTD(1)),
balance UInt64 CODEC(ZSTD(1)),
`status` LowCardinality(String),
pubkey String CODEC(ZSTD(1)),
withdrawal_credentials String CODEC(ZSTD(1)),
effective_balance UInt64 CODEC(ZSTD(1)),
slashed Bool,
activation_epoch UInt64 CODEC(ZSTD(1)),
activation_eligibility_epoch UInt64 CODEC(ZSTD(1)),
exit_epoch UInt64 CODEC(ZSTD(1)),
withdrawable_epoch UInt64 CODEC(ZSTD(1)),
meta_client_name LowCardinality(String),
meta_client_id String CODEC(ZSTD(1)),
meta_client_version LowCardinality(String),
meta_client_implementation LowCardinality(String),
meta_client_os LowCardinality(String),
meta_client_ip Nullable(IPv6) CODEC(ZSTD(1)),
meta_client_geo_city LowCardinality(String) CODEC(ZSTD(1)),
meta_client_geo_country LowCardinality(String) CODEC(ZSTD(1)),
meta_client_geo_country_code LowCardinality(String) CODEC(ZSTD(1)),
meta_client_geo_continent_code LowCardinality(String) CODEC(ZSTD(1)),
meta_client_geo_longitude Nullable(Float64) CODEC(ZSTD(1)),
meta_client_geo_latitude Nullable(Float64) CODEC(ZSTD(1)),
meta_client_geo_autonomous_system_number Nullable(UInt32) CODEC(ZSTD(1)),
meta_client_geo_autonomous_system_organization Nullable(String) CODEC(ZSTD(1)),
meta_network_id Int32 CODEC(DoubleDelta, ZSTD(1)),
meta_network_name LowCardinality(String),
meta_consensus_version LowCardinality(String),
meta_consensus_version_major LowCardinality(String),
meta_consensus_version_minor LowCardinality(String),
meta_consensus_version_patch LowCardinality(String),
meta_consensus_implementation LowCardinality(String),
meta_labels Map(String, String) CODEC(ZSTD(1))
) Engine = ReplicatedReplacingMergeTree('/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}', '{replica}', updated_date_time)
PARTITION BY toStartOfMonth(epoch_start_date_time)
ORDER BY (epoch_start_date_time, index, meta_network_name);

ALTER TABLE default.canonical_beacon_validators_local ON CLUSTER '{cluster}'
MODIFY COMMENT 'Contains a validator state for an epoch.',
COMMENT COLUMN updated_date_time 'When this row was last updated',
COMMENT COLUMN event_date_time 'When the client fetched the beacon block from a beacon node',
COMMENT COLUMN epoch 'The epoch number from beacon block payload',
COMMENT COLUMN epoch_start_date_time 'The wall clock time when the epoch started',
COMMENT COLUMN `index` 'The index of the validator',
COMMENT COLUMN `balance` 'The balance of the validator',
COMMENT COLUMN `status` 'The status of the validator',
COMMENT COLUMN pubkey 'The public key of the validator',
COMMENT COLUMN withdrawal_credentials 'The withdrawal credentials of the validator',
COMMENT COLUMN effective_balance 'The effective balance of the validator',
COMMENT COLUMN slashed 'Whether the validator is slashed',
COMMENT COLUMN activation_epoch 'The epoch when the validator was activated',
COMMENT COLUMN activation_eligibility_epoch 'The epoch when the validator was activated',
COMMENT COLUMN exit_epoch 'The epoch when the validator exited',
COMMENT COLUMN withdrawable_epoch 'The epoch when the validator can withdraw',
COMMENT COLUMN meta_client_name 'Name of the client that generated the event',
COMMENT COLUMN meta_client_id 'Unique Session ID of the client that generated the event. This changes every time the client is restarted.',
COMMENT COLUMN meta_client_version 'Version of the client that generated the event',
COMMENT COLUMN meta_client_implementation 'Implementation of the client that generated the event',
COMMENT COLUMN meta_client_os 'Operating system of the client that generated the event',
COMMENT COLUMN meta_client_ip 'IP address of the client that generated the event',
COMMENT COLUMN meta_client_geo_city 'City of the client that generated the event',
COMMENT COLUMN meta_client_geo_country 'Country of the client that generated the event',
COMMENT COLUMN meta_client_geo_country_code 'Country code of the client that generated the event',
COMMENT COLUMN meta_client_geo_continent_code 'Continent code of the client that generated the event',
COMMENT COLUMN meta_client_geo_longitude 'Longitude of the client that generated the event',
COMMENT COLUMN meta_client_geo_latitude 'Latitude of the client that generated the event',
COMMENT COLUMN meta_client_geo_autonomous_system_number 'Autonomous system number of the client that generated the event',
COMMENT COLUMN meta_client_geo_autonomous_system_organization 'Autonomous system organization of the client that generated the event',
COMMENT COLUMN meta_network_id 'Ethereum network ID',
COMMENT COLUMN meta_network_name 'Ethereum network name',
COMMENT COLUMN meta_consensus_version 'Ethereum consensus client version that generated the event',
COMMENT COLUMN meta_consensus_version_major 'Ethereum consensus client major version that generated the event',
COMMENT COLUMN meta_consensus_version_minor 'Ethereum consensus client minor version that generated the event',
COMMENT COLUMN meta_consensus_version_patch 'Ethereum consensus client patch version that generated the event',
COMMENT COLUMN meta_consensus_implementation 'Ethereum consensus client implementation that generated the event',
COMMENT COLUMN meta_labels 'Labels associated with the event';

CREATE TABLE canonical_beacon_validators on cluster '{cluster}' AS canonical_beacon_validators_local
ENGINE = Distributed('{cluster}', default, canonical_beacon_validators_local, cityHash64(epoch_start_date_time, `index`, meta_network_name));
Loading
Loading