Skip to content

Commit

Permalink
Adding async flow
Browse files Browse the repository at this point in the history
  • Loading branch information
Himshikha Gupta committed May 21, 2024
1 parent 8912b0c commit 778308e
Show file tree
Hide file tree
Showing 11 changed files with 364 additions and 57 deletions.
5 changes: 4 additions & 1 deletion plugins/repository-s3/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ dependencies {
api "software.amazon.awssdk:aws-query-protocol:${versions.aws}"
api "software.amazon.awssdk:sts:${versions.aws}"
api "software.amazon.awssdk:netty-nio-client:${versions.aws}"
api "software.amazon.awssdk:aws-crt-client:${versions.aws}"
api "software.amazon.awssdk.crt:aws-crt:0.21.16"
api "software.amazon.awssdk:crt-core:${versions.aws}"


api "org.apache.httpcomponents:httpclient:${versions.httpclient}"
api "org.apache.httpcomponents:httpcore:${versions.httpcore}"
Expand Down Expand Up @@ -548,7 +552,6 @@ thirdPartyAudit {
'software.amazon.awssdk.crtcore.CrtConnectionHealthConfiguration',
'software.amazon.awssdk.crtcore.CrtConnectionHealthConfiguration$Builder',
'software.amazon.awssdk.crtcore.CrtConnectionHealthConfiguration$DefaultBuilder',
'software.amazon.awssdk.crtcore.CrtProxyConfiguration',
'software.amazon.awssdk.crtcore.CrtProxyConfiguration$Builder',
'software.amazon.awssdk.crtcore.CrtProxyConfiguration$DefaultBuilder',
'software.amazon.eventstream.HeaderValue',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,21 @@ final class AmazonAsyncS3WithCredentials {
private final S3AsyncClient priorityClient;
private final S3AsyncClient urgentClient;
private final AwsCredentialsProvider credentials;
private final S3AsyncClient crtClient;


private AmazonAsyncS3WithCredentials(
final S3AsyncClient client,
final S3AsyncClient priorityClient,
final S3AsyncClient urgentClient,
@Nullable final AwsCredentialsProvider credentials
@Nullable final AwsCredentialsProvider credentials,
final S3AsyncClient crtClient
) {
this.client = client;
this.credentials = credentials;
this.priorityClient = priorityClient;
this.urgentClient = urgentClient;
this.crtClient = crtClient;
}

S3AsyncClient client() {
Expand All @@ -46,6 +50,9 @@ S3AsyncClient urgentClient() {
return urgentClient;
}

S3AsyncClient crtClient() {
return crtClient;
}
AwsCredentialsProvider credentials() {
return credentials;
}
Expand All @@ -56,6 +63,16 @@ static AmazonAsyncS3WithCredentials create(
final S3AsyncClient urgentClient,
@Nullable final AwsCredentialsProvider credentials
) {
return new AmazonAsyncS3WithCredentials(client, priorityClient, urgentClient, credentials);
return create(client, priorityClient, urgentClient, credentials, null);
}

static AmazonAsyncS3WithCredentials create(
final S3AsyncClient client,
final S3AsyncClient priorityClient,
final S3AsyncClient urgentClient,
@Nullable final AwsCredentialsProvider credentials,
final S3AsyncClient crtClient
) {
return new AmazonAsyncS3WithCredentials(client, priorityClient, urgentClient, credentials, crtClient);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder;
import software.amazon.awssdk.services.s3.crt.S3CrtConnectionHealthConfiguration;
import software.amazon.awssdk.services.s3.crt.S3CrtHttpConfiguration;
import software.amazon.awssdk.services.s3.crt.S3CrtProxyConfiguration;
import software.amazon.awssdk.services.s3.crt.S3CrtRetryConfiguration;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.StsClientBuilder;
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
Expand Down Expand Up @@ -230,7 +235,33 @@ synchronized AmazonAsyncS3WithCredentials buildClient(
);
final S3AsyncClient client = SocketAccess.doPrivileged(builder::build);

return AmazonAsyncS3WithCredentials.create(client, priorityClient, urgentClient, credentials);
final S3CrtAsyncClientBuilder crtBuilder = S3AsyncClient.crtBuilder();
crtBuilder.credentialsProvider(credentials);
S3CrtHttpConfiguration.Builder httpConfiguration = S3CrtHttpConfiguration.builder();
if (clientSettings.proxySettings.getType() != ProxySettings.ProxyType.DIRECT) {
S3CrtProxyConfiguration proxyConfiguration = S3CrtProxyConfiguration.builder()
.scheme(clientSettings.proxySettings.getType().toProtocol().toString())
.host(clientSettings.proxySettings.getHostName())
.port(clientSettings.proxySettings.getPort())
.username(clientSettings.proxySettings.getUsername())
.password(clientSettings.proxySettings.getPassword())
.build();
httpConfiguration.proxyConfiguration(proxyConfiguration);
}
crtBuilder.httpConfiguration(httpConfiguration.build());
crtBuilder.retryConfiguration(S3CrtRetryConfiguration.builder()
.numRetries(clientSettings.maxRetries)
.build());
// crtBuilder.asyncConfiguration(
// ClientAsyncConfiguration.builder()
// .advancedOption(
// SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
// urgentExecutorBuilder.getFutureCompletionExecutor()
// )
// .build()
// );
final S3AsyncClient crtClient = SocketAccess.doPrivileged(crtBuilder::build);
return AmazonAsyncS3WithCredentials.create(client, priorityClient, urgentClient, credentials, crtClient);
}

static ClientOverrideConfiguration buildOverrideConfiguration(final S3ClientSettings clientSettings) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,34 +33,14 @@
package org.opensearch.repositories.s3;

import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CommonPrefix;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import software.amazon.awssdk.services.s3.model.GetObjectAttributesRequest;
import software.amazon.awssdk.services.s3.model.GetObjectAttributesResponse;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.ObjectAttributes;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Error;
import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
import software.amazon.awssdk.services.s3.internal.crt.S3CrtAsyncClient;
import software.amazon.awssdk.services.s3.model.*;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;
import software.amazon.awssdk.utils.CollectionUtils;

Expand Down Expand Up @@ -193,17 +173,106 @@ public void writeBlobWithMetadata(
boolean failIfAlreadyExists,
@Nullable Map<String, String> metadata
) throws IOException {
logger.info("********writeBlobWithMetadata********");

assert inputStream.markSupported() : "No mark support on inputStream breaks the S3 SDK's ability to retry requests";
SocketAccess.doPrivilegedIOException(() -> {
if (blobSize <= getLargeBlobThresholdInBytes()) {
logger.info("Executing single upload");
executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata);
} else {
logger.info("Executing multiplart upload");

executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata);
}
return null;
});
}

// public void asyncWriteBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists,@Nullable Map<String, String> metadata, WritePriority priority, ActionListener<Void> completionListener) throws IOException {
// assert inputStream.markSupported() : "No mark support on inputStream breaks the S3 SDK's ability to retry requests";
//
// try (AmazonAsyncS3Reference amazonS3Reference = SocketAccess.doPrivileged(blobStore::asyncClientReference)) {
//
// S3AsyncClient s3AsyncClient;
// if (priority == WritePriority.URGENT) {
// s3AsyncClient = amazonS3Reference.get().urgentClient();
// } else if (priority == WritePriority.HIGH) {
// s3AsyncClient = amazonS3Reference.get().priorityClient();
// } else {
// s3AsyncClient = amazonS3Reference.get().client();
// }
// //getAsyncTransferManager does not support putObject.
// CompletableFuture<Void> completableFuture = blobStore.getAsyncTransferManager()
// .uploadObject(s3AsyncClient, uploadRequest, streamContext, blobStore.getStatsMetricPublisher());
// CompletableFuture<Void> completableFuture = blobStore.getAsyncTransferManager().put
// completableFuture.whenComplete((response, throwable) -> {
// if (throwable == null) {
// completionListener.onResponse(response);
// } else {
// Exception ex = throwable instanceof Error ? new Exception(throwable) : (Exception) throwable;
// completionListener.onFailure(ex);
// }
// });
// SocketAccess.doPrivilegedIOException(() -> {
// if (blobSize <= getLargeBlobThresholdInBytes()) {
// logger.info("Executing single upload");
// executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata);
// } else {
// logger.info("Executing multiplart upload");
//
// executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata);
// }
// return null;
// });
// }
// }

public void asyncStreamUpload(String blobName, InputStream inputStream, ActionListener<Void> completionListener) {
// BlockingInputStreamAsyncRequestBody body =
// AsyncRequestBody.forBlockingInputStream(null); // 'null' indicates a stream will be provided later.
//
// CompletableFuture<PutObjectResponse> responseFuture =
// s33CrtAsyncClient.putObject(r -> r.bucket(bucketName).key(key), body);
//
// // AsyncExampleUtils.randomString() returns a random string up to 100 characters.
// String randomString = AsyncExampleUtils.randomString();
// logger.info("random string to upload: {}: length={}", randomString, randomString.length());
//
// // Provide the stream of data to be uploaded.
// body.writeInputStream(new ByteArrayInputStream(randomString.getBytes()));
//
// PutObjectResponse response = responseFuture.join(); // Wait for the response.
// logger.info("Object {} uploaded to bucket {}.", key, bucketName);
// return response;



try (AmazonAsyncS3Reference amazonS3Reference = SocketAccess.doPrivileged(blobStore::asyncClientReference)) {

S3CrtAsyncClient s3AsyncClient = (S3CrtAsyncClient) amazonS3Reference.get().crtClient();
PutObjectRequest.Builder putObjectRequestBuilder = PutObjectRequest.builder()
.bucket(blobStore.bucket())
.key(blobName)
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().putObjectMetricPublisher));
BlockingInputStreamAsyncRequestBody body =
AsyncRequestBody.forBlockingInputStream(null);
CompletableFuture<PutObjectResponse> completableFuture = s3AsyncClient.putObject(
putObjectRequestBuilder.build(),
body);
body.writeInputStream(inputStream);

completableFuture.whenComplete((response, throwable) -> {
if (throwable == null) {
completionListener.onResponse(null);
} else {
Exception ex = throwable instanceof Error ? new Exception(throwable) : (Exception) throwable;
completionListener.onFailure(ex);
}
});
}
}

@Override
public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> completionListener) throws IOException {
UploadRequest uploadRequest = new UploadRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,12 @@ grant {
permission java.io.FilePermission "config", "read";

permission java.lang.RuntimePermission "accessDeclaredMembers";
permission java.lang.RuntimePermission "loadLibrary.aws-crt-jni";
permission java.lang.RuntimePermission "loadLibrary.*";
permission java.lang.RuntimePermission "shutdownHooks";

};

grant codeBase "${codebase.aws-crt-jni}" {
permission java.lang.RuntimePermission "loadLibrary.*";
};
Loading

0 comments on commit 778308e

Please sign in to comment.