Skip to content
This repository has been archived by the owner on Sep 11, 2024. It is now read-only.

Add support for autodiscovering default credentials #292

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -569,14 +569,13 @@ topics=topic1,topic2
# Required.
gcs.bucket.name=my-gcs-bucket

## The following two options are used to specify GCP credentials.
## The following three options are used to specify GCP credentials.
## See the overview of GCP authentication:
## - https://cloud.google.com/docs/authentication/
## - https://cloud.google.com/docs/authentication/production
## If they both are not present, the connector will try to detect
## the credentials automatically.
## If none are present, the connector will default to trying to connect without credentials.
## If only one is present, the connector will use it to get the credentials.
## If both are present, this is an error.
## If more than one is present, this is an error.

# The path to a GCP credentials file.
# Optional, the default is null.
Expand All @@ -586,6 +585,8 @@ gcs.credentials.path=/some/path/google_credentials.json
# Optional, the default is null.
gcs.credentials.json={"type":"...", ...}

# Autodiscover GCP Credentials from the execution environment
gcs.credentials.default=true
##


Expand Down
49 changes: 38 additions & 11 deletions src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Objects;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
Expand All @@ -39,6 +40,7 @@
import io.aiven.kafka.connect.common.config.TimestampSource;
import io.aiven.kafka.connect.common.config.validators.FilenameTemplateValidator;

