From 53c85fa7ec09ae8694e269b78d5860b398a432ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Goworko?= Date: Fri, 6 Dec 2024 15:51:48 +0100 Subject: [PATCH] DB optimization draft --- .../management/periodic/DeploymentActor.scala | 25 +-- .../management/periodic/JarManager.scala | 1 + .../periodic/PeriodicProcessService.scala | 82 ++++++--- .../db/PeriodicProcessesRepository.scala | 169 +++++++++++++----- .../periodic/flink/FlinkJarManager.scala | 10 +- .../model/DeploymentWithJarData.scala | 5 +- .../periodic/model/PeriodicProcess.scala | 4 +- .../model/PeriodicProcessDeployment.scala | 17 +- .../periodic/model/SchedulesState.scala | 55 ++++-- .../AdditionalDeploymentDataProvider.scala | 4 +- .../service/PeriodicProcessListener.scala | 13 +- .../service/ProcessConfigEnricher.scala | 2 +- .../periodic/DeploymentActorTest.scala | 21 ++- .../management/periodic/JarManagerStub.scala | 3 +- .../PeriodicProcessDeploymentGen.scala | 44 ++--- .../periodic/PeriodicProcessGen.scala | 3 +- ...eriodicProcessServiceIntegrationTest.scala | 42 ++--- .../periodic/PeriodicProcessServiceTest.scala | 18 +- .../db/InMemPeriodicProcessesRepository.scala | 38 ++-- 19 files changed, 360 insertions(+), 196 deletions(-) diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/DeploymentActor.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/DeploymentActor.scala index 091a1cc5052..a204313e4e8 100644 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/DeploymentActor.scala +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/DeploymentActor.scala @@ -8,7 +8,10 @@ import pl.touk.nussknacker.engine.management.periodic.DeploymentActor.{ DeploymentCompleted, WaitingForDeployment } -import pl.touk.nussknacker.engine.management.periodic.model.PeriodicProcessDeployment +import pl.touk.nussknacker.engine.management.periodic.model.{ + PeriodicProcessDeployment, + PeriodicProcessDeploymentWithFullProcess +} import scala.concurrent.Future import scala.concurrent.duration._ @@ -21,8 +24,8 @@ object DeploymentActor { } private[periodic] def props( - findToBeDeployed: => Future[Seq[PeriodicProcessDeployment[CanonicalProcess]]], - deploy: PeriodicProcessDeployment[CanonicalProcess] => Future[Unit], + findToBeDeployed: => Future[Seq[PeriodicProcessDeploymentWithFullProcess]], + deploy: PeriodicProcessDeploymentWithFullProcess => Future[Unit], interval: FiniteDuration ) = { Props(new DeploymentActor(findToBeDeployed, deploy, interval)) @@ -30,14 +33,14 @@ object DeploymentActor { private[periodic] case object CheckToBeDeployed - private case class WaitingForDeployment(ids: List[PeriodicProcessDeployment[CanonicalProcess]]) + private case class WaitingForDeployment(ids: List[PeriodicProcessDeploymentWithFullProcess]) private case object DeploymentCompleted } class DeploymentActor( - findToBeDeployed: => Future[Seq[PeriodicProcessDeployment[CanonicalProcess]]], - deploy: PeriodicProcessDeployment[CanonicalProcess] => Future[Unit], + findToBeDeployed: => Future[Seq[PeriodicProcessDeploymentWithFullProcess]], + deploy: PeriodicProcessDeploymentWithFullProcess => Future[Unit], interval: FiniteDuration ) extends Actor with Timers @@ -55,25 +58,25 @@ class DeploymentActor( logger.trace("Checking scenarios to be deployed") findToBeDeployed.onComplete { case Success(runDetailsSeq) => - logger.debug(s"Found ${runDetailsSeq.size} to be 
deployed: ${runDetailsSeq.map(_.display)}") + logger.debug(s"Found ${runDetailsSeq.size} to be deployed: ${runDetailsSeq.map(_.deployment.display)}") self ! WaitingForDeployment(runDetailsSeq.toList) case Failure(exception) => logger.error("Finding scenarios to be deployed failed unexpectedly", exception) } case WaitingForDeployment(Nil) => case WaitingForDeployment(runDetails :: _) => - logger.info(s"Found a scenario to be deployed: ${runDetails.display}") - context.become(receiveOngoingDeployment(runDetails)) + logger.info(s"Found a scenario to be deployed: ${runDetails.deployment.display}") + context.become(receiveOngoingDeployment(runDetails.deployment)) deploy(runDetails) onComplete { case Success(_) => self ! DeploymentCompleted case Failure(exception) => - logger.error(s"Deployment of ${runDetails.display} failed unexpectedly", exception) + logger.error(s"Deployment of ${runDetails.deployment.display} failed unexpectedly", exception) self ! DeploymentCompleted } } - private def receiveOngoingDeployment(runDetails: PeriodicProcessDeployment[CanonicalProcess]): Receive = { + private def receiveOngoingDeployment(runDetails: PeriodicProcessDeployment): Receive = { case CheckToBeDeployed => logger.debug(s"Still waiting for ${runDetails.display} to be deployed") case DeploymentCompleted => diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/JarManager.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/JarManager.scala index 255ab8108e9..ad22cb3df90 100644 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/JarManager.scala +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/JarManager.scala @@ -1,6 +1,7 @@ package pl.touk.nussknacker.engine.management.periodic import pl.touk.nussknacker.engine.api.ProcessVersion +import pl.touk.nussknacker.engine.api.process.{ProcessName, VersionId} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.{DeploymentData, ExternalDeploymentId} import pl.touk.nussknacker.engine.management.periodic.model.DeploymentWithJarData diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala index e346c92ec08..e0d098cd034 100644 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala @@ -78,7 +78,7 @@ class PeriodicProcessService( scenarioActivityId = ScenarioActivityId.random, user = ScenarioUser.internalNuUser, date = metadata.dateDeployed.getOrElse(metadata.dateFinished), - scenarioVersionId = Some(ScenarioVersionId.from(deployment.periodicProcess.processVersion.versionId)), + scenarioVersionId = Some(ScenarioVersionId.from(deployment.periodicProcessMetadata.versionId)), scheduledExecutionStatus = metadata.status, dateFinished = metadata.dateFinished, scheduleName = deployment.scheduleName.display, @@ -163,7 +163,7 @@ class PeriodicProcessService( scheduledProcessesRepository .schedule(process.id, name, date, deploymentRetryConfig.deployMaxRetries) .flatMap { data => - handleEvent(ScheduledEvent(data, 
firstSchedule = true)) + handleEvent(ScheduledEvent(data.deployment, firstSchedule = true)) } case (name, None) => logger.warn(s"Schedule $name does not have date to schedule") @@ -174,7 +174,7 @@ class PeriodicProcessService( .map(_ => ()) } - def findToBeDeployed: Future[Seq[PeriodicProcessDeployment[CanonicalProcess]]] = { + def findToBeDeployed: Future[Seq[PeriodicProcessDeploymentWithFullProcess]] = { for { toBeDeployed <- scheduledProcessesRepository.findToBeDeployed.run.flatMap { toDeployList => Future.sequence(toDeployList.map(checkIfNotRunning)).map(_.flatten) @@ -182,22 +182,24 @@ class PeriodicProcessService( // We retry scenarios that failed on deployment. Failure recovery of running scenarios should be handled by Flink's restart strategy toBeRetried <- scheduledProcessesRepository.findToBeRetried.run // We don't block scheduled deployments by retries - } yield toBeDeployed.sortBy(d => (d.runAt, d.createdAt)) ++ toBeRetried.sortBy(d => (d.nextRetryAt, d.createdAt)) + } yield toBeDeployed.sortBy(d => (d.deployment.runAt, d.deployment.createdAt)) ++ toBeRetried.sortBy(d => + (d.deployment.nextRetryAt, d.deployment.createdAt) + ) } // Currently we don't allow simultaneous runs of one scenario - only sequential, so if other schedule kicks in, it'll have to wait // TODO: we show allow to deploy scenarios with different scheduleName to be deployed simultaneous private def checkIfNotRunning( - toDeploy: PeriodicProcessDeployment[CanonicalProcess] - ): Future[Option[PeriodicProcessDeployment[CanonicalProcess]]] = { + toDeploy: PeriodicProcessDeploymentWithFullProcess + ): Future[Option[PeriodicProcessDeploymentWithFullProcess]] = { delegateDeploymentManager - .getProcessStates(toDeploy.periodicProcess.processVersion.processName)(DataFreshnessPolicy.Fresh) + .getProcessStates(toDeploy.deployment.periodicProcessMetadata.processName)(DataFreshnessPolicy.Fresh) .map( _.value .map(_.status) .find(SimpleStateStatus.DefaultFollowingDeployStatuses.contains) .map { _ => - logger.debug(s"Deferring run of ${toDeploy.display} as scenario is currently running") + logger.debug(s"Deferring run of ${toDeploy.deployment.display} as scenario is currently running") None } .getOrElse(Some(toDeploy)) @@ -298,9 +300,14 @@ class PeriodicProcessService( case Right(Some(futureDate)) => logger.info(s"Rescheduling ${deployment.display} to $futureDate") val action = scheduledProcessesRepository - .schedule(process.id, deployment.scheduleName, futureDate, deploymentRetryConfig.deployMaxRetries) + .schedule( + periodicProcessMetadata.id, + deployment.scheduleName, + futureDate, + deploymentRetryConfig.deployMaxRetries + ) .flatMap { data => - handleEvent(ScheduledEvent(data, firstSchedule = false)) + handleEvent(ScheduledEvent(data.deployment, firstSchedule = false)) } Some(action) case Right(None) => @@ -318,9 +325,9 @@ class PeriodicProcessService( } if (scheduleActions.forall(_.isEmpty)) { - logger.info(s"No scheduled deployments for periodic process: ${process.id.value}. Deactivating") - deactivateAction(process).flatMap { _ => - markProcessActionExecutionFinished(processScheduleData.process.processActionId) + logger.info(s"No scheduled deployments for periodic process: ${periodicProcessMetadata.id.value}. 
Deactivating") + deactivateAction(periodicProcessMetadata).flatMap { _ => + markProcessActionExecutionFinished(processScheduleData.periodicProcessMetadata.processActionId) } } else @@ -332,11 +339,11 @@ class PeriodicProcessService( for { _ <- scheduledProcessesRepository.markFinished(deployment.id) currentState <- scheduledProcessesRepository.findProcessData(deployment.id) - } yield handleEvent(FinishedEvent(currentState, state)) + } yield handleEvent(FinishedEvent(currentState.deployment, state)) } private def handleFailedDeployment( - deployment: PeriodicProcessDeployment[_], + deployment: PeriodicProcessDeployment, state: Option[StatusDetails] ): RepositoryAction[Unit] = { def calculateNextRetryAt = now().plus(deploymentRetryConfig.deployRetryPenalize.toMillis, ChronoUnit.MILLIS) @@ -359,7 +366,7 @@ class PeriodicProcessService( for { _ <- scheduledProcessesRepository.markFailedOnDeployWithStatus(deployment.id, status, retriesLeft, nextRetryAt) currentState <- scheduledProcessesRepository.findProcessData(deployment.id) - } yield handleEvent(FailedOnDeployEvent(currentState, state)) + } yield handleEvent(FailedOnDeployEvent(currentState.deployment, state)) } private def markFailedAction( @@ -370,23 +377,26 @@ class PeriodicProcessService( for { _ <- scheduledProcessesRepository.markFailed(deployment.id) currentState <- scheduledProcessesRepository.findProcessData(deployment.id) - } yield handleEvent(FailedOnRunEvent(currentState, state)) + } yield handleEvent(FailedOnRunEvent(currentState.deployment, state)) } def deactivate(processName: ProcessName): Future[Iterable[DeploymentId]] = for { activeSchedules <- getLatestDeploymentsForActiveSchedules(processName) (runningDeploymentsForSchedules, _) <- synchronizeDeploymentsStates(processName, activeSchedules) - _ <- activeSchedules.groupedByPeriodicProcess.map(p => deactivateAction(p.process)).sequence.runWithCallbacks + _ <- activeSchedules.groupedByPeriodicProcess + .map(p => deactivateAction(p.periodicProcessMetadata)) + .sequence + .runWithCallbacks } yield runningDeploymentsForSchedules.map(deployment => DeploymentId(deployment.toString)) - private def deactivateAction(process: PeriodicProcess[_]): RepositoryAction[Callback] = { + private def deactivateAction(process: PeriodicProcessMetadata): RepositoryAction[Callback] = { logger.info(s"Deactivate periodic process id: ${process.id.value}") for { _ <- scheduledProcessesRepository.markInactive(process.id) // we want to delete jars only after we successfully mark process as inactive. It's better to leave jar garbage than // have process without jar - } yield () => jarManager.deleteJar(process.deploymentData.jarFileName) + } yield () => jarManager.deleteJar(process.jarFileName) } private def markProcessActionExecutionFinished( @@ -399,8 +409,17 @@ class PeriodicProcessService( .map(_ => ()) } - def deploy(deployment: PeriodicProcessDeployment[CanonicalProcess]): Future[Unit] = { + def deploy(deploymentWithCanonicalProcess: PeriodicProcessDeploymentWithFullProcess): Future[Unit] = { // TODO: set status before deployment? 
+ val deployment = deploymentWithCanonicalProcess.deployment + val periodicProcessMetadata = deployment.periodicProcessMetadata + val deploymentWithJarData = DeploymentWithJarData( + processName = periodicProcessMetadata.processName, + versionId = periodicProcessMetadata.versionId, + process = deploymentWithCanonicalProcess.process, + inputConfigDuringExecutionJson = deploymentWithCanonicalProcess.inputConfigDuringExecutionJson, + jarFileName = periodicProcessMetadata.jarFileName + ) val id = deployment.id val deploymentData = DeploymentData( DeploymentId(id.toString), @@ -412,10 +431,14 @@ class PeriodicProcessService( AdditionalComponentConfigsForRuntimeExtractor.getRequiredAdditionalConfigsForRuntime(configsFromProvider) ) ) - val deploymentWithJarData = deployment.periodicProcess.deploymentData val deploymentAction = for { _ <- Future.successful( - logger.info("Deploying scenario {} for deployment id {}", deploymentWithJarData.processVersion, id) + logger.info( + "Deploying scenario name={} versionId={} for deployment id {}", + periodicProcessMetadata.processName, + periodicProcessMetadata.versionId, + id + ) ) enrichedProcessConfig <- processConfigEnricher.onDeploy( ProcessConfigEnricher.DeployData( @@ -431,12 +454,17 @@ class PeriodicProcessService( } yield externalDeploymentId deploymentAction .flatMap { externalDeploymentId => - logger.info("Scenario has been deployed {} for deployment id {}", deploymentWithJarData.processVersion, id) + logger.info( + "Scenario has been deployed name={} versionId={} for deployment id {}", + periodicProcessMetadata.processName, + periodicProcessMetadata.versionId, + id + ) // TODO: add externalDeploymentId?? scheduledProcessesRepository .markDeployed(id) .flatMap(_ => scheduledProcessesRepository.findProcessData(id)) - .flatMap(afterChange => handleEvent(DeployedEvent(afterChange, externalDeploymentId))) + .flatMap(afterChange => handleEvent(DeployedEvent(afterChange.deployment, externalDeploymentId))) .run } // We can recover since deployment actor watches only future completion. 
@@ -483,7 +511,7 @@ class PeriodicProcessService( deployment.createdAt, deployment.runAt, deployment.state.status, - scheduleData.process.active, + scheduleData.periodicProcessMetadata.active, runtimeStatuses.getStatus(deployment.id) ) } @@ -531,7 +559,7 @@ class PeriodicProcessService( } private def scheduledExecutionStatusAndDateFinished( - entity: PeriodicProcessDeployment[Unit], + entity: PeriodicProcessDeployment, ): Option[FinishedScheduledExecutionMetadata] = { for { status <- entity.state.status match { diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/db/PeriodicProcessesRepository.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/db/PeriodicProcessesRepository.scala index 267f6316fbf..0e1a90aa5dd 100644 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/db/PeriodicProcessesRepository.scala +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/db/PeriodicProcessesRepository.scala @@ -4,11 +4,15 @@ import cats.Monad import com.github.tminglei.slickpg.ExPostgresProfile import com.typesafe.scalalogging.LazyLogging import io.circe.parser.decode -import pl.touk.nussknacker.engine.api.ProcessVersion import pl.touk.nussknacker.engine.api.deployment.ProcessActionId -import pl.touk.nussknacker.engine.api.process.ProcessName +import pl.touk.nussknacker.engine.api.process.{ProcessName, VersionId} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.management.periodic._ +import pl.touk.nussknacker.engine.management.periodic.db.PeriodicProcessesRepository.{ + createPeriodicProcessMetadata, + createPeriodicProcessWithoutJson, + prepareScheduleProperty +} import pl.touk.nussknacker.engine.management.periodic.model.PeriodicProcessDeploymentStatus.PeriodicProcessDeploymentStatus import pl.touk.nussknacker.engine.management.periodic.model._ import slick.dbio.{DBIOAction, Effect, NoStream} @@ -21,14 +25,36 @@ import scala.language.higherKinds object PeriodicProcessesRepository { + def createPeriodicProcessDeploymentWithFullProcess( + processEntity: PeriodicProcessEntity, + processDeploymentEntity: PeriodicProcessDeploymentEntity + ): PeriodicProcessDeploymentWithFullProcess = { + val process = createPeriodicProcessWithJson(processEntity) + PeriodicProcessDeploymentWithFullProcess( + PeriodicProcessesRepository.createPeriodicProcessDeploymentFromProcess(process, processDeploymentEntity), + process.deploymentData.process, + process.deploymentData.inputConfigDuringExecutionJson, + ) + } + def createPeriodicProcessDeployment( processEntity: PeriodicProcessEntity, processDeploymentEntity: PeriodicProcessDeploymentEntity - ): PeriodicProcessDeployment[CanonicalProcess] = { + ): PeriodicProcessDeployment = { val process = createPeriodicProcessWithJson(processEntity) + createPeriodicProcessDeploymentFromProcess( + process, + processDeploymentEntity + ) + } + + def createPeriodicProcessDeploymentFromProcess( + periodicProcess: PeriodicProcess[CanonicalProcess], + processDeploymentEntity: PeriodicProcessDeploymentEntity + ): PeriodicProcessDeployment = { PeriodicProcessDeployment( processDeploymentEntity.id, - process, + createPeriodicProcessMetadata(periodicProcess), processDeploymentEntity.createdAt, processDeploymentEntity.runAt, ScheduleName(processDeploymentEntity.scheduleName), @@ -49,12 +75,12 @@ object PeriodicProcessesRepository { } def 
createPeriodicProcessWithJson(processEntity: PeriodicProcessEntity): PeriodicProcess[CanonicalProcess] = { - val processVersion = createProcessVersion(processEntity) - val scheduleProperty = prepareScheduleProperty(processEntity) + val scheduleProperty = prepareScheduleProperty(processEntity.scheduleProperty) PeriodicProcess( processEntity.id, model.DeploymentWithJarData( - processVersion = processVersion, + processName = processEntity.processName, + versionId = processEntity.processVersionId, inputConfigDuringExecutionJson = processEntity.inputConfigDuringExecutionJson, jarFileName = processEntity.jarFileName, process = processEntity.processJson.getOrElse( @@ -69,12 +95,12 @@ object PeriodicProcessesRepository { } def createPeriodicProcessWithoutJson(processEntity: PeriodicProcessEntity): PeriodicProcess[Unit] = { - val processVersion = createProcessVersion(processEntity) - val scheduleProperty = prepareScheduleProperty(processEntity) + val scheduleProperty = prepareScheduleProperty(processEntity.scheduleProperty) PeriodicProcess( processEntity.id, model.DeploymentWithJarData( - processVersion = processVersion, + processName = processEntity.processName, + versionId = processEntity.processVersionId, inputConfigDuringExecutionJson = processEntity.inputConfigDuringExecutionJson, jarFileName = processEntity.jarFileName, process = () @@ -86,14 +112,20 @@ object PeriodicProcessesRepository { ) } - private def prepareScheduleProperty(processEntity: PeriodicProcessEntity) = { - val scheduleProperty = decode[ScheduleProperty](processEntity.scheduleProperty) - .fold(e => throw new IllegalArgumentException(e), identity) - scheduleProperty + def createPeriodicProcessMetadata(process: PeriodicProcess[_]): PeriodicProcessMetadata = { + PeriodicProcessMetadata( + id = process.id, + processName = process.deploymentData.processName, + versionId = process.deploymentData.versionId, + jarFileName = process.deploymentData.jarFileName, + scheduleProperty = process.scheduleProperty, + active = process.active, + processActionId = process.processActionId + ) } - private def createProcessVersion(processEntity: PeriodicProcessEntity): ProcessVersion = { - ProcessVersion.empty.copy(versionId = processEntity.processVersionId, processName = processEntity.processName) + def prepareScheduleProperty(scheduleProperty: String) = { + decode[ScheduleProperty](scheduleProperty).fold(e => throw new IllegalArgumentException(e), identity) } } @@ -133,15 +165,15 @@ trait PeriodicProcessesRepository { deploymentsPerScheduleMaxCount: Int ): Action[SchedulesState] - def findToBeDeployed: Action[Seq[PeriodicProcessDeployment[CanonicalProcess]]] + def findToBeDeployed: Action[Seq[PeriodicProcessDeploymentWithFullProcess]] - def findToBeRetried: Action[Seq[PeriodicProcessDeployment[CanonicalProcess]]] + def findToBeRetried: Action[Seq[PeriodicProcessDeploymentWithFullProcess]] def findActiveSchedulesForProcessesHavingDeploymentWithMatchingStatus( expectedDeploymentStatuses: Set[PeriodicProcessDeploymentStatus] ): Action[SchedulesState] - def findProcessData(id: PeriodicProcessDeploymentId): Action[PeriodicProcessDeployment[CanonicalProcess]] + def findProcessData(id: PeriodicProcessDeploymentId): Action[PeriodicProcessDeploymentWithFullProcess] def findProcessData(processName: ProcessName): Action[Seq[PeriodicProcess[CanonicalProcess]]] @@ -163,7 +195,7 @@ trait PeriodicProcessesRepository { scheduleName: ScheduleName, runAt: LocalDateTime, deployMaxRetries: Int - ): Action[PeriodicProcessDeployment[CanonicalProcess]] + ): 
Action[PeriodicProcessDeploymentWithFullProcess] } @@ -192,9 +224,13 @@ class SlickPeriodicProcessesRepository( ): Action[SchedulesState] = { PeriodicProcessesWithoutJson .filter(_.processName === scenarioName) + .map(extractPeriodicProcessMetadataColumns) .join(PeriodicProcessDeployments) - .on(_.id === _.periodicProcessId) + .on(_._1 === _.periodicProcessId) .result + .map(_.map { case (periodicProcessMetadataColumnValues, deploymentEntity) => + (createPeriodicProcessMetadataFromColumnValues(periodicProcessMetadataColumnValues), deploymentEntity) + }) .map(toSchedulesState) } @@ -205,8 +241,8 @@ class SlickPeriodicProcessesRepository( ): Action[PeriodicProcess[CanonicalProcess]] = { val processEntity = PeriodicProcessEntity( id = PeriodicProcessId(-1), - processName = deploymentWithJarData.processVersion.processName, - processVersionId = deploymentWithJarData.processVersion.versionId, + processName = deploymentWithJarData.processName, + processVersionId = deploymentWithJarData.versionId, processingType = processingType, processJson = Some(deploymentWithJarData.process), inputConfigDuringExecutionJson = deploymentWithJarData.inputConfigDuringExecutionJson, @@ -222,30 +258,32 @@ class SlickPeriodicProcessesRepository( private def now(): LocalDateTime = LocalDateTime.now(clock) - override def findToBeDeployed: Action[Seq[PeriodicProcessDeployment[CanonicalProcess]]] = + override def findToBeDeployed: Action[Seq[PeriodicProcessDeploymentWithFullProcess]] = activePeriodicProcessWithDeploymentQuery .filter { case (_, d) => d.runAt <= now() && d.status === (PeriodicProcessDeploymentStatus.Scheduled: PeriodicProcessDeploymentStatus) } .result - .map(_.map((PeriodicProcessesRepository.createPeriodicProcessDeployment _).tupled)) + .map(_.map((PeriodicProcessesRepository.createPeriodicProcessDeploymentWithFullProcess _).tupled)) - override def findToBeRetried: Action[Seq[PeriodicProcessDeployment[CanonicalProcess]]] = + override def findToBeRetried: Action[Seq[PeriodicProcessDeploymentWithFullProcess]] = activePeriodicProcessWithDeploymentQuery .filter { case (_, d) => d.nextRetryAt <= now() && d.status === (PeriodicProcessDeploymentStatus.RetryingDeploy: PeriodicProcessDeploymentStatus) } .result - .map(_.map((PeriodicProcessesRepository.createPeriodicProcessDeployment _).tupled)) + .map(_.map((PeriodicProcessesRepository.createPeriodicProcessDeploymentWithFullProcess _).tupled)) - override def findProcessData(id: PeriodicProcessDeploymentId): Action[PeriodicProcessDeployment[CanonicalProcess]] = { + override def findProcessData( + id: PeriodicProcessDeploymentId + ): Action[PeriodicProcessDeploymentWithFullProcess] = { (PeriodicProcessesWithJson join PeriodicProcessDeployments on (_.id === _.periodicProcessId)) .filter { case (_, deployment) => deployment.id === id } .result .head - .map((PeriodicProcessesRepository.createPeriodicProcessDeployment _).tupled) + .map((PeriodicProcessesRepository.createPeriodicProcessDeploymentWithFullProcess _).tupled) } override def findProcessData(processName: ProcessName): Action[Seq[PeriodicProcess[CanonicalProcess]]] = { @@ -347,7 +385,7 @@ class SlickPeriodicProcessesRepository( private def getLatestDeploymentsForEachSchedulePostgres( periodicProcessesQuery: Query[PeriodicProcessesTable, PeriodicProcessEntity, Seq], deploymentsPerScheduleMaxCount: Int - ): Action[Seq[(PeriodicProcessEntity, PeriodicProcessDeploymentEntity)]] = { + ): Action[Seq[(PeriodicProcessMetadata, PeriodicProcessDeploymentEntity)]] = { // To effectively limit deployments to given count 
for each schedule in one query, we use window functions in slick import ExPostgresProfile.api._ import com.github.tminglei.slickpg.window.PgWindowFuncSupport.WindowFunctions._ @@ -369,10 +407,49 @@ class SlickPeriodicProcessesRepository( } .subquery .filter(_._1 <= deploymentsPerScheduleMaxCount.longValue()) - .map { case (_, process, deployment) => - (process, deployment) - } + .map { case (_, process, deployment) => (extractPeriodicProcessMetadataColumns(process), deployment) } .result + .map(_.map { case (periodicProcessMetadataColumnValues, deploymentEntity) => + (createPeriodicProcessMetadataFromColumnValues(periodicProcessMetadataColumnValues), deploymentEntity) + }) + } + + private def extractPeriodicProcessMetadataColumns( + periodicProcessesTable: PeriodicProcessesTable, + ): ( + Rep[PeriodicProcessId], + Rep[ProcessName], + Rep[VersionId], + Rep[String], + Rep[String], + Rep[Boolean], + Rep[Option[ProcessActionId]] + ) = + ( + periodicProcessesTable.id, + periodicProcessesTable.processName, + periodicProcessesTable.processVersionId, + periodicProcessesTable.jarFileName, + periodicProcessesTable.scheduleProperty, + periodicProcessesTable.active, + periodicProcessesTable.processActionId, + ) + + private def createPeriodicProcessMetadataFromColumnValues( + columnValues: (PeriodicProcessId, ProcessName, VersionId, String, String, Boolean, Option[ProcessActionId]) + ): PeriodicProcessMetadata = { + columnValues match { + case (id, processName, versionId, jarFileName, scheduleProperty, active, processActionId) => + PeriodicProcessMetadata( + id, + processName, + versionId, + jarFileName, + prepareScheduleProperty(scheduleProperty), + active, + processActionId + ) + } } // This variant of method is much less optimal than postgres one. It is highly recommended to use postgres with periodics @@ -381,7 +458,7 @@ class SlickPeriodicProcessesRepository( private def getLatestDeploymentsForEachScheduleJdbcGeneric( periodicProcessesQuery: Query[PeriodicProcessesTable, PeriodicProcessEntity, Seq], deploymentsPerScheduleMaxCount: Int - ): Action[Seq[(PeriodicProcessEntity, PeriodicProcessDeploymentEntity)]] = { + ): Action[Seq[(PeriodicProcessMetadata, PeriodicProcessDeploymentEntity)]] = { // It is debug instead of warn to not bloast logs when e.g. 
for some reasons is used hsql under the hood logger.debug( "WARN: Using not optimized version of getLatestDeploymentsForEachSchedule that not uses window functions" @@ -399,6 +476,11 @@ class SlickPeriodicProcessesRepository( .map(_.map((process, _))) }) .map(_.flatten) + .map(_.map { case (entity, scheduleName) => + val process = createPeriodicProcessWithoutJson(entity) + val metadata = createPeriodicProcessMetadata(process) + (metadata, scheduleName) + }) deploymentsForSchedules <- DBIO .sequence(schedulesForProcesses.map { case (process, scheduleName) => @@ -421,7 +503,7 @@ class SlickPeriodicProcessesRepository( scheduleName: ScheduleName, runAt: LocalDateTime, deployMaxRetries: Int - ): Action[PeriodicProcessDeployment[CanonicalProcess]] = { + ): Action[PeriodicProcessDeploymentWithFullProcess] = { val deploymentEntity = PeriodicProcessDeploymentEntity( id = PeriodicProcessDeploymentId(-1), periodicProcessId = id, @@ -452,21 +534,22 @@ class SlickPeriodicProcessesRepository( join PeriodicProcessDeployments on (_.id === _.periodicProcessId)) } - private def toSchedulesState(list: Seq[(PeriodicProcessEntity, PeriodicProcessDeploymentEntity)]): SchedulesState = { + private def toSchedulesState( + list: Seq[(PeriodicProcessMetadata, PeriodicProcessDeploymentEntity)] + ): SchedulesState = { SchedulesState( list - .map { case (process, deployment) => - val scheduleId = ScheduleId(process.id, ScheduleName(deployment.scheduleName)) - val scheduleDataWithoutDeployment = - (scheduleId, PeriodicProcessesRepository.createPeriodicProcessWithoutJson(process)) + .map { case (periodicProcessMetadata, deployment) => + val scheduleId = ScheduleId(periodicProcessMetadata.id, ScheduleName(deployment.scheduleName)) + val scheduleData = (scheduleId, periodicProcessMetadata) val scheduleDeployment = ScheduleDeploymentData(deployment) - (scheduleDataWithoutDeployment, scheduleDeployment) + (scheduleData, scheduleDeployment) } .toList .toGroupedMap .toList - .map { case ((scheduleId, process), deployments) => - scheduleId -> ScheduleData(process, deployments) + .map { case ((scheduleId, periodicProcessMetadata), deployments) => + scheduleId -> ScheduleData(periodicProcessMetadata, deployments) } .toMap ) diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/flink/FlinkJarManager.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/flink/FlinkJarManager.scala index 297d21a1d7c..a9b9f6a6bca 100644 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/flink/FlinkJarManager.scala +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/flink/FlinkJarManager.scala @@ -4,6 +4,7 @@ import com.typesafe.scalalogging.LazyLogging import org.apache.flink.api.common.JobID import pl.touk.nussknacker.engine.{BaseModelData, newdeployment} import pl.touk.nussknacker.engine.api.ProcessVersion +import pl.touk.nussknacker.engine.api.process.{ProcessName, VersionId} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.{DeploymentData, ExternalDeploymentId} import pl.touk.nussknacker.engine.management.periodic.model.DeploymentWithJarData @@ -55,7 +56,8 @@ private[periodic] class FlinkJarManager( logger.info(s"Prepare deployment for scenario: $processVersion") copyJarToLocalDir(processVersion).map { jarFileName => DeploymentWithJarData( - processVersion = processVersion, + 
processName = processVersion.processName, + versionId = processVersion.versionId, process = canonicalProcess, inputConfigDuringExecutionJson = inputConfigDuringExecution.serialized, jarFileName = jarFileName @@ -77,14 +79,14 @@ private[periodic] class FlinkJarManager( deploymentWithJarData: DeploymentWithJarData[CanonicalProcess], deploymentData: DeploymentData ): Future[Option[ExternalDeploymentId]] = { - val processVersion = deploymentWithJarData.processVersion logger.info( - s"Deploying scenario ${processVersion.processName}, version id: ${processVersion.versionId} and jar: ${deploymentWithJarData.jarFileName}" + s"Deploying scenario ${deploymentWithJarData.processName}, version id: ${deploymentWithJarData.versionId} and jar: ${deploymentWithJarData.jarFileName}" ) val jarFile = jarsDir.resolve(deploymentWithJarData.jarFileName).toFile val args = FlinkDeploymentManager.prepareProgramArgs( deploymentWithJarData.inputConfigDuringExecutionJson, - processVersion, + ProcessVersion.empty + .copy(processName = deploymentWithJarData.processName, versionId = deploymentWithJarData.versionId), deploymentData, deploymentWithJarData.process ) diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/model/DeploymentWithJarData.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/model/DeploymentWithJarData.scala index be290b90e9a..edacf2729c9 100644 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/model/DeploymentWithJarData.scala +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/model/DeploymentWithJarData.scala @@ -1,9 +1,10 @@ package pl.touk.nussknacker.engine.management.periodic.model -import pl.touk.nussknacker.engine.api.ProcessVersion +import pl.touk.nussknacker.engine.api.process.{ProcessName, VersionId} case class DeploymentWithJarData[ProcessRep]( - processVersion: ProcessVersion, + processName: ProcessName, + versionId: VersionId, process: ProcessRep, inputConfigDuringExecutionJson: String, jarFileName: String diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/model/PeriodicProcess.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/model/PeriodicProcess.scala index effee642b22..ba4d312cb92 100644 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/model/PeriodicProcess.scala +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/model/PeriodicProcess.scala @@ -16,6 +16,4 @@ case class PeriodicProcess[ProcessRep]( active: Boolean, createdAt: LocalDateTime, processActionId: Option[ProcessActionId] -) { - val processVersion: ProcessVersion = deploymentData.processVersion -} +) diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/model/PeriodicProcessDeployment.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/model/PeriodicProcessDeployment.scala index cca749d2036..ebbe7afcbd7 100644 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/model/PeriodicProcessDeployment.scala +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/model/PeriodicProcessDeployment.scala @@ -1,15 +1,22 @@ package 
pl.touk.nussknacker.engine.management.periodic.model -import pl.touk.nussknacker.engine.management.periodic.{MultipleScheduleProperty, SingleScheduleProperty} +import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.management.periodic.model.PeriodicProcessDeploymentStatus.PeriodicProcessDeploymentStatus +import pl.touk.nussknacker.engine.management.periodic.{MultipleScheduleProperty, SingleScheduleProperty} import slick.lifted.MappedTo import java.time.{Clock, LocalDateTime} +case class PeriodicProcessDeploymentWithFullProcess( + deployment: PeriodicProcessDeployment, + process: CanonicalProcess, + inputConfigDuringExecutionJson: String, +) + // TODO: We should separate schedules concept from deployments - fully switch to ScheduleData and ScheduleDeploymentData -case class PeriodicProcessDeployment[ProcessRep]( +case class PeriodicProcessDeployment( id: PeriodicProcessDeploymentId, - periodicProcess: PeriodicProcess[ProcessRep], + periodicProcessMetadata: PeriodicProcessMetadata, createdAt: LocalDateTime, runAt: LocalDateTime, scheduleName: ScheduleName, @@ -19,7 +26,7 @@ case class PeriodicProcessDeployment[ProcessRep]( ) { def nextRunAt(clock: Clock): Either[String, Option[LocalDateTime]] = - (periodicProcess.scheduleProperty, scheduleName.value) match { + (periodicProcessMetadata.scheduleProperty, scheduleName.value) match { case (MultipleScheduleProperty(schedules), Some(name)) => schedules.get(name).toRight(s"Failed to find schedule: $scheduleName").flatMap(_.nextRunAt(clock)) case (e: SingleScheduleProperty, None) => e.nextRunAt(clock) @@ -27,7 +34,7 @@ case class PeriodicProcessDeployment[ProcessRep]( } def display: String = - s"${periodicProcess.processVersion} with scheduleName=${scheduleName.display} and deploymentId=$id" + s"${periodicProcessMetadata.processName} with scheduleName=${scheduleName.display} and deploymentId=$id" } diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/model/SchedulesState.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/model/SchedulesState.scala index ff8aeaac666..f5fb1ce63cc 100644 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/model/SchedulesState.scala +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/model/SchedulesState.scala @@ -1,6 +1,8 @@ package pl.touk.nussknacker.engine.management.periodic.model -import pl.touk.nussknacker.engine.api.process.ProcessName +import pl.touk.nussknacker.engine.api.deployment.ProcessActionId +import pl.touk.nussknacker.engine.api.process.{ProcessName, VersionId} +import pl.touk.nussknacker.engine.management.periodic.ScheduleProperty import pl.touk.nussknacker.engine.management.periodic.db.{PeriodicProcessDeploymentEntity, PeriodicProcessesRepository} import pl.touk.nussknacker.engine.util.Implicits.RichScalaMap @@ -19,14 +21,15 @@ case class SchedulesState(schedules: Map[ScheduleId, ScheduleData]) { def isEmpty: Boolean = schedules.isEmpty def groupByProcessName: Map[ProcessName, SchedulesState] = - schedules.groupBy(_._2.process.processVersion.processName).mapValuesNow(SchedulesState) + schedules.groupBy(_._2.periodicProcessMetadata.processName).mapValuesNow(SchedulesState) lazy val groupedByPeriodicProcess: List[PeriodicProcessScheduleData] = - schedules.toList.groupBy(_._2.process).toList.map { case (periodicProcess, groupedSchedules) => - val 
deploymentsForSchedules = groupedSchedules.flatMap { case (scheduleId, scheduleData) => - scheduleData.latestDeployments.map(_.toFullDeploymentData(periodicProcess, scheduleId.scheduleName)) - } - PeriodicProcessScheduleData(periodicProcess, deploymentsForSchedules) + schedules.toList.groupBy(_._2.periodicProcessMetadata).toList.map { + case (periodicProcessMetadata, groupedSchedules) => + val deploymentsForSchedules = groupedSchedules.flatMap { case (scheduleId, scheduleData) => + scheduleData.latestDeployments.map(_.toFullDeploymentData(periodicProcessMetadata, scheduleId.scheduleName)) + } + PeriodicProcessScheduleData(periodicProcessMetadata, deploymentsForSchedules) } } @@ -35,7 +38,20 @@ case class SchedulesState(schedules: Map[ScheduleId, ScheduleData]) { // For most operations it will contain only one latest deployment but for purpose of statuses of historical deployments // it has list instead of one element. // This structure should contain SingleScheduleProperty as well. See note above -case class ScheduleData(process: PeriodicProcess[Unit], latestDeployments: List[ScheduleDeploymentData]) +case class ScheduleData( + periodicProcessMetadata: PeriodicProcessMetadata, + latestDeployments: List[ScheduleDeploymentData] +) + +case class PeriodicProcessMetadata( + id: PeriodicProcessId, + processName: ProcessName, + versionId: VersionId, + jarFileName: String, + scheduleProperty: ScheduleProperty, + active: Boolean, + processActionId: Option[ProcessActionId], +) // To identify schedule we need scheduleName - None for SingleScheduleProperty and Some(key) for MultipleScheduleProperty keys // Also we need PeriodicProcessId to distinguish between active schedules and some inactive from the past for the same PeriodicProcessId @@ -53,10 +69,19 @@ case class ScheduleDeploymentData( ) { def toFullDeploymentData( - process: PeriodicProcess[Unit], + periodicProcessMetadata: PeriodicProcessMetadata, scheduleName: ScheduleName - ): PeriodicProcessDeployment[Unit] = - PeriodicProcessDeployment(id, process, createdAt, runAt, scheduleName, retriesLeft, nextRetryAt, state) + ): PeriodicProcessDeployment = + PeriodicProcessDeployment( + id, + periodicProcessMetadata, + createdAt, + runAt, + scheduleName, + retriesLeft, + nextRetryAt, + state + ) def display = s"deploymentId=$id" @@ -80,14 +105,14 @@ object ScheduleDeploymentData { // These below are temporary structures, see notice next to SchedulesState case class PeriodicProcessScheduleData( - process: PeriodicProcess[Unit], - deployments: List[PeriodicProcessDeployment[Unit]] + periodicProcessMetadata: PeriodicProcessMetadata, + deployments: List[PeriodicProcessDeployment] ) { - def existsDeployment(predicate: PeriodicProcessDeployment[Unit] => Boolean): Boolean = deployments.exists(predicate) + def existsDeployment(predicate: PeriodicProcessDeployment => Boolean): Boolean = deployments.exists(predicate) def display: String = { val deploymentsForSchedules = deployments.map(_.display) - s"processName=${process.processVersion.processName}, deploymentsForSchedules=$deploymentsForSchedules" + s"processName=${periodicProcessMetadata.processName}, deploymentsForSchedules=$deploymentsForSchedules" } } diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/service/AdditionalDeploymentDataProvider.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/service/AdditionalDeploymentDataProvider.scala index 1871cf05518..6883383c4f9 100644 --- 
a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/service/AdditionalDeploymentDataProvider.scala +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/service/AdditionalDeploymentDataProvider.scala @@ -7,13 +7,13 @@ import java.time.format.DateTimeFormatter trait AdditionalDeploymentDataProvider { - def prepareAdditionalData(runDetails: PeriodicProcessDeployment[CanonicalProcess]): Map[String, String] + def prepareAdditionalData(runDetails: PeriodicProcessDeployment): Map[String, String] } object DefaultAdditionalDeploymentDataProvider extends AdditionalDeploymentDataProvider { - override def prepareAdditionalData(runDetails: PeriodicProcessDeployment[CanonicalProcess]): Map[String, String] = { + override def prepareAdditionalData(runDetails: PeriodicProcessDeployment): Map[String, String] = { Map( "deploymentId" -> runDetails.id.value.toString, "runAt" -> runDetails.runAt.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME), diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/service/PeriodicProcessListener.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/service/PeriodicProcessListener.scala index 73a092e9c57..ae0405905fe 100644 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/service/PeriodicProcessListener.scala +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/service/PeriodicProcessListener.scala @@ -21,29 +21,28 @@ trait PeriodicProcessListenerFactory { } sealed trait PeriodicProcessEvent { - val deployment: PeriodicProcessDeployment[CanonicalProcess] + val deployment: PeriodicProcessDeployment } case class DeployedEvent( - deployment: PeriodicProcessDeployment[CanonicalProcess], + deployment: PeriodicProcessDeployment, externalDeploymentId: Option[ExternalDeploymentId] ) extends PeriodicProcessEvent -case class FinishedEvent(deployment: PeriodicProcessDeployment[CanonicalProcess], processState: Option[StatusDetails]) +case class FinishedEvent(deployment: PeriodicProcessDeployment, processState: Option[StatusDetails]) extends PeriodicProcessEvent case class FailedOnDeployEvent( - deployment: PeriodicProcessDeployment[CanonicalProcess], + deployment: PeriodicProcessDeployment, processState: Option[StatusDetails] ) extends PeriodicProcessEvent case class FailedOnRunEvent( - deployment: PeriodicProcessDeployment[CanonicalProcess], + deployment: PeriodicProcessDeployment, processState: Option[StatusDetails] ) extends PeriodicProcessEvent -case class ScheduledEvent(deployment: PeriodicProcessDeployment[CanonicalProcess], firstSchedule: Boolean) - extends PeriodicProcessEvent +case class ScheduledEvent(deployment: PeriodicProcessDeployment, firstSchedule: Boolean) extends PeriodicProcessEvent object EmptyListener extends EmptyListener diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/service/ProcessConfigEnricher.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/service/ProcessConfigEnricher.scala index 1cdf3177953..ab269f6fd2b 100644 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/service/ProcessConfigEnricher.scala +++ 
b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/service/ProcessConfigEnricher.scala @@ -46,7 +46,7 @@ object ProcessConfigEnricher { case class DeployData( canonicalProcess: CanonicalProcess, inputConfigDuringExecutionJson: String, - deployment: PeriodicProcessDeployment[CanonicalProcess] + deployment: PeriodicProcessDeployment ) extends ProcessConfigEnricherInputData case class EnrichedProcessConfig(inputConfigDuringExecutionJson: String) diff --git a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/DeploymentActorTest.scala b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/DeploymentActorTest.scala index a32745a2b53..7036cb8236b 100644 --- a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/DeploymentActorTest.scala +++ b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/DeploymentActorTest.scala @@ -8,7 +8,10 @@ import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.management.periodic.DeploymentActor.CheckToBeDeployed -import pl.touk.nussknacker.engine.management.periodic.model.PeriodicProcessDeployment +import pl.touk.nussknacker.engine.management.periodic.model.{ + PeriodicProcessDeployment, + PeriodicProcessDeploymentWithFullProcess +} import scala.concurrent.Future import scala.concurrent.duration._ @@ -33,11 +36,11 @@ class DeploymentActorTest extends AnyFunSuite with TestKitBase with Matchers wit } private def shouldFindToBeDeployedScenarios( - result: Future[Seq[PeriodicProcessDeployment[CanonicalProcess]]] + result: Future[Seq[PeriodicProcessDeploymentWithFullProcess]] ): Unit = { val probe = TestProbe() var counter = 0 - def findToBeDeployed: Future[Seq[PeriodicProcessDeployment[CanonicalProcess]]] = { + def findToBeDeployed: Future[Seq[PeriodicProcessDeploymentWithFullProcess]] = { counter += 1 probe.ref ! s"invoked $counter" result @@ -54,14 +57,14 @@ class DeploymentActorTest extends AnyFunSuite with TestKitBase with Matchers wit } test("should deploy found scenario") { - val probe = TestProbe() - val waitingDeployment = PeriodicProcessDeploymentGen() - var toBeDeployed: Seq[PeriodicProcessDeployment[CanonicalProcess]] = Seq(waitingDeployment) - var actor: ActorRef = null - def findToBeDeployed: Future[Seq[PeriodicProcessDeployment[CanonicalProcess]]] = { + val probe = TestProbe() + val waitingDeployment = PeriodicProcessDeploymentGen() + var toBeDeployed: Seq[PeriodicProcessDeploymentWithFullProcess] = Seq(waitingDeployment) + var actor: ActorRef = null + def findToBeDeployed: Future[Seq[PeriodicProcessDeploymentWithFullProcess]] = { Future.successful(toBeDeployed) } - def deploy(deployment: PeriodicProcessDeployment[CanonicalProcess]): Future[Unit] = { + def deploy(deployment: PeriodicProcessDeploymentWithFullProcess): Future[Unit] = { probe.ref ! deployment // Simulate periodic check for waiting scenarios while deploying a scenario. actor ! 
CheckToBeDeployed diff --git a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/JarManagerStub.scala b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/JarManagerStub.scala index 63d37875d1c..3a189b5e28f 100644 --- a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/JarManagerStub.scala +++ b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/JarManagerStub.scala @@ -18,7 +18,8 @@ class JarManagerStub extends JarManager { ): Future[DeploymentWithJarData[CanonicalProcess]] = { Future.successful( model.DeploymentWithJarData( - processVersion = processVersion, + processName = processVersion.processName, + versionId = processVersion.versionId, process = canonicalProcess, inputConfigDuringExecutionJson = "", jarFileName = "" diff --git a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessDeploymentGen.scala b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessDeploymentGen.scala index 31b847143f6..3a90adf12a4 100644 --- a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessDeploymentGen.scala +++ b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessDeploymentGen.scala @@ -1,13 +1,7 @@ package pl.touk.nussknacker.engine.management.periodic -import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess -import pl.touk.nussknacker.engine.management.periodic.model.{ - PeriodicProcessDeployment, - PeriodicProcessDeploymentId, - PeriodicProcessDeploymentState, - PeriodicProcessDeploymentStatus, - ScheduleName -} +import pl.touk.nussknacker.engine.management.periodic.db.PeriodicProcessesRepository.createPeriodicProcessMetadata +import pl.touk.nussknacker.engine.management.periodic.model._ import java.time.LocalDateTime @@ -15,21 +9,27 @@ object PeriodicProcessDeploymentGen { val now: LocalDateTime = LocalDateTime.now() - def apply(): PeriodicProcessDeployment[CanonicalProcess] = { - PeriodicProcessDeployment( - id = PeriodicProcessDeploymentId(42), - periodicProcess = PeriodicProcessGen(), - createdAt = now.minusMinutes(10), - runAt = now, - scheduleName = ScheduleName(None), - retriesLeft = 0, - nextRetryAt = None, - state = PeriodicProcessDeploymentState( - deployedAt = None, - completedAt = None, - status = PeriodicProcessDeploymentStatus.Scheduled, - ) + def apply(): PeriodicProcessDeploymentWithFullProcess = { + val periodicProcess = PeriodicProcessGen() + PeriodicProcessDeploymentWithFullProcess( + PeriodicProcessDeployment( + id = PeriodicProcessDeploymentId(42), + periodicProcessMetadata = createPeriodicProcessMetadata(periodicProcess), + createdAt = now.minusMinutes(10), + runAt = now, + scheduleName = ScheduleName(None), + retriesLeft = 0, + nextRetryAt = None, + state = PeriodicProcessDeploymentState( + deployedAt = None, + completedAt = None, + status = PeriodicProcessDeploymentStatus.Scheduled, + ) + ), + periodicProcess.deploymentData.process, + periodicProcess.deploymentData.inputConfigDuringExecutionJson, ) + } } diff --git a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessGen.scala b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessGen.scala index 
1ddf34889a1..8cec5d4b5c1 100644 --- a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessGen.scala +++ b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessGen.scala @@ -14,7 +14,8 @@ object PeriodicProcessGen { PeriodicProcess( id = PeriodicProcessId(42), deploymentData = DeploymentWithJarData( - processVersion = ProcessVersion.empty, + processName = ProcessVersion.empty.processName, + versionId = ProcessVersion.empty.versionId, process = buildCanonicalProcess(), inputConfigDuringExecutionJson = "{}", jarFileName = "jar-file-name.jar" diff --git a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceIntegrationTest.scala b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceIntegrationTest.scala index 0fd79cd3c06..d2e6e7ac314 100644 --- a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceIntegrationTest.scala +++ b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceIntegrationTest.scala @@ -201,7 +201,7 @@ class PeriodicProcessServiceIntegrationTest stateAfterSchedule should have size 1 val afterSchedule = stateAfterSchedule.firstScheduleData - afterSchedule.process.processVersion.processName shouldBe processName + afterSchedule.periodicProcessMetadata.processName shouldBe processName afterSchedule.latestDeployments.head.state shouldBe PeriodicProcessDeploymentState( None, None, @@ -214,9 +214,9 @@ class PeriodicProcessServiceIntegrationTest val allToDeploy = service.findToBeDeployed.futureValue allToDeploy.map( - _.periodicProcess.processVersion.processName + _.deployment.periodicProcessMetadata.processName ) should contain only (processName, every30MinutesProcessName) - val toDeploy = allToDeploy.find(_.periodicProcess.processVersion.processName == processName).value + val toDeploy = allToDeploy.find(_.deployment.periodicProcessMetadata.processName == processName).value service.deploy(toDeploy).futureValue otherProcessingTypeService.deploy(otherProcessingTypeService.findToBeDeployed.futureValue.loneElement).futureValue @@ -237,7 +237,9 @@ class PeriodicProcessServiceIntegrationTest service.handleFinished.futureValue val toDeployAfterFinish = service.findToBeDeployed.futureValue - toDeployAfterFinish.map(_.periodicProcess.processVersion.processName) should contain only every30MinutesProcessName + toDeployAfterFinish.map( + _.deployment.periodicProcessMetadata.processName + ) should contain only every30MinutesProcessName service.deactivate(processName).futureValue service.getLatestDeploymentsForActiveSchedules(processName).futureValue shouldBe empty val inactiveStates = service @@ -294,9 +296,9 @@ class PeriodicProcessServiceIntegrationTest service.deploy(toDeploy).futureValue val toBeRetried :: Nil = service.findToBeDeployed.futureValue.toList - toBeRetried.state.status shouldBe PeriodicProcessDeploymentStatus.RetryingDeploy - toBeRetried.retriesLeft shouldBe 1 - toBeRetried.nextRetryAt.isDefined shouldBe true + toBeRetried.deployment.state.status shouldBe PeriodicProcessDeploymentStatus.RetryingDeploy + toBeRetried.deployment.retriesLeft shouldBe 1 + toBeRetried.deployment.nextRetryAt.isDefined shouldBe true service.deploy(toBeRetried).futureValue service.findToBeDeployed.futureValue.toList shouldBe Nil @@ -372,12 
+374,12 @@ class PeriodicProcessServiceIntegrationTest val allToDeploy = service.findToBeDeployed.futureValue allToDeploy should have length 4 - val toDeploy = allToDeploy.filter(_.periodicProcess.processVersion.processName == processName) + val toDeploy = allToDeploy.filter(_.deployment.periodicProcessMetadata.processName == processName) toDeploy should have length 2 - toDeploy.head.runAt shouldBe localTime(expectedScheduleTime.plus(5, ChronoUnit.MINUTES)) - toDeploy.head.scheduleName.value shouldBe Some(scheduleMinute5) - toDeploy.last.runAt shouldBe localTime(expectedScheduleTime.plus(10, ChronoUnit.MINUTES)) - toDeploy.last.scheduleName.value shouldBe Some(scheduleMinute10) + toDeploy.head.deployment.runAt shouldBe localTime(expectedScheduleTime.plus(5, ChronoUnit.MINUTES)) + toDeploy.head.deployment.scheduleName.value shouldBe Some(scheduleMinute5) + toDeploy.last.deployment.runAt shouldBe localTime(expectedScheduleTime.plus(10, ChronoUnit.MINUTES)) + toDeploy.last.deployment.scheduleName.value shouldBe Some(scheduleMinute10) service.deactivate(processName).futureValue service.getLatestDeploymentsForActiveSchedules(processName).futureValue shouldBe empty @@ -414,19 +416,19 @@ class PeriodicProcessServiceIntegrationTest val toDeploy = service.findToBeDeployed.futureValue toDeploy should have length 2 - val deployment = toDeploy.find(_.scheduleName.value.contains(firstSchedule)).value + val deployment = toDeploy.find(_.deployment.scheduleName.value.contains(firstSchedule)).value service.deploy(deployment) - f.delegateDeploymentManagerStub.setStateStatus(SimpleStateStatus.Running, Some(deployment.id)) + f.delegateDeploymentManagerStub.setStateStatus(SimpleStateStatus.Running, Some(deployment.deployment.id)) val toDeployAfterDeploy = service.findToBeDeployed.futureValue toDeployAfterDeploy should have length 0 - f.delegateDeploymentManagerStub.setStateStatus(SimpleStateStatus.Finished, Some(deployment.id)) + f.delegateDeploymentManagerStub.setStateStatus(SimpleStateStatus.Finished, Some(deployment.deployment.id)) service.handleFinished.futureValue val toDeployAfterFinish = service.findToBeDeployed.futureValue toDeployAfterFinish should have length 1 - toDeployAfterFinish.head.scheduleName.value.value shouldBe secondSchedule + toDeployAfterFinish.head.deployment.scheduleName.value.value shouldBe secondSchedule val activities = service.getScenarioActivitiesSpecificToPeriodicProcess(processIdWithName).futureValue val firstActivity = activities.head.asInstanceOf[ScenarioActivity.PerformedScheduledExecution] @@ -487,7 +489,7 @@ class PeriodicProcessServiceIntegrationTest currentTime = timeToTriggerSchedule1 val toDeployOnSchedule1 = service.findToBeDeployed.futureValue.loneElement - toDeployOnSchedule1.scheduleName.value.value shouldBe schedule1 + toDeployOnSchedule1.deployment.scheduleName.value.value shouldBe schedule1 service.deploy(toDeployOnSchedule1).futureValue val stateAfterSchedule1Deploy = service.getLatestDeploymentsForActiveSchedules(processName).futureValue @@ -520,7 +522,7 @@ class PeriodicProcessServiceIntegrationTest currentTime = timeToTriggerSchedule2 val toDeployOnSchedule2 = service.findToBeDeployed.futureValue.loneElement - toDeployOnSchedule2.scheduleName.value.value shouldBe schedule2 + toDeployOnSchedule2.deployment.scheduleName.value.value shouldBe schedule2 service.deploy(toDeployOnSchedule2).futureValue val stateAfterSchedule2Deploy = service.getLatestDeploymentsForActiveSchedules(processName).futureValue @@ -607,7 +609,7 @@ class 
     toDeploy should have length 1
     val deployment = toDeploy.head
     service.deploy(deployment).futureValue
-    f.delegateDeploymentManagerStub.setStateStatus(SimpleStateStatus.Finished, Some(deployment.id))
+    f.delegateDeploymentManagerStub.setStateStatus(SimpleStateStatus.Finished, Some(deployment.deployment.id))

     tryWithFailedListener { () =>
       service.deactivate(processName)
@@ -637,7 +639,7 @@ class PeriodicProcessServiceIntegrationTest

     val deployment = toDeploy.head
     service.deploy(deployment).futureValue
-    f.delegateDeploymentManagerStub.setStateStatus(ProblemStateStatus.Failed, Some(deployment.id))
+    f.delegateDeploymentManagerStub.setStateStatus(ProblemStateStatus.Failed, Some(deployment.deployment.id))

     // this one is cyclically called by RescheduleActor
     service.handleFinished.futureValue
diff --git a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceTest.scala b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceTest.scala
index 7da7f65421a..e713f02b3d9 100644
--- a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceTest.scala
+++ b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceTest.scala
@@ -15,7 +15,10 @@ import pl.touk.nussknacker.engine.api.process.ProcessName
 import pl.touk.nussknacker.engine.build.ScenarioBuilder
 import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
 import pl.touk.nussknacker.engine.management.periodic.PeriodicProcessService.PeriodicProcessStatus
-import pl.touk.nussknacker.engine.management.periodic.db.PeriodicProcessesRepository.createPeriodicProcessDeployment
+import pl.touk.nussknacker.engine.management.periodic.db.PeriodicProcessesRepository.{
+  createPeriodicProcessDeployment,
+  createPeriodicProcessDeploymentWithFullProcess
+}
 import pl.touk.nussknacker.engine.management.periodic.model.PeriodicProcessDeploymentStatus.PeriodicProcessDeploymentStatus
 import pl.touk.nussknacker.engine.management.periodic.model.{PeriodicProcessDeployment, PeriodicProcessDeploymentStatus}
 import pl.touk.nussknacker.engine.management.periodic.service.ProcessConfigEnricher.EnrichedProcessConfig
@@ -85,7 +88,7 @@ class PeriodicProcessServiceTest
     additionalDeploymentDataProvider = new AdditionalDeploymentDataProvider {

       override def prepareAdditionalData(
-          runDetails: PeriodicProcessDeployment[CanonicalProcess]
+          runDetails: PeriodicProcessDeployment
       ): Map[String, String] =
         additionalData + ("runId" -> runDetails.id.value.toString)

@@ -138,7 +141,7 @@ class PeriodicProcessServiceTest
       PeriodicProcessDeploymentStatus.Scheduled,
       deployMaxRetries = 0
     )
-    fWithNoRetries.periodicProcessService.findToBeDeployed.futureValue.map(_.id) shouldBe List(scheduledId1)
+    fWithNoRetries.periodicProcessService.findToBeDeployed.futureValue.map(_.deployment.id) shouldBe List(scheduledId1)

     val fWithRetries = new Fixture
     val failedId2 = fWithRetries.repository.addActiveProcess(
@@ -151,7 +154,10 @@ class PeriodicProcessServiceTest
       PeriodicProcessDeploymentStatus.Scheduled,
       deployMaxRetries = 1
     )
-    fWithRetries.periodicProcessService.findToBeDeployed.futureValue.map(_.id) shouldBe List(scheduledId2, failedId2)
+    fWithRetries.periodicProcessService.findToBeDeployed.futureValue.map(_.deployment.id) shouldBe List(
+      scheduledId2,
+      failedId2
+    )
   }

   test("findToBeDeployed - should not return scenarios with different processing type") {
@@ -360,7 +366,7 @@ class PeriodicProcessServiceTest
   test("deploy - should deploy and mark as so") {
     val f = new Fixture
     f.repository.addActiveProcess(processName, PeriodicProcessDeploymentStatus.Scheduled)
-    val toSchedule = createPeriodicProcessDeployment(
+    val toSchedule = createPeriodicProcessDeploymentWithFullProcess(
       f.repository.processEntities.loneElement,
       f.repository.deploymentEntities.loneElement
     )
@@ -382,7 +388,7 @@ class PeriodicProcessServiceTest
     val f = new Fixture
     f.repository.addActiveProcess(processName, PeriodicProcessDeploymentStatus.Scheduled)
     f.jarManagerStub.deployWithJarFuture = Future.failed(new RuntimeException("Flink deploy error"))
-    val toSchedule = createPeriodicProcessDeployment(
+    val toSchedule = createPeriodicProcessDeploymentWithFullProcess(
       f.repository.processEntities.loneElement,
       f.repository.deploymentEntities.loneElement
     )
diff --git a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/db/InMemPeriodicProcessesRepository.scala b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/db/InMemPeriodicProcessesRepository.scala
index d16786058ea..ad453417f4c 100644
--- a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/db/InMemPeriodicProcessesRepository.scala
+++ b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/db/InMemPeriodicProcessesRepository.scala
@@ -11,7 +11,7 @@ import pl.touk.nussknacker.engine.management.periodic.db.InMemPeriodicProcessesR
   DeploymentIdSequence,
   ProcessIdSequence
 }
-import pl.touk.nussknacker.engine.management.periodic.db.PeriodicProcessesRepository.createPeriodicProcessDeployment
+import pl.touk.nussknacker.engine.management.periodic.db.PeriodicProcessesRepository.createPeriodicProcessDeploymentWithFullProcess
 import pl.touk.nussknacker.engine.management.periodic.model.PeriodicProcessDeploymentStatus.PeriodicProcessDeploymentStatus
 import pl.touk.nussknacker.engine.management.periodic.model._

@@ -139,8 +139,8 @@ class InMemPeriodicProcessesRepository(processingType: String) extends PeriodicP
     val id = PeriodicProcessId(Random.nextLong())
     val periodicProcess = PeriodicProcessEntity(
       id = id,
-      processName = deploymentWithJarData.processVersion.processName,
-      processVersionId = deploymentWithJarData.processVersion.versionId,
+      processName = deploymentWithJarData.processName,
+      processVersionId = deploymentWithJarData.versionId,
       processingType = processingType,
       processJson = Some(deploymentWithJarData.process),
       inputConfigDuringExecutionJson = deploymentWithJarData.inputConfigDuringExecutionJson,
@@ -199,25 +199,29 @@ class InMemPeriodicProcessesRepository(processingType: String) extends PeriodicP
           .take(deploymentsPerScheduleMaxCount)
           .map(ScheduleDeploymentData(_))
           .toList
-        scheduleId -> ScheduleData(PeriodicProcessesRepository.createPeriodicProcessWithoutJson(process), ds)
+        scheduleId -> ScheduleData(
+          PeriodicProcessesRepository
+            .createPeriodicProcessMetadata(PeriodicProcessesRepository.createPeriodicProcessWithoutJson(process)),
+          ds
+        )
       }
     } yield deploymentGroupedByScheduleName).toMap)

-  override def findToBeDeployed: Seq[PeriodicProcessDeployment[CanonicalProcess]] = {
+  override def findToBeDeployed: Seq[PeriodicProcessDeploymentWithFullProcess] = {
     val scheduled = findActive(PeriodicProcessDeploymentStatus.Scheduled)
     readyToRun(scheduled)
   }

-  override def findToBeRetried: Action[Seq[PeriodicProcessDeployment[CanonicalProcess]]] = {
-    val toBeRetried = findActive(PeriodicProcessDeploymentStatus.FailedOnDeploy).filter(_.retriesLeft > 0)
+  override def findToBeRetried: Action[Seq[PeriodicProcessDeploymentWithFullProcess]] = {
+    val toBeRetried = findActive(PeriodicProcessDeploymentStatus.FailedOnDeploy).filter(_.deployment.retriesLeft > 0)
     readyToRun(toBeRetried)
   }

-  override def findProcessData(id: PeriodicProcessDeploymentId): PeriodicProcessDeployment[CanonicalProcess] =
+  override def findProcessData(id: PeriodicProcessDeploymentId): PeriodicProcessDeploymentWithFullProcess =
     (for {
       d <- deploymentEntities if d.id == id
       p <- processEntities if p.id == d.periodicProcessId
-    } yield createPeriodicProcessDeployment(p, d)).head
+    } yield createPeriodicProcessDeploymentWithFullProcess(p, d)).head

   override def findProcessData(processName: ProcessName): Seq[PeriodicProcess[CanonicalProcess]] =
     processEntities(processName)
@@ -267,7 +271,7 @@ class InMemPeriodicProcessesRepository(processingType: String) extends PeriodicP
       scheduleName: ScheduleName,
       runAt: LocalDateTime,
       deployMaxRetries: Int
-  ): PeriodicProcessDeployment[CanonicalProcess] = {
+  ): PeriodicProcessDeploymentWithFullProcess = {
     val deploymentEntity = PeriodicProcessDeploymentEntity(
       id = PeriodicProcessDeploymentId(Random.nextLong()),
       periodicProcessId = id,
@@ -281,7 +285,7 @@ class InMemPeriodicProcessesRepository(processingType: String) extends PeriodicP
       status = PeriodicProcessDeploymentStatus.Scheduled
     )
     deploymentEntities += deploymentEntity
-    createPeriodicProcessDeployment(processEntities.find(_.id == id).head, deploymentEntity)
+    createPeriodicProcessDeploymentWithFullProcess(processEntities.find(_.id == id).head, deploymentEntity)
   }

   private def update(
@@ -294,24 +298,24 @@ class InMemPeriodicProcessesRepository(processingType: String) extends PeriodicP
     }
   }

-  private def findActive(status: PeriodicProcessDeploymentStatus): Seq[PeriodicProcessDeployment[CanonicalProcess]] =
+  private def findActive(status: PeriodicProcessDeploymentStatus): Seq[PeriodicProcessDeploymentWithFullProcess] =
     findActive(
       Seq(status)
     )

   private def findActive(
       statusList: Seq[PeriodicProcessDeploymentStatus]
-  ): Seq[PeriodicProcessDeployment[CanonicalProcess]] =
+  ): Seq[PeriodicProcessDeploymentWithFullProcess] =
     (for {
       p <- processEntities if p.active && p.processingType == processingType
       d <- deploymentEntities if d.periodicProcessId == p.id && statusList.contains(d.status)
-    } yield createPeriodicProcessDeployment(p, d)).toSeq
+    } yield createPeriodicProcessDeploymentWithFullProcess(p, d)).toSeq

   private def readyToRun(
-      deployments: Seq[PeriodicProcessDeployment[CanonicalProcess]]
-  ): Seq[PeriodicProcessDeployment[CanonicalProcess]] = {
+      deployments: Seq[PeriodicProcessDeploymentWithFullProcess]
+  ): Seq[PeriodicProcessDeploymentWithFullProcess] = {
     val now = LocalDateTime.now()
-    deployments.filter(d => d.runAt.isBefore(now) || d.runAt.isEqual(now))
+    deployments.filter(d => d.deployment.runAt.isBefore(now) || d.deployment.runAt.isEqual(now))
   }

 }