Skip to content

Commit

Permalink
add runner and datadog specs for process_message and process_batch ex…
Browse files Browse the repository at this point in the history
…ception instrumentation
  • Loading branch information
lairen committed Oct 4, 2023
1 parent 39c2243 commit b392a6c
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 0 deletions.
18 changes: 18 additions & 0 deletions spec/datadog_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,15 @@ def create_event(name, payload = {})

subscriber.process_message(event)
end

it 'publishes errors' do
event.payload[:exception] = StandardError.new("surprise")
expect(statsd).
to receive(:increment).
with("consumer.process_message.errors", tags: metric_tags)

subscriber.process_message(event)
end
end

describe '#process_batch' do
Expand Down Expand Up @@ -137,6 +146,15 @@ def create_event(name, payload = {})

subscriber.process_batch(event)
end

it 'publishes errors' do
event.payload[:exception] = StandardError.new("surprise")
expect(statsd).
to receive(:increment).
with("consumer.process_batch.errors", tags: metric_tags)

subscriber.process_batch(event)
end
end

describe '#join_group' do
Expand Down
22 changes: 22 additions & 0 deletions spec/runner_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,18 @@ def consumer_rebalance_listener=(_listenr)
class FakeInstrumenter < Racecar::Instrumenter
def initialize(*)
super(backend: Racecar::NullInstrumenter)
@errored_events = []
end

def instrument(event_name, payload = {}, &block)
super
rescue => e
@errored_events << event_name unless @errored_events.include?(event_name)
raise
end

def event_raised_errors?(event_name)
@errored_events.include?(event_name)
end
end

Expand Down Expand Up @@ -497,6 +509,11 @@ def initialize(*)

runner.run
end

specify 'message processing errors are propagated to the instrumenter' do
kafka.deliver_message(StandardError.new("surprise"), topic: "greetings")
expect { runner.run }.to change { instrumenter.event_raised_errors?("process_message") }.to(true)
end
end
end

Expand Down Expand Up @@ -624,6 +641,11 @@ def initialize(*)

runner.run
end

specify 'batch processing errors are propagated to the instrumenter' do
kafka.deliver_message(StandardError.new("surprise"), topic: "greetings")
expect { runner.run }.to change { instrumenter.event_raised_errors?("process_batch") }.to(true)
end
end
end

Expand Down

0 comments on commit b392a6c

Please sign in to comment.