Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(processor): Metrics fix #219

Merged
merged 1 commit into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 8 additions & 12 deletions pkg/processor/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,7 @@ func NewBatchItemProcessor[T any](exporter ItemExporter[T], name string, log log
opt(&o)
}

metrics, err := NewMetrics("xatu", name)
if err != nil {
return nil, err
}
metrics := DefaultMetrics

bvp := BatchItemProcessor[T]{
e: exporter,
Expand All @@ -165,10 +162,14 @@ func NewBatchItemProcessor[T any](exporter ItemExporter[T], name string, log log

// OnEnd method enqueues a item for later processing.
func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error {
bvp.metrics.SetItemsQueued(bvp.name, float64(len(bvp.queue)))

if bvp.o.ShippingMethod == ShippingMethodSync {
return bvp.ImmediatelyExportItems(ctx, s)
}

bvp.metrics.SetItemsQueued(bvp.name, float64(len(bvp.queue)))

// Do not enqueue items if we are just going to drop them.
if bvp.e == nil {
return nil
Expand Down Expand Up @@ -205,7 +206,7 @@ func (bvp *BatchItemProcessor[T]) ImmediatelyExportItems(ctx context.Context, it

err := bvp.exportWithTimeout(ctx, itemsBatch)

bvp.metrics.IncItemsExportedBy(float64(len(itemsBatch)))
bvp.metrics.IncItemsExportedBy(bvp.name, float64(len(itemsBatch)))

if err != nil {
return err
Expand Down Expand Up @@ -329,8 +330,6 @@ func WithShippingMethod(method ShippingMethod) BatchItemProcessorOption {

// exportItems is a subroutine of processing and draining the queue.
func (bvp *BatchItemProcessor[T]) exportItems(ctx context.Context) error {
bvp.metrics.SetItemsQueued(float64(len(bvp.queue)))

bvp.timer.Reset(bvp.o.BatchTimeout)

bvp.batchMutex.Lock()
Expand All @@ -351,13 +350,9 @@ func (bvp *BatchItemProcessor[T]) exportItems(ctx context.Context) error {
"total_dropped": atomic.LoadUint32(&bvp.dropped),
}).Debug("exporting items")

defer func() {
bvp.metrics.SetItemsDropped(float64(atomic.LoadUint32(&bvp.dropped)))
}()

err := bvp.e.ExportItems(ctx, bvp.batch)

bvp.metrics.IncItemsExportedBy(float64(countItemsToExport))
bvp.metrics.IncItemsExportedBy(bvp.name, float64(countItemsToExport))

// A new batch is always created after exporting, even if the batch failed to be exported.
//
Expand Down Expand Up @@ -476,6 +471,7 @@ func (bvp *BatchItemProcessor[T]) enqueueDrop(ctx context.Context, sd *T) bool {
return true
default:
atomic.AddUint32(&bvp.dropped, 1)
bvp.metrics.IncItemsDroppedBy(bvp.name, float64(1))
}

return false
Expand Down
44 changes: 17 additions & 27 deletions pkg/processor/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@ package processor

import "github.com/prometheus/client_golang/prometheus"

type Metrics struct {
name string
var (
DefaultMetrics = NewMetrics("xatu")
)

type Metrics struct {
itemsQueued *prometheus.GaugeVec
itemsDropped *prometheus.GaugeVec
itemsDropped *prometheus.CounterVec
itemsExported *prometheus.CounterVec
}

func NewMetrics(namespace, name string) (*Metrics, error) {
func NewMetrics(namespace string) *Metrics {
if namespace != "" {
namespace += "_"
}
Expand All @@ -23,7 +25,7 @@ func NewMetrics(namespace, name string) (*Metrics, error) {
Namespace: namespace,
Help: "Number of items queued",
}, []string{"processor"}),
itemsDropped: prometheus.NewGaugeVec(prometheus.GaugeOpts{
itemsDropped: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "items_dropped_total",
Namespace: namespace,
Help: "Number of items dropped",
Expand All @@ -35,33 +37,21 @@ func NewMetrics(namespace, name string) (*Metrics, error) {
}, []string{"processor"}),
}

dupMetricsCollectorError := "duplicate metrics collector registration attempted"

if err := prometheus.Register(m.itemsQueued); err != nil && err.Error() != dupMetricsCollectorError {
return nil, err
}

if err := prometheus.Register(m.itemsDropped); err != nil && err.Error() != dupMetricsCollectorError {
return nil, err
}

if err := prometheus.Register(m.itemsExported); err != nil && err.Error() != dupMetricsCollectorError {
return nil, err
}

m.name = name
prometheus.MustRegister(m.itemsQueued)
prometheus.MustRegister(m.itemsDropped)
prometheus.MustRegister(m.itemsExported)

return m, nil
return m
}

func (m *Metrics) SetItemsQueued(count float64) {
m.itemsQueued.WithLabelValues(m.name).Set(count)
func (m *Metrics) SetItemsQueued(name string, count float64) {
m.itemsQueued.WithLabelValues(name).Set(count)
}

func (m *Metrics) SetItemsDropped(count float64) {
m.itemsDropped.WithLabelValues(m.name).Set(count)
func (m *Metrics) IncItemsDroppedBy(name string, count float64) {
m.itemsDropped.WithLabelValues(name).Add(count)
}

func (m *Metrics) IncItemsExportedBy(count float64) {
m.itemsExported.WithLabelValues(m.name).Add(count)
func (m *Metrics) IncItemsExportedBy(name string, count float64) {
m.itemsExported.WithLabelValues(name).Add(count)
}
Loading