Skip to content
This repository has been archived by the owner on Sep 11, 2024. It is now read-only.

Commit

Permalink
Restore support for autodiscovering credentials
Browse files Browse the repository at this point in the history
PR #228 removed support for autodiscovered credentials by replacing the same configuration
with a NoCredentials implementation.

This change introduces a new property, `gcs.credentials.default` which matches that
supported by the TieredStorageManager for configuring autodiscovery of default credentials.

When this property is set to `false`, or omitted entirely the NoCredentials option will be
used, but when it is defined and set to `true` it will autodiscover credentials.

Closes #291
  • Loading branch information
markallanson committed Jan 12, 2024
1 parent c971c32 commit 323f706
Show file tree
Hide file tree
Showing 4 changed files with 215 additions and 72 deletions.
9 changes: 5 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -569,14 +569,13 @@ topics=topic1,topic2
# Required.
gcs.bucket.name=my-gcs-bucket

## The following two options are used to specify GCP credentials.
## The following three options are used to specify GCP credentials.
## See the overview of GCP authentication:
## - https://cloud.google.com/docs/authentication/
## - https://cloud.google.com/docs/authentication/production
## If they both are not present, the connector will try to detect
## the credentials automatically.
## If none are present, the connector will default to trying to connect without credentials.
## If only one is present, the connector will use it to get the credentials.
## If both are present, this is an error.
## If more than one is present, this is an error.

# The path to a GCP credentials file.
# Optional, the default is null.
Expand All @@ -586,6 +585,8 @@ gcs.credentials.path=/some/path/google_credentials.json
# Optional, the default is null.
gcs.credentials.json={"type":"...", ...}

# Autodiscover GCP Credentials from the execution environment
gcs.credentials.default=true
##


