Skip to content

Commit

Permalink
fix: when error come we still increase total processed message issue …
Browse files Browse the repository at this point in the history
…and add tests
  • Loading branch information
Abdulsametileri committed Sep 16, 2023
1 parent a24d3e5 commit 9abfbf2
Show file tree
Hide file tree
Showing 6 changed files with 301 additions and 23 deletions.
15 changes: 9 additions & 6 deletions batch_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,16 @@ func (b *batchConsumer) startBatch() {

func (b *batchConsumer) process(messages []Message) {
consumeErr := b.consumeFn(messages)
if consumeErr != nil && b.retryEnabled {
b.logger.Warnf("Consume Function Err %s, Messages will be retried", consumeErr.Error())

// Try to process same message again
if consumeErr != nil {
b.logger.Warnf("Consume Function Err %s, Messages will be retried", consumeErr.Error())
// Try to process same messages again
if consumeErr = b.consumeFn(messages); consumeErr != nil {
b.metric.TotalUnprocessedMessagesCounter += int64(len(messages))

b.logger.Warnf("Consume Function Again Err %s, messages are sending to exception/retry topic %s", consumeErr.Error(), b.retryTopic)
b.metric.TotalUnprocessedMessagesCounter += int64(len(messages))
}

if consumeErr != nil && b.retryEnabled {
cronsumerMessages := make([]kcronsumer.Message, 0, len(messages))
for i := range messages {
cronsumerMessages = append(cronsumerMessages, messages[i].toRetryableMessage(b.retryTopic))
Expand All @@ -110,5 +111,7 @@ func (b *batchConsumer) process(messages []Message) {
}
}

b.metric.TotalProcessedMessagesCounter += int64(len(messages))
if consumeErr == nil {
b.metric.TotalProcessedMessagesCounter += int64(len(messages))
}
}
158 changes: 158 additions & 0 deletions batch_consumer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package kafka

import (
"errors"
"testing"

kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka"
lcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/logger"
"github.com/prometheus/client_golang/prometheus"
)

func Test_batchConsumer_process(t *testing.T) {
t.Run("When_Processing_Is_Successful", func(t *testing.T) {
// Given
bc := batchConsumer{
base: &base{metric: &ConsumerMetric{}},
consumeFn: func([]Message) error {
return nil
},
}

// When
bc.process([]Message{{}, {}, {}})

// Then
if bc.metric.TotalProcessedMessagesCounter != 3 {
t.Fatalf("Total Processed Message Counter must equal to 3")
}
if bc.metric.TotalUnprocessedMessagesCounter != 0 {
t.Fatalf("Total Unprocessed Message Counter must equal to 0")
}
})
t.Run("When_Re-processing_Is_Successful", func(t *testing.T) {
// Given
gotOnlyOneTimeException := true
bc := batchConsumer{
base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug)},
consumeFn: func(messages []Message) error {
if gotOnlyOneTimeException {
gotOnlyOneTimeException = false
return errors.New("simulate only one time exception")
}
return nil
},
}

// When
bc.process([]Message{{}, {}, {}})

// Then
if bc.metric.TotalProcessedMessagesCounter != 3 {
t.Fatalf("Total Processed Message Counter must equal to 3")
}
if bc.metric.TotalUnprocessedMessagesCounter != 0 {
t.Fatalf("Total Unprocessed Message Counter must equal to 0")
}
})
t.Run("When_Re-processing_Is_Failed_And_Retry_Disabled", func(t *testing.T) {
// Given
bc := batchConsumer{
base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug)},
consumeFn: func(messages []Message) error {
return errors.New("error case")
},
}

// When
bc.process([]Message{{}, {}, {}})

// Then
if bc.metric.TotalProcessedMessagesCounter != 0 {
t.Fatalf("Total Processed Message Counter must equal to 0")
}
if bc.metric.TotalUnprocessedMessagesCounter != 3 {
t.Fatalf("Total Unprocessed Message Counter must equal to 3")
}
})
t.Run("When_Re-processing_Is_Failed_And_Retry_Enabled", func(t *testing.T) {
// Given
mc := mockCronsumer{}
bc := batchConsumer{
base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug), retryEnabled: true, cronsumer: &mc},
consumeFn: func(messages []Message) error {
return errors.New("error case")
},
}

// When
bc.process([]Message{{}, {}, {}})

// Then
if bc.metric.TotalProcessedMessagesCounter != 0 {
t.Fatalf("Total Processed Message Counter must equal to 0")
}
if bc.metric.TotalUnprocessedMessagesCounter != 3 {
t.Fatalf("Total Unprocessed Message Counter must equal to 3")
}
})
t.Run("When_Re-processing_Is_Failed_And_Retry_Failed", func(t *testing.T) {
// Given
mc := mockCronsumer{wantErr: true}
bc := batchConsumer{
base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug), retryEnabled: true, cronsumer: &mc},
consumeFn: func(messages []Message) error {
return errors.New("error case")
},
}

