Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Mangara committed Jan 24, 2024
1 parent 8fc8504 commit ffe6d9b
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 31 deletions.
43 changes: 15 additions & 28 deletions lib/job-iteration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require "active_job"
require_relative "./job-iteration/version"
require_relative "./job-iteration/enumerator_builder"
require_relative "./job-iteration/interruption_adapters"
require_relative "./job-iteration/iteration"
require_relative "./job-iteration/log_subscriber"
require_relative "./job-iteration/railtie"
Expand Down Expand Up @@ -60,10 +61,21 @@ def logger
# where the throttle backoff value will take precedence over this setting.
attr_accessor :default_retry_backoff

# Used internally for hooking into job processing frameworks like Sidekiq and Resque.
attr_accessor :interruption_adapter
# DEPRECATED - Overrides interruption checks based on queue adapter.
# To customize the interruption checks, use register_interruption_adapter(:my_queue_adapter_name) instead.
attr_reader :interruption_adapter

self.interruption_adapter = -> { false }
def interruption_adapter=(adapter)
Deprecation.warn("Setting JobIteration.interruption_adapter is deprecated."\
" Use JobIteration.register_interruption_adapter(:foo, callable) instead"\
" to register the callable (a proc, method, or other object responding to #call)"\
" as the interruption adapter for queue adapter :foo.")
@interruption_adapter = adapter
end

def register_interruption_adapter(adapter_name, adapter)
InterruptionAdapters.register(adapter_name, adapter)
end

# Set if you want to use your own enumerator builder instead of default EnumeratorBuilder.
# @example
Expand All @@ -76,29 +88,4 @@ def logger
attr_accessor :enumerator_builder

self.enumerator_builder = JobIteration::EnumeratorBuilder

def load_integrations
loaded = nil
INTEGRATIONS.each do |integration|
load_integration(integration)
if loaded
raise IntegrationLoadError,
"#{loaded} integration has already been loaded, but #{integration} is also available. " \
"Iteration will only work with one integration."
end
loaded = integration
rescue LoadError
end
end

def load_integration(integration)
unless INTEGRATIONS.include?(integration)
raise IntegrationLoadError,
"#{integration} integration is not supported. Available integrations: #{INTEGRATIONS.join(", ")}"
end

require_relative "./job-iteration/integrations/#{integration}"
end
end

JobIteration.load_integrations unless ENV["ITERATION_DISABLE_AUTOCONFIGURE"]
62 changes: 62 additions & 0 deletions lib/job-iteration/interruption_adapters.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# frozen_string_literal: true

require_relative "interruption_adapters/null_adapter"

module JobIteration
module InterruptionAdapters
class << self
# Returns adapter for specified name.
#
# JobIteration::InterruptionAdapters.lookup(:sidekiq)
# # => JobIteration::InterruptionAdapters::SidekiqAdapter
def lookup(name)
registry.fetch(name.to_sym) do
Deprecation.warn(<<~DEPRECATION_MESSAGE, caller_locations(1))
No interruption adapter is registered for #{name.inspect}; falling back to `NullAdapter`, which never interrupts.
Use `JobIteration.register_queue_adapter(#{name.to_sym.inspect}, <adapter>) to register one.
This will raise starting in version #{Deprecation.deprecation_horizon} of #{Deprecation.gem_name}!"
DEPRECATION_MESSAGE

NullAdapter
end
end

# Registers adapter for specified name.
#
# JobIteration::InterruptionAdapters.register(:sidekiq, MyCustomSidekiqAdapter)
def register(name, adapter)
raise ArgumentError, "adapter must be callable" unless adapter.respond_to?(:call)

registry[name.to_sym] = adapter
end

private

attr_reader :registry
end

@registry = {}

# Built-in Rails adapters. It doesn't make sense to interrupt for these.
register(:async, NullAdapter)
register(:inline, NullAdapter)
register(:test, NullAdapter)

# External adapters
begin
require "resque"
require_relative "interruption_adapters/resque_adapter"
register(:resque, ResqueAdapter)
rescue LoadError
# Resque is not available, no need to load the adapter
end

begin
require "sidekiq"
require_relative "interruption_adapters/sidekiq_adapter"
register(:sidekiq, SidekiqAdapter)
rescue LoadError
# Sidekiq is not available, no need to load the adapter
end
end
end
14 changes: 14 additions & 0 deletions lib/job-iteration/interruption_adapters/null_adapter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# frozen_string_literal: true

# This adapter never interrupts.
module JobIteration
module InterruptionAdapters
module NullAdapter
class << self
def call
false
end
end
end
end
end
29 changes: 29 additions & 0 deletions lib/job-iteration/interruption_adapters/resque_adapter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# frozen_string_literal: true

module JobIteration
module InterruptionAdapters
module ResqueAdapter
# @private
module IterationExtension
def initialize(*)
$resque_worker = self # rubocop:disable Style/GlobalVars
super
end
end

# @private
module ::Resque
class Worker
# The patch is required in order to call shutdown? on a Resque::Worker instance
prepend(IterationExtension)
end
end

class << self
def call
$resque_worker.try!(:shutdown?) # rubocop:disable Style/GlobalVars
end
end
end
end
end
17 changes: 17 additions & 0 deletions lib/job-iteration/interruption_adapters/sidekiq_adapter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# frozen_string_literal: true

module JobIteration
module InterruptionAdapters
module SidekiqAdapter
class << self
def call
if defined?(Sidekiq::CLI) && Sidekiq::CLI.instance
Sidekiq::CLI.instance.launcher.stopping?
else
false
end
end
end
end
end
end
10 changes: 9 additions & 1 deletion lib/job-iteration/iteration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ def around_iterate(&blk)
set_callback(:iterate, :around, &blk)
end

def interruption_adapter
JobIteration.interruption_adapter || JobIteration::InterruptionAdapters.lookup(queue_adapter_name)
end

private

def ban_perform_definition
Expand All @@ -89,6 +93,7 @@ def initialize(*arguments)
self.times_interrupted = 0
self.total_time = 0.0
assert_implements_methods!
@interruption_adapter = self.class.interruption_adapter
end
ruby2_keywords(:initialize) if respond_to?(:ruby2_keywords, true)

Expand Down Expand Up @@ -120,6 +125,9 @@ def retry_job(*, **)

private

# @api private
attr_reader :interruption_adapter

def enumerator_builder
JobIteration.enumerator_builder.new(self)
end
Expand Down Expand Up @@ -280,7 +288,7 @@ def job_should_exit?
max_job_runtime = job_iteration_max_job_runtime
return true if max_job_runtime && start_time && (Time.now.utc - start_time) > max_job_runtime

JobIteration.interruption_adapter.call || (defined?(super) && super)
interruption_adapter.call || (defined?(super) && super)
end

def job_iteration_max_job_runtime
Expand Down
2 changes: 0 additions & 2 deletions test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
$LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))
require "minitest/autorun"

ENV["ITERATION_DISABLE_AUTOCONFIGURE"] = "true"

require "job-iteration"
require "job-iteration/test_helper"

Expand Down

0 comments on commit ffe6d9b

Please sign in to comment.