From 352bf5961741eb010a434e7367db5c60d739aa17 Mon Sep 17 00:00:00 2001 From: Elson Oliveira Date: Tue, 21 Feb 2023 10:16:20 -0700 Subject: [PATCH] Add cursor next iteration metric --- lib/job-iteration/active_record_enumerator.rb | 11 +++++++++-- lib/job-iteration/enumerator_builder.rb | 6 +++++- test/unit/active_record_enumerator_test.rb | 10 +++++++++- test/unit/enumerator_builder_test.rb | 15 +++++++++++++++ 4 files changed, 38 insertions(+), 4 deletions(-) diff --git a/lib/job-iteration/active_record_enumerator.rb b/lib/job-iteration/active_record_enumerator.rb index 5e7ab832..db408249 100644 --- a/lib/job-iteration/active_record_enumerator.rb +++ b/lib/job-iteration/active_record_enumerator.rb @@ -7,11 +7,12 @@ module JobIteration class ActiveRecordEnumerator SQL_DATETIME_WITH_NSEC = "%Y-%m-%d %H:%M:%S.%N" - def initialize(relation, columns: nil, batch_size: 100, cursor: nil) + def initialize(relation, columns: nil, batch_size: 100, cursor: nil, instrument_tags: nil) @relation = relation @batch_size = batch_size @columns = Array(columns || "#{relation.table_name}.#{relation.primary_key}") @cursor = cursor + @instrument_tags = instrument_tags end def records @@ -27,7 +28,7 @@ def records def batches cursor = finder_cursor Enumerator.new(method(:size)) do |yielder| - while (records = cursor.next_batch(@batch_size)) + while (records = instrument_next_batch(cursor)) yielder.yield(records, cursor_value(records.last)) if records.any? end end @@ -39,6 +40,12 @@ def size private + def instrument_next_batch(cursor) + ActiveSupport::Notifications.instrument("cursor.iteration", @instrument_tags) do + cursor.next_batch(@batch_size) + end + end + def cursor_value(record) positions = @columns.map do |column| attribute_name = column.to_s.split(".").last diff --git a/lib/job-iteration/enumerator_builder.rb b/lib/job-iteration/enumerator_builder.rb index 0f52d1a3..b3cca00d 100644 --- a/lib/job-iteration/enumerator_builder.rb +++ b/lib/job-iteration/enumerator_builder.rb @@ -160,10 +160,14 @@ def build_active_record_enumerator(scope, cursor:, **args) raise ArgumentError, "scope must be an ActiveRecord::Relation" end + args_tags = args.delete(:instrument_tags) + instrument_tags = { job_class: @job.class.name }.merge(args_tags || {}) + JobIteration::ActiveRecordEnumerator.new( scope, cursor: cursor, - **args + instrument_tags: instrument_tags, + **args, ) end end diff --git a/test/unit/active_record_enumerator_test.rb b/test/unit/active_record_enumerator_test.rb index 26fc886d..f4c3a3b3 100644 --- a/test/unit/active_record_enumerator_test.rb +++ b/test/unit/active_record_enumerator_test.rb @@ -57,6 +57,13 @@ class ActiveRecordEnumeratorTest < IterationUnitTest assert_equal([shops, shops.last.id], enum.first) end + test "enumerator next batch is instrumented" do + instrument_tags = { job_class: "JobClass", custom_tag: "CustomTag" } + ActiveSupport::Notifications.expects(:instrument).with("cursor.iteration", instrument_tags) + enum = build_enumerator(instrument_tags: instrument_tags).batches + enum.first + end + test "columns are configurable" do enum = build_enumerator(columns: [:updated_at]).batches shops = Product.order(:updated_at).take(2) @@ -107,12 +114,13 @@ class ActiveRecordEnumeratorTest < IterationUnitTest private - def build_enumerator(relation: Product.all, batch_size: 2, columns: nil, cursor: nil) + def build_enumerator(relation: Product.all, batch_size: 2, columns: nil, cursor: nil, instrument_tags: nil) JobIteration::ActiveRecordEnumerator.new( relation, batch_size: batch_size, columns: columns, cursor: cursor, + instrument_tags: instrument_tags, ) end end diff --git a/test/unit/enumerator_builder_test.rb b/test/unit/enumerator_builder_test.rb index e00919ba..3c86388a 100644 --- a/test/unit/enumerator_builder_test.rb +++ b/test/unit/enumerator_builder_test.rb @@ -39,6 +39,21 @@ class EnumeratorBuilderTest < ActiveSupport::TestCase enumerator_builder.build_active_record_enumerator_on_records(Product.all, cursor: nil) end + test "enumerator is created with instrument tags" do + instrument_tags = { custom_tag: "CustomTag" } + expected_tags = instrument_tags.merge(job_class: "Mocha::Mock") + + JobIteration::ActiveRecordEnumerator.expects(:new) + .with(anything, cursor: nil, instrument_tags: expected_tags) + .returns(mock({ records: [] })) + + enumerator_builder.build_active_record_enumerator_on_records( + Product.all, + cursor: nil, + instrument_tags: instrument_tags, + ) + end + test_builder_method(:build_active_record_enumerator_on_batches) do enumerator_builder.build_active_record_enumerator_on_batches(Product.all, cursor: nil) end