Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cl-mimicry): Initial #297

Merged
merged 14 commits into from
Apr 17, 2024
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ sentry.yaml
server.yaml
discovery.yaml
mimicry.yaml
cl-mimicry.yaml
el-mimicry.yaml
cannon.yaml
sage.yaml
dist
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ WORKDIR /src
COPY go.sum go.mod ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 go build -o /bin/app .
RUN go build -o /bin/app .

FROM ubuntu:latest
RUN apt-get update && apt-get -y upgrade && apt-get install -y --no-install-recommends \
Expand Down
86 changes: 86 additions & 0 deletions cmd/cl-mimicry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
//nolint:dupl // disable duplicate code warning for cmds
package cmd

import (
"os"

"github.com/creasty/defaults"
"github.com/ethpandaops/xatu/pkg/clmimicry"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
yaml "gopkg.in/yaml.v3"
)

var (
clMimicryCfgFile string
)

// clMimicryCmd represents the consensus layer mimicry command
var clMimicryCmd = &cobra.Command{
Use: "cl-mimicry",
Short: "Runs Xatu in CL Mimicry mode.",
Long: `Runs Xatu in consensus layer Mimicry mode, which means it will connect to
the consensus layer p2p network and create events from the data it receives.`,
Run: func(cmd *cobra.Command, args []string) {
initCommon()

log.WithField("location", clMimicryCfgFile).Info("Loading config")

config, err := loadCLMimicryConfigFromFile(clMimicryCfgFile)
if err != nil {
log.Fatal(err)
}

log.Info("Config loaded")

logLevel, err := logrus.ParseLevel(config.LoggingLevel)
if err != nil {
log.WithField("logLevel", config.LoggingLevel).Fatal("invalid logging level")
}

log.SetLevel(logLevel)

mimicry, err := clmimicry.New(cmd.Context(), log, config)
if err != nil {
log.Fatal(err)
}

if err := mimicry.Start(cmd.Context()); err != nil {
log.Fatal(err)
}

log.Info("Xatu mimicry exited - cya!")
},
}

func init() {
rootCmd.AddCommand(clMimicryCmd)

clMimicryCmd.Flags().StringVar(&clMimicryCfgFile, "config", "cl-mimicry.yaml", "config file (default is cl-mimicry.yaml)")
}

func loadCLMimicryConfigFromFile(file string) (*clmimicry.Config, error) {
if file == "" {
file = "cl-mimicry.yaml"
}

config := &clmimicry.Config{}

if err := defaults.Set(config); err != nil {
return nil, err
}

yamlFile, err := os.ReadFile(file)

if err != nil {
return nil, err
}

type plain clmimicry.Config

if err := yaml.Unmarshal(yamlFile, (*plain)(config)); err != nil {
return nil, err
}

return config, nil
}
119 changes: 119 additions & 0 deletions deploy/local/docker-compose/vector-http-kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ transforms:
eth_v2_beacon_block: .event.name == "BEACON_API_ETH_V2_BEACON_BLOCK"
mempool_transaction_v2: .event.name == "MEMPOOL_TRANSACTION_V2"
mempool_transaction: .event.name == "MEMPOOL_TRANSACTION"
libp2p_trace_connected: .event.name == "LIBP2P_TRACE_CONNECTED"
libp2p_trace_disconnected: .event.name == "LIBP2P_TRACE_DISCONNECTED"
libp2p_trace_add_peer: .event.name == "LIBP2P_TRACE_ADD_PEER"
libp2p_trace_remove_peer: .event.name == "LIBP2P_TRACE_REMOVE_PEER"
libp2p_trace_recv_rpc: .event.name == "LIBP2P_TRACE_RECV_RPC"
libp2p_trace_send_rpc: .event.name == "LIBP2P_TRACE_SEND_RPC"
libp2p_trace_join: .event.name == "LIBP2P_TRACE_JOIN"
sinks:
metrics:
type: prometheus_exporter
Expand Down Expand Up @@ -669,3 +676,115 @@ sinks:
enabled: true
encoding:
codec: json
libp2p_trace_connected_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.libp2p_trace_connected
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: libp2p-trace-connected
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
libp2p_trace_disconnected_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.libp2p_trace_disconnected
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: libp2p-trace-disconnected
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
libp2p_trace_add_peer_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.libp2p_trace_add_peer
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: libp2p-trace-add-peer
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
libp2p_trace_remove_peer_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.libp2p_trace_remove_peer
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: libp2p-trace-remove-peer
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
libp2p_trace_recv_rpc_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.libp2p_trace_recv_rpc
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: libp2p-trace-recv-rpc
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
libp2p_trace_send_rpc_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.libp2p_trace_send_rpc
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: libp2p-trace-send-rpc
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
libp2p_trace_join_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.libp2p_trace_join
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: libp2p-trace-join
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
Loading
Loading