diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/InputEntityIteratingReader.java b/processing/src/main/java/org/apache/druid/data/input/impl/InputEntityIteratingReader.java index ca67388b1a26..a0479c181e9c 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/InputEntityIteratingReader.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/InputEntityIteratingReader.java @@ -34,6 +34,7 @@ import java.io.File; import java.io.IOException; import java.util.function.Function; +import java.util.stream.Collectors; /** * InputSourceReader iterating multiple {@link InputEntity}s. This class could be used for @@ -86,9 +87,17 @@ public CloseableIterator sample() { return createIterator(entity -> { // InputEntityReader is stateful and so a new one should be created per entity. + final Function systemFieldDecorator = systemFieldDecoratorFactory.decorator(entity); try { final InputEntityReader reader = inputFormat.createReader(inputRowSchema, entity, temporaryDirectory); - return reader.sample(); + return reader.sample() + .map(i -> InputRowListPlusRawValues.ofList(i.getRawValuesList(), + i.getInputRows() == null + ? null + : i.getInputRows().stream().map( + systemFieldDecorator).collect(Collectors.toList()), + i.getParseException() + )); } catch (IOException e) { throw new RuntimeException(e); diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java index 70b75d23955f..744c29dba2a1 100644 --- a/processing/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java +++ b/processing/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java @@ -23,9 +23,12 @@ import com.google.common.collect.Iterables; import org.apache.druid.data.input.ColumnsFilter; import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowListPlusRawValues; import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.InputStats; +import org.apache.druid.data.input.impl.systemfield.SystemField; import org.apache.druid.data.input.impl.systemfield.SystemFieldDecoratorFactory; +import org.apache.druid.data.input.impl.systemfield.SystemFields; import org.apache.druid.java.util.common.CloseableIterators; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.StringUtils; @@ -44,6 +47,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.util.ArrayList; +import java.util.EnumSet; import java.util.List; public class InputEntityIteratingReaderTest extends InitializedNullHandlingTest @@ -110,6 +114,76 @@ public void test() throws IOException } } + @Test + public void testSampleWithSystemFields() throws IOException + { + final int numFiles = 5; + final List files = new ArrayList<>(); + for (int i = 0; i < numFiles; i++) { + final File file = temporaryFolder.newFile("test_" + i); + files.add(file); + try (Writer writer = Files.newBufferedWriter(file.toPath(), StandardCharsets.UTF_8)) { + writer.write(StringUtils.format("%d,%s,%d\n", 20190101 + i, "name_" + i, i)); + writer.write(StringUtils.format("%d,%s,%d", 20190102 + i, "name_" + (i + 1), i + 1)); + } + } + + LocalInputSource inputSource = new LocalInputSource( + temporaryFolder.getRoot(), + "test_*", + null, + new SystemFields(EnumSet.of(SystemField.URI, SystemField.PATH))); + final InputEntityIteratingReader reader = new InputEntityIteratingReader( + new InputRowSchema( + new TimestampSpec("time", "yyyyMMdd", null), + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas(ImmutableList.of( + "time", + "name", + "score", + SystemField.URI.getFieldName(), + SystemField.PATH.getFieldName() + )) + ), + ColumnsFilter.all() + ), + new CsvInputFormat( + ImmutableList.of("time", "name", "score"), + null, + null, + false, + 0 + ), + CloseableIterators.withEmptyBaggage( + files.stream().flatMap(file -> ImmutableList.of(new FileEntity(file)).stream()).iterator() + ), + SystemFieldDecoratorFactory.fromInputSource(inputSource), + temporaryFolder.newFolder() + ); + + try (CloseableIterator iterator = reader.sample()) { + int i = 0; + while (iterator.hasNext()) { + InputRow row = iterator.next().getInputRows().get(0); + Assert.assertEquals(DateTimes.of(StringUtils.format("2019-01-%02d", i + 1)), row.getTimestamp()); + Assert.assertEquals(StringUtils.format("name_%d", i), Iterables.getOnlyElement(row.getDimension("name"))); + Assert.assertEquals(Integer.toString(i), Iterables.getOnlyElement(row.getDimension("score"))); + Assert.assertEquals(files.get(i).toURI().toString(), row.getDimension(SystemField.URI.getFieldName()).get(0)); + Assert.assertEquals(files.get(i).getAbsolutePath(), row.getDimension(SystemField.PATH.getFieldName()).get(0)); + + Assert.assertTrue(iterator.hasNext()); + row = iterator.next().getInputRows().get(0); + Assert.assertEquals(DateTimes.of(StringUtils.format("2019-01-%02d", i + 2)), row.getTimestamp()); + Assert.assertEquals(StringUtils.format("name_%d", i + 1), Iterables.getOnlyElement(row.getDimension("name"))); + Assert.assertEquals(Integer.toString(i + 1), Iterables.getOnlyElement(row.getDimension("score"))); + Assert.assertEquals(files.get(i).toURI().toString(), row.getDimension(SystemField.URI.getFieldName()).get(0)); + Assert.assertEquals(files.get(i).getAbsolutePath(), row.getDimension(SystemField.PATH.getFieldName()).get(0)); + i++; + } + Assert.assertEquals(numFiles, i); + } + } + @Test public void testIncorrectURI() throws IOException, URISyntaxException {