Skip to content

Commit

Permalink
[FLINK-34516] Use new CheckpointingMode in flink-core everywhere
Browse files Browse the repository at this point in the history
  • Loading branch information
Zakelly committed Mar 13, 2024
1 parent 9bdd237 commit 688b83c
Show file tree
Hide file tree
Showing 49 changed files with 170 additions and 133 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.core.execution.PipelineExecutorFactory;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils.IntEncoder;
import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils.ModuloBucketAssigner;
import org.apache.flink.connector.file.sink.utils.PartSizeAndCheckpointRollingPolicy;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.client.JobExecutionException;
Expand All @@ -48,7 +49,6 @@
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobgraph.JobGraph;
Expand All @@ -33,7 +34,6 @@
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.MapPartitionOperator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.runtime.checkpoint.OperatorState;
Expand All @@ -39,7 +40,6 @@
import org.apache.flink.state.api.output.operators.BroadcastStateBootstrapOperator;
import org.apache.flink.state.api.output.partitioner.HashSelector;
import org.apache.flink.state.api.output.partitioner.KeyGroupRangePartitioner;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamOperator;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.runtime.checkpoint.OperatorState;
Expand All @@ -36,7 +37,6 @@
import org.apache.flink.state.api.output.operators.BroadcastStateBootstrapOperator;
import org.apache.flink.state.api.output.operators.GroupReduceOperator;
import org.apache.flink.state.api.runtime.MutableConfig;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.graph.StreamConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@
import org.apache.flink.configuration.DescribedEnum;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.description.InlineElement;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
Expand Down Expand Up @@ -71,10 +71,10 @@ public class CheckpointConfig implements java.io.Serializable {
* The default checkpoint mode: exactly once.
*
* @deprecated This field is no longer used. Please use {@link
* ExecutionCheckpointingOptions.CHECKPOINTING_MODE} instead.
* ExecutionCheckpointingOptions.CHECKPOINTING_CONSISTENCY_MODE} instead.
*/
public static final CheckpointingMode DEFAULT_MODE =
ExecutionCheckpointingOptions.CHECKPOINTING_MODE.defaultValue();
ExecutionCheckpointingOptions.CHECKPOINTING_CONSISTENCY_MODE.defaultValue();

