Skip to content

Commit

Permalink
API for stopping streaming tasks early (#16310)
Browse files Browse the repository at this point in the history
* Try stopping task early

* Fix checkstyle

* Add unit test

* Add a couple more tests

* PR changes

* Use notice

* fix checkstyle

* PR changes

* Update indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java

Co-authored-by: Suneet Saldanha <[email protected]>

* Update indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java

Co-authored-by: Suneet Saldanha <[email protected]>

* Change payload

* Remove quotes

---------

Co-authored-by: Suneet Saldanha <[email protected]>
  • Loading branch information
georgew5656 and suneet-s authored May 14, 2024
1 parent cdf78ec commit c1bf4fe
Show file tree
Hide file tree
Showing 8 changed files with 349 additions and 2 deletions.
57 changes: 57 additions & 0 deletions docs/api-reference/supervisor-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -3550,6 +3550,63 @@ Host: http://ROUTER_IP:ROUTER_PORT
```
</details>

### Handoff task groups for a supervisor early

Trigger handoff for specified task groups of a supervisor early. This is a best effort API and makes no guarantees of handoff execution

#### URL

`POST` `/druid/indexer/v1/supervisor/{supervisorId}/taskGroups/handoff`

#### Sample request

The following example shows how to handoff task groups for a Kafka supervisor with the name `social_media` and has the task groups: `1,2,3`.

<Tabs>

<TabItem value="3" label="cURL">


```shell
curl --request POST "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/social_media/taskGroups/handoff"
--header 'Content-Type: application/json'
--data-raw '{"taskGroupIds": [1, 2, 3]}'
```

</TabItem>
<TabItem value="4" label="HTTP">


```HTTP
POST /druid/indexer/v1/supervisor/social_media/taskGroups/handoff HTTP/1.1
Host: http://ROUTER_IP:ROUTER_PORT
Content-Type: application/json
{
"taskGroupIds": [1, 2, 3],
}
```

</TabItem>
</Tabs>

#### Sample response

<details>
<summary>View the response</summary>

```json
{
"id": "social_media",
"taskGroupIds": [
1,
2,
3
]
}
```
</details>

### Shut down a supervisor

Shuts down a supervisor. This endpoint is deprecated and will be removed in future releases. Use the equivalent [terminate](#terminate-a-supervisor) endpoint instead.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,16 @@ public Optional<SupervisorStateManager.State> getSupervisorState(String id)
return supervisor == null ? Optional.absent() : Optional.fromNullable(supervisor.lhs.getState());
}

public boolean handoffTaskGroupsEarly(String id, List<Integer> taskGroupIds)
{
Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(id);
if (supervisor == null || supervisor.lhs == null) {
return false;
}
supervisor.lhs.handoffTaskGroupsEarly(taskGroupIds);
return true;
}

public boolean createOrUpdateAndStartSupervisor(SupervisorSpec spec)
{
Preconditions.checkState(started, "SupervisorManager not started");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.druid.indexing.overlord.supervisor;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
Expand All @@ -30,6 +32,7 @@
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.sun.jersey.spi.container.ResourceFilters;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.druid.audit.AuditEntry;
import org.apache.druid.audit.AuditManager;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
Expand All @@ -48,6 +51,7 @@
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
Expand Down Expand Up @@ -395,6 +399,45 @@ public Response shutdown(@PathParam("id") final String id)
return terminate(id);
}

/**
* This method will immediately try to handoff the list of task group ids for the given supervisor.
* This is a best effort API and makes no guarantees of execution, e.g. if a non-existent task group id
* is passed to it, the API call will still suceced.
*/
@POST
@Path("/{id}/taskGroups/handoff")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
@ResourceFilters(SupervisorResourceFilter.class)
public Response handoffTaskGroups(@PathParam("id") final String id, @Nonnull final HandoffTaskGroupsRequest handoffTaskGroupsRequest)
{
List<Integer> taskGroupIds = handoffTaskGroupsRequest.getTaskGroupIds();
if (taskGroupIds == null || taskGroupIds.isEmpty()) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(ImmutableMap.of("error", "List of task groups to handoff can't be empty"))
.build();

}
return asLeaderWithSupervisorManager(
manager -> {
try {
if (manager.handoffTaskGroupsEarly(id, taskGroupIds)) {
return Response.ok(ImmutableMap.of("id", id, "taskGroupIds", taskGroupIds)).build();
} else {
return Response.status(Response.Status.NOT_FOUND)
.entity(ImmutableMap.of("error", StringUtils.format("Supervisor was not found [%s]", id)))
.build();
}
}
catch (NotImplementedException e) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(ImmutableMap.of("error", StringUtils.format("Supervisor [%s] does not support early handoff", id)))
.build();
}
}
);
}

@POST
@Path("/{id}/terminate")
@Produces(MediaType.APPLICATION_JSON)
Expand Down Expand Up @@ -631,4 +674,22 @@ private Response suspendOrResumeAll(final HttpServletRequest req, final boolean
}
);
}

public static class HandoffTaskGroupsRequest
{

private final List<Integer> taskGroupIds;

@JsonCreator
public HandoffTaskGroupsRequest(@JsonProperty("taskGroupIds") List<Integer> taskGroupIds)
{
this.taskGroupIds = taskGroupIds;
}

@JsonProperty
public List<Integer> getTaskGroupIds()
{
return taskGroupIds;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ public class TaskGroup
final String baseSequenceName;
DateTime completionTimeout; // is set after signalTasksToFinish(); if not done by timeout, take corrective action

boolean shutdownEarly = false; // set by SupervisorManager.stopTaskGroupEarly

TaskGroup(
int groupId,
ImmutableMap<PartitionIdType, SequenceOffsetType> startingSequences,
Expand Down Expand Up @@ -266,6 +268,16 @@ Set<String> taskIds()
return tasks.keySet();
}

void setShutdownEarly()
{
shutdownEarly = true;
}

Boolean getShutdownEarly()
{
return shutdownEarly;
}

@VisibleForTesting
public String getBaseSequenceName()
{
Expand Down Expand Up @@ -657,6 +669,39 @@ public String getType()
}
}

private class HandoffTaskGroupsNotice implements Notice
{
final List<Integer> taskGroupIds;
private static final String TYPE = "handoff_task_group_notice";

HandoffTaskGroupsNotice(
@Nonnull final List<Integer> taskGroupIds
)
{
this.taskGroupIds = taskGroupIds;
}

@Override
public void handle()
{
for (Integer taskGroupId : taskGroupIds) {
TaskGroup taskGroup = activelyReadingTaskGroups.getOrDefault(taskGroupId, null);
if (taskGroup == null) {
log.info("Tried to stop task group [%d] for supervisor [%s] that wasn't actively reading.", taskGroupId, supervisorId);
continue;
}

taskGroup.setShutdownEarly();
}
}

@Override
public String getType()
{
return TYPE;
}
}

protected class CheckpointNotice implements Notice
{
private final int taskGroupId;
Expand Down Expand Up @@ -1932,6 +1977,12 @@ private boolean isTaskInPendingCompletionGroups(String taskId)
return false;
}

@Override
public void handoffTaskGroupsEarly(List<Integer> taskGroupIds)
{
addNotice(new HandoffTaskGroupsNotice(taskGroupIds));
}

private void discoverTasks() throws ExecutionException, InterruptedException
{
int taskCount = 0;
Expand Down Expand Up @@ -3143,14 +3194,15 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
} else {
DateTime earliestTaskStart = computeEarliestTaskStartTime(group);

if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) {
if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow() || group.getShutdownEarly()) {
// if this task has run longer than the configured duration
// as long as the pending task groups are less than the configured stop task count.
// If shutdownEarly has been set, ignore stopTaskCount since this is a manual operator action.
if (pendingCompletionTaskGroups.values()
.stream()
.mapToInt(CopyOnWriteArrayList::size)
.sum() + stoppedTasks.get()
< ioConfig.getMaxAllowedStops()) {
< ioConfig.getMaxAllowedStops() || group.getShutdownEarly()) {
log.info(
"Task group [%d] has run for [%s]. Stopping.",
groupId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,27 @@ public void testGetSupervisorStatus()
verifyAll();
}

@Test
public void testHandoffTaskGroupsEarly()
{
Map<String, SupervisorSpec> existingSpecs = ImmutableMap.of(
"id1", new TestSupervisorSpec("id1", supervisor1)
);

EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
supervisor1.start();
supervisor1.handoffTaskGroupsEarly(ImmutableList.of(1));

replayAll();

manager.start();

Assert.assertTrue(manager.handoffTaskGroupsEarly("id1", ImmutableList.of(1)));
Assert.assertFalse(manager.handoffTaskGroupsEarly("id2", ImmutableList.of(1)));

verifyAll();
}

@Test
public void testStartAlreadyStarted()
{
Expand Down
Loading

0 comments on commit c1bf4fe

Please sign in to comment.