Fix tests and coverage.
gianm committed Oct 30, 2023
1 parent d5b9e6e · commit 5cbb28a
Showing 7 changed files with 232 additions and 136 deletions.
2 changes: 1 addition & 1 deletion docs/ingestion/input-sources.md
@@ -465,7 +465,7 @@ Sample specs:
|--------|-----------|-------|---------|
|type|Set the value to `hdfs`.|None|yes|
|paths|HDFS paths. Can be either a JSON array or comma-separated string of paths. Wildcards like `*` are supported in these paths. Empty files located under one of the given paths will be skipped.|None|yes|
|systemFields|JSON array of system fields to return as part of input rows. Possible values: `__file_uri` (HDFS URI starting with `hdfs://`) and `__file_path` (HDFS path).|None|no|
|systemFields|JSON array of system fields to return as part of input rows. Possible values: `__file_uri` (URI) and `__file_path` (path component of URI).|None|no|

You can also ingest from other storage using the HDFS input source if the HDFS client supports that storage.
However, if you want to ingest from cloud storage, consider using the service-specific input source for your data storage.
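To make the revised `systemFields` wording concrete, here is a minimal sketch of how the two values relate; the namenode address and file path are hypothetical:

import java.net.URI;

public class SystemFieldValues
{
  public static void main(String[] args)
  {
    // A fully qualified HDFS URI for an input file.
    final URI fileUri = URI.create("hdfs://namenode:8020/druid/data/wikipedia.json");

    // __file_uri carries the full URI; __file_path carries only its path component.
    System.out.println(fileUri);            // hdfs://namenode:8020/druid/data/wikipedia.json
    System.out.println(fileUri.getPath());  // /druid/data/wikipedia.json
  }
}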
242 changes: 124 additions & 118 deletions extensions-core/hdfs-storage/pom.xml
@@ -18,7 +18,8 @@
~ under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.apache.druid.extensions</groupId>
@@ -41,147 +42,152 @@
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>${hadoop.compile.version}</version>
<scope>runtime</scope>
<exclusions>
<exclusion>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bundle</artifactId>
</exclusion>
</exclusions>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>${hadoop.compile.version}</version>
<scope>runtime</scope>
<exclusions>
<exclusion>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bundle</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-multibindings</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>jakarta.validation</groupId>
<artifactId>jakarta.validation-api</artifactId>
<scope>provided</scope>
</dependency>

<!-- Tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<scope>test</scope>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-indexing-hadoop</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<profiles>
<profile>
<id>hadoop3</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-api</artifactId>
<version>${hadoop.compile.version}</version>
<scope>compile</scope>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-multibindings</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>jakarta.validation</groupId>
<artifactId>jakarta.validation-api</artifactId>
<scope>provided</scope>
</dependency>

<!-- Tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-runtime</artifactId>
<version>${hadoop.compile.version}</version>
<scope>runtime</scope>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-minicluster</artifactId>
<version>${hadoop.compile.version}</version>
<scope>test</scope>
<groupId>org.apache.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<version>${aws.sdk.version}</version>
<scope>runtime</scope>
<groupId>org.apache.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>test</scope>
<groupId>org.apache.druid</groupId>
<artifactId>druid-indexing-hadoop</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<scope>provided</scope>
<groupId>nl.jqno.equalsverifier</groupId>
<artifactId>equalsverifier</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</profile>
</profiles>
</dependencies>
<profiles>
<profile>
<id>hadoop3</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-api</artifactId>
<version>${hadoop.compile.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-runtime</artifactId>
<version>${hadoop.compile.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-minicluster</artifactId>
<version>${hadoop.compile.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<version>${aws.sdk.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</profile>
</profiles>
</project>
extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputEntity.java
@@ -20,6 +20,7 @@
package org.apache.druid.inputsource.hdfs;

import com.google.common.base.Predicate;
import com.google.common.base.Suppliers;
import org.apache.druid.data.input.RetryingInputEntity;
import org.apache.druid.storage.hdfs.HdfsDataSegmentPuller;
import org.apache.hadoop.conf.Configuration;
@@ -30,22 +31,37 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.function.Supplier;

public class HdfsInputEntity extends RetryingInputEntity
{
private final Configuration conf;
private final Path path;
private final Supplier<URI> uri;

HdfsInputEntity(Configuration conf, Path path)
{
this.conf = conf;
this.path = path;
this.uri = Suppliers.memoize(() -> {
final URI uri0 = path.toUri();
if (uri0.getScheme() == null || uri0.getAuthority() == null) {
try {
return path.getFileSystem(conf).makeQualified(path).toUri();
}
catch (IOException e) {
throw new RuntimeException(e);
}
} else {
return uri0;
}
});
}

@Override
public URI getUri()
{
return path.toUri();
return uri.get();
}

@Override
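The qualification logic added above can be sketched in isolation against the local file system. This is a minimal example under the assumption that hadoop-common is on the classpath; the path is hypothetical:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class QualifiedUriSketch
{
  public static void main(String[] args) throws Exception
  {
    // A default Configuration resolves to the local file system, so no cluster is needed.
    final Configuration conf = new Configuration();
    final Path path = new Path("/tmp/wikipedia.json");

    // Without qualification, the URI has no scheme or authority.
    System.out.println(path.toUri());

    // makeQualified() fills in the scheme (and authority, if any) from the file system,
    // which is what the memoized supplier above does for getUri().
    System.out.println(path.getFileSystem(conf).makeQualified(path).toUri());
  }
}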
extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
@@ -61,8 +61,10 @@
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
@@ -327,6 +329,7 @@ static final class Builder
private Object paths;
private Configuration configuration;
private HdfsInputSourceConfig inputSourceConfig;
private SystemFields systemFields = SystemFields.none();

private Builder()
{
@@ -350,11 +353,17 @@ Builder inputSourceConfig(HdfsInputSourceConfig inputSourceConfig)
return this;
}

Builder systemFields(SystemField... systemFields)
{
this.systemFields = new SystemFields(EnumSet.copyOf(Arrays.asList(systemFields)));
return this;
}

HdfsInputSource build()
{
return new HdfsInputSource(
Preconditions.checkNotNull(paths, "paths"),
SystemFields.none(),
systemFields,
Preconditions.checkNotNull(configuration, "configuration"),
Preconditions.checkNotNull(inputSourceConfig, "inputSourceConfig")
);
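For reference, a test could exercise the new systemFields() method roughly as follows. This is only a sketch: the builder() factory and the paths() and configuration() setters are assumed to exist alongside inputSourceConfig() shown above, and SystemField.URI / SystemField.PATH are assumed to be the enum constants behind __file_uri and __file_path.

// Sketch of building an HdfsInputSource with system fields enabled in a test.
final HdfsInputSource inputSource = HdfsInputSource
    .builder()
    .paths("hdfs://namenode:8020/druid/data/*")           // hypothetical input location
    .configuration(new Configuration())
    .inputSourceConfig(new HdfsInputSourceConfig(null))    // null is assumed to keep the default allowed protocols
    .systemFields(SystemField.URI, SystemField.PATH)
    .build();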
