Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(output/kafka-sink) #239

Merged
merged 10 commits into from
Oct 25, 2023
Merged
108 changes: 72 additions & 36 deletions docs/cannon.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,42 +39,42 @@ Flags:

Cannon requires a single `yaml` config file. An example file can be found [here](../example_cannon.yaml)

| Name| Type | Default | Description |
| --- | --- | --- | --- |
| logging | string | `warn` | Log level (`panic`, `fatal`, `warn`, `info`, `debug`, `trace`) |
| metricsAddr | string | `:9090` | The address the metrics server will listen on |
| pprofAddr | string | | The address the [pprof](https://github.com/google/pprof) server will listen on. When ommited, the pprof server will not be started |
| name | string | | Unique name of the cannon |
| labels | object | | A key value map of labels to append to every cannon event |
| ethereum.beaconNodeAddress | string | | [Ethereum consensus client](https://ethereum.org/en/developers/docs/nodes-and-clients/#consensus-clients) http server endpoint |
| ethereum.beaconNodeAddress | object | | A key value map of headers |
| ethereum.overrideNetworkName | string | | Override the network name |
| ethereum.blockCacheSize | int | `1000` | The maximum number of blocks to cache |
| ethereum.blockCacheTtl | string | `1h` | The maximum duration to cache blocks |
| ethereum.blockPreloadWorkers | int | `5` | The number of workers to use for preloading blocks |
| ethereum.blockPreloadQueueSize | int | `5000` | The maximum number of blocks to queue for preloading |
| coordinator.address | string | | The address of the [Xatu server](./server.md) |
| coordinator.tls | bool | | Server requires TLS |
| coordinator.headers | object | | A key value map of headers to append to requests |
| derivers.attesterSlashing.enabled | bool | `true` | Enable the attester slashing deriver |
| derivers.attesterSlashing.headSlotLag | int | `5` | The number of slots to lag behind the head |
| derivers.blsToExecutionChange.enabled | bool | `true` | Enable the BLS to execution change deriver |
| derivers.blsToExecutionChange.headSlotLag | int | `5` | The number of slots to lag behind the head |
| derivers.deposit.enabled | bool | `true` | Enable the deposit deriver |
| derivers.deposit.headSlotLag | int | `5` | The number of slots to lag behind the head |
| derivers.withdrawal.enabled | bool | `true` | Enable the withdrawal deriver |
| derivers.withdrawal.headSlotLag | int | `5` | The number of slots to lag behind the head |
| derivers.executionTransaction.enabled | bool | `true` | Enable the execution transaction deriver |
| derivers.executionTransaction.headSlotLag | int | `5` | The number of slots to lag behind the head |
| derivers.proposerSlashing.enabled | bool | `true` | Enable the proposer slashing deriver |
| derivers.proposerSlashing.headSlotLag | int | `5` | The number of slots to lag behind the head |
| derivers.voluntaryExit.enabled | bool | `true` | Enable the voluntary exit deriver |
| derivers.voluntaryExit.headSlotLag | int | `5` | The number of slots to lag behind the head |
| ntpServer | string | `pool.ntp.org` | NTP server to calculate clock drift for events |
| outputs | array<object> | | List of outputs for the cannon to send data to |
| outputs[].name | string | | Name of the output |
| outputs[].type | string | | Type of output (`xatu`, `http`, `stdout`) |
| outputs[].config | object | | Output type configuration [`xatu`](#output-xatu-configuration)/[`http`](#output-http-configuration) |
| Name| Type | Default | Description |
| --- | --- | --- |--------------------------------------------------------------------------------------------------------------------------------------------|
| logging | string | `warn` | Log level (`panic`, `fatal`, `warn`, `info`, `debug`, `trace`) |
| metricsAddr | string | `:9090` | The address the metrics server will listen on |
| pprofAddr | string | | The address the [pprof](https://github.com/google/pprof) server will listen on. When ommited, the pprof server will not be started |
| name | string | | Unique name of the cannon |
| labels | object | | A key value map of labels to append to every cannon event |
| ethereum.beaconNodeAddress | string | | [Ethereum consensus client](https://ethereum.org/en/developers/docs/nodes-and-clients/#consensus-clients) http server endpoint |
| ethereum.beaconNodeAddress | object | | A key value map of headers |
| ethereum.overrideNetworkName | string | | Override the network name |
| ethereum.blockCacheSize | int | `1000` | The maximum number of blocks to cache |
| ethereum.blockCacheTtl | string | `1h` | The maximum duration to cache blocks |
| ethereum.blockPreloadWorkers | int | `5` | The number of workers to use for preloading blocks |
| ethereum.blockPreloadQueueSize | int | `5000` | The maximum number of blocks to queue for preloading |
| coordinator.address | string | | The address of the [Xatu server](./server.md) |
| coordinator.tls | bool | | Server requires TLS |
| coordinator.headers | object | | A key value map of headers to append to requests |
| derivers.attesterSlashing.enabled | bool | `true` | Enable the attester slashing deriver |
| derivers.attesterSlashing.headSlotLag | int | `5` | The number of slots to lag behind the head |
| derivers.blsToExecutionChange.enabled | bool | `true` | Enable the BLS to execution change deriver |
| derivers.blsToExecutionChange.headSlotLag | int | `5` | The number of slots to lag behind the head |
| derivers.deposit.enabled | bool | `true` | Enable the deposit deriver |
| derivers.deposit.headSlotLag | int | `5` | The number of slots to lag behind the head |
| derivers.withdrawal.enabled | bool | `true` | Enable the withdrawal deriver |
| derivers.withdrawal.headSlotLag | int | `5` | The number of slots to lag behind the head |
| derivers.executionTransaction.enabled | bool | `true` | Enable the execution transaction deriver |
| derivers.executionTransaction.headSlotLag | int | `5` | The number of slots to lag behind the head |
| derivers.proposerSlashing.enabled | bool | `true` | Enable the proposer slashing deriver |
| derivers.proposerSlashing.headSlotLag | int | `5` | The number of slots to lag behind the head |
| derivers.voluntaryExit.enabled | bool | `true` | Enable the voluntary exit deriver |
| derivers.voluntaryExit.headSlotLag | int | `5` | The number of slots to lag behind the head |
| ntpServer | string | `pool.ntp.org` | NTP server to calculate clock drift for events |
| outputs | array<object> | | List of outputs for the cannon to send data to |
| outputs[].name | string | | Name of the output |
| outputs[].type | string | | Type of output (`xatu`, `http`, `kafka`, `stdout`) |
| outputs[].config | object | | Output type configuration [`xatu`](#output-xatu-configuration)/[`http`](#output-http-configuration)/[`kafka`](#output-kafka-configuration) |

### Output `xatu` configuration

Expand Down Expand Up @@ -103,6 +103,22 @@ Output configuration to send cannon events to a http server.
| outputs[].config.exportTimeout | string | `30s` | The maximum duration for exporting events. If the timeout is reached, the export will be cancelled |
| outputs[].config.maxExportBatchSize | int | `512` | MaxExportBatchSize is the maximum number of events to process in a single batch. If there are more than one batch worth of events then it processes multiple batches of events one batch after the other without any delay |

### Output `kafka` configuration

Output configuration to send sentry events to a kafka server.

| Name | Type | Default | Allowed Values | Description |
| ------------------------------- | ------ |-----------|-------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------|
| outputs[].config.brokers | string | | | Comma delimited list of brokers. Eg: `localhost:19091,localhost:19092` |
| outputs[].config.topic | string | | | Name of the topic. |
| outputs[].config.flushFrequency | string | `3s` | | The maximum time a single batch can wait before a flush. Producer flushes the batch when the limit is reached. |
| outputs[].config.flushMessages | int | `500` | | The maximum number of events in a single batch before a flush. Producer flushes the batch when the limit is reached. |
| outputs[].config.flushBytes | int | `1000000` | | The maximum size (in bytes) of a single batch before a flush. Producer flushes the batch when the limit is reached. |
| outputs[].config.maxRetries | int | `3` | | The maximum retries allowed for a single batch delivery. The batch would be dropped, if the producer fails to flush with-in this limit. |
| outputs[].config.compression | string | `none` | `none` `gzip` `snappy` `lz4` `zstd` | Compression to use. |
| outputs[].config.requiredAcks | string | `leader` | `none` `leader` `all` | Number of ack's required for a succesful batch delivery. |
| outputs[].config.partitioning | string | `none` | `none` `random` | Paritioning to use for the distribution of messages across the partitions. |

### Simple example

```yaml
Expand Down Expand Up @@ -157,6 +173,21 @@ outputs:
authorization: "Basic Someb64Value"
```

### kafka server output example

```yaml
name: example-instance-004

ethereum:
beaconNodeAddress: http://localhost:5052

outputs:
- name: kafka-sink
type: kafka
config:
brokers: localhost:19092
topic: events
```
### Complex example with multiple outputs example

```yaml
Expand Down Expand Up @@ -190,6 +221,11 @@ outputs:
batchTimeout: 5s
exportTimeout: 30s
maxExportBatchSize: 512
- name: kafka-sink
type: kafka
config:
brokers: localhost:19092
topic: events
```

## Running locally
Expand Down
Loading
Loading