diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3AsyncService.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3AsyncService.java index ff4abf5d57f9a..51f586d0374a5 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3AsyncService.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3AsyncService.java @@ -235,7 +235,7 @@ synchronized AmazonAsyncS3WithCredentials buildClient( ); final S3AsyncClient client = SocketAccess.doPrivileged(builder::build); - final S3CrtAsyncClientBuilder crtBuilder = S3AsyncClient.crtBuilder(); + final S3CrtAsyncClientBuilder crtBuilder = S3AsyncClient.crtBuilder().region(Region.US_EAST_1); crtBuilder.credentialsProvider(credentials); S3CrtHttpConfiguration.Builder httpConfiguration = S3CrtHttpConfiguration.builder(); if (clientSettings.proxySettings.getType() != ProxySettings.ProxyType.DIRECT) { diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java index f53c3d1dfc948..9e1134111afa9 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java @@ -229,24 +229,6 @@ public void writeBlobWithMetadata( // } public void asyncStreamUpload(String blobName, InputStream inputStream, ActionListener completionListener) { -// BlockingInputStreamAsyncRequestBody body = -// AsyncRequestBody.forBlockingInputStream(null); // 'null' indicates a stream will be provided later. -// -// CompletableFuture responseFuture = -// s33CrtAsyncClient.putObject(r -> r.bucket(bucketName).key(key), body); -// -// // AsyncExampleUtils.randomString() returns a random string up to 100 characters. -// String randomString = AsyncExampleUtils.randomString(); -// logger.info("random string to upload: {}: length={}", randomString, randomString.length()); -// -// // Provide the stream of data to be uploaded. -// body.writeInputStream(new ByteArrayInputStream(randomString.getBytes())); -// -// PutObjectResponse response = responseFuture.join(); // Wait for the response. -// logger.info("Object {} uploaded to bucket {}.", key, bucketName); -// return response; - - try (AmazonAsyncS3Reference amazonS3Reference = SocketAccess.doPrivileged(blobStore::asyncClientReference)) { @@ -262,6 +244,7 @@ public void asyncStreamUpload(String blobName, InputStream inputStream, ActionL body); body.writeInputStream(inputStream); + completableFuture.whenComplete((response, throwable) -> { if (throwable == null) { completionListener.onResponse(null); diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java index 1357309d8834f..9312503a67849 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java @@ -127,9 +127,10 @@ public List getChangedIndicesRouting( ClusterState previousCl for (IndexRoutingTable indexRouting : clusterState.getRoutingTable().getIndicesRouting().values()) { if (!(previousIndexRoutingTable.containsKey(indexRouting.getIndex().getName()) && indexRouting.equals(previousIndexRoutingTable.get(indexRouting.getIndex().getName())))) { changedIndicesRouting.add(indexRouting); + logger.info("changedIndicesRouting {}", indexRouting.prettyPrint()); + } } - logger.info("changedIndicesRouting {}", changedIndicesRouting.toString()); return changedIndicesRouting; @@ -147,6 +148,7 @@ public CheckedRunnable getIndexRoutingAsyncAction( logger.info("custerMetadataBasePath {}", custerMetadataBasePath); final BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer(custerMetadataBasePath.add(INDEX_ROUTING_PATH_TOKEN).add(indexRouting.getIndex().getUUID())); + logger.info("full path {}", blobContainer.path()); final String fileName = getIndexRoutingFileName(); logger.info("fileName {}", fileName); @@ -207,6 +209,8 @@ public CheckedRunnable getIndexRoutingAsyncAction( // } logger.info("TRYING S3 UPLOAD"); InputStream indexRoutingStream = new IndexRoutingTableInputStream(indexRouting); + logger.info("Going to upload {}", indexRouting.prettyPrint()); + return () -> ((AsyncMultiStreamBlobContainer) blobContainer).asyncStreamUpload(fileName, indexRoutingStream, completionListener); }