Skip to content

Commit

Permalink
Merge master
Browse files Browse the repository at this point in the history
  • Loading branch information
samcm committed Jun 17, 2024
2 parents d941849 + 2b49552 commit 55f0bdb
Show file tree
Hide file tree
Showing 4 changed files with 492 additions and 4 deletions.
153 changes: 149 additions & 4 deletions deploy/local/docker-compose/vector-kafka-clickhouse.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1499,7 +1499,6 @@ transforms:
for_each(array!(.data.validators)) -> |_index, validator| {
events = push(events, {
"key": .event.id,
"event_date_time": .event_date_time,
"updated_date_time": .updated_date_time,
"meta_client_name": .meta_client_name,
Expand Down Expand Up @@ -1527,9 +1526,115 @@ transforms:
"activation_epoch": validator.data.activation_epoch,
"effective_balance": validator.data.effective_balance,
"exit_epoch": validator.data.exit_epoch,
"pubkey": validator.data.pubkey,
"slashed": validator.data.slashed,
"withdrawable_epoch": validator.data.withdrawable_epoch,
"withdrawable_epoch": validator.data.withdrawable_epoch
})
}
. = events
canonical_beacon_validators_pubkeys_formatted:
type: remap
inputs:
- xatu_server_events_router.canonical_beacon_validators
source: |-
event_date_time, err = parse_timestamp(.event.date_time, format: "%+");
if err == null {
.event_date_time = to_unix_timestamp(event_date_time, unit: "milliseconds")
} else {
.error = err
.error_description = "failed to parse event date time"
log(., level: "error", rate_limit_secs: 60)
}
epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.epoch.start_date_time, format: "%+");
if err == null {
.epoch_start_date_time = to_unix_timestamp(epoch_start_date_time)
} else {
.error = err
.error_description = "failed to parse epoch start date time"
log(., level: "error", rate_limit_secs: 60)
}
events = []
.updated_date_time = to_unix_timestamp(now())
for_each(array!(.data.validators)) -> |_index, validator| {
events = push(events, {
"event_date_time": .event_date_time,
"updated_date_time": .updated_date_time,
"meta_client_name": .meta_client_name,
"meta_client_id": .meta_client_id,
"meta_client_version": .meta_client_version,
"meta_client_implementation": .meta_client_implementation,
"meta_client_os": .meta_client_os,
"meta_client_ip": .meta_client_ip,
"meta_network_id": .meta_network_id,
"meta_network_name": .meta_network_name,
"meta_client_geo_city": .meta_client_geo_city,
"meta_client_geo_country": .meta_client_geo_country,
"meta_client_geo_country_code": .meta_client_geo_country_code,
"meta_client_geo_continent_code": .meta_client_geo_continent_code,
"meta_client_geo_longitude": .meta_client_geo_longitude,
"meta_client_geo_latitude": .meta_client_geo_latitude,
"meta_client_geo_autonomous_system_number": .meta_client_geo_autonomous_system_number,
"meta_client_geo_autonomous_system_organization": .meta_client_geo_autonomous_system_organization,
"epoch": .meta.client.additional_data.epoch.number,
"epoch_start_date_time": .epoch_start_date_time,
"index": validator.index,
"pubkey": validator.data.pubkey
})
}
. = events
canonical_beacon_validators_withdrawal_credentials_formatted:
type: remap
inputs:
- xatu_server_events_router.canonical_beacon_validators
source: |-
event_date_time, err = parse_timestamp(.event.date_time, format: "%+");
if err == null {
.event_date_time = to_unix_timestamp(event_date_time, unit: "milliseconds")
} else {
.error = err
.error_description = "failed to parse event date time"
log(., level: "error", rate_limit_secs: 60)
}
epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.epoch.start_date_time, format: "%+");
if err == null {
.epoch_start_date_time = to_unix_timestamp(epoch_start_date_time)
} else {
.error = err
.error_description = "failed to parse epoch start date time"
log(., level: "error", rate_limit_secs: 60)
}
events = []
.updated_date_time = to_unix_timestamp(now())
for_each(array!(.data.validators)) -> |_index, validator| {
events = push(events, {
"event_date_time": .event_date_time,
"updated_date_time": .updated_date_time,
"meta_client_name": .meta_client_name,
"meta_client_id": .meta_client_id,
"meta_client_version": .meta_client_version,
"meta_client_implementation": .meta_client_implementation,
"meta_client_os": .meta_client_os,
"meta_client_ip": .meta_client_ip,
"meta_network_id": .meta_network_id,
"meta_network_name": .meta_network_name,
"meta_client_geo_city": .meta_client_geo_city,
"meta_client_geo_country": .meta_client_geo_country,
"meta_client_geo_country_code": .meta_client_geo_country_code,
"meta_client_geo_continent_code": .meta_client_geo_continent_code,
"meta_client_geo_longitude": .meta_client_geo_longitude,
"meta_client_geo_latitude": .meta_client_geo_latitude,
"meta_client_geo_autonomous_system_number": .meta_client_geo_autonomous_system_number,
"meta_client_geo_autonomous_system_organization": .meta_client_geo_autonomous_system_organization,
"epoch": .meta.client.additional_data.epoch.number,
"epoch_start_date_time": .epoch_start_date_time,
"index": validator.index,
"withdrawal_credentials": validator.data.withdrawal_credentials
})
}
Expand Down Expand Up @@ -2386,4 +2491,44 @@ sinks:
max_events: 200000
healthcheck:
enabled: true
skip_unknown_fields: false
skip_unknown_fields: false
canonical_beacon_validators_pubkeys_clickhouse:
type: clickhouse
inputs:
- canonical_beacon_validators_pubkeys_formatted
database: default
endpoint: "${CLICKHOUSE_ENDPOINT}"
table: canonical_beacon_validators_pubkeys
auth:
strategy: basic
user: "${CLICKHOUSE_USER}"
password: "${CLICKHOUSE_PASSWORD}"
batch:
max_bytes: 52428800
max_events: 200000
timeout_secs: 1
buffer:
max_events: 200000
healthcheck:
enabled: true
skip_unknown_fields: false
canonical_beacon_validators_withdrawal_credentials_clickhouse:
type: clickhouse
inputs:
- canonical_beacon_validators_withdrawal_credentials_formatted
database: default
endpoint: "${CLICKHOUSE_ENDPOINT}"
table: canonical_beacon_validators_withdrawal_credentials
auth:
strategy: basic
user: "${CLICKHOUSE_USER}"
password: "${CLICKHOUSE_PASSWORD}"
batch:
max_bytes: 52428800
max_events: 200000
timeout_secs: 1
buffer:
max_events: 200000
healthcheck:
enabled: true
skip_unknown_fields: false
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
DROP TABLE IF EXISTS canonical_beacon_validators on cluster '{cluster}' SYNC;
DROP TABLE IF EXISTS canonical_beacon_validators_local on cluster '{cluster}' SYNC;

