diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentService.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentService.scala index 4ce2a16a398..be38d1ec39f 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentService.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/process/deployment/DeploymentService.scala @@ -207,7 +207,7 @@ class DeploymentService( _ = checkIfCanPerformActionOnScenario(actionName, processDetails) // 1.7. check if action is allowed for current state inProgressActionNames <- actionRepository.getInProgressActionNames(processDetails.processId) - processState <- getProcessState(processDetails, inProgressActionNames, None, None) + processState <- getProcessStateFetchingStatusFromManager(processDetails, inProgressActionNames, None) _ = checkIfCanPerformActionInState(actionName, processDetails, processState) // 1.8. create new action, action is started with "in progress" state, the whole command execution can take some time actionId <- actionRepository.addInProgressAction( @@ -441,7 +441,11 @@ class DeploymentService( processDetailsOpt <- processRepository.fetchLatestProcessDetailsForProcessId[Unit](processIdWithName.id) processDetails <- existsOrFail(processDetailsOpt, ProcessNotFoundError(processIdWithName.name)) inProgressActionNames <- actionRepository.getInProgressActionNames(processDetails.processId) - result <- getProcessState(processDetails, inProgressActionNames, currentlyPresentedVersionId, None) + result <- getProcessStateFetchingStatusFromManager( + processDetails, + inProgressActionNames, + currentlyPresentedVersionId + ) } yield result) } @@ -450,7 +454,7 @@ class DeploymentService( )(implicit user: LoggedUser, freshnessPolicy: DataFreshnessPolicy): Future[ProcessState] = { dbioRunner.run(for { inProgressActionNames <- actionRepository.getInProgressActionNames(processDetails.processId) - result <- getProcessState(processDetails, inProgressActionNames, None, None) + result <- getProcessStateFetchingStatusFromManager(processDetails, inProgressActionNames, None) } yield result) } @@ -467,14 +471,26 @@ class DeploymentService( .map { case process if process.isFragment => DBIO.successful(process) case process => - val prefetchedStateForProcessOpt = - prefetchedStates.get(process.processingType).flatMap(_.get(process.name)) - getProcessState( - process.toEntity, - actionsInProgress.getOrElse(process.processIdUnsafe, Set.empty), - None, - prefetchedStateForProcessOpt, - ).map(state => process.copy(state = Some(state))) + val prefetchedStatusDetails = for { + prefetchedStatusDetailsForProcessingTypes <- prefetchedStates.get(process.processingType) + prefetchedStatusDetails <- prefetchedStatusDetailsForProcessingTypes.get(process.name) + } yield prefetchedStatusDetails + prefetchedStatusDetails match { + case Some(prefetchedStatusDetails) => + getProcessStateUsingPrefetchedStatus( + process.toEntity, + actionsInProgress.getOrElse(process.processIdUnsafe, Set.empty), + None, + prefetchedStatusDetails, + ).map(state => process.copy(state = Some(state))) + case None => + getProcessStateFetchingStatusFromManager( + process.toEntity, + actionsInProgress.getOrElse(process.processIdUnsafe, Set.empty), + None, + ).map(state => process.copy(state = Some(state))) + } + } .sequence[DB, ScenarioWithDetails] } yield processesWithState @@ -546,12 +562,57 @@ class DeploymentService( } } - private def getProcessState( + private def getProcessStateFetchingStatusFromManager( processDetails: ScenarioWithDetailsEntity[_], inProgressActionNames: Set[ScenarioActionName], currentlyPresentedVersionId: Option[VersionId], - prefetchedStatusDetailsWithFreshness: Option[WithDataFreshnessStatus[List[StatusDetails]]], )(implicit freshnessPolicy: DataFreshnessPolicy, user: LoggedUser): DB[ProcessState] = { + getProcessState( + processDetails, + inProgressActionNames, + currentlyPresentedVersionId, + manager => + getStateFromDeploymentManager( + manager, + processDetails.idWithName, + processDetails.lastStateAction, + processDetails.processVersionId, + processDetails.lastDeployedAction.map(_.processVersionId), + currentlyPresentedVersionId, + ) + ) + } + + private def getProcessStateUsingPrefetchedStatus( + processDetails: ScenarioWithDetailsEntity[_], + inProgressActionNames: Set[ScenarioActionName], + currentlyPresentedVersionId: Option[VersionId], + prefetchedStatusDetails: WithDataFreshnessStatus[List[StatusDetails]], + )(implicit user: LoggedUser): DB[ProcessState] = { + getProcessState( + processDetails, + inProgressActionNames, + currentlyPresentedVersionId, + manager => + manager + .resolve( + processDetails.idWithName, + prefetchedStatusDetails.value, + processDetails.lastStateAction, + processDetails.processVersionId, + processDetails.lastDeployedAction.map(_.processVersionId), + currentlyPresentedVersionId, + ) + .map(prefetchedStatusDetails.withValue) + ) + } + + private def getProcessState( + processDetails: ScenarioWithDetailsEntity[_], + inProgressActionNames: Set[ScenarioActionName], + currentlyPresentedVersionId: Option[VersionId], + fetchState: DeploymentManager => Future[WithDataFreshnessStatus[ProcessState]], + )(implicit user: LoggedUser): DB[ProcessState] = { val processVersionId = processDetails.processVersionId val deployedVersionId = processDetails.lastDeployedAction.map(_.processVersionId) dispatcher @@ -585,17 +646,7 @@ class DeploymentService( processDetails.lastStateAction match { case Some(_) => DBIOAction - .from( - getStateFromDeploymentManager( - manager, - processDetails.idWithName, - processDetails.lastStateAction, - processVersionId, - deployedVersionId, - currentlyPresentedVersionId, - prefetchedStatusDetailsWithFreshness, - ) - ) + .from(fetchState(manager)) .map { statusWithFreshness => logger.debug( s"Status for: '${processDetails.name}' is: ${statusWithFreshness.value.status}, cached: ${statusWithFreshness.cached}, last status action: ${processDetails.lastStateAction @@ -686,35 +737,22 @@ class DeploymentService( latestVersionId: VersionId, deployedVersionId: Option[VersionId], currentlyPresentedVersionId: Option[VersionId], - prefetchedStateOpt: Option[WithDataFreshnessStatus[List[StatusDetails]]], )( implicit freshnessPolicy: DataFreshnessPolicy ): Future[WithDataFreshnessStatus[ProcessState]] = { - val state = (prefetchedStateOpt match { - case Some(prefetchedState) => - deploymentManager - .resolve( - processIdWithName, - prefetchedState.value, - lastStateAction, - latestVersionId, - deployedVersionId, - currentlyPresentedVersionId, - ) - .map(prefetchedState.withValue) - case None => - deploymentManager.getProcessState( - processIdWithName, - lastStateAction, - latestVersionId, - deployedVersionId, - currentlyPresentedVersionId, - ) - }).recover { case NonFatal(e) => - logger.warn(s"Failed to get status of ${processIdWithName.name}: ${e.getMessage}", e) - failedToGetProcessState(latestVersionId) - } + val state = deploymentManager + .getProcessState( + processIdWithName, + lastStateAction, + latestVersionId, + deployedVersionId, + currentlyPresentedVersionId, + ) + .recover { case NonFatal(e) => + logger.warn(s"Failed to get status of ${processIdWithName.name}: ${e.getMessage}", e) + failedToGetProcessState(latestVersionId) + } scenarioStateTimeout .map { timeout =>