From eeee7159d2750975a3273d7913e24ac2d2c1d655 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Thu, 13 Jun 2024 12:41:22 +1000 Subject: [PATCH 1/2] feat(processor): Add worker support for sync workloads (#331) * feat(processor): Add worker support for sync workloads * Cancellable * test: Add unit tests for batch item processor --- pkg/processor/batch.go | 85 +++++++++++++++++++++++++------- pkg/processor/batch_test.go | 98 +++++++++++++++++++++++++++++++++---- 2 files changed, 156 insertions(+), 27 deletions(-) diff --git a/pkg/processor/batch.go b/pkg/processor/batch.go index 386c4568..76e95aab 100644 --- a/pkg/processor/batch.go +++ b/pkg/processor/batch.go @@ -205,36 +205,85 @@ func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error { } // ImmediatelyExportItems immediately exports the items to the exporter. -// Useful for propogating errors from the exporter. +// Useful for propagating errors from the exporter. func (bvp *BatchItemProcessor[T]) ImmediatelyExportItems(ctx context.Context, items []*T) error { _, span := observability.Tracer().Start(ctx, "BatchItemProcessor.ImmediatelyExportItems") defer span.End() - if l := len(items); l > 0 { - countItemsToExport := len(items) + if len(items) == 0 { + return nil + } - // Split the items in to chunks of our max batch size - for i := 0; i < countItemsToExport; i += bvp.o.MaxExportBatchSize { - end := i + bvp.o.MaxExportBatchSize - if end > countItemsToExport { - end = countItemsToExport - } + countItemsToExport := len(items) - itemsBatch := items[i:end] + batchSize := bvp.o.MaxExportBatchSize + if batchSize == 0 { + batchSize = 1 // Ensure we can't divide by zero + } - bvp.log.WithFields(logrus.Fields{ - "count": len(itemsBatch), - }).Debug("Immediately exporting items") + batches := (countItemsToExport + batchSize - 1) / batchSize - err := bvp.exportWithTimeout(ctx, itemsBatch) + batchCh := make(chan []*T, batches) + errCh := make(chan error, 1) - if err != nil { - return err - } + defer close(errCh) + + var wg sync.WaitGroup + + cctx, cancel := context.WithCancel(ctx) + defer cancel() + + for i := 0; i < countItemsToExport; i += batchSize { + end := i + batchSize + if end > countItemsToExport { + end = countItemsToExport } + + itemsBatch := items[i:end] + batchCh <- itemsBatch } + close(batchCh) // Close the channel after all batches are sent - return nil + bvp.log. + WithField("workers", bvp.o.Workers). + WithField("batches", batches). + Debug("Split items into batches for immediate export") + + for i := 0; i < bvp.o.Workers && i < batches; i++ { + wg.Add(1) + + go func(workerID int) { + defer wg.Done() + + for itemsBatch := range batchCh { + bvp.log.WithFields(logrus.Fields{ + "count": len(itemsBatch), + "worker": workerID, + }).Debug("Immediately exporting items") + + err := bvp.exportWithTimeout(cctx, itemsBatch) + if err != nil { + select { + case errCh <- err: + default: + } + + cancel() // Cancel the context to stop other workers + + return + } + } + }(i) + } + + wg.Wait() + + select { + case err := <-errCh: + return err + default: + return nil + } } // exportWithTimeout exports the items with a timeout. 
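Reviewer note: the sketch below is a minimal, caller-side illustration of the new worker fan-out in ImmediatelyExportItems and is not part of this patch. It is written as if it lived inside the processor package, and it assumes the exporter contract is satisfied by implementing ExportItems (plus a no-op Shutdown), as testBatchExporter does in the tests that follow; itemExporter, exampleImmediateExport and the logrus.New() logger are illustrative names only, not identifiers introduced by this change.

package processor

import (
	"context"

	"github.com/sirupsen/logrus"
)

// itemExporter is a hypothetical exporter used purely for illustration.
type itemExporter struct{}

// ExportItems would push a single batch to the downstream sink.
func (e *itemExporter) ExportItems(ctx context.Context, items []*string) error {
	return nil
}

// Shutdown is assumed to be part of the exporter contract; it is a no-op here.
func (e *itemExporter) Shutdown(ctx context.Context) error {
	return nil
}

func exampleImmediateExport(ctx context.Context, items []*string) error {
	// Batches are capped at 50 items each and drained concurrently by up to 5 workers.
	p, err := NewBatchItemProcessor[string](&itemExporter{}, "example", logrus.New(),
		WithMaxExportBatchSize(50), WithWorkers(5))
	if err != nil {
		return err
	}

	// Blocks until every batch has been exported; the first exporter error
	// cancels the shared context so the remaining workers stop early, and
	// that error is returned to the caller.
	return p.ImmediatelyExportItems(ctx, items)
}

As a usage note, workers beyond the number of batches are never started, so small writes behave exactly as before the change.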
diff --git a/pkg/processor/batch_test.go b/pkg/processor/batch_test.go index fa5c43d4..c7850f9d 100644 --- a/pkg/processor/batch_test.go +++ b/pkg/processor/batch_test.go @@ -9,6 +9,7 @@ package processor import ( "context" "errors" + "fmt" "io" "strconv" "sync" @@ -25,18 +26,28 @@ type TestItem struct { } type testBatchExporter[T TestItem] struct { - mu sync.Mutex - items []*T - sizes []int - batchCount int - shutdownCount int - errors []error - droppedCount int - idx int - err error + mu sync.Mutex + items []*T + sizes []int + batchCount int + shutdownCount int + errors []error + droppedCount int + idx int + err error + failNextExport bool + delay time.Duration } func (t *testBatchExporter[T]) ExportItems(ctx context.Context, items []*T) error { + time.Sleep(t.delay) + + if t.failNextExport { + t.failNextExport = false + + return errors.New("export failure") + } + t.mu.Lock() defer t.mu.Unlock() @@ -487,3 +498,72 @@ func TestBatchItemProcessorWithAsyncErrorExporter(t *testing.T) { t.Errorf("Expected write to fail") } } + +func TestBatchItemProcessorImmediatelyExportItems(t *testing.T) { + // Define a range of values for workers, maxExportBatchSize, and itemsToExport + workerCounts := []int{1, 5, 10} + maxBatchSizes := []int{1, 5, 10, 20} + itemCounts := []int{0, 1, 25, 50} + + for _, workers := range workerCounts { + for _, maxBatchSize := range maxBatchSizes { + for _, itemsToExport := range itemCounts { + t.Run(fmt.Sprintf("%d workers, batch size %d, %d items", workers, maxBatchSize, itemsToExport), func(t *testing.T) { + te := testBatchExporter[TestItem]{} + bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(maxBatchSize), WithWorkers(workers)) + require.NoError(t, err) + + items := make([]*TestItem, itemsToExport) + for i := 0; i < itemsToExport; i++ { + items[i] = &TestItem{name: strconv.Itoa(i)} + } + + err = bsp.ImmediatelyExportItems(context.Background(), items) + require.NoError(t, err) + + expectedBatches := (itemsToExport + maxBatchSize - 1) / maxBatchSize + if itemsToExport == 0 { + expectedBatches = 0 + } + + if te.len() != itemsToExport { + t.Errorf("Expected all items to be exported, got: %v", te.len()) + } + + if te.getBatchCount() != expectedBatches { + t.Errorf("Expected %v batches to be exported, got: %v", expectedBatches, te.getBatchCount()) + } + }) + } + } + } +} + +func TestBatchItemProcessorExportCancellationOnFailure(t *testing.T) { + workers := 10 + maxBatchSize := 10 + itemsToExport := 5000 + + t.Run(fmt.Sprintf("%d workers, batch size %d, %d items with cancellation on failure", workers, maxBatchSize, itemsToExport), func(t *testing.T) { + te := testBatchExporter[TestItem]{ + delay: 100 * time.Millisecond, // Introduce a delay to simulate processing time + } + bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(maxBatchSize), WithWorkers(workers)) + require.NoError(t, err) + + items := make([]*TestItem, itemsToExport) + for i := 0; i < itemsToExport; i++ { + items[i] = &TestItem{name: strconv.Itoa(i)} + } + + // Simulate export failure and expect cancellation + te.failNextExport = true + + err = bsp.ImmediatelyExportItems(context.Background(), items) + require.Error(t, err, "Expected an error due to simulated export failure") + + // Ensure we exported less than the number of batches since the export should've + // stopped due to the failure. 
+ require.Less(t, te.batchCount, itemsToExport/maxBatchSize) + }) +} From 77c10d9b0d128ab97d7fdbec203e4dca04548157 Mon Sep 17 00:00:00 2001 From: Andrew Davis <1709934+Savid@users.noreply.github.com> Date: Mon, 17 Jun 2024 15:12:00 +1000 Subject: [PATCH 2/2] feat(cannon): split out validator pubkey & creds --- .../vector-kafka-clickhouse.yaml | 153 +++++++++++- ...canonical_beacon_validators_split.down.sql | 94 +++++++ ...0_canonical_beacon_validators_split.up.sql | 230 ++++++++++++++++++ 3 files changed, 473 insertions(+), 4 deletions(-) create mode 100644 deploy/migrations/clickhouse/040_canonical_beacon_validators_split.down.sql create mode 100644 deploy/migrations/clickhouse/040_canonical_beacon_validators_split.up.sql diff --git a/deploy/local/docker-compose/vector-kafka-clickhouse.yaml b/deploy/local/docker-compose/vector-kafka-clickhouse.yaml index 2f9808f7..da626cb6 100644 --- a/deploy/local/docker-compose/vector-kafka-clickhouse.yaml +++ b/deploy/local/docker-compose/vector-kafka-clickhouse.yaml @@ -1499,7 +1499,6 @@ transforms: for_each(array!(.data.validators)) -> |_index, validator| { events = push(events, { - "key": .event.id, "event_date_time": .event_date_time, "updated_date_time": .updated_date_time, "meta_client_name": .meta_client_name, @@ -1527,9 +1526,115 @@ transforms: "activation_epoch": validator.data.activation_epoch, "effective_balance": validator.data.effective_balance, "exit_epoch": validator.data.exit_epoch, - "pubkey": validator.data.pubkey, "slashed": validator.data.slashed, - "withdrawable_epoch": validator.data.withdrawable_epoch, + "withdrawable_epoch": validator.data.withdrawable_epoch + }) + } + . = events + canonical_beacon_validators_pubkeys_formatted: + type: remap + inputs: + - xatu_server_events_router.canonical_beacon_validators + source: |- + event_date_time, err = parse_timestamp(.event.date_time, format: "%+"); + if err == null { + .event_date_time = to_unix_timestamp(event_date_time, unit: "milliseconds") + } else { + .error = err + .error_description = "failed to parse event date time" + log(., level: "error", rate_limit_secs: 60) + } + + epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.epoch.start_date_time, format: "%+"); + if err == null { + .epoch_start_date_time = to_unix_timestamp(epoch_start_date_time) + } else { + .error = err + .error_description = "failed to parse epoch start date time" + log(., level: "error", rate_limit_secs: 60) + } + + events = [] + + .updated_date_time = to_unix_timestamp(now()) + + for_each(array!(.data.validators)) -> |_index, validator| { + events = push(events, { + "event_date_time": .event_date_time, + "updated_date_time": .updated_date_time, + "meta_client_name": .meta_client_name, + "meta_client_id": .meta_client_id, + "meta_client_version": .meta_client_version, + "meta_client_implementation": .meta_client_implementation, + "meta_client_os": .meta_client_os, + "meta_client_ip": .meta_client_ip, + "meta_network_id": .meta_network_id, + "meta_network_name": .meta_network_name, + "meta_client_geo_city": .meta_client_geo_city, + "meta_client_geo_country": .meta_client_geo_country, + "meta_client_geo_country_code": .meta_client_geo_country_code, + "meta_client_geo_continent_code": .meta_client_geo_continent_code, + "meta_client_geo_longitude": .meta_client_geo_longitude, + "meta_client_geo_latitude": .meta_client_geo_latitude, + "meta_client_geo_autonomous_system_number": .meta_client_geo_autonomous_system_number, + "meta_client_geo_autonomous_system_organization": 
.meta_client_geo_autonomous_system_organization, + "epoch": .meta.client.additional_data.epoch.number, + "epoch_start_date_time": .epoch_start_date_time, + "index": validator.index, + "pubkey": validator.data.pubkey + }) + } + . = events + canonical_beacon_validators_withdrawal_credentials_formatted: + type: remap + inputs: + - xatu_server_events_router.canonical_beacon_validators + source: |- + event_date_time, err = parse_timestamp(.event.date_time, format: "%+"); + if err == null { + .event_date_time = to_unix_timestamp(event_date_time, unit: "milliseconds") + } else { + .error = err + .error_description = "failed to parse event date time" + log(., level: "error", rate_limit_secs: 60) + } + + epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.epoch.start_date_time, format: "%+"); + if err == null { + .epoch_start_date_time = to_unix_timestamp(epoch_start_date_time) + } else { + .error = err + .error_description = "failed to parse epoch start date time" + log(., level: "error", rate_limit_secs: 60) + } + + events = [] + + .updated_date_time = to_unix_timestamp(now()) + + for_each(array!(.data.validators)) -> |_index, validator| { + events = push(events, { + "event_date_time": .event_date_time, + "updated_date_time": .updated_date_time, + "meta_client_name": .meta_client_name, + "meta_client_id": .meta_client_id, + "meta_client_version": .meta_client_version, + "meta_client_implementation": .meta_client_implementation, + "meta_client_os": .meta_client_os, + "meta_client_ip": .meta_client_ip, + "meta_network_id": .meta_network_id, + "meta_network_name": .meta_network_name, + "meta_client_geo_city": .meta_client_geo_city, + "meta_client_geo_country": .meta_client_geo_country, + "meta_client_geo_country_code": .meta_client_geo_country_code, + "meta_client_geo_continent_code": .meta_client_geo_continent_code, + "meta_client_geo_longitude": .meta_client_geo_longitude, + "meta_client_geo_latitude": .meta_client_geo_latitude, + "meta_client_geo_autonomous_system_number": .meta_client_geo_autonomous_system_number, + "meta_client_geo_autonomous_system_organization": .meta_client_geo_autonomous_system_organization, + "epoch": .meta.client.additional_data.epoch.number, + "epoch_start_date_time": .epoch_start_date_time, + "index": validator.index, "withdrawal_credentials": validator.data.withdrawal_credentials }) } @@ -2386,4 +2491,44 @@ sinks: max_events: 200000 healthcheck: enabled: true - skip_unknown_fields: false \ No newline at end of file + skip_unknown_fields: false + canonical_beacon_validators_pubkeys_clickhouse: + type: clickhouse + inputs: + - canonical_beacon_validators_pubkeys_formatted + database: default + endpoint: "${CLICKHOUSE_ENDPOINT}" + table: canonical_beacon_validators_pubkeys + auth: + strategy: basic + user: "${CLICKHOUSE_USER}" + password: "${CLICKHOUSE_PASSWORD}" + batch: + max_bytes: 52428800 + max_events: 200000 + timeout_secs: 1 + buffer: + max_events: 200000 + healthcheck: + enabled: true + skip_unknown_fields: false + canonical_beacon_validators_withdrawal_credentials_clickhouse: + type: clickhouse + inputs: + - canonical_beacon_validators_withdrawal_credentials_formatted + database: default + endpoint: "${CLICKHOUSE_ENDPOINT}" + table: canonical_beacon_validators_withdrawal_credentials + auth: + strategy: basic + user: "${CLICKHOUSE_USER}" + password: "${CLICKHOUSE_PASSWORD}" + batch: + max_bytes: 52428800 + max_events: 200000 + timeout_secs: 1 + buffer: + max_events: 200000 + healthcheck: + enabled: true + skip_unknown_fields: false diff --git 
a/deploy/migrations/clickhouse/040_canonical_beacon_validators_split.down.sql b/deploy/migrations/clickhouse/040_canonical_beacon_validators_split.down.sql new file mode 100644 index 00000000..d79fd523 --- /dev/null +++ b/deploy/migrations/clickhouse/040_canonical_beacon_validators_split.down.sql @@ -0,0 +1,94 @@ +DROP TABLE IF EXISTS canonical_beacon_validators on cluster '{cluster}' SYNC; +DROP TABLE IF EXISTS canonical_beacon_validators_local on cluster '{cluster}' SYNC; + +DROP TABLE IF EXISTS canonical_beacon_validators_pubkeys on cluster '{cluster}' SYNC; +DROP TABLE IF EXISTS canonical_beacon_validators_pubkeys_local on cluster '{cluster}' SYNC; + +DROP TABLE IF EXISTS canonical_beacon_validators_withdrawal_credentials on cluster '{cluster}' SYNC; +DROP TABLE IF EXISTS canonical_beacon_validators_withdrawal_credentials_local on cluster '{cluster}' SYNC; + +CREATE TABLE default.canonical_beacon_validators_local on cluster '{cluster}' +( + updated_date_time DateTime CODEC(DoubleDelta, ZSTD(1)), + event_date_time DateTime64(3) CODEC(DoubleDelta, ZSTD(1)), + epoch UInt32 CODEC(DoubleDelta, ZSTD(1)), + epoch_start_date_time DateTime CODEC(DoubleDelta, ZSTD(1)), + `index` UInt32 CODEC(ZSTD(1)), + balance UInt64 CODEC(ZSTD(1)), + `status` LowCardinality(String), + pubkey String CODEC(ZSTD(1)), + withdrawal_credentials String CODEC(ZSTD(1)), + effective_balance UInt64 CODEC(ZSTD(1)), + slashed Bool, + activation_epoch UInt64 CODEC(ZSTD(1)), + activation_eligibility_epoch UInt64 CODEC(ZSTD(1)), + exit_epoch UInt64 CODEC(ZSTD(1)), + withdrawable_epoch UInt64 CODEC(ZSTD(1)), + meta_client_name LowCardinality(String), + meta_client_id String CODEC(ZSTD(1)), + meta_client_version LowCardinality(String), + meta_client_implementation LowCardinality(String), + meta_client_os LowCardinality(String), + meta_client_ip Nullable(IPv6) CODEC(ZSTD(1)), + meta_client_geo_city LowCardinality(String) CODEC(ZSTD(1)), + meta_client_geo_country LowCardinality(String) CODEC(ZSTD(1)), + meta_client_geo_country_code LowCardinality(String) CODEC(ZSTD(1)), + meta_client_geo_continent_code LowCardinality(String) CODEC(ZSTD(1)), + meta_client_geo_longitude Nullable(Float64) CODEC(ZSTD(1)), + meta_client_geo_latitude Nullable(Float64) CODEC(ZSTD(1)), + meta_client_geo_autonomous_system_number Nullable(UInt32) CODEC(ZSTD(1)), + meta_client_geo_autonomous_system_organization Nullable(String) CODEC(ZSTD(1)), + meta_network_id Int32 CODEC(DoubleDelta, ZSTD(1)), + meta_network_name LowCardinality(String), + meta_consensus_version LowCardinality(String), + meta_consensus_version_major LowCardinality(String), + meta_consensus_version_minor LowCardinality(String), + meta_consensus_version_patch LowCardinality(String), + meta_consensus_implementation LowCardinality(String), + meta_labels Map(String, String) CODEC(ZSTD(1)) +) Engine = ReplicatedReplacingMergeTree('/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}', '{replica}', updated_date_time) +PARTITION BY toStartOfMonth(epoch_start_date_time) +ORDER BY (epoch_start_date_time, index, meta_network_name); + +ALTER TABLE default.canonical_beacon_validators_local ON CLUSTER '{cluster}' +MODIFY COMMENT 'Contains a validator state for an epoch.', +COMMENT COLUMN updated_date_time 'When this row was last updated', +COMMENT COLUMN event_date_time 'When the client fetched the beacon block from a beacon node', +COMMENT COLUMN epoch 'The epoch number from beacon block payload', +COMMENT COLUMN epoch_start_date_time 'The wall clock time when the epoch started', 
+COMMENT COLUMN `index` 'The index of the validator', +COMMENT COLUMN `balance` 'The balance of the validator', +COMMENT COLUMN `status` 'The status of the validator', +COMMENT COLUMN pubkey 'The public key of the validator', +COMMENT COLUMN withdrawal_credentials 'The withdrawal credentials of the validator', +COMMENT COLUMN effective_balance 'The effective balance of the validator', +COMMENT COLUMN slashed 'Whether the validator is slashed', +COMMENT COLUMN activation_epoch 'The epoch when the validator was activated', +COMMENT COLUMN activation_eligibility_epoch 'The epoch when the validator was activated', +COMMENT COLUMN exit_epoch 'The epoch when the validator exited', +COMMENT COLUMN withdrawable_epoch 'The epoch when the validator can withdraw', +COMMENT COLUMN meta_client_name 'Name of the client that generated the event', +COMMENT COLUMN meta_client_id 'Unique Session ID of the client that generated the event. This changes every time the client is restarted.', +COMMENT COLUMN meta_client_version 'Version of the client that generated the event', +COMMENT COLUMN meta_client_implementation 'Implementation of the client that generated the event', +COMMENT COLUMN meta_client_os 'Operating system of the client that generated the event', +COMMENT COLUMN meta_client_ip 'IP address of the client that generated the event', +COMMENT COLUMN meta_client_geo_city 'City of the client that generated the event', +COMMENT COLUMN meta_client_geo_country 'Country of the client that generated the event', +COMMENT COLUMN meta_client_geo_country_code 'Country code of the client that generated the event', +COMMENT COLUMN meta_client_geo_continent_code 'Continent code of the client that generated the event', +COMMENT COLUMN meta_client_geo_longitude 'Longitude of the client that generated the event', +COMMENT COLUMN meta_client_geo_latitude 'Latitude of the client that generated the event', +COMMENT COLUMN meta_client_geo_autonomous_system_number 'Autonomous system number of the client that generated the event', +COMMENT COLUMN meta_client_geo_autonomous_system_organization 'Autonomous system organization of the client that generated the event', +COMMENT COLUMN meta_network_id 'Ethereum network ID', +COMMENT COLUMN meta_network_name 'Ethereum network name', +COMMENT COLUMN meta_consensus_version 'Ethereum consensus client version that generated the event', +COMMENT COLUMN meta_consensus_version_major 'Ethereum consensus client major version that generated the event', +COMMENT COLUMN meta_consensus_version_minor 'Ethereum consensus client minor version that generated the event', +COMMENT COLUMN meta_consensus_version_patch 'Ethereum consensus client patch version that generated the event', +COMMENT COLUMN meta_consensus_implementation 'Ethereum consensus client implementation that generated the event', +COMMENT COLUMN meta_labels 'Labels associated with the event'; + +CREATE TABLE canonical_beacon_validators on cluster '{cluster}' AS canonical_beacon_validators_local +ENGINE = Distributed('{cluster}', default, canonical_beacon_validators_local, cityHash64(epoch_start_date_time, `index`, meta_network_name)); diff --git a/deploy/migrations/clickhouse/040_canonical_beacon_validators_split.up.sql b/deploy/migrations/clickhouse/040_canonical_beacon_validators_split.up.sql new file mode 100644 index 00000000..8b43fc36 --- /dev/null +++ b/deploy/migrations/clickhouse/040_canonical_beacon_validators_split.up.sql @@ -0,0 +1,230 @@ +DROP TABLE IF EXISTS canonical_beacon_validators on cluster '{cluster}' SYNC; +DROP 
TABLE IF EXISTS canonical_beacon_validators_local on cluster '{cluster}' SYNC; + +CREATE TABLE default.canonical_beacon_validators_local on cluster '{cluster}' +( + updated_date_time DateTime CODEC(DoubleDelta, ZSTD(1)), + event_date_time DateTime64(3) CODEC(DoubleDelta, ZSTD(1)), + epoch UInt32 CODEC(DoubleDelta, ZSTD(1)), + epoch_start_date_time DateTime CODEC(DoubleDelta, ZSTD(1)), + `index` UInt32 CODEC(ZSTD(1)), + balance UInt64 CODEC(ZSTD(1)), + `status` LowCardinality(String), + effective_balance UInt64 CODEC(ZSTD(1)), + slashed Bool, + activation_epoch UInt64 CODEC(ZSTD(1)), + activation_eligibility_epoch UInt64 CODEC(ZSTD(1)), + exit_epoch UInt64 CODEC(ZSTD(1)), + withdrawable_epoch UInt64 CODEC(ZSTD(1)), + meta_client_name LowCardinality(String), + meta_client_id String CODEC(ZSTD(1)), + meta_client_version LowCardinality(String), + meta_client_implementation LowCardinality(String), + meta_client_os LowCardinality(String), + meta_client_ip Nullable(IPv6) CODEC(ZSTD(1)), + meta_client_geo_city LowCardinality(String) CODEC(ZSTD(1)), + meta_client_geo_country LowCardinality(String) CODEC(ZSTD(1)), + meta_client_geo_country_code LowCardinality(String) CODEC(ZSTD(1)), + meta_client_geo_continent_code LowCardinality(String) CODEC(ZSTD(1)), + meta_client_geo_longitude Nullable(Float64) CODEC(ZSTD(1)), + meta_client_geo_latitude Nullable(Float64) CODEC(ZSTD(1)), + meta_client_geo_autonomous_system_number Nullable(UInt32) CODEC(ZSTD(1)), + meta_client_geo_autonomous_system_organization Nullable(String) CODEC(ZSTD(1)), + meta_network_id Int32 CODEC(DoubleDelta, ZSTD(1)), + meta_network_name LowCardinality(String), + meta_consensus_version LowCardinality(String), + meta_consensus_version_major LowCardinality(String), + meta_consensus_version_minor LowCardinality(String), + meta_consensus_version_patch LowCardinality(String), + meta_consensus_implementation LowCardinality(String), + meta_labels Map(String, String) CODEC(ZSTD(1)) +) Engine = ReplicatedReplacingMergeTree('/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}', '{replica}', updated_date_time) +PARTITION BY toStartOfMonth(epoch_start_date_time) +ORDER BY (epoch_start_date_time, index, meta_network_name); + +ALTER TABLE default.canonical_beacon_validators_local ON CLUSTER '{cluster}' +MODIFY COMMENT 'Contains a validator state for an epoch.', +COMMENT COLUMN updated_date_time 'When this row was last updated', +COMMENT COLUMN event_date_time 'When the client fetched the beacon block from a beacon node', +COMMENT COLUMN epoch 'The epoch number from beacon block payload', +COMMENT COLUMN epoch_start_date_time 'The wall clock time when the epoch started', +COMMENT COLUMN `index` 'The index of the validator', +COMMENT COLUMN balance 'The balance of the validator', +COMMENT COLUMN `status` 'The status of the validator', +COMMENT COLUMN effective_balance 'The effective balance of the validator', +COMMENT COLUMN slashed 'Whether the validator is slashed', +COMMENT COLUMN activation_epoch 'The epoch when the validator was activated', +COMMENT COLUMN activation_eligibility_epoch 'The epoch when the validator was activated', +COMMENT COLUMN exit_epoch 'The epoch when the validator exited', +COMMENT COLUMN withdrawable_epoch 'The epoch when the validator can withdraw', +COMMENT COLUMN meta_client_name 'Name of the client that generated the event', +COMMENT COLUMN meta_client_id 'Unique Session ID of the client that generated the event. 
This changes every time the client is restarted.', +COMMENT COLUMN meta_client_version 'Version of the client that generated the event', +COMMENT COLUMN meta_client_implementation 'Implementation of the client that generated the event', +COMMENT COLUMN meta_client_os 'Operating system of the client that generated the event', +COMMENT COLUMN meta_client_ip 'IP address of the client that generated the event', +COMMENT COLUMN meta_client_geo_city 'City of the client that generated the event', +COMMENT COLUMN meta_client_geo_country 'Country of the client that generated the event', +COMMENT COLUMN meta_client_geo_country_code 'Country code of the client that generated the event', +COMMENT COLUMN meta_client_geo_continent_code 'Continent code of the client that generated the event', +COMMENT COLUMN meta_client_geo_longitude 'Longitude of the client that generated the event', +COMMENT COLUMN meta_client_geo_latitude 'Latitude of the client that generated the event', +COMMENT COLUMN meta_client_geo_autonomous_system_number 'Autonomous system number of the client that generated the event', +COMMENT COLUMN meta_client_geo_autonomous_system_organization 'Autonomous system organization of the client that generated the event', +COMMENT COLUMN meta_network_id 'Ethereum network ID', +COMMENT COLUMN meta_network_name 'Ethereum network name', +COMMENT COLUMN meta_consensus_version 'Ethereum consensus client version that generated the event', +COMMENT COLUMN meta_consensus_version_major 'Ethereum consensus client major version that generated the event', +COMMENT COLUMN meta_consensus_version_minor 'Ethereum consensus client minor version that generated the event', +COMMENT COLUMN meta_consensus_version_patch 'Ethereum consensus client patch version that generated the event', +COMMENT COLUMN meta_consensus_implementation 'Ethereum consensus client implementation that generated the event', +COMMENT COLUMN meta_labels 'Labels associated with the event'; + +CREATE TABLE canonical_beacon_validators on cluster '{cluster}' AS canonical_beacon_validators_local +ENGINE = Distributed('{cluster}', default, canonical_beacon_validators_local, cityHash64(epoch_start_date_time, `index`, meta_network_name)); + +CREATE TABLE default.canonical_beacon_validators_pubkeys_local on cluster '{cluster}' +( + updated_date_time DateTime CODEC(DoubleDelta, ZSTD(1)), + -- ensure the first epoch the pubkey was seen is in this table + -- add the updated_date_time to make sure we can always overwrite the data + -- 4294967295 = UInt32 max + `version` UInt32 DEFAULT 4294967295 + toUnixTimestamp(updated_date_time) - toUnixTimestamp(epoch_start_date_time) CODEC(DoubleDelta, ZSTD(1)), + event_date_time DateTime64(3) CODEC(DoubleDelta, ZSTD(1)), + epoch UInt32 CODEC(DoubleDelta, ZSTD(1)), + epoch_start_date_time DateTime CODEC(DoubleDelta, ZSTD(1)), + `index` UInt32 CODEC(ZSTD(1)), + pubkey String CODEC(ZSTD(1)), + meta_client_name LowCardinality(String), + meta_client_id String CODEC(ZSTD(1)), + meta_client_version LowCardinality(String), + meta_client_implementation LowCardinality(String), + meta_client_os LowCardinality(String), + meta_client_ip Nullable(IPv6) CODEC(ZSTD(1)), + meta_client_geo_city LowCardinality(String) CODEC(ZSTD(1)), + meta_client_geo_country LowCardinality(String) CODEC(ZSTD(1)), + meta_client_geo_country_code LowCardinality(String) CODEC(ZSTD(1)), + meta_client_geo_continent_code LowCardinality(String) CODEC(ZSTD(1)), + meta_client_geo_longitude Nullable(Float64) CODEC(ZSTD(1)), + meta_client_geo_latitude 
Nullable(Float64) CODEC(ZSTD(1)), + meta_client_geo_autonomous_system_number Nullable(UInt32) CODEC(ZSTD(1)), + meta_client_geo_autonomous_system_organization Nullable(String) CODEC(ZSTD(1)), + meta_network_id Int32 CODEC(DoubleDelta, ZSTD(1)), + meta_network_name LowCardinality(String), + meta_consensus_version LowCardinality(String), + meta_consensus_version_major LowCardinality(String), + meta_consensus_version_minor LowCardinality(String), + meta_consensus_version_patch LowCardinality(String), + meta_consensus_implementation LowCardinality(String), + meta_labels Map(String, String) CODEC(ZSTD(1)) +) Engine = ReplicatedReplacingMergeTree('/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}', '{replica}', `version`) +PARTITION BY toStartOfMonth(epoch_start_date_time) +ORDER BY (index, pubkey, meta_network_name); + +ALTER TABLE default.canonical_beacon_validators_pubkeys_local ON CLUSTER '{cluster}' +MODIFY COMMENT 'Contains a validator state for an epoch.', +COMMENT COLUMN updated_date_time 'When this row was last updated', +COMMENT COLUMN `version` 'Version of this row, to help with de-duplication we want the latest updated_date_time but earliest epoch_start_date_time the pubkey was seen', +COMMENT COLUMN event_date_time 'When the client fetched the beacon block from a beacon node', +COMMENT COLUMN epoch 'The epoch number from beacon block payload', +COMMENT COLUMN epoch_start_date_time 'The wall clock time when the epoch started', +COMMENT COLUMN `index` 'The index of the validator', +COMMENT COLUMN pubkey 'The public key of the validator', +COMMENT COLUMN meta_client_name 'Name of the client that generated the event', +COMMENT COLUMN meta_client_id 'Unique Session ID of the client that generated the event. This changes every time the client is restarted.', +COMMENT COLUMN meta_client_version 'Version of the client that generated the event', +COMMENT COLUMN meta_client_implementation 'Implementation of the client that generated the event', +COMMENT COLUMN meta_client_os 'Operating system of the client that generated the event', +COMMENT COLUMN meta_client_ip 'IP address of the client that generated the event', +COMMENT COLUMN meta_client_geo_city 'City of the client that generated the event', +COMMENT COLUMN meta_client_geo_country 'Country of the client that generated the event', +COMMENT COLUMN meta_client_geo_country_code 'Country code of the client that generated the event', +COMMENT COLUMN meta_client_geo_continent_code 'Continent code of the client that generated the event', +COMMENT COLUMN meta_client_geo_longitude 'Longitude of the client that generated the event', +COMMENT COLUMN meta_client_geo_latitude 'Latitude of the client that generated the event', +COMMENT COLUMN meta_client_geo_autonomous_system_number 'Autonomous system number of the client that generated the event', +COMMENT COLUMN meta_client_geo_autonomous_system_organization 'Autonomous system organization of the client that generated the event', +COMMENT COLUMN meta_network_id 'Ethereum network ID', +COMMENT COLUMN meta_network_name 'Ethereum network name', +COMMENT COLUMN meta_consensus_version 'Ethereum consensus client version that generated the event', +COMMENT COLUMN meta_consensus_version_major 'Ethereum consensus client major version that generated the event', +COMMENT COLUMN meta_consensus_version_minor 'Ethereum consensus client minor version that generated the event', +COMMENT COLUMN meta_consensus_version_patch 'Ethereum consensus client patch version that generated the event', 
+COMMENT COLUMN meta_consensus_implementation 'Ethereum consensus client implementation that generated the event', +COMMENT COLUMN meta_labels 'Labels associated with the event'; + +CREATE TABLE canonical_beacon_validators_pubkeys on cluster '{cluster}' AS canonical_beacon_validators_pubkeys_local +ENGINE = Distributed('{cluster}', default, canonical_beacon_validators_pubkeys_local, cityHash64(epoch_start_date_time, `index`, meta_network_name)); + +CREATE TABLE default.canonical_beacon_validators_withdrawal_credentials_local on cluster '{cluster}' +( + updated_date_time DateTime CODEC(DoubleDelta, ZSTD(1)), + -- ensure the first epoch the withdrawal_credentials was seen is in this table + -- add the updated_date_time to make sure we can always overwrite the data + -- 4294967295 = UInt32 max + `version` UInt32 DEFAULT 4294967295 + toUnixTimestamp(updated_date_time) - toUnixTimestamp(epoch_start_date_time) CODEC(DoubleDelta, ZSTD(1)), + event_date_time DateTime64(3) CODEC(DoubleDelta, ZSTD(1)), + epoch UInt32 CODEC(DoubleDelta, ZSTD(1)), + epoch_start_date_time DateTime CODEC(DoubleDelta, ZSTD(1)), + `index` UInt32 CODEC(ZSTD(1)), + withdrawal_credentials String CODEC(ZSTD(1)), + meta_client_name LowCardinality(String), + meta_client_id String CODEC(ZSTD(1)), + meta_client_version LowCardinality(String), + meta_client_implementation LowCardinality(String), + meta_client_os LowCardinality(String), + meta_client_ip Nullable(IPv6) CODEC(ZSTD(1)), + meta_client_geo_city LowCardinality(String) CODEC(ZSTD(1)), + meta_client_geo_country LowCardinality(String) CODEC(ZSTD(1)), + meta_client_geo_country_code LowCardinality(String) CODEC(ZSTD(1)), + meta_client_geo_continent_code LowCardinality(String) CODEC(ZSTD(1)), + meta_client_geo_longitude Nullable(Float64) CODEC(ZSTD(1)), + meta_client_geo_latitude Nullable(Float64) CODEC(ZSTD(1)), + meta_client_geo_autonomous_system_number Nullable(UInt32) CODEC(ZSTD(1)), + meta_client_geo_autonomous_system_organization Nullable(String) CODEC(ZSTD(1)), + meta_network_id Int32 CODEC(DoubleDelta, ZSTD(1)), + meta_network_name LowCardinality(String), + meta_consensus_version LowCardinality(String), + meta_consensus_version_major LowCardinality(String), + meta_consensus_version_minor LowCardinality(String), + meta_consensus_version_patch LowCardinality(String), + meta_consensus_implementation LowCardinality(String), + meta_labels Map(String, String) CODEC(ZSTD(1)) +) Engine = ReplicatedReplacingMergeTree('/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}', '{replica}', `version`) +PARTITION BY toStartOfMonth(epoch_start_date_time) +ORDER BY (index, withdrawal_credentials, meta_network_name); + +ALTER TABLE default.canonical_beacon_validators_withdrawal_credentials_local ON CLUSTER '{cluster}' +MODIFY COMMENT 'Contains a validator state for an epoch.', +COMMENT COLUMN updated_date_time 'When this row was last updated', +COMMENT COLUMN `version` 'Version of this row, to help with de-duplication we want the latest updated_date_time but earliest epoch_start_date_time the withdrawal_credentials was seen', +COMMENT COLUMN event_date_time 'When the client fetched the beacon block from a beacon node', +COMMENT COLUMN epoch 'The epoch number from beacon block payload', +COMMENT COLUMN epoch_start_date_time 'The wall clock time when the epoch started', +COMMENT COLUMN `index` 'The index of the validator', +COMMENT COLUMN withdrawal_credentials 'The withdrawal credentials of the validator', +COMMENT COLUMN meta_client_name 'Name of the client that 
generated the event', +COMMENT COLUMN meta_client_id 'Unique Session ID of the client that generated the event. This changes every time the client is restarted.', +COMMENT COLUMN meta_client_version 'Version of the client that generated the event', +COMMENT COLUMN meta_client_implementation 'Implementation of the client that generated the event', +COMMENT COLUMN meta_client_os 'Operating system of the client that generated the event', +COMMENT COLUMN meta_client_ip 'IP address of the client that generated the event', +COMMENT COLUMN meta_client_geo_city 'City of the client that generated the event', +COMMENT COLUMN meta_client_geo_country 'Country of the client that generated the event', +COMMENT COLUMN meta_client_geo_country_code 'Country code of the client that generated the event', +COMMENT COLUMN meta_client_geo_continent_code 'Continent code of the client that generated the event', +COMMENT COLUMN meta_client_geo_longitude 'Longitude of the client that generated the event', +COMMENT COLUMN meta_client_geo_latitude 'Latitude of the client that generated the event', +COMMENT COLUMN meta_client_geo_autonomous_system_number 'Autonomous system number of the client that generated the event', +COMMENT COLUMN meta_client_geo_autonomous_system_organization 'Autonomous system organization of the client that generated the event', +COMMENT COLUMN meta_network_id 'Ethereum network ID', +COMMENT COLUMN meta_network_name 'Ethereum network name', +COMMENT COLUMN meta_consensus_version 'Ethereum consensus client version that generated the event', +COMMENT COLUMN meta_consensus_version_major 'Ethereum consensus client major version that generated the event', +COMMENT COLUMN meta_consensus_version_minor 'Ethereum consensus client minor version that generated the event', +COMMENT COLUMN meta_consensus_version_patch 'Ethereum consensus client patch version that generated the event', +COMMENT COLUMN meta_consensus_implementation 'Ethereum consensus client implementation that generated the event', +COMMENT COLUMN meta_labels 'Labels associated with the event'; + +CREATE TABLE canonical_beacon_validators_withdrawal_credentials on cluster '{cluster}' AS canonical_beacon_validators_withdrawal_credentials_local +ENGINE = Distributed('{cluster}', default, canonical_beacon_validators_withdrawal_credentials_local, cityHash64(epoch_start_date_time, `index`, meta_network_name));
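Reviewer note: the query below is a hedged sketch of how a consumer might stitch the split tables back together after this migration; it is not part of the change. Table and column names come from the DDL above, while the mainnet filter, the one-day window and the use of GLOBAL LEFT JOIN are illustrative assumptions about the deployment.

-- Reassemble pubkeys onto the slimmed-down per-epoch validator state.
-- FINAL forces ReplacingMergeTree de-duplication at read time, so the
-- subquery yields one row per (index, pubkey, meta_network_name); per the
-- `version` column's comment, the surviving row is intended to carry the
-- earliest epoch the pubkey was seen with the latest updated_date_time.
SELECT
    v.epoch,
    v.`index`,
    v.`status`,
    v.balance,
    p.pubkey
FROM canonical_beacon_validators AS v
GLOBAL LEFT JOIN
(
    SELECT
        `index`,
        pubkey,
        meta_network_name
    FROM canonical_beacon_validators_pubkeys
    FINAL
) AS p ON p.`index` = v.`index` AND p.meta_network_name = v.meta_network_name
WHERE v.meta_network_name = 'mainnet'
  AND v.epoch_start_date_time >= now() - INTERVAL 1 DAY;

The canonical_beacon_validators_withdrawal_credentials table can be joined the same way, with the caveat that a validator whose credentials changed over time keeps one deduplicated row per distinct withdrawal_credentials value.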