diff --git a/lib/delayed/tasks.rb b/lib/delayed/tasks.rb index ce2d075fd..67ff91b3d 100644 --- a/lib/delayed/tasks.rb +++ b/lib/delayed/tasks.rb @@ -1,4 +1,19 @@ namespace :jobs do + def fork_delayed(worker_index) + worker_name = "delayed_job.#{worker_index}" + + worker_pid = fork do + Delayed::Worker.after_fork + worker = Delayed::Worker.new(@worker_options) + worker.name_prefix = worker_name + worker.start + end + + puts "started #{worker_name} with pid=#{worker_pid}" + + return worker_name, worker_pid + end + desc "Clear the delayed_job queue." task :clear => :environment do Delayed::Job.delete_all @@ -6,7 +21,36 @@ desc "Start a delayed_job worker." task :work => :environment_options do - Delayed::Worker.new(@worker_options).start + if @worker_options[:num_processes] == 1 + Delayed::Worker.new(@worker_options).start + else + stop = false + + Signal.trap 'TERM' do + stop = true + end + + Delayed::Worker.before_fork + + workers = {} + @worker_options[:num_processes].times do |worker_index| + worker_name, worker_pid = fork_delayed(worker_index) + workers[worker_pid] = worker_name + end + + worker_index = @worker_options[:num_processes] + + while true + worker_pid = Process.wait() + puts "worker #{workers[worker_pid]} exited - #{$?.to_s }" + break if stop + + worker_name, worker_pid = fork_delayed(worker_index) + workers[worker_pid] = worker_name + + worker_index = worker_index + 1 + end + end end desc "Start a delayed_job worker and exit when all available jobs are complete."