From 643754c9698b758dbb145f14d222981fa53d5bc4 Mon Sep 17 00:00:00 2001 From: Zakelly Date: Thu, 22 Feb 2024 18:35:30 +0800 Subject: [PATCH] [FLINK-34454] Introduce StateRecoveryOptions in flink-core and Rename options --- docs/content.zh/docs/deployment/config.md | 5 +- docs/content/docs/deployment/config.md | 5 +- ...execution_checkpointing_configuration.html | 6 -- ...html => state_recovery_configuration.html} | 12 ++- .../flink/client/cli/CliFrontendParser.java | 4 +- .../program/StreamContextEnvironmentTest.java | 4 +- .../testjar/ForbidConfigurationJob.java | 4 +- .../CoordinatedSourceRescaleITCase.java | 4 +- .../configuration/StateRecoveryOptions.java | 100 ++++++++++++++++++ .../webmonitor/handlers/JarRunHandler.java | 8 +- .../handlers/JarRunHandlerParameterTest.java | 14 +-- .../jobgraph/SavepointConfigOptions.java | 8 +- .../jobgraph/SavepointRestoreSettings.java | 17 +-- .../runtime/minicluster/MiniCluster.java | 4 +- .../api/environment/CheckpointConfig.java | 18 ++-- .../ExecutionCheckpointingOptions.java | 9 +- .../api/graph/StreamGraphGeneratorTest.java | 6 +- .../MiniClusterTestEnvironment.java | 2 +- .../RestoreUpgradedJobITCase.java | 4 +- .../UnalignedCheckpointTestBase.java | 6 +- .../runtime/UnifiedSinkMigrationITCase.java | 4 +- 21 files changed, 179 insertions(+), 65 deletions(-) rename docs/layouts/shortcodes/generated/{savepoint_config_configuration.html => state_recovery_configuration.html} (70%) create mode 100644 flink-core/src/main/java/org/apache/flink/configuration/StateRecoveryOptions.java diff --git a/docs/content.zh/docs/deployment/config.md b/docs/content.zh/docs/deployment/config.md index 702d29bd2a0130..56d9206ab06a2b 100644 --- a/docs/content.zh/docs/deployment/config.md +++ b/docs/content.zh/docs/deployment/config.md @@ -417,7 +417,6 @@ Flink can fetch user artifacts stored locally, on remote DFS, or accessible via # Execution {{< generated/deployment_configuration >}} -{{< generated/savepoint_config_configuration >}} {{< generated/execution_configuration >}} ### Pipeline @@ -428,6 +427,10 @@ Flink can fetch user artifacts stored locally, on remote DFS, or accessible via {{< generated/execution_checkpointing_configuration >}} +### Recovery + +{{< generated/state_recovery_configuration >}} + ---- ---- diff --git a/docs/content/docs/deployment/config.md b/docs/content/docs/deployment/config.md index 993ded6203c7c7..f7927e260b4afc 100644 --- a/docs/content/docs/deployment/config.md +++ b/docs/content/docs/deployment/config.md @@ -419,7 +419,6 @@ Flink can fetch user artifacts stored locally, on remote DFS, or accessible via # Execution {{< generated/deployment_configuration >}} -{{< generated/savepoint_config_configuration >}} {{< generated/execution_configuration >}} ### Pipeline @@ -430,6 +429,10 @@ Flink can fetch user artifacts stored locally, on remote DFS, or accessible via {{< generated/execution_checkpointing_configuration >}} +### Recovery + +{{< generated/state_recovery_configuration >}} + ---- ---- diff --git a/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html b/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html index 60b45004393482..e191ab1e26d7c0 100644 --- a/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html +++ b/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html @@ -56,12 +56,6 @@

Enum

The checkpointing mode (exactly-once vs. at-least-once).

Possible values: - -
execution.checkpointing.recover-without-channel-state.checkpoint-id
- -1 - Long - Checkpoint id for which in-flight data should be ignored in case of the recovery from this checkpoint.

