From cb4fa83afee09a838e1f8c70be425c1d977a5d7e Mon Sep 17 00:00:00 2001 From: oguzyildirim Date: Sun, 8 Oct 2023 19:15:17 +0300 Subject: [PATCH 1/3] feat: implement non transactional batch consuming --- batch_consumer.go | 30 ++++++++++++++++++++++++++---- consumer_config.go | 1 + message.go | 4 +++- 3 files changed, 30 insertions(+), 5 deletions(-) diff --git a/batch_consumer.go b/batch_consumer.go index 82de420..a1f5ee4 100644 --- a/batch_consumer.go +++ b/batch_consumer.go @@ -13,6 +13,7 @@ type batchConsumer struct { messageGroupLimit int messageGroupDuration time.Duration + manuelRetry bool } func newBatchConsumer(cfg *ConsumerConfig) (Consumer, error) { @@ -26,6 +27,7 @@ func newBatchConsumer(cfg *ConsumerConfig) (Consumer, error) { consumeFn: cfg.BatchConfiguration.BatchConsumeFn, messageGroupLimit: cfg.BatchConfiguration.MessageGroupLimit, messageGroupDuration: cfg.BatchConfiguration.MessageGroupDuration, + manuelRetry: cfg.ManuelRetryEnabled, } if cfg.RetryEnabled { @@ -93,11 +95,11 @@ func (b *batchConsumer) process(messages []Message) { if consumeErr != nil { b.logger.Warnf("Consume Function Err %s, Messages will be retried", consumeErr.Error()) - // Try to process same messages again - if consumeErr = b.consumeFn(messages); consumeErr != nil { - b.logger.Warnf("Consume Function Again Err %s, messages are sending to exception/retry topic %s", consumeErr.Error(), b.retryTopic) - b.metric.TotalUnprocessedMessagesCounter += int64(len(messages)) + // if manuel retry enabled + if b.manuelRetry { + b.retry(getUnsuccessfulMessages(messages)) } + b.retry(messages) if consumeErr != nil && b.retryEnabled { cronsumerMessages := make([]kcronsumer.Message, 0, len(messages)) @@ -115,3 +117,23 @@ func (b *batchConsumer) process(messages []Message) { b.metric.TotalProcessedMessagesCounter += int64(len(messages)) } } + +func (b *batchConsumer) retry(messages []Message) { + // Try to process same messages again + if consumeErr := b.consumeFn(messages); consumeErr != nil { + b.logger.Warnf("Consume Function Again Err %s, messages are sending to exception/retry topic %s", consumeErr.Error(), b.retryTopic) + b.metric.TotalUnprocessedMessagesCounter += int64(len(messages)) + } + return +} + +func getUnsuccessfulMessages(messages []Message) []Message { + var unsuccessfulMessages []Message + for _, message := range messages { + if !message.isSuccessful { + unsuccessfulMessages = append(unsuccessfulMessages, message) + } + } + + return unsuccessfulMessages +} diff --git a/consumer_config.go b/consumer_config.go index 468232f..17b7b89 100644 --- a/consumer_config.go +++ b/consumer_config.go @@ -44,6 +44,7 @@ type ConsumerConfig struct { Concurrency int RetryEnabled bool APIEnabled bool + ManuelRetryEnabled bool } func (cfg *ConsumerConfig) newCronsumerConfig() *kcronsumer.Config { diff --git a/message.go b/message.go index bc79210..ff8b308 100644 --- a/message.go +++ b/message.go @@ -20,7 +20,8 @@ type Message struct { WriterData interface{} Time time.Time // Context To enable distributed tracing support - Context context.Context + Context context.Context + isSuccessful bool } func (m *Message) toKafkaMessage() kafka.Message { @@ -49,6 +50,7 @@ func fromKafkaMessage(message *kafka.Message) Message { WriterData: message.WriterData, Time: message.Time, Context: context.TODO(), + isSuccessful: true, } } From f76789ebb8a748794d3d45469cba2f595f6dde3f Mon Sep 17 00:00:00 2001 From: oguzyildirim Date: Sun, 8 Oct 2023 19:22:36 +0300 Subject: [PATCH 2/3] chore: make message slice pointer --- batch_consumer.go | 6 +++--- consumer_config.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/batch_consumer.go b/batch_consumer.go index a1f5ee4..380a7cf 100644 --- a/batch_consumer.go +++ b/batch_consumer.go @@ -9,7 +9,7 @@ import ( type batchConsumer struct { *base - consumeFn func([]Message) error + consumeFn func(*[]Message) error messageGroupLimit int messageGroupDuration time.Duration @@ -32,7 +32,7 @@ func newBatchConsumer(cfg *ConsumerConfig) (Consumer, error) { if cfg.RetryEnabled { c.base.setupCronsumer(cfg, func(message kcronsumer.Message) error { - return c.consumeFn([]Message{toMessage(message)}) + return c.consumeFn(&[]Message{toMessage(message)}) }) } @@ -91,7 +91,7 @@ func (b *batchConsumer) startBatch() { } func (b *batchConsumer) process(messages []Message) { - consumeErr := b.consumeFn(messages) + consumeErr := b.consumeFn(&messages) if consumeErr != nil { b.logger.Warnf("Consume Function Err %s, Messages will be retried", consumeErr.Error()) diff --git a/consumer_config.go b/consumer_config.go index 17b7b89..5fc5b0a 100644 --- a/consumer_config.go +++ b/consumer_config.go @@ -15,7 +15,7 @@ import ( type ReaderConfig kafka.ReaderConfig -type BatchConsumeFn func([]Message) error +type BatchConsumeFn func(*[]Message) error type ConsumeFn func(Message) error From e355ec43a602fbba5900f55f9c20d87964a9a578 Mon Sep 17 00:00:00 2001 From: oguzyildirim Date: Mon, 9 Oct 2023 17:27:30 +0300 Subject: [PATCH 3/3] chore: make message slice pointer --- batch_consumer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/batch_consumer.go b/batch_consumer.go index 380a7cf..7e1449a 100644 --- a/batch_consumer.go +++ b/batch_consumer.go @@ -120,7 +120,7 @@ func (b *batchConsumer) process(messages []Message) { func (b *batchConsumer) retry(messages []Message) { // Try to process same messages again - if consumeErr := b.consumeFn(messages); consumeErr != nil { + if consumeErr := b.consumeFn(&messages); consumeErr != nil { b.logger.Warnf("Consume Function Again Err %s, messages are sending to exception/retry topic %s", consumeErr.Error(), b.retryTopic) b.metric.TotalUnprocessedMessagesCounter += int64(len(messages)) }