Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#186091347] Create endpoints to clean up a run #13

Merged
merged 8 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions src/main/java/com/dnastack/wes/api/RunFile.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.dnastack.wes.api;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.*;

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@EqualsAndHashCode
@ToString
@Builder
public class RunFile {

@JsonProperty(value = "type")
Enum<type> type;
Sean-DNAstack marked this conversation as resolved.
Show resolved Hide resolved

@JsonProperty(value = "path")
String path;

public enum type {
Sean-DNAstack marked this conversation as resolved.
Show resolved Hide resolved
FINAL,
SECONDARY,
LOG
}

}
28 changes: 28 additions & 0 deletions src/main/java/com/dnastack/wes/api/RunFiles.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.dnastack.wes.api;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.*;

import java.util.ArrayList;
import java.util.List;

@Getter
@Setter
Sean-DNAstack marked this conversation as resolved.
Show resolved Hide resolved
@AllArgsConstructor
@EqualsAndHashCode
@NoArgsConstructor
@ToString
@Builder
public class RunFiles {

@JsonProperty("files")
List<RunFile> files;

public void addRunFile(RunFile runFile) {
if (files == null) {
files = new ArrayList<>();
}
files.add(runFile);
}
Sean-DNAstack marked this conversation as resolved.
Show resolved Hide resolved

}
29 changes: 23 additions & 6 deletions src/main/java/com/dnastack/wes/api/WesV1Controller.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,28 @@ public RunId cancelRun(@PathVariable("runId") String runId) {
return adapter.cancel(runId);
}

@AuditActionUri("wes:run:files")
Sean-DNAstack marked this conversation as resolved.
Show resolved Hide resolved
@PreAuthorize("@accessEvaluator.canAccessResource('/ga4gh/wes/v1/runs/' + #runId , 'wes:runs:read', 'wes')")
Sean-DNAstack marked this conversation as resolved.
Show resolved Hide resolved
@GetMapping(value = "/runs/{run_id}/files", produces = { MediaType.APPLICATION_JSON_VALUE })
public RunFiles getRunFiles(@PathVariable("run_id") String runId) {
return adapter.getRunFiles(runId);
}

@AuditActionUri("wes:run:files")
Sean-DNAstack marked this conversation as resolved.
Show resolved Hide resolved
@PreAuthorize("@accessEvaluator.canAccessResource('/ga4gh/wes/v1/runs/' + #runId , 'wes:runs:delete', 'wes')")
Sean-DNAstack marked this conversation as resolved.
Show resolved Hide resolved
@DeleteMapping(value = "/runs/{run_id}/files", produces = { MediaType.APPLICATION_JSON_VALUE })
public RunId deleteRunFiles(
@PathVariable("run_id") String runId,
@RequestParam(value = "async", required = false) boolean async
) {
return adapter.deleteRunFiles(runId, async);
}

@AuditActionUri("wes:run:stderr")
@PreAuthorize("@accessEvaluator.canAccessResource('/ga4gh/wes/v1/runs/' + #runId, 'wes:runs:read', 'wes')")
@GetMapping(value = "/runs/{runId}/logs/stderr", produces = MediaType.APPLICATION_OCTET_STREAM_VALUE)
public void getStderr(HttpServletResponse response, @RequestHeader HttpHeaders headers, @PathVariable String runId) throws IOException {
adapter.getLogBytes(response.getOutputStream(), runId,getRangeFromHeaders(response,headers));
adapter.getLogBytes(response.getOutputStream(), runId, getRangeFromHeaders(response, headers));
}

@AuditActionUri("wes:run:stderr")
Expand All @@ -139,7 +156,7 @@ public void getTaskStderr(
@PathVariable String runId,
@PathVariable String taskId
) throws IOException {
adapter.getLogBytes(response.getOutputStream(), runId, taskId, "stderr",getRangeFromHeaders(response,headers));
adapter.getLogBytes(response.getOutputStream(), runId, taskId, "stderr", getRangeFromHeaders(response, headers));
}

