diff --git a/docs/content.zh/docs/deployment/config.md b/docs/content.zh/docs/deployment/config.md
index 857d8ec497eb3..e95c02c264990 100644
--- a/docs/content.zh/docs/deployment/config.md
+++ b/docs/content.zh/docs/deployment/config.md
@@ -453,9 +453,9 @@ Please refer to the [Debugging Classloading Docs]({{< ref "docs/ops/debugging/de
{{< generated/expert_state_backends_section >}}
-### State Backends Latency Tracking Options
+### State Latency Tracking Options
-{{< generated/state_backend_latency_tracking_section >}}
+{{< generated/state_latency_tracking_section >}}
### Advanced RocksDB State Backends Options
diff --git a/docs/content.zh/docs/ops/metrics.md b/docs/content.zh/docs/ops/metrics.md
index 424a7d4776f4b..652c8ab5dc2da 100644
--- a/docs/content.zh/docs/ops/metrics.md
+++ b/docs/content.zh/docs/ops/metrics.md
@@ -2241,12 +2241,12 @@ purposes.
## State access latency tracking
Flink also allows to track the keyed state access latency for standard Flink state-backends or customized state backends which extending from `AbstractStateBackend`. This feature is disabled by default.
-To enable this feature you must set the `state.backend.latency-track.keyed-state-enabled` to true in the [Flink configuration]({{< ref "docs/deployment/config" >}}#state-backends-latency-tracking-options).
+To enable this feature you must set the `state.latency-track.keyed-state-enabled` to true in the [Flink configuration]({{< ref "docs/deployment/config" >}}#state-backends-latency-tracking-options).
-Once tracking keyed state access latency is enabled, Flink will sample the state access latency every `N` access, in which `N` is defined by `state.backend.latency-track.sample-interval`.
+Once tracking keyed state access latency is enabled, Flink will sample the state access latency every `N` access, in which `N` is defined by `state.latency-track.sample-interval`.
This configuration has a default value of 100. A smaller value will get more accurate results but have a higher performance impact since it is sampled more frequently.
-As the type of this latency metrics is histogram, `state.backend.latency-track.history-size` will control the maximum number of recorded values in history, which has the default value of 128.
+As the type of this latency metrics is histogram, `state.latency-track.history-size` will control the maximum number of recorded values in history, which has the default value of 128.
A larger value of this configuration will require more memory, but will provide a more accurate result.
Warning Enabling state-access-latency metrics may impact the performance.
diff --git a/docs/content/docs/deployment/config.md b/docs/content/docs/deployment/config.md
index bf1e9c67d518f..f582958a873f4 100644
--- a/docs/content/docs/deployment/config.md
+++ b/docs/content/docs/deployment/config.md
@@ -455,9 +455,9 @@ Please refer to the [Debugging Classloading Docs]({{< ref "docs/ops/debugging/de
{{< generated/expert_state_backends_section >}}
-### State Backends Latency Tracking Options
+### State Latency Tracking Options
-{{< generated/state_backend_latency_tracking_section >}}
+{{< generated/state_latency_tracking_section >}}
### Advanced RocksDB State Backends Options
diff --git a/docs/content/docs/ops/metrics.md b/docs/content/docs/ops/metrics.md
index 2756b92171e48..cf331e3de44e5 100644
--- a/docs/content/docs/ops/metrics.md
+++ b/docs/content/docs/ops/metrics.md
@@ -2191,12 +2191,12 @@ purposes.
## State access latency tracking
Flink also allows to track the keyed state access latency for standard Flink state-backends or customized state backends which extending from `AbstractStateBackend`. This feature is disabled by default.
-To enable this feature you must set the `state.backend.latency-track.keyed-state-enabled` to true in the [Flink configuration]({{< ref "docs/deployment/config" >}}#state-backends-latency-tracking-options).
+To enable this feature you must set the `state.latency-track.keyed-state-enabled` to true in the [Flink configuration]({{< ref "docs/deployment/config" >}}#state-backends-latency-tracking-options).
-Once tracking keyed state access latency is enabled, Flink will sample the state access latency every `N` access, in which `N` is defined by `state.backend.latency-track.sample-interval`.
+Once tracking keyed state access latency is enabled, Flink will sample the state access latency every `N` access, in which `N` is defined by `state.latency-track.sample-interval`.
This configuration has a default value of 100. A smaller value will get more accurate results but have a higher performance impact since it is sampled more frequently.
-As the type of this latency metrics is histogram, `state.backend.latency-track.history-size` will control the maximum number of recorded values in history, which has the default value of 128.
+As the type of this latency metrics is histogram, `state.latency-track.history-size` will control the maximum number of recorded values in history, which has the default value of 128.
A larger value of this configuration will require more memory, but will provide a more accurate result.
Warning Enabling state-access-latency metrics may impact the performance.
diff --git a/docs/layouts/shortcodes/generated/state_backend_configuration.html b/docs/layouts/shortcodes/generated/state_backend_configuration.html
index 10e9e45a7625b..dd6106e728718 100644
--- a/docs/layouts/shortcodes/generated/state_backend_configuration.html
+++ b/docs/layouts/shortcodes/generated/state_backend_configuration.html
@@ -8,30 +8,6 @@
-
- state.backend.latency-track.history-size |
- 128 |
- Integer |
- Defines the number of measured latencies to maintain at each state access operation. |
-
-
- state.backend.latency-track.keyed-state-enabled |
- false |
- Boolean |
- Whether to track latency of keyed state operations, e.g value state put/get/clear. |
-
-
- state.backend.latency-track.sample-interval |
- 100 |
- Integer |
- The sample interval of latency track once 'state.backend.latency-track.keyed-state-enabled' is enabled. The default value is 100, which means we would track the latency every 100 access requests. |
-
-
- state.backend.latency-track.state-name-as-variable |
- true |
- Boolean |
- Whether to expose state name as a variable if tracking latency. |
-
state.backend.type |
"hashmap" |
diff --git a/docs/layouts/shortcodes/generated/state_backend_latency_tracking_section.html b/docs/layouts/shortcodes/generated/state_latency_track_configuration.html
similarity index 72%
rename from docs/layouts/shortcodes/generated/state_backend_latency_tracking_section.html
rename to docs/layouts/shortcodes/generated/state_latency_track_configuration.html
index 3d9f3c8eee655..b3ecf1d81fca6 100644
--- a/docs/layouts/shortcodes/generated/state_backend_latency_tracking_section.html
+++ b/docs/layouts/shortcodes/generated/state_latency_track_configuration.html
@@ -9,25 +9,25 @@
- state.backend.latency-track.history-size |
+ state.latency-track.history-size |
128 |
Integer |
Defines the number of measured latencies to maintain at each state access operation. |
- state.backend.latency-track.keyed-state-enabled |
+ state.latency-track.keyed-state-enabled |
false |
Boolean |
Whether to track latency of keyed state operations, e.g value state put/get/clear. |
- state.backend.latency-track.sample-interval |
+ state.latency-track.sample-interval |
100 |
Integer |
- The sample interval of latency track once 'state.backend.latency-track.keyed-state-enabled' is enabled. The default value is 100, which means we would track the latency every 100 access requests. |
+ The sample interval of latency track once 'state.latency-track.keyed-state-enabled' is enabled. The default value is 100, which means we would track the latency every 100 access requests. |
- state.backend.latency-track.state-name-as-variable |
+ state.latency-track.state-name-as-variable |
true |
Boolean |
Whether to expose state name as a variable if tracking latency. |
diff --git a/docs/layouts/shortcodes/generated/state_latency_tracking_section.html b/docs/layouts/shortcodes/generated/state_latency_tracking_section.html
new file mode 100644
index 0000000000000..b3ecf1d81fca6
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/state_latency_tracking_section.html
@@ -0,0 +1,36 @@
+
+
+
+ Key |
+ Default |
+ Type |
+ Description |
+
+
+
+
+ state.latency-track.history-size |
+ 128 |
+ Integer |
+ Defines the number of measured latencies to maintain at each state access operation. |
+
+
+ state.latency-track.keyed-state-enabled |
+ false |
+ Boolean |
+ Whether to track latency of keyed state operations, e.g value state put/get/clear. |
+
+
+ state.latency-track.sample-interval |
+ 100 |
+ Integer |
+ The sample interval of latency track once 'state.latency-track.keyed-state-enabled' is enabled. The default value is 100, which means we would track the latency every 100 access requests. |
+
+
+ state.latency-track.state-name-as-variable |
+ true |
+ Boolean |
+ Whether to expose state name as a variable if tracking latency. |
+
+
+
diff --git a/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java b/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java
index 466f09ba4ce4e..c95f372c2c030 100644
--- a/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java
+++ b/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java
@@ -77,8 +77,7 @@ public static final class Sections {
public static final String STATE_BACKEND_ROCKSDB = "state_backend_rocksdb";
- public static final String STATE_BACKEND_LATENCY_TRACKING =
- "state_backend_latency_tracking";
+ public static final String STATE_LATENCY_TRACKING = "state_latency_tracking";
public static final String STATE_BACKEND_CHANGELOG = "state_backend_changelog";
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/StateBackendOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/StateBackendOptions.java
index 2778c0f59bd6b..deec87c5cd48a 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/StateBackendOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/StateBackendOptions.java
@@ -18,11 +18,13 @@
package org.apache.flink.configuration;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.docs.Documentation;
import org.apache.flink.configuration.description.Description;
import org.apache.flink.configuration.description.TextElement;
/** A collection of all configuration options that relate to state backend. */
+@PublicEvolving
public class StateBackendOptions {
// ------------------------------------------------------------------------
@@ -63,7 +65,9 @@ public class StateBackendOptions {
.text("Recognized shortcut names are 'hashmap' and 'rocksdb'.")
.build());
- @Documentation.Section(Documentation.Sections.STATE_BACKEND_LATENCY_TRACKING)
+ /** @deprecated Use {@link StateLatencyTrackOptions#LATENCY_TRACK_ENABLED} instead. */
+ @Deprecated
+ @Documentation.ExcludeFromDocumentation("Hidden for deprecated")
public static final ConfigOption LATENCY_TRACK_ENABLED =
ConfigOptions.key("state.backend.latency-track.keyed-state-enabled")
.booleanType()
@@ -71,7 +75,9 @@ public class StateBackendOptions {
.withDescription(
"Whether to track latency of keyed state operations, e.g value state put/get/clear.");
- @Documentation.Section(Documentation.Sections.STATE_BACKEND_LATENCY_TRACKING)
+ /** @deprecated Use {@link StateLatencyTrackOptions#LATENCY_TRACK_SAMPLE_INTERVAL} instead. */
+ @Deprecated
+ @Documentation.ExcludeFromDocumentation("Hidden for deprecated")
public static final ConfigOption LATENCY_TRACK_SAMPLE_INTERVAL =
ConfigOptions.key("state.backend.latency-track.sample-interval")
.intType()
@@ -82,7 +88,9 @@ public class StateBackendOptions {
+ "The default value is 100, which means we would track the latency every 100 access requests.",
LATENCY_TRACK_ENABLED.key()));
- @Documentation.Section(Documentation.Sections.STATE_BACKEND_LATENCY_TRACKING)
+ /** @deprecated Use {@link StateLatencyTrackOptions#LATENCY_TRACK_HISTORY_SIZE} instead. */
+ @Deprecated
+ @Documentation.ExcludeFromDocumentation("Hidden for deprecated")
public static final ConfigOption LATENCY_TRACK_HISTORY_SIZE =
ConfigOptions.key("state.backend.latency-track.history-size")
.intType()
@@ -90,7 +98,12 @@ public class StateBackendOptions {
.withDescription(
"Defines the number of measured latencies to maintain at each state access operation.");
- @Documentation.Section(Documentation.Sections.STATE_BACKEND_LATENCY_TRACKING)
+ /**
+ * @deprecated Use {@link StateLatencyTrackOptions#LATENCY_TRACK_STATE_NAME_AS_VARIABLE}
+ * instead.
+ */
+ @Deprecated
+ @Documentation.ExcludeFromDocumentation("Hidden for deprecated")
public static final ConfigOption LATENCY_TRACK_STATE_NAME_AS_VARIABLE =
ConfigOptions.key("state.backend.latency-track.state-name-as-variable")
.booleanType()
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/StateLatencyTrackOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/StateLatencyTrackOptions.java
new file mode 100644
index 0000000000000..4177ab40df8d1
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/StateLatencyTrackOptions.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.Documentation;
+
+/**
+ * A collection of all configuration options that relate to the latency tracking for state access.
+ */
+@PublicEvolving
+public class StateLatencyTrackOptions {
+
+ @Documentation.Section(Documentation.Sections.STATE_LATENCY_TRACKING)
+ public static final ConfigOption LATENCY_TRACK_ENABLED =
+ ConfigOptions.key("state.latency-track.keyed-state-enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDeprecatedKeys(StateBackendOptions.LATENCY_TRACK_ENABLED.key())
+ .withDescription(
+ "Whether to track latency of keyed state operations, e.g value state put/get/clear.");
+
+ @Documentation.Section(Documentation.Sections.STATE_LATENCY_TRACKING)
+ public static final ConfigOption LATENCY_TRACK_SAMPLE_INTERVAL =
+ ConfigOptions.key("state.latency-track.sample-interval")
+ .intType()
+ .defaultValue(100)
+ .withDeprecatedKeys(StateBackendOptions.LATENCY_TRACK_SAMPLE_INTERVAL.key())
+ .withDescription(
+ String.format(
+ "The sample interval of latency track once '%s' is enabled. "
+ + "The default value is 100, which means we would track the latency every 100 access requests.",
+ LATENCY_TRACK_ENABLED.key()));
+
+ @Documentation.Section(Documentation.Sections.STATE_LATENCY_TRACKING)
+ public static final ConfigOption LATENCY_TRACK_HISTORY_SIZE =
+ ConfigOptions.key("state.latency-track.history-size")
+ .intType()
+ .defaultValue(128)
+ .withDeprecatedKeys(StateBackendOptions.LATENCY_TRACK_HISTORY_SIZE.key())
+ .withDescription(
+ "Defines the number of measured latencies to maintain at each state access operation.");
+
+ @Documentation.Section(Documentation.Sections.STATE_LATENCY_TRACKING)
+ public static final ConfigOption LATENCY_TRACK_STATE_NAME_AS_VARIABLE =
+ ConfigOptions.key("state.latency-track.state-name-as-variable")
+ .booleanType()
+ .defaultValue(true)
+ .withDeprecatedKeys(
+ StateBackendOptions.LATENCY_TRACK_STATE_NAME_AS_VARIABLE.key())
+ .withDescription(
+ "Whether to expose state name as a variable if tracking latency.");
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metrics/LatencyTrackingStateConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metrics/LatencyTrackingStateConfig.java
index d4425e14c3058..9c918040d37dc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metrics/LatencyTrackingStateConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metrics/LatencyTrackingStateConfig.java
@@ -20,7 +20,7 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.configuration.StateLatencyTrackOptions;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.util.Preconditions;
@@ -86,12 +86,13 @@ public static Builder newBuilder() {
public static class Builder implements Serializable {
private static final long serialVersionUID = 1L;
- private boolean enabled = StateBackendOptions.LATENCY_TRACK_ENABLED.defaultValue();
+ private boolean enabled = StateLatencyTrackOptions.LATENCY_TRACK_ENABLED.defaultValue();
private int sampleInterval =
- StateBackendOptions.LATENCY_TRACK_SAMPLE_INTERVAL.defaultValue();
- private int historySize = StateBackendOptions.LATENCY_TRACK_HISTORY_SIZE.defaultValue();
+ StateLatencyTrackOptions.LATENCY_TRACK_SAMPLE_INTERVAL.defaultValue();
+ private int historySize =
+ StateLatencyTrackOptions.LATENCY_TRACK_HISTORY_SIZE.defaultValue();
private boolean stateNameAsVariable =
- StateBackendOptions.LATENCY_TRACK_STATE_NAME_AS_VARIABLE.defaultValue();
+ StateLatencyTrackOptions.LATENCY_TRACK_STATE_NAME_AS_VARIABLE.defaultValue();
private MetricGroup metricGroup;
public Builder setEnabled(boolean enabled) {
@@ -120,12 +121,13 @@ public Builder setMetricGroup(MetricGroup metricGroup) {
}
public Builder configure(ReadableConfig config) {
- this.setEnabled(config.get(StateBackendOptions.LATENCY_TRACK_ENABLED))
+ this.setEnabled(config.get(StateLatencyTrackOptions.LATENCY_TRACK_ENABLED))
.setSampleInterval(
- config.get(StateBackendOptions.LATENCY_TRACK_SAMPLE_INTERVAL))
- .setHistorySize(config.get(StateBackendOptions.LATENCY_TRACK_HISTORY_SIZE))
+ config.get(StateLatencyTrackOptions.LATENCY_TRACK_SAMPLE_INTERVAL))
+ .setHistorySize(config.get(StateLatencyTrackOptions.LATENCY_TRACK_HISTORY_SIZE))
.setStateNameAsVariable(
- config.get(StateBackendOptions.LATENCY_TRACK_STATE_NAME_AS_VARIABLE));
+ config.get(
+ StateLatencyTrackOptions.LATENCY_TRACK_STATE_NAME_AS_VARIABLE));
return this;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 1cec488d3891e..86389a00f2c37 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -48,7 +48,7 @@
import org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.configuration.StateLatencyTrackOptions;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.core.testutils.CheckedThread;
@@ -277,7 +277,7 @@ protected MetricGroup getMetricGroup() {
void testEnableStateLatencyTracking() throws Exception {
ConfigurableStateBackend stateBackend = getStateBackend();
Configuration config = new Configuration();
- config.set(StateBackendOptions.LATENCY_TRACK_ENABLED, true);
+ config.set(StateLatencyTrackOptions.LATENCY_TRACK_ENABLED, true);
StateBackend configuredBackend =
stateBackend.configure(config, Thread.currentThread().getContextClassLoader());
KeyGroupRange groupRange = new KeyGroupRange(0, 1);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/LatencyTrackingStateConfigTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/LatencyTrackingStateConfigTest.java
index 4e5ac17ccd300..5c3d88f27bd78 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/LatencyTrackingStateConfigTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/LatencyTrackingStateConfigTest.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.state.metrics;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.configuration.StateLatencyTrackOptions;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.junit.jupiter.api.Test;
@@ -46,11 +46,17 @@ void testDefaultEnabledLatencyTrackingStateConfig() {
.build();
assertThat(latencyTrackingStateConfig.isEnabled()).isTrue();
assertThat(latencyTrackingStateConfig.getSampleInterval())
- .isEqualTo((int) StateBackendOptions.LATENCY_TRACK_SAMPLE_INTERVAL.defaultValue());
+ .isEqualTo(
+ (int)
+ StateLatencyTrackOptions.LATENCY_TRACK_SAMPLE_INTERVAL
+ .defaultValue());
assertThat(latencyTrackingStateConfig.getHistorySize())
- .isEqualTo((long) StateBackendOptions.LATENCY_TRACK_HISTORY_SIZE.defaultValue());
+ .isEqualTo(
+ (long) StateLatencyTrackOptions.LATENCY_TRACK_HISTORY_SIZE.defaultValue());
assertThat(latencyTrackingStateConfig.isStateNameAsVariable())
- .isEqualTo(StateBackendOptions.LATENCY_TRACK_STATE_NAME_AS_VARIABLE.defaultValue());
+ .isEqualTo(
+ StateLatencyTrackOptions.LATENCY_TRACK_STATE_NAME_AS_VARIABLE
+ .defaultValue());
}
@Test
@@ -72,9 +78,9 @@ void testSetLatencyTrackingStateConfig() {
void testConfigureFromReadableConfig() {
LatencyTrackingStateConfig.Builder builder = LatencyTrackingStateConfig.newBuilder();
Configuration configuration = new Configuration();
- configuration.set(StateBackendOptions.LATENCY_TRACK_ENABLED, true);
- configuration.set(StateBackendOptions.LATENCY_TRACK_SAMPLE_INTERVAL, 10);
- configuration.set(StateBackendOptions.LATENCY_TRACK_HISTORY_SIZE, 500);
+ configuration.set(StateLatencyTrackOptions.LATENCY_TRACK_ENABLED, true);
+ configuration.set(StateLatencyTrackOptions.LATENCY_TRACK_SAMPLE_INTERVAL, 10);
+ configuration.set(StateLatencyTrackOptions.LATENCY_TRACK_HISTORY_SIZE, 500);
LatencyTrackingStateConfig latencyTrackingStateConfig =
builder.configure(configuration)
.setMetricGroup(new UnregisteredMetricsGroup())
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/LatencyTrackingStateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/LatencyTrackingStateTestBase.java
index 8054fa1fbf043..c03968033232d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/LatencyTrackingStateTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/metrics/LatencyTrackingStateTestBase.java
@@ -25,7 +25,7 @@
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.configuration.StateLatencyTrackOptions;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.execution.Environment;
@@ -58,11 +58,11 @@ protected AbstractKeyedStateBackend createKeyedBackend(TypeSerializer keyS
KeyGroupRange keyGroupRange = new KeyGroupRange(0, 127);
int numberOfKeyGroups = keyGroupRange.getNumberOfKeyGroups();
Configuration configuration = new Configuration();
- configuration.set(StateBackendOptions.LATENCY_TRACK_ENABLED, true);
- configuration.set(StateBackendOptions.LATENCY_TRACK_SAMPLE_INTERVAL, SAMPLE_INTERVAL);
+ configuration.set(StateLatencyTrackOptions.LATENCY_TRACK_ENABLED, true);
+ configuration.set(StateLatencyTrackOptions.LATENCY_TRACK_SAMPLE_INTERVAL, SAMPLE_INTERVAL);
// use a very large value to not let metrics data overridden.
int historySize = 1000_000;
- configuration.set(StateBackendOptions.LATENCY_TRACK_HISTORY_SIZE, historySize);
+ configuration.set(StateLatencyTrackOptions.LATENCY_TRACK_HISTORY_SIZE, historySize);
HashMapStateBackend stateBackend =
new HashMapStateBackend()
.configure(configuration, Thread.currentThread().getContextClassLoader());
diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java
index 909c3c6170480..e2e11497565f7 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java
@@ -22,7 +22,7 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.configuration.StateLatencyTrackOptions;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackendTest;
import org.apache.flink.runtime.execution.Environment;
@@ -105,7 +105,7 @@ public void testMaterializedRestoreWithWrappedState() throws Exception {
CheckpointStreamFactory streamFactory = createStreamFactory();
Configuration configuration = new Configuration();
- configuration.set(StateBackendOptions.LATENCY_TRACK_ENABLED, true);
+ configuration.set(StateLatencyTrackOptions.LATENCY_TRACK_ENABLED, true);
StateBackend stateBackend =
getStateBackend()
.configure(configuration, Thread.currentThread().getContextClassLoader());
diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateFileStateBackendTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateFileStateBackendTest.java
index e306f8451f8ed..84f5dbe77e475 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateFileStateBackendTest.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateFileStateBackendTest.java
@@ -22,7 +22,7 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.configuration.StateLatencyTrackOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
@@ -101,7 +101,7 @@ public void testMaterializedRestoreWithWrappedState() throws Exception {
CheckpointStreamFactory streamFactory = createStreamFactory();
Configuration configuration = new Configuration();
- configuration.set(StateBackendOptions.LATENCY_TRACK_ENABLED, true);
+ configuration.set(StateLatencyTrackOptions.LATENCY_TRACK_ENABLED, true);
StateBackend stateBackend =
getStateBackend()
.configure(configuration, Thread.currentThread().getContextClassLoader());
diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateHashMapTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateHashMapTest.java
index 7de866250d6a7..bf01fe521904a 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateHashMapTest.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateHashMapTest.java
@@ -22,7 +22,7 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.configuration.StateLatencyTrackOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
@@ -92,7 +92,7 @@ public void testMaterializedRestoreWithWrappedState() throws Exception {
CheckpointStreamFactory streamFactory = createStreamFactory();
Configuration configuration = new Configuration();
- configuration.set(StateBackendOptions.LATENCY_TRACK_ENABLED, true);
+ configuration.set(StateLatencyTrackOptions.LATENCY_TRACK_ENABLED, true);
StateBackend stateBackend =
getStateBackend()
.configure(configuration, Thread.currentThread().getContextClassLoader());
diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateMemoryStateBackendTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateMemoryStateBackendTest.java
index ebcd5a4804b5c..91b7c3baab87e 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateMemoryStateBackendTest.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateMemoryStateBackendTest.java
@@ -22,7 +22,7 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.configuration.StateLatencyTrackOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
@@ -101,7 +101,7 @@ public void testMaterializedRestoreWithWrappedState() throws Exception {
CheckpointStreamFactory streamFactory = createStreamFactory();
Configuration configuration = new Configuration();
- configuration.set(StateBackendOptions.LATENCY_TRACK_ENABLED, true);
+ configuration.set(StateLatencyTrackOptions.LATENCY_TRACK_ENABLED, true);
StateBackend stateBackend =
getStateBackend()
.configure(configuration, Thread.currentThread().getContextClassLoader());