From ba89df6c7ff635c71addbc650796ad3af6b3d17c Mon Sep 17 00:00:00 2001
From: Andrew Davis <1709934+Savid@users.noreply.github.com>
Date: Tue, 23 Apr 2024 21:19:52 +1000
Subject: [PATCH] fix(deploy): missing kafka clickhouse transforms

---
 .../vector-kafka-clickhouse.yaml | 315 +++++++++++-------
 1 file changed, 201 insertions(+), 114 deletions(-)

diff --git a/deploy/local/docker-compose/vector-kafka-clickhouse.yaml b/deploy/local/docker-compose/vector-kafka-clickhouse.yaml
index 55ac024c..fa114985 100644
--- a/deploy/local/docker-compose/vector-kafka-clickhouse.yaml
+++ b/deploy/local/docker-compose/vector-kafka-clickhouse.yaml
@@ -872,10 +872,17 @@ transforms:
       .nonce = .meta.client.additional_data.nonce
       .gas_price = .meta.client.additional_data.gas_price
       .gas = .meta.client.additional_data.gas
+      .gas_tip_cap = .meta.client.additional_data.gas_tip_cap
+      .gas_fee_cap = .meta.client.additional_data.gas_fee_cap
       .value = .meta.client.additional_data.value
       .type = .meta.client.additional_data.type
       .size = .meta.client.additional_data.size
       .call_data_size = .meta.client.additional_data.call_data_size
+      .blob_gas = .meta.client.additional_data.blob_gas
+      .blob_gas_fee_cap = .meta.client.additional_data.blob_gas_fee_cap
+      .blob_hashes = .meta.client.additional_data.blob_hashes
+      .blob_sidecars_size = .meta.client.additional_data.blob_sidecars_size
+      .blob_sidecars_empty_size = .meta.client.additional_data.blob_sidecars_empty_size
       del(.event)
       del(.meta)
       del(.data)
@@ -1239,11 +1246,19 @@ transforms:
       .to = .data.to
       .nonce = .data.nonce
       .gas_price = .data.gas_price
+      .gas_tip_cap = .data.gas_tip_cap
+      .gas_fee_cap = .data.gas_fee_cap
       .gas = .data.gas
       .value = .data.value
       .type = .data.type
+      .blob_gas = .data.blob_gas
+      .blob_gas_fee_cap = .data.blob_gas_fee_cap
+      .blob_hashes = .data.blob_hashes
       .size = .meta.client.additional_data.size
       .call_data_size = .meta.client.additional_data.call_data_size
+      .blob_sidecars_size = .meta.client.additional_data.blob_sidecars_size
+      .blob_sidecars_empty_size = .meta.client.additional_data.blob_sidecars_empty_size
+
       key, err = .block_root + .position + .hash + .nonce
       if err != null {
         .error = err
@@ -1440,6 +1455,7 @@ transforms:
       .proposer_index = .data.proposer_index
       .blob_index = .data.index
       .blob_size = .meta.client.additional_data.data_size
+      .blob_empty_size = .meta.client.additional_data.data_empty_size
       key, err = .block_root + .versioned_hash + .blob_index
       if err != null {
         .error = err
@@ -1639,7 +1655,6 @@ transforms:
         .error_description = "failed to parse event date time"
         log(., level: "error", rate_limit_secs: 60)
       }
-
       .block_slot = .meta.client.additional_data.block.slot.number
       block_slot_start_date_time, err = parse_timestamp(.meta.client.additional_data.block.slot.start_date_time, format: "%+");
       if err == null {
@@ -1649,7 +1664,6 @@ transforms:
         .error_description = "failed to parse block slot start date time"
         log(., level: "error", rate_limit_secs: 60)
       }
-
       .block_epoch = .meta.client.additional_data.block.epoch.number
       block_epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.block.epoch.start_date_time, format: "%+");
       if err == null {
@@ -1662,7 +1676,6 @@ transforms:
 
       .position_in_block = .meta.client.additional_data.position_in_block
       .block_root = .meta.client.additional_data.block.root
-
       .validators = .data.validator_indexes
       .committee_index = .data.data.index
       .beacon_block_root = .data.data.beacon_block_root
@@ -1676,7 +1689,6 @@ transforms:
         .error_description = "failed to parse slot start date time"
         log(., level: "error", rate_limit_secs: 60)
       }
