Skip to content

Commit

Permalink
[FLINK-36374][state/forst] Bundle forst statebackend in flink-dist an…
Browse files Browse the repository at this point in the history
…d provide shortcut to enable
  • Loading branch information
Zakelly committed Sep 26, 2024
1 parent 82582b3 commit 85e4ce2
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 30 deletions.
7 changes: 7 additions & 0 deletions flink-dist/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,13 @@ under the License.
<optional>${flink.markBundledAsOptional}</optional>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-forst</artifactId>
<version>${project.version}</version>
<optional>${flink.markBundledAsOptional}</optional>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-changelog</artifactId>
Expand Down
1 change: 1 addition & 0 deletions flink-dist/src/main/resources/META-INF/NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ This project bundles the following dependencies under the Apache Software Licens
- com.google.code.findbugs:jsr305:1.3.9
- com.twitter:chill-java:0.7.6
- com.ververica:frocksdbjni:8.10.0-ververica-beta-1.0
- com.ververica:forstjni:0.1.0-beta
- commons-cli:commons-cli:1.5.0
- commons-collections:commons-collections:3.2.2
- commons-io:commons-io:2.15.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ public class StateBackendLoader {
private static final String ROCKSDB_STATE_BACKEND_FACTORY =
"org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackendFactory";

/** Used for loading ForStStateBackend. */
private static final String FORST_STATE_BACKEND_FACTORY =
"org.apache.flink.state.forst.ForStStateBackendFactory";

// ------------------------------------------------------------------------
// Configuration shortcut names
// ------------------------------------------------------------------------
Expand All @@ -77,6 +81,8 @@ public class StateBackendLoader {
/** The shortcut configuration name for the RocksDB State Backend. */
public static final String ROCKSDB_STATE_BACKEND_NAME = "rocksdb";

public static final String FORST_STATE_BACKEND_NAME = "forst";

// ------------------------------------------------------------------------
// Loading the state backend from a configuration
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -144,38 +150,45 @@ public static StateBackend loadStateBackendFromConfig(
case ROCKSDB_STATE_BACKEND_NAME:
factoryClassName = ROCKSDB_STATE_BACKEND_FACTORY;

// fall through to the 'default' case that uses reflection to load the backend
// fall through to the case that uses reflection to load the backend
// that way we can keep RocksDB in a separate module
break;

default:
if (logger != null) {
logger.info("Loading state backend via factory {}", factoryClassName);
}
case FORST_STATE_BACKEND_NAME:
factoryClassName = FORST_STATE_BACKEND_FACTORY;

StateBackendFactory<?> factory;
try {
@SuppressWarnings("rawtypes")
Class<? extends StateBackendFactory> clazz =
Class.forName(factoryClassName, false, classLoader)
.asSubclass(StateBackendFactory.class);

factory = clazz.newInstance();
} catch (ClassNotFoundException e) {
throw new DynamicCodeLoadingException(
"Cannot find configured state backend factory class: " + backendName,
e);
} catch (ClassCastException | InstantiationException | IllegalAccessException e) {
throw new DynamicCodeLoadingException(
"The class configured under '"
+ StateBackendOptions.STATE_BACKEND.key()
+ "' is not a valid state backend factory ("
+ backendName
+ ')',
e);
}
// fall through to the case that uses reflection to load the backend
// that way we can keep ForSt in a separate module
break;
}

return factory.createFromConfig(config, classLoader);
// The reflection loading path
if (logger != null) {
logger.info("Loading state backend via factory {}", factoryClassName);
}

StateBackendFactory<?> factory;
try {
@SuppressWarnings("rawtypes")
Class<? extends StateBackendFactory> clazz =
Class.forName(factoryClassName, false, classLoader)
.asSubclass(StateBackendFactory.class);

factory = clazz.newInstance();
} catch (ClassNotFoundException e) {
throw new DynamicCodeLoadingException(
"Cannot find configured state backend factory class: " + backendName, e);
} catch (ClassCastException | InstantiationException | IllegalAccessException e) {
throw new DynamicCodeLoadingException(
"The class configured under '"
+ StateBackendOptions.STATE_BACKEND.key()
+ "' is not a valid state backend factory ("
+ backendName
+ ')',
e);
}

return factory.createFromConfig(config, classLoader);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,20 +68,27 @@ public void testLoadForStStateBackend() throws Exception {
final String localDirs = localDir1 + File.pathSeparator + localDir2;
final boolean incremental = !CheckpointingOptions.INCREMENTAL_CHECKPOINTS.defaultValue();

// TODO: Support short name of backendKey

final Configuration config1 = new Configuration();
config1.setString(backendKey, ForStStateBackendFactory.class.getName());
config1.setString(backendKey, "forst");
config1.set(ForStOptions.LOCAL_DIRECTORIES, localDirs);
config1.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, incremental);

final Configuration config2 = new Configuration();
config2.setString(backendKey, ForStStateBackendFactory.class.getName());
config2.set(ForStOptions.LOCAL_DIRECTORIES, localDirs);
config2.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, incremental);

StateBackend backend1 = StateBackendLoader.loadStateBackendFromConfig(config1, cl, null);
StateBackend backend2 = StateBackendLoader.loadStateBackendFromConfig(config2, cl, null);

assertTrue(backend1 instanceof ForStStateBackend);
assertTrue(backend2 instanceof ForStStateBackend);

ForStStateBackend fs1 = (ForStStateBackend) backend1;
ForStStateBackend fs2 = (ForStStateBackend) backend1;

checkPaths(fs1.getLocalDbStoragePaths(), localDir1, localDir2);
checkPaths(fs2.getLocalDbStoragePaths(), localDir1, localDir2);
}

/**
Expand Down

0 comments on commit 85e4ce2

Please sign in to comment.