Skip to content

Commit

Permalink
Add support to customize httpClientBuilder for s3 client (#50)
Browse files Browse the repository at this point in the history
# Pull Request

## What

Add a new parameter S3_SDK_HTTP_CLIENT_BUILDER_CONFIG
This is a class to customize the httpClient used by the s3 client

## Why

Our use case is:
We have a minio server with https endpoint. And the minio server uses
self-signed certificate to do the TLS.
When our kafka client uses this lib to put object to that minio, we need
to make sure the CA certificate of the minio server should in the
truststore used by the s3 client. This is the reason we want to
customize the httpClient.

Another limitation is that we need add the CA certificate to the
truststore in the code instead of using the keytool command line. Since
the command line is hard to automate in our use case. So finally I
choose add this support in the this lib.

## How

1) Add the S3_SDK_HTTP_CLIENT_BUILDER_CONFIG class parameter
2) Add method getAmazonSdkHttpClientBuilderInstance() to get the object
of SdkHttpClient.Builder by reflection
3) Add some logic in the createAmazonS3Client() method to config the
SdkHttpClient.Builder

## Test

I added this parameter with DefaultTlsTrustManagersProvider in the UT
and it works fine.

---------

Co-authored-by: Philipp Schirmer <[email protected]>
  • Loading branch information
