From 310bf0974a70cf5833c82e700d0a45e7e8919d6e Mon Sep 17 00:00:00 2001 From: Zakelly Date: Fri, 27 Sep 2024 14:42:18 +0800 Subject: [PATCH] [FLINK-36395][state/forst] Allow interrupt in forst library load --- .../flink/state/forst/ForStStateBackend.java | 68 ++++++++++++------- .../flink/state/forst/ForStInitITCase.java | 4 +- 2 files changed, 48 insertions(+), 24 deletions(-) diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java index 2dc2febfd40c8f..8d3046557a94f4 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java @@ -46,6 +46,7 @@ import org.apache.flink.util.DynamicCodeLoadingException; import org.apache.flink.util.FileUtils; import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.concurrent.FutureUtils; import org.rocksdb.NativeLibraryLoader; import org.rocksdb.RocksDB; @@ -65,6 +66,9 @@ import java.util.Optional; import java.util.Random; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import static org.apache.flink.configuration.description.TextElement.text; @@ -312,7 +316,7 @@ public ForStKeyedStateBackend createAsyncKeyedStateBackend( // first, make sure that the ForSt JNI library is loaded // we do this explicitly here to have better error handling String tempDir = env.getTaskManagerInfo().getTmpWorkingDirectory().getAbsolutePath(); - ensureForStIsLoaded(tempDir); + ensureForStIsLoaded(tempDir, env.getAsyncOperationsThreadPool()); // replace all characters that are not legal for filenames with underscore String fileCompatibleIdentifier = @@ -373,7 +377,7 @@ public AbstractKeyedStateBackend createKeyedStateBackend( // first, make sure that the RocksDB JNI library is loaded // we do this explicitly here to have better error handling String tempDir = env.getTaskManagerInfo().getTmpWorkingDirectory().getAbsolutePath(); - ensureForStIsLoaded(tempDir); + ensureForStIsLoaded(tempDir, env.getAsyncOperationsThreadPool()); // replace all characters that are not legal for filenames with underscore String fileCompatibleIdentifier = @@ -696,8 +700,8 @@ public String toString() { // ------------------------------------------------------------------------ @VisibleForTesting - static void ensureForStIsLoaded(String tempDirectory) throws IOException { - ensureForStIsLoaded(tempDirectory, NativeLibraryLoader::getInstance); + static void ensureForStIsLoaded(String tempDirectory, Executor executor) throws IOException { + ensureForStIsLoaded(tempDirectory, NativeLibraryLoader::getInstance, executor); } @VisibleForTesting @@ -707,7 +711,9 @@ static void setForStInitialized(boolean initialized) { @VisibleForTesting static void ensureForStIsLoaded( - String tempDirectory, Supplier nativeLibraryLoaderSupplier) + String tempDirectory, + Supplier nativeLibraryLoaderSupplier, + Executor executor) throws IOException { synchronized (ForStStateBackend.class) { if (!forStInitialized) { @@ -719,7 +725,7 @@ static void ensureForStIsLoaded( Throwable lastException = null; for (int attempt = 1; attempt <= FORST_LIB_LOADING_ATTEMPTS; attempt++) { - File rocksLibFolder = null; + AtomicReference rocksLibFolder = new AtomicReference<>(null); try { // when multiple instances of this class and ForSt exist in different // class loaders, then we can see the following exception: @@ -734,22 +740,38 @@ static void ensureForStIsLoaded( // loaders, but // apparently not when coming from the same file path, so there we go) - rocksLibFolder = new File(tempDirParent, "rocksdb-lib-" + new AbstractID()); - - // make sure the temp path exists - LOG.debug( - "Attempting to create ForSt native library folder {}", - rocksLibFolder); - // noinspection ResultOfMethodCallIgnored - rocksLibFolder.mkdirs(); - - // explicitly load the JNI dependency if it has not been loaded before - nativeLibraryLoaderSupplier - .get() - .loadLibrary(rocksLibFolder.getAbsolutePath()); - - // this initialization here should validate that the loading succeeded - RocksDB.loadLibrary(); + // We use an async procedure to load the library, to make current thread be + // able to interrupt for a fast quit. + CompletableFuture future = + FutureUtils.runAsync( + () -> { + File libFolder = + new File( + tempDirParent, + "rocksdb-lib-" + new AbstractID()); + rocksLibFolder.set(libFolder); + + // make sure the temp path exists + LOG.debug( + "Attempting to create ForSt native library folder {}", + libFolder); + // noinspection ResultOfMethodCallIgnored + libFolder.mkdirs(); + + // explicitly load the JNI dependency if it has not been + // loaded before + nativeLibraryLoaderSupplier + .get() + .loadLibrary(libFolder.getAbsolutePath()); + + // this initialization here should validate that the + // loading succeeded + RocksDB.loadLibrary(); + }, + executor); + + // wait for finish or be interrupted. + future.get(); // seems to have worked LOG.info("Successfully loaded ForSt native library"); @@ -768,7 +790,7 @@ static void ensureForStIsLoaded( tt); } - FileUtils.deleteDirectoryQuietly(rocksLibFolder); + FileUtils.deleteDirectoryQuietly(rocksLibFolder.get()); } } diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStInitITCase.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStInitITCase.java index 37868ffa2ac2d1..a23762d0169608 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStInitITCase.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStInitITCase.java @@ -19,6 +19,7 @@ package org.apache.flink.state.forst; import org.apache.flink.runtime.operators.testutils.ExpectedTestException; +import org.apache.flink.util.concurrent.Executors; import org.junit.Assert; import org.junit.Rule; @@ -52,7 +53,8 @@ public void testTempLibFolderDeletedOnFail() throws Exception { tempFolder.getAbsolutePath(), () -> { throw new ExpectedTestException(); - }); + }, + Executors.directExecutor()); fail("Not throwing expected exception."); } catch (IOException ignored) { // ignored