From d2ed0a34efdb5d3c63585ed8c21bc812e3dc0d02 Mon Sep 17 00:00:00 2001 From: Sandeep Kumawat Date: Fri, 5 Apr 2024 22:34:53 +0530 Subject: [PATCH] addres comments Signed-off-by: Sandeep Kumawat --- .../repositories/s3/S3BlobContainer.java | 7 +------ .../mocks/MockFsAsyncBlobContainer.java | 2 +- .../common/blobstore/BlobContainer.java | 2 +- .../blobstore/stream/read/ReadContext.java | 13 ++++--------- .../transfer/RemoteTransferContainer.java | 3 ++- ...ncMultiStreamEncryptedBlobContainerTests.java | 8 ++------ .../read/listener/ReadContextListenerTests.java | 16 ++++------------ 7 files changed, 15 insertions(+), 36 deletions(-) diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java index 1035a752bce0a..1f23a09a047f2 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java @@ -285,12 +285,7 @@ public void readBlobAsync(String blobName, ActionListener listener) ); } } - listener.onResponse( - new ReadContext.Builder().blobSize(blobSize) - .asyncPartStreams(blobPartInputStreamFutures) - .blobChecksum(blobChecksum) - .build() - ); + listener.onResponse(new ReadContext.Builder(blobSize, blobPartInputStreamFutures).blobChecksum(blobChecksum).build()); } catch (Exception ex) { listener.onFailure(ex); } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java index bddec2db0f128..d45b4e3deb798 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java @@ -131,7 +131,7 @@ public void readBlobAsync(String blobName, ActionListener listener) InputStreamContainer blobPartStream = new InputStreamContainer(readBlob(blobName, offset, partSize), partSize, offset); blobPartStreams.add(() -> CompletableFuture.completedFuture(blobPartStream)); } - ReadContext blobReadContext = new ReadContext.Builder().blobSize(contentLength).asyncPartStreams(blobPartStreams).build(); + ReadContext blobReadContext = new ReadContext.Builder(contentLength, blobPartStreams).build(); listener.onResponse(blobReadContext); } catch (Exception e) { listener.onFailure(e); diff --git a/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java b/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java index 244fb222b06ed..e16e75de7f27d 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java @@ -83,7 +83,7 @@ public interface BlobContainer { * * @param blobName * The name of the blob to get an {@link InputStream} for. - * @return The {@code InputStream} to read the blob. + * @return The {@link BlobDownloadResponse} of the blob. * @throws NoSuchFileException if the blob does not exist * @throws IOException if the blob can not be read. */ diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java b/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java index 8708f959c42f2..36ee46c0fc2c8 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java @@ -72,7 +72,7 @@ public List getPartStreams() { @ExperimentalApi public interface StreamPartCreator extends Supplier> { /** - * Kicks off a async process to start streaming. + * Kicks off an async process to start streaming. * * @return When the returned future is completed, streaming has * just begun. Clients must fully consume the resulting stream. @@ -87,19 +87,14 @@ public interface StreamPartCreator extends Supplier asyncPartStreams; + private final long blobSize; + private final List asyncPartStreams; private String blobChecksum; private Map metadata; - public Builder blobSize(long blobSize) { + public Builder(long blobSize, List asyncPartStreams) { this.blobSize = blobSize; - return this; - } - - public Builder asyncPartStreams(List asyncPartStreams) { this.asyncPartStreams = asyncPartStreams; - return this; } public Builder blobChecksum(String blobChecksum) { diff --git a/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java b/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java index 98a5d3ff2fa3a..61ac72f5ae8f6 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java @@ -56,7 +56,7 @@ public class RemoteTransferContainer implements Closeable { private final OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier; private final boolean isRemoteDataIntegritySupported; private final AtomicBoolean readBlock = new AtomicBoolean(); - private Map metadata = null; + private final Map metadata; private static final Logger log = LogManager.getLogger(RemoteTransferContainer.class); @@ -90,6 +90,7 @@ public RemoteTransferContainer( this.offsetRangeInputStreamSupplier = offsetRangeInputStreamSupplier; this.expectedChecksum = expectedChecksum; this.isRemoteDataIntegritySupported = isRemoteDataIntegritySupported; + this.metadata = null; } /** diff --git a/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java b/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java index fa05c7b8c569a..aee4ae40d16a5 100644 --- a/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java +++ b/server/src/test/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainerTests.java @@ -57,9 +57,7 @@ public void testReadBlobAsync() throws Exception { final ListenerTestUtils.CountingCompletionListener completionListener = new ListenerTestUtils.CountingCompletionListener<>(); final CompletableFuture streamContainerFuture = CompletableFuture.completedFuture(inputStreamContainer); - final ReadContext readContext = new ReadContext.Builder().blobSize(size) - .asyncPartStreams(List.of(() -> streamContainerFuture)) - .build(); + final ReadContext readContext = new ReadContext.Builder(size, List.of(() -> streamContainerFuture)).build(); Mockito.doAnswer(invocation -> { ActionListener readContextActionListener = invocation.getArgument(1); @@ -105,9 +103,7 @@ public void testReadBlobAsyncException() throws Exception { final ListenerTestUtils.CountingCompletionListener completionListener = new ListenerTestUtils.CountingCompletionListener<>(); final CompletableFuture streamContainerFuture = CompletableFuture.completedFuture(inputStreamContainer); - final ReadContext readContext = new ReadContext.Builder().blobSize(size) - .asyncPartStreams(List.of(() -> streamContainerFuture)) - .build(); + final ReadContext readContext = new ReadContext.Builder(size, List.of(() -> streamContainerFuture)).build(); Mockito.doAnswer(invocation -> { ActionListener readContextActionListener = invocation.getArgument(1); diff --git a/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java b/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java index 60a816fcd7278..c47874f3ba294 100644 --- a/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java +++ b/server/src/test/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListenerTests.java @@ -78,9 +78,7 @@ public void testReadContextListener() throws InterruptedException, IOException { UnaryOperator.identity(), MAX_CONCURRENT_STREAMS ); - ReadContext readContext = new ReadContext.Builder().blobSize((long) PART_SIZE * NUMBER_OF_PARTS) - .asyncPartStreams(blobPartStreams) - .build(); + ReadContext readContext = new ReadContext.Builder((long) PART_SIZE * NUMBER_OF_PARTS, blobPartStreams).build(); readContextListener.onResponse(readContext); countDownLatch.await(); @@ -127,9 +125,7 @@ public int available() { threadPool.generic() ) ); - ReadContext readContext = new ReadContext.Builder().blobSize((long) (PART_SIZE + 1) * NUMBER_OF_PARTS) - .asyncPartStreams(blobPartStreams) - .build(); + ReadContext readContext = new ReadContext.Builder((long) (PART_SIZE + 1) * NUMBER_OF_PARTS, blobPartStreams).build(); readContextListener.onResponse(readContext); countDownLatch.await(); @@ -182,9 +178,7 @@ public int read(byte[] b) throws IOException { threadPool.generic() ) ); - ReadContext readContext = new ReadContext.Builder().blobSize((long) (PART_SIZE + 1) * NUMBER_OF_PARTS + 1) - .asyncPartStreams(blobPartStreams) - .build(); + ReadContext readContext = new ReadContext.Builder((long) (PART_SIZE + 1) * NUMBER_OF_PARTS + 1, blobPartStreams).build(); readContextListener.onResponse(readContext); countDownLatch.await(); @@ -209,9 +203,7 @@ public void testWriteToTempFile_alreadyExists_replacesFile() throws Exception { UnaryOperator.identity(), MAX_CONCURRENT_STREAMS ); - ReadContext readContext = new ReadContext.Builder().blobSize((long) (PART_SIZE + 1) * NUMBER_OF_PARTS) - .asyncPartStreams(blobPartStreams) - .build(); + ReadContext readContext = new ReadContext.Builder((long) (PART_SIZE + 1) * NUMBER_OF_PARTS, blobPartStreams).build(); readContextListener.onResponse(readContext); countDownLatch.await();