Skip to content

Commit

Permalink
use builder() in WriteContext
Browse files Browse the repository at this point in the history
Signed-off-by: Sandeep Kumawat <[email protected]>
  • Loading branch information
Sandeep Kumawat committed Apr 4, 2024
1 parent 929482e commit 4ddb4dc
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.repositories.s3;

import org.junit.Assert;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.core.sync.RequestBody;
Expand Down Expand Up @@ -466,24 +467,35 @@ private void testWriteBlobByStreams(boolean expectException, boolean throwExcept
exceptionRef.set(ex);
countDownLatch.countDown();
});
blobContainer.asyncBlobUpload(new WriteContext("write_blob_by_streams_max_retries", new StreamContextSupplier() {
@Override
public StreamContext supplyStreamContext(long partSize) {
return new StreamContext(new CheckedTriFunction<Integer, Long, Long, InputStreamContainer, IOException>() {
blobContainer.asyncBlobUpload(
new WriteContext.Builder()
.fileName("write_blob_by_streams_max_retries")
.streamContextSupplier(new StreamContextSupplier() {
@Override
public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException {
InputStream inputStream = new OffsetRangeIndexInputStream(new ByteArrayIndexInput("desc", bytes), size, position);
openInputStreams.add(inputStream);
return new InputStreamContainer(inputStream, size, position);
public StreamContext supplyStreamContext(long partSize) {
return new StreamContext(new CheckedTriFunction<Integer, Long, Long, InputStreamContainer, IOException>() {
@Override
public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException {
InputStream inputStream = new OffsetRangeIndexInputStream(new ByteArrayIndexInput("desc", bytes), size, position);
openInputStreams.add(inputStream);
return new InputStreamContainer(inputStream, size, position);
}
}, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize));
}
}, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize));
}
}, bytes.length, false, WritePriority.NORMAL, uploadSuccess -> {
assertTrue(uploadSuccess);
if (throwExceptionOnFinalizeUpload) {
throw new RuntimeException();
}
}, false, null, null), completionListener);
})
.fileSize(bytes.length)
.failIfAlreadyExists(false)
.writePriority(WritePriority.NORMAL)
.uploadFinalizer(uploadSuccess -> {
assertTrue(uploadSuccess);
if (throwExceptionOnFinalizeUpload) {
throw new RuntimeException();
}
})
.doRemoteDataIntegrityCheck(false)
.expectedChecksum(null)
.metadata(null)
.build(), completionListener);

assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS));
// wait for completableFuture to finish
Expand Down Expand Up @@ -516,24 +528,35 @@ private void testWriteBlobByStreamsLargeBlob(boolean expectException, boolean th
countDownLatch.countDown();
});
List<InputStream> openInputStreams = new ArrayList<>();
blobContainer.asyncBlobUpload(new WriteContext("write_large_blob", new StreamContextSupplier() {
@Override
public StreamContext supplyStreamContext(long partSize) {
return new StreamContext(new CheckedTriFunction<Integer, Long, Long, InputStreamContainer, IOException>() {
blobContainer.asyncBlobUpload(
new WriteContext.Builder()
.fileName("write_large_blob")
.streamContextSupplier(new StreamContextSupplier() {
@Override
public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException {
InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position);
openInputStreams.add(inputStream);
return new InputStreamContainer(inputStream, size, position);
public StreamContext supplyStreamContext(long partSize) {
return new StreamContext(new CheckedTriFunction<Integer, Long, Long, InputStreamContainer, IOException>() {
@Override
public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException {
InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position);
openInputStreams.add(inputStream);
return new InputStreamContainer(inputStream, size, position);
}
}, partSize, calculateLastPartSize(blobSize, partSize), calculateNumberOfParts(blobSize, partSize));
}
}, partSize, calculateLastPartSize(blobSize, partSize), calculateNumberOfParts(blobSize, partSize));
}
}, blobSize, false, WritePriority.NORMAL, uploadSuccess -> {
assertTrue(uploadSuccess);
if (throwExceptionOnFinalizeUpload) {
throw new RuntimeException();
}
}, false, null, null), completionListener);
})
.fileSize(blobSize)
.failIfAlreadyExists(false)
.writePriority(WritePriority.NORMAL)
.uploadFinalizer(uploadSuccess -> {
assertTrue(uploadSuccess);
if (throwExceptionOnFinalizeUpload) {
throw new RuntimeException();
}
})
.doRemoteDataIntegrityCheck(false)
.expectedChecksum(null)
.metadata(null)
.build(), completionListener);

assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS));
if (expectException || throwExceptionOnFinalizeUpload) {
Expand Down Expand Up @@ -632,20 +655,30 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException) t

