diff --git a/go.mod b/go.mod index 456daed3..4f69c6ca 100644 --- a/go.mod +++ b/go.mod @@ -80,6 +80,7 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/gorilla/websocket v1.5.1 // indirect + github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect @@ -150,8 +151,8 @@ require ( golang.org/x/text v0.14.0 // indirect golang.org/x/tools v0.18.0 // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20240123012728-ef4313101c80 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20240125205218-1f4bbc51befe // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240213162025-012b6fc9bca9 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect lukechampine.com/blake3 v1.2.1 // indirect rsc.io/tmplfunc v0.0.3 // indirect diff --git a/go.sum b/go.sum index e89257a4..8284a6c6 100644 --- a/go.sum +++ b/go.sum @@ -157,6 +157,8 @@ github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+ github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= +github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 h1:pRhl55Yx1eC7BZ1N+BBWwnKaMyD8uC+34TLdndZMAKk= +github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0/go.mod h1:XKMd7iuf/RGPSMJ/U4HP0zS2Z9Fh8Ps9a+6X26m/tmI= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 h1:Wqo399gCIufwto+VfwCSvsnfGpF/w5E9CNxSwbpD6No= @@ -479,10 +481,15 @@ golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSm golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= google.golang.org/genproto v0.0.0-20240123012728-ef4313101c80 h1:KAeGQVN3M9nD0/bQXnr/ClcEMJ968gUXJQ9pwfSynuQ= google.golang.org/genproto v0.0.0-20240123012728-ef4313101c80/go.mod h1:cc8bqMqtv9gMOr0zHg2Vzff5ULhhL2IXP4sbcn32Dro= +google.golang.org/genproto v0.0.0-20240205150955-31a09d347014 h1:g/4bk7P6TPMkAUbUhquq98xey1slwvuVJPosdBqYJlU= google.golang.org/genproto/googleapis/api v0.0.0-20240123012728-ef4313101c80 h1:Lj5rbfG876hIAYFjqiJnPHfhXbv+nzTWfm04Fg/XSVU= google.golang.org/genproto/googleapis/api v0.0.0-20240123012728-ef4313101c80/go.mod h1:4jWUdICTdgc3Ibxmr8nAJiiLHwQBY0UI0XZcEMaFKaA= +google.golang.org/genproto/googleapis/api v0.0.0-20240125205218-1f4bbc51befe h1:0poefMBYvYbs7g5UkjS6HcxBPaTRAmznle9jnxYoAI8= +google.golang.org/genproto/googleapis/api v0.0.0-20240125205218-1f4bbc51befe/go.mod h1:4jWUdICTdgc3Ibxmr8nAJiiLHwQBY0UI0XZcEMaFKaA= google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 h1:AjyfHzEPEFp/NpvfN5g+KDla3EMojjhRVZc1i7cj+oM= google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80/go.mod h1:PAREbraiVEVGVdTZsVWjSbbTtSyGbAgIIvni8a8CD5s= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240213162025-012b6fc9bca9 h1:hZB7eLIaYlW9qXRfCq/qDaPdbeY3757uARz5Vvfv+cY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240213162025-012b6fc9bca9/go.mod h1:YUWgXUFRPfoYK1IHMuxH5K6nPEXSCzIMljnQ59lLRCk= google.golang.org/grpc v1.62.1 h1:B4n+nfKzOICUXMgyrNd19h/I9oH0L1pizfk1d4zSgTk= google.golang.org/grpc v1.62.1/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= diff --git a/pkg/output/xatu/config.go b/pkg/output/xatu/config.go index 5783bac4..902d2232 100644 --- a/pkg/output/xatu/config.go +++ b/pkg/output/xatu/config.go @@ -17,11 +17,6 @@ type Config struct { Retry RetryConfig `yaml:"retry"` } -type RetryConfig struct { - Interval time.Duration `yaml:"interval" default:"5s"` - MaxAttempts int `yaml:"maxAttempts" default:"5"` -} - func (c *Config) Validate() error { if c.Address == "" { return errors.New("address is required") @@ -29,3 +24,9 @@ func (c *Config) Validate() error { return nil } + +type RetryConfig struct { + Enabled bool `yaml:"enabled" default:"true"` + Scalar time.Duration `yaml:"scalar" default:"0.5s"` + MaxAttempts int `yaml:"maxAttempts" default:"3"` +} diff --git a/pkg/output/xatu/exporter.go b/pkg/output/xatu/exporter.go index de8112bb..c5469ac3 100644 --- a/pkg/output/xatu/exporter.go +++ b/pkg/output/xatu/exporter.go @@ -7,12 +7,14 @@ import ( "github.com/ethpandaops/xatu/pkg/observability" pb "github.com/ethpandaops/xatu/pkg/proto/xatu" + "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/sirupsen/logrus" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" + grpc_codes "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/encoding/gzip" @@ -27,27 +29,12 @@ type ItemExporter struct { conn *grpc.ClientConn } -var ( - retryPolicy = `{ - "methodConfig": [{ - "name": [{"service": "xatu.EventIngester"}], - "waitForReady": false, - "retryPolicy": { - "MaxAttempts": 5, - "InitialBackoff": "1s", - "MaxBackoff": "15s", - "BackoffMultiplier": 1.5, - "RetryableStatusCodes": [ "UNAVAILABLE", "UNKNOWN", "INTERNAL" ] - } - }]} - ` -) - func NewItemExporter(name string, config *Config, log logrus.FieldLogger) (ItemExporter, error) { opts := []grpc.DialOption{ grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor), grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor), - grpc.WithDefaultServiceConfig(retryPolicy), + grpc.WithStreamInterceptor(retry.StreamClientInterceptor()), + grpc.WithUnaryInterceptor(retry.UnaryClientInterceptor()), } if config.TLS { @@ -112,7 +99,32 @@ func (e *ItemExporter) sendUpstream(ctx context.Context, items []*pb.DecoratedEv var err error - rsp, err = e.client.CreateEvents(ctx, req, grpc.UseCompressor(gzip.Name)) + opts := []grpc.CallOption{ + grpc.UseCompressor(gzip.Name), + } + + if e.config.Retry.Enabled { + opts = append(opts, + retry.WithOnRetryCallback(func(ctx context.Context, attempt uint, err error) { + logCtx. + WithField("attempt", attempt). + WithError(err). + Warn("Failed to export events. Retrying...") + }), + retry.WithMax(uint(e.config.Retry.MaxAttempts)), + retry.WithBackoff(retry.BackoffExponential(e.config.Retry.Scalar)), + retry.WithCodes( + grpc_codes.Unavailable, + grpc_codes.Internal, + grpc_codes.ResourceExhausted, + grpc_codes.Unknown, + grpc_codes.Unauthenticated, + grpc_codes.Canceled, + ), + ) + } + + rsp, err = e.client.CreateEvents(ctx, req, opts...) if err != nil { return err }