diff --git a/Gemfile b/Gemfile index 536a22ed..d88ba639 100644 --- a/Gemfile +++ b/Gemfile @@ -83,6 +83,7 @@ group :test do if ENV['RAILS_VERSION'].nil? || ENV['RAILS_VERSION'] >= '6.0.0' gem 'zeitwerk', :require => false end + gem 'concurrent-ruby' end group :rubocop do diff --git a/lib/active_job/queue_adapters/delayed_job_adapter.rb b/lib/active_job/queue_adapters/delayed_job_adapter.rb index 56626dab..6ff6d2ef 100644 --- a/lib/active_job/queue_adapters/delayed_job_adapter.rb +++ b/lib/active_job/queue_adapters/delayed_job_adapter.rb @@ -1,5 +1,3 @@ -require "byebug" - module ActiveJob module QueueAdapters # Explicitly remove the implementation existing in older rails'. diff --git a/spec/active_job_adapter_spec.rb b/spec/active_job_adapter_spec.rb index 68cc4f24..757465e3 100644 --- a/spec/active_job_adapter_spec.rb +++ b/spec/active_job_adapter_spec.rb @@ -1,20 +1,22 @@ require 'helper' require 'active_job' -require "byebug" +require 'concurrent' describe 'a Rails active job backend' do module JobBuffer + @values = Concurrent::Array.new + class << self def clear - values.clear + @values.clear end def add(value) - values << value + @values << value end def values - @values ||= [] + @values.dup end end end @@ -27,21 +29,41 @@ def perform(message) end end - let(:worker) { Delayed::Worker.new(sleep_delay: 0.5, queues: %w(integration_tests)) } + let(:worker) { Delayed::Worker.new(sleep_delay: 0.5, queues: %w[integration_tests]) } before do + JobBuffer.clear + Delayed::Job.delete_all ActiveJob::Base.queue_adapter = :delayed_job + ActiveJob::Base.logger = nil end - after do - JobBuffer.clear + it "should supply a wrapped class name to DelayedJob" do + TestJob.perform_later + job = Delayed::Job.all.last + expect(job.name).to match(/TestJob \[[0-9a-f-]+\] from DelayedJob\(integration_tests\) with arguments: \[\]/) end it 'enqueus and executes the job' do start_worker do - TestJob.perform_later('hello') + TestJob.perform_later('Rails') sleep 2 - expect(JobBuffer.values).to eq(['hello']) + expect(JobBuffer.values).to eq(['Rails']) + end + end + + it "should not run jobs queued on a non-listening queue" do + start_worker do + old_queue = TestJob.queue_name + + begin + TestJob.queue_as :some_other_queue + TestJob.perform_later "Rails" + sleep 2 + expect(JobBuffer.values.empty?).to eq true + ensure + TestJob.queue_name = old_queue + end end end @@ -55,7 +77,7 @@ def perform(message) it 'should not run job enqueued in the future' do start_worker do - TestJob.set(wait: 5.seconds).perform_later('hello') + TestJob.set(wait: 5.seconds).perform_later('Rails') sleep 2 expect(JobBuffer.values.empty?).to eq true end @@ -63,9 +85,28 @@ def perform(message) it 'should run job enqueued in the future at the specified time' do start_worker do - TestJob.set(wait: 5.seconds).perform_later('hello') + TestJob.set(wait: 5.seconds).perform_later('Rails') + sleep 10 + expect(JobBuffer.values).to eq(['Rails']) + end + end + + it "should run job bulk enqueued in the future at the specified time" do + start_worker do + ActiveJob.perform_all_later([TestJob.new("Rails").set(wait: 5.seconds)]) + sleep 10 + expect(JobBuffer.values).to eq(['Rails']) + end + end + + it "should run job with higher priority first" do + start_worker do + wait_until = Time.now + 3.seconds + TestJob.set(wait_until: wait_until, priority: 20).perform_later "1" + TestJob.set(wait_until: wait_until, priority: 10).perform_later "2" sleep 10 - expect(JobBuffer.values).to eq(['hello']) + + expect(JobBuffer.values).to eq(['2', '1']) end end