Skip to content

Commit

Permalink
AJ-1602: list-jobs API should aggregate Import Service and cWDS (#1308)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidangb authored Mar 14, 2024
1 parent 80d93a4 commit a1b4e65
Show file tree
Hide file tree
Showing 16 changed files with 355 additions and 23 deletions.
5 changes: 5 additions & 0 deletions local-dev/templates/firecloud-orchestration.conf.ctmpl
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ thurloe {
baseUrl = "https://thurloe.dsde-dev.broadinstitute.org"
}

cwds {
baseUrl = "https://cwds.dsde-dev.broadinstitute.org"
enabled = true
}

firecloud {
baseUrl = "https://firecloud-orchestration.dsde-dev.broadinstitute.org"
portalUrl = "https://firecloud.dsde-dev.broadinstitute.org"
Expand Down
1 change: 1 addition & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ object Dependencies {
"org.broadinstitute.dsde.workbench" %% "workbench-oauth2" % s"0.5-$workbenchLibsHash",
"org.broadinstitute.dsde.workbench" %% "sam-client" % "0.1-ef83073",
"org.broadinstitute.dsde.workbench" %% "workbench-notifications" %s"0.6-$workbenchLibsHash",
"org.databiosphere" % "workspacedataservice-client-okhttp-jakarta" % "0.2.117-SNAPSHOT",

"com.typesafe.akka" %% "akka-actor" % akkaV,
"com.typesafe.akka" %% "akka-slf4j" % akkaV,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ case class Application(agoraDAO: AgoraDAO,
thurloeDAO: ThurloeDAO,
shareLogDAO: ShareLogDAO,
importServiceDAO: ImportServiceDAO,
shibbolethDAO: ShibbolethDAO)
shibbolethDAO: ShibbolethDAO,
cwdsDAO: CwdsDAO)
3 changes: 2 additions & 1 deletion src/main/scala/org/broadinstitute/dsde/firecloud/Boot.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ object Boot extends App with LazyLogging {
val shareLogDAO:ShareLogDAO = new ElasticSearchShareLogDAO(elasticSearchClient, FireCloudConfig.ElasticSearch.shareLogIndexName)
val importServiceDAO:ImportServiceDAO = new HttpImportServiceDAO
val shibbolethDAO:ShibbolethDAO = new HttpShibbolethDAO
val cwdsDAO:CwdsDAO = new HttpCwdsDAO(FireCloudConfig.Cwds.enabled)

val app:Application = Application(agoraDAO, googleServicesDAO, ontologyDAO, rawlsDAO, samDAO, searchDAO, researchPurposeSupport, thurloeDAO, shareLogDAO, importServiceDAO, shibbolethDAO);
val app:Application = Application(agoraDAO, googleServicesDAO, ontologyDAO, rawlsDAO, samDAO, searchDAO, researchPurposeSupport, thurloeDAO, shareLogDAO, importServiceDAO, shibbolethDAO, cwdsDAO);

val agoraPermissionServiceConstructor: (UserInfo) => AgoraPermissionService = AgoraPermissionService.constructor(app)
val exportEntitiesByTypeActorConstructor: (ExportEntitiesByTypeArguments) => ExportEntitiesByTypeActor = ExportEntitiesByTypeActor.constructor(app, system)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import com.typesafe.scalalogging.LazyLogging
import org.broadinstitute.dsde.firecloud.EntityService._
import org.broadinstitute.dsde.firecloud.FireCloudConfig.Rawls
import org.broadinstitute.dsde.firecloud.dataaccess.ImportServiceFiletypes.{FILETYPE_PFB, FILETYPE_RAWLS}
import org.broadinstitute.dsde.firecloud.dataaccess.{GoogleServicesDAO, ImportServiceDAO, RawlsDAO}
import org.broadinstitute.dsde.firecloud.dataaccess.{CwdsDAO, GoogleServicesDAO, ImportServiceDAO, RawlsDAO}
import org.broadinstitute.dsde.firecloud.model.ModelJsonProtocol._
import org.broadinstitute.dsde.firecloud.model.{ModelSchema, _}
import org.broadinstitute.dsde.firecloud.service.PerRequest.{PerRequestMessage, RequestComplete}
Expand All @@ -34,7 +34,7 @@ import scala.util.{Failure, Success, Try}
object EntityService {

def constructor(app: Application)(modelSchema: ModelSchema)(implicit executionContext: ExecutionContext) =
new EntityService(app.rawlsDAO, app.importServiceDAO, app.googleServicesDAO, modelSchema)
new EntityService(app.rawlsDAO, app.importServiceDAO, app.cwdsDAO, app.googleServicesDAO, modelSchema)

def colNamesToAttributeNames(headers: Seq[String], requiredAttributes: Map[String, String]): Seq[(String, Option[String])] = {
headers.tail map { colName => (colName, requiredAttributes.get(colName))}
Expand Down Expand Up @@ -102,7 +102,7 @@ object EntityService {

}

class EntityService(rawlsDAO: RawlsDAO, importServiceDAO: ImportServiceDAO, googleServicesDAO: GoogleServicesDAO, modelSchema: ModelSchema)(implicit val executionContext: ExecutionContext)
class EntityService(rawlsDAO: RawlsDAO, importServiceDAO: ImportServiceDAO, cwdsDAO: CwdsDAO, googleServicesDAO: GoogleServicesDAO, modelSchema: ModelSchema)(implicit val executionContext: ExecutionContext)
extends TSVFileSupport with LazyLogging {

val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZZ")
Expand Down Expand Up @@ -374,11 +374,22 @@ class EntityService(rawlsDAO: RawlsDAO, importServiceDAO: ImportServiceDAO, goog
}

def listJobs(workspaceNamespace: String, workspaceName: String, runningOnly: Boolean, userInfo: UserInfo): Future[List[ImportServiceListResponse]] = {
// get jobs from Import Service
val importServiceJobs = importServiceDAO.listJobs(workspaceNamespace, workspaceName, runningOnly)(userInfo)
// TODO AJ-1602: get jobs from cWDS
// TODO AJ-1602: merge lists before replying
importServiceJobs

for {
// get jobs from Import Service
importServiceJobs <- importServiceDAO.listJobs(workspaceNamespace, workspaceName, runningOnly)(userInfo)
// get jobs from cWDS
cwdsJobs <- if (cwdsDAO.isEnabled) {
rawlsDAO.getWorkspace(workspaceNamespace, workspaceName)(userInfo) map { workspace =>
cwdsDAO.listJobsV1(workspace.workspace.workspaceId, runningOnly)(userInfo)
}
} else {
Future.successful(List.empty)
}
} yield {
// merge Import Service and cWDS results
importServiceJobs.concat(cwdsJobs)
}
}

def getEntitiesWithType(workspaceNamespace: String, workspaceName: String, userInfo: UserInfo): Future[PerRequestMessage] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ object FireCloudConfig {
val validPreferenceKeys = profile.getStringList("validPreferenceKeys").asScala.toSet
}

object Cwds {
private val cwds = config.getConfig("cwds")
val baseUrl = cwds.getString("baseUrl")
val enabled = cwds.getBoolean("enabled")
}

object FireCloud {
private val firecloud = config.getConfig("firecloud")
val baseUrl = firecloud.getString("baseUrl")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.broadinstitute.dsde.firecloud.dataaccess

import org.broadinstitute.dsde.firecloud.model.{ImportServiceListResponse, UserInfo}

import java.util.UUID
import scala.concurrent.Future

trait CwdsDAO {

def isEnabled: Boolean

def listJobsV1(workspaceId: String,
runningOnly: Boolean
)(implicit userInfo: UserInfo): List[ImportServiceListResponse]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package org.broadinstitute.dsde.firecloud.dataaccess

import okhttp3.Protocol
import org.broadinstitute.dsde.firecloud.FireCloudConfig
import org.broadinstitute.dsde.firecloud.dataaccess.HttpCwdsDAO.commonHttpClient
import org.broadinstitute.dsde.firecloud.model.{ImportServiceListResponse, UserInfo}
import org.databiosphere.workspacedata.api.JobApi
import org.databiosphere.workspacedata.client.ApiClient
import org.databiosphere.workspacedata.model.GenericJob
import org.databiosphere.workspacedata.model.GenericJob.StatusEnum._

import scala.jdk.CollectionConverters._
import java.util.UUID

object HttpCwdsDAO {
// singleton common http client to prevent object thrashing
private val commonHttpClient = new ApiClient().getHttpClient.newBuilder
.protocols(List(Protocol.HTTP_1_1).asJava)
.build
}

class HttpCwdsDAO(enabled: Boolean) extends CwdsDAO {

private final val RUNNING_STATUSES: java.util.List[String] = List("CREATED", "QUEUED", "RUNNING").asJava

private final val STATUS_TRANSLATION: Map[GenericJob.StatusEnum,String] = Map(
// there is no effective difference between Translating and ReadyForUpsert for our purposes
CREATED -> "Translating",
QUEUED -> "Translating",
RUNNING -> "ReadyForUpsert",
SUCCEEDED -> "Done",
ERROR -> "Error",
CANCELLED -> "Error",
UNKNOWN -> "Error"
)

override def isEnabled: Boolean = enabled

override def listJobsV1(workspaceId: String, runningOnly: Boolean)(implicit userInfo: UserInfo)
: scala.collection.immutable.List[ImportServiceListResponse] = {
// determine the proper cWDS statuses based on the runningOnly argument
// the Java API expects null when not specifying statuses
val statuses = if (runningOnly) RUNNING_STATUSES else null

// prepare the cWDS client
val apiClient: ApiClient = new ApiClient()
apiClient.setHttpClient(commonHttpClient)
apiClient.setBasePath(FireCloudConfig.Cwds.baseUrl)
apiClient.setAccessToken(userInfo.accessToken.token)
val jobApi: JobApi = new JobApi()
jobApi.setApiClient(apiClient)

// query cWDS for its jobs, and translate the response to ImportServiceListResponse format
jobApi.jobsInInstanceV1(UUID.fromString(workspaceId), statuses)
.asScala
.map(toImportServiceListResponse)
.toList
}

protected[dataaccess] def toImportServiceListResponse(cwdsJob: GenericJob): ImportServiceListResponse = {
ImportServiceListResponse(jobId = cwdsJob.getJobId.toString,
status = toImportServiceStatus(cwdsJob.getStatus),
filetype = cwdsJob.getJobType.getValue,
message = Option(cwdsJob.getErrorMessage))
}

protected[dataaccess] def toImportServiceStatus(cwdsStatus: GenericJob.StatusEnum): String = {
// don't fail status translation if status somehow could not be found
STATUS_TRANSLATION.getOrElse(cwdsStatus, "Unknown")
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,22 @@ import akka.http.scaladsl.model.headers.OAuth2BearerToken
import akka.http.scaladsl.model.{HttpResponse, StatusCode, StatusCodes}
import com.google.cloud.storage.StorageException
import org.broadinstitute.dsde.firecloud.dataaccess.ImportServiceFiletypes.FILETYPE_RAWLS
import org.broadinstitute.dsde.firecloud.dataaccess.{MockImportServiceDAO, MockRawlsDAO}
import org.broadinstitute.dsde.firecloud.dataaccess.{MockCwdsDAO, MockImportServiceDAO, MockRawlsDAO}
import org.broadinstitute.dsde.firecloud.mock.MockGoogleServicesDAO
import org.broadinstitute.dsde.firecloud.model.ModelJsonProtocol._
import org.broadinstitute.dsde.firecloud.model.{AsyncImportRequest, EntityUpdateDefinition, FirecloudModelSchema, ImportServiceResponse, ModelSchema, RequestCompleteWithErrorReport, UserInfo}
import org.broadinstitute.dsde.firecloud.model.{AsyncImportRequest, EntityUpdateDefinition, FirecloudModelSchema, ImportServiceListResponse, ImportServiceResponse, ModelSchema, RequestCompleteWithErrorReport, UserInfo, WithAccessToken}
import org.broadinstitute.dsde.firecloud.service.PerRequest.RequestComplete
import org.broadinstitute.dsde.firecloud.service.{BaseServiceSpec, PerRequest}
import org.broadinstitute.dsde.rawls.model.{ErrorReport, ErrorReportSource}
import org.broadinstitute.dsde.rawls.model.ErrorReport
import org.broadinstitute.dsde.workbench.model.google.{GcsBucketName, GcsObjectName, GcsPath}
import org.mockito.ArgumentMatchers
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{times, verify, when}
import org.mockito.Mockito.{never, times, verify, when}
import org.parboiled.common.FileUtils
import org.scalatest.BeforeAndAfterEach
import org.scalatestplus.mockito.MockitoSugar.{mock => mockito}

import java.util.UUID
import java.util.zip.ZipFile
import scala.concurrent.ExecutionContext.Implicits._
import scala.concurrent.Future
Expand All @@ -32,6 +33,8 @@ class EntityServiceSpec extends BaseServiceSpec with BeforeAndAfterEach {
searchDao.reset()
}

private def dummyUserInfo(tokenStr: String) = UserInfo("dummy", OAuth2BearerToken(tokenStr), -1, "dummy")

"EntityClient should extract TSV files out of bagit zips" - {
"with neither participants nor samples in the zip" in {
val zip = new ZipFile("src/test/resources/testfiles/bagit/nothingbag.zip")
Expand Down Expand Up @@ -214,10 +217,107 @@ class EntityServiceSpec extends BaseServiceSpec with BeforeAndAfterEach {
}
}

"EntityService.listJobs" - {
"should concatenate results from cWDS and Import Service" in {
val importServiceResponse = List(
ImportServiceListResponse("jobId1", "status1", "filetype1", None),
ImportServiceListResponse("jobId2", "status2", "filetype2", None)
)
val cwdsResponse = List(
ImportServiceListResponse("jobId3", "status3", "filetype3", None),
ImportServiceListResponse("jobId4", "status4", "filetype4", None)
)

listJobsTestImpl(importServiceResponse, cwdsResponse)
}

"should return empty list if both cWDS and Import Service return empty lists" in {
val importServiceResponse = List()
val cwdsResponse = List()

listJobsTestImpl(importServiceResponse, cwdsResponse)
}

"should return results if Import Service has results but cWDS is empty" in {
val importServiceResponse = List(
ImportServiceListResponse("jobId1", "status1", "filetype1", None),
ImportServiceListResponse("jobId2", "status2", "filetype2", None)
)
val cwdsResponse = List()

listJobsTestImpl(importServiceResponse, cwdsResponse)
}

"should return results if cWDS has results but Import Service is empty" in {
val importServiceResponse = List()
val cwdsResponse = List(
ImportServiceListResponse("jobId1", "status1", "filetype1", None),
ImportServiceListResponse("jobId2", "status2", "filetype2", None)
)

listJobsTestImpl(importServiceResponse, cwdsResponse)
}

"should not call cWDS if cWDS is not enabled" in {
// set up mocks
val importServiceDAO = mockito[MockImportServiceDAO]
val cwdsDAO = mockito[MockCwdsDAO]
val rawlsDAO = mockito[MockRawlsDAO]

val importServiceResponse = List(
ImportServiceListResponse("jobId1", "status1", "filetype1", None),
ImportServiceListResponse("jobId2", "status2", "filetype2", None)
)
val cwdsResponse = List(
ImportServiceListResponse("jobId3", "status3", "filetype3", None),
ImportServiceListResponse("jobId4", "status4", "filetype4", None)
)

when(importServiceDAO.listJobs(any[String], any[String], any[Boolean])(any[UserInfo])).thenReturn(Future.successful(importServiceResponse))
when(cwdsDAO.listJobsV1(any[String], any[Boolean])(any[UserInfo])).thenReturn(cwdsResponse)
when(cwdsDAO.isEnabled).thenReturn(false)

// inject mocks to entity service
val entityService = getEntityService(mockImportServiceDAO = importServiceDAO, cwdsDAO = cwdsDAO)

// list jobs via entity service
val actual = entityService.listJobs("workspaceNamespace", "workspaceName", runningOnly = true, dummyUserInfo("mytoken")).futureValue

// verify Rawls get-workspace was NOT called
verify(rawlsDAO, never).getWorkspace(any[String], any[String])(any[WithAccessToken])
// verify cwds list-jobs was NOT called
verify(cwdsDAO, never).listJobsV1(any[String], any[Boolean])(any[UserInfo])

// verify the response only contains Import Service jobs
actual should contain theSameElementsAs importServiceResponse
}

def listJobsTestImpl(importServiceResponse: List[ImportServiceListResponse], cwdsResponse: List[ImportServiceListResponse]) = {
// set up mocks
val importServiceDAO = mockito[MockImportServiceDAO]
val cwdsDAO = mockito[MockCwdsDAO]

when(importServiceDAO.listJobs(any[String], any[String], any[Boolean])(any[UserInfo])).thenReturn(Future.successful(importServiceResponse))
when(cwdsDAO.listJobsV1(any[String], any[Boolean])(any[UserInfo])).thenReturn(cwdsResponse)
when(cwdsDAO.isEnabled).thenReturn(true)

// inject mocks to entity service
val entityService = getEntityService(mockImportServiceDAO = importServiceDAO, cwdsDAO = cwdsDAO)

// list jobs via entity service
val actual = entityService.listJobs("workspaceNamespace", "workspaceName", runningOnly = true, dummyUserInfo("mytoken")).futureValue

actual should contain theSameElementsAs (importServiceResponse ++ cwdsResponse)
}


}

private def getEntityService(mockGoogleServicesDAO: MockGoogleServicesDAO = new MockGoogleServicesDAO,
mockImportServiceDAO: MockImportServiceDAO = new MockImportServiceDAO,
rawlsDAO: MockRawlsDAO = new MockRawlsDAO) = {
val application = app.copy(googleServicesDAO = mockGoogleServicesDAO, rawlsDAO = rawlsDAO, importServiceDAO = mockImportServiceDAO)
rawlsDAO: MockRawlsDAO = new MockRawlsDAO,
cwdsDAO: MockCwdsDAO = new MockCwdsDAO) = {
val application = app.copy(googleServicesDAO = mockGoogleServicesDAO, rawlsDAO = rawlsDAO, importServiceDAO = mockImportServiceDAO, cwdsDAO = cwdsDAO)

// instantiate an EntityService, specify importServiceDAO and googleServicesDAO
implicit val modelSchema: ModelSchema = FirecloudModelSchema
Expand Down
Loading

0 comments on commit a1b4e65

Please sign in to comment.