List<InputStream> openInputStreams = new ArrayList<>();
final S3BlobContainer s3BlobContainer = Mockito.spy(new S3BlobContainer(blobPath, blobStore));
s3BlobContainer.asyncBlobUpload(new WriteContext("write_large_blob", new StreamContextSupplier() {
@Override
public StreamContext supplyStreamContext(long partSize) {
return new StreamContext(new CheckedTriFunction<Integer, Long, Long, InputStreamContainer, IOException>() {
s3BlobContainer.asyncBlobUpload(
new WriteContext.Builder()
.fileName("write_large_blob")
.streamContextSupplier(new StreamContextSupplier() {
@Override
public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException {
InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position);
openInputStreams.add(inputStream);
return new InputStreamContainer(inputStream, size, position);
public StreamContext supplyStreamContext(long partSize) {
return new StreamContext(new CheckedTriFunction<Integer, Long, Long, InputStreamContainer, IOException>() {
@Override
public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException {
InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position);
openInputStreams.add(inputStream);
return new InputStreamContainer(inputStream, size, position);
}
}, partSize, calculateLastPartSize(blobSize, partSize), calculateNumberOfParts(blobSize, partSize));
}
}, partSize, calculateLastPartSize(blobSize, partSize), calculateNumberOfParts(blobSize, partSize));
}
}, blobSize, false, WritePriority.HIGH, uploadSuccess -> { assertTrue(uploadSuccess); }, false, null, null), completionListener);

})
.fileSize(blobSize)
.failIfAlreadyExists(false)
.writePriority(WritePriority.HIGH)
.uploadFinalizer(Assert::assertTrue)
.doRemoteDataIntegrityCheck(false)
.expectedChecksum(null)
.metadata(null)
.build(), completionListener);
assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS));
if (expectException) {
assertNotNull(exceptionRef.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,20 +332,30 @@ public void testWriteBlobByStreamsWithRetries() throws Exception {
exceptionRef.set(ex);
countDownLatch.countDown();
});
blobContainer.asyncBlobUpload(new WriteContext("write_blob_by_streams_max_retries", new StreamContextSupplier() {
@Override
public StreamContext supplyStreamContext(long partSize) {
return new StreamContext(new CheckedTriFunction<Integer, Long, Long, InputStreamContainer, IOException>() {
blobContainer.asyncBlobUpload(
new WriteContext.Builder()
.fileName("write_blob_by_streams_max_retries")
.streamContextSupplier(new StreamContextSupplier() {
@Override
public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException {
InputStream inputStream = new OffsetRangeIndexInputStream(new ByteArrayIndexInput("desc", bytes), size, position);
openInputStreams.add(inputStream);
return new InputStreamContainer(inputStream, size, position);
public StreamContext supplyStreamContext(long partSize) {
return new StreamContext(new CheckedTriFunction<Integer, Long, Long, InputStreamContainer, IOException>() {
@Override
public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException {
InputStream inputStream = new OffsetRangeIndexInputStream(new ByteArrayIndexInput("desc", bytes), size, position);
openInputStreams.add(inputStream);
return new InputStreamContainer(inputStream, size, position);
}
}, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize));
}
}, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize));
}
}, bytes.length, false, WritePriority.NORMAL, Assert::assertTrue, false, null, null), completionListener);

})
.fileSize(bytes.length)
.failIfAlreadyExists(false)
.writePriority(WritePriority.NORMAL)
.uploadFinalizer(Assert::assertTrue)
.doRemoteDataIntegrityCheck(false)
.expectedChecksum(null)
.metadata(null)
.build(), completionListener);
assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS));

assertThat(countDown.isCountedDown(), is(true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,4 +143,80 @@ public Long getExpectedChecksum() {
public Map<String, String> getMetadata() {
return metadata;
}

/**
* Builder for {@link WriteContext}.
*
* @opensearch.internal
*/
public static class Builder {
private String fileName;
private StreamContextSupplier streamContextSupplier;
private long fileSize;
private boolean failIfAlreadyExists;
private WritePriority writePriority;
private CheckedConsumer<Boolean, IOException> uploadFinalizer;
private boolean doRemoteDataIntegrityCheck;
private Long expectedChecksum;
private Map<String, String> metadata;

public Builder fileName(String fileName) {
this.fileName = fileName;
return this;
}

public Builder streamContextSupplier(StreamContextSupplier streamContextSupplier) {
this.streamContextSupplier = streamContextSupplier;
return this;
}

public Builder fileSize(long fileSize) {
this.fileSize = fileSize;
return this;
}

public Builder writePriority(WritePriority writePriority) {
this.writePriority = writePriority;
return this;
}

public Builder failIfAlreadyExists(boolean failIfAlreadyExists) {
this.failIfAlreadyExists = failIfAlreadyExists;
return this;
}

public Builder uploadFinalizer(CheckedConsumer<Boolean, IOException> uploadFinalizer) {
this.uploadFinalizer = uploadFinalizer;
return this;
}

public Builder doRemoteDataIntegrityCheck(boolean doRemoteDataIntegrityCheck) {
this.doRemoteDataIntegrityCheck = doRemoteDataIntegrityCheck;
return this;
}

public Builder expectedChecksum(Long expectedChecksum) {
this.expectedChecksum = expectedChecksum;
return this;
}

public Builder metadata(Map<String, String> metadata) {
this.metadata = metadata;
return this;
}

public WriteContext build() {
return new WriteContext(
fileName,
streamContextSupplier,
fileSize,
failIfAlreadyExists,
writePriority,
uploadFinalizer,
doRemoteDataIntegrityCheck,
expectedChecksum,
metadata
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,17 +131,16 @@ public RemoteTransferContainer(
* @return The {@link WriteContext} for the current upload
*/
public WriteContext createWriteContext() {
return new WriteContext(
remoteFileName,
this::supplyStreamContext,
contentLength,
failTransferIfFileExists,
writePriority,
this::finalizeUpload,
isRemoteDataIntegrityCheckPossible(),
isRemoteDataIntegrityCheckPossible() ? expectedChecksum : null,
metadata
);
return new WriteContext.Builder().fileName(remoteFileName)
.streamContextSupplier(this::supplyStreamContext)
.fileSize(contentLength)
.failIfAlreadyExists(failTransferIfFileExists)
.writePriority(writePriority)
.uploadFinalizer(this::finalizeUpload)
.doRemoteDataIntegrityCheck(isRemoteDataIntegrityCheckPossible())
.expectedChecksum(isRemoteDataIntegrityCheckPossible() ? expectedChecksum : null)
.metadata(metadata)
.build();
}

// package-private for testing
Expand Down

0 comments on commit 4ddb4dc

Please sign in to comment.