Expand Down
46 changes: 35 additions & 11 deletions src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Objects;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
Expand Down Expand Up @@ -53,6 +54,7 @@ public final class GcsSinkConfig extends AivenCommonConfig {
public static final String GCS_CREDENTIALS_PATH_CONFIG = "gcs.credentials.path";
public static final String GCS_ENDPOINT_CONFIG = "gcs.endpoint";
public static final String GCS_CREDENTIALS_JSON_CONFIG = "gcs.credentials.json";
public static final String GCS_CREDENTIALS_DEFAULT_CONFIG = "gcs.credentials.default";
public static final String GCS_BUCKET_NAME_CONFIG = "gcs.bucket.name";
public static final String GCS_USER_AGENT = "gcs.user.agent";
private static final String GROUP_FILE = "File";
Expand Down Expand Up @@ -91,6 +93,9 @@ public final class GcsSinkConfig extends AivenCommonConfig {

public static final String NAME_CONFIG = "name";

// the maximum number of allowable credential configurations that can be defined at a single time.
private static final int MAX_ALLOWED_CREDENTIAL_CONFIGS = 1;

public static ConfigDef configDef() {
final GcsSinkConfigDef configDef = new GcsSinkConfigDef();
addGcsConfigGroup(configDef);
Expand All @@ -113,17 +118,22 @@ private static void addGcsConfigGroup(final ConfigDef configDef) {
"Explicit GCS Endpoint Address, mainly for testing", GROUP_GCS, gcsGroupCounter++, ConfigDef.Width.NONE,
GCS_ENDPOINT_CONFIG);
configDef.define(GCS_CREDENTIALS_PATH_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.LOW,
"The path to a GCP credentials file. "
+ "If not provided, the connector will try to detect the credentials automatically. "
+ "Cannot be set together with \"" + GCS_CREDENTIALS_JSON_CONFIG + "\"",
"The path to a GCP credentials file. " + "Cannot be set together with \"" + GCS_CREDENTIALS_JSON_CONFIG
+ "\" " + "or \"" + GCS_CREDENTIALS_DEFAULT_CONFIG + "\"",
GROUP_GCS, gcsGroupCounter++, ConfigDef.Width.NONE, GCS_CREDENTIALS_PATH_CONFIG);

configDef.define(GCS_CREDENTIALS_JSON_CONFIG, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.LOW,
"GCP credentials as a JSON string. "
+ "If not provided, the connector will try to detect the credentials automatically. "
+ "Cannot be set together with \"" + GCS_CREDENTIALS_PATH_CONFIG + "\"",
"GCP credentials as a JSON string. " + "Cannot be set together with \"" + GCS_CREDENTIALS_PATH_CONFIG
+ "\" " + "or \"" + GCS_CREDENTIALS_DEFAULT_CONFIG + "\"",
GROUP_GCS, gcsGroupCounter++, ConfigDef.Width.NONE, GCS_CREDENTIALS_JSON_CONFIG);

configDef.define(GCS_CREDENTIALS_DEFAULT_CONFIG, ConfigDef.Type.BOOLEAN, null, ConfigDef.Importance.LOW,
"Whether to connect using default the GCP SDK default credential discovery. When set to"
+ "null (the default) or false, will fall back to connecting with No Credentials."
+ "Cannot be set together with \"" + GCS_CREDENTIALS_JSON_CONFIG + "\" " + "or \""
+ GCS_CREDENTIALS_PATH_CONFIG + "\"",
GROUP_GCS, gcsGroupCounter++, ConfigDef.Width.NONE, GCS_CREDENTIALS_DEFAULT_CONFIG);

configDef.define(GCS_BUCKET_NAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE,
new ConfigDef.NonEmptyString(), ConfigDef.Importance.HIGH,
"The GCS bucket name to store output files in.", GROUP_GCS, gcsGroupCounter++, ConfigDef.Width.NONE, // NOPMD
Expand Down Expand Up @@ -315,25 +325,39 @@ static Map<String, String> handleDeprecatedYyyyUppercase(final Map<String, Strin
private void validate() {
final String credentialsPath = getString(GCS_CREDENTIALS_PATH_CONFIG);
final Password credentialsJson = getPassword(GCS_CREDENTIALS_JSON_CONFIG);
if (credentialsPath != null && credentialsJson != null) {
final String msg = String.format("\"%s\" and \"%s\" are mutually exclusive options, but both are set.",
GCS_CREDENTIALS_PATH_CONFIG, GCS_CREDENTIALS_JSON_CONFIG);
throw new ConfigException(msg);
final Boolean defaultCredentials = getBoolean(GCS_CREDENTIALS_DEFAULT_CONFIG);

final long nonNulls = Stream.of(defaultCredentials, credentialsJson, credentialsPath)
.filter(Objects::nonNull)
.count();

// only validate non nulls here, since all nulls means falling back to the default "no credential" behavour.
if (nonNulls > MAX_ALLOWED_CREDENTIAL_CONFIGS) {
throw new ConfigException(String.format("Only one of %s, %s, and %s can be non-null.",
GCS_CREDENTIALS_DEFAULT_CONFIG, GCS_CREDENTIALS_JSON_CONFIG, GCS_CREDENTIALS_PATH_CONFIG));
}
}

public OAuth2Credentials getCredentials() {
final String credentialsPath = getString(GCS_CREDENTIALS_PATH_CONFIG);
final Password credentialsJsonPwd = getPassword(GCS_CREDENTIALS_JSON_CONFIG);
if (credentialsPath == null && credentialsJsonPwd == null) {
final Boolean defaultCredentials = getBoolean(GCS_CREDENTIALS_DEFAULT_CONFIG);

// if we've got no path, json and not configured to use default credentials, fall back to connecting without
// any credentials at all.
if (credentialsPath == null && credentialsJsonPwd == null
&& (defaultCredentials == null || !defaultCredentials)) {
LOG.warn("No GCS credentials provided, trying to connect without credentials.");
return NoCredentials.getInstance();
}

try {
String credentialsJson = null;
if (credentialsJsonPwd != null) {
credentialsJson = credentialsJsonPwd.value();
}
// in the case where both path and json are empty the GoogleCredentialsBuilder will fall back to
// connecting with default credential discovery.
return GoogleCredentialsBuilder.build(credentialsPath, credentialsJson);
} catch (final Exception e) { // NOPMD broad exception catched
throw new ConfigException("Failed to create GCS credentials: " + e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Arrays;
Expand All @@ -51,8 +48,6 @@
import io.aiven.kafka.connect.common.templating.VariableTemplatePart;
import io.aiven.kafka.connect.gcs.GcsSinkConfig;

import com.google.auth.oauth2.UserCredentials;
import com.google.common.io.Resources;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.NullSource;
Expand Down Expand Up @@ -348,58 +343,6 @@ void unsupportedOutputField() {
assertEquals(expectedErrorMessage, throwable.getMessage());
}

@Test
void gcsCredentialsPath() {
final Map<String, String> properties = Map.of("gcs.bucket.name", "test-bucket", "gcs.credentials.path",
Thread.currentThread().getContextClassLoader().getResource("test_gcs_credentials.json").getPath());

assertConfigDefValidationPasses(properties);

final GcsSinkConfig config = new GcsSinkConfig(properties);
final UserCredentials credentials = (UserCredentials) config.getCredentials();
assertEquals("test-client-id", credentials.getClientId());
assertEquals("test-client-secret", credentials.getClientSecret());
}

@Test
void gcsCredentialsJson() throws IOException {
final Map<String, String> properties = new HashMap<>();
properties.put("gcs.bucket.name", "test-bucket");

final String credentialsJson = Resources.toString(
Thread.currentThread().getContextClassLoader().getResource("test_gcs_credentials.json"),
StandardCharsets.UTF_8);
properties.put("gcs.credentials.json", credentialsJson);

assertConfigDefValidationPasses(properties);

final GcsSinkConfig config = new GcsSinkConfig(properties);
final UserCredentials credentials = (UserCredentials) config.getCredentials();
assertEquals("test-client-id", credentials.getClientId());
assertEquals("test-client-secret", credentials.getClientSecret());
}

@Test
void gcsCredentialsExclusivity() throws IOException {
final Map<String, String> properties = new HashMap<>();
properties.put("gcs.bucket.name", "test-bucket");

final URL credentialsResource = Thread.currentThread()
.getContextClassLoader()
.getResource("test_gcs_credentials.json");
final String credentialsJson = Resources.toString(credentialsResource, StandardCharsets.UTF_8);
properties.put("gcs.credentials.json", credentialsJson);
properties.put("gcs.credentials.path", credentialsResource.getPath());

// Should pass here, because ConfigDef validation doesn't check interdependencies.
assertConfigDefValidationPasses(properties);

final Throwable throwable = assertThrows(ConfigException.class, () -> new GcsSinkConfig(properties));
assertEquals(
"\"gcs.credentials.path\" and \"gcs.credentials.json\" are mutually exclusive options, but both are set.",
throwable.getMessage());
}

@Test
void connectorName() {
final Map<String, String> properties = Map.of("gcs.bucket.name", "test-bucket", "name", "test-connector");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Copyright 2020 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.gcs.config;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigValue;

import io.aiven.kafka.connect.gcs.GcsSinkConfig;

import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.auth.oauth2.OAuth2Credentials;
import com.google.auth.oauth2.UserCredentials;
import com.google.cloud.NoCredentials;
import com.google.common.io.Resources;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

/**
* Tests {@link GcsSinkConfig} class.
*/
final class GcsSinkCredentialsConfigTest {
@ParameterizedTest
@ValueSource(strings = { "", "{{topic}}", "{{partition}}", "{{start_offset}}", "{{topic}}-{{partition}}",
"{{topic}}-{{start_offset}}", "{{partition}}-{{start_offset}}",
"{{topic}}-{{partition}}-{{start_offset}}-{{unknown}}" })
void incorrectFilenameTemplates(final String template) {
final Map<String, String> properties = Map.of("file.name.template", template, "gcs.bucket.name", "some-bucket");

final ConfigValue configValue = GcsSinkConfig.configDef()
.validate(properties)
.stream()
.filter(x -> GcsSinkConfig.FILE_NAME_TEMPLATE_CONFIG.equals(x.name()))
.findFirst()
.get();
assertFalse(configValue.errorMessages().isEmpty());

final var throwable = assertThrows(ConfigException.class, () -> new GcsSinkConfig(properties));
assertTrue(throwable.getMessage().startsWith("Invalid value "));
}

@Test
void gcsCredentialsPath() {
final Map<String, String> properties = Map.of("gcs.bucket.name", "test-bucket", "gcs.credentials.path",
Thread.currentThread().getContextClassLoader().getResource("test_gcs_credentials.json").getPath());

assertConfigDefValidationPasses(properties);

final GcsSinkConfig config = new GcsSinkConfig(properties);
final UserCredentials credentials = (UserCredentials) config.getCredentials();
assertEquals("test-client-id", credentials.getClientId());
assertEquals("test-client-secret", credentials.getClientSecret());
}

@Test
void gcsCredentialsJson() throws IOException {
final Map<String, String> properties = new HashMap<>();
properties.put("gcs.bucket.name", "test-bucket");

final String credentialsJson = Resources.toString(
Thread.currentThread().getContextClassLoader().getResource("test_gcs_credentials.json"),
StandardCharsets.UTF_8);
properties.put("gcs.credentials.json", credentialsJson);

assertConfigDefValidationPasses(properties);

final GcsSinkConfig config = new GcsSinkConfig(properties);
final UserCredentials credentials = (UserCredentials) config.getCredentials();
assertEquals("test-client-id", credentials.getClientId());
assertEquals("test-client-secret", credentials.getClientSecret());
}

/**
* This test validates that the NoCredentials are used when default is specified as false. This behaviour mimics
* that of the Tiered Storage Manager.
*/
@Test
void gcsCredentialsNoCredentialsWhenDefaultCredentialsFalse() {
final Map<String, String> properties = Map.of(GcsSinkConfig.GCS_BUCKET_NAME_CONFIG, "test-bucket",
GcsSinkConfig.GCS_CREDENTIALS_DEFAULT_CONFIG, String.valueOf(false));

assertConfigDefValidationPasses(properties);

final GcsSinkConfig config = new GcsSinkConfig(properties);

final Credentials credentials = config.getCredentials();
assertEquals(NoCredentials.getInstance(), credentials);
}

/** Verifies that NoCredentials are used when no credential configurations is supplied. */
@Test
void gcsCredentialsNoCredentialsWhenNoCredentialsSupplied() {
final Map<String, String> properties = Map.of(GcsSinkConfig.GCS_BUCKET_NAME_CONFIG, "test-bucket");

assertConfigDefValidationPasses(properties);

final GcsSinkConfig config = new GcsSinkConfig(properties);

final Credentials credentials = config.getCredentials();
assertEquals(NoCredentials.getInstance(), credentials);
}

@Test
void gcsCredentialsDefault() throws IOException {
final Map<String, String> properties = Map.of(GcsSinkConfig.GCS_BUCKET_NAME_CONFIG, "test-bucket",
GcsSinkConfig.GCS_CREDENTIALS_DEFAULT_CONFIG, String.valueOf(true));

assertConfigDefValidationPasses(properties);

final GcsSinkConfig config = new GcsSinkConfig(properties);

final OAuth2Credentials credentials = config.getCredentials();
assertEquals(GoogleCredentials.getApplicationDefault(), credentials);
}

@ParameterizedTest
@MethodSource("provideMoreThanOneNonNull")
void gcsCredentialsExclusivity(final Boolean defaultCredentials, final String credentialsJson,
final String credentialsPath) {
final Map<String, String> properties = new HashMap<>();
properties.put(GcsSinkConfig.GCS_BUCKET_NAME_CONFIG, "test-bucket");
properties.put(GcsSinkConfig.GCS_CREDENTIALS_DEFAULT_CONFIG,
defaultCredentials == null ? null : String.valueOf(defaultCredentials));
properties.put(GcsSinkConfig.GCS_CREDENTIALS_JSON_CONFIG, credentialsJson);
properties.put(GcsSinkConfig.GCS_CREDENTIALS_PATH_CONFIG, credentialsPath);

// Should pass here, because ConfigDef validation doesn't check interdependencies.
assertConfigDefValidationPasses(properties);

final Throwable throwable = assertThrows(ConfigException.class, () -> new GcsSinkConfig(properties));
assertEquals(
"Only one of gcs.credentials.default, gcs.credentials.json, and gcs.credentials.path can be non-null.",
throwable.getMessage());
}

private static Stream<Arguments> provideMoreThanOneNonNull() {
return Stream.of(Arguments.of(true, "json", "path"), Arguments.of(false, "json", "path"),
Arguments.of(true, "json", null), Arguments.of(false, "json", null), Arguments.of(true, null, "path"),
Arguments.of(false, null, "path"), Arguments.of(null, "json", "path"));
}

private void assertConfigDefValidationPasses(final Map<String, String> properties) {
for (final ConfigValue configValue : GcsSinkConfig.configDef().validate(properties)) {
assertTrue(configValue.errorMessages().isEmpty());
}
}
}

0 comments on commit 323f706

Please sign in to comment.