diff --git a/build.gradle b/build.gradle index 357eee31..d7418145 100644 --- a/build.gradle +++ b/build.gradle @@ -35,6 +35,12 @@ plugins { id "com.diffplug.spotless" version "6.25.0" } +idea { + module { + downloadSources = true + } +} + wrapper { distributionType = 'ALL' doLast { @@ -173,6 +179,9 @@ dependencies { implementation "org.slf4j:slf4j-api:$slf4jVersion" implementation "com.google.code.gson:gson:2.10.1" implementation "org.opensearch.client:opensearch-rest-high-level-client:$openSearchVersion" + implementation "org.bouncycastle:bcprov-jdk18on:1.78.1" + implementation "org.bouncycastle:bcpkix-jdk18on:1.78.1" + testImplementation "org.junit.jupiter:junit-jupiter:5.10.2" testImplementation "org.mockito:mockito-core:5.11.0" @@ -184,6 +193,7 @@ dependencies { testImplementation "com.fasterxml.jackson.core:jackson-core:2.17.0" testImplementation "com.fasterxml.jackson.core:jackson-databind:2.17.0" testImplementation "com.fasterxml.jackson.core:jackson-annotations:2.17.0" + testImplementation "org.apache.commons:commons-lang3:3.14.0" testRuntimeOnly "org.slf4j:slf4j-log4j12:$slf4jVersion" integrationTestImplementation "org.testcontainers:junit-jupiter:$testcontainersVersion" diff --git a/docs/opensearch-sink-connector-config-options.rst b/docs/opensearch-sink-connector-config-options.rst index 06d5a1c1..32aecfd5 100644 --- a/docs/opensearch-sink-connector-config-options.rst +++ b/docs/opensearch-sink-connector-config-options.rst @@ -231,4 +231,75 @@ Authentication * Default: null * Importance: medium +X.509 PEM certificates and PKCS#8 keys +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +``connection.ca.certificate.location`` + Path to X.509 root CAs file (PEM format) + + * Type: string + * Default: null + * Importance: medium + * Dependents: ``connection.access.certificate.location``, ``connection.access.key.location``, ``connection.access.key.password`` + +``connection.access.certificate.location`` + Path to X.509 user access certificate file (PEM format) + + * Type: string + * Default: null + * Importance: medium + +``connection.access.key.location`` + Path to the user certificate’s keys (PKCS #8) file (PEM format) + + * Type: string + * Default: null + * Importance: medium + +``connection.access.key.password`` + User access key password + + * Type: password + * Default: null + * Importance: medium + +Keystore and Truststore files +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +``connection.truststore.location`` + Path to the Truststore file (JKS format) + + * Type: string + * Default: null + * Importance: medium + * Dependents: ``connection.truststore.password``, ``connection.keystore.location``, ``connection.keystore.password`` + +``connection.truststore.password`` + Truststore password + + * Type: password + * Default: null + * Importance: medium + +``connection.keystore.location`` + Path to the Keystore file (PKCS12/PFX format) + + * Type: string + * Default: null + * Importance: medium + +``connection.keystore.type`` + Keystore type. The default is JKS. Supported values are: JKS, PKCS12 or PFX + + * Type: string + * Default: JKS + * Importance: medium + +``connection.keystore.password`` + Keystore password + + * Type: password + * Default: null + * Importance: medium + diff --git a/src/integration-test/java/io/aiven/kafka/connect/opensearch/AbstractIT.java b/src/integration-test/java/io/aiven/kafka/connect/opensearch/AbstractIT.java index d2c95b30..48cb1208 100644 --- a/src/integration-test/java/io/aiven/kafka/connect/opensearch/AbstractIT.java +++ b/src/integration-test/java/io/aiven/kafka/connect/opensearch/AbstractIT.java @@ -15,9 +15,9 @@ */ package io.aiven.kafka.connect.opensearch; -import static io.aiven.kafka.connect.opensearch.OpensearchBasicAuthConfigurator.CONNECTION_PASSWORD_CONFIG; -import static io.aiven.kafka.connect.opensearch.OpensearchBasicAuthConfigurator.CONNECTION_USERNAME_CONFIG; import static io.aiven.kafka.connect.opensearch.OpensearchSinkConnectorConfig.CONNECTION_URL_CONFIG; +import static io.aiven.kafka.connect.opensearch.auth.OpensearchBasicAuthConfigurator.CONNECTION_PASSWORD_CONFIG; +import static io.aiven.kafka.connect.opensearch.auth.OpensearchBasicAuthConfigurator.CONNECTION_USERNAME_CONFIG; import java.io.IOException; import java.io.UncheckedIOException; diff --git a/src/main/java/io/aiven/kafka/connect/opensearch/OpensearchBasicAuthConfigurator.java b/src/main/java/io/aiven/kafka/connect/opensearch/OpensearchBasicAuthConfigurator.java deleted file mode 100644 index 22c5e6f0..00000000 --- a/src/main/java/io/aiven/kafka/connect/opensearch/OpensearchBasicAuthConfigurator.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Copyright 2019 Aiven Oy - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.aiven.kafka.connect.opensearch; - -import java.util.Objects; - -import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.common.config.ConfigDef.Importance; -import org.apache.kafka.common.config.ConfigDef.Type; -import org.apache.kafka.common.config.ConfigDef.Width; -import org.apache.kafka.common.config.types.Password; - -import io.aiven.kafka.connect.opensearch.spi.ConfigDefContributor; -import io.aiven.kafka.connect.opensearch.spi.OpensearchClientConfigurator; - -import org.apache.http.auth.AuthScope; -import org.apache.http.auth.UsernamePasswordCredentials; -import org.apache.http.impl.client.BasicCredentialsProvider; -import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; - -/** - * Adds basic authentication to the {@index HttpAsyncClientBuilder} for Opensearch client if configured. - */ -public class OpensearchBasicAuthConfigurator implements OpensearchClientConfigurator, ConfigDefContributor { - public static final String CONNECTION_USERNAME_CONFIG = "connection.username"; - private static final String CONNECTION_USERNAME_DOC = "The username used to authenticate with OpenSearch. " - + "The default is the null, and authentication will only be performed if " - + " both the username and password are non-null."; - public static final String CONNECTION_PASSWORD_CONFIG = "connection.password"; - private static final String CONNECTION_PASSWORD_DOC = "The password used to authenticate with OpenSearch. " - + "The default is the null, and authentication will only be performed if " - + " both the username and password are non-null."; - - @Override - public boolean apply(final OpensearchSinkConnectorConfig config, final HttpAsyncClientBuilder builder) { - if (!isAuthenticatedConnection(config)) { - return false; - } - - final var credentialsProvider = new BasicCredentialsProvider(); - for (final var httpHost : config.httpHosts()) { - credentialsProvider.setCredentials(new AuthScope(httpHost), - new UsernamePasswordCredentials(connectionUsername(config), connectionPassword(config).value())); - } - - builder.setDefaultCredentialsProvider(credentialsProvider); - return true; - } - - @Override - public void addConfig(final ConfigDef config) { - config.define(CONNECTION_USERNAME_CONFIG, Type.STRING, null, Importance.MEDIUM, CONNECTION_USERNAME_DOC, - "Authentication", 0, Width.SHORT, "Connection Username") - .define(CONNECTION_PASSWORD_CONFIG, Type.PASSWORD, null, Importance.MEDIUM, CONNECTION_PASSWORD_DOC, - "Authentication", 1, Width.SHORT, "Connection Password"); - } - - private static boolean isAuthenticatedConnection(final OpensearchSinkConnectorConfig config) { - return Objects.nonNull(connectionUsername(config)) && Objects.nonNull(connectionPassword(config)); - } - - private static String connectionUsername(final OpensearchSinkConnectorConfig config) { - return config.getString(CONNECTION_USERNAME_CONFIG); - } - - private static Password connectionPassword(final OpensearchSinkConnectorConfig config) { - return config.getPassword(CONNECTION_PASSWORD_CONFIG); - } - -} diff --git a/src/main/java/io/aiven/kafka/connect/opensearch/auth/OpensearchBasicAuthConfigurator.java b/src/main/java/io/aiven/kafka/connect/opensearch/auth/OpensearchBasicAuthConfigurator.java new file mode 100644 index 00000000..29d6aad6 --- /dev/null +++ b/src/main/java/io/aiven/kafka/connect/opensearch/auth/OpensearchBasicAuthConfigurator.java @@ -0,0 +1,144 @@ +/* + * Copyright 2019 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.aiven.kafka.connect.opensearch.auth; + +import java.util.List; +import java.util.Objects; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.config.ConfigDef.Width; +import org.apache.kafka.common.config.types.Password; + +import io.aiven.kafka.connect.opensearch.OpensearchSinkConnectorConfig; +import io.aiven.kafka.connect.opensearch.spi.ConfigDefContributor; +import io.aiven.kafka.connect.opensearch.spi.OpensearchClientConfigurator; + +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; + +/** + * Adds basic authentication to the {@index HttpAsyncClientBuilder} for Opensearch client if configured. + */ +public class OpensearchBasicAuthConfigurator implements OpensearchClientConfigurator, ConfigDefContributor { + public static final String CONNECTION_USERNAME_CONFIG = "connection.username"; + private static final String CONNECTION_USERNAME_DOC = "The username used to authenticate with OpenSearch. " + + "The default is the null, and authentication will only be performed if " + + " both the username and password are non-null."; + public static final String CONNECTION_PASSWORD_CONFIG = "connection.password"; + private static final String CONNECTION_PASSWORD_DOC = "The password used to authenticate with OpenSearch. " + + "The default is the null, and authentication will only be performed if " + + " both the username and password are non-null."; + + public static final String CLIENT_mTLS_CA_CERTIFICATE_LOCATION = "connection.ca.certificate.location"; + private static final String CLIENT_mTLS_CA_CERTIFICATE_LOCATION_DOC = "Path to X.509 root CAs file (PEM format)"; + + public static final String CLIENT_mTLS_ACCESS_CERTIFICATE_LOCATION = "connection.access.certificate.location"; + private static final String CLIENT_mTLS_ACCESS_CERTIFICATE_LOCATION_DOC = "Path to X.509 user access certificate file (PEM format)"; + + public static final String CLIENT_mTLS_ACCESS_KEY_LOCATION = "connection.access.key.location"; + private static final String CLIENT_mTLS_ACCESS_KEY_LOCATION_DOC = "Path to the user certificate’s keys (PKCS #8) file (PEM format)"; + + public static final String CLIENT_mTLS_ACCESS_KEY_PASSWORD = "connection.access.key.password"; + private static final String CLIENT_mTLS_ACCESS_KEY_PASSWORD_DOC = "User access key password"; + + public static final String CLIENT_mTLS_TRUSTSTORE_LOCATION = "connection.truststore.location"; + private static final String CLIENT_mTLS_TRUSTSTORE_LOCATION_DOC = "Path to the Truststore file (JKS format)"; + + public static final String CLIENT_mTLS_TRUSTSTORE_PASSWORD = "connection.truststore.password"; + private static final String CLIENT_mTLS_TRUSTSTORE_PASSWORD_DOC = "Truststore password"; + + public static final String CLIENT_mTLS_KEYSTORE_LOCATION = "connection.keystore.location"; + private static final String CLIENT_mTLS_KEYSTORE_LOCATION_DOC = "Path to the Keystore file (PKCS12/PFX format)"; + + public static final String CLIENT_mTLS_KEYSTORE_TYPE = "connection.keystore.type"; + private static final String CLIENT_mTLS_KEYSTORE_TYPE_DOC = "Keystore type. The default is JKS. Supported values are: JKS, PKCS12 or PFX"; + + public static final String CLIENT_mTLS_KEYSTORE_PASSWORD = "connection.keystore.password"; + private static final String CLIENT_mTLS_KEYSTORE_PASSWORD_DOC = "Keystore password"; + + @Override + public boolean apply(final OpensearchSinkConnectorConfig config, final HttpAsyncClientBuilder builder) { + if (!isAuthenticatedConnection(config)) { + return false; + } + + final var credentialsProvider = new BasicCredentialsProvider(); + for (final var httpHost : config.httpHosts()) { + credentialsProvider.setCredentials(new AuthScope(httpHost), + new UsernamePasswordCredentials(connectionUsername(config), connectionPassword(config).value())); + } + SSLContextBuilder.buildSSLContext(config).map(builder::setSSLContext); + return true; + } + + @Override + public void addConfig(final ConfigDef config) { + int order = -1; + config.define(CONNECTION_USERNAME_CONFIG, Type.STRING, null, Importance.MEDIUM, CONNECTION_USERNAME_DOC, + "Authentication", order++, Width.SHORT, "Connection Username") + .define(CONNECTION_PASSWORD_CONFIG, Type.PASSWORD, null, Importance.MEDIUM, CONNECTION_PASSWORD_DOC, + "Authentication", order++, Width.SHORT, "Connection Password"); + // PEM settings + config.define(CLIENT_mTLS_CA_CERTIFICATE_LOCATION, Type.STRING, null, Importance.MEDIUM, + CLIENT_mTLS_CA_CERTIFICATE_LOCATION_DOC, "X.509 PEM certificates and PKCS#8 keys", order++, Width.SHORT, + "Root CAs", + List.of(CLIENT_mTLS_ACCESS_CERTIFICATE_LOCATION, CLIENT_mTLS_ACCESS_KEY_LOCATION, + CLIENT_mTLS_ACCESS_KEY_PASSWORD)) + .define(CLIENT_mTLS_ACCESS_CERTIFICATE_LOCATION, Type.STRING, null, Importance.MEDIUM, + CLIENT_mTLS_ACCESS_CERTIFICATE_LOCATION_DOC, "X.509 PEM certificates and PKCS#8 keys", order++, + Width.SHORT, "User access certificate") + .define(CLIENT_mTLS_ACCESS_KEY_LOCATION, Type.STRING, null, Importance.MEDIUM, + CLIENT_mTLS_ACCESS_KEY_LOCATION_DOC, "X.509 PEM certificates and PKCS#8 keys", order++, + Width.SHORT, "User certificate’s key") + .define(CLIENT_mTLS_ACCESS_KEY_PASSWORD, Type.PASSWORD, null, Importance.MEDIUM, + CLIENT_mTLS_ACCESS_KEY_PASSWORD_DOC, "X.509 PEM certificates and PKCS#8 keys", order++, + Width.SHORT, "User access key password"); + // Keystore and Truststore files + config.define(CLIENT_mTLS_TRUSTSTORE_LOCATION, Type.STRING, null, Importance.MEDIUM, + CLIENT_mTLS_TRUSTSTORE_LOCATION_DOC, "Keystore and Truststore files", order++, Width.SHORT, + "Trust store location", + List.of(CLIENT_mTLS_TRUSTSTORE_PASSWORD, CLIENT_mTLS_KEYSTORE_LOCATION, CLIENT_mTLS_KEYSTORE_PASSWORD)) + .define(CLIENT_mTLS_TRUSTSTORE_PASSWORD, Type.PASSWORD, null, Importance.MEDIUM, + CLIENT_mTLS_TRUSTSTORE_PASSWORD_DOC, "Keystore and Truststore files", order++, Width.SHORT, + "Trust store password") + .define(CLIENT_mTLS_KEYSTORE_LOCATION, Type.STRING, null, Importance.MEDIUM, + CLIENT_mTLS_KEYSTORE_LOCATION_DOC, "Keystore and Truststore files", order, Width.SHORT, + "Key store location") + .define(CLIENT_mTLS_KEYSTORE_TYPE, Type.STRING, "JKS", Importance.MEDIUM, CLIENT_mTLS_KEYSTORE_TYPE_DOC, + "Keystore and Truststore files", order++, Width.SHORT, "Key store type") + .define(CLIENT_mTLS_KEYSTORE_PASSWORD, Type.PASSWORD, null, Importance.MEDIUM, + CLIENT_mTLS_KEYSTORE_PASSWORD_DOC, "Keystore and Truststore files", order++, Width.SHORT, + "Key store password"); + + } + + private static boolean isAuthenticatedConnection(final OpensearchSinkConnectorConfig config) { + return Objects.nonNull(connectionUsername(config)) && Objects.nonNull(connectionPassword(config)); + } + + private static String connectionUsername(final OpensearchSinkConnectorConfig config) { + return config.getString(CONNECTION_USERNAME_CONFIG); + } + + private static Password connectionPassword(final OpensearchSinkConnectorConfig config) { + return config.getPassword(CONNECTION_PASSWORD_CONFIG); + } + +} diff --git a/src/main/java/io/aiven/kafka/connect/opensearch/auth/SSLContextBuilder.java b/src/main/java/io/aiven/kafka/connect/opensearch/auth/SSLContextBuilder.java new file mode 100644 index 00000000..f13b17da --- /dev/null +++ b/src/main/java/io/aiven/kafka/connect/opensearch/auth/SSLContextBuilder.java @@ -0,0 +1,246 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.aiven.kafka.connect.opensearch.auth; + +import static io.aiven.kafka.connect.opensearch.auth.OpensearchBasicAuthConfigurator.CLIENT_mTLS_ACCESS_CERTIFICATE_LOCATION; +import static io.aiven.kafka.connect.opensearch.auth.OpensearchBasicAuthConfigurator.CLIENT_mTLS_ACCESS_KEY_LOCATION; +import static io.aiven.kafka.connect.opensearch.auth.OpensearchBasicAuthConfigurator.CLIENT_mTLS_ACCESS_KEY_PASSWORD; +import static io.aiven.kafka.connect.opensearch.auth.OpensearchBasicAuthConfigurator.CLIENT_mTLS_CA_CERTIFICATE_LOCATION; +import static io.aiven.kafka.connect.opensearch.auth.OpensearchBasicAuthConfigurator.CLIENT_mTLS_KEYSTORE_LOCATION; +import static io.aiven.kafka.connect.opensearch.auth.OpensearchBasicAuthConfigurator.CLIENT_mTLS_KEYSTORE_PASSWORD; +import static io.aiven.kafka.connect.opensearch.auth.OpensearchBasicAuthConfigurator.CLIENT_mTLS_KEYSTORE_TYPE; +import static io.aiven.kafka.connect.opensearch.auth.OpensearchBasicAuthConfigurator.CLIENT_mTLS_TRUSTSTORE_LOCATION; +import static io.aiven.kafka.connect.opensearch.auth.OpensearchBasicAuthConfigurator.CLIENT_mTLS_TRUSTSTORE_PASSWORD; +import static java.util.Objects.isNull; +import static java.util.Objects.nonNull; + +import javax.net.ssl.SSLContext; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.security.KeyManagementException; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.PrivateKey; +import java.security.UnrecoverableKeyException; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; +import java.util.ArrayList; +import java.util.Optional; + +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.config.types.Password; + +import io.aiven.kafka.connect.opensearch.OpensearchSinkConnectorConfig; + +import org.apache.http.conn.ssl.TrustAllStrategy; +import org.apache.http.ssl.SSLContexts; +import org.bouncycastle.asn1.pkcs.PrivateKeyInfo; +import org.bouncycastle.cert.X509CertificateHolder; +import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter; +import org.bouncycastle.openssl.PEMEncryptedKeyPair; +import org.bouncycastle.openssl.PEMKeyPair; +import org.bouncycastle.openssl.PEMParser; +import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter; +import org.bouncycastle.openssl.jcajce.JceOpenSSLPKCS8DecryptorProviderBuilder; +import org.bouncycastle.openssl.jcajce.JcePEMDecryptorProviderBuilder; +import org.bouncycastle.operator.OperatorCreationException; +import org.bouncycastle.pkcs.PKCS8EncryptedPrivateKeyInfo; +import org.bouncycastle.pkcs.PKCSException; + +class SSLContextBuilder { + + public static Optional buildSSLContext(final OpensearchSinkConnectorConfig connectorConfig) { + if (!hasKeystoreAndTruststoreSettings(connectorConfig) && !hasPemCertificatesSettings(connectorConfig)) { + return Optional.empty(); + } + if (hasKeystoreAndTruststoreSettings(connectorConfig) && hasPemCertificatesSettings(connectorConfig)) { + throw new ConfigException( + "One of Keystore and Truststore files or X.509 PEM certificates and PKCS#8 keys groups should be set"); + } + try { + final var trustStore = buildTrustStore(connectorConfig); + final var keyStore = buildKeyStore(connectorConfig); + return Optional.of(SSLContexts.custom() + .loadTrustMaterial(trustStore, TrustAllStrategy.INSTANCE) + .loadKeyMaterial(keyStore, accessKeyPassword(connectorConfig).value().toCharArray()) + .build()); + } catch (final NoSuchAlgorithmException | KeyStoreException | UnrecoverableKeyException + | KeyManagementException e) { + throw new ConfigException("Couldn't build SSL context", e); + } + } + + private static KeyStore buildTrustStore(final OpensearchSinkConnectorConfig connectorConfig) { + try { + if (hasPemCertificatesSettings(connectorConfig)) { + final var certificatesChain = readX509Certificates(Path.of(caCertificateLocation(connectorConfig))); + final var trustStore = createKeyStore("JKS"); + for (var i = 0; i < certificatesChain.length; i++) { + String alias = Integer.toString(i); + trustStore.setCertificateEntry(alias, certificatesChain[i]); + } + return trustStore; + } else if (hasKeystoreAndTruststoreSettings(connectorConfig)) { + return createKeyStore("JKS", Path.of(trustStoreLocation(connectorConfig)), + trustStorePassword(connectorConfig).value().toCharArray()); + } + return null; + } catch (final IOException | KeyStoreException | NoSuchAlgorithmException | CertificateException e) { + throw new ConfigException("Couldn't build trust store", e); + } + } + + private static KeyStore buildKeyStore(final OpensearchSinkConnectorConfig connectorConfig) { + try { + if (hasPemCertificatesSettings(connectorConfig)) { + final var certificatesChain = readX509Certificates(Path.of(accessCertificateLocation(connectorConfig))); + final var privetKey = readPrivateKey(Path.of(accessKeyLocation(connectorConfig)), + accessKeyPassword(connectorConfig)); + final var keystore = createKeyStore("JKS"); + keystore.setKeyEntry("access_key", privetKey, accessKeyPassword(connectorConfig).value().toCharArray(), + certificatesChain); + return keystore; + } else if (hasKeystoreAndTruststoreSettings(connectorConfig)) { + return createKeyStore(keyStoreType(connectorConfig), Path.of(keyStoreLocation(connectorConfig)), + keyStorePassword(connectorConfig).value().toCharArray()); + } + return null; + } catch (final IOException | KeyStoreException | NoSuchAlgorithmException | CertificateException e) { + throw new ConfigException("Couldn't build key store", e); + } + } + + private static KeyStore createKeyStore(final String type) + throws CertificateException, KeyStoreException, IOException, NoSuchAlgorithmException { + return createKeyStore(type, null, null); + } + + private static KeyStore createKeyStore(final String type, final Path location, final char[] password) + throws KeyStoreException, IOException, CertificateException, NoSuchAlgorithmException { + final var keyStore = KeyStore.getInstance(type); + if (isNull(location)) { + keyStore.load(null, null); + } else { + try (final var in = Files.newInputStream(location)) { + keyStore.load(in, password); + } + } + return keyStore; + } + + private static boolean hasPemCertificatesSettings(final OpensearchSinkConnectorConfig config) { + return nonNull(caCertificateLocation(config)) || nonNull(accessCertificateLocation(config)) + || nonNull(accessKeyLocation(config)); + } + + private static boolean hasKeystoreAndTruststoreSettings(final OpensearchSinkConnectorConfig config) { + return nonNull(trustStoreLocation(config)) || nonNull(keyStoreLocation(config)); + } + + private static String caCertificateLocation(final OpensearchSinkConnectorConfig config) { + return config.getString(CLIENT_mTLS_CA_CERTIFICATE_LOCATION); + } + + private static String accessCertificateLocation(final OpensearchSinkConnectorConfig config) { + return config.getString(CLIENT_mTLS_ACCESS_CERTIFICATE_LOCATION); + } + + private static String accessKeyLocation(final OpensearchSinkConnectorConfig config) { + return config.getString(CLIENT_mTLS_ACCESS_KEY_LOCATION); + } + + private static Password accessKeyPassword(final OpensearchSinkConnectorConfig config) { + return config.getPassword(CLIENT_mTLS_ACCESS_KEY_PASSWORD); + } + + private static String trustStoreLocation(final OpensearchSinkConnectorConfig config) { + return config.getString(CLIENT_mTLS_TRUSTSTORE_LOCATION); + } + + private static Password trustStorePassword(final OpensearchSinkConnectorConfig config) { + return config.getPassword(CLIENT_mTLS_TRUSTSTORE_PASSWORD); + } + + private static String keyStoreType(final OpensearchSinkConnectorConfig config) { + return config.getString(CLIENT_mTLS_KEYSTORE_TYPE); + } + + private static String keyStoreLocation(final OpensearchSinkConnectorConfig config) { + return config.getString(CLIENT_mTLS_KEYSTORE_LOCATION); + } + + private static Password keyStorePassword(final OpensearchSinkConnectorConfig config) { + return config.getPassword(CLIENT_mTLS_KEYSTORE_PASSWORD); + } + + private static PEMParser createPEMParser(final Path location) throws IOException { + return new PEMParser(Files.newBufferedReader(location)); + } + + private static X509Certificate[] readX509Certificates(final Path location) { + final var certificates = new ArrayList(); + final var converter = new JcaX509CertificateConverter(); + try (final var pemParser = createPEMParser(location)) { + Object pemObject; + while ((pemObject = pemParser.readObject()) != null) { + if (pemObject instanceof X509CertificateHolder) { + certificates.add(converter.getCertificate((X509CertificateHolder) pemObject)); + } + } + } catch (final IOException ioe) { + throw new ConfigException("Couldn't read PEM content from " + location); + } catch (final CertificateException ce) { + throw new ConfigException("Couldn't get X.509 certificate from " + location); + } + return certificates.toArray(new X509Certificate[0]); + } + + private static PrivateKey readPrivateKey(final Path location, final Password keyPassword) { + final var converter = new JcaPEMKeyConverter(); + try (final var pemParser = createPEMParser(location)) { + Object pemObject; + while ((pemObject = pemParser.readObject()) != null) { + if (isNull(keyPassword)) { + if (pemObject instanceof PrivateKeyInfo) { + return converter.getPrivateKey((PrivateKeyInfo) pemObject); + } else if (pemObject instanceof PEMKeyPair) { + return converter.getKeyPair((PEMKeyPair) pemObject).getPrivate(); + } else { + throw new ConfigException("Unable to parse PEM object {} as a non encrypted key", location); + } + } else if (pemObject instanceof PEMEncryptedKeyPair) { + final var provider = new JcePEMDecryptorProviderBuilder().build(keyPassword.value().toCharArray()); + return converter.getKeyPair(((PEMEncryptedKeyPair) pemObject).decryptKeyPair(provider)) + .getPrivate(); + } else if (pemObject instanceof PKCS8EncryptedPrivateKeyInfo) { + final var provider = new JceOpenSSLPKCS8DecryptorProviderBuilder() + .build(keyPassword.value().toCharArray()); + return converter + .getPrivateKey(((PKCS8EncryptedPrivateKeyInfo) pemObject).decryptPrivateKeyInfo(provider)); + } + } + } catch (final IOException ioe) { + throw new ConfigException("Couldn't read PEM content from " + location, ioe); + } catch (final PKCSException | OperatorCreationException e) { + throw new ConfigException("Couldn't get private key from " + location, e); + } + throw new ConfigException("Couldn't get private key from " + location); + } + +} diff --git a/src/main/resources/META-INF/services/io.aiven.kafka.connect.opensearch.spi.ConfigDefContributor b/src/main/resources/META-INF/services/io.aiven.kafka.connect.opensearch.spi.ConfigDefContributor index 1c9ed88f..554dcf73 100644 --- a/src/main/resources/META-INF/services/io.aiven.kafka.connect.opensearch.spi.ConfigDefContributor +++ b/src/main/resources/META-INF/services/io.aiven.kafka.connect.opensearch.spi.ConfigDefContributor @@ -1 +1 @@ -io.aiven.kafka.connect.opensearch.OpensearchBasicAuthConfigurator +io.aiven.kafka.connect.opensearch.auth.OpensearchBasicAuthConfigurator diff --git a/src/main/resources/META-INF/services/io.aiven.kafka.connect.opensearch.spi.OpensearchClientConfigurator b/src/main/resources/META-INF/services/io.aiven.kafka.connect.opensearch.spi.OpensearchClientConfigurator index 1c9ed88f..554dcf73 100644 --- a/src/main/resources/META-INF/services/io.aiven.kafka.connect.opensearch.spi.OpensearchClientConfigurator +++ b/src/main/resources/META-INF/services/io.aiven.kafka.connect.opensearch.spi.OpensearchClientConfigurator @@ -1 +1 @@ -io.aiven.kafka.connect.opensearch.OpensearchBasicAuthConfigurator +io.aiven.kafka.connect.opensearch.auth.OpensearchBasicAuthConfigurator diff --git a/src/test/java/io/aiven/kafka/connect/opensearch/auth/SSLContextBuilderTest.java b/src/test/java/io/aiven/kafka/connect/opensearch/auth/SSLContextBuilderTest.java new file mode 100644 index 00000000..44b724b7 --- /dev/null +++ b/src/test/java/io/aiven/kafka/connect/opensearch/auth/SSLContextBuilderTest.java @@ -0,0 +1,243 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.aiven.kafka.connect.opensearch.auth; + +import static io.aiven.kafka.connect.opensearch.OpensearchSinkConnectorConfig.CONNECTION_URL_CONFIG; +import static io.aiven.kafka.connect.opensearch.auth.OpensearchBasicAuthConfigurator.CLIENT_mTLS_ACCESS_CERTIFICATE_LOCATION; +import static io.aiven.kafka.connect.opensearch.auth.OpensearchBasicAuthConfigurator.CLIENT_mTLS_ACCESS_KEY_LOCATION; +import static io.aiven.kafka.connect.opensearch.auth.OpensearchBasicAuthConfigurator.CLIENT_mTLS_ACCESS_KEY_PASSWORD; +import static io.aiven.kafka.connect.opensearch.auth.OpensearchBasicAuthConfigurator.CLIENT_mTLS_CA_CERTIFICATE_LOCATION; +import static io.aiven.kafka.connect.opensearch.auth.OpensearchBasicAuthConfigurator.CLIENT_mTLS_KEYSTORE_LOCATION; +import static io.aiven.kafka.connect.opensearch.auth.OpensearchBasicAuthConfigurator.CLIENT_mTLS_KEYSTORE_PASSWORD; +import static io.aiven.kafka.connect.opensearch.auth.OpensearchBasicAuthConfigurator.CLIENT_mTLS_KEYSTORE_TYPE; +import static io.aiven.kafka.connect.opensearch.auth.OpensearchBasicAuthConfigurator.CLIENT_mTLS_TRUSTSTORE_LOCATION; +import static io.aiven.kafka.connect.opensearch.auth.OpensearchBasicAuthConfigurator.CLIENT_mTLS_TRUSTSTORE_PASSWORD; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.math.BigInteger; +import java.nio.file.Files; +import java.nio.file.Path; +import java.security.KeyPair; +import java.security.KeyPairGenerator; +import java.security.KeyStore; +import java.security.NoSuchAlgorithmException; +import java.security.PrivateKey; +import java.security.PublicKey; +import java.security.SecureRandom; +import java.security.cert.Certificate; +import java.security.cert.CertificateException; +import java.security.cert.CertificateFactory; +import java.security.cert.X509Certificate; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Date; +import java.util.Map; + +import org.apache.kafka.common.config.ConfigException; + +import io.aiven.kafka.connect.opensearch.OpensearchSinkConnectorConfig; + +import org.apache.commons.lang3.RandomStringUtils; +import org.bouncycastle.asn1.pkcs.PrivateKeyInfo; +import org.bouncycastle.asn1.x500.X500Name; +import org.bouncycastle.asn1.x500.style.RFC4519Style; +import org.bouncycastle.asn1.x509.BasicConstraints; +import org.bouncycastle.asn1.x509.ExtendedKeyUsage; +import org.bouncycastle.asn1.x509.Extension; +import org.bouncycastle.asn1.x509.KeyPurposeId; +import org.bouncycastle.asn1.x509.KeyUsage; +import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo; +import org.bouncycastle.cert.CertIOException; +import org.bouncycastle.cert.X509v3CertificateBuilder; +import org.bouncycastle.cert.jcajce.JcaX509ExtensionUtils; +import org.bouncycastle.jce.provider.BouncyCastleProvider; +import org.bouncycastle.openssl.PEMParser; +import org.bouncycastle.openssl.PKCS8Generator; +import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter; +import org.bouncycastle.openssl.jcajce.JcaPEMWriter; +import org.bouncycastle.openssl.jcajce.JceOpenSSLPKCS8DecryptorProviderBuilder; +import org.bouncycastle.openssl.jcajce.JceOpenSSLPKCS8EncryptorBuilder; +import org.bouncycastle.operator.OperatorCreationException; +import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder; +import org.bouncycastle.pkcs.PKCS8EncryptedPrivateKeyInfo; +import org.bouncycastle.pkcs.PKCSException; +import org.bouncycastle.util.io.pem.PemReader; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +class SSLContextBuilderTest { + + private final static BouncyCastleProvider BOUNCY_CASTLE_PROVIDER = new BouncyCastleProvider(); + + static Path CA_CERTIFICATE_PATH; + + static Path USER_ACCESS_CERTIFICATE_PATH; + + static Path USER_ACCESS_KEY_PATH; + + final static String USER_ACCESS_KEY_PASSWORD = RandomStringUtils.randomAlphabetic(10); + + @BeforeAll + static void setup(final @TempDir Path tmpFolder) throws Exception { + final var keyPair = generateKeyPair(); + generateCaCertificate(keyPair, tmpFolder); + generateAccessCertificate(keyPair, tmpFolder); + } + + private static KeyPair generateKeyPair() throws NoSuchAlgorithmException { + KeyPairGenerator generator = KeyPairGenerator.getInstance("RSA", BOUNCY_CASTLE_PROVIDER); + generator.initialize(4096); + return generator.generateKeyPair(); + } + + private static void generateCaCertificate(final KeyPair parentKeyPair, final Path tmpFolder) + throws IOException, NoSuchAlgorithmException, OperatorCreationException { + CA_CERTIFICATE_PATH = tmpFolder.resolve("ca_root.crt"); + writePemContent(CA_CERTIFICATE_PATH, + createCertificateBuilder( + "DC=org,DC=example,O=Example Org.,OU=Example Org. Root CA,CN=Example Org. Root CA", + parentKeyPair.getPublic(), parentKeyPair.getPublic()) + .addExtension(Extension.basicConstraints, true, new BasicConstraints(true)) + .addExtension(Extension.keyUsage, true, + new KeyUsage(KeyUsage.digitalSignature | KeyUsage.keyCertSign | KeyUsage.cRLSign)) + .build(new JcaContentSignerBuilder("SHA256withRSA").setProvider(BOUNCY_CASTLE_PROVIDER) + .build(parentKeyPair.getPrivate()))); + } + + private static void generateAccessCertificate(final KeyPair parentKeyPair, final Path tmpFolder) + throws NoSuchAlgorithmException, IOException, OperatorCreationException { + final var keyPair = generateKeyPair(); + USER_ACCESS_CERTIFICATE_PATH = tmpFolder.resolve("user_access.crt"); + writePemContent(USER_ACCESS_CERTIFICATE_PATH, + createCertificateBuilder("CN=user_access,OU=client,O=client,L=test,C=de", keyPair.getPublic(), + parentKeyPair.getPublic()) + .addExtension(Extension.basicConstraints, true, new BasicConstraints(false)) + .addExtension(Extension.keyUsage, true, + new KeyUsage( + KeyUsage.digitalSignature | KeyUsage.nonRepudiation | KeyUsage.keyEncipherment)) + .addExtension(Extension.extendedKeyUsage, true, + new ExtendedKeyUsage(KeyPurposeId.id_kp_clientAuth)) + .build(new JcaContentSignerBuilder("SHA256withRSA").setProvider(BOUNCY_CASTLE_PROVIDER) + .build(parentKeyPair.getPrivate()))); + USER_ACCESS_KEY_PATH = tmpFolder.resolve("user_access.key"); + writePemContent(USER_ACCESS_KEY_PATH, + new PKCS8Generator(PrivateKeyInfo.getInstance(keyPair.getPrivate().getEncoded()), + new JceOpenSSLPKCS8EncryptorBuilder(PKCS8Generator.PBE_SHA1_3DES).setRandom(new SecureRandom()) + .setPassword(USER_ACCESS_KEY_PASSWORD.toCharArray()) + .build()) + .generate()); + } + + private static void writePemContent(final Path path, final Object pemContent) throws IOException { + try (JcaPEMWriter writer = new JcaPEMWriter(Files.newBufferedWriter(path))) { + writer.writeObject(pemContent); + } + } + + private static X509v3CertificateBuilder createCertificateBuilder(final String subject, + final PublicKey certificatePublicKey, final PublicKey parentPublicKey) + throws NoSuchAlgorithmException, CertIOException { + final var issuerName = new X500Name(RFC4519Style.INSTANCE, subject); + final var subjectName = new X500Name(RFC4519Style.INSTANCE, subject); + final var startDate = Instant.now().minusMillis(24 * 3600 * 1000); + final var endDate = Instant.from(startDate).plus(10, ChronoUnit.DAYS); + final var serialNumber = BigInteger.valueOf(Instant.now().plusMillis(100).getEpochSecond()); + final var extUtils = new JcaX509ExtensionUtils(); + return new X509v3CertificateBuilder(issuerName, serialNumber, Date.from(startDate), Date.from(endDate), + subjectName, SubjectPublicKeyInfo.getInstance(certificatePublicKey.getEncoded())) + .addExtension(Extension.authorityKeyIdentifier, false, + extUtils.createAuthorityKeyIdentifier(parentPublicKey)) + .addExtension(Extension.subjectKeyIdentifier, false, + extUtils.createSubjectKeyIdentifier(certificatePublicKey)); + } + + @Test + public void shouldFailIfBothSSLConfigGroupsWereSet() { + final var config = new OpensearchSinkConnectorConfig( + Map.of(CONNECTION_URL_CONFIG, "http://127.0.0.1", CLIENT_mTLS_CA_CERTIFICATE_LOCATION, "a", + CLIENT_mTLS_ACCESS_CERTIFICATE_LOCATION, "b", CLIENT_mTLS_ACCESS_KEY_LOCATION, "c", + CLIENT_mTLS_ACCESS_KEY_PASSWORD, "d", CLIENT_mTLS_TRUSTSTORE_LOCATION, "e", + CLIENT_mTLS_TRUSTSTORE_PASSWORD, "f", CLIENT_mTLS_KEYSTORE_LOCATION, "g", + CLIENT_mTLS_KEYSTORE_TYPE, "h", CLIENT_mTLS_KEYSTORE_PASSWORD, "i")); + assertThrows(ConfigException.class, () -> SSLContextBuilder.buildSSLContext(config)); + } + + @Test + public void shouldSkipSSLContextConfigIfNothingSet() { + final var config = new OpensearchSinkConnectorConfig(Map.of(CONNECTION_URL_CONFIG, "http://127.0.0.1")); + assertTrue(SSLContextBuilder.buildSSLContext(config).isEmpty()); + } + + @Test + public void shouldBuildSSLContextUsingPemFiles() { + final var config = new OpensearchSinkConnectorConfig( + Map.of(CONNECTION_URL_CONFIG, "http://127.0.0.1", CLIENT_mTLS_CA_CERTIFICATE_LOCATION, + CA_CERTIFICATE_PATH.toString(), CLIENT_mTLS_ACCESS_CERTIFICATE_LOCATION, + USER_ACCESS_CERTIFICATE_PATH.toString(), CLIENT_mTLS_ACCESS_KEY_LOCATION, + USER_ACCESS_KEY_PATH.toString(), CLIENT_mTLS_ACCESS_KEY_PASSWORD, USER_ACCESS_KEY_PASSWORD)); + assertTrue(SSLContextBuilder.buildSSLContext(config).isPresent()); + } + + @Test + public void shouldBuildSSLContextUsingTrustAndKeyStoreFiles(final @TempDir Path tmpFolder) throws Exception { + final var trustStorePath = tmpFolder.resolve("truststore.jks"); + final var trustStorePassword = RandomStringUtils.randomAlphabetic(10); + + final var trustStore = KeyStore.getInstance("JKS"); + trustStore.load(null, null); + trustStore.setCertificateEntry("ca", x509Certificate(CA_CERTIFICATE_PATH)); + trustStore.store(Files.newOutputStream(trustStorePath), trustStorePassword.toCharArray()); + + final var keyStorePath = tmpFolder.resolve("keystore.jks"); + final var keyStorePassword = RandomStringUtils.randomAlphabetic(10); + final var keyStore = KeyStore.getInstance("JKS"); + keyStore.load(null, null); + keyStore.setKeyEntry("key", privateKey(USER_ACCESS_KEY_PATH), USER_ACCESS_KEY_PASSWORD.toCharArray(), + new Certificate[] { x509Certificate(USER_ACCESS_CERTIFICATE_PATH) }); + keyStore.store(Files.newOutputStream(keyStorePath), keyStorePassword.toCharArray()); + + final var config = new OpensearchSinkConnectorConfig( + Map.of(CONNECTION_URL_CONFIG, "http://127.0.0.1", CLIENT_mTLS_TRUSTSTORE_LOCATION, + trustStorePath.toString(), CLIENT_mTLS_TRUSTSTORE_PASSWORD, trustStorePassword, + CLIENT_mTLS_KEYSTORE_LOCATION, keyStorePath.toString(), CLIENT_mTLS_KEYSTORE_PASSWORD, + keyStorePassword, CLIENT_mTLS_ACCESS_KEY_PASSWORD, USER_ACCESS_KEY_PASSWORD)); + + assertTrue(SSLContextBuilder.buildSSLContext(config).isPresent()); + } + + private X509Certificate x509Certificate(final Path path) throws IOException, CertificateException { + try (final var pemReader = new PemReader(Files.newBufferedReader(path))) { + final var pemContent = pemReader.readPemObject().getContent(); + return (X509Certificate) CertificateFactory.getInstance("X.509") + .generateCertificate(new ByteArrayInputStream(pemContent)); + } + } + + private PrivateKey privateKey(final Path path) throws IOException, PKCSException, OperatorCreationException { + try (final var parser = new PEMParser(Files.newBufferedReader(path))) { + final var pemObject = parser.readObject(); + final var provider = new JceOpenSSLPKCS8DecryptorProviderBuilder() + .build(USER_ACCESS_KEY_PASSWORD.toCharArray()); + return new JcaPEMKeyConverter() + .getPrivateKey(((PKCS8EncryptedPrivateKeyInfo) pemObject).decryptPrivateKeyInfo(provider)); + } + } + +}