Skip to content

Commit

Permalink
Fix used segment retrieval in Kill tasks (apache#15306)
Browse files Browse the repository at this point in the history
Fix used segment retrieval in Kill tasks
  • Loading branch information
AmatyaAvadhanula authored Nov 2, 2023
1 parent d261587 commit dc3213b
Showing 1 changed file with 32 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery;
import org.apache.druid.error.InvalidInput;
Expand Down Expand Up @@ -171,8 +172,6 @@ public Set<ResourceAction> getInputSourceResources()
@Override
public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{
final NavigableMap<DateTime, List<TaskLock>> taskLockMap = getTaskLockMap(toolbox.getTaskActionClient());

// Track stats for reporting
int numSegmentsKilled = 0;
int numBatchesProcessed = 0;
Expand All @@ -196,21 +195,40 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
limit,
numTotalBatches != null ? StringUtils.format(" in [%d] batches.", numTotalBatches) : "."
);

RetrieveUsedSegmentsAction retrieveUsedSegmentsAction = new RetrieveUsedSegmentsAction(
getDataSource(),
null,
ImmutableList.of(getInterval()),
Segments.INCLUDING_OVERSHADOWED
);
// Fetch the load specs of all segments overlapping with the unused segment intervals
final Set<Map<String, Object>> usedSegmentLoadSpecs =
new HashSet<>(toolbox.getTaskActionClient().submit(retrieveUsedSegmentsAction)
.stream()
.map(DataSegment::getLoadSpec)
.collect(Collectors.toSet())
);

do {
if (nextBatchSize <= 0) {
break;
}

unusedSegments = toolbox
.getTaskActionClient()
.submit(new RetrieveUnusedSegmentsAction(getDataSource(), getInterval(), nextBatchSize));
.getTaskActionClient()
.submit(new RetrieveUnusedSegmentsAction(getDataSource(), getInterval(), nextBatchSize));

// Fetch locks each time as a revokal could have occurred in between batches
final NavigableMap<DateTime, List<TaskLock>> taskLockMap
= getNonRevokedTaskLockMap(toolbox.getTaskActionClient());

if (!TaskLocks.isLockCoversSegments(taskLockMap, unusedSegments)) {
throw new ISE(
"Locks[%s] for task[%s] can't cover segments[%s]",
taskLockMap.values().stream().flatMap(List::stream).collect(Collectors.toList()),
getId(),
unusedSegments
"Locks[%s] for task[%s] can't cover segments[%s]",
taskLockMap.values().stream().flatMap(List::stream).collect(Collectors.toList()),
getId(),
unusedSegments
);
}

Expand All @@ -222,24 +240,6 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception

toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));

final Set<Interval> unusedSegmentIntervals = unusedSegments.stream()
.map(DataSegment::getInterval)
.collect(Collectors.toSet());
final Set<Map<String, Object>> usedSegmentLoadSpecs = new HashSet<>();
if (!unusedSegmentIntervals.isEmpty()) {
RetrieveUsedSegmentsAction retrieveUsedSegmentsAction = new RetrieveUsedSegmentsAction(
getDataSource(),
null,
unusedSegmentIntervals,
Segments.INCLUDING_OVERSHADOWED
);
// Fetch the load specs of all segments overlapping with the unused segment intervals
usedSegmentLoadSpecs.addAll(toolbox.getTaskActionClient().submit(retrieveUsedSegmentsAction)
.stream()
.map(DataSegment::getLoadSpec)
.collect(Collectors.toSet())
);
}

// Kill segments from the deep storage only if their load specs are not being used by any used segments
final List<DataSegment> segmentsToBeKilled = unusedSegments
Expand Down Expand Up @@ -289,11 +289,15 @@ int computeNextBatchSize(int numSegmentsKilled)
return null != limit ? Math.min(limit - numSegmentsKilled, batchSize) : batchSize;
}

private NavigableMap<DateTime, List<TaskLock>> getTaskLockMap(TaskActionClient client) throws IOException
private NavigableMap<DateTime, List<TaskLock>> getNonRevokedTaskLockMap(TaskActionClient client) throws IOException
{
final NavigableMap<DateTime, List<TaskLock>> taskLockMap = new TreeMap<>();
getTaskLocks(client).forEach(
taskLock -> taskLockMap.computeIfAbsent(taskLock.getInterval().getStart(), k -> new ArrayList<>()).add(taskLock)
taskLock -> {
if (!taskLock.isRevoked()) {
taskLockMap.computeIfAbsent(taskLock.getInterval().getStart(), k -> new ArrayList<>()).add(taskLock);
}
}
);
return taskLockMap;
}
Expand Down

0 comments on commit dc3213b

Please sign in to comment.