Skip to content

Commit

Permalink
[FLINK-34454] Introduce StateRecoveryOptions in flink-core and Rename…
Browse files Browse the repository at this point in the history
… options
  • Loading branch information
Zakelly committed Feb 22, 2024
1 parent 80090c7 commit 643754c
Show file tree
Hide file tree
Showing 21 changed files with 179 additions and 65 deletions.
5 changes: 4 additions & 1 deletion docs/content.zh/docs/deployment/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,6 @@ Flink can fetch user artifacts stored locally, on remote DFS, or accessible via
# Execution

{{< generated/deployment_configuration >}}
{{< generated/savepoint_config_configuration >}}
{{< generated/execution_configuration >}}

### Pipeline
Expand All @@ -428,6 +427,10 @@ Flink can fetch user artifacts stored locally, on remote DFS, or accessible via

{{< generated/execution_checkpointing_configuration >}}

### Recovery

{{< generated/state_recovery_configuration >}}

----
----

Expand Down
5 changes: 4 additions & 1 deletion docs/content/docs/deployment/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,6 @@ Flink can fetch user artifacts stored locally, on remote DFS, or accessible via
# Execution

{{< generated/deployment_configuration >}}
{{< generated/savepoint_config_configuration >}}
{{< generated/execution_configuration >}}

### Pipeline
Expand All @@ -430,6 +429,10 @@ Flink can fetch user artifacts stored locally, on remote DFS, or accessible via

{{< generated/execution_checkpointing_configuration >}}

### Recovery

{{< generated/state_recovery_configuration >}}

