Skip to content

Commit

Permalink
review changes part 1
Browse files Browse the repository at this point in the history
  • Loading branch information
mgoworko committed Dec 19, 2024
1 parent 907eb00 commit 59d7ac8
Show file tree
Hide file tree
Showing 19 changed files with 204 additions and 185 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import pl.touk.nussknacker.engine.api.deployment.inconsistency.InconsistentState
import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, VersionId}
import pl.touk.nussknacker.engine.deployment.CustomActionDefinition
import pl.touk.nussknacker.engine.newdeployment
import pl.touk.nussknacker.engine.util.WithDataFreshnessStatusUtils.WithDataFreshnessStatusOps

import java.time.Instant
import scala.concurrent.ExecutionContext.Implicits._
Expand Down Expand Up @@ -46,6 +47,8 @@ trait DeploymentManager extends AutoCloseable {

def deploymentSynchronisationSupport: DeploymentSynchronisationSupport

def stateQueryForAllScenariosSupport: StateQueryForAllScenariosSupport

def processCommand[Result](command: DMScenarioCommand[Result]): Future[Result]

final def getProcessState(
Expand All @@ -59,35 +62,17 @@ trait DeploymentManager extends AutoCloseable {
): Future[WithDataFreshnessStatus[ProcessState]] = {
for {
statusDetailsWithFreshness <- getProcessStates(idWithName.name)
stateWithFreshness <- resolvePrefetchedProcessState(
stateWithFreshness <- resolve(
idWithName,
statusDetailsWithFreshness.value,
lastStateAction,
latestVersionId,
deployedVersionId,
currentlyPresentedVersionId,
statusDetailsWithFreshness,
)
).map(statusDetailsWithFreshness.withValue)
} yield stateWithFreshness
}

final def resolvePrefetchedProcessState(
idWithName: ProcessIdWithName,
lastStateAction: Option[ProcessAction],
latestVersionId: VersionId,
deployedVersionId: Option[VersionId],
currentlyPresentedVersionId: Option[VersionId],
statusDetailsWithFreshness: WithDataFreshnessStatus[List[StatusDetails]],
): Future[WithDataFreshnessStatus[ProcessState]] = {
resolve(
idWithName,
statusDetailsWithFreshness.value,
lastStateAction,
latestVersionId,
deployedVersionId,
currentlyPresentedVersionId,
).map(state => statusDetailsWithFreshness.map(_ => state))
}

/**
* We provide a special wrapper called WithDataFreshnessStatus to ensure that fetched data is restored
* from the cache or not. If you use any kind of cache in your DM implementation please wrap result data
Expand Down Expand Up @@ -127,14 +112,18 @@ trait ManagerSpecificScenarioActivitiesStoredByManager { self: DeploymentManager

}

trait StateQueryForAllScenariosSupported { self: DeploymentManager =>
sealed trait StateQueryForAllScenariosSupport

trait StateQueryForAllScenariosSupported extends StateQueryForAllScenariosSupport {

def getProcessesStates()(
def getAllProcessesStates()(
implicit freshnessPolicy: DataFreshnessPolicy
): Future[WithDataFreshnessStatus[Map[ProcessName, List[StatusDetails]]]]

}

case object NoStateQueryForAllScenariosSupport extends StateQueryForAllScenariosSupport

sealed trait DeploymentSynchronisationSupport

trait DeploymentSynchronisationSupported extends DeploymentSynchronisationSupport {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import scala.concurrent.duration._
class CachingProcessStateDeploymentManager(
delegate: DeploymentManager,
cacheTTL: FiniteDuration,
override val deploymentSynchronisationSupport: DeploymentSynchronisationSupport
override val deploymentSynchronisationSupport: DeploymentSynchronisationSupport,
override val stateQueryForAllScenariosSupport: StateQueryForAllScenariosSupport,
) extends DeploymentManager {

private val cache: AsyncCache[ProcessName, List[StatusDetails]] = Caffeine
Expand Down Expand Up @@ -81,7 +82,12 @@ object CachingProcessStateDeploymentManager extends LazyLogging {
scenarioStateCacheTTL
.map { cacheTTL =>
logger.debug(s"Wrapping DeploymentManager: $delegate with caching mechanism with TTL: $cacheTTL")
new CachingProcessStateDeploymentManager(delegate, cacheTTL, delegate.deploymentSynchronisationSupport)
new CachingProcessStateDeploymentManager(
delegate,
cacheTTL,
delegate.deploymentSynchronisationSupport,
delegate.stateQueryForAllScenariosSupport
)
}
.getOrElse {
logger.debug(s"Skipping ProcessState caching for DeploymentManager: $delegate")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ class DeploymentManagerStub extends BaseDeploymentManager with StubbingCommands

override def deploymentSynchronisationSupport: DeploymentSynchronisationSupport = NoDeploymentSynchronisationSupport

override def stateQueryForAllScenariosSupport: StateQueryForAllScenariosSupport = NoStateQueryForAllScenariosSupport

override def close(): Unit = {}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package pl.touk.nussknacker.engine.util

import pl.touk.nussknacker.engine.api.deployment.WithDataFreshnessStatus

object WithDataFreshnessStatusUtils {

implicit class WithDataFreshnessStatusMapOps[K, V](withDataFreshnessStatus: WithDataFreshnessStatus[Map[K, V]]) {

def get(k: K): Option[WithDataFreshnessStatus[V]] = withDataFreshnessStatus.map(_.get(k)) match {
case WithDataFreshnessStatus(Some(value), cached) => Some(WithDataFreshnessStatus(value, cached))
case WithDataFreshnessStatus(None, _) => None
}

def getOrElse(k: K, orElse: V): WithDataFreshnessStatus[V] = {
withDataFreshnessStatus.map(_.get(k)) match {
case WithDataFreshnessStatus(Some(value), cached) => WithDataFreshnessStatus(value, cached)
case WithDataFreshnessStatus(None, cached) => WithDataFreshnessStatus(orElse, cached)
}
}

}

implicit class WithDataFreshnessStatusOps[A, B](scenarioActivity: WithDataFreshnessStatus[A]) {

def withValue(v: B): WithDataFreshnessStatus[B] = {
scenarioActivity.map(_ => v)
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import org.scalatest.OptionValues
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import org.scalatestplus.mockito.MockitoSugar
import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus
import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus
import pl.touk.nussknacker.engine.api.process.ProcessName
import pl.touk.nussknacker.engine.deployment.ExternalDeploymentId
import pl.touk.nussknacker.test.PatientScalaFutures
Expand All @@ -26,8 +26,12 @@ class CachingProcessStateDeploymentManagerSpec

test("should ask delegate for a fresh state each time") {
val delegate = prepareDMReturningRandomStates
val cachingManager =
new CachingProcessStateDeploymentManager(delegate, 10 seconds, NoDeploymentSynchronisationSupport)
val cachingManager = new CachingProcessStateDeploymentManager(
delegate,
10 seconds,
NoDeploymentSynchronisationSupport,
NoStateQueryForAllScenariosSupport
)

val results = List(
cachingManager.getProcessStatesDeploymentIdNow(DataFreshnessPolicy.Fresh),
Expand All @@ -41,8 +45,12 @@ class CachingProcessStateDeploymentManagerSpec

test("should cache state for DataFreshnessPolicy.CanBeCached") {
val delegate = prepareDMReturningRandomStates
val cachingManager =
new CachingProcessStateDeploymentManager(delegate, 10 seconds, NoDeploymentSynchronisationSupport)
val cachingManager = new CachingProcessStateDeploymentManager(
delegate,
10 seconds,
NoDeploymentSynchronisationSupport,
NoStateQueryForAllScenariosSupport
)

val firstInvocation = cachingManager.getProcessStatesDeploymentIdNow(DataFreshnessPolicy.CanBeCached)
firstInvocation.cached shouldBe false
Expand All @@ -55,8 +63,12 @@ class CachingProcessStateDeploymentManagerSpec

test("should reuse state updated by DataFreshnessPolicy.Fresh during reading with DataFreshnessPolicy.CanBeCached") {
val delegate = prepareDMReturningRandomStates
val cachingManager =
new CachingProcessStateDeploymentManager(delegate, 10 seconds, NoDeploymentSynchronisationSupport)
val cachingManager = new CachingProcessStateDeploymentManager(
delegate,
10 seconds,
NoDeploymentSynchronisationSupport,
NoStateQueryForAllScenariosSupport
)

val resultForFresh = cachingManager.getProcessStatesDeploymentIdNow(DataFreshnessPolicy.Fresh)
resultForFresh.cached shouldBe false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ import pl.touk.nussknacker.engine.api.process._
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.deployment._
import pl.touk.nussknacker.engine.util.AdditionalComponentConfigsForRuntimeExtractor
import pl.touk.nussknacker.engine.util.WithDataFreshnessStatusUtils.{
WithDataFreshnessStatusMapOps,
WithDataFreshnessStatusOps
}
import pl.touk.nussknacker.restmodel.scenariodetails.ScenarioWithDetails
import pl.touk.nussknacker.ui.api.{DeploymentCommentSettings, ListenerApiUser}
import pl.touk.nussknacker.ui.listener.ProcessChangeEvent.{OnActionExecutionFinished, OnActionFailed, OnActionSuccess}
Expand All @@ -34,7 +38,6 @@ import pl.touk.nussknacker.ui.process.repository.ProcessDBQueryRepository.Proces
import pl.touk.nussknacker.ui.process.repository._
import pl.touk.nussknacker.ui.security.api.{AdminUser, LoggedUser, NussknackerInternalUser}
import pl.touk.nussknacker.ui.util.FutureUtils._
import pl.touk.nussknacker.ui.util.WithDataFreshnessStatusUtils.WithDataFreshnessStatusOps
import pl.touk.nussknacker.ui.validation.{CustomActionValidator, UIProcessValidator}
import pl.touk.nussknacker.ui.{BadRequestError, NotFoundError}
import slick.dbio.{DBIO, DBIOAction}
Expand Down Expand Up @@ -459,9 +462,7 @@ class DeploymentService(
dbioRunner.run(
for {
actionsInProgress <- getInProgressActionTypesForScenarios(scenarios)
// DeploymentManager's may support fetching state of all scenarios at once (StateQueryForAllScenariosSupported capability)
// For those DM's we prefetch the state and then use it for resolving individual scenarios.
prefetchedStates <- DBIO.from(getPrefetchedStatesForSupportedManagers(scenarios))
prefetchedStates <- DBIO.from(getPrefetchedStatesForSupportedManagers(scenarios))
processesWithState <- processTraverse
.map {
case process if process.isFragment => DBIO.successful(process)
Expand All @@ -480,34 +481,44 @@ class DeploymentService(
)
}

// DeploymentManager's may support fetching state of all scenarios at once
// State is prefetched only when:
// - DM has capability StateQueryForAllScenariosSupported
// - the query is about more than one scenario handled by that DM
private def getPrefetchedStatesForSupportedManagers(
scenarios: List[ScenarioWithDetails],
)(
implicit user: LoggedUser,
freshnessPolicy: DataFreshnessPolicy
): Future[Map[ProcessingType, WithDataFreshnessStatus[Map[ProcessName, List[StatusDetails]]]]] = {
val allProcessingTypes = scenarios.map(_.processingType).toSet
val deploymentManagersByProcessingTypes =
allProcessingTypes.flatMap(pt => dispatcher.deploymentManager(pt).map(dm => (pt, dm)))
val prefetchedStatesByProcessingTypes = Future
val numberOfScenariosByProcessingType =
allProcessingTypes
.map(processingType => (processingType, scenarios.count(_.processingType == processingType)))
.toMap
val processingTypesWithMoreThanOneScenario = numberOfScenariosByProcessingType.filter(_._2 > 1).keys

Future
.sequence {
deploymentManagersByProcessingTypes.map { case (processingType, manager) =>
manager match {
case dm: StateQueryForAllScenariosSupported => getProcessesStates(processingType, dm)
case _ => Future.successful(None)
}
processingTypesWithMoreThanOneScenario.map { processingType =>
(for {
manager <- dispatcher.deploymentManager(processingType)
managerWithCapability <- manager.stateQueryForAllScenariosSupport match {
case supported: StateQueryForAllScenariosSupported => Some(supported)
case NoStateQueryForAllScenariosSupport => None
}
} yield getProcessesStates(processingType, managerWithCapability))
.getOrElse(Future.successful(None))
}
}
.map(_.flatten.toMap)

prefetchedStatesByProcessingTypes
}

private def getProcessesStates(processingType: ProcessingType, manager: StateQueryForAllScenariosSupported)(
implicit freshnessPolicy: DataFreshnessPolicy,
): Future[Option[(ProcessingType, WithDataFreshnessStatus[Map[ProcessName, List[StatusDetails]]])]] = {
manager
.getProcessesStates()
.getAllProcessesStates()
.map(states => Some((processingType, states)))
.recover { case NonFatal(e) =>
logger.warn(
Expand Down Expand Up @@ -682,14 +693,16 @@ class DeploymentService(

val state = (prefetchedStateOpt match {
case Some(prefetchedState) =>
deploymentManager.resolvePrefetchedProcessState(
processIdWithName,
lastStateAction,
latestVersionId,
deployedVersionId,
currentlyPresentedVersionId,
prefetchedState,
)
deploymentManager
.resolve(
processIdWithName,
prefetchedState.value,
lastStateAction,
latestVersionId,
deployedVersionId,
currentlyPresentedVersionId,
)
.map(prefetchedState.withValue)
case None =>
deploymentManager.getProcessState(
processIdWithName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,7 @@ object InvalidDeploymentManagerStub extends DeploymentManager {

override def deploymentSynchronisationSupport: DeploymentSynchronisationSupport = NoDeploymentSynchronisationSupport

override def stateQueryForAllScenariosSupport: StateQueryForAllScenariosSupport = NoStateQueryForAllScenariosSupport

override def close(): Unit = ()
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,8 @@ class MockDeploymentManager(

override def deploymentSynchronisationSupport: DeploymentSynchronisationSupport = NoDeploymentSynchronisationSupport

override def stateQueryForAllScenariosSupport: StateQueryForAllScenariosSupport = NoStateQueryForAllScenariosSupport

}

class MockManagerProvider(deploymentManager: DeploymentManager = new MockDeploymentManager())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ class DevelopmentDeploymentManager(actorSystem: ActorSystem, modelData: BaseMode

override def deploymentSynchronisationSupport: DeploymentSynchronisationSupport = NoDeploymentSynchronisationSupport

override def stateQueryForAllScenariosSupport: StateQueryForAllScenariosSupport = NoStateQueryForAllScenariosSupport
}

class DevelopmentDeploymentManagerProvider extends DeploymentManagerProvider {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ object MockableDeploymentManagerProvider {

override def deploymentSynchronisationSupport: DeploymentSynchronisationSupport = NoDeploymentSynchronisationSupport

override def stateQueryForAllScenariosSupport: StateQueryForAllScenariosSupport = NoStateQueryForAllScenariosSupport

override def managerSpecificScenarioActivities(
processIdWithName: ProcessIdWithName,
after: Option[Instant],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import scala.concurrent.{ExecutionContext, Future}
object PeriodicDeploymentManager {

def apply(
delegate: DeploymentManager with StateQueryForAllScenariosSupported,
delegate: DeploymentManager,
schedulePropertyExtractorFactory: SchedulePropertyExtractorFactory,
processConfigEnricherFactory: ProcessConfigEnricherFactory,
periodicBatchConfig: PeriodicBatchConfig,
Expand Down Expand Up @@ -107,7 +107,6 @@ class PeriodicDeploymentManager private[periodic] (
toClose: () => Unit
)(implicit val ec: ExecutionContext)
extends DeploymentManager
with StateQueryForAllScenariosSupported
with ManagerSpecificScenarioActivitiesStoredByManager
with LazyLogging {

Expand Down Expand Up @@ -194,18 +193,15 @@ class PeriodicDeploymentManager private[periodic] (
}
}

override def stateQueryForAllScenariosSupport: StateQueryForAllScenariosSupport =
service.stateQueryForAllScenariosSupport

override def getProcessStates(
name: ProcessName
)(implicit freshnessPolicy: DataFreshnessPolicy): Future[WithDataFreshnessStatus[List[StatusDetails]]] = {
service.getStatusDetails(name).map(_.map(List(_)))
}

override def getProcessesStates()(
implicit freshnessPolicy: DataFreshnessPolicy
): Future[WithDataFreshnessStatus[Map[ProcessName, List[StatusDetails]]]] = {
service.getStatusDetails().map(_.map(_.map { case (k, v) => (k, List(v)) }))
}

override def resolve(
idWithName: ProcessIdWithName,
statusDetailsList: List[StatusDetails],
Expand Down
Loading

0 comments on commit 59d7ac8

Please sign in to comment.