Skip to content

Commit

Permalink
feat(cl-mimicry): Add libp2p events
Browse files Browse the repository at this point in the history
  • Loading branch information
samcm committed Apr 11, 2024
1 parent cb9fe0e commit 7beb07b
Show file tree
Hide file tree
Showing 6 changed files with 4,369 additions and 1,626 deletions.
309 changes: 309 additions & 0 deletions deploy/local/docker-compose/vector-http-kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,24 @@ transforms:
eth_v2_beacon_block: .event.name == "BEACON_API_ETH_V2_BEACON_BLOCK"
mempool_transaction_v2: .event.name == "MEMPOOL_TRANSACTION_V2"
mempool_transaction: .event.name == "MEMPOOL_TRANSACTION"
libp2p_trace_connected: .event.name == "LIBP2P_TRACE_CONNECTED"
libp2p_trace_disconnected: .event.name == "LIBP2P_TRACE_DISCONNECTED"
libp2p_trace_add_peer: .event.name == "LIBP2P_TRACE_ADD_PEER"
libp2p_trace_remove_peer: .event.name == "LIBP2P_TRACE_REMOVE_PEER"
libp2p_trace_publish_message: .event.name == "LIBP2P_TRACE_PUBLISH_MESSAGE"
libp2p_trace_reject_message: .event.name == "LIBP2P_TRACE_REJECT_MESSAGE"
libp2p_trace_duplicate_message: .event.name == "LIBP2P_TRACE_DUPLICATE_MESSAGE"
libp2p_trace_deliver_message: .event.name == "LIBP2P_TRACE_DELIVER_MESSAGE"
libp2p_trace_recv_rpc: .event.name == "LIBP2P_TRACE_RECV_RPC"
libp2p_trace_send_rpc: .event.name == "LIBP2P_TRACE_SEND_RPC"
libp2p_trace_drop_rpc: .event.name == "LIBP2P_TRACE_DROP_RPC"
libp2p_trace_join: .event.name == "LIBP2P_TRACE_JOIN"
libp2p_trace_leave: .event.name == "LIBP2P_TRACE_LEAVE"
libp2p_trace_graft: .event.name == "LIBP2P_TRACE_GRAFT"
libp2p_trace_prune: .event.name == "LIBP2P_TRACE_PRUNE"
libp2p_trace_validate_message: .event.name == "LIBP2P_TRACE_VALIDATE_MESSAGE"
libp2p_trace_throttle_peer: .event.name == "LIBP2P_TRACE_THROTTLE_PEER"
libp2p_trace_undeliverable_message: .event.name == "LIBP2P_TRACE_UNDELIVERABLE_MESSAGE"
sinks:
metrics:
type: prometheus_exporter
Expand Down Expand Up @@ -669,3 +687,294 @@ sinks:
enabled: true
encoding:
codec: json
libp2p_trace_connected_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.libp2p_trace_connected
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: libp2p-trace-connected
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
libp2p_trace_disconnected_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.libp2p_trace_disconnected
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: libp2p-trace-disconnected
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
libp2p_trace_add_peer_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.libp2p_trace_add_peer
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: libp2p-trace-add-peer
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
libp2p_trace_remove_peer_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.libp2p_trace_remove_peer
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: libp2p-trace-remove-peer
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
libp2p_trace_publish_message_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.libp2p_trace_publish_message
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: libp2p-trace-publish-message
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
libp2p_trace_reject_message_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.libp2p_trace_reject_message
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: libp2p-trace-reject-message
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
libp2p_trace_duplicate_message_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.libp2p_trace_duplicate_message
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: libp2p-trace-duplicate-message
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
libp2p_trace_deliver_message_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.libp2p_trace_deliver_message
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: libp2p-trace-deliver-message
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
libp2p_trace_recv_rpc_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.libp2p_trace_recv_rpc
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: libp2p-trace-recv-rpc
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
libp2p_trace_send_rpc_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.libp2p_trace_send_rpc
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: libp2p-trace-send-rpc
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
libp2p_trace_drop_rpc_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.libp2p_trace_drop_rpc
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: libp2p-trace-drop-rpc
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
libp2p_trace_join_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.libp2p_trace_join
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: libp2p-trace-join
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
libp2p_trace_leave_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.libp2p_trace_leave
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: libp2p-trace-leave
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
libp2p_trace_graft_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.libp2p_trace_graft
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: libp2p-trace-graft
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
libp2p_trace_prune_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.libp2p_trace_prune
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: libp2p-trace-prune
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
libp2p_trace_validate_message_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.libp2p_trace_validate_message
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: libp2p-trace-validate-message
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
libp2p_trace_throttle_peer_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.libp2p_trace_throttle_peer
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: libp2p-trace-throttle-peer
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
libp2p_trace_undeliverable_message_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.libp2p_trace_undeliverable_message
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: libp2p-trace-undeliverable-message
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json



18 changes: 18 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,24 @@ services:
"blockprint-block-classification"
"mempool-transaction"
"mempool-transaction-v2"
"libp2p-trace-connected"
"libp2p-trace-disconnected"
"libp2p-trace-add-peer"
"libp2p-trace-remove-peer"
"libp2p-trace-publish-message"
"libp2p-trace-reject-message"
"libp2p-trace-duplicate-message"
"libp2p-trace-deliver-message"
"libp2p-trace-recv-rpc"
"libp2p-trace-send-rpc"
"libp2p-trace-drop-rpc"
"libp2p-trace-join"
"libp2p-trace-leave"
"libp2p-trace-graft"
"libp2p-trace-prune"
"libp2p-trace-validate-message"
"libp2p-trace-throttle-peer"
"libp2p-trace-undeliverable-message"
)
for topic in "$${topics[@]}"; do
echo "Creating topic: $$topic";
Expand Down
20 changes: 14 additions & 6 deletions pkg/clmimicry/mimicry.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,11 @@ func (m *Mimicry) startHermes(ctx context.Context) error {
return err
}

node.OnEvent(m.handleHermesEvent)
node.OnEvent(func(ctx context.Context, event *host.TraceEvent) {
if err := m.handleHermesEvent(ctx, event); err != nil {
m.log.WithError(err).Error("Failed to handle hermes event")
}
})

m.node = node

Expand Down Expand Up @@ -205,10 +209,6 @@ func (m *Mimicry) ServePProf(ctx context.Context) error {
return nil
}

func (m *Mimicry) handleHermesEvent(ctx context.Context, event *host.TraceEvent) {
m.log.WithField("event", event).Info("Received event")
}

func (m *Mimicry) createNewClientMeta(ctx context.Context) (*xatu.ClientMeta, error) {
return &xatu.ClientMeta{
Name: m.Config.Name,
Expand All @@ -217,7 +217,15 @@ func (m *Mimicry) createNewClientMeta(ctx context.Context) (*xatu.ClientMeta, er
Implementation: xatu.Implementation,
Os: runtime.GOOS,
ClockDrift: uint64(m.clockDrift.Milliseconds()),
Labels: m.Config.Labels,
Ethereum: &xatu.ClientMeta_Ethereum{
Network: &xatu.ClientMeta_Ethereum_Network{
Name: m.Config.Ethereum.Network,
},
Execution: &xatu.ClientMeta_Ethereum_Execution{},
Consensus: &xatu.ClientMeta_Ethereum_Consensus{},
},

Labels: m.Config.Labels,
}, nil
}

Expand Down
Loading

0 comments on commit 7beb07b

Please sign in to comment.