Custom krb5.conf file location (#87)
* Configuration option

* Check for file existence and set the property

* Fix optional dereference

* Remove empty line

* Unit test

* Trigger the exception

* Reformatting

* Unit test for case when file exists

* New parameter documentation

* Prepare the release

* Fix artefacts ref

* Update doc/changes/changes_1.7.3.md

Co-authored-by: Christoph Kuhnke <[email protected]>

* Update doc/user_guide/user_guide.md

Co-authored-by: Christoph Kuhnke <[email protected]>

* Docs updates

* Update commons-compress to fix CVE-2024-25710 and CVE-2024-26308

* PK update

---------

Co-authored-by: Christoph Kuhnke <[email protected]>
Shmuma and ckunki authored Feb 20, 2024
1 parent 7a1fc9f commit a831fdc
Showing 10 changed files with 224 additions and 134 deletions.
244 changes: 123 additions & 121 deletions dependencies.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions doc/changes/changelog.md

Some generated files are not rendered by default.

22 changes: 22 additions & 0 deletions doc/changes/changes_1.7.3.md
@@ -0,0 +1,22 @@
# Exasol Kafka Connector Extension 1.7.3, released 2024-02-20

Code name: Support for custom `krb5.conf` files.

## Summary

Implemented support for custom `krb5.conf` files.
Updated a transitive dependency (`commons-compress`) to fix CVE-2024-25710 and CVE-2024-26308.

## Features

* #86: Add support for custom krb5.conf

## Dependency Updates

### Compile Dependency Updates

* Added `org.apache.commons:commons-compress:1.26.0`

### Plugin Dependency Updates

* Updated `com.exasol:project-keeper-maven-plugin:3.0.0` to `3.0.1`
18 changes: 12 additions & 6 deletions doc/user_guide/user_guide.md
@@ -61,7 +61,7 @@ checksum provided together with the jar file.
To check the SHA256 sum of the downloaded jar, run the command:

```sh
sha256sum exasol-kafka-connector-extension-1.7.2.jar
sha256sum exasol-kafka-connector-extension-1.7.3.jar
```

### Building From Source
@@ -84,7 +84,7 @@ sbt assembly
```

The packaged jar file should be located at
`target/scala-2.12/exasol-kafka-connector-extension-1.7.2.jar`.
`target/scala-2.12/exasol-kafka-connector-extension-1.7.3.jar`.

### Create an Exasol BucketFS Bucket

@@ -106,7 +106,7 @@ jar, please make sure the BucketFS ports are open.
Upload the jar file using the `curl` command:

```bash
curl -X PUT -T exasol-kafka-connector-extension-1.7.2.jar \
curl -X PUT -T exasol-kafka-connector-extension-1.7.3.jar \
http://w:<WRITE_PASSWORD>@<EXASOL_DATANODE>:2580/<BUCKET_NAME>/
```

@@ -135,12 +135,12 @@ OPEN SCHEMA KAFKA_EXTENSION;

CREATE OR REPLACE JAVA SET SCRIPT KAFKA_CONSUMER(...) EMITS (...) AS
%scriptclass com.exasol.cloudetl.kafka.KafkaConsumerQueryGenerator;
%jar /buckets/bfsdefault/<BUCKET>/exasol-kafka-connector-extension-1.7.2.jar;
%jar /buckets/bfsdefault/<BUCKET>/exasol-kafka-connector-extension-1.7.3.jar;
/

CREATE OR REPLACE JAVA SET SCRIPT KAFKA_IMPORT(...) EMITS (...) AS
%scriptclass com.exasol.cloudetl.kafka.KafkaTopicDataImporter;
%jar /buckets/bfsdefault/<BUCKET>/exasol-kafka-connector-extension-1.7.2.jar;
%jar /buckets/bfsdefault/<BUCKET>/exasol-kafka-connector-extension-1.7.3.jar;
/

CREATE OR REPLACE JAVA SET SCRIPT KAFKA_METADATA(
@@ -150,7 +150,7 @@ CREATE OR REPLACE JAVA SET SCRIPT KAFKA_METADATA(
)
EMITS (partition_index DECIMAL(18, 0), max_offset DECIMAL(36,0)) AS
%scriptclass com.exasol.cloudetl.kafka.KafkaTopicMetadataReader;
%jar /buckets/bfsdefault/<BUCKET>/exasol-kafka-connector-extension-1.7.2.jar;
%jar /buckets/bfsdefault/<BUCKET>/exasol-kafka-connector-extension-1.7.3.jar;
/
```

@@ -489,6 +489,8 @@ keyTab="/buckets/bfsdefault/bucket1/kafka.keytab"
principal="[email protected]";
```
In some complex setups, you might need to provide a custom ``krb5.conf`` file. This can be done by uploading the file to BucketFS and providing its path in the ``SASL_KRB5CONF_LOCATION`` parameter, similar to ``SASL_JAAS_LOCATION``.
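For example, an import from a Kerberos-secured cluster could then look like the following sketch (an illustration only: the server address, topic, table, and bucket names are placeholders, and the parameter list is abbreviated):

```sql
IMPORT INTO <schema_name>.<table_name>
FROM SCRIPT KAFKA_EXTENSION.KAFKA_CONSUMER WITH
  BOOTSTRAP_SERVERS      = 'kafka01.example.com:9093'
  TOPIC_NAME             = '<topic_name>'
  TABLE_NAME             = '<schema_name>.<table_name>'
  SECURITY_PROTOCOL      = 'SASL_SSL'
  SASL_MECHANISM         = 'GSSAPI'
  SASL_JAAS_LOCATION     = '/buckets/bfsdefault/<BUCKET>/kafka_client_jaas.conf'
  SASL_KRB5CONF_LOCATION = '/buckets/bfsdefault/<BUCKET>/krb5.conf';
```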
## Importing Data From Azure Event Hubs
To import data from [Azure Event Hubs][azure-event-hubs], we are going to create
@@ -690,6 +692,10 @@ not in import statement itself.
more complex configuration of SASL authentication. It should refer to the file
stored inside a bucket in Exasol BucketFS.

* ``SASL_KRB5CONF_LOCATION`` - It is the location of the custom ``krb5.conf`` file.
It should refer to the file stored inside a bucket in Exasol BucketFS. In the
default configuration, the path starts with ``/buckets/bfsdefault/<bucket_name>/``.

[gh-releases]: https://github.com/exasol/kafka-connector-extension/releases
[schema-registry]: https://docs.confluent.io/current/schema-registry/index.html
[kafka-security]: https://kafka.apache.org/documentation/#security
2 changes: 1 addition & 1 deletion error_code_config.yml
@@ -2,4 +2,4 @@ error-tags:
KCE:
packages:
- com.exasol.cloudetl
highest-index: 27
highest-index: 28
2 changes: 1 addition & 1 deletion pk_generated_parent.pom

Some generated files are not rendered by default.

11 changes: 8 additions & 3 deletions pom.xml
@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.exasol</groupId>
<artifactId>kafka-connector-extension</artifactId>
<version>1.7.2</version>
<version>1.7.3</version>
<name>Exasol Kafka Connector Extension</name>
<description>Exasol Kafka Extension for accessing Apache Kafka</description>
<url>https://github.com/exasol/kafka-connector-extension/</url>
@@ -56,6 +56,11 @@
<artifactId>error-reporting-java</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.26.0</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
@@ -439,7 +444,7 @@
<plugin>
<groupId>com.exasol</groupId>
<artifactId>project-keeper-maven-plugin</artifactId>
<version>3.0.0</version>
<version>3.0.1</version>
<executions>
<execution>
<goals>
@@ -597,7 +602,7 @@
<parent>
<artifactId>kafka-connector-extension-generated-parent</artifactId>
<groupId>com.exasol</groupId>
<version>1.7.2</version>
<version>1.7.3</version>
<relativePath>pk_generated_parent.pom</relativePath>
</parent>
</project>
@@ -299,6 +299,28 @@ class KafkaConsumerProperties(private val properties: Map[String, String]) exten
}
}

/**
* Gets the custom krb5.conf file location, checking that the user-provided path exists.
* @return file path or empty string
*/
final def getSASLKrb5ConfLocation(): String = {
val krb5confLocation = get(SASL_KRB5CONF_LOCATION.userPropertyName)
if (krb5confLocation.isEmpty) {
""
} else {
if (!Files.isRegularFile(Paths.get(krb5confLocation.get))) {
throw new KafkaConnectorException(
ExaError
.messageBuilder("E-KCE-28")
.message("Unable to find the custom krb5.conf file at {{JAAS_LOCATION}}", krb5confLocation)
.mitigation(BUCKETFS_CHECK_MITIGATION)
.toString()
)
}
krb5confLocation.get
}
}

/** Returns the Kafka consumer properties as Java map. */
final def getProperties(): java.util.Map[String, AnyRef] = {
val props = MMap.empty[String, String]
@@ -328,6 +350,7 @@ class KafkaConsumerProperties(private val properties: Map[String, String]) exten
if (isSASLEnabled()) {
props.put(SASL_MECHANISM.kafkaPropertyName, getSASLMechanism())
props.put(SASL_JAAS_CONFIG.kafkaPropertyName, getSASLJaasConfig())
props.put(SASL_KRB5CONF_LOCATION.kafkaPropertyName, getSASLKrb5ConfLocation())
addOptionalParametersForSASL(props)
}
props.toMap.asInstanceOf[Map[String, AnyRef]].asJava
@@ -374,7 +397,6 @@ class KafkaConsumerProperties(private val properties: Map[String, String]) exten
.toString()
)
}

}

/**
@@ -757,6 +779,16 @@ object KafkaConsumerProperties {
""
)

/**
* SASL krb5.conf file location. It can be used when [[SECURITY_PROTOCOL]] is set to {@code SASL_PLAINTEXT}
* or {@code SASL_SSL} to provide the location of a custom krb5.conf.
*/
private[kafka] final val SASL_KRB5CONF_LOCATION: Config[String] = Config[String](
"SASL_KRB5CONF_LOCATION",
"java.security.krb5.conf",
""
)
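// Illustrative note (an assumption, not part of this commit): the internal property
// name above, "java.security.krb5.conf", is the standard JVM system property for
// overriding the default Kerberos configuration file, e.g.:
//   System.setProperty("java.security.krb5.conf", "/buckets/bfsdefault/bucket1/krb5.conf")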

/**
* SASL password. It is used when [[SASL_MECHANISM]] is set to {@code PLAIN}, {@code Digest-*} or {@code SCRAM-*}.
*/
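A minimal usage sketch of the new property (assuming the public constructor and the getter shown in the diff above; the broker address and BucketFS path are placeholders):

```scala
import com.exasol.cloudetl.kafka.KafkaConsumerProperties

object Krb5ConfLocationExample extends App {
  val consumerProperties = new KafkaConsumerProperties(
    Map(
      "BOOTSTRAP_SERVERS"      -> "kafka01:9093",
      "SECURITY_PROTOCOL"      -> "SASL_SSL",
      "SASL_KRB5CONF_LOCATION" -> "/buckets/bfsdefault/bucket1/krb5.conf"
    )
  )
  // Returns the validated path, or "" when the property is unset; throws a
  // KafkaConnectorException (E-KCE-28) when the file does not exist.
  println(consumerProperties.getSASLKrb5ConfLocation())
}
```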
Empty file.
@@ -473,6 +473,9 @@ class KafkaConsumerPropertiesTest extends AnyFunSuite with BeforeAndAfterEach wi
private[this] val DUMMY_SASL_JAAS_FILE =
Paths.get(getClass.getResource("/kafka_client_jaas.conf").toURI).toAbsolutePath

private[this] val DUMMY_KRB5CONF_FILE =
Paths.get(getClass.getResource("/test_krb5.conf").toURI).toAbsolutePath

test("apply returns a SSL enabled consumer properties") {
val properties =
getSecurityEnabledConsumerProperties("SSL", Option(DUMMY_KEYSTORE_FILE), Option(DUMMY_TRUSTSTORE_FILE))
@@ -553,10 +556,28 @@ class KafkaConsumerPropertiesTest extends AnyFunSuite with BeforeAndAfterEach wi
assert(properties.getProperties().get(SSL_TRUSTSTORE_PASSWORD.kafkaPropertyName) === "tspw")
}

test("error is throws when non-existent krb5.conf file passed") {
val properties =
getSecurityEnabledConsumerProperties("SASL_SSL", krb5confFile = Option(Paths.get("krb5_non_existing")))
val thrown = intercept[KafkaConnectorException] {
properties.getProperties()
}
val message = thrown.getMessage()
assert(message.contains("Unable to find the custom krb5.conf file"))
assert(message.contains("Please make sure it is successfully uploaded to BucketFS bucket"))
}

test("property is set when existing krb5.conf file passed") {
val properties = getSecurityEnabledConsumerProperties("SASL_SSL", krb5confFile = Option(DUMMY_KRB5CONF_FILE))
val props = properties.getProperties()
assert(props.get(SASL_KRB5CONF_LOCATION.kafkaPropertyName) === s"$DUMMY_KRB5CONF_FILE")
}

private[this] def getSecurityEnabledConsumerProperties(
securityProtocol: String,
keystoreFile: Option[Path] = None,
truststoreFile: Option[Path] = None
truststoreFile: Option[Path] = None,
krb5confFile: Option[Path] = None
): KafkaConsumerProperties = {
val properties = Map(
"BOOTSTRAP_SERVERS" -> "kafka01",
@@ -571,6 +592,7 @@ class KafkaConsumerPropertiesTest extends AnyFunSuite with BeforeAndAfterEach wi
val stringBuilder = new StringBuilder()
keystoreFile.foreach(file => stringBuilder.append("SSL_KEYSTORE_LOCATION=").append(file).append(";"))
truststoreFile.foreach(file => stringBuilder.append("SSL_TRUSTSTORE_LOCATION=").append(file).append(";"))
krb5confFile.foreach(file => stringBuilder.append("SASL_KRB5CONF_LOCATION=").append(file).append(";"))
if (securityProtocol === "SSL") {
addSimpleSSLParameters(stringBuilder)
} else if (securityProtocol === "SASL_SSL") {
