From db81365f9a2984e1177f37d5994f0a1660e6f99d Mon Sep 17 00:00:00 2001 From: Sander Verdonschot Date: Thu, 11 Jan 2024 07:21:00 -0500 Subject: [PATCH] Infer interruption adapter from queue adapter --- .rubocop.yml | 3 - CHANGELOG.md | 1 + lib/job-iteration.rb | 47 +++++---------- lib/job-iteration/integrations/resque.rb | 24 -------- lib/job-iteration/integrations/sidekiq.rb | 25 -------- lib/job-iteration/interruption_adapters.rb | 52 ++++++++++++++++ .../interruption_adapters/null_adapter.rb | 14 +++++ .../interruption_adapters/resque_adapter.rb | 38 ++++++++++++ .../interruption_adapters/sidekiq_adapter.rb | 30 ++++++++++ lib/job-iteration/iteration.rb | 6 +- test/integration/integration_behaviour.rb | 4 +- test/integration/integrations_test.rb | 55 ----------------- .../integration/interruption_adapters_test.rb | 59 +++++++++++++++++++ test/support/resque/Rakefile | 1 - test/support/sidekiq/init.rb | 1 - test/test_helper.rb | 2 - 16 files changed, 216 insertions(+), 146 deletions(-) delete mode 100644 lib/job-iteration/integrations/resque.rb delete mode 100644 lib/job-iteration/integrations/sidekiq.rb create mode 100644 lib/job-iteration/interruption_adapters.rb create mode 100644 lib/job-iteration/interruption_adapters/null_adapter.rb create mode 100644 lib/job-iteration/interruption_adapters/resque_adapter.rb create mode 100644 lib/job-iteration/interruption_adapters/sidekiq_adapter.rb delete mode 100644 test/integration/integrations_test.rb create mode 100644 test/integration/interruption_adapters_test.rb diff --git a/.rubocop.yml b/.rubocop.yml index 557b9b51..70930314 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -12,9 +12,6 @@ AllCops: Lint/SuppressedException: Exclude: - lib/job-iteration.rb -Style/GlobalVars: - Exclude: - - lib/job-iteration/integrations/resque.rb Naming/FileName: Exclude: - lib/job-iteration.rb diff --git a/CHANGELOG.md b/CHANGELOG.md index 404d8dae..fc89df68 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ - [437](https://github.com/Shopify/job-iteration/pull/437) - Use minimum between per-class `job_iteration_max_job_runtime` and `JobIteration.max_job_runtime`, instead of enforcing only setting decreasing values. Because it is possible to change the global or parent values after setting the value on a class, it is not possible to truly enforce the decreasing value constraint. Instead, we now use the minimum between the global value and per-class value. This is considered a non-breaking change, as it should not break any **existing** code, it only removes the constraint on new classes. - [443](https://github.com/Shopify/job-iteration/pull/443) - Use Sidekiq `:quit` callback to detect graceful shutdown. This makes job-iteration compatible with Sidekiq run in embedded mode. +- [450](https://github.com/Shopify/job-iteration/pull/450) - Infer which interruption adapter to use from the queue adapter of the job. This deprecates setting `JobIteration.interruption_adapter = `, in favor of `JobIteration.register_interruption_adapter(, )`. `JobIteration.interruption_adapter` will be removed in a future release. ### Bug fixes diff --git a/lib/job-iteration.rb b/lib/job-iteration.rb index 8e9d78be..8e8f3dee 100644 --- a/lib/job-iteration.rb +++ b/lib/job-iteration.rb @@ -3,15 +3,12 @@ require "active_job" require_relative "./job-iteration/version" require_relative "./job-iteration/enumerator_builder" +require_relative "./job-iteration/interruption_adapters" require_relative "./job-iteration/iteration" require_relative "./job-iteration/log_subscriber" require_relative "./job-iteration/railtie" module JobIteration - IntegrationLoadError = Class.new(StandardError) - - INTEGRATIONS = [:resque, :sidekiq] - Deprecation = ActiveSupport::Deprecation.new("2.0", "JobIteration") extend self @@ -60,10 +57,21 @@ def logger # where the throttle backoff value will take precedence over this setting. attr_accessor :default_retry_backoff - # Used internally for hooking into job processing frameworks like Sidekiq and Resque. - attr_accessor :interruption_adapter + attr_reader :interruption_adapter - self.interruption_adapter = -> { false } + # Overrides interruption checks based on queue adapter. + # @deprecated - Use JobIteration::InterruptionAdapters.register(:foo, callable) instead. + def interruption_adapter=(adapter) + Deprecation.warn("Setting JobIteration.interruption_adapter is deprecated. "\ + "Use JobIteration::InterruptionAdapters.register(:foo, callable) instead "\ + "to register the callable (a proc, method, or other object responding to #call) "\ + "as the interruption adapter for queue adapter :foo.") + @interruption_adapter = adapter + end + + def register_interruption_adapter(adapter_name, adapter) + InterruptionAdapters.register(adapter_name, adapter) + end # Set if you want to use your own enumerator builder instead of default EnumeratorBuilder. # @example @@ -76,29 +84,4 @@ def logger attr_accessor :enumerator_builder self.enumerator_builder = JobIteration::EnumeratorBuilder - - def load_integrations - loaded = nil - INTEGRATIONS.each do |integration| - load_integration(integration) - if loaded - raise IntegrationLoadError, - "#{loaded} integration has already been loaded, but #{integration} is also available. " \ - "Iteration will only work with one integration." - end - loaded = integration - rescue LoadError - end - end - - def load_integration(integration) - unless INTEGRATIONS.include?(integration) - raise IntegrationLoadError, - "#{integration} integration is not supported. Available integrations: #{INTEGRATIONS.join(", ")}" - end - - require_relative "./job-iteration/integrations/#{integration}" - end end - -JobIteration.load_integrations unless ENV["ITERATION_DISABLE_AUTOCONFIGURE"] diff --git a/lib/job-iteration/integrations/resque.rb b/lib/job-iteration/integrations/resque.rb deleted file mode 100644 index 23bf41f6..00000000 --- a/lib/job-iteration/integrations/resque.rb +++ /dev/null @@ -1,24 +0,0 @@ -# frozen_string_literal: true - -require "resque" - -module JobIteration - module Integrations - module ResqueIterationExtension # @private - def initialize(*) # @private - $resque_worker = self - super - end - end - - # @private - module ::Resque - class Worker - # The patch is required in order to call shutdown? on a Resque::Worker instance - prepend(ResqueIterationExtension) - end - end - - JobIteration.interruption_adapter = -> { $resque_worker.try!(:shutdown?) } - end -end diff --git a/lib/job-iteration/integrations/sidekiq.rb b/lib/job-iteration/integrations/sidekiq.rb deleted file mode 100644 index ea37df68..00000000 --- a/lib/job-iteration/integrations/sidekiq.rb +++ /dev/null @@ -1,25 +0,0 @@ -# frozen_string_literal: true - -require "sidekiq" - -module JobIteration - module Integrations # @private - module Sidekiq - class << self - attr_accessor :stopping - - def call - stopping - end - end - end - - JobIteration.interruption_adapter = JobIteration::Integrations::Sidekiq - - ::Sidekiq.configure_server do |config| - config.on(:quiet) do - JobIteration::Integrations::Sidekiq.stopping = true - end - end - end -end diff --git a/lib/job-iteration/interruption_adapters.rb b/lib/job-iteration/interruption_adapters.rb new file mode 100644 index 00000000..1cd8f10b --- /dev/null +++ b/lib/job-iteration/interruption_adapters.rb @@ -0,0 +1,52 @@ +# frozen_string_literal: true + +require_relative "interruption_adapters/null_adapter" + +module JobIteration + module InterruptionAdapters + BUNDLED_ADAPTERS = [:resque, :sidekiq].freeze # @api private + + class << self + # Returns adapter for specified name. + # + # JobIteration::InterruptionAdapters.lookup(:sidekiq) + # # => JobIteration::InterruptionAdapters::SidekiqAdapter + def lookup(name) + registry.fetch(name.to_sym) do + Deprecation.warn(<<~DEPRECATION_MESSAGE, caller_locations(1)) + No interruption adapter is registered for #{name.inspect}; falling back to `NullAdapter`, which never interrupts. + Use `JobIteration::InterruptionAdapters.register(#{name.to_sym.inspect}, ) to register one. + This will raise starting in version #{Deprecation.deprecation_horizon} of #{Deprecation.gem_name}!" + DEPRECATION_MESSAGE + + NullAdapter + end + end + + # Registers adapter for specified name. + # + # JobIteration::InterruptionAdapters.register(:sidekiq, MyCustomSidekiqAdapter) + def register(name, adapter) + raise ArgumentError, "adapter must be callable" unless adapter.respond_to?(:call) + + registry[name.to_sym] = adapter + end + + private + + attr_reader :registry + end + + @registry = {} + + # Built-in Rails adapters. It doesn't make sense to interrupt for these. + register(:async, NullAdapter) + register(:inline, NullAdapter) + register(:test, NullAdapter) + + # External adapters + BUNDLED_ADAPTERS.each do |name| + require_relative "interruption_adapters/#{name}_adapter" + end + end +end diff --git a/lib/job-iteration/interruption_adapters/null_adapter.rb b/lib/job-iteration/interruption_adapters/null_adapter.rb new file mode 100644 index 00000000..852615bc --- /dev/null +++ b/lib/job-iteration/interruption_adapters/null_adapter.rb @@ -0,0 +1,14 @@ +# frozen_string_literal: true + +# This adapter never interrupts. +module JobIteration + module InterruptionAdapters + module NullAdapter + class << self + def call + false + end + end + end + end +end diff --git a/lib/job-iteration/interruption_adapters/resque_adapter.rb b/lib/job-iteration/interruption_adapters/resque_adapter.rb new file mode 100644 index 00000000..661494c1 --- /dev/null +++ b/lib/job-iteration/interruption_adapters/resque_adapter.rb @@ -0,0 +1,38 @@ +# frozen_string_literal: true + +begin + require "resque" +rescue LoadError + # Resque is not available, no need to load the adapter + return +end + +module JobIteration + module InterruptionAdapters + module ResqueAdapter + # @private + module IterationExtension + def initialize(*) + $resque_worker = self # rubocop:disable Style/GlobalVars + super + end + end + + # @private + module ::Resque + class Worker + # The patch is required in order to call shutdown? on a Resque::Worker instance + prepend(IterationExtension) + end + end + + class << self + def call + $resque_worker.try!(:shutdown?) # rubocop:disable Style/GlobalVars + end + end + end + + register(:resque, ResqueAdapter) + end +end diff --git a/lib/job-iteration/interruption_adapters/sidekiq_adapter.rb b/lib/job-iteration/interruption_adapters/sidekiq_adapter.rb new file mode 100644 index 00000000..056efd58 --- /dev/null +++ b/lib/job-iteration/interruption_adapters/sidekiq_adapter.rb @@ -0,0 +1,30 @@ +# frozen_string_literal: true + +begin + require "sidekiq" +rescue LoadError + # Sidekiq is not available, no need to load the adapter + return +end + +module JobIteration + module InterruptionAdapters + module SidekiqAdapter + class << self + attr_accessor :stopping + + def call + stopping + end + end + + ::Sidekiq.configure_server do |config| + config.on(:quiet) do + SidekiqAdapter.stopping = true + end + end + end + + register(:sidekiq, SidekiqAdapter) + end +end diff --git a/lib/job-iteration/iteration.rb b/lib/job-iteration/iteration.rb index 1c734ad4..49a92db9 100644 --- a/lib/job-iteration/iteration.rb +++ b/lib/job-iteration/iteration.rb @@ -120,6 +120,10 @@ def retry_job(*, **) private + def interruption_adapter # @private + JobIteration.interruption_adapter || JobIteration::InterruptionAdapters.lookup(self.class.queue_adapter_name) + end + def enumerator_builder JobIteration.enumerator_builder.new(self) end @@ -280,7 +284,7 @@ def job_should_exit? max_job_runtime = job_iteration_max_job_runtime return true if max_job_runtime && start_time && (Time.now.utc - start_time) > max_job_runtime - JobIteration.interruption_adapter.call || (defined?(super) && super) + interruption_adapter.call || (defined?(super) && super) end def job_iteration_max_job_runtime diff --git a/test/integration/integration_behaviour.rb b/test/integration/integration_behaviour.rb index 1fca001d..6dcb92e5 100644 --- a/test/integration/integration_behaviour.rb +++ b/test/integration/integration_behaviour.rb @@ -55,8 +55,8 @@ module IntegrationBehaviour private # Should return the symbol to use when configuring the adapter - # ActiveJob::Base.queue_adapter = adapter - def adapter + # ActiveJob::Base.queue_adapter = queue_adapter + def queue_adapter raise NotImplemented, "#{self.class.name} must implement #{__method__}" end diff --git a/test/integration/integrations_test.rb b/test/integration/integrations_test.rb deleted file mode 100644 index 21217b15..00000000 --- a/test/integration/integrations_test.rb +++ /dev/null @@ -1,55 +0,0 @@ -# frozen_string_literal: true - -require "test_helper" -require "open3" - -class IntegrationsTest < ActiveSupport::TestCase - test "will prevent loading two integrations" do - with_env("ITERATION_DISABLE_AUTOCONFIGURE", nil) do - ruby = <<~RUBY - require 'bundler/setup' - require 'job-iteration' - RUBY - _stdout, stderr, status = run_ruby(ruby) - - assert_equal false, status.success? - assert_match(/resque integration has already been loaded, but sidekiq is also available/, stderr) - end - end - - test "successfully loads one (resque) integration" do - with_env("ITERATION_DISABLE_AUTOCONFIGURE", nil) do - ruby = <<~RUBY - require 'bundler/setup' - # Remove sidekiq, only resque will be left - $LOAD_PATH.delete_if { |p| p =~ /sidekiq/ } - require 'job-iteration' - RUBY - _stdout, _stderr, status = run_ruby(ruby) - - assert_equal true, status.success? - end - end - - private - - def run_ruby(body) - stdout, stderr, status = nil - Tempfile.open do |f| - f.write(body) - f.close - - command = "ruby #{f.path}" - stdout, stderr, status = Open3.capture3(command) - end - [stdout, stderr, status] - end - - def with_env(variable, value) - original = ENV[variable] - ENV[variable] = value - yield - ensure - ENV[variable] = original - end -end diff --git a/test/integration/interruption_adapters_test.rb b/test/integration/interruption_adapters_test.rb new file mode 100644 index 00000000..5445e79d --- /dev/null +++ b/test/integration/interruption_adapters_test.rb @@ -0,0 +1,59 @@ +# frozen_string_literal: true + +require "test_helper" +require "open3" + +class InterruptionAdaptersTest < ActiveSupport::TestCase + test "successfully loads one (resque) interruption adapter" do + ruby = <<~RUBY + require 'bundler/setup' + # Remove sidekiq, only resque will be left + $LOAD_PATH.delete_if { |p| p =~ /sidekiq/ } + require 'job-iteration' + JobIteration::InterruptionAdapters.lookup(:resque) + RUBY + _stdout, stderr, status = run_ruby(ruby) + + assert_predicate(status, :success?) + refute_match(/No interruption adapter is registered for :resque/, stderr) + end + + test "does not load interruption adapter if queue adapter is not available" do + ruby = <<~RUBY + require 'bundler/setup' + # Remove sidekiq, only resque will be left + $LOAD_PATH.delete_if { |p| p =~ /sidekiq/ } + require 'job-iteration' + JobIteration::InterruptionAdapters.lookup(:sidekiq) + RUBY + _stdout, stderr, status = run_ruby(ruby) + + assert_predicate(status, :success?) + assert_match(/No interruption adapter is registered for :sidekiq/, stderr) + end + + test "loads all available interruption adapters" do + ruby = <<~RUBY + require 'bundler/setup' + require 'job-iteration' + JobIteration::InterruptionAdapters::BUNDLED_ADAPTERS.each do |name| + JobIteration::InterruptionAdapters.lookup(name) + end + RUBY + _stdout, stderr, status = run_ruby(ruby) + + assert_predicate(status, :success?) + refute_match(/No interruption adapter is registered for/, stderr) + end + + private + + def run_ruby(body) + Tempfile.open do |f| + f.write(body) + f.close + + Open3.capture3("ruby", f.path) + end + end +end diff --git a/test/support/resque/Rakefile b/test/support/resque/Rakefile index 277779c7..e307beeb 100644 --- a/test/support/resque/Rakefile +++ b/test/support/resque/Rakefile @@ -5,7 +5,6 @@ require "resque/tasks" require "job-iteration" -require "job-iteration/integrations/resque" require "active_job" require "i18n" diff --git a/test/support/sidekiq/init.rb b/test/support/sidekiq/init.rb index fa2d4d35..a1ed9824 100644 --- a/test/support/sidekiq/init.rb +++ b/test/support/sidekiq/init.rb @@ -1,7 +1,6 @@ # frozen_string_literal: true require "job-iteration" -require "job-iteration/integrations/sidekiq" require "active_job" require "i18n" diff --git a/test/test_helper.rb b/test/test_helper.rb index f05bf268..d9733382 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -3,8 +3,6 @@ $LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__)) require "minitest/autorun" -ENV["ITERATION_DISABLE_AUTOCONFIGURE"] = "true" - require "job-iteration" require "job-iteration/test_helper"