Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to customize httpClientBuilder for s3 client #50

Merged
merged 25 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
0700af5
Upgrade to Kafka 3.6 (#51)
philipp94831 Jan 7, 2025
4cfc3d6
Close Blobstorage clients (#52)
philipp94831 Jan 7, 2025
78a9a1c
[Gradle Release Plugin] - pre tag commit: '2.8.0'.
bakdata-bot Jan 7, 2025
7159700
[Gradle Release Plugin] - new version commit: '2.8.1-SNAPSHOT'.
bakdata-bot Jan 7, 2025
1dc9607
Changelog for version 2.8.0
bakdata-bot Jan 7, 2025
b80bafc
Test converter using embedded connect (#53)
philipp94831 Jan 8, 2025
ffbdea5
Upgrade to Kafka 3.8 (#54)
philipp94831 Jan 9, 2025
534a9f8
[Gradle Release Plugin] - pre tag commit: '2.9.0'.
bakdata-bot Jan 9, 2025
d1ee81d
[Gradle Release Plugin] - new version commit: '2.9.1-SNAPSHOT'.
bakdata-bot Jan 9, 2025
808f6f5
Changelog for version 2.9.0
bakdata-bot Jan 9, 2025
44e31ef
Use Apache Kafka ConfigDef (#55)
philipp94831 Jan 14, 2025
31adf3d
[Gradle Release Plugin] - pre tag commit: '2.9.1'.
bakdata-bot Jan 14, 2025
a37d69b
[Gradle Release Plugin] - new version commit: '2.9.2-SNAPSHOT'.
bakdata-bot Jan 14, 2025
81f616a
Changelog for version 2.9.1
bakdata-bot Jan 14, 2025
bb3156b
Merge branch 'bakdata:master' into master
yidian1997 Jan 15, 2025
dc368b0
add S3_SDK_HTTP_CLIENT_BUILDER_CONFIG to customize http client for s3…
yidian1997 Dec 27, 2024
2271d9c
format
yidian1997 Dec 27, 2024
8e772fa
make S3_SDK_HTTP_CLIENT_BUILDER_CONFIG optional
yidian1997 Jan 9, 2025
6cb5bc2
make S3_SDK_HTTP_CLIENT_BUILDER_CONFIG optional and add UT shouldUseC…
yidian1997 Jan 13, 2025
73a7710
make S3_SDK_HTTP_CLIENT_BUILDER_CONFIG optional by adding NoSdkHttpCl…
yidian1997 Jan 14, 2025
a27b508
format
yidian1997 Jan 14, 2025
70f068a
rebase with master
yidian1997 Jan 15, 2025
6c64b30
format
yidian1997 Jan 15, 2025
176d6a7
add final
yidian1997 Jan 15, 2025
9b30ffa
remove NoSdkHttpClientBuilder
yidian1997 Jan 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
Expand Down Expand Up @@ -144,6 +145,9 @@ public class AbstractLargeMessageConfig extends AbstractConfig {
public static final String S3_ENABLE_PATH_STYLE_ACCESS_CONFIG = S3_PREFIX + "path.style.access";
public static final String S3_ENABLE_PATH_STYLE_ACCESS_DOC = "Enable path-style access for S3 client.";
public static final boolean S3_ENABLE_PATH_STYLE_ACCESS_DEFAULT = false;
public static final String S3_SDK_HTTP_CLIENT_BUILDER_CONFIG = S3_PREFIX + "sdk.http.client.builder";
public static final String S3_SDK_HTTP_CLIENT_BUILDER_DOC = "The HTTP client to use for S3 client.";
public static final Class<? extends SdkHttpClient.Builder> S3_SDK_HTTP_CLIENT_BUILDER_DEFAULT = null;
public static final String S3_REGION_DEFAULT = "";
public static final String S3_ACCESS_KEY_DOC = "AWS access key to use for connecting to S3. Leave empty if AWS"
+ " credential provider chain or STS Assume Role provider should be used.";
Expand Down Expand Up @@ -212,6 +216,7 @@ protected static ConfigDef baseConfigDef() {
.define(S3_ENDPOINT_CONFIG, Type.STRING, S3_ENDPOINT_DEFAULT, Importance.LOW, S3_ENDPOINT_DOC)
.define(S3_ENABLE_PATH_STYLE_ACCESS_CONFIG, Type.BOOLEAN, S3_ENABLE_PATH_STYLE_ACCESS_DEFAULT,
Importance.LOW, S3_ENABLE_PATH_STYLE_ACCESS_DOC)
.define(S3_SDK_HTTP_CLIENT_BUILDER_CONFIG, Type.CLASS, S3_SDK_HTTP_CLIENT_BUILDER_DEFAULT, Importance.LOW, S3_SDK_HTTP_CLIENT_BUILDER_DOC)
.define(S3_REGION_CONFIG, Type.STRING, S3_REGION_DEFAULT, Importance.LOW, S3_REGION_DOC)
.define(S3_ACCESS_KEY_CONFIG, Type.PASSWORD, S3_ACCESS_KEY_DEFAULT, Importance.LOW, S3_ACCESS_KEY_DOC)
.define(S3_SECRET_KEY_CONFIG, Type.PASSWORD, S3_SECRET_KEY_DEFAULT, Importance.LOW, S3_SECRET_KEY_DOC)
Expand Down Expand Up @@ -260,6 +265,9 @@ public LargeMessageStoringClient getStorer() {

protected <T> T getInstance(final String key, final Class<T> targetClass) {
final Class<?> configuredClass = this.getClass(key);
if (configuredClass == null) {
return null;
}
final Object o = Utils.newInstance(configuredClass);
if (!targetClass.isInstance(o)) {
throw new KafkaException(configuredClass.getName() + " is not an instance of " + targetClass.getName());
Expand Down Expand Up @@ -310,12 +318,19 @@ private BlobStorageClient createAmazonS3Client() {
this.getAmazonEndpointOverride().ifPresent(clientBuilder::endpointOverride);
this.getAmazonRegion().ifPresent(clientBuilder::region);
this.getAmazonCredentialsProvider().ifPresent(clientBuilder::credentialsProvider);
this.getAmazonSdkHttpClientBuilderInstance()
.ifPresent(clientBuilder::httpClientBuilder);
if (this.enableAmazonS3PathStyleAccess()) {
clientBuilder.forcePathStyle(true);
}
return new AmazonS3Client(clientBuilder.build());
}

private <T extends SdkHttpClient.Builder<T>> Optional<SdkHttpClient.Builder<T>> getAmazonSdkHttpClientBuilderInstance() {
final SdkHttpClient.Builder<T> builder = this.getInstance(AbstractLargeMessageConfig.S3_SDK_HTTP_CLIENT_BUILDER_CONFIG, SdkHttpClient.Builder.class);
return Optional.ofNullable(builder);
}

private Optional<URI> getAmazonEndpointOverride() {
final String endpoint = this.getString(S3_ENDPOINT_CONFIG);
return isEmpty(endpoint) ? Optional.empty() : Optional.of(URI.create(endpoint));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,26 @@

import static com.bakdata.kafka.LargeMessageRetrievingClientTest.serializeUri;
import static org.assertj.core.api.Assertions.assertThat;
import static software.amazon.awssdk.core.client.config.SdkClientOption.CONFIGURED_SYNC_HTTP_CLIENT_BUILDER;
import static software.amazon.awssdk.core.client.config.SdkClientOption.SYNC_HTTP_CLIENT;

import com.google.common.collect.ImmutableMap;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.core.client.config.SdkClientConfiguration;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.http.ExecutableHttpRequest;
import software.amazon.awssdk.http.HttpExecuteRequest;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.utils.AttributeMap;

class LargeMessageRetrievingClientS3IntegrationTest extends AmazonS3IntegrationTest {

Expand All @@ -59,6 +69,31 @@ void shouldReadBackedText() {
}
}

@Test
void shouldUseConfiguredSdkHttpClientBuilder() {
final String bucket = "bucket";
final String basePath = "s3://" + bucket + "/base/";
final Map<String, Object> properties = ImmutableMap.<String, Object>builder()
.put(AbstractLargeMessageConfig.S3_REGION_CONFIG, "us-east-1")
.put(AbstractLargeMessageConfig.MAX_BYTE_SIZE_CONFIG, 0)
.put(AbstractLargeMessageConfig.BASE_PATH_CONFIG, basePath)
.put(AbstractLargeMessageConfig.S3_SDK_HTTP_CLIENT_BUILDER_CONFIG, MockSdkHttpClientBuilder.class.getName())
.build();
AbstractLargeMessageConfig config = new AbstractLargeMessageConfig(properties);
LargeMessageRetrievingClient retriever = config.getRetriever();
// Get private field clientFactories
Map<String, Supplier<BlobStorageClient>> clientFactories = getPrivateField(retriever, "clientFactories", Map.class);
BlobStorageClient blobStorageClient = clientFactories.get("s3").get();
// Get private field s3Client
S3Client s3Client = getPrivateField(blobStorageClient, "s3", S3Client.class);
// Get private field clientConfiguration
SdkClientConfiguration clientConfiguration = getPrivateField(s3Client, "clientConfiguration", SdkClientConfiguration.class);
// Get private field attributes
AttributeMap attributes = getPrivateField(clientConfiguration, "attributes", AttributeMap.class);
assertThat(attributes.get(SYNC_HTTP_CLIENT)).isExactlyInstanceOf(MockSdkHttpClient.class);
assertThat(attributes.get(CONFIGURED_SYNC_HTTP_CLIENT_BUILDER)).isExactlyInstanceOf(MockSdkHttpClientBuilder.class);
}

private LargeMessageRetrievingClient createRetriever() {
final Map<String, String> properties = this.getLargeMessageConfig();
final AbstractLargeMessageConfig config = new AbstractLargeMessageConfig(properties);
Expand All @@ -72,4 +107,36 @@ private void store(final String bucket, final String key, final String s) {
.build(), RequestBody.fromString(s));
}

private static <T> T getPrivateField(Object object, String fieldName, Class<T> fieldType) {
try {
Field field = object.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
return fieldType.cast(field.get(object));
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException(e);
}
}

public static class MockSdkHttpClientBuilder implements SdkHttpClient.Builder {
@Override
public SdkHttpClient buildWithDefaults(AttributeMap attributeMap) {
return new MockSdkHttpClient();
}
}

private static class MockSdkHttpClient implements SdkHttpClient {
@Override
public ExecutableHttpRequest prepareRequest(HttpExecuteRequest httpExecuteRequest) {
return null;
}

public String clientName() {
return "MockSdkHttpClient";
}

@Override
public void close() {

}
}
}