-
       .epoch = .meta.client.additional_data.epoch.number
       epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.epoch.start_date_time, format: "%+");
       if err == null {
@@ -1686,7 +1698,6 @@ transforms:
         .error_description = "failed to parse epoch start date time"
         log(., level: "error", rate_limit_secs: 60)
       }
-
       .source_epoch = .data.data.source.epoch
       source_epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.source.epoch.start_date_time, format: "%+");
       if err == null {
@@ -1697,7 +1708,6 @@ transforms:
         log(., level: "error", rate_limit_secs: 60)
       }
       .source_root = .data.data.source.root
-
       .target_epoch = .data.data.target.epoch
       target_epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.target.epoch.start_date_time, format: "%+");
       if err == null {
@@ -1713,10 +1723,8 @@ transforms:
         .error = err
         .error_description = "failed to generate unique key"
       }
-
       .unique_key = seahash(key)
       .updated_date_time = to_unix_timestamp(now())
-
       del(.event)
       del(.meta)
       del(.data)
@@ -1827,7 +1835,46 @@ transforms:
         .remote_geo_autonomous_system_number = .meta.server.additional_data.peer.geo.autonomous_system_number
         .remote_geo_autonomous_system_organization = .meta.server.additional_data.peer.geo.autonomous_system_organization
       }
-      .remote_agent_version = .data.agent_version
+      if is_string(.data.agent_version) {
+        agent_version_cleaned = replace(to_string!(.data.agent_version), "teku/teku", "teku", count: 1)
+        agent_version = split(agent_version_cleaned, "/", limit: 4)
+        if length(agent_version) > 0 {
+          if is_string(agent_version[0]) {
+            implementation, err = downcase(agent_version[0])
+            if err == null {
+              .remote_agent_implementation = implementation
+            }
+          }
+        }
+        if length(agent_version) > 1 {
+          if is_string(agent_version[1]) {
+            .remote_agent_version = agent_version[1]
+            if is_string(.remote_agent_version) {
+              sematic_version, err = split(.remote_agent_version, ".", limit: 3)
+              if err == null {
+                if sematic_version[0] != null {
+                  version_major, err = replace(sematic_version[0], "v", "", count: 1)
+                  if err == null {
+                    .remote_agent_version_major = version_major
+                    .remote_agent_version_minor = sematic_version[1]
+                    if sematic_version[2] != null {
+                      version_patch, err = replace(sematic_version[2], r'[-+ ](.*)', "")
+                      if err == null {
+                        .remote_agent_version_patch = version_patch
+                      }
+                    }
+                  }
+                }
+              }
+            }
+          }
+        }
+        if length(agent_version) > 2 {
+          if is_string(agent_version[2]) && .remote_agent_implementation != "prysm" {
+            .remote_agent_platform = agent_version[2]
+          }
+        }
+      }
       .direction = .data.direction
       opened, err = parse_timestamp(.data.opened, format: "%+")
       if err == null {
@@ -1889,7 +1936,46 @@ transforms:
         .remote_geo_autonomous_system_number = .meta.server.additional_data.peer.geo.autonomous_system_number
         .remote_geo_autonomous_system_organization = .meta.server.additional_data.peer.geo.autonomous_system_organization
       }
