Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fetch only latest scenario activities for periodic DM #7344

Merged
merged 7 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions engine/flink/management/periodic/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,4 @@ Use `deploymentManager` with the following properties:
- `deployMaxRetries` - maximum amount of retries for failed deployment.
- `deployRetryPenalize` - an amount of time by which the next retry should be delayed.
- `jarsDir` - directory for jars storage.
- `maxFetchedPeriodicScenarioActivities` - optional, maximum number of latest ScenarioActivities that will be fetched, by default 200
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import scala.concurrent.duration._
* @param deployInterval {@link DeploymentActor} check interval.
* @param deploymentRetry {@link DeploymentRetryConfig} for deployment failure recovery.
* @param jarsDir Directory for jars storage.
* @param maxFetchedPeriodicScenarioActivities Optional limit of number of latest periodic-related Scenario Activities that are returned by Periodic DM.
*/
case class PeriodicBatchConfig(
db: Config,
Expand All @@ -21,7 +22,8 @@ case class PeriodicBatchConfig(
deployInterval: FiniteDuration = 17 seconds,
deploymentRetry: DeploymentRetryConfig,
executionConfig: PeriodicExecutionConfig,
jarsDir: String
jarsDir: String,
maxFetchedPeriodicScenarioActivities: Option[Int] = Some(200),
)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ object PeriodicDeploymentManager {
additionalDeploymentDataProvider,
periodicBatchConfig.deploymentRetry,
periodicBatchConfig.executionConfig,
periodicBatchConfig.maxFetchedPeriodicScenarioActivities,
processConfigEnricher,
clock,
dependencies.actionService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class PeriodicProcessService(
additionalDeploymentDataProvider: AdditionalDeploymentDataProvider,
deploymentRetryConfig: DeploymentRetryConfig,
executionConfig: PeriodicExecutionConfig,
maxFetchedPeriodicScenarioActivities: Option[Int],
processConfigEnricher: ProcessConfigEnricher,
clock: Clock,
actionService: ProcessingTypeActionService,
Expand Down Expand Up @@ -99,7 +100,11 @@ class PeriodicProcessService(
retriesLeft = deployment.nextRetryAt.map(_ => deployment.retriesLeft),
)
}
} yield activities
limitedActivities = maxFetchedPeriodicScenarioActivities match {
case Some(limit) => activities.sortBy(_.date).takeRight(limit)
case None => activities
}
} yield limitedActivities

