Address PR feedback
Sean-DNAstack committed Oct 19, 2023
1 parent bdb534a commit 18b45a1
Showing 11 changed files with 155 additions and 155 deletions.
7 changes: 3 additions & 4 deletions src/main/java/com/dnastack/wes/api/RunFile.java
@@ -12,13 +12,12 @@
@Builder
public class RunFile {

@JsonProperty(value = "type")
Enum<type> type;
@JsonProperty(value = "file_type")
FileType fileType;

@JsonProperty(value = "path")
String path;

public enum type {
public enum FileType {
FINAL,
SECONDARY,
LOG
13 changes: 13 additions & 0 deletions src/main/java/com/dnastack/wes/api/RunFileDeletion.java
@@ -0,0 +1,13 @@
package com.dnastack.wes.api;

import com.fasterxml.jackson.annotation.JsonUnwrapped;

public record RunFileDeletion(@JsonUnwrapped RunFile runFile, DeletionState state, @JsonUnwrapped ErrorResponse errorResponse) {

public enum DeletionState {
DELETED,
ASYNC,
FAILED
}

}
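
The two @JsonUnwrapped annotations flatten the RunFile and ErrorResponse fields into the top-level JSON object rather than nesting them. A minimal serialization sketch, assuming Jackson's record support and the property names shown elsewhere in this diff (the path value is a placeholder):

import com.dnastack.wes.api.RunFile;
import com.dnastack.wes.api.RunFileDeletion;
import com.fasterxml.jackson.databind.ObjectMapper;

public class RunFileDeletionJsonSketch {

    public static void main(String[] args) throws Exception {
        // Build a deletion outcome for a hypothetical secondary file.
        RunFileDeletion deletion = new RunFileDeletion(
            RunFile.builder()
                .fileType(RunFile.FileType.SECONDARY)
                .path("gs://example-bucket/workflow/outputs/intermediate.bam")
                .build(),
            RunFileDeletion.DeletionState.DELETED,
            null);

        // @JsonUnwrapped lifts the RunFile properties to the top level, so the
        // expected shape is roughly:
        // {"file_type":"SECONDARY","path":"gs://.../intermediate.bam","state":"DELETED"}
        System.out.println(new ObjectMapper().writeValueAsString(deletion));
    }

}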
5 changes: 5 additions & 0 deletions src/main/java/com/dnastack/wes/api/RunFileDeletions.java
@@ -0,0 +1,5 @@
package com.dnastack.wes.api;

import java.util.List;

public record RunFileDeletions(List<RunFileDeletion> deletions) {}
25 changes: 1 addition & 24 deletions src/main/java/com/dnastack/wes/api/RunFiles.java
@@ -1,28 +1,5 @@
package com.dnastack.wes.api;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.*;

import java.util.ArrayList;
import java.util.List;

@Getter
@Setter
@AllArgsConstructor
@EqualsAndHashCode
@NoArgsConstructor
@ToString
@Builder
public class RunFiles {

@JsonProperty("files")
List<RunFile> files;

public void addRunFile(RunFile runFile) {
if (files == null) {
files = new ArrayList<>();
}
files.add(runFile);
}

}
public record RunFiles(List<RunFile> runFiles) {}
10 changes: 5 additions & 5 deletions src/main/java/com/dnastack/wes/api/WesV1Controller.java
@@ -123,17 +123,17 @@ public RunId cancelRun(@PathVariable("runId") String runId) {
return adapter.cancel(runId);
}

@AuditActionUri("wes:run:files")
@PreAuthorize("@accessEvaluator.canAccessResource('/ga4gh/wes/v1/runs/' + #runId , 'wes:runs:read', 'wes')")
@AuditActionUri("wes:run:files:list")
@PreAuthorize("@accessEvaluator.canAccessResource('/ga4gh/wes/v1/runs/' + #runId + '/files', 'wes:runs:read', 'wes')")
@GetMapping(value = "/runs/{run_id}/files", produces = { MediaType.APPLICATION_JSON_VALUE })
public RunFiles getRunFiles(@PathVariable("run_id") String runId) {
return adapter.getRunFiles(runId);
}

@AuditActionUri("wes:run:files")
@PreAuthorize("@accessEvaluator.canAccessResource('/ga4gh/wes/v1/runs/' + #runId , 'wes:runs:delete', 'wes')")
@AuditActionUri("wes:run:files:delete")
@PreAuthorize("@accessEvaluator.canAccessResource('/ga4gh/wes/v1/runs/' + #runId + '/files', 'wes:runs:write', 'wes')")
@DeleteMapping(value = "/runs/{run_id}/files", produces = { MediaType.APPLICATION_JSON_VALUE })
public RunId deleteRunFiles(
public RunFileDeletions deleteRunFiles(
@PathVariable("run_id") String runId,
@RequestParam(value = "async", required = false) boolean async
) {
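
For context, the reworked delete endpoint now returns the per-file outcomes rather than a bare RunId. A hypothetical client-side sketch; the base URL, bearer token, and run id are placeholders, not values from this commit:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class DeleteRunFilesClientSketch {

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // async=true maps to the boolean @RequestParam above; the response body
        // is the RunFileDeletions record serialized as JSON.
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("https://wes.example.com/ga4gh/wes/v1/runs/run-1234/files?async=true"))
            .header("Authorization", "Bearer <access-token>")
            .DELETE()
            .build();

        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode());
        System.out.println(response.body());
    }

}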
31 changes: 31 additions & 0 deletions src/main/java/com/dnastack/wes/config/AsyncConfig.java
@@ -0,0 +1,31 @@
package com.dnastack.wes.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

@Slf4j
@Configuration
public class AsyncConfig {

@Bean
public ThreadPoolTaskExecutor defaultAsyncOperationExecutor(
@Value("${app.executors.default.core-pool-size:8}") int corePoolSize,
@Value("${app.executors.default.max-pool-size:16}") int maxPoolSize,
@Value("${app.executors.default.queue-capacity:5000}") int queueCapacity
) {
final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setThreadNamePrefix("defaultAsyncOp-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}

}
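
The bean above bounds the pool through the three app.executors.default.* properties and falls back to CallerRunsPolicy when saturated. A standalone sketch of that saturation behaviour, with deliberately tiny, purely illustrative pool sizes: once the single worker and the one-slot queue are busy, further submissions run on the submitting thread instead of the executor throwing RejectedExecutionException.

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

public class CallerRunsPolicySketch {

    public static void main(String[] args) {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(1);
        executor.setMaxPoolSize(1);
        executor.setQueueCapacity(1);
        executor.setThreadNamePrefix("sketch-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();

        // While the worker and the queue slot are occupied, overflow submissions
        // are executed by the caller, so some lines may print "main".
        for (int i = 0; i < 4; i++) {
            executor.execute(() -> System.out.println(Thread.currentThread().getName()));
        }

        executor.shutdown();
    }

}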
180 changes: 81 additions & 99 deletions src/main/java/com/dnastack/wes/cromwell/CromwellService.java
@@ -3,7 +3,6 @@
import com.dnastack.wes.AppConfig;
import com.dnastack.wes.api.*;
import com.dnastack.wes.security.AuthenticatedUser;
import com.dnastack.wes.shared.FileDeletionException;
import com.dnastack.wes.shared.InvalidRequestException;
import com.dnastack.wes.shared.NotFoundException;
import com.dnastack.wes.storage.BlobStorageClient;
@@ -14,13 +14,13 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import feign.FeignException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.http.HttpRange;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;

@@ -32,10 +32,7 @@
import java.nio.file.Paths;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.CompletableFuture;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -56,6 +55,7 @@ public class CromwellService {
private final PathTranslatorFactory pathTranslatorFactory;
private final CromwellWesMapper cromwellWesMapper;
private final CromwellConfig cromwellConfig;
private final ThreadPoolTaskExecutor executor;

private final AppConfig appConfig;

@@ -66,14 +66,16 @@ public class CromwellService {
PathTranslatorFactory pathTranslatorFactory,
CromwellWesMapper cromwellWesMapper,
AppConfig appConfig,
CromwellConfig config
CromwellConfig config,
@Qualifier("defaultAsyncOperationExecutor") ThreadPoolTaskExecutor executor
) {
this.client = cromwellClient;
this.pathTranslatorFactory = pathTranslatorFactory;
this.storageClient = storageClient;
this.cromwellWesMapper = cromwellWesMapper;
this.appConfig = appConfig;
this.cromwellConfig = config;
this.executor = executor;
}


@@ -235,31 +237,26 @@ public RunFiles getRunFiles(String runId) throws NotFoundException {
Set<String> finalFileSet = new HashSet<>();
Set<String> secondaryFileSet = new HashSet<>();
Set<String> logFileSet = new HashSet<>();
RunFiles runFiles = new RunFiles();
List<RunFile> files = new ArrayList<>();

Map<String, Object> outputs = metadataResponse.getOutputs();
if (!outputs.isEmpty()) {
outputs.values().forEach(output -> extractFilesFromValue(finalFileSet, output));
}
extractSecondaryAndLogFilesFromCalls(secondaryFileSet, logFileSet, metadataResponse);
extractSecondaryLogFiles(secondaryFileSet, logFileSet, metadataResponse);

finalFileSet.forEach(path -> runFiles.addRunFile(new RunFile(RunFile.type.FINAL, path)));
finalFileSet.forEach(path -> files.add(new RunFile(RunFile.FileType.FINAL, path)));
secondaryFileSet.forEach(path -> {
if (!finalFileSet.contains(path)) {
runFiles.addRunFile(new RunFile(RunFile.type.SECONDARY, path));
if (!finalFileSet.contains(path) && !logFileSet.contains(path)) {
files.add(new RunFile(RunFile.FileType.SECONDARY, path));
}
});
logFileSet.forEach(path -> {
if (!finalFileSet.contains(path)) {
runFiles.addRunFile(new RunFile(RunFile.type.LOG, path));
files.add(new RunFile(RunFile.FileType.LOG, path));
}
});

if (runFiles.getFiles() != null) {
return runFiles;
} else {
throw new NotFoundException("No files were found for the runId: " + runId);
}
return new RunFiles(files);
}

/**
@@ -269,49 +266,31 @@ public RunFiles getRunFiles(String runId) throws NotFoundException {
*
* @return the cromwell id
*/
public RunId deleteRunFiles(String runId, boolean async) {
List<RunFile> files = getRunFiles(runId).getFiles();
files = files.stream().filter(runFile -> runFile.getType() == RunFile.type.SECONDARY).toList();
if (!files.isEmpty()) {
if (async) {
// Asynchronous deletion of files
ExecutorService executor = Executors.newFixedThreadPool(files.size());
List<Future<?>> deletionTasks = new ArrayList<>();
files.forEach(file -> {
Future<?> submit = executor.submit(() -> {
try {
storageClient.deleteFile(file.getPath());
} catch (IOException e) {
throw new FileDeletionException("Failed to delete file with path: " + file.getPath(), e);
}
});
deletionTasks.add(submit);
});
// Block until all deletion tasks have completed
deletionTasks.forEach(future -> {
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
Thread.currentThread().interrupt();
Throwable cause = e.getCause();
throw new FileDeletionException(cause.getMessage(), cause.getCause());
}
});
executor.shutdown();
} else {
// Synchronous deletion of files
files.forEach(file -> {
try {
storageClient.deleteFile(file.getPath());
} catch (IOException e) {
throw new FileDeletionException("Failed to delete file with path: " + file.getPath(), e);
}
});
}
} else {
throw new NotFoundException("No deletable files were found for the runId: " + runId);
public RunFileDeletions deleteRunFiles(String runId, boolean async) {
List<RunFile> files = getRunFiles(runId).runFiles();
List<RunFileDeletion> outcomes = files.stream().filter(runFile -> RunFile.FileType.SECONDARY.equals(runFile.getFileType()))
.map(runFile -> {
if (async) {
return deleteRunFileAsync(runFile);
} else {
return deleteRunFile(runFile);
}
}).toList();
return new RunFileDeletions(outcomes);
}

public RunFileDeletion deleteRunFileAsync(RunFile runFile) {
CompletableFuture.runAsync(() -> deleteRunFile(runFile), executor);
return new RunFileDeletion(runFile, RunFileDeletion.DeletionState.ASYNC, null);
}

public RunFileDeletion deleteRunFile(RunFile runFile) {
try {
storageClient.deleteFile(runFile.getPath());
return new RunFileDeletion(runFile, RunFileDeletion.DeletionState.DELETED, null);
} catch (IOException e) {
return new RunFileDeletion(runFile, RunFileDeletion.DeletionState.FAILED, ErrorResponse.builder().errorCode(400).msg(e.getMessage()).build());
}
return RunId.builder().runId(runId).build();
}

public void getLogBytes(OutputStream outputStream, String runId, String taskId, String logKey, HttpRange range) throws IOException {
@@ -544,58 +523,61 @@ private JsonNode extractJsonNode(String value) throws IOException {
}
}

private void extractSecondaryAndLogFilesFromCalls(Set<String> secondaryFileSet, Set<String> logFileSet, CromwellMetadataResponse metadataResponse) {
private void extractSecondaryLogFiles(Set<String> secondaryFileSet, Set<String> logFileSet, CromwellMetadataResponse metadataResponse) {
Map<String, Object> outputs = metadataResponse.getOutputs();
if (outputs != null && !outputs.isEmpty()) {
outputs.values().forEach(output -> extractFilesFromValue(secondaryFileSet, output));
}
Map<String, List<CromwellTaskCall>> calls = metadataResponse.getCalls();
if (calls != null && !calls.isEmpty()) {
calls.values().forEach(cromwellTaskCallList -> cromwellTaskCallList.forEach(call -> {
Map<String, Object> outputs = call.getOutputs();
if (outputs != null && !outputs.isEmpty()) {
outputs.values().forEach(output -> extractFilesFromValue(secondaryFileSet, output));
String stderr = call.getStderr();
String stdout = call.getStdout();
if (storageClient.isFile(stderr)) {
logFileSet.add(stderr);
}
if (storageClient.isFile(stdout)) {
logFileSet.add(stdout);
}
Map<String, String> backendLogs = call.getBackendLogs();
if (backendLogs != null && !backendLogs.isEmpty()) {
backendLogs.values().forEach(log -> extractFilesFromValue(logFileSet, log));
}
CromwellMetadataResponse subWorkflowMetadata = call.getSubWorkflowMetadata();
if (subWorkflowMetadata != null) {
Map<String, Object> subOutputs = subWorkflowMetadata.getOutputs();
if (subOutputs != null && !subOutputs.isEmpty()) {
subOutputs.values().forEach(output -> extractFilesFromValue(secondaryFileSet, output));
}
extractSecondaryAndLogFilesFromCalls(secondaryFileSet, logFileSet, subWorkflowMetadata);
}
}
}));
calls.values().stream().flatMap(List::stream).forEach(call -> extractSecondaryLogFilesFromCall(secondaryFileSet, logFileSet, call));
}
}

private void extractFilesFromValue(Set<String> fileSet, Object output) {
if (output instanceof String) {
if (storageClient.isFile(output.toString())) {
fileSet.add(output.toString());
}
private void extractSecondaryLogFilesFromCall(Set<String> secondaryFileSet, Set<String> logFileSet, CromwellTaskCall call) {
Map<String, Object> outputs = call.getOutputs();
if (outputs != null && !outputs.isEmpty()) {
outputs.values().forEach(output -> extractFilesFromValue(secondaryFileSet, output));
}
else if (output instanceof Object[]) {
extractFiles(fileSet, (List<Object>) output);
String stderr = call.getStderr();
String stdout = call.getStdout();
if (stderr != null && storageClient.isFile(stderr)) {
logFileSet.add(stderr);
}
else {
extractFiles(fileSet, (Map<String, Object>) output);
if (stdout != null && storageClient.isFile(stdout)) {
logFileSet.add(stdout);
}
Map<String, String> backendLogs = call.getBackendLogs();
if (backendLogs != null && !backendLogs.isEmpty()) {
backendLogs.values().forEach(log -> extractFilesFromValue(logFileSet, log));
}
CromwellMetadataResponse subWorkflowMetadata = call.getSubWorkflowMetadata();
if (subWorkflowMetadata != null) {
extractSecondaryLogFiles(secondaryFileSet, logFileSet, subWorkflowMetadata);
}
}

private void extractFilesFromValue(Set<String> fileSet, Object output) {
JsonNode node = mapper.valueToTree(output);
if (node.isTextual()) {
if (storageClient.isFile(node.asText())) {
fileSet.add(node.asText());
}
} else if (node.isArray()) {
ArrayNode arrayOutput = mapper.valueToTree(output);
extractFilesFromArrayNode(fileSet, arrayOutput);
} else if (node.isObject()) {
ObjectNode objectOutput = mapper.valueToTree(output);
extractFilesFromObjectNode(fileSet, objectOutput);
}
}

private void extractFiles(Set<String> fileSet, List<Object> outputs) {
private void extractFilesFromArrayNode(Set<String> fileSet, ArrayNode outputs) {
outputs.forEach(output -> extractFilesFromValue(fileSet, output));
}

private void extractFiles(Set<String> fileSet, Map<String, Object> outputs) {
outputs.values().forEach(output -> extractFilesFromValue(fileSet, output));
private void extractFilesFromObjectNode(Set<String> fileSet, ObjectNode outputs) {
outputs.forEach(output -> extractFilesFromValue(fileSet, output));
}

private void setWorkflowSourceAndDependencies(Path tempDirectory, RunRequest runRequest, CromwellExecutionRequest cromwellRequest) throws IOException {
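
The reworked extractFilesFromValue above trades instanceof checks for a Jackson tree walk, so Maps, Lists, and arrays are all normalised into ObjectNode/ArrayNode before traversal. A self-contained sketch of the same idea, with a gs:// prefix check standing in for BlobStorageClient.isFile and an iterative walk in place of the recursive helpers:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class OutputFileExtractionSketch {

    public static void main(String[] args) {
        ObjectMapper mapper = new ObjectMapper();

        // A nested outputs structure of the kind Cromwell metadata can contain.
        Map<String, Object> outputs = Map.of(
            "vcf", "gs://bucket/out.vcf",
            "shards", List.of("gs://bucket/shard-0.txt", "gs://bucket/shard-1.txt"),
            "metrics", Map.of("log", "gs://bucket/metrics.log", "count", 42));

        // valueToTree() normalises the whole structure, then the walk collects
        // every textual node that looks like a file path.
        Set<String> files = new HashSet<>();
        Deque<JsonNode> stack = new ArrayDeque<>();
        stack.push(mapper.valueToTree(outputs));
        while (!stack.isEmpty()) {
            JsonNode node = stack.pop();
            if (node.isTextual() && node.asText().startsWith("gs://")) { // stand-in for storageClient.isFile
                files.add(node.asText());
            } else if (node.isContainerNode()) {
                node.forEach(stack::push);
            }
        }

        System.out.println(files); // the four gs:// paths; the integer 42 is skipped
    }

}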