Skip to content

Commit

Permalink
Infer interruption adapter from queue adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
Mangara committed Jan 26, 2024
1 parent 9b1baed commit a37aabd
Show file tree
Hide file tree
Showing 15 changed files with 218 additions and 146 deletions.
3 changes: 0 additions & 3 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ AllCops:
Lint/SuppressedException:
Exclude:
- lib/job-iteration.rb
Style/GlobalVars:
Exclude:
- lib/job-iteration/integrations/resque.rb
Naming/FileName:
Exclude:
- lib/job-iteration.rb
Expand Down
47 changes: 15 additions & 32 deletions lib/job-iteration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,12 @@
require "active_job"
require_relative "./job-iteration/version"
require_relative "./job-iteration/enumerator_builder"
require_relative "./job-iteration/interruption_adapters"
require_relative "./job-iteration/iteration"
require_relative "./job-iteration/log_subscriber"
require_relative "./job-iteration/railtie"

module JobIteration
IntegrationLoadError = Class.new(StandardError)

INTEGRATIONS = [:resque, :sidekiq]

Deprecation = ActiveSupport::Deprecation.new("2.0", "JobIteration")

extend self
Expand Down Expand Up @@ -60,10 +57,21 @@ def logger
# where the throttle backoff value will take precedence over this setting.
attr_accessor :default_retry_backoff

# Used internally for hooking into job processing frameworks like Sidekiq and Resque.
attr_accessor :interruption_adapter
# Overrides interruption checks based on queue adapter.
# @deprecated - Use JobIteration::InterruptionAdapters.register(:foo, callable) instead.
attr_reader :interruption_adapter

self.interruption_adapter = -> { false }
def interruption_adapter=(adapter)
Deprecation.warn("Setting JobIteration.interruption_adapter is deprecated."\
" Use JobIteration::InterruptionAdapters.register(:foo, callable) instead"\
" to register the callable (a proc, method, or other object responding to #call)"\
" as the interruption adapter for queue adapter :foo.")
@interruption_adapter = adapter
end

def register_interruption_adapter(adapter_name, adapter)
InterruptionAdapters.register(adapter_name, adapter)
end

# Set if you want to use your own enumerator builder instead of default EnumeratorBuilder.
# @example
Expand All @@ -76,29 +84,4 @@ def logger
attr_accessor :enumerator_builder

self.enumerator_builder = JobIteration::EnumeratorBuilder

def load_integrations
loaded = nil
INTEGRATIONS.each do |integration|
load_integration(integration)
if loaded
raise IntegrationLoadError,
"#{loaded} integration has already been loaded, but #{integration} is also available. " \
"Iteration will only work with one integration."
end
loaded = integration
rescue LoadError
end
end

def load_integration(integration)
unless INTEGRATIONS.include?(integration)
raise IntegrationLoadError,
"#{integration} integration is not supported. Available integrations: #{INTEGRATIONS.join(", ")}"
end

require_relative "./job-iteration/integrations/#{integration}"
end
end

JobIteration.load_integrations unless ENV["ITERATION_DISABLE_AUTOCONFIGURE"]
24 changes: 0 additions & 24 deletions lib/job-iteration/integrations/resque.rb

This file was deleted.

25 changes: 0 additions & 25 deletions lib/job-iteration/integrations/sidekiq.rb

This file was deleted.

49 changes: 49 additions & 0 deletions lib/job-iteration/interruption_adapters.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# frozen_string_literal: true

require_relative "interruption_adapters/null_adapter"

