diff --git a/extensions/sinks/kafka/ext/kafka.go b/extensions/sinks/kafka/ext/kafka.go index 655b97a8e4..033e5c11d9 100644 --- a/extensions/sinks/kafka/ext/kafka.go +++ b/extensions/sinks/kafka/ext/kafka.go @@ -19,6 +19,7 @@ import ( "encoding/json" "fmt" "strings" + "time" kafkago "github.com/segmentio/kafka-go" @@ -46,10 +47,17 @@ type sinkConf struct { } type kafkaConf struct { - MaxAttempts int `json:"maxAttempts"` - RequiredACKs int `json:"requiredACKs"` - Key string `json:"key"` - Headers interface{} `json:"headers"` + MaxAttempts int `json:"maxAttempts"` + RequiredACKs int `json:"requiredACKs"` + Key string `json:"key"` + Headers interface{} `json:"headers"` + WriterConf kafkaWriterConf `json:"writerConf"` +} + +type kafkaWriterConf struct { + BatchSize int `json:"batchSize"` + BatchTimeout time.Duration `json:"batchTimeout"` + BatchBytes int64 `json:"batchBytes"` } func (m *kafkaSink) Ping(_ string, props map[string]interface{}) error { @@ -92,10 +100,7 @@ func (m *kafkaSink) Configure(props map[string]interface{}) error { return err } m.tlsConfig = tlsConfig - kc := &kafkaConf{ - RequiredACKs: -1, - MaxAttempts: 1, - } + kc := getDefaultKafkaConf() if err := cast.MapToStruct(props, kc); err != nil { return err } @@ -122,7 +127,9 @@ func (m *kafkaSink) buildKafkaWriter() error { AllowAutoTopicCreation: true, MaxAttempts: m.kc.MaxAttempts, RequiredAcks: kafkago.RequiredAcks(m.kc.RequiredACKs), - BatchSize: 1, + BatchSize: m.kc.WriterConf.BatchSize, + BatchBytes: m.kc.WriterConf.BatchBytes, + BatchTimeout: m.kc.WriterConf.BatchTimeout, Transport: &kafkago.Transport{ SASL: mechanism, TLS: m.tlsConfig, @@ -310,3 +317,16 @@ func (m *kafkaSink) ping(address string) error { defer c.Close() return nil } + +func getDefaultKafkaConf() *kafkaConf { + c := &kafkaConf{ + RequiredACKs: -1, + MaxAttempts: 1, + WriterConf: kafkaWriterConf{ + BatchSize: 100, + BatchTimeout: 5 * time.Millisecond, + BatchBytes: 1048576, + }, + } + return c +}