From e4356ed0bed0ee9ed249af22cd8aa621c71b629d Mon Sep 17 00:00:00 2001 From: Sam Bostock Date: Fri, 17 Nov 2023 16:23:37 -0500 Subject: [PATCH] Serialize `cursor_position` Previously, `cursor_position` was handed as-is to the queue adapter. This could lead to the queue adapter corrupting cursors of certain classes. For example, if given a `Time` cursor, Sidekiq would save it as JSON by calling `to_s`, resulting in the deserialized cursor being a `String` instead of a `Time`. To prevent this, we now leverage `ActiveJob::Arguments` to (de)serialize the `cursor_position` and ensure it will make the round trip safely. However, as this is a breaking change (as unsafe cursors would previously be accepted, but possibly corrupted, whereas they would now be rejected), we begin by rescuing (de)serialization failures and emitting a deprecation warning. Starting in Job Iteration version 2.0, the deprecation warning will be removed, and (de)serialization failure will raise. Application owners can opt-in to the 2.0 behavior either globally by setting JobIteration.enforce_serializable_cursors = true or on an inheritable per-class basis by setting class MyJob < ActiveJob::Base include JobIteration::Iteration self.job_iteration_enforce_serializable_cursors = true # ... end --- .github/workflows/ci.yml | 1 + CHANGELOG.md | 4 + README.md | 2 + guides/custom-enumerator.md | 57 ++++ lib/job-iteration.rb | 20 ++ lib/job-iteration/iteration.rb | 109 +++---- test/integration/integration_behaviour.rb | 9 +- test/support/jobs.rb | 5 +- test/unit/iteration_test.rb | 343 ++++++++++++++++------ 9 files changed, 392 insertions(+), 158 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 171b6abb..dd022ffb 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -13,6 +13,7 @@ jobs: ports: - 6379:6379 strategy: + fail-fast: false matrix: ruby: ["2.6", "2.7", "3.0", "3.1", "3.2"] gemfile: [rails_5_2, rails_6_0, rails_6_1, rails_7_0, rails_edge] diff --git a/CHANGELOG.md b/CHANGELOG.md index cfbc6c95..e7a7f229 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ - [437](https://github.com/Shopify/job-iteration/pull/437) - Use minimum between per-class `job_iteration_max_job_runtime` and `JobIteration.max_job_runtime`, instead of enforcing only setting decreasing values. Because it is possible to change the global or parent values after setting the value on a class, it is not possible to truly enforce the decreasing value constraint. Instead, we now use the minimum between the global value and per-class value. This is considered a non-breaking change, as it should not break any **existing** code, it only removes the constraint on new classes. +- [81](https://github.com/Shopify/job-iteration/pull/81) - Serialize cursors using `ActiveJob::Arguments` & deprecated unserializable cursors. + Cursor serialization has been dependent on the adapter's serialization method, which typically uses `JSON.dump` and `JSON.load`, meaning only JSON-serializable objects could be used as cursors. Using `ActiveJob::Arguments` to serialize cursors instead allows the use of any object that can be serialized using `ActiveJob::Arguments.serialize` and deserialized using `ActiveJob::Arguments.deserialize`, such as `Time` objects, which would previously be lossily serialized as strings. + This change is backwards compatible, by using a new job argument for the serialized cursor, but continuing to write to the old argument, ensuring that jobs can be processed regardless of if they are enqueued or dequeued with the old or new version of the gem. + In the event that a cursor is not serializable, the gem will fall back to the deprecated old behaviour. In Job Iteration 2.0, this fallback will be removed, and cursors will be required to be serializable, raising otherwise. To opt-in to this behaviour, set `JobIteration.enforce_serializable_cursors = true`. To support gradual migration, a per-class `job_iteration_enforce_serializable_cursors` option is also available, which overrides the global option for that class. ### Bug fixes diff --git a/README.md b/README.md index 18cb49da..570f4443 100644 --- a/README.md +++ b/README.md @@ -213,6 +213,8 @@ class MyJob < ApplicationJob end ``` +See [the guide on Custom Enumerator](guides/custom-enumerator.md) for details. + ## Credits This project would not be possible without these individuals (in alphabetical order): diff --git a/guides/custom-enumerator.md b/guides/custom-enumerator.md index 05cfaeab..0a26fc8e 100644 --- a/guides/custom-enumerator.md +++ b/guides/custom-enumerator.md @@ -104,6 +104,7 @@ LoadRefundsForChargeJob.perform_later(charge_id = "chrg_345") Sometimes you can ignore the cursor. Consider the following custom `Enumerator` that takes items from a Redis list, which is essentially a queue. Even if this job doesn't need to persist a cursor in order to resume, it can still use `Iteration`'s signal handling to finish `each_iteration` and gracefully terminate. +`Iteration`'s signal handling to finish `each_iteration` and gracefully terminate. ```ruby class RedisPopListJob < ActiveJob::Base @@ -134,3 +135,59 @@ forced to exit ie `job_should_exit?` is true, then the job is re-enqueued during the enumerator does not run. You can follow that logic [here](https://github.com/Shopify/job-iteration/blob/v1.3.6/lib/job-iteration/iteration.rb#L161-L165) and [here](https://github.com/Shopify/job-iteration/blob/v1.3.6/lib/job-iteration/iteration.rb#L131-L143) + +### Cursor Serialization + + +Cursors should be of a [type that Active Job can serialize](https://guides.rubyonrails.org/active_job_basics.html#supported-types-for-arguments). + +For example, consider: + +```ruby +FancyCursor = Struct.new(:wrapped_value) do + def to_s + wrapped_value + end +end +``` + +```ruby +def build_enumerator(cursor:) + Enumerator.new do |yielder| + # ...something with fancy cursor... + yielder.yield 123, FancyCursor.new(:abc) + end +end +``` + +If this job was interrupted, Active Job would be unable to serialize +`FancyCursor`, and Job Iteration would fallback to the legacy behavior of not +serializing the cursor. This would typically result in the queue adapter +eventually serializing the cursor as JSON by calling `.to_s` on it. The cursor +would be deserialized as `:abc`, rather than the intended `FancyCursor.new(:abc)`. + +To avoid this, job authors should take care to ensure that their cursor is +serializable by Active Job. This can be done in a couple ways, such as: +- [adding `GlobalID` support to the cursor class](https://guides.rubyonrails.org/active_job_basics.html#globalid) +- [implementing a custom Active Job argument serializer for the cursor class](https://guides.rubyonrails.org/active_job_basics.html#serializers) +- handling (de)serialization in the job/enumerator itself + ```ruby + def build_enumerator(cursor:) + fancy_cursor = FancyCursor.new(cursor) unless cursor.nil? + Enumerator.new do |yielder| + # ...something with fancy cursor... + yielder.yield 123, FancyCursor(:abc).wrapped_value + end + end + ``` +Note that starting in 2.0, Job Iteration will stop supporting fallback behavior +and raise when it encounters an unserializable cursor. To opt-in to this behavior early, set +```ruby +JobIteration.enforce_serializable_cursors = true +``` +or, to support gradual migration, a per-class option is also available to override the global value, if set: +```ruby +class MyJob < ActiveJob::Base + include JobIteration::Iteration + self.job_iteration_enforce_serializable_cursors = true +``` diff --git a/lib/job-iteration.rb b/lib/job-iteration.rb index 77e48ec2..42c94d2b 100644 --- a/lib/job-iteration.rb +++ b/lib/job-iteration.rb @@ -11,6 +11,8 @@ module JobIteration INTEGRATIONS = [:resque, :sidekiq] + Deprecation = ActiveSupport::Deprecation.new("2.0", "JobIteration") + extend self attr_writer :logger @@ -57,6 +59,24 @@ def logger # where the throttle backoff value will take precedence over this setting. attr_accessor :default_retry_backoff + # Set this to `true` to enforce that cursors be composed of objects capable + # of built-in (de)serialization by Active Job. + # + # JobIteration.enforce_serializable_cursors = true + # + # For more granular control, this can also be configured per job class, and + # is inherited by child classes. + # + # class MyJob < ActiveJob::Base + # include JobIteration::Iteration + # self.job_iteration_enforce_serializable_cursors = false + # # ... + # end + # + # Note that non-enforcement is deprecated and enforcement will be mandatory + # in version 2.0, at which point this config will go away. + attr_accessor :enforce_serializable_cursors + # Used internally for hooking into job processing frameworks like Sidekiq and Resque. attr_accessor :interruption_adapter diff --git a/lib/job-iteration/iteration.rb b/lib/job-iteration/iteration.rb index b58eb242..0c092de9 100644 --- a/lib/job-iteration/iteration.rb +++ b/lib/job-iteration/iteration.rb @@ -18,28 +18,6 @@ module Iteration # The time isn't reset if the job is interrupted. attr_accessor :total_time - class CursorError < ArgumentError - attr_reader :cursor - - def initialize(message, cursor:) - super(message) - @cursor = cursor - end - - def message - "#{super} (#{inspected_cursor})" - end - - private - - def inspected_cursor - cursor.inspect - rescue NoMethodError - # For those brave enough to try to use BasicObject as cursor. Nice try. - Object.instance_method(:inspect).bind(cursor).call - end - end - included do |_base| define_callbacks :start define_callbacks :shutdown @@ -50,6 +28,12 @@ def inspected_cursor instance_accessor: false, instance_predicate: false, ) + + class_attribute( + :job_iteration_enforce_serializable_cursors, + instance_accessor: false, + instance_predicate: false, + ) end module ClassMethods @@ -88,16 +72,25 @@ def initialize(*arguments) ruby2_keywords(:initialize) if respond_to?(:ruby2_keywords, true) def serialize # @private - super.merge( - "cursor_position" => cursor_position, + iteration_job_data = { + "cursor_position" => cursor_position, # Backwards compatibility "times_interrupted" => times_interrupted, "total_time" => total_time, - ) + } + + begin + iteration_job_data["serialized_cursor_position"] = serialize_cursor(cursor_position) + rescue ActiveJob::SerializationError + raise if job_iteration_enforce_serializable_cursors? + # No point in duplicating the deprecation warning from assert_valid_cursor! + end + + super.merge(iteration_job_data) end def deserialize(job_data) # @private super - self.cursor_position = job_data["cursor_position"] + self.cursor_position = cursor_position_from_job_data(job_data) self.times_interrupted = Integer(job_data["times_interrupted"] || 0) self.total_time = Float(job_data["total_time"] || 0.0) end @@ -167,8 +160,7 @@ def iterate_with_enumerator(enumerator, arguments) @needs_reenqueue = false enumerator.each do |object_from_enumerator, cursor_from_enumerator| - # Deferred until 2.0.0 - # assert_valid_cursor!(cursor_from_enumerator) + assert_valid_cursor!(cursor_from_enumerator) tags = instrumentation_tags.merge(cursor_position: cursor_from_enumerator) ActiveSupport::Notifications.instrument("each_iteration.iteration", tags) do @@ -222,16 +214,19 @@ def build_enumerator(params, cursor:) EOS end - # The adapter must be able to serialize and deserialize the cursor back into an equivalent object. - # https://github.com/mperham/sidekiq/wiki/Best-Practices#1-make-your-job-parameters-small-and-simple def assert_valid_cursor!(cursor) - return if serializable?(cursor) + serialize_cursor(cursor) + true + rescue ActiveJob::SerializationError + raise if job_iteration_enforce_serializable_cursors? - raise CursorError.new( - "Cursor must be composed of objects capable of built-in (de)serialization: " \ - "Strings, Integers, Floats, Arrays, Hashes, true, false, or nil.", - cursor: cursor, - ) + Deprecation.warn(<<~DEPRECATION_MESSAGE, caller_locations(3)) + The Enumerator returned by #{self.class.name}#build_enumerator yielded a cursor which is unsafe to serialize. + See https://github.com/Shopify/job-iteration/blob/main/guides/custom-enumerator.md#cursor-types + This will raise starting in version #{Deprecation.deprecation_horizon} of #{Deprecation.gem_name}!" + DEPRECATION_MESSAGE + + false end def assert_implements_methods! @@ -286,6 +281,13 @@ def job_iteration_max_job_runtime [global_max, class_max].min end + def job_iteration_enforce_serializable_cursors? # TODO: Add a test for the edge case of registering it afterwards + per_class_setting = self.class.job_iteration_enforce_serializable_cursors + return per_class_setting unless per_class_setting.nil? + + !!JobIteration.enforce_serializable_cursors + end + def handle_completed(completed) case completed when nil # someone aborted the job but wants to call the on_complete callback @@ -305,6 +307,25 @@ def handle_completed(completed) raise "Unexpected thrown value: #{completed.inspect}" end + def cursor_position_from_job_data(job_data) + if job_data.key?("serialized_cursor_position") + deserialize_cursor(job_data.fetch("serialized_cursor_position")) + else + # Backwards compatibility for + # - jobs interrupted before cursor serialization feature shipped, or + # - jobs whose cursor could not be serialized + job_data.fetch("cursor_position", nil) + end + end + + def serialize_cursor(cursor) + ActiveJob::Arguments.serialize([cursor]).first + end + + def deserialize_cursor(cursor) + ActiveJob::Arguments.deserialize([cursor]).first + end + def valid_cursor_parameter?(parameters) # this condition is when people use the splat operator. # def build_enumerator(*) @@ -316,21 +337,5 @@ def valid_cursor_parameter?(parameters) end false end - - SIMPLE_SERIALIZABLE_CLASSES = [String, Integer, Float, NilClass, TrueClass, FalseClass].freeze - private_constant :SIMPLE_SERIALIZABLE_CLASSES - def serializable?(object) - # Subclasses must be excluded, hence not using is_a? or ===. - if object.instance_of?(Array) - object.all? { |element| serializable?(element) } - elsif object.instance_of?(Hash) - object.all? { |key, value| serializable?(key) && serializable?(value) } - else - SIMPLE_SERIALIZABLE_CLASSES.any? { |klass| object.instance_of?(klass) } - end - rescue NoMethodError - # BasicObject doesn't respond to instance_of, but we can't serialize it anyway - false - end end end diff --git a/test/integration/integration_behaviour.rb b/test/integration/integration_behaviour.rb index 89dd3602..9ac0662e 100644 --- a/test/integration/integration_behaviour.rb +++ b/test/integration/integration_behaviour.rb @@ -36,17 +36,12 @@ module IntegrationBehaviour test "unserializable corruption is prevented" do skip_until_version("2.0.0") - # Cursors are serialized as JSON, but not all objects are serializable. - # time = Time.at(0).utc # => 1970-01-01 00:00:00 UTC - # json = JSON.dump(time) # => "\"1970-01-01 00:00:00 UTC\"" - # string = JSON.parse(json) # => "1970-01-01 00:00:00 UTC" - # We serialized a Time, but it was deserialized as a String. - TimeCursorJob.perform_later + UnserializableCursorJob.perform_later TerminateJob.perform_later start_worker_and_wait assert_equal( - JobIteration::Iteration::CursorError.name, + ActiveJob::SerializationError.name, failed_job_error_class_name, ) end diff --git a/test/support/jobs.rb b/test/support/jobs.rb index d56a808e..2000aff1 100644 --- a/test/support/jobs.rb +++ b/test/support/jobs.rb @@ -15,11 +15,12 @@ def each_iteration(omg) end end -class TimeCursorJob < ActiveJob::Base +class UnserializableCursorJob < ActiveJob::Base include JobIteration::Iteration + UnserializableCursor = Class.new def build_enumerator(cursor:) - return [["item", Time.now]].to_enum if cursor.nil? + return [["item", UnserializableCursor.new]].to_enum if cursor.nil? raise "This should never run; cursor is unserializable!" end diff --git a/test/unit/iteration_test.rb b/test/unit/iteration_test.rb index 27d60e1c..a92b7fe3 100644 --- a/test/unit/iteration_test.rb +++ b/test/unit/iteration_test.rb @@ -63,64 +63,6 @@ def each_iteration(*) end end - class InvalidCursorJob < ActiveJob::Base - include JobIteration::Iteration - def each_iteration(*) - raise "Cursor invalid. This should never run!" - end - end - - class JobWithTimeCursor < InvalidCursorJob - def build_enumerator(cursor:) - [["item", cursor || Time.now]].to_enum - end - end - - class JobWithSymbolCursor < InvalidCursorJob - def build_enumerator(cursor:) - [["item", cursor || :symbol]].to_enum - end - end - - class JobWithActiveRecordCursor < InvalidCursorJob - def build_enumerator(cursor:) - [["item", cursor || Product.first]].to_enum - end - end - - class JobWithStringSubclassCursor < InvalidCursorJob - StringSubClass = Class.new(String) - - def build_enumerator(cursor:) - [["item", cursor || StringSubClass.new]].to_enum - end - end - - class JobWithBasicObjectCursor < InvalidCursorJob - def build_enumerator(cursor:) - [["item", cursor || BasicObject.new]].to_enum - end - end - - class JobWithComplexCursor < ActiveJob::Base - include JobIteration::Iteration - def build_enumerator(cursor:) - [[ - "item", - cursor || [{ - "string" => "abc", - "integer" => 123, - "float" => 4.56, - "booleans" => [true, false], - "null" => nil, - }], - ]].to_enum - end - - def each_iteration(*) - end - end - class JobThatCompletesAfter3Seconds < ActiveJob::Base include JobIteration::Iteration include ActiveSupport::Testing::TimeHelpers @@ -198,6 +140,23 @@ def each_iteration(*) end end + class InfiniteCursorLoggingJob < ActiveJob::Base + include JobIteration::Iteration + class << self + def cursors + @cursors ||= [] + end + end + + def build_enumerator(cursor:) + self.class.cursors << cursor + ["VALUE", "CURSOR"].cycle + end + + def each_iteration(*) + end + end + def test_jobs_that_define_build_enumerator_and_each_iteration_will_not_raise push(JobWithRightMethods, "walrus" => "best") work_one_job @@ -259,41 +218,171 @@ def foo assert_includes(methods_added, :foo) end - def test_jobs_using_time_cursor_will_raise - skip_until_version("2.0.0") - push(JobWithTimeCursor) - assert_raises_cursor_error { work_one_job } + UnserializableCursor = Class.new + + class SerializableCursor + include GlobalID::Identification + + def id + "singleton" + end + + class << self + def find(_id) + new + end + end end - def test_jobs_using_active_record_cursor_will_raise + def test_jobs_using_unserializable_cursor_will_raise skip_until_version("2.0.0") - refute_nil(Product.first) - push(JobWithActiveRecordCursor) - assert_raises_cursor_error { work_one_job } + + job_class = build_invalid_cursor_job(cursor: UnserializableCursor.new) + + assert_raises(ActiveJob::SerializationError) do + job_class.perform_now + end end - def test_jobs_using_symbol_cursor_will_raise - skip_until_version("2.0.0") - push(JobWithSymbolCursor) - assert_raises_cursor_error { work_one_job } + def test_jobs_using_unserializable_cursor_is_deprecated + job_class = build_invalid_cursor_job(cursor: UnserializableCursor.new) + + assert_cursor_deprecation_warning_on_perform(job_class) end - def test_jobs_using_string_subclass_cursor_will_raise - skip_until_version("2.0.0") - push(JobWithStringSubclassCursor) - assert_raises_cursor_error { work_one_job } + def test_jobs_using_serializable_cursor_is_not_deprecated + job_class = build_invalid_cursor_job(cursor: SerializableCursor.new) + + assert_no_cursor_deprecation_warning_on_perform(job_class) end - def test_jobs_using_basic_object_cursor_will_raise - skip_until_version("2.0.0") - push(JobWithBasicObjectCursor) - assert_raises_cursor_error { work_one_job } + def test_jobs_using_complex_but_serializable_cursor_is_not_deprecated + job_class = build_invalid_cursor_job(cursor: [{ + "string" => "abc", + "integer" => 123, + "float" => 4.56, + "booleans" => [true, false], + "null" => nil, + }]) + + assert_no_cursor_deprecation_warning_on_perform(job_class) end - def test_jobs_using_complex_but_serializable_cursor_will_not_raise - skip_until_version("2.0.0") - push(JobWithComplexCursor) - work_one_job + def test_jobs_using_unserializable_cursor_will_raise_if_enforce_serializable_cursors_globally_enabled + with_global_enforce_serializable_cursors(true) do + job_class = build_invalid_cursor_job(cursor: UnserializableCursor.new) + + assert_raises(ActiveJob::SerializationError) do + job_class.perform_now + end + end + end + + def test_jobs_using_unserializable_cursor_will_raise_if_enforce_serializable_cursors_set_per_class + with_global_enforce_serializable_cursors(false) do + job_class = build_invalid_cursor_job(cursor: UnserializableCursor.new) + job_class.job_iteration_enforce_serializable_cursors = true + + assert_raises(ActiveJob::SerializationError) do + job_class.perform_now + end + end + end + + def test_jobs_using_unserializable_cursor_will_raise_if_enforce_serializable_cursors_set_in_parent + with_global_enforce_serializable_cursors(false) do + parent = build_invalid_cursor_job(cursor: UnserializableCursor.new) + parent.job_iteration_enforce_serializable_cursors = true + child = Class.new(parent) + + assert_raises(ActiveJob::SerializationError) do + child.perform_now + end + end + end + + def test_jobs_using_unserializable_cursor_will_not_raise_if_enforce_serializable_cursors_unset_per_class + with_global_enforce_serializable_cursors(true) do + job_class = build_invalid_cursor_job(cursor: UnserializableCursor.new) + job_class.job_iteration_enforce_serializable_cursors = false + + assert_cursor_deprecation_warning_on_perform(job_class) + end + end + + def test_jobs_using_unserializable_cursor_when_interrupted_should_only_store_the_cursor_and_no_serialized_cursor + # We must ensure to store the unserializable cursor in the same way the legacy code did, for backwards compability + job_class = build_invalid_cursor_job(cursor: UnserializableCursor.new) + with_interruption do + assert_cursor_deprecation_warning_on_perform(job_class) + end + + job_data = ActiveJob::Base.queue_adapter.enqueued_jobs.last + refute_nil(job_data, "interrupted job expected in queue") + assert_instance_of(UnserializableCursor, job_data.fetch("cursor_position")) + refute_includes(job_data, "serialized_cursor_position") + end + + def test_jobs_using_serializable_cursor_when_interrupted_should_store_both_legacy_cursor_and_serialized_cursor + # We must ensure to store the legacy cursor for backwards compatibility. + job_class = build_invalid_cursor_job(cursor: SerializableCursor.new) + with_interruption do + assert_no_cursor_deprecation_warning_on_perform(job_class) + end + + job_data = ActiveJob::Base.queue_adapter.enqueued_jobs.last + refute_nil(job_data, "interrupted job expected in queue") + assert_instance_of(SerializableCursor, job_data.fetch("cursor_position")) + assert_equal( + ActiveJob::Arguments.serialize([SerializableCursor.new]).first, + job_data.fetch("serialized_cursor_position"), + ) + end + + def test_job_interrupted_with_only_cursor_position_should_resume + # Simulates loading a job serialized by an old version of job-iteration + with_interruption do + InfiniteCursorLoggingJob.perform_now + + work_one_job do |job_data| + job_data["cursor_position"] = "raw cursor" + job_data.delete("serialized_cursor_position") + end + + assert_equal("raw cursor", InfiniteCursorLoggingJob.cursors.last) + end + ensure + InfiniteCursorLoggingJob.cursors.clear + end + + def test_job_interrupted_with_serialized_cursor_position_should_ignore_unserialized_cursor_position + # Simulates loading a job serialized by the current version of job-iteration + with_interruption do + InfiniteCursorLoggingJob.perform_now + + work_one_job do |job_data| + job_data["cursor_position"] = "should be ignored" + job_data["serialized_cursor_position"] = ActiveJob::Arguments.serialize([SerializableCursor.new]).first + end + + assert_instance_of(SerializableCursor, InfiniteCursorLoggingJob.cursors.last) + end + ensure + InfiniteCursorLoggingJob.cursors.clear + end + + def test_job_resuming_with_invalid_serialized_cursor_position_should_raise + with_interruption do + InfiniteCursorLoggingJob.perform_now + assert_raises(ActiveJob::DeserializationError) do + work_one_job do |job_data| + job_data["cursor_position"] = "should be ignored" + job_data["serialized_cursor_position"] = UnserializableCursor.new # cannot be deserialized + end + end + end + ensure + InfiniteCursorLoggingJob.cursors.clear end def test_jobs_using_on_complete_have_accurate_total_time @@ -519,21 +608,52 @@ def build_slow_job_class(iterations: 3, iteration_duration: 30.seconds) end end - def assert_raises_cursor_error(&block) - error = assert_raises(JobIteration::Iteration::CursorError, &block) - inspected_cursor = - begin - error.cursor.inspect - rescue NoMethodError - Object.instance_method(:inspect).bind(error.cursor).call + # This helper allows us to create a class that reads config at test time, instead of when this file is loaded + def build_invalid_cursor_job(cursor:) + test_cursor = cursor # so we don't collide with the method param below + Class.new(ActiveJob::Base) do + include JobIteration::Iteration + define_method(:build_enumerator) do |cursor:| + current_cursor = cursor + [["item", current_cursor || test_cursor]].to_enum end + define_method(:each_iteration) do |*| + return if Gem::Version.new(JobIteration::VERSION) < Gem::Version.new("2.0") - assert_equal( - "Cursor must be composed of objects capable of built-in (de)serialization: " \ - "Strings, Integers, Floats, Arrays, Hashes, true, false, or nil. " \ - "(#{inspected_cursor})", - error.message, - ) + raise "Cursor invalid. Starting in version 2.0, this should never run!" + end + singleton_class.define_method(:name) do + "InvalidCursorJob (#{cursor.class})" + end + end + end + + def assert_cursor_deprecation_warning_on_perform(job_class) + expected_message = <<~MESSAGE.chomp + DEPRECATION WARNING: The Enumerator returned by #{job_class.name}#build_enumerator yielded a cursor which is unsafe to serialize. + See https://github.com/Shopify/job-iteration/blob/main/guides/custom-enumerator.md#cursor-types + This will raise starting in version #{JobIteration::Deprecation.deprecation_horizon} of #{JobIteration::Deprecation.gem_name}! + MESSAGE + + warned = false + with_deprecation_behavior( + lambda do |message, *| + flunk("expected only one deprecation warning") if warned + warned = true + assert( + message.start_with?(expected_message), + "expected deprecation warning \n#{message.inspect}\n to start_with? \n#{expected_message.inspect}", + ) + end, + ) { job_class.perform_now } + + assert(warned, "expected deprecation warning") + end + + def assert_no_cursor_deprecation_warning_on_perform(job_class) + with_deprecation_behavior( + ->(message, *) { flunk("Expected no deprecation warning: #{message}") }, + ) { job_class.perform_now } end def assert_partially_completed_job(cursor_position:) @@ -548,8 +668,37 @@ def push(job, *args) end def work_one_job - job = ActiveJob::Base.queue_adapter.enqueued_jobs.pop - ActiveJob::Base.execute(job) + job_data = ActiveJob::Base.queue_adapter.enqueued_jobs.pop + yield job_data if block_given? + ActiveJob::Base.execute(job_data) + end + + def with_deprecation_behavior(behavior) + original_behaviour = JobIteration::Deprecation.behavior + JobIteration::Deprecation.behavior = behavior + yield + ensure + JobIteration::Deprecation.behavior = original_behaviour + end + + def with_global_enforce_serializable_cursors(temp) + original = JobIteration.enforce_serializable_cursors + JobIteration.enforce_serializable_cursors = temp + yield + ensure + JobIteration.enforce_serializable_cursors = original + end + + def with_interruption(&block) + with_interruption_adapter(-> { true }, &block) + end + + def with_interruption_adapter(temp) + original = JobIteration.interruption_adapter + JobIteration.interruption_adapter = temp + yield + ensure + JobIteration.interruption_adapter = original end def with_global_max_job_runtime(new)