From 4173486ebb686602219db28537be154bf8c0c906 Mon Sep 17 00:00:00 2001 From: Edoardo Tenani Date: Mon, 11 Mar 2024 18:09:27 +0100 Subject: [PATCH 1/4] add tests Add metrics tests for both old and new behaviour --- plugin/kotel/meter_test.go | 173 +++++++++++++++++++++++++++++++++++++ 1 file changed, 173 insertions(+) diff --git a/plugin/kotel/meter_test.go b/plugin/kotel/meter_test.go index ed9fe6f9..35630bc3 100644 --- a/plugin/kotel/meter_test.go +++ b/plugin/kotel/meter_test.go @@ -1,11 +1,21 @@ package kotel import ( + "context" + "errors" + "net" + "strconv" "testing" + "time" "github.com/stretchr/testify/assert" + "github.com/twmb/franz-go/pkg/kgo" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/noop" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" semconv "go.opentelemetry.io/otel/semconv/v1.12.0" ) @@ -38,3 +48,166 @@ func TestWithMeter(t *testing.T) { }) } } + +func TestHook_OnBrokerConnect(t *testing.T) { + t.Run("success path with mergeConnectsMeter:false", func(t *testing.T) { + r := sdkmetric.NewManualReader() + mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(r)) + m := NewMeter(MeterProvider(mp)) + + meta := kgo.BrokerMetadata{NodeID: 1} + m.OnBrokerConnect(meta, time.Second, &net.TCPConn{}, nil) + + rm := metricdata.ResourceMetrics{} + if err := r.Collect(context.Background(), &rm); err != nil { + t.Errorf("unexpected error collecting metrics: %s", err) + } + + want := metricdata.Metrics{ + Name: "messaging.kafka.connects.count", + Description: "Total number of connections opened, by broker", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + Attributes: attribute.NewSet( + attribute.String("node_id", strconv.Itoa(int(meta.NodeID))), + ), + }, + }, + }, + } + + if len(rm.ScopeMetrics) != 1 { + t.Errorf("expecting only 1 metrics in meter but got %d", len(rm.ScopeMetrics)) + } + + metricdatatest.AssertEqual(t, want, rm.ScopeMetrics[0].Metrics[0], + metricdatatest.IgnoreTimestamp(), + ) + }) + t.Run("failure path with mergeConnectsMeter:false", func(t *testing.T) { + r := sdkmetric.NewManualReader() + mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(r)) + m := NewMeter(MeterProvider(mp)) + + meta := kgo.BrokerMetadata{NodeID: 1} + m.OnBrokerConnect(meta, time.Second, &net.TCPConn{}, errors.New("whatever error")) + + rm := metricdata.ResourceMetrics{} + if err := r.Collect(context.Background(), &rm); err != nil { + t.Errorf("unexpected error collecting metrics: %s", err) + } + + want := metricdata.Metrics{ + Name: "messaging.kafka.connect_errors.count", + Description: "Total number of connection errors, by broker", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + Attributes: attribute.NewSet( + attribute.String("node_id", strconv.Itoa(int(meta.NodeID))), + ), + }, + }, + }, + } + + if len(rm.ScopeMetrics) != 1 { + t.Errorf("expecting only 1 metrics in meter but got %d", len(rm.ScopeMetrics)) + } + + metricdatatest.AssertEqual(t, want, rm.ScopeMetrics[0].Metrics[0], + metricdatatest.IgnoreTimestamp(), + ) + }) + + t.Run("success path with mergeConnectsMeter:true", func(t *testing.T) { + r := sdkmetric.NewManualReader() + mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(r)) + m := NewMeter(MeterProvider(mp), WithMergedConnectsMeter()) + + meta := kgo.BrokerMetadata{NodeID: 1} + m.OnBrokerConnect(meta, time.Second, &net.TCPConn{}, nil) + + rm := metricdata.ResourceMetrics{} + if err := r.Collect(context.Background(), &rm); err != nil { + t.Errorf("unexpected error collecting metrics: %s", err) + } + + want := metricdata.Metrics{ + Name: "messaging.kafka.connects.count", + Description: "Total number of connections opened, by broker", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + Attributes: attribute.NewSet( + attribute.String("node_id", strconv.Itoa(int(meta.NodeID))), + attribute.String("outcome", "success"), + ), + }, + }, + }, + } + + if len(rm.ScopeMetrics) != 1 { + t.Errorf("expecting only 1 metrics in meter but got %d", len(rm.ScopeMetrics)) + } + + metricdatatest.AssertEqual(t, want, rm.ScopeMetrics[0].Metrics[0], + metricdatatest.IgnoreTimestamp(), + ) + }) + t.Run("failure path with mergeConnectsMeter:true", func(t *testing.T) { + r := sdkmetric.NewManualReader() + mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(r)) + m := NewMeter(MeterProvider(mp), WithMergedConnectsMeter()) + + meta := kgo.BrokerMetadata{NodeID: 1} + m.OnBrokerConnect(meta, time.Second, &net.TCPConn{}, errors.New("whatever error")) + + rm := metricdata.ResourceMetrics{} + if err := r.Collect(context.Background(), &rm); err != nil { + t.Errorf("unexpected error collecting metrics: %s", err) + } + + want := metricdata.Metrics{ + Name: "messaging.kafka.connects.count", + Description: "Total number of connections opened, by broker", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + Attributes: attribute.NewSet( + attribute.String("node_id", strconv.Itoa(int(meta.NodeID))), + attribute.String("outcome", "failure"), + ), + }, + }, + }, + } + + if len(rm.ScopeMetrics) != 1 { + t.Errorf("expecting only 1 metrics in meter but got %d", len(rm.ScopeMetrics)) + } + + metricdatatest.AssertEqual(t, want, rm.ScopeMetrics[0].Metrics[0], + metricdatatest.IgnoreTimestamp(), + ) + }) + +} From ac43475e3b2deb2ff8d7c6959dd95355bb72cd22 Mon Sep 17 00:00:00 2001 From: Edoardo Tenani Date: Mon, 11 Mar 2024 18:09:47 +0100 Subject: [PATCH 2/4] add new behaviour --- plugin/kotel/meter.go | 55 +++++++++++++++++++++++++++++++++++++------ 1 file changed, 48 insertions(+), 7 deletions(-) diff --git a/plugin/kotel/meter.go b/plugin/kotel/meter.go index 7ac92abb..8232e8f1 100644 --- a/plugin/kotel/meter.go +++ b/plugin/kotel/meter.go @@ -33,6 +33,8 @@ type Meter struct { provider metric.MeterProvider meter metric.Meter instruments instruments + + mergeConnectsMeter bool } // MeterOpt interface used for setting optional config properties. @@ -52,6 +54,17 @@ func MeterProvider(provider metric.MeterProvider) MeterOpt { }) } +// WithMergedConnectsMeter merges the `messaging.kafka.connect_errors.count` +// meter into the `messaging.kafka.connects.count` meter, adding an attribute +// "outcome" with the values "success" or "failure". This option may be used because +// ... +func WithMergedConnectsMeter() MeterOpt { + return meterOptFunc(func(m *Meter) { + m.mergeConnectsMeter = true + }) + +} + func (o meterOptFunc) apply(m *Meter) { o(m) } @@ -105,13 +118,17 @@ func (m *Meter) newInstruments() instruments { log.Printf("failed to create connects instrument, %v", err) } - connectErrs, err := m.meter.Int64Counter( - "messaging.kafka.connect_errors.count", - metric.WithUnit(dimensionless), - metric.WithDescription("Total number of connection errors, by broker"), - ) - if err != nil { - log.Printf("failed to create connectErrs instrument, %v", err) + var connectErrs metric.Int64Counter + if !m.mergeConnectsMeter { + var err error + connectErrs, err = m.meter.Int64Counter( + "messaging.kafka.connect_errors.count", + metric.WithUnit(dimensionless), + metric.WithDescription("Total number of connection errors, by broker"), + ) + if err != nil { + log.Printf("failed to create connectErrs instrument, %v", err) + } } disconnects, err := m.meter.Int64Counter( @@ -232,6 +249,30 @@ func strnode(node int32) string { func (m *Meter) OnBrokerConnect(meta kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) { node := strnode(meta.NodeID) + + if m.mergeConnectsMeter { + if err != nil { + m.instruments.connects.Add( + context.Background(), + 1, + metric.WithAttributes( + attribute.String("node_id", node), + attribute.String("outcome", "failure"), + ), + ) + return + } + m.instruments.connects.Add( + context.Background(), + 1, + metric.WithAttributes( + attribute.String("node_id", node), + attribute.String("outcome", "success"), + ), + ) + return + } + attributes := attribute.NewSet(attribute.String("node_id", node)) if err != nil { m.instruments.connectErrs.Add( From 7e7b669f747ea83df048f3b10188069f34ab0bb1 Mon Sep 17 00:00:00 2001 From: Edoardo Tenani Date: Mon, 25 Mar 2024 11:52:40 +0100 Subject: [PATCH 3/4] use attribute.Set small change to align on using attribute.Set for metrics attributes. --- plugin/kotel/meter.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/plugin/kotel/meter.go b/plugin/kotel/meter.go index 8232e8f1..5cfb4efb 100644 --- a/plugin/kotel/meter.go +++ b/plugin/kotel/meter.go @@ -255,20 +255,20 @@ func (m *Meter) OnBrokerConnect(meta kgo.BrokerMetadata, _ time.Duration, _ net. m.instruments.connects.Add( context.Background(), 1, - metric.WithAttributes( + metric.WithAttributeSet(attribute.NewSet( attribute.String("node_id", node), attribute.String("outcome", "failure"), - ), + )), ) return } m.instruments.connects.Add( context.Background(), 1, - metric.WithAttributes( + metric.WithAttributeSet(attribute.NewSet( attribute.String("node_id", node), attribute.String("outcome", "success"), - ), + )), ) return } From 17f85d6e3a6936fa2953894e00acfcd886249d36 Mon Sep 17 00:00:00 2001 From: Edoardo Tenani Date: Mon, 25 Mar 2024 12:05:54 +0100 Subject: [PATCH 4/4] complete documentation --- plugin/kotel/meter.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/plugin/kotel/meter.go b/plugin/kotel/meter.go index 5cfb4efb..8c56a8c7 100644 --- a/plugin/kotel/meter.go +++ b/plugin/kotel/meter.go @@ -55,9 +55,12 @@ func MeterProvider(provider metric.MeterProvider) MeterOpt { } // WithMergedConnectsMeter merges the `messaging.kafka.connect_errors.count` -// meter into the `messaging.kafka.connects.count` meter, adding an attribute -// "outcome" with the values "success" or "failure". This option may be used because -// ... +// counter into the `messaging.kafka.connects.count` counter, adding an +// attribute "outcome" with the values "success" or "failure". This option +// shall be used when a single metric with different dimensions is preferred +// over two separate metrics that produce data at alternating intervals. +// For example, it becomes possible to alert on the metric no longer +// producing data. func WithMergedConnectsMeter() MeterOpt { return meterOptFunc(func(m *Meter) { m.mergeConnectsMeter = true