Skip to content

Commit

Permalink
Wait to reenqueue job post-interrupt until after callbacks are finished
Browse files Browse the repository at this point in the history
When Sidekiq is being used as the queue adapter, it is possible for job-iteration to
reenqueue the job after an interruption prior to the callbacks for the original job
completing. This can create race conditions if shutdown or after_perform callbacks
touch data that the new job will also touch.

This can be solved by waiting until after all the callbacks on the original job
are complete to reenqueue the iteration job. Rather than reenqueuing the job
from within #iterate_with_enumerator, we can prepend an after_perform callback (which
ensures the callback runs last) when JobIteration::Iteration is included and reenqueue the job there.
  • Loading branch information
adrianna-chang-shopify committed Feb 25, 2021
1 parent b92d30e commit 2fd075e
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 1 deletion.
8 changes: 7 additions & 1 deletion lib/job-iteration/iteration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ module Iteration
define_callbacks :start
define_callbacks :shutdown
define_callbacks :complete

after_perform(prepend: true, if: :reenqueue_iteration_job?) { reenqueue_iteration_job }
end

module ClassMethods
Expand Down Expand Up @@ -127,8 +129,8 @@ def iterate_with_enumerator(enumerator, arguments)
end

next unless job_should_exit?
@reenqueue_iteration_job = true
self.executions -= 1 if executions > 1
reenqueue_iteration_job
return false
end

Expand All @@ -146,6 +148,10 @@ def record_unit_of_work
end
end

def reenqueue_iteration_job?
defined?(@reenqueue_iteration_job) && @reenqueue_iteration_job
end

def reenqueue_iteration_job
ActiveSupport::Notifications.instrument("interrupted.iteration", iteration_instrumentation_tags)
logger.info("[JobIteration::Iteration] Interrupting and re-enqueueing the job cursor_position=#{cursor_position}")
Expand Down
15 changes: 15 additions & 0 deletions test/integration/sidekiq_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,21 @@ class SidekiqIntegrationTest < ActiveSupport::TestCase
assert_equal 0, queue_size
end

test "allows callbacks to finish before reenqueuing job after interrupt" do
out, _ = capture_subprocess_io do
CallbacksJob.perform_later
start_sidekiq_and_wait
end

expected_callbacks_order = [["before_enqueue"], ["on_shutdown"], ["before_enqueue"]]
assert_equal expected_callbacks_order, out.scan(/callback: ([^\s]+)/)

TerminateJob.perform_later
start_sidekiq_and_wait

assert_equal 0, queue_size
end

private

def start_sidekiq_and_wait
Expand Down
16 changes: 16 additions & 0 deletions test/support/jobs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,19 @@ def perform
Process.kill("TERM", Process.pid)
end
end

class CallbacksJob < IterationJob
include JobIteration::Iteration

before_enqueue { puts "callback: before_enqueue" }
on_shutdown { puts "callback: on_shutdown" }

def build_enumerator(cursor:)
enumerator_builder.times(2, cursor: cursor)
end

def each_iteration(element)
Process.kill("TERM", Process.pid) if element == 0
sleep(1)
end
end

0 comments on commit 2fd075e

Please sign in to comment.