Skip to content

Commit

Permalink
getJob checks both cWDS and Import Service
Browse files Browse the repository at this point in the history
  • Loading branch information
davidangb committed Mar 18, 2024
1 parent 9e5d939 commit ac43897
Show file tree
Hide file tree
Showing 10 changed files with 96 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import akka.http.scaladsl.model.{HttpResponse, StatusCodes}
import com.typesafe.scalalogging.LazyLogging
import org.broadinstitute.dsde.firecloud.EntityService._
import org.broadinstitute.dsde.firecloud.FireCloudConfig.Rawls
import org.broadinstitute.dsde.firecloud.dataaccess.ImportServiceFiletypes.{FILETYPE_PFB, FILETYPE_RAWLS}
import org.broadinstitute.dsde.firecloud.dataaccess.ImportServiceFiletypes.FILETYPE_RAWLS
import org.broadinstitute.dsde.firecloud.dataaccess.{CwdsDAO, GoogleServicesDAO, ImportServiceDAO, RawlsDAO}
import org.broadinstitute.dsde.firecloud.model.ModelJsonProtocol._
import org.broadinstitute.dsde.firecloud.model.{ModelSchema, _}
Expand All @@ -28,7 +28,6 @@ import scala.jdk.CollectionConverters._
import scala.concurrent.{ExecutionContext, Future}
import scala.io.Source
import scala.language.postfixOps
import scala.sys.process._
import scala.util.{Failure, Success, Try}

object EntityService {
Expand Down Expand Up @@ -409,6 +408,31 @@ class EntityService(rawlsDAO: RawlsDAO, importServiceDAO: ImportServiceDAO, cwds
}
}

def getJob(workspaceNamespace: String, workspaceName: String, jobId: String, userInfo: UserInfo): Future[ImportServiceListResponse] = {
// there is no way to tell just from a jobId if the job exists in cWDS or Import Service. So, we have to try both.

// if cWDS is enabled, query cWDS for job
val cwdsFuture: Future[ImportServiceListResponse] = if (cwdsDAO.isEnabled) {
rawlsDAO.getWorkspace(workspaceNamespace, workspaceName)(userInfo) map { workspace =>
cwdsDAO.getJobV1(workspace.workspace.workspaceId, jobId)(userInfo)
}
} else {
Future.failed(new FireCloudExceptionWithErrorReport(ErrorReport(StatusCodes.NotFound, s"Import $jobId not found")))
}

// if cWDS found the job, return it; else, try Import Service
cwdsFuture map { cwdsResponse =>
logger.info(s"Found job $jobId in cWDS")
cwdsResponse
} recoverWith {
case _ =>
importServiceDAO.getJob(workspaceNamespace, workspaceName, jobId)(userInfo) map { importServiceResponse =>
logger.info(s"Found job $jobId in Import Service")
importServiceResponse
}
}
}

