Skip to content

Commit

Permalink
Add mTLS support
Browse files Browse the repository at this point in the history
Added mTLS support to set up OpenSearch connection with client certificate

Signed-off-by: Andrey Pleskach <[email protected]>
  • Loading branch information
willyborankin committed May 9, 2024
1 parent 8ed2745 commit a128c43
Show file tree
Hide file tree
Showing 9 changed files with 718 additions and 87 deletions.
10 changes: 10 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ plugins {
id "com.diffplug.spotless" version "6.25.0"
}

idea {
module {
downloadSources = true
}
}

wrapper {
distributionType = 'ALL'
doLast {
Expand Down Expand Up @@ -173,6 +179,9 @@ dependencies {
implementation "org.slf4j:slf4j-api:$slf4jVersion"
implementation "com.google.code.gson:gson:2.10.1"
implementation "org.opensearch.client:opensearch-rest-high-level-client:$openSearchVersion"
implementation "org.bouncycastle:bcprov-jdk18on:1.78.1"
implementation "org.bouncycastle:bcpkix-jdk18on:1.78.1"


testImplementation "org.junit.jupiter:junit-jupiter:5.10.2"
testImplementation "org.mockito:mockito-core:5.11.0"
Expand All @@ -184,6 +193,7 @@ dependencies {
testImplementation "com.fasterxml.jackson.core:jackson-core:2.17.0"
testImplementation "com.fasterxml.jackson.core:jackson-databind:2.17.0"
testImplementation "com.fasterxml.jackson.core:jackson-annotations:2.17.0"
testImplementation "org.apache.commons:commons-lang3:3.14.0"
testRuntimeOnly "org.slf4j:slf4j-log4j12:$slf4jVersion"

integrationTestImplementation "org.testcontainers:junit-jupiter:$testcontainersVersion"
Expand Down
71 changes: 71 additions & 0 deletions docs/opensearch-sink-connector-config-options.rst
Original file line number Diff line number Diff line change
Expand Up @@ -231,4 +231,75 @@ Authentication
* Default: null
* Importance: medium

X.509 PEM certificates and PKCS#8 keys
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

``connection.ca.certificate.location``
Path to X.509 root CAs file (PEM format)

* Type: string
* Default: null
* Importance: medium
* Dependents: ``connection.access.certificate.location``, ``connection.access.key.location``, ``connection.access.key.password``

``connection.access.certificate.location``
Path to X.509 user access certificate file (PEM format)

* Type: string
* Default: null
* Importance: medium

``connection.access.key.location``
Path to the user certificate’s keys (PKCS #8) file (PEM format)

* Type: string
* Default: null
* Importance: medium

``connection.access.key.password``
User access key password

* Type: password
* Default: null
* Importance: medium

Keystore and Truststore files
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

``connection.truststore.location``
Path to the Truststore file (JKS format)

* Type: string
* Default: null
* Importance: medium
* Dependents: ``connection.truststore.password``, ``connection.keystore.location``, ``connection.keystore.password``

``connection.truststore.password``
Truststore password

* Type: password
* Default: null
* Importance: medium

``connection.keystore.location``
Path to the Keystore file (PKCS12/PFX format)

* Type: string
* Default: null
* Importance: medium

``connection.keystore.type``
Keystore type. The default is JKS. Supported values are: JKS, PKCS12 or PFX

* Type: string
* Default: JKS
* Importance: medium

``connection.keystore.password``
Keystore password

* Type: password
* Default: null
* Importance: medium


Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
*/
package io.aiven.kafka.connect.opensearch;

import static io.aiven.kafka.connect.opensearch.OpensearchBasicAuthConfigurator.CONNECTION_PASSWORD_CONFIG;
import static io.aiven.kafka.connect.opensearch.OpensearchBasicAuthConfigurator.CONNECTION_USERNAME_CONFIG;
import static io.aiven.kafka.connect.opensearch.OpensearchSinkConnectorConfig.CONNECTION_URL_CONFIG;
import static io.aiven.kafka.connect.opensearch.auth.OpensearchBasicAuthConfigurator.CONNECTION_PASSWORD_CONFIG;
import static io.aiven.kafka.connect.opensearch.auth.OpensearchBasicAuthConfigurator.CONNECTION_USERNAME_CONFIG;

import java.io.IOException;
import java.io.UncheckedIOException;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Copyright 2019 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.aiven.kafka.connect.opensearch.auth;

import java.util.List;
import java.util.Objects;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.common.config.types.Password;

import io.aiven.kafka.connect.opensearch.OpensearchSinkConnectorConfig;
import io.aiven.kafka.connect.opensearch.spi.ConfigDefContributor;
import io.aiven.kafka.connect.opensearch.spi.OpensearchClientConfigurator;

import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;

/**
* Adds basic authentication to the {@index HttpAsyncClientBuilder} for Opensearch client if configured.
*/
public class OpensearchBasicAuthConfigurator implements OpensearchClientConfigurator, ConfigDefContributor {
public static final String CONNECTION_USERNAME_CONFIG = "connection.username";
private static final String CONNECTION_USERNAME_DOC = "The username used to authenticate with OpenSearch. "
+ "The default is the null, and authentication will only be performed if "
+ " both the username and password are non-null.";
public static final String CONNECTION_PASSWORD_CONFIG = "connection.password";
private static final String CONNECTION_PASSWORD_DOC = "The password used to authenticate with OpenSearch. "
+ "The default is the null, and authentication will only be performed if "
+ " both the username and password are non-null.";

public static final String CLIENT_mTLS_CA_CERTIFICATE_LOCATION = "connection.ca.certificate.location";
private static final String CLIENT_mTLS_CA_CERTIFICATE_LOCATION_DOC = "Path to X.509 root CAs file (PEM format)";

public static final String CLIENT_mTLS_ACCESS_CERTIFICATE_LOCATION = "connection.access.certificate.location";
private static final String CLIENT_mTLS_ACCESS_CERTIFICATE_LOCATION_DOC = "Path to X.509 user access certificate file (PEM format)";

public static final String CLIENT_mTLS_ACCESS_KEY_LOCATION = "connection.access.key.location";
private static final String CLIENT_mTLS_ACCESS_KEY_LOCATION_DOC = "Path to the user certificate’s keys (PKCS #8) file (PEM format)";

public static final String CLIENT_mTLS_ACCESS_KEY_PASSWORD = "connection.access.key.password";
private static final String CLIENT_mTLS_ACCESS_KEY_PASSWORD_DOC = "User access key password";

public static final String CLIENT_mTLS_TRUSTSTORE_LOCATION = "connection.truststore.location";
private static final String CLIENT_mTLS_TRUSTSTORE_LOCATION_DOC = "Path to the Truststore file (JKS format)";

public static final String CLIENT_mTLS_TRUSTSTORE_PASSWORD = "connection.truststore.password";
private static final String CLIENT_mTLS_TRUSTSTORE_PASSWORD_DOC = "Truststore password";

public static final String CLIENT_mTLS_KEYSTORE_LOCATION = "connection.keystore.location";
private static final String CLIENT_mTLS_KEYSTORE_LOCATION_DOC = "Path to the Keystore file (PKCS12/PFX format)";

public static final String CLIENT_mTLS_KEYSTORE_TYPE = "connection.keystore.type";
private static final String CLIENT_mTLS_KEYSTORE_TYPE_DOC = "Keystore type. The default is JKS. Supported values are: JKS, PKCS12 or PFX";

public static final String CLIENT_mTLS_KEYSTORE_PASSWORD = "connection.keystore.password";
private static final String CLIENT_mTLS_KEYSTORE_PASSWORD_DOC = "Keystore password";

@Override
public boolean apply(final OpensearchSinkConnectorConfig config, final HttpAsyncClientBuilder builder) {
if (!isAuthenticatedConnection(config)) {
return false;
}

final var credentialsProvider = new BasicCredentialsProvider();
for (final var httpHost : config.httpHosts()) {
credentialsProvider.setCredentials(new AuthScope(httpHost),
new UsernamePasswordCredentials(connectionUsername(config), connectionPassword(config).value()));
}
SSLContextBuilder.buildSSLContext(config).map(builder::setSSLContext);
return true;
}

@Override
public void addConfig(final ConfigDef config) {
int order = -1;
config.define(CONNECTION_USERNAME_CONFIG, Type.STRING, null, Importance.MEDIUM, CONNECTION_USERNAME_DOC,
"Authentication", order++, Width.SHORT, "Connection Username")
.define(CONNECTION_PASSWORD_CONFIG, Type.PASSWORD, null, Importance.MEDIUM, CONNECTION_PASSWORD_DOC,
"Authentication", order++, Width.SHORT, "Connection Password");
// PEM settings
config.define(CLIENT_mTLS_CA_CERTIFICATE_LOCATION, Type.STRING, null, Importance.MEDIUM,
CLIENT_mTLS_CA_CERTIFICATE_LOCATION_DOC, "X.509 PEM certificates and PKCS#8 keys", order++, Width.SHORT,
"Root CAs",
List.of(CLIENT_mTLS_ACCESS_CERTIFICATE_LOCATION, CLIENT_mTLS_ACCESS_KEY_LOCATION,
CLIENT_mTLS_ACCESS_KEY_PASSWORD))
.define(CLIENT_mTLS_ACCESS_CERTIFICATE_LOCATION, Type.STRING, null, Importance.MEDIUM,
CLIENT_mTLS_ACCESS_CERTIFICATE_LOCATION_DOC, "X.509 PEM certificates and PKCS#8 keys", order++,
Width.SHORT, "User access certificate")
.define(CLIENT_mTLS_ACCESS_KEY_LOCATION, Type.STRING, null, Importance.MEDIUM,
CLIENT_mTLS_ACCESS_KEY_LOCATION_DOC, "X.509 PEM certificates and PKCS#8 keys", order++,
Width.SHORT, "User certificate’s key")
.define(CLIENT_mTLS_ACCESS_KEY_PASSWORD, Type.PASSWORD, null, Importance.MEDIUM,
CLIENT_mTLS_ACCESS_KEY_PASSWORD_DOC, "X.509 PEM certificates and PKCS#8 keys", order++,
Width.SHORT, "User access key password");
// Keystore and Truststore files
config.define(CLIENT_mTLS_TRUSTSTORE_LOCATION, Type.STRING, null, Importance.MEDIUM,
CLIENT_mTLS_TRUSTSTORE_LOCATION_DOC, "Keystore and Truststore files", order++, Width.SHORT,
"Trust store location",
List.of(CLIENT_mTLS_TRUSTSTORE_PASSWORD, CLIENT_mTLS_KEYSTORE_LOCATION, CLIENT_mTLS_KEYSTORE_PASSWORD))
.define(CLIENT_mTLS_TRUSTSTORE_PASSWORD, Type.PASSWORD, null, Importance.MEDIUM,
CLIENT_mTLS_TRUSTSTORE_PASSWORD_DOC, "Keystore and Truststore files", order++, Width.SHORT,
"Trust store password")
.define(CLIENT_mTLS_KEYSTORE_LOCATION, Type.STRING, null, Importance.MEDIUM,
CLIENT_mTLS_KEYSTORE_LOCATION_DOC, "Keystore and Truststore files", order, Width.SHORT,
"Key store location")
.define(CLIENT_mTLS_KEYSTORE_TYPE, Type.STRING, "JKS", Importance.MEDIUM, CLIENT_mTLS_KEYSTORE_TYPE_DOC,
"Keystore and Truststore files", order++, Width.SHORT, "Key store type")
.define(CLIENT_mTLS_KEYSTORE_PASSWORD, Type.PASSWORD, null, Importance.MEDIUM,
CLIENT_mTLS_KEYSTORE_PASSWORD_DOC, "Keystore and Truststore files", order++, Width.SHORT,
"Key store password");

}

private static boolean isAuthenticatedConnection(final OpensearchSinkConnectorConfig config) {
return Objects.nonNull(connectionUsername(config)) && Objects.nonNull(connectionPassword(config));
}

private static String connectionUsername(final OpensearchSinkConnectorConfig config) {
return config.getString(CONNECTION_USERNAME_CONFIG);
}

private static Password connectionPassword(final OpensearchSinkConnectorConfig config) {
return config.getPassword(CONNECTION_PASSWORD_CONFIG);
}

}
Loading

0 comments on commit a128c43

Please sign in to comment.