From bb8c055e316d9593b4de9cef0e362ad7083699d2 Mon Sep 17 00:00:00 2001 From: Muhammet Orazov Date: Fri, 15 Oct 2021 15:19:01 +0200 Subject: [PATCH] #61: Fixed bug that skips setting SSL keystore and truststore file when using `SASL_SSL` protocol (#62) Co-authored-by: jakobbraun --- build.sbt | 2 +- doc/changes/changelog.md | 1 + doc/changes/changes_1.5.1.md | 21 ++++++++++ project/Dependencies.scala | 2 +- .../kafka/KafkaConsumerProperties.scala | 14 +++++++ .../kafka/KafkaConsumerPropertiesTest.scala | 42 +++++++++++++++++-- 6 files changed, 76 insertions(+), 6 deletions(-) create mode 100644 doc/changes/changes_1.5.1.md diff --git a/build.sbt b/build.sbt index 66b35c7..886662c 100644 --- a/build.sbt +++ b/build.sbt @@ -17,7 +17,7 @@ lazy val root = project .in(file(".")) .settings(moduleName := "exasol-kafka-connector-extension") - .settings(version := "1.5.0") + .settings(version := "1.5.1") .settings(orgSettings) .settings(buildSettings) .settings(Settings.projectSettings(scalaVersion)) diff --git a/doc/changes/changelog.md b/doc/changes/changelog.md index e9a1cdc..b132034 100644 --- a/doc/changes/changelog.md +++ b/doc/changes/changelog.md @@ -1,5 +1,6 @@ # Releases +* [1.5.1](changes_1.5.1.md) * [1.5.0](changes_1.5.0.md) * [1.4.0](changes_1.4.0.md) * [1.3.0](changes_1.3.0.md) diff --git a/doc/changes/changes_1.5.1.md b/doc/changes/changes_1.5.1.md new file mode 100644 index 0000000..5e4060d --- /dev/null +++ b/doc/changes/changes_1.5.1.md @@ -0,0 +1,21 @@ +# Kafka Connector Extension 1.5.1, released 2021-10-15 + +Code name: Fixed setting SSL keystore and truststore files + +## Summary + +This release fixes a bug that skips setting SSL keystore and truststore files when using `SASL_SSL` protocol. + +## Bug Fixes + +* #61: Fixed bug that skips setting SSL files when using SASL_SSL protocol + +## Dependency Updates + +### Runtime Dependency Updates + +### Test Dependency Updates + +* Updated `com.exasol:exasol-testcontainers:5.1.0` to `5.1.1` + +### Plugin Updates diff --git a/project/Dependencies.scala b/project/Dependencies.scala index bc0430c..09fd5e5 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -18,7 +18,7 @@ object Dependencies { private val MockitoCoreVersion = "4.0.0" private val KafkaSchemaRegistryVersion = "6.2.1" private val ExasolTestDBBuilderVersion = "3.2.1" - private val ExasolTestContainersVersion = "5.1.0" + private val ExasolTestContainersVersion = "5.1.1" private val ExasolHamcrestMatcherVersion = "1.5.1" val Resolvers: Seq[Resolver] = Seq( diff --git a/src/main/scala/com/exasol/cloudetl/kafka/KafkaConsumerProperties.scala b/src/main/scala/com/exasol/cloudetl/kafka/KafkaConsumerProperties.scala index 3d9de9b..f574a93 100644 --- a/src/main/scala/com/exasol/cloudetl/kafka/KafkaConsumerProperties.scala +++ b/src/main/scala/com/exasol/cloudetl/kafka/KafkaConsumerProperties.scala @@ -328,10 +328,24 @@ class KafkaConsumerProperties(private val properties: Map[String, String]) exten if (isSASLEnabled()) { props.put(SASL_MECHANISM.kafkaPropertyName, getSASLMechanism()) props.put(SASL_JAAS_CONFIG.kafkaPropertyName, getSASLJaasConfig()) + addOptionalParametersForSASL(props) } props.toMap.asInstanceOf[Map[String, AnyRef]].asJava } + private[this] def addOptionalParametersForSASL(props: MMap[String, String]): Unit = + Seq( + SSL_KEY_PASSWORD, + SSL_KEYSTORE_PASSWORD, + SSL_KEYSTORE_LOCATION, + SSL_TRUSTSTORE_PASSWORD, + SSL_TRUSTSTORE_LOCATION + ).foreach { config => + properties.get(config.userPropertyName).foreach { value => + props.put(config.kafkaPropertyName, value) + } + } + /** * Returns a new [[KafkaConsumerProperties]] that merges the key-value pairs * parsed from user provided Exasol named connection object. diff --git a/src/test/scala/com/exasol/cloudetl/kafka/KafkaConsumerPropertiesTest.scala b/src/test/scala/com/exasol/cloudetl/kafka/KafkaConsumerPropertiesTest.scala index 49647fc..7bbddb4 100644 --- a/src/test/scala/com/exasol/cloudetl/kafka/KafkaConsumerPropertiesTest.scala +++ b/src/test/scala/com/exasol/cloudetl/kafka/KafkaConsumerPropertiesTest.scala @@ -4,6 +4,7 @@ import java.nio.file.Path import java.nio.file.Paths import com.exasol.{ExaConnectionInformation, ExaMetadata} +import com.exasol.cloudetl.kafka.KafkaConsumerProperties._ import org.mockito.Mockito.when import org.scalatest.BeforeAndAfterEach @@ -505,16 +506,18 @@ class KafkaConsumerPropertiesTest extends AnyFunSuite with BeforeAndAfterEach wi val properties = getSecurityEnabledConsumerProperties("SASL_SSL", Option(DUMMY_KEYSTORE_FILE), Option(DUMMY_TRUSTSTORE_FILE)) assert(properties.getSASLJaasConfig().contains(s""""pass"""")) + assert(properties.getSSLKeystoreLocation() === s"$DUMMY_KEYSTORE_FILE") + assert(properties.getSSLTruststoreLocation() === s"$DUMMY_TRUSTSTORE_FILE") } - test("apply throws if provided keystore file is not availablewith SSL_SASL protocol") { + test("apply throws if provided keystore file is not available with SSL_SASL protocol") { val thrown = intercept[KafkaConnectorException] { getSecurityEnabledConsumerProperties("SASL_SSL", Option(Paths.get("ssl_keystore_file"))) } assert(thrown.getMessage().contains("Unable to find the SSL keystore file")) } - test("apply throws if provided truststore file is not availablewith SSL_SASL protocol") { + test("apply throws if provided truststore file is not available with SSL_SASL protocol") { val thrown = intercept[KafkaConnectorException] { getSecurityEnabledConsumerProperties( "SASL_SSL", @@ -525,13 +528,42 @@ class KafkaConsumerPropertiesTest extends AnyFunSuite with BeforeAndAfterEach wi assert(thrown.getMessage().contains("Unable to find the SSL truststore file")) } + test("getProperties contains keystore file with SSL_SASL protocol") { + val properties = getSecurityEnabledConsumerProperties("SASL_SSL", Option(DUMMY_KEYSTORE_FILE)) + assert(properties.getProperties().get(SSL_KEYSTORE_LOCATION.kafkaPropertyName) === s"$DUMMY_KEYSTORE_FILE") + } + + test("getProperties contains truststore file with SSL_SASL protocol") { + val properties = getSecurityEnabledConsumerProperties("SASL_SSL", None, Option(DUMMY_TRUSTSTORE_FILE)) + assert(properties.getProperties().get(SSL_TRUSTSTORE_LOCATION.kafkaPropertyName) === s"$DUMMY_TRUSTSTORE_FILE") + } + + test("getProperties contains keystore and truststore files with SSL_SASL protocol") { + val properties = + getSecurityEnabledConsumerProperties("SASL_SSL", Option(DUMMY_KEYSTORE_FILE), Option(DUMMY_TRUSTSTORE_FILE)) + assert(properties.getProperties().get(SSL_KEYSTORE_LOCATION.kafkaPropertyName) === s"$DUMMY_KEYSTORE_FILE") + assert(properties.getProperties().get(SSL_TRUSTSTORE_LOCATION.kafkaPropertyName) === s"$DUMMY_TRUSTSTORE_FILE") + } + + test("getProperties contains SSL related information with SSL_SASL protocol") { + val properties = getSecurityEnabledConsumerProperties("SASL_SSL", None, None) + assert(properties.getProperties().get(SSL_KEY_PASSWORD.kafkaPropertyName) === "kpw") + assert(properties.getProperties().get(SSL_KEYSTORE_PASSWORD.kafkaPropertyName) === "kspw") + assert(properties.getProperties().get(SSL_TRUSTSTORE_PASSWORD.kafkaPropertyName) === "tspw") + } + @SuppressWarnings(Array("org.wartremover.warts.DefaultArguments")) // fine in tests private[this] def getSecurityEnabledConsumerProperties( securityProtocol: String, keystoreFile: Option[Path] = None, truststoreFile: Option[Path] = None ): KafkaConsumerProperties = { - val properties = Map("SECURITY_PROTOCOL" -> securityProtocol, "CONNECTION_NAME" -> "SSL_CONNECTION") + val properties = Map( + "BOOTSTRAP_SERVERS" -> "kafka01", + "RECORD_FORMAT" -> "string", + "SECURITY_PROTOCOL" -> securityProtocol, + "CONNECTION_NAME" -> "SSL_CONNECTION" + ) val metadata = mock[ExaMetadata] val connectionInformation = mock[ExaConnectionInformation] when(metadata.getConnection("SSL_CONNECTION")).thenReturn(connectionInformation) @@ -542,6 +574,8 @@ class KafkaConsumerPropertiesTest extends AnyFunSuite with BeforeAndAfterEach wi if (securityProtocol === "SSL") { addSimpleSSLParameters(stringBuilder) } else if (securityProtocol === "SASL_SSL") { + addSimpleSSLParameters(stringBuilder) + stringBuilder.append(";") addSimpleSASLParameters(stringBuilder) } when(connectionInformation.getPassword()).thenReturn(stringBuilder.toString()) @@ -549,7 +583,7 @@ class KafkaConsumerPropertiesTest extends AnyFunSuite with BeforeAndAfterEach wi } private[this] def addSimpleSSLParameters(sb: StringBuilder): Unit = { - sb.append("SSL_KEY_PASSWORD=pass123;SSL_KEYSTORE_PASSWORD=pass123;SSL_TRUSTSTORE_PASSWORD=pass123") + sb.append("SSL_KEY_PASSWORD=kpw;SSL_KEYSTORE_PASSWORD=kspw;SSL_TRUSTSTORE_PASSWORD=tspw") () }