Skip to content

Commit

Permalink
DB optimization draft
Browse files Browse the repository at this point in the history
  • Loading branch information
mgoworko committed Dec 6, 2024
1 parent 14a9a75 commit 53c85fa
Show file tree
Hide file tree
Showing 19 changed files with 360 additions and 196 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ import pl.touk.nussknacker.engine.management.periodic.DeploymentActor.{
DeploymentCompleted,
WaitingForDeployment
}
import pl.touk.nussknacker.engine.management.periodic.model.PeriodicProcessDeployment
import pl.touk.nussknacker.engine.management.periodic.model.{
PeriodicProcessDeployment,
PeriodicProcessDeploymentWithFullProcess
}

import scala.concurrent.Future
import scala.concurrent.duration._
Expand All @@ -21,23 +24,23 @@ object DeploymentActor {
}

private[periodic] def props(
findToBeDeployed: => Future[Seq[PeriodicProcessDeployment[CanonicalProcess]]],
deploy: PeriodicProcessDeployment[CanonicalProcess] => Future[Unit],
findToBeDeployed: => Future[Seq[PeriodicProcessDeploymentWithFullProcess]],
deploy: PeriodicProcessDeploymentWithFullProcess => Future[Unit],
interval: FiniteDuration
) = {
Props(new DeploymentActor(findToBeDeployed, deploy, interval))
}

private[periodic] case object CheckToBeDeployed

private case class WaitingForDeployment(ids: List[PeriodicProcessDeployment[CanonicalProcess]])
private case class WaitingForDeployment(ids: List[PeriodicProcessDeploymentWithFullProcess])

private case object DeploymentCompleted
}

class DeploymentActor(
findToBeDeployed: => Future[Seq[PeriodicProcessDeployment[CanonicalProcess]]],
deploy: PeriodicProcessDeployment[CanonicalProcess] => Future[Unit],
findToBeDeployed: => Future[Seq[PeriodicProcessDeploymentWithFullProcess]],
deploy: PeriodicProcessDeploymentWithFullProcess => Future[Unit],
interval: FiniteDuration
) extends Actor
with Timers
Expand All @@ -55,25 +58,25 @@ class DeploymentActor(
logger.trace("Checking scenarios to be deployed")
findToBeDeployed.onComplete {
case Success(runDetailsSeq) =>
logger.debug(s"Found ${runDetailsSeq.size} to be deployed: ${runDetailsSeq.map(_.display)}")
logger.debug(s"Found ${runDetailsSeq.size} to be deployed: ${runDetailsSeq.map(_.deployment.display)}")
self ! WaitingForDeployment(runDetailsSeq.toList)
case Failure(exception) =>
logger.error("Finding scenarios to be deployed failed unexpectedly", exception)
}
case WaitingForDeployment(Nil) =>
case WaitingForDeployment(runDetails :: _) =>
logger.info(s"Found a scenario to be deployed: ${runDetails.display}")
context.become(receiveOngoingDeployment(runDetails))
logger.info(s"Found a scenario to be deployed: ${runDetails.deployment.display}")
context.become(receiveOngoingDeployment(runDetails.deployment))
deploy(runDetails) onComplete {
case Success(_) =>
self ! DeploymentCompleted
case Failure(exception) =>
logger.error(s"Deployment of ${runDetails.display} failed unexpectedly", exception)
logger.error(s"Deployment of ${runDetails.deployment.display} failed unexpectedly", exception)
self ! DeploymentCompleted
}
}

