Skip to content

Commit

Permalink
Fix clickhouse migrations
Browse files Browse the repository at this point in the history
  • Loading branch information
samcm committed Apr 15, 2024
1 parent 1ac1563 commit befd239
Show file tree
Hide file tree
Showing 12 changed files with 1,820 additions and 636 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ WORKDIR /src
COPY go.sum go.mod ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 go build -o /bin/app .
RUN go build -o /bin/app .

FROM ubuntu:latest
RUN apt-get update && apt-get -y upgrade && apt-get install -y --no-install-recommends \
Expand Down
169 changes: 167 additions & 2 deletions deploy/local/docker-compose/vector-kafka-clickhouse.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
api:
enabled: true
address: 0.0.0.0:8686
playground: false
playground: true
acknowledgements:
enabled: true
sources:
Expand Down Expand Up @@ -109,6 +109,18 @@ sources:
- "^beacon-api-eth-v1-proposer-.+"
librdkafka_options:
message.max.bytes: "10485760" # 10MB
libp2p_trace_kafka:
type: kafka
bootstrap_servers: "${KAFKA_BROKERS}"
group_id: xatu-vector-kafka-clickhouse-libp2p-trace
key_field: "event.id"
decoding:
codec: json
topics:
- "libp2p-trace-connected"
auto_offset_reset: earliest
librdkafka_options:
message.max.bytes: "10485760" # 10MB
transforms:
xatu_server_events_meta:
type: remap
Expand All @@ -122,6 +134,7 @@ transforms:
- beacon_api_eth_v1_beacon_blob_sidecar_kafka
- beacon_p2p_events_kafka
- beacon_api_eth_v1_proposer_kafka
- libp2p_trace_kafka
source: |-
.meta_client_name = .meta.client.name
.meta_client_id = .meta.client.id
Expand Down Expand Up @@ -337,6 +350,24 @@ transforms:
eth_v2_beacon_block: .event.name == "BEACON_API_ETH_V2_BEACON_BLOCK"
mempool_transaction_v2: .event.name == "MEMPOOL_TRANSACTION_V2"
mempool_transaction: .event.name == "MEMPOOL_TRANSACTION"
libp2p_trace_connected: .event.name == "LIBP2P_TRACE_CONNECTED"
# libp2p_trace_disconnected: .event.name == "LIBP2P_TRACE_DISCONNECTED"
# libp2p_trace_add_peer: .event.name == "LIBP2P_TRACE_ADD_PEER"
# libp2p_trace_remove_peer: .event.name == "LIBP2P_TRACE_REMOVE_PEER"
# libp2p_trace_publish_message: .event.name == "LIBP2P_TRACE_PUBLISH_MESSAGE"
# libp2p_trace_reject_message: .event.name == "LIBP2P_TRACE_REJECT_MESSAGE"
# libp2p_trace_duplicate_message: .event.name == "LIBP2P_TRACE_DUPLICATE_MESSAGE"
# libp2p_trace_deliver_message: .event.name == "LIBP2P_TRACE_DELIVER_MESSAGE"
# libp2p_trace_recv_rpc: .event.name == "LIBP2P_TRACE_RECV_RPC"
# libp2p_trace_send_rpc: .event.name == "LIBP2P_TRACE_SEND_RPC"
# libp2p_trace_drop_rpc: .event.name == "LIBP2P_TRACE_DROP_RPC"
# libp2p_trace_join: .event.name == "LIBP2P_TRACE_JOIN"
# libp2p_trace_leave: .event.name == "LIBP2P_TRACE_LEAVE"
# libp2p_trace_graft: .event.name == "LIBP2P_TRACE_GRAFT"
# libp2p_trace_prune: .event.name == "LIBP2P_TRACE_PRUNE"
# libp2p_trace_validate_message: .event.name == "LIBP2P_TRACE_VALIDATE_MESSAGE"
# libp2p_trace_throttle_peer: .event.name == "LIBP2P_TRACE_THROTTLE_PEER"
# libp2p_trace_undeliverable_message: .event.name == "LIBP2P_TRACE_UNDELIVERABLE_MESSAGE"
xatu_server_events_router_matched:
type: log_to_metric
inputs:
Expand Down Expand Up @@ -375,6 +406,24 @@ transforms:
- xatu_server_events_router.eth_v2_beacon_block_v2
- xatu_server_events_router.mempool_transaction
- xatu_server_events_router.mempool_transaction_v2
- xatu_server_events_router.libp2p_trace_connected
# - xatu_server_events_router.libp2p_trace_disconnected
# - xatu_server_events_router.libp2p_trace_add_peer
# - xatu_server_events_router.libp2p_trace_remove_peer
# - xatu_server_events_router.libp2p_trace_publish_message
# - xatu_server_events_router.libp2p_trace_reject_message
# - xatu_server_events_router.libp2p_trace_duplicate_message
# - xatu_server_events_router.libp2p_trace_deliver_message
# - xatu_server_events_router.libp2p_trace_recv_rpc
# - xatu_server_events_router.libp2p_trace_send_rpc
# - xatu_server_events_router.libp2p_trace_drop_rpc
# - xatu_server_events_router.libp2p_trace_join
# - xatu_server_events_router.libp2p_trace_leave
# - xatu_server_events_router.libp2p_trace_graft
# - xatu_server_events_router.libp2p_trace_prune
# - xatu_server_events_router.libp2p_trace_validate_message
# - xatu_server_events_router.libp2p_trace_throttle_peer
# - xatu_server_events_router.libp2p_trace_undeliverable_message
metrics:
- type: counter
field: event.name
Expand Down Expand Up @@ -1732,7 +1781,77 @@ transforms:
del(.event)
del(.meta)
del(.data)
libp2p_trace_connected_formatted:
type: remap
inputs:
- xatu_server_events_router.libp2p_trace_connected
source: |-
event_date_time, err = parse_timestamp(.event.date_time, format: "%+");
if err == null {
.event_date_time = to_unix_timestamp(event_date_time, unit: "milliseconds")
} else {
.error = err
.error_description = "failed to parse event date time"
}
session_start_date_time, err = parse_timestamp(.meta.client.additional_data.metadata.session_start_date_time, format: "%+");
if err == null {
.session_start_date_time = to_unix_timestamp(session_start_date_time, unit: "seconds")
} else {
.error = err
.error_description = "failed to parse session start date time"
}
.remote_peer = .data.remote_peer
.remote_maddrs = .data.remote_maddrs
.agent_version = .data.agent_version
.direction = .data.direction
opened, err = parse_timestamp(.data.opened, format: "%+")
if err == null {
.opened = to_unix_timestamp(opened, unit: "milliseconds")
} else {
.error = err
.error_description = "failed to parse opened"
}
.transient = .data.transient
key, err = .event.date_time + .remote_peer + .meta.client.name
if err != null {
.error = err
.error_description = "failed to generate unique key"
}
.unique_key = seahash(key)
.updated_date_time = to_unix_timestamp(now())
del(.event)
del(.meta)
del(.data)
# libp2p_trace_disconnected_formatted:
# type: remap
# inputs:
# - xatu_server_events_router.libp2p_trace_disconnected
# source: |-
# event_date_time, err = parse_timestamp(.event.date_time, format: "%+");
# if err == null {
# .event_date_time = to_unix_timestamp(event_date_time, unit: "milliseconds")
# } else {
# .error = err
# .error_description = "failed to parse event date time"
# log(., level: "error", rate_limit_secs: 60)
# }
# .session_start_date_time = to_unix_timestamp(.meta.trace.session_start_date_time)
# .remote_peer = .data.remote_peer
# .remote_maddrs = .data.remote_maddrs
# .agent_version = .data.agent_version
# .direction = .data.direction
# .opened = to_unix_timestamp(.data.opened)
# .transient = .data.transient
# key, err = .event_date_time + .remote_peer + .meta.client.name
# if err != null {
# .error = err
# .error_description = "failed to generate unique key"
# }
# .unique_key = seahash(key)
# .updated_date_time = to_unix_timestamp(now())
# del(.event)
# del(.meta)
# del(.data)
sinks:
metrics:
type: prometheus_exporter
Expand Down Expand Up @@ -2261,3 +2380,49 @@ sinks:
healthcheck:
enabled: true
skip_unknown_fields: false
libp2p_trace_connected_console:
type: console
inputs:
- libp2p_trace_connected_formatted
encoding:
codec: json
libp2p_trace_connected_clickhouse:
type: clickhouse
inputs:
- libp2p_trace_connected_formatted
database: default
endpoint: "${CLICKHOUSE_ENDPOINT}"
table: libp2p_connected
auth:
strategy: basic
user: "${CLICKHOUSE_USER}"
password: "${CLICKHOUSE_PASSWORD}"
batch:
max_bytes: 52428800
max_events: 200000
timeout_secs: 1
buffer:
max_events: 200000
healthcheck:
enabled: true
skip_unknown_fields: false
# libp2p_trace_disconnected_clickhouse:
# type: clickhouse
# inputs:
# - libp2p_trace_disconnected
# database: default
# endpoint: "${CLICKHOUSE_ENDPOINT}"
# table: libp2p_disconnected
# auth:
# strategy: basic
# user: "${CLICKHOUSE_USER}"
# password: "${CLICKHOUSE_PASSWORD}"
# batch:
# max_bytes: 52428800
# max_events: 200000
# timeout_secs: 1
# buffer:
# max_events: 200000
# healthcheck:
# enabled: true
# skip_unknown_fields: false
22 changes: 22 additions & 0 deletions deploy/migrations/clickhouse/031_libp2p_trace.down.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,27 @@ DROP TABLE IF EXISTS default.libp2p_send_rpc_local;
DROP TABLE IF EXISTS default.libp2p_drop_rpc;
DROP TABLE IF EXISTS default.libp2p_drop_rpc_local;

