Skip to content

Commit

Permalink
feat(output): Add Xatu retries
Browse files Browse the repository at this point in the history
  • Loading branch information
samcm committed Apr 5, 2024
1 parent ec1c84a commit 9428f7c
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 4 deletions.
6 changes: 6 additions & 0 deletions pkg/output/xatu/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ type Config struct {
ExportTimeout time.Duration `yaml:"exportTimeout" default:"30s"`
MaxExportBatchSize int `yaml:"maxExportBatchSize" default:"512"`
Workers int `yaml:"workers" default:"1"`
Retry RetryConfig `yaml:"retry"`
}

type RetryConfig struct {
Interval time.Duration `yaml:"interval" default:"5s"`
MaxAttempts int `yaml:"maxAttempts" default:"5"`
}

func (c *Config) Validate() error {
Expand Down
35 changes: 31 additions & 4 deletions pkg/output/xatu/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net"
"time"

"github.com/ethpandaops/xatu/pkg/observability"
pb "github.com/ethpandaops/xatu/pkg/proto/xatu"
Expand All @@ -13,10 +14,12 @@ import (
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
grpcCodes "google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)

type ItemExporter struct {
Expand Down Expand Up @@ -86,15 +89,39 @@ func (e *ItemExporter) sendUpstream(ctx context.Context, items []*pb.DecoratedEv
Events: items,
}

logCtx := e.log.WithField("num_events", len(items))

md := metadata.New(e.config.Headers)
ctx = metadata.NewOutgoingContext(ctx, md)

rsp, err := e.client.CreateEvents(ctx, req, grpc.UseCompressor(gzip.Name))
if err != nil {
return err
var rsp *pb.CreateEventsResponse

var err error

for attempt := 0; attempt < e.config.Retry.MaxAttempts; attempt++ {
rsp, err = e.client.CreateEvents(ctx, req, grpc.UseCompressor(gzip.Name))
if err != nil {
st, ok := status.FromError(err)

if ok && st.Code() == grpcCodes.Unknown || st.Code() == grpcCodes.Unavailable {
logCtx.
WithField("attempt", attempt+1).
WithField("max_attempts", e.config.Retry.MaxAttempts).
WithField("code", st.Code()).
Warn("Transient error occurred when exporting items, retrying...")

time.Sleep(e.config.Retry.Interval)

continue
}

return err
}

break
}

e.log.WithField("response", rsp).Debug("Received response from Xatu sink")
logCtx.WithField("response", rsp).Debug("Received response from Xatu sink")

return nil
}
13 changes: 13 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,19 @@ func (x *Xatu) startGrpcServer(ctx context.Context) error {
Time: 1 * time.Minute,
Timeout: 15 * time.Second,
}),
grpc.ChainUnaryInterceptor(
func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
resp, err := handler(ctx, req)
if err != nil {
x.log.
WithField("method", info.FullMethod).
WithError(err).
Error("RPC Error")
}

return resp, err
},
),
}
x.grpcServer = grpc.NewServer(opts...)

Expand Down

0 comments on commit 9428f7c

Please sign in to comment.