Skip to content

Commit

Permalink
feat: support kafka writer batch conf (#3495)
Browse files Browse the repository at this point in the history
Signed-off-by: Song Gao <disxiaofei@163.com>
  • Loading branch information
Yisaer authored Jan 9, 2025
1 parent 2742a02 commit afdba0d
Showing 1 changed file with 29 additions and 9 deletions.
38 changes: 29 additions & 9 deletions extensions/sinks/kafka/ext/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"
"fmt"
"strings"
"time"

kafkago "github.com/segmentio/kafka-go"

Expand Down Expand Up @@ -46,10 +47,17 @@ type sinkConf struct {
}

type kafkaConf struct {
MaxAttempts int `json:"maxAttempts"`
RequiredACKs int `json:"requiredACKs"`
Key string `json:"key"`
Headers interface{} `json:"headers"`
MaxAttempts int `json:"maxAttempts"`
RequiredACKs int `json:"requiredACKs"`
Key string `json:"key"`
Headers interface{} `json:"headers"`
WriterConf kafkaWriterConf `json:"writerConf"`
}

type kafkaWriterConf struct {
BatchSize int `json:"batchSize"`
BatchTimeout time.Duration `json:"batchTimeout"`
BatchBytes int64 `json:"batchBytes"`
}

func (m *kafkaSink) Ping(_ string, props map[string]interface{}) error {
Expand Down Expand Up @@ -92,10 +100,7 @@ func (m *kafkaSink) Configure(props map[string]interface{}) error {
return err
}
m.tlsConfig = tlsConfig
kc := &kafkaConf{
RequiredACKs: -1,
MaxAttempts: 1,
}
kc := getDefaultKafkaConf()
if err := cast.MapToStruct(props, kc); err != nil {
return err
}
Expand All @@ -122,7 +127,9 @@ func (m *kafkaSink) buildKafkaWriter() error {
AllowAutoTopicCreation: true,
MaxAttempts: m.kc.MaxAttempts,
RequiredAcks: kafkago.RequiredAcks(m.kc.RequiredACKs),
BatchSize: 1,
BatchSize: m.kc.WriterConf.BatchSize,
BatchBytes: m.kc.WriterConf.BatchBytes,
BatchTimeout: m.kc.WriterConf.BatchTimeout,
Transport: &kafkago.Transport{
SASL: mechanism,
TLS: m.tlsConfig,
Expand Down Expand Up @@ -310,3 +317,16 @@ func (m *kafkaSink) ping(address string) error {
defer c.Close()
return nil
}

func getDefaultKafkaConf() *kafkaConf {
c := &kafkaConf{
RequiredACKs: -1,
MaxAttempts: 1,
WriterConf: kafkaWriterConf{
BatchSize: 100,
BatchTimeout: 5 * time.Millisecond,
BatchBytes: 1048576,
},
}
return c
}

0 comments on commit afdba0d

Please sign in to comment.