def getEntitiesWithType(workspaceNamespace: String, workspaceName: String, userInfo: UserInfo): Future[PerRequestMessage] = {
rawlsDAO.getEntityTypes(workspaceNamespace, workspaceName)(userInfo).flatMap { entityTypeResponse =>
val entityTypes = entityTypeResponse.keys.toList
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ trait CwdsDAO {
runningOnly: Boolean
)(implicit userInfo: UserInfo): List[ImportServiceListResponse]

def getJobV1(workspaceId: String,
jobId: String
)(implicit userInfo: UserInfo): ImportServiceListResponse

def importV1(workspaceId: String,
asyncImportRequest: AsyncImportRequest
)(implicit userInfo: UserInfo): GenericJob
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ class HttpCwdsDAO(enabled: Boolean, supportedFormats: List[String]) extends Cwds
.toList
}

override def getJobV1(workspaceId: String, jobId: String)(implicit userInfo: UserInfo): ImportServiceListResponse = {
val jobApi: JobApi = new JobApi()
jobApi.setApiClient(getApiClient(userInfo.accessToken.token))

toImportServiceListResponse(jobApi.jobStatusV1(UUID.fromString(jobId)))
}

override def importV1(workspaceId: String,
asyncImportRequest: AsyncImportRequest
)(implicit userInfo: UserInfo): GenericJob = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,17 @@ package org.broadinstitute.dsde.firecloud.dataaccess

import akka.actor.ActorSystem
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.http.scaladsl.model.{HttpEntity, HttpResponse}
import akka.http.scaladsl.model.{HttpEntity, HttpResponse, ResponseEntity, StatusCodes}
import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.Materializer
import org.broadinstitute.dsde.firecloud.FireCloudConfig
import org.broadinstitute.dsde.firecloud.{FireCloudConfig, FireCloudExceptionWithErrorReport}
import org.broadinstitute.dsde.firecloud.model.{AsyncImportRequest, AsyncImportResponse, ImportServiceListResponse, ImportServiceRequest, ImportServiceResponse, RequestCompleteWithErrorReport, UserInfo}
import org.broadinstitute.dsde.firecloud.service.FireCloudDirectiveUtils
import org.broadinstitute.dsde.firecloud.service.PerRequest.{PerRequestMessage, RequestComplete}
import org.broadinstitute.dsde.firecloud.model.ModelJsonProtocol._
import org.broadinstitute.dsde.firecloud.utils.RestJsonClient
import org.broadinstitute.dsde.rawls.model.{ErrorReportSource, WorkspaceName}

import org.broadinstitute.dsde.rawls.model.{ErrorReport, ErrorReportSource, WorkspaceName}
import spray.json.DefaultJsonProtocol._

import scala.concurrent.{ExecutionContext, Future}
Expand All @@ -39,6 +38,22 @@ class HttpImportServiceDAO(implicit val system: ActorSystem, implicit val materi
}
}

override def getJob(workspaceNamespace: String, workspaceName: String, jobId: String)(implicit userInfo: UserInfo): Future[ImportServiceListResponse] = {
// get single job from import service
val importServiceUrl = FireCloudDirectiveUtils
.encodeUri(s"${FireCloudConfig.ImportService.server}/$workspaceNamespace/$workspaceName/imports/$jobId")

userAuthedRequest(Get(importServiceUrl))(userInfo) flatMap { importServiceResponse =>
importServiceResponse.status match {
case StatusCodes.OK => Unmarshal(importServiceResponse).to[ImportServiceListResponse]
case _ =>
extractResponseBody(importServiceResponse.entity) flatMap { respBody =>
Future.failed(new FireCloudExceptionWithErrorReport(ErrorReport(importServiceResponse.status, respBody)))
}
}
}
}

