Skip to content

Commit

Permalink
feat(output-http): Add more compression options (#341)
Browse files Browse the repository at this point in the history
  • Loading branch information
samcm committed Jun 27, 2024
1 parent 017327a commit f107997
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 33 deletions.
2 changes: 1 addition & 1 deletion deploy/local/docker-compose/xatu-server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,5 @@ services:
batchTimeout: 3s
exportTimeout: 30s
maxExportBatchSize: 5000
compression: none
compression: zstd
keepAlive: true
107 changes: 105 additions & 2 deletions pkg/output/http/compression.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,111 @@
package http

import (
"bytes"
"compress/gzip"
"compress/zlib"
"net/http"

"github.com/golang/snappy"
"github.com/klauspost/compress/zstd"
)

type CompressionStrategy string

var (
CompressionStrategyNone CompressionStrategy = "none"
CompressionStrategyGzip CompressionStrategy = "gzip"
CompressionStrategyNone CompressionStrategy = "none"
CompressionStrategyGzip CompressionStrategy = "gzip"
CompressionStrategyZstd CompressionStrategy = "zstd"
CompressionStrategyZlib CompressionStrategy = "zlib"
CompressionStrategySnappy CompressionStrategy = "snappy"
)

type Compressor struct {
Strategy CompressionStrategy
}

func (c *Compressor) Compress(in *bytes.Buffer) (*bytes.Buffer, error) {
switch c.Strategy {
case CompressionStrategyGzip:
return c.gzipCompress(in)
case CompressionStrategyZstd:
return c.zstdCompress(in)
case CompressionStrategyZlib:
return c.zlibCompress(in)
case CompressionStrategySnappy:
return c.snappyCompress(in)
default:
return in, nil
}
}

func (c *Compressor) gzipCompress(in *bytes.Buffer) (*bytes.Buffer, error) {
out := &bytes.Buffer{}
g := gzip.NewWriter(out)

_, err := g.Write(in.Bytes())
if err != nil {
return out, err
}

if err := g.Close(); err != nil {
return out, err
}

return out, nil
}

func (c *Compressor) zstdCompress(in *bytes.Buffer) (*bytes.Buffer, error) {
out := &bytes.Buffer{}

z, err := zstd.NewWriter(out)
if err != nil {
return out, err
}

_, err = z.Write(in.Bytes())
if err != nil {
return out, err
}

if err := z.Close(); err != nil {
return out, err
}

return out, nil
}

func (c *Compressor) zlibCompress(in *bytes.Buffer) (*bytes.Buffer, error) {
out := &bytes.Buffer{}
z := zlib.NewWriter(out)

_, err := z.Write(in.Bytes())
if err != nil {
return out, err
}

if err := z.Close(); err != nil {
return out, err
}

return out, nil
}

func (c *Compressor) snappyCompress(in *bytes.Buffer) (*bytes.Buffer, error) {
compressed := snappy.Encode(nil, in.Bytes())

return bytes.NewBuffer(compressed), nil
}

func (c *Compressor) AddHeaders(req *http.Request) {
switch c.Strategy {
case CompressionStrategyGzip:
req.Header.Set("Content-Encoding", "gzip")
case CompressionStrategyZstd:
req.Header.Set("Content-Encoding", "zstd")
case CompressionStrategyZlib:
req.Header.Set("Content-Encoding", "deflate")
case CompressionStrategySnappy:
req.Header.Set("Content-Encoding", "snappy")
}
}
41 changes: 11 additions & 30 deletions pkg/output/http/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package http

import (
"bytes"
"compress/gzip"
"context"
"fmt"
"io"
Expand All @@ -18,10 +17,10 @@ import (
)

type ItemExporter struct {
config *Config
log logrus.FieldLogger

client *http.Client
config *Config
log logrus.FieldLogger
compressor *Compressor
client *http.Client
}

func NewItemExporter(name string, config *Config, log logrus.FieldLogger) (ItemExporter, error) {
Expand All @@ -38,6 +37,7 @@ func NewItemExporter(name string, config *Config, log logrus.FieldLogger) (ItemE
Transport: t,
Timeout: config.ExportTimeout,
},
compressor: &Compressor{Strategy: config.Compression},
}, nil
}

Expand Down Expand Up @@ -82,15 +82,14 @@ func (e *ItemExporter) sendUpstream(ctx context.Context, items []*xatu.Decorated
}

buf := bytes.NewBufferString(body)
if e.config.Compression == CompressionStrategyGzip {
compressed, err := e.gzip(buf)
if err != nil {
return err
}

buf = compressed
compressed, err := e.compressor.Compress(buf)
if err != nil {
return err
}

buf = compressed

// TODO: check that this also handles processor timeout
req, err := http.NewRequestWithContext(ctx, httpMethod, e.config.Address, buf)
if err != nil {
Expand All @@ -103,9 +102,7 @@ func (e *ItemExporter) sendUpstream(ctx context.Context, items []*xatu.Decorated

req.Header.Set("Content-Type", "application/x-ndjson")

if e.config.Compression == CompressionStrategyGzip {
req.Header.Set("Content-Encoding", "gzip")
}
e.compressor.AddHeaders(req)

rsp, err = e.client.Do(req)
if err != nil {
Expand All @@ -125,19 +122,3 @@ func (e *ItemExporter) sendUpstream(ctx context.Context, items []*xatu.Decorated

return nil
}

func (e *ItemExporter) gzip(in *bytes.Buffer) (*bytes.Buffer, error) {
out := &bytes.Buffer{}
g := gzip.NewWriter(out)

_, err := g.Write(in.Bytes())
if err != nil {
return out, err
}

if err := g.Close(); err != nil {
return out, err
}

return out, nil
}

0 comments on commit f107997

Please sign in to comment.