Skip to content

Commit

Permalink
debugging asynccrtclient
Browse files Browse the repository at this point in the history
  • Loading branch information
Himshikha Gupta committed May 21, 2024
1 parent 778308e commit f23686c
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ synchronized AmazonAsyncS3WithCredentials buildClient(
);
final S3AsyncClient client = SocketAccess.doPrivileged(builder::build);

final S3CrtAsyncClientBuilder crtBuilder = S3AsyncClient.crtBuilder();
final S3CrtAsyncClientBuilder crtBuilder = S3AsyncClient.crtBuilder().region(Region.US_EAST_1);
crtBuilder.credentialsProvider(credentials);
S3CrtHttpConfiguration.Builder httpConfiguration = S3CrtHttpConfiguration.builder();
if (clientSettings.proxySettings.getType() != ProxySettings.ProxyType.DIRECT) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,24 +229,6 @@ public void writeBlobWithMetadata(
// }

public void asyncStreamUpload(String blobName, InputStream inputStream, ActionListener<Void> completionListener) {
// BlockingInputStreamAsyncRequestBody body =
// AsyncRequestBody.forBlockingInputStream(null); // 'null' indicates a stream will be provided later.
//
// CompletableFuture<PutObjectResponse> responseFuture =
// s33CrtAsyncClient.putObject(r -> r.bucket(bucketName).key(key), body);
//
// // AsyncExampleUtils.randomString() returns a random string up to 100 characters.
// String randomString = AsyncExampleUtils.randomString();
// logger.info("random string to upload: {}: length={}", randomString, randomString.length());
//
// // Provide the stream of data to be uploaded.
// body.writeInputStream(new ByteArrayInputStream(randomString.getBytes()));
//
// PutObjectResponse response = responseFuture.join(); // Wait for the response.
// logger.info("Object {} uploaded to bucket {}.", key, bucketName);
// return response;



try (AmazonAsyncS3Reference amazonS3Reference = SocketAccess.doPrivileged(blobStore::asyncClientReference)) {

Expand All @@ -262,6 +244,7 @@ public void asyncStreamUpload(String blobName, InputStream inputStream, ActionL
body);
body.writeInputStream(inputStream);


completableFuture.whenComplete((response, throwable) -> {
if (throwable == null) {
completionListener.onResponse(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,10 @@ public List<IndexRoutingTable> getChangedIndicesRouting( ClusterState previousCl
for (IndexRoutingTable indexRouting : clusterState.getRoutingTable().getIndicesRouting().values()) {
if (!(previousIndexRoutingTable.containsKey(indexRouting.getIndex().getName()) && indexRouting.equals(previousIndexRoutingTable.get(indexRouting.getIndex().getName())))) {
changedIndicesRouting.add(indexRouting);
logger.info("changedIndicesRouting {}", indexRouting.prettyPrint());

}
}
logger.info("changedIndicesRouting {}", changedIndicesRouting.toString());

return changedIndicesRouting;

Expand All @@ -147,6 +148,7 @@ public CheckedRunnable<IOException> getIndexRoutingAsyncAction(
logger.info("custerMetadataBasePath {}", custerMetadataBasePath);

final BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer(custerMetadataBasePath.add(INDEX_ROUTING_PATH_TOKEN).add(indexRouting.getIndex().getUUID()));
logger.info("full path {}", blobContainer.path());

final String fileName = getIndexRoutingFileName();
logger.info("fileName {}", fileName);
Expand Down Expand Up @@ -207,6 +209,8 @@ public CheckedRunnable<IOException> getIndexRoutingAsyncAction(
// }
logger.info("TRYING S3 UPLOAD");
InputStream indexRoutingStream = new IndexRoutingTableInputStream(indexRouting);
logger.info("Going to upload {}", indexRouting.prettyPrint());

return () -> ((AsyncMultiStreamBlobContainer) blobContainer).asyncStreamUpload(fileName, indexRoutingStream, completionListener);

}
Expand Down

0 comments on commit f23686c

Please sign in to comment.