From 39c2243f2e33f685748149b36aa3084246f0c78e Mon Sep 17 00:00:00 2001 From: lhightower Date: Fri, 1 Sep 2023 23:01:56 +0000 Subject: [PATCH] Send exceptions to `process_batch` instrumenter --- lib/racecar/runner.rb | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/lib/racecar/runner.rb b/lib/racecar/runner.rb index 0536a5f7..3a4615b5 100644 --- a/lib/racecar/runner.rb +++ b/lib/racecar/runner.rb @@ -222,21 +222,21 @@ def process_batch(messages) } @instrumenter.instrument("start_process_batch", instrumentation_payload) - @instrumenter.instrument("process_batch", instrumentation_payload) do - with_pause(first.topic, first.partition, first.offset..last.offset) do |pause| - begin + with_pause(first.topic, first.partition, first.offset..last.offset) do |pause| + begin + @instrumenter.instrument("process_batch", instrumentation_payload) do racecar_messages = messages.map do |message| Racecar::Message.new(message, retries_count: pause.pauses_count) end processor.process_batch(racecar_messages) processor.deliver! consumer.store_offset(messages.last) - rescue => e - instrumentation_payload[:unrecoverable_delivery_error] = reset_producer_on_unrecoverable_delivery_errors(e) - instrumentation_payload[:retries_count] = pause.pauses_count - config.error_handler.call(e, instrumentation_payload) - raise e end + rescue => e + instrumentation_payload[:unrecoverable_delivery_error] = reset_producer_on_unrecoverable_delivery_errors(e) + instrumentation_payload[:retries_count] = pause.pauses_count + config.error_handler.call(e, instrumentation_payload) + raise e end end end