-
Notifications
You must be signed in to change notification settings - Fork 9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix spin-loop/cleanup failure mode within run loop #42
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -216,7 +216,7 @@ DEPENDENCIES | |
pg | ||
rake | ||
rspec | ||
sqlite3 | ||
sqlite3 (~> 1.7.3) | ||
timecop | ||
zeitwerk | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -89,22 +89,17 @@ def on_exit! | |
# Exit early if interrupted. | ||
def work_off(num = 100) | ||
success = Concurrent::AtomicFixnum.new(0) | ||
failure = Concurrent::AtomicFixnum.new(0) | ||
total = 0 | ||
|
||
num.times do | ||
while total < num | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So I'm not sure how likely it is to matter but this change means that we will keep looping until we see num jobs which means that if the queue becomes empty before we hit num we will keep looping, right? Is that okay or desired? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There's a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ah! i missed that. excellent. |
||
jobs = reserve_jobs | ||
break if jobs.empty? | ||
|
||
total += jobs.length | ||
pool = Concurrent::FixedThreadPool.new(jobs.length) | ||
jobs.each do |job| | ||
pool.post do | ||
run_thread_callbacks(job) do | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The explanation might make more sense if I put it here next to the related code. Previously, if This is what I mean by the "spinloop". A job gets picked up, its thread crashes, no cleanup occurs, it gets immediately picked up again, etc. Pushing the |
||
if run_job(job) | ||
success.increment | ||
else | ||
failure.increment | ||
end | ||
end | ||
success.increment if run_job(job) | ||
end | ||
end | ||
|
||
|
@@ -114,38 +109,41 @@ def work_off(num = 100) | |
break if stop? # leave if we're exiting | ||
end | ||
|
||
[success, failure].map(&:value) | ||
[success.value, total - success.value] | ||
end | ||
|
||
def run_thread_callbacks(job, &block) | ||
self.class.lifecycle.run_callbacks(:thread, self, job, &block) | ||
end | ||
|
||
def run(job) | ||
metadata = { | ||
status: 'RUNNING', | ||
name: job.name, | ||
run_at: job.run_at, | ||
created_at: job.created_at, | ||
priority: job.priority, | ||
queue: job.queue, | ||
attempts: job.attempts, | ||
enqueued_for: (Time.current - job.created_at).round, | ||
} | ||
job_say job, metadata.to_json | ||
run_time = Benchmark.realtime do | ||
Timeout.timeout(max_run_time(job).to_i, WorkerTimeout) do | ||
job.invoke_job | ||
run_thread_callbacks(job) do | ||
metadata = { | ||
status: 'RUNNING', | ||
name: job.name, | ||
run_at: job.run_at, | ||
created_at: job.created_at, | ||
priority: job.priority, | ||
queue: job.queue, | ||
attempts: job.attempts, | ||
enqueued_for: (Time.current - job.created_at).round, | ||
} | ||
job_say job, metadata.to_json | ||
run_time = Benchmark.realtime do | ||
Timeout.timeout(max_run_time(job).to_i, WorkerTimeout) do | ||
job.invoke_job | ||
end | ||
job.destroy | ||
end | ||
job.destroy | ||
job_say job, format('COMPLETED after %.4f seconds', run_time) | ||
end | ||
job_say job, format('COMPLETED after %.4f seconds', run_time) | ||
true # did work | ||
rescue DeserializationError => e | ||
job_say job, "FAILED permanently with #{e.class.name}: #{e.message}", 'error' | ||
|
||
job.error = e | ||
failed(job) | ||
false # work failed | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was a bug that improperly reported deserialization errors in the |
||
rescue Exception => e # rubocop:disable Lint/RescueException | ||
self.class.lifecycle.run_callbacks(:error, self, job) { handle_failed_job(job, e) } | ||
false # work failed | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can make this more formal, but I wanted to unblock this build for now, without the linter churn that comes from actually changing the min supported Ruby.