diff --git a/pkg/processor/batch.go b/pkg/processor/batch.go index 924a7195..52d25e42 100644 --- a/pkg/processor/batch.go +++ b/pkg/processor/batch.go @@ -135,10 +135,7 @@ func NewBatchItemProcessor[T any](exporter ItemExporter[T], name string, log log opt(&o) } - metrics, err := NewMetrics("xatu", name) - if err != nil { - return nil, err - } + metrics := DefaultMetrics bvp := BatchItemProcessor[T]{ e: exporter, @@ -165,10 +162,14 @@ func NewBatchItemProcessor[T any](exporter ItemExporter[T], name string, log log // OnEnd method enqueues a item for later processing. func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error { + bvp.metrics.SetItemsQueued(bvp.name, float64(len(bvp.queue))) + if bvp.o.ShippingMethod == ShippingMethodSync { return bvp.ImmediatelyExportItems(ctx, s) } + bvp.metrics.SetItemsQueued(bvp.name, float64(len(bvp.queue))) + // Do not enqueue items if we are just going to drop them. if bvp.e == nil { return nil @@ -205,7 +206,7 @@ func (bvp *BatchItemProcessor[T]) ImmediatelyExportItems(ctx context.Context, it err := bvp.exportWithTimeout(ctx, itemsBatch) - bvp.metrics.IncItemsExportedBy(float64(len(itemsBatch))) + bvp.metrics.IncItemsExportedBy(bvp.name, float64(len(itemsBatch))) if err != nil { return err @@ -329,8 +330,6 @@ func WithShippingMethod(method ShippingMethod) BatchItemProcessorOption { // exportItems is a subroutine of processing and draining the queue. func (bvp *BatchItemProcessor[T]) exportItems(ctx context.Context) error { - bvp.metrics.SetItemsQueued(float64(len(bvp.queue))) - bvp.timer.Reset(bvp.o.BatchTimeout) bvp.batchMutex.Lock() @@ -351,13 +350,9 @@ func (bvp *BatchItemProcessor[T]) exportItems(ctx context.Context) error { "total_dropped": atomic.LoadUint32(&bvp.dropped), }).Debug("exporting items") - defer func() { - bvp.metrics.SetItemsDropped(float64(atomic.LoadUint32(&bvp.dropped))) - }() - err := bvp.e.ExportItems(ctx, bvp.batch) - bvp.metrics.IncItemsExportedBy(float64(countItemsToExport)) + bvp.metrics.IncItemsExportedBy(bvp.name, float64(countItemsToExport)) // A new batch is always created after exporting, even if the batch failed to be exported. // @@ -476,6 +471,7 @@ func (bvp *BatchItemProcessor[T]) enqueueDrop(ctx context.Context, sd *T) bool { return true default: atomic.AddUint32(&bvp.dropped, 1) + bvp.metrics.IncItemsDroppedBy(bvp.name, float64(1)) } return false diff --git a/pkg/processor/metrics.go b/pkg/processor/metrics.go index 796d0a19..6f20ab5c 100644 --- a/pkg/processor/metrics.go +++ b/pkg/processor/metrics.go @@ -2,15 +2,17 @@ package processor import "github.com/prometheus/client_golang/prometheus" -type Metrics struct { - name string +var ( + DefaultMetrics = NewMetrics("xatu") +) +type Metrics struct { itemsQueued *prometheus.GaugeVec - itemsDropped *prometheus.GaugeVec + itemsDropped *prometheus.CounterVec itemsExported *prometheus.CounterVec } -func NewMetrics(namespace, name string) (*Metrics, error) { +func NewMetrics(namespace string) *Metrics { if namespace != "" { namespace += "_" } @@ -23,7 +25,7 @@ func NewMetrics(namespace, name string) (*Metrics, error) { Namespace: namespace, Help: "Number of items queued", }, []string{"processor"}), - itemsDropped: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + itemsDropped: prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "items_dropped_total", Namespace: namespace, Help: "Number of items dropped", @@ -35,33 +37,21 @@ func NewMetrics(namespace, name string) (*Metrics, error) { }, []string{"processor"}), } - dupMetricsCollectorError := "duplicate metrics collector registration attempted" - - if err := prometheus.Register(m.itemsQueued); err != nil && err.Error() != dupMetricsCollectorError { - return nil, err - } - - if err := prometheus.Register(m.itemsDropped); err != nil && err.Error() != dupMetricsCollectorError { - return nil, err - } - - if err := prometheus.Register(m.itemsExported); err != nil && err.Error() != dupMetricsCollectorError { - return nil, err - } - - m.name = name + prometheus.MustRegister(m.itemsQueued) + prometheus.MustRegister(m.itemsDropped) + prometheus.MustRegister(m.itemsExported) - return m, nil + return m } -func (m *Metrics) SetItemsQueued(count float64) { - m.itemsQueued.WithLabelValues(m.name).Set(count) +func (m *Metrics) SetItemsQueued(name string, count float64) { + m.itemsQueued.WithLabelValues(name).Set(count) } -func (m *Metrics) SetItemsDropped(count float64) { - m.itemsDropped.WithLabelValues(m.name).Set(count) +func (m *Metrics) IncItemsDroppedBy(name string, count float64) { + m.itemsDropped.WithLabelValues(name).Add(count) } -func (m *Metrics) IncItemsExportedBy(count float64) { - m.itemsExported.WithLabelValues(m.name).Add(count) +func (m *Metrics) IncItemsExportedBy(name string, count float64) { + m.itemsExported.WithLabelValues(name).Add(count) }