diff --git a/docs/api-reference/supervisor-api.md b/docs/api-reference/supervisor-api.md
index 341340faf2fb..74bbe27fd1eb 100644
--- a/docs/api-reference/supervisor-api.md
+++ b/docs/api-reference/supervisor-api.md
@@ -3550,6 +3550,63 @@ Host: http://ROUTER_IP:ROUTER_PORT
```
+### Handoff task groups for a supervisor early
+
+Trigger handoff for specified task groups of a supervisor early. This is a best effort API and makes no guarantees of handoff execution
+
+#### URL
+
+`POST` `/druid/indexer/v1/supervisor/{supervisorId}/taskGroups/handoff`
+
+#### Sample request
+
+The following example shows how to handoff task groups for a Kafka supervisor with the name `social_media` and has the task groups: `1,2,3`.
+
+
+
+
+
+
+```shell
+curl --request POST "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/social_media/taskGroups/handoff"
+--header 'Content-Type: application/json'
+--data-raw '{"taskGroupIds": [1, 2, 3]}'
+```
+
+
+
+
+
+```HTTP
+POST /druid/indexer/v1/supervisor/social_media/taskGroups/handoff HTTP/1.1
+Host: http://ROUTER_IP:ROUTER_PORT
+Content-Type: application/json
+
+{
+ "taskGroupIds": [1, 2, 3],
+}
+```
+
+
+
+
+#### Sample response
+
+
+ View the response
+
+ ```json
+{
+ "id": "social_media",
+ "taskGroupIds": [
+ 1,
+ 2,
+ 3
+ ]
+}
+ ```
+
+
### Shut down a supervisor
Shuts down a supervisor. This endpoint is deprecated and will be removed in future releases. Use the equivalent [terminate](#terminate-a-supervisor) endpoint instead.
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index 288b2a141564..2fab1ed9bd1d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -137,6 +137,16 @@ public Optional getSupervisorState(String id)
return supervisor == null ? Optional.absent() : Optional.fromNullable(supervisor.lhs.getState());
}
+ public boolean handoffTaskGroupsEarly(String id, List taskGroupIds)
+ {
+ Pair supervisor = supervisors.get(id);
+ if (supervisor == null || supervisor.lhs == null) {
+ return false;
+ }
+ supervisor.lhs.handoffTaskGroupsEarly(taskGroupIds);
+ return true;
+ }
+
public boolean createOrUpdateAndStartSupervisor(SupervisorSpec spec)
{
Preconditions.checkState(started, "SupervisorManager not started");
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
index 0b99f494ccc4..0ff76e5d41cf 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
@@ -19,6 +19,8 @@
package org.apache.druid.indexing.overlord.supervisor;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
@@ -30,6 +32,7 @@
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.sun.jersey.spi.container.ResourceFilters;
+import org.apache.commons.lang3.NotImplementedException;
import org.apache.druid.audit.AuditEntry;
import org.apache.druid.audit.AuditManager;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
@@ -48,6 +51,7 @@
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
@@ -395,6 +399,45 @@ public Response shutdown(@PathParam("id") final String id)
return terminate(id);
}
+ /**
+ * This method will immediately try to handoff the list of task group ids for the given supervisor.
+ * This is a best effort API and makes no guarantees of execution, e.g. if a non-existent task group id
+ * is passed to it, the API call will still suceced.
+ */
+ @POST
+ @Path("/{id}/taskGroups/handoff")
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ResourceFilters(SupervisorResourceFilter.class)
+ public Response handoffTaskGroups(@PathParam("id") final String id, @Nonnull final HandoffTaskGroupsRequest handoffTaskGroupsRequest)
+ {
+ List taskGroupIds = handoffTaskGroupsRequest.getTaskGroupIds();
+ if (taskGroupIds == null || taskGroupIds.isEmpty()) {
+ return Response.status(Response.Status.BAD_REQUEST)
+ .entity(ImmutableMap.of("error", "List of task groups to handoff can't be empty"))
+ .build();
+
+ }
+ return asLeaderWithSupervisorManager(
+ manager -> {
+ try {
+ if (manager.handoffTaskGroupsEarly(id, taskGroupIds)) {
+ return Response.ok(ImmutableMap.of("id", id, "taskGroupIds", taskGroupIds)).build();
+ } else {
+ return Response.status(Response.Status.NOT_FOUND)
+ .entity(ImmutableMap.of("error", StringUtils.format("Supervisor was not found [%s]", id)))
+ .build();
+ }
+ }
+ catch (NotImplementedException e) {
+ return Response.status(Response.Status.BAD_REQUEST)
+ .entity(ImmutableMap.of("error", StringUtils.format("Supervisor [%s] does not support early handoff", id)))
+ .build();
+ }
+ }
+ );
+ }
+
@POST
@Path("/{id}/terminate")
@Produces(MediaType.APPLICATION_JSON)
@@ -631,4 +674,22 @@ private Response suspendOrResumeAll(final HttpServletRequest req, final boolean
}
);
}
+
+ public static class HandoffTaskGroupsRequest
+ {
+
+ private final List taskGroupIds;
+
+ @JsonCreator
+ public HandoffTaskGroupsRequest(@JsonProperty("taskGroupIds") List taskGroupIds)
+ {
+ this.taskGroupIds = taskGroupIds;
+ }
+
+ @JsonProperty
+ public List getTaskGroupIds()
+ {
+ return taskGroupIds;
+ }
+ }
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 58a433325a3f..379f5fe9ae48 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -203,6 +203,8 @@ public class TaskGroup
final String baseSequenceName;
DateTime completionTimeout; // is set after signalTasksToFinish(); if not done by timeout, take corrective action
+ boolean shutdownEarly = false; // set by SupervisorManager.stopTaskGroupEarly
+
TaskGroup(
int groupId,
ImmutableMap startingSequences,
@@ -266,6 +268,16 @@ Set taskIds()
return tasks.keySet();
}
+ void setShutdownEarly()
+ {
+ shutdownEarly = true;
+ }
+
+ Boolean getShutdownEarly()
+ {
+ return shutdownEarly;
+ }
+
@VisibleForTesting
public String getBaseSequenceName()
{
@@ -657,6 +669,39 @@ public String getType()
}
}
+ private class HandoffTaskGroupsNotice implements Notice
+ {
+ final List taskGroupIds;
+ private static final String TYPE = "handoff_task_group_notice";
+
+ HandoffTaskGroupsNotice(
+ @Nonnull final List taskGroupIds
+ )
+ {
+ this.taskGroupIds = taskGroupIds;
+ }
+
+ @Override
+ public void handle()
+ {
+ for (Integer taskGroupId : taskGroupIds) {
+ TaskGroup taskGroup = activelyReadingTaskGroups.getOrDefault(taskGroupId, null);
+ if (taskGroup == null) {
+ log.info("Tried to stop task group [%d] for supervisor [%s] that wasn't actively reading.", taskGroupId, supervisorId);
+ continue;
+ }
+
+ taskGroup.setShutdownEarly();
+ }
+ }
+
+ @Override
+ public String getType()
+ {
+ return TYPE;
+ }
+ }
+
protected class CheckpointNotice implements Notice
{
private final int taskGroupId;
@@ -1932,6 +1977,12 @@ private boolean isTaskInPendingCompletionGroups(String taskId)
return false;
}
+ @Override
+ public void handoffTaskGroupsEarly(List taskGroupIds)
+ {
+ addNotice(new HandoffTaskGroupsNotice(taskGroupIds));
+ }
+
private void discoverTasks() throws ExecutionException, InterruptedException
{
int taskCount = 0;
@@ -3143,14 +3194,15 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
} else {
DateTime earliestTaskStart = computeEarliestTaskStartTime(group);
- if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) {
+ if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow() || group.getShutdownEarly()) {
// if this task has run longer than the configured duration
// as long as the pending task groups are less than the configured stop task count.
+ // If shutdownEarly has been set, ignore stopTaskCount since this is a manual operator action.
if (pendingCompletionTaskGroups.values()
.stream()
.mapToInt(CopyOnWriteArrayList::size)
.sum() + stoppedTasks.get()
- < ioConfig.getMaxAllowedStops()) {
+ < ioConfig.getMaxAllowedStops() || group.getShutdownEarly()) {
log.info(
"Task group [%d] has run for [%s]. Stopping.",
groupId,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
index 4a9fccd4663b..d3d1045ea714 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
@@ -240,6 +240,27 @@ public void testGetSupervisorStatus()
verifyAll();
}
+ @Test
+ public void testHandoffTaskGroupsEarly()
+ {
+ Map existingSpecs = ImmutableMap.of(
+ "id1", new TestSupervisorSpec("id1", supervisor1)
+ );
+
+ EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
+ supervisor1.start();
+ supervisor1.handoffTaskGroupsEarly(ImmutableList.of(1));
+
+ replayAll();
+
+ manager.start();
+
+ Assert.assertTrue(manager.handoffTaskGroupsEarly("id1", ImmutableList.of(1)));
+ Assert.assertFalse(manager.handoffTaskGroupsEarly("id2", ImmutableList.of(1)));
+
+ verifyAll();
+ }
+
@Test
public void testStartAlreadyStarted()
{
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 489315cc2495..00689cee040f 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -1209,6 +1209,133 @@ public Duration getEmissionDuration()
Assert.assertTrue(supervisor.getNoticesQueueSize() == 0);
}
+ @Test(timeout = 10_000L)
+ public void testSupervisorStopTaskGroupEarly() throws JsonProcessingException, InterruptedException
+ {
+ DateTime startTime = DateTimes.nowUtc();
+ SeekableStreamSupervisorIOConfig ioConfig = new SeekableStreamSupervisorIOConfig(
+ STREAM,
+ new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false),
+ 1,
+ 1,
+ new Period("PT1H"),
+ new Period("PT1S"),
+ new Period("PT30S"),
+ false,
+ new Period("PT30M"),
+ null,
+ null,
+ null,
+ null,
+ new IdleConfig(true, 200L),
+ null
+ )
+ {
+ };
+
+ EasyMock.reset(spec);
+ EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+ EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+ EasyMock.expect(spec.getIoConfig()).andReturn(ioConfig).anyTimes();
+ EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+ EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+ EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new DruidMonitorSchedulerConfig()
+ {
+ @Override
+ public Duration getEmissionDuration()
+ {
+ return new Period("PT2S").toStandardDuration();
+ }
+ }).anyTimes();
+ EasyMock.expect(spec.getType()).andReturn("stream").anyTimes();
+ EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+ EasyMock.expect(spec.getContextValue("tags")).andReturn("").anyTimes();
+ EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
+ EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();
+
+ SeekableStreamIndexTaskTuningConfig taskTuningConfig = getTuningConfig().convertToTaskTuningConfig();
+
+ TreeMap> sequenceOffsets = new TreeMap<>();
+ sequenceOffsets.put(0, ImmutableMap.of("0", 10L, "1", 20L));
+
+ Map context = new HashMap<>();
+ context.put("checkpoints", new ObjectMapper().writeValueAsString(sequenceOffsets));
+
+ TestSeekableStreamIndexTask id1 = new TestSeekableStreamIndexTask(
+ "id1",
+ null,
+ getDataSchema(),
+ taskTuningConfig,
+ createTaskIoConfigExt(
+ 0,
+ Collections.singletonMap("0", "10"),
+ Collections.singletonMap("0", "20"),
+ "test",
+ startTime,
+ null,
+ Collections.emptySet(),
+ ioConfig
+ ),
+ context,
+ "0"
+ );
+
+ final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
+
+ Collection workItems = new ArrayList<>();
+ workItems.add(new TestTaskRunnerWorkItem(id1, null, location1));
+
+ EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
+ EasyMock.expect(taskRunner.getTaskLocation(id1.getId())).andReturn(location1).anyTimes();
+ EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
+ .andReturn(ImmutableList.of(id1))
+ .anyTimes();
+ EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
+ EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
+
+ EasyMock.reset(indexerMetadataStorageCoordinator);
+ EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
+ .andReturn(new TestSeekableStreamDataSourceMetadata(null)).anyTimes();
+ EasyMock.expect(indexTaskClient.getStatusAsync("id1"))
+ .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING))
+ .anyTimes();
+
+ EasyMock.expect(indexTaskClient.getStartTimeAsync("id1"))
+ .andReturn(Futures.immediateFuture(startTime.plusSeconds(1)))
+ .anyTimes();
+
+ ImmutableMap partitionOffset = ImmutableMap.of("0", "10");
+ final TreeMap> checkpoints = new TreeMap<>();
+ checkpoints.put(0, partitionOffset);
+
+ EasyMock.expect(indexTaskClient.getCheckpointsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints))
+ .anyTimes();
+
+ // The task should only be pause/resumed in one of the runInternal commands, after stopTaskGroupEarly has been called.
+ EasyMock.expect(indexTaskClient.resumeAsync("id1"))
+ .andReturn(Futures.immediateFuture(true))
+ .once();
+ EasyMock.expect(indexTaskClient.pauseAsync("id1"))
+ .andReturn(Futures.immediateFuture(true))
+ .once();
+ taskQueue.shutdown("id1", "All tasks in group[%s] failed to transition to publishing state", 0);
+
+ replayAll();
+
+ SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
+
+ supervisor.start();
+ supervisor.runInternal();
+ supervisor.handoffTaskGroupsEarly(ImmutableList.of(0));
+
+ while (supervisor.getNoticesQueueSize() > 0) {
+ Thread.sleep(100);
+ }
+ supervisor.runInternal();
+ verifyAll();
+ }
+
@Test
public void testEmitBothLag() throws Exception
{
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
index bcfc5ebe8196..4d88a6b8633d 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
@@ -21,6 +21,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import org.apache.commons.lang3.NotImplementedException;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
@@ -93,4 +94,10 @@ default Boolean isHealthy()
LagStats computeLagStats();
int getActiveTaskGroupsCount();
+
+ /** Handoff the task group with id=taskGroupId the next time the supervisor runs regardless of task run time*/
+ default void handoffTaskGroupsEarly(List taskGroupIds)
+ {
+ throw new NotImplementedException("Supervisor does not have the feature to handoff task groups early implemented");
+ }
}
diff --git a/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java b/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java
index a6e4b8fe0970..c83c4c0647d2 100644
--- a/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java
+++ b/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java
@@ -19,6 +19,8 @@
package org.apache.druid.indexing;
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.lang3.NotImplementedException;
import org.apache.druid.indexing.overlord.ObjectMetadata;
import org.apache.druid.indexing.overlord.supervisor.NoopSupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
@@ -89,4 +91,14 @@ public void testNoppSupervisorResetOffsetsDoNothing()
Assert.assertEquals(-1, noOpSupervisor.getActiveTaskGroupsCount());
Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, noOpSupervisor.getState());
}
+
+ @Test
+ public void testNoppSupervisorStopTaskEarlyDoNothing()
+ {
+ NoopSupervisorSpec expectedSpec = new NoopSupervisorSpec(null, null);
+ Supervisor noOpSupervisor = expectedSpec.createSupervisor();
+ Assert.assertThrows(NotImplementedException.class,
+ () -> noOpSupervisor.handoffTaskGroupsEarly(ImmutableList.of())
+ );
+ }
}