Skip to content

Commit

Permalink
add Builder support for readContext
Browse files Browse the repository at this point in the history
Signed-off-by: Sandeep Kumawat <[email protected]>
  • Loading branch information
Sandeep Kumawat committed Apr 5, 2024
1 parent 643d70a commit dd46e60
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,13 @@ public void readBlobAsync(String blobName, ActionListener<ReadContext> listener)
);
}
}
listener.onResponse(new ReadContext(blobSize, blobPartInputStreamFutures, blobChecksum, null));
listener.onResponse(
new ReadContext.Builder().blobSize(blobSize)
.asyncPartStreams(blobPartInputStreamFutures)
.blobChecksum(blobChecksum)
.metadata(null)
.build()
);
} catch (Exception ex) {
listener.onFailure(ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,11 @@ public void readBlobAsync(String blobName, ActionListener<ReadContext> listener)
InputStreamContainer blobPartStream = new InputStreamContainer(readBlob(blobName, offset, partSize), partSize, offset);
blobPartStreams.add(() -> CompletableFuture.completedFuture(blobPartStream));
}
ReadContext blobReadContext = new ReadContext(contentLength, blobPartStreams, null, null);
ReadContext blobReadContext = new ReadContext.Builder().blobSize(contentLength)
.asyncPartStreams(blobPartStreams)
.blobChecksum(null)
.metadata(null)
.build();
listener.onResponse(blobReadContext);
} catch (Exception e) {
listener.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,41 @@ public interface StreamPartCreator extends Supplier<CompletableFuture<InputStrea
@Override
CompletableFuture<InputStreamContainer> get();
}

/**
* Builder for {@link ReadContext}.
*
* @opensearch.experimental
*/
public static class Builder {
private long blobSize;
private List<StreamPartCreator> asyncPartStreams;
private String blobChecksum;
private Map<String, String> metadata;

public Builder blobSize(long blobSize) {
this.blobSize = blobSize;
return this;
}

public Builder asyncPartStreams(List<StreamPartCreator> asyncPartStreams) {
this.asyncPartStreams = asyncPartStreams;
return this;
}

public Builder blobChecksum(String blobChecksum) {
this.blobChecksum = blobChecksum;
return this;
}

public Builder metadata(Map<String, String> metadata) {
this.metadata = metadata;
return this;
}

public ReadContext build() {
return new ReadContext(blobSize, asyncPartStreams, blobChecksum, metadata);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,11 @@ public void testReadBlobAsync() throws Exception {
final ListenerTestUtils.CountingCompletionListener<ReadContext> completionListener =
new ListenerTestUtils.CountingCompletionListener<>();
final CompletableFuture<InputStreamContainer> streamContainerFuture = CompletableFuture.completedFuture(inputStreamContainer);
final ReadContext readContext = new ReadContext(size, List.of(() -> streamContainerFuture), null, null);
final ReadContext readContext = new ReadContext.Builder().blobSize(size)
.asyncPartStreams(List.of(() -> streamContainerFuture))
.blobChecksum(null)
.metadata(null)
.build();

Mockito.doAnswer(invocation -> {
ActionListener<ReadContext> readContextActionListener = invocation.getArgument(1);
Expand Down Expand Up @@ -103,7 +107,11 @@ public void testReadBlobAsyncException() throws Exception {
final ListenerTestUtils.CountingCompletionListener<ReadContext> completionListener =
new ListenerTestUtils.CountingCompletionListener<>();
final CompletableFuture<InputStreamContainer> streamContainerFuture = CompletableFuture.completedFuture(inputStreamContainer);
final ReadContext readContext = new ReadContext(size, List.of(() -> streamContainerFuture), null, null);
final ReadContext readContext = new ReadContext.Builder().blobSize(size)
.asyncPartStreams(List.of(() -> streamContainerFuture))
.blobChecksum(null)
.metadata(null)
.build();

Mockito.doAnswer(invocation -> {
ActionListener<ReadContext> readContextActionListener = invocation.getArgument(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,11 @@ public void testReadContextListener() throws InterruptedException, IOException {
UnaryOperator.identity(),
MAX_CONCURRENT_STREAMS
);
ReadContext readContext = new ReadContext((long) PART_SIZE * NUMBER_OF_PARTS, blobPartStreams, null, null);
ReadContext readContext = new ReadContext.Builder().blobSize((long) PART_SIZE * NUMBER_OF_PARTS)
.asyncPartStreams(blobPartStreams)
.blobChecksum(null)
.metadata(null)
.build();
readContextListener.onResponse(readContext);

countDownLatch.await();
Expand Down Expand Up @@ -125,7 +129,11 @@ public int available() {
threadPool.generic()
)
);
ReadContext readContext = new ReadContext((long) (PART_SIZE + 1) * NUMBER_OF_PARTS, blobPartStreams, null, null);
ReadContext readContext = new ReadContext.Builder().blobSize((long) (PART_SIZE + 1) * NUMBER_OF_PARTS)
.asyncPartStreams(blobPartStreams)
.blobChecksum(null)
.metadata(null)
.build();
readContextListener.onResponse(readContext);

countDownLatch.await();
Expand Down Expand Up @@ -178,7 +186,11 @@ public int read(byte[] b) throws IOException {
threadPool.generic()
)
);
ReadContext readContext = new ReadContext((long) (PART_SIZE + 1) * NUMBER_OF_PARTS + 1, blobPartStreams, null, null);
ReadContext readContext = new ReadContext.Builder().blobSize((long) (PART_SIZE + 1) * NUMBER_OF_PARTS + 1)
.asyncPartStreams(blobPartStreams)
.blobChecksum(null)
.metadata(null)
.build();
readContextListener.onResponse(readContext);

countDownLatch.await();
Expand All @@ -203,7 +215,11 @@ public void testWriteToTempFile_alreadyExists_replacesFile() throws Exception {
UnaryOperator.identity(),
MAX_CONCURRENT_STREAMS
);
ReadContext readContext = new ReadContext((long) (PART_SIZE + 1) * NUMBER_OF_PARTS, blobPartStreams, null, null);
ReadContext readContext = new ReadContext.Builder().blobSize((long) (PART_SIZE + 1) * NUMBER_OF_PARTS)
.asyncPartStreams(blobPartStreams)
.blobChecksum(null)
.metadata(null)
.build();
readContextListener.onResponse(readContext);

countDownLatch.await();
Expand Down

0 comments on commit dd46e60

Please sign in to comment.