diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java index 85eba6dcadd6a..1035a752bce0a 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java @@ -289,7 +289,6 @@ public void readBlobAsync(String blobName, ActionListener listener) new ReadContext.Builder().blobSize(blobSize) .asyncPartStreams(blobPartInputStreamFutures) .blobChecksum(blobChecksum) - .metadata(null) .build() ); } catch (Exception ex) { diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java index e24a5db08e07e..9e830c409a58b 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java @@ -30,7 +30,7 @@ import org.apache.lucene.store.IndexInput; import org.opensearch.cluster.metadata.RepositoryMetadata; -import org.opensearch.common.CheckedTriFunction; +import org.opensearch.common.CheckedConsumer; import org.opensearch.common.StreamContext; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.stream.write.StreamContextSupplier; @@ -467,31 +467,30 @@ private void testWriteBlobByStreams(boolean expectException, boolean throwExcept exceptionRef.set(ex); countDownLatch.countDown(); }); - blobContainer.asyncBlobUpload( - new WriteContext.Builder().fileName("write_blob_by_streams_max_retries").streamContextSupplier(new StreamContextSupplier() { - @Override - public StreamContext supplyStreamContext(long partSize) { - return new StreamContext(new CheckedTriFunction() { - @Override - public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException { - InputStream inputStream = new OffsetRangeIndexInputStream( - new ByteArrayIndexInput("desc", bytes), - size, - position - ); - openInputStreams.add(inputStream); - return new InputStreamContainer(inputStream, size, position); - } - }, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize)); - } - }).fileSize(bytes.length).failIfAlreadyExists(false).writePriority(WritePriority.NORMAL).uploadFinalizer(uploadSuccess -> { - assertTrue(uploadSuccess); - if (throwExceptionOnFinalizeUpload) { - throw new RuntimeException(); - } - }).doRemoteDataIntegrityCheck(false).expectedChecksum(null).metadata(null).build(), - completionListener - ); + + StreamContextSupplier streamContextSupplier = partSize -> new StreamContext((partNo, size, position) -> { + InputStream inputStream = new OffsetRangeIndexInputStream(new ByteArrayIndexInput("desc", bytes), size, position); + openInputStreams.add(inputStream); + return new InputStreamContainer(inputStream, size, position); + }, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize)); + + CheckedConsumer uploadFinalizer = uploadSuccess -> { + assertTrue(uploadSuccess); + if (throwExceptionOnFinalizeUpload) { + throw new RuntimeException(); + } + }; + + WriteContext writeContext = new WriteContext.Builder().fileName("write_blob_by_streams_max_retries") + .streamContextSupplier(streamContextSupplier) + .fileSize(bytes.length) + .failIfAlreadyExists(false) + .writePriority(WritePriority.NORMAL) + .uploadFinalizer(uploadFinalizer) + .doRemoteDataIntegrityCheck(false) + .build(); + + blobContainer.asyncBlobUpload(writeContext, completionListener); assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS)); // wait for completableFuture to finish @@ -524,27 +523,30 @@ private void testWriteBlobByStreamsLargeBlob(boolean expectException, boolean th countDownLatch.countDown(); }); List openInputStreams = new ArrayList<>(); - blobContainer.asyncBlobUpload( - new WriteContext.Builder().fileName("write_large_blob").streamContextSupplier(new StreamContextSupplier() { - @Override - public StreamContext supplyStreamContext(long partSize) { - return new StreamContext(new CheckedTriFunction() { - @Override - public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException { - InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position); - openInputStreams.add(inputStream); - return new InputStreamContainer(inputStream, size, position); - } - }, partSize, calculateLastPartSize(blobSize, partSize), calculateNumberOfParts(blobSize, partSize)); - } - }).fileSize(blobSize).failIfAlreadyExists(false).writePriority(WritePriority.NORMAL).uploadFinalizer(uploadSuccess -> { - assertTrue(uploadSuccess); - if (throwExceptionOnFinalizeUpload) { - throw new RuntimeException(); - } - }).doRemoteDataIntegrityCheck(false).expectedChecksum(null).metadata(null).build(), - completionListener - ); + + StreamContextSupplier streamContextSupplier = partSize1 -> new StreamContext((partNo, size, position) -> { + InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position); + openInputStreams.add(inputStream); + return new InputStreamContainer(inputStream, size, position); + }, partSize1, calculateLastPartSize(blobSize, partSize1), calculateNumberOfParts(blobSize, partSize1)); + + CheckedConsumer uploadFinalizer = uploadSuccess -> { + assertTrue(uploadSuccess); + if (throwExceptionOnFinalizeUpload) { + throw new RuntimeException(); + } + }; + + WriteContext writeContext = new WriteContext.Builder().fileName("write_large_blob") + .streamContextSupplier(streamContextSupplier) + .fileSize(blobSize) + .failIfAlreadyExists(false) + .writePriority(WritePriority.NORMAL) + .uploadFinalizer(uploadFinalizer) + .doRemoteDataIntegrityCheck(false) + .build(); + + blobContainer.asyncBlobUpload(writeContext, completionListener); assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS)); if (expectException || throwExceptionOnFinalizeUpload) { @@ -643,30 +645,23 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException) t List openInputStreams = new ArrayList<>(); final S3BlobContainer s3BlobContainer = Mockito.spy(new S3BlobContainer(blobPath, blobStore)); - s3BlobContainer.asyncBlobUpload( - new WriteContext.Builder().fileName("write_large_blob").streamContextSupplier(new StreamContextSupplier() { - @Override - public StreamContext supplyStreamContext(long partSize) { - return new StreamContext(new CheckedTriFunction() { - @Override - public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException { - InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position); - openInputStreams.add(inputStream); - return new InputStreamContainer(inputStream, size, position); - } - }, partSize, calculateLastPartSize(blobSize, partSize), calculateNumberOfParts(blobSize, partSize)); - } - }) - .fileSize(blobSize) - .failIfAlreadyExists(false) - .writePriority(WritePriority.HIGH) - .uploadFinalizer(Assert::assertTrue) - .doRemoteDataIntegrityCheck(false) - .expectedChecksum(null) - .metadata(null) - .build(), - completionListener - ); + + StreamContextSupplier streamContextSupplier = partSize1 -> new StreamContext((partNo, size, position) -> { + InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position); + openInputStreams.add(inputStream); + return new InputStreamContainer(inputStream, size, position); + }, partSize1, calculateLastPartSize(blobSize, partSize1), calculateNumberOfParts(blobSize, partSize1)); + + WriteContext writeContext = new WriteContext.Builder().fileName("write_large_blob") + .streamContextSupplier(streamContextSupplier) + .fileSize(blobSize) + .failIfAlreadyExists(false) + .writePriority(WritePriority.HIGH) + .uploadFinalizer(Assert::assertTrue) + .doRemoteDataIntegrityCheck(false) + .build(); + + s3BlobContainer.asyncBlobUpload(writeContext, completionListener); assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS)); if (expectException) { assertNotNull(exceptionRef.get()); diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java index 7f1d04b96641a..8e25ba4d950ef 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -37,7 +37,6 @@ import org.apache.http.HttpStatus; import org.opensearch.cluster.metadata.RepositoryMetadata; -import org.opensearch.common.CheckedTriFunction; import org.opensearch.common.Nullable; import org.opensearch.common.StreamContext; import org.opensearch.common.SuppressForbidden; @@ -332,36 +331,24 @@ public void testWriteBlobByStreamsWithRetries() throws Exception { exceptionRef.set(ex); countDownLatch.countDown(); }); - blobContainer.asyncBlobUpload( - new WriteContext.Builder().fileName("write_blob_by_streams_max_retries").streamContextSupplier(new StreamContextSupplier() { - @Override - public StreamContext supplyStreamContext(long partSize) { - return new StreamContext(new CheckedTriFunction() { - @Override - public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException { - InputStream inputStream = new OffsetRangeIndexInputStream( - new ByteArrayIndexInput("desc", bytes), - size, - position - ); - openInputStreams.add(inputStream); - return new InputStreamContainer(inputStream, size, position); - } - }, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize)); - } - }) - .fileSize(bytes.length) - .failIfAlreadyExists(false) - .writePriority(WritePriority.NORMAL) - .uploadFinalizer(Assert::assertTrue) - .doRemoteDataIntegrityCheck(false) - .expectedChecksum(null) - .metadata(null) - .build(), - completionListener - ); - assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS)); + StreamContextSupplier streamContextSupplier = partSize -> new StreamContext((partNo, size, position) -> { + InputStream inputStream = new OffsetRangeIndexInputStream(new ByteArrayIndexInput("desc", bytes), size, position); + openInputStreams.add(inputStream); + return new InputStreamContainer(inputStream, size, position); + }, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize)); + + WriteContext writeContext = new WriteContext.Builder().fileName("write_blob_by_streams_max_retries") + .streamContextSupplier(streamContextSupplier) + .fileSize(bytes.length) + .failIfAlreadyExists(false) + .writePriority(WritePriority.NORMAL) + .uploadFinalizer(Assert::assertTrue) + .doRemoteDataIntegrityCheck(false) + .build(); + + blobContainer.asyncBlobUpload(writeContext, completionListener); + assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS)); assertThat(countDown.isCountedDown(), is(true)); openInputStreams.forEach(inputStream -> { diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java index 80aeef59ded6a..bddec2db0f128 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java @@ -131,11 +131,7 @@ public void readBlobAsync(String blobName, ActionListener listener) InputStreamContainer blobPartStream = new InputStreamContainer(readBlob(blobName, offset, partSize), partSize, offset); blobPartStreams.add(() -> CompletableFuture.completedFuture(blobPartStream)); } - ReadContext blobReadContext = new ReadContext.Builder().blobSize(contentLength) - .asyncPartStreams(blobPartStreams) - .blobChecksum(null) - .metadata(null) - .build(); + ReadContext blobReadContext = new ReadContext.Builder().blobSize(contentLength).asyncPartStreams(blobPartStreams).build(); listener.onResponse(blobReadContext); } catch (Exception e) { listener.onFailure(e); diff --git a/server/src/main/java/org/opensearch/common/blobstore/BlobDownloadResponse.java b/server/src/main/java/org/opensearch/common/blobstore/BlobDownloadResponse.java index 41e7e1a44d6fc..97f3e4a16a76c 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/BlobDownloadResponse.java +++ b/server/src/main/java/org/opensearch/common/blobstore/BlobDownloadResponse.java @@ -15,7 +15,7 @@ * Represents the response from a blob download operation, containing both the * input stream of the blob content and the associated metadata. * - * @opensearch.internal + * @opensearch.experimental */ public class BlobDownloadResponse { diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java index 5ab0b9d5fa870..a1d5041ff9aff 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -12,6 +12,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.action.ActionRunnable; +import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobDownloadResponse; @@ -166,6 +167,7 @@ public InputStream downloadBlob(Iterable path, String fileName) throws I } @Override + @ExperimentalApi public BlobDownloadResponse downloadBlobWithMetadata(Iterable path, String fileName) throws IOException { return blobStore.blobContainer((BlobPath) path).readBlobWithMetadata(fileName); } diff --git a/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java b/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java index 4c130bddd8b4a..fa05c7b8c569a 100644 --- a/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java +++ b/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java @@ -59,8 +59,6 @@ public void testReadBlobAsync() throws Exception { final CompletableFuture streamContainerFuture = CompletableFuture.completedFuture(inputStreamContainer); final ReadContext readContext = new ReadContext.Builder().blobSize(size) .asyncPartStreams(List.of(() -> streamContainerFuture)) - .blobChecksum(null) - .metadata(null) .build(); Mockito.doAnswer(invocation -> { @@ -109,8 +107,6 @@ public void testReadBlobAsyncException() throws Exception { final CompletableFuture streamContainerFuture = CompletableFuture.completedFuture(inputStreamContainer); final ReadContext readContext = new ReadContext.Builder().blobSize(size) .asyncPartStreams(List.of(() -> streamContainerFuture)) - .blobChecksum(null) - .metadata(null) .build(); Mockito.doAnswer(invocation -> { diff --git a/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java b/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java index 3383541914c89..60a816fcd7278 100644 --- a/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java +++ b/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java @@ -80,8 +80,6 @@ public void testReadContextListener() throws InterruptedException, IOException { ); ReadContext readContext = new ReadContext.Builder().blobSize((long) PART_SIZE * NUMBER_OF_PARTS) .asyncPartStreams(blobPartStreams) - .blobChecksum(null) - .metadata(null) .build(); readContextListener.onResponse(readContext); @@ -131,8 +129,6 @@ public int available() { ); ReadContext readContext = new ReadContext.Builder().blobSize((long) (PART_SIZE + 1) * NUMBER_OF_PARTS) .asyncPartStreams(blobPartStreams) - .blobChecksum(null) - .metadata(null) .build(); readContextListener.onResponse(readContext); @@ -188,8 +184,6 @@ public int read(byte[] b) throws IOException { ); ReadContext readContext = new ReadContext.Builder().blobSize((long) (PART_SIZE + 1) * NUMBER_OF_PARTS + 1) .asyncPartStreams(blobPartStreams) - .blobChecksum(null) - .metadata(null) .build(); readContextListener.onResponse(readContext); @@ -217,8 +211,6 @@ public void testWriteToTempFile_alreadyExists_replacesFile() throws Exception { ); ReadContext readContext = new ReadContext.Builder().blobSize((long) (PART_SIZE + 1) * NUMBER_OF_PARTS) .asyncPartStreams(blobPartStreams) - .blobChecksum(null) - .metadata(null) .build(); readContextListener.onResponse(readContext);