diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1f78e0b26..07def82b3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -9,46 +9,68 @@ on: jobs: test: runs-on: ubuntu-latest + continue-on-error: ${{ matrix.experimental || false }} strategy: fail-fast: false matrix: - ruby: [2.5, 2.6, 2.7, jruby, jruby-head, ruby-head] + ruby: + - '2.5' + - '2.6' + - '2.7' + - '3.0' + - 'jruby' rails_version: - - '5.2.0' - - '6.0.0' - - '6.1.0.rc2' - - 'edge' + - '6.0' + - '6.1' include: - # - # The past - # - # EOL Active Record - - ruby: 2.2 - rails_version: '3.2.0' - - ruby: 2.1 - rails_version: '4.1.0' - - ruby: 2.4 - rails_version: '4.2.0' - - ruby: 2.4 - rails_version: '5.0.0' - - ruby: 2.5 - rails_version: '5.1.0' - - continue-on-error: ${{ matrix.rails_version == 'edge' || endsWith(matrix.ruby, 'head') }} + # Older Rails + - ruby: '2.2' + rails_version: '3.2' + - ruby: '2.1' + rails_version: '4.1' + - ruby: '2.4' + rails_version: '4.2' + - ruby: '2.4' + rails_version: '5.0' + - ruby: '2.5' + rails_version: '5.1' + - ruby: '2.5' + rails_version: '5.2' + - ruby: 'jruby' + rails_version: '5.2' + # Experimental + - ruby: 'ruby-head' + rails_version: '6.1' + experimental: true + - ruby: 'jruby-head' + rails_version: '6.1' + experimental: true + - ruby: '2.7' + rails_version: 'edge' + experimental: true + - ruby: '3.0' + rails_version: 'edge' + experimental: true + - ruby: 'jruby' + rails_version: 'edge' + experimental: true steps: - uses: actions/checkout@v2 - uses: ruby/setup-ruby@v1 + continue-on-error: ${{ matrix.experimental || false }} env: RAILS_VERSION: ${{ matrix.rails_version }} with: ruby-version: ${{ matrix.ruby }} bundler-cache: true # runs 'bundle install' and caches installed gems automatically - name: Run tests + continue-on-error: ${{ matrix.experimental || false }} env: RAILS_VERSION: ${{ matrix.rails_version }} run: bundle exec rspec - name: Coveralls Parallel + continue-on-error: ${{ matrix.experimental || false }} uses: coverallsapp/github-action@master with: github-token: ${{ secrets.github_token }} diff --git a/.rubocop.yml b/.rubocop.yml index 3b0bd2701..ee021c566 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -127,6 +127,9 @@ SignalException: SpaceInsideHashLiteralBraces: EnforcedStyle: no_space +Style/SpecialGlobalVars: + Enabled: false + Style/SymbolArray: Enabled: false diff --git a/Gemfile b/Gemfile index b83acb5bc..c6a871458 100644 --- a/Gemfile +++ b/Gemfile @@ -2,15 +2,17 @@ source 'https://rubygems.org' gem 'rake' -platforms :ruby do - # Rails 5.1 is the first to work with sqlite 1.4 - # Rails 6 now requires sqlite 1.4 - if ENV['RAILS_VERSION'] && ENV['RAILS_VERSION'] < '5.1' - gem 'sqlite3', '< 1.4' - else - gem 'sqlite3' - end -end +gem 'sqlite3' + +# platforms :ruby do +# # Rails 5.1 is the first to work with sqlite 1.4 +# # Rails 6 now requires sqlite 1.4 +# if ENV['RAILS_VERSION'] && ENV['RAILS_VERSION'] < '5.1' +# gem 'sqlite3', '< 1.4' +# else +# gem 'sqlite3' +# end +# end platforms :jruby do if ENV['RAILS_VERSION'] == '4.2.0' diff --git a/LICENSE.md b/LICENSE.txt similarity index 100% rename from LICENSE.md rename to LICENSE.txt diff --git a/README.md b/README.md index 29ad10782..d3e4a2053 100644 --- a/README.md +++ b/README.md @@ -3,8 +3,8 @@ you're reading the documentation for the master branch. [View documentation for the latest release (4.1.9).](https://github.com/collectiveidea/delayed_job/tree/v4.1.9)** -Delayed::Job -============ +# Delayed Job + [![Gem Version](https://badge.fury.io/rb/delayed_job.svg)][gem] ![CI](https://github.com/collectiveidea/delayed_job/workflows/CI/badge.svg) [![Code Climate](https://codeclimate.com/github/collectiveidea/delayed_job.svg)][codeclimate] @@ -14,87 +14,72 @@ Delayed::Job [codeclimate]: https://codeclimate.com/github/collectiveidea/delayed_job [coveralls]: https://coveralls.io/r/collectiveidea/delayed_job -Delayed::Job (or DJ) encapsulates the common pattern of asynchronously executing -longer tasks in the background. +Delayed Job encapsulates the common pattern of asynchronously executing +longer tasks in the background. Examples of such tasks include: -It is a direct extraction from Shopify where the job table is responsible for a -multitude of core tasks. Amongst those tasks are: +* Sending emails +* Image resizing +* HTTP downloads +* Updating smart collections +* Updating Solr, our search server, after product changes +* Batch imports +* Spam checks -* sending massive newsletters -* image resizing -* http downloads -* updating smart collections -* updating solr, our search server, after product changes -* batch imports -* spam checks +## Installation -[Follow us on Twitter][twitter] to get updates and notices about new releases. +### Version Support -[twitter]: https://twitter.com/delayedjob +Delayed Job 4.x only supports Rails 3.0+. -Installation -============ -delayed_job 3.0.0 only supports Rails 3.0+. +### Configuring your Database -delayed_job supports multiple backends for storing the job queue. [See the wiki +Delayed Job supports multiple backends for storing the job queue. [See the wiki for other backends](https://github.com/collectiveidea/delayed_job/wiki/Backends). -If you plan to use delayed_job with Active Record, add `delayed_job_active_record` to your `Gemfile`. +To use with Active Record, add `delayed_job_active_record` to your `Gemfile`. ```ruby gem 'delayed_job_active_record' ``` -If you plan to use delayed_job with Mongoid, add `delayed_job_mongoid` to your `Gemfile`. +To use with Mongoid, add `delayed_job_mongoid` to your `Gemfile`. ```ruby gem 'delayed_job_mongoid' ``` -Run `bundle install` to install the backend and delayed_job gems. +Run `bundle install` to install the backend and `delayed_job` gems. The Active Record backend requires a jobs table. You can create that table by running the following command: - rails generate delayed_job:active_record - rake db:migrate - -For Rails 4.2+, see [below](#active-job) +``` +rails generate delayed_job:active_record +rake db:migrate +``` -Development -=========== -In development mode, if you are using Rails 3.1+, your application code will automatically reload every 100 jobs or when the queue finishes. -You no longer need to restart Delayed Job every time you update your code in development. +### Active Job -Active Job -========== -In Rails 4.2+, set the queue_adapter in config/application.rb +To use Delayed Job with Active Job (Rails 4.2+), set the `queue_adapter` in `config/application.rb`: ```ruby config.active_job.queue_adapter = :delayed_job ``` -See the [rails guide](http://guides.rubyonrails.org/active_job_basics.html#setting-the-backend) for more details. +See the [Rails Guide](https://guides.rubyonrails.org/active_job_basics.html#setting-the-backend) for more details. -Rails 4.x -========= -If you are using the protected_attributes gem, it must appear before delayed_job in your gemfile. If your jobs are failing with: +### Protected Attributes - ActiveRecord::StatementInvalid: PG::NotNullViolation: ERROR: null value in column "handler" violates not-null constraint +When using the `protected_attributes` gem, it must appear before `delayed_job` in your `Gemfile`. Otherwise you will see this error: -then this is the fix you're looking for. - -Upgrading from 2.x to 3.0.0 on Active Record -============================================ -Delayed Job 3.0.0 introduces a new column to the delayed_jobs table. +``` +ActiveRecord::StatementInvalid: PG::NotNullViolation: ERROR: null value in column "handler" violates not-null constraint +``` -If you're upgrading from Delayed Job 2.x, run the upgrade generator to create a migration to add the column. +## Using Delayed Job in your Application - rails generate delayed_job:upgrade - rake db:migrate +### Queuing Jobs -Queuing Jobs -============ Call `.delay.method(params)` on any object and it will be processed in the background. ```ruby @@ -120,7 +105,7 @@ device = Device.new device.deliver ``` -## Parameters +### Parameters `#handle_asynchronously` and `#delay` take these parameters: @@ -165,10 +150,11 @@ class LongTasks end ``` -If you ever want to call a `handle_asynchronously`'d method without Delayed Job, for instance while debugging something at the console, just add `_without_delay` to the method name. For instance, if your original method was `foo`, then call `foo_without_delay`. +To call a `handle_asynchronously`'d method without Delayed Job, add `_without_delay` to the method name. +For instance, if your original method was `foo`, then call `foo_without_delay`. + +### Rails Mailers -Rails Mailers -============= Delayed Job uses special syntax for Rails Mailers. Do not call the `.deliver` method when using `.delay`. @@ -190,108 +176,178 @@ You may also wish to consider using [Active Job with Action Mailer](https://edgeguides.rubyonrails.org/active_job_basics.html#action-mailer) which provides convenient `.deliver_later` syntax that forwards to Delayed Job under-the-hood. -Named Queues -============ -DJ 3 introduces Resque-style named queues while still retaining DJ-style -priority. The goal is to provide a system for grouping tasks to be worked by -separate pools of workers, which may be scaled and controlled individually. +### Queues + +Delayed Job supports assigning jobs to named queues. Each queue may be worked by a +separate pool of workers, which may then be scaled and controlled individually. -Jobs can be assigned to a queue by setting the `queue` option: +Jobs can be assigned to a queue by setting the `:queue` option: ```ruby -object.delay(:queue => 'tracking').method +object.delay(queue: 'tracking').method -Delayed::Job.enqueue job, :queue => 'tracking' +Delayed::Job.enqueue job, queue: 'tracking' -handle_asynchronously :tweet_later, :queue => 'tweets' +handle_asynchronously :tweet_later, queue: 'tweets' ``` -You can configure default priorities for named queues: +You may configure a default priority for each queue (lower number = higher priority): ```ruby Delayed::Worker.queue_attributes = { - high_priority: { priority: -10 }, - low_priority: { priority: 10 } + high_priority_queue: { priority: -10 }, + low_priority_queue: { priority: 10 } } ``` -Configured queue priorities can be overriden by passing priority to the delay method +To override the queue's default priority, pass `:priority` to the delay method: ```ruby -object.delay(:queue => 'high_priority', priority: 0).method +object.delay(queue: 'high_priority_queue', priority: 0).method ``` You can start processes to only work certain queues with the `queue` and `queues` -options defined below. Processes started without specifying a queue will run jobs -from **any** queue. To effectively have a process that runs jobs where a queue is not -specified, set a default queue name with `Delayed::Worker.default_queue_name` and -have the processes run that queue. +options (refer to "Running Jobs" section below.) Processes started without specifying +a queue will run jobs from **any** queue. To effectively have a process that runs +jobs where a queue is not specified, set a default queue name with +`Delayed::Worker.default_queue_name` and have the processes run that queue. + +## Running Jobs + +### Running as a Daemon Process + +`script/delayed_job` starts a background daemon process which will continually work jobs. + +To install this script, add `gem "daemons"` to your `Gemfile` then run `rails generate delayed_job`. + +Then run the `start` command: + +``` +# Run a single worker as a background process +RAILS_ENV=production script/delayed_job start + +# Run 4 workers in separate background child processes +RAILS_ENV=production script/delayed_job -n4 start +``` + +Each worker will check the database at least every 5 seconds. + +### Stopping and Restarting + +You may use `stop` and `restart` commands. These commands wait for each worker +to finish its current job before proceeding. + +``` +# Shutdown all workers and exit +RAILS_ENV=production script/delayed_job stop + +# Shutdown all workers and start a single new worker process +RAILS_ENV=production script/delayed_job restart + +# Shutdown all workers and start 4 new worker processes +RAILS_ENV=production script/delayed_job -n4 restart +``` -Running Jobs -============ -`script/delayed_job` can be used to manage a background process which will -start working off jobs. +You must pass the same arguments to `restart` that you used when calling `start`. -To do so, add `gem "daemons"` to your `Gemfile` and make sure you've run `rails -generate delayed_job`. +You may also send `SIGTERM` to stop Delayed Job. -You can then do the following: +### Worker Queues and Pools - RAILS_ENV=production script/delayed_job start - RAILS_ENV=production script/delayed_job stop +``` +# Set the --queues option to work from a particular queue +RAILS_ENV=production script/delayed_job --queues=mailers,tasks start + +# Use the --pool option to specify a worker pool. You can use this option multiple +# times to start different numbers of workers for different queues. +# The following command will start 1 worker for the tracking queue, +# 2 workers for the mailers and tasks queues, and 2 workers for any jobs: +RAILS_ENV=production script/delayed_job --pool=tracking --pool=mailers,tasks:2 --pool=*:2 start +``` + +### Exit On Complete Mode + +``` +# Run as a daemon and exit after working all available jobs +RAILS_ENV=production script/delayed_job start --exit-on-complete + +# or to run in the foreground +RAILS_ENV=production script/delayed_job run --exit-on-complete +``` - # Runs two workers in separate processes. - RAILS_ENV=production script/delayed_job -n 2 start - RAILS_ENV=production script/delayed_job stop +### Experimental: Forking Mode - # Set the --queue or --queues option to work from a particular queue. - RAILS_ENV=production script/delayed_job --queue=tracking start - RAILS_ENV=production script/delayed_job --queues=mailers,tasks start +The `script/delayed_job` process (described above) uses the +[Daemons](https://github.com/thuehlinger/daemons) gem which has several +undesirable behaviors when running inside a container (Docker, etc.): + +* The parent process spawns a background child process then exits immediately, +which causes the container to shutdown. +* The worker processes are detached from the parent, which prevents logging to `STDOUT`. + +The `--fork` option solves this by running workers in a foreground process tree. +When using `--fork` the `daemons` gem is not required. + +``` +# Run as a foreground process with 4 worker child processes. +RAILS_ENV=production script/delayed_job --fork -n4 start + +# Forking mode supports queues and pools. +RAILS_ENV=production script/delayed_job --fork --pool=mailers,tasks:2 --pool=*:2 start +``` - # Use the --pool option to specify a worker pool. You can use this option multiple times to start different numbers of workers for different queues. - # The following command will start 1 worker for the tracking queue, - # 2 workers for the mailers and tasks queues, and 2 workers for any jobs: - RAILS_ENV=production script/delayed_job --pool=tracking --pool=mailers,tasks:2 --pool=*:2 start +When spawning two or more workers (`-n2` or more), Delayed Job runs in "clustered" mode, +which borrows its code and logic from the [Puma webserver version 5+](https://puma.io/) ❤️. +In summary: - # Runs all available jobs and then exits - RAILS_ENV=production script/delayed_job start --exit-on-complete - # or to run in the foreground - RAILS_ENV=production script/delayed_job run --exit-on-complete +* The parent process spawns a foreground child worker process ("worker 0"). +* Subsequent workers are forked from worker 0, using +[copy-on-write](https://en.wikipedia.org/wiki/Copy-on-write) forking. +* After a configurable timeout (default 1 hour), workers other than worker 0 are +terminated and re-forked from worker 0, one-by-one. This "compacts" -**Rails 4:** *replace script/delayed_job with bin/delayed_job* +Forking mode does not yet support the `restart`, `stop`, etc. script commands. +This is planned to be introduced in the future. +Use `SIGTERM` or `SIGINT` to allow workers to finish their current job then gracefully shutdown. -Workers can be running on any computer, as long as they have access to the -database and their clock is in sync. Keep in mind that each worker will check -the database at least every 5 seconds. +### Running via Rake -You can also invoke `rake jobs:work` which will start working off jobs. You can -cancel the rake task with `CTRL-C`. +You may start a worker process using `rake jobs:work`. +You can exit the rake task with `CTRL-C`. -If you want to just run all available jobs and exit you can use `rake jobs:workoff` +To run all available jobs and exit, use `rake jobs:workoff`. -Work off queues by setting the `QUEUE` or `QUEUES` environment variable. +Set the `QUEUES` or `POOLS` environment variable to work specific queues. - QUEUE=tracking rake jobs:work - QUEUES=mailers,tasks rake jobs:work +``` +# Start a worker listening to all jobs +rake jobs:work -Restarting delayed_job -====================== +# Run 4 worker processes and exit when all jobs are finished +NUM_WORKERS=4 rake jobs:workoff -The following syntax will restart delayed jobs: +# Work all jobs from the "mailers" and "tasks" queues, then exit +NUM_WORKERS=4 QUEUES=mailers,tasks rake jobs:workoff - RAILS_ENV=production script/delayed_job restart +# You may also specify POOLS pipe-delimited +POOLS=mailers,tasks:2|tweets:1|*:2 rake jobs:workoff +``` -To restart multiple delayed_job workers: +Rake uses Forking Mode (see above). - RAILS_ENV=production script/delayed_job -n2 restart +### Development -**Rails 4:** *replace script/delayed_job with bin/delayed_job* +In `development` environment, Delayed Job will automatically reload +your application code after each 100 jobs or when the queue finishes. +You should not need to restart Delayed Job each time you update your code. +## Advanced Topics +### Custom Jobs -Custom Jobs -=========== -Jobs are simple ruby objects with a method called perform. Any object which responds to perform can be stuffed into the jobs table. Job objects are serialized to yaml so that they can later be resurrected by the job runner. +Jobs are simple ruby objects with a method called `perform`. +Any object which responds to `perform` can be enqueued into the jobs table. +Job objects are serialized to YAML so that they can later be marshalled by the job runner. ```ruby NewsletterJob = Struct.new(:text, :emails) do @@ -303,7 +359,7 @@ end Delayed::Job.enqueue NewsletterJob.new('lorem ipsum...', Customers.pluck(:email)) ``` -To set a per-job max attempts that overrides the Delayed::Worker.max_attempts you can define a max_attempts method on the job +To override `Delayed::Worker.max_attempts` per-job, you can define a `max_attempts` instance method in the job class. ```ruby NewsletterJob = Struct.new(:text, :emails) do @@ -317,9 +373,11 @@ NewsletterJob = Struct.new(:text, :emails) do end ``` -To set a per-job max run time that overrides the Delayed::Worker.max_run_time you can define a max_run_time method on the job +To override `Delayed::Worker.max_run_time` per-job, you may define a `max_run_time` +instance method in the job class. -NOTE: this can ONLY be used to set a max_run_time that is lower than Delayed::Worker.max_run_time. Otherwise the lock on the job would expire and another worker would start the working on the in progress job. +**NOTE:** You may only set a `max_run_time` that is lower than `Delayed::Worker.max_run_time`. +Otherwise the lock on the job would expire and a second worker would start working the same in-progress job. ```ruby NewsletterJob = Struct.new(:text, :emails) do @@ -333,7 +391,8 @@ NewsletterJob = Struct.new(:text, :emails) do end ``` -To set a per-job default for destroying failed jobs that overrides the Delayed::Worker.destroy_failed_jobs you can define a destroy_failed_jobs? method on the job +To override `Delayed::Worker.destroy_failed_jobs` per-job, you may define a `destroy_failed_jobs?` +instance method in the job class. ```ruby NewsletterJob = Struct.new(:text, :emails) do @@ -347,7 +406,8 @@ NewsletterJob = Struct.new(:text, :emails) do end ``` -To set a default queue name for a custom job that overrides Delayed::Worker.default_queue_name, you can define a queue_name method on the job +To override `Delayed::Worker.default_queue_name` per-job, you may define a `queue_name` +instance method in the job class. ```ruby NewsletterJob = Struct.new(:text, :emails) do @@ -361,7 +421,8 @@ NewsletterJob = Struct.new(:text, :emails) do end ``` -On error, the job is scheduled again in 5 seconds + N ** 4, where N is the number of attempts. You can define your own `reschedule_at` method to override this default behavior. +On error, the job is scheduled again in 5 seconds + N ** 4, where N is the number of attempts. +You may define a `reschedule_at` instance method to override this default behavior. ```ruby NewsletterJob = Struct.new(:text, :emails) do @@ -370,17 +431,18 @@ NewsletterJob = Struct.new(:text, :emails) do end def reschedule_at(current_time, attempts) - current_time + 5.seconds + current_time + (attempts * 60).seconds end end ``` -Hooks -===== -You can define hooks on your job that will be called at different stages in the process: +### Hooks +You can define hooks on your job that will be called at different stages in the process: -**NOTE:** If you are using ActiveJob these hooks are **not** available to your jobs. You will need to use ActiveJob's callbacks. You can find details here https://guides.rubyonrails.org/active_job_basics.html#callbacks +**NOTE:** If you are using Active Job these hooks are **not** available to your jobs. +You will need to use Active Job's callbacks. +See the [Rails Guides](https://guides.rubyonrails.org/active_job_basics.html#callbacks) for details. ```ruby class ParanoidNewsletterJob < NewsletterJob @@ -414,9 +476,9 @@ class ParanoidNewsletterJob < NewsletterJob end ``` -Gory Details -============ -The library revolves around a delayed_jobs table which looks as follows: +### Gory Details + +The library revolves around a `delayed_jobs` table which looks as follows: ```ruby create_table :delayed_jobs, :force => true do |table| @@ -433,12 +495,12 @@ create_table :delayed_jobs, :force => true do |table| end ``` -On error, the job is scheduled again in 5 seconds + N ** 4, where N is the number of attempts or using the job's defined `reschedule_at` method. +On error, the job is scheduled again in 5 seconds + N ** 4, where N is the number of attempts or using the job's defined `reschedule_at` instance method. -The default `Worker.max_attempts` is 25. After this, the job is either deleted (default), or left in the database with "failed_at" set. +The default `Delayed::Worker.max_attempts` is 25. After this, the job is either deleted (default), or left in the database with "failed_at" set. With the default of 25 attempts, the last retry will be 20 days later, with the last interval being almost 100 hours. -The default `Worker.max_run_time` is 4.hours. If your job takes longer than that, another computer could pick it up. It's up to you to +The default `Delayed::Worker.max_run_time` is 4.hours. If your job takes longer than that, another computer could pick it up. It's up to you to make sure your job doesn't exceed this time. You should set this to the longest time you think the job could take. By default, it will delete failed jobs (and it always deletes successful jobs). If you want to keep failed jobs, set @@ -454,7 +516,7 @@ If no jobs are found, the worker sleeps for the amount of time specified by the It is possible to disable delayed jobs for testing purposes. Set `Delayed::Worker.delay_jobs = false` to execute all jobs realtime. -Or `Delayed::Worker.delay_jobs` can be a Proc that decides whether to execute jobs inline on a per-job basis: +`Delayed::Worker.delay_jobs` may also be a `Proc` that decides whether to execute jobs inline on a per-job basis: ```ruby Delayed::Worker.delay_jobs = ->(job) { @@ -462,7 +524,13 @@ Delayed::Worker.delay_jobs = ->(job) { } ``` -You may need to raise exceptions on SIGTERM signals, `Delayed::Worker.raise_signal_exceptions = :term` will cause the worker to raise a `SignalException` causing the running job to abort and be unlocked, which makes the job available to other workers. The default for this option is false. +In Daemonized mode, `Delayed::Worker.raise_signal_exceptions = :term` +will cause the worker to raise a `SignalException` causing the running job to abort and be unlocked, +which makes the job available to other workers. +The default for this option is false. + +In Forking mode, `Delayed::Worker.raise_signal_exceptions` set to any truthy +value will raise an exception only on `SIGTERM`. Exceptions are not raised on `SIGINT`. Here is an example of changing job parameters in Rails: @@ -479,12 +547,21 @@ Delayed::Worker.raise_signal_exceptions = :term Delayed::Worker.logger = Logger.new(File.join(Rails.root, 'log', 'delayed_job.log')) ``` -Cleaning up -=========== +### Cleaning up + You can invoke `rake jobs:clear` to delete all jobs in the queue. -Having problems? -================ +### Having problems? + Good places to get help are: -* [Google Groups](http://groups.google.com/group/delayed_job) where you can join our mailing list. -* [StackOverflow](http://stackoverflow.com/questions/tagged/delayed-job) + +* [Google Groups](https://groups.google.com/group/delayed_job) where you can join our mailing list. +* [StackOverflow](https://stackoverflow.com/questions/tagged/delayed-job) + +### License & Attribution + +Delayed Job is licensed under the [MIT License](LICENSE.txt). + +Delayed Job was originally extracted from [Shopify](https://www.shopify.com/). + +Code related to worker clustering is lovingly ❤️ borrowed from the [Puma webserver](https://puma.io/) diff --git a/bin/delayed_job b/bin/delayed_job new file mode 100644 index 000000000..81367bdd2 --- /dev/null +++ b/bin/delayed_job @@ -0,0 +1,4 @@ +#!/usr/bin/env ruby + +require 'delayed/command' +Delayed::Command.new(ARGV).run diff --git a/delayed_job.gemspec b/delayed_job.gemspec index c9b60a781..08c55a2cc 100644 --- a/delayed_job.gemspec +++ b/delayed_job.gemspec @@ -1,19 +1,20 @@ -# -*- encoding: utf-8 -*- +require_relative 'lib/delayed/version' Gem::Specification.new do |spec| spec.add_dependency 'activesupport', ['>= 3.0', '< 6.2'] spec.authors = ['Brandon Keepers', 'Brian Ryckbost', 'Chris Gaffney', 'David Genord II', 'Erik Michaels-Ober', 'Matt Griffin', 'Steve Richert', 'Tobias Lütke'] - spec.description = 'Delayed_job (or DJ) encapsulates the common pattern of asynchronously executing longer tasks in the background. It is a direct extraction from Shopify where the job table is responsible for a multitude of core tasks.' + spec.description = 'Delayed Job encapsulates the common pattern of asynchronously executing longer tasks in the background. It is a direct extraction from Shopify where the job table is responsible for a multitude of core tasks.' spec.email = ['brian@collectiveidea.com'] - spec.files = %w[CHANGELOG.md CONTRIBUTING.md LICENSE.md README.md Rakefile delayed_job.gemspec] - spec.files += Dir.glob('{contrib,lib,recipes,spec}/**/*') # rubocop:disable SpaceAroundOperators + spec.files = %w[CHANGELOG.md CONTRIBUTING.md LICENSE.txt README.md Rakefile delayed_job.gemspec] + spec.files += Dir.glob('{contrib,bin,lib,recipes,spec}/**/*') # rubocop:disable SpaceAroundOperators + spec.executables = ['delayed_job'] spec.homepage = 'http://github.com/collectiveidea/delayed_job' spec.licenses = ['MIT'] spec.name = 'delayed_job' spec.require_paths = ['lib'] spec.summary = 'Database-backed asynchronous priority queue system -- Extracted from Shopify' spec.test_files = Dir.glob('spec/**/*') - spec.version = '4.1.9' + spec.version = Delayed::VERSION spec.metadata = { 'changelog_uri' => 'https://github.com/collectiveidea/delayed_job/blob/master/CHANGELOG.md', 'bug_tracker_uri' => 'https://github.com/collectiveidea/delayed_job/issues', diff --git a/lib/delayed/command.rb b/lib/delayed/command.rb index 281078242..233c72538 100644 --- a/lib/delayed/command.rb +++ b/lib/delayed/command.rb @@ -1,33 +1,50 @@ -unless ENV['RAILS_ENV'] == 'test' - begin - require 'daemons' - rescue LoadError - raise "You need to add gem 'daemons' to your Gemfile if you wish to use it." - end -end -require 'fileutils' require 'optparse' -require 'pathname' module Delayed class Command # rubocop:disable ClassLength - attr_accessor :worker_count, :worker_pools - - DIR_PWD = Pathname.new Dir.pwd - - def initialize(args) # rubocop:disable MethodLength + def initialize(args) @options = { :quiet => true, - :pid_dir => "#{root}/tmp/pids", - :log_dir => "#{root}/log" + :worker_count => 1, + :monitor => false } - @worker_count = 1 - @monitor = false + @options[:args] = option_parser.parse!(args) + (@daemon_options || []) + + pools = pool_parser.pools + @options[:pools] = pools unless pools.empty? + + validate_options! + end + + def run + launcher.run + end + alias_method :launch, :run - opts = OptionParser.new do |opt| - opt.banner = "Usage: #{File.basename($PROGRAM_NAME)} [options] start|stop|restart|run" + def daemonize + @launch_strategy ||= :daemon + run + end + + private + def launcher + @launcher ||= launcher_class.new(@options) + end + + def launcher_class + case @launch_strategy + when :daemon + Delayed::Launcher::Daemonized + else + Delayed::Launcher::Forking + end + end + + def option_parser # rubocop:disable MethodLength, CyclomaticComplexity + @option_parser ||= OptionParser.new do |opt| + opt.banner = "Usage: #{Delayed.program_name} [options] start|stop|restart|run" opt.on('-h', '--help', 'Show this message') do puts opt exit 1 @@ -35,14 +52,28 @@ def initialize(args) # rubocop:disable MethodLength opt.on('-e', '--environment=NAME', 'Specifies the environment to run this delayed jobs under (test/development/production).') do |_e| STDERR.puts 'The -e/--environment option has been deprecated and has no effect. Use RAILS_ENV and see http://github.com/collectiveidea/delayed_job/issues/7' end + opt.on('-d', '--daemonize', 'Launch in daemon mode') do |_fork| + @launch_strategy ||= :daemon + end + opt.on('--daemon-options a, b, c', Array, 'options to be passed through to daemons gem') do |daemon_options| + @launch_strategy ||= :daemon + @daemon_options = daemon_options + end + opt.on('--fork', 'Launch in forking mode') do |_fork| + @launch_strategy ||= :fork + end opt.on('--min-priority N', 'Minimum priority of jobs to run.') do |n| - @options[:min_priority] = n + @options[:min_priority] = Integer(n) rescue nil end opt.on('--max-priority N', 'Maximum priority of jobs to run.') do |n| - @options[:max_priority] = n + @options[:max_priority] = Integer(n) rescue nil end - opt.on('-n', '--number_of_workers=workers', 'Number of unique workers to spawn') do |worker_count| - @worker_count = worker_count.to_i rescue 1 + opt.on('-n', '--num-workers=workers', 'Number of child workers to spawn') do |n| + @options[:worker_count] = Integer(n) rescue 1 + end + opt.on('--number_of_workers=workers', 'Number of child workers to spawn') do |n| + STDERR.puts 'DEPRECATED: Use -n or --num-workers instead of --number_of_workers. This will be removed in the next major version.' + @options[:worker_count] = Integer(n) rescue 1 end opt.on('--pid-dir=DIR', 'Specifies an alternate directory in which to store the process ids.') do |dir| @options[:pid_dir] = dir @@ -50,123 +81,77 @@ def initialize(args) # rubocop:disable MethodLength opt.on('--log-dir=DIR', 'Specifies an alternate directory in which to store the delayed_job log.') do |dir| @options[:log_dir] = dir end + opt.on('-v', '--verbose', 'Output additional logging') do + @options[:quiet] = false + end opt.on('-i', '--identifier=n', 'A numeric identifier for the worker.') do |n| @options[:identifier] = n end opt.on('-m', '--monitor', 'Start monitor process.') do - @monitor = true + @options[:monitor] = true end opt.on('--sleep-delay N', 'Amount of time to sleep when no jobs are found') do |n| - @options[:sleep_delay] = n.to_i + @options[:sleep_delay] = Integer(n) rescue nil end opt.on('--read-ahead N', 'Number of jobs from the queue to consider') do |n| - @options[:read_ahead] = n + @options[:read_ahead] = Integer(n) rescue nil end opt.on('-p', '--prefix NAME', 'String to be prefixed to worker process names') do |prefix| @options[:prefix] = prefix end - opt.on('--queues=queues', 'Specify which queue DJ must look up for jobs') do |queues| + opt.on('--queues=queue1[,queue2]', 'Specify the job queues to work. Comma separated.') do |queues| @options[:queues] = queues.split(',') end - opt.on('--queue=queue', 'Specify which queue DJ must look up for jobs') do |queue| + opt.on('--queue=queue1[,queue2]', 'Specify the job queues to work. Comma separated.') do |queue| @options[:queues] = queue.split(',') end + opt.on('--pools=queue1[,queue2][:worker_count][|...]', 'Specify queues and number of workers for a worker pool. Use pipe to delimit multiple pools.') do |pools| + pool_parser.add(pools) + end opt.on('--pool=queue1[,queue2][:worker_count]', 'Specify queues and number of workers for a worker pool') do |pool| - parse_worker_pool(pool) + pool_parser.add(pool) end - opt.on('--exit-on-complete', 'Exit when no more jobs are available to run. This will exit if all jobs are scheduled to run in the future.') do + opt.on('-x', '--exit-on-complete', 'Exit when no more jobs are available to run. This will exit if all jobs are scheduled to run in the future.') do @options[:exit_on_complete] = true end - opt.on('--daemon-options a, b, c', Array, 'options to be passed through to daemons gem') do |daemon_options| - @daemon_options = daemon_options - end end - @args = opts.parse!(args) + (@daemon_options || []) end - def daemonize # rubocop:disable PerceivedComplexity - dir = @options[:pid_dir] - FileUtils.mkdir_p(dir) unless File.exist?(dir) - - if worker_pools - setup_pools - elsif @options[:identifier] - # rubocop:disable GuardClause - if worker_count > 1 - raise ArgumentError, 'Cannot specify both --number-of-workers and --identifier' - else - run_process("delayed_job.#{@options[:identifier]}", @options) - end - # rubocop:enable GuardClause - else - worker_count.times do |worker_index| - process_name = worker_count == 1 ? 'delayed_job' : "delayed_job.#{worker_index}" - run_process(process_name, @options) - end - end + def pool_parser + @pool_parser ||= PoolParser.new end - def setup_pools - worker_index = 0 - @worker_pools.each do |queues, worker_count| - options = @options.merge(:queues => queues) - worker_count.times do - process_name = "delayed_job.#{worker_index}" - run_process(process_name, options) - worker_index += 1 - end - end - end - - def run_process(process_name, options = {}) - Delayed::Worker.before_fork - Daemons.run_proc(process_name, :dir => options[:pid_dir], :dir_mode => :normal, :monitor => @monitor, :ARGV => @args) do |*_args| - $0 = File.join(options[:prefix], process_name) if @options[:prefix] - run process_name, options - end + def validate_options! + validate_worker_count! + validate_priority! + validate_identifier! + validate_workers_and_pools! + validate_queues_and_pools! end - def run(worker_name = nil, options = {}) - Dir.chdir(root) - - Delayed::Worker.after_fork - Delayed::Worker.logger ||= Logger.new(File.join(@options[:log_dir], 'delayed_job.log')) - - worker = Delayed::Worker.new(options) - worker.name_prefix = "#{worker_name} " - worker.start - rescue => e - STDERR.puts e.message - STDERR.puts e.backtrace - ::Rails.logger.fatal(e) if rails_logger_defined? - exit_with_error_status - end - - private - - def parse_worker_pool(pool) - @worker_pools ||= [] - - queues, worker_count = pool.split(':') - queues = ['*', '', nil].include?(queues) ? [] : queues.split(',') - worker_count = (worker_count || 1).to_i rescue 1 - @worker_pools << [queues, worker_count] + def validate_worker_count! + return unless @options[:worker_count] < 1 + STDERR.puts 'WARNING: --num-workers must be 1 or greater. This will raise an ArgumentError in the next major version.' end - def root - @root ||= rails_root_defined? ? ::Rails.root : DIR_PWD + def validate_priority! + return unless @options[:min_priority] && @options[:max_priority] && @options[:min_priority] > @options[:max_priority] + STDERR.puts 'WARNING: --min-priority must be less than or equal to --max-priority. This will raise an ArgumentError in the next major version.' end - def rails_root_defined? - defined?(::Rails.root) + def validate_identifier! + return unless @options[:identifier] && @options[:worker_count] > 1 + raise ArgumentError, 'Cannot specify both --num-workers and --identifier' end - def rails_logger_defined? - defined?(::Rails.logger) + def validate_workers_and_pools! + return unless @options[:worker_count] > 1 && @options[:pools] + STDERR.puts 'WARNING: Cannot specify both --num-workers and --pool. This will raise an ArgumentError in the next major version.' end - def exit_with_error_status - exit 1 + def validate_queues_and_pools! + return unless @options[:queues] && @options[:pools] + STDERR.puts 'WARNING: Cannot specify both --queues and --pool. This will raise an ArgumentError in the next major version.' end end end diff --git a/lib/delayed/launcher/child.rb b/lib/delayed/launcher/child.rb new file mode 100644 index 000000000..ef68e1f9f --- /dev/null +++ b/lib/delayed/launcher/child.rb @@ -0,0 +1,183 @@ +# frozen_string_literal: true + +module Delayed + module Launcher + + # This class is instantiated by the `Delayed::Launcher::Cluster` and + # represents a single child worker process. + # + # At the core of this class is running an instance of `Delayed::Worker` + # which gets created via the `start_worker` method. + # + # Code in this class is adapted from Puma (https://puma.io/) + # See: `Puma::Cluster::Worker` + class Child + attr_reader :index, + :parent, + :logger + + def initialize(index, parent, logger, options, pipes, worker = nil) + @index = index + @parent = parent + @logger = logger + @options = options + @check_pipe = pipes[:check_pipe] + @child_write = pipes[:child_write] + @fork_pipe = pipes[:fork_pipe] + @wakeup = pipes[:wakeup] + @worker = worker + end + + def run + title = "delayed: cluster child #{index}: #{parent}" + title += " [#{@options[:tag]}]" if @options[:tag] && !@options[:tag].empty? + $0 = title + + Signal.trap 'SIGINT', 'IGNORE' + Signal.trap 'SIGCHLD', 'DEFAULT' + + Thread.new do + Delayed.set_thread_name 'wrkr check' + @check_pipe.wait_readable + logger.warn '! Detected parent died, dying' + exit! 1 + end + + report_bundler_info + + # Invoke any child boot hooks so they can get + # things in shape before booting the app. + # @launcher.config.run_hooks :before_child_boot, index, @launcher.events + + begin + worker = @worker ||= create_worker + rescue Exception => e + logger.warn '! Unable to start worker' + logger.warn e.backtrace[0] + exit 1 + end + + restart_worker = Queue.new << true << false + + setup_child_zero(worker, restart_worker) if index == 0 + + Signal.trap 'SIGTERM' do + @child_write << "e#{Process.pid}\n" rescue nil + restart_worker.clear + worker.stop + restart_worker << false + end + + begin + @child_write << "b#{Process.pid}:#{index}\n" + rescue SystemCallError, IOError + Delayed.purge_interrupt_queue + STDERR.puts 'Master seems to have exited, exiting.' + return + end + + while restart_worker.pop + worker_thread = worker.start(true) + stat_thread ||= Thread.new(@child_write) do |io| + Delayed.set_thread_name 'stat pld' + while true + begin + io << "p#{Process.pid}\n" + rescue IOError + Delayed.purge_interrupt_queue + break + end + sleep @options[:worker_check_interval] + end + end + worker_thread.join + end + + # Invoke any child shutdown hooks so they can prevent the child + # exiting until any background operations are completed + # @launcher.config.run_hooks :before_child_shutdown, index, @launcher.events + ensure + @child_write << "t#{Process.pid}\n" rescue nil + @child_write.close + end + + private + + def create_worker + Delayed::Worker.new(@options) + end + + def setup_child_zero(worker, restart_worker) + restart_worker.clear + child_pids = [] + Signal.trap 'SIGCHLD' do + wakeup! if child_pids.reject! do |p| + Process.wait(p, Process::WNOHANG) rescue true + end + end + + Thread.new do + Delayed.set_thread_name 'wrkr fork' + while (idx = @fork_pipe.gets) + idx = idx.to_i + if idx == -1 # stop worker + if restart_worker.length > 0 + restart_worker.clear + worker.begin_restart(true) + # @launcher.config.run_hooks :before_refork, nil, @launcher.events + Delayed.nakayoshi_gc(logger) + end + elsif idx == 0 # restart worker + restart_worker << true << false + else # fork child + child_pids << pid = spawn_child(idx) + @child_write << "f#{pid}:#{idx}\n" rescue nil + end + end + end + end + + def spawn_child(idx) + Delayed::Worker.before_fork + + pid = fork do + new_child = Child.new(idx, + parent, + logger, + @options, + { check_pipe: @check_pipe, + child_write: @child_write }, + @worker) + new_child.run + end + + unless pid + logger.warn '! Complete inability to spawn new child processes detected' + logger.warn '! Seppuku is the only choice.' + exit! 1 + end + + Delayed::Worker.after_fork + pid + end + + # If we're not running under a Bundler context, then + # report the info about the context we will be using + def report_bundler_info + return if ENV['BUNDLE_GEMFILE'] + if File.exist?('Gemfile') + logger.info "+ Gemfile in context: #{File.expand_path('Gemfile')}" + elsif File.exist?('gems.rb') + logger.info "+ Gemfile in context: #{File.expand_path('gems.rb')}" + end + end + + def wakeup! + return if !@wakeup || @wakeup.closed? + @wakeup.write('!') + rescue SystemCallError, IOError + Delayed.purge_interrupt_queue + end + end + end +end diff --git a/lib/delayed/launcher/child_handle.rb b/lib/delayed/launcher/child_handle.rb new file mode 100644 index 000000000..7d747170e --- /dev/null +++ b/lib/delayed/launcher/child_handle.rb @@ -0,0 +1,93 @@ +# frozen_string_literal: true + +module Delayed + module Launcher + + # This class represents a child worker process from the perspective of the + # Delayed Job parent process. It contains information about the process and + # its health, and it exposes methods to control the process via IPC. It does not + # include the actual logic executed by the child worker process itself. + # For that, see `Delayed::Launcher::Child`. + # + # Code in this class is adapted from Puma (https://puma.io/) + # See: `Puma::Cluster::WorkerHandle` + class ChildHandle + def initialize(idx, pid, phase, options) + @index = idx + @pid = pid + @phase = phase + @stage = :started + @signal = 'TERM' + @options = options + @first_term_sent = nil + @started_at = Time.now + @last_checkin = Time.now + @last_status = nil + @term = false + end + + attr_accessor :pid, + :phase + + attr_reader :index, + :signal, + :last_checkin, + :last_status, + :started_at + + def booted? + @stage == :booted + end + + def uptime + Time.now - started_at + end + + def boot! + @last_checkin = Time.now + @stage = :booted + end + + def term! + @term = true + end + + def term? + @term + end + + def ping! + @last_checkin = Time.now + @last_status = :ok + end + + # @see Cluster#check_workers + def ping_timeout + @last_checkin + (booted? ? @options[:worker_timeout] : @options[:worker_boot_timeout]) + end + + def term + begin + if @first_term_sent && (Time.now - @first_term_sent) > @options[:worker_shutdown_timeout] + @signal = 'KILL' + else + @term ||= true + @first_term_sent ||= Time.now + end + Process.kill @signal, @pid if @pid + rescue Errno::ESRCH + end + end + + def kill + @signal = 'KILL' + term + end + + def hup + Process.kill 'HUP', @pid + rescue Errno::ESRCH + end + end + end +end diff --git a/lib/delayed/launcher/cluster.rb b/lib/delayed/launcher/cluster.rb new file mode 100644 index 000000000..031de86f9 --- /dev/null +++ b/lib/delayed/launcher/cluster.rb @@ -0,0 +1,589 @@ +# frozen_string_literal: true + +require 'delayed/launcher/runner' +require 'delayed/launcher/child_handle' +require 'delayed/launcher/child' + +module Delayed + module Launcher + + # This class is instantiated by `Delayed::Launcher` and used + # to boot and run a Delayed Job application using multiple child + # worker processes. For example: `$ delayed_job -n5` + # + # An instance of this class will spawn the number of processes passed in + # via the `spawn_childs` method call. Each child will have it's own + # instance of a `Delayed::Worker`. + # + # Code in this class is adapted from Puma (https://puma.io/) + # See: `Puma::Cluster` + class Cluster < Runner + attr_accessor :child_count + + # DEFAULT_FORK_WORKER_JOBS = 1000 + + def initialize(launcher, options) + check_fork_supported! + + @started_at = Time.now + @child_handles = [] + @child_index = 0 + @child_count = options.delete(:worker_count) || raise(':worker_count required') + @next_check = Time.now + @phase = 0 + @phased_restart = false + @last_phased_restart = @started_at + + super + end + + # TODO: NOT NEEDED + # def restart + # @restart = true + # stop + # # TODO: parent needs to restart + # end + + def phased_restart + @phased_restart = true + wakeup! + end + + def stop + @status = :stop + wakeup! + end + + def stop_blocked + @status = :stop if @status == :run + wakeup! + Process.waitall + end + + def halt + @status = :halt + wakeup! + end + + def run + @status = :run + + output_header(mode) + + # This is aligned with Runner#output_header + logger.info "* Workers: #{@child_count}" + # logger.info "* Restarts: (\u2714) hot (\u2714) phased" + + setup_pipes + setup_signals + setup_auto_fork_once + spawn_childs + run_loop + end + + private + + def mode + 'cluster' + end + + def run_loop + booted = false + in_phased_restart = false + childs_not_booted = @child_count + + while @status == :run + begin + if @phased_restart + start_phased_restart + @phased_restart = false + in_phased_restart = true + childs_not_booted = @child_count + end + + check_childs + + if @parent_read.wait_readable([0, @next_check - Time.now].max) + req = @parent_read.read_nonblock(1) + + @next_check = Time.now if req == '!' + next if !req || req == '!' + + result = @parent_read.gets + pid = result.to_i + + if req == 'b' || req == 'f' + pid, idx = result.split(':').map(&:to_i) + handle = @child_handles.find { |h| h.index == idx } + handle.pid = pid if handle.pid.nil? + end + + if handle = @child_handles.find { |h| h.pid == pid } + case req + when 'b' + handle.boot! + logger.info "- Worker #{handle.index} (PID: #{pid}) booted in #{handle.uptime.round(2)}s, phase: #{handle.phase}" + @next_check = Time.now + childs_not_booted -= 1 + when 'e' + # external term, see child method, Signal.trap "SIGTERM" + handle.term! + when 't' + handle.term unless handle.term? + when 'p' + handle.ping! + events.fire(:ping, handle) + if !booted && @child_handles.all? { |h| h.last_status == :ok } + events.fire(:on_booted) + booted = true + end + end + else + logger.info "! Out-of-sync child list, no #{pid} child" + end + end + + if in_phased_restart && childs_not_booted.zero? + events.fire(:on_booted) + in_phased_restart = false + end + + rescue Interrupt + @status = :stop + end + end + + stop_childs unless @status == :halt + ensure + @check_pipe.close + @suicide_pipe.close + @parent_read.close + @wakeup.close + end + + def start_phased_restart + events.fire(:on_restart) + @phase += 1 + @last_phased_restart = Time.now + logger.info "- Starting phased worker restart, phase: #{@phase}" + + # Be sure to change the directory again before loading + # the app. This way we can pick up new code. + dir = Delayed.root # @launcher.restart_dir + logger.info "+ Changing to #{dir}" + Dir.chdir dir + end + + def stop_childs + logger.info '- Gracefully shutting down workers...' + @child_handles.each(&:term) + + begin + loop do + wait_childs + break if @child_handles.reject { |h| h.pid.nil? }.empty? + sleep 0.2 + end + rescue Interrupt + logger.warn '! Cancelled waiting for workers' + end + end + + def wakeup! + return if !@wakeup || @wakeup.closed? + @wakeup.write('!') + rescue SystemCallError, IOError + Delayed.purge_interrupt_queue + end + + def fork_child_zero! + if (handle = @child_handles.find { |h| h.index == 0 }) + handle.phase += 1 + end + phased_restart + end + + def setup_pipes + @parent_read, @child_write = IO.pipe + @wakeup = @child_write + + # Used by the childs to detect if the parent process dies. + # If select says that @check_pipe is ready, it's because the + # parent has exited and @suicide_pipe has been automatically closed. + @check_pipe, @suicide_pipe = IO.pipe + + # Separate pipe used by child 0 to receive commands to + # fork new child processes. + @fork_pipe, @fork_writer = IO.pipe + end + + def setup_signals + setup_signal_int + setup_signal_term + setup_signal_wakeup + setup_signal_increment + setup_signal_decrement + setup_signal_fork_child_zero + logger.info 'Use Ctrl-C to stop' + end + + def setup_signal_wakeup + Signal.trap('SIGCHLD') { wakeup! } + end + + def setup_signal_increment + Signal.trap('TTIN') do + increment_child_count + wakeup! + end + end + + def setup_signal_decrement + Signal.trap('TTOU') do + decrement_child_count + wakeup! + end + end + + def setup_signal_fork_child_zero + Signal.trap('SIGURG') { fork_child_zero! } + end + + def setup_signal_int + Signal.trap('SIGINT') { stop } + end + + # Trapped signals are forwarded child processes. + # Hence it is not necessary to explicitly shutdown childs; + # we only need to stop the run loop. + def setup_signal_term + parent_pid = Process.pid + + Signal.trap 'SIGTERM' do + # The worker installs their own SIGTERM when booted. + # Until then, this is run by the worker and the worker + # should just exit if they get it. + if Process.pid != parent_pid + logger.info 'Early termination of worker' + exit! 0 + else + stop_childs + stop + events.fire(:on_stopped) + raise(SignalException, 'SIGTERM') if Delayed::Worker.raise_signal_exceptions + exit 0 # Clean exit, workers were stopped + end + end + end + + def increment_child_count + @child_count += 1 + end + + def decrement_child_count + @child_count -= 1 if @child_count >= 2 + end + + def setup_auto_fork_once + refork_delay = @options[:worker_refork_delay] + return unless refork_delay + events.register(:ping) do |handle| + if handle.index == 0 && handle.phase == 0 + time_exceeded = refork_delay && Time.now > @last_phased_restart + refork_delay + fork_child_zero! if time_exceeded + end + end + end + + def all_childs_booted? + @child_handles.count { |h| !h.booted? } == 0 + end + + def check_childs + return if @next_check >= Time.now + + @next_check = Time.now + @options[:worker_check_interval] + + timeout_childs + wait_childs + cull_childs + spawn_childs + phase_out_childs + + @next_check = [ + @child_handles.reject(&:term?).map(&:ping_timeout).min, + @next_check + ].compact.min + end + + def timeout_childs + @child_handles.each do |handle| + next unless !handle.term? && handle.ping_timeout <= Time.now + details = if handle.booted? + "(worker failed to check in within #{@options[:worker_timeout]} seconds)" + else + "(worker failed to boot within #{@options[:worker_boot_timeout]} seconds)" + end + logger.info "! Terminating timed out worker #{details}: #{handle.pid}" + handle.kill + end + end + + # loops thru @child_handles, removing childs that exited, + # and calling `#term` if needed + def wait_childs + @child_handles.reject! do |handle| + begin + next false if handle.pid.nil? + if Process.wait(handle.pid, Process::WNOHANG) + true + else + handle.term if handle.term? + nil + end + rescue Errno::ECHILD + begin + Process.kill(0, handle.pid) + # child still alive but has another parent (e.g., using fork_child) + handle.term if handle.term? + false + rescue Errno::ESRCH, Errno::EPERM + true # child is already terminated + end + end + end + end + + def cull_childs + diff = @child_handles.size - @child_count + return if diff < 1 + + logger.debug "Culling #{diff.inspect} workers" + + handles_to_cull = + case @options[:worker_culling_strategy] + when :youngest + @child_handles.sort_by(&:started_at)[-diff, diff] + when :oldest + @child_handles.sort_by(&:started_at)[0, diff] + end + logger.debug "Workers to cull: #{handles_to_cull.inspect}" + + handles_to_cull.each do |handle| + logger.info "- Worker #{handle.index} (PID: #{handle.pid}) terminating" + handle.term + end + end + + def spawn_childs + diff = @child_count - @child_handles.size + return if diff < 1 + + parent = Process.pid + @fork_writer << "-1\n" + + diff.times do + idx = next_child_index + + if idx != 0 + @fork_writer << "#{idx}\n" + pid = nil + else + pid = spawn_child(idx, parent) + end + + logger.debug "Spawned worker: #{pid}" + @child_handles << ChildHandle.new(idx, pid, @phase, @options) + end + + if @child_handles.all? { |h| h.phase == @phase } + @fork_writer << "0\n" + end + end + + def next_child_index + occupied_positions = @child_handles.map(&:index) + idx = 0 + idx += 1 until !occupied_positions.include?(idx) + idx + end + + def spawn_child(idx, parent) + Delayed::Worker.before_fork + + pid = fork { create_child(idx, parent) } + unless pid + logger.info '! Complete inability to spawn new child processses detected' + logger.info '! Seppuku is the only choice.' + exit! 1 + end + + Delayed::Worker.after_fork + pid + end + + def create_child(index, parent) + @child_handles = [] + + @parent_read.close + @suicide_pipe.close + @fork_writer.close + + pipes = { check_pipe: @check_pipe, + child_write: @child_write, + fork_pipe: @fork_pipe, + wakeup: @wakeup } + + new_child = Child.new(index, + parent, + logger, + @options, + pipes, + nil) + new_child.run + end + + # If we're running at proper capacity, check to see if + # we need to phase any childs out (which will restart + # in the right phase). + def phase_out_childs + return unless all_childs_booted? + + handle = @child_handles.find { |h| h.phase != @phase } + return unless handle + + logger.info "- Stopping #{handle.pid} for phased upgrade..." + + return if handle.term? + handle.term + logger.info "- #{handle.signal} sent to #{handle.pid}..." + end + end + end +end + + +# OLD +# def run +# @stopped = !!@options[:exit_on_complete] +# @killed = false +# setup_logger +# setup_signals +# setup_auto_fork_worker +# Delayed::Worker.before_fork if worker_count > 1 +# setup_workers +# run_loop if worker_count > 1 +# before_graceful_exit +# end + + +# OLD +# def stop(timeout = nil) +# @stopped = true +# message = " with #{timeout} second grace period" if timeout +# logger.info "Shutdown invoked#{message}" +# @child_handles.reject(&:term?).each do |worker| +# logger.info "Sending SIGTERM to worker #{worker.name}" +# worker.term +# end +# schedule_kill(timeout) if timeout +# end +# +# OLD +# def halt(exit_status = 0, message = nil) +# @stopped = true +# @killed = true +# message = " #{message}" if message +# logger.warn "Kill invoked#{message}" +# @child_handles.each do |worker| +# logger.info "Sending SIGKILL to worker #{worker.name}" +# worker.kill +# end +# logger.warn "#{parent_name} exited forcefully#{message} - pid #{$$}" +# exit(exit_status) +# end + +# def before_graceful_exit +# logger.info "#{parent_name} exited gracefully - pid #{$$}" +# end +# +# def parent_name +# "#{get_name(process_identifier)}#{' (parent)' if worker_count > 1}" +# end + + +# TODO: is this needed? +# def stop_blocked +# @status = :stop if @status == :run +# wakeup! +# Process.waitall +# end + +# def reload_worker_directory +# dir = @launcher.restart_dir +# logger.info "+ Changing to #{dir}" +# Dir.chdir dir +# end + +# Inside of a child process, this will return all zeroes, as @child_handles is only populated in +# the parent process. +# @!attribute [r] stats +# def stats +# old_worker_count = @child_handles.count { |handle| handle.phase != @phase } +# worker_status = @child_handles.map do |handle| +# { +# started_at: handle.started_at.utc.iso8601, +# pid: handle.pid, +# index: handle.index, +# phase: handle.phase, +# booted: handle.booted?, +# last_checkin: handle.last_checkin.utc.iso8601, +# last_status: handle.last_status, +# } +# end +# +# { +# started_at: @started_at.utc.iso8601, +# workers: @child_handles.size, +# phase: @phase, +# booted_workers: worker_status.count { |handle| w[:booted] }, +# old_workers: old_worker_count, +# worker_status: worker_status, +# } +# end + + + +# OLD +# def add_worker(options) +# worker_name = get_name(@worker_index) +# worker_pid = spawn_worker(worker_name, options) +# +# queues = options[:queues] +# queue_msg = " queues=#{queues.empty? ? '*' : queues.join(',')}" if queues +# logger.info "Worker #{worker_name} started - pid #{worker_pid}#{queue_msg}" +# +# @child_handles << ChildHandle.new(@worker_index, worker_pid, worker_name, queues) +# @worker_index += 1 +# end +# +# OLD +# def run_worker(worker_name, options) +# Dir.chdir(Delayed.root) +# set_process_name(worker_name) +# Delayed::Worker.after_fork +# setup_logger +# worker = Delayed::Worker.new(options) +# worker.name_prefix = "#{worker_name} " +# worker.start +# rescue => e +# STDERR.puts e.message +# STDERR.puts e.backtrace +# logger.fatal(e) +# exit_with_error_status +# end +# +# +# def exit_with_error_status +# exit(1) +# end diff --git a/lib/delayed/launcher/daemonized.rb b/lib/delayed/launcher/daemonized.rb new file mode 100644 index 000000000..2479ba29d --- /dev/null +++ b/lib/delayed/launcher/daemonized.rb @@ -0,0 +1,133 @@ +# frozen_string_literal: true + +module Delayed + module Launcher + + # Uses "daemons" gem to spawn as a background process. + # Launcher::Daemonized is deprecated and will be removed + # the next major DelayedJob version. Puma and other major + # webservers have removed their respective "daemonized" modes. + # See: https://github.com/puma/puma/pull/2170 + class Daemonized + + attr_accessor :worker_count, + :pools, + :process_prefix, + :process_identifier + + def initialize(options) + @worker_index = 0 + @worker_count = options.delete(:worker_count) || 1 + @pools = options.delete(:pools) + @pools = nil if @pools == [] + @monitor = options.delete(:monitor) + @process_prefix = options.delete(:prefix) + @process_identifier = options.delete(:identifier) + @args = options.delete(:args) + options[:daemonized] = true + + @options = options + @options[:pid_dir] ||= "#{Delayed.root}/tmp/pids" + @options[:log_dir] ||= "#{Delayed.root}/log" + end + + def run + require_daemons! + create_pid_dir + setup_workers + end + + private + + def setup_workers + if pools + setup_pooled_workers + elsif process_identifier + setup_identified_worker + elsif worker_count > 1 + setup_multiple_workers + else + setup_single_worker + end + end + + def setup_pooled_workers + pools.each do |queues, pool_worker_count| + options = @options.merge(:queues => queues) + pool_worker_count.times { add_worker(options) } + end + end + + def setup_multiple_workers + worker_count.times { add_worker(@options) } + end + + def setup_single_worker + run_process(get_name(process_identifier), @options) + end + alias_method :setup_identified_worker, :setup_single_worker + + def add_worker(options) + process_name = get_name(@worker_index) + run_process(process_name, options) + @worker_index += 1 + end + + def run_process(process_name, options = {}) + Delayed::Worker.before_fork + Daemons.run_proc(process_name, :dir => options[:pid_dir], :dir_mode => :normal, :monitor => @monitor, :ARGV => @args) do |*_args| + run_worker(process_name, options) + end + end + + def run_worker(worker_name, options) + Dir.chdir(Delayed.root) + set_process_name(worker_name) + Delayed::Worker.after_fork + setup_logger + worker = Delayed::Worker.new(options) + worker.name_prefix = "#{worker_name} " + worker.start(false) + rescue => e + STDERR.puts e.message + STDERR.puts e.backtrace + logger.fatal(e) + exit_with_error_status + end + + def set_process_name(name) # rubocop:disable AccessorMethodName + $0 = process_prefix ? File.join(process_prefix, name) : name + end + + def get_name(label) + "delayed_job#{".#{label}" if label}" + end + + def exit_with_error_status + exit(1) + end + + def setup_logger + Delayed::Worker.logger ||= Logger.new(File.join(@options[:log_dir], 'delayed_job.log')) + end + + def logger + @logger ||= Delayed::Worker.logger || (::Rails.logger if defined?(::Rails.logger)) || Logger.new(STDOUT) + end + + def require_daemons! + return if ENV['RAILS_ENV'] == 'test' + begin + require 'daemons' + rescue LoadError + raise "Add gem 'daemons' to your Gemfile or use --fork option." + end + end + + def create_pid_dir + dir = @options[:pid_dir] + FileUtils.mkdir_p(dir) unless File.exist?(dir) + end + end + end +end diff --git a/lib/delayed/launcher/events.rb b/lib/delayed/launcher/events.rb new file mode 100644 index 000000000..2f5f60c30 --- /dev/null +++ b/lib/delayed/launcher/events.rb @@ -0,0 +1,42 @@ +# frozen_string_literal: true + +module Delayed + module Launcher + + # Represents an event sink and source. + # + # Code in this class is adapted from Puma (https://puma.io/) + # See: `Puma::Events` + class Events + + def initialize + @hooks = Hash.new { |h, k| h[k] = [] } + end + + # Fire callbacks for the named hook + def fire(hook, *args) + @hooks[hook].each { |t| t.call(*args) } + end + + # Register a callback for a given hook + def register(hook, obj = nil, &block) + raise 'Specify either an object or a block, not both' if obj && block + h = obj || block + @hooks[hook] << h + h + end + + def on_booted(&block) + register(:on_booted, &block) + end + + def on_restart(&block) + register(:on_restart, &block) + end + + def on_stopped(&block) + register(:on_stopped, &block) + end + end + end +end diff --git a/lib/delayed/launcher/forking.rb b/lib/delayed/launcher/forking.rb new file mode 100644 index 000000000..0a9a550cb --- /dev/null +++ b/lib/delayed/launcher/forking.rb @@ -0,0 +1,169 @@ +# frozen_string_literal: true + +require 'delayed/launcher/logger_delegator' +require 'delayed/launcher/events' +require 'delayed/launcher/single' +require 'delayed/launcher/cluster' +require 'delayed/launcher/pooled_cluster' + +module Delayed + module Launcher + + # Parent launcher class which spawns Delayed Job child worker + # processes in the foreground. + # + # Code in this class is adapted from Puma (https://puma.io/) + # See: `Puma::Launcher` + class Forking + DEFAULT_WORKER_CHECK_INTERVAL = 5 + DEFAULT_WORKER_TIMEOUT = 60 + DEFAULT_WORKER_SHUTDOWN_TIMEOUT = 30 + DEFAULT_WORKER_REFORK_DELAY = 900 + + attr_reader :logger, + :events + + def initialize(options) + + # Remove options used only for Launcher::Daemonized + options[:daemonized] = false + options.delete(:monitor) + options.delete(:args) + + # Set default options + options[:worker_count] ||= 1 + options[:worker_check_interval] ||= DEFAULT_WORKER_CHECK_INTERVAL + options[:worker_timeout] ||= DEFAULT_WORKER_TIMEOUT + options[:worker_boot_timeout] ||= DEFAULT_WORKER_TIMEOUT + options[:worker_shutdown_timeout] ||= DEFAULT_WORKER_SHUTDOWN_TIMEOUT + # TODO: need phased restart timeout + options[:worker_culling_strategy] ||= :youngest + options[:worker_refork_delay] ||= DEFAULT_WORKER_REFORK_DELAY + options.delete(:worker_refork_delay) if options[:worker_refork_delay] <= 0 + + options.delete(:pools) if options[:pools] == [] + options[:pid_dir] ||= "#{Delayed.root}/tmp/pids" + options[:log_dir] ||= "#{Delayed.root}/log" + + @options = options + @logger = LoggerDelegator.new(options[:log_dir]) + @events = Events.new + @status = :run + end + + def run + previous_env = nil + @runner = build_runner + setup_signals + @runner.run + do_run_finished(previous_env) + end + + # Begin graceful shutdown of the workers + def stop + @status = :stop + @runner.stop + end + + # Begin forced shutdown of the workers + def halt + @status = :halt + @runner.halt + end + + # Begin restart of the workers + def restart + @status = :restart + @runner.restart + end + + # # Begin phased restart of the workers + def phased_restart + if @runner.respond_to?(:phased_restart) + @runner.phased_restart + else + logger.warn 'phased_restart called but not available, restarting normally.' + restart + end + end + + private + + def do_run_finished(previous_env) + case @status + when :halt + do_forceful_stop + when :run, :stop + do_graceful_stop + when :restart + do_restart(previous_env) + end + end + + def do_forceful_stop + logger.info '* Stopping immediately!' + end + + def do_graceful_stop + @events.fire(:on_stopped) + @runner.stop_blocked + end + + def do_restart(previous_env) + logger.info '* Restarting...' + logger.info '--- RESTART NOT YET SUPPORTED, EXITING ---' + # ENV.replace(previous_env) + # @runner.stop_control + # restart! + end + + def build_runner + if @options[:pools] + PooledCluster.new(self, @options) + elsif @options[:worker_count] > 1 + Cluster.new(self, @options) + else + # TODO: consider clustered mode with one worker for scaling + Single.new(self, @options) + end + end + + def setup_signals + setup_signal_restart + setup_signal_phased_restart + setup_signal_term + setup_signal_int + end + + def setup_signal_restart + Signal.trap('SIGUSR2') { restart } + rescue Exception # rubocop:disable Lint/RescueException + logger.info '*** SIGUSR2 not implemented, signal based restart unavailable!' + end + + def setup_signal_phased_restart + return if Delayed.jruby? + Signal.trap('SIGUSR1') { phased_restart } + rescue Exception # rubocop:disable Lint/RescueException + logger.info '*** SIGUSR1 not implemented, signal based restart unavailable!' + end + + def setup_signal_term + Signal.trap('SIGTERM') do + do_graceful_stop + raise(SignalException, 'SIGTERM') if Delayed::Worker.raise_signal_exceptions + end + rescue Exception # rubocop:disable Lint/RescueException + logger.info '*** SIGTERM not implemented, signal based gracefully stopping unavailable!' + end + + def setup_signal_int + Signal.trap('SIGINT') do + stop + end + rescue Exception # rubocop:disable Lint/RescueException + logger.info '*** SIGINT not implemented, signal based gracefully stopping unavailable!' + end + end + end +end diff --git a/lib/delayed/launcher/logger_delegator.rb b/lib/delayed/launcher/logger_delegator.rb new file mode 100644 index 000000000..c4cef76cb --- /dev/null +++ b/lib/delayed/launcher/logger_delegator.rb @@ -0,0 +1,97 @@ +# frozen_string_literal: true + +require 'fileutils' +require 'logger' + +# TODO: remove this class +module Delayed + module Launcher + + # Runs logger which can log even during signal traps. + class LoggerDelegator + LEVELS = %i[debug info warn error fatal].freeze + + def initialize(log_dir, logger = nil) + @log_dir = log_dir + setup_logger(logger) + end + + delegate *LEVELS, to: :logger + + def logger + @logger ||= Delayed::Worker.logger || rails_logger || stdout_logger + end + + private + + def setup_logger(logger) + Delayed::Worker.logger ||= logger || file_logger + end + + def file_logger + FileUtils.mkdir_p(@log_dir) + Logger.new(File.join(@log_dir, 'delayed_job.log')) + end + + def rails_logger + ::Rails.logger if defined?(::Rails.logger) + end + + def stdout_logger + Logger.new(STDOUT) + end + + # FROM PUMA + # + # def log(str) + # @events.log str + # end + # + # def error(str) + # @events.error str + # end + # + # def debug(str) + # @events.log "- #{str}" if @options[:debug] + # end + # + # def redirected_io? + # @options[:redirect_stdout] || @options[:redirect_stderr] + # end + # + # def redirect_io + # stdout = @options[:redirect_stdout] + # stderr = @options[:redirect_stderr] + # append = @options[:redirect_append] + # + # if stdout + # ensure_output_directory_exists(stdout, 'STDOUT') + # + # STDOUT.reopen stdout, (append ? "a" : "w") + # STDOUT.puts "=== puma startup: #{Time.now} ===" + # STDOUT.flush unless STDOUT.sync + # end + # + # if stderr + # ensure_output_directory_exists(stderr, 'STDERR') + # + # STDERR.reopen stderr, (append ? "a" : "w") + # STDERR.puts "=== puma startup: #{Time.now} ===" + # STDERR.flush unless STDERR.sync + # end + # + # if @options[:mutate_stdout_and_stderr_to_sync_on_write] + # STDOUT.sync = true + # STDERR.sync = true + # end + # end + # + # private + # def ensure_output_directory_exists(path, io_name) + # unless Dir.exist?(File.dirname(path)) + # raise "Cannot redirect #{io_name} to #{path}" + # end + # end + end + end +end diff --git a/lib/delayed/launcher/pooled_cluster.rb b/lib/delayed/launcher/pooled_cluster.rb new file mode 100644 index 000000000..bc9c96d6e --- /dev/null +++ b/lib/delayed/launcher/pooled_cluster.rb @@ -0,0 +1,62 @@ +# frozen_string_literal: true + +require 'delayed/launcher/cluster' + +module Delayed + module Launcher + class PooledCluster < Cluster + + attr_reader :pools + + def initialize(launcher, options) + @pools = options.delete(:pools) + super + end + + private + + def mode + 'pooled_cluster' + end + + def setup_workers + pools.each do |queues, pool_worker_count| + options = @options.merge(:queues => queues) + pool_worker_count.times { add_worker(options) } + end + end + + # TODO: CoW forking needs to fork from each worker pool. + # because different pool jobs will have diff characteristics + # (Disable with an option?) + # Is each pool it's own cluster? + + # TODO: not yet supported + def increment_worker_count + # @worker_count += 1 + end + + # TODO: not yet supported + def decrement_worker_count + # @worker_count -= 1 if @worker_count >= 2 + end + + + # OLD + # def add_worker(options) + # worker_name = get_name(@worker_index) + # worker_pid = spawn_worker(worker_name, options) + # + # queues = options[:queues] + # queue_msg = " queues=#{queues.empty? ? '*' : queues.join(',')}" if queues + # logger.info "Worker #{worker_name} started - pid #{worker_pid}#{queue_msg}" + # + # @workers << WorkerHandle.new(@worker_index, worker_pid, worker_name, queues) + # @worker_index += 1 + # end + # + # + # + end + end +end diff --git a/lib/delayed/launcher/runner.rb b/lib/delayed/launcher/runner.rb new file mode 100644 index 000000000..bc6e6426b --- /dev/null +++ b/lib/delayed/launcher/runner.rb @@ -0,0 +1,70 @@ +# frozen_string_literal: true + +module Delayed + module Launcher + + # Base class shared by `Delayed::Launcher::Single` and + # `Delayed::Launcher::Cluster` + # + # Code in this class is adapted from Puma (https://puma.io/) + # See: `Puma::Runner` + class Runner + attr_reader :launcher, + :process_prefix, + :process_identifier + + def initialize(launcher, options) + @launcher = launcher + @process_prefix = options.delete(:prefix) + @process_identifier = options.delete(:identifier) + @options = options + end + + delegate :events, + :logger, + to: :launcher + + private + + def check_fork_supported! + return if Delayed.forkable? + raise "Process fork not supported on #{RUBY_ENGINE} on this platform" + end + + def output_header(mode) + # min_t = @options[:min_threads] + # max_t = @options[:max_threads] + + logger.info "Delayed Job starting in #{mode} mode..." + logger.info "* Version: #{Delayed::VERSION} (#{ruby_engine})" + # logger.info "* Min threads: #{min_t}" + # logger.info "* Max threads: #{max_t}" + logger.info "* Environment: #{ENV['RAILS_ENV']}" + + if mode == 'single' + logger.info "* PID: #{Process.pid}" + else + logger.info "* Parent PID: #{Process.pid}" + end + end + + def ruby_engine + if !defined?(RUBY_ENGINE) || RUBY_ENGINE == 'ruby' + "ruby #{RUBY_VERSION}-p#{RUBY_PATCHLEVEL}" + elsif defined?(RUBY_ENGINE_VERSION) + "#{RUBY_ENGINE} #{RUBY_ENGINE_VERSION} - ruby #{RUBY_VERSION}" + else + "#{RUBY_ENGINE} #{RUBY_VERSION}" + end + end + + def set_process_name(name) # rubocop:disable AccessorMethodName + $0 = process_prefix ? File.join(process_prefix, name) : name + end + + def get_name(label) + "delayed_job#{".#{label}" if label}" + end + end + end +end diff --git a/lib/delayed/launcher/single.rb b/lib/delayed/launcher/single.rb new file mode 100644 index 000000000..eef91afd2 --- /dev/null +++ b/lib/delayed/launcher/single.rb @@ -0,0 +1,116 @@ +# frozen_string_literal: true + +require 'delayed/launcher/runner' + +module Delayed + module Launcher + + # This class is instantiated by `Delayed::Launcher`. + # It boots and runs a Delayed Job application using a + # single in-process worker. For example: `$ delayed_job -n1` + # + # Code in this class is adapted from Puma (https://puma.io/) + # See: `Puma::Single` + class Single < Runner + + def run + # TODO: set_process_name(get_name(process_identifier)) + output_header 'single' + + @worker = worker = create_worker + worker_thread = worker.start(true) + + logger.info 'Use Ctrl-C to stop' + # redirect_io + + events.fire(:on_booted) + + begin + worker_thread.join + rescue Interrupt + # Swallow it + end + end + + def restart + @worker.begin_restart + end + + def stop + @worker.stop(false) if @worker + end + + def stop_blocked + logger.info '- Gracefully stopping, waiting for jobs to finish' + @worker.stop(true) if @worker + end + + def halt + @worker.halt + end + + private + + def create_worker + Delayed::Worker.new(@options) + end + + # TODO: OLD CODE ================== + # + # def run + # set_process_name(get_name(process_identifier)) + # start_worker + # + # # TODO: start_worker should be background thread. + # # Then add control flow for signal interrupts so this doesn't + # # need Thread.new for logging during interrupts. + # end + # + # def stop + # schedule_halt + # stop_worker + # # TODO: remove this as per above + # Thread.new { logger.info "#{process_name} exited gracefully - pid #{$$}" } + # exit(0) + # end + # + # def restart + # Thread.new { logger.info "#{process_name} restarting... - pid #{$$}" } + # stop_worker + # start_worker + # Thread.new { logger.info "#{process_name} restarted - pid #{$$}" } + # end + # + # def halt(exit_status = 0, message = nil) + # # TODO: remove this as per above + # Thread.new { logger.warn "#{process_name} exited forcefully #{message} - pid #{$$}" } + # exit(exit_status) + # end + # + # private + # + # def start_worker + # @worker = Delayed::Worker.new(@options) + # events.fire(:on_booted) + # @worker.start(false) + # end + # + # def stop_worker + # @worker.stop if @worker + # end + # + # def schedule_halt + # timeout = @options[:worker_shutdown_timeout] + # return unless timeout + # Thread.new do + # sleep(timeout) + # halt(1, "after #{timeout} second timeout") + # end + # end + # + # def process_name + # get_name(process_identifier) + # end + end + end +end diff --git a/lib/delayed/pool_parser.rb b/lib/delayed/pool_parser.rb new file mode 100644 index 000000000..38ba7443e --- /dev/null +++ b/lib/delayed/pool_parser.rb @@ -0,0 +1,17 @@ +module Delayed + class PoolParser + def add(string) + string.split('|').each do |segment| + queues, worker_count = segment.split(':') + queues = ['*', '', nil].include?(queues) ? [] : queues.split(',') + worker_count = (worker_count || 1).to_i rescue 1 + pools << [queues, worker_count] + end + self + end + + def pools + @pools ||= [] + end + end +end diff --git a/lib/delayed/railtie.rb b/lib/delayed/railtie.rb index a50ca1b4b..f92948d54 100644 --- a/lib/delayed/railtie.rb +++ b/lib/delayed/railtie.rb @@ -12,7 +12,7 @@ class Railtie < Rails::Railtie end rake_tasks do - load 'delayed/tasks.rb' + load 'delayed/tasks.rake' end end end diff --git a/lib/delayed/tasks.rake b/lib/delayed/tasks.rake new file mode 100644 index 000000000..cb53922bb --- /dev/null +++ b/lib/delayed/tasks.rake @@ -0,0 +1,60 @@ +namespace :jobs do + desc 'Clear the delayed_job queue.' + task :clear => :environment do + Delayed::Job.delete_all + end + + desc 'Start a delayed_job worker.' + task :work => :environment_options do + Delayed::Launcher::Forking.new(@options).run + end + + desc 'Start a delayed_job worker and exit when all available jobs are complete.' + task :workoff => :environment_options do + Delayed::Launcher::Forking.new(@options.merge(:exit_on_complete => true)).run + end + + task :environment_options => :environment do + @options = { + :worker_count => ENV['NUM_WORKERS'] ? Integer(ENV['NUM_WORKERS']) : 1, + :quiet => !!ENV['QUIET'] && ENV['QUIET'] !~ /\A(?:0|f|false)\z/i + } + @options[:min_priority] = Integer(ENV['MIN_PRIORITY']) if ENV['MIN_PRIORITY'] + @options[:max_priority] = Integer(ENV['MAX_PRIORITY']) if ENV['MAX_PRIORITY'] + @options[:sleep_delay] = Integer(ENV['SLEEP_DELAY']) if ENV['SLEEP_DELAY'] + @options[:read_ahead] = Integer(ENV['READ_AHEAD']) if ENV['READ_AHEAD'] + + queues = (ENV['QUEUES'] || ENV['QUEUE'] || '').split(',') + @options[:queues] = queues unless queues.empty? + + pools = Delayed::PoolParser.new.add(ENV['POOLS'] || ENV['POOL'] || '').pools + @options[:pools] = pools unless pools.empty? + + if @options[:worker_count] < 1 + raise ArgumentError, 'NUM_WORKERS must be 1 or greater' + end + + if @options[:min_priority] && @options[:max_priority] && @options[:min_priority] > @options[:max_priority] + raise ArgumentError, 'MIN_PRIORITY must be less than or equal to MAX_PRIORITY' + end + + if ENV['NUM_WORKERS'] && @options[:pools] + raise ArgumentError, 'Cannot specify both NUM_WORKERS and POOLS' + end + + if @options[:queues] && @options[:pools] + raise ArgumentError, 'Cannot specify both QUEUES and POOLS' + end + end + + desc "Exit with error status if any jobs older than max_age seconds haven't been attempted yet." + task :check, [:max_age] => :environment do |_, args| + args.with_defaults(:max_age => 300) + + unprocessed_jobs = Delayed::Job.where('attempts = 0 AND created_at < ?', Time.now - args[:max_age].to_i).count + + if unprocessed_jobs > 0 + raise "#{unprocessed_jobs} jobs older than #{args[:max_age]} seconds have not been processed yet" + end + end +end diff --git a/lib/delayed/tasks.rb b/lib/delayed/tasks.rb deleted file mode 100644 index 409ba48f8..000000000 --- a/lib/delayed/tasks.rb +++ /dev/null @@ -1,39 +0,0 @@ -namespace :jobs do - desc 'Clear the delayed_job queue.' - task :clear => :environment do - Delayed::Job.delete_all - end - - desc 'Start a delayed_job worker.' - task :work => :environment_options do - Delayed::Worker.new(@worker_options).start - end - - desc 'Start a delayed_job worker and exit when all available jobs are complete.' - task :workoff => :environment_options do - Delayed::Worker.new(@worker_options.merge(:exit_on_complete => true)).start - end - - task :environment_options => :environment do - @worker_options = { - :min_priority => ENV['MIN_PRIORITY'], - :max_priority => ENV['MAX_PRIORITY'], - :queues => (ENV['QUEUES'] || ENV['QUEUE'] || '').split(','), - :quiet => ENV['QUIET'] - } - - @worker_options[:sleep_delay] = ENV['SLEEP_DELAY'].to_i if ENV['SLEEP_DELAY'] - @worker_options[:read_ahead] = ENV['READ_AHEAD'].to_i if ENV['READ_AHEAD'] - end - - desc "Exit with error status if any jobs older than max_age seconds haven't been attempted yet." - task :check, [:max_age] => :environment do |_, args| - args.with_defaults(:max_age => 300) - - unprocessed_jobs = Delayed::Job.where('attempts = 0 AND created_at < ?', Time.now - args[:max_age].to_i).count - - if unprocessed_jobs > 0 - raise "#{unprocessed_jobs} jobs older than #{args[:max_age]} seconds have not been processed yet" - end - end -end diff --git a/lib/delayed/util.rb b/lib/delayed/util.rb new file mode 100644 index 000000000..94c178e7a --- /dev/null +++ b/lib/delayed/util.rb @@ -0,0 +1,49 @@ +module Delayed + HAS_FORK = ::Process.respond_to?(:fork) + IS_JRUBY = Object.const_defined?(:JRUBY_VERSION) + IS_WINDOWS = !!(RUBY_PLATFORM =~ /mswin|ming|cygwin/) || IS_JRUBY && RUBY_DESCRIPTION.include?('mswin') + + def self.forkable? + HAS_FORK + end + + def self.jruby? + IS_JRUBY + end + + def self.windows? + IS_WINDOWS + end + + def self.program_name + File.basename($PROGRAM_NAME) + end + + def self.root + defined?(::Rails.root) ? ::Rails.root : Pathname.new(Dir.pwd) + end + + def self.set_thread_name(name) + return unless Thread.current.respond_to?(:name=) + Thread.current.name = "delayed_job #{name}" + end + + # An instance method on Thread has been provided to address https://bugs.ruby-lang.org/issues/13632, + # which currently effects some older versions of Ruby: 2.2.7 2.2.8 2.2.9 2.2.10 2.3.4 2.4.1 + # Additional context: https://github.com/puma/puma/pull/1345 + def self.purge_interrupt_queue + return unless Thread.current.respond_to?(:purge_interrupt_queue) + Thread.current.purge_interrupt_queue + end + + # Perform garbage collection compaction before fork. + def self.nakayoshi_gc(logger) + logger.info '! Promoting existing objects to old generation...' + 4.times { GC.start(full_mark: false) } + if GC.respond_to?(:compact) + logger.info '! Compacting...' + GC.compact + end + logger.info '! Friendly fork preparation complete.' + end +end diff --git a/lib/delayed/version.rb b/lib/delayed/version.rb new file mode 100644 index 000000000..a89eec2f3 --- /dev/null +++ b/lib/delayed/version.rb @@ -0,0 +1,3 @@ +module Delayed + VERSION = '4.1.9'.freeze +end diff --git a/lib/delayed/worker.rb b/lib/delayed/worker.rb index 1a352f775..e15ea9373 100644 --- a/lib/delayed/worker.rb +++ b/lib/delayed/worker.rb @@ -128,6 +128,7 @@ def self.delay_job?(job) end def initialize(options = {}) + @daemonized = options.delete(:daemonized) @quiet = options.key?(:quiet) ? options[:quiet] : true @failed_reserve_count = 0 @@ -138,6 +139,8 @@ def initialize(options = {}) # Reset lifecycle on the offhand chance that something lazily # triggered its creation before all plugins had been registered. self.class.setup_lifecycle + + setup_signals end # Every worker has a unique name which by default is the pid of the process. There are some @@ -153,50 +156,25 @@ def name # Setting the name to nil will reset the default worker name attr_writer :name - def start # rubocop:disable CyclomaticComplexity, PerceivedComplexity - trap('TERM') do - Thread.new { say 'Exiting...' } - stop - raise SignalException, 'TERM' if self.class.raise_signal_exceptions - end - - trap('INT') do - Thread.new { say 'Exiting...' } - stop - raise SignalException, 'INT' if self.class.raise_signal_exceptions && self.class.raise_signal_exceptions != :term - end - - say 'Starting job worker' - - self.class.lifecycle.run_callbacks(:execute, self) do - loop do - self.class.lifecycle.run_callbacks(:loop, self) do - @realtime = Benchmark.realtime do - @result = work_off - end - end - - count = @result[0] + @result[1] - - if count.zero? - if self.class.exit_on_complete - say 'No more jobs available. Exiting' - break - elsif !stop? - sleep(self.class.sleep_delay) - reload! - end - else - say format("#{count} jobs processed at %.4f j/s, %d failed", count / @realtime, @result.last) - end - - break if stop? + def start(background = false) + if background + @thread = Thread.new do + Delayed.set_thread_name 'worker' + run_loop end + else + run_loop end end - def stop + def begin_restart(sync = false) + stop # notify_safely(RESTART_COMMAND) + @thread.join if @thread && sync + end + + def stop(sync = false) @exit = true + @thread.join if @thread && sync end def stop? @@ -282,7 +260,7 @@ def say(text, level = default_log_level) unless level.is_a?(String) level = Logger::Severity.constants.detect { |i| Logger::Severity.const_get(i) == level }.to_s.downcase end - logger.send(level, "#{Time.now.strftime('%FT%T%z')}: #{text}") + logger.send(level, text) end def max_attempts(job) @@ -295,6 +273,52 @@ def max_run_time(job) protected + def run_loop # rubocop:disable CyclomaticComplexity, PerceivedComplexity + say 'Starting job worker' + + self.class.lifecycle.run_callbacks(:execute, self) do + loop do + self.class.lifecycle.run_callbacks(:loop, self) do + @realtime = Benchmark.realtime do + @result = work_off + end + end + + count = @result[0] + @result[1] + + if count.zero? + if self.class.exit_on_complete + say 'No more jobs available. Exiting' + break + elsif !stop? + sleep(self.class.sleep_delay) + reload! + end + else + say format("#{count} jobs processed at %.4f j/s, %d failed", count / @realtime, @result.last) + end + + break if stop? + end + end + end + + def setup_signals + return unless @daemonized + + trap('TERM') do + Thread.new { say 'Exiting...' } + stop + raise SignalException, 'TERM' if self.class.raise_signal_exceptions + end + + trap('INT') do + Thread.new { say 'Exiting...' } + stop + raise SignalException, 'INT' if self.class.raise_signal_exceptions && self.class.raise_signal_exceptions != :term + end + end + def say_queue(queue) " (queue=#{queue})" if queue end diff --git a/lib/delayed_job.rb b/lib/delayed_job.rb index d38f2edbc..e907c4ada 100644 --- a/lib/delayed_job.rb +++ b/lib/delayed_job.rb @@ -1,4 +1,8 @@ require 'active_support' +require 'fileutils' +require 'pathname' +require 'delayed/version' +require 'delayed/util' require 'delayed/compatibility' require 'delayed/exceptions' require 'delayed/message_sending' @@ -10,6 +14,10 @@ require 'delayed/backend/base' require 'delayed/backend/job_preparer' require 'delayed/worker' +require 'delayed/launcher/daemonized' +require 'delayed/launcher/forking' +require 'delayed/pool_parser' +require 'delayed/command' require 'delayed/deserialization_error' require 'delayed/railtie' if defined?(Rails::Railtie) diff --git a/spec/daemons.rb b/spec/daemons.rb index 750b130c0..4665afbd0 100644 --- a/spec/daemons.rb +++ b/spec/daemons.rb @@ -1,2 +1,7 @@ # Fake "daemons" file on the spec load path to allow spec/delayed/command_spec.rb # to test the Delayed::Command class without actually adding daemons as a dependency. +module Daemons + def self.run_proc(*_args) + yield if block_given? + end +end diff --git a/spec/delayed/command_spec.rb b/spec/delayed/command_spec.rb index b57cd6efa..dd91a71e1 100644 --- a/spec/delayed/command_spec.rb +++ b/spec/delayed/command_spec.rb @@ -4,176 +4,494 @@ describe Delayed::Command do let(:options) { [] } let(:logger) { double('Logger') } + let(:output_options) { subject.instance_variable_get(:'@options') } + subject { Delayed::Command.new(options) } + + def verify_worker_processes + command = Delayed::Command.new(%w[-d] + options) + allow(FileUtils).to receive(:mkdir_p) + exp.each do |args| + launcher = command.send(:launcher) + args.last[:daemonized] = launcher.is_a?(Delayed::Launcher::Daemonized) + expect(launcher).to receive(:run_process).with(*args).once + end + command.run + end + + describe '#run' do + it 'should use fork mode by default' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to_not receive(:run) + expect_any_instance_of(Delayed::Launcher::Forking).to receive(:run) + Delayed::Command.new([]).run + end + + it 'should use fork mode if --fork set' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to_not receive(:run) + expect_any_instance_of(Delayed::Launcher::Forking).to receive(:run) + Delayed::Command.new(%w[--fork]).run + end + + it 'should use daemon mode if -d set' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to receive(:run) + expect_any_instance_of(Delayed::Launcher::Forking).to_not receive(:run) + Delayed::Command.new(%w[-d]).run + end - subject { Delayed::Command.new options } + it 'should use daemon mode if --daemonize set' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to receive(:run) + expect_any_instance_of(Delayed::Launcher::Forking).to_not receive(:run) + Delayed::Command.new(%w[--daemonize]).run + end - before do - allow(Delayed::Worker).to receive(:after_fork) - allow(Dir).to receive(:chdir) - allow(Logger).to receive(:new).and_return(logger) - allow_any_instance_of(Delayed::Worker).to receive(:start) - allow(Delayed::Worker).to receive(:logger=) - allow(Delayed::Worker).to receive(:logger).and_return(nil, logger) + it 'using multiple switches should use first one' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to receive(:run) + expect_any_instance_of(Delayed::Launcher::Forking).to_not receive(:run) + Delayed::Command.new(%w[-d --fork]).run + end end - shared_examples_for 'uses --log-dir option' do - context 'when --log-dir is specified' do - let(:options) { ['--log-dir=/custom/log/dir'] } + describe '#daemonize' do + it 'should use daemon mode by default' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to receive(:run) + expect_any_instance_of(Delayed::Launcher::Forking).to_not receive(:run) + Delayed::Command.new([]).daemonize + end - it 'creates the delayed_job.log in the specified directory' do - expect(Logger).to receive(:new).with('/custom/log/dir/delayed_job.log') - subject.run - end + it 'should use fork mode if --fork set' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to_not receive(:run) + expect_any_instance_of(Delayed::Launcher::Forking).to receive(:run) + Delayed::Command.new(%w[--fork]).daemonize + end + + it 'should use daemon mode if -d set' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to receive(:run) + expect_any_instance_of(Delayed::Launcher::Forking).to_not receive(:run) + Delayed::Command.new(%w[-d]).daemonize + end + + it 'should use daemon mode if --daemonize set' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to receive(:run) + expect_any_instance_of(Delayed::Launcher::Forking).to_not receive(:run) + Delayed::Command.new(%w[--daemonize]).daemonize + end + + it 'using multiple switches should use first one' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to_not receive(:run) + expect_any_instance_of(Delayed::Launcher::Forking).to receive(:run) + Delayed::Command.new(%w[--fork -d]).daemonize + end + end + + describe '--min-priority arg' do + context 'not set' do + let(:options) { [] } + it { expect(output_options[:min_priority]).to eq nil } + end + + context 'set' do + let(:options) { %w[--min-priority 2] } + it { expect(output_options[:min_priority]).to eq 2 } + end + + context 'not a number' do + let(:options) { %w[--min-priority sponge] } + it { expect(output_options[:min_priority]).to eq nil } + end + end + + describe '--max-priority arg' do + context 'not set' do + let(:options) { [] } + it { expect(output_options[:max_priority]).to eq nil } + end + + context 'set' do + let(:options) { %w[--max-priority -5] } + it { expect(output_options[:max_priority]).to eq(-5) } + end + + context 'not a number' do + let(:options) { %w[--max-priority giraffe] } + it { expect(output_options[:max_priority]).to eq nil } end end - describe 'run' do - it 'sets the Delayed::Worker logger' do - expect(Delayed::Worker).to receive(:logger=).with(logger) - subject.run + describe '--num-workers arg' do + context 'not set' do + let(:options) { [] } + it { expect(output_options[:worker_count]).to eq 1 } end - context 'when Rails root is defined' do - let(:rails_root) { Pathname.new '/rails/root' } - let(:rails) { double('Rails', :root => rails_root) } + context '-n set' do + let(:options) { %w[-n 2] } + it { expect(output_options[:worker_count]).to eq 2 } + end + + context '-n not a number' do + let(:options) { %w[-n elephant] } + it { expect(output_options[:worker_count]).to eq 1 } + end + + context '--num-workers set' do + let(:options) { %w[--num-workers 4] } + it { expect(output_options[:worker_count]).to eq 4 } + end + + context '--num-workers not a number' do + let(:options) { %w[--num-workers hippo] } + it { expect(output_options[:worker_count]).to eq 1 } + end - before do - stub_const('Rails', rails) + context '--number_of_workers set' do + let(:options) { %w[--number_of_workers 5] } + it do + expect(STDERR).to receive(:puts) + expect(output_options[:worker_count]).to eq 5 end + end - it 'runs the Delayed::Worker process in Rails.root' do - expect(Dir).to receive(:chdir).with(rails_root) - subject.run + context '--number_of_workers not a number' do + let(:options) { %w[--number_of_workers rhino] } + it do + expect(STDERR).to receive(:puts) + expect(output_options[:worker_count]).to eq 1 end + end + end + + describe '--pid-dir arg' do + context 'not set' do + let(:options) { [] } + it { expect(output_options[:pid_dir]).to eq nil } + end - context 'when --log-dir is not specified' do - it 'creates the delayed_job.log in Rails.root/log' do - expect(Logger).to receive(:new).with('/rails/root/log/delayed_job.log') - subject.run - end + context 'set' do + let(:options) { %w[--pid-dir ./foo/bar] } + it { expect(output_options[:pid_dir]).to eq './foo/bar' } + end + + context 'worker processes' do + let(:options) { %w[--pid-dir ./foo/bar] } + let(:exp) do + [['delayed_job', {:quiet => true, :pid_dir => './foo/bar', :log_dir => './log'}]] end + it { verify_worker_processes } + end + end - include_examples 'uses --log-dir option' + describe '--log-dir arg' do + context 'not set' do + let(:options) { [] } + it { expect(output_options[:log_dir]).to eq nil } end - context 'when Rails root is not defined' do - let(:rails_without_root) { double('Rails') } + context 'set' do + let(:options) { %w[--log-dir ./foo/bar] } + it { expect(output_options[:log_dir]).to eq './foo/bar' } + end - before do - stub_const('Rails', rails_without_root) + context 'worker processes' do + let(:options) { %w[--log-dir ./foo/bar] } + let(:exp) do + [['delayed_job', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './foo/bar'}]] end + it { verify_worker_processes } + end + end + + describe '--monitor arg' do + context 'not set' do + let(:options) { [] } + it { expect(output_options[:monitor]).to eq false } + end + + context 'set' do + let(:options) { %w[--monitor] } + it { expect(output_options[:monitor]).to eq true } + end + end + + describe '--sleep-delay arg' do + context 'not set' do + let(:options) { [] } + it { expect(output_options[:sleep_delay]).to eq nil } + end + + context 'set' do + let(:options) { %w[--sleep-delay 5] } + it { expect(output_options[:sleep_delay]).to eq(5) } + end + + context 'not a number' do + let(:options) { %w[--sleep-delay giraffe] } + it { expect(output_options[:sleep_delay]).to eq nil } + end + end + + describe '--read-ahead arg' do + context 'not set' do + let(:options) { [] } + it { expect(output_options[:read_ahead]).to eq nil } + end + + context 'set' do + let(:options) { %w[--read-ahead 5] } + it { expect(output_options[:read_ahead]).to eq(5) } + end + + context 'not a number' do + let(:options) { %w[--read-ahead giraffe] } + it { expect(output_options[:read_ahead]).to eq nil } + end + end + + describe '--identifier arg' do + context 'not set' do + let(:options) { [] } + it { expect(output_options[:identifier]).to eq nil } + end - it 'runs the Delayed::Worker process in $PWD' do - expect(Dir).to receive(:chdir).with(Delayed::Command::DIR_PWD) - subject.run + context '-i set' do + let(:options) { %w[-i bond] } + it { expect(output_options[:identifier]).to eq 'bond' } + end + + context '--identifier set' do + let(:options) { %w[--identifier goldfinger] } + it { expect(output_options[:identifier]).to eq 'goldfinger' } + end + + context 'worker processes' do + let(:options) { %w[--identifier spectre] } + let(:exp) do + [['delayed_job.spectre', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log'}]] end + it { verify_worker_processes } + end + end + + describe '--prefix arg' do + context 'not set' do + let(:options) { [] } + it { expect(output_options[:prefix]).to eq nil } + end + + context '-p set' do + let(:options) { %w[-p oddjob] } + it { expect(output_options[:prefix]).to eq 'oddjob' } + end + + context '--prefix set' do + let(:options) { %w[--prefix jaws] } + it { expect(output_options[:prefix]).to eq 'jaws' } + end + end + + describe '--exit-on-complete arg' do + context 'not set' do + let(:options) { [] } + it { expect(output_options[:exit_on_complete]).to eq nil } + end + + context '-x set' do + let(:options) { %w[-x] } + it { expect(output_options[:exit_on_complete]).to eq true } + end + + context '--exit-on-complete set' do + let(:options) { %w[--exit-on-complete] } + it { expect(output_options[:exit_on_complete]).to eq true } + end + end + + describe '--verbose arg' do + context 'not set' do + let(:options) { [] } + it { expect(output_options[:quiet]).to eq true } + end + + context '-v set' do + let(:options) { %w[-v] } + it { expect(output_options[:quiet]).to eq false } + end - context 'when --log-dir is not specified' do - it 'creates the delayed_job.log in $PWD/log' do - expect(Logger).to receive(:new).with("#{Delayed::Command::DIR_PWD}/log/delayed_job.log") - subject.run - end + context '--verbose set' do + let(:options) { %w[--verbose] } + it { expect(output_options[:quiet]).to eq false } + end + + context 'worker processes' do + let(:options) { %w[-v] } + let(:exp) do + [['delayed_job', {:quiet => false, :pid_dir => './tmp/pids', :log_dir => './log'}]] end + it { verify_worker_processes } + end + end - include_examples 'uses --log-dir option' + describe '--queues arg' do + context 'not set' do + let(:options) { [] } + it { expect(output_options[:queues]).to eq nil } end - context 'when an error is raised' do - let(:test_error) { Class.new(StandardError) } + context '--queue set' do + let(:options) { %w[--queue mailers] } + it { expect(output_options[:queues]).to eq %w[mailers] } + end + + context '--queues set' do + let(:options) { %w[--queues mailers,tweets] } + it { expect(output_options[:queues]).to eq %w[mailers tweets] } + end - before do - allow(Delayed::Worker).to receive(:new).and_raise(test_error.new('An error')) - allow(subject).to receive(:exit_with_error_status) - allow(STDERR).to receive(:puts) + context 'worker processes' do + let(:options) { %w[--queues mailers,tweets] } + let(:exp) do + [['delayed_job', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers tweets]}]] end + it { verify_worker_processes } + end + end - it 'prints the error message to STDERR' do - expect(STDERR).to receive(:puts).with('An error') - subject.run + describe '--pool arg' do + context 'multiple --pool args set' do + let(:options) { %w[--pool=*:1 --pool=test_queue:4 --pool=mailers,misc:2] } + it 'should parse correctly' do + expect(output_options[:pools]).to eq [ + [[], 1], + [['test_queue'], 4], + [%w[mailers misc], 2] + ] end + end - it 'exits with an error status' do - expect(subject).to receive(:exit_with_error_status) - subject.run + context 'pipe-delimited' do + let(:options) { %w[--pools=*:1|test_queue:4 --pool=mailers,misc:2] } + it 'should parse correctly' do + expect(output_options[:pools]).to eq [ + [[], 1], + [['test_queue'], 4], + [%w[mailers misc], 2] + ] end + end - context 'when Rails logger is not defined' do - let(:rails) { double('Rails') } + context 'queues specified as *' do + let(:options) { ['--pool=*:4'] } + it 'should use all queues' do + expect(output_options[:pools]).to eq [[[], 4]] + end + end - before do - stub_const('Rails', rails) - end + context 'queues not specified' do + let(:options) { ['--pools=:4'] } + it 'should use all queues' do + expect(output_options[:pools]).to eq [[[], 4]] + end + end - it 'does not attempt to use the Rails logger' do - subject.run - end + context 'worker count not specified' do + let(:options) { ['--pool=mailers'] } + it 'should default to one worker' do + expect(output_options[:pools]).to eq [[['mailers'], 1]] end + end - context 'when Rails logger is defined' do - let(:rails_logger) { double('Rails logger') } - let(:rails) { double('Rails', :logger => rails_logger) } + context 'worker processes' do + let(:options) { %w[--pool=*:1 --pool=test_queue:4 --pool=mailers,misc:2] } + let(:exp) do + [ + ['delayed_job.0', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => []}], + ['delayed_job.1', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}], + ['delayed_job.2', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}], + ['delayed_job.3', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}], + ['delayed_job.4', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}], + ['delayed_job.5', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers misc]}], + ['delayed_job.6', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers misc]}] + ] + end + it { verify_worker_processes } + end + end - before do - stub_const('Rails', rails) - end + describe '--daemon-options arg' do + context 'not set' do + let(:options) { [] } + it { expect(output_options[:exit_on_complete]).to eq nil } + it 'does not affect launch_strategy' do + expect(subject.instance_variable_get(:'@launch_strategy')).to eq nil + end + end - it 'logs the error to the Rails logger' do - expect(rails_logger).to receive(:fatal).with(test_error) - subject.run - end + context 'set' do + let(:options) { %w[--daemon-options a,b,c] } + it { expect(subject.instance_variable_get(:'@daemon_options')).to eq %w[a b c] } + it 'coerces launch_strategy to :daemon' do + expect(subject.instance_variable_get(:'@launch_strategy')).to eq :daemon end end end - describe 'parsing --pool argument' do - it 'should parse --pool correctly' do - command = Delayed::Command.new(['--pool=*:1', '--pool=test_queue:4', '--pool=mailers,misc:2']) + describe 'extra args' do + context '--daemon-options not set' do + let(:options) { %w[foo bar baz] } + it { expect(output_options[:args]).to eq %w[foo bar baz] } + end + + context '--daemon-options set' do + let(:options) { %w[foo bar --daemon-options a,b,c baz] } + it { expect(output_options[:args]).to eq %w[foo bar baz a b c] } + end + end + + describe 'validations' do + it 'should launch normally without validations' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to receive(:run) + expect(STDERR).to_not receive(:puts) + Delayed::Command.new(%w[-d]).run + end - expect(command.worker_pools).to eq [ - [[], 1], - [['test_queue'], 4], - [%w[mailers misc], 2] - ] + it 'raise error num-workers and identifier are present' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to_not receive(:run) + expect(STDERR).to_not receive(:puts) + expect { Delayed::Command.new(%w[-d --num-workers=2 --identifier=foobar]).run }.to raise_error(ArgumentError) end - it 'should allow * or blank to specify any pools' do - command = Delayed::Command.new(['--pool=*:4']) - expect(command.worker_pools).to eq [ - [[], 4], - ] + it 'warn if num-workers is 0' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to receive(:run) + expect(STDERR).to receive(:puts) + Delayed::Command.new(%w[-d --num-workers=0]).run + end - command = Delayed::Command.new(['--pool=:4']) - expect(command.worker_pools).to eq [ - [[], 4], - ] + it 'not warn if min-priority is less than max-priority' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to receive(:run) + expect(STDERR).to_not receive(:puts) + Delayed::Command.new(%w[-d --min-priority=-5 --max-priority=0]).run end - it 'should default to one worker if not specified' do - command = Delayed::Command.new(['--pool=mailers']) - expect(command.worker_pools).to eq [ - [['mailers'], 1], - ] + it 'not warn if min-priority equals max-priority' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to receive(:run) + expect(STDERR).to_not receive(:puts) + Delayed::Command.new(%w[-d --min-priority=-5 --max-priority=-5]).run end - end - describe 'running worker pools defined by multiple --pool arguments' do - it 'should run the correct worker processes' do - command = Delayed::Command.new(['--pool=*:1', '--pool=test_queue:4', '--pool=mailers,misc:2']) - expect(FileUtils).to receive(:mkdir_p).with('./tmp/pids').once + it 'warn if min-priority is greater than max-priority' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to receive(:run) + expect(STDERR).to receive(:puts) + Delayed::Command.new(%w[-d --min-priority=-4 --max-priority=-5]).run + end - [ - ['delayed_job.0', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => []}], - ['delayed_job.1', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}], - ['delayed_job.2', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}], - ['delayed_job.3', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}], - ['delayed_job.4', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}], - ['delayed_job.5', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers misc]}], - ['delayed_job.6', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers misc]}] - ].each do |args| - expect(command).to receive(:run_process).with(*args).once - end + it 'warn if both queues and pools are present' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to receive(:run) + expect(STDERR).to receive(:puts) + Delayed::Command.new(%w[-d --queues=mailers --pool=mailers:2]).run + end - command.daemonize + it 'warn if both num-workers and pools are present' do + expect_any_instance_of(Delayed::Launcher::Daemonized).to receive(:run) + expect(STDERR).to receive(:puts) + Delayed::Command.new(%w[-d --num-workers=2 --pool=mailers:2]).run end end end diff --git a/spec/delayed/launcher/daemonized_spec.rb b/spec/delayed/launcher/daemonized_spec.rb new file mode 100644 index 000000000..1668f1f09 --- /dev/null +++ b/spec/delayed/launcher/daemonized_spec.rb @@ -0,0 +1,205 @@ +require_relative '../../helper' + +describe Delayed::Launcher::Daemonized do + def verify_worker_processes + exp.each do |args| + args.last[:daemonized] = true + expect(subject).to receive(:run_process).with(*args).once + end + subject.run + end + + let(:options) { {} } + subject { described_class.new(options) } + + before do + # stub I/O methods + allow(ObjectSpace).to receive(:each_object) + allow(FileUtils).to receive(:mkdir_p) + allow(Dir).to receive(:chdir) + end + + describe 'run_worker' do + let(:logger) { double('Logger') } + + before do + allow(Delayed::Worker).to receive(:after_fork) + allow_any_instance_of(Delayed::Worker).to receive(:start) + allow(Logger).to receive(:new).and_return(logger) + allow(Delayed::Worker).to receive(:logger=) + allow(Delayed::Worker).to receive(:logger).and_return(nil, logger) + end + + shared_examples_for 'uses log_dir option' do + context 'when log_dir is specified' do + let(:options) { {:log_dir => '/custom/log/dir'} } + + it 'creates the delayed_job.log in the specified directory' do + expect(Logger).to receive(:new).with('/custom/log/dir/delayed_job.log') + subject.send(:run_worker, 'delayed_job.0', {}) + end + end + end + + it 'sets the Delayed::Worker logger' do + expect(Delayed::Worker).to receive(:logger=).with(logger) + subject.send(:run_worker, 'delayed_job.0', {}) + end + + context 'when Rails root is defined' do + let(:rails_root) { Pathname.new '/rails/root' } + let(:rails) { double('Rails', :root => rails_root) } + + before do + stub_const('Rails', rails) + end + + it 'runs the Delayed::Worker process in Rails.root' do + expect(Dir).to receive(:chdir).with(rails_root) + subject.send(:run_worker, 'delayed_job.0', {}) + end + + context 'when --log-dir is not specified' do + it 'creates the delayed_job.log in Rails.root/log' do + expect(Logger).to receive(:new).with('/rails/root/log/delayed_job.log') + subject.send(:run_worker, 'delayed_job.0', {}) + end + end + + include_examples 'uses log_dir option' + end + + context 'when Rails root is not defined' do + let(:rails_without_root) { double('Rails') } + + before do + stub_const('Rails', rails_without_root) + end + + it 'runs the Delayed::Worker process in $PWD' do + expect(Dir).to receive(:chdir).with(Pathname.new(Dir.pwd)) + subject.send(:run_worker, 'delayed_job.0', {}) + end + + context 'when --log-dir is not specified' do + it 'creates the delayed_job.log in $PWD/log' do + expect(Logger).to receive(:new).with("#{Pathname.new(Dir.pwd)}/log/delayed_job.log") + subject.send(:run_worker, 'delayed_job.0', {}) + end + end + + include_examples 'uses log_dir option' + end + + context 'when an error is raised' do + let(:test_error) { Class.new(StandardError) } + + before do + allow(Delayed::Worker).to receive(:new).and_raise(test_error.new('An error')) + allow(subject).to receive(:exit_with_error_status) + allow(STDERR).to receive(:puts) + end + + context 'using Delayed::Worker logger' do + before do + expect(logger).to receive(:fatal).with(test_error) + end + + it 'prints the error message to STDERR' do + expect(STDERR).to receive(:puts).with('An error') + subject.send(:run_worker, 'delayed_job.0', {}) + end + + it 'exits with an error status' do + expect(subject).to receive(:exit_with_error_status) + subject.send(:run_worker, 'delayed_job.0', {}) + end + + context 'when Rails logger is not defined' do + let(:rails) { double('Rails') } + + before do + stub_const('Rails', rails) + end + + it 'does not attempt to use the Rails logger' do + subject.send(:run_worker, 'delayed_job.0', {}) + end + end + end + + context 'when Rails logger is defined' do + let(:rails_logger) { double('Rails logger') } + let(:rails) { double('Rails', :logger => rails_logger) } + + before do + stub_const('Rails', rails) + allow(Delayed::Worker).to receive(:logger).and_return(nil) + end + + it 'logs the error to the Rails logger' do + expect(rails_logger).to receive(:fatal).with(test_error) + subject.send(:run_worker, 'delayed_job.0', {}) + end + end + end + end + + describe 'spawning workers' do + context 'no args' do + let(:options) { {} } + let(:exp) do + [['delayed_job', {:pid_dir => './tmp/pids', :log_dir => './log'}]] + end + it { verify_worker_processes } + end + + context ':pools arg' do + let(:options) { {:pools => [[[], 1], [['test_queue'], 4], [%w[mailers misc], 2]]} } + let(:exp) do + [ + ['delayed_job.0', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => []}], + ['delayed_job.1', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}], + ['delayed_job.2', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}], + ['delayed_job.3', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}], + ['delayed_job.4', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}], + ['delayed_job.5', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers misc]}], + ['delayed_job.6', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers misc]}] + ] + end + it { verify_worker_processes } + end + + context ':queues and :worker_count args' do + let(:options) { {:queues => %w[mailers misc], :worker_count => 4} } + let(:exp) do + [ + ['delayed_job.0', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers misc]}], + ['delayed_job.1', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers misc]}], + ['delayed_job.2', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers misc]}], + ['delayed_job.3', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers misc]}] + ] + end + it { verify_worker_processes } + end + + context ':pid_dir and :log_dir args' do + let(:options) { {:pid_dir => './foo/bar', :log_dir => './baz/qux', :worker_count => 2} } + let(:exp) do + [ + ['delayed_job.0', {:pid_dir => './foo/bar', :log_dir => './baz/qux'}], + ['delayed_job.1', {:pid_dir => './foo/bar', :log_dir => './baz/qux'}], + ] + end + it { verify_worker_processes } + end + + context ':identifier and other args' do + let(:options) { {:monitor => true, :prefix => 'my_prefix', :identifier => 'my_identifier', :worker_count => 2, :args => {:foo => 'bar', :baz => 'qux'}} } + let(:exp) do + [['delayed_job.my_identifier', {:pid_dir => './tmp/pids', :log_dir => './log'}]] + end + it { verify_worker_processes } + end + end +end diff --git a/spec/delayed/launcher/forking_spec.rb b/spec/delayed/launcher/forking_spec.rb new file mode 100644 index 000000000..b480c35c2 --- /dev/null +++ b/spec/delayed/launcher/forking_spec.rb @@ -0,0 +1,21 @@ +require_relative '../../helper' +require_relative 'shared_examples' + +describe Delayed::Launcher::Forking do + before do + @workers = [] + allow(Delayed::Worker).to receive(:logger).and_return(Logger.new(nil)) + allow_any_instance_of(described_class).to receive(:run_loop).and_return(nil) + allow_any_instance_of(described_class).to receive(:fork_worker) { |_, *args| @workers << args } + allow_any_instance_of(described_class).to receive(:setup_single_worker) do + @workers << [subject.send(:get_name, subject.send(:process_identifier)), subject.instance_variable_get(:'@options')] + end + end + + def verify_worker_processes + subject.launch + expect(@workers).to eq(exp) + end + + it_behaves_like 'launcher shared examples' +end diff --git a/spec/delayed/launcher/shared_examples.rb b/spec/delayed/launcher/shared_examples.rb new file mode 100644 index 000000000..5fbcdd641 --- /dev/null +++ b/spec/delayed/launcher/shared_examples.rb @@ -0,0 +1,195 @@ +shared_examples_for 'launcher shared examples' do + let(:options) { {} } + subject { described_class.new(options) } + + before do + # stub I/O methods + allow(ObjectSpace).to receive(:each_object) + allow(FileUtils).to receive(:mkdir_p) + allow(Dir).to receive(:chdir) + end + + describe 'run_worker' do + let(:logger) { double('Logger') } + + before do + allow(Delayed::Worker).to receive(:after_fork) + allow_any_instance_of(Delayed::Worker).to receive(:start) + allow(Logger).to receive(:new).and_return(logger) + allow(Delayed::Worker).to receive(:logger=) + allow(Delayed::Worker).to receive(:logger).and_return(nil, logger) + end + + shared_examples_for 'uses log_dir option' do + context 'when log_dir is specified' do + let(:options) { {:log_dir => '/custom/log/dir'} } + + it 'creates the delayed_job.log in the specified directory' do + expect(Logger).to receive(:new).with('/custom/log/dir/delayed_job.log') + subject.send(:run_worker, 'delayed_job.0', {}) + end + end + end + + it 'sets the Delayed::Worker logger' do + expect(Delayed::Worker).to receive(:logger=).with(logger) + subject.send(:run_worker, 'delayed_job.0', {}) + end + + context 'when Rails root is defined' do + let(:rails_root) { Pathname.new '/rails/root' } + let(:rails) { double('Rails', :root => rails_root) } + + before do + stub_const('Rails', rails) + end + + it 'runs the Delayed::Worker process in Rails.root' do + expect(Dir).to receive(:chdir).with(rails_root) + subject.send(:run_worker, 'delayed_job.0', {}) + end + + context 'when --log-dir is not specified' do + it 'creates the delayed_job.log in Rails.root/log' do + expect(Logger).to receive(:new).with('/rails/root/log/delayed_job.log') + subject.send(:run_worker, 'delayed_job.0', {}) + end + end + + include_examples 'uses log_dir option' + end + + context 'when Rails root is not defined' do + let(:rails_without_root) { double('Rails') } + + before do + stub_const('Rails', rails_without_root) + end + + it 'runs the Delayed::Worker process in $PWD' do + expect(Dir).to receive(:chdir).with(Pathname.new(Dir.pwd)) + subject.send(:run_worker, 'delayed_job.0', {}) + end + + context 'when --log-dir is not specified' do + it 'creates the delayed_job.log in $PWD/log' do + expect(Logger).to receive(:new).with("#{Pathname.new(Dir.pwd)}/log/delayed_job.log") + subject.send(:run_worker, 'delayed_job.0', {}) + end + end + + include_examples 'uses log_dir option' + end + + context 'when an error is raised' do + let(:test_error) { Class.new(StandardError) } + + before do + allow(Delayed::Worker).to receive(:new).and_raise(test_error.new('An error')) + allow(subject).to receive(:exit_with_error_status) + allow(STDERR).to receive(:puts) + end + + context 'using Delayed::Worker logger' do + before do + expect(logger).to receive(:fatal).with(test_error) + end + + it 'prints the error message to STDERR' do + expect(STDERR).to receive(:puts).with('An error') + subject.send(:run_worker, 'delayed_job.0', {}) + end + + it 'exits with an error status' do + expect(subject).to receive(:exit_with_error_status) + subject.send(:run_worker, 'delayed_job.0', {}) + end + + context 'when Rails logger is not defined' do + let(:rails) { double('Rails') } + + before do + stub_const('Rails', rails) + end + + it 'does not attempt to use the Rails logger' do + subject.send(:run_worker, 'delayed_job.0', {}) + end + end + end + + context 'when Rails logger is defined' do + let(:rails_logger) { double('Rails logger') } + let(:rails) { double('Rails', :logger => rails_logger) } + + before do + stub_const('Rails', rails) + allow(Delayed::Worker).to receive(:logger).and_return(nil) + end + + it 'logs the error to the Rails logger' do + expect(rails_logger).to receive(:fatal).with(test_error) + subject.send(:run_worker, 'delayed_job.0', {}) + end + end + end + end + + describe 'spawning workers' do + context 'no args' do + let(:options) { {} } + let(:exp) do + [['delayed_job', {:pid_dir => './tmp/pids', :log_dir => './log'}]] + end + it { verify_worker_processes } + end + + context ':pools arg' do + let(:options) { {:pools => [[[], 1], [['test_queue'], 4], [%w[mailers misc], 2]]} } + let(:exp) do + [ + ['delayed_job.0', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => []}], + ['delayed_job.1', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}], + ['delayed_job.2', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}], + ['delayed_job.3', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}], + ['delayed_job.4', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => ['test_queue']}], + ['delayed_job.5', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers misc]}], + ['delayed_job.6', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers misc]}] + ] + end + it { verify_worker_processes } + end + + context ':queues and :worker_count args' do + let(:options) { {:queues => %w[mailers misc], :worker_count => 4} } + let(:exp) do + [ + ['delayed_job.0', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers misc]}], + ['delayed_job.1', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers misc]}], + ['delayed_job.2', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers misc]}], + ['delayed_job.3', {:pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[mailers misc]}] + ] + end + it { verify_worker_processes } + end + + context ':pid_dir and :log_dir args' do + let(:options) { {:pid_dir => './foo/bar', :log_dir => './baz/qux', :worker_count => 2} } + let(:exp) do + [ + ['delayed_job.0', {:pid_dir => './foo/bar', :log_dir => './baz/qux'}], + ['delayed_job.1', {:pid_dir => './foo/bar', :log_dir => './baz/qux'}], + ] + end + it { verify_worker_processes } + end + + context ':identifier and other args' do + let(:options) { {:monitor => true, :prefix => 'my_prefix', :identifier => 'my_identifier', :worker_count => 2, :args => {:foo => 'bar', :baz => 'qux'}} } + let(:exp) do + [['delayed_job.my_identifier', {:pid_dir => './tmp/pids', :log_dir => './log'}]] + end + it { verify_worker_processes } + end + end +end diff --git a/spec/delayed/pool_parser_spec.rb b/spec/delayed/pool_parser_spec.rb new file mode 100644 index 000000000..8b20e5a3b --- /dev/null +++ b/spec/delayed/pool_parser_spec.rb @@ -0,0 +1,52 @@ +require 'helper' + +describe Delayed::PoolParser do + subject { described_class.new } + + describe '#add and #pools' do + it 'parsing pools' do + %w[*:1 test_queue:4 mailers,misc:2].each do |str| + subject.add(str) + end + + expect(subject.pools).to eq [ + [[], 1], + [['test_queue'], 4], + [%w[mailers misc], 2] + ] + end + + it 'should allow pipe delimiter' do + %w[*:1|test_queue:4 mailers,misc:2|foo,bar:3|baz:4].each do |str| + subject.add(str) + end + + expect(subject.pools).to eq [ + [[], 1], + [['test_queue'], 4], + [%w[mailers misc], 2], + [%w[foo bar], 3], + [['baz'], 4], + ] + end + + it 'should allow * to specify any pools' do + subject.add('*:4') + expect(subject.pools).to eq [[[], 4]] + end + + it 'should allow blank to specify any pools' do + subject.add(':4') + expect(subject.pools).to eq [[[], 4]] + end + + it 'should default to one worker if not specified' do + subject.add('mailers') + expect(subject.pools).to eq [[['mailers'], 1]] + end + + it '#add should return self' do + expect(subject.add('mailers:2|*:4')).to eq subject + end + end +end diff --git a/spec/delayed/tasks_spec.rb b/spec/delayed/tasks_spec.rb new file mode 100644 index 000000000..783cf063e --- /dev/null +++ b/spec/delayed/tasks_spec.rb @@ -0,0 +1,265 @@ +require 'helper' +require 'rake' + +describe 'Rake tasks' do + let(:env) { {} } + + before do + stub_const('ENV', env) + Rake.application = Rake::Application.new + Rake.application.rake_require('delayed/tasks', $LOAD_PATH, []) + Rake::Task.define_task(:environment) + end + + describe 'jobs:clear' do + it do + expect(Delayed::Job).to receive(:delete_all) + Rake.application.invoke_task 'jobs:clear' + end + end + + shared_examples_for 'work task' do + let(:expect_success) do + expect(Delayed::Launcher::Forking).to receive(:new).with(default_args.merge(exp_args)).and_call_original + expect_any_instance_of(Delayed::Launcher::Forking).to receive(:run) + end + + context 'default case' do + let(:exp_args) { {:quiet => false, :worker_count => 1} } + it do + expect_success + run_task + end + end + + context 'MIN_PRIORITY=-2' do + let(:env) { {'MIN_PRIORITY' => '-2'} } + let(:exp_args) { {:min_priority => -2, :quiet => false, :worker_count => 1} } + it do + expect_success + run_task + end + end + + context 'MIN_PRIORITY not a number' do + let(:env) { {'MIN_PRIORITY' => 'foo'} } + it { expect { run_task }.to raise_error(ArgumentError) } + end + + context 'MAX_PRIORITY=-5' do + let(:env) { {'MAX_PRIORITY' => '-5'} } + let(:exp_args) { {:max_priority => -5, :quiet => false, :worker_count => 1} } + it do + expect_success + run_task + end + end + + context 'MAX_PRIORITY not a number' do + let(:env) { {'MAX_PRIORITY' => 'foo'} } + it { expect { run_task }.to raise_error(ArgumentError) } + end + + context 'NUM_WORKERS=5' do + let(:env) { {'NUM_WORKERS' => '5'} } + let(:exp_args) { {:quiet => false, :worker_count => 5} } + it do + expect_success + run_task + end + end + + context 'NUM_WORKERS not a number' do + let(:env) { {'NUM_WORKERS' => 'foo'} } + it { expect { run_task }.to raise_error(ArgumentError) } + end + + context 'SLEEP_DELAY=5' do + let(:env) { {'SLEEP_DELAY' => '5'} } + let(:exp_args) { {:quiet => false, :sleep_delay => 5, :worker_count => 1} } + it do + expect_success + run_task + end + end + + context 'SLEEP_DELAY not a number' do + let(:env) { {'SLEEP_DELAY' => 'foo'} } + it { expect { run_task }.to raise_error(ArgumentError) } + end + + context 'READ_AHEAD=5' do + let(:env) { {'READ_AHEAD' => '5'} } + let(:exp_args) { {:quiet => false, :read_ahead => 5, :worker_count => 1} } + it do + expect_success + run_task + end + end + + context 'READ_AHEAD not a number' do + let(:env) { {'READ_AHEAD' => 'foo'} } + it { expect { run_task }.to raise_error(ArgumentError) } + end + + context 'QUIET=foo' do + let(:env) { {'QUIET' => 'foo'} } + let(:exp_args) { {:quiet => true, :worker_count => 1} } + it do + expect_success + run_task + end + end + + context 'QUIET=0' do + let(:env) { {'QUIET' => '0'} } + let(:exp_args) { {:quiet => false, :worker_count => 1} } + it do + expect_success + run_task + end + end + + context 'QUIET=f' do + let(:env) { {'QUIET' => 'f'} } + let(:exp_args) { {:quiet => false, :worker_count => 1} } + it do + expect_success + run_task + end + end + + context 'QUIET=FaLsE' do + let(:env) { {'QUIET' => 'FaLsE'} } + let(:exp_args) { {:quiet => false, :worker_count => 1} } + it do + expect_success + run_task + end + end + + context 'QUEUE=mailers' do + let(:env) { {'QUEUE' => 'mailers'} } + let(:exp_args) { {:queues => %w[mailers], :quiet => false, :worker_count => 1} } + it do + expect_success + run_task + end + end + + context 'QUEUE=mailers,tweets,payments' do + let(:env) { {'QUEUE' => 'mailers,tweets,payments'} } + let(:exp_args) { {:queues => %w[mailers tweets payments], :quiet => false, :worker_count => 1} } + it do + expect_success + run_task + end + end + + context 'QUEUES=mailers' do + let(:env) { {'QUEUES' => 'mailers'} } + let(:exp_args) { {:queues => %w[mailers], :quiet => false, :worker_count => 1} } + it do + expect_success + run_task + end + end + + context 'QUEUES=mailers,tweets,payments' do + let(:env) { {'QUEUES' => 'mailers,tweets,payments'} } + let(:exp_args) { {:queues => %w[mailers tweets payments], :quiet => false, :worker_count => 1} } + it do + expect_success + run_task + end + end + + context 'POOL=*:1' do + let(:env) { {'POOL' => '*:1'} } + let(:exp_args) { {:pools => [[[], 1]], :quiet => false, :worker_count => 1} } + it do + expect_success + run_task + end + end + + context 'POOL=*:1|test_queue:4|mailers,misc:2' do + let(:env) { {'POOL' => '*:1|test_queue:4|mailers,misc:2'} } + let(:exp_args) { {:pools => [[[], 1], [%w[test_queue], 4], [%w[mailers misc], 2]], :quiet => false, :worker_count => 1} } + it do + expect_success + run_task + end + end + + context 'POOLS=*:1' do + let(:env) { {'POOL' => '*:1'} } + let(:exp_args) { {:pools => [[[], 1]], :quiet => false, :worker_count => 1} } + it do + expect_success + run_task + end + end + + context 'POOLS=*:1|test_queue:4|mailers,misc:2' do + let(:env) { {'POOL' => '*:1|test_queue:4|mailers,misc:2'} } + let(:exp_args) { {:pools => [[[], 1], [%w[test_queue], 4], [%w[mailers misc], 2]], :quiet => false, :worker_count => 1} } + it do + expect_success + run_task + end + end + + context 'NUM_WORKERS=0' do + let(:env) { {'NUM_WORKERS' => '0'} } + it { expect { run_task }.to raise_error(ArgumentError) } + end + + context 'MIN_PRIORITY less than MAX_PRIORITY' do + let(:env) { {'MIN_PRIORITY' => '-5', 'MAX_PRIORITY' => '0'} } + let(:exp_args) { {:max_priority => 0, :min_priority => -5, :quiet => false, :worker_count => 1} } + it do + expect_success + run_task + end + end + + context 'MIN_PRIORITY equal to MAX_PRIORITY' do + let(:env) { {'MIN_PRIORITY' => '-5', 'MAX_PRIORITY' => '-5'} } + let(:exp_args) { {:max_priority => -5, :min_priority => -5, :quiet => false, :worker_count => 1} } + it do + expect_success + run_task + end + end + + context 'MIN_PRIORITY greater than MAX_PRIORITY' do + let(:env) { {'MIN_PRIORITY' => '5', 'MAX_PRIORITY' => '0'} } + it { expect { run_task }.to raise_error(ArgumentError) } + end + + context 'NUM_WORKERS and POOLS' do + let(:env) { {'NUM_WORKERS' => '5', 'POOLS' => 'mailers:2'} } + it { expect { run_task }.to raise_error(ArgumentError) } + end + + context 'QUEUES and POOLS' do + let(:env) { {'QUEUES' => 'mailers', 'POOLS' => 'mailers:2'} } + it { expect { run_task }.to raise_error(ArgumentError) } + end + end + + describe 'jobs:work task' do + let(:run_task) { Rake.application.invoke_task 'jobs:work' } + let(:default_args) { {} } + + it_behaves_like 'work task' + end + + describe 'jobs:workoff' do + let(:run_task) { Rake.application.invoke_task 'jobs:workoff' } + let(:default_args) { {:exit_on_complete => true} } + + it_behaves_like 'work task' + end +end