From 5cbb28af9e12f33253ae5203bad1da62befd73b5 Mon Sep 17 00:00:00 2001
From: Gian Merlino
Date: Mon, 30 Oct 2023 09:00:24 -0700
Subject: [PATCH] Fix tests and coverage.

---
 docs/ingestion/input-sources.md               |   2 +-
 extensions-core/hdfs-storage/pom.xml          | 242 +++++++++---------
 .../inputsource/hdfs/HdfsInputEntity.java     |  18 +-
 .../inputsource/hdfs/HdfsInputSource.java     |  11 +-
 .../inputsource/hdfs/HdfsInputSourceTest.java |  74 +++++-
 .../impl/CloudObjectInputSourceTest.java      |  19 +-
 .../data/input/impl/LocalInputSourceTest.java |   2 +-
 7 files changed, 232 insertions(+), 136 deletions(-)

diff --git a/docs/ingestion/input-sources.md b/docs/ingestion/input-sources.md
index 3981e20b5943..e10b1c72474a 100644
--- a/docs/ingestion/input-sources.md
+++ b/docs/ingestion/input-sources.md
@@ -465,7 +465,7 @@ Sample specs:
 |--------|-----------|-------|---------|
 |type|Set the value to `hdfs`.|None|yes|
 |paths|HDFS paths. Can be either a JSON array or comma-separated string of paths. Wildcards like `*` are supported in these paths. Empty files located under one of the given paths will be skipped.|None|yes|
-|systemFields|JSON array of system fields to return as part of input rows. Possible values: `__file_uri` (HDFS URI starting with `hdfs://`) and `__file_path` (HDFS path).|None|no|
+|systemFields|JSON array of system fields to return as part of input rows. Possible values: `__file_uri` (URI) and `__file_path` (path component of URI).|None|no|
 
 You can also ingest from other storage using the HDFS input source if the HDFS client supports that storage. However, if you want to ingest from cloud storage, consider using the service-specific input source for your data storage.
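For context on the table row above: a minimal `hdfs` input source spec requesting both system fields might look like the following sketch, in the style of the page's "Sample specs" (the namenode address and path glob are hypothetical):

```json
{
  "type": "hdfs",
  "paths": "hdfs://namenode:8020/foo/bar*",
  "systemFields": ["__file_uri", "__file_path"]
}
```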
diff --git a/extensions-core/hdfs-storage/pom.xml b/extensions-core/hdfs-storage/pom.xml
index 2be63de0286f..d8c133875a91 100644
--- a/extensions-core/hdfs-storage/pom.xml
+++ b/extensions-core/hdfs-storage/pom.xml
@@ -18,7 +18,8 @@
   ~ under the License.
   -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
 
     <parent>
         <groupId>org.apache.druid.extensions</groupId>
@@ -41,147 +42,152 @@
             <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-aws</artifactId>
             <version>${hadoop.compile.version}</version>
             <scope>runtime</scope>
             <exclusions>
                 <exclusion>
                     <groupId>com.amazonaws</groupId>
                     <artifactId>aws-java-sdk-bundle</artifactId>
                 </exclusion>
             </exclusions>
         </dependency>
         <dependency>
             <groupId>commons-io</groupId>
             <artifactId>commons-io</artifactId>
             <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>com.google.code.findbugs</groupId>
             <artifactId>jsr305</artifactId>
             <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-annotations</artifactId>
             <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>joda-time</groupId>
             <artifactId>joda-time</artifactId>
             <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>com.google.inject</groupId>
             <artifactId>guice</artifactId>
             <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
             <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-core</artifactId>
             <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>com.google.inject.extensions</groupId>
             <artifactId>guice-multibindings</artifactId>
             <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>commons-lang</groupId>
             <artifactId>commons-lang</artifactId>
             <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
             <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>jakarta.validation</groupId>
             <artifactId>jakarta.validation-api</artifactId>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>com.google.protobuf</groupId>
             <artifactId>protobuf-java</artifactId>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.druid</groupId>
             <artifactId>druid-server</artifactId>
             <version>${project.parent.version}</version>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.druid</groupId>
             <artifactId>druid-processing</artifactId>
             <version>${project.parent.version}</version>
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.druid</groupId>
             <artifactId>druid-indexing-hadoop</artifactId>
             <version>${project.parent.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>nl.jqno.equalsverifier</groupId>
+            <artifactId>equalsverifier</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <profiles>
         <profile>
             <id>hadoop3</id>
             <activation>
                 <activeByDefault>true</activeByDefault>
             </activation>
             <dependencies>
                 <dependency>
                     <groupId>org.apache.hadoop</groupId>
                     <artifactId>hadoop-client-api</artifactId>
                     <version>${hadoop.compile.version}</version>
                     <scope>compile</scope>
                 </dependency>
                 <dependency>
                     <groupId>org.apache.hadoop</groupId>
                     <artifactId>hadoop-client-runtime</artifactId>
                     <version>${hadoop.compile.version}</version>
                     <scope>runtime</scope>
                 </dependency>
                 <dependency>
                     <groupId>org.apache.hadoop</groupId>
                     <artifactId>hadoop-client-minicluster</artifactId>
                     <version>${hadoop.compile.version}</version>
                     <scope>test</scope>
                 </dependency>
                 <dependency>
                     <groupId>com.amazonaws</groupId>
                     <artifactId>aws-java-sdk-s3</artifactId>
                     <version>${aws.sdk.version}</version>
                     <scope>runtime</scope>
                 </dependency>
                 <dependency>
                     <groupId>log4j</groupId>
                     <artifactId>log4j</artifactId>
                     <version>1.2.17</version>
                     <scope>test</scope>
                 </dependency>
                 <dependency>
                     <groupId>org.slf4j</groupId>
                     <artifactId>slf4j-api</artifactId>
                     <scope>provided</scope>
                 </dependency>
             </dependencies>
         </profile>
     </profiles>
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputEntity.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputEntity.java
index ab533538f43d..db967aac3e8b 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputEntity.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputEntity.java
@@ -20,6 +20,7 @@
 package org.apache.druid.inputsource.hdfs;
 
 import com.google.common.base.Predicate;
+import com.google.common.base.Suppliers;
 import org.apache.druid.data.input.RetryingInputEntity;
 import org.apache.druid.storage.hdfs.HdfsDataSegmentPuller;
 import org.apache.hadoop.conf.Configuration;
@@ -30,22 +31,37 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
+import java.util.function.Supplier;
 
 public class HdfsInputEntity extends RetryingInputEntity
 {
   private final Configuration conf;
   private final Path path;
+  private final Supplier<URI> uri;
 
   HdfsInputEntity(Configuration conf, Path path)
   {
     this.conf = conf;
     this.path = path;
+    this.uri = Suppliers.memoize(() -> {
+      final URI uri0 = path.toUri();
+      if (uri0.getScheme() == null || uri0.getAuthority() == null) {
+        try {
+          return path.getFileSystem(conf).makeQualified(path).toUri();
+        }
+        catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      } else {
+        return uri0;
+      }
+    });
   }
 
   @Override
   public URI getUri()
   {
-    return path.toUri();
+    return uri.get();
   }
 
   @Override
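The `HdfsInputEntity` change above memoizes a qualified URI: `Path.toUri()` on a bare path carries no scheme or authority, so the entity now falls back to `FileSystem.makeQualified`. A small runnable sketch of the Hadoop behavior the new supplier relies on (the class name is ours; the expected outputs mirror the new test assertions further below):

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class QualifiedUriExample
{
  public static void main(String[] args) throws IOException
  {
    final Configuration conf = new Configuration();

    // Already qualified: scheme and authority are present, so the URI is used as-is.
    System.out.println(new Path("hdfs://127.0.0.1/bar").toUri());
    // -> hdfs://127.0.0.1/bar

    // Bare path: no scheme or authority, so HdfsInputEntity now asks the path's
    // FileSystem to qualify it. With a default Configuration that is the local
    // filesystem, yielding a file:// URI.
    final Path bare = new Path("/127.0.0.1/bar");
    System.out.println(bare.toUri().getScheme());
    // -> null
    System.out.println(bare.getFileSystem(conf).makeQualified(bare).toUri());
    // -> file:///127.0.0.1/bar
  }
}
```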
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
index 6c349f4d8e73..f64ead50c94e 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
@@ -61,8 +61,10 @@
 import java.io.File;
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
@@ -327,6 +329,7 @@ static final class Builder
     private Object paths;
     private Configuration configuration;
     private HdfsInputSourceConfig inputSourceConfig;
+    private SystemFields systemFields = SystemFields.none();
 
     private Builder()
     {
@@ -350,11 +353,17 @@ Builder inputSourceConfig(HdfsInputSourceConfig inputSourceConfig)
       return this;
     }
 
+    Builder systemFields(SystemField... systemFields)
+    {
+      this.systemFields = new SystemFields(EnumSet.copyOf(Arrays.asList(systemFields)));
+      return this;
+    }
+
     HdfsInputSource build()
     {
       return new HdfsInputSource(
           Preconditions.checkNotNull(paths, "paths"),
-          SystemFields.none(),
+          systemFields,
           Preconditions.checkNotNull(configuration, "configuration"),
           Preconditions.checkNotNull(inputSourceConfig, "inputSourceConfig")
       );
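To make the new builder hook concrete, here is a sketch in the spirit of the tests that follow (the class name and path glob are ours; `Builder` is package-private, so callers live in `org.apache.druid.inputsource.hdfs`, as the tests do):

```java
package org.apache.druid.inputsource.hdfs;

import org.apache.druid.data.input.impl.systemfield.SystemField;
import org.apache.hadoop.conf.Configuration;

class SystemFieldsBuilderExample
{
  static HdfsInputSource build()
  {
    // The varargs are collected into an EnumSet-backed SystemFields and handed
    // to the constructor, which previously hardcoded SystemFields.none().
    return HdfsInputSource.builder()
        .paths("/foo/bar*")  // hypothetical path glob
        .configuration(new Configuration())
        .inputSourceConfig(new HdfsInputSourceConfig(null))
        .systemFields(SystemField.URI, SystemField.PATH)
        .build();
  }
}
```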
diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
index 879e49f0c7e9..918f051d2243 100644
--- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
+++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
@@ -24,6 +24,7 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
+import nl.jqno.equalsverifier.EqualsVerifier;
 import org.apache.druid.data.input.ColumnsFilter;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRow;
@@ -37,6 +38,9 @@
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.InputStatsImpl;
 import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.data.input.impl.systemfield.SystemField;
+import org.apache.druid.data.input.impl.systemfield.SystemFields;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.storage.hdfs.HdfsStorageDruidModule;
 import org.apache.druid.testing.InitializedNullHandlingTest;
@@ -64,6 +68,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -169,11 +174,12 @@ public void testGetTypes()
   {
     final Configuration conf = new Configuration();
     conf.set("fs.default.name", "hdfs://localhost:7020");
-    HdfsInputSource inputSource = HdfsInputSource.builder()
-                                                 .paths("/foo/bar*")
-                                                 .configuration(conf)
-                                                 .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG)
-                                                 .build();
+    HdfsInputSource inputSource =
+        HdfsInputSource.builder()
+                       .paths("/foo/bar*")
+                       .configuration(conf)
+                       .inputSourceConfig(DEFAULT_INPUT_SOURCE_CONFIG)
+                       .build();
 
     Assert.assertEquals(Collections.singleton(HdfsInputSource.TYPE_KEY), inputSource.getTypes());
   }
@@ -220,6 +226,13 @@ public void serializesDeserializesStringPaths()
     testSerializesDeserializes(target);
   }
 
+  @Test
+  public void serializesDeserializesStringPathsWithSystemFields()
+  {
+    Wrapper target = new Wrapper(hdfsInputSourceBuilder.paths(PATH).systemFields(SystemField.URI));
+    testSerializesDeserializes(target);
+  }
+
   private static void testSerializesDeserializes(Wrapper hdfsInputSourceWrapper)
   {
     try {
@@ -440,4 +453,55 @@ public void hasCorrectNumberOfSplits() throws IOException
       Assert.assertEquals(0, numSplits);
     }
   }
+
+  public static class SystemFieldsTest
+  {
+    @Test
+    public void testSystemFields()
+    {
+      final Configuration configuration = new Configuration();
+      final HdfsInputSource inputSource = new HdfsInputSource(
+          "hdfs://127.0.0.1/bar",
+          new SystemFields(EnumSet.of(SystemField.URI, SystemField.PATH)),
+          configuration,
+          new HdfsInputSourceConfig(null)
+      );
+
+      Assert.assertEquals(
+          EnumSet.of(SystemField.URI, SystemField.PATH),
+          inputSource.getConfiguredSystemFields()
+      );
+
+      final HdfsInputEntity entity = new HdfsInputEntity(configuration, new Path("hdfs://127.0.0.1/bar"));
+      Assert.assertEquals("hdfs://127.0.0.1/bar", inputSource.getSystemFieldValue(entity, SystemField.URI));
+      Assert.assertEquals("/bar", inputSource.getSystemFieldValue(entity, SystemField.PATH));
+
+      final HdfsInputEntity entity2 = new HdfsInputEntity(configuration, new Path("/127.0.0.1/bar"));
+      Assert.assertEquals("file:///127.0.0.1/bar", inputSource.getSystemFieldValue(entity2, SystemField.URI));
+      Assert.assertEquals("/127.0.0.1/bar", inputSource.getSystemFieldValue(entity2, SystemField.PATH));
+
+      final HdfsInputEntity entity3 = new HdfsInputEntity(configuration, new Path("bar"));
+      Assert.assertEquals(
+          StringUtils.format("file:%s/bar", System.getProperty("user.dir")),
+          inputSource.getSystemFieldValue(entity3, SystemField.URI)
+      );
+      Assert.assertEquals(
+          StringUtils.format("%s/bar", System.getProperty("user.dir")),
+          inputSource.getSystemFieldValue(entity3, SystemField.PATH)
+      );
+    }
+  }
+
+  public static class EqualsTest
+  {
+    @Test
+    public void testEquals()
+    {
+      EqualsVerifier.forClass(HdfsInputSource.class)
+                    .usingGetClass()
+                    .withIgnoredFields("cachedPaths")
+                    .withPrefabValues(Configuration.class, new Configuration(), new Configuration())
+                    .verify();
+    }
+  }
 }
diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/CloudObjectInputSourceTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/CloudObjectInputSourceTest.java
index fc7ea2bbfa66..0a582b9a8d91 100644
--- a/processing/src/test/java/org/apache/druid/data/input/impl/CloudObjectInputSourceTest.java
+++ b/processing/src/test/java/org/apache/druid/data/input/impl/CloudObjectInputSourceTest.java
@@ -23,6 +23,7 @@
 import org.apache.druid.data.input.InputEntity;
 import org.apache.druid.data.input.InputSplit;
 import org.apache.druid.data.input.MaxSizeSplitHintSpec;
+import org.apache.druid.data.input.impl.systemfield.SystemFields;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -75,7 +76,7 @@ public class CloudObjectInputSourceTest
   public void testGetUris()
   {
     CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
-        .useConstructor(SCHEME, URIS, null, null, null)
+        .useConstructor(SCHEME, URIS, null, null, null, SystemFields.none())
         .defaultAnswer(Mockito.CALLS_REAL_METHODS)
     );
 
@@ -89,7 +90,7 @@ public void testGetUris()
   public void testGetPrefixes()
   {
     CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
-        .useConstructor(SCHEME, null, PREFIXES, null, null)
+        .useConstructor(SCHEME, null, PREFIXES, null, null, SystemFields.none())
         .defaultAnswer(Mockito.CALLS_REAL_METHODS)
     );
 
@@ -103,7 +104,7 @@ public void testGetObjectGlob()
   {
     CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
-        .useConstructor(SCHEME, URIS, null, null, "**.parquet")
+        .useConstructor(SCHEME, URIS, null, null, "**.parquet", SystemFields.none())
         .defaultAnswer(Mockito.CALLS_REAL_METHODS)
     );
 
@@ -114,12 +115,12 @@ public void testInequality()
   {
     CloudObjectInputSource inputSource1 = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
-        .useConstructor(SCHEME, URIS, null, null, "**.parquet")
+        .useConstructor(SCHEME, URIS, null, null, "**.parquet", SystemFields.none())
         .defaultAnswer(Mockito.CALLS_REAL_METHODS)
     );
 
     CloudObjectInputSource inputSource2 = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
-        .useConstructor(SCHEME, URIS, null, null, "**.csv")
+        .useConstructor(SCHEME, URIS, null, null, "**.csv", SystemFields.none())
         .defaultAnswer(Mockito.CALLS_REAL_METHODS)
     );
 
@@ -132,7 +133,7 @@ public void testWithUrisFilter()
   {
     CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
-        .useConstructor(SCHEME, URIS2, null, null, "**.csv")
+        .useConstructor(SCHEME, URIS2, null, null, "**.csv", SystemFields.none())
         .defaultAnswer(Mockito.CALLS_REAL_METHODS)
     );
     Mockito.when(inputSource.getSplitWidget()).thenReturn(new MockSplitWidget());
@@ -158,7 +159,7 @@ public void testWithUris()
   {
     CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
-        .useConstructor(SCHEME, URIS, null, null, null)
+        .useConstructor(SCHEME, URIS, null, null, null, SystemFields.none())
         .defaultAnswer(Mockito.CALLS_REAL_METHODS)
     );
     Mockito.when(inputSource.getSplitWidget()).thenReturn(new MockSplitWidget());
@@ -184,7 +185,7 @@ public void testWithObjectsFilter()
   {
     CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
-        .useConstructor(SCHEME, null, null, OBJECTS_BEFORE_GLOB, "**.csv")
+        .useConstructor(SCHEME, null, null, OBJECTS_BEFORE_GLOB, "**.csv", SystemFields.none())
         .defaultAnswer(Mockito.CALLS_REAL_METHODS)
     );
     Mockito.when(inputSource.getSplitWidget()).thenReturn(new MockSplitWidget());
@@ -210,7 +211,7 @@ public void testWithObjects()
   {
     CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
-        .useConstructor(SCHEME, null, null, OBJECTS, null)
+        .useConstructor(SCHEME, null, null, OBJECTS, null, SystemFields.none())
         .defaultAnswer(Mockito.CALLS_REAL_METHODS)
     );
     Mockito.when(inputSource.getSplitWidget()).thenReturn(new MockSplitWidget());
diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceTest.java
index 9958cd5174b3..ff742ac057a3 100644
--- a/processing/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceTest.java
+++ b/processing/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceTest.java
@@ -120,7 +120,7 @@ public void testGetTypes()
   @Test
   public void testSystemFields()
   {
-    final LocalInputSource inputSource = (LocalInputSource) new LocalInputSource(
+    final LocalInputSource inputSource = new LocalInputSource(
         null,
         null,
         ImmutableList.of(