From 1d8e0c97431ba2b26c19f3b910a0a2386835f9b7 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Fri, 5 Apr 2024 16:10:18 +1000 Subject: [PATCH] feat: Add new prometheus counter for failed items --- pkg/output/xatu/exporter.go | 44 +++++++++++++++++-------------------- pkg/processor/batch.go | 4 ++++ pkg/processor/metrics.go | 11 ++++++++++ 3 files changed, 35 insertions(+), 24 deletions(-) diff --git a/pkg/output/xatu/exporter.go b/pkg/output/xatu/exporter.go index cb025427..b77a6d7e 100644 --- a/pkg/output/xatu/exporter.go +++ b/pkg/output/xatu/exporter.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "net" - "time" "github.com/ethpandaops/xatu/pkg/observability" pb "github.com/ethpandaops/xatu/pkg/proto/xatu" @@ -14,12 +13,10 @@ import ( "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" - grpcCodes "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/encoding/gzip" "google.golang.org/grpc/metadata" - "google.golang.org/grpc/status" ) type ItemExporter struct { @@ -30,10 +27,27 @@ type ItemExporter struct { conn *grpc.ClientConn } +var ( + retryPolicy = `{ + "methodConfig": [{ + "name": [{"service": "xatu.EventIngester"}], + "waitForReady": true, + "retryPolicy": { + "MaxAttempts": 5, + "InitialBackoff": ".01s", + "MaxBackoff": "15s", + "BackoffMultiplier": 1.0, + "RetryableStatusCodes": [ "UNAVAILABLE", "UNKNOWN" ] + } + }]} + ` +) + func NewItemExporter(name string, config *Config, log logrus.FieldLogger) (ItemExporter, error) { opts := []grpc.DialOption{ grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor), grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor), + grpc.WithDefaultServiceConfig(retryPolicy), } if config.TLS { @@ -98,27 +112,9 @@ func (e *ItemExporter) sendUpstream(ctx context.Context, items []*pb.DecoratedEv var err error - for attempt := 0; attempt < e.config.Retry.MaxAttempts; attempt++ { - rsp, err = e.client.CreateEvents(ctx, req, grpc.UseCompressor(gzip.Name)) - if err != nil { - st, ok := status.FromError(err) - - if ok && st.Code() == grpcCodes.Unknown || st.Code() == grpcCodes.Unavailable { - logCtx. - WithField("attempt", attempt+1). - WithField("max_attempts", e.config.Retry.MaxAttempts). - WithField("code", st.Code()). - Warn("Transient error occurred when exporting items, retrying...") - - time.Sleep(e.config.Retry.Interval) - - continue - } - - return err - } - - break + rsp, err = e.client.CreateEvents(ctx, req, grpc.UseCompressor(gzip.Name)) + if err != nil { + return err } logCtx.WithField("response", rsp).Debug("Received response from Xatu sink") diff --git a/pkg/processor/batch.go b/pkg/processor/batch.go index 632539ee..61fe0f66 100644 --- a/pkg/processor/batch.go +++ b/pkg/processor/batch.go @@ -229,6 +229,8 @@ func (bvp *BatchItemProcessor[T]) ImmediatelyExportItems(ctx context.Context, it err := bvp.exportWithTimeout(ctx, itemsBatch) if err != nil { + bvp.metrics.IncItemsFailedBy(bvp.name, float64(len(itemsBatch))) + return err } } @@ -250,6 +252,8 @@ func (bvp *BatchItemProcessor[T]) exportWithTimeout(ctx context.Context, itemsBa err := bvp.e.ExportItems(ctx, itemsBatch) if err != nil { + bvp.metrics.IncItemsFailedBy(bvp.name, float64(len(itemsBatch))) + return err } diff --git a/pkg/processor/metrics.go b/pkg/processor/metrics.go index 6f20ab5c..82ff8947 100644 --- a/pkg/processor/metrics.go +++ b/pkg/processor/metrics.go @@ -9,6 +9,7 @@ var ( type Metrics struct { itemsQueued *prometheus.GaugeVec itemsDropped *prometheus.CounterVec + itemsFailed *prometheus.CounterVec itemsExported *prometheus.CounterVec } @@ -30,6 +31,11 @@ func NewMetrics(namespace string) *Metrics { Namespace: namespace, Help: "Number of items dropped", }, []string{"processor"}), + itemsFailed: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "items_failed_total", + Namespace: namespace, + Help: "Number of items failed", + }, []string{"processor"}), itemsExported: prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "items_exported_total", Namespace: namespace, @@ -39,6 +45,7 @@ func NewMetrics(namespace string) *Metrics { prometheus.MustRegister(m.itemsQueued) prometheus.MustRegister(m.itemsDropped) + prometheus.MustRegister(m.itemsFailed) prometheus.MustRegister(m.itemsExported) return m @@ -55,3 +62,7 @@ func (m *Metrics) IncItemsDroppedBy(name string, count float64) { func (m *Metrics) IncItemsExportedBy(name string, count float64) { m.itemsExported.WithLabelValues(name).Add(count) } + +func (m *Metrics) IncItemsFailedBy(name string, count float64) { + m.itemsFailed.WithLabelValues(name).Add(count) +}