Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adding metric prefix config when multiple prometheus metric reg… #86

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 46 additions & 45 deletions README.md

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions batch_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import (
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/segmentio/kafka-go"

kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka"
Expand Down Expand Up @@ -43,6 +45,10 @@
return &c, nil
}

func (b *batchConsumer) GetMetricCollectors() []prometheus.Collector {
return b.base.GetMetricCollectors()

Check warning on line 49 in batch_consumer.go

View check run for this annotation

Codecov / codecov/patch

batch_consumer.go#L48-L49

Added lines #L48 - L49 were not covered by tests
}

func (b *batchConsumer) GetMetric() *ConsumerMetric {
return b.metric
}
Expand Down
20 changes: 12 additions & 8 deletions collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,20 @@

const Name = "kafka_konsumer"

type metricCollector struct {
type MetricCollector struct {
consumerMetric *ConsumerMetric

totalUnprocessedMessagesCounter *prometheus.Desc
totalProcessedMessagesCounter *prometheus.Desc
}

func (s *metricCollector) Describe(ch chan<- *prometheus.Desc) {
func (s *MetricCollector) Describe(ch chan<- *prometheus.Desc) {

Check warning on line 18 in collector.go

View check run for this annotation

Codecov / codecov/patch

collector.go#L18

Added line #L18 was not covered by tests
prometheus.DescribeByCollect(s, ch)
}

var emptyStringList []string

func (s *metricCollector) Collect(ch chan<- prometheus.Metric) {
func (s *MetricCollector) Collect(ch chan<- prometheus.Metric) {

Check warning on line 24 in collector.go

View check run for this annotation

Codecov / codecov/patch

collector.go#L24

Added line #L24 was not covered by tests
ch <- prometheus.MustNewConstMetric(
s.totalProcessedMessagesCounter,
prometheus.CounterValue,
Expand All @@ -37,18 +37,22 @@
)
}

func newMetricCollector(consumerMetric *ConsumerMetric) *metricCollector {
return &metricCollector{
func NewMetricCollector(consumerMetric *ConsumerMetric, prefix string) *MetricCollector {
if prefix == "" {
prefix = Name
}

return &MetricCollector{
consumerMetric: consumerMetric,

totalProcessedMessagesCounter: prometheus.NewDesc(
prometheus.BuildFQName(Name, "processed_messages_total", "current"),
prometheus.BuildFQName(prefix, "processed_messages_total", "current"),
"Total number of processed messages.",
emptyStringList,
nil,
),
totalUnprocessedMessagesCounter: prometheus.NewDesc(
prometheus.BuildFQName(Name, "unprocessed_messages_total", "current"),
prometheus.BuildFQName(prefix, "unprocessed_messages_total", "current"),
"Total number of unprocessed messages.",
emptyStringList,
nil,
Expand All @@ -61,7 +65,7 @@
consumerMetric *ConsumerMetric,
metricCollectors ...prometheus.Collector,
) (func(ctx *fiber.Ctx) error, error) {
prometheus.DefaultRegisterer.MustRegister(newMetricCollector(consumerMetric))
prometheus.DefaultRegisterer.MustRegister(NewMetricCollector(consumerMetric, cfg.MetricPrefix))

Check warning on line 68 in collector.go

View check run for this annotation

Codecov / codecov/patch

collector.go#L68

Added line #L68 was not covered by tests
prometheus.DefaultRegisterer.MustRegister(metricCollectors...)

fiberPrometheus := fiberprometheus.New(cfg.Reader.GroupID)
Expand Down
60 changes: 60 additions & 0 deletions collector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package kafka

import (
"reflect"
"testing"

"github.com/prometheus/client_golang/prometheus"
)

func Test_NewCollector(t *testing.T) {
t.Run("When_Default_Prefix_Value_Used", func(t *testing.T) {
cronsumerMetric := &ConsumerMetric{}
expectedTotalProcessedMessagesCounter := prometheus.NewDesc(
prometheus.BuildFQName(Name, "processed_messages_total", "current"),
"Total number of processed messages.",
emptyStringList,
nil,
)
expectedTotalUnprocessedMessagesCounter := prometheus.NewDesc(
prometheus.BuildFQName(Name, "unprocessed_messages_total", "current"),
"Total number of unprocessed messages.",
emptyStringList,
nil,
)

collector := NewMetricCollector(cronsumerMetric, "")

if !reflect.DeepEqual(collector.totalProcessedMessagesCounter, expectedTotalProcessedMessagesCounter) {
t.Errorf("Expected: %+v, Actual: %+v", collector.totalProcessedMessagesCounter, expectedTotalProcessedMessagesCounter)
}
if !reflect.DeepEqual(collector.totalUnprocessedMessagesCounter, expectedTotalUnprocessedMessagesCounter) {
t.Errorf("Expected: %+v, Actual: %+v", collector.totalUnprocessedMessagesCounter, expectedTotalUnprocessedMessagesCounter)
}
})
t.Run("When_Custom_Prefix_Value_Used", func(t *testing.T) {
cronsumerMetric := &ConsumerMetric{}
customPrefix := "custom_prefix"
expectedTotalProcessedMessagesCounter := prometheus.NewDesc(
prometheus.BuildFQName(customPrefix, "processed_messages_total", "current"),
"Total number of processed messages.",
emptyStringList,
nil,
)
expectedTotalUnprocessedMessagesCounter := prometheus.NewDesc(
prometheus.BuildFQName(customPrefix, "unprocessed_messages_total", "current"),
"Total number of unprocessed messages.",
emptyStringList,
nil,
)

collector := NewMetricCollector(cronsumerMetric, customPrefix)

if !reflect.DeepEqual(collector.totalProcessedMessagesCounter, expectedTotalProcessedMessagesCounter) {
t.Errorf("Expected: %+v, Actual: %+v", collector.totalProcessedMessagesCounter, expectedTotalProcessedMessagesCounter)
}
if !reflect.DeepEqual(collector.totalUnprocessedMessagesCounter, expectedTotalUnprocessedMessagesCounter) {
t.Errorf("Expected: %+v, Actual: %+v", collector.totalUnprocessedMessagesCounter, expectedTotalUnprocessedMessagesCounter)
}
})
}
6 changes: 6 additions & 0 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import (
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/segmentio/kafka-go"

kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka"
Expand Down Expand Up @@ -49,6 +51,10 @@
go c.startBatch()
}

func (c *consumer) GetMetricCollectors() []prometheus.Collector {
return c.base.GetMetricCollectors()

Check warning on line 55 in consumer.go

View check run for this annotation

Codecov / codecov/patch

consumer.go#L54-L55

Added lines #L54 - L55 were not covered by tests
}

func (c *consumer) startBatch() {
defer c.wg.Done()

Expand Down
18 changes: 18 additions & 0 deletions consumer_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@

// Stop for graceful shutdown. In order to avoid data loss, you have to call it!
Stop() error

// GetMetricCollectors for the purpose of making metric collectors available.
// You can register these collectors on your own http server.
// Please look at the examples/with-metric-collector directory.
GetMetricCollectors() []prometheus.Collector
}

type Reader interface {
Expand Down Expand Up @@ -56,6 +61,7 @@
retryEnabled bool
transactionalRetry bool
distributedTracingEnabled bool
metricSuffix string
}

func NewConsumer(cfg *ConsumerConfig) (Consumer, error) {
Expand Down Expand Up @@ -90,6 +96,7 @@
messageProcessedStream: make(chan struct{}, cfg.Concurrency),
singleConsumingStream: make(chan *Message, cfg.Concurrency),
batchConsumingStream: make(chan []*Message, cfg.Concurrency),
metricSuffix: cfg.MetricPrefix,

Check warning on line 99 in consumer_base.go

View check run for this annotation

Codecov / codecov/patch

consumer_base.go#L99

Added line #L99 was not covered by tests
}

if cfg.DistributedTracingEnabled {
Expand Down Expand Up @@ -120,6 +127,17 @@
c.subprocesses.Add(c.api)
}

func (c *base) GetMetricCollectors() []prometheus.Collector {
var metricCollectors []prometheus.Collector
if c.retryEnabled {
metricCollectors = c.cronsumer.GetMetricCollectors()

Check warning on line 133 in consumer_base.go

View check run for this annotation

Codecov / codecov/patch

consumer_base.go#L130-L133

Added lines #L130 - L133 were not covered by tests
}

metricCollectors = append(metricCollectors, NewMetricCollector(c.metric, c.metricSuffix))

Check warning on line 136 in consumer_base.go

View check run for this annotation

Codecov / codecov/patch

consumer_base.go#L136

Added line #L136 was not covered by tests

return metricCollectors

Check warning on line 138 in consumer_base.go

View check run for this annotation

Codecov / codecov/patch

consumer_base.go#L138

Added line #L138 was not covered by tests
}

func (c *base) startConsume() {
defer c.wg.Done()

Expand Down
13 changes: 11 additions & 2 deletions consumer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,20 @@
DistributedTracingEnabled bool
RetryEnabled bool
APIEnabled bool

// MetricPrefix is used for prometheus fq name prefix.
// If not provided, default metric prefix value is `kafka_konsumer`.
// Currently, there are two exposed prometheus metrics. `processed_messages_total` and `unprocessed_messages_total`.
// So, if default metric prefix used, metrics names are `kafka_konsumer_processed_messages_total_current` and
// `kafka_konsumer_unprocessed_messages_total_current`.
MetricPrefix string
}

func (cfg *ConsumerConfig) newCronsumerConfig() *kcronsumer.Config {
cronsumerCfg := kcronsumer.Config{
ClientID: cfg.RetryConfiguration.ClientID,
Brokers: cfg.RetryConfiguration.Brokers,
MetricPrefix: cfg.RetryConfiguration.MetricPrefix,
ClientID: cfg.RetryConfiguration.ClientID,
Brokers: cfg.RetryConfiguration.Brokers,

Check warning on line 64 in consumer_config.go

View check run for this annotation

Codecov / codecov/patch

consumer_config.go#L62-L64

Added lines #L62 - L64 were not covered by tests
Consumer: kcronsumer.ConsumerConfig{
ClientID: cfg.ClientID,
GroupID: cfg.Reader.GroupID,
Expand Down Expand Up @@ -122,6 +130,7 @@
Brokers []string
MaxRetry int
WorkDuration time.Duration
MetricPrefix string
}

type BatchConfiguration struct {
Expand Down
1 change: 1 addition & 0 deletions examples/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ services:
command: "bash -c 'echo Waiting for Kafka to be ready... && \
cub kafka-ready -b kafka:9092 1 20 && \
kafka-topics --create --topic standart-topic --if-not-exists --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1 && \
kafka-topics --create --topic another-standart-topic --if-not-exists --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1 && \
kafka-topics --create --topic retry-topic --if-not-exists --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1 && \
kafka-topics --create --topic error-topic --if-not-exists --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1 && \
sleep infinity'"
Expand Down
37 changes: 37 additions & 0 deletions examples/with-metric-collector/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
If you run this example and go to http://localhost:8000/metrics,

you can see the first and second consumer metrics, as shown below:

```
# HELP first_discarded_messages_total_current Total number of discarded messages.
# TYPE first_discarded_messages_total_current counter
first_discarded_messages_total_current 0

# HELP first_processed_messages_total_current Total number of processed messages.
# TYPE first_processed_messages_total_current counter
first_processed_messages_total_current 0

# HELP first_retried_messages_total_current Total number of retried messages.
# TYPE first_retried_messages_total_current counter
first_retried_messages_total_current 0

# HELP first_unprocessed_messages_total_current Total number of unprocessed messages.
# TYPE first_unprocessed_messages_total_current counter
first_unprocessed_messages_total_current 0

# HELP second_discarded_messages_total_current Total number of discarded messages.
# TYPE second_discarded_messages_total_current counter
second_discarded_messages_total_current 0

# HELP second_processed_messages_total_current Total number of processed messages.
# TYPE second_processed_messages_total_current counter
second_processed_messages_total_current 0

# HELP second_retried_messages_total_current Total number of retried messages.
# TYPE second_retried_messages_total_current counter
second_retried_messages_total_current 0

# HELP second_unprocessed_messages_total_current Total number of unprocessed messages.
# TYPE second_unprocessed_messages_total_current counter
second_unprocessed_messages_total_current 0
```
37 changes: 37 additions & 0 deletions examples/with-metric-collector/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package main

import (
"fmt"
"github.com/gofiber/fiber/v2"
"github.com/prometheus/client_golang/prometheus"
)

const port = 8000

// StartAPI spins up a fiber HTTP server on port 8000 and wires the given
// prometheus collectors into it through a metrics middleware. The server is
// launched on a background goroutine, so the caller is not blocked.
//
// If the metric middleware cannot be initialized, the error is reported to
// stdout and the server still starts, just without the middleware.
func StartAPI(metricCollectors ...prometheus.Collector) {
	app := fiber.New(fiber.Config{
		DisableStartupMessage:    true,
		DisableDefaultDate:       true,
		DisableHeaderNormalizing: true,
	})

	if metricMiddleware, err := NewMetricMiddleware(app, metricCollectors...); err != nil {
		fmt.Printf("metric middleware cannot be initialized: %v", err)
	} else {
		app.Use(metricMiddleware)
	}

	fmt.Printf("server starting on port %d", port)

	go listen(app)
}

// listen blocks serving the fiber app on the package-level port and reports
// any listen/serve error to stdout.
func listen(app *fiber.App) {
	addr := fmt.Sprintf(":%d", port)
	err := app.Listen(addr)
	if err != nil {
		fmt.Printf("server cannot start on port %d, err: %v", port, err)
	}
}
67 changes: 67 additions & 0 deletions examples/with-metric-collector/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package main

import (
"fmt"
"github.com/Trendyol/kafka-konsumer/v2"
"time"
)

// main starts two kafka consumers whose prometheus collectors use distinct
// metric prefixes ("first" and "second"), then exposes all collectors on a
// single metrics endpoint via StartAPI.
func main() {
	firstConsumerCfg := &kafka.ConsumerConfig{
		MetricPrefix: "first",
		Reader: kafka.ReaderConfig{
			Brokers: []string{"localhost:29092"},
			Topic:   "standart-topic",
			GroupID: "standart-cg",
		},
		RetryEnabled: true,
		RetryConfiguration: kafka.RetryConfiguration{
			MetricPrefix:  "first",
			Brokers:       []string{"localhost:29092"},
			Topic:         "error-topic",
			StartTimeCron: "*/1 * * * *",
			WorkDuration:  50 * time.Second,
			MaxRetry:      3,
		},
		ConsumeFn: consumeFn,
	}

	// Fail fast instead of discarding the error: on failure the consumer is
	// nil and the deferred Stop call below would panic.
	firstConsumer, err := kafka.NewConsumer(firstConsumerCfg)
	if err != nil {
		fmt.Printf("first consumer cannot be created: %v", err)
		return
	}
	defer firstConsumer.Stop()

	go firstConsumer.Consume()

	secondConsumerCfg := &kafka.ConsumerConfig{
		MetricPrefix: "second",
		Reader: kafka.ReaderConfig{
			Brokers: []string{"localhost:29092"},
			Topic:   "another-standart-topic",
			GroupID: "another-standart-cg",
		},
		RetryEnabled: true,
		RetryConfiguration: kafka.RetryConfiguration{
			MetricPrefix:  "second",
			Brokers:       []string{"localhost:29092"},
			Topic:         "retry-topic",
			StartTimeCron: "*/1 * * * *",
			WorkDuration:  50 * time.Second,
			MaxRetry:      3,
		},
		ConsumeFn: consumeFn,
	}

	secondConsumer, err := kafka.NewConsumer(secondConsumerCfg)
	if err != nil {
		fmt.Printf("second consumer cannot be created: %v", err)
		return
	}
	defer secondConsumer.Stop()

	go secondConsumer.Consume()

	// Register both consumers' collectors on one HTTP endpoint; the distinct
	// prefixes keep their prometheus descriptor names from colliding.
	allCollectors := append(firstConsumer.GetMetricCollectors(), secondConsumer.GetMetricCollectors()...)
	StartAPI(allCollectors...)

	// Block forever; the consumers run until the process is terminated.
	select {}
}

// consumeFn handles a single consumed message by printing its topic and
// payload to stdout. It always reports success.
func consumeFn(message *kafka.Message) error {
	topic := message.Topic
	value := string(message.Value)
	fmt.Printf("Message From %s with value %s", topic, value)
	return nil
}
Loading
Loading