diff --git a/engine/flink/management/periodic/README.md b/engine/flink/management/periodic/README.md index 9c0c0cf09bf..caa2148a016 100644 --- a/engine/flink/management/periodic/README.md +++ b/engine/flink/management/periodic/README.md @@ -31,3 +31,4 @@ Use `deploymentManager` with the following properties: - `deployMaxRetries` - maximum amount of retries for failed deployment. - `deployRetryPenalize` - an amount of time by which the next retry should be delayed. - `jarsDir` - directory for jars storage. +- `maxFetchedPeriodicScenarioActivities` - optional, maximum number of latest ScenarioActivities that will be fetched, by default 200 diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicBatchConfig.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicBatchConfig.scala index 3f79c3443fe..61b8f9bb1ac 100644 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicBatchConfig.scala +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicBatchConfig.scala @@ -13,6 +13,7 @@ import scala.concurrent.duration._ * @param deployInterval {@link DeploymentActor} check interval. * @param deploymentRetry {@link DeploymentRetryConfig} for deployment failure recovery. * @param jarsDir Directory for jars storage. + * @param maxFetchedPeriodicScenarioActivities Optional limit of number of latest periodic-related Scenario Activities that are returned by Periodic DM. */ case class PeriodicBatchConfig( db: Config, @@ -21,7 +22,8 @@ case class PeriodicBatchConfig( deployInterval: FiniteDuration = 17 seconds, deploymentRetry: DeploymentRetryConfig, executionConfig: PeriodicExecutionConfig, - jarsDir: String + jarsDir: String, + maxFetchedPeriodicScenarioActivities: Option[Int] = Some(200), ) /** diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManager.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManager.scala index 42b87ba0352..9ad4fdf8342 100644 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManager.scala +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManager.scala @@ -60,6 +60,7 @@ object PeriodicDeploymentManager { additionalDeploymentDataProvider, periodicBatchConfig.deploymentRetry, periodicBatchConfig.executionConfig, + periodicBatchConfig.maxFetchedPeriodicScenarioActivities, processConfigEnricher, clock, dependencies.actionService, diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala index 6fca6e85dbd..6e48530d7c6 100644 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessService.scala @@ -48,6 +48,7 @@ class PeriodicProcessService( additionalDeploymentDataProvider: AdditionalDeploymentDataProvider, deploymentRetryConfig: DeploymentRetryConfig, executionConfig: PeriodicExecutionConfig, + maxFetchedPeriodicScenarioActivities: Option[Int], processConfigEnricher: ProcessConfigEnricher, clock: Clock, actionService: ProcessingTypeActionService, @@ -99,7 +100,11 @@ class PeriodicProcessService( retriesLeft = deployment.nextRetryAt.map(_ => deployment.retriesLeft), ) } - } yield activities + limitedActivities = maxFetchedPeriodicScenarioActivities match { + case Some(limit) => activities.sortBy(_.date).takeRight(limit) + case None => activities + } + } yield limitedActivities def schedule( schedule: ScheduleProperty, diff --git a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManagerTest.scala b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManagerTest.scala index bae299e53fd..d4310d1b5ec 100644 --- a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManagerTest.scala +++ b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManagerTest.scala @@ -72,6 +72,7 @@ class PeriodicDeploymentManagerTest additionalDeploymentDataProvider = DefaultAdditionalDeploymentDataProvider, deploymentRetryConfig = DeploymentRetryConfig(), executionConfig = executionConfig, + maxFetchedPeriodicScenarioActivities = None, processConfigEnricher = ProcessConfigEnricher.identity, clock = Clock.systemDefaultZone(), new ProcessingTypeActionServiceStub, diff --git a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceIntegrationTest.scala b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceIntegrationTest.scala index e8b05eb53c8..e7807319126 100644 --- a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceIntegrationTest.scala +++ b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceIntegrationTest.scala @@ -13,21 +13,10 @@ import org.scalatest.concurrent.ScalaFutures import org.scalatest.exceptions.TestFailedException import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers -import org.scalatest.time.{Millis, Seconds, Span} import org.testcontainers.utility.DockerImageName +import pl.touk.nussknacker.engine.api.deployment._ import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus -import pl.touk.nussknacker.engine.api.deployment.{ - DataFreshnessPolicy, - ProcessActionId, - ProcessingTypeActionServiceStub, - ScenarioActivity, - ScenarioId, - ScenarioUser, - ScenarioVersionId, - ScheduledExecutionStatus, - UserName -} import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessIdWithName, ProcessName} import pl.touk.nussknacker.engine.api.{MetaData, ProcessVersion, StreamMetaData} import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess @@ -82,7 +71,8 @@ class PeriodicProcessServiceIntegrationTest def withFixture( deploymentRetryConfig: DeploymentRetryConfig = DeploymentRetryConfig(), - executionConfig: PeriodicExecutionConfig = PeriodicExecutionConfig() + executionConfig: PeriodicExecutionConfig = PeriodicExecutionConfig(), + maxFetchedPeriodicScenarioActivities: Option[Int] = None, )(testCode: Fixture => Any): Unit = { val postgresConfig = ConfigFactory.parseMap( Map( @@ -105,7 +95,9 @@ class PeriodicProcessServiceIntegrationTest def runTestCodeWithDbConfig(config: Config) = { val (db: jdbc.JdbcBackend.DatabaseDef, dbProfile: JdbcProfile) = DbInitializer.init(config) try { - testCode(new Fixture(db, dbProfile, deploymentRetryConfig, executionConfig)) + testCode( + new Fixture(db, dbProfile, deploymentRetryConfig, executionConfig, maxFetchedPeriodicScenarioActivities) + ) } finally { db.close() } @@ -120,7 +112,8 @@ class PeriodicProcessServiceIntegrationTest db: JdbcBackend.DatabaseDef, dbProfile: JdbcProfile, deploymentRetryConfig: DeploymentRetryConfig, - executionConfig: PeriodicExecutionConfig + executionConfig: PeriodicExecutionConfig, + maxFetchedPeriodicScenarioActivities: Option[Int], ) { val delegateDeploymentManagerStub = new DeploymentManagerStub val jarManagerStub = new JarManagerStub @@ -144,6 +137,7 @@ class PeriodicProcessServiceIntegrationTest additionalDeploymentDataProvider = DefaultAdditionalDeploymentDataProvider, deploymentRetryConfig = deploymentRetryConfig, executionConfig = executionConfig, + maxFetchedPeriodicScenarioActivities = maxFetchedPeriodicScenarioActivities, processConfigEnricher = ProcessConfigEnricher.identity, clock = fixedClock(currentTime), new ProcessingTypeActionServiceStub, @@ -451,6 +445,66 @@ class PeriodicProcessServiceIntegrationTest } it should "handle multiple one time schedules" in withFixture() { f => + handleMultipleOneTimeSchedules(f) + def service = f.periodicProcessService(startTime) + val activities = service.getScenarioActivitiesSpecificToPeriodicProcess(processIdWithName, None).futureValue + val firstActivity = activities.head.asInstanceOf[ScenarioActivity.PerformedScheduledExecution] + val secondActivity = activities(1).asInstanceOf[ScenarioActivity.PerformedScheduledExecution] + activities shouldBe List( + ScenarioActivity.PerformedScheduledExecution( + scenarioId = ScenarioId(1), + scenarioActivityId = firstActivity.scenarioActivityId, + user = ScenarioUser(None, UserName("Nussknacker"), None, None), + date = firstActivity.date, + scenarioVersionId = Some(ScenarioVersionId(1)), + dateFinished = firstActivity.dateFinished, + scheduleName = "schedule1", + scheduledExecutionStatus = ScheduledExecutionStatus.Finished, + createdAt = firstActivity.createdAt, + retriesLeft = None, + nextRetryAt = None + ), + ScenarioActivity.PerformedScheduledExecution( + scenarioId = ScenarioId(1), + scenarioActivityId = secondActivity.scenarioActivityId, + user = ScenarioUser(None, UserName("Nussknacker"), None, None), + date = secondActivity.date, + scenarioVersionId = Some(ScenarioVersionId(1)), + dateFinished = secondActivity.dateFinished, + scheduleName = "schedule2", + scheduledExecutionStatus = ScheduledExecutionStatus.Finished, + createdAt = secondActivity.createdAt, + retriesLeft = None, + nextRetryAt = None + ), + ) + } + + it should "handle multiple one time schedules and return only latest activities" in withFixture( + maxFetchedPeriodicScenarioActivities = Some(1) + ) { f => + handleMultipleOneTimeSchedules(f) + def service = f.periodicProcessService(startTime) + val activities = service.getScenarioActivitiesSpecificToPeriodicProcess(processIdWithName, None).futureValue + val firstActivity = activities.head.asInstanceOf[ScenarioActivity.PerformedScheduledExecution] + activities shouldBe List( + ScenarioActivity.PerformedScheduledExecution( + scenarioId = ScenarioId(1), + scenarioActivityId = firstActivity.scenarioActivityId, + user = ScenarioUser(None, UserName("Nussknacker"), None, None), + date = firstActivity.date, + scenarioVersionId = Some(ScenarioVersionId(1)), + dateFinished = firstActivity.dateFinished, + scheduleName = "schedule2", + scheduledExecutionStatus = ScheduledExecutionStatus.Finished, + createdAt = firstActivity.createdAt, + retriesLeft = None, + nextRetryAt = None + ), + ) + } + + private def handleMultipleOneTimeSchedules(f: Fixture) = { var currentTime = startTime def service = f.periodicProcessService(currentTime) val timeToTriggerSchedule1 = startTime.plus(1, ChronoUnit.HOURS) @@ -549,37 +603,6 @@ class PeriodicProcessServiceIntegrationTest inactiveStates.latestDeploymentForSchedule(schedule1).state.status shouldBe PeriodicProcessDeploymentStatus.Finished inactiveStates.latestDeploymentForSchedule(schedule2).state.status shouldBe PeriodicProcessDeploymentStatus.Finished - val activities = service.getScenarioActivitiesSpecificToPeriodicProcess(processIdWithName, None).futureValue - val firstActivity = activities.head.asInstanceOf[ScenarioActivity.PerformedScheduledExecution] - val secondActivity = activities(1).asInstanceOf[ScenarioActivity.PerformedScheduledExecution] - activities shouldBe List( - ScenarioActivity.PerformedScheduledExecution( - scenarioId = ScenarioId(1), - scenarioActivityId = firstActivity.scenarioActivityId, - user = ScenarioUser(None, UserName("Nussknacker"), None, None), - date = firstActivity.date, - scenarioVersionId = Some(ScenarioVersionId(1)), - dateFinished = firstActivity.dateFinished, - scheduleName = "schedule1", - scheduledExecutionStatus = ScheduledExecutionStatus.Finished, - createdAt = firstActivity.createdAt, - retriesLeft = None, - nextRetryAt = None - ), - ScenarioActivity.PerformedScheduledExecution( - scenarioId = ScenarioId(1), - scenarioActivityId = secondActivity.scenarioActivityId, - user = ScenarioUser(None, UserName("Nussknacker"), None, None), - date = secondActivity.date, - scenarioVersionId = Some(ScenarioVersionId(1)), - dateFinished = secondActivity.dateFinished, - scheduleName = "schedule2", - scheduledExecutionStatus = ScheduledExecutionStatus.Finished, - createdAt = secondActivity.createdAt, - retriesLeft = None, - nextRetryAt = None - ), - ) } it should "handle failed event handler" in withFixture() { f => diff --git a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceTest.scala b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceTest.scala index d35b65a0593..9b6569f972f 100644 --- a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceTest.scala +++ b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicProcessServiceTest.scala @@ -93,6 +93,7 @@ class PeriodicProcessServiceTest }, DeploymentRetryConfig(), PeriodicExecutionConfig(), + maxFetchedPeriodicScenarioActivities = None, new ProcessConfigEnricher { override def onInitialSchedule(