From 08c9432bdb841ec1bdf4904dded703896141d942 Mon Sep 17 00:00:00 2001 From: Sandeep Kumawat Date: Tue, 2 Apr 2024 19:02:00 +0530 Subject: [PATCH 01/20] Introduce interface changes to read/write blob with object metadata Signed-off-by: Sandeep Kumawat --- .../common/blobstore/BlobContainer.java | 69 +++++++++++++++++++ .../blobstore/BlobDownloadResponse.java | 44 ++++++++++++ .../transfer/BlobStoreTransferService.java | 6 ++ .../translog/transfer/TransferService.java | 10 +++ 4 files changed, 129 insertions(+) create mode 100644 server/src/main/java/org/opensearch/common/blobstore/BlobDownloadResponse.java diff --git a/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java b/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java index 2e25a532b5abf..3776007e9e84c 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java @@ -77,6 +77,19 @@ public interface BlobContainer { */ InputStream readBlob(String blobName) throws IOException; + /** + * Creates a new {@link BlobDownloadResponse} for the given blob name. + * + * @param blobName + * The name of the blob to get an {@link InputStream} for. + * @return The {@code InputStream} to read the blob. + * @throws NoSuchFileException if the blob does not exist + * @throws IOException if the blob can not be read. + */ + default BlobDownloadResponse readBlobWithMetadata(String blobName) throws IOException { + return null; + }; + /** * Creates a new {@link InputStream} that can be used to read the given blob starting from * a specific {@code position} in the blob. The {@code length} is an indication of the @@ -128,6 +141,33 @@ default long readBlobPreferredLength() { */ void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException; + /** + * Reads blob content from the input stream and writes it to the container in a new blob with the given name, and metadata. + * This method assumes the container does not already contain a blob of the same blobName. If a blob by the + * same name already exists, the operation will fail and an {@link IOException} will be thrown. + * + * @param blobName + * The name of the blob to write the contents of the input stream to. + * @param inputStream + * The input stream from which to retrieve the bytes to write to the blob. + * @param metadata + * The metadata to be associate with the blob upload. + * @param blobSize + * The size of the blob to be written, in bytes. It is implementation dependent whether + * this value is used in writing the blob to the repository. + * @param failIfAlreadyExists + * whether to throw a FileAlreadyExistsException if the given blob already exists + * @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists + * @throws IOException if the input stream could not be read, or the target blob could not be written to. + */ + default void writeBlobWithMetadata( + String blobName, + InputStream inputStream, + Map metadata, + long blobSize, + boolean failIfAlreadyExists + ) throws IOException {}; + /** * Reads blob content from the input stream and writes it to the container in a new blob with the given name, * using an atomic write operation if the implementation supports it. @@ -149,6 +189,35 @@ default long readBlobPreferredLength() { */ void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException; + /** + * Reads blob content from the input stream and writes it to the container in a new blob with the given name,and metadata + * using an atomic write operation if the implementation supports it. + *

+ * This method assumes the container does not already contain a blob of the same blobName. If a blob by the + * same name already exists, the operation will fail and an {@link IOException} will be thrown. + * + * @param blobName + * The name of the blob to write the contents of the input stream to. + * @param inputStream + * The input stream from which to retrieve the bytes to write to the blob. + * @param metadata + * The metadata to be associate with the blob upload. + * @param blobSize + * The size of the blob to be written, in bytes. It is implementation dependent whether + * this value is used in writing the blob to the repository. + * @param failIfAlreadyExists + * whether to throw a FileAlreadyExistsException if the given blob already exists + * @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists + * @throws IOException if the input stream could not be read, or the target blob could not be written to. + */ + default void writeBlobAtomicWithMetadata( + String blobName, + InputStream inputStream, + Map metadata, + long blobSize, + boolean failIfAlreadyExists + ) throws IOException {}; + /** * Deletes this container and all its contents from the repository. * diff --git a/server/src/main/java/org/opensearch/common/blobstore/BlobDownloadResponse.java b/server/src/main/java/org/opensearch/common/blobstore/BlobDownloadResponse.java new file mode 100644 index 0000000000000..c03570c71164c --- /dev/null +++ b/server/src/main/java/org/opensearch/common/blobstore/BlobDownloadResponse.java @@ -0,0 +1,44 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.blobstore; + +import java.io.InputStream; +import java.util.Map; + +/** + * A class for blob download response + * + * @opensearch.internal + */ +public class BlobDownloadResponse { + + /** + * Downloaded blob InputStream + */ + private InputStream inputStream; + + /** + * Metadata of the downloaded blob + */ + private Map metadata; + + public InputStream getInputStream() { + return inputStream; + } + + public Map getMetadata() { + return metadata; + } + + public BlobDownloadResponse(InputStream inputStream, Map metadata) { + this.inputStream = inputStream; + this.metadata = metadata; + } + +} diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java index 82dd6301ef79f..5ab0b9d5fa870 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -14,6 +14,7 @@ import org.opensearch.action.ActionRunnable; import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobDownloadResponse; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; @@ -164,6 +165,11 @@ public InputStream downloadBlob(Iterable path, String fileName) throws I return blobStore.blobContainer((BlobPath) path).readBlob(fileName); } + @Override + public BlobDownloadResponse downloadBlobWithMetadata(Iterable path, String fileName) throws IOException { + return blobStore.blobContainer((BlobPath) path).readBlobWithMetadata(fileName); + } + @Override public void deleteBlobs(Iterable path, List fileNames) throws IOException { blobStore.blobContainer((BlobPath) path).deleteBlobsIgnoringIfNotExists(fileNames); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java index cfe833dde87eb..a11532dc69f1a 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java @@ -8,6 +8,7 @@ package org.opensearch.index.translog.transfer; +import org.opensearch.common.blobstore.BlobDownloadResponse; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.stream.write.WritePriority; @@ -125,6 +126,15 @@ void uploadBlobs( */ InputStream downloadBlob(Iterable path, String fileName) throws IOException; + /** + * + * @param path the remote path from where download should be made + * @param fileName the name of the file + * @return {@link BlobDownloadResponse} of the remote file + * @throws IOException the exception while reading the data + */ + BlobDownloadResponse downloadBlobWithMetadata(Iterable path, String fileName) throws IOException; + void listAllInSortedOrder(Iterable path, String filenamePrefix, int limit, ActionListener> listener); void listAllInSortedOrderAsync( From 950e55bb7239436f3496be68891fc6668dac133b Mon Sep 17 00:00:00 2001 From: Sandeep Kumawat Date: Wed, 3 Apr 2024 01:33:39 +0530 Subject: [PATCH 02/20] empty commit Signed-off-by: Sandeep Kumawat From 85711ae52ee793e5513524268eb46b7dc30a93df Mon Sep 17 00:00:00 2001 From: Sandeep Kumawat Date: Wed, 3 Apr 2024 15:35:59 +0530 Subject: [PATCH 03/20] provide metadata in remoteTransferContainer for async translog upload flow Signed-off-by: Sandeep Kumawat --- .../s3/S3BlobContainerMockClientTests.java | 6 +-- .../s3/S3BlobContainerRetriesTests.java | 2 +- .../blobstore/stream/write/WriteContext.java | 13 +++++- .../transfer/RemoteTransferContainer.java | 40 ++++++++++++++++++- 4 files changed, 55 insertions(+), 6 deletions(-) diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java index 8c7e196d7c812..607f186f2db1e 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java @@ -483,7 +483,7 @@ public InputStreamContainer apply(Integer partNo, Long size, Long position) thro if (throwExceptionOnFinalizeUpload) { throw new RuntimeException(); } - }, false, null), completionListener); + }, false, null, null), completionListener); assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS)); // wait for completableFuture to finish @@ -533,7 +533,7 @@ public InputStreamContainer apply(Integer partNo, Long size, Long position) thro if (throwExceptionOnFinalizeUpload) { throw new RuntimeException(); } - }, false, null), completionListener); + }, false, null, null), completionListener); assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS)); if (expectException || throwExceptionOnFinalizeUpload) { @@ -644,7 +644,7 @@ public InputStreamContainer apply(Integer partNo, Long size, Long position) thro } }, partSize, calculateLastPartSize(blobSize, partSize), calculateNumberOfParts(blobSize, partSize)); } - }, blobSize, false, WritePriority.HIGH, uploadSuccess -> { assertTrue(uploadSuccess); }, false, null), completionListener); + }, blobSize, false, WritePriority.HIGH, uploadSuccess -> { assertTrue(uploadSuccess); }, false, null, null), completionListener); assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS)); if (expectException) { diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java index ceab06bd051e9..70f41e647648e 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -344,7 +344,7 @@ public InputStreamContainer apply(Integer partNo, Long size, Long position) thro } }, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize)); } - }, bytes.length, false, WritePriority.NORMAL, Assert::assertTrue, false, null), completionListener); + }, bytes.length, false, WritePriority.NORMAL, Assert::assertTrue, false, null, null), completionListener); assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS)); diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java index e74462f82400d..3373f32845c27 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java @@ -13,6 +13,7 @@ import org.opensearch.common.StreamContext; import java.io.IOException; +import java.util.Map; /** * WriteContext is used to encapsulate all data needed by BlobContainer#writeStreams @@ -29,6 +30,7 @@ public class WriteContext { private final CheckedConsumer uploadFinalizer; private final boolean doRemoteDataIntegrityCheck; private final Long expectedChecksum; + private Map metadata; /** * Construct a new WriteContext object @@ -49,7 +51,8 @@ public WriteContext( WritePriority writePriority, CheckedConsumer uploadFinalizer, boolean doRemoteDataIntegrityCheck, - @Nullable Long expectedChecksum + @Nullable Long expectedChecksum, + Map metadata ) { this.fileName = fileName; this.streamContextSupplier = streamContextSupplier; @@ -59,6 +62,7 @@ public WriteContext( this.uploadFinalizer = uploadFinalizer; this.doRemoteDataIntegrityCheck = doRemoteDataIntegrityCheck; this.expectedChecksum = expectedChecksum; + this.metadata = metadata; } /** @@ -131,4 +135,11 @@ public boolean doRemoteDataIntegrityCheck() { public Long getExpectedChecksum() { return expectedChecksum; } + + /** + * @return the upload metadata. + */ + public Map getMetadata() { + return metadata; + } } diff --git a/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java b/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java index 2047c99d9e13b..8a0f7cdb4a474 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java @@ -27,6 +27,7 @@ import java.io.Closeable; import java.io.IOException; import java.io.InputStream; +import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; @@ -55,6 +56,7 @@ public class RemoteTransferContainer implements Closeable { private final OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier; private final boolean isRemoteDataIntegritySupported; private final AtomicBoolean readBlock = new AtomicBoolean(); + private Map metadata = null; private static final Logger log = LogManager.getLogger(RemoteTransferContainer.class); @@ -90,6 +92,41 @@ public RemoteTransferContainer( this.isRemoteDataIntegritySupported = isRemoteDataIntegritySupported; } + /** + * Construct a new RemoteTransferContainer object with metadata. + * + * @param fileName Name of the local file + * @param remoteFileName Name of the remote file + * @param contentLength Total content length of the file to be uploaded + * @param failTransferIfFileExists A boolean to determine if upload has to be failed if file exists + * @param writePriority The {@link WritePriority} of current upload + * @param offsetRangeInputStreamSupplier A supplier to create OffsetRangeInputStreams + * @param expectedChecksum The expected checksum value for the file being uploaded. This checksum will be used for local or remote data integrity checks + * @param isRemoteDataIntegritySupported A boolean to signify whether the remote repository supports server side data integrity verification + * @param metadata Object metadata to be store with the file. + */ + public RemoteTransferContainer( + String fileName, + String remoteFileName, + long contentLength, + boolean failTransferIfFileExists, + WritePriority writePriority, + OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier, + long expectedChecksum, + boolean isRemoteDataIntegritySupported, + Map metadata + ) { + this.fileName = fileName; + this.remoteFileName = remoteFileName; + this.contentLength = contentLength; + this.failTransferIfFileExists = failTransferIfFileExists; + this.writePriority = writePriority; + this.offsetRangeInputStreamSupplier = offsetRangeInputStreamSupplier; + this.expectedChecksum = expectedChecksum; + this.isRemoteDataIntegritySupported = isRemoteDataIntegritySupported; + this.metadata = metadata; + } + /** * @return The {@link WriteContext} for the current upload */ @@ -102,7 +139,8 @@ public WriteContext createWriteContext() { writePriority, this::finalizeUpload, isRemoteDataIntegrityCheckPossible(), - isRemoteDataIntegrityCheckPossible() ? expectedChecksum : null + isRemoteDataIntegrityCheckPossible() ? expectedChecksum : null, + metadata ); } From 3a41ce3a54cb988fd21ecb61523592e2b8b662ba Mon Sep 17 00:00:00 2001 From: Sandeep Kumawat Date: Thu, 4 Apr 2024 01:21:20 +0530 Subject: [PATCH 04/20] introduce metadata support in ReadContext Signed-off-by: Sandeep Kumawat --- .../opensearch/repositories/s3/S3BlobContainer.java | 2 +- .../multipart/mocks/MockFsAsyncBlobContainer.java | 2 +- .../common/blobstore/stream/read/ReadContext.java | 10 +++++++++- .../AsyncMultiStreamEncryptedBlobContainerTests.java | 4 ++-- .../stream/read/listener/ReadContextListenerTests.java | 8 ++++---- 5 files changed, 17 insertions(+), 9 deletions(-) diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java index 25f361b40636e..56589a981003a 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java @@ -285,7 +285,7 @@ public void readBlobAsync(String blobName, ActionListener listener) ); } } - listener.onResponse(new ReadContext(blobSize, blobPartInputStreamFutures, blobChecksum)); + listener.onResponse(new ReadContext(blobSize, blobPartInputStreamFutures, blobChecksum, null)); } catch (Exception ex) { listener.onFailure(ex); } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java index 36987ac2d4991..fd089391038f7 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java @@ -131,7 +131,7 @@ public void readBlobAsync(String blobName, ActionListener listener) InputStreamContainer blobPartStream = new InputStreamContainer(readBlob(blobName, offset, partSize), partSize, offset); blobPartStreams.add(() -> CompletableFuture.completedFuture(blobPartStream)); } - ReadContext blobReadContext = new ReadContext(contentLength, blobPartStreams, null); + ReadContext blobReadContext = new ReadContext(contentLength, blobPartStreams, null, null); listener.onResponse(blobReadContext); } catch (Exception e) { listener.onFailure(e); diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java b/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java index 1264551401b4c..02be0df346a5f 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java @@ -12,6 +12,7 @@ import org.opensearch.common.io.InputStreamContainer; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; @@ -25,23 +26,30 @@ public class ReadContext { private final long blobSize; private final List asyncPartStreams; private final String blobChecksum; + private Map metadata; - public ReadContext(long blobSize, List asyncPartStreams, String blobChecksum) { + public ReadContext(long blobSize, List asyncPartStreams, String blobChecksum, Map metadata) { this.blobSize = blobSize; this.asyncPartStreams = asyncPartStreams; this.blobChecksum = blobChecksum; + this.metadata = metadata; } public ReadContext(ReadContext readContext) { this.blobSize = readContext.blobSize; this.asyncPartStreams = readContext.asyncPartStreams; this.blobChecksum = readContext.blobChecksum; + this.metadata = readContext.metadata; } public String getBlobChecksum() { return blobChecksum; } + public Map getMetadata() { + return metadata; + } + public int getNumberOfParts() { return asyncPartStreams.size(); } diff --git a/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java b/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java index 1780819390052..6b2fd05ed4bfe 100644 --- a/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java +++ b/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java @@ -57,7 +57,7 @@ public void testReadBlobAsync() throws Exception { final ListenerTestUtils.CountingCompletionListener completionListener = new ListenerTestUtils.CountingCompletionListener<>(); final CompletableFuture streamContainerFuture = CompletableFuture.completedFuture(inputStreamContainer); - final ReadContext readContext = new ReadContext(size, List.of(() -> streamContainerFuture), null); + final ReadContext readContext = new ReadContext(size, List.of(() -> streamContainerFuture), null, null); Mockito.doAnswer(invocation -> { ActionListener readContextActionListener = invocation.getArgument(1); @@ -103,7 +103,7 @@ public void testReadBlobAsyncException() throws Exception { final ListenerTestUtils.CountingCompletionListener completionListener = new ListenerTestUtils.CountingCompletionListener<>(); final CompletableFuture streamContainerFuture = CompletableFuture.completedFuture(inputStreamContainer); - final ReadContext readContext = new ReadContext(size, List.of(() -> streamContainerFuture), null); + final ReadContext readContext = new ReadContext(size, List.of(() -> streamContainerFuture), null, null); Mockito.doAnswer(invocation -> { ActionListener readContextActionListener = invocation.getArgument(1); diff --git a/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java b/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java index 0163c2275e7f4..b9cf08fa4acb1 100644 --- a/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java +++ b/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java @@ -78,7 +78,7 @@ public void testReadContextListener() throws InterruptedException, IOException { UnaryOperator.identity(), MAX_CONCURRENT_STREAMS ); - ReadContext readContext = new ReadContext((long) PART_SIZE * NUMBER_OF_PARTS, blobPartStreams, null); + ReadContext readContext = new ReadContext((long) PART_SIZE * NUMBER_OF_PARTS, blobPartStreams, null, null); readContextListener.onResponse(readContext); countDownLatch.await(); @@ -125,7 +125,7 @@ public int available() { threadPool.generic() ) ); - ReadContext readContext = new ReadContext((long) (PART_SIZE + 1) * NUMBER_OF_PARTS, blobPartStreams, null); + ReadContext readContext = new ReadContext((long) (PART_SIZE + 1) * NUMBER_OF_PARTS, blobPartStreams, null, null); readContextListener.onResponse(readContext); countDownLatch.await(); @@ -178,7 +178,7 @@ public int read(byte[] b) throws IOException { threadPool.generic() ) ); - ReadContext readContext = new ReadContext((long) (PART_SIZE + 1) * NUMBER_OF_PARTS + 1, blobPartStreams, null); + ReadContext readContext = new ReadContext((long) (PART_SIZE + 1) * NUMBER_OF_PARTS + 1, blobPartStreams, null, null); readContextListener.onResponse(readContext); countDownLatch.await(); @@ -203,7 +203,7 @@ public void testWriteToTempFile_alreadyExists_replacesFile() throws Exception { UnaryOperator.identity(), MAX_CONCURRENT_STREAMS ); - ReadContext readContext = new ReadContext((long) (PART_SIZE + 1) * NUMBER_OF_PARTS, blobPartStreams, null); + ReadContext readContext = new ReadContext((long) (PART_SIZE + 1) * NUMBER_OF_PARTS, blobPartStreams, null, null); readContextListener.onResponse(readContext); countDownLatch.await(); From d95e5c9ced4d1c44dede07cd707e14f2374b5ac7 Mon Sep 17 00:00:00 2001 From: Sandeep Kumawat Date: Thu, 4 Apr 2024 13:46:26 +0530 Subject: [PATCH 05/20] empty commit Signed-off-by: Sandeep Kumawat From e63590c1e6f518b6952ec352ba9ffc8284bc9319 Mon Sep 17 00:00:00 2001 From: Sandeep Kumawat Date: Thu, 4 Apr 2024 14:31:47 +0530 Subject: [PATCH 06/20] minor changes Signed-off-by: Sandeep Kumawat --- .../opensearch/common/blobstore/stream/read/ReadContext.java | 2 +- .../opensearch/common/blobstore/stream/write/WriteContext.java | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java b/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java index 02be0df346a5f..6877348f2fc15 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java @@ -26,7 +26,7 @@ public class ReadContext { private final long blobSize; private final List asyncPartStreams; private final String blobChecksum; - private Map metadata; + private final Map metadata; public ReadContext(long blobSize, List asyncPartStreams, String blobChecksum, Map metadata) { this.blobSize = blobSize; diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java index 3373f32845c27..ed77350381584 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java @@ -30,7 +30,7 @@ public class WriteContext { private final CheckedConsumer uploadFinalizer; private final boolean doRemoteDataIntegrityCheck; private final Long expectedChecksum; - private Map metadata; + private final Map metadata; /** * Construct a new WriteContext object @@ -77,6 +77,7 @@ protected WriteContext(WriteContext writeContext) { this.uploadFinalizer = writeContext.uploadFinalizer; this.doRemoteDataIntegrityCheck = writeContext.doRemoteDataIntegrityCheck; this.expectedChecksum = writeContext.expectedChecksum; + this.metadata = writeContext.getMetadata(); } /** From 9e1c0ea51e06d91a0be96d9837896f587637ccb7 Mon Sep 17 00:00:00 2001 From: Sandeep Kumawat Date: Tue, 2 Apr 2024 19:02:00 +0530 Subject: [PATCH 07/20] Introduce interface changes to read/write blob with object metadata Signed-off-by: Sandeep Kumawat --- .../common/blobstore/BlobContainer.java | 69 +++++++++++++++++++ .../blobstore/BlobDownloadResponse.java | 44 ++++++++++++ .../transfer/BlobStoreTransferService.java | 6 ++ .../translog/transfer/TransferService.java | 10 +++ 4 files changed, 129 insertions(+) create mode 100644 server/src/main/java/org/opensearch/common/blobstore/BlobDownloadResponse.java diff --git a/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java b/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java index 2e25a532b5abf..3776007e9e84c 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java @@ -77,6 +77,19 @@ public interface BlobContainer { */ InputStream readBlob(String blobName) throws IOException; + /** + * Creates a new {@link BlobDownloadResponse} for the given blob name. + * + * @param blobName + * The name of the blob to get an {@link InputStream} for. + * @return The {@code InputStream} to read the blob. + * @throws NoSuchFileException if the blob does not exist + * @throws IOException if the blob can not be read. + */ + default BlobDownloadResponse readBlobWithMetadata(String blobName) throws IOException { + return null; + }; + /** * Creates a new {@link InputStream} that can be used to read the given blob starting from * a specific {@code position} in the blob. The {@code length} is an indication of the @@ -128,6 +141,33 @@ default long readBlobPreferredLength() { */ void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException; + /** + * Reads blob content from the input stream and writes it to the container in a new blob with the given name, and metadata. + * This method assumes the container does not already contain a blob of the same blobName. If a blob by the + * same name already exists, the operation will fail and an {@link IOException} will be thrown. + * + * @param blobName + * The name of the blob to write the contents of the input stream to. + * @param inputStream + * The input stream from which to retrieve the bytes to write to the blob. + * @param metadata + * The metadata to be associate with the blob upload. + * @param blobSize + * The size of the blob to be written, in bytes. It is implementation dependent whether + * this value is used in writing the blob to the repository. + * @param failIfAlreadyExists + * whether to throw a FileAlreadyExistsException if the given blob already exists + * @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists + * @throws IOException if the input stream could not be read, or the target blob could not be written to. + */ + default void writeBlobWithMetadata( + String blobName, + InputStream inputStream, + Map metadata, + long blobSize, + boolean failIfAlreadyExists + ) throws IOException {}; + /** * Reads blob content from the input stream and writes it to the container in a new blob with the given name, * using an atomic write operation if the implementation supports it. @@ -149,6 +189,35 @@ default long readBlobPreferredLength() { */ void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException; + /** + * Reads blob content from the input stream and writes it to the container in a new blob with the given name,and metadata + * using an atomic write operation if the implementation supports it. + *

