classNames = Sets.newLinkedHashSet();
+
+ private Builder() {}
+
+ /**
+ * Set the {@link ClassLoader} used to lookup classes by name.
+ *
+ * If not set, the current thread's ClassLoader is used.
+ *
+ * @param newLoader a ClassLoader
+ * @return this Builder for method chaining
+ */
+ public Builder loader(ClassLoader newLoader) {
+ return this;
+ }
+
+ /**
+ * Checks for an implementation of the class by name.
+ *
+ * @param className name of a class
+ * @return this Builder for method chaining
+ */
+ public Builder impl(String className) {
+ classNames.add(className);
+
+ if (foundClass != null) {
+ return this;
+ }
+
+ try {
+ this.foundClass = Class.forName(className);
+ } catch (ClassNotFoundException e) {
+ // not the right implementation
+ }
+
+ return this;
+ }
+
+ /**
+ * Instructs this builder to return null if no class is found, rather than throwing an
+ * Exception.
+ *
+ * @return this Builder for method chaining
+ */
+ public Builder orNull() {
+ this.nullOk = true;
+ return this;
+ }
+
+ /**
+ * Returns the first implementation or throws ClassNotFoundException if one was not found.
+ *
+ * @param Java superclass
+ * @return a {@link Class} for the first implementation found
+ * @throws ClassNotFoundException if no implementation was found
+ */
+ @SuppressWarnings("unchecked")
+ public Class extends S> buildChecked() throws ClassNotFoundException {
+ if (!nullOk && foundClass == null) {
+ throw new ClassNotFoundException(
+ "Cannot find class; alternatives: " + Joiner.on(", ").join(classNames));
+ }
+ return (Class extends S>) foundClass;
+ }
+
+ /**
+ * Returns the first implementation or throws RuntimeException if one was not found.
+ *
+ * @param Java superclass
+ * @return a {@link Class} for the first implementation found
+ * @throws RuntimeException if no implementation was found
+ */
+ @SuppressWarnings("unchecked")
+ public Class extends S> build() {
+ if (!nullOk && foundClass == null) {
+ throw new RuntimeException(
+ "Cannot find class; alternatives: " + Joiner.on(", ").join(classNames));
+ }
+ return (Class extends S>) foundClass;
+ }
+ }
+}
diff --git a/java/connector-node/risingwave-sink-iceberg/src/main/java/org/apache/iceberg/common/DynConstructors.java b/java/connector-node/risingwave-sink-iceberg/src/main/java/org/apache/iceberg/common/DynConstructors.java
new file mode 100644
index 0000000000000..61566f4e191ff
--- /dev/null
+++ b/java/connector-node/risingwave-sink-iceberg/src/main/java/org/apache/iceberg/common/DynConstructors.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.common;
+
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Throwables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Arrays;
+import java.util.Map;
+
+/** Copied from parquet-common */
+public class DynConstructors {
+
+ private DynConstructors() {}
+
+ public static class Ctor extends DynMethods.UnboundMethod {
+ private final Constructor ctor;
+ private final Class extends C> constructed;
+
+ private Ctor(Constructor constructor, Class extends C> constructed) {
+ super(null, "newInstance");
+ this.ctor = constructor;
+ this.constructed = constructed;
+ }
+
+ public Class extends C> getConstructedClass() {
+ return constructed;
+ }
+
+ public C newInstanceChecked(Object... args) throws Exception {
+ try {
+ if (args.length > ctor.getParameterCount()) {
+ return ctor.newInstance(Arrays.copyOfRange(args, 0, ctor.getParameterCount()));
+ } else {
+ return ctor.newInstance(args);
+ }
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw e;
+ } catch (InvocationTargetException e) {
+ Throwables.propagateIfInstanceOf(e.getCause(), Exception.class);
+ Throwables.propagateIfInstanceOf(e.getCause(), RuntimeException.class);
+ throw Throwables.propagate(e.getCause());
+ }
+ }
+
+ public C newInstance(Object... args) {
+ try {
+ return newInstanceChecked(args);
+ } catch (Exception e) {
+ Throwables.propagateIfInstanceOf(e, RuntimeException.class);
+ throw Throwables.propagate(e);
+ }
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public R invoke(Object target, Object... args) {
+ Preconditions.checkArgument(
+ target == null, "Invalid call to constructor: target must be null");
+ return (R) newInstance(args);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public R invokeChecked(Object target, Object... args) throws Exception {
+ Preconditions.checkArgument(
+ target == null, "Invalid call to constructor: target must be null");
+ return (R) newInstanceChecked(args);
+ }
+
+ @Override
+ public DynMethods.BoundMethod bind(Object receiver) {
+ throw new IllegalStateException("Cannot bind constructors");
+ }
+
+ @Override
+ public boolean isStatic() {
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + "(constructor=" + ctor + ", class=" + constructed + ")";
+ }
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static Builder builder(Class> baseClass) {
+ return new Builder(baseClass);
+ }
+
+ public static class Builder {
+ private final Class> baseClass;
+ private Ctor ctor = null;
+ private Map problems = Maps.newHashMap();
+
+ public Builder(Class> baseClass) {
+ this.baseClass = baseClass;
+ }
+
+ public Builder() {
+ this.baseClass = null;
+ }
+
+ /**
+ * Set the {@link ClassLoader} used to lookup classes by name.
+ *
+ * If not set, the current thread's ClassLoader is used.
+ *
+ * @param newLoader a ClassLoader
+ * @return this Builder for method chaining
+ */
+ public Builder loader(ClassLoader newLoader) {
+ return this;
+ }
+
+ public Builder impl(String className, Class>... types) {
+ // don't do any work if an implementation has been found
+ if (ctor != null) {
+ return this;
+ }
+
+ try {
+ Class> targetClass = Class.forName(className);
+ impl(targetClass, types);
+ } catch (NoClassDefFoundError | ClassNotFoundException e) {
+ // cannot load this implementation
+ problems.put(className, e);
+ }
+ return this;
+ }
+
+ public Builder impl(Class targetClass, Class>... types) {
+ // don't do any work if an implementation has been found
+ if (ctor != null) {
+ return this;
+ }
+
+ try {
+ ctor = new Ctor(targetClass.getConstructor(types), targetClass);
+ } catch (NoSuchMethodException e) {
+ // not the right implementation
+ problems.put(methodName(targetClass, types), e);
+ }
+ return this;
+ }
+
+ public Builder hiddenImpl(Class>... types) {
+ hiddenImpl(baseClass, types);
+ return this;
+ }
+
+ @SuppressWarnings("unchecked")
+ public Builder hiddenImpl(String className, Class>... types) {
+ // don't do any work if an implementation has been found
+ if (ctor != null) {
+ return this;
+ }
+
+ try {
+ Class targetClass = Class.forName(className);
+ hiddenImpl(targetClass, types);
+ } catch (NoClassDefFoundError | ClassNotFoundException e) {
+ // cannot load this implementation
+ problems.put(className, e);
+ }
+ return this;
+ }
+
+ public Builder hiddenImpl(Class targetClass, Class>... types) {
+ // don't do any work if an implementation has been found
+ if (ctor != null) {
+ return this;
+ }
+
+ try {
+ Constructor hidden = targetClass.getDeclaredConstructor(types);
+ AccessController.doPrivileged(new MakeAccessible(hidden));
+ ctor = new Ctor(hidden, targetClass);
+ } catch (SecurityException e) {
+ // unusable
+ problems.put(methodName(targetClass, types), e);
+ } catch (NoSuchMethodException e) {
+ // not the right implementation
+ problems.put(methodName(targetClass, types), e);
+ }
+ return this;
+ }
+
+ @SuppressWarnings("unchecked")
+ public Ctor buildChecked() throws NoSuchMethodException {
+ if (ctor != null) {
+ return ctor;
+ }
+ throw buildCheckedException(baseClass, problems);
+ }
+
+ @SuppressWarnings("unchecked")
+ public Ctor build() {
+ if (ctor != null) {
+ return ctor;
+ }
+ throw buildRuntimeException(baseClass, problems);
+ }
+ }
+
+ private static class MakeAccessible implements PrivilegedAction {
+ private Constructor> hidden;
+
+ MakeAccessible(Constructor> hidden) {
+ this.hidden = hidden;
+ }
+
+ @Override
+ public Void run() {
+ hidden.setAccessible(true);
+ return null;
+ }
+ }
+
+ private static NoSuchMethodException buildCheckedException(
+ Class> baseClass, Map problems) {
+ NoSuchMethodException exc =
+ new NoSuchMethodException(
+ "Cannot find constructor for " + baseClass + "\n" + formatProblems(problems));
+ problems.values().forEach(exc::addSuppressed);
+ return exc;
+ }
+
+ private static RuntimeException buildRuntimeException(
+ Class> baseClass, Map problems) {
+ RuntimeException exc =
+ new RuntimeException(
+ "Cannot find constructor for " + baseClass + "\n" + formatProblems(problems));
+ problems.values().forEach(exc::addSuppressed);
+ return exc;
+ }
+
+ private static String formatProblems(Map problems) {
+ StringBuilder sb = new StringBuilder();
+ boolean first = true;
+ for (Map.Entry problem : problems.entrySet()) {
+ if (first) {
+ first = false;
+ } else {
+ sb.append("\n");
+ }
+ sb.append("\tMissing ")
+ .append(problem.getKey())
+ .append(" [")
+ .append(problem.getValue().getClass().getName())
+ .append(": ")
+ .append(problem.getValue().getMessage())
+ .append("]");
+ }
+ return sb.toString();
+ }
+
+ private static String methodName(Class> targetClass, Class>... types) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(targetClass.getName()).append("(");
+ boolean first = true;
+ for (Class> type : types) {
+ if (first) {
+ first = false;
+ } else {
+ sb.append(",");
+ }
+ sb.append(type.getName());
+ }
+ sb.append(")");
+ return sb.toString();
+ }
+}
diff --git a/java/connector-node/risingwave-sink-iceberg/src/main/java/org/apache/iceberg/common/DynFields.java b/java/connector-node/risingwave-sink-iceberg/src/main/java/org/apache/iceberg/common/DynFields.java
new file mode 100644
index 0000000000000..80b6af8cb7f93
--- /dev/null
+++ b/java/connector-node/risingwave-sink-iceberg/src/main/java/org/apache/iceberg/common/DynFields.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.common;
+
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Throwables;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Set;
+
+public class DynFields {
+
+ private DynFields() {}
+
+ /**
+ * Convenience wrapper class around {@link Field}.
+ *
+ * Allows callers to invoke the wrapped method with all Exceptions wrapped by RuntimeException,
+ * or with a single Exception catch block.
+ */
+ public static class UnboundField {
+ private final Field field;
+ private final String name;
+
+ private UnboundField(Field field, String name) {
+ this.field = field;
+ this.name = name;
+ }
+
+ @SuppressWarnings("unchecked")
+ public T get(Object target) {
+ try {
+ return (T) field.get(target);
+ } catch (IllegalAccessException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ public void set(Object target, T value) {
+ try {
+ field.set(target, value);
+ } catch (IllegalAccessException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("class", field.getDeclaringClass().toString())
+ .add("name", name)
+ .add("type", field.getType())
+ .toString();
+ }
+
+ /**
+ * Returns this method as a BoundMethod for the given receiver.
+ *
+ * @param target an Object on which to get or set this field
+ * @return a {@link BoundField} for this field and the target
+ * @throws IllegalStateException if the method is static
+ * @throws IllegalArgumentException if the receiver's class is incompatible
+ */
+ public BoundField bind(Object target) {
+ Preconditions.checkState(
+ !isStatic() || this == AlwaysNull.INSTANCE, "Cannot bind static field %s", name);
+ Preconditions.checkArgument(
+ field.getDeclaringClass().isAssignableFrom(target.getClass()),
+ "Cannot bind field %s to instance of %s",
+ name,
+ target.getClass());
+
+ return new BoundField<>(this, target);
+ }
+
+ /**
+ * Returns this field as a StaticField.
+ *
+ * @return a {@link StaticField} for this field
+ * @throws IllegalStateException if the method is not static
+ */
+ public StaticField asStatic() {
+ Preconditions.checkState(isStatic(), "Field %s is not static", name);
+ return new StaticField<>(this);
+ }
+
+ /** Returns whether the field is a static field. */
+ public boolean isStatic() {
+ return Modifier.isStatic(field.getModifiers());
+ }
+
+ /** Returns whether the field is always null. */
+ public boolean isAlwaysNull() {
+ return this == AlwaysNull.INSTANCE;
+ }
+ }
+
+ private static class AlwaysNull extends UnboundField {
+ private static final AlwaysNull INSTANCE = new AlwaysNull();
+
+ private AlwaysNull() {
+ super(null, "AlwaysNull");
+ }
+
+ @Override
+ public Void get(Object target) {
+ return null;
+ }
+
+ @Override
+ public void set(Object target, Void value) {}
+
+ @Override
+ public String toString() {
+ return "Field(AlwaysNull)";
+ }
+
+ @Override
+ public boolean isStatic() {
+ return true;
+ }
+
+ @Override
+ public boolean isAlwaysNull() {
+ return true;
+ }
+ }
+
+ public static class StaticField {
+ private final UnboundField field;
+
+ private StaticField(UnboundField field) {
+ this.field = field;
+ }
+
+ public T get() {
+ return field.get(null);
+ }
+
+ public void set(T value) {
+ field.set(null, value);
+ }
+ }
+
+ public static class BoundField {
+ private final UnboundField field;
+ private final Object target;
+
+ private BoundField(UnboundField field, Object target) {
+ this.field = field;
+ this.target = target;
+ }
+
+ public T get() {
+ return field.get(target);
+ }
+
+ public void set(T value) {
+ field.set(target, value);
+ }
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+ private UnboundField> field = null;
+ private final Set candidates = Sets.newHashSet();
+ private boolean defaultAlwaysNull = false;
+
+ private Builder() {}
+
+ /**
+ * Set the {@link ClassLoader} used to lookup classes by name.
+ *
+ * If not set, the current thread's ClassLoader is used.
+ *
+ * @param newLoader a ClassLoader
+ * @return this Builder for method chaining
+ */
+ public Builder loader(ClassLoader newLoader) {
+ return this;
+ }
+
+ /**
+ * Instructs this builder to return AlwaysNull if no implementation is found.
+ *
+ * @return this Builder for method chaining
+ */
+ public Builder defaultAlwaysNull() {
+ this.defaultAlwaysNull = true;
+ return this;
+ }
+
+ /**
+ * Checks for an implementation, first finding the class by name.
+ *
+ * @param className name of a class
+ * @param fieldName name of the field
+ * @return this Builder for method chaining
+ * @see Class#forName(String)
+ * @see Class#getField(String)
+ */
+ public Builder impl(String className, String fieldName) {
+ // don't do any work if an implementation has been found
+ if (field != null) {
+ return this;
+ }
+
+ try {
+ Class> targetClass = Class.forName(className);
+ impl(targetClass, fieldName);
+ } catch (ClassNotFoundException e) {
+ // not the right implementation
+ candidates.add(className + "." + fieldName);
+ }
+ return this;
+ }
+
+ /**
+ * Checks for an implementation.
+ *
+ * @param targetClass a class instance
+ * @param fieldName name of a field (different from constructor)
+ * @return this Builder for method chaining
+ * @see Class#forName(String)
+ * @see Class#getField(String)
+ */
+ public Builder impl(Class> targetClass, String fieldName) {
+ // don't do any work if an implementation has been found
+ if (field != null || targetClass == null) {
+ return this;
+ }
+
+ try {
+ this.field = new UnboundField<>(targetClass.getField(fieldName), fieldName);
+ } catch (NoSuchFieldException e) {
+ // not the right implementation
+ candidates.add(targetClass.getName() + "." + fieldName);
+ }
+ return this;
+ }
+
+ /**
+ * Checks for a hidden implementation, first finding the class by name.
+ *
+ * @param className name of a class
+ * @param fieldName name of a field (different from constructor)
+ * @return this Builder for method chaining
+ * @see Class#forName(String)
+ * @see Class#getField(String)
+ */
+ public Builder hiddenImpl(String className, String fieldName) {
+ // don't do any work if an implementation has been found
+ if (field != null) {
+ return this;
+ }
+
+ try {
+ Class> targetClass = Class.forName(className);
+ hiddenImpl(targetClass, fieldName);
+ } catch (ClassNotFoundException e) {
+ // not the right implementation
+ candidates.add(className + "." + fieldName);
+ }
+ return this;
+ }
+
+ /**
+ * Checks for a hidden implementation.
+ *
+ * @param targetClass a class instance
+ * @param fieldName name of a field (different from constructor)
+ * @return this Builder for method chaining
+ * @see Class#forName(String)
+ * @see Class#getField(String)
+ */
+ public Builder hiddenImpl(Class> targetClass, String fieldName) {
+ // don't do any work if an implementation has been found
+ if (field != null || targetClass == null) {
+ return this;
+ }
+
+ try {
+ Field hidden = targetClass.getDeclaredField(fieldName);
+ AccessController.doPrivileged(new MakeFieldAccessible(hidden));
+ this.field = new UnboundField(hidden, fieldName);
+ } catch (SecurityException | NoSuchFieldException e) {
+ // unusable
+ candidates.add(targetClass.getName() + "." + fieldName);
+ }
+ return this;
+ }
+
+ /**
+ * Returns the first valid implementation as a UnboundField or throws a NoSuchFieldException if
+ * there is none.
+ *
+ * @param Java class stored in the field
+ * @return a {@link UnboundField} with a valid implementation
+ * @throws NoSuchFieldException if no implementation was found
+ */
+ @SuppressWarnings("unchecked")
+ public UnboundField buildChecked() throws NoSuchFieldException {
+ if (field != null) {
+ return (UnboundField) field;
+ } else if (defaultAlwaysNull) {
+ return (UnboundField) AlwaysNull.INSTANCE;
+ } else {
+ throw new NoSuchFieldException(
+ "Cannot find field from candidates: " + Joiner.on(", ").join(candidates));
+ }
+ }
+
+ /**
+ * Returns the first valid implementation as a BoundMethod or throws a NoSuchMethodException if
+ * there is none.
+ *
+ * @param target an Object on which to get and set the field
+ * @param Java class stored in the field
+ * @return a {@link BoundField} with a valid implementation and target
+ * @throws IllegalStateException if the method is static
+ * @throws IllegalArgumentException if the receiver's class is incompatible
+ * @throws NoSuchFieldException if no implementation was found
+ */
+ public BoundField buildChecked(Object target) throws NoSuchFieldException {
+ return this.buildChecked().bind(target);
+ }
+
+ /**
+ * Returns the first valid implementation as a UnboundField or throws a NoSuchFieldException if
+ * there is none.
+ *
+ * @param Java class stored in the field
+ * @return a {@link UnboundField} with a valid implementation
+ * @throws RuntimeException if no implementation was found
+ */
+ @SuppressWarnings("unchecked")
+ public UnboundField build() {
+ if (field != null) {
+ return (UnboundField) field;
+ } else if (defaultAlwaysNull) {
+ return (UnboundField) AlwaysNull.INSTANCE;
+ } else {
+ throw new RuntimeException(
+ "Cannot find field from candidates: " + Joiner.on(", ").join(candidates));
+ }
+ }
+
+ /**
+ * Returns the first valid implementation as a BoundMethod or throws a RuntimeException if there
+ * is none.
+ *
+ * @param target an Object on which to get and set the field
+ * @param Java class stored in the field
+ * @return a {@link BoundField} with a valid implementation and target
+ * @throws IllegalStateException if the method is static
+ * @throws IllegalArgumentException if the receiver's class is incompatible
+ * @throws RuntimeException if no implementation was found
+ */
+ public BoundField build(Object target) {
+ return this.build().bind(target);
+ }
+
+ /**
+ * Returns the first valid implementation as a StaticField or throws a NoSuchFieldException if
+ * there is none.
+ *
+ * @param Java class stored in the field
+ * @return a {@link StaticField} with a valid implementation
+ * @throws IllegalStateException if the method is not static
+ * @throws NoSuchFieldException if no implementation was found
+ */
+ public StaticField buildStaticChecked() throws NoSuchFieldException {
+ return this.buildChecked().asStatic();
+ }
+
+ /**
+ * Returns the first valid implementation as a StaticField or throws a RuntimeException if there
+ * is none.
+ *
+ * @param Java class stored in the field
+ * @return a {@link StaticField} with a valid implementation
+ * @throws IllegalStateException if the method is not static
+ * @throws RuntimeException if no implementation was found
+ */
+ public StaticField buildStatic() {
+ return this.build().asStatic();
+ }
+ }
+
+ private static class MakeFieldAccessible implements PrivilegedAction {
+ private Field hidden;
+
+ MakeFieldAccessible(Field hidden) {
+ this.hidden = hidden;
+ }
+
+ @Override
+ public Void run() {
+ hidden.setAccessible(true);
+ return null;
+ }
+ }
+}
diff --git a/java/connector-node/risingwave-sink-iceberg/src/main/java/org/apache/iceberg/common/DynMethods.java b/java/connector-node/risingwave-sink-iceberg/src/main/java/org/apache/iceberg/common/DynMethods.java
new file mode 100644
index 0000000000000..281c3d34ed304
--- /dev/null
+++ b/java/connector-node/risingwave-sink-iceberg/src/main/java/org/apache/iceberg/common/DynMethods.java
@@ -0,0 +1,522 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.common;
+
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Throwables;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Arrays;
+
+/** Copied from parquet-common */
+public class DynMethods {
+
+ private DynMethods() {}
+
+ /**
+ * Convenience wrapper class around {@link Method}.
+ *
+ * Allows callers to invoke the wrapped method with all Exceptions wrapped by RuntimeException,
+ * or with a single Exception catch block.
+ */
+ public static class UnboundMethod {
+
+ private final Method method;
+ private final String name;
+ private final int argLength;
+
+ UnboundMethod(Method method, String name) {
+ this.method = method;
+ this.name = name;
+ this.argLength =
+ (method == null || method.isVarArgs()) ? -1 : method.getParameterTypes().length;
+ }
+
+ @SuppressWarnings("unchecked")
+ public R invokeChecked(Object target, Object... args) throws Exception {
+ try {
+ if (argLength < 0) {
+ return (R) method.invoke(target, args);
+ } else {
+ return (R) method.invoke(target, Arrays.copyOfRange(args, 0, argLength));
+ }
+
+ } catch (InvocationTargetException e) {
+ Throwables.propagateIfInstanceOf(e.getCause(), Exception.class);
+ Throwables.propagateIfInstanceOf(e.getCause(), RuntimeException.class);
+ throw Throwables.propagate(e.getCause());
+ }
+ }
+
+ public R invoke(Object target, Object... args) {
+ try {
+ return this.invokeChecked(target, args);
+ } catch (Exception e) {
+ Throwables.propagateIfInstanceOf(e, RuntimeException.class);
+ throw Throwables.propagate(e);
+ }
+ }
+
+ /**
+ * Returns this method as a BoundMethod for the given receiver.
+ *
+ * @param receiver an Object to receive the method invocation
+ * @return a {@link BoundMethod} for this method and the receiver
+ * @throws IllegalStateException if the method is static
+ * @throws IllegalArgumentException if the receiver's class is incompatible
+ */
+ public BoundMethod bind(Object receiver) {
+ Preconditions.checkState(
+ !isStatic(), "Cannot bind static method %s", method.toGenericString());
+ Preconditions.checkArgument(
+ method.getDeclaringClass().isAssignableFrom(receiver.getClass()),
+ "Cannot bind %s to instance of %s",
+ method.toGenericString(),
+ receiver.getClass());
+
+ return new BoundMethod(this, receiver);
+ }
+
+ /** Returns whether the method is a static method. */
+ public boolean isStatic() {
+ return Modifier.isStatic(method.getModifiers());
+ }
+
+ /** Returns whether the method is a noop. */
+ public boolean isNoop() {
+ return this == NOOP;
+ }
+
+ /**
+ * Returns this method as a StaticMethod.
+ *
+ * @return a {@link StaticMethod} for this method
+ * @throws IllegalStateException if the method is not static
+ */
+ public StaticMethod asStatic() {
+ Preconditions.checkState(isStatic(), "Method is not static");
+ return new StaticMethod(this);
+ }
+
+ @Override
+ public String toString() {
+ return "DynMethods.UnboundMethod(name=" + name + " method=" + method.toGenericString() + ")";
+ }
+
+ /** Singleton {@link UnboundMethod}, performs no operation and returns null. */
+ private static final UnboundMethod NOOP =
+ new UnboundMethod(null, "NOOP") {
+ @Override
+ public R invokeChecked(Object target, Object... args) throws Exception {
+ return null;
+ }
+
+ @Override
+ public BoundMethod bind(Object receiver) {
+ return new BoundMethod(this, receiver);
+ }
+
+ @Override
+ public StaticMethod asStatic() {
+ return new StaticMethod(this);
+ }
+
+ @Override
+ public boolean isStatic() {
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "DynMethods.UnboundMethod(NOOP)";
+ }
+ };
+ }
+
+ public static class BoundMethod {
+ private final UnboundMethod method;
+ private final Object receiver;
+
+ private BoundMethod(UnboundMethod method, Object receiver) {
+ this.method = method;
+ this.receiver = receiver;
+ }
+
+ public R invokeChecked(Object... args) throws Exception {
+ return method.invokeChecked(receiver, args);
+ }
+
+ public R invoke(Object... args) {
+ return method.invoke(receiver, args);
+ }
+ }
+
+ public static class StaticMethod {
+ private final UnboundMethod method;
+
+ private StaticMethod(UnboundMethod method) {
+ this.method = method;
+ }
+
+ public R invokeChecked(Object... args) throws Exception {
+ return method.invokeChecked(null, args);
+ }
+
+ public R invoke(Object... args) {
+ return method.invoke(null, args);
+ }
+ }
+
+ /**
+ * Constructs a new builder for calling methods dynamically.
+ *
+ * @param methodName name of the method the builder will locate
+ * @return a Builder for finding a method
+ */
+ public static Builder builder(String methodName) {
+ return new Builder(methodName);
+ }
+
+ public static class Builder {
+ private final String name;
+ private UnboundMethod method = null;
+
+ public Builder(String methodName) {
+ this.name = methodName;
+ }
+
+ /**
+ * Set the {@link ClassLoader} used to lookup classes by name.
+ *
+ * If not set, the current thread's ClassLoader is used.
+ *
+ * @param newLoader a ClassLoader
+ * @return this Builder for method chaining
+ */
+ public Builder loader(ClassLoader newLoader) {
+ return this;
+ }
+
+ /**
+ * If no implementation has been found, adds a NOOP method.
+ *
+ *
Note: calls to impl will not match after this method is called!
+ *
+ * @return this Builder for method chaining
+ */
+ public Builder orNoop() {
+ if (method == null) {
+ this.method = UnboundMethod.NOOP;
+ }
+ return this;
+ }
+
+ /**
+ * Checks for an implementation, first finding the given class by name.
+ *
+ * @param className name of a class
+ * @param methodName name of a method (different from constructor)
+ * @param argClasses argument classes for the method
+ * @return this Builder for method chaining
+ * @see Class#forName(String)
+ * @see Class#getMethod(String, Class[])
+ */
+ public Builder impl(String className, String methodName, Class>... argClasses) {
+ // don't do any work if an implementation has been found
+ if (method != null) {
+ return this;
+ }
+
+ try {
+ Class> targetClass = Class.forName(className);
+ impl(targetClass, methodName, argClasses);
+ } catch (ClassNotFoundException e) {
+ // not the right implementation
+ }
+ return this;
+ }
+
+ /**
+ * Checks for an implementation, first finding the given class by name.
+ *
+ *
The name passed to the constructor is the method name used.
+ *
+ * @param className name of a class
+ * @param argClasses argument classes for the method
+ * @return this Builder for method chaining
+ * @see Class#forName(String)
+ * @see Class#getMethod(String, Class[])
+ */
+ public Builder impl(String className, Class>... argClasses) {
+ impl(className, name, argClasses);
+ return this;
+ }
+
+ /**
+ * Checks for a method implementation.
+ *
+ * @param targetClass a class instance
+ * @param methodName name of a method (different from constructor)
+ * @param argClasses argument classes for the method
+ * @return this Builder for method chaining
+ * @see Class#forName(String)
+ * @see Class#getMethod(String, Class[])
+ */
+ public Builder impl(Class> targetClass, String methodName, Class>... argClasses) {
+ // don't do any work if an implementation has been found
+ if (method != null) {
+ return this;
+ }
+
+ try {
+ this.method = new UnboundMethod(targetClass.getMethod(methodName, argClasses), name);
+ } catch (NoSuchMethodException e) {
+ // not the right implementation
+ }
+ return this;
+ }
+
+ /**
+ * Checks for a method implementation.
+ *
+ *
The name passed to the constructor is the method name used.
+ *
+ * @param targetClass a class instance
+ * @param argClasses argument classes for the method
+ * @return this Builder for method chaining
+ * @see Class#forName(String)
+ * @see Class#getMethod(String, Class[])
+ */
+ public Builder impl(Class> targetClass, Class>... argClasses) {
+ impl(targetClass, name, argClasses);
+ return this;
+ }
+
+ public Builder ctorImpl(Class> targetClass, Class>... argClasses) {
+ // don't do any work if an implementation has been found
+ if (method != null) {
+ return this;
+ }
+
+ try {
+ this.method = new DynConstructors.Builder().impl(targetClass, argClasses).buildChecked();
+ } catch (NoSuchMethodException e) {
+ // not the right implementation
+ }
+ return this;
+ }
+
+ public Builder ctorImpl(String className, Class>... argClasses) {
+ // don't do any work if an implementation has been found
+ if (method != null) {
+ return this;
+ }
+
+ try {
+ this.method = new DynConstructors.Builder().impl(className, argClasses).buildChecked();
+ } catch (NoSuchMethodException e) {
+ // not the right implementation
+ }
+ return this;
+ }
+
+ /**
+ * Checks for an implementation, first finding the given class by name.
+ *
+ * @param className name of a class
+ * @param methodName name of a method (different from constructor)
+ * @param argClasses argument classes for the method
+ * @return this Builder for method chaining
+ * @see Class#forName(String)
+ * @see Class#getMethod(String, Class[])
+ */
+ public Builder hiddenImpl(String className, String methodName, Class>... argClasses) {
+ // don't do any work if an implementation has been found
+ if (method != null) {
+ return this;
+ }
+
+ try {
+ Class> targetClass = Class.forName(className );
+ hiddenImpl(targetClass, methodName, argClasses);
+ } catch (ClassNotFoundException e) {
+ // not the right implementation
+ }
+ return this;
+ }
+
+ /**
+ * Checks for an implementation, first finding the given class by name.
+ *
+ *
The name passed to the constructor is the method name used.
+ *
+ * @param className name of a class
+ * @param argClasses argument classes for the method
+ * @return this Builder for method chaining
+ * @see Class#forName(String)
+ * @see Class#getMethod(String, Class[])
+ */
+ public Builder hiddenImpl(String className, Class>... argClasses) {
+ hiddenImpl(className, name, argClasses);
+ return this;
+ }
+
+ /**
+ * Checks for a method implementation.
+ *
+ * @param targetClass a class instance
+ * @param methodName name of a method (different from constructor)
+ * @param argClasses argument classes for the method
+ * @return this Builder for method chaining
+ * @see Class#forName(String)
+ * @see Class#getMethod(String, Class[])
+ */
+ public Builder hiddenImpl(Class> targetClass, String methodName, Class>... argClasses) {
+ // don't do any work if an implementation has been found
+ if (method != null) {
+ return this;
+ }
+
+ try {
+ Method hidden = targetClass.getDeclaredMethod(methodName, argClasses);
+ AccessController.doPrivileged(new MakeAccessible(hidden));
+ this.method = new UnboundMethod(hidden, name);
+ } catch (SecurityException | NoSuchMethodException e) {
+ // unusable or not the right implementation
+ }
+ return this;
+ }
+
+ /**
+ * Checks for a method implementation.
+ *
+ *
The name passed to the constructor is the method name used.
+ *
+ * @param targetClass a class instance
+ * @param argClasses argument classes for the method
+ * @return this Builder for method chaining
+ * @see Class#forName(String)
+ * @see Class#getMethod(String, Class[])
+ */
+ public Builder hiddenImpl(Class> targetClass, Class>... argClasses) {
+ hiddenImpl(targetClass, name, argClasses);
+ return this;
+ }
+
+ /**
+ * Returns the first valid implementation as a UnboundMethod or throws a RuntimeError if there
+ * is none.
+ *
+ * @return a {@link UnboundMethod} with a valid implementation
+ * @throws RuntimeException if no implementation was found
+ */
+ public UnboundMethod build() {
+ if (method != null) {
+ return method;
+ } else {
+ throw new RuntimeException("Cannot find method: " + name);
+ }
+ }
+
+ /**
+ * Returns the first valid implementation as a BoundMethod or throws a RuntimeError if there is
+ * none.
+ *
+ * @param receiver an Object to receive the method invocation
+ * @return a {@link BoundMethod} with a valid implementation and receiver
+ * @throws IllegalStateException if the method is static
+ * @throws IllegalArgumentException if the receiver's class is incompatible
+ * @throws RuntimeException if no implementation was found
+ */
+ public BoundMethod build(Object receiver) {
+ return build().bind(receiver);
+ }
+
+ /**
+ * Returns the first valid implementation as a UnboundMethod or throws a NoSuchMethodException
+ * if there is none.
+ *
+ * @return a {@link UnboundMethod} with a valid implementation
+ * @throws NoSuchMethodException if no implementation was found
+ */
+ public UnboundMethod buildChecked() throws NoSuchMethodException {
+ if (method != null) {
+ return method;
+ } else {
+ throw new NoSuchMethodException("Cannot find method: " + name);
+ }
+ }
+
+ /**
+ * Returns the first valid implementation as a BoundMethod or throws a NoSuchMethodException if
+ * there is none.
+ *
+ * @param receiver an Object to receive the method invocation
+ * @return a {@link BoundMethod} with a valid implementation and receiver
+ * @throws IllegalStateException if the method is static
+ * @throws IllegalArgumentException if the receiver's class is incompatible
+ * @throws NoSuchMethodException if no implementation was found
+ */
+ public BoundMethod buildChecked(Object receiver) throws NoSuchMethodException {
+ return buildChecked().bind(receiver);
+ }
+
+ /**
+ * Returns the first valid implementation as a StaticMethod or throws a NoSuchMethodException if
+ * there is none.
+ *
+ * @return a {@link StaticMethod} with a valid implementation
+ * @throws IllegalStateException if the method is not static
+ * @throws NoSuchMethodException if no implementation was found
+ */
+ public StaticMethod buildStaticChecked() throws NoSuchMethodException {
+ return buildChecked().asStatic();
+ }
+
+ /**
+ * Returns the first valid implementation as a StaticMethod or throws a RuntimeException if
+ * there is none.
+ *
+ * @return a {@link StaticMethod} with a valid implementation
+ * @throws IllegalStateException if the method is not static
+ * @throws RuntimeException if no implementation was found
+ */
+ public StaticMethod buildStatic() {
+ return build().asStatic();
+ }
+ }
+
+ private static class MakeAccessible implements PrivilegedAction {
+ private Method hidden;
+
+ MakeAccessible(Method hidden) {
+ this.hidden = hidden;
+ }
+
+ @Override
+ public Void run() {
+ hidden.setAccessible(true);
+ return null;
+ }
+ }
+}
diff --git a/java/connector-node/risingwave-sink-iceberg/src/main/java/org/apache/iceberg/common/README.md b/java/connector-node/risingwave-sink-iceberg/src/main/java/org/apache/iceberg/common/README.md
new file mode 100644
index 0000000000000..e817f66e2d93e
--- /dev/null
+++ b/java/connector-node/risingwave-sink-iceberg/src/main/java/org/apache/iceberg/common/README.md
@@ -0,0 +1,6 @@
+# Why we need this package?
+
+In this package we have override the `iceberg-common` package, since in original `iceberg-common` it uses `Thread.getContextClassLoader` to load classes dynamically.
+While this works well in most cases, it will fail when invoked by jni, since by default jni threads was passed bootstrap class loader, and `Thread.getContextClassLoader`
+will inherit parent thread's class loader. That's to say, all threads created by jni will use bootstrap class loader. While we can use `Thread.setContextClassLoader` to it system class loader
+manually, but it's not possible in all cases since iceberg used thread pools internally, which can't be hooked by us.
\ No newline at end of file
diff --git a/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/catalog/JniCatalogWrapperTest.java b/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/catalog/JniCatalogWrapperTest.java
new file mode 100644
index 0000000000000..08fa0fdb85e80
--- /dev/null
+++ b/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/catalog/JniCatalogWrapperTest.java
@@ -0,0 +1,28 @@
+package com.risingwave.connector.catalog;
+
+import org.junit.Test;
+
+public class JniCatalogWrapperTest {
+ @Test
+ public void testJdbc() throws Exception {
+ System.setProperty("aws.region", "us-east-1");
+ JniCatalogWrapper catalog =
+ JniCatalogWrapper.create(
+ "demo",
+ "org.apache.iceberg.jdbc.JdbcCatalog",
+ new String[] {
+ "uri", "jdbc:postgresql://172.17.0.3:5432/iceberg",
+ "jdbc.user", "admin",
+ "jdbc.password", "123456",
+ "warehouse", "s3://icebergdata/demo",
+ "io-impl", "org.apache.iceberg.aws.s3.S3FileIO",
+ "s3.endpoint", "http://172.17.0.2:9301",
+ "s3.region", "us-east-1",
+ "s3.path-style-access", "true",
+ "s3.access-key-id", "hummockadmin",
+ "s3.secret-access-key", "hummockadmin",
+ });
+
+ System.out.println(catalog.loadTable("s1.t1"));
+ }
+}
diff --git a/java/pom.xml b/java/pom.xml
index 5f168c48bd9ef..e11dd4d0f0f04 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -84,6 +84,9 @@
4.15.0
1.18.0
1.17.6
+ 3.45.0.0
+ 2.21.42
+ 3.1.3
@@ -324,6 +327,51 @@
simpleclient_httpserver
0.5.0