Skip to content

Commit

Permalink
Adding AWS SigV4 authentication
Browse files Browse the repository at this point in the history
  • Loading branch information
fabiojmendes committed Oct 14, 2023
1 parent fb0e600 commit 8a2ffb3
Show file tree
Hide file tree
Showing 6 changed files with 273 additions and 0 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ dependencies {
implementation "org.slf4j:slf4j-api:$slf4jVersion"
implementation "com.google.code.gson:gson:2.10.1"
implementation "org.opensearch.client:opensearch-rest-high-level-client:$openSearchVersion"
implementation "io.github.acm19:aws-request-signing-apache-interceptor:2.3.1"

testImplementation "org.junit.jupiter:junit-jupiter:5.10.0"
testImplementation "org.mockito:mockito-core:5.5.0"
Expand Down
24 changes: 24 additions & 0 deletions docs/opensearch-sink-connector-config-options.rst
Original file line number Diff line number Diff line change
Expand Up @@ -231,4 +231,28 @@ Authentication
* Default: null
* Importance: medium

AWS Authentication SigV4
^^^^^^^^^^^^^^^^^^^^^^^^

``aws.access_key_id``
AWS Access key id, this field is required to enable AWS SigV4 request signing

* Type: string
* Default: null
* Importance: medium

``aws.region``
AWS Region, eg us-east-1. This field is required to enable AWS SigV4 request signing

* Type: string
* Default: null
* Importance: medium

``aws.secret_access_key``
AWS secret access key, this field is required to enable AWS SigV4 request signing

* Type: password
* Default: null
* Importance: medium


Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Copyright 2019 Aiven Oy
* Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.opensearch;

import java.util.Objects;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.common.config.types.Password;

import io.aiven.kafka.connect.opensearch.spi.ConfigDefContributor;
import io.aiven.kafka.connect.opensearch.spi.OpensearchClientConfigurator;

import io.github.acm19.aws.interceptor.http.AwsRequestSigningApacheInterceptor;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.auth.signer.Aws4Signer;
import software.amazon.awssdk.regions.Region;


/**
* Adds AWS SigV4 authentication to the {@index HttpAsyncClientBuilder} for Opensearch client
* if configured.
*/
public class OpensearchSigV4Configurator
implements OpensearchClientConfigurator, ConfigDefContributor {

public static final String AWS_ACCESS_KEY_ID_CONFIG = "aws.access_key_id";
public static final String AWS_SECRET_ACCESS_KEY_CONFIG = "aws.secret_access_key";
public static final String AWS_REGION_CONFIG = "aws.region";
private static final String AWS_ACCESS_KEY_ID_DOC =
"AWS Access key id, this field is required "
+ "to enable AWS SigV4 request signing";
private static final String AWS_SECRET_ACCESS_KEY_DOC =
"AWS secret access key, this field is required "
+ "to enable AWS SigV4 request signing";
private static final String AWS_REGION_DOC =
"AWS Region, eg us-east-1. This field is required "
+ "to enable AWS SigV4 request signing";

private static final String AWS_GROUP_NAME = "AWS Authentication SigV4";

private static boolean isAuthenticatedConnection(final OpensearchSinkConnectorConfig config) {
return Objects.nonNull(awsAccessKeyId(config))
&& Objects.nonNull(awsSecretAccessKey(config))
&& Objects.nonNull(awsRegion(config));
}

private static String awsRegion(final OpensearchSinkConnectorConfig config) {
return config.getString(AWS_REGION_CONFIG);
}

private static String awsAccessKeyId(final OpensearchSinkConnectorConfig config) {
return config.getString(AWS_ACCESS_KEY_ID_CONFIG);
}

private static Password awsSecretAccessKey(final OpensearchSinkConnectorConfig config) {
return config.getPassword(AWS_SECRET_ACCESS_KEY_CONFIG);
}

@Override
public boolean apply(final OpensearchSinkConnectorConfig config,
final HttpAsyncClientBuilder builder) {
if (!isAuthenticatedConnection(config)) {
return false;
}

final AwsCredentials credentials = AwsBasicCredentials.create(
awsAccessKeyId(config),
awsSecretAccessKey(config).value());
final StaticCredentialsProvider credentialsProvider = StaticCredentialsProvider.create(credentials);

final HttpRequestInterceptor awsSignerInterceptor = new AwsRequestSigningApacheInterceptor(
"es",
Aws4Signer.create(),
credentialsProvider,
Region.of(config.getString(AWS_REGION_CONFIG))
);

builder.addInterceptorLast(awsSignerInterceptor);

return true;
}

@Override
public void addConfig(final ConfigDef config) {
config
.define(
AWS_ACCESS_KEY_ID_CONFIG,
Type.STRING,
null,
Importance.MEDIUM,
AWS_ACCESS_KEY_ID_DOC,
AWS_GROUP_NAME,
0,
Width.SHORT,
"Access Key Id"
).define(
AWS_SECRET_ACCESS_KEY_CONFIG,
Type.PASSWORD,
null,
Importance.MEDIUM,
AWS_SECRET_ACCESS_KEY_DOC,
AWS_GROUP_NAME,
1,
Width.SHORT,
"Secret Access Key"
).define(
AWS_REGION_CONFIG,
Type.STRING,
null,
Importance.MEDIUM,
AWS_REGION_DOC,
AWS_GROUP_NAME,
1,
Width.SHORT,
"Region");
}
}
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
io.aiven.kafka.connect.opensearch.OpensearchBasicAuthConfigurator
io.aiven.kafka.connect.opensearch.OpensearchSigV4Configurator
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
io.aiven.kafka.connect.opensearch.OpensearchBasicAuthConfigurator
io.aiven.kafka.connect.opensearch.OpensearchSigV4Configurator
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright 2019 Aiven Oy
* Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.opensearch;

import java.util.Map;

import org.apache.http.HttpRequestInterceptor;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;

@ExtendWith(MockitoExtension.class)
public class OpensearchSigV4ConfiguratorTest {

@Test
void testApplyInterceptor(final @Mock HttpAsyncClientBuilder httpBuilder) {
final var config = new OpensearchSinkConnectorConfig(
Map.of(
OpensearchSinkConnectorConfig.CONNECTION_URL_CONFIG, "http://localhost",
OpensearchSigV4Configurator.AWS_ACCESS_KEY_ID_CONFIG, "id",
OpensearchSigV4Configurator.AWS_SECRET_ACCESS_KEY_CONFIG, "secret",
OpensearchSigV4Configurator.AWS_REGION_CONFIG, "us-east-1"
)
);
final OpensearchSigV4Configurator configurator = new OpensearchSigV4Configurator();
configurator.apply(config, httpBuilder);

verify(httpBuilder).addInterceptorLast(any(HttpRequestInterceptor.class));
}

@Test
void testConfigMissing(final @Mock HttpAsyncClientBuilder httpBuilder) {
final var config = new OpensearchSinkConnectorConfig(
Map.of(OpensearchSinkConnectorConfig.CONNECTION_URL_CONFIG, "http://localhost")
);
final OpensearchSigV4Configurator configurator = new OpensearchSigV4Configurator();
configurator.apply(config, httpBuilder);

verify(httpBuilder, never()).addInterceptorLast(any(HttpRequestInterceptor.class));
}

@Test
void testAccessKeyIdMissing(final @Mock HttpAsyncClientBuilder httpBuilder) {
final var config = new OpensearchSinkConnectorConfig(
Map.of(
OpensearchSinkConnectorConfig.CONNECTION_URL_CONFIG, "http://localhost",
OpensearchSigV4Configurator.AWS_SECRET_ACCESS_KEY_CONFIG, "secret",
OpensearchSigV4Configurator.AWS_REGION_CONFIG, "us-east-1"
)
);
final OpensearchSigV4Configurator configurator = new OpensearchSigV4Configurator();
configurator.apply(config, httpBuilder);

verify(httpBuilder, never()).addInterceptorLast(any(HttpRequestInterceptor.class));
}

@Test
void testSecretAccessKeyMissing(final @Mock HttpAsyncClientBuilder httpBuilder) {
final var config = new OpensearchSinkConnectorConfig(
Map.of(
OpensearchSinkConnectorConfig.CONNECTION_URL_CONFIG, "http://localhost",
OpensearchSigV4Configurator.AWS_ACCESS_KEY_ID_CONFIG, "id",
OpensearchSigV4Configurator.AWS_REGION_CONFIG, "us-east-1"
)
);
final OpensearchSigV4Configurator configurator = new OpensearchSigV4Configurator();
configurator.apply(config, httpBuilder);

verify(httpBuilder, never()).addInterceptorLast(any(HttpRequestInterceptor.class));
}

@Test
void testRegionMissing(final @Mock HttpAsyncClientBuilder httpBuilder) {
final var config = new OpensearchSinkConnectorConfig(
Map.of(
OpensearchSinkConnectorConfig.CONNECTION_URL_CONFIG, "http://localhost",
OpensearchSigV4Configurator.AWS_ACCESS_KEY_ID_CONFIG, "id",
OpensearchSigV4Configurator.AWS_SECRET_ACCESS_KEY_CONFIG, "secret"
)
);
final OpensearchSigV4Configurator configurator = new OpensearchSigV4Configurator();
configurator.apply(config, httpBuilder);

verify(httpBuilder, never()).addInterceptorLast(any(HttpRequestInterceptor.class));
}
}

0 comments on commit 8a2ffb3

Please sign in to comment.