diff --git a/README.md b/README.md index 73ede38..c18ab96 100644 --- a/README.md +++ b/README.md @@ -569,14 +569,13 @@ topics=topic1,topic2 # Required. gcs.bucket.name=my-gcs-bucket -## The following two options are used to specify GCP credentials. +## The following three options are used to specify GCP credentials. ## See the overview of GCP authentication: ## - https://cloud.google.com/docs/authentication/ ## - https://cloud.google.com/docs/authentication/production -## If they both are not present, the connector will try to detect -## the credentials automatically. +## If none are present, the connector will default to trying to connect without credentials. ## If only one is present, the connector will use it to get the credentials. -## If both are present, this is an error. +## If more than one is present, this is an error. # The path to a GCP credentials file. # Optional, the default is null. @@ -586,6 +585,8 @@ gcs.credentials.path=/some/path/google_credentials.json # Optional, the default is null. gcs.credentials.json={"type":"...", ...} +# Autodiscover GCP Credentials from the execution environment +gcs.credentials.default=true ## diff --git a/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java b/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java index 17d2b7f..e53ea0b 100644 --- a/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java +++ b/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java @@ -25,6 +25,7 @@ import java.util.Objects; import java.util.regex.Pattern; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; @@ -53,6 +54,7 @@ public final class GcsSinkConfig extends AivenCommonConfig { public static final String GCS_CREDENTIALS_PATH_CONFIG = "gcs.credentials.path"; public static final String GCS_ENDPOINT_CONFIG = "gcs.endpoint"; public static final String GCS_CREDENTIALS_JSON_CONFIG = "gcs.credentials.json"; + public static final String GCS_CREDENTIALS_DEFAULT_CONFIG = "gcs.credentials.default"; public static final String GCS_BUCKET_NAME_CONFIG = "gcs.bucket.name"; public static final String GCS_USER_AGENT = "gcs.user.agent"; private static final String GROUP_FILE = "File"; @@ -91,6 +93,9 @@ public final class GcsSinkConfig extends AivenCommonConfig { public static final String NAME_CONFIG = "name"; + // the maximum number of allowable credential configurations that can be defined at a single time. + private static final int MAX_ALLOWED_CREDENTIAL_CONFIGS = 1; + public static ConfigDef configDef() { final GcsSinkConfigDef configDef = new GcsSinkConfigDef(); addGcsConfigGroup(configDef); @@ -113,17 +118,22 @@ private static void addGcsConfigGroup(final ConfigDef configDef) { "Explicit GCS Endpoint Address, mainly for testing", GROUP_GCS, gcsGroupCounter++, ConfigDef.Width.NONE, GCS_ENDPOINT_CONFIG); configDef.define(GCS_CREDENTIALS_PATH_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.LOW, - "The path to a GCP credentials file. " - + "If not provided, the connector will try to detect the credentials automatically. " - + "Cannot be set together with \"" + GCS_CREDENTIALS_JSON_CONFIG + "\"", + "The path to a GCP credentials file. " + "Cannot be set together with \"" + GCS_CREDENTIALS_JSON_CONFIG + + "\" " + "or \"" + GCS_CREDENTIALS_DEFAULT_CONFIG + "\"", GROUP_GCS, gcsGroupCounter++, ConfigDef.Width.NONE, GCS_CREDENTIALS_PATH_CONFIG); configDef.define(GCS_CREDENTIALS_JSON_CONFIG, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.LOW, - "GCP credentials as a JSON string. " - + "If not provided, the connector will try to detect the credentials automatically. " - + "Cannot be set together with \"" + GCS_CREDENTIALS_PATH_CONFIG + "\"", + "GCP credentials as a JSON string. " + "Cannot be set together with \"" + GCS_CREDENTIALS_PATH_CONFIG + + "\" " + "or \"" + GCS_CREDENTIALS_DEFAULT_CONFIG + "\"", GROUP_GCS, gcsGroupCounter++, ConfigDef.Width.NONE, GCS_CREDENTIALS_JSON_CONFIG); + configDef.define(GCS_CREDENTIALS_DEFAULT_CONFIG, ConfigDef.Type.BOOLEAN, null, ConfigDef.Importance.LOW, + "Whether to connect using default the GCP SDK default credential discovery. When set to" + + "null (the default) or false, will fall back to connecting with No Credentials." + + "Cannot be set together with \"" + GCS_CREDENTIALS_JSON_CONFIG + "\" " + "or \"" + + GCS_CREDENTIALS_PATH_CONFIG + "\"", + GROUP_GCS, gcsGroupCounter++, ConfigDef.Width.NONE, GCS_CREDENTIALS_DEFAULT_CONFIG); + configDef.define(GCS_BUCKET_NAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, new ConfigDef.NonEmptyString(), ConfigDef.Importance.HIGH, "The GCS bucket name to store output files in.", GROUP_GCS, gcsGroupCounter++, ConfigDef.Width.NONE, // NOPMD @@ -315,25 +325,39 @@ static Map handleDeprecatedYyyyUppercase(final Map MAX_ALLOWED_CREDENTIAL_CONFIGS) { + throw new ConfigException(String.format("Only one of %s, %s, and %s can be non-null.", + GCS_CREDENTIALS_DEFAULT_CONFIG, GCS_CREDENTIALS_JSON_CONFIG, GCS_CREDENTIALS_PATH_CONFIG)); } } public OAuth2Credentials getCredentials() { final String credentialsPath = getString(GCS_CREDENTIALS_PATH_CONFIG); final Password credentialsJsonPwd = getPassword(GCS_CREDENTIALS_JSON_CONFIG); - if (credentialsPath == null && credentialsJsonPwd == null) { + final Boolean defaultCredentials = getBoolean(GCS_CREDENTIALS_DEFAULT_CONFIG); + + // if we've got no path, json and not configured to use default credentials, fall back to connecting without + // any credentials at all. + if (credentialsPath == null && credentialsJsonPwd == null + && (defaultCredentials == null || !defaultCredentials)) { LOG.warn("No GCS credentials provided, trying to connect without credentials."); return NoCredentials.getInstance(); } + try { String credentialsJson = null; if (credentialsJsonPwd != null) { credentialsJson = credentialsJsonPwd.value(); } + // in the case where both path and json are empty the GoogleCredentialsBuilder will fall back to + // connecting with default credential discovery. return GoogleCredentialsBuilder.build(credentialsPath, credentialsJson); } catch (final Exception e) { // NOPMD broad exception catched throw new ConfigException("Failed to create GCS credentials: " + e.getMessage()); diff --git a/src/test/java/io/aiven/kafka/connect/gcs/config/GcsSinkConfigTest.java b/src/test/java/io/aiven/kafka/connect/gcs/config/GcsSinkConfigTest.java index adaeb9d..1b1560b 100644 --- a/src/test/java/io/aiven/kafka/connect/gcs/config/GcsSinkConfigTest.java +++ b/src/test/java/io/aiven/kafka/connect/gcs/config/GcsSinkConfigTest.java @@ -24,9 +24,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import java.io.IOException; -import java.net.URL; -import java.nio.charset.StandardCharsets; import java.time.ZoneId; import java.time.ZoneOffset; import java.util.Arrays; @@ -51,8 +48,6 @@ import io.aiven.kafka.connect.common.templating.VariableTemplatePart; import io.aiven.kafka.connect.gcs.GcsSinkConfig; -import com.google.auth.oauth2.UserCredentials; -import com.google.common.io.Resources; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.NullSource; @@ -348,58 +343,6 @@ void unsupportedOutputField() { assertEquals(expectedErrorMessage, throwable.getMessage()); } - @Test - void gcsCredentialsPath() { - final Map properties = Map.of("gcs.bucket.name", "test-bucket", "gcs.credentials.path", - Thread.currentThread().getContextClassLoader().getResource("test_gcs_credentials.json").getPath()); - - assertConfigDefValidationPasses(properties); - - final GcsSinkConfig config = new GcsSinkConfig(properties); - final UserCredentials credentials = (UserCredentials) config.getCredentials(); - assertEquals("test-client-id", credentials.getClientId()); - assertEquals("test-client-secret", credentials.getClientSecret()); - } - - @Test - void gcsCredentialsJson() throws IOException { - final Map properties = new HashMap<>(); - properties.put("gcs.bucket.name", "test-bucket"); - - final String credentialsJson = Resources.toString( - Thread.currentThread().getContextClassLoader().getResource("test_gcs_credentials.json"), - StandardCharsets.UTF_8); - properties.put("gcs.credentials.json", credentialsJson); - - assertConfigDefValidationPasses(properties); - - final GcsSinkConfig config = new GcsSinkConfig(properties); - final UserCredentials credentials = (UserCredentials) config.getCredentials(); - assertEquals("test-client-id", credentials.getClientId()); - assertEquals("test-client-secret", credentials.getClientSecret()); - } - - @Test - void gcsCredentialsExclusivity() throws IOException { - final Map properties = new HashMap<>(); - properties.put("gcs.bucket.name", "test-bucket"); - - final URL credentialsResource = Thread.currentThread() - .getContextClassLoader() - .getResource("test_gcs_credentials.json"); - final String credentialsJson = Resources.toString(credentialsResource, StandardCharsets.UTF_8); - properties.put("gcs.credentials.json", credentialsJson); - properties.put("gcs.credentials.path", credentialsResource.getPath()); - - // Should pass here, because ConfigDef validation doesn't check interdependencies. - assertConfigDefValidationPasses(properties); - - final Throwable throwable = assertThrows(ConfigException.class, () -> new GcsSinkConfig(properties)); - assertEquals( - "\"gcs.credentials.path\" and \"gcs.credentials.json\" are mutually exclusive options, but both are set.", - throwable.getMessage()); - } - @Test void connectorName() { final Map properties = Map.of("gcs.bucket.name", "test-bucket", "name", "test-connector"); diff --git a/src/test/java/io/aiven/kafka/connect/gcs/config/GcsSinkCredentialsConfigTest.java b/src/test/java/io/aiven/kafka/connect/gcs/config/GcsSinkCredentialsConfigTest.java new file mode 100644 index 0000000..3c30923 --- /dev/null +++ b/src/test/java/io/aiven/kafka/connect/gcs/config/GcsSinkCredentialsConfigTest.java @@ -0,0 +1,175 @@ +/* + * Copyright 2020 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.gcs.config; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Stream; + +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.config.ConfigValue; + +import io.aiven.kafka.connect.gcs.GcsSinkConfig; + +import com.google.auth.Credentials; +import com.google.auth.oauth2.GoogleCredentials; +import com.google.auth.oauth2.OAuth2Credentials; +import com.google.auth.oauth2.UserCredentials; +import com.google.cloud.NoCredentials; +import com.google.common.io.Resources; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; + +/** + * Tests {@link GcsSinkConfig} class. + */ +final class GcsSinkCredentialsConfigTest { + @ParameterizedTest + @ValueSource(strings = { "", "{{topic}}", "{{partition}}", "{{start_offset}}", "{{topic}}-{{partition}}", + "{{topic}}-{{start_offset}}", "{{partition}}-{{start_offset}}", + "{{topic}}-{{partition}}-{{start_offset}}-{{unknown}}" }) + void incorrectFilenameTemplates(final String template) { + final Map properties = Map.of("file.name.template", template, "gcs.bucket.name", "some-bucket"); + + final ConfigValue configValue = GcsSinkConfig.configDef() + .validate(properties) + .stream() + .filter(x -> GcsSinkConfig.FILE_NAME_TEMPLATE_CONFIG.equals(x.name())) + .findFirst() + .get(); + assertFalse(configValue.errorMessages().isEmpty()); + + final var throwable = assertThrows(ConfigException.class, () -> new GcsSinkConfig(properties)); + assertTrue(throwable.getMessage().startsWith("Invalid value ")); + } + + @Test + void gcsCredentialsPath() { + final Map properties = Map.of("gcs.bucket.name", "test-bucket", "gcs.credentials.path", + Thread.currentThread().getContextClassLoader().getResource("test_gcs_credentials.json").getPath()); + + assertConfigDefValidationPasses(properties); + + final GcsSinkConfig config = new GcsSinkConfig(properties); + final UserCredentials credentials = (UserCredentials) config.getCredentials(); + assertEquals("test-client-id", credentials.getClientId()); + assertEquals("test-client-secret", credentials.getClientSecret()); + } + + @Test + void gcsCredentialsJson() throws IOException { + final Map properties = new HashMap<>(); + properties.put("gcs.bucket.name", "test-bucket"); + + final String credentialsJson = Resources.toString( + Thread.currentThread().getContextClassLoader().getResource("test_gcs_credentials.json"), + StandardCharsets.UTF_8); + properties.put("gcs.credentials.json", credentialsJson); + + assertConfigDefValidationPasses(properties); + + final GcsSinkConfig config = new GcsSinkConfig(properties); + final UserCredentials credentials = (UserCredentials) config.getCredentials(); + assertEquals("test-client-id", credentials.getClientId()); + assertEquals("test-client-secret", credentials.getClientSecret()); + } + + /** + * This test validates that the NoCredentials are used when default is specified as false. This behaviour mimics + * that of the Tiered Storage Manager. + */ + @Test + void gcsCredentialsNoCredentialsWhenDefaultCredentialsFalse() { + final Map properties = Map.of(GcsSinkConfig.GCS_BUCKET_NAME_CONFIG, "test-bucket", + GcsSinkConfig.GCS_CREDENTIALS_DEFAULT_CONFIG, String.valueOf(false)); + + assertConfigDefValidationPasses(properties); + + final GcsSinkConfig config = new GcsSinkConfig(properties); + + final Credentials credentials = config.getCredentials(); + assertEquals(NoCredentials.getInstance(), credentials); + } + + /** Verifies that NoCredentials are used when no credential configurations is supplied. */ + @Test + void gcsCredentialsNoCredentialsWhenNoCredentialsSupplied() { + final Map properties = Map.of(GcsSinkConfig.GCS_BUCKET_NAME_CONFIG, "test-bucket"); + + assertConfigDefValidationPasses(properties); + + final GcsSinkConfig config = new GcsSinkConfig(properties); + + final Credentials credentials = config.getCredentials(); + assertEquals(NoCredentials.getInstance(), credentials); + } + + @Test + void gcsCredentialsDefault() throws IOException { + final Map properties = Map.of(GcsSinkConfig.GCS_BUCKET_NAME_CONFIG, "test-bucket", + GcsSinkConfig.GCS_CREDENTIALS_DEFAULT_CONFIG, String.valueOf(true)); + + assertConfigDefValidationPasses(properties); + + final GcsSinkConfig config = new GcsSinkConfig(properties); + + final OAuth2Credentials credentials = config.getCredentials(); + assertEquals(GoogleCredentials.getApplicationDefault(), credentials); + } + + @ParameterizedTest + @MethodSource("provideMoreThanOneNonNull") + void gcsCredentialsExclusivity(final Boolean defaultCredentials, final String credentialsJson, + final String credentialsPath) { + final Map properties = new HashMap<>(); + properties.put(GcsSinkConfig.GCS_BUCKET_NAME_CONFIG, "test-bucket"); + properties.put(GcsSinkConfig.GCS_CREDENTIALS_DEFAULT_CONFIG, + defaultCredentials == null ? null : String.valueOf(defaultCredentials)); + properties.put(GcsSinkConfig.GCS_CREDENTIALS_JSON_CONFIG, credentialsJson); + properties.put(GcsSinkConfig.GCS_CREDENTIALS_PATH_CONFIG, credentialsPath); + + // Should pass here, because ConfigDef validation doesn't check interdependencies. + assertConfigDefValidationPasses(properties); + + final Throwable throwable = assertThrows(ConfigException.class, () -> new GcsSinkConfig(properties)); + assertEquals( + "Only one of gcs.credentials.default, gcs.credentials.json, and gcs.credentials.path can be non-null.", + throwable.getMessage()); + } + + private static Stream provideMoreThanOneNonNull() { + return Stream.of(Arguments.of(true, "json", "path"), Arguments.of(false, "json", "path"), + Arguments.of(true, "json", null), Arguments.of(false, "json", null), Arguments.of(true, null, "path"), + Arguments.of(false, null, "path"), Arguments.of(null, "json", "path")); + } + + private void assertConfigDefValidationPasses(final Map properties) { + for (final ConfigValue configValue : GcsSinkConfig.configDef().validate(properties)) { + assertTrue(configValue.errorMessages().isEmpty()); + } + } +}