It is better to keep this value empty until there is explicit needs to restore from the specific checkpoint without in-flight data.
-
execution.checkpointing.timeout
10 min diff --git a/docs/layouts/shortcodes/generated/savepoint_config_configuration.html b/docs/layouts/shortcodes/generated/state_recovery_configuration.html similarity index 70% rename from docs/layouts/shortcodes/generated/savepoint_config_configuration.html rename to docs/layouts/shortcodes/generated/state_recovery_configuration.html index 8b1f53e3696582..5c255b2f820ae5 100644 --- a/docs/layouts/shortcodes/generated/savepoint_config_configuration.html +++ b/docs/layouts/shortcodes/generated/state_recovery_configuration.html @@ -9,22 +9,28 @@ -
execution.savepoint-restore-mode
+
execution.state-recovery.claim-mode
NO_CLAIM

Enum

Describes the mode how Flink should restore from the given savepoint or retained checkpoint.

Possible values: -
execution.savepoint.ignore-unclaimed-state
+
execution.state-recovery.ignore-unclaimed-state
false Boolean Allow to skip savepoint state that cannot be restored. Allow this if you removed an operator from your pipeline after the savepoint was triggered. -
execution.savepoint.path
+
execution.state-recovery.path
(none) String Path to a savepoint to restore the job from (for example hdfs:///flink/savepoint-1537). + +
execution.state-recovery.without-channel-state.checkpoint-id
+ -1 + Long + Checkpoint id for which in-flight data should be ignored in case of the recovery from this checkpoint.

It is better to keep this value empty until there is explicit needs to restore from the specific checkpoint without in-flight data.
+ diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java index 96b64086a7ed87..3be4598cc8a793 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java @@ -20,8 +20,8 @@ import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.ConfigurationUtils; +import org.apache.flink.configuration.StateRecoveryOptions; import org.apache.flink.core.execution.RestoreMode; -import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.commons.cli.CommandLine; @@ -690,7 +690,7 @@ public static SavepointRestoreSettings createSavepointRestoreSettings(CommandLin commandLine.getOptionValue(SAVEPOINT_RESTORE_MODE), RestoreMode.class); } else { - restoreMode = SavepointConfigOptions.RESTORE_MODE.defaultValue(); + restoreMode = StateRecoveryOptions.RESTORE_MODE.defaultValue(); } return SavepointRestoreSettings.forPath( savepointPath, allowNonRestoredState, restoreMode); diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java index f6eb3b5d7ff7eb..e96f0c7dcb885d 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java @@ -24,9 +24,9 @@ import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.configuration.StateRecoveryOptions; import org.apache.flink.core.execution.PipelineExecutorFactory; import org.apache.flink.core.execution.PipelineExecutorServiceLoader; -import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.CheckpointConfig; @@ -56,7 +56,7 @@ void testDisallowProgramConfigurationChanges( final Configuration clusterConfig = new Configuration(); clusterConfig.set(DeploymentOptions.PROGRAM_CONFIG_ENABLED, false); clusterConfig.set(DeploymentOptions.TARGET, "local"); - clusterConfig.set(SavepointConfigOptions.SAVEPOINT_PATH, "/flink/savepoints"); + clusterConfig.set(StateRecoveryOptions.SAVEPOINT_PATH, "/flink/savepoints"); clusterConfig.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.STREAMING); final Configuration programConfig = new Configuration(); diff --git a/flink-clients/src/test/java/org/apache/flink/client/testjar/ForbidConfigurationJob.java b/flink-clients/src/test/java/org/apache/flink/client/testjar/ForbidConfigurationJob.java index 8e7f0222e2af4c..d702fecbd1fe84 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/testjar/ForbidConfigurationJob.java +++ b/flink-clients/src/test/java/org/apache/flink/client/testjar/ForbidConfigurationJob.java @@ -19,7 +19,7 @@ package org.apache.flink.client.testjar; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; +import org.apache.flink.configuration.StateRecoveryOptions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink; @@ -33,7 +33,7 @@ public class ForbidConfigurationJob { public static void main(String[] args) throws Exception { final Configuration config = new Configuration(); - config.set(SavepointConfigOptions.SAVEPOINT_PATH, SAVEPOINT_PATH); + config.set(StateRecoveryOptions.SAVEPOINT_PATH, SAVEPOINT_PATH); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config); diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java index 47c81e8622fcdd..9469e58dcfe7cd 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java @@ -23,8 +23,8 @@ import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.StateRecoveryOptions; import org.apache.flink.configuration.TaskManagerOptions; -import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.CheckpointConfig; @@ -113,7 +113,7 @@ private StreamExecutionEnvironment createEnv( conf.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString()); conf.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse("4kb")); if (restoreCheckpoint != null) { - conf.set(SavepointConfigOptions.SAVEPOINT_PATH, restoreCheckpoint.toURI().toString()); + conf.set(StateRecoveryOptions.SAVEPOINT_PATH, restoreCheckpoint.toURI().toString()); } conf.set(TaskManagerOptions.NUM_TASK_SLOTS, p); diff --git a/flink-core/src/main/java/org/apache/flink/configuration/StateRecoveryOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/StateRecoveryOptions.java new file mode 100644 index 00000000000000..a2bbc2e354f1f9 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/configuration/StateRecoveryOptions.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.docs.Documentation; +import org.apache.flink.configuration.description.Description; +import org.apache.flink.core.execution.RestoreMode; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** + * The {@link ConfigOption configuration options} used when restoring state from a savepoint or a + * checkpoint. + */ +@PublicEvolving +public class StateRecoveryOptions { + + /** The path to a savepoint that will be used to bootstrap the pipeline's state. */ + public static final ConfigOption SAVEPOINT_PATH = + key("execution.state-recovery.path") + .stringType() + .noDefaultValue() + .withDeprecatedKeys("execution.savepoint.path") + .withDescription( + "Path to a savepoint to restore the job from (for example hdfs:///flink/savepoint-1537)."); + + /** + * A flag indicating if we allow Flink to skip savepoint state that cannot be restored, e.g. + * because the corresponding operator has been removed. + */ + public static final ConfigOption SAVEPOINT_IGNORE_UNCLAIMED_STATE = + key("execution.state-recovery.ignore-unclaimed-state") + .booleanType() + .defaultValue(false) + .withDeprecatedKeys("execution.savepoint.ignore-unclaimed-state") + .withDescription( + "Allow to skip savepoint state that cannot be restored. " + + "Allow this if you removed an operator from your pipeline after the savepoint was triggered."); + /** + * Describes the mode how Flink should restore from the given savepoint or retained checkpoint. + */ + public static final ConfigOption RESTORE_MODE = + key("execution.state-recovery.claim-mode") + .enumType(RestoreMode.class) + .defaultValue(RestoreMode.DEFAULT) + .withDeprecatedKeys("execution.savepoint-restore-mode") + .withDescription( + "Describes the mode how Flink should restore from the given" + + " savepoint or retained checkpoint."); + + /** + * Access to this option is officially only supported via {@link + * org.apache.flink.runtime.jobgraph.CheckpointConfig#enableApproximateLocalRecovery(boolean)}, + * but there is no good reason behind this. + */ + @Internal @Documentation.ExcludeFromDocumentation + public static final ConfigOption APPROXIMATE_LOCAL_RECOVERY = + key("execution.state-recovery.approximate-local-recovery") + .booleanType() + .defaultValue(false) + .withDeprecatedKeys("execution.checkpointing.approximate-local-recovery") + .withDescription("Flag to enable approximate local recovery."); + + public static final ConfigOption CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA = + ConfigOptions.key("execution.state-recovery.without-channel-state.checkpoint-id") + .longType() + .defaultValue(-1L) + .withDeprecatedKeys( + "execution.checkpointing.recover-without-channel-state.checkpoint-id") + .withDescription( + Description.builder() + .text( + "Checkpoint id for which in-flight data should be ignored in case of the recovery from this checkpoint.") + .linebreak() + .linebreak() + .text( + "It is better to keep this value empty until " + + "there is explicit needs to restore from " + + "the specific checkpoint without in-flight data.") + .linebreak() + .build()); +} diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java index 723ec7cddd181d..83a7dc0f91623a 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java @@ -25,9 +25,9 @@ import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; +import org.apache.flink.configuration.StateRecoveryOptions; import org.apache.flink.core.execution.RestoreMode; import org.apache.flink.runtime.dispatcher.DispatcherGateway; -import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.rest.handler.AbstractRestHandler; import org.apache.flink.runtime.rest.handler.HandlerRequest; @@ -138,7 +138,7 @@ private SavepointRestoreSettings getSavepointRestoreSettings( requestBody.getAllowNonRestoredState(), () -> getQueryParameter(request, AllowNonRestoredStateQueryParameter.class), effectiveConfiguration.get( - SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE), + StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE), log); final String savepointPath = fromRequestBodyOrQueryParameter( @@ -147,14 +147,14 @@ private SavepointRestoreSettings getSavepointRestoreSettings( emptyToNull( getQueryParameter( request, SavepointPathQueryParameter.class)), - effectiveConfiguration.get(SavepointConfigOptions.SAVEPOINT_PATH), + effectiveConfiguration.get(StateRecoveryOptions.SAVEPOINT_PATH), log); final RestoreMode restoreMode = Optional.ofNullable(requestBody.getRestoreMode()) .orElseGet( () -> effectiveConfiguration.get( - SavepointConfigOptions.RESTORE_MODE)); + StateRecoveryOptions.RESTORE_MODE)); if (restoreMode.equals(RestoreMode.LEGACY)) { log.warn( "The {} restore mode is deprecated, please use {} or {} mode instead.", diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java index 3f78510f55c7c8..cb07a3f36e7f20 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java @@ -29,11 +29,11 @@ import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.configuration.StateRecoveryOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.execution.RestoreMode; import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.RestHandlerException; @@ -87,9 +87,9 @@ class JarRunHandlerParameterTest new Configuration() .set(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 120000L) .set(CoreOptions.DEFAULT_PARALLELISM, 57) - .set(SavepointConfigOptions.SAVEPOINT_PATH, "/foo/bar/test") - .set(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, false) - .set(SavepointConfigOptions.RESTORE_MODE, RESTORE_MODE) + .set(StateRecoveryOptions.SAVEPOINT_PATH, "/foo/bar/test") + .set(StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, false) + .set(StateRecoveryOptions.RESTORE_MODE, RESTORE_MODE) .set( PipelineOptions.PARALLELISM_OVERRIDES, new HashMap() { @@ -336,13 +336,13 @@ void validateGraphWithFlinkConfig(JobGraph jobGraph) { final SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings(); assertThat(savepointRestoreSettings.getRestoreMode()) - .isEqualTo(FLINK_CONFIGURATION.get(SavepointConfigOptions.RESTORE_MODE)); + .isEqualTo(FLINK_CONFIGURATION.get(StateRecoveryOptions.RESTORE_MODE)); assertThat(savepointRestoreSettings.getRestorePath()) - .isEqualTo(FLINK_CONFIGURATION.get(SavepointConfigOptions.SAVEPOINT_PATH)); + .isEqualTo(FLINK_CONFIGURATION.get(StateRecoveryOptions.SAVEPOINT_PATH)); assertThat(savepointRestoreSettings.allowNonRestoredState()) .isEqualTo( FLINK_CONFIGURATION.get( - SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE)); + StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE)); } private void validateSavepointJarRunMessageParameters( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java index 74d6dc141f220d..7121f1d506fe38 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java @@ -19,13 +19,19 @@ package org.apache.flink.runtime.jobgraph; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.docs.Documentation; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.core.execution.RestoreMode; import static org.apache.flink.configuration.ConfigOptions.key; -/** The {@link ConfigOption configuration options} used when restoring from a savepoint. */ +/** + * The {@link ConfigOption configuration options} used when restoring from a savepoint. @Deprecated + * All options are moved into {@link org.apache.flink.configuration.StateRecoveryOptions}. + */ @PublicEvolving +@Deprecated +@Documentation.ExcludeFromDocumentation("Hidden for deprecated.") public class SavepointConfigOptions { /** The path to a savepoint that will be used to bootstrap the pipeline's state. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java index 2ee016cf50d49c..663313bfb870fc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java @@ -20,6 +20,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.configuration.StateRecoveryOptions; import org.apache.flink.core.execution.RestoreMode; import javax.annotation.Nonnull; @@ -146,7 +147,7 @@ public static SavepointRestoreSettings none() { public static SavepointRestoreSettings forPath(String savepointPath) { return forPath( savepointPath, - SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.defaultValue()); + StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.defaultValue()); } public static SavepointRestoreSettings forPath( @@ -155,7 +156,7 @@ public static SavepointRestoreSettings forPath( return new SavepointRestoreSettings( savepointPath, allowNonRestoredState, - SavepointConfigOptions.RESTORE_MODE.defaultValue()); + StateRecoveryOptions.RESTORE_MODE.defaultValue()); } public static SavepointRestoreSettings forPath( @@ -171,21 +172,21 @@ public static void toConfiguration( final SavepointRestoreSettings savepointRestoreSettings, final Configuration configuration) { configuration.set( - SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, + StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, savepointRestoreSettings.allowNonRestoredState()); configuration.set( - SavepointConfigOptions.RESTORE_MODE, savepointRestoreSettings.getRestoreMode()); + StateRecoveryOptions.RESTORE_MODE, savepointRestoreSettings.getRestoreMode()); final String savepointPath = savepointRestoreSettings.getRestorePath(); if (savepointPath != null) { - configuration.set(SavepointConfigOptions.SAVEPOINT_PATH, savepointPath); + configuration.set(StateRecoveryOptions.SAVEPOINT_PATH, savepointPath); } } public static SavepointRestoreSettings fromConfiguration(final ReadableConfig configuration) { - final String savepointPath = configuration.get(SavepointConfigOptions.SAVEPOINT_PATH); + final String savepointPath = configuration.get(StateRecoveryOptions.SAVEPOINT_PATH); final boolean allowNonRestored = - configuration.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE); - final RestoreMode restoreMode = configuration.get(SavepointConfigOptions.RESTORE_MODE); + configuration.get(StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE); + final RestoreMode restoreMode = configuration.get(StateRecoveryOptions.RESTORE_MODE); return savepointPath == null ? SavepointRestoreSettings.none() : SavepointRestoreSettings.forPath(savepointPath, allowNonRestored, restoreMode); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index c124869438e3b8..16bca240e0118c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -31,6 +31,7 @@ import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.configuration.StateRecoveryOptions; import org.apache.flink.core.execution.CheckpointType; import org.apache.flink.core.execution.RestoreMode; import org.apache.flink.core.execution.SavepointFormatType; @@ -66,7 +67,6 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.OperatorID; -import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.jobmaster.JobResult; @@ -1095,7 +1095,7 @@ private void checkRestoreModeForChangelogStateBackend(JobGraph jobGraph) { && savepointRestoreSettings.getRestoreMode() == RestoreMode.NO_CLAIM) { final Configuration conf = new Configuration(); SavepointRestoreSettings.toConfiguration(savepointRestoreSettings, conf); - conf.set(SavepointConfigOptions.RESTORE_MODE, RestoreMode.LEGACY); + conf.set(StateRecoveryOptions.RESTORE_MODE, RestoreMode.LEGACY); jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.fromConfiguration(conf)); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java index 6899fd701a9678..c14660207e1f7f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java @@ -27,6 +27,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DescribedEnum; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.configuration.StateRecoveryOptions; import org.apache.flink.configuration.description.InlineElement; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; @@ -113,13 +114,11 @@ public class CheckpointConfig implements java.io.Serializable { * Default id of checkpoint for which in-flight data should be ignored on recovery. * * @deprecated This field is no longer used. Please use {@link - * ExecutionCheckpointingOptions.CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA} instead. + * StateRecoveryOptions.CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA} instead. */ @Deprecated public static final int DEFAULT_CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA = - ExecutionCheckpointingOptions.CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA - .defaultValue() - .intValue(); + StateRecoveryOptions.CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA.defaultValue().intValue(); // -------------------------------------------------------------------------------------------- @@ -700,7 +699,7 @@ public void setMaxSubtasksPerChannelStateFile(int maxSubtasksPerChannelStateFile */ @Experimental public boolean isApproximateLocalRecoveryEnabled() { - return configuration.get(ExecutionCheckpointingOptions.APPROXIMATE_LOCAL_RECOVERY); + return configuration.get(StateRecoveryOptions.APPROXIMATE_LOCAL_RECOVERY); } /** @@ -719,7 +718,7 @@ public boolean isApproximateLocalRecoveryEnabled() { */ @Experimental public void enableApproximateLocalRecovery(boolean enabled) { - configuration.set(ExecutionCheckpointingOptions.APPROXIMATE_LOCAL_RECOVERY, enabled); + configuration.set(StateRecoveryOptions.APPROXIMATE_LOCAL_RECOVERY, enabled); } /** @@ -892,7 +891,7 @@ public CheckpointStorage getCheckpointStorage() { @PublicEvolving public void setCheckpointIdOfIgnoredInFlightData(long checkpointIdOfIgnoredInFlightData) { configuration.set( - ExecutionCheckpointingOptions.CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA, + StateRecoveryOptions.CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA, checkpointIdOfIgnoredInFlightData); } @@ -902,8 +901,7 @@ public void setCheckpointIdOfIgnoredInFlightData(long checkpointIdOfIgnoredInFli */ @PublicEvolving public long getCheckpointIdOfIgnoredInFlightData() { - return configuration.get( - ExecutionCheckpointingOptions.CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA); + return configuration.get(StateRecoveryOptions.CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA); } /** Cleanup behaviour for externalized checkpoints when the job is cancelled. */ @@ -1001,7 +999,7 @@ public void configure(ReadableConfig configuration) { .getOptional(ExecutionCheckpointingOptions.ENABLE_UNALIGNED) .ifPresent(this::enableUnalignedCheckpoints); configuration - .getOptional(ExecutionCheckpointingOptions.CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA) + .getOptional(StateRecoveryOptions.CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA) .ifPresent(this::setCheckpointIdOfIgnoredInFlightData); configuration .getOptional(ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java index 14e9ae52c9556c..de1c067cb53fa8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.StateRecoveryOptions; import org.apache.flink.configuration.description.Description; import org.apache.flink.configuration.description.TextElement; import org.apache.flink.streaming.api.CheckpointingMode; @@ -250,6 +251,10 @@ public class ExecutionCheckpointingOptions { "Forces unaligned checkpoints, particularly allowing them for iterative jobs.") .build()); + /** + * @Deprecated Use {@link StateRecoveryOptions#CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA} instead. + */ + @Deprecated public static final ConfigOption CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA = ConfigOptions.key("execution.checkpointing.recover-without-channel-state.checkpoint-id") .longType() @@ -297,9 +302,9 @@ public class ExecutionCheckpointingOptions { /** * Access to this option is officially only supported via {@link * CheckpointConfig#enableApproximateLocalRecovery(boolean)}, but there is no good reason behind - * this. + * this. @Deprecated Use {@link StateRecoveryOptions#APPROXIMATE_LOCAL_RECOVERY} instead. */ - @Internal @Documentation.ExcludeFromDocumentation + @Internal @Documentation.ExcludeFromDocumentation @Deprecated public static final ConfigOption APPROXIMATE_LOCAL_RECOVERY = key("execution.checkpointing.approximate-local-recovery") .booleanType() diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java index db5ac6f7851708..c6f6979ef1f3ae 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java @@ -29,9 +29,9 @@ import org.apache.flink.api.dag.Transformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.StateRecoveryOptions; import org.apache.flink.core.memory.ManagedMemoryUseCase; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; -import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.streaming.api.datastream.BroadcastStream; import org.apache.flink.streaming.api.datastream.CachedDataStream; @@ -747,7 +747,7 @@ public void testSetSlotSharingResource() { @Test public void testSettingSavepointRestoreSettings() { Configuration config = new Configuration(); - config.set(SavepointConfigOptions.SAVEPOINT_PATH, "/tmp/savepoint"); + config.set(StateRecoveryOptions.SAVEPOINT_PATH, "/tmp/savepoint"); final StreamGraph streamGraph = new StreamGraphGenerator( @@ -767,7 +767,7 @@ public void testSettingSavepointRestoreSettings() { @Test public void testSettingSavepointRestoreSettingsSetterOverrides() { Configuration config = new Configuration(); - config.set(SavepointConfigOptions.SAVEPOINT_PATH, "/tmp/savepoint"); + config.set(StateRecoveryOptions.SAVEPOINT_PATH, "/tmp/savepoint"); StreamGraphGenerator generator = new StreamGraphGenerator( diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/environment/MiniClusterTestEnvironment.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/environment/MiniClusterTestEnvironment.java index 40328f9061b902..4fca3ff500a1c6 100644 --- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/environment/MiniClusterTestEnvironment.java +++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/environment/MiniClusterTestEnvironment.java @@ -43,8 +43,8 @@ import java.util.stream.Collectors; import static org.apache.flink.configuration.MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL; +import static org.apache.flink.configuration.StateRecoveryOptions.SAVEPOINT_PATH; import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.METRIC_FETCHER_UPDATE_INTERVAL_MS; -import static org.apache.flink.runtime.jobgraph.SavepointConfigOptions.SAVEPOINT_PATH; /** Test environment for running jobs on Flink mini-cluster. */ @Experimental diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RestoreUpgradedJobITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RestoreUpgradedJobITCase.java index b4a036b83ffafa..8899047ad9bcfc 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RestoreUpgradedJobITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RestoreUpgradedJobITCase.java @@ -23,10 +23,10 @@ import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.StateRecoveryOptions; import org.apache.flink.core.execution.JobClient; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.core.testutils.OneShotLatch; -import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; @@ -214,7 +214,7 @@ private String runOriginalJob() throws Exception { private void runUpgradedJob(String snapshotPath) throws Exception { StreamExecutionEnvironment env; Configuration conf = new Configuration(); - conf.set(SavepointConfigOptions.SAVEPOINT_PATH, snapshotPath); + conf.set(StateRecoveryOptions.SAVEPOINT_PATH, snapshotPath); env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(PARALLELISM); env.addSource(new StringSource(allDataEmittedLatch)) diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java index ea664da1772bec..c6201faff07cd1 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java @@ -48,10 +48,10 @@ import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; import org.apache.flink.configuration.RpcOptions; import org.apache.flink.configuration.StateBackendOptions; +import org.apache.flink.configuration.StateRecoveryOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.io.InputStatus; import org.apache.flink.core.io.SimpleVersionedSerializer; -import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; import org.apache.flink.runtime.shuffle.ShuffleServiceOptions; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; @@ -769,9 +769,7 @@ public Configuration getConfiguration(File checkpointDir) { conf.set(StateBackendOptions.STATE_BACKEND, "filesystem"); conf.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString()); if (restoreCheckpoint != null) { - conf.set( - SavepointConfigOptions.SAVEPOINT_PATH, - restoreCheckpoint.toURI().toString()); + conf.set(StateRecoveryOptions.SAVEPOINT_PATH, restoreCheckpoint.toURI().toString()); } conf.set( diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/UnifiedSinkMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/UnifiedSinkMigrationITCase.java index 2b23936c6bb8a5..949d93da032a1c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/UnifiedSinkMigrationITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/UnifiedSinkMigrationITCase.java @@ -26,13 +26,13 @@ import org.apache.flink.api.connector.sink.Sink; import org.apache.flink.api.connector.sink.SinkWriter; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.StateRecoveryOptions; import org.apache.flink.core.execution.JobClient; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.core.testutils.OneShotLatch; -import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.test.junit5.MiniClusterExtension; @@ -126,7 +126,7 @@ void testRestoreSinkState() throws Exception { private JobClient executeJob(boolean restore) throws Exception { final Configuration conf = new Configuration(); if (restore) { - conf.set(SavepointConfigOptions.SAVEPOINT_PATH, findSavepointPath()); + conf.set(StateRecoveryOptions.SAVEPOINT_PATH, findSavepointPath()); } final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);