From 33112f0f62b81f57ea77a1a80e2d179dccf3a911 Mon Sep 17 00:00:00 2001 From: Sam Bostock Date: Wed, 29 Jun 2022 00:27:02 -0400 Subject: [PATCH] Deprecate un(de)serializable cursor & add config Version 2.0 of the gem will forbid cursors that are not (de)serializable using built in types. This is because adapters serialize to JSON, and we must ensure this is possible in a lossless fashion. Ahead of this change, we introduce a deprecation warning, as well as a configuration which makes it possible to enable the strict behavior. This configuration can be set globally, and/or at the job level. For instance, it is possible to enable it globally, but disable it for a jobs that have yet to be migrated, or alternatively to enable it for jobs that are already compliant, but leave it disabled globally to allow for non-compliant jobs. In theory, any non-compliant jobs are "already broken", however it is possible that the job works around the "bug" in (de)serialization. One known scenario is where jobs complete rapidly enough to never actually require cursor serialization, and as such never actually encounter the problem of un(de)serializable cursors in production environments. However, this could technically occur at any time. --- CHANGELOG.md | 2 + guides/custom-enumerator.md | 2 + lib/job-iteration.rb | 21 +++ lib/job-iteration/iteration.rb | 62 ++++---- test/unit/iteration_test.rb | 260 ++++++++++++++++++++++++--------- 5 files changed, 250 insertions(+), 97 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 75360644..e2083344 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ - [241](https://github.com/Shopify/job-iteration/pull/241) - Require Ruby 2.7+, dropping 2.6 support - [241](https://github.com/Shopify/job-iteration/pull/241) - Require Rails 6.0+, dropping 5.2 support +- [80](https://github.com/Shopify/job-iteration/pull/80) - Deprecate un(de)serializable cursors +- [80](https://github.com/Shopify/job-iteration/pull/80) - Add `enforce_serializable_cursors` config ## v1.3.6 (Mar 9, 2022) diff --git a/guides/custom-enumerator.md b/guides/custom-enumerator.md index 3978eb61..92c758fe 100644 --- a/guides/custom-enumerator.md +++ b/guides/custom-enumerator.md @@ -148,3 +148,5 @@ Instead, the job should take steps to serialize and deserialize the cursor as an + SomeAPI.stream(since: Time.iso8601(cursor)) end ``` + +Use of unsupported classes is deprecated, and will raise an error in a future version of Job Iteration. diff --git a/lib/job-iteration.rb b/lib/job-iteration.rb index f5c505bf..7ba53099 100644 --- a/lib/job-iteration.rb +++ b/lib/job-iteration.rb @@ -9,6 +9,8 @@ module JobIteration INTEGRATIONS = [:resque, :sidekiq] + Deprecation = ActiveSupport::Deprecation.new("2.0", "JobIteration") + extend self # Use this to _always_ interrupt the job after it's been running for more than N seconds. @@ -20,6 +22,25 @@ module JobIteration # Defaults to nil which means that jobs will not be interrupted except on termination signal. attr_accessor :max_job_runtime + # Set this to `true` to enforce that cursors be composed of objects capable + # of built-in (de)serialization: Strings, Integers, Floats, Arrays, Hashes, + # true, false, or nil. + # + # JobIteration.enforce_serializable_cursors = true + # + # For more granular control, this can also be configured per job class, and + # is inherited by child classes. + # + # class MyJob < ActiveJob::Base + # include JobIteration::Iteration + # self.enforce_serializable_cursors = false + # # ... + # end + # + # Note that non-enforcement is deprecated and enforcement will be mandatory + # in version 2.0, at which point this config will go away. + attr_accessor :enforce_serializable_cursors + # Used internally for hooking into job processing frameworks like Sidekiq and Resque. attr_accessor :interruption_adapter diff --git a/lib/job-iteration/iteration.rb b/lib/job-iteration/iteration.rb index e8d6b5c9..5e581f75 100644 --- a/lib/job-iteration/iteration.rb +++ b/lib/job-iteration/iteration.rb @@ -13,32 +13,17 @@ module Iteration :total_time, ) - class CursorError < ArgumentError - attr_reader :cursor - - def initialize(message, cursor:) - super(message) - @cursor = cursor - end - - def message - "#{super} (#{inspected_cursor})" - end - - private - - def inspected_cursor - cursor.inspect - rescue NoMethodError - # For those brave enough to try to use BasicObject as cursor. Nice try. - Object.instance_method(:inspect).bind(cursor).call - end - end - included do |_base| define_callbacks :start define_callbacks :shutdown define_callbacks :complete + + class_attribute( + :enforce_serializable_cursors, + instance_writer: false, + instance_predicate: false, + default: JobIteration.enforce_serializable_cursors, + ) end module ClassMethods @@ -66,6 +51,28 @@ def ban_perform_definition end end + class CursorError < ArgumentError + attr_reader :cursor + + def initialize(message, cursor:) + super(message) + @cursor = cursor + end + + def message + "#{super} (#{inspected_cursor})" + end + + private + + def inspected_cursor + cursor.inspect + rescue NoMethodError + # For those brave enough to try to use BasicObject as cursor. Nice try. + Object.instance_method(:inspect).bind(cursor).call + end + end + def initialize(*arguments) super @job_iteration_retry_backoff = nil @@ -149,8 +156,7 @@ def iterate_with_enumerator(enumerator, arguments) @needs_reenqueue = false enumerator.each do |object_from_enumerator, index| - # Deferred until 2.0.0 - # assert_valid_cursor!(index) + assert_valid_cursor!(index) record_unit_of_work do found_record = true @@ -218,7 +224,13 @@ def assert_valid_cursor!(cursor) "Cursor must be composed of objects capable of built-in (de)serialization: " \ "Strings, Integers, Floats, Arrays, Hashes, true, false, or nil.", cursor: cursor, - ) + ) if enforce_serializable_cursors + + Deprecation.warn(<<~DEPRECATION_MESSAGE) + The Enumerator returned by #{self.class.name}#build_enumerator yielded a cursor which is unsafe to serialize. + Cursors must be composed of objects capable of built-in (de)serialization: Strings, Integers, Floats, Arrays, Hashes, true, false, or nil. + This will raise starting in version #{Deprecation.deprecation_horizon} of #{Deprecation.gem_name}!" + DEPRECATION_MESSAGE end def assert_implements_methods! diff --git a/test/unit/iteration_test.rb b/test/unit/iteration_test.rb index a64bfff9..fac792d4 100644 --- a/test/unit/iteration_test.rb +++ b/test/unit/iteration_test.rb @@ -63,64 +63,6 @@ def each_iteration(*) end end - class InvalidCursorJob < ActiveJob::Base - include JobIteration::Iteration - def each_iteration(*) - raise "Cursor invalid. This should never run!" - end - end - - class JobWithTimeCursor < InvalidCursorJob - def build_enumerator(cursor:) - [["item", cursor || Time.now]].to_enum - end - end - - class JobWithSymbolCursor < InvalidCursorJob - def build_enumerator(cursor:) - [["item", cursor || :symbol]].to_enum - end - end - - class JobWithActiveRecordCursor < InvalidCursorJob - def build_enumerator(cursor:) - [["item", cursor || Product.first]].to_enum - end - end - - class JobWithStringSubclassCursor < InvalidCursorJob - StringSubClass = Class.new(String) - - def build_enumerator(cursor:) - [["item", cursor || StringSubClass.new]].to_enum - end - end - - class JobWithBasicObjectCursor < InvalidCursorJob - def build_enumerator(cursor:) - [["item", cursor || BasicObject.new]].to_enum - end - end - - class JobWithComplexCursor < ActiveJob::Base - include JobIteration::Iteration - def build_enumerator(cursor:) - [[ - "item", - cursor || [{ - "string" => "abc", - "integer" => 123, - "float" => 4.56, - "booleans" => [true, false], - "null" => nil, - }], - ]].to_enum - end - - def each_iteration(*) - end - end - class JobThatCompletesAfter3Seconds < ActiveJob::Base include JobIteration::Iteration include ActiveSupport::Testing::TimeHelpers @@ -201,39 +143,148 @@ def foo def test_jobs_using_time_cursor_will_raise skip_until_version("2.0.0") - push(JobWithTimeCursor) - assert_raises_cursor_error { work_one_job } + + job_class = build_invalid_cursor_job(cursor: Time.now) + + assert_raises_cursor_error do + job_class.perform_now + end + end + + def test_jobs_using_time_cursor_is_deprecated + job_class = build_invalid_cursor_job(cursor: Time.now) + + assert_cursor_deprecation_warning_on_perform(job_class) end def test_jobs_using_active_record_cursor_will_raise skip_until_version("2.0.0") + + refute_nil(Product.first) + + job_class = build_invalid_cursor_job(cursor: Product.first) + + assert_raises_cursor_error do + job_class.perform_now + end + end + + def test_jobs_using_active_record_cursor_is_deprecated refute_nil(Product.first) - push(JobWithActiveRecordCursor) - assert_raises_cursor_error { work_one_job } + + job_class = build_invalid_cursor_job(cursor: Product.first) + + assert_cursor_deprecation_warning_on_perform(job_class) end def test_jobs_using_symbol_cursor_will_raise skip_until_version("2.0.0") - push(JobWithSymbolCursor) - assert_raises_cursor_error { work_one_job } + + job_class = build_invalid_cursor_job(cursor: :symbol) + + assert_raises_cursor_error do + job_class.perform_now + end + end + + def test_jobs_using_symbol_cursor_is_deprecated + job_class = build_invalid_cursor_job(cursor: :symbol) + + assert_cursor_deprecation_warning_on_perform(job_class) end def test_jobs_using_string_subclass_cursor_will_raise skip_until_version("2.0.0") - push(JobWithStringSubclassCursor) - assert_raises_cursor_error { work_one_job } + + string_subclass = Class.new(String) + string_subclass.define_singleton_method(:name) { "StringSubclass" } + + job_class = build_invalid_cursor_job(cursor: string_subclass.new) + + assert_raises_cursor_error do + job_class.perform_now + end + end + + def test_jobs_using_string_subclass_cursor_is_deprecated + string_subclass = Class.new(String) + string_subclass.define_singleton_method(:name) { "StringSubclass" } + + job_class = build_invalid_cursor_job(cursor: string_subclass.new) + + assert_cursor_deprecation_warning_on_perform(job_class) end def test_jobs_using_basic_object_cursor_will_raise skip_until_version("2.0.0") - push(JobWithBasicObjectCursor) - assert_raises_cursor_error { work_one_job } + + job_class = build_invalid_cursor_job(cursor: BasicObject.new) + + assert_raises_cursor_error do + job_class.perform_now + end end - def test_jobs_using_complex_but_serializable_cursor_will_not_raise - skip_until_version("2.0.0") - push(JobWithComplexCursor) - work_one_job + def test_jobs_using_basic_object_cursor_is_deprecated + job_class = build_invalid_cursor_job(cursor: BasicObject.new) + + assert_cursor_deprecation_warning_on_perform(job_class) + end + + def test_jobs_using_complex_but_serializable_cursor_is_not_deprecated + job_class = build_invalid_cursor_job(cursor: [{ + "string" => "abc", + "integer" => 123, + "float" => 4.56, + "booleans" => [true, false], + "null" => nil, + }]) + + assert_no_cursor_deprecation_warning_on_perform(job_class) + end + + def test_jobs_using_unserializable_cursor_will_raise_if_enforce_serializable_cursors_globally_enabled + with_global_enforce_serializable_cursors(true) do + job_class = build_invalid_cursor_job(cursor: :unserializable) + + assert_raises_cursor_error do + job_class.perform_now + end + end + end + + def test_jobs_using_unserializable_cursor_will_raise_if_enforce_serializable_cursors_set_per_class + with_global_enforce_serializable_cursors(false) do + job_class = build_invalid_cursor_job(cursor: :unserializable) + job_class.enforce_serializable_cursors = true + + assert_raises_cursor_error do + job_class.perform_now + end + end + end + + def test_jobs_using_unserializable_cursor_will_raise_if_enforce_serializable_cursors_set_in_parent + with_global_enforce_serializable_cursors(false) do + parent = build_invalid_cursor_job(cursor: :unserializable) + parent.enforce_serializable_cursors = true + child = Class.new(parent) + + assert_raises_cursor_error do + child.perform_now + end + end + end + + def test_jobs_using_unserializable_cursor_will_not_raise_if_enforce_serializable_cursors_unset_per_class + with_global_enforce_serializable_cursors(true) do + job_class = build_invalid_cursor_job(cursor: :unserializable) + job_class.enforce_serializable_cursors = false + + assert_nothing_raised do + job_class.perform_now + end + end end def test_jobs_using_on_complete_have_accurate_total_time @@ -244,6 +295,28 @@ def test_jobs_using_on_complete_have_accurate_total_time private + # This helper allows us to create a class that reads config at test time, instead of when this file is loaded + def build_invalid_cursor_job(cursor:) + test_cursor = cursor # so we don't collide with the method param below + Class.new(ActiveJob::Base) do + include JobIteration::Iteration + define_method(:build_enumerator) do |cursor:| + current_cursor = cursor + [["item", current_cursor || test_cursor]].to_enum + end + define_method(:each_iteration) do |*| + return if Gem::Version.new(JobIteration::VERSION) < Gem::Version.new("2.0") + + raise "Cursor invalid. Starting in version 2.0, this should never run!" + end + singleton_class.define_method(:name) do + "JobWith#{cursor.class}Cursor" + rescue NoMethodError + "JobWithBasicObjectCursor" + end + end + end + def assert_raises_cursor_error(&block) error = assert_raises(JobIteration::Iteration::CursorError, &block) inspected_cursor = @@ -261,6 +334,34 @@ def assert_raises_cursor_error(&block) ) end + def assert_cursor_deprecation_warning_on_perform(job_class) + expected_message = <<~MESSAGE.chomp + DEPRECATION WARNING: The Enumerator returned by #{job_class.name}#build_enumerator yielded a cursor which is unsafe to serialize. + Cursors must be composed of objects capable of built-in (de)serialization: Strings, Integers, Floats, Arrays, Hashes, true, false, or nil. + This will raise starting in version #{JobIteration::Deprecation.deprecation_horizon} of #{JobIteration::Deprecation.gem_name}! + MESSAGE + + warned = false + with_deprecation_behavior( + lambda do |message, *| + flunk("expected only one deprecation warning") if warned + warned = true + assert( + message.start_with?(expected_message), + "expected deprecation warning \n#{message.inspect}\n to start_with? \n#{expected_message.inspect}", + ) + end, + ) { job_class.perform_now } + + assert(warned, "expected deprecation warning") + end + + def assert_no_cursor_deprecation_warning_on_perform(job_class) + with_deprecation_behavior( + ->(message, *) { flunk("Expected no deprecation warning: #{message}") }, + ) { job_class.perform_now } + end + def push(job, *args) job.perform_later(*args) end @@ -269,4 +370,19 @@ def work_one_job job = ActiveJob::Base.queue_adapter.enqueued_jobs.pop ActiveJob::Base.execute(job) end + + def with_deprecation_behavior(behavior) + original_behaviour = JobIteration::Deprecation.behavior + JobIteration::Deprecation.behavior = behavior + yield + ensure + JobIteration::Deprecation.behavior = original_behaviour + end + + def with_global_enforce_serializable_cursors(temp) + original = JobIteration.enforce_serializable_cursors + JobIteration.enforce_serializable_cursors = temp + ensure + JobIteration.enforce_serializable_cursors = original + end end