Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for worker pools that exclude specific queues [Green specs] #1198

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
11 changes: 10 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,13 @@ You can then do the following:
RAILS_ENV=production script/delayed_job --queue=tracking start
RAILS_ENV=production script/delayed_job --queues=mailers,tasks start

# Use the --pool option to specify a worker pool. You can use this option multiple times to start different numbers of workers for different queues.
# Option --exclude-specified-queues will do inverse of queues processing by skipping ones from --queue, --queues.
# If both --pool=* --exclude-specified-queues given, no exclusions will by applied on "*".
# A worker pool's queue list can be prefixed with a ! which has the same effect as setting
# --exclude-specified-queues but only applies it to that specific worker pool.

# Use the --pool option to specify a worker pool.
# You can use this option multiple times to start different numbers of workers for different queues.
# The following command will start 1 worker for the tracking queue,
# 2 workers for the mailers and tasks queues, and 2 workers for any jobs:
RAILS_ENV=production script/delayed_job --pool=tracking --pool=mailers,tasks:2 --pool=*:2 start
Expand All @@ -274,6 +280,9 @@ Work off queues by setting the `QUEUE` or `QUEUES` environment variable.
QUEUE=tracking rake jobs:work
QUEUES=mailers,tasks rake jobs:work

If EXCLUDE_SPECIFIED_QUEUES set to YES, then queues defined by QUEUE, QUEUES will be skipped instead.
See option --exclude-specified-queues description for special case of queue "*"

Restarting delayed_job
======================

Expand Down
59 changes: 59 additions & 0 deletions lib/delayed/backend/shared_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,65 @@ def create_job(opts = {})
expect(SimpleJob.runs).to eq(3)
end
end

context 'when asked to exclude specified queues' do
context 'and worker does not have queue set' do
before(:each) do
worker.queues = []
worker.exclude_specified_queues = true
end

it 'works off all jobs' do
expect(SimpleJob.runs).to eq(0)

create_job(:queue => 'one')
create_job(:queue => 'two')
create_job
worker.work_off

expect(SimpleJob.runs).to eq(3)
end
end

context 'and worker has one queue set' do
before(:each) do
worker.queues = ['large']
worker.exclude_specified_queues = true
end

it 'only works off jobs which are not from selected queues' do
expect(SimpleJob.runs).to eq(0)

create_job(:queue => 'large')
create_job(:queue => 'small')
create_job(:queue => 'small 2')
worker.work_off

expect(SimpleJob.runs).to eq(2)
end
end

context 'and worker has two queue set' do
before(:each) do
worker.queues = %w[large small]
worker.exclude_specified_queues = true
end

it 'only works off jobs which are not from selected queues' do
expect(SimpleJob.runs).to eq(0)

create_job(:queue => 'large')
create_job(:queue => 'small')
create_job(:queue => 'medium')
create_job(:queue => 'medium 2')
create_job

worker.work_off

expect(SimpleJob.runs).to eq(3)
end
end
end
end

context 'max_attempts' do
Expand Down
21 changes: 18 additions & 3 deletions lib/delayed/command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ def initialize(args) # rubocop:disable MethodLength
opt.on('--queue=queue', 'Specify which queue DJ must look up for jobs') do |queue|
@options[:queues] = queue.split(',')
end
opt.on('--exclude-specified-queues', 'Exclude looking up of queues specified by --queue[s]=') do
@options[:exclude_specified_queues] = true
end
opt.on('--pool=queue1[,queue2][:worker_count]', 'Specify queues and number of workers for a worker pool') do |pool|
parse_worker_pool(pool)
end
Expand All @@ -95,13 +98,13 @@ def daemonize # rubocop:disable PerceivedComplexity
if worker_count > 1
raise ArgumentError, 'Cannot specify both --number-of-workers and --identifier'
else
run_process("delayed_job.#{@options[:identifier]}", @options)
run_process("delayed_job.#{@options[:identifier]}", normalize_worker_options(@options))
end
# rubocop:enable GuardClause
else
worker_count.times do |worker_index|
process_name = worker_count == 1 ? 'delayed_job' : "delayed_job.#{worker_index}"
run_process(process_name, @options)
run_process(process_name, normalize_worker_options(@options))
end
end
end
Expand All @@ -112,13 +115,14 @@ def setup_pools
options = @options.merge(:queues => queues)
worker_count.times do
process_name = "delayed_job.#{worker_index}"
run_process(process_name, options)
run_process(process_name, normalize_worker_options(options))
worker_index += 1
end
end
end

