Skip to content

Commit

Permalink
[FLINK-34624][state/changelog] Enable local recovery in ChangelogResc…
Browse files Browse the repository at this point in the history
…alingITCase (apache#24470)
  • Loading branch information
fredia authored and hanyuzheng7 committed May 6, 2024
1 parent ad108a9 commit 8605749
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.Collection;
import java.util.Collections;

import static org.apache.flink.configuration.CheckpointingOptions.LOCAL_RECOVERY;
import static org.apache.flink.runtime.testutils.PseudoRandomValueSelector.randomize;

/** A {@link StreamExecutionEnvironment} that executes its jobs on {@link MiniCluster}. */
Expand Down Expand Up @@ -125,41 +124,35 @@ private static void randomizeConfiguration(MiniCluster miniCluster, Configuratio
}

// randomize ITTests for enabling state change log
if (isConfigurationSupportedByChangelog(miniCluster.getConfiguration())) {
if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_ON)) {
if (!conf.contains(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG)) {
conf.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true);
miniCluster.overrideRestoreModeForChangelogStateBackend();
}
} else if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_RAND)) {
boolean enabled =
randomize(conf, StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true, false);
if (enabled) {
// More situations about enabling periodic materialization should be tested
randomize(
conf,
StateChangelogOptions.PERIODIC_MATERIALIZATION_ENABLED,
true,
true,
true,
false);
randomize(
conf,
StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL,
Duration.ofMillis(100),
Duration.ofMillis(500),
Duration.ofSeconds(1),
Duration.ofSeconds(5));
miniCluster.overrideRestoreModeForChangelogStateBackend();
}
if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_ON)) {
if (!conf.contains(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG)) {
conf.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true);
miniCluster.overrideRestoreModeForChangelogStateBackend();
}
} else if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_RAND)) {
boolean enabled =
randomize(conf, StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true, false);
if (enabled) {
// More situations about enabling periodic materialization should be tested
randomize(
conf,
StateChangelogOptions.PERIODIC_MATERIALIZATION_ENABLED,
true,
true,
true,
false);
randomize(
conf,
StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL,
Duration.ofMillis(100),
Duration.ofMillis(500),
Duration.ofSeconds(1),
Duration.ofSeconds(5));
miniCluster.overrideRestoreModeForChangelogStateBackend();
}
}
}

private static boolean isConfigurationSupportedByChangelog(Configuration configuration) {
return !configuration.get(LOCAL_RECOVERY);
}

/**
* Sets the streaming context environment to a TestStreamEnvironment that runs its programs on
* the given cluster with the given default parallelism.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions;
import org.apache.flink.core.testutils.OneShotLatch;
Expand Down Expand Up @@ -163,6 +164,8 @@ public void setup() throws Exception {
config.set(StateBackendOptions.STATE_BACKEND, currentBackend);
config.set(RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE, useIngestDB);
config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
// todo: local rescaling is not supported by changelog.
config.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, false);
config.set(CheckpointingOptions.LOCAL_RECOVERY, true);
config.set(
CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ private Configuration configureJob(int parallelism, File cpDir) {
conf.set(CHECKPOINT_STORAGE, "filesystem");
conf.set(CHECKPOINTS_DIRECTORY, cpDir.toURI().toString());
conf.set(STATE_BACKEND, "hashmap");
conf.set(LOCAL_RECOVERY, false); // not supported by changelog
conf.set(LOCAL_RECOVERY, true);
// tune changelog
conf.set(PREEMPTIVE_PERSIST_THRESHOLD, MemorySize.ofMebiBytes(10));
conf.set(PERIODIC_MATERIALIZATION_INTERVAL, Duration.ofMinutes(3));
Expand Down

0 comments on commit 8605749

Please sign in to comment.