Fix vanishing thread data between deserialising and performing an ActiveJob instance #36

Open · wants to merge 2 commits into base: main
4 changes: 1 addition & 3 deletions lib/delayed/job_wrapper.rb
@@ -21,9 +21,7 @@ def display_name
end

def perform
-  ActiveJob::Callbacks.run_callbacks(:execute) do
-    job.perform_now
-  end
+  ActiveJob::Base.execute(job_data)
@smudge (Member) commented on May 1, 2024:
I managed to dig up, in an old archive repo, the original comment thread pertaining to the choice of job.perform_now vs ActiveJob::Base.execute(job_data) in this custom job wrapper class.

Basically, it's because of the delegate_missing_to :job call (above), and how we wanted to support defining things like max_attempts and reschedule_at (and other DJ-specific interfaces) on the ActiveJob class, and have the wrapper be able to call into the ActiveJob class in order to expose those interfaces to the Delayed::Worker.
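For anyone unfamiliar with what `delegate_missing_to :job` buys the wrapper, here's a minimal plain-Ruby sketch (made-up classes, not the real wrapper; `delegate_missing_to` generates roughly this pair of hooks):

```ruby
# Hypothetical stand-ins for the ActiveJob class and the Delayed wrapper.
class FakeActiveJob
  # A DJ-specific interface defined on the ActiveJob class...
  def max_attempts
    4
  end
end

class FakeJobWrapper
  def job
    @job ||= FakeActiveJob.new
  end

  # delegate_missing_to :job expands to approximately these two hooks:
  def method_missing(name, *args, &block)
    job.respond_to?(name) ? job.public_send(name, *args, &block) : super
  end

  def respond_to_missing?(name, include_private = false)
    job.respond_to?(name, include_private) || super
  end
end

wrapper = FakeJobWrapper.new
wrapper.max_attempts                # => 4, answered by the wrapped job
wrapper.respond_to?(:max_attempts)  # => true, via respond_to_missing?
```

...which is how the wrapper exposes `max_attempts` and friends to the `Delayed::Worker` without defining them itself.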

And the implementation choice was made here to ensure that job is the same object instance as the thing that ultimately gets performed within execute(job_data). Otherwise, you'd have one instance of the ActiveJob (@job, memoized below) responding to the worker, and a different instance deserialized within ActiveJob's .execute call.

So the implementation here was based on the assumption that the internal implementation of execute looks something like this:

def execute(job_data) # :nodoc:
  ActiveJob::Callbacks.run_callbacks(:execute) do
    job = deserialize(job_data)
    job.perform_now
  end
end

And we wanted to avoid calling deserialize(job_data) twice. Not just because it requires doing 2x the work, but because, in rare cases, you may actually want the legacy DelayedJob methods to be able to change their response depending on the result of the perform. I've only ever seen this use case once or twice, though. Usually, they are a pure function or return a static value, like this:

def reschedule_at(current_time, attempts)
  current_time + attempts.hours
end

def max_attempts
  4
end

However, it's possible to make them react to the results of the perform method:

# Totally made-up examples. Don't try this at home. ;-)
# the ivars below would be set in the ActiveJob's `perform` method

def reschedule_at(current_time, attempts)
  @timeout_error ? current_time + 10.minutes : current_time + (attempts ** 2).hours
end

def max_attempts
  @pro_user ? 10 : 5
end

And this breaks if @job in the wrapper (which the worker can see) is a completely different object from the deserialized job inside of .execute.
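A plain-Ruby sketch of that failure mode (made-up classes, in the spirit of the made-up examples above): when the wrapper and `execute` each deserialize their own copy, an ivar set during `perform` on one copy is invisible to the other.

```ruby
# Hypothetical job; deserialize builds a fresh instance every time,
# like ActiveJob::Base.deserialize would.
class DemoJob
  def self.deserialize(_job_data)
    new # every call builds a fresh, independent instance
  end

  def perform
    @pro_user = true # would be set inside the real perform method
  end

  def max_attempts
    @pro_user ? 10 : 5
  end
end

job_data = {}
wrapper_job  = DemoJob.deserialize(job_data) # the @job the worker delegates to
executed_job = DemoJob.deserialize(job_data) # the instance execute() performs
executed_job.perform

executed_job.max_attempts # => 10
wrapper_job.max_attempts  # => 5 -- the worker asks the wrong instance
```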

All of that said, I can see exactly what problem you are encountering. Because, if we deserialize the job outside of the run_callbacks block, callbacks that deal with setting up global state will swap out that global state from underneath the existing job instance. No bueno!

The way I can think to resolve this is to essentially pull everything back into the run_callbacks block, but expose an attr_reader so that the instantiated @job instance can be read back out by the worker.

attr_reader :job

def perform
  ActiveJob::Callbacks.run_callbacks(:execute) do
    @job = ActiveJob::Base.deserialize(job_data)
    @job.perform_now
  end
end

This assumes that the worker doesn't need the job instance before it calls perform. If it does... maybe there's still a double-instantiation that you'd have to do to make it happy (and then throw away the first instance you instantiated), but at the very least we can guarantee that any delegated method calls that occur after the job has completed will be delegated to the same instance of @job that actually just ran its perform method.

Does that make sense to you @caius?

(And, thank you for shining the flashlight on this very specific but very important piece of behavior. I learned a lot going down this rabbit hole, and I'm sorry I didn't get a chance to do it sooner!)

Member:

(Another choice we could make here would be to not support that very niche "read the ivar from the perform method to respond dynamically to the worker" kind of configuration method. Technically a breaking change, and I'm hesitant to make it if there's still a way to make the job instance internally consistent.)

Contributor (Author):

Hey, sorry it's taken a while to get back to you on this.

Thanks for going code-spelunking in the history, it's much clearer to understand why choices were made previously - and makes sense as to why it's written the way it is now given the context. I suspected this might be a case of DelayedJob meets ActiveJob and they have subtly different behaviours that don't interlace nicely.

we wanted to avoid calling deserialize(job_data) twice. Not just because it requires doing 2x the work, but because, in rare cases, you may actually want the legacy DelayedJob methods to be able to change their response depending on the result of the perform. I've only ever seen this use case once or twice, though.

Ah, this is an interesting use case I'd not considered. In my mind, callbacks are either pure or respond based on data held outside the instance, as the Ruby objects are basically ephemeral within the worker's runloop. In some ways that's quite a neat idea though, and in theory the ivar could be included in job_data to persist through different attempts of the job as well. Hrm. :thinking_face:
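To illustrate that thought, a plain-Ruby sketch of round-tripping an ivar through job_data (the serialize/deserialize shape only mirrors ActiveJob's; none of this is the real API, and the job class is made up):

```ruby
# Hypothetical job: @timeout_error survives into the next attempt because
# serialize writes it into job_data and deserialize reads it back out.
class SketchJob
  def serialize
    { "timeout_error" => @timeout_error }
  end

  def deserialize(job_data)
    @timeout_error = job_data["timeout_error"]
  end

  def perform
    @timeout_error = true # pretend the remote call timed out
  end

  def reschedule_at(current_time, attempts)
    # Using plain seconds here instead of ActiveSupport duration helpers.
    @timeout_error ? current_time + 600 : current_time + attempts * 3600
  end
end

first_attempt = SketchJob.new
first_attempt.perform

next_attempt = SketchJob.new
next_attempt.deserialize(first_attempt.serialize)
next_attempt.reschedule_at(0, 2) # => 600, because the flag survived the round trip
```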

I can definitely see benefits to trying to support both use cases here, but I also come down on the side of not wanting to make breaking changes if we can avoid them. The ivar-callback pattern seems like a useful option to maintain, even if rarely used, especially if you don't have (or need) to store that data somewhere outside the instance.

The way I can think to resolve this is to essentially pull everything back into the run_callbacks block, but expose an attr_reader so that the instantiated @job instance can be read back out by the worker.

This assumes that the worker doesn't need the job instance before it calls perform. If it does... maybe there's still a double-instantiation that you'd have to do to make it happy (and then throw away the first instance you instantiated), but at the very least we can guarantee that any delegated method calls that occur after the job has completed will be delegated to the same instance of @job that actually just ran its perform method.

I started out agreeing, then thought about this some more and realised that if that would work, it should work now, because JobWrapper#job already lazily loads the job data. But then we wouldn't have seen the behaviour we did, so there must be something accessing JobWrapper#job before perform is invoked. If no messages were delegated through the JobWrapper's method_missing, the job wouldn't be loaded (i.e. ActiveJob::Base.deserialize wouldn't run) before entering the callback block in JobWrapper#perform.

Looking at the worker class, we go through Delayed::Runnable#start, Delayed::Worker#run!, and Delayed::Worker#work_off, then descend into iterating the reserved jobs, where job is a Delayed::Job instance and job#payload_object will return the Delayed::JobWrapper instance - which at this point hasn't yet loaded @job.

Presuming no Delayed callbacks are defined in this example, we call through Worker#run_thread_callbacks into Worker#run_job, passing the Delayed::Job instance, with #payload_object still not having loaded @job yet. #run_job then invokes any Delayed perform callbacks and calls into Delayed::Worker#run.

The first thing we do in Worker#run is generate a metadata hash by calling some methods on the Delayed::Job instance. Most of these are database columns, so they won't trigger the JobWrapper loading the @job ivar. We do, however, call job.name, which is Delayed::Backend::Base#name and checks whether payload_object responds to a couple of things. It turns out delegate_missing_to in turn delegates respond_to? (via respond_to_missing?), which causes @job to be loaded in order to answer the respond_to? call. This means the job is deserialized before we get to job.invoke_job a couple of lines later in Worker#run.

I've not used delegate_missing_to before, so wrote a little in-memory reproduction to prove to myself that was the case:

require "active_support/core_ext/module/delegation"

class JobRecord
  attr_reader :payload_object

  def initialize(payload_object)
    @payload_object = payload_object
  end

  # Copied verbatim from Delayed::Backend::Base#name, without error handling
  def name
    @name ||= payload_object.job_data['job_class'] if payload_object.respond_to?(:job_data)
    @name ||= payload_object.display_name if payload_object.respond_to?(:display_name)
    @name ||= payload_object.class.name
  end

  def invoke_job
    payload_object.perform
  end
end

class JobWrapper
  delegate_missing_to :job

  def job
    @job ||= SimpleJob.new
  end
end

class SimpleJob
  def perform
    "SimpleJob#perform called"
  end
end

j = JobRecord.new(JobWrapper.new)
# => #<JobRecord:0x00000001050ef2a8
#     @payload_object=#<JobWrapper:0x00000001050ef320>>
j.name
# => "JobWrapper"

j
# => #<JobRecord:0x00000001050ef2a8
#     @name="JobWrapper",
#     @payload_object=
#      #<JobWrapper:0x00000001050ef320 @job=#<SimpleJob:0x00000001050ebc98>>>

j.invoke_job
# => "SimpleJob#perform called"

j
# => #<JobRecord:0x00000001050ef2a8
#     @name="JobWrapper",
#     @payload_object=
#      #<JobWrapper:0x00000001050ef320 @job=#<SimpleJob:0x00000001050ebc98>>>

As you can see, after calling j.name the JobWrapper instance gains the @job ivar, because delegate_missing_to calls job.respond_to?, which in turn deserializes our application's job instance.

I then went and proved it was due to the respond_to_missing? falling through:

require "active_support/core_ext/module/delegation"

class Wrapper
  delegate_missing_to :item

  attr_reader :item

  def initialize(item)
    @item = item
  end

  def foo
  end

  def respond_to?(*a, **k)
    puts "#{self.class}##{__method__}(#{a.inspect}, #{k.inspect})"
    super
  end
end

class Item
  def respond_to?(*a, **k)
    puts "#{self.class}##{__method__}(#{a.inspect}, #{k.inspect})"
    super
  end
end

w = Wrapper.new(Item.new)
# => #<Wrapper:0x0000000100ec0260 @item=#<Item:0x0000000100ec02d8>>

w.respond_to?(:foo)
# => true
# >> Wrapper#respond_to?([:foo], {})

w.respond_to?(:hello)
# => false
# >> Wrapper#respond_to?([:hello], {})
# >> Item#respond_to?([:hello], {})

I think we're at a point where the two use cases just don't mesh. Lazily deserializing the job instance outside of the callbacks means anything run during deserialize that affects state outside the job instance isn't guaranteed to persist through the ActiveJob callbacks being invoked. And if we deserialize the job instance again, we break any ivar trickery that works with DelayedJob natively.

I'm half-tempted to suggest we shouldn't be doing anything that operates outside the job instance in #deserialize in our application code - currently we've worked around this issue by moving the loading logic for Current.services = into a before_perform callback, which is working fine. The job gets deserialized before the ActiveJob callbacks are invoked, then the before_perform callback sets up the environment for us just before the #perform method is invoked.
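The timing of that workaround can be sketched in plain Ruby (hypothetical classes; Current here stands in for an ActiveSupport::CurrentAttributes model, and the reset is simulated rather than driven by a real execute callback):

```ruby
# Global state that an execute-time reset would wipe.
class Current
  class << self
    attr_accessor :services
  end
end

class WorkaroundJob
  def deserialize(job_data)
    @user_id = job_data["user_id"]
    Current.services = :from_deserialize # wrong place: set before the reset
  end

  def before_perform
    Current.services = :from_before_perform # safe: runs after the reset
  end

  def perform
    Current.services
  end
end

job = WorkaroundJob.new
job.deserialize("user_id" => 1) # happens early, e.g. via the worker's respond_to?
Current.services = nil          # the execute around-callback resets global state
job.before_perform              # re-establishes the environment just in time
job.perform # => :from_before_perform
```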

I wonder whether we could move the ActiveJob callbacks up a level in Delayed, so job deserialization happens inside the callbacks even when Delayed callbacks are accessing the job instance. :thinking_face:

We'd have to keep supporting non-ActiveJob jobs being scheduled, so it would require branching in Delayed::Worker#run_thread_callbacks, as that's the earliest point non-Delayed code could trigger job deserialization. And then the JobWrapper can just perform the job, knowing that the worker has already invoked the callbacks.

module Delayed
  class Worker
    def run_thread_callbacks(job, &block)
      if job.payload_object.is_a?(Delayed::JobWrapper)
        ActiveJob::Callbacks.run_callbacks(:execute) do
          self.class.lifecycle.run_callbacks(:thread, self, job, &block)
        end
      else
        self.class.lifecycle.run_callbacks(:thread, self, job, &block)
      end
    end
  end

  class JobWrapper
    def perform
      # ActiveJob callbacks already triggered from Delayed::Worker#run_thread_callbacks
      job.perform_now
    end
  end
end

Not sure what the knock-on effect of an ActiveJob callback erroring at that point would be vs the current behaviour, though. I think currently that's handled by Delayed::Worker#run having a rescue Exception, which would be bypassed by moving it earlier. We'd potentially have to catch errors in run_thread_callbacks and invoke the error callbacks to maintain error-handling parity.

I wonder if an easier option might be to stop the #run metadata hash from loading the job instance, and document that any Delayed callbacks accessing the job can lead to ActiveJob's #deserialize being called before the ActiveJob callbacks are executed, which can cause anything using ActiveSupport::CurrentAttributes to be wiped between #deserialize and #perform. I suspect the correct answer for our use case is to use a before_perform callback, as we are doing now, and treat #deserialize as something that operates on the job instance and nothing else.

What do you think about interleaving the ActiveJob::Callbacks earlier in the Worker, and having the ActiveJob logic live outside of JobWrapper, @smudge?

end

def encode_with(coder)
32 changes: 32 additions & 0 deletions spec/delayed/job_spec.rb
@@ -549,6 +549,38 @@ def create_job(opts = {})
expect(job).to be_failed
end
end

describe 'handing off to ActiveJob' do
around do |example|
old_adapter = ActiveJob::Base.queue_adapter
ActiveJob::Base.queue_adapter = :delayed

example.run
ensure
ActiveJob::Base.queue_adapter = old_adapter
end

it 'runs everything under the same executor block' do
# Ensure the current attributes are persisted in the job, but then reset in this process
ActiveJobAttributesJob.results.clear
ActiveJobAttributesJob::Current.user_id = 0xC0FFEE
ActiveJobAttributesJob.perform_later
ActiveJobAttributesJob::Current.clear_all

# In a Rails app this is done in ActiveJob's railtie, which wraps execute in an around callback
# that leans on the Rails executor to reset state in jobs. Fake it here by registering the same
# execute around callback, but only clearing CurrentAttributes directly.
ActiveJob::Callbacks.singleton_class.set_callback(:execute, :around, prepend: true) do |_, inner|
ActiveSupport::CurrentAttributes.clear_all
inner.call
end

worker.work_off

expect(ActiveJobAttributesJob::Current.user_id).to be_nil
expect(ActiveJobAttributesJob.results).to eq([0xC0FFEE])
end
end
end

describe 'failed jobs' do
23 changes: 23 additions & 0 deletions spec/sample_jobs.rb
@@ -115,3 +115,26 @@ def enqueue(job)
class ActiveJobJob < ActiveJob::Base # rubocop:disable Rails/ApplicationJob
def perform; end
end

class ActiveJobAttributesJob < ActiveJobJob
class Current < ActiveSupport::CurrentAttributes
attribute :user_id
end

def self.results
@results ||= []
end

def serialize
super.merge("user_id" => Current.user_id)
end

def deserialize(job_data)
super
Current.user_id = job_data["user_id"]
end

def perform
self.class.results << Current.user_id
end
end