Skip to content

Commit

Permalink
Allow leaving accessKey and secretKey undefined for DEFAULT auth mode. (
Browse files Browse the repository at this point in the history
  • Loading branch information
davidsloan authored Oct 27, 2023
1 parent eac6be7 commit 247261c
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.lenses.connect.secrets.config

import org.apache.kafka.common.config.types.Password
import org.apache.kafka.connect.errors.ConnectException

import scala.util.Try

case class AWSCredentials(accessKey: String, secretKey: Password)

object AWSCredentials {
def apply(configs: AWSProviderConfig): Either[Throwable, AWSCredentials] =
for {
accessKey <- Try(configs.getString(AWSProviderConfig.AWS_ACCESS_KEY)).toEither
secretKey <- Try(configs.getPassword(AWSProviderConfig.AWS_SECRET_KEY)).toEither

accessKeyValidated <- Either.cond(accessKey.nonEmpty,
accessKey,
new ConnectException(
s"${AWSProviderConfig.AWS_ACCESS_KEY} not set",
),
)
secretKeyValidated <- Either.cond(secretKey.value().nonEmpty,
secretKey,
new ConnectException(
s"${AWSProviderConfig.AWS_SECRET_KEY} not set",
),
)
} yield {
AWSCredentials(accessKeyValidated, secretKeyValidated)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ object AWSProviderConfig {
.define(
AWS_ACCESS_KEY,
Type.STRING,
null,
"",
Importance.HIGH,
"AWS access key",
)
.define(
AWS_SECRET_KEY,
Type.PASSWORD,
null,
"",
Importance.HIGH,
"AWS password key",
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ import scala.util.Try

case class AWSProviderSettings(
region: String,
accessKey: String,
secretKey: Password,
credentials: Option[AWSCredentials],
authMode: AuthMode,
fileWriterOpts: Option[FileWriterOptions],
defaultTtl: Option[Duration],
Expand All @@ -32,33 +31,21 @@ object AWSProviderSettings {
def apply(configs: AWSProviderConfig): AWSProviderSettings = {
// TODO: Validate all configs in one step and provide all errors together
val region = configs.getStringOrThrowOnNull(AWSProviderConfig.AWS_REGION)
val accessKey =
configs.getStringOrThrowOnNull(AWSProviderConfig.AWS_ACCESS_KEY)
val secretKey =
configs.getPasswordOrThrowOnNull(AWSProviderConfig.AWS_SECRET_KEY)

val endpointOverride =
Try(configs.getString(AWSProviderConfig.ENDPOINT_OVERRIDE)).toOption.filterNot(_.trim.isEmpty)
val authMode =
getAuthenticationMethod(configs.getString(AWSProviderConfig.AUTH_METHOD))

if (authMode == AuthMode.CREDENTIALS) {
if (accessKey.isEmpty)
throw new ConnectException(
s"${AWSProviderConfig.AWS_ACCESS_KEY} not set",
)
if (secretKey.value().isEmpty)
throw new ConnectException(
s"${AWSProviderConfig.AWS_SECRET_KEY} not set",
)
val awsCredentials: Option[AWSCredentials] = Option.when(authMode == AuthMode.CREDENTIALS) {
AWSCredentials(configs).left.map(throw _).merge
}

val secretType = SecretTypeConfig.lookupAndValidateSecretTypeValue(configs.getString)

new AWSProviderSettings(
region = region,
accessKey = accessKey,
secretKey = secretKey,
credentials = awsCredentials,
authMode = authMode,
fileWriterOpts = FileWriterOptions(configs),
defaultTtl =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import io.lenses.connect.secrets.connect.AuthMode
import io.lenses.connect.secrets.connect.decodeKey
import io.lenses.connect.secrets.io.FileWriter
import io.lenses.connect.secrets.utils.EncodingAndId
import org.apache.kafka.connect.errors.ConnectException
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
import software.amazon.awssdk.regions.Region
Expand Down Expand Up @@ -163,14 +165,20 @@ object AWSHelper extends StrictLogging {
s"Initializing client with mode [${settings.authMode}]",
)

val credentialProvider = settings.authMode match {
val credentialProvider: AwsCredentialsProvider = settings.authMode match {
case AuthMode.CREDENTIALS =>
StaticCredentialsProvider.create(
AwsBasicCredentials.create(
settings.accessKey,
settings.secretKey.value(),
),
)
settings.credentials.map {
creds =>
StaticCredentialsProvider.create(
AwsBasicCredentials.create(
creds.accessKey,
creds.secretKey.value(),
),
)
}.getOrElse(throw new ConnectException(
"No access key and secret key credentials available for CREDENTIALS mode",
))

case _ =>
DefaultCredentialsProvider.create()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,21 @@ class AWSSecretProviderTest extends AnyWordSpec with Matchers with MockitoSugar
provider.close()
}

"should allow default auth mode" in {

val provSettings = AWSProviderSettings(
AWSProviderConfig(
Map(
AWSProviderConfig.AWS_REGION -> "someregion",
AWSProviderConfig.AUTH_METHOD -> AuthMode.DEFAULT.toString,
).asJava,
),
)

provSettings.authMode should be(AuthMode.DEFAULT)
provSettings.credentials should be(empty)
}

"should throw an exception if access key not set and not default auth mode" in {

intercept[ConnectException] {
Expand Down

0 comments on commit 247261c

Please sign in to comment.