-      .remote_agent_version = .data.agent_version
+      if is_string(.data.agent_version) {
+        agent_version_cleaned = replace(to_string!(.data.agent_version), "teku/teku", "teku", count: 1)
+        agent_version = split(agent_version_cleaned, "/", limit: 4)
+        if length(agent_version) > 0 {
+          if is_string(agent_version[0]) {
+            implementation, err = downcase(agent_version[0])
+            if err == null {
+              .remote_agent_implementation = implementation
+            }
+          }
+        }
+        if length(agent_version) > 1 {
+          if is_string(agent_version[1]) {
+            .remote_agent_version = agent_version[1]
+            if is_string(.remote_agent_version) {
+              sematic_version, err = split(.remote_agent_version, ".", limit: 3)
+              if err == null {
+                if sematic_version[0] != null {
+                  version_major, err = replace(sematic_version[0], "v", "", count: 1)
+                  if err == null {
+                    .remote_agent_version_major = version_major
+                    .remote_agent_version_minor = sematic_version[1]
+                    if sematic_version[2] != null {
+                      version_patch, err = replace(sematic_version[2], r'[-+ ](.*)', "")
+                      if err == null {
+                        .remote_agent_version_patch = version_patch
+                      }
+                    }
+                  }
+                }
+              }
+            }
+          }
+        }
+        if length(agent_version) > 2 {
+          if is_string(agent_version[2]) && .remote_agent_implementation != "prysm" {
+            .remote_agent_platform = agent_version[2]
+          }
+        }
+      }
       .direction = .data.direction
       opened, err = parse_timestamp(.data.opened, format: "%+")
       if err == null {
@@ -1996,109 +2082,6 @@ transforms:
       .peer_id_unique_key = seahash(peer_id_key)
       .unique_key = seahash(.event.id)
       .updated_date_time = to_unix_timestamp(now())
-      del(.event)
-      del(.meta)
-      del(.data)
-      del(.path)
-  libp2p_trace_handle_status_formatted:
-    type: remap
-    inputs:
-      - xatu_server_events_router.libp2p_trace_handle_status
-    source: |-
-      event_date_time, err = parse_timestamp(.event.date_time, format: "%+");
-      if err == null {
-        .event_date_time = to_unix_timestamp(event_date_time, unit: "milliseconds")
-      } else {
-        .error = err
-        .error_description = "failed to parse event date time"
-        log(., level: "error", rate_limit_secs: 60)
-      }
-
-      peer_id_key, err = .data.peer_id + .meta.ethereum.network.name
-      if err != null {
-        .error = err
-        .error_description = "failed to generate peer id unique key"
-        log(., level: "error", rate_limit_secs: 60)
-      }
-      .peer_id_unique_key = seahash(peer_id_key)
-      .unique_key = seahash(.event.id)
-
-      if .data.error != null {
-        .error = .data.error
-      }
-
-      if .data.request != null {
-        .request_finalized_epoch = .data.request.finalized_epoch
-        .request_finalized_root = .data.request.finalized_root
-        .request_fork_digest = .data.request.fork_digest
-        .request_head_root = .data.request.head_root
-        .request_head_slot = .data.request.head_slot
-      }
-      if .data.response != null {
-        .response_finalized_epoch = .data.response.finalized_epoch
-        .response_finalized_root = .data.response.finalized_root
-        .response_fork_digest = .data.response.fork_digest
-        .response_head_root = .data.response.head_root
-        .response_head_slot = .data.response.head_slot
-      }
-
-      .latency_milliseconds, err = .data.latency * 1000
-      if err != null {
-        .error = err
-        .error_description = "failed to convert latency to millseconds"
-        log(., level: "error", rate_limit_secs: 60)
-      }
-
-      .protocol = .data.protocol_id
-
-      .updated_date_time = to_unix_timestamp(now())
-
-      del(.event)
-      del(.meta)
-      del(.data)
-      del(.path)
-  libp2p_trace_handle_metadata_formatted:
-    type: remap
-    inputs:
-      - xatu_server_events_router.libp2p_trace_handle_metadata
-    source: |-
-      event_date_time, err = parse_timestamp(.event.date_time, format: "%+");
-      if err == null {
-        .event_date_time = to_unix_timestamp(event_date_time, unit: "milliseconds")
-      } else {
-        .error = err
-        .error_description = "failed to parse event date time"
-        log(., level: "error", rate_limit_secs: 60)
-      }
-
-      peer_id_key, err = .data.peer_id + .meta.ethereum.network.name
-      if err != null {
-        .error = err
-        .error_description = "failed to generate peer id unique key"
-        log(., level: "error", rate_limit_secs: 60)
-      }
-      .peer_id_unique_key = seahash(peer_id_key)
-      .unique_key = seahash(.event.id)
-
-      if .data.error != null {
-        .error = .data.error
-      }
-
-      .attnets = .data.metadata.attnets
-      .seq_number = .data.metadata.seq_number
-      .syncnets = .data.metadata.syncnets
-
-      .latency_milliseconds, err = .data.latency * 1000
-      if err != null {
-        .error = err
-        .error_description = "failed to convert latency to millseconds"
-        log(., level: "error", rate_limit_secs: 60)
-      }
-
-      .protocol = .data.protocol_id
-
-      .updated_date_time = to_unix_timestamp(now())
-
       del(.event)
       del(.meta)
       del(.data)
@@ -2563,6 +2546,7 @@ transforms:
 
       events = push(events, rootEvent)
       . = events
+
   libp2p_trace_rpc_events_formatted:
     type: route
    inputs:
@@ -2576,6 +2560,109 @@ transforms:
       meta_subscription: .event_name == "LIBP2P_TRACE_RPC_META_SUBSCRIPTION"
       recv_rpc: .event_name == "LIBP2P_TRACE_RECV_RPC"
       send_rpc: .event_name == "LIBP2P_TRACE_SEND_RPC"
+  libp2p_trace_handle_status_formatted:
+    type: remap
+    inputs:
+      - xatu_server_events_router.libp2p_trace_handle_status
+    source: |-
+      event_date_time, err = parse_timestamp(.event.date_time, format: "%+");
+      if err == null {
+        .event_date_time = to_unix_timestamp(event_date_time, unit: "milliseconds")
+      } else {
+        .error = err
+        .error_description = "failed to parse event date time"
+        log(., level: "error", rate_limit_secs: 60)
+      }
+
+      peer_id_key, err = .data.peer_id + .meta.ethereum.network.name
+      if err != null {
+        .error = err
+        .error_description = "failed to generate peer id unique key"
+        log(., level: "error", rate_limit_secs: 60)
+      }
+      .peer_id_unique_key = seahash(peer_id_key)
+      .unique_key = seahash(.event.id)
+
+      if .data.error != null {
+        .error = .data.error
+      }
+
+      if .data.request != null {
+        .request_finalized_epoch = .data.request.finalized_epoch
+        .request_finalized_root = .data.request.finalized_root
+        .request_fork_digest = .data.request.fork_digest
+        .request_head_root = .data.request.head_root
+        .request_head_slot = .data.request.head_slot
+      }
+      if .data.response != null {
+        .response_finalized_epoch = .data.response.finalized_epoch
+        .response_finalized_root = .data.response.finalized_root
+        .response_fork_digest = .data.response.fork_digest
+        .response_head_root = .data.response.head_root
+        .response_head_slot = .data.response.head_slot
+      }
+
+      .latency_milliseconds, err = .data.latency * 1000
+      if err != null {
+        .error = err
+        .error_description = "failed to convert latency to millseconds"
+        log(., level: "error", rate_limit_secs: 60)
+      }
+
+      .protocol = .data.protocol_id
+
+      .updated_date_time = to_unix_timestamp(now())
+
+      del(.event)
+      del(.meta)
+      del(.data)
+      del(.path)
+  libp2p_trace_handle_metadata_formatted:
+    type: remap
+    inputs:
+      - xatu_server_events_router.libp2p_trace_handle_metadata
+    source: |-
+      event_date_time, err = parse_timestamp(.event.date_time, format: "%+");
+      if err == null {
+        .event_date_time = to_unix_timestamp(event_date_time, unit: "milliseconds")
+      } else {
+        .error = err
+        .error_description = "failed to parse event date time"
+        log(., level: "error", rate_limit_secs: 60)
+      }
+
+      peer_id_key, err = .data.peer_id + .meta.ethereum.network.name
+      if err != null {
+        .error = err
+        .error_description = "failed to generate peer id unique key"
+        log(., level: "error", rate_limit_secs: 60)
+      }
+      .peer_id_unique_key = seahash(peer_id_key)
+      .unique_key = seahash(.event.id)
+
+      if .data.error != null {
+        .error = .data.error
+      }
+
+      .attnets = .data.metadata.attnets
+      .seq_number = .data.metadata.seq_number
+      .syncnets = .data.metadata.syncnets
+
+      .latency_milliseconds, err = .data.latency * 1000
+      if err != null {
+        .error = err
+        .error_description = "failed to convert latency to millseconds"
+        log(., level: "error", rate_limit_secs: 60)
+      }
+
+      .protocol = .data.protocol_id
+
+      .updated_date_time = to_unix_timestamp(now())
+
+      del(.event)
+      del(.meta)
+      del(.data)
+      del(.path)
 sinks:
   metrics:
     type: prometheus_exporter
@@ -3423,4 +3510,4 @@
       max_events: 200000
     healthcheck:
       enabled: true
-    skip_unknown_fields: false
\ No newline at end of file
+    skip_unknown_fields: false