From ef5f09e58412e2618614588e7554708b92947025 Mon Sep 17 00:00:00 2001 From: Patrick Magee Date: Wed, 4 Oct 2023 13:50:30 -0400 Subject: [PATCH] [#186151481] added support for range lookups with range header (#12) * [#186151481] added support for range lookups with range header * [#186151481] addressed code review comments --- .../com/dnastack/wes/service/WesE2ETest.java | 85 ++++++++- nginx/README.md | 2 +- pom.xml | 35 +++- .../wes/api/RangeNotSatisfiableException.java | 9 + .../com/dnastack/wes/api/WesV1Controller.java | 60 ++++-- .../wes/cromwell/CromwellService.java | 20 +- .../GlobalControllerExceptionHandler.java | 7 + .../wes/storage/AzureBlobStorageClient.java | 19 +- .../wes/storage/BlobStorageClient.java | 7 +- .../wes/storage/BoundedInputStream.java | 171 ++++++++++++++++++ .../wes/storage/GcpBlobStorageClient.java | 50 ++--- .../wes/storage/LocalBlobStorageClient.java | 52 ++---- .../local/LocalBlobStorageClientTest.java | 6 +- 13 files changed, 415 insertions(+), 108 deletions(-) create mode 100644 src/main/java/com/dnastack/wes/api/RangeNotSatisfiableException.java create mode 100644 src/main/java/com/dnastack/wes/storage/BoundedInputStream.java diff --git a/e2e-tests/src/main/java/com/dnastack/wes/service/WesE2ETest.java b/e2e-tests/src/main/java/com/dnastack/wes/service/WesE2ETest.java index f6acb5d..c7fbe97 100644 --- a/e2e-tests/src/main/java/com/dnastack/wes/service/WesE2ETest.java +++ b/e2e-tests/src/main/java/com/dnastack/wes/service/WesE2ETest.java @@ -7,6 +7,7 @@ import com.google.auth.oauth2.GoogleCredentials; import io.restassured.builder.MultiPartSpecBuilder; import io.restassured.http.ContentType; +import io.restassured.http.Header; import io.restassured.specification.MultiPartSpecification; import org.awaitility.core.ConditionFactory; import org.junit.jupiter.api.*; @@ -605,7 +606,6 @@ public void setup() throws Exception { .jsonPath() .getString("run_id"); //@formatter:on - final String runPathStatus = format("%s/%s/status", path, workflowJobId); pollUntilJobCompletes(workflowJobId); } @@ -649,6 +649,43 @@ public void getStdoutForTaskReturnsSuccessfully() { Assertions.assertEquals("Hello Frank\n",body); + // test range offset + body = given() + .log().uri() + .log().method() + .header("Range","bytes=0-4") + .header(getHeader(getResource(path))) + .get(taskLogs.get("stdout")) + .then() + .statusCode(206) + .extract().asString(); + + Assertions.assertEquals("Hello",body); + + body = given() + .log().uri() + .log().method() + .header("Range","bytes=6-") + .header(getHeader(getResource(path))) + .get(taskLogs.get("stdout")) + .then() + .statusCode(206) + .extract().asString(); + + Assertions.assertEquals("Frank\n",body); + + body = given() + .log().uri() + .log().method() + .header("Range","bytes=1-3") + .header(getHeader(getResource(path))) + .get(taskLogs.get("stdout")) + .then() + .statusCode(206) + .extract().asString(); + + Assertions.assertEquals("ell",body); + //@formatter:on //@formatter:off @@ -663,6 +700,52 @@ public void getStdoutForTaskReturnsSuccessfully() { Assertions.assertEquals("Goodbye Frank\n",body); //@formatter:on + + // test range offset + body = given() + .log().uri() + .log().method() + .header("Range","bytes=0-4") + .header(getHeader(getResource(path))) + .get(taskLogs.get("stderr")) + .then() + .statusCode(206) + .extract().asString(); + + Assertions.assertEquals("Goodb",body); + + body = given() + .log().uri() + .log().method() + .header("Range","bytes=8-") + .header(getHeader(getResource(path))) + .get(taskLogs.get("stderr")) + .then() + .statusCode(206) + .extract().asString(); + + Assertions.assertEquals("Frank\n",body); + + body = given() + .log().uri() + .log().method() + .header("Range","bytes=1-3") + .header(getHeader(getResource(path))) + .get(taskLogs.get("stderr")) + .then() + .statusCode(206) + .extract().asString(); + + Assertions.assertEquals("ood",body); + + given() + .log().uri() + .log().method() + .header("Range","bytes=1-3,6-9") + .header(getHeader(getResource(path))) + .get(taskLogs.get("stderr")) + .then() + .statusCode(416); } @Test diff --git a/nginx/README.md b/nginx/README.md index b67156a..a819921 100644 --- a/nginx/README.md +++ b/nginx/README.md @@ -22,7 +22,7 @@ following will provide certificates for hostname `127.0.0.1`. cd cert # Generate server cert to be signed -openssl req -new -nodes -x509 -days 365 -keyout server.key -out server.crt -config server.conf +openssl req -new -nodes -x509 -days 365 -keyout server.pem -out server.crt -config server.conf # Generate client cert to be signed openssl req -new -nodes -x509 -days 365 -keyout client.key -out client.crt -config client.conf diff --git a/pom.xml b/pom.xml index cd76cbb..c98ae22 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ org.springframework.boot spring-boot-starter-parent - 2.7.12 + 2.7.15 @@ -28,13 +28,13 @@ ${java.version} ${java.version} 3.0.0-M5 - 2021.0.5 + 2021.0.8 1.2.10 1.0.0 0.1.5 1.6.13 - 1.0.8 - 1.0.9 + 1.0.9 + 1.0.15 4.9.3 0.11.2 3.8.0 @@ -42,7 +42,9 @@ target/dependency-check-report.json target/dependency-check-report.html true - 9.1.0 + 1.2.8 + 2.20.1 + 1.16.0 @@ -71,10 +73,24 @@ logback-core ${logback.version} + + com.azure + azure-sdk-bom + ${azure-sdk.version} + pom + import + com.google.cloud - libraries-bom - ${gcloud.version} + google-cloud-storage-bom + ${gcloud.storage.version} + pom + import + + + com.google.auth + google-auth-library-bom + ${gcloud.auth.version} pom import @@ -198,12 +214,15 @@ com.azure azure-storage-blob - ${azurestorage.version} com.google.cloud google-cloud-storage + + com.google.auth + google-auth-library-oauth2-http + org.apache.commons diff --git a/src/main/java/com/dnastack/wes/api/RangeNotSatisfiableException.java b/src/main/java/com/dnastack/wes/api/RangeNotSatisfiableException.java new file mode 100644 index 0000000..fa1da0b --- /dev/null +++ b/src/main/java/com/dnastack/wes/api/RangeNotSatisfiableException.java @@ -0,0 +1,9 @@ +package com.dnastack.wes.api; + +public class RangeNotSatisfiableException extends RuntimeException { + + public RangeNotSatisfiableException(String s) { + super(s); + } + +} diff --git a/src/main/java/com/dnastack/wes/api/WesV1Controller.java b/src/main/java/com/dnastack/wes/api/WesV1Controller.java index 390b38b..a9f5a6c 100644 --- a/src/main/java/com/dnastack/wes/api/WesV1Controller.java +++ b/src/main/java/com/dnastack/wes/api/WesV1Controller.java @@ -2,7 +2,7 @@ import com.dnastack.audit.aspect.AuditActionUri; -import com.dnastack.audit.aspect.AuditIgnore; +import com.dnastack.audit.util.AuditIgnore; import com.dnastack.wes.AppConfig; import com.dnastack.wes.cromwell.CromwellService; import com.dnastack.wes.security.AuthenticatedUser; @@ -10,6 +10,8 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpRange; import org.springframework.http.MediaType; import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.web.bind.annotation.*; @@ -18,6 +20,7 @@ import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.util.HashMap; +import java.util.List; import java.util.Map; @@ -123,36 +126,71 @@ public RunId cancelRun(@PathVariable("runId") String runId) { @AuditActionUri("wes:run:stderr") @PreAuthorize("@accessEvaluator.canAccessResource('/ga4gh/wes/v1/runs/' + #runId, 'wes:runs:read', 'wes')") @GetMapping(value = "/runs/{runId}/logs/stderr", produces = MediaType.APPLICATION_OCTET_STREAM_VALUE) - public void getStderr(HttpServletResponse response, @PathVariable String runId) throws IOException { - adapter.getLogBytes(response.getOutputStream(), runId); + public void getStderr(HttpServletResponse response, @RequestHeader HttpHeaders headers, @PathVariable String runId) throws IOException { + adapter.getLogBytes(response.getOutputStream(), runId,getRangeFromHeaders(response,headers)); } @AuditActionUri("wes:run:stderr") @PreAuthorize("@accessEvaluator.canAccessResource('/ga4gh/wes/v1/runs/' + #runId, 'wes:runs:read', 'wes')") @GetMapping(value = "/runs/{runId}/logs/task/{taskId}/stderr", produces = MediaType.APPLICATION_OCTET_STREAM_VALUE) - public void getStderr(HttpServletResponse response, @PathVariable String runId, @PathVariable String taskId) throws IOException { - adapter.getLogBytes(response.getOutputStream(), runId, taskId, "stderr"); + public void getTaskStderr( + HttpServletResponse response, + @RequestHeader HttpHeaders headers, + @PathVariable String runId, + @PathVariable String taskId + ) throws IOException { + adapter.getLogBytes(response.getOutputStream(), runId, taskId, "stderr",getRangeFromHeaders(response,headers)); } @AuditActionUri("wes:run:stdout") @PreAuthorize("@accessEvaluator.canAccessResource('/ga4gh/wes/v1/runs/' + #runId, 'wes:runs:read', 'wes')") @GetMapping(value = "/runs/{runId}/logs/task/{taskId}/stdout", produces = MediaType.APPLICATION_OCTET_STREAM_VALUE) - public void getStdout(HttpServletResponse response, @PathVariable String runId, @PathVariable String taskId) throws IOException { - adapter.getLogBytes(response.getOutputStream(), runId, taskId, "stdout"); + public void getTaskStdout( + HttpServletResponse response, + @RequestHeader HttpHeaders headers, + @PathVariable String runId, + @PathVariable String taskId + ) throws IOException { + adapter.getLogBytes(response.getOutputStream(), runId, taskId, "stdout",getRangeFromHeaders(response,headers)); } @AuditActionUri("wes:run:stderr") @PreAuthorize("@accessEvaluator.canAccessResource('/ga4gh/wes/v1/runs/' + #runId, 'wes:runs:read', 'wes')") @GetMapping(value = "/runs/{runId}/logs/task/{taskName}/{index}/stderr", produces = MediaType.APPLICATION_OCTET_STREAM_VALUE) - public void getStderr(HttpServletResponse response, @PathVariable String runId, @PathVariable String taskName, @PathVariable int index) throws IOException { - adapter.getLogBytes(response.getOutputStream(), runId, taskName, index, "stderr"); + public void getTaskStderr( + HttpServletResponse response, + @RequestHeader HttpHeaders headers, + @PathVariable String runId, + @PathVariable String taskName, + @PathVariable int index + ) throws IOException { + adapter.getLogBytes(response.getOutputStream(), runId, taskName, index, "stderr",getRangeFromHeaders(response,headers)); } @AuditActionUri("wes:run:stdout") @PreAuthorize("@accessEvaluator.canAccessResource('/ga4gh/wes/v1/runs/' + #runId, 'wes:runs:read', 'wes')") @GetMapping(value = "/runs/{runId}/logs/task/{taskName}/{index}/stdout", produces = MediaType.APPLICATION_OCTET_STREAM_VALUE) - public void getStdout(HttpServletResponse response, @PathVariable String runId, @PathVariable String taskName, @PathVariable int index) throws IOException { - adapter.getLogBytes(response.getOutputStream(), runId, taskName, index, "stdout"); + public void getTaskStdout( + HttpServletResponse response, + @RequestHeader HttpHeaders headers, + @PathVariable String runId, + @PathVariable String taskName, + @PathVariable int index + ) throws IOException { + adapter.getLogBytes(response.getOutputStream(), runId, taskName, index, "stdout", getRangeFromHeaders(response, headers)); + } + + private HttpRange getRangeFromHeaders(HttpServletResponse response, HttpHeaders headers){ + List ranges = headers.getRange(); + if (ranges.isEmpty()){ + return null; + } else if (ranges.size() > 1) { + // only return the first range parsed + throw new RangeNotSatisfiableException("Streaming of multiple ranges is not supported"); + } else { + response.setStatus(HttpServletResponse.SC_PARTIAL_CONTENT); + return ranges.get(0); + } } } diff --git a/src/main/java/com/dnastack/wes/cromwell/CromwellService.java b/src/main/java/com/dnastack/wes/cromwell/CromwellService.java index 3301f2f..4e85589 100644 --- a/src/main/java/com/dnastack/wes/cromwell/CromwellService.java +++ b/src/main/java/com/dnastack/wes/cromwell/CromwellService.java @@ -19,6 +19,7 @@ import feign.FeignException; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpRange; import org.springframework.stereotype.Service; import org.springframework.web.multipart.MultipartFile; @@ -217,9 +218,10 @@ public RunId cancel(String runId) { return RunId.builder().runId(runId).build(); } - public void getLogBytes(OutputStream outputStream, String runId, String taskId, String logKey) throws IOException { + public void getLogBytes(OutputStream outputStream, String runId, String taskId, String logKey, HttpRange range) throws IOException { String logPath = getLogPath(runId, taskId, logKey); - storageClient.readBytes(outputStream, logPath, null, null); + + storageClient.readBytes(outputStream, logPath, range); } private String getLogPath(String runId, String taskId, String logKey) throws IOException { @@ -236,9 +238,9 @@ private String getLogPath(String runId, String taskId, String logKey) throws IOE } } - public void getLogBytes(OutputStream outputStream, String runId, String taskName, int index, String logKey) throws IOException { + public void getLogBytes(OutputStream outputStream, String runId, String taskName, int index, String logKey, HttpRange httpRange) throws IOException { String logPath = getLogPath(runId, taskName, index, logKey); - storageClient.readBytes(outputStream, logPath, null, null); + storageClient.readBytes(outputStream, logPath, httpRange); } //legacy @@ -258,10 +260,16 @@ private String getLogPath(String runId, String taskName, int index, String logKe } } - public void getLogBytes(OutputStream outputStream, String runId) throws IOException { + public void getLogBytes(OutputStream outputStream, String runId, HttpRange range) throws IOException { CromwellMetadataResponse response = client.getMetadata(runId); if (response.getFailures() != null) { - outputStream.write(mapper.writeValueAsBytes(response.getFailures())); + + byte[] bytes = mapper.writeValueAsBytes(response.getFailures()); + if (range != null) { + outputStream.write(Arrays.copyOfRange(bytes, (int) range.getRangeStart(bytes.length), (int) range.getRangeEnd(bytes.length) + 1)); + } else { + outputStream.write(bytes); + } } } diff --git a/src/main/java/com/dnastack/wes/shared/GlobalControllerExceptionHandler.java b/src/main/java/com/dnastack/wes/shared/GlobalControllerExceptionHandler.java index e648d33..3817db6 100644 --- a/src/main/java/com/dnastack/wes/shared/GlobalControllerExceptionHandler.java +++ b/src/main/java/com/dnastack/wes/shared/GlobalControllerExceptionHandler.java @@ -1,6 +1,7 @@ package com.dnastack.wes.shared; import com.dnastack.wes.api.ErrorResponse; +import com.dnastack.wes.api.RangeNotSatisfiableException; import com.dnastack.wes.workflow.UnauthorizedWorkflowException; import feign.FeignException; import org.springframework.http.ResponseEntity; @@ -39,4 +40,10 @@ public ResponseEntity handle(FeignException ex) { .body(ErrorResponse.builder().msg(ex.getMessage()).errorCode(ex.status()).build()); } + @ExceptionHandler(RangeNotSatisfiableException.class) + public ResponseEntity handle(RangeNotSatisfiableException ex) { + return ResponseEntity.status(416) + .body(ErrorResponse.builder().msg(ex.getMessage()).errorCode(416).build()); + } + } diff --git a/src/main/java/com/dnastack/wes/storage/AzureBlobStorageClient.java b/src/main/java/com/dnastack/wes/storage/AzureBlobStorageClient.java index dc6eda9..997def3 100644 --- a/src/main/java/com/dnastack/wes/storage/AzureBlobStorageClient.java +++ b/src/main/java/com/dnastack/wes/storage/AzureBlobStorageClient.java @@ -6,6 +6,7 @@ import com.azure.storage.blob.sas.BlobSasPermission; import com.azure.storage.blob.sas.BlobServiceSasSignatureValues; import com.dnastack.wes.shared.ConfigurationException; +import org.springframework.http.HttpRange; import org.springframework.web.util.UriComponentsBuilder; import java.io.BufferedInputStream; @@ -76,7 +77,7 @@ public String writeBytes(InputStream stream, long size, String stagingFolder, St String objectName = builder.pathSegment(stagingFolder, fileName).build().toString(); BlobClient blobClient = containerClient.getBlobClient(objectName); - if (blobClient.exists()) { + if (Boolean.TRUE.equals(blobClient.exists())) { throw new IOException("Could not write object " + fileName + "to target destination " + objectName + ". Object already exists"); } @@ -86,7 +87,7 @@ public String writeBytes(InputStream stream, long size, String stagingFolder, St } @Override - public void readBytes(OutputStream outputStream, String blobUri, Long rangeStart, Long rangeEnd) throws IOException { + public void readBytes(OutputStream outputStream, String blobUri, HttpRange httpRange) throws IOException { String containerName; String blobName; if (blobUri.startsWith("https")) { @@ -102,16 +103,18 @@ public void readBytes(OutputStream outputStream, String blobUri, Long rangeStart BlobContainerClient containerClient = client.getBlobContainerClient(containerName); BlobClient blobClient = containerClient.getBlobClient(blobName); - if (!blobClient.exists()) { + if (Boolean.FALSE.equals(blobClient.exists())) { throw new IOException("Could not read from blob: " + blobUri + ", object does not exist"); } - if (rangeStart == null) { - rangeStart = 0L; - } + final long blobSize = blobClient.getProperties().getBlobSize(); + + long rangeStart = 0; + long rangeEnd = blobSize; - if (rangeEnd == null) { - rangeEnd = blobClient.getProperties().getBlobSize(); + if (httpRange != null){ + rangeStart = httpRange.getRangeStart(blobSize); + rangeEnd = httpRange.getRangeEnd(blobSize); } BlobRange range = new BlobRange(rangeStart, rangeEnd - rangeStart); diff --git a/src/main/java/com/dnastack/wes/storage/BlobStorageClient.java b/src/main/java/com/dnastack/wes/storage/BlobStorageClient.java index c074e41..54e7d7d 100644 --- a/src/main/java/com/dnastack/wes/storage/BlobStorageClient.java +++ b/src/main/java/com/dnastack/wes/storage/BlobStorageClient.java @@ -1,5 +1,8 @@ package com.dnastack.wes.storage; +import org.springframework.http.HttpRange; + +import javax.annotation.Nullable; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -12,10 +15,10 @@ public interface BlobStorageClient { String writeBytes(InputStream stream, long uploadSize, String stagingFolder, String fileName) throws IOException; default void getBytes(OutputStream outputStream, String blobUri) throws IOException { - readBytes(outputStream, blobUri, null, null); + readBytes(outputStream, blobUri, null); } - void readBytes(OutputStream outputStream, String blobUri, Long rangeStart, Long rangeEnd) throws IOException; + void readBytes(OutputStream outputStream, String blobUri, @Nullable HttpRange httpRange) throws IOException; } diff --git a/src/main/java/com/dnastack/wes/storage/BoundedInputStream.java b/src/main/java/com/dnastack/wes/storage/BoundedInputStream.java new file mode 100644 index 0000000..10d16c9 --- /dev/null +++ b/src/main/java/com/dnastack/wes/storage/BoundedInputStream.java @@ -0,0 +1,171 @@ +package com.dnastack.wes.storage; + +import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; + +import java.io.IOException; +import java.io.InputStream; + +@Slf4j +public class BoundedInputStream extends InputStream { + + + /** + * The wrapped input stream. + */ + private final InputStream inputStream; + + + /** + * The limit, -1 if none. + */ + private final long maxPos; + + + /** + * The current position of the inner input stream. + */ + private long pos; + + /** + * The offset at which to start reading bytes in the inner input stream. + */ + private final long minPos; + + + /** + * Marks the input stream. + */ + private long mark; + + + /** + * Creates a new bounded input stream. + * + * @param in The input stream to wrap. + * @param size The maximum number of bytes to return, -1 if no limit. + */ + public BoundedInputStream(final InputStream in, final long size) { + this(in, 0L, size); + } + + /** + * Creates a new bounded input stream. + * + * @param in The input stream to wrap. + * @param minPos The starting offset for the bounded input stream; bytes before the offset will be skipped. + * @param size The maximum number of bytes to return, -1 if no limit. + */ + public BoundedInputStream(final InputStream in, final long minPos, final long size) { + this.minPos = minPos; + this.pos = 0L; + this.mark = -1L; + this.maxPos = size >= 0 ? minPos + size : -1L; + this.inputStream = in; + } + + @Override + public int read() throws IOException { + if (isEndOfStream()) { + return -1; + } + skipBytesBeforeOffset(); + int result = this.inputStream.read(); + if (result >= 0) { + this.pos++; + } + return result; + } + + @Override + public int read(@NotNull byte[] b) throws IOException { + return read(b, 0, b.length); + } + + @Override + public int read(@NotNull byte[] b, int off, int len) throws IOException { + if (isEndOfStream()) { + return -1; + } + skipBytesBeforeOffset(); + if (shouldLimitLength(len)) { + len = (int) (this.maxPos - this.pos); + } + int bytesRead = this.inputStream.read(b, off, len); + if (bytesRead == -1) { + return -1; + } else { + this.pos += bytesRead; + return bytesRead; + } + } + + private boolean isEndOfStream() { + return this.maxPos >= 0L && this.pos >= this.maxPos; + } + + private boolean shouldLimitLength(int len) { + return (this.pos + len > this.maxPos) && this.maxPos != -1; + } + + @Override + public long skip(long n) throws IOException { + long toSkip = this.maxPos >= 0L ? Math.min(n, this.maxPos - this.pos) : n; + long skippedBytes = this.inputStream.skip(toSkip); + this.pos += skippedBytes; + return skippedBytes; + } + + + @Override + public int available() throws IOException { + return this.maxPos >= 0L && this.pos >= this.maxPos ? 0 : this.inputStream.available(); + } + + + @Override + public String toString() { + return this.inputStream.toString(); + } + + + @Override + public void close() throws IOException { + this.inputStream.close(); + } + + + @Override + public synchronized void reset() throws IOException { + this.inputStream.reset(); + this.pos = this.mark; + } + + + @Override + public synchronized void mark(int readlimit) { + this.inputStream.mark(readlimit); + this.mark = this.pos; + } + + + @Override + public boolean markSupported() { + return this.inputStream.markSupported(); + } + + /** + * Try skipping bytes to the offset with the internal stream. + */ + private void skipBytesBeforeOffset() throws IOException { + final long toSkip = this.minPos - this.pos; + if (toSkip > 0) { + final long actuallySkipped = this.inputStream.skip(toSkip); + this.pos += actuallySkipped; + if (actuallySkipped != toSkip) { + log.debug("Could not skip {} bytes. Instead skipped {} bytes.", toSkip, actuallySkipped); + } + } + } + +} \ No newline at end of file diff --git a/src/main/java/com/dnastack/wes/storage/GcpBlobStorageClient.java b/src/main/java/com/dnastack/wes/storage/GcpBlobStorageClient.java index 1b0db51..a1ba292 100644 --- a/src/main/java/com/dnastack/wes/storage/GcpBlobStorageClient.java +++ b/src/main/java/com/dnastack/wes/storage/GcpBlobStorageClient.java @@ -9,6 +9,7 @@ import com.google.cloud.storage.Storage.BlobWriteOption; import com.google.cloud.storage.Storage.SignUrlOption; import lombok.extern.slf4j.Slf4j; +import org.springframework.http.HttpRange; import org.springframework.web.util.UriComponentsBuilder; import java.io.*; @@ -16,7 +17,6 @@ import java.net.URL; import java.nio.ByteBuffer; import java.nio.channels.Channels; -import java.nio.channels.WritableByteChannel; import java.util.concurrent.TimeUnit; @Slf4j @@ -87,7 +87,7 @@ public String writeBytes(InputStream blobStream, long size, String stagingFolder } @Override - public void readBytes(OutputStream outputStream, String blobUri, Long rangeStart, Long rangeEnd) throws IOException { + public void readBytes(OutputStream outputStream, String blobUri, HttpRange httpRange) throws IOException { BlobId blobId = GcpStorageUtils.blobIdFromGsUrl(blobUri); Blob blob = client.get(blobId); @@ -95,44 +95,24 @@ public void readBytes(OutputStream outputStream, String blobUri, Long rangeStart throw new FileNotFoundException("Could not open open file: " + blobUri + " it does not appear to exist"); } - if (rangeStart == null) { - rangeStart = 0L; - } - if (rangeEnd == null) { - rangeEnd = blob.getSize(); - } + long rangeStart = 0L; + long rangeEnd = blob.getSize(); - long totalBytesToRead = rangeEnd - rangeStart; - int bufferSize = 64 * 1024; + if (httpRange != null) { + rangeStart = httpRange.getRangeStart(blob.getSize()); + // httpRange is inclusive, but google limit is not + rangeEnd = httpRange.getRangeEnd(blob.getSize()) + 1; + } - if (totalBytesToRead <= bufferSize) { - outputStream.write(blob.getContent()); - } else { - try (ReadChannel reader = blob.reader(BlobSourceOption.userProject(project))) { - reader.seek(rangeStart); - try (WritableByteChannel writer = Channels.newChannel(outputStream)) { - long maxRead = totalBytesToRead - bufferSize; - - ByteBuffer byteBuffer = ByteBuffer.allocate(bufferSize); - int totalBytes = 0; - while ((totalBytes += reader.read(byteBuffer)) < maxRead) { - byteBuffer.flip(); - writer.write(byteBuffer); - byteBuffer.clear(); - } - - if (totalBytes < totalBytesToRead) { - byteBuffer = ByteBuffer.allocate((int) totalBytesToRead - totalBytes); - reader.read(byteBuffer); - byteBuffer.flip(); - writer.write(byteBuffer); - byteBuffer.clear(); - } + try (ReadChannel readChannel = blob.reader(BlobSourceOption.userProject(project))) { + readChannel.seek(rangeStart); + readChannel.limit(rangeEnd); + // the outer stream is + try (InputStream inputStream = Channels.newInputStream(readChannel)) { + inputStream.transferTo(outputStream); } - } } - } } diff --git a/src/main/java/com/dnastack/wes/storage/LocalBlobStorageClient.java b/src/main/java/com/dnastack/wes/storage/LocalBlobStorageClient.java index eece11d..b836351 100644 --- a/src/main/java/com/dnastack/wes/storage/LocalBlobStorageClient.java +++ b/src/main/java/com/dnastack/wes/storage/LocalBlobStorageClient.java @@ -1,13 +1,18 @@ package com.dnastack.wes.storage; import com.dnastack.wes.shared.ConfigurationException; +import org.springframework.http.HttpRange; import java.io.*; import java.net.MalformedURLException; import java.net.URI; import java.net.URL; +import java.nio.channels.Channel; +import java.nio.channels.Channels; +import java.nio.channels.FileChannel; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.Paths; public class LocalBlobStorageClient implements BlobStorageClient { @@ -47,8 +52,9 @@ public URL getSignedUrl(String blobUri) { @Override public String writeBytes(InputStream stream, long uploadSize, String stagingFolder, String fileName) throws IOException { - String path = stagingPath + "/" + stagingFolder + "/" + fileName; - File fileToWrite = new File(path); + + Path path = Paths.get(stagingPath, stagingFolder ,fileName); + File fileToWrite = path.toFile(); String filePath = fileToWrite.getAbsolutePath(); if (!filePath.startsWith(stagingPath)) { @@ -62,52 +68,30 @@ public String writeBytes(InputStream stream, long uploadSize, String stagingFold fileToWrite.getParentFile().mkdirs(); try (FileOutputStream fileOutputStream = new FileOutputStream(fileToWrite)) { - byte[] bytes = new byte[64 * 1024]; - int bytesRead; - while ((bytesRead = stream.read(bytes)) > 0) { - fileOutputStream.write(bytes, 0, bytesRead); - } + stream.transferTo(fileOutputStream); } return filePath; } @Override - public void readBytes(OutputStream outputStream, String blobUri, Long rangeStart, Long rangeEnd) throws IOException { + public void readBytes(OutputStream outputStream, String blobUri, HttpRange httpRange) throws IOException { File fileToRead = new File(blobUri); if (!fileToRead.exists()) { throw new IOException("Could not read from file: " + fileToRead + ". File does not exist"); } - if (rangeStart == null) { - rangeStart = 0L; + long rangeStart = 0L; + long rangeEnd = fileToRead.length(); + if (httpRange != null){ + rangeStart = httpRange.getRangeStart(fileToRead.length()); } - if (rangeEnd == null) { - rangeEnd = fileToRead.length(); - } - - - try (RandomAccessFile randomAccessFile = new RandomAccessFile(fileToRead, "r")) { - randomAccessFile.seek(rangeStart); - - int byteArrayLength = 64 * 1024; - - if (rangeEnd - rangeStart < byteArrayLength) { - byteArrayLength = rangeEnd.intValue() - rangeStart.intValue(); - } - - long bytesRemaining = rangeEnd - rangeStart; - byte[] bytesIn = new byte[byteArrayLength]; - int bytesRead; - while ((bytesRead = randomAccessFile.read(bytesIn)) > 0 && bytesRemaining > 0) { - if (bytesRead > bytesRemaining) { - bytesRead = (int) bytesRemaining; - } - - outputStream.write(bytesIn, 0, bytesRead); - bytesRemaining -= bytesRead; + try (FileChannel channel = new RandomAccessFile(fileToRead, "r").getChannel()) { + channel.position(rangeStart); + try (InputStream inputStream = new BoundedInputStream(Channels.newInputStream(channel),rangeEnd - rangeStart)){ + inputStream.transferTo(outputStream); } } } diff --git a/src/test/java/com/dnastack/wes/storage/client/local/LocalBlobStorageClientTest.java b/src/test/java/com/dnastack/wes/storage/client/local/LocalBlobStorageClientTest.java index f75eebe..fb3017b 100644 --- a/src/test/java/com/dnastack/wes/storage/client/local/LocalBlobStorageClientTest.java +++ b/src/test/java/com/dnastack/wes/storage/client/local/LocalBlobStorageClientTest.java @@ -4,6 +4,7 @@ import com.dnastack.wes.storage.LocalBlobStorageClientConfig; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.springframework.http.HttpRange; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -73,7 +74,7 @@ public void testReadingFile() throws IOException { Files.write(targetPath, toWrite.getBytes(), StandardOpenOption.CREATE_NEW); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - storageClient.readBytes(outputStream, targetPath.toString(), 0L, (long) toWrite.length()); + storageClient.readBytes(outputStream, targetPath.toString(), HttpRange.createByteRange(0L,toWrite.length())); String readValue = outputStream.toString(); Assertions.assertEquals(readValue, toWrite); @@ -90,7 +91,8 @@ public void testReadingFile_withTruncation() throws IOException { Files.write(targetPath, toWrite.getBytes(), StandardOpenOption.CREATE_NEW); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - storageClient.readBytes(outputStream, targetPath.toString(), 5L, (long) toWrite.length()); + + storageClient.readBytes(outputStream, targetPath.toString(), HttpRange.createByteRange(5L,toWrite.length())); String readValue = outputStream.toString(); Assertions.assertEquals(toWrite.substring(5), readValue); }