// When
bc.process([]Message{{}, {}, {}})

// Then
if bc.metric.TotalProcessedMessagesCounter != 0 {
t.Fatalf("Total Processed Message Counter must equal to 0")
}
if bc.metric.TotalUnprocessedMessagesCounter != 3 {
t.Fatalf("Total Unprocessed Message Counter must equal to 3")
}
})
}

type mockCronsumer struct {
wantErr bool
}

func (m *mockCronsumer) Start() {
panic("implement me")
}

func (m *mockCronsumer) Run() {
panic("implement me")
}

func (m *mockCronsumer) Stop() {
panic("implement me")
}

func (m *mockCronsumer) WithLogger(logger lcronsumer.Interface) {
panic("implement me")
}

func (m *mockCronsumer) Produce(message kcronsumer.Message) error {
if m.wantErr {
return errors.New("error")
}
return nil
}

func (m *mockCronsumer) GetMetricCollectors() []prometheus.Collector {
panic("implement me")
}

func (m *mockCronsumer) ProduceBatch([]kcronsumer.Message) error {
if m.wantErr {
return errors.New("error")
}
return nil
}
22 changes: 13 additions & 9 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,25 @@ func (c *consumer) Consume() {

func (c *consumer) process(message Message) {
consumeErr := c.consumeFn(message)
if consumeErr != nil && c.retryEnabled {
c.logger.Warnf("Consume Function Err %s, Message will be retried", consumeErr.Error())

if consumeErr != nil {
c.logger.Warnf("Consume Function Err %s, Message will be retried", consumeErr.Error())
// Try to process same message again
if consumeErr = c.consumeFn(message); consumeErr != nil {
c.metric.TotalUnprocessedMessagesCounter++
c.logger.Warnf("Consume Function Again Err %s, message is sending to exception/retry topic %s", consumeErr.Error(), c.retryTopic)
c.metric.TotalUnprocessedMessagesCounter++
}
}

retryableMsg := message.toRetryableMessage(c.retryTopic)
if produceErr := c.cronsumer.Produce(retryableMsg); produceErr != nil {
c.logger.Errorf("Error producing message %s to exception/retry topic %s",
string(retryableMsg.Value), produceErr.Error())
}
if consumeErr != nil && c.retryEnabled {
retryableMsg := message.toRetryableMessage(c.retryTopic)
if produceErr := c.cronsumer.Produce(retryableMsg); produceErr != nil {
c.logger.Errorf("Error producing message %s to exception/retry topic %s",
string(retryableMsg.Value), produceErr.Error())
}
}

c.metric.TotalProcessedMessagesCounter++
if consumeErr == nil {
c.metric.TotalProcessedMessagesCounter++
}
}
5 changes: 5 additions & 0 deletions consumer_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,13 @@ import (
)

type Consumer interface {
// Consume starts consuming
Consume()

// WithLogger for injecting custom log implementation
WithLogger(logger LoggerInterface)

// Stop for graceful shutdown. In order to avoid data loss, you have to call it!
Stop() error
}

Expand Down
116 changes: 116 additions & 0 deletions consumer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package kafka

import (
"errors"
"testing"
)

func Test_consumer_process(t *testing.T) {
t.Run("When_Processing_Is_Successful", func(t *testing.T) {
// Given
c := consumer{
base: &base{metric: &ConsumerMetric{}},
consumeFn: func(Message) error {
return nil
},
}

// When
c.process(Message{})

// Then
if c.metric.TotalProcessedMessagesCounter != 1 {
t.Fatalf("Total Processed Message Counter must equal to 3")
}
if c.metric.TotalUnprocessedMessagesCounter != 0 {
t.Fatalf("Total Unprocessed Message Counter must equal to 0")
}
})
t.Run("When_Re-processing_Is_Successful", func(t *testing.T) {
// Given
gotOnlyOneTimeException := true
c := consumer{
base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug)},
consumeFn: func(Message) error {
if gotOnlyOneTimeException {
gotOnlyOneTimeException = false
return errors.New("simulate only one time exception")
}
return nil
},
}

// When
c.process(Message{})

// Then
if c.metric.TotalProcessedMessagesCounter != 1 {
t.Fatalf("Total Processed Message Counter must equal to 3")
}
if c.metric.TotalUnprocessedMessagesCounter != 0 {
t.Fatalf("Total Unprocessed Message Counter must equal to 0")
}
})
t.Run("When_Re-processing_Is_Failed_And_Retry_Disabled", func(t *testing.T) {
// Given
c := consumer{
base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug)},
consumeFn: func(Message) error {
return errors.New("error case")
},
}

// When
c.process(Message{})

// Then
if c.metric.TotalProcessedMessagesCounter != 0 {
t.Fatalf("Total Processed Message Counter must equal to 0")
}
if c.metric.TotalUnprocessedMessagesCounter != 1 {
t.Fatalf("Total Unprocessed Message Counter must equal to 1")
}
})
t.Run("When_Re-processing_Is_Failed_And_Retry_Enabled", func(t *testing.T) {
// Given
mc := mockCronsumer{}
c := consumer{
base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug), retryEnabled: true, cronsumer: &mc},
consumeFn: func(Message) error {
return errors.New("error case")
},
}

