diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java index fc538b682fc6..6b0bb537c7e1 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java @@ -1033,14 +1033,15 @@ public void testReaderRetriesOnSdkClientExceptionButNeverSucceedsThenThrows() th new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0), temporaryFolder.newFolder() ); - - final IllegalStateException e = Assert.assertThrows(IllegalStateException.class, reader::read); - MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(IOException.class)); - MatcherAssert.assertThat(e.getCause().getCause(), CoreMatchers.instanceOf(SdkClientException.class)); - MatcherAssert.assertThat( - e.getCause().getCause().getMessage(), - CoreMatchers.startsWith("Data read has a different length than the expected") - ); + try (CloseableIterator readerIterator = reader.read()) { + final IllegalStateException e = Assert.assertThrows(IllegalStateException.class, readerIterator::hasNext); + MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(IOException.class)); + MatcherAssert.assertThat(e.getCause().getCause(), CoreMatchers.instanceOf(SdkClientException.class)); + MatcherAssert.assertThat( + e.getCause().getCause().getMessage(), + CoreMatchers.startsWith("Data read has a different length than the expected") + ); + } EasyMock.verify(S3_CLIENT); } diff --git a/processing/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIterator.java b/processing/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIterator.java index af1baafe4198..7b81934367d1 100644 --- a/processing/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIterator.java +++ b/processing/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIterator.java @@ -19,7 +19,6 @@ package org.apache.druid.java.util.common.parsers; -import javax.annotation.Nullable; import java.io.Closeable; import java.io.IOException; import java.io.UncheckedIOException; @@ -62,37 +61,37 @@ public void close() throws IOException default CloseableIterator flatMap(Function> function) { - final CloseableIterator delegate = this; + final CloseableIterator outerIterator = this; return new CloseableIterator() { - CloseableIterator iterator = findNextIteratorIfNecessary(); + CloseableIterator currInnerIterator = null; - @Nullable - private CloseableIterator findNextIteratorIfNecessary() + private void findNextIteratorIfNecessary() { - while ((iterator == null || !iterator.hasNext()) && delegate.hasNext()) { - if (iterator != null) { + while ((currInnerIterator == null || !currInnerIterator.hasNext()) && outerIterator.hasNext()) { + if (currInnerIterator != null) { try { - iterator.close(); - iterator = null; + currInnerIterator.close(); + currInnerIterator = null; } catch (IOException e) { throw new UncheckedIOException(e); } } - iterator = function.apply(delegate.next()); - if (iterator.hasNext()) { - return iterator; + currInnerIterator = function.apply(outerIterator.next()); + if (currInnerIterator.hasNext()) { + return; } } - return null; } @Override public boolean hasNext() { - return iterator != null && iterator.hasNext(); + // closes the current iterator if it is finished, and opens a new non-empty iterator if possible + findNextIteratorIfNecessary(); + return currInnerIterator != null && currInnerIterator.hasNext(); } @Override @@ -101,21 +100,16 @@ public R next() if (!hasNext()) { throw new NoSuchElementException(); } - try { - return iterator.next(); - } - finally { - findNextIteratorIfNecessary(); - } + return currInnerIterator.next(); } @Override public void close() throws IOException { - delegate.close(); - if (iterator != null) { - iterator.close(); - iterator = null; + outerIterator.close(); + if (currInnerIterator != null) { + currInnerIterator.close(); + currInnerIterator = null; } } }; diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java index 9e175b2a3dfe..a33899b25354 100644 --- a/processing/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java +++ b/processing/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java @@ -137,9 +137,11 @@ protected int getMaxRetries() ).iterator(), temporaryFolder.newFolder() ); - String expectedMessage = "Error occurred while trying to read uri: testscheme://some/path"; - Exception exception = Assert.assertThrows(RuntimeException.class, firehose::read); - Assert.assertTrue(exception.getMessage().contains(expectedMessage)); + try (CloseableIterator readIterator = firehose.read()) { + String expectedMessage = "Error occurred while trying to read uri: testscheme://some/path"; + Exception exception = Assert.assertThrows(RuntimeException.class, readIterator::hasNext); + Assert.assertTrue(exception.getMessage().contains(expectedMessage)); + } } } diff --git a/processing/src/test/java/org/apache/druid/java/util/common/parsers/CloseableIteratorTest.java b/processing/src/test/java/org/apache/druid/java/util/common/parsers/CloseableIteratorTest.java index be2d1d58bd5c..3f701ee92f6f 100644 --- a/processing/src/test/java/org/apache/druid/java/util/common/parsers/CloseableIteratorTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/common/parsers/CloseableIteratorTest.java @@ -120,6 +120,47 @@ public void testFlatMapClosedEarly() throws IOException } } + @Test + public void testFlatMapInnerClose() throws IOException + { + List> innerIterators = new ArrayList<>(); + // the nested iterators is : [ [], [0], [0, 1] ] + try (final CloseTrackingCloseableIterator actual = new CloseTrackingCloseableIterator<>( + generateTestIterator(3) + .flatMap(list -> { + CloseTrackingCloseableIterator inner = + new CloseTrackingCloseableIterator<>(CloseableIterators.withEmptyBaggage(list.iterator())); + innerIterators.add(inner); + return inner; + }) + )) { + final Iterator expected = IntStream + .range(0, 3) + .flatMap(i -> IntStream.range(0, i)) + .iterator(); + + int iterCount = 0, innerIteratorIdx = 0; + while (actual.hasNext()) { + iterCount++; + if (iterCount == 1) { + Assert.assertEquals(2, innerIterators.size()); //empty iterator and single element iterator + innerIteratorIdx++; + } else if (iterCount == 2) { + Assert.assertEquals(3, innerIterators.size()); //empty iterator + single element iterator + double element iterator + innerIteratorIdx++; + } + Assert.assertEquals(expected.next(), actual.next()); // assert expected value to the iterator's value + for (int i = 0; i < innerIteratorIdx; i++) { + Assert.assertEquals(1, innerIterators.get(i).closeCount); // expect all previous iterators to be closed + } + // never expect the current iterator to be closed, even after doing the last next call on it + Assert.assertEquals(0, innerIterators.get(innerIteratorIdx).closeCount); + } + } + // check the last inner iterator is closed + Assert.assertEquals(1, innerIterators.get(2).closeCount); + } + private static CloseableIterator> generateTestIterator(int numIterates) { return new CloseableIterator>()