Skip to content

Commit

Permalink
[FLINK-34454][CLI/REST] Rename restoreMode to claimMode
Browse files Browse the repository at this point in the history
  • Loading branch information
Zakelly committed Feb 23, 2024
1 parent c2e621e commit 79ba1bd
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 13 deletions.
2 changes: 1 addition & 1 deletion docs/content.zh/docs/ops/state/savepoints.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ $ bin/flink run -s :savepointPath [:runArgs]

你可以通过如下方式指定 restore 模式:
```shell
$ bin/flink run -s :savepointPath -restoreMode :mode -n [:runArgs]
$ bin/flink run -s :savepointPath -claimMode :mode -n [:runArgs]
```

**NO_CLAIM (默认的)**
Expand Down
2 changes: 1 addition & 1 deletion docs/content/docs/ops/state/savepoints.md
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ Still, we believe the default *NO_CLAIM* mode is a good tradeoff in most situati

You can pass the restore mode as:
```shell
$ bin/flink run -s :savepointPath -restoreMode :mode -n [:runArgs]
$ bin/flink run -s :savepointPath -claimMode :mode -n [:runArgs]
```

**NO_CLAIM (default)**
Expand Down
4 changes: 4 additions & 0 deletions docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,10 @@
"allowNonRestoredState" : {
"type" : "boolean"
},
"claimMode" : {
"type" : "string",
"enum" : [ "CLAIM", "NO_CLAIM", "LEGACY" ]
},
"entryClass" : {
"type" : "string"
},
Expand Down
2 changes: 2 additions & 0 deletions docs/static/generated/rest_v1_dispatcher.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2255,6 +2255,8 @@ components:
properties:
allowNonRestoredState:
type: boolean
claimMode:
$ref: '#/components/schemas/RestoreMode'
entryClass:
type: string
flinkConfiguration:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,10 @@ public class CliFrontendParser {
+ "You need to allow this if you removed an operator from your "
+ "program that was part of the program when the savepoint was triggered.");

public static final Option SAVEPOINT_RESTORE_MODE =
public static final Option SAVEPOINT_CLAIM_MODE =
new Option(
"rm",
"restoreMode",
"cm",
"claimMode",
true,
"Defines how should we restore from the given savepoint. Supported options: "
+ "[claim - claim ownership of the savepoint and delete once it is"
Expand All @@ -144,6 +144,16 @@ public class CliFrontendParser {
+ "(deprecated) - the old behaviour, do not assume ownership of the "
+ "savepoint files, but can reuse some shared files.");

@Deprecated
public static final Option SAVEPOINT_RESTORE_MODE =
new Option(
"rm",
"restoreMode",
true,
"This option is deprecated, please use '"
+ SAVEPOINT_CLAIM_MODE.getLongOpt()
+ "' instead.");

static final Option SAVEPOINT_DISPOSE_OPTION =
new Option("d", "dispose", true, "Path of savepoint to dispose.");

Expand Down Expand Up @@ -335,6 +345,7 @@ public class CliFrontendParser {
SAVEPOINT_PATH_OPTION.setArgName("savepointPath");

SAVEPOINT_ALLOW_NON_RESTORED_OPTION.setRequired(false);
SAVEPOINT_CLAIM_MODE.setRequired(false);
SAVEPOINT_RESTORE_MODE.setRequired(false);

SAVEPOINT_FORMAT_OPTION.setRequired(false);
Expand Down Expand Up @@ -422,6 +433,7 @@ public static Options getRunCommandOptions() {
return getProgramSpecificOptions(buildGeneralOptions(new Options()))
.addOption(SAVEPOINT_PATH_OPTION)
.addOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION)
.addOption(SAVEPOINT_CLAIM_MODE)
.addOption(SAVEPOINT_RESTORE_MODE);
}

Expand Down Expand Up @@ -471,6 +483,7 @@ private static Options getRunOptionsWithoutDeprecatedOptions(Options options) {
return getProgramSpecificOptionsWithoutDeprecatedOptions(options)
.addOption(SAVEPOINT_PATH_OPTION)
.addOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION)
.addOption(SAVEPOINT_CLAIM_MODE)
.addOption(SAVEPOINT_RESTORE_MODE);
}

