Fix vanishing thread data between deserialising and performing an ActiveJob instance #36

Open · caius wants to merge 2 commits into main from cd/activejob_base_execute
Conversation

@caius (Contributor) commented Jan 15, 2024

Change Delayed::JobWrapper from directly invoking the :execute callbacks and job#perform_now to instead calling ActiveJob::Base.execute, letting ActiveJob handle the callbacks, job deserialisation, and performing internally.

To explain why: we've observed a change in behaviour between delayed_job's queue adapter and delayed's queue adapter, where thread variables are reset between job#deserialize and job#perform even though both are called from the same worker thread/process.

Some background context around the Rails executor and ActiveSupport::CurrentAttributes is useful. Feel free to skip down a couple of paragraphs if you already understand them!

The ActiveJob Railtie has an initialiser that wraps all job executions with the Rails executor (it calls app.reloader in the code, but that's effectively the executor still.) This ensures code is reloaded between jobs running in development mode, but also makes sure any executor callbacks are invoked when the reloader block is entered at runtime in any environment. (If you've ever wondered how things in rails get reset between web requests, this mechanism is the answer.)
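
From memory, that hook in the ActiveJob Railtie looks roughly like this (paraphrased; exact code varies by Rails version):

initializer "active_job.set_reloader_hook" do |app|
  ActiveSupport.on_load(:active_job) do
    # Wrap every job execution in the application reloader/executor
    ActiveJob::Callbacks.singleton_class.set_callback(:execute, :around, prepend: true) do |_, inner|
      app.reloader.wrap do
        inner.call
      end
    end
  end
end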

The ActiveSupport Railtie has an initialiser that resets ActiveSupport::CurrentAttributes instances when entering an executor block. This means before every job is run, all subclasses/instances of ActiveSupport::CurrentAttributes have their values cleared which stops things leaking between requests/jobs/blocks of work. This effectively lets you have thread-global variables that are cleaned up once the current unit of work has finished, without you needing to remember.
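
Again paraphrasing from memory, the ActiveSupport Railtie hook is roughly:

initializer "active_support.reset_all_current_attributes_instances" do |app|
  # Reset all CurrentAttributes subclasses when an executor block is
  # entered, and again when it completes
  app.executor.to_run { ActiveSupport::CurrentAttributes.reset_all }
  app.executor.to_complete { ActiveSupport::CurrentAttributes.reset_all }
end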

In our app we're using the Rails default Current model which is a subclass of ActiveSupport::CurrentAttributes to store some request-global information about an external service to use. (This allows us to override that on a per-request basis in non-production environments for testing purposes.) We pass that override down into the jobs at enqueue time by overriding ApplicationJob#serialize to save the value of Current.service_url into the job data hash.

class ApplicationJob < ActiveJob::Base
  def serialize
    # Stash the current override into the job data at enqueue time
    super.merge("service_url" => Current.service_url)
  end

  def deserialize(job_data)
    super
    # Restore the override when the worker loads the job
    Current.service_url = job_data.delete("service_url")
  end
end

Our code in SomeJob#perform and elsewhere in the app invoked from the job can then call Current.service_url to find the overridden value. Doing it this way means the behaviour is shared between all jobs that need that overridden url without having to remember to override serialize/deserialize in each job when required.
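
For illustration, a job can then do something like this (ExternalService here is a made-up stand-in for the real client class):

class SomeJob < ApplicationJob
  def perform
    # Reads the per-request override restored in ApplicationJob#deserialize
    ExternalService.new(base_url: Current.service_url).call
  end
end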

Switching from delayed_job to delayed broke this mechanism silently 😬. No errors were raised but our override wasn't being observed in the code. Debugging it we observed within the ApplicationJob#deserialize method Current.service_url was being set correctly, but by the time the job#perform method was called Current.service_url was nil again. The case of the vanishing variable?! 🙀

At this point we deduced it was likely an ordering issue around ActiveSupport::CurrentAttributes being reset, and investigated further in that direction. After littering puts statements through delayed and ActiveJob, I think we've figured out why this is occurring and how it differs from delayed_job's behaviour.

We see JobWrapper#job being invoked early in Delayed::Worker#run, via Delayed::Worker#max_run_time checking whether the job instance responds to #max_run_time. DJ hooks will also use the job instance (payload_object). JobWrapper forwards all missing methods to the job method, which deserialises the job object from the data.
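
Paraphrasing the relevant part of Delayed::JobWrapper (simplified):

module Delayed
  class JobWrapper
    delegate_missing_to :job

    def job
      # Memoized: the first delegated call -- even an indirect one via a
      # respond_to? check -- deserialises the ActiveJob instance from job_data
      @job ||= ActiveJob::Base.deserialize(job_data)
    end
  end
end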

The JobWrapper#perform method then re-uses the memoized @job instance when invoking the execute callbacks (which reset all the ActiveSupport::CurrentAttributes values), then calls job.perform_now without invoking #deserialize again. This means we've deserialised the job, then invoked the Rails executor entry callbacks, then called job#perform. A subtle, but (to us at least) unexpected behaviour change.

This PR changes JobWrapper#perform to call ActiveJob::Base.execute(job_data), so ActiveJob invokes the callbacks, deserializes the job, and performs it in the correct order. In delayed's case this means job#deserialize will be called twice, because the worker interrogates the job instance before it gets to calling JobWrapper#perform. The crucial change is that deserialisation happens after the Rails executor callbacks, so any override of job#deserialize expecting to run in the same "unit of work" as the job#perform method will work.
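
In other words, JobWrapper#perform becomes, in essence:

def perform
  # ActiveJob runs the :execute callbacks, deserialises a fresh job
  # instance, and performs it -- in that order
  ActiveJob::Base.execute(job_data)
end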

For reference, the delayed_job adapter doesn't deserialize the job_data before calling ActiveJob::Base.execute, so it doesn't have this ordering effect. The tradeoff is that delayed_job's adapter also doesn't allow overriding max_run_time, etc. in the job class, which is useful.
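
For comparison, ActiveJob's delayed_job adapter wraps jobs roughly like this (paraphrased from the Rails source):

class JobWrapper # inside ActiveJob::QueueAdapters::DelayedJobAdapter
  attr_accessor :job_data

  def initialize(job_data)
    @job_data = job_data
  end

  def perform
    # Nothing is deserialised until execute runs inside the callbacks
    ActiveJob::Base.execute(job_data)
  end
end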

I can't think of a way to solve this without either breaking the API that allows max_run_time etc. methods on job classes (aside from perhaps making them class methods?) or keeping the side effect of deserializing the job before the Rails executor entry, which can lead to seemingly weird behaviour when using standard Rails-executor-aware objects in this way.

(For ease of linking to the source code, the above description links to Rails 7.1.2 source code, but holds true back to Rails 5.0 for ActiveJob and Rails 5.2 for ActiveSupport::CurrentAttributes. Aka the first version each feature appeared respectively.)

@caius force-pushed the cd/activejob_base_execute branch from fd403d4 to e292ae8 (January 15, 2024 20:03)
@caius (Contributor, Author) commented Jan 15, 2024

The order we observe on 0.5.2 from lots of puts debugging is:

  • Delayed::Worker#run invoked with job data

  • Delayed::Worker#max_run_time calls job.max_run_time

  • Delayed::Backend::Base#max_run_time calls payload_object.respond_to?(:max_run_time), where payload_object is the Delayed::JobWrapper instance. This in turn forwards the message to #job, which instantiates and memoizes the ActiveJob instance in the @job ivar.

    ⚠️ The job has been deserialised at this point, invoking ApplicationJob#deserialize and setting Current attributes

  • Delayed::Backend::Base#invoke_job is then called, which calls payload_object.perform, which is Delayed::JobWrapper#perform

  • ActiveJob execute callbacks are fired, the first of which is the Rails executor.

    ⚠️ ActiveSupport::CurrentAttributes are reset at this point. Anything else cleared through Rails Executors callbacks is also triggered here.

  • job.perform_now is called. As @job already contains the job instance no further deserialisation takes place here.

We now arrive at OurJob#perform with Current having only nil values even though we set them in ApplicationJob#deserialize.

@smudge self-requested a review January 31, 2024 03:44
@smudge (Member) commented Feb 2, 2024

@caius it's been a busy couple of weeks for me, but I haven't forgotten this PR! I intend to do some testing of this branch, but your reasoning is sound, and I suspect that because much of our DJ infrastructure predates ActiveSupport::CurrentAttributes (instead we have other ways of sharing & clearing job-thread-global state), we hadn't encountered this inconsistency yet.

@caius (Contributor, Author) commented Feb 2, 2024

@smudge no worries, it's not exactly a simple thing to think through and understand, so I wasn't expecting it to be quick. Let me know if anything needs more explaining 👍🏻

@bmt-github-policybot commented

This pull request has been automatically closed because it has not been updated in the last month. 😪

If you still need this change, you can reopen it.

Thanks for helping keep our house in order!

@caius (Contributor, Author) commented Apr 16, 2024

> If you still need this change, you can reopen it.

Don't appear to be able to do that, bit of a hostile experience as an outside contributor if I'm honest.

@smudge reopened this Apr 16, 2024
@jmileham (Member) commented

Hi Caius, I'm the CTO at Betterment and I wanted to say sorry for the unpleasant experience. This was a result of a change we made for our internal software dev workflows, and we haven't been thoughtful about the impact on our OSS repos. We'll take that away and see what to do. Genuinely appreciate the contribution.

@caius (Contributor, Author) commented Apr 16, 2024

@jmileham appreciate the note, I didn't mean my original message to come across as curt as I suspect it might have done re-reading it. Thanks :)

@jmileham (Member) commented

Not at all, just wanted to make sure you know we see the unpleasant experience and appreciate the effort you've put in. Sorry we haven't been able to engage as quickly as we'd like with this PR.

@caius force-pushed the cd/activejob_base_execute branch from e292ae8 to bb54ca6 (April 16, 2024 18:23)
@caius (Contributor, Author) commented Apr 16, 2024

Noticed CI had a linting error for parallel assignment, fixed that up whilst here 🙃

caius added 2 commits April 30, 2024 14:59

  • Specifically they are wrapped in a Rails.application.executor block that will clear all thread-unsafe/request-persisted values in memory, like ActiveSupport::CurrentAttributes.

  • This follows the other queue adapters in ActiveJob upstream, and Delayed::Job's queue adapter as well. It ensures the job's deserialize is called within the executor block/execute callbacks along with perform. If the deserialize method causes side effects, like using ActiveSupport::CurrentAttributes, the values it sets won't get cleared between deserialize and perform being invoked.
@caius force-pushed the cd/activejob_base_execute branch from bb54ca6 to 4e58db7 (April 30, 2024 14:01)
Diff under review, in JobWrapper#perform:

- ActiveJob::Callbacks.run_callbacks(:execute) do
-   job.perform_now
- end
+ ActiveJob::Base.execute(job_data)
@smudge (Member) commented May 1, 2024
I managed to dig up, in an old archive repo, the original comment thread pertaining to the choice of job.perform_now vs ActiveJob::Base.execute(job_data) in this custom job wrapper class.

Basically, it's because of the delegate_missing_to :job call (above), and how we wanted to support defining things like max_attempts and reschedule_at (and other DJ-specific interfaces) on the ActiveJob class, and have the wrapper be able to call into the ActiveJob class in order to expose those interfaces to the Delayed::Worker.

And the implementation choice was made here to ensure that job is the same object instance as the thing that ultimately gets performed within execute(job_data). Otherwise, you'd have one instance of the ActiveJob (@job, memoized below) responding to the worker, and a different instance deserialized within ActiveJob's .execute call.

So the implementation here was based on the assumption that the internal implementation of execute looks something like this:

def execute(job_data) # :nodoc:
  ActiveJob::Callbacks.run_callbacks(:execute) do
    job = deserialize(job_data)
    job.perform_now
  end
end

And we wanted to avoid calling deserialize(job_data) twice. Not just because it requires doing 2x the work, but because, in rare cases, you may actually want the legacy DelayedJob methods to be able to change their response depending on the result of the perform. I've only ever seen this use case once or twice, though. Usually, they are a pure function or return a static value, like this:

def reschedule_at(current_time, attempts)
  current_time + attempts.hours
end

def max_attempts
  4
end

However, it's possible to make them react to the results of the perform method:

# Totally made-up examples. Don't try this at home. ;-)
# the ivars below would be set in the ActiveJob's `perform` method

def reschedule_at(current_time, attempts)
  @timeout_error ? current_time + 10.minutes : current_time + (attempts ** 2).hours
end

def max_attempts
  @pro_user ? 10 : 5
end

And this breaks if @job in the wrapper (which the worker can see) is a completely different object from the deserialized job inside of .execute.

All of that said, I can see exactly what problem you are encountering. Because, if we deserialize the job outside of the run_callbacks block, callbacks that deal with setting up global state will swap out that global state from underneath the existing job instance. No bueno!

The way I can think to resolve this is to essentially pull everything back into the run_callbacks block, but expose an attr_reader so that the instantiated @job instance can be read back out by the worker.

attr_reader :job

def perform
  ActiveJob::Callbacks.run_callbacks(:execute) do
    @job = ActiveJob::Base.deserialize(job_data)
    @job.perform_now
  end
end

This assumes that the worker doesn't need the job instance before it calls perform. If it does... maybe there's still a double-instantiation that you'd have to do to make it happy (and then throw away the first instance you instantiated), but at the very least we can guarantee that any delegated method calls that occur after the job has completed will be delegated to the same instance of @job that actually just ran its perform method.

Does that make sense to you @caius?

(And, thank you for shining the flashlight on this very specific but very important piece of behavior. I learned a lot going down this rabbit hole, and I'm sorry I didn't get a chance to do it sooner!)

@smudge (Member) commented

(Another choice we could make here would be to not support that very niche "read the ivar from the perform method to respond dynamically to the worker" kind of configuration method. Technically a breaking change, and I'm hesitant to make it if there's still a way to make the job instance internally consistent.)

@caius (Contributor, Author) commented

Hey, sorry it's taken a while to get back to you on this.

Thanks for going code-spelunking in the history, it's much clearer to understand why choices were made previously - and makes sense as to why it's written the way it is now given the context. I suspected this might be a case of DelayedJob meets ActiveJob and they have subtly different behaviours that don't interlace nicely.

> we wanted to avoid calling deserialize(job_data) twice. Not just because it requires doing 2x the work, but because, in rare cases, you may actually want the legacy DelayedJob methods to be able to change their response depending on the result of the perform. I've only ever seen this use case once or twice, though.

Ah, this is an interesting use case I'd not considered. In my mind callbacks are either pure, or respond based on data held outside the instance as the ruby objects are basically ephemeral within the worker's runloop. In some ways that's quite a neat idea though, and in theory the ivar could be included in job_data to persist through different attempts of the job as well. Hrm. 🤔
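
For example (purely hypothetical, extending the serialize/deserialize pattern from earlier):

# Hypothetical: round-trip an ivar through the job data so it survives
# across attempts of the job
def serialize
  super.merge("timeout_error" => @timeout_error)
end

def deserialize(job_data)
  super
  @timeout_error = job_data.delete("timeout_error")
end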

I can definitely see benefits to trying to support both use cases here, but I think I also come down on the side of not wanting to make breaking changes if we can avoid it. The ivar-callback pattern seems like a useful option to maintain, even if rarely used. Especially if you don't have to (or need to) store that data somewhere outside the instance.

> The way I can think to resolve this is to essentially pull everything back into the run_callbacks block, but expose an attr_reader so that the instantiated @job instance can be read back out by the worker.
>
> This assumes that the worker doesn't need the job instance before it calls perform. If it does... maybe there's still a double-instantiation that you'd have to do to make it happy (and then throw away the first instance you instantiated), but at the very least we can guarantee that any delegated method calls that occur after the job has completed will be delegated to the same instance of @job that actually just ran its perform method.

I started out agreeing, then thought about this some more and realised that if that would work, it should work now, as JobWrapper#job already lazily loads the job data. But then we wouldn't have seen the behaviour we did, so something must be accessing JobWrapper#job before perform is invoked. If no missing methods were delegated through the JobWrapper, the job wouldn't be loaded (i.e. ActiveJob::Base.deserialize wouldn't run) before entering the callback block in JobWrapper#perform.

Looking at the worker class, we go through Delayed::Runnable#start, Delayed::Worker#run!, and Delayed::Worker#work_off, then descend into iterating the reserved jobs, where job is a Delayed::Job instance and job#payload_object will return the Delayed::JobWrapper instance - which at this point hasn't yet loaded @job.

Presuming no Delayed callbacks are defined in this example, we call through Worker#run_thread_callbacks into Worker#run_job, passing the Delayed::Job instance, with #payload_object still not having loaded @job yet. #run_job then invokes any Delayed perform callbacks and calls into Delayed::Worker#run.

The first thing we do in Worker#run is generate a metadata hash by calling some methods on the Delayed::Job instance. Most of these are database columns, so they won't trigger the JobWrapper loading the @job ivar. We do however call job.name, which is Delayed::Backend::Base#name and checks whether payload_object responds_to? a couple of things. It turns out delegate_missing_to in turn delegates respond_to? (via respond_to_missing?), which causes @job to be loaded to answer the respond_to? call. This means the job is deserialized before we get to job.invoke_job a couple of lines later in Worker#run.

I've not used delegate_missing_to before, so I wrote a little in-memory reproduction to prove to myself that was the case:

require "active_support/core_ext/module/delegation"

class JobRecord
  attr_reader :payload_object

  def initialize(payload_object)
    @payload_object = payload_object
  end

  # Copied verbatim from Delayed::Backend::Base#name, without error handling
  def name
    @name ||= payload_object.job_data['job_class'] if payload_object.respond_to?(:job_data)
    @name ||= payload_object.display_name if payload_object.respond_to?(:display_name)
    @name ||= payload_object.class.name
  end

  def invoke_job
    payload_object.perform
  end
end

class JobWrapper
  delegate_missing_to :job

  def job
    @job ||= SimpleJob.new
  end
end

class SimpleJob
  def perform
    "SimpleJob#perform called"
  end
end

j = JobRecord.new(JobWrapper.new)
# => #<JobRecord:0x00000001050ef2a8
#     @payload_object=#<JobWrapper:0x00000001050ef320>>
j.name
# => "JobWrapper"

j
# => #<JobRecord:0x00000001050ef2a8
#     @name="JobWrapper",
#     @payload_object=
#      #<JobWrapper:0x00000001050ef320 @job=#<SimpleJob:0x00000001050ebc98>>>

j.invoke_job
# => "SimpleJob#perform called"

j
# => #<JobRecord:0x00000001050ef2a8
#     @name="JobWrapper",
#     @payload_object=
#      #<JobWrapper:0x00000001050ef320 @job=#<SimpleJob:0x00000001050ebc98>>>

As you can see, after calling j.name the JobWrapper instance gains the @job instance, because delegate_missing_to calls job.respond_to?, which in turn deserializes our application job instance.

I then went and proved it was due to respond_to_missing? falling through:

require "active_support/core_ext/module/delegation"

class Wrapper
  delegate_missing_to :item

  attr_reader :item

  def initialize(item)
    @item = item
  end

  def foo
  end

  def respond_to?(*a, **k)
    puts "#{self.class}##{__method__}(#{a.inspect}, #{k.inspect})"
    super
  end
end

class Item
  def respond_to?(*a, **k)
    puts "#{self.class}##{__method__}(#{a.inspect}, #{k.inspect})"
    super
  end
end

w = Wrapper.new(Item.new)
# => #<Wrapper:0x0000000100ec0260 @item=#<Item:0x0000000100ec02d8>>

w.respond_to?(:foo)
# => true
# >> Wrapper#respond_to?([:foo], {})

w.respond_to?(:hello)
# => false
# >> Wrapper#respond_to?([:hello], {})
# >> Item#respond_to?([:hello], {})

I think we're at a point where the two use cases just don't mesh in a way we can reconcile: lazily deserializing the job instance outside of the callbacks means anything run during deserialize that affects state outside the job instance isn't guaranteed to persist through the ActiveJob callbacks being invoked. And if we deserialize the job instance again, we break any ivar trickery that works with DelayedJob natively.

I'm half-tempted to suggest we shouldn't be doing anything that operates outside the job instance in #deserialize in our application code. Currently we've worked around this issue by moving the loading logic for Current.services = into a before_perform callback, which is working fine: the job gets deserialized before the ActiveJob callbacks are invoked, then the before_perform callback sets up the environment for us just before the #perform method is invoked.
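
The workaround looks roughly like this (a sketch; it assumes the serialize override from earlier stashes the value into the job data):

class ApplicationJob < ActiveJob::Base
  attr_reader :service_url

  def deserialize(job_data)
    super
    # Only touch the job instance here -- no global state
    @service_url = job_data.delete("service_url")
  end

  # before_perform runs inside the :execute callbacks, after the Rails
  # executor has reset CurrentAttributes, so this value survives into #perform
  before_perform do |job|
    Current.service_url = job.service_url
  end
end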

I wonder if potentially we could move the ActiveJob callbacks up a level in Delayed, so job deserialization happens inside the callbacks even when Delayed callbacks are accessing the job instance. 🤔

We'd have to keep supporting non-ActiveJob jobs being scheduled, so it would require branching in Delayed::Worker#run_thread_callbacks, as that's the earliest point non-Delayed code could trigger job deserialization. Then the JobWrapper can just perform the job, knowing that the worker has already invoked the callbacks.

module Delayed
  class Worker
    def run_thread_callbacks(job, &block)
      if job.payload_object.is_a?(Delayed::JobWrapper)
        ActiveJob::Callbacks.run_callbacks(:execute) do
          self.class.lifecycle.run_callbacks(:thread, self, job, &block)
        end
      else
        self.class.lifecycle.run_callbacks(:thread, self, job, &block)
      end
    end
  end

  class JobWrapper
    def perform
      # ActiveJob callbacks already triggered from Delayed::Worker#run_thread_callbacks
      job.perform_now
    end
  end
end

Not sure what the knock-on effect of an ActiveJob callback erroring at that point would be vs the current behaviour though. I think currently that's handled by Delayed::Worker#run having a rescue Exception, which would be bypassed by moving it earlier. We'd potentially have to catch errors in run_thread_callbacks and invoke the error callbacks to maintain error-handling parity.
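
Something like this, perhaps, where handle_failed_job is a hypothetical stand-in for whatever the worker's existing rescue does to mark the job failed:

def run_thread_callbacks(job, &block)
  if job.payload_object.is_a?(Delayed::JobWrapper)
    begin
      ActiveJob::Callbacks.run_callbacks(:execute) do
        self.class.lifecycle.run_callbacks(:thread, self, job, &block)
      end
    rescue Exception => e # rubocop:disable Lint/RescueException
      # Hypothetical: mirror the worker's existing error path so failures
      # in the :execute callbacks still mark the job as failed
      handle_failed_job(job, e)
    end
  else
    self.class.lifecycle.run_callbacks(:thread, self, job, &block)
  end
end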

I wonder if an easier option might be to try to stop the #run metadata hash from loading the job instance, and document that any Delayed callback accessing the job can lead to ActiveJob's #deserialize being called before the ActiveJob callbacks are executed, which can cause anything using ActiveSupport::CurrentAttributes to be wiped between #deserialize and #perform. I suspect the correct answer for our use case is to use a before_perform callback, as we are doing now, and treat #deserialize as something that operates on the job instance and nothing else.

What do you think about interleaving the ActiveJob::Callbacks earlier in the Worker, and having ActiveJob logic outside of JobWrapper @smudge?

@smudge self-requested a review June 12, 2024 17:53
smudge added a commit that referenced this pull request Jun 27, 2024
This ensures that exceptions raised in thread callback hooks are rescued
and properly mark jobs as failed.

This is also a good opportunity to change the `num` argument (of
`work_off(num)`) to mean number of jobs (give or take a few due to
`max_claims`), not number of iterations. Previously (before threading
was introduced) I think it meant number of jobs (though jobs and
iterations were 1:1). I would not have done this before the refactor,
because there was no guarantee that one of `success` or `failure` would
be incremented (the thread might crash for many reasons). Now, we only
increment `success` and treat `total - success` as the "failure" number
when we return from the method.

Fixes #23 and #41

This is also a prereq for a resolution I'm cooking up for #36