def run_process(process_name, options = {})
options = normalize_worker_options(options)
Delayed::Worker.before_fork
Daemons.run_proc(process_name, :dir => options[:pid_dir], :dir_mode => :normal, :monitor => @monitor, :ARGV => @args) do |*_args|
$0 = File.join(options[:prefix], process_name) if @options[:prefix]
Expand Down Expand Up @@ -153,6 +157,17 @@ def parse_worker_pool(pool)
@worker_pools << [queues, worker_count]
end

# If we haven't explictly said that we do or don't want to exclude specified queues, treat a leading '!' as a negation indicator for that list of queues
# Otherwise, the ! is treated as part of the queue name itself
def normalize_worker_options(options)
return options unless options[:exclude_specified_queues].nil? && options[:queues].present?

# remove leading ! from all queues even though we only expect the first to have one, this makes it easier to look for changes after
queues = options[:queues].map { |queue| queue.sub(/^!/, '') }

options.merge(queues: queues, exclude_specified_queues: queues != options[:queues])
end

def root
@root ||= rails_root_defined? ? ::Rails.root : DIR_PWD
end
Expand Down
1 change: 1 addition & 0 deletions lib/delayed/tasks.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
:min_priority => ENV['MIN_PRIORITY'],
:max_priority => ENV['MAX_PRIORITY'],
:queues => (ENV['QUEUES'] || ENV['QUEUE'] || '').split(','),
:exclude_specified_queues => ENV['EXCLUDE_SPECIFIED_QUEUES'].to_s.casecmp('YES').zero?,
:quiet => ENV['QUIET']
}

Expand Down
47 changes: 25 additions & 22 deletions lib/delayed/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,21 @@

module Delayed
class Worker # rubocop:disable ClassLength
DEFAULT_LOG_LEVEL = 'info'.freeze
DEFAULT_SLEEP_DELAY = 5
DEFAULT_MAX_ATTEMPTS = 25
DEFAULT_MAX_RUN_TIME = 4.hours
DEFAULT_DEFAULT_PRIORITY = 0
DEFAULT_DELAY_JOBS = true
DEFAULT_QUEUES = [].freeze
DEFAULT_QUEUE_ATTRIBUTES = HashWithIndifferentAccess.new.freeze
DEFAULT_READ_AHEAD = 5
DEFAULT_LOG_LEVEL = 'info'.freeze
DEFAULT_SLEEP_DELAY = 5
DEFAULT_MAX_ATTEMPTS = 25
DEFAULT_MAX_RUN_TIME = 4.hours
DEFAULT_DEFAULT_PRIORITY = 0
DEFAULT_DELAY_JOBS = true
DEFAULT_QUEUES = [].freeze
DEFAULT_EXCLUDE_SPECIFIED_QUEUES = false
DEFAULT_QUEUE_ATTRIBUTES = HashWithIndifferentAccess.new.freeze
DEFAULT_READ_AHEAD = 5

cattr_accessor :min_priority, :max_priority, :max_attempts, :max_run_time,
:default_priority, :sleep_delay, :logger, :delay_jobs, :queues,
:read_ahead, :plugins, :destroy_failed_jobs, :exit_on_complete,
:default_log_level
:exclude_specified_queues, :read_ahead, :plugins, :destroy_failed_jobs,
:exit_on_complete, :default_log_level

# Named queue into which jobs are enqueued by default
cattr_accessor :default_queue_name
Expand All @@ -34,16 +35,17 @@ class Worker # rubocop:disable ClassLength
attr_accessor :name_prefix