module JobIteration
module InterruptionAdapters
class << self
# Returns adapter for specified name.
#
# JobIteration::InterruptionAdapters.lookup(:sidekiq)
# # => JobIteration::InterruptionAdapters::SidekiqAdapter
def lookup(name)
registry.fetch(name.to_sym) do
Deprecation.warn(<<~DEPRECATION_MESSAGE, caller_locations(1))
No interruption adapter is registered for #{name.inspect}; falling back to `NullAdapter`, which never interrupts.
Use `JobIteration::InterruptionAdapters.register(#{name.to_sym.inspect}, <adapter>) to register one.
This will raise starting in version #{Deprecation.deprecation_horizon} of #{Deprecation.gem_name}!"
DEPRECATION_MESSAGE

NullAdapter
end
end

# Registers adapter for specified name.
#
# JobIteration::InterruptionAdapters.register(:sidekiq, MyCustomSidekiqAdapter)
def register(name, adapter)
raise ArgumentError, "adapter must be callable" unless adapter.respond_to?(:call)

registry[name.to_sym] = adapter
end

private

attr_reader :registry
end

@registry = {}

# Built-in Rails adapters. It doesn't make sense to interrupt for these.
register(:async, NullAdapter)
register(:inline, NullAdapter)
register(:test, NullAdapter)

# External adapters
require_relative "interruption_adapters/resque_adapter"
require_relative "interruption_adapters/sidekiq_adapter"
end
end
14 changes: 14 additions & 0 deletions lib/job-iteration/interruption_adapters/null_adapter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# frozen_string_literal: true

# This adapter never interrupts.
module JobIteration
module InterruptionAdapters
module NullAdapter
class << self
def call
false
end
end
end
end
end
38 changes: 38 additions & 0 deletions lib/job-iteration/interruption_adapters/resque_adapter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# frozen_string_literal: true

begin
require "resque"
rescue LoadError
# Resque is not available, no need to load the adapter
return
end

module JobIteration
module InterruptionAdapters
module ResqueAdapter
# @private
module IterationExtension
def initialize(*)
$resque_worker = self # rubocop:disable Style/GlobalVars
super
end
end

# @private
module ::Resque
class Worker
# The patch is required in order to call shutdown? on a Resque::Worker instance
prepend(IterationExtension)
end
end

class << self
def call
$resque_worker.try!(:shutdown?) # rubocop:disable Style/GlobalVars
end
end
end

InterruptionAdapters.register(:resque, ResqueAdapter)
end
end
30 changes: 30 additions & 0 deletions lib/job-iteration/interruption_adapters/sidekiq_adapter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# frozen_string_literal: true

begin
require "sidekiq"
rescue LoadError
# Sidekiq is not available, no need to load the adapter
return
end

module JobIteration
module InterruptionAdapters
module SidekiqAdapter
class << self
attr_accessor :stopping

def call
stopping
end
end

::Sidekiq.configure_server do |config|
config.on(:quiet) do
SidekiqAdapter.stopping = true
end
end

InterruptionAdapters.register(:sidekiq, SidekiqAdapter)
end
end
end
10 changes: 9 additions & 1 deletion lib/job-iteration/iteration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ def around_iterate(&blk)
set_callback(:iterate, :around, &blk)
end

def interruption_adapter
JobIteration.interruption_adapter || JobIteration::InterruptionAdapters.lookup(queue_adapter_name)
end

private

def ban_perform_definition
Expand All @@ -89,6 +93,7 @@ def initialize(*arguments)
self.times_interrupted = 0
self.total_time = 0.0
assert_implements_methods!
@interruption_adapter = self.class.interruption_adapter
end
ruby2_keywords(:initialize) if respond_to?(:ruby2_keywords, true)

Expand Down Expand Up @@ -120,6 +125,9 @@ def retry_job(*, **)

private

# @api private
attr_reader :interruption_adapter

def enumerator_builder
JobIteration.enumerator_builder.new(self)
end
Expand Down Expand Up @@ -280,7 +288,7 @@ def job_should_exit?
max_job_runtime = job_iteration_max_job_runtime
return true if max_job_runtime && start_time && (Time.now.utc - start_time) > max_job_runtime

JobIteration.interruption_adapter.call || (defined?(super) && super)
interruption_adapter.call || (defined?(super) && super)
end

def job_iteration_max_job_runtime
Expand Down
4 changes: 2 additions & 2 deletions test/integration/integration_behaviour.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ module IntegrationBehaviour
private

# Should return the symbol to use when configuring the adapter
# ActiveJob::Base.queue_adapter = adapter
def adapter
# ActiveJob::Base.queue_adapter = queue_adapter
def queue_adapter
raise NotImplemented, "#{self.class.name} must implement #{__method__}"
end

Expand Down
55 changes: 0 additions & 55 deletions test/integration/integrations_test.rb

This file was deleted.

Loading

0 comments on commit a37aabd

Please sign in to comment.