Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add gzip compression as an option #77

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions examples/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ func main() {
sp.RequireStorage(*storagememory.Init()),
sp.OptionRequestType("POST"),
sp.OptionProtocol("http"),
sp.OptionSendLimit(4),
sp.OptionSendLimit(1000),
sp.OptionEnableRequestPostGzip(true),
)

subject := sp.InitSubject()
Expand All @@ -40,20 +41,19 @@ func main() {
tracker := sp.InitTracker(
sp.RequireEmitter(emitter),
sp.OptionSubject(subject),
sp.OptionBase64Encode(true),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: just for setting up the experiment, wouldn't the gzip compression be more efficient in case we didn't encode to base 64? I think we lose some potential for compression by using base64 before gzip.

)

fmt.Println("Sending events to " + emitter.GetCollectorUrl())

pageView := sp.PageViewEvent{
PageUrl: sphelp.NewString("acme.com"),
}
tracker.TrackPageView(pageView)

screenView := sp.ScreenViewEvent{
Name: sphelp.NewString("name"),
Id: sphelp.NewString("Screen ID"),
}
tracker.TrackScreenView(screenView)

structEvent := sp.StructuredEvent{
Category: sphelp.NewString("shop"),
Expand All @@ -62,19 +62,19 @@ func main() {
Value: sphelp.NewFloat64(2),
}

tracker.TrackStructEvent(structEvent)

data := map[string]interface{}{
"targetUrl": "https://www.snowplow.io",
}

sdj := sp.InitSelfDescribingJson("iglu:com.snowplowanalytics.snowplow/link_click/jsonschema/1-0-1", data)

sde := sp.SelfDescribingEvent{Event: sdj}

tracker.TrackSelfDescribingEvent(sde)
for i := 0; i < 1000; i++ {
tracker.TrackPageView(pageView)
tracker.TrackScreenView(screenView)
tracker.TrackStructEvent(structEvent)
tracker.TrackSelfDescribingEvent(sde)
}

tracker.Emitter.Stop()
tracker.BlockingFlush(5, 10)

tracker.BlockingFlush(5, 1000)
}
61 changes: 41 additions & 20 deletions tracker/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package tracker

import (
"bytes"
"compress/gzip"
"errors"
"fmt"
"io"
Expand All @@ -30,14 +31,15 @@ import (
)

const (
DEFAULT_REQ_TYPE = "POST"
DEFAULT_PROTOCOL = "http"
DEFAULT_SEND_LIMIT = 500
DEFAULT_BYTE_LIMIT_GET = 40000
DEFAULT_BYTE_LIMIT_POST = 40000
DEFAULT_DB_NAME = "events.db"
POST_WRAPPER_BYTES = 88 // "schema":"iglu:com.snowplowanalytics.snowplow/payload_data/jsonschema/1-0-3","data":[]
POST_STM_BYTES = 22 // "stm":"1443452851000"
DEFAULT_REQ_TYPE = "POST"
DEFAULT_PROTOCOL = "http"
DEFAULT_SEND_LIMIT = 500
DEFAULT_BYTE_LIMIT_GET = 40000
DEFAULT_BYTE_LIMIT_POST = 40000
DEFAULT_DB_NAME = "events.db"
DEFAULT_ENABLE_REQUEST_POST_GZIP = false
POST_WRAPPER_BYTES = 88 // "schema":"iglu:com.snowplowanalytics.snowplow/payload_data/jsonschema/1-0-3","data":[]
POST_STM_BYTES = 22 // "stm":"1443452851000"
)

type SendResult struct {
Expand All @@ -51,17 +53,18 @@ type CallbackResult struct {
}

type Emitter struct {
CollectorUri string
CollectorUrl url.URL
RequestType string
Protocol string
SendLimit int
ByteLimitGet int
ByteLimitPost int
Storage storageiface.Storage
SendChannel chan bool
Callback func(successCount []CallbackResult, failureCount []CallbackResult)
HttpClient *http.Client
CollectorUri string
CollectorUrl url.URL
RequestType string
Protocol string
SendLimit int
ByteLimitGet int
ByteLimitPost int
Storage storageiface.Storage
SendChannel chan bool
Callback func(successCount []CallbackResult, failureCount []CallbackResult)
HttpClient *http.Client
EnableRequestPostGzip bool
}

// InitEmitter creates a new Emitter object which handles
Expand All @@ -75,6 +78,7 @@ func InitEmitter(options ...func(*Emitter)) *Emitter {
e.SendLimit = DEFAULT_SEND_LIMIT
e.ByteLimitGet = DEFAULT_BYTE_LIMIT_GET
e.ByteLimitPost = DEFAULT_BYTE_LIMIT_POST
e.EnableRequestPostGzip = DEFAULT_ENABLE_REQUEST_POST_GZIP

// Option parameters
for _, op := range options {
Expand Down Expand Up @@ -168,6 +172,11 @@ func OptionHttpClient(client *http.Client) func(e *Emitter) {
return func(e *Emitter) { e.HttpClient = client }
}

// OptionEnableRequestPostGzip
func OptionEnableRequestPostGzip(enableRequestPostGzip bool) func(e *Emitter) {
return func(e *Emitter) { e.EnableRequestPostGzip = enableRequestPostGzip }
}

// --- Event Handlers

// Add will push an event to the database and will then initiate a sending loop.
Expand Down Expand Up @@ -342,8 +351,20 @@ func (e *Emitter) sendPostRequest(url string, ids []int, body []payload.Payload,
SCHEMA: SCHEMA_PAYLOAD_DATA,
DATA: addSentTimeToEvents(body),
}
postBuffer := common.MapToJson(postEnvelope)

var req *http.Request
if e.EnableRequestPostGzip {
var postBufferGzip bytes.Buffer
gz := gzip.NewWriter(&postBufferGzip)
gz.Write([]byte(postBuffer))
gz.Close()

req, _ := http.NewRequest("POST", url, bytes.NewBufferString(common.MapToJson(postEnvelope)))
req, _ = http.NewRequest("POST", url, &postBufferGzip)
req.Header.Set("Content-Encoding", "gzip")
} else {
req, _ = http.NewRequest("POST", url, bytes.NewBufferString(postBuffer))
}
req.Header.Set("Content-Type", POST_CONTENT_TYPE)

resp, err := e.HttpClient.Do(req)
Expand Down
2 changes: 2 additions & 0 deletions tracker/emitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func TestEmitterInit(t *testing.T) {
assert.NotNil(emitter.HttpClient)
assert.NotNil(emitter.Storage)
assert.Equal("memory.StorageMemory", reflect.TypeOf(emitter.Storage).String())
assert.False(emitter.EnableRequestPostGzip)

// Assert defaults
emitter = InitEmitter(RequireCollectorUri("com.acme"), RequireStorage(*sqlite3.Init("test.db")))
Expand All @@ -75,6 +76,7 @@ func TestEmitterInit(t *testing.T) {
assert.NotNil(emitter.HttpClient)
assert.NotNil(emitter.Storage)
assert.Equal("sqlite3.StorageSQLite3", reflect.TypeOf(emitter.Storage).String())
assert.False(emitter.EnableRequestPostGzip)

// Assert the set functions
emitter.SetCollectorUri("com.snplow")
Expand Down
Loading