Skip to content

Commit

Permalink
feat(sage): Initial (#252)
Browse files Browse the repository at this point in the history
* feat: Armiarma

* feat(seer): Initial

* feat(sage): Initial

* Add metrics

* Add metrics

* feat(config): add MetricsAddr option for prometheus metrics

* feat: Add support for serving pprof server

* Add metrics

* chore: linting

* docs

* docs

* docs
  • Loading branch information
samcm authored Dec 5, 2023
1 parent 4aa4a93 commit 17faf5b
Show file tree
Hide file tree
Showing 26 changed files with 3,949 additions and 1,553 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/golangci-lint.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
uses: golangci/golangci-lint-action@v3
with:
# Optional: version of golangci-lint to use in form of v1.2 or v1.2.3 or `latest` to use the latest version
version: v1.54
version: v1.55

# Optional: working directory, useful for monorepos
# working-directory: somedir
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ GeoLite2-ASN.mmdb
GeoLite2-City.mmdb
__debug_bin*
.vscode/launch.json
sage.yaml
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ proto:
protoc --proto_path=./ --go_opt=module=github.com/ethpandaops/xatu/pkg/proto/eth/v2 --go_out=./pkg/proto/eth/v2/ pkg/proto/eth/v2/*.proto
protoc --proto_path=./ --proto_path=./pkg/proto/eth/v1 --proto_path=./pkg/proto/eth/v2 --go_opt=module=github.com/ethpandaops/xatu/pkg/proto/xatu --go-grpc_out=. --go-grpc_opt=paths=source_relative --go_out=./pkg/proto/xatu pkg/proto/xatu/*.proto
protoc --proto_path=./ --go_opt=module=github.com/ethpandaops/xatu/pkg/proto/blockprint --go_out=./pkg/proto/blockprint pkg/proto/blockprint/*.proto
protoc --proto_path=./ --go_opt=module=github.com/ethpandaops/xatu/pkg/proto/libp2p --go_out=./pkg/proto/libp2p pkg/proto/libp2p/*.proto
39 changes: 24 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,28 @@ Ethereum network monitoring with collection clients and a centralized server for
Xatu can run in multiple modes. Each mode can be run independently. The following diagram shows the different modes and how they interact with each other and other services.

```
┌───────────┐
│ CONSENSUS │
│ CLIENT ◄─────┐
└─────▲─────┘ │
│ │
│ │
┌───▼────┐ ┌────▼─────┐ ┌───────────┐ ┌───────────┐
│ XATU │ │ XATU │ │ XATU │ │ XATU │
│ SENTRY │ │ CANNON │ │ MIMICRY │ │ DISCOVERY │
└───┬────┘ └─────┬────┘ └─────┬─────┘ └─────┬─────┘
│ │ │ │
│ │ │ │
│ ┌────▼─────┐ │ │
└───────► ◄───────┘─────────────┘
┌───────────┐
│ CONSENSUS │
│P2P NETWORK│
└─────▲─────┘
┌────────────┘─────────────┐
│ │
┌─────▲─────┐ ┌─────▲─────┐ ┌───────────┐
│ CONSENSUS │ │ ARMIARMA │ │ EXECUTION │
│ CLIENT ◄─────┐ │ │ │P2P NETWORK│
└─────▲─────┘ │ └─────▲─────┘ └─────▲─────┘
│ │ │ ┌─────┘───────┐
│ │ │ │ │
┌───▼────┐ ┌────▼─────┐ ┌─────▼─────┐ ┌─────▼─────┐ ┌─────▼─────┐
│ XATU │ │ XATU │ │ XATU │ │ XATU │ │ XATU │
│ SENTRY │ │ CANNON │ │ SAGE │ │ MIMICRY │ │ DISCOVERY │
└───┬────┘ └─────┬────┘ └─────┬─────┘ └─────┬─────┘ └─────┬─────┘
│ │ │ │ │
│ │ │ │ │
│ ┌────▼─────┐ │ │ │
└───────► ◄───────┘─────────────┘─────────────┘
│ XATU │
│ SERVER │ ┌─────────────┐
│ ◄────► PERSISTENCE │
Expand All @@ -40,7 +48,8 @@ Follow the links for more information on each mode.
- [**Sentry**](./docs/sentry.md) - Client that runs along side a [Ethereum consensus client](https://ethereum.org/en/developers/docs/nodes-and-clients/#consensus-clients) and collects data via the consensus client's [Beacon API](https://ethereum.github.io/beacon-APIs/). *You must run your own consensus client* and this projects sentry will connect to it via the consensus client's http server.
- [**Discovery**](./docs/discovery.md) - Client that uses the [Node Discovery Protocol v5](https://github.com/ethereum/devp2p/blob/master/discv5/discv5.md) and [Node Discovery Protocol v4](https://github.com/ethereum/devp2p/blob/master/discv4.md) to discovery nodes on the network. Also attempts to connect to execution layer nodes and collect meta data from them.
- [**Mimicry**](./docs/mimicry.md) - Client that collects data from the execution layer P2P network.
- [**Cannon**](./docs/sentry.md) - Client that runs along side a [Ethereum consensus client](https://ethereum.org/en/developers/docs/nodes-and-clients/#consensus-clients) and collects canonical finalized data via the consensus client's [Beacon API](https://ethereum.github.io/beacon-APIs/). *You must run your own consensus client* and this projects cannon client will connect to it via the consensus client's http server.
- [**Cannon**](./docs/cannon.md) - Client that runs along side a [Ethereum consensus client](https://ethereum.org/en/developers/docs/nodes-and-clients/#consensus-clients) and collects canonical finalized data via the consensus client's [Beacon API](https://ethereum.github.io/beacon-APIs/). *You must run your own consensus client* and this projects cannon client will connect to it via the consensus client's http server.
- [**Sage**](./docs/sage.md) - Client that connects to an [Armiarma](https://github.com/migalabs/armiarma) instance (which itself connects to an Ethereum Beacon Chain P2P network) and creates events from the events Armiarma emits.

## Getting Started

Expand Down
85 changes: 85 additions & 0 deletions cmd/sage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
//nolint:dupl // disable duplicate code warning for cmds
package cmd

import (
"os"

"github.com/creasty/defaults"
"github.com/ethpandaops/xatu/pkg/sage"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
yaml "gopkg.in/yaml.v3"
)

var (
sageCfgFile string
)

// sageCmd represents the sage command
var sageCmd = &cobra.Command{
Use: "sage",
Short: "Runs Xatu in sage mode.",
Long: `Runs Xatu in sage mode, which means it will connect to an Ethereum beacon p2p network and listen for events (via Armiarma).`,
Run: func(cmd *cobra.Command, args []string) {
initCommon()

log.WithField("location", sageCfgFile).Info("Loading config")

config, err := loadsageConfigFromFile(sageCfgFile)
if err != nil {
log.Fatal(err)
}

log.Info("Config loaded")

logLevel, err := logrus.ParseLevel(config.LoggingLevel)
if err != nil {
log.WithField("logLevel", config.LoggingLevel).Fatal("invalid logging level")
}

log.SetLevel(logLevel)

sage, err := sage.New(cmd.Context(), log, config)
if err != nil {
log.Fatal(err)
}

if err := sage.Start(cmd.Context()); err != nil {
log.Fatal(err)
}

log.Info("Xatu sage exited - cya!")
},
}

func init() {
rootCmd.AddCommand(sageCmd)

sageCmd.Flags().StringVar(&sageCfgFile, "config", "sage.yaml", "config file (default is sage.yaml)")
}

func loadsageConfigFromFile(file string) (*sage.Config, error) {
if file == "" {
file = "sage.yaml"
}

config := &sage.Config{}

if err := defaults.Set(config); err != nil {
return nil, err
}

yamlFile, err := os.ReadFile(file)

if err != nil {
return nil, err
}

type plain sage.Config

if err := yaml.Unmarshal(yamlFile, (*plain)(config)); err != nil {
return nil, err
}

return config, nil
}
220 changes: 220 additions & 0 deletions docs/sage.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
# Sage

Client that is run along side an [Armiarma](https://github.com/migalabs/armiarma) instance and collects data from the beacon chain p2p network.

This sage can output events to various sinks and it is **not** a hard requirement to run the [Xatu server](./server.md).

## Table of contents

- [Usage](#usage)
- [Requirements](#requirements)
- [Configuration](#configuration)
- [`xatu` output](#output-xatu-configuration)
- [`http` output](#output-http-configuration)
- [Simple example](#simple-example)
- [Xatu server output example](#xatu-server-output-example)
- [HTTP server output example](#http-server-output-example)
- [Complex example with multiple outputs example](#complex-example-with-multiple-outputs-example)
- [Running locally](#running-locally)

## Usage

sage requires a [config file](#configuration).

```bash
Usage:
xatu sage [flags]

Flags:
--config string config file (default is sage.yaml) (default "sage.yaml")
-h, --help help for sage
```

## Requirements

- [Armiarma instance](https://github.com/migalabs/armiarma) with expose SSE server.

## Configuration

Sage requires a single `yaml` config file. An example file can be found [here](../example_sage.yaml)

| Name| Type | Default | Description |
| --- | --- | --- |--------------------------------------------------------------------------------------------------------------------------------------------|
| logging | string | `warn` | Log level (`panic`, `fatal`, `warn`, `info`, `debug`, `trace`) |
| metricsAddr | string | `:9090` | The address the metrics server will listen on |
| pprofAddr | string | | The address the [pprof](https://github.com/google/pprof) server will listen on. When ommited, the pprof server will not be started |
| name | string | | Unique name of the sage |
| labels | object | | A key value map of labels to append to every sage event |
| ethereum.beaconNodeAddress | string | | [Ethereum consensus client](https://ethereum.org/en/developers/docs/nodes-and-clients/#consensus-clients) http server endpoint |
| ethereum.beaconNodeAddress | object | | A key value map of headers |
| ethereum.overrideNetworkName | string | | Override the network name |
| armiarmaUrl | string | | The address of the Armiarma instance |
| workers | int | 1 | The count of workers processing attestations. Will result in more duplicate attestations if count is greater than 1 |
| duplicateAttestationThreshold | int | 3 | The amount of times to forward on the same attestation |
| outputs | array<object> | | List of outputs for the sage to send data to |
| outputs[].name | string | | Name of the output |
| outputs[].type | string | | Type of output (`xatu`, `http`, `kafka`, `stdout`) |
| outputs[].config | object | | Output type configuration [`xatu`](#output-xatu-configuration)/[`http`](#output-http-configuration)/[`kafka`](#output-kafka-configuration) |

### Output `xatu` configuration

Output configuration to send sage events to a [Xatu server](./server.md).

| Name| Type | Default | Description |
| --- | --- | --- | --- |
| outputs[].config.address | string | | The address of the server receiving events |
| outputs[].config.tls | bool | | Server requires TLS |
| outputs[].config.headers | object | | A key value map of headers to append to requests |
| outputs[].config.maxQueueSize | int | `51200` | The maximum queue size to buffer events for delayed processing. If the queue gets full it drops the events |
| outputs[].config.batchTimeout | string | `5s` | The maximum duration for constructing a batch. Processor forcefully sends available events when timeout is reached |
| outputs[].config.exportTimeout | string | `30s` | The maximum duration for exporting events. If the timeout is reached, the export will be cancelled |
| outputs[].config.maxExportBatchSize | int | `512` | MaxExportBatchSize is the maximum number of events to process in a single batch. If there are more than one batch worth of events then it processes multiple batches of events one batch after the other without any delay |

### Output `http` configuration

Output configuration to send sage events to a http server.

| Name| Type | Default | Description |
| --- | --- | --- | --- |
| outputs[].config.address | string | | The address of the server receiving events |
| outputs[].config.headers | object | | A key value map of headers to append to requests |
| outputs[].config.maxQueueSize | int | `51200` | The maximum queue size to buffer events for delayed processing. If the queue gets full it drops the events |
| outputs[].config.batchTimeout | string | `5s` | The maximum duration for constructing a batch. Processor forcefully sends available events when timeout is reached |
| outputs[].config.exportTimeout | string | `30s` | The maximum duration for exporting events. If the timeout is reached, the export will be cancelled |
| outputs[].config.maxExportBatchSize | int | `512` | MaxExportBatchSize is the maximum number of events to process in a single batch. If there are more than one batch worth of events then it processes multiple batches of events one batch after the other without any delay |

### Output `kafka` configuration

Output configuration to send sentry events to a kafka server.

| Name | Type | Default | Allowed Values | Description |
| ------------------------------- | ------ |-----------|-------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------|
| outputs[].config.brokers | string | | | Comma delimited list of brokers. Eg: `localhost:19091,localhost:19092` |
| outputs[].config.topic | string | | | Name of the topic. |
| outputs[].config.flushFrequency | string | `3s` | | The maximum time a single batch can wait before a flush. Producer flushes the batch when the limit is reached. |
| outputs[].config.flushMessages | int | `500` | | The maximum number of events in a single batch before a flush. Producer flushes the batch when the limit is reached. |
| outputs[].config.flushBytes | int | `1000000` | | The maximum size (in bytes) of a single batch before a flush. Producer flushes the batch when the limit is reached. |
| outputs[].config.maxRetries | int | `3` | | The maximum retries allowed for a single batch delivery. The batch would be dropped, if the producer fails to flush with-in this limit. |
| outputs[].config.compression | string | `none` | `none` `gzip` `snappy` `lz4` `zstd` | Compression to use. |
| outputs[].config.requiredAcks | string | `leader` | `none` `leader` `all` | Number of ack's required for a succesful batch delivery. |
| outputs[].config.partitioning | string | `none` | `none` `random` | Paritioning to use for the distribution of messages across the partitions. |

### Simple example

```yaml
name: xatu-sage

armiarmaUrl: http://localhost:9099/events

ethereum:
beaconNodeAddress: http://localhost:5052

outputs:
- name: standard-out
type: stdout
```
### Xatu server output example
```yaml
name: xatu-sage

armiarmaUrl: http://localhost:9099/events

ethereum:
beaconNodeAddress: http://localhost:5052

outputs:
- name: xatu-output
type: xatu
config:
address: localhost:8080
```
### http server output example
```yaml
name: xatu-sage

armiarmaUrl: http://localhost:9099/events

ethereum:
beaconNodeAddress: http://localhost:5052

outputs:
- name: http-basic-auth
type: http
config:
address: http://localhost:8080
headers:
authorization: "Basic Someb64Value"
```
### kafka server output example
```yaml
name: example-instance-004

armiarmaUrl: http://localhost:9099/events

ethereum:
beaconNodeAddress: http://localhost:5052

outputs:
- name: kafka-sink
type: kafka
config:
brokers: localhost:19092
topic: events
```
### Complex example with multiple outputs example
```yaml
logging: "debug"
metricsAddr: ":9090"
pprofAddr: ":6060"

name: xatu-sage

labels:
ethpandaops: rocks

ntpServer: time.google.com

armiarmaUrl: http://localhost:9099/events

ethereum:
beaconNodeAddress: http://localhost:5052

outputs:
- name: log
type: stdout
- name: xatu-server
type: xatu
config:
address: localhost:8080
headers:
authorization: Someb64Value
maxQueueSize: 51200
batchTimeout: 5s
exportTimeout: 30s
maxExportBatchSize: 512
- name: kafka-sink
type: kafka
config:
brokers: localhost:19092
topic: events
```
## Running locally
```bash
# docker
docker run -d --name xatu-sage -v $HOST_DIR_CHANGE_ME/config.yaml:/opt/xatu/config.yaml -p 9090:9090 -it ethpandaops/xatu:latest sage --config /opt/xatu/config.yaml
# build
go build -o dist/xatu main.go
./dist/xatu sage --config sage.yaml
# dev
go run main.go sage --config sage.yaml
```
Loading

0 comments on commit 17faf5b

Please sign in to comment.