Skip to content

Commit

Permalink
Merge pull request #781 from asg0451/fix-lz4-compression-levels
Browse files Browse the repository at this point in the history
fix setting lz4 compression levels
  • Loading branch information
twmb authored Jul 29, 2024
2 parents 187266a + 9347148 commit e16c46c
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 7 deletions.
11 changes: 4 additions & 7 deletions pkg/kgo/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ const (
// for that batch.
type CompressionCodec struct {
codec codecType
level int8
level int
}

// NoCompression is a compression option that avoids compression. This can
Expand All @@ -60,10 +60,7 @@ func ZstdCompression() CompressionCodec { return CompressionCodec{codecZstd, 0}
//
// If the level is invalid, compressors just use a default level.
func (c CompressionCodec) WithLevel(level int) CompressionCodec {
if level > 127 {
level = 127 // lz4 could theoretically be large, I guess
}
c.level = int8(level)
c.level = level
return c
}

Expand Down Expand Up @@ -108,8 +105,8 @@ out:
case codecGzip:
level := gzip.DefaultCompression
if codec.level != 0 {
if _, err := gzip.NewWriterLevel(nil, int(codec.level)); err != nil {
level = int(codec.level)
if _, err := gzip.NewWriterLevel(nil, codec.level); err != nil {
level = codec.level
}
}
c.gzPool = sync.Pool{New: func() any { c, _ := gzip.NewWriterLevel(nil, level); return c }}
Expand Down
14 changes: 14 additions & 0 deletions pkg/kgo/compression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,22 @@ import (
"reflect"
"sync"
"testing"

"github.com/pierrec/lz4/v4"
)

// Regression test for #778.
func TestCompressionCodecLZ4WithSpecifiedLevel(t *testing.T) {
t.Parallel()

codec := Lz4Compression().WithLevel(512)
w := lz4.NewWriter(new(bytes.Buffer))
err := w.Apply(lz4.CompressionLevelOption(lz4.CompressionLevel(codec.level)))
if err != nil {
t.Errorf("unexpected error: %v", err)
}
}

func TestNewCompressor(t *testing.T) {
t.Parallel()
for i, test := range []struct {
Expand Down

0 comments on commit e16c46c

Please sign in to comment.