Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PeriodicDM as decorator for any DM with data stored in main Nu DB #7364

Open
wants to merge 13 commits into
base: staging
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 27 additions & 12 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -644,21 +644,17 @@ lazy val flinkPeriodicDeploymentManager = (project in flink("management/periodic
name := "nussknacker-flink-periodic-manager",
libraryDependencies ++= {
Seq(
"org.typelevel" %% "cats-core" % catsV % Provided,
"com.typesafe.slick" %% "slick" % slickV % Provided,
"com.typesafe.slick" %% "slick-hikaricp" % slickV % "provided, test",
"com.github.tminglei" %% "slick-pg" % slickPgV,
"org.hsqldb" % "hsqldb" % hsqldbV % Test,
"org.flywaydb" % "flyway-core" % flywayV % Provided,
"com.cronutils" % "cron-utils" % cronParserV,
"com.typesafe.akka" %% "akka-actor" % akkaV,
"com.typesafe.akka" %% "akka-testkit" % akkaV % Test,
"com.dimafeng" %% "testcontainers-scala-scalatest" % testContainersScalaV % Test,
"com.dimafeng" %% "testcontainers-scala-postgresql" % testContainersScalaV % Test,
"org.flywaydb" % "flyway-core" % flywayV,
"com.typesafe.slick" %% "slick-hikaricp" % slickV % "provided, test",
"org.hsqldb" % "hsqldb" % hsqldbV % Test,
"com.typesafe.akka" %% "akka-testkit" % akkaV % Test,
"com.dimafeng" %% "testcontainers-scala-scalatest" % testContainersScalaV % Test,
"com.dimafeng" %% "testcontainers-scala-postgresql" % testContainersScalaV % Test,
)
}
)
.dependsOn(
commonPeriodicDeploymentManager,
flinkDeploymentManager,
deploymentManagerApi % Provided,
scenarioCompiler % Provided,
Expand Down Expand Up @@ -1788,6 +1784,25 @@ lazy val commonComponentsTests = (project in engine("common/components-tests"))
flinkComponentsTestkit % Test
)

lazy val commonPeriodicDeploymentManager = (project in engine("common/periodic-deployment-manager"))
.settings(commonSettings)
.settings(publishAssemblySettings: _*)
.settings(
name := "nussknacker-common-periodic-deployment-manager",
libraryDependencies ++= {
Seq(
"com.typesafe.akka" %% "akka-actor" % akkaV,
"org.typelevel" %% "cats-core" % catsV,
"com.cronutils" % "cron-utils" % cronParserV,
"com.softwaremill.retry" %% "retry" % retryV,
"com.github.tminglei" %% "slick-pg" % slickPgV,
)
}
)
.dependsOn(
deploymentManagerApi % Provided,
)

lazy val flinkBaseComponents = (project in flink("components/base"))
.settings(commonSettings)
.settings(assemblyNoScala("flinkBase.jar"): _*)
Expand Down Expand Up @@ -2080,7 +2095,7 @@ lazy val designer = (project in file("designer/server"))
liteEmbeddedDeploymentManager % Provided,
liteK8sDeploymentManager % Provided,
developmentTestsDeploymentManager % Provided,
flinkPeriodicDeploymentManager % Provided,
flinkPeriodicDeploymentManager % "provided, test->test",
schemedKafkaComponentsUtils % Provided,
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package pl.touk.nussknacker.engine

import akka.actor.ActorSystem
import pl.touk.nussknacker.engine.api.deployment.{
ProcessingTypeActionService,
ProcessingTypeDeployedScenariosProvider,
ScenarioActivityManager
}
import pl.touk.nussknacker.engine.api.component.{ComponentAdditionalConfig, DesignerWideComponentId}
import pl.touk.nussknacker.engine.api.deployment.periodic.PeriodicProcessesManagerProvider
import pl.touk.nussknacker.engine.api.deployment._
import sttp.client3.SttpBackend

import scala.concurrent.{ExecutionContext, Future}
Expand All @@ -15,6 +12,7 @@ case class DeploymentManagerDependencies(
deployedScenariosProvider: ProcessingTypeDeployedScenariosProvider,
actionService: ProcessingTypeActionService,
scenarioActivityManager: ScenarioActivityManager,
periodicProcessesManagerProvider: PeriodicProcessesManagerProvider,
executionContext: ExecutionContext,
actorSystem: ActorSystem,
sttpBackend: SttpBackend[Future, Any],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package pl.touk.nussknacker.engine.api.deployment.periodic

import pl.touk.nussknacker.engine.api.ProcessVersion
import pl.touk.nussknacker.engine.api.deployment.ProcessActionId
import pl.touk.nussknacker.engine.api.deployment.periodic.PeriodicProcessesManager.ScheduleProperty
import pl.touk.nussknacker.engine.api.deployment.periodic.model.PeriodicProcessDeploymentStatus.PeriodicProcessDeploymentStatus
import pl.touk.nussknacker.engine.api.deployment.periodic.model._
import pl.touk.nussknacker.engine.api.process.{ProcessName, VersionId}
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess

import java.time.LocalDateTime
import scala.concurrent.Future

trait PeriodicProcessesManager {

def create(
deploymentWithRuntimeParams: DeploymentWithRuntimeParams,
inputConfigDuringExecutionJson: String,
canonicalProcess: CanonicalProcess,
scheduleProperty: ScheduleProperty,
processActionId: ProcessActionId,
): Future[PeriodicProcess]

def markInactive(processId: PeriodicProcessId): Future[Unit]

def schedule(
id: PeriodicProcessId,
scheduleName: ScheduleName,
runAt: LocalDateTime,
deployMaxRetries: Int,
): Future[PeriodicProcessDeployment]

def findProcessData(
id: PeriodicProcessDeploymentId
): Future[PeriodicProcessDeployment]

def findToBeDeployed: Future[Seq[PeriodicProcessDeployment]]

def findToBeRetried: Future[Seq[PeriodicProcessDeployment]]

def markDeployed(id: PeriodicProcessDeploymentId): Future[Unit]

def markFinished(id: PeriodicProcessDeploymentId): Future[Unit]

def markFailed(id: PeriodicProcessDeploymentId): Future[Unit]

def markFailedOnDeployWithStatus(
id: PeriodicProcessDeploymentId,
status: PeriodicProcessDeploymentStatus,
deployRetries: Int,
retryAt: Option[LocalDateTime]
): Future[Unit]

def getSchedulesState(
scenarioName: ProcessName,
after: Option[LocalDateTime],
): Future[SchedulesState]

def getLatestDeploymentsForActiveSchedules(
processName: ProcessName,
deploymentsPerScheduleMaxCount: Int,
): Future[SchedulesState]

def getLatestDeploymentsForActiveSchedules(
deploymentsPerScheduleMaxCount: Int,
): Future[Map[ProcessName, SchedulesState]]

def getLatestDeploymentsForLatestInactiveSchedules(
processName: ProcessName,
inactiveProcessesMaxCount: Int,
deploymentsPerScheduleMaxCount: Int,
): Future[SchedulesState]

def getLatestDeploymentsForLatestInactiveSchedules(
inactiveProcessesMaxCount: Int,
deploymentsPerScheduleMaxCount: Int,
): Future[Map[ProcessName, SchedulesState]]

def findActiveSchedulesForProcessesHavingDeploymentWithMatchingStatus(
expectedDeploymentStatuses: Set[PeriodicProcessDeploymentStatus],
): Future[SchedulesState]

def fetchCanonicalProcessWithVersion(
processName: ProcessName,
versionId: VersionId
): Future[Option[(CanonicalProcess, ProcessVersion)]]

def fetchInputConfigDuringExecutionJson(
processName: ProcessName,
versionId: VersionId,
): Future[Option[String]]

}

object PeriodicProcessesManager {

sealed trait ScheduleProperty

sealed trait SingleScheduleProperty extends ScheduleProperty

case class MultipleScheduleProperty(schedules: Map[String, SingleScheduleProperty]) extends ScheduleProperty

case class CronScheduleProperty(labelOrCronExpr: String) extends SingleScheduleProperty

}

object NoOpPeriodicProcessesManager extends PeriodicProcessesManager {

override def create(
deploymentWithRuntimeParams: DeploymentWithRuntimeParams,
inputConfigDuringExecutionJson: String,
canonicalProcess: CanonicalProcess,
scheduleProperty: ScheduleProperty,
processActionId: ProcessActionId,
): Future[PeriodicProcess] = notImplemented

override def markInactive(processId: PeriodicProcessId): Future[Unit] = notImplemented

override def schedule(
id: PeriodicProcessId,
scheduleName: ScheduleName,
runAt: LocalDateTime,
deployMaxRetries: Int
): Future[PeriodicProcessDeployment] = notImplemented

override def findProcessData(
id: PeriodicProcessDeploymentId,
): Future[PeriodicProcessDeployment] = notImplemented

override def findToBeDeployed: Future[Seq[PeriodicProcessDeployment]] =
notImplemented

override def findToBeRetried: Future[Seq[PeriodicProcessDeployment]] =
notImplemented

override def markDeployed(id: PeriodicProcessDeploymentId): Future[Unit] = notImplemented

override def markFinished(id: PeriodicProcessDeploymentId): Future[Unit] = notImplemented

override def markFailed(id: PeriodicProcessDeploymentId): Future[Unit] = notImplemented

override def markFailedOnDeployWithStatus(
id: PeriodicProcessDeploymentId,
status: PeriodicProcessDeploymentStatus,
deployRetries: Int,
retryAt: Option[LocalDateTime]
): Future[Unit] = notImplemented

override def getSchedulesState(scenarioName: ProcessName, after: Option[LocalDateTime]): Future[SchedulesState] =
notImplemented

override def getLatestDeploymentsForActiveSchedules(
processName: ProcessName,
deploymentsPerScheduleMaxCount: Int,
): Future[SchedulesState] = notImplemented

override def getLatestDeploymentsForActiveSchedules(
deploymentsPerScheduleMaxCount: Int
): Future[Map[ProcessName, SchedulesState]] = notImplemented

override def getLatestDeploymentsForLatestInactiveSchedules(
processName: ProcessName,
inactiveProcessesMaxCount: Int,
deploymentsPerScheduleMaxCount: Int,
): Future[SchedulesState] = notImplemented

override def getLatestDeploymentsForLatestInactiveSchedules(
inactiveProcessesMaxCount: Int,
deploymentsPerScheduleMaxCount: Int
): Future[Map[ProcessName, SchedulesState]] = notImplemented

override def findActiveSchedulesForProcessesHavingDeploymentWithMatchingStatus(
expectedDeploymentStatuses: Set[PeriodicProcessDeploymentStatus],
): Future[SchedulesState] = notImplemented

override def fetchCanonicalProcessWithVersion(
processName: ProcessName,
versionId: VersionId
): Future[Option[(CanonicalProcess, ProcessVersion)]] = notImplemented

override def fetchInputConfigDuringExecutionJson(
processName: ProcessName,
versionId: VersionId
): Future[Option[String]] = notImplemented

private def notImplemented: Future[Nothing] =
Future.failed(new NotImplementedError())

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package pl.touk.nussknacker.engine.api.deployment.periodic

trait PeriodicProcessesManagerProvider {

def provide(
processingType: String,
): PeriodicProcessesManager

}

object NoOpPeriodicProcessesManagerProvider extends PeriodicProcessesManagerProvider {

override def provide(
processingType: String
): PeriodicProcessesManager = NoOpPeriodicProcessesManager

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package pl.touk.nussknacker.engine.api.deployment.periodic.model

import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessName, VersionId}

final case class DeploymentWithRuntimeParams(
processId: Option[ProcessId],
processName: ProcessName,
versionId: VersionId,
runtimeParams: RuntimeParams,
)

final case class RuntimeParams(params: Map[String, String])
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package pl.touk.nussknacker.engine.api.deployment.periodic.model

import pl.touk.nussknacker.engine.api.deployment.ProcessActionId
import pl.touk.nussknacker.engine.api.deployment.periodic.PeriodicProcessesManager.ScheduleProperty

import java.time.LocalDateTime

case class PeriodicProcessId(value: Long)

case class PeriodicProcess(
id: PeriodicProcessId,
deploymentData: DeploymentWithRuntimeParams,
scheduleProperty: ScheduleProperty,
active: Boolean,
createdAt: LocalDateTime,
processActionId: Option[ProcessActionId]
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package pl.touk.nussknacker.engine.api.deployment.periodic.model

import pl.touk.nussknacker.engine.api.deployment.periodic.model.PeriodicProcessDeploymentStatus.PeriodicProcessDeploymentStatus

import java.time.LocalDateTime

// TODO: We should separate schedules concept from deployments - fully switch to ScheduleData and ScheduleDeploymentData
case class PeriodicProcessDeployment(
id: PeriodicProcessDeploymentId,
periodicProcess: PeriodicProcess,
createdAt: LocalDateTime,
runAt: LocalDateTime,
scheduleName: ScheduleName,
retriesLeft: Int,
nextRetryAt: Option[LocalDateTime],
state: PeriodicProcessDeploymentState
) {

def display: String =
s"Process with id=${periodicProcess.deploymentData.processId}, name=${periodicProcess.deploymentData.processName}, versionId=${periodicProcess.deploymentData.versionId}, scheduleName=${scheduleName.display} and deploymentId=$id"

}

case class PeriodicProcessDeploymentState(
deployedAt: Option[LocalDateTime],
completedAt: Option[LocalDateTime],
status: PeriodicProcessDeploymentStatus
)

case class PeriodicProcessDeploymentId(value: Long) {
override def toString: String = value.toString
}

object PeriodicProcessDeploymentStatus extends Enumeration {
type PeriodicProcessDeploymentStatus = Value

val Scheduled, Deployed, Finished, Failed, RetryingDeploy, FailedOnDeploy = Value
}

case class ScheduleName(value: Option[String]) {
def display: String = value.getOrElse("[default]")
}
Loading
Loading