Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable per job class max_job_runtime #240

Merged
merged 2 commits into from
Jul 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

- [241](https://github.com/Shopify/job-iteration/pull/241) - Require Ruby 2.7+, dropping 2.6 support
- [241](https://github.com/Shopify/job-iteration/pull/241) - Require Rails 6.0+, dropping 5.2 support
- [240](https://github.com/Shopify/job-iteration/pull/240) - Allow setting inheritable per-job `job_iteration_max_job_runtime`

## v1.3.6 (Mar 9, 2022)

Expand Down
15 changes: 15 additions & 0 deletions guides/best-practices.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,18 @@ JobIteration.max_job_runtime = 5.minutes # nil by default
```

Use this accessor to tweak how often you'd like the job to interrupt itself.

### Per job max job runtime

For more granular control, `job_iteration_max_job_runtime` can be set **per-job class**. This allows both incremental adoption, as well as using a conservative global setting, and an aggressive setting on a per-job basis.

```ruby
class MyJob < ApplicationJob
include JobIteration::Iteration

self.job_iteration_max_job_runtime = 3.minutes
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have renamed the class attribute 👍


# ...
```

This setting will be inherited by any child classes, although it can be further overridden. Note that no class can **increase** the `max_job_runtime` it has inherited; it can only be **decreased**. No job can increase its `max_job_runtime` beyond the global limit.
9 changes: 9 additions & 0 deletions lib/job-iteration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ module JobIteration
#
# This setting will make it to always interrupt a job after it's been iterating for 5 minutes.
# Defaults to nil which means that jobs will not be interrupted except on termination signal.
#
# This setting can be further reduced (but not increased) by using the inheritable per-class
# job_iteration_max_job_runtime setting.
# @example
#
# class MyJob < ActiveJob::Base
# include JobIteration::Iteration
# self.job_iteration_max_job_runtime = 1.minute
# # ...
attr_accessor :max_job_runtime

# Used internally for hooking into job processing frameworks like Sidekiq and Resque.
Expand Down
29 changes: 28 additions & 1 deletion lib/job-iteration/iteration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,33 @@ def inspected_cursor
define_callbacks :start
define_callbacks :shutdown
define_callbacks :complete

class_attribute(
:job_iteration_max_job_runtime,
instance_writer: false,
instance_predicate: false,
default: JobIteration.max_job_runtime,
)

singleton_class.prepend(PrependedClassMethods)
end

module PrependedClassMethods
def job_iteration_max_job_runtime=(new)
existing = job_iteration_max_job_runtime

if existing && (!new || new > existing)
sambostock marked this conversation as resolved.
Show resolved Hide resolved
existing_label = existing.inspect
new_label = new ? new.inspect : "#{new.inspect} (no limit)"
raise(
ArgumentError,
"job_iteration_max_job_runtime may only decrease; " \
"#{self} tried to increase it from #{existing_label} to #{new_label}",
)
end

super
end
end

module ClassMethods
Expand Down Expand Up @@ -262,7 +289,7 @@ def output_interrupt_summary
end

def job_should_exit?
if ::JobIteration.max_job_runtime && start_time && (Time.now.utc - start_time) > ::JobIteration.max_job_runtime
if job_iteration_max_job_runtime && start_time && (Time.now.utc - start_time) > job_iteration_max_job_runtime
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a user of the library, it would be helpful if there was an easy way to get notified (as in ActiveSupport::Notifications.instrument) when a job is interrupted due to elapsing the max run time setting. For example, a user could track how many times this is happening per some application-level metric to tune the time out setting accordingly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do instrument that a job has been interrupted, although we do not note the reason why.

I think that could be something worth considering, but I would consider it outside the scope of this PR. I've opened #247 to capture the discussion.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry to comment on such an old PR, please let me know if you'd rather I convert this to an issue, or move the conversation somewhere else.

This change actually creates a subtle behaviour change. Previously, we read the global configuration at job runtime. Now, we read the global configuration when JobIteration::Iteration is loaded, subsequent changes are ignored.

Today, we noticed that maintenance tasks were never pausing because that gem were setting a default job runtime in an after_initialze block that was run after this module was included.

I'm not actually sure which option is better. Just wanted to point out the difference in behaviours. In the meantime, I've proposed Shopify/maintenance_tasks#918 to fix maintenance tasks under the assumption that the current behaviour of job-iteration is correct.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oof, nice catch! I'd consider that a bug - changes to the global value should be picked up by the jobs as they run. I'm not entirely sure what the best fix is, especially in a way that enforces the "individual jobs must not increase this value".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can try to have a look on Monday, but I think the fix would be to override the method defined by class_attribute to delegate to the top level default, as opposed to setting it to the default value at the time the method was defined.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#436 should fix this.

return true
end

Expand Down
142 changes: 142 additions & 0 deletions test/unit/iteration_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,135 @@ def test_jobs_using_on_complete_have_accurate_total_time
end
end

def test_global_max_job_runtime
freeze_time
with_global_max_job_runtime(1.minute) do
klass = build_slow_job_class(iterations: 3, iteration_duration: 30.seconds)
klass.perform_now
assert_partially_completed_job(cursor_position: 2)
end
end

def test_per_class_max_job_runtime_with_default_global
freeze_time
parent = build_slow_job_class(iterations: 3, iteration_duration: 30.seconds)
child = Class.new(parent) do
self.job_iteration_max_job_runtime = 1.minute
end

parent.perform_now
assert_empty(ActiveJob::Base.queue_adapter.enqueued_jobs)

child.perform_now
assert_partially_completed_job(cursor_position: 2)
end

def test_per_class_max_job_runtime_with_global_set_to_nil
freeze_time
with_global_max_job_runtime(nil) do
sambostock marked this conversation as resolved.
Show resolved Hide resolved
parent = build_slow_job_class(iterations: 3, iteration_duration: 30.seconds)
child = Class.new(parent) do
self.job_iteration_max_job_runtime = 1.minute
end

parent.perform_now
assert_empty(ActiveJob::Base.queue_adapter.enqueued_jobs)

child.perform_now
assert_partially_completed_job(cursor_position: 2)
end
end

def test_per_class_max_job_runtime_with_global_set
freeze_time
with_global_max_job_runtime(1.minute) do
parent = build_slow_job_class(iterations: 3, iteration_duration: 30.seconds)
child = Class.new(parent) do
self.job_iteration_max_job_runtime = 30.seconds
end

parent.perform_now
assert_partially_completed_job(cursor_position: 2)
ActiveJob::Base.queue_adapter.enqueued_jobs = []

child.perform_now
assert_partially_completed_job(cursor_position: 1)
end
end

def test_max_job_runtime_cannot_unset_global
with_global_max_job_runtime(30.seconds) do
klass = Class.new(ActiveJob::Base) do
include JobIteration::Iteration
end

error = assert_raises(ArgumentError) do
klass.job_iteration_max_job_runtime = nil
end

assert_equal(
"job_iteration_max_job_runtime may only decrease; " \
"#{klass} tried to increase it from 30 seconds to nil (no limit)",
error.message,
)
end
end

def test_max_job_runtime_cannot_be_higher_than_global
with_global_max_job_runtime(30.seconds) do
klass = Class.new(ActiveJob::Base) do
include JobIteration::Iteration
end

error = assert_raises(ArgumentError) do
klass.job_iteration_max_job_runtime = 1.minute
end

assert_equal(
"job_iteration_max_job_runtime may only decrease; #{klass} tried to increase it from 30 seconds to 1 minute",
error.message,
)
end
end

def test_max_job_runtime_cannot_be_higher_than_parent
with_global_max_job_runtime(1.minute) do
parent = Class.new(ActiveJob::Base) do
include JobIteration::Iteration
self.job_iteration_max_job_runtime = 30.seconds
end
child = Class.new(parent)

error = assert_raises(ArgumentError) do
child.job_iteration_max_job_runtime = 45.seconds
end

assert_equal(
"job_iteration_max_job_runtime may only decrease; #{child} tried to increase it from 30 seconds to 45 seconds",
error.message,
)
end
end

private

# Allows building job classes that read max_job_runtime during the test,
# instead of when this file is read
def build_slow_job_class(iterations: 3, iteration_duration: 30.seconds)
Class.new(ActiveJob::Base) do
include JobIteration::Iteration
include ActiveSupport::Testing::TimeHelpers

define_method(:build_enumerator) do |cursor:|
enumerator_builder.build_times_enumerator(iterations, cursor: cursor)
end

define_method(:each_iteration) do |*|
travel(iteration_duration)
end
end
end

def assert_raises_cursor_error(&block)
error = assert_raises(JobIteration::Iteration::CursorError, &block)
inspected_cursor =
Expand All @@ -261,6 +388,13 @@ def assert_raises_cursor_error(&block)
)
end

def assert_partially_completed_job(cursor_position:)
message = "Expected to find partially completed job enqueued with cursor_position: #{cursor_position}"
enqueued_job = ActiveJob::Base.queue_adapter.enqueued_jobs.first
refute_nil(enqueued_job, message)
assert_equal(cursor_position, enqueued_job.fetch("cursor_position"))
end

def push(job, *args)
job.perform_later(*args)
end
Expand All @@ -269,4 +403,12 @@ def work_one_job
job = ActiveJob::Base.queue_adapter.enqueued_jobs.pop
ActiveJob::Base.execute(job)
end

def with_global_max_job_runtime(new)
original = JobIteration.max_job_runtime
JobIteration.max_job_runtime = new
yield
ensure
JobIteration.max_job_runtime = original
end
end