diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java index 680684000ff6..e5b6ab1b7312 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java @@ -55,6 +55,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; public abstract class AbstractTask implements Task { @@ -101,6 +103,8 @@ public static IngestionMode fromString(String name) private final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder(); + private volatile CountDownLatch cleanupCompletionLatch; + protected AbstractTask(String id, String dataSource, Map context, IngestionMode ingestionMode) { this(id, null, null, dataSource, context, ingestionMode); @@ -166,6 +170,7 @@ public final TaskStatus run(TaskToolbox taskToolbox) throws Exception { TaskStatus taskStatus = TaskStatus.running(getId()); try { + cleanupCompletionLatch = new CountDownLatch(1); String errorMessage = setup(taskToolbox); if (org.apache.commons.lang3.StringUtils.isNotBlank(errorMessage)) { return TaskStatus.failure(getId(), errorMessage); @@ -178,14 +183,23 @@ public final TaskStatus run(TaskToolbox taskToolbox) throws Exception throw e; } finally { - cleanUp(taskToolbox, taskStatus); + try { + cleanUp(taskToolbox, taskStatus); + } + finally { + cleanupCompletionLatch.countDown(); + } } } public abstract TaskStatus runTask(TaskToolbox taskToolbox) throws Exception; + @Override public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus) throws Exception { + // clear any interrupted status to ensure subsequent cleanup proceeds without interruption. + Thread.interrupted(); + if (!toolbox.getConfig().isEncapsulatedTask()) { log.debug("Not pushing task logs and reports from task."); return; @@ -216,6 +230,24 @@ public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus) throws Exception } } + @Override + public boolean waitForCleanupToFinish() + { + try { + if (cleanupCompletionLatch != null) { + // block until the cleanup process completes + return cleanupCompletionLatch.await(300, TimeUnit.SECONDS); + } + + return true; + } + catch (InterruptedException e) { + log.warn("Interrupted while waiting for task cleanUp to finish!"); + Thread.currentThread().interrupt(); + return false; + } + } + public static String getOrMakeId(@Nullable String id, final String typeName, String dataSource) { return getOrMakeId(id, typeName, dataSource, null); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java index 58a2ad435b0f..81a55aae1b4c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java @@ -257,6 +257,32 @@ default UOE getInputSecurityOnFirehoseUnsupportedError() */ TaskStatus run(TaskToolbox toolbox) throws Exception; + /** + * Performs cleanup operations after the task execution. + * This method is intended to be overridden by tasks that need to perform + * specific cleanup actions upon task completion or termination. + * + * @param toolbox Toolbox for this task + * @param taskStatus Provides the final status of the task, indicating if the task + * was successful, failed, or was killed. + * @throws Exception If any error occurs during the cleanup process. + */ + default void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus) throws Exception + { + } + + /** + * Waits for the cleanup operations to finish. + * This method can be overridden by tasks that need to ensure that certain cleanup + * operations have completed before proceeding further. + * + * @return true if the cleanup completed successfully, false otherwise. + */ + default boolean waitForCleanupToFinish() + { + return true; + } + default Map addToContext(String key, Object val) { getContext().put(key, val); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java index 26358deea3e0..7f0e95ce08d1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java @@ -185,6 +185,7 @@ public void stop() // stopGracefully for resource cleaning log.info("Starting graceful shutdown of task[%s].", task.getId()); task.stopGracefully(taskConfig); + task.waitForCleanupToFinish(); if (taskConfig.isRestoreTasksOnRestart() && task.canRestore()) { try { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index e44dfe9a451e..27909aea83cb 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -1422,7 +1422,10 @@ public void stopForcefully() { log.info("Stopping forcefully (status: [%s])", status); stopRequested.set(true); - runThread.interrupt(); + // Interrupt if the task has started to run + if (runThread != null) { + runThread.interrupt(); + } } public void stopGracefully() diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestTasks.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestTasks.java index fc7a6c991566..22485806c518 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestTasks.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestTasks.java @@ -84,6 +84,8 @@ public TaskStatus runTask(TaskToolbox toolbox) @JsonTypeName("unending") public static class UnendingTask extends AbstractTask { + private Thread runningThread; + @JsonCreator public UnendingTask(@JsonProperty("id") String id) { @@ -105,12 +107,16 @@ public boolean isReady(TaskActionClient taskActionClient) @Override public void stopGracefully(TaskConfig taskConfig) { + if (runningThread != null) { + runningThread.interrupt(); + } } @Override public TaskStatus runTask(TaskToolbox toolbox) throws Exception { - while (!Thread.currentThread().isInterrupted()) { + runningThread = Thread.currentThread(); + while (!runningThread.isInterrupted()) { Thread.sleep(1000); } return TaskStatus.failure(getId(), "Dummy task status failure for testing"); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java index fc7bd9236018..087ae3e1fc16 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java @@ -179,14 +179,24 @@ public void testGetQueryRunner() throws ExecutionException, InterruptedException @Test public void testStop() throws ExecutionException, InterruptedException, TimeoutException { + AtomicReference methodCallHolder = new AtomicReference<>(); final ListenableFuture future = runner.run( new NoopTask(null, null, null, Long.MAX_VALUE, 0, null) // infinite task + { + @Override + public boolean waitForCleanupToFinish() + { + methodCallHolder.set(true); + return true; + } + } ); runner.stop(); Assert.assertEquals( TaskState.FAILED, future.get(1000, TimeUnit.MILLISECONDS).getStatusCode() ); + Assert.assertTrue(methodCallHolder.get()); } @Test diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java index 2ba88117cd51..50dc64a1e06f 100644 --- a/services/src/main/java/org/apache/druid/cli/CliPeon.java +++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java @@ -56,6 +56,7 @@ import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.LifecycleModule; import org.apache.druid.guice.ManageLifecycle; +import org.apache.druid.guice.ManageLifecycleServer; import org.apache.druid.guice.PolyBind; import org.apache.druid.guice.QueryRunnerFactoryModule; import org.apache.druid.guice.QueryableModule; @@ -246,7 +247,10 @@ public void configure(Binder binder) binder.bind(TaskRunner.class).to(SingleTaskBackgroundRunner.class); binder.bind(QuerySegmentWalker.class).to(SingleTaskBackgroundRunner.class); - binder.bind(SingleTaskBackgroundRunner.class).in(ManageLifecycle.class); + // Bind to ManageLifecycleServer to ensure SingleTaskBackgroundRunner is closed before + // its dependent services, such as DiscoveryServiceLocator and OverlordClient. + // This order ensures that tasks can finalize their cleanup operations before service location closure. + binder.bind(SingleTaskBackgroundRunner.class).in(ManageLifecycleServer.class); bindRealtimeCache(binder); bindCoordinatorHandoffNotifer(binder);