+ * This method assumes the container does not already contain a blob of the same blobName. If a blob by the + * same name already exists, the operation will fail and an {@link IOException} will be thrown. + * + * @param blobName + * The name of the blob to write the contents of the input stream to. + * @param inputStream + * The input stream from which to retrieve the bytes to write to the blob. + * @param metadata + * The metadata to be associate with the blob upload. + * @param blobSize + * The size of the blob to be written, in bytes. It is implementation dependent whether + * this value is used in writing the blob to the repository. + * @param failIfAlreadyExists + * whether to throw a FileAlreadyExistsException if the given blob already exists + * @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists + * @throws IOException if the input stream could not be read, or the target blob could not be written to. + */ + default void writeBlobAtomicWithMetadata( + String blobName, + InputStream inputStream, + Map metadata, + long blobSize, + boolean failIfAlreadyExists + ) throws IOException {}; + /** * Deletes this container and all its contents from the repository. * diff --git a/server/src/main/java/org/opensearch/common/blobstore/BlobDownloadResponse.java b/server/src/main/java/org/opensearch/common/blobstore/BlobDownloadResponse.java new file mode 100644 index 0000000000000..c03570c71164c --- /dev/null +++ b/server/src/main/java/org/opensearch/common/blobstore/BlobDownloadResponse.java @@ -0,0 +1,44 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.blobstore; + +import java.io.InputStream; +import java.util.Map; + +/** + * A class for blob download response + * + * @opensearch.internal + */ +public class BlobDownloadResponse { + + /** + * Downloaded blob InputStream + */ + private InputStream inputStream; + + /** + * Metadata of the downloaded blob + */ + private Map metadata; + + public InputStream getInputStream() { + return inputStream; + } + + public Map getMetadata() { + return metadata; + } + + public BlobDownloadResponse(InputStream inputStream, Map metadata) { + this.inputStream = inputStream; + this.metadata = metadata; + } + +} diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java index 82dd6301ef79f..5ab0b9d5fa870 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -14,6 +14,7 @@ import org.opensearch.action.ActionRunnable; import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobDownloadResponse; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; @@ -164,6 +165,11 @@ public InputStream downloadBlob(Iterable path, String fileName) throws I return blobStore.blobContainer((BlobPath) path).readBlob(fileName); } + @Override + public BlobDownloadResponse downloadBlobWithMetadata(Iterable path, String fileName) throws IOException { + return blobStore.blobContainer((BlobPath) path).readBlobWithMetadata(fileName); + } + @Override public void deleteBlobs(Iterable path, List fileNames) throws IOException { blobStore.blobContainer((BlobPath) path).deleteBlobsIgnoringIfNotExists(fileNames); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java index cfe833dde87eb..a11532dc69f1a 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java @@ -8,6 +8,7 @@ package org.opensearch.index.translog.transfer; +import org.opensearch.common.blobstore.BlobDownloadResponse; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.stream.write.WritePriority; @@ -125,6 +126,15 @@ void uploadBlobs( */ InputStream downloadBlob(Iterable path, String fileName) throws IOException; + /** + * + * @param path the remote path from where download should be made + * @param fileName the name of the file + * @return {@link BlobDownloadResponse} of the remote file + * @throws IOException the exception while reading the data + */ + BlobDownloadResponse downloadBlobWithMetadata(Iterable path, String fileName) throws IOException; + void listAllInSortedOrder(Iterable path, String filenamePrefix, int limit, ActionListener> listener); void listAllInSortedOrderAsync( From fffe12a29aaef3d04c180f46ee5606bf1a5051b7 Mon Sep 17 00:00:00 2001 From: Sandeep Kumawat Date: Wed, 3 Apr 2024 01:33:39 +0530 Subject: [PATCH 08/20] empty commit Signed-off-by: Sandeep Kumawat From 5ae5e86ea0c3f90b91ff2fdbd10050935ef1b59b Mon Sep 17 00:00:00 2001 From: Sandeep Kumawat Date: Wed, 3 Apr 2024 15:35:59 +0530 Subject: [PATCH 09/20] provide metadata in remoteTransferContainer for async translog upload flow Signed-off-by: Sandeep Kumawat --- .../s3/S3BlobContainerMockClientTests.java | 6 +-- .../s3/S3BlobContainerRetriesTests.java | 2 +- .../blobstore/stream/write/WriteContext.java | 13 +++++- .../transfer/RemoteTransferContainer.java | 40 ++++++++++++++++++- 4 files changed, 55 insertions(+), 6 deletions(-) diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java index 8c7e196d7c812..607f186f2db1e 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java @@ -483,7 +483,7 @@ public InputStreamContainer apply(Integer partNo, Long size, Long position) thro if (throwExceptionOnFinalizeUpload) { throw new RuntimeException(); } - }, false, null), completionListener); + }, false, null, null), completionListener); assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS)); // wait for completableFuture to finish @@ -533,7 +533,7 @@ public InputStreamContainer apply(Integer partNo, Long size, Long position) thro if (throwExceptionOnFinalizeUpload) { throw new RuntimeException(); } - }, false, null), completionListener); + }, false, null, null), completionListener); assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS)); if (expectException || throwExceptionOnFinalizeUpload) { @@ -644,7 +644,7 @@ public InputStreamContainer apply(Integer partNo, Long size, Long position) thro } }, partSize, calculateLastPartSize(blobSize, partSize), calculateNumberOfParts(blobSize, partSize)); } - }, blobSize, false, WritePriority.HIGH, uploadSuccess -> { assertTrue(uploadSuccess); }, false, null), completionListener); + }, blobSize, false, WritePriority.HIGH, uploadSuccess -> { assertTrue(uploadSuccess); }, false, null, null), completionListener); assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS)); if (expectException) { diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java index ceab06bd051e9..70f41e647648e 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -344,7 +344,7 @@ public InputStreamContainer apply(Integer partNo, Long size, Long position) thro } }, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize)); } - }, bytes.length, false, WritePriority.NORMAL, Assert::assertTrue, false, null), completionListener); + }, bytes.length, false, WritePriority.NORMAL, Assert::assertTrue, false, null, null), completionListener); assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS)); diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java index e74462f82400d..3373f32845c27 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java @@ -13,6 +13,7 @@ import org.opensearch.common.StreamContext; import java.io.IOException; +import java.util.Map; /** * WriteContext is used to encapsulate all data needed by BlobContainer#writeStreams @@ -29,6 +30,7 @@ public class WriteContext { private final CheckedConsumer uploadFinalizer; private final boolean doRemoteDataIntegrityCheck; private final Long expectedChecksum; + private Map metadata; /** * Construct a new WriteContext object @@ -49,7 +51,8 @@ public WriteContext( WritePriority writePriority, CheckedConsumer uploadFinalizer, boolean doRemoteDataIntegrityCheck, - @Nullable Long expectedChecksum + @Nullable Long expectedChecksum, + Map metadata ) { this.fileName = fileName; this.streamContextSupplier = streamContextSupplier; @@ -59,6 +62,7 @@ public WriteContext( this.uploadFinalizer = uploadFinalizer; this.doRemoteDataIntegrityCheck = doRemoteDataIntegrityCheck; this.expectedChecksum = expectedChecksum; + this.metadata = metadata; } /** @@ -131,4 +135,11 @@ public boolean doRemoteDataIntegrityCheck() { public Long getExpectedChecksum() { return expectedChecksum; } + + /** + * @return the upload metadata. + */ + public Map getMetadata() { + return metadata; + } } diff --git a/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java b/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java index 2047c99d9e13b..8a0f7cdb4a474 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java @@ -27,6 +27,7 @@ import java.io.Closeable; import java.io.IOException; import java.io.InputStream; +import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; @@ -55,6 +56,7 @@ public class RemoteTransferContainer implements Closeable { private final OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier; private final boolean isRemoteDataIntegritySupported; private final AtomicBoolean readBlock = new AtomicBoolean(); + private Map metadata = null; private static final Logger log = LogManager.getLogger(RemoteTransferContainer.class); @@ -90,6 +92,41 @@ public RemoteTransferContainer( this.isRemoteDataIntegritySupported = isRemoteDataIntegritySupported; } + /** + * Construct a new RemoteTransferContainer object with metadata. + * + * @param fileName Name of the local file + * @param remoteFileName Name of the remote file + * @param contentLength Total content length of the file to be uploaded + * @param failTransferIfFileExists A boolean to determine if upload has to be failed if file exists + * @param writePriority The {@link WritePriority} of current upload + * @param offsetRangeInputStreamSupplier A supplier to create OffsetRangeInputStreams + * @param expectedChecksum The expected checksum value for the file being uploaded. This checksum will be used for local or remote data integrity checks + * @param isRemoteDataIntegritySupported A boolean to signify whether the remote repository supports server side data integrity verification + * @param metadata Object metadata to be store with the file. + */ + public RemoteTransferContainer( + String fileName, + String remoteFileName, + long contentLength, + boolean failTransferIfFileExists, + WritePriority writePriority, + OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier, + long expectedChecksum, + boolean isRemoteDataIntegritySupported, + Map metadata + ) { + this.fileName = fileName; + this.remoteFileName = remoteFileName; + this.contentLength = contentLength; + this.failTransferIfFileExists = failTransferIfFileExists; + this.writePriority = writePriority; + this.offsetRangeInputStreamSupplier = offsetRangeInputStreamSupplier; + this.expectedChecksum = expectedChecksum; + this.isRemoteDataIntegritySupported = isRemoteDataIntegritySupported; + this.metadata = metadata; + } + /** * @return The {@link WriteContext} for the current upload */ @@ -102,7 +139,8 @@ public WriteContext createWriteContext() { writePriority, this::finalizeUpload, isRemoteDataIntegrityCheckPossible(), - isRemoteDataIntegrityCheckPossible() ? expectedChecksum : null + isRemoteDataIntegrityCheckPossible() ? expectedChecksum : null, + metadata ); } From 425cbac24e080f0b91520bcbfd833520e7f72ac7 Mon Sep 17 00:00:00 2001 From: Sandeep Kumawat Date: Thu, 4 Apr 2024 01:21:20 +0530 Subject: [PATCH 10/20] introduce metadata support in ReadContext Signed-off-by: Sandeep Kumawat --- .../opensearch/repositories/s3/S3BlobContainer.java | 2 +- .../multipart/mocks/MockFsAsyncBlobContainer.java | 2 +- .../common/blobstore/stream/read/ReadContext.java | 10 +++++++++- .../AsyncMultiStreamEncryptedBlobContainerTests.java | 4 ++-- .../stream/read/listener/ReadContextListenerTests.java | 8 ++++---- 5 files changed, 17 insertions(+), 9 deletions(-) diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java index 25f361b40636e..56589a981003a 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java @@ -285,7 +285,7 @@ public void readBlobAsync(String blobName, ActionListener listener) ); } } - listener.onResponse(new ReadContext(blobSize, blobPartInputStreamFutures, blobChecksum)); + listener.onResponse(new ReadContext(blobSize, blobPartInputStreamFutures, blobChecksum, null)); } catch (Exception ex) { listener.onFailure(ex); } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java index 36987ac2d4991..fd089391038f7 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java @@ -131,7 +131,7 @@ public void readBlobAsync(String blobName, ActionListener listener) InputStreamContainer blobPartStream = new InputStreamContainer(readBlob(blobName, offset, partSize), partSize, offset); blobPartStreams.add(() -> CompletableFuture.completedFuture(blobPartStream)); } - ReadContext blobReadContext = new ReadContext(contentLength, blobPartStreams, null); + ReadContext blobReadContext = new ReadContext(contentLength, blobPartStreams, null, null); listener.onResponse(blobReadContext); } catch (Exception e) { listener.onFailure(e); diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java b/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java index 1264551401b4c..02be0df346a5f 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java @@ -12,6 +12,7 @@ import org.opensearch.common.io.InputStreamContainer; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; @@ -25,23 +26,30 @@ public class ReadContext { private final long blobSize; private final List asyncPartStreams; private final String blobChecksum; + private Map metadata; - public ReadContext(long blobSize, List asyncPartStreams, String blobChecksum) { + public ReadContext(long blobSize, List asyncPartStreams, String blobChecksum, Map metadata) { this.blobSize = blobSize; this.asyncPartStreams = asyncPartStreams; this.blobChecksum = blobChecksum; + this.metadata = metadata; } public ReadContext(ReadContext readContext) { this.blobSize = readContext.blobSize; this.asyncPartStreams = readContext.asyncPartStreams; this.blobChecksum = readContext.blobChecksum; + this.metadata = readContext.metadata; } public String getBlobChecksum() { return blobChecksum; } + public Map getMetadata() { + return metadata; + } + public int getNumberOfParts() { return asyncPartStreams.size(); } diff --git a/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java b/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java index 1780819390052..6b2fd05ed4bfe 100644 --- a/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java +++ b/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java @@ -57,7 +57,7 @@ public void testReadBlobAsync() throws Exception { final ListenerTestUtils.CountingCompletionListener completionListener = new ListenerTestUtils.CountingCompletionListener<>(); final CompletableFuture streamContainerFuture = CompletableFuture.completedFuture(inputStreamContainer); - final ReadContext readContext = new ReadContext(size, List.of(() -> streamContainerFuture), null); + final ReadContext readContext = new ReadContext(size, List.of(() -> streamContainerFuture), null, null); Mockito.doAnswer(invocation -> { ActionListener readContextActionListener = invocation.getArgument(1); @@ -103,7 +103,7 @@ public void testReadBlobAsyncException() throws Exception { final ListenerTestUtils.CountingCompletionListener completionListener = new ListenerTestUtils.CountingCompletionListener<>(); final CompletableFuture streamContainerFuture = CompletableFuture.completedFuture(inputStreamContainer); - final ReadContext readContext = new ReadContext(size, List.of(() -> streamContainerFuture), null); + final ReadContext readContext = new ReadContext(size, List.of(() -> streamContainerFuture), null, null); Mockito.doAnswer(invocation -> { ActionListener readContextActionListener = invocation.getArgument(1); diff --git a/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java b/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java index 0163c2275e7f4..b9cf08fa4acb1 100644 --- a/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java +++ b/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java @@ -78,7 +78,7 @@ public void testReadContextListener() throws InterruptedException, IOException { UnaryOperator.identity(), MAX_CONCURRENT_STREAMS ); - ReadContext readContext = new ReadContext((long) PART_SIZE * NUMBER_OF_PARTS, blobPartStreams, null); + ReadContext readContext = new ReadContext((long) PART_SIZE * NUMBER_OF_PARTS, blobPartStreams, null, null); readContextListener.onResponse(readContext); countDownLatch.await(); @@ -125,7 +125,7 @@ public int available() { threadPool.generic() ) ); - ReadContext readContext = new ReadContext((long) (PART_SIZE + 1) * NUMBER_OF_PARTS, blobPartStreams, null); + ReadContext readContext = new ReadContext((long) (PART_SIZE + 1) * NUMBER_OF_PARTS, blobPartStreams, null, null); readContextListener.onResponse(readContext); countDownLatch.await(); @@ -178,7 +178,7 @@ public int read(byte[] b) throws IOException { threadPool.generic() ) ); - ReadContext readContext = new ReadContext((long) (PART_SIZE + 1) * NUMBER_OF_PARTS + 1, blobPartStreams, null); + ReadContext readContext = new ReadContext((long) (PART_SIZE + 1) * NUMBER_OF_PARTS + 1, blobPartStreams, null, null); readContextListener.onResponse(readContext); countDownLatch.await(); @@ -203,7 +203,7 @@ public void testWriteToTempFile_alreadyExists_replacesFile() throws Exception { UnaryOperator.identity(), MAX_CONCURRENT_STREAMS ); - ReadContext readContext = new ReadContext((long) (PART_SIZE + 1) * NUMBER_OF_PARTS, blobPartStreams, null); + ReadContext readContext = new ReadContext((long) (PART_SIZE + 1) * NUMBER_OF_PARTS, blobPartStreams, null, null); readContextListener.onResponse(readContext); countDownLatch.await(); From 18eadefadaeb6d1aa679a6a5a264f5ea3505323b Mon Sep 17 00:00:00 2001 From: Sandeep Kumawat Date: Thu, 4 Apr 2024 13:46:26 +0530 Subject: [PATCH 11/20] empty commit Signed-off-by: Sandeep Kumawat From a06b7a09dec8d6822cf9cd168dda3610ed8fe011 Mon Sep 17 00:00:00 2001 From: Sandeep Kumawat Date: Thu, 4 Apr 2024 14:31:47 +0530 Subject: [PATCH 12/20] minor changes Signed-off-by: Sandeep Kumawat --- .../opensearch/common/blobstore/stream/read/ReadContext.java | 2 +- .../opensearch/common/blobstore/stream/write/WriteContext.java | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java b/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java index 02be0df346a5f..6877348f2fc15 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java @@ -26,7 +26,7 @@ public class ReadContext { private final long blobSize; private final List asyncPartStreams; private final String blobChecksum; - private Map metadata; + private final Map metadata; public ReadContext(long blobSize, List asyncPartStreams, String blobChecksum, Map metadata) { this.blobSize = blobSize; diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java index 3373f32845c27..ed77350381584 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java @@ -30,7 +30,7 @@ public class WriteContext { private final CheckedConsumer uploadFinalizer; private final boolean doRemoteDataIntegrityCheck; private final Long expectedChecksum; - private Map metadata; + private final Map metadata; /** * Construct a new WriteContext object @@ -77,6 +77,7 @@ protected WriteContext(WriteContext writeContext) { this.uploadFinalizer = writeContext.uploadFinalizer; this.doRemoteDataIntegrityCheck = writeContext.doRemoteDataIntegrityCheck; this.expectedChecksum = writeContext.expectedChecksum; + this.metadata = writeContext.getMetadata(); } /** From 477fada8746186af7a7c5765525ad4b8d171bf61 Mon Sep 17 00:00:00 2001 From: Sandeep Kumawat Date: Thu, 4 Apr 2024 23:14:28 +0530 Subject: [PATCH 13/20] minor fixes Signed-off-by: Sandeep Kumawat --- .../java/org/opensearch/common/blobstore/BlobContainer.java | 6 +++--- .../common/blobstore/stream/write/WriteContext.java | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java b/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java index 3776007e9e84c..c60a6beddc77a 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java @@ -143,7 +143,7 @@ default long readBlobPreferredLength() { /** * Reads blob content from the input stream and writes it to the container in a new blob with the given name, and metadata. - * This method assumes the container does not already contain a blob of the same blobName. If a blob by the + * This method assumes the container does not already contain a blob of the same blobName. If a blob by the * same name already exists, the operation will fail and an {@link IOException} will be thrown. * * @param blobName @@ -190,10 +190,10 @@ default void writeBlobWithMetadata( void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException; /** - * Reads blob content from the input stream and writes it to the container in a new blob with the given name,and metadata + * Reads blob content from the input stream and writes it to the container in a new blob with the given name, and metadata * using an atomic write operation if the implementation supports it. *

