diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java index 3ad7b452855c5..2a855bdf3385e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java @@ -55,6 +55,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; public abstract class AbstractTask implements Task { @@ -101,6 +103,8 @@ public static IngestionMode fromString(String name) private final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder(); + private volatile CountDownLatch cleanupCompletionLatch; + protected AbstractTask(String id, String dataSource, Map context, IngestionMode ingestionMode) { this(id, null, null, dataSource, context, ingestionMode); @@ -166,6 +170,7 @@ public final TaskStatus run(TaskToolbox taskToolbox) throws Exception { TaskStatus taskStatus = TaskStatus.running(getId()); try { + cleanupCompletionLatch = new CountDownLatch(1); String errorMessage = setup(taskToolbox); if (org.apache.commons.lang3.StringUtils.isNotBlank(errorMessage)) { return TaskStatus.failure(getId(), errorMessage); @@ -178,14 +183,24 @@ public final TaskStatus run(TaskToolbox taskToolbox) throws Exception throw e; } finally { - cleanUp(taskToolbox, taskStatus); + try { + cleanUp(taskToolbox, taskStatus); + } + finally { + cleanupCompletionLatch.countDown(); + } } } public abstract TaskStatus runTask(TaskToolbox taskToolbox) throws Exception; - public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus) throws Exception + protected void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus) throws Exception { + if (Thread.currentThread().isInterrupted()) { + // clears the interrupted status so the subsequent cleanup work can continue without interruption + Thread.interrupted(); + } + if (!toolbox.getConfig().isEncapsulatedTask()) { log.debug("Not pushing task logs and reports from task."); return; @@ -216,6 +231,19 @@ public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus) throws Exception } } + public void waitForCleanupToFinish() + { + try { + if (cleanupCompletionLatch != null) { + // block until the cleanup process completes + cleanupCompletionLatch.await(30, TimeUnit.SECONDS); + } + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + public static String getOrMakeId(@Nullable String id, final String typeName, String dataSource) { return getOrMakeId(id, typeName, dataSource, null); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java index fce76d1d1af92..d6a6cf2395088 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java @@ -33,6 +33,7 @@ import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TaskToolboxFactory; import org.apache.druid.indexing.common.config.TaskConfig; +import org.apache.druid.indexing.common.task.AbstractTask; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; import org.apache.druid.java.util.common.DateTimes; @@ -185,6 +186,10 @@ public void stop() // stopGracefully for resource cleaning log.info("Starting graceful shutdown of task[%s].", task.getId()); task.stopGracefully(taskConfig); + // Only certain tasks, primarily from unit tests, are not subclasses of AbstractTask. + if (task instanceof AbstractTask) { + ((AbstractTask) task).waitForCleanupToFinish(); + } if (taskConfig.isRestoreTasksOnRestart() && task.canRestore()) { try { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java index e683c828ddc63..c8478c8f66fe0 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java @@ -193,14 +193,23 @@ public void testGetQueryRunner() throws ExecutionException, InterruptedException @Test public void testStop() throws ExecutionException, InterruptedException, TimeoutException { + AtomicReference methodCallHolder = new AtomicReference<>(); final ListenableFuture future = runner.run( new NoopTask(null, null, null, Long.MAX_VALUE, 0, null, null, null) // infinite task + { + @Override + public void waitForCleanupToFinish() + { + methodCallHolder.set(true); + } + } ); runner.stop(); Assert.assertEquals( TaskState.FAILED, future.get(1000, TimeUnit.MILLISECONDS).getStatusCode() ); + Assert.assertTrue(methodCallHolder.get()); } @Test @@ -389,8 +398,6 @@ public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus) { // do nothing } - - } private static class BooleanHolder diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java index 8b8e1a426e8f5..5fad9b2b5d38a 100644 --- a/services/src/main/java/org/apache/druid/cli/CliPeon.java +++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java @@ -56,6 +56,7 @@ import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.LifecycleModule; import org.apache.druid.guice.ManageLifecycle; +import org.apache.druid.guice.ManageLifecycleServer; import org.apache.druid.guice.PolyBind; import org.apache.druid.guice.QueryRunnerFactoryModule; import org.apache.druid.guice.QueryableModule; @@ -246,7 +247,10 @@ public void configure(Binder binder) binder.bind(TaskRunner.class).to(SingleTaskBackgroundRunner.class); binder.bind(QuerySegmentWalker.class).to(SingleTaskBackgroundRunner.class); - binder.bind(SingleTaskBackgroundRunner.class).in(ManageLifecycle.class); + // Bind to ManageLifecycleServer to ensure SingleTaskBackgroundRunner is closed before + // its dependent services, such as DiscoveryServiceLocator and OverlordClient. + // This order ensures that tasks can finalize their cleanup operations before service location closure. + binder.bind(SingleTaskBackgroundRunner.class).in(ManageLifecycleServer.class); bindRealtimeCache(binder); bindCoordinatorHandoffNotifer(binder);