DROP TABLE IF EXISTS default.libp2p_rpc_meta_message;
DROP TABLE IF EXISTS default.libp2p_rpc_meta_message_local;

DROP TABLE IF EXISTS default.libp2p_rpc_meta_subscription;
DROP TABLE IF EXISTS default.libp2p_rpc_meta_subscription_local;

DROP TABLE IF EXISTS default.libp2p_rpc_meta_control_ihave;
DROP TABLE IF EXISTS default.libp2p_rpc_meta_control_ihave_local;

DROP TABLE IF EXISTS default.libp2p_rpc_meta_control_iwant;
DROP TABLE IF EXISTS default.libp2p_rpc_meta_control_iwant_local;

DROP TABLE IF EXISTS default.libp2p_rpc_meta_control_graft;
DROP TABLE IF EXISTS default.libp2p_rpc_meta_control_graft_local;

DROP TABLE IF EXISTS default.libp2p_rpc_meta_control_prune;
DROP TABLE IF EXISTS default.libp2p_rpc_meta_control_prune_local;

DROP TABLE IF EXISTS default.libp2p_recv_rpc;
DROP TABLE IF EXISTS default.libp2p_recv_rpc_local;

DROP TABLE IF EXISTS default.libp2p_join;
DROP TABLE IF EXISTS default.libp2p_join_local;

Expand Down Expand Up @@ -51,3 +72,4 @@ DROP TABLE IF EXISTS default.libp2p_connected_local;

DROP TABLE IF EXISTS default.libp2p_disconnected;
DROP TABLE IF EXISTS default.libp2p_disconnected_local;

Loading

0 comments on commit befd239

Please sign in to comment.