diff --git a/deploy/local/docker-compose/vector-kafka-clickhouse.yaml b/deploy/local/docker-compose/vector-kafka-clickhouse.yaml index da626cb6..2f9808f7 100644 --- a/deploy/local/docker-compose/vector-kafka-clickhouse.yaml +++ b/deploy/local/docker-compose/vector-kafka-clickhouse.yaml @@ -1499,6 +1499,7 @@ transforms: for_each(array!(.data.validators)) -> |_index, validator| { events = push(events, { + "key": .event.id, "event_date_time": .event_date_time, "updated_date_time": .updated_date_time, "meta_client_name": .meta_client_name, @@ -1526,115 +1527,9 @@ transforms: "activation_epoch": validator.data.activation_epoch, "effective_balance": validator.data.effective_balance, "exit_epoch": validator.data.exit_epoch, + "pubkey": validator.data.pubkey, "slashed": validator.data.slashed, - "withdrawable_epoch": validator.data.withdrawable_epoch - }) - } - . = events - canonical_beacon_validators_pubkeys_formatted: - type: remap - inputs: - - xatu_server_events_router.canonical_beacon_validators - source: |- - event_date_time, err = parse_timestamp(.event.date_time, format: "%+"); - if err == null { - .event_date_time = to_unix_timestamp(event_date_time, unit: "milliseconds") - } else { - .error = err - .error_description = "failed to parse event date time" - log(., level: "error", rate_limit_secs: 60) - } - - epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.epoch.start_date_time, format: "%+"); - if err == null { - .epoch_start_date_time = to_unix_timestamp(epoch_start_date_time) - } else { - .error = err - .error_description = "failed to parse epoch start date time" - log(., level: "error", rate_limit_secs: 60) - } - - events = [] - - .updated_date_time = to_unix_timestamp(now()) - - for_each(array!(.data.validators)) -> |_index, validator| { - events = push(events, { - "event_date_time": .event_date_time, - "updated_date_time": .updated_date_time, - "meta_client_name": .meta_client_name, - "meta_client_id": .meta_client_id, - "meta_client_version": .meta_client_version, - "meta_client_implementation": .meta_client_implementation, - "meta_client_os": .meta_client_os, - "meta_client_ip": .meta_client_ip, - "meta_network_id": .meta_network_id, - "meta_network_name": .meta_network_name, - "meta_client_geo_city": .meta_client_geo_city, - "meta_client_geo_country": .meta_client_geo_country, - "meta_client_geo_country_code": .meta_client_geo_country_code, - "meta_client_geo_continent_code": .meta_client_geo_continent_code, - "meta_client_geo_longitude": .meta_client_geo_longitude, - "meta_client_geo_latitude": .meta_client_geo_latitude, - "meta_client_geo_autonomous_system_number": .meta_client_geo_autonomous_system_number, - "meta_client_geo_autonomous_system_organization": .meta_client_geo_autonomous_system_organization, - "epoch": .meta.client.additional_data.epoch.number, - "epoch_start_date_time": .epoch_start_date_time, - "index": validator.index, - "pubkey": validator.data.pubkey - }) - } - . 
= events - canonical_beacon_validators_withdrawal_credentials_formatted: - type: remap - inputs: - - xatu_server_events_router.canonical_beacon_validators - source: |- - event_date_time, err = parse_timestamp(.event.date_time, format: "%+"); - if err == null { - .event_date_time = to_unix_timestamp(event_date_time, unit: "milliseconds") - } else { - .error = err - .error_description = "failed to parse event date time" - log(., level: "error", rate_limit_secs: 60) - } - - epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.epoch.start_date_time, format: "%+"); - if err == null { - .epoch_start_date_time = to_unix_timestamp(epoch_start_date_time) - } else { - .error = err - .error_description = "failed to parse epoch start date time" - log(., level: "error", rate_limit_secs: 60) - } - - events = [] - - .updated_date_time = to_unix_timestamp(now()) - - for_each(array!(.data.validators)) -> |_index, validator| { - events = push(events, { - "event_date_time": .event_date_time, - "updated_date_time": .updated_date_time, - "meta_client_name": .meta_client_name, - "meta_client_id": .meta_client_id, - "meta_client_version": .meta_client_version, - "meta_client_implementation": .meta_client_implementation, - "meta_client_os": .meta_client_os, - "meta_client_ip": .meta_client_ip, - "meta_network_id": .meta_network_id, - "meta_network_name": .meta_network_name, - "meta_client_geo_city": .meta_client_geo_city, - "meta_client_geo_country": .meta_client_geo_country, - "meta_client_geo_country_code": .meta_client_geo_country_code, - "meta_client_geo_continent_code": .meta_client_geo_continent_code, - "meta_client_geo_longitude": .meta_client_geo_longitude, - "meta_client_geo_latitude": .meta_client_geo_latitude, - "meta_client_geo_autonomous_system_number": .meta_client_geo_autonomous_system_number, - "meta_client_geo_autonomous_system_organization": .meta_client_geo_autonomous_system_organization, - "epoch": .meta.client.additional_data.epoch.number, - "epoch_start_date_time": .epoch_start_date_time, - "index": validator.index, + "withdrawable_epoch": validator.data.withdrawable_epoch, "withdrawal_credentials": validator.data.withdrawal_credentials }) } @@ -2491,44 +2386,4 @@ sinks: max_events: 200000 healthcheck: enabled: true - skip_unknown_fields: false - canonical_beacon_validators_pubkeys_clickhouse: - type: clickhouse - inputs: - - canonical_beacon_validators_pubkeys_formatted - database: default - endpoint: "${CLICKHOUSE_ENDPOINT}" - table: canonical_beacon_validators_pubkeys - auth: - strategy: basic - user: "${CLICKHOUSE_USER}" - password: "${CLICKHOUSE_PASSWORD}" - batch: - max_bytes: 52428800 - max_events: 200000 - timeout_secs: 1 - buffer: - max_events: 200000 - healthcheck: - enabled: true - skip_unknown_fields: false - canonical_beacon_validators_withdrawal_credentials_clickhouse: - type: clickhouse - inputs: - - canonical_beacon_validators_withdrawal_credentials_formatted - database: default - endpoint: "${CLICKHOUSE_ENDPOINT}" - table: canonical_beacon_validators_withdrawal_credentials - auth: - strategy: basic - user: "${CLICKHOUSE_USER}" - password: "${CLICKHOUSE_PASSWORD}" - batch: - max_bytes: 52428800 - max_events: 200000 - timeout_secs: 1 - buffer: - max_events: 200000 - healthcheck: - enabled: true - skip_unknown_fields: false + skip_unknown_fields: false \ No newline at end of file diff --git a/deploy/migrations/clickhouse/040_canonical_beacon_validators_split.down.sql b/deploy/migrations/clickhouse/040_canonical_beacon_validators_split.down.sql deleted file 
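For context on the transform change above: a canonical_beacon_validators event carries an array of validators, and the consolidated remap now fans it out into one flat row per validator, keyed by .event.id and carrying pubkey, withdrawable_epoch and withdrawal_credentials in a single table instead of the three split tables. A rough Go sketch of that fan-out, using hypothetical Event/Validator/Row types that are not part of this repository:

package main

import "fmt"

// Hypothetical shapes; the real payload comes from the xatu event JSON.
type Validator struct {
	Index                 uint64
	Pubkey                string
	WithdrawalCredentials string
	WithdrawableEpoch     uint64
}

type Event struct {
	ID         string
	Validators []Validator
}

// Row mirrors a subset of the columns the remap emits per validator.
type Row struct {
	Key                   string
	Index                 uint64
	Pubkey                string
	WithdrawalCredentials string
	WithdrawableEpoch     uint64
}

// flatten produces one Row per validator, the same fan-out the VRL for_each performs.
func flatten(e Event) []Row {
	rows := make([]Row, 0, len(e.Validators))
	for _, v := range e.Validators {
		rows = append(rows, Row{
			Key:                   e.ID,
			Index:                 v.Index,
			Pubkey:                v.Pubkey,
			WithdrawalCredentials: v.WithdrawalCredentials,
			WithdrawableEpoch:     v.WithdrawableEpoch,
		})
	}
	return rows
}

func main() {
	e := Event{ID: "evt-1", Validators: []Validator{{Index: 1, Pubkey: "0xaa"}, {Index: 2, Pubkey: "0xbb"}}}
	fmt.Println(len(flatten(e))) // 2 rows from 1 event
}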
mode 100644 index d79fd523..00000000 --- a/deploy/migrations/clickhouse/040_canonical_beacon_validators_split.down.sql +++ /dev/null @@ -1,94 +0,0 @@ -DROP TABLE IF EXISTS canonical_beacon_validators on cluster '{cluster}' SYNC; -DROP TABLE IF EXISTS canonical_beacon_validators_local on cluster '{cluster}' SYNC; - -DROP TABLE IF EXISTS canonical_beacon_validators_pubkeys on cluster '{cluster}' SYNC; -DROP TABLE IF EXISTS canonical_beacon_validators_pubkeys_local on cluster '{cluster}' SYNC; - -DROP TABLE IF EXISTS canonical_beacon_validators_withdrawal_credentials on cluster '{cluster}' SYNC; -DROP TABLE IF EXISTS canonical_beacon_validators_withdrawal_credentials_local on cluster '{cluster}' SYNC; - -CREATE TABLE default.canonical_beacon_validators_local on cluster '{cluster}' -( - updated_date_time DateTime CODEC(DoubleDelta, ZSTD(1)), - event_date_time DateTime64(3) CODEC(DoubleDelta, ZSTD(1)), - epoch UInt32 CODEC(DoubleDelta, ZSTD(1)), - epoch_start_date_time DateTime CODEC(DoubleDelta, ZSTD(1)), - `index` UInt32 CODEC(ZSTD(1)), - balance UInt64 CODEC(ZSTD(1)), - `status` LowCardinality(String), - pubkey String CODEC(ZSTD(1)), - withdrawal_credentials String CODEC(ZSTD(1)), - effective_balance UInt64 CODEC(ZSTD(1)), - slashed Bool, - activation_epoch UInt64 CODEC(ZSTD(1)), - activation_eligibility_epoch UInt64 CODEC(ZSTD(1)), - exit_epoch UInt64 CODEC(ZSTD(1)), - withdrawable_epoch UInt64 CODEC(ZSTD(1)), - meta_client_name LowCardinality(String), - meta_client_id String CODEC(ZSTD(1)), - meta_client_version LowCardinality(String), - meta_client_implementation LowCardinality(String), - meta_client_os LowCardinality(String), - meta_client_ip Nullable(IPv6) CODEC(ZSTD(1)), - meta_client_geo_city LowCardinality(String) CODEC(ZSTD(1)), - meta_client_geo_country LowCardinality(String) CODEC(ZSTD(1)), - meta_client_geo_country_code LowCardinality(String) CODEC(ZSTD(1)), - meta_client_geo_continent_code LowCardinality(String) CODEC(ZSTD(1)), - meta_client_geo_longitude Nullable(Float64) CODEC(ZSTD(1)), - meta_client_geo_latitude Nullable(Float64) CODEC(ZSTD(1)), - meta_client_geo_autonomous_system_number Nullable(UInt32) CODEC(ZSTD(1)), - meta_client_geo_autonomous_system_organization Nullable(String) CODEC(ZSTD(1)), - meta_network_id Int32 CODEC(DoubleDelta, ZSTD(1)), - meta_network_name LowCardinality(String), - meta_consensus_version LowCardinality(String), - meta_consensus_version_major LowCardinality(String), - meta_consensus_version_minor LowCardinality(String), - meta_consensus_version_patch LowCardinality(String), - meta_consensus_implementation LowCardinality(String), - meta_labels Map(String, String) CODEC(ZSTD(1)) -) Engine = ReplicatedReplacingMergeTree('/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}', '{replica}', updated_date_time) -PARTITION BY toStartOfMonth(epoch_start_date_time) -ORDER BY (epoch_start_date_time, index, meta_network_name); - -ALTER TABLE default.canonical_beacon_validators_local ON CLUSTER '{cluster}' -MODIFY COMMENT 'Contains a validator state for an epoch.', -COMMENT COLUMN updated_date_time 'When this row was last updated', -COMMENT COLUMN event_date_time 'When the client fetched the beacon block from a beacon node', -COMMENT COLUMN epoch 'The epoch number from beacon block payload', -COMMENT COLUMN epoch_start_date_time 'The wall clock time when the epoch started', -COMMENT COLUMN `index` 'The index of the validator', -COMMENT COLUMN `balance` 'The balance of the validator', -COMMENT COLUMN `status` 'The status of the validator', 
-COMMENT COLUMN pubkey 'The public key of the validator', -COMMENT COLUMN withdrawal_credentials 'The withdrawal credentials of the validator', -COMMENT COLUMN effective_balance 'The effective balance of the validator', -COMMENT COLUMN slashed 'Whether the validator is slashed', -COMMENT COLUMN activation_epoch 'The epoch when the validator was activated', -COMMENT COLUMN activation_eligibility_epoch 'The epoch when the validator was activated', -COMMENT COLUMN exit_epoch 'The epoch when the validator exited', -COMMENT COLUMN withdrawable_epoch 'The epoch when the validator can withdraw', -COMMENT COLUMN meta_client_name 'Name of the client that generated the event', -COMMENT COLUMN meta_client_id 'Unique Session ID of the client that generated the event. This changes every time the client is restarted.', -COMMENT COLUMN meta_client_version 'Version of the client that generated the event', -COMMENT COLUMN meta_client_implementation 'Implementation of the client that generated the event', -COMMENT COLUMN meta_client_os 'Operating system of the client that generated the event', -COMMENT COLUMN meta_client_ip 'IP address of the client that generated the event', -COMMENT COLUMN meta_client_geo_city 'City of the client that generated the event', -COMMENT COLUMN meta_client_geo_country 'Country of the client that generated the event', -COMMENT COLUMN meta_client_geo_country_code 'Country code of the client that generated the event', -COMMENT COLUMN meta_client_geo_continent_code 'Continent code of the client that generated the event', -COMMENT COLUMN meta_client_geo_longitude 'Longitude of the client that generated the event', -COMMENT COLUMN meta_client_geo_latitude 'Latitude of the client that generated the event', -COMMENT COLUMN meta_client_geo_autonomous_system_number 'Autonomous system number of the client that generated the event', -COMMENT COLUMN meta_client_geo_autonomous_system_organization 'Autonomous system organization of the client that generated the event', -COMMENT COLUMN meta_network_id 'Ethereum network ID', -COMMENT COLUMN meta_network_name 'Ethereum network name', -COMMENT COLUMN meta_consensus_version 'Ethereum consensus client version that generated the event', -COMMENT COLUMN meta_consensus_version_major 'Ethereum consensus client major version that generated the event', -COMMENT COLUMN meta_consensus_version_minor 'Ethereum consensus client minor version that generated the event', -COMMENT COLUMN meta_consensus_version_patch 'Ethereum consensus client patch version that generated the event', -COMMENT COLUMN meta_consensus_implementation 'Ethereum consensus client implementation that generated the event', -COMMENT COLUMN meta_labels 'Labels associated with the event'; - -CREATE TABLE canonical_beacon_validators on cluster '{cluster}' AS canonical_beacon_validators_local -ENGINE = Distributed('{cluster}', default, canonical_beacon_validators_local, cityHash64(epoch_start_date_time, `index`, meta_network_name)); diff --git a/deploy/migrations/clickhouse/040_canonical_beacon_validators_split.up.sql b/deploy/migrations/clickhouse/040_canonical_beacon_validators_split.up.sql deleted file mode 100644 index 8b43fc36..00000000 --- a/deploy/migrations/clickhouse/040_canonical_beacon_validators_split.up.sql +++ /dev/null @@ -1,230 +0,0 @@ -DROP TABLE IF EXISTS canonical_beacon_validators on cluster '{cluster}' SYNC; -DROP TABLE IF EXISTS canonical_beacon_validators_local on cluster '{cluster}' SYNC; - -CREATE TABLE default.canonical_beacon_validators_local on cluster '{cluster}' -( 
- updated_date_time DateTime CODEC(DoubleDelta, ZSTD(1)), - event_date_time DateTime64(3) CODEC(DoubleDelta, ZSTD(1)), - epoch UInt32 CODEC(DoubleDelta, ZSTD(1)), - epoch_start_date_time DateTime CODEC(DoubleDelta, ZSTD(1)), - `index` UInt32 CODEC(ZSTD(1)), - balance UInt64 CODEC(ZSTD(1)), - `status` LowCardinality(String), - effective_balance UInt64 CODEC(ZSTD(1)), - slashed Bool, - activation_epoch UInt64 CODEC(ZSTD(1)), - activation_eligibility_epoch UInt64 CODEC(ZSTD(1)), - exit_epoch UInt64 CODEC(ZSTD(1)), - withdrawable_epoch UInt64 CODEC(ZSTD(1)), - meta_client_name LowCardinality(String), - meta_client_id String CODEC(ZSTD(1)), - meta_client_version LowCardinality(String), - meta_client_implementation LowCardinality(String), - meta_client_os LowCardinality(String), - meta_client_ip Nullable(IPv6) CODEC(ZSTD(1)), - meta_client_geo_city LowCardinality(String) CODEC(ZSTD(1)), - meta_client_geo_country LowCardinality(String) CODEC(ZSTD(1)), - meta_client_geo_country_code LowCardinality(String) CODEC(ZSTD(1)), - meta_client_geo_continent_code LowCardinality(String) CODEC(ZSTD(1)), - meta_client_geo_longitude Nullable(Float64) CODEC(ZSTD(1)), - meta_client_geo_latitude Nullable(Float64) CODEC(ZSTD(1)), - meta_client_geo_autonomous_system_number Nullable(UInt32) CODEC(ZSTD(1)), - meta_client_geo_autonomous_system_organization Nullable(String) CODEC(ZSTD(1)), - meta_network_id Int32 CODEC(DoubleDelta, ZSTD(1)), - meta_network_name LowCardinality(String), - meta_consensus_version LowCardinality(String), - meta_consensus_version_major LowCardinality(String), - meta_consensus_version_minor LowCardinality(String), - meta_consensus_version_patch LowCardinality(String), - meta_consensus_implementation LowCardinality(String), - meta_labels Map(String, String) CODEC(ZSTD(1)) -) Engine = ReplicatedReplacingMergeTree('/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}', '{replica}', updated_date_time) -PARTITION BY toStartOfMonth(epoch_start_date_time) -ORDER BY (epoch_start_date_time, index, meta_network_name); - -ALTER TABLE default.canonical_beacon_validators_local ON CLUSTER '{cluster}' -MODIFY COMMENT 'Contains a validator state for an epoch.', -COMMENT COLUMN updated_date_time 'When this row was last updated', -COMMENT COLUMN event_date_time 'When the client fetched the beacon block from a beacon node', -COMMENT COLUMN epoch 'The epoch number from beacon block payload', -COMMENT COLUMN epoch_start_date_time 'The wall clock time when the epoch started', -COMMENT COLUMN `index` 'The index of the validator', -COMMENT COLUMN balance 'The balance of the validator', -COMMENT COLUMN `status` 'The status of the validator', -COMMENT COLUMN effective_balance 'The effective balance of the validator', -COMMENT COLUMN slashed 'Whether the validator is slashed', -COMMENT COLUMN activation_epoch 'The epoch when the validator was activated', -COMMENT COLUMN activation_eligibility_epoch 'The epoch when the validator was activated', -COMMENT COLUMN exit_epoch 'The epoch when the validator exited', -COMMENT COLUMN withdrawable_epoch 'The epoch when the validator can withdraw', -COMMENT COLUMN meta_client_name 'Name of the client that generated the event', -COMMENT COLUMN meta_client_id 'Unique Session ID of the client that generated the event. 
This changes every time the client is restarted.', -COMMENT COLUMN meta_client_version 'Version of the client that generated the event', -COMMENT COLUMN meta_client_implementation 'Implementation of the client that generated the event', -COMMENT COLUMN meta_client_os 'Operating system of the client that generated the event', -COMMENT COLUMN meta_client_ip 'IP address of the client that generated the event', -COMMENT COLUMN meta_client_geo_city 'City of the client that generated the event', -COMMENT COLUMN meta_client_geo_country 'Country of the client that generated the event', -COMMENT COLUMN meta_client_geo_country_code 'Country code of the client that generated the event', -COMMENT COLUMN meta_client_geo_continent_code 'Continent code of the client that generated the event', -COMMENT COLUMN meta_client_geo_longitude 'Longitude of the client that generated the event', -COMMENT COLUMN meta_client_geo_latitude 'Latitude of the client that generated the event', -COMMENT COLUMN meta_client_geo_autonomous_system_number 'Autonomous system number of the client that generated the event', -COMMENT COLUMN meta_client_geo_autonomous_system_organization 'Autonomous system organization of the client that generated the event', -COMMENT COLUMN meta_network_id 'Ethereum network ID', -COMMENT COLUMN meta_network_name 'Ethereum network name', -COMMENT COLUMN meta_consensus_version 'Ethereum consensus client version that generated the event', -COMMENT COLUMN meta_consensus_version_major 'Ethereum consensus client major version that generated the event', -COMMENT COLUMN meta_consensus_version_minor 'Ethereum consensus client minor version that generated the event', -COMMENT COLUMN meta_consensus_version_patch 'Ethereum consensus client patch version that generated the event', -COMMENT COLUMN meta_consensus_implementation 'Ethereum consensus client implementation that generated the event', -COMMENT COLUMN meta_labels 'Labels associated with the event'; - -CREATE TABLE canonical_beacon_validators on cluster '{cluster}' AS canonical_beacon_validators_local -ENGINE = Distributed('{cluster}', default, canonical_beacon_validators_local, cityHash64(epoch_start_date_time, `index`, meta_network_name)); - -CREATE TABLE default.canonical_beacon_validators_pubkeys_local on cluster '{cluster}' -( - updated_date_time DateTime CODEC(DoubleDelta, ZSTD(1)), - -- ensure the first epoch the pubkey was seen is in this table - -- add the updated_date_time to make sure we can always overwrite the data - -- 4294967295 = UInt32 max - `version` UInt32 DEFAULT 4294967295 + toUnixTimestamp(updated_date_time) - toUnixTimestamp(epoch_start_date_time) CODEC(DoubleDelta, ZSTD(1)), - event_date_time DateTime64(3) CODEC(DoubleDelta, ZSTD(1)), - epoch UInt32 CODEC(DoubleDelta, ZSTD(1)), - epoch_start_date_time DateTime CODEC(DoubleDelta, ZSTD(1)), - `index` UInt32 CODEC(ZSTD(1)), - pubkey String CODEC(ZSTD(1)), - meta_client_name LowCardinality(String), - meta_client_id String CODEC(ZSTD(1)), - meta_client_version LowCardinality(String), - meta_client_implementation LowCardinality(String), - meta_client_os LowCardinality(String), - meta_client_ip Nullable(IPv6) CODEC(ZSTD(1)), - meta_client_geo_city LowCardinality(String) CODEC(ZSTD(1)), - meta_client_geo_country LowCardinality(String) CODEC(ZSTD(1)), - meta_client_geo_country_code LowCardinality(String) CODEC(ZSTD(1)), - meta_client_geo_continent_code LowCardinality(String) CODEC(ZSTD(1)), - meta_client_geo_longitude Nullable(Float64) CODEC(ZSTD(1)), - meta_client_geo_latitude 
Nullable(Float64) CODEC(ZSTD(1)), - meta_client_geo_autonomous_system_number Nullable(UInt32) CODEC(ZSTD(1)), - meta_client_geo_autonomous_system_organization Nullable(String) CODEC(ZSTD(1)), - meta_network_id Int32 CODEC(DoubleDelta, ZSTD(1)), - meta_network_name LowCardinality(String), - meta_consensus_version LowCardinality(String), - meta_consensus_version_major LowCardinality(String), - meta_consensus_version_minor LowCardinality(String), - meta_consensus_version_patch LowCardinality(String), - meta_consensus_implementation LowCardinality(String), - meta_labels Map(String, String) CODEC(ZSTD(1)) -) Engine = ReplicatedReplacingMergeTree('/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}', '{replica}', `version`) -PARTITION BY toStartOfMonth(epoch_start_date_time) -ORDER BY (index, pubkey, meta_network_name); - -ALTER TABLE default.canonical_beacon_validators_pubkeys_local ON CLUSTER '{cluster}' -MODIFY COMMENT 'Contains a validator state for an epoch.', -COMMENT COLUMN updated_date_time 'When this row was last updated', -COMMENT COLUMN `version` 'Version of this row, to help with de-duplication we want the latest updated_date_time but earliest epoch_start_date_time the pubkey was seen', -COMMENT COLUMN event_date_time 'When the client fetched the beacon block from a beacon node', -COMMENT COLUMN epoch 'The epoch number from beacon block payload', -COMMENT COLUMN epoch_start_date_time 'The wall clock time when the epoch started', -COMMENT COLUMN `index` 'The index of the validator', -COMMENT COLUMN pubkey 'The public key of the validator', -COMMENT COLUMN meta_client_name 'Name of the client that generated the event', -COMMENT COLUMN meta_client_id 'Unique Session ID of the client that generated the event. This changes every time the client is restarted.', -COMMENT COLUMN meta_client_version 'Version of the client that generated the event', -COMMENT COLUMN meta_client_implementation 'Implementation of the client that generated the event', -COMMENT COLUMN meta_client_os 'Operating system of the client that generated the event', -COMMENT COLUMN meta_client_ip 'IP address of the client that generated the event', -COMMENT COLUMN meta_client_geo_city 'City of the client that generated the event', -COMMENT COLUMN meta_client_geo_country 'Country of the client that generated the event', -COMMENT COLUMN meta_client_geo_country_code 'Country code of the client that generated the event', -COMMENT COLUMN meta_client_geo_continent_code 'Continent code of the client that generated the event', -COMMENT COLUMN meta_client_geo_longitude 'Longitude of the client that generated the event', -COMMENT COLUMN meta_client_geo_latitude 'Latitude of the client that generated the event', -COMMENT COLUMN meta_client_geo_autonomous_system_number 'Autonomous system number of the client that generated the event', -COMMENT COLUMN meta_client_geo_autonomous_system_organization 'Autonomous system organization of the client that generated the event', -COMMENT COLUMN meta_network_id 'Ethereum network ID', -COMMENT COLUMN meta_network_name 'Ethereum network name', -COMMENT COLUMN meta_consensus_version 'Ethereum consensus client version that generated the event', -COMMENT COLUMN meta_consensus_version_major 'Ethereum consensus client major version that generated the event', -COMMENT COLUMN meta_consensus_version_minor 'Ethereum consensus client minor version that generated the event', -COMMENT COLUMN meta_consensus_version_patch 'Ethereum consensus client patch version that generated the event', 
-COMMENT COLUMN meta_consensus_implementation 'Ethereum consensus client implementation that generated the event', -COMMENT COLUMN meta_labels 'Labels associated with the event'; - -CREATE TABLE canonical_beacon_validators_pubkeys on cluster '{cluster}' AS canonical_beacon_validators_pubkeys_local -ENGINE = Distributed('{cluster}', default, canonical_beacon_validators_pubkeys_local, cityHash64(epoch_start_date_time, `index`, meta_network_name)); - -CREATE TABLE default.canonical_beacon_validators_withdrawal_credentials_local on cluster '{cluster}' -( - updated_date_time DateTime CODEC(DoubleDelta, ZSTD(1)), - -- ensure the first epoch the withdrawal_credentials was seen is in this table - -- add the updated_date_time to make sure we can always overwrite the data - -- 4294967295 = UInt32 max - `version` UInt32 DEFAULT 4294967295 + toUnixTimestamp(updated_date_time) - toUnixTimestamp(epoch_start_date_time) CODEC(DoubleDelta, ZSTD(1)), - event_date_time DateTime64(3) CODEC(DoubleDelta, ZSTD(1)), - epoch UInt32 CODEC(DoubleDelta, ZSTD(1)), - epoch_start_date_time DateTime CODEC(DoubleDelta, ZSTD(1)), - `index` UInt32 CODEC(ZSTD(1)), - withdrawal_credentials String CODEC(ZSTD(1)), - meta_client_name LowCardinality(String), - meta_client_id String CODEC(ZSTD(1)), - meta_client_version LowCardinality(String), - meta_client_implementation LowCardinality(String), - meta_client_os LowCardinality(String), - meta_client_ip Nullable(IPv6) CODEC(ZSTD(1)), - meta_client_geo_city LowCardinality(String) CODEC(ZSTD(1)), - meta_client_geo_country LowCardinality(String) CODEC(ZSTD(1)), - meta_client_geo_country_code LowCardinality(String) CODEC(ZSTD(1)), - meta_client_geo_continent_code LowCardinality(String) CODEC(ZSTD(1)), - meta_client_geo_longitude Nullable(Float64) CODEC(ZSTD(1)), - meta_client_geo_latitude Nullable(Float64) CODEC(ZSTD(1)), - meta_client_geo_autonomous_system_number Nullable(UInt32) CODEC(ZSTD(1)), - meta_client_geo_autonomous_system_organization Nullable(String) CODEC(ZSTD(1)), - meta_network_id Int32 CODEC(DoubleDelta, ZSTD(1)), - meta_network_name LowCardinality(String), - meta_consensus_version LowCardinality(String), - meta_consensus_version_major LowCardinality(String), - meta_consensus_version_minor LowCardinality(String), - meta_consensus_version_patch LowCardinality(String), - meta_consensus_implementation LowCardinality(String), - meta_labels Map(String, String) CODEC(ZSTD(1)) -) Engine = ReplicatedReplacingMergeTree('/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}', '{replica}', `version`) -PARTITION BY toStartOfMonth(epoch_start_date_time) -ORDER BY (index, withdrawal_credentials, meta_network_name); - -ALTER TABLE default.canonical_beacon_validators_withdrawal_credentials_local ON CLUSTER '{cluster}' -MODIFY COMMENT 'Contains a validator state for an epoch.', -COMMENT COLUMN updated_date_time 'When this row was last updated', -COMMENT COLUMN `version` 'Version of this row, to help with de-duplication we want the latest updated_date_time but earliest epoch_start_date_time the withdrawal_credentials was seen', -COMMENT COLUMN event_date_time 'When the client fetched the beacon block from a beacon node', -COMMENT COLUMN epoch 'The epoch number from beacon block payload', -COMMENT COLUMN epoch_start_date_time 'The wall clock time when the epoch started', -COMMENT COLUMN `index` 'The index of the validator', -COMMENT COLUMN withdrawal_credentials 'The withdrawal credentials of the validator', -COMMENT COLUMN meta_client_name 'Name of the client that 
generated the event', -COMMENT COLUMN meta_client_id 'Unique Session ID of the client that generated the event. This changes every time the client is restarted.', -COMMENT COLUMN meta_client_version 'Version of the client that generated the event', -COMMENT COLUMN meta_client_implementation 'Implementation of the client that generated the event', -COMMENT COLUMN meta_client_os 'Operating system of the client that generated the event', -COMMENT COLUMN meta_client_ip 'IP address of the client that generated the event', -COMMENT COLUMN meta_client_geo_city 'City of the client that generated the event', -COMMENT COLUMN meta_client_geo_country 'Country of the client that generated the event', -COMMENT COLUMN meta_client_geo_country_code 'Country code of the client that generated the event', -COMMENT COLUMN meta_client_geo_continent_code 'Continent code of the client that generated the event', -COMMENT COLUMN meta_client_geo_longitude 'Longitude of the client that generated the event', -COMMENT COLUMN meta_client_geo_latitude 'Latitude of the client that generated the event', -COMMENT COLUMN meta_client_geo_autonomous_system_number 'Autonomous system number of the client that generated the event', -COMMENT COLUMN meta_client_geo_autonomous_system_organization 'Autonomous system organization of the client that generated the event', -COMMENT COLUMN meta_network_id 'Ethereum network ID', -COMMENT COLUMN meta_network_name 'Ethereum network name', -COMMENT COLUMN meta_consensus_version 'Ethereum consensus client version that generated the event', -COMMENT COLUMN meta_consensus_version_major 'Ethereum consensus client major version that generated the event', -COMMENT COLUMN meta_consensus_version_minor 'Ethereum consensus client minor version that generated the event', -COMMENT COLUMN meta_consensus_version_patch 'Ethereum consensus client patch version that generated the event', -COMMENT COLUMN meta_consensus_implementation 'Ethereum consensus client implementation that generated the event', -COMMENT COLUMN meta_labels 'Labels associated with the event'; - -CREATE TABLE canonical_beacon_validators_withdrawal_credentials on cluster '{cluster}' AS canonical_beacon_validators_withdrawal_credentials_local -ENGINE = Distributed('{cluster}', default, canonical_beacon_validators_withdrawal_credentials_local, cityHash64(epoch_start_date_time, `index`, meta_network_name)); diff --git a/pkg/processor/batch.go b/pkg/processor/batch.go index 76e95aab..b9de255b 100644 --- a/pkg/processor/batch.go +++ b/pkg/processor/batch.go @@ -1,14 +1,9 @@ -/* - * This processor was adapted from the OpenTelemetry Collector's batch processor. - * - * Authors: OpenTelemetry - * URL: https://github.com/open-telemetry/opentelemetry-go/blob/496c086ece129182662c14d6a023a2b2da09fe30/sdk/trace/batch_span_processor.go - */ - package processor import ( "context" + "errors" + "fmt" "runtime" "sync" "sync/atomic" @@ -18,6 +13,7 @@ import ( "github.com/sirupsen/logrus" ) +// ItemExporter is an interface for exporting items. type ItemExporter[T any] interface { // ExportItems exports a batch of items. // @@ -39,7 +35,6 @@ type ItemExporter[T any] interface { Shutdown(ctx context.Context) error } -// Defaults for BatchItemProcessorOptions. const ( DefaultMaxQueueSize = 51200 DefaultScheduleDelay = 5000 @@ -49,6 +44,7 @@ const ( DefaultNumWorkers = 1 ) +// ShippingMethod is the method of shipping items for export. 
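The ItemExporter contract the processor drives is deliberately small: ExportItems receives one batch and returns a single error for the whole batch, and Shutdown gives the exporter a chance to flush before the processor stops. A minimal sketch of an implementation; the logExporter name is invented for illustration and is not part of this change:

package processor

import (
	"context"

	"github.com/sirupsen/logrus"
)

// logExporter is a hypothetical ItemExporter[T] that only logs batch sizes.
type logExporter[T any] struct {
	log logrus.FieldLogger
}

func (e *logExporter[T]) ExportItems(ctx context.Context, items []*T) error {
	// A real exporter would write to Kafka/ClickHouse/etc. and honour ctx cancellation.
	e.log.WithField("count", len(items)).Info("exporting batch")

	return nil
}

func (e *logExporter[T]) Shutdown(ctx context.Context) error {
	// Nothing is buffered here; a real exporter would flush and release resources.
	return nil
}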
type ShippingMethod string const ( @@ -57,17 +53,14 @@ const ( ShippingMethodSync ShippingMethod = "sync" ) -// BatchItemProcessorOption configures a BatchItemProcessor. +// BatchItemProcessorOption is a functional option for the batch item processor. type BatchItemProcessorOption func(o *BatchItemProcessorOptions) -// BatchItemProcessorOptions is configuration settings for a -// BatchItemProcessor. type BatchItemProcessorOptions struct { // MaxQueueSize is the maximum queue size to buffer items for delayed processing. If the // queue gets full it drops the items. // The default value of MaxQueueSize is 51200. MaxQueueSize int - // BatchTimeout is the maximum duration for constructing a batch. Processor // forcefully sends available items when timeout is reached. // The default value of BatchTimeout is 5000 msec. @@ -77,40 +70,47 @@ type BatchItemProcessorOptions struct { // is reached, the export will be cancelled. // The default value of ExportTimeout is 30000 msec. ExportTimeout time.Duration - - // MaxExportBatchSize is the maximum number of items to process in a single batch. - // If there are more than one batch worth of items then it processes multiple batches - // of items one batch after the other without any delay. + // MaxExportBatchSize is the maximum number of items to include in a batch. // The default value of MaxExportBatchSize is 512. MaxExportBatchSize int - - // ShippingMethod is the method used to ship items to the exporter. + // ShippingMethod is the method of shipping items for export. The default value + // of ShippingMethod is "async". ShippingMethod ShippingMethod - - // Number of workers to process items. + // Workers is the number of workers to process batches. + // The default value of Workers is 1. Workers int } -// BatchItemProcessor is a buffer that batches asynchronously-received -// items and sends them to a exporter when complete. +func (o *BatchItemProcessorOptions) Validate() error { + if o.MaxExportBatchSize > o.MaxQueueSize { + return errors.New("max export batch size cannot be greater than max queue size") + } + + if o.Workers == 0 { + return errors.New("workers must be greater than 0") + } + + if o.MaxExportBatchSize < 1 { + return errors.New("max export batch size must be greater than 0") + } + + return nil +} + +// BatchItemProcessor is a processor that batches items for export. type BatchItemProcessor[T any] struct { e ItemExporter[T] o BatchItemProcessorOptions log logrus.FieldLogger - queue chan *T + queue chan traceableItem[T] + batchCh chan []traceableItem[T] dropped uint32 name string metrics *Metrics - batches chan []*T - batchReady chan bool - - batch []*T - batchMutex sync.Mutex - timer *time.Timer stopWait sync.WaitGroup stopOnce sync.Once @@ -118,10 +118,13 @@ type BatchItemProcessor[T any] struct { stopWorkersCh chan struct{} } -// NewBatchItemProcessor creates a new ItemProcessor that will send completed -// item batches to the exporter with the supplied options. -// -// If the exporter is nil, the item processor will preform no action. +type traceableItem[T any] struct { + item *T + errCh chan error + completedCh chan struct{} +} + +// NewBatchItemProcessor creates a new batch item processor. 
func NewBatchItemProcessor[T any](exporter ItemExporter[T], name string, log logrus.FieldLogger, options ...BatchItemProcessorOption) (*BatchItemProcessor[T], error) { maxQueueSize := DefaultMaxQueueSize maxExportBatchSize := DefaultMaxExportBatchSize @@ -146,6 +149,10 @@ func NewBatchItemProcessor[T any](exporter ItemExporter[T], name string, log log opt(&o) } + if err := o.Validate(); err != nil { + return nil, fmt.Errorf("invalid batch item processor options: %w: %s", err, name) + } + metrics := DefaultMetrics bvp := BatchItemProcessor[T]{ @@ -154,140 +161,103 @@ func NewBatchItemProcessor[T any](exporter ItemExporter[T], name string, log log log: log, name: name, metrics: metrics, - batch: make([]*T, 0, o.MaxExportBatchSize), timer: time.NewTimer(o.BatchTimeout), - queue: make(chan *T, o.MaxQueueSize), + queue: make(chan traceableItem[T], o.MaxQueueSize), + batchCh: make(chan []traceableItem[T], o.Workers), stopCh: make(chan struct{}), stopWorkersCh: make(chan struct{}), } - bvp.batches = make(chan []*T, o.Workers) // Buffer the channel to hold batches for each worker - bvp.batchReady = make(chan bool, 1) + bvp.log.WithFields( + logrus.Fields{ + "workers": bvp.o.Workers, + "batch_timeout": bvp.o.BatchTimeout, + "export_timeout": bvp.o.ExportTimeout, + "max_export_batch_size": bvp.o.MaxExportBatchSize, + "max_queue_size": bvp.o.MaxQueueSize, + "shipping_method": bvp.o.ShippingMethod, + }, + ).Info("Batch item processor initialized") bvp.stopWait.Add(o.Workers) for i := 0; i < o.Workers; i++ { - go func() { + go func(num int) { defer bvp.stopWait.Done() - - bvp.worker(context.Background()) - }() + bvp.worker(context.Background(), num) + }(i) } - go bvp.batchBuilder(context.Background()) // Start building batches + go func() { + bvp.batchBuilder(context.Background()) + bvp.log.Info("Batch builder exited") + }() return &bvp, nil } -// OnEnd method enqueues a item for later processing. +// Write writes items to the queue. If the Processor is configured to use +// the sync shipping method, the items will be written to the queue and this +// function will return when all items have been processed. If the Processor is +// configured to use the async shipping method, the items will be written to +// the queue and this function will return immediately. func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error { _, span := observability.Tracer().Start(ctx, "BatchItemProcessor.Write") defer span.End() bvp.metrics.SetItemsQueued(bvp.name, float64(len(bvp.queue))) - if bvp.o.ShippingMethod == ShippingMethodSync { - return bvp.ImmediatelyExportItems(ctx, s) - } - - bvp.metrics.SetItemsQueued(bvp.name, float64(len(bvp.queue))) - - // Do not enqueue items if we are just going to drop them. if bvp.e == nil { - return nil - } - - for _, i := range s { - bvp.enqueue(i) - } - - return nil -} - -// ImmediatelyExportItems immediately exports the items to the exporter. -// Useful for propagating errors from the exporter. 
-func (bvp *BatchItemProcessor[T]) ImmediatelyExportItems(ctx context.Context, items []*T) error { - _, span := observability.Tracer().Start(ctx, "BatchItemProcessor.ImmediatelyExportItems") - defer span.End() - - if len(items) == 0 { - return nil + return errors.New("exporter is nil") } - countItemsToExport := len(items) - - batchSize := bvp.o.MaxExportBatchSize - if batchSize == 0 { - batchSize = 1 // Ensure we can't divide by zero - } - - batches := (countItemsToExport + batchSize - 1) / batchSize - - batchCh := make(chan []*T, batches) - errCh := make(chan error, 1) - - defer close(errCh) - - var wg sync.WaitGroup + // Break our items up in to chunks that can be processed at + // one time by our workers. This is to prevent wasting + // resources sending items if we've failed an earlier + // batch. + batchSize := bvp.o.Workers * bvp.o.MaxExportBatchSize + for start := 0; start < len(s); start += batchSize { + end := start + batchSize + if end > len(s) { + end = len(s) + } - cctx, cancel := context.WithCancel(ctx) - defer cancel() + prepared := []traceableItem[T]{} + for _, i := range s[start:end] { + prepared = append(prepared, traceableItem[T]{ + item: i, + errCh: make(chan error, 1), + completedCh: make(chan struct{}, 1), + }) + } - for i := 0; i < countItemsToExport; i += batchSize { - end := i + batchSize - if end > countItemsToExport { - end = countItemsToExport + for _, i := range prepared { + if err := bvp.enqueueOrDrop(ctx, i); err != nil { + return err + } } - itemsBatch := items[i:end] - batchCh <- itemsBatch - } - close(batchCh) // Close the channel after all batches are sent - - bvp.log. - WithField("workers", bvp.o.Workers). - WithField("batches", batches). - Debug("Split items into batches for immediate export") - - for i := 0; i < bvp.o.Workers && i < batches; i++ { - wg.Add(1) - - go func(workerID int) { - defer wg.Done() - - for itemsBatch := range batchCh { - bvp.log.WithFields(logrus.Fields{ - "count": len(itemsBatch), - "worker": workerID, - }).Debug("Immediately exporting items") - - err := bvp.exportWithTimeout(cctx, itemsBatch) - if err != nil { - select { - case errCh <- err: - default: + if bvp.o.ShippingMethod == ShippingMethodSync { + for _, item := range prepared { + select { + case err := <-item.errCh: + if err != nil { + return err } - - cancel() // Cancel the context to stop other workers - - return + case <-item.completedCh: + continue + case <-ctx.Done(): + return ctx.Err() } } - }(i) + } } - wg.Wait() - - select { - case err := <-errCh: - return err - default: - return nil - } + return nil } -// exportWithTimeout exports the items with a timeout. -func (bvp *BatchItemProcessor[T]) exportWithTimeout(ctx context.Context, itemsBatch []*T) error { +// exportWithTimeout exports items with a timeout. 
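The rewritten Write above splits the incoming slice into chunks of Workers * MaxExportBatchSize before enqueueing, so that a failing chunk stops further sends when shipping synchronously. A small illustration of the boundary arithmetic, with made-up numbers (2 workers, max batch size 3, 10 items):

package main

import "fmt"

func main() {
	items := make([]int, 10) // stand-ins for the *T items passed to Write
	workers, maxExportBatchSize := 2, 3

	chunk := workers * maxExportBatchSize // 6 items handed to the queue at a time
	for start := 0; start < len(items); start += chunk {
		end := start + chunk
		if end > len(items) {
			end = len(items)
		}
		fmt.Printf("chunk [%d:%d) -> %d items\n", start, end, end-start)
	}
	// Output:
	// chunk [0:6) -> 6 items
	// chunk [6:10) -> 4 items
}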
+func (bvp *BatchItemProcessor[T]) exportWithTimeout(ctx context.Context, itemsBatch []traceableItem[T]) error { if bvp.o.ExportTimeout > 0 { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, bvp.o.ExportTimeout) @@ -295,44 +265,52 @@ func (bvp *BatchItemProcessor[T]) exportWithTimeout(ctx context.Context, itemsBa defer cancel() } - err := bvp.e.ExportItems(ctx, itemsBatch) + items := make([]*T, len(itemsBatch)) + for i, item := range itemsBatch { + items[i] = item.item + } + + err := bvp.e.ExportItems(ctx, items) if err != nil { bvp.metrics.IncItemsFailedBy(bvp.name, float64(len(itemsBatch))) - - return err + } else { + bvp.metrics.IncItemsExportedBy(bvp.name, float64(len(itemsBatch))) } - bvp.metrics.IncItemsExportedBy(bvp.name, float64(len(itemsBatch))) + for _, item := range itemsBatch { + if item.errCh != nil { + item.errCh <- err + close(item.errCh) + } + + if item.completedCh != nil { + item.completedCh <- struct{}{} + close(item.completedCh) + } + } return nil } -// Shutdown flushes the queue and waits until all items are processed. -// It only executes once. Subsequent call does nothing. +// Shutdown shuts down the batch item processor. func (bvp *BatchItemProcessor[T]) Shutdown(ctx context.Context) error { var err error - bvp.log.Debug("Shutting down processor") - bvp.stopOnce.Do(func() { wait := make(chan struct{}) go func() { - // Stop accepting new items - close(bvp.stopCh) + bvp.log.Info("Stopping processor") - // Drain the queue - bvp.drainQueue() + close(bvp.stopCh) - // Stop the timer bvp.timer.Stop() - // Stop the workers + bvp.drainQueue() + close(bvp.stopWorkersCh) - // Wait for the workers to finish bvp.stopWait.Wait() - // Shutdown the exporter if bvp.e != nil { if err = bvp.e.Shutdown(ctx); err != nil { bvp.log.WithError(err).Error("failed to shutdown processor") @@ -341,7 +319,6 @@ func (bvp *BatchItemProcessor[T]) Shutdown(ctx context.Context) error { close(wait) }() - // Wait until the wait group is done or the context is cancelled select { case <-wait: case <-ctx.Done(): @@ -352,51 +329,36 @@ func (bvp *BatchItemProcessor[T]) Shutdown(ctx context.Context) error { return err } -// WithMaxQueueSize returns a BatchItemProcessorOption that configures the -// maximum queue size allowed for a BatchItemProcessor. func WithMaxQueueSize(size int) BatchItemProcessorOption { return func(o *BatchItemProcessorOptions) { o.MaxQueueSize = size } } -// WithMaxExportBatchSize returns a BatchItemProcessorOption that configures -// the maximum export batch size allowed for a BatchItemProcessor. func WithMaxExportBatchSize(size int) BatchItemProcessorOption { return func(o *BatchItemProcessorOptions) { o.MaxExportBatchSize = size } } -// WithBatchTimeout returns a BatchItemProcessorOption that configures the -// maximum delay allowed for a BatchItemProcessor before it will export any -// held item (whether the queue is full or not). func WithBatchTimeout(delay time.Duration) BatchItemProcessorOption { return func(o *BatchItemProcessorOptions) { o.BatchTimeout = delay } } -// WithExportTimeout returns a BatchItemProcessorOption that configures the -// amount of time a BatchItemProcessor waits for an exporter to export before -// abandoning the export. 
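Because each queued item now carries an errCh and a completedCh, exportWithTimeout above can hand the exporter's result back to the exact Write call that produced the item, which is what lets the sync shipping method surface export errors to the caller. A hedged usage sketch; the item and failingExporter types are invented for illustration, and the import path assumes this repository's module name:

package main

import (
	"context"
	"errors"
	"log"

	"github.com/ethpandaops/xatu/pkg/processor"
	"github.com/sirupsen/logrus"
)

type item struct{ name string }

// failingExporter always errors, to show the error reaching Write in sync mode.
type failingExporter struct{}

func (failingExporter) ExportItems(ctx context.Context, _ []*item) error {
	return errors.New("downstream unavailable")
}

func (failingExporter) Shutdown(ctx context.Context) error { return nil }

func main() {
	p, err := processor.NewBatchItemProcessor[item](
		failingExporter{},
		"example",
		logrus.New(),
		processor.WithShippingMethod(processor.ShippingMethodSync),
		processor.WithMaxExportBatchSize(2),
		processor.WithWorkers(1),
	)
	if err != nil {
		log.Fatal(err)
	}

	// In sync mode this returns the exporter's error instead of silently dropping it.
	if err := p.Write(context.Background(), []*item{{name: "a"}, {name: "b"}}); err != nil {
		log.Printf("write failed as expected: %v", err)
	}

	_ = p.Shutdown(context.Background())
}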
func WithExportTimeout(timeout time.Duration) BatchItemProcessorOption { return func(o *BatchItemProcessorOptions) { o.ExportTimeout = timeout } } -// WithExportTimeout returns a BatchItemProcessorOption that configures the -// amount of time a BatchItemProcessor waits for an exporter to export before -// abandoning the export. func WithShippingMethod(method ShippingMethod) BatchItemProcessorOption { return func(o *BatchItemProcessorOptions) { o.ShippingMethod = method } } -// WithWorkers returns a BatchItemProcessorOption that configures the -// number of workers to process items. func WithWorkers(workers int) BatchItemProcessorOption { return func(o *BatchItemProcessorOptions) { o.Workers = workers @@ -404,56 +366,60 @@ func WithWorkers(workers int) BatchItemProcessorOption { } func (bvp *BatchItemProcessor[T]) batchBuilder(ctx context.Context) { + log := bvp.log.WithField("module", "batch_builder") + + var batch []traceableItem[T] + for { select { case <-bvp.stopWorkersCh: + log.Info("Stopping batch builder") + return - case sd := <-bvp.queue: - bvp.batchMutex.Lock() + case item := <-bvp.queue: + batch = append(batch, item) - bvp.batch = append(bvp.batch, sd) + if len(batch) >= bvp.o.MaxExportBatchSize { + bvp.sendBatch(batch, "max_export_batch_size") - if len(bvp.batch) >= bvp.o.MaxExportBatchSize { - batchCopy := make([]*T, len(bvp.batch)) - copy(batchCopy, bvp.batch) - bvp.batches <- batchCopy - bvp.batch = bvp.batch[:0] - bvp.batchReady <- true + batch = []traceableItem[T]{} } - - bvp.batchMutex.Unlock() case <-bvp.timer.C: - bvp.batchMutex.Lock() - - if len(bvp.batch) > 0 { - batchCopy := make([]*T, len(bvp.batch)) - copy(batchCopy, bvp.batch) - bvp.batches <- batchCopy - bvp.batch = bvp.batch[:0] - bvp.batchReady <- true + if len(batch) > 0 { + bvp.sendBatch(batch, "timer") + batch = []traceableItem[T]{} } else { - // Reset the timer if there are no items in the batch. - // If there are items in the batch, one of the workers will reset the timer. bvp.timer.Reset(bvp.o.BatchTimeout) } - - bvp.batchMutex.Unlock() } } } +func (bvp *BatchItemProcessor[T]) sendBatch(batch []traceableItem[T], reason string) { + log := bvp.log.WithField("reason", reason) + log.Tracef("Creating a batch of %d items", len(batch)) + + batchCopy := make([]traceableItem[T], len(batch)) + copy(batchCopy, batch) + + log.Tracef("Batch items copied") + + bvp.batchCh <- batchCopy + + log.Tracef("Batch sent to batch channel") +} + +func (bvp *BatchItemProcessor[T]) worker(ctx context.Context, number int) { + bvp.log.Infof("Starting worker %d", number) -// worker removes items from the `queue` channel until processor -// is shut down. It calls the exporter in batches of up to MaxExportBatchSize -// waiting up to BatchTimeout to form a batch. -func (bvp *BatchItemProcessor[T]) worker(ctx context.Context) { for { select { case <-bvp.stopWorkersCh: + bvp.log.Infof("Stopping worker %d", number) + return - case <-bvp.batchReady: + case batch := <-bvp.batchCh: bvp.timer.Reset(bvp.o.BatchTimeout) - batch := <-bvp.batches if err := bvp.exportWithTimeout(ctx, batch); err != nil { bvp.log.WithError(err).Error("failed to export items") } @@ -461,26 +427,22 @@ func (bvp *BatchItemProcessor[T]) worker(ctx context.Context) { } } -// drainQueue awaits the any caller that had added to bvp.stopWait -// to finish the enqueue, then exports the final batch. func (bvp *BatchItemProcessor[T]) drainQueue() { - // Wait for the batch builder to send all remaining items to the workers. 
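The batching flow is now: Write -> queue -> batchBuilder (cuts a batch on size or timer) -> batchCh -> worker goroutines -> exporter, replacing the previous shared batch slice and mutex. A stripped-down, stand-alone sketch of that builder/worker pattern, not the processor's actual code:

package main

import (
	"fmt"
	"sync"
	"time"
)

// A toy version of the queue -> batch builder -> workers flow.
func main() {
	const maxBatch = 3

	queue := make(chan int, 16)
	batchCh := make(chan []int, 2)

	// Builder: cut a batch when it reaches maxBatch or when the timer fires.
	go func() {
		defer close(batchCh)

		var batch []int
		timer := time.NewTimer(50 * time.Millisecond)

		for {
			select {
			case v, ok := <-queue:
				if !ok {
					if len(batch) > 0 {
						batchCh <- batch
					}
					return
				}
				batch = append(batch, v)
				if len(batch) >= maxBatch {
					batchCh <- batch
					batch = nil
				}
			case <-timer.C:
				if len(batch) > 0 {
					batchCh <- batch
					batch = nil
				}
				timer.Reset(50 * time.Millisecond)
			}
		}
	}()

	// Workers: "export" whatever batches arrive.
	var wg sync.WaitGroup
	for w := 0; w < 2; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for b := range batchCh {
				fmt.Printf("worker %d exported batch of %d\n", id, len(b))
			}
		}(w)
	}

	for i := 0; i < 7; i++ {
		queue <- i
	}
	close(queue)
	wg.Wait()
}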
+ bvp.log.Info("Draining queue: waiting for the batch builder to pull all the items from the queue") + for len(bvp.queue) > 0 { time.Sleep(10 * time.Millisecond) } - // Wait for the workers to finish processing all batches. - for len(bvp.batches) > 0 { - time.Sleep(10 * time.Millisecond) + bvp.log.Info("Draining queue: waiting for workers to finish processing batches") + + for len(bvp.queue) > 0 { + <-bvp.queue } - // Close the batches channel since no more batches will be sent. - close(bvp.batches) -} + bvp.log.Info("Draining queue: all batches finished") -func (bvp *BatchItemProcessor[T]) enqueue(sd *T) { - ctx := context.TODO() - bvp.enqueueOrDrop(ctx, sd) + close(bvp.queue) } func recoverSendOnClosedChan() { @@ -496,24 +458,24 @@ func recoverSendOnClosedChan() { panic(x) } -func (bvp *BatchItemProcessor[T]) enqueueOrDrop(ctx context.Context, sd *T) bool { +func (bvp *BatchItemProcessor[T]) enqueueOrDrop(ctx context.Context, item traceableItem[T]) error { // This ensures the bvp.queue<- below does not panic as the // processor shuts down. defer recoverSendOnClosedChan() select { case <-bvp.stopCh: - return false + return errors.New("processor is shutting down") default: } select { - case bvp.queue <- sd: - return true + case bvp.queue <- item: + return nil default: atomic.AddUint32(&bvp.dropped, 1) bvp.metrics.IncItemsDroppedBy(bvp.name, float64(1)) } - return false + return errors.New("queue is full") } diff --git a/pkg/processor/batch_test.go b/pkg/processor/batch_test.go index c7850f9d..8a41ec73 100644 --- a/pkg/processor/batch_test.go +++ b/pkg/processor/batch_test.go @@ -98,10 +98,11 @@ func TestNewBatchItemProcessorWithNilExporter(t *testing.T) { bsp, err := NewBatchItemProcessor[TestItem](nil, "processor", nullLogger()) require.NoError(t, err) - if err := bsp.Write(context.Background(), []*TestItem{{ + err = bsp.Write(context.Background(), []*TestItem{{ name: "test", - }}); err != nil { - t.Errorf("failed to Write to the BatchItemProcessor: %v", err) + }}) + if err == nil || err.Error() != "exporter is nil" { + t.Errorf("expected error 'exporter is nil', got: %v", err) } if err := bsp.Shutdown(context.Background()); err != nil { @@ -118,20 +119,26 @@ type testOption struct { wantBatchCount int } -func TestNewBatchItemProcessorWithOptions(t *testing.T) { +func TestAsyncNewBatchItemProcessorWithOptions(t *testing.T) { schDelay := 100 * time.Millisecond options := []testOption{ { - name: "default", - o: []BatchItemProcessorOption{}, + name: "default", + o: []BatchItemProcessorOption{ + WithShippingMethod(ShippingMethodAsync), + WithBatchTimeout(10 * time.Millisecond), + }, + writeNumItems: 2048, genNumItems: 2053, - wantNumItems: 2048, - wantBatchCount: 4, + wantNumItems: 2053, + wantBatchCount: 5, }, { name: "non-default BatchTimeout", o: []BatchItemProcessorOption{ WithBatchTimeout(schDelay), + WithShippingMethod(ShippingMethodAsync), + WithMaxExportBatchSize(512), }, writeNumItems: 2048, genNumItems: 2053, @@ -143,23 +150,13 @@ func TestNewBatchItemProcessorWithOptions(t *testing.T) { o: []BatchItemProcessorOption{ WithBatchTimeout(schDelay), WithMaxQueueSize(200), + WithMaxExportBatchSize(200), + WithShippingMethod(ShippingMethodAsync), }, writeNumItems: 200, genNumItems: 205, wantNumItems: 205, - wantBatchCount: 1, - }, - { - name: "blocking option", - o: []BatchItemProcessorOption{ - WithBatchTimeout(schDelay), - WithMaxQueueSize(200), - WithMaxExportBatchSize(20), - }, - writeNumItems: 200, - genNumItems: 205, - wantNumItems: 205, - wantBatchCount: 11, + wantBatchCount: 2, }, 
} @@ -169,11 +166,11 @@ func TestNewBatchItemProcessorWithOptions(t *testing.T) { ssp, err := createAndRegisterBatchSP(option.o, &te) if err != nil { - t.Fatalf("%s: Error creating new instance of BatchItemProcessor\n", option.name) + require.NoError(t, err) } if ssp == nil { - t.Fatalf("%s: Error creating new instance of BatchItemProcessor\n", option.name) + require.NoError(t, err) } for i := 0; i < option.genNumItems; i++ { @@ -184,7 +181,7 @@ func TestNewBatchItemProcessorWithOptions(t *testing.T) { if err := ssp.Write(context.Background(), []*TestItem{{ name: "test", }}); err != nil { - t.Errorf("%s: Error writing to BatchItemProcessor\n", option.name) + t.Errorf("%s: Error writing to BatchItemProcessor", option.name) } } @@ -192,15 +189,15 @@ func TestNewBatchItemProcessorWithOptions(t *testing.T) { gotNumOfItems := te.len() if option.wantNumItems > 0 && option.wantNumItems != gotNumOfItems { - t.Errorf("number of exported items: got %+v, want %+v\n", + t.Errorf("number of exported items: got %v, want %v", gotNumOfItems, option.wantNumItems) } gotBatchCount := te.getBatchCount() if option.wantBatchCount > 0 && gotBatchCount != option.wantBatchCount { - t.Errorf("number batches: got %+v, want >= %+v\n", + t.Errorf("number batches: got %v, want %v", gotBatchCount, option.wantBatchCount) - t.Errorf("Batches %v\n", te.sizes) + t.Errorf("Batches %v", te.sizes) } }) } @@ -270,10 +267,11 @@ func TestBatchItemProcessorShutdown(t *testing.T) { func TestBatchItemProcessorDrainQueue(t *testing.T) { be := testBatchExporter[TestItem]{} - bsp, err := NewBatchItemProcessor[TestItem](&be, "processor", nullLogger(), WithMaxExportBatchSize(5), WithBatchTimeout(5*time.Minute)) + log := logrus.New() + bsp, err := NewBatchItemProcessor[TestItem](&be, "processor", log, WithMaxExportBatchSize(5), WithBatchTimeout(1*time.Second), WithWorkers(2), WithShippingMethod(ShippingMethodAsync)) require.NoError(t, err) - itemsToExport := 500 + itemsToExport := 5000 for i := 0; i < itemsToExport; i++ { if err := bsp.Write(context.Background(), []*TestItem{{ @@ -306,11 +304,10 @@ func TestBatchItemProcessorPostShutdown(t *testing.T) { lenJustAfterShutdown := be.len() for i := 0; i < 60; i++ { - if err := bsp.Write(context.Background(), []*TestItem{{ + err := bsp.Write(context.Background(), []*TestItem{{ name: strconv.Itoa(i), - }}); err != nil { - t.Errorf("Error writing to BatchItemProcessor\n") - } + }}) + require.Error(t, err) } assert.Equal(t, lenJustAfterShutdown, be.len(), "Write should have no effect after Shutdown") @@ -487,8 +484,8 @@ func (ErrorItemExporter[T]) ExportItems(ctx context.Context, _ []*T) error { } // TestBatchItemProcessorWithSyncErrorExporter tests a processor with ShippingMethod = sync and an exporter that only returns errors. 
-func TestBatchItemProcessorWithAsyncErrorExporter(t *testing.T) { - bsp, err := NewBatchItemProcessor[TestItem](ErrorItemExporter[TestItem]{}, "processor", nullLogger(), WithShippingMethod(ShippingMethodSync)) +func TestBatchItemProcessorWithSyncErrorExporter(t *testing.T) { + bsp, err := NewBatchItemProcessor[TestItem](ErrorItemExporter[TestItem]{}, "processor", nullLogger(), WithShippingMethod(ShippingMethodSync), WithBatchTimeout(100*time.Millisecond)) if err != nil { t.Fatalf("failed to create batch processor: %v", err) } @@ -499,18 +496,26 @@ func TestBatchItemProcessorWithAsyncErrorExporter(t *testing.T) { } } -func TestBatchItemProcessorImmediatelyExportItems(t *testing.T) { +func TestBatchItemProcessorSyncShipping(t *testing.T) { // Define a range of values for workers, maxExportBatchSize, and itemsToExport workerCounts := []int{1, 5, 10} maxBatchSizes := []int{1, 5, 10, 20} - itemCounts := []int{0, 1, 25, 50} + itemCounts := []int{0, 1, 10, 25, 50} for _, workers := range workerCounts { for _, maxBatchSize := range maxBatchSizes { for _, itemsToExport := range itemCounts { t.Run(fmt.Sprintf("%d workers, batch size %d, %d items", workers, maxBatchSize, itemsToExport), func(t *testing.T) { te := testBatchExporter[TestItem]{} - bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(maxBatchSize), WithWorkers(workers)) + bsp, err := NewBatchItemProcessor[TestItem]( + &te, + "processor", + logrus.New(), + WithMaxExportBatchSize(maxBatchSize), + WithWorkers(workers), + WithShippingMethod(ShippingMethodSync), + WithBatchTimeout(100*time.Millisecond), + ) require.NoError(t, err) items := make([]*TestItem, itemsToExport) @@ -518,7 +523,7 @@ func TestBatchItemProcessorImmediatelyExportItems(t *testing.T) { items[i] = &TestItem{name: strconv.Itoa(i)} } - err = bsp.ImmediatelyExportItems(context.Background(), items) + err = bsp.Write(context.Background(), items) require.NoError(t, err) expectedBatches := (itemsToExport + maxBatchSize - 1) / maxBatchSize @@ -548,7 +553,7 @@ func TestBatchItemProcessorExportCancellationOnFailure(t *testing.T) { te := testBatchExporter[TestItem]{ delay: 100 * time.Millisecond, // Introduce a delay to simulate processing time } - bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(maxBatchSize), WithWorkers(workers)) + bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(maxBatchSize), WithWorkers(workers), WithShippingMethod(ShippingMethodSync)) require.NoError(t, err) items := make([]*TestItem, itemsToExport) @@ -559,7 +564,7 @@ func TestBatchItemProcessorExportCancellationOnFailure(t *testing.T) { // Simulate export failure and expect cancellation te.failNextExport = true - err = bsp.ImmediatelyExportItems(context.Background(), items) + err = bsp.Write(context.Background(), items) require.Error(t, err, "Expected an error due to simulated export failure") // Ensure we exported less than the number of batches since the export should've @@ -567,3 +572,109 @@ func TestBatchItemProcessorExportCancellationOnFailure(t *testing.T) { require.Less(t, te.batchCount, itemsToExport/maxBatchSize) }) } + +func TestBatchItemProcessorWithZeroWorkers(t *testing.T) { + te := testBatchExporter[TestItem]{} + _, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(10), WithWorkers(0)) + require.Error(t, err, "Expected an error when initializing with zero workers") +} + +func 
TestBatchItemProcessorWithNegativeBatchSize(t *testing.T) { + te := testBatchExporter[TestItem]{} + _, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(-1), WithWorkers(5)) + require.Error(t, err, "Expected an error when initializing with negative batch size") +} + +func TestBatchItemProcessorWithNegativeQueueSize(t *testing.T) { + te := testBatchExporter[TestItem]{} + _, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxQueueSize(-1), WithWorkers(5)) + require.Error(t, err, "Expected an error when initializing with negative queue size") +} + +func TestBatchItemProcessorWithZeroBatchSize(t *testing.T) { + te := testBatchExporter[TestItem]{} + _, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(0), WithWorkers(5)) + require.Error(t, err, "Expected an error when initializing with zero batch size") +} + +func TestBatchItemProcessorWithZeroQueueSize(t *testing.T) { + te := testBatchExporter[TestItem]{} + _, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxQueueSize(0), WithWorkers(5)) + require.Error(t, err, "Expected an error when initializing with zero queue size") +} + +func TestBatchItemProcessorShutdownWithoutExport(t *testing.T) { + te := testBatchExporter[TestItem]{} + bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(10), WithWorkers(5)) + require.NoError(t, err) + + require.NoError(t, bsp.Shutdown(context.Background()), "shutting down BatchItemProcessor") + require.Equal(t, 0, te.len(), "No items should have been exported") +} + +func TestBatchItemProcessorExportWithTimeout(t *testing.T) { + te := testBatchExporter[TestItem]{delay: 2 * time.Second} + bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(10), WithWorkers(5), WithExportTimeout(1*time.Second), WithShippingMethod(ShippingMethodSync)) + require.NoError(t, err) + + itemsToExport := 10 + items := make([]*TestItem, itemsToExport) + + for i := 0; i < itemsToExport; i++ { + items[i] = &TestItem{name: strconv.Itoa(i)} + } + + err = bsp.Write(context.Background(), items) + require.Error(t, err, "Expected an error due to export timeout") + require.Less(t, te.len(), itemsToExport, "Expected some items to be exported before timeout") +} + +func TestBatchItemProcessorWithBatchTimeout(t *testing.T) { + te := testBatchExporter[TestItem]{} + bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(10), WithWorkers(5), WithBatchTimeout(1*time.Second)) + require.NoError(t, err) + + itemsToExport := 5 + items := make([]*TestItem, itemsToExport) + + for i := 0; i < itemsToExport; i++ { + items[i] = &TestItem{name: strconv.Itoa(i)} + } + + for _, item := range items { + err := bsp.Write(context.Background(), []*TestItem{item}) + require.NoError(t, err) + } + + time.Sleep(2 * time.Second) + require.Equal(t, itemsToExport, te.len(), "Expected all items to be exported after batch timeout") +} + +func TestBatchItemProcessorQueueSize(t *testing.T) { + te := indefiniteExporter[TestItem]{} + maxQueueSize := 5 + bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithBatchTimeout(10*time.Minute), WithMaxQueueSize(maxQueueSize), WithMaxExportBatchSize(maxQueueSize), WithWorkers(1), WithShippingMethod(ShippingMethodAsync)) + require.NoError(t, err) + + itemsToExport := 10 + items := make([]*TestItem, itemsToExport) + + for i := 0; i < itemsToExport; 
i++ { + items[i] = &TestItem{name: strconv.Itoa(i)} + } + + // Write items to the processor + for i := 0; i < itemsToExport; i++ { + err := bsp.Write(context.Background(), []*TestItem{items[i]}) + if i < maxQueueSize { + require.NoError(t, err, "Expected no error for item %d", i) + } else { + require.Error(t, err, "Expected an error for item %d due to queue size limit", i) + } + } + + // Ensure that the queue size is respected + require.Equal(t, maxQueueSize, len(bsp.queue), "Queue size should be equal to maxQueueSize") + // Ensure that the dropped count is correct + require.Equal(t, uint32(itemsToExport-maxQueueSize), bsp.dropped, "Dropped count should be equal to the number of items that exceeded the queue size") +}
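Putting the pieces together for the async path: Write only queues, the builder and workers export in the background, and Shutdown drains the queue, stops the workers and then shuts the exporter down, honouring the caller's context. A sketch under the assumption that the package is imported as github.com/ethpandaops/xatu/pkg/processor; item and noopExporter are invented for the example:

package main

import (
	"context"
	"time"

	"github.com/ethpandaops/xatu/pkg/processor"
	"github.com/sirupsen/logrus"
)

type item struct{ name string }

type noopExporter struct{}

func (noopExporter) ExportItems(ctx context.Context, items []*item) error { return nil }
func (noopExporter) Shutdown(ctx context.Context) error                   { return nil }

func main() {
	log := logrus.New()

	p, err := processor.NewBatchItemProcessor[item](
		noopExporter{},
		"example-async",
		log,
		processor.WithShippingMethod(processor.ShippingMethodAsync),
		processor.WithBatchTimeout(100*time.Millisecond),
		processor.WithWorkers(2),
	)
	if err != nil {
		log.Fatal(err)
	}

	// Async mode: this returns as soon as the items are queued.
	if err := p.Write(context.Background(), []*item{{name: "a"}, {name: "b"}}); err != nil {
		log.WithError(err).Error("enqueue failed")
	}

	// Shutdown drains the queue, stops the workers, then shuts the exporter down,
	// giving up if the supplied context expires first.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	if err := p.Shutdown(ctx); err != nil {
		log.WithError(err).Error("shutdown did not complete cleanly")
	}

	// Writes after Shutdown are rejected rather than silently dropped.
	if err := p.Write(context.Background(), []*item{{name: "late"}}); err != nil {
		log.WithError(err).Warn("write after shutdown rejected")
	}
}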