diff --git a/README.md b/README.md
index bfb3fa6..6db9eec 100644
--- a/README.md
+++ b/README.md
@@ -100,9 +100,9 @@ After running `docker-compose up` command, you can run any application you want.
 			WorkDuration: 50 * time.Second,
 			MaxRetry:     3,
 		},
+		MessageGroupDuration: time.Second,
 		BatchConfiguration: kafka.BatchConfiguration{
 			MessageGroupLimit:    1000,
-			MessageGroupDuration: time.Second,
 			BatchConsumeFn:       batchConsumeFn,
 		},
 	}
@@ -140,9 +140,9 @@ After running `docker-compose up` command, you can run any application you want.
 			WorkDuration: 50 * time.Second,
 			MaxRetry:     3,
 		},
+		MessageGroupDuration: time.Second,
 		BatchConfiguration: kafka.BatchConfiguration{
 			MessageGroupLimit:    1000,
-			MessageGroupDuration: time.Second,
 			BatchConsumeFn:       batchConsumeFn,
 		},
 	}
@@ -198,6 +198,7 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap
 | `commitInterval` | indicates the interval at which offsets are committed to the broker. | 1s |
 | `rack` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#RackAffinityGroupBalancer) | |
 | `clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Dialer) | |
+| `messageGroupDuration` | Maximum time to wait before processing a batch of accumulated messages | 1s |
 | `dial.Timeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Dialer) | no timeout |
 | `dial.KeepAlive` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Dialer) | not enabled |
 | `transport.DialTimeout ` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Transport) | 5s |
@@ -219,7 +220,6 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap
 | `retryConfiguration.sasl.username` | SCRAM OR PLAIN username | |
 | `retryConfiguration.sasl.password` | SCRAM OR PLAIN password | |
 | `batchConfiguration.messageGroupLimit` | Maximum number of messages in a batch | |
-| `batchConfiguration.messageGroupDuration` | Maximum time to wait for a batch | |
 | `tls.rootCAPath` | [see doc](https://pkg.go.dev/crypto/tls#Config.RootCAs) | "" |
 | `tls.intermediateCAPath` | Same with rootCA, if you want to specify two rootca you can use it with rootCAPath | "" |
 | `sasl.authType` | `SCRAM` or `PLAIN` | |
diff --git a/batch_consumer.go b/batch_consumer.go
index 5ea7255..528384b 100644
--- a/batch_consumer.go
+++ b/batch_consumer.go
@@ -3,6 +3,8 @@ package kafka
 import (
 	"time"
 
+	"github.com/segmentio/kafka-go"
+
 	kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka"
 )
 
@@ -11,21 +13,19 @@ type batchConsumer struct {
 
 	consumeFn func([]*Message) error
 
-	messageGroupLimit    int
-	messageGroupDuration time.Duration
+	messageGroupLimit int
 }
 
 func newBatchConsumer(cfg *ConsumerConfig) (Consumer, error) {
-	consumerBase, err := newBase(cfg)
+	consumerBase, err := newBase(cfg, cfg.BatchConfiguration.MessageGroupLimit*cfg.Concurrency)
 	if err != nil {
 		return nil, err
 	}
 
 	c := batchConsumer{
-		base:                 consumerBase,
-		consumeFn:            cfg.BatchConfiguration.BatchConsumeFn,
-		messageGroupLimit:    cfg.BatchConfiguration.MessageGroupLimit,
-		messageGroupDuration: cfg.BatchConfiguration.MessageGroupDuration,
+		base:              consumerBase,
+		consumeFn:         cfg.BatchConfiguration.BatchConsumeFn,
+		messageGroupLimit: cfg.BatchConfiguration.MessageGroupLimit,
 	}
 
 	if cfg.RetryEnabled {
@@ -50,10 +50,10 @@ func (b *batchConsumer) Consume() {
 	b.wg.Add(1)
 	go b.startConsume()
 
-	for i := 0; i < b.concurrency; i++ {
-		b.wg.Add(1)
-		go b.startBatch()
-	}
+	b.setupConcurrentWorkers()
+
+	b.wg.Add(1)
+	go b.startBatch()
 }
 
 func (b *batchConsumer) startBatch() {
@@ -62,7 +62,9 @@ func (b *batchConsumer) startBatch() {
 	ticker := time.NewTicker(b.messageGroupDuration)
 	defer ticker.Stop()
 
-	messages := make([]*Message, 0, b.messageGroupLimit)
+	maximumMessageLimit := b.messageGroupLimit * b.concurrency
+	messages := make([]*Message, 0, maximumMessageLimit)
+	commitMessages := make([]kafka.Message, 0, maximumMessageLimit)
 
 	for {
 		select {
@@ -71,44 +73,99 @@ func (b *batchConsumer) startBatch() {
 				continue
 			}
 
-			b.process(messages)
-			messages = messages[:0]
-		case msg, ok := <-b.messageCh:
+			b.consume(&messages, &commitMessages)
+		case msg, ok := <-b.incomingMessageStream:
 			if !ok {
 				return
 			}
 
 			messages = append(messages, msg)
 
-			if len(messages) == b.messageGroupLimit {
+			if len(messages) == maximumMessageLimit {
+				b.consume(&messages, &commitMessages)
+			}
+		}
+	}
+}
+
+func (b *batchConsumer) setupConcurrentWorkers() {
+	for i := 0; i < b.concurrency; i++ {
+		b.wg.Add(1)
+		go func() {
+			defer b.wg.Done()
+			for messages := range b.batchConsumingStream {
 				b.process(messages)
-				messages = messages[:0]
+				b.messageProcessedStream <- struct{}{}
 			}
+		}()
+	}
+}
+
+func chunkMessages(allMessages *[]*Message, chunkSize int) [][]*Message {
+	var chunks [][]*Message
+
+	allMessageList := *allMessages
+	for i := 0; i < len(allMessageList); i += chunkSize {
+		end := i + chunkSize
+
+		// necessary check to avoid slicing beyond
+		// slice capacity
+		if end > len(allMessageList) {
+			end = len(allMessageList)
 		}
+
+		chunks = append(chunks, allMessageList[i:end])
+	}
+
+	return chunks
+}
+
+func (b *batchConsumer) consume(allMessages *[]*Message, commitMessages *[]kafka.Message) {
+	chunks := chunkMessages(allMessages, b.messageGroupLimit)
+
+	// Send the message chunks to the workers to be processed
+	for _, chunk := range chunks {
+		b.batchConsumingStream <- chunk
+	}
+
+	// Wait for all chunks to be processed
+	for i := 0; i < len(chunks); i++ {
+		<-b.messageProcessedStream
 	}
+
+	toKafkaMessages(allMessages, commitMessages)
+	if err := b.r.CommitMessages(*commitMessages); err != nil {
+		b.logger.Errorf("Commit Error %s", err.Error())
+	}
+
+	// Release pooled resources and reset the buffers
+	putKafkaMessage(commitMessages)
+	putMessages(allMessages)
+	*commitMessages = (*commitMessages)[:0]
+	*allMessages = (*allMessages)[:0]
 }
 
-func (b *batchConsumer) process(messages []*Message) {
-	consumeErr := b.consumeFn(messages)
+func (b *batchConsumer) process(chunkMessages []*Message) {
+	consumeErr := b.consumeFn(chunkMessages)
 	if consumeErr != nil {
 		b.logger.Warnf("Consume Function Err %s, Messages will be retried", consumeErr.Error())
 		// Try to process same messages again for resolving transient network errors etc.
-		if consumeErr = b.consumeFn(messages); consumeErr != nil {
+		if consumeErr = b.consumeFn(chunkMessages); consumeErr != nil {
 			b.logger.Warnf("Consume Function Again Err %s, messages are sending to exception/retry topic %s", consumeErr.Error(), b.retryTopic)
-			b.metric.TotalUnprocessedMessagesCounter += int64(len(messages))
+			b.metric.TotalUnprocessedMessagesCounter += int64(len(chunkMessages))
 		}
 
 		if consumeErr != nil && b.retryEnabled {
-			cronsumerMessages := make([]kcronsumer.Message, 0, len(messages))
+			cronsumerMessages := make([]kcronsumer.Message, 0, len(chunkMessages))
 			if b.transactionalRetry {
-				for i := range messages {
-					cronsumerMessages = append(cronsumerMessages, messages[i].toRetryableMessage(b.retryTopic))
+				for i := range chunkMessages {
+					cronsumerMessages = append(cronsumerMessages, chunkMessages[i].toRetryableMessage(b.retryTopic))
 				}
 			} else {
-				for i := range messages {
-					if messages[i].IsFailed {
-						cronsumerMessages = append(cronsumerMessages, messages[i].toRetryableMessage(b.retryTopic))
+				for i := range chunkMessages {
+					if chunkMessages[i].IsFailed {
+						cronsumerMessages = append(cronsumerMessages, chunkMessages[i].toRetryableMessage(b.retryTopic))
 					}
 				}
 			}
@@ -120,6 +177,6 @@ func (b *batchConsumer) process(messages []*Message) {
 	}
 
 	if consumeErr == nil {
-		b.metric.TotalProcessedMessagesCounter += int64(len(messages))
+		b.metric.TotalProcessedMessagesCounter += int64(len(chunkMessages))
 	}
 }
diff --git a/batch_consumer_test.go b/batch_consumer_test.go
index 555f750..1bfc670 100644
--- a/batch_consumer_test.go
+++ b/batch_consumer_test.go
@@ -2,6 +2,8 @@ package kafka
 
 import (
 	"errors"
+	"reflect"
+	"strconv"
 	"sync"
 	"testing"
 	"time"
@@ -15,14 +17,20 @@ func Test_batchConsumer_startBatch(t *testing.T) {
 	// Given
 	var numberOfBatch int
 
+	mc := mockReader{}
 	bc := batchConsumer{
 		base: &base{
-			messageCh: make(chan *Message),
-			metric:    &ConsumerMetric{},
-			wg:        sync.WaitGroup{},
+			incomingMessageStream:  make(chan *Message, 1),
+			batchConsumingStream:   make(chan []*Message, 1),
+			singleConsumingStream:  make(chan *Message, 1),
+			messageProcessedStream: make(chan struct{}, 1),
+			metric:                 &ConsumerMetric{},
+			wg:                     sync.WaitGroup{},
+			messageGroupDuration:   500 * time.Millisecond,
+			r:                      &mc,
+			concurrency:            1,
 		},
-		messageGroupLimit:    3,
-		messageGroupDuration: 500 * time.Millisecond,
+		messageGroupLimit: 3,
 		consumeFn: func(messages []*Message) error {
 			numberOfBatch++
 			return nil
@@ -30,24 +38,25 @@ func Test_batchConsumer_startBatch(t *testing.T) {
 	}
 	go func() {
 		// Simulate messageGroupLimit
-		bc.base.messageCh <- &Message{}
-		bc.base.messageCh <- &Message{}
-		bc.base.messageCh <- &Message{}
+		bc.base.incomingMessageStream <- &Message{}
+		bc.base.incomingMessageStream <- &Message{}
+		bc.base.incomingMessageStream <- &Message{}
 
 		time.Sleep(1 * time.Second)
 
 		// Simulate messageGroupDuration
-		bc.base.messageCh <- &Message{}
+		bc.base.incomingMessageStream <- &Message{}
 
 		time.Sleep(1 * time.Second)
 
 		// Return from startBatch
-		close(bc.base.messageCh)
+		close(bc.base.incomingMessageStream)
 	}()
 
 	bc.base.wg.Add(1)
 
 	// When
+	bc.setupConcurrentWorkers()
 	bc.startBatch()
 
 	// Then
@@ -205,6 +214,72 @@ func Test_batchConsumer_process(t *testing.T) {
 	})
 }
 
+func Test_batchConsumer_chunk(t *testing.T) {
+	tests := []struct {
+		allMessages []*Message
+		expected    [][]*Message
+		chunkSize   int
+	}{
+		{
+			allMessages: createMessages(0, 9),
+			chunkSize:   3,
+			expected: [][]*Message{
+				createMessages(0, 3),
+				createMessages(3, 6),
+				createMessages(6, 9),
+			},
+		},
+		{
+			allMessages: []*Message{},
+			chunkSize:   3,
+			expected:    [][]*Message{},
+		},
+		{
+			allMessages: createMessages(0, 1),
+			chunkSize:   3,
+			expected: [][]*Message{
+				createMessages(0, 1),
+			},
+		},
+		{
+			allMessages: createMessages(0, 8),
+			chunkSize:   3,
+			expected: [][]*Message{
+				createMessages(0, 3),
+				createMessages(3, 6),
+				createMessages(6, 8),
+			},
+		},
+		{
+			allMessages: createMessages(0, 3),
+			chunkSize:   3,
+			expected: [][]*Message{
+				createMessages(0, 3),
+			},
+		},
+	}
+
+	for i, tc := range tests {
+		t.Run(strconv.Itoa(i), func(t *testing.T) {
+			chunkedMessages := chunkMessages(&tc.allMessages, tc.chunkSize)
+
+			if !reflect.DeepEqual(chunkedMessages, tc.expected) && !(len(chunkedMessages) == 0 && len(tc.expected) == 0) {
+				t.Errorf("For chunkSize %d, expected %v, but got %v", tc.chunkSize, tc.expected, chunkedMessages)
+			}
+		})
+	}
+}
+
+func createMessages(partitionStart int, partitionEnd int) []*Message {
+	messages := make([]*Message, 0)
+	for i := partitionStart; i < partitionEnd; i++ {
+		messages = append(messages, &Message{
+			Partition: i,
+		})
+	}
+	return messages
+}
+
 type mockCronsumer struct {
 	wantErr bool
 }
@@ -221,11 +296,11 @@ func (m *mockCronsumer) Stop() {
 	panic("implement me")
 }
 
-func (m *mockCronsumer) WithLogger(logger lcronsumer.Interface) {
+func (m *mockCronsumer) WithLogger(_ lcronsumer.Interface) {
 	panic("implement me")
 }
 
-func (m *mockCronsumer) Produce(message kcronsumer.Message) error {
+func (m *mockCronsumer) Produce(_ kcronsumer.Message) error {
 	if m.wantErr {
 		return errors.New("error")
 	}
diff --git a/collector.go b/collector.go
index 50f1d0a..3f702d2 100644
--- a/collector.go
+++ b/collector.go
@@ -19,19 +19,21 @@ func (s *metricCollector) Describe(ch chan<- *prometheus.Desc) {
 	prometheus.DescribeByCollect(s, ch)
 }
 
+var emptyStringList []string
+
 func (s *metricCollector) Collect(ch chan<- prometheus.Metric) {
 	ch <- prometheus.MustNewConstMetric(
 		s.totalProcessedMessagesCounter,
 		prometheus.CounterValue,
 		float64(s.consumerMetric.TotalProcessedMessagesCounter),
-		[]string{}...,
+		emptyStringList...,
 	)
 
 	ch <- prometheus.MustNewConstMetric(
 		s.totalUnprocessedMessagesCounter,
 		prometheus.CounterValue,
 		float64(s.consumerMetric.TotalUnprocessedMessagesCounter),
-		[]string{}...,
+		emptyStringList...,
 	)
 }
 
@@ -42,13 +44,13 @@ func newMetricCollector(consumerMetric *ConsumerMetric) *metricCollector {
 		totalProcessedMessagesCounter: prometheus.NewDesc(
 			prometheus.BuildFQName(Name, "processed_messages_total", "current"),
 			"Total number of processed messages.",
-			[]string{},
+			emptyStringList,
 			nil,
 		),
 		totalUnprocessedMessagesCounter: prometheus.NewDesc(
 			prometheus.BuildFQName(Name, "unprocessed_messages_total", "current"),
 			"Total number of unprocessed messages.",
-			[]string{},
+			emptyStringList,
 			nil,
 		),
 	}
diff --git a/consumer.go b/consumer.go
index 6c5c6b8..08bd9e3 100644
--- a/consumer.go
+++ b/consumer.go
@@ -1,6 +1,10 @@
 package kafka
 
 import (
+	"time"
+
+	"github.com/segmentio/kafka-go"
+
 	kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka"
 )
 
@@ -11,7 +15,7 @@ type consumer struct {
 }
 
 func newSingleConsumer(cfg *ConsumerConfig) (Consumer, error) {
-	consumerBase, err := newBase(cfg)
+	consumerBase, err := newBase(cfg, cfg.Concurrency)
 	if err != nil {
 		return nil, err
 	}
@@ -39,18 +43,81 @@ func (c *consumer) Consume() {
 	c.wg.Add(1)
 	go c.startConsume()
 
+	c.setupConcurrentWorkers()
+
+	c.wg.Add(1)
+	go c.startBatch()
+}
+
+func (c *consumer) startBatch() {
+	defer c.wg.Done()
+
+	ticker := time.NewTicker(c.messageGroupDuration)
+	defer ticker.Stop()
+
+	messages := make([]*Message, 0, c.concurrency)
+	commitMessages := make([]kafka.Message, 0, c.concurrency)
+
+	for {
+		select {
+		case <-ticker.C:
+			if len(messages) == 0 {
+				continue
+			}
+
+			c.consume(&messages, &commitMessages)
+		case msg, ok := <-c.incomingMessageStream:
+			if !ok {
+				return
+			}
+
+			messages = append(messages, msg)
+
+			if len(messages) == c.concurrency {
+				c.consume(&messages, &commitMessages)
+			}
+		}
+	}
+}
+
+func (c *consumer) setupConcurrentWorkers() {
 	for i := 0; i < c.concurrency; i++ {
 		c.wg.Add(1)
 		go func() {
 			defer c.wg.Done()
-
-			for message := range c.messageCh {
+			for message := range c.singleConsumingStream {
 				c.process(message)
+				c.messageProcessedStream <- struct{}{}
 			}
 		}()
 	}
 }
 
+func (c *consumer) consume(messages *[]*Message, commitMessages *[]kafka.Message) {
+	messageList := *messages
+
+	// Send the messages to the workers to be processed
+	for _, message := range messageList {
+		c.singleConsumingStream <- message
+	}
+
+	// Wait for the messages to be processed
+	for range messageList {
+		<-c.messageProcessedStream
+	}
+
+	toKafkaMessages(messages, commitMessages)
+	if err := c.r.CommitMessages(*commitMessages); err != nil {
+		c.logger.Errorf("Commit Error %s", err.Error())
+	}
+
+	// Release pooled resources and reset the buffers
+	putKafkaMessage(commitMessages)
+	putMessages(messages)
+	*commitMessages = (*commitMessages)[:0]
+	*messages = (*messages)[:0]
+}
+
 func (c *consumer) process(message *Message) {
 	consumeErr := c.consumeFn(message)
diff --git a/consumer_base.go b/consumer_base.go
index 51b6a8c..74c316d 100644
--- a/consumer_base.go
+++ b/consumer_base.go
@@ -3,8 +3,10 @@ package kafka
 import (
 	"context"
 	"sync"
+	"time"
+
+	otelkafkakonsumer "github.com/Trendyol/otel-kafka-konsumer"
 
-	"github.com/Trendyol/otel-kafka-konsumer"
 	"go.opentelemetry.io/otel/propagation"
 
 	"github.com/prometheus/client_golang/prometheus"
@@ -26,29 +28,34 @@ type Consumer interface {
 }
 
 type Reader interface {
-	ReadMessage(ctx context.Context) (*kafka.Message, error)
+	FetchMessage(ctx context.Context, msg *kafka.Message) error
 	Close() error
+	CommitMessages(messages []kafka.Message) error
 }
 
 type base struct {
 	cronsumer                 kcronsumer.Cronsumer
 	api                       API
 	logger                    LoggerInterface
-	metric                    *ConsumerMetric
+	propagator                propagation.TextMapPropagator
 	context                   context.Context
-	messageCh                 chan *Message
-	quit                      chan struct{}
-	cancelFn                  context.CancelFunc
 	r                         Reader
+	cancelFn                  context.CancelFunc
+	metric                    *ConsumerMetric
+	quit                      chan struct{}
+	messageProcessedStream    chan struct{}
+	incomingMessageStream     chan *Message
+	singleConsumingStream     chan *Message
+	batchConsumingStream      chan []*Message
 	retryTopic                string
 	subprocesses              subprocesses
 	wg                        sync.WaitGroup
 	concurrency               int
+	messageGroupDuration      time.Duration
 	once                      sync.Once
 	retryEnabled              bool
 	transactionalRetry        bool
 	distributedTracingEnabled bool
-	propagator                propagation.TextMapPropagator
 }
 
 func NewConsumer(cfg *ConsumerConfig) (Consumer, error) {
@@ -59,7 +66,7 @@ func NewConsumer(cfg *ConsumerConfig) (Consumer, error) {
 	return newSingleConsumer(cfg)
 }
 
-func newBase(cfg *ConsumerConfig) (*base, error) {
+func newBase(cfg *ConsumerConfig, messageChSize int) (*base, error) {
 	log := NewZapLogger(cfg.LogLevel)
 
 	reader, err := cfg.newKafkaReader()
@@ -70,7 +77,7 @@ func newBase(cfg *ConsumerConfig) (*base, error) {
 
 	c := base{
 		metric:                &ConsumerMetric{},
-		messageCh:             make(chan *Message, cfg.Concurrency),
+		incomingMessageStream: make(chan *Message, messageChSize),
 		quit:                  make(chan struct{}),
 		concurrency:           cfg.Concurrency,
 		retryEnabled:          cfg.RetryEnabled,
@@ -79,6 +86,10 @@ func newBase(cfg *ConsumerConfig) (*base, error) {
 		logger:                log,
 		subprocesses:          newSubProcesses(),
 		r:                     reader,
+		messageGroupDuration:   cfg.MessageGroupDuration,
+		messageProcessedStream: make(chan struct{}, cfg.Concurrency),
+		singleConsumingStream:  make(chan *Message, cfg.Concurrency),
+		batchConsumingStream:   make(chan []*Message, cfg.Concurrency),
 	}
 
 	if cfg.DistributedTracingEnabled {
@@ -117,7 +128,8 @@ func (c *base) startConsume() {
 		case <-c.quit:
 			return
 		default:
-			message, err := c.r.ReadMessage(c.context)
+			m := kafkaMessagePool.Get().(*kafka.Message)
+			err := c.r.FetchMessage(c.context, m)
 			if err != nil {
 				if c.context.Err() != nil {
 					continue
@@ -126,12 +138,12 @@ func (c *base) startConsume() {
 				continue
 			}
 
-			consumedMessage := fromKafkaMessage(message)
+			incomingMessage := fromKafkaMessage(m)
 			if c.distributedTracingEnabled {
-				consumedMessage.Context = c.propagator.Extract(context.Background(), otelkafkakonsumer.NewMessageCarrier(message))
+				incomingMessage.Context = c.propagator.Extract(context.Background(), otelkafkakonsumer.NewMessageCarrier(m))
 			}
 
-			c.messageCh <- consumedMessage
+			c.incomingMessageStream <- incomingMessage
 		}
 	}
 }
@@ -147,7 +159,10 @@ func (c *base) Stop() error {
 		c.subprocesses.Stop()
 		c.cancelFn()
 		c.quit <- struct{}{}
-		close(c.messageCh)
+		close(c.incomingMessageStream)
+		close(c.singleConsumingStream)
+		close(c.batchConsumingStream)
+		close(c.messageProcessedStream)
 		c.wg.Wait()
 		err = c.r.Close()
 	})
diff --git a/consumer_base_test.go b/consumer_base_test.go
index c63ae2a..0363609 100644
--- a/consumer_base_test.go
+++ b/consumer_base_test.go
@@ -15,9 +15,9 @@ func Test_base_startConsume(t *testing.T) {
 		mc := mockReader{wantErr: true}
 		b := base{
 			wg: sync.WaitGroup{}, r: &mc,
-			messageCh: make(chan *Message),
-			quit:      make(chan struct{}),
-			logger:    NewZapLogger(LogLevelDebug),
+			incomingMessageStream: make(chan *Message),
+			quit:                  make(chan struct{}),
+			logger:                NewZapLogger(LogLevelDebug),
 		}
 		b.context, b.cancelFn = context.WithCancel(context.Background())
 
@@ -34,13 +34,13 @@ func Test_base_startConsume(t *testing.T) {
 	t.Run("Read_Incoming_Messages_Successfully", func(t *testing.T) {
 		// Given
 		mc := mockReader{}
-		b := base{wg: sync.WaitGroup{}, r: &mc, messageCh: make(chan *Message)}
+		b := base{wg: sync.WaitGroup{}, r: &mc, incomingMessageStream: make(chan *Message)}
 		b.wg.Add(1)
 
 		// When
 		go b.startConsume()
 
-		actual := <-b.messageCh
+		actual := <-b.incomingMessageStream
 
 		// Then
 		//nolint:lll
@@ -83,14 +83,25 @@ type mockReader struct {
 	wantErr bool
 }
 
-func (m *mockReader) ReadMessage(ctx context.Context) (*kafka.Message, error) {
+func (m *mockReader) FetchMessage(_ context.Context, msg *kafka.Message) error {
 	if m.wantErr {
-		return nil, errors.New("err")
+		return errors.New("err")
 	}
 	//nolint:lll
-	return &kafka.Message{Topic: "topic", Partition: 0, Offset: 1, HighWaterMark: 1, Key: []byte("foo"), Value: []byte("bar"), Headers: []kafka.Header{{Key: "header", Value: []byte("value")}}}, nil
+	*msg = kafka.Message{Topic: "topic", Partition: 0, Offset: 1, HighWaterMark: 1, Key: []byte("foo"), Value: []byte("bar"), Headers: []kafka.Header{{Key: "header", Value: []byte("value")}}}
+	return nil
 }
 
 func (m *mockReader) Close() error {
-	panic("implement me")
+	if m.wantErr {
+		return errors.New("err")
+	}
+	return nil
+}
+
+func (m *mockReader) CommitMessages(_ []kafka.Message) error {
+	if m.wantErr {
+		return errors.New("err")
+	}
+	return nil
 }
diff --git a/consumer_config.go b/consumer_config.go
index d73976c..ac0eec2 100644
--- a/consumer_config.go
+++ b/consumer_config.go
@@ -25,26 +25,27 @@ type DialConfig struct {
 }
 
 type ConsumerConfig struct {
-	APIConfiguration                APIConfiguration
+	DistributedTracingConfiguration DistributedTracingConfiguration
 	Logger                          LoggerInterface
+	APIConfiguration                APIConfiguration
 	MetricConfiguration             MetricConfiguration
 	SASL                            *SASLConfig
 	TLS                             *TLSConfig
 	Dial                            *DialConfig
 	BatchConfiguration              *BatchConfiguration
 	ConsumeFn                       ConsumeFn
-	ClientID                        string
-	Rack                            string
+	TransactionalRetry              *bool
+	RetryConfiguration              RetryConfiguration
 	LogLevel                        LogLevel
+	Rack                            string
+	ClientID                        string
 	Reader                          ReaderConfig
-	RetryConfiguration              RetryConfiguration
 	CommitInterval                  time.Duration
-	DistributedTracingEnabled       bool
-	DistributedTracingConfiguration DistributedTracingConfiguration
+	MessageGroupDuration            time.Duration
 	Concurrency                     int
+	DistributedTracingEnabled       bool
 	RetryEnabled                    bool
 	APIEnabled                      bool
-	TransactionalRetry              *bool
 }
 
 func (cfg *ConsumerConfig) newCronsumerConfig() *kcronsumer.Config {
@@ -115,16 +116,15 @@ type RetryConfiguration struct {
 	Topic           string
 	DeadLetterTopic string
 	Rack            string
+	LogLevel        LogLevel
 	Brokers         []string
 	MaxRetry        int
 	WorkDuration    time.Duration
-	LogLevel        LogLevel
 }
 
 type BatchConfiguration struct {
-	BatchConsumeFn       BatchConsumeFn
-	MessageGroupLimit    int
-	MessageGroupDuration time.Duration
+	BatchConsumeFn    BatchConsumeFn
+	MessageGroupLimit int
 }
 
 func (cfg *ConsumerConfig) newKafkaDialer() (*kafka.Dialer, error) {
@@ -186,6 +186,10 @@ func (cfg *ConsumerConfig) setDefaults() {
 		cfg.Reader.CommitInterval = cfg.CommitInterval
 	}
 
+	if cfg.MessageGroupDuration == 0 {
+		cfg.MessageGroupDuration = time.Second
+	}
+
 	if cfg.DistributedTracingEnabled {
 		if cfg.DistributedTracingConfiguration.Propagator == nil {
 			cfg.DistributedTracingConfiguration.Propagator = otel.GetTextMapPropagator()
diff --git a/examples/with-kafka-batch-consumer/main.go b/examples/with-kafka-batch-consumer/main.go
index ed24f17..f52d8a9 100644
--- a/examples/with-kafka-batch-consumer/main.go
+++ b/examples/with-kafka-batch-consumer/main.go
@@ -16,9 +16,8 @@ func main() {
 			GroupID: "standart-cg",
 		},
 		BatchConfiguration: &kafka.BatchConfiguration{
-			MessageGroupLimit:    1000,
-			MessageGroupDuration: time.Second,
-			BatchConsumeFn:       batchConsumeFn,
+			MessageGroupLimit: 1000,
+			BatchConsumeFn:    batchConsumeFn,
 		},
 		RetryEnabled: true,
 		RetryConfiguration: kafka.RetryConfiguration{
@@ -28,6 +27,7 @@ func main() {
 			WorkDuration: 50 * time.Second,
 			MaxRetry:     3,
 		},
+		MessageGroupDuration: time.Second,
 	}
 
 	consumer, _ := kafka.NewConsumer(consumerCfg)
diff --git a/examples/with-kafka-transactional-retry-disabled/main.go b/examples/with-kafka-transactional-retry-disabled/main.go
index f1783c7..60464c2 100644
--- a/examples/with-kafka-transactional-retry-disabled/main.go
+++ b/examples/with-kafka-transactional-retry-disabled/main.go
@@ -17,9 +17,8 @@ func main() {
 			GroupID: "standart-cg",
 		},
 		BatchConfiguration: &kafka.BatchConfiguration{
-			MessageGroupLimit:    1000,
-			MessageGroupDuration: time.Second,
-			BatchConsumeFn:       batchConsumeFn,
+			MessageGroupLimit: 1000,
+			BatchConsumeFn:    batchConsumeFn,
 		},
 		RetryEnabled:       true,
 		TransactionalRetry: kafka.NewBoolPtr(false),
@@ -30,6 +29,7 @@ func main() {
 			WorkDuration: 4 * time.Minute,
 			MaxRetry:     3,
 		},
+		MessageGroupDuration: time.Second,
 	}
 
 	consumer, _ := kafka.NewConsumer(consumerCfg)
diff --git a/examples/with-sasl-plaintext/go.mod b/examples/with-sasl-plaintext/go.mod
index e419066..ade215d 100644
--- a/examples/with-sasl-plaintext/go.mod
+++ b/examples/with-sasl-plaintext/go.mod
@@ -8,7 +8,7 @@ require github.com/Trendyol/kafka-konsumer v0.0.0-00010101000000-000000000000
 
 require (
 	github.com/Trendyol/kafka-cronsumer v1.4.5 // indirect
-	github.com/Trendyol/otel-kafka-konsumer v0.0.5 // indirect
+	github.com/Trendyol/otel-kafka-konsumer v0.0.7 // indirect
 	github.com/andybalholm/brotli v1.0.5 // indirect
 	github.com/ansrivas/fiberprometheus/v2 v2.6.1 // indirect
 	github.com/beorn7/perks v1.0.1 // indirect
diff --git a/examples/with-sasl-plaintext/go.sum b/examples/with-sasl-plaintext/go.sum
index 58296f6..755c5fe 100644
--- a/examples/with-sasl-plaintext/go.sum
+++ b/examples/with-sasl-plaintext/go.sum
@@ -2,6 +2,9 @@ github.com/Trendyol/kafka-cronsumer v1.4.5 h1:82MhKZi1tXqFMp2gpSiYaT4UyN6LxumIu6
 github.com/Trendyol/kafka-cronsumer v1.4.5/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
 github.com/Trendyol/otel-kafka-konsumer v0.0.5 h1:i5Q6vR4ZRTtlb+uLimGJNBOQUiAtcbjn7Xc2FmPap/4=
 github.com/Trendyol/otel-kafka-konsumer v0.0.5/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0=
+github.com/Trendyol/otel-kafka-konsumer v0.0.6/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0=
+github.com/Trendyol/otel-kafka-konsumer v0.0.7 h1:sT1TE2rgfsdrJWrXKz5j6dPkKJsvP+Tv0Dea4ORqJ+4=
+github.com/Trendyol/otel-kafka-konsumer v0.0.7/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0=
 github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
 github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
 github.com/ansrivas/fiberprometheus/v2 v2.6.1 h1:wac3pXaE6BYYTF04AC6K0ktk6vCD+MnDOJZ3SK66kXM=
diff --git a/go.mod b/go.mod
index 9e5fc93..805da64 100644
--- a/go.mod
+++ b/go.mod
@@ -4,7 +4,7 @@ go 1.19
 
 require (
 	github.com/Trendyol/kafka-cronsumer v1.4.5
-	github.com/Trendyol/otel-kafka-konsumer v0.0.5
+	github.com/Trendyol/otel-kafka-konsumer v0.0.7
 	github.com/ansrivas/fiberprometheus/v2 v2.6.1
 	github.com/gofiber/fiber/v2 v2.50.0
 	github.com/prometheus/client_golang v1.16.0
@@ -44,6 +44,7 @@ require (
 	github.com/xdg-go/stringprep v1.0.4 // indirect
 	go.opentelemetry.io/otel/metric v1.19.0 // indirect
 	go.uber.org/atomic v1.11.0 // indirect
+	go.uber.org/goleak v1.3.0 // indirect
 	go.uber.org/multierr v1.11.0 // indirect
 	golang.org/x/sys v0.13.0 // indirect
 	golang.org/x/text v0.9.0 // indirect
diff --git a/go.sum b/go.sum
index 2c16b5e..d4efbb5 100644
--- a/go.sum
+++ b/go.sum
@@ -1,9 +1,7 @@
-github.com/Trendyol/kafka-cronsumer v1.4.4 h1:RfTpVyvxf+FjLxOJIHQXr6zrMjtba6PGUAYXLoGnVuE=
-github.com/Trendyol/kafka-cronsumer v1.4.4/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
 github.com/Trendyol/kafka-cronsumer v1.4.5 h1:82MhKZi1tXqFMp2gpSiYaT4UyN6LxumIu6ZMC8yZ1JY=
 github.com/Trendyol/kafka-cronsumer v1.4.5/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
-github.com/Trendyol/otel-kafka-konsumer v0.0.5 h1:i5Q6vR4ZRTtlb+uLimGJNBOQUiAtcbjn7Xc2FmPap/4=
-github.com/Trendyol/otel-kafka-konsumer v0.0.5/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0=
+github.com/Trendyol/otel-kafka-konsumer v0.0.7 h1:sT1TE2rgfsdrJWrXKz5j6dPkKJsvP+Tv0Dea4ORqJ+4=
+github.com/Trendyol/otel-kafka-konsumer v0.0.7/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0=
 github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
 github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
 github.com/ansrivas/fiberprometheus/v2 v2.6.1 h1:wac3pXaE6BYYTF04AC6K0ktk6vCD+MnDOJZ3SK66kXM=
@@ -97,7 +95,8 @@ go.opentelemetry.io/otel/trace v1.19.0 h1:DFVQmlVbfVeOuBRrwdtaehRrWiL1JoVs9CPIQ1
 go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmYZpYojqMnX2vo=
 go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
 go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
-go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
+go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
+go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
 go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
 go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
 go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
diff --git a/message.go b/message.go
index 363e536..58e3aca 100644
--- a/message.go
+++ b/message.go
@@ -2,6 +2,7 @@ package kafka
 
 import (
 	"context"
+	"sync"
 	"time"
 
 	kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka"
@@ -12,22 +13,21 @@ import (
 type Header = protocol.Header
 
 type Message struct {
+	Time       time.Time
+	WriterData interface{}
+
+	// Context is used to enable distributed tracing support
+	Context context.Context
 	Topic         string
+	Key           []byte
+	Value         []byte
+	Headers       []Header
 	Partition     int
 	Offset        int64
 	HighWaterMark int64
 
 	// IsFailed Is only used on transactional retry disabled
 	IsFailed bool
-
-	Key     []byte
-	Value   []byte
-	Headers []Header
-	WriterData interface{}
-	Time       time.Time
-
-	// Context To enable distributed tracing support
-	Context context.Context
 }
 
 func (m *Message) toKafkaMessage() kafka.Message {
@@ -44,21 +44,54 @@ func (m *Message) toKafkaMessage() kafka.Message {
 	}
 }
 
-func fromKafkaMessage(message *kafka.Message) *Message {
-	return &Message{
-		Topic:         message.Topic,
-		Partition:     message.Partition,
-		Offset:        message.Offset,
-		HighWaterMark: message.HighWaterMark,
-		Key:           message.Key,
-		Value:         message.Value,
-		Headers:       message.Headers,
-		WriterData:    message.WriterData,
-		Time:          message.Time,
-		Context:       context.TODO(),
+func toKafkaMessages(messages *[]*Message, commitMessages *[]kafka.Message) {
+	for _, message := range *messages {
+		*commitMessages = append(*commitMessages, message.toKafkaMessage())
+	}
+}
+
+func putMessages(messages *[]*Message) {
+	for _, message := range *messages {
+		messagePool.Put(message)
+	}
+}
+
+func putKafkaMessage(messages *[]kafka.Message) {
+	for i := range *messages {
+		message := (*messages)[i] // copy before taking the address; &message on the loop variable would put the same pointer every iteration
+		kafkaMessagePool.Put(&message)
 	}
 }
 
+var messagePool = sync.Pool{
+	New: func() any {
+		return &Message{}
+	},
+}
+
+var kafkaMessagePool = sync.Pool{
+	New: func() any {
+		return &kafka.Message{}
+	},
+}
+
+func fromKafkaMessage(kafkaMessage *kafka.Message) *Message {
+	message := messagePool.Get().(*Message)
+
+	message.Topic = kafkaMessage.Topic
+	message.Partition = kafkaMessage.Partition
+	message.Offset = kafkaMessage.Offset
+	message.HighWaterMark = kafkaMessage.HighWaterMark
+	message.Key = kafkaMessage.Key
+	message.Value = kafkaMessage.Value
+	message.Headers = kafkaMessage.Headers
+	message.WriterData = kafkaMessage.WriterData
+	message.Time = kafkaMessage.Time
+	message.Context = context.TODO()
+
+	return message
+}
+
 func (m *Message) toRetryableMessage(retryTopic string) kcronsumer.Message {
 	headers := make([]kcronsumer.Header, 0, len(m.Headers))
 	for i := range m.Headers {
@@ -87,16 +120,17 @@ func toMessage(message kcronsumer.Message) *Message {
 		})
 	}
 
-	return &Message{
-		Topic:         message.Topic,
-		Partition:     message.Partition,
-		Offset:        message.Offset,
-		HighWaterMark: message.HighWaterMark,
-		Key:           message.Key,
-		Value:         message.Value,
-		Headers:       headers,
-		Time:          message.Time,
-	}
+	msg := messagePool.Get().(*Message)
+	msg.Topic = message.Topic
+	msg.Partition = message.Partition
+	msg.Offset = message.Offset
+	msg.HighWaterMark = message.HighWaterMark
+	msg.Key = message.Key
+	msg.Value = message.Value
+	msg.Headers = headers
+	msg.Time = message.Time
+
+	return msg
 }
 
 func (m *Message) Header(key string) *kafka.Header {
diff --git a/otel_producer_test.go b/otel_producer_test.go
index 65d23e3..ff4a0d3 100644
--- a/otel_producer_test.go
+++ b/otel_producer_test.go
@@ -63,14 +63,14 @@ type mockOtelKafkaKonsumerWriter struct {
 
 var _ OtelKafkaKonsumerWriter = (*mockOtelKafkaKonsumerWriter)(nil)
 
-func (m mockOtelKafkaKonsumerWriter) WriteMessage(ctx context.Context, msg kafka.Message) error {
+func (m mockOtelKafkaKonsumerWriter) WriteMessage(_ context.Context, _ kafka.Message) error {
 	if m.wantErr {
 		return errors.New("err occurred")
 	}
 	return nil
 }
 
-func (m mockOtelKafkaKonsumerWriter) WriteMessages(ctx context.Context, msgs []kafka.Message) error {
+func (m mockOtelKafkaKonsumerWriter) WriteMessages(_ context.Context, _ []kafka.Message) error {
 	if m.wantErr {
 		return errors.New("err occurred")
 	}
diff --git a/otel_reader_wrapper.go b/otel_reader_wrapper.go
index 0fa07a5..b92c386 100644
--- a/otel_reader_wrapper.go
+++ b/otel_reader_wrapper.go
@@ -35,10 +35,14 @@ func NewOtelReaderWrapper(cfg *ConsumerConfig, reader *segmentio.Reader) (Reader
 	}, nil
 }
 
-func (o *otelReaderWrapper) ReadMessage(ctx context.Context) (*segmentio.Message, error) {
-	return o.r.ReadMessage(ctx)
+func (o *otelReaderWrapper) FetchMessage(ctx context.Context, msg *segmentio.Message) error {
+	return o.r.FetchMessage(ctx, msg)
 }
 
 func (o *otelReaderWrapper) Close() error {
 	return o.r.Close()
 }
+
+func (o *otelReaderWrapper) CommitMessages(messages []segmentio.Message) error {
+	return o.r.CommitMessages(context.Background(), messages...)
+}
diff --git a/producer_config.go b/producer_config.go
index fde6746..1feeecd 100644
--- a/producer_config.go
+++ b/producer_config.go
@@ -30,20 +30,20 @@ type WriterConfig struct {
 }
 
 type TransportConfig struct {
+	MetadataTopics []string
 	DialTimeout    time.Duration
 	IdleTimeout    time.Duration
 	MetadataTTL    time.Duration
-	MetadataTopics []string
 }
 
 type ProducerConfig struct {
+	DistributedTracingConfiguration DistributedTracingConfiguration
 	Transport                       *TransportConfig
 	SASL                            *SASLConfig
 	TLS                             *TLSConfig
 	ClientID                        string
 	Writer                          WriterConfig
 	DistributedTracingEnabled       bool
-	DistributedTracingConfiguration DistributedTracingConfiguration
 }
 
 func (cfg *ProducerConfig) newKafkaTransport() (*kafka.Transport, error) {
diff --git a/producer_test.go b/producer_test.go
index 26c1eaa..d4fd8ac 100644
--- a/producer_test.go
+++ b/producer_test.go
@@ -48,7 +48,7 @@ func Test_producer_Close_Successfully(t *testing.T) {
 
 type mockWriter struct{}
 
-func (m *mockWriter) WriteMessages(ctx context.Context, message ...kafka.Message) error {
+func (m *mockWriter) WriteMessages(_ context.Context, _ ...kafka.Message) error {
 	return nil
 }
diff --git a/reader_wrapper.go b/reader_wrapper.go
index a283538..b6f36f2 100644
--- a/reader_wrapper.go
+++ b/reader_wrapper.go
@@ -14,12 +14,17 @@ func NewReaderWrapper(reader *segmentio.Reader) Reader {
 	return &readerWrapper{r: reader}
 }
 
-// ReadMessage returns pointer of kafka message because we will support distributed tracing in the near future
-func (s *readerWrapper) ReadMessage(ctx context.Context) (*segmentio.Message, error) {
-	message, err := s.r.ReadMessage(ctx)
-	return &message, err
+// FetchMessage fills the given kafka message pointer because we will support distributed tracing in the near future
+func (s *readerWrapper) FetchMessage(ctx context.Context, msg *segmentio.Message) error {
+	message, err := s.r.FetchMessage(ctx)
+	*msg = message
+	return err
 }
 
 func (s *readerWrapper) Close() error {
 	return s.r.Close()
 }
+
+func (s *readerWrapper) CommitMessages(messages []segmentio.Message) error {
+	return s.r.CommitMessages(context.Background(), messages...)
+}
diff --git a/test/integration/go.mod b/test/integration/go.mod
index aec012d..89f6a8c 100644
--- a/test/integration/go.mod
+++ b/test/integration/go.mod
@@ -5,13 +5,13 @@ go 1.19
 replace github.com/Trendyol/kafka-konsumer => ../..
 
 require (
-	github.com/Trendyol/kafka-konsumer v0.0.0-00010101000000-000000000000
+	github.com/Trendyol/kafka-konsumer v1.8.8
 	github.com/segmentio/kafka-go v0.4.43
 )
 
 require (
 	github.com/Trendyol/kafka-cronsumer v1.4.5 // indirect
-	github.com/Trendyol/otel-kafka-konsumer v0.0.5 // indirect
+	github.com/Trendyol/otel-kafka-konsumer v0.0.7 // indirect
 	github.com/andybalholm/brotli v1.0.5 // indirect
 	github.com/ansrivas/fiberprometheus/v2 v2.6.1 // indirect
 	github.com/beorn7/perks v1.0.1 // indirect
diff --git a/test/integration/go.sum b/test/integration/go.sum
index 58296f6..7b9c897 100644
--- a/test/integration/go.sum
+++ b/test/integration/go.sum
@@ -1,7 +1,7 @@
 github.com/Trendyol/kafka-cronsumer v1.4.5 h1:82MhKZi1tXqFMp2gpSiYaT4UyN6LxumIu6ZMC8yZ1JY=
 github.com/Trendyol/kafka-cronsumer v1.4.5/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
-github.com/Trendyol/otel-kafka-konsumer v0.0.5 h1:i5Q6vR4ZRTtlb+uLimGJNBOQUiAtcbjn7Xc2FmPap/4=
-github.com/Trendyol/otel-kafka-konsumer v0.0.5/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0=
+github.com/Trendyol/otel-kafka-konsumer v0.0.7 h1:sT1TE2rgfsdrJWrXKz5j6dPkKJsvP+Tv0Dea4ORqJ+4=
+github.com/Trendyol/otel-kafka-konsumer v0.0.7/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0=
 github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
 github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
 github.com/ansrivas/fiberprometheus/v2 v2.6.1 h1:wac3pXaE6BYYTF04AC6K0ktk6vCD+MnDOJZ3SK66kXM=
@@ -90,7 +90,7 @@ go.opentelemetry.io/otel/trace v1.19.0 h1:DFVQmlVbfVeOuBRrwdtaehRrWiL1JoVs9CPIQ1
 go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmYZpYojqMnX2vo=
 go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
 go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
-go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
+go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
 go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
 go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
 go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go
index b674867..131ec01 100644
--- a/test/integration/integration_test.go
+++ b/test/integration/integration_test.go
@@ -125,10 +125,10 @@ func Test_Should_Batch_Consume_Messages_Successfully(t *testing.T) {
 	messagesLen := make(chan int)
 	consumerCfg := &kafka.ConsumerConfig{
-		Reader: kafka.ReaderConfig{Brokers: []string{brokerAddress}, Topic: topic, GroupID: consumerGroup},
+		MessageGroupDuration: time.Second,
+		Reader:               kafka.ReaderConfig{Brokers: []string{brokerAddress}, Topic: topic, GroupID: consumerGroup},
 		BatchConfiguration: &kafka.BatchConfiguration{
-			MessageGroupLimit:    100,
-			MessageGroupDuration: time.Second,
+			MessageGroupLimit: 100,
 			BatchConsumeFn: func(messages []*kafka.Message) error {
 				messagesLen <- len(messages)
 				return nil
@@ -186,9 +186,9 @@ func Test_Should_Batch_Retry_Only_Failed_Messages_When_Transactional_Retry_Is_Di
 			MaxRetry:      3,
 			LogLevel:      "error",
 		},
+		MessageGroupDuration: time.Second,
 		BatchConfiguration: &kafka.BatchConfiguration{
-			MessageGroupLimit:    100,
-			MessageGroupDuration: time.Second,
+			MessageGroupLimit: 100,
 			BatchConsumeFn: func(messages []*kafka.Message) error {
 				messages[1].IsFailed = true
 				return errors.New("err")
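The core of this change is the batch-dispatch loop: incoming messages accumulate until either messageGroupLimit * concurrency messages arrive or messageGroupDuration elapses, the buffer is split into messageGroupLimit-sized chunks via chunkMessages, the chunks are fanned out to a fixed worker pool over batchConsumingStream, and offsets are committed only after every chunk signals completion on messageProcessedStream. The sketch below is a minimal, self-contained illustration of that pattern; the names (message, incoming, work, done, chunk, flush) are simplified stand-ins for the library's internals, not its exported API.

package main

import (
	"fmt"
	"time"
)

type message struct{ value string }

// chunk splits a buffered batch into fixed-size groups, mirroring chunkMessages above.
func chunk(all []*message, size int) [][]*message {
	var chunks [][]*message
	for i := 0; i < len(all); i += size {
		end := i + size
		if end > len(all) {
			end = len(all)
		}
		chunks = append(chunks, all[i:end])
	}
	return chunks
}

func main() {
	const (
		concurrency   = 3                      // worker count
		groupLimit    = 2                      // messages per chunk
		groupDuration = 100 * time.Millisecond // flush interval for partially filled batches
	)

	incoming := make(chan *message, groupLimit*concurrency) // stand-in for incomingMessageStream
	work := make(chan []*message, concurrency)              // stand-in for batchConsumingStream
	done := make(chan struct{}, concurrency)                // stand-in for messageProcessedStream

	// Fixed worker pool: each worker consumes whole chunks and signals completion.
	for w := 0; w < concurrency; w++ {
		go func() {
			for batch := range work {
				for _, m := range batch {
					fmt.Println("processed", m.value)
				}
				done <- struct{}{}
			}
		}()
	}

	// Producer side, standing in for the reader loop that fetches from Kafka.
	go func() {
		for i := 0; i < 5; i++ {
			incoming <- &message{value: fmt.Sprintf("msg-%d", i)}
		}
		close(incoming)
	}()

	buf := make([]*message, 0, groupLimit*concurrency)
	flush := func() {
		chunks := chunk(buf, groupLimit)
		for _, c := range chunks {
			work <- c
		}
		// Only after every chunk reports back is it safe to commit the offsets.
		for range chunks {
			<-done
		}
		fmt.Println("commit", len(buf), "offsets")
		buf = buf[:0]
	}

	ticker := time.NewTicker(groupDuration)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if len(buf) > 0 {
				flush() // duration-based flush
			}
		case m, ok := <-incoming:
			if !ok {
				if len(buf) > 0 {
					flush()
				}
				close(work)
				return
			}
			buf = append(buf, m)
			if len(buf) == cap(buf) {
				flush() // size-based flush once limit*concurrency messages are buffered
			}
		}
	}
}

This deferred, grouped commit is also why the Reader interface changes in this diff: segmentio/kafka-go's ReadMessage commits offsets automatically when a GroupID is set, so switching to FetchMessage plus an explicit CommitMessages call is what allows offsets to be committed once per flushed group instead of once per message.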