diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/DeploymentManager.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/DeploymentManager.scala index 602048fd3a2..ce7aeba9d43 100644 --- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/DeploymentManager.scala +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/DeploymentManager.scala @@ -4,6 +4,7 @@ import pl.touk.nussknacker.engine.api.deployment.inconsistency.InconsistentState import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, VersionId} import pl.touk.nussknacker.engine.deployment.CustomActionDefinition import pl.touk.nussknacker.engine.newdeployment +import pl.touk.nussknacker.engine.util.WithDataFreshnessStatusUtils.WithDataFreshnessStatusOps import java.time.Instant import scala.concurrent.ExecutionContext.Implicits._ @@ -46,6 +47,8 @@ trait DeploymentManager extends AutoCloseable { def deploymentSynchronisationSupport: DeploymentSynchronisationSupport + def stateQueryForAllScenariosSupport: StateQueryForAllScenariosSupport + def processCommand[Result](command: DMScenarioCommand[Result]): Future[Result] final def getProcessState( @@ -66,7 +69,7 @@ trait DeploymentManager extends AutoCloseable { latestVersionId, deployedVersionId, currentlyPresentedVersionId, - ).map(state => statusDetailsWithFreshness.map(_ => state)) + ).map(statusDetailsWithFreshness.withValue) } yield stateWithFreshness } @@ -109,6 +112,18 @@ trait ManagerSpecificScenarioActivitiesStoredByManager { self: DeploymentManager } +sealed trait StateQueryForAllScenariosSupport + +trait StateQueryForAllScenariosSupported extends StateQueryForAllScenariosSupport { + + def getAllProcessesStates()( + implicit freshnessPolicy: DataFreshnessPolicy + ): Future[WithDataFreshnessStatus[Map[ProcessName, List[StatusDetails]]]] + +} + +case object NoStateQueryForAllScenariosSupport extends StateQueryForAllScenariosSupport + sealed trait DeploymentSynchronisationSupport trait DeploymentSynchronisationSupported extends DeploymentSynchronisationSupport { diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/cache/CachingProcessStateDeploymentManager.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/cache/CachingProcessStateDeploymentManager.scala index 378ec214aa7..5807c745e93 100644 --- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/cache/CachingProcessStateDeploymentManager.scala +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/cache/CachingProcessStateDeploymentManager.scala @@ -15,7 +15,8 @@ import scala.concurrent.duration._ class CachingProcessStateDeploymentManager( delegate: DeploymentManager, cacheTTL: FiniteDuration, - override val deploymentSynchronisationSupport: DeploymentSynchronisationSupport + override val deploymentSynchronisationSupport: DeploymentSynchronisationSupport, + override val stateQueryForAllScenariosSupport: StateQueryForAllScenariosSupport, ) extends DeploymentManager { private val cache: AsyncCache[ProcessName, List[StatusDetails]] = Caffeine @@ -81,7 +82,12 @@ object CachingProcessStateDeploymentManager extends LazyLogging { scenarioStateCacheTTL .map { cacheTTL => logger.debug(s"Wrapping DeploymentManager: $delegate with caching mechanism with TTL: $cacheTTL") - new 
CachingProcessStateDeploymentManager(delegate, cacheTTL, delegate.deploymentSynchronisationSupport) + new CachingProcessStateDeploymentManager( + delegate, + cacheTTL, + delegate.deploymentSynchronisationSupport, + delegate.stateQueryForAllScenariosSupport + ) } .getOrElse { logger.debug(s"Skipping ProcessState caching for DeploymentManager: $delegate") diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/testing/DeploymentManagerStub.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/testing/DeploymentManagerStub.scala index 5f483981534..08b86b1a9fa 100644 --- a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/testing/DeploymentManagerStub.scala +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/testing/DeploymentManagerStub.scala @@ -62,6 +62,8 @@ class DeploymentManagerStub extends BaseDeploymentManager with StubbingCommands override def deploymentSynchronisationSupport: DeploymentSynchronisationSupport = NoDeploymentSynchronisationSupport + override def stateQueryForAllScenariosSupport: StateQueryForAllScenariosSupport = NoStateQueryForAllScenariosSupport + override def close(): Unit = {} } diff --git a/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/util/WithDataFreshnessStatusUtils.scala b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/util/WithDataFreshnessStatusUtils.scala new file mode 100644 index 00000000000..76172afe86b --- /dev/null +++ b/designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/util/WithDataFreshnessStatusUtils.scala @@ -0,0 +1,31 @@ +package pl.touk.nussknacker.engine.util + +import pl.touk.nussknacker.engine.api.deployment.WithDataFreshnessStatus + +object WithDataFreshnessStatusUtils { + + implicit class WithDataFreshnessStatusMapOps[K, V](withDataFreshnessStatus: WithDataFreshnessStatus[Map[K, V]]) { + + def get(k: K): Option[WithDataFreshnessStatus[V]] = withDataFreshnessStatus.map(_.get(k)) match { + case WithDataFreshnessStatus(Some(value), cached) => Some(WithDataFreshnessStatus(value, cached)) + case WithDataFreshnessStatus(None, _) => None + } + + def getOrElse(k: K, orElse: V): WithDataFreshnessStatus[V] = { + withDataFreshnessStatus.map(_.get(k)) match { + case WithDataFreshnessStatus(Some(value), cached) => WithDataFreshnessStatus(value, cached) + case WithDataFreshnessStatus(None, cached) => WithDataFreshnessStatus(orElse, cached) + } + } + + } + + implicit class WithDataFreshnessStatusOps[A, B](scenarioActivity: WithDataFreshnessStatus[A]) { + + def withValue(v: B): WithDataFreshnessStatus[B] = { + scenarioActivity.map(_ => v) + } + + } + +} diff --git a/designer/deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/api/deployment/cache/CachingProcessStateDeploymentManagerSpec.scala b/designer/deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/api/deployment/cache/CachingProcessStateDeploymentManagerSpec.scala index 99633fddfc7..172d141e792 100644 --- a/designer/deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/api/deployment/cache/CachingProcessStateDeploymentManagerSpec.scala +++ b/designer/deployment-manager-api/src/test/scala/pl/touk/nussknacker/engine/api/deployment/cache/CachingProcessStateDeploymentManagerSpec.scala @@ -7,8 +7,8 @@ import org.scalatest.OptionValues import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers import org.scalatestplus.mockito.MockitoSugar -import 
pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus import pl.touk.nussknacker.engine.api.deployment._ +import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus import pl.touk.nussknacker.engine.api.process.ProcessName import pl.touk.nussknacker.engine.deployment.ExternalDeploymentId import pl.touk.nussknacker.test.PatientScalaFutures @@ -26,8 +26,12 @@ class CachingProcessStateDeploymentManagerSpec test("should ask delegate for a fresh state each time") { val delegate = prepareDMReturningRandomStates - val cachingManager = - new CachingProcessStateDeploymentManager(delegate, 10 seconds, NoDeploymentSynchronisationSupport) + val cachingManager = new CachingProcessStateDeploymentManager( + delegate, + 10 seconds, + NoDeploymentSynchronisationSupport, + NoStateQueryForAllScenariosSupport + ) val results = List( cachingManager.getProcessStatesDeploymentIdNow(DataFreshnessPolicy.Fresh), @@ -41,8 +45,12 @@ class CachingProcessStateDeploymentManagerSpec test("should cache state for DataFreshnessPolicy.CanBeCached") { val delegate = prepareDMReturningRandomStates - val cachingManager = - new CachingProcessStateDeploymentManager(delegate, 10 seconds, NoDeploymentSynchronisationSupport) + val cachingManager = new CachingProcessStateDeploymentManager( + delegate, + 10 seconds, + NoDeploymentSynchronisationSupport, + NoStateQueryForAllScenariosSupport + ) val firstInvocation = cachingManager.getProcessStatesDeploymentIdNow(DataFreshnessPolicy.CanBeCached) firstInvocation.cached shouldBe false @@ -55,8 +63,12 @@ class CachingProcessStateDeploymentManagerSpec test("should reuse state updated by DataFreshnessPolicy.Fresh during reading with DataFreshnessPolicy.CanBeCached") { val delegate = prepareDMReturningRandomStates - val cachingManager = - new CachingProcessStateDeploymentManager(delegate, 10 seconds, NoDeploymentSynchronisationSupport) + val cachingManager = new CachingProcessStateDeploymentManager( + delegate, + 10 seconds, + NoDeploymentSynchronisationSupport, + NoStateQueryForAllScenariosSupport + ) val resultForFresh = cachingManager.getProcessStatesDeploymentIdNow(DataFreshnessPolicy.Fresh) resultForFresh.cached shouldBe false diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/ProcessStateProvider.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/ProcessStateProvider.scala index ab8e63f8e14..e2f718bf40f 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/ProcessStateProvider.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/ProcessStateProvider.scala @@ -7,7 +7,7 @@ import pl.touk.nussknacker.restmodel.scenariodetails.ScenarioWithDetails import pl.touk.nussknacker.ui.process.repository.ScenarioWithDetailsEntity import pl.touk.nussknacker.ui.security.api.LoggedUser -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.Future import scala.language.higherKinds trait ProcessStateProvider { diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentService.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentService.scala index bd72c1c211d..b8b27bbaed3 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentService.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentService.scala @@ -21,12 +21,16 @@ import pl.touk.nussknacker.engine.api.process._ import 
pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment._ import pl.touk.nussknacker.engine.util.AdditionalComponentConfigsForRuntimeExtractor +import pl.touk.nussknacker.engine.util.WithDataFreshnessStatusUtils.{ + WithDataFreshnessStatusMapOps, + WithDataFreshnessStatusOps +} import pl.touk.nussknacker.restmodel.scenariodetails.ScenarioWithDetails import pl.touk.nussknacker.ui.api.{DeploymentCommentSettings, ListenerApiUser} import pl.touk.nussknacker.ui.listener.ProcessChangeEvent.{OnActionExecutionFinished, OnActionFailed, OnActionSuccess} import pl.touk.nussknacker.ui.listener.{ProcessChangeListener, User => ListenerUser} import pl.touk.nussknacker.ui.process.ProcessStateProvider -import pl.touk.nussknacker.ui.process.ScenarioWithDetailsConversions._ +import pl.touk.nussknacker.ui.process.ScenarioWithDetailsConversions.Ops import pl.touk.nussknacker.ui.process.deployment.LoggedUserConversions.LoggedUserOps import pl.touk.nussknacker.ui.process.exception.{DeployingInvalidScenarioError, ProcessIllegalAction} import pl.touk.nussknacker.ui.process.processingtype.provider.ProcessingTypeDataProvider @@ -169,9 +173,9 @@ class DeploymentService( } /** - * Common validations and operations for a command execution. - * @return gathered data for further command execution - */ + * Common validations and operations for a command execution. + * @return gathered data for further command execution + */ private def prepareCommandContextWithAction[PS: ScenarioShapeFetchStrategy]( processId: ProcessIdWithName, actionName: ScenarioActionName, @@ -203,7 +207,7 @@ class DeploymentService( _ = checkIfCanPerformActionOnScenario(actionName, processDetails) // 1.7. check if action is allowed for current state inProgressActionNames <- actionRepository.getInProgressActionNames(processDetails.processId) - processState <- getProcessState(processDetails, inProgressActionNames, None) + processState <- getProcessStateFetchingStatusFromManager(processDetails, inProgressActionNames, None) _ = checkIfCanPerformActionInState(actionName, processDetails, processState) // 1.8. 
create new action, action is started with "in progress" state, the whole command execution can take some time
       actionId <- actionRepository.addInProgressAction(
@@ -437,7 +441,11 @@
       processDetailsOpt <- processRepository.fetchLatestProcessDetailsForProcessId[Unit](processIdWithName.id)
       processDetails    <- existsOrFail(processDetailsOpt, ProcessNotFoundError(processIdWithName.name))
       inProgressActionNames <- actionRepository.getInProgressActionNames(processDetails.processId)
-      result <- getProcessState(processDetails, inProgressActionNames, currentlyPresentedVersionId)
+      result <- getProcessStateFetchingStatusFromManager(
+        processDetails,
+        inProgressActionNames,
+        currentlyPresentedVersionId
+      )
     } yield result)
   }

@@ -446,7 +454,7 @@
   )(implicit user: LoggedUser, freshnessPolicy: DataFreshnessPolicy): Future[ProcessState] = {
     dbioRunner.run(for {
       inProgressActionNames <- actionRepository.getInProgressActionNames(processDetails.processId)
-      result <- getProcessState(processDetails, inProgressActionNames, None)
+      result <- getProcessStateFetchingStatusFromManager(processDetails, inProgressActionNames, None)
     } yield result)
   }

@@ -454,30 +462,95 @@
       implicit user: LoggedUser,
       freshnessPolicy: DataFreshnessPolicy
   ): Future[F[ScenarioWithDetails]] = {
+    val scenarios = processTraverse.toList
     dbioRunner.run(
       for {
-        actionsInProgress <- getInProgressActionTypesForProcessTraverse(processTraverse)
+        actionsInProgress <- getInProgressActionTypesForScenarios(scenarios)
+        prefetchedStates  <- DBIO.from(getPrefetchedStatesForSupportedManagers(scenarios))
         processesWithState <- processTraverse
           .map {
             case process if process.isFragment => DBIO.successful(process)
             case process =>
-              getProcessState(
-                process.toEntity,
-                actionsInProgress.getOrElse(process.processIdUnsafe, Set.empty),
-                None,
-              ).map(state => process.copy(state = Some(state)))
+              val prefetchedStatusDetails = for {
+                prefetchedStatusDetailsForProcessingTypes <- prefetchedStates.get(process.processingType)
+                prefetchedStatusDetails <- prefetchedStatusDetailsForProcessingTypes.get(process.name)
+              } yield prefetchedStatusDetails
+              prefetchedStatusDetails match {
+                case Some(prefetchedStatusDetails) =>
+                  getProcessStateUsingPrefetchedStatus(
+                    process.toEntity,
+                    actionsInProgress.getOrElse(process.processIdUnsafe, Set.empty),
+                    None,
+                    prefetchedStatusDetails,
+                  ).map(state => process.copy(state = Some(state)))
+                case None =>
+                  getProcessStateFetchingStatusFromManager(
+                    process.toEntity,
+                    actionsInProgress.getOrElse(process.processIdUnsafe, Set.empty),
+                    None,
+                  ).map(state => process.copy(state = Some(state)))
+              }
           }
           .sequence[DB, ScenarioWithDetails]
       } yield processesWithState
     )
   }

+  // DeploymentManagers may support fetching the state of all scenarios at once.
+  // The state is prefetched only when:
+  // - the DM has the StateQueryForAllScenariosSupported capability
+  // - the query concerns more than one scenario handled by that DM
+  private def getPrefetchedStatesForSupportedManagers(
+      scenarios: List[ScenarioWithDetails],
+  )(
+      implicit user: LoggedUser,
+      freshnessPolicy: DataFreshnessPolicy
+  ): Future[Map[ProcessingType, WithDataFreshnessStatus[Map[ProcessName, List[StatusDetails]]]]] = {
+    val allProcessingTypes = scenarios.map(_.processingType).toSet
+    val numberOfScenariosByProcessingType =
+      allProcessingTypes
+        .map(processingType => (processingType, scenarios.count(_.processingType == processingType)))
+        .toMap
+    val processingTypesWithMoreThanOneScenario = 
numberOfScenariosByProcessingType.filter(_._2 > 1).keys + + Future + .sequence { + processingTypesWithMoreThanOneScenario.map { processingType => + (for { + manager <- dispatcher.deploymentManager(processingType) + managerWithCapability <- manager.stateQueryForAllScenariosSupport match { + case supported: StateQueryForAllScenariosSupported => Some(supported) + case NoStateQueryForAllScenariosSupport => None + } + } yield getAllProcessesStates(processingType, managerWithCapability)) + .getOrElse(Future.successful(None)) + } + } + .map(_.flatten.toMap) + } + + private def getAllProcessesStates(processingType: ProcessingType, manager: StateQueryForAllScenariosSupported)( + implicit freshnessPolicy: DataFreshnessPolicy, + ): Future[Option[(ProcessingType, WithDataFreshnessStatus[Map[ProcessName, List[StatusDetails]]])]] = { + manager + .getAllProcessesStates() + .map(states => Some((processingType, states))) + .recover { case NonFatal(e) => + logger.warn( + s"Failed to get statuses of all scenarios in deployment manager for $processingType: ${e.getMessage}", + e + ) + None + } + } + // This is optimisation tweak. We want to reduce number of calls for in progress action types. So for >1 scenarios // we do one call for all in progress action types for all scenarios - private def getInProgressActionTypesForProcessTraverse[F[_]: Traverse]( - processTraverse: F[ScenarioWithDetails] + private def getInProgressActionTypesForScenarios( + scenarios: List[ScenarioWithDetails] ): DB[Map[ProcessId, Set[ScenarioActionName]]] = { - processTraverse.toList match { + scenarios match { case Nil => DBIO.successful(Map.empty) case head :: Nil => actionRepository @@ -489,11 +562,57 @@ class DeploymentService( } } - private def getProcessState( + private def getProcessStateFetchingStatusFromManager( processDetails: ScenarioWithDetailsEntity[_], inProgressActionNames: Set[ScenarioActionName], currentlyPresentedVersionId: Option[VersionId], )(implicit freshnessPolicy: DataFreshnessPolicy, user: LoggedUser): DB[ProcessState] = { + getProcessState( + processDetails, + inProgressActionNames, + currentlyPresentedVersionId, + manager => + getStateFromDeploymentManager( + manager, + processDetails.idWithName, + processDetails.lastStateAction, + processDetails.processVersionId, + processDetails.lastDeployedAction.map(_.processVersionId), + currentlyPresentedVersionId, + ) + ) + } + + private def getProcessStateUsingPrefetchedStatus( + processDetails: ScenarioWithDetailsEntity[_], + inProgressActionNames: Set[ScenarioActionName], + currentlyPresentedVersionId: Option[VersionId], + prefetchedStatusDetails: WithDataFreshnessStatus[List[StatusDetails]], + )(implicit user: LoggedUser): DB[ProcessState] = { + getProcessState( + processDetails, + inProgressActionNames, + currentlyPresentedVersionId, + manager => + manager + .resolve( + processDetails.idWithName, + prefetchedStatusDetails.value, + processDetails.lastStateAction, + processDetails.processVersionId, + processDetails.lastDeployedAction.map(_.processVersionId), + currentlyPresentedVersionId, + ) + .map(prefetchedStatusDetails.withValue) + ) + } + + private def getProcessState( + processDetails: ScenarioWithDetailsEntity[_], + inProgressActionNames: Set[ScenarioActionName], + currentlyPresentedVersionId: Option[VersionId], + fetchState: DeploymentManager => Future[WithDataFreshnessStatus[ProcessState]], + )(implicit user: LoggedUser): DB[ProcessState] = { val processVersionId = processDetails.processVersionId val deployedVersionId = 
processDetails.lastDeployedAction.map(_.processVersionId) dispatcher @@ -527,16 +646,7 @@ class DeploymentService( processDetails.lastStateAction match { case Some(_) => DBIOAction - .from( - getStateFromDeploymentManager( - manager, - processDetails.idWithName, - processDetails.lastStateAction, - processVersionId, - deployedVersionId, - currentlyPresentedVersionId, - ) - ) + .from(fetchState(manager)) .map { statusWithFreshness => logger.debug( s"Status for: '${processDetails.name}' is: ${statusWithFreshness.value.status}, cached: ${statusWithFreshness.cached}, last status action: ${processDetails.lastStateAction @@ -637,7 +747,7 @@ class DeploymentService( lastStateAction, latestVersionId, deployedVersionId, - currentlyPresentedVersionId + currentlyPresentedVersionId, ) .recover { case NonFatal(e) => logger.warn(s"Failed to get status of ${processIdWithName.name}: ${e.getMessage}", e) diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/processingtype/InvalidDeploymentManagerStub.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/processingtype/InvalidDeploymentManagerStub.scala index 82149948176..37b31a16494 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/processingtype/InvalidDeploymentManagerStub.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/processingtype/InvalidDeploymentManagerStub.scala @@ -55,5 +55,7 @@ object InvalidDeploymentManagerStub extends DeploymentManager { override def deploymentSynchronisationSupport: DeploymentSynchronisationSupport = NoDeploymentSynchronisationSupport + override def stateQueryForAllScenariosSupport: StateQueryForAllScenariosSupport = NoStateQueryForAllScenariosSupport + override def close(): Unit = () } diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/MockDeploymentManager.scala b/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/MockDeploymentManager.scala index dcd25483e6d..728d3188696 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/MockDeploymentManager.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/test/mock/MockDeploymentManager.scala @@ -298,6 +298,8 @@ class MockDeploymentManager( override def deploymentSynchronisationSupport: DeploymentSynchronisationSupport = NoDeploymentSynchronisationSupport + override def stateQueryForAllScenariosSupport: StateQueryForAllScenariosSupport = NoStateQueryForAllScenariosSupport + } class MockManagerProvider(deploymentManager: DeploymentManager = new MockDeploymentManager()) diff --git a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentDeploymentManagerProvider.scala b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentDeploymentManagerProvider.scala index fe9518f9f26..ab972d49afe 100644 --- a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentDeploymentManagerProvider.scala +++ b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/DevelopmentDeploymentManagerProvider.scala @@ -228,6 +228,7 @@ class DevelopmentDeploymentManager(actorSystem: ActorSystem, modelData: BaseMode override def deploymentSynchronisationSupport: DeploymentSynchronisationSupport = NoDeploymentSynchronisationSupport + override def stateQueryForAllScenariosSupport: StateQueryForAllScenariosSupport = NoStateQueryForAllScenariosSupport } class 
DevelopmentDeploymentManagerProvider extends DeploymentManagerProvider { diff --git a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala index 4154b8d7030..6ccc6299038 100644 --- a/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala +++ b/engine/development/deploymentManager/src/main/scala/pl/touk/nussknacker/development/manager/MockableDeploymentManagerProvider.scala @@ -144,6 +144,8 @@ object MockableDeploymentManagerProvider { override def deploymentSynchronisationSupport: DeploymentSynchronisationSupport = NoDeploymentSynchronisationSupport + override def stateQueryForAllScenariosSupport: StateQueryForAllScenariosSupport = NoStateQueryForAllScenariosSupport + override def managerSpecificScenarioActivities( processIdWithName: ProcessIdWithName, after: Option[Instant], diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/FlinkPeriodicDeploymentManagerProvider.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/FlinkPeriodicDeploymentManagerProvider.scala index bd64c7e8bd8..9ce983104cc 100644 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/FlinkPeriodicDeploymentManagerProvider.scala +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/FlinkPeriodicDeploymentManagerProvider.scala @@ -38,10 +38,10 @@ class FlinkPeriodicDeploymentManagerProvider extends DeploymentManagerProvider w modelData: BaseModelData, dependencies: DeploymentManagerDependencies, config: Config, - scenarioStateCacheTTL: Option[FiniteDuration] + scenarioStateCacheTTL: Option[FiniteDuration], ): ValidatedNel[String, DeploymentManager] = { logger.info("Creating FlinkPeriodic scenario manager") - delegate.createDeploymentManager(modelData, dependencies, config, scenarioStateCacheTTL).map { + delegate.createDeploymentManagerWithCapabilities(modelData, dependencies, config, scenarioStateCacheTTL).map { delegateDeploymentManager => import net.ceedubs.ficus.Ficus._ import net.ceedubs.ficus.readers.ArbitraryTypeReader._ diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManager.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManager.scala index 9ad4fdf8342..b17a6d18aae 100644 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManager.scala +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManager.scala @@ -193,6 +193,9 @@ class PeriodicDeploymentManager private[periodic] ( } } + override def stateQueryForAllScenariosSupport: StateQueryForAllScenariosSupport = + service.stateQueryForAllScenariosSupport + override def getProcessStates( name: ProcessName )(implicit freshnessPolicy: DataFreshnessPolicy): Future[WithDataFreshnessStatus[List[StatusDetails]]] = { diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala 
b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala index 6e48530d7c6..ac4ab19cbd5 100644 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala @@ -495,6 +495,29 @@ class PeriodicProcessService( } } + def stateQueryForAllScenariosSupport: StateQueryForAllScenariosSupport = + delegateDeploymentManager.stateQueryForAllScenariosSupport match { + case supported: StateQueryForAllScenariosSupported => + new StateQueryForAllScenariosSupported { + + override def getAllProcessesStates()( + implicit freshnessPolicy: DataFreshnessPolicy + ): Future[WithDataFreshnessStatus[Map[ProcessName, List[StatusDetails]]]] = { + supported.getAllProcessesStates().flatMap { statusesWithFreshness => + mergeStatusWithDeployments(statusesWithFreshness.value).map { statusDetails => + statusesWithFreshness.map(_.flatMap { case (name, _) => + statusDetails.get(name).map(statusDetails => (name, List(statusDetails))) + }) + } + } + } + + } + + case NoStateQueryForAllScenariosSupport => + NoStateQueryForAllScenariosSupport + } + private def mergeStatusWithDeployments( name: ProcessName, runtimeStatuses: List[StatusDetails] @@ -528,12 +551,55 @@ class PeriodicProcessService( } } + private def mergeStatusWithDeployments( + runtimeStatuses: Map[ProcessName, List[StatusDetails]] + ): Future[Map[ProcessName, StatusDetails]] = { + def toDeploymentStatuses(processName: ProcessName, schedulesState: SchedulesState) = + schedulesState.schedules.toList + .flatMap { case (scheduleId, scheduleData) => + scheduleData.latestDeployments.map { deployment => + DeploymentStatus( + deployment.id, + scheduleId, + deployment.createdAt, + deployment.runAt, + deployment.state.status, + scheduleData.process.active, + runtimeStatuses.getOrElse(processName, List.empty).getStatus(deployment.id) + ) + } + } + .sorted(DeploymentStatus.ordering.reverse) + + for { + activeSchedules <- getLatestDeploymentsForActiveSchedules(MaxDeploymentsStatus) + inactiveSchedules <- getLatestDeploymentsForLatestInactiveSchedules(MaxDeploymentsStatus, MaxDeploymentsStatus) + } yield { + val allProcessNames = activeSchedules.keySet ++ inactiveSchedules.keySet + allProcessNames.map { processName => + val activeSchedulesForProcess = activeSchedules.getOrElse(processName, SchedulesState(Map.empty)) + val inactiveSchedulesForProcess = inactiveSchedules.getOrElse(processName, SchedulesState(Map.empty)) + val status = PeriodicProcessStatus( + toDeploymentStatuses(processName, activeSchedulesForProcess), + toDeploymentStatuses(processName, inactiveSchedulesForProcess) + ) + val mergedStatus = status.mergedStatusDetails.copy(status = status) + (processName, mergedStatus) + }.toMap + } + } + def getLatestDeploymentsForActiveSchedules( processName: ProcessName, deploymentsPerScheduleMaxCount: Int = 1 ): Future[SchedulesState] = scheduledProcessesRepository.getLatestDeploymentsForActiveSchedules(processName, deploymentsPerScheduleMaxCount).run + def getLatestDeploymentsForActiveSchedules( + deploymentsPerScheduleMaxCount: Int + ): Future[Map[ProcessName, SchedulesState]] = + scheduledProcessesRepository.getLatestDeploymentsForActiveSchedules(deploymentsPerScheduleMaxCount).run + def getLatestDeploymentsForLatestInactiveSchedules( processName: ProcessName, inactiveProcessesMaxCount: Int, @@ -547,6 +613,17 @@ 
class PeriodicProcessService( ) .run + def getLatestDeploymentsForLatestInactiveSchedules( + inactiveProcessesMaxCount: Int, + deploymentsPerScheduleMaxCount: Int + ): Future[Map[ProcessName, SchedulesState]] = + scheduledProcessesRepository + .getLatestDeploymentsForLatestInactiveSchedules( + inactiveProcessesMaxCount, + deploymentsPerScheduleMaxCount + ) + .run + implicit class RuntimeStatusesExt(runtimeStatuses: List[StatusDetails]) { private val runtimeStatusesMap = runtimeStatuses.flatMap(status => status.deploymentId.map(_ -> status)).toMap diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/db/PeriodicProcessesRepository.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/db/PeriodicProcessesRepository.scala index 0b2e2c48c8d..9906c609bc4 100644 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/db/PeriodicProcessesRepository.scala +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/db/PeriodicProcessesRepository.scala @@ -8,6 +8,7 @@ import pl.touk.nussknacker.engine.api.ProcessVersion import pl.touk.nussknacker.engine.api.deployment.ProcessActionId import pl.touk.nussknacker.engine.api.process.ProcessName import pl.touk.nussknacker.engine.management.periodic._ +import pl.touk.nussknacker.engine.management.periodic.db.PeriodicProcessesRepository.createPeriodicProcessWithoutJson import pl.touk.nussknacker.engine.management.periodic.model.DeploymentWithJarData.{ WithCanonicalProcess, WithoutCanonicalProcess @@ -131,12 +132,21 @@ trait PeriodicProcessesRepository { deploymentsPerScheduleMaxCount: Int ): Action[SchedulesState] + def getLatestDeploymentsForActiveSchedules( + deploymentsPerScheduleMaxCount: Int + ): Action[Map[ProcessName, SchedulesState]] + def getLatestDeploymentsForLatestInactiveSchedules( processName: ProcessName, inactiveProcessesMaxCount: Int, deploymentsPerScheduleMaxCount: Int ): Action[SchedulesState] + def getLatestDeploymentsForLatestInactiveSchedules( + inactiveProcessesMaxCount: Int, + deploymentsPerScheduleMaxCount: Int + ): Action[Map[ProcessName, SchedulesState]] + def findToBeDeployed: Action[Seq[PeriodicProcessDeployment[WithCanonicalProcess]]] def findToBeRetried: Action[Seq[PeriodicProcessDeployment[WithCanonicalProcess]]] @@ -201,7 +211,7 @@ class SlickPeriodicProcessesRepository( .on(_.id === _.periodicProcessId) .filterOpt(afterOpt)((entities, after) => entities._2.completedAt > after) .result - .map(toSchedulesState) + .map(toSchedulesStateForSinglePeriodicProcess) } override def create( @@ -315,7 +325,7 @@ class SlickPeriodicProcessesRepository( getLatestDeploymentsForEachSchedule( processesHavingDeploymentsWithMatchingStatus, deploymentsPerScheduleMaxCount = 1 - ) + ).map(_.values.headOption.getOrElse(SchedulesState(Map.empty))) } override def getLatestDeploymentsForActiveSchedules( @@ -323,6 +333,14 @@ class SlickPeriodicProcessesRepository( deploymentsPerScheduleMaxCount: Int ): Action[SchedulesState] = { val activeProcessesQuery = PeriodicProcessesWithoutJson.filter(p => p.processName === processName && p.active) + getLatestDeploymentsForEachSchedule(activeProcessesQuery, deploymentsPerScheduleMaxCount) + .map(_.getOrElse(processName, SchedulesState(Map.empty))) + } + + override def getLatestDeploymentsForActiveSchedules( + deploymentsPerScheduleMaxCount: Int + ): Action[Map[ProcessName, SchedulesState]] = { + val activeProcessesQuery = 
PeriodicProcessesWithoutJson.filter(_.active)
     getLatestDeploymentsForEachSchedule(activeProcessesQuery, deploymentsPerScheduleMaxCount)
   }

@@ -336,12 +354,24 @@ class SlickPeriodicProcessesRepository(
       .sortBy(_.createdAt.desc)
       .take(inactiveProcessesMaxCount)
     getLatestDeploymentsForEachSchedule(filteredProcessesQuery, deploymentsPerScheduleMaxCount)
+      .map(_.getOrElse(processName, SchedulesState(Map.empty)))
+  }
+
+  override def getLatestDeploymentsForLatestInactiveSchedules(
+      inactiveProcessesMaxCount: Int,
+      deploymentsPerScheduleMaxCount: Int
+  ): Action[Map[ProcessName, SchedulesState]] = {
+    val filteredProcessesQuery = PeriodicProcessesWithoutJson
+      .filter(!_.active)
+      .sortBy(_.createdAt.desc)
+      .take(inactiveProcessesMaxCount)
+    getLatestDeploymentsForEachSchedule(filteredProcessesQuery, deploymentsPerScheduleMaxCount)
   }

   private def getLatestDeploymentsForEachSchedule(
       periodicProcessesQuery: Query[PeriodicProcessWithoutJson, PeriodicProcessEntityWithoutJson, Seq],
       deploymentsPerScheduleMaxCount: Int
-  ): Action[SchedulesState] = {
+  ): Action[Map[ProcessName, SchedulesState]] = {
     val filteredPeriodicProcessQuery = periodicProcessesQuery.filter(p => p.processingType === processingType)
     val latestDeploymentsForSchedules = profile match {
       case _: ExPostgresProfile =>
@@ -355,7 +385,7 @@ class SlickPeriodicProcessesRepository(
   private def getLatestDeploymentsForEachSchedulePostgres(
       periodicProcessesQuery: Query[PeriodicProcessWithoutJson, PeriodicProcessEntityWithoutJson, Seq],
       deploymentsPerScheduleMaxCount: Int
-  ): Action[Seq[(PeriodicProcessEntity, PeriodicProcessDeploymentEntity)]] = {
+  ): Action[Seq[(PeriodicProcessEntityWithoutJson, PeriodicProcessDeploymentEntity)]] = {
     // To effectively limit deployments to given count for each schedule in one query, we use window functions in slick
     import ExPostgresProfile.api._
     import com.github.tminglei.slickpg.window.PgWindowFuncSupport.WindowFunctions._
@@ -389,7 +419,7 @@ class SlickPeriodicProcessesRepository(
   private def getLatestDeploymentsForEachScheduleJdbcGeneric(
       periodicProcessesQuery: Query[PeriodicProcessWithoutJson, PeriodicProcessEntityWithoutJson, Seq],
       deploymentsPerScheduleMaxCount: Int
-  ): Action[Seq[(PeriodicProcessEntity, PeriodicProcessDeploymentEntity)]] = {
+  ): Action[Seq[(PeriodicProcessEntityWithoutJson, PeriodicProcessDeploymentEntity)]] = {
     // It is debug instead of warn to not bloat logs when e.g. 
for some reasons is used hsql under the hood logger.debug( "WARN: Using not optimized version of getLatestDeploymentsForEachSchedule that not uses window functions" @@ -460,21 +490,30 @@ class SlickPeriodicProcessesRepository( join PeriodicProcessDeployments on (_.id === _.periodicProcessId)) } - private def toSchedulesState(list: Seq[(PeriodicProcessEntity, PeriodicProcessDeploymentEntity)]): SchedulesState = { + private def toSchedulesState( + list: Seq[(PeriodicProcessEntityWithoutJson, PeriodicProcessDeploymentEntity)] + ): Map[ProcessName, SchedulesState] = { + list + .groupBy(_._1.processName) + .map { case (processName, list) => processName -> toSchedulesStateForSinglePeriodicProcess(list) } + } + + private def toSchedulesStateForSinglePeriodicProcess( + list: Seq[(PeriodicProcessEntityWithoutJson, PeriodicProcessDeploymentEntity)] + ): SchedulesState = { SchedulesState( list .map { case (process, deployment) => - val scheduleId = ScheduleId(process.id, ScheduleName(deployment.scheduleName)) - val scheduleDataWithoutDeployment = - (scheduleId, PeriodicProcessesRepository.createPeriodicProcessWithoutJson(process)) + val scheduleId = ScheduleId(process.id, ScheduleName(deployment.scheduleName)) + val scheduleData = (scheduleId, process) val scheduleDeployment = ScheduleDeploymentData(deployment) - (scheduleDataWithoutDeployment, scheduleDeployment) + (scheduleData, scheduleDeployment) } .toList .toGroupedMap .toList .map { case ((scheduleId, process), deployments) => - scheduleId -> ScheduleData(process, deployments) + scheduleId -> ScheduleData(createPeriodicProcessWithoutJson(process), deployments) } .toMap ) diff --git a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/DeploymentManagerStub.scala b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/DeploymentManagerStub.scala index c2e8a4edb4f..e424331f837 100644 --- a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/DeploymentManagerStub.scala +++ b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/DeploymentManagerStub.scala @@ -11,18 +11,48 @@ import scala.concurrent.Future class DeploymentManagerStub extends BaseDeploymentManager with StubbingCommands { - var jobStatus: Option[StatusDetails] = None - - def setStateStatus(status: StateStatus, deploymentIdOpt: Option[PeriodicProcessDeploymentId]): Unit = { - jobStatus = Some( - StatusDetails( - deploymentId = deploymentIdOpt.map(pdid => DeploymentId(pdid.toString)), - externalDeploymentId = Some(ExternalDeploymentId("1")), - status = status, - version = None, - startTime = None, - attributes = None, - errors = Nil + var jobStatus: Map[ProcessName, List[StatusDetails]] = Map.empty + + def setEmptyStateStatus(): Unit = { + jobStatus = Map.empty + } + + def addStateStatus( + processName: ProcessName, + status: StateStatus, + deploymentIdOpt: Option[PeriodicProcessDeploymentId] + ): Unit = { + jobStatus = jobStatus ++ Map( + processName -> List( + StatusDetails( + deploymentId = deploymentIdOpt.map(pdid => DeploymentId(pdid.toString)), + externalDeploymentId = Some(ExternalDeploymentId("1")), + status = status, + version = None, + startTime = None, + attributes = None, + errors = Nil + ) + ) + ) + } + + def setStateStatus( + processName: ProcessName, + status: StateStatus, + deploymentIdOpt: Option[PeriodicProcessDeploymentId] + ): Unit = { + jobStatus = Map( + processName -> List( + StatusDetails( + 
deploymentId = deploymentIdOpt.map(pdid => DeploymentId(pdid.toString)), + externalDeploymentId = Some(ExternalDeploymentId("1")), + status = status, + version = None, + startTime = None, + attributes = None, + errors = Nil + ) ) ) } @@ -47,9 +77,19 @@ class DeploymentManagerStub extends BaseDeploymentManager with StubbingCommands override def getProcessStates( name: ProcessName )(implicit freshnessPolicy: DataFreshnessPolicy): Future[WithDataFreshnessStatus[List[StatusDetails]]] = { - Future.successful(WithDataFreshnessStatus.fresh(jobStatus.toList)) + Future.successful(WithDataFreshnessStatus.fresh(jobStatus.get(name).toList.flatten)) } override def deploymentSynchronisationSupport: DeploymentSynchronisationSupport = NoDeploymentSynchronisationSupport + override def stateQueryForAllScenariosSupport: StateQueryForAllScenariosSupport = + new StateQueryForAllScenariosSupported { + + override def getAllProcessesStates()( + implicit freshnessPolicy: DataFreshnessPolicy + ): Future[WithDataFreshnessStatus[Map[ProcessName, List[StatusDetails]]]] = + Future.successful(WithDataFreshnessStatus.fresh(jobStatus)) + + } + } diff --git a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManagerTest.scala b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManagerTest.scala index d4310d1b5ec..82f1c575b0b 100644 --- a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManagerTest.scala +++ b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManagerTest.scala @@ -152,7 +152,7 @@ class PeriodicDeploymentManagerTest test("getProcessState - should be scheduled when scenario scheduled and job finished on Flink") { val f = new Fixture val deploymentId = f.repository.addActiveProcess(processName, PeriodicProcessDeploymentStatus.Scheduled) - f.delegateDeploymentManagerStub.setStateStatus(SimpleStateStatus.Finished, Some(deploymentId)) + f.delegateDeploymentManagerStub.setStateStatus(processName, SimpleStateStatus.Finished, Some(deploymentId)) val statusDetails = f.getMergedStatusDetails statusDetails.status shouldBe a[ScheduledStatus] @@ -170,7 +170,7 @@ class PeriodicDeploymentManagerTest PeriodicProcessDeploymentStatus.Finished, LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC) ) - f.delegateDeploymentManagerStub.setStateStatus(SimpleStateStatus.Finished, Some(deploymentId)) + f.delegateDeploymentManagerStub.setStateStatus(processName, SimpleStateStatus.Finished, Some(deploymentId)) f.periodicProcessService.deactivate(processName).futureValue val state = @@ -186,7 +186,7 @@ class PeriodicDeploymentManagerTest test("getProcessState - should be running when scenario deployed and job running on Flink") { val f = new Fixture val deploymentId = f.repository.addActiveProcess(processName, PeriodicProcessDeploymentStatus.Deployed) - f.delegateDeploymentManagerStub.setStateStatus(SimpleStateStatus.Running, Some(deploymentId)) + f.delegateDeploymentManagerStub.setStateStatus(processName, SimpleStateStatus.Running, Some(deploymentId)) val statusDetails = f.getMergedStatusDetails statusDetails.status shouldBe SimpleStateStatus.Running @@ -198,7 +198,7 @@ class PeriodicDeploymentManagerTest test("getProcessState - should be waiting for reschedule if job finished on Flink but scenario is still deployed") { val f = new Fixture val deploymentId = 
f.repository.addActiveProcess(processName, PeriodicProcessDeploymentStatus.Deployed) - f.delegateDeploymentManagerStub.setStateStatus(SimpleStateStatus.Finished, Some(deploymentId)) + f.delegateDeploymentManagerStub.setStateStatus(processName, SimpleStateStatus.Finished, Some(deploymentId)) val statusDetails = f.getMergedStatusDetails statusDetails.status shouldBe WaitingForScheduleStatus @@ -296,7 +296,7 @@ class PeriodicDeploymentManagerTest test("should get status of failed job") { val f = new Fixture val deploymentId = f.repository.addActiveProcess(processName, PeriodicProcessDeploymentStatus.Deployed) - f.delegateDeploymentManagerStub.setStateStatus(ProblemStateStatus.Failed, Some(deploymentId)) + f.delegateDeploymentManagerStub.setStateStatus(processName, ProblemStateStatus.Failed, Some(deploymentId)) val statusDetails = f.getMergedStatusDetails statusDetails.status shouldBe ProblemStateStatus.Failed @@ -308,7 +308,7 @@ class PeriodicDeploymentManagerTest test("should redeploy failed scenario") { val f = new Fixture val deploymentId = f.repository.addActiveProcess(processName, PeriodicProcessDeploymentStatus.Deployed) - f.delegateDeploymentManagerStub.setStateStatus(ProblemStateStatus.Failed, Some(deploymentId)) + f.delegateDeploymentManagerStub.setStateStatus(processName, ProblemStateStatus.Failed, Some(deploymentId)) val statusDetailsBeforeRedeploy = f.getMergedStatusDetails statusDetailsBeforeRedeploy.status shouldBe ProblemStateStatus.Failed f.getAllowedActions( @@ -375,7 +375,7 @@ class PeriodicDeploymentManagerTest test("should redeploy running scenario") { val f = new Fixture val deploymentId = f.repository.addActiveProcess(processName, PeriodicProcessDeploymentStatus.Deployed) - f.delegateDeploymentManagerStub.setStateStatus(SimpleStateStatus.Running, Some(deploymentId)) + f.delegateDeploymentManagerStub.setStateStatus(processName, SimpleStateStatus.Running, Some(deploymentId)) val statusDetails = f.getMergedStatusDetails f.getAllowedActions(statusDetails, processVersion.versionId, None, Some(processVersion.versionId)) shouldBe List( ScenarioActionName.Cancel @@ -402,7 +402,7 @@ class PeriodicDeploymentManagerTest test("should redeploy finished scenario") { val f = new Fixture val deploymentId = f.repository.addActiveProcess(processName, PeriodicProcessDeploymentStatus.Deployed) - f.delegateDeploymentManagerStub.setStateStatus(SimpleStateStatus.Finished, Some(deploymentId)) + f.delegateDeploymentManagerStub.setStateStatus(processName, SimpleStateStatus.Finished, Some(deploymentId)) val statusDetails = f.getMergedStatusDetails f.getAllowedActions(statusDetails, processVersion.versionId, None, Some(processVersion.versionId)) shouldBe List( ScenarioActionName.Cancel @@ -429,7 +429,7 @@ class PeriodicDeploymentManagerTest test("should cancel failed job after RescheduleActor handles finished") { val f = new Fixture val deploymentId = f.repository.addActiveProcess(processName, PeriodicProcessDeploymentStatus.Deployed) - f.delegateDeploymentManagerStub.setStateStatus(ProblemStateStatus.Failed, Some(deploymentId)) + f.delegateDeploymentManagerStub.setStateStatus(processName, ProblemStateStatus.Failed, Some(deploymentId)) // this one is cyclically called by RescheduleActor f.periodicProcessService.handleFinished.futureValue @@ -449,7 +449,7 @@ class PeriodicDeploymentManagerTest test("should reschedule failed job after RescheduleActor handles finished when configured") { val f = new Fixture(executionConfig = PeriodicExecutionConfig(rescheduleOnFailure = true)) val deploymentId = 
f.repository.addActiveProcess(processName, PeriodicProcessDeploymentStatus.Deployed) - f.delegateDeploymentManagerStub.setStateStatus(ProblemStateStatus.Failed, Some(deploymentId)) + f.delegateDeploymentManagerStub.setStateStatus(processName, ProblemStateStatus.Failed, Some(deploymentId)) // this one is cyclically called by RescheduleActor f.periodicProcessService.handleFinished.futureValue @@ -465,7 +465,7 @@ class PeriodicDeploymentManagerTest test("should cancel failed job before RescheduleActor handles finished") { val f = new Fixture val deploymentId = f.repository.addActiveProcess(processName, PeriodicProcessDeploymentStatus.Deployed) - f.delegateDeploymentManagerStub.setStateStatus(ProblemStateStatus.Failed, Some(deploymentId)) + f.delegateDeploymentManagerStub.setStateStatus(processName, ProblemStateStatus.Failed, Some(deploymentId)) f.periodicDeploymentManager.processCommand(DMCancelScenarioCommand(processName, User("test", "Tester"))).futureValue @@ -477,13 +477,13 @@ class PeriodicDeploymentManagerTest test("should cancel failed scenario after disappeared from Flink console") { val f = new Fixture val deploymentId = f.repository.addActiveProcess(processName, PeriodicProcessDeploymentStatus.Deployed) - f.delegateDeploymentManagerStub.setStateStatus(ProblemStateStatus.Failed, Some(deploymentId)) + f.delegateDeploymentManagerStub.setStateStatus(processName, ProblemStateStatus.Failed, Some(deploymentId)) // this one is cyclically called by RescheduleActor f.periodicProcessService.handleFinished.futureValue // after some time Flink stops returning job status - f.delegateDeploymentManagerStub.jobStatus = None + f.delegateDeploymentManagerStub.jobStatus = Map.empty f.getMergedStatusDetails.status shouldEqual ProblemStateStatus.Failed f.repository.deploymentEntities.loneElement.status shouldBe PeriodicProcessDeploymentStatus.Failed diff --git a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceIntegrationTest.scala b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceIntegrationTest.scala index e7807319126..892fba32f48 100644 --- a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceIntegrationTest.scala +++ b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceIntegrationTest.scala @@ -228,7 +228,11 @@ class PeriodicProcessServiceIntegrationTest ) afterDeployDeployment.runAt shouldBe localTime(expectedScheduleTime) - f.delegateDeploymentManagerStub.setStateStatus(SimpleStateStatus.Finished, Some(afterDeployDeployment.id)) + f.delegateDeploymentManagerStub.setStateStatus( + processName, + SimpleStateStatus.Finished, + Some(afterDeployDeployment.id) + ) service.handleFinished.futureValue val toDeployAfterFinish = service.findToBeDeployed.futureValue @@ -411,12 +415,12 @@ class PeriodicProcessServiceIntegrationTest val deployment = toDeploy.find(_.scheduleName.value.contains(firstSchedule)).value service.deploy(deployment) - f.delegateDeploymentManagerStub.setStateStatus(SimpleStateStatus.Running, Some(deployment.id)) + f.delegateDeploymentManagerStub.setStateStatus(processName, SimpleStateStatus.Running, Some(deployment.id)) val toDeployAfterDeploy = service.findToBeDeployed.futureValue toDeployAfterDeploy should have length 0 - f.delegateDeploymentManagerStub.setStateStatus(SimpleStateStatus.Finished, Some(deployment.id)) + 
f.delegateDeploymentManagerStub.setStateStatus(processName, SimpleStateStatus.Finished, Some(deployment.id)) service.handleFinished.futureValue val toDeployAfterFinish = service.findToBeDeployed.futureValue @@ -633,7 +637,7 @@ class PeriodicProcessServiceIntegrationTest toDeploy should have length 1 val deployment = toDeploy.head service.deploy(deployment).futureValue - f.delegateDeploymentManagerStub.setStateStatus(SimpleStateStatus.Finished, Some(deployment.id)) + f.delegateDeploymentManagerStub.setStateStatus(processName, SimpleStateStatus.Finished, Some(deployment.id)) tryWithFailedListener { () => service.deactivate(processName) @@ -663,7 +667,7 @@ class PeriodicProcessServiceIntegrationTest val deployment = toDeploy.head service.deploy(deployment).futureValue - f.delegateDeploymentManagerStub.setStateStatus(ProblemStateStatus.Failed, Some(deployment.id)) + f.delegateDeploymentManagerStub.setStateStatus(processName, ProblemStateStatus.Failed, Some(deployment.id)) // this one is cyclically called by RescheduleActor service.handleFinished.futureValue diff --git a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceTest.scala b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceTest.scala index 9b6569f972f..a258d5c2ac4 100644 --- a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceTest.scala +++ b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceTest.scala @@ -221,7 +221,7 @@ class PeriodicProcessServiceTest PeriodicProcessDeploymentStatus.Deployed, processActionId = Some(processActionId) ) - f.delegateDeploymentManagerStub.setStateStatus(SimpleStateStatus.Finished, Some(deploymentId)) + f.delegateDeploymentManagerStub.setStateStatus(processName, SimpleStateStatus.Finished, Some(deploymentId)) f.periodicProcessService.handleFinished.futureValue @@ -238,7 +238,7 @@ class PeriodicProcessServiceTest val finished :: scheduled :: Nil = f.repository.deploymentEntities.map(createPeriodicProcessDeployment(processEntity, _)).toList f.events.toList shouldBe List( - FinishedEvent(finished, f.delegateDeploymentManagerStub.jobStatus), + FinishedEvent(finished, f.delegateDeploymentManagerStub.jobStatus.get(processName).flatMap(_.headOption)), ScheduledEvent(scheduled, firstSchedule = false) ) } @@ -252,7 +252,7 @@ class PeriodicProcessServiceTest PeriodicProcessDeploymentStatus.Deployed, processActionId = Some(processActionId) ) - f.delegateDeploymentManagerStub.setStateStatus(SimpleStateStatus.DuringDeploy, Some(deploymentId)) + f.delegateDeploymentManagerStub.setStateStatus(processName, SimpleStateStatus.DuringDeploy, Some(deploymentId)) f.periodicProcessService.handleFinished.futureValue @@ -269,7 +269,7 @@ class PeriodicProcessServiceTest scheduleProperty = cronInPast, processActionId = Some(processActionId) ) - f.delegateDeploymentManagerStub.setStateStatus(SimpleStateStatus.Finished, Some(deploymentId)) + f.delegateDeploymentManagerStub.setStateStatus(processName, SimpleStateStatus.Finished, Some(deploymentId)) f.periodicProcessService.handleFinished.futureValue @@ -281,7 +281,10 @@ class PeriodicProcessServiceTest // TODO: active should be false val event = createPeriodicProcessDeployment(processEntity.copy(active = true), f.repository.deploymentEntities.loneElement) - f.events.loneElement shouldBe FinishedEvent(event, 
f.delegateDeploymentManagerStub.jobStatus) + f.events.loneElement shouldBe FinishedEvent( + event, + f.delegateDeploymentManagerStub.jobStatus.get(processName).flatMap(_.headOption) + ) } test("handleFinished - should not deactivate process if there is future schedule") { @@ -346,7 +349,7 @@ class PeriodicProcessServiceTest test("handleFinished - should mark as failed for failed Flink job") { val f = new Fixture val deploymentId = f.repository.addActiveProcess(processName, PeriodicProcessDeploymentStatus.Deployed) - f.delegateDeploymentManagerStub.setStateStatus(ProblemStateStatus.Failed, Some(deploymentId)) + f.delegateDeploymentManagerStub.setStateStatus(processName, ProblemStateStatus.Failed, Some(deploymentId)) f.periodicProcessService.handleFinished.futureValue @@ -356,7 +359,12 @@ class PeriodicProcessServiceTest f.repository.deploymentEntities.loneElement.status shouldBe PeriodicProcessDeploymentStatus.Failed val expectedDetails = createPeriodicProcessDeployment(processEntity, f.repository.deploymentEntities.head) - f.events.toList shouldBe List(FailedOnRunEvent(expectedDetails, f.delegateDeploymentManagerStub.jobStatus)) + f.events.toList shouldBe List( + FailedOnRunEvent( + expectedDetails, + f.delegateDeploymentManagerStub.jobStatus.get(processName).flatMap(_.headOption) + ) + ) } test("deploy - should deploy and mark as so") { diff --git a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessesFetchingTest.scala b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessesFetchingTest.scala new file mode 100644 index 00000000000..a5278f94586 --- /dev/null +++ b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessesFetchingTest.scala @@ -0,0 +1,120 @@ +package pl.touk.nussknacker.engine.management.periodic + +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers +import org.scalatest.prop.TableDrivenPropertyChecks +import org.scalatest.{Inside, OptionValues} +import pl.touk.nussknacker.engine.api.deployment._ +import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus +import pl.touk.nussknacker.engine.api.process.ProcessName +import pl.touk.nussknacker.engine.deployment.DeploymentData +import pl.touk.nussknacker.engine.management.periodic.db.InMemPeriodicProcessesRepository.getLatestDeploymentQueryCount +import pl.touk.nussknacker.engine.management.periodic.model.PeriodicProcessDeploymentStatus +import pl.touk.nussknacker.engine.management.periodic.service.{ + DefaultAdditionalDeploymentDataProvider, + EmptyListener, + ProcessConfigEnricher +} +import pl.touk.nussknacker.test.PatientScalaFutures + +import java.time.Clock +import java.util.UUID + +class PeriodicProcessesFetchingTest + extends AnyFunSuite + with Matchers + with ScalaFutures + with OptionValues + with Inside + with TableDrivenPropertyChecks + with PatientScalaFutures { + + protected implicit val freshnessPolicy: DataFreshnessPolicy = DataFreshnessPolicy.Fresh + + import scala.concurrent.ExecutionContext.Implicits.global + + private def processName(n: Int) = ProcessName(s"test$n") + + class Fixture(executionConfig: PeriodicExecutionConfig = PeriodicExecutionConfig()) { + val repository = new db.InMemPeriodicProcessesRepository(processingType = "testProcessingType") + val delegateDeploymentManagerStub = new DeploymentManagerStub + val jarManagerStub = new 
JarManagerStub + val preparedDeploymentData = DeploymentData.withDeploymentId(UUID.randomUUID().toString) + + val periodicProcessService = new PeriodicProcessService( + delegateDeploymentManager = delegateDeploymentManagerStub, + jarManager = jarManagerStub, + scheduledProcessesRepository = repository, + periodicProcessListener = EmptyListener, + additionalDeploymentDataProvider = DefaultAdditionalDeploymentDataProvider, + deploymentRetryConfig = DeploymentRetryConfig(), + executionConfig = executionConfig, + maxFetchedPeriodicScenarioActivities = Some(200), + processConfigEnricher = ProcessConfigEnricher.identity, + clock = Clock.systemDefaultZone(), + new ProcessingTypeActionServiceStub, + Map.empty + ) + + val periodicDeploymentManager = new PeriodicDeploymentManager( + delegate = delegateDeploymentManagerStub, + service = periodicProcessService, + repository = repository, + schedulePropertyExtractor = CronSchedulePropertyExtractor(), + toClose = () => () + ) + + } + + test( + "getStatusDetails - should perform 2*N db queries for N periodic processes when fetching statuses individually" + ) { + val f = new Fixture + val n = 10 + + f.delegateDeploymentManagerStub.setEmptyStateStatus() + + for (i <- 1 to n) { + val deploymentId = f.repository.addActiveProcess(processName(i), PeriodicProcessDeploymentStatus.Deployed) + f.delegateDeploymentManagerStub.addStateStatus(processName(i), SimpleStateStatus.Running, Some(deploymentId)) + } + + getLatestDeploymentQueryCount.set(0) + + implicit val freshnessPolicy: DataFreshnessPolicy = DataFreshnessPolicy.CanBeCached + + for (i <- 1 to n) { + f.periodicProcessService.getStatusDetails(processName(i)).futureValue + } + + getLatestDeploymentQueryCount.get() shouldEqual 2 * n + } + + test("getStatusDetails - should perform 2 db queries for N periodic processes when fetching all at once") { + val f = new Fixture + val n = 10 + + f.delegateDeploymentManagerStub.setEmptyStateStatus() + + for (i <- 1 to n) { + val deploymentId = f.repository.addActiveProcess(processName(i), PeriodicProcessDeploymentStatus.Deployed) + f.delegateDeploymentManagerStub.addStateStatus(processName(i), SimpleStateStatus.Running, Some(deploymentId)) + } + + getLatestDeploymentQueryCount.set(0) + + implicit val freshnessPolicy: DataFreshnessPolicy = DataFreshnessPolicy.Fresh + + val statuses = f.periodicProcessService.stateQueryForAllScenariosSupport + .asInstanceOf[StateQueryForAllScenariosSupported] + .getAllProcessesStates() + .futureValue + .value + + statuses.size shouldEqual n + + getLatestDeploymentQueryCount.get() shouldEqual 2 + } + +} diff --git a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/db/InMemPeriodicProcessesRepository.scala b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/db/InMemPeriodicProcessesRepository.scala index 8c4b39c08e0..2ebb103de28 100644 --- a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/db/InMemPeriodicProcessesRepository.scala +++ b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/db/InMemPeriodicProcessesRepository.scala @@ -5,11 +5,11 @@ import io.circe.syntax.EncoderOps import pl.touk.nussknacker.engine.api.deployment.ProcessActionId import pl.touk.nussknacker.engine.api.process.{ProcessName, VersionId} import pl.touk.nussknacker.engine.build.ScenarioBuilder -import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import 
pl.touk.nussknacker.engine.management.periodic._ import pl.touk.nussknacker.engine.management.periodic.db.InMemPeriodicProcessesRepository.{ DeploymentIdSequence, - ProcessIdSequence + ProcessIdSequence, + getLatestDeploymentQueryCount } import pl.touk.nussknacker.engine.management.periodic.db.PeriodicProcessesRepository.createPeriodicProcessDeployment import pl.touk.nussknacker.engine.management.periodic.model.DeploymentWithJarData.WithCanonicalProcess @@ -27,6 +27,8 @@ import scala.util.Random object InMemPeriodicProcessesRepository { private val ProcessIdSequence = new AtomicLong(0) private val DeploymentIdSequence = new AtomicLong(0) + + val getLatestDeploymentQueryCount = new AtomicLong(0) } class InMemPeriodicProcessesRepository(processingType: String) extends PeriodicProcessesRepository { @@ -167,22 +169,50 @@ class InMemPeriodicProcessesRepository(processingType: String) extends PeriodicP override def getLatestDeploymentsForActiveSchedules( processName: ProcessName, deploymentsPerScheduleMaxCount: Int - ): Action[SchedulesState] = + ): Action[SchedulesState] = { + getLatestDeploymentQueryCount.incrementAndGet() getLatestDeploymentsForPeriodicProcesses( processEntities(processName).filter(_.active), deploymentsPerScheduleMaxCount ) + } + + override def getLatestDeploymentsForActiveSchedules( + deploymentsPerScheduleMaxCount: Int + ): Action[Map[ProcessName, SchedulesState]] = { + getLatestDeploymentQueryCount.incrementAndGet() + allProcessEntities.map { case (processName, list) => + processName -> getLatestDeploymentsForPeriodicProcesses( + list.filter(_.active), + deploymentsPerScheduleMaxCount + ) + } + } override def getLatestDeploymentsForLatestInactiveSchedules( processName: ProcessName, inactiveProcessesMaxCount: Int, deploymentsPerScheduleMaxCount: Int ): Action[SchedulesState] = { + getLatestDeploymentQueryCount.incrementAndGet() val filteredProcesses = processEntities(processName).filterNot(_.active).sortBy(_.createdAt).takeRight(inactiveProcessesMaxCount) getLatestDeploymentsForPeriodicProcesses(filteredProcesses, deploymentsPerScheduleMaxCount) } + override def getLatestDeploymentsForLatestInactiveSchedules( + inactiveProcessesMaxCount: Int, + deploymentsPerScheduleMaxCount: Int + ): Action[Map[ProcessName, SchedulesState]] = { + getLatestDeploymentQueryCount.incrementAndGet() + allProcessEntities.map { case (processName, list) => + processName -> getLatestDeploymentsForPeriodicProcesses( + list.filterNot(_.active).sortBy(_.createdAt).takeRight(inactiveProcessesMaxCount), + deploymentsPerScheduleMaxCount + ) + } + } + private def getLatestDeploymentsForPeriodicProcesses( processes: Seq[PeriodicProcessEntity], deploymentsPerScheduleMaxCount: Int @@ -224,6 +254,12 @@ class InMemPeriodicProcessesRepository(processingType: String) extends PeriodicP .filter(_.active) .map(PeriodicProcessesRepository.createPeriodicProcessWithJson) + private def allProcessEntities: Map[ProcessName, Seq[PeriodicProcessEntity]] = + processEntities + .filter(process => process.processingType == processingType) + .toSeq + .groupBy(_.processName) + private def processEntities(processName: ProcessName): Seq[PeriodicProcessEntityWithJson] = processEntities .filter(process => process.processName == processName && process.processingType == processingType) diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkRestManager.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkRestManager.scala index fae0d547781..3fef55f145e 100644 
--- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkRestManager.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkRestManager.scala @@ -5,12 +5,16 @@ import org.apache.flink.api.common.{JobID, JobStatus} import pl.touk.nussknacker.engine.api.ProcessVersion import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus -import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessIdWithName, ProcessName, VersionId} +import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessName, VersionId} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.{DeploymentId, ExternalDeploymentId} import pl.touk.nussknacker.engine.management.FlinkRestManager.ParsedJobConfig import pl.touk.nussknacker.engine.management.rest.FlinkClient -import pl.touk.nussknacker.engine.management.rest.flinkRestModel.BaseJobStatusCounts +import pl.touk.nussknacker.engine.management.rest.flinkRestModel.{BaseJobStatusCounts, JobOverview} +import pl.touk.nussknacker.engine.util.WithDataFreshnessStatusUtils.{ + WithDataFreshnessStatusMapOps, + WithDataFreshnessStatusOps +} import pl.touk.nussknacker.engine.{BaseModelData, DeploymentManagerDependencies, newdeployment} import scala.concurrent.Future @@ -33,37 +37,7 @@ class FlinkRestManager( override def getProcessStates( name: ProcessName )(implicit freshnessPolicy: DataFreshnessPolicy): Future[WithDataFreshnessStatus[List[StatusDetails]]] = { - val preparedName = modelData.namingStrategy.prepareName(name.value) - - client - .getJobsOverviews() - .flatMap(result => - Future - .sequence( - result.value - .filter(_.name == preparedName) - .map(job => - withParsedJobConfig(job.jid, name).map { jobConfig => - // TODO: return error when there's no correct version in process - // currently we're rather lax on this, so that this change is backward-compatible - // we log debug here for now, since it's invoked v. often - if (jobConfig.isEmpty) { - logger.debug(s"No correct job details in deployed scenario: ${job.name}") - } - StatusDetails( - SimpleStateStatus.fromDeploymentStatus(toDeploymentStatus(job.state, job.tasks)), - jobConfig.flatMap(_.deploymentId), - Some(ExternalDeploymentId(job.jid)), - version = jobConfig.map(_.version), - startTime = Some(job.`start-time`), - attributes = Option.empty, - errors = List.empty - ) - } - ) - ) - .map(WithDataFreshnessStatus(_, cached = result.cached)) // TODO: How to do it nicer? - ) + getAllProcessesStatesFromFlink().map(_.getOrElse(name, List.empty)) } override val deploymentSynchronisationSupport: DeploymentSynchronisationSupport = @@ -88,6 +62,62 @@ class FlinkRestManager( } + override def stateQueryForAllScenariosSupport: StateQueryForAllScenariosSupport = + new StateQueryForAllScenariosSupported { + + override def getAllProcessesStates()( + implicit freshnessPolicy: DataFreshnessPolicy + ): Future[WithDataFreshnessStatus[Map[ProcessName, List[StatusDetails]]]] = getAllProcessesStatesFromFlink() + + } + + private def getAllProcessesStatesFromFlink()( + implicit freshnessPolicy: DataFreshnessPolicy + ): Future[WithDataFreshnessStatus[Map[ProcessName, List[StatusDetails]]]] = { + client + .getJobsOverviews() + .flatMap { result => + statusDetailsFromJobOverviews(result.value).map( + WithDataFreshnessStatus(_, cached = result.cached) + ) // TODO: How to do it nicer? 
+ } + } + + private def statusDetailsFromJobOverviews( + jobOverviews: List[JobOverview] + ): Future[Map[ProcessName, List[StatusDetails]]] = Future + .sequence { + jobOverviews + .groupBy(_.name) + .flatMap { case (name, jobs) => + modelData.namingStrategy.decodeName(name).map(decoded => (ProcessName(decoded), jobs)) + } + .map { case (name, jobs) => + val statusDetails = jobs.map { job => + withParsedJobConfig(job.jid, name).map { jobConfig => + // TODO: return error when there's no correct version in process + // currently we're rather lax on this, so that this change is backward-compatible + // we log debug here for now, since it's invoked v. often + if (jobConfig.isEmpty) { + logger.debug(s"No correct job details in deployed scenario: ${job.name}") + } + StatusDetails( + SimpleStateStatus.fromDeploymentStatus(toDeploymentStatus(job.state, job.tasks)), + jobConfig.flatMap(_.deploymentId), + Some(ExternalDeploymentId(job.jid)), + version = jobConfig.map(_.version), + startTime = Some(job.`start-time`), + attributes = Option.empty, + errors = List.empty + ) + } + } + Future.sequence(statusDetails).map((name, _)) + } + + } + .map(_.toMap) + // NOTE: Flink <1.10 compatibility - protected to make it easier to work with Flink 1.9, JobStatus changed package, so we use String in case class protected def toDeploymentStatus(jobState: String, jobStatusCounts: BaseJobStatusCounts): DeploymentStatus = { toJobStatus(jobState) match { diff --git a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkStreamingDeploymentManagerProvider.scala b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkStreamingDeploymentManagerProvider.scala index fbdc8b5df8a..fdc3eef46ac 100644 --- a/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkStreamingDeploymentManagerProvider.scala +++ b/engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/FlinkStreamingDeploymentManagerProvider.scala @@ -6,7 +6,7 @@ import pl.touk.nussknacker.engine._ import pl.touk.nussknacker.engine.api.StreamMetaData import pl.touk.nussknacker.engine.api.component.ScenarioPropertyConfig import pl.touk.nussknacker.engine.api.definition._ -import pl.touk.nussknacker.engine.api.deployment.DeploymentManager +import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.deployment.cache.CachingProcessStateDeploymentManager import pl.touk.nussknacker.engine.deployment.EngineSetupName import pl.touk.nussknacker.engine.management.FlinkConfig.RestUrlPath @@ -26,14 +26,21 @@ class FlinkStreamingDeploymentManagerProvider extends DeploymentManagerProvider dependencies: DeploymentManagerDependencies, deploymentConfig: Config, scenarioStateCacheTTL: Option[FiniteDuration] + ): ValidatedNel[String, DeploymentManager] = { + createDeploymentManagerWithCapabilities(modelData, dependencies, deploymentConfig, scenarioStateCacheTTL) + } + + def createDeploymentManagerWithCapabilities( + modelData: BaseModelData, + dependencies: DeploymentManagerDependencies, + deploymentConfig: Config, + scenarioStateCacheTTL: Option[FiniteDuration] ): ValidatedNel[String, DeploymentManager] = { import dependencies._ val flinkConfig = deploymentConfig.rootAs[FlinkConfig] FlinkClient.create(flinkConfig, scenarioStateCacheTTL).map { client => - CachingProcessStateDeploymentManager.wrapWithCachingIfNeeded( - new FlinkStreamingRestManager(client, flinkConfig, modelData, dependencies), - scenarioStateCacheTTL - ) + val underlying = new 
FlinkStreamingRestManager(client, flinkConfig, modelData, dependencies) + CachingProcessStateDeploymentManager.wrapWithCachingIfNeeded(underlying, scenarioStateCacheTTL) } } diff --git a/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedDeploymentManager.scala b/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedDeploymentManager.scala index 76862770f0d..ca3839d100f 100644 --- a/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedDeploymentManager.scala +++ b/engine/lite/embeddedDeploymentManager/src/main/scala/pl/touk/nussknacker/engine/embedded/EmbeddedDeploymentManager.scala @@ -255,6 +255,8 @@ class EmbeddedDeploymentManager( } + override def stateQueryForAllScenariosSupport: StateQueryForAllScenariosSupport = NoStateQueryForAllScenariosSupport + override def processStateDefinitionManager: ProcessStateDefinitionManager = EmbeddedProcessStateDefinitionManager override def close(): Unit = { diff --git a/engine/lite/k8sDeploymentManager/src/main/scala/pl/touk/nussknacker/k8s/manager/K8sDeploymentManager.scala b/engine/lite/k8sDeploymentManager/src/main/scala/pl/touk/nussknacker/k8s/manager/K8sDeploymentManager.scala index 85e68aa9e04..a479ce85402 100644 --- a/engine/lite/k8sDeploymentManager/src/main/scala/pl/touk/nussknacker/k8s/manager/K8sDeploymentManager.scala +++ b/engine/lite/k8sDeploymentManager/src/main/scala/pl/touk/nussknacker/k8s/manager/K8sDeploymentManager.scala @@ -387,6 +387,8 @@ class K8sDeploymentManager( // for each scenario in this case and where store the deploymentId override def deploymentSynchronisationSupport: DeploymentSynchronisationSupport = NoDeploymentSynchronisationSupport + override def stateQueryForAllScenariosSupport: StateQueryForAllScenariosSupport = NoStateQueryForAllScenariosSupport + } object K8sDeploymentManager {
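The test updates above now pass a ProcessName into setStateStatus/addStateStatus and read statuses back with jobStatus.get(processName).flatMap(_.headOption). The DeploymentManagerStub itself is not part of this excerpt, so the following is only a sketch of per-scenario status storage that would satisfy those call sites; the class name, the DeploymentId conversion and the PeriodicProcessDeploymentId import path are assumptions, not code from this change.

import pl.touk.nussknacker.engine.api.deployment.{StateStatus, StatusDetails}
import pl.touk.nussknacker.engine.api.process.ProcessName
import pl.touk.nussknacker.engine.deployment.DeploymentId
import pl.touk.nussknacker.engine.management.periodic.model.PeriodicProcessDeploymentId

// Sketch only: statuses kept per scenario name, newest first.
class PerProcessStatusStubSketch {

  var jobStatus: Map[ProcessName, List[StatusDetails]] = Map.empty

  def setEmptyStateStatus(): Unit =
    jobStatus = Map.empty

  // replace everything stored for the given scenario with a single status
  def setStateStatus(name: ProcessName, status: StateStatus, deploymentId: Option[PeriodicProcessDeploymentId]): Unit =
    jobStatus = Map(name -> List(details(status, deploymentId)))

  // prepend a status for the given scenario, keeping the statuses of other scenarios
  def addStateStatus(name: ProcessName, status: StateStatus, deploymentId: Option[PeriodicProcessDeploymentId]): Unit =
    jobStatus = jobStatus.updated(name, details(status, deploymentId) :: jobStatus.getOrElse(name, Nil))

  private def details(status: StateStatus, deploymentId: Option[PeriodicProcessDeploymentId]): StatusDetails =
    StatusDetails(
      status,
      deploymentId.map(id => DeploymentId(id.toString)), // assumed conversion
      None,
      version = None,
      startTime = None,
      attributes = None,
      errors = List.empty
    )

}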
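FlinkRestManager above imports WithDataFreshnessStatusMapOps and WithDataFreshnessStatusOps from pl.touk.nussknacker.engine.util.WithDataFreshnessStatusUtils and relies on .getOrElse(name, List.empty) applied to a WithDataFreshnessStatus[Map[ProcessName, List[StatusDetails]]]. The utility itself is not shown in this excerpt; extension methods along these lines would satisfy that call site (a sketch, assuming WithDataFreshnessStatus lives in the api.deployment package and exposes map):

package pl.touk.nussknacker.engine.util

import pl.touk.nussknacker.engine.api.deployment.WithDataFreshnessStatus

object WithDataFreshnessStatusUtils {

  implicit class WithDataFreshnessStatusOps[A](wrapped: WithDataFreshnessStatus[A]) {
    // swap the carried value, preserving the cached/fresh flag
    // (imported in FlinkRestManager, but its call site is outside this excerpt, so the shape is a guess)
    def withValue[B](value: B): WithDataFreshnessStatus[B] = wrapped.map(_ => value)
  }

  implicit class WithDataFreshnessStatusMapOps[K, V](wrapped: WithDataFreshnessStatus[Map[K, V]]) {
    // look a key up inside the wrapped map, preserving the cached/fresh flag
    def getOrElse(key: K, default: => V): WithDataFreshnessStatus[V] =
      wrapped.map(_.getOrElse(key, default))
  }

}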
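Callers can branch on the new stateQueryForAllScenariosSupport flag: one getAllProcessesStates() round trip when the manager supports it (as the Flink and periodic implementations above do), and one getProcessStates call per scenario otherwise (as Embedded and K8s, which declare NoStateQueryForAllScenariosSupport). A minimal caller-side sketch; statesForScenarios and fetchSingle are illustrative names, not part of this change:

import scala.concurrent.{ExecutionContext, Future}

import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.process.ProcessName

object BulkStateQueryExample {

  def statesForScenarios(
      manager: DeploymentManager,
      names: List[ProcessName],
      fetchSingle: ProcessName => Future[WithDataFreshnessStatus[List[StatusDetails]]]
  )(implicit freshnessPolicy: DataFreshnessPolicy, ec: ExecutionContext): Future[Map[ProcessName, List[StatusDetails]]] =
    manager.stateQueryForAllScenariosSupport match {
      case supported: StateQueryForAllScenariosSupported =>
        // one bulk query; scenarios absent from the result get an empty status list
        supported.getAllProcessesStates().map { all =>
          names.map(name => name -> all.value.getOrElse(name, List.empty)).toMap
        }
      case NoStateQueryForAllScenariosSupport =>
        // fall back to one query per scenario
        Future
          .sequence(names.map(name => fetchSingle(name).map(s => name -> s.value)))
          .map(_.toMap)
    }

}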