
fix: race condition issue on batch consumer (#33)
* fix: race condition issue on batch consumer

* refactor: migrate from manual commit to auto commit in order to prevent race condition issues
A.Samet İleri authored Sep 6, 2023
1 parent efcead4 commit 11ed120
Showing 8 changed files with 65 additions and 60 deletions.
15 changes: 9 additions & 6 deletions README.md
@@ -143,6 +143,7 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap
| `logLevel` | Describes log level; valid options are `debug`, `info`, `warn`, and `error` | info |
| `concurrency` | Number of goroutines used by listeners | 1 |
| `retryEnabled` | Whether the retry/exception consumer is enabled | false |
| `commitInterval` | Indicates the interval at which offsets are committed to the broker | 1s |
| `rack` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#RackAffinityGroupBalancer) | |
| `clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Dialer) | |
| `dial.Timeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Dialer) | no timeout |
@@ -181,9 +182,11 @@ Kafka Konsumer offers an API that handles exposing several metrics.

### Exposed Metrics

| Metric Name | Description | Value Type |
|-------------------------------------------------|---------------------------------------------|------------|
| kafka_konsumer_processed_messages_total | Total number of processed messages. | Counter |
| kafka_konsumer_processed_batch_messages_total | Total number of processed batch messages. | Counter |
| kafka_konsumer_unprocessed_messages_total | Total number of unprocessed messages. | Counter |
| kafka_konsumer_unprocessed_batch_messages_total | Total number of unprocessed batch messages. | Counter |
| Metric Name | Description | Value Type |
|---------------------------------------------------------|---------------------------------------------|------------|
| kafka_konsumer_processed_messages_total_current | Total number of processed messages. | Counter |
| kafka_konsumer_unprocessed_messages_total_current | Total number of unprocessed messages. | Counter |
| kafka_konsumer_processed_batch_messages_total_current | Total number of processed batch messages. | Counter |
| kafka_konsumer_unprocessed_batch_messages_total_current | Total number of unprocessed batch messages. | Counter |

**NOTE:** `kafka_konsumer_processed_batch_messages_total_current` and `kafka_konsumer_unprocessed_batch_messages_total_current` will be deprecated in upcoming releases. Please use `kafka_konsumer_processed_messages_total_current` and `kafka_konsumer_unprocessed_messages_total_current` instead.
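To illustrate the new `commitInterval` option from the configuration table above, here is a minimal sketch of a consumer setup that overrides it. It assumes the public API as used in the repository's examples (`kafka.NewConsumer`, `Consume`, `Stop`); the broker address, topic, and group id are placeholders.

```go
package main

import (
	"fmt"
	"time"

	"github.com/Trendyol/kafka-konsumer"
)

func main() {
	consumerCfg := &kafka.ConsumerConfig{
		Reader: kafka.ReaderConfig{
			Brokers: []string{"localhost:29092"}, // placeholder broker
			Topic:   "standart-topic",            // placeholder topic
			GroupID: "standart-cg",               // placeholder group id
		},
		// Offsets are now flushed to the broker on this interval instead of
		// being committed manually per message/batch; leaving it zero falls
		// back to the 1s default from the table above.
		CommitInterval: 5 * time.Second,
		ConsumeFn: func(message kafka.Message) error {
			fmt.Printf("Message from %s with value %s\n", message.Topic, string(message.Value))
			return nil
		},
	}

	consumer, _ := kafka.NewConsumer(consumerCfg)
	defer consumer.Stop()

	consumer.Consume()
}
```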
34 changes: 12 additions & 22 deletions batch_consumer.go
@@ -1,11 +1,9 @@
package kafka

import (
"context"
"time"

kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka"
"github.com/segmentio/kafka-go"
)

type batchConsumer struct {
@@ -43,25 +41,27 @@ func newBatchConsumer(cfg *ConsumerConfig) (Consumer, error) {
return &c, nil
}

func (b *batchConsumer) GetMetric() *ConsumerMetric {
return b.metric
}

func (b *batchConsumer) Consume() {
go b.base.subprocesses.Start()
b.base.wg.Add(1)
go b.base.startConsume()
go b.subprocesses.Start()
b.wg.Add(1)
go b.startConsume()

for i := 0; i < b.concurrency; i++ {
b.wg.Add(1)
go b.startBatch()
}
}

func (b *batchConsumer) GetMetric() *ConsumerMetric {
return b.metric
}

func (b *batchConsumer) startBatch() {
defer b.wg.Done()

ticker := time.NewTicker(b.messageGroupDuration)
defer ticker.Stop()

messages := make([]Message, 0, b.messageGroupLimit)

for {
@@ -95,6 +95,8 @@ func (b *batchConsumer) process(messages []Message) {

// Try to process same message again
if consumeErr = b.consumeFn(messages); consumeErr != nil {
b.metric.TotalUnprocessedMessagesCounter += int64(len(messages))

b.logger.Warnf("Consume Function Again Err %s, messages are sending to exception/retry topic %s", consumeErr.Error(), b.retryTopic)

cronsumerMessages := make([]kcronsumer.Message, 0, len(messages))
@@ -108,17 +110,5 @@ func (b *batchConsumer) process(messages []Message) {
}
}

segmentioMessages := make([]kafka.Message, 0, len(messages))
for i := range messages {
segmentioMessages = append(segmentioMessages, kafka.Message(messages[i]))
}

commitErr := b.r.CommitMessages(context.Background(), segmentioMessages...)
if commitErr != nil {
b.metric.TotalUnprocessedBatchMessagesCounter++
b.logger.Error("Error Committing messages %s", commitErr.Error())
return
}

b.metric.TotalProcessedBatchMessagesCounter++
b.metric.TotalProcessedMessagesCounter += int64(len(messages))
}
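The removed tail of `process` is the core of the fix: every one of the `concurrency` worker goroutines used to call `CommitMessages` on the shared reader, and those concurrent manual commits are what the commit message calls the race condition. Commits now happen inside the reader on `commitInterval`, so `startBatch` only accumulates and processes. Below is a minimal sketch (not the library's exact code) of that ticker-driven batching; `Message`, `batchLoop`, and the input channel are illustrative stand-ins.

```go
package sketch

import "time"

// Message is an illustrative stand-in for the library's message type.
type Message struct {
	Key, Value []byte
}

// batchLoop flushes the accumulated batch either when it reaches
// messageGroupLimit or when messageGroupDuration elapses, whichever
// comes first. Note that nothing here commits offsets anymore.
func batchLoop(in <-chan Message, limit int, groupDuration time.Duration, process func([]Message)) {
	ticker := time.NewTicker(groupDuration)
	defer ticker.Stop()

	batch := make([]Message, 0, limit)
	flush := func() {
		if len(batch) == 0 {
			return
		}
		process(batch)
		batch = batch[:0]
	}

	for {
		select {
		case <-ticker.C:
			flush()
		case msg, ok := <-in:
			if !ok {
				flush() // drain the remainder on shutdown
				return
			}
			batch = append(batch, msg)
			if len(batch) == limit {
				flush()
			}
		}
	}
}
```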
12 changes: 7 additions & 5 deletions collector.go
@@ -11,10 +11,12 @@ const Name = "kafka_konsumer"
type metricCollector struct {
consumerMetric *ConsumerMetric

totalUnprocessedMessagesCounter *prometheus.Desc
totalProcessedMessagesCounter *prometheus.Desc
totalUnprocessedMessagesCounter *prometheus.Desc
totalProcessedMessagesCounter *prometheus.Desc
// Deprecated: it will be removed in future releases
totalUnprocessedBatchMessagesCounter *prometheus.Desc
totalProcessedBatchMessagesCounter *prometheus.Desc
// Deprecated: it will be removed in future releases
totalProcessedBatchMessagesCounter *prometheus.Desc
}

func (s *metricCollector) Describe(ch chan<- *prometheus.Desc) {
@@ -38,14 +40,14 @@ func (s *metricCollector) Collect(ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(
s.totalProcessedBatchMessagesCounter,
prometheus.CounterValue,
float64(s.consumerMetric.TotalProcessedBatchMessagesCounter),
float64(s.consumerMetric.TotalProcessedMessagesCounter),
[]string{}...,
)

ch <- prometheus.MustNewConstMetric(
s.totalUnprocessedBatchMessagesCounter,
prometheus.CounterValue,
float64(s.consumerMetric.TotalUnprocessedBatchMessagesCounter),
float64(s.consumerMetric.TotalUnprocessedMessagesCounter),
[]string{}...,
)
}
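For context, `collector.go` implements Prometheus's custom-collector interface, and after this change the deprecated batch descriptors are fed from the per-message counters, so existing dashboards keep reporting sensible values during the deprecation window. A condensed sketch of the pattern follows; the names are illustrative, not the library's exports.

```go
package sketch

import "github.com/prometheus/client_golang/prometheus"

type counters struct{ processed, unprocessed int64 }

type collector struct {
	c           *counters
	processed   *prometheus.Desc
	unprocessed *prometheus.Desc
}

func newCollector(c *counters) *collector {
	return &collector{
		c:           c,
		processed:   prometheus.NewDesc("kafka_konsumer_processed_messages_total_current", "Total number of processed messages.", nil, nil),
		unprocessed: prometheus.NewDesc("kafka_konsumer_unprocessed_messages_total_current", "Total number of unprocessed messages.", nil, nil),
	}
}

func (s *collector) Describe(ch chan<- *prometheus.Desc) {
	ch <- s.processed
	ch <- s.unprocessed
}

func (s *collector) Collect(ch chan<- prometheus.Metric) {
	// Counters are snapshotted into const metrics on every scrape.
	ch <- prometheus.MustNewConstMetric(s.processed, prometheus.CounterValue, float64(s.c.processed))
	ch <- prometheus.MustNewConstMetric(s.unprocessed, prometheus.CounterValue, float64(s.c.unprocessed))
}
```

Registering it with `prometheus.MustRegister(newCollector(&counters{}))` makes the values appear on each scrape.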
17 changes: 3 additions & 14 deletions consumer.go
@@ -1,10 +1,6 @@
package kafka

import (
"context"

"github.com/segmentio/kafka-go"

kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka"
)

@@ -39,9 +35,9 @@ func newSingleConsumer(cfg *ConsumerConfig) (Consumer, error) {
}

func (c *consumer) Consume() {
go c.base.subprocesses.Start()
go c.subprocesses.Start()
c.wg.Add(1)
go c.base.startConsume()
go c.startConsume()

for i := 0; i < c.concurrency; i++ {
c.wg.Add(1)
@@ -57,12 +53,12 @@ func (c *consumer) Consume() {

func (c *consumer) process(message Message) {
consumeErr := c.consumeFn(message)

if consumeErr != nil && c.retryEnabled {
c.logger.Warnf("Consume Function Err %s, Message will be retried", consumeErr.Error())

// Try to process same message again
if consumeErr = c.consumeFn(message); consumeErr != nil {
c.metric.TotalUnprocessedMessagesCounter++
c.logger.Warnf("Consume Function Again Err %s, message is sending to exception/retry topic %s", consumeErr.Error(), c.retryTopic)

retryableMsg := message.toRetryableMessage(c.retryTopic)
@@ -73,12 +69,5 @@ func (c *consumer) process(message Message) {
}
}

commitErr := c.r.CommitMessages(context.Background(), kafka.Message(message))
if commitErr != nil {
c.metric.TotalUnprocessedMessagesCounter++
c.logger.Errorf("Error Committing message %s, %s", string(message.Value), commitErr.Error())
return
}

c.metric.TotalProcessedMessagesCounter++
}
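The single-message path mirrors the batch one: the manual `CommitMessages` call after processing is gone, leaving only consume, one in-process retry, and hand-off to the retry topic. A condensed sketch of that flow, with illustrative stand-in types and callbacks:

```go
package sketch

// Message is an illustrative stand-in for the library's message type.
type Message struct{ Key, Value []byte }

// process shows the flow that remains after this commit: consume, retry once
// in-process, then forward to the exception/retry topic (kafka-cronsumer in
// the real code). No manual commit follows; the reader auto-commits.
func process(msg Message, consume func(Message) error, sendToRetryTopic func(Message), retryEnabled bool) {
	err := consume(msg)
	if err != nil && retryEnabled {
		// Try the same message once more before handing it off.
		if err = consume(msg); err != nil {
			sendToRetryTopic(msg)
		}
	}
}
```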
2 changes: 1 addition & 1 deletion consumer_base.go
@@ -95,7 +95,7 @@ func (c *base) startConsume() {
case <-c.quit:
return
default:
message, err := c.r.FetchMessage(c.context)
message, err := c.r.ReadMessage(c.context)
if err != nil {
if c.context.Err() != nil {
continue
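This one-line swap is what actually moves commit responsibility into kafka-go: `FetchMessage` never commits and expects an explicit `CommitMessages` call, while `ReadMessage` on a reader with a `GroupID` commits the offset automatically, synchronously when `CommitInterval` is zero and asynchronously on the interval otherwise. A minimal standalone sketch against segmentio/kafka-go (broker, topic, and group id are placeholders):

```go
package main

import (
	"context"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:        []string{"localhost:9092"}, // placeholder
		GroupID:        "example-cg",               // placeholder
		Topic:          "example-topic",            // placeholder
		CommitInterval: time.Second,
	})
	defer r.Close()

	for {
		// ReadMessage fetches AND marks the offset as consumed; with a
		// GroupID set, kafka-go commits it (here, asynchronously every
		// second). FetchMessage, by contrast, never commits and expects
		// an explicit CommitMessages call -- the pattern this commit removes.
		msg, err := r.ReadMessage(context.Background())
		if err != nil {
			break
		}
		_ = msg // hand the message to the processing goroutines
	}
}
```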
9 changes: 9 additions & 0 deletions consumer_config.go
@@ -34,6 +34,7 @@ type ConsumerConfig struct {
LogLevel LogLevel
Reader ReaderConfig
RetryConfiguration RetryConfiguration
CommitInterval time.Duration
Concurrency int
RetryEnabled bool
APIEnabled bool
@@ -156,4 +157,12 @@ func (cfg *ConsumerConfig) validate() {
if cfg.Concurrency == 0 {
cfg.Concurrency = 1
}

if cfg.CommitInterval == 0 {
cfg.CommitInterval = time.Second
// The kafka-go default value is 0, so we need to change this as well.
cfg.Reader.CommitInterval = time.Second
} else {
cfg.Reader.CommitInterval = cfg.CommitInterval
}
}
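The override in `validate` matters because kafka-go interprets a zero `CommitInterval` as "commit synchronously on every `ReadMessage`"; left at its zero value, the auto-commit migration would mean one blocking broker round trip per message. Defaulting both the wrapper's and the reader's interval to 1s keeps commits batched. The rule, as a standalone sketch:

```go
package sketch

import "time"

// normalizeCommitInterval mirrors the defaulting rule validate() applies:
// fall back to 1s and always propagate the value to the kafka-go reader,
// since kafka-go's zero value means synchronous per-message commits.
func normalizeCommitInterval(configured time.Duration) (effective time.Duration) {
	if configured == 0 {
		return time.Second
	}
	return configured
}
```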
8 changes: 5 additions & 3 deletions metric.go
@@ -1,8 +1,10 @@
package kafka

type ConsumerMetric struct {
TotalUnprocessedMessagesCounter int64
TotalProcessedMessagesCounter int64
TotalUnprocessedMessagesCounter int64
TotalProcessedMessagesCounter int64
// Deprecated: use TotalUnprocessedMessagesCounter instead
TotalUnprocessedBatchMessagesCounter int64
TotalProcessedBatchMessagesCounter int64
// Deprecated: use TotalProcessedMessagesCounter instead
TotalProcessedBatchMessagesCounter int64
}
28 changes: 19 additions & 9 deletions test/integration/integration_test.go
@@ -82,7 +82,6 @@ func Test_Should_Consume_Message_Successfully(t *testing.T) {
messageCh <- message
return nil
},
Dial: &kafka.DialConfig{KeepAlive: 150 * time.Second, Timeout: 30 * time.Second},
}

consumer, _ := kafka.NewConsumer(consumerCfg)
@@ -92,9 +91,10 @@ func Test_Should_Consume_Message_Successfully(t *testing.T) {

// When
produceMessages(t, conn, segmentio.Message{
Topic: topic,
Key: []byte("1"),
Value: []byte(`foo`),
Topic: topic,
Partition: 0,
Key: []byte("1"),
Value: []byte(`foo`),
})

// Then
@@ -105,6 +105,11 @@ func Test_Should_Consume_Message_Successfully(t *testing.T) {
if string(actual.Key) != "1" {
t.Fatalf("Key does not equal %s", actual.Key)
}

o, _ := conn.ReadLastOffset()
if o != 1 {
t.Fatalf("offset %v must be equal to 1", o)
}
}

func Test_Should_Batch_Consume_Messages_Successfully(t *testing.T) {
@@ -137,11 +142,11 @@ func Test_Should_Batch_Consume_Messages_Successfully(t *testing.T) {

// When
produceMessages(t, conn,
segmentio.Message{Topic: topic, Key: []byte("1"), Value: []byte(`foo1`)},
segmentio.Message{Topic: topic, Key: []byte("2"), Value: []byte(`foo2`)},
segmentio.Message{Topic: topic, Key: []byte("3"), Value: []byte(`foo3`)},
segmentio.Message{Topic: topic, Key: []byte("4"), Value: []byte(`foo4`)},
segmentio.Message{Topic: topic, Key: []byte("5"), Value: []byte(`foo5`)},
segmentio.Message{Topic: topic, Partition: 0, Offset: 1, Key: []byte("1"), Value: []byte(`foo1`)},
segmentio.Message{Topic: topic, Partition: 0, Offset: 2, Key: []byte("2"), Value: []byte(`foo2`)},
segmentio.Message{Topic: topic, Partition: 0, Offset: 3, Key: []byte("3"), Value: []byte(`foo3`)},
segmentio.Message{Topic: topic, Partition: 0, Offset: 4, Key: []byte("4"), Value: []byte(`foo4`)},
segmentio.Message{Topic: topic, Partition: 0, Offset: 5, Key: []byte("5"), Value: []byte(`foo5`)},
)

// Then
@@ -150,6 +155,11 @@ func Test_Should_Batch_Consume_Messages_Successfully(t *testing.T) {
if actual != 5 {
t.Fatalf("Message length does not equal %d", actual)
}

o, _ := conn.ReadLastOffset()
if o != 5 {
t.Fatalf("offset %v must be equal to 5", o)
}
}
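The new assertions rely on kafka-go's `Conn.ReadLastOffset`, which returns the partition's last (log-end) offset, so they verify that every produced record actually landed on the broker. A hypothetical helper capturing the same check (`assertLastOffset` and the broker address are not part of the test suite):

```go
package integration

import (
	"context"
	"testing"

	segmentio "github.com/segmentio/kafka-go"
)

// assertLastOffset checks that the partition's log-end offset matches the
// number of records the test produced.
func assertLastOffset(t *testing.T, topic string, want int64) {
	t.Helper()

	// Placeholder broker address; the real tests reuse an existing conn.
	conn, err := segmentio.DialLeader(context.Background(), "tcp", "localhost:9092", topic, 0)
	if err != nil {
		t.Fatal(err)
	}
	defer conn.Close()

	got, err := conn.ReadLastOffset()
	if err != nil {
		t.Fatal(err)
	}
	if got != want {
		t.Fatalf("last offset %d, want %d", got, want)
	}
}
```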

func Test_Should_Integrate_With_Kafka_Cronsumer_Successfully(t *testing.T) {
