Skip to content

Commit

Permalink
Fix Peon not failing gracefully
Browse files Browse the repository at this point in the history
  • Loading branch information
YongGang committed Aug 25, 2023
1 parent 95b0de6 commit 1a30735
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public abstract class AbstractTask implements Task
{
Expand Down Expand Up @@ -101,6 +103,8 @@ public static IngestionMode fromString(String name)

private final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();

private volatile CountDownLatch cleanupCompletionLatch;

protected AbstractTask(String id, String dataSource, Map<String, Object> context, IngestionMode ingestionMode)
{
this(id, null, null, dataSource, context, ingestionMode);
Expand Down Expand Up @@ -166,6 +170,7 @@ public final TaskStatus run(TaskToolbox taskToolbox) throws Exception
{
TaskStatus taskStatus = TaskStatus.running(getId());
try {
cleanupCompletionLatch = new CountDownLatch(1);
String errorMessage = setup(taskToolbox);
if (org.apache.commons.lang3.StringUtils.isNotBlank(errorMessage)) {
return TaskStatus.failure(getId(), errorMessage);
Expand All @@ -178,14 +183,24 @@ public final TaskStatus run(TaskToolbox taskToolbox) throws Exception
throw e;
}
finally {
cleanUp(taskToolbox, taskStatus);
try {
cleanUp(taskToolbox, taskStatus);
}
finally {
cleanupCompletionLatch.countDown();
}
}
}

public abstract TaskStatus runTask(TaskToolbox taskToolbox) throws Exception;

public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus) throws Exception
protected void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus) throws Exception
{
if (Thread.currentThread().isInterrupted()) {
// clears the interrupted status so the subsequent cleanup work can continue without interruption
Thread.interrupted();
}

if (!toolbox.getConfig().isEncapsulatedTask()) {
log.debug("Not pushing task logs and reports from task.");
return;
Expand Down Expand Up @@ -216,6 +231,19 @@ public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus) throws Exception
}
}

/**
 * Blocks the calling thread until the cleanup step that follows {@code run} has signalled completion
 * through the cleanup latch, or until 30 seconds have elapsed, whichever comes first.
 *
 * <p>Returns immediately if no cleanup latch has been created yet. If the wait times out, a warning is
 * logged so the timeout is not mistaken for a completed cleanup. If the calling thread is interrupted
 * while waiting, its interrupt flag is restored and the method returns.
 */
public void waitForCleanupToFinish()
{
  // Read the volatile field once so the null check and the await act on the same latch instance.
  final CountDownLatch latch = cleanupCompletionLatch;
  if (latch == null) {
    return;
  }

  try {
    // await() returns false when the timeout elapses before the latch reaches zero.
    if (!latch.await(30, TimeUnit.SECONDS)) {
      log.warn("Timed out after 30 seconds waiting for task cleanup to finish.");
    }
  }
  catch (InterruptedException e) {
    // Preserve the interrupt for callers further up the stack.
    Thread.currentThread().interrupt();
  }
}

public static String getOrMakeId(@Nullable String id, final String typeName, String dataSource)
{
return getOrMakeId(id, typeName, dataSource, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.java.util.common.DateTimes;
Expand Down Expand Up @@ -185,6 +186,10 @@ public void stop()
// stopGracefully for resource cleaning
log.info("Starting graceful shutdown of task[%s].", task.getId());
task.stopGracefully(taskConfig);
// Only certain tasks, primarily from unit tests, are not subclasses of AbstractTask.
if (task instanceof AbstractTask) {
((AbstractTask) task).waitForCleanupToFinish();
}

if (taskConfig.isRestoreTasksOnRestart() && task.canRestore()) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,14 +193,23 @@ public void testGetQueryRunner() throws ExecutionException, InterruptedException
/**
 * Verifies that stopping the runner fails the running task and that the runner calls
 * {@code waitForCleanupToFinish()} on the task as part of stopping.
 */
@Test
public void testStop() throws ExecutionException, InterruptedException, TimeoutException
{
  // Starts as FALSE so a missing call produces an assertion failure rather than a
  // NullPointerException from unboxing a null Boolean.
  final AtomicReference<Boolean> cleanupWaitInvoked = new AtomicReference<>(Boolean.FALSE);
  final ListenableFuture<TaskStatus> future = runner.run(
      new NoopTask(null, null, null, Long.MAX_VALUE, 0, null, null, null) // infinite task
      {
        @Override
        public void waitForCleanupToFinish()
        {
          // Record the call instead of actually blocking on the cleanup latch.
          cleanupWaitInvoked.set(Boolean.TRUE);
        }
      }
  );
  runner.stop();
  Assert.assertEquals(
      TaskState.FAILED,
      future.get(1000, TimeUnit.MILLISECONDS).getStatusCode()
  );
  Assert.assertTrue(
      "runner.stop() should call waitForCleanupToFinish() on the task",
      cleanupWaitInvoked.get()
  );
}

@Test
Expand Down Expand Up @@ -389,8 +398,6 @@ public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus)
{
// do nothing
}


}

private static class BooleanHolder
Expand Down
6 changes: 5 additions & 1 deletion services/src/main/java/org/apache/druid/cli/CliPeon.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.ManageLifecycleServer;
import org.apache.druid.guice.PolyBind;
import org.apache.druid.guice.QueryRunnerFactoryModule;
import org.apache.druid.guice.QueryableModule;
Expand Down Expand Up @@ -246,7 +247,10 @@ public void configure(Binder binder)

binder.bind(TaskRunner.class).to(SingleTaskBackgroundRunner.class);
binder.bind(QuerySegmentWalker.class).to(SingleTaskBackgroundRunner.class);
binder.bind(SingleTaskBackgroundRunner.class).in(ManageLifecycle.class);
// Bind to ManageLifecycleServer to ensure SingleTaskBackgroundRunner is stopped before
// the services it depends on, such as DiscoveryServiceLocator and OverlordClient.
// This ordering lets tasks finish their cleanup operations before service location is shut down.
binder.bind(SingleTaskBackgroundRunner.class).in(ManageLifecycleServer.class);

bindRealtimeCache(binder);
bindCoordinatorHandoffNotifer(binder);
Expand Down

0 comments on commit 1a30735

Please sign in to comment.