diff --git a/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/data/input/aliyun/OssEntity.java b/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/data/input/aliyun/OssEntity.java
index 3f501f212f7b5..0e9aafa52ec69 100644
--- a/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/data/input/aliyun/OssEntity.java
+++ b/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/data/input/aliyun/OssEntity.java
@@ -85,4 +85,9 @@ public Predicate<Throwable> getRetryCondition()
   {
     return OssUtils.RETRYABLE;
   }
+
+  CloudObjectLocation getObject()
+  {
+    return object;
+  }
 }
diff --git a/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/data/input/aliyun/OssInputSource.java b/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/data/input/aliyun/OssInputSource.java
index 5c9d498279657..dab5fa9da3b47 100644
--- a/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/data/input/aliyun/OssInputSource.java
+++ b/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/data/input/aliyun/OssInputSource.java
@@ -20,7 +20,6 @@
 package org.apache.druid.data.input.aliyun;
 
 import com.aliyun.oss.OSS;
-import com.aliyun.oss.model.OSSObjectSummary;
 import com.aliyun.oss.model.ObjectMetadata;
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -31,13 +30,15 @@
 import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterators;
-import org.apache.commons.lang.StringUtils;
 import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputSource;
 import org.apache.druid.data.input.InputSplit;
 import org.apache.druid.data.input.impl.CloudObjectInputSource;
 import org.apache.druid.data.input.impl.CloudObjectLocation;
 import org.apache.druid.data.input.impl.CloudObjectSplitWidget;
 import org.apache.druid.data.input.impl.SplittableInputSource;
+import org.apache.druid.data.input.impl.systemfield.SystemField;
+import org.apache.druid.data.input.impl.systemfield.SystemFields;
 import org.apache.druid.storage.aliyun.OssInputDataConfig;
 import org.apache.druid.storage.aliyun.OssStorageDruidModule;
 import org.apache.druid.storage.aliyun.OssUtils;
@@ -45,9 +46,6 @@
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.net.URI;
-import java.nio.file.FileSystems;
-import java.nio.file.PathMatcher;
-import java.nio.file.Paths;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
@@ -80,10 +78,11 @@ public OssInputSource(
       @JsonProperty("prefixes") @Nullable List<URI> prefixes,
       @JsonProperty("objects") @Nullable List<CloudObjectLocation> objects,
       @JsonProperty("objectGlob") @Nullable String objectGlob,
+      @JsonProperty(SYSTEM_FIELDS_PROPERTY) @Nullable SystemFields systemFields,
       @JsonProperty("properties") @Nullable OssClientConfig inputSourceConfig
   )
   {
-    super(OssStorageDruidModule.SCHEME, uris, prefixes, objects, objectGlob);
+    super(OssStorageDruidModule.SCHEME, uris, prefixes, objects, objectGlob, systemFields);
     this.inputDataConfig = Preconditions.checkNotNull(inputDataConfig, "inputDataConfig");
     Preconditions.checkNotNull(client, "client");
     this.inputSourceConfig = inputSourceConfig;
@@ -164,10 +163,28 @@ public SplittableInputSource<List<CloudObjectLocation>> withSplit(InputSplit
-              m.matches(Paths.get(object.getKey()))
-          );
-        }
-
-        return iterator;
-      };
-  }
 }
diff --git a/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/data/input/aliyun/OssInputSourceTest.java b/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/data/input/aliyun/OssInputSourceTest.java
index aea40ec6ef009..207cddf90b4bd 100644
--- a/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/data/input/aliyun/OssInputSourceTest.java
+++ b/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/data/input/aliyun/OssInputSourceTest.java
@@ -41,6 +41,7 @@
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Provides;
+import nl.jqno.equalsverifier.EqualsVerifier;
 import org.apache.druid.data.input.ColumnsFilter;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowSchema;
@@ -54,6 +55,8 @@
 import org.apache.druid.data.input.impl.InputStatsImpl;
 import org.apache.druid.data.input.impl.JsonInputFormat;
 import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.data.input.impl.systemfield.SystemField;
+import org.apache.druid.data.input.impl.systemfield.SystemFields;
 import org.apache.druid.initialization.DruidModule;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.HumanReadableBytes;
@@ -83,6 +86,7 @@
 import java.net.URI;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -153,10 +157,33 @@ public void testSerdeWithUris() throws Exception
         null,
         null,
         null,
+        null,
+        null
+    );
+    final OssInputSource serdeWithUris = MAPPER.readValue(MAPPER.writeValueAsString(withUris), OssInputSource.class);
+    Assert.assertEquals(withUris, serdeWithUris);
+    Assert.assertEquals(Collections.emptySet(), serdeWithUris.getConfiguredSystemFields());
+  }
+
+  @Test
+  public void testSerdeWithUrisAndSystemFields() throws Exception
+  {
+    final OssInputSource withUris = (OssInputSource) new OssInputSource(
+        OSSCLIENT,
+        INPUT_DATA_CONFIG,
+        EXPECTED_URIS,
+        null,
+        null,
+        null,
+        new SystemFields(EnumSet.of(SystemField.URI, SystemField.BUCKET, SystemField.PATH)),
         null
     );
     final OssInputSource serdeWithUris = MAPPER.readValue(MAPPER.writeValueAsString(withUris), OssInputSource.class);
     Assert.assertEquals(withUris, serdeWithUris);
+    Assert.assertEquals(
+        EnumSet.of(SystemField.URI, SystemField.BUCKET, SystemField.PATH),
+        serdeWithUris.getConfiguredSystemFields()
+    );
   }
 
   @Test
@@ -169,6 +196,7 @@ public void testSerdeWithPrefixes() throws Exception
         PREFIXES,
         null,
         null,
+        null,
         null
     );
     final OssInputSource serdeWithPrefixes =
@@ -186,6 +214,7 @@ public void testSerdeWithObjects() throws Exception
         null,
         EXPECTED_LOCATION,
         null,
+        null,
         null
     );
     final OssInputSource serdeWithPrefixes =
@@ -210,6 +239,7 @@ public void testInputSourceUseDefaultPasswordWhenCloudConfigPropertiesWithoutCre
         null,
         EXPECTED_LOCATION,
         null,
+        null,
         mockConfigPropertiesWithoutKeyAndSecret
     );
     Assert.assertNotNull(withPrefixes);
@@ -230,6 +260,7 @@ public void testSerdeOssClientLazyInitializedWithCrediential() throws Exception
         null,
         EXPECTED_LOCATION,
         null,
+        null,
         CLOUD_CONFIG_PROPERTIES
     );
     final OssInputSource serdeWithPrefixes =
@@ -250,6 +281,7 @@ public void testSerdeOssClientLazyInitializedWithoutCrediential() throws Exception
         null,
         EXPECTED_LOCATION,
         null,
+        null,
         null
     );
     final OssInputSource serdeWithPrefixes =
@@ -268,6 +300,7 @@ public void testSerdeWithExtraEmptyLists() throws Exception
         ImmutableList.of(),
         EXPECTED_LOCATION,
         null,
+        null,
         null
     );
     final OssInputSource serdeWithPrefixes =
@@ -287,6 +320,7 @@ public void testSerdeWithInvalidArgs()
         PREFIXES,
         EXPECTED_LOCATION,
         null,
+        null,
         null
     );
   }
@@ -303,6 +337,7 @@ public void testSerdeWithOtherInvalidArgs()
         PREFIXES,
         ImmutableList.of(),
         null,
+        null,
         null
     );
   }
@@ -319,6 +354,7 @@ public void testSerdeWithOtherOtherInvalidArgs()
         PREFIXES,
         EXPECTED_LOCATION,
         null,
+        null,
         null
     );
   }
@@ -340,6 +376,7 @@ public void testWithUrisSplit()
         null,
         null,
         null,
+        null,
         null
     );
@@ -367,6 +404,7 @@ public void testWithPrefixesSplit()
         PREFIXES,
         null,
         null,
+        null,
         null
     );
@@ -394,6 +432,7 @@ public void testCreateSplitsWithSplitHintSpecRespectingHint()
         PREFIXES,
         null,
         null,
+        null,
         null
     );
@@ -424,6 +463,7 @@ public void testCreateSplitsWithEmptyObjectsIteratingOnlyNonEmptyObjects()
         PREFIXES,
         null,
         null,
+        null,
         null
     );
@@ -453,6 +493,7 @@ public void testAccessDeniedWhileListingPrefix()
         ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)),
         null,
         null,
+        null,
         null
     );
@@ -484,6 +525,7 @@ public void testReader() throws IOException
         ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)),
         null,
         null,
+        null,
         null
     );
@@ -530,6 +572,7 @@ public void testCompressedReader() throws IOException
         ImmutableList.of(PREFIXES.get(0), EXPECTED_COMPRESSED_URIS.get(1)),
         null,
         null,
+        null,
         null
     );
@@ -569,12 +612,48 @@ public void testGetTypes()
         ImmutableList.of(PREFIXES.get(0), EXPECTED_COMPRESSED_URIS.get(1)),
         null,
         null,
+        null,
         null
     );
 
     Assert.assertEquals(ImmutableSet.of(OssStorageDruidModule.SCHEME), inputSource.getTypes());
   }
 
+  @Test
+  public void testSystemFields()
+  {
+    OssInputSource inputSource = new OssInputSource(
+        OSSCLIENT,
+        INPUT_DATA_CONFIG,
+        null,
+        ImmutableList.of(PREFIXES.get(0), EXPECTED_COMPRESSED_URIS.get(1)),
+        null,
+        null,
+        new SystemFields(EnumSet.of(SystemField.URI, SystemField.BUCKET, SystemField.PATH)),
+        null
+    );
+
+    Assert.assertEquals(
+        EnumSet.of(SystemField.URI, SystemField.BUCKET, SystemField.PATH),
+        inputSource.getConfiguredSystemFields()
+    );
+
+    final OssEntity entity = new OssEntity(null, new CloudObjectLocation("foo", "bar"));
+
+    Assert.assertEquals("oss://foo/bar", inputSource.getSystemFieldValue(entity, SystemField.URI));
+    Assert.assertEquals("foo", inputSource.getSystemFieldValue(entity, SystemField.BUCKET));
+    Assert.assertEquals("bar", inputSource.getSystemFieldValue(entity, SystemField.PATH));
+  }
+
+  @Test
+  public void testEquals()
+  {
+    EqualsVerifier.forClass(OssInputSource.class)
+                  .usingGetClass()
+                  .withIgnoredFields("clientSupplier", "inputDataConfig")
+                  .verify();
+  }
+
   private static void expectListObjects(URI prefix, List<URI> uris, byte[] content)
   {
     final ObjectListing result = new ObjectListing();
diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureEntity.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureEntity.java
index ebf453ae3f4f7..fc04b7b710ff6 100644
--- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureEntity.java
+++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureEntity.java
@@ -75,4 +75,9 @@ protected String getPath()
   {
     return location.getPath();
   }
+
+  CloudObjectLocation getLocation()
+  {
+    return location;
+  }
 }
diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java
index 6d0e60fe873ba..1732f8f60fc7e 100644
--- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java
+++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java
@@ -27,11 +27,15 @@
 import com.google.common.collect.Iterators;
 import com.microsoft.azure.storage.StorageException;
 import com.microsoft.azure.storage.blob.CloudBlob;
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputSource;
 import org.apache.druid.data.input.InputSplit;
 import org.apache.druid.data.input.impl.CloudObjectInputSource;
 import org.apache.druid.data.input.impl.CloudObjectLocation;
 import org.apache.druid.data.input.impl.CloudObjectSplitWidget;
 import org.apache.druid.data.input.impl.SplittableInputSource;
+import org.apache.druid.data.input.impl.systemfield.SystemField;
+import org.apache.druid.data.input.impl.systemfield.SystemFields;
 import org.apache.druid.storage.azure.AzureCloudBlobIterableFactory;
 import org.apache.druid.storage.azure.AzureInputDataConfig;
 import org.apache.druid.storage.azure.AzureStorage;
@@ -68,10 +72,11 @@ public AzureInputSource(
       @JsonProperty("uris") @Nullable List<URI> uris,
       @JsonProperty("prefixes") @Nullable List<URI> prefixes,
       @JsonProperty("objects") @Nullable List<CloudObjectLocation> objects,
-      @JsonProperty("objectGlob") @Nullable String objectGlob
+      @JsonProperty("objectGlob") @Nullable String objectGlob,
+      @JsonProperty(SYSTEM_FIELDS_PROPERTY) @Nullable SystemFields systemFields
   )
   {
-    super(SCHEME, uris, prefixes, objects, objectGlob);
+    super(SCHEME, uris, prefixes, objects, objectGlob, systemFields);
     this.storage = Preconditions.checkNotNull(storage, "AzureStorage");
     this.entityFactory = Preconditions.checkNotNull(entityFactory, "AzureEntityFactory");
     this.azureCloudBlobIterableFactory = Preconditions.checkNotNull(
@@ -100,10 +105,28 @@ public SplittableInputSource<List<CloudObjectLocation>> withSplit(InputSplit
 private static final List EXPECTED_URIS;
 private static final List EXPECTED_PREFIXES;
 private static final List EXPECTED_CLOUD_OBJECTS;
@@ -108,6 +117,23 @@ public void test_uriSerde_constructsProperAzureInputSource() throws Exception
   }
 
+  @Test
+  public void test_uriAndSystemFieldsSerde_constructsProperAzureInputSource() throws Exception
+  {
+    final InjectableValues.Std injectableValues = initInjectableValues();
+    final ObjectMapper objectMapper = new DefaultObjectMapper()
+        .registerModules(new AzureStorageDruidModule().getJacksonModules());
+    objectMapper.setInjectableValues(injectableValues);
+
+    final AzureInputSource inputSource = objectMapper.readValue(JSON_WITH_URIS_AND_SYSFIELDS, AzureInputSource.class);
+    Assert.assertEquals(Collections.singleton(SystemField.URI), inputSource.getConfiguredSystemFields());
+
+    final AzureInputSource roundTripInputSource = objectMapper.readValue(
+        objectMapper.writeValueAsBytes(inputSource),
+        AzureInputSource.class);
+    Assert.assertEquals(Collections.singleton(SystemField.URI), roundTripInputSource.getConfiguredSystemFields());
+  }
+
   @Test
   public void test_prefixSerde_constructsProperAzureInputSource() throws Exception
   {
diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java
index 655e1f342a559..51518855cfc69 100644
--- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java
+++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java
@@ -29,6 +29,8 @@
 import org.apache.druid.data.input.impl.CloudObjectLocation;
 import org.apache.druid.data.input.impl.JsonInputFormat;
 import org.apache.druid.data.input.impl.SplittableInputSource;
+import org.apache.druid.data.input.impl.systemfield.SystemField;
+import org.apache.druid.data.input.impl.systemfield.SystemFields;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
 import org.apache.druid.storage.azure.AzureCloudBlobIterable;
@@ -47,6 +49,7 @@
 import java.nio.file.FileSystems;
 import java.nio.file.PathMatcher;
 import java.nio.file.Paths;
+import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -119,6 +122,7 @@ public void test_constructor_emptyUrisEmptyPrefixesEmptyObjects_throwsIllegalArg
         EMPTY_URIS,
         EMPTY_PREFIXES,
         EMPTY_OBJECTS,
+        null,
         null
     );
   }
@@ -139,6 +143,7 @@ public void test_createEntity_returnsExpectedEntity()
         EMPTY_URIS,
         EMPTY_PREFIXES,
         objects,
+        null,
         null
     );
@@ -172,6 +177,7 @@ public void test_createSplits_successfullyCreatesCloudLocation_returnsExpectedLo
         EMPTY_URIS,
         prefixes,
         EMPTY_OBJECTS,
+        null,
         null
     );
@@ -221,7 +227,8 @@ public void test_getPrefixesSplitStream_withObjectGlob_successfullyCreatesCloudL
         EMPTY_URIS,
         prefixes,
         EMPTY_OBJECTS,
-        objectGlob
+        objectGlob,
+        null
     );
 
     Stream<InputSplit<List<CloudObjectLocation>>> cloudObjectStream = azureInputSource.createSplits(
@@ -250,6 +257,7 @@ public void test_withSplit_constructsExpectedInputSource()
         EMPTY_URIS,
         prefixes,
         EMPTY_OBJECTS,
+        null,
         null
     );
@@ -270,11 +278,44 @@ public void test_toString_returnsExpectedString()
         EMPTY_URIS,
         prefixes,
         EMPTY_OBJECTS,
+        null,
         null
     );
 
     String actualToString = azureInputSource.toString();
-    Assert.assertEquals("AzureInputSource{uris=[], prefixes=[azure://container/blob], objects=[], objectGlob=null}", actualToString);
+    Assert.assertEquals(
+        "AzureInputSource{uris=[], prefixes=[azure://container/blob], objects=[], objectGlob=null}",
+        actualToString
+    );
+  }
+
+  @Test
+  public void test_toString_withAllSystemFields_returnsExpectedString()
+  {
+    List<URI> prefixes = ImmutableList.of(PREFIX_URI);
+    azureInputSource = new AzureInputSource(
+        storage,
+        entityFactory,
+        azureCloudBlobIterableFactory,
+        inputDataConfig,
+        EMPTY_URIS,
+        prefixes,
+        EMPTY_OBJECTS,
+        null,
+        new SystemFields(EnumSet.of(SystemField.URI, SystemField.BUCKET, SystemField.PATH))
+    );
+
+    String actualToString = azureInputSource.toString();
+    Assert.assertEquals(
+        "AzureInputSource{"
+        + "uris=[], "
+        + "prefixes=[azure://container/blob], "
+        + "objects=[], "
+        + "objectGlob=null, "
+        + "systemFields=[__file_uri, __file_bucket, __file_path]"
+        + "}",
+        actualToString
+    );
   }
 
   @Test
@@ -289,11 +330,42 @@ public void test_getTypes_returnsExpectedTypes()
         EMPTY_URIS,
         prefixes,
         EMPTY_OBJECTS,
+        null,
         null
     );
 
     Assert.assertEquals(ImmutableSet.of(AzureInputSource.SCHEME), azureInputSource.getTypes());
   }
 
+  @Test
+  public void test_systemFields()
+  {
+    azureInputSource = (AzureInputSource) new AzureInputSource(
+        storage,
+        entityFactory,
+        azureCloudBlobIterableFactory,
+        inputDataConfig,
+        EMPTY_URIS,
+        ImmutableList.of(PREFIX_URI),
+        EMPTY_OBJECTS,
+        null,
+        new SystemFields(EnumSet.of(SystemField.URI, SystemField.BUCKET, SystemField.PATH))
+    );
+
+    Assert.assertEquals(
+        EnumSet.of(SystemField.URI, SystemField.BUCKET, SystemField.PATH),
+        azureInputSource.getConfiguredSystemFields()
+    );
+
+    final AzureEntity entity = new AzureEntity(
+        new CloudObjectLocation("foo", "bar"),
+        (containerName, blobPath) -> null
+    );
+
+    Assert.assertEquals("azure://foo/bar", azureInputSource.getSystemFieldValue(entity, SystemField.URI));
+    Assert.assertEquals("foo", azureInputSource.getSystemFieldValue(entity, SystemField.BUCKET));
+    Assert.assertEquals("bar", azureInputSource.getSystemFieldValue(entity, SystemField.PATH));
+  }
+
   @Test
   public void abidesEqualsContract()
   {
diff --git a/extensions-core/google-extensions/pom.xml b/extensions-core/google-extensions/pom.xml
index 5e35122a67086..cb3c72d1b954e 100644
--- a/extensions-core/google-extensions/pom.xml
+++ b/extensions-core/google-extensions/pom.xml
@@ -166,6 +166,11 @@
       <groupId>joda-time</groupId>
       <artifactId>joda-time</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>nl.jqno.equalsverifier</groupId>
+      <artifactId>equalsverifier</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java
index 13583a12f9c73..6c5539772ad18 100644
--- a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java
@@ -69,4 +69,9 @@ public Predicate<Throwable> getRetryCondition()
   {
     return GoogleUtils.GOOGLE_RETRY;
   }
+
+  CloudObjectLocation getLocation()
+  {
+    return location;
+  }
 }
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java
index 3e2e50d6bc752..3df8dcfae07c8 100644
--- a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java
@@ -26,11 +26,14 @@
 import com.google.api.services.storage.model.StorageObject;
 import com.google.common.collect.Iterators;
 import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputSource;
 import org.apache.druid.data.input.InputSplit;
 import org.apache.druid.data.input.impl.CloudObjectInputSource;
 import org.apache.druid.data.input.impl.CloudObjectLocation;
 import org.apache.druid.data.input.impl.CloudObjectSplitWidget;
 import org.apache.druid.data.input.impl.SplittableInputSource;
+import org.apache.druid.data.input.impl.systemfield.SystemField;
+import org.apache.druid.data.input.impl.systemfield.SystemFields;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.storage.google.GoogleInputDataConfig;
 import org.apache.druid.storage.google.GoogleStorage;
@@ -62,10 +65,11 @@ public GoogleCloudStorageInputSource(
       @JsonProperty("uris") @Nullable List<URI> uris,
       @JsonProperty("prefixes") @Nullable List<URI> prefixes,
       @JsonProperty("objects") @Nullable List<CloudObjectLocation> objects,
-      @JsonProperty("objectGlob") @Nullable String objectGlob
+      @JsonProperty("objectGlob") @Nullable String objectGlob,
+      @JsonProperty(SYSTEM_FIELDS_PROPERTY) @Nullable SystemFields systemFields
   )
   {
-    super(GoogleStorageDruidModule.SCHEME_GS, uris, prefixes, objects, objectGlob);
+    super(GoogleStorageDruidModule.SCHEME_GS, uris, prefixes, objects, objectGlob, systemFields);
     this.storage = storage;
     this.inputDataConfig = inputDataConfig;
   }
@@ -87,7 +91,32 @@ protected InputEntity createEntity(CloudObjectLocation location)
 @Override
 public SplittableInputSource<List<CloudObjectLocation>> withSplit(InputSplit<List<CloudObjectLocation>> split)
 {
-    return new GoogleCloudStorageInputSource(storage, inputDataConfig, null, null, split.get(), getObjectGlob());
+    return new GoogleCloudStorageInputSource(
+        storage,
+        inputDataConfig,
+        null,
+        null,
+        split.get(),
+        getObjectGlob(),
+        systemFields
+    );
+  }
+
+  @Override
+  public Object getSystemFieldValue(InputEntity entity, SystemField field)
+  {
+    final GoogleCloudStorageEntity googleEntity = (GoogleCloudStorageEntity) entity;
+
+    switch (field) {
+      case URI:
+        return googleEntity.getUri().toString();
+      case BUCKET:
+        return googleEntity.getLocation().getBucket();
+      case PATH:
+        return googleEntity.getLocation().getPath();
+      default:
+        return null;
+    }
   }
 
   @Override
@@ -151,6 +180,7 @@ public String toString()
            ", prefixes=" + getPrefixes() +
            ", objects=" + getObjects() +
            ", objectGlob=" + getObjectGlob() +
+           (systemFields.getFields().isEmpty() ? "" : ", systemFields=" + systemFields) +
            '}';
   }
 }
diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java
index 404fd45d4d001..556eb840ea9f0 100644
--- a/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java
+++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java
@@ -31,6 +31,7 @@
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Provides;
+import nl.jqno.equalsverifier.EqualsVerifier;
 import org.apache.druid.data.input.ColumnsFilter;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowSchema;
@@ -44,6 +45,8 @@
 import org.apache.druid.data.input.impl.InputStatsImpl;
 import org.apache.druid.data.input.impl.JsonInputFormat;
 import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.data.input.impl.systemfield.SystemField;
+import org.apache.druid.data.input.impl.systemfield.SystemFields;
 import org.apache.druid.initialization.DruidModule;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
@@ -70,6 +73,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -118,7 +122,15 @@ public void testSerde() throws Exception
   {
     final ObjectMapper mapper = createGoogleObjectMapper();
     final GoogleCloudStorageInputSource withUris =
-        new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, EXPECTED_URIS, ImmutableList.of(), null, null);
+        new GoogleCloudStorageInputSource(
+            STORAGE,
+            INPUT_DATA_CONFIG,
+            EXPECTED_URIS,
+            ImmutableList.of(),
+            null,
+            null,
+            null
+        );
     final GoogleCloudStorageInputSource serdeWithUris =
         mapper.readValue(mapper.writeValueAsString(withUris), GoogleCloudStorageInputSource.class);
     Assert.assertEquals(withUris, serdeWithUris);
@@ -129,7 +141,7 @@ public void testSerdePrefixes() throws Exception
   {
     final ObjectMapper mapper = createGoogleObjectMapper();
     final GoogleCloudStorageInputSource withPrefixes =
-        new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, ImmutableList.of(), PREFIXES, null, null);
+        new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, ImmutableList.of(), PREFIXES, null, null, null);
     final GoogleCloudStorageInputSource serdeWithPrefixes =
         mapper.readValue(mapper.writeValueAsString(withPrefixes), GoogleCloudStorageInputSource.class);
     Assert.assertEquals(withPrefixes, serdeWithPrefixes);
@@ -146,18 +158,51 @@ public void testSerdeObjects() throws Exception
             null,
             null,
             ImmutableList.of(new CloudObjectLocation("foo", "bar/file.gz")),
+            null,
             null
         );
     final GoogleCloudStorageInputSource serdeWithObjects =
         mapper.readValue(mapper.writeValueAsString(withObjects), GoogleCloudStorageInputSource.class);
     Assert.assertEquals(withObjects, serdeWithObjects);
+    Assert.assertEquals(Collections.emptySet(), serdeWithObjects.getConfiguredSystemFields());
+  }
+
+  @Test
+  public void testSerdeObjectsAndSystemFields() throws Exception
+  {
+    final ObjectMapper mapper = createGoogleObjectMapper();
+    final GoogleCloudStorageInputSource withObjects =
+        (GoogleCloudStorageInputSource) new GoogleCloudStorageInputSource(
+            STORAGE,
+            INPUT_DATA_CONFIG,
+            null,
+            null,
+            ImmutableList.of(new CloudObjectLocation("foo", "bar/file.gz")),
+            null,
+            new SystemFields(EnumSet.of(SystemField.URI, SystemField.BUCKET, SystemField.PATH))
+        );
+    final GoogleCloudStorageInputSource serdeWithObjects =
+        mapper.readValue(mapper.writeValueAsString(withObjects), GoogleCloudStorageInputSource.class);
+    Assert.assertEquals(withObjects, serdeWithObjects);
+    Assert.assertEquals(
+        EnumSet.of(SystemField.URI, SystemField.BUCKET, SystemField.PATH),
+        serdeWithObjects.getConfiguredSystemFields()
+    );
   }
 
   @Test
   public void testGetTypes()
   {
     final GoogleCloudStorageInputSource inputSource =
-        new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, EXPECTED_URIS, ImmutableList.of(), null, null);
+        new GoogleCloudStorageInputSource(
+            STORAGE,
+            INPUT_DATA_CONFIG,
+            EXPECTED_URIS,
+            ImmutableList.of(),
+            null,
+            null,
+            null
+        );
 
     Assert.assertEquals(Collections.singleton(GoogleCloudStorageInputSource.TYPE_KEY), inputSource.getTypes());
   }
@@ -179,7 +224,15 @@ public void testWithUrisSplit() throws Exception
     ).andReturn(new StorageObject().setSize(BigInteger.valueOf(CONTENT.length)));
     EasyMock.replay(STORAGE);
     GoogleCloudStorageInputSource inputSource =
-        new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, EXPECTED_URIS, ImmutableList.of(), null, null);
+        new GoogleCloudStorageInputSource(
+            STORAGE,
+            INPUT_DATA_CONFIG,
+            EXPECTED_URIS,
+            ImmutableList.of(),
+            null,
+            null,
+            null
+        );
 
     Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
         null,
@@ -212,7 +265,8 @@ public void testWithUrisGlob() throws Exception
         URIS_BEFORE_GLOB,
         null,
         null,
-        "**.csv"
+        "**.csv",
+        null
     );
 
     Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
@@ -234,7 +288,8 @@ public void testIllegalObjectsAndPrefixes()
         null,
         PREFIXES,
         EXPECTED_OBJECTS.get(0),
-        "**.csv"
+        "**.csv",
+        null
     );
   }
@@ -249,7 +304,8 @@ public void testIllegalUrisAndPrefixes()
         URIS_BEFORE_GLOB,
         PREFIXES,
         null,
-        "**.csv"
+        "**.csv",
+        null
     );
   }
@@ -265,7 +321,7 @@ public void testWithPrefixesSplit() throws IOException
     EasyMock.replay(INPUT_DATA_CONFIG);
 
     GoogleCloudStorageInputSource inputSource =
-        new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, null, PREFIXES, null, null);
+        new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, null, PREFIXES, null, null, null);
 
     Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
         new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
@@ -287,7 +343,7 @@ public void testCreateSplitsWithSplitHintSpecRespectingHint() throws IOException
     EasyMock.replay(INPUT_DATA_CONFIG);
 
     GoogleCloudStorageInputSource inputSource =
-        new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, null, PREFIXES, null, null);
+        new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, null, PREFIXES, null, null, null);
 
     Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
         new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
@@ -319,6 +375,7 @@ public void testReader() throws IOException
         null,
         PREFIXES,
         null,
+        null,
         null
     );
@@ -365,6 +422,7 @@ public void testCompressedReader() throws IOException
         null,
         PREFIXES,
         null,
+        null,
         null
     );
@@ -392,6 +450,40 @@ public void testCompressedReader() throws IOException
     Assert.assertEquals(2 * CONTENT.length, inputStats.getProcessedBytes());
   }
 
+  @Test
+  public void testSystemFields()
+  {
+    GoogleCloudStorageInputSource inputSource = new GoogleCloudStorageInputSource(
+        STORAGE,
+        INPUT_DATA_CONFIG,
+        null,
+        PREFIXES,
+        null,
+        null,
+        new SystemFields(EnumSet.of(SystemField.URI, SystemField.BUCKET, SystemField.PATH))
+    );
+
+    Assert.assertEquals(
+        EnumSet.of(SystemField.URI, SystemField.BUCKET, SystemField.PATH),
+        inputSource.getConfiguredSystemFields()
+    );
+
+    final GoogleCloudStorageEntity entity = new GoogleCloudStorageEntity(null, new CloudObjectLocation("foo", "bar"));
+
+    Assert.assertEquals("gs://foo/bar", inputSource.getSystemFieldValue(entity, SystemField.URI));
+    Assert.assertEquals("foo", inputSource.getSystemFieldValue(entity, SystemField.BUCKET));
+    Assert.assertEquals("bar", inputSource.getSystemFieldValue(entity, SystemField.PATH));
+  }
+
+  @Test
+  public void testEquals()
+  {
+    EqualsVerifier.forClass(GoogleCloudStorageInputSource.class)
+                  .withIgnoredFields("storage", "inputDataConfig")
+                  .usingGetClass()
+                  .verify();
+  }
+
   private static void addExpectedPrefixObjects(URI prefix, List<URI> uris) throws IOException
   {
     final String bucket = prefix.getAuthority();
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputEntity.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputEntity.java
index cd75d6c2a7206..ab533538f43d8 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputEntity.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputEntity.java
@@ -60,7 +60,7 @@ protected InputStream readFrom(long offset) throws IOException
 
   @Override
   protected String getPath()
   {
-    return path.getName();
+    return getUri().getPath();
   }
 
   @Override
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
index 9e76a69ec3b5b..6c349f4d8e735 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
@@ -28,6 +28,7 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
 import org.apache.druid.data.input.AbstractInputSource;
+import org.apache.druid.data.input.InputEntity;
 import org.apache.druid.data.input.InputFileAttribute;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRowSchema;
@@ -36,7 +37,12 @@
 import org.apache.druid.data.input.SplitHintSpec;
 import org.apache.druid.data.input.impl.InputEntityIteratingReader;
 import org.apache.druid.data.input.impl.SplittableInputSource;
+import org.apache.druid.data.input.impl.systemfield.SystemField;
+import org.apache.druid.data.input.impl.systemfield.SystemFieldDecoratorFactory;
+import org.apache.druid.data.input.impl.systemfield.SystemFieldInputSource;
+import org.apache.druid.data.input.impl.systemfield.SystemFields;
 import org.apache.druid.guice.Hdfs;
+import org.apache.druid.java.util.common.CloseableIterators;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.storage.hdfs.HdfsStorageDruidModule;
@@ -59,16 +65,20 @@
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-public class HdfsInputSource extends AbstractInputSource implements SplittableInputSource<List<Path>>
+public class HdfsInputSource
+    extends AbstractInputSource
+    implements SplittableInputSource<List<Path>>, SystemFieldInputSource
 {
   static final String TYPE_KEY = HdfsStorageDruidModule.SCHEME;
   private static final String PROP_PATHS = "paths";
 
   private final List<String> inputPaths;
+  private final SystemFields systemFields;
   private final Configuration configuration;
   private final HdfsInputSourceConfig inputSourceConfig;
@@ -86,11 +96,13 @@ public class HdfsInputSource
   @JsonCreator
   public HdfsInputSource(
       @JsonProperty(PROP_PATHS) Object inputPaths,
+      @JsonProperty(SYSTEM_FIELDS_PROPERTY) SystemFields systemFields,
       @JacksonInject @Hdfs Configuration configuration,
      @JacksonInject HdfsInputSourceConfig inputSourceConfig
   )
   {
     this.inputPaths = coerceInputPathsToList(inputPaths, PROP_PATHS);
+    this.systemFields = systemFields == null ? SystemFields.none() : systemFields;
     this.configuration = configuration;
     this.inputSourceConfig = inputSourceConfig;
     this.inputPaths.forEach(p -> verifyProtocol(configuration, inputSourceConfig, p));
@@ -182,6 +194,27 @@ List<String> getInputPaths()
   {
     return inputPaths;
   }
 
+  @Override
+  public Set<SystemField> getConfiguredSystemFields()
+  {
+    return systemFields.getFields();
+  }
+
+  @Override
+  public Object getSystemFieldValue(InputEntity entity, SystemField field)
+  {
+    final HdfsInputEntity hdfsEntity = (HdfsInputEntity) entity;
+
+    switch (field) {
+      case URI:
+        return hdfsEntity.getUri().toString();
+      case PATH:
+        return hdfsEntity.getPath();
+      default:
+        return null;
+    }
+  }
+
   @Override
   protected InputSourceReader formattableReader(
       InputRowSchema inputRowSchema,
@@ -198,7 +231,9 @@ protected InputSourceReader formattableReader(
     return new InputEntityIteratingReader(
         inputRowSchema,
         inputFormat,
-        Iterators.transform(cachedPaths.iterator(), path -> new HdfsInputEntity(configuration, path)),
+        CloseableIterators.withEmptyBaggage(
+            Iterators.transform(cachedPaths.iterator(), path -> new HdfsInputEntity(configuration, path))),
+        SystemFieldDecoratorFactory.fromInputSource(this),
         temporaryDirectory
     );
   }
@@ -234,7 +269,7 @@ public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec sp
   public SplittableInputSource<List<Path>> withSplit(InputSplit<List<Path>> split)
   {
     List<String> paths = split.get().stream().map(path -> path.toString()).collect(Collectors.toList());
-    return new HdfsInputSource(paths, configuration, inputSourceConfig);
+    return new HdfsInputSource(paths, systemFields, configuration, inputSourceConfig);
   }
 
   @Override
@@ -250,6 +285,37 @@ private void cachePathsIfNeeded() throws IOException
     }
   }
 
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    HdfsInputSource that = (HdfsInputSource) o;
+    return Objects.equals(inputPaths, that.inputPaths)
+           && Objects.equals(systemFields, that.systemFields)
+           && Objects.equals(configuration, that.configuration)
+           && Objects.equals(inputSourceConfig, that.inputSourceConfig);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(inputPaths, systemFields, configuration, inputSourceConfig);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "HdfsInputSource{" +
+           "inputPaths=" + inputPaths +
+           (systemFields.getFields().isEmpty() ? "" : ", systemFields=" + systemFields) +
+           '}';
+  }
+
   @VisibleForTesting
   static Builder builder()
   {
@@ -288,6 +354,7 @@ HdfsInputSource build()
     {
       return new HdfsInputSource(
           Preconditions.checkNotNull(paths, "paths"),
+          SystemFields.none(),
          Preconditions.checkNotNull(configuration, "configuration"),
           Preconditions.checkNotNull(inputSourceConfig, "inputSourceConfig")
       );
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceFactory.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceFactory.java
index d2b9da3e3440b..0276ad3ae7369 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceFactory.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceFactory.java
@@ -23,6 +23,7 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.druid.data.input.InputSourceFactory;
 import org.apache.druid.data.input.impl.SplittableInputSource;
+import org.apache.druid.data.input.impl.systemfield.SystemFields;
 import org.apache.druid.guice.Hdfs;
 import org.apache.hadoop.conf.Configuration;
@@ -46,6 +47,6 @@ public HdfsInputSourceFactory(
   @Override
   public SplittableInputSource create(List<String> inputFilePaths)
   {
-    return new HdfsInputSource(inputFilePaths, configuration, inputSourceConfig);
+    return new HdfsInputSource(inputFilePaths, SystemFields.none(), configuration, inputSourceConfig);
   }
 }
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3Entity.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3Entity.java
index f11b956f27e2c..c46f3e9044f46 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3Entity.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3Entity.java
@@ -27,7 +27,6 @@
 import org.apache.druid.data.input.RetryingInputEntity;
 import org.apache.druid.data.input.impl.CloudObjectLocation;
 import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.RetryUtils;
 import org.apache.druid.storage.s3.S3StorageDruidModule;
 import org.apache.druid.storage.s3.S3Utils;
 import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
@@ -42,15 +41,6 @@ public class S3Entity extends RetryingInputEntity
   private final CloudObjectLocation object;
   private final int maxRetries;
 
-  S3Entity(ServerSideEncryptingAmazonS3 s3Client, CloudObjectLocation coords)
-  {
-    this.s3Client = s3Client;
-    this.object = coords;
-    this.maxRetries = RetryUtils.DEFAULT_MAX_TRIES;
-  }
-
-  // this was added for testing but it might be useful in other cases (you can
-  // configure maxRetries...
   S3Entity(ServerSideEncryptingAmazonS3 s3Client, CloudObjectLocation coords, int maxRetries)
   {
     Preconditions.checkArgument(maxRetries >= 0);
@@ -104,4 +94,9 @@ public Predicate<Throwable> getRetryCondition()
   {
     return S3Utils.S3RETRY;
   }
+
+  CloudObjectLocation getObject()
+  {
+    return object;
+  }
 }
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java
index 106d33216f296..69f97c24669b0 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java
@@ -47,6 +47,8 @@
 import org.apache.druid.data.input.impl.CloudObjectLocation;
 import org.apache.druid.data.input.impl.CloudObjectSplitWidget;
 import org.apache.druid.data.input.impl.SplittableInputSource;
+import org.apache.druid.data.input.impl.systemfield.SystemField;
+import org.apache.druid.data.input.impl.systemfield.SystemFields;
 import org.apache.druid.java.util.common.RetryUtils;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.storage.s3.S3InputDataConfig;
@@ -109,13 +111,14 @@ public S3InputSource(
       @JsonProperty("prefixes") @Nullable List<URI> prefixes,
       @JsonProperty("objects") @Nullable List<CloudObjectLocation> objects,
       @JsonProperty("objectGlob") @Nullable String objectGlob,
+      @JsonProperty(SYSTEM_FIELDS_PROPERTY) @Nullable SystemFields systemFields,
       @JsonProperty("properties") @Nullable S3InputSourceConfig s3InputSourceConfig,
       @JsonProperty("proxyConfig") @Nullable AWSProxyConfig awsProxyConfig,
       @JsonProperty("endpointConfig") @Nullable AWSEndpointConfig awsEndpointConfig,
       @JsonProperty("clientConfig") @Nullable AWSClientConfig awsClientConfig
   )
   {
-    super(S3StorageDruidModule.SCHEME, uris, prefixes, objects, objectGlob);
+    super(S3StorageDruidModule.SCHEME, uris, prefixes, objects, objectGlob, systemFields);
     this.inputDataConfig = Preconditions.checkNotNull(inputDataConfig, "S3DataSegmentPusherConfig");
     Preconditions.checkNotNull(s3Client, "s3Client");
     this.s3InputSourceConfig = s3InputSourceConfig;
@@ -200,6 +203,7 @@ public S3InputSource(
         prefixes,
         objects,
         objectGlob,
+        SystemFields.none(),
         s3InputSourceConfig,
         awsProxyConfig,
         awsEndpointConfig,
@@ -216,6 +220,7 @@ public S3InputSource(
       List<URI> prefixes,
       List<CloudObjectLocation> objects,
       String objectGlob,
+      SystemFields systemFields,
       S3InputSourceConfig s3InputSourceConfig,
       AWSProxyConfig awsProxyConfig,
       AWSEndpointConfig awsEndpointConfig,
@@ -232,6 +237,7 @@ public S3InputSource(
         prefixes,
         objects,
         objectGlob,
+        systemFields,
         s3InputSourceConfig,
         awsProxyConfig,
         awsEndpointConfig,
@@ -369,6 +375,7 @@ public SplittableInputSource<List<CloudObjectLocation>> withSplit(InputSplit
 private static void expectListObjects(URI prefix, List<URI> uris, byte[] content)
 {
   final ListObjectsV2Result result = new ListObjectsV2Result();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
index 8056c69901fc5..2e548e181f405 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
@@ -46,11 +46,13 @@
 import org.apache.druid.data.input.impl.InputEntityIteratingReader;
 import org.apache.druid.data.input.impl.SplittableInputSource;
 import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.data.input.impl.systemfield.SystemFieldDecoratorFactory;
 import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.firehose.WindowedSegmentId;
+import org.apache.druid.java.util.common.CloseableIterators;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.guava.Comparators;
@@ -308,7 +310,8 @@ protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nu
     return new InputEntityIteratingReader(
         getInputRowSchemaToUse(inputRowSchema),
         inputFormat,
-        entityIterator,
+        CloseableIterators.withEmptyBaggage(entityIterator),
+        SystemFieldDecoratorFactory.NONE,
         temporaryDirectory
     );
   }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java
index e594c45739790..d78f576681eb5 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java
@@ -28,6 +28,7 @@
 import org.apache.druid.data.input.impl.ByteEntity;
 import org.apache.druid.data.input.impl.InputEntityIteratingReader;
 import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.data.input.impl.systemfield.SystemFieldDecoratorFactory;
 import org.apache.druid.indexing.overlord.sampler.SamplerException;
 import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
@@ -125,6 +126,7 @@ protected InputSourceReader formattableReader(
         inputRowSchema,
         format,
         createEntityIterator(),
+        SystemFieldDecoratorFactory.NONE,
         temporaryDirectory
     );
   }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingWithNullColumnTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingWithNullColumnTest.java
index 22a006bd6c293..69bcde0487f32 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingWithNullColumnTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingWithNullColumnTest.java
@@ -39,12 +39,14 @@
 import org.apache.druid.data.input.impl.JsonInputFormat;
 import org.apache.druid.data.input.impl.SplittableInputSource;
 import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.data.input.impl.systemfield.SystemFieldDecoratorFactory;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.task.Tasks;
+import org.apache.druid.java.util.common.CloseableIterators;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
@@ -431,7 +433,10 @@ public InputSourceReader reader(
     return new InputEntityIteratingReader(
         inputRowSchema,
         inputFormat,
-        data.stream().map(str -> new ByteEntity(StringUtils.toUtf8(str))).iterator(),
+        CloseableIterators.withEmptyBaggage(
+            data.stream().map(str -> new ByteEntity(StringUtils.toUtf8(str))).iterator()
+        ),
+        SystemFieldDecoratorFactory.NONE,
         temporaryDirectory
     );
   }
diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java b/processing/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java
index e02806fce6cc5..6e6ceda47c2d1 100644
--- a/processing/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java
+++ b/processing/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java
@@ -34,6 +34,11 @@
 import org.apache.druid.data.input.InputSourceReader;
 import org.apache.druid.data.input.InputSplit;
 import org.apache.druid.data.input.SplitHintSpec;
+import org.apache.druid.data.input.impl.systemfield.SystemField;
+import org.apache.druid.data.input.impl.systemfield.SystemFieldDecoratorFactory;
+import org.apache.druid.data.input.impl.systemfield.SystemFieldInputSource;
+import org.apache.druid.data.input.impl.systemfield.SystemFields;
+import org.apache.druid.java.util.common.CloseableIterators;
 import org.apache.druid.utils.CollectionUtils;
 import org.apache.druid.utils.Streams;
@@ -47,24 +52,28 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-public abstract class CloudObjectInputSource extends AbstractInputSource
-    implements SplittableInputSource<List<CloudObjectLocation>>
+public abstract class CloudObjectInputSource
+    extends AbstractInputSource
+    implements SplittableInputSource<List<CloudObjectLocation>>, SystemFieldInputSource
 {
   private final String scheme;
   private final List<URI> uris;
   private final List<URI> prefixes;
   private final List<CloudObjectLocation> objects;
   private final String objectGlob;
+  protected final SystemFields systemFields;
 
   public CloudObjectInputSource(
       String scheme,
       @Nullable List<URI> uris,
       @Nullable List<URI> prefixes,
       @Nullable List<CloudObjectLocation> objects,
-      @Nullable String objectGlob
+      @Nullable String objectGlob,
+      @Nullable SystemFields systemFields
   )
   {
     this.scheme = scheme;
@@ -72,6 +81,7 @@ public CloudObjectInputSource(
     this.prefixes = prefixes;
     this.objects = objects;
     this.objectGlob = objectGlob;
+    this.systemFields = systemFields == null ? SystemFields.none() : systemFields;
     illegalArgsChecker();
   }
@@ -106,6 +116,12 @@ public String getObjectGlob()
     return objectGlob;
   }
 
+  @Override
+  public Set<SystemField> getConfiguredSystemFields()
+  {
+    return systemFields.getFields();
+  }
+
   /**
    * Create the correct {@link InputEntity} for this input source given a split on a {@link CloudObjectLocation}. This
    * is called internally by {@link #formattableReader} and operates on the output of {@link #createSplits}.
@@ -174,7 +190,8 @@ protected InputSourceReader formattableReader(
     return new InputEntityIteratingReader(
         inputRowSchema,
         inputFormat,
-        getInputEntities(inputFormat),
+        CloseableIterators.withEmptyBaggage(getInputEntities(inputFormat)),
+        SystemFieldDecoratorFactory.fromInputSource(this),
         temporaryDirectory
     );
   }
@@ -208,13 +225,14 @@ public boolean equals(Object o)
            Objects.equals(uris, that.uris) &&
            Objects.equals(prefixes, that.prefixes) &&
            Objects.equals(objects, that.objects) &&
-           Objects.equals(objectGlob, that.objectGlob);
+           Objects.equals(objectGlob, that.objectGlob) &&
+           Objects.equals(systemFields, that.systemFields);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(scheme, uris, prefixes, objects, objectGlob);
+    return Objects.hash(scheme, uris, prefixes, objects, objectGlob, systemFields);
   }
 
   private void illegalArgsChecker() throws IllegalArgumentException
diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/FileEntity.java b/processing/src/main/java/org/apache/druid/data/input/impl/FileEntity.java
index 4d8ba20877b99..01762a9e87986 100644
--- a/processing/src/main/java/org/apache/druid/data/input/impl/FileEntity.java
+++ b/processing/src/main/java/org/apache/druid/data/input/impl/FileEntity.java
@@ -62,6 +62,11 @@ public URI getUri()
     return file.toURI();
   }
 
+  public File getFile()
+  {
+    return file;
+  }
+
   @Override
   public InputStream open() throws IOException
   {
diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java b/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java
index 61b4a8e675376..0ef8194f1e130 100644
--- a/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java
+++ b/processing/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java
@@ -26,11 +26,17 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import org.apache.druid.data.input.AbstractInputSource;
+import org.apache.druid.data.input.InputEntity;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.InputSourceReader;
 import org.apache.druid.data.input.InputSplit;
 import org.apache.druid.data.input.SplitHintSpec;
+import org.apache.druid.data.input.impl.systemfield.SystemField;
+import org.apache.druid.data.input.impl.systemfield.SystemFieldDecoratorFactory;
+import org.apache.druid.data.input.impl.systemfield.SystemFieldInputSource;
+import org.apache.druid.data.input.impl.systemfield.SystemFields;
+import org.apache.druid.java.util.common.CloseableIterators;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.metadata.PasswordProvider;
@@ -45,7 +51,9 @@
 import java.util.Set;
 import java.util.stream.Stream;
 
-public class HttpInputSource extends AbstractInputSource implements SplittableInputSource<URI>
+public class HttpInputSource
+    extends AbstractInputSource
+    implements SplittableInputSource<URI>, SystemFieldInputSource
 {
   public static final String TYPE_KEY = "http";
 
@@ -54,6 +62,7 @@ public class HttpInputSource
   private final String httpAuthenticationUsername;
   @Nullable
   private final PasswordProvider httpAuthenticationPasswordProvider;
+  private final SystemFields systemFields;
   private final HttpInputSourceConfig config;
 
   @JsonCreator
@@ -61,6 +70,7 @@ public HttpInputSource(
       @JsonProperty("uris") List<URI> uris,
       @JsonProperty("httpAuthenticationUsername") @Nullable String httpAuthenticationUsername,
       @JsonProperty("httpAuthenticationPassword") @Nullable PasswordProvider httpAuthenticationPasswordProvider,
+      @JsonProperty(SYSTEM_FIELDS_PROPERTY) @Nullable SystemFields systemFields,
       @JacksonInject HttpInputSourceConfig config
   )
   {
@@ -69,6 +79,7 @@ public HttpInputSource(
     this.uris = uris;
     this.httpAuthenticationUsername = httpAuthenticationUsername;
     this.httpAuthenticationPasswordProvider = httpAuthenticationPasswordProvider;
+    this.systemFields = systemFields == null ? SystemFields.none() : systemFields;
     this.config = config;
   }
@@ -95,6 +106,12 @@ public List<URI> getUris()
     return uris;
   }
 
+  @Override
+  public Set<SystemField> getConfiguredSystemFields()
+  {
+    return systemFields.getFields();
+  }
+
   @Nullable
   @JsonProperty
   @JsonInclude(JsonInclude.Include.NON_NULL)
@@ -130,10 +147,26 @@ public SplittableInputSource withSplit(InputSplit split)
         Collections.singletonList(split.get()),
         httpAuthenticationUsername,
         httpAuthenticationPasswordProvider,
+        systemFields,
         config
     );
   }
 
+  @Override
+  public Object getSystemFieldValue(InputEntity entity, SystemField field)
+  {
+    final HttpEntity httpEntity = (HttpEntity) entity;
+
+    switch (field) {
+      case URI:
+        return httpEntity.getUri().toString();
+      case PATH:
+        return httpEntity.getPath();
+      default:
+        return null;
+    }
+  }
+
   @Override
   protected InputSourceReader formattableReader(
       InputRowSchema inputRowSchema,
@@ -144,11 +177,14 @@ protected InputSourceReader formattableReader(
     return new InputEntityIteratingReader(
         inputRowSchema,
         inputFormat,
-        createSplits(inputFormat, null).map(split -> new HttpEntity(
-            split.get(),
-            httpAuthenticationUsername,
-            httpAuthenticationPasswordProvider
-        )).iterator(),
+        CloseableIterators.withEmptyBaggage(
+            createSplits(inputFormat, null).map(split -> new HttpEntity(
+                split.get(),
+                httpAuthenticationUsername,
+                httpAuthenticationPasswordProvider
+            )).iterator()
+        ),
+        SystemFieldDecoratorFactory.fromInputSource(this),
         temporaryDirectory
     );
   }
@@ -163,16 +199,17 @@ public boolean equals(Object o)
       return false;
     }
     HttpInputSource that = (HttpInputSource) o;
-    return Objects.equals(uris, that.uris) &&
-           Objects.equals(httpAuthenticationUsername, that.httpAuthenticationUsername) &&
-           Objects.equals(httpAuthenticationPasswordProvider, that.httpAuthenticationPasswordProvider) &&
-           Objects.equals(config, that.config);
+    return Objects.equals(uris, that.uris)
+           && Objects.equals(httpAuthenticationUsername, that.httpAuthenticationUsername)
+           && Objects.equals(httpAuthenticationPasswordProvider, that.httpAuthenticationPasswordProvider)
+           && Objects.equals(systemFields, that.systemFields)
+           && Objects.equals(config, that.config);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(uris, httpAuthenticationUsername, httpAuthenticationPasswordProvider, config);
+    return Objects.hash(uris, httpAuthenticationUsername, httpAuthenticationPasswordProvider, systemFields, config);
   }
 
   @Override
@@ -185,9 +222,10 @@ public boolean needsFormat()
   public String toString()
   {
     return "HttpInputSource{" +
-           "uris=\"" + uris +
-           "\", httpAuthenticationUsername=" + httpAuthenticationUsername +
+           "uris=\"" + uris + "\"" +
+           ", httpAuthenticationUsername=" + httpAuthenticationUsername +
            ", httpAuthenticationPasswordProvider=" + httpAuthenticationPasswordProvider +
+           (systemFields.getFields().isEmpty() ? "" : ", systemFields=" + systemFields) +
            "}";
   }
 }
diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/InlineInputSource.java b/processing/src/main/java/org/apache/druid/data/input/impl/InlineInputSource.java
index 319750ef3e1a4..cbcb7e1537dc6 100644
--- a/processing/src/main/java/org/apache/druid/data/input/impl/InlineInputSource.java
+++ b/processing/src/main/java/org/apache/druid/data/input/impl/InlineInputSource.java
@@ -28,6 +28,8 @@
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.InputSourceReader;
+import org.apache.druid.data.input.impl.systemfield.SystemFieldDecoratorFactory;
+import org.apache.druid.java.util.common.CloseableIterators;
 import org.apache.druid.java.util.common.StringUtils;
 
 import javax.annotation.Nonnull;
@@ -88,7 +90,8 @@ protected InputSourceReader formattableReader(
     return new InputEntityIteratingReader(
         inputRowSchema,
         inputFormat,
-        Stream.of(new ByteEntity(StringUtils.toUtf8(data))).iterator(),
+        CloseableIterators.withEmptyBaggage(Stream.of(new ByteEntity(StringUtils.toUtf8(data))).iterator()),
+        SystemFieldDecoratorFactory.NONE,
         temporaryDirectory
     );
   }
diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/InputEntityIteratingReader.java b/processing/src/main/java/org/apache/druid/data/input/impl/InputEntityIteratingReader.java
index 1d450e79ab7c5..ca67388b1a263 100644
--- a/processing/src/main/java/org/apache/druid/data/input/impl/InputEntityIteratingReader.java
+++ b/processing/src/main/java/org/apache/druid/data/input/impl/InputEntityIteratingReader.java
@@ -28,12 +28,11 @@
 import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.InputSourceReader;
 import org.apache.druid.data.input.InputStats;
-import org.apache.druid.java.util.common.CloseableIterators;
+import org.apache.druid.data.input.impl.systemfield.SystemFieldDecoratorFactory;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.function.Function;
 
 /**
@@ -45,28 +44,21 @@ public class InputEntityIteratingReader implements InputSourceReader
   private final InputRowSchema inputRowSchema;
   private final InputFormat inputFormat;
   private final CloseableIterator<? extends InputEntity> sourceIterator;
+  private final SystemFieldDecoratorFactory systemFieldDecoratorFactory;
   private final File temporaryDirectory;
 
   public InputEntityIteratingReader(
       InputRowSchema inputRowSchema,
       InputFormat inputFormat,
-      Iterator<? extends InputEntity> sourceIterator,
-      File temporaryDirectory
-  )
-  {
-    this(inputRowSchema, inputFormat, CloseableIterators.withEmptyBaggage(sourceIterator), temporaryDirectory);
-  }
-
-  public InputEntityIteratingReader(
-      InputRowSchema inputRowSchema,
-      InputFormat inputFormat,
-      CloseableIterator<? extends InputEntity> sourceCloseableIterator,
+      CloseableIterator<? extends InputEntity> sourceIterator,
+      SystemFieldDecoratorFactory systemFieldDecoratorFactory,
       File temporaryDirectory
   )
   {
     this.inputRowSchema = inputRowSchema;
     this.inputFormat = inputFormat;
-    this.sourceIterator = (CloseableIterator) sourceCloseableIterator;
+    this.sourceIterator = (CloseableIterator) sourceIterator;
+    this.systemFieldDecoratorFactory = systemFieldDecoratorFactory;
     this.temporaryDirectory = temporaryDirectory;
   }
 
@@ -75,10 +67,11 @@ public CloseableIterator<InputRow> read(InputStats inputStats)
   {
     return createIterator(entity -> {
       // InputEntityReader is stateful and so a new one should be created per entity.
+ final Function systemFieldDecorator = systemFieldDecoratorFactory.decorator(entity); try { final InputEntity entityToRead = inputStats == null ? entity : new BytesCountingInputEntity(entity, inputStats); final InputEntityReader reader = inputFormat.createReader(inputRowSchema, entityToRead, temporaryDirectory); - return reader.read(); + return reader.read().map(systemFieldDecorator); } catch (IOException e) { throw new RuntimeException(entity.getUri() != null ? diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java b/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java index f802ae6621cf9..18eb7c7472c2f 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java @@ -35,12 +35,18 @@ import org.apache.commons.io.filefilter.TrueFileFilter; import org.apache.commons.io.filefilter.WildcardFileFilter; import org.apache.druid.data.input.AbstractInputSource; +import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputFileAttribute; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.InputSourceReader; import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.SplitHintSpec; +import org.apache.druid.data.input.impl.systemfield.SystemField; +import org.apache.druid.data.input.impl.systemfield.SystemFieldDecoratorFactory; +import org.apache.druid.data.input.impl.systemfield.SystemFieldInputSource; +import org.apache.druid.data.input.impl.systemfield.SystemFields; +import org.apache.druid.java.util.common.CloseableIterators; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.utils.CollectionUtils; @@ -57,7 +63,9 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -public class LocalInputSource extends AbstractInputSource implements SplittableInputSource> +public class LocalInputSource + extends AbstractInputSource + implements SplittableInputSource>, SystemFieldInputSource { private static final Logger log = new Logger(LocalInputSource.class); public static final String TYPE_KEY = "local"; @@ -67,17 +75,20 @@ public class LocalInputSource extends AbstractInputSource implements SplittableI @Nullable private final String filter; private final List files; + private final SystemFields systemFields; @JsonCreator public LocalInputSource( @JsonProperty("baseDir") @Nullable File baseDir, @JsonProperty("filter") @Nullable String filter, - @JsonProperty("files") @Nullable List files + @JsonProperty("files") @Nullable List files, + @JsonProperty(SYSTEM_FIELDS_PROPERTY) @Nullable SystemFields systemFields ) { this.baseDir = baseDir; this.filter = baseDir != null ? Preconditions.checkNotNull(filter, "filter") : filter; this.files = files == null ? Collections.emptyList() : files; + this.systemFields = systemFields == null ? 
SystemFields.none() : systemFields; if (baseDir == null && CollectionUtils.isNullOrEmpty(files)) { throw new IAE("At least one of baseDir or files should be specified"); @@ -94,7 +105,7 @@ public Set getTypes() public LocalInputSource(File baseDir, String filter) { - this(baseDir, filter, null); + this(baseDir, filter, null, SystemFields.none()); } @Nullable @@ -128,6 +139,12 @@ public String getFilter() return filter; } + @Override + public Set getConfiguredSystemFields() + { + return systemFields.getFields(); + } + public List getFiles() { return files; @@ -227,7 +244,22 @@ private Iterator getFilesListIterator() @Override public SplittableInputSource> withSplit(InputSplit> split) { - return new LocalInputSource(null, null, split.get()); + return new LocalInputSource(null, null, split.get(), systemFields); + } + + @Override + public Object getSystemFieldValue(InputEntity entity, SystemField field) + { + final FileEntity fileEntity = (FileEntity) entity; + + switch (field) { + case URI: + return fileEntity.getUri().toString(); + case PATH: + return fileEntity.getFile().getPath(); + default: + return null; + } } @Override @@ -247,7 +279,8 @@ protected InputSourceReader formattableReader( return new InputEntityIteratingReader( inputRowSchema, inputFormat, - Iterators.transform(getFileIterator(), FileEntity::new), + CloseableIterators.withEmptyBaggage(Iterators.transform(getFileIterator(), FileEntity::new)), + SystemFieldDecoratorFactory.fromInputSource(this), temporaryDirectory ); } @@ -262,15 +295,16 @@ public boolean equals(Object o) return false; } LocalInputSource that = (LocalInputSource) o; - return Objects.equals(baseDir, that.baseDir) && - Objects.equals(filter, that.filter) && - Objects.equals(files, that.files); + return Objects.equals(baseDir, that.baseDir) + && Objects.equals(filter, that.filter) + && Objects.equals(files, that.files) + && Objects.equals(systemFields, that.systemFields); } @Override public int hashCode() { - return Objects.hash(baseDir, filter, files); + return Objects.hash(baseDir, filter, files, systemFields); } @Override @@ -280,6 +314,7 @@ public String toString() "baseDir=\"" + baseDir + "\", filter=" + filter + ", files=" + files + + (systemFields.getFields().isEmpty() ? 
"" : ", systemFields=" + systemFields) + "}"; } } diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSourceFactory.java b/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSourceFactory.java index b2fa6a13dcb71..ff7d72aa431ef 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSourceFactory.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSourceFactory.java @@ -20,6 +20,7 @@ package org.apache.druid.data.input.impl; import org.apache.druid.data.input.InputSourceFactory; +import org.apache.druid.data.input.impl.systemfield.SystemFields; import java.io.File; import java.util.List; @@ -35,7 +36,8 @@ public LocalInputSource create(List inputFilePaths) null, null, inputFilePaths.stream().map(chosenPath -> new File(chosenPath)).collect( - Collectors.toList()) + Collectors.toList()), + SystemFields.none() ); } } diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/systemfield/SystemField.java b/processing/src/main/java/org/apache/druid/data/input/impl/systemfield/SystemField.java new file mode 100644 index 0000000000000..59b74ef22c673 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/data/input/impl/systemfield/SystemField.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.impl.systemfield; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonValue; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.segment.column.ColumnType; + +/** + * System fields that can appear when reading data. These are generated by the system itself, rather than actually + * being present in the data being scanned. + * + * Currently these are only used by {@link SystemFieldInputSource}, and are therefore part of the package that + * contains things related to {@link org.apache.druid.data.input.InputSource}. Perhaps, in the future, system fields + * may be generated from segment scans as well (for example "__segment_id" returning the + * {@link org.apache.druid.timeline.SegmentId}). At that point this enum may move to a more central location. 
+ */ +public enum SystemField +{ + URI("__file_uri", ColumnType.STRING), + BUCKET("__file_bucket", ColumnType.STRING), + PATH("__file_path", ColumnType.STRING); + + private final String fieldName; + private final ColumnType columnType; + + SystemField(final String fieldName, final ColumnType columnType) + { + this.fieldName = fieldName; + this.columnType = columnType; + } + + @JsonCreator + public static SystemField fromFieldName(final String fieldName) + { + for (final SystemField field : values()) { + if (field.getFieldName().equals(fieldName)) { + return field; + } + } + + throw new IAE("No such system field[%s]", fieldName); + } + + /** + * Name of this system field. + */ + @JsonValue + public String getFieldName() + { + return fieldName; + } + + /** + * Type of this system field. + */ + public ColumnType getColumnType() + { + return columnType; + } + + @Override + public String toString() + { + return fieldName; + } +} diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/systemfield/SystemFieldDecoratorFactory.java b/processing/src/main/java/org/apache/druid/data/input/impl/systemfield/SystemFieldDecoratorFactory.java new file mode 100644 index 0000000000000..fcdc2cede55e2 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/data/input/impl/systemfield/SystemFieldDecoratorFactory.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.impl.systemfield; + +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.Row; +import org.apache.druid.segment.transform.RowFunction; +import org.apache.druid.segment.transform.TransformedInputRow; + +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.BiFunction; +import java.util.function.Function; + +/** + * Factory for decorators that attach system-field values to each {@link InputRow} read from an {@link InputEntity}. + */ +public class SystemFieldDecoratorFactory +{ + /** + * Decorator factory that does not generate system fields. + */ + public static final SystemFieldDecoratorFactory NONE = + new SystemFieldDecoratorFactory((entity, field) -> null, EnumSet.noneOf(SystemField.class)); + + private final BiFunction extractor; + private final Set fields; + + /** + * Constructor. Package-private; most callers should use {@link #fromInputSource(SystemFieldInputSource)} + * or {@link #NONE}. + */ + SystemFieldDecoratorFactory( + final BiFunction extractor, + final Set fields + ) + { + this.extractor = extractor; + this.fields = fields; + } + + /** + * Create a decorator factory for a given {@link SystemFieldInputSource}. 
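+ *
+ * <p>A minimal wiring sketch, mirroring the reader changes elsewhere in this patch ({@code entityIterator} is a
+ * stand-in name):
+ * <pre>{@code
+ * return new InputEntityIteratingReader(
+ *     inputRowSchema,
+ *     inputFormat,
+ *     CloseableIterators.withEmptyBaggage(entityIterator),
+ *     SystemFieldDecoratorFactory.fromInputSource(this),
+ *     temporaryDirectory
+ * );
+ * }</pre>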
+ */ + public static SystemFieldDecoratorFactory fromInputSource(final SystemFieldInputSource source) + { + return new SystemFieldDecoratorFactory(source::getSystemFieldValue, source.getConfiguredSystemFields()); + } + + /** + * Create a decorator for the given {@link InputEntity}. All {@link InputRow} for a given {@link InputEntity} + * have the same value for all {@link SystemField}. + */ + public Function decorator(final InputEntity entity) + { + if (fields.isEmpty()) { + return Function.identity(); + } else { + final Map transforms = new HashMap<>(); + + for (final SystemField field : fields) { + final Object fieldValue = extractor.apply(entity, field); + transforms.put(field.toString(), new ConstantRowFunction(fieldValue)); + } + + return row -> new TransformedInputRow(row, transforms); + } + } + + /** + * Row function that returns a constant value. Helper for {@link #decorator(InputEntity)}. + */ + private static class ConstantRowFunction implements RowFunction + { + private final Object value; + + public ConstantRowFunction(Object value) + { + this.value = value; + } + + @Override + public Object eval(Row row) + { + return value; + } + + @Override + public List evalDimension(Row row) + { + return Collections.singletonList(value == null ? null : String.valueOf(value)); + } + } +} diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/systemfield/SystemFieldInputSource.java b/processing/src/main/java/org/apache/druid/data/input/impl/systemfield/SystemFieldInputSource.java new file mode 100644 index 0000000000000..70c528165dab9 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/data/input/impl/systemfield/SystemFieldInputSource.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.impl.systemfield; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.InputSource; + +import javax.annotation.Nullable; +import java.io.File; +import java.util.Set; + +/** + * An {@link InputSource} that can generate system fields. + * + * Implementations of {@link InputSource#reader(InputRowSchema, InputFormat, File)} tend to create a decorator factory + * using {@link SystemFieldDecoratorFactory#fromInputSource(SystemFieldInputSource)} on "this" and then pass it to + * {@link org.apache.druid.data.input.impl.InputEntityIteratingReader}. + */ +public interface SystemFieldInputSource extends InputSource +{ + String SYSTEM_FIELDS_PROPERTY = "systemFields"; + + /** + * System fields that this input source is configured to return. 
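+ * Serialized as the {@code systemFields} JSON property; per the {@code @JsonInclude(NON_EMPTY)} annotation below,
+ * an empty set is omitted from the serialized form.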
+ * + * This set is independent of the fields for which {@link #getSystemFieldValue(InputEntity, SystemField)} returns + * nonnull values. For example, if a {@link org.apache.druid.data.input.impl.LocalInputSource} is configured to + * return {@link SystemField#BUCKET}, that field appears in this set even though its value is always null. + * Conversely, if a {@link org.apache.druid.data.input.impl.LocalInputSource} is *not* configured to return + * {@link SystemField#URI}, that field does *not* appear in this set, even though its value from + * {@link #getSystemFieldValue(InputEntity, SystemField)} would be nonnull. + */ + @JsonProperty(SYSTEM_FIELDS_PROPERTY) + @JsonInclude(JsonInclude.Include.NON_EMPTY) + Set getConfiguredSystemFields(); + + /** + * Compute the value of a system field for a particular {@link InputEntity}. + */ + @Nullable + Object getSystemFieldValue(InputEntity entity, SystemField field); +} diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/systemfield/SystemFields.java b/processing/src/main/java/org/apache/druid/data/input/impl/systemfield/SystemFields.java new file mode 100644 index 0000000000000..efc562be38177 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/data/input/impl/systemfield/SystemFields.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.impl.systemfield; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonValue; + +import java.util.EnumSet; +import java.util.Objects; + +/** + * Container of {@link SystemField}. Wrapper around an {@link EnumSet}. 
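+ *
+ * <p>Because of the {@code @JsonValue} / {@code @JsonCreator} pair below, instances serialize as a plain JSON array
+ * of field names, e.g. {@code ["__file_uri", "__file_path"]}.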
+ */ +public class SystemFields +{ + private static final SystemFields NONE = new SystemFields(EnumSet.noneOf(SystemField.class)); + + private final EnumSet fields; + + @JsonCreator + public SystemFields(EnumSet fields) + { + this.fields = fields; + } + + public static SystemFields none() + { + return NONE; + } + + @JsonValue + public EnumSet getFields() + { + return fields; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SystemFields that = (SystemFields) o; + return Objects.equals(fields, that.fields); + } + + @Override + public int hashCode() + { + return Objects.hash(fields); + } + + @Override + public String toString() + { + return fields.toString(); + } +} diff --git a/processing/src/main/java/org/apache/druid/java/util/common/CloseableIterators.java b/processing/src/main/java/org/apache/druid/java/util/common/CloseableIterators.java index a26bcf4dd200b..ca84d02e99fc9 100644 --- a/processing/src/main/java/org/apache/druid/java/util/common/CloseableIterators.java +++ b/processing/src/main/java/org/apache/druid/java/util/common/CloseableIterators.java @@ -31,6 +31,7 @@ import java.util.Comparator; import java.util.Iterator; import java.util.List; +import java.util.function.Function; public class CloseableIterators { diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java index 5b08f5863aec8..1f45f202cdbac 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java @@ -77,7 +77,7 @@ import org.apache.druid.segment.serde.ComplexMetricExtractor; import org.apache.druid.segment.serde.ComplexMetricSerde; import org.apache.druid.segment.serde.ComplexMetrics; -import org.apache.druid.segment.transform.Transformer; +import org.apache.druid.segment.transform.TransformedInputRow; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -713,8 +713,8 @@ private static String getSimplifiedEventStringFromRow(InputRow inputRow) return ((MapBasedInputRow) inputRow).getEvent().toString(); } - if (inputRow instanceof Transformer.TransformedInputRow) { - InputRow innerRow = ((Transformer.TransformedInputRow) inputRow).getBaseRow(); + if (inputRow instanceof TransformedInputRow) { + InputRow innerRow = ((TransformedInputRow) inputRow).getBaseRow(); if (innerRow instanceof MapBasedInputRow) { return ((MapBasedInputRow) innerRow).getEvent().toString(); } diff --git a/processing/src/main/java/org/apache/druid/segment/transform/TransformedInputRow.java b/processing/src/main/java/org/apache/druid/segment/transform/TransformedInputRow.java new file mode 100644 index 0000000000000..7d1db5ca479f6 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/transform/TransformedInputRow.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.transform; + +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.Row; +import org.apache.druid.data.input.Rows; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.parsers.ParseException; +import org.apache.druid.segment.column.ColumnHolder; +import org.joda.time.DateTime; + +import java.util.List; +import java.util.Map; +import java.util.Objects; + +public class TransformedInputRow implements InputRow +{ + private final InputRow row; + private final Map transforms; + + // cached column, because it will be read frequently + private final DateTime timestamp; + + public TransformedInputRow(final InputRow row, final Map transforms) + { + this.row = row; + this.transforms = transforms; + + this.timestamp = readTimestampFromRow(row, transforms); + } + + @Override + public List getDimensions() + { + return row.getDimensions(); + } + + static DateTime readTimestampFromRow(final InputRow row, final Map transforms) + { + final RowFunction transform = transforms.get(ColumnHolder.TIME_COLUMN_NAME); + final long ts; + if (transform != null) { + //noinspection ConstantConditions time column is never null + final Number transformedVal = Rows.objectToNumber(ColumnHolder.TIME_COLUMN_NAME, transform.eval(row), true); + if (transformedVal == null) { + throw new ParseException(row.toString(), "Could not transform value for __time."); + } + ts = transformedVal.longValue(); + } else { + ts = row.getTimestampFromEpoch(); + } + return DateTimes.utc(ts); + } + + @Override + public long getTimestampFromEpoch() + { + return timestamp.getMillis(); + } + + @Override + public DateTime getTimestamp() + { + return timestamp; + } + + @Override + public List getDimension(final String dimension) + { + final RowFunction transform = transforms.get(dimension); + if (transform != null) { + return transform.evalDimension(row); + } else { + return row.getDimension(dimension); + } + } + + @Override + public Object getRaw(final String column) + { + final RowFunction transform = transforms.get(column); + if (transform != null) { + return transform.eval(row); + } else { + return row.getRaw(column); + } + } + + @Override + public Number getMetric(final String metric) + { + final RowFunction transform = transforms.get(metric); + if (transform != null) { + return Rows.objectToNumber(metric, transform.eval(row), true); + } else { + return row.getMetric(metric); + } + } + + public InputRow getBaseRow() + { + return row; + } + + @Override + public boolean equals(final Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final TransformedInputRow that = (TransformedInputRow) o; + return Objects.equals(row, that.row) && + Objects.equals(transforms, that.transforms); + } + + @Override + public int hashCode() + { + return Objects.hash(row, transforms); + } + + @Override + public int compareTo(final Row o) + { + return row.compareTo(o); + } + + @Override + public String toString() + { + return "TransformedInputRow{" + + "row=" + row + + '}'; + } 
+} diff --git a/processing/src/main/java/org/apache/druid/segment/transform/Transformer.java b/processing/src/main/java/org/apache/druid/segment/transform/Transformer.java index 65acf7c58d0d3..248fa21360168 100644 --- a/processing/src/main/java/org/apache/druid/segment/transform/Transformer.java +++ b/processing/src/main/java/org/apache/druid/segment/transform/Transformer.java @@ -22,22 +22,17 @@ import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRowListPlusRawValues; import org.apache.druid.data.input.Row; -import org.apache.druid.data.input.Rows; -import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.query.filter.ValueMatcher; import org.apache.druid.segment.RowAdapters; import org.apache.druid.segment.RowBasedColumnSelectorFactory; -import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.RowSignature; -import org.joda.time.DateTime; import javax.annotation.Nullable; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; /** * @@ -147,128 +142,4 @@ public InputRowListPlusRawValues transform(@Nullable final InputRowListPlusRawVa return inputRowListPlusRawValues; } - - public static class TransformedInputRow implements InputRow - { - private final InputRow row; - private final Map transforms; - - // cached column, because it will be read frequently - private final DateTime timestamp; - - public TransformedInputRow(final InputRow row, final Map transforms) - { - this.row = row; - this.transforms = transforms; - - this.timestamp = readTimestampFromRow(row, transforms); - } - - @Override - public List getDimensions() - { - return row.getDimensions(); - } - - static DateTime readTimestampFromRow(final InputRow row, final Map transforms) - { - final RowFunction transform = transforms.get(ColumnHolder.TIME_COLUMN_NAME); - final long ts; - if (transform != null) { - //noinspection ConstantConditions time column is never null - final Number transformedVal = Rows.objectToNumber(ColumnHolder.TIME_COLUMN_NAME, transform.eval(row), true); - if (transformedVal == null) { - throw new ParseException(row.toString(), "Could not transform value for __time."); - } - ts = transformedVal.longValue(); - } else { - ts = row.getTimestampFromEpoch(); - } - return DateTimes.utc(ts); - } - - @Override - public long getTimestampFromEpoch() - { - return timestamp.getMillis(); - } - - @Override - public DateTime getTimestamp() - { - return timestamp; - } - - @Override - public List getDimension(final String dimension) - { - final RowFunction transform = transforms.get(dimension); - if (transform != null) { - return transform.evalDimension(row); - } else { - return row.getDimension(dimension); - } - } - - @Override - public Object getRaw(final String column) - { - final RowFunction transform = transforms.get(column); - if (transform != null) { - return transform.eval(row); - } else { - return row.getRaw(column); - } - } - - @Override - public Number getMetric(final String metric) - { - final RowFunction transform = transforms.get(metric); - if (transform != null) { - return Rows.objectToNumber(metric, transform.eval(row), true); - } else { - return row.getMetric(metric); - } - } - - public InputRow getBaseRow() - { - return row; - } - - @Override - public boolean equals(final Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - 
final TransformedInputRow that = (TransformedInputRow) o; - return Objects.equals(row, that.row) && - Objects.equals(transforms, that.transforms); - } - - @Override - public int hashCode() - { - return Objects.hash(row, transforms); - } - - @Override - public int compareTo(final Row o) - { - return row.compareTo(o); - } - - @Override - public String toString() - { - return "TransformedInputRow{" + - "row=" + row + - '}'; - } - } } diff --git a/processing/src/test/java/org/apache/druid/data/input/ResourceInputSource.java b/processing/src/test/java/org/apache/druid/data/input/ResourceInputSource.java index 44aa52615d417..a0d87bdec638a 100644 --- a/processing/src/test/java/org/apache/druid/data/input/ResourceInputSource.java +++ b/processing/src/test/java/org/apache/druid/data/input/ResourceInputSource.java @@ -20,6 +20,8 @@ package org.apache.druid.data.input; import org.apache.druid.data.input.impl.InputEntityIteratingReader; +import org.apache.druid.data.input.impl.systemfield.SystemFieldDecoratorFactory; +import org.apache.druid.java.util.common.CloseableIterators; import org.apache.druid.utils.CompressionUtils; import javax.annotation.Nullable; @@ -72,7 +74,10 @@ protected InputSourceReader formattableReader( return new InputEntityIteratingReader( inputRowSchema, inputFormat, - Collections.singletonList(new ResourceStreamEntity(classLoader, resourceFile)).iterator(), + CloseableIterators.withEmptyBaggage( + Collections.singletonList(new ResourceStreamEntity(classLoader, resourceFile)).iterator() + ), + SystemFieldDecoratorFactory.NONE, temporaryDirectory ); } diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java index 9c17b57d21eb4..bcd6152f05dd1 100644 --- a/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java +++ b/processing/src/test/java/org/apache/druid/data/input/impl/HttpInputSourceTest.java @@ -23,7 +23,10 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import nl.jqno.equalsverifier.EqualsVerifier; import org.apache.druid.data.input.InputSource; +import org.apache.druid.data.input.impl.systemfield.SystemField; +import org.apache.druid.data.input.impl.systemfield.SystemFields; import org.apache.druid.metadata.DefaultPasswordProvider; import org.junit.Assert; import org.junit.Rule; @@ -32,6 +35,7 @@ import java.io.IOException; import java.net.URI; +import java.util.EnumSet; public class HttpInputSourceTest { @@ -48,6 +52,7 @@ public void testSerde() throws IOException ImmutableList.of(URI.create("http://test.com/http-test")), "myName", new DefaultPasswordProvider("myPassword"), + new SystemFields(EnumSet.of(SystemField.URI)), httpInputSourceConfig ); final byte[] json = mapper.writeValueAsBytes(source); @@ -62,6 +67,7 @@ public void testConstructorAllowsOnlyDefaultProtocols() ImmutableList.of(URI.create("http:///")), "myName", new DefaultPasswordProvider("myPassword"), + null, new HttpInputSourceConfig(null) ); @@ -69,6 +75,7 @@ public void testConstructorAllowsOnlyDefaultProtocols() ImmutableList.of(URI.create("https:///")), "myName", new DefaultPasswordProvider("myPassword"), + null, new HttpInputSourceConfig(null) ); @@ -78,6 +85,7 @@ public void testConstructorAllowsOnlyDefaultProtocols() ImmutableList.of(URI.create("my-protocol:///")), "myName", new DefaultPasswordProvider("myPassword"), + null, new 
HttpInputSourceConfig(null) ); } @@ -90,6 +98,7 @@ public void testConstructorAllowsOnlyCustomProtocols() ImmutableList.of(URI.create("druid:///")), "myName", new DefaultPasswordProvider("myPassword"), + null, customConfig ); @@ -99,7 +108,39 @@ public void testConstructorAllowsOnlyCustomProtocols() ImmutableList.of(URI.create("https:///")), "myName", new DefaultPasswordProvider("myPassword"), + null, customConfig ); } + + @Test + public void testSystemFields() + { + HttpInputSourceConfig httpInputSourceConfig = new HttpInputSourceConfig(null); + final HttpInputSource inputSource = new HttpInputSource( + ImmutableList.of(URI.create("http://test.com/http-test")), + "myName", + new DefaultPasswordProvider("myPassword"), + new SystemFields(EnumSet.of(SystemField.URI, SystemField.PATH)), + httpInputSourceConfig + ); + + Assert.assertEquals( + EnumSet.of(SystemField.URI, SystemField.PATH), + inputSource.getConfiguredSystemFields() + ); + + final HttpEntity entity = new HttpEntity(URI.create("https://example.com/foo"), null, null); + + Assert.assertEquals("https://example.com/foo", inputSource.getSystemFieldValue(entity, SystemField.URI)); + Assert.assertEquals("/foo", inputSource.getSystemFieldValue(entity, SystemField.PATH)); + } + + @Test + public void testEquals() + { + EqualsVerifier.forClass(HttpInputSource.class) + .usingGetClass() + .verify(); + } } diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java index a33899b253549..70b75d23955f7 100644 --- a/processing/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java +++ b/processing/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java @@ -25,6 +25,8 @@ import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.InputStats; +import org.apache.druid.data.input.impl.systemfield.SystemFieldDecoratorFactory; +import org.apache.druid.java.util.common.CloseableIterators; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.parsers.CloseableIterator; @@ -80,7 +82,10 @@ public void test() throws IOException false, 0 ), - files.stream().flatMap(file -> ImmutableList.of(new FileEntity(file)).stream()).iterator(), + CloseableIterators.withEmptyBaggage( + files.stream().flatMap(file -> ImmutableList.of(new FileEntity(file)).stream()).iterator() + ), + SystemFieldDecoratorFactory.NONE, temporaryFolder.newFolder() ); @@ -123,18 +128,21 @@ public void testIncorrectURI() throws IOException, URISyntaxException false, 0 ), - ImmutableList.of( - new HttpEntity(new URI("testscheme://some/path"), null, null) - { - @Override - protected int getMaxRetries() - { - // override so this test does not take like 4 minutes to run - return 2; - } - } + CloseableIterators.withEmptyBaggage( + ImmutableList.of( + new HttpEntity(new URI("testscheme://some/path"), null, null) + { + @Override + protected int getMaxRetries() + { + // override so this test does not take like 4 minutes to run + return 2; + } + } - ).iterator(), + ).iterator() + ), + SystemFieldDecoratorFactory.NONE, temporaryFolder.newFolder() ); diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceTest.java index 
c72c52f462ba3..9958cd5174b3d 100644 --- a/processing/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceTest.java +++ b/processing/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceTest.java @@ -26,6 +26,8 @@ import org.apache.druid.data.input.InputSource; import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.MaxSizeSplitHintSpec; +import org.apache.druid.data.input.impl.systemfield.SystemField; +import org.apache.druid.data.input.impl.systemfield.SystemFields; import org.apache.druid.java.util.common.HumanReadableBytes; import org.apache.druid.utils.Streams; import org.easymock.EasyMock; @@ -41,6 +43,7 @@ import java.nio.file.Files; import java.util.ArrayList; import java.util.Collections; +import java.util.EnumSet; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -70,6 +73,23 @@ public void testSerdeRelativeBaseDir() throws IOException final byte[] json = mapper.writeValueAsBytes(source); final LocalInputSource fromJson = (LocalInputSource) mapper.readValue(json, InputSource.class); Assert.assertEquals(source, fromJson); + Assert.assertEquals(Collections.emptySet(), fromJson.getConfiguredSystemFields()); + } + + @Test + public void testSerdeRelativeBaseDirWithSystemFields() throws IOException + { + final ObjectMapper mapper = new ObjectMapper(); + final LocalInputSource source = new LocalInputSource( + new File("myFile"), + "myFilter", + null, + new SystemFields(EnumSet.of(SystemField.URI, SystemField.PATH)) + ); + final byte[] json = mapper.writeValueAsBytes(source); + final LocalInputSource fromJson = (LocalInputSource) mapper.readValue(json, InputSource.class); + Assert.assertEquals(source, fromJson); + Assert.assertEquals(EnumSet.of(SystemField.URI, SystemField.PATH), fromJson.getConfiguredSystemFields()); } @Test @@ -82,7 +102,8 @@ public void testSerdeMixedAbsoluteAndRelativeFiles() throws IOException ImmutableList.of( new File("myFile1"), new File("myFile2").getAbsoluteFile() - ) + ), + null ); final byte[] json = mapper.writeValueAsBytes(source); final LocalInputSource fromJson = (LocalInputSource) mapper.readValue(json, InputSource.class); @@ -96,6 +117,30 @@ public void testGetTypes() Assert.assertEquals(Collections.singleton(LocalInputSource.TYPE_KEY), source.getTypes()); } + @Test + public void testSystemFields() + { + final LocalInputSource inputSource = (LocalInputSource) new LocalInputSource( + null, + null, + ImmutableList.of( + new File("myFile1"), + new File("myFile2").getAbsoluteFile() + ), + new SystemFields(EnumSet.of(SystemField.URI, SystemField.PATH)) + ); + + Assert.assertEquals( + EnumSet.of(SystemField.URI, SystemField.PATH), + inputSource.getConfiguredSystemFields() + ); + + final FileEntity entity = new FileEntity(new File("/tmp/foo")); + + Assert.assertEquals("file:/tmp/foo", inputSource.getSystemFieldValue(entity, SystemField.URI)); + Assert.assertEquals("/tmp/foo", inputSource.getSystemFieldValue(entity, SystemField.PATH)); + } + @Test public void testEquals() { @@ -108,7 +153,7 @@ public void testCreateSplitsRespectingSplitHintSpec() final long fileSize = 15; final HumanReadableBytes maxSplitSize = new HumanReadableBytes(50L); final List files = mockFiles(10, fileSize); - final LocalInputSource inputSource = new LocalInputSource(null, null, files); + final LocalInputSource inputSource = new LocalInputSource(null, null, files, null); final List>> splits = inputSource .createSplits(new NoopInputFormat(), new MaxSizeSplitHintSpec(maxSplitSize, null)) 
.collect(Collectors.toList()); @@ -125,7 +170,7 @@ public void testEstimateNumSplitsRespectingSplitHintSpec() final long fileSize = 13; final HumanReadableBytes maxSplitSize = new HumanReadableBytes(40L); final List files = mockFiles(10, fileSize); - final LocalInputSource inputSource = new LocalInputSource(null, null, files); + final LocalInputSource inputSource = new LocalInputSource(null, null, files, null); Assert.assertEquals( 4, inputSource.estimateNumSplits(new NoopInputFormat(), new MaxSizeSplitHintSpec(maxSplitSize, null)) @@ -155,7 +200,7 @@ public void testGetFileIteratorWithBothBaseDirAndDuplicateFilesIteratingFilesOnl Set expectedFiles = new HashSet<>(filesInBaseDir); expectedFiles.addAll(files); File.createTempFile("local-input-source", ".filtered", baseDir); - Iterator fileIterator = new LocalInputSource(baseDir, "*.data", files).getFileIterator(); + Iterator fileIterator = new LocalInputSource(baseDir, "*.data", files, null).getFileIterator(); Set actualFiles = Streams.sequentialStreamFrom(fileIterator).collect(Collectors.toSet()); Assert.assertEquals(expectedFiles, actualFiles); } @@ -172,7 +217,7 @@ public void testGetFileIteratorWithOnlyBaseDirIteratingAllFiles() throws IOExcep } filesInBaseDir.add(file); } - Iterator fileIterator = new LocalInputSource(baseDir, "*", null).getFileIterator(); + Iterator fileIterator = new LocalInputSource(baseDir, "*", null, null).getFileIterator(); Set actualFiles = Streams.sequentialStreamFrom(fileIterator).collect(Collectors.toSet()); Assert.assertEquals(filesInBaseDir, actualFiles); } @@ -189,7 +234,7 @@ public void testGetFileIteratorWithOnlyFilesIteratingAllFiles() throws IOExcepti } filesInBaseDir.add(file); } - Iterator fileIterator = new LocalInputSource(null, null, filesInBaseDir).getFileIterator(); + Iterator fileIterator = new LocalInputSource(null, null, filesInBaseDir, null).getFileIterator(); List actualFiles = Streams.sequentialStreamFrom(fileIterator).collect(Collectors.toList()); Assert.assertEquals(filesInBaseDir, actualFiles); } @@ -199,7 +244,7 @@ public void testFileIteratorWithEmptyFilesIteratingNonEmptyFilesOnly() { final List files = mockFiles(10, 5); files.addAll(mockFiles(10, 0)); - final LocalInputSource inputSource = new LocalInputSource(null, null, files); + final LocalInputSource inputSource = new LocalInputSource(null, null, files, null); List iteratedFiles = Lists.newArrayList(inputSource.getFileIterator()); Assert.assertTrue(iteratedFiles.stream().allMatch(file -> file.length() > 0)); } diff --git a/server/src/main/java/org/apache/druid/metadata/input/SqlInputSource.java b/server/src/main/java/org/apache/druid/metadata/input/SqlInputSource.java index 0064a343107e6..8d886b058fca3 100644 --- a/server/src/main/java/org/apache/druid/metadata/input/SqlInputSource.java +++ b/server/src/main/java/org/apache/druid/metadata/input/SqlInputSource.java @@ -33,7 +33,9 @@ import org.apache.druid.data.input.SplitHintSpec; import org.apache.druid.data.input.impl.InputEntityIteratingReader; import org.apache.druid.data.input.impl.SplittableInputSource; +import org.apache.druid.data.input.impl.systemfield.SystemFieldDecoratorFactory; import org.apache.druid.guice.annotations.Smile; +import org.apache.druid.java.util.common.CloseableIterators; import org.apache.druid.metadata.SQLFirehoseDatabaseConnector; import javax.annotation.Nonnull; @@ -128,8 +130,9 @@ protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nu return new InputEntityIteratingReader( inputRowSchema, inputFormat, - 
createSplits(inputFormat, null) - .map(split -> new SqlEntity(split.get(), sqlFirehoseDatabaseConnector, foldCase, objectMapper)).iterator(), + CloseableIterators.withEmptyBaggage(createSplits(inputFormat, null) + .map(split -> new SqlEntity(split.get(), sqlFirehoseDatabaseConnector, foldCase, objectMapper)).iterator()), + SystemFieldDecoratorFactory.NONE, temporaryDirectory ); } diff --git a/server/src/test/java/org/apache/druid/catalog/model/table/ExternalTableTest.java b/server/src/test/java/org/apache/druid/catalog/model/table/ExternalTableTest.java index 68e4d5887bc2d..96afcad9a8cc2 100644 --- a/server/src/test/java/org/apache/druid/catalog/model/table/ExternalTableTest.java +++ b/server/src/test/java/org/apache/druid/catalog/model/table/ExternalTableTest.java @@ -169,6 +169,7 @@ public void httpDocExample() throws URISyntaxException Collections.singletonList(new URI("https://example.com/my.csv")), // removed "bob", new DefaultPasswordProvider("secret"), + null, new HttpInputSourceConfig(null) ); Map sourceMap = toMap(inputSource); @@ -193,6 +194,7 @@ public void httpConnDocExample() throws URISyntaxException Collections.singletonList(new URI("https://example.com/")), "bob", new DefaultPasswordProvider("secret"), + null, new HttpInputSourceConfig(null) ); TableMetadata table = TableBuilder.external("koala") diff --git a/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java b/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java index 67ceedf30391d..215c9e0c62e66 100644 --- a/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java +++ b/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java @@ -98,6 +98,7 @@ public void testNoFormatWithURI() throws URISyntaxException Collections.singletonList(new URI("http://example.com/file.csv")), null, null, + null, new HttpInputSourceConfig(null) ); TableMetadata table = TableBuilder.external("foo") @@ -117,6 +118,7 @@ public void testNoColumnsWithUri() throws URISyntaxException Collections.singletonList(new URI("http://example.com/file.csv")), null, null, + null, new HttpInputSourceConfig(null) ); TableMetadata table = TableBuilder.external("foo") @@ -147,6 +149,7 @@ public void testURIAndTemplate() throws URISyntaxException Collections.singletonList(new URI("http://example.com/file.csv")), null, null, + null, new HttpInputSourceConfig(null) ); TableMetadata table = TableBuilder.external("foo") @@ -212,6 +215,7 @@ public void testFullTableSpecHappyPath() throws URISyntaxException Collections.singletonList(new URI("http://foo.com/my.csv")), "bob", new DefaultPasswordProvider("secret"), + null, new HttpInputSourceConfig(null) ); TableMetadata table = TableBuilder.external("foo") @@ -339,6 +343,7 @@ public void testTemplateSpecWithoutFormatHappyPath() throws URISyntaxException Collections.singletonList(new URI("http://foo.com/my.csv")), // removed "bob", new DefaultPasswordProvider("secret"), + null, new HttpInputSourceConfig(null) ); TableMetadata table = TableBuilder.external("foo") @@ -376,6 +381,7 @@ public void testMultipleURIsInTableSpec() throws URISyntaxException Arrays.asList(new URI("http://foo.com/foo.csv"), new URI("http://foo.com/bar.csv")), "bob", new EnvironmentVariablePasswordProvider("SECRET"), + null, new HttpInputSourceConfig(null) ); TableMetadata table = TableBuilder.external("foo") @@ -408,6 +414,7 @@ public void testMultipleURIsWithTemplate() throws URISyntaxException Collections.singletonList(new 
URI("http://foo.com/my.csv")), // removed "bob", new DefaultPasswordProvider("secret"), + null, new HttpInputSourceConfig(null) ); TableMetadata table = TableBuilder.external("foo") @@ -476,6 +483,7 @@ public void testEnvPassword() throws URISyntaxException Collections.singletonList(new URI("http://foo.com/my.csv")), "bob", new EnvironmentVariablePasswordProvider("SECRET"), + null, new HttpInputSourceConfig(null) ); TableMetadata table = TableBuilder.external("foo") diff --git a/server/src/test/java/org/apache/druid/catalog/model/table/LocalInputSourceDefnTest.java b/server/src/test/java/org/apache/druid/catalog/model/table/LocalInputSourceDefnTest.java index 1bbc74513b2f2..3994bf0112582 100644 --- a/server/src/test/java/org/apache/druid/catalog/model/table/LocalInputSourceDefnTest.java +++ b/server/src/test/java/org/apache/druid/catalog/model/table/LocalInputSourceDefnTest.java @@ -144,7 +144,8 @@ public void testValidateFilesWithFormat() LocalInputSource inputSource = new LocalInputSource( null, null, - Collections.singletonList(new File("/tmp/myFile.csv")) + Collections.singletonList(new File("/tmp/myFile.csv")), + null ); TableMetadata table = TableBuilder.external("foo") .inputSource(toMap(inputSource)) @@ -314,6 +315,7 @@ public void testFullyDefinedBaseDirAndPattern() LocalInputSource inputSource = new LocalInputSource( new File("/tmp"), "*.csv", + null, null ); TableMetadata table = TableBuilder.external("foo") @@ -365,7 +367,8 @@ public void testFullyDefinedFiles() LocalInputSource inputSource = new LocalInputSource( null, null, - files + files, + null ); TableMetadata table = TableBuilder.external("foo") .inputSource(toMap(inputSource)) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java index a3ab06ab287b5..0b2131197d369 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java @@ -28,6 +28,7 @@ import org.apache.druid.data.input.impl.HttpInputSource; import org.apache.druid.data.input.impl.HttpInputSourceConfig; import org.apache.druid.data.input.impl.LocalInputSource; +import org.apache.druid.data.input.impl.systemfield.SystemFields; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.UOE; @@ -78,6 +79,7 @@ protected static URI toURI(String uri) Collections.singletonList(toURI("http://foo.com/bar.csv")), "bob", new DefaultPasswordProvider("secret"), + SystemFields.none(), new HttpInputSourceConfig(null) ), new CsvInputFormat(ImmutableList.of("x", "y", "z"), null, false, false, 0), @@ -250,6 +252,7 @@ public void testHttpFn2() Arrays.asList(toURI("http://example.com/foo.csv"), toURI("http://example.com/bar.csv")), "bob", new DefaultPasswordProvider("secret"), + SystemFields.none(), new HttpInputSourceConfig(null) ), new CsvInputFormat(ImmutableList.of("timestamp", "isRobot"), null, false, false, 0), @@ -392,6 +395,7 @@ public void testHttpJson() Collections.singletonList(toURI("http://foo.com/bar.json")), "bob", new DefaultPasswordProvider("secret"), + SystemFields.none(), new HttpInputSourceConfig(null) ), new CsvInputFormat(ImmutableList.of("x", "y", "z"), null, false, false, 0), @@ -546,7 +550,8 @@ public void testInlineFn() new LocalInputSource( null, null, - Arrays.asList(new File("/tmp/foo.csv"), new File("/tmp/bar.csv")) + Arrays.asList(new 
File("/tmp/foo.csv"), new File("/tmp/bar.csv")), + SystemFields.none() ), new CsvInputFormat(ImmutableList.of("x", "y", "z"), null, false, false, 0), RowSignature.builder()