Skip to content

Commit

Permalink
Move max_run_time and max_attempts to worker. Refs tobi#26
Browse files Browse the repository at this point in the history
  • Loading branch information
bkeepers committed Dec 19, 2009
1 parent 3366c1e commit d7533d7
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 31 deletions.
9 changes: 2 additions & 7 deletions lib/delayed/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,6 @@ class DeserializationError < StandardError
# A job object that is persisted to the database.
# Contains the work object as a YAML field.
class Job < ActiveRecord::Base
@@max_attempts = 25
@@max_run_time = 4.hours

cattr_accessor :max_attempts, :max_run_time

set_table_name :delayed_jobs

# By default failed jobs are destroyed after too many attempts.
Expand Down Expand Up @@ -66,7 +61,7 @@ def payload_object=(object)
def reschedule(message, backtrace = [], time = nil)
self.last_error = message + "\n" + backtrace.join("\n")

if (self.attempts += 1) < max_attempts
if (self.attempts += 1) < Worker.max_attempts
time ||= Job.db_time_now + (attempts ** 4) + 5

self.run_at = time
Expand Down Expand Up @@ -121,7 +116,7 @@ def self.enqueue(*args, &block)
end

# Find a few candidate jobs to run (in case some immediately get locked by others).
def self.find_available(worker_name, limit = 5, max_run_time = max_run_time)
def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
scope = self.ready_to_run(worker_name, max_run_time)
scope = scope.scoped(:conditions => ['priority >= ?', min_priority]) if min_priority
scope = scope.scoped(:conditions => ['priority <= ?', max_priority]) if max_priority
Expand Down
14 changes: 6 additions & 8 deletions lib/delayed/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ module Delayed
class Worker
@@sleep_delay = 5

cattr_accessor :sleep_delay

cattr_accessor :logger
@@max_attempts = 25
@@max_run_time = 4.hours

cattr_accessor :max_attempts, :max_run_time, :sleep_delay, :logger

self.logger = if defined?(Merb::Logger)
Merb.logger
elsif defined?(RAILS_DEFAULT_LOGGER)
Expand All @@ -14,10 +16,6 @@ class Worker
# name_prefix is ignored if name is set directly
attr_accessor :name_prefix

def job_max_run_time
Delayed::Job.max_run_time
end

# Every worker has a unique name which by default is the pid of the process.
# There are some advantages to overriding this with something which survives worker retarts:
# Workers can safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before.
Expand Down Expand Up @@ -77,7 +75,7 @@ def say(text, level = Logger::INFO)

# Run the next job we can get an exclusive lock on.
# If no jobs are left we return nil
def reserve_and_run_one_job(max_run_time = job_max_run_time)
def reserve_and_run_one_job(max_run_time = self.class.max_run_time)

# We get up to 5 jobs from the db. In case we cannot get exclusive access to a job we try the next.
# this leads to a more even distribution of jobs across the worker processes
Expand Down
2 changes: 1 addition & 1 deletion spec/delayed_method_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def read(story)

Delayed::Job.count.should == 1

job.run_with_lock(Delayed::Job.max_run_time, 'worker')
job.run_with_lock(Delayed::Worker.max_run_time, 'worker')

Delayed::Job.count.should == 0

Expand Down
28 changes: 14 additions & 14 deletions spec/job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
SimpleJob.runs.should == 0

job = Delayed::Job.enqueue SimpleJob.new
job.run_with_lock(Delayed::Job.max_run_time, 'worker')
job.run_with_lock(Delayed::Worker.max_run_time, 'worker')

SimpleJob.runs.should == 1
end
Expand All @@ -62,7 +62,7 @@
JOB
end

job.run_with_lock(Delayed::Job.max_run_time, 'worker')
job.run_with_lock(Delayed::Worker.max_run_time, 'worker')

$eval_job_ran.should == true
end
Expand All @@ -71,14 +71,14 @@
M::ModuleJob.runs.should == 0

job = Delayed::Job.enqueue M::ModuleJob.new
job.run_with_lock(Delayed::Job.max_run_time, 'worker')
job.run_with_lock(Delayed::Worker.max_run_time, 'worker')

M::ModuleJob.runs.should == 1
end

it "should re-schedule by about 1 second at first and increment this more and more minutes when it fails to execute properly" do
job = Delayed::Job.enqueue ErrorJob.new
job.run_with_lock(Delayed::Job.max_run_time, 'worker')
job.run_with_lock(Delayed::Worker.max_run_time, 'worker')

job = Delayed::Job.find(:first)

Expand All @@ -92,7 +92,7 @@

it "should record last_error when destroy_failed_jobs = false, max_attempts = 1" do
Delayed::Job.destroy_failed_jobs = false
Delayed::Job::max_attempts = 1
Delayed::Worker.max_attempts = 1
job = Delayed::Job.enqueue ErrorJob.new
job.run(1)
job.reload
Expand Down Expand Up @@ -154,14 +154,14 @@
Delayed::Job.destroy_failed_jobs = true
end

it "should be destroyed if it failed more than Job::max_attempts times" do
it "should be destroyed if it failed more than Worker.max_attempts times" do
@job.should_receive(:destroy)
Delayed::Job::max_attempts.times { @job.reschedule 'FAIL' }
Delayed::Worker.max_attempts.times { @job.reschedule 'FAIL' }
end

it "should not be destroyed if failed fewer than Job::max_attempts times" do
it "should not be destroyed if failed fewer than Worker.max_attempts times" do
@job.should_not_receive(:destroy)
(Delayed::Job::max_attempts - 1).times { @job.reschedule 'FAIL' }
(Delayed::Worker.max_attempts - 1).times { @job.reschedule 'FAIL' }
end
end

Expand All @@ -170,14 +170,14 @@
Delayed::Job.destroy_failed_jobs = false
end

it "should be failed if it failed more than Job::max_attempts times" do
it "should be failed if it failed more than Worker.max_attempts times" do
@job.reload.failed_at.should == nil
Delayed::Job::max_attempts.times { @job.reschedule 'FAIL' }
Delayed::Worker.max_attempts.times { @job.reschedule 'FAIL' }
@job.reload.failed_at.should_not == nil
end

it "should not be failed if it failed fewer than Job::max_attempts times" do
(Delayed::Job::max_attempts - 1).times { @job.reschedule 'FAIL' }
it "should not be failed if it failed fewer than Worker.max_attempts times" do
(Delayed::Worker.max_attempts - 1).times { @job.reschedule 'FAIL' }
@job.reload.failed_at.should == nil
end

Expand Down Expand Up @@ -306,7 +306,7 @@
it "should leave the queue in a consistent state and not run the job if locking fails" do
SimpleJob.runs.should == 0
@job.stub!(:lock_exclusively!).with(any_args).once.and_return(false)
@job.run_with_lock(Delayed::Job.max_run_time, 'worker')
@job.run_with_lock(Delayed::Worker.max_run_time, 'worker')
SimpleJob.runs.should == 0
end

Expand Down
2 changes: 1 addition & 1 deletion spec/worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def job_create(opts = {})
context "while running with locked and expired jobs, it" do
before(:each) do
@worker.name = 'worker1'
exp_time = Delayed::Job.db_time_now - (1.minutes + Delayed::Job::max_run_time)
exp_time = Delayed::Job.db_time_now - (1.minutes + Delayed::Worker.max_run_time)
job_create(:locked_by => 'worker1', :locked_at => exp_time)
job_create(:locked_by => 'worker2', :locked_at => (Delayed::Job.db_time_now - 1.minutes))
job_create
Expand Down

0 comments on commit d7533d7

Please sign in to comment.