Skip to content

Commit

Permalink
feat: Use middleware
Browse files Browse the repository at this point in the history
  • Loading branch information
samcm committed Apr 8, 2024
1 parent 3f1f170 commit 1c2e8dc
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 25 deletions.
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/gorilla/websocket v1.5.1 // indirect
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
Expand Down Expand Up @@ -150,8 +151,8 @@ require (
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.18.0 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240123012728-ef4313101c80 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240125205218-1f4bbc51befe // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240213162025-012b6fc9bca9 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
lukechampine.com/blake3 v1.2.1 // indirect
rsc.io/tmplfunc v0.0.3 // indirect
Expand Down
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+
github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 h1:pRhl55Yx1eC7BZ1N+BBWwnKaMyD8uC+34TLdndZMAKk=
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0/go.mod h1:XKMd7iuf/RGPSMJ/U4HP0zS2Z9Fh8Ps9a+6X26m/tmI=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 h1:Wqo399gCIufwto+VfwCSvsnfGpF/w5E9CNxSwbpD6No=
Expand Down Expand Up @@ -479,10 +481,15 @@ golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSm
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90=
google.golang.org/genproto v0.0.0-20240123012728-ef4313101c80 h1:KAeGQVN3M9nD0/bQXnr/ClcEMJ968gUXJQ9pwfSynuQ=
google.golang.org/genproto v0.0.0-20240123012728-ef4313101c80/go.mod h1:cc8bqMqtv9gMOr0zHg2Vzff5ULhhL2IXP4sbcn32Dro=
google.golang.org/genproto v0.0.0-20240205150955-31a09d347014 h1:g/4bk7P6TPMkAUbUhquq98xey1slwvuVJPosdBqYJlU=
google.golang.org/genproto/googleapis/api v0.0.0-20240123012728-ef4313101c80 h1:Lj5rbfG876hIAYFjqiJnPHfhXbv+nzTWfm04Fg/XSVU=
google.golang.org/genproto/googleapis/api v0.0.0-20240123012728-ef4313101c80/go.mod h1:4jWUdICTdgc3Ibxmr8nAJiiLHwQBY0UI0XZcEMaFKaA=
google.golang.org/genproto/googleapis/api v0.0.0-20240125205218-1f4bbc51befe h1:0poefMBYvYbs7g5UkjS6HcxBPaTRAmznle9jnxYoAI8=
google.golang.org/genproto/googleapis/api v0.0.0-20240125205218-1f4bbc51befe/go.mod h1:4jWUdICTdgc3Ibxmr8nAJiiLHwQBY0UI0XZcEMaFKaA=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 h1:AjyfHzEPEFp/NpvfN5g+KDla3EMojjhRVZc1i7cj+oM=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80/go.mod h1:PAREbraiVEVGVdTZsVWjSbbTtSyGbAgIIvni8a8CD5s=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240213162025-012b6fc9bca9 h1:hZB7eLIaYlW9qXRfCq/qDaPdbeY3757uARz5Vvfv+cY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240213162025-012b6fc9bca9/go.mod h1:YUWgXUFRPfoYK1IHMuxH5K6nPEXSCzIMljnQ59lLRCk=
google.golang.org/grpc v1.62.1 h1:B4n+nfKzOICUXMgyrNd19h/I9oH0L1pizfk1d4zSgTk=
google.golang.org/grpc v1.62.1/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
Expand Down
11 changes: 6 additions & 5 deletions pkg/output/xatu/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@ type Config struct {
Retry RetryConfig `yaml:"retry"`
}

type RetryConfig struct {
Interval time.Duration `yaml:"interval" default:"5s"`
MaxAttempts int `yaml:"maxAttempts" default:"5"`
}

func (c *Config) Validate() error {
if c.Address == "" {
return errors.New("address is required")
}

return nil
}

type RetryConfig struct {
Enabled bool `yaml:"enabled" default:"true"`
Scalar time.Duration `yaml:"scalar" default:"0.5s"`
MaxAttempts int `yaml:"maxAttempts" default:"3"`
}
48 changes: 30 additions & 18 deletions pkg/output/xatu/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ import (

"github.com/ethpandaops/xatu/pkg/observability"
pb "github.com/ethpandaops/xatu/pkg/proto/xatu"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
grpc_codes "google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/encoding/gzip"
Expand All @@ -27,27 +29,12 @@ type ItemExporter struct {
conn *grpc.ClientConn
}

var (
retryPolicy = `{
"methodConfig": [{
"name": [{"service": "xatu.EventIngester"}],
"waitForReady": false,
"retryPolicy": {
"MaxAttempts": 5,
"InitialBackoff": "1s",
"MaxBackoff": "15s",
"BackoffMultiplier": 1.5,
"RetryableStatusCodes": [ "UNAVAILABLE", "UNKNOWN", "INTERNAL" ]
}
}]}
`
)

func NewItemExporter(name string, config *Config, log logrus.FieldLogger) (ItemExporter, error) {
opts := []grpc.DialOption{
grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
grpc.WithDefaultServiceConfig(retryPolicy),
grpc.WithStreamInterceptor(retry.StreamClientInterceptor()),
grpc.WithUnaryInterceptor(retry.UnaryClientInterceptor()),
}

if config.TLS {
Expand Down Expand Up @@ -112,7 +99,32 @@ func (e *ItemExporter) sendUpstream(ctx context.Context, items []*pb.DecoratedEv

var err error

rsp, err = e.client.CreateEvents(ctx, req, grpc.UseCompressor(gzip.Name))
opts := []grpc.CallOption{
grpc.UseCompressor(gzip.Name),
}

if e.config.Retry.Enabled {
opts = append(opts,
retry.WithOnRetryCallback(func(ctx context.Context, attempt uint, err error) {
logCtx.
WithField("attempt", attempt).
WithError(err).
Warn("Failed to export events. Retrying...")
}),
retry.WithMax(uint(e.config.Retry.MaxAttempts)),
retry.WithBackoff(retry.BackoffExponential(e.config.Retry.Scalar)),
retry.WithCodes(
grpc_codes.Unavailable,
grpc_codes.Internal,
grpc_codes.ResourceExhausted,
grpc_codes.Unknown,
grpc_codes.Unauthenticated,
grpc_codes.Canceled,
),
)
}

rsp, err = e.client.CreateEvents(ctx, req, opts...)
if err != nil {
return err
}
Expand Down

0 comments on commit 1c2e8dc

Please sign in to comment.