Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fetch status of all scenarios in DM in a single operation instead of separately for each scenario. #7295

Merged
merged 22 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import pl.touk.nussknacker.engine.api.deployment.inconsistency.InconsistentState
import pl.touk.nussknacker.engine.api.process.{ProcessIdWithName, ProcessName, VersionId}
import pl.touk.nussknacker.engine.deployment.CustomActionDefinition
import pl.touk.nussknacker.engine.newdeployment
import pl.touk.nussknacker.engine.util.WithDataFreshnessStatusUtils.WithDataFreshnessStatusOps

import java.time.Instant
import scala.concurrent.ExecutionContext.Implicits._
Expand Down Expand Up @@ -46,6 +47,8 @@ trait DeploymentManager extends AutoCloseable {

def deploymentSynchronisationSupport: DeploymentSynchronisationSupport

def stateQueryForAllScenariosSupport: StateQueryForAllScenariosSupport

def processCommand[Result](command: DMScenarioCommand[Result]): Future[Result]

final def getProcessState(
Expand All @@ -66,7 +69,7 @@ trait DeploymentManager extends AutoCloseable {
latestVersionId,
deployedVersionId,
currentlyPresentedVersionId,
).map(state => statusDetailsWithFreshness.map(_ => state))
).map(statusDetailsWithFreshness.withValue)
} yield stateWithFreshness
}

Expand Down Expand Up @@ -109,6 +112,18 @@ trait ManagerSpecificScenarioActivitiesStoredByManager { self: DeploymentManager

}

sealed trait StateQueryForAllScenariosSupport

trait StateQueryForAllScenariosSupported extends StateQueryForAllScenariosSupport {

def getAllProcessesStates()(
implicit freshnessPolicy: DataFreshnessPolicy
): Future[WithDataFreshnessStatus[Map[ProcessName, List[StatusDetails]]]]

}

case object NoStateQueryForAllScenariosSupport extends StateQueryForAllScenariosSupport

sealed trait DeploymentSynchronisationSupport

trait DeploymentSynchronisationSupported extends DeploymentSynchronisationSupport {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import scala.concurrent.duration._
class CachingProcessStateDeploymentManager(
delegate: DeploymentManager,
cacheTTL: FiniteDuration,
override val deploymentSynchronisationSupport: DeploymentSynchronisationSupport
override val deploymentSynchronisationSupport: DeploymentSynchronisationSupport,
override val stateQueryForAllScenariosSupport: StateQueryForAllScenariosSupport,
) extends DeploymentManager {

private val cache: AsyncCache[ProcessName, List[StatusDetails]] = Caffeine
Expand Down Expand Up @@ -81,7 +82,12 @@ object CachingProcessStateDeploymentManager extends LazyLogging {
scenarioStateCacheTTL
.map { cacheTTL =>
logger.debug(s"Wrapping DeploymentManager: $delegate with caching mechanism with TTL: $cacheTTL")
new CachingProcessStateDeploymentManager(delegate, cacheTTL, delegate.deploymentSynchronisationSupport)
new CachingProcessStateDeploymentManager(
delegate,
cacheTTL,
delegate.deploymentSynchronisationSupport,
delegate.stateQueryForAllScenariosSupport
)
}
.getOrElse {
logger.debug(s"Skipping ProcessState caching for DeploymentManager: $delegate")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ class DeploymentManagerStub extends BaseDeploymentManager with StubbingCommands

override def deploymentSynchronisationSupport: DeploymentSynchronisationSupport = NoDeploymentSynchronisationSupport

override def stateQueryForAllScenariosSupport: StateQueryForAllScenariosSupport = NoStateQueryForAllScenariosSupport

override def close(): Unit = {}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package pl.touk.nussknacker.engine.util

import pl.touk.nussknacker.engine.api.deployment.WithDataFreshnessStatus

object WithDataFreshnessStatusUtils {

implicit class WithDataFreshnessStatusMapOps[K, V](withDataFreshnessStatus: WithDataFreshnessStatus[Map[K, V]]) {

def get(k: K): Option[WithDataFreshnessStatus[V]] = withDataFreshnessStatus.map(_.get(k)) match {
case WithDataFreshnessStatus(Some(value), cached) => Some(WithDataFreshnessStatus(value, cached))
case WithDataFreshnessStatus(None, _) => None
}

def getOrElse(k: K, orElse: V): WithDataFreshnessStatus[V] = {
withDataFreshnessStatus.map(_.get(k)) match {
case WithDataFreshnessStatus(Some(value), cached) => WithDataFreshnessStatus(value, cached)
case WithDataFreshnessStatus(None, cached) => WithDataFreshnessStatus(orElse, cached)
}
}

}

implicit class WithDataFreshnessStatusOps[A, B](scenarioActivity: WithDataFreshnessStatus[A]) {

def withValue(v: B): WithDataFreshnessStatus[B] = {
scenarioActivity.map(_ => v)
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import org.scalatest.OptionValues
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import org.scalatestplus.mockito.MockitoSugar
import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus
import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus
import pl.touk.nussknacker.engine.api.process.ProcessName
import pl.touk.nussknacker.engine.deployment.ExternalDeploymentId
import pl.touk.nussknacker.test.PatientScalaFutures
Expand All @@ -26,8 +26,12 @@ class CachingProcessStateDeploymentManagerSpec

test("should ask delegate for a fresh state each time") {
val delegate = prepareDMReturningRandomStates
val cachingManager =
new CachingProcessStateDeploymentManager(delegate, 10 seconds, NoDeploymentSynchronisationSupport)
val cachingManager = new CachingProcessStateDeploymentManager(
delegate,
10 seconds,
NoDeploymentSynchronisationSupport,
NoStateQueryForAllScenariosSupport
)

val results = List(
cachingManager.getProcessStatesDeploymentIdNow(DataFreshnessPolicy.Fresh),
Expand All @@ -41,8 +45,12 @@ class CachingProcessStateDeploymentManagerSpec

test("should cache state for DataFreshnessPolicy.CanBeCached") {
val delegate = prepareDMReturningRandomStates
val cachingManager =
new CachingProcessStateDeploymentManager(delegate, 10 seconds, NoDeploymentSynchronisationSupport)
val cachingManager = new CachingProcessStateDeploymentManager(
delegate,
10 seconds,
NoDeploymentSynchronisationSupport,
NoStateQueryForAllScenariosSupport
)

val firstInvocation = cachingManager.getProcessStatesDeploymentIdNow(DataFreshnessPolicy.CanBeCached)
firstInvocation.cached shouldBe false
Expand All @@ -55,8 +63,12 @@ class CachingProcessStateDeploymentManagerSpec

test("should reuse state updated by DataFreshnessPolicy.Fresh during reading with DataFreshnessPolicy.CanBeCached") {
val delegate = prepareDMReturningRandomStates
val cachingManager =
new CachingProcessStateDeploymentManager(delegate, 10 seconds, NoDeploymentSynchronisationSupport)
val cachingManager = new CachingProcessStateDeploymentManager(
delegate,
10 seconds,
NoDeploymentSynchronisationSupport,
NoStateQueryForAllScenariosSupport
)

val resultForFresh = cachingManager.getProcessStatesDeploymentIdNow(DataFreshnessPolicy.Fresh)
resultForFresh.cached shouldBe false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import pl.touk.nussknacker.restmodel.scenariodetails.ScenarioWithDetails
import pl.touk.nussknacker.ui.process.repository.ScenarioWithDetailsEntity
import pl.touk.nussknacker.ui.security.api.LoggedUser

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.Future
import scala.language.higherKinds

trait ProcessStateProvider {
Expand Down
Loading
Loading