diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java index b814144ad9632..fca3c7a854833 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java @@ -52,6 +52,7 @@ import java.util.Collection; import java.util.Collections; import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Function; /** Builder for {@link AdaptiveScheduler}. */ public class AdaptiveSchedulerBuilder { @@ -117,6 +118,12 @@ public AdaptiveSchedulerBuilder setJobMasterConfiguration( return this; } + public AdaptiveSchedulerBuilder withConfigurationOverride( + Function<Configuration, Configuration> modifyFn) { + this.jobMasterConfiguration = modifyFn.apply(jobMasterConfiguration); + return this; + } + public AdaptiveSchedulerBuilder setUserCodeLoader(final ClassLoader userCodeLoader) { this.userCodeLoader = userCodeLoader; return this; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java index 7b028991f3753..e2260d1c19c67 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java @@ -1221,6 +1221,13 @@ void testInitialRequirementLowerBoundBeyondAvailableSlotsCausesImmediateFailure( final AdaptiveScheduler scheduler = prepareSchedulerWithNoTimeouts(jobGraph, declarativeSlotPool) + .withConfigurationOverride( + conf -> { + conf.set( + JobManagerOptions.RESOURCE_WAIT_TIMEOUT, + Duration.ofMillis(1)); + return conf; + }) .setJobResourceRequirements(initialJobResourceRequirements) .build(); @@ -1261,14 +1268,6 @@ void testRequirementLowerBoundDecreaseAfterResourceScarcityBelowAvailableSlots() startJobWithSlotsMatchingParallelism( scheduler, declarativeSlotPool, taskManagerGateway, availableSlots); - // at this point we'd ideally check that the job is stuck in WaitingForResources, but we - // can't differentiate between waiting due to the minimum requirements not being fulfilled - // and the resource timeout not being elapsed - // We just continue here, as the following tests validate that the lower bound can prevent - // a job from running: - // - #testInitialRequirementLowerBoundBeyondAvailableSlotsCausesImmediateFailure() - // - #testRequirementLowerBoundIncreaseBeyondCurrentParallelismAttemptsImmediateRescale() - // unlock job by decreasing the parallelism JobResourceRequirements newJobResourceRequirements = createRequirementsWithLowerAndUpperParallelism(availableSlots, PARALLELISM); @@ -1280,7 +1279,8 @@ void testRequirementLowerBoundDecreaseAfterResourceScarcityBelowAvailableSlots() private static Configuration createConfigurationWithNoTimeouts() { return new Configuration() - .set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L)) + .set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(-1L)) + .set(JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT, Duration.ofMillis(1L)) .set(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN, Duration.ofMillis(1L)); }