- * This method assumes the container does not already contain a blob of the same blobName. If a blob by the + * This method assumes the container does not already contain a blob of the same blobName. If a blob by the * same name already exists, the operation will fail and an {@link IOException} will be thrown. * * @param blobName diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java index ed77350381584..c9812662d8701 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java @@ -77,7 +77,7 @@ protected WriteContext(WriteContext writeContext) { this.uploadFinalizer = writeContext.uploadFinalizer; this.doRemoteDataIntegrityCheck = writeContext.doRemoteDataIntegrityCheck; this.expectedChecksum = writeContext.expectedChecksum; - this.metadata = writeContext.getMetadata(); + this.metadata = writeContext.metadata; } /** From 929482eccd3c05fdac2a895d050c1e24afde06d5 Mon Sep 17 00:00:00 2001 From: Sandeep Kumawat Date: Fri, 5 Apr 2024 01:15:39 +0530 Subject: [PATCH 14/20] resolve comments Signed-off-by: Sandeep Kumawat --- .../opensearch/common/blobstore/BlobContainer.java | 14 +++++++++++--- .../common/blobstore/BlobDownloadResponse.java | 7 ++++--- .../index/translog/transfer/TransferService.java | 2 ++ 3 files changed, 17 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java b/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java index c60a6beddc77a..244fb222b06ed 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java @@ -32,6 +32,7 @@ package org.opensearch.common.blobstore; +import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.core.action.ActionListener; import java.io.IOException; @@ -86,8 +87,9 @@ public interface BlobContainer { * @throws NoSuchFileException if the blob does not exist * @throws IOException if the blob can not be read. */ + @ExperimentalApi default BlobDownloadResponse readBlobWithMetadata(String blobName) throws IOException { - return null; + throw new UnsupportedOperationException("readBlobWithMetadata is not implemented yet"); }; /** @@ -160,13 +162,16 @@ default long readBlobPreferredLength() { * @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists * @throws IOException if the input stream could not be read, or the target blob could not be written to. */ + @ExperimentalApi default void writeBlobWithMetadata( String blobName, InputStream inputStream, Map metadata, long blobSize, boolean failIfAlreadyExists - ) throws IOException {}; + ) throws IOException { + throw new UnsupportedOperationException("writeBlobWithMetadata is not implemented yet"); + }; /** * Reads blob content from the input stream and writes it to the container in a new blob with the given name, @@ -210,13 +215,16 @@ default void writeBlobWithMetadata( * @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists * @throws IOException if the input stream could not be read, or the target blob could not be written to. */ + @ExperimentalApi default void writeBlobAtomicWithMetadata( String blobName, InputStream inputStream, Map metadata, long blobSize, boolean failIfAlreadyExists - ) throws IOException {}; + ) throws IOException { + throw new UnsupportedOperationException("writeBlobAtomicWithMetadata is not implemented yet"); + }; /** * Deletes this container and all its contents from the repository. diff --git a/server/src/main/java/org/opensearch/common/blobstore/BlobDownloadResponse.java b/server/src/main/java/org/opensearch/common/blobstore/BlobDownloadResponse.java index c03570c71164c..41e7e1a44d6fc 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/BlobDownloadResponse.java +++ b/server/src/main/java/org/opensearch/common/blobstore/BlobDownloadResponse.java @@ -12,7 +12,8 @@ import java.util.Map; /** - * A class for blob download response + * Represents the response from a blob download operation, containing both the + * input stream of the blob content and the associated metadata. * * @opensearch.internal */ @@ -21,12 +22,12 @@ public class BlobDownloadResponse { /** * Downloaded blob InputStream */ - private InputStream inputStream; + private final InputStream inputStream; /** * Metadata of the downloaded blob */ - private Map metadata; + private final Map metadata; public InputStream getInputStream() { return inputStream; diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java index a11532dc69f1a..5fe8b02058383 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java @@ -8,6 +8,7 @@ package org.opensearch.index.translog.transfer; +import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.blobstore.BlobDownloadResponse; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; @@ -133,6 +134,7 @@ void uploadBlobs( * @return {@link BlobDownloadResponse} of the remote file * @throws IOException the exception while reading the data */ + @ExperimentalApi BlobDownloadResponse downloadBlobWithMetadata(Iterable path, String fileName) throws IOException; void listAllInSortedOrder(Iterable path, String filenamePrefix, int limit, ActionListener> listener); From 643d70a58fd3a47f3d149fb51eeda4c8e05a6216 Mon Sep 17 00:00:00 2001 From: Sandeep Kumawat Date: Fri, 5 Apr 2024 02:21:02 +0530 Subject: [PATCH 15/20] use builder() in WriteContext Signed-off-by: Sandeep Kumawat --- .../s3/S3BlobContainerMockClientTests.java | 121 ++++++++++-------- .../s3/S3BlobContainerRetriesTests.java | 42 ++++-- .../blobstore/stream/write/WriteContext.java | 76 +++++++++++ .../transfer/RemoteTransferContainer.java | 21 ++- 4 files changed, 185 insertions(+), 75 deletions(-) diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java index 607f186f2db1e..e24a5db08e07e 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java @@ -49,6 +49,7 @@ import org.opensearch.repositories.s3.async.AsyncTransferManager; import org.opensearch.test.OpenSearchTestCase; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import java.io.IOException; @@ -466,24 +467,31 @@ private void testWriteBlobByStreams(boolean expectException, boolean throwExcept exceptionRef.set(ex); countDownLatch.countDown(); }); - blobContainer.asyncBlobUpload(new WriteContext("write_blob_by_streams_max_retries", new StreamContextSupplier() { - @Override - public StreamContext supplyStreamContext(long partSize) { - return new StreamContext(new CheckedTriFunction() { - @Override - public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException { - InputStream inputStream = new OffsetRangeIndexInputStream(new ByteArrayIndexInput("desc", bytes), size, position); - openInputStreams.add(inputStream); - return new InputStreamContainer(inputStream, size, position); - } - }, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize)); - } - }, bytes.length, false, WritePriority.NORMAL, uploadSuccess -> { - assertTrue(uploadSuccess); - if (throwExceptionOnFinalizeUpload) { - throw new RuntimeException(); - } - }, false, null, null), completionListener); + blobContainer.asyncBlobUpload( + new WriteContext.Builder().fileName("write_blob_by_streams_max_retries").streamContextSupplier(new StreamContextSupplier() { + @Override + public StreamContext supplyStreamContext(long partSize) { + return new StreamContext(new CheckedTriFunction() { + @Override + public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException { + InputStream inputStream = new OffsetRangeIndexInputStream( + new ByteArrayIndexInput("desc", bytes), + size, + position + ); + openInputStreams.add(inputStream); + return new InputStreamContainer(inputStream, size, position); + } + }, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize)); + } + }).fileSize(bytes.length).failIfAlreadyExists(false).writePriority(WritePriority.NORMAL).uploadFinalizer(uploadSuccess -> { + assertTrue(uploadSuccess); + if (throwExceptionOnFinalizeUpload) { + throw new RuntimeException(); + } + }).doRemoteDataIntegrityCheck(false).expectedChecksum(null).metadata(null).build(), + completionListener + ); assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS)); // wait for completableFuture to finish @@ -516,24 +524,27 @@ private void testWriteBlobByStreamsLargeBlob(boolean expectException, boolean th countDownLatch.countDown(); }); List openInputStreams = new ArrayList<>(); - blobContainer.asyncBlobUpload(new WriteContext("write_large_blob", new StreamContextSupplier() { - @Override - public StreamContext supplyStreamContext(long partSize) { - return new StreamContext(new CheckedTriFunction() { - @Override - public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException { - InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position); - openInputStreams.add(inputStream); - return new InputStreamContainer(inputStream, size, position); - } - }, partSize, calculateLastPartSize(blobSize, partSize), calculateNumberOfParts(blobSize, partSize)); - } - }, blobSize, false, WritePriority.NORMAL, uploadSuccess -> { - assertTrue(uploadSuccess); - if (throwExceptionOnFinalizeUpload) { - throw new RuntimeException(); - } - }, false, null, null), completionListener); + blobContainer.asyncBlobUpload( + new WriteContext.Builder().fileName("write_large_blob").streamContextSupplier(new StreamContextSupplier() { + @Override + public StreamContext supplyStreamContext(long partSize) { + return new StreamContext(new CheckedTriFunction() { + @Override + public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException { + InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position); + openInputStreams.add(inputStream); + return new InputStreamContainer(inputStream, size, position); + } + }, partSize, calculateLastPartSize(blobSize, partSize), calculateNumberOfParts(blobSize, partSize)); + } + }).fileSize(blobSize).failIfAlreadyExists(false).writePriority(WritePriority.NORMAL).uploadFinalizer(uploadSuccess -> { + assertTrue(uploadSuccess); + if (throwExceptionOnFinalizeUpload) { + throw new RuntimeException(); + } + }).doRemoteDataIntegrityCheck(false).expectedChecksum(null).metadata(null).build(), + completionListener + ); assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS)); if (expectException || throwExceptionOnFinalizeUpload) { @@ -632,20 +643,30 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException) t List openInputStreams = new ArrayList<>(); final S3BlobContainer s3BlobContainer = Mockito.spy(new S3BlobContainer(blobPath, blobStore)); - s3BlobContainer.asyncBlobUpload(new WriteContext("write_large_blob", new StreamContextSupplier() { - @Override - public StreamContext supplyStreamContext(long partSize) { - return new StreamContext(new CheckedTriFunction() { - @Override - public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException { - InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position); - openInputStreams.add(inputStream); - return new InputStreamContainer(inputStream, size, position); - } - }, partSize, calculateLastPartSize(blobSize, partSize), calculateNumberOfParts(blobSize, partSize)); - } - }, blobSize, false, WritePriority.HIGH, uploadSuccess -> { assertTrue(uploadSuccess); }, false, null, null), completionListener); - + s3BlobContainer.asyncBlobUpload( + new WriteContext.Builder().fileName("write_large_blob").streamContextSupplier(new StreamContextSupplier() { + @Override + public StreamContext supplyStreamContext(long partSize) { + return new StreamContext(new CheckedTriFunction() { + @Override + public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException { + InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position); + openInputStreams.add(inputStream); + return new InputStreamContainer(inputStream, size, position); + } + }, partSize, calculateLastPartSize(blobSize, partSize), calculateNumberOfParts(blobSize, partSize)); + } + }) + .fileSize(blobSize) + .failIfAlreadyExists(false) + .writePriority(WritePriority.HIGH) + .uploadFinalizer(Assert::assertTrue) + .doRemoteDataIntegrityCheck(false) + .expectedChecksum(null) + .metadata(null) + .build(), + completionListener + ); assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS)); if (expectException) { assertNotNull(exceptionRef.get()); diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java index 70f41e647648e..7f1d04b96641a 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -332,20 +332,34 @@ public void testWriteBlobByStreamsWithRetries() throws Exception { exceptionRef.set(ex); countDownLatch.countDown(); }); - blobContainer.asyncBlobUpload(new WriteContext("write_blob_by_streams_max_retries", new StreamContextSupplier() { - @Override - public StreamContext supplyStreamContext(long partSize) { - return new StreamContext(new CheckedTriFunction() { - @Override - public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException { - InputStream inputStream = new OffsetRangeIndexInputStream(new ByteArrayIndexInput("desc", bytes), size, position); - openInputStreams.add(inputStream); - return new InputStreamContainer(inputStream, size, position); - } - }, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize)); - } - }, bytes.length, false, WritePriority.NORMAL, Assert::assertTrue, false, null, null), completionListener); - + blobContainer.asyncBlobUpload( + new WriteContext.Builder().fileName("write_blob_by_streams_max_retries").streamContextSupplier(new StreamContextSupplier() { + @Override + public StreamContext supplyStreamContext(long partSize) { + return new StreamContext(new CheckedTriFunction() { + @Override + public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException { + InputStream inputStream = new OffsetRangeIndexInputStream( + new ByteArrayIndexInput("desc", bytes), + size, + position + ); + openInputStreams.add(inputStream); + return new InputStreamContainer(inputStream, size, position); + } + }, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize)); + } + }) + .fileSize(bytes.length) + .failIfAlreadyExists(false) + .writePriority(WritePriority.NORMAL) + .uploadFinalizer(Assert::assertTrue) + .doRemoteDataIntegrityCheck(false) + .expectedChecksum(null) + .metadata(null) + .build(), + completionListener + ); assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS)); assertThat(countDown.isCountedDown(), is(true)); diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java index c9812662d8701..72598b828a9be 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java @@ -143,4 +143,80 @@ public Long getExpectedChecksum() { public Map getMetadata() { return metadata; } + + /** + * Builder for {@link WriteContext}. + * + * @opensearch.internal + */ + public static class Builder { + private String fileName; + private StreamContextSupplier streamContextSupplier; + private long fileSize; + private boolean failIfAlreadyExists; + private WritePriority writePriority; + private CheckedConsumer uploadFinalizer; + private boolean doRemoteDataIntegrityCheck; + private Long expectedChecksum; + private Map metadata; + + public Builder fileName(String fileName) { + this.fileName = fileName; + return this; + } + + public Builder streamContextSupplier(StreamContextSupplier streamContextSupplier) { + this.streamContextSupplier = streamContextSupplier; + return this; + } + + public Builder fileSize(long fileSize) { + this.fileSize = fileSize; + return this; + } + + public Builder writePriority(WritePriority writePriority) { + this.writePriority = writePriority; + return this; + } + + public Builder failIfAlreadyExists(boolean failIfAlreadyExists) { + this.failIfAlreadyExists = failIfAlreadyExists; + return this; + } + + public Builder uploadFinalizer(CheckedConsumer uploadFinalizer) { + this.uploadFinalizer = uploadFinalizer; + return this; + } + + public Builder doRemoteDataIntegrityCheck(boolean doRemoteDataIntegrityCheck) { + this.doRemoteDataIntegrityCheck = doRemoteDataIntegrityCheck; + return this; + } + + public Builder expectedChecksum(Long expectedChecksum) { + this.expectedChecksum = expectedChecksum; + return this; + } + + public Builder metadata(Map metadata) { + this.metadata = metadata; + return this; + } + + public WriteContext build() { + return new WriteContext( + fileName, + streamContextSupplier, + fileSize, + failIfAlreadyExists, + writePriority, + uploadFinalizer, + doRemoteDataIntegrityCheck, + expectedChecksum, + metadata + ); + } + } } diff --git a/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java b/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java index 8a0f7cdb4a474..98a5d3ff2fa3a 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java @@ -131,17 +131,16 @@ public RemoteTransferContainer( * @return The {@link WriteContext} for the current upload */ public WriteContext createWriteContext() { - return new WriteContext( - remoteFileName, - this::supplyStreamContext, - contentLength, - failTransferIfFileExists, - writePriority, - this::finalizeUpload, - isRemoteDataIntegrityCheckPossible(), - isRemoteDataIntegrityCheckPossible() ? expectedChecksum : null, - metadata - ); + return new WriteContext.Builder().fileName(remoteFileName) + .streamContextSupplier(this::supplyStreamContext) + .fileSize(contentLength) + .failIfAlreadyExists(failTransferIfFileExists) + .writePriority(writePriority) + .uploadFinalizer(this::finalizeUpload) + .doRemoteDataIntegrityCheck(isRemoteDataIntegrityCheckPossible()) + .expectedChecksum(isRemoteDataIntegrityCheckPossible() ? expectedChecksum : null) + .metadata(metadata) + .build(); } // package-private for testing From 002b757fbc2f0fb07478393de978a402d2fef8e5 Mon Sep 17 00:00:00 2001 From: Sandeep Kumawat Date: Fri, 5 Apr 2024 09:06:55 +0530 Subject: [PATCH 16/20] add Builder support for readContext Signed-off-by: Sandeep Kumawat --- .../repositories/s3/S3BlobContainer.java | 8 +++- .../mocks/MockFsAsyncBlobContainer.java | 6 ++- .../blobstore/stream/read/ReadContext.java | 37 +++++++++++++++++++ ...ultiStreamEncryptedBlobContainerTests.java | 12 +++++- .../listener/ReadContextListenerTests.java | 24 ++++++++++-- 5 files changed, 79 insertions(+), 8 deletions(-) diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java index 56589a981003a..85eba6dcadd6a 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java @@ -285,7 +285,13 @@ public void readBlobAsync(String blobName, ActionListener listener) ); } } - listener.onResponse(new ReadContext(blobSize, blobPartInputStreamFutures, blobChecksum, null)); + listener.onResponse( + new ReadContext.Builder().blobSize(blobSize) + .asyncPartStreams(blobPartInputStreamFutures) + .blobChecksum(blobChecksum) + .metadata(null) + .build() + ); } catch (Exception ex) { listener.onFailure(ex); } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java index fd089391038f7..80aeef59ded6a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java @@ -131,7 +131,11 @@ public void readBlobAsync(String blobName, ActionListener listener) InputStreamContainer blobPartStream = new InputStreamContainer(readBlob(blobName, offset, partSize), partSize, offset); blobPartStreams.add(() -> CompletableFuture.completedFuture(blobPartStream)); } - ReadContext blobReadContext = new ReadContext(contentLength, blobPartStreams, null, null); + ReadContext blobReadContext = new ReadContext.Builder().blobSize(contentLength) + .asyncPartStreams(blobPartStreams) + .blobChecksum(null) + .metadata(null) + .build(); listener.onResponse(blobReadContext); } catch (Exception e) { listener.onFailure(e); diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java b/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java index 6877348f2fc15..86ee9cd9c3745 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java @@ -80,4 +80,41 @@ public interface StreamPartCreator extends Supplier get(); } + + /** + * Builder for {@link ReadContext}. + * + * @opensearch.experimental + */ + public static class Builder { + private long blobSize; + private List asyncPartStreams; + private String blobChecksum; + private Map metadata; + + public Builder blobSize(long blobSize) { + this.blobSize = blobSize; + return this; + } + + public Builder asyncPartStreams(List asyncPartStreams) { + this.asyncPartStreams = asyncPartStreams; + return this; + } + + public Builder blobChecksum(String blobChecksum) { + this.blobChecksum = blobChecksum; + return this; + } + + public Builder metadata(Map metadata) { + this.metadata = metadata; + return this; + } + + public ReadContext build() { + return new ReadContext(blobSize, asyncPartStreams, blobChecksum, metadata); + } + + } } diff --git a/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java b/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java index 6b2fd05ed4bfe..4c130bddd8b4a 100644 --- a/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java +++ b/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java @@ -57,7 +57,11 @@ public void testReadBlobAsync() throws Exception { final ListenerTestUtils.CountingCompletionListener completionListener = new ListenerTestUtils.CountingCompletionListener<>(); final CompletableFuture streamContainerFuture = CompletableFuture.completedFuture(inputStreamContainer); - final ReadContext readContext = new ReadContext(size, List.of(() -> streamContainerFuture), null, null); + final ReadContext readContext = new ReadContext.Builder().blobSize(size) + .asyncPartStreams(List.of(() -> streamContainerFuture)) + .blobChecksum(null) + .metadata(null) + .build(); Mockito.doAnswer(invocation -> { ActionListener readContextActionListener = invocation.getArgument(1); @@ -103,7 +107,11 @@ public void testReadBlobAsyncException() throws Exception { final ListenerTestUtils.CountingCompletionListener completionListener = new ListenerTestUtils.CountingCompletionListener<>(); final CompletableFuture streamContainerFuture = CompletableFuture.completedFuture(inputStreamContainer); - final ReadContext readContext = new ReadContext(size, List.of(() -> streamContainerFuture), null, null); + final ReadContext readContext = new ReadContext.Builder().blobSize(size) + .asyncPartStreams(List.of(() -> streamContainerFuture)) + .blobChecksum(null) + .metadata(null) + .build(); Mockito.doAnswer(invocation -> { ActionListener readContextActionListener = invocation.getArgument(1); diff --git a/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java b/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java index b9cf08fa4acb1..3383541914c89 100644 --- a/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java +++ b/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java @@ -78,7 +78,11 @@ public void testReadContextListener() throws InterruptedException, IOException { UnaryOperator.identity(), MAX_CONCURRENT_STREAMS ); - ReadContext readContext = new ReadContext((long) PART_SIZE * NUMBER_OF_PARTS, blobPartStreams, null, null); + ReadContext readContext = new ReadContext.Builder().blobSize((long) PART_SIZE * NUMBER_OF_PARTS) + .asyncPartStreams(blobPartStreams) + .blobChecksum(null) + .metadata(null) + .build(); readContextListener.onResponse(readContext); countDownLatch.await(); @@ -125,7 +129,11 @@ public int available() { threadPool.generic() ) ); - ReadContext readContext = new ReadContext((long) (PART_SIZE + 1) * NUMBER_OF_PARTS, blobPartStreams, null, null); + ReadContext readContext = new ReadContext.Builder().blobSize((long) (PART_SIZE + 1) * NUMBER_OF_PARTS) + .asyncPartStreams(blobPartStreams) + .blobChecksum(null) + .metadata(null) + .build(); readContextListener.onResponse(readContext); countDownLatch.await(); @@ -178,7 +186,11 @@ public int read(byte[] b) throws IOException { threadPool.generic() ) ); - ReadContext readContext = new ReadContext((long) (PART_SIZE + 1) * NUMBER_OF_PARTS + 1, blobPartStreams, null, null); + ReadContext readContext = new ReadContext.Builder().blobSize((long) (PART_SIZE + 1) * NUMBER_OF_PARTS + 1) + .asyncPartStreams(blobPartStreams) + .blobChecksum(null) + .metadata(null) + .build(); readContextListener.onResponse(readContext); countDownLatch.await(); @@ -203,7 +215,11 @@ public void testWriteToTempFile_alreadyExists_replacesFile() throws Exception { UnaryOperator.identity(), MAX_CONCURRENT_STREAMS ); - ReadContext readContext = new ReadContext((long) (PART_SIZE + 1) * NUMBER_OF_PARTS, blobPartStreams, null, null); + ReadContext readContext = new ReadContext.Builder().blobSize((long) (PART_SIZE + 1) * NUMBER_OF_PARTS) + .asyncPartStreams(blobPartStreams) + .blobChecksum(null) + .metadata(null) + .build(); readContextListener.onResponse(readContext); countDownLatch.await(); From 8e15d6b31da7463ed4c2ef84bcdf3a689d643246 Mon Sep 17 00:00:00 2001 From: Sandeep Kumawat Date: Fri, 5 Apr 2024 12:01:50 +0530 Subject: [PATCH 17/20] minor code refector Signed-off-by: Sandeep Kumawat --- .../repositories/s3/S3BlobContainer.java | 1 - .../s3/S3BlobContainerMockClientTests.java | 137 +++++++++--------- .../s3/S3BlobContainerRetriesTests.java | 47 +++--- .../mocks/MockFsAsyncBlobContainer.java | 6 +- .../blobstore/BlobDownloadResponse.java | 2 +- .../transfer/BlobStoreTransferService.java | 2 + ...ultiStreamEncryptedBlobContainerTests.java | 4 - .../listener/ReadContextListenerTests.java | 8 - 8 files changed, 87 insertions(+), 120 deletions(-) diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java index 85eba6dcadd6a..1035a752bce0a 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java @@ -289,7 +289,6 @@ public void readBlobAsync(String blobName, ActionListener listener) new ReadContext.Builder().blobSize(blobSize) .asyncPartStreams(blobPartInputStreamFutures) .blobChecksum(blobChecksum) - .metadata(null) .build() ); } catch (Exception ex) { diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java index e24a5db08e07e..9e830c409a58b 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java @@ -30,7 +30,7 @@ import org.apache.lucene.store.IndexInput; import org.opensearch.cluster.metadata.RepositoryMetadata; -import org.opensearch.common.CheckedTriFunction; +import org.opensearch.common.CheckedConsumer; import org.opensearch.common.StreamContext; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.stream.write.StreamContextSupplier; @@ -467,31 +467,30 @@ private void testWriteBlobByStreams(boolean expectException, boolean throwExcept exceptionRef.set(ex); countDownLatch.countDown(); }); - blobContainer.asyncBlobUpload( - new WriteContext.Builder().fileName("write_blob_by_streams_max_retries").streamContextSupplier(new StreamContextSupplier() { - @Override - public StreamContext supplyStreamContext(long partSize) { - return new StreamContext(new CheckedTriFunction() { - @Override - public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException { - InputStream inputStream = new OffsetRangeIndexInputStream( - new ByteArrayIndexInput("desc", bytes), - size, - position - ); - openInputStreams.add(inputStream); - return new InputStreamContainer(inputStream, size, position); - } - }, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize)); - } - }).fileSize(bytes.length).failIfAlreadyExists(false).writePriority(WritePriority.NORMAL).uploadFinalizer(uploadSuccess -> { - assertTrue(uploadSuccess); - if (throwExceptionOnFinalizeUpload) { - throw new RuntimeException(); - } - }).doRemoteDataIntegrityCheck(false).expectedChecksum(null).metadata(null).build(), - completionListener - ); + + StreamContextSupplier streamContextSupplier = partSize -> new StreamContext((partNo, size, position) -> { + InputStream inputStream = new OffsetRangeIndexInputStream(new ByteArrayIndexInput("desc", bytes), size, position); + openInputStreams.add(inputStream); + return new InputStreamContainer(inputStream, size, position); + }, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize)); + + CheckedConsumer uploadFinalizer = uploadSuccess -> { + assertTrue(uploadSuccess); + if (throwExceptionOnFinalizeUpload) { + throw new RuntimeException(); + } + }; + + WriteContext writeContext = new WriteContext.Builder().fileName("write_blob_by_streams_max_retries") + .streamContextSupplier(streamContextSupplier) + .fileSize(bytes.length) + .failIfAlreadyExists(false) + .writePriority(WritePriority.NORMAL) + .uploadFinalizer(uploadFinalizer) + .doRemoteDataIntegrityCheck(false) + .build(); + + blobContainer.asyncBlobUpload(writeContext, completionListener); assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS)); // wait for completableFuture to finish @@ -524,27 +523,30 @@ private void testWriteBlobByStreamsLargeBlob(boolean expectException, boolean th countDownLatch.countDown(); }); List openInputStreams = new ArrayList<>(); - blobContainer.asyncBlobUpload( - new WriteContext.Builder().fileName("write_large_blob").streamContextSupplier(new StreamContextSupplier() { - @Override - public StreamContext supplyStreamContext(long partSize) { - return new StreamContext(new CheckedTriFunction() { - @Override - public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException { - InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position); - openInputStreams.add(inputStream); - return new InputStreamContainer(inputStream, size, position); - } - }, partSize, calculateLastPartSize(blobSize, partSize), calculateNumberOfParts(blobSize, partSize)); - } - }).fileSize(blobSize).failIfAlreadyExists(false).writePriority(WritePriority.NORMAL).uploadFinalizer(uploadSuccess -> { - assertTrue(uploadSuccess); - if (throwExceptionOnFinalizeUpload) { - throw new RuntimeException(); - } - }).doRemoteDataIntegrityCheck(false).expectedChecksum(null).metadata(null).build(), - completionListener - ); + + StreamContextSupplier streamContextSupplier = partSize1 -> new StreamContext((partNo, size, position) -> { + InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position); + openInputStreams.add(inputStream); + return new InputStreamContainer(inputStream, size, position); + }, partSize1, calculateLastPartSize(blobSize, partSize1), calculateNumberOfParts(blobSize, partSize1)); + + CheckedConsumer uploadFinalizer = uploadSuccess -> { + assertTrue(uploadSuccess); + if (throwExceptionOnFinalizeUpload) { + throw new RuntimeException(); + } + }; + + WriteContext writeContext = new WriteContext.Builder().fileName("write_large_blob") + .streamContextSupplier(streamContextSupplier) + .fileSize(blobSize) + .failIfAlreadyExists(false) + .writePriority(WritePriority.NORMAL) + .uploadFinalizer(uploadFinalizer) + .doRemoteDataIntegrityCheck(false) + .build(); + + blobContainer.asyncBlobUpload(writeContext, completionListener); assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS)); if (expectException || throwExceptionOnFinalizeUpload) { @@ -643,30 +645,23 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException) t List openInputStreams = new ArrayList<>(); final S3BlobContainer s3BlobContainer = Mockito.spy(new S3BlobContainer(blobPath, blobStore)); - s3BlobContainer.asyncBlobUpload( - new WriteContext.Builder().fileName("write_large_blob").streamContextSupplier(new StreamContextSupplier() { - @Override - public StreamContext supplyStreamContext(long partSize) { - return new StreamContext(new CheckedTriFunction() { - @Override - public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException { - InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position); - openInputStreams.add(inputStream); - return new InputStreamContainer(inputStream, size, position); - } - }, partSize, calculateLastPartSize(blobSize, partSize), calculateNumberOfParts(blobSize, partSize)); - } - }) - .fileSize(blobSize) - .failIfAlreadyExists(false) - .writePriority(WritePriority.HIGH) - .uploadFinalizer(Assert::assertTrue) - .doRemoteDataIntegrityCheck(false) - .expectedChecksum(null) - .metadata(null) - .build(), - completionListener - ); + + StreamContextSupplier streamContextSupplier = partSize1 -> new StreamContext((partNo, size, position) -> { + InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position); + openInputStreams.add(inputStream); + return new InputStreamContainer(inputStream, size, position); + }, partSize1, calculateLastPartSize(blobSize, partSize1), calculateNumberOfParts(blobSize, partSize1)); + + WriteContext writeContext = new WriteContext.Builder().fileName("write_large_blob") + .streamContextSupplier(streamContextSupplier) + .fileSize(blobSize) + .failIfAlreadyExists(false) + .writePriority(WritePriority.HIGH) + .uploadFinalizer(Assert::assertTrue) + .doRemoteDataIntegrityCheck(false) + .build(); + + s3BlobContainer.asyncBlobUpload(writeContext, completionListener); assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS)); if (expectException) { assertNotNull(exceptionRef.get()); diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java index 7f1d04b96641a..8e25ba4d950ef 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -37,7 +37,6 @@ import org.apache.http.HttpStatus; import org.opensearch.cluster.metadata.RepositoryMetadata; -import org.opensearch.common.CheckedTriFunction; import org.opensearch.common.Nullable; import org.opensearch.common.StreamContext; import org.opensearch.common.SuppressForbidden; @@ -332,36 +331,24 @@ public void testWriteBlobByStreamsWithRetries() throws Exception { exceptionRef.set(ex); countDownLatch.countDown(); }); - blobContainer.asyncBlobUpload( - new WriteContext.Builder().fileName("write_blob_by_streams_max_retries").streamContextSupplier(new StreamContextSupplier() { - @Override - public StreamContext supplyStreamContext(long partSize) { - return new StreamContext(new CheckedTriFunction() { - @Override - public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException { - InputStream inputStream = new OffsetRangeIndexInputStream( - new ByteArrayIndexInput("desc", bytes), - size, - position - ); - openInputStreams.add(inputStream); - return new InputStreamContainer(inputStream, size, position); - } - }, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize)); - } - }) - .fileSize(bytes.length) - .failIfAlreadyExists(false) - .writePriority(WritePriority.NORMAL) - .uploadFinalizer(Assert::assertTrue) - .doRemoteDataIntegrityCheck(false) - .expectedChecksum(null) - .metadata(null) - .build(), - completionListener - ); - assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS)); + StreamContextSupplier streamContextSupplier = partSize -> new StreamContext((partNo, size, position) -> { + InputStream inputStream = new OffsetRangeIndexInputStream(new ByteArrayIndexInput("desc", bytes), size, position); + openInputStreams.add(inputStream); + return new InputStreamContainer(inputStream, size, position); + }, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize)); + + WriteContext writeContext = new WriteContext.Builder().fileName("write_blob_by_streams_max_retries") + .streamContextSupplier(streamContextSupplier) + .fileSize(bytes.length) + .failIfAlreadyExists(false) + .writePriority(WritePriority.NORMAL) + .uploadFinalizer(Assert::assertTrue) + .doRemoteDataIntegrityCheck(false) + .build(); + + blobContainer.asyncBlobUpload(writeContext, completionListener); + assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS)); assertThat(countDown.isCountedDown(), is(true)); openInputStreams.forEach(inputStream -> { diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java index 80aeef59ded6a..bddec2db0f128 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java @@ -131,11 +131,7 @@ public void readBlobAsync(String blobName, ActionListener listener) InputStreamContainer blobPartStream = new InputStreamContainer(readBlob(blobName, offset, partSize), partSize, offset); blobPartStreams.add(() -> CompletableFuture.completedFuture(blobPartStream)); } - ReadContext blobReadContext = new ReadContext.Builder().blobSize(contentLength) - .asyncPartStreams(blobPartStreams) - .blobChecksum(null) - .metadata(null) - .build(); + ReadContext blobReadContext = new ReadContext.Builder().blobSize(contentLength).asyncPartStreams(blobPartStreams).build(); listener.onResponse(blobReadContext); } catch (Exception e) { listener.onFailure(e); diff --git a/server/src/main/java/org/opensearch/common/blobstore/BlobDownloadResponse.java b/server/src/main/java/org/opensearch/common/blobstore/BlobDownloadResponse.java index 41e7e1a44d6fc..97f3e4a16a76c 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/BlobDownloadResponse.java +++ b/server/src/main/java/org/opensearch/common/blobstore/BlobDownloadResponse.java @@ -15,7 +15,7 @@ * Represents the response from a blob download operation, containing both the * input stream of the blob content and the associated metadata. * - * @opensearch.internal + * @opensearch.experimental */ public class BlobDownloadResponse { diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java index 5ab0b9d5fa870..a1d5041ff9aff 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -12,6 +12,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.action.ActionRunnable; +import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobDownloadResponse; @@ -166,6 +167,7 @@ public InputStream downloadBlob(Iterable path, String fileName) throws I } @Override + @ExperimentalApi public BlobDownloadResponse downloadBlobWithMetadata(Iterable path, String fileName) throws IOException { return blobStore.blobContainer((BlobPath) path).readBlobWithMetadata(fileName); } diff --git a/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java b/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java index 4c130bddd8b4a..fa05c7b8c569a 100644 --- a/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java +++ b/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java @@ -59,8 +59,6 @@ public void testReadBlobAsync() throws Exception { final CompletableFuture streamContainerFuture = CompletableFuture.completedFuture(inputStreamContainer); final ReadContext readContext = new ReadContext.Builder().blobSize(size) .asyncPartStreams(List.of(() -> streamContainerFuture)) - .blobChecksum(null) - .metadata(null) .build(); Mockito.doAnswer(invocation -> { @@ -109,8 +107,6 @@ public void testReadBlobAsyncException() throws Exception { final CompletableFuture streamContainerFuture = CompletableFuture.completedFuture(inputStreamContainer); final ReadContext readContext = new ReadContext.Builder().blobSize(size) .asyncPartStreams(List.of(() -> streamContainerFuture)) - .blobChecksum(null) - .metadata(null) .build(); Mockito.doAnswer(invocation -> { diff --git a/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java b/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java index 3383541914c89..60a816fcd7278 100644 --- a/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java +++ b/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java @@ -80,8 +80,6 @@ public void testReadContextListener() throws InterruptedException, IOException { ); ReadContext readContext = new ReadContext.Builder().blobSize((long) PART_SIZE * NUMBER_OF_PARTS) .asyncPartStreams(blobPartStreams) - .blobChecksum(null) - .metadata(null) .build(); readContextListener.onResponse(readContext); @@ -131,8 +129,6 @@ public int available() { ); ReadContext readContext = new ReadContext.Builder().blobSize((long) (PART_SIZE + 1) * NUMBER_OF_PARTS) .asyncPartStreams(blobPartStreams) - .blobChecksum(null) - .metadata(null) .build(); readContextListener.onResponse(readContext); @@ -188,8 +184,6 @@ public int read(byte[] b) throws IOException { ); ReadContext readContext = new ReadContext.Builder().blobSize((long) (PART_SIZE + 1) * NUMBER_OF_PARTS + 1) .asyncPartStreams(blobPartStreams) - .blobChecksum(null) - .metadata(null) .build(); readContextListener.onResponse(readContext); @@ -217,8 +211,6 @@ public void testWriteToTempFile_alreadyExists_replacesFile() throws Exception { ); ReadContext readContext = new ReadContext.Builder().blobSize((long) (PART_SIZE + 1) * NUMBER_OF_PARTS) .asyncPartStreams(blobPartStreams) - .blobChecksum(null) - .metadata(null) .build(); readContextListener.onResponse(readContext); From c17b19d1d0cced49893a44e0c086c60e50f60804 Mon Sep 17 00:00:00 2001 From: Sandeep Kumawat Date: Fri, 5 Apr 2024 20:34:18 +0530 Subject: [PATCH 18/20] resolve comments Signed-off-by: Sandeep Kumawat --- .../opensearch/common/blobstore/stream/read/ReadContext.java | 2 +- .../opensearch/common/blobstore/stream/write/WriteContext.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java b/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java index 86ee9cd9c3745..8708f959c42f2 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java @@ -28,7 +28,7 @@ public class ReadContext { private final String blobChecksum; private final Map metadata; - public ReadContext(long blobSize, List asyncPartStreams, String blobChecksum, Map metadata) { + private ReadContext(long blobSize, List asyncPartStreams, String blobChecksum, Map metadata) { this.blobSize = blobSize; this.asyncPartStreams = asyncPartStreams; this.blobChecksum = blobChecksum; diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java index 72598b828a9be..d14800e82e495 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java @@ -43,7 +43,7 @@ public class WriteContext { * @param doRemoteDataIntegrityCheck A boolean to inform vendor plugins whether remote data integrity checks need to be done * @param expectedChecksum This parameter expected only when the vendor plugin is expected to do server side data integrity verification */ - public WriteContext( + private WriteContext( String fileName, StreamContextSupplier streamContextSupplier, long fileSize, From e39ef1689d86b7f75f0f3346f3585efb44411bd4 Mon Sep 17 00:00:00 2001 From: Sandeep Kumawat Date: Fri, 5 Apr 2024 22:34:53 +0530 Subject: [PATCH 19/20] addres comments Signed-off-by: Sandeep Kumawat --- .../repositories/s3/S3BlobContainer.java | 7 +------ .../mocks/MockFsAsyncBlobContainer.java | 2 +- .../common/blobstore/BlobContainer.java | 2 +- .../blobstore/stream/read/ReadContext.java | 13 ++++--------- .../transfer/RemoteTransferContainer.java | 3 ++- ...ncMultiStreamEncryptedBlobContainerTests.java | 8 ++------ .../read/listener/ReadContextListenerTests.java | 16 ++++------------ 7 files changed, 15 insertions(+), 36 deletions(-) diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java index 1035a752bce0a..1f23a09a047f2 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java @@ -285,12 +285,7 @@ public void readBlobAsync(String blobName, ActionListener listener) ); } } - listener.onResponse( - new ReadContext.Builder().blobSize(blobSize) - .asyncPartStreams(blobPartInputStreamFutures) - .blobChecksum(blobChecksum) - .build() - ); + listener.onResponse(new ReadContext.Builder(blobSize, blobPartInputStreamFutures).blobChecksum(blobChecksum).build()); } catch (Exception ex) { listener.onFailure(ex); } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java index bddec2db0f128..d45b4e3deb798 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java @@ -131,7 +131,7 @@ public void readBlobAsync(String blobName, ActionListener listener) InputStreamContainer blobPartStream = new InputStreamContainer(readBlob(blobName, offset, partSize), partSize, offset); blobPartStreams.add(() -> CompletableFuture.completedFuture(blobPartStream)); } - ReadContext blobReadContext = new ReadContext.Builder().blobSize(contentLength).asyncPartStreams(blobPartStreams).build(); + ReadContext blobReadContext = new ReadContext.Builder(contentLength, blobPartStreams).build(); listener.onResponse(blobReadContext); } catch (Exception e) { listener.onFailure(e); diff --git a/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java b/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java index 244fb222b06ed..e16e75de7f27d 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java @@ -83,7 +83,7 @@ public interface BlobContainer { * * @param blobName * The name of the blob to get an {@link InputStream} for. - * @return The {@code InputStream} to read the blob. + * @return The {@link BlobDownloadResponse} of the blob. * @throws NoSuchFileException if the blob does not exist * @throws IOException if the blob can not be read. */ diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java b/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java index 8708f959c42f2..36ee46c0fc2c8 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java @@ -72,7 +72,7 @@ public List getPartStreams() { @ExperimentalApi public interface StreamPartCreator extends Supplier> { /** - * Kicks off a async process to start streaming. + * Kicks off an async process to start streaming. * * @return When the returned future is completed, streaming has * just begun. Clients must fully consume the resulting stream. @@ -87,19 +87,14 @@ public interface StreamPartCreator extends Supplier asyncPartStreams; + private final long blobSize; + private final List asyncPartStreams; private String blobChecksum; private Map metadata; - public Builder blobSize(long blobSize) { + public Builder(long blobSize, List asyncPartStreams) { this.blobSize = blobSize; - return this; - } - - public Builder asyncPartStreams(List asyncPartStreams) { this.asyncPartStreams = asyncPartStreams; - return this; } public Builder blobChecksum(String blobChecksum) { diff --git a/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java b/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java index 98a5d3ff2fa3a..61ac72f5ae8f6 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java @@ -56,7 +56,7 @@ public class RemoteTransferContainer implements Closeable { private final OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier; private final boolean isRemoteDataIntegritySupported; private final AtomicBoolean readBlock = new AtomicBoolean(); - private Map metadata = null; + private final Map metadata; private static final Logger log = LogManager.getLogger(RemoteTransferContainer.class); @@ -90,6 +90,7 @@ public RemoteTransferContainer( this.offsetRangeInputStreamSupplier = offsetRangeInputStreamSupplier; this.expectedChecksum = expectedChecksum; this.isRemoteDataIntegritySupported = isRemoteDataIntegritySupported; + this.metadata = null; } /** diff --git a/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java b/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java index fa05c7b8c569a..aee4ae40d16a5 100644 --- a/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java +++ b/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java @@ -57,9 +57,7 @@ public void testReadBlobAsync() throws Exception { final ListenerTestUtils.CountingCompletionListener completionListener = new ListenerTestUtils.CountingCompletionListener<>(); final CompletableFuture streamContainerFuture = CompletableFuture.completedFuture(inputStreamContainer); - final ReadContext readContext = new ReadContext.Builder().blobSize(size) - .asyncPartStreams(List.of(() -> streamContainerFuture)) - .build(); + final ReadContext readContext = new ReadContext.Builder(size, List.of(() -> streamContainerFuture)).build(); Mockito.doAnswer(invocation -> { ActionListener readContextActionListener = invocation.getArgument(1); @@ -105,9 +103,7 @@ public void testReadBlobAsyncException() throws Exception { final ListenerTestUtils.CountingCompletionListener completionListener = new ListenerTestUtils.CountingCompletionListener<>(); final CompletableFuture streamContainerFuture = CompletableFuture.completedFuture(inputStreamContainer); - final ReadContext readContext = new ReadContext.Builder().blobSize(size) - .asyncPartStreams(List.of(() -> streamContainerFuture)) - .build(); + final ReadContext readContext = new ReadContext.Builder(size, List.of(() -> streamContainerFuture)).build(); Mockito.doAnswer(invocation -> { ActionListener readContextActionListener = invocation.getArgument(1); diff --git a/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java b/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java index 60a816fcd7278..c47874f3ba294 100644 --- a/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java +++ b/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java @@ -78,9 +78,7 @@ public void testReadContextListener() throws InterruptedException, IOException { UnaryOperator.identity(), MAX_CONCURRENT_STREAMS ); - ReadContext readContext = new ReadContext.Builder().blobSize((long) PART_SIZE * NUMBER_OF_PARTS) - .asyncPartStreams(blobPartStreams) - .build(); + ReadContext readContext = new ReadContext.Builder((long) PART_SIZE * NUMBER_OF_PARTS, blobPartStreams).build(); readContextListener.onResponse(readContext); countDownLatch.await(); @@ -127,9 +125,7 @@ public int available() { threadPool.generic() ) ); - ReadContext readContext = new ReadContext.Builder().blobSize((long) (PART_SIZE + 1) * NUMBER_OF_PARTS) - .asyncPartStreams(blobPartStreams) - .build(); + ReadContext readContext = new ReadContext.Builder((long) (PART_SIZE + 1) * NUMBER_OF_PARTS, blobPartStreams).build(); readContextListener.onResponse(readContext); countDownLatch.await(); @@ -182,9 +178,7 @@ public int read(byte[] b) throws IOException { threadPool.generic() ) ); - ReadContext readContext = new ReadContext.Builder().blobSize((long) (PART_SIZE + 1) * NUMBER_OF_PARTS + 1) - .asyncPartStreams(blobPartStreams) - .build(); + ReadContext readContext = new ReadContext.Builder((long) (PART_SIZE + 1) * NUMBER_OF_PARTS + 1, blobPartStreams).build(); readContextListener.onResponse(readContext); countDownLatch.await(); @@ -209,9 +203,7 @@ public void testWriteToTempFile_alreadyExists_replacesFile() throws Exception { UnaryOperator.identity(), MAX_CONCURRENT_STREAMS ); - ReadContext readContext = new ReadContext.Builder().blobSize((long) (PART_SIZE + 1) * NUMBER_OF_PARTS) - .asyncPartStreams(blobPartStreams) - .build(); + ReadContext readContext = new ReadContext.Builder((long) (PART_SIZE + 1) * NUMBER_OF_PARTS, blobPartStreams).build(); readContextListener.onResponse(readContext); countDownLatch.await(); From 75fa23961ce3cd7d2456c3312144d08c357d2386 Mon Sep 17 00:00:00 2001 From: Sandeep Kumawat Date: Mon, 8 Apr 2024 10:03:06 +0530 Subject: [PATCH 20/20] resolve comments Signed-off-by: Sandeep Kumawat --- .../transfer/RemoteTransferContainer.java | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java b/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java index 61ac72f5ae8f6..cd2ef22327ebb 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java @@ -82,15 +82,17 @@ public RemoteTransferContainer( long expectedChecksum, boolean isRemoteDataIntegritySupported ) { - this.fileName = fileName; - this.remoteFileName = remoteFileName; - this.contentLength = contentLength; - this.failTransferIfFileExists = failTransferIfFileExists; - this.writePriority = writePriority; - this.offsetRangeInputStreamSupplier = offsetRangeInputStreamSupplier; - this.expectedChecksum = expectedChecksum; - this.isRemoteDataIntegritySupported = isRemoteDataIntegritySupported; - this.metadata = null; + this( + fileName, + remoteFileName, + contentLength, + failTransferIfFileExists, + writePriority, + offsetRangeInputStreamSupplier, + expectedChecksum, + isRemoteDataIntegritySupported, + null + ); } /**