From 5718e0db75327d8de02fa99407ff772e69a8d4bd Mon Sep 17 00:00:00 2001 From: 0x-dx <145253945+0x-dx@users.noreply.github.com> Date: Thu, 19 Oct 2023 01:33:46 +0800 Subject: [PATCH] feat(output/kafka): Update docs & example configs. --- docs/cannon.md | 108 ++++++++++++++++++++++++++++--------------- docs/mimicry.md | 67 +++++++++++++++++++++------ docs/sentry.md | 65 ++++++++++++++++++++------ docs/server.md | 5 ++ example_cannon.yaml | 12 +++++ example_mimicry.yaml | 12 +++++ example_sentry.yaml | 12 +++++ 7 files changed, 216 insertions(+), 65 deletions(-) diff --git a/docs/cannon.md b/docs/cannon.md index 1618868b..c6452b8d 100644 --- a/docs/cannon.md +++ b/docs/cannon.md @@ -39,42 +39,42 @@ Flags: Cannon requires a single `yaml` config file. An example file can be found [here](../example_cannon.yaml) -| Name| Type | Default | Description | -| --- | --- | --- | --- | -| logging | string | `warn` | Log level (`panic`, `fatal`, `warn`, `info`, `debug`, `trace`) | -| metricsAddr | string | `:9090` | The address the metrics server will listen on | -| pprofAddr | string | | The address the [pprof](https://github.com/google/pprof) server will listen on. When ommited, the pprof server will not be started | -| name | string | | Unique name of the cannon | -| labels | object | | A key value map of labels to append to every cannon event | -| ethereum.beaconNodeAddress | string | | [Ethereum consensus client](https://ethereum.org/en/developers/docs/nodes-and-clients/#consensus-clients) http server endpoint | -| ethereum.beaconNodeAddress | object | | A key value map of headers | -| ethereum.overrideNetworkName | string | | Override the network name | -| ethereum.blockCacheSize | int | `1000` | The maximum number of blocks to cache | -| ethereum.blockCacheTtl | string | `1h` | The maximum duration to cache blocks | -| ethereum.blockPreloadWorkers | int | `5` | The number of workers to use for preloading blocks | -| ethereum.blockPreloadQueueSize | int | `5000` | The maximum number of blocks to queue for preloading | -| coordinator.address | string | | The address of the [Xatu server](./server.md) | -| coordinator.tls | bool | | Server requires TLS | -| coordinator.headers | object | | A key value map of headers to append to requests | -| derivers.attesterSlashing.enabled | bool | `true` | Enable the attester slashing deriver | -| derivers.attesterSlashing.headSlotLag | int | `5` | The number of slots to lag behind the head | -| derivers.blsToExecutionChange.enabled | bool | `true` | Enable the BLS to execution change deriver | -| derivers.blsToExecutionChange.headSlotLag | int | `5` | The number of slots to lag behind the head | -| derivers.deposit.enabled | bool | `true` | Enable the deposit deriver | -| derivers.deposit.headSlotLag | int | `5` | The number of slots to lag behind the head | -| derivers.withdrawal.enabled | bool | `true` | Enable the withdrawal deriver | -| derivers.withdrawal.headSlotLag | int | `5` | The number of slots to lag behind the head | -| derivers.executionTransaction.enabled | bool | `true` | Enable the execution transaction deriver | -| derivers.executionTransaction.headSlotLag | int | `5` | The number of slots to lag behind the head | -| derivers.proposerSlashing.enabled | bool | `true` | Enable the proposer slashing deriver | -| derivers.proposerSlashing.headSlotLag | int | `5` | The number of slots to lag behind the head | -| derivers.voluntaryExit.enabled | bool | `true` | Enable the voluntary exit deriver | -| derivers.voluntaryExit.headSlotLag | int | `5` | The number of slots to lag behind the head | -| ntpServer | string | `pool.ntp.org` | NTP server to calculate clock drift for events | -| outputs | array | | List of outputs for the cannon to send data to | -| outputs[].name | string | | Name of the output | -| outputs[].type | string | | Type of output (`xatu`, `http`, `stdout`) | -| outputs[].config | object | | Output type configuration [`xatu`](#output-xatu-configuration)/[`http`](#output-http-configuration) | +| Name| Type | Default | Description | +| --- | --- | --- |--------------------------------------------------------------------------------------------------------------------------------------------| +| logging | string | `warn` | Log level (`panic`, `fatal`, `warn`, `info`, `debug`, `trace`) | +| metricsAddr | string | `:9090` | The address the metrics server will listen on | +| pprofAddr | string | | The address the [pprof](https://github.com/google/pprof) server will listen on. When ommited, the pprof server will not be started | +| name | string | | Unique name of the cannon | +| labels | object | | A key value map of labels to append to every cannon event | +| ethereum.beaconNodeAddress | string | | [Ethereum consensus client](https://ethereum.org/en/developers/docs/nodes-and-clients/#consensus-clients) http server endpoint | +| ethereum.beaconNodeAddress | object | | A key value map of headers | +| ethereum.overrideNetworkName | string | | Override the network name | +| ethereum.blockCacheSize | int | `1000` | The maximum number of blocks to cache | +| ethereum.blockCacheTtl | string | `1h` | The maximum duration to cache blocks | +| ethereum.blockPreloadWorkers | int | `5` | The number of workers to use for preloading blocks | +| ethereum.blockPreloadQueueSize | int | `5000` | The maximum number of blocks to queue for preloading | +| coordinator.address | string | | The address of the [Xatu server](./server.md) | +| coordinator.tls | bool | | Server requires TLS | +| coordinator.headers | object | | A key value map of headers to append to requests | +| derivers.attesterSlashing.enabled | bool | `true` | Enable the attester slashing deriver | +| derivers.attesterSlashing.headSlotLag | int | `5` | The number of slots to lag behind the head | +| derivers.blsToExecutionChange.enabled | bool | `true` | Enable the BLS to execution change deriver | +| derivers.blsToExecutionChange.headSlotLag | int | `5` | The number of slots to lag behind the head | +| derivers.deposit.enabled | bool | `true` | Enable the deposit deriver | +| derivers.deposit.headSlotLag | int | `5` | The number of slots to lag behind the head | +| derivers.withdrawal.enabled | bool | `true` | Enable the withdrawal deriver | +| derivers.withdrawal.headSlotLag | int | `5` | The number of slots to lag behind the head | +| derivers.executionTransaction.enabled | bool | `true` | Enable the execution transaction deriver | +| derivers.executionTransaction.headSlotLag | int | `5` | The number of slots to lag behind the head | +| derivers.proposerSlashing.enabled | bool | `true` | Enable the proposer slashing deriver | +| derivers.proposerSlashing.headSlotLag | int | `5` | The number of slots to lag behind the head | +| derivers.voluntaryExit.enabled | bool | `true` | Enable the voluntary exit deriver | +| derivers.voluntaryExit.headSlotLag | int | `5` | The number of slots to lag behind the head | +| ntpServer | string | `pool.ntp.org` | NTP server to calculate clock drift for events | +| outputs | array | | List of outputs for the cannon to send data to | +| outputs[].name | string | | Name of the output | +| outputs[].type | string | | Type of output (`xatu`, `http`, `kafka`, `stdout`) | +| outputs[].config | object | | Output type configuration [`xatu`](#output-xatu-configuration)/[`http`](#output-http-configuration)/[`kafka`](#output-kafka-configuration) | ### Output `xatu` configuration @@ -103,6 +103,22 @@ Output configuration to send cannon events to a http server. | outputs[].config.exportTimeout | string | `30s` | The maximum duration for exporting events. If the timeout is reached, the export will be cancelled | | outputs[].config.maxExportBatchSize | int | `512` | MaxExportBatchSize is the maximum number of events to process in a single batch. If there are more than one batch worth of events then it processes multiple batches of events one batch after the other without any delay | +### Output `kafka` configuration + +Output configuration to send sentry events to a kafka server. + +| Name | Type | Default | Allowed Values | Description | +| ------------------------------- | ------ |-----------|-------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------| +| outputs[].config.brokers | string | | | Comma delimited list of brokers. Eg: `localhost:19091,localhost:19092` | +| outputs[].config.topic | string | | | Name of the topic. | +| outputs[].config.flushFrequency | string | `3s` | | The maximum time a single batch can wait before a flush. Producer flushes the batch when the limit is reached. | +| outputs[].config.flushMessages | int | `500` | | The maximum number of events in a single batch before a flush. Producer flushes the batch when the limit is reached. | +| outputs[].config.flushBytes | int | `1000000` | | The maximum size (in bytes) of a single batch before a flush. Producer flushes the batch when the limit is reached. | +| outputs[].config.maxRetries | int | `3` | | The maximum retries allowed for a single batch delivery. The batch would be dropped, if the producer fails to flush with-in this limit. | +| outputs[].config.compression | string | `none` | `none` `gzip` `snappy` `lz4` `zstd` | Compression to use. | +| outputs[].config.requiredAcks | string | `leader` | `none` `leader` `all` | Number of ack's required for a succesful batch delivery. | +| outputs[].config.partitioning | string | `none` | `none` `random` | Paritioning to use for the distribution of messages across the partitions. | + ### Simple example ```yaml @@ -157,6 +173,21 @@ outputs: authorization: "Basic Someb64Value" ``` +### kafka server output example + +```yaml +name: example-instance-004 + +ethereum: + beaconNodeAddress: http://localhost:5052 + +outputs: +- name: kafka-sink + type: kafka + config: + brokers: localhost:19092 + topic: events +``` ### Complex example with multiple outputs example ```yaml @@ -190,6 +221,11 @@ outputs: batchTimeout: 5s exportTimeout: 30s maxExportBatchSize: 512 +- name: kafka-sink + type: kafka + config: + brokers: localhost:19092 + topic: events ``` ## Running locally diff --git a/docs/mimicry.md b/docs/mimicry.md index 8e6d52ac..1d0b699f 100644 --- a/docs/mimicry.md +++ b/docs/mimicry.md @@ -36,22 +36,22 @@ Flags: Mimicry requires a single `yaml` config file. An example file can be found [here](../example_mimicry.yaml) -| Name| Type | Default | Description | -| --- | --- | --- | --- | -| logging | string | `warn` | Log level (`panic`, `fatal`, `warn`, `info`, `debug`, `trace`) | -| metricsAddr | string | `:9090` | The address the metrics server will listen on | +| Name| Type | Default | Description | +| --- | --- | --- |------------------------------------------------------------------------------------------------------------------------------------| +| logging | string | `warn` | Log level (`panic`, `fatal`, `warn`, `info`, `debug`, `trace`) | +| metricsAddr | string | `:9090` | The address the metrics server will listen on | | pprofAddr | string | | The address the [pprof](https://github.com/google/pprof) server will listen on. When ommited, the pprof server will not be started | -| probeAddr | string | | The address for health probes. When ommited, the probe server will not be started | -| name | string | | Unique name of the mimicry | -| labels | object | | A key value map of labels to append to every mimicry event | -| ntpServer | string | `pool.ntp.org` | NTP server to calculate clock drift for events | -| captureDelay | string | `3m` | Delay before starting to capture transactions | -| coordinator.type | string | | Type of output (`xatu`, `static`) | -| coordinator.config | object | | Coordinator type configuration [`xatu`](#coordinator-xatu-configuration)/[`static`](#coordinator-static-configuration) | -| outputs | array | | List of outputs for the mimicry to send data to | -| outputs[].name | string | | Name of the output | -| outputs[].type | string | | Type of output (`xatu`, `http`, `stdout`) | -| outputs[].config | object | | Output type configuration [`xatu`](#output-xatu-configuration)/[`http`](#output-http-configuration) | +| probeAddr | string | | The address for health probes. When ommited, the probe server will not be started | +| name | string | | Unique name of the mimicry | +| labels | object | | A key value map of labels to append to every mimicry event | +| ntpServer | string | `pool.ntp.org` | NTP server to calculate clock drift for events | +| captureDelay | string | `3m` | Delay before starting to capture transactions | +| coordinator.type | string | | Type of output (`xatu`, `static`) | +| coordinator.config | object | | Coordinator type configuration [`xatu`](#coordinator-xatu-configuration)/[`static`](#coordinator-static-configuration) | +| outputs | array | | List of outputs for the mimicry to send data to | +| outputs[].name | string | | Name of the output | +| outputs[].type | string | | Type of output (`xatu`, `http`, `kafka`, `stdout`) | +| outputs[].config | object | | Output type configuration [`xatu`](#output-xatu-configuration)/[`http`](#output-http-configuration)/[`kafka`](#output-kafka-configuration) | ### Coordinator `xatu` configuration @@ -106,6 +106,22 @@ Output configuration to send mimicry events to a http server. | outputs[].config.exportTimeout | string | `30s` | The maximum duration for exporting events. If the timeout is reached, the export will be cancelled | | outputs[].config.maxExportBatchSize | int | `512` | MaxExportBatchSize is the maximum number of events to process in a single batch. If there are more than one batch worth of events then it processes multiple batches of events one batch after the other without any delay | +### Output `kafka` configuration + +Output configuration to send sentry events to a kafka server. + +| Name | Type | Default | Allowed Values | Description | +| ------------------------------- | ------ |-----------|-------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------| +| outputs[].config.brokers | string | | | Comma delimited list of brokers. Eg: `localhost:19091,localhost:19092` | +| outputs[].config.topic | string | | | Name of the topic. | +| outputs[].config.flushFrequency | string | `3s` | | The maximum time a single batch can wait before a flush. Producer flushes the batch when the limit is reached. | +| outputs[].config.flushMessages | int | `500` | | The maximum number of events in a single batch before a flush. Producer flushes the batch when the limit is reached. | +| outputs[].config.flushBytes | int | `1000000` | | The maximum size (in bytes) of a single batch before a flush. Producer flushes the batch when the limit is reached. | +| outputs[].config.maxRetries | int | `3` | | The maximum retries allowed for a single batch delivery. The batch would be dropped, if the producer fails to flush with-in this limit. | +| outputs[].config.compression | string | `none` | `none` `gzip` `snappy` `lz4` `zstd` | Compression to use. | +| outputs[].config.requiredAcks | string | `leader` | `none` `leader` `all` | Number of ack's required for a succesful batch delivery. | +| outputs[].config.partitioning | string | `none` | `none` `random` | Paritioning to use for the distribution of messages across the partitions. | + ### Simple example ```yaml @@ -181,6 +197,22 @@ outputs: authorization: "Basic Someb64Value" ``` +### kafka server output example + +```yaml +name: example-instance-004 + +ethereum: + beaconNodeAddress: http://localhost:5052 + +outputs: +- name: kafka-sink + type: kafka + config: + brokers: localhost:19092 + topic: events +``` + ### Complex example with multiple outputs example ```yaml @@ -218,6 +250,11 @@ outputs: batchTimeout: 5s exportTimeout: 30s maxExportBatchSize: 512 +- name: kafka-sink + type: kafka + config: + brokers: localhost:19092 + topic: events ``` ## Running locally diff --git a/docs/sentry.md b/docs/sentry.md index 58aa6f67..30fab8db 100644 --- a/docs/sentry.md +++ b/docs/sentry.md @@ -39,20 +39,20 @@ Flags: Sentry requires a single `yaml` config file. An example file can be found [here](../example_sentry.yaml) -| Name| Type | Default | Description | -| --- | --- | --- | --- | -| logging | string | `warn` | Log level (`panic`, `fatal`, `warn`, `info`, `debug`, `trace`) | -| metricsAddr | string | `:9090` | The address the metrics server will listen on | -| pprofAddr | string | | The address the [pprof](https://github.com/google/pprof) server will listen on. When ommited, the pprof server will not be started | -| name | string | | Unique name of the sentry | -| labels | object | | A key value map of labels to append to every sentry event | -| ethereum.beaconNodeAddress | string | | [Ethereum consensus client](https://ethereum.org/en/developers/docs/nodes-and-clients/#consensus-clients) http server endpoint | -| ethereum.beaconSubscriptions | array | | List of [topcis](https://ethereum.github.io/beacon-APIs/#/Events/eventstream) to subscribe to. If empty, all subscriptions are subscribed to. -| ntpServer | string | `pool.ntp.org` | NTP server to calculate clock drift for events | -| outputs | array | | List of outputs for the sentry to send data to | -| outputs[].name | string | | Name of the output | -| outputs[].type | string | | Type of output (`xatu`, `http`, `stdout`) | -| outputs[].config | object | | Output type configuration [`xatu`](#output-xatu-configuration)/[`http`](#output-http-configuration) | +| Name| Type | Default | Description | +| --- | --- | --- |-----------------------------------------------------------------------------------------------------------------------------------------------| +| logging | string | `warn` | Log level (`panic`, `fatal`, `warn`, `info`, `debug`, `trace`) | +| metricsAddr | string | `:9090` | The address the metrics server will listen on | +| pprofAddr | string | | The address the [pprof](https://github.com/google/pprof) server will listen on. When ommited, the pprof server will not be started | +| name | string | | Unique name of the sentry | +| labels | object | | A key value map of labels to append to every sentry event | +| ethereum.beaconNodeAddress | string | | [Ethereum consensus client](https://ethereum.org/en/developers/docs/nodes-and-clients/#consensus-clients) http server endpoint | +| ethereum.beaconSubscriptions | array | | List of [topcis](https://ethereum.github.io/beacon-APIs/#/Events/eventstream) to subscribe to. If empty, all subscriptions are subscribed to. +| ntpServer | string | `pool.ntp.org` | NTP server to calculate clock drift for events | +| outputs | array | | List of outputs for the sentry to send data to | +| outputs[].name | string | | Name of the output | +| outputs[].type | string | | Type of output (`xatu`, `http`, `kafka`, stdout`) | +| outputs[].config | object | | Output type configuration [`xatu`](#output-xatu-configuration)/[`http`](#output-http-configuration)/[`kafka`](#output-kafka-configuration) | ### Output `xatu` configuration @@ -81,6 +81,22 @@ Output configuration to send sentry events to a http server. | outputs[].config.exportTimeout | string | `30s` | The maximum duration for exporting events. If the timeout is reached, the export will be cancelled | | outputs[].config.maxExportBatchSize | int | `512` | MaxExportBatchSize is the maximum number of events to process in a single batch. If there are more than one batch worth of events then it processes multiple batches of events one batch after the other without any delay | +### Output `kafka` configuration + +Output configuration to send sentry events to a kafka server. + +| Name | Type | Default | Allowed Values | Description | +| ------------------------------- | ------ |-----------|-------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------| +| outputs[].config.brokers | string | | | Comma delimited list of brokers. Eg: `localhost:19091,localhost:19092` | +| outputs[].config.topic | string | | | Name of the topic. | +| outputs[].config.flushFrequency | string | `3s` | | The maximum time a single batch can wait before a flush. Producer flushes the batch when the limit is reached. | +| outputs[].config.flushMessages | int | `500` | | The maximum number of events in a single batch before a flush. Producer flushes the batch when the limit is reached. | +| outputs[].config.flushBytes | int | `1000000` | | The maximum size (in bytes) of a single batch before a flush. Producer flushes the batch when the limit is reached. | +| outputs[].config.maxRetries | int | `3` | | The maximum retries allowed for a single batch delivery. The batch would be dropped, if the producer fails to flush with-in this limit. | +| outputs[].config.compression | string | `none` | `none` `gzip` `snappy` `lz4` `zstd` | Compression to use. | +| outputs[].config.requiredAcks | string | `leader` | `none` `leader` `all` | Number of ack's required for a succesful batch delivery. | +| outputs[].config.partitioning | string | `none` | `none` `random` | Paritioning to use for the distribution of messages across the partitions. | + ### Simple example ```yaml @@ -126,6 +142,22 @@ outputs: authorization: "Basic Someb64Value" ``` +### kafka server output example + +```yaml +name: example-instance-004 + +ethereum: + beaconNodeAddress: http://localhost:5052 + +outputs: +- name: kafka-sink + type: kafka + config: + brokers: localhost:19092 + topic: events +``` + ### Complex example with multiple outputs example ```yaml @@ -156,6 +188,11 @@ outputs: batchTimeout: 5s exportTimeout: 30s maxExportBatchSize: 512 +- name: kafka-sink + type: kafka + config: + brokers: localhost:19092 + topic: events ``` ## Running locally diff --git a/docs/server.md b/docs/server.md index 3a47d89a..bddfa598 100644 --- a/docs/server.md +++ b/docs/server.md @@ -186,6 +186,11 @@ services: address: http://localhost:8081 headers: authorization: "Basic Someb64Value" + - name: kafka-sink + type: kafka + config: + brokers: localhost:19092 + topic: events ``` ## Services diff --git a/example_cannon.yaml b/example_cannon.yaml index 7a27fbe1..2b3c531c 100644 --- a/example_cannon.yaml +++ b/example_cannon.yaml @@ -76,3 +76,15 @@ outputs: batchTimeout: 5s exportTimeout: 30s maxExportBatchSize: 512 +- name: kafka-sink + type: kafka + config: + brokers: localhost:19092 + topic: events + flushFrequency: 1s + flushMessages: 500 + flushBytes: 1000000 + maxRetries: 6 + compression: snappy + requiredAcks: leader + partitioning: random diff --git a/example_mimicry.yaml b/example_mimicry.yaml index 37924ac6..8447c5dd 100644 --- a/example_mimicry.yaml +++ b/example_mimicry.yaml @@ -60,3 +60,15 @@ outputs: batchTimeout: 5s exportTimeout: 30s maxExportBatchSize: 512 +- name: kafka-sink + type: kafka + config: + brokers: localhost:19092 + topic: events + flushFrequency: 1s + flushMessages: 500 + flushBytes: 1000000 + maxRetries: 6 + compression: snappy + requiredAcks: leader + partitioning: random diff --git a/example_sentry.yaml b/example_sentry.yaml index efb0cc10..3be1ebfa 100644 --- a/example_sentry.yaml +++ b/example_sentry.yaml @@ -80,3 +80,15 @@ outputs: batchTimeout: 5s exportTimeout: 30s maxExportBatchSize: 512 +- name: kafka-sink + type: kafka + config: + brokers: localhost:19092 + topic: events + flushFrequency: 1s + flushMessages: 500 + flushBytes: 1000000 + maxRetries: 6 + compression: snappy + requiredAcks: leader + partitioning: random