DROP TABLE IF EXISTS canonical_beacon_validators_pubkeys on cluster '{cluster}' SYNC;
DROP TABLE IF EXISTS canonical_beacon_validators_pubkeys_local on cluster '{cluster}' SYNC;

DROP TABLE IF EXISTS canonical_beacon_validators_withdrawal_credentials on cluster '{cluster}' SYNC;
DROP TABLE IF EXISTS canonical_beacon_validators_withdrawal_credentials_local on cluster '{cluster}' SYNC;

CREATE TABLE default.canonical_beacon_validators_local on cluster '{cluster}'
(
updated_date_time DateTime CODEC(DoubleDelta, ZSTD(1)),
event_date_time DateTime64(3) CODEC(DoubleDelta, ZSTD(1)),
epoch UInt32 CODEC(DoubleDelta, ZSTD(1)),
epoch_start_date_time DateTime CODEC(DoubleDelta, ZSTD(1)),
`index` UInt32 CODEC(ZSTD(1)),
balance UInt64 CODEC(ZSTD(1)),
`status` LowCardinality(String),
pubkey String CODEC(ZSTD(1)),
withdrawal_credentials String CODEC(ZSTD(1)),
effective_balance UInt64 CODEC(ZSTD(1)),
slashed Bool,
activation_epoch UInt64 CODEC(ZSTD(1)),
activation_eligibility_epoch UInt64 CODEC(ZSTD(1)),
exit_epoch UInt64 CODEC(ZSTD(1)),
withdrawable_epoch UInt64 CODEC(ZSTD(1)),
meta_client_name LowCardinality(String),
meta_client_id String CODEC(ZSTD(1)),
meta_client_version LowCardinality(String),
meta_client_implementation LowCardinality(String),
meta_client_os LowCardinality(String),
meta_client_ip Nullable(IPv6) CODEC(ZSTD(1)),
meta_client_geo_city LowCardinality(String) CODEC(ZSTD(1)),
meta_client_geo_country LowCardinality(String) CODEC(ZSTD(1)),
meta_client_geo_country_code LowCardinality(String) CODEC(ZSTD(1)),
meta_client_geo_continent_code LowCardinality(String) CODEC(ZSTD(1)),
meta_client_geo_longitude Nullable(Float64) CODEC(ZSTD(1)),
meta_client_geo_latitude Nullable(Float64) CODEC(ZSTD(1)),
meta_client_geo_autonomous_system_number Nullable(UInt32) CODEC(ZSTD(1)),
meta_client_geo_autonomous_system_organization Nullable(String) CODEC(ZSTD(1)),
meta_network_id Int32 CODEC(DoubleDelta, ZSTD(1)),
meta_network_name LowCardinality(String),
meta_consensus_version LowCardinality(String),
meta_consensus_version_major LowCardinality(String),
meta_consensus_version_minor LowCardinality(String),
meta_consensus_version_patch LowCardinality(String),
meta_consensus_implementation LowCardinality(String),
meta_labels Map(String, String) CODEC(ZSTD(1))
) Engine = ReplicatedReplacingMergeTree('/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}', '{replica}', updated_date_time)
PARTITION BY toStartOfMonth(epoch_start_date_time)
ORDER BY (epoch_start_date_time, index, meta_network_name);

