Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix spin-loop/cleanup failure mode within run loop #42

Merged
merged 3 commits into from
Jun 27, 2024
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 24 additions & 26 deletions lib/delayed/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -89,22 +89,17 @@ def on_exit!
# Exit early if interrupted.
def work_off(num = 100)
success = Concurrent::AtomicFixnum.new(0)
failure = Concurrent::AtomicFixnum.new(0)
total = 0

num.times do
while total < num
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I'm not sure how likely it is to matter but this change means that we will keep looping until we see num jobs which means that if the queue becomes empty before we hit num we will keep looping, right?

Is that okay or desired?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a break if empty? below that covers that case as well, so we should only ever continue the loop if there are jobs being returned in the query (and that's consistent with the way it worked pre-threading too).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah! i missed that. excellent.

jobs = reserve_jobs
break if jobs.empty?

total += jobs.length
pool = Concurrent::FixedThreadPool.new(jobs.length)
jobs.each do |job|
pool.post do
run_thread_callbacks(job) do
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The explanation might make more sense if I put it here next to the related code.

Previously, if run_thread_callbacks crashed for any reason, neither success nor failure would be incremented. Furthermore, the job cleanup (down at the end of run(job)) would never occur, so attempts would not be incremented and run_at would not be bumped, making the job immediately available for pickup again.

This is what I mean by the "spinloop". A job gets picked up, its thread crashes, no cleanup occurs, it gets immediately picked up again, etc. Pushing the run_thread_callbacks call into run_job allows us to clean up the job if, e.g., it fails to deserialize, or if one of our callbacks fails to connect to a secondary resource (as was the case in #41).

if run_job(job)
success.increment
else
failure.increment
end
end
success.increment if run_job(job)
end
end

Expand All @@ -114,38 +109,41 @@ def work_off(num = 100)
break if stop? # leave if we're exiting
end

[success, failure].map(&:value)
[success.value, total - success.value]
end

def run_thread_callbacks(job, &block)
self.class.lifecycle.run_callbacks(:thread, self, job, &block)
end

def run(job)
metadata = {
status: 'RUNNING',
name: job.name,
run_at: job.run_at,
created_at: job.created_at,
priority: job.priority,
queue: job.queue,
attempts: job.attempts,
enqueued_for: (Time.current - job.created_at).round,
}
job_say job, metadata.to_json
run_time = Benchmark.realtime do
Timeout.timeout(max_run_time(job).to_i, WorkerTimeout) do
job.invoke_job
run_thread_callbacks(job) do
metadata = {
status: 'RUNNING',
name: job.name,
run_at: job.run_at,
created_at: job.created_at,
priority: job.priority,
queue: job.queue,
attempts: job.attempts,
enqueued_for: (Time.current - job.created_at).round,
}
job_say job, metadata.to_json
run_time = Benchmark.realtime do
Timeout.timeout(max_run_time(job).to_i, WorkerTimeout) do
job.invoke_job
end
job.destroy
end
job.destroy
job_say job, format('COMPLETED after %.4f seconds', run_time)
end
job_say job, format('COMPLETED after %.4f seconds', run_time)
true # did work
rescue DeserializationError => e
job_say job, "FAILED permanently with #{e.class.name}: #{e.message}", 'error'

job.error = e
failed(job)
false # work failed
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a bug that improperly reported deserialization errors in the success number, which only impacted the logging output.

rescue Exception => e # rubocop:disable Lint/RescueException
self.class.lifecycle.run_callbacks(:error, self, job) { handle_failed_job(job, e) }
false # work failed
Expand Down
Loading