Skip to content

Commit

Permalink
feat(output/kafka-sink) (#239)
Browse files Browse the repository at this point in the history
* feat(output/kafka-sink): Update kafka sink dependencies.

* feat(output/kafka-sink): Add Kafka Sink.

* feat(output/kafka-sink): Enable Kafka Sink.

* feat(output/kafka-sink):
* fix typos.
* Use flushBytes as a reference to check against msgByteSize.

* feat(output/kafka-sink):
* fix topic validation.

* feat(output/kafka-sink): fix lint

* feat(output/kafka): Update docs & example configs.

* feat(output/kafka): fix config.

---------

Co-authored-by: 0x-dx <145253945+0x-dx@users.noreply.github.com>
  • Loading branch information
00x-dx and 00x-dx committed Oct 25, 2023
1 parent c166b96 commit a943c15
Show file tree
Hide file tree
Showing 15 changed files with 625 additions and 74 deletions.
108 changes: 72 additions & 36 deletions docs/cannon.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,42 +39,42 @@ Flags:

Cannon requires a single `yaml` config file. An example file can be found [here](../example_cannon.yaml)

| Name| Type | Default | Description |
| --- | --- | --- | --- |
| logging | string | `warn` | Log level (`panic`, `fatal`, `warn`, `info`, `debug`, `trace`) |
| metricsAddr | string | `:9090` | The address the metrics server will listen on |
| pprofAddr | string | | The address the [pprof](https://github.com/google/pprof) server will listen on. When ommited, the pprof server will not be started |
| name | string | | Unique name of the cannon |
| labels | object | | A key value map of labels to append to every cannon event |
| ethereum.beaconNodeAddress | string | | [Ethereum consensus client](https://ethereum.org/en/developers/docs/nodes-and-clients/#consensus-clients) http server endpoint |
| ethereum.beaconNodeAddress | object | | A key value map of headers |
| ethereum.overrideNetworkName | string | | Override the network name |
| ethereum.blockCacheSize | int | `1000` | The maximum number of blocks to cache |
| ethereum.blockCacheTtl | string | `1h` | The maximum duration to cache blocks |
| ethereum.blockPreloadWorkers | int | `5` | The number of workers to use for preloading blocks |
| ethereum.blockPreloadQueueSize | int | `5000` | The maximum number of blocks to queue for preloading |
| coordinator.address | string | | The address of the [Xatu server](./server.md) |
| coordinator.tls | bool | | Server requires TLS |
| coordinator.headers | object | | A key value map of headers to append to requests |
| derivers.attesterSlashing.enabled | bool | `true` | Enable the attester slashing deriver |
| derivers.attesterSlashing.headSlotLag | int | `5` | The number of slots to lag behind the head |
| derivers.blsToExecutionChange.enabled | bool | `true` | Enable the BLS to execution change deriver |
| derivers.blsToExecutionChange.headSlotLag | int | `5` | The number of slots to lag behind the head |
| derivers.deposit.enabled | bool | `true` | Enable the deposit deriver |
| derivers.deposit.headSlotLag | int | `5` | The number of slots to lag behind the head |
| derivers.withdrawal.enabled | bool | `true` | Enable the withdrawal deriver |
| derivers.withdrawal.headSlotLag | int | `5` | The number of slots to lag behind the head |
| derivers.executionTransaction.enabled | bool | `true` | Enable the execution transaction deriver |
| derivers.executionTransaction.headSlotLag | int | `5` | The number of slots to lag behind the head |
| derivers.proposerSlashing.enabled | bool | `true` | Enable the proposer slashing deriver |
| derivers.proposerSlashing.headSlotLag | int | `5` | The number of slots to lag behind the head |
| derivers.voluntaryExit.enabled | bool | `true` | Enable the voluntary exit deriver |
| derivers.voluntaryExit.headSlotLag | int | `5` | The number of slots to lag behind the head |
| ntpServer | string | `pool.ntp.org` | NTP server to calculate clock drift for events |
| outputs | array<object> | | List of outputs for the cannon to send data to |
| outputs[].name | string | | Name of the output |
| outputs[].type | string | | Type of output (`xatu`, `http`, `stdout`) |
| outputs[].config | object | | Output type configuration [`xatu`](#output-xatu-configuration)/[`http`](#output-http-configuration) |
| Name| Type | Default | Description |
| --- | --- | --- |--------------------------------------------------------------------------------------------------------------------------------------------|
| logging | string | `warn` | Log level (`panic`, `fatal`, `warn`, `info`, `debug`, `trace`) |
| metricsAddr | string | `:9090` | The address the metrics server will listen on |
| pprofAddr | string | | The address the [pprof](https://github.com/google/pprof) server will listen on. When ommited, the pprof server will not be started |
| name | string | | Unique name of the cannon |
| labels | object | | A key value map of labels to append to every cannon event |
| ethereum.beaconNodeAddress | string | | [Ethereum consensus client](https://ethereum.org/en/developers/docs/nodes-and-clients/#consensus-clients) http server endpoint |
| ethereum.beaconNodeAddress | object | | A key value map of headers |
| ethereum.overrideNetworkName | string | | Override the network name |
| ethereum.blockCacheSize | int | `1000` | The maximum number of blocks to cache |
| ethereum.blockCacheTtl | string | `1h` | The maximum duration to cache blocks |
| ethereum.blockPreloadWorkers | int | `5` | The number of workers to use for preloading blocks |
| ethereum.blockPreloadQueueSize | int | `5000` | The maximum number of blocks to queue for preloading |
| coordinator.address | string | | The address of the [Xatu server](./server.md) |
| coordinator.tls | bool | | Server requires TLS |
| coordinator.headers | object | | A key value map of headers to append to requests |
| derivers.attesterSlashing.enabled | bool | `true` | Enable the attester slashing deriver |
| derivers.attesterSlashing.headSlotLag | int | `5` | The number of slots to lag behind the head |
| derivers.blsToExecutionChange.enabled | bool | `true` | Enable the BLS to execution change deriver |
| derivers.blsToExecutionChange.headSlotLag | int | `5` | The number of slots to lag behind the head |
| derivers.deposit.enabled | bool | `true` | Enable the deposit deriver |
| derivers.deposit.headSlotLag | int | `5` | The number of slots to lag behind the head |
| derivers.withdrawal.enabled | bool | `true` | Enable the withdrawal deriver |
| derivers.withdrawal.headSlotLag | int | `5` | The number of slots to lag behind the head |
| derivers.executionTransaction.enabled | bool | `true` | Enable the execution transaction deriver |
| derivers.executionTransaction.headSlotLag | int | `5` | The number of slots to lag behind the head |
| derivers.proposerSlashing.enabled | bool | `true` | Enable the proposer slashing deriver |
| derivers.proposerSlashing.headSlotLag | int | `5` | The number of slots to lag behind the head |
| derivers.voluntaryExit.enabled | bool | `true` | Enable the voluntary exit deriver |
| derivers.voluntaryExit.headSlotLag | int | `5` | The number of slots to lag behind the head |
| ntpServer | string | `pool.ntp.org` | NTP server to calculate clock drift for events |
| outputs | array<object> | | List of outputs for the cannon to send data to |
| outputs[].name | string | | Name of the output |
| outputs[].type | string | | Type of output (`xatu`, `http`, `kafka`, `stdout`) |
| outputs[].config | object | | Output type configuration [`xatu`](#output-xatu-configuration)/[`http`](#output-http-configuration)/[`kafka`](#output-kafka-configuration) |

### Output `xatu` configuration

Expand Down Expand Up @@ -103,6 +103,22 @@ Output configuration to send cannon events to a http server.
| outputs[].config.exportTimeout | string | `30s` | The maximum duration for exporting events. If the timeout is reached, the export will be cancelled |
| outputs[].config.maxExportBatchSize | int | `512` | MaxExportBatchSize is the maximum number of events to process in a single batch. If there are more than one batch worth of events then it processes multiple batches of events one batch after the other without any delay |

### Output `kafka` configuration

Output configuration to send sentry events to a kafka server.

| Name | Type | Default | Allowed Values | Description |
| ------------------------------- | ------ |-----------|-------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------|
| outputs[].config.brokers | string | | | Comma delimited list of brokers. Eg: `localhost:19091,localhost:19092` |
| outputs[].config.topic | string | | | Name of the topic. |
| outputs[].config.flushFrequency | string | `3s` | | The maximum time a single batch can wait before a flush. Producer flushes the batch when the limit is reached. |
| outputs[].config.flushMessages | int | `500` | | The maximum number of events in a single batch before a flush. Producer flushes the batch when the limit is reached. |
| outputs[].config.flushBytes | int | `1000000` | | The maximum size (in bytes) of a single batch before a flush. Producer flushes the batch when the limit is reached. |
| outputs[].config.maxRetries | int | `3` | | The maximum retries allowed for a single batch delivery. The batch would be dropped, if the producer fails to flush with-in this limit. |
| outputs[].config.compression | string | `none` | `none` `gzip` `snappy` `lz4` `zstd` | Compression to use. |
| outputs[].config.requiredAcks | string | `leader` | `none` `leader` `all` | Number of ack's required for a succesful batch delivery. |
| outputs[].config.partitioning | string | `none` | `none` `random` | Paritioning to use for the distribution of messages across the partitions. |

### Simple example

```yaml
Expand Down Expand Up @@ -157,6 +173,21 @@ outputs:
authorization: "Basic Someb64Value"
```
### kafka server output example
```yaml
name: example-instance-004

ethereum:
beaconNodeAddress: http://localhost:5052

outputs:
- name: kafka-sink
type: kafka
config:
brokers: localhost:19092
topic: events
```
### Complex example with multiple outputs example
```yaml
Expand Down Expand Up @@ -190,6 +221,11 @@ outputs:
batchTimeout: 5s
exportTimeout: 30s
maxExportBatchSize: 512
- name: kafka-sink
type: kafka
config:
brokers: localhost:19092
topic: events
```
## Running locally
Expand Down
Loading

0 comments on commit a943c15

Please sign in to comment.