diff --git a/plugin/kotel/meter.go b/plugin/kotel/meter.go index 7ac92abb..8c56a8c7 100644 --- a/plugin/kotel/meter.go +++ b/plugin/kotel/meter.go @@ -33,6 +33,8 @@ type Meter struct { provider metric.MeterProvider meter metric.Meter instruments instruments + + mergeConnectsMeter bool } // MeterOpt interface used for setting optional config properties. @@ -52,6 +54,20 @@ func MeterProvider(provider metric.MeterProvider) MeterOpt { }) } +// WithMergedConnectsMeter merges the `messaging.kafka.connect_errors.count` +// counter into the `messaging.kafka.connects.count` counter, adding an +// attribute "outcome" with the values "success" or "failure". This option +// shall be used when a single metric with different dimensions is preferred +// over two separate metrics that produce data at alternating intervals. +// For example, it becomes possible to alert on the metric no longer +// producing data. +func WithMergedConnectsMeter() MeterOpt { + return meterOptFunc(func(m *Meter) { + m.mergeConnectsMeter = true + }) + +} + func (o meterOptFunc) apply(m *Meter) { o(m) } @@ -105,13 +121,17 @@ func (m *Meter) newInstruments() instruments { log.Printf("failed to create connects instrument, %v", err) } - connectErrs, err := m.meter.Int64Counter( - "messaging.kafka.connect_errors.count", - metric.WithUnit(dimensionless), - metric.WithDescription("Total number of connection errors, by broker"), - ) - if err != nil { - log.Printf("failed to create connectErrs instrument, %v", err) + var connectErrs metric.Int64Counter + if !m.mergeConnectsMeter { + var err error + connectErrs, err = m.meter.Int64Counter( + "messaging.kafka.connect_errors.count", + metric.WithUnit(dimensionless), + metric.WithDescription("Total number of connection errors, by broker"), + ) + if err != nil { + log.Printf("failed to create connectErrs instrument, %v", err) + } } disconnects, err := m.meter.Int64Counter( @@ -232,6 +252,30 @@ func strnode(node int32) string { func (m *Meter) OnBrokerConnect(meta kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) { node := strnode(meta.NodeID) + + if m.mergeConnectsMeter { + if err != nil { + m.instruments.connects.Add( + context.Background(), + 1, + metric.WithAttributeSet(attribute.NewSet( + attribute.String("node_id", node), + attribute.String("outcome", "failure"), + )), + ) + return + } + m.instruments.connects.Add( + context.Background(), + 1, + metric.WithAttributeSet(attribute.NewSet( + attribute.String("node_id", node), + attribute.String("outcome", "success"), + )), + ) + return + } + attributes := attribute.NewSet(attribute.String("node_id", node)) if err != nil { m.instruments.connectErrs.Add( diff --git a/plugin/kotel/meter_test.go b/plugin/kotel/meter_test.go index ed9fe6f9..35630bc3 100644 --- a/plugin/kotel/meter_test.go +++ b/plugin/kotel/meter_test.go @@ -1,11 +1,21 @@ package kotel import ( + "context" + "errors" + "net" + "strconv" "testing" + "time" "github.com/stretchr/testify/assert" + "github.com/twmb/franz-go/pkg/kgo" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/noop" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" semconv "go.opentelemetry.io/otel/semconv/v1.12.0" ) @@ -38,3 +48,166 @@ func TestWithMeter(t *testing.T) { }) } } + +func TestHook_OnBrokerConnect(t *testing.T) { + t.Run("success path with mergeConnectsMeter:false", func(t *testing.T) { + r := sdkmetric.NewManualReader() + mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(r)) + m := NewMeter(MeterProvider(mp)) + + meta := kgo.BrokerMetadata{NodeID: 1} + m.OnBrokerConnect(meta, time.Second, &net.TCPConn{}, nil) + + rm := metricdata.ResourceMetrics{} + if err := r.Collect(context.Background(), &rm); err != nil { + t.Errorf("unexpected error collecting metrics: %s", err) + } + + want := metricdata.Metrics{ + Name: "messaging.kafka.connects.count", + Description: "Total number of connections opened, by broker", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + Attributes: attribute.NewSet( + attribute.String("node_id", strconv.Itoa(int(meta.NodeID))), + ), + }, + }, + }, + } + + if len(rm.ScopeMetrics) != 1 { + t.Errorf("expecting only 1 metrics in meter but got %d", len(rm.ScopeMetrics)) + } + + metricdatatest.AssertEqual(t, want, rm.ScopeMetrics[0].Metrics[0], + metricdatatest.IgnoreTimestamp(), + ) + }) + t.Run("failure path with mergeConnectsMeter:false", func(t *testing.T) { + r := sdkmetric.NewManualReader() + mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(r)) + m := NewMeter(MeterProvider(mp)) + + meta := kgo.BrokerMetadata{NodeID: 1} + m.OnBrokerConnect(meta, time.Second, &net.TCPConn{}, errors.New("whatever error")) + + rm := metricdata.ResourceMetrics{} + if err := r.Collect(context.Background(), &rm); err != nil { + t.Errorf("unexpected error collecting metrics: %s", err) + } + + want := metricdata.Metrics{ + Name: "messaging.kafka.connect_errors.count", + Description: "Total number of connection errors, by broker", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + Attributes: attribute.NewSet( + attribute.String("node_id", strconv.Itoa(int(meta.NodeID))), + ), + }, + }, + }, + } + + if len(rm.ScopeMetrics) != 1 { + t.Errorf("expecting only 1 metrics in meter but got %d", len(rm.ScopeMetrics)) + } + + metricdatatest.AssertEqual(t, want, rm.ScopeMetrics[0].Metrics[0], + metricdatatest.IgnoreTimestamp(), + ) + }) + + t.Run("success path with mergeConnectsMeter:true", func(t *testing.T) { + r := sdkmetric.NewManualReader() + mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(r)) + m := NewMeter(MeterProvider(mp), WithMergedConnectsMeter()) + + meta := kgo.BrokerMetadata{NodeID: 1} + m.OnBrokerConnect(meta, time.Second, &net.TCPConn{}, nil) + + rm := metricdata.ResourceMetrics{} + if err := r.Collect(context.Background(), &rm); err != nil { + t.Errorf("unexpected error collecting metrics: %s", err) + } + + want := metricdata.Metrics{ + Name: "messaging.kafka.connects.count", + Description: "Total number of connections opened, by broker", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + Attributes: attribute.NewSet( + attribute.String("node_id", strconv.Itoa(int(meta.NodeID))), + attribute.String("outcome", "success"), + ), + }, + }, + }, + } + + if len(rm.ScopeMetrics) != 1 { + t.Errorf("expecting only 1 metrics in meter but got %d", len(rm.ScopeMetrics)) + } + + metricdatatest.AssertEqual(t, want, rm.ScopeMetrics[0].Metrics[0], + metricdatatest.IgnoreTimestamp(), + ) + }) + t.Run("failure path with mergeConnectsMeter:true", func(t *testing.T) { + r := sdkmetric.NewManualReader() + mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(r)) + m := NewMeter(MeterProvider(mp), WithMergedConnectsMeter()) + + meta := kgo.BrokerMetadata{NodeID: 1} + m.OnBrokerConnect(meta, time.Second, &net.TCPConn{}, errors.New("whatever error")) + + rm := metricdata.ResourceMetrics{} + if err := r.Collect(context.Background(), &rm); err != nil { + t.Errorf("unexpected error collecting metrics: %s", err) + } + + want := metricdata.Metrics{ + Name: "messaging.kafka.connects.count", + Description: "Total number of connections opened, by broker", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 1, + Attributes: attribute.NewSet( + attribute.String("node_id", strconv.Itoa(int(meta.NodeID))), + attribute.String("outcome", "failure"), + ), + }, + }, + }, + } + + if len(rm.ScopeMetrics) != 1 { + t.Errorf("expecting only 1 metrics in meter but got %d", len(rm.ScopeMetrics)) + } + + metricdatatest.AssertEqual(t, want, rm.ScopeMetrics[0].Metrics[0], + metricdatatest.IgnoreTimestamp(), + ) + }) + +}