diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java index 4294ff683826d..a4f0536234868 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java @@ -267,17 +267,28 @@ private JobGraph createJobGraphWithKeyedState( env.addSource( new NotifyingDefiniteKeySource( numberKeys, numberElements, failAfterEmission) { + + String lastCheckpointPath = null; + + /** + * Non-blocking poll: returns true once the latest completed checkpoint path differs from the + * first one recorded (a second checkpoint finished); intended to ensure it contains all source data. + */ @Override - public void waitCheckpointCompleted() throws Exception { + public boolean waitCheckpointCompleted() throws Exception { Optional mostRecentCompletedCheckpointPath = getLatestCompletedCheckpointPath( jobID.get(), miniClusterRef.get()); - while (!mostRecentCompletedCheckpointPath.isPresent()) { - Thread.sleep(50); - mostRecentCompletedCheckpointPath = - getLatestCompletedCheckpointPath( - jobID.get(), miniClusterRef.get()); + if (mostRecentCompletedCheckpointPath.isPresent()) { + if (lastCheckpointPath == null) { + lastCheckpointPath = + mostRecentCompletedCheckpointPath.get(); + } else if (!lastCheckpointPath.equals( + mostRecentCompletedCheckpointPath.get())) { + return true; + } + } + return false; } }) .keyBy( @@ -315,7 +326,9 @@ public NotifyingDefiniteKeySource( this.failAfterEmission = failAfterEmission; } - public void waitCheckpointCompleted() throws Exception {} + public boolean waitCheckpointCompleted() throws Exception { + return true; + } @Override public void run(SourceContext ctx) throws Exception { @@ -334,7 +347,18 @@ public void run(SourceContext ctx) throws Exception { counter++; } } else { - waitCheckpointCompleted(); + boolean newCheckpoint = 
false; + long waited = 0L; + // sleeps at most 30000 ms in total (10 ms steps), i.e. ~30s, not 5min; NOTE(review): confirm intended bound + while (!newCheckpoint && waited < 30000L) { + synchronized (ctx.getCheckpointLock()) { + newCheckpoint = waitCheckpointCompleted(); + } + if (!newCheckpoint) { + waited += 10L; + Thread.sleep(10L); + } + } if (failAfterEmission) { throw new FlinkRuntimeException( "Make job fail artificially, to retain completed checkpoint.");