Skip to content

Commit

Permalink
#61: Fixed bug that skips setting SSL keystore and truststore file wh…
Browse files Browse the repository at this point in the history
…en using `SASL_SSL` protocol (#62)

Co-authored-by: jakobbraun <[email protected]>
  • Loading branch information
morazow and jakobbraun authored Oct 15, 2021
1 parent c341ce1 commit bb8c055
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 6 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ lazy val root =
project
.in(file("."))
.settings(moduleName := "exasol-kafka-connector-extension")
.settings(version := "1.5.0")
.settings(version := "1.5.1")
.settings(orgSettings)
.settings(buildSettings)
.settings(Settings.projectSettings(scalaVersion))
Expand Down
1 change: 1 addition & 0 deletions doc/changes/changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Releases

* [1.5.1](changes_1.5.1.md)
* [1.5.0](changes_1.5.0.md)
* [1.4.0](changes_1.4.0.md)
* [1.3.0](changes_1.3.0.md)
Expand Down
21 changes: 21 additions & 0 deletions doc/changes/changes_1.5.1.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Kafka Connector Extension 1.5.1, released 2021-10-15

Code name: Fixed setting SSL keystore and truststore files

## Summary

This release fixes a bug that skips setting SSL keystore and truststore files when using `SASL_SSL` protocol.

## Bug Fixes

* #61: Fixed bug that skips setting SSL files when using SASL_SSL protocol

## Dependency Updates

### Runtime Dependency Updates

### Test Dependency Updates

* Updated `com.exasol:exasol-testcontainers:5.1.0` to `5.1.1`

### Plugin Updates
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ object Dependencies {
private val MockitoCoreVersion = "4.0.0"
private val KafkaSchemaRegistryVersion = "6.2.1"
private val ExasolTestDBBuilderVersion = "3.2.1"
private val ExasolTestContainersVersion = "5.1.0"
private val ExasolTestContainersVersion = "5.1.1"
private val ExasolHamcrestMatcherVersion = "1.5.1"

val Resolvers: Seq[Resolver] = Seq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,10 +328,24 @@ class KafkaConsumerProperties(private val properties: Map[String, String]) exten
if (isSASLEnabled()) {
props.put(SASL_MECHANISM.kafkaPropertyName, getSASLMechanism())
props.put(SASL_JAAS_CONFIG.kafkaPropertyName, getSASLJaasConfig())
addOptionalParametersForSASL(props)
}
props.toMap.asInstanceOf[Map[String, AnyRef]].asJava
}

private[this] def addOptionalParametersForSASL(props: MMap[String, String]): Unit =
Seq(
SSL_KEY_PASSWORD,
SSL_KEYSTORE_PASSWORD,
SSL_KEYSTORE_LOCATION,
SSL_TRUSTSTORE_PASSWORD,
SSL_TRUSTSTORE_LOCATION
).foreach { config =>
properties.get(config.userPropertyName).foreach { value =>
props.put(config.kafkaPropertyName, value)
}
}

/**
* Returns a new [[KafkaConsumerProperties]] that merges the key-value pairs
* parsed from user provided Exasol named connection object.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import java.nio.file.Path
import java.nio.file.Paths

import com.exasol.{ExaConnectionInformation, ExaMetadata}
import com.exasol.cloudetl.kafka.KafkaConsumerProperties._

import org.mockito.Mockito.when
import org.scalatest.BeforeAndAfterEach
Expand Down Expand Up @@ -505,16 +506,18 @@ class KafkaConsumerPropertiesTest extends AnyFunSuite with BeforeAndAfterEach wi
val properties =
getSecurityEnabledConsumerProperties("SASL_SSL", Option(DUMMY_KEYSTORE_FILE), Option(DUMMY_TRUSTSTORE_FILE))
assert(properties.getSASLJaasConfig().contains(s""""pass""""))
assert(properties.getSSLKeystoreLocation() === s"$DUMMY_KEYSTORE_FILE")
assert(properties.getSSLTruststoreLocation() === s"$DUMMY_TRUSTSTORE_FILE")
}

test("apply throws if provided keystore file is not availablewith SSL_SASL protocol") {
test("apply throws if provided keystore file is not available with SSL_SASL protocol") {
val thrown = intercept[KafkaConnectorException] {
getSecurityEnabledConsumerProperties("SASL_SSL", Option(Paths.get("ssl_keystore_file")))
}
assert(thrown.getMessage().contains("Unable to find the SSL keystore file"))
}

test("apply throws if provided truststore file is not availablewith SSL_SASL protocol") {
test("apply throws if provided truststore file is not available with SSL_SASL protocol") {
val thrown = intercept[KafkaConnectorException] {
getSecurityEnabledConsumerProperties(
"SASL_SSL",
Expand All @@ -525,13 +528,42 @@ class KafkaConsumerPropertiesTest extends AnyFunSuite with BeforeAndAfterEach wi
assert(thrown.getMessage().contains("Unable to find the SSL truststore file"))
}

test("getProperties contains keystore file with SSL_SASL protocol") {
val properties = getSecurityEnabledConsumerProperties("SASL_SSL", Option(DUMMY_KEYSTORE_FILE))
assert(properties.getProperties().get(SSL_KEYSTORE_LOCATION.kafkaPropertyName) === s"$DUMMY_KEYSTORE_FILE")
}

test("getProperties contains truststore file with SSL_SASL protocol") {
val properties = getSecurityEnabledConsumerProperties("SASL_SSL", None, Option(DUMMY_TRUSTSTORE_FILE))
assert(properties.getProperties().get(SSL_TRUSTSTORE_LOCATION.kafkaPropertyName) === s"$DUMMY_TRUSTSTORE_FILE")
}

test("getProperties contains keystore and truststore files with SSL_SASL protocol") {
val properties =
getSecurityEnabledConsumerProperties("SASL_SSL", Option(DUMMY_KEYSTORE_FILE), Option(DUMMY_TRUSTSTORE_FILE))
assert(properties.getProperties().get(SSL_KEYSTORE_LOCATION.kafkaPropertyName) === s"$DUMMY_KEYSTORE_FILE")
assert(properties.getProperties().get(SSL_TRUSTSTORE_LOCATION.kafkaPropertyName) === s"$DUMMY_TRUSTSTORE_FILE")
}

test("getProperties contains SSL related information with SSL_SASL protocol") {
val properties = getSecurityEnabledConsumerProperties("SASL_SSL", None, None)
assert(properties.getProperties().get(SSL_KEY_PASSWORD.kafkaPropertyName) === "kpw")
assert(properties.getProperties().get(SSL_KEYSTORE_PASSWORD.kafkaPropertyName) === "kspw")
assert(properties.getProperties().get(SSL_TRUSTSTORE_PASSWORD.kafkaPropertyName) === "tspw")
}

@SuppressWarnings(Array("org.wartremover.warts.DefaultArguments")) // fine in tests
private[this] def getSecurityEnabledConsumerProperties(
securityProtocol: String,
keystoreFile: Option[Path] = None,
truststoreFile: Option[Path] = None
): KafkaConsumerProperties = {
val properties = Map("SECURITY_PROTOCOL" -> securityProtocol, "CONNECTION_NAME" -> "SSL_CONNECTION")
val properties = Map(
"BOOTSTRAP_SERVERS" -> "kafka01",
"RECORD_FORMAT" -> "string",
"SECURITY_PROTOCOL" -> securityProtocol,
"CONNECTION_NAME" -> "SSL_CONNECTION"
)
val metadata = mock[ExaMetadata]
val connectionInformation = mock[ExaConnectionInformation]
when(metadata.getConnection("SSL_CONNECTION")).thenReturn(connectionInformation)
Expand All @@ -542,14 +574,16 @@ class KafkaConsumerPropertiesTest extends AnyFunSuite with BeforeAndAfterEach wi
if (securityProtocol === "SSL") {
addSimpleSSLParameters(stringBuilder)
} else if (securityProtocol === "SASL_SSL") {
addSimpleSSLParameters(stringBuilder)
stringBuilder.append(";")
addSimpleSASLParameters(stringBuilder)
}
when(connectionInformation.getPassword()).thenReturn(stringBuilder.toString())
KafkaConsumerProperties(properties, metadata)
}

private[this] def addSimpleSSLParameters(sb: StringBuilder): Unit = {
sb.append("SSL_KEY_PASSWORD=pass123;SSL_KEYSTORE_PASSWORD=pass123;SSL_TRUSTSTORE_PASSWORD=pass123")
sb.append("SSL_KEY_PASSWORD=kpw;SSL_KEYSTORE_PASSWORD=kspw;SSL_TRUSTSTORE_PASSWORD=tspw")
()
}

Expand Down

0 comments on commit bb8c055

Please sign in to comment.