From 9428f7c9dd8784257f840256472348b66914ef5d Mon Sep 17 00:00:00 2001
From: Sam Calder-Mason
Date: Fri, 5 Apr 2024 15:43:57 +1000
Subject: [PATCH] feat(output): Add Xatu retries

---
 pkg/output/xatu/config.go   |  9 +++++++
 pkg/output/xatu/exporter.go | 51 ++++++++++++++++++++++++++++++++++---
 pkg/server/server.go        | 14 ++++++++++
 3 files changed, 70 insertions(+), 4 deletions(-)

diff --git a/pkg/output/xatu/config.go b/pkg/output/xatu/config.go
index d8900f8d..5783bac4 100644
--- a/pkg/output/xatu/config.go
+++ b/pkg/output/xatu/config.go
@@ -14,6 +14,15 @@ type Config struct {
 	ExportTimeout      time.Duration `yaml:"exportTimeout" default:"30s"`
 	MaxExportBatchSize int           `yaml:"maxExportBatchSize" default:"512"`
 	Workers            int           `yaml:"workers" default:"1"`
+	Retry              RetryConfig   `yaml:"retry"`
+}
+
+// RetryConfig controls how the exporter retries an export that fails with a
+// transient error. Interval is the pause between attempts and MaxAttempts is
+// the upper bound on attempts (at least one attempt is always made).
+type RetryConfig struct {
+	Interval    time.Duration `yaml:"interval" default:"5s"`
+	MaxAttempts int           `yaml:"maxAttempts" default:"5"`
 }
 
 func (c *Config) Validate() error {
diff --git a/pkg/output/xatu/exporter.go b/pkg/output/xatu/exporter.go
index 911a44b3..cb025427 100644
--- a/pkg/output/xatu/exporter.go
+++ b/pkg/output/xatu/exporter.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"net"
+	"time"
 
 	"github.com/ethpandaops/xatu/pkg/observability"
 	pb "github.com/ethpandaops/xatu/pkg/proto/xatu"
@@ -13,10 +14,12 @@ import (
 	"go.opentelemetry.io/otel/codes"
 	"go.opentelemetry.io/otel/trace"
 	"google.golang.org/grpc"
+	grpcCodes "google.golang.org/grpc/codes"
 	"google.golang.org/grpc/credentials"
 	"google.golang.org/grpc/credentials/insecure"
 	"google.golang.org/grpc/encoding/gzip"
 	"google.golang.org/grpc/metadata"
+	"google.golang.org/grpc/status"
 )
 
 type ItemExporter struct {
@@ -86,15 +89,55 @@ func (e *ItemExporter) sendUpstream(ctx context.Context, items []*pb.DecoratedEv
 		Events: items,
 	}
 
+	logCtx := e.log.WithField("num_events", len(items))
+
 	md := metadata.New(e.config.Headers)
 	ctx = metadata.NewOutgoingContext(ctx, md)
 
-	rsp, err := e.client.CreateEvents(ctx, req, grpc.UseCompressor(gzip.Name))
-	if err != nil {
-		return err
+	// Always make at least one attempt so that a zero or negative
+	// maxAttempts cannot silently drop the batch.
+	maxAttempts := e.config.Retry.MaxAttempts
+	if maxAttempts < 1 {
+		maxAttempts = 1
+	}
+
+	var rsp *pb.CreateEventsResponse
+
+	for attempt := 1; ; attempt++ {
+		var err error
+
+		rsp, err = e.client.CreateEvents(ctx, req, grpc.UseCompressor(gzip.Name))
+		if err == nil {
+			break
+		}
+
+		// Only genuine gRPC statuses with an Unknown or Unavailable code are
+		// treated as transient; every other error is returned immediately.
+		st, ok := status.FromError(err)
+		if !ok || (st.Code() != grpcCodes.Unknown && st.Code() != grpcCodes.Unavailable) {
+			return err
+		}
+
+		// Out of attempts: surface the last error instead of reporting success.
+		if attempt >= maxAttempts {
+			return fmt.Errorf("exporting %d items after %d attempts: %w", len(items), attempt, err)
+		}
+
+		logCtx.
+			WithField("attempt", attempt).
+			WithField("max_attempts", maxAttempts).
+			WithField("code", st.Code()).
+			Warn("Transient error occurred when exporting items, retrying...")
+
+		// Wait out the retry interval, but give up as soon as ctx is done.
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-time.After(e.config.Retry.Interval):
+		}
 	}
 
-	e.log.WithField("response", rsp).Debug("Received response from Xatu sink")
+	logCtx.WithField("response", rsp).Debug("Received response from Xatu sink")
 
 	return nil
 }
diff --git a/pkg/server/server.go b/pkg/server/server.go
index 2f6a4df5..d46bbe3b 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -237,6 +237,20 @@ func (x *Xatu) startGrpcServer(ctx context.Context) error {
 			Time:    1 * time.Minute,
 			Timeout: 15 * time.Second,
 		}),
+		// Log every unary RPC whose handler returns an error.
+		grpc.ChainUnaryInterceptor(
+			func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
+				resp, err := handler(ctx, req)
+				if err != nil {
+					x.log.
+						WithField("method", info.FullMethod).
+						WithError(err).
+						Error("RPC Error")
+				}
+
+				return resp, err
+			},
+		),
 	}
 
 	x.grpcServer = grpc.NewServer(opts...)