Skip to content

Commit

Permalink
feat: Add compress_events option to support gzipping event payload (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
keelerm84 committed Jul 25, 2024
1 parent fbb430b commit 3180161
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 20 deletions.
1 change: 1 addition & 0 deletions contract-tests/client_entity.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def initialize(log, config)
opts[:private_attributes] = events[:globalPrivateAttributes]
opts[:flush_interval] = (events[:flushIntervalMs] / 1_000) unless events[:flushIntervalMs].nil?
opts[:omit_anonymous_contexts] = !!events[:omitAnonymousContexts]
opts[:compress_events] = !!events[:enableGzip]
else
opts[:send_events] = false
end
Expand Down
1 change: 1 addition & 0 deletions contract-tests/service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
'tags',
'migrations',
'event-gzip',
'optional-event-gzip',
'event-sampling',
'context-comparison',
'polling-gzip',
Expand Down
20 changes: 20 additions & 0 deletions lib/ldclient-rb/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def initialize(opts = {})
@all_attributes_private = opts[:all_attributes_private] || false
@private_attributes = opts[:private_attributes] || []
@send_events = opts.has_key?(:send_events) ? opts[:send_events] : Config.default_send_events
@compress_events = opts.has_key?(:compress_events) ? opts[:compress_events] : Config.default_compress_events
@context_keys_capacity = opts[:context_keys_capacity] || Config.default_context_keys_capacity
@context_keys_flush_interval = opts[:context_keys_flush_interval] || Config.default_context_keys_flush_interval
@data_source = opts[:data_source]
Expand Down Expand Up @@ -254,6 +255,17 @@ def offline?
#
attr_reader :send_events

#
# Should the event payload sent to LaunchDarkly use gzip compression. By default this is false to prevent backward
# breaking compatibility issues with older versions of the relay proxy.
#
# Customers not using the relay proxy are strongly encouraged to enable this feature to reduce egress bandwidth
# cost.
#
# @return [Boolean]
#
attr_reader :compress_events

#
# The number of context keys that the event processor can remember at any one time. This reduces the
# amount of duplicate context details sent in analytics events.
Expand Down Expand Up @@ -539,6 +551,14 @@ def self.default_send_events
true
end

#
# The default value for {#compress_events}.
# @return [Boolean] false
#
def self.default_compress_events
false
end

#
# The default value for {#context_keys_capacity}.
# @return [Integer] 1000
Expand Down
15 changes: 11 additions & 4 deletions lib/ldclient-rb/impl/event_sender.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,24 @@ def send_event_data(event_data, description, is_diagnostic)
@logger.debug { "[LDClient] sending #{description}: #{event_data}" }
headers = {}
headers["content-type"] = "application/json"
headers["content-encoding"] = "gzip"
headers["content-encoding"] = "gzip" if @config.compress_events
Impl::Util.default_http_headers(@sdk_key, @config).each { |k, v| headers[k] = v }
unless is_diagnostic
headers["X-LaunchDarkly-Event-Schema"] = CURRENT_SCHEMA_VERSION.to_s
headers["X-LaunchDarkly-Payload-ID"] = payload_id
end
gzip = Zlib::GzipWriter.new(StringIO.new)
gzip << event_data

body = event_data
if @config.compress_events
gzip = Zlib::GzipWriter.new(StringIO.new)
gzip << event_data

body = gzip.close.string
end

response = http_client.request("POST", uri, {
headers: headers,
body: gzip.close.string,
body: body,
})
rescue StandardError => exn
@logger.warn { "[LDClient] Error sending events: #{exn.inspect}." }
Expand Down
12 changes: 8 additions & 4 deletions spec/http_util.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ class StubHTTPServer

@@next_port = 50000

def initialize
def initialize(enable_compression: false)
@port = StubHTTPServer.next_port
@enable_compression = enable_compression
begin
base_opts = {
BindAddress: '127.0.0.1',
Expand Down Expand Up @@ -77,11 +78,14 @@ def record_request(req, res)

def await_request_with_body
r = @requests_queue.pop
request = r[0]
body = r[1]

return [request, body.to_s] unless @enable_compression

gz = Zlib::GzipReader.new(StringIO.new(body.to_s))

[r[0], gz.read]
[request, gz.read]
end
end

Expand All @@ -91,8 +95,8 @@ def method_missing(*)
end
end

def with_server(server = nil)
server = StubHTTPServer.new if server.nil?
def with_server(enable_compression: false)
server = StubHTTPServer.new(enable_compression: enable_compression)
begin
server.start
yield server
Expand Down
49 changes: 37 additions & 12 deletions spec/impl/event_sender_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,48 @@ module Impl
subject { EventSender }

let(:sdk_key) { "sdk_key" }
let(:fake_data) { '{"things":[]}' }
let(:fake_data) { '{"things":[],"stuff":false,"other examples":["you", "me", "us", "we"]}' }

def make_sender(server)
make_sender_with_events_uri(server.base_uri.to_s)
def make_sender(config_options = {})
config_options = {logger: $null_log}.merge(config_options)
subject.new(sdk_key, Config.new(config_options), nil, 0.1)
end

def make_sender_with_events_uri(events_uri)
subject.new(sdk_key, Config.new(events_uri: events_uri, logger: $null_log, application: {id: "id", version: "version"}), nil, 0.1)
def with_sender_and_server(config_options = {})
enable_compression = config_options[:compress_events] || false
with_server(enable_compression: enable_compression) do |server|
config_options[:events_uri] = server.base_uri.to_s
yield make_sender(config_options), server
end
end

def with_sender_and_server
with_server do |server|
yield make_sender(server), server
it "sends analytics event data without compression enabled" do
with_sender_and_server(compress_events: false) do |es, server|
server.setup_ok_response("/bulk", "")

result = es.send_event_data(fake_data, "", false)

expect(result.success).to be true
expect(result.must_shutdown).to be false
expect(result.time_from_server).not_to be_nil

req, body = server.await_request_with_body
expect(body).to eq fake_data
expect(req.header).to include({
"authorization" => [ sdk_key ],
"content-type" => [ "application/json" ],
"user-agent" => [ "RubyClient/" + LaunchDarkly::VERSION ],
"x-launchdarkly-event-schema" => [ "4" ],
"connection" => [ "Keep-Alive" ],
})
expect(req.header['x-launchdarkly-payload-id']).not_to eq []
expect(req.header['content-encoding']).to eq []
expect(req.header['content-length'][0].to_i).to eq fake_data.length
end
end

it "sends analytics event data" do
with_sender_and_server do |es, server|
it "sends analytics event data with compression enabled" do
with_sender_and_server(compress_events: true) do |es, server|
server.setup_ok_response("/bulk", "")

result = es.send_event_data(fake_data, "", false)
Expand All @@ -41,13 +65,14 @@ def with_sender_and_server
expect(body).to eq fake_data
expect(req.header).to include({
"authorization" => [ sdk_key ],
"content-encoding" => [ "gzip" ],
"content-type" => [ "application/json" ],
"user-agent" => [ "RubyClient/" + LaunchDarkly::VERSION ],
"x-launchdarkly-event-schema" => [ "4" ],
"x-launchdarkly-tags" => [ "application-id/id application-version/version" ],
"connection" => [ "Keep-Alive" ],
})
expect(req.header['x-launchdarkly-payload-id']).not_to eq []
expect(req.header['content-length'][0].to_i).to be > fake_data.length
end
end

Expand Down Expand Up @@ -123,7 +148,7 @@ def with_sender_and_server
begin
ENV["http_proxy"] = proxy.base_uri.to_s

es = make_sender_with_events_uri(fake_target_uri)
es = make_sender(events_uri: fake_target_uri)

result = es.send_event_data(fake_data, "", false)

Expand Down

0 comments on commit 3180161

Please sign in to comment.