ALTER TABLE default.canonical_beacon_validators_local ON CLUSTER '{cluster}'
MODIFY COMMENT 'Contains a validator state for an epoch.',
COMMENT COLUMN updated_date_time 'When this row was last updated',
COMMENT COLUMN event_date_time 'When the client fetched the beacon block from a beacon node',
COMMENT COLUMN epoch 'The epoch number from beacon block payload',
COMMENT COLUMN epoch_start_date_time 'The wall clock time when the epoch started',
COMMENT COLUMN `index` 'The index of the validator',
COMMENT COLUMN `balance` 'The balance of the validator',
COMMENT COLUMN `status` 'The status of the validator',
COMMENT COLUMN pubkey 'The public key of the validator',
COMMENT COLUMN withdrawal_credentials 'The withdrawal credentials of the validator',
COMMENT COLUMN effective_balance 'The effective balance of the validator',
COMMENT COLUMN slashed 'Whether the validator is slashed',
COMMENT COLUMN activation_epoch 'The epoch when the validator was activated',
COMMENT COLUMN activation_eligibility_epoch 'The epoch when the validator was activated',
COMMENT COLUMN exit_epoch 'The epoch when the validator exited',
COMMENT COLUMN withdrawable_epoch 'The epoch when the validator can withdraw',
COMMENT COLUMN meta_client_name 'Name of the client that generated the event',
COMMENT COLUMN meta_client_id 'Unique Session ID of the client that generated the event. This changes every time the client is restarted.',
COMMENT COLUMN meta_client_version 'Version of the client that generated the event',
COMMENT COLUMN meta_client_implementation 'Implementation of the client that generated the event',
COMMENT COLUMN meta_client_os 'Operating system of the client that generated the event',
COMMENT COLUMN meta_client_ip 'IP address of the client that generated the event',
COMMENT COLUMN meta_client_geo_city 'City of the client that generated the event',
COMMENT COLUMN meta_client_geo_country 'Country of the client that generated the event',
COMMENT COLUMN meta_client_geo_country_code 'Country code of the client that generated the event',
COMMENT COLUMN meta_client_geo_continent_code 'Continent code of the client that generated the event',
COMMENT COLUMN meta_client_geo_longitude 'Longitude of the client that generated the event',
COMMENT COLUMN meta_client_geo_latitude 'Latitude of the client that generated the event',
COMMENT COLUMN meta_client_geo_autonomous_system_number 'Autonomous system number of the client that generated the event',
COMMENT COLUMN meta_client_geo_autonomous_system_organization 'Autonomous system organization of the client that generated the event',
COMMENT COLUMN meta_network_id 'Ethereum network ID',
COMMENT COLUMN meta_network_name 'Ethereum network name',
COMMENT COLUMN meta_consensus_version 'Ethereum consensus client version that generated the event',
COMMENT COLUMN meta_consensus_version_major 'Ethereum consensus client major version that generated the event',
COMMENT COLUMN meta_consensus_version_minor 'Ethereum consensus client minor version that generated the event',
COMMENT COLUMN meta_consensus_version_patch 'Ethereum consensus client patch version that generated the event',
COMMENT COLUMN meta_consensus_implementation 'Ethereum consensus client implementation that generated the event',
COMMENT COLUMN meta_labels 'Labels associated with the event';

CREATE TABLE canonical_beacon_validators on cluster '{cluster}' AS canonical_beacon_validators_local
ENGINE = Distributed('{cluster}', default, canonical_beacon_validators_local, cityHash64(epoch_start_date_time, `index`, meta_network_name));
Loading

0 comments on commit 55f0bdb

Please sign in to comment.