diff --git a/e2e-tests/src/main/java/com/dnastack/wes/service/WesE2ETest.java b/e2e-tests/src/main/java/com/dnastack/wes/service/WesE2ETest.java index c7fbe97..b0e9eeb 100644 --- a/e2e-tests/src/main/java/com/dnastack/wes/service/WesE2ETest.java +++ b/e2e-tests/src/main/java/com/dnastack/wes/service/WesE2ETest.java @@ -7,20 +7,23 @@ import com.google.auth.oauth2.GoogleCredentials; import io.restassured.builder.MultiPartSpecBuilder; import io.restassured.http.ContentType; -import io.restassured.http.Header; import io.restassured.specification.MultiPartSpecification; +import org.awaitility.Awaitility; import org.awaitility.core.ConditionFactory; import org.junit.jupiter.api.*; import org.junit.jupiter.api.TestInstance.Lifecycle; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import java.io.ByteArrayInputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.*; +import java.util.stream.Stream; import static io.restassured.RestAssured.given; -import static java.lang.String.format; import static org.awaitility.Awaitility.with; import static org.hamcrest.Matchers.*; @@ -569,6 +572,158 @@ public void listRunsReturnsReturnsNonEmptyCollection() { //@formatter:on } + + @ParameterizedTest + @MethodSource("completeWorkflowWithFilesProvider") + @DisplayName("Get Run Files for existing run returns all files") + public void getRunFilesReturnsNonEmptyCollection(String runId) { + String path = getRootPath() + "/runs/" + runId + "/files"; + + //@formatter:off + given() + .log().uri() + .log().method() + .header(getHeader(getResource(getRootPath() + "/runs/" + runId))) + .accept(ContentType.JSON) + .get(path) + .then() + .assertThat() + .statusCode(200) + .body("runFiles.size()", greaterThan(0)) + .body("runFiles.every { it.path != null && it.file_type in ['FINAL', 'SECONDARY', 'LOG'] }", equalTo(true)); + 
//@formatter:on + } + + + @Test + @DisplayName("Get Run Files for non-existent run fails with status 401 or 404") + public void getRunFilesForNonExistentRunShouldFail() { + String resourcePath = getRootPath() + "/runs/" + UUID.randomUUID(); + String path = resourcePath + "/files"; + + //@formatter:off + given() + .log().uri() + .log().method() + .header(getHeader(getResource(resourcePath))) + .accept(ContentType.JSON) + .get(path) + .then() + .assertThat() + .statusCode(anyOf(equalTo(404), equalTo(401))); + //@formatter:on + } + + + @ParameterizedTest + @MethodSource("completeWorkflowWithFilesProvider") + @DisplayName("Delete Run Files for existing run returns all deleted files") + public void deleteRunFilesReturnsNonEmptyCollection(String runId) { + String path = getRootPath() + "/runs/" + runId + "/files"; + + //@formatter:off + given() + .log().uri() + .log().method() + .header(getHeader(getResource(getRootPath() + "/runs/" + runId))) + .accept(ContentType.JSON) + .delete(path) + .then() + .assertThat() + .statusCode(200) + .body("deletions.size()", greaterThan(0)) + .body("deletions.every { it.path != null && it.file_type == 'SECONDARY' && it.state == 'DELETED' }", equalTo(true)); + //@formatter:on + } + + + @ParameterizedTest + @MethodSource("completeWorkflowWithFilesProvider") + @DisplayName("Delete Run Files for existing run asynchronously returns all deleted files") + public void deleteRunFilesAsyncReturnsNonEmptyCollection(String runId) { + String path = getRootPath() + "/runs/" + runId + "/files"; + + //@formatter:off + given() + .log().uri() + .log().method() + .header(getHeader(getResource(getRootPath() + "/runs/" + runId))) + .accept(ContentType.JSON) + .queryParam("async", true) + .delete(path) + .then() + .assertThat() + .statusCode(200) + .body("deletions.size()", greaterThan(0)) + .body("deletions.every { it.path != null && it.file_type == 'SECONDARY' && it.state == 'ASYNC' }", equalTo(true)); + //@formatter:on + + Awaitility.await() + 
.atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(5)) + .untilAsserted(() -> + //@formatter:off + given() + .log().uri() + .log().method() + .header(getHeader(getResource(getRootPath() + "/runs/" + runId))) + .accept(ContentType.JSON) + .get(path) + .then() + .assertThat() + .statusCode(200) + .body("runFiles.size()", greaterThan(0)) + .body("runFiles.every { it.path != null && it.file_type in ['FINAL', 'LOG'] }", equalTo(true))); + //@formatter:on + } + + + @Test + @DisplayName("Delete Run Files for non-existent run fails with status 401 or 404") + public void deleteRunFilesForNonExistentRunShouldFail() { + String resourcePath = getRootPath() + "/runs/" + UUID.randomUUID(); + String path = resourcePath + "/files"; + + //@formatter:off + given() + .log().uri() + .log().method() + .header(getHeader(getResource(resourcePath))) + .accept(ContentType.JSON) + .delete(path) + .then() + .assertThat() + .statusCode(anyOf(equalTo(404),equalTo(401))); + //@formatter:on + } + + + private Stream completeWorkflowWithFilesProvider() throws Exception { + String path = getRootPath() + "/runs"; + Map inputs = Collections.singletonMap("hello_world.name", "Some sort of String"); + + //@formatter:off + String workflowJobIdWithAllOutputTypes = given() + .log().uri() + .log().method() + .header(getHeader(getResource(path))) + .multiPart(getWorkflowUrlMultipart("echo.wdl")) + .multiPart(getMultipartAttachment("echo.wdl", supplier.getFileContent(WdlSupplier.WORKFLOW_WITH_ALL_OUTPUT_TYPES).getBytes())) + .multiPart(getJsonMultipart("workflow_params", inputs)) + .post(path) + .then() + .assertThat() + .statusCode(200) + .body("run_id", is(notNullValue())) + .extract() + .jsonPath() + .getString("run_id"); + //@formatter:on + + pollUntilJobCompletes(workflowJobIdWithAllOutputTypes); + return Stream.of(Arguments.of(workflowJobIdWithAllOutputTypes)); + } + } } diff --git a/e2e-tests/src/main/java/com/dnastack/wes/service/wdl/WdlSupplier.java 
b/e2e-tests/src/main/java/com/dnastack/wes/service/wdl/WdlSupplier.java index 3cb63c9..8f6ddf0 100644 --- a/e2e-tests/src/main/java/com/dnastack/wes/service/wdl/WdlSupplier.java +++ b/e2e-tests/src/main/java/com/dnastack/wes/service/wdl/WdlSupplier.java @@ -10,6 +10,7 @@ public class WdlSupplier { public static final String WORKFLOW_WITHOUT_FILE = "workflow_without_file.wdl"; + public static final String WORKFLOW_WITH_ALL_OUTPUT_TYPES = "workflow_with_all_output_types.wdl"; public static final String WORKFLOW_WITH_IMPORTS_1 = "workflow_with_imports_1.wdl"; public static final String WORKFLOW_WITH_IMPORTS_2 = "workflow_with_imports_2.wdl"; public static final String WORKFLOW_WITH_IMPORTS_INPUTS = "workflow_with_imports.json"; diff --git a/e2e-tests/src/main/resources/com/dnastack/wes/service/wdl/workflow_with_all_output_types.wdl b/e2e-tests/src/main/resources/com/dnastack/wes/service/wdl/workflow_with_all_output_types.wdl new file mode 100644 index 0000000..2ca9fe3 --- /dev/null +++ b/e2e-tests/src/main/resources/com/dnastack/wes/service/wdl/workflow_with_all_output_types.wdl @@ -0,0 +1,30 @@ +task echo { + String name + command { + echo "Hello ${name}" + >&2 echo "Goodbye ${name}" + echo "Bye" > "test.txt" + echo "Bye" > "test2.txt" + } + + runtime { + docker: "ubuntu" + } + + output { + File out = stdout() + File out2 = "test.txt" + Array[File] arrayOut = [out, out2, "test2.txt"] + } +} + +workflow hello_world { + String name + call echo { + input: + name = name + } + output { + File out = echo.out + } +} \ No newline at end of file diff --git a/src/main/java/com/dnastack/wes/api/RunFile.java b/src/main/java/com/dnastack/wes/api/RunFile.java new file mode 100644 index 0000000..e9cf622 --- /dev/null +++ b/src/main/java/com/dnastack/wes/api/RunFile.java @@ -0,0 +1,26 @@ +package com.dnastack.wes.api; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.*; + +@Getter +@Setter +@AllArgsConstructor +@NoArgsConstructor +@EqualsAndHashCode +@ToString 
+@Builder +public class RunFile { + + @JsonProperty(value = "file_type") + FileType fileType; + + String path; + + public enum FileType { + FINAL, + SECONDARY, + LOG + } + +} diff --git a/src/main/java/com/dnastack/wes/api/RunFileDeletion.java b/src/main/java/com/dnastack/wes/api/RunFileDeletion.java new file mode 100644 index 0000000..910a733 --- /dev/null +++ b/src/main/java/com/dnastack/wes/api/RunFileDeletion.java @@ -0,0 +1,13 @@ +package com.dnastack.wes.api; + +import com.fasterxml.jackson.annotation.JsonUnwrapped; + +public record RunFileDeletion(@JsonUnwrapped RunFile runFile, DeletionState state, @JsonUnwrapped ErrorResponse errorResponse) { + + public enum DeletionState { + DELETED, + ASYNC, + FAILED + } + +} diff --git a/src/main/java/com/dnastack/wes/api/RunFileDeletions.java b/src/main/java/com/dnastack/wes/api/RunFileDeletions.java new file mode 100644 index 0000000..0426752 --- /dev/null +++ b/src/main/java/com/dnastack/wes/api/RunFileDeletions.java @@ -0,0 +1,5 @@ +package com.dnastack.wes.api; + +import java.util.List; + +public record RunFileDeletions(List deletions) {} diff --git a/src/main/java/com/dnastack/wes/api/RunFiles.java b/src/main/java/com/dnastack/wes/api/RunFiles.java new file mode 100644 index 0000000..0835760 --- /dev/null +++ b/src/main/java/com/dnastack/wes/api/RunFiles.java @@ -0,0 +1,5 @@ +package com.dnastack.wes.api; + +import java.util.List; + +public record RunFiles(List runFiles) {} diff --git a/src/main/java/com/dnastack/wes/api/WesV1Controller.java b/src/main/java/com/dnastack/wes/api/WesV1Controller.java index a9f5a6c..541da8e 100644 --- a/src/main/java/com/dnastack/wes/api/WesV1Controller.java +++ b/src/main/java/com/dnastack/wes/api/WesV1Controller.java @@ -123,11 +123,28 @@ public RunId cancelRun(@PathVariable("runId") String runId) { return adapter.cancel(runId); } + @AuditActionUri("wes:run:files:list") + @PreAuthorize("@accessEvaluator.canAccessResource('/ga4gh/wes/v1/runs/' + #runId + '/files', 'wes:runs:read', 
'wes')") + @GetMapping(value = "/runs/{run_id}/files", produces = { MediaType.APPLICATION_JSON_VALUE }) + public RunFiles getRunFiles(@PathVariable("run_id") String runId) { + return adapter.getRunFiles(runId); + } + + @AuditActionUri("wes:run:files:delete") + @PreAuthorize("@accessEvaluator.canAccessResource('/ga4gh/wes/v1/runs/' + #runId + '/files', 'wes:runs:write', 'wes')") + @DeleteMapping(value = "/runs/{run_id}/files", produces = { MediaType.APPLICATION_JSON_VALUE }) + public RunFileDeletions deleteRunFiles( + @PathVariable("run_id") String runId, + @RequestParam(value = "async", required = false) boolean async + ) { + return adapter.deleteRunFiles(runId, async); + } + @AuditActionUri("wes:run:stderr") @PreAuthorize("@accessEvaluator.canAccessResource('/ga4gh/wes/v1/runs/' + #runId, 'wes:runs:read', 'wes')") @GetMapping(value = "/runs/{runId}/logs/stderr", produces = MediaType.APPLICATION_OCTET_STREAM_VALUE) public void getStderr(HttpServletResponse response, @RequestHeader HttpHeaders headers, @PathVariable String runId) throws IOException { - adapter.getLogBytes(response.getOutputStream(), runId,getRangeFromHeaders(response,headers)); + adapter.getLogBytes(response.getOutputStream(), runId, getRangeFromHeaders(response, headers)); } @AuditActionUri("wes:run:stderr") @@ -139,7 +156,7 @@ public void getTaskStderr( @PathVariable String runId, @PathVariable String taskId ) throws IOException { - adapter.getLogBytes(response.getOutputStream(), runId, taskId, "stderr",getRangeFromHeaders(response,headers)); + adapter.getLogBytes(response.getOutputStream(), runId, taskId, "stderr", getRangeFromHeaders(response, headers)); } @AuditActionUri("wes:run:stdout") @@ -151,7 +168,7 @@ public void getTaskStdout( @PathVariable String runId, @PathVariable String taskId ) throws IOException { - adapter.getLogBytes(response.getOutputStream(), runId, taskId, "stdout",getRangeFromHeaders(response,headers)); + adapter.getLogBytes(response.getOutputStream(), runId, taskId, 
"stdout", getRangeFromHeaders(response, headers)); } @AuditActionUri("wes:run:stderr") @@ -164,7 +181,7 @@ public void getTaskStderr( @PathVariable String taskName, @PathVariable int index ) throws IOException { - adapter.getLogBytes(response.getOutputStream(), runId, taskName, index, "stderr",getRangeFromHeaders(response,headers)); + adapter.getLogBytes(response.getOutputStream(), runId, taskName, index, "stderr", getRangeFromHeaders(response, headers)); } @AuditActionUri("wes:run:stdout") @@ -180,9 +197,9 @@ public void getTaskStdout( adapter.getLogBytes(response.getOutputStream(), runId, taskName, index, "stdout", getRangeFromHeaders(response, headers)); } - private HttpRange getRangeFromHeaders(HttpServletResponse response, HttpHeaders headers){ + private HttpRange getRangeFromHeaders(HttpServletResponse response, HttpHeaders headers) { List ranges = headers.getRange(); - if (ranges.isEmpty()){ + if (ranges.isEmpty()) { return null; } else if (ranges.size() > 1) { // only return the first range parsed diff --git a/src/main/java/com/dnastack/wes/config/AsyncConfig.java b/src/main/java/com/dnastack/wes/config/AsyncConfig.java new file mode 100644 index 0000000..c6d9d2b --- /dev/null +++ b/src/main/java/com/dnastack/wes/config/AsyncConfig.java @@ -0,0 +1,32 @@ +package com.dnastack.wes.config; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.task.TaskExecutor; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.ThreadPoolExecutor; + +@Slf4j +@Configuration +public class AsyncConfig { + + @Bean + public TaskExecutor defaultAsyncOperationExecutor( + @Value("${app.executors.default.core-pool-size:8}") int corePoolSize, + @Value("${app.executors.default.max-pool-size:16}") int maxPoolSize, + 
@Value("${app.executors.default.queue-capacity:5000}") int queueCapacity + ) { + final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(corePoolSize); + executor.setMaxPoolSize(maxPoolSize); + executor.setQueueCapacity(queueCapacity); + executor.setThreadNamePrefix("defaultAsyncOp-"); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + executor.initialize(); + return executor; + } + +} diff --git a/src/main/java/com/dnastack/wes/cromwell/CromwellService.java b/src/main/java/com/dnastack/wes/cromwell/CromwellService.java index 4e85589..b065521 100644 --- a/src/main/java/com/dnastack/wes/cromwell/CromwellService.java +++ b/src/main/java/com/dnastack/wes/cromwell/CromwellService.java @@ -13,12 +13,14 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.TextNode; import feign.FeignException; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.task.TaskExecutor; import org.springframework.http.HttpRange; import org.springframework.stereotype.Service; import org.springframework.web.multipart.MultipartFile; @@ -31,6 +33,7 @@ import java.nio.file.Paths; import java.util.*; import java.util.Map.Entry; +import java.util.concurrent.CompletableFuture; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -51,6 +54,7 @@ public class CromwellService { private final PathTranslatorFactory pathTranslatorFactory; private final CromwellWesMapper cromwellWesMapper; private final CromwellConfig cromwellConfig; + private final TaskExecutor defaultAsyncOperationExecutor; private 
final AppConfig appConfig; @@ -61,7 +65,8 @@ public class CromwellService { PathTranslatorFactory pathTranslatorFactory, CromwellWesMapper cromwellWesMapper, AppConfig appConfig, - CromwellConfig config + CromwellConfig config, + TaskExecutor defaultAsyncOperationExecutor ) { this.client = cromwellClient; this.pathTranslatorFactory = pathTranslatorFactory; @@ -69,6 +74,7 @@ public class CromwellService { this.cromwellWesMapper = cromwellWesMapper; this.appConfig = appConfig; this.cromwellConfig = config; + this.defaultAsyncOperationExecutor = defaultAsyncOperationExecutor; } @@ -211,13 +217,83 @@ private CromwellStatus getStatus(String runId) { * * @param runId The cromwell id * - * @return a complete run log + * @return the cromwell id */ public RunId cancel(String runId) { client.abortWorkflow(runId); return RunId.builder().runId(runId).build(); } + /** + * Get the files for a specific run. + * + * @param runId The cromwell id + * + * @return a list of generated files for the run + */ + public RunFiles getRunFiles(String runId) { + CromwellMetadataResponse metadataResponse = getMetadata(runId); + Set finalFileSet = new HashSet<>(); + Set secondaryFileSet = new HashSet<>(); + Set logFileSet = new HashSet<>(); + List files = new ArrayList<>(); + + Map outputs = metadataResponse.getOutputs(); + if (outputs != null && !outputs.isEmpty()) { + outputs.values().forEach(output -> extractFilesFromValue(finalFileSet, mapper.valueToTree(output))); + } + extractSecondaryLogFiles(secondaryFileSet, logFileSet, metadataResponse); + + finalFileSet.forEach(path -> files.add(new RunFile(RunFile.FileType.FINAL, path))); + secondaryFileSet.forEach(path -> { + if (!finalFileSet.contains(path) && !logFileSet.contains(path)) { + files.add(new RunFile(RunFile.FileType.SECONDARY, path)); + } + }); + logFileSet.forEach(path -> { + if (!finalFileSet.contains(path)) { + files.add(new RunFile(RunFile.FileType.LOG, path)); + } + }); + return new RunFiles(files); + } + + /** + * Request to 
delete the files associated with the run. + * + * @param runId The cromwell id + * @param async when true each deletion is scheduled on the async executor and reported as ASYNC + * @return the deletion outcome of every SECONDARY file of the run + */ + public RunFileDeletions deleteRunFiles(String runId, boolean async) { + List files = getRunFiles(runId).runFiles(); + List outcomes = files.stream().filter(runFile -> RunFile.FileType.SECONDARY.equals(runFile.getFileType())) + .map(runFile -> { + if (async) { + return deleteRunFileAsync(runFile); + } else { + return deleteRunFile(runFile); + } + }).toList(); + return new RunFileDeletions(outcomes); + } + + public RunFileDeletion deleteRunFileAsync(RunFile runFile) { + CompletableFuture.runAsync(() -> deleteRunFile(runFile), defaultAsyncOperationExecutor); + return new RunFileDeletion(runFile, RunFileDeletion.DeletionState.ASYNC, null); + } + + public RunFileDeletion deleteRunFile(RunFile runFile) { + try { + log.info("Deleting file '{}'", runFile.getPath()); + storageClient.deleteFile(runFile.getPath()); + return new RunFileDeletion(runFile, RunFileDeletion.DeletionState.DELETED, null); + } catch (IOException | RuntimeException e) { // the Azure/GCP deleteFile overrides declare no checked exceptions, so their failures are unchecked + log.error("Encountered exception while deleting file '%s': '%s'".formatted(runFile.getPath(), e.getMessage()), e); + return new RunFileDeletion(runFile, RunFileDeletion.DeletionState.FAILED, ErrorResponse.builder().errorCode(400).msg(e.getMessage()).build()); + } + } + public void getLogBytes(OutputStream outputStream, String runId, String taskId, String logKey, HttpRange range) throws IOException { String logPath = getLogPath(runId, taskId, logKey); @@ -448,6 +524,60 @@ private JsonNode extractJsonNode(String value) throws IOException { } } + private void extractSecondaryLogFiles(Set secondaryFileSet, Set logFileSet, CromwellMetadataResponse metadataResponse){ + Map outputs = metadataResponse.getOutputs(); + if (outputs != null && !outputs.isEmpty()) { + outputs.values().forEach(output -> extractFilesFromValue(secondaryFileSet, mapper.valueToTree(output))); + } + Map> calls = metadataResponse.getCalls(); + if (calls != null && 
!calls.isEmpty()) { + calls.values().stream().flatMap(List::stream).forEach(call -> extractSecondaryLogFilesFromCall(secondaryFileSet, logFileSet, call)); + } + } + + private void extractSecondaryLogFilesFromCall(Set secondaryFileSet, Set logFileSet, CromwellTaskCall call){ + Map outputs = call.getOutputs(); + if (outputs != null && !outputs.isEmpty()) { + outputs.values().forEach(output -> extractFilesFromValue(secondaryFileSet, mapper.valueToTree(output))); + } + String stderr = call.getStderr(); + String stdout = call.getStdout(); + if (stderr != null && storageClient.isFile(stderr)) { + logFileSet.add(stderr); + } + if (stdout != null && storageClient.isFile(stdout)) { + logFileSet.add(stdout); + } + Map backendLogs = call.getBackendLogs(); + if (backendLogs != null && !backendLogs.isEmpty()) { + backendLogs.values().forEach(log -> extractFilesFromValue(logFileSet, mapper.valueToTree(log))); + } + CromwellMetadataResponse subWorkflowMetadata = call.getSubWorkflowMetadata(); + if (subWorkflowMetadata != null) { + extractSecondaryLogFiles(secondaryFileSet, logFileSet, subWorkflowMetadata); + } + } + + private void extractFilesFromValue(Set fileSet, JsonNode node) { + if (node.isTextual()) { + if (storageClient.isFile(node.asText())) { + fileSet.add(node.asText()); + } + } else if (node.isArray()) { + extractFilesFromArrayNode(fileSet, (ArrayNode) node); + } else if (node.isObject()) { + extractFilesFromObjectNode(fileSet, (ObjectNode) node); + } + } + + private void extractFilesFromArrayNode(Set fileSet, ArrayNode outputs) { + outputs.forEach(output -> extractFilesFromValue(fileSet, output)); + } + + private void extractFilesFromObjectNode(Set fileSet, ObjectNode outputs) { + outputs.forEach(output -> extractFilesFromValue(fileSet, output)); + } + private void setWorkflowSourceAndDependencies(Path tempDirectory, RunRequest runRequest, CromwellExecutionRequest cromwellRequest) throws IOException { if (runRequest.getWorkflowAttachments() == null || 
runRequest.getWorkflowAttachments().length == 0) { throw new InvalidRequestException("Url provided is relative however no workflowAttachments are defined"); diff --git a/src/main/java/com/dnastack/wes/storage/AzureBlobStorageClient.java b/src/main/java/com/dnastack/wes/storage/AzureBlobStorageClient.java index 997def3..b322d9d 100644 --- a/src/main/java/com/dnastack/wes/storage/AzureBlobStorageClient.java +++ b/src/main/java/com/dnastack/wes/storage/AzureBlobStorageClient.java @@ -20,7 +20,6 @@ public class AzureBlobStorageClient implements BlobStorageClient { - private final BlobServiceClient client; private final long signedUrlTtl; private final String container; @@ -49,7 +48,6 @@ public AzureBlobStorageClient(AzureBlobStorageClientConfig config) { stagingPath = config.getStagingPath(); } - @Override public URL getSignedUrl(String blobUri) { BlobUrlParts parts = BlobUrlParts.parse(blobUri); @@ -99,7 +97,6 @@ public void readBytes(OutputStream outputStream, String blobUri, HttpRange httpR containerName = container; } - BlobContainerClient containerClient = client.getBlobContainerClient(containerName); BlobClient blobClient = containerClient.getBlobClient(blobName); @@ -122,4 +119,18 @@ public void readBytes(OutputStream outputStream, String blobUri, HttpRange httpR .setMaxRetryRequests(3), null, false, null, null); } + @Override + public boolean isFile(String filePath) { + try { + return client.getBlobContainerClient(container).getBlobClient(filePath).exists(); + } catch (IllegalArgumentException e) { + return false; + } + } + + @Override + public void deleteFile(String filePath) { + client.getBlobContainerClient(container).getBlobClient(filePath).delete(); + } + } diff --git a/src/main/java/com/dnastack/wes/storage/BlobStorageClient.java b/src/main/java/com/dnastack/wes/storage/BlobStorageClient.java index 54e7d7d..2ad51aa 100644 --- a/src/main/java/com/dnastack/wes/storage/BlobStorageClient.java +++ b/src/main/java/com/dnastack/wes/storage/BlobStorageClient.java 
@@ -18,7 +18,10 @@ default void getBytes(OutputStream outputStream, String blobUri) throws IOExcept readBytes(outputStream, blobUri, null); } - void readBytes(OutputStream outputStream, String blobUri, @Nullable HttpRange httpRange) throws IOException; + boolean isFile(String filePath); + + void deleteFile(String filePath) throws IOException; + } diff --git a/src/main/java/com/dnastack/wes/storage/GcpBlobStorageClient.java b/src/main/java/com/dnastack/wes/storage/GcpBlobStorageClient.java index a1ba292..3658f3b 100644 --- a/src/main/java/com/dnastack/wes/storage/GcpBlobStorageClient.java +++ b/src/main/java/com/dnastack/wes/storage/GcpBlobStorageClient.java @@ -52,7 +52,6 @@ public GcpBlobStorageClient(GcpBlobStorageConfig config) throws IOException { project = config.getProject(); } - @Override public URL getSignedUrl(String blobUri) { BlobId blobId = GcpStorageUtils.blobIdFromGsUrl(blobUri); @@ -95,7 +94,6 @@ public void readBytes(OutputStream outputStream, String blobUri, HttpRange httpR throw new FileNotFoundException("Could not open open file: " + blobUri + " it does not appear to exist"); } - long rangeStart = 0L; long rangeEnd = blob.getSize(); @@ -115,4 +113,18 @@ public void readBytes(OutputStream outputStream, String blobUri, HttpRange httpR } } + @Override + public boolean isFile(String filePath) { + try { + return client.get(GcpStorageUtils.blobIdFromGsUrl(filePath)) != null; // Storage.get returns null for a missing blob; dereferencing it would throw + } catch (IllegalArgumentException e) { + return false; + } + } + + @Override + public void deleteFile(String filePath) { + client.delete(GcpStorageUtils.blobIdFromGsUrl(filePath)); + } + } diff --git a/src/main/java/com/dnastack/wes/storage/LocalBlobStorageClient.java b/src/main/java/com/dnastack/wes/storage/LocalBlobStorageClient.java index b836351..2a1ae6e 100644 --- a/src/main/java/com/dnastack/wes/storage/LocalBlobStorageClient.java +++ b/src/main/java/com/dnastack/wes/storage/LocalBlobStorageClient.java @@ -7,7 +7,6 @@ import java.net.MalformedURLException; import 
java.net.URI; import java.net.URL; -import java.nio.channels.Channel; import java.nio.channels.Channels; import java.nio.channels.FileChannel; import java.nio.file.Files; @@ -36,7 +35,6 @@ public LocalBlobStorageClient(LocalBlobStorageClientConfig config) throws IOExce } - public String getStagingPath() { return stagingPath; } @@ -87,13 +85,26 @@ public void readBytes(OutputStream outputStream, String blobUri, HttpRange httpR rangeStart = httpRange.getRangeStart(fileToRead.length()); } - try (FileChannel channel = new RandomAccessFile(fileToRead, "r").getChannel()) { channel.position(rangeStart); - try (InputStream inputStream = new BoundedInputStream(Channels.newInputStream(channel),rangeEnd - rangeStart)){ + try (InputStream inputStream = new BoundedInputStream(Channels.newInputStream(channel),rangeEnd - rangeStart)) { inputStream.transferTo(outputStream); } } } + @Override + public boolean isFile(String filePath) { + try { + return filePath.startsWith("/") && Files.isRegularFile(Path.of(filePath)); // isRegularFile: an existing directory must not be reported as a file + } catch (IllegalArgumentException e) { + return false; + } + } + + @Override + public void deleteFile(String filePath) throws IOException { + Files.delete(Path.of(filePath)); + } + } diff --git a/src/test/java/com/dnastack/wes/storage/client/local/LocalBlobStorageClientTest.java b/src/test/java/com/dnastack/wes/storage/client/local/LocalBlobStorageClientTest.java index fb3017b..cd57938 100644 --- a/src/test/java/com/dnastack/wes/storage/client/local/LocalBlobStorageClientTest.java +++ b/src/test/java/com/dnastack/wes/storage/client/local/LocalBlobStorageClientTest.java @@ -15,6 +15,9 @@ class LocalBlobStorageClientTest { + private final String toWrite = "Test String"; + private final String directory = "test"; + private final String fileName = "test.txt"; @Test public void testCreateStorageClient_noStagingPath() throws IOException { @@ -36,10 +39,6 @@ public void testCreateStorageClient_withStagingPath() throws IOException { @Test public void testWritingBytesToFile() 
throws IOException { LocalBlobStorageClient storageClient = new LocalBlobStorageClient(); - String toWrite = "Test String"; - String directory = "test"; - String fileName = "test.txt"; - Path targetPath = Path.of(storageClient.getStagingPath() + "/" + directory + "/" + fileName); storageClient.writeBytes(new ByteArrayInputStream(toWrite.getBytes()), toWrite.length(), directory, fileName); @@ -51,9 +50,6 @@ public void testWritingBytesToFile() throws IOException { @Test public void testWritingBytesToFile_existingFileThrowsError() throws IOException { LocalBlobStorageClient storageClient = new LocalBlobStorageClient(); - String toWrite = "Test String"; - String directory = "test"; - String fileName = "test.txt"; Path targetPath = Path.of(storageClient.getStagingPath() + "/" + directory + "/" + fileName); Files.createDirectory(targetPath.getParent()); Files.write(targetPath, toWrite.getBytes(), StandardOpenOption.CREATE_NEW); @@ -66,12 +62,7 @@ public void testWritingBytesToFile_existingFileThrowsError() throws IOException @Test public void testReadingFile() throws IOException { LocalBlobStorageClient storageClient = new LocalBlobStorageClient(); - String toWrite = "Test String"; - String directory = "test"; - String fileName = "test.txt"; - Path targetPath = Path.of(storageClient.getStagingPath() + "/" + directory + "/" + fileName); - Files.createDirectory(targetPath.getParent()); - Files.write(targetPath, toWrite.getBytes(), StandardOpenOption.CREATE_NEW); + Path targetPath = createFile(); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); storageClient.readBytes(outputStream, targetPath.toString(), HttpRange.createByteRange(0L,toWrite.length())); @@ -83,12 +74,7 @@ public void testReadingFile() throws IOException { @Test public void testReadingFile_withTruncation() throws IOException { LocalBlobStorageClient storageClient = new LocalBlobStorageClient(); - String toWrite = "Test String"; - String directory = "test"; - String fileName = "test.txt"; - 
Path targetPath = Path.of(storageClient.getStagingPath() + "/" + directory + "/" + fileName); - Files.createDirectory(targetPath.getParent()); - Files.write(targetPath, toWrite.getBytes(), StandardOpenOption.CREATE_NEW); + Path targetPath = createFile(); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); @@ -97,4 +83,46 @@ public void testReadingFile_withTruncation() throws IOException { Assertions.assertEquals(toWrite.substring(5), readValue); } + @Test + public void testIsFile() throws IOException { + LocalBlobStorageClient storageClient = new LocalBlobStorageClient(); + Path targetPath = createFile(); + + Assertions.assertTrue(storageClient.isFile(targetPath.toString())); + } + + @Test + public void testIsFile_noFileExists() throws IOException { + LocalBlobStorageClient storageClient = new LocalBlobStorageClient(); + Path targetPath = Path.of(directory + "/" + fileName); + + Assertions.assertFalse(storageClient.isFile(targetPath.toString())); + } + + @Test + public void testDeletingFile() throws IOException { + LocalBlobStorageClient storageClient = new LocalBlobStorageClient(); + Path targetPath = createFile(); + + Assertions.assertTrue(Files.exists(targetPath)); + storageClient.deleteFile(targetPath.toString()); + Assertions.assertFalse(Files.exists(targetPath)); + } + + @Test + public void testDeletingFile_throwsError() throws IOException { + LocalBlobStorageClient storageClient = new LocalBlobStorageClient(); + Path targetPath = Path.of(directory + "/" + fileName); + + Assertions.assertFalse(Files.exists(targetPath)); + Assertions.assertThrows(IOException.class, () -> storageClient.deleteFile(targetPath.toString())); + } + + private Path createFile() throws IOException { + Path tempDir = Files.createTempDirectory(directory); + Path targetPath = tempDir.resolve(fileName); + Files.write(targetPath, toWrite.getBytes(), StandardOpenOption.CREATE_NEW); + return targetPath; + } + } \ No newline at end of file diff --git 
a/workflow_execution_service.swagger.yaml b/workflow_execution_service.swagger.yaml index 334b71e..a2e297f 100644 --- a/workflow_execution_service.swagger.yaml +++ b/workflow_execution_service.swagger.yaml @@ -377,6 +377,106 @@ paths: format: int64 tags: - WorkflowExecutionService + /runs/{run_id}/files: + get: + summary: Get files associated with a workflow run. + description: >- + This endpoint provides all the generated files from the associated storage + client for a workflow run. The returned result has information about the + files of the given run, including the path and the type of the given file. + x-swagger-router-controller: ga4gh.wes.server + operationId: GetRunFiles + responses: + '200': + description: '' + schema: + $ref: '#/definitions/RunFiles' + '401': + description: The request is unauthorized. + schema: + $ref: '#/definitions/ErrorResponse' + '404': + description: The requested workflow run not found. + schema: + $ref: '#/definitions/ErrorResponse' + '403': + description: The requester is not authorized to perform this action. + schema: + $ref: '#/definitions/ErrorResponse' + '500': + description: An unexpected error occurred. + schema: + $ref: '#/definitions/ErrorResponse' + parameters: + - name: run_id + in: path + required: true + type: string + - name: orgId + in: path + required: true + type: integer + format: int64 + - name: projectId + in: path + required: true + type: integer + format: int64 + tags: + - WorkflowExecutionService + delete: + summary: Cleanup files associated with a workflow run. + description: >- + This endpoint cleans up the generated files associated with a workflow run. + + + The `async` query parameter is by default false, blocking the request until all + files are cleaned up. When true the delete operation will be done asynchronously + with faster execution but higher risk of not doing a "clean" cleanup. 
+ x-swagger-router-controller: ga4gh.wes.server + operationId: DeleteRunFiles + responses: + '200': + description: '' + schema: + $ref: '#/definitions/RunFileDeletions' + '401': + description: The request is unauthorized. + schema: + $ref: '#/definitions/ErrorResponse' + '404': + description: The requested workflow run not found. + schema: + $ref: '#/definitions/ErrorResponse' + '403': + description: The requester is not authorized to perform this action. + schema: + $ref: '#/definitions/ErrorResponse' + '500': + description: An unexpected error occurred. + schema: + $ref: '#/definitions/ErrorResponse' + parameters: + - name: run_id + in: path + required: true + type: string + - name: orgId + in: path + required: true + type: integer + format: int64 + - name: projectId + in: path + required: true + type: integer + format: int64 + - name: async + in: query + type: boolean + default: false + tags: + - WorkflowExecutionService definitions: DefaultWorkflowEngineParameter: type: object @@ -553,6 +653,21 @@ definitions: - CANCELING: The task was canceled by the user, and is in the process of stopping. + FileType: + type: string + enum: + - FINAL + - SECONDARY + - LOG + default: FINAL + description: >- + - FINAL: The file exists directly as an output of the metadata response for the specified run. + + - SECONDARY: The file exists as a part of a task call output or a sub-workflow metadata + response for the specified run + + - LOG: The file exists as a part of the backend logs or stderr and stdout for the workflow or + sub-workflow metadata for the specified run. 
RunListResponse: type: object properties: @@ -653,6 +768,54 @@ definitions: state: $ref: '#/definitions/State' description: Small description of a workflow run, returned by server during listing + RunFiles: + type: object + required: + - run_id + properties: + files: + type: array + items: + $ref: '#/definitions/RunFile' + RunFile: + type: object + properties: + file_type: + $ref: '#/definitions/FileType' + path: + type: string + description: The path of the file. + RunFileDeletions: + type: object + required: + - run_id + properties: + deletions: + type: array + items: + $ref: '#/definitions/RunFileDeletion' + RunFileDeletion: + type: object + properties: + path: + type: string + description: The path of the file. + file_type: + $ref: '#/definitions/FileType' + state: + type: string + enum: + - DELETED + - ASYNC + - FAILED + description: >- + - DELETED: The file has been successfully deleted from storage. + + + - ASYNC: The file is being deleted asynchronously. + + + - FAILED: An error occurred during the process of deleting the file. WorkflowTypeVersion: type: object properties: