diff --git a/src/main/java/com/dnastack/wes/api/RunFile.java b/src/main/java/com/dnastack/wes/api/RunFile.java index b09c0a1..e9cf622 100644 --- a/src/main/java/com/dnastack/wes/api/RunFile.java +++ b/src/main/java/com/dnastack/wes/api/RunFile.java @@ -12,13 +12,12 @@ @Builder public class RunFile { - @JsonProperty(value = "type") - Enum type; + @JsonProperty(value = "file_type") + FileType fileType; - @JsonProperty(value = "path") String path; - public enum type { + public enum FileType { FINAL, SECONDARY, LOG diff --git a/src/main/java/com/dnastack/wes/api/RunFileDeletion.java b/src/main/java/com/dnastack/wes/api/RunFileDeletion.java new file mode 100644 index 0000000..910a733 --- /dev/null +++ b/src/main/java/com/dnastack/wes/api/RunFileDeletion.java @@ -0,0 +1,13 @@ +package com.dnastack.wes.api; + +import com.fasterxml.jackson.annotation.JsonUnwrapped; + +public record RunFileDeletion(@JsonUnwrapped RunFile runFile, DeletionState state, @JsonUnwrapped ErrorResponse errorResponse) { + + public enum DeletionState { + DELETED, + ASYNC, + FAILED + } + +} diff --git a/src/main/java/com/dnastack/wes/api/RunFileDeletions.java b/src/main/java/com/dnastack/wes/api/RunFileDeletions.java new file mode 100644 index 0000000..0426752 --- /dev/null +++ b/src/main/java/com/dnastack/wes/api/RunFileDeletions.java @@ -0,0 +1,5 @@ +package com.dnastack.wes.api; + +import java.util.List; + +public record RunFileDeletions(List deletions) {} diff --git a/src/main/java/com/dnastack/wes/api/RunFiles.java b/src/main/java/com/dnastack/wes/api/RunFiles.java index 71b62f8..0835760 100644 --- a/src/main/java/com/dnastack/wes/api/RunFiles.java +++ b/src/main/java/com/dnastack/wes/api/RunFiles.java @@ -1,28 +1,5 @@ package com.dnastack.wes.api; -import com.fasterxml.jackson.annotation.JsonProperty; -import lombok.*; - -import java.util.ArrayList; import java.util.List; -@Getter -@Setter -@AllArgsConstructor -@EqualsAndHashCode -@NoArgsConstructor -@ToString -@Builder -public class RunFiles { - - @JsonProperty("files") - List files; - - public void addRunFile(RunFile runFile) { - if (files == null) { - files = new ArrayList<>(); - } - files.add(runFile); - } - -} +public record RunFiles(List runFiles) {} diff --git a/src/main/java/com/dnastack/wes/api/WesV1Controller.java b/src/main/java/com/dnastack/wes/api/WesV1Controller.java index 6b35f45..541da8e 100644 --- a/src/main/java/com/dnastack/wes/api/WesV1Controller.java +++ b/src/main/java/com/dnastack/wes/api/WesV1Controller.java @@ -123,17 +123,17 @@ public RunId cancelRun(@PathVariable("runId") String runId) { return adapter.cancel(runId); } - @AuditActionUri("wes:run:files") - @PreAuthorize("@accessEvaluator.canAccessResource('/ga4gh/wes/v1/runs/' + #runId , 'wes:runs:read', 'wes')") + @AuditActionUri("wes:run:files:list") + @PreAuthorize("@accessEvaluator.canAccessResource('/ga4gh/wes/v1/runs/' + #runId + '/files', 'wes:runs:read', 'wes')") @GetMapping(value = "/runs/{run_id}/files", produces = { MediaType.APPLICATION_JSON_VALUE }) public RunFiles getRunFiles(@PathVariable("run_id") String runId) { return adapter.getRunFiles(runId); } - @AuditActionUri("wes:run:files") - @PreAuthorize("@accessEvaluator.canAccessResource('/ga4gh/wes/v1/runs/' + #runId , 'wes:runs:delete', 'wes')") + @AuditActionUri("wes:run:files:delete") + @PreAuthorize("@accessEvaluator.canAccessResource('/ga4gh/wes/v1/runs/' + #runId + '/files', 'wes:runs:write', 'wes')") @DeleteMapping(value = "/runs/{run_id}/files", produces = { MediaType.APPLICATION_JSON_VALUE }) - public RunId deleteRunFiles( + public RunFileDeletions deleteRunFiles( @PathVariable("run_id") String runId, @RequestParam(value = "async", required = false) boolean async ) { diff --git a/src/main/java/com/dnastack/wes/config/AsyncConfig.java b/src/main/java/com/dnastack/wes/config/AsyncConfig.java new file mode 100644 index 0000000..c5b5453 --- /dev/null +++ b/src/main/java/com/dnastack/wes/config/AsyncConfig.java @@ -0,0 +1,31 @@ +package com.dnastack.wes.config; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.ThreadPoolExecutor; + +@Slf4j +@Configuration +public class AsyncConfig { + + @Bean + public ThreadPoolTaskExecutor defaultAsyncOperationExecutor( + @Value("${app.executors.default.core-pool-size:8}") int corePoolSize, + @Value("${app.executors.default.max-pool-size:16}") int maxPoolSize, + @Value("${app.executors.default.queue-capacity:5000}") int queueCapacity + ) { + final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(corePoolSize); + executor.setMaxPoolSize(maxPoolSize); + executor.setQueueCapacity(queueCapacity); + executor.setThreadNamePrefix("defaultAsyncOp-"); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + executor.initialize(); + return executor; + } + +} diff --git a/src/main/java/com/dnastack/wes/cromwell/CromwellService.java b/src/main/java/com/dnastack/wes/cromwell/CromwellService.java index 74436a2..ac98414 100644 --- a/src/main/java/com/dnastack/wes/cromwell/CromwellService.java +++ b/src/main/java/com/dnastack/wes/cromwell/CromwellService.java @@ -3,7 +3,6 @@ import com.dnastack.wes.AppConfig; import com.dnastack.wes.api.*; import com.dnastack.wes.security.AuthenticatedUser; -import com.dnastack.wes.shared.FileDeletionException; import com.dnastack.wes.shared.InvalidRequestException; import com.dnastack.wes.shared.NotFoundException; import com.dnastack.wes.storage.BlobStorageClient; @@ -14,13 +13,16 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.TextNode; import feign.FeignException; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.http.HttpRange; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; import org.springframework.web.multipart.MultipartFile; @@ -32,10 +34,7 @@ import java.nio.file.Paths; import java.util.*; import java.util.Map.Entry; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.concurrent.CompletableFuture; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -56,6 +55,7 @@ public class CromwellService { private final PathTranslatorFactory pathTranslatorFactory; private final CromwellWesMapper cromwellWesMapper; private final CromwellConfig cromwellConfig; + private final ThreadPoolTaskExecutor executor; private final AppConfig appConfig; @@ -66,7 +66,8 @@ public class CromwellService { PathTranslatorFactory pathTranslatorFactory, CromwellWesMapper cromwellWesMapper, AppConfig appConfig, - CromwellConfig config + CromwellConfig config, + @Qualifier("defaultAsyncOperationExecutor") ThreadPoolTaskExecutor executor ) { this.client = cromwellClient; this.pathTranslatorFactory = pathTranslatorFactory; @@ -74,6 +75,7 @@ public class CromwellService { this.cromwellWesMapper = cromwellWesMapper; this.appConfig = appConfig; this.cromwellConfig = config; + this.executor = executor; } @@ -235,31 +237,26 @@ public RunFiles getRunFiles(String runId) throws NotFoundException { Set finalFileSet = new HashSet<>(); Set secondaryFileSet = new HashSet<>(); Set logFileSet = new HashSet<>(); - RunFiles runFiles = new RunFiles(); + List files = new ArrayList<>(); Map outputs = metadataResponse.getOutputs(); if (!outputs.isEmpty()) { outputs.values().forEach(output -> extractFilesFromValue(finalFileSet, output)); } - extractSecondaryAndLogFilesFromCalls(secondaryFileSet, logFileSet, metadataResponse); + extractSecondaryLogFiles(secondaryFileSet, logFileSet, metadataResponse); - finalFileSet.forEach(path -> runFiles.addRunFile(new RunFile(RunFile.type.FINAL, path))); + finalFileSet.forEach(path -> files.add(new RunFile(RunFile.FileType.FINAL, path))); secondaryFileSet.forEach(path -> { - if (!finalFileSet.contains(path)) { - runFiles.addRunFile(new RunFile(RunFile.type.SECONDARY, path)); + if (!finalFileSet.contains(path) && !logFileSet.contains(path)) { + files.add(new RunFile(RunFile.FileType.SECONDARY, path)); } }); logFileSet.forEach(path -> { if (!finalFileSet.contains(path)) { - runFiles.addRunFile(new RunFile(RunFile.type.LOG, path)); + files.add(new RunFile(RunFile.FileType.LOG, path)); } }); - - if (runFiles.getFiles() != null) { - return runFiles; - } else { - throw new NotFoundException("No files were found for the runId: " + runId); - } + return new RunFiles(files); } /** @@ -269,49 +266,31 @@ public RunFiles getRunFiles(String runId) throws NotFoundException { * * @return the cromwell id */ - public RunId deleteRunFiles(String runId, boolean async) { - List files = getRunFiles(runId).getFiles(); - files = files.stream().filter(runFile -> runFile.getType() == RunFile.type.SECONDARY).toList(); - if (!files.isEmpty()) { - if (async) { - // Asynchronous deletion of files - ExecutorService executor = Executors.newFixedThreadPool(files.size()); - List> deletionTasks = new ArrayList<>(); - files.forEach(file -> { - Future submit = executor.submit(() -> { - try { - storageClient.deleteFile(file.getPath()); - } catch (IOException e) { - throw new FileDeletionException("Failed to delete file with path: " + file.getPath(), e); - } - }); - deletionTasks.add(submit); - }); - // Block until all deletion tasks have completed - deletionTasks.forEach(future -> { - try { - future.get(); - } catch (InterruptedException | ExecutionException e) { - Thread.currentThread().interrupt(); - Throwable cause = e.getCause(); - throw new FileDeletionException(cause.getMessage(), cause.getCause()); - } - }); - executor.shutdown(); - } else { - // Synchronous deletion of files - files.forEach(file -> { - try { - storageClient.deleteFile(file.getPath()); - } catch (IOException e) { - throw new FileDeletionException("Failed to delete file with path: " + file.getPath(), e); - } - }); - } - } else { - throw new NotFoundException("No deletable files were found for the runId: " + runId); + public RunFileDeletions deleteRunFiles(String runId, boolean async) { + List files = getRunFiles(runId).runFiles(); + List outcomes = files.stream().filter(runFile -> RunFile.FileType.SECONDARY.equals(runFile.getFileType())) + .map(runFile -> { + if (async) { + return deleteRunFileAsync(runFile); + } else { + return deleteRunFile(runFile); + } + }).toList(); + return new RunFileDeletions(outcomes); + } + + public RunFileDeletion deleteRunFileAsync(RunFile runFile) { + CompletableFuture.runAsync(() -> deleteRunFile(runFile), executor); + return new RunFileDeletion(runFile, RunFileDeletion.DeletionState.ASYNC,null); + } + + public RunFileDeletion deleteRunFile(RunFile runFile) { + try { + storageClient.deleteFile(runFile.getPath()); + return new RunFileDeletion(runFile, RunFileDeletion.DeletionState.DELETED, null); + } catch (IOException e) { + return new RunFileDeletion(runFile, RunFileDeletion.DeletionState.FAILED, ErrorResponse.builder().errorCode(400).msg(e.getMessage()).build()); } - return RunId.builder().runId(runId).build(); } public void getLogBytes(OutputStream outputStream, String runId, String taskId, String logKey, HttpRange range) throws IOException { @@ -544,58 +523,61 @@ private JsonNode extractJsonNode(String value) throws IOException { } } - private void extractSecondaryAndLogFilesFromCalls(Set secondaryFileSet, Set logFileSet, CromwellMetadataResponse metadataResponse) { + private void extractSecondaryLogFiles(Set secondaryFileSet, Set logFileSet, CromwellMetadataResponse metadataResponse){ + Map outputs = metadataResponse.getOutputs(); + if (outputs != null && !outputs.isEmpty()) { + outputs.values().forEach(output -> extractFilesFromValue(secondaryFileSet, output)); + } Map> calls = metadataResponse.getCalls(); if (calls != null && !calls.isEmpty()) { - calls.values().forEach(cromwellTaskCallList -> cromwellTaskCallList.forEach(call -> { - Map outputs = call.getOutputs(); - if (outputs != null && !outputs.isEmpty()) { - outputs.values().forEach(output -> extractFilesFromValue(secondaryFileSet, output)); - String stderr = call.getStderr(); - String stdout = call.getStdout(); - if (storageClient.isFile(stderr)) { - logFileSet.add(stderr); - } - if (storageClient.isFile(stdout)) { - logFileSet.add(stdout); - } - Map backendLogs = call.getBackendLogs(); - if (backendLogs != null && !backendLogs.isEmpty()) { - backendLogs.values().forEach(log -> extractFilesFromValue(logFileSet, log)); - } - CromwellMetadataResponse subWorkflowMetadata = call.getSubWorkflowMetadata(); - if (subWorkflowMetadata != null) { - Map subOutputs = subWorkflowMetadata.getOutputs(); - if (subOutputs != null && !subOutputs.isEmpty()) { - subOutputs.values().forEach(output -> extractFilesFromValue(secondaryFileSet, output)); - } - extractSecondaryAndLogFilesFromCalls(secondaryFileSet, logFileSet, subWorkflowMetadata); - } - } - })); + calls.values().stream().flatMap(List::stream).forEach(call -> extractSecondaryLogFilesFromCall(secondaryFileSet, logFileSet, call)); } } - private void extractFilesFromValue(Set fileSet, Object output) { - if (output instanceof String) { - if (storageClient.isFile(output.toString())) { - fileSet.add(output.toString()); - } + private void extractSecondaryLogFilesFromCall(Set secondaryFileSet, Set logFileSet, CromwellTaskCall call){ + Map outputs = call.getOutputs(); + if (outputs != null && !outputs.isEmpty()) { + outputs.values().forEach(output -> extractFilesFromValue(secondaryFileSet, output)); } - else if (output instanceof Object[]) { - extractFiles(fileSet, (List) output); + String stderr = call.getStderr(); + String stdout = call.getStdout(); + if (stderr != null && storageClient.isFile(stderr)) { + logFileSet.add(stderr); } - else { - extractFiles(fileSet, (Map) output); + if (stdout != null && storageClient.isFile(stdout)) { + logFileSet.add(stdout); + } + Map backendLogs = call.getBackendLogs(); + if (backendLogs != null && !backendLogs.isEmpty()) { + backendLogs.values().forEach(log -> extractFilesFromValue(logFileSet, log)); + } + CromwellMetadataResponse subWorkflowMetadata = call.getSubWorkflowMetadata(); + if (subWorkflowMetadata != null) { + extractSecondaryLogFiles(secondaryFileSet,logFileSet,subWorkflowMetadata); + } + } + + private void extractFilesFromValue(Set fileSet, Object output) { + JsonNode node = mapper.valueToTree(output); + if (node.isTextual()) { + if (storageClient.isFile(node.asText())) { + fileSet.add(node.asText()); + } + } else if (node.isArray()) { + ArrayNode arrayOutput = mapper.valueToTree(output); + extractFilesFromArrayNode(fileSet, arrayOutput); + } else if (node.isObject()) { + ObjectNode objectOutput = mapper.valueToTree(output); + extractFilesFromObjectNode(fileSet, objectOutput); } } - private void extractFiles(Set fileSet, List outputs) { + private void extractFilesFromArrayNode(Set fileSet, ArrayNode outputs) { outputs.forEach(output -> extractFilesFromValue(fileSet, output)); } - private void extractFiles(Set fileSet, Map outputs) { - outputs.values().forEach(output -> extractFilesFromValue(fileSet, output)); + private void extractFilesFromObjectNode(Set fileSet, ObjectNode outputs) { + outputs.forEach(output -> extractFilesFromValue(fileSet, output)); } private void setWorkflowSourceAndDependencies(Path tempDirectory, RunRequest runRequest, CromwellExecutionRequest cromwellRequest) throws IOException { diff --git a/src/main/java/com/dnastack/wes/shared/FileDeletionException.java b/src/main/java/com/dnastack/wes/shared/FileDeletionException.java deleted file mode 100644 index fcb1a7f..0000000 --- a/src/main/java/com/dnastack/wes/shared/FileDeletionException.java +++ /dev/null @@ -1,17 +0,0 @@ -package com.dnastack.wes.shared; - -public class FileDeletionException extends RuntimeException { - - public FileDeletionException() { - super(); - } - - public FileDeletionException(String message) { - super(message); - } - - public FileDeletionException(String message, Throwable cause) { - super(message, cause); - } - -} diff --git a/src/main/java/com/dnastack/wes/storage/AzureBlobStorageClient.java b/src/main/java/com/dnastack/wes/storage/AzureBlobStorageClient.java index d9f4cbb..b322d9d 100644 --- a/src/main/java/com/dnastack/wes/storage/AzureBlobStorageClient.java +++ b/src/main/java/com/dnastack/wes/storage/AzureBlobStorageClient.java @@ -121,8 +121,11 @@ public void readBytes(OutputStream outputStream, String blobUri, HttpRange httpR @Override public boolean isFile(String filePath) { - BlobClient blobClient = client.getBlobContainerClient(container).getBlobClient(filePath); - return filePath.startsWith("https://%s.blob.core.windows.net/".formatted(client.getAccountName())) && blobClient.exists(); + try { + return client.getBlobContainerClient(container).getBlobClient(filePath).exists(); + } catch (IllegalArgumentException e) { + return false; + } } @Override diff --git a/src/main/java/com/dnastack/wes/storage/GcpBlobStorageClient.java b/src/main/java/com/dnastack/wes/storage/GcpBlobStorageClient.java index f37d608..3658f3b 100644 --- a/src/main/java/com/dnastack/wes/storage/GcpBlobStorageClient.java +++ b/src/main/java/com/dnastack/wes/storage/GcpBlobStorageClient.java @@ -115,8 +115,11 @@ public void readBytes(OutputStream outputStream, String blobUri, HttpRange httpR @Override public boolean isFile(String filePath) { - Blob blob = client.get(GcpStorageUtils.blobIdFromGsUrl(filePath)); - return filePath.startsWith("gs://") && blob != null && blob.exists(); + try { + return client.get(GcpStorageUtils.blobIdFromGsUrl(filePath)).exists(); + } catch (IllegalArgumentException e) { + return false; + } } @Override diff --git a/src/main/java/com/dnastack/wes/storage/LocalBlobStorageClient.java b/src/main/java/com/dnastack/wes/storage/LocalBlobStorageClient.java index 7524165..2a1ae6e 100644 --- a/src/main/java/com/dnastack/wes/storage/LocalBlobStorageClient.java +++ b/src/main/java/com/dnastack/wes/storage/LocalBlobStorageClient.java @@ -87,7 +87,7 @@ public void readBytes(OutputStream outputStream, String blobUri, HttpRange httpR try (FileChannel channel = new RandomAccessFile(fileToRead, "r").getChannel()) { channel.position(rangeStart); - try (InputStream inputStream = new BoundedInputStream(Channels.newInputStream(channel),rangeEnd - rangeStart)){ + try (InputStream inputStream = new BoundedInputStream(Channels.newInputStream(channel),rangeEnd - rangeStart)) { inputStream.transferTo(outputStream); } } @@ -95,7 +95,11 @@ public void readBytes(OutputStream outputStream, String blobUri, HttpRange httpR @Override public boolean isFile(String filePath) { - return filePath.startsWith("/") && Files.exists(Path.of(filePath)); + try { + return filePath.startsWith("/") && Files.exists(Path.of(filePath)); + } catch (IllegalArgumentException e) { + return false; + } } @Override