diff --git a/transport/kafka/producer.go b/transport/kafka/producer.go index 182ce3d..e463c66 100644 --- a/transport/kafka/producer.go +++ b/transport/kafka/producer.go @@ -50,6 +50,12 @@ func WithMaxAttemptsProducerOption(attempts int) ProducerOption { return func(p *Producer) { p.config.MaxAttempts = attempts } } +// WithMaxBatchBytesOption sets the maximum bytes of record size +// kafka producer will try to produce +func WithMaxBatchBytesOption(batchBytes int64) ProducerOption { + return func(p *Producer) { p.config.BatchBytes = int(batchBytes) } +} + // WithQueueCapacityProducerOption sets the internal buffer capacity // used to cache incoming messages before publishing on kafka func WithQueueCapacityProducerOption(qc int) ProducerOption {