
Commit

minor code refactor
Signed-off-by: Sandeep Kumawat <[email protected]>
Sandeep Kumawat committed Apr 5, 2024
1 parent 002b757 commit ca12379
Showing 8 changed files with 87 additions and 120 deletions.
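Most of this commit is a mechanical refactor: anonymous implementations of single-abstract-method interfaces (StreamContextSupplier, and the checked tri-function that StreamContext accepts) are collapsed into lambdas, and builder calls that only set null are dropped. A minimal, self-contained sketch of the anonymous-class-to-lambda step follows; it uses a simplified stand-in interface, since only the supplyStreamContext(long) signature of the real org.opensearch.common.blobstore.stream.write.StreamContextSupplier is visible in this diff.

// Sketch only: SupplierSketch stands in for the real StreamContextSupplier,
// which the new test code demonstrates is a functional interface.
@FunctionalInterface
interface SupplierSketch {
    String supplyStreamContext(long partSize);
}

public class LambdaRefactorSketch {
    public static void main(String[] args) {
        // Before: anonymous inner class, as in the removed test code.
        SupplierSketch verbose = new SupplierSketch() {
            @Override
            public String supplyStreamContext(long partSize) {
                return "part size = " + partSize;
            }
        };

        // After: an equivalent lambda, as in the added test code.
        SupplierSketch concise = partSize -> "part size = " + partSize;

        System.out.println(verbose.supplyStreamContext(8));
        System.out.println(concise.supplyStreamContext(8));
    }
}

Both forms compile to the same call site behavior; the lambda simply removes the boilerplate class body, which is why the hunks below shrink without changing test semantics.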
@@ -289,7 +289,6 @@ public void readBlobAsync(String blobName, ActionListener<ReadContext> listener)
            new ReadContext.Builder().blobSize(blobSize)
                .asyncPartStreams(blobPartInputStreamFutures)
                .blobChecksum(blobChecksum)
-               .metadata(null)
                .build()
        );
    } catch (Exception ex) {
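Dropping .metadata(null) here (and the .blobChecksum(null) and .expectedChecksum(null) calls removed further down) is behavior-preserving only because the builders leave those fields null unless they are explicitly set. The real ReadContext.Builder internals are not part of this diff, so the following is a hypothetical sketch of that builder convention, not the actual OpenSearch code.

import java.util.Map;

// Hypothetical builder sketch: optional fields default to null, so an
// explicit .metadata(null) call was always a no-op and can be removed.
class ReadContextSketch {
    private final long blobSize;
    private final Map<String, String> metadata; // stays null when never set

    private ReadContextSketch(long blobSize, Map<String, String> metadata) {
        this.blobSize = blobSize;
        this.metadata = metadata;
    }

    static class Builder {
        private long blobSize;
        private Map<String, String> metadata; // defaults to null

        Builder blobSize(long blobSize) {
            this.blobSize = blobSize;
            return this;
        }

        Builder metadata(Map<String, String> metadata) {
            this.metadata = metadata;
            return this;
        }

        ReadContextSketch build() {
            // .blobSize(10).build() and .blobSize(10).metadata(null).build()
            // produce identical objects under this convention.
            return new ReadContextSketch(blobSize, metadata);
        }
    }
}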
@@ -30,7 +30,7 @@

 import org.apache.lucene.store.IndexInput;
 import org.opensearch.cluster.metadata.RepositoryMetadata;
-import org.opensearch.common.CheckedTriFunction;
+import org.opensearch.common.CheckedConsumer;
 import org.opensearch.common.StreamContext;
 import org.opensearch.common.blobstore.BlobPath;
 import org.opensearch.common.blobstore.stream.write.StreamContextSupplier;
@@ -467,31 +467,30 @@ private void testWriteBlobByStreams(boolean expectException, boolean throwExcept
            exceptionRef.set(ex);
            countDownLatch.countDown();
        });
-       blobContainer.asyncBlobUpload(
-           new WriteContext.Builder().fileName("write_blob_by_streams_max_retries").streamContextSupplier(new StreamContextSupplier() {
-               @Override
-               public StreamContext supplyStreamContext(long partSize) {
-                   return new StreamContext(new CheckedTriFunction<Integer, Long, Long, InputStreamContainer, IOException>() {
-                       @Override
-                       public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException {
-                           InputStream inputStream = new OffsetRangeIndexInputStream(
-                               new ByteArrayIndexInput("desc", bytes),
-                               size,
-                               position
-                           );
-                           openInputStreams.add(inputStream);
-                           return new InputStreamContainer(inputStream, size, position);
-                       }
-                   }, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize));
-               }
-           }).fileSize(bytes.length).failIfAlreadyExists(false).writePriority(WritePriority.NORMAL).uploadFinalizer(uploadSuccess -> {
-               assertTrue(uploadSuccess);
-               if (throwExceptionOnFinalizeUpload) {
-                   throw new RuntimeException();
-               }
-           }).doRemoteDataIntegrityCheck(false).expectedChecksum(null).metadata(null).build(),
-           completionListener
-       );
+
+       StreamContextSupplier streamContextSupplier = partSize -> new StreamContext((partNo, size, position) -> {
+           InputStream inputStream = new OffsetRangeIndexInputStream(new ByteArrayIndexInput("desc", bytes), size, position);
+           openInputStreams.add(inputStream);
+           return new InputStreamContainer(inputStream, size, position);
+       }, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize));
+
+       CheckedConsumer<Boolean, IOException> uploadFinalizer = uploadSuccess -> {
+           assertTrue(uploadSuccess);
+           if (throwExceptionOnFinalizeUpload) {
+               throw new RuntimeException();
+           }
+       };
+
+       WriteContext writeContext = new WriteContext.Builder().fileName("write_blob_by_streams_max_retries")
+           .streamContextSupplier(streamContextSupplier)
+           .fileSize(bytes.length)
+           .failIfAlreadyExists(false)
+           .writePriority(WritePriority.NORMAL)
+           .uploadFinalizer(uploadFinalizer)
+           .doRemoteDataIntegrityCheck(false)
+           .build();
+
+       blobContainer.asyncBlobUpload(writeContext, completionListener);
+
        assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS));
        // wait for completableFuture to finish
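A side benefit of the rewrite above is that the finalizer now has an explicit type, CheckedConsumer<Boolean, IOException>, which makes its checked-exception contract visible at the declaration. A self-contained sketch of that shape follows, using a local stand-in for org.opensearch.common.CheckedConsumer; the single accept(T) throws E method is an assumption, inferred from how the lambda is used in this diff.

import java.io.IOException;

public class CheckedConsumerSketch {
    // Stand-in for org.opensearch.common.CheckedConsumer<T, E>: like
    // java.util.function.Consumer, except accept may throw a checked exception.
    @FunctionalInterface
    interface CheckedConsumer<T, E extends Exception> {
        void accept(T t) throws E;
    }

    public static void main(String[] args) throws IOException {
        // Named finalizer in the style of the refactored test: a failed
        // finalize step is signaled by throwing rather than returning a status.
        CheckedConsumer<Boolean, IOException> uploadFinalizer = uploadSuccess -> {
            if (!uploadSuccess) {
                throw new IOException("upload finalize failed");
            }
        };

        uploadFinalizer.accept(true); // success path: completes normally
    }
}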
@@ -524,27 +523,30 @@ private void testWriteBlobByStreamsLargeBlob(boolean expectException, boolean th
            countDownLatch.countDown();
        });
        List<InputStream> openInputStreams = new ArrayList<>();
-       blobContainer.asyncBlobUpload(
-           new WriteContext.Builder().fileName("write_large_blob").streamContextSupplier(new StreamContextSupplier() {
-               @Override
-               public StreamContext supplyStreamContext(long partSize) {
-                   return new StreamContext(new CheckedTriFunction<Integer, Long, Long, InputStreamContainer, IOException>() {
-                       @Override
-                       public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException {
-                           InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position);
-                           openInputStreams.add(inputStream);
-                           return new InputStreamContainer(inputStream, size, position);
-                       }
-                   }, partSize, calculateLastPartSize(blobSize, partSize), calculateNumberOfParts(blobSize, partSize));
-               }
-           }).fileSize(blobSize).failIfAlreadyExists(false).writePriority(WritePriority.NORMAL).uploadFinalizer(uploadSuccess -> {
-               assertTrue(uploadSuccess);
-               if (throwExceptionOnFinalizeUpload) {
-                   throw new RuntimeException();
-               }
-           }).doRemoteDataIntegrityCheck(false).expectedChecksum(null).metadata(null).build(),
-           completionListener
-       );
+
+       StreamContextSupplier streamContextSupplier = partSize1 -> new StreamContext((partNo, size, position) -> {
+           InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position);
+           openInputStreams.add(inputStream);
+           return new InputStreamContainer(inputStream, size, position);
+       }, partSize1, calculateLastPartSize(blobSize, partSize1), calculateNumberOfParts(blobSize, partSize1));
+
+       CheckedConsumer<Boolean, IOException> uploadFinalizer = uploadSuccess -> {
+           assertTrue(uploadSuccess);
+           if (throwExceptionOnFinalizeUpload) {
+               throw new RuntimeException();
+           }
+       };
+
+       WriteContext writeContext = new WriteContext.Builder().fileName("write_large_blob")
+           .streamContextSupplier(streamContextSupplier)
+           .fileSize(blobSize)
+           .failIfAlreadyExists(false)
+           .writePriority(WritePriority.NORMAL)
+           .uploadFinalizer(uploadFinalizer)
+           .doRemoteDataIntegrityCheck(false)
+           .build();
+
+       blobContainer.asyncBlobUpload(writeContext, completionListener);
+
        assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS));
        if (expectException || throwExceptionOnFinalizeUpload) {
@@ -643,30 +645,23 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException) t

        List<InputStream> openInputStreams = new ArrayList<>();
        final S3BlobContainer s3BlobContainer = Mockito.spy(new S3BlobContainer(blobPath, blobStore));
-       s3BlobContainer.asyncBlobUpload(
-           new WriteContext.Builder().fileName("write_large_blob").streamContextSupplier(new StreamContextSupplier() {
-               @Override
-               public StreamContext supplyStreamContext(long partSize) {
-                   return new StreamContext(new CheckedTriFunction<Integer, Long, Long, InputStreamContainer, IOException>() {
-                       @Override
-                       public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException {
-                           InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position);
-                           openInputStreams.add(inputStream);
-                           return new InputStreamContainer(inputStream, size, position);
-                       }
-                   }, partSize, calculateLastPartSize(blobSize, partSize), calculateNumberOfParts(blobSize, partSize));
-               }
-           })
-               .fileSize(blobSize)
-               .failIfAlreadyExists(false)
-               .writePriority(WritePriority.HIGH)
-               .uploadFinalizer(Assert::assertTrue)
-               .doRemoteDataIntegrityCheck(false)
-               .expectedChecksum(null)
-               .metadata(null)
-               .build(),
-           completionListener
-       );
+
+       StreamContextSupplier streamContextSupplier = partSize1 -> new StreamContext((partNo, size, position) -> {
+           InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position);
+           openInputStreams.add(inputStream);
+           return new InputStreamContainer(inputStream, size, position);
+       }, partSize1, calculateLastPartSize(blobSize, partSize1), calculateNumberOfParts(blobSize, partSize1));
+
+       WriteContext writeContext = new WriteContext.Builder().fileName("write_large_blob")
+           .streamContextSupplier(streamContextSupplier)
+           .fileSize(blobSize)
+           .failIfAlreadyExists(false)
+           .writePriority(WritePriority.HIGH)
+           .uploadFinalizer(Assert::assertTrue)
+           .doRemoteDataIntegrityCheck(false)
+           .build();
+
+       s3BlobContainer.asyncBlobUpload(writeContext, completionListener);
        assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS));
        if (expectException) {
            assertNotNull(exceptionRef.get());
@@ -37,7 +37,6 @@

 import org.apache.http.HttpStatus;
 import org.opensearch.cluster.metadata.RepositoryMetadata;
-import org.opensearch.common.CheckedTriFunction;
 import org.opensearch.common.Nullable;
 import org.opensearch.common.StreamContext;
 import org.opensearch.common.SuppressForbidden;
@@ -332,36 +331,24 @@ public void testWriteBlobByStreamsWithRetries() throws Exception {
            exceptionRef.set(ex);
            countDownLatch.countDown();
        });
-       blobContainer.asyncBlobUpload(
-           new WriteContext.Builder().fileName("write_blob_by_streams_max_retries").streamContextSupplier(new StreamContextSupplier() {
-               @Override
-               public StreamContext supplyStreamContext(long partSize) {
-                   return new StreamContext(new CheckedTriFunction<Integer, Long, Long, InputStreamContainer, IOException>() {
-                       @Override
-                       public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException {
-                           InputStream inputStream = new OffsetRangeIndexInputStream(
-                               new ByteArrayIndexInput("desc", bytes),
-                               size,
-                               position
-                           );
-                           openInputStreams.add(inputStream);
-                           return new InputStreamContainer(inputStream, size, position);
-                       }
-                   }, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize));
-               }
-           })
-               .fileSize(bytes.length)
-               .failIfAlreadyExists(false)
-               .writePriority(WritePriority.NORMAL)
-               .uploadFinalizer(Assert::assertTrue)
-               .doRemoteDataIntegrityCheck(false)
-               .expectedChecksum(null)
-               .metadata(null)
-               .build(),
-           completionListener
-       );
-       assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS));
+
+       StreamContextSupplier streamContextSupplier = partSize -> new StreamContext((partNo, size, position) -> {
+           InputStream inputStream = new OffsetRangeIndexInputStream(new ByteArrayIndexInput("desc", bytes), size, position);
+           openInputStreams.add(inputStream);
+           return new InputStreamContainer(inputStream, size, position);
+       }, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize));
+
+       WriteContext writeContext = new WriteContext.Builder().fileName("write_blob_by_streams_max_retries")
+           .streamContextSupplier(streamContextSupplier)
+           .fileSize(bytes.length)
+           .failIfAlreadyExists(false)
+           .writePriority(WritePriority.NORMAL)
+           .uploadFinalizer(Assert::assertTrue)
+           .doRemoteDataIntegrityCheck(false)
+           .build();
+
+       blobContainer.asyncBlobUpload(writeContext, completionListener);
+       assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS));
        assertThat(countDown.isCountedDown(), is(true));

        openInputStreams.forEach(inputStream -> {
@@ -131,11 +131,7 @@ public void readBlobAsync(String blobName, ActionListener<ReadContext> listener)
            InputStreamContainer blobPartStream = new InputStreamContainer(readBlob(blobName, offset, partSize), partSize, offset);
            blobPartStreams.add(() -> CompletableFuture.completedFuture(blobPartStream));
        }
-       ReadContext blobReadContext = new ReadContext.Builder().blobSize(contentLength)
-           .asyncPartStreams(blobPartStreams)
-           .blobChecksum(null)
-           .metadata(null)
-           .build();
+       ReadContext blobReadContext = new ReadContext.Builder().blobSize(contentLength).asyncPartStreams(blobPartStreams).build();
        listener.onResponse(blobReadContext);
    } catch (Exception e) {
        listener.onFailure(e);
@@ -15,7 +15,7 @@
  * Represents the response from a blob download operation, containing both the
  * input stream of the blob content and the associated metadata.
  *
- * @opensearch.internal
+ * @opensearch.experimental
  */
 public class BlobDownloadResponse {
@@ -12,6 +12,7 @@

 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.opensearch.action.ActionRunnable;
+import org.opensearch.common.annotation.ExperimentalApi;
 import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
 import org.opensearch.common.blobstore.BlobContainer;
 import org.opensearch.common.blobstore.BlobDownloadResponse;

@@ -166,6 +167,7 @@ public InputStream downloadBlob(Iterable<String> path, String fileName) throws I

     @Override
+    @ExperimentalApi
     public BlobDownloadResponse downloadBlobWithMetadata(Iterable<String> path, String fileName) throws IOException {
         return blobStore.blobContainer((BlobPath) path).readBlobWithMetadata(fileName);
     }
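Note: the @ExperimentalApi annotation added here matches the @opensearch.internal to @opensearch.experimental Javadoc change on BlobDownloadResponse above. Together they appear to mark downloadBlobWithMetadata as an API whose signature may still change, so callers should not treat it as stable.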
@@ -59,8 +59,6 @@ public void testReadBlobAsync() throws Exception {
        final CompletableFuture<InputStreamContainer> streamContainerFuture = CompletableFuture.completedFuture(inputStreamContainer);
        final ReadContext readContext = new ReadContext.Builder().blobSize(size)
            .asyncPartStreams(List.of(() -> streamContainerFuture))
-           .blobChecksum(null)
-           .metadata(null)
            .build();

        Mockito.doAnswer(invocation -> {

@@ -109,8 +107,6 @@ public void testReadBlobAsyncException() throws Exception {
        final CompletableFuture<InputStreamContainer> streamContainerFuture = CompletableFuture.completedFuture(inputStreamContainer);
        final ReadContext readContext = new ReadContext.Builder().blobSize(size)
            .asyncPartStreams(List.of(() -> streamContainerFuture))
-           .blobChecksum(null)
-           .metadata(null)
            .build();

        Mockito.doAnswer(invocation -> {
@@ -80,8 +80,6 @@ public void testReadContextListener() throws InterruptedException, IOException {
        );
        ReadContext readContext = new ReadContext.Builder().blobSize((long) PART_SIZE * NUMBER_OF_PARTS)
            .asyncPartStreams(blobPartStreams)
-           .blobChecksum(null)
-           .metadata(null)
            .build();
        readContextListener.onResponse(readContext);

@@ -131,8 +129,6 @@ public int available() {
        );
        ReadContext readContext = new ReadContext.Builder().blobSize((long) (PART_SIZE + 1) * NUMBER_OF_PARTS)
            .asyncPartStreams(blobPartStreams)
-           .blobChecksum(null)
-           .metadata(null)
            .build();
        readContextListener.onResponse(readContext);

@@ -188,8 +184,6 @@ public int read(byte[] b) throws IOException {
        );
        ReadContext readContext = new ReadContext.Builder().blobSize((long) (PART_SIZE + 1) * NUMBER_OF_PARTS + 1)
            .asyncPartStreams(blobPartStreams)
-           .blobChecksum(null)
-           .metadata(null)
            .build();
        readContextListener.onResponse(readContext);

@@ -217,8 +211,6 @@ public void testWriteToTempFile_alreadyExists_replacesFile() throws Exception {
        );
        ReadContext readContext = new ReadContext.Builder().blobSize((long) (PART_SIZE + 1) * NUMBER_OF_PARTS)
            .asyncPartStreams(blobPartStreams)
-           .blobChecksum(null)
-           .metadata(null)
            .build();
        readContextListener.onResponse(readContext);
