diff --git a/lib/job-iteration.rb b/lib/job-iteration.rb index 8e9d78be..d4edaccb 100644 --- a/lib/job-iteration.rb +++ b/lib/job-iteration.rb @@ -3,6 +3,7 @@ require "active_job" require_relative "./job-iteration/version" require_relative "./job-iteration/enumerator_builder" +require_relative "./job-iteration/interruption_adapters" require_relative "./job-iteration/iteration" require_relative "./job-iteration/log_subscriber" require_relative "./job-iteration/railtie" @@ -60,10 +61,21 @@ def logger # where the throttle backoff value will take precedence over this setting. attr_accessor :default_retry_backoff - # Used internally for hooking into job processing frameworks like Sidekiq and Resque. - attr_accessor :interruption_adapter + # DEPRECATED - Overrides interruption checks based on queue adapter. + # To customize the interruption checks, use register_interruption_adapter(:my_queue_adapter_name) instead. + attr_reader :interruption_adapter - self.interruption_adapter = -> { false } + def interruption_adapter=(adapter) + Deprecation.warn("Setting JobIteration.interruption_adapter is deprecated."\ + " Use JobIteration.register_interruption_adapter(:foo, callable) instead"\ + " to register the callable (a proc, method, or other object responding to #call)"\ + " as the interruption adapter for queue adapter :foo.") + @interruption_adapter = adapter + end + + def register_interruption_adapter(adapter_name, adapter) + InterruptionAdapters.register(adapter_name, adapter) + end # Set if you want to use your own enumerator builder instead of default EnumeratorBuilder. # @example @@ -76,29 +88,4 @@ def logger attr_accessor :enumerator_builder self.enumerator_builder = JobIteration::EnumeratorBuilder - - def load_integrations - loaded = nil - INTEGRATIONS.each do |integration| - load_integration(integration) - if loaded - raise IntegrationLoadError, - "#{loaded} integration has already been loaded, but #{integration} is also available. " \ - "Iteration will only work with one integration." - end - loaded = integration - rescue LoadError - end - end - - def load_integration(integration) - unless INTEGRATIONS.include?(integration) - raise IntegrationLoadError, - "#{integration} integration is not supported. Available integrations: #{INTEGRATIONS.join(", ")}" - end - - require_relative "./job-iteration/integrations/#{integration}" - end end - -JobIteration.load_integrations unless ENV["ITERATION_DISABLE_AUTOCONFIGURE"] diff --git a/lib/job-iteration/interruption_adapters.rb b/lib/job-iteration/interruption_adapters.rb new file mode 100644 index 00000000..657d6cf8 --- /dev/null +++ b/lib/job-iteration/interruption_adapters.rb @@ -0,0 +1,62 @@ +# frozen_string_literal: true + +require_relative "interruption_adapters/null_adapter" + +module JobIteration + module InterruptionAdapters + class << self + # Returns adapter for specified name. + # + # JobIteration::InterruptionAdapters.lookup(:sidekiq) + # # => JobIteration::InterruptionAdapters::SidekiqAdapter + def lookup(name) + registry.fetch(name.to_sym) do + Deprecation.warn(<<~DEPRECATION_MESSAGE, caller_locations(1)) + No interruption adapter is registered for #{name.inspect}; falling back to `NullAdapter`, which never interrupts. + Use `JobIteration.register_queue_adapter(#{name.to_sym.inspect}, ) to register one. + This will raise starting in version #{Deprecation.deprecation_horizon} of #{Deprecation.gem_name}!" + DEPRECATION_MESSAGE + + NullAdapter + end + end + + # Registers adapter for specified name. + # + # JobIteration::InterruptionAdapters.register(:sidekiq, MyCustomSidekiqAdapter) + def register(name, adapter) + raise ArgumentError, "adapter must be callable" unless adapter.respond_to?(:call) + + registry[name.to_sym] = adapter + end + + private + + attr_reader :registry + end + + @registry = {} + + # Built-in Rails adapters. It doesn't make sense to interrupt for these. + register(:async, NullAdapter) + register(:inline, NullAdapter) + register(:test, NullAdapter) + + # External adapters + begin + require "resque" + require_relative "interruption_adapters/resque_adapter" + register(:resque, ResqueAdapter) + rescue LoadError + # Resque is not available, no need to load the adapter + end + + begin + require "sidekiq" + require_relative "interruption_adapters/sidekiq_adapter" + register(:sidekiq, SidekiqAdapter) + rescue LoadError + # Sidekiq is not available, no need to load the adapter + end + end +end diff --git a/lib/job-iteration/interruption_adapters/null_adapter.rb b/lib/job-iteration/interruption_adapters/null_adapter.rb new file mode 100644 index 00000000..852615bc --- /dev/null +++ b/lib/job-iteration/interruption_adapters/null_adapter.rb @@ -0,0 +1,14 @@ +# frozen_string_literal: true + +# This adapter never interrupts. +module JobIteration + module InterruptionAdapters + module NullAdapter + class << self + def call + false + end + end + end + end +end diff --git a/lib/job-iteration/interruption_adapters/resque_adapter.rb b/lib/job-iteration/interruption_adapters/resque_adapter.rb new file mode 100644 index 00000000..f7fc274a --- /dev/null +++ b/lib/job-iteration/interruption_adapters/resque_adapter.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +module JobIteration + module InterruptionAdapters + module ResqueAdapter + # @private + module IterationExtension + def initialize(*) + $resque_worker = self # rubocop:disable Style/GlobalVars + super + end + end + + # @private + module ::Resque + class Worker + # The patch is required in order to call shutdown? on a Resque::Worker instance + prepend(IterationExtension) + end + end + + class << self + def call + $resque_worker.try!(:shutdown?) # rubocop:disable Style/GlobalVars + end + end + end + end +end diff --git a/lib/job-iteration/interruption_adapters/sidekiq_adapter.rb b/lib/job-iteration/interruption_adapters/sidekiq_adapter.rb new file mode 100644 index 00000000..a78eec8e --- /dev/null +++ b/lib/job-iteration/interruption_adapters/sidekiq_adapter.rb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +module JobIteration + module InterruptionAdapters + module SidekiqAdapter + class << self + def call + if defined?(Sidekiq::CLI) && Sidekiq::CLI.instance + Sidekiq::CLI.instance.launcher.stopping? + else + false + end + end + end + end + end +end diff --git a/lib/job-iteration/iteration.rb b/lib/job-iteration/iteration.rb index b58eb242..0dccee55 100644 --- a/lib/job-iteration/iteration.rb +++ b/lib/job-iteration/iteration.rb @@ -70,6 +70,10 @@ def on_complete(*filters, &blk) set_callback(:complete, :after, *filters, &blk) end + def interruption_adapter + JobIteration.interruption_adapter || JobIteration::InterruptionAdapters.lookup(queue_adapter_name) + end + private def ban_perform_definition @@ -84,6 +88,7 @@ def initialize(*arguments) self.times_interrupted = 0 self.total_time = 0.0 assert_implements_methods! + @interruption_adapter = self.class.interruption_adapter end ruby2_keywords(:initialize) if respond_to?(:ruby2_keywords, true) @@ -115,6 +120,9 @@ def retry_job(*, **) private + # @api private + attr_reader :interruption_adapter + def enumerator_builder JobIteration.enumerator_builder.new(self) end @@ -273,7 +281,7 @@ def job_should_exit? max_job_runtime = job_iteration_max_job_runtime return true if max_job_runtime && start_time && (Time.now.utc - start_time) > max_job_runtime - JobIteration.interruption_adapter.call || (defined?(super) && super) + interruption_adapter.call || (defined?(super) && super) end def job_iteration_max_job_runtime diff --git a/test/test_helper.rb b/test/test_helper.rb index 8bd67d59..dc133417 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -3,8 +3,6 @@ $LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__)) require "minitest/autorun" -ENV["ITERATION_DISABLE_AUTOCONFIGURE"] = "true" - require "job-iteration" require "job-iteration/test_helper"