From 860574966d9f9814dda4fcd03d9d38dc6b752500 Mon Sep 17 00:00:00 2001 From: Yanfei Lei Date: Tue, 12 Mar 2024 11:28:35 +0800 Subject: [PATCH] [FLINK-34624][state/changelog] Enable local recovery in ChangelogRescalingITCase (#24470) --- .../streaming/util/TestStreamEnvironment.java | 57 ++++++++----------- .../checkpointing/AutoRescalingITCase.java | 3 + .../test/state/ChangelogRescalingITCase.java | 2 +- 3 files changed, 29 insertions(+), 33 deletions(-) diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java index e8f9f2578f6c97..58ddd3796524e9 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java @@ -35,7 +35,6 @@ import java.util.Collection; import java.util.Collections; -import static org.apache.flink.configuration.CheckpointingOptions.LOCAL_RECOVERY; import static org.apache.flink.runtime.testutils.PseudoRandomValueSelector.randomize; /** A {@link StreamExecutionEnvironment} that executes its jobs on {@link MiniCluster}. */ @@ -125,41 +124,35 @@ private static void randomizeConfiguration(MiniCluster miniCluster, Configuratio } // randomize ITTests for enabling state change log - if (isConfigurationSupportedByChangelog(miniCluster.getConfiguration())) { - if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_ON)) { - if (!conf.contains(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG)) { - conf.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true); - miniCluster.overrideRestoreModeForChangelogStateBackend(); - } - } else if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_RAND)) { - boolean enabled = - randomize(conf, StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true, false); - if (enabled) { - // More situations about enabling periodic materialization should be tested - randomize( - conf, - StateChangelogOptions.PERIODIC_MATERIALIZATION_ENABLED, - true, - true, - true, - false); - randomize( - conf, - StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL, - Duration.ofMillis(100), - Duration.ofMillis(500), - Duration.ofSeconds(1), - Duration.ofSeconds(5)); - miniCluster.overrideRestoreModeForChangelogStateBackend(); - } + if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_ON)) { + if (!conf.contains(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG)) { + conf.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true); + miniCluster.overrideRestoreModeForChangelogStateBackend(); + } + } else if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_RAND)) { + boolean enabled = + randomize(conf, StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true, false); + if (enabled) { + // More situations about enabling periodic materialization should be tested + randomize( + conf, + StateChangelogOptions.PERIODIC_MATERIALIZATION_ENABLED, + true, + true, + true, + false); + randomize( + conf, + StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL, + Duration.ofMillis(100), + Duration.ofMillis(500), + Duration.ofSeconds(1), + Duration.ofSeconds(5)); + miniCluster.overrideRestoreModeForChangelogStateBackend(); } } } - private static boolean isConfigurationSupportedByChangelog(Configuration configuration) { - return !configuration.get(LOCAL_RECOVERY); - } - /** * Sets the streaming context environment to a TestStreamEnvironment that runs its programs on * the given cluster with the given default parallelism. diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java index 404e129eb4e97b..d5136f2678f3df 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java @@ -37,6 +37,7 @@ import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; import org.apache.flink.configuration.StateBackendOptions; +import org.apache.flink.configuration.StateChangelogOptions; import org.apache.flink.configuration.WebOptions; import org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions; import org.apache.flink.core.testutils.OneShotLatch; @@ -163,6 +164,8 @@ public void setup() throws Exception { config.set(StateBackendOptions.STATE_BACKEND, currentBackend); config.set(RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE, useIngestDB); config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true); + // todo: local rescaling is not supported by changelog. + config.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, false); config.set(CheckpointingOptions.LOCAL_RECOVERY, true); config.set( CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString()); diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java index c259f2bf715e02..bacdad4a1fd3fe 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java @@ -220,7 +220,7 @@ private Configuration configureJob(int parallelism, File cpDir) { conf.set(CHECKPOINT_STORAGE, "filesystem"); conf.set(CHECKPOINTS_DIRECTORY, cpDir.toURI().toString()); conf.set(STATE_BACKEND, "hashmap"); - conf.set(LOCAL_RECOVERY, false); // not supported by changelog + conf.set(LOCAL_RECOVERY, true); // tune changelog conf.set(PREEMPTIVE_PERSIST_THRESHOLD, MemorySize.ofMebiBytes(10)); conf.set(PERIODIC_MATERIALIZATION_INTERVAL, Duration.ofMinutes(3));