Support nested iteration #295
Closed
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
# frozen_string_literal: true | ||
|
||
module JobIteration | ||
# @private | ||
class NestedEnumerator | ||
def initialize(enums, cursor: nil) | ||
unless enums.all?(Proc) | ||
raise ArgumentError, "enums must contain only procs/lambdas" | ||
end | ||
|
||
if cursor && enums.size != cursor.size | ||
raise ArgumentError, "cursor should have one item per enum" | ||
end | ||
|
||
@enums = enums | ||
@cursor = cursor || Array.new(enums.size) | ||
end | ||
|
||
def each(&block) | ||
return to_enum unless block_given? | ||
|
||
iterate([], [], 0, &block) | ||
end | ||
|
||
private | ||
|
||
def iterate(current_items, current_cursor, index, &block) | ||
cursor = @cursor[index] | ||
enum = @enums[index].call(*current_items, cursor) | ||
|
||
enum.each do |item, cursor_value| | ||
if index == @cursor.size - 1 | ||
yield item, current_cursor + [cursor_value] | ||
else | ||
iterate(current_items + [item], current_cursor + [cursor_value], index + 1, &block) | ||
end | ||
end | ||
end | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -42,6 +42,11 @@ def enqueue_at(job, _delay) | |
ActiveJob::Base.queue_adapter = :iteration_test | ||
|
||
class Product < ActiveRecord::Base | ||
has_many :comments | ||
end | ||
|
||
class Comment < ActiveRecord::Base | ||
belongs_to :product | ||
end | ||
|
||
host = ENV["USING_DEV"] == "1" ? "job-iteration.railgun" : "localhost" | ||
|
@@ -68,9 +73,16 @@ class Product < ActiveRecord::Base | |
config.redis = { host: host } | ||
end | ||
|
||
ActiveRecord::Base.connection.create_table(Product.table_name, force: true) do |t| | ||
t.string(:name) | ||
t.timestamps | ||
ActiveRecord::Schema.define do | ||
create_table(:products, force: true) do |t| | ||
t.string(:name) | ||
t.timestamps | ||
end | ||
|
||
create_table(:comments, force: true) do |t| | ||
t.string(:content) | ||
t.belongs_to(:product) | ||
end | ||
end | ||
|
||
module LoggingHelpers | ||
|
@@ -118,9 +130,16 @@ class IterationUnitTest < ActiveSupport::TestCase | |
end | ||
|
||
def insert_fixtures | ||
10.times do |n| | ||
Product.create!(name: "lipstick #{n}") | ||
end | ||
now = Time.now | ||
products = 10.times.map { |n| { name: "lipstick #{n}", created_at: now - n, updated_at: now - n } } | ||
Product.insert_all!(products) | ||
|
||
comments = Product.order(:id).limit(3).map.with_index do |product, index| | ||
comments_count = index + 1 | ||
comments_count.times.map { |n| { content: "#{product.name} comment ##{n}", product_id: product.id } } | ||
end.flatten | ||
|
||
Comment.insert_all!(comments) | ||
end | ||
|
||
def truncate_fixtures | ||
I think you want to change the line below to |
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
# frozen_string_literal: true | ||
|
||
require "test_helper" | ||
|
||
module JobIteration | ||
class NestedEnumeratorTest < IterationUnitTest | ||
test "accepts only callables as enums" do | ||
error = assert_raises(ArgumentError) do | ||
build_enumerator(outer: [[1, 2, 3].each]) | ||
end | ||
assert_equal("enums must contain only procs/lambdas", error.message) | ||
end | ||
|
||
test "raises when cursor is not of the same size as enums" do | ||
error = assert_raises(ArgumentError) do | ||
build_enumerator(cursor: [Product.first.id]) | ||
end | ||
assert_equal("cursor should have one item per enum", error.message) | ||
end | ||
|
||
test "yields enumerator when called without a block" do | ||
enum = build_enumerator | ||
assert enum.is_a?(Enumerator) | ||
assert_nil enum.size | ||
end | ||
|
||
test "yields every nested record with their cursor position" do | ||
enum = build_enumerator | ||
|
||
products = Product.includes(:comments).order(:id).take(3) | ||
comments = products.map do |product| | ||
product.comments.sort_by(&:id).map { |comment| [comment, [product.id, comment.id]] } | ||
end.flatten(1) | ||
|
||
enum.each_with_index do |(comment, cursor), index| | ||
expected_comment, expected_cursor = comments[index] | ||
assert_equal(expected_comment, comment) | ||
assert_equal(expected_cursor, cursor) | ||
end | ||
end | ||
|
||
test "cursor can be used to resume" do | ||
enum = build_enumerator | ||
_first_comment, first_cursor = enum.next | ||
second_comment, second_cursor = enum.next | ||
|
||
enum = build_enumerator(cursor: first_cursor) | ||
assert_equal([second_comment, second_cursor], enum.first) | ||
end | ||
|
||
test "doesn't yield anything if contains empty enum" do | ||
enum = ->(_product, cursor) { records_enumerator(Comment.none, cursor: cursor) } | ||
enum = build_enumerator(inner: enum) | ||
assert_empty(enum.to_a) | ||
end | ||
|
||
test "works with single level nesting" do | ||
enum = build_enumerator(inner: nil) | ||
products = Product.order(:id).to_a | ||
|
||
enum.each_with_index do |(product, cursor), index| | ||
assert_equal(products[index], product) | ||
assert_equal([products[index].id], cursor) | ||
end | ||
end | ||
|
||
private | ||
|
||
def build_enumerator( | ||
outer: ->(cursor) { records_enumerator(Product.all, cursor: cursor) }, | ||
inner: ->(product, cursor) { records_enumerator(product.comments, cursor: cursor) }, | ||
cursor: nil | ||
) | ||
NestedEnumerator.new([outer, inner].compact, cursor: cursor).each | ||
end | ||
|
||
def records_enumerator(scope, cursor: nil) | ||
ActiveRecordEnumerator.new(scope, cursor: cursor).records | ||
end | ||
end | ||
end |

I've been stepping through the debugger a few times and what stood out to me is `cursor = @cursor[index]`: the resumed cursor value stays 'stuck' for subsequent iterations, but when not resuming is initialized as an array of nils.

In my nested array example this means skipping over `"2a"`, but also `"3a"` if you extend the pattern as such:

This seems to do the trick unless I'm missing something (it's getting late here)?
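
The skipping described above can be reproduced without a database. The following is a minimal sketch, not the reviewer's original example: `StuckCursorSketch` copies the traversal logic of `NestedEnumerator` from this PR (minus argument validation), and `array_enum` is a hypothetical stand-in for a cursor-aware enumerator that yields `[item, index]` pairs and resumes after the given index.

```ruby
# Hypothetical stand-in for a cursor-aware enumerator: returns [item, index]
# pairs and resumes *after* `cursor` (nil means "start at the beginning").
array_enum = lambda do |array, cursor|
  start = cursor.nil? ? 0 : cursor + 1
  array.each_with_index.drop(start)
end

# Same traversal as NestedEnumerator in this PR, without the validation.
class StuckCursorSketch
  def initialize(enums, cursor: nil)
    @enums = enums
    @cursor = cursor || Array.new(enums.size)
  end

  def each(&block)
    return to_enum unless block_given?

    iterate([], [], 0, &block)
  end

  private

  def iterate(current_items, current_cursor, index, &block)
    cursor = @cursor[index] # read again for every outer item, never reset
    enum = @enums[index].call(*current_items, cursor)

    enum.each do |item, cursor_value|
      if index == @cursor.size - 1
        yield item, current_cursor + [cursor_value]
      else
        iterate(current_items + [item], current_cursor + [cursor_value], index + 1, &block)
      end
    end
  end
end

nested = [%w[0a 0b 0c], %w[1a 1b 1c], %w[2a 2b 2c]]
outer = ->(cursor) { array_enum.call(nested, cursor) }
inner = ->(row, cursor) { array_enum.call(row, cursor) }

resumed = StuckCursorSketch.new([outer, inner], cursor: [0, 0]).each.map { |item, _cursor| item }
p resumed # the inner cursor 0 is reused for the last row, so "2a" is missing
```

Because the inner cursor `0` is applied to every remaining row, the first element of each later row is dropped as well.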

No need to do this inside `each`; it would work under `cursor = @cursor[index]` as well.
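
The suggested code itself did not survive on this page, so the following is only one plausible reading of it: clear the stored cursor right after it has been read under `cursor = @cursor[index]`, so that it applies to the first pass at each level only. `ResettingCursorSketch` and `array_enum` are hypothetical names used for illustration.

```ruby
# Hypothetical cursor-aware enumerator: [item, index] pairs, resuming after `cursor`.
array_enum = lambda do |array, cursor|
  start = cursor.nil? ? 0 : cursor + 1
  array.each_with_index.drop(start)
end

# NestedEnumerator traversal with one assumed change: the resumed cursor is
# consumed once per level and then reset to nil.
class ResettingCursorSketch
  def initialize(enums, cursor: nil)
    @enums = enums
    @cursor = cursor || Array.new(enums.size)
  end

  def each(&block)
    return to_enum unless block_given?

    iterate([], [], 0, &block)
  end

  private

  def iterate(current_items, current_cursor, index, &block)
    cursor = @cursor[index]
    @cursor[index] = nil # assumption: later passes at this level start from the beginning
    enum = @enums[index].call(*current_items, cursor)

    enum.each do |item, cursor_value|
      if index == @cursor.size - 1
        yield item, current_cursor + [cursor_value]
      else
        iterate(current_items + [item], current_cursor + [cursor_value], index + 1, &block)
      end
    end
  end
end

nested = [%w[0a 0b 0c], %w[1a 1b 1c], %w[2a 2b 2c]]
outer = ->(cursor) { array_enum.call(nested, cursor) }
inner = ->(row, cursor) { array_enum.call(row, cursor) }

resumed = ResettingCursorSketch.new([outer, inner], cursor: [0, 0]).each.map { |item, _cursor| item }
p resumed
```

With the reset in place `"2a"` is no longer dropped. The rest of row 0 (`"0b"`, `"0c"`) is still absent, because the outer enumerator in this sketch resumes after index `0`.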

This will fix the mentioned issue. But for a cursor like `[0, 1]`, it will start from `[1, something]`, because when the iteration is resumed, it starts from the `cursor` + 1. So `0b` and `0c` will be incorrectly skipped.

I would say that this PR is correctly implemented, but works incorrectly, because as I mentioned previously, we need to resume from the same `cursor` value each time (without +1).

A separate PR with changes like fatkodima/sidekiq-iteration@5d3fe18 needs to be made for this PR to start working and not skip values.
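
As a rough illustration of "resume from the same `cursor` value (without +1)", the sketch below lets the outer level resume at its cursor (inclusive) while the innermost level still resumes after its cursor. This is an assumption made for the example and does not reproduce the linked sidekiq-iteration commit; `InclusiveOuterSketch`, `resume_at` and `resume_after` are hypothetical names.

```ruby
# Hypothetical helpers returning [item, index] pairs.
pairs = ->(array) { array.each_with_index.to_a }
resume_at = ->(array, cursor) { pairs.call(array).drop(cursor || 0) }                      # inclusive
resume_after = ->(array, cursor) { pairs.call(array).drop(cursor.nil? ? 0 : cursor + 1) } # exclusive

# NestedEnumerator traversal; the stored cursor is used once per level (assumed reset).
class InclusiveOuterSketch
  def initialize(enums, cursor: nil)
    @enums = enums
    @cursor = cursor || Array.new(enums.size)
  end

  def each(&block)
    return to_enum unless block_given?

    iterate([], [], 0, &block)
  end

  private

  def iterate(current_items, current_cursor, index, &block)
    cursor = @cursor[index]
    @cursor[index] = nil # assumption: only the first pass at a level is resumed
    enum = @enums[index].call(*current_items, cursor)

    enum.each do |item, cursor_value|
      if index == @cursor.size - 1
        yield item, current_cursor + [cursor_value]
      else
        iterate(current_items + [item], current_cursor + [cursor_value], index + 1, &block)
      end
    end
  end
end

nested = [%w[0a 0b 0c], %w[1a 1b 1c], %w[2a 2b 2c]]
outer = ->(cursor) { resume_at.call(nested, cursor) }       # re-enters the row it stopped in
inner = ->(row, cursor) { resume_after.call(row, cursor) }  # continues after the last item

# [0, 0] is the cursor yielded for "0a", i.e. the state after processing "0a".
resumed = InclusiveOuterSketch.new([outer, inner], cursor: [0, 0]).each.map { |item, _cursor| item }
p resumed
```

In this sketch the outer enumerator re-enters row 0, so the remaining items of that row are visited before moving on to the later rows.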

I guess what I am not grasping is why the cursor logic throughout this library needs to change to make nesting work. AFAIK it's `nil` to start at the beginning of a collection, and a non-nil cursor to resume after. With the generic way nested enumerators are set up I don't see why this should change? The individual enumerators shouldn't need to care if it's nested or not, the nested enumerator should feed the correct value to them and handle the nesting level (`index`).

To whiteboard the possible cursor values for `["0a", "0b", "0c"]` and the item processed in `each_iteration`:

- `nil`, item: "0a"
- `0`, item: "0b"
- `1`, item: "0c"

and nested with `[["0a", "0b", "0c"], ["1a", "1b", "1c"], ["2a", "2b", "2c"]]`:

- `[nil, nil]`, item: "0a"
- `[nil, 0]`, item: "0b"
- `[nil, 1]`, item: "0c"
- `[0, nil]`, item: "1a"
- `[0, 0]`, item: "1b"
- `[0, 1]`, item: "1c"
- `[1, nil]`, item: "2a"

etc. It seems to me resetting the right cursor index to nil at the correct time should make the above possible and make it work without changes to the rest of the library.

Having debugged some more, I thought it was also possible to refactor out `current_cursor` in the `iterate` method, but it is needed to serialize the cursor, and inspecting that I am beginning to see what you described :) Inside `JobIteration::Iteration#iterate_with_enumerator`, for value `object_from_enumerator="1a"`, `index=[1, 0]`, and that would be serialized as `cursor_position`. A more integration-style test asserting that pausing and resuming a nested job will cover all the records would be nice. Will play with this a bit more!

Either way - thanks for taking the time to work on this and answering my questions 🙇♂️
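
The flat case whiteboarded above can be traced with a small simulation. This is a sketch under stated assumptions, not library code: `enum_from` is a hypothetical enumerator where `nil` starts at the beginning and a non-nil cursor resumes after that index, and the loop pretends the job is interrupted after every item and resumed from the saved cursor.

```ruby
items = %w[0a 0b 0c]

# Hypothetical flat enumerator: [item, index] pairs, resuming after `cursor`.
enum_from = lambda do |cursor|
  start = cursor.nil? ? 0 : cursor + 1
  items.each_with_index.drop(start)
end

# Simulate a job that is interrupted after each item and resumed from the
# cursor saved for the item it just processed.
trace = []
cursor = nil
loop do
  item, position = enum_from.call(cursor).first
  break if item.nil?

  trace << [cursor, item] # cursor the run started from, item it processed
  cursor = position       # saved for the next run
end

trace.each { |started_from, item| puts "cursor: #{started_from.inspect}, item: #{item.inspect}" }
```

The recorded pairs match the flat list above: the run started with `nil` processes "0a", the run started with `0` processes "0b", and the run started with `1` processes "0c".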

Looking forward to it 👍