diff --git a/lib/job-iteration/iteration.rb b/lib/job-iteration/iteration.rb index 83862da3..2c86e277 100644 --- a/lib/job-iteration/iteration.rb +++ b/lib/job-iteration/iteration.rb @@ -15,6 +15,7 @@ module Iteration ) define_callbacks :start + define_callbacks :reenqueue define_callbacks :shutdown define_callbacks :complete end @@ -32,6 +33,10 @@ def on_shutdown(*filters, &blk) set_callback(:shutdown, :after, *filters, &blk) end + def on_reenqueue(*filters, &blk) + set_callback(:reenqueue, :before, *filters, &blk) + end + def on_complete(*filters, &blk) set_callback(:complete, :after, *filters, &blk) end @@ -74,6 +79,18 @@ def retry_job(*) @retried = true end + def reenqueue_iteration_job(options = {}) + self.executions -= 1 if executions > 1 + ActiveSupport::Notifications.instrument("interrupted.iteration", iteration_instrumentation_tags) + logger.info("[JobIteration::Iteration] Interrupting and re-enqueueing the job cursor_position=#{cursor_position}") + + adjust_total_time + self.times_interrupted += 1 + + self.already_in_queue = true if respond_to?(:already_in_queue=) + retry_job(options) + end + private def enumerator_builder @@ -123,8 +140,9 @@ def iterate_with_enumerator(enumerator, arguments) end next unless job_should_exit? - self.executions -= 1 if executions > 1 - reenqueue_iteration_job + run_callbacks(:reenqueue) do + reenqueue_iteration_job + end return false end @@ -137,17 +155,6 @@ def record_unit_of_work end end - def reenqueue_iteration_job - ActiveSupport::Notifications.instrument("interrupted.iteration", iteration_instrumentation_tags) - logger.info("[JobIteration::Iteration] Interrupting and re-enqueueing the job cursor_position=#{cursor_position}") - - adjust_total_time - self.times_interrupted += 1 - - self.already_in_queue = true if respond_to?(:already_in_queue=) - retry_job - end - def adjust_total_time self.total_time += (Time.now.utc.to_f - start_time.to_f).round(6) end diff --git a/lib/job-iteration/throttle_enumerator.rb b/lib/job-iteration/throttle_enumerator.rb index 98a5808c..c45a7479 100644 --- a/lib/job-iteration/throttle_enumerator.rb +++ b/lib/job-iteration/throttle_enumerator.rb @@ -30,7 +30,7 @@ def to_enum @enum.each do |*val| if should_throttle? ActiveSupport::Notifications.instrument("throttled.iteration", job_class: @job.class.name) - @job.retry_job(wait: @backoff) + @job.reenqueue_iteration_job(wait: @backoff) throw(:abort, :skip_complete_callbacks) end diff --git a/test/unit/active_job_iteration_test.rb b/test/unit/active_job_iteration_test.rb index 543ee59d..9ae19e8b 100644 --- a/test/unit/active_job_iteration_test.rb +++ b/test/unit/active_job_iteration_test.rb @@ -14,6 +14,8 @@ class SimpleIterationJob < ActiveJob::Base self.on_complete_called = 0 cattr_accessor :on_shutdown_called, instance_accessor: false self.on_shutdown_called = 0 + cattr_accessor :on_reenqueue_called, instance_accessor: false + self.on_reenqueue_called = 0 on_start do self.class.on_start_called += 1 @@ -26,6 +28,10 @@ class SimpleIterationJob < ActiveJob::Base on_shutdown do self.class.on_shutdown_called += 1 end + + on_reenqueue do + self.class.on_reenqueue_called += 1 + end end class MultiArgumentIterationJob < SimpleIterationJob @@ -61,6 +67,12 @@ def each_iteration(record) end end + class ActiveRecordIterationJobHaltReenqueue < ActiveRecordIterationJob + on_reenqueue do |job| + throw(:abort) if job.times_interrupted > 0 + end + end + class BatchActiveRecordIterationJob < SimpleIterationJob def build_enumerator(cursor:) enumerator_builder.active_record_on_batches( @@ -297,6 +309,7 @@ def setup klass.on_start_called = 0 klass.on_complete_called = 0 klass.on_shutdown_called = 0 + klass.on_reenqueue_called = 0 end JobShouldExitJob.records_performed = [] super @@ -329,6 +342,7 @@ def test_works_with_private_methods assert_equal(1, PrivateIterationJob.on_start_called) assert_equal(1, PrivateIterationJob.on_complete_called) assert_equal(1, PrivateIterationJob.on_shutdown_called) + assert_equal(0, PrivateIterationJob.on_reenqueue_called) end def test_failing_job @@ -379,6 +393,7 @@ def test_active_record_job assert_equal(0, ActiveRecordIterationJob.on_complete_called) work_one_job + assert_equal(1, ActiveRecordIterationJob.on_reenqueue_called) assert_equal(2, ActiveRecordIterationJob.records_performed.size) @@ -389,6 +404,7 @@ def test_active_record_job work_one_job assert_equal(4, ActiveRecordIterationJob.records_performed.size) + assert_equal(2, ActiveRecordIterationJob.on_reenqueue_called) job = peek_into_queue assert_equal(2, job.times_interrupted) @@ -401,6 +417,24 @@ def test_active_record_job assert_equal(2, ActiveRecordIterationJob.on_shutdown_called) end + def test_active_record_job_halt_reenqueue + iterate_exact_times(3.times) + + push(ActiveRecordIterationJobHaltReenqueue) + assert_jobs_in_queue(1) + + work_one_job + assert_equal(1, ActiveRecordIterationJob.on_reenqueue_called) + assert_equal(3, ActiveRecordIterationJob.records_performed.size) + assert_jobs_in_queue(1) + + work_one_job + assert_equal(2, ActiveRecordIterationJob.on_reenqueue_called) + assert_equal(6, ActiveRecordIterationJob.records_performed.size) + # By throwing abort on the reenqueue callback we halt the iteration and no jobs are reenqueue + assert_jobs_in_queue(0) + end + def test_activerecord_batches_complete push(BatchActiveRecordIterationJob) processed_records = Product.order(:id).pluck(:id) diff --git a/test/unit/throttle_enumerator_test.rb b/test/unit/throttle_enumerator_test.rb index 76d6ec64..8a957239 100644 --- a/test/unit/throttle_enumerator_test.rb +++ b/test/unit/throttle_enumerator_test.rb @@ -12,6 +12,8 @@ class IterationThrottleJob < ActiveJob::Base cattr_accessor :on_complete_called, instance_accessor: false self.on_complete_called = 0 + cattr_accessor :on_reenqueue_called, instance_accessor: false + self.on_reenqueue_called = 0 cattr_accessor :should_throttle_sequence, instance_accessor: false self.should_throttle_sequence = [] @@ -20,6 +22,10 @@ class IterationThrottleJob < ActiveJob::Base self.class.on_complete_called += 1 end + on_reenqueue do + self.class.on_reenqueue_called += 1 + end + def build_enumerator(_params, cursor:) enumerator_builder.build_throttle_enumerator( enumerator_builder.build_array_enumerator( @@ -36,13 +42,18 @@ def each_iteration(record, _params) end end - setup do - IterationThrottleJob.iterations_performed = [] + class IterationThrottleJobHaltReenqueue < IterationThrottleJob + on_reenqueue do |_job| + throw(:abort) + end end - teardown do - IterationThrottleJob.on_complete_called = 0 - IterationThrottleJob.should_throttle_sequence = [] + setup do + IterationThrottleJob.descendants.each do |klass| + klass.iterations_performed = [] + klass.on_complete_called = 0 + klass.on_reenqueue_called = 0 + end end test "throttle enumerator proxies wrapped enumerator" do @@ -92,6 +103,17 @@ def each_iteration(record, _params) assert_equal [1], IterationThrottleJob.iterations_performed end + test "do not push back to queue if reenqueue callback abort" do + IterationThrottleJobHaltReenqueue.should_throttle_sequence = [false, true, false] + + IterationThrottleJobHaltReenqueue.perform_now({}) + + enqueued = ActiveJob::Base.queue_adapter.enqueued_jobs + assert_equal 0, enqueued.size + + assert_equal [1], IterationThrottleJobHaltReenqueue.iterations_performed + end + test "does not pushed back to queue if not throttle" do assert_predicate ActiveJob::Base.queue_adapter.enqueued_jobs, :empty?