/**
* The default timeout of a checkpoint attempt: 10 minutes.
Expand Down Expand Up @@ -178,41 +178,42 @@ public boolean isCheckpointingEnabled() {
* Gets the checkpointing mode (exactly-once vs. at-least-once).
*
* @return The checkpointing mode.
* @deprecated Use {@link #getCheckpointMode} instead.
* @deprecated Use {@link #getConsistencyMode} instead.
*/
@Deprecated
public CheckpointingMode getCheckpointingMode() {
public org.apache.flink.streaming.api.CheckpointingMode getCheckpointingMode() {
return configuration.get(ExecutionCheckpointingOptions.CHECKPOINTING_MODE);
}

/**
* Sets the checkpointing mode (exactly-once vs. at-least-once).
*
* @param checkpointingMode The checkpointing mode.
* @deprecated Use {@link #setCheckpointMode} instead.
* @deprecated Use {@link #setConsistencyMode} instead.
*/
@Deprecated
public void setCheckpointingMode(CheckpointingMode checkpointingMode) {
public void setCheckpointingMode(
org.apache.flink.streaming.api.CheckpointingMode checkpointingMode) {
configuration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, checkpointingMode);
}

/**
* Gets the checkpointing mode (exactly-once vs. at-least-once).
* Gets the checkpointing consistency mode (exactly-once vs. at-least-once).
*
* @return The checkpointing mode.
*/
public org.apache.flink.core.execution.CheckpointingMode getCheckpointMode() {
return configuration.get(ExecutionCheckpointingOptions.CHECKPOINTING_MODE_V2);
public CheckpointingMode getConsistencyMode() {
return configuration.get(ExecutionCheckpointingOptions.CHECKPOINTING_CONSISTENCY_MODE);
}

/**
* Sets the checkpointing mode (exactly-once vs. at-least-once).
* Sets the checkpointing consistency mode (exactly-once vs. at-least-once).
*
* @param checkpointingMode The checkpointing mode.
*/
public void setCheckpointMode(
org.apache.flink.core.execution.CheckpointingMode checkpointingMode) {
configuration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE_V2, checkpointingMode);
public void setConsistencyMode(CheckpointingMode checkpointingMode) {
configuration.set(
ExecutionCheckpointingOptions.CHECKPOINTING_CONSISTENCY_MODE, checkpointingMode);
}

/**
Expand Down Expand Up @@ -599,7 +600,8 @@ public boolean isExternalizedCheckpointsEnabled() {
* embedded into the stream of data anymore.
*
* <p>Unaligned checkpoints can only be enabled if {@link
* ExecutionCheckpointingOptions#CHECKPOINTING_MODE} is {@link CheckpointingMode#EXACTLY_ONCE}.
* ExecutionCheckpointingOptions#CHECKPOINTING_CONSISTENCY_MODE} is {@link
* CheckpointingMode#EXACTLY_ONCE}.
*
* @param enabled Flag to indicate whether unaligned are enabled.
*/
Expand All @@ -617,7 +619,8 @@ public void enableUnalignedCheckpoints(boolean enabled) {
* embedded into the stream of data anymore.
*
* <p>Unaligned checkpoints can only be enabled if {@link
* ExecutionCheckpointingOptions#CHECKPOINTING_MODE} is {@link CheckpointingMode#EXACTLY_ONCE}.
* ExecutionCheckpointingOptions#CHECKPOINTING_CONSISTENCY_MODE} is {@link
* CheckpointingMode#EXACTLY_ONCE}.
*/
@PublicEvolving
public void enableUnalignedCheckpoints() {
Expand Down Expand Up @@ -988,7 +991,7 @@ public InlineElement getDescription() {

/**
* Sets all relevant options contained in the {@link ReadableConfig} such as e.g. {@link
* ExecutionCheckpointingOptions#CHECKPOINTING_MODE}.
* ExecutionCheckpointingOptions#CHECKPOINTING_CONSISTENCY_MODE}.
*
* <p>It will change the value of a setting only if a corresponding option was set in the {@code
* configuration}. If a key is not present, the current value of a field will remain untouched.
Expand All @@ -997,8 +1000,8 @@ public InlineElement getDescription() {
*/
public void configure(ReadableConfig configuration) {
configuration
.getOptional(ExecutionCheckpointingOptions.CHECKPOINTING_MODE)
.ifPresent(this::setCheckpointingMode);
.getOptional(ExecutionCheckpointingOptions.CHECKPOINTING_CONSISTENCY_MODE)
.ifPresent(this::setConsistencyMode);
configuration
.getOptional(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL)
.ifPresent(i -> this.setCheckpointInterval(i.toMillis()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.description.Description;
import org.apache.flink.configuration.description.TextElement;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.core.execution.CheckpointingMode;

import java.time.Duration;

Expand All @@ -43,18 +43,20 @@ public class ExecutionCheckpointingOptions {

@Deprecated
@Documentation.ExcludeFromDocumentation("Hidden for deprecatd.")
public static final ConfigOption<CheckpointingMode> CHECKPOINTING_MODE =
ConfigOptions.key("execution.checkpointing.mode")
.enumType(CheckpointingMode.class)
.defaultValue(CheckpointingMode.EXACTLY_ONCE)
.withDescription("The checkpointing mode (exactly-once vs. at-least-once).");
public static final ConfigOption<org.apache.flink.streaming.api.CheckpointingMode>
CHECKPOINTING_MODE =
ConfigOptions.key("execution.checkpointing.mode")
.enumType(org.apache.flink.streaming.api.CheckpointingMode.class)
.defaultValue(
org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE)
.withDescription(
"The checkpointing mode (exactly-once vs. at-least-once).");

public static final ConfigOption<org.apache.flink.core.execution.CheckpointingMode>
CHECKPOINTING_MODE_V2 =
CHECKPOINTING_CONSISTENCY_MODE =
ConfigOptions.key("execution.checkpointing.mode")
.enumType(org.apache.flink.core.execution.CheckpointingMode.class)
.defaultValue(
org.apache.flink.core.execution.CheckpointingMode.EXACTLY_ONCE)
.enumType(CheckpointingMode.class)
.defaultValue(CheckpointingMode.EXACTLY_ONCE)
.withDescription(
"The checkpointing mode (exactly-once vs. at-least-once).");

Expand Down Expand Up @@ -200,7 +202,7 @@ public class ExecutionCheckpointingOptions {
.linebreak()
.text(
"Unaligned checkpoints can only be enabled if %s is %s and if %s is 1",
TextElement.code(CHECKPOINTING_MODE.key()),
TextElement.code(CHECKPOINTING_CONSISTENCY_MODE.key()),
TextElement.code(
CheckpointingMode.EXACTLY_ONCE.toString()),
TextElement.code(MAX_CONCURRENT_CHECKPOINTS.key()))
Expand Down
Loading

0 comments on commit 688b83c

Please sign in to comment.