Skip to content

Commit

Permalink
Merge pull request #1005 from openmeterio/feat-kafka-metrics
Browse files Browse the repository at this point in the history
feat: add Kafka client metrics reporting to ingest services
  • Loading branch information
chrisgacsal authored Jun 13, 2024
2 parents f58db4e + 1fa627a commit 7375634
Show file tree
Hide file tree
Showing 16 changed files with 2,558 additions and 4 deletions.
9 changes: 8 additions & 1 deletion cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/openmeterio/openmeter/internal/ingest/ingestdriver"
"github.com/openmeterio/openmeter/internal/ingest/kafkaingest"
"github.com/openmeterio/openmeter/internal/ingest/kafkaingest/serializer"
kafkametrics "github.com/openmeterio/openmeter/internal/kafka/metrics"
"github.com/openmeterio/openmeter/internal/meter"
"github.com/openmeterio/openmeter/internal/namespace"
"github.com/openmeterio/openmeter/internal/namespace/namespacedriver"
Expand Down Expand Up @@ -430,8 +431,14 @@ func initKafkaIngest(ctx context.Context, config config.Configuration, logger *s
return nil, nil, fmt.Errorf("init kafka ingest: %w", err)
}

// Initialize Kafka Client Statistics reporter
kafkaMetrics, err := kafkametrics.New(metricMeter)
if err != nil {
return nil, nil, fmt.Errorf("failed to create Kafka client metrics: %w", err)
}

// TODO: move kafkaingest.KafkaProducerGroup to pkg/kafka
group.Add(kafkaingest.KafkaProducerGroup(ctx, producer, logger))
group.Add(kafkaingest.KafkaProducerGroup(ctx, producer, logger, kafkaMetrics))

go pkgkafka.ConsumeLogChannel(producer, logger.WithGroup("kafka").WithGroup("producer"))

Expand Down
6 changes: 6 additions & 0 deletions config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ telemetry:
log:
level: debug

#ingest:
# kafka:
# # To enable stats reporting, set this value to >= 5s.
# # Setting this value to 0 explicitly disables reporting.
# statsInterval: 5s

# dedupe:
# enabled: true
# driver: redis
Expand Down
11 changes: 11 additions & 0 deletions config/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"strings"
"time"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/spf13/viper"
Expand All @@ -30,6 +31,8 @@ type KafkaIngestConfiguration struct {
SaslPassword string
Partitions int
EventsTopicTemplate string

StatsInterval time.Duration
}

// CreateKafkaConfig creates a Kafka config map.
Expand Down Expand Up @@ -64,6 +67,10 @@ func (c KafkaIngestConfiguration) CreateKafkaConfig() kafka.ConfigMap {
config["sasl.password"] = c.SaslPassword
}

if c.StatsInterval > 0 {
config["statistics.interval.ms"] = int(c.StatsInterval.Milliseconds())
}

return config
}

Expand All @@ -77,6 +84,10 @@ func (c KafkaIngestConfiguration) Validate() error {
return errors.New("events topic template is required")
}

if c.StatsInterval > 0 && c.StatsInterval < 5*time.Second {
return errors.New("StatsInterval must be >=5s")
}

return nil
}

Expand Down
24 changes: 23 additions & 1 deletion internal/ingest/kafkaingest/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,19 @@ package kafkaingest

import (
"context"
"encoding/json"
"fmt"
"log/slog"
"time"

"github.com/cloudevents/sdk-go/v2/event"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/openmeterio/openmeter/internal/ingest/kafkaingest/serializer"
kafkametrics "github.com/openmeterio/openmeter/internal/kafka/metrics"
kafkastats "github.com/openmeterio/openmeter/internal/kafka/metrics/stats"
)

// Collector is a receiver of events that handles sending those events to a downstream Kafka broker.
Expand Down Expand Up @@ -104,7 +108,7 @@ func (s Collector) Close() {
s.Producer.Close()
}

func KafkaProducerGroup(ctx context.Context, producer *kafka.Producer, logger *slog.Logger) (execute func() error, interrupt func(error)) {
func KafkaProducerGroup(ctx context.Context, producer *kafka.Producer, logger *slog.Logger, kafkaMetrics *kafkametrics.Metrics) (execute func() error, interrupt func(error)) {
ctx, cancel := context.WithCancel(ctx)
return func() error {
for {
Expand All @@ -122,6 +126,24 @@ func KafkaProducerGroup(ctx context.Context, producer *kafka.Producer, logger *s
} else {
logger.Debug("kafka message delivered", "topic", *m.TopicPartition.Topic, "partition", m.TopicPartition.Partition, "offset", m.TopicPartition.Offset)
}
case *kafka.Stats:
// Report Kafka client metrics
if kafkaMetrics == nil {
continue
}

go func() {
var stats kafkastats.Stats

if err := json.Unmarshal([]byte(e.String()), &stats); err != nil {
logger.Warn("failed to unmarshal Kafka client stats", slog.String("err", err.Error()))
}

ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

kafkaMetrics.Add(ctx, &stats)
}()
case kafka.Error:
// Generic client instance-level errors, such as
// broker connection failures, authentication issues, etc.
Expand Down
Loading

0 comments on commit 7375634

Please sign in to comment.