private def doImport(workspaceNamespace: String, workspaceName: String, isUpsert: Boolean, importRequest: AsyncImportRequest)(implicit userInfo: UserInfo): Future[PerRequestMessage] = {
// the payload to Import Service sends "path" and filetype.
val importServicePayload: ImportServiceRequest = ImportServiceRequest(path = importRequest.url, filetype = importRequest.filetype, isUpsert = isUpsert, options = importRequest.options)
Expand Down Expand Up @@ -70,13 +85,7 @@ class HttpImportServiceDAO(implicit val system: ActorSystem, implicit val materi
}
case otherResp =>
// see if we can extract errors
val responseStringFuture = otherResp.entity match {
case HttpEntity.Strict(_, data) =>
Future.successful(data.utf8String)
case nonStrictEntity =>
nonStrictEntity.toStrict(10.seconds).map(_.data.utf8String)
}
responseStringFuture.map { responseString =>
extractResponseBody(otherResp.entity).map { responseString =>
RequestCompleteWithErrorReport(otherResp.status, responseString)
} recover {
case t:Throwable =>
Expand All @@ -86,4 +95,15 @@ class HttpImportServiceDAO(implicit val system: ActorSystem, implicit val materi
}
}
}

protected[dataaccess] def extractResponseBody(entity: ResponseEntity): Future[String] = {
entity match {
case HttpEntity.Strict(_, data) =>
Future.successful(data.utf8String)
case nonStrictEntity =>
nonStrictEntity.toStrict(10.seconds).map(_.data.utf8String)
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,9 @@ trait ImportServiceDAO {
runningOnly: Boolean
)(implicit userInfo: UserInfo): Future[List[ImportServiceListResponse]]

def getJob(workspaceNamespace: String,
workspaceName: String,
jobId: String,
)(implicit userInfo: UserInfo): Future[ImportServiceListResponse]

}
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,11 @@ trait WorkspaceApiService extends FireCloudRequestBuilding with FireCloudDirecti
path(("importPFB" | "importJob") / Segment) { jobId =>
get {
requireUserInfo() { userInfo =>
passthrough(Uri(encodeUri(s"${FireCloudConfig.ImportService.server}/$workspaceNamespace/$workspaceName/imports/$jobId")), HttpMethods.GET)
complete {
entityServiceConstructor(FlexibleModelSchema).getJob(workspaceNamespace, workspaceName, jobId, userInfo) map { respBody =>
RequestComplete(OK, respBody)
}
}
}
}
} ~
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,15 @@ class EntityServiceSpec extends BaseServiceSpec with BeforeAndAfterEach {

}

"EntityService.getJob" - {
"should return correctly if job found in cWDS" is pending
"should return correctly if job not found in cWDS, but is found in Import Service" is pending
"should return correctly if cWDS errors, but is found in Import Service" is pending
"should return 404 if job not found in either cWDS or Import Service" is pending
"should return Import Service's error if job not found in cWDS and Import Service errors" is pending
"should return Import Service's error if cWDS errors and Import Service errors" is pending
}

private def getEntityService(mockGoogleServicesDAO: MockGoogleServicesDAO = new MockGoogleServicesDAO,
mockImportServiceDAO: MockImportServiceDAO = new MockImportServiceDAO,
rawlsDAO: MockRawlsDAO = new MockRawlsDAO,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ class MockCwdsDAO(enabled: Boolean = true) extends CwdsDAO {
override def listJobsV1(workspaceId: String, runningOnly: Boolean)(implicit userInfo: UserInfo)
: List[ImportServiceListResponse] = List()

override def getJobV1(workspaceId: String, jobId: String)(implicit userInfo: UserInfo): ImportServiceListResponse =
ImportServiceListResponse(jobId, "ReadyForUpsert", "pfb", None)

override def importV1(workspaceId: String,
asyncImportRequest: AsyncImportRequest
)(implicit userInfo: UserInfo): GenericJob = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,9 @@ class MockImportServiceDAO extends ImportServiceDAO {
: Future[List[ImportServiceListResponse]] = {
Future.successful(List.empty[ImportServiceListResponse])
}
}

override def getJob(workspaceNamespace: String, workspaceName: String, jobId: String)(implicit userInfo: UserInfo)
: Future[ImportServiceListResponse] = {
Future.successful(ImportServiceListResponse(jobId, "status", "filetype", None))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1159,45 +1159,6 @@ class WorkspaceApiServiceSpec extends BaseServiceSpec with WorkspaceApiService w

}

"WorkspaceService importPFB job-status Tests" - {

List(importJobStatusPath, pfbImportPath) foreach { pathUnderTest =>
s"Successful passthrough should return OK with payload for $pathUnderTest" in {

val jobId = UUID.randomUUID().toString

val responsePayload = JsObject(
("id", JsString(jobId)),
("status", JsString("Running"))
)

importServiceServer
.when(request()
.withMethod("GET")
.withPath(s"/${workspace.namespace}/${workspace.name}/imports/$jobId"))
.respond(org.mockserver.model.HttpResponse.response()
.withStatusCode(OK.intValue)
.withBody(responsePayload.compactPrint)
.withHeader("Content-Type", "application/json"))

(Get(s"$pathUnderTest/$jobId")
~> dummyUserIdHeaders(dummyUserId)
~> sealRoute(workspaceRoutes)) ~> check {
status should equal(OK)
responseAs[String].parseJson should be (responsePayload) // to address string-formatting issues
}
}

s"Passthrough should not pass unrecognized HTTP verbs for $pathUnderTest" in {
(Delete(s"$pathUnderTest/dummyJobId")
~> dummyUserIdHeaders(dummyUserId)
~> sealRoute(workspaceRoutes)) ~> check {
status should equal(MethodNotAllowed)
}
}
}
}

"WorkspaceService POST importJob Tests" - {

List(FILETYPE_PFB, FILETYPE_TDR) foreach { filetype =>
Expand Down

0 comments on commit ac43897

Please sign in to comment.