@AuditActionUri("wes:run:stdout")
Expand All @@ -151,7 +168,7 @@ public void getTaskStdout(
@PathVariable String runId,
@PathVariable String taskId
) throws IOException {
adapter.getLogBytes(response.getOutputStream(), runId, taskId, "stdout",getRangeFromHeaders(response,headers));
adapter.getLogBytes(response.getOutputStream(), runId, taskId, "stdout", getRangeFromHeaders(response, headers));
}

@AuditActionUri("wes:run:stderr")
Expand All @@ -164,7 +181,7 @@ public void getTaskStderr(
@PathVariable String taskName,
@PathVariable int index
) throws IOException {
adapter.getLogBytes(response.getOutputStream(), runId, taskName, index, "stderr",getRangeFromHeaders(response,headers));
adapter.getLogBytes(response.getOutputStream(), runId, taskName, index, "stderr", getRangeFromHeaders(response, headers));
}

@AuditActionUri("wes:run:stdout")
Expand All @@ -180,9 +197,9 @@ public void getTaskStdout(
adapter.getLogBytes(response.getOutputStream(), runId, taskName, index, "stdout", getRangeFromHeaders(response, headers));
}

private HttpRange getRangeFromHeaders(HttpServletResponse response, HttpHeaders headers){
private HttpRange getRangeFromHeaders(HttpServletResponse response, HttpHeaders headers) {
List<HttpRange> ranges = headers.getRange();
if (ranges.isEmpty()){
if (ranges.isEmpty()) {
return null;
} else if (ranges.size() > 1) {
// only return the first range parsed
Expand Down
152 changes: 151 additions & 1 deletion src/main/java/com/dnastack/wes/cromwell/CromwellService.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.dnastack.wes.AppConfig;
import com.dnastack.wes.api.*;
import com.dnastack.wes.security.AuthenticatedUser;
import com.dnastack.wes.shared.FileDeletionException;
import com.dnastack.wes.shared.InvalidRequestException;
import com.dnastack.wes.shared.NotFoundException;
import com.dnastack.wes.storage.BlobStorageClient;
Expand Down Expand Up @@ -31,6 +32,10 @@
import java.nio.file.Paths;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -211,13 +216,104 @@ private CromwellStatus getStatus(String runId) {
*
* @param runId The cromwell id
*
* @return a complete run log
* @return the cromwell id
*/
public RunId cancel(String runId) {
client.abortWorkflow(runId);
return RunId.builder().runId(runId).build();
}

/**
* Get the files for a specific run.
*
* @param runId The cromwell id
*
* @return a list of generated files for the run
*/
public RunFiles getRunFiles(String runId) throws NotFoundException {
CromwellMetadataResponse metadataResponse = getMetadata(runId);
Set<String> finalFileSet = new HashSet<>();
Set<String> secondaryFileSet = new HashSet<>();
Set<String> logFileSet = new HashSet<>();
RunFiles runFiles = new RunFiles();
Sean-DNAstack marked this conversation as resolved.
Show resolved Hide resolved

Map<String, Object> outputs = metadataResponse.getOutputs();
if (!outputs.isEmpty()) {
outputs.values().forEach(output -> extractFilesFromValue(finalFileSet, output));
}
Sean-DNAstack marked this conversation as resolved.
Show resolved Hide resolved
extractSecondaryAndLogFilesFromCalls(secondaryFileSet, logFileSet, metadataResponse);

finalFileSet.forEach(path -> runFiles.addRunFile(new RunFile(RunFile.type.FINAL, path)));
secondaryFileSet.forEach(path -> {
if (!finalFileSet.contains(path)) {
Sean-DNAstack marked this conversation as resolved.
Show resolved Hide resolved
runFiles.addRunFile(new RunFile(RunFile.type.SECONDARY, path));
}
});
logFileSet.forEach(path -> {
if (!finalFileSet.contains(path)) {
runFiles.addRunFile(new RunFile(RunFile.type.LOG, path));
}
});

if (runFiles.getFiles() != null) {
return runFiles;
} else {
throw new NotFoundException("No files were found for the runId: " + runId);
Sean-DNAstack marked this conversation as resolved.
Show resolved Hide resolved
}
}

/**
* Request to delete the files associated with the run.
*
* @param runId The cromwell id
*
* @return the cromwell id
*/
public RunId deleteRunFiles(String runId, boolean async) {
Sean-DNAstack marked this conversation as resolved.
Show resolved Hide resolved
List<RunFile> files = getRunFiles(runId).getFiles();
files = files.stream().filter(runFile -> runFile.getType() == RunFile.type.SECONDARY).toList();
if (!files.isEmpty()) {
if (async) {
// Asynchronous deletion of files
ExecutorService executor = Executors.newFixedThreadPool(files.size());
Sean-DNAstack marked this conversation as resolved.
Show resolved Hide resolved
List<Future<?>> deletionTasks = new ArrayList<>();
Sean-DNAstack marked this conversation as resolved.
Show resolved Hide resolved
files.forEach(file -> {
Future<?> submit = executor.submit(() -> {
try {
storageClient.deleteFile(file.getPath());
} catch (IOException e) {
throw new FileDeletionException("Failed to delete file with path: " + file.getPath(), e);
}
});
deletionTasks.add(submit);
});
// Block until all deletion tasks have completed
deletionTasks.forEach(future -> {
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
Thread.currentThread().interrupt();
Throwable cause = e.getCause();
throw new FileDeletionException(cause.getMessage(), cause.getCause());
}
});
executor.shutdown();
} else {
// Synchronous deletion of files
files.forEach(file -> {
try {
storageClient.deleteFile(file.getPath());
} catch (IOException e) {
throw new FileDeletionException("Failed to delete file with path: " + file.getPath(), e);
}
});
}
Sean-DNAstack marked this conversation as resolved.
Show resolved Hide resolved
} else {
throw new NotFoundException("No deletable files were found for the runId: " + runId);
}
return RunId.builder().runId(runId).build();
}

public void getLogBytes(OutputStream outputStream, String runId, String taskId, String logKey, HttpRange range) throws IOException {
String logPath = getLogPath(runId, taskId, logKey);

Expand Down Expand Up @@ -448,6 +544,60 @@ private JsonNode extractJsonNode(String value) throws IOException {
}
}

private void extractSecondaryAndLogFilesFromCalls(Set<String> secondaryFileSet, Set<String> logFileSet, CromwellMetadataResponse metadataResponse) {
Map<String, List<CromwellTaskCall>> calls = metadataResponse.getCalls();
if (calls != null && !calls.isEmpty()) {
calls.values().forEach(cromwellTaskCallList -> cromwellTaskCallList.forEach(call -> {
Map<String, Object> outputs = call.getOutputs();
if (outputs != null && !outputs.isEmpty()) {
outputs.values().forEach(output -> extractFilesFromValue(secondaryFileSet, output));
String stderr = call.getStderr();
String stdout = call.getStdout();
if (storageClient.isFile(stderr)) {
logFileSet.add(stderr);
}
if (storageClient.isFile(stdout)) {
logFileSet.add(stdout);
}
Map<String, String> backendLogs = call.getBackendLogs();
if (backendLogs != null && !backendLogs.isEmpty()) {
backendLogs.values().forEach(log -> extractFilesFromValue(logFileSet, log));
}
CromwellMetadataResponse subWorkflowMetadata = call.getSubWorkflowMetadata();
if (subWorkflowMetadata != null) {
Map<String, Object> subOutputs = subWorkflowMetadata.getOutputs();
if (subOutputs != null && !subOutputs.isEmpty()) {
subOutputs.values().forEach(output -> extractFilesFromValue(secondaryFileSet, output));
}
extractSecondaryAndLogFilesFromCalls(secondaryFileSet, logFileSet, subWorkflowMetadata);
}
}
}));
}
}
Sean-DNAstack marked this conversation as resolved.
Show resolved Hide resolved

private void extractFilesFromValue(Set<String> fileSet, Object output) {
Sean-DNAstack marked this conversation as resolved.
Show resolved Hide resolved
if (output instanceof String) {
Sean-DNAstack marked this conversation as resolved.
Show resolved Hide resolved
if (storageClient.isFile(output.toString())) {
fileSet.add(output.toString());
}
}
else if (output instanceof Object[]) {
Sean-DNAstack marked this conversation as resolved.
Show resolved Hide resolved
extractFiles(fileSet, (List<Object>) output);
}
else {
extractFiles(fileSet, (Map<String, Object>) output);
}
}

private void extractFiles(Set<String> fileSet, List<Object> outputs) {
outputs.forEach(output -> extractFilesFromValue(fileSet, output));
}

private void extractFiles(Set<String> fileSet, Map<String, Object> outputs) {
outputs.values().forEach(output -> extractFilesFromValue(fileSet, output));
}

private void setWorkflowSourceAndDependencies(Path tempDirectory, RunRequest runRequest, CromwellExecutionRequest cromwellRequest) throws IOException {
if (runRequest.getWorkflowAttachments() == null || runRequest.getWorkflowAttachments().length == 0) {
throw new InvalidRequestException("Url provided is relative however no workflowAttachments are defined");
Expand Down
17 changes: 17 additions & 0 deletions src/main/java/com/dnastack/wes/shared/FileDeletionException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.dnastack.wes.shared;

public class FileDeletionException extends RuntimeException {

public FileDeletionException() {
super();
}

public FileDeletionException(String message) {
super(message);
}

public FileDeletionException(String message, Throwable cause) {
super(message, cause);
}

}
14 changes: 11 additions & 3 deletions src/main/java/com/dnastack/wes/storage/AzureBlobStorageClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

public class AzureBlobStorageClient implements BlobStorageClient {


private final BlobServiceClient client;
private final long signedUrlTtl;
private final String container;
Expand Down Expand Up @@ -49,7 +48,6 @@ public AzureBlobStorageClient(AzureBlobStorageClientConfig config) {
stagingPath = config.getStagingPath();
}


@Override
public URL getSignedUrl(String blobUri) {
BlobUrlParts parts = BlobUrlParts.parse(blobUri);
Expand Down Expand Up @@ -99,7 +97,6 @@ public void readBytes(OutputStream outputStream, String blobUri, HttpRange httpR
containerName = container;
}


BlobContainerClient containerClient = client.getBlobContainerClient(containerName);
BlobClient blobClient = containerClient.getBlobClient(blobName);

Expand All @@ -122,4 +119,15 @@ public void readBytes(OutputStream outputStream, String blobUri, HttpRange httpR
.setMaxRetryRequests(3), null, false, null, null);
}

@Override
public boolean isFile(String filePath) {
BlobClient blobClient = client.getBlobContainerClient(container).getBlobClient(filePath);
return filePath.startsWith("https://%s.blob.core.windows.net/".formatted(client.getAccountName())) && blobClient.exists();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can simplify this by doing

try {
     BlobUrlParts parts = BlobUrlParts.parse(blobUri);
     if (container.equals(parts.getBlobContainerName) && parts.getBlobName() != null){
      BlobClient blobClient = client.getBlobContainerClient(container).getBlobClient(filePath);
      return blobClient.exists
    }
} catch (IllegalArgumentException) {
  //
}
return false

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have access to the blobUri to be able to get the parts. I just kept it as checking the blobClient and if it exists but if you have other suggestions I can change it

}

@Override
public void deleteFile(String filePath) {
client.getBlobContainerClient(container).getBlobClient(filePath).delete();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ default void getBytes(OutputStream outputStream, String blobUri) throws IOExcept
readBytes(outputStream, blobUri, null);
}


void readBytes(OutputStream outputStream, String blobUri, @Nullable HttpRange httpRange) throws IOException;

boolean isFile(String filePath);

void deleteFile(String filePath) throws IOException;

}
Loading