Skip to content

Commit

Permalink
fix(processor): Metrics fix
Browse files Browse the repository at this point in the history
  • Loading branch information
samcm committed Sep 29, 2023
1 parent f7abb76 commit 89e231d
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 39 deletions.
20 changes: 8 additions & 12 deletions pkg/processor/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,7 @@ func NewBatchItemProcessor[T any](exporter ItemExporter[T], name string, log log
opt(&o)
}

metrics, err := NewMetrics("xatu", name)
if err != nil {
return nil, err
}
metrics := DefaultMetrics

bvp := BatchItemProcessor[T]{
e: exporter,
Expand All @@ -165,10 +162,14 @@ func NewBatchItemProcessor[T any](exporter ItemExporter[T], name string, log log

// OnEnd method enqueues a item for later processing.
func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error {
bvp.metrics.SetItemsQueued(bvp.name, float64(len(bvp.queue)))

if bvp.o.ShippingMethod == ShippingMethodSync {
return bvp.ImmediatelyExportItems(ctx, s)
}

bvp.metrics.SetItemsQueued(bvp.name, float64(len(bvp.queue)))

// Do not enqueue items if we are just going to drop them.
if bvp.e == nil {
return nil
Expand Down Expand Up @@ -205,7 +206,7 @@ func (bvp *BatchItemProcessor[T]) ImmediatelyExportItems(ctx context.Context, it

err := bvp.exportWithTimeout(ctx, itemsBatch)

bvp.metrics.IncItemsExportedBy(float64(len(itemsBatch)))
bvp.metrics.IncItemsExportedBy(bvp.name, float64(len(itemsBatch)))

if err != nil {
return err
Expand Down Expand Up @@ -329,8 +330,6 @@ func WithShippingMethod(method ShippingMethod) BatchItemProcessorOption {

// exportItems is a subroutine of processing and draining the queue.
func (bvp *BatchItemProcessor[T]) exportItems(ctx context.Context) error {
bvp.metrics.SetItemsQueued(float64(len(bvp.queue)))

bvp.timer.Reset(bvp.o.BatchTimeout)

bvp.batchMutex.Lock()
Expand All @@ -351,13 +350,9 @@ func (bvp *BatchItemProcessor[T]) exportItems(ctx context.Context) error {
"total_dropped": atomic.LoadUint32(&bvp.dropped),
}).Debug("exporting items")

defer func() {
bvp.metrics.SetItemsDropped(float64(atomic.LoadUint32(&bvp.dropped)))
}()

err := bvp.e.ExportItems(ctx, bvp.batch)

bvp.metrics.IncItemsExportedBy(float64(countItemsToExport))
bvp.metrics.IncItemsExportedBy(bvp.name, float64(countItemsToExport))

// A new batch is always created after exporting, even if the batch failed to be exported.
//
Expand Down Expand Up @@ -476,6 +471,7 @@ func (bvp *BatchItemProcessor[T]) enqueueDrop(ctx context.Context, sd *T) bool {
return true
default:
atomic.AddUint32(&bvp.dropped, 1)
bvp.metrics.IncItemsDroppedBy(bvp.name, float64(1))
}

return false
Expand Down
44 changes: 17 additions & 27 deletions pkg/processor/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@ package processor

import "github.com/prometheus/client_golang/prometheus"

type Metrics struct {
name string
var (
DefaultMetrics = NewMetrics("xatu")
)

type Metrics struct {
itemsQueued *prometheus.GaugeVec
itemsDropped *prometheus.GaugeVec
itemsDropped *prometheus.CounterVec
itemsExported *prometheus.CounterVec
}

func NewMetrics(namespace, name string) (*Metrics, error) {
func NewMetrics(namespace string) *Metrics {
if namespace != "" {
namespace += "_"
}
Expand All @@ -23,7 +25,7 @@ func NewMetrics(namespace, name string) (*Metrics, error) {
Namespace: namespace,
Help: "Number of items queued",
}, []string{"processor"}),
itemsDropped: prometheus.NewGaugeVec(prometheus.GaugeOpts{
itemsDropped: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "items_dropped_total",
Namespace: namespace,
Help: "Number of items dropped",
Expand All @@ -35,33 +37,21 @@ func NewMetrics(namespace, name string) (*Metrics, error) {
}, []string{"processor"}),
}

dupMetricsCollectorError := "duplicate metrics collector registration attempted"

if err := prometheus.Register(m.itemsQueued); err != nil && err.Error() != dupMetricsCollectorError {
return nil, err
}

if err := prometheus.Register(m.itemsDropped); err != nil && err.Error() != dupMetricsCollectorError {
return nil, err
}

if err := prometheus.Register(m.itemsExported); err != nil && err.Error() != dupMetricsCollectorError {
return nil, err
}

m.name = name
prometheus.MustRegister(m.itemsQueued)
prometheus.MustRegister(m.itemsDropped)
prometheus.MustRegister(m.itemsExported)

return m, nil
return m
}

func (m *Metrics) SetItemsQueued(count float64) {
m.itemsQueued.WithLabelValues(m.name).Set(count)
func (m *Metrics) SetItemsQueued(name string, count float64) {
m.itemsQueued.WithLabelValues(name).Set(count)
}

func (m *Metrics) SetItemsDropped(count float64) {
m.itemsDropped.WithLabelValues(m.name).Set(count)
func (m *Metrics) IncItemsDroppedBy(name string, count float64) {
m.itemsDropped.WithLabelValues(name).Add(count)
}

func (m *Metrics) IncItemsExportedBy(count float64) {
m.itemsExported.WithLabelValues(m.name).Add(count)
func (m *Metrics) IncItemsExportedBy(name string, count float64) {
m.itemsExported.WithLabelValues(name).Add(count)
}

0 comments on commit 89e231d

Please sign in to comment.