A Replacing task must read segments created before it acquired its lock #15085
@AmatyaAvadhanula, should we use the new action only when using the REPLACE lock type?
I think it would be harder to figure out which action to use within the DruidInputSource.
I guess the only case we care about is
The reason this would be preferable is that the new action would only have to fetch the segments that the task is going to replace, i.e. segments belonging to the interval on which this task has a REPLACE lock and which were created before the lock was acquired. It would avoid duplication and confusion with the existing action. Other cases:
It is the latter.
I think that the task can fetch segments without having a lock as well (the cardinality phase with hash partitioning, for example).
Isn't the replace lock acquired by the
No, it depends on
@AmatyaAvadhanula, since the new action deals with replace locks, it must be fired using the
Thanks, @kfaraz. I'll make the change.
Overall approach looks fine, left some comments for improvements.
if (replaceLock.getInterval().contains(segment.getInterval())
    && replaceLock.getVersion().compareTo(createdDate) < 0) {
  // If a REPLACE lock covers a segment but has a version less than the segment's created date, remove it
  allSegmentsToBeReplaced.remove(segment);
Rather than going through the collection of segments and created dates twice (once to add everything, then to remove the invalid ones), start with an empty set and add only the valid entries to it.
Done
Set<DataSegment> allSegmentsToBeReplaced = new HashSet<>();
segmentsAndCreatedDates.forEach(segmentAndCreatedDate -> allSegmentsToBeReplaced.add(segmentAndCreatedDate.lhs));
for (Pair<DataSegment, String> segmentAndCreatedDate : segmentsAndCreatedDates) {
Given that a single interval may have several segments, see if this loop can be improved by using a map from Interval to version to Set<DataSegment>.
Done. Thanks!
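The two suggestions above (build the result in a single pass, and group segments by interval first) can be sketched roughly as follows. This is a minimal illustration, not the code merged in the PR: `SketchInterval`, `SketchSegment`, `SketchReplaceLock` and `SegmentsToReplaceSketch` are hypothetical stand-ins for Druid's `Interval`, `DataSegment` and `ReplaceTaskLock`, and it assumes that created dates and lock versions are ISO-8601 strings in the same format, so that they can be compared lexicographically.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for Druid's Interval: [start, end) in epoch millis.
final class SketchInterval {
    final long start;
    final long end;

    SketchInterval(long start, long end) {
        this.start = start;
        this.end = end;
    }

    boolean contains(SketchInterval other) {
        return start <= other.start && other.end <= end;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SketchInterval)) {
            return false;
        }
        SketchInterval that = (SketchInterval) o;
        return start == that.start && end == that.end;
    }

    @Override
    public int hashCode() {
        return 31 * Long.hashCode(start) + Long.hashCode(end);
    }
}

// Hypothetical stand-in for a (DataSegment, created_date) pair.
final class SketchSegment {
    final String id;
    final SketchInterval interval;
    final String createdDate;

    SketchSegment(String id, SketchInterval interval, String createdDate) {
        this.id = id;
        this.interval = interval;
        this.createdDate = createdDate;
    }
}

// Hypothetical stand-in for ReplaceTaskLock.
final class SketchReplaceLock {
    final SketchInterval interval;
    final String version;

    SketchReplaceLock(SketchInterval interval, String version) {
        this.interval = interval;
        this.version = version;
    }
}

final class SegmentsToReplaceSketch {
    static Set<String> segmentsToReplace(List<SketchSegment> usedSegments, List<SketchReplaceLock> replaceLocks) {
        // Group once: interval -> created date -> segment ids.
        Map<SketchInterval, Map<String, Set<String>>> grouped = new HashMap<>();
        for (SketchSegment segment : usedSegments) {
            grouped.computeIfAbsent(segment.interval, k -> new HashMap<>())
                   .computeIfAbsent(segment.createdDate, k -> new HashSet<>())
                   .add(segment.id);
        }

        // Start with an empty set and add only the valid entries.
        Set<String> result = new HashSet<>();
        for (Map.Entry<SketchInterval, Map<String, Set<String>>> entry : grouped.entrySet()) {
            // Resolve the covering lock once per interval instead of once per segment.
            String lockVersion = null;
            for (SketchReplaceLock lock : replaceLocks) {
                if (lock.interval.contains(entry.getKey())) {
                    lockVersion = lock.version;
                    break;
                }
            }
            for (Map.Entry<String, Set<String>> byCreatedDate : entry.getValue().entrySet()) {
                // Keep segments with no covering lock, or created strictly before the lock version.
                if (lockVersion == null || lockVersion.compareTo(byCreatedDate.getKey()) > 0) {
                    result.addAll(byCreatedDate.getValue());
                }
            }
        }
        return result;
    }
}
```

Grouping by interval means the lock lookup runs once per distinct interval, which helps when a single interval holds many segments.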
final DataSegment segment = segmentAndCreatedDate.lhs;
final String createdDate = segmentAndCreatedDate.rhs;
for (ReplaceTaskLock replaceLock : replaceLocksForTask) {
  if (replaceLock.getInterval().contains(segment.getInterval())
I suppose we don't really have much of a choice here other than doing this processing in-memory since these filters would be too complicated to push down.
@@ -40,6 +40,7 @@
@JsonSubTypes.Type(name = "segmentTransactionalReplace", value = SegmentTransactionalReplaceAction.class),
// Type name doesn't correspond to the name of the class for backward compatibility.
@JsonSubTypes.Type(name = "segmentListUsed", value = RetrieveUsedSegmentsAction.class),
@JsonSubTypes.Type(name = "segmentListLocked", value = RetrieveSegmentsToReplaceAction.class),
segmentListToReplace?
Can I use retrieveSegmentsToReplace since the other actions do not match the class names for backward compatibility?
}

/**
 * Returns the ID of {@link SubTaskSpec} that is assigned to this subtask.
 * This ID is used to identify duplicate work of retry tasks for the same spec.
 */
public abstract String getSubtaskSpecId();

/**
 * @return the id of the subtask's supervisor
Suggested change:
- * @return the id of the subtask's supervisor
+ * @return Task ID of the {@code ParallelIndexSupervisorTask} which launched this sub-task.
Done
"SELECT created_date, payload FROM %1$s WHERE dataSource = :dataSource AND used = true"
);

final boolean intervalStartIsEternityStart = Intervals.ETERNITY.getStart().equals(interval.getStart());
Why do we need to do this special eternity handling? Are other methods in this class doing this too?
This is important as such an interval may be passed to the method as an argument.
Other methods in this class do not implement it as they assume that no interval is passed when fetching segments for ETERNITY.
    .filter(lock -> supervisorId.equals(lock.getSupervisorTaskId()))
    .collect(Collectors.toSet());

Collection<Pair<DataSegment, String>> segmentsAndCreatedDates =
Return early if there are no REPLACE locks.
Done
// The DruidInputSource can be used to read from one datasource and write to another.
// In such a case, the action can simply fetch all visible segments for the datasource and interval
Suggested change:
- // The DruidInputSource can be used to read from one datasource and write to another.
- // In such a case, the action can simply fetch all visible segments for the datasource and interval
+ // The DruidInputSource can be used to read from one datasource and write to another.
+ // In such a case, the race condition described in class-level docs cannot occur and the action can simply fetch all visible segments for the datasource and interval
Done
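The two short-circuit cases raised in the threads above (reading from a different datasource, and holding no REPLACE locks) might be arranged along these lines. This is only a sketch of the control flow: `RetrieveFlowSketch`, its parameters and the two suppliers are hypothetical, and it assumes that the early return for "no REPLACE locks" falls back to fetching all visible segments, which the review comment itself does not state.

```java
import java.util.List;
import java.util.function.Supplier;

final class RetrieveFlowSketch {
    /**
     * @param taskDataSource            datasource the task writes to (hypothetical parameter)
     * @param inputDataSource           datasource the input source reads from (hypothetical parameter)
     * @param replaceLockVersions       versions of the REPLACE locks held for this task, possibly empty
     * @param allVisibleSegments        fetches every visible segment for the datasource and interval
     * @param segmentsCreatedBeforeLock fetches only segments created before the lock was acquired
     */
    static List<String> retrieve(
        String taskDataSource,
        String inputDataSource,
        List<String> replaceLockVersions,
        Supplier<List<String>> allVisibleSegments,
        Supplier<List<String>> segmentsCreatedBeforeLock
    ) {
        // Reading from one datasource and writing to another: the race cannot occur.
        if (!taskDataSource.equals(inputDataSource)) {
            return allVisibleSegments.get();
        }
        // Assumption: with no REPLACE locks there is nothing to filter against, so return early.
        if (replaceLockVersions.isEmpty()) {
            return allVisibleSegments.get();
        }
        // Otherwise only segments created before the REPLACE lock may be read.
        return segmentsCreatedBeforeLock.get();
    }
}
```

Passing the fetches as suppliers keeps the more expensive, filtered query from running when one of the early exits applies.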
final Map<String, Set<DataSegment>> createdToSegmentsMap = entry.getValue();
for (Map.Entry<String, Set<DataSegment>> createdAndSegments : createdToSegmentsMap.entrySet()) {
  if (lockVersion == null || lockVersion.compareTo(createdAndSegments.getKey()) > 0) {
    allSegmentsToBeReplaced.addAll(createdAndSegments.getValue());
Perhaps we should add a log line when a segment is not considered because it was created after the REPLACE lock was acquired. I am assuming this will not happen frequently.
Added logs
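One way such a log line could look is sketched below. The logs actually added in the PR are not shown in this thread, so the message text, the `SkippedSegmentLoggingSketch` class and the use of `java.util.logging` (rather than Druid's own logger) are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

final class SkippedSegmentLoggingSketch {
    private static final Logger LOG = Logger.getLogger(SkippedSegmentLoggingSketch.class.getName());

    /**
     * @param lockVersion         version of the covering REPLACE lock, or null if there is none
     * @param createdToSegmentIds created date -> ids of the segments created at that time
     * @return ids of the segments that may be read by the replacing task
     */
    static List<String> selectAndLog(String lockVersion, Map<String, List<String>> createdToSegmentIds) {
        List<String> selected = new ArrayList<>();
        List<String> skipped = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : createdToSegmentIds.entrySet()) {
            // Same condition as in the snippet under review: keep segments created before the lock.
            if (lockVersion == null || lockVersion.compareTo(entry.getKey()) > 0) {
                selected.addAll(entry.getValue());
            } else {
                skipped.addAll(entry.getValue());
            }
        }
        // Log once per call rather than once per segment, since skips are expected to be rare.
        if (!skipped.isEmpty()) {
            LOG.info(String.format(
                "Ignoring %d segment(s) not created before REPLACE lock version [%s]: %s",
                skipped.size(), lockVersion, skipped));
        }
        return selected;
    }
}
```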
@kfaraz @abhishekagarwal87 Thank you for the review!
…ck (apache#15085) * Replacing tasks must read segments created before they acquired their locks
Fixes duplication of data with concurrent append and replace jobs.
Description
Consider the following events:
- Segments S1, S2, S3 exist
- Compact acquires replace lock
- Append publishes a segment S4 which needs to be carried forward
- Compact task processes S1-S4 to create new segments
- Compact task publishes new segments and carries S4 forward
This can lead to the data in S4 being duplicated: it is present both in the newly compacted segments and in the carried-forward S4.
Approach:
This PR introduces a new task action to fetch only those segments that were present before the REPLACE lock was acquired.
RetrieveSegmentsToReplaceAction:
This new action is utilized by the DruidInputSource.
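The sequence of events and the effect of the fix can be traced with a toy model. Everything here is invented for illustration: the class name, the logical clock used in place of real timestamps, and the row count of 10 per segment. It only models which segments the compaction task reads and what remains visible after it publishes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ConcurrentReplaceTimelineSketch {
    /**
     * Replays the timeline and returns the total number of rows visible afterwards.
     *
     * @param readOnlySegmentsCreatedBeforeLock true to model the new action, false to model reading
     *                                          all visible segments
     */
    static int rowsAfterCompaction(boolean readOnlySegmentsCreatedBeforeLock) {
        // segment id -> {created-at on a logical clock, row count}
        Map<String, int[]> used = new LinkedHashMap<>();
        used.put("S1", new int[]{1, 10});
        used.put("S2", new int[]{2, 10});
        used.put("S3", new int[]{3, 10});

        int lockVersion = 4;               // Compact acquires the REPLACE lock.
        used.put("S4", new int[]{5, 10});  // Append publishes S4 after the lock was acquired.

        // Compact reads its input and decides which segments it replaces.
        int compactedRows = 0;
        List<String> replaced = new ArrayList<>();
        for (Map.Entry<String, int[]> entry : used.entrySet()) {
            boolean createdBeforeLock = entry.getValue()[0] < lockVersion;
            if (createdBeforeLock || !readOnlySegmentsCreatedBeforeLock) {
                compactedRows += entry.getValue()[1];
            }
            if (createdBeforeLock) {
                replaced.add(entry.getKey());
            }
        }

        // Compact publishes: S1-S3 are replaced, S4 is carried forward and stays visible.
        for (String id : replaced) {
            used.remove(id);
        }
        int total = compactedRows;
        for (int[] createdAndRows : used.values()) {
            total += createdAndRows[1];
        }
        return total;
    }
}
```

In this model the original data holds 40 rows. Reading all visible segments yields 50 rows after compaction because the rows of S4 are counted twice, while reading only the segments created before the lock yields 40.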