def schedule(
schedule: ScheduleProperty,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class PeriodicDeploymentManagerTest
additionalDeploymentDataProvider = DefaultAdditionalDeploymentDataProvider,
deploymentRetryConfig = DeploymentRetryConfig(),
executionConfig = executionConfig,
maxFetchedPeriodicScenarioActivities = None,
processConfigEnricher = ProcessConfigEnricher.identity,
clock = Clock.systemDefaultZone(),
new ProcessingTypeActionServiceStub,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,10 @@ import org.scalatest.concurrent.ScalaFutures
import org.scalatest.exceptions.TestFailedException
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.time.{Millis, Seconds, Span}
import org.testcontainers.utility.DockerImageName
import pl.touk.nussknacker.engine.api.deployment._
import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus
import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus.ProblemStateStatus
import pl.touk.nussknacker.engine.api.deployment.{
DataFreshnessPolicy,
ProcessActionId,
ProcessingTypeActionServiceStub,
ScenarioActivity,
ScenarioId,
ScenarioUser,
ScenarioVersionId,
ScheduledExecutionStatus,
UserName
}
import pl.touk.nussknacker.engine.api.process.{ProcessId, ProcessIdWithName, ProcessName}
import pl.touk.nussknacker.engine.api.{MetaData, ProcessVersion, StreamMetaData}
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
Expand Down Expand Up @@ -82,7 +71,8 @@ class PeriodicProcessServiceIntegrationTest

def withFixture(
deploymentRetryConfig: DeploymentRetryConfig = DeploymentRetryConfig(),
executionConfig: PeriodicExecutionConfig = PeriodicExecutionConfig()
executionConfig: PeriodicExecutionConfig = PeriodicExecutionConfig(),
maxFetchedPeriodicScenarioActivities: Option[Int] = None,
)(testCode: Fixture => Any): Unit = {
val postgresConfig = ConfigFactory.parseMap(
Map(
Expand All @@ -105,7 +95,9 @@ class PeriodicProcessServiceIntegrationTest
def runTestCodeWithDbConfig(config: Config) = {
val (db: jdbc.JdbcBackend.DatabaseDef, dbProfile: JdbcProfile) = DbInitializer.init(config)
try {
testCode(new Fixture(db, dbProfile, deploymentRetryConfig, executionConfig))
testCode(
new Fixture(db, dbProfile, deploymentRetryConfig, executionConfig, maxFetchedPeriodicScenarioActivities)
)
} finally {
db.close()
}
Expand All @@ -120,7 +112,8 @@ class PeriodicProcessServiceIntegrationTest
db: JdbcBackend.DatabaseDef,
dbProfile: JdbcProfile,
deploymentRetryConfig: DeploymentRetryConfig,
executionConfig: PeriodicExecutionConfig
executionConfig: PeriodicExecutionConfig,
maxFetchedPeriodicScenarioActivities: Option[Int],
) {
val delegateDeploymentManagerStub = new DeploymentManagerStub
val jarManagerStub = new JarManagerStub
Expand All @@ -144,6 +137,7 @@ class PeriodicProcessServiceIntegrationTest
additionalDeploymentDataProvider = DefaultAdditionalDeploymentDataProvider,
deploymentRetryConfig = deploymentRetryConfig,
executionConfig = executionConfig,
maxFetchedPeriodicScenarioActivities = maxFetchedPeriodicScenarioActivities,
processConfigEnricher = ProcessConfigEnricher.identity,
clock = fixedClock(currentTime),
new ProcessingTypeActionServiceStub,
Expand Down Expand Up @@ -451,6 +445,66 @@ class PeriodicProcessServiceIntegrationTest
}

it should "handle multiple one time schedules" in withFixture() { f =>
handleMultipleOneTimeSchedules(f)
def service = f.periodicProcessService(startTime)
val activities = service.getScenarioActivitiesSpecificToPeriodicProcess(processIdWithName, None).futureValue
val firstActivity = activities.head.asInstanceOf[ScenarioActivity.PerformedScheduledExecution]
val secondActivity = activities(1).asInstanceOf[ScenarioActivity.PerformedScheduledExecution]
activities shouldBe List(
ScenarioActivity.PerformedScheduledExecution(
scenarioId = ScenarioId(1),
scenarioActivityId = firstActivity.scenarioActivityId,
user = ScenarioUser(None, UserName("Nussknacker"), None, None),
date = firstActivity.date,
scenarioVersionId = Some(ScenarioVersionId(1)),
dateFinished = firstActivity.dateFinished,
scheduleName = "schedule1",
scheduledExecutionStatus = ScheduledExecutionStatus.Finished,
createdAt = firstActivity.createdAt,
retriesLeft = None,
nextRetryAt = None
),
ScenarioActivity.PerformedScheduledExecution(
scenarioId = ScenarioId(1),
scenarioActivityId = secondActivity.scenarioActivityId,
user = ScenarioUser(None, UserName("Nussknacker"), None, None),
date = secondActivity.date,
scenarioVersionId = Some(ScenarioVersionId(1)),
dateFinished = secondActivity.dateFinished,
scheduleName = "schedule2",
scheduledExecutionStatus = ScheduledExecutionStatus.Finished,
createdAt = secondActivity.createdAt,
retriesLeft = None,
nextRetryAt = None
),
)
}

it should "handle multiple one time schedules and return only latest activities" in withFixture(
maxFetchedPeriodicScenarioActivities = Some(1)
) { f =>
handleMultipleOneTimeSchedules(f)
def service = f.periodicProcessService(startTime)
val activities = service.getScenarioActivitiesSpecificToPeriodicProcess(processIdWithName, None).futureValue
val firstActivity = activities.head.asInstanceOf[ScenarioActivity.PerformedScheduledExecution]
activities shouldBe List(
ScenarioActivity.PerformedScheduledExecution(
scenarioId = ScenarioId(1),
scenarioActivityId = firstActivity.scenarioActivityId,
user = ScenarioUser(None, UserName("Nussknacker"), None, None),
date = firstActivity.date,
scenarioVersionId = Some(ScenarioVersionId(1)),
dateFinished = firstActivity.dateFinished,
scheduleName = "schedule2",
scheduledExecutionStatus = ScheduledExecutionStatus.Finished,
createdAt = firstActivity.createdAt,
retriesLeft = None,
nextRetryAt = None
),
)
}

private def handleMultipleOneTimeSchedules(f: Fixture) = {
var currentTime = startTime
def service = f.periodicProcessService(currentTime)
val timeToTriggerSchedule1 = startTime.plus(1, ChronoUnit.HOURS)
Expand Down Expand Up @@ -549,37 +603,6 @@ class PeriodicProcessServiceIntegrationTest
inactiveStates.latestDeploymentForSchedule(schedule1).state.status shouldBe PeriodicProcessDeploymentStatus.Finished
inactiveStates.latestDeploymentForSchedule(schedule2).state.status shouldBe PeriodicProcessDeploymentStatus.Finished

val activities = service.getScenarioActivitiesSpecificToPeriodicProcess(processIdWithName, None).futureValue
val firstActivity = activities.head.asInstanceOf[ScenarioActivity.PerformedScheduledExecution]
val secondActivity = activities(1).asInstanceOf[ScenarioActivity.PerformedScheduledExecution]
activities shouldBe List(
ScenarioActivity.PerformedScheduledExecution(
scenarioId = ScenarioId(1),
scenarioActivityId = firstActivity.scenarioActivityId,
user = ScenarioUser(None, UserName("Nussknacker"), None, None),
date = firstActivity.date,
scenarioVersionId = Some(ScenarioVersionId(1)),
dateFinished = firstActivity.dateFinished,
scheduleName = "schedule1",
scheduledExecutionStatus = ScheduledExecutionStatus.Finished,
createdAt = firstActivity.createdAt,
retriesLeft = None,
nextRetryAt = None
),
ScenarioActivity.PerformedScheduledExecution(
scenarioId = ScenarioId(1),
scenarioActivityId = secondActivity.scenarioActivityId,
user = ScenarioUser(None, UserName("Nussknacker"), None, None),
date = secondActivity.date,
scenarioVersionId = Some(ScenarioVersionId(1)),
dateFinished = secondActivity.dateFinished,
scheduleName = "schedule2",
scheduledExecutionStatus = ScheduledExecutionStatus.Finished,
createdAt = secondActivity.createdAt,
retriesLeft = None,
nextRetryAt = None
),
)
}

it should "handle failed event handler" in withFixture() { f =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class PeriodicProcessServiceTest
},
DeploymentRetryConfig(),
PeriodicExecutionConfig(),
maxFetchedPeriodicScenarioActivities = None,
new ProcessConfigEnricher {

override def onInitialSchedule(
Expand Down
Loading