From 54eba86ad1c6ce6d3469c9a3f0625d81e34041f9 Mon Sep 17 00:00:00 2001 From: Johnny Shields Date: Sun, 4 Apr 2021 19:29:34 +0900 Subject: [PATCH 01/20] Adds support for running as a forking process in the foreground Reason: The `script/delayed_job` process (described above) uses the [Daemons](https://github.com/thuehlinger/daemons) gem which has several undesirable behaviors when running inside a container (Docker, etc.): - The parent process spawns a background child process then exits immediately, which causes the container to shutdown. - The worker processes are detached from the parent, which prevents logging to `STDOUT`. The `--fork` option solves this by running workers in a foreground process tree. When using `--fork` the `daemons` gem is not required. The command script requires changing the following line to enable this: # always daemonizes, never forks Delayed::Command.new(ARGV).daemonize must now be: # daemonize by default unless --fork specified Delayed::Command.new(ARGV).launch In addition, there are several quality of life improvements added to the Command script: - Command: Add --pools arg as an alias for --pool, and allow pipe-delimited pool definitions - Command: Add --num-worker arg as an alias for -n / --number-of-workers (and deprecate --number-of-workers) - Command: Pool parsing code has been extracted to PoolParser class - Command: Add -v / --verbose switch which sets quiet=false on workers - Command: Add -x switch as alias for --exit-on-complete - Command: Add STDERR warning num-workers less than 1 - Command: Add STDERR warning if num-workers and pools both specified (pools overrides num-workers as per existing behavior) - Command: Add STDERR warning if queues and pools both specified (pools overrides num-workers as per existing behavior) The Rake script has also been enhanced: - Rake: Uses Forking launcher under the hood - Rake: Add support for NUM_WORKERS and POOLS args --- .github/workflows/ci.yml | 61 ++- .rubocop.yml | 3 + README.md | 323 +++++++------ lib/delayed/command.rb | 188 ++++---- lib/delayed/launcher/base.rb | 101 ++++ lib/delayed/launcher/daemonized.rb | 50 ++ lib/delayed/launcher/forking.rb | 74 +++ lib/delayed/pool_parser.rb | 16 + lib/delayed/tasks.rb | 29 +- lib/delayed/util.rb | 11 + lib/delayed_job.rb | 1 + lib/generators/delayed_job/templates/script | 2 +- spec/daemons.rb | 5 + spec/delayed/command_spec.rb | 488 +++++++++++++++----- spec/delayed/launcher/daemonized_spec.rb | 14 + spec/delayed/launcher/forking_spec.rb | 22 + spec/delayed/launcher/shared_examples.rb | 189 ++++++++ spec/delayed/pool_parser_spec.rb | 49 ++ 18 files changed, 1243 insertions(+), 383 deletions(-) create mode 100644 lib/delayed/launcher/base.rb create mode 100644 lib/delayed/launcher/daemonized.rb create mode 100644 lib/delayed/launcher/forking.rb create mode 100644 lib/delayed/pool_parser.rb create mode 100644 lib/delayed/util.rb create mode 100644 spec/delayed/launcher/daemonized_spec.rb create mode 100644 spec/delayed/launcher/forking_spec.rb create mode 100644 spec/delayed/launcher/shared_examples.rb create mode 100644 spec/delayed/pool_parser_spec.rb diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1f78e0b26..c57788de8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -9,32 +9,51 @@ on: jobs: test: runs-on: ubuntu-latest + continue-on-error: ${{ matrix.experimental }} strategy: fail-fast: false matrix: - ruby: [2.5, 2.6, 2.7, jruby, jruby-head, ruby-head] + ruby: + - '2.5' + - '2.6' + - '2.7' + - '3.0' + - 'jruby' rails_version: - - '5.2.0' - - '6.0.0' - - '6.1.0.rc2' - - 'edge' + - '6.0' + - '6.1' include: - # - # The past - # - # EOL Active Record - - ruby: 2.2 - rails_version: '3.2.0' - - ruby: 2.1 - rails_version: '4.1.0' - - ruby: 2.4 - rails_version: '4.2.0' - - ruby: 2.4 - rails_version: '5.0.0' - - ruby: 2.5 - rails_version: '5.1.0' - - continue-on-error: ${{ matrix.rails_version == 'edge' || endsWith(matrix.ruby, 'head') }} + # Older Rails + - ruby: '2.2' + rails_version: '3.2' + - ruby: '2.1' + rails_version: '4.1' + - ruby: '2.4' + rails_version: '4.2' + - ruby: '2.4' + rails_version: '5.0' + - ruby: '2.5' + rails_version: '5.1' + - ruby: '2.5' + rails_version: '5.2' + - ruby: 'jruby' + rails_version: '5.2' + # Experimental + - ruby: 'ruby-head' + rails_version: '6.1' + experimental: true + - ruby: 'jruby-head' + rails_version: '6.1' + experimental: true + - ruby: '2.7' + rails_version: 'edge' + experimental: true + - ruby: '3.0' + rails_version: 'edge' + experimental: true + - ruby: 'jruby' + rails_version: 'edge' + experimental: true steps: - uses: actions/checkout@v2 diff --git a/.rubocop.yml b/.rubocop.yml index 3b0bd2701..ee021c566 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -127,6 +127,9 @@ SignalException: SpaceInsideHashLiteralBraces: EnforcedStyle: no_space +Style/SpecialGlobalVars: + Enabled: false + Style/SymbolArray: Enabled: false diff --git a/README.md b/README.md index 29ad10782..b9bb8389d 100644 --- a/README.md +++ b/README.md @@ -3,8 +3,8 @@ you're reading the documentation for the master branch. [View documentation for the latest release (4.1.9).](https://github.com/collectiveidea/delayed_job/tree/v4.1.9)** -Delayed::Job -============ +# Delayed Job + [![Gem Version](https://badge.fury.io/rb/delayed_job.svg)][gem] ![CI](https://github.com/collectiveidea/delayed_job/workflows/CI/badge.svg) [![Code Climate](https://codeclimate.com/github/collectiveidea/delayed_job.svg)][codeclimate] @@ -14,87 +14,74 @@ Delayed::Job [codeclimate]: https://codeclimate.com/github/collectiveidea/delayed_job [coveralls]: https://coveralls.io/r/collectiveidea/delayed_job -Delayed::Job (or DJ) encapsulates the common pattern of asynchronously executing -longer tasks in the background. +Delayed Job encapsulates the common pattern of asynchronously executing +longer tasks in the background. Examples of such tasks include: + +* Sending emails +* Image resizing +* HTTP downloads +* Updating smart collections +* Updating Solr, our search server, after product changes +* Batch imports +* Spam checks -It is a direct extraction from Shopify where the job table is responsible for a -multitude of core tasks. Amongst those tasks are: +Delayed Job was extracted from Shopify. -* sending massive newsletters -* image resizing -* http downloads -* updating smart collections -* updating solr, our search server, after product changes -* batch imports -* spam checks +## Installation -[Follow us on Twitter][twitter] to get updates and notices about new releases. +### Version Support -[twitter]: https://twitter.com/delayedjob +Delayed Job 4.x only supports Rails 3.0+. -Installation -============ -delayed_job 3.0.0 only supports Rails 3.0+. +### Configuring your Database -delayed_job supports multiple backends for storing the job queue. [See the wiki +Delayed Job supports multiple backends for storing the job queue. [See the wiki for other backends](https://github.com/collectiveidea/delayed_job/wiki/Backends). -If you plan to use delayed_job with Active Record, add `delayed_job_active_record` to your `Gemfile`. +To use with Active Record, add `delayed_job_active_record` to your `Gemfile`. ```ruby gem 'delayed_job_active_record' ``` -If you plan to use delayed_job with Mongoid, add `delayed_job_mongoid` to your `Gemfile`. +To use with Mongoid, add `delayed_job_mongoid` to your `Gemfile`. ```ruby gem 'delayed_job_mongoid' ``` -Run `bundle install` to install the backend and delayed_job gems. +Run `bundle install` to install the backend and `delayed_job` gems. The Active Record backend requires a jobs table. You can create that table by running the following command: - rails generate delayed_job:active_record - rake db:migrate - -For Rails 4.2+, see [below](#active-job) +``` +rails generate delayed_job:active_record +rake db:migrate +``` -Development -=========== -In development mode, if you are using Rails 3.1+, your application code will automatically reload every 100 jobs or when the queue finishes. -You no longer need to restart Delayed Job every time you update your code in development. +### Active Job -Active Job -========== -In Rails 4.2+, set the queue_adapter in config/application.rb +To use Delayed Job with Active Job (Rails 4.2+), set the `queue_adapter` in `config/application.rb`: ```ruby config.active_job.queue_adapter = :delayed_job ``` -See the [rails guide](http://guides.rubyonrails.org/active_job_basics.html#setting-the-backend) for more details. - -Rails 4.x -========= -If you are using the protected_attributes gem, it must appear before delayed_job in your gemfile. If your jobs are failing with: +See the [Rails Guide](http://guides.rubyonrails.org/active_job_basics.html#setting-the-backend) for more details. - ActiveRecord::StatementInvalid: PG::NotNullViolation: ERROR: null value in column "handler" violates not-null constraint +### Protected Attributes -then this is the fix you're looking for. +When using the `protected_attributes` gem, it must appear before `delayed_job` in your `Gemfile`. Otherwise you will see this error: -Upgrading from 2.x to 3.0.0 on Active Record -============================================ -Delayed Job 3.0.0 introduces a new column to the delayed_jobs table. +``` +ActiveRecord::StatementInvalid: PG::NotNullViolation: ERROR: null value in column "handler" violates not-null constraint +``` -If you're upgrading from Delayed Job 2.x, run the upgrade generator to create a migration to add the column. +## Using Delayed Job in your Application - rails generate delayed_job:upgrade - rake db:migrate +### Queuing Jobs -Queuing Jobs -============ Call `.delay.method(params)` on any object and it will be processed in the background. ```ruby @@ -120,7 +107,7 @@ device = Device.new device.deliver ``` -## Parameters +### Parameters `#handle_asynchronously` and `#delay` take these parameters: @@ -165,10 +152,11 @@ class LongTasks end ``` -If you ever want to call a `handle_asynchronously`'d method without Delayed Job, for instance while debugging something at the console, just add `_without_delay` to the method name. For instance, if your original method was `foo`, then call `foo_without_delay`. +To call a `handle_asynchronously`'d method without Delayed Job, add `_without_delay` to the method name. +For instance, if your original method was `foo`, then call `foo_without_delay`. + +### Rails Mailers -Rails Mailers -============= Delayed Job uses special syntax for Rails Mailers. Do not call the `.deliver` method when using `.delay`. @@ -190,108 +178,167 @@ You may also wish to consider using [Active Job with Action Mailer](https://edgeguides.rubyonrails.org/active_job_basics.html#action-mailer) which provides convenient `.deliver_later` syntax that forwards to Delayed Job under-the-hood. -Named Queues -============ -DJ 3 introduces Resque-style named queues while still retaining DJ-style -priority. The goal is to provide a system for grouping tasks to be worked by -separate pools of workers, which may be scaled and controlled individually. +### Queues -Jobs can be assigned to a queue by setting the `queue` option: +Delayed Job supports assigning jobs to named queues. Each queue may be worked by a +separate pool of workers, which may then be scaled and controlled individually. + +Jobs can be assigned to a queue by setting the `:queue` option: ```ruby -object.delay(:queue => 'tracking').method +object.delay(queue: 'tracking').method -Delayed::Job.enqueue job, :queue => 'tracking' +Delayed::Job.enqueue job, queue: 'tracking' -handle_asynchronously :tweet_later, :queue => 'tweets' +handle_asynchronously :tweet_later, queue: 'tweets' ``` -You can configure default priorities for named queues: +You may configure a default priority for each queue (lower number = higher priority): ```ruby Delayed::Worker.queue_attributes = { - high_priority: { priority: -10 }, - low_priority: { priority: 10 } + high_priority_queue: { priority: -10 }, + low_priority_queue: { priority: 10 } } ``` -Configured queue priorities can be overriden by passing priority to the delay method +To override the queue's default priority, pass `:priority` to the delay method: ```ruby -object.delay(:queue => 'high_priority', priority: 0).method +object.delay(queue: 'high_priority_queue', priority: 0).method ``` You can start processes to only work certain queues with the `queue` and `queues` -options defined below. Processes started without specifying a queue will run jobs -from **any** queue. To effectively have a process that runs jobs where a queue is not -specified, set a default queue name with `Delayed::Worker.default_queue_name` and -have the processes run that queue. +options (refer to "Running Jobs" section below.) Processes started without specifying +a queue will run jobs from **any** queue. To effectively have a process that runs +jobs where a queue is not specified, set a default queue name with +`Delayed::Worker.default_queue_name` and have the processes run that queue. -Running Jobs -============ -`script/delayed_job` can be used to manage a background process which will -start working off jobs. +## Running Jobs -To do so, add `gem "daemons"` to your `Gemfile` and make sure you've run `rails -generate delayed_job`. +### Running as a Daemon Process -You can then do the following: +`script/delayed_job` starts a background daemon process which will continually work jobs. - RAILS_ENV=production script/delayed_job start - RAILS_ENV=production script/delayed_job stop +To install this script, add `gem "daemons"` to your `Gemfile` then run `rails generate delayed_job`. - # Runs two workers in separate processes. - RAILS_ENV=production script/delayed_job -n 2 start - RAILS_ENV=production script/delayed_job stop +Then run the `start` command: - # Set the --queue or --queues option to work from a particular queue. - RAILS_ENV=production script/delayed_job --queue=tracking start - RAILS_ENV=production script/delayed_job --queues=mailers,tasks start +``` +# Run a single worker as a background process +RAILS_ENV=production script/delayed_job start - # Use the --pool option to specify a worker pool. You can use this option multiple times to start different numbers of workers for different queues. - # The following command will start 1 worker for the tracking queue, - # 2 workers for the mailers and tasks queues, and 2 workers for any jobs: - RAILS_ENV=production script/delayed_job --pool=tracking --pool=mailers,tasks:2 --pool=*:2 start +# Run 4 workers in separate background child processes +RAILS_ENV=production script/delayed_job -n4 start +``` - # Runs all available jobs and then exits - RAILS_ENV=production script/delayed_job start --exit-on-complete - # or to run in the foreground - RAILS_ENV=production script/delayed_job run --exit-on-complete +Each worker will check the database at least every 5 seconds. -**Rails 4:** *replace script/delayed_job with bin/delayed_job* +### Stopping and Restarting -Workers can be running on any computer, as long as they have access to the -database and their clock is in sync. Keep in mind that each worker will check -the database at least every 5 seconds. +You may use `stop` and `restart` commands. These commands wait for each worker +to finish its current job before proceeding. -You can also invoke `rake jobs:work` which will start working off jobs. You can -cancel the rake task with `CTRL-C`. +``` +# Shutdown all workers and exit +RAILS_ENV=production script/delayed_job stop -If you want to just run all available jobs and exit you can use `rake jobs:workoff` +# Shutdown all workers and start a single new worker process +RAILS_ENV=production script/delayed_job restart -Work off queues by setting the `QUEUE` or `QUEUES` environment variable. +# Shutdown all workers and start 4 new worker processes +RAILS_ENV=production script/delayed_job -n4 restart +``` - QUEUE=tracking rake jobs:work - QUEUES=mailers,tasks rake jobs:work +You must pass the same arguments to `restart` that you used when calling `start`. -Restarting delayed_job -====================== +You may also send `SIGTERM` to stop Delayed Job. -The following syntax will restart delayed jobs: +### Worker Queues and Pools - RAILS_ENV=production script/delayed_job restart +``` +# Set the --queues option to work from a particular queue +RAILS_ENV=production script/delayed_job --queues=mailers,tasks start + +# Use the --pool option to specify a worker pool. You can use this option multiple +# times to start different numbers of workers for different queues. +# The following command will start 1 worker for the tracking queue, +# 2 workers for the mailers and tasks queues, and 2 workers for any jobs: +RAILS_ENV=production script/delayed_job --pool=tracking --pool=mailers,tasks:2 --pool=*:2 start +``` -To restart multiple delayed_job workers: +### Exit On Complete Mode - RAILS_ENV=production script/delayed_job -n2 restart +``` +# Run as a daemon and exit after working all available jobs +RAILS_ENV=production script/delayed_job start --exit-on-complete -**Rails 4:** *replace script/delayed_job with bin/delayed_job* +# or to run in the foreground +RAILS_ENV=production script/delayed_job run --exit-on-complete +``` +### Experimental: Forking Mode +The `script/delayed_job` process (described above) uses the +[Daemons](https://github.com/thuehlinger/daemons) gem which has several +undesirable behaviors when running inside a container (Docker, etc.): -Custom Jobs -=========== -Jobs are simple ruby objects with a method called perform. Any object which responds to perform can be stuffed into the jobs table. Job objects are serialized to yaml so that they can later be resurrected by the job runner. +* The parent process spawns a background child process then exits immediately, +which causes the container to shutdown. +* The worker processes are detached from the parent, which prevents logging to `STDOUT`. + +The `--fork` option solves this by running workers in a foreground process tree. +When using `--fork` the `daemons` gem is not required. + +``` +# Run as a foreground process with 4 worker child processes. +RAILS_ENV=production script/delayed_job --fork -n4 start + +# Forking mode supports queues and pools. +RAILS_ENV=production script/delayed_job --fork --pool=mailers,tasks:2 --pool=*:2 start +``` + +Forking mode does not yet support the `restart`, `stop`, etc. script commands. +Use `SIGTERM` or `SIGINT` to allow workers to finish their current job then gracefully shutdown. + +### Running via Rake + +You may start a worker process using `rake jobs:work`. +You can exit the rake task with `CTRL-C`. + +To run all available jobs and exit, use `rake jobs:workoff`. + +Set the `QUEUES` or `POOLS` environment variable to work specific queues. + +``` +# Start a worker listening to all jobs +rake jobs:work + +# Run 4 worker processes and exit when all jobs are finished +NUM_WORKERS=4 rake jobs:workoff + +# Work all jobs from the "mailers" and "tasks" queues, then exit +NUM_WORKERS=4 QUEUES=mailers,tasks rake jobs:workoff + +# You may also specify POOLS pipe-delimited +POOLS=mailers,tasks:2|tweets:1|*:2 rake jobs:workoff +``` + +Rake uses Forking Mode (see above) under-the-hood. + +### Development + +In `development` environment, Delayed Job will automatically reload +your application code after each 100 jobs or when the queue finishes. +You should not need to restart Delayed Job each time you update your code. + +## Advanced Topics + +### Custom Jobs + +Jobs are simple ruby objects with a method called `perform`. +Any object which responds to `perform` can be enqueued into the jobs table. +Job objects are serialized to YAML so that they can later be marshalled by the job runner. ```ruby NewsletterJob = Struct.new(:text, :emails) do @@ -303,7 +350,7 @@ end Delayed::Job.enqueue NewsletterJob.new('lorem ipsum...', Customers.pluck(:email)) ``` -To set a per-job max attempts that overrides the Delayed::Worker.max_attempts you can define a max_attempts method on the job +To override `Delayed::Worker.max_attempts` per-job, you can define a `max_attempts` instance method in the job class. ```ruby NewsletterJob = Struct.new(:text, :emails) do @@ -317,9 +364,11 @@ NewsletterJob = Struct.new(:text, :emails) do end ``` -To set a per-job max run time that overrides the Delayed::Worker.max_run_time you can define a max_run_time method on the job +To override `Delayed::Worker.max_run_time` per-job, you may define a `max_run_time` +instance method in the job class. -NOTE: this can ONLY be used to set a max_run_time that is lower than Delayed::Worker.max_run_time. Otherwise the lock on the job would expire and another worker would start the working on the in progress job. +**NOTE:** You may only set a `max_run_time` that is lower than `Delayed::Worker.max_run_time`. +Otherwise the lock on the job would expire and a second worker would start working the same in-progress job. ```ruby NewsletterJob = Struct.new(:text, :emails) do @@ -333,7 +382,8 @@ NewsletterJob = Struct.new(:text, :emails) do end ``` -To set a per-job default for destroying failed jobs that overrides the Delayed::Worker.destroy_failed_jobs you can define a destroy_failed_jobs? method on the job +To override `Delayed::Worker.destroy_failed_jobs` per-job, you may define a `destroy_failed_jobs?` +instance method in the job class. ```ruby NewsletterJob = Struct.new(:text, :emails) do @@ -347,7 +397,8 @@ NewsletterJob = Struct.new(:text, :emails) do end ``` -To set a default queue name for a custom job that overrides Delayed::Worker.default_queue_name, you can define a queue_name method on the job +To override `Delayed::Worker.default_queue_name` per-job, you may define a `queue_name` +instance method in the job class. ```ruby NewsletterJob = Struct.new(:text, :emails) do @@ -361,7 +412,8 @@ NewsletterJob = Struct.new(:text, :emails) do end ``` -On error, the job is scheduled again in 5 seconds + N ** 4, where N is the number of attempts. You can define your own `reschedule_at` method to override this default behavior. +On error, the job is scheduled again in 5 seconds + N ** 4, where N is the number of attempts. +You may define a `reschedule_at` instance method to override this default behavior. ```ruby NewsletterJob = Struct.new(:text, :emails) do @@ -370,17 +422,18 @@ NewsletterJob = Struct.new(:text, :emails) do end def reschedule_at(current_time, attempts) - current_time + 5.seconds + current_time + (attempts * 60).seconds end end ``` -Hooks -===== -You can define hooks on your job that will be called at different stages in the process: +### Hooks +You can define hooks on your job that will be called at different stages in the process: -**NOTE:** If you are using ActiveJob these hooks are **not** available to your jobs. You will need to use ActiveJob's callbacks. You can find details here https://guides.rubyonrails.org/active_job_basics.html#callbacks +**NOTE:** If you are using Active Job these hooks are **not** available to your jobs. +You will need to use Active Job's callbacks. +See the [Rails Guides](https://guides.rubyonrails.org/active_job_basics.html#callbacks) for details. ```ruby class ParanoidNewsletterJob < NewsletterJob @@ -414,9 +467,9 @@ class ParanoidNewsletterJob < NewsletterJob end ``` -Gory Details -============ -The library revolves around a delayed_jobs table which looks as follows: +### Gory Details + +The library revolves around a `delayed_jobs` table which looks as follows: ```ruby create_table :delayed_jobs, :force => true do |table| @@ -433,12 +486,12 @@ create_table :delayed_jobs, :force => true do |table| end ``` -On error, the job is scheduled again in 5 seconds + N ** 4, where N is the number of attempts or using the job's defined `reschedule_at` method. +On error, the job is scheduled again in 5 seconds + N ** 4, where N is the number of attempts or using the job's defined `reschedule_at` instance method. -The default `Worker.max_attempts` is 25. After this, the job is either deleted (default), or left in the database with "failed_at" set. +The default `Delayed::Worker.max_attempts` is 25. After this, the job is either deleted (default), or left in the database with "failed_at" set. With the default of 25 attempts, the last retry will be 20 days later, with the last interval being almost 100 hours. -The default `Worker.max_run_time` is 4.hours. If your job takes longer than that, another computer could pick it up. It's up to you to +The default `Delayed::Worker.max_run_time` is 4.hours. If your job takes longer than that, another computer could pick it up. It's up to you to make sure your job doesn't exceed this time. You should set this to the longest time you think the job could take. By default, it will delete failed jobs (and it always deletes successful jobs). If you want to keep failed jobs, set @@ -454,7 +507,7 @@ If no jobs are found, the worker sleeps for the amount of time specified by the It is possible to disable delayed jobs for testing purposes. Set `Delayed::Worker.delay_jobs = false` to execute all jobs realtime. -Or `Delayed::Worker.delay_jobs` can be a Proc that decides whether to execute jobs inline on a per-job basis: +`Delayed::Worker.delay_jobs` may also be a `Proc` that decides whether to execute jobs inline on a per-job basis: ```ruby Delayed::Worker.delay_jobs = ->(job) { @@ -479,12 +532,12 @@ Delayed::Worker.raise_signal_exceptions = :term Delayed::Worker.logger = Logger.new(File.join(Rails.root, 'log', 'delayed_job.log')) ``` -Cleaning up -=========== +### Cleaning up + You can invoke `rake jobs:clear` to delete all jobs in the queue. -Having problems? -================ +### Having problems? + Good places to get help are: * [Google Groups](http://groups.google.com/group/delayed_job) where you can join our mailing list. * [StackOverflow](http://stackoverflow.com/questions/tagged/delayed-job) diff --git a/lib/delayed/command.rb b/lib/delayed/command.rb index 281078242..8cec823e3 100644 --- a/lib/delayed/command.rb +++ b/lib/delayed/command.rb @@ -1,33 +1,54 @@ -unless ENV['RAILS_ENV'] == 'test' - begin - require 'daemons' - rescue LoadError - raise "You need to add gem 'daemons' to your Gemfile if you wish to use it." - end -end require 'fileutils' require 'optparse' require 'pathname' +require 'delayed/pool_parser' +require 'delayed/launcher/daemonized' +require 'delayed/launcher/forking' module Delayed class Command # rubocop:disable ClassLength - attr_accessor :worker_count, :worker_pools - - DIR_PWD = Pathname.new Dir.pwd - - def initialize(args) # rubocop:disable MethodLength + def initialize(args) @options = { :quiet => true, - :pid_dir => "#{root}/tmp/pids", - :log_dir => "#{root}/log" + :worker_count => 1, + :monitor => false } - @worker_count = 1 - @monitor = false + @options[:args] = option_parser.parse!(args) + (@daemon_options || []) + + pools = pool_parser.pools + @options[:pools] = pools unless pools.empty? - opts = OptionParser.new do |opt| - opt.banner = "Usage: #{File.basename($PROGRAM_NAME)} [options] start|stop|restart|run" + validate_options! + end + def launch + launcher.launch + end + + def daemonize + @launch_strategy = :daemon + launch + end + + private + + def launcher + @launcher ||= launcher_class.new(@options) + end + + def launcher_class + case @launch_strategy + when :fork + Delayed::Launcher::Forking + else + Delayed::Launcher::Daemonized + end + end + + def option_parser # rubocop:disable MethodLength, CyclomaticComplexity + @option_parser ||= OptionParser.new do |opt| + opt.banner = "Usage: #{Delayed.program_name} [options] start|stop|restart|run" opt.on('-h', '--help', 'Show this message') do puts opt exit 1 @@ -35,14 +56,24 @@ def initialize(args) # rubocop:disable MethodLength opt.on('-e', '--environment=NAME', 'Specifies the environment to run this delayed jobs under (test/development/production).') do |_e| STDERR.puts 'The -e/--environment option has been deprecated and has no effect. Use RAILS_ENV and see http://github.com/collectiveidea/delayed_job/issues/7' end + opt.on('-d', '--daemon', 'Launch in daemon mode') do |_fork| + @launch_strategy ||= :daemon + end + opt.on('--fork', 'Launch in forking mode') do |_fork| + @launch_strategy ||= :fork + end opt.on('--min-priority N', 'Minimum priority of jobs to run.') do |n| - @options[:min_priority] = n + @options[:min_priority] = Integer(n) rescue nil end opt.on('--max-priority N', 'Maximum priority of jobs to run.') do |n| - @options[:max_priority] = n + @options[:max_priority] = Integer(n) rescue nil + end + opt.on('-n', '--num-workers=workers', 'Number of child workers to spawn') do |n| + @options[:worker_count] = Integer(n) rescue 1 end - opt.on('-n', '--number_of_workers=workers', 'Number of unique workers to spawn') do |worker_count| - @worker_count = worker_count.to_i rescue 1 + opt.on('--number-of-workers=workers', 'Number of child workers to spawn') do |n| + STDERR.puts 'DEPRECATED: Use -n or --num-workers instead of --number-of-workers. This will be removed in the next major version.' + @options[:worker_count] = Integer(n) rescue 1 end opt.on('--pid-dir=DIR', 'Specifies an alternate directory in which to store the process ids.') do |dir| @options[:pid_dir] = dir @@ -50,123 +81,74 @@ def initialize(args) # rubocop:disable MethodLength opt.on('--log-dir=DIR', 'Specifies an alternate directory in which to store the delayed_job log.') do |dir| @options[:log_dir] = dir end + opt.on('-v', '--verbose', 'Output additional logging') do + @options[:quiet] = false + end opt.on('-i', '--identifier=n', 'A numeric identifier for the worker.') do |n| @options[:identifier] = n end opt.on('-m', '--monitor', 'Start monitor process.') do - @monitor = true + @options[:monitor] = true end opt.on('--sleep-delay N', 'Amount of time to sleep when no jobs are found') do |n| - @options[:sleep_delay] = n.to_i + @options[:sleep_delay] = Integer(n) rescue nil end opt.on('--read-ahead N', 'Number of jobs from the queue to consider') do |n| - @options[:read_ahead] = n + @options[:read_ahead] = Integer(n) rescue nil end opt.on('-p', '--prefix NAME', 'String to be prefixed to worker process names') do |prefix| @options[:prefix] = prefix end - opt.on('--queues=queues', 'Specify which queue DJ must look up for jobs') do |queues| + opt.on('--queues=queue1[,queue2]', 'Specify the job queues to work. Comma separated.') do |queues| @options[:queues] = queues.split(',') end - opt.on('--queue=queue', 'Specify which queue DJ must look up for jobs') do |queue| + opt.on('--queue=queue1[,queue2]', 'Specify the job queues to work. Comma separated.') do |queue| @options[:queues] = queue.split(',') end + opt.on('--pools=queue1[,queue2][:worker_count][|...]', 'Specify queues and number of workers for a worker pool. Use pipe to delimit multiple pools.') do |pools| + pool_parser.add(pools) + end opt.on('--pool=queue1[,queue2][:worker_count]', 'Specify queues and number of workers for a worker pool') do |pool| - parse_worker_pool(pool) + pool_parser.add(pool) end - opt.on('--exit-on-complete', 'Exit when no more jobs are available to run. This will exit if all jobs are scheduled to run in the future.') do + opt.on('-x', '--exit-on-complete', 'Exit when no more jobs are available to run. This will exit if all jobs are scheduled to run in the future.') do @options[:exit_on_complete] = true end opt.on('--daemon-options a, b, c', Array, 'options to be passed through to daemons gem') do |daemon_options| @daemon_options = daemon_options end end - @args = opts.parse!(args) + (@daemon_options || []) end - def daemonize # rubocop:disable PerceivedComplexity - dir = @options[:pid_dir] - FileUtils.mkdir_p(dir) unless File.exist?(dir) - - if worker_pools - setup_pools - elsif @options[:identifier] - # rubocop:disable GuardClause - if worker_count > 1 - raise ArgumentError, 'Cannot specify both --number-of-workers and --identifier' - else - run_process("delayed_job.#{@options[:identifier]}", @options) - end - # rubocop:enable GuardClause - else - worker_count.times do |worker_index| - process_name = worker_count == 1 ? 'delayed_job' : "delayed_job.#{worker_index}" - run_process(process_name, @options) - end - end - end - - def setup_pools - worker_index = 0 - @worker_pools.each do |queues, worker_count| - options = @options.merge(:queues => queues) - worker_count.times do - process_name = "delayed_job.#{worker_index}" - run_process(process_name, options) - worker_index += 1 - end - end + def pool_parser + @pool_parser ||= PoolParser.new end - def run_process(process_name, options = {}) - Delayed::Worker.before_fork - Daemons.run_proc(process_name, :dir => options[:pid_dir], :dir_mode => :normal, :monitor => @monitor, :ARGV => @args) do |*_args| - $0 = File.join(options[:prefix], process_name) if @options[:prefix] - run process_name, options - end - end - - def run(worker_name = nil, options = {}) - Dir.chdir(root) - - Delayed::Worker.after_fork - Delayed::Worker.logger ||= Logger.new(File.join(@options[:log_dir], 'delayed_job.log')) - - worker = Delayed::Worker.new(options) - worker.name_prefix = "#{worker_name} " - worker.start - rescue => e - STDERR.puts e.message - STDERR.puts e.backtrace - ::Rails.logger.fatal(e) if rails_logger_defined? - exit_with_error_status - end - - private - - def parse_worker_pool(pool) - @worker_pools ||= [] - - queues, worker_count = pool.split(':') - queues = ['*', '', nil].include?(queues) ? [] : queues.split(',') - worker_count = (worker_count || 1).to_i rescue 1 - @worker_pools << [queues, worker_count] + def validate_options! + validate_worker_count! + validate_identifier! + validate_workers_and_pools! + validate_queues_and_pools! end - def root - @root ||= rails_root_defined? ? ::Rails.root : DIR_PWD + def validate_worker_count! + return unless @options[:worker_count] < 1 + STDERR.puts 'WARNING: --num-workers must be 1 or greater. This will raise an ArgumentError in the next major version.' end - def rails_root_defined? - defined?(::Rails.root) + def validate_identifier! + return unless @options[:identifier] && @options[:worker_count] > 1 + raise ArgumentError, 'Cannot specify both --num-workers and --identifier' end - def rails_logger_defined? - defined?(::Rails.logger) + def validate_workers_and_pools! + return unless @options[:worker_count] > 1 && @options[:pools] + STDERR.puts 'WARNING: Cannot specify both --num-workers and --pool. This will raise an ArgumentError in the next major version.' end - def exit_with_error_status - exit 1 + def validate_queues_and_pools! + return unless @options[:queues] && @options[:pools] + STDERR.puts 'WARNING: Cannot specify both --queues and --pool. This will raise an ArgumentError in the next major version.' end end end diff --git a/lib/delayed/launcher/base.rb b/lib/delayed/launcher/base.rb new file mode 100644 index 000000000..ae9bbfd12 --- /dev/null +++ b/lib/delayed/launcher/base.rb @@ -0,0 +1,101 @@ +module Delayed + module Launcher + class Base + attr_accessor :worker_count, + :pools, + :process_prefix, + :process_identifier + + def initialize(options) + @worker_index = 0 + @worker_count = options.delete(:worker_count) || 1 + @pools = options.delete(:pools) + @pools = nil if @pools == [] + @monitor = options.delete(:monitor) + @process_prefix = options.delete(:prefix) + @process_identifier = options.delete(:identifier) + @args = options.delete(:args) + + @options = options + @options[:pid_dir] ||= "#{Delayed.root}/tmp/pids" + @options[:log_dir] ||= "#{Delayed.root}/log" + end + + def launch + raise NotImplementedError, '#launch must be implemented in subclass' + end + + protected + + def setup_logger + Delayed::Worker.logger ||= Logger.new(File.join(@options[:log_dir], 'delayed_job.log')) + end + + def setup_workers + if pools + setup_pooled_workers + elsif process_identifier + setup_identified_worker + elsif worker_count > 1 + setup_multiple_workers + else + setup_single_worker + end + end + + def setup_pooled_workers + pools.each do |queues, pool_worker_count| + options = @options.merge(:queues => queues) + pool_worker_count.times { add_worker(options) } + end + end + + def setup_multiple_workers + worker_count.times { add_worker(@options) } + end + + def setup_single_worker + raise NotImplementedError, '#setup_single_worker must be implemented in subclass' + end + + def setup_identified_worker + setup_single_worker + end + + def add_worker(_options) + raise NotImplementedError, '#add_worker must be implemented in subclass' + end + + def run_worker(worker_name, options) + Dir.chdir(Delayed.root) + set_process_name(worker_name) + Delayed::Worker.after_fork + setup_logger + worker = Delayed::Worker.new(options) + worker.name_prefix = "#{worker_name} " + worker.start + rescue => e + STDERR.puts e.message + STDERR.puts e.backtrace + logger.fatal(e) + exit_with_error_status + end + + def set_process_name(name) # rubocop:disable AccessorMethodName + $0 = process_prefix ? File.join(process_prefix, name) : name + end + + def get_name(label) + "delayed_job#{".#{label}" if label}" + end + + def exit_with_error_status + exit(1) + end + + def logger + @logger ||= Delayed::Worker.logger || (::Rails.logger if defined?(::Rails.logger)) || Logger.new(STDOUT) + end + end + end +end diff --git a/lib/delayed/launcher/daemonized.rb b/lib/delayed/launcher/daemonized.rb new file mode 100644 index 000000000..240c0a764 --- /dev/null +++ b/lib/delayed/launcher/daemonized.rb @@ -0,0 +1,50 @@ +require 'delayed/launcher/base' + +module Delayed + module Launcher + class Daemonized < Base + def initialize(options) + super + end + + def launch + require_daemons! + create_pid_dir + setup_workers + end + + private + + def require_daemons! + return if ENV['RAILS_ENV'] == 'test' + begin + require 'daemons' + rescue LoadError + raise "Add gem 'daemons' to your Gemfile or use --fork option." + end + end + + def create_pid_dir + dir = @options[:pid_dir] + FileUtils.mkdir_p(dir) unless File.exist?(dir) + end + + def setup_single_worker + run_process(get_name(process_identifier), @options) + end + + def add_worker(options) + process_name = get_name(@worker_index) + run_process(process_name, options) + @worker_index += 1 + end + + def run_process(process_name, options = {}) + Delayed::Worker.before_fork + Daemons.run_proc(process_name, :dir => options[:pid_dir], :dir_mode => :normal, :monitor => @monitor, :ARGV => @args) do |*_args| + run_worker(process_name, options) + end + end + end + end +end diff --git a/lib/delayed/launcher/forking.rb b/lib/delayed/launcher/forking.rb new file mode 100644 index 000000000..cf0779b8b --- /dev/null +++ b/lib/delayed/launcher/forking.rb @@ -0,0 +1,74 @@ +require 'delayed/launcher/base' + +module Delayed + module Launcher + class Forking < Base + def launch + @stop = !!@options[:exit_on_complete] + setup_logger + setup_signals + Delayed::Worker.before_fork if worker_count > 1 + setup_workers + run_loop if worker_count > 1 + on_exit + end + + private + + def setup_signals + Signal.trap('INT') do + Thread.new { logger.info('Received SIGINT. Waiting for workers to finish current job...') } + @stop = true + end + + Signal.trap('TERM') do + Thread.new { logger.info('Received SIGTERM. Waiting for workers to finish current job...') } + @stop = true + end + end + + def workers + @workers ||= {} + end + + def setup_single_worker + set_process_name(get_name(process_identifier)) + Delayed::Worker.new(@options).start + end + + def add_worker(options) + worker_name = get_name(@worker_index) + worker_pid = fork_worker(worker_name, options) + + queues = options[:queues] + queue_msg = " queues=#{queues.join(',')}" unless queues.nil? || queues.empty? + logger.info "Worker #{worker_name} started - pid #{worker_pid}#{queue_msg}" + + workers[worker_pid] = [worker_name, queues] + @worker_index += 1 + end + + def fork_worker(worker_name, options) + fork { run_worker(worker_name, options) } + end + + def run_loop + loop do + worker_pid = Process.wait + worker_name, queues = workers.delete(worker_pid) + logger.info "Worker #{worker_name} exited - #{$?}" + break if @stop && workers.empty? + next if @stop + options = @options.merge(:queues => queues) + add_worker(options) + end + rescue Errno::ECHILD + logger.warn 'No worker processes found' + end + + def on_exit + logger.info "#{get_name(process_identifier)}#{' (parent)' if worker_count > 1} exited gracefully - pid #{$$}" + end + end + end +end diff --git a/lib/delayed/pool_parser.rb b/lib/delayed/pool_parser.rb new file mode 100644 index 000000000..a33e937c3 --- /dev/null +++ b/lib/delayed/pool_parser.rb @@ -0,0 +1,16 @@ +module Delayed + class PoolParser + def add(string) + string.split('|').each do |segment| + queues, worker_count = segment.split(':') + queues = ['*', '', nil].include?(queues) ? [] : queues.split(',') + worker_count = (worker_count || 1).to_i rescue 1 + pools << [queues, worker_count] + end + end + + def pools + @pools ||= [] + end + end +end diff --git a/lib/delayed/tasks.rb b/lib/delayed/tasks.rb index 409ba48f8..8b2f673ef 100644 --- a/lib/delayed/tasks.rb +++ b/lib/delayed/tasks.rb @@ -6,24 +6,41 @@ desc 'Start a delayed_job worker.' task :work => :environment_options do - Delayed::Worker.new(@worker_options).start + Delayed::Launcher::Forking.new(@options).launch end desc 'Start a delayed_job worker and exit when all available jobs are complete.' task :workoff => :environment_options do - Delayed::Worker.new(@worker_options.merge(:exit_on_complete => true)).start + Delayed::Launcher::Forking.new(@options.merge(:exit_on_complete => true)).launch end task :environment_options => :environment do - @worker_options = { + require 'delayed/launcher/forking' + require 'delayed/pool_parser' + + @options = { + :worker_count => ENV['NUM_WORKERS'] || 1, :min_priority => ENV['MIN_PRIORITY'], :max_priority => ENV['MAX_PRIORITY'], - :queues => (ENV['QUEUES'] || ENV['QUEUE'] || '').split(','), :quiet => ENV['QUIET'] } - @worker_options[:sleep_delay] = ENV['SLEEP_DELAY'].to_i if ENV['SLEEP_DELAY'] - @worker_options[:read_ahead] = ENV['READ_AHEAD'].to_i if ENV['READ_AHEAD'] + queues = (ENV['QUEUES'] || ENV['QUEUE'] || '').split(',') + @options[:queues] = queues unless queues.empty? + + pools = PoolParser.new.add(ENV['POOLS'] || ENV['POOL'] || '').pools + @options[:pools] = pools unless pools.empty? + + if ENV['NUM_WORKERS'] && pools + raise ArgumentError, 'Cannot specify both NUM_WORKERS and POOLS' + end + + if queues && pools + raise ArgumentError, 'Cannot specify both QUEUES and POOLS' + end + + @options[:sleep_delay] = Integer(ENV['SLEEP_DELAY']) if ENV['SLEEP_DELAY'] + @options[:read_ahead] = Integer(ENV['READ_AHEAD']) if ENV['READ_AHEAD'] end desc "Exit with error status if any jobs older than max_age seconds haven't been attempted yet." diff --git a/lib/delayed/util.rb b/lib/delayed/util.rb new file mode 100644 index 000000000..8ca6a9814 --- /dev/null +++ b/lib/delayed/util.rb @@ -0,0 +1,11 @@ +require 'fileutils' + +module Delayed + def self.program_name + File.basename($PROGRAM_NAME) + end + + def self.root + defined?(::Rails.root) ? ::Rails.root : Pathname.new(Dir.pwd) + end +end diff --git a/lib/delayed_job.rb b/lib/delayed_job.rb index d38f2edbc..5b6da02e7 100644 --- a/lib/delayed_job.rb +++ b/lib/delayed_job.rb @@ -1,4 +1,5 @@ require 'active_support' +require 'delayed/util' require 'delayed/compatibility' require 'delayed/exceptions' require 'delayed/message_sending' diff --git a/lib/generators/delayed_job/templates/script b/lib/generators/delayed_job/templates/script index edf195985..4a762ea49 100644 --- a/lib/generators/delayed_job/templates/script +++ b/lib/generators/delayed_job/templates/script @@ -2,4 +2,4 @@ require File.expand_path(File.join(File.dirname(__FILE__), '..', 'config', 'environment')) require 'delayed/command' -Delayed::Command.new(ARGV).daemonize +Delayed::Command.new(ARGV).launch diff --git a/spec/daemons.rb b/spec/daemons.rb index 750b130c0..4665afbd0 100644 --- a/spec/daemons.rb +++ b/spec/daemons.rb @@ -1,2 +1,7 @@ # Fake "daemons" file on the spec load path to allow spec/delayed/command_spec.rb # to test the Delayed::Command class without actually adding daemons as a dependency. +module Daemons + def self.run_proc(*_args) + yield if block_given? + end +end diff --git a/spec/delayed/command_spec.rb b/spec/delayed/command_spec.rb index b57cd6efa..092789fea 100644 --- a/spec/delayed/command_spec.rb +++ b/spec/delayed/command_spec.rb @@ -4,176 +4,430 @@ describe Delayed::Command do let(:options) { [] } let(:logger) { double('Logger') } + let(:output_options) { subject.instance_variable_get(:'@options') } + subject { Delayed::Command.new(options) } + + def verify_worker_processes + command = Delayed::Command.new(%w[-d] + options) + allow(FileUtils).to receive(:mkdir_p) + exp.each do |args| + expect(command.send(:launcher)).to receive(:run_process).with(*args).once + end + command.launch + end + + describe 'launch strategy' do + it 'should use daemon mode by default' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to receive(:launch) + expect_any_instance_of(Delayed::Launcher::Forking).to_not receive(:launch) + Delayed::Command.new([]).launch + end - subject { Delayed::Command.new options } + it 'should allow forking mode' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to_not receive(:launch) + expect_any_instance_of(Delayed::Launcher::Forking).to receive(:launch) + Delayed::Command.new(%w[--fork]).launch + end + + it 'using multiple switches should use first one' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to receive(:launch) + expect_any_instance_of(Delayed::Launcher::Forking).to_not receive(:launch) + Delayed::Command.new(%w[-d --fork]).launch + end - before do - allow(Delayed::Worker).to receive(:after_fork) - allow(Dir).to receive(:chdir) - allow(Logger).to receive(:new).and_return(logger) - allow_any_instance_of(Delayed::Worker).to receive(:start) - allow(Delayed::Worker).to receive(:logger=) - allow(Delayed::Worker).to receive(:logger).and_return(nil, logger) + it '#daemonize method should always launch in daemon mode' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to receive(:launch) + expect_any_instance_of(Delayed::Launcher::Forking).to_not receive(:launch) + Delayed::Command.new(%w[--fork]).daemonize + end end - shared_examples_for 'uses --log-dir option' do - context 'when --log-dir is specified' do - let(:options) { ['--log-dir=/custom/log/dir'] } + describe '--min-priority arg' do + context 'not set' do + let(:options) { [] } + it { expect(output_options[:min_priority]).to eq nil } + end - it 'creates the delayed_job.log in the specified directory' do - expect(Logger).to receive(:new).with('/custom/log/dir/delayed_job.log') - subject.run - end + context 'set' do + let(:options) { %w[--min-priority 2] } + it { expect(output_options[:min_priority]).to eq 2 } + end + + context 'not a number' do + let(:options) { %w[--min-priority sponge] } + it { expect(output_options[:min_priority]).to eq nil } end end - describe 'run' do - it 'sets the Delayed::Worker logger' do - expect(Delayed::Worker).to receive(:logger=).with(logger) - subject.run + describe '--max-priority arg' do + context 'not set' do + let(:options) { [] } + it { expect(output_options[:max_priority]).to eq nil } end - context 'when Rails root is defined' do - let(:rails_root) { Pathname.new '/rails/root' } - let(:rails) { double('Rails', :root => rails_root) } + context 'set' do + let(:options) { %w[--max-priority -5] } + it { expect(output_options[:max_priority]).to eq(-5) } + end - before do - stub_const('Rails', rails) - end + context 'not a number' do + let(:options) { %w[--max-priority giraffe] } + it { expect(output_options[:max_priority]).to eq nil } + end + end + + describe '--num-workers arg' do + context 'not set' do + let(:options) { [] } + it { expect(output_options[:worker_count]).to eq 1 } + end + + context '-n set' do + let(:options) { %w[-n 2] } + it { expect(output_options[:worker_count]).to eq 2 } + end + + context '-n not a number' do + let(:options) { %w[-n elephant] } + it { expect(output_options[:worker_count]).to eq 1 } + end + + context '--num-workers set' do + let(:options) { %w[--num-workers 4] } + it { expect(output_options[:worker_count]).to eq 4 } + end - it 'runs the Delayed::Worker process in Rails.root' do - expect(Dir).to receive(:chdir).with(rails_root) - subject.run + context '--num-workers not a number' do + let(:options) { %w[--num_workers hippo] } + it { expect(output_options[:worker_count]).to eq 1 } + end + + context '--number_of_workers set' do + let(:options) { %w[--number_of_workers 5] } + it do + expect(STDERR).to receive(:puts) + expect(output_options[:worker_count]).to eq 5 end + end - context 'when --log-dir is not specified' do - it 'creates the delayed_job.log in Rails.root/log' do - expect(Logger).to receive(:new).with('/rails/root/log/delayed_job.log') - subject.run - end + context '--number-of-workers not a number' do + let(:options) { %w[--number_of_workers rhino] } + it do + expect(STDERR).to receive(:puts) + expect(output_options[:worker_count]).to eq 1 end + end + end - include_examples 'uses --log-dir option' + describe '--pid-dir arg' do + context 'not set' do + let(:options) { [] } + it { expect(output_options[:pid_dir]).to eq nil } end - context 'when Rails root is not defined' do - let(:rails_without_root) { double('Rails') } + context 'set' do + let(:options) { %w[--pid-dir ./foo/bar] } + it { expect(output_options[:pid_dir]).to eq './foo/bar' } + end - before do - stub_const('Rails', rails_without_root) + context 'worker processes' do + let(:options) { %w[--pid-dir ./foo/bar] } + let(:exp) do + [['delayed_job', {:quiet => true, :pid_dir => './foo/bar', :log_dir => './log'}]] end + it { verify_worker_processes } + end + end - it 'runs the Delayed::Worker process in $PWD' do - expect(Dir).to receive(:chdir).with(Delayed::Command::DIR_PWD) - subject.run - end + describe '--log-dir arg' do + context 'not set' do + let(:options) { [] } + it { expect(output_options[:log_dir]).to eq nil } + end + + context 'set' do + let(:options) { %w[--log-dir ./foo/bar] } + it { expect(output_options[:log_dir]).to eq './foo/bar' } + end - context 'when --log-dir is not specified' do - it 'creates the delayed_job.log in $PWD/log' do - expect(Logger).to receive(:new).with("#{Delayed::Command::DIR_PWD}/log/delayed_job.log") - subject.run - end + context 'worker processes' do + let(:options) { %w[--log-dir ./foo/bar] } + let(:exp) do + [['delayed_job', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './foo/bar'}]] end + it { verify_worker_processes } + end + end + + describe '--monitor arg' do + context 'not set' do + let(:options) { [] } + it { expect(output_options[:monitor]).to eq false } + end + + context 'set' do + let(:options) { %w[--monitor] } + it { expect(output_options[:monitor]).to eq true } + end + end + + describe '--sleep-delay arg' do + context 'not set' do + let(:options) { [] } + it { expect(output_options[:sleep_delay]).to eq nil } + end + + context 'set' do + let(:options) { %w[--sleep-delay 5] } + it { expect(output_options[:sleep_delay]).to eq(5) } + end + + context 'not a number' do + let(:options) { %w[--sleep-delay giraffe] } + it { expect(output_options[:sleep_delay]).to eq nil } + end + end + + describe '--read-ahead arg' do + context 'not set' do + let(:options) { [] } + it { expect(output_options[:read_ahead]).to eq nil } + end - include_examples 'uses --log-dir option' + context 'set' do + let(:options) { %w[--read-ahead 5] } + it { expect(output_options[:read_ahead]).to eq(5) } end - context 'when an error is raised' do - let(:test_error) { Class.new(StandardError) } + context 'not a number' do + let(:options) { %w[--read-ahead giraffe] } + it { expect(output_options[:read_ahead]).to eq nil } + end + end + + describe '--identifier arg' do + context 'not set' do + let(:options) { [] } + it { expect(output_options[:identifier]).to eq nil } + end - before do - allow(Delayed::Worker).to receive(:new).and_raise(test_error.new('An error')) - allow(subject).to receive(:exit_with_error_status) - allow(STDERR).to receive(:puts) + context '-i set' do + let(:options) { %w[-i bond] } + it { expect(output_options[:identifier]).to eq 'bond' } + end + + context '--identifier set' do + let(:options) { %w[--identifier goldfinger] } + it { expect(output_options[:identifier]).to eq 'goldfinger' } + end + + context 'worker processes' do + let(:options) { %w[--identifier spectre] } + let(:exp) do + [['delayed_job.spectre', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log'}]] end + it { verify_worker_processes } + end + end + + describe '--prefix arg' do + context 'not set' do + let(:options) { [] } + it { expect(output_options[:prefix]).to eq nil } + end + + context '-p set' do + let(:options) { %w[-p oddjob] } + it { expect(output_options[:prefix]).to eq 'oddjob' } + end + + context '--prefix set' do + let(:options) { %w[--prefix jaws] } + it { expect(output_options[:prefix]).to eq 'jaws' } + end + end + + describe '--exit-on-complete arg' do + context 'not set' do + let(:options) { [] } + it { expect(output_options[:exit_on_complete]).to eq nil } + end + + context '-x set' do + let(:options) { %w[-x] } + it { expect(output_options[:exit_on_complete]).to eq true } + end + + context '--exit-on-complete set' do + let(:options) { %w[--exit-on-complete] } + it { expect(output_options[:exit_on_complete]).to eq true } + end + end + + describe '--verbose arg' do + context 'not set' do + let(:options) { [] } + it { expect(output_options[:quiet]).to eq true } + end + + context '-v set' do + let(:options) { %w[-v] } + it { expect(output_options[:quiet]).to eq false } + end - it 'prints the error message to STDERR' do - expect(STDERR).to receive(:puts).with('An error') - subject.run + context '--verbose set' do + let(:options) { %w[--verbose] } + it { expect(output_options[:quiet]).to eq false } + end + + context 'worker processes' do + let(:options) { %w[-v] } + let(:exp) do + [['delayed_job', {:quiet => false, :pid_dir => './tmp/pids', :log_dir => './log'}]] end + it { verify_worker_processes } + end + end - it 'exits with an error status' do - expect(subject).to receive(:exit_with_error_status) - subject.run + describe '--queues arg' do + context 'not set' do + let(:options) { [] } + it { expect(output_options[:queues]).to eq nil } + end + + context '--queue set' do + let(:options) { %w[--queue mailers] } + it { expect(output_options[:queues]).to eq %w[mailers] } + end + + context '--queues set' do + let(:options) { %w[--queues mailers,tweets] } + it { expect(output_options[:queues]).to eq %w[mailers tweets] } + end + + context 'worker processes' do + let(:options) { %w[--queues mailers,tweets] } + let(:exp) do + [['delayed_job', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers tweets]}]] end + it { verify_worker_processes } + end + end - context 'when Rails logger is not defined' do - let(:rails) { double('Rails') } + describe '--pool arg' do + context 'multiple --pool args set' do + let(:options) { %w[--pool=*:1 --pool=test_queue:4 --pool=mailers,misc:2] } + it 'should parse correctly' do + expect(output_options[:pools]).to eq [ + [[], 1], + [['test_queue'], 4], + [%w[mailers misc], 2] + ] + end + end - before do - stub_const('Rails', rails) - end + context 'pipe-delimited' do + let(:options) { %w[--pools=*:1|test_queue:4 --pool=mailers,misc:2] } + it 'should parse correctly' do + expect(output_options[:pools]).to eq [ + [[], 1], + [['test_queue'], 4], + [%w[mailers misc], 2] + ] + end + end - it 'does not attempt to use the Rails logger' do - subject.run - end + context 'queues specified as *' do + let(:options) { ['--pool=*:4'] } + it 'should use all queues' do + expect(output_options[:pools]).to eq [[[], 4]] end + end - context 'when Rails logger is defined' do - let(:rails_logger) { double('Rails logger') } - let(:rails) { double('Rails', :logger => rails_logger) } + context 'queues not specified' do + let(:options) { ['--pools=:4'] } + it 'should use all queues' do + expect(output_options[:pools]).to eq [[[], 4]] + end + end - before do - stub_const('Rails', rails) - end + context 'worker count not specified' do + let(:options) { ['--pool=mailers'] } + it 'should default to one worker' do + expect(output_options[:pools]).to eq [[['mailers'], 1]] + end + end - it 'logs the error to the Rails logger' do - expect(rails_logger).to receive(:fatal).with(test_error) - subject.run - end + context 'worker processes' do + let(:options) { %w[--pool=*:1 --pool=test_queue:4 --pool=mailers,misc:2] } + let(:exp) do + [ + ['delayed_job.0', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => []}], + ['delayed_job.1', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}], + ['delayed_job.2', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}], + ['delayed_job.3', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}], + ['delayed_job.4', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}], + ['delayed_job.5', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers misc]}], + ['delayed_job.6', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers misc]}] + ] end + it { verify_worker_processes } end end - describe 'parsing --pool argument' do - it 'should parse --pool correctly' do - command = Delayed::Command.new(['--pool=*:1', '--pool=test_queue:4', '--pool=mailers,misc:2']) - - expect(command.worker_pools).to eq [ - [[], 1], - [['test_queue'], 4], - [%w[mailers misc], 2] - ] + describe '--daemon-options arg' do + context 'not set' do + let(:options) { [] } + it { expect(output_options[:exit_on_complete]).to eq nil } end - it 'should allow * or blank to specify any pools' do - command = Delayed::Command.new(['--pool=*:4']) - expect(command.worker_pools).to eq [ - [[], 4], - ] + context 'set' do + let(:options) { %w[--daemon-options a,b,c] } + it { expect(subject.instance_variable_get(:'@daemon_options')).to eq %w[a b c] } + end + end - command = Delayed::Command.new(['--pool=:4']) - expect(command.worker_pools).to eq [ - [[], 4], - ] + describe 'extra args' do + context '--daemon-options not set' do + let(:options) { %w[foo bar baz] } + it { expect(output_options[:args]).to eq %w[foo bar baz] } end - it 'should default to one worker if not specified' do - command = Delayed::Command.new(['--pool=mailers']) - expect(command.worker_pools).to eq [ - [['mailers'], 1], - ] + context '--daemon-options set' do + let(:options) { %w[foo bar --daemon-options a,b,c baz] } + it { expect(output_options[:args]).to eq %w[foo bar baz a b c] } end end - describe 'running worker pools defined by multiple --pool arguments' do - it 'should run the correct worker processes' do - command = Delayed::Command.new(['--pool=*:1', '--pool=test_queue:4', '--pool=mailers,misc:2']) - expect(FileUtils).to receive(:mkdir_p).with('./tmp/pids').once + describe 'validations' do + it 'should launch normally without validations' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to receive(:launch) + expect(STDERR).to_not receive(:puts) + Delayed::Command.new(%w[-d]).launch + end - [ - ['delayed_job.0', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => []}], - ['delayed_job.1', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}], - ['delayed_job.2', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}], - ['delayed_job.3', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}], - ['delayed_job.4', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}], - ['delayed_job.5', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers misc]}], - ['delayed_job.6', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers misc]}] - ].each do |args| - expect(command).to receive(:run_process).with(*args).once - end + it 'raise error num-workers and identifier are present' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to_not receive(:launch) + expect(STDERR).to_not receive(:puts) + expect { Delayed::Command.new(%w[-d --num-workers=2 --identifier=foobar]).launch }.to raise_error(ArgumentError) + end + + it 'warn if num-workers is 0' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to receive(:launch) + expect(STDERR).to receive(:puts) + Delayed::Command.new(%w[-d --num-workers=0]).launch + end + + it 'warn if both queues and pools are present' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to receive(:launch) + expect(STDERR).to receive(:puts) + Delayed::Command.new(%w[-d --queues=mailers --pool=mailers:2]).launch + end - command.daemonize + it 'warn if both num-workers and pools are present' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to receive(:launch) + expect(STDERR).to receive(:puts) + Delayed::Command.new(%w[-d --num-workers=2 --pool=mailers:2]).launch end end end diff --git a/spec/delayed/launcher/daemonized_spec.rb b/spec/delayed/launcher/daemonized_spec.rb new file mode 100644 index 000000000..9f9cd5bf6 --- /dev/null +++ b/spec/delayed/launcher/daemonized_spec.rb @@ -0,0 +1,14 @@ +require_relative '../../helper' +require_relative 'shared_examples' +require 'delayed/launcher/daemonized' + +describe Delayed::Launcher::Daemonized do + def verify_worker_processes + exp.each do |args| + expect(subject).to receive(:run_process).with(*args).once + end + subject.launch + end + + it_behaves_like 'launcher shared examples' +end diff --git a/spec/delayed/launcher/forking_spec.rb b/spec/delayed/launcher/forking_spec.rb new file mode 100644 index 000000000..ef1c57993 --- /dev/null +++ b/spec/delayed/launcher/forking_spec.rb @@ -0,0 +1,22 @@ +require_relative '../../helper' +require_relative 'shared_examples' +require 'delayed/launcher/forking' + +describe Delayed::Launcher::Forking do + before do + @workers = [] + allow(Delayed::Worker).to receive(:logger).and_return(Logger.new(nil)) + allow_any_instance_of(described_class).to receive(:run_loop).and_return(nil) + allow_any_instance_of(described_class).to receive(:fork_worker) { |_, *args| @workers << args } + allow_any_instance_of(described_class).to receive(:setup_single_worker) do + @workers << [subject.send(:get_name, subject.send(:process_identifier)), subject.instance_variable_get(:'@options')] + end + end + + def verify_worker_processes + subject.launch + expect(@workers).to eq(exp) + end + + it_behaves_like 'launcher shared examples' +end diff --git a/spec/delayed/launcher/shared_examples.rb b/spec/delayed/launcher/shared_examples.rb new file mode 100644 index 000000000..ab2db7c20 --- /dev/null +++ b/spec/delayed/launcher/shared_examples.rb @@ -0,0 +1,189 @@ +shared_examples_for 'launcher shared examples' do + let(:options) { {} } + subject { described_class.new(options) } + + describe 'run_worker' do + let(:logger) { double('Logger') } + + before do + allow(Delayed::Worker).to receive(:after_fork) + allow(Dir).to receive(:chdir) + allow(Logger).to receive(:new).and_return(logger) + allow_any_instance_of(Delayed::Worker).to receive(:start) + allow(Delayed::Worker).to receive(:logger=) + allow(Delayed::Worker).to receive(:logger).and_return(nil, logger) + end + + shared_examples_for 'uses log_dir option' do + context 'when log_dir is specified' do + let(:options) { {:log_dir => '/custom/log/dir'} } + + it 'creates the delayed_job.log in the specified directory' do + expect(Logger).to receive(:new).with('/custom/log/dir/delayed_job.log') + subject.send(:run_worker, 'delayed_job.0', {}) + end + end + end + + it 'sets the Delayed::Worker logger' do + expect(Delayed::Worker).to receive(:logger=).with(logger) + subject.send(:run_worker, 'delayed_job.0', {}) + end + + context 'when Rails root is defined' do + let(:rails_root) { Pathname.new '/rails/root' } + let(:rails) { double('Rails', :root => rails_root) } + + before do + stub_const('Rails', rails) + end + + it 'runs the Delayed::Worker process in Rails.root' do + expect(Dir).to receive(:chdir).with(rails_root) + subject.send(:run_worker, 'delayed_job.0', {}) + end + + context 'when --log-dir is not specified' do + it 'creates the delayed_job.log in Rails.root/log' do + expect(Logger).to receive(:new).with('/rails/root/log/delayed_job.log') + subject.send(:run_worker, 'delayed_job.0', {}) + end + end + + include_examples 'uses log_dir option' + end + + context 'when Rails root is not defined' do + let(:rails_without_root) { double('Rails') } + + before do + stub_const('Rails', rails_without_root) + end + + it 'runs the Delayed::Worker process in $PWD' do + expect(Dir).to receive(:chdir).with(Pathname.new(Dir.pwd)) + subject.send(:run_worker, 'delayed_job.0', {}) + end + + context 'when --log-dir is not specified' do + it 'creates the delayed_job.log in $PWD/log' do + expect(Logger).to receive(:new).with("#{Pathname.new(Dir.pwd)}/log/delayed_job.log") + subject.send(:run_worker, 'delayed_job.0', {}) + end + end + + include_examples 'uses log_dir option' + end + + context 'when an error is raised' do + let(:test_error) { Class.new(StandardError) } + + before do + allow(Delayed::Worker).to receive(:new).and_raise(test_error.new('An error')) + allow(subject).to receive(:exit_with_error_status) + allow(STDERR).to receive(:puts) + end + + context 'using Delayed::Worker logger' do + before do + expect(logger).to receive(:fatal).with(test_error) + end + + it 'prints the error message to STDERR' do + expect(STDERR).to receive(:puts).with('An error') + subject.send(:run_worker, 'delayed_job.0', {}) + end + + it 'exits with an error status' do + expect(subject).to receive(:exit_with_error_status) + subject.send(:run_worker, 'delayed_job.0', {}) + end + + context 'when Rails logger is not defined' do + let(:rails) { double('Rails') } + + before do + stub_const('Rails', rails) + end + + it 'does not attempt to use the Rails logger' do + subject.send(:run_worker, 'delayed_job.0', {}) + end + end + end + + context 'when Rails logger is defined' do + let(:rails_logger) { double('Rails logger') } + let(:rails) { double('Rails', :logger => rails_logger) } + + before do + stub_const('Rails', rails) + allow(Delayed::Worker).to receive(:logger).and_return(nil) + end + + it 'logs the error to the Rails logger' do + expect(rails_logger).to receive(:fatal).with(test_error) + subject.send(:run_worker, 'delayed_job.0', {}) + end + end + end + end + + describe 'spawning workers' do + context 'no args' do + let(:options) { {} } + let(:exp) do + [['delayed_job', {:pid_dir => './tmp/pids', :log_dir => './log'}]] + end + it { verify_worker_processes } + end + + context ':pools arg' do + let(:options) { {:pools => [[[], 1], [['test_queue'], 4], [%w[mailers misc], 2]]} } + let(:exp) do + [ + ['delayed_job.0', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => []}], + ['delayed_job.1', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}], + ['delayed_job.2', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}], + ['delayed_job.3', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}], + ['delayed_job.4', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}], + ['delayed_job.5', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers misc]}], + ['delayed_job.6', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers misc]}] + ] + end + it { verify_worker_processes } + end + + context ':queues and :worker_count args' do + let(:options) { {:queues => %w[mailers misc], :worker_count => 4} } + let(:exp) do + [ + ['delayed_job.0', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers misc]}], + ['delayed_job.1', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers misc]}], + ['delayed_job.2', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers misc]}], + ['delayed_job.3', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers misc]}] + ] + end + it { verify_worker_processes } + end + + context ':pid_dir and :log_dir args' do + let(:options) { {:pid_dir => './foo/bar', :log_dir => './baz/qux', :worker_count => 2} } + let(:exp) do + [ + ['delayed_job.0', {:pid_dir => './foo/bar', :log_dir => './baz/qux'}], + ['delayed_job.1', {:pid_dir => './foo/bar', :log_dir => './baz/qux'}], + ] + end + it { verify_worker_processes } + end + + context ':identifier and other args' do + let(:options) { {:monitor => true, :prefix => 'my_prefix', :identifier => 'my_identifier', :worker_count => 2, :args => {:foo => 'bar', :baz => 'qux'}} } + let(:exp) do + [['delayed_job.my_identifier', {:pid_dir => './tmp/pids', :log_dir => './log'}]] + end + it { verify_worker_processes } + end + end +end diff --git a/spec/delayed/pool_parser_spec.rb b/spec/delayed/pool_parser_spec.rb new file mode 100644 index 000000000..f677682b1 --- /dev/null +++ b/spec/delayed/pool_parser_spec.rb @@ -0,0 +1,49 @@ +require 'helper' +require 'delayed/pool_parser' + +describe Delayed::PoolParser do + subject { described_class.new } + + describe '#add and #pools' do + it 'parsing pools' do + %w[*:1 test_queue:4 mailers,misc:2].each do |str| + subject.add(str) + end + + expect(subject.pools).to eq [ + [[], 1], + [['test_queue'], 4], + [%w[mailers misc], 2] + ] + end + + it 'should allow pipe delimiter' do + %w[*:1|test_queue:4 mailers,misc:2|foo,bar:3|baz:4].each do |str| + subject.add(str) + end + + expect(subject.pools).to eq [ + [[], 1], + [['test_queue'], 4], + [%w[mailers misc], 2], + [%w[foo bar], 3], + [['baz'], 4], + ] + end + + it 'should allow * to specify any pools' do + subject.add('*:4') + expect(subject.pools).to eq [[[], 4]] + end + + it 'should allow blank to specify any pools' do + subject.add(':4') + expect(subject.pools).to eq [[[], 4]] + end + + it 'should default to one worker if not specified' do + subject.add('mailers') + expect(subject.pools).to eq [[['mailers'], 1]] + end + end +end From 3401c24191024e55d7f8999dea030dace6ef750e Mon Sep 17 00:00:00 2001 From: Johnny Shields Date: Sun, 4 Apr 2021 19:57:29 +0900 Subject: [PATCH 02/20] Cleanup behavior for Command#launch and #daemonize --- lib/delayed/command.rb | 17 ++++--- lib/generators/delayed_job/templates/script | 2 +- spec/delayed/command_spec.rb | 56 ++++++++++++++++++--- 3 files changed, 60 insertions(+), 15 deletions(-) diff --git a/lib/delayed/command.rb b/lib/delayed/command.rb index 8cec823e3..9c3aabfa5 100644 --- a/lib/delayed/command.rb +++ b/lib/delayed/command.rb @@ -27,7 +27,7 @@ def launch end def daemonize - @launch_strategy = :daemon + @launch_strategy ||= :daemon launch end @@ -39,10 +39,10 @@ def launcher def launcher_class case @launch_strategy - when :fork - Delayed::Launcher::Forking - else + when :daemon Delayed::Launcher::Daemonized + else + Delayed::Launcher::Forking end end @@ -56,9 +56,13 @@ def option_parser # rubocop:disable MethodLength, CyclomaticComplexity opt.on('-e', '--environment=NAME', 'Specifies the environment to run this delayed jobs under (test/development/production).') do |_e| STDERR.puts 'The -e/--environment option has been deprecated and has no effect. Use RAILS_ENV and see http://github.com/collectiveidea/delayed_job/issues/7' end - opt.on('-d', '--daemon', 'Launch in daemon mode') do |_fork| + opt.on('-d', '--daemonize', 'Launch in daemon mode') do |_fork| @launch_strategy ||= :daemon end + opt.on('--daemon-options a, b, c', Array, 'options to be passed through to daemons gem') do |daemon_options| + @launch_strategy ||= :daemon + @daemon_options = daemon_options + end opt.on('--fork', 'Launch in forking mode') do |_fork| @launch_strategy ||= :fork end @@ -114,9 +118,6 @@ def option_parser # rubocop:disable MethodLength, CyclomaticComplexity opt.on('-x', '--exit-on-complete', 'Exit when no more jobs are available to run. This will exit if all jobs are scheduled to run in the future.') do @options[:exit_on_complete] = true end - opt.on('--daemon-options a, b, c', Array, 'options to be passed through to daemons gem') do |daemon_options| - @daemon_options = daemon_options - end end end diff --git a/lib/generators/delayed_job/templates/script b/lib/generators/delayed_job/templates/script index 4a762ea49..edf195985 100644 --- a/lib/generators/delayed_job/templates/script +++ b/lib/generators/delayed_job/templates/script @@ -2,4 +2,4 @@ require File.expand_path(File.join(File.dirname(__FILE__), '..', 'config', 'environment')) require 'delayed/command' -Delayed::Command.new(ARGV).launch +Delayed::Command.new(ARGV).daemonize diff --git a/spec/delayed/command_spec.rb b/spec/delayed/command_spec.rb index 092789fea..15d2f9160 100644 --- a/spec/delayed/command_spec.rb +++ b/spec/delayed/command_spec.rb @@ -16,30 +16,68 @@ def verify_worker_processes command.launch end - describe 'launch strategy' do - it 'should use daemon mode by default' do - expect_any_instance_of(Delayed::Launcher::Daemonized).to receive(:launch) - expect_any_instance_of(Delayed::Launcher::Forking).to_not receive(:launch) + describe '#launch' do + it 'should use fork mode by default' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to_not receive(:launch) + expect_any_instance_of(Delayed::Launcher::Forking).to receive(:launch) Delayed::Command.new([]).launch end - it 'should allow forking mode' do + it 'should use fork mode if --fork set' do expect_any_instance_of(Delayed::Launcher::Daemonized).to_not receive(:launch) expect_any_instance_of(Delayed::Launcher::Forking).to receive(:launch) Delayed::Command.new(%w[--fork]).launch end + it 'should use daemon mode if -d set' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to receive(:launch) + expect_any_instance_of(Delayed::Launcher::Forking).to_not receive(:launch) + Delayed::Command.new(%w[-d]).launch + end + + it 'should use daemon mode if --daemonize set' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to receive(:launch) + expect_any_instance_of(Delayed::Launcher::Forking).to_not receive(:launch) + Delayed::Command.new(%w[--daemonize]).launch + end + it 'using multiple switches should use first one' do expect_any_instance_of(Delayed::Launcher::Daemonized).to receive(:launch) expect_any_instance_of(Delayed::Launcher::Forking).to_not receive(:launch) Delayed::Command.new(%w[-d --fork]).launch end + end - it '#daemonize method should always launch in daemon mode' do + describe '#daemonize' do + it 'should use daemon mode by default' do expect_any_instance_of(Delayed::Launcher::Daemonized).to receive(:launch) expect_any_instance_of(Delayed::Launcher::Forking).to_not receive(:launch) + Delayed::Command.new([]).daemonize + end + + it 'should use fork mode if --fork set' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to_not receive(:launch) + expect_any_instance_of(Delayed::Launcher::Forking).to receive(:launch) Delayed::Command.new(%w[--fork]).daemonize end + + it 'should use daemon mode if -d set' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to receive(:launch) + expect_any_instance_of(Delayed::Launcher::Forking).to_not receive(:launch) + Delayed::Command.new(%w[-d]).daemonize + end + + it 'should use daemon mode if --daemonize set' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to receive(:launch) + expect_any_instance_of(Delayed::Launcher::Forking).to_not receive(:launch) + Delayed::Command.new(%w[--daemonize]).daemonize + end + + it 'using multiple switches should use first one' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to_not receive(:launch) + expect_any_instance_of(Delayed::Launcher::Forking).to receive(:launch) + Delayed::Command.new(%w[--fork -d]).daemonize + end end describe '--min-priority arg' do @@ -379,11 +417,17 @@ def verify_worker_processes context 'not set' do let(:options) { [] } it { expect(output_options[:exit_on_complete]).to eq nil } + it 'does not affect launch_strategy' do + expect(subject.instance_variable_get(:'@launch_strategy')).to eq nil + end end context 'set' do let(:options) { %w[--daemon-options a,b,c] } it { expect(subject.instance_variable_get(:'@daemon_options')).to eq %w[a b c] } + it 'coerces launch_strategy to :daemon' do + expect(subject.instance_variable_get(:'@launch_strategy')).to eq :daemon + end end end From d328cb85e2b9d3a126885295e252558ff393d03d Mon Sep 17 00:00:00 2001 From: Johnny Shields Date: Mon, 5 Apr 2021 10:40:46 +0900 Subject: [PATCH 03/20] Prevent specs from creating stray directories --- spec/delayed/launcher/shared_examples.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/spec/delayed/launcher/shared_examples.rb b/spec/delayed/launcher/shared_examples.rb index ab2db7c20..3307dc38b 100644 --- a/spec/delayed/launcher/shared_examples.rb +++ b/spec/delayed/launcher/shared_examples.rb @@ -1,6 +1,7 @@ shared_examples_for 'launcher shared examples' do let(:options) { {} } subject { described_class.new(options) } + before { allow(FileUtils).to receive(:mkdir_p) } describe 'run_worker' do let(:logger) { double('Logger') } From dd0db79953d00a77f800a110528d1715badcac25 Mon Sep 17 00:00:00 2001 From: Johnny Shields Date: Tue, 6 Apr 2021 00:28:27 +0900 Subject: [PATCH 04/20] Fix rake tasks --- lib/delayed/pool_parser.rb | 1 + lib/delayed/tasks.rb | 6 +++--- spec/delayed/pool_parser_spec.rb | 4 ++++ 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/lib/delayed/pool_parser.rb b/lib/delayed/pool_parser.rb index a33e937c3..38ba7443e 100644 --- a/lib/delayed/pool_parser.rb +++ b/lib/delayed/pool_parser.rb @@ -7,6 +7,7 @@ def add(string) worker_count = (worker_count || 1).to_i rescue 1 pools << [queues, worker_count] end + self end def pools diff --git a/lib/delayed/tasks.rb b/lib/delayed/tasks.rb index 8b2f673ef..f6f9883db 100644 --- a/lib/delayed/tasks.rb +++ b/lib/delayed/tasks.rb @@ -28,14 +28,14 @@ queues = (ENV['QUEUES'] || ENV['QUEUE'] || '').split(',') @options[:queues] = queues unless queues.empty? - pools = PoolParser.new.add(ENV['POOLS'] || ENV['POOL'] || '').pools + pools = Delayed::PoolParser.new.add(ENV['POOLS'] || ENV['POOL'] || '').pools @options[:pools] = pools unless pools.empty? - if ENV['NUM_WORKERS'] && pools + if ENV['NUM_WORKERS'] && @options[:pools] raise ArgumentError, 'Cannot specify both NUM_WORKERS and POOLS' end - if queues && pools + if @options[:queues] && @options[:pools] raise ArgumentError, 'Cannot specify both QUEUES and POOLS' end diff --git a/spec/delayed/pool_parser_spec.rb b/spec/delayed/pool_parser_spec.rb index f677682b1..43435e90d 100644 --- a/spec/delayed/pool_parser_spec.rb +++ b/spec/delayed/pool_parser_spec.rb @@ -45,5 +45,9 @@ subject.add('mailers') expect(subject.pools).to eq [[['mailers'], 1]] end + + it '#add should return self' do + expect(subject.add('mailers:2|*:4')).to eq subject + end end end From 40d9f0429fc91ca9f57eed8d0bed711380aadbd8 Mon Sep 17 00:00:00 2001 From: Johnny Shields Date: Tue, 6 Apr 2021 01:19:54 +0900 Subject: [PATCH 05/20] More fixes to rake command parsing --- lib/delayed/tasks.rb | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/lib/delayed/tasks.rb b/lib/delayed/tasks.rb index f6f9883db..de07e8504 100644 --- a/lib/delayed/tasks.rb +++ b/lib/delayed/tasks.rb @@ -19,11 +19,13 @@ require 'delayed/pool_parser' @options = { - :worker_count => ENV['NUM_WORKERS'] || 1, - :min_priority => ENV['MIN_PRIORITY'], - :max_priority => ENV['MAX_PRIORITY'], - :quiet => ENV['QUIET'] + :worker_count => ENV['NUM_WORKERS'] ? Integer(ENV['NUM_WORKERS']) : 1, + :quiet => ENV['QUIET'] ? ENV['QUIET'] !~ /\A(?:0|f|false)\z/i : true } + @options[:min_priority] = Integer(ENV['MIN_PRIORITY']) if ENV['MIN_PRIORITY'] + @options[:max_priority] = Integer(ENV['MAX_PRIORITY']) if ENV['MAX_PRIORITY'] + @options[:sleep_delay] = Integer(ENV['SLEEP_DELAY']) if ENV['SLEEP_DELAY'] + @options[:read_ahead] = Integer(ENV['READ_AHEAD']) if ENV['READ_AHEAD'] queues = (ENV['QUEUES'] || ENV['QUEUE'] || '').split(',') @options[:queues] = queues unless queues.empty? @@ -38,9 +40,6 @@ if @options[:queues] && @options[:pools] raise ArgumentError, 'Cannot specify both QUEUES and POOLS' end - - @options[:sleep_delay] = Integer(ENV['SLEEP_DELAY']) if ENV['SLEEP_DELAY'] - @options[:read_ahead] = Integer(ENV['READ_AHEAD']) if ENV['READ_AHEAD'] end desc "Exit with error status if any jobs older than max_age seconds haven't been attempted yet." From 0a7145fe0d75fbcb5cf64a6d4c77fb7c73d9ca16 Mon Sep 17 00:00:00 2001 From: Johnny Shields Date: Tue, 6 Apr 2021 02:57:52 +0900 Subject: [PATCH 06/20] - Add tests for Rake tasks - Add validation of min vs. max priority --- lib/delayed/command.rb | 6 + lib/delayed/railtie.rb | 2 +- lib/delayed/{tasks.rb => tasks.rake} | 10 +- spec/delayed/command_spec.rb | 18 ++ spec/delayed/tasks_spec.rb | 266 +++++++++++++++++++++++++++ 5 files changed, 300 insertions(+), 2 deletions(-) rename lib/delayed/{tasks.rb => tasks.rake} (83%) create mode 100644 spec/delayed/tasks_spec.rb diff --git a/lib/delayed/command.rb b/lib/delayed/command.rb index 9c3aabfa5..144405205 100644 --- a/lib/delayed/command.rb +++ b/lib/delayed/command.rb @@ -127,6 +127,7 @@ def pool_parser def validate_options! validate_worker_count! + validate_priority! validate_identifier! validate_workers_and_pools! validate_queues_and_pools! @@ -137,6 +138,11 @@ def validate_worker_count! STDERR.puts 'WARNING: --num-workers must be 1 or greater. This will raise an ArgumentError in the next major version.' end + def validate_priority! + return unless @options[:min_priority] && @options[:max_priority] && @options[:min_priority] > @options[:max_priority] + STDERR.puts 'WARNING: --min-priority must be less than or equal to --max-priority. This will raise an ArgumentError in the next major version.' + end + def validate_identifier! return unless @options[:identifier] && @options[:worker_count] > 1 raise ArgumentError, 'Cannot specify both --num-workers and --identifier' diff --git a/lib/delayed/railtie.rb b/lib/delayed/railtie.rb index a50ca1b4b..f92948d54 100644 --- a/lib/delayed/railtie.rb +++ b/lib/delayed/railtie.rb @@ -12,7 +12,7 @@ class Railtie < Rails::Railtie end rake_tasks do - load 'delayed/tasks.rb' + load 'delayed/tasks.rake' end end end diff --git a/lib/delayed/tasks.rb b/lib/delayed/tasks.rake similarity index 83% rename from lib/delayed/tasks.rb rename to lib/delayed/tasks.rake index de07e8504..ff2e3d1c4 100644 --- a/lib/delayed/tasks.rb +++ b/lib/delayed/tasks.rake @@ -20,7 +20,7 @@ @options = { :worker_count => ENV['NUM_WORKERS'] ? Integer(ENV['NUM_WORKERS']) : 1, - :quiet => ENV['QUIET'] ? ENV['QUIET'] !~ /\A(?:0|f|false)\z/i : true + :quiet => !!ENV['QUIET'] && ENV['QUIET'] !~ /\A(?:0|f|false)\z/i } @options[:min_priority] = Integer(ENV['MIN_PRIORITY']) if ENV['MIN_PRIORITY'] @options[:max_priority] = Integer(ENV['MAX_PRIORITY']) if ENV['MAX_PRIORITY'] @@ -33,6 +33,14 @@ pools = Delayed::PoolParser.new.add(ENV['POOLS'] || ENV['POOL'] || '').pools @options[:pools] = pools unless pools.empty? + if @options[:worker_count] < 1 + raise ArgumentError, 'NUM_WORKERS must be 1 or greater' + end + + if @options[:min_priority] && @options[:max_priority] && @options[:min_priority] > @options[:max_priority] + raise ArgumentError, 'MIN_PRIORITY must be less than or equal to MAX_PRIORITY' + end + if ENV['NUM_WORKERS'] && @options[:pools] raise ArgumentError, 'Cannot specify both NUM_WORKERS and POOLS' end diff --git a/spec/delayed/command_spec.rb b/spec/delayed/command_spec.rb index 15d2f9160..b4a8039b5 100644 --- a/spec/delayed/command_spec.rb +++ b/spec/delayed/command_spec.rb @@ -462,6 +462,24 @@ def verify_worker_processes Delayed::Command.new(%w[-d --num-workers=0]).launch end + it 'not warn if min-priority is less than max-priority' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to receive(:launch) + expect(STDERR).to_not receive(:puts) + Delayed::Command.new(%w[-d --min-priority=-5 --max-priority=0]).launch + end + + it 'not warn if min-priority equals max-priority' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to receive(:launch) + expect(STDERR).to_not receive(:puts) + Delayed::Command.new(%w[-d --min-priority=-5 --max-priority=-5]).launch + end + + it 'warn if min-priority is greater than max-priority' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to receive(:launch) + expect(STDERR).to receive(:puts) + Delayed::Command.new(%w[-d --min-priority=-4 --max-priority=-5]).launch + end + it 'warn if both queues and pools are present' do expect_any_instance_of(Delayed::Launcher::Daemonized).to receive(:launch) expect(STDERR).to receive(:puts) diff --git a/spec/delayed/tasks_spec.rb b/spec/delayed/tasks_spec.rb new file mode 100644 index 000000000..653b9edf2 --- /dev/null +++ b/spec/delayed/tasks_spec.rb @@ -0,0 +1,266 @@ +require 'helper' +require 'rake' +require 'delayed/launcher/forking' + +describe 'Rake tasks' do + let(:env) { {} } + + before do + stub_const('ENV', env) + Rake.application = Rake::Application.new + Rake.application.rake_require('delayed/tasks', $LOAD_PATH, []) + Rake::Task.define_task(:environment) + end + + describe 'jobs:clear' do + it do + expect(Delayed::Job).to receive(:delete_all) + Rake.application.invoke_task 'jobs:clear' + end + end + + shared_examples_for 'work task' do + let(:expect_success) do + expect(Delayed::Launcher::Forking).to receive(:new).with(default_args.merge(exp_args)).and_call_original + expect_any_instance_of(Delayed::Launcher::Forking).to receive(:launch) + end + + context 'default case' do + let(:exp_args) { {:quiet => false, :worker_count => 1} } + it do + expect_success + run_task + end + end + + context 'MIN_PRIORITY=-2' do + let(:env) { {'MIN_PRIORITY' => '-2'} } + let(:exp_args) { {:min_priority => -2, :quiet => false, :worker_count => 1} } + it do + expect_success + run_task + end + end + + context 'MIN_PRIORITY not a number' do + let(:env) { {'MIN_PRIORITY' => 'foo'} } + it { expect { run_task }.to raise_error(ArgumentError) } + end + + context 'MAX_PRIORITY=-5' do + let(:env) { {'MAX_PRIORITY' => '-5'} } + let(:exp_args) { {:max_priority => -5, :quiet => false, :worker_count => 1} } + it do + expect_success + run_task + end + end + + context 'MAX_PRIORITY not a number' do + let(:env) { {'MAX_PRIORITY' => 'foo'} } + it { expect { run_task }.to raise_error(ArgumentError) } + end + + context 'NUM_WORKERS=5' do + let(:env) { {'NUM_WORKERS' => '5'} } + let(:exp_args) { {:quiet => false, :worker_count => 5} } + it do + expect_success + run_task + end + end + + context 'NUM_WORKERS not a number' do + let(:env) { {'NUM_WORKERS' => 'foo'} } + it { expect { run_task }.to raise_error(ArgumentError) } + end + + context 'SLEEP_DELAY=5' do + let(:env) { {'SLEEP_DELAY' => '5'} } + let(:exp_args) { {:quiet => false, :sleep_delay => 5, :worker_count => 1} } + it do + expect_success + run_task + end + end + + context 'SLEEP_DELAY not a number' do + let(:env) { {'SLEEP_DELAY' => 'foo'} } + it { expect { run_task }.to raise_error(ArgumentError) } + end + + context 'READ_AHEAD=5' do + let(:env) { {'READ_AHEAD' => '5'} } + let(:exp_args) { {:quiet => false, :read_ahead => 5, :worker_count => 1} } + it do + expect_success + run_task + end + end + + context 'READ_AHEAD not a number' do + let(:env) { {'READ_AHEAD' => 'foo'} } + it { expect { run_task }.to raise_error(ArgumentError) } + end + + context 'QUIET=foo' do + let(:env) { {'QUIET' => 'foo'} } + let(:exp_args) { {:quiet => true, :worker_count => 1} } + it do + expect_success + run_task + end + end + + context 'QUIET=0' do + let(:env) { {'QUIET' => '0'} } + let(:exp_args) { {:quiet => false, :worker_count => 1} } + it do + expect_success + run_task + end + end + + context 'QUIET=f' do + let(:env) { {'QUIET' => 'f'} } + let(:exp_args) { {:quiet => false, :worker_count => 1} } + it do + expect_success + run_task + end + end + + context 'QUIET=FaLsE' do + let(:env) { {'QUIET' => 'FaLsE'} } + let(:exp_args) { {:quiet => false, :worker_count => 1} } + it do + expect_success + run_task + end + end + + context 'QUEUE=mailers' do + let(:env) { {'QUEUE' => 'mailers'} } + let(:exp_args) { {:queues => %w[mailers], :quiet => false, :worker_count => 1} } + it do + expect_success + run_task + end + end + + context 'QUEUE=mailers,tweets,payments' do + let(:env) { {'QUEUE' => 'mailers,tweets,payments'} } + let(:exp_args) { {:queues => %w[mailers tweets payments], :quiet => false, :worker_count => 1} } + it do + expect_success + run_task + end + end + + context 'QUEUES=mailers' do + let(:env) { {'QUEUES' => 'mailers'} } + let(:exp_args) { {:queues => %w[mailers], :quiet => false, :worker_count => 1} } + it do + expect_success + run_task + end + end + + context 'QUEUES=mailers,tweets,payments' do + let(:env) { {'QUEUES' => 'mailers,tweets,payments'} } + let(:exp_args) { {:queues => %w[mailers tweets payments], :quiet => false, :worker_count => 1} } + it do + expect_success + run_task + end + end + + context 'POOL=*:1' do + let(:env) { {'POOL' => '*:1'} } + let(:exp_args) { {:pools => [[[], 1]], :quiet => false, :worker_count => 1} } + it do + expect_success + run_task + end + end + + context 'POOL=*:1|test_queue:4|mailers,misc:2' do + let(:env) { {'POOL' => '*:1|test_queue:4|mailers,misc:2'} } + let(:exp_args) { {:pools => [[[], 1], [%w[test_queue], 4], [%w[mailers misc], 2]], :quiet => false, :worker_count => 1} } + it do + expect_success + run_task + end + end + + context 'POOLS=*:1' do + let(:env) { {'POOL' => '*:1'} } + let(:exp_args) { {:pools => [[[], 1]], :quiet => false, :worker_count => 1} } + it do + expect_success + run_task + end + end + + context 'POOLS=*:1|test_queue:4|mailers,misc:2' do + let(:env) { {'POOL' => '*:1|test_queue:4|mailers,misc:2'} } + let(:exp_args) { {:pools => [[[], 1], [%w[test_queue], 4], [%w[mailers misc], 2]], :quiet => false, :worker_count => 1} } + it do + expect_success + run_task + end + end + + context 'NUM_WORKERS=0' do + let(:env) { {'NUM_WORKERS' => '0'} } + it { expect { run_task }.to raise_error(ArgumentError) } + end + + context 'MIN_PRIORITY less than MAX_PRIORITY' do + let(:env) { {'MIN_PRIORITY' => '-5', 'MAX_PRIORITY' => '0'} } + let(:exp_args) { {:max_priority => 0, :min_priority => -5, :quiet => false, :worker_count => 1} } + it do + expect_success + run_task + end + end + + context 'MIN_PRIORITY equal to MAX_PRIORITY' do + let(:env) { {'MIN_PRIORITY' => '-5', 'MAX_PRIORITY' => '-5'} } + let(:exp_args) { {:max_priority => -5, :min_priority => -5, :quiet => false, :worker_count => 1} } + it do + expect_success + run_task + end + end + + context 'MIN_PRIORITY greater than MAX_PRIORITY' do + let(:env) { {'MIN_PRIORITY' => '5', 'MAX_PRIORITY' => '0'} } + it { expect { run_task }.to raise_error(ArgumentError) } + end + + context 'NUM_WORKERS and POOLS' do + let(:env) { {'NUM_WORKERS' => '5', 'POOLS' => 'mailers:2'} } + it { expect { run_task }.to raise_error(ArgumentError) } + end + + context 'QUEUES and POOLS' do + let(:env) { {'QUEUES' => 'mailers', 'POOLS' => 'mailers:2'} } + it { expect { run_task }.to raise_error(ArgumentError) } + end + end + + describe 'jobs:work task' do + let(:run_task) { Rake.application.invoke_task 'jobs:work' } + let(:default_args) { {} } + + it_behaves_like 'work task' + end + + describe 'jobs:workoff' do + let(:run_task) { Rake.application.invoke_task 'jobs:workoff' } + let(:default_args) { {:exit_on_complete => true} } + + it_behaves_like 'work task' + end +end From 9d0d3fecb2ea072ae68d8bfacf3f81d27176a713 Mon Sep 17 00:00:00 2001 From: Johnny Shields Date: Tue, 6 Apr 2021 03:05:16 +0900 Subject: [PATCH 07/20] Make life easy and require all files at boot time --- lib/delayed/command.rb | 3 --- lib/delayed/launcher/daemonized.rb | 2 -- lib/delayed/launcher/forking.rb | 2 -- lib/delayed/tasks.rake | 3 --- lib/delayed_job.rb | 5 +++++ lib/generators/delayed_job/templates/script | 1 - spec/delayed/command_spec.rb | 1 - spec/delayed/launcher/daemonized_spec.rb | 1 - spec/delayed/launcher/forking_spec.rb | 1 - spec/delayed/pool_parser_spec.rb | 1 - spec/delayed/tasks_spec.rb | 1 - 11 files changed, 5 insertions(+), 16 deletions(-) diff --git a/lib/delayed/command.rb b/lib/delayed/command.rb index 144405205..1ea662ba6 100644 --- a/lib/delayed/command.rb +++ b/lib/delayed/command.rb @@ -1,9 +1,6 @@ require 'fileutils' require 'optparse' require 'pathname' -require 'delayed/pool_parser' -require 'delayed/launcher/daemonized' -require 'delayed/launcher/forking' module Delayed class Command # rubocop:disable ClassLength diff --git a/lib/delayed/launcher/daemonized.rb b/lib/delayed/launcher/daemonized.rb index 240c0a764..935963320 100644 --- a/lib/delayed/launcher/daemonized.rb +++ b/lib/delayed/launcher/daemonized.rb @@ -1,5 +1,3 @@ -require 'delayed/launcher/base' - module Delayed module Launcher class Daemonized < Base diff --git a/lib/delayed/launcher/forking.rb b/lib/delayed/launcher/forking.rb index cf0779b8b..71a6e93ca 100644 --- a/lib/delayed/launcher/forking.rb +++ b/lib/delayed/launcher/forking.rb @@ -1,5 +1,3 @@ -require 'delayed/launcher/base' - module Delayed module Launcher class Forking < Base diff --git a/lib/delayed/tasks.rake b/lib/delayed/tasks.rake index ff2e3d1c4..930627106 100644 --- a/lib/delayed/tasks.rake +++ b/lib/delayed/tasks.rake @@ -15,9 +15,6 @@ namespace :jobs do end task :environment_options => :environment do - require 'delayed/launcher/forking' - require 'delayed/pool_parser' - @options = { :worker_count => ENV['NUM_WORKERS'] ? Integer(ENV['NUM_WORKERS']) : 1, :quiet => !!ENV['QUIET'] && ENV['QUIET'] !~ /\A(?:0|f|false)\z/i diff --git a/lib/delayed_job.rb b/lib/delayed_job.rb index 5b6da02e7..0fa3b182c 100644 --- a/lib/delayed_job.rb +++ b/lib/delayed_job.rb @@ -11,6 +11,11 @@ require 'delayed/backend/base' require 'delayed/backend/job_preparer' require 'delayed/worker' +require 'delayed/launcher/base' +require 'delayed/launcher/daemonized' +require 'delayed/launcher/forking' +require 'delayed/pool_parser' +require 'delayed/command' require 'delayed/deserialization_error' require 'delayed/railtie' if defined?(Rails::Railtie) diff --git a/lib/generators/delayed_job/templates/script b/lib/generators/delayed_job/templates/script index edf195985..8e189bfb2 100644 --- a/lib/generators/delayed_job/templates/script +++ b/lib/generators/delayed_job/templates/script @@ -1,5 +1,4 @@ #!/usr/bin/env ruby require File.expand_path(File.join(File.dirname(__FILE__), '..', 'config', 'environment')) -require 'delayed/command' Delayed::Command.new(ARGV).daemonize diff --git a/spec/delayed/command_spec.rb b/spec/delayed/command_spec.rb index b4a8039b5..ab9e3e8d6 100644 --- a/spec/delayed/command_spec.rb +++ b/spec/delayed/command_spec.rb @@ -1,5 +1,4 @@ require 'helper' -require 'delayed/command' describe Delayed::Command do let(:options) { [] } diff --git a/spec/delayed/launcher/daemonized_spec.rb b/spec/delayed/launcher/daemonized_spec.rb index 9f9cd5bf6..a12d225b6 100644 --- a/spec/delayed/launcher/daemonized_spec.rb +++ b/spec/delayed/launcher/daemonized_spec.rb @@ -1,6 +1,5 @@ require_relative '../../helper' require_relative 'shared_examples' -require 'delayed/launcher/daemonized' describe Delayed::Launcher::Daemonized do def verify_worker_processes diff --git a/spec/delayed/launcher/forking_spec.rb b/spec/delayed/launcher/forking_spec.rb index ef1c57993..b480c35c2 100644 --- a/spec/delayed/launcher/forking_spec.rb +++ b/spec/delayed/launcher/forking_spec.rb @@ -1,6 +1,5 @@ require_relative '../../helper' require_relative 'shared_examples' -require 'delayed/launcher/forking' describe Delayed::Launcher::Forking do before do diff --git a/spec/delayed/pool_parser_spec.rb b/spec/delayed/pool_parser_spec.rb index 43435e90d..8b20e5a3b 100644 --- a/spec/delayed/pool_parser_spec.rb +++ b/spec/delayed/pool_parser_spec.rb @@ -1,5 +1,4 @@ require 'helper' -require 'delayed/pool_parser' describe Delayed::PoolParser do subject { described_class.new } diff --git a/spec/delayed/tasks_spec.rb b/spec/delayed/tasks_spec.rb index 653b9edf2..b9b8f2ad6 100644 --- a/spec/delayed/tasks_spec.rb +++ b/spec/delayed/tasks_spec.rb @@ -1,6 +1,5 @@ require 'helper' require 'rake' -require 'delayed/launcher/forking' describe 'Rake tasks' do let(:env) { {} } From 246936cd2faf4c9930abc9caf13317d2f2eb0536 Mon Sep 17 00:00:00 2001 From: Johnny Shields Date: Tue, 6 Apr 2021 03:16:01 +0900 Subject: [PATCH 08/20] Revert require of Command --- lib/delayed/command.rb | 2 -- lib/delayed/util.rb | 2 -- lib/delayed_job.rb | 2 ++ lib/generators/delayed_job/templates/script | 1 + spec/delayed/command_spec.rb | 1 + 5 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/delayed/command.rb b/lib/delayed/command.rb index 1ea662ba6..776948df5 100644 --- a/lib/delayed/command.rb +++ b/lib/delayed/command.rb @@ -1,6 +1,4 @@ -require 'fileutils' require 'optparse' -require 'pathname' module Delayed class Command # rubocop:disable ClassLength diff --git a/lib/delayed/util.rb b/lib/delayed/util.rb index 8ca6a9814..a513bdeac 100644 --- a/lib/delayed/util.rb +++ b/lib/delayed/util.rb @@ -1,5 +1,3 @@ -require 'fileutils' - module Delayed def self.program_name File.basename($PROGRAM_NAME) diff --git a/lib/delayed_job.rb b/lib/delayed_job.rb index 0fa3b182c..51874c8bc 100644 --- a/lib/delayed_job.rb +++ b/lib/delayed_job.rb @@ -1,4 +1,6 @@ require 'active_support' +require 'fileutils' +require 'pathname' require 'delayed/util' require 'delayed/compatibility' require 'delayed/exceptions' diff --git a/lib/generators/delayed_job/templates/script b/lib/generators/delayed_job/templates/script index 8e189bfb2..edf195985 100644 --- a/lib/generators/delayed_job/templates/script +++ b/lib/generators/delayed_job/templates/script @@ -1,4 +1,5 @@ #!/usr/bin/env ruby require File.expand_path(File.join(File.dirname(__FILE__), '..', 'config', 'environment')) +require 'delayed/command' Delayed::Command.new(ARGV).daemonize diff --git a/spec/delayed/command_spec.rb b/spec/delayed/command_spec.rb index ab9e3e8d6..b4a8039b5 100644 --- a/spec/delayed/command_spec.rb +++ b/spec/delayed/command_spec.rb @@ -1,4 +1,5 @@ require 'helper' +require 'delayed/command' describe Delayed::Command do let(:options) { [] } From ea83992afa748ccf9634daefbcbd78aba0a77121 Mon Sep 17 00:00:00 2001 From: Johnny Shields Date: Tue, 6 Apr 2021 17:29:21 +0900 Subject: [PATCH 09/20] Fix edge case where queues are nil --- lib/delayed/launcher/forking.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/delayed/launcher/forking.rb b/lib/delayed/launcher/forking.rb index 71a6e93ca..9702bb4ca 100644 --- a/lib/delayed/launcher/forking.rb +++ b/lib/delayed/launcher/forking.rb @@ -58,6 +58,7 @@ def run_loop break if @stop && workers.empty? next if @stop options = @options.merge(:queues => queues) + options.delete(:queues) unless queues add_worker(options) end rescue Errno::ECHILD From 2c87350192694477f4def79a86c28aa9acafac25 Mon Sep 17 00:00:00 2001 From: Johnny Shields Date: Tue, 6 Apr 2021 17:39:22 +0900 Subject: [PATCH 10/20] Improve log message --- lib/delayed/launcher/forking.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/delayed/launcher/forking.rb b/lib/delayed/launcher/forking.rb index 9702bb4ca..b5d8456e6 100644 --- a/lib/delayed/launcher/forking.rb +++ b/lib/delayed/launcher/forking.rb @@ -39,7 +39,7 @@ def add_worker(options) worker_pid = fork_worker(worker_name, options) queues = options[:queues] - queue_msg = " queues=#{queues.join(',')}" unless queues.nil? || queues.empty? + queue_msg = " queues=#{queues.empty? ? '*' : queues.join(',')}" if queues logger.info "Worker #{worker_name} started - pid #{worker_pid}#{queue_msg}" workers[worker_pid] = [worker_name, queues] From 853efa9c143efa1a7bf7697bbedaccf512b69125 Mon Sep 17 00:00:00 2001 From: Johnny Shields Date: Tue, 6 Apr 2021 22:52:19 +0900 Subject: [PATCH 11/20] Fix CI --- .github/workflows/ci.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c57788de8..19a340b48 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -22,6 +22,7 @@ jobs: rails_version: - '6.0' - '6.1' + experimental: [false] include: # Older Rails - ruby: '2.2' From ed855a4ac1664cff678742034ec8fe86996c657a Mon Sep 17 00:00:00 2001 From: Johnny Shields Date: Tue, 6 Apr 2021 22:56:02 +0900 Subject: [PATCH 12/20] Try again --- .github/workflows/ci.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 19a340b48..8c54f4a6c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -9,7 +9,7 @@ on: jobs: test: runs-on: ubuntu-latest - continue-on-error: ${{ matrix.experimental }} + continue-on-error: ${{ matrix.experimental || false }} strategy: fail-fast: false matrix: @@ -22,7 +22,6 @@ jobs: rails_version: - '6.0' - '6.1' - experimental: [false] include: # Older Rails - ruby: '2.2' From 226d6d8ffc76fd6472a1af8cdf435ebfa712148c Mon Sep 17 00:00:00 2001 From: Johnny Shields Date: Tue, 6 Apr 2021 23:00:05 +0900 Subject: [PATCH 13/20] Fix spec --- lib/delayed/command.rb | 4 ++-- spec/delayed/command_spec.rb | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/delayed/command.rb b/lib/delayed/command.rb index 776948df5..6d2a8a157 100644 --- a/lib/delayed/command.rb +++ b/lib/delayed/command.rb @@ -70,8 +70,8 @@ def option_parser # rubocop:disable MethodLength, CyclomaticComplexity opt.on('-n', '--num-workers=workers', 'Number of child workers to spawn') do |n| @options[:worker_count] = Integer(n) rescue 1 end - opt.on('--number-of-workers=workers', 'Number of child workers to spawn') do |n| - STDERR.puts 'DEPRECATED: Use -n or --num-workers instead of --number-of-workers. This will be removed in the next major version.' + opt.on('--number_of_workers=workers', 'Number of child workers to spawn') do |n| + STDERR.puts 'DEPRECATED: Use -n or --num-workers instead of --number_of_workers. This will be removed in the next major version.' @options[:worker_count] = Integer(n) rescue 1 end opt.on('--pid-dir=DIR', 'Specifies an alternate directory in which to store the process ids.') do |dir| diff --git a/spec/delayed/command_spec.rb b/spec/delayed/command_spec.rb index b4a8039b5..bc16679a2 100644 --- a/spec/delayed/command_spec.rb +++ b/spec/delayed/command_spec.rb @@ -136,7 +136,7 @@ def verify_worker_processes end context '--num-workers not a number' do - let(:options) { %w[--num_workers hippo] } + let(:options) { %w[--num-workers hippo] } it { expect(output_options[:worker_count]).to eq 1 } end @@ -148,7 +148,7 @@ def verify_worker_processes end end - context '--number-of-workers not a number' do + context '--number_of_workers not a number' do let(:options) { %w[--number_of_workers rhino] } it do expect(STDERR).to receive(:puts) From 7f284daf773f0ea5e4adb7a9f584b870b7a58b56 Mon Sep 17 00:00:00 2001 From: Johnny Shields Date: Tue, 6 Apr 2021 23:12:41 +0900 Subject: [PATCH 14/20] Add continue-on-error to steps --- .github/workflows/ci.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8c54f4a6c..07def82b3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -58,16 +58,19 @@ jobs: steps: - uses: actions/checkout@v2 - uses: ruby/setup-ruby@v1 + continue-on-error: ${{ matrix.experimental || false }} env: RAILS_VERSION: ${{ matrix.rails_version }} with: ruby-version: ${{ matrix.ruby }} bundler-cache: true # runs 'bundle install' and caches installed gems automatically - name: Run tests + continue-on-error: ${{ matrix.experimental || false }} env: RAILS_VERSION: ${{ matrix.rails_version }} run: bundle exec rspec - name: Coveralls Parallel + continue-on-error: ${{ matrix.experimental || false }} uses: coverallsapp/github-action@master with: github-token: ${{ secrets.github_token }} From 6f225dae2ffe94edb7582fa48334ca26f5242367 Mon Sep 17 00:00:00 2001 From: Johnny Shields Date: Wed, 7 Apr 2021 00:13:28 +0900 Subject: [PATCH 15/20] Fix spec --- spec/delayed/launcher/shared_examples.rb | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/spec/delayed/launcher/shared_examples.rb b/spec/delayed/launcher/shared_examples.rb index 3307dc38b..5fbcdd641 100644 --- a/spec/delayed/launcher/shared_examples.rb +++ b/spec/delayed/launcher/shared_examples.rb @@ -1,16 +1,21 @@ shared_examples_for 'launcher shared examples' do let(:options) { {} } subject { described_class.new(options) } - before { allow(FileUtils).to receive(:mkdir_p) } + + before do + # stub I/O methods + allow(ObjectSpace).to receive(:each_object) + allow(FileUtils).to receive(:mkdir_p) + allow(Dir).to receive(:chdir) + end describe 'run_worker' do let(:logger) { double('Logger') } before do allow(Delayed::Worker).to receive(:after_fork) - allow(Dir).to receive(:chdir) - allow(Logger).to receive(:new).and_return(logger) allow_any_instance_of(Delayed::Worker).to receive(:start) + allow(Logger).to receive(:new).and_return(logger) allow(Delayed::Worker).to receive(:logger=) allow(Delayed::Worker).to receive(:logger).and_return(nil, logger) end From a17c4882f8bbae0968822969cd2107d21f399cf9 Mon Sep 17 00:00:00 2001 From: Johnny Shields Date: Wed, 7 Apr 2021 02:33:39 +0900 Subject: [PATCH 16/20] Ignore process messages from non-workers --- lib/delayed/launcher/forking.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/delayed/launcher/forking.rb b/lib/delayed/launcher/forking.rb index b5d8456e6..5dae74c93 100644 --- a/lib/delayed/launcher/forking.rb +++ b/lib/delayed/launcher/forking.rb @@ -53,6 +53,7 @@ def fork_worker(worker_name, options) def run_loop loop do worker_pid = Process.wait + next unless workers.key?(worker_pid) worker_name, queues = workers.delete(worker_pid) logger.info "Worker #{worker_name} exited - #{$?}" break if @stop && workers.empty? From 0d06a33e6fe11ce71e6d82b91581476a9b8456ed Mon Sep 17 00:00:00 2001 From: Johnny Shields Date: Wed, 7 Apr 2021 02:43:46 +0900 Subject: [PATCH 17/20] Fix rubocop --- lib/delayed/launcher/forking.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/delayed/launcher/forking.rb b/lib/delayed/launcher/forking.rb index 5dae74c93..ee702552a 100644 --- a/lib/delayed/launcher/forking.rb +++ b/lib/delayed/launcher/forking.rb @@ -50,7 +50,7 @@ def fork_worker(worker_name, options) fork { run_worker(worker_name, options) } end - def run_loop + def run_loop # rubocop:disable CyclomaticComplexity loop do worker_pid = Process.wait next unless workers.key?(worker_pid) @@ -58,8 +58,8 @@ def run_loop logger.info "Worker #{worker_name} exited - #{$?}" break if @stop && workers.empty? next if @stop - options = @options.merge(:queues => queues) - options.delete(:queues) unless queues + options = @options + options = options.merge(:queues => queues) if queues add_worker(options) end rescue Errno::ECHILD From 293522bc3b5891c983ddfcb0b8f61ac7a99e6747 Mon Sep 17 00:00:00 2001 From: Johnny Shields Date: Wed, 7 Apr 2021 15:14:05 +0900 Subject: [PATCH 18/20] Shutdown after child killed with 30 sec grace period. --- lib/delayed/launcher/forking.rb | 88 ++++++++++++++++++++++++++------- 1 file changed, 71 insertions(+), 17 deletions(-) diff --git a/lib/delayed/launcher/forking.rb b/lib/delayed/launcher/forking.rb index ee702552a..349c4812c 100644 --- a/lib/delayed/launcher/forking.rb +++ b/lib/delayed/launcher/forking.rb @@ -1,27 +1,51 @@ module Delayed module Launcher class Forking < Base + KILL_TIMEOUT = 30 + def launch - @stop = !!@options[:exit_on_complete] + @stopped = !!@options[:exit_on_complete] + @killed = false setup_logger - setup_signals + trap_signals Delayed::Worker.before_fork if worker_count > 1 setup_workers run_loop if worker_count > 1 - on_exit + before_graceful_exit + end + + def shutdown(timeout = nil) + @stopped = true + message = " with #{timeout} second grace period" if timeout + logger.info "Shutdown invoked#{message}..." + signal_workers('TERM') + schedule_kill(timeout) if timeout + end + + def kill(exit_status = 0, message = nil) + @stopped = true + @killed = true + message = " #{message}" if message + logger.warn "Kill invoked#{message}..." + signal_workers('KILL') + logger.warn "#{parent_name} exited forcefully#{message} - pid #{$$}" + exit(exit_status) end private - def setup_signals - Signal.trap('INT') do - Thread.new { logger.info('Received SIGINT. Waiting for workers to finish current job...') } - @stop = true - end + def trap_signals + trap_shutdown_signal('INT') + trap_shutdown_signal('TERM') + end - Signal.trap('TERM') do - Thread.new { logger.info('Received SIGTERM. Waiting for workers to finish current job...') } - @stop = true + # Trapped signals are forwarded worker processes. + # Hence it is not necessary to explicitly shutdown workers; + # we only need to stop the run loop. + def trap_shutdown_signal(signal) + Signal.trap(signal) do + Thread.new { logger.info("Received SIG#{signal}. Waiting for workers to finish current job...") } + @stopped = true end end @@ -50,14 +74,26 @@ def fork_worker(worker_name, options) fork { run_worker(worker_name, options) } end - def run_loop # rubocop:disable CyclomaticComplexity + def run_loop # rubocop:disable CyclomaticComplexity, PerceivedComplexity loop do worker_pid = Process.wait next unless workers.key?(worker_pid) worker_name, queues = workers.delete(worker_pid) - logger.info "Worker #{worker_name} exited - #{$?}" - break if @stop && workers.empty? - next if @stop + child_status = $? + logger.info "Worker #{worker_name} exited - #{child_status}" + + # If any child was SIGKILL'ed, we must shutdown all children. + # This first will attempt a graceful SIGTERM of the children, + # followed by a SIGKILL after a timeout period. + if child_status.include?('SIGKILL') && !@killed + @killed = true + logger.warn "Worker #{worker_name} SIGKILL detected. #{parent_name} shutting down..." + shutdown(KILL_TIMEOUT) + next + end + + break if @stopped && workers.empty? + next if @stopped options = @options options = options.merge(:queues => queues) if queues add_worker(options) @@ -66,8 +102,26 @@ def run_loop # rubocop:disable CyclomaticComplexity logger.warn 'No worker processes found' end - def on_exit - logger.info "#{get_name(process_identifier)}#{' (parent)' if worker_count > 1} exited gracefully - pid #{$$}" + def schedule_kill(timeout) + Thread.new do + sleep(timeout) + kill(1, "after #{timeout} second timeout") + end + end + + def signal_workers(signal) + workers.each do |pid, (worker_name, _)| + logger.warn "Sending SIG#{signal} to worker #{worker_name}..." + Process.kill(signal, pid) + end + end + + def before_graceful_exit + logger.info "#{parent_name} exited gracefully - pid #{$$}" + end + + def parent_name + "#{get_name(process_identifier)}#{' (parent)' if worker_count > 1}" end end end From 60ccd9aa88e2be64aa26d198402ed250120946ec Mon Sep 17 00:00:00 2001 From: Johnny Shields Date: Wed, 7 Apr 2021 16:11:15 +0900 Subject: [PATCH 19/20] Fix bug --- lib/delayed/launcher/forking.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/delayed/launcher/forking.rb b/lib/delayed/launcher/forking.rb index 349c4812c..955ba5666 100644 --- a/lib/delayed/launcher/forking.rb +++ b/lib/delayed/launcher/forking.rb @@ -85,7 +85,7 @@ def run_loop # rubocop:disable CyclomaticComplexity, PerceivedComplexity # If any child was SIGKILL'ed, we must shutdown all children. # This first will attempt a graceful SIGTERM of the children, # followed by a SIGKILL after a timeout period. - if child_status.include?('SIGKILL') && !@killed + if child_status.termsig == 9 && !@killed @killed = true logger.warn "Worker #{worker_name} SIGKILL detected. #{parent_name} shutting down..." shutdown(KILL_TIMEOUT) From 6ab0d172e73b27a6d7c95ea9768566dba1efdb3b Mon Sep 17 00:00:00 2001 From: Johnny Shields Date: Wed, 7 Apr 2021 16:32:44 +0900 Subject: [PATCH 20/20] Cleanup logging --- lib/delayed/launcher/forking.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/delayed/launcher/forking.rb b/lib/delayed/launcher/forking.rb index 955ba5666..ebdac1e4b 100644 --- a/lib/delayed/launcher/forking.rb +++ b/lib/delayed/launcher/forking.rb @@ -17,7 +17,7 @@ def launch def shutdown(timeout = nil) @stopped = true message = " with #{timeout} second grace period" if timeout - logger.info "Shutdown invoked#{message}..." + logger.info "Shutdown invoked#{message}" signal_workers('TERM') schedule_kill(timeout) if timeout end @@ -26,7 +26,7 @@ def kill(exit_status = 0, message = nil) @stopped = true @killed = true message = " #{message}" if message - logger.warn "Kill invoked#{message}..." + logger.warn "Kill invoked#{message}" signal_workers('KILL') logger.warn "#{parent_name} exited forcefully#{message} - pid #{$$}" exit(exit_status) @@ -111,7 +111,7 @@ def schedule_kill(timeout) def signal_workers(signal) workers.each do |pid, (worker_name, _)| - logger.warn "Sending SIG#{signal} to worker #{worker_name}..." + logger.info "Sent SIG#{signal} to worker #{worker_name}" Process.kill(signal, pid) end end