Skip to content

Commit

Permalink
Address PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
Sean-DNAstack committed Oct 20, 2023
1 parent 18b45a1 commit dc8436a
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 18 deletions.
3 changes: 2 additions & 1 deletion src/main/java/com/dnastack/wes/config/AsyncConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;
Expand All @@ -13,7 +14,7 @@
public class AsyncConfig {

@Bean
public ThreadPoolTaskExecutor defaultAsyncOperationExecutor(
public TaskExecutor defaultAsyncOperationExecutor(
@Value("${app.executors.default.core-pool-size:8}") int corePoolSize,
@Value("${app.executors.default.max-pool-size:16}") int maxPoolSize,
@Value("${app.executors.default.queue-capacity:5000}") int queueCapacity
Expand Down
32 changes: 15 additions & 17 deletions src/main/java/com/dnastack/wes/cromwell/CromwellService.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@
import feign.FeignException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.task.TaskExecutor;
import org.springframework.http.HttpRange;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;

Expand Down Expand Up @@ -55,7 +54,7 @@ public class CromwellService {
private final PathTranslatorFactory pathTranslatorFactory;
private final CromwellWesMapper cromwellWesMapper;
private final CromwellConfig cromwellConfig;
private final ThreadPoolTaskExecutor executor;
private final TaskExecutor defaultAsyncOperationExecutor;

private final AppConfig appConfig;

Expand All @@ -67,15 +66,15 @@ public class CromwellService {
CromwellWesMapper cromwellWesMapper,
AppConfig appConfig,
CromwellConfig config,
@Qualifier("defaultAsyncOperationExecutor") ThreadPoolTaskExecutor executor
TaskExecutor defaultAsyncOperationExecutor
) {
this.client = cromwellClient;
this.pathTranslatorFactory = pathTranslatorFactory;
this.storageClient = storageClient;
this.cromwellWesMapper = cromwellWesMapper;
this.appConfig = appConfig;
this.cromwellConfig = config;
this.executor = executor;
this.defaultAsyncOperationExecutor = defaultAsyncOperationExecutor;
}


Expand Down Expand Up @@ -240,8 +239,8 @@ public RunFiles getRunFiles(String runId) throws NotFoundException {
List<RunFile> files = new ArrayList<>();

Map<String, Object> outputs = metadataResponse.getOutputs();
if (!outputs.isEmpty()) {
outputs.values().forEach(output -> extractFilesFromValue(finalFileSet, output));
if (outputs != null && !outputs.isEmpty()) {
outputs.values().forEach(output -> extractFilesFromValue(finalFileSet, mapper.valueToTree(output)));
}
extractSecondaryLogFiles(secondaryFileSet, logFileSet, metadataResponse);

Expand Down Expand Up @@ -280,15 +279,17 @@ public RunFileDeletions deleteRunFiles(String runId, boolean async) {
}

public RunFileDeletion deleteRunFileAsync(RunFile runFile) {
CompletableFuture.runAsync(() -> deleteRunFile(runFile), executor);
CompletableFuture.runAsync(() -> deleteRunFile(runFile), defaultAsyncOperationExecutor);
return new RunFileDeletion(runFile, RunFileDeletion.DeletionState.ASYNC,null);
}

public RunFileDeletion deleteRunFile(RunFile runFile) {
try {
storageClient.deleteFile(runFile.getPath());
log.info("Deleting file '{}'", runFile.getPath());
return new RunFileDeletion(runFile, RunFileDeletion.DeletionState.DELETED, null);
} catch (IOException e) {
log.error("Encountered exception while deleting file '%s': '%s'".formatted(runFile.getPath(), e.getMessage()), e);
return new RunFileDeletion(runFile, RunFileDeletion.DeletionState.FAILED, ErrorResponse.builder().errorCode(400).msg(e.getMessage()).build());
}
}
Expand Down Expand Up @@ -526,7 +527,7 @@ private JsonNode extractJsonNode(String value) throws IOException {
private void extractSecondaryLogFiles(Set<String> secondaryFileSet, Set<String> logFileSet, CromwellMetadataResponse metadataResponse){
Map<String, Object> outputs = metadataResponse.getOutputs();
if (outputs != null && !outputs.isEmpty()) {
outputs.values().forEach(output -> extractFilesFromValue(secondaryFileSet, output));
outputs.values().forEach(output -> extractFilesFromValue(secondaryFileSet, mapper.valueToTree(output)));
}
Map<String, List<CromwellTaskCall>> calls = metadataResponse.getCalls();
if (calls != null && !calls.isEmpty()) {
Expand All @@ -537,7 +538,7 @@ private void extractSecondaryLogFiles(Set<String> secondaryFileSet, Set<String>
private void extractSecondaryLogFilesFromCall(Set<String> secondaryFileSet, Set<String> logFileSet, CromwellTaskCall call){
Map<String, Object> outputs = call.getOutputs();
if (outputs != null && !outputs.isEmpty()) {
outputs.values().forEach(output -> extractFilesFromValue(secondaryFileSet, output));
outputs.values().forEach(output -> extractFilesFromValue(secondaryFileSet, mapper.valueToTree(output)));
}
String stderr = call.getStderr();
String stdout = call.getStdout();
Expand All @@ -549,26 +550,23 @@ private void extractSecondaryLogFilesFromCall(Set<String> secondaryFileSet, Set<
}
Map<String, String> backendLogs = call.getBackendLogs();
if (backendLogs != null && !backendLogs.isEmpty()) {
backendLogs.values().forEach(log -> extractFilesFromValue(logFileSet, log));
backendLogs.values().forEach(log -> extractFilesFromValue(logFileSet, mapper.valueToTree(log)));
}
CromwellMetadataResponse subWorkflowMetadata = call.getSubWorkflowMetadata();
if (subWorkflowMetadata != null) {
extractSecondaryLogFiles(secondaryFileSet,logFileSet,subWorkflowMetadata);
}
}

private void extractFilesFromValue(Set<String> fileSet, Object output) {
JsonNode node = mapper.valueToTree(output);
private void extractFilesFromValue(Set<String> fileSet, JsonNode node) {
if (node.isTextual()) {
if (storageClient.isFile(node.asText())) {
fileSet.add(node.asText());
}
} else if (node.isArray()) {
ArrayNode arrayOutput = mapper.valueToTree(output);
extractFilesFromArrayNode(fileSet, arrayOutput);
extractFilesFromArrayNode(fileSet, (ArrayNode) node);
} else if (node.isObject()) {
ObjectNode objectOutput = mapper.valueToTree(output);
extractFilesFromObjectNode(fileSet, objectOutput);
extractFilesFromObjectNode(fileSet, (ObjectNode) node);
}
}

Expand Down

0 comments on commit dc8436a

Please sign in to comment.