diff --git a/.rubocop.yml b/.rubocop.yml index 0a890404..cf4ee76a 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -8,9 +8,6 @@ AllCops: Lint/SuppressedException: Exclude: - lib/job-iteration.rb -Style/GlobalVars: - Exclude: - - lib/job-iteration/integrations/resque.rb Naming/FileName: Exclude: - lib/job-iteration.rb diff --git a/lib/job-iteration.rb b/lib/job-iteration.rb index 8e9d78be..e553c853 100644 --- a/lib/job-iteration.rb +++ b/lib/job-iteration.rb @@ -3,15 +3,12 @@ require "active_job" require_relative "./job-iteration/version" require_relative "./job-iteration/enumerator_builder" +require_relative "./job-iteration/interruption_adapters" require_relative "./job-iteration/iteration" require_relative "./job-iteration/log_subscriber" require_relative "./job-iteration/railtie" module JobIteration - IntegrationLoadError = Class.new(StandardError) - - INTEGRATIONS = [:resque, :sidekiq] - Deprecation = ActiveSupport::Deprecation.new("2.0", "JobIteration") extend self @@ -60,10 +57,21 @@ def logger # where the throttle backoff value will take precedence over this setting. attr_accessor :default_retry_backoff - # Used internally for hooking into job processing frameworks like Sidekiq and Resque. - attr_accessor :interruption_adapter + # Overrides interruption checks based on queue adapter. + # @deprecated - Use JobIteration::InterruptionAdapters.register(:foo, callable) instead. + attr_reader :interruption_adapter - self.interruption_adapter = -> { false } + def interruption_adapter=(adapter) + Deprecation.warn("Setting JobIteration.interruption_adapter is deprecated."\ + " Use JobIteration::InterruptionAdapters.register(:foo, callable) instead"\ + " to register the callable (a proc, method, or other object responding to #call)"\ + " as the interruption adapter for queue adapter :foo.") + @interruption_adapter = adapter + end + + def register_interruption_adapter(adapter_name, adapter) + InterruptionAdapters.register(adapter_name, adapter) + end # Set if you want to use your own enumerator builder instead of default EnumeratorBuilder. # @example @@ -76,29 +84,4 @@ def logger attr_accessor :enumerator_builder self.enumerator_builder = JobIteration::EnumeratorBuilder - - def load_integrations - loaded = nil - INTEGRATIONS.each do |integration| - load_integration(integration) - if loaded - raise IntegrationLoadError, - "#{loaded} integration has already been loaded, but #{integration} is also available. " \ - "Iteration will only work with one integration." - end - loaded = integration - rescue LoadError - end - end - - def load_integration(integration) - unless INTEGRATIONS.include?(integration) - raise IntegrationLoadError, - "#{integration} integration is not supported. Available integrations: #{INTEGRATIONS.join(", ")}" - end - - require_relative "./job-iteration/integrations/#{integration}" - end end - -JobIteration.load_integrations unless ENV["ITERATION_DISABLE_AUTOCONFIGURE"] diff --git a/lib/job-iteration/integrations/resque.rb b/lib/job-iteration/integrations/resque.rb deleted file mode 100644 index 23bf41f6..00000000 --- a/lib/job-iteration/integrations/resque.rb +++ /dev/null @@ -1,24 +0,0 @@ -# frozen_string_literal: true - -require "resque" - -module JobIteration - module Integrations - module ResqueIterationExtension # @private - def initialize(*) # @private - $resque_worker = self - super - end - end - - # @private - module ::Resque - class Worker - # The patch is required in order to call shutdown? on a Resque::Worker instance - prepend(ResqueIterationExtension) - end - end - - JobIteration.interruption_adapter = -> { $resque_worker.try!(:shutdown?) } - end -end diff --git a/lib/job-iteration/integrations/sidekiq.rb b/lib/job-iteration/integrations/sidekiq.rb deleted file mode 100644 index ea37df68..00000000 --- a/lib/job-iteration/integrations/sidekiq.rb +++ /dev/null @@ -1,25 +0,0 @@ -# frozen_string_literal: true - -require "sidekiq" - -module JobIteration - module Integrations # @private - module Sidekiq - class << self - attr_accessor :stopping - - def call - stopping - end - end - end - - JobIteration.interruption_adapter = JobIteration::Integrations::Sidekiq - - ::Sidekiq.configure_server do |config| - config.on(:quiet) do - JobIteration::Integrations::Sidekiq.stopping = true - end - end - end -end diff --git a/lib/job-iteration/interruption_adapters.rb b/lib/job-iteration/interruption_adapters.rb new file mode 100644 index 00000000..6b81f00a --- /dev/null +++ b/lib/job-iteration/interruption_adapters.rb @@ -0,0 +1,49 @@ +# frozen_string_literal: true + +require_relative "interruption_adapters/null_adapter" + +module JobIteration + module InterruptionAdapters + class << self + # Returns adapter for specified name. + # + # JobIteration::InterruptionAdapters.lookup(:sidekiq) + # # => JobIteration::InterruptionAdapters::SidekiqAdapter + def lookup(name) + registry.fetch(name.to_sym) do + Deprecation.warn(<<~DEPRECATION_MESSAGE, caller_locations(1)) + No interruption adapter is registered for #{name.inspect}; falling back to `NullAdapter`, which never interrupts. + Use `JobIteration::InterruptionAdapters.register(#{name.to_sym.inspect}, ) to register one. + This will raise starting in version #{Deprecation.deprecation_horizon} of #{Deprecation.gem_name}!" + DEPRECATION_MESSAGE + + NullAdapter + end + end + + # Registers adapter for specified name. + # + # JobIteration::InterruptionAdapters.register(:sidekiq, MyCustomSidekiqAdapter) + def register(name, adapter) + raise ArgumentError, "adapter must be callable" unless adapter.respond_to?(:call) + + registry[name.to_sym] = adapter + end + + private + + attr_reader :registry + end + + @registry = {} + + # Built-in Rails adapters. It doesn't make sense to interrupt for these. + register(:async, NullAdapter) + register(:inline, NullAdapter) + register(:test, NullAdapter) + + # External adapters + require_relative "interruption_adapters/resque_adapter" + require_relative "interruption_adapters/sidekiq_adapter" + end +end diff --git a/lib/job-iteration/interruption_adapters/null_adapter.rb b/lib/job-iteration/interruption_adapters/null_adapter.rb new file mode 100644 index 00000000..852615bc --- /dev/null +++ b/lib/job-iteration/interruption_adapters/null_adapter.rb @@ -0,0 +1,14 @@ +# frozen_string_literal: true + +# This adapter never interrupts. +module JobIteration + module InterruptionAdapters + module NullAdapter + class << self + def call + false + end + end + end + end +end diff --git a/lib/job-iteration/interruption_adapters/resque_adapter.rb b/lib/job-iteration/interruption_adapters/resque_adapter.rb new file mode 100644 index 00000000..81f98c09 --- /dev/null +++ b/lib/job-iteration/interruption_adapters/resque_adapter.rb @@ -0,0 +1,38 @@ +# frozen_string_literal: true + +begin + require "resque" +rescue LoadError + # Resque is not available, no need to load the adapter + return +end + +module JobIteration + module InterruptionAdapters + module ResqueAdapter + # @private + module IterationExtension + def initialize(*) + $resque_worker = self # rubocop:disable Style/GlobalVars + super + end + end + + # @private + module ::Resque + class Worker + # The patch is required in order to call shutdown? on a Resque::Worker instance + prepend(IterationExtension) + end + end + + class << self + def call + $resque_worker.try!(:shutdown?) # rubocop:disable Style/GlobalVars + end + end + end + + InterruptionAdapters.register(:resque, ResqueAdapter) + end +end diff --git a/lib/job-iteration/interruption_adapters/sidekiq_adapter.rb b/lib/job-iteration/interruption_adapters/sidekiq_adapter.rb new file mode 100644 index 00000000..242f7a9a --- /dev/null +++ b/lib/job-iteration/interruption_adapters/sidekiq_adapter.rb @@ -0,0 +1,30 @@ +# frozen_string_literal: true + +begin + require "sidekiq" +rescue LoadError + # Sidekiq is not available, no need to load the adapter + return +end + +module JobIteration + module InterruptionAdapters + module SidekiqAdapter + class << self + attr_accessor :stopping + + def call + stopping + end + end + + ::Sidekiq.configure_server do |config| + config.on(:quiet) do + SidekiqAdapter.stopping = true + end + end + + InterruptionAdapters.register(:sidekiq, SidekiqAdapter) + end + end +end diff --git a/lib/job-iteration/iteration.rb b/lib/job-iteration/iteration.rb index 1c734ad4..731fddf3 100644 --- a/lib/job-iteration/iteration.rb +++ b/lib/job-iteration/iteration.rb @@ -75,6 +75,10 @@ def around_iterate(&blk) set_callback(:iterate, :around, &blk) end + def interruption_adapter + JobIteration.interruption_adapter || JobIteration::InterruptionAdapters.lookup(queue_adapter_name) + end + private def ban_perform_definition @@ -89,6 +93,7 @@ def initialize(*arguments) self.times_interrupted = 0 self.total_time = 0.0 assert_implements_methods! + @interruption_adapter = self.class.interruption_adapter end ruby2_keywords(:initialize) if respond_to?(:ruby2_keywords, true) @@ -120,6 +125,9 @@ def retry_job(*, **) private + # @api private + attr_reader :interruption_adapter + def enumerator_builder JobIteration.enumerator_builder.new(self) end @@ -280,7 +288,7 @@ def job_should_exit? max_job_runtime = job_iteration_max_job_runtime return true if max_job_runtime && start_time && (Time.now.utc - start_time) > max_job_runtime - JobIteration.interruption_adapter.call || (defined?(super) && super) + interruption_adapter.call || (defined?(super) && super) end def job_iteration_max_job_runtime diff --git a/test/integration/integration_behaviour.rb b/test/integration/integration_behaviour.rb index 89dd3602..34cd2571 100644 --- a/test/integration/integration_behaviour.rb +++ b/test/integration/integration_behaviour.rb @@ -54,8 +54,8 @@ module IntegrationBehaviour private # Should return the symbol to use when configuring the adapter - # ActiveJob::Base.queue_adapter = adapter - def adapter + # ActiveJob::Base.queue_adapter = queue_adapter + def queue_adapter raise NotImplemented, "#{self.class.name} must implement #{__method__}" end diff --git a/test/integration/integrations_test.rb b/test/integration/integrations_test.rb deleted file mode 100644 index 21217b15..00000000 --- a/test/integration/integrations_test.rb +++ /dev/null @@ -1,55 +0,0 @@ -# frozen_string_literal: true - -require "test_helper" -require "open3" - -class IntegrationsTest < ActiveSupport::TestCase - test "will prevent loading two integrations" do - with_env("ITERATION_DISABLE_AUTOCONFIGURE", nil) do - ruby = <<~RUBY - require 'bundler/setup' - require 'job-iteration' - RUBY - _stdout, stderr, status = run_ruby(ruby) - - assert_equal false, status.success? - assert_match(/resque integration has already been loaded, but sidekiq is also available/, stderr) - end - end - - test "successfully loads one (resque) integration" do - with_env("ITERATION_DISABLE_AUTOCONFIGURE", nil) do - ruby = <<~RUBY - require 'bundler/setup' - # Remove sidekiq, only resque will be left - $LOAD_PATH.delete_if { |p| p =~ /sidekiq/ } - require 'job-iteration' - RUBY - _stdout, _stderr, status = run_ruby(ruby) - - assert_equal true, status.success? - end - end - - private - - def run_ruby(body) - stdout, stderr, status = nil - Tempfile.open do |f| - f.write(body) - f.close - - command = "ruby #{f.path}" - stdout, stderr, status = Open3.capture3(command) - end - [stdout, stderr, status] - end - - def with_env(variable, value) - original = ENV[variable] - ENV[variable] = value - yield - ensure - ENV[variable] = original - end -end diff --git a/test/integration/interruption_adapters_test.rb b/test/integration/interruption_adapters_test.rb new file mode 100644 index 00000000..452657ff --- /dev/null +++ b/test/integration/interruption_adapters_test.rb @@ -0,0 +1,61 @@ +# frozen_string_literal: true + +require "test_helper" +require "open3" + +class InterruptionAdaptersTest < ActiveSupport::TestCase + test "successfully loads one (resque) interruption adapter" do + ruby = <<~RUBY + require 'bundler/setup' + # Remove sidekiq, only resque will be left + $LOAD_PATH.delete_if { |p| p =~ /sidekiq/ } + require 'job-iteration' + JobIteration::InterruptionAdapters.lookup(:resque) + RUBY + _stdout, stderr, status = run_ruby(ruby) + + assert_predicate(status, :success?) + refute_match(/No interruption adapter is registered for :resque/, stderr) + end + + test "does not load interruption adapter if queue adapter is not available" do + ruby = <<~RUBY + require 'bundler/setup' + # Remove sidekiq, only resque will be left + $LOAD_PATH.delete_if { |p| p =~ /sidekiq/ } + require 'job-iteration' + JobIteration::InterruptionAdapters.lookup(:sidekiq) + RUBY + _stdout, stderr, status = run_ruby(ruby) + + assert_predicate(status, :success?) + assert_match(/No interruption adapter is registered for :sidekiq/, stderr) + end + + test "loads all available interruption adapters" do + ruby = <<~RUBY + require 'bundler/setup' + require 'job-iteration' + JobIteration::InterruptionAdapters.lookup(:resque) + JobIteration::InterruptionAdapters.lookup(:sidekiq) + RUBY + _stdout, stderr, status = run_ruby(ruby) + + assert_predicate(status, :success?) + refute_match(/No interruption adapter is registered for/, stderr) + end + + private + + def run_ruby(body) + stdout, stderr, status = nil + Tempfile.open do |f| + f.write(body) + f.close + + command = "ruby #{f.path}" + stdout, stderr, status = Open3.capture3(command) + end + [stdout, stderr, status] + end +end diff --git a/test/support/resque/Rakefile b/test/support/resque/Rakefile index 277779c7..e307beeb 100644 --- a/test/support/resque/Rakefile +++ b/test/support/resque/Rakefile @@ -5,7 +5,6 @@ require "resque/tasks" require "job-iteration" -require "job-iteration/integrations/resque" require "active_job" require "i18n" diff --git a/test/support/sidekiq/init.rb b/test/support/sidekiq/init.rb index 4f930536..9c327923 100644 --- a/test/support/sidekiq/init.rb +++ b/test/support/sidekiq/init.rb @@ -1,7 +1,6 @@ # frozen_string_literal: true require "job-iteration" -require "job-iteration/integrations/sidekiq" require "active_job" require "i18n" diff --git a/test/test_helper.rb b/test/test_helper.rb index 01ef131b..8f137609 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -3,8 +3,6 @@ $LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__)) require "minitest/autorun" -ENV["ITERATION_DISABLE_AUTOCONFIGURE"] = "true" - require "job-iteration" require "job-iteration/test_helper"