From a051f8bd7d86f39351b1b66cc11bc70ab8a5b106 Mon Sep 17 00:00:00 2001 From: Sean-DNAstack Date: Mon, 16 Oct 2023 16:56:02 -0400 Subject: [PATCH 1/8] Create new endpoint to get run files --- .../java/com/dnastack/wes/api/RunFile.java | 27 +++++++ .../java/com/dnastack/wes/api/RunFiles.java | 28 +++++++ .../com/dnastack/wes/api/WesV1Controller.java | 25 +++--- .../wes/cromwell/CromwellService.java | 73 +++++++++++++++++- .../wes/storage/AzureBlobStorageClient.java | 5 ++ .../wes/storage/BlobStorageClient.java | 3 +- .../wes/storage/GcpBlobStorageClient.java | 5 ++ .../wes/storage/LocalBlobStorageClient.java | 6 +- workflow_execution_service.swagger.yaml | 77 +++++++++++++++++++ 9 files changed, 234 insertions(+), 15 deletions(-) create mode 100644 src/main/java/com/dnastack/wes/api/RunFile.java create mode 100644 src/main/java/com/dnastack/wes/api/RunFiles.java diff --git a/src/main/java/com/dnastack/wes/api/RunFile.java b/src/main/java/com/dnastack/wes/api/RunFile.java new file mode 100644 index 0000000..b09c0a1 --- /dev/null +++ b/src/main/java/com/dnastack/wes/api/RunFile.java @@ -0,0 +1,27 @@ +package com.dnastack.wes.api; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.*; + +@Getter +@Setter +@AllArgsConstructor +@NoArgsConstructor +@EqualsAndHashCode +@ToString +@Builder +public class RunFile { + + @JsonProperty(value = "type") + Enum type; + + @JsonProperty(value = "path") + String path; + + public enum type { + FINAL, + SECONDARY, + LOG + } + +} diff --git a/src/main/java/com/dnastack/wes/api/RunFiles.java b/src/main/java/com/dnastack/wes/api/RunFiles.java new file mode 100644 index 0000000..71b62f8 --- /dev/null +++ b/src/main/java/com/dnastack/wes/api/RunFiles.java @@ -0,0 +1,28 @@ +package com.dnastack.wes.api; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.*; + +import java.util.ArrayList; +import java.util.List; + +@Getter +@Setter +@AllArgsConstructor +@EqualsAndHashCode +@NoArgsConstructor +@ToString +@Builder +public class RunFiles { + + @JsonProperty("files") + List files; + + public void addRunFile(RunFile runFile) { + if (files == null) { + files = new ArrayList<>(); + } + files.add(runFile); + } + +} diff --git a/src/main/java/com/dnastack/wes/api/WesV1Controller.java b/src/main/java/com/dnastack/wes/api/WesV1Controller.java index a9f5a6c..c343cea 100644 --- a/src/main/java/com/dnastack/wes/api/WesV1Controller.java +++ b/src/main/java/com/dnastack/wes/api/WesV1Controller.java @@ -105,9 +105,7 @@ public RunListResponse getRuns( @AuditActionUri("wes:run:read") @PreAuthorize("@accessEvaluator.canAccessResource('/ga4gh/wes/v1/runs/'+#runId, 'wes:runs:read', 'wes')") @GetMapping(value = "/runs/{run_id}", produces = { MediaType.APPLICATION_JSON_VALUE }) - public RunLog getRun(@PathVariable("run_id") String runId) { - return adapter.getRun(runId); - } + public RunLog getRun(@PathVariable("run_id") String runId) { return adapter.getRun(runId); } @AuditActionUri("wes:run:status") @PreAuthorize("@accessEvaluator.canAccessResource('/ga4gh/wes/v1/runs/' + #runId , 'wes:runs:read', 'wes')") @@ -119,15 +117,18 @@ public RunStatus getRunStatus(@PathVariable("run_id") String runId) { @AuditActionUri("wes:run:cancel") @PreAuthorize("@accessEvaluator.canAccessResource('/ga4gh/wes/v1/runs/' + #runId, 'wes:runs:cancel', 'wes')") @PostMapping(path = "/runs/{runId}/cancel", produces = MediaType.APPLICATION_JSON_VALUE) - public RunId cancelRun(@PathVariable("runId") String runId) { - return adapter.cancel(runId); - } + public RunId cancelRun(@PathVariable("runId") String runId) { return adapter.cancel(runId); } + + @AuditActionUri("wes:run:files") + @PreAuthorize("@accessEvaluator.canAccessResource('/ga4gh/wes/v1/runs/' + #runId , 'wes:runs:read', 'wes')") + @GetMapping(value = "/runs/{run_id}/files", produces = { MediaType.APPLICATION_JSON_VALUE }) + public RunFiles getRunFiles(@PathVariable("run_id") String runId) { return adapter.getRunFiles(runId); } @AuditActionUri("wes:run:stderr") @PreAuthorize("@accessEvaluator.canAccessResource('/ga4gh/wes/v1/runs/' + #runId, 'wes:runs:read', 'wes')") @GetMapping(value = "/runs/{runId}/logs/stderr", produces = MediaType.APPLICATION_OCTET_STREAM_VALUE) public void getStderr(HttpServletResponse response, @RequestHeader HttpHeaders headers, @PathVariable String runId) throws IOException { - adapter.getLogBytes(response.getOutputStream(), runId,getRangeFromHeaders(response,headers)); + adapter.getLogBytes(response.getOutputStream(), runId, getRangeFromHeaders(response, headers)); } @AuditActionUri("wes:run:stderr") @@ -139,7 +140,7 @@ public void getTaskStderr( @PathVariable String runId, @PathVariable String taskId ) throws IOException { - adapter.getLogBytes(response.getOutputStream(), runId, taskId, "stderr",getRangeFromHeaders(response,headers)); + adapter.getLogBytes(response.getOutputStream(), runId, taskId, "stderr", getRangeFromHeaders(response, headers)); } @AuditActionUri("wes:run:stdout") @@ -151,7 +152,7 @@ public void getTaskStdout( @PathVariable String runId, @PathVariable String taskId ) throws IOException { - adapter.getLogBytes(response.getOutputStream(), runId, taskId, "stdout",getRangeFromHeaders(response,headers)); + adapter.getLogBytes(response.getOutputStream(), runId, taskId, "stdout", getRangeFromHeaders(response, headers)); } @AuditActionUri("wes:run:stderr") @@ -164,7 +165,7 @@ public void getTaskStderr( @PathVariable String taskName, @PathVariable int index ) throws IOException { - adapter.getLogBytes(response.getOutputStream(), runId, taskName, index, "stderr",getRangeFromHeaders(response,headers)); + adapter.getLogBytes(response.getOutputStream(), runId, taskName, index, "stderr", getRangeFromHeaders(response, headers)); } @AuditActionUri("wes:run:stdout") @@ -180,9 +181,9 @@ public void getTaskStdout( adapter.getLogBytes(response.getOutputStream(), runId, taskName, index, "stdout", getRangeFromHeaders(response, headers)); } - private HttpRange getRangeFromHeaders(HttpServletResponse response, HttpHeaders headers){ + private HttpRange getRangeFromHeaders(HttpServletResponse response, HttpHeaders headers) { List ranges = headers.getRange(); - if (ranges.isEmpty()){ + if (ranges.isEmpty()) { return null; } else if (ranges.size() > 1) { // only return the first range parsed diff --git a/src/main/java/com/dnastack/wes/cromwell/CromwellService.java b/src/main/java/com/dnastack/wes/cromwell/CromwellService.java index 4e85589..21ed853 100644 --- a/src/main/java/com/dnastack/wes/cromwell/CromwellService.java +++ b/src/main/java/com/dnastack/wes/cromwell/CromwellService.java @@ -211,13 +211,44 @@ private CromwellStatus getStatus(String runId) { * * @param runId The cromwell id * - * @return a complete run log + * @return the cromwell id */ public RunId cancel(String runId) { client.abortWorkflow(runId); return RunId.builder().runId(runId).build(); } + /** + * Get the files for a specific run. + * + * @param runId The cromwell id + * + * @return a list of generated files for the run + */ + public RunFiles getRunFiles(String runId) { + CromwellMetadataResponse metadataResponse = getMetadata(runId); + Set finalFileSet = new HashSet<>(); + Set secondaryFileSet = new HashSet<>(); + Set logFileSet = new HashSet<>(); + RunFiles runFiles = new RunFiles(); + + metadataResponse.getOutputs().values().forEach(output -> extractFilesFromValue(finalFileSet, output)); + extractSecondaryAndLogFilesFromCalls(secondaryFileSet, logFileSet, metadataResponse); + + finalFileSet.forEach(path -> runFiles.addRunFile(new RunFile(RunFile.type.FINAL, path))); + secondaryFileSet.forEach(path -> { + if (!finalFileSet.contains(path)) { + runFiles.addRunFile(new RunFile(RunFile.type.SECONDARY, path)); + } + }); + logFileSet.forEach(path -> { + if (!finalFileSet.contains(path)) { + runFiles.addRunFile(new RunFile(RunFile.type.LOG, path)); + } + }); + return runFiles; + } + public void getLogBytes(OutputStream outputStream, String runId, String taskId, String logKey, HttpRange range) throws IOException { String logPath = getLogPath(runId, taskId, logKey); @@ -448,6 +479,46 @@ private JsonNode extractJsonNode(String value) throws IOException { } } + private void extractSecondaryAndLogFilesFromCalls(Set secondaryFileSet, Set logFileSet, CromwellMetadataResponse metadataResponse) { + metadataResponse.getCalls().values().forEach(cromwellTaskCallList -> cromwellTaskCallList.forEach(call -> { + call.getOutputs().values().forEach(output -> extractFilesFromValue(secondaryFileSet, output)); + String stderr = call.getStderr(); + String stdout = call.getStdout(); + if (storageClient.isFile(stderr)) { + logFileSet.add(stderr); + } + if (storageClient.isFile(stdout)) { + logFileSet.add(stdout); + } + call.getBackendLogs().values().forEach(log -> extractFilesFromValue(logFileSet, log)); + CromwellMetadataResponse subWorkflowMetadata = call.getSubWorkflowMetadata(); + if (subWorkflowMetadata != null) { + subWorkflowMetadata.getOutputs().values().forEach(output -> extractFilesFromValue(secondaryFileSet, output)); + extractSecondaryAndLogFilesFromCalls(secondaryFileSet, logFileSet, subWorkflowMetadata); + } + })); + } + + private void extractFilesFromValue(Set fileSet, Object output) { + if (output instanceof String && storageClient.isFile(output.toString())) { + fileSet.add(output.toString()); + } + else if (output instanceof Object[]) { + extractFiles(fileSet, (List) output); + } + else { + extractFiles(fileSet, (Map) output); + } + } + + private void extractFiles(Set fileSet, List outputs) { + outputs.forEach(output -> extractFilesFromValue(fileSet, output)); + } + + private void extractFiles(Set fileSet, Map outputs) { + outputs.values().forEach(output -> extractFilesFromValue(fileSet, output)); + } + private void setWorkflowSourceAndDependencies(Path tempDirectory, RunRequest runRequest, CromwellExecutionRequest cromwellRequest) throws IOException { if (runRequest.getWorkflowAttachments() == null || runRequest.getWorkflowAttachments().length == 0) { throw new InvalidRequestException("Url provided is relative however no workflowAttachments are defined"); diff --git a/src/main/java/com/dnastack/wes/storage/AzureBlobStorageClient.java b/src/main/java/com/dnastack/wes/storage/AzureBlobStorageClient.java index 997def3..aeaa930 100644 --- a/src/main/java/com/dnastack/wes/storage/AzureBlobStorageClient.java +++ b/src/main/java/com/dnastack/wes/storage/AzureBlobStorageClient.java @@ -122,4 +122,9 @@ public void readBytes(OutputStream outputStream, String blobUri, HttpRange httpR .setMaxRetryRequests(3), null, false, null, null); } + @Override + public boolean isFile(String filePath) { + return filePath.startsWith("https://%s.blob.core.windows.net/".formatted(client.getAccountName())); + } + } diff --git a/src/main/java/com/dnastack/wes/storage/BlobStorageClient.java b/src/main/java/com/dnastack/wes/storage/BlobStorageClient.java index 54e7d7d..6fea554 100644 --- a/src/main/java/com/dnastack/wes/storage/BlobStorageClient.java +++ b/src/main/java/com/dnastack/wes/storage/BlobStorageClient.java @@ -18,7 +18,8 @@ default void getBytes(OutputStream outputStream, String blobUri) throws IOExcept readBytes(outputStream, blobUri, null); } - void readBytes(OutputStream outputStream, String blobUri, @Nullable HttpRange httpRange) throws IOException; + boolean isFile(String filePath); + } diff --git a/src/main/java/com/dnastack/wes/storage/GcpBlobStorageClient.java b/src/main/java/com/dnastack/wes/storage/GcpBlobStorageClient.java index a1ba292..ae113d3 100644 --- a/src/main/java/com/dnastack/wes/storage/GcpBlobStorageClient.java +++ b/src/main/java/com/dnastack/wes/storage/GcpBlobStorageClient.java @@ -115,4 +115,9 @@ public void readBytes(OutputStream outputStream, String blobUri, HttpRange httpR } } + @Override + public boolean isFile(String filePath) { + return filePath.startsWith("gs://"); + } + } diff --git a/src/main/java/com/dnastack/wes/storage/LocalBlobStorageClient.java b/src/main/java/com/dnastack/wes/storage/LocalBlobStorageClient.java index b836351..bbfcc19 100644 --- a/src/main/java/com/dnastack/wes/storage/LocalBlobStorageClient.java +++ b/src/main/java/com/dnastack/wes/storage/LocalBlobStorageClient.java @@ -7,7 +7,6 @@ import java.net.MalformedURLException; import java.net.URI; import java.net.URL; -import java.nio.channels.Channel; import java.nio.channels.Channels; import java.nio.channels.FileChannel; import java.nio.file.Files; @@ -96,4 +95,9 @@ public void readBytes(OutputStream outputStream, String blobUri, HttpRange httpR } } + @Override + public boolean isFile(String filePath) { + return filePath.startsWith("/"); + } + } diff --git a/workflow_execution_service.swagger.yaml b/workflow_execution_service.swagger.yaml index 334b71e..e3105bd 100644 --- a/workflow_execution_service.swagger.yaml +++ b/workflow_execution_service.swagger.yaml @@ -377,6 +377,53 @@ paths: format: int64 tags: - WorkflowExecutionService + /runs/{run_id}/files: + get: + summary: Get files associated with a workflow run. + description: >- + This endpoint provides all the generated files from the associated storage + client for a workflow run. The returned result has information about the + files of the given run and the type of the given file. + x-swagger-router-controller: ga4gh.wes.server + operationId: GetRunFiles + responses: + '200': + description: '' + schema: + $ref: '#/definitions/RunFiles' + '401': + description: The request is unauthorized. + schema: + $ref: '#/definitions/ErrorResponse' + '404': + description: The requested workflow run not found. + schema: + $ref: '#/definitions/ErrorResponse' + '403': + description: The requester is not authorized to perform this action. + schema: + $ref: '#/definitions/ErrorResponse' + '500': + description: An unexpected error occurred. + schema: + $ref: '#/definitions/ErrorResponse' + parameters: + - name: run_id + in: path + required: true + type: string + - name: orgId + in: path + required: true + type: integer + format: int64 + - name: projectId + in: path + required: true + type: integer + format: int64 + tags: + - WorkflowExecutionService definitions: DefaultWorkflowEngineParameter: type: object @@ -553,6 +600,27 @@ definitions: - CANCELING: The task was canceled by the user, and is in the process of stopping. + File: + type: object + properties: + type: + type: string + enum: + - FINAL + - SECONDARY + - LOG + default: FINAL + description: >- + - FINAL: The file exists directly as an output of the metadata response for the specified run. + + - SECONDARY: The file exists as a part of a task call output or a sub-workflow metadata + response for the specified run + + - LOG: The file exists as a part of the backend logs or stderr and stdout for the workflow or + sub-workflow metadata for the specified run. + path: + type: string + description: The path of the file. RunListResponse: type: object properties: @@ -653,6 +721,15 @@ definitions: state: $ref: '#/definitions/State' description: Small description of a workflow run, returned by server during listing + RunFiles: + type: object + required: + - run_id + properties: + files: + type: array + items: + $ref: '#/definitions/File' WorkflowTypeVersion: type: object properties: From bdb534ac55ea05926ec53675b488bd00deb39751 Mon Sep 17 00:00:00 2001 From: Sean-DNAstack Date: Wed, 18 Oct 2023 17:38:53 -0400 Subject: [PATCH 2/8] Create new endpoint to delete run files --- .../com/dnastack/wes/api/WesV1Controller.java | 22 +++- .../wes/cromwell/CromwellService.java | 123 ++++++++++++++---- .../wes/shared/FileDeletionException.java | 17 +++ .../wes/storage/AzureBlobStorageClient.java | 11 +- .../wes/storage/BlobStorageClient.java | 2 + .../wes/storage/GcpBlobStorageClient.java | 10 +- .../wes/storage/LocalBlobStorageClient.java | 9 +- workflow_execution_service.swagger.yaml | 53 ++++++++ 8 files changed, 212 insertions(+), 35 deletions(-) create mode 100644 src/main/java/com/dnastack/wes/shared/FileDeletionException.java diff --git a/src/main/java/com/dnastack/wes/api/WesV1Controller.java b/src/main/java/com/dnastack/wes/api/WesV1Controller.java index c343cea..6b35f45 100644 --- a/src/main/java/com/dnastack/wes/api/WesV1Controller.java +++ b/src/main/java/com/dnastack/wes/api/WesV1Controller.java @@ -105,7 +105,9 @@ public RunListResponse getRuns( @AuditActionUri("wes:run:read") @PreAuthorize("@accessEvaluator.canAccessResource('/ga4gh/wes/v1/runs/'+#runId, 'wes:runs:read', 'wes')") @GetMapping(value = "/runs/{run_id}", produces = { MediaType.APPLICATION_JSON_VALUE }) - public RunLog getRun(@PathVariable("run_id") String runId) { return adapter.getRun(runId); } + public RunLog getRun(@PathVariable("run_id") String runId) { + return adapter.getRun(runId); + } @AuditActionUri("wes:run:status") @PreAuthorize("@accessEvaluator.canAccessResource('/ga4gh/wes/v1/runs/' + #runId , 'wes:runs:read', 'wes')") @@ -117,12 +119,26 @@ public RunStatus getRunStatus(@PathVariable("run_id") String runId) { @AuditActionUri("wes:run:cancel") @PreAuthorize("@accessEvaluator.canAccessResource('/ga4gh/wes/v1/runs/' + #runId, 'wes:runs:cancel', 'wes')") @PostMapping(path = "/runs/{runId}/cancel", produces = MediaType.APPLICATION_JSON_VALUE) - public RunId cancelRun(@PathVariable("runId") String runId) { return adapter.cancel(runId); } + public RunId cancelRun(@PathVariable("runId") String runId) { + return adapter.cancel(runId); + } @AuditActionUri("wes:run:files") @PreAuthorize("@accessEvaluator.canAccessResource('/ga4gh/wes/v1/runs/' + #runId , 'wes:runs:read', 'wes')") @GetMapping(value = "/runs/{run_id}/files", produces = { MediaType.APPLICATION_JSON_VALUE }) - public RunFiles getRunFiles(@PathVariable("run_id") String runId) { return adapter.getRunFiles(runId); } + public RunFiles getRunFiles(@PathVariable("run_id") String runId) { + return adapter.getRunFiles(runId); + } + + @AuditActionUri("wes:run:files") + @PreAuthorize("@accessEvaluator.canAccessResource('/ga4gh/wes/v1/runs/' + #runId , 'wes:runs:delete', 'wes')") + @DeleteMapping(value = "/runs/{run_id}/files", produces = { MediaType.APPLICATION_JSON_VALUE }) + public RunId deleteRunFiles( + @PathVariable("run_id") String runId, + @RequestParam(value = "async", required = false) boolean async + ) { + return adapter.deleteRunFiles(runId, async); + } @AuditActionUri("wes:run:stderr") @PreAuthorize("@accessEvaluator.canAccessResource('/ga4gh/wes/v1/runs/' + #runId, 'wes:runs:read', 'wes')") diff --git a/src/main/java/com/dnastack/wes/cromwell/CromwellService.java b/src/main/java/com/dnastack/wes/cromwell/CromwellService.java index 21ed853..74436a2 100644 --- a/src/main/java/com/dnastack/wes/cromwell/CromwellService.java +++ b/src/main/java/com/dnastack/wes/cromwell/CromwellService.java @@ -3,6 +3,7 @@ import com.dnastack.wes.AppConfig; import com.dnastack.wes.api.*; import com.dnastack.wes.security.AuthenticatedUser; +import com.dnastack.wes.shared.FileDeletionException; import com.dnastack.wes.shared.InvalidRequestException; import com.dnastack.wes.shared.NotFoundException; import com.dnastack.wes.storage.BlobStorageClient; @@ -31,6 +32,10 @@ import java.nio.file.Paths; import java.util.*; import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -225,14 +230,17 @@ public RunId cancel(String runId) { * * @return a list of generated files for the run */ - public RunFiles getRunFiles(String runId) { + public RunFiles getRunFiles(String runId) throws NotFoundException { CromwellMetadataResponse metadataResponse = getMetadata(runId); Set finalFileSet = new HashSet<>(); Set secondaryFileSet = new HashSet<>(); Set logFileSet = new HashSet<>(); RunFiles runFiles = new RunFiles(); - metadataResponse.getOutputs().values().forEach(output -> extractFilesFromValue(finalFileSet, output)); + Map outputs = metadataResponse.getOutputs(); + if (!outputs.isEmpty()) { + outputs.values().forEach(output -> extractFilesFromValue(finalFileSet, output)); + } extractSecondaryAndLogFilesFromCalls(secondaryFileSet, logFileSet, metadataResponse); finalFileSet.forEach(path -> runFiles.addRunFile(new RunFile(RunFile.type.FINAL, path))); @@ -246,7 +254,64 @@ public RunFiles getRunFiles(String runId) { runFiles.addRunFile(new RunFile(RunFile.type.LOG, path)); } }); - return runFiles; + + if (runFiles.getFiles() != null) { + return runFiles; + } else { + throw new NotFoundException("No files were found for the runId: " + runId); + } + } + + /** + * Request to delete the files associated with the run. + * + * @param runId The cromwell id + * + * @return the cromwell id + */ + public RunId deleteRunFiles(String runId, boolean async) { + List files = getRunFiles(runId).getFiles(); + files = files.stream().filter(runFile -> runFile.getType() == RunFile.type.SECONDARY).toList(); + if (!files.isEmpty()) { + if (async) { + // Asynchronous deletion of files + ExecutorService executor = Executors.newFixedThreadPool(files.size()); + List> deletionTasks = new ArrayList<>(); + files.forEach(file -> { + Future submit = executor.submit(() -> { + try { + storageClient.deleteFile(file.getPath()); + } catch (IOException e) { + throw new FileDeletionException("Failed to delete file with path: " + file.getPath(), e); + } + }); + deletionTasks.add(submit); + }); + // Block until all deletion tasks have completed + deletionTasks.forEach(future -> { + try { + future.get(); + } catch (InterruptedException | ExecutionException e) { + Thread.currentThread().interrupt(); + Throwable cause = e.getCause(); + throw new FileDeletionException(cause.getMessage(), cause.getCause()); + } + }); + executor.shutdown(); + } else { + // Synchronous deletion of files + files.forEach(file -> { + try { + storageClient.deleteFile(file.getPath()); + } catch (IOException e) { + throw new FileDeletionException("Failed to delete file with path: " + file.getPath(), e); + } + }); + } + } else { + throw new NotFoundException("No deletable files were found for the runId: " + runId); + } + return RunId.builder().runId(runId).build(); } public void getLogBytes(OutputStream outputStream, String runId, String taskId, String logKey, HttpRange range) throws IOException { @@ -480,28 +545,42 @@ private JsonNode extractJsonNode(String value) throws IOException { } private void extractSecondaryAndLogFilesFromCalls(Set secondaryFileSet, Set logFileSet, CromwellMetadataResponse metadataResponse) { - metadataResponse.getCalls().values().forEach(cromwellTaskCallList -> cromwellTaskCallList.forEach(call -> { - call.getOutputs().values().forEach(output -> extractFilesFromValue(secondaryFileSet, output)); - String stderr = call.getStderr(); - String stdout = call.getStdout(); - if (storageClient.isFile(stderr)) { - logFileSet.add(stderr); - } - if (storageClient.isFile(stdout)) { - logFileSet.add(stdout); - } - call.getBackendLogs().values().forEach(log -> extractFilesFromValue(logFileSet, log)); - CromwellMetadataResponse subWorkflowMetadata = call.getSubWorkflowMetadata(); - if (subWorkflowMetadata != null) { - subWorkflowMetadata.getOutputs().values().forEach(output -> extractFilesFromValue(secondaryFileSet, output)); - extractSecondaryAndLogFilesFromCalls(secondaryFileSet, logFileSet, subWorkflowMetadata); - } - })); + Map> calls = metadataResponse.getCalls(); + if (calls != null && !calls.isEmpty()) { + calls.values().forEach(cromwellTaskCallList -> cromwellTaskCallList.forEach(call -> { + Map outputs = call.getOutputs(); + if (outputs != null && !outputs.isEmpty()) { + outputs.values().forEach(output -> extractFilesFromValue(secondaryFileSet, output)); + String stderr = call.getStderr(); + String stdout = call.getStdout(); + if (storageClient.isFile(stderr)) { + logFileSet.add(stderr); + } + if (storageClient.isFile(stdout)) { + logFileSet.add(stdout); + } + Map backendLogs = call.getBackendLogs(); + if (backendLogs != null && !backendLogs.isEmpty()) { + backendLogs.values().forEach(log -> extractFilesFromValue(logFileSet, log)); + } + CromwellMetadataResponse subWorkflowMetadata = call.getSubWorkflowMetadata(); + if (subWorkflowMetadata != null) { + Map subOutputs = subWorkflowMetadata.getOutputs(); + if (subOutputs != null && !subOutputs.isEmpty()) { + subOutputs.values().forEach(output -> extractFilesFromValue(secondaryFileSet, output)); + } + extractSecondaryAndLogFilesFromCalls(secondaryFileSet, logFileSet, subWorkflowMetadata); + } + } + })); + } } private void extractFilesFromValue(Set fileSet, Object output) { - if (output instanceof String && storageClient.isFile(output.toString())) { - fileSet.add(output.toString()); + if (output instanceof String) { + if (storageClient.isFile(output.toString())) { + fileSet.add(output.toString()); + } } else if (output instanceof Object[]) { extractFiles(fileSet, (List) output); diff --git a/src/main/java/com/dnastack/wes/shared/FileDeletionException.java b/src/main/java/com/dnastack/wes/shared/FileDeletionException.java new file mode 100644 index 0000000..fcb1a7f --- /dev/null +++ b/src/main/java/com/dnastack/wes/shared/FileDeletionException.java @@ -0,0 +1,17 @@ +package com.dnastack.wes.shared; + +public class FileDeletionException extends RuntimeException { + + public FileDeletionException() { + super(); + } + + public FileDeletionException(String message) { + super(message); + } + + public FileDeletionException(String message, Throwable cause) { + super(message, cause); + } + +} diff --git a/src/main/java/com/dnastack/wes/storage/AzureBlobStorageClient.java b/src/main/java/com/dnastack/wes/storage/AzureBlobStorageClient.java index aeaa930..d9f4cbb 100644 --- a/src/main/java/com/dnastack/wes/storage/AzureBlobStorageClient.java +++ b/src/main/java/com/dnastack/wes/storage/AzureBlobStorageClient.java @@ -20,7 +20,6 @@ public class AzureBlobStorageClient implements BlobStorageClient { - private final BlobServiceClient client; private final long signedUrlTtl; private final String container; @@ -49,7 +48,6 @@ public AzureBlobStorageClient(AzureBlobStorageClientConfig config) { stagingPath = config.getStagingPath(); } - @Override public URL getSignedUrl(String blobUri) { BlobUrlParts parts = BlobUrlParts.parse(blobUri); @@ -99,7 +97,6 @@ public void readBytes(OutputStream outputStream, String blobUri, HttpRange httpR containerName = container; } - BlobContainerClient containerClient = client.getBlobContainerClient(containerName); BlobClient blobClient = containerClient.getBlobClient(blobName); @@ -124,7 +121,13 @@ public void readBytes(OutputStream outputStream, String blobUri, HttpRange httpR @Override public boolean isFile(String filePath) { - return filePath.startsWith("https://%s.blob.core.windows.net/".formatted(client.getAccountName())); + BlobClient blobClient = client.getBlobContainerClient(container).getBlobClient(filePath); + return filePath.startsWith("https://%s.blob.core.windows.net/".formatted(client.getAccountName())) && blobClient.exists(); + } + + @Override + public void deleteFile(String filePath) { + client.getBlobContainerClient(container).getBlobClient(filePath).delete(); } } diff --git a/src/main/java/com/dnastack/wes/storage/BlobStorageClient.java b/src/main/java/com/dnastack/wes/storage/BlobStorageClient.java index 6fea554..2ad51aa 100644 --- a/src/main/java/com/dnastack/wes/storage/BlobStorageClient.java +++ b/src/main/java/com/dnastack/wes/storage/BlobStorageClient.java @@ -22,4 +22,6 @@ default void getBytes(OutputStream outputStream, String blobUri) throws IOExcept boolean isFile(String filePath); + void deleteFile(String filePath) throws IOException; + } diff --git a/src/main/java/com/dnastack/wes/storage/GcpBlobStorageClient.java b/src/main/java/com/dnastack/wes/storage/GcpBlobStorageClient.java index ae113d3..f37d608 100644 --- a/src/main/java/com/dnastack/wes/storage/GcpBlobStorageClient.java +++ b/src/main/java/com/dnastack/wes/storage/GcpBlobStorageClient.java @@ -52,7 +52,6 @@ public GcpBlobStorageClient(GcpBlobStorageConfig config) throws IOException { project = config.getProject(); } - @Override public URL getSignedUrl(String blobUri) { BlobId blobId = GcpStorageUtils.blobIdFromGsUrl(blobUri); @@ -95,7 +94,6 @@ public void readBytes(OutputStream outputStream, String blobUri, HttpRange httpR throw new FileNotFoundException("Could not open open file: " + blobUri + " it does not appear to exist"); } - long rangeStart = 0L; long rangeEnd = blob.getSize(); @@ -117,7 +115,13 @@ public void readBytes(OutputStream outputStream, String blobUri, HttpRange httpR @Override public boolean isFile(String filePath) { - return filePath.startsWith("gs://"); + Blob blob = client.get(GcpStorageUtils.blobIdFromGsUrl(filePath)); + return filePath.startsWith("gs://") && blob != null && blob.exists(); + } + + @Override + public void deleteFile(String filePath) { + client.delete(GcpStorageUtils.blobIdFromGsUrl(filePath)); } } diff --git a/src/main/java/com/dnastack/wes/storage/LocalBlobStorageClient.java b/src/main/java/com/dnastack/wes/storage/LocalBlobStorageClient.java index bbfcc19..7524165 100644 --- a/src/main/java/com/dnastack/wes/storage/LocalBlobStorageClient.java +++ b/src/main/java/com/dnastack/wes/storage/LocalBlobStorageClient.java @@ -35,7 +35,6 @@ public LocalBlobStorageClient(LocalBlobStorageClientConfig config) throws IOExce } - public String getStagingPath() { return stagingPath; } @@ -86,7 +85,6 @@ public void readBytes(OutputStream outputStream, String blobUri, HttpRange httpR rangeStart = httpRange.getRangeStart(fileToRead.length()); } - try (FileChannel channel = new RandomAccessFile(fileToRead, "r").getChannel()) { channel.position(rangeStart); try (InputStream inputStream = new BoundedInputStream(Channels.newInputStream(channel),rangeEnd - rangeStart)){ @@ -97,7 +95,12 @@ public void readBytes(OutputStream outputStream, String blobUri, HttpRange httpR @Override public boolean isFile(String filePath) { - return filePath.startsWith("/"); + return filePath.startsWith("/") && Files.exists(Path.of(filePath)); + } + + @Override + public void deleteFile(String filePath) throws IOException { + Files.delete(Path.of(filePath)); } } diff --git a/workflow_execution_service.swagger.yaml b/workflow_execution_service.swagger.yaml index e3105bd..b8244e9 100644 --- a/workflow_execution_service.swagger.yaml +++ b/workflow_execution_service.swagger.yaml @@ -424,6 +424,59 @@ paths: format: int64 tags: - WorkflowExecutionService + delete: + summary: Cleanup files associated with a workflow run. + description: >- + This endpoint cleans up the generated files associated with a workflow run. + + + The `async` query parameter is by default false, blocking the request until all + files are cleaned up. When true the delete operation will be done asynchronously + with faster execution but higher risk of not doing a "clean" cleanup. + x-swagger-router-controller: ga4gh.wes.server + operationId: GetRunFiles + responses: + '200': + description: '' + schema: + $ref: '#/definitions/RunId' + '401': + description: The request is unauthorized. + schema: + $ref: '#/definitions/ErrorResponse' + '404': + description: The requested workflow run not found. + schema: + $ref: '#/definitions/ErrorResponse' + '403': + description: The requester is not authorized to perform this action. + schema: + $ref: '#/definitions/ErrorResponse' + '500': + description: An unexpected error occurred. + schema: + $ref: '#/definitions/ErrorResponse' + parameters: + - name: run_id + in: path + required: true + type: string + - name: orgId + in: path + required: true + type: integer + format: int64 + - name: projectId + in: path + required: true + type: integer + format: int64 + - name: async + in: query + type: boolean + default: false + tags: + - WorkflowExecutionService definitions: DefaultWorkflowEngineParameter: type: object From 18b45a16c9fe0bb872ca82a6d9ca42785bf9781e Mon Sep 17 00:00:00 2001 From: Sean-DNAstack Date: Thu, 19 Oct 2023 18:07:11 -0400 Subject: [PATCH 3/8] Address PR feedback --- .../java/com/dnastack/wes/api/RunFile.java | 7 +- .../com/dnastack/wes/api/RunFileDeletion.java | 13 ++ .../dnastack/wes/api/RunFileDeletions.java | 5 + .../java/com/dnastack/wes/api/RunFiles.java | 25 +-- .../com/dnastack/wes/api/WesV1Controller.java | 10 +- .../com/dnastack/wes/config/AsyncConfig.java | 31 +++ .../wes/cromwell/CromwellService.java | 180 ++++++++---------- .../wes/shared/FileDeletionException.java | 17 -- .../wes/storage/AzureBlobStorageClient.java | 7 +- .../wes/storage/GcpBlobStorageClient.java | 7 +- .../wes/storage/LocalBlobStorageClient.java | 8 +- 11 files changed, 155 insertions(+), 155 deletions(-) create mode 100644 src/main/java/com/dnastack/wes/api/RunFileDeletion.java create mode 100644 src/main/java/com/dnastack/wes/api/RunFileDeletions.java create mode 100644 src/main/java/com/dnastack/wes/config/AsyncConfig.java delete mode 100644 src/main/java/com/dnastack/wes/shared/FileDeletionException.java diff --git a/src/main/java/com/dnastack/wes/api/RunFile.java b/src/main/java/com/dnastack/wes/api/RunFile.java index b09c0a1..e9cf622 100644 --- a/src/main/java/com/dnastack/wes/api/RunFile.java +++ b/src/main/java/com/dnastack/wes/api/RunFile.java @@ -12,13 +12,12 @@ @Builder public class RunFile { - @JsonProperty(value = "type") - Enum type; + @JsonProperty(value = "file_type") + FileType fileType; - @JsonProperty(value = "path") String path; - public enum type { + public enum FileType { FINAL, SECONDARY, LOG diff --git a/src/main/java/com/dnastack/wes/api/RunFileDeletion.java b/src/main/java/com/dnastack/wes/api/RunFileDeletion.java new file mode 100644 index 0000000..910a733 --- /dev/null +++ b/src/main/java/com/dnastack/wes/api/RunFileDeletion.java @@ -0,0 +1,13 @@ +package com.dnastack.wes.api; + +import com.fasterxml.jackson.annotation.JsonUnwrapped; + +public record RunFileDeletion(@JsonUnwrapped RunFile runFile, DeletionState state, @JsonUnwrapped ErrorResponse errorResponse) { + + public enum DeletionState { + DELETED, + ASYNC, + FAILED + } + +} diff --git a/src/main/java/com/dnastack/wes/api/RunFileDeletions.java b/src/main/java/com/dnastack/wes/api/RunFileDeletions.java new file mode 100644 index 0000000..0426752 --- /dev/null +++ b/src/main/java/com/dnastack/wes/api/RunFileDeletions.java @@ -0,0 +1,5 @@ +package com.dnastack.wes.api; + +import java.util.List; + +public record RunFileDeletions(List deletions) {} diff --git a/src/main/java/com/dnastack/wes/api/RunFiles.java b/src/main/java/com/dnastack/wes/api/RunFiles.java index 71b62f8..0835760 100644 --- a/src/main/java/com/dnastack/wes/api/RunFiles.java +++ b/src/main/java/com/dnastack/wes/api/RunFiles.java @@ -1,28 +1,5 @@ package com.dnastack.wes.api; -import com.fasterxml.jackson.annotation.JsonProperty; -import lombok.*; - -import java.util.ArrayList; import java.util.List; -@Getter -@Setter -@AllArgsConstructor -@EqualsAndHashCode -@NoArgsConstructor -@ToString -@Builder -public class RunFiles { - - @JsonProperty("files") - List files; - - public void addRunFile(RunFile runFile) { - if (files == null) { - files = new ArrayList<>(); - } - files.add(runFile); - } - -} +public record RunFiles(List runFiles) {} diff --git a/src/main/java/com/dnastack/wes/api/WesV1Controller.java b/src/main/java/com/dnastack/wes/api/WesV1Controller.java index 6b35f45..541da8e 100644 --- a/src/main/java/com/dnastack/wes/api/WesV1Controller.java +++ b/src/main/java/com/dnastack/wes/api/WesV1Controller.java @@ -123,17 +123,17 @@ public RunId cancelRun(@PathVariable("runId") String runId) { return adapter.cancel(runId); } - @AuditActionUri("wes:run:files") - @PreAuthorize("@accessEvaluator.canAccessResource('/ga4gh/wes/v1/runs/' + #runId , 'wes:runs:read', 'wes')") + @AuditActionUri("wes:run:files:list") + @PreAuthorize("@accessEvaluator.canAccessResource('/ga4gh/wes/v1/runs/' + #runId + '/files', 'wes:runs:read', 'wes')") @GetMapping(value = "/runs/{run_id}/files", produces = { MediaType.APPLICATION_JSON_VALUE }) public RunFiles getRunFiles(@PathVariable("run_id") String runId) { return adapter.getRunFiles(runId); } - @AuditActionUri("wes:run:files") - @PreAuthorize("@accessEvaluator.canAccessResource('/ga4gh/wes/v1/runs/' + #runId , 'wes:runs:delete', 'wes')") + @AuditActionUri("wes:run:files:delete") + @PreAuthorize("@accessEvaluator.canAccessResource('/ga4gh/wes/v1/runs/' + #runId + '/files', 'wes:runs:write', 'wes')") @DeleteMapping(value = "/runs/{run_id}/files", produces = { MediaType.APPLICATION_JSON_VALUE }) - public RunId deleteRunFiles( + public RunFileDeletions deleteRunFiles( @PathVariable("run_id") String runId, @RequestParam(value = "async", required = false) boolean async ) { diff --git a/src/main/java/com/dnastack/wes/config/AsyncConfig.java b/src/main/java/com/dnastack/wes/config/AsyncConfig.java new file mode 100644 index 0000000..c5b5453 --- /dev/null +++ b/src/main/java/com/dnastack/wes/config/AsyncConfig.java @@ -0,0 +1,31 @@ +package com.dnastack.wes.config; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.ThreadPoolExecutor; + +@Slf4j +@Configuration +public class AsyncConfig { + + @Bean + public ThreadPoolTaskExecutor defaultAsyncOperationExecutor( + @Value("${app.executors.default.core-pool-size:8}") int corePoolSize, + @Value("${app.executors.default.max-pool-size:16}") int maxPoolSize, + @Value("${app.executors.default.queue-capacity:5000}") int queueCapacity + ) { + final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(corePoolSize); + executor.setMaxPoolSize(maxPoolSize); + executor.setQueueCapacity(queueCapacity); + executor.setThreadNamePrefix("defaultAsyncOp-"); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + executor.initialize(); + return executor; + } + +} diff --git a/src/main/java/com/dnastack/wes/cromwell/CromwellService.java b/src/main/java/com/dnastack/wes/cromwell/CromwellService.java index 74436a2..ac98414 100644 --- a/src/main/java/com/dnastack/wes/cromwell/CromwellService.java +++ b/src/main/java/com/dnastack/wes/cromwell/CromwellService.java @@ -3,7 +3,6 @@ import com.dnastack.wes.AppConfig; import com.dnastack.wes.api.*; import com.dnastack.wes.security.AuthenticatedUser; -import com.dnastack.wes.shared.FileDeletionException; import com.dnastack.wes.shared.InvalidRequestException; import com.dnastack.wes.shared.NotFoundException; import com.dnastack.wes.storage.BlobStorageClient; @@ -14,13 +13,16 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.TextNode; import feign.FeignException; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.http.HttpRange; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; import org.springframework.web.multipart.MultipartFile; @@ -32,10 +34,7 @@ import java.nio.file.Paths; import java.util.*; import java.util.Map.Entry; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.concurrent.CompletableFuture; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -56,6 +55,7 @@ public class CromwellService { private final PathTranslatorFactory pathTranslatorFactory; private final CromwellWesMapper cromwellWesMapper; private final CromwellConfig cromwellConfig; + private final ThreadPoolTaskExecutor executor; private final AppConfig appConfig; @@ -66,7 +66,8 @@ public class CromwellService { PathTranslatorFactory pathTranslatorFactory, CromwellWesMapper cromwellWesMapper, AppConfig appConfig, - CromwellConfig config + CromwellConfig config, + @Qualifier("defaultAsyncOperationExecutor") ThreadPoolTaskExecutor executor ) { this.client = cromwellClient; this.pathTranslatorFactory = pathTranslatorFactory; @@ -74,6 +75,7 @@ public class CromwellService { this.cromwellWesMapper = cromwellWesMapper; this.appConfig = appConfig; this.cromwellConfig = config; + this.executor = executor; } @@ -235,31 +237,26 @@ public RunFiles getRunFiles(String runId) throws NotFoundException { Set finalFileSet = new HashSet<>(); Set secondaryFileSet = new HashSet<>(); Set logFileSet = new HashSet<>(); - RunFiles runFiles = new RunFiles(); + List files = new ArrayList<>(); Map outputs = metadataResponse.getOutputs(); if (!outputs.isEmpty()) { outputs.values().forEach(output -> extractFilesFromValue(finalFileSet, output)); } - extractSecondaryAndLogFilesFromCalls(secondaryFileSet, logFileSet, metadataResponse); + extractSecondaryLogFiles(secondaryFileSet, logFileSet, metadataResponse); - finalFileSet.forEach(path -> runFiles.addRunFile(new RunFile(RunFile.type.FINAL, path))); + finalFileSet.forEach(path -> files.add(new RunFile(RunFile.FileType.FINAL, path))); secondaryFileSet.forEach(path -> { - if (!finalFileSet.contains(path)) { - runFiles.addRunFile(new RunFile(RunFile.type.SECONDARY, path)); + if (!finalFileSet.contains(path) && !logFileSet.contains(path)) { + files.add(new RunFile(RunFile.FileType.SECONDARY, path)); } }); logFileSet.forEach(path -> { if (!finalFileSet.contains(path)) { - runFiles.addRunFile(new RunFile(RunFile.type.LOG, path)); + files.add(new RunFile(RunFile.FileType.LOG, path)); } }); - - if (runFiles.getFiles() != null) { - return runFiles; - } else { - throw new NotFoundException("No files were found for the runId: " + runId); - } + return new RunFiles(files); } /** @@ -269,49 +266,31 @@ public RunFiles getRunFiles(String runId) throws NotFoundException { * * @return the cromwell id */ - public RunId deleteRunFiles(String runId, boolean async) { - List files = getRunFiles(runId).getFiles(); - files = files.stream().filter(runFile -> runFile.getType() == RunFile.type.SECONDARY).toList(); - if (!files.isEmpty()) { - if (async) { - // Asynchronous deletion of files - ExecutorService executor = Executors.newFixedThreadPool(files.size()); - List> deletionTasks = new ArrayList<>(); - files.forEach(file -> { - Future submit = executor.submit(() -> { - try { - storageClient.deleteFile(file.getPath()); - } catch (IOException e) { - throw new FileDeletionException("Failed to delete file with path: " + file.getPath(), e); - } - }); - deletionTasks.add(submit); - }); - // Block until all deletion tasks have completed - deletionTasks.forEach(future -> { - try { - future.get(); - } catch (InterruptedException | ExecutionException e) { - Thread.currentThread().interrupt(); - Throwable cause = e.getCause(); - throw new FileDeletionException(cause.getMessage(), cause.getCause()); - } - }); - executor.shutdown(); - } else { - // Synchronous deletion of files - files.forEach(file -> { - try { - storageClient.deleteFile(file.getPath()); - } catch (IOException e) { - throw new FileDeletionException("Failed to delete file with path: " + file.getPath(), e); - } - }); - } - } else { - throw new NotFoundException("No deletable files were found for the runId: " + runId); + public RunFileDeletions deleteRunFiles(String runId, boolean async) { + List files = getRunFiles(runId).runFiles(); + List outcomes = files.stream().filter(runFile -> RunFile.FileType.SECONDARY.equals(runFile.getFileType())) + .map(runFile -> { + if (async) { + return deleteRunFileAsync(runFile); + } else { + return deleteRunFile(runFile); + } + }).toList(); + return new RunFileDeletions(outcomes); + } + + public RunFileDeletion deleteRunFileAsync(RunFile runFile) { + CompletableFuture.runAsync(() -> deleteRunFile(runFile), executor); + return new RunFileDeletion(runFile, RunFileDeletion.DeletionState.ASYNC,null); + } + + public RunFileDeletion deleteRunFile(RunFile runFile) { + try { + storageClient.deleteFile(runFile.getPath()); + return new RunFileDeletion(runFile, RunFileDeletion.DeletionState.DELETED, null); + } catch (IOException e) { + return new RunFileDeletion(runFile, RunFileDeletion.DeletionState.FAILED, ErrorResponse.builder().errorCode(400).msg(e.getMessage()).build()); } - return RunId.builder().runId(runId).build(); } public void getLogBytes(OutputStream outputStream, String runId, String taskId, String logKey, HttpRange range) throws IOException { @@ -544,58 +523,61 @@ private JsonNode extractJsonNode(String value) throws IOException { } } - private void extractSecondaryAndLogFilesFromCalls(Set secondaryFileSet, Set logFileSet, CromwellMetadataResponse metadataResponse) { + private void extractSecondaryLogFiles(Set secondaryFileSet, Set logFileSet, CromwellMetadataResponse metadataResponse){ + Map outputs = metadataResponse.getOutputs(); + if (outputs != null && !outputs.isEmpty()) { + outputs.values().forEach(output -> extractFilesFromValue(secondaryFileSet, output)); + } Map> calls = metadataResponse.getCalls(); if (calls != null && !calls.isEmpty()) { - calls.values().forEach(cromwellTaskCallList -> cromwellTaskCallList.forEach(call -> { - Map outputs = call.getOutputs(); - if (outputs != null && !outputs.isEmpty()) { - outputs.values().forEach(output -> extractFilesFromValue(secondaryFileSet, output)); - String stderr = call.getStderr(); - String stdout = call.getStdout(); - if (storageClient.isFile(stderr)) { - logFileSet.add(stderr); - } - if (storageClient.isFile(stdout)) { - logFileSet.add(stdout); - } - Map backendLogs = call.getBackendLogs(); - if (backendLogs != null && !backendLogs.isEmpty()) { - backendLogs.values().forEach(log -> extractFilesFromValue(logFileSet, log)); - } - CromwellMetadataResponse subWorkflowMetadata = call.getSubWorkflowMetadata(); - if (subWorkflowMetadata != null) { - Map subOutputs = subWorkflowMetadata.getOutputs(); - if (subOutputs != null && !subOutputs.isEmpty()) { - subOutputs.values().forEach(output -> extractFilesFromValue(secondaryFileSet, output)); - } - extractSecondaryAndLogFilesFromCalls(secondaryFileSet, logFileSet, subWorkflowMetadata); - } - } - })); + calls.values().stream().flatMap(List::stream).forEach(call -> extractSecondaryLogFilesFromCall(secondaryFileSet, logFileSet, call)); } } - private void extractFilesFromValue(Set fileSet, Object output) { - if (output instanceof String) { - if (storageClient.isFile(output.toString())) { - fileSet.add(output.toString()); - } + private void extractSecondaryLogFilesFromCall(Set secondaryFileSet, Set logFileSet, CromwellTaskCall call){ + Map outputs = call.getOutputs(); + if (outputs != null && !outputs.isEmpty()) { + outputs.values().forEach(output -> extractFilesFromValue(secondaryFileSet, output)); } - else if (output instanceof Object[]) { - extractFiles(fileSet, (List) output); + String stderr = call.getStderr(); + String stdout = call.getStdout(); + if (stderr != null && storageClient.isFile(stderr)) { + logFileSet.add(stderr); } - else { - extractFiles(fileSet, (Map) output); + if (stdout != null && storageClient.isFile(stdout)) { + logFileSet.add(stdout); + } + Map backendLogs = call.getBackendLogs(); + if (backendLogs != null && !backendLogs.isEmpty()) { + backendLogs.values().forEach(log -> extractFilesFromValue(logFileSet, log)); + } + CromwellMetadataResponse subWorkflowMetadata = call.getSubWorkflowMetadata(); + if (subWorkflowMetadata != null) { + extractSecondaryLogFiles(secondaryFileSet,logFileSet,subWorkflowMetadata); + } + } + + private void extractFilesFromValue(Set fileSet, Object output) { + JsonNode node = mapper.valueToTree(output); + if (node.isTextual()) { + if (storageClient.isFile(node.asText())) { + fileSet.add(node.asText()); + } + } else if (node.isArray()) { + ArrayNode arrayOutput = mapper.valueToTree(output); + extractFilesFromArrayNode(fileSet, arrayOutput); + } else if (node.isObject()) { + ObjectNode objectOutput = mapper.valueToTree(output); + extractFilesFromObjectNode(fileSet, objectOutput); } } - private void extractFiles(Set fileSet, List outputs) { + private void extractFilesFromArrayNode(Set fileSet, ArrayNode outputs) { outputs.forEach(output -> extractFilesFromValue(fileSet, output)); } - private void extractFiles(Set fileSet, Map outputs) { - outputs.values().forEach(output -> extractFilesFromValue(fileSet, output)); + private void extractFilesFromObjectNode(Set fileSet, ObjectNode outputs) { + outputs.forEach(output -> extractFilesFromValue(fileSet, output)); } private void setWorkflowSourceAndDependencies(Path tempDirectory, RunRequest runRequest, CromwellExecutionRequest cromwellRequest) throws IOException { diff --git a/src/main/java/com/dnastack/wes/shared/FileDeletionException.java b/src/main/java/com/dnastack/wes/shared/FileDeletionException.java deleted file mode 100644 index fcb1a7f..0000000 --- a/src/main/java/com/dnastack/wes/shared/FileDeletionException.java +++ /dev/null @@ -1,17 +0,0 @@ -package com.dnastack.wes.shared; - -public class FileDeletionException extends RuntimeException { - - public FileDeletionException() { - super(); - } - - public FileDeletionException(String message) { - super(message); - } - - public FileDeletionException(String message, Throwable cause) { - super(message, cause); - } - -} diff --git a/src/main/java/com/dnastack/wes/storage/AzureBlobStorageClient.java b/src/main/java/com/dnastack/wes/storage/AzureBlobStorageClient.java index d9f4cbb..b322d9d 100644 --- a/src/main/java/com/dnastack/wes/storage/AzureBlobStorageClient.java +++ b/src/main/java/com/dnastack/wes/storage/AzureBlobStorageClient.java @@ -121,8 +121,11 @@ public void readBytes(OutputStream outputStream, String blobUri, HttpRange httpR @Override public boolean isFile(String filePath) { - BlobClient blobClient = client.getBlobContainerClient(container).getBlobClient(filePath); - return filePath.startsWith("https://%s.blob.core.windows.net/".formatted(client.getAccountName())) && blobClient.exists(); + try { + return client.getBlobContainerClient(container).getBlobClient(filePath).exists(); + } catch (IllegalArgumentException e) { + return false; + } } @Override diff --git a/src/main/java/com/dnastack/wes/storage/GcpBlobStorageClient.java b/src/main/java/com/dnastack/wes/storage/GcpBlobStorageClient.java index f37d608..3658f3b 100644 --- a/src/main/java/com/dnastack/wes/storage/GcpBlobStorageClient.java +++ b/src/main/java/com/dnastack/wes/storage/GcpBlobStorageClient.java @@ -115,8 +115,11 @@ public void readBytes(OutputStream outputStream, String blobUri, HttpRange httpR @Override public boolean isFile(String filePath) { - Blob blob = client.get(GcpStorageUtils.blobIdFromGsUrl(filePath)); - return filePath.startsWith("gs://") && blob != null && blob.exists(); + try { + return client.get(GcpStorageUtils.blobIdFromGsUrl(filePath)).exists(); + } catch (IllegalArgumentException e) { + return false; + } } @Override diff --git a/src/main/java/com/dnastack/wes/storage/LocalBlobStorageClient.java b/src/main/java/com/dnastack/wes/storage/LocalBlobStorageClient.java index 7524165..2a1ae6e 100644 --- a/src/main/java/com/dnastack/wes/storage/LocalBlobStorageClient.java +++ b/src/main/java/com/dnastack/wes/storage/LocalBlobStorageClient.java @@ -87,7 +87,7 @@ public void readBytes(OutputStream outputStream, String blobUri, HttpRange httpR try (FileChannel channel = new RandomAccessFile(fileToRead, "r").getChannel()) { channel.position(rangeStart); - try (InputStream inputStream = new BoundedInputStream(Channels.newInputStream(channel),rangeEnd - rangeStart)){ + try (InputStream inputStream = new BoundedInputStream(Channels.newInputStream(channel),rangeEnd - rangeStart)) { inputStream.transferTo(outputStream); } } @@ -95,7 +95,11 @@ public void readBytes(OutputStream outputStream, String blobUri, HttpRange httpR @Override public boolean isFile(String filePath) { - return filePath.startsWith("/") && Files.exists(Path.of(filePath)); + try { + return filePath.startsWith("/") && Files.exists(Path.of(filePath)); + } catch (IllegalArgumentException e) { + return false; + } } @Override From dc8436ad3e04cdd878edf819c3614262a3b915e9 Mon Sep 17 00:00:00 2001 From: Sean-DNAstack Date: Fri, 20 Oct 2023 17:25:45 -0400 Subject: [PATCH 4/8] Address PR feedback --- .../com/dnastack/wes/config/AsyncConfig.java | 3 +- .../wes/cromwell/CromwellService.java | 32 +++++++++---------- 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/src/main/java/com/dnastack/wes/config/AsyncConfig.java b/src/main/java/com/dnastack/wes/config/AsyncConfig.java index c5b5453..c6d9d2b 100644 --- a/src/main/java/com/dnastack/wes/config/AsyncConfig.java +++ b/src/main/java/com/dnastack/wes/config/AsyncConfig.java @@ -4,6 +4,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.core.task.TaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.ThreadPoolExecutor; @@ -13,7 +14,7 @@ public class AsyncConfig { @Bean - public ThreadPoolTaskExecutor defaultAsyncOperationExecutor( + public TaskExecutor defaultAsyncOperationExecutor( @Value("${app.executors.default.core-pool-size:8}") int corePoolSize, @Value("${app.executors.default.max-pool-size:16}") int maxPoolSize, @Value("${app.executors.default.queue-capacity:5000}") int queueCapacity diff --git a/src/main/java/com/dnastack/wes/cromwell/CromwellService.java b/src/main/java/com/dnastack/wes/cromwell/CromwellService.java index ac98414..7d6a935 100644 --- a/src/main/java/com/dnastack/wes/cromwell/CromwellService.java +++ b/src/main/java/com/dnastack/wes/cromwell/CromwellService.java @@ -20,9 +20,8 @@ import feign.FeignException; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.core.task.TaskExecutor; import org.springframework.http.HttpRange; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; import org.springframework.web.multipart.MultipartFile; @@ -55,7 +54,7 @@ public class CromwellService { private final PathTranslatorFactory pathTranslatorFactory; private final CromwellWesMapper cromwellWesMapper; private final CromwellConfig cromwellConfig; - private final ThreadPoolTaskExecutor executor; + private final TaskExecutor defaultAsyncOperationExecutor; private final AppConfig appConfig; @@ -67,7 +66,7 @@ public class CromwellService { CromwellWesMapper cromwellWesMapper, AppConfig appConfig, CromwellConfig config, - @Qualifier("defaultAsyncOperationExecutor") ThreadPoolTaskExecutor executor + TaskExecutor defaultAsyncOperationExecutor ) { this.client = cromwellClient; this.pathTranslatorFactory = pathTranslatorFactory; @@ -75,7 +74,7 @@ public class CromwellService { this.cromwellWesMapper = cromwellWesMapper; this.appConfig = appConfig; this.cromwellConfig = config; - this.executor = executor; + this.defaultAsyncOperationExecutor = defaultAsyncOperationExecutor; } @@ -240,8 +239,8 @@ public RunFiles getRunFiles(String runId) throws NotFoundException { List files = new ArrayList<>(); Map outputs = metadataResponse.getOutputs(); - if (!outputs.isEmpty()) { - outputs.values().forEach(output -> extractFilesFromValue(finalFileSet, output)); + if (outputs != null && !outputs.isEmpty()) { + outputs.values().forEach(output -> extractFilesFromValue(finalFileSet, mapper.valueToTree(output))); } extractSecondaryLogFiles(secondaryFileSet, logFileSet, metadataResponse); @@ -280,15 +279,17 @@ public RunFileDeletions deleteRunFiles(String runId, boolean async) { } public RunFileDeletion deleteRunFileAsync(RunFile runFile) { - CompletableFuture.runAsync(() -> deleteRunFile(runFile), executor); + CompletableFuture.runAsync(() -> deleteRunFile(runFile), defaultAsyncOperationExecutor); return new RunFileDeletion(runFile, RunFileDeletion.DeletionState.ASYNC,null); } public RunFileDeletion deleteRunFile(RunFile runFile) { try { storageClient.deleteFile(runFile.getPath()); + log.info("Deleting file '{}'", runFile.getPath()); return new RunFileDeletion(runFile, RunFileDeletion.DeletionState.DELETED, null); } catch (IOException e) { + log.error("Encountered exception while deleting file '%s': '%s'".formatted(runFile.getPath(), e.getMessage()), e); return new RunFileDeletion(runFile, RunFileDeletion.DeletionState.FAILED, ErrorResponse.builder().errorCode(400).msg(e.getMessage()).build()); } } @@ -526,7 +527,7 @@ private JsonNode extractJsonNode(String value) throws IOException { private void extractSecondaryLogFiles(Set secondaryFileSet, Set logFileSet, CromwellMetadataResponse metadataResponse){ Map outputs = metadataResponse.getOutputs(); if (outputs != null && !outputs.isEmpty()) { - outputs.values().forEach(output -> extractFilesFromValue(secondaryFileSet, output)); + outputs.values().forEach(output -> extractFilesFromValue(secondaryFileSet, mapper.valueToTree(output))); } Map> calls = metadataResponse.getCalls(); if (calls != null && !calls.isEmpty()) { @@ -537,7 +538,7 @@ private void extractSecondaryLogFiles(Set secondaryFileSet, Set private void extractSecondaryLogFilesFromCall(Set secondaryFileSet, Set logFileSet, CromwellTaskCall call){ Map outputs = call.getOutputs(); if (outputs != null && !outputs.isEmpty()) { - outputs.values().forEach(output -> extractFilesFromValue(secondaryFileSet, output)); + outputs.values().forEach(output -> extractFilesFromValue(secondaryFileSet, mapper.valueToTree(output))); } String stderr = call.getStderr(); String stdout = call.getStdout(); @@ -549,7 +550,7 @@ private void extractSecondaryLogFilesFromCall(Set secondaryFileSet, Set< } Map backendLogs = call.getBackendLogs(); if (backendLogs != null && !backendLogs.isEmpty()) { - backendLogs.values().forEach(log -> extractFilesFromValue(logFileSet, log)); + backendLogs.values().forEach(log -> extractFilesFromValue(logFileSet, mapper.valueToTree(log))); } CromwellMetadataResponse subWorkflowMetadata = call.getSubWorkflowMetadata(); if (subWorkflowMetadata != null) { @@ -557,18 +558,15 @@ private void extractSecondaryLogFilesFromCall(Set secondaryFileSet, Set< } } - private void extractFilesFromValue(Set fileSet, Object output) { - JsonNode node = mapper.valueToTree(output); + private void extractFilesFromValue(Set fileSet, JsonNode node) { if (node.isTextual()) { if (storageClient.isFile(node.asText())) { fileSet.add(node.asText()); } } else if (node.isArray()) { - ArrayNode arrayOutput = mapper.valueToTree(output); - extractFilesFromArrayNode(fileSet, arrayOutput); + extractFilesFromArrayNode(fileSet, (ArrayNode) node); } else if (node.isObject()) { - ObjectNode objectOutput = mapper.valueToTree(output); - extractFilesFromObjectNode(fileSet, objectOutput); + extractFilesFromObjectNode(fileSet, (ObjectNode) node); } } From 6d27bac503c11196db1cc3927caab5affb52fcd7 Mon Sep 17 00:00:00 2001 From: Sean-DNAstack Date: Fri, 20 Oct 2023 17:26:12 -0400 Subject: [PATCH 5/8] Added e2e tests --- .../com/dnastack/wes/service/WesE2ETest.java | 152 +++++++++++++++++- .../dnastack/wes/service/wdl/WdlSupplier.java | 1 + .../wdl/workflow_with_all_output_types.wdl | 29 ++++ .../local/LocalBlobStorageClientTest.java | 55 +++++-- 4 files changed, 222 insertions(+), 15 deletions(-) create mode 100644 e2e-tests/src/main/resources/com/dnastack/wes/service/wdl/workflow_with_all_output_types.wdl diff --git a/e2e-tests/src/main/java/com/dnastack/wes/service/WesE2ETest.java b/e2e-tests/src/main/java/com/dnastack/wes/service/WesE2ETest.java index c7fbe97..e183fcd 100644 --- a/e2e-tests/src/main/java/com/dnastack/wes/service/WesE2ETest.java +++ b/e2e-tests/src/main/java/com/dnastack/wes/service/WesE2ETest.java @@ -7,7 +7,6 @@ import com.google.auth.oauth2.GoogleCredentials; import io.restassured.builder.MultiPartSpecBuilder; import io.restassured.http.ContentType; -import io.restassured.http.Header; import io.restassured.specification.MultiPartSpecification; import org.awaitility.core.ConditionFactory; import org.junit.jupiter.api.*; @@ -20,7 +19,6 @@ import java.util.*; import static io.restassured.RestAssured.given; -import static java.lang.String.format; import static org.awaitility.Awaitility.with; import static org.hamcrest.Matchers.*; @@ -433,6 +431,7 @@ public void uploadWorkflowAttachmentWithRunSubmission() throws Exception { public class RunMethodsWithExistingJobs { String workflowJobId; + String workflowJobIdWithAllOutputTypes; @BeforeAll public void setup() throws InterruptedException { @@ -452,6 +451,22 @@ public void setup() throws InterruptedException { .multiPart(getJsonMultipart("tags", tags)) .multiPart(getJsonMultipart("workflow_params", inputs)) .post(path) + .then() + .assertThat() + .statusCode(200) + .body("run_id",is(notNullValue())) + .extract() + .jsonPath() + .getString("run_id"); + + workflowJobIdWithAllOutputTypes = given() + .log().uri() + .log().method() + .header(getHeader(getResource(path))) + .multiPart(getWorkflowUrlMultipart("echo.wdl")) + .multiPart(getMultipartAttachment("echo.wdl",supplier.getFileContent(WdlSupplier.WORKFLOW_WITH_ALL_OUTPUT_TYPES).getBytes())) + .multiPart(getJsonMultipart("workflow_params", inputs)) + .post(path) .then() .assertThat() .statusCode(200) @@ -569,6 +584,139 @@ public void listRunsReturnsReturnsNonEmptyCollection() { //@formatter:on } + + @Test + @DisplayName("Get Run Files for existing run returns all files") + public void getRunFilesReturnsNonEmptyCollection() throws Exception { + String path = getRootPath() + "/runs/" + workflowJobIdWithAllOutputTypes + "/files"; + pollUntilJobCompletes(workflowJobIdWithAllOutputTypes); + + //@formatter:off + given() + .log().uri() + .log().method() + .header(getHeader(getResource(getRootPath() + "/runs/" + workflowJobIdWithAllOutputTypes))) + .accept(ContentType.JSON) + .get(path) + .then() + .assertThat() + .statusCode(200) + .body("runFiles.size()", greaterThan(0)) + .body("runFiles.every { it.path != null && it.file_type in ['FINAL', 'SECONDARY', 'LOG'] }", equalTo(true)); + //@formatter:on + } + + + @Test + @DisplayName("Get Run Files for non-existent run fails with status 401 or 404") + public void getRunFilesForNonExistentRunShouldFail() { + String resourcePath = getRootPath() + "/runs/" + UUID.randomUUID(); + String path = resourcePath + "/files"; + + //@formatter:off + given() + .log().uri() + .log().method() + .header(getHeader(getResource(resourcePath))) + .accept(ContentType.JSON) + .get(path) + .then() + .assertThat() + .statusCode(anyOf(equalTo(404), equalTo(401))); + //@formatter:on + } + + + @Test + @DisplayName("Delete Run Files for existing run returns all deleted files") + public void deleteRunFilesReturnsNonEmptyCollection() throws Exception { + reRunWorkflowWithAllOutputTypes(); + String path = getRootPath() + "/runs/" + workflowJobIdWithAllOutputTypes + "/files"; + pollUntilJobCompletes(workflowJobIdWithAllOutputTypes); + + //@formatter:off + given() + .log().uri() + .log().method() + .header(getHeader(getResource(getRootPath() + "/runs/" + workflowJobIdWithAllOutputTypes))) + .accept(ContentType.JSON) + .delete(path) + .then() + .assertThat() + .statusCode(200) + .body("deletions.size()", greaterThan(0)) + .body("deletions.every { it.path != null && it.file_type == 'SECONDARY' && it.state == 'DELETED' }", equalTo(true)); + //@formatter:on + } + + + @Test + @DisplayName("Delete Run Files for existing run asynchronously returns all deleted files") + public void deleteRunFilesAsyncReturnsNonEmptyCollection() throws Exception { + reRunWorkflowWithAllOutputTypes(); + String path = getRootPath() + "/runs/" + workflowJobIdWithAllOutputTypes + "/files"; + pollUntilJobCompletes(workflowJobIdWithAllOutputTypes); + + //@formatter:off + given() + .log().uri() + .log().method() + .header(getHeader(getResource(getRootPath() + "/runs/" + workflowJobIdWithAllOutputTypes))) + .accept(ContentType.JSON) + .queryParam("async", true) + .delete(path) + .then() + .assertThat() + .statusCode(200) + .body("deletions.size()", greaterThan(0)) + .body("deletions.every { it.path != null && it.file_type == 'SECONDARY' && it.state == 'ASYNC' }", equalTo(true)); + //@formatter:on + } + + + @Test + @DisplayName("Delete Run Files for non-existent run fails with status 401 or 404") + public void deleteRunFilesForNonExistentRunShouldFail() { + String resourcePath = getRootPath() + "/runs/" + UUID.randomUUID(); + String path = resourcePath + "/files"; + + //@formatter:off + given() + .log().uri() + .log().method() + .header(getHeader(getResource(resourcePath))) + .accept(ContentType.JSON) + .delete(path) + .then() + .assertThat() + .statusCode(anyOf(equalTo(404),equalTo(401))); + //@formatter:on + } + + + private void reRunWorkflowWithAllOutputTypes() { + String path = getRootPath() + "/runs"; + Map inputs = Collections.singletonMap("hello_world.name", "Some sort of String"); + + //@formatter:off + workflowJobIdWithAllOutputTypes = given() + .log().uri() + .log().method() + .header(getHeader(getResource(path))) + .multiPart(getWorkflowUrlMultipart("echo.wdl")) + .multiPart(getMultipartAttachment("echo.wdl",supplier.getFileContent(WdlSupplier.WORKFLOW_WITH_ALL_OUTPUT_TYPES).getBytes())) + .multiPart(getJsonMultipart("workflow_params", inputs)) + .post(path) + .then() + .assertThat() + .statusCode(200) + .body("run_id",is(notNullValue())) + .extract() + .jsonPath() + .getString("run_id"); + //@formatter:on + } + } } diff --git a/e2e-tests/src/main/java/com/dnastack/wes/service/wdl/WdlSupplier.java b/e2e-tests/src/main/java/com/dnastack/wes/service/wdl/WdlSupplier.java index 3cb63c9..8f6ddf0 100644 --- a/e2e-tests/src/main/java/com/dnastack/wes/service/wdl/WdlSupplier.java +++ b/e2e-tests/src/main/java/com/dnastack/wes/service/wdl/WdlSupplier.java @@ -10,6 +10,7 @@ public class WdlSupplier { public static final String WORKFLOW_WITHOUT_FILE = "workflow_without_file.wdl"; + public static final String WORKFLOW_WITH_ALL_OUTPUT_TYPES = "workflow_with_all_output_types.wdl"; public static final String WORKFLOW_WITH_IMPORTS_1 = "workflow_with_imports_1.wdl"; public static final String WORKFLOW_WITH_IMPORTS_2 = "workflow_with_imports_2.wdl"; public static final String WORKFLOW_WITH_IMPORTS_INPUTS = "workflow_with_imports.json"; diff --git a/e2e-tests/src/main/resources/com/dnastack/wes/service/wdl/workflow_with_all_output_types.wdl b/e2e-tests/src/main/resources/com/dnastack/wes/service/wdl/workflow_with_all_output_types.wdl new file mode 100644 index 0000000..c117df4 --- /dev/null +++ b/e2e-tests/src/main/resources/com/dnastack/wes/service/wdl/workflow_with_all_output_types.wdl @@ -0,0 +1,29 @@ +task echo { + String name + command { + echo "Hello ${name}" + >&2 echo "Goodbye ${name}" + echo "Bye" > "test.txt" + } + + runtime { + docker: "ubuntu" + } + + output { + File out = stdout() + File out2 = "test.txt" + Array[File] arrayOut = [out, out2] + } +} + +workflow hello_world { + String name + call echo { + input: + name = name + } + output { + File out = echo.out + } +} \ No newline at end of file diff --git a/src/test/java/com/dnastack/wes/storage/client/local/LocalBlobStorageClientTest.java b/src/test/java/com/dnastack/wes/storage/client/local/LocalBlobStorageClientTest.java index fb3017b..80f832f 100644 --- a/src/test/java/com/dnastack/wes/storage/client/local/LocalBlobStorageClientTest.java +++ b/src/test/java/com/dnastack/wes/storage/client/local/LocalBlobStorageClientTest.java @@ -15,6 +15,9 @@ class LocalBlobStorageClientTest { + private final String toWrite = "Test String"; + private final String directory = "test"; + private final String fileName = "test.txt"; @Test public void testCreateStorageClient_noStagingPath() throws IOException { @@ -36,10 +39,6 @@ public void testCreateStorageClient_withStagingPath() throws IOException { @Test public void testWritingBytesToFile() throws IOException { LocalBlobStorageClient storageClient = new LocalBlobStorageClient(); - String toWrite = "Test String"; - String directory = "test"; - String fileName = "test.txt"; - Path targetPath = Path.of(storageClient.getStagingPath() + "/" + directory + "/" + fileName); storageClient.writeBytes(new ByteArrayInputStream(toWrite.getBytes()), toWrite.length(), directory, fileName); @@ -51,9 +50,6 @@ public void testWritingBytesToFile() throws IOException { @Test public void testWritingBytesToFile_existingFileThrowsError() throws IOException { LocalBlobStorageClient storageClient = new LocalBlobStorageClient(); - String toWrite = "Test String"; - String directory = "test"; - String fileName = "test.txt"; Path targetPath = Path.of(storageClient.getStagingPath() + "/" + directory + "/" + fileName); Files.createDirectory(targetPath.getParent()); Files.write(targetPath, toWrite.getBytes(), StandardOpenOption.CREATE_NEW); @@ -66,9 +62,6 @@ public void testWritingBytesToFile_existingFileThrowsError() throws IOException @Test public void testReadingFile() throws IOException { LocalBlobStorageClient storageClient = new LocalBlobStorageClient(); - String toWrite = "Test String"; - String directory = "test"; - String fileName = "test.txt"; Path targetPath = Path.of(storageClient.getStagingPath() + "/" + directory + "/" + fileName); Files.createDirectory(targetPath.getParent()); Files.write(targetPath, toWrite.getBytes(), StandardOpenOption.CREATE_NEW); @@ -83,9 +76,6 @@ public void testReadingFile() throws IOException { @Test public void testReadingFile_withTruncation() throws IOException { LocalBlobStorageClient storageClient = new LocalBlobStorageClient(); - String toWrite = "Test String"; - String directory = "test"; - String fileName = "test.txt"; Path targetPath = Path.of(storageClient.getStagingPath() + "/" + directory + "/" + fileName); Files.createDirectory(targetPath.getParent()); Files.write(targetPath, toWrite.getBytes(), StandardOpenOption.CREATE_NEW); @@ -97,4 +87,43 @@ public void testReadingFile_withTruncation() throws IOException { Assertions.assertEquals(toWrite.substring(5), readValue); } + @Test + public void testIsFile() throws IOException { + LocalBlobStorageClient storageClient = new LocalBlobStorageClient(); + Path targetPath = Path.of(storageClient.getStagingPath() + "/" + directory + "/" + fileName); + Files.createDirectory(targetPath.getParent()); + Files.write(targetPath, toWrite.getBytes(), StandardOpenOption.CREATE_NEW); + + Assertions.assertTrue(storageClient.isFile(targetPath.toString())); + } + + @Test + public void testIsFile_noFileExists() throws IOException { + LocalBlobStorageClient storageClient = new LocalBlobStorageClient(); + Path targetPath = Path.of(storageClient.getStagingPath() + "/" + directory + "/" + fileName); + + Assertions.assertFalse(storageClient.isFile(targetPath.toString())); + } + + @Test + public void testDeletingFile() throws IOException { + LocalBlobStorageClient storageClient = new LocalBlobStorageClient(); + Path targetPath = Path.of(storageClient.getStagingPath() + "/" + directory + "/" + fileName); + Files.createDirectory(targetPath.getParent()); + Files.write(targetPath, toWrite.getBytes(), StandardOpenOption.CREATE_NEW); + + Assertions.assertTrue(Files.exists(targetPath)); + storageClient.deleteFile(targetPath.toString()); + Assertions.assertFalse(Files.exists(targetPath)); + } + + @Test + public void testDeletingFile_throwsError() throws IOException { + LocalBlobStorageClient storageClient = new LocalBlobStorageClient(); + Path targetPath = Path.of(storageClient.getStagingPath() + "/" + directory + "/" + fileName); + + Assertions.assertFalse(Files.exists(targetPath)); + Assertions.assertThrows(IOException.class, () -> storageClient.deleteFile(targetPath.toString())); + } + } \ No newline at end of file From f4f06429518ff8966fb1c0b17e87029eca95bb07 Mon Sep 17 00:00:00 2001 From: Sean-DNAstack Date: Fri, 20 Oct 2023 17:44:06 -0400 Subject: [PATCH 6/8] Add extra file to wdl --- .../wes/service/wdl/workflow_with_all_output_types.wdl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/e2e-tests/src/main/resources/com/dnastack/wes/service/wdl/workflow_with_all_output_types.wdl b/e2e-tests/src/main/resources/com/dnastack/wes/service/wdl/workflow_with_all_output_types.wdl index c117df4..2ca9fe3 100644 --- a/e2e-tests/src/main/resources/com/dnastack/wes/service/wdl/workflow_with_all_output_types.wdl +++ b/e2e-tests/src/main/resources/com/dnastack/wes/service/wdl/workflow_with_all_output_types.wdl @@ -4,6 +4,7 @@ task echo { echo "Hello ${name}" >&2 echo "Goodbye ${name}" echo "Bye" > "test.txt" + echo "Bye" > "test2.txt" } runtime { @@ -13,7 +14,7 @@ task echo { output { File out = stdout() File out2 = "test.txt" - Array[File] arrayOut = [out, out2] + Array[File] arrayOut = [out, out2, "test2.txt"] } } From 92869fc9a938bd5c0754ba4e85550d959d8a2c93 Mon Sep 17 00:00:00 2001 From: Sean-DNAstack Date: Mon, 23 Oct 2023 10:57:36 -0400 Subject: [PATCH 7/8] Clean up wes swagger yaml --- .../wes/cromwell/CromwellService.java | 4 +- workflow_execution_service.swagger.yaml | 83 +++++++++++++------ 2 files changed, 60 insertions(+), 27 deletions(-) diff --git a/src/main/java/com/dnastack/wes/cromwell/CromwellService.java b/src/main/java/com/dnastack/wes/cromwell/CromwellService.java index 7d6a935..b065521 100644 --- a/src/main/java/com/dnastack/wes/cromwell/CromwellService.java +++ b/src/main/java/com/dnastack/wes/cromwell/CromwellService.java @@ -231,7 +231,7 @@ public RunId cancel(String runId) { * * @return a list of generated files for the run */ - public RunFiles getRunFiles(String runId) throws NotFoundException { + public RunFiles getRunFiles(String runId) { CromwellMetadataResponse metadataResponse = getMetadata(runId); Set finalFileSet = new HashSet<>(); Set secondaryFileSet = new HashSet<>(); @@ -554,7 +554,7 @@ private void extractSecondaryLogFilesFromCall(Set secondaryFileSet, Set< } CromwellMetadataResponse subWorkflowMetadata = call.getSubWorkflowMetadata(); if (subWorkflowMetadata != null) { - extractSecondaryLogFiles(secondaryFileSet,logFileSet,subWorkflowMetadata); + extractSecondaryLogFiles(secondaryFileSet, logFileSet, subWorkflowMetadata); } } diff --git a/workflow_execution_service.swagger.yaml b/workflow_execution_service.swagger.yaml index b8244e9..a2e297f 100644 --- a/workflow_execution_service.swagger.yaml +++ b/workflow_execution_service.swagger.yaml @@ -383,7 +383,7 @@ paths: description: >- This endpoint provides all the generated files from the associated storage client for a workflow run. The returned result has information about the - files of the given run and the type of the given file. + files of the given run, including the path and the type of the given file. x-swagger-router-controller: ga4gh.wes.server operationId: GetRunFiles responses: @@ -434,12 +434,12 @@ paths: files are cleaned up. When true the delete operation will be done asynchronously with faster execution but higher risk of not doing a "clean" cleanup. x-swagger-router-controller: ga4gh.wes.server - operationId: GetRunFiles + operationId: DeleteRunFiles responses: '200': description: '' schema: - $ref: '#/definitions/RunId' + $ref: '#/definitions/RunFileDeletions' '401': description: The request is unauthorized. schema: @@ -653,27 +653,21 @@ definitions: - CANCELING: The task was canceled by the user, and is in the process of stopping. - File: - type: object - properties: - type: - type: string - enum: - - FINAL - - SECONDARY - - LOG - default: FINAL - description: >- - - FINAL: The file exists directly as an output of the metadata response for the specified run. - - - SECONDARY: The file exists as a part of a task call output or a sub-workflow metadata - response for the specified run - - - LOG: The file exists as a part of the backend logs or stderr and stdout for the workflow or - sub-workflow metadata for the specified run. - path: - type: string - description: The path of the file. + FileType: + type: string + enum: + - FINAL + - SECONDARY + - LOG + default: FINAL + description: >- + - FINAL: The file exists directly as an output of the metadata response for the specified run. + + - SECONDARY: The file exists as a part of a task call output or a sub-workflow metadata + response for the specified run + + - LOG: The file exists as a part of the backend logs or stderr and stdout for the workflow or + sub-workflow metadata for the specified run. RunListResponse: type: object properties: @@ -782,7 +776,46 @@ definitions: files: type: array items: - $ref: '#/definitions/File' + $ref: '#/definitions/RunFile' + RunFile: + type: object + properties: + file_type: + $ref: '#/definitions/FileType' + path: + type: string + description: The path of the file. + RunFileDeletions: + type: object + required: + - run_id + properties: + deletions: + type: array + items: + $ref: '#/definitions/RunFileDeletion' + RunFileDeletion: + type: object + properties: + path: + type: string + description: The path of the file. + file_type: + $ref: '#/definitions/FileType' + state: + type: string + enum: + - DELETED + - ASYNC + - FAILED + description: >- + - DELETED: The file has been successfully deleted from storage. + + + - ASYNC: The file is being deleted asynchronously. + + + - FAILED: An error occurred during the process of deleting the file. WorkflowTypeVersion: type: object properties: From 15038c17bcde154e3a4ab46a919b9c4e5fea5152 Mon Sep 17 00:00:00 2001 From: Sean-DNAstack Date: Tue, 24 Oct 2023 14:13:49 -0400 Subject: [PATCH 8/8] Optimize & clean up e2e tests --- .../com/dnastack/wes/service/WesE2ETest.java | 83 ++++++++++--------- .../local/LocalBlobStorageClientTest.java | 27 +++--- 2 files changed, 58 insertions(+), 52 deletions(-) diff --git a/e2e-tests/src/main/java/com/dnastack/wes/service/WesE2ETest.java b/e2e-tests/src/main/java/com/dnastack/wes/service/WesE2ETest.java index e183fcd..b0e9eeb 100644 --- a/e2e-tests/src/main/java/com/dnastack/wes/service/WesE2ETest.java +++ b/e2e-tests/src/main/java/com/dnastack/wes/service/WesE2ETest.java @@ -8,15 +8,20 @@ import io.restassured.builder.MultiPartSpecBuilder; import io.restassured.http.ContentType; import io.restassured.specification.MultiPartSpecification; +import org.awaitility.Awaitility; import org.awaitility.core.ConditionFactory; import org.junit.jupiter.api.*; import org.junit.jupiter.api.TestInstance.Lifecycle; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import java.io.ByteArrayInputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.*; +import java.util.stream.Stream; import static io.restassured.RestAssured.given; import static org.awaitility.Awaitility.with; @@ -431,7 +436,6 @@ public void uploadWorkflowAttachmentWithRunSubmission() throws Exception { public class RunMethodsWithExistingJobs { String workflowJobId; - String workflowJobIdWithAllOutputTypes; @BeforeAll public void setup() throws InterruptedException { @@ -451,22 +455,6 @@ public void setup() throws InterruptedException { .multiPart(getJsonMultipart("tags", tags)) .multiPart(getJsonMultipart("workflow_params", inputs)) .post(path) - .then() - .assertThat() - .statusCode(200) - .body("run_id",is(notNullValue())) - .extract() - .jsonPath() - .getString("run_id"); - - workflowJobIdWithAllOutputTypes = given() - .log().uri() - .log().method() - .header(getHeader(getResource(path))) - .multiPart(getWorkflowUrlMultipart("echo.wdl")) - .multiPart(getMultipartAttachment("echo.wdl",supplier.getFileContent(WdlSupplier.WORKFLOW_WITH_ALL_OUTPUT_TYPES).getBytes())) - .multiPart(getJsonMultipart("workflow_params", inputs)) - .post(path) .then() .assertThat() .statusCode(200) @@ -585,17 +573,17 @@ public void listRunsReturnsReturnsNonEmptyCollection() { } - @Test + @ParameterizedTest + @MethodSource("completeWorkflowWithFilesProvider") @DisplayName("Get Run Files for existing run returns all files") - public void getRunFilesReturnsNonEmptyCollection() throws Exception { - String path = getRootPath() + "/runs/" + workflowJobIdWithAllOutputTypes + "/files"; - pollUntilJobCompletes(workflowJobIdWithAllOutputTypes); + public void getRunFilesReturnsNonEmptyCollection(String runId) { + String path = getRootPath() + "/runs/" + runId + "/files"; //@formatter:off given() .log().uri() .log().method() - .header(getHeader(getResource(getRootPath() + "/runs/" + workflowJobIdWithAllOutputTypes))) + .header(getHeader(getResource(getRootPath() + "/runs/" + runId))) .accept(ContentType.JSON) .get(path) .then() @@ -627,18 +615,17 @@ public void getRunFilesForNonExistentRunShouldFail() { } - @Test + @ParameterizedTest + @MethodSource("completeWorkflowWithFilesProvider") @DisplayName("Delete Run Files for existing run returns all deleted files") - public void deleteRunFilesReturnsNonEmptyCollection() throws Exception { - reRunWorkflowWithAllOutputTypes(); - String path = getRootPath() + "/runs/" + workflowJobIdWithAllOutputTypes + "/files"; - pollUntilJobCompletes(workflowJobIdWithAllOutputTypes); + public void deleteRunFilesReturnsNonEmptyCollection(String runId) { + String path = getRootPath() + "/runs/" + runId + "/files"; //@formatter:off given() .log().uri() .log().method() - .header(getHeader(getResource(getRootPath() + "/runs/" + workflowJobIdWithAllOutputTypes))) + .header(getHeader(getResource(getRootPath() + "/runs/" + runId))) .accept(ContentType.JSON) .delete(path) .then() @@ -650,18 +637,17 @@ public void deleteRunFilesReturnsNonEmptyCollection() throws Exception { } - @Test + @ParameterizedTest + @MethodSource("completeWorkflowWithFilesProvider") @DisplayName("Delete Run Files for existing run asynchronously returns all deleted files") - public void deleteRunFilesAsyncReturnsNonEmptyCollection() throws Exception { - reRunWorkflowWithAllOutputTypes(); - String path = getRootPath() + "/runs/" + workflowJobIdWithAllOutputTypes + "/files"; - pollUntilJobCompletes(workflowJobIdWithAllOutputTypes); + public void deleteRunFilesAsyncReturnsNonEmptyCollection(String runId) { + String path = getRootPath() + "/runs/" + runId + "/files"; //@formatter:off given() .log().uri() .log().method() - .header(getHeader(getResource(getRootPath() + "/runs/" + workflowJobIdWithAllOutputTypes))) + .header(getHeader(getResource(getRootPath() + "/runs/" + runId))) .accept(ContentType.JSON) .queryParam("async", true) .delete(path) @@ -671,6 +657,24 @@ public void deleteRunFilesAsyncReturnsNonEmptyCollection() throws Exception { .body("deletions.size()", greaterThan(0)) .body("deletions.every { it.path != null && it.file_type == 'SECONDARY' && it.state == 'ASYNC' }", equalTo(true)); //@formatter:on + + Awaitility.await() + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(5)) + .untilAsserted(() -> + //@formatter:off + given() + .log().uri() + .log().method() + .header(getHeader(getResource(getRootPath() + "/runs/" + runId))) + .accept(ContentType.JSON) + .get(path) + .then() + .assertThat() + .statusCode(200) + .body("runFiles.size()", greaterThan(0)) + .body("runFiles.every { it.path != null && it.file_type in ['FINAL', 'LOG'] }", equalTo(true))); + //@formatter:on } @@ -694,27 +698,30 @@ public void deleteRunFilesForNonExistentRunShouldFail() { } - private void reRunWorkflowWithAllOutputTypes() { + private Stream completeWorkflowWithFilesProvider() throws Exception { String path = getRootPath() + "/runs"; Map inputs = Collections.singletonMap("hello_world.name", "Some sort of String"); //@formatter:off - workflowJobIdWithAllOutputTypes = given() + String workflowJobIdWithAllOutputTypes = given() .log().uri() .log().method() .header(getHeader(getResource(path))) .multiPart(getWorkflowUrlMultipart("echo.wdl")) - .multiPart(getMultipartAttachment("echo.wdl",supplier.getFileContent(WdlSupplier.WORKFLOW_WITH_ALL_OUTPUT_TYPES).getBytes())) + .multiPart(getMultipartAttachment("echo.wdl", supplier.getFileContent(WdlSupplier.WORKFLOW_WITH_ALL_OUTPUT_TYPES).getBytes())) .multiPart(getJsonMultipart("workflow_params", inputs)) .post(path) .then() .assertThat() .statusCode(200) - .body("run_id",is(notNullValue())) + .body("run_id", is(notNullValue())) .extract() .jsonPath() .getString("run_id"); //@formatter:on + + pollUntilJobCompletes(workflowJobIdWithAllOutputTypes); + return Stream.of(Arguments.of(workflowJobIdWithAllOutputTypes)); } } diff --git a/src/test/java/com/dnastack/wes/storage/client/local/LocalBlobStorageClientTest.java b/src/test/java/com/dnastack/wes/storage/client/local/LocalBlobStorageClientTest.java index 80f832f..cd57938 100644 --- a/src/test/java/com/dnastack/wes/storage/client/local/LocalBlobStorageClientTest.java +++ b/src/test/java/com/dnastack/wes/storage/client/local/LocalBlobStorageClientTest.java @@ -62,9 +62,7 @@ public void testWritingBytesToFile_existingFileThrowsError() throws IOException @Test public void testReadingFile() throws IOException { LocalBlobStorageClient storageClient = new LocalBlobStorageClient(); - Path targetPath = Path.of(storageClient.getStagingPath() + "/" + directory + "/" + fileName); - Files.createDirectory(targetPath.getParent()); - Files.write(targetPath, toWrite.getBytes(), StandardOpenOption.CREATE_NEW); + Path targetPath = createFile(); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); storageClient.readBytes(outputStream, targetPath.toString(), HttpRange.createByteRange(0L,toWrite.length())); @@ -76,9 +74,7 @@ public void testReadingFile() throws IOException { @Test public void testReadingFile_withTruncation() throws IOException { LocalBlobStorageClient storageClient = new LocalBlobStorageClient(); - Path targetPath = Path.of(storageClient.getStagingPath() + "/" + directory + "/" + fileName); - Files.createDirectory(targetPath.getParent()); - Files.write(targetPath, toWrite.getBytes(), StandardOpenOption.CREATE_NEW); + Path targetPath = createFile(); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); @@ -90,9 +86,7 @@ public void testReadingFile_withTruncation() throws IOException { @Test public void testIsFile() throws IOException { LocalBlobStorageClient storageClient = new LocalBlobStorageClient(); - Path targetPath = Path.of(storageClient.getStagingPath() + "/" + directory + "/" + fileName); - Files.createDirectory(targetPath.getParent()); - Files.write(targetPath, toWrite.getBytes(), StandardOpenOption.CREATE_NEW); + Path targetPath = createFile(); Assertions.assertTrue(storageClient.isFile(targetPath.toString())); } @@ -100,7 +94,7 @@ public void testIsFile() throws IOException { @Test public void testIsFile_noFileExists() throws IOException { LocalBlobStorageClient storageClient = new LocalBlobStorageClient(); - Path targetPath = Path.of(storageClient.getStagingPath() + "/" + directory + "/" + fileName); + Path targetPath = Path.of(directory + "/" + fileName); Assertions.assertFalse(storageClient.isFile(targetPath.toString())); } @@ -108,9 +102,7 @@ public void testIsFile_noFileExists() throws IOException { @Test public void testDeletingFile() throws IOException { LocalBlobStorageClient storageClient = new LocalBlobStorageClient(); - Path targetPath = Path.of(storageClient.getStagingPath() + "/" + directory + "/" + fileName); - Files.createDirectory(targetPath.getParent()); - Files.write(targetPath, toWrite.getBytes(), StandardOpenOption.CREATE_NEW); + Path targetPath = createFile(); Assertions.assertTrue(Files.exists(targetPath)); storageClient.deleteFile(targetPath.toString()); @@ -120,10 +112,17 @@ public void testDeletingFile() throws IOException { @Test public void testDeletingFile_throwsError() throws IOException { LocalBlobStorageClient storageClient = new LocalBlobStorageClient(); - Path targetPath = Path.of(storageClient.getStagingPath() + "/" + directory + "/" + fileName); + Path targetPath = Path.of(directory + "/" + fileName); Assertions.assertFalse(Files.exists(targetPath)); Assertions.assertThrows(IOException.class, () -> storageClient.deleteFile(targetPath.toString())); } + private Path createFile() throws IOException { + Path tempDir = Files.createTempDirectory(directory); + Path targetPath = tempDir.resolve(fileName); + Files.write(targetPath, toWrite.getBytes(), StandardOpenOption.CREATE_NEW); + return targetPath; + } + } \ No newline at end of file