// When
c.process(Message{})

// Then
if c.metric.TotalProcessedMessagesCounter != 0 {
t.Fatalf("Total Processed Message Counter must equal to 0")
}
if c.metric.TotalUnprocessedMessagesCounter != 1 {
t.Fatalf("Total Unprocessed Message Counter must equal to 1")
}
})
t.Run("When_Re-processing_Is_Failed_And_Retry_Failed", func(t *testing.T) {
// Given
mc := mockCronsumer{wantErr: true}
c := consumer{
base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug), retryEnabled: true, cronsumer: &mc},
consumeFn: func(Message) error {
return errors.New("error case")
},
}

// When
c.process(Message{})

// Then
if c.metric.TotalProcessedMessagesCounter != 0 {
t.Fatalf("Total Processed Message Counter must equal to 0")
}
if c.metric.TotalUnprocessedMessagesCounter != 1 {
t.Fatalf("Total Unprocessed Message Counter must equal to 1")
}
})
}
8 changes: 0 additions & 8 deletions examples/with-kafka-batch-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,6 @@ func main() {
Topic: "standart-topic",
GroupID: "standart-cg",
},
RetryEnabled: true,
RetryConfiguration: kafka.RetryConfiguration{
Brokers: []string{"localhost:29092"},
Topic: "retry-topic",
StartTimeCron: "*/1 * * * *",
WorkDuration: 50 * time.Second,
MaxRetry: 3,
},
BatchConfiguration: &kafka.BatchConfiguration{
MessageGroupLimit: 1000,
MessageGroupDuration: time.Second,
Expand Down

0 comments on commit 9abfbf2

Please sign in to comment.