Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support nested iteration #295

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
### Master (unreleased)

- [295](https://github.com/Shopify/job-iteration/pull/295) - Support nested iteration
- [241](https://github.com/Shopify/job-iteration/pull/241) - Require Ruby 2.7+, dropping 2.6 support
- [241](https://github.com/Shopify/job-iteration/pull/241) - Require Rails 6.0+, dropping 5.2 support
- [240](https://github.com/Shopify/job-iteration/pull/240) - Allow setting inheritable per-job `job_iteration_max_job_runtime`
Expand Down
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,27 @@ class CsvJob < ApplicationJob
end
```

```ruby
class NestedIterationJob < ApplicationJob
include JobIteration::Iteration

def build_enumerator(cursor:)
enumerator_builder.nested(
[
->(cursor) { enumerator_builder.active_record_on_records(Shop.all, cursor: cursor) },
->(shop, cursor) { enumerator_builder.active_record_on_records(shop.products, cursor: cursor) },
->(_shop, product, cursor) { enumerator_builder.active_record_on_batch_relations(product.product_variants, cursor: cursor) }
],
cursor: cursor
)
end

def each_iteration(product_variants_relation)
# do something
end
end
```

Iteration hooks into Sidekiq and Resque out of the box to support graceful interruption. No extra configuration is required.

## Guides
Expand Down
36 changes: 36 additions & 0 deletions lib/job-iteration/enumerator_builder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
require_relative "./active_record_enumerator"
require_relative "./csv_enumerator"
require_relative "./throttle_enumerator"
require_relative "./nested_enumerator"
require "forwardable"

module JobIteration
Expand Down Expand Up @@ -146,6 +147,40 @@ def build_csv_enumerator(enumerable, cursor:)
CsvEnumerator.new(enumerable).rows(cursor: cursor)
end

# Builds an Enumerator for nested iteration, where the enumerator of each level is
# built from the items currently yielded by the levels above it.
#
# @param enums [Array<Proc>] an Array of Procs, one per nesting level, each should return an Enumerator.
#   Each proc from enums should accept the yielded items from the parent enumerators
#   (outermost first) followed by the `cursor` as its last argument.
#   Each proc's `cursor` argument is its part from the `build_enumerator`'s `cursor` array.
# @param cursor [Array<Object>, nil] array of offsets for each of the enums to start iteration from,
#   or nil to start every level from the beginning
# @return [Enumerator] yields the innermost level's item together with an Array
#   holding one cursor value per level
# @raise [ArgumentError] if `enums` contains anything that is not a Proc, or if `cursor`
#   is given and its size differs from the size of `enums`
#
# NOTE(review): NestedEnumerator looks up the same stored cursor entry every time it
# builds the enumerator for a given level, so a resumed inner cursor appears to be
# applied to every subsequent parent item as well — verify resume behaviour before
# relying on it.
#
# @example
#   def build_enumerator(cursor:)
#     enumerator_builder.nested(
#       [
#         ->(cursor) {
#           enumerator_builder.active_record_on_records(Shop.all, cursor: cursor)
#         },
#         ->(shop, cursor) {
#           enumerator_builder.active_record_on_records(shop.products, cursor: cursor)
#         },
#         ->(_shop, product, cursor) {
#           enumerator_builder.active_record_on_batch_relations(product.product_variants, cursor: cursor)
#         }
#       ],
#       cursor: cursor
#     )
#   end
#
#   def each_iteration(product_variants_relation)
#     # do something
#   end
#
def build_nested_enumerator(enums, cursor:)
NestedEnumerator.new(enums, cursor: cursor).each
end

alias_method :once, :build_once_enumerator
alias_method :times, :build_times_enumerator
alias_method :array, :build_array_enumerator
Expand All @@ -154,6 +189,7 @@ def build_csv_enumerator(enumerable, cursor:)
alias_method :active_record_on_batch_relations, :build_active_record_enumerator_on_batch_relations
alias_method :throttle, :build_throttle_enumerator
alias_method :csv, :build_csv_enumerator
alias_method :nested, :build_nested_enumerator

private

Expand Down
40 changes: 40 additions & 0 deletions lib/job-iteration/nested_enumerator.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# frozen_string_literal: true

module JobIteration
# @private
class NestedEnumerator
# Validates and stores the per-level enumerator procs and the resume cursor.
#
# @param enums [Array<Proc>] one proc/lambda per nesting level, outermost first
# @param cursor [Array, nil] one resume position per entry of +enums+, or nil to
#   start every level from the beginning
# @raise [ArgumentError] if +enums+ is empty, contains anything that is not a Proc,
#   or +cursor+ is given with a different size than +enums+
def initialize(enums, cursor: nil)
  # With no levels at all, iteration would fail later with NoMethodError on
  # `nil.call`; reject it up front with a meaningful error instead.
  raise ArgumentError, "enums must contain at least one proc/lambda" if enums.empty?
  raise ArgumentError, "enums must contain only procs/lambdas" unless enums.all?(Proc)
  raise ArgumentError, "cursor should have one item per enum" if cursor && enums.size != cursor.size

  @enums = enums
  # A missing cursor is represented as one nil position per level.
  @cursor = cursor || Array.new(enums.size)
end

# Walks all nesting levels starting from the outermost one, passing every
# yielded value on to the given block.
#
# @return [Enumerator] when called without a block
def each(&block)
  if block_given?
    # Start at level 0 with no parent items and an empty accumulated cursor.
    iterate([], [], 0, &block)
  else
    to_enum
  end
end

private

def iterate(current_items, current_cursor, index, &block)
cursor = @cursor[index]
enum = @enums[index].call(*current_items, cursor)

enum.each do |item, cursor_value|
if index == @cursor.size - 1
yield item, current_cursor + [cursor_value]
Copy link
Contributor

@bdewater bdewater Nov 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been stepping through the debugger a few times and what stood out to me is cursor = @cursor[index], the resumed cursor value stays 'stuck' for subsequent iterations, but when not resuming is initialized as an array of nils.

In my nested array example this means skipping over "2a" but also "3a" if you extend the pattern as such:

integers = [0, 1, 2, 3]
strings = [["0a", "0b", "0c"], ["1a", "1b", "1c"], ["2a", "2b", "2c"], ["3a", "3b", "3c"]]

This seems to do the trick unless I'm missing something (it's getting late here)?

Suggested change
yield item, current_cursor + [cursor_value]
yield item, current_cursor + [cursor_value]
@cursor[index] = nil

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to do this inside each, would work under cursor = @cursor[index] as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will fix the mentioned issue.
But for the cursor like [0, 1], it will start from [1, something], because when the iteration is resumed, it starts from the cursor + 1. So 0b and 0c will be incorrectly skipped.

I would say that this PR is correctly implemented, but works incorrectly, because as I mentioned previously, we need to resume from the same cursor value each time (without +1).

A separate PR with changes like fatkodima/sidekiq-iteration@5d3fe18 needs to be made for this PR to start working and not skipping values.

Copy link
Contributor

@bdewater bdewater Nov 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess what I am not grasping is why the cursor logic throughout this library needs to change to make nesting work. AFAIK its nil to start at the beginning of a collection, and a non-nil cursor to resume after. With the generic way nested enumerators are set up I don't see why this should change? The individual enumerators shouldn't need to care if it's nested or not, the nested enumerator should feed the correct value to them and handle the nesting level (index).

To whiteboard the possible cursor values for ["0a", "0b", "0c"] and the item processed in each_iteration:

  • cursor: nil, item: "0a"
  • cursor: 0, item: "0b"
  • cursor: 1, item: "0c"

and nested with: [["0a", "0b", "0c"], ["1a", "1b", "1c"], ["2a", "2b", "2c"]]:

  • cursor: [nil, nil], item: "0a"
  • cursor: [nil, 0], item: "0b"
  • cursor: [nil, 1], item: "0c"
  • cursor: [0, nil], item: "1a"
  • cursor: [0, 0], item: "1b"
  • cursor: [0, 1], item: "1c"
  • cursor: [1, nil], item: "2a"

etc. It seems to me resetting the right cursor index to nil at the correct time should make the above possible and make it work without changes to the rest of the library.

Having debugged some more I think it's also possible to refactor out current_cursor in the iterate method this is needed to serialize the cursor, and inspecting that I am beginning to see what you described :) Inside JobIteration::Iteration#iterate_with_enumerator for value object_from_enumerator="1a", index=[1, 0] and that would be serialized as cursor_position. A more integration-style test asserting that pausing and resuming a nested job will cover all the records would be nice. Will play with this a bit more!

Either way - thanks for taking the time to work on this and answering my questions 🙇‍♂️

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll try to get a PR up in the next few days.

Looking forward to it 👍

else
iterate(current_items + [item], current_cursor + [cursor_value], index + 1, &block)
end
end
end
end
end
31 changes: 25 additions & 6 deletions test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ def enqueue_at(job, _delay)
ActiveJob::Base.queue_adapter = :iteration_test

# Test model; a product owns many Comment records.
class Product < ActiveRecord::Base
has_many :comments
end

# Test model; each comment belongs to a single Product.
class Comment < ActiveRecord::Base
belongs_to :product
end

host = ENV["USING_DEV"] == "1" ? "job-iteration.railgun" : "localhost"
Expand All @@ -68,9 +73,16 @@ class Product < ActiveRecord::Base
config.redis = { host: host }
end

ActiveRecord::Base.connection.create_table(Product.table_name, force: true) do |t|
t.string(:name)
t.timestamps
ActiveRecord::Schema.define do
create_table(:products, force: true) do |t|
t.string(:name)
t.timestamps
end

create_table(:comments, force: true) do |t|
t.string(:content)
t.belongs_to(:product)
end
end

module LoggingHelpers
Expand Down Expand Up @@ -118,9 +130,16 @@ class IterationUnitTest < ActiveSupport::TestCase
end

def insert_fixtures
10.times do |n|
Product.create!(name: "lipstick #{n}")
end
now = Time.now
products = 10.times.map { |n| { name: "lipstick #{n}", created_at: now - n, updated_at: now - n } }
Product.insert_all!(products)

comments = Product.order(:id).limit(3).map.with_index do |product, index|
comments_count = index + 1
comments_count.times.map { |n| { content: "#{product.name} comment ##{n}", product_id: product.id } }
end.flatten

Comment.insert_all!(comments)
end

def truncate_fixtures
Copy link
Contributor

@bdewater bdewater Nov 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you want to change the line below to ActiveRecord::Base.connection.truncate(Product.table_name, Comment.table_name) to clean up this new model as well.

Expand Down
14 changes: 14 additions & 0 deletions test/unit/enumerator_builder_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,20 @@ class EnumeratorBuilderTest < ActiveSupport::TestCase
enumerator_builder(wraps: 0).build_csv_enumerator(CSV.new("test"), cursor: nil)
end

# Covers build_nested_enumerator with two nesting levels (products, then each
# product's comments) and no resume cursor.
test_builder_method(:build_nested_enumerator) do
enumerator_builder(wraps: 0).build_nested_enumerator(
[
->(cursor) {
enumerator_builder.build_active_record_enumerator_on_records(Product.all, cursor: cursor)
},
->(product, cursor) {
enumerator_builder.build_active_record_enumerator_on_records(product.comments, cursor: cursor)
},
],
cursor: nil,
)
end

# checks that all the non-alias methods were tested
raise "methods not tested: #{methods.inspect}" unless methods.empty?

Expand Down
81 changes: 81 additions & 0 deletions test/unit/nested_enumerator_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# frozen_string_literal: true

require "test_helper"

module JobIteration
  # Unit tests for NestedEnumerator, using Product -> Comment records as the
  # outer and inner nesting levels.
  class NestedEnumeratorTest < IterationUnitTest
    test "accepts only callables as enums" do
      error = assert_raises(ArgumentError) do
        # The outer entry is an Array wrapping an Enumerator, i.e. not a Proc.
        build_enumerator(outer: [[1, 2, 3].each])
      end
      assert_equal("enums must contain only procs/lambdas", error.message)
    end

    test "raises when cursor is not of the same size as enums" do
      error = assert_raises(ArgumentError) do
        # Two enums (outer + inner) but only one cursor entry.
        build_enumerator(cursor: [Product.first.id])
      end
      assert_equal("cursor should have one item per enum", error.message)
    end

    test "yields enumerator when called without a block" do
      enum = build_enumerator
      assert enum.is_a?(Enumerator)
      assert_nil enum.size
    end

    test "yields every nested record with their cursor position" do
      enum = build_enumerator

      products = Product.includes(:comments).order(:id).take(3)
      comments = products.map do |product|
        product.comments.sort_by(&:id).map { |comment| [comment, [product.id, comment.id]] }
      end.flatten(1)

      # Without this, the loop below would pass vacuously if too few items were yielded.
      assert_equal(comments.size, enum.count)

      enum.each_with_index do |(comment, cursor), index|
        expected_comment, expected_cursor = comments[index]
        assert_equal(expected_comment, comment)
        assert_equal(expected_cursor, cursor)
      end
    end

    # NOTE(review): if the fixtures give the first product exactly one comment, this
    # never resumes in the middle of a parent's children — confirm whether a
    # mid-parent resume case should be covered as well.
    test "cursor can be used to resume" do
      enum = build_enumerator
      _first_comment, first_cursor = enum.next
      second_comment, second_cursor = enum.next

      enum = build_enumerator(cursor: first_cursor)
      assert_equal([second_comment, second_cursor], enum.first)
    end

    test "doesn't yield anything if contains empty enum" do
      # Procs are called with the parent items first and the cursor last.
      enum = ->(_product, cursor) { records_enumerator(Comment.none, cursor: cursor) }
      enum = build_enumerator(inner: enum)
      assert_empty(enum.to_a)
    end

    test "works with single level nesting" do
      enum = build_enumerator(inner: nil)
      products = Product.order(:id).to_a

      # Without this, the loop below would pass vacuously if too few items were yielded.
      assert_equal(products.size, enum.count)

      enum.each_with_index do |(product, cursor), index|
        assert_equal(products[index], product)
        assert_equal([products[index].id], cursor)
      end
    end

    private

    # Builds the enumerator under test. Passing `inner: nil` drops the second
    # level, leaving a single-level nested enumerator.
    def build_enumerator(
      outer: ->(cursor) { records_enumerator(Product.all, cursor: cursor) },
      inner: ->(product, cursor) { records_enumerator(product.comments, cursor: cursor) },
      cursor: nil
    )
      NestedEnumerator.new([outer, inner].compact, cursor: cursor).each
    end

    # Record-by-record enumerator over +scope+, optionally resumed from +cursor+.
    def records_enumerator(scope, cursor: nil)
      ActiveRecordEnumerator.new(scope, cursor: cursor).records
    end
  end
end