From f7ff832554c834237779d56e1c86961d70eebdca Mon Sep 17 00:00:00 2001 From: Zakelly Date: Wed, 13 Mar 2024 18:07:21 +0800 Subject: [PATCH] [FLINK-34516] Use new CheckpointingMode in flink-core everywhere --- .../program/StreamContextEnvironmentTest.java | 2 +- .../sink/FileSinkCompactionSwitchITCase.java | 2 +- .../StreamingExecutionFileSinkITCase.java | 2 +- .../sink/writer/FileSinkMigrationITCase.java | 2 +- .../DataStreamAllroundTestJobFactory.java | 2 +- .../formats/avro/AvroBulkFormatITCase.java | 2 +- .../state/api/BootstrapTransformation.java | 2 +- .../api/StateBootstrapTransformation.java | 2 +- .../api/environment/CheckpointConfig.java | 41 +++++++------ .../ExecutionCheckpointingOptions.java | 24 ++++---- .../StreamExecutionEnvironment.java | 58 ++++++++++++++----- .../streaming/api/graph/StreamConfig.java | 2 +- .../api/graph/StreamingJobGraphGenerator.java | 4 +- .../operators/StreamingRuntimeContext.java | 10 ---- .../collect/CollectResultIterator.java | 4 +- ...CheckpointConfigFromConfigurationTest.java | 35 ++++++----- .../graph/StreamingJobGraphGeneratorTest.java | 2 +- .../streaming/graph/TranslationTest.java | 2 +- .../checkpointing/InputProcessorUtilTest.java | 2 +- .../DummyStreamExecutionEnvironment.java | 18 +++++- .../stream/FsStreamingSinkITCaseBase.scala | 4 +- .../external/sink/TestingSinkSettings.java | 2 +- .../source/TestingSourceSettings.java | 2 +- .../junit/annotations/TestSemantics.java | 2 +- .../extensions/ConnectorTestingExtension.java | 2 +- .../TestCaseInvocationContextProvider.java | 2 +- .../testsuites/SinkTestSuiteBase.java | 6 +- .../testsuites/SourceTestSuiteBase.java | 6 +- .../utils/CollectIteratorAssert.java | 2 +- .../utils/UnorderedCollectIteratorAssert.java | 2 +- .../utils/CollectIteratorAssertTest.java | 2 +- .../lib/NumberSequenceSourceITCase.java | 2 +- .../lifecycle/graph/TestJobBuilders.java | 4 +- .../checkpointing/AutoRescalingITCase.java | 4 +- .../CheckpointRestoreWithUidHashITCase.java | 2 +- .../IgnoreInFlightDataITCase.java | 4 +- .../NotifyCheckpointAbortedITCase.java | 2 +- .../checkpointing/RegionFailoverITCase.java | 2 +- ...lignedCheckpointFailureHandlingITCase.java | 2 +- .../StreamingScalabilityAndLatency.java | 2 +- .../test/recovery/LocalRecoveryITCase.java | 2 +- .../scheduling/AdaptiveSchedulerITCase.java | 2 +- .../state/ChangelogRecoveryCachingITCase.java | 6 +- .../test/state/ChangelogRescalingITCase.java | 6 +- ...ManagerWideRocksDbMemorySharingITCase.java | 2 +- .../AbstractOperatorRestoreTestBase.java | 2 +- .../operator/restore/keyed/KeyedJob.java | 2 +- .../operator/restore/unkeyed/NonKeyedJob.java | 2 +- .../test/streaming/runtime/IterateITCase.java | 5 +- 49 files changed, 170 insertions(+), 133 deletions(-) diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java index e96f0c7dcb885d..1f382266bed214 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java @@ -25,10 +25,10 @@ import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.configuration.StateRecoveryOptions; +import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.core.execution.PipelineExecutorFactory; import org.apache.flink.core.execution.PipelineExecutorServiceLoader; import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkCompactionSwitchITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkCompactionSwitchITCase.java index ca7ebc85b70d17..953b8d6833c4aa 100644 --- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkCompactionSwitchITCase.java +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkCompactionSwitchITCase.java @@ -34,6 +34,7 @@ import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils.IntEncoder; import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils.ModuloBucketAssigner; import org.apache.flink.connector.file.sink.utils.PartSizeAndCheckpointRollingPolicy; +import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.client.JobExecutionException; @@ -48,7 +49,6 @@ import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingExecutionFileSinkITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingExecutionFileSinkITCase.java index e3fe477573298b..c20c70491a7ba9 100644 --- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingExecutionFileSinkITCase.java +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingExecutionFileSinkITCase.java @@ -25,11 +25,11 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ExecutionOptions; +import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileSinkMigrationITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileSinkMigrationITCase.java index 90dde5462c4ef1..aca1dc06ea1451 100644 --- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileSinkMigrationITCase.java +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileSinkMigrationITCase.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.connector.file.sink.FileSink; import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils; +import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -33,7 +34,6 @@ import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java index f799bcb9d919a9..dd21f4d3c489d9 100644 --- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java +++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java @@ -33,8 +33,8 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; +import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.WindowedStream; diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroBulkFormatITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroBulkFormatITCase.java index 964b464e6e8242..e12b12063247d0 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroBulkFormatITCase.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroBulkFormatITCase.java @@ -32,8 +32,8 @@ import org.apache.flink.connector.testframe.junit.annotations.TestEnv; import org.apache.flink.connector.testframe.junit.annotations.TestSemantics; import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase; +import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java index f7556a6a6962a2..0ad5182c04e068 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java @@ -27,6 +27,7 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.operators.MapPartitionOperator; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.core.fs.Path; import org.apache.flink.core.memory.ManagedMemoryUseCase; import org.apache.flink.runtime.checkpoint.OperatorState; @@ -39,7 +40,6 @@ import org.apache.flink.state.api.output.operators.BroadcastStateBootstrapOperator; import org.apache.flink.state.api.output.partitioner.HashSelector; import org.apache.flink.state.api.output.partitioner.KeyGroupRangePartitioner; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.StreamOperator; diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/StateBootstrapTransformation.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/StateBootstrapTransformation.java index 0e37520d94d7c6..ddc83a4adf906e 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/StateBootstrapTransformation.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/StateBootstrapTransformation.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.core.fs.Path; import org.apache.flink.core.memory.ManagedMemoryUseCase; import org.apache.flink.runtime.checkpoint.OperatorState; @@ -36,7 +37,6 @@ import org.apache.flink.state.api.output.operators.BroadcastStateBootstrapOperator; import org.apache.flink.state.api.output.operators.GroupReduceOperator; import org.apache.flink.state.api.runtime.MutableConfig; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.graph.StreamConfig; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java index ea70b3d085c75d..12893888ca6bc9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java @@ -29,12 +29,12 @@ import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.configuration.StateRecoveryOptions; import org.apache.flink.configuration.description.InlineElement; +import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; @@ -72,9 +72,9 @@ public class CheckpointConfig implements java.io.Serializable { * The default checkpoint mode: exactly once. * * @deprecated This field is no longer used. Please use {@link - * ExecutionCheckpointingOptions.CHECKPOINTING_MODE} instead. + * ExecutionCheckpointingOptions.CHECKPOINTING_CONSISTENCY_MODE} instead. */ - public static final CheckpointingMode DEFAULT_MODE = + public static final org.apache.flink.streaming.api.CheckpointingMode DEFAULT_MODE = ExecutionCheckpointingOptions.CHECKPOINTING_MODE.defaultValue(); /** @@ -177,10 +177,10 @@ public boolean isCheckpointingEnabled() { * Gets the checkpointing mode (exactly-once vs. at-least-once). * * @return The checkpointing mode. - * @deprecated Use {@link #getCheckpointMode} instead. + * @deprecated Use {@link #getConsistencyMode} instead. */ @Deprecated - public CheckpointingMode getCheckpointingMode() { + public org.apache.flink.streaming.api.CheckpointingMode getCheckpointingMode() { return configuration.get(ExecutionCheckpointingOptions.CHECKPOINTING_MODE); } @@ -188,30 +188,31 @@ public CheckpointingMode getCheckpointingMode() { * Sets the checkpointing mode (exactly-once vs. at-least-once). * * @param checkpointingMode The checkpointing mode. - * @deprecated Use {@link #setCheckpointMode} instead. + * @deprecated Use {@link #setConsistencyMode} instead. */ @Deprecated - public void setCheckpointingMode(CheckpointingMode checkpointingMode) { + public void setCheckpointingMode( + org.apache.flink.streaming.api.CheckpointingMode checkpointingMode) { configuration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, checkpointingMode); } /** - * Gets the checkpointing mode (exactly-once vs. at-least-once). + * Gets the checkpointing consistency mode (exactly-once vs. at-least-once). * * @return The checkpointing mode. */ - public org.apache.flink.core.execution.CheckpointingMode getCheckpointMode() { - return configuration.get(ExecutionCheckpointingOptions.CHECKPOINTING_MODE_V2); + public CheckpointingMode getConsistencyMode() { + return configuration.get(ExecutionCheckpointingOptions.CHECKPOINTING_CONSISTENCY_MODE); } /** - * Sets the checkpointing mode (exactly-once vs. at-least-once). + * Sets the checkpointing consistency mode (exactly-once vs. at-least-once). * * @param checkpointingMode The checkpointing mode. */ - public void setCheckpointMode( - org.apache.flink.core.execution.CheckpointingMode checkpointingMode) { - configuration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE_V2, checkpointingMode); + public void setConsistencyMode(CheckpointingMode checkpointingMode) { + configuration.set( + ExecutionCheckpointingOptions.CHECKPOINTING_CONSISTENCY_MODE, checkpointingMode); } /** @@ -598,7 +599,8 @@ public boolean isExternalizedCheckpointsEnabled() { * embedded into the stream of data anymore. * *

Unaligned checkpoints can only be enabled if {@link - * ExecutionCheckpointingOptions#CHECKPOINTING_MODE} is {@link CheckpointingMode#EXACTLY_ONCE}. + * ExecutionCheckpointingOptions#CHECKPOINTING_CONSISTENCY_MODE} is {@link + * CheckpointingMode#EXACTLY_ONCE}. * * @param enabled Flag to indicate whether unaligned are enabled. */ @@ -616,7 +618,8 @@ public void enableUnalignedCheckpoints(boolean enabled) { * embedded into the stream of data anymore. * *

Unaligned checkpoints can only be enabled if {@link - * ExecutionCheckpointingOptions#CHECKPOINTING_MODE} is {@link CheckpointingMode#EXACTLY_ONCE}. + * ExecutionCheckpointingOptions#CHECKPOINTING_CONSISTENCY_MODE} is {@link + * CheckpointingMode#EXACTLY_ONCE}. */ @PublicEvolving public void enableUnalignedCheckpoints() { @@ -986,7 +989,7 @@ public InlineElement getDescription() { /** * Sets all relevant options contained in the {@link ReadableConfig} such as e.g. {@link - * ExecutionCheckpointingOptions#CHECKPOINTING_MODE}. + * ExecutionCheckpointingOptions#CHECKPOINTING_CONSISTENCY_MODE}. * *

It will change the value of a setting only if a corresponding option was set in the {@code * configuration}. If a key is not present, the current value of a field will remain untouched. @@ -995,8 +998,8 @@ public InlineElement getDescription() { */ public void configure(ReadableConfig configuration) { configuration - .getOptional(ExecutionCheckpointingOptions.CHECKPOINTING_MODE) - .ifPresent(this::setCheckpointingMode); + .getOptional(ExecutionCheckpointingOptions.CHECKPOINTING_CONSISTENCY_MODE) + .ifPresent(this::setConsistencyMode); configuration .getOptional(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL) .ifPresent(i -> this.setCheckpointInterval(i.toMillis())); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java index c728d1dca94542..65d31ee5959464 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java @@ -27,7 +27,7 @@ import org.apache.flink.configuration.StateRecoveryOptions; import org.apache.flink.configuration.description.Description; import org.apache.flink.configuration.description.TextElement; -import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.core.execution.CheckpointingMode; import java.time.Duration; @@ -44,18 +44,20 @@ public class ExecutionCheckpointingOptions { @Deprecated @Documentation.ExcludeFromDocumentation("Hidden for deprecatd.") - public static final ConfigOption CHECKPOINTING_MODE = - ConfigOptions.key("execution.checkpointing.mode") - .enumType(CheckpointingMode.class) - .defaultValue(CheckpointingMode.EXACTLY_ONCE) - .withDescription("The checkpointing mode (exactly-once vs. at-least-once)."); + public static final ConfigOption + CHECKPOINTING_MODE = + ConfigOptions.key("execution.checkpointing.mode") + .enumType(org.apache.flink.streaming.api.CheckpointingMode.class) + .defaultValue( + org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE) + .withDescription( + "The checkpointing mode (exactly-once vs. at-least-once)."); public static final ConfigOption - CHECKPOINTING_MODE_V2 = + CHECKPOINTING_CONSISTENCY_MODE = ConfigOptions.key("execution.checkpointing.mode") - .enumType(org.apache.flink.core.execution.CheckpointingMode.class) - .defaultValue( - org.apache.flink.core.execution.CheckpointingMode.EXACTLY_ONCE) + .enumType(CheckpointingMode.class) + .defaultValue(CheckpointingMode.EXACTLY_ONCE) .withDescription( "The checkpointing mode (exactly-once vs. at-least-once)."); @@ -201,7 +203,7 @@ public class ExecutionCheckpointingOptions { .linebreak() .text( "Unaligned checkpoints can only be enabled if %s is %s and if %s is 1", - TextElement.code(CHECKPOINTING_MODE.key()), + TextElement.code(CHECKPOINTING_CONSISTENCY_MODE.key()), TextElement.code( CheckpointingMode.EXACTLY_ONCE.toString()), TextElement.code(MAX_CONCURRENT_CHECKPOINTS.key())) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index e07eca51ca6557..a63b1db4a20d06 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -66,6 +66,7 @@ import org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction; import org.apache.flink.connector.datagen.source.DataGeneratorSource; import org.apache.flink.core.execution.CacheSupportedPipelineExecutor; +import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.core.execution.DefaultExecutorServiceLoader; import org.apache.flink.core.execution.DetachedJobExecutionResult; import org.apache.flink.core.execution.JobClient; @@ -78,7 +79,6 @@ import org.apache.flink.runtime.scheduler.ClusterDatasetCorruptedException; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.StateBackend; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; @@ -518,9 +518,7 @@ public CheckpointConfig getCheckpointConfig() { * the configured state backend. * *

NOTE: Checkpointing iterative streaming dataflows is not properly supported at the moment. - * For that reason, iterative jobs will not be started if used with enabled checkpointing. To - * override this mechanism, use the {@link #enableCheckpointing(long, CheckpointingMode, - * boolean)} method. + * For that reason, iterative jobs will not be started if used with enabled checkpointing. * * @param interval Time interval between state checkpoints in milliseconds. */ @@ -529,6 +527,31 @@ public StreamExecutionEnvironment enableCheckpointing(long interval) { return this; } + /** + * Enables checkpointing for the streaming job. The distributed state of the streaming dataflow + * will be periodically snapshotted. In case of a failure, the streaming dataflow will be + * restarted from the latest completed checkpoint. + * + *

The job draws checkpoints periodically, in the given interval. The system uses the given + * {@link org.apache.flink.streaming.api.CheckpointingMode} for the checkpointing ("exactly + * once" vs "at least once"). The state will be stored in the configured state backend. + * + *

NOTE: Checkpointing iterative streaming dataflows is not properly supported at the moment. + * For that reason, iterative jobs will not be started if used with enabled checkpointing. + * + * @param interval Time interval between state checkpoints in milliseconds. + * @param mode The checkpointing mode, selecting between "exactly once" and "at least once" + * guaranteed. + * @deprecated use {@link #enableCheckpointing(long, CheckpointingMode)} instead. + */ + @Deprecated + public StreamExecutionEnvironment enableCheckpointing( + long interval, org.apache.flink.streaming.api.CheckpointingMode mode) { + checkpointCfg.setCheckpointingMode(mode); + checkpointCfg.setCheckpointInterval(interval); + return this; + } + /** * Enables checkpointing for the streaming job. The distributed state of the streaming dataflow * will be periodically snapshotted. In case of a failure, the streaming dataflow will be @@ -539,16 +562,14 @@ public StreamExecutionEnvironment enableCheckpointing(long interval) { * state will be stored in the configured state backend. * *

NOTE: Checkpointing iterative streaming dataflows is not properly supported at the moment. - * For that reason, iterative jobs will not be started if used with enabled checkpointing. To - * override this mechanism, use the {@link #enableCheckpointing(long, CheckpointingMode, - * boolean)} method. + * For that reason, iterative jobs will not be started if used with enabled checkpointing. * * @param interval Time interval between state checkpoints in milliseconds. * @param mode The checkpointing mode, selecting between "exactly once" and "at least once" * guaranteed. */ public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode) { - checkpointCfg.setCheckpointingMode(mode); + checkpointCfg.setConsistencyMode(mode); checkpointCfg.setCheckpointInterval(interval); return this; } @@ -575,7 +596,7 @@ public StreamExecutionEnvironment enableCheckpointing(long interval, Checkpointi @SuppressWarnings("deprecation") @PublicEvolving public StreamExecutionEnvironment enableCheckpointing( - long interval, CheckpointingMode mode, boolean force) { + long interval, org.apache.flink.streaming.api.CheckpointingMode mode, boolean force) { checkpointCfg.setCheckpointingMode(mode); checkpointCfg.setCheckpointInterval(interval); checkpointCfg.setForceCheckpointing(force); @@ -592,9 +613,7 @@ public StreamExecutionEnvironment enableCheckpointing( * in the configured state backend. * *

NOTE: Checkpointing iterative streaming dataflows is not properly supported at the moment. - * For that reason, iterative jobs will not be started if used with enabled checkpointing. To - * override this mechanism, use the {@link #enableCheckpointing(long, CheckpointingMode, - * boolean)} method. + * For that reason, iterative jobs will not be started if used with enabled checkpointing. * * @deprecated Use {@link #enableCheckpointing(long)} instead. */ @@ -646,11 +665,24 @@ public boolean isForceUnalignedCheckpoints() { *

Shorthand for {@code getCheckpointConfig().getCheckpointingMode()}. * * @return The checkpoint mode + * @deprecated Use {@link #getCheckpointingConsistencyMode()} instead. */ - public CheckpointingMode getCheckpointingMode() { + @Deprecated + public org.apache.flink.streaming.api.CheckpointingMode getCheckpointingMode() { return checkpointCfg.getCheckpointingMode(); } + /** + * Returns the checkpointing consistency mode (exactly-once vs. at-least-once). + * + *

Shorthand for {@code getCheckpointConfig().getConsistencyMode()}. + * + * @return The checkpoint mode + */ + public CheckpointingMode getCheckpointingConsistencyMode() { + return checkpointCfg.getConsistencyMode(); + } + /** * Sets the state backend that describes how to store operator. It defines the data structures * that hold state during execution (for example hash tables, RocksDB, or other data stores). diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java index 9235dad183eb75..ee169cdb933a15 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.core.memory.ManagedMemoryUseCase; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.OperatorID; @@ -31,7 +32,6 @@ import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index b67fcfda999218..de90764e99e21c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -31,6 +31,7 @@ import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.StateChangelogOptions; +import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.core.memory.ManagedMemoryUseCase; import org.apache.flink.runtime.OperatorIDPair; import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy; @@ -63,7 +64,6 @@ import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.util.Hardware; import org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.checkpoint.WithMasterCheckpointHook; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; @@ -1394,7 +1394,7 @@ private void tryConvertPartitionerForDynamicGraph( } private CheckpointingMode getCheckpointingMode(CheckpointConfig checkpointConfig) { - CheckpointingMode checkpointingMode = checkpointConfig.getCheckpointingMode(); + CheckpointingMode checkpointingMode = checkpointConfig.getConsistencyMode(); checkArgument( checkpointingMode == CheckpointingMode.EXACTLY_ONCE diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java index bb2c96211ebfd3..75bed241a4c07a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java @@ -44,7 +44,6 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; @@ -253,13 +252,4 @@ private KeyedStateStore checkPreconditionsAndGetKeyedStateStore( public boolean isCheckpointingEnabled() { return streamConfig.isCheckpointingEnabled(); } - - /** - * Returns the checkpointing mode. - * - * @return checkpointing mode - */ - public CheckpointingMode getCheckpointMode() { - return streamConfig.getCheckpointMode(); - } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultIterator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultIterator.java index a6dd08b8f395d5..b21b0f7d70b1bc 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultIterator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultIterator.java @@ -21,9 +21,9 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.RpcOptions; +import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.core.execution.JobClient; import org.apache.flink.runtime.jobgraph.OperatorID; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.util.CloseableIterator; @@ -133,7 +133,7 @@ private T nextResultFromFetcher() { private AbstractCollectResultBuffer createBuffer( TypeSerializer serializer, CheckpointConfig checkpointConfig) { if (checkpointConfig.isCheckpointingEnabled()) { - if (checkpointConfig.getCheckpointingMode() == CheckpointingMode.EXACTLY_ONCE) { + if (checkpointConfig.getConsistencyMode() == CheckpointingMode.EXACTLY_ONCE) { return new CheckpointedCollectResultBuffer<>(serializer); } else { return new UncheckpointedCollectResultBuffer<>(serializer, true); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java index 655a6b1b4f3f05..8c6fae71dcb6b2 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java @@ -21,9 +21,9 @@ import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage; -import org.apache.flink.streaming.api.CheckpointingMode; import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.params.ParameterizedTest; @@ -43,37 +43,36 @@ public class CheckpointConfigFromConfigurationTest { private static Stream> specs() { return Stream.of( - TestSpec.testValue(CheckpointingMode.AT_LEAST_ONCE) + TestSpec.testValue(org.apache.flink.streaming.api.CheckpointingMode.AT_LEAST_ONCE) .whenSetFromFile("execution.checkpointing.mode", "AT_LEAST_ONCE") .viaSetter(CheckpointConfig::setCheckpointingMode) .getterVia(CheckpointConfig::getCheckpointingMode) - .nonDefaultValue(CheckpointingMode.AT_LEAST_ONCE), - TestSpec.testValue(org.apache.flink.core.execution.CheckpointingMode.AT_LEAST_ONCE) - .whenSetFromFile("execution.checkpointing.mode", "AT_LEAST_ONCE") - .viaSetter(CheckpointConfig::setCheckpointMode) - .getterVia(CheckpointConfig::getCheckpointMode) .nonDefaultValue( - org.apache.flink.core.execution.CheckpointingMode.AT_LEAST_ONCE), - TestSpec.testValue(org.apache.flink.core.execution.CheckpointingMode.AT_LEAST_ONCE) + org.apache.flink.streaming.api.CheckpointingMode.AT_LEAST_ONCE), + TestSpec.testValue(CheckpointingMode.AT_LEAST_ONCE) + .whenSetFromFile("execution.checkpointing.mode", "AT_LEAST_ONCE") + .viaSetter(CheckpointConfig::setConsistencyMode) + .getterVia(CheckpointConfig::getConsistencyMode) + .nonDefaultValue(CheckpointingMode.AT_LEAST_ONCE), + TestSpec.testValue(CheckpointingMode.AT_LEAST_ONCE) .whenSetFromFile("execution.checkpointing.mode", "AT_LEAST_ONCE") .viaSetter( (config, v) -> { config.setCheckpointingMode( - CheckpointingMode.valueOf(v.name())); + org.apache.flink.streaming.api.CheckpointingMode + .valueOf(v.name())); }) - .getterVia(CheckpointConfig::getCheckpointMode) - .nonDefaultValue( - org.apache.flink.core.execution.CheckpointingMode.AT_LEAST_ONCE), - TestSpec.testValue(CheckpointingMode.AT_LEAST_ONCE) + .getterVia(CheckpointConfig::getConsistencyMode) + .nonDefaultValue(CheckpointingMode.AT_LEAST_ONCE), + TestSpec.testValue(org.apache.flink.streaming.api.CheckpointingMode.AT_LEAST_ONCE) .whenSetFromFile("execution.checkpointing.mode", "AT_LEAST_ONCE") .viaSetter( (config, v) -> { - config.setCheckpointMode( - org.apache.flink.core.execution.CheckpointingMode - .valueOf(v.name())); + config.setConsistencyMode(CheckpointingMode.valueOf(v.name())); }) .getterVia(CheckpointConfig::getCheckpointingMode) - .nonDefaultValue(CheckpointingMode.AT_LEAST_ONCE), + .nonDefaultValue( + org.apache.flink.streaming.api.CheckpointingMode.AT_LEAST_ONCE), TestSpec.testValue(10000L) .whenSetFromFile("execution.checkpointing.interval", "10 s") .viaSetter(CheckpointConfig::setCheckpointInterval) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java index 86e77a4254a363..4704cbea5c5d6e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java @@ -53,6 +53,7 @@ import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.core.memory.ManagedMemoryUseCase; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; @@ -72,7 +73,6 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.util.TaskConfig; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo; import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java index 299b217ff2c030..ee3f2f1c16bc8f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java @@ -18,8 +18,8 @@ package org.apache.flink.streaming.graph; +import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.graph.StreamConfig; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtilTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtilTest.java index b3d417ecc18c05..5457bae49d665c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtilTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/InputProcessorUtilTest.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.runtime.io.checkpointing; +import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.CheckpointType; @@ -32,7 +33,6 @@ import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.TestSubtaskCheckpointCoordinator; diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java index 02faf81195f74b..0687773fc72966 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java @@ -25,8 +25,8 @@ import org.apache.flink.api.dag.Transformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.runtime.state.StateBackend; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -151,6 +151,13 @@ public StreamExecutionEnvironment enableCheckpointing(long interval) { "This is a dummy StreamExecutionEnvironment, enableCheckpointing method is unsupported."); } + @Override + public StreamExecutionEnvironment enableCheckpointing( + long interval, org.apache.flink.streaming.api.CheckpointingMode mode) { + throw new UnsupportedOperationException( + "This is a dummy StreamExecutionEnvironment, enableCheckpointing method is unsupported."); + } + @Override public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode) { throw new UnsupportedOperationException( @@ -159,7 +166,7 @@ public StreamExecutionEnvironment enableCheckpointing(long interval, Checkpointi @Override public StreamExecutionEnvironment enableCheckpointing( - long interval, CheckpointingMode mode, boolean force) { + long interval, org.apache.flink.streaming.api.CheckpointingMode mode, boolean force) { throw new UnsupportedOperationException( "This is a dummy StreamExecutionEnvironment, enableCheckpointing method is unsupported."); } @@ -181,10 +188,15 @@ public boolean isForceCheckpointing() { } @Override - public CheckpointingMode getCheckpointingMode() { + public org.apache.flink.streaming.api.CheckpointingMode getCheckpointingMode() { return realExecEnv.getCheckpointingMode(); } + @Override + public CheckpointingMode getCheckpointingConsistencyMode() { + return realExecEnv.getCheckpointingConsistencyMode(); + } + @Override public StreamExecutionEnvironment setStateBackend(StateBackend backend) { throw new UnsupportedOperationException( diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/FsStreamingSinkITCaseBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/FsStreamingSinkITCaseBase.scala index d772d8ace77fc1..9fb0cb11c678a8 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/FsStreamingSinkITCaseBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/FsStreamingSinkITCaseBase.scala @@ -21,7 +21,7 @@ import org.apache.flink.api.common.state.CheckpointListener import org.apache.flink.api.common.typeinfo.Types import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.connector.file.table.FileSystemConnectorOptions._ -import org.apache.flink.streaming.api.CheckpointingMode +import org.apache.flink.core.execution.CheckpointingMode import org.apache.flink.streaming.api.functions.source.SourceFunction import org.apache.flink.streaming.api.scala.DataStream import org.apache.flink.streaming.api.watermark.Watermark @@ -72,7 +72,7 @@ abstract class FsStreamingSinkITCaseBase extends StreamingTestBase { env.setParallelism(1) env.enableCheckpointing(100) - env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) + env.getCheckpointConfig.setConsistencyMode(CheckpointingMode.EXACTLY_ONCE) } def additionalProperties(): Array[String] = Array() diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/TestingSinkSettings.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/TestingSinkSettings.java index 1d0b1ae50eca99..2494b814aa268b 100644 --- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/TestingSinkSettings.java +++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/sink/TestingSinkSettings.java @@ -18,7 +18,7 @@ package org.apache.flink.connector.testframe.external.sink; -import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.core.execution.CheckpointingMode; import static org.apache.flink.util.Preconditions.checkNotNull; diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/source/TestingSourceSettings.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/source/TestingSourceSettings.java index 72b18b77ee0e99..2d1626bc6194d7 100644 --- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/source/TestingSourceSettings.java +++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/external/source/TestingSourceSettings.java @@ -19,7 +19,7 @@ package org.apache.flink.connector.testframe.external.source; import org.apache.flink.api.connector.source.Boundedness; -import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.core.execution.CheckpointingMode; import static org.apache.flink.util.Preconditions.checkNotNull; diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/annotations/TestSemantics.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/annotations/TestSemantics.java index 6e067ec3e0b056..291db078403101 100644 --- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/annotations/TestSemantics.java +++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/annotations/TestSemantics.java @@ -27,7 +27,7 @@ /** * Marks the field in test class defining supported semantic: {@link - * org.apache.flink.streaming.api.CheckpointingMode}. + * org.apache.flink.core.execution.CheckpointingMode}. * *

Only one field can be annotated in test class. */ diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/extensions/ConnectorTestingExtension.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/extensions/ConnectorTestingExtension.java index beb945d7503eea..7e1e1a8c06ca17 100644 --- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/extensions/ConnectorTestingExtension.java +++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/extensions/ConnectorTestingExtension.java @@ -26,7 +26,7 @@ import org.apache.flink.connector.testframe.junit.annotations.TestEnv; import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem; import org.apache.flink.connector.testframe.junit.annotations.TestSemantics; -import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.core.execution.CheckpointingMode; import org.junit.jupiter.api.extension.AfterAllCallback; import org.junit.jupiter.api.extension.BeforeAllCallback; diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/extensions/TestCaseInvocationContextProvider.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/extensions/TestCaseInvocationContextProvider.java index b320928c9f6142..b827d005f13c66 100644 --- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/extensions/TestCaseInvocationContextProvider.java +++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/extensions/TestCaseInvocationContextProvider.java @@ -23,7 +23,7 @@ import org.apache.flink.connector.testframe.environment.TestEnvironment; import org.apache.flink.connector.testframe.external.ExternalContext; import org.apache.flink.connector.testframe.external.ExternalContextFactory; -import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.core.execution.CheckpointingMode; import org.junit.jupiter.api.extension.AfterTestExecutionCallback; import org.junit.jupiter.api.extension.Extension; diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java index 2fe57682f2cba0..a92d9b67309c10 100644 --- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java +++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java @@ -40,11 +40,11 @@ import org.apache.flink.connector.testframe.source.FromElementsSource; import org.apache.flink.connector.testframe.utils.CollectIteratorAssertions; import org.apache.flink.connector.testframe.utils.MetricQuerier; +import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.core.execution.JobClient; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.runtime.rest.RestClient; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.datastream.DataStreamSource; @@ -80,13 +80,13 @@ import java.util.stream.Collectors; import static org.apache.flink.connector.testframe.utils.MetricQuerier.getJobDetails; +import static org.apache.flink.core.execution.CheckpointingMode.AT_LEAST_ONCE; +import static org.apache.flink.core.execution.CheckpointingMode.EXACTLY_ONCE; import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture; import static org.apache.flink.runtime.testutils.CommonTestUtils.terminateJob; import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning; import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForJobStatus; import static org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition; -import static org.apache.flink.streaming.api.CheckpointingMode.AT_LEAST_ONCE; -import static org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE; import static org.apache.flink.util.Preconditions.checkNotNull; /** diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java index 7f5f0d3013a120..c2cbb778081bd9 100644 --- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java +++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java @@ -38,11 +38,11 @@ import org.apache.flink.connector.testframe.junit.extensions.TestCaseInvocationContextProvider; import org.apache.flink.connector.testframe.utils.CollectIteratorAssertions; import org.apache.flink.connector.testframe.utils.MetricQuerier; +import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.core.execution.JobClient; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.runtime.rest.RestClient; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.CheckpointConfig; @@ -309,7 +309,7 @@ private void restartFromSavepoint( // Step 3: Build and execute Flink job final StreamExecutionEnvironment execEnv = testEnv.createExecutionEnvironment(envOptions); - execEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); + execEnv.getCheckpointConfig().setConsistencyMode(CheckpointingMode.EXACTLY_ONCE); execEnv.enableCheckpointing(50); execEnv.setRestartStrategy(RestartStrategies.noRestart()); DataStreamSource source = @@ -358,7 +358,7 @@ private void restartFromSavepoint( final StreamExecutionEnvironment restartEnv = testEnv.createExecutionEnvironment(restartEnvOptions); restartEnv.enableCheckpointing(500); - restartEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); + restartEnv.getCheckpointConfig().setConsistencyMode(CheckpointingMode.EXACTLY_ONCE); DataStreamSource restartSource = restartEnv diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/CollectIteratorAssert.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/CollectIteratorAssert.java index 5a291bd95d926b..1ae093ebe816ef 100644 --- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/CollectIteratorAssert.java +++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/CollectIteratorAssert.java @@ -18,7 +18,7 @@ package org.apache.flink.connector.testframe.utils; -import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.core.execution.CheckpointingMode; import org.assertj.core.api.AbstractAssert; diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/UnorderedCollectIteratorAssert.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/UnorderedCollectIteratorAssert.java index de5b749396cbad..8fb4a3804a8635 100644 --- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/UnorderedCollectIteratorAssert.java +++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/UnorderedCollectIteratorAssert.java @@ -18,7 +18,7 @@ package org.apache.flink.connector.testframe.utils; -import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.core.execution.CheckpointingMode; import org.assertj.core.api.AbstractAssert; diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/test/java/org/apache/flink/connector/testframe/utils/CollectIteratorAssertTest.java b/flink-test-utils-parent/flink-connector-test-utils/src/test/java/org/apache/flink/connector/testframe/utils/CollectIteratorAssertTest.java index 7503e8e42a8c06..c1dbd7bc9ce6f0 100644 --- a/flink-test-utils-parent/flink-connector-test-utils/src/test/java/org/apache/flink/connector/testframe/utils/CollectIteratorAssertTest.java +++ b/flink-test-utils-parent/flink-connector-test-utils/src/test/java/org/apache/flink/connector/testframe/utils/CollectIteratorAssertTest.java @@ -18,7 +18,7 @@ package org.apache.flink.connector.testframe.utils; -import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.core.execution.CheckpointingMode; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Nested; diff --git a/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceITCase.java b/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceITCase.java index 1fb817c872cb04..e5c608a0e0e99f 100644 --- a/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceITCase.java @@ -20,8 +20,8 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/TestJobBuilders.java b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/TestJobBuilders.java index ae7fd0cbb84299..1fa69d7be65e70 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/TestJobBuilders.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/TestJobBuilders.java @@ -24,10 +24,10 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.runtime.operators.lifecycle.TestJobWithDescription; import org.apache.flink.runtime.operators.lifecycle.command.TestCommandDispatcher; import org.apache.flink.runtime.operators.lifecycle.event.TestEventQueue; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.MultipleConnectedStreams; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; @@ -267,7 +267,7 @@ private static StreamExecutionEnvironment prepareEnv( env.setParallelism(4); env.setRestartStrategy(noRestart()); env.enableCheckpointing(200); // shouldn't matter - env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); + env.getCheckpointConfig().setConsistencyMode(CheckpointingMode.EXACTLY_ONCE); env.getConfig().setAutoWatermarkInterval(50); envConsumer.accept(env); return env; diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java index d5136f2678f3df..8d97cac8db26c3 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java @@ -40,6 +40,7 @@ import org.apache.flink.configuration.StateChangelogOptions; import org.apache.flink.configuration.WebOptions; import org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions; +import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -49,7 +50,6 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.datastream.DataStream; @@ -577,7 +577,7 @@ public void testCheckpointRescalingPartitionedOperatorState( private static void configureCheckpointing(CheckpointConfig config) { config.setCheckpointInterval(100); - config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); + config.setConsistencyMode(CheckpointingMode.EXACTLY_ONCE); config.enableUnalignedCheckpoints(true); } diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointRestoreWithUidHashITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointRestoreWithUidHashITCase.java index 89ac3e5b6055a4..bef3058b0ab8a8 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointRestoreWithUidHashITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointRestoreWithUidHashITCase.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.state.CheckpointListener; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.runtime.OperatorIDPair; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -32,7 +33,6 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.runtime.testutils.MiniClusterResource; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java index 526943a6453b49..9ce4f5bba5c62e 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java @@ -25,12 +25,12 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.operators.testutils.ExpectedTestException; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; @@ -123,7 +123,7 @@ private boolean executeIgnoreInFlightDataDuringRecovery() { env.disableOperatorChaining(); env.getCheckpointConfig().enableUnalignedCheckpoints(); env.getCheckpointConfig().setAlignmentTimeout(Duration.ZERO); - env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); + env.getCheckpointConfig().setConsistencyMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointIdOfIgnoredInFlightData(1); env.setRestartStrategy(fixedDelayRestart(1, 0)); diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java index 8bcc80293a8824..432bdd6ff2e59d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java @@ -31,6 +31,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.Path; import org.apache.flink.core.testutils.OneShotLatch; @@ -60,7 +61,6 @@ import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java index 2c729c881cc2ee..69c8a7fbbbeecb 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java @@ -32,6 +32,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; @@ -45,7 +46,6 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.datastream.DataStreamUtils; diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java index 99d9711945705e..53ae1313da51ee 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.connector.source.lib.NumberSequenceSource; +import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.core.execution.JobClient; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.runtime.minicluster.MiniCluster; @@ -38,7 +39,6 @@ import org.apache.flink.runtime.state.ttl.mock.MockKeyedStateBackend.MockSnapshotSupplier; import org.apache.flink.runtime.state.ttl.mock.MockStateBackend; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink; import org.apache.flink.test.util.MiniClusterWithClientResource; diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java index aefe8e7da0f528..9e769c99379134 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java +++ b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java @@ -24,8 +24,8 @@ import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/LocalRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/LocalRecoveryITCase.java index 0e7fd3a995a211..e8f5488db731ef 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/LocalRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/LocalRecoveryITCase.java @@ -29,6 +29,7 @@ import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.RestOptions; import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.core.execution.JobClient; import org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint; import org.apache.flink.runtime.rest.RestClient; @@ -39,7 +40,6 @@ import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.runtime.testutils.CommonTestUtils; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink; diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java index 0cbfdd950bd793..2836ff8237101f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java @@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HeartbeatManagerOptions; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.core.execution.JobClient; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; @@ -39,7 +40,6 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRecoveryCachingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRecoveryCachingITCase.java index a6131a710edb90..f784d4111d11b5 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRecoveryCachingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRecoveryCachingITCase.java @@ -24,6 +24,7 @@ import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MemorySize; +import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.FileSystemFactory; @@ -33,7 +34,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink; import org.apache.flink.test.util.MiniClusterWithClientResource; @@ -74,8 +74,8 @@ import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForCheckpoint; import static org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION; import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT; +import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_CONSISTENCY_MODE; import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL; -import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_MODE; import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.ENABLE_UNALIGNED; import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT; import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.UNALIGNED_MAX_SUBTASKS_PER_CHANNEL_STATE_FILE; @@ -170,7 +170,7 @@ private Configuration configureJob(File cpDir) { conf.set(EXTERNALIZED_CHECKPOINT, RETAIN_ON_CANCELLATION); conf.set(DEFAULT_PARALLELISM, PARALLELISM); conf.set(ENABLE_STATE_CHANGE_LOG, true); - conf.set(CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE); + conf.set(CHECKPOINTING_CONSISTENCY_MODE, CheckpointingMode.EXACTLY_ONCE); conf.set(CHECKPOINTING_INTERVAL, Duration.ofMillis(10)); conf.set(CHECKPOINT_STORAGE, "filesystem"); conf.set(CHECKPOINTS_DIRECTORY, cpDir.toURI().toString()); diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java index bacdad4a1fd3fe..ef311600b40023 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java @@ -29,11 +29,11 @@ import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MemorySize; +import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.core.io.InputStatus; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStreamUtils; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -84,8 +84,8 @@ import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForCheckpoint; import static org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION; import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT; +import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_CONSISTENCY_MODE; import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL; -import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_MODE; import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.ENABLE_UNALIGNED; import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT; import static org.apache.flink.util.Preconditions.checkArgument; @@ -215,7 +215,7 @@ private Configuration configureJob(int parallelism, File cpDir) { conf.set(EXTERNALIZED_CHECKPOINT, RETAIN_ON_CANCELLATION); conf.set(DEFAULT_PARALLELISM, parallelism); conf.set(ENABLE_STATE_CHANGE_LOG, true); - conf.set(CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE); + conf.set(CHECKPOINTING_CONSISTENCY_MODE, CheckpointingMode.EXACTLY_ONCE); conf.set(CHECKPOINTING_INTERVAL, Duration.ofMillis(10)); conf.set(CHECKPOINT_STORAGE, "filesystem"); conf.set(CHECKPOINTS_DIRECTORY, cpDir.toURI().toString()); diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/TaskManagerWideRocksDbMemorySharingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/state/TaskManagerWideRocksDbMemorySharingITCase.java index 679546de90d4bc..f99968263f578a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/state/TaskManagerWideRocksDbMemorySharingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/state/TaskManagerWideRocksDbMemorySharingITCase.java @@ -27,9 +27,9 @@ import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; import org.apache.flink.contrib.streaming.state.RocksDBMemoryControllerUtils.RocksDBMemoryFactory; import org.apache.flink.contrib.streaming.state.RocksDBOptions; +import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink; diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java index f31826d2bd7389..f96a4fc7a77ed6 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.time.Deadline; import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -30,7 +31,6 @@ import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator; import org.apache.flink.test.util.MigrationTest; diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java index 197dfdb3b3c4df..91c0f65f07c84c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java +++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java @@ -28,8 +28,8 @@ import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.runtime.state.memory.MemoryStateBackend; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java index e25a2cc5d0eadf..f86df36bd99b72 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java +++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java @@ -24,8 +24,8 @@ import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.runtime.state.memory.MemoryStateBackend; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java index 64a313ea47cc1e..085c02c1523686 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java @@ -26,7 +26,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.datastream.IterativeStream; @@ -659,7 +658,7 @@ public void testWithCheckPointing() throws Exception { createIteration(env, timeoutScale); env.enableCheckpointing( CheckpointCoordinatorConfiguration.MINIMAL_CHECKPOINT_TIME, - CheckpointingMode.EXACTLY_ONCE, + org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE, false); env.execute(); @@ -672,7 +671,7 @@ public void testWithCheckPointing() throws Exception { createIteration(env, timeoutScale); env.enableCheckpointing( CheckpointCoordinatorConfiguration.MINIMAL_CHECKPOINT_TIME, - CheckpointingMode.EXACTLY_ONCE, + org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE, true); env.getStreamGraph().getJobGraph();