From 9bdd237d0322df8853f1b9e6ae658f77b9175237 Mon Sep 17 00:00:00 2001 From: Zakelly Date: Mon, 26 Feb 2024 17:53:11 +0800 Subject: [PATCH] [FLINK-34516] Move CheckpointingMode to flink-core --- .../core/execution/CheckpointingMode.java | 77 +++++++++++++++++++ .../streaming/api/CheckpointingMode.java | 4 + .../api/environment/CheckpointConfig.java | 23 ++++++ .../ExecutionCheckpointingOptions.java | 12 +++ ...CheckpointConfigFromConfigurationTest.java | 26 +++++++ 5 files changed, 142 insertions(+) create mode 100644 flink-core/src/main/java/org/apache/flink/core/execution/CheckpointingMode.java diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/CheckpointingMode.java b/flink-core/src/main/java/org/apache/flink/core/execution/CheckpointingMode.java new file mode 100644 index 00000000000000..caaafc0d646847 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/execution/CheckpointingMode.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.execution; + +import org.apache.flink.annotation.Public; + +/** + * The checkpointing mode defines what consistency guarantees the system gives in the presence of + * failures. + * + *

When checkpointing is activated, the data streams are replayed such that lost parts of the + * processing are repeated. For stateful operations and functions, the checkpointing mode defines + * whether the system draws checkpoints such that a recovery behaves as if the operators/functions + * see each record "exactly once" ({@link #EXACTLY_ONCE}), or whether the checkpoints are drawn in a + * simpler fashion that typically encounters some duplicates upon recovery ({@link #AT_LEAST_ONCE}) + */ +@Public +public enum CheckpointingMode { + + /** + * Sets the checkpointing mode to "exactly once". This mode means that the system will + * checkpoint the operator and user function state in such a way that, upon recovery, every + * record will be reflected exactly once in the operator state. + * + *

For example, if a user function counts the number of elements in a stream, this number + * will consistently be equal to the number of actual elements in the stream, regardless of + * failures and recovery. + * + *

Note that this does not mean that each record flows through the streaming data flow only + * once. It means that upon recovery, the state of operators/functions is restored such that the + * resumed data streams pick up exactly at after the last modification to the state. + * + *

Note that this mode does not guarantee exactly-once behavior in the interaction with + * external systems (only state in Flink's operators and user functions). The reason for that is + * that a certain level of "collaboration" is required between two systems to achieve + * exactly-once guarantees. However, for certain systems, connectors can be written that + * facilitate this collaboration. + * + *

This mode sustains high throughput. Depending on the data flow graph and operations, this + * mode may increase the record latency, because operators need to align their input streams, in + * order to create a consistent snapshot point. The latency increase for simple dataflows (no + * repartitioning) is negligible. For simple dataflows with repartitioning, the average latency + * remains small, but the slowest records typically have an increased latency. + */ + EXACTLY_ONCE, + + /** + * Sets the checkpointing mode to "at least once". This mode means that the system will + * checkpoint the operator and user function state in a simpler way. Upon failure and recovery, + * some records may be reflected multiple times in the operator state. + * + *

For example, if a user function counts the number of elements in a stream, this number + * will equal to, or larger, than the actual number of elements in the stream, in the presence + * of failure and recovery. + * + *

This mode has minimal impact on latency and may be preferable in very-low latency + * scenarios, where a sustained very-low latency (such as few milliseconds) is needed, and where + * occasional duplicate messages (on recovery) do not matter. + */ + AT_LEAST_ONCE +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/CheckpointingMode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/CheckpointingMode.java index f26668154edc4b..0ff0ce89c7623f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/CheckpointingMode.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/CheckpointingMode.java @@ -29,8 +29,12 @@ * whether the system draws checkpoints such that a recovery behaves as if the operators/functions * see each record "exactly once" ({@link #EXACTLY_ONCE}), or whether the checkpoints are drawn in a * simpler fashion that typically encounters some duplicates upon recovery ({@link #AT_LEAST_ONCE}) + * + * @deprecated This class has been moved to {@link + * org.apache.flink.core.execution.CheckpointingMode}. */ @Public +@Deprecated public enum CheckpointingMode { /** diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java index 6899fd701a9678..778a58e4103e43 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java @@ -178,7 +178,9 @@ public boolean isCheckpointingEnabled() { * Gets the checkpointing mode (exactly-once vs. at-least-once). * * @return The checkpointing mode. + * @deprecated Use {@link #getCheckpointMode} instead. */ + @Deprecated public CheckpointingMode getCheckpointingMode() { return configuration.get(ExecutionCheckpointingOptions.CHECKPOINTING_MODE); } @@ -187,11 +189,32 @@ public CheckpointingMode getCheckpointingMode() { * Sets the checkpointing mode (exactly-once vs. at-least-once). * * @param checkpointingMode The checkpointing mode. + * @deprecated Use {@link #setCheckpointMode} instead. */ + @Deprecated public void setCheckpointingMode(CheckpointingMode checkpointingMode) { configuration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, checkpointingMode); } + /** + * Gets the checkpointing mode (exactly-once vs. at-least-once). + * + * @return The checkpointing mode. + */ + public org.apache.flink.core.execution.CheckpointingMode getCheckpointMode() { + return configuration.get(ExecutionCheckpointingOptions.CHECKPOINTING_MODE_V2); + } + + /** + * Sets the checkpointing mode (exactly-once vs. at-least-once). + * + * @param checkpointingMode The checkpointing mode. + */ + public void setCheckpointMode( + org.apache.flink.core.execution.CheckpointingMode checkpointingMode) { + configuration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE_V2, checkpointingMode); + } + /** * Gets the interval in which checkpoints are periodically scheduled. * diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java index 14e9ae52c9556c..87b6414a745298 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java @@ -40,12 +40,24 @@ */ @PublicEvolving public class ExecutionCheckpointingOptions { + + @Deprecated + @Documentation.ExcludeFromDocumentation("Hidden for deprecatd.") public static final ConfigOption CHECKPOINTING_MODE = ConfigOptions.key("execution.checkpointing.mode") .enumType(CheckpointingMode.class) .defaultValue(CheckpointingMode.EXACTLY_ONCE) .withDescription("The checkpointing mode (exactly-once vs. at-least-once)."); + public static final ConfigOption + CHECKPOINTING_MODE_V2 = + ConfigOptions.key("execution.checkpointing.mode") + .enumType(org.apache.flink.core.execution.CheckpointingMode.class) + .defaultValue( + org.apache.flink.core.execution.CheckpointingMode.EXACTLY_ONCE) + .withDescription( + "The checkpointing mode (exactly-once vs. at-least-once)."); + public static final ConfigOption CHECKPOINTING_TIMEOUT = ConfigOptions.key("execution.checkpointing.timeout") .durationType() diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java index b6375b254acf18..655a6b1b4f3f05 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java @@ -48,6 +48,32 @@ private static Stream> specs() { .viaSetter(CheckpointConfig::setCheckpointingMode) .getterVia(CheckpointConfig::getCheckpointingMode) .nonDefaultValue(CheckpointingMode.AT_LEAST_ONCE), + TestSpec.testValue(org.apache.flink.core.execution.CheckpointingMode.AT_LEAST_ONCE) + .whenSetFromFile("execution.checkpointing.mode", "AT_LEAST_ONCE") + .viaSetter(CheckpointConfig::setCheckpointMode) + .getterVia(CheckpointConfig::getCheckpointMode) + .nonDefaultValue( + org.apache.flink.core.execution.CheckpointingMode.AT_LEAST_ONCE), + TestSpec.testValue(org.apache.flink.core.execution.CheckpointingMode.AT_LEAST_ONCE) + .whenSetFromFile("execution.checkpointing.mode", "AT_LEAST_ONCE") + .viaSetter( + (config, v) -> { + config.setCheckpointingMode( + CheckpointingMode.valueOf(v.name())); + }) + .getterVia(CheckpointConfig::getCheckpointMode) + .nonDefaultValue( + org.apache.flink.core.execution.CheckpointingMode.AT_LEAST_ONCE), + TestSpec.testValue(CheckpointingMode.AT_LEAST_ONCE) + .whenSetFromFile("execution.checkpointing.mode", "AT_LEAST_ONCE") + .viaSetter( + (config, v) -> { + config.setCheckpointMode( + org.apache.flink.core.execution.CheckpointingMode + .valueOf(v.name())); + }) + .getterVia(CheckpointConfig::getCheckpointingMode) + .nonDefaultValue(CheckpointingMode.AT_LEAST_ONCE), TestSpec.testValue(10000L) .whenSetFromFile("execution.checkpointing.interval", "10 s") .viaSetter(CheckpointConfig::setCheckpointInterval)