Skip to content

Commit

Permalink
bulker: workaround for too long error header in some messages
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Jul 9, 2024
1 parent 10bef6a commit e018fd5
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 1 deletion.
3 changes: 2 additions & 1 deletion bulkerapp/app/abstract_batch_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ func (bc *AbstractBatchConsumer) initTransactionalProducer() (*kafka.Producer, e
for k, v := range errors {
bc.Errorf("%s COUNT: %d", k, v)
}
clear(errors)
}
clear(errors)
case e := <-producer.Events():
switch ev := e.(type) {
case *kafka.Message:
Expand All @@ -163,6 +163,7 @@ func (bc *AbstractBatchConsumer) initTransactionalProducer() (*kafka.Producer, e
zero := 0
cnt := utils.MapGetOrCreate(errors, errtext, &zero)
*cnt = *cnt + 1
bc.Errorf("%s %d", errtext, len(errors))
} else {
kafkabase.ProducerMessages(ProducerMessageLabels(*ev.TopicPartition.Topic, "delivered", "")).Inc()
//bc.Debugf("Message ID: %s delivered to topic %s [%d] at offset %v", messageId, *ev.TopicPartition.Topic, ev.TopicPartition.Partition, ev.TopicPartition.Offset)
Expand Down
3 changes: 3 additions & 0 deletions bulkerapp/app/retry_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
bulker "github.com/jitsucom/bulker/bulkerlib"
"github.com/jitsucom/bulker/jitsubase/utils"
"github.com/jitsucom/bulker/kafkabase"
"strconv"
"time"
Expand Down Expand Up @@ -167,6 +168,8 @@ func (rc *RetryConsumer) processBatchImpl(_ *Destination, _, _, retryBatchSize i
retries++
singleCount.retryScheduled++
}
originalError := kafkabase.GetKafkaHeader(message, errorHeader)
kafkabase.PutKafkaHeader(&headers, errorHeader, utils.ShortenString(originalError, 1024))
kafkabase.PutKafkaHeader(&headers, retriesCountHeader, strconv.Itoa(retries))
err = producer.Produce(&kafka.Message{
Key: message.Key,
Expand Down

0 comments on commit e018fd5

Please sign in to comment.