Skip to content

Commit

Permalink
Fetch only latest scenario activities for periodic DM (#7344)
Browse files Browse the repository at this point in the history
  • Loading branch information
mgoworko authored Dec 17, 2024
1 parent 5069ac1 commit 2b7211f
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 48 deletions.
1 change: 1 addition & 0 deletions engine/flink/management/periodic/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,4 @@ Use `deploymentManager` with the following properties:
- `deployMaxRetries` - maximum amount of retries for failed deployment.
- `deployRetryPenalize` - an amount of time by which the next retry should be delayed.
- `jarsDir` - directory for jars storage.
- `maxFetchedPeriodicScenarioActivities` - optional, maximum number of latest ScenarioActivities that will be fetched, by default 200
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import scala.concurrent.duration._
* @param deployInterval {@link DeploymentActor} check interval.
* @param deploymentRetry {@link DeploymentRetryConfig} for deployment failure recovery.
* @param jarsDir Directory for jars storage.
* @param maxFetchedPeriodicScenarioActivities Optional limit of number of latest periodic-related Scenario Activities that are returned by Periodic DM.
*/
case class PeriodicBatchConfig(
db: Config,
Expand All @@ -21,7 +22,8 @@ case class PeriodicBatchConfig(
deployInterval: FiniteDuration = 17 seconds,
deploymentRetry: DeploymentRetryConfig,
executionConfig: PeriodicExecutionConfig,
jarsDir: String
jarsDir: String,
maxFetchedPeriodicScenarioActivities: Option[Int] = Some(200),
)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ object PeriodicDeploymentManager {
additionalDeploymentDataProvider,
periodicBatchConfig.deploymentRetry,
periodicBatchConfig.executionConfig,
periodicBatchConfig.maxFetchedPeriodicScenarioActivities,
processConfigEnricher,
clock,
dependencies.actionService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class PeriodicProcessService(
additionalDeploymentDataProvider: AdditionalDeploymentDataProvider,
deploymentRetryConfig: DeploymentRetryConfig,
executionConfig: PeriodicExecutionConfig,
maxFetchedPeriodicScenarioActivities: Option[Int],
processConfigEnricher: ProcessConfigEnricher,
clock: Clock,
actionService: ProcessingTypeActionService,
Expand Down Expand Up @@ -99,7 +100,11 @@ class PeriodicProcessService(
retriesLeft = deployment.nextRetryAt.map(_ => deployment.retriesLeft),
)
}
} yield activities
limitedActivities = maxFetchedPeriodicScenarioActivities match {
case Some(limit) => activities.sortBy(_.date).takeRight(limit)
case None => activities
}
} yield limitedActivities

def schedule(
schedule: ScheduleProperty,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class PeriodicDeploymentManagerTest
additionalDeploymentDataProvider = DefaultAdditionalDeploymentDataProvider,
deploymentRetryConfig = DeploymentRetryConfig(),
executionConfig = executionConfig,
maxFetchedPeriodicScenarioActivities = None,
processConfigEnricher = ProcessConfigEnricher.identity,
clock = Clock.systemDefaultZone(),
new ProcessingTypeActionServiceStub,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,10 @@ import org.scalatest.concurrent.ScalaFutures
import org.scalatest.exceptions.TestFailedException
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.time.{Millis, Seconds, Span}
import org.testcontainers.utility.DockerImageName
import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus
import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus
import pl.touk.nussknacker.engine.api.deployment.{
DataFreshnessPolicy,
ProcessActionId,
ProcessingTypeActionServiceStub,
ScenarioActivity,
ScenarioId,
ScenarioUser,
ScenarioVersionId,
ScheduledExecutionStatus,
UserName
}
import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessIdWithName, ProcessName}
import pl.touk.nussknacker.engine.api.{MetaData, ProcessVersion, StreamMetaData}
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
Expand Down Expand Up @@ -82,7 +71,8 @@ class PeriodicProcessServiceIntegrationTest

def withFixture(
deploymentRetryConfig: DeploymentRetryConfig = DeploymentRetryConfig(),
executionConfig: PeriodicExecutionConfig = PeriodicExecutionConfig()
executionConfig: PeriodicExecutionConfig = PeriodicExecutionConfig(),
maxFetchedPeriodicScenarioActivities: Option[Int] = None,
)(testCode: Fixture => Any): Unit = {
val postgresConfig = ConfigFactory.parseMap(
Map(
Expand All @@ -105,7 +95,9 @@ class PeriodicProcessServiceIntegrationTest
def runTestCodeWithDbConfig(config: Config) = {
val (db: jdbc.JdbcBackend.DatabaseDef, dbProfile: JdbcProfile) = DbInitializer.init(config)
try {
testCode(new Fixture(db, dbProfile, deploymentRetryConfig, executionConfig))
testCode(
new Fixture(db, dbProfile, deploymentRetryConfig, executionConfig, maxFetchedPeriodicScenarioActivities)
)
} finally {
db.close()
}
Expand All @@ -120,7 +112,8 @@ class PeriodicProcessServiceIntegrationTest
db: JdbcBackend.DatabaseDef,
dbProfile: JdbcProfile,
deploymentRetryConfig: DeploymentRetryConfig,
executionConfig: PeriodicExecutionConfig
executionConfig: PeriodicExecutionConfig,
maxFetchedPeriodicScenarioActivities: Option[Int],
) {
val delegateDeploymentManagerStub = new DeploymentManagerStub
val jarManagerStub = new JarManagerStub
Expand All @@ -144,6 +137,7 @@ class PeriodicProcessServiceIntegrationTest
additionalDeploymentDataProvider = DefaultAdditionalDeploymentDataProvider,
deploymentRetryConfig = deploymentRetryConfig,
executionConfig = executionConfig,
maxFetchedPeriodicScenarioActivities = maxFetchedPeriodicScenarioActivities,
processConfigEnricher = ProcessConfigEnricher.identity,
clock = fixedClock(currentTime),
new ProcessingTypeActionServiceStub,
Expand Down Expand Up @@ -451,6 +445,66 @@ class PeriodicProcessServiceIntegrationTest
}

it should "handle multiple one time schedules" in withFixture() { f =>
handleMultipleOneTimeSchedules(f)
def service = f.periodicProcessService(startTime)
val activities = service.getScenarioActivitiesSpecificToPeriodicProcess(processIdWithName, None).futureValue
val firstActivity = activities.head.asInstanceOf[ScenarioActivity.PerformedScheduledExecution]
val secondActivity = activities(1).asInstanceOf[ScenarioActivity.PerformedScheduledExecution]
activities shouldBe List(
ScenarioActivity.PerformedScheduledExecution(
scenarioId = ScenarioId(1),
scenarioActivityId = firstActivity.scenarioActivityId,
user = ScenarioUser(None, UserName("Nussknacker"), None, None),
date = firstActivity.date,
scenarioVersionId = Some(ScenarioVersionId(1)),
dateFinished = firstActivity.dateFinished,
scheduleName = "schedule1",
scheduledExecutionStatus = ScheduledExecutionStatus.Finished,
createdAt = firstActivity.createdAt,
retriesLeft = None,
nextRetryAt = None
),
ScenarioActivity.PerformedScheduledExecution(
scenarioId = ScenarioId(1),
scenarioActivityId = secondActivity.scenarioActivityId,
user = ScenarioUser(None, UserName("Nussknacker"), None, None),
date = secondActivity.date,
scenarioVersionId = Some(ScenarioVersionId(1)),
dateFinished = secondActivity.dateFinished,
scheduleName = "schedule2",
scheduledExecutionStatus = ScheduledExecutionStatus.Finished,
createdAt = secondActivity.createdAt,
retriesLeft = None,
nextRetryAt = None
),
)
}

it should "handle multiple one time schedules and return only latest activities" in withFixture(
maxFetchedPeriodicScenarioActivities = Some(1)
) { f =>
handleMultipleOneTimeSchedules(f)
def service = f.periodicProcessService(startTime)
val activities = service.getScenarioActivitiesSpecificToPeriodicProcess(processIdWithName, None).futureValue
val firstActivity = activities.head.asInstanceOf[ScenarioActivity.PerformedScheduledExecution]
activities shouldBe List(
ScenarioActivity.PerformedScheduledExecution(
scenarioId = ScenarioId(1),
scenarioActivityId = firstActivity.scenarioActivityId,
user = ScenarioUser(None, UserName("Nussknacker"), None, None),
date = firstActivity.date,
scenarioVersionId = Some(ScenarioVersionId(1)),
dateFinished = firstActivity.dateFinished,
scheduleName = "schedule2",
scheduledExecutionStatus = ScheduledExecutionStatus.Finished,
createdAt = firstActivity.createdAt,
retriesLeft = None,
nextRetryAt = None
),
)
}

private def handleMultipleOneTimeSchedules(f: Fixture) = {
var currentTime = startTime
def service = f.periodicProcessService(currentTime)
val timeToTriggerSchedule1 = startTime.plus(1, ChronoUnit.HOURS)
Expand Down Expand Up @@ -549,37 +603,6 @@ class PeriodicProcessServiceIntegrationTest
inactiveStates.latestDeploymentForSchedule(schedule1).state.status shouldBe PeriodicProcessDeploymentStatus.Finished
inactiveStates.latestDeploymentForSchedule(schedule2).state.status shouldBe PeriodicProcessDeploymentStatus.Finished

val activities = service.getScenarioActivitiesSpecificToPeriodicProcess(processIdWithName, None).futureValue
val firstActivity = activities.head.asInstanceOf[ScenarioActivity.PerformedScheduledExecution]
val secondActivity = activities(1).asInstanceOf[ScenarioActivity.PerformedScheduledExecution]
activities shouldBe List(
ScenarioActivity.PerformedScheduledExecution(
scenarioId = ScenarioId(1),
scenarioActivityId = firstActivity.scenarioActivityId,
user = ScenarioUser(None, UserName("Nussknacker"), None, None),
date = firstActivity.date,
scenarioVersionId = Some(ScenarioVersionId(1)),
dateFinished = firstActivity.dateFinished,
scheduleName = "schedule1",
scheduledExecutionStatus = ScheduledExecutionStatus.Finished,
createdAt = firstActivity.createdAt,
retriesLeft = None,
nextRetryAt = None
),
ScenarioActivity.PerformedScheduledExecution(
scenarioId = ScenarioId(1),
scenarioActivityId = secondActivity.scenarioActivityId,
user = ScenarioUser(None, UserName("Nussknacker"), None, None),
date = secondActivity.date,
scenarioVersionId = Some(ScenarioVersionId(1)),
dateFinished = secondActivity.dateFinished,
scheduleName = "schedule2",
scheduledExecutionStatus = ScheduledExecutionStatus.Finished,
createdAt = secondActivity.createdAt,
retriesLeft = None,
nextRetryAt = None
),
)
}

it should "handle failed event handler" in withFixture() { f =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class PeriodicProcessServiceTest
},
DeploymentRetryConfig(),
PeriodicExecutionConfig(),
maxFetchedPeriodicScenarioActivities = None,
new ProcessConfigEnricher {

override def onInitialSchedule(
Expand Down

0 comments on commit 2b7211f

Please sign in to comment.