-
Notifications
You must be signed in to change notification settings - Fork 892
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
x/mongo/driver: enable parallel zlib compression and improve zstd dec…
…ompression This commit fixes a bug where zlib compression was serialized across all goroutines. This occurred because only one shared zlib decompresser was instantiated for each compression level which had to be locked when used and thus preventing concurrent compression. The decompression performance of zstd is also improved by using a pool of zstd decoders (instantiating a zstd encoded or decoder is fairly expensive). It also slightly cleans up the logic used to store and acquire zstd encoders. The CompressPayload benchmark is changed to use a stable payload, which is now stored in testdata/compression.go. Previously it used the compression.go file itself, which made benchmarks results confusing and liable to change whenever the compression/decompression logic changed. ``` goos: darwin goarch: arm64 pkg: go.mongodb.org/mongo-driver/x/mongo/driver │ base.20.txt │ new.20.txt │ │ sec/op │ sec/op vs base │ CompressPayload/CompressorZLib-10 5387.4µ ± 0% 651.1µ ± 1% -87.91% (p=0.000 n=20) CompressPayload/CompressorZstd-10 64.56µ ± 1% 64.10µ ± 0% -0.72% (p=0.000 n=20) DecompressPayload/CompressorZLib-10 125.7µ ± 1% 123.7µ ± 0% -1.60% (p=0.000 n=20) DecompressPayload/CompressorZstd-10 70.13µ ± 1% 45.80µ ± 1% -34.70% (p=0.000 n=20) geomean 235.3µ 124.0µ -47.31% │ base.20.txt │ new.20.txt │ │ B/s │ B/s vs base │ CompressPayload/CompressorZLib-10 365.2Mi ± 0% 3021.4Mi ± 1% +727.41% (p=0.000 n=20) CompressPayload/CompressorZstd-10 29.76Gi ± 1% 29.97Gi ± 0% +0.73% (p=0.000 n=20) DecompressPayload/CompressorZLib-10 15.28Gi ± 1% 15.53Gi ± 0% +1.63% (p=0.000 n=20) DecompressPayload/CompressorZstd-10 27.39Gi ± 1% 41.95Gi ± 1% +53.13% (p=0.000 n=20) geomean 8.164Gi 15.49Gi +89.77% │ base.20.txt │ new.20.txt │ │ B/op │ B/op vs base │ CompressPayload/CompressorZLib-10 14.02Ki ± 0% 14.00Ki ± 0% -0.10% (p=0.000 n=20) CompressPayload/CompressorZstd-10 3.398Ki ± 0% 3.398Ki ± 0% ~ (p=1.000 n=20) ¹ DecompressPayload/CompressorZLib-10 2.008Mi ± 0% 2.008Mi ± 0% ~ (p=1.000 n=20) ¹ DecompressPayload/CompressorZstd-10 4.109Mi ± 0% 1.969Mi ± 0% -52.08% (p=0.000 n=20) geomean 142.5Ki 118.5Ki -16.82% ¹ all samples are equal │ base.20.txt │ new.20.txt │ │ allocs/op │ allocs/op vs base │ CompressPayload/CompressorZLib-10 1.000 ± 0% 1.000 ± 0% ~ (p=1.000 n=20) ¹ CompressPayload/CompressorZstd-10 4.000 ± 0% 4.000 ± 0% ~ (p=1.000 n=20) ¹ DecompressPayload/CompressorZLib-10 26.00 ± 0% 26.00 ± 0% ~ (p=1.000 n=20) ¹ DecompressPayload/CompressorZstd-10 104.000 ± 0% 1.000 ± 0% -99.04% (p=0.000 n=20) geomean 10.20 3.193 -68.69% ¹ all samples are equal ```
- Loading branch information
1 parent
26566ed
commit 9bbdb31
Showing
3 changed files
with
259 additions
and
44 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,151 @@ | ||
// Copyright (C) MongoDB, Inc. 2017-present. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); you may | ||
// not use this file except in compliance with the License. You may obtain | ||
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
package driver | ||
|
||
import ( | ||
"bytes" | ||
"compress/zlib" | ||
"fmt" | ||
"io" | ||
"sync" | ||
|
||
"github.com/golang/snappy" | ||
"github.com/klauspost/compress/zstd" | ||
"go.mongodb.org/mongo-driver/x/mongo/driver/wiremessage" | ||
) | ||
|
||
// CompressionOpts holds settings for how to compress a payload | ||
type CompressionOpts struct { | ||
Compressor wiremessage.CompressorID | ||
ZlibLevel int | ||
ZstdLevel int | ||
UncompressedSize int32 | ||
} | ||
|
||
var zstdEncoders sync.Map // map[zstd.EncoderLevel]*zstd.Encoder | ||
|
||
func getZstdEncoder(level zstd.EncoderLevel) (*zstd.Encoder, error) { | ||
if v, ok := zstdEncoders.Load(level); ok { | ||
return v.(*zstd.Encoder), nil | ||
} | ||
encoder, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(level)) | ||
if err != nil { | ||
return nil, err | ||
} | ||
zstdEncoders.Store(level, encoder) | ||
return encoder, nil | ||
} | ||
|
||
var zlibEncoders sync.Map // map[int /*level*/]*zlibEncoder | ||
|
||
func getZlibEncoder(level int) (*zlibEncoder, error) { | ||
if v, ok := zlibEncoders.Load(level); ok { | ||
return v.(*zlibEncoder), nil | ||
} | ||
writer, err := zlib.NewWriterLevel(nil, level) | ||
if err != nil { | ||
return nil, err | ||
} | ||
encoder := &zlibEncoder{writer: writer, buf: new(bytes.Buffer)} | ||
zlibEncoders.Store(level, encoder) | ||
|
||
return encoder, nil | ||
} | ||
|
||
type zlibEncoder struct { | ||
mu sync.Mutex | ||
writer *zlib.Writer | ||
buf *bytes.Buffer | ||
} | ||
|
||
func (e *zlibEncoder) Encode(dst, src []byte) ([]byte, error) { | ||
e.mu.Lock() | ||
defer e.mu.Unlock() | ||
|
||
e.buf.Reset() | ||
e.writer.Reset(e.buf) | ||
|
||
_, err := e.writer.Write(src) | ||
if err != nil { | ||
return nil, err | ||
} | ||
err = e.writer.Close() | ||
if err != nil { | ||
return nil, err | ||
} | ||
dst = append(dst[:0], e.buf.Bytes()...) | ||
return dst, nil | ||
} | ||
|
||
// CompressPayload takes a byte slice and compresses it according to the options passed | ||
func CompressPayload(in []byte, opts CompressionOpts) ([]byte, error) { | ||
switch opts.Compressor { | ||
case wiremessage.CompressorNoOp: | ||
return in, nil | ||
case wiremessage.CompressorSnappy: | ||
return snappy.Encode(nil, in), nil | ||
case wiremessage.CompressorZLib: | ||
encoder, err := getZlibEncoder(opts.ZlibLevel) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return encoder.Encode(nil, in) | ||
case wiremessage.CompressorZstd: | ||
encoder, err := getZstdEncoder(zstd.EncoderLevelFromZstd(opts.ZstdLevel)) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return encoder.EncodeAll(in, nil), nil | ||
default: | ||
return nil, fmt.Errorf("unknown compressor ID %v", opts.Compressor) | ||
} | ||
} | ||
|
||
// DecompressPayload takes a byte slice that has been compressed and undoes it according to the options passed | ||
func DecompressPayload(in []byte, opts CompressionOpts) (uncompressed []byte, err error) { | ||
switch opts.Compressor { | ||
case wiremessage.CompressorNoOp: | ||
return in, nil | ||
case wiremessage.CompressorSnappy: | ||
l, err := snappy.DecodedLen(in) | ||
if err != nil { | ||
return nil, fmt.Errorf("decoding compressed length %w", err) | ||
} else if int32(l) != opts.UncompressedSize { | ||
return nil, fmt.Errorf("unexpected decompression size, expected %v but got %v", opts.UncompressedSize, l) | ||
} | ||
uncompressed = make([]byte, opts.UncompressedSize) | ||
return snappy.Decode(uncompressed, in) | ||
case wiremessage.CompressorZLib: | ||
r, err := zlib.NewReader(bytes.NewReader(in)) | ||
if err != nil { | ||
return nil, err | ||
} | ||
defer func() { | ||
err = r.Close() | ||
}() | ||
uncompressed = make([]byte, opts.UncompressedSize) | ||
_, err = io.ReadFull(r, uncompressed) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return uncompressed, nil | ||
case wiremessage.CompressorZstd: | ||
r, err := zstd.NewReader(bytes.NewBuffer(in)) | ||
if err != nil { | ||
return nil, err | ||
} | ||
defer r.Close() | ||
uncompressed = make([]byte, opts.UncompressedSize) | ||
_, err = io.ReadFull(r, uncompressed) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return uncompressed, nil | ||
default: | ||
return nil, fmt.Errorf("unknown compressor ID %v", opts.Compressor) | ||
} | ||
} |