diff --git a/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java b/flink-core-api/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
similarity index 95%
rename from flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
rename to flink-core-api/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
index d5967fc35edfc..6808861880c20 100644
--- a/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
+++ b/flink-core-api/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
@@ -18,8 +18,6 @@
package org.apache.flink.util.function;
-import org.apache.flink.util.ExceptionUtils;
-
import java.util.function.BiConsumer;
/**
@@ -56,7 +54,7 @@ static BiConsumer unchecked(
try {
biConsumerWithException.accept(a, b);
} catch (Throwable t) {
- ExceptionUtils.rethrow(t);
+ ThrowingExceptionUtils.rethrow(t);
}
};
}
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/BiFunctionWithException.java b/flink-core-api/src/main/java/org/apache/flink/util/function/BiFunctionWithException.java
similarity index 96%
rename from flink-core/src/main/java/org/apache/flink/util/function/BiFunctionWithException.java
rename to flink-core-api/src/main/java/org/apache/flink/util/function/BiFunctionWithException.java
index 914a6d7cf7ab6..2467542cb1fd3 100644
--- a/flink-core/src/main/java/org/apache/flink/util/function/BiFunctionWithException.java
+++ b/flink-core-api/src/main/java/org/apache/flink/util/function/BiFunctionWithException.java
@@ -18,8 +18,6 @@
package org.apache.flink.util.function;
-import org.apache.flink.util.ExceptionUtils;
-
import java.util.function.BiFunction;
/**
@@ -58,7 +56,7 @@ static BiFunction unchecked(
try {
return biFunctionWithException.apply(a, b);
} catch (Throwable t) {
- ExceptionUtils.rethrow(t);
+ ThrowingExceptionUtils.rethrow(t);
// we need this to appease the compiler :-(
return null;
}
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/CheckedSupplier.java b/flink-core-api/src/main/java/org/apache/flink/util/function/CheckedSupplier.java
similarity index 94%
rename from flink-core/src/main/java/org/apache/flink/util/function/CheckedSupplier.java
rename to flink-core-api/src/main/java/org/apache/flink/util/function/CheckedSupplier.java
index 38f9008926b99..f1ea67572ac7b 100644
--- a/flink-core/src/main/java/org/apache/flink/util/function/CheckedSupplier.java
+++ b/flink-core-api/src/main/java/org/apache/flink/util/function/CheckedSupplier.java
@@ -18,8 +18,6 @@
package org.apache.flink.util.function;
-import org.apache.flink.util.FlinkException;
-
import java.util.function.Supplier;
/** Similar to {@link java.util.function.Supplier} but can throw {@link Exception}. */
@@ -41,7 +39,7 @@ static CheckedSupplier checked(Supplier supplier) {
try {
return supplier.get();
} catch (RuntimeException e) {
- throw new FlinkException(e);
+ throw new Exception(e);
}
};
}
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/FunctionWithException.java b/flink-core-api/src/main/java/org/apache/flink/util/function/FunctionWithException.java
similarity index 100%
rename from flink-core/src/main/java/org/apache/flink/util/function/FunctionWithException.java
rename to flink-core-api/src/main/java/org/apache/flink/util/function/FunctionWithException.java
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/LongFunctionWithException.java b/flink-core-api/src/main/java/org/apache/flink/util/function/LongFunctionWithException.java
similarity index 100%
rename from flink-core/src/main/java/org/apache/flink/util/function/LongFunctionWithException.java
rename to flink-core-api/src/main/java/org/apache/flink/util/function/LongFunctionWithException.java
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/QuadConsumer.java b/flink-core-api/src/main/java/org/apache/flink/util/function/QuadConsumer.java
similarity index 100%
rename from flink-core/src/main/java/org/apache/flink/util/function/QuadConsumer.java
rename to flink-core-api/src/main/java/org/apache/flink/util/function/QuadConsumer.java
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/QuadFunction.java b/flink-core-api/src/main/java/org/apache/flink/util/function/QuadFunction.java
similarity index 100%
rename from flink-core/src/main/java/org/apache/flink/util/function/QuadFunction.java
rename to flink-core-api/src/main/java/org/apache/flink/util/function/QuadFunction.java
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/RunnableWithException.java b/flink-core-api/src/main/java/org/apache/flink/util/function/RunnableWithException.java
similarity index 100%
rename from flink-core/src/main/java/org/apache/flink/util/function/RunnableWithException.java
rename to flink-core-api/src/main/java/org/apache/flink/util/function/RunnableWithException.java
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/SerializableFunction.java b/flink-core-api/src/main/java/org/apache/flink/util/function/SerializableFunction.java
similarity index 100%
rename from flink-core/src/main/java/org/apache/flink/util/function/SerializableFunction.java
rename to flink-core-api/src/main/java/org/apache/flink/util/function/SerializableFunction.java
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/SerializableSupplier.java b/flink-core-api/src/main/java/org/apache/flink/util/function/SerializableSupplier.java
similarity index 100%
rename from flink-core/src/main/java/org/apache/flink/util/function/SerializableSupplier.java
rename to flink-core-api/src/main/java/org/apache/flink/util/function/SerializableSupplier.java
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/SerializableSupplierWithException.java b/flink-core-api/src/main/java/org/apache/flink/util/function/SerializableSupplierWithException.java
similarity index 100%
rename from flink-core/src/main/java/org/apache/flink/util/function/SerializableSupplierWithException.java
rename to flink-core-api/src/main/java/org/apache/flink/util/function/SerializableSupplierWithException.java
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/SupplierWithException.java b/flink-core-api/src/main/java/org/apache/flink/util/function/SupplierWithException.java
similarity index 100%
rename from flink-core/src/main/java/org/apache/flink/util/function/SupplierWithException.java
rename to flink-core-api/src/main/java/org/apache/flink/util/function/SupplierWithException.java
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/ThrowingConsumer.java b/flink-core-api/src/main/java/org/apache/flink/util/function/ThrowingConsumer.java
similarity index 100%
rename from flink-core/src/main/java/org/apache/flink/util/function/ThrowingConsumer.java
rename to flink-core-api/src/main/java/org/apache/flink/util/function/ThrowingConsumer.java
diff --git a/flink-core-api/src/main/java/org/apache/flink/util/function/ThrowingExceptionUtils.java b/flink-core-api/src/main/java/org/apache/flink/util/function/ThrowingExceptionUtils.java
new file mode 100644
index 0000000000000..c432c3a152207
--- /dev/null
+++ b/flink-core-api/src/main/java/org/apache/flink/util/function/ThrowingExceptionUtils.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util.function;
+
+/** Exception utils only for this package. */
+class ThrowingExceptionUtils {
+
+ /**
+ * Throws the given {@code Throwable} in scenarios where the signatures do not allow you to
+ * throw an arbitrary Throwable. Errors and RuntimeExceptions are thrown directly, other
+ * exceptions are packed into runtime exceptions
+ *
+ * @param t The throwable to be thrown.
+ */
+ static void rethrow(Throwable t) {
+ if (t instanceof Error) {
+ throw (Error) t;
+ } else if (t instanceof RuntimeException) {
+ throw (RuntimeException) t;
+ } else {
+ throw new RuntimeException(t);
+ }
+ }
+
+ private ThrowingExceptionUtils() {}
+}
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/ThrowingRunnable.java b/flink-core-api/src/main/java/org/apache/flink/util/function/ThrowingRunnable.java
similarity index 95%
rename from flink-core/src/main/java/org/apache/flink/util/function/ThrowingRunnable.java
rename to flink-core-api/src/main/java/org/apache/flink/util/function/ThrowingRunnable.java
index 50ad30aa25514..69329177347f6 100644
--- a/flink-core/src/main/java/org/apache/flink/util/function/ThrowingRunnable.java
+++ b/flink-core-api/src/main/java/org/apache/flink/util/function/ThrowingRunnable.java
@@ -19,7 +19,6 @@
package org.apache.flink.util.function;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.util.ExceptionUtils;
/**
* Similar to a {@link Runnable}, this interface is used to capture a block of code to be executed.
@@ -48,7 +47,7 @@ static Runnable unchecked(ThrowingRunnable> throwingRunnable) {
try {
throwingRunnable.run();
} catch (Throwable t) {
- ExceptionUtils.rethrow(t);
+ ThrowingExceptionUtils.rethrow(t);
}
};
}
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/TriConsumer.java b/flink-core-api/src/main/java/org/apache/flink/util/function/TriConsumer.java
similarity index 100%
rename from flink-core/src/main/java/org/apache/flink/util/function/TriConsumer.java
rename to flink-core-api/src/main/java/org/apache/flink/util/function/TriConsumer.java
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/TriConsumerWithException.java b/flink-core-api/src/main/java/org/apache/flink/util/function/TriConsumerWithException.java
similarity index 96%
rename from flink-core/src/main/java/org/apache/flink/util/function/TriConsumerWithException.java
rename to flink-core-api/src/main/java/org/apache/flink/util/function/TriConsumerWithException.java
index 36a0838ab820a..135e18903dd66 100644
--- a/flink-core/src/main/java/org/apache/flink/util/function/TriConsumerWithException.java
+++ b/flink-core-api/src/main/java/org/apache/flink/util/function/TriConsumerWithException.java
@@ -18,8 +18,6 @@
package org.apache.flink.util.function;
-import org.apache.flink.util.ExceptionUtils;
-
/**
* A checked extension of the {@link TriConsumer} interface.
*
@@ -57,7 +55,7 @@ static TriConsumer unchecked(
try {
triConsumerWithException.accept(a, b, c);
} catch (Throwable t) {
- ExceptionUtils.rethrow(t);
+ ThrowingExceptionUtils.rethrow(t);
}
};
}
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/TriFunction.java b/flink-core-api/src/main/java/org/apache/flink/util/function/TriFunction.java
similarity index 100%
rename from flink-core/src/main/java/org/apache/flink/util/function/TriFunction.java
rename to flink-core-api/src/main/java/org/apache/flink/util/function/TriFunction.java
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/TriFunctionWithException.java b/flink-core-api/src/main/java/org/apache/flink/util/function/TriFunctionWithException.java
similarity index 96%
rename from flink-core/src/main/java/org/apache/flink/util/function/TriFunctionWithException.java
rename to flink-core-api/src/main/java/org/apache/flink/util/function/TriFunctionWithException.java
index 12d78f25db0c3..aa7f85a74f3b2 100644
--- a/flink-core/src/main/java/org/apache/flink/util/function/TriFunctionWithException.java
+++ b/flink-core-api/src/main/java/org/apache/flink/util/function/TriFunctionWithException.java
@@ -19,7 +19,6 @@
package org.apache.flink.util.function;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.util.ExceptionUtils;
import java.util.function.BiFunction;
@@ -63,7 +62,7 @@ static TriFunction unchecked(
try {
return triFunctionWithException.apply(a, b, c);
} catch (Throwable t) {
- ExceptionUtils.rethrow(t);
+ ThrowingExceptionUtils.rethrow(t);
// we need this to appease the compiler :-(
return null;
}
diff --git a/pom.xml b/pom.xml
index b803a1afef28f..afb48c510bac9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2365,6 +2365,12 @@ under the License.
org.apache.flink.api.common.functions.Function
org.apache.flink.api.java.functions.KeySelector
+ org.apache.flink.util.function.FunctionWithException
+ org.apache.flink.util.function.LongFunctionWithException
+ org.apache.flink.util.function.RunnableWithException
+ org.apache.flink.util.function.SerializableFunction
+ org.apache.flink.util.function.SupplierWithException
+ org.apache.flink.util.function.ThrowingConsumer
public