Skip to content

Commit

Permalink
[#186091347] Create endpoints to clean up a run
Browse files Browse the repository at this point in the history
[#186091347] Create endpoints to clean up a run
  • Loading branch information
Sean-DNAstack authored Oct 27, 2023
2 parents ef5f09e + 15038c1 commit 7a68da7
Show file tree
Hide file tree
Showing 16 changed files with 681 additions and 39 deletions.
159 changes: 157 additions & 2 deletions e2e-tests/src/main/java/com/dnastack/wes/service/WesE2ETest.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,23 @@
import com.google.auth.oauth2.GoogleCredentials;
import io.restassured.builder.MultiPartSpecBuilder;
import io.restassured.http.ContentType;
import io.restassured.http.Header;
import io.restassured.specification.MultiPartSpecification;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.TestInstance.Lifecycle;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.*;
import java.util.stream.Stream;

import static io.restassured.RestAssured.given;
import static java.lang.String.format;
import static org.awaitility.Awaitility.with;
import static org.hamcrest.Matchers.*;

Expand Down Expand Up @@ -569,6 +572,158 @@ public void listRunsReturnsReturnsNonEmptyCollection() {
//@formatter:on
}


@ParameterizedTest
@MethodSource("completeWorkflowWithFilesProvider")
@DisplayName("Get Run Files for existing run returns all files")
public void getRunFilesReturnsNonEmptyCollection(String runId) {
String path = getRootPath() + "/runs/" + runId + "/files";

//@formatter:off
given()
.log().uri()
.log().method()
.header(getHeader(getResource(getRootPath() + "/runs/" + runId)))
.accept(ContentType.JSON)
.get(path)
.then()
.assertThat()
.statusCode(200)
.body("runFiles.size()", greaterThan(0))
.body("runFiles.every { it.path != null && it.file_type in ['FINAL', 'SECONDARY', 'LOG'] }", equalTo(true));
//@formatter:on
}


@Test
@DisplayName("Get Run Files for non-existent run fails with status 401 or 404")
public void getRunFilesForNonExistentRunShouldFail() {
String resourcePath = getRootPath() + "/runs/" + UUID.randomUUID();
String path = resourcePath + "/files";

//@formatter:off
given()
.log().uri()
.log().method()
.header(getHeader(getResource(resourcePath)))
.accept(ContentType.JSON)
.get(path)
.then()
.assertThat()
.statusCode(anyOf(equalTo(404), equalTo(401)));
//@formatter:on
}


@ParameterizedTest
@MethodSource("completeWorkflowWithFilesProvider")
@DisplayName("Delete Run Files for existing run returns all deleted files")
public void deleteRunFilesReturnsNonEmptyCollection(String runId) {
String path = getRootPath() + "/runs/" + runId + "/files";

//@formatter:off
given()
.log().uri()
.log().method()
.header(getHeader(getResource(getRootPath() + "/runs/" + runId)))
.accept(ContentType.JSON)
.delete(path)
.then()
.assertThat()
.statusCode(200)
.body("deletions.size()", greaterThan(0))
.body("deletions.every { it.path != null && it.file_type == 'SECONDARY' && it.state == 'DELETED' }", equalTo(true));
//@formatter:on
}


@ParameterizedTest
@MethodSource("completeWorkflowWithFilesProvider")
@DisplayName("Delete Run Files for existing run asynchronously returns all deleted files")
public void deleteRunFilesAsyncReturnsNonEmptyCollection(String runId) {
String path = getRootPath() + "/runs/" + runId + "/files";

//@formatter:off
given()
.log().uri()
.log().method()
.header(getHeader(getResource(getRootPath() + "/runs/" + runId)))
.accept(ContentType.JSON)
.queryParam("async", true)
.delete(path)
.then()
.assertThat()
.statusCode(200)
.body("deletions.size()", greaterThan(0))
.body("deletions.every { it.path != null && it.file_type == 'SECONDARY' && it.state == 'ASYNC' }", equalTo(true));
//@formatter:on

Awaitility.await()
.atMost(Duration.ofSeconds(30))
.pollInterval(Duration.ofSeconds(5))
.untilAsserted(() ->
//@formatter:off
given()
.log().uri()
.log().method()
.header(getHeader(getResource(getRootPath() + "/runs/" + runId)))
.accept(ContentType.JSON)
.get(path)
.then()
.assertThat()
.statusCode(200)
.body("runFiles.size()", greaterThan(0))
.body("runFiles.every { it.path != null && it.file_type in ['FINAL', 'LOG'] }", equalTo(true)));
//@formatter:on
}


@Test
@DisplayName("Delete Run Files for non-existent run fails with status 401 or 404")
public void deleteRunFilesForNonExistentRunShouldFail() {
String resourcePath = getRootPath() + "/runs/" + UUID.randomUUID();
String path = resourcePath + "/files";

//@formatter:off
given()
.log().uri()
.log().method()
.header(getHeader(getResource(resourcePath)))
.accept(ContentType.JSON)
.delete(path)
.then()
.assertThat()
.statusCode(anyOf(equalTo(404),equalTo(401)));
//@formatter:on
}


private Stream<Arguments> completeWorkflowWithFilesProvider() throws Exception {
String path = getRootPath() + "/runs";
Map<String, String> inputs = Collections.singletonMap("hello_world.name", "Some sort of String");

//@formatter:off
String workflowJobIdWithAllOutputTypes = given()
.log().uri()
.log().method()
.header(getHeader(getResource(path)))
.multiPart(getWorkflowUrlMultipart("echo.wdl"))
.multiPart(getMultipartAttachment("echo.wdl", supplier.getFileContent(WdlSupplier.WORKFLOW_WITH_ALL_OUTPUT_TYPES).getBytes()))
.multiPart(getJsonMultipart("workflow_params", inputs))
.post(path)
.then()
.assertThat()
.statusCode(200)
.body("run_id", is(notNullValue()))
.extract()
.jsonPath()
.getString("run_id");
//@formatter:on

pollUntilJobCompletes(workflowJobIdWithAllOutputTypes);
return Stream.of(Arguments.of(workflowJobIdWithAllOutputTypes));
}

}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
public class WdlSupplier {

public static final String WORKFLOW_WITHOUT_FILE = "workflow_without_file.wdl";
public static final String WORKFLOW_WITH_ALL_OUTPUT_TYPES = "workflow_with_all_output_types.wdl";
public static final String WORKFLOW_WITH_IMPORTS_1 = "workflow_with_imports_1.wdl";
public static final String WORKFLOW_WITH_IMPORTS_2 = "workflow_with_imports_2.wdl";
public static final String WORKFLOW_WITH_IMPORTS_INPUTS = "workflow_with_imports.json";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
task echo {
String name
command {
echo "Hello ${name}"
>&2 echo "Goodbye ${name}"
echo "Bye" > "test.txt"
echo "Bye" > "test2.txt"
}

runtime {
docker: "ubuntu"
}

output {
File out = stdout()
File out2 = "test.txt"
Array[File] arrayOut = [out, out2, "test2.txt"]
}
}

workflow hello_world {
String name
call echo {
input:
name = name
}
output {
File out = echo.out
}
}
26 changes: 26 additions & 0 deletions src/main/java/com/dnastack/wes/api/RunFile.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.dnastack.wes.api;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.*;

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@EqualsAndHashCode
@ToString
@Builder
public class RunFile {

@JsonProperty(value = "file_type")
FileType fileType;

String path;

public enum FileType {
FINAL,
SECONDARY,
LOG
}

}
13 changes: 13 additions & 0 deletions src/main/java/com/dnastack/wes/api/RunFileDeletion.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.dnastack.wes.api;

import com.fasterxml.jackson.annotation.JsonUnwrapped;

public record RunFileDeletion(@JsonUnwrapped RunFile runFile, DeletionState state, @JsonUnwrapped ErrorResponse errorResponse) {

public enum DeletionState {
DELETED,
ASYNC,
FAILED
}

}
5 changes: 5 additions & 0 deletions src/main/java/com/dnastack/wes/api/RunFileDeletions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.dnastack.wes.api;

import java.util.List;

public record RunFileDeletions(List<RunFileDeletion> deletions) {}
5 changes: 5 additions & 0 deletions src/main/java/com/dnastack/wes/api/RunFiles.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.dnastack.wes.api;

import java.util.List;

public record RunFiles(List<RunFile> runFiles) {}
29 changes: 23 additions & 6 deletions src/main/java/com/dnastack/wes/api/WesV1Controller.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,28 @@ public RunId cancelRun(@PathVariable("runId") String runId) {
return adapter.cancel(runId);
}

@AuditActionUri("wes:run:files:list")
@PreAuthorize("@accessEvaluator.canAccessResource('/ga4gh/wes/v1/runs/' + #runId + '/files', 'wes:runs:read', 'wes')")
@GetMapping(value = "/runs/{run_id}/files", produces = { MediaType.APPLICATION_JSON_VALUE })
public RunFiles getRunFiles(@PathVariable("run_id") String runId) {
return adapter.getRunFiles(runId);
}

@AuditActionUri("wes:run:files:delete")
@PreAuthorize("@accessEvaluator.canAccessResource('/ga4gh/wes/v1/runs/' + #runId + '/files', 'wes:runs:write', 'wes')")
@DeleteMapping(value = "/runs/{run_id}/files", produces = { MediaType.APPLICATION_JSON_VALUE })
public RunFileDeletions deleteRunFiles(
@PathVariable("run_id") String runId,
@RequestParam(value = "async", required = false) boolean async
) {
return adapter.deleteRunFiles(runId, async);
}

@AuditActionUri("wes:run:stderr")
@PreAuthorize("@accessEvaluator.canAccessResource('/ga4gh/wes/v1/runs/' + #runId, 'wes:runs:read', 'wes')")
@GetMapping(value = "/runs/{runId}/logs/stderr", produces = MediaType.APPLICATION_OCTET_STREAM_VALUE)
public void getStderr(HttpServletResponse response, @RequestHeader HttpHeaders headers, @PathVariable String runId) throws IOException {
adapter.getLogBytes(response.getOutputStream(), runId,getRangeFromHeaders(response,headers));
adapter.getLogBytes(response.getOutputStream(), runId, getRangeFromHeaders(response, headers));
}

@AuditActionUri("wes:run:stderr")
Expand All @@ -139,7 +156,7 @@ public void getTaskStderr(
@PathVariable String runId,
@PathVariable String taskId
) throws IOException {
adapter.getLogBytes(response.getOutputStream(), runId, taskId, "stderr",getRangeFromHeaders(response,headers));
adapter.getLogBytes(response.getOutputStream(), runId, taskId, "stderr", getRangeFromHeaders(response, headers));
}

@AuditActionUri("wes:run:stdout")
Expand All @@ -151,7 +168,7 @@ public void getTaskStdout(
@PathVariable String runId,
@PathVariable String taskId
) throws IOException {
adapter.getLogBytes(response.getOutputStream(), runId, taskId, "stdout",getRangeFromHeaders(response,headers));
adapter.getLogBytes(response.getOutputStream(), runId, taskId, "stdout", getRangeFromHeaders(response, headers));
}

@AuditActionUri("wes:run:stderr")
Expand All @@ -164,7 +181,7 @@ public void getTaskStderr(
@PathVariable String taskName,
@PathVariable int index
) throws IOException {
adapter.getLogBytes(response.getOutputStream(), runId, taskName, index, "stderr",getRangeFromHeaders(response,headers));
adapter.getLogBytes(response.getOutputStream(), runId, taskName, index, "stderr", getRangeFromHeaders(response, headers));
}

@AuditActionUri("wes:run:stdout")
Expand All @@ -180,9 +197,9 @@ public void getTaskStdout(
adapter.getLogBytes(response.getOutputStream(), runId, taskName, index, "stdout", getRangeFromHeaders(response, headers));
}

private HttpRange getRangeFromHeaders(HttpServletResponse response, HttpHeaders headers){
private HttpRange getRangeFromHeaders(HttpServletResponse response, HttpHeaders headers) {
List<HttpRange> ranges = headers.getRange();
if (ranges.isEmpty()){
if (ranges.isEmpty()) {
return null;
} else if (ranges.size() > 1) {
// only return the first range parsed
Expand Down
32 changes: 32 additions & 0 deletions src/main/java/com/dnastack/wes/config/AsyncConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.dnastack.wes.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

@Slf4j
@Configuration
public class AsyncConfig {

@Bean
public TaskExecutor defaultAsyncOperationExecutor(
@Value("${app.executors.default.core-pool-size:8}") int corePoolSize,
@Value("${app.executors.default.max-pool-size:16}") int maxPoolSize,
@Value("${app.executors.default.queue-capacity:5000}") int queueCapacity
) {
final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setThreadNamePrefix("defaultAsyncOp-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}

}
Loading

0 comments on commit 7a68da7

Please sign in to comment.