Skip to content

Commit

Permalink
feat: Add new prometheus counter for failed items
Browse files Browse the repository at this point in the history
  • Loading branch information
samcm committed Apr 5, 2024
1 parent 9c6ac96 commit 1d8e0c9
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 24 deletions.
44 changes: 20 additions & 24 deletions pkg/output/xatu/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"net"
"time"

"github.com/ethpandaops/xatu/pkg/observability"
pb "github.com/ethpandaops/xatu/pkg/proto/xatu"
Expand All @@ -14,12 +13,10 @@ import (
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
grpcCodes "google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)

type ItemExporter struct {
Expand All @@ -30,10 +27,27 @@ type ItemExporter struct {
conn *grpc.ClientConn
}

// retryPolicy is the default gRPC service config handed to
// grpc.WithDefaultServiceConfig when the exporter dials the sink. It applies
// to every method of the xatu.EventIngester service:
//
//   - waitForReady makes calls wait for the connection to become ready
//     instead of failing immediately.
//   - the retry policy lets the gRPC client transparently retry calls that
//     fail with UNAVAILABLE or UNKNOWN, up to MaxAttempts attempts in total.
//
// BackoffMultiplier must be greater than 1 for the backoff ceiling to grow
// between attempts. With a multiplier of 1.0 every retry is scheduled within
// InitialBackoff (10ms) of the previous failure, so all attempts are spent
// almost instantly and MaxBackoff never comes into play.
//
// It is a constant because the policy is never modified at runtime.
const retryPolicy = `{
	"methodConfig": [{
		"name": [{"service": "xatu.EventIngester"}],
		"waitForReady": true,
		"retryPolicy": {
			"MaxAttempts": 5,
			"InitialBackoff": ".01s",
			"MaxBackoff": "15s",
			"BackoffMultiplier": 2.0,
			"RetryableStatusCodes": [ "UNAVAILABLE", "UNKNOWN" ]
		}
	}]}
`

func NewItemExporter(name string, config *Config, log logrus.FieldLogger) (ItemExporter, error) {
opts := []grpc.DialOption{
grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
grpc.WithDefaultServiceConfig(retryPolicy),
}

if config.TLS {
Expand Down Expand Up @@ -98,27 +112,9 @@ func (e *ItemExporter) sendUpstream(ctx context.Context, items []*pb.DecoratedEv

var err error

for attempt := 0; attempt < e.config.Retry.MaxAttempts; attempt++ {
rsp, err = e.client.CreateEvents(ctx, req, grpc.UseCompressor(gzip.Name))
if err != nil {
st, ok := status.FromError(err)

if ok && st.Code() == grpcCodes.Unknown || st.Code() == grpcCodes.Unavailable {
logCtx.
WithField("attempt", attempt+1).
WithField("max_attempts", e.config.Retry.MaxAttempts).
WithField("code", st.Code()).
Warn("Transient error occurred when exporting items, retrying...")

time.Sleep(e.config.Retry.Interval)

continue
}

return err
}

break
rsp, err = e.client.CreateEvents(ctx, req, grpc.UseCompressor(gzip.Name))
if err != nil {
return err
}

logCtx.WithField("response", rsp).Debug("Received response from Xatu sink")
Expand Down
4 changes: 4 additions & 0 deletions pkg/processor/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ func (bvp *BatchItemProcessor[T]) ImmediatelyExportItems(ctx context.Context, it
err := bvp.exportWithTimeout(ctx, itemsBatch)

if err != nil {
bvp.metrics.IncItemsFailedBy(bvp.name, float64(len(itemsBatch)))

return err
}
}
Expand All @@ -250,6 +252,8 @@ func (bvp *BatchItemProcessor[T]) exportWithTimeout(ctx context.Context, itemsBa

err := bvp.e.ExportItems(ctx, itemsBatch)
if err != nil {
bvp.metrics.IncItemsFailedBy(bvp.name, float64(len(itemsBatch)))

return err
}

Expand Down
11 changes: 11 additions & 0 deletions pkg/processor/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ var (
// Metrics bundles the Prometheus collectors used to observe items as they
// move through a processor.
type Metrics struct {
	// itemsQueued is the gauge vector for queued items.
	itemsQueued *prometheus.GaugeVec

	// Counter vectors for items that were dropped, failed, or exported.
	itemsDropped, itemsFailed, itemsExported *prometheus.CounterVec
}

Expand All @@ -30,6 +31,11 @@ func NewMetrics(namespace string) *Metrics {
Namespace: namespace,
Help: "Number of items dropped",
}, []string{"processor"}),
itemsFailed: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "items_failed_total",
Namespace: namespace,
Help: "Number of items failed",
}, []string{"processor"}),
itemsExported: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "items_exported_total",
Namespace: namespace,
Expand All @@ -39,6 +45,7 @@ func NewMetrics(namespace string) *Metrics {

prometheus.MustRegister(m.itemsQueued)
prometheus.MustRegister(m.itemsDropped)
prometheus.MustRegister(m.itemsFailed)
prometheus.MustRegister(m.itemsExported)

return m
Expand All @@ -55,3 +62,7 @@ func (m *Metrics) IncItemsDroppedBy(name string, count float64) {
// IncItemsExportedBy adds count to the exported-items counter whose label
// value is name.
func (m *Metrics) IncItemsExportedBy(name string, count float64) {
	exported := m.itemsExported.WithLabelValues(name)
	exported.Add(count)
}

// IncItemsFailedBy adds count to the failed-items counter whose label value
// is name.
func (m *Metrics) IncItemsFailedBy(name string, count float64) {
	failed := m.itemsFailed.WithLabelValues(name)
	failed.Add(count)
}

0 comments on commit 1d8e0c9

Please sign in to comment.