diff --git a/mutate/compress.go b/mutate/compress.go index 740968c17..d8a2427ec 100644 --- a/mutate/compress.go +++ b/mutate/compress.go @@ -22,8 +22,15 @@ type Compressor interface { // indicate what compression type is used, e.g. "gzip", or "" for no // compression. MediaTypeSuffix() string + + // WithOpt applies an option and can be chained. + WithOpt(CompressorOpt) Compressor } +// CompressorOpt is a compressor option which can be used to configure a +// compressor. +type CompressorOpt interface{} + type noopCompressor struct{} func (nc noopCompressor) Compress(r io.Reader) (io.ReadCloser, error) { @@ -37,16 +44,24 @@ func (nc noopCompressor) MediaTypeSuffix() string { // NoopCompressor provides no compression. var NoopCompressor Compressor = noopCompressor{} +func (nc noopCompressor) WithOpt(CompressorOpt) Compressor { + return nc +} + // GzipCompressor provides gzip compression. -var GzipCompressor Compressor = gzipCompressor{} +var GzipCompressor Compressor = gzipCompressor{blockSize: 256 << 10} + +type GzipBlockSize int -type gzipCompressor struct{} +type gzipCompressor struct { + blockSize int +} func (gz gzipCompressor) Compress(reader io.Reader) (io.ReadCloser, error) { pipeReader, pipeWriter := io.Pipe() gzw := gzip.NewWriter(pipeWriter) - if err := gzw.SetConcurrency(256<<10, 2*runtime.NumCPU()); err != nil { + if err := gzw.SetConcurrency(gz.blockSize, 2*runtime.NumCPU()); err != nil { return nil, errors.Wrapf(err, "set concurrency level to %v blocks", 2*runtime.NumCPU()) } go func() { @@ -76,6 +91,15 @@ func (gz gzipCompressor) MediaTypeSuffix() string { return "gzip" } +func (gz gzipCompressor) WithOpt(opt CompressorOpt) Compressor { + switch val := opt.(type) { + case GzipBlockSize: + gz.blockSize = int(val) + } + + return gz +} + // ZstdCompressor provides zstd compression. var ZstdCompressor Compressor = zstdCompressor{} @@ -114,3 +138,7 @@ func (zs zstdCompressor) Compress(reader io.Reader) (io.ReadCloser, error) { func (zs zstdCompressor) MediaTypeSuffix() string { return "zstd" } + +func (zs zstdCompressor) WithOpt(CompressorOpt) Compressor { + return zs +} diff --git a/mutate/compress_test.go b/mutate/compress_test.go index 6cfd47ffe..1797a828f 100644 --- a/mutate/compress_test.go +++ b/mutate/compress_test.go @@ -46,6 +46,21 @@ func TestGzipCompressor(t *testing.T) { assert.NoError(err) assert.Equal(string(content), fact) + + // with options + c = c.WithOpt(GzipBlockSize(256 << 12)) + + r, err = c.Compress(buf) + assert.NoError(err) + assert.Equal(c.MediaTypeSuffix(), "gzip") + + r, err = gzip.NewReader(r) + assert.NoError(err) + + content, err = ioutil.ReadAll(r) + assert.NoError(err) + + assert.Equal(string(content), fact) } func TestZstdCompressor(t *testing.T) {