yidian1997 and philipp94831 authored Jan 17, 2025
1 parent b5d370c commit 379eed2
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
Expand Down Expand Up @@ -144,6 +145,9 @@ public class AbstractLargeMessageConfig extends AbstractConfig {
public static final String S3_ENABLE_PATH_STYLE_ACCESS_CONFIG = S3_PREFIX + "path.style.access";
public static final String S3_ENABLE_PATH_STYLE_ACCESS_DOC = "Enable path-style access for S3 client.";
public static final boolean S3_ENABLE_PATH_STYLE_ACCESS_DEFAULT = false;
public static final String S3_SDK_HTTP_CLIENT_BUILDER_CONFIG = S3_PREFIX + "sdk.http.client.builder";
public static final String S3_SDK_HTTP_CLIENT_BUILDER_DOC = "The HTTP client to use for S3 client.";
public static final Class<? extends SdkHttpClient.Builder> S3_SDK_HTTP_CLIENT_BUILDER_DEFAULT = null;
public static final String S3_REGION_DEFAULT = "";
public static final String S3_ACCESS_KEY_DOC = "AWS access key to use for connecting to S3. Leave empty if AWS"
+ " credential provider chain or STS Assume Role provider should be used.";
Expand Down Expand Up @@ -212,6 +216,7 @@ protected static ConfigDef baseConfigDef() {
.define(S3_ENDPOINT_CONFIG, Type.STRING, S3_ENDPOINT_DEFAULT, Importance.LOW, S3_ENDPOINT_DOC)
.define(S3_ENABLE_PATH_STYLE_ACCESS_CONFIG, Type.BOOLEAN, S3_ENABLE_PATH_STYLE_ACCESS_DEFAULT,
Importance.LOW, S3_ENABLE_PATH_STYLE_ACCESS_DOC)
.define(S3_SDK_HTTP_CLIENT_BUILDER_CONFIG, Type.CLASS, S3_SDK_HTTP_CLIENT_BUILDER_DEFAULT, Importance.LOW, S3_SDK_HTTP_CLIENT_BUILDER_DOC)
.define(S3_REGION_CONFIG, Type.STRING, S3_REGION_DEFAULT, Importance.LOW, S3_REGION_DOC)
.define(S3_ACCESS_KEY_CONFIG, Type.PASSWORD, S3_ACCESS_KEY_DEFAULT, Importance.LOW, S3_ACCESS_KEY_DOC)
.define(S3_SECRET_KEY_CONFIG, Type.PASSWORD, S3_SECRET_KEY_DEFAULT, Importance.LOW, S3_SECRET_KEY_DOC)
Expand Down Expand Up @@ -260,6 +265,9 @@ public LargeMessageStoringClient getStorer() {

protected <T> T getInstance(final String key, final Class<T> targetClass) {
final Class<?> configuredClass = this.getClass(key);
if (configuredClass == null) {
return null;
}
final Object o = Utils.newInstance(configuredClass);
if (!targetClass.isInstance(o)) {
throw new KafkaException(configuredClass.getName() + " is not an instance of " + targetClass.getName());
Expand Down Expand Up @@ -310,12 +318,19 @@ private BlobStorageClient createAmazonS3Client() {
this.getAmazonEndpointOverride().ifPresent(clientBuilder::endpointOverride);
this.getAmazonRegion().ifPresent(clientBuilder::region);
this.getAmazonCredentialsProvider().ifPresent(clientBuilder::credentialsProvider);
this.getAmazonSdkHttpClientBuilderInstance()
.ifPresent(clientBuilder::httpClientBuilder);
if (this.enableAmazonS3PathStyleAccess()) {
clientBuilder.forcePathStyle(true);
}
return new AmazonS3Client(clientBuilder.build());
}

private <T extends SdkHttpClient.Builder<T>> Optional<SdkHttpClient.Builder<T>> getAmazonSdkHttpClientBuilderInstance() {
final SdkHttpClient.Builder<T> builder = this.getInstance(AbstractLargeMessageConfig.S3_SDK_HTTP_CLIENT_BUILDER_CONFIG, SdkHttpClient.Builder.class);
return Optional.ofNullable(builder);
}

private Optional<URI> getAmazonEndpointOverride() {
final String endpoint = this.getString(S3_ENDPOINT_CONFIG);
return isEmpty(endpoint) ? Optional.empty() : Optional.of(URI.create(endpoint));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,26 @@

import static com.bakdata.kafka.LargeMessageRetrievingClientTest.serializeUri;
import static org.assertj.core.api.Assertions.assertThat;
import static software.amazon.awssdk.core.client.config.SdkClientOption.CONFIGURED_SYNC_HTTP_CLIENT_BUILDER;
import static software.amazon.awssdk.core.client.config.SdkClientOption.SYNC_HTTP_CLIENT;

import com.google.common.collect.ImmutableMap;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.core.client.config.SdkClientConfiguration;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.http.ExecutableHttpRequest;
import software.amazon.awssdk.http.HttpExecuteRequest;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.utils.AttributeMap;

class LargeMessageRetrievingClientS3IntegrationTest extends AmazonS3IntegrationTest {

Expand All @@ -59,6 +69,31 @@ void shouldReadBackedText() {
}
}

@Test
void shouldUseConfiguredSdkHttpClientBuilder() {
final String bucket = "bucket";
final String basePath = "s3://" + bucket + "/base/";
final Map<String, Object> properties = ImmutableMap.<String, Object>builder()
.put(AbstractLargeMessageConfig.S3_REGION_CONFIG, "us-east-1")
.put(AbstractLargeMessageConfig.MAX_BYTE_SIZE_CONFIG, 0)
.put(AbstractLargeMessageConfig.BASE_PATH_CONFIG, basePath)
.put(AbstractLargeMessageConfig.S3_SDK_HTTP_CLIENT_BUILDER_CONFIG, MockSdkHttpClientBuilder.class.getName())
.build();
AbstractLargeMessageConfig config = new AbstractLargeMessageConfig(properties);
LargeMessageRetrievingClient retriever = config.getRetriever();
// Get private field clientFactories
Map<String, Supplier<BlobStorageClient>> clientFactories = getPrivateField(retriever, "clientFactories", Map.class);
BlobStorageClient blobStorageClient = clientFactories.get("s3").get();
// Get private field s3Client
S3Client s3Client = getPrivateField(blobStorageClient, "s3", S3Client.class);
// Get private field clientConfiguration
SdkClientConfiguration clientConfiguration = getPrivateField(s3Client, "clientConfiguration", SdkClientConfiguration.class);
// Get private field attributes
AttributeMap attributes = getPrivateField(clientConfiguration, "attributes", AttributeMap.class);
assertThat(attributes.get(SYNC_HTTP_CLIENT)).isExactlyInstanceOf(MockSdkHttpClient.class);
assertThat(attributes.get(CONFIGURED_SYNC_HTTP_CLIENT_BUILDER)).isExactlyInstanceOf(MockSdkHttpClientBuilder.class);
}

private LargeMessageRetrievingClient createRetriever() {
final Map<String, String> properties = this.getLargeMessageConfig();
final AbstractLargeMessageConfig config = new AbstractLargeMessageConfig(properties);
Expand All @@ -72,4 +107,36 @@ private void store(final String bucket, final String key, final String s) {
.build(), RequestBody.fromString(s));
}

private static <T> T getPrivateField(Object object, String fieldName, Class<T> fieldType) {
try {
Field field = object.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
return fieldType.cast(field.get(object));
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException(e);
}
}

public static class MockSdkHttpClientBuilder implements SdkHttpClient.Builder {
@Override
public SdkHttpClient buildWithDefaults(AttributeMap attributeMap) {
return new MockSdkHttpClient();
}
}

private static class MockSdkHttpClient implements SdkHttpClient {
@Override
public ExecutableHttpRequest prepareRequest(HttpExecuteRequest httpExecuteRequest) {
return null;
}

public String clientName() {
return "MockSdkHttpClient";
}

@Override
public void close() {

}
}
}

0 comments on commit 379eed2

Please sign in to comment.