feature: add manual kafka message commit integration #76

Merged
merged 24 commits on Dec 10, 2023
6 changes: 3 additions & 3 deletions README.md
@@ -100,9 +100,9 @@ After running `docker-compose up` command, you can run any application you want.
WorkDuration: 50 * time.Second,
MaxRetry: 3,
},
MessageGroupDuration: time.Second,
BatchConfiguration: kafka.BatchConfiguration{
MessageGroupLimit: 1000,
MessageGroupDuration: time.Second,
BatchConsumeFn: batchConsumeFn,
},
}
@@ -140,9 +140,9 @@ After running `docker-compose up` command, you can run any application you want.
WorkDuration: 50 * time.Second,
MaxRetry: 3,
},
MessageGroupDuration: time.Second,
BatchConfiguration: kafka.BatchConfiguration{
MessageGroupLimit: 1000,
MessageGroupDuration: time.Second,
BatchConsumeFn: batchConsumeFn,
},
}
@@ -198,6 +198,7 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the application
| `commitInterval` | indicates the interval at which offsets are committed to the broker. | 1s |
| `rack` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#RackAffinityGroupBalancer) | |
| `clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Dialer) | |
| `messageGroupDuration` | Maximum time to wait for a batch | |
| `dial.Timeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Dialer) | no timeout |
| `dial.KeepAlive` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Dialer) | not enabled |
| `transport.DialTimeout ` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Transport) | 5s |
@@ -219,7 +220,6 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the application
| `retryConfiguration.sasl.username` | SCRAM OR PLAIN username | |
| `retryConfiguration.sasl.password` | SCRAM OR PLAIN password | |
| `batchConfiguration.messageGroupLimit` | Maximum number of messages in a batch | |
| `batchConfiguration.messageGroupDuration` | Maximum time to wait for a batch | |
| `tls.rootCAPath` | [see doc](https://pkg.go.dev/crypto/tls#Config.RootCAs) | "" |
| `tls.intermediateCAPath` | Same with rootCA, if you want to specify two rootca you can use it with rootCAPath | "" |
| `sasl.authType` | `SCRAM` or `PLAIN` | |
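For reference, here is a minimal sketch of the batch consumer configuration after this change, pieced together from the README snippets above: `MessageGroupDuration` now lives on the top-level `ConsumerConfig` rather than inside `BatchConfiguration`. The broker address, topic, and consumer group are placeholders, and the import path should be adjusted to the module version you actually use.

```go
package main

import (
	"fmt"
	"time"

	"github.com/Trendyol/kafka-konsumer" // adjust to the module version you use
)

func main() {
	// Called once per chunk of up to MessageGroupLimit messages.
	batchConsumeFn := func(messages []*kafka.Message) error {
		fmt.Printf("%d messages received\n", len(messages))
		return nil
	}

	consumerCfg := &kafka.ConsumerConfig{
		Reader: kafka.ReaderConfig{
			Brokers: []string{"localhost:29092"}, // placeholder
			Topic:   "standart-topic",            // placeholder
			GroupID: "standart-cg",               // placeholder
		},
		// Moved out of BatchConfiguration in this PR: the maximum time to
		// wait before flushing an incomplete batch.
		MessageGroupDuration: time.Second,
		BatchConfiguration: kafka.BatchConfiguration{
			MessageGroupLimit: 1000,
			BatchConsumeFn:    batchConsumeFn,
		},
	}

	consumer, err := kafka.NewConsumer(consumerCfg)
	if err != nil {
		panic(err)
	}
	defer consumer.Stop()

	consumer.Consume()
}
```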
113 changes: 85 additions & 28 deletions batch_consumer.go
@@ -3,6 +3,8 @@
import (
"time"

"github.com/segmentio/kafka-go"

kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka"
)

@@ -11,21 +13,19 @@

consumeFn func([]*Message) error

messageGroupLimit int
messageGroupDuration time.Duration
messageGroupLimit int
}

func newBatchConsumer(cfg *ConsumerConfig) (Consumer, error) {
consumerBase, err := newBase(cfg)
consumerBase, err := newBase(cfg, cfg.BatchConfiguration.MessageGroupLimit*cfg.Concurrency)

if err != nil {
return nil, err
}

c := batchConsumer{
base: consumerBase,
consumeFn: cfg.BatchConfiguration.BatchConsumeFn,
messageGroupLimit: cfg.BatchConfiguration.MessageGroupLimit,
messageGroupDuration: cfg.BatchConfiguration.MessageGroupDuration,
base: consumerBase,
consumeFn: cfg.BatchConfiguration.BatchConsumeFn,
messageGroupLimit: cfg.BatchConfiguration.MessageGroupLimit,

}

if cfg.RetryEnabled {
@@ -50,10 +50,10 @@
b.wg.Add(1)
go b.startConsume()

for i := 0; i < b.concurrency; i++ {
b.wg.Add(1)
go b.startBatch()
}
b.setupConcurrentWorkers()


b.wg.Add(1)
go b.startBatch()

}

func (b *batchConsumer) startBatch() {
@@ -62,7 +62,9 @@
ticker := time.NewTicker(b.messageGroupDuration)
defer ticker.Stop()

messages := make([]*Message, 0, b.messageGroupLimit)
maximumMessageLimit := b.messageGroupLimit * b.concurrency
messages := make([]*Message, 0, maximumMessageLimit)
commitMessages := make([]kafka.Message, 0, maximumMessageLimit)

for {
select {
@@ -71,44 +73,99 @@
continue
}

b.process(messages)
messages = messages[:0]
case msg, ok := <-b.messageCh:
b.consume(&messages, &commitMessages)
case msg, ok := <-b.incomingMessageStream:
if !ok {
return
}

messages = append(messages, msg)

if len(messages) == b.messageGroupLimit {
if len(messages) == maximumMessageLimit {
b.consume(&messages, &commitMessages)
}
}
}
}

func (b *batchConsumer) setupConcurrentWorkers() {
for i := 0; i < b.concurrency; i++ {
b.wg.Add(1)
go func() {
defer b.wg.Done()
for messages := range b.batchConsumingStream {
b.process(messages)
messages = messages[:0]
b.messageProcessedStream <- struct{}{}
}
}()
}
}

func chunkMessages(allMessages *[]*Message, chunkSize int) [][]*Message {
var chunks [][]*Message

allMessageList := *allMessages
for i := 0; i < len(allMessageList); i += chunkSize {
end := i + chunkSize

// necessary check to avoid slicing beyond
// the slice length
if end > len(allMessageList) {
end = len(allMessageList)
}

chunks = append(chunks, allMessageList[i:end])
}

return chunks
}

func (b *batchConsumer) consume(allMessages *[]*Message, commitMessages *[]kafka.Message) {
chunks := chunkMessages(allMessages, b.messageGroupLimit)

// Send the message chunks to the worker goroutines
for _, chunk := range chunks {
b.batchConsumingStream <- chunk
}

// Wait for all chunks to be processed
for i := 0; i < len(chunks); i++ {
<-b.messageProcessedStream
}

toKafkaMessages(allMessages, commitMessages)
if err := b.r.CommitMessages(*commitMessages); err != nil {
b.logger.Errorf("Commit Error %s,", err.Error())

}

// Clearing resources
putKafkaMessage(commitMessages)
putMessages(allMessages)
*commitMessages = (*commitMessages)[:0]
*allMessages = (*allMessages)[:0]
}

func (b *batchConsumer) process(messages []*Message) {
consumeErr := b.consumeFn(messages)
func (b *batchConsumer) process(chunkMessages []*Message) {
consumeErr := b.consumeFn(chunkMessages)

if consumeErr != nil {
b.logger.Warnf("Consume Function Err %s, Messages will be retried", consumeErr.Error())
// Try to process same messages again for resolving transient network errors etc.
if consumeErr = b.consumeFn(messages); consumeErr != nil {
if consumeErr = b.consumeFn(chunkMessages); consumeErr != nil {
b.logger.Warnf("Consume Function Again Err %s, messages are sending to exception/retry topic %s", consumeErr.Error(), b.retryTopic)
b.metric.TotalUnprocessedMessagesCounter += int64(len(messages))
b.metric.TotalUnprocessedMessagesCounter += int64(len(chunkMessages))
}

if consumeErr != nil && b.retryEnabled {
cronsumerMessages := make([]kcronsumer.Message, 0, len(messages))
cronsumerMessages := make([]kcronsumer.Message, 0, len(chunkMessages))
if b.transactionalRetry {
for i := range messages {
cronsumerMessages = append(cronsumerMessages, messages[i].toRetryableMessage(b.retryTopic))
for i := range chunkMessages {
cronsumerMessages = append(cronsumerMessages, chunkMessages[i].toRetryableMessage(b.retryTopic))
}
} else {
for i := range messages {
if messages[i].IsFailed {
cronsumerMessages = append(cronsumerMessages, messages[i].toRetryableMessage(b.retryTopic))
for i := range chunkMessages {
if chunkMessages[i].IsFailed {
cronsumerMessages = append(cronsumerMessages, chunkMessages[i].toRetryableMessage(b.retryTopic))
}
}
}
@@ -120,6 +177,6 @@
}

if consumeErr == nil {
b.metric.TotalProcessedMessagesCounter += int64(len(messages))
b.metric.TotalProcessedMessagesCounter += int64(len(chunkMessages))
}
}
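To make the commit flow introduced above easier to follow outside the library's internal `base`, `reader`, and pool helpers, here is a stripped-down sketch of the same pattern written directly against segmentio/kafka-go: fetch without auto-commit, flush a batch on either a size limit or a ticker, and call `CommitMessages` only after the batch has been processed. Broker, topic, group, and the `process` handler are placeholders, and error handling is reduced to logging.

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
)

func process(msgs []kafka.Message) {
	log.Printf("processing %d messages", len(msgs)) // placeholder batch handler
}

func main() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:29092"}, // placeholder
		Topic:   "standart-topic",            // placeholder
		GroupID: "standart-cg",               // placeholder
	})
	defer r.Close()

	const groupLimit = 100
	groupDuration := time.Second

	msgCh := make(chan kafka.Message)
	go func() {
		defer close(msgCh)
		for {
			// FetchMessage, unlike ReadMessage, does not commit offsets,
			// so progress is only recorded when CommitMessages succeeds.
			m, err := r.FetchMessage(context.Background())
			if err != nil {
				return
			}
			msgCh <- m
		}
	}()

	batch := make([]kafka.Message, 0, groupLimit)
	flush := func() {
		if len(batch) == 0 {
			return
		}
		process(batch)
		// Commit only after successful processing: at-least-once semantics.
		if err := r.CommitMessages(context.Background(), batch...); err != nil {
			log.Printf("commit error: %v", err)
		}
		batch = batch[:0]
	}

	ticker := time.NewTicker(groupDuration)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// Flush whatever accumulated within the group duration.
			flush()
		case m, ok := <-msgCh:
			if !ok {
				flush()
				return
			}
			batch = append(batch, m)
			if len(batch) == groupLimit {
				flush()
			}
		}
	}
}
```

The PR additionally fans each accumulated group out to `concurrency` workers via `chunkMessages` before committing; the sketch keeps a single worker to stay focused on the commit ordering.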
99 changes: 87 additions & 12 deletions batch_consumer_test.go
@@ -2,6 +2,8 @@ package kafka

import (
"errors"
"reflect"
"strconv"
"sync"
"testing"
"time"
@@ -15,39 +17,46 @@ func Test_batchConsumer_startBatch(t *testing.T) {
// Given
var numberOfBatch int

mc := mockReader{}
bc := batchConsumer{
base: &base{
messageCh: make(chan *Message),
metric: &ConsumerMetric{},
wg: sync.WaitGroup{},
incomingMessageStream: make(chan *Message, 1),
batchConsumingStream: make(chan []*Message, 1),
singleConsumingStream: make(chan *Message, 1),
messageProcessedStream: make(chan struct{}, 1),
metric: &ConsumerMetric{},
wg: sync.WaitGroup{},
messageGroupDuration: 500 * time.Millisecond,
r: &mc,
concurrency: 1,
},
messageGroupLimit: 3,
messageGroupDuration: 500 * time.Millisecond,
messageGroupLimit: 3,
consumeFn: func(messages []*Message) error {
numberOfBatch++
return nil
},
}
go func() {
// Simulate messageGroupLimit
bc.base.messageCh <- &Message{}
bc.base.messageCh <- &Message{}
bc.base.messageCh <- &Message{}
bc.base.incomingMessageStream <- &Message{}
bc.base.incomingMessageStream <- &Message{}
bc.base.incomingMessageStream <- &Message{}

time.Sleep(1 * time.Second)

// Simulate messageGroupDuration
bc.base.messageCh <- &Message{}
bc.base.incomingMessageStream <- &Message{}

time.Sleep(1 * time.Second)

// Return from startBatch
close(bc.base.messageCh)
close(bc.base.incomingMessageStream)
}()

bc.base.wg.Add(1)

// When
bc.setupConcurrentWorkers()
bc.startBatch()

// Then
@@ -205,6 +214,72 @@ func Test_batchConsumer_process(t *testing.T) {
})
}

func Test_batchConsumer_chunk(t *testing.T) {
tests := []struct {
allMessages []*Message
expected [][]*Message
chunkSize int
}{
{
allMessages: createMessages(0, 9),
chunkSize: 3,
expected: [][]*Message{
createMessages(0, 3),
createMessages(3, 6),
createMessages(6, 9),
},
},
{
allMessages: []*Message{},
chunkSize: 3,
expected: [][]*Message{},
},
{
allMessages: createMessages(0, 1),
chunkSize: 3,
expected: [][]*Message{
createMessages(0, 1),
},
},
{
allMessages: createMessages(0, 8),
chunkSize: 3,
expected: [][]*Message{
createMessages(0, 3),
createMessages(3, 6),
createMessages(6, 8),
},
},
{
allMessages: createMessages(0, 3),
chunkSize: 3,
expected: [][]*Message{
createMessages(0, 3),
},
},
}

for i, tc := range tests {
t.Run(strconv.Itoa(i), func(t *testing.T) {
chunkedMessages := chunkMessages(&tc.allMessages, tc.chunkSize)

if !reflect.DeepEqual(chunkedMessages, tc.expected) && !(len(chunkedMessages) == 0 && len(tc.expected) == 0) {
t.Errorf("For chunkSize %d, expected %v, but got %v", tc.chunkSize, tc.expected, chunkedMessages)
}
})
}
}

func createMessages(partitionStart int, partitionEnd int) []*Message {
messages := make([]*Message, 0)
for i := partitionStart; i < partitionEnd; i++ {
messages = append(messages, &Message{
Partition: i,
})
}
return messages
}

type mockCronsumer struct {
wantErr bool
}
@@ -221,11 +296,11 @@ func (m *mockCronsumer) Stop() {
panic("implement me")
}

func (m *mockCronsumer) WithLogger(logger lcronsumer.Interface) {
func (m *mockCronsumer) WithLogger(_ lcronsumer.Interface) {
panic("implement me")
}

func (m *mockCronsumer) Produce(message kcronsumer.Message) error {
func (m *mockCronsumer) Produce(_ kcronsumer.Message) error {
if m.wantErr {
return errors.New("error")
}
Expand Down