Skip to content

Commit

Permalink
Added the possibility to do something else in case of a dependency fa…
Browse files Browse the repository at this point in the history
…ilure during graph execution.
  • Loading branch information
kelemen committed Oct 7, 2017
1 parent fc351c1 commit e875f15
Show file tree
Hide file tree
Showing 7 changed files with 300 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,15 @@
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Modifier;
import java.util.function.Consumer;
import org.jtrim2.concurrent.AsyncTasks;

public final class TestUtils {
public static <T> T build(T obj, Consumer<? super T> config) {
config.accept(obj);
return obj;
}

public static void testUtilityClass(Class<?> type) {
if (!Modifier.isFinal(type.getModifiers())) {
throw new AssertionError("Utility class must be final: " + type.getName());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package org.jtrim2.taskgraph;

import org.jtrim2.cancel.CancellationToken;

/**
* Defines an error handler to be called instead of the associated node computation when
* the node is not being executed due to a failed dependency.
*
* <h3>Thread safety</h3>
* Instances must have the same thread-safety property as the computation of the associated
* task node.
*
* <h4>Synchronization transparency</h4>
* Instances are not expected to be synchronization transparent and are called in the
* same context as the associated computation would have been.
*
* @see TaskNodeProperties#tryGetDependencyErrorHandler()
*/
public interface DependencyErrorHandler {
/**
* Called when the associated node cannot be executed due to a dependency failure. This handler is
* called on the same executor the associated node's computation would have been called on.
* <P>
* The graph execution is not considered completed before this method returns.
*
* @param cancelToken the cancellation token signaling the cancellation of the associated
* task graph execution. This argument cannot be {@code null}.
* @param nodeKey the {@code TaskNodeKey} identifying the node associated with the failure.
* This argument cannot be {@code null}.
* @param error the error causing the failure of the node execution. This argument cannot be
* {@code null}.
*
* @throws Exception thrown in case of some serious failure. The thrown exception will be
* suppressed by the error causing this method to be called.
*/
public void handleDependencyError(
CancellationToken cancelToken,
TaskNodeKey<?, ?> nodeKey,
Throwable error) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
*/
public class TaskNodeProperties {
private final TaskExecutor executor;
private final DependencyErrorHandler dependencyErrorHandler;

/**
* Sets the properties of the {@code TaskNodeProperties} from the current
Expand All @@ -35,6 +36,7 @@ public class TaskNodeProperties {
*/
protected TaskNodeProperties(Builder builder) {
this.executor = builder.executor;
this.dependencyErrorHandler = builder.dependencyErrorHandler;
}

/**
Expand All @@ -49,6 +51,19 @@ public final TaskExecutor getExecutor() {
return executor;
}

/**
* Returns the handler to be called when the associated task node cannot due to
* a failure in one of its dependencies. The handler is called in the same context
* as the computation of the task node would have been.
*
* @return the handler to be called when the associated task node cannot due to
* a failure in one of its dependencies, or {@code null} if there is nothing to
* do with the failure.
*/
public DependencyErrorHandler tryGetDependencyErrorHandler() {
return dependencyErrorHandler;
}

/**
* The {@code Builder} used to create {@link TaskNodeProperties} instances.
*
Expand All @@ -60,6 +75,7 @@ public final TaskExecutor getExecutor() {
*/
public static class Builder {
private TaskExecutor executor;
private DependencyErrorHandler dependencyErrorHandler;

/**
* Initializes the {@code Builder} with the default values:
Expand All @@ -69,6 +85,7 @@ public static class Builder {
*/
public Builder() {
this.executor = SyncTaskExecutor.getSimpleExecutor();
this.dependencyErrorHandler = null;
}

/**
Expand All @@ -81,6 +98,22 @@ public Builder() {
*/
public Builder(TaskNodeProperties defaults) {
this.executor = defaults.getExecutor();
this.dependencyErrorHandler = defaults.tryGetDependencyErrorHandler();
}

/**
* Sets an error handler to be called if the associated node could not be
* executed due to a dependency error. The handler is called in the same context
* as the computation of the task node would have been.
* <P>
* Setting this property will override any previously set value for this property.
*
* @param dependencyErrorHandler the error handler to be called if the associated node could not be
* executed due to a dependency error. This argument can be {@code null} if there is nothing
* to do in case of a dependency error.
*/
public void setDependencyErrorHandler(DependencyErrorHandler dependencyErrorHandler) {
this.dependencyErrorHandler = dependencyErrorHandler;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ private void finishForwardNodes(TaskNodeKey<?, ?> key, Throwable error) {
try {
TaskNode<?, ?> child = nodes.get(childKey);
if (child != null) {
child.propagateFailure(error);
child.propagateDependencyFailure(getCancelToken(), error);
}
} catch (Throwable ex) {
onError(key, ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.jtrim2.cancel.OperationCanceledException;
import org.jtrim2.concurrent.AsyncTasks;
import org.jtrim2.executor.TaskExecutor;
import org.jtrim2.taskgraph.DependencyErrorHandler;
import org.jtrim2.taskgraph.TaskErrorHandler;
import org.jtrim2.taskgraph.TaskNodeKey;
import org.jtrim2.utils.ExceptionHelper;
Expand Down Expand Up @@ -187,6 +188,67 @@ public void cancel() {
propagateFailure(OperationCanceledException.withoutStackTrace());
}

/**
* Completes this task node exceptionally but calling the
* {@link org.jtrim2.taskgraph.TaskNodeProperties dependency error handler} first (if there is any).
* If this task node was already scheduled for execution normally, this method does nothing.
* <P>
* Note that cancellation affects the dependency error handler as well. That is, if execution
* was canceled, the dependency error handler might not get executed.
* <P>
* Calling this method does not count as scheduled for the {@link #wasScheduled() wasScheduled} flag.
*
* @param cancelToken the cancellation token which can signal cancellation for the
* dependency error handler. This argument cannot be {@code null}.
* @param error the error to forward to complete this node with. This argument cannot be
* {@code null}. This argument cannot be {@code null}.
*/
@SuppressWarnings("ThrowableResultIgnored")
public void propagateDependencyFailure(CancellationToken cancelToken, Throwable error) {
Objects.requireNonNull(cancelToken, "cancelToken");
Objects.requireNonNull(error, "error");

NodeTaskRef<R> nodeTaskRef = nodeTaskRefRef.getAndSet(null);
if (nodeTaskRef == null) {
// The task was already scheduled so, we ignore dependency failure notification.
// Also, this should not happen when used reasonably.
return;
}

DependencyErrorHandler errorHandler = nodeTaskRef.getProperties().tryGetDependencyErrorHandler();
if (errorHandler == null) {
propagateFailure(error);
return;
}

try {
if (cancelToken.isCanceled()) {
cancel();
return;
}

TaskExecutor executor = nodeTaskRef.getProperties().getExecutor();
executor.execute(cancelToken, taskCancelToken -> {
errorHandler.handleDependencyError(taskCancelToken, key, error);
}).whenComplete((result, taskError) -> {
propagateSuppressed(error, taskError);
});
} catch (Throwable ex) {
propagateSuppressed(error, ex);
throw ex;
}
}

private void propagateSuppressed(Throwable error, Throwable suppressed) {
try {
if (suppressed != null && suppressed != error) {
error.addSuppressed(suppressed);
}
} finally {
propagateFailure(error);
}
}

/**
* Completes this task node exceptionally with the given error if it was not
* completed yet. If this task node was already completed, this method does nothing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,18 @@
import java.util.Objects;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.jtrim2.cancel.Cancellation;
import org.jtrim2.cancel.CancellationToken;
import org.jtrim2.cancel.OperationCanceledException;
import org.jtrim2.concurrent.AsyncTasks;
import org.jtrim2.testutils.TestObj;
import org.jtrim2.testutils.TestUtils;
import org.jtrim2.testutils.UnsafeConsumer;
import org.junit.Test;

import static org.junit.Assert.*;
import static org.mockito.Mockito.*;

public abstract class AbstractGraphExecutorTest {
private final Supplier<TaskGraphDefConfigurer> graphConfigurerFactory;
Expand All @@ -24,7 +26,7 @@ public AbstractGraphExecutorTest(Supplier<TaskGraphDefConfigurer> graphConfigure
this.graphConfigurerFactory = Objects.requireNonNull(graphConfigurerFactory, "graphConfigurerFactory");
}

private void test(Consumer<TaskGraphDefConfigurer> graphConfigurerAction) {
private void test(UnsafeConsumer<TaskGraphDefConfigurer> graphConfigurerAction) throws Exception {
graphConfigurerAction.accept(graphConfigurerFactory.get());
}

Expand All @@ -34,9 +36,11 @@ private static <R, I> TaskNodeKey<R, I> nodeKey(Class<R> outputType, Class<I> ar

/**
* This tests verifies that no deeply nested calls occur when the whole execution gets canceled.
*
* @throws Exception test failure
*/
@Test(timeout = 60000)
public void testFailureWithLongChainCancel() {
public void testFailureWithLongChainCancel() throws Exception {
int rootCount = 10000;

test((configurer) -> {
Expand Down Expand Up @@ -106,7 +110,7 @@ public void testFailureWithLongChainCancel() {
}

@Test
public void testSingleNodeFails() {
public void testSingleNodeFails() throws Exception {
test((configurer) -> {
TaskFactoryDefiner factoryGroup1 = configurer.factoryGroupDefiner((properties) -> {
});
Expand Down Expand Up @@ -159,7 +163,77 @@ public void testSingleNodeFails() {
}

@Test
public void testDoubleSplitGraph() {
public void testDependencyErrorHandler() throws Exception {
test((configurer) -> {
TaskFactoryDefiner factoryGroup1 = configurer.factoryGroupDefiner((properties) -> {
});

TaskFactoryKey<TestObj, String> leafFactoryKey
= new TaskFactoryKey<>(TestObj.class, String.class, "leaf-node");
factoryGroup1.defineSimpleFactory(leafFactoryKey, (cancelToken, nodeDef) -> {
String factoryArg = nodeDef.factoryArg();
return taskCancelToken -> {
throw new TestException(factoryArg);
};
});

DependencyErrorHandler errorHandler = mock(DependencyErrorHandler.class);

TaskFactoryKey<TestObj, String> rootFactoryKey
= new TaskFactoryKey<>(TestObj.class, String.class, "root-node");
factoryGroup1.defineSimpleFactory(rootFactoryKey, (cancelToken, nodeDef) -> {
nodeDef.properties().setDependencyErrorHandler(errorHandler);

TaskInputRef<TestObj> inputRef = nodeDef.inputs().bindInput(leafFactoryKey, "test-arg");
return taskCancelToken -> inputRef.consumeInput();
});


TaskGraphBuilder builder = configurer.build();

TaskNodeKey<TestObj, String> requestedNodeKey = new TaskNodeKey<>(rootFactoryKey, "R");
builder.addNode(requestedNodeKey);

CompletionStage<TaskGraphExecutor> buildFuture = builder.buildGraph(Cancellation.UNCANCELABLE_TOKEN);
AtomicReference<TaskGraphExecutionResult> resultRef = new AtomicReference<>();
AtomicReference<Throwable> errorRef = new AtomicReference<>();

buildFuture
.thenCompose((executor) -> {
executor.properties().setComputeErrorHandler((nodeKey, error) -> {
// Redefine to prevent logs
});
executor.properties().setStopOnFailure(false);
executor.properties().setDeliverResultOnFailure(true);

executor.properties().addResultNodeKey(requestedNodeKey);

return executor.execute(Cancellation.UNCANCELABLE_TOKEN);
})
.whenComplete((result, error) -> {
resultRef.set(result);
errorRef.set(error);
});

verify(errorHandler).handleDependencyError(
any(CancellationToken.class),
eq(requestedNodeKey),
isA(TestException.class));

TaskGraphExecutionResult result = resultRef.get();
Throwable error = errorRef.get();

assertNotNull("result", result);
assertNull("error", error);

assertEquals("resultType", ExecutionResultType.ERRORED, result.getResultType());

TestUtils.expectUnwrappedError(TestException.class, () -> result.getResult(requestedNodeKey));
});
}

@Test
public void testDoubleSplitGraph() throws Exception {
test((configurer) -> {
TaskFactoryDefiner factoryGroup1 = configurer.factoryGroupDefiner((properties) -> {
});
Expand Down
Loading

0 comments on commit e875f15

Please sign in to comment.