Skip to content

Commit

Permalink
Fix Peon not failing gracefully (#14880)
Browse files Browse the repository at this point in the history
* fix Peon not failing gracefully

* move methods to Task interface

* fix checkstyle

* extract to interface

* check runThread nullability

* fix merge conflict

* minor refine

* minor refine

* fix unit test

* increase latch waiting time
  • Loading branch information
YongGang authored Sep 29, 2023
1 parent 2f1bcd6 commit 86087ce
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public abstract class AbstractTask implements Task
{
Expand Down Expand Up @@ -101,6 +103,8 @@ public static IngestionMode fromString(String name)

private final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();

private volatile CountDownLatch cleanupCompletionLatch;

protected AbstractTask(String id, String dataSource, Map<String, Object> context, IngestionMode ingestionMode)
{
this(id, null, null, dataSource, context, ingestionMode);
Expand Down Expand Up @@ -166,6 +170,7 @@ public final TaskStatus run(TaskToolbox taskToolbox) throws Exception
{
TaskStatus taskStatus = TaskStatus.running(getId());
try {
cleanupCompletionLatch = new CountDownLatch(1);
String errorMessage = setup(taskToolbox);
if (org.apache.commons.lang3.StringUtils.isNotBlank(errorMessage)) {
return TaskStatus.failure(getId(), errorMessage);
Expand All @@ -178,14 +183,23 @@ public final TaskStatus run(TaskToolbox taskToolbox) throws Exception
throw e;
}
finally {
cleanUp(taskToolbox, taskStatus);
try {
cleanUp(taskToolbox, taskStatus);
}
finally {
cleanupCompletionLatch.countDown();
}
}
}

public abstract TaskStatus runTask(TaskToolbox taskToolbox) throws Exception;

@Override
public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus) throws Exception
{
// clear any interrupted status to ensure subsequent cleanup proceeds without interruption.
Thread.interrupted();

if (!toolbox.getConfig().isEncapsulatedTask()) {
log.debug("Not pushing task logs and reports from task.");
return;
Expand Down Expand Up @@ -216,6 +230,24 @@ public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus) throws Exception
}
}

/**
 * Blocks until the cleanup started by {@code run} has finished, waiting at most 300 seconds.
 *
 * @return {@code true} if cleanup finished within the timeout, or if the completion latch was
 *         never created (no run has started, so there is nothing to wait for);
 *         {@code false} if the wait timed out or the calling thread was interrupted
 */
@Override
public boolean waitForCleanupToFinish()
{
  // Read the volatile field once so the null check and the await act on the same latch.
  final CountDownLatch latch = cleanupCompletionLatch;
  if (latch == null) {
    return true;
  }

  try {
    // block until the cleanup process completes
    final boolean finished = latch.await(300, TimeUnit.SECONDS);
    if (!finished) {
      // Without this, a timeout is invisible when the caller ignores the return value.
      log.warn("Timed out while waiting for task cleanUp to finish!");
    }
    return finished;
  }
  catch (InterruptedException e) {
    log.warn("Interrupted while waiting for task cleanUp to finish!");
    // Restore the interrupt flag so the caller can still observe the interruption.
    Thread.currentThread().interrupt();
    return false;
  }
}

public static String getOrMakeId(@Nullable String id, final String typeName, String dataSource)
{
return getOrMakeId(id, typeName, dataSource, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,32 @@ default UOE getInputSecurityOnFirehoseUnsupportedError()
*/
TaskStatus run(TaskToolbox toolbox) throws Exception;

/**
 * Performs cleanup operations after the task execution.
 * The default implementation does nothing; it is intended to be overridden by tasks that need
 * to perform specific cleanup actions upon task completion or termination.
 *
 * @param toolbox    Toolbox for this task
 * @param taskStatus Provides the final status of the task, indicating if the task
 *                   was successful, failed, or was killed.
 * @throws Exception If any error occurs during the cleanup process.
 */
default void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus) throws Exception
{
}

/**
 * Waits for the cleanup operations to finish.
 * The default implementation does not wait and returns {@code true} immediately. It can be
 * overridden by tasks that need to ensure that certain cleanup operations have completed
 * before proceeding further.
 *
 * @return true if the cleanup completed (or there is nothing to wait for), false otherwise;
 *         for example, an implementation may return false when its wait times out or is
 *         interrupted.
 */
default boolean waitForCleanupToFinish()
{
  return true;
}

default Map<String, Object> addToContext(String key, Object val)
{
getContext().put(key, val);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ public void stop()
// stopGracefully for resource cleaning
log.info("Starting graceful shutdown of task[%s].", task.getId());
task.stopGracefully(taskConfig);
task.waitForCleanupToFinish();

if (taskConfig.isRestoreTasksOnRestart() && task.canRestore()) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1422,7 +1422,10 @@ public void stopForcefully()
{
log.info("Stopping forcefully (status: [%s])", status);
stopRequested.set(true);
runThread.interrupt();
// Interrupt if the task has started to run
if (runThread != null) {
runThread.interrupt();
}
}

public void stopGracefully()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ public TaskStatus runTask(TaskToolbox toolbox)
@JsonTypeName("unending")
public static class UnendingTask extends AbstractTask
{
private Thread runningThread;

@JsonCreator
public UnendingTask(@JsonProperty("id") String id)
{
Expand All @@ -105,12 +107,16 @@ public boolean isReady(TaskActionClient taskActionClient)
/**
 * Interrupts the thread recorded in {@code runningThread}, if one has been recorded.
 *
 * @param taskConfig not used by this implementation
 */
@Override
public void stopGracefully(TaskConfig taskConfig)
{
  final Thread worker = runningThread;
  if (worker == null) {
    // No thread has been recorded yet, so there is nothing to interrupt.
    return;
  }
  worker.interrupt();
}

@Override
public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{
while (!Thread.currentThread().isInterrupted()) {
runningThread = Thread.currentThread();
while (!runningThread.isInterrupted()) {
Thread.sleep(1000);
}
return TaskStatus.failure(getId(), "Dummy task status failure for testing");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,14 +179,24 @@ public void testGetQueryRunner() throws ExecutionException, InterruptedException
/**
 * Verifies that stopping the runner fails the running task and that the task's
 * {@code waitForCleanupToFinish()} hook has been invoked by the time the task's status is available.
 */
@Test
public void testStop() throws ExecutionException, InterruptedException, TimeoutException
{
  // Starts as false: with a null initial value, a missing call would surface as a
  // NullPointerException from auto-unboxing in assertTrue instead of an assertion failure.
  final AtomicReference<Boolean> methodCallHolder = new AtomicReference<>(false);
  final ListenableFuture<TaskStatus> future = runner.run(
      new NoopTask(null, null, null, Long.MAX_VALUE, 0, null) // infinite task
      {
        @Override
        public boolean waitForCleanupToFinish()
        {
          // Record the call so the test can assert on it below.
          methodCallHolder.set(true);
          return true;
        }
      }
  );
  runner.stop();
  Assert.assertEquals(
      TaskState.FAILED,
      future.get(1000, TimeUnit.MILLISECONDS).getStatusCode()
  );
  Assert.assertTrue("waitForCleanupToFinish() was not called", methodCallHolder.get());
}

@Test
Expand Down
6 changes: 5 additions & 1 deletion services/src/main/java/org/apache/druid/cli/CliPeon.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.ManageLifecycleServer;
import org.apache.druid.guice.PolyBind;
import org.apache.druid.guice.QueryRunnerFactoryModule;
import org.apache.druid.guice.QueryableModule;
Expand Down Expand Up @@ -246,7 +247,10 @@ public void configure(Binder binder)

binder.bind(TaskRunner.class).to(SingleTaskBackgroundRunner.class);
binder.bind(QuerySegmentWalker.class).to(SingleTaskBackgroundRunner.class);
binder.bind(SingleTaskBackgroundRunner.class).in(ManageLifecycle.class);
// Bind to ManageLifecycleServer to ensure SingleTaskBackgroundRunner is closed before
// its dependent services, such as DiscoveryServiceLocator and OverlordClient.
// This order ensures that tasks can finalize their cleanup operations before service location closure.
binder.bind(SingleTaskBackgroundRunner.class).in(ManageLifecycleServer.class);

bindRealtimeCache(binder);
bindCoordinatorHandoffNotifer(binder);
Expand Down

0 comments on commit 86087ce

Please sign in to comment.