Skip to content

Commit

Permalink
Send exceptions to process_batch instrumenter
Browse files Browse the repository at this point in the history
  • Loading branch information
lairen committed Oct 4, 2023
1 parent 04c76cf commit 39c2243
Showing 1 changed file with 8 additions and 8 deletions.
16 changes: 8 additions & 8 deletions lib/racecar/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -222,21 +222,21 @@ def process_batch(messages)
}

@instrumenter.instrument("start_process_batch", instrumentation_payload)
@instrumenter.instrument("process_batch", instrumentation_payload) do
with_pause(first.topic, first.partition, first.offset..last.offset) do |pause|
begin
with_pause(first.topic, first.partition, first.offset..last.offset) do |pause|
begin
@instrumenter.instrument("process_batch", instrumentation_payload) do
racecar_messages = messages.map do |message|
Racecar::Message.new(message, retries_count: pause.pauses_count)
end
processor.process_batch(racecar_messages)
processor.deliver!
consumer.store_offset(messages.last)
rescue => e
instrumentation_payload[:unrecoverable_delivery_error] = reset_producer_on_unrecoverable_delivery_errors(e)
instrumentation_payload[:retries_count] = pause.pauses_count
config.error_handler.call(e, instrumentation_payload)
raise e
end
rescue => e
instrumentation_payload[:unrecoverable_delivery_error] = reset_producer_on_unrecoverable_delivery_errors(e)
instrumentation_payload[:retries_count] = pause.pauses_count
config.error_handler.call(e, instrumentation_payload)
raise e
end
end
end
Expand Down

0 comments on commit 39c2243

Please sign in to comment.