Skip to content

Commit

Permalink
feat(cl-mimicry): Initial (#297)
Browse files Browse the repository at this point in the history
* feat(cl-mimicry): Initial

* Add example-cl-mimicry.yaml

* feat(cl-mimicry): Add libp2p events

* feat(cl-mimicry): Add libp2p events

* Fix clickhouse migrations

* Add rpc tables

* Add rpc tables

* remove session_start_date_time

* Remove non-traced events

* feat: Tidying

* chore: Remove unused Kafka sink configurations

* refactor: Update table name and add column 'peer_id'

* merge master
  • Loading branch information
samcm authored Apr 17, 2024
1 parent cd92d1d commit be7d92e
Show file tree
Hide file tree
Showing 30 changed files with 8,387 additions and 1,591 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ sentry.yaml
server.yaml
discovery.yaml
mimicry.yaml
cl-mimicry.yaml
el-mimicry.yaml
cannon.yaml
sage.yaml
dist
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ WORKDIR /src
COPY go.sum go.mod ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 go build -o /bin/app .
RUN go build -o /bin/app .

FROM ubuntu:latest
RUN apt-get update && apt-get -y upgrade && apt-get install -y --no-install-recommends \
Expand Down
86 changes: 86 additions & 0 deletions cmd/cl-mimicry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
//nolint:dupl // disable duplicate code warning for cmds
package cmd

import (
"os"

"github.com/creasty/defaults"
"github.com/ethpandaops/xatu/pkg/clmimicry"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
yaml "gopkg.in/yaml.v3"
)

var (
clMimicryCfgFile string
)

// clMimicryCmd represents the consensus layer mimicry command
var clMimicryCmd = &cobra.Command{
Use: "cl-mimicry",
Short: "Runs Xatu in CL Mimicry mode.",
Long: `Runs Xatu in consensus layer Mimicry mode, which means it will connect to
the consensus layer p2p network and create events from the data it receives.`,
Run: func(cmd *cobra.Command, args []string) {
initCommon()

log.WithField("location", clMimicryCfgFile).Info("Loading config")

config, err := loadCLMimicryConfigFromFile(clMimicryCfgFile)
if err != nil {
log.Fatal(err)
}

log.Info("Config loaded")

logLevel, err := logrus.ParseLevel(config.LoggingLevel)
if err != nil {
log.WithField("logLevel", config.LoggingLevel).Fatal("invalid logging level")
}

log.SetLevel(logLevel)

mimicry, err := clmimicry.New(cmd.Context(), log, config)
if err != nil {
log.Fatal(err)
}

if err := mimicry.Start(cmd.Context()); err != nil {
log.Fatal(err)
}

log.Info("Xatu mimicry exited - cya!")
},
}

func init() {
rootCmd.AddCommand(clMimicryCmd)

clMimicryCmd.Flags().StringVar(&clMimicryCfgFile, "config", "cl-mimicry.yaml", "config file (default is cl-mimicry.yaml)")
}

func loadCLMimicryConfigFromFile(file string) (*clmimicry.Config, error) {
if file == "" {
file = "cl-mimicry.yaml"
}

config := &clmimicry.Config{}

if err := defaults.Set(config); err != nil {
return nil, err
}

yamlFile, err := os.ReadFile(file)

if err != nil {
return nil, err
}

type plain clmimicry.Config

if err := yaml.Unmarshal(yamlFile, (*plain)(config)); err != nil {
return nil, err
}

return config, nil
}
119 changes: 119 additions & 0 deletions deploy/local/docker-compose/vector-http-kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ transforms:
eth_v2_beacon_block: .event.name == "BEACON_API_ETH_V2_BEACON_BLOCK"
mempool_transaction_v2: .event.name == "MEMPOOL_TRANSACTION_V2"
mempool_transaction: .event.name == "MEMPOOL_TRANSACTION"
libp2p_trace_connected: .event.name == "LIBP2P_TRACE_CONNECTED"
libp2p_trace_disconnected: .event.name == "LIBP2P_TRACE_DISCONNECTED"
libp2p_trace_add_peer: .event.name == "LIBP2P_TRACE_ADD_PEER"
libp2p_trace_remove_peer: .event.name == "LIBP2P_TRACE_REMOVE_PEER"
libp2p_trace_recv_rpc: .event.name == "LIBP2P_TRACE_RECV_RPC"
libp2p_trace_send_rpc: .event.name == "LIBP2P_TRACE_SEND_RPC"
libp2p_trace_join: .event.name == "LIBP2P_TRACE_JOIN"
sinks:
metrics:
type: prometheus_exporter
Expand Down Expand Up @@ -669,3 +676,115 @@ sinks:
enabled: true
encoding:
codec: json
libp2p_trace_connected_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.libp2p_trace_connected
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: libp2p-trace-connected
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
libp2p_trace_disconnected_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.libp2p_trace_disconnected
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: libp2p-trace-disconnected
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
libp2p_trace_add_peer_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.libp2p_trace_add_peer
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: libp2p-trace-add-peer
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
libp2p_trace_remove_peer_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.libp2p_trace_remove_peer
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: libp2p-trace-remove-peer
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
libp2p_trace_recv_rpc_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.libp2p_trace_recv_rpc
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: libp2p-trace-recv-rpc
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
libp2p_trace_send_rpc_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.libp2p_trace_send_rpc
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: libp2p-trace-send-rpc
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
libp2p_trace_join_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.libp2p_trace_join
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: libp2p-trace-join
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
Loading

0 comments on commit be7d92e

Please sign in to comment.