fix: multiple consumer sync pool issue (#87)

* add log

* add log v2

* sync pool try

* remove kafka message pool

* fix batch remove pool

* remove pools

* remove sync pools

* remove unused log

* fix lint

---------

Co-authored-by: Abdulsametileri <sametileri07@gmail.com>
mhmtszr and Abdulsametileri authored Jan 8, 2024
1 parent f5d385a commit b4c995d
Showing 6 changed files with 59 additions and 62 deletions.
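For context on the fix: the consumer previously recycled Message and kafka.Message values through package-level sync.Pool instances, but a message can still be held elsewhere (for example, pending commit) when it is returned to the pool, so another consumer goroutine can pull it back out and overwrite it in place. Below is a minimal, self-contained sketch of that hazard; the msg type is a hypothetical stand-in, not the library's code.

package main

import (
	"fmt"
	"sync"
)

// msg stands in for the pooled *kafka.Message; not the library's type.
type msg struct{ value string }

var pool = sync.Pool{New: func() any { return &msg{} }}

func main() {
	first := pool.Get().(*msg)
	first.value = "offset-1"

	held := first   // a consumer still holds a reference to the message...
	pool.Put(first) // ...but it is returned to the pool too early

	second := pool.Get().(*msg) // likely hands back the same object
	second.value = "offset-2"

	// The held reference now observes the second message's data.
	fmt.Printf("held = %q (expected %q)\n", held.value, "offset-1")
}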
6 changes: 2 additions & 4 deletions batch_consumer.go
@@ -85,7 +85,8 @@ func (b *batchConsumer) startBatch() {
 				return
 			}
 
-			messages = append(messages, msg)
+			messages = append(messages, msg.message)
+			commitMessages = append(commitMessages, *msg.kafkaMessage)
 
 			if len(messages) == maximumMessageLimit {
 				b.consume(&messages, &commitMessages)
@@ -143,14 +144,11 @@ func (b *batchConsumer) consume(allMessages *[]*Message, commitMessages *[]kafka.Message) {
 		<-b.messageProcessedStream
 	}
 
-	toKafkaMessages(allMessages, commitMessages)
 	if err := b.r.CommitMessages(*commitMessages); err != nil {
 		b.logger.Errorf("Commit Error %s,", err.Error())
 	}
 
 	// Clearing resources
-	putKafkaMessage(commitMessages)
-	putMessages(allMessages)
 	*commitMessages = (*commitMessages)[:0]
 	*allMessages = (*allMessages)[:0]
 }
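In place of the pools, the consume path above now resets its slices with the s = s[:0] idiom: the length drops to zero while the backing array (and its capacity) survives, so the next batch appends into the same memory without a per-batch allocation. A small runnable sketch of that idiom:

package main

import "fmt"

func main() {
	batch := make([]int, 0, 4)
	batch = append(batch, 1, 2, 3)
	fmt.Println(len(batch), cap(batch)) // 3 4

	batch = batch[:0] // drop the contents, keep the backing array
	fmt.Println(len(batch), cap(batch)) // 0 4

	batch = append(batch, 7) // reuses the existing array; no new allocation
	fmt.Println(batch)       // [7]
}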
46 changes: 36 additions & 10 deletions batch_consumer_test.go
@@ -8,6 +8,8 @@ import (
 	"testing"
 	"time"
 
+	"github.com/segmentio/kafka-go"
+
 	kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka"
 	lcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/logger"
 	"github.com/prometheus/client_golang/prometheus"
@@ -20,7 +22,7 @@ func Test_batchConsumer_startBatch(t *testing.T) {
 	mc := mockReader{}
 	bc := batchConsumer{
 		base: &base{
-			incomingMessageStream:  make(chan *Message, 1),
+			incomingMessageStream:  make(chan *IncomingMessage, 1),
 			batchConsumingStream:   make(chan []*Message, 1),
 			singleConsumingStream:  make(chan *Message, 1),
 			messageProcessedStream: make(chan struct{}, 1),
@@ -38,14 +40,26 @@
 		}
 	}
 	go func() {
 		// Simulate messageGroupLimit
-		bc.base.incomingMessageStream <- &Message{}
-		bc.base.incomingMessageStream <- &Message{}
-		bc.base.incomingMessageStream <- &Message{}
+		bc.base.incomingMessageStream <- &IncomingMessage{
+			kafkaMessage: &kafka.Message{},
+			message:      &Message{},
+		}
+		bc.base.incomingMessageStream <- &IncomingMessage{
+			kafkaMessage: &kafka.Message{},
+			message:      &Message{},
+		}
+		bc.base.incomingMessageStream <- &IncomingMessage{
+			kafkaMessage: &kafka.Message{},
+			message:      &Message{},
+		}
 
 		time.Sleep(1 * time.Second)
 
 		// Simulate messageGroupDuration
-		bc.base.incomingMessageStream <- &Message{}
+		bc.base.incomingMessageStream <- &IncomingMessage{
+			kafkaMessage: &kafka.Message{},
+			message:      &Message{},
+		}
 
 		time.Sleep(1 * time.Second)

@@ -75,7 +89,7 @@ func Test_batchConsumer_startBatch_with_preBatch(t *testing.T) {
 	mc := mockReader{}
 	bc := batchConsumer{
 		base: &base{
-			incomingMessageStream:  make(chan *Message, 1),
+			incomingMessageStream:  make(chan *IncomingMessage, 1),
 			batchConsumingStream:   make(chan []*Message, 1),
 			singleConsumingStream:  make(chan *Message, 1),
 			messageProcessedStream: make(chan struct{}, 1),
@@ -96,14 +110,26 @@
 		}
 	}
 	go func() {
 		// Simulate messageGroupLimit
-		bc.base.incomingMessageStream <- &Message{}
-		bc.base.incomingMessageStream <- &Message{}
+		bc.base.incomingMessageStream <- &IncomingMessage{
+			kafkaMessage: &kafka.Message{},
+			message:      &Message{},
+		}
+		bc.base.incomingMessageStream <- &IncomingMessage{
+			kafkaMessage: &kafka.Message{},
+			message:      &Message{},
+		}
 
 		time.Sleep(1 * time.Second)
 
 		// Simulate messageGroupDuration
-		bc.base.incomingMessageStream <- &Message{}
-		bc.base.incomingMessageStream <- &Message{}
+		bc.base.incomingMessageStream <- &IncomingMessage{
+			kafkaMessage: &kafka.Message{},
+			message:      &Message{},
+		}
+		bc.base.incomingMessageStream <- &IncomingMessage{
+			kafkaMessage: &kafka.Message{},
+			message:      &Message{},
+		}
 
 		time.Sleep(1 * time.Second)

6 changes: 2 additions & 4 deletions consumer.go
@@ -73,7 +73,8 @@ func (c *consumer) startBatch() {
 				return
 			}
 
-			messages = append(messages, msg)
+			messages = append(messages, msg.message)
+			commitMessages = append(commitMessages, *msg.kafkaMessage)
 
 			if len(messages) == c.concurrency {
 				c.consume(&messages, &commitMessages)
@@ -108,14 +109,11 @@ func (c *consumer) consume(messages *[]*Message, commitMessages *[]kafka.Message) {
 		<-c.messageProcessedStream
 	}
 
-	toKafkaMessages(messages, commitMessages)
 	if err := c.r.CommitMessages(*commitMessages); err != nil {
 		c.logger.Errorf("Commit Error %s,", err.Error())
 	}
 
 	// Clearing resources
-	putKafkaMessage(commitMessages)
-	putMessages(messages)
 	*commitMessages = (*commitMessages)[:0]
 	*messages = (*messages)[:0]
 }
14 changes: 9 additions & 5 deletions consumer_base.go
@@ -44,7 +44,7 @@ type base struct {
 	metric                 *ConsumerMetric
 	quit                   chan struct{}
 	messageProcessedStream chan struct{}
-	incomingMessageStream  chan *Message
+	incomingMessageStream  chan *IncomingMessage
 	singleConsumingStream  chan *Message
 	batchConsumingStream   chan []*Message
 	retryTopic             string
@@ -77,7 +77,7 @@ func newBase(cfg *ConsumerConfig, messageChSize int) (*base, error) {
 
 	c := base{
 		metric:                &ConsumerMetric{},
-		incomingMessageStream: make(chan *Message, messageChSize),
+		incomingMessageStream: make(chan *IncomingMessage, messageChSize),
 		quit:                  make(chan struct{}),
 		concurrency:           cfg.Concurrency,
 		retryEnabled:          cfg.RetryEnabled,
@@ -129,7 +129,7 @@ func (c *base) startConsume() {
 			close(c.incomingMessageStream)
 			return
 		default:
-			m := kafkaMessagePool.Get().(*kafka.Message)
+			m := &kafka.Message{}
 			err := c.r.FetchMessage(c.context, m)
 			if err != nil {
 				if c.context.Err() != nil {
@@ -139,9 +139,13 @@ func (c *base) startConsume() {
 					continue
 				}
 
-			incomingMessage := fromKafkaMessage(m)
+			incomingMessage := &IncomingMessage{
+				kafkaMessage: m,
+				message:      fromKafkaMessage(m),
+			}
 
 			if c.distributedTracingEnabled {
-				incomingMessage.Context = c.propagator.Extract(context.Background(), otelkafkakonsumer.NewMessageCarrier(m))
+				incomingMessage.message.Context = c.propagator.Extract(context.Background(), otelkafkakonsumer.NewMessageCarrier(m))
 			}
 
 			c.incomingMessageStream <- incomingMessage
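With the pool gone, startConsume allocates a fresh kafka.Message per fetch and hands it off through incomingMessageStream; after the send, the producing loop never touches the value again, so each message has a single owner and no Put/Get bookkeeping is required. A minimal sketch of that hand-off pattern, with a hypothetical record type standing in for kafka.Message:

package main

import "fmt"

type record struct{ n int } // hypothetical stand-in, not the library's type

func main() {
	ch := make(chan *record, 3)

	go func() {
		for i := 0; i < 3; i++ {
			r := &record{n: i} // fresh allocation per message: sole owner until sent
			ch <- r            // ownership transfers to the receiver here
		}
		close(ch)
	}()

	for r := range ch {
		fmt.Println("received:", r.n)
	}
}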
6 changes: 3 additions & 3 deletions consumer_base_test.go
@@ -17,7 +17,7 @@ func Test_base_startConsume(t *testing.T) {
 		mc := mockReader{wantErr: true}
 		b := base{
 			wg: sync.WaitGroup{}, r: &mc,
-			incomingMessageStream: make(chan *Message),
+			incomingMessageStream: make(chan *IncomingMessage),
 			quit:                  make(chan struct{}),
 			logger:                NewZapLogger(LogLevelDebug),
 		}
@@ -39,7 +39,7 @@ func Test_base_startConsume(t *testing.T) {
 	t.Run("Read_Incoming_Messages_Successfully", func(t *testing.T) {
 		// Given
 		mc := mockReader{}
-		b := base{wg: sync.WaitGroup{}, r: &mc, incomingMessageStream: make(chan *Message)}
+		b := base{wg: sync.WaitGroup{}, r: &mc, incomingMessageStream: make(chan *IncomingMessage)}
 		b.wg.Add(1)
 
 		// When
@@ -51,7 +51,7 @@ func Test_base_startConsume(t *testing.T) {
 		//nolint:lll
 		expected := kafka.Message{Topic: "topic", Partition: 0, Offset: 1, HighWaterMark: 1, Key: []byte("foo"), Value: []byte("bar"), Headers: []kafka.Header{{Key: "header", Value: []byte("value")}}}
 
-		if diff := cmp.Diff(actual.Headers[0], expected.Headers[0]); diff != "" {
+		if diff := cmp.Diff(actual.message.Headers[0], expected.Headers[0]); diff != "" {
 			t.Error(diff)
 		}
 	})
43 changes: 7 additions & 36 deletions message.go
@@ -2,7 +2,6 @@ package kafka
 
 import (
 	"context"
-	"sync"
 	"time"
 
 	kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka"
@@ -30,6 +29,11 @@ type Message struct {
 	IsFailed bool
 }
 
+type IncomingMessage struct {
+	kafkaMessage *kafka.Message
+	message      *Message
+}
+
 func (m *Message) toKafkaMessage() kafka.Message {
 	return kafka.Message{
 		Topic: m.Topic,
@@ -44,40 +48,8 @@
 	}
 }
 
-func toKafkaMessages(messages *[]*Message, commitMessages *[]kafka.Message) {
-	for _, message := range *messages {
-		*commitMessages = append(*commitMessages, message.toKafkaMessage())
-	}
-}
-
-func putMessages(messages *[]*Message) {
-	for _, message := range *messages {
-		messagePool.Put(message)
-	}
-}
-
-func putKafkaMessage(messages *[]kafka.Message) {
-	for _, message := range *messages {
-		//nolint:gosec
-		kafkaMessagePool.Put(&message)
-	}
-}
-
-var messagePool = sync.Pool{
-	New: func() any {
-		return &Message{}
-	},
-}
-
-var kafkaMessagePool = sync.Pool{
-	New: func() any {
-		return &kafka.Message{}
-	},
-}
-
 func fromKafkaMessage(kafkaMessage *kafka.Message) *Message {
-	message := messagePool.Get().(*Message)
-
+	message := &Message{}
 	message.Topic = kafkaMessage.Topic
 	message.Partition = kafkaMessage.Partition
 	message.Offset = kafkaMessage.Offset
@@ -88,7 +60,6 @@ func fromKafkaMessage(kafkaMessage *kafka.Message) *Message {
 	message.WriterData = kafkaMessage.WriterData
 	message.Time = kafkaMessage.Time
 	message.Context = context.TODO()
-
 	return message
 }

@@ -120,7 +91,7 @@ func toMessage(message kcronsumer.Message) *Message {
 		})
 	}
 
-	msg := messagePool.Get().(*Message)
+	msg := &Message{}
 	msg.Topic = message.Topic
 	msg.Partition = message.Partition
 	msg.Offset = message.Offset
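Taken together with the consumer changes above, the new IncomingMessage pairing lets the commit path reuse the exact record that was fetched from the broker instead of rebuilding one via toKafkaMessage. A self-contained sketch of the resulting fetch → wrap → process → commit flow; rawRecord and decoded are hypothetical stand-ins, not the library's types:

package main

import "fmt"

type rawRecord struct{ offset int64 } // stand-in for kafka.Message
type decoded struct{ value string }   // stand-in for Message

// incoming mirrors the IncomingMessage idea: raw and decoded travel together.
type incoming struct {
	raw *rawRecord
	msg *decoded
}

func main() {
	fetched := &rawRecord{offset: 42}
	in := incoming{raw: fetched, msg: &decoded{value: "payload"}}

	// Process the decoded message...
	fmt.Println("processing:", in.msg.value)

	// ...then commit the very record that was fetched; no rebuild step.
	fmt.Println("committing offset:", in.raw.offset)
}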
