Skip to content

Commit

Permalink
Do not eagerly close inner iterators in CloseableIterator#flatMap (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
rohangarg authored Sep 15, 2023
1 parent 0fc5d54 commit 39d9595
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1033,14 +1033,15 @@ public void testReaderRetriesOnSdkClientExceptionButNeverSucceedsThenThrows() th
new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0),
temporaryFolder.newFolder()
);

final IllegalStateException e = Assert.assertThrows(IllegalStateException.class, reader::read);
MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(IOException.class));
MatcherAssert.assertThat(e.getCause().getCause(), CoreMatchers.instanceOf(SdkClientException.class));
MatcherAssert.assertThat(
e.getCause().getCause().getMessage(),
CoreMatchers.startsWith("Data read has a different length than the expected")
);
try (CloseableIterator<InputRow> readerIterator = reader.read()) {
final IllegalStateException e = Assert.assertThrows(IllegalStateException.class, readerIterator::hasNext);
MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(IOException.class));
MatcherAssert.assertThat(e.getCause().getCause(), CoreMatchers.instanceOf(SdkClientException.class));
MatcherAssert.assertThat(
e.getCause().getCause().getMessage(),
CoreMatchers.startsWith("Data read has a different length than the expected")
);
}

EasyMock.verify(S3_CLIENT);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.druid.java.util.common.parsers;

import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
Expand Down Expand Up @@ -62,37 +61,37 @@ public void close() throws IOException

default <R> CloseableIterator<R> flatMap(Function<T, CloseableIterator<R>> function)
{
final CloseableIterator<T> delegate = this;
final CloseableIterator<T> outerIterator = this;

return new CloseableIterator<R>()
{
CloseableIterator<R> iterator = findNextIteratorIfNecessary();
CloseableIterator<R> currInnerIterator = null;

@Nullable
private CloseableIterator<R> findNextIteratorIfNecessary()
private void findNextIteratorIfNecessary()
{
while ((iterator == null || !iterator.hasNext()) && delegate.hasNext()) {
if (iterator != null) {
while ((currInnerIterator == null || !currInnerIterator.hasNext()) && outerIterator.hasNext()) {
if (currInnerIterator != null) {
try {
iterator.close();
iterator = null;
currInnerIterator.close();
currInnerIterator = null;
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
}
iterator = function.apply(delegate.next());
if (iterator.hasNext()) {
return iterator;
currInnerIterator = function.apply(outerIterator.next());
if (currInnerIterator.hasNext()) {
return;
}
}
return null;
}

@Override
public boolean hasNext()
{
return iterator != null && iterator.hasNext();
// closes the current iterator if it is finished, and opens a new non-empty iterator if possible
findNextIteratorIfNecessary();
return currInnerIterator != null && currInnerIterator.hasNext();
}

@Override
Expand All @@ -101,21 +100,16 @@ public R next()
if (!hasNext()) {
throw new NoSuchElementException();
}
try {
return iterator.next();
}
finally {
findNextIteratorIfNecessary();
}
return currInnerIterator.next();
}

@Override
public void close() throws IOException
{
delegate.close();
if (iterator != null) {
iterator.close();
iterator = null;
outerIterator.close();
if (currInnerIterator != null) {
currInnerIterator.close();
currInnerIterator = null;
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,11 @@ protected int getMaxRetries()
).iterator(),
temporaryFolder.newFolder()
);
String expectedMessage = "Error occurred while trying to read uri: testscheme://some/path";
Exception exception = Assert.assertThrows(RuntimeException.class, firehose::read);

Assert.assertTrue(exception.getMessage().contains(expectedMessage));
try (CloseableIterator<InputRow> readIterator = firehose.read()) {
String expectedMessage = "Error occurred while trying to read uri: testscheme://some/path";
Exception exception = Assert.assertThrows(RuntimeException.class, readIterator::hasNext);
Assert.assertTrue(exception.getMessage().contains(expectedMessage));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,47 @@ public void testFlatMapClosedEarly() throws IOException
}
}

@Test
public void testFlatMapInnerClose() throws IOException
{
List<CloseTrackingCloseableIterator<Integer>> innerIterators = new ArrayList<>();
// the nested iterators is : [ [], [0], [0, 1] ]
try (final CloseTrackingCloseableIterator<Integer> actual = new CloseTrackingCloseableIterator<>(
generateTestIterator(3)
.flatMap(list -> {
CloseTrackingCloseableIterator<Integer> inner =
new CloseTrackingCloseableIterator<>(CloseableIterators.withEmptyBaggage(list.iterator()));
innerIterators.add(inner);
return inner;
})
)) {
final Iterator<Integer> expected = IntStream
.range(0, 3)
.flatMap(i -> IntStream.range(0, i))
.iterator();

int iterCount = 0, innerIteratorIdx = 0;
while (actual.hasNext()) {
iterCount++;
if (iterCount == 1) {
Assert.assertEquals(2, innerIterators.size()); //empty iterator and single element iterator
innerIteratorIdx++;
} else if (iterCount == 2) {
Assert.assertEquals(3, innerIterators.size()); //empty iterator + single element iterator + double element iterator
innerIteratorIdx++;
}
Assert.assertEquals(expected.next(), actual.next()); // assert expected value to the iterator's value
for (int i = 0; i < innerIteratorIdx; i++) {
Assert.assertEquals(1, innerIterators.get(i).closeCount); // expect all previous iterators to be closed
}
// never expect the current iterator to be closed, even after doing the last next call on it
Assert.assertEquals(0, innerIterators.get(innerIteratorIdx).closeCount);
}
}
// check the last inner iterator is closed
Assert.assertEquals(1, innerIterators.get(2).closeCount);
}

private static CloseableIterator<List<Integer>> generateTestIterator(int numIterates)
{
return new CloseableIterator<List<Integer>>()
Expand Down

0 comments on commit 39d9595

Please sign in to comment.