From 8ae6eed74e50fe6798c9abc4aaef2a9de10e6753 Mon Sep 17 00:00:00 2001 From: 0x-dx <145253945+0x-dx@users.noreply.github.com> Date: Fri, 13 Oct 2023 22:56:39 +0800 Subject: [PATCH 1/8] feat(output/kafka-sink): Update kafka sink dependencies. --- go.mod | 20 +++++++++++++++++--- go.sum | 50 ++++++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 61 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index 11e6e77a..4f804d0f 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/ethpandaops/xatu go 1.19 require ( + github.com/IBM/sarama v1.41.2 github.com/attestantio/go-eth2-client v0.18.4-0.20231012194602-0eff364fec01 github.com/avast/retry-go/v4 v4.3.4 github.com/beevik/ntp v1.0.0 @@ -55,6 +56,9 @@ require ( github.com/deckarep/golang-set/v2 v2.3.1 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/eapache/go-resiliency v1.4.0 // indirect + github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect + github.com/eapache/queue v1.1.0 // indirect github.com/fatih/color v1.14.1 // indirect github.com/ferranbt/fastssz v0.1.3 // indirect github.com/getsentry/sentry-go v0.23.0 // indirect @@ -68,6 +72,9 @@ require ( github.com/golang/protobuf v1.5.3 // indirect github.com/gorilla/websocket v1.5.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect + github.com/hashicorp/errwrap v1.0.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/hashicorp/go-uuid v1.0.3 // indirect github.com/holiman/bloomfilter/v2 v2.0.3 // indirect github.com/holiman/uint256 v1.2.3 // indirect github.com/huandu/xstrings v1.4.0 // indirect @@ -75,6 +82,11 @@ require ( github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/ipfs/go-cid v0.4.1 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect + github.com/jcmturner/aescts/v2 v2.0.0 // indirect + github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect + github.com/jcmturner/gofork v1.7.6 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect + github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/klauspost/compress v1.16.7 // indirect github.com/klauspost/cpuid/v2 v2.2.4 // indirect github.com/kr/pretty v0.3.1 // indirect @@ -95,12 +107,14 @@ require ( github.com/multiformats/go-multihash v0.2.1 // indirect github.com/multiformats/go-varint v0.0.7 // indirect github.com/olekukonko/tablewriter v0.0.5 // indirect + github.com/pierrec/lz4/v4 v4.1.18 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.4.0 // indirect github.com/prometheus/common v0.44.0 // indirect github.com/prometheus/procfs v0.11.1 // indirect github.com/prysmaticlabs/go-bitfield v0.0.0-20210809151128-385d8c5e3fb7 // indirect github.com/r3labs/sse/v2 v2.10.0 // indirect + github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect github.com/rivo/uniseg v0.4.4 // indirect github.com/robfig/cron/v3 v3.0.1 // indirect github.com/rogpeppe/go-internal v1.11.0 // indirect @@ -116,11 +130,11 @@ require ( go.opentelemetry.io/otel/metric v1.19.0 // indirect go.opentelemetry.io/proto/otlp v1.0.0 // indirect go.uber.org/atomic v1.10.0 // indirect - golang.org/x/crypto v0.11.0 // indirect + golang.org/x/crypto v0.13.0 // indirect golang.org/x/exp v0.0.0-20230801115018-d63ba01acd4b // indirect - golang.org/x/net v0.12.0 // indirect + golang.org/x/net v0.15.0 // indirect golang.org/x/sys v0.12.0 // indirect - golang.org/x/text v0.11.0 // indirect + golang.org/x/text v0.13.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect diff --git a/go.sum b/go.sum index be91a79e..d96b7de3 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/DataDog/zstd v1.5.5 h1:oWf5W7GtOLgp6bciQYDmhHHjdhYkALu6S/5Ni9ZgSvQ= github.com/DataDog/zstd v1.5.5/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= +github.com/IBM/sarama v1.41.2 h1:ZDBZfGPHAD4uuAtSv4U22fRZBgst0eEwGFzLj0fb85c= +github.com/IBM/sarama v1.41.2/go.mod h1:xdpu7sd6OE1uxNdjYTSKUfY8FaKkJES9/+EyjSgiGQk= github.com/VictoriaMetrics/fastcache v1.12.1 h1:i0mICQuojGDL3KblA7wUNlY5lOK6a4bwt3uRKnkZU40= github.com/VictoriaMetrics/fastcache v1.12.1/go.mod h1:tX04vaqcNoQeGLD+ra5pU5sWkuxnzWhEzLwhP9w653o= github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8= @@ -49,6 +51,12 @@ github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 h1:8UrgZ3GkP4i/CLijOJx79Yu+etly github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/eapache/go-resiliency v1.4.0 h1:3OK9bWpPk5q6pbFAaYSEwD9CLUSHG8bnZuqX2yMt3B0= +github.com/eapache/go-resiliency v1.4.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/ethereum/go-ethereum v1.12.0 h1:bdnhLPtqETd4m3mS8BGMNvBTf36bO5bx/hxE2zljOa0= github.com/ethereum/go-ethereum v1.12.0/go.mod h1:/oo2X/dZLJjf2mJ6YT9wcWxa4nNJDBKDBU6sFIpx1Gs= github.com/ethpandaops/beacon v0.31.0 h1:jD5zSeQNv35nfPuT6Qxx/5XwNhlHAWzO1m4Ux8FuRzI= @@ -62,6 +70,7 @@ github.com/fatih/color v1.14.1 h1:qfhVLaG5s+nCROl1zJsZRxFeYrHLqWroPOQ8BWiNb4w= github.com/fatih/color v1.14.1/go.mod h1:2oHN61fhTpgcxD3TSWCgKDiH1+x4OiDVVGH8WlgGZGg= github.com/ferranbt/fastssz v0.1.3 h1:ZI+z3JH05h4kgmFXdHuR1aWYsgrg7o+Fw7/NCzM16Mo= github.com/ferranbt/fastssz v0.1.3/go.mod h1:0Y9TEd/9XuFlh7mskMPfXiI2Dkw4Ddg9EyXt1W7MRvE= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/frankban/quicktest v1.14.3 h1:FJKSZTDHjyhriyC81FLQ0LY93eSai0ZyR/ZIkd3ZUKE= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= @@ -115,12 +124,21 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms= github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= +github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/holiman/bloomfilter/v2 v2.0.3 h1:73e0e/V0tCydx14a0SCYS/EWCxgwLZ18CZcZKVu0fao= github.com/holiman/bloomfilter/v2 v2.0.3/go.mod h1:zpoh+gs7qcpqrHr3dB55AMiJwo0iURXE7ZOP9L9hSkA= github.com/holiman/uint256 v1.2.3 h1:K8UWO1HUJpRMXBxbmaY1Y8IAMZC/RsKB+ArEnnK4l5o= @@ -143,6 +161,18 @@ github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s= github.com/ipfs/go-cid v0.4.1/go.mod h1:uQHwDeX4c6CtyrFwdqyhpNcxVewur1M7l7fNU7LKwZk= github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus= github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc= +github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg= +github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo= +github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= +github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh687T8= +github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= +github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/jellydator/ttlcache/v3 v3.1.0 h1:0gPFG0IHHP6xyUyXq+JaD8fwkDCqgqwohXNJBcYE71g= github.com/jellydator/ttlcache/v3 v3.1.0/go.mod h1:hi7MGFdMAwZna5n2tuvh63DvFLzVKySzCVW6+0gA2n4= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= @@ -220,6 +250,8 @@ github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/oschwald/maxminddb-golang v1.10.0 h1:Xp1u0ZhqkSuopaKmk1WwHtjF0H9Hd9181uj2MQ5Vndg= github.com/oschwald/maxminddb-golang v1.10.0/go.mod h1:Y2ELenReaLAZ0b400URyGwvYxHV1dLIxBuyOsyYjHK0= +github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ= +github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -238,6 +270,8 @@ github.com/prysmaticlabs/go-bitfield v0.0.0-20210809151128-385d8c5e3fb7 h1:0tVE4 github.com/prysmaticlabs/go-bitfield v0.0.0-20210809151128-385d8c5e3fb7/go.mod h1:wmuf/mdK4VMD+jA9ThwcUKjg3a2XWM9cVfFYjDyY4j4= github.com/r3labs/sse/v2 v2.10.0 h1:hFEkLLFY4LDifoHdiCN/LlGBAdVJYsANaLqNYa1l/v0= github.com/r3labs/sse/v2 v2.10.0/go.mod h1:Igau6Whc+F17QUgML1fYe1VPZzTV6EMCnYktEmkNJ7I= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/redis/go-redis/v9 v9.0.4 h1:FC82T+CHJ/Q/PdyLW++GeCO+Ol59Y4T7R4jbgjvktgc= github.com/redis/go-redis/v9 v9.0.4/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= @@ -274,6 +308,7 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= @@ -312,8 +347,9 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA= -golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio= +golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= +golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck= +golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc= golang.org/x/exp v0.0.0-20230801115018-d63ba01acd4b h1:r+vk0EmXNmekl0S0BascoeeoHk/L7wmaW2QF90K+kYI= golang.org/x/exp v0.0.0-20230801115018-d63ba01acd4b/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= @@ -324,6 +360,7 @@ golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191116160921-f9c825593386/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200813134508-3edf25e44fcc/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= @@ -331,9 +368,10 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= -golang.org/x/net v0.12.0 h1:cfawfvKITfUsFCeJIHJrbSxpeu/E81khclypR0GVT50= -golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= +golang.org/x/net v0.15.0 h1:ugBLEUaxABaB5AJqW9enI0ACdci2RUd4eP51NTBvuJ8= +golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -384,8 +422,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= -golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4= -golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= From 7be1904a7d8e9c9625c3a786335a07aea9e1613d Mon Sep 17 00:00:00 2001 From: 0x-dx <145253945+0x-dx@users.noreply.github.com> Date: Fri, 13 Oct 2023 22:57:48 +0800 Subject: [PATCH 2/8] feat(output/kafka-sink): Add Kafka Sink. --- pkg/output/kafka/client.go | 73 +++++++++++++++++++++++ pkg/output/kafka/config.go | 29 ++++++++++ pkg/output/kafka/exporter.go | 108 +++++++++++++++++++++++++++++++++++ pkg/output/kafka/kafka.go | 103 +++++++++++++++++++++++++++++++++ 4 files changed, 313 insertions(+) create mode 100644 pkg/output/kafka/client.go create mode 100644 pkg/output/kafka/config.go create mode 100644 pkg/output/kafka/exporter.go create mode 100644 pkg/output/kafka/kafka.go diff --git a/pkg/output/kafka/client.go b/pkg/output/kafka/client.go new file mode 100644 index 00000000..93907e61 --- /dev/null +++ b/pkg/output/kafka/client.go @@ -0,0 +1,73 @@ +package kafka + +import ( + "github.com/IBM/sarama" + "strings" +) + +type CompressionStrategy string + +var ( + CompressionStrategyNone CompressionStrategy = "none" + CompressionStrategyGZIP CompressionStrategy = "gzip" + CompressionStrategySnappy CompressionStrategy = "snappy" + CompressionStrategyLZ4 CompressionStrategy = "lz4" + CompressionStrategyZSTD CompressionStrategy = "zstd" +) + +type RequiredAcks string + +var ( + RequiredAcksLeader RequiredAcks = "leader" + RequiredAcksAll RequiredAcks = "all" + RequiredAcksNone RequiredAcks = "none" +) + +type PartitionStrategy string + +var ( + PartitionStrategyNone PartitionStrategy = "none" + PartitionStrategyRandom PartitionStrategy = "random" +) + +func NewSyncProducer(config *Config) (sarama.SyncProducer, error) { + producerConfig := Init(config) + brokersList := strings.Split(config.Brokers, ",") + return sarama.NewSyncProducer(brokersList, producerConfig) +} +func Init(config *Config) *sarama.Config { + c := sarama.NewConfig() + c.Producer.Flush.Bytes = config.FlushBytes + c.Producer.Flush.Messages = config.FlushMessages + c.Producer.Flush.Frequency = config.FlushFrequency + c.Producer.Retry.Max = config.MaxRetries + c.Producer.Return.Successes = true + + switch config.RequiredAcks { + case RequiredAcksNone: + c.Producer.RequiredAcks = sarama.NoResponse + case RequiredAcksAll: + c.Producer.RequiredAcks = sarama.WaitForAll + default: + c.Producer.RequiredAcks = sarama.WaitForLocal + } + switch config.Compression { + case CompressionStrategyLZ4: + c.Producer.Compression = sarama.CompressionLZ4 + case CompressionStrategyGZIP: + c.Producer.Compression = sarama.CompressionGZIP + case CompressionStrategySnappy: + c.Producer.Compression = sarama.CompressionSnappy + case CompressionStrategyZSTD: + c.Producer.Compression = sarama.CompressionZSTD + default: + c.Producer.Compression = sarama.CompressionNone + } + switch config.Partitioning { + case PartitionStrategyNone: + c.Producer.Partitioner = sarama.NewHashPartitioner + default: + c.Producer.Partitioner = sarama.NewRandomPartitioner + } + return c +} diff --git a/pkg/output/kafka/config.go b/pkg/output/kafka/config.go new file mode 100644 index 00000000..0f460654 --- /dev/null +++ b/pkg/output/kafka/config.go @@ -0,0 +1,29 @@ +package kafka + +import ( + "errors" + "time" +) + +type Config struct { + Brokers string `yaml:"brokers"` + Topic string `yaml:"topic"` + TLS bool `yaml:"tls" default:"false"` + MaxQueueSize int `yaml:"maxQueueSize" default:"51200"` + FlushFrequency time.Duration `yaml:"flushFrequency" default:"10s"` + FlushMessages int `yaml:"flushMessages" default:"500"` + FlushBytes int `yaml:"flushBytes" default:"1000000"` + MaxRetries int `yaml:"maxRetries" default:"3"` + Compression CompressionStrategy `yaml:"compression" default:"snappy"` + RequiredAcks RequiredAcks `yaml:"requiredAcks" default:"leader"` + Partitioning PartitionStrategy `yaml:"partitioning" default:"none"` +} + +func (c *Config) Validate() error { + + if c.Brokers == "" { + return errors.New("address is required") + } + + return nil +} diff --git a/pkg/output/kafka/exporter.go b/pkg/output/kafka/exporter.go new file mode 100644 index 00000000..75214dba --- /dev/null +++ b/pkg/output/kafka/exporter.go @@ -0,0 +1,108 @@ +package kafka + +import ( + "context" + "errors" + "github.com/IBM/sarama" + "github.com/ethpandaops/xatu/pkg/observability" + "github.com/ethpandaops/xatu/pkg/proto/xatu" + "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + "google.golang.org/protobuf/encoding/protojson" +) + +type ItemExporter struct { + config *Config + log logrus.FieldLogger + client sarama.SyncProducer +} + +func NewItemExporter(name string, config *Config, log logrus.FieldLogger) (ItemExporter, error) { + + producer, err := NewSyncProducer(config) + + if err != nil { + log. + WithError(err). + WithField("output_name", name). + WithField("output_type", SinkType). + Error("Error while creating the Kafka Client") + return ItemExporter{}, err + } + + return ItemExporter{ + config: config, + log: log.WithField("output_name", name).WithField("output_type", SinkType), + client: producer, + }, nil +} +func (e ItemExporter) ExportItems(ctx context.Context, items []*xatu.DecoratedEvent) error { + _, span := observability.Tracer().Start(ctx, "KafkaItemExporter.ExportItems", trace.WithAttributes(attribute.Int64("num_events", int64(len(items))))) + defer span.End() + + e.log.WithField("events", len(items)).Debug("Sending batch of events to Kafka sink") + + if err := e.sendUpstream(ctx, items); err != nil { + e.log. + WithError(err). + WithField("num_events", len(items)). + Error("Failed to send events upstream") + + span.SetStatus(codes.Error, err.Error()) + + return err + } + + return nil +} + +func (e ItemExporter) Shutdown(ctx context.Context) error { + return nil +} + +const MAX_MSG_BYTE_SIZE = 1024 * 1024 + +func (e *ItemExporter) sendUpstream(ctx context.Context, items []*xatu.DecoratedEvent) error { + msgs := make([]*sarama.ProducerMessage, 0, len(items)) + msgByteSize := 0 + for _, p := range items { + r, err := protojson.Marshal(p) + if err != nil { + return err + } + routingKey := sarama.StringEncoder(p.Event.Id) + eventPayload := sarama.StringEncoder(r) + m := &sarama.ProducerMessage{ + Topic: e.config.Topic, + Key: routingKey, + Value: eventPayload, + } + msgByteSize = m.ByteSize(2) + if msgByteSize > MAX_MSG_BYTE_SIZE { + e.log.WithField("event_id", routingKey).WithField("msg_size", msgByteSize).Debug("Message too large, consider increasing `max_message_bytes`") + continue + } + msgs = append(msgs, m) + } + err := e.client.SendMessages(msgs) + errorCount := 0 + if err != nil { + var errs sarama.ProducerErrors + if errors.As(err, &errs) { + errorCount = len(errs) + for _, producerError := range errs { + e.log. + WithError(producerError.Err). + WithField("num_events", len(errs)). + Error("Failed to send events to Kafka") + return producerError + } + } + return err + } + e.log.WithField("count", len(msgs)-errorCount).Debug("Items written to Kafka") + + return nil +} diff --git a/pkg/output/kafka/kafka.go b/pkg/output/kafka/kafka.go new file mode 100644 index 00000000..673ab1ee --- /dev/null +++ b/pkg/output/kafka/kafka.go @@ -0,0 +1,103 @@ +package kafka + +import ( + "context" + "errors" + "github.com/ethpandaops/xatu/pkg/processor" + "github.com/ethpandaops/xatu/pkg/proto/xatu" + "github.com/sirupsen/logrus" +) + +const SinkType = "kafka" + +type Kafka struct { + name string + config *Config + log logrus.FieldLogger + proc *processor.BatchItemProcessor[xatu.DecoratedEvent] + filter xatu.EventFilter +} + +func New(name string, config *Config, log logrus.FieldLogger, filterConfig *xatu.EventFilterConfig, shippingMethod processor.ShippingMethod) (*Kafka, error) { + if config == nil { + return nil, errors.New("config is required") + } + + if err := config.Validate(); err != nil { + return nil, err + } + + exporter, err := NewItemExporter(name, config, log) + if err != nil { + return nil, err + } + + filter, err := xatu.NewEventFilter(filterConfig) + if err != nil { + return nil, err + } + + proc, err := processor.NewBatchItemProcessor[xatu.DecoratedEvent]( + exporter, + xatu.ImplementationLower()+"_output_"+SinkType+"_"+name, + log, + processor.WithShippingMethod(shippingMethod), + ) + if err != nil { + return nil, err + } + + return &Kafka{ + name: name, + config: config, + log: log, + proc: proc, + filter: filter, + }, nil +} + +func (h *Kafka) Name() string { + return h.name +} + +func (h *Kafka) Type() string { + return SinkType +} + +func (h *Kafka) Start(ctx context.Context) error { + return nil +} + +func (h *Kafka) Stop(ctx context.Context) error { + return h.proc.Shutdown(ctx) +} + +func (h *Kafka) HandleNewDecoratedEvent(ctx context.Context, event *xatu.DecoratedEvent) error { + shouldBeDropped, err := h.filter.ShouldBeDropped(event) + if err != nil { + return err + } + + if shouldBeDropped { + return nil + } + + return h.proc.Write(ctx, []*xatu.DecoratedEvent{event}) +} + +func (h *Kafka) HandleNewDecoratedEvents(ctx context.Context, events []*xatu.DecoratedEvent) error { + filtered := []*xatu.DecoratedEvent{} + + for _, event := range events { + shouldBeDropped, err := h.filter.ShouldBeDropped(event) + if err != nil { + return err + } + + if !shouldBeDropped { + filtered = append(filtered, event) + } + } + + return h.proc.Write(ctx, filtered) +} From 9d57aac38b24a7b65a541eee1741a31322d90b41 Mon Sep 17 00:00:00 2001 From: 0x-dx <145253945+0x-dx@users.noreply.github.com> Date: Fri, 13 Oct 2023 23:19:10 +0800 Subject: [PATCH 3/8] feat(output/kafka-sink): Enable Kafka Sink. --- pkg/output/config.go | 15 +++++++++++++++ pkg/output/sink.go | 2 ++ 2 files changed, 17 insertions(+) diff --git a/pkg/output/config.go b/pkg/output/config.go index 70c3b724..246ac488 100644 --- a/pkg/output/config.go +++ b/pkg/output/config.go @@ -6,6 +6,7 @@ import ( "github.com/creasty/defaults" "github.com/ethpandaops/xatu/pkg/output/http" + "github.com/ethpandaops/xatu/pkg/output/kafka" "github.com/ethpandaops/xatu/pkg/output/stdout" "github.com/ethpandaops/xatu/pkg/output/xatu" "github.com/ethpandaops/xatu/pkg/processor" @@ -78,6 +79,20 @@ func NewSink(name string, sinkType SinkType, config *RawMessage, log logrus.Fiel } return xatu.New(name, conf, log, &filterConfig, shippingMethod) + case SinkTypeKafka: + conf := &kafka.Config{} + + if config != nil { + if err := config.Unmarshal(conf); err != nil { + return nil, err + } + } + + if err := defaults.Set(conf); err != nil { + return nil, err + } + + return kafka.New(name, conf, log, &filterConfig, shippingMethod) default: return nil, fmt.Errorf("sink type %s is unknown", sinkType) } diff --git a/pkg/output/sink.go b/pkg/output/sink.go index 5568e2ff..2b8362a8 100644 --- a/pkg/output/sink.go +++ b/pkg/output/sink.go @@ -4,6 +4,7 @@ import ( "context" "github.com/ethpandaops/xatu/pkg/output/http" + "github.com/ethpandaops/xatu/pkg/output/kafka" "github.com/ethpandaops/xatu/pkg/output/stdout" xatuSink "github.com/ethpandaops/xatu/pkg/output/xatu" "github.com/ethpandaops/xatu/pkg/proto/xatu" @@ -16,6 +17,7 @@ const ( SinkTypeHTTP SinkType = http.SinkType SinkTypeStdOut SinkType = stdout.SinkType SinkTypeXatu SinkType = xatuSink.SinkType + SinkTypeKafka SinkType = kafka.SinkType ) type Sink interface { From 4456b42467f75dba92a9b7c64645040620f34477 Mon Sep 17 00:00:00 2001 From: 0x-dx <145253945+0x-dx@users.noreply.github.com> Date: Mon, 16 Oct 2023 23:52:16 +0800 Subject: [PATCH 4/8] feat(output/kafka-sink): * fix typos. * Use flushBytes as a reference to check against msgByteSize. --- pkg/output/kafka/config.go | 2 +- pkg/output/kafka/exporter.go | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/pkg/output/kafka/config.go b/pkg/output/kafka/config.go index 0f460654..da193c0d 100644 --- a/pkg/output/kafka/config.go +++ b/pkg/output/kafka/config.go @@ -22,7 +22,7 @@ type Config struct { func (c *Config) Validate() error { if c.Brokers == "" { - return errors.New("address is required") + return errors.New("brokers is required") } return nil diff --git a/pkg/output/kafka/exporter.go b/pkg/output/kafka/exporter.go index 75214dba..c35324e1 100644 --- a/pkg/output/kafka/exporter.go +++ b/pkg/output/kafka/exporter.go @@ -62,8 +62,6 @@ func (e ItemExporter) Shutdown(ctx context.Context) error { return nil } -const MAX_MSG_BYTE_SIZE = 1024 * 1024 - func (e *ItemExporter) sendUpstream(ctx context.Context, items []*xatu.DecoratedEvent) error { msgs := make([]*sarama.ProducerMessage, 0, len(items)) msgByteSize := 0 @@ -80,7 +78,7 @@ func (e *ItemExporter) sendUpstream(ctx context.Context, items []*xatu.Decorated Value: eventPayload, } msgByteSize = m.ByteSize(2) - if msgByteSize > MAX_MSG_BYTE_SIZE { + if msgByteSize > e.config.FlushBytes { e.log.WithField("event_id", routingKey).WithField("msg_size", msgByteSize).Debug("Message too large, consider increasing `max_message_bytes`") continue } From 0f3d26696aa73fc1f82d6ac65586c2b3013f9147 Mon Sep 17 00:00:00 2001 From: 0x-dx <145253945+0x-dx@users.noreply.github.com> Date: Tue, 17 Oct 2023 23:40:22 +0800 Subject: [PATCH 5/8] feat(output/kafka-sink): * fix topic validation. --- pkg/output/kafka/config.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/output/kafka/config.go b/pkg/output/kafka/config.go index da193c0d..0b0d7305 100644 --- a/pkg/output/kafka/config.go +++ b/pkg/output/kafka/config.go @@ -24,6 +24,9 @@ func (c *Config) Validate() error { if c.Brokers == "" { return errors.New("brokers is required") } + if c.Topic == "" { + return errors.New("topic is required") + } return nil } From cb7f11f2f4bb10343459424d68f32e895d850966 Mon Sep 17 00:00:00 2001 From: 0x-dx <145253945+0x-dx@users.noreply.github.com> Date: Wed, 18 Oct 2023 21:01:41 +0800 Subject: [PATCH 6/8] feat(output/kafka-sink): fix lint --- pkg/output/kafka/client.go | 7 ++++++- pkg/output/kafka/config.go | 2 +- pkg/output/kafka/exporter.go | 21 ++++++++++++++++----- pkg/output/kafka/kafka.go | 1 + 4 files changed, 24 insertions(+), 7 deletions(-) diff --git a/pkg/output/kafka/client.go b/pkg/output/kafka/client.go index 93907e61..76b1e737 100644 --- a/pkg/output/kafka/client.go +++ b/pkg/output/kafka/client.go @@ -1,8 +1,9 @@ package kafka import ( - "github.com/IBM/sarama" "strings" + + "github.com/IBM/sarama" ) type CompressionStrategy string @@ -33,6 +34,7 @@ var ( func NewSyncProducer(config *Config) (sarama.SyncProducer, error) { producerConfig := Init(config) brokersList := strings.Split(config.Brokers, ",") + return sarama.NewSyncProducer(brokersList, producerConfig) } func Init(config *Config) *sarama.Config { @@ -51,6 +53,7 @@ func Init(config *Config) *sarama.Config { default: c.Producer.RequiredAcks = sarama.WaitForLocal } + switch config.Compression { case CompressionStrategyLZ4: c.Producer.Compression = sarama.CompressionLZ4 @@ -63,11 +66,13 @@ func Init(config *Config) *sarama.Config { default: c.Producer.Compression = sarama.CompressionNone } + switch config.Partitioning { case PartitionStrategyNone: c.Producer.Partitioner = sarama.NewHashPartitioner default: c.Producer.Partitioner = sarama.NewRandomPartitioner } + return c } diff --git a/pkg/output/kafka/config.go b/pkg/output/kafka/config.go index 0b0d7305..b8d14431 100644 --- a/pkg/output/kafka/config.go +++ b/pkg/output/kafka/config.go @@ -20,10 +20,10 @@ type Config struct { } func (c *Config) Validate() error { - if c.Brokers == "" { return errors.New("brokers is required") } + if c.Topic == "" { return errors.New("topic is required") } diff --git a/pkg/output/kafka/exporter.go b/pkg/output/kafka/exporter.go index c35324e1..0ea93d3d 100644 --- a/pkg/output/kafka/exporter.go +++ b/pkg/output/kafka/exporter.go @@ -3,6 +3,7 @@ package kafka import ( "context" "errors" + "github.com/IBM/sarama" "github.com/ethpandaops/xatu/pkg/observability" "github.com/ethpandaops/xatu/pkg/proto/xatu" @@ -20,7 +21,6 @@ type ItemExporter struct { } func NewItemExporter(name string, config *Config, log logrus.FieldLogger) (ItemExporter, error) { - producer, err := NewSyncProducer(config) if err != nil { @@ -29,6 +29,7 @@ func NewItemExporter(name string, config *Config, log logrus.FieldLogger) (ItemE WithField("output_name", name). WithField("output_type", SinkType). Error("Error while creating the Kafka Client") + return ItemExporter{}, err } @@ -65,41 +66,51 @@ func (e ItemExporter) Shutdown(ctx context.Context) error { func (e *ItemExporter) sendUpstream(ctx context.Context, items []*xatu.DecoratedEvent) error { msgs := make([]*sarama.ProducerMessage, 0, len(items)) msgByteSize := 0 + for _, p := range items { r, err := protojson.Marshal(p) if err != nil { return err } - routingKey := sarama.StringEncoder(p.Event.Id) - eventPayload := sarama.StringEncoder(r) + + routingKey, eventPayload := sarama.StringEncoder(p.Event.Id), sarama.StringEncoder(r) m := &sarama.ProducerMessage{ Topic: e.config.Topic, Key: routingKey, Value: eventPayload, } + msgByteSize = m.ByteSize(2) if msgByteSize > e.config.FlushBytes { e.log.WithField("event_id", routingKey).WithField("msg_size", msgByteSize).Debug("Message too large, consider increasing `max_message_bytes`") + continue } + msgs = append(msgs, m) } - err := e.client.SendMessages(msgs) + errorCount := 0 + + err := e.client.SendMessages(msgs) if err != nil { var errs sarama.ProducerErrors if errors.As(err, &errs) { errorCount = len(errs) + for _, producerError := range errs { e.log. WithError(producerError.Err). - WithField("num_events", len(errs)). + WithField("events", errorCount). Error("Failed to send events to Kafka") + return producerError } } + return err } + e.log.WithField("count", len(msgs)-errorCount).Debug("Items written to Kafka") return nil diff --git a/pkg/output/kafka/kafka.go b/pkg/output/kafka/kafka.go index 673ab1ee..7db4becb 100644 --- a/pkg/output/kafka/kafka.go +++ b/pkg/output/kafka/kafka.go @@ -3,6 +3,7 @@ package kafka import ( "context" "errors" + "github.com/ethpandaops/xatu/pkg/processor" "github.com/ethpandaops/xatu/pkg/proto/xatu" "github.com/sirupsen/logrus" From 5718e0db75327d8de02fa99407ff772e69a8d4bd Mon Sep 17 00:00:00 2001 From: 0x-dx <145253945+0x-dx@users.noreply.github.com> Date: Thu, 19 Oct 2023 01:33:46 +0800 Subject: [PATCH 7/8] feat(output/kafka): Update docs & example configs. --- docs/cannon.md | 108 ++++++++++++++++++++++++++++--------------- docs/mimicry.md | 67 +++++++++++++++++++++------ docs/sentry.md | 65 ++++++++++++++++++++------ docs/server.md | 5 ++ example_cannon.yaml | 12 +++++ example_mimicry.yaml | 12 +++++ example_sentry.yaml | 12 +++++ 7 files changed, 216 insertions(+), 65 deletions(-) diff --git a/docs/cannon.md b/docs/cannon.md index 1618868b..c6452b8d 100644 --- a/docs/cannon.md +++ b/docs/cannon.md @@ -39,42 +39,42 @@ Flags: Cannon requires a single `yaml` config file. An example file can be found [here](../example_cannon.yaml) -| Name| Type | Default | Description | -| --- | --- | --- | --- | -| logging | string | `warn` | Log level (`panic`, `fatal`, `warn`, `info`, `debug`, `trace`) | -| metricsAddr | string | `:9090` | The address the metrics server will listen on | -| pprofAddr | string | | The address the [pprof](https://github.com/google/pprof) server will listen on. When ommited, the pprof server will not be started | -| name | string | | Unique name of the cannon | -| labels | object | | A key value map of labels to append to every cannon event | -| ethereum.beaconNodeAddress | string | | [Ethereum consensus client](https://ethereum.org/en/developers/docs/nodes-and-clients/#consensus-clients) http server endpoint | -| ethereum.beaconNodeAddress | object | | A key value map of headers | -| ethereum.overrideNetworkName | string | | Override the network name | -| ethereum.blockCacheSize | int | `1000` | The maximum number of blocks to cache | -| ethereum.blockCacheTtl | string | `1h` | The maximum duration to cache blocks | -| ethereum.blockPreloadWorkers | int | `5` | The number of workers to use for preloading blocks | -| ethereum.blockPreloadQueueSize | int | `5000` | The maximum number of blocks to queue for preloading | -| coordinator.address | string | | The address of the [Xatu server](./server.md) | -| coordinator.tls | bool | | Server requires TLS | -| coordinator.headers | object | | A key value map of headers to append to requests | -| derivers.attesterSlashing.enabled | bool | `true` | Enable the attester slashing deriver | -| derivers.attesterSlashing.headSlotLag | int | `5` | The number of slots to lag behind the head | -| derivers.blsToExecutionChange.enabled | bool | `true` | Enable the BLS to execution change deriver | -| derivers.blsToExecutionChange.headSlotLag | int | `5` | The number of slots to lag behind the head | -| derivers.deposit.enabled | bool | `true` | Enable the deposit deriver | -| derivers.deposit.headSlotLag | int | `5` | The number of slots to lag behind the head | -| derivers.withdrawal.enabled | bool | `true` | Enable the withdrawal deriver | -| derivers.withdrawal.headSlotLag | int | `5` | The number of slots to lag behind the head | -| derivers.executionTransaction.enabled | bool | `true` | Enable the execution transaction deriver | -| derivers.executionTransaction.headSlotLag | int | `5` | The number of slots to lag behind the head | -| derivers.proposerSlashing.enabled | bool | `true` | Enable the proposer slashing deriver | -| derivers.proposerSlashing.headSlotLag | int | `5` | The number of slots to lag behind the head | -| derivers.voluntaryExit.enabled | bool | `true` | Enable the voluntary exit deriver | -| derivers.voluntaryExit.headSlotLag | int | `5` | The number of slots to lag behind the head | -| ntpServer | string | `pool.ntp.org` | NTP server to calculate clock drift for events | -| outputs | array | | List of outputs for the cannon to send data to | -| outputs[].name | string | | Name of the output | -| outputs[].type | string | | Type of output (`xatu`, `http`, `stdout`) | -| outputs[].config | object | | Output type configuration [`xatu`](#output-xatu-configuration)/[`http`](#output-http-configuration) | +| Name| Type | Default | Description | +| --- | --- | --- |--------------------------------------------------------------------------------------------------------------------------------------------| +| logging | string | `warn` | Log level (`panic`, `fatal`, `warn`, `info`, `debug`, `trace`) | +| metricsAddr | string | `:9090` | The address the metrics server will listen on | +| pprofAddr | string | | The address the [pprof](https://github.com/google/pprof) server will listen on. When ommited, the pprof server will not be started | +| name | string | | Unique name of the cannon | +| labels | object | | A key value map of labels to append to every cannon event | +| ethereum.beaconNodeAddress | string | | [Ethereum consensus client](https://ethereum.org/en/developers/docs/nodes-and-clients/#consensus-clients) http server endpoint | +| ethereum.beaconNodeAddress | object | | A key value map of headers | +| ethereum.overrideNetworkName | string | | Override the network name | +| ethereum.blockCacheSize | int | `1000` | The maximum number of blocks to cache | +| ethereum.blockCacheTtl | string | `1h` | The maximum duration to cache blocks | +| ethereum.blockPreloadWorkers | int | `5` | The number of workers to use for preloading blocks | +| ethereum.blockPreloadQueueSize | int | `5000` | The maximum number of blocks to queue for preloading | +| coordinator.address | string | | The address of the [Xatu server](./server.md) | +| coordinator.tls | bool | | Server requires TLS | +| coordinator.headers | object | | A key value map of headers to append to requests | +| derivers.attesterSlashing.enabled | bool | `true` | Enable the attester slashing deriver | +| derivers.attesterSlashing.headSlotLag | int | `5` | The number of slots to lag behind the head | +| derivers.blsToExecutionChange.enabled | bool | `true` | Enable the BLS to execution change deriver | +| derivers.blsToExecutionChange.headSlotLag | int | `5` | The number of slots to lag behind the head | +| derivers.deposit.enabled | bool | `true` | Enable the deposit deriver | +| derivers.deposit.headSlotLag | int | `5` | The number of slots to lag behind the head | +| derivers.withdrawal.enabled | bool | `true` | Enable the withdrawal deriver | +| derivers.withdrawal.headSlotLag | int | `5` | The number of slots to lag behind the head | +| derivers.executionTransaction.enabled | bool | `true` | Enable the execution transaction deriver | +| derivers.executionTransaction.headSlotLag | int | `5` | The number of slots to lag behind the head | +| derivers.proposerSlashing.enabled | bool | `true` | Enable the proposer slashing deriver | +| derivers.proposerSlashing.headSlotLag | int | `5` | The number of slots to lag behind the head | +| derivers.voluntaryExit.enabled | bool | `true` | Enable the voluntary exit deriver | +| derivers.voluntaryExit.headSlotLag | int | `5` | The number of slots to lag behind the head | +| ntpServer | string | `pool.ntp.org` | NTP server to calculate clock drift for events | +| outputs | array | | List of outputs for the cannon to send data to | +| outputs[].name | string | | Name of the output | +| outputs[].type | string | | Type of output (`xatu`, `http`, `kafka`, `stdout`) | +| outputs[].config | object | | Output type configuration [`xatu`](#output-xatu-configuration)/[`http`](#output-http-configuration)/[`kafka`](#output-kafka-configuration) | ### Output `xatu` configuration @@ -103,6 +103,22 @@ Output configuration to send cannon events to a http server. | outputs[].config.exportTimeout | string | `30s` | The maximum duration for exporting events. If the timeout is reached, the export will be cancelled | | outputs[].config.maxExportBatchSize | int | `512` | MaxExportBatchSize is the maximum number of events to process in a single batch. If there are more than one batch worth of events then it processes multiple batches of events one batch after the other without any delay | +### Output `kafka` configuration + +Output configuration to send sentry events to a kafka server. + +| Name | Type | Default | Allowed Values | Description | +| ------------------------------- | ------ |-----------|-------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------| +| outputs[].config.brokers | string | | | Comma delimited list of brokers. Eg: `localhost:19091,localhost:19092` | +| outputs[].config.topic | string | | | Name of the topic. | +| outputs[].config.flushFrequency | string | `3s` | | The maximum time a single batch can wait before a flush. Producer flushes the batch when the limit is reached. | +| outputs[].config.flushMessages | int | `500` | | The maximum number of events in a single batch before a flush. Producer flushes the batch when the limit is reached. | +| outputs[].config.flushBytes | int | `1000000` | | The maximum size (in bytes) of a single batch before a flush. Producer flushes the batch when the limit is reached. | +| outputs[].config.maxRetries | int | `3` | | The maximum retries allowed for a single batch delivery. The batch would be dropped, if the producer fails to flush with-in this limit. | +| outputs[].config.compression | string | `none` | `none` `gzip` `snappy` `lz4` `zstd` | Compression to use. | +| outputs[].config.requiredAcks | string | `leader` | `none` `leader` `all` | Number of ack's required for a succesful batch delivery. | +| outputs[].config.partitioning | string | `none` | `none` `random` | Paritioning to use for the distribution of messages across the partitions. | + ### Simple example ```yaml @@ -157,6 +173,21 @@ outputs: authorization: "Basic Someb64Value" ``` +### kafka server output example + +```yaml +name: example-instance-004 + +ethereum: + beaconNodeAddress: http://localhost:5052 + +outputs: +- name: kafka-sink + type: kafka + config: + brokers: localhost:19092 + topic: events +``` ### Complex example with multiple outputs example ```yaml @@ -190,6 +221,11 @@ outputs: batchTimeout: 5s exportTimeout: 30s maxExportBatchSize: 512 +- name: kafka-sink + type: kafka + config: + brokers: localhost:19092 + topic: events ``` ## Running locally diff --git a/docs/mimicry.md b/docs/mimicry.md index 8e6d52ac..1d0b699f 100644 --- a/docs/mimicry.md +++ b/docs/mimicry.md @@ -36,22 +36,22 @@ Flags: Mimicry requires a single `yaml` config file. An example file can be found [here](../example_mimicry.yaml) -| Name| Type | Default | Description | -| --- | --- | --- | --- | -| logging | string | `warn` | Log level (`panic`, `fatal`, `warn`, `info`, `debug`, `trace`) | -| metricsAddr | string | `:9090` | The address the metrics server will listen on | +| Name| Type | Default | Description | +| --- | --- | --- |------------------------------------------------------------------------------------------------------------------------------------| +| logging | string | `warn` | Log level (`panic`, `fatal`, `warn`, `info`, `debug`, `trace`) | +| metricsAddr | string | `:9090` | The address the metrics server will listen on | | pprofAddr | string | | The address the [pprof](https://github.com/google/pprof) server will listen on. When ommited, the pprof server will not be started | -| probeAddr | string | | The address for health probes. When ommited, the probe server will not be started | -| name | string | | Unique name of the mimicry | -| labels | object | | A key value map of labels to append to every mimicry event | -| ntpServer | string | `pool.ntp.org` | NTP server to calculate clock drift for events | -| captureDelay | string | `3m` | Delay before starting to capture transactions | -| coordinator.type | string | | Type of output (`xatu`, `static`) | -| coordinator.config | object | | Coordinator type configuration [`xatu`](#coordinator-xatu-configuration)/[`static`](#coordinator-static-configuration) | -| outputs | array | | List of outputs for the mimicry to send data to | -| outputs[].name | string | | Name of the output | -| outputs[].type | string | | Type of output (`xatu`, `http`, `stdout`) | -| outputs[].config | object | | Output type configuration [`xatu`](#output-xatu-configuration)/[`http`](#output-http-configuration) | +| probeAddr | string | | The address for health probes. When ommited, the probe server will not be started | +| name | string | | Unique name of the mimicry | +| labels | object | | A key value map of labels to append to every mimicry event | +| ntpServer | string | `pool.ntp.org` | NTP server to calculate clock drift for events | +| captureDelay | string | `3m` | Delay before starting to capture transactions | +| coordinator.type | string | | Type of output (`xatu`, `static`) | +| coordinator.config | object | | Coordinator type configuration [`xatu`](#coordinator-xatu-configuration)/[`static`](#coordinator-static-configuration) | +| outputs | array | | List of outputs for the mimicry to send data to | +| outputs[].name | string | | Name of the output | +| outputs[].type | string | | Type of output (`xatu`, `http`, `kafka`, `stdout`) | +| outputs[].config | object | | Output type configuration [`xatu`](#output-xatu-configuration)/[`http`](#output-http-configuration)/[`kafka`](#output-kafka-configuration) | ### Coordinator `xatu` configuration @@ -106,6 +106,22 @@ Output configuration to send mimicry events to a http server. | outputs[].config.exportTimeout | string | `30s` | The maximum duration for exporting events. If the timeout is reached, the export will be cancelled | | outputs[].config.maxExportBatchSize | int | `512` | MaxExportBatchSize is the maximum number of events to process in a single batch. If there are more than one batch worth of events then it processes multiple batches of events one batch after the other without any delay | +### Output `kafka` configuration + +Output configuration to send sentry events to a kafka server. + +| Name | Type | Default | Allowed Values | Description | +| ------------------------------- | ------ |-----------|-------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------| +| outputs[].config.brokers | string | | | Comma delimited list of brokers. Eg: `localhost:19091,localhost:19092` | +| outputs[].config.topic | string | | | Name of the topic. | +| outputs[].config.flushFrequency | string | `3s` | | The maximum time a single batch can wait before a flush. Producer flushes the batch when the limit is reached. | +| outputs[].config.flushMessages | int | `500` | | The maximum number of events in a single batch before a flush. Producer flushes the batch when the limit is reached. | +| outputs[].config.flushBytes | int | `1000000` | | The maximum size (in bytes) of a single batch before a flush. Producer flushes the batch when the limit is reached. | +| outputs[].config.maxRetries | int | `3` | | The maximum retries allowed for a single batch delivery. The batch would be dropped, if the producer fails to flush with-in this limit. | +| outputs[].config.compression | string | `none` | `none` `gzip` `snappy` `lz4` `zstd` | Compression to use. | +| outputs[].config.requiredAcks | string | `leader` | `none` `leader` `all` | Number of ack's required for a succesful batch delivery. | +| outputs[].config.partitioning | string | `none` | `none` `random` | Paritioning to use for the distribution of messages across the partitions. | + ### Simple example ```yaml @@ -181,6 +197,22 @@ outputs: authorization: "Basic Someb64Value" ``` +### kafka server output example + +```yaml +name: example-instance-004 + +ethereum: + beaconNodeAddress: http://localhost:5052 + +outputs: +- name: kafka-sink + type: kafka + config: + brokers: localhost:19092 + topic: events +``` + ### Complex example with multiple outputs example ```yaml @@ -218,6 +250,11 @@ outputs: batchTimeout: 5s exportTimeout: 30s maxExportBatchSize: 512 +- name: kafka-sink + type: kafka + config: + brokers: localhost:19092 + topic: events ``` ## Running locally diff --git a/docs/sentry.md b/docs/sentry.md index 58aa6f67..30fab8db 100644 --- a/docs/sentry.md +++ b/docs/sentry.md @@ -39,20 +39,20 @@ Flags: Sentry requires a single `yaml` config file. An example file can be found [here](../example_sentry.yaml) -| Name| Type | Default | Description | -| --- | --- | --- | --- | -| logging | string | `warn` | Log level (`panic`, `fatal`, `warn`, `info`, `debug`, `trace`) | -| metricsAddr | string | `:9090` | The address the metrics server will listen on | -| pprofAddr | string | | The address the [pprof](https://github.com/google/pprof) server will listen on. When ommited, the pprof server will not be started | -| name | string | | Unique name of the sentry | -| labels | object | | A key value map of labels to append to every sentry event | -| ethereum.beaconNodeAddress | string | | [Ethereum consensus client](https://ethereum.org/en/developers/docs/nodes-and-clients/#consensus-clients) http server endpoint | -| ethereum.beaconSubscriptions | array | | List of [topcis](https://ethereum.github.io/beacon-APIs/#/Events/eventstream) to subscribe to. If empty, all subscriptions are subscribed to. -| ntpServer | string | `pool.ntp.org` | NTP server to calculate clock drift for events | -| outputs | array | | List of outputs for the sentry to send data to | -| outputs[].name | string | | Name of the output | -| outputs[].type | string | | Type of output (`xatu`, `http`, `stdout`) | -| outputs[].config | object | | Output type configuration [`xatu`](#output-xatu-configuration)/[`http`](#output-http-configuration) | +| Name| Type | Default | Description | +| --- | --- | --- |-----------------------------------------------------------------------------------------------------------------------------------------------| +| logging | string | `warn` | Log level (`panic`, `fatal`, `warn`, `info`, `debug`, `trace`) | +| metricsAddr | string | `:9090` | The address the metrics server will listen on | +| pprofAddr | string | | The address the [pprof](https://github.com/google/pprof) server will listen on. When ommited, the pprof server will not be started | +| name | string | | Unique name of the sentry | +| labels | object | | A key value map of labels to append to every sentry event | +| ethereum.beaconNodeAddress | string | | [Ethereum consensus client](https://ethereum.org/en/developers/docs/nodes-and-clients/#consensus-clients) http server endpoint | +| ethereum.beaconSubscriptions | array | | List of [topcis](https://ethereum.github.io/beacon-APIs/#/Events/eventstream) to subscribe to. If empty, all subscriptions are subscribed to. +| ntpServer | string | `pool.ntp.org` | NTP server to calculate clock drift for events | +| outputs | array | | List of outputs for the sentry to send data to | +| outputs[].name | string | | Name of the output | +| outputs[].type | string | | Type of output (`xatu`, `http`, `kafka`, stdout`) | +| outputs[].config | object | | Output type configuration [`xatu`](#output-xatu-configuration)/[`http`](#output-http-configuration)/[`kafka`](#output-kafka-configuration) | ### Output `xatu` configuration @@ -81,6 +81,22 @@ Output configuration to send sentry events to a http server. | outputs[].config.exportTimeout | string | `30s` | The maximum duration for exporting events. If the timeout is reached, the export will be cancelled | | outputs[].config.maxExportBatchSize | int | `512` | MaxExportBatchSize is the maximum number of events to process in a single batch. If there are more than one batch worth of events then it processes multiple batches of events one batch after the other without any delay | +### Output `kafka` configuration + +Output configuration to send sentry events to a kafka server. + +| Name | Type | Default | Allowed Values | Description | +| ------------------------------- | ------ |-----------|-------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------| +| outputs[].config.brokers | string | | | Comma delimited list of brokers. Eg: `localhost:19091,localhost:19092` | +| outputs[].config.topic | string | | | Name of the topic. | +| outputs[].config.flushFrequency | string | `3s` | | The maximum time a single batch can wait before a flush. Producer flushes the batch when the limit is reached. | +| outputs[].config.flushMessages | int | `500` | | The maximum number of events in a single batch before a flush. Producer flushes the batch when the limit is reached. | +| outputs[].config.flushBytes | int | `1000000` | | The maximum size (in bytes) of a single batch before a flush. Producer flushes the batch when the limit is reached. | +| outputs[].config.maxRetries | int | `3` | | The maximum retries allowed for a single batch delivery. The batch would be dropped, if the producer fails to flush with-in this limit. | +| outputs[].config.compression | string | `none` | `none` `gzip` `snappy` `lz4` `zstd` | Compression to use. | +| outputs[].config.requiredAcks | string | `leader` | `none` `leader` `all` | Number of ack's required for a succesful batch delivery. | +| outputs[].config.partitioning | string | `none` | `none` `random` | Paritioning to use for the distribution of messages across the partitions. | + ### Simple example ```yaml @@ -126,6 +142,22 @@ outputs: authorization: "Basic Someb64Value" ``` +### kafka server output example + +```yaml +name: example-instance-004 + +ethereum: + beaconNodeAddress: http://localhost:5052 + +outputs: +- name: kafka-sink + type: kafka + config: + brokers: localhost:19092 + topic: events +``` + ### Complex example with multiple outputs example ```yaml @@ -156,6 +188,11 @@ outputs: batchTimeout: 5s exportTimeout: 30s maxExportBatchSize: 512 +- name: kafka-sink + type: kafka + config: + brokers: localhost:19092 + topic: events ``` ## Running locally diff --git a/docs/server.md b/docs/server.md index 3a47d89a..bddfa598 100644 --- a/docs/server.md +++ b/docs/server.md @@ -186,6 +186,11 @@ services: address: http://localhost:8081 headers: authorization: "Basic Someb64Value" + - name: kafka-sink + type: kafka + config: + brokers: localhost:19092 + topic: events ``` ## Services diff --git a/example_cannon.yaml b/example_cannon.yaml index 7a27fbe1..2b3c531c 100644 --- a/example_cannon.yaml +++ b/example_cannon.yaml @@ -76,3 +76,15 @@ outputs: batchTimeout: 5s exportTimeout: 30s maxExportBatchSize: 512 +- name: kafka-sink + type: kafka + config: + brokers: localhost:19092 + topic: events + flushFrequency: 1s + flushMessages: 500 + flushBytes: 1000000 + maxRetries: 6 + compression: snappy + requiredAcks: leader + partitioning: random diff --git a/example_mimicry.yaml b/example_mimicry.yaml index 37924ac6..8447c5dd 100644 --- a/example_mimicry.yaml +++ b/example_mimicry.yaml @@ -60,3 +60,15 @@ outputs: batchTimeout: 5s exportTimeout: 30s maxExportBatchSize: 512 +- name: kafka-sink + type: kafka + config: + brokers: localhost:19092 + topic: events + flushFrequency: 1s + flushMessages: 500 + flushBytes: 1000000 + maxRetries: 6 + compression: snappy + requiredAcks: leader + partitioning: random diff --git a/example_sentry.yaml b/example_sentry.yaml index efb0cc10..3be1ebfa 100644 --- a/example_sentry.yaml +++ b/example_sentry.yaml @@ -80,3 +80,15 @@ outputs: batchTimeout: 5s exportTimeout: 30s maxExportBatchSize: 512 +- name: kafka-sink + type: kafka + config: + brokers: localhost:19092 + topic: events + flushFrequency: 1s + flushMessages: 500 + flushBytes: 1000000 + maxRetries: 6 + compression: snappy + requiredAcks: leader + partitioning: random From b48ebb9d0d0b84b3c984353e6137a5e0f4d9519f Mon Sep 17 00:00:00 2001 From: 0x-dx <145253945+0x-dx@users.noreply.github.com> Date: Thu, 19 Oct 2023 01:37:05 +0800 Subject: [PATCH 8/8] feat(output/kafka): fix config. --- pkg/output/kafka/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/output/kafka/config.go b/pkg/output/kafka/config.go index b8d14431..47eed42e 100644 --- a/pkg/output/kafka/config.go +++ b/pkg/output/kafka/config.go @@ -14,7 +14,7 @@ type Config struct { FlushMessages int `yaml:"flushMessages" default:"500"` FlushBytes int `yaml:"flushBytes" default:"1000000"` MaxRetries int `yaml:"maxRetries" default:"3"` - Compression CompressionStrategy `yaml:"compression" default:"snappy"` + Compression CompressionStrategy `yaml:"compression" default:"none"` RequiredAcks RequiredAcks `yaml:"requiredAcks" default:"leader"` Partitioning PartitionStrategy `yaml:"partitioning" default:"none"` }