diff --git a/batch_consumer.go b/batch_consumer.go index 8ad15d7..82de420 100644 --- a/batch_consumer.go +++ b/batch_consumer.go @@ -90,15 +90,16 @@ func (b *batchConsumer) startBatch() { func (b *batchConsumer) process(messages []Message) { consumeErr := b.consumeFn(messages) - if consumeErr != nil && b.retryEnabled { - b.logger.Warnf("Consume Function Err %s, Messages will be retried", consumeErr.Error()) - // Try to process same message again + if consumeErr != nil { + b.logger.Warnf("Consume Function Err %s, Messages will be retried", consumeErr.Error()) + // Try to process same messages again if consumeErr = b.consumeFn(messages); consumeErr != nil { - b.metric.TotalUnprocessedMessagesCounter += int64(len(messages)) - b.logger.Warnf("Consume Function Again Err %s, messages are sending to exception/retry topic %s", consumeErr.Error(), b.retryTopic) + b.metric.TotalUnprocessedMessagesCounter += int64(len(messages)) + } + if consumeErr != nil && b.retryEnabled { cronsumerMessages := make([]kcronsumer.Message, 0, len(messages)) for i := range messages { cronsumerMessages = append(cronsumerMessages, messages[i].toRetryableMessage(b.retryTopic)) @@ -110,5 +111,7 @@ func (b *batchConsumer) process(messages []Message) { } } - b.metric.TotalProcessedMessagesCounter += int64(len(messages)) + if consumeErr == nil { + b.metric.TotalProcessedMessagesCounter += int64(len(messages)) + } } diff --git a/batch_consumer_test.go b/batch_consumer_test.go new file mode 100644 index 0000000..3166006 --- /dev/null +++ b/batch_consumer_test.go @@ -0,0 +1,158 @@ +package kafka + +import ( + "errors" + "testing" + + kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka" + lcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/logger" + "github.com/prometheus/client_golang/prometheus" +) + +func Test_batchConsumer_process(t *testing.T) { + t.Run("When_Processing_Is_Successful", func(t *testing.T) { + // Given + bc := batchConsumer{ + base: &base{metric: &ConsumerMetric{}}, + consumeFn: func([]Message) error { + return nil + }, + } + + // When + bc.process([]Message{{}, {}, {}}) + + // Then + if bc.metric.TotalProcessedMessagesCounter != 3 { + t.Fatalf("Total Processed Message Counter must equal to 3") + } + if bc.metric.TotalUnprocessedMessagesCounter != 0 { + t.Fatalf("Total Unprocessed Message Counter must equal to 0") + } + }) + t.Run("When_Re-processing_Is_Successful", func(t *testing.T) { + // Given + gotOnlyOneTimeException := true + bc := batchConsumer{ + base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug)}, + consumeFn: func(messages []Message) error { + if gotOnlyOneTimeException { + gotOnlyOneTimeException = false + return errors.New("simulate only one time exception") + } + return nil + }, + } + + // When + bc.process([]Message{{}, {}, {}}) + + // Then + if bc.metric.TotalProcessedMessagesCounter != 3 { + t.Fatalf("Total Processed Message Counter must equal to 3") + } + if bc.metric.TotalUnprocessedMessagesCounter != 0 { + t.Fatalf("Total Unprocessed Message Counter must equal to 0") + } + }) + t.Run("When_Re-processing_Is_Failed_And_Retry_Disabled", func(t *testing.T) { + // Given + bc := batchConsumer{ + base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug)}, + consumeFn: func(messages []Message) error { + return errors.New("error case") + }, + } + + // When + bc.process([]Message{{}, {}, {}}) + + // Then + if bc.metric.TotalProcessedMessagesCounter != 0 { + t.Fatalf("Total Processed Message Counter must equal to 0") + } + if bc.metric.TotalUnprocessedMessagesCounter != 3 { + t.Fatalf("Total Unprocessed Message Counter must equal to 3") + } + }) + t.Run("When_Re-processing_Is_Failed_And_Retry_Enabled", func(t *testing.T) { + // Given + mc := mockCronsumer{} + bc := batchConsumer{ + base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug), retryEnabled: true, cronsumer: &mc}, + consumeFn: func(messages []Message) error { + return errors.New("error case") + }, + } + + // When + bc.process([]Message{{}, {}, {}}) + + // Then + if bc.metric.TotalProcessedMessagesCounter != 0 { + t.Fatalf("Total Processed Message Counter must equal to 0") + } + if bc.metric.TotalUnprocessedMessagesCounter != 3 { + t.Fatalf("Total Unprocessed Message Counter must equal to 3") + } + }) + t.Run("When_Re-processing_Is_Failed_And_Retry_Failed", func(t *testing.T) { + // Given + mc := mockCronsumer{wantErr: true} + bc := batchConsumer{ + base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug), retryEnabled: true, cronsumer: &mc}, + consumeFn: func(messages []Message) error { + return errors.New("error case") + }, + } + + // When + bc.process([]Message{{}, {}, {}}) + + // Then + if bc.metric.TotalProcessedMessagesCounter != 0 { + t.Fatalf("Total Processed Message Counter must equal to 0") + } + if bc.metric.TotalUnprocessedMessagesCounter != 3 { + t.Fatalf("Total Unprocessed Message Counter must equal to 3") + } + }) +} + +type mockCronsumer struct { + wantErr bool +} + +func (m *mockCronsumer) Start() { + panic("implement me") +} + +func (m *mockCronsumer) Run() { + panic("implement me") +} + +func (m *mockCronsumer) Stop() { + panic("implement me") +} + +func (m *mockCronsumer) WithLogger(logger lcronsumer.Interface) { + panic("implement me") +} + +func (m *mockCronsumer) Produce(message kcronsumer.Message) error { + if m.wantErr { + return errors.New("error") + } + return nil +} + +func (m *mockCronsumer) GetMetricCollectors() []prometheus.Collector { + panic("implement me") +} + +func (m *mockCronsumer) ProduceBatch([]kcronsumer.Message) error { + if m.wantErr { + return errors.New("error") + } + return nil +} diff --git a/consumer.go b/consumer.go index 9d0ec1c..fa981bc 100644 --- a/consumer.go +++ b/consumer.go @@ -53,21 +53,25 @@ func (c *consumer) Consume() { func (c *consumer) process(message Message) { consumeErr := c.consumeFn(message) - if consumeErr != nil && c.retryEnabled { - c.logger.Warnf("Consume Function Err %s, Message will be retried", consumeErr.Error()) + if consumeErr != nil { + c.logger.Warnf("Consume Function Err %s, Message will be retried", consumeErr.Error()) // Try to process same message again if consumeErr = c.consumeFn(message); consumeErr != nil { - c.metric.TotalUnprocessedMessagesCounter++ c.logger.Warnf("Consume Function Again Err %s, message is sending to exception/retry topic %s", consumeErr.Error(), c.retryTopic) + c.metric.TotalUnprocessedMessagesCounter++ + } + } - retryableMsg := message.toRetryableMessage(c.retryTopic) - if produceErr := c.cronsumer.Produce(retryableMsg); produceErr != nil { - c.logger.Errorf("Error producing message %s to exception/retry topic %s", - string(retryableMsg.Value), produceErr.Error()) - } + if consumeErr != nil && c.retryEnabled { + retryableMsg := message.toRetryableMessage(c.retryTopic) + if produceErr := c.cronsumer.Produce(retryableMsg); produceErr != nil { + c.logger.Errorf("Error producing message %s to exception/retry topic %s", + string(retryableMsg.Value), produceErr.Error()) } } - c.metric.TotalProcessedMessagesCounter++ + if consumeErr == nil { + c.metric.TotalProcessedMessagesCounter++ + } } diff --git a/consumer_base.go b/consumer_base.go index 88cb242..88f740a 100644 --- a/consumer_base.go +++ b/consumer_base.go @@ -12,8 +12,13 @@ import ( ) type Consumer interface { + // Consume starts consuming Consume() + + // WithLogger for injecting custom log implementation WithLogger(logger LoggerInterface) + + // Stop for graceful shutdown. In order to avoid data loss, you have to call it! Stop() error } diff --git a/consumer_test.go b/consumer_test.go new file mode 100644 index 0000000..89b5d95 --- /dev/null +++ b/consumer_test.go @@ -0,0 +1,116 @@ +package kafka + +import ( + "errors" + "testing" +) + +func Test_consumer_process(t *testing.T) { + t.Run("When_Processing_Is_Successful", func(t *testing.T) { + // Given + c := consumer{ + base: &base{metric: &ConsumerMetric{}}, + consumeFn: func(Message) error { + return nil + }, + } + + // When + c.process(Message{}) + + // Then + if c.metric.TotalProcessedMessagesCounter != 1 { + t.Fatalf("Total Processed Message Counter must equal to 3") + } + if c.metric.TotalUnprocessedMessagesCounter != 0 { + t.Fatalf("Total Unprocessed Message Counter must equal to 0") + } + }) + t.Run("When_Re-processing_Is_Successful", func(t *testing.T) { + // Given + gotOnlyOneTimeException := true + c := consumer{ + base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug)}, + consumeFn: func(Message) error { + if gotOnlyOneTimeException { + gotOnlyOneTimeException = false + return errors.New("simulate only one time exception") + } + return nil + }, + } + + // When + c.process(Message{}) + + // Then + if c.metric.TotalProcessedMessagesCounter != 1 { + t.Fatalf("Total Processed Message Counter must equal to 3") + } + if c.metric.TotalUnprocessedMessagesCounter != 0 { + t.Fatalf("Total Unprocessed Message Counter must equal to 0") + } + }) + t.Run("When_Re-processing_Is_Failed_And_Retry_Disabled", func(t *testing.T) { + // Given + c := consumer{ + base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug)}, + consumeFn: func(Message) error { + return errors.New("error case") + }, + } + + // When + c.process(Message{}) + + // Then + if c.metric.TotalProcessedMessagesCounter != 0 { + t.Fatalf("Total Processed Message Counter must equal to 0") + } + if c.metric.TotalUnprocessedMessagesCounter != 1 { + t.Fatalf("Total Unprocessed Message Counter must equal to 1") + } + }) + t.Run("When_Re-processing_Is_Failed_And_Retry_Enabled", func(t *testing.T) { + // Given + mc := mockCronsumer{} + c := consumer{ + base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug), retryEnabled: true, cronsumer: &mc}, + consumeFn: func(Message) error { + return errors.New("error case") + }, + } + + // When + c.process(Message{}) + + // Then + if c.metric.TotalProcessedMessagesCounter != 0 { + t.Fatalf("Total Processed Message Counter must equal to 0") + } + if c.metric.TotalUnprocessedMessagesCounter != 1 { + t.Fatalf("Total Unprocessed Message Counter must equal to 1") + } + }) + t.Run("When_Re-processing_Is_Failed_And_Retry_Failed", func(t *testing.T) { + // Given + mc := mockCronsumer{wantErr: true} + c := consumer{ + base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug), retryEnabled: true, cronsumer: &mc}, + consumeFn: func(Message) error { + return errors.New("error case") + }, + } + + // When + c.process(Message{}) + + // Then + if c.metric.TotalProcessedMessagesCounter != 0 { + t.Fatalf("Total Processed Message Counter must equal to 0") + } + if c.metric.TotalUnprocessedMessagesCounter != 1 { + t.Fatalf("Total Unprocessed Message Counter must equal to 1") + } + }) +} diff --git a/examples/with-kafka-batch-consumer/main.go b/examples/with-kafka-batch-consumer/main.go index 2d7813d..9982031 100644 --- a/examples/with-kafka-batch-consumer/main.go +++ b/examples/with-kafka-batch-consumer/main.go @@ -15,14 +15,6 @@ func main() { Topic: "standart-topic", GroupID: "standart-cg", }, - RetryEnabled: true, - RetryConfiguration: kafka.RetryConfiguration{ - Brokers: []string{"localhost:29092"}, - Topic: "retry-topic", - StartTimeCron: "*/1 * * * *", - WorkDuration: 50 * time.Second, - MaxRetry: 3, - }, BatchConfiguration: &kafka.BatchConfiguration{ MessageGroupLimit: 1000, MessageGroupDuration: time.Second,