diff --git a/cdap-api-common/src/main/java/io/cdap/cdap/api/exception/WrappedStageException.java b/cdap-api-common/src/main/java/io/cdap/cdap/api/exception/WrappedStageException.java index 6fac92bfa615..b5b5f74b3f76 100644 --- a/cdap-api-common/src/main/java/io/cdap/cdap/api/exception/WrappedStageException.java +++ b/cdap-api-common/src/main/java/io/cdap/cdap/api/exception/WrappedStageException.java @@ -49,4 +49,14 @@ public WrappedStageException(Throwable cause, String stageName) { public String getStageName() { return stageName; } + + /** + * Returns the detail message string of this exception. + * + * @return the detail message as a {@String}. + */ + @Override + public String getMessage() { + return String.format("Stage '%s' encountered : %s", stageName, super.getMessage()); + } } diff --git a/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/ExceptionWrappingCaller.java b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/ExceptionWrappingCaller.java new file mode 100644 index 000000000000..b79636037313 --- /dev/null +++ b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/ExceptionWrappingCaller.java @@ -0,0 +1,67 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.etl.common.plugin; + +import io.cdap.cdap.api.exception.WrappedStageException; +import java.util.concurrent.Callable; + +/** + * A caller wrapper that catches exceptions thrown during execution of a callable + * and wraps them in a {@link WrappedStageException}. + * This class is primarily used to associate the exception with a specific stage name in a pipeline, + * helping in better debugging and error tracking. + * + *

+ * The class delegates the actual calling operation to another {@link Caller} instance and + * ensures that any exceptions thrown are caught and rethrown as a {@link WrappedStageException}, + * which includes the stage name where the error occurred. + *

+ */ +public class ExceptionWrappingCaller extends Caller { + private final Caller delegate; + private final String stageName; + + /** + * Constructs an ExceptionWrappingCaller. + * + * @param delegate The {@link Caller} instance that performs the actual calling of the callable. + * @param stageName The name of the stage associated with the exception, + * for easier identification of where the error occurred. + */ + public ExceptionWrappingCaller(Caller delegate, String stageName) { + this.delegate = delegate; + this.stageName = stageName; + } + + /** + * Executes the given {@link Callable}, wrapping any exceptions thrown + * during execution in a {@link WrappedStageException}. + * + * @param callable The callable task to execute. + * @param The return type of the callable. + * @return The result of the callable task. + * @throws Exception if an exception occurs during the callable execution. + */ + @Override + public T call(Callable callable) throws Exception { + try { + return delegate.call(callable); + } catch (Exception e) { + throw new WrappedStageException(e, stageName); + } + } +} diff --git a/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/PipelinePluginContext.java b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/PipelinePluginContext.java index 0d046a4977cd..f23fd1fb5091 100644 --- a/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/PipelinePluginContext.java +++ b/cdap-app-templates/cdap-etl/cdap-etl-core/src/main/java/io/cdap/cdap/etl/common/plugin/PipelinePluginContext.java @@ -115,7 +115,7 @@ private Object wrapPlugin(String pluginId, Object plugin) { } public Caller getCaller(String pluginId) { - Caller caller = Caller.DEFAULT; + Caller caller = new ExceptionWrappingCaller(Caller.DEFAULT, pluginId); if (stageLoggingEnabled) { caller = StageLoggingCaller.wrap(caller, pluginId); }