Skip to content

Commit

Permalink
review changes part 2
Browse files Browse the repository at this point in the history
  • Loading branch information
mgoworko committed Dec 19, 2024
1 parent 59d7ac8 commit 84c28ec
Showing 1 changed file with 87 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ class DeploymentService(
_ = checkIfCanPerformActionOnScenario(actionName, processDetails)
// 1.7. check if action is allowed for current state
inProgressActionNames <- actionRepository.getInProgressActionNames(processDetails.processId)
processState <- getProcessState(processDetails, inProgressActionNames, None, None)
processState <- getProcessStateFetchingStatusFromManager(processDetails, inProgressActionNames, None)
_ = checkIfCanPerformActionInState(actionName, processDetails, processState)
// 1.8. create new action, action is started with "in progress" state, the whole command execution can take some time
actionId <- actionRepository.addInProgressAction(
Expand Down Expand Up @@ -441,7 +441,11 @@ class DeploymentService(
processDetailsOpt <- processRepository.fetchLatestProcessDetailsForProcessId[Unit](processIdWithName.id)
processDetails <- existsOrFail(processDetailsOpt, ProcessNotFoundError(processIdWithName.name))
inProgressActionNames <- actionRepository.getInProgressActionNames(processDetails.processId)
result <- getProcessState(processDetails, inProgressActionNames, currentlyPresentedVersionId, None)
result <- getProcessStateFetchingStatusFromManager(
processDetails,
inProgressActionNames,
currentlyPresentedVersionId
)
} yield result)
}

Expand All @@ -450,7 +454,7 @@ class DeploymentService(
)(implicit user: LoggedUser, freshnessPolicy: DataFreshnessPolicy): Future[ProcessState] = {
dbioRunner.run(for {
inProgressActionNames <- actionRepository.getInProgressActionNames(processDetails.processId)
result <- getProcessState(processDetails, inProgressActionNames, None, None)
result <- getProcessStateFetchingStatusFromManager(processDetails, inProgressActionNames, None)
} yield result)
}

Expand All @@ -467,14 +471,26 @@ class DeploymentService(
.map {
case process if process.isFragment => DBIO.successful(process)
case process =>
val prefetchedStateForProcessOpt =
prefetchedStates.get(process.processingType).flatMap(_.get(process.name))
getProcessState(
process.toEntity,
actionsInProgress.getOrElse(process.processIdUnsafe, Set.empty),
None,
prefetchedStateForProcessOpt,
).map(state => process.copy(state = Some(state)))
val prefetchedStatusDetails = for {
prefetchedStatusDetailsForProcessingTypes <- prefetchedStates.get(process.processingType)
prefetchedStatusDetails <- prefetchedStatusDetailsForProcessingTypes.get(process.name)
} yield prefetchedStatusDetails
prefetchedStatusDetails match {
case Some(prefetchedStatusDetails) =>
getProcessStateUsingPrefetchedStatus(
process.toEntity,
actionsInProgress.getOrElse(process.processIdUnsafe, Set.empty),
None,
prefetchedStatusDetails,
).map(state => process.copy(state = Some(state)))
case None =>
getProcessStateFetchingStatusFromManager(
process.toEntity,
actionsInProgress.getOrElse(process.processIdUnsafe, Set.empty),
None,
).map(state => process.copy(state = Some(state)))
}

}
.sequence[DB, ScenarioWithDetails]
} yield processesWithState
Expand Down Expand Up @@ -546,12 +562,57 @@ class DeploymentService(
}
}