import com.google.auth.oauth2.GoogleCredentials;
import com.google.auth.oauth2.OAuth2Credentials;
import com.google.cloud.NoCredentials;
import org.slf4j.Logger;
Expand All @@ -53,6 +55,7 @@ public final class GcsSinkConfig extends AivenCommonConfig {
public static final String GCS_CREDENTIALS_PATH_CONFIG = "gcs.credentials.path";
public static final String GCS_ENDPOINT_CONFIG = "gcs.endpoint";
public static final String GCS_CREDENTIALS_JSON_CONFIG = "gcs.credentials.json";
public static final String GCS_CREDENTIALS_DEFAULT_CONFIG = "gcs.credentials.default";
public static final String GCS_BUCKET_NAME_CONFIG = "gcs.bucket.name";
public static final String GCS_USER_AGENT = "gcs.user.agent";
private static final String GROUP_FILE = "File";
Expand Down Expand Up @@ -91,6 +94,9 @@ public final class GcsSinkConfig extends AivenCommonConfig {

public static final String NAME_CONFIG = "name";

// the maximum number of allowable credential configurations that can be defined at a single time.
private static final int MAX_ALLOWED_CREDENTIAL_CONFIGS = 1;

public static ConfigDef configDef() {
final GcsSinkConfigDef configDef = new GcsSinkConfigDef();
addGcsConfigGroup(configDef);
Expand All @@ -113,17 +119,22 @@ private static void addGcsConfigGroup(final ConfigDef configDef) {
"Explicit GCS Endpoint Address, mainly for testing", GROUP_GCS, gcsGroupCounter++, ConfigDef.Width.NONE,
GCS_ENDPOINT_CONFIG);
configDef.define(GCS_CREDENTIALS_PATH_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.LOW,
"The path to a GCP credentials file. "
+ "If not provided, the connector will try to detect the credentials automatically. "
+ "Cannot be set together with \"" + GCS_CREDENTIALS_JSON_CONFIG + "\"",
"The path to a GCP credentials file. Cannot be set together with \"" + GCS_CREDENTIALS_JSON_CONFIG
+ " or \"" + GCS_CREDENTIALS_DEFAULT_CONFIG + "\"",
GROUP_GCS, gcsGroupCounter++, ConfigDef.Width.NONE, GCS_CREDENTIALS_PATH_CONFIG);

configDef.define(GCS_CREDENTIALS_JSON_CONFIG, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.LOW,
"GCP credentials as a JSON string. "
+ "If not provided, the connector will try to detect the credentials automatically. "
+ "Cannot be set together with \"" + GCS_CREDENTIALS_PATH_CONFIG + "\"",
"GCP credentials as a JSON string. Cannot be set together with \"" + GCS_CREDENTIALS_PATH_CONFIG
+ " or \"" + GCS_CREDENTIALS_DEFAULT_CONFIG + "\"",
GROUP_GCS, gcsGroupCounter++, ConfigDef.Width.NONE, GCS_CREDENTIALS_JSON_CONFIG);

configDef.define(GCS_CREDENTIALS_DEFAULT_CONFIG, ConfigDef.Type.BOOLEAN, null, ConfigDef.Importance.LOW,
"Whether to connect using default the GCP SDK default credential discovery. When set to"
+ "null (the default) or false, will fall back to connecting with No Credentials."
+ "Cannot be set together with \"" + GCS_CREDENTIALS_JSON_CONFIG + "\" or \""
+ GCS_CREDENTIALS_PATH_CONFIG + "\"",
GROUP_GCS, gcsGroupCounter++, ConfigDef.Width.NONE, GCS_CREDENTIALS_DEFAULT_CONFIG);

configDef.define(GCS_BUCKET_NAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE,
new ConfigDef.NonEmptyString(), ConfigDef.Importance.HIGH,
"The GCS bucket name to store output files in.", GROUP_GCS, gcsGroupCounter++, ConfigDef.Width.NONE, // NOPMD
Expand Down Expand Up @@ -315,21 +326,37 @@ static Map<String, String> handleDeprecatedYyyyUppercase(final Map<String, Strin
private void validate() {
final String credentialsPath = getString(GCS_CREDENTIALS_PATH_CONFIG);
final Password credentialsJson = getPassword(GCS_CREDENTIALS_JSON_CONFIG);
if (credentialsPath != null && credentialsJson != null) {
final String msg = String.format("\"%s\" and \"%s\" are mutually exclusive options, but both are set.",
GCS_CREDENTIALS_PATH_CONFIG, GCS_CREDENTIALS_JSON_CONFIG);
throw new ConfigException(msg);
final Boolean defaultCredentials = getBoolean(GCS_CREDENTIALS_DEFAULT_CONFIG);

final long nonNulls = Stream.of(defaultCredentials, credentialsJson, credentialsPath)
.filter(Objects::nonNull)
.count();

// only validate non nulls here, since all nulls means falling back to the default "no credential" behavour.
if (nonNulls > MAX_ALLOWED_CREDENTIAL_CONFIGS) {
throw new ConfigException(String.format("Only one of %s, %s, and %s can be non-null.",
GCS_CREDENTIALS_DEFAULT_CONFIG, GCS_CREDENTIALS_JSON_CONFIG, GCS_CREDENTIALS_PATH_CONFIG));
}
}

public OAuth2Credentials getCredentials() {
final String credentialsPath = getString(GCS_CREDENTIALS_PATH_CONFIG);
final Password credentialsJsonPwd = getPassword(GCS_CREDENTIALS_JSON_CONFIG);
if (credentialsPath == null && credentialsJsonPwd == null) {
final Boolean defaultCredentials = getBoolean(GCS_CREDENTIALS_DEFAULT_CONFIG);

// if we've got no path, json and not configured to use default credentials, fall back to connecting without
// any credentials at all.
if (credentialsPath == null && credentialsJsonPwd == null
&& (defaultCredentials == null || !defaultCredentials)) {
LOG.warn("No GCS credentials provided, trying to connect without credentials.");
return NoCredentials.getInstance();
}

try {
if (Boolean.TRUE.equals(defaultCredentials)) {
return GoogleCredentials.getApplicationDefault();
}

String credentialsJson = null;
if (credentialsJsonPwd != null) {
credentialsJson = credentialsJsonPwd.value();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Arrays;
Expand All @@ -51,8 +48,6 @@
import io.aiven.kafka.connect.common.templating.VariableTemplatePart;
import io.aiven.kafka.connect.gcs.GcsSinkConfig;

import com.google.auth.oauth2.UserCredentials;
import com.google.common.io.Resources;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.NullSource;
Expand Down Expand Up @@ -348,58 +343,6 @@ void unsupportedOutputField() {
assertEquals(expectedErrorMessage, throwable.getMessage());
}

@Test
void gcsCredentialsPath() {
final Map<String, String> properties = Map.of("gcs.bucket.name", "test-bucket", "gcs.credentials.path",
Thread.currentThread().getContextClassLoader().getResource("test_gcs_credentials.json").getPath());

assertConfigDefValidationPasses(properties);

final GcsSinkConfig config = new GcsSinkConfig(properties);
final UserCredentials credentials = (UserCredentials) config.getCredentials();
assertEquals("test-client-id", credentials.getClientId());
assertEquals("test-client-secret", credentials.getClientSecret());
}

@Test
void gcsCredentialsJson() throws IOException {
final Map<String, String> properties = new HashMap<>();
properties.put("gcs.bucket.name", "test-bucket");

final String credentialsJson = Resources.toString(
Thread.currentThread().getContextClassLoader().getResource("test_gcs_credentials.json"),
StandardCharsets.UTF_8);
properties.put("gcs.credentials.json", credentialsJson);

assertConfigDefValidationPasses(properties);

final GcsSinkConfig config = new GcsSinkConfig(properties);
final UserCredentials credentials = (UserCredentials) config.getCredentials();
assertEquals("test-client-id", credentials.getClientId());
assertEquals("test-client-secret", credentials.getClientSecret());
}

@Test
void gcsCredentialsExclusivity() throws IOException {
final Map<String, String> properties = new HashMap<>();
properties.put("gcs.bucket.name", "test-bucket");

final URL credentialsResource = Thread.currentThread()
.getContextClassLoader()
.getResource("test_gcs_credentials.json");
final String credentialsJson = Resources.toString(credentialsResource, StandardCharsets.UTF_8);
properties.put("gcs.credentials.json", credentialsJson);
properties.put("gcs.credentials.path", credentialsResource.getPath());

// Should pass here, because ConfigDef validation doesn't check interdependencies.
assertConfigDefValidationPasses(properties);

final Throwable throwable = assertThrows(ConfigException.class, () -> new GcsSinkConfig(properties));
assertEquals(
"\"gcs.credentials.path\" and \"gcs.credentials.json\" are mutually exclusive options, but both are set.",
throwable.getMessage());
}

@Test
void connectorName() {
final Map<String, String> properties = Map.of("gcs.bucket.name", "test-bucket", "name", "test-connector");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
/*
* Copyright 2020 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.gcs.config;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigValue;

import io.aiven.kafka.connect.gcs.GcsSinkConfig;

import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.auth.oauth2.OAuth2Credentials;
import com.google.auth.oauth2.UserCredentials;
import com.google.cloud.NoCredentials;
import com.google.common.io.Resources;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.MockedStatic;

/**
* Tests {@link GcsSinkConfig} class.
*/
final class GcsSinkCredentialsConfigTest {
@ParameterizedTest
@ValueSource(strings = { "", "{{topic}}", "{{partition}}", "{{start_offset}}", "{{topic}}-{{partition}}",
"{{topic}}-{{start_offset}}", "{{partition}}-{{start_offset}}",
"{{topic}}-{{partition}}-{{start_offset}}-{{unknown}}" })
void incorrectFilenameTemplates(final String template) {
final Map<String, String> properties = Map.of("file.name.template", template, "gcs.bucket.name", "some-bucket");

final ConfigValue configValue = GcsSinkConfig.configDef()
.validate(properties)
.stream()
.filter(x -> GcsSinkConfig.FILE_NAME_TEMPLATE_CONFIG.equals(x.name()))
.findFirst()
.get();
assertFalse(configValue.errorMessages().isEmpty());

final var throwable = assertThrows(ConfigException.class, () -> new GcsSinkConfig(properties));
assertTrue(throwable.getMessage().startsWith("Invalid value "));
}

@Test
void gcsCredentialsPath() {
final Map<String, String> properties = Map.of("gcs.bucket.name", "test-bucket", "gcs.credentials.path",
Thread.currentThread().getContextClassLoader().getResource("test_gcs_credentials.json").getPath());

assertConfigDefValidationPasses(properties);

final GcsSinkConfig config = new GcsSinkConfig(properties);
final UserCredentials credentials = (UserCredentials) config.getCredentials();
assertEquals("test-client-id", credentials.getClientId());
assertEquals("test-client-secret", credentials.getClientSecret());
}

@Test
void gcsCredentialsJson() throws IOException {
final Map<String, String> properties = new HashMap<>();
properties.put("gcs.bucket.name", "test-bucket");

final String credentialsJson = Resources.toString(
Thread.currentThread().getContextClassLoader().getResource("test_gcs_credentials.json"),
StandardCharsets.UTF_8);
properties.put("gcs.credentials.json", credentialsJson);

assertConfigDefValidationPasses(properties);

final GcsSinkConfig config = new GcsSinkConfig(properties);
final UserCredentials credentials = (UserCredentials) config.getCredentials();
assertEquals("test-client-id", credentials.getClientId());
assertEquals("test-client-secret", credentials.getClientSecret());
}

/**
* This test validates that the NoCredentials are used when default is specified as false. This behaviour mimics
* that of the Tiered Storage Manager.
markallanson marked this conversation as resolved.
Show resolved Hide resolved
*/
@Test
void gcsCredentialsNoCredentialsWhenDefaultCredentialsFalse() {
final Map<String, String> properties = Map.of(GcsSinkConfig.GCS_BUCKET_NAME_CONFIG, "test-bucket",
GcsSinkConfig.GCS_CREDENTIALS_DEFAULT_CONFIG, String.valueOf(false));

assertConfigDefValidationPasses(properties);

final GcsSinkConfig config = new GcsSinkConfig(properties);

final Credentials credentials = config.getCredentials();
assertEquals(NoCredentials.getInstance(), credentials);
}

/** Verifies that NoCredentials are used when no credential configurations is supplied. */
@Test
void gcsCredentialsNoCredentialsWhenNoCredentialsSupplied() {
final Map<String, String> properties = Map.of(GcsSinkConfig.GCS_BUCKET_NAME_CONFIG, "test-bucket");

assertConfigDefValidationPasses(properties);

final GcsSinkConfig config = new GcsSinkConfig(properties);

final Credentials credentials = config.getCredentials();
assertEquals(NoCredentials.getInstance(), credentials);
}

@Test
void gcsCredentialsDefault() {
final Map<String, String> properties = Map.of(GcsSinkConfig.GCS_BUCKET_NAME_CONFIG, "test-bucket",
GcsSinkConfig.GCS_CREDENTIALS_DEFAULT_CONFIG, String.valueOf(true));

assertConfigDefValidationPasses(properties);

final GcsSinkConfig config = new GcsSinkConfig(properties);

// Note that we're using a mock here since the Google credentials are not part of the environment when running
// in github actions. It's better to use a mock here and make the test self-contained than it is to make things
// more complicated and making it rely on the environment it's executing within.
try (MockedStatic<GoogleCredentials> mocked = mockStatic(GoogleCredentials.class)) {
final GoogleCredentials googleCredentials = mock(GoogleCredentials.class);
mocked.when(GoogleCredentials::getApplicationDefault).thenReturn(googleCredentials);

final OAuth2Credentials credentials = config.getCredentials();
assertEquals(googleCredentials, credentials);
}
}

@ParameterizedTest
@MethodSource("provideMoreThanOneNonNull")
void gcsCredentialsExclusivity(final Boolean defaultCredentials, final String credentialsJson,
final String credentialsPath) {
final Map<String, String> properties = new HashMap<>();
properties.put(GcsSinkConfig.GCS_BUCKET_NAME_CONFIG, "test-bucket");
properties.put(GcsSinkConfig.GCS_CREDENTIALS_DEFAULT_CONFIG,
defaultCredentials == null ? null : String.valueOf(defaultCredentials));
properties.put(GcsSinkConfig.GCS_CREDENTIALS_JSON_CONFIG, credentialsJson);
properties.put(GcsSinkConfig.GCS_CREDENTIALS_PATH_CONFIG, credentialsPath);

// Should pass here, because ConfigDef validation doesn't check interdependencies.
assertConfigDefValidationPasses(properties);

final Throwable throwable = assertThrows(ConfigException.class, () -> new GcsSinkConfig(properties));
assertEquals(
"Only one of gcs.credentials.default, gcs.credentials.json, and gcs.credentials.path can be non-null.",
throwable.getMessage());
}

private static Stream<Arguments> provideMoreThanOneNonNull() {
return Stream.of(Arguments.of(true, "json", "path"), Arguments.of(false, "json", "path"),
Arguments.of(true, "json", null), Arguments.of(false, "json", null), Arguments.of(true, null, "path"),
Arguments.of(false, null, "path"), Arguments.of(null, "json", "path"));
}

private void assertConfigDefValidationPasses(final Map<String, String> properties) {
for (final ConfigValue configValue : GcsSinkConfig.configDef().validate(properties)) {
assertTrue(configValue.errorMessages().isEmpty());
}
}
}