Expand Down Expand Up @@ -684,11 +697,19 @@ public static SavepointRestoreSettings createSavepointRestoreSettings(CommandLin
boolean allowNonRestoredState =
commandLine.hasOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION.getOpt());
final RestoreMode restoreMode;
if (commandLine.hasOption(SAVEPOINT_RESTORE_MODE)) {
if (commandLine.hasOption(SAVEPOINT_CLAIM_MODE)) {
restoreMode =
ConfigurationUtils.convertValue(
commandLine.getOptionValue(SAVEPOINT_CLAIM_MODE),
RestoreMode.class);
} else if (commandLine.hasOption(SAVEPOINT_RESTORE_MODE)) {
restoreMode =
ConfigurationUtils.convertValue(
commandLine.getOptionValue(SAVEPOINT_RESTORE_MODE),
RestoreMode.class);
System.out.printf(
"The option '%s' is deprecated. Please use '%s' instead.%n",
SAVEPOINT_RESTORE_MODE.getLongOpt(), SAVEPOINT_CLAIM_MODE.getLongOpt());
} else {
restoreMode = StateRecoveryOptions.RESTORE_MODE.defaultValue();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,17 +147,17 @@ void testNoClaimRestoreModeParsing() throws Exception {

@Test
void testClaimRestoreModeParsingLongOption() throws Exception {
testRestoreMode("--restoreMode", "claim", RestoreMode.CLAIM);
testRestoreMode("--claimMode", "claim", RestoreMode.CLAIM);
}

@Test
void testLegacyRestoreModeParsingLongOption() throws Exception {
testRestoreMode("--restoreMode", "legacy", RestoreMode.LEGACY);
testRestoreMode("--claimMode", "legacy", RestoreMode.LEGACY);
}

@Test
void testNoClaimRestoreModeParsingLongOption() throws Exception {
testRestoreMode("--restoreMode", "no_claim", RestoreMode.NO_CLAIM);
testRestoreMode("--claimMode", "no_claim", RestoreMode.NO_CLAIM);
}

private void testRestoreMode(String flag, String arg, RestoreMode expectedMode)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ private SavepointRestoreSettings getSavepointRestoreSettings(
() ->
effectiveConfiguration.get(
StateRecoveryOptions.RESTORE_MODE));
if (requestBody.isDeprecatedRestoreModeHasValue()) {
log.warn("The option 'restoreMode' is deprecated, please use 'claimMode' instead.");
}
if (restoreMode.equals(RestoreMode.LEGACY)) {
log.warn(
"The {} restore mode is deprecated, please use {} or {} mode instead.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.annotation.docs.Documentation;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.execution.RestoreMode;
import org.apache.flink.runtime.rest.messages.RequestBody;
Expand All @@ -36,8 +38,12 @@
@JsonInclude(JsonInclude.Include.NON_NULL)
public class JarRunRequestBody extends JarRequestBody {
private static final String FIELD_NAME_ALLOW_NON_RESTORED_STATE = "allowNonRestoredState";

private static final String FIELD_NAME_SAVEPOINT_PATH = "savepointPath";
private static final String FIELD_NAME_SAVEPOINT_RESTORE_MODE = "restoreMode";

@Deprecated private static final String FIELD_NAME_SAVEPOINT_RESTORE_MODE = "restoreMode";

private static final String FIELD_NAME_SAVEPOINT_CLAIM_MODE = "claimMode";

@JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE)
@Nullable
Expand All @@ -49,10 +55,41 @@ public class JarRunRequestBody extends JarRequestBody {

@JsonProperty(FIELD_NAME_SAVEPOINT_RESTORE_MODE)
@Nullable
@Deprecated
@Documentation.ExcludeFromDocumentation
private RestoreMode deprecatedRestoreMode;

@JsonProperty(FIELD_NAME_SAVEPOINT_CLAIM_MODE)
@Nullable
private RestoreMode restoreMode;

public JarRunRequestBody() {
this(null, null, null, null, null, null, null, null, null);
this(null, null, null, null, null, null, null, null, null, null);
}

/** Fallback constructor ONLY for tests. */
@VisibleForTesting
public JarRunRequestBody(
@Nullable String entryClassName,
@Nullable String programArguments,
@Nullable List<String> programArgumentsList,
@Nullable Integer parallelism,
@Nullable JobID jobId,
@Nullable Boolean allowNonRestoredState,
@Nullable String savepointPath,
@Nullable RestoreMode restoreMode,
@Nullable Map<String, String> flinkConfiguration) {
this(
entryClassName,
programArguments,
programArgumentsList,
parallelism,
jobId,
allowNonRestoredState,
savepointPath,
null,
restoreMode,
flinkConfiguration);
}

@JsonCreator
Expand All @@ -66,7 +103,9 @@ public JarRunRequestBody(
@Nullable @JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE)
Boolean allowNonRestoredState,
@Nullable @JsonProperty(FIELD_NAME_SAVEPOINT_PATH) String savepointPath,
@Nullable @JsonProperty(FIELD_NAME_SAVEPOINT_RESTORE_MODE) RestoreMode restoreMode,
@Nullable @JsonProperty(FIELD_NAME_SAVEPOINT_RESTORE_MODE)
RestoreMode deprecatedRestoreMode,
@Nullable @JsonProperty(FIELD_NAME_SAVEPOINT_CLAIM_MODE) RestoreMode restoreMode,
@Nullable @JsonProperty(FIELD_NAME_FLINK_CONFIGURATION)
Map<String, String> flinkConfiguration) {
super(
Expand All @@ -78,6 +117,7 @@ public JarRunRequestBody(
flinkConfiguration);
this.allowNonRestoredState = allowNonRestoredState;
this.savepointPath = savepointPath;
this.deprecatedRestoreMode = deprecatedRestoreMode;
this.restoreMode = restoreMode;
}

Expand All @@ -96,6 +136,11 @@ public String getSavepointPath() {
@Nullable
@JsonIgnore
public RestoreMode getRestoreMode() {
return restoreMode;
return restoreMode == null ? deprecatedRestoreMode : restoreMode;
}

@JsonIgnore
public boolean isDeprecatedRestoreModeHasValue() {
return deprecatedRestoreMode != null;
}
}

0 comments on commit 79ba1bd

Please sign in to comment.