Skip to content

Commit

Permalink
Merge pull request #520 from Shopify/eo/build_enum_metric
Browse files Browse the repository at this point in the history
Add active record cursor next iteration metric
  • Loading branch information
etiennebarrie authored Dec 2, 2024
2 parents 51bc16e + dbb2f82 commit 22b4ea2
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
### Main (unreleased)

- [340](https://github.com/Shopify/job-iteration/pull/340) Add `cursor.iteration` instrumentation for the query to fetch the next batch of records for the Active Record cursor.
- [513](https://github.com/Shopify/job-iteration/pull/513) Deprecate returning enumerators from `build_enumerator` that are not wrapped with `enumerator_builder.wrap`. The built-in enumerator builders now always wrap.

## v1.7.0 (Oct 11, 2024)
Expand Down
8 changes: 7 additions & 1 deletion lib/job-iteration/active_record_enumerator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def records
def batches
cursor = finder_cursor
Enumerator.new(method(:size)) do |yielder|
while (records = cursor.next_batch(@batch_size))
while (records = instrument_next_batch(cursor))
yielder.yield(records, cursor_value(records.last)) if records.any?
end
end
Expand All @@ -44,6 +44,12 @@ def size

private

def instrument_next_batch(cursor)
ActiveSupport::Notifications.instrument("active_record_cursor.iteration") do
cursor.next_batch(@batch_size)
end
end

def cursor_value(record)
positions = @columns.map do |column|
attribute_name = column.to_s.split(".").last
Expand Down
32 changes: 32 additions & 0 deletions test/unit/active_record_enumerator_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,38 @@ class ActiveRecordEnumeratorTest < IterationUnitTest
assert_equal([shops, shops.last.id], enum.first)
end

class StubbedCursor
include ActiveSupport::Testing::TimeHelpers
def initialize(wait_time:)
@wait_time = wait_time
end

def next_batch(*)
travel(@wait_time)
end
end

test "enumerator next batch is instrumented with proper duration" do
wait_time = 15.seconds
freeze_time do
stubbed_cursor = StubbedCursor.new(wait_time: wait_time)

ActiveSupport::Notifications.subscribe("active_record_cursor.iteration") do |*args|
ActiveSupport::Notifications.unsubscribe("active_record_cursor.iteration")
event = ActiveSupport::Notifications::Event.new(*args)
assert_equal(wait_time.in_milliseconds, event.duration)
end
enum = build_enumerator
enum.send(:instrument_next_batch, stubbed_cursor)
end
end

test "enumerator next batch is instrumented" do
ActiveSupport::Notifications.expects(:instrument).with("active_record_cursor.iteration")
enum = build_enumerator.batches
enum.first
end

test "columns are configurable" do
enum = build_enumerator(columns: [:updated_at]).batches
shops = Product.order(:updated_at).take(2)
Expand Down

0 comments on commit 22b4ea2

Please sign in to comment.