def self.reset
self.default_log_level = DEFAULT_LOG_LEVEL
self.sleep_delay = DEFAULT_SLEEP_DELAY
self.max_attempts = DEFAULT_MAX_ATTEMPTS
self.max_run_time = DEFAULT_MAX_RUN_TIME
self.default_priority = DEFAULT_DEFAULT_PRIORITY
self.delay_jobs = DEFAULT_DELAY_JOBS
self.queues = DEFAULT_QUEUES
self.queue_attributes = DEFAULT_QUEUE_ATTRIBUTES
self.read_ahead = DEFAULT_READ_AHEAD
@lifecycle = nil
self.default_log_level = DEFAULT_LOG_LEVEL
self.sleep_delay = DEFAULT_SLEEP_DELAY
self.max_attempts = DEFAULT_MAX_ATTEMPTS
self.max_run_time = DEFAULT_MAX_RUN_TIME
self.default_priority = DEFAULT_DEFAULT_PRIORITY
self.delay_jobs = DEFAULT_DELAY_JOBS
self.queues = DEFAULT_QUEUES
self.exclude_specified_queues = DEFAULT_EXCLUDE_SPECIFIED_QUEUES
self.queue_attributes = DEFAULT_QUEUE_ATTRIBUTES
self.read_ahead = DEFAULT_READ_AHEAD
@lifecycle = nil
end

# Add or remove plugins in this list before the worker is instantiated
Expand Down Expand Up @@ -132,7 +134,8 @@ def initialize(options = {})
@quiet = options.key?(:quiet) ? options[:quiet] : true
@failed_reserve_count = 0

[:min_priority, :max_priority, :sleep_delay, :read_ahead, :queues, :exit_on_complete].each do |option|
[:min_priority, :max_priority, :sleep_delay, :read_ahead, :queues,
:exclude_specified_queues, :exit_on_complete].each do |option|
self.class.send("#{option}=", options[option]) if options.key?(option)
end

Expand Down
7 changes: 6 additions & 1 deletion spec/delayed/backend/test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,12 @@ def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_ti
end
jobs.select! { |j| j.priority <= Worker.max_priority } if Worker.max_priority
jobs.select! { |j| j.priority >= Worker.min_priority } if Worker.min_priority
jobs.select! { |j| Worker.queues.include?(j.queue) } if Worker.queues.any?
if Worker.queues.any?
jobs.select! do |j|
includes = Worker.queues.include?(j.queue)
Worker.exclude_specified_queues ? !includes : includes
end
end
jobs.sort_by! { |j| [j.priority, j.run_at] }[0..limit - 1]
end

Expand Down
42 changes: 36 additions & 6 deletions spec/delayed/command_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -163,12 +163,42 @@

[
['delayed_job.0', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => []}],
['delayed_job.1', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}],
['delayed_job.2', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}],
['delayed_job.3', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}],
['delayed_job.4', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}],
['delayed_job.5', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers misc]}],
['delayed_job.6', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers misc]}]
['delayed_job.1', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue'], :exclude_specified_queues => false}],
['delayed_job.2', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue'], :exclude_specified_queues => false}],
['delayed_job.3', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue'], :exclude_specified_queues => false}],
['delayed_job.4', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue'], :exclude_specified_queues => false}],
['delayed_job.5', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers misc], :exclude_specified_queues => false}],
['delayed_job.6', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers misc], :exclude_specified_queues => false}]
].each do |args|
expect(command).to receive(:run_process).with(*args).once
end

command.daemonize
end

it 'should run with respect of exclude queues' do
command = Delayed::Command.new(['--pool=*:1', '--pool=lage,slow,buggy:2', '--exclude-specified-queues'])
expect(FileUtils).to receive(:mkdir_p).with('./tmp/pids').once

[
['delayed_job.0', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => [], :exclude_specified_queues => true}],
['delayed_job.1', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[lage slow buggy], :exclude_specified_queues => true}],
['delayed_job.2', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[lage slow buggy], :exclude_specified_queues => true}]
].each do |args|
expect(command).to receive(:run_process).with(*args).once
end

command.daemonize
end

it 'should set queue exclusion to true if a queue starts with a ! and --exclude_specified_queues has not been specified' do
command = Delayed::Command.new(['--pool=fast:1', '--pool=!lage,slow,buggy:2'])
expect(FileUtils).to receive(:mkdir_p).with('./tmp/pids').once

[
['delayed_job.0', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[fast], :exclude_specified_queues => false}],
['delayed_job.1', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[lage slow buggy], :exclude_specified_queues => true}],
['delayed_job.2', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[lage slow buggy], :exclude_specified_queues => true}]
].each do |args|
expect(command).to receive(:run_process).with(*args).once
end
Expand Down