From 6ece3563e573aa2d55302c483de56408fc60bdd7 Mon Sep 17 00:00:00 2001 From: mhmtszr Date: Sat, 2 Dec 2023 15:53:23 +0300 Subject: [PATCH 01/24] feature: add manual kafka message commit integration --- batch_consumer.go | 86 +++++++++++++------ batch_consumer_test.go | 10 +-- consumer.go | 60 +++++++++++-- consumer_base.go | 12 ++- consumer_base_test.go | 6 +- consumer_config.go | 16 ++-- examples/with-kafka-batch-consumer/main.go | 6 +- .../main.go | 6 +- message.go | 8 ++ otel_reader_wrapper.go | 79 ++++++++--------- reader_wrapper.go | 8 +- test/integration/integration_test.go | 10 +-- 12 files changed, 202 insertions(+), 105 deletions(-) diff --git a/batch_consumer.go b/batch_consumer.go index 5ea7255..b44d2c4 100644 --- a/batch_consumer.go +++ b/batch_consumer.go @@ -1,6 +1,7 @@ package kafka import ( + "sync" "time" kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka" @@ -11,21 +12,19 @@ type batchConsumer struct { consumeFn func([]*Message) error - messageGroupLimit int - messageGroupDuration time.Duration + messageGroupLimit int } func newBatchConsumer(cfg *ConsumerConfig) (Consumer, error) { - consumerBase, err := newBase(cfg) + consumerBase, err := newBase(cfg, cfg.BatchConfiguration.MessageGroupLimit*cfg.Concurrency) if err != nil { return nil, err } c := batchConsumer{ - base: consumerBase, - consumeFn: cfg.BatchConfiguration.BatchConsumeFn, - messageGroupLimit: cfg.BatchConfiguration.MessageGroupLimit, - messageGroupDuration: cfg.BatchConfiguration.MessageGroupDuration, + base: consumerBase, + consumeFn: cfg.BatchConfiguration.BatchConsumeFn, + messageGroupLimit: cfg.BatchConfiguration.MessageGroupLimit, } if cfg.RetryEnabled { @@ -50,10 +49,8 @@ func (b *batchConsumer) Consume() { b.wg.Add(1) go b.startConsume() - for i := 0; i < b.concurrency; i++ { - b.wg.Add(1) - go b.startBatch() - } + b.wg.Add(1) + go b.startBatch() } func (b *batchConsumer) startBatch() { @@ -62,7 +59,7 @@ func (b *batchConsumer) startBatch() { ticker := time.NewTicker(b.messageGroupDuration) defer ticker.Stop() - messages := make([]*Message, 0, b.messageGroupLimit) + messages := make([]*Message, 0, b.messageGroupLimit*b.concurrency) for { select { @@ -71,7 +68,7 @@ func (b *batchConsumer) startBatch() { continue } - b.process(messages) + b.consume(messages) messages = messages[:0] case msg, ok := <-b.messageCh: if !ok { @@ -80,35 +77,72 @@ func (b *batchConsumer) startBatch() { messages = append(messages, msg) - if len(messages) == b.messageGroupLimit { - b.process(messages) + if len(messages) == (b.messageGroupLimit * b.concurrency) { + b.consume(messages) messages = messages[:0] } } } } -func (b *batchConsumer) process(messages []*Message) { - consumeErr := b.consumeFn(messages) +func chunkMessages(allMessages []*Message, chunkSize int) [][]*Message { + var chunks [][]*Message + for i := 0; i < len(allMessages); i += chunkSize { + end := i + chunkSize + + // necessary check to avoid slicing beyond + // slice capacity + if end > len(allMessages) { + end = len(allMessages) + } + + chunks = append(chunks, allMessages[i:end]) + } + + return chunks +} + +func (b *batchConsumer) consume(allMessages []*Message) { + chunks := chunkMessages(allMessages, b.messageGroupLimit) + + var wg sync.WaitGroup + wg.Add(len(chunks)) + for _, chunk := range chunks { + go func(chunk []*Message) { + defer wg.Done() + b.process(chunk) + }(chunk) + } + wg.Wait() + + kafkaMessages := toKafkaMessages(allMessages) + err := b.r.CommitMessages(kafkaMessages) + if err != nil { + b.logger.Errorf("Commit Error %s,", 
err.Error()) + } +} + +func (b *batchConsumer) process(chunkMessages []*Message) { + consumeErr := b.consumeFn(chunkMessages) if consumeErr != nil { b.logger.Warnf("Consume Function Err %s, Messages will be retried", consumeErr.Error()) // Try to process same messages again for resolving transient network errors etc. - if consumeErr = b.consumeFn(messages); consumeErr != nil { + if consumeErr = b.consumeFn(chunkMessages); consumeErr != nil { b.logger.Warnf("Consume Function Again Err %s, messages are sending to exception/retry topic %s", consumeErr.Error(), b.retryTopic) - b.metric.TotalUnprocessedMessagesCounter += int64(len(messages)) + b.metric.TotalUnprocessedMessagesCounter += int64(len(chunkMessages)) } if consumeErr != nil && b.retryEnabled { - cronsumerMessages := make([]kcronsumer.Message, 0, len(messages)) + cronsumerMessages := make([]kcronsumer.Message, 0, len(chunkMessages)) if b.transactionalRetry { - for i := range messages { - cronsumerMessages = append(cronsumerMessages, messages[i].toRetryableMessage(b.retryTopic)) + for i := range chunkMessages { + cronsumerMessages = append(cronsumerMessages, chunkMessages[i].toRetryableMessage(b.retryTopic)) } } else { - for i := range messages { - if messages[i].IsFailed { - cronsumerMessages = append(cronsumerMessages, messages[i].toRetryableMessage(b.retryTopic)) + for i := range chunkMessages { + if chunkMessages[i].IsFailed { + cronsumerMessages = append(cronsumerMessages, chunkMessages[i].toRetryableMessage(b.retryTopic)) } } } @@ -120,6 +154,6 @@ func (b *batchConsumer) process(messages []*Message) { } if consumeErr == nil { - b.metric.TotalProcessedMessagesCounter += int64(len(messages)) + b.metric.TotalProcessedMessagesCounter += int64(len(chunkMessages)) } } diff --git a/batch_consumer_test.go b/batch_consumer_test.go index 555f750..8c1349a 100644 --- a/batch_consumer_test.go +++ b/batch_consumer_test.go @@ -17,12 +17,12 @@ func Test_batchConsumer_startBatch(t *testing.T) { bc := batchConsumer{ base: &base{ - messageCh: make(chan *Message), - metric: &ConsumerMetric{}, - wg: sync.WaitGroup{}, + messageCh: make(chan *Message), + metric: &ConsumerMetric{}, + wg: sync.WaitGroup{}, + messageGroupDuration: 500 * time.Millisecond, }, - messageGroupLimit: 3, - messageGroupDuration: 500 * time.Millisecond, + messageGroupLimit: 3, consumeFn: func(messages []*Message) error { numberOfBatch++ return nil diff --git a/consumer.go b/consumer.go index 6c5c6b8..60e7f64 100644 --- a/consumer.go +++ b/consumer.go @@ -2,6 +2,8 @@ package kafka import ( kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka" + "sync" + "time" ) type consumer struct { @@ -11,7 +13,7 @@ type consumer struct { } func newSingleConsumer(cfg *ConsumerConfig) (Consumer, error) { - consumerBase, err := newBase(cfg) + consumerBase, err := newBase(cfg, cfg.Concurrency) if err != nil { return nil, err } @@ -39,15 +41,57 @@ func (c *consumer) Consume() { c.wg.Add(1) go c.startConsume() - for i := 0; i < c.concurrency; i++ { - c.wg.Add(1) - go func() { - defer c.wg.Done() + c.wg.Add(1) + go c.startBatch() +} + +func (c *consumer) startBatch() { + defer c.wg.Done() + + ticker := time.NewTicker(c.messageGroupDuration) + defer ticker.Stop() + + messages := make([]*Message, 0, c.concurrency) - for message := range c.messageCh { - c.process(message) + for { + select { + case <-ticker.C: + if len(messages) == 0 { + continue } - }() + + c.consume(messages) + messages = messages[:0] + case msg, ok := <-c.messageCh: + if !ok { + return + } + + messages = append(messages, msg) + + 
if len(messages) == c.concurrency { + c.consume(messages) + messages = messages[:0] + } + } + } +} + +func (c *consumer) consume(messages []*Message) { + var wg sync.WaitGroup + wg.Add(len(messages)) + for _, message := range messages { + go func(message *Message) { + defer wg.Done() + c.process(message) + }(message) + } + wg.Wait() + + kafkaMessages := toKafkaMessages(messages) + err := c.r.CommitMessages(kafkaMessages) + if err != nil { + c.logger.Errorf("Commit Error %s,", err.Error()) } } diff --git a/consumer_base.go b/consumer_base.go index 51b6a8c..d62e6ff 100644 --- a/consumer_base.go +++ b/consumer_base.go @@ -3,6 +3,7 @@ package kafka import ( "context" "sync" + "time" "github.com/Trendyol/otel-kafka-konsumer" "go.opentelemetry.io/otel/propagation" @@ -26,8 +27,9 @@ type Consumer interface { } type Reader interface { - ReadMessage(ctx context.Context) (*kafka.Message, error) + FetchMessage(ctx context.Context) (*kafka.Message, error) Close() error + CommitMessages(messages []kafka.Message) error } type base struct { @@ -42,6 +44,7 @@ type base struct { r Reader retryTopic string subprocesses subprocesses + messageGroupDuration time.Duration wg sync.WaitGroup concurrency int once sync.Once @@ -59,7 +62,7 @@ func NewConsumer(cfg *ConsumerConfig) (Consumer, error) { return newSingleConsumer(cfg) } -func newBase(cfg *ConsumerConfig) (*base, error) { +func newBase(cfg *ConsumerConfig, messageChSize int) (*base, error) { log := NewZapLogger(cfg.LogLevel) reader, err := cfg.newKafkaReader() @@ -70,7 +73,7 @@ func newBase(cfg *ConsumerConfig) (*base, error) { c := base{ metric: &ConsumerMetric{}, - messageCh: make(chan *Message, cfg.Concurrency), + messageCh: make(chan *Message, messageChSize), quit: make(chan struct{}), concurrency: cfg.Concurrency, retryEnabled: cfg.RetryEnabled, @@ -79,6 +82,7 @@ func newBase(cfg *ConsumerConfig) (*base, error) { logger: log, subprocesses: newSubProcesses(), r: reader, + messageGroupDuration: cfg.MessageGroupDuration, } if cfg.DistributedTracingEnabled { @@ -117,7 +121,7 @@ func (c *base) startConsume() { case <-c.quit: return default: - message, err := c.r.ReadMessage(c.context) + message, err := c.r.FetchMessage(c.context) if err != nil { if c.context.Err() != nil { continue diff --git a/consumer_base_test.go b/consumer_base_test.go index c63ae2a..5e22956 100644 --- a/consumer_base_test.go +++ b/consumer_base_test.go @@ -83,7 +83,7 @@ type mockReader struct { wantErr bool } -func (m *mockReader) ReadMessage(ctx context.Context) (*kafka.Message, error) { +func (m *mockReader) FetchMessage(ctx context.Context) (*kafka.Message, error) { if m.wantErr { return nil, errors.New("err") } @@ -94,3 +94,7 @@ func (m *mockReader) ReadMessage(ctx context.Context) (*kafka.Message, error) { func (m *mockReader) Close() error { panic("implement me") } + +func (m *mockReader) CommitMessages(messages []kafka.Message) error { + panic("implement me") +} diff --git a/consumer_config.go b/consumer_config.go index d73976c..692352e 100644 --- a/consumer_config.go +++ b/consumer_config.go @@ -39,6 +39,7 @@ type ConsumerConfig struct { Reader ReaderConfig RetryConfiguration RetryConfiguration CommitInterval time.Duration + MessageGroupDuration time.Duration DistributedTracingEnabled bool DistributedTracingConfiguration DistributedTracingConfiguration Concurrency int @@ -122,9 +123,8 @@ type RetryConfiguration struct { } type BatchConfiguration struct { - BatchConsumeFn BatchConsumeFn - MessageGroupLimit int - MessageGroupDuration time.Duration + BatchConsumeFn BatchConsumeFn 
+ MessageGroupLimit int } func (cfg *ConsumerConfig) newKafkaDialer() (*kafka.Dialer, error) { @@ -166,9 +166,9 @@ func (cfg *ConsumerConfig) newKafkaReader() (Reader, error) { reader := kafka.NewReader(readerCfg) - if cfg.DistributedTracingEnabled { - return NewOtelReaderWrapper(cfg, reader) - } + //if cfg.DistributedTracingEnabled { + // return NewOtelReaderWrapper(cfg, reader) + //} return NewReaderWrapper(reader), nil } @@ -186,6 +186,10 @@ func (cfg *ConsumerConfig) setDefaults() { cfg.Reader.CommitInterval = cfg.CommitInterval } + if cfg.MessageGroupDuration == 0 { + cfg.MessageGroupDuration = time.Second + } + if cfg.DistributedTracingEnabled { if cfg.DistributedTracingConfiguration.Propagator == nil { cfg.DistributedTracingConfiguration.Propagator = otel.GetTextMapPropagator() diff --git a/examples/with-kafka-batch-consumer/main.go b/examples/with-kafka-batch-consumer/main.go index ed24f17..f52d8a9 100644 --- a/examples/with-kafka-batch-consumer/main.go +++ b/examples/with-kafka-batch-consumer/main.go @@ -16,9 +16,8 @@ func main() { GroupID: "standart-cg", }, BatchConfiguration: &kafka.BatchConfiguration{ - MessageGroupLimit: 1000, - MessageGroupDuration: time.Second, - BatchConsumeFn: batchConsumeFn, + MessageGroupLimit: 1000, + BatchConsumeFn: batchConsumeFn, }, RetryEnabled: true, RetryConfiguration: kafka.RetryConfiguration{ @@ -28,6 +27,7 @@ func main() { WorkDuration: 50 * time.Second, MaxRetry: 3, }, + MessageGroupDuration: time.Second, } consumer, _ := kafka.NewConsumer(consumerCfg) diff --git a/examples/with-kafka-transactional-retry-disabled/main.go b/examples/with-kafka-transactional-retry-disabled/main.go index f1783c7..60464c2 100644 --- a/examples/with-kafka-transactional-retry-disabled/main.go +++ b/examples/with-kafka-transactional-retry-disabled/main.go @@ -17,9 +17,8 @@ func main() { GroupID: "standart-cg", }, BatchConfiguration: &kafka.BatchConfiguration{ - MessageGroupLimit: 1000, - MessageGroupDuration: time.Second, - BatchConsumeFn: batchConsumeFn, + MessageGroupLimit: 1000, + BatchConsumeFn: batchConsumeFn, }, RetryEnabled: true, TransactionalRetry: kafka.NewBoolPtr(false), @@ -30,6 +29,7 @@ func main() { WorkDuration: 4 * time.Minute, MaxRetry: 3, }, + MessageGroupDuration: time.Second, } consumer, _ := kafka.NewConsumer(consumerCfg) diff --git a/message.go b/message.go index 363e536..4936b78 100644 --- a/message.go +++ b/message.go @@ -44,6 +44,14 @@ func (m *Message) toKafkaMessage() kafka.Message { } } +func toKafkaMessages(messages []*Message) []kafka.Message { + kafkaMessages := make([]kafka.Message, len(messages)) + for _, message := range messages { + kafkaMessages = append(kafkaMessages, message.toKafkaMessage()) + } + return kafkaMessages +} + func fromKafkaMessage(message *kafka.Message) *Message { return &Message{ Topic: message.Topic, diff --git a/otel_reader_wrapper.go b/otel_reader_wrapper.go index 0fa07a5..74e5d55 100644 --- a/otel_reader_wrapper.go +++ b/otel_reader_wrapper.go @@ -1,44 +1,39 @@ package kafka -import ( - "context" - - "github.com/Trendyol/otel-kafka-konsumer" - segmentio "github.com/segmentio/kafka-go" - "go.opentelemetry.io/otel/attribute" - semconv "go.opentelemetry.io/otel/semconv/v1.19.0" -) - -type otelReaderWrapper struct { - r *otelkafkakonsumer.Reader -} - -func NewOtelReaderWrapper(cfg *ConsumerConfig, reader *segmentio.Reader) (Reader, error) { - cfg.setDefaults() - - newReader, err := otelkafkakonsumer.NewReader( - reader, - otelkafkakonsumer.WithTracerProvider(cfg.DistributedTracingConfiguration.TracerProvider), - 
otelkafkakonsumer.WithPropagator(cfg.DistributedTracingConfiguration.Propagator), - otelkafkakonsumer.WithAttributes( - []attribute.KeyValue{ - semconv.MessagingDestinationKindTopic, - semconv.MessagingKafkaClientIDKey.String(cfg.Reader.GroupID), - }, - )) - if err != nil { - return nil, err - } - - return &otelReaderWrapper{ - r: newReader, - }, nil -} - -func (o *otelReaderWrapper) ReadMessage(ctx context.Context) (*segmentio.Message, error) { - return o.r.ReadMessage(ctx) -} - -func (o *otelReaderWrapper) Close() error { - return o.r.Close() -} +//type otelReaderWrapper struct { +// r *otelkafkakonsumer.Reader +//} +// +//func NewOtelReaderWrapper(cfg *ConsumerConfig, reader *segmentio.Reader) (Reader, error) { +// cfg.setDefaults() +// +// newReader, err := otelkafkakonsumer.NewReader( +// reader, +// otelkafkakonsumer.WithTracerProvider(cfg.DistributedTracingConfiguration.TracerProvider), +// otelkafkakonsumer.WithPropagator(cfg.DistributedTracingConfiguration.Propagator), +// otelkafkakonsumer.WithAttributes( +// []attribute.KeyValue{ +// semconv.MessagingDestinationKindTopic, +// semconv.MessagingKafkaClientIDKey.String(cfg.Reader.GroupID), +// }, +// )) +// if err != nil { +// return nil, err +// } +// +// return &otelReaderWrapper{ +// r: newReader, +// }, nil +//} +// +//func (o *otelReaderWrapper) FetchMessage(ctx context.Context) (*segmentio.Message, error) { +// return o.r.FetchMessage(ctx) +//} +// +//func (o *otelReaderWrapper) Close() error { +// return o.r.Close() +//} +// +//func (o *otelReaderWrapper) CommitMessages(messages []segmentio.Message) error { +// return o.r.CommitMessages(context.Background(), messages...) +//} diff --git a/reader_wrapper.go b/reader_wrapper.go index a283538..8311f10 100644 --- a/reader_wrapper.go +++ b/reader_wrapper.go @@ -15,11 +15,15 @@ func NewReaderWrapper(reader *segmentio.Reader) Reader { } // ReadMessage returns pointer of kafka message because we will support distributed tracing in the near future -func (s *readerWrapper) ReadMessage(ctx context.Context) (*segmentio.Message, error) { - message, err := s.r.ReadMessage(ctx) +func (s *readerWrapper) FetchMessage(ctx context.Context) (*segmentio.Message, error) { + message, err := s.r.FetchMessage(ctx) return &message, err } func (s *readerWrapper) Close() error { return s.r.Close() } + +func (s *readerWrapper) CommitMessages(messages []segmentio.Message) error { + return s.r.CommitMessages(context.Background(), messages...) 
+} diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index b674867..131ec01 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -125,10 +125,10 @@ func Test_Should_Batch_Consume_Messages_Successfully(t *testing.T) { messagesLen := make(chan int) consumerCfg := &kafka.ConsumerConfig{ - Reader: kafka.ReaderConfig{Brokers: []string{brokerAddress}, Topic: topic, GroupID: consumerGroup}, + MessageGroupDuration: time.Second, + Reader: kafka.ReaderConfig{Brokers: []string{brokerAddress}, Topic: topic, GroupID: consumerGroup}, BatchConfiguration: &kafka.BatchConfiguration{ - MessageGroupLimit: 100, - MessageGroupDuration: time.Second, + MessageGroupLimit: 100, BatchConsumeFn: func(messages []*kafka.Message) error { messagesLen <- len(messages) return nil @@ -186,9 +186,9 @@ func Test_Should_Batch_Retry_Only_Failed_Messages_When_Transactional_Retry_Is_Di MaxRetry: 3, LogLevel: "error", }, + MessageGroupDuration: time.Second, BatchConfiguration: &kafka.BatchConfiguration{ - MessageGroupLimit: 100, - MessageGroupDuration: time.Second, + MessageGroupLimit: 100, BatchConsumeFn: func(messages []*kafka.Message) error { messages[1].IsFailed = true return errors.New("err") From d5498b71e11d325ada07f71551153bb4d30d09c0 Mon Sep 17 00:00:00 2001 From: mhmtszr Date: Sat, 2 Dec 2023 15:58:01 +0300 Subject: [PATCH 02/24] feature: fix golang ci errors --- consumer.go | 3 ++- consumer_config.go | 2 +- otel_reader_wrapper.go | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/consumer.go b/consumer.go index 60e7f64..178719d 100644 --- a/consumer.go +++ b/consumer.go @@ -1,9 +1,10 @@ package kafka import ( - kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka" "sync" "time" + + kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka" ) type consumer struct { diff --git a/consumer_config.go b/consumer_config.go index 692352e..4b644b5 100644 --- a/consumer_config.go +++ b/consumer_config.go @@ -166,7 +166,7 @@ func (cfg *ConsumerConfig) newKafkaReader() (Reader, error) { reader := kafka.NewReader(readerCfg) - //if cfg.DistributedTracingEnabled { + // if cfg.DistributedTracingEnabled { // return NewOtelReaderWrapper(cfg, reader) //} diff --git a/otel_reader_wrapper.go b/otel_reader_wrapper.go index 74e5d55..5a8124e 100644 --- a/otel_reader_wrapper.go +++ b/otel_reader_wrapper.go @@ -1,6 +1,6 @@ package kafka -//type otelReaderWrapper struct { +// type otelReaderWrapper struct { // r *otelkafkakonsumer.Reader //} // From 8c9b7bb2861c69cafc1dd9291a05eb9e5fc2328a Mon Sep 17 00:00:00 2001 From: mhmtszr Date: Sat, 2 Dec 2023 15:59:34 +0300 Subject: [PATCH 03/24] feature: fix golang ci errors --- otel_reader_wrapper.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/otel_reader_wrapper.go b/otel_reader_wrapper.go index 5a8124e..73b53cb 100644 --- a/otel_reader_wrapper.go +++ b/otel_reader_wrapper.go @@ -4,7 +4,7 @@ package kafka // r *otelkafkakonsumer.Reader //} // -//func NewOtelReaderWrapper(cfg *ConsumerConfig, reader *segmentio.Reader) (Reader, error) { +// func NewOtelReaderWrapper(cfg *ConsumerConfig, reader *segmentio.Reader) (Reader, error) { // cfg.setDefaults() // // newReader, err := otelkafkakonsumer.NewReader( From 986f52760ac21a1bc2faad504bef0fcaa1ff609f Mon Sep 17 00:00:00 2001 From: mhmtszr Date: Sat, 2 Dec 2023 16:01:36 +0300 Subject: [PATCH 04/24] feature: fix golang ci errors --- otel_reader_wrapper.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git 
a/otel_reader_wrapper.go b/otel_reader_wrapper.go index 73b53cb..52a14be 100644 --- a/otel_reader_wrapper.go +++ b/otel_reader_wrapper.go @@ -26,14 +26,14 @@ package kafka // }, nil //} // -//func (o *otelReaderWrapper) FetchMessage(ctx context.Context) (*segmentio.Message, error) { +// func (o *otelReaderWrapper) FetchMessage(ctx context.Context) (*segmentio.Message, error) { // return o.r.FetchMessage(ctx) //} // -//func (o *otelReaderWrapper) Close() error { +// func (o *otelReaderWrapper) Close() error { // return o.r.Close() //} // -//func (o *otelReaderWrapper) CommitMessages(messages []segmentio.Message) error { +// func (o *otelReaderWrapper) CommitMessages(messages []segmentio.Message) error { // return o.r.CommitMessages(context.Background(), messages...) //} From 3301839d08537f56fdebbc43329ad882d290c0eb Mon Sep 17 00:00:00 2001 From: mhmtszr Date: Sat, 2 Dec 2023 16:07:44 +0300 Subject: [PATCH 05/24] feature: fix tests --- batch_consumer_test.go | 2 ++ consumer_base_test.go | 10 ++++++++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/batch_consumer_test.go b/batch_consumer_test.go index 8c1349a..5c5d950 100644 --- a/batch_consumer_test.go +++ b/batch_consumer_test.go @@ -15,12 +15,14 @@ func Test_batchConsumer_startBatch(t *testing.T) { // Given var numberOfBatch int + mc := mockReader{} bc := batchConsumer{ base: &base{ messageCh: make(chan *Message), metric: &ConsumerMetric{}, wg: sync.WaitGroup{}, messageGroupDuration: 500 * time.Millisecond, + r: &mc, }, messageGroupLimit: 3, consumeFn: func(messages []*Message) error { diff --git a/consumer_base_test.go b/consumer_base_test.go index 5e22956..63ef367 100644 --- a/consumer_base_test.go +++ b/consumer_base_test.go @@ -92,9 +92,15 @@ func (m *mockReader) FetchMessage(ctx context.Context) (*kafka.Message, error) { } func (m *mockReader) Close() error { - panic("implement me") + if m.wantErr { + return errors.New("err") + } + return nil } func (m *mockReader) CommitMessages(messages []kafka.Message) error { - panic("implement me") + if m.wantErr { + return errors.New("err") + } + return nil } From 1530acb6381a31cf3bc38cb3e4f8c6665550caf0 Mon Sep 17 00:00:00 2001 From: mhmtszr Date: Wed, 6 Dec 2023 14:16:49 +0300 Subject: [PATCH 06/24] add unit tests for chunk messages --- batch_consumer_test.go | 68 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/batch_consumer_test.go b/batch_consumer_test.go index 5c5d950..0679d8a 100644 --- a/batch_consumer_test.go +++ b/batch_consumer_test.go @@ -2,6 +2,8 @@ package kafka import ( "errors" + "reflect" + "strconv" "sync" "testing" "time" @@ -207,6 +209,72 @@ func Test_batchConsumer_process(t *testing.T) { }) } +func Test_batchConsumer_chunk(t *testing.T) { + tests := []struct { + allMessages []*Message + chunkSize int + expected [][]*Message + }{ + { + allMessages: createMessages(0, 9), + chunkSize: 3, + expected: [][]*Message{ + createMessages(0, 3), + createMessages(3, 6), + createMessages(6, 9), + }, + }, + { + allMessages: []*Message{}, + chunkSize: 3, + expected: [][]*Message{}, + }, + { + allMessages: createMessages(0, 1), + chunkSize: 3, + expected: [][]*Message{ + createMessages(0, 1), + }, + }, + { + allMessages: createMessages(0, 8), + chunkSize: 3, + expected: [][]*Message{ + createMessages(0, 3), + createMessages(3, 6), + createMessages(6, 8), + }, + }, + { + allMessages: createMessages(0, 3), + chunkSize: 3, + expected: [][]*Message{ + createMessages(0, 3), + }, + }, + } + + for i, tc := range tests { + 
t.Run(strconv.Itoa(i), func(t *testing.T) { + chunkedMessages := chunkMessages(tc.allMessages, tc.chunkSize) + + if !reflect.DeepEqual(chunkedMessages, tc.expected) && !(len(chunkedMessages) == 0 && len(tc.expected) == 0) { + t.Errorf("For chunkSize %d, expected %v, but got %v", tc.chunkSize, tc.expected, chunkedMessages) + } + }) + } +} + +func createMessages(partitionStart int, partitionEnd int) []*Message { + messages := make([]*Message, 0) + for i := partitionStart; i < partitionEnd; i++ { + messages = append(messages, &Message{ + Partition: i, + }) + } + return messages +} + type mockCronsumer struct { wantErr bool } From 5e34ff91cba1f238e512f1245ce5b33477a55cf1 Mon Sep 17 00:00:00 2001 From: mhmtszr Date: Thu, 7 Dec 2023 09:29:17 +0300 Subject: [PATCH 07/24] feat: add manual commit support to otel readers --- examples/with-sasl-plaintext/go.mod | 2 +- examples/with-sasl-plaintext/go.sum | 1 + go.mod | 2 +- go.sum | 6 +-- otel_reader_wrapper.go | 82 ++++++++++++++++------------- test/integration/go.mod | 2 +- test/integration/go.sum | 1 + 7 files changed, 52 insertions(+), 44 deletions(-) diff --git a/examples/with-sasl-plaintext/go.mod b/examples/with-sasl-plaintext/go.mod index e419066..e8b69fe 100644 --- a/examples/with-sasl-plaintext/go.mod +++ b/examples/with-sasl-plaintext/go.mod @@ -8,7 +8,7 @@ require github.com/Trendyol/kafka-konsumer v0.0.0-00010101000000-000000000000 require ( github.com/Trendyol/kafka-cronsumer v1.4.5 // indirect - github.com/Trendyol/otel-kafka-konsumer v0.0.5 // indirect + github.com/Trendyol/otel-kafka-konsumer v0.0.6 // indirect github.com/andybalholm/brotli v1.0.5 // indirect github.com/ansrivas/fiberprometheus/v2 v2.6.1 // indirect github.com/beorn7/perks v1.0.1 // indirect diff --git a/examples/with-sasl-plaintext/go.sum b/examples/with-sasl-plaintext/go.sum index 58296f6..1f83a88 100644 --- a/examples/with-sasl-plaintext/go.sum +++ b/examples/with-sasl-plaintext/go.sum @@ -2,6 +2,7 @@ github.com/Trendyol/kafka-cronsumer v1.4.5 h1:82MhKZi1tXqFMp2gpSiYaT4UyN6LxumIu6 github.com/Trendyol/kafka-cronsumer v1.4.5/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= github.com/Trendyol/otel-kafka-konsumer v0.0.5 h1:i5Q6vR4ZRTtlb+uLimGJNBOQUiAtcbjn7Xc2FmPap/4= github.com/Trendyol/otel-kafka-konsumer v0.0.5/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0= +github.com/Trendyol/otel-kafka-konsumer v0.0.6/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/ansrivas/fiberprometheus/v2 v2.6.1 h1:wac3pXaE6BYYTF04AC6K0ktk6vCD+MnDOJZ3SK66kXM= diff --git a/go.mod b/go.mod index 9e5fc93..d9225b5 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.19 require ( github.com/Trendyol/kafka-cronsumer v1.4.5 - github.com/Trendyol/otel-kafka-konsumer v0.0.5 + github.com/Trendyol/otel-kafka-konsumer v0.0.6 github.com/ansrivas/fiberprometheus/v2 v2.6.1 github.com/gofiber/fiber/v2 v2.50.0 github.com/prometheus/client_golang v1.16.0 diff --git a/go.sum b/go.sum index 2c16b5e..78c6c87 100644 --- a/go.sum +++ b/go.sum @@ -1,9 +1,7 @@ -github.com/Trendyol/kafka-cronsumer v1.4.4 h1:RfTpVyvxf+FjLxOJIHQXr6zrMjtba6PGUAYXLoGnVuE= -github.com/Trendyol/kafka-cronsumer v1.4.4/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= github.com/Trendyol/kafka-cronsumer v1.4.5 h1:82MhKZi1tXqFMp2gpSiYaT4UyN6LxumIu6ZMC8yZ1JY= github.com/Trendyol/kafka-cronsumer v1.4.5/go.mod 
h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= -github.com/Trendyol/otel-kafka-konsumer v0.0.5 h1:i5Q6vR4ZRTtlb+uLimGJNBOQUiAtcbjn7Xc2FmPap/4= -github.com/Trendyol/otel-kafka-konsumer v0.0.5/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0= +github.com/Trendyol/otel-kafka-konsumer v0.0.6 h1:FFSft5lIA+kyH+hZW4bxt/xWG6J3PFUGzKZbjyf+fVM= +github.com/Trendyol/otel-kafka-konsumer v0.0.6/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/ansrivas/fiberprometheus/v2 v2.6.1 h1:wac3pXaE6BYYTF04AC6K0ktk6vCD+MnDOJZ3SK66kXM= diff --git a/otel_reader_wrapper.go b/otel_reader_wrapper.go index 52a14be..c374f34 100644 --- a/otel_reader_wrapper.go +++ b/otel_reader_wrapper.go @@ -1,39 +1,47 @@ package kafka -// type otelReaderWrapper struct { -// r *otelkafkakonsumer.Reader -//} -// -// func NewOtelReaderWrapper(cfg *ConsumerConfig, reader *segmentio.Reader) (Reader, error) { -// cfg.setDefaults() -// -// newReader, err := otelkafkakonsumer.NewReader( -// reader, -// otelkafkakonsumer.WithTracerProvider(cfg.DistributedTracingConfiguration.TracerProvider), -// otelkafkakonsumer.WithPropagator(cfg.DistributedTracingConfiguration.Propagator), -// otelkafkakonsumer.WithAttributes( -// []attribute.KeyValue{ -// semconv.MessagingDestinationKindTopic, -// semconv.MessagingKafkaClientIDKey.String(cfg.Reader.GroupID), -// }, -// )) -// if err != nil { -// return nil, err -// } -// -// return &otelReaderWrapper{ -// r: newReader, -// }, nil -//} -// -// func (o *otelReaderWrapper) FetchMessage(ctx context.Context) (*segmentio.Message, error) { -// return o.r.FetchMessage(ctx) -//} -// -// func (o *otelReaderWrapper) Close() error { -// return o.r.Close() -//} -// -// func (o *otelReaderWrapper) CommitMessages(messages []segmentio.Message) error { -// return o.r.CommitMessages(context.Background(), messages...) -//} +import ( + "context" + "github.com/Trendyol/otel-kafka-konsumer" + segmentio "github.com/segmentio/kafka-go" + "go.opentelemetry.io/otel/attribute" + semconv "go.opentelemetry.io/otel/semconv/v1.19.0" +) + +type otelReaderWrapper struct { + r *otelkafkakonsumer.Reader +} + +func NewOtelReaderWrapper(cfg *ConsumerConfig, reader *segmentio.Reader) (Reader, error) { + cfg.setDefaults() + + newReader, err := otelkafkakonsumer.NewReader( + reader, + otelkafkakonsumer.WithTracerProvider(cfg.DistributedTracingConfiguration.TracerProvider), + otelkafkakonsumer.WithPropagator(cfg.DistributedTracingConfiguration.Propagator), + otelkafkakonsumer.WithAttributes( + []attribute.KeyValue{ + semconv.MessagingDestinationKindTopic, + semconv.MessagingKafkaClientIDKey.String(cfg.Reader.GroupID), + }, + )) + if err != nil { + return nil, err + } + + return &otelReaderWrapper{ + r: newReader, + }, nil +} + +func (o *otelReaderWrapper) FetchMessage(ctx context.Context) (*segmentio.Message, error) { + return o.r.FetchMessage(ctx) +} + +func (o *otelReaderWrapper) Close() error { + return o.r.Close() +} + +func (o *otelReaderWrapper) CommitMessages(messages []segmentio.Message) error { + return o.r.CommitMessages(context.Background(), messages...) 
+} diff --git a/test/integration/go.mod b/test/integration/go.mod index aec012d..5f13bd3 100644 --- a/test/integration/go.mod +++ b/test/integration/go.mod @@ -11,7 +11,7 @@ require ( require ( github.com/Trendyol/kafka-cronsumer v1.4.5 // indirect - github.com/Trendyol/otel-kafka-konsumer v0.0.5 // indirect + github.com/Trendyol/otel-kafka-konsumer v0.0.6 // indirect github.com/andybalholm/brotli v1.0.5 // indirect github.com/ansrivas/fiberprometheus/v2 v2.6.1 // indirect github.com/beorn7/perks v1.0.1 // indirect diff --git a/test/integration/go.sum b/test/integration/go.sum index 58296f6..1f83a88 100644 --- a/test/integration/go.sum +++ b/test/integration/go.sum @@ -2,6 +2,7 @@ github.com/Trendyol/kafka-cronsumer v1.4.5 h1:82MhKZi1tXqFMp2gpSiYaT4UyN6LxumIu6 github.com/Trendyol/kafka-cronsumer v1.4.5/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= github.com/Trendyol/otel-kafka-konsumer v0.0.5 h1:i5Q6vR4ZRTtlb+uLimGJNBOQUiAtcbjn7Xc2FmPap/4= github.com/Trendyol/otel-kafka-konsumer v0.0.5/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0= +github.com/Trendyol/otel-kafka-konsumer v0.0.6/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/ansrivas/fiberprometheus/v2 v2.6.1 h1:wac3pXaE6BYYTF04AC6K0ktk6vCD+MnDOJZ3SK66kXM= From fbb14a05c09b181bc0575051ea9da8fd9213d6fc Mon Sep 17 00:00:00 2001 From: mhmtszr Date: Thu, 7 Dec 2023 09:31:22 +0300 Subject: [PATCH 08/24] fix golang ci --- otel_reader_wrapper.go | 1 + 1 file changed, 1 insertion(+) diff --git a/otel_reader_wrapper.go b/otel_reader_wrapper.go index c374f34..06f6f16 100644 --- a/otel_reader_wrapper.go +++ b/otel_reader_wrapper.go @@ -2,6 +2,7 @@ package kafka import ( "context" + "github.com/Trendyol/otel-kafka-konsumer" segmentio "github.com/segmentio/kafka-go" "go.opentelemetry.io/otel/attribute" From 7ed8258311457225481a45041a5aa45e46f8d358 Mon Sep 17 00:00:00 2001 From: mhmtszr Date: Thu, 7 Dec 2023 09:50:41 +0300 Subject: [PATCH 09/24] Update README.md --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index bfb3fa6..6db9eec 100644 --- a/README.md +++ b/README.md @@ -100,9 +100,9 @@ After running `docker-compose up` command, you can run any application you want. WorkDuration: 50 * time.Second, MaxRetry: 3, }, + MessageGroupDuration: time.Second, BatchConfiguration: kafka.BatchConfiguration{ MessageGroupLimit: 1000, - MessageGroupDuration: time.Second, BatchConsumeFn: batchConsumeFn, }, } @@ -140,9 +140,9 @@ After running `docker-compose up` command, you can run any application you want. WorkDuration: 50 * time.Second, MaxRetry: 3, }, + MessageGroupDuration: time.Second, BatchConfiguration: kafka.BatchConfiguration{ MessageGroupLimit: 1000, - MessageGroupDuration: time.Second, BatchConsumeFn: batchConsumeFn, }, } @@ -198,6 +198,7 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap | `commitInterval` | indicates the interval at which offsets are committed to the broker. 
| 1s | | `rack` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#RackAffinityGroupBalancer) | | | `clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Dialer) | | +| `messageGroupDuration` | Maximum time to wait for a batch | | | `dial.Timeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Dialer) | no timeout | | `dial.KeepAlive` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Dialer) | not enabled | | `transport.DialTimeout ` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Transport) | 5s | @@ -219,7 +220,6 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap | `retryConfiguration.sasl.username` | SCRAM OR PLAIN username | | | `retryConfiguration.sasl.password` | SCRAM OR PLAIN password | | | `batchConfiguration.messageGroupLimit` | Maximum number of messages in a batch | | -| `batchConfiguration.messageGroupDuration` | Maximum time to wait for a batch | | | `tls.rootCAPath` | [see doc](https://pkg.go.dev/crypto/tls#Config.RootCAs) | "" | | `tls.intermediateCAPath` | Same with rootCA, if you want to specify two rootca you can use it with rootCAPath | "" | | `sasl.authType` | `SCRAM` or `PLAIN` | | From 0a277e8113c1130a831aae54d21c037fdf11e0c7 Mon Sep 17 00:00:00 2001 From: mhmtszr Date: Fri, 8 Dec 2023 22:31:56 +0300 Subject: [PATCH 10/24] Reduce memory usage --- batch_consumer.go | 37 +++++++++++------ consumer.go | 36 ++++++++++------ consumer_base.go | 9 ++++ .../with-kafka-batch-consumer/main_test.go | 41 +++++++++++++++++++ message.go | 6 +-- 5 files changed, 99 insertions(+), 30 deletions(-) create mode 100644 examples/with-kafka-batch-consumer/main_test.go diff --git a/batch_consumer.go b/batch_consumer.go index b44d2c4..07f8c87 100644 --- a/batch_consumer.go +++ b/batch_consumer.go @@ -1,7 +1,7 @@ package kafka import ( - "sync" + "github.com/segmentio/kafka-go" "time" kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka" @@ -49,6 +49,17 @@ func (b *batchConsumer) Consume() { b.wg.Add(1) go b.startConsume() + for i := 0; i < b.concurrency; i++ { + b.wg.Add(1) + go func() { + defer b.wg.Done() + for messages := range b.batchMessageCommitCh { + b.process(messages) + b.waitMessageProcess <- struct{}{} + } + }() + } + b.wg.Add(1) go b.startBatch() } @@ -60,6 +71,7 @@ func (b *batchConsumer) startBatch() { defer ticker.Stop() messages := make([]*Message, 0, b.messageGroupLimit*b.concurrency) + commitMessages := make([]kafka.Message, 0, b.messageGroupLimit*b.concurrency) for { select { @@ -68,7 +80,7 @@ func (b *batchConsumer) startBatch() { continue } - b.consume(messages) + b.consume(messages, &commitMessages) messages = messages[:0] case msg, ok := <-b.messageCh: if !ok { @@ -78,7 +90,7 @@ func (b *batchConsumer) startBatch() { messages = append(messages, msg) if len(messages) == (b.messageGroupLimit * b.concurrency) { - b.consume(messages) + b.consume(messages, &commitMessages) messages = messages[:0] } } @@ -102,21 +114,20 @@ func chunkMessages(allMessages []*Message, chunkSize int) [][]*Message { return chunks } -func (b *batchConsumer) consume(allMessages []*Message) { +func (b *batchConsumer) consume(allMessages []*Message, commitMessages *[]kafka.Message) { chunks := chunkMessages(allMessages, b.messageGroupLimit) - var wg sync.WaitGroup - wg.Add(len(chunks)) for _, chunk := range chunks { - go func(chunk []*Message) { - defer wg.Done() - b.process(chunk) - }(chunk) + b.batchMessageCommitCh <- chunk + } + + for i := 0; i < len(chunks); i++ 
{ + <-b.waitMessageProcess } - wg.Wait() - kafkaMessages := toKafkaMessages(allMessages) - err := b.r.CommitMessages(kafkaMessages) + toKafkaMessages(allMessages, commitMessages) + err := b.r.CommitMessages(*commitMessages) + *commitMessages = (*commitMessages)[:0] if err != nil { b.logger.Errorf("Commit Error %s,", err.Error()) } diff --git a/consumer.go b/consumer.go index 178719d..a7963bf 100644 --- a/consumer.go +++ b/consumer.go @@ -1,7 +1,7 @@ package kafka import ( - "sync" + "github.com/segmentio/kafka-go" "time" kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka" @@ -42,6 +42,16 @@ func (c *consumer) Consume() { c.wg.Add(1) go c.startConsume() + for i := 0; i < c.concurrency; i++ { + c.wg.Add(1) + go func() { + defer c.wg.Done() + for message := range c.singleMessageCommitCh { + c.process(message) + c.waitMessageProcess <- struct{}{} + } + }() + } c.wg.Add(1) go c.startBatch() } @@ -53,6 +63,7 @@ func (c *consumer) startBatch() { defer ticker.Stop() messages := make([]*Message, 0, c.concurrency) + commitMessages := make([]kafka.Message, 0, c.concurrency) for { select { @@ -61,7 +72,7 @@ func (c *consumer) startBatch() { continue } - c.consume(messages) + c.consume(messages, &commitMessages) messages = messages[:0] case msg, ok := <-c.messageCh: if !ok { @@ -71,26 +82,25 @@ func (c *consumer) startBatch() { messages = append(messages, msg) if len(messages) == c.concurrency { - c.consume(messages) + c.consume(messages, &commitMessages) messages = messages[:0] } } } } -func (c *consumer) consume(messages []*Message) { - var wg sync.WaitGroup - wg.Add(len(messages)) +func (c *consumer) consume(messages []*Message, commitMessages *[]kafka.Message) { for _, message := range messages { - go func(message *Message) { - defer wg.Done() - c.process(message) - }(message) + c.singleMessageCommitCh <- message + } + + for i := 0; i < len(messages); i++ { + <-c.waitMessageProcess } - wg.Wait() - kafkaMessages := toKafkaMessages(messages) - err := c.r.CommitMessages(kafkaMessages) + toKafkaMessages(messages, commitMessages) + err := c.r.CommitMessages(*commitMessages) + *commitMessages = (*commitMessages)[:0] if err != nil { c.logger.Errorf("Commit Error %s,", err.Error()) } diff --git a/consumer_base.go b/consumer_base.go index d62e6ff..2fb9ed7 100644 --- a/consumer_base.go +++ b/consumer_base.go @@ -39,7 +39,10 @@ type base struct { metric *ConsumerMetric context context.Context messageCh chan *Message + singleMessageCommitCh chan *Message + batchMessageCommitCh chan []*Message quit chan struct{} + waitMessageProcess chan struct{} cancelFn context.CancelFunc r Reader retryTopic string @@ -83,6 +86,9 @@ func newBase(cfg *ConsumerConfig, messageChSize int) (*base, error) { subprocesses: newSubProcesses(), r: reader, messageGroupDuration: cfg.MessageGroupDuration, + waitMessageProcess: make(chan struct{}, cfg.Concurrency), + singleMessageCommitCh: make(chan *Message, cfg.Concurrency), + batchMessageCommitCh: make(chan []*Message, cfg.Concurrency), } if cfg.DistributedTracingEnabled { @@ -152,6 +158,9 @@ func (c *base) Stop() error { c.cancelFn() c.quit <- struct{}{} close(c.messageCh) + close(c.singleMessageCommitCh) + close(c.batchMessageCommitCh) + close(c.waitMessageProcess) c.wg.Wait() err = c.r.Close() }) diff --git a/examples/with-kafka-batch-consumer/main_test.go b/examples/with-kafka-batch-consumer/main_test.go new file mode 100644 index 0000000..1e16343 --- /dev/null +++ b/examples/with-kafka-batch-consumer/main_test.go @@ -0,0 +1,41 @@ +package main + +import ( + "fmt" + 
"github.com/Trendyol/kafka-konsumer" + "testing" + "time" +) + +func BenchmarkKonsumer(b *testing.B) { + for i := 0; i < b.N; i++ { + consumerCfg := &kafka.ConsumerConfig{ + Reader: kafka.ReaderConfig{ + Brokers: []string{"localhost:29092"}, + Topic: "standart-topic", + GroupID: "standart-cg", + }, + BatchConfiguration: &kafka.BatchConfiguration{ + MessageGroupLimit: 1000, + BatchConsumeFn: batchConsumeFn, + }, + RetryEnabled: true, + RetryConfiguration: kafka.RetryConfiguration{ + Brokers: []string{"localhost:29092"}, + Topic: "retry-topic", + StartTimeCron: "*/1 * * * *", + WorkDuration: 50 * time.Second, + MaxRetry: 3, + }, + MessageGroupDuration: time.Second, + } + + consumer, _ := kafka.NewConsumer(consumerCfg) + + consumer.Consume() + + fmt.Println("Consumer started...!") + + time.Sleep(10 * time.Second) + } +} diff --git a/message.go b/message.go index 4936b78..ae903df 100644 --- a/message.go +++ b/message.go @@ -44,12 +44,10 @@ func (m *Message) toKafkaMessage() kafka.Message { } } -func toKafkaMessages(messages []*Message) []kafka.Message { - kafkaMessages := make([]kafka.Message, len(messages)) +func toKafkaMessages(messages []*Message, commitMessages *[]kafka.Message) { for _, message := range messages { - kafkaMessages = append(kafkaMessages, message.toKafkaMessage()) + *commitMessages = append(*commitMessages, message.toKafkaMessage()) } - return kafkaMessages } func fromKafkaMessage(message *kafka.Message) *Message { From ab741492a533308d1aee7c047b4a5c681b4dbb7c Mon Sep 17 00:00:00 2001 From: mhmtszr Date: Fri, 8 Dec 2023 22:33:14 +0300 Subject: [PATCH 11/24] Remove --- .../with-kafka-batch-consumer/main_test.go | 41 ------------------- 1 file changed, 41 deletions(-) delete mode 100644 examples/with-kafka-batch-consumer/main_test.go diff --git a/examples/with-kafka-batch-consumer/main_test.go b/examples/with-kafka-batch-consumer/main_test.go deleted file mode 100644 index 1e16343..0000000 --- a/examples/with-kafka-batch-consumer/main_test.go +++ /dev/null @@ -1,41 +0,0 @@ -package main - -import ( - "fmt" - "github.com/Trendyol/kafka-konsumer" - "testing" - "time" -) - -func BenchmarkKonsumer(b *testing.B) { - for i := 0; i < b.N; i++ { - consumerCfg := &kafka.ConsumerConfig{ - Reader: kafka.ReaderConfig{ - Brokers: []string{"localhost:29092"}, - Topic: "standart-topic", - GroupID: "standart-cg", - }, - BatchConfiguration: &kafka.BatchConfiguration{ - MessageGroupLimit: 1000, - BatchConsumeFn: batchConsumeFn, - }, - RetryEnabled: true, - RetryConfiguration: kafka.RetryConfiguration{ - Brokers: []string{"localhost:29092"}, - Topic: "retry-topic", - StartTimeCron: "*/1 * * * *", - WorkDuration: 50 * time.Second, - MaxRetry: 3, - }, - MessageGroupDuration: time.Second, - } - - consumer, _ := kafka.NewConsumer(consumerCfg) - - consumer.Consume() - - fmt.Println("Consumer started...!") - - time.Sleep(10 * time.Second) - } -} From 9110569391eaf5bb1975ed4757f52c37c375aca4 Mon Sep 17 00:00:00 2001 From: mhmtszr Date: Fri, 8 Dec 2023 22:34:24 +0300 Subject: [PATCH 12/24] Fix golangci lint --- batch_consumer.go | 3 ++- consumer.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/batch_consumer.go b/batch_consumer.go index 07f8c87..4f92ad0 100644 --- a/batch_consumer.go +++ b/batch_consumer.go @@ -1,9 +1,10 @@ package kafka import ( - "github.com/segmentio/kafka-go" "time" + "github.com/segmentio/kafka-go" + kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka" ) diff --git a/consumer.go b/consumer.go index a7963bf..0443741 100644 --- a/consumer.go +++ 
b/consumer.go @@ -1,9 +1,10 @@ package kafka import ( - "github.com/segmentio/kafka-go" "time" + "github.com/segmentio/kafka-go" + kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka" ) From 13566639162e4b9515c894bbb285fc7cf0fa677a Mon Sep 17 00:00:00 2001 From: mhmtszr Date: Fri, 8 Dec 2023 23:00:26 +0300 Subject: [PATCH 13/24] Fix tests --- batch_consumer.go | 14 +++++++------- batch_consumer_test.go | 14 +++++++++----- consumer.go | 13 +++++++------ 3 files changed, 23 insertions(+), 18 deletions(-) diff --git a/batch_consumer.go b/batch_consumer.go index 4f92ad0..6b6e781 100644 --- a/batch_consumer.go +++ b/batch_consumer.go @@ -50,6 +50,13 @@ func (b *batchConsumer) Consume() { b.wg.Add(1) go b.startConsume() + b.wg.Add(1) + go b.startBatch() +} + +func (b *batchConsumer) startBatch() { + defer b.wg.Done() + for i := 0; i < b.concurrency; i++ { b.wg.Add(1) go func() { @@ -61,13 +68,6 @@ func (b *batchConsumer) Consume() { }() } - b.wg.Add(1) - go b.startBatch() -} - -func (b *batchConsumer) startBatch() { - defer b.wg.Done() - ticker := time.NewTicker(b.messageGroupDuration) defer ticker.Stop() diff --git a/batch_consumer_test.go b/batch_consumer_test.go index 0679d8a..f308779 100644 --- a/batch_consumer_test.go +++ b/batch_consumer_test.go @@ -20,11 +20,15 @@ func Test_batchConsumer_startBatch(t *testing.T) { mc := mockReader{} bc := batchConsumer{ base: &base{ - messageCh: make(chan *Message), - metric: &ConsumerMetric{}, - wg: sync.WaitGroup{}, - messageGroupDuration: 500 * time.Millisecond, - r: &mc, + messageCh: make(chan *Message, 1), + batchMessageCommitCh: make(chan []*Message, 1), + singleMessageCommitCh: make(chan *Message, 1), + waitMessageProcess: make(chan struct{}, 1), + metric: &ConsumerMetric{}, + wg: sync.WaitGroup{}, + messageGroupDuration: 500 * time.Millisecond, + r: &mc, + concurrency: 1, }, messageGroupLimit: 3, consumeFn: func(messages []*Message) error { diff --git a/consumer.go b/consumer.go index 0443741..763e713 100644 --- a/consumer.go +++ b/consumer.go @@ -43,6 +43,13 @@ func (c *consumer) Consume() { c.wg.Add(1) go c.startConsume() + c.wg.Add(1) + go c.startBatch() +} + +func (c *consumer) startBatch() { + defer c.wg.Done() + for i := 0; i < c.concurrency; i++ { c.wg.Add(1) go func() { @@ -53,12 +60,6 @@ func (c *consumer) Consume() { } }() } - c.wg.Add(1) - go c.startBatch() -} - -func (c *consumer) startBatch() { - defer c.wg.Done() ticker := time.NewTicker(c.messageGroupDuration) defer ticker.Stop() From d3031c5fd99824f9c4fc526bbd16bf3ec1067069 Mon Sep 17 00:00:00 2001 From: mhmtszr Date: Sat, 9 Dec 2023 08:39:39 +0300 Subject: [PATCH 14/24] Prevent copying message slice --- batch_consumer.go | 18 ++++++++++-------- consumer.go | 11 ++++++----- message.go | 4 ++-- 3 files changed, 18 insertions(+), 15 deletions(-) diff --git a/batch_consumer.go b/batch_consumer.go index 6b6e781..882fdd4 100644 --- a/batch_consumer.go +++ b/batch_consumer.go @@ -81,7 +81,7 @@ func (b *batchConsumer) startBatch() { continue } - b.consume(messages, &commitMessages) + b.consume(&messages, &commitMessages) messages = messages[:0] case msg, ok := <-b.messageCh: if !ok { @@ -91,31 +91,33 @@ func (b *batchConsumer) startBatch() { messages = append(messages, msg) if len(messages) == (b.messageGroupLimit * b.concurrency) { - b.consume(messages, &commitMessages) + b.consume(&messages, &commitMessages) messages = messages[:0] } } } } -func chunkMessages(allMessages []*Message, chunkSize int) [][]*Message { +func chunkMessages(allMessages *[]*Message, chunkSize int) 
[][]*Message { var chunks [][]*Message - for i := 0; i < len(allMessages); i += chunkSize { + + allMessageList := *allMessages + for i := 0; i < len(allMessageList); i += chunkSize { end := i + chunkSize // necessary check to avoid slicing beyond // slice capacity - if end > len(allMessages) { - end = len(allMessages) + if end > len(allMessageList) { + end = len(allMessageList) } - chunks = append(chunks, allMessages[i:end]) + chunks = append(chunks, allMessageList[i:end]) } return chunks } -func (b *batchConsumer) consume(allMessages []*Message, commitMessages *[]kafka.Message) { +func (b *batchConsumer) consume(allMessages *[]*Message, commitMessages *[]kafka.Message) { chunks := chunkMessages(allMessages, b.messageGroupLimit) for _, chunk := range chunks { diff --git a/consumer.go b/consumer.go index 763e713..3146b3b 100644 --- a/consumer.go +++ b/consumer.go @@ -74,7 +74,7 @@ func (c *consumer) startBatch() { continue } - c.consume(messages, &commitMessages) + c.consume(&messages, &commitMessages) messages = messages[:0] case msg, ok := <-c.messageCh: if !ok { @@ -84,19 +84,20 @@ func (c *consumer) startBatch() { messages = append(messages, msg) if len(messages) == c.concurrency { - c.consume(messages, &commitMessages) + c.consume(&messages, &commitMessages) messages = messages[:0] } } } } -func (c *consumer) consume(messages []*Message, commitMessages *[]kafka.Message) { - for _, message := range messages { +func (c *consumer) consume(messages *[]*Message, commitMessages *[]kafka.Message) { + messageList := *messages + for _, message := range messageList { c.singleMessageCommitCh <- message } - for i := 0; i < len(messages); i++ { + for i := 0; i < len(messageList); i++ { <-c.waitMessageProcess } diff --git a/message.go b/message.go index ae903df..6cbf807 100644 --- a/message.go +++ b/message.go @@ -44,8 +44,8 @@ func (m *Message) toKafkaMessage() kafka.Message { } } -func toKafkaMessages(messages []*Message, commitMessages *[]kafka.Message) { - for _, message := range messages { +func toKafkaMessages(messages *[]*Message, commitMessages *[]kafka.Message) { + for _, message := range *messages { *commitMessages = append(*commitMessages, message.toKafkaMessage()) } } From d994372761423004d03fd0b02f664514ff32f034 Mon Sep 17 00:00:00 2001 From: mhmtszr Date: Sat, 9 Dec 2023 08:40:47 +0300 Subject: [PATCH 15/24] fix tests --- batch_consumer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/batch_consumer_test.go b/batch_consumer_test.go index f308779..3492576 100644 --- a/batch_consumer_test.go +++ b/batch_consumer_test.go @@ -260,7 +260,7 @@ func Test_batchConsumer_chunk(t *testing.T) { for i, tc := range tests { t.Run(strconv.Itoa(i), func(t *testing.T) { - chunkedMessages := chunkMessages(tc.allMessages, tc.chunkSize) + chunkedMessages := chunkMessages(&tc.allMessages, tc.chunkSize) if !reflect.DeepEqual(chunkedMessages, tc.expected) && !(len(chunkedMessages) == 0 && len(tc.expected) == 0) { t.Errorf("For chunkSize %d, expected %v, but got %v", tc.chunkSize, tc.expected, chunkedMessages) From d881a8b48ae2c40c1295cc77ef58a9e9fbf3397e Mon Sep 17 00:00:00 2001 From: mhmtszr Date: Sat, 9 Dec 2023 08:47:16 +0300 Subject: [PATCH 16/24] go mod tidy for integration --- test/integration/go.mod | 2 +- test/integration/go.sum | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/test/integration/go.mod b/test/integration/go.mod index 5f13bd3..2150330 100644 --- a/test/integration/go.mod +++ b/test/integration/go.mod @@ -5,7 +5,7 @@ go 1.19 replace 
github.com/Trendyol/kafka-konsumer => ../.. require ( - github.com/Trendyol/kafka-konsumer v0.0.0-00010101000000-000000000000 + github.com/Trendyol/kafka-konsumer v1.8.8 github.com/segmentio/kafka-go v0.4.43 ) diff --git a/test/integration/go.sum b/test/integration/go.sum index 1f83a88..5d4ea20 100644 --- a/test/integration/go.sum +++ b/test/integration/go.sum @@ -1,7 +1,6 @@ github.com/Trendyol/kafka-cronsumer v1.4.5 h1:82MhKZi1tXqFMp2gpSiYaT4UyN6LxumIu6ZMC8yZ1JY= github.com/Trendyol/kafka-cronsumer v1.4.5/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= -github.com/Trendyol/otel-kafka-konsumer v0.0.5 h1:i5Q6vR4ZRTtlb+uLimGJNBOQUiAtcbjn7Xc2FmPap/4= -github.com/Trendyol/otel-kafka-konsumer v0.0.5/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0= +github.com/Trendyol/otel-kafka-konsumer v0.0.6 h1:FFSft5lIA+kyH+hZW4bxt/xWG6J3PFUGzKZbjyf+fVM= github.com/Trendyol/otel-kafka-konsumer v0.0.6/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= From 9c403e7ccf9b4f85723138236c2eb64a28127edf Mon Sep 17 00:00:00 2001 From: mhmtszr Date: Sat, 9 Dec 2023 09:28:31 +0300 Subject: [PATCH 17/24] fieldalignment --- batch_consumer_test.go | 2 +- consumer_base.go | 16 ++++++++-------- consumer_config.go | 16 ++++++++-------- go.mod | 1 + go.sum | 2 ++ message.go | 17 ++++++++--------- producer_config.go | 4 ++-- 7 files changed, 30 insertions(+), 28 deletions(-) diff --git a/batch_consumer_test.go b/batch_consumer_test.go index 3492576..9b82dc1 100644 --- a/batch_consumer_test.go +++ b/batch_consumer_test.go @@ -216,8 +216,8 @@ func Test_batchConsumer_process(t *testing.T) { func Test_batchConsumer_chunk(t *testing.T) { tests := []struct { allMessages []*Message - chunkSize int expected [][]*Message + chunkSize int }{ { allMessages: createMessages(0, 9), diff --git a/consumer_base.go b/consumer_base.go index 2fb9ed7..57293f6 100644 --- a/consumer_base.go +++ b/consumer_base.go @@ -36,25 +36,25 @@ type base struct { cronsumer kcronsumer.Cronsumer api API logger LoggerInterface - metric *ConsumerMetric + propagator propagation.TextMapPropagator context context.Context - messageCh chan *Message - singleMessageCommitCh chan *Message - batchMessageCommitCh chan []*Message + r Reader + cancelFn context.CancelFunc + metric *ConsumerMetric quit chan struct{} waitMessageProcess chan struct{} - cancelFn context.CancelFunc - r Reader + singleMessageCommitCh chan *Message + messageCh chan *Message + batchMessageCommitCh chan []*Message retryTopic string subprocesses subprocesses - messageGroupDuration time.Duration wg sync.WaitGroup concurrency int + messageGroupDuration time.Duration once sync.Once retryEnabled bool transactionalRetry bool distributedTracingEnabled bool - propagator propagation.TextMapPropagator } func NewConsumer(cfg *ConsumerConfig) (Consumer, error) { diff --git a/consumer_config.go b/consumer_config.go index 4b644b5..ac0eec2 100644 --- a/consumer_config.go +++ b/consumer_config.go @@ -25,27 +25,27 @@ type DialConfig struct { } type ConsumerConfig struct { - APIConfiguration APIConfiguration + DistributedTracingConfiguration DistributedTracingConfiguration Logger LoggerInterface + APIConfiguration APIConfiguration MetricConfiguration MetricConfiguration SASL *SASLConfig TLS *TLSConfig Dial *DialConfig BatchConfiguration *BatchConfiguration ConsumeFn ConsumeFn - ClientID string - Rack string + TransactionalRetry 
*bool + RetryConfiguration RetryConfiguration LogLevel LogLevel + Rack string + ClientID string Reader ReaderConfig - RetryConfiguration RetryConfiguration CommitInterval time.Duration MessageGroupDuration time.Duration - DistributedTracingEnabled bool - DistributedTracingConfiguration DistributedTracingConfiguration Concurrency int + DistributedTracingEnabled bool RetryEnabled bool APIEnabled bool - TransactionalRetry *bool } func (cfg *ConsumerConfig) newCronsumerConfig() *kcronsumer.Config { @@ -116,10 +116,10 @@ type RetryConfiguration struct { Topic string DeadLetterTopic string Rack string + LogLevel LogLevel Brokers []string MaxRetry int WorkDuration time.Duration - LogLevel LogLevel } type BatchConfiguration struct { diff --git a/go.mod b/go.mod index d9225b5..1d02a49 100644 --- a/go.mod +++ b/go.mod @@ -44,6 +44,7 @@ require ( github.com/xdg-go/stringprep v1.0.4 // indirect go.opentelemetry.io/otel/metric v1.19.0 // indirect go.uber.org/atomic v1.11.0 // indirect + go.uber.org/goleak v1.3.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/sys v0.13.0 // indirect golang.org/x/text v0.9.0 // indirect diff --git a/go.sum b/go.sum index 78c6c87..6eb5d5d 100644 --- a/go.sum +++ b/go.sum @@ -96,6 +96,8 @@ go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmY go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= diff --git a/message.go b/message.go index 6cbf807..666b6b9 100644 --- a/message.go +++ b/message.go @@ -12,22 +12,21 @@ import ( type Header = protocol.Header type Message struct { + Time time.Time + WriterData interface{} + + // Context To enable distributed tracing support + Context context.Context Topic string + Key []byte + Value []byte + Headers []Header Partition int Offset int64 HighWaterMark int64 // IsFailed Is only used on transactional retry disabled IsFailed bool - - Key []byte - Value []byte - Headers []Header - WriterData interface{} - Time time.Time - - // Context To enable distributed tracing support - Context context.Context } func (m *Message) toKafkaMessage() kafka.Message { diff --git a/producer_config.go b/producer_config.go index fde6746..1feeecd 100644 --- a/producer_config.go +++ b/producer_config.go @@ -30,20 +30,20 @@ type WriterConfig struct { } type TransportConfig struct { + MetadataTopics []string DialTimeout time.Duration IdleTimeout time.Duration MetadataTTL time.Duration - MetadataTopics []string } type ProducerConfig struct { + DistributedTracingConfiguration DistributedTracingConfiguration Transport *TransportConfig SASL *SASLConfig TLS *TLSConfig ClientID string Writer WriterConfig DistributedTracingEnabled bool - DistributedTracingConfiguration DistributedTracingConfiguration } func (cfg *ProducerConfig) newKafkaTransport() (*kafka.Transport, error) { From ad9fb73a221c82e8ab6fd5920c3db8cc29aed885 Mon Sep 17 00:00:00 2001 From: mhmtszr Date: Sat, 9 Dec 2023 10:24:57 +0300 Subject: [PATCH 18/24] reduce memory usage --- batch_consumer.go | 1 + 
batch_consumer_test.go | 4 +-- collector.go | 10 ++++--- consumer.go | 1 + consumer_base.go | 2 +- consumer_base_test.go | 2 +- message.go | 60 ++++++++++++++++++++++++++---------------- otel_producer_test.go | 4 +-- producer_test.go | 2 +- 9 files changed, 53 insertions(+), 33 deletions(-) diff --git a/batch_consumer.go b/batch_consumer.go index 882fdd4..b67c06d 100644 --- a/batch_consumer.go +++ b/batch_consumer.go @@ -131,6 +131,7 @@ func (b *batchConsumer) consume(allMessages *[]*Message, commitMessages *[]kafka toKafkaMessages(allMessages, commitMessages) err := b.r.CommitMessages(*commitMessages) *commitMessages = (*commitMessages)[:0] + putMessages(allMessages) if err != nil { b.logger.Errorf("Commit Error %s,", err.Error()) } diff --git a/batch_consumer_test.go b/batch_consumer_test.go index 9b82dc1..15b87e0 100644 --- a/batch_consumer_test.go +++ b/batch_consumer_test.go @@ -295,11 +295,11 @@ func (m *mockCronsumer) Stop() { panic("implement me") } -func (m *mockCronsumer) WithLogger(logger lcronsumer.Interface) { +func (m *mockCronsumer) WithLogger(_ lcronsumer.Interface) { panic("implement me") } -func (m *mockCronsumer) Produce(message kcronsumer.Message) error { +func (m *mockCronsumer) Produce(_ kcronsumer.Message) error { if m.wantErr { return errors.New("error") } diff --git a/collector.go b/collector.go index 50f1d0a..3f702d2 100644 --- a/collector.go +++ b/collector.go @@ -19,19 +19,21 @@ func (s *metricCollector) Describe(ch chan<- *prometheus.Desc) { prometheus.DescribeByCollect(s, ch) } +var emptyStringList []string + func (s *metricCollector) Collect(ch chan<- prometheus.Metric) { ch <- prometheus.MustNewConstMetric( s.totalProcessedMessagesCounter, prometheus.CounterValue, float64(s.consumerMetric.TotalProcessedMessagesCounter), - []string{}..., + emptyStringList..., ) ch <- prometheus.MustNewConstMetric( s.totalUnprocessedMessagesCounter, prometheus.CounterValue, float64(s.consumerMetric.TotalUnprocessedMessagesCounter), - []string{}..., + emptyStringList..., ) } @@ -42,13 +44,13 @@ func newMetricCollector(consumerMetric *ConsumerMetric) *metricCollector { totalProcessedMessagesCounter: prometheus.NewDesc( prometheus.BuildFQName(Name, "processed_messages_total", "current"), "Total number of processed messages.", - []string{}, + emptyStringList, nil, ), totalUnprocessedMessagesCounter: prometheus.NewDesc( prometheus.BuildFQName(Name, "unprocessed_messages_total", "current"), "Total number of unprocessed messages.", - []string{}, + emptyStringList, nil, ), } diff --git a/consumer.go b/consumer.go index 3146b3b..f0fa547 100644 --- a/consumer.go +++ b/consumer.go @@ -104,6 +104,7 @@ func (c *consumer) consume(messages *[]*Message, commitMessages *[]kafka.Message toKafkaMessages(messages, commitMessages) err := c.r.CommitMessages(*commitMessages) *commitMessages = (*commitMessages)[:0] + putMessages(messages) if err != nil { c.logger.Errorf("Commit Error %s,", err.Error()) } diff --git a/consumer_base.go b/consumer_base.go index 57293f6..f905038 100644 --- a/consumer_base.go +++ b/consumer_base.go @@ -2,10 +2,10 @@ package kafka import ( "context" + otelkafkakonsumer "github.com/Trendyol/otel-kafka-konsumer" "sync" "time" - "github.com/Trendyol/otel-kafka-konsumer" "go.opentelemetry.io/otel/propagation" "github.com/prometheus/client_golang/prometheus" diff --git a/consumer_base_test.go b/consumer_base_test.go index 63ef367..fbc1634 100644 --- a/consumer_base_test.go +++ b/consumer_base_test.go @@ -98,7 +98,7 @@ func (m *mockReader) Close() error { return nil } -func (m 
*mockReader) CommitMessages(messages []kafka.Message) error { +func (m *mockReader) CommitMessages(_ []kafka.Message) error { if m.wantErr { return errors.New("err") } diff --git a/message.go b/message.go index 666b6b9..1ec5154 100644 --- a/message.go +++ b/message.go @@ -2,6 +2,7 @@ package kafka import ( "context" + "sync" "time" kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka" @@ -49,21 +50,35 @@ func toKafkaMessages(messages *[]*Message, commitMessages *[]kafka.Message) { } } -func fromKafkaMessage(message *kafka.Message) *Message { - return &Message{ - Topic: message.Topic, - Partition: message.Partition, - Offset: message.Offset, - HighWaterMark: message.HighWaterMark, - Key: message.Key, - Value: message.Value, - Headers: message.Headers, - WriterData: message.WriterData, - Time: message.Time, - Context: context.TODO(), +func putMessages(messages *[]*Message) { + for _, message := range *messages { + messagePool.Put(message) } } +var messagePool = sync.Pool{ + New: func() any { + return &Message{} + }, +} + +func fromKafkaMessage(kafkaMessage *kafka.Message) *Message { + message := messagePool.Get().(*Message) + + message.Topic = kafkaMessage.Topic + message.Partition = kafkaMessage.Partition + message.Offset = kafkaMessage.Offset + message.HighWaterMark = kafkaMessage.HighWaterMark + message.Key = kafkaMessage.Key + message.Value = kafkaMessage.Value + message.Headers = kafkaMessage.Headers + message.WriterData = kafkaMessage.WriterData + message.Time = kafkaMessage.Time + message.Context = context.TODO() + + return message +} + func (m *Message) toRetryableMessage(retryTopic string) kcronsumer.Message { headers := make([]kcronsumer.Header, 0, len(m.Headers)) for i := range m.Headers { @@ -92,16 +107,17 @@ func toMessage(message kcronsumer.Message) *Message { }) } - return &Message{ - Topic: message.Topic, - Partition: message.Partition, - Offset: message.Offset, - HighWaterMark: message.HighWaterMark, - Key: message.Key, - Value: message.Value, - Headers: headers, - Time: message.Time, - } + msg := messagePool.Get().(*Message) + msg.Topic = message.Topic + msg.Partition = message.Partition + msg.Offset = message.Offset + msg.HighWaterMark = message.HighWaterMark + msg.Key = message.Key + msg.Value = message.Value + msg.Headers = headers + msg.Time = message.Time + + return msg } func (m *Message) Header(key string) *kafka.Header { diff --git a/otel_producer_test.go b/otel_producer_test.go index 65d23e3..ff4a0d3 100644 --- a/otel_producer_test.go +++ b/otel_producer_test.go @@ -63,14 +63,14 @@ type mockOtelKafkaKonsumerWriter struct { var _ OtelKafkaKonsumerWriter = (*mockOtelKafkaKonsumerWriter)(nil) -func (m mockOtelKafkaKonsumerWriter) WriteMessage(ctx context.Context, msg kafka.Message) error { +func (m mockOtelKafkaKonsumerWriter) WriteMessage(_ context.Context, _ kafka.Message) error { if m.wantErr { return errors.New("err occurred") } return nil } -func (m mockOtelKafkaKonsumerWriter) WriteMessages(ctx context.Context, msgs []kafka.Message) error { +func (m mockOtelKafkaKonsumerWriter) WriteMessages(_ context.Context, _ []kafka.Message) error { if m.wantErr { return errors.New("err occurred") } diff --git a/producer_test.go b/producer_test.go index 26c1eaa..d4fd8ac 100644 --- a/producer_test.go +++ b/producer_test.go @@ -48,7 +48,7 @@ func Test_producer_Close_Successfully(t *testing.T) { type mockWriter struct{} -func (m *mockWriter) WriteMessages(ctx context.Context, message ...kafka.Message) error { +func (m *mockWriter) WriteMessages(_ context.Context, _ 
...kafka.Message) error { return nil } From 270f7c8fc6044b0fd813ef6840a46fe1bc6a14ff Mon Sep 17 00:00:00 2001 From: mhmtszr Date: Sat, 9 Dec 2023 10:25:36 +0300 Subject: [PATCH 19/24] fix ci lint --- consumer_base.go | 3 ++- consumer_base_test.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/consumer_base.go b/consumer_base.go index f905038..830a58e 100644 --- a/consumer_base.go +++ b/consumer_base.go @@ -2,10 +2,11 @@ package kafka import ( "context" - otelkafkakonsumer "github.com/Trendyol/otel-kafka-konsumer" "sync" "time" + otelkafkakonsumer "github.com/Trendyol/otel-kafka-konsumer" + "go.opentelemetry.io/otel/propagation" "github.com/prometheus/client_golang/prometheus" diff --git a/consumer_base_test.go b/consumer_base_test.go index fbc1634..968a8d8 100644 --- a/consumer_base_test.go +++ b/consumer_base_test.go @@ -83,7 +83,7 @@ type mockReader struct { wantErr bool } -func (m *mockReader) FetchMessage(ctx context.Context) (*kafka.Message, error) { +func (m *mockReader) FetchMessage(_ context.Context) (*kafka.Message, error) { if m.wantErr { return nil, errors.New("err") } From 4f487d4cae4c7db40fdb8a124ee0d9c5a209b48a Mon Sep 17 00:00:00 2001 From: mhmtszr Date: Sat, 9 Dec 2023 10:37:05 +0300 Subject: [PATCH 20/24] add sync pool to segmentio messages to reduce heap allocations --- batch_consumer.go | 3 +- consumer.go | 3 +- consumer_base.go | 8 ++-- consumer_base_test.go | 7 ++-- message.go | 12 ++++++ otel_reader_wrapper.go | 92 +++++++++++++++++++++--------------------- reader_wrapper.go | 5 ++- 7 files changed, 73 insertions(+), 57 deletions(-) diff --git a/batch_consumer.go b/batch_consumer.go index b67c06d..d93459d 100644 --- a/batch_consumer.go +++ b/batch_consumer.go @@ -130,8 +130,9 @@ func (b *batchConsumer) consume(allMessages *[]*Message, commitMessages *[]kafka toKafkaMessages(allMessages, commitMessages) err := b.r.CommitMessages(*commitMessages) - *commitMessages = (*commitMessages)[:0] + putKafkaMessage(commitMessages) putMessages(allMessages) + *commitMessages = (*commitMessages)[:0] if err != nil { b.logger.Errorf("Commit Error %s,", err.Error()) } diff --git a/consumer.go b/consumer.go index f0fa547..cfd0a5c 100644 --- a/consumer.go +++ b/consumer.go @@ -103,8 +103,9 @@ func (c *consumer) consume(messages *[]*Message, commitMessages *[]kafka.Message toKafkaMessages(messages, commitMessages) err := c.r.CommitMessages(*commitMessages) - *commitMessages = (*commitMessages)[:0] + putKafkaMessage(commitMessages) putMessages(messages) + *commitMessages = (*commitMessages)[:0] if err != nil { c.logger.Errorf("Commit Error %s,", err.Error()) } diff --git a/consumer_base.go b/consumer_base.go index 830a58e..e86c73d 100644 --- a/consumer_base.go +++ b/consumer_base.go @@ -2,11 +2,10 @@ package kafka import ( "context" + otelkafkakonsumer "github.com/Trendyol/otel-kafka-konsumer" "sync" "time" - otelkafkakonsumer "github.com/Trendyol/otel-kafka-konsumer" - "go.opentelemetry.io/otel/propagation" "github.com/prometheus/client_golang/prometheus" @@ -28,7 +27,7 @@ type Consumer interface { } type Reader interface { - FetchMessage(ctx context.Context) (*kafka.Message, error) + FetchMessage(ctx context.Context, msg *kafka.Message) error Close() error CommitMessages(messages []kafka.Message) error } @@ -128,7 +127,8 @@ func (c *base) startConsume() { case <-c.quit: return default: - message, err := c.r.FetchMessage(c.context) + message := kafkaMessagePool.Get().(*kafka.Message) + err := c.r.FetchMessage(c.context, message) if err != nil { if c.context.Err() != 
nil { continue diff --git a/consumer_base_test.go b/consumer_base_test.go index 968a8d8..8d47c5a 100644 --- a/consumer_base_test.go +++ b/consumer_base_test.go @@ -83,12 +83,13 @@ type mockReader struct { wantErr bool } -func (m *mockReader) FetchMessage(_ context.Context) (*kafka.Message, error) { +func (m *mockReader) FetchMessage(_ context.Context, msg *kafka.Message) error { if m.wantErr { - return nil, errors.New("err") + return errors.New("err") } //nolint:lll - return &kafka.Message{Topic: "topic", Partition: 0, Offset: 1, HighWaterMark: 1, Key: []byte("foo"), Value: []byte("bar"), Headers: []kafka.Header{{Key: "header", Value: []byte("value")}}}, nil + *msg = kafka.Message{Topic: "topic", Partition: 0, Offset: 1, HighWaterMark: 1, Key: []byte("foo"), Value: []byte("bar"), Headers: []kafka.Header{{Key: "header", Value: []byte("value")}}} + return nil } func (m *mockReader) Close() error { diff --git a/message.go b/message.go index 1ec5154..2542497 100644 --- a/message.go +++ b/message.go @@ -56,12 +56,24 @@ func putMessages(messages *[]*Message) { } } +func putKafkaMessage(messages *[]kafka.Message) { + for _, message := range *messages { + kafkaMessagePool.Put(&message) + } +} + var messagePool = sync.Pool{ New: func() any { return &Message{} }, } +var kafkaMessagePool = sync.Pool{ + New: func() any { + return &kafka.Message{} + }, +} + func fromKafkaMessage(kafkaMessage *kafka.Message) *Message { message := messagePool.Get().(*Message) diff --git a/otel_reader_wrapper.go b/otel_reader_wrapper.go index 06f6f16..33a929c 100644 --- a/otel_reader_wrapper.go +++ b/otel_reader_wrapper.go @@ -1,48 +1,48 @@ package kafka -import ( - "context" - - "github.com/Trendyol/otel-kafka-konsumer" - segmentio "github.com/segmentio/kafka-go" - "go.opentelemetry.io/otel/attribute" - semconv "go.opentelemetry.io/otel/semconv/v1.19.0" -) - -type otelReaderWrapper struct { - r *otelkafkakonsumer.Reader -} - -func NewOtelReaderWrapper(cfg *ConsumerConfig, reader *segmentio.Reader) (Reader, error) { - cfg.setDefaults() - - newReader, err := otelkafkakonsumer.NewReader( - reader, - otelkafkakonsumer.WithTracerProvider(cfg.DistributedTracingConfiguration.TracerProvider), - otelkafkakonsumer.WithPropagator(cfg.DistributedTracingConfiguration.Propagator), - otelkafkakonsumer.WithAttributes( - []attribute.KeyValue{ - semconv.MessagingDestinationKindTopic, - semconv.MessagingKafkaClientIDKey.String(cfg.Reader.GroupID), - }, - )) - if err != nil { - return nil, err - } - - return &otelReaderWrapper{ - r: newReader, - }, nil -} - -func (o *otelReaderWrapper) FetchMessage(ctx context.Context) (*segmentio.Message, error) { - return o.r.FetchMessage(ctx) -} - -func (o *otelReaderWrapper) Close() error { - return o.r.Close() -} - -func (o *otelReaderWrapper) CommitMessages(messages []segmentio.Message) error { - return o.r.CommitMessages(context.Background(), messages...) 
-} +//import ( +// "context" +// +// "github.com/Trendyol/otel-kafka-konsumer" +// segmentio "github.com/segmentio/kafka-go" +// "go.opentelemetry.io/otel/attribute" +// semconv "go.opentelemetry.io/otel/semconv/v1.19.0" +//) +// +//type otelReaderWrapper struct { +// r *otelkafkakonsumer.Reader +//} +// +//func NewOtelReaderWrapper(cfg *ConsumerConfig, reader *segmentio.Reader) (Reader, error) { +// cfg.setDefaults() +// +// newReader, err := otelkafkakonsumer.NewReader( +// reader, +// otelkafkakonsumer.WithTracerProvider(cfg.DistributedTracingConfiguration.TracerProvider), +// otelkafkakonsumer.WithPropagator(cfg.DistributedTracingConfiguration.Propagator), +// otelkafkakonsumer.WithAttributes( +// []attribute.KeyValue{ +// semconv.MessagingDestinationKindTopic, +// semconv.MessagingKafkaClientIDKey.String(cfg.Reader.GroupID), +// }, +// )) +// if err != nil { +// return nil, err +// } +// +// return &otelReaderWrapper{ +// r: newReader, +// }, nil +//} +// +//func (o *otelReaderWrapper) FetchMessage(ctx context.Context) (*segmentio.Message, error) { +// return o.r.FetchMessage(ctx) +//} +// +//func (o *otelReaderWrapper) Close() error { +// return o.r.Close() +//} +// +//func (o *otelReaderWrapper) CommitMessages(messages []segmentio.Message) error { +// return o.r.CommitMessages(context.Background(), messages...) +//} diff --git a/reader_wrapper.go b/reader_wrapper.go index 8311f10..57644f9 100644 --- a/reader_wrapper.go +++ b/reader_wrapper.go @@ -15,9 +15,10 @@ func NewReaderWrapper(reader *segmentio.Reader) Reader { } // ReadMessage returns pointer of kafka message because we will support distributed tracing in the near future -func (s *readerWrapper) FetchMessage(ctx context.Context) (*segmentio.Message, error) { +func (s *readerWrapper) FetchMessage(ctx context.Context, msg *segmentio.Message) error { message, err := s.r.FetchMessage(ctx) - return &message, err + *msg = message + return err } func (s *readerWrapper) Close() error { From 0260eb7a95b44090277da1218739182fa8795fa2 Mon Sep 17 00:00:00 2001 From: mhmtszr Date: Sat, 9 Dec 2023 10:38:44 +0300 Subject: [PATCH 21/24] fix ci lint --- consumer_base.go | 3 ++- message.go | 1 + otel_reader_wrapper.go | 4 ++-- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/consumer_base.go b/consumer_base.go index e86c73d..fee3007 100644 --- a/consumer_base.go +++ b/consumer_base.go @@ -2,10 +2,11 @@ package kafka import ( "context" - otelkafkakonsumer "github.com/Trendyol/otel-kafka-konsumer" "sync" "time" + otelkafkakonsumer "github.com/Trendyol/otel-kafka-konsumer" + "go.opentelemetry.io/otel/propagation" "github.com/prometheus/client_golang/prometheus" diff --git a/message.go b/message.go index 2542497..58e3aca 100644 --- a/message.go +++ b/message.go @@ -58,6 +58,7 @@ func putMessages(messages *[]*Message) { func putKafkaMessage(messages *[]kafka.Message) { for _, message := range *messages { + //nolint:gosec kafkaMessagePool.Put(&message) } } diff --git a/otel_reader_wrapper.go b/otel_reader_wrapper.go index 33a929c..ec89036 100644 --- a/otel_reader_wrapper.go +++ b/otel_reader_wrapper.go @@ -1,6 +1,6 @@ package kafka -//import ( +// import ( // "context" // // "github.com/Trendyol/otel-kafka-konsumer" @@ -9,7 +9,7 @@ package kafka // semconv "go.opentelemetry.io/otel/semconv/v1.19.0" //) // -//type otelReaderWrapper struct { +// type otelReaderWrapper struct { // r *otelkafkakonsumer.Reader //} // From 09294c0ca8fdfd335604eac782d872ccfd112ef1 Mon Sep 17 00:00:00 2001 From: mhmtszr Date: Sat, 9 Dec 2023 10:42:51 +0300 
Subject: [PATCH 22/24] fix lint --- otel_reader_wrapper.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/otel_reader_wrapper.go b/otel_reader_wrapper.go index ec89036..c775bb7 100644 --- a/otel_reader_wrapper.go +++ b/otel_reader_wrapper.go @@ -13,7 +13,7 @@ package kafka // r *otelkafkakonsumer.Reader //} // -//func NewOtelReaderWrapper(cfg *ConsumerConfig, reader *segmentio.Reader) (Reader, error) { +// func NewOtelReaderWrapper(cfg *ConsumerConfig, reader *segmentio.Reader) (Reader, error) { // cfg.setDefaults() // // newReader, err := otelkafkakonsumer.NewReader( @@ -35,14 +35,14 @@ package kafka // }, nil //} // -//func (o *otelReaderWrapper) FetchMessage(ctx context.Context) (*segmentio.Message, error) { +// func (o *otelReaderWrapper) FetchMessage(ctx context.Context) (*segmentio.Message, error) { // return o.r.FetchMessage(ctx) //} // -//func (o *otelReaderWrapper) Close() error { +// func (o *otelReaderWrapper) Close() error { // return o.r.Close() //} // -//func (o *otelReaderWrapper) CommitMessages(messages []segmentio.Message) error { +// func (o *otelReaderWrapper) CommitMessages(messages []segmentio.Message) error { // return o.r.CommitMessages(context.Background(), messages...) //} From 823f14c2913bb834198cef0a9a4d0478cd9161d1 Mon Sep 17 00:00:00 2001 From: mhmtszr Date: Sat, 9 Dec 2023 10:44:57 +0300 Subject: [PATCH 23/24] fix integration --- test/integration/go.sum | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/go.sum b/test/integration/go.sum index 5d4ea20..db8c29b 100644 --- a/test/integration/go.sum +++ b/test/integration/go.sum @@ -90,7 +90,7 @@ go.opentelemetry.io/otel/trace v1.19.0 h1:DFVQmlVbfVeOuBRrwdtaehRrWiL1JoVs9CPIQ1 go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmYZpYojqMnX2vo= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= -go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= From 1333410b55544d571b7918edc69ce8f97bfcc6aa Mon Sep 17 00:00:00 2001 From: mhmtszr Date: Sun, 10 Dec 2023 15:15:12 +0300 Subject: [PATCH 24/24] refactor code --- batch_consumer.go | 53 +++++++++-------- batch_consumer_test.go | 29 ++++----- consumer.go | 49 ++++++++------- consumer_base.go | 34 +++++------ consumer_base_test.go | 10 ++-- examples/with-sasl-plaintext/go.mod | 2 +- examples/with-sasl-plaintext/go.sum | 1 + go.mod | 2 +- go.sum | 5 +- otel_reader_wrapper.go | 92 ++++++++++++++--------------- reader_wrapper.go | 2 +- test/integration/go.mod | 2 +- test/integration/go.sum | 4 +- 13 files changed, 150 insertions(+), 135 deletions(-) diff --git a/batch_consumer.go b/batch_consumer.go index d93459d..528384b 100644 --- a/batch_consumer.go +++ b/batch_consumer.go @@ -50,6 +50,8 @@ func (b *batchConsumer) Consume() { b.wg.Add(1) go b.startConsume() + b.setupConcurrentWorkers() + b.wg.Add(1) go b.startBatch() } @@ -57,22 +59,12 @@ func (b *batchConsumer) Consume() { func (b *batchConsumer) startBatch() { defer b.wg.Done() - for i := 0; i < b.concurrency; i++ { - b.wg.Add(1) - go func() { - defer b.wg.Done() - for messages := range b.batchMessageCommitCh { 
- b.process(messages) - b.waitMessageProcess <- struct{}{} - } - }() - } - ticker := time.NewTicker(b.messageGroupDuration) defer ticker.Stop() - messages := make([]*Message, 0, b.messageGroupLimit*b.concurrency) - commitMessages := make([]kafka.Message, 0, b.messageGroupLimit*b.concurrency) + maximumMessageLimit := b.messageGroupLimit * b.concurrency + messages := make([]*Message, 0, maximumMessageLimit) + commitMessages := make([]kafka.Message, 0, maximumMessageLimit) for { select { @@ -82,22 +74,33 @@ func (b *batchConsumer) startBatch() { } b.consume(&messages, &commitMessages) - messages = messages[:0] - case msg, ok := <-b.messageCh: + case msg, ok := <-b.incomingMessageStream: if !ok { return } messages = append(messages, msg) - if len(messages) == (b.messageGroupLimit * b.concurrency) { + if len(messages) == maximumMessageLimit { b.consume(&messages, &commitMessages) - messages = messages[:0] } } } } +func (b *batchConsumer) setupConcurrentWorkers() { + for i := 0; i < b.concurrency; i++ { + b.wg.Add(1) + go func() { + defer b.wg.Done() + for messages := range b.batchConsumingStream { + b.process(messages) + b.messageProcessedStream <- struct{}{} + } + }() + } +} + func chunkMessages(allMessages *[]*Message, chunkSize int) [][]*Message { var chunks [][]*Message @@ -120,22 +123,26 @@ func chunkMessages(allMessages *[]*Message, chunkSize int) [][]*Message { func (b *batchConsumer) consume(allMessages *[]*Message, commitMessages *[]kafka.Message) { chunks := chunkMessages(allMessages, b.messageGroupLimit) + // Send the messages to process for _, chunk := range chunks { - b.batchMessageCommitCh <- chunk + b.batchConsumingStream <- chunk } + // Wait the messages to be processed for i := 0; i < len(chunks); i++ { - <-b.waitMessageProcess + <-b.messageProcessedStream } toKafkaMessages(allMessages, commitMessages) - err := b.r.CommitMessages(*commitMessages) + if err := b.r.CommitMessages(*commitMessages); err != nil { + b.logger.Errorf("Commit Error %s,", err.Error()) + } + + // Clearing resources putKafkaMessage(commitMessages) putMessages(allMessages) *commitMessages = (*commitMessages)[:0] - if err != nil { - b.logger.Errorf("Commit Error %s,", err.Error()) - } + *allMessages = (*allMessages)[:0] } func (b *batchConsumer) process(chunkMessages []*Message) { diff --git a/batch_consumer_test.go b/batch_consumer_test.go index 15b87e0..1bfc670 100644 --- a/batch_consumer_test.go +++ b/batch_consumer_test.go @@ -20,15 +20,15 @@ func Test_batchConsumer_startBatch(t *testing.T) { mc := mockReader{} bc := batchConsumer{ base: &base{ - messageCh: make(chan *Message, 1), - batchMessageCommitCh: make(chan []*Message, 1), - singleMessageCommitCh: make(chan *Message, 1), - waitMessageProcess: make(chan struct{}, 1), - metric: &ConsumerMetric{}, - wg: sync.WaitGroup{}, - messageGroupDuration: 500 * time.Millisecond, - r: &mc, - concurrency: 1, + incomingMessageStream: make(chan *Message, 1), + batchConsumingStream: make(chan []*Message, 1), + singleConsumingStream: make(chan *Message, 1), + messageProcessedStream: make(chan struct{}, 1), + metric: &ConsumerMetric{}, + wg: sync.WaitGroup{}, + messageGroupDuration: 500 * time.Millisecond, + r: &mc, + concurrency: 1, }, messageGroupLimit: 3, consumeFn: func(messages []*Message) error { @@ -38,24 +38,25 @@ func Test_batchConsumer_startBatch(t *testing.T) { } go func() { // Simulate messageGroupLimit - bc.base.messageCh <- &Message{} - bc.base.messageCh <- &Message{} - bc.base.messageCh <- &Message{} + bc.base.incomingMessageStream <- &Message{} + 
bc.base.incomingMessageStream <- &Message{} + bc.base.incomingMessageStream <- &Message{} time.Sleep(1 * time.Second) // Simulate messageGroupDuration - bc.base.messageCh <- &Message{} + bc.base.incomingMessageStream <- &Message{} time.Sleep(1 * time.Second) // Return from startBatch - close(bc.base.messageCh) + close(bc.base.incomingMessageStream) }() bc.base.wg.Add(1) // When + bc.setupConcurrentWorkers() bc.startBatch() // Then diff --git a/consumer.go b/consumer.go index cfd0a5c..08bd9e3 100644 --- a/consumer.go +++ b/consumer.go @@ -43,6 +43,8 @@ func (c *consumer) Consume() { c.wg.Add(1) go c.startConsume() + c.setupConcurrentWorkers() + c.wg.Add(1) go c.startBatch() } @@ -50,17 +52,6 @@ func (c *consumer) Consume() { func (c *consumer) startBatch() { defer c.wg.Done() - for i := 0; i < c.concurrency; i++ { - c.wg.Add(1) - go func() { - defer c.wg.Done() - for message := range c.singleMessageCommitCh { - c.process(message) - c.waitMessageProcess <- struct{}{} - } - }() - } - ticker := time.NewTicker(c.messageGroupDuration) defer ticker.Stop() @@ -75,8 +66,7 @@ func (c *consumer) startBatch() { } c.consume(&messages, &commitMessages) - messages = messages[:0] - case msg, ok := <-c.messageCh: + case msg, ok := <-c.incomingMessageStream: if !ok { return } @@ -85,30 +75,47 @@ func (c *consumer) startBatch() { if len(messages) == c.concurrency { c.consume(&messages, &commitMessages) - messages = messages[:0] } } } } +func (c *consumer) setupConcurrentWorkers() { + for i := 0; i < c.concurrency; i++ { + c.wg.Add(1) + go func() { + defer c.wg.Done() + for message := range c.singleConsumingStream { + c.process(message) + c.messageProcessedStream <- struct{}{} + } + }() + } +} + func (c *consumer) consume(messages *[]*Message, commitMessages *[]kafka.Message) { messageList := *messages + + // Send the messages to process for _, message := range messageList { - c.singleMessageCommitCh <- message + c.singleConsumingStream <- message } - for i := 0; i < len(messageList); i++ { - <-c.waitMessageProcess + // Wait the messages to be processed + for range messageList { + <-c.messageProcessedStream } toKafkaMessages(messages, commitMessages) - err := c.r.CommitMessages(*commitMessages) + if err := c.r.CommitMessages(*commitMessages); err != nil { + c.logger.Errorf("Commit Error %s,", err.Error()) + } + + // Clearing resources putKafkaMessage(commitMessages) putMessages(messages) *commitMessages = (*commitMessages)[:0] - if err != nil { - c.logger.Errorf("Commit Error %s,", err.Error()) - } + *messages = (*messages)[:0] } func (c *consumer) process(message *Message) { diff --git a/consumer_base.go b/consumer_base.go index fee3007..74c316d 100644 --- a/consumer_base.go +++ b/consumer_base.go @@ -43,10 +43,10 @@ type base struct { cancelFn context.CancelFunc metric *ConsumerMetric quit chan struct{} - waitMessageProcess chan struct{} - singleMessageCommitCh chan *Message - messageCh chan *Message - batchMessageCommitCh chan []*Message + messageProcessedStream chan struct{} + incomingMessageStream chan *Message + singleConsumingStream chan *Message + batchConsumingStream chan []*Message retryTopic string subprocesses subprocesses wg sync.WaitGroup @@ -77,7 +77,7 @@ func newBase(cfg *ConsumerConfig, messageChSize int) (*base, error) { c := base{ metric: &ConsumerMetric{}, - messageCh: make(chan *Message, messageChSize), + incomingMessageStream: make(chan *Message, messageChSize), quit: make(chan struct{}), concurrency: cfg.Concurrency, retryEnabled: cfg.RetryEnabled, @@ -87,9 +87,9 @@ func newBase(cfg 
*ConsumerConfig, messageChSize int) (*base, error) { subprocesses: newSubProcesses(), r: reader, messageGroupDuration: cfg.MessageGroupDuration, - waitMessageProcess: make(chan struct{}, cfg.Concurrency), - singleMessageCommitCh: make(chan *Message, cfg.Concurrency), - batchMessageCommitCh: make(chan []*Message, cfg.Concurrency), + messageProcessedStream: make(chan struct{}, cfg.Concurrency), + singleConsumingStream: make(chan *Message, cfg.Concurrency), + batchConsumingStream: make(chan []*Message, cfg.Concurrency), } if cfg.DistributedTracingEnabled { @@ -128,8 +128,8 @@ func (c *base) startConsume() { case <-c.quit: return default: - message := kafkaMessagePool.Get().(*kafka.Message) - err := c.r.FetchMessage(c.context, message) + m := kafkaMessagePool.Get().(*kafka.Message) + err := c.r.FetchMessage(c.context, m) if err != nil { if c.context.Err() != nil { continue @@ -138,12 +138,12 @@ func (c *base) startConsume() { continue } - consumedMessage := fromKafkaMessage(message) + incomingMessage := fromKafkaMessage(m) if c.distributedTracingEnabled { - consumedMessage.Context = c.propagator.Extract(context.Background(), otelkafkakonsumer.NewMessageCarrier(message)) + incomingMessage.Context = c.propagator.Extract(context.Background(), otelkafkakonsumer.NewMessageCarrier(m)) } - c.messageCh <- consumedMessage + c.incomingMessageStream <- incomingMessage } } } @@ -159,10 +159,10 @@ func (c *base) Stop() error { c.subprocesses.Stop() c.cancelFn() c.quit <- struct{}{} - close(c.messageCh) - close(c.singleMessageCommitCh) - close(c.batchMessageCommitCh) - close(c.waitMessageProcess) + close(c.incomingMessageStream) + close(c.singleConsumingStream) + close(c.batchConsumingStream) + close(c.messageProcessedStream) c.wg.Wait() err = c.r.Close() }) diff --git a/consumer_base_test.go b/consumer_base_test.go index 8d47c5a..0363609 100644 --- a/consumer_base_test.go +++ b/consumer_base_test.go @@ -15,9 +15,9 @@ func Test_base_startConsume(t *testing.T) { mc := mockReader{wantErr: true} b := base{ wg: sync.WaitGroup{}, r: &mc, - messageCh: make(chan *Message), - quit: make(chan struct{}), - logger: NewZapLogger(LogLevelDebug), + incomingMessageStream: make(chan *Message), + quit: make(chan struct{}), + logger: NewZapLogger(LogLevelDebug), } b.context, b.cancelFn = context.WithCancel(context.Background()) @@ -34,13 +34,13 @@ func Test_base_startConsume(t *testing.T) { t.Run("Read_Incoming_Messages_Successfully", func(t *testing.T) { // Given mc := mockReader{} - b := base{wg: sync.WaitGroup{}, r: &mc, messageCh: make(chan *Message)} + b := base{wg: sync.WaitGroup{}, r: &mc, incomingMessageStream: make(chan *Message)} b.wg.Add(1) // When go b.startConsume() - actual := <-b.messageCh + actual := <-b.incomingMessageStream // Then //nolint:lll diff --git a/examples/with-sasl-plaintext/go.mod b/examples/with-sasl-plaintext/go.mod index e8b69fe..ade215d 100644 --- a/examples/with-sasl-plaintext/go.mod +++ b/examples/with-sasl-plaintext/go.mod @@ -8,7 +8,7 @@ require github.com/Trendyol/kafka-konsumer v0.0.0-00010101000000-000000000000 require ( github.com/Trendyol/kafka-cronsumer v1.4.5 // indirect - github.com/Trendyol/otel-kafka-konsumer v0.0.6 // indirect + github.com/Trendyol/otel-kafka-konsumer v0.0.7 // indirect github.com/andybalholm/brotli v1.0.5 // indirect github.com/ansrivas/fiberprometheus/v2 v2.6.1 // indirect github.com/beorn7/perks v1.0.1 // indirect diff --git a/examples/with-sasl-plaintext/go.sum b/examples/with-sasl-plaintext/go.sum index 1f83a88..755c5fe 100644 --- 
a/examples/with-sasl-plaintext/go.sum +++ b/examples/with-sasl-plaintext/go.sum @@ -3,6 +3,7 @@ github.com/Trendyol/kafka-cronsumer v1.4.5/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNu github.com/Trendyol/otel-kafka-konsumer v0.0.5 h1:i5Q6vR4ZRTtlb+uLimGJNBOQUiAtcbjn7Xc2FmPap/4= github.com/Trendyol/otel-kafka-konsumer v0.0.5/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0= github.com/Trendyol/otel-kafka-konsumer v0.0.6/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0= +github.com/Trendyol/otel-kafka-konsumer v0.0.7/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/ansrivas/fiberprometheus/v2 v2.6.1 h1:wac3pXaE6BYYTF04AC6K0ktk6vCD+MnDOJZ3SK66kXM= diff --git a/go.mod b/go.mod index 1d02a49..805da64 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.19 require ( github.com/Trendyol/kafka-cronsumer v1.4.5 - github.com/Trendyol/otel-kafka-konsumer v0.0.6 + github.com/Trendyol/otel-kafka-konsumer v0.0.7 github.com/ansrivas/fiberprometheus/v2 v2.6.1 github.com/gofiber/fiber/v2 v2.50.0 github.com/prometheus/client_golang v1.16.0 diff --git a/go.sum b/go.sum index 6eb5d5d..d4efbb5 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,7 @@ github.com/Trendyol/kafka-cronsumer v1.4.5 h1:82MhKZi1tXqFMp2gpSiYaT4UyN6LxumIu6ZMC8yZ1JY= github.com/Trendyol/kafka-cronsumer v1.4.5/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= -github.com/Trendyol/otel-kafka-konsumer v0.0.6 h1:FFSft5lIA+kyH+hZW4bxt/xWG6J3PFUGzKZbjyf+fVM= -github.com/Trendyol/otel-kafka-konsumer v0.0.6/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0= +github.com/Trendyol/otel-kafka-konsumer v0.0.7 h1:sT1TE2rgfsdrJWrXKz5j6dPkKJsvP+Tv0Dea4ORqJ+4= +github.com/Trendyol/otel-kafka-konsumer v0.0.7/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/ansrivas/fiberprometheus/v2 v2.6.1 h1:wac3pXaE6BYYTF04AC6K0ktk6vCD+MnDOJZ3SK66kXM= @@ -95,7 +95,6 @@ go.opentelemetry.io/otel/trace v1.19.0 h1:DFVQmlVbfVeOuBRrwdtaehRrWiL1JoVs9CPIQ1 go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmYZpYojqMnX2vo= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= -go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= diff --git a/otel_reader_wrapper.go b/otel_reader_wrapper.go index c775bb7..b92c386 100644 --- a/otel_reader_wrapper.go +++ b/otel_reader_wrapper.go @@ -1,48 +1,48 @@ package kafka -// import ( -// "context" -// -// "github.com/Trendyol/otel-kafka-konsumer" -// segmentio "github.com/segmentio/kafka-go" -// "go.opentelemetry.io/otel/attribute" -// semconv "go.opentelemetry.io/otel/semconv/v1.19.0" -//) -// -// type otelReaderWrapper struct { -// r *otelkafkakonsumer.Reader -//} -// -// func NewOtelReaderWrapper(cfg *ConsumerConfig, reader *segmentio.Reader) (Reader, error) { -// cfg.setDefaults() -// -// newReader, err := otelkafkakonsumer.NewReader( -// reader, -// 
otelkafkakonsumer.WithTracerProvider(cfg.DistributedTracingConfiguration.TracerProvider), -// otelkafkakonsumer.WithPropagator(cfg.DistributedTracingConfiguration.Propagator), -// otelkafkakonsumer.WithAttributes( -// []attribute.KeyValue{ -// semconv.MessagingDestinationKindTopic, -// semconv.MessagingKafkaClientIDKey.String(cfg.Reader.GroupID), -// }, -// )) -// if err != nil { -// return nil, err -// } -// -// return &otelReaderWrapper{ -// r: newReader, -// }, nil -//} -// -// func (o *otelReaderWrapper) FetchMessage(ctx context.Context) (*segmentio.Message, error) { -// return o.r.FetchMessage(ctx) -//} -// -// func (o *otelReaderWrapper) Close() error { -// return o.r.Close() -//} -// -// func (o *otelReaderWrapper) CommitMessages(messages []segmentio.Message) error { -// return o.r.CommitMessages(context.Background(), messages...) -//} +import ( + "context" + + "github.com/Trendyol/otel-kafka-konsumer" + segmentio "github.com/segmentio/kafka-go" + "go.opentelemetry.io/otel/attribute" + semconv "go.opentelemetry.io/otel/semconv/v1.19.0" +) + +type otelReaderWrapper struct { + r *otelkafkakonsumer.Reader +} + +func NewOtelReaderWrapper(cfg *ConsumerConfig, reader *segmentio.Reader) (Reader, error) { + cfg.setDefaults() + + newReader, err := otelkafkakonsumer.NewReader( + reader, + otelkafkakonsumer.WithTracerProvider(cfg.DistributedTracingConfiguration.TracerProvider), + otelkafkakonsumer.WithPropagator(cfg.DistributedTracingConfiguration.Propagator), + otelkafkakonsumer.WithAttributes( + []attribute.KeyValue{ + semconv.MessagingDestinationKindTopic, + semconv.MessagingKafkaClientIDKey.String(cfg.Reader.GroupID), + }, + )) + if err != nil { + return nil, err + } + + return &otelReaderWrapper{ + r: newReader, + }, nil +} + +func (o *otelReaderWrapper) FetchMessage(ctx context.Context, msg *segmentio.Message) error { + return o.r.FetchMessage(ctx, msg) +} + +func (o *otelReaderWrapper) Close() error { + return o.r.Close() +} + +func (o *otelReaderWrapper) CommitMessages(messages []segmentio.Message) error { + return o.r.CommitMessages(context.Background(), messages...) 
+}
diff --git a/reader_wrapper.go b/reader_wrapper.go
index 57644f9..b6f36f2 100644
--- a/reader_wrapper.go
+++ b/reader_wrapper.go
@@ -14,7 +14,7 @@ func NewReaderWrapper(reader *segmentio.Reader) Reader {
 	return &readerWrapper{r: reader}
 }
 
-// ReadMessage returns pointer of kafka message because we will support distributed tracing in the near future
+// FetchMessage fills the given kafka message pointer because we will support distributed tracing in the near future
 func (s *readerWrapper) FetchMessage(ctx context.Context, msg *segmentio.Message) error {
 	message, err := s.r.FetchMessage(ctx)
 	*msg = message
diff --git a/test/integration/go.mod b/test/integration/go.mod
index 2150330..89f6a8c 100644
--- a/test/integration/go.mod
+++ b/test/integration/go.mod
@@ -11,7 +11,7 @@ require (
 
 require (
 	github.com/Trendyol/kafka-cronsumer v1.4.5 // indirect
-	github.com/Trendyol/otel-kafka-konsumer v0.0.6 // indirect
+	github.com/Trendyol/otel-kafka-konsumer v0.0.7 // indirect
 	github.com/andybalholm/brotli v1.0.5 // indirect
 	github.com/ansrivas/fiberprometheus/v2 v2.6.1 // indirect
 	github.com/beorn7/perks v1.0.1 // indirect
diff --git a/test/integration/go.sum b/test/integration/go.sum
index db8c29b..7b9c897 100644
--- a/test/integration/go.sum
+++ b/test/integration/go.sum
@@ -1,7 +1,7 @@
 github.com/Trendyol/kafka-cronsumer v1.4.5 h1:82MhKZi1tXqFMp2gpSiYaT4UyN6LxumIu6ZMC8yZ1JY=
 github.com/Trendyol/kafka-cronsumer v1.4.5/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
-github.com/Trendyol/otel-kafka-konsumer v0.0.6 h1:FFSft5lIA+kyH+hZW4bxt/xWG6J3PFUGzKZbjyf+fVM=
-github.com/Trendyol/otel-kafka-konsumer v0.0.6/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0=
+github.com/Trendyol/otel-kafka-konsumer v0.0.7 h1:sT1TE2rgfsdrJWrXKz5j6dPkKJsvP+Tv0Dea4ORqJ+4=
+github.com/Trendyol/otel-kafka-konsumer v0.0.7/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0=
 github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
 github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
 github.com/ansrivas/fiberprometheus/v2 v2.6.1 h1:wac3pXaE6BYYTF04AC6K0ktk6vCD+MnDOJZ3SK66kXM=
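
Note on [PATCH 17/24] (fieldalignment): the struct reordering in that patch comes from the fieldalignment analyzer, which sorts fields from widest to narrowest so the compiler inserts as little padding as possible. A minimal sketch of the effect, assuming a 64-bit target; the struct names here are illustrative and not taken from the library:

package main

import (
	"fmt"
	"unsafe"
)

// padded interleaves one-byte and eight-byte fields, so the compiler
// must insert seven padding bytes after each bool to align the int64s.
type padded struct {
	a bool
	b int64
	c bool
	d int64
}

// packed holds the same data with the widest fields first, the layout
// fieldalignment suggests; no padding is needed between the fields.
type packed struct {
	b int64
	d int64
	a bool
	c bool
}

func main() {
	fmt.Println(unsafe.Sizeof(padded{})) // 32 on 64-bit platforms
	fmt.Println(unsafe.Sizeof(packed{})) // 24 on 64-bit platforms
}

For per-message structs such as Message, which this consumer creates (and later pools) in large volumes, the saved padding adds up.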
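
Note on [PATCH 18/24] (shared empty label slice in collector.go): hoisting the []string{}... literal into a package-level emptyStringList variable means Collect no longer builds a fresh empty slice value on every scrape. A small sketch of the same idea, with illustrative names:

package main

import "fmt"

// emptyLabels plays the role of emptyStringList: one shared nil slice
// that can be expanded into a variadic call without constructing a new
// slice value at each call site.
var emptyLabels []string

func constMetric(labels ...string) string {
	return fmt.Sprintf("labels=%v", labels)
}

func main() {
	fmt.Println(constMetric([]string{}...))  // constructs a slice value per call
	fmt.Println(constMetric(emptyLabels...)) // reuses the shared nil slice
}

Expanding a nil slice into a variadic parameter is valid Go and behaves the same as an empty slice literal.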
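
Note on [PATCH 18/24] and [PATCH 20/24] (message pooling): both patches apply the same idea at two levels. fromKafkaMessage draws Message values from a sync.Pool instead of allocating, putMessages returns a whole batch to the pool after commit, and patch 20 additionally changes Reader.FetchMessage to fill a caller-supplied *kafka.Message so the fetched segmentio message can be pooled as well. A minimal, self-contained sketch of that lifecycle, assuming a simplified Message type and illustrative helper names:

package main

import (
	"fmt"
	"sync"
)

// Message stands in for the library's message struct, which carries
// Kafka metadata such as topic, partition, offset, key, and value.
type Message struct {
	Topic string
	Value []byte
}

var messagePool = sync.Pool{
	New: func() any { return &Message{} },
}

// fromFetch recycles a pooled Message; every field must be reassigned,
// because a pooled object still holds the previous message's data.
func fromFetch(topic string, value []byte) *Message {
	m := messagePool.Get().(*Message)
	m.Topic = topic
	m.Value = value
	return m
}

// release hands a processed batch back to the pool, as putMessages does
// once the offsets have been committed.
func release(batch []*Message) {
	for _, m := range batch {
		messagePool.Put(m)
	}
}

func main() {
	batch := []*Message{
		fromFetch("orders", []byte("a")),
		fromFetch("orders", []byte("b")),
	}
	for _, m := range batch {
		fmt.Println(m.Topic, string(m.Value))
	}
	release(batch) // safe only once nothing retains pointers into the batch
}

One caveat in the committed code: putKafkaMessage calls kafkaMessagePool.Put(&message) on the range variable, and before Go 1.22 that variable is shared across iterations, so the pool receives the same pointer repeatedly; the //nolint:gosec added in patch 21 silences the analyzer rather than changing that behavior.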
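
Note on [PATCH 24/24] (refactor): the final shape of the consumer is a single poller that accumulates up to messageGroupLimit*concurrency messages, splits them into chunks of messageGroupLimit, fans the chunks out to a fixed set of workers, waits for all of them, and only then commits the offsets. A minimal sketch of that fan-out/fan-in flow, with ints standing in for messages and illustrative channel names:

package main

import (
	"fmt"
	"sync"
)

// chunk splits a batch into groups of at most n items, mirroring
// chunkMessages from the patch.
func chunk(items []int, n int) [][]int {
	var out [][]int
	for i := 0; i < len(items); i += n {
		end := i + n
		if end > len(items) {
			end = len(items)
		}
		out = append(out, items[i:end])
	}
	return out
}

func main() {
	concurrency := 3
	work := make(chan []int, concurrency)    // plays the role of batchConsumingStream
	done := make(chan struct{}, concurrency) // plays the role of messageProcessedStream

	var wg sync.WaitGroup
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for batch := range work {
				fmt.Println("processed", batch) // stands in for process()
				done <- struct{}{}
			}
		}()
	}

	// The poller caps a batch at messageGroupLimit*concurrency, so a batch
	// never splits into more chunks than there are workers.
	chunks := chunk([]int{1, 2, 3, 4, 5, 6, 7}, 3)
	for _, c := range chunks {
		work <- c
	}
	for range chunks {
		<-done // commit offsets only after every chunk has been processed
	}

	close(work)
	wg.Wait()
}

Because the commit happens once per accumulated batch rather than once per chunk, a failing chunk is retried and, if still failing, routed to the retry or exception topic inside process, without blocking the offset commit of its sibling chunks.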