diff --git a/CHANGELOG.md b/CHANGELOG.md index 95952546..0b227266 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ ### Main (unreleased) +- [340](https://github.com/Shopify/job-iteration/pull/340) Add `cursor.iteration` instrumentation for the query to fetch the next batch of records for the Active Record cursor. - [513](https://github.com/Shopify/job-iteration/pull/513) Deprecate returning enumerators from `build_enumerator` that are not wrapped with `enumerator_builder.wrap`. The built-in enumerator builders now always wrap. ## v1.7.0 (Oct 11, 2024) diff --git a/lib/job-iteration/active_record_enumerator.rb b/lib/job-iteration/active_record_enumerator.rb index 7c721ff5..ff9dc838 100644 --- a/lib/job-iteration/active_record_enumerator.rb +++ b/lib/job-iteration/active_record_enumerator.rb @@ -32,7 +32,7 @@ def records def batches cursor = finder_cursor Enumerator.new(method(:size)) do |yielder| - while (records = cursor.next_batch(@batch_size)) + while (records = instrument_next_batch(cursor)) yielder.yield(records, cursor_value(records.last)) if records.any? end end @@ -44,6 +44,12 @@ def size private + def instrument_next_batch(cursor) + ActiveSupport::Notifications.instrument("active_record_cursor.iteration") do + cursor.next_batch(@batch_size) + end + end + def cursor_value(record) positions = @columns.map do |column| attribute_name = column.to_s.split(".").last diff --git a/test/unit/active_record_enumerator_test.rb b/test/unit/active_record_enumerator_test.rb index 724f8779..db79bc16 100644 --- a/test/unit/active_record_enumerator_test.rb +++ b/test/unit/active_record_enumerator_test.rb @@ -57,6 +57,38 @@ class ActiveRecordEnumeratorTest < IterationUnitTest assert_equal([shops, shops.last.id], enum.first) end + class StubbedCursor + include ActiveSupport::Testing::TimeHelpers + def initialize(wait_time:) + @wait_time = wait_time + end + + def next_batch(*) + travel(@wait_time) + end + end + + test "enumerator next batch is instrumented with proper duration" do + wait_time = 15.seconds + freeze_time do + stubbed_cursor = StubbedCursor.new(wait_time: wait_time) + + ActiveSupport::Notifications.subscribe("active_record_cursor.iteration") do |*args| + ActiveSupport::Notifications.unsubscribe("active_record_cursor.iteration") + event = ActiveSupport::Notifications::Event.new(*args) + assert_equal(wait_time.in_milliseconds, event.duration) + end + enum = build_enumerator + enum.send(:instrument_next_batch, stubbed_cursor) + end + end + + test "enumerator next batch is instrumented" do + ActiveSupport::Notifications.expects(:instrument).with("active_record_cursor.iteration") + enum = build_enumerator.batches + enum.first + end + test "columns are configurable" do enum = build_enumerator(columns: [:updated_at]).batches shops = Product.order(:updated_at).take(2)