From 54eba86ad1c6ce6d3469c9a3f0625d81e34041f9 Mon Sep 17 00:00:00 2001
From: Johnny Shields
Date: Sun, 4 Apr 2021 19:29:34 +0900
Subject: [PATCH] Adds support for running as a forking process in the foreground

Reason: The `script/delayed_job` process uses the
[Daemons](https://github.com/thuehlinger/daemons) gem, which has several
undesirable behaviors when running inside a container (Docker, etc.):

- The parent process spawns a background child process and then exits
  immediately, which causes the container to shut down.
- The worker processes are detached from the parent, which prevents
  logging to `STDOUT`.

The `--fork` option solves this by running workers in a foreground process
tree. When using `--fork`, the `daemons` gem is not required.

To enable this, the command script must change the following line:

    # always daemonizes, never forks
    Delayed::Command.new(ARGV).daemonize

to:

    # daemonize by default unless --fork specified
    Delayed::Command.new(ARGV).launch

In addition, several quality-of-life improvements have been added to the
Command script:

- Command: Add --pools arg as an alias for --pool, and allow pipe-delimited
  pool definitions
- Command: Add --num-workers arg as an alias for -n / --number-of-workers
  (and deprecate --number-of-workers)
- Command: Extract pool parsing code into a PoolParser class
- Command: Add -v / --verbose switch which sets quiet=false on workers
- Command: Add -x switch as an alias for --exit-on-complete
- Command: Add STDERR warning if num-workers is less than 1
- Command: Add STDERR warning if num-workers and pools are both specified
  (pools override num-workers, as per existing behavior)
- Command: Add STDERR warning if queues and pools are both specified
  (pools override queues, as per existing behavior)

The Rake script has also been enhanced:

- Rake: Use the Forking launcher under the hood
- Rake: Add support for NUM_WORKERS and POOLS args
---
 .github/workflows/ci.yml                    |  61 ++-
 .rubocop.yml                                |   3 +
 README.md                                   | 323 +++++++------
 lib/delayed/command.rb                      | 188 ++++----
 lib/delayed/launcher/base.rb                | 101 ++++
 lib/delayed/launcher/daemonized.rb          |  50 ++
 lib/delayed/launcher/forking.rb             |  74 +++
 lib/delayed/pool_parser.rb                  |  16 +
 lib/delayed/tasks.rb                        |  29 +-
 lib/delayed/util.rb                         |  11 +
 lib/delayed_job.rb                          |   1 +
 lib/generators/delayed_job/templates/script |   2 +-
 spec/daemons.rb                             |   5 +
 spec/delayed/command_spec.rb                | 488 +++++++++++++-----
 spec/delayed/launcher/daemonized_spec.rb    |  14 +
 spec/delayed/launcher/forking_spec.rb       |  22 +
 spec/delayed/launcher/shared_examples.rb    | 189 ++++++++
 spec/delayed/pool_parser_spec.rb            |  49 ++
 18 files changed, 1243 insertions(+), 383 deletions(-)
 create mode 100644 lib/delayed/launcher/base.rb
 create mode 100644 lib/delayed/launcher/daemonized.rb
 create mode 100644 lib/delayed/launcher/forking.rb
 create mode 100644 lib/delayed/pool_parser.rb
 create mode 100644 lib/delayed/util.rb
 create mode 100644 spec/delayed/launcher/daemonized_spec.rb
 create mode 100644 spec/delayed/launcher/forking_spec.rb
 create mode 100644 spec/delayed/launcher/shared_examples.rb
 create mode 100644 spec/delayed/pool_parser_spec.rb

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 1f78e0b26..c57788de8 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -9,32 +9,51 @@ on:
 jobs:
   test:
     runs-on: ubuntu-latest
+    continue-on-error: ${{ matrix.experimental }}
     strategy:
       fail-fast: false
       matrix:
-        ruby: [2.5, 2.6, 2.7, jruby, jruby-head, ruby-head]
+        ruby:
+          - '2.5'
+          - '2.6'
+          - '2.7'
+          - '3.0'
+          - 'jruby'
rails_version: - - '5.2.0' - - '6.0.0' - - '6.1.0.rc2' - - 'edge' + - '6.0' + - '6.1' include: - # - # The past - # - # EOL Active Record - - ruby: 2.2 - rails_version: '3.2.0' - - ruby: 2.1 - rails_version: '4.1.0' - - ruby: 2.4 - rails_version: '4.2.0' - - ruby: 2.4 - rails_version: '5.0.0' - - ruby: 2.5 - rails_version: '5.1.0' - - continue-on-error: ${{ matrix.rails_version == 'edge' || endsWith(matrix.ruby, 'head') }} + # Older Rails + - ruby: '2.2' + rails_version: '3.2' + - ruby: '2.1' + rails_version: '4.1' + - ruby: '2.4' + rails_version: '4.2' + - ruby: '2.4' + rails_version: '5.0' + - ruby: '2.5' + rails_version: '5.1' + - ruby: '2.5' + rails_version: '5.2' + - ruby: 'jruby' + rails_version: '5.2' + # Experimental + - ruby: 'ruby-head' + rails_version: '6.1' + experimental: true + - ruby: 'jruby-head' + rails_version: '6.1' + experimental: true + - ruby: '2.7' + rails_version: 'edge' + experimental: true + - ruby: '3.0' + rails_version: 'edge' + experimental: true + - ruby: 'jruby' + rails_version: 'edge' + experimental: true steps: - uses: actions/checkout@v2 diff --git a/.rubocop.yml b/.rubocop.yml index 3b0bd2701..ee021c566 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -127,6 +127,9 @@ SignalException: SpaceInsideHashLiteralBraces: EnforcedStyle: no_space +Style/SpecialGlobalVars: + Enabled: false + Style/SymbolArray: Enabled: false diff --git a/README.md b/README.md index 29ad10782..b9bb8389d 100644 --- a/README.md +++ b/README.md @@ -3,8 +3,8 @@ you're reading the documentation for the master branch. [View documentation for the latest release (4.1.9).](https://github.com/collectiveidea/delayed_job/tree/v4.1.9)** -Delayed::Job -============ +# Delayed Job + [![Gem Version](https://badge.fury.io/rb/delayed_job.svg)][gem] ![CI](https://github.com/collectiveidea/delayed_job/workflows/CI/badge.svg) [![Code Climate](https://codeclimate.com/github/collectiveidea/delayed_job.svg)][codeclimate] @@ -14,87 +14,74 @@ Delayed::Job [codeclimate]: https://codeclimate.com/github/collectiveidea/delayed_job [coveralls]: https://coveralls.io/r/collectiveidea/delayed_job -Delayed::Job (or DJ) encapsulates the common pattern of asynchronously executing -longer tasks in the background. +Delayed Job encapsulates the common pattern of asynchronously executing +longer tasks in the background. Examples of such tasks include: + +* Sending emails +* Image resizing +* HTTP downloads +* Updating smart collections +* Updating Solr, our search server, after product changes +* Batch imports +* Spam checks -It is a direct extraction from Shopify where the job table is responsible for a -multitude of core tasks. Amongst those tasks are: +Delayed Job was extracted from Shopify. -* sending massive newsletters -* image resizing -* http downloads -* updating smart collections -* updating solr, our search server, after product changes -* batch imports -* spam checks +## Installation -[Follow us on Twitter][twitter] to get updates and notices about new releases. +### Version Support -[twitter]: https://twitter.com/delayedjob +Delayed Job 4.x only supports Rails 3.0+. -Installation -============ -delayed_job 3.0.0 only supports Rails 3.0+. +### Configuring your Database -delayed_job supports multiple backends for storing the job queue. [See the wiki +Delayed Job supports multiple backends for storing the job queue. [See the wiki for other backends](https://github.com/collectiveidea/delayed_job/wiki/Backends). -If you plan to use delayed_job with Active Record, add `delayed_job_active_record` to your `Gemfile`. 
+To use with Active Record, add `delayed_job_active_record` to your `Gemfile`. ```ruby gem 'delayed_job_active_record' ``` -If you plan to use delayed_job with Mongoid, add `delayed_job_mongoid` to your `Gemfile`. +To use with Mongoid, add `delayed_job_mongoid` to your `Gemfile`. ```ruby gem 'delayed_job_mongoid' ``` -Run `bundle install` to install the backend and delayed_job gems. +Run `bundle install` to install the backend and `delayed_job` gems. The Active Record backend requires a jobs table. You can create that table by running the following command: - rails generate delayed_job:active_record - rake db:migrate - -For Rails 4.2+, see [below](#active-job) +``` +rails generate delayed_job:active_record +rake db:migrate +``` -Development -=========== -In development mode, if you are using Rails 3.1+, your application code will automatically reload every 100 jobs or when the queue finishes. -You no longer need to restart Delayed Job every time you update your code in development. +### Active Job -Active Job -========== -In Rails 4.2+, set the queue_adapter in config/application.rb +To use Delayed Job with Active Job (Rails 4.2+), set the `queue_adapter` in `config/application.rb`: ```ruby config.active_job.queue_adapter = :delayed_job ``` -See the [rails guide](http://guides.rubyonrails.org/active_job_basics.html#setting-the-backend) for more details. - -Rails 4.x -========= -If you are using the protected_attributes gem, it must appear before delayed_job in your gemfile. If your jobs are failing with: +See the [Rails Guide](http://guides.rubyonrails.org/active_job_basics.html#setting-the-backend) for more details. - ActiveRecord::StatementInvalid: PG::NotNullViolation: ERROR: null value in column "handler" violates not-null constraint +### Protected Attributes -then this is the fix you're looking for. +When using the `protected_attributes` gem, it must appear before `delayed_job` in your `Gemfile`. Otherwise you will see this error: -Upgrading from 2.x to 3.0.0 on Active Record -============================================ -Delayed Job 3.0.0 introduces a new column to the delayed_jobs table. +``` +ActiveRecord::StatementInvalid: PG::NotNullViolation: ERROR: null value in column "handler" violates not-null constraint +``` -If you're upgrading from Delayed Job 2.x, run the upgrade generator to create a migration to add the column. +## Using Delayed Job in your Application - rails generate delayed_job:upgrade - rake db:migrate +### Queuing Jobs -Queuing Jobs -============ Call `.delay.method(params)` on any object and it will be processed in the background. ```ruby @@ -120,7 +107,7 @@ device = Device.new device.deliver ``` -## Parameters +### Parameters `#handle_asynchronously` and `#delay` take these parameters: @@ -165,10 +152,11 @@ class LongTasks end ``` -If you ever want to call a `handle_asynchronously`'d method without Delayed Job, for instance while debugging something at the console, just add `_without_delay` to the method name. For instance, if your original method was `foo`, then call `foo_without_delay`. +To call a `handle_asynchronously`'d method without Delayed Job, add `_without_delay` to the method name. +For instance, if your original method was `foo`, then call `foo_without_delay`. + +### Rails Mailers -Rails Mailers -============= Delayed Job uses special syntax for Rails Mailers. Do not call the `.deliver` method when using `.delay`. 
@@ -190,108 +178,167 @@ You may also wish to consider using [Active Job with Action Mailer](https://edgeguides.rubyonrails.org/active_job_basics.html#action-mailer) which provides convenient `.deliver_later` syntax that forwards to Delayed Job under-the-hood. -Named Queues -============ -DJ 3 introduces Resque-style named queues while still retaining DJ-style -priority. The goal is to provide a system for grouping tasks to be worked by -separate pools of workers, which may be scaled and controlled individually. +### Queues -Jobs can be assigned to a queue by setting the `queue` option: +Delayed Job supports assigning jobs to named queues. Each queue may be worked by a +separate pool of workers, which may then be scaled and controlled individually. + +Jobs can be assigned to a queue by setting the `:queue` option: ```ruby -object.delay(:queue => 'tracking').method +object.delay(queue: 'tracking').method -Delayed::Job.enqueue job, :queue => 'tracking' +Delayed::Job.enqueue job, queue: 'tracking' -handle_asynchronously :tweet_later, :queue => 'tweets' +handle_asynchronously :tweet_later, queue: 'tweets' ``` -You can configure default priorities for named queues: +You may configure a default priority for each queue (lower number = higher priority): ```ruby Delayed::Worker.queue_attributes = { - high_priority: { priority: -10 }, - low_priority: { priority: 10 } + high_priority_queue: { priority: -10 }, + low_priority_queue: { priority: 10 } } ``` -Configured queue priorities can be overriden by passing priority to the delay method +To override the queue's default priority, pass `:priority` to the delay method: ```ruby -object.delay(:queue => 'high_priority', priority: 0).method +object.delay(queue: 'high_priority_queue', priority: 0).method ``` You can start processes to only work certain queues with the `queue` and `queues` -options defined below. Processes started without specifying a queue will run jobs -from **any** queue. To effectively have a process that runs jobs where a queue is not -specified, set a default queue name with `Delayed::Worker.default_queue_name` and -have the processes run that queue. +options (refer to "Running Jobs" section below.) Processes started without specifying +a queue will run jobs from **any** queue. To effectively have a process that runs +jobs where a queue is not specified, set a default queue name with +`Delayed::Worker.default_queue_name` and have the processes run that queue. -Running Jobs -============ -`script/delayed_job` can be used to manage a background process which will -start working off jobs. +## Running Jobs -To do so, add `gem "daemons"` to your `Gemfile` and make sure you've run `rails -generate delayed_job`. +### Running as a Daemon Process -You can then do the following: +`script/delayed_job` starts a background daemon process which will continually work jobs. - RAILS_ENV=production script/delayed_job start - RAILS_ENV=production script/delayed_job stop +To install this script, add `gem "daemons"` to your `Gemfile` then run `rails generate delayed_job`. - # Runs two workers in separate processes. - RAILS_ENV=production script/delayed_job -n 2 start - RAILS_ENV=production script/delayed_job stop +Then run the `start` command: - # Set the --queue or --queues option to work from a particular queue. 
- RAILS_ENV=production script/delayed_job --queue=tracking start - RAILS_ENV=production script/delayed_job --queues=mailers,tasks start +``` +# Run a single worker as a background process +RAILS_ENV=production script/delayed_job start - # Use the --pool option to specify a worker pool. You can use this option multiple times to start different numbers of workers for different queues. - # The following command will start 1 worker for the tracking queue, - # 2 workers for the mailers and tasks queues, and 2 workers for any jobs: - RAILS_ENV=production script/delayed_job --pool=tracking --pool=mailers,tasks:2 --pool=*:2 start +# Run 4 workers in separate background child processes +RAILS_ENV=production script/delayed_job -n4 start +``` - # Runs all available jobs and then exits - RAILS_ENV=production script/delayed_job start --exit-on-complete - # or to run in the foreground - RAILS_ENV=production script/delayed_job run --exit-on-complete +Each worker will check the database at least every 5 seconds. -**Rails 4:** *replace script/delayed_job with bin/delayed_job* +### Stopping and Restarting -Workers can be running on any computer, as long as they have access to the -database and their clock is in sync. Keep in mind that each worker will check -the database at least every 5 seconds. +You may use `stop` and `restart` commands. These commands wait for each worker +to finish its current job before proceeding. -You can also invoke `rake jobs:work` which will start working off jobs. You can -cancel the rake task with `CTRL-C`. +``` +# Shutdown all workers and exit +RAILS_ENV=production script/delayed_job stop -If you want to just run all available jobs and exit you can use `rake jobs:workoff` +# Shutdown all workers and start a single new worker process +RAILS_ENV=production script/delayed_job restart -Work off queues by setting the `QUEUE` or `QUEUES` environment variable. +# Shutdown all workers and start 4 new worker processes +RAILS_ENV=production script/delayed_job -n4 restart +``` - QUEUE=tracking rake jobs:work - QUEUES=mailers,tasks rake jobs:work +You must pass the same arguments to `restart` that you used when calling `start`. -Restarting delayed_job -====================== +You may also send `SIGTERM` to stop Delayed Job. -The following syntax will restart delayed jobs: +### Worker Queues and Pools - RAILS_ENV=production script/delayed_job restart +``` +# Set the --queues option to work from a particular queue +RAILS_ENV=production script/delayed_job --queues=mailers,tasks start + +# Use the --pool option to specify a worker pool. You can use this option multiple +# times to start different numbers of workers for different queues. 
+# The following command will start 1 worker for the tracking queue,
+# 2 workers for the mailers and tasks queues, and 2 workers for any jobs:
+RAILS_ENV=production script/delayed_job --pool=tracking --pool=mailers,tasks:2 --pool=*:2 start
+```
 
-To restart multiple delayed_job workers:
+### Exit On Complete Mode
 
-    RAILS_ENV=production script/delayed_job -n2 restart
+```
+# Run as a daemon and exit after working all available jobs
+RAILS_ENV=production script/delayed_job start --exit-on-complete
 
-**Rails 4:** *replace script/delayed_job with bin/delayed_job*
+# or to run in the foreground
+RAILS_ENV=production script/delayed_job run --exit-on-complete
+```
 
+### Experimental: Forking Mode
 
+The `script/delayed_job` process (described above) uses the
+[Daemons](https://github.com/thuehlinger/daemons) gem which has several
+undesirable behaviors when running inside a container (Docker, etc.):
 
-Custom Jobs
-===========
-Jobs are simple ruby objects with a method called perform. Any object which responds to perform can be stuffed into the jobs table. Job objects are serialized to yaml so that they can later be resurrected by the job runner.
+* The parent process spawns a background child process and then exits immediately,
+which causes the container to shut down.
+* The worker processes are detached from the parent, which prevents logging to `STDOUT`.
+
+The `--fork` option solves this by running workers in a foreground process tree.
+When using `--fork`, the `daemons` gem is not required.
+
+```
+# Run as a foreground process with 4 worker child processes.
+RAILS_ENV=production script/delayed_job --fork -n4 start
+
+# Forking mode supports queues and pools.
+RAILS_ENV=production script/delayed_job --fork --pool=mailers,tasks:2 --pool=*:2 start
+```
+
+Forking mode does not yet support the `restart`, `stop`, etc. script commands.
+Use `SIGTERM` or `SIGINT` to allow workers to finish their current job and then gracefully shut down.
+
+### Running via Rake
+
+You may start a worker process using `rake jobs:work`.
+You can exit the rake task with `CTRL-C`.
+
+To run all available jobs and exit, use `rake jobs:workoff`.
+
+Set the `QUEUES` or `POOLS` environment variable to work specific queues.
+
+```
+# Start a worker listening to all jobs
rake jobs:work
+
+# Run 4 worker processes and exit when all jobs are finished
+NUM_WORKERS=4 rake jobs:workoff
+
+# Work all jobs from the "mailers" and "tasks" queues, then exit
+NUM_WORKERS=4 QUEUES=mailers,tasks rake jobs:workoff
+
+# You may also specify POOLS as a pipe-delimited list
+POOLS=mailers,tasks:2|tweets:1|*:2 rake jobs:workoff
+```
+
+Rake uses Forking Mode (see above) under the hood.
+
+### Development
+
+In the `development` environment, Delayed Job will automatically reload
+your application code after every 100 jobs or when the queue finishes.
+You should not need to restart Delayed Job each time you update your code.
+
+## Advanced Topics
+
+### Custom Jobs
+
+Jobs are simple ruby objects with a method called `perform`.
+Any object which responds to `perform` can be enqueued into the jobs table.
+Job objects are serialized to YAML so that they can later be deserialized by the job runner.
```ruby NewsletterJob = Struct.new(:text, :emails) do @@ -303,7 +350,7 @@ end Delayed::Job.enqueue NewsletterJob.new('lorem ipsum...', Customers.pluck(:email)) ``` -To set a per-job max attempts that overrides the Delayed::Worker.max_attempts you can define a max_attempts method on the job +To override `Delayed::Worker.max_attempts` per-job, you can define a `max_attempts` instance method in the job class. ```ruby NewsletterJob = Struct.new(:text, :emails) do @@ -317,9 +364,11 @@ NewsletterJob = Struct.new(:text, :emails) do end ``` -To set a per-job max run time that overrides the Delayed::Worker.max_run_time you can define a max_run_time method on the job +To override `Delayed::Worker.max_run_time` per-job, you may define a `max_run_time` +instance method in the job class. -NOTE: this can ONLY be used to set a max_run_time that is lower than Delayed::Worker.max_run_time. Otherwise the lock on the job would expire and another worker would start the working on the in progress job. +**NOTE:** You may only set a `max_run_time` that is lower than `Delayed::Worker.max_run_time`. +Otherwise the lock on the job would expire and a second worker would start working the same in-progress job. ```ruby NewsletterJob = Struct.new(:text, :emails) do @@ -333,7 +382,8 @@ NewsletterJob = Struct.new(:text, :emails) do end ``` -To set a per-job default for destroying failed jobs that overrides the Delayed::Worker.destroy_failed_jobs you can define a destroy_failed_jobs? method on the job +To override `Delayed::Worker.destroy_failed_jobs` per-job, you may define a `destroy_failed_jobs?` +instance method in the job class. ```ruby NewsletterJob = Struct.new(:text, :emails) do @@ -347,7 +397,8 @@ NewsletterJob = Struct.new(:text, :emails) do end ``` -To set a default queue name for a custom job that overrides Delayed::Worker.default_queue_name, you can define a queue_name method on the job +To override `Delayed::Worker.default_queue_name` per-job, you may define a `queue_name` +instance method in the job class. ```ruby NewsletterJob = Struct.new(:text, :emails) do @@ -361,7 +412,8 @@ NewsletterJob = Struct.new(:text, :emails) do end ``` -On error, the job is scheduled again in 5 seconds + N ** 4, where N is the number of attempts. You can define your own `reschedule_at` method to override this default behavior. +On error, the job is scheduled again in 5 seconds + N ** 4, where N is the number of attempts. +You may define a `reschedule_at` instance method to override this default behavior. ```ruby NewsletterJob = Struct.new(:text, :emails) do @@ -370,17 +422,18 @@ NewsletterJob = Struct.new(:text, :emails) do end def reschedule_at(current_time, attempts) - current_time + 5.seconds + current_time + (attempts * 60).seconds end end ``` -Hooks -===== -You can define hooks on your job that will be called at different stages in the process: +### Hooks +You can define hooks on your job that will be called at different stages in the process: -**NOTE:** If you are using ActiveJob these hooks are **not** available to your jobs. You will need to use ActiveJob's callbacks. You can find details here https://guides.rubyonrails.org/active_job_basics.html#callbacks +**NOTE:** If you are using Active Job these hooks are **not** available to your jobs. +You will need to use Active Job's callbacks. +See the [Rails Guides](https://guides.rubyonrails.org/active_job_basics.html#callbacks) for details. 
```ruby class ParanoidNewsletterJob < NewsletterJob @@ -414,9 +467,9 @@ class ParanoidNewsletterJob < NewsletterJob end ``` -Gory Details -============ -The library revolves around a delayed_jobs table which looks as follows: +### Gory Details + +The library revolves around a `delayed_jobs` table which looks as follows: ```ruby create_table :delayed_jobs, :force => true do |table| @@ -433,12 +486,12 @@ create_table :delayed_jobs, :force => true do |table| end ``` -On error, the job is scheduled again in 5 seconds + N ** 4, where N is the number of attempts or using the job's defined `reschedule_at` method. +On error, the job is scheduled again in 5 seconds + N ** 4, where N is the number of attempts or using the job's defined `reschedule_at` instance method. -The default `Worker.max_attempts` is 25. After this, the job is either deleted (default), or left in the database with "failed_at" set. +The default `Delayed::Worker.max_attempts` is 25. After this, the job is either deleted (default), or left in the database with "failed_at" set. With the default of 25 attempts, the last retry will be 20 days later, with the last interval being almost 100 hours. -The default `Worker.max_run_time` is 4.hours. If your job takes longer than that, another computer could pick it up. It's up to you to +The default `Delayed::Worker.max_run_time` is 4.hours. If your job takes longer than that, another computer could pick it up. It's up to you to make sure your job doesn't exceed this time. You should set this to the longest time you think the job could take. By default, it will delete failed jobs (and it always deletes successful jobs). If you want to keep failed jobs, set @@ -454,7 +507,7 @@ If no jobs are found, the worker sleeps for the amount of time specified by the It is possible to disable delayed jobs for testing purposes. Set `Delayed::Worker.delay_jobs = false` to execute all jobs realtime. -Or `Delayed::Worker.delay_jobs` can be a Proc that decides whether to execute jobs inline on a per-job basis: +`Delayed::Worker.delay_jobs` may also be a `Proc` that decides whether to execute jobs inline on a per-job basis: ```ruby Delayed::Worker.delay_jobs = ->(job) { @@ -479,12 +532,12 @@ Delayed::Worker.raise_signal_exceptions = :term Delayed::Worker.logger = Logger.new(File.join(Rails.root, 'log', 'delayed_job.log')) ``` -Cleaning up -=========== +### Cleaning up + You can invoke `rake jobs:clear` to delete all jobs in the queue. -Having problems? -================ +### Having problems? + Good places to get help are: * [Google Groups](http://groups.google.com/group/delayed_job) where you can join our mailing list. * [StackOverflow](http://stackoverflow.com/questions/tagged/delayed-job) diff --git a/lib/delayed/command.rb b/lib/delayed/command.rb index 281078242..8cec823e3 100644 --- a/lib/delayed/command.rb +++ b/lib/delayed/command.rb @@ -1,33 +1,54 @@ -unless ENV['RAILS_ENV'] == 'test' - begin - require 'daemons' - rescue LoadError - raise "You need to add gem 'daemons' to your Gemfile if you wish to use it." 
- end -end require 'fileutils' require 'optparse' require 'pathname' +require 'delayed/pool_parser' +require 'delayed/launcher/daemonized' +require 'delayed/launcher/forking' module Delayed class Command # rubocop:disable ClassLength - attr_accessor :worker_count, :worker_pools - - DIR_PWD = Pathname.new Dir.pwd - - def initialize(args) # rubocop:disable MethodLength + def initialize(args) @options = { :quiet => true, - :pid_dir => "#{root}/tmp/pids", - :log_dir => "#{root}/log" + :worker_count => 1, + :monitor => false } - @worker_count = 1 - @monitor = false + @options[:args] = option_parser.parse!(args) + (@daemon_options || []) + + pools = pool_parser.pools + @options[:pools] = pools unless pools.empty? - opts = OptionParser.new do |opt| - opt.banner = "Usage: #{File.basename($PROGRAM_NAME)} [options] start|stop|restart|run" + validate_options! + end + def launch + launcher.launch + end + + def daemonize + @launch_strategy = :daemon + launch + end + + private + + def launcher + @launcher ||= launcher_class.new(@options) + end + + def launcher_class + case @launch_strategy + when :fork + Delayed::Launcher::Forking + else + Delayed::Launcher::Daemonized + end + end + + def option_parser # rubocop:disable MethodLength, CyclomaticComplexity + @option_parser ||= OptionParser.new do |opt| + opt.banner = "Usage: #{Delayed.program_name} [options] start|stop|restart|run" opt.on('-h', '--help', 'Show this message') do puts opt exit 1 @@ -35,14 +56,24 @@ def initialize(args) # rubocop:disable MethodLength opt.on('-e', '--environment=NAME', 'Specifies the environment to run this delayed jobs under (test/development/production).') do |_e| STDERR.puts 'The -e/--environment option has been deprecated and has no effect. Use RAILS_ENV and see http://github.com/collectiveidea/delayed_job/issues/7' end + opt.on('-d', '--daemon', 'Launch in daemon mode') do |_fork| + @launch_strategy ||= :daemon + end + opt.on('--fork', 'Launch in forking mode') do |_fork| + @launch_strategy ||= :fork + end opt.on('--min-priority N', 'Minimum priority of jobs to run.') do |n| - @options[:min_priority] = n + @options[:min_priority] = Integer(n) rescue nil end opt.on('--max-priority N', 'Maximum priority of jobs to run.') do |n| - @options[:max_priority] = n + @options[:max_priority] = Integer(n) rescue nil + end + opt.on('-n', '--num-workers=workers', 'Number of child workers to spawn') do |n| + @options[:worker_count] = Integer(n) rescue 1 end - opt.on('-n', '--number_of_workers=workers', 'Number of unique workers to spawn') do |worker_count| - @worker_count = worker_count.to_i rescue 1 + opt.on('--number-of-workers=workers', 'Number of child workers to spawn') do |n| + STDERR.puts 'DEPRECATED: Use -n or --num-workers instead of --number-of-workers. This will be removed in the next major version.' 
+ @options[:worker_count] = Integer(n) rescue 1 end opt.on('--pid-dir=DIR', 'Specifies an alternate directory in which to store the process ids.') do |dir| @options[:pid_dir] = dir @@ -50,123 +81,74 @@ def initialize(args) # rubocop:disable MethodLength opt.on('--log-dir=DIR', 'Specifies an alternate directory in which to store the delayed_job log.') do |dir| @options[:log_dir] = dir end + opt.on('-v', '--verbose', 'Output additional logging') do + @options[:quiet] = false + end opt.on('-i', '--identifier=n', 'A numeric identifier for the worker.') do |n| @options[:identifier] = n end opt.on('-m', '--monitor', 'Start monitor process.') do - @monitor = true + @options[:monitor] = true end opt.on('--sleep-delay N', 'Amount of time to sleep when no jobs are found') do |n| - @options[:sleep_delay] = n.to_i + @options[:sleep_delay] = Integer(n) rescue nil end opt.on('--read-ahead N', 'Number of jobs from the queue to consider') do |n| - @options[:read_ahead] = n + @options[:read_ahead] = Integer(n) rescue nil end opt.on('-p', '--prefix NAME', 'String to be prefixed to worker process names') do |prefix| @options[:prefix] = prefix end - opt.on('--queues=queues', 'Specify which queue DJ must look up for jobs') do |queues| + opt.on('--queues=queue1[,queue2]', 'Specify the job queues to work. Comma separated.') do |queues| @options[:queues] = queues.split(',') end - opt.on('--queue=queue', 'Specify which queue DJ must look up for jobs') do |queue| + opt.on('--queue=queue1[,queue2]', 'Specify the job queues to work. Comma separated.') do |queue| @options[:queues] = queue.split(',') end + opt.on('--pools=queue1[,queue2][:worker_count][|...]', 'Specify queues and number of workers for a worker pool. Use pipe to delimit multiple pools.') do |pools| + pool_parser.add(pools) + end opt.on('--pool=queue1[,queue2][:worker_count]', 'Specify queues and number of workers for a worker pool') do |pool| - parse_worker_pool(pool) + pool_parser.add(pool) end - opt.on('--exit-on-complete', 'Exit when no more jobs are available to run. This will exit if all jobs are scheduled to run in the future.') do + opt.on('-x', '--exit-on-complete', 'Exit when no more jobs are available to run. This will exit if all jobs are scheduled to run in the future.') do @options[:exit_on_complete] = true end opt.on('--daemon-options a, b, c', Array, 'options to be passed through to daemons gem') do |daemon_options| @daemon_options = daemon_options end end - @args = opts.parse!(args) + (@daemon_options || []) end - def daemonize # rubocop:disable PerceivedComplexity - dir = @options[:pid_dir] - FileUtils.mkdir_p(dir) unless File.exist?(dir) - - if worker_pools - setup_pools - elsif @options[:identifier] - # rubocop:disable GuardClause - if worker_count > 1 - raise ArgumentError, 'Cannot specify both --number-of-workers and --identifier' - else - run_process("delayed_job.#{@options[:identifier]}", @options) - end - # rubocop:enable GuardClause - else - worker_count.times do |worker_index| - process_name = worker_count == 1 ? 
'delayed_job' : "delayed_job.#{worker_index}" - run_process(process_name, @options) - end - end - end - - def setup_pools - worker_index = 0 - @worker_pools.each do |queues, worker_count| - options = @options.merge(:queues => queues) - worker_count.times do - process_name = "delayed_job.#{worker_index}" - run_process(process_name, options) - worker_index += 1 - end - end + def pool_parser + @pool_parser ||= PoolParser.new end - def run_process(process_name, options = {}) - Delayed::Worker.before_fork - Daemons.run_proc(process_name, :dir => options[:pid_dir], :dir_mode => :normal, :monitor => @monitor, :ARGV => @args) do |*_args| - $0 = File.join(options[:prefix], process_name) if @options[:prefix] - run process_name, options - end - end - - def run(worker_name = nil, options = {}) - Dir.chdir(root) - - Delayed::Worker.after_fork - Delayed::Worker.logger ||= Logger.new(File.join(@options[:log_dir], 'delayed_job.log')) - - worker = Delayed::Worker.new(options) - worker.name_prefix = "#{worker_name} " - worker.start - rescue => e - STDERR.puts e.message - STDERR.puts e.backtrace - ::Rails.logger.fatal(e) if rails_logger_defined? - exit_with_error_status - end - - private - - def parse_worker_pool(pool) - @worker_pools ||= [] - - queues, worker_count = pool.split(':') - queues = ['*', '', nil].include?(queues) ? [] : queues.split(',') - worker_count = (worker_count || 1).to_i rescue 1 - @worker_pools << [queues, worker_count] + def validate_options! + validate_worker_count! + validate_identifier! + validate_workers_and_pools! + validate_queues_and_pools! end - def root - @root ||= rails_root_defined? ? ::Rails.root : DIR_PWD + def validate_worker_count! + return unless @options[:worker_count] < 1 + STDERR.puts 'WARNING: --num-workers must be 1 or greater. This will raise an ArgumentError in the next major version.' end - def rails_root_defined? - defined?(::Rails.root) + def validate_identifier! + return unless @options[:identifier] && @options[:worker_count] > 1 + raise ArgumentError, 'Cannot specify both --num-workers and --identifier' end - def rails_logger_defined? - defined?(::Rails.logger) + def validate_workers_and_pools! + return unless @options[:worker_count] > 1 && @options[:pools] + STDERR.puts 'WARNING: Cannot specify both --num-workers and --pool. This will raise an ArgumentError in the next major version.' end - def exit_with_error_status - exit 1 + def validate_queues_and_pools! + return unless @options[:queues] && @options[:pools] + STDERR.puts 'WARNING: Cannot specify both --queues and --pool. This will raise an ArgumentError in the next major version.' 
end end end diff --git a/lib/delayed/launcher/base.rb b/lib/delayed/launcher/base.rb new file mode 100644 index 000000000..ae9bbfd12 --- /dev/null +++ b/lib/delayed/launcher/base.rb @@ -0,0 +1,101 @@ +module Delayed + module Launcher + class Base + attr_accessor :worker_count, + :pools, + :process_prefix, + :process_identifier + + def initialize(options) + @worker_index = 0 + @worker_count = options.delete(:worker_count) || 1 + @pools = options.delete(:pools) + @pools = nil if @pools == [] + @monitor = options.delete(:monitor) + @process_prefix = options.delete(:prefix) + @process_identifier = options.delete(:identifier) + @args = options.delete(:args) + + @options = options + @options[:pid_dir] ||= "#{Delayed.root}/tmp/pids" + @options[:log_dir] ||= "#{Delayed.root}/log" + end + + def launch + raise NotImplementedError, '#launch must be implemented in subclass' + end + + protected + + def setup_logger + Delayed::Worker.logger ||= Logger.new(File.join(@options[:log_dir], 'delayed_job.log')) + end + + def setup_workers + if pools + setup_pooled_workers + elsif process_identifier + setup_identified_worker + elsif worker_count > 1 + setup_multiple_workers + else + setup_single_worker + end + end + + def setup_pooled_workers + pools.each do |queues, pool_worker_count| + options = @options.merge(:queues => queues) + pool_worker_count.times { add_worker(options) } + end + end + + def setup_multiple_workers + worker_count.times { add_worker(@options) } + end + + def setup_single_worker + raise NotImplementedError, '#setup_single_worker must be implemented in subclass' + end + + def setup_identified_worker + setup_single_worker + end + + def add_worker(_options) + raise NotImplementedError, '#add_worker must be implemented in subclass' + end + + def run_worker(worker_name, options) + Dir.chdir(Delayed.root) + set_process_name(worker_name) + Delayed::Worker.after_fork + setup_logger + worker = Delayed::Worker.new(options) + worker.name_prefix = "#{worker_name} " + worker.start + rescue => e + STDERR.puts e.message + STDERR.puts e.backtrace + logger.fatal(e) + exit_with_error_status + end + + def set_process_name(name) # rubocop:disable AccessorMethodName + $0 = process_prefix ? File.join(process_prefix, name) : name + end + + def get_name(label) + "delayed_job#{".#{label}" if label}" + end + + def exit_with_error_status + exit(1) + end + + def logger + @logger ||= Delayed::Worker.logger || (::Rails.logger if defined?(::Rails.logger)) || Logger.new(STDOUT) + end + end + end +end diff --git a/lib/delayed/launcher/daemonized.rb b/lib/delayed/launcher/daemonized.rb new file mode 100644 index 000000000..240c0a764 --- /dev/null +++ b/lib/delayed/launcher/daemonized.rb @@ -0,0 +1,50 @@ +require 'delayed/launcher/base' + +module Delayed + module Launcher + class Daemonized < Base + def initialize(options) + super + end + + def launch + require_daemons! + create_pid_dir + setup_workers + end + + private + + def require_daemons! + return if ENV['RAILS_ENV'] == 'test' + begin + require 'daemons' + rescue LoadError + raise "Add gem 'daemons' to your Gemfile or use --fork option." 
+ end + end + + def create_pid_dir + dir = @options[:pid_dir] + FileUtils.mkdir_p(dir) unless File.exist?(dir) + end + + def setup_single_worker + run_process(get_name(process_identifier), @options) + end + + def add_worker(options) + process_name = get_name(@worker_index) + run_process(process_name, options) + @worker_index += 1 + end + + def run_process(process_name, options = {}) + Delayed::Worker.before_fork + Daemons.run_proc(process_name, :dir => options[:pid_dir], :dir_mode => :normal, :monitor => @monitor, :ARGV => @args) do |*_args| + run_worker(process_name, options) + end + end + end + end +end diff --git a/lib/delayed/launcher/forking.rb b/lib/delayed/launcher/forking.rb new file mode 100644 index 000000000..cf0779b8b --- /dev/null +++ b/lib/delayed/launcher/forking.rb @@ -0,0 +1,74 @@ +require 'delayed/launcher/base' + +module Delayed + module Launcher + class Forking < Base + def launch + @stop = !!@options[:exit_on_complete] + setup_logger + setup_signals + Delayed::Worker.before_fork if worker_count > 1 + setup_workers + run_loop if worker_count > 1 + on_exit + end + + private + + def setup_signals + Signal.trap('INT') do + Thread.new { logger.info('Received SIGINT. Waiting for workers to finish current job...') } + @stop = true + end + + Signal.trap('TERM') do + Thread.new { logger.info('Received SIGTERM. Waiting for workers to finish current job...') } + @stop = true + end + end + + def workers + @workers ||= {} + end + + def setup_single_worker + set_process_name(get_name(process_identifier)) + Delayed::Worker.new(@options).start + end + + def add_worker(options) + worker_name = get_name(@worker_index) + worker_pid = fork_worker(worker_name, options) + + queues = options[:queues] + queue_msg = " queues=#{queues.join(',')}" unless queues.nil? || queues.empty? + logger.info "Worker #{worker_name} started - pid #{worker_pid}#{queue_msg}" + + workers[worker_pid] = [worker_name, queues] + @worker_index += 1 + end + + def fork_worker(worker_name, options) + fork { run_worker(worker_name, options) } + end + + def run_loop + loop do + worker_pid = Process.wait + worker_name, queues = workers.delete(worker_pid) + logger.info "Worker #{worker_name} exited - #{$?}" + break if @stop && workers.empty? + next if @stop + options = @options.merge(:queues => queues) + add_worker(options) + end + rescue Errno::ECHILD + logger.warn 'No worker processes found' + end + + def on_exit + logger.info "#{get_name(process_identifier)}#{' (parent)' if worker_count > 1} exited gracefully - pid #{$$}" + end + end + end +end diff --git a/lib/delayed/pool_parser.rb b/lib/delayed/pool_parser.rb new file mode 100644 index 000000000..a33e937c3 --- /dev/null +++ b/lib/delayed/pool_parser.rb @@ -0,0 +1,16 @@ +module Delayed + class PoolParser + def add(string) + string.split('|').each do |segment| + queues, worker_count = segment.split(':') + queues = ['*', '', nil].include?(queues) ? [] : queues.split(',') + worker_count = (worker_count || 1).to_i rescue 1 + pools << [queues, worker_count] + end + end + + def pools + @pools ||= [] + end + end +end diff --git a/lib/delayed/tasks.rb b/lib/delayed/tasks.rb index 409ba48f8..8b2f673ef 100644 --- a/lib/delayed/tasks.rb +++ b/lib/delayed/tasks.rb @@ -6,24 +6,41 @@ desc 'Start a delayed_job worker.' task :work => :environment_options do - Delayed::Worker.new(@worker_options).start + Delayed::Launcher::Forking.new(@options).launch end desc 'Start a delayed_job worker and exit when all available jobs are complete.' 
   task :workoff => :environment_options do
-    Delayed::Worker.new(@worker_options.merge(:exit_on_complete => true)).start
+    Delayed::Launcher::Forking.new(@options.merge(:exit_on_complete => true)).launch
   end
 
   task :environment_options => :environment do
-    @worker_options = {
+    require 'delayed/launcher/forking'
+    require 'delayed/pool_parser'
+
+    @options = {
+      :worker_count => Integer(ENV['NUM_WORKERS'] || 1),
       :min_priority => ENV['MIN_PRIORITY'],
       :max_priority => ENV['MAX_PRIORITY'],
-      :queues => (ENV['QUEUES'] || ENV['QUEUE'] || '').split(','),
       :quiet => ENV['QUIET']
     }
 
-    @worker_options[:sleep_delay] = ENV['SLEEP_DELAY'].to_i if ENV['SLEEP_DELAY']
-    @worker_options[:read_ahead] = ENV['READ_AHEAD'].to_i if ENV['READ_AHEAD']
+    queues = (ENV['QUEUES'] || ENV['QUEUE'] || '').split(',')
+    @options[:queues] = queues unless queues.empty?
+
+    pool_parser = PoolParser.new
+    pool_parser.add(ENV['POOLS'] || ENV['POOL'] || '')
+    pools = pool_parser.pools
+    @options[:pools] = pools unless pools.empty?
+
+    if ENV['NUM_WORKERS'] && pools.any?
+      raise ArgumentError, 'Cannot specify both NUM_WORKERS and POOLS'
+    end
+
+    if queues.any? && pools.any?
+      raise ArgumentError, 'Cannot specify both QUEUES and POOLS'
+    end
+
+    @options[:sleep_delay] = Integer(ENV['SLEEP_DELAY']) if ENV['SLEEP_DELAY']
+    @options[:read_ahead] = Integer(ENV['READ_AHEAD']) if ENV['READ_AHEAD']
   end
 
   desc "Exit with error status if any jobs older than max_age seconds haven't been attempted yet."
diff --git a/lib/delayed/util.rb b/lib/delayed/util.rb
new file mode 100644
index 000000000..8ca6a9814
--- /dev/null
+++ b/lib/delayed/util.rb
@@ -0,0 +1,11 @@
+require 'fileutils'
+
+module Delayed
+  def self.program_name
+    File.basename($PROGRAM_NAME)
+  end
+
+  def self.root
+    defined?(::Rails.root) ? ::Rails.root : Pathname.new(Dir.pwd)
+  end
+end
diff --git a/lib/delayed_job.rb b/lib/delayed_job.rb
index d38f2edbc..5b6da02e7 100644
--- a/lib/delayed_job.rb
+++ b/lib/delayed_job.rb
@@ -1,4 +1,5 @@
 require 'active_support'
+require 'delayed/util'
 require 'delayed/compatibility'
 require 'delayed/exceptions'
 require 'delayed/message_sending'
diff --git a/lib/generators/delayed_job/templates/script b/lib/generators/delayed_job/templates/script
index edf195985..4a762ea49 100644
--- a/lib/generators/delayed_job/templates/script
+++ b/lib/generators/delayed_job/templates/script
@@ -2,4 +2,4 @@
 require File.expand_path(File.join(File.dirname(__FILE__), '..', 'config', 'environment'))
 require 'delayed/command'
 
-Delayed::Command.new(ARGV).daemonize
+Delayed::Command.new(ARGV).launch
diff --git a/spec/daemons.rb b/spec/daemons.rb
index 750b130c0..4665afbd0 100644
--- a/spec/daemons.rb
+++ b/spec/daemons.rb
@@ -1,2 +1,7 @@
 # Fake "daemons" file on the spec load path to allow spec/delayed/command_spec.rb
 # to test the Delayed::Command class without actually adding daemons as a dependency.
+module Daemons
+  def self.run_proc(*_args)
+    yield if block_given?
+ end +end diff --git a/spec/delayed/command_spec.rb b/spec/delayed/command_spec.rb index b57cd6efa..092789fea 100644 --- a/spec/delayed/command_spec.rb +++ b/spec/delayed/command_spec.rb @@ -4,176 +4,430 @@ describe Delayed::Command do let(:options) { [] } let(:logger) { double('Logger') } + let(:output_options) { subject.instance_variable_get(:'@options') } + subject { Delayed::Command.new(options) } + + def verify_worker_processes + command = Delayed::Command.new(%w[-d] + options) + allow(FileUtils).to receive(:mkdir_p) + exp.each do |args| + expect(command.send(:launcher)).to receive(:run_process).with(*args).once + end + command.launch + end + + describe 'launch strategy' do + it 'should use daemon mode by default' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to receive(:launch) + expect_any_instance_of(Delayed::Launcher::Forking).to_not receive(:launch) + Delayed::Command.new([]).launch + end - subject { Delayed::Command.new options } + it 'should allow forking mode' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to_not receive(:launch) + expect_any_instance_of(Delayed::Launcher::Forking).to receive(:launch) + Delayed::Command.new(%w[--fork]).launch + end + + it 'using multiple switches should use first one' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to receive(:launch) + expect_any_instance_of(Delayed::Launcher::Forking).to_not receive(:launch) + Delayed::Command.new(%w[-d --fork]).launch + end - before do - allow(Delayed::Worker).to receive(:after_fork) - allow(Dir).to receive(:chdir) - allow(Logger).to receive(:new).and_return(logger) - allow_any_instance_of(Delayed::Worker).to receive(:start) - allow(Delayed::Worker).to receive(:logger=) - allow(Delayed::Worker).to receive(:logger).and_return(nil, logger) + it '#daemonize method should always launch in daemon mode' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to receive(:launch) + expect_any_instance_of(Delayed::Launcher::Forking).to_not receive(:launch) + Delayed::Command.new(%w[--fork]).daemonize + end end - shared_examples_for 'uses --log-dir option' do - context 'when --log-dir is specified' do - let(:options) { ['--log-dir=/custom/log/dir'] } + describe '--min-priority arg' do + context 'not set' do + let(:options) { [] } + it { expect(output_options[:min_priority]).to eq nil } + end - it 'creates the delayed_job.log in the specified directory' do - expect(Logger).to receive(:new).with('/custom/log/dir/delayed_job.log') - subject.run - end + context 'set' do + let(:options) { %w[--min-priority 2] } + it { expect(output_options[:min_priority]).to eq 2 } + end + + context 'not a number' do + let(:options) { %w[--min-priority sponge] } + it { expect(output_options[:min_priority]).to eq nil } end end - describe 'run' do - it 'sets the Delayed::Worker logger' do - expect(Delayed::Worker).to receive(:logger=).with(logger) - subject.run + describe '--max-priority arg' do + context 'not set' do + let(:options) { [] } + it { expect(output_options[:max_priority]).to eq nil } end - context 'when Rails root is defined' do - let(:rails_root) { Pathname.new '/rails/root' } - let(:rails) { double('Rails', :root => rails_root) } + context 'set' do + let(:options) { %w[--max-priority -5] } + it { expect(output_options[:max_priority]).to eq(-5) } + end - before do - stub_const('Rails', rails) - end + context 'not a number' do + let(:options) { %w[--max-priority giraffe] } + it { expect(output_options[:max_priority]).to eq nil } + end + end + + describe '--num-workers arg' do + context 
'not set' do + let(:options) { [] } + it { expect(output_options[:worker_count]).to eq 1 } + end + + context '-n set' do + let(:options) { %w[-n 2] } + it { expect(output_options[:worker_count]).to eq 2 } + end + + context '-n not a number' do + let(:options) { %w[-n elephant] } + it { expect(output_options[:worker_count]).to eq 1 } + end + + context '--num-workers set' do + let(:options) { %w[--num-workers 4] } + it { expect(output_options[:worker_count]).to eq 4 } + end - it 'runs the Delayed::Worker process in Rails.root' do - expect(Dir).to receive(:chdir).with(rails_root) - subject.run + context '--num-workers not a number' do + let(:options) { %w[--num_workers hippo] } + it { expect(output_options[:worker_count]).to eq 1 } + end + + context '--number_of_workers set' do + let(:options) { %w[--number_of_workers 5] } + it do + expect(STDERR).to receive(:puts) + expect(output_options[:worker_count]).to eq 5 end + end - context 'when --log-dir is not specified' do - it 'creates the delayed_job.log in Rails.root/log' do - expect(Logger).to receive(:new).with('/rails/root/log/delayed_job.log') - subject.run - end + context '--number-of-workers not a number' do + let(:options) { %w[--number_of_workers rhino] } + it do + expect(STDERR).to receive(:puts) + expect(output_options[:worker_count]).to eq 1 end + end + end - include_examples 'uses --log-dir option' + describe '--pid-dir arg' do + context 'not set' do + let(:options) { [] } + it { expect(output_options[:pid_dir]).to eq nil } end - context 'when Rails root is not defined' do - let(:rails_without_root) { double('Rails') } + context 'set' do + let(:options) { %w[--pid-dir ./foo/bar] } + it { expect(output_options[:pid_dir]).to eq './foo/bar' } + end - before do - stub_const('Rails', rails_without_root) + context 'worker processes' do + let(:options) { %w[--pid-dir ./foo/bar] } + let(:exp) do + [['delayed_job', {:quiet => true, :pid_dir => './foo/bar', :log_dir => './log'}]] end + it { verify_worker_processes } + end + end - it 'runs the Delayed::Worker process in $PWD' do - expect(Dir).to receive(:chdir).with(Delayed::Command::DIR_PWD) - subject.run - end + describe '--log-dir arg' do + context 'not set' do + let(:options) { [] } + it { expect(output_options[:log_dir]).to eq nil } + end + + context 'set' do + let(:options) { %w[--log-dir ./foo/bar] } + it { expect(output_options[:log_dir]).to eq './foo/bar' } + end - context 'when --log-dir is not specified' do - it 'creates the delayed_job.log in $PWD/log' do - expect(Logger).to receive(:new).with("#{Delayed::Command::DIR_PWD}/log/delayed_job.log") - subject.run - end + context 'worker processes' do + let(:options) { %w[--log-dir ./foo/bar] } + let(:exp) do + [['delayed_job', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './foo/bar'}]] end + it { verify_worker_processes } + end + end + + describe '--monitor arg' do + context 'not set' do + let(:options) { [] } + it { expect(output_options[:monitor]).to eq false } + end + + context 'set' do + let(:options) { %w[--monitor] } + it { expect(output_options[:monitor]).to eq true } + end + end + + describe '--sleep-delay arg' do + context 'not set' do + let(:options) { [] } + it { expect(output_options[:sleep_delay]).to eq nil } + end + + context 'set' do + let(:options) { %w[--sleep-delay 5] } + it { expect(output_options[:sleep_delay]).to eq(5) } + end + + context 'not a number' do + let(:options) { %w[--sleep-delay giraffe] } + it { expect(output_options[:sleep_delay]).to eq nil } + end + end + + describe '--read-ahead arg' do + 
context 'not set' do + let(:options) { [] } + it { expect(output_options[:read_ahead]).to eq nil } + end - include_examples 'uses --log-dir option' + context 'set' do + let(:options) { %w[--read-ahead 5] } + it { expect(output_options[:read_ahead]).to eq(5) } end - context 'when an error is raised' do - let(:test_error) { Class.new(StandardError) } + context 'not a number' do + let(:options) { %w[--read-ahead giraffe] } + it { expect(output_options[:read_ahead]).to eq nil } + end + end + + describe '--identifier arg' do + context 'not set' do + let(:options) { [] } + it { expect(output_options[:identifier]).to eq nil } + end - before do - allow(Delayed::Worker).to receive(:new).and_raise(test_error.new('An error')) - allow(subject).to receive(:exit_with_error_status) - allow(STDERR).to receive(:puts) + context '-i set' do + let(:options) { %w[-i bond] } + it { expect(output_options[:identifier]).to eq 'bond' } + end + + context '--identifier set' do + let(:options) { %w[--identifier goldfinger] } + it { expect(output_options[:identifier]).to eq 'goldfinger' } + end + + context 'worker processes' do + let(:options) { %w[--identifier spectre] } + let(:exp) do + [['delayed_job.spectre', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log'}]] end + it { verify_worker_processes } + end + end + + describe '--prefix arg' do + context 'not set' do + let(:options) { [] } + it { expect(output_options[:prefix]).to eq nil } + end + + context '-p set' do + let(:options) { %w[-p oddjob] } + it { expect(output_options[:prefix]).to eq 'oddjob' } + end + + context '--prefix set' do + let(:options) { %w[--prefix jaws] } + it { expect(output_options[:prefix]).to eq 'jaws' } + end + end + + describe '--exit-on-complete arg' do + context 'not set' do + let(:options) { [] } + it { expect(output_options[:exit_on_complete]).to eq nil } + end + + context '-x set' do + let(:options) { %w[-x] } + it { expect(output_options[:exit_on_complete]).to eq true } + end + + context '--exit-on-complete set' do + let(:options) { %w[--exit-on-complete] } + it { expect(output_options[:exit_on_complete]).to eq true } + end + end + + describe '--verbose arg' do + context 'not set' do + let(:options) { [] } + it { expect(output_options[:quiet]).to eq true } + end + + context '-v set' do + let(:options) { %w[-v] } + it { expect(output_options[:quiet]).to eq false } + end - it 'prints the error message to STDERR' do - expect(STDERR).to receive(:puts).with('An error') - subject.run + context '--verbose set' do + let(:options) { %w[--verbose] } + it { expect(output_options[:quiet]).to eq false } + end + + context 'worker processes' do + let(:options) { %w[-v] } + let(:exp) do + [['delayed_job', {:quiet => false, :pid_dir => './tmp/pids', :log_dir => './log'}]] end + it { verify_worker_processes } + end + end - it 'exits with an error status' do - expect(subject).to receive(:exit_with_error_status) - subject.run + describe '--queues arg' do + context 'not set' do + let(:options) { [] } + it { expect(output_options[:queues]).to eq nil } + end + + context '--queue set' do + let(:options) { %w[--queue mailers] } + it { expect(output_options[:queues]).to eq %w[mailers] } + end + + context '--queues set' do + let(:options) { %w[--queues mailers,tweets] } + it { expect(output_options[:queues]).to eq %w[mailers tweets] } + end + + context 'worker processes' do + let(:options) { %w[--queues mailers,tweets] } + let(:exp) do + [['delayed_job', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers 
tweets]}]] end + it { verify_worker_processes } + end + end - context 'when Rails logger is not defined' do - let(:rails) { double('Rails') } + describe '--pool arg' do + context 'multiple --pool args set' do + let(:options) { %w[--pool=*:1 --pool=test_queue:4 --pool=mailers,misc:2] } + it 'should parse correctly' do + expect(output_options[:pools]).to eq [ + [[], 1], + [['test_queue'], 4], + [%w[mailers misc], 2] + ] + end + end - before do - stub_const('Rails', rails) - end + context 'pipe-delimited' do + let(:options) { %w[--pools=*:1|test_queue:4 --pool=mailers,misc:2] } + it 'should parse correctly' do + expect(output_options[:pools]).to eq [ + [[], 1], + [['test_queue'], 4], + [%w[mailers misc], 2] + ] + end + end - it 'does not attempt to use the Rails logger' do - subject.run - end + context 'queues specified as *' do + let(:options) { ['--pool=*:4'] } + it 'should use all queues' do + expect(output_options[:pools]).to eq [[[], 4]] end + end - context 'when Rails logger is defined' do - let(:rails_logger) { double('Rails logger') } - let(:rails) { double('Rails', :logger => rails_logger) } + context 'queues not specified' do + let(:options) { ['--pools=:4'] } + it 'should use all queues' do + expect(output_options[:pools]).to eq [[[], 4]] + end + end - before do - stub_const('Rails', rails) - end + context 'worker count not specified' do + let(:options) { ['--pool=mailers'] } + it 'should default to one worker' do + expect(output_options[:pools]).to eq [[['mailers'], 1]] + end + end - it 'logs the error to the Rails logger' do - expect(rails_logger).to receive(:fatal).with(test_error) - subject.run - end + context 'worker processes' do + let(:options) { %w[--pool=*:1 --pool=test_queue:4 --pool=mailers,misc:2] } + let(:exp) do + [ + ['delayed_job.0', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => []}], + ['delayed_job.1', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}], + ['delayed_job.2', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}], + ['delayed_job.3', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}], + ['delayed_job.4', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}], + ['delayed_job.5', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers misc]}], + ['delayed_job.6', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers misc]}] + ] end + it { verify_worker_processes } end end - describe 'parsing --pool argument' do - it 'should parse --pool correctly' do - command = Delayed::Command.new(['--pool=*:1', '--pool=test_queue:4', '--pool=mailers,misc:2']) - - expect(command.worker_pools).to eq [ - [[], 1], - [['test_queue'], 4], - [%w[mailers misc], 2] - ] + describe '--daemon-options arg' do + context 'not set' do + let(:options) { [] } + it { expect(output_options[:exit_on_complete]).to eq nil } end - it 'should allow * or blank to specify any pools' do - command = Delayed::Command.new(['--pool=*:4']) - expect(command.worker_pools).to eq [ - [[], 4], - ] + context 'set' do + let(:options) { %w[--daemon-options a,b,c] } + it { expect(subject.instance_variable_get(:'@daemon_options')).to eq %w[a b c] } + end + end - command = Delayed::Command.new(['--pool=:4']) - expect(command.worker_pools).to eq [ - [[], 4], - ] + describe 'extra args' do + context '--daemon-options not set' do + let(:options) { %w[foo bar baz] } + 
it { expect(output_options[:args]).to eq %w[foo bar baz] } end - it 'should default to one worker if not specified' do - command = Delayed::Command.new(['--pool=mailers']) - expect(command.worker_pools).to eq [ - [['mailers'], 1], - ] + context '--daemon-options set' do + let(:options) { %w[foo bar --daemon-options a,b,c baz] } + it { expect(output_options[:args]).to eq %w[foo bar baz a b c] } end end - describe 'running worker pools defined by multiple --pool arguments' do - it 'should run the correct worker processes' do - command = Delayed::Command.new(['--pool=*:1', '--pool=test_queue:4', '--pool=mailers,misc:2']) - expect(FileUtils).to receive(:mkdir_p).with('./tmp/pids').once + describe 'validations' do + it 'should launch normally without validations' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to receive(:launch) + expect(STDERR).to_not receive(:puts) + Delayed::Command.new(%w[-d]).launch + end - [ - ['delayed_job.0', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => []}], - ['delayed_job.1', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}], - ['delayed_job.2', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}], - ['delayed_job.3', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}], - ['delayed_job.4', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}], - ['delayed_job.5', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers misc]}], - ['delayed_job.6', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers misc]}] - ].each do |args| - expect(command).to receive(:run_process).with(*args).once - end + it 'raise error num-workers and identifier are present' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to_not receive(:launch) + expect(STDERR).to_not receive(:puts) + expect { Delayed::Command.new(%w[-d --num-workers=2 --identifier=foobar]).launch }.to raise_error(ArgumentError) + end + + it 'warn if num-workers is 0' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to receive(:launch) + expect(STDERR).to receive(:puts) + Delayed::Command.new(%w[-d --num-workers=0]).launch + end + + it 'warn if both queues and pools are present' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to receive(:launch) + expect(STDERR).to receive(:puts) + Delayed::Command.new(%w[-d --queues=mailers --pool=mailers:2]).launch + end - command.daemonize + it 'warn if both num-workers and pools are present' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to receive(:launch) + expect(STDERR).to receive(:puts) + Delayed::Command.new(%w[-d --num-workers=2 --pool=mailers:2]).launch end end end diff --git a/spec/delayed/launcher/daemonized_spec.rb b/spec/delayed/launcher/daemonized_spec.rb new file mode 100644 index 000000000..9f9cd5bf6 --- /dev/null +++ b/spec/delayed/launcher/daemonized_spec.rb @@ -0,0 +1,14 @@ +require_relative '../../helper' +require_relative 'shared_examples' +require 'delayed/launcher/daemonized' + +describe Delayed::Launcher::Daemonized do + def verify_worker_processes + exp.each do |args| + expect(subject).to receive(:run_process).with(*args).once + end + subject.launch + end + + it_behaves_like 'launcher shared examples' +end diff --git a/spec/delayed/launcher/forking_spec.rb b/spec/delayed/launcher/forking_spec.rb new file mode 100644 index 000000000..ef1c57993 --- /dev/null +++ 
b/spec/delayed/launcher/forking_spec.rb @@ -0,0 +1,22 @@ +require_relative '../../helper' +require_relative 'shared_examples' +require 'delayed/launcher/forking' + +describe Delayed::Launcher::Forking do + before do + @workers = [] + allow(Delayed::Worker).to receive(:logger).and_return(Logger.new(nil)) + allow_any_instance_of(described_class).to receive(:run_loop).and_return(nil) + allow_any_instance_of(described_class).to receive(:fork_worker) { |_, *args| @workers << args } + allow_any_instance_of(described_class).to receive(:setup_single_worker) do + @workers << [subject.send(:get_name, subject.send(:process_identifier)), subject.instance_variable_get(:'@options')] + end + end + + def verify_worker_processes + subject.launch + expect(@workers).to eq(exp) + end + + it_behaves_like 'launcher shared examples' +end diff --git a/spec/delayed/launcher/shared_examples.rb b/spec/delayed/launcher/shared_examples.rb new file mode 100644 index 000000000..ab2db7c20 --- /dev/null +++ b/spec/delayed/launcher/shared_examples.rb @@ -0,0 +1,189 @@ +shared_examples_for 'launcher shared examples' do + let(:options) { {} } + subject { described_class.new(options) } + + describe 'run_worker' do + let(:logger) { double('Logger') } + + before do + allow(Delayed::Worker).to receive(:after_fork) + allow(Dir).to receive(:chdir) + allow(Logger).to receive(:new).and_return(logger) + allow_any_instance_of(Delayed::Worker).to receive(:start) + allow(Delayed::Worker).to receive(:logger=) + allow(Delayed::Worker).to receive(:logger).and_return(nil, logger) + end + + shared_examples_for 'uses log_dir option' do + context 'when log_dir is specified' do + let(:options) { {:log_dir => '/custom/log/dir'} } + + it 'creates the delayed_job.log in the specified directory' do + expect(Logger).to receive(:new).with('/custom/log/dir/delayed_job.log') + subject.send(:run_worker, 'delayed_job.0', {}) + end + end + end + + it 'sets the Delayed::Worker logger' do + expect(Delayed::Worker).to receive(:logger=).with(logger) + subject.send(:run_worker, 'delayed_job.0', {}) + end + + context 'when Rails root is defined' do + let(:rails_root) { Pathname.new '/rails/root' } + let(:rails) { double('Rails', :root => rails_root) } + + before do + stub_const('Rails', rails) + end + + it 'runs the Delayed::Worker process in Rails.root' do + expect(Dir).to receive(:chdir).with(rails_root) + subject.send(:run_worker, 'delayed_job.0', {}) + end + + context 'when --log-dir is not specified' do + it 'creates the delayed_job.log in Rails.root/log' do + expect(Logger).to receive(:new).with('/rails/root/log/delayed_job.log') + subject.send(:run_worker, 'delayed_job.0', {}) + end + end + + include_examples 'uses log_dir option' + end + + context 'when Rails root is not defined' do + let(:rails_without_root) { double('Rails') } + + before do + stub_const('Rails', rails_without_root) + end + + it 'runs the Delayed::Worker process in $PWD' do + expect(Dir).to receive(:chdir).with(Pathname.new(Dir.pwd)) + subject.send(:run_worker, 'delayed_job.0', {}) + end + + context 'when --log-dir is not specified' do + it 'creates the delayed_job.log in $PWD/log' do + expect(Logger).to receive(:new).with("#{Pathname.new(Dir.pwd)}/log/delayed_job.log") + subject.send(:run_worker, 'delayed_job.0', {}) + end + end + + include_examples 'uses log_dir option' + end + + context 'when an error is raised' do + let(:test_error) { Class.new(StandardError) } + + before do + allow(Delayed::Worker).to receive(:new).and_raise(test_error.new('An error')) + allow(subject).to 
receive(:exit_with_error_status) + allow(STDERR).to receive(:puts) + end + + context 'using Delayed::Worker logger' do + before do + expect(logger).to receive(:fatal).with(test_error) + end + + it 'prints the error message to STDERR' do + expect(STDERR).to receive(:puts).with('An error') + subject.send(:run_worker, 'delayed_job.0', {}) + end + + it 'exits with an error status' do + expect(subject).to receive(:exit_with_error_status) + subject.send(:run_worker, 'delayed_job.0', {}) + end + + context 'when Rails logger is not defined' do + let(:rails) { double('Rails') } + + before do + stub_const('Rails', rails) + end + + it 'does not attempt to use the Rails logger' do + subject.send(:run_worker, 'delayed_job.0', {}) + end + end + end + + context 'when Rails logger is defined' do + let(:rails_logger) { double('Rails logger') } + let(:rails) { double('Rails', :logger => rails_logger) } + + before do + stub_const('Rails', rails) + allow(Delayed::Worker).to receive(:logger).and_return(nil) + end + + it 'logs the error to the Rails logger' do + expect(rails_logger).to receive(:fatal).with(test_error) + subject.send(:run_worker, 'delayed_job.0', {}) + end + end + end + end + + describe 'spawning workers' do + context 'no args' do + let(:options) { {} } + let(:exp) do + [['delayed_job', {:pid_dir => './tmp/pids', :log_dir => './log'}]] + end + it { verify_worker_processes } + end + + context ':pools arg' do + let(:options) { {:pools => [[[], 1], [['test_queue'], 4], [%w[mailers misc], 2]]} } + let(:exp) do + [ + ['delayed_job.0', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => []}], + ['delayed_job.1', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}], + ['delayed_job.2', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}], + ['delayed_job.3', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}], + ['delayed_job.4', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}], + ['delayed_job.5', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers misc]}], + ['delayed_job.6', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers misc]}] + ] + end + it { verify_worker_processes } + end + + context ':queues and :worker_count args' do + let(:options) { {:queues => %w[mailers misc], :worker_count => 4} } + let(:exp) do + [ + ['delayed_job.0', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers misc]}], + ['delayed_job.1', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers misc]}], + ['delayed_job.2', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers misc]}], + ['delayed_job.3', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers misc]}] + ] + end + it { verify_worker_processes } + end + + context ':pid_dir and :log_dir args' do + let(:options) { {:pid_dir => './foo/bar', :log_dir => './baz/qux', :worker_count => 2} } + let(:exp) do + [ + ['delayed_job.0', {:pid_dir => './foo/bar', :log_dir => './baz/qux'}], + ['delayed_job.1', {:pid_dir => './foo/bar', :log_dir => './baz/qux'}], + ] + end + it { verify_worker_processes } + end + + context ':identifier and other args' do + let(:options) { {:monitor => true, :prefix => 'my_prefix', :identifier => 'my_identifier', :worker_count => 2, :args => {:foo => 'bar', :baz => 'qux'}} } + let(:exp) do + [['delayed_job.my_identifier', {:pid_dir => './tmp/pids', :log_dir => './log'}]] + end + it { verify_worker_processes } + end + end +end diff --git 
a/spec/delayed/pool_parser_spec.rb b/spec/delayed/pool_parser_spec.rb
new file mode 100644
index 000000000..f677682b1
--- /dev/null
+++ b/spec/delayed/pool_parser_spec.rb
@@ -0,0 +1,49 @@
+require 'helper'
+require 'delayed/pool_parser'
+
+describe Delayed::PoolParser do
+  subject { described_class.new }
+
+  describe '#add and #pools' do
+    it 'should parse multiple pool definitions' do
+      %w[*:1 test_queue:4 mailers,misc:2].each do |str|
+        subject.add(str)
+      end
+
+      expect(subject.pools).to eq [
+        [[], 1],
+        [['test_queue'], 4],
+        [%w[mailers misc], 2]
+      ]
+    end
+
+    it 'should allow pipe delimiter' do
+      %w[*:1|test_queue:4 mailers,misc:2|foo,bar:3|baz:4].each do |str|
+        subject.add(str)
+      end
+
+      expect(subject.pools).to eq [
+        [[], 1],
+        [['test_queue'], 4],
+        [%w[mailers misc], 2],
+        [%w[foo bar], 3],
+        [['baz'], 4],
+      ]
+    end
+
+    it 'should allow * to specify any pools' do
+      subject.add('*:4')
+      expect(subject.pools).to eq [[[], 4]]
+    end
+
+    it 'should allow blank to specify any pools' do
+      subject.add(':4')
+      expect(subject.pools).to eq [[[], 4]]
+    end
+
+    it 'should default to one worker if not specified' do
+      subject.add('mailers')
+      expect(subject.pools).to eq [[['mailers'], 1]]
+    end
+  end
+end
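
Note on the new `PoolParser`: the spec above exercises its behavior (comma-separated queue lists, pipe-delimited pool definitions, `*` or blank meaning all queues, and a default of one worker per pool). For illustration only, a minimal parser consistent with those expectations might look like the sketch below; the actual `lib/delayed/pool_parser.rb` added by this patch is not reproduced here and may differ in detail.

    # Sketch only -- not the implementation shipped in lib/delayed/pool_parser.rb.
    module Delayed
      class PoolParser
        # Accumulated pools as [[queue_names, worker_count], ...]
        def pools
          @pools ||= []
        end

        # Accepts definitions such as '*:1', 'mailers,misc:2', or the
        # pipe-delimited form '*:1|test_queue:4'.
        def add(string)
          string.split('|').each do |segment|
            queues, count = segment.split(':')
            # '*' or a blank queue list means "work all queues".
            names = (queues.nil? || queues.empty? || queues == '*') ? [] : queues.split(',')
            # A missing worker count defaults to one worker.
            pools << [names, (count || 1).to_i]
          end
        end
      end
    end

With this sketch, `Delayed::PoolParser.new.tap { |p| p.add('*:1|mailers,misc:2') }.pools` evaluates to `[[[], 1], [%w[mailers misc], 2]]`, matching the expectations in the spec.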