
fix(processor): Limit goroutines #332

Closed
wants to merge 11 commits into from
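The PR title points at bounding concurrency in the processor. The Go changes themselves are not rendered on this page (only the Vector config diff below is), so the following is just a minimal sketch of the general goroutine-limiting pattern using a buffered-channel semaphore; `processItems`, its parameters, and the worker limit are hypothetical and not taken from the xatu codebase.

```go
package main

import (
	"fmt"
	"sync"
)

// processItems fans work out to goroutines but caps concurrency with a
// buffered-channel semaphore, so a large input no longer spawns an
// unbounded number of goroutines. (Illustrative only; not the PR's code.)
func processItems(items []int, maxWorkers int) {
	sem := make(chan struct{}, maxWorkers) // at most maxWorkers slots
	var wg sync.WaitGroup

	for _, item := range items {
		sem <- struct{}{} // acquire a slot; blocks while maxWorkers goroutines are busy
		wg.Add(1)
		go func(it int) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when done
			fmt.Println("processing item", it)
		}(item)
	}
	wg.Wait()
}

func main() {
	// With 8 items and a limit of 3, at most 3 items are processed concurrently.
	processItems([]int{1, 2, 3, 4, 5, 6, 7, 8}, 3)
}
```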
153 changes: 4 additions & 149 deletions deploy/local/docker-compose/vector-kafka-clickhouse.yaml
@@ -1499,6 +1499,7 @@ transforms:

for_each(array!(.data.validators)) -> |_index, validator| {
events = push(events, {
"key": .event.id,
"event_date_time": .event_date_time,
"updated_date_time": .updated_date_time,
"meta_client_name": .meta_client_name,
@@ -1526,115 +1527,9 @@ transforms:
"activation_epoch": validator.data.activation_epoch,
"effective_balance": validator.data.effective_balance,
"exit_epoch": validator.data.exit_epoch,
"pubkey": validator.data.pubkey,
"slashed": validator.data.slashed,
"withdrawable_epoch": validator.data.withdrawable_epoch
})
}
. = events
canonical_beacon_validators_pubkeys_formatted:
type: remap
inputs:
- xatu_server_events_router.canonical_beacon_validators
source: |-
event_date_time, err = parse_timestamp(.event.date_time, format: "%+");
if err == null {
.event_date_time = to_unix_timestamp(event_date_time, unit: "milliseconds")
} else {
.error = err
.error_description = "failed to parse event date time"
log(., level: "error", rate_limit_secs: 60)
}

epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.epoch.start_date_time, format: "%+");
if err == null {
.epoch_start_date_time = to_unix_timestamp(epoch_start_date_time)
} else {
.error = err
.error_description = "failed to parse epoch start date time"
log(., level: "error", rate_limit_secs: 60)
}

events = []

.updated_date_time = to_unix_timestamp(now())

for_each(array!(.data.validators)) -> |_index, validator| {
events = push(events, {
"event_date_time": .event_date_time,
"updated_date_time": .updated_date_time,
"meta_client_name": .meta_client_name,
"meta_client_id": .meta_client_id,
"meta_client_version": .meta_client_version,
"meta_client_implementation": .meta_client_implementation,
"meta_client_os": .meta_client_os,
"meta_client_ip": .meta_client_ip,
"meta_network_id": .meta_network_id,
"meta_network_name": .meta_network_name,
"meta_client_geo_city": .meta_client_geo_city,
"meta_client_geo_country": .meta_client_geo_country,
"meta_client_geo_country_code": .meta_client_geo_country_code,
"meta_client_geo_continent_code": .meta_client_geo_continent_code,
"meta_client_geo_longitude": .meta_client_geo_longitude,
"meta_client_geo_latitude": .meta_client_geo_latitude,
"meta_client_geo_autonomous_system_number": .meta_client_geo_autonomous_system_number,
"meta_client_geo_autonomous_system_organization": .meta_client_geo_autonomous_system_organization,
"epoch": .meta.client.additional_data.epoch.number,
"epoch_start_date_time": .epoch_start_date_time,
"index": validator.index,
"pubkey": validator.data.pubkey
})
}
. = events
canonical_beacon_validators_withdrawal_credentials_formatted:
type: remap
inputs:
- xatu_server_events_router.canonical_beacon_validators
source: |-
event_date_time, err = parse_timestamp(.event.date_time, format: "%+");
if err == null {
.event_date_time = to_unix_timestamp(event_date_time, unit: "milliseconds")
} else {
.error = err
.error_description = "failed to parse event date time"
log(., level: "error", rate_limit_secs: 60)
}

epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.epoch.start_date_time, format: "%+");
if err == null {
.epoch_start_date_time = to_unix_timestamp(epoch_start_date_time)
} else {
.error = err
.error_description = "failed to parse epoch start date time"
log(., level: "error", rate_limit_secs: 60)
}

events = []

.updated_date_time = to_unix_timestamp(now())

for_each(array!(.data.validators)) -> |_index, validator| {
events = push(events, {
"event_date_time": .event_date_time,
"updated_date_time": .updated_date_time,
"meta_client_name": .meta_client_name,
"meta_client_id": .meta_client_id,
"meta_client_version": .meta_client_version,
"meta_client_implementation": .meta_client_implementation,
"meta_client_os": .meta_client_os,
"meta_client_ip": .meta_client_ip,
"meta_network_id": .meta_network_id,
"meta_network_name": .meta_network_name,
"meta_client_geo_city": .meta_client_geo_city,
"meta_client_geo_country": .meta_client_geo_country,
"meta_client_geo_country_code": .meta_client_geo_country_code,
"meta_client_geo_continent_code": .meta_client_geo_continent_code,
"meta_client_geo_longitude": .meta_client_geo_longitude,
"meta_client_geo_latitude": .meta_client_geo_latitude,
"meta_client_geo_autonomous_system_number": .meta_client_geo_autonomous_system_number,
"meta_client_geo_autonomous_system_organization": .meta_client_geo_autonomous_system_organization,
"epoch": .meta.client.additional_data.epoch.number,
"epoch_start_date_time": .epoch_start_date_time,
"index": validator.index,
"withdrawable_epoch": validator.data.withdrawable_epoch,
"withdrawal_credentials": validator.data.withdrawal_credentials
})
}
@@ -2491,44 +2386,4 @@ sinks:
max_events: 200000
healthcheck:
enabled: true
skip_unknown_fields: false
canonical_beacon_validators_pubkeys_clickhouse:
type: clickhouse
inputs:
- canonical_beacon_validators_pubkeys_formatted
database: default
endpoint: "${CLICKHOUSE_ENDPOINT}"
table: canonical_beacon_validators_pubkeys
auth:
strategy: basic
user: "${CLICKHOUSE_USER}"
password: "${CLICKHOUSE_PASSWORD}"
batch:
max_bytes: 52428800
max_events: 200000
timeout_secs: 1
buffer:
max_events: 200000
healthcheck:
enabled: true
skip_unknown_fields: false
canonical_beacon_validators_withdrawal_credentials_clickhouse:
type: clickhouse
inputs:
- canonical_beacon_validators_withdrawal_credentials_formatted
database: default
endpoint: "${CLICKHOUSE_ENDPOINT}"
table: canonical_beacon_validators_withdrawal_credentials
auth:
strategy: basic
user: "${CLICKHOUSE_USER}"
password: "${CLICKHOUSE_PASSWORD}"
batch:
max_bytes: 52428800
max_events: 200000
timeout_secs: 1
buffer:
max_events: 200000
healthcheck:
enabled: true
skip_unknown_fields: false

