From dc3213b05dd4ca62f4273d9e14f547c791c0dbc6 Mon Sep 17 00:00:00 2001 From: AmatyaAvadhanula Date: Thu, 2 Nov 2023 19:07:17 +0530 Subject: [PATCH] Fix used segment retrieval in Kill tasks (#15306) Fix used segment retrieval in Kill tasks --- .../common/task/KillUnusedSegmentsTask.java | 60 ++++++++++--------- 1 file changed, 32 insertions(+), 28 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java index 1726a3e68003..54fae94684fd 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery; import org.apache.druid.error.InvalidInput; @@ -171,8 +172,6 @@ public Set getInputSourceResources() @Override public TaskStatus runTask(TaskToolbox toolbox) throws Exception { - final NavigableMap> taskLockMap = getTaskLockMap(toolbox.getTaskActionClient()); - // Track stats for reporting int numSegmentsKilled = 0; int numBatchesProcessed = 0; @@ -196,21 +195,40 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception limit, numTotalBatches != null ? StringUtils.format(" in [%d] batches.", numTotalBatches) : "." ); + + RetrieveUsedSegmentsAction retrieveUsedSegmentsAction = new RetrieveUsedSegmentsAction( + getDataSource(), + null, + ImmutableList.of(getInterval()), + Segments.INCLUDING_OVERSHADOWED + ); + // Fetch the load specs of all segments overlapping with the unused segment intervals + final Set> usedSegmentLoadSpecs = + new HashSet<>(toolbox.getTaskActionClient().submit(retrieveUsedSegmentsAction) + .stream() + .map(DataSegment::getLoadSpec) + .collect(Collectors.toSet()) + ); + do { if (nextBatchSize <= 0) { break; } unusedSegments = toolbox - .getTaskActionClient() - .submit(new RetrieveUnusedSegmentsAction(getDataSource(), getInterval(), nextBatchSize)); + .getTaskActionClient() + .submit(new RetrieveUnusedSegmentsAction(getDataSource(), getInterval(), nextBatchSize)); + + // Fetch locks each time as a revokal could have occurred in between batches + final NavigableMap> taskLockMap + = getNonRevokedTaskLockMap(toolbox.getTaskActionClient()); if (!TaskLocks.isLockCoversSegments(taskLockMap, unusedSegments)) { throw new ISE( - "Locks[%s] for task[%s] can't cover segments[%s]", - taskLockMap.values().stream().flatMap(List::stream).collect(Collectors.toList()), - getId(), - unusedSegments + "Locks[%s] for task[%s] can't cover segments[%s]", + taskLockMap.values().stream().flatMap(List::stream).collect(Collectors.toList()), + getId(), + unusedSegments ); } @@ -222,24 +240,6 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments))); - final Set unusedSegmentIntervals = unusedSegments.stream() - .map(DataSegment::getInterval) - .collect(Collectors.toSet()); - final Set> usedSegmentLoadSpecs = new HashSet<>(); - if (!unusedSegmentIntervals.isEmpty()) { - RetrieveUsedSegmentsAction retrieveUsedSegmentsAction = new RetrieveUsedSegmentsAction( - getDataSource(), - null, - unusedSegmentIntervals, - Segments.INCLUDING_OVERSHADOWED - ); - // Fetch the load specs of all segments overlapping with the unused segment intervals - usedSegmentLoadSpecs.addAll(toolbox.getTaskActionClient().submit(retrieveUsedSegmentsAction) - .stream() - .map(DataSegment::getLoadSpec) - .collect(Collectors.toSet()) - ); - } // Kill segments from the deep storage only if their load specs are not being used by any used segments final List segmentsToBeKilled = unusedSegments @@ -289,11 +289,15 @@ int computeNextBatchSize(int numSegmentsKilled) return null != limit ? Math.min(limit - numSegmentsKilled, batchSize) : batchSize; } - private NavigableMap> getTaskLockMap(TaskActionClient client) throws IOException + private NavigableMap> getNonRevokedTaskLockMap(TaskActionClient client) throws IOException { final NavigableMap> taskLockMap = new TreeMap<>(); getTaskLocks(client).forEach( - taskLock -> taskLockMap.computeIfAbsent(taskLock.getInterval().getStart(), k -> new ArrayList<>()).add(taskLock) + taskLock -> { + if (!taskLock.isRevoked()) { + taskLockMap.computeIfAbsent(taskLock.getInterval().getStart(), k -> new ArrayList<>()).add(taskLock); + } + } ); return taskLockMap; }