Skip to content

Commit

Permalink
Address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
AmatyaAvadhanula committed Oct 18, 2023
1 parent ccf2196 commit f264ef8
Showing 1 changed file with 22 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Partitions;
Expand All @@ -43,14 +44,25 @@
import java.util.stream.Collectors;

/**
* This action exists in addition to retrieveUsedSegmentsAction because that action suffers
* from a race condition described by the following sequence of events:
*
* -Segments S1, S2, S3 exist
* -Compact acquires a replace lock
* -A concurrent appending job publishes a segment S4 which needs to be upgraded to the replace lock's version
* -Compact task processes S1-S4 to create new segments
* -Compact task publishes new segments and carries S4 forward to the new version
*
* This can lead to the data in S4 being duplicated
*
* This TaskAction returns a collection of segments which have data within the specified interval and are marked as
* used, and have been created before a REPLACE lock, if any, was acquired.
*
* The order of segments within the returned collection is unspecified, but each segment is guaranteed to appear in
* the collection only once.
* This ensures that a consistent set of segments is returned each time this action is called
*/
public class RetrieveSegmentsToReplaceAction implements TaskAction<Collection<DataSegment>>
{
private static final Logger log = new Logger(RetrieveSegmentsToReplaceAction.class);

@JsonIgnore
private final String dataSource;

Expand Down Expand Up @@ -89,7 +101,8 @@ public TypeReference<Collection<DataSegment>> getReturnTypeReference()
public Collection<DataSegment> perform(Task task, TaskActionToolbox toolbox)
{
// The DruidInputSource can be used to read from one datasource and write to another.
// In such a case, the action can simply fetch all visible segments for the datasource and interval
// In such a case, the race condition described in the class-level docs cannot occur,
// and the action can simply fetch all visible segments for the datasource and interval
if (!task.getDataSource().equals(dataSource)) {
return retrieveAllVisibleSegments(toolbox);
}
Expand Down Expand Up @@ -136,6 +149,11 @@ public Collection<DataSegment> perform(Task task, TaskActionToolbox toolbox)
for (Map.Entry<String, Set<DataSegment>> createdAndSegments : createdToSegmentsMap.entrySet()) {
if (lockVersion == null || lockVersion.compareTo(createdAndSegments.getKey()) > 0) {
allSegmentsToBeReplaced.addAll(createdAndSegments.getValue());
} else {
for (DataSegment segment : createdAndSegments.getValue()) {
log.info("Ignoring segment[%s] as it has created_date[%s] greater than the REPLACE lock version[%s]",
segment.getId(), createdAndSegments.getKey(), lockVersion);
}
}
}
}
Expand Down

0 comments on commit f264ef8

Please sign in to comment.