private def receiveOngoingDeployment(runDetails: PeriodicProcessDeployment[CanonicalProcess]): Receive = {
private def receiveOngoingDeployment(runDetails: PeriodicProcessDeployment): Receive = {
case CheckToBeDeployed =>
logger.debug(s"Still waiting for ${runDetails.display} to be deployed")
case DeploymentCompleted =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pl.touk.nussknacker.engine.management.periodic

import pl.touk.nussknacker.engine.api.ProcessVersion
import pl.touk.nussknacker.engine.api.process.{ProcessName, VersionId}
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.deployment.{DeploymentData, ExternalDeploymentId}
import pl.touk.nussknacker.engine.management.periodic.model.DeploymentWithJarData
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class PeriodicProcessService(
scenarioActivityId = ScenarioActivityId.random,
user = ScenarioUser.internalNuUser,
date = metadata.dateDeployed.getOrElse(metadata.dateFinished),
scenarioVersionId = Some(ScenarioVersionId.from(deployment.periodicProcess.processVersion.versionId)),
scenarioVersionId = Some(ScenarioVersionId.from(deployment.periodicProcessMetadata.versionId)),
scheduledExecutionStatus = metadata.status,
dateFinished = metadata.dateFinished,
scheduleName = deployment.scheduleName.display,
Expand Down Expand Up @@ -163,7 +163,7 @@ class PeriodicProcessService(
scheduledProcessesRepository
.schedule(process.id, name, date, deploymentRetryConfig.deployMaxRetries)
.flatMap { data =>
handleEvent(ScheduledEvent(data, firstSchedule = true))
handleEvent(ScheduledEvent(data.deployment, firstSchedule = true))
}
case (name, None) =>
logger.warn(s"Schedule $name does not have date to schedule")
Expand All @@ -174,30 +174,32 @@ class PeriodicProcessService(
.map(_ => ())
}

def findToBeDeployed: Future[Seq[PeriodicProcessDeployment[CanonicalProcess]]] = {
def findToBeDeployed: Future[Seq[PeriodicProcessDeploymentWithFullProcess]] = {
for {
toBeDeployed <- scheduledProcessesRepository.findToBeDeployed.run.flatMap { toDeployList =>
Future.sequence(toDeployList.map(checkIfNotRunning)).map(_.flatten)
}
// We retry scenarios that failed on deployment. Failure recovery of running scenarios should be handled by Flink's restart strategy
toBeRetried <- scheduledProcessesRepository.findToBeRetried.run
// We don't block scheduled deployments by retries
} yield toBeDeployed.sortBy(d => (d.runAt, d.createdAt)) ++ toBeRetried.sortBy(d => (d.nextRetryAt, d.createdAt))
} yield toBeDeployed.sortBy(d => (d.deployment.runAt, d.deployment.createdAt)) ++ toBeRetried.sortBy(d =>
(d.deployment.nextRetryAt, d.deployment.createdAt)
)
}

// Currently we don't allow simultaneous runs of one scenario - only sequential, so if another schedule kicks in, it'll have to wait
// TODO: we should allow scenarios with different scheduleName to be deployed simultaneously
private def checkIfNotRunning(
toDeploy: PeriodicProcessDeployment[CanonicalProcess]
): Future[Option[PeriodicProcessDeployment[CanonicalProcess]]] = {
toDeploy: PeriodicProcessDeploymentWithFullProcess
): Future[Option[PeriodicProcessDeploymentWithFullProcess]] = {
delegateDeploymentManager
.getProcessStates(toDeploy.periodicProcess.processVersion.processName)(DataFreshnessPolicy.Fresh)
.getProcessStates(toDeploy.deployment.periodicProcessMetadata.processName)(DataFreshnessPolicy.Fresh)
.map(
_.value
.map(_.status)
.find(SimpleStateStatus.DefaultFollowingDeployStatuses.contains)
.map { _ =>
logger.debug(s"Deferring run of ${toDeploy.display} as scenario is currently running")
logger.debug(s"Deferring run of ${toDeploy.deployment.display} as scenario is currently running")
None
}
.getOrElse(Some(toDeploy))
Expand Down Expand Up @@ -298,9 +300,14 @@ class PeriodicProcessService(
case Right(Some(futureDate)) =>
logger.info(s"Rescheduling ${deployment.display} to $futureDate")
val action = scheduledProcessesRepository
.schedule(process.id, deployment.scheduleName, futureDate, deploymentRetryConfig.deployMaxRetries)
.schedule(
periodicProcessMetadata.id,
deployment.scheduleName,
futureDate,
deploymentRetryConfig.deployMaxRetries
)
.flatMap { data =>
handleEvent(ScheduledEvent(data, firstSchedule = false))
handleEvent(ScheduledEvent(data.deployment, firstSchedule = false))
}
Some(action)
case Right(None) =>
Expand All @@ -318,9 +325,9 @@ class PeriodicProcessService(
}

if (scheduleActions.forall(_.isEmpty)) {
logger.info(s"No scheduled deployments for periodic process: ${process.id.value}. Deactivating")
deactivateAction(process).flatMap { _ =>
markProcessActionExecutionFinished(processScheduleData.process.processActionId)
logger.info(s"No scheduled deployments for periodic process: ${periodicProcessMetadata.id.value}. Deactivating")
deactivateAction(periodicProcessMetadata).flatMap { _ =>
markProcessActionExecutionFinished(processScheduleData.periodicProcessMetadata.processActionId)
}

} else
Expand All @@ -332,11 +339,11 @@ class PeriodicProcessService(
for {
_ <- scheduledProcessesRepository.markFinished(deployment.id)
currentState <- scheduledProcessesRepository.findProcessData(deployment.id)
} yield handleEvent(FinishedEvent(currentState, state))
} yield handleEvent(FinishedEvent(currentState.deployment, state))
}

private def handleFailedDeployment(
deployment: PeriodicProcessDeployment[_],
deployment: PeriodicProcessDeployment,
state: Option[StatusDetails]
): RepositoryAction[Unit] = {
def calculateNextRetryAt = now().plus(deploymentRetryConfig.deployRetryPenalize.toMillis, ChronoUnit.MILLIS)
Expand All @@ -359,7 +366,7 @@ class PeriodicProcessService(
for {
_ <- scheduledProcessesRepository.markFailedOnDeployWithStatus(deployment.id, status, retriesLeft, nextRetryAt)
currentState <- scheduledProcessesRepository.findProcessData(deployment.id)
} yield handleEvent(FailedOnDeployEvent(currentState, state))
} yield handleEvent(FailedOnDeployEvent(currentState.deployment, state))
}

private def markFailedAction(
Expand All @@ -370,23 +377,26 @@ class PeriodicProcessService(
for {
_ <- scheduledProcessesRepository.markFailed(deployment.id)
currentState <- scheduledProcessesRepository.findProcessData(deployment.id)
} yield handleEvent(FailedOnRunEvent(currentState, state))
} yield handleEvent(FailedOnRunEvent(currentState.deployment, state))
}

def deactivate(processName: ProcessName): Future[Iterable[DeploymentId]] =
for {
activeSchedules <- getLatestDeploymentsForActiveSchedules(processName)
(runningDeploymentsForSchedules, _) <- synchronizeDeploymentsStates(processName, activeSchedules)
_ <- activeSchedules.groupedByPeriodicProcess.map(p => deactivateAction(p.process)).sequence.runWithCallbacks
_ <- activeSchedules.groupedByPeriodicProcess
.map(p => deactivateAction(p.periodicProcessMetadata))
.sequence
.runWithCallbacks
} yield runningDeploymentsForSchedules.map(deployment => DeploymentId(deployment.toString))

private def deactivateAction(process: PeriodicProcess[_]): RepositoryAction[Callback] = {
private def deactivateAction(process: PeriodicProcessMetadata): RepositoryAction[Callback] = {
logger.info(s"Deactivate periodic process id: ${process.id.value}")
for {
_ <- scheduledProcessesRepository.markInactive(process.id)
// we want to delete jars only after we successfully mark the process as inactive. It's better to leave jar garbage than
// to have a process without a jar
} yield () => jarManager.deleteJar(process.deploymentData.jarFileName)
} yield () => jarManager.deleteJar(process.jarFileName)
}

private def markProcessActionExecutionFinished(
Expand All @@ -399,8 +409,17 @@ class PeriodicProcessService(
.map(_ => ())
}

def deploy(deployment: PeriodicProcessDeployment[CanonicalProcess]): Future[Unit] = {
def deploy(deploymentWithCanonicalProcess: PeriodicProcessDeploymentWithFullProcess): Future[Unit] = {
// TODO: set status before deployment?
val deployment = deploymentWithCanonicalProcess.deployment
val periodicProcessMetadata = deployment.periodicProcessMetadata
val deploymentWithJarData = DeploymentWithJarData(
processName = periodicProcessMetadata.processName,
versionId = periodicProcessMetadata.versionId,
process = deploymentWithCanonicalProcess.process,
inputConfigDuringExecutionJson = deploymentWithCanonicalProcess.inputConfigDuringExecutionJson,
jarFileName = periodicProcessMetadata.jarFileName
)
val id = deployment.id
val deploymentData = DeploymentData(
DeploymentId(id.toString),
Expand All @@ -412,10 +431,14 @@ class PeriodicProcessService(
AdditionalComponentConfigsForRuntimeExtractor.getRequiredAdditionalConfigsForRuntime(configsFromProvider)
)
)
val deploymentWithJarData = deployment.periodicProcess.deploymentData
val deploymentAction = for {
_ <- Future.successful(
logger.info("Deploying scenario {} for deployment id {}", deploymentWithJarData.processVersion, id)
logger.info(
"Deploying scenario name={} versionId={} for deployment id {}",
periodicProcessMetadata.processName,
periodicProcessMetadata.versionId,
id
)
)
enrichedProcessConfig <- processConfigEnricher.onDeploy(
ProcessConfigEnricher.DeployData(
Expand All @@ -431,12 +454,17 @@ class PeriodicProcessService(
} yield externalDeploymentId
deploymentAction
.flatMap { externalDeploymentId =>
logger.info("Scenario has been deployed {} for deployment id {}", deploymentWithJarData.processVersion, id)
logger.info(
"Scenario has been deployed name={} versionId={} for deployment id {}",
periodicProcessMetadata.processName,
periodicProcessMetadata.versionId,
id
)
// TODO: add externalDeploymentId??
scheduledProcessesRepository
.markDeployed(id)
.flatMap(_ => scheduledProcessesRepository.findProcessData(id))
.flatMap(afterChange => handleEvent(DeployedEvent(afterChange, externalDeploymentId)))
.flatMap(afterChange => handleEvent(DeployedEvent(afterChange.deployment, externalDeploymentId)))
.run
}
// We can recover since deployment actor watches only future completion.
Expand Down Expand Up @@ -483,7 +511,7 @@ class PeriodicProcessService(
deployment.createdAt,
deployment.runAt,
deployment.state.status,
scheduleData.process.active,
scheduleData.periodicProcessMetadata.active,
runtimeStatuses.getStatus(deployment.id)
)
}
Expand Down Expand Up @@ -531,7 +559,7 @@ class PeriodicProcessService(
}

private def scheduledExecutionStatusAndDateFinished(
entity: PeriodicProcessDeployment[Unit],
entity: PeriodicProcessDeployment,
): Option[FinishedScheduledExecutionMetadata] = {
for {
status <- entity.state.status match {
Expand Down
Loading

0 comments on commit 53c85fa

Please sign in to comment.