diff --git a/deploy/local/docker-compose/xatu-server.yaml b/deploy/local/docker-compose/xatu-server.yaml index fb08b855..b6959a94 100644 --- a/deploy/local/docker-compose/xatu-server.yaml +++ b/deploy/local/docker-compose/xatu-server.yaml @@ -45,5 +45,5 @@ services: batchTimeout: 3s exportTimeout: 30s maxExportBatchSize: 5000 - compression: none + compression: zstd keepAlive: true diff --git a/pkg/output/http/compression.go b/pkg/output/http/compression.go index cbc7d770..a70b1d86 100644 --- a/pkg/output/http/compression.go +++ b/pkg/output/http/compression.go @@ -1,8 +1,111 @@ package http +import ( + "bytes" + "compress/gzip" + "compress/zlib" + "net/http" + + "github.com/golang/snappy" + "github.com/klauspost/compress/zstd" +) + type CompressionStrategy string var ( - CompressionStrategyNone CompressionStrategy = "none" - CompressionStrategyGzip CompressionStrategy = "gzip" + CompressionStrategyNone CompressionStrategy = "none" + CompressionStrategyGzip CompressionStrategy = "gzip" + CompressionStrategyZstd CompressionStrategy = "zstd" + CompressionStrategyZlib CompressionStrategy = "zlib" + CompressionStrategySnappy CompressionStrategy = "snappy" ) + +type Compressor struct { + Strategy CompressionStrategy +} + +func (c *Compressor) Compress(in *bytes.Buffer) (*bytes.Buffer, error) { + switch c.Strategy { + case CompressionStrategyGzip: + return c.gzipCompress(in) + case CompressionStrategyZstd: + return c.zstdCompress(in) + case CompressionStrategyZlib: + return c.zlibCompress(in) + case CompressionStrategySnappy: + return c.snappyCompress(in) + default: + return in, nil + } +} + +func (c *Compressor) gzipCompress(in *bytes.Buffer) (*bytes.Buffer, error) { + out := &bytes.Buffer{} + g := gzip.NewWriter(out) + + _, err := g.Write(in.Bytes()) + if err != nil { + return out, err + } + + if err := g.Close(); err != nil { + return out, err + } + + return out, nil +} + +func (c *Compressor) zstdCompress(in *bytes.Buffer) (*bytes.Buffer, error) { + out := &bytes.Buffer{} + + z, err := zstd.NewWriter(out) + if err != nil { + return out, err + } + + _, err = z.Write(in.Bytes()) + if err != nil { + return out, err + } + + if err := z.Close(); err != nil { + return out, err + } + + return out, nil +} + +func (c *Compressor) zlibCompress(in *bytes.Buffer) (*bytes.Buffer, error) { + out := &bytes.Buffer{} + z := zlib.NewWriter(out) + + _, err := z.Write(in.Bytes()) + if err != nil { + return out, err + } + + if err := z.Close(); err != nil { + return out, err + } + + return out, nil +} + +func (c *Compressor) snappyCompress(in *bytes.Buffer) (*bytes.Buffer, error) { + compressed := snappy.Encode(nil, in.Bytes()) + + return bytes.NewBuffer(compressed), nil +} + +func (c *Compressor) AddHeaders(req *http.Request) { + switch c.Strategy { + case CompressionStrategyGzip: + req.Header.Set("Content-Encoding", "gzip") + case CompressionStrategyZstd: + req.Header.Set("Content-Encoding", "zstd") + case CompressionStrategyZlib: + req.Header.Set("Content-Encoding", "deflate") + case CompressionStrategySnappy: + req.Header.Set("Content-Encoding", "snappy") + } +} diff --git a/pkg/output/http/exporter.go b/pkg/output/http/exporter.go index 415ca44a..3f1d11fc 100644 --- a/pkg/output/http/exporter.go +++ b/pkg/output/http/exporter.go @@ -2,7 +2,6 @@ package http import ( "bytes" - "compress/gzip" "context" "fmt" "io" @@ -18,10 +17,10 @@ import ( ) type ItemExporter struct { - config *Config - log logrus.FieldLogger - - client *http.Client + config *Config + log logrus.FieldLogger + compressor *Compressor + client *http.Client } func NewItemExporter(name string, config *Config, log logrus.FieldLogger) (ItemExporter, error) { @@ -38,6 +37,7 @@ func NewItemExporter(name string, config *Config, log logrus.FieldLogger) (ItemE Transport: t, Timeout: config.ExportTimeout, }, + compressor: &Compressor{Strategy: config.Compression}, }, nil } @@ -82,15 +82,14 @@ func (e *ItemExporter) sendUpstream(ctx context.Context, items []*xatu.Decorated } buf := bytes.NewBufferString(body) - if e.config.Compression == CompressionStrategyGzip { - compressed, err := e.gzip(buf) - if err != nil { - return err - } - buf = compressed + compressed, err := e.compressor.Compress(buf) + if err != nil { + return err } + buf = compressed + // TODO: check that this also handles processor timeout req, err := http.NewRequestWithContext(ctx, httpMethod, e.config.Address, buf) if err != nil { @@ -103,9 +102,7 @@ func (e *ItemExporter) sendUpstream(ctx context.Context, items []*xatu.Decorated req.Header.Set("Content-Type", "application/x-ndjson") - if e.config.Compression == CompressionStrategyGzip { - req.Header.Set("Content-Encoding", "gzip") - } + e.compressor.AddHeaders(req) rsp, err = e.client.Do(req) if err != nil { @@ -125,19 +122,3 @@ func (e *ItemExporter) sendUpstream(ctx context.Context, items []*xatu.Decorated return nil } - -func (e *ItemExporter) gzip(in *bytes.Buffer) (*bytes.Buffer, error) { - out := &bytes.Buffer{} - g := gzip.NewWriter(out) - - _, err := g.Write(in.Bytes()) - if err != nil { - return out, err - } - - if err := g.Close(); err != nil { - return out, err - } - - return out, nil -}