----
----

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,6 @@
<td><p>Enum</p></td>
<td>The checkpointing mode (exactly-once vs. at-least-once).<br /><br />Possible values:<ul><li>"EXACTLY_ONCE"</li><li>"AT_LEAST_ONCE"</li></ul></td>
</tr>
<tr>
<td><h5>execution.checkpointing.recover-without-channel-state.checkpoint-id</h5></td>
<td style="word-wrap: break-word;">-1</td>
<td>Long</td>
<td>Checkpoint id for which in-flight data should be ignored in case of the recovery from this checkpoint.<br /><br />It is better to keep this value empty until there is explicit needs to restore from the specific checkpoint without in-flight data.<br /></td>
</tr>
<tr>
<td><h5>execution.checkpointing.timeout</h5></td>
<td style="word-wrap: break-word;">10 min</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,28 @@
</thead>
<tbody>
<tr>
<td><h5>execution.savepoint-restore-mode</h5></td>
<td><h5>execution.state-recovery.claim-mode</h5></td>
<td style="word-wrap: break-word;">NO_CLAIM</td>
<td><p>Enum</p></td>
<td>Describes the mode how Flink should restore from the given savepoint or retained checkpoint.<br /><br />Possible values:<ul><li>"CLAIM": Flink will take ownership of the given snapshot. It will clean the snapshot once it is subsumed by newer ones.</li><li>"NO_CLAIM": Flink will not claim ownership of the snapshot files. However it will make sure it does not depend on any artefacts from the restored snapshot. In order to do that, Flink will take the first checkpoint as a full one, which means it might reupload/duplicate files that are part of the restored checkpoint.</li></ul></td>
</tr>
<tr>
<td><h5>execution.savepoint.ignore-unclaimed-state</h5></td>
<td><h5>execution.state-recovery.ignore-unclaimed-state</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Allow to skip savepoint state that cannot be restored. Allow this if you removed an operator from your pipeline after the savepoint was triggered.</td>
</tr>
<tr>
<td><h5>execution.savepoint.path</h5></td>
<td><h5>execution.state-recovery.path</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Path to a savepoint to restore the job from (for example hdfs:///flink/savepoint-1537).</td>
</tr>
<tr>
<td><h5>execution.state-recovery.without-channel-state.checkpoint-id</h5></td>
<td style="word-wrap: break-word;">-1</td>
<td>Long</td>
<td>Checkpoint id for which in-flight data should be ignored in case of the recovery from this checkpoint.<br /><br />It is better to keep this value empty until there is explicit needs to restore from the specific checkpoint without in-flight data.<br /></td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.StateRecoveryOptions;
import org.apache.flink.core.execution.RestoreMode;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

import org.apache.commons.cli.CommandLine;
Expand Down Expand Up @@ -690,7 +690,7 @@ public static SavepointRestoreSettings createSavepointRestoreSettings(CommandLin
commandLine.getOptionValue(SAVEPOINT_RESTORE_MODE),
RestoreMode.class);
} else {
restoreMode = SavepointConfigOptions.RESTORE_MODE.defaultValue();
restoreMode = StateRecoveryOptions.RESTORE_MODE.defaultValue();
}
return SavepointRestoreSettings.forPath(
savepointPath, allowNonRestoredState, restoreMode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.StateRecoveryOptions;
import org.apache.flink.core.execution.PipelineExecutorFactory;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
Expand Down Expand Up @@ -56,7 +56,7 @@ void testDisallowProgramConfigurationChanges(
final Configuration clusterConfig = new Configuration();
clusterConfig.set(DeploymentOptions.PROGRAM_CONFIG_ENABLED, false);
clusterConfig.set(DeploymentOptions.TARGET, "local");
clusterConfig.set(SavepointConfigOptions.SAVEPOINT_PATH, "/flink/savepoints");
clusterConfig.set(StateRecoveryOptions.SAVEPOINT_PATH, "/flink/savepoints");
clusterConfig.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.STREAMING);

final Configuration programConfig = new Configuration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.client.testjar;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.configuration.StateRecoveryOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;

Expand All @@ -33,7 +33,7 @@ public class ForbidConfigurationJob {

public static void main(String[] args) throws Exception {
final Configuration config = new Configuration();
config.set(SavepointConfigOptions.SAVEPOINT_PATH, SAVEPOINT_PATH);
config.set(StateRecoveryOptions.SAVEPOINT_PATH, SAVEPOINT_PATH);
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(config);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.StateRecoveryOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
Expand Down Expand Up @@ -113,7 +113,7 @@ private StreamExecutionEnvironment createEnv(
conf.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
conf.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse("4kb"));
if (restoreCheckpoint != null) {
conf.set(SavepointConfigOptions.SAVEPOINT_PATH, restoreCheckpoint.toURI().toString());
conf.set(StateRecoveryOptions.SAVEPOINT_PATH, restoreCheckpoint.toURI().toString());
}
conf.set(TaskManagerOptions.NUM_TASK_SLOTS, p);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.configuration;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.docs.Documentation;
import org.apache.flink.configuration.description.Description;
import org.apache.flink.core.execution.RestoreMode;

import static org.apache.flink.configuration.ConfigOptions.key;

/**
* The {@link ConfigOption configuration options} used when restoring state from a savepoint or a
* checkpoint.
*/
@PublicEvolving
public class StateRecoveryOptions {

/** The path to a savepoint that will be used to bootstrap the pipeline's state. */
public static final ConfigOption<String> SAVEPOINT_PATH =
key("execution.state-recovery.path")
.stringType()
.noDefaultValue()
.withDeprecatedKeys("execution.savepoint.path")
.withDescription(
"Path to a savepoint to restore the job from (for example hdfs:///flink/savepoint-1537).");

/**
* A flag indicating if we allow Flink to skip savepoint state that cannot be restored, e.g.
* because the corresponding operator has been removed.
*/
public static final ConfigOption<Boolean> SAVEPOINT_IGNORE_UNCLAIMED_STATE =
key("execution.state-recovery.ignore-unclaimed-state")
.booleanType()
.defaultValue(false)
.withDeprecatedKeys("execution.savepoint.ignore-unclaimed-state")
.withDescription(
"Allow to skip savepoint state that cannot be restored. "
+ "Allow this if you removed an operator from your pipeline after the savepoint was triggered.");
/**
* Describes the mode how Flink should restore from the given savepoint or retained checkpoint.
*/
public static final ConfigOption<RestoreMode> RESTORE_MODE =
key("execution.state-recovery.claim-mode")
.enumType(RestoreMode.class)
.defaultValue(RestoreMode.DEFAULT)
.withDeprecatedKeys("execution.savepoint-restore-mode")
.withDescription(
"Describes the mode how Flink should restore from the given"
+ " savepoint or retained checkpoint.");

/**
* Access to this option is officially only supported via {@link
* org.apache.flink.runtime.jobgraph.CheckpointConfig#enableApproximateLocalRecovery(boolean)},
* but there is no good reason behind this.
*/
@Internal @Documentation.ExcludeFromDocumentation
public static final ConfigOption<Boolean> APPROXIMATE_LOCAL_RECOVERY =
key("execution.state-recovery.approximate-local-recovery")
.booleanType()
.defaultValue(false)
.withDeprecatedKeys("execution.checkpointing.approximate-local-recovery")
.withDescription("Flag to enable approximate local recovery.");

public static final ConfigOption<Long> CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA =
ConfigOptions.key("execution.state-recovery.without-channel-state.checkpoint-id")
.longType()
.defaultValue(-1L)
.withDeprecatedKeys(
"execution.checkpointing.recover-without-channel-state.checkpoint-id")
.withDescription(
Description.builder()
.text(
"Checkpoint id for which in-flight data should be ignored in case of the recovery from this checkpoint.")
.linebreak()
.linebreak()
.text(
"It is better to keep this value empty until "
+ "there is explicit needs to restore from "
+ "the specific checkpoint without in-flight data.")
.linebreak()
.build());
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.StateRecoveryOptions;
import org.apache.flink.core.execution.RestoreMode;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
Expand Down Expand Up @@ -138,7 +138,7 @@ private SavepointRestoreSettings getSavepointRestoreSettings(
requestBody.getAllowNonRestoredState(),
() -> getQueryParameter(request, AllowNonRestoredStateQueryParameter.class),
effectiveConfiguration.get(
SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE),
StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE),
log);
final String savepointPath =
fromRequestBodyOrQueryParameter(
Expand All @@ -147,14 +147,14 @@ private SavepointRestoreSettings getSavepointRestoreSettings(
emptyToNull(
getQueryParameter(
request, SavepointPathQueryParameter.class)),
effectiveConfiguration.get(SavepointConfigOptions.SAVEPOINT_PATH),
effectiveConfiguration.get(StateRecoveryOptions.SAVEPOINT_PATH),
log);
final RestoreMode restoreMode =
Optional.ofNullable(requestBody.getRestoreMode())
.orElseGet(
() ->
effectiveConfiguration.get(
SavepointConfigOptions.RESTORE_MODE));
StateRecoveryOptions.RESTORE_MODE));
if (restoreMode.equals(RestoreMode.LEGACY)) {
log.warn(
"The {} restore mode is deprecated, please use {} or {} mode instead.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.StateRecoveryOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.execution.RestoreMode;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
Expand Down Expand Up @@ -87,9 +87,9 @@ class JarRunHandlerParameterTest
new Configuration()
.set(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 120000L)
.set(CoreOptions.DEFAULT_PARALLELISM, 57)
.set(SavepointConfigOptions.SAVEPOINT_PATH, "/foo/bar/test")
.set(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, false)
.set(SavepointConfigOptions.RESTORE_MODE, RESTORE_MODE)
.set(StateRecoveryOptions.SAVEPOINT_PATH, "/foo/bar/test")
.set(StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, false)
.set(StateRecoveryOptions.RESTORE_MODE, RESTORE_MODE)
.set(
PipelineOptions.PARALLELISM_OVERRIDES,
new HashMap<String, String>() {
Expand Down Expand Up @@ -336,13 +336,13 @@ void validateGraphWithFlinkConfig(JobGraph jobGraph) {
final SavepointRestoreSettings savepointRestoreSettings =
jobGraph.getSavepointRestoreSettings();
assertThat(savepointRestoreSettings.getRestoreMode())
.isEqualTo(FLINK_CONFIGURATION.get(SavepointConfigOptions.RESTORE_MODE));
.isEqualTo(FLINK_CONFIGURATION.get(StateRecoveryOptions.RESTORE_MODE));
assertThat(savepointRestoreSettings.getRestorePath())
.isEqualTo(FLINK_CONFIGURATION.get(SavepointConfigOptions.SAVEPOINT_PATH));
.isEqualTo(FLINK_CONFIGURATION.get(StateRecoveryOptions.SAVEPOINT_PATH));
assertThat(savepointRestoreSettings.allowNonRestoredState())
.isEqualTo(
FLINK_CONFIGURATION.get(
SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE));
StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE));
}

private void validateSavepointJarRunMessageParameters(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,19 @@
package org.apache.flink.runtime.jobgraph;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.docs.Documentation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.core.execution.RestoreMode;

import static org.apache.flink.configuration.ConfigOptions.key;

/** The {@link ConfigOption configuration options} used when restoring from a savepoint. */
/**
* The {@link ConfigOption configuration options} used when restoring from a savepoint. @Deprecated
* All options are moved into {@link org.apache.flink.configuration.StateRecoveryOptions}.
*/
@PublicEvolving
@Deprecated
@Documentation.ExcludeFromDocumentation("Hidden for deprecated.")
public class SavepointConfigOptions {

/** The path to a savepoint that will be used to bootstrap the pipeline's state. */
Expand Down
Loading

0 comments on commit 643754c

Please sign in to comment.