- dstl.dfs.base-path |
+ state.changelog.dstl.dfs.base-path |
(none) |
String |
Base path to store changelog files. |
- dstl.dfs.batch.persist-delay |
+ state.changelog.dstl.dfs.batch.persist-delay |
10 ms |
Duration |
Delay before persisting changelog after receiving persist request (on checkpoint). Minimizes the number of files and requests if multiple operators (backends) or sub-tasks are using the same store. Correspondingly increases checkpoint time (async phase). |
- dstl.dfs.batch.persist-size-threshold |
+ state.changelog.dstl.dfs.batch.persist-size-threshold |
10 mb |
MemorySize |
- Size threshold for state changes that were requested to be persisted but are waiting for dstl.dfs.batch.persist-delay (from all operators). . Once reached, accumulated changes are persisted immediately. This is different from dstl.dfs.preemptive-persist-threshold as it happens AFTER the checkpoint and potentially for state changes of multiple operators. Must not exceed in-flight data limit (see below) |
+ Size threshold for state changes that were requested to be persisted but are waiting for state.changelog.dstl.dfs.batch.persist-delay (from all operators). . Once reached, accumulated changes are persisted immediately. This is different from state.changelog.dstl.dfs.preemptive-persist-threshold as it happens AFTER the checkpoint and potentially for state changes of multiple operators. Must not exceed in-flight data limit (see below) |
- dstl.dfs.compression.enabled |
+ state.changelog.dstl.dfs.compression.enabled |
false |
Boolean |
Whether to enable compression when serializing changelog. |
- dstl.dfs.discard.num-threads |
+ state.changelog.dstl.dfs.discard.num-threads |
1 |
Integer |
Number of threads to use to discard changelog (e.g. pre-emptively uploaded unused state). |
- dstl.dfs.download.local-cache.idle-timeout-ms |
+ state.changelog.dstl.dfs.download.local-cache.idle-timeout-ms |
10 min |
Duration |
Maximum idle time for cache files of distributed changelog file, after which the cache files will be deleted. |
- dstl.dfs.preemptive-persist-threshold |
+ state.changelog.dstl.dfs.preemptive-persist-threshold |
5 mb |
MemorySize |
Size threshold for state changes of a single operator beyond which they are persisted pre-emptively without waiting for a checkpoint. Improves checkpointing time by allowing quasi-continuous uploading of state changes (as opposed to uploading all accumulated changes on checkpoint). |
- dstl.dfs.upload.buffer-size |
+ state.changelog.dstl.dfs.upload.buffer-size |
1 mb |
MemorySize |
Buffer size used when uploading change sets |
- dstl.dfs.upload.max-attempts |
+ state.changelog.dstl.dfs.upload.max-attempts |
3 |
Integer |
- Maximum number of attempts (including the initial one) to perform a particular upload. Only takes effect if dstl.dfs.upload.retry-policy is fixed. |
+ Maximum number of attempts (including the initial one) to perform a particular upload. Only takes effect if state.changelog.dstl.dfs.upload.retry-policy is fixed. |
- dstl.dfs.upload.max-in-flight |
+ state.changelog.dstl.dfs.upload.max-in-flight |
100 mb |
MemorySize |
- Max amount of data allowed to be in-flight. Upon reaching this limit the task will be back-pressured. I.e., snapshotting will block; normal processing will block if dstl.dfs.preemptive-persist-threshold is set and reached. The limit is applied to the total size of in-flight changes if multiple operators/backends are using the same changelog storage. Must be greater than or equal to dstl.dfs.batch.persist-size-threshold |
+ Max amount of data allowed to be in-flight. Upon reaching this limit the task will be back-pressured. I.e., snapshotting will block; normal processing will block if state.changelog.dstl.dfs.preemptive-persist-threshold is set and reached. The limit is applied to the total size of in-flight changes if multiple operators/backends are using the same changelog storage. Must be greater than or equal to state.changelog.dstl.dfs.batch.persist-size-threshold |
- dstl.dfs.upload.next-attempt-delay |
+ state.changelog.dstl.dfs.upload.next-attempt-delay |
500 ms |
Duration |
Delay before the next attempt (if the failure was not caused by a timeout). |
- dstl.dfs.upload.num-threads |
+ state.changelog.dstl.dfs.upload.num-threads |
5 |
Integer |
Number of threads to use for upload. |
- dstl.dfs.upload.retry-policy |
+ state.changelog.dstl.dfs.upload.retry-policy |
"fixed" |
String |
Retry policy for the failed uploads (in particular, timed out). Valid values: none, fixed. |
- dstl.dfs.upload.timeout |
+ state.changelog.dstl.dfs.upload.timeout |
1 s |
Duration |
- Time threshold beyond which an upload is considered timed out. If a new attempt is made but this upload succeeds earlier then this upload result will be used. May improve upload times if tail latencies of upload requests are significantly high. Only takes effect if dstl.dfs.upload.retry-policy is fixed. Please note that timeout * max_attempts should be less than execution.checkpointing.timeout |
+ Time threshold beyond which an upload is considered timed out. If a new attempt is made but this upload succeeds earlier then this upload result will be used. May improve upload times if tail latencies of upload requests are significantly high. Only takes effect if state.changelog.dstl.dfs.upload.retry-policy is fixed. Please note that timeout * max_attempts should be less than execution.checkpointing.timeout |
diff --git a/docs/layouts/shortcodes/generated/state_changelog_configuration.html b/docs/layouts/shortcodes/generated/state_changelog_configuration.html
index 22f83b434b82b..182494fb329fa 100644
--- a/docs/layouts/shortcodes/generated/state_changelog_configuration.html
+++ b/docs/layouts/shortcodes/generated/state_changelog_configuration.html
@@ -9,31 +9,31 @@
- state.backend.changelog.enabled |
+ state.changelog.enabled |
false |
Boolean |
Whether to enable state backend to write state changes to StateChangelog. If this config is not set explicitly, it means no preference for enabling the change log, and the value in lower config level will take effect. The default value 'false' here means if no value set (job or cluster), the change log will not be enabled. |
- state.backend.changelog.max-failures-allowed |
+ state.changelog.max-failures-allowed |
3 |
Integer |
Max number of consecutive materialization failures allowed. |
- state.backend.changelog.periodic-materialize.enabled |
+ state.changelog.periodic-materialize.enabled |
true |
Boolean |
Defines whether to enable periodic materialization, all changelogs will not be truncated which may increase the space of checkpoint if disabled |
- state.backend.changelog.periodic-materialize.interval |
+ state.changelog.periodic-materialize.interval |
10 min |
Duration |
- Defines the interval in milliseconds to perform periodic materialization for state backend. It only takes effect when state.backend.changelog.periodic-materialize.enabled is true |
+ Defines the interval in milliseconds to perform periodic materialization for state backend. It only takes effect when state.changelog.periodic-materialize.enabled is true |
- state.backend.changelog.storage |
+ state.changelog.storage |
"memory" |
String |
The storage to be used to store state changelog. The implementation can be specified via their shortcut name. The list of recognized shortcut names currently includes 'memory' and 'filesystem'. |
diff --git a/docs/layouts/shortcodes/generated/state_backend_changelog_section.html b/docs/layouts/shortcodes/generated/state_changelog_section.html
similarity index 81%
rename from docs/layouts/shortcodes/generated/state_backend_changelog_section.html
rename to docs/layouts/shortcodes/generated/state_changelog_section.html
index 22f83b434b82b..182494fb329fa 100644
--- a/docs/layouts/shortcodes/generated/state_backend_changelog_section.html
+++ b/docs/layouts/shortcodes/generated/state_changelog_section.html
@@ -9,31 +9,31 @@
- state.backend.changelog.enabled |
+ state.changelog.enabled |
false |
Boolean |
Whether to enable state backend to write state changes to StateChangelog. If this config is not set explicitly, it means no preference for enabling the change log, and the value in lower config level will take effect. The default value 'false' here means if no value set (job or cluster), the change log will not be enabled. |
- state.backend.changelog.max-failures-allowed |
+ state.changelog.max-failures-allowed |
3 |
Integer |
Max number of consecutive materialization failures allowed. |
- state.backend.changelog.periodic-materialize.enabled |
+ state.changelog.periodic-materialize.enabled |
true |
Boolean |
Defines whether to enable periodic materialization, all changelogs will not be truncated which may increase the space of checkpoint if disabled |
- state.backend.changelog.periodic-materialize.interval |
+ state.changelog.periodic-materialize.interval |
10 min |
Duration |
- Defines the interval in milliseconds to perform periodic materialization for state backend. It only takes effect when state.backend.changelog.periodic-materialize.enabled is true |
+ Defines the interval in milliseconds to perform periodic materialization for state backend. It only takes effect when state.changelog.periodic-materialize.enabled is true |
- state.backend.changelog.storage |
+ state.changelog.storage |
"memory" |
String |
The storage to be used to store state changelog. The implementation can be specified via their shortcut name. The list of recognized shortcut names currently includes 'memory' and 'filesystem'. |
diff --git a/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java b/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java
index 466f09ba4ce4e..0be6671e634aa 100644
--- a/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java
+++ b/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java
@@ -80,7 +80,7 @@ public static final class Sections {
public static final String STATE_BACKEND_LATENCY_TRACKING =
"state_backend_latency_tracking";
- public static final String STATE_BACKEND_CHANGELOG = "state_backend_changelog";
+ public static final String STATE_CHANGELOG = "state_changelog";
public static final String EXPERT_CLASS_LOADING = "expert_class_loading";
public static final String EXPERT_DEBUGGING_AND_TUNING = "expert_debugging_and_tuning";
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/StateChangelogOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/StateChangelogOptions.java
index 619137e339fe0..050dbbfe18dbc 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/StateChangelogOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/StateChangelogOptions.java
@@ -27,20 +27,22 @@
@PublicEvolving
public class StateChangelogOptions {
- @Documentation.Section(Documentation.Sections.STATE_BACKEND_CHANGELOG)
+ @Documentation.Section(Documentation.Sections.STATE_CHANGELOG)
public static final ConfigOption PERIODIC_MATERIALIZATION_ENABLED =
- ConfigOptions.key("state.backend.changelog.periodic-materialize.enabled")
+ ConfigOptions.key("state.changelog.periodic-materialize.enabled")
.booleanType()
.defaultValue(true)
+ .withDeprecatedKeys("state.backend.changelog.periodic-materialize.enabled")
.withDescription(
"Defines whether to enable periodic materialization, "
+ "all changelogs will not be truncated which may increase the space of checkpoint if disabled");
- @Documentation.Section(Documentation.Sections.STATE_BACKEND_CHANGELOG)
+ @Documentation.Section(Documentation.Sections.STATE_CHANGELOG)
public static final ConfigOption PERIODIC_MATERIALIZATION_INTERVAL =
- ConfigOptions.key("state.backend.changelog.periodic-materialize.interval")
+ ConfigOptions.key("state.changelog.periodic-materialize.interval")
.durationType()
.defaultValue(Duration.ofMinutes(10))
+ .withDeprecatedKeys("state.backend.changelog.periodic-materialize.interval")
.withDescription(
"Defines the interval in milliseconds to perform "
+ "periodic materialization for state backend. "
@@ -48,19 +50,21 @@ public class StateChangelogOptions {
+ PERIODIC_MATERIALIZATION_ENABLED.key()
+ " is true");
- @Documentation.Section(Documentation.Sections.STATE_BACKEND_CHANGELOG)
+ @Documentation.Section(Documentation.Sections.STATE_CHANGELOG)
public static final ConfigOption MATERIALIZATION_MAX_FAILURES_ALLOWED =
- ConfigOptions.key("state.backend.changelog.max-failures-allowed")
+ ConfigOptions.key("state.changelog.max-failures-allowed")
.intType()
.defaultValue(3)
+ .withDeprecatedKeys("state.backend.changelog.max-failures-allowed")
.withDescription("Max number of consecutive materialization failures allowed.");
/** Whether to enable state change log. */
- @Documentation.Section(value = Documentation.Sections.STATE_BACKEND_CHANGELOG)
+ @Documentation.Section(value = Documentation.Sections.STATE_CHANGELOG)
public static final ConfigOption ENABLE_STATE_CHANGE_LOG =
- ConfigOptions.key("state.backend.changelog.enabled")
+ ConfigOptions.key("state.changelog.enabled")
.booleanType()
.defaultValue(false)
+ .withDeprecatedKeys("state.backend.changelog.enabled")
.withDescription(
"Whether to enable state backend to write state changes to StateChangelog. "
+ "If this config is not set explicitly, it means no preference "
@@ -75,11 +79,12 @@ public class StateChangelogOptions {
* Recognized shortcut name is 'memory' from {@code
* InMemoryStateChangelogStorageFactory.getIdentifier()}, which is also the default value.
*/
- @Documentation.Section(value = Documentation.Sections.STATE_BACKEND_CHANGELOG)
+ @Documentation.Section(value = Documentation.Sections.STATE_CHANGELOG)
public static final ConfigOption STATE_CHANGE_LOG_STORAGE =
- ConfigOptions.key("state.backend.changelog.storage")
+ ConfigOptions.key("state.changelog.storage")
.stringType()
.defaultValue("memory")
+ .withDeprecatedKeys("state.backend.changelog.storage")
.withDescription(
Description.builder()
.text("The storage to be used to store state changelog.")
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogOptions.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogOptions.java
index 4d05962f2eefd..1bd1ef00405d6 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogOptions.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogOptions.java
@@ -31,21 +31,24 @@
public class FsStateChangelogOptions {
public static final ConfigOption BASE_PATH =
- ConfigOptions.key("dstl.dfs.base-path")
+ ConfigOptions.key("state.changelog.dstl.dfs.base-path")
.stringType()
.noDefaultValue()
+ .withDeprecatedKeys("dstl.dfs.base-path")
.withDescription("Base path to store changelog files.");
public static final ConfigOption COMPRESSION_ENABLED =
- ConfigOptions.key("dstl.dfs.compression.enabled")
+ ConfigOptions.key("state.changelog.dstl.dfs.compression.enabled")
.booleanType()
.defaultValue(false)
+ .withDeprecatedKeys("dstl.dfs.compression.enabled")
.withDescription("Whether to enable compression when serializing changelog.");
public static final ConfigOption PREEMPTIVE_PERSIST_THRESHOLD =
- ConfigOptions.key("dstl.dfs.preemptive-persist-threshold")
+ ConfigOptions.key("state.changelog.dstl.dfs.preemptive-persist-threshold")
.memoryType()
.defaultValue(MemorySize.parse("5MB"))
+ .withDeprecatedKeys("dstl.dfs.preemptive-persist-threshold")
.withDescription(
"Size threshold for state changes of a single operator "
+ "beyond which they are persisted pre-emptively without waiting for a checkpoint. "
@@ -53,9 +56,10 @@ public class FsStateChangelogOptions {
+ "(as opposed to uploading all accumulated changes on checkpoint).");
public static final ConfigOption PERSIST_DELAY =
- ConfigOptions.key("dstl.dfs.batch.persist-delay")
+ ConfigOptions.key("state.changelog.dstl.dfs.batch.persist-delay")
.durationType()
.defaultValue(Duration.ofMillis(10))
+ .withDeprecatedKeys("dstl.dfs.batch.persist-delay")
.withDescription(
"Delay before persisting changelog after receiving persist request (on checkpoint). "
+ "Minimizes the number of files and requests "
@@ -63,9 +67,10 @@ public class FsStateChangelogOptions {
+ "Correspondingly increases checkpoint time (async phase).");
public static final ConfigOption PERSIST_SIZE_THRESHOLD =
- ConfigOptions.key("dstl.dfs.batch.persist-size-threshold")
+ ConfigOptions.key("state.changelog.dstl.dfs.batch.persist-size-threshold")
.memoryType()
.defaultValue(MemorySize.parse("10MB"))
+ .withDeprecatedKeys("dstl.dfs.batch.persist-size-threshold")
.withDescription(
"Size threshold for state changes that were requested to be persisted but are waiting for "
+ PERSIST_DELAY.key()
@@ -77,28 +82,32 @@ public class FsStateChangelogOptions {
+ "Must not exceed in-flight data limit (see below)");
public static final ConfigOption UPLOAD_BUFFER_SIZE =
- ConfigOptions.key("dstl.dfs.upload.buffer-size")
+ ConfigOptions.key("state.changelog.dstl.dfs.upload.buffer-size")
.memoryType()
.defaultValue(MemorySize.parse("1MB"))
+ .withDeprecatedKeys("dstl.dfs.upload.buffer-size")
.withDescription("Buffer size used when uploading change sets");
public static final ConfigOption NUM_UPLOAD_THREADS =
- ConfigOptions.key("dstl.dfs.upload.num-threads")
+ ConfigOptions.key("state.changelog.dstl.dfs.upload.num-threads")
.intType()
.defaultValue(5)
+ .withDeprecatedKeys("dstl.dfs.upload.num-threads")
.withDescription("Number of threads to use for upload.");
public static final ConfigOption NUM_DISCARD_THREADS =
- ConfigOptions.key("dstl.dfs.discard.num-threads")
+ ConfigOptions.key("state.changelog.dstl.dfs.discard.num-threads")
.intType()
.defaultValue(1)
+ .withDeprecatedKeys("dstl.dfs.discard.num-threads")
.withDescription(
"Number of threads to use to discard changelog (e.g. pre-emptively uploaded unused state).");
public static final ConfigOption IN_FLIGHT_DATA_LIMIT =
- ConfigOptions.key("dstl.dfs.upload.max-in-flight")
+ ConfigOptions.key("state.changelog.dstl.dfs.upload.max-in-flight")
.memoryType()
.defaultValue(MemorySize.parse("100MB"))
+ .withDeprecatedKeys("dstl.dfs.upload.max-in-flight")
.withDescription(
"Max amount of data allowed to be in-flight. "
+ "Upon reaching this limit the task will be back-pressured. "
@@ -111,15 +120,17 @@ public class FsStateChangelogOptions {
+ PERSIST_SIZE_THRESHOLD.key());
public static final ConfigOption RETRY_POLICY =
- ConfigOptions.key("dstl.dfs.upload.retry-policy")
+ ConfigOptions.key("state.changelog.dstl.dfs.upload.retry-policy")
.stringType()
.defaultValue("fixed")
+ .withDeprecatedKeys("dstl.dfs.upload.retry-policy")
.withDescription(
"Retry policy for the failed uploads (in particular, timed out). Valid values: none, fixed.");
public static final ConfigOption UPLOAD_TIMEOUT =
- ConfigOptions.key("dstl.dfs.upload.timeout")
+ ConfigOptions.key("state.changelog.dstl.dfs.upload.timeout")
.durationType()
.defaultValue(Duration.ofSeconds(1))
+ .withDeprecatedKeys("dstl.dfs.upload.timeout")
.withDescription(
"Time threshold beyond which an upload is considered timed out. "
+ "If a new attempt is made but this upload succeeds earlier then this upload result will be used. "
@@ -130,25 +141,28 @@ public class FsStateChangelogOptions {
+ "Please note that timeout * max_attempts should be less than "
+ CHECKPOINTING_TIMEOUT.key());
public static final ConfigOption RETRY_MAX_ATTEMPTS =
- ConfigOptions.key("dstl.dfs.upload.max-attempts")
+ ConfigOptions.key("state.changelog.dstl.dfs.upload.max-attempts")
.intType()
.defaultValue(3)
+ .withDeprecatedKeys("dstl.dfs.upload.max-attempts")
.withDescription(
"Maximum number of attempts (including the initial one) to perform a particular upload. "
+ "Only takes effect if "
+ RETRY_POLICY.key()
+ " is fixed.");
public static final ConfigOption RETRY_DELAY_AFTER_FAILURE =
- ConfigOptions.key("dstl.dfs.upload.next-attempt-delay")
+ ConfigOptions.key("state.changelog.dstl.dfs.upload.next-attempt-delay")
.durationType()
.defaultValue(Duration.ofMillis(500))
+ .withDeprecatedKeys("dstl.dfs.upload.next-attempt-delay")
.withDescription(
"Delay before the next attempt (if the failure was not caused by a timeout).");
public static final ConfigOption CACHE_IDLE_TIMEOUT =
- ConfigOptions.key("dstl.dfs.download.local-cache.idle-timeout-ms")
+ ConfigOptions.key("state.changelog.dstl.dfs.download.local-cache.idle-timeout-ms")
.durationType()
.defaultValue(Duration.ofMinutes(10))
+ .withDeprecatedKeys("dstl.dfs.download.local-cache.idle-timeout-ms")
.withDescription(
"Maximum idle time for cache files of distributed changelog file, "
+ "after which the cache files will be deleted.");