private def getProcessState(
private def getProcessStateFetchingStatusFromManager(
processDetails: ScenarioWithDetailsEntity[_],
inProgressActionNames: Set[ScenarioActionName],
currentlyPresentedVersionId: Option[VersionId],
prefetchedStatusDetailsWithFreshness: Option[WithDataFreshnessStatus[List[StatusDetails]]],
)(implicit freshnessPolicy: DataFreshnessPolicy, user: LoggedUser): DB[ProcessState] = {
getProcessState(
processDetails,
inProgressActionNames,
currentlyPresentedVersionId,
manager =>
getStateFromDeploymentManager(
manager,
processDetails.idWithName,
processDetails.lastStateAction,
processDetails.processVersionId,
processDetails.lastDeployedAction.map(_.processVersionId),
currentlyPresentedVersionId,
)
)
}

private def getProcessStateUsingPrefetchedStatus(
processDetails: ScenarioWithDetailsEntity[_],
inProgressActionNames: Set[ScenarioActionName],
currentlyPresentedVersionId: Option[VersionId],
prefetchedStatusDetails: WithDataFreshnessStatus[List[StatusDetails]],
)(implicit user: LoggedUser): DB[ProcessState] = {
getProcessState(
processDetails,
inProgressActionNames,
currentlyPresentedVersionId,
manager =>
manager
.resolve(
processDetails.idWithName,
prefetchedStatusDetails.value,
processDetails.lastStateAction,
processDetails.processVersionId,
processDetails.lastDeployedAction.map(_.processVersionId),
currentlyPresentedVersionId,
)
.map(prefetchedStatusDetails.withValue)
)
}

private def getProcessState(
processDetails: ScenarioWithDetailsEntity[_],
inProgressActionNames: Set[ScenarioActionName],
currentlyPresentedVersionId: Option[VersionId],
fetchState: DeploymentManager => Future[WithDataFreshnessStatus[ProcessState]],
)(implicit user: LoggedUser): DB[ProcessState] = {
val processVersionId = processDetails.processVersionId
val deployedVersionId = processDetails.lastDeployedAction.map(_.processVersionId)
dispatcher
Expand Down Expand Up @@ -585,17 +646,7 @@ class DeploymentService(
processDetails.lastStateAction match {
case Some(_) =>
DBIOAction
.from(
getStateFromDeploymentManager(
manager,
processDetails.idWithName,
processDetails.lastStateAction,
processVersionId,
deployedVersionId,
currentlyPresentedVersionId,
prefetchedStatusDetailsWithFreshness,
)
)
.from(fetchState(manager))
.map { statusWithFreshness =>
logger.debug(
s"Status for: '${processDetails.name}' is: ${statusWithFreshness.value.status}, cached: ${statusWithFreshness.cached}, last status action: ${processDetails.lastStateAction
Expand Down Expand Up @@ -686,35 +737,22 @@ class DeploymentService(
latestVersionId: VersionId,
deployedVersionId: Option[VersionId],
currentlyPresentedVersionId: Option[VersionId],
prefetchedStateOpt: Option[WithDataFreshnessStatus[List[StatusDetails]]],
)(
implicit freshnessPolicy: DataFreshnessPolicy
): Future[WithDataFreshnessStatus[ProcessState]] = {

val state = (prefetchedStateOpt match {
case Some(prefetchedState) =>
deploymentManager
.resolve(
processIdWithName,
prefetchedState.value,
lastStateAction,
latestVersionId,
deployedVersionId,
currentlyPresentedVersionId,
)
.map(prefetchedState.withValue)
case None =>
deploymentManager.getProcessState(
processIdWithName,
lastStateAction,
latestVersionId,
deployedVersionId,
currentlyPresentedVersionId,
)
}).recover { case NonFatal(e) =>
logger.warn(s"Failed to get status of ${processIdWithName.name}: ${e.getMessage}", e)
failedToGetProcessState(latestVersionId)
}
val state = deploymentManager
.getProcessState(
processIdWithName,
lastStateAction,
latestVersionId,
deployedVersionId,
currentlyPresentedVersionId,
)
.recover { case NonFatal(e) =>
logger.warn(s"Failed to get status of ${processIdWithName.name}: ${e.getMessage}", e)
failedToGetProcessState(latestVersionId)
}

scenarioStateTimeout
.map { timeout =>
Expand Down

0 comments on commit 84c28ec

Please sign in to comment.