
A Replacing task must read segments created before it acquired its lock #15085

Merged

Conversation

AmatyaAvadhanula
Contributor

@AmatyaAvadhanula AmatyaAvadhanula commented Oct 4, 2023

Fixes duplication of data with concurrent append and replace jobs.

Description

Consider the following events:

- Segments S1, S2, and S3 exist
- Compact task acquires a REPLACE lock
- Append task publishes a segment S4 which needs to be carried forward
- Compact task processes S1-S4 to create new segments
- Compact task publishes the new segments and carries S4 forward

This can lead to the data in S4 being duplicated.

Approach:

This PR introduces a new task action to fetch only those segments that were present before the REPLACE lock was acquired.

RetrieveSegmentsToReplaceAction:

  1. Implement the method getSupervisorTaskId at the level of AbstractBatchSubtask.
  2. Fetch all replace locks for datasource using getAllReplaceLocksForDatasource, and filter those that have the same supervisorId.
  3. Fetch segments and created_date for the interval of the action.
  4. For each segment, find a replace lock in the filtered set whose interval contains the segment's interval, and filter based on created_date and lock version.

This new action is utilized by the DruidInputSource.
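The filtering rule in step 4 can be sketched as follows. This is a minimal, self-contained sketch using hypothetical stand-in types (the real implementation works with Druid's DataSegment, Interval, and ReplaceTaskLock classes); it relies on the fact that lock versions and created dates are ISO-8601 timestamp strings, so lexicographic comparison gives time order:

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class ReplaceFilterSketch {
  // Hypothetical stand-ins for Druid's Interval, DataSegment, and ReplaceTaskLock.
  record Interval(long start, long end) {
    boolean contains(Interval other) { return start <= other.start && other.end <= end; }
  }
  record Segment(String id, Interval interval, String createdDate) {}
  record ReplaceLock(Interval interval, String version) {}

  // Keep only segments created before the covering REPLACE lock was acquired.
  static Set<String> segmentsToReplace(List<Segment> segments, List<ReplaceLock> locks) {
    Set<String> result = new LinkedHashSet<>();
    for (Segment segment : segments) {
      String lockVersion = null;
      for (ReplaceLock lock : locks) {
        if (lock.interval().contains(segment.interval())) {
          lockVersion = lock.version();
          break;
        }
      }
      // No covering lock, or segment created before the lock version: include it.
      if (lockVersion == null || lockVersion.compareTo(segment.createdDate()) > 0) {
        result.add(segment.id());
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<Segment> segments = List.of(
        new Segment("S1", new Interval(0, 10), "2023-10-04T00:00:00Z"),
        new Segment("S4", new Interval(10, 20), "2023-10-04T02:00:00Z") // appended after the lock
    );
    List<ReplaceLock> locks = List.of(new ReplaceLock(new Interval(0, 100), "2023-10-04T01:00:00Z"));
    System.out.println(segmentsToReplace(segments, locks)); // [S1]: S4 is excluded
  }
}
```

A segment like S4, published by a concurrent append after the lock was acquired, is excluded here and instead carried forward by the replacing task, which is exactly what prevents the duplication described above.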

Release note


This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@AmatyaAvadhanula AmatyaAvadhanula marked this pull request as ready for review October 5, 2023 05:20
@kfaraz
Contributor

kfaraz commented Oct 5, 2023

@AmatyaAvadhanula , should we use the new action only when using REPLACE lock type?

@AmatyaAvadhanula
Contributor Author

AmatyaAvadhanula commented Oct 5, 2023

should we use the new action only when using REPLACE lock type?

I think it would be harder to figure out which action to use within the DruidInputSource.
The current action is identical to the previous action (for a single interval) in cases where the task itself doesn't hold a replace lock over the interval.

@kfaraz
Contributor

kfaraz commented Oct 6, 2023

I think it would be harder to figure out which action to use within the DruidInputSource.

I guess the only case we care about is CompactionTask, where we can just use the TaskLockType while constructing the DruidInputSource. If this input source is passed along to the sub-tasks, the DruidInputSource deserialized in the sub-tasks will know the lock type as well.

The reason this would be preferable is the new action would just have to fetch the segments that the task is going to replace, i.e. segments belonging to the interval on which this task has a replace lock and which were created before the lock was acquired. It would avoid duplication and confusion with the existing action.

Other cases:

  • Re-indexing from datasource A to B: None of the above applies, so we would just continue using the old action.
  • Re-indexing into the same datasource: Does this get launched as a compact task or a regular index_parallel task? If it is the latter, then while doing REPLACE, the submitted spec would need to declare REPLACE both in task context and in the io config.

@AmatyaAvadhanula
Contributor Author

Re-indexing into the same datasource: Does this get launched as a compact task or a regular index_parallel task? If it is the latter, then while doing REPLACE, the submitted spec would need to declare REPLACE both in task context and in the io config.

It is the latter.

segments belonging to the interval on which this task has a replace lock and which were created before the lock was acquired. It would avoid duplication and confusion with the existing action.

I think that the task can fetch segments without having a lock as well (cardinality phase with hash partitioning for example)

@kfaraz
Contributor

kfaraz commented Oct 6, 2023

I think that the task can fetch segments without having a lock as well (cardinality phase with hash partitioning for example)

Isn't the replace lock acquired by the index_parallel supervisor task even before it has started running, i.e. in the isReady call?

@AmatyaAvadhanula
Contributor Author

Isn't the replace lock acquired by the index_parallel supervisor task even before it has started running, i.e. in the isReady call?

No, it depends on ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals() to lock the interval, which is often unspecified. In such cases, the lock is acquired only after the cardinality phase reads the segments.

@kfaraz
Contributor

kfaraz commented Oct 8, 2023

@AmatyaAvadhanula , since the new action deals with replace locks, it must be fired using the SurrogateTaskActionClient.

@AmatyaAvadhanula
Contributor Author

since the new action deals with replace locks, it must be fired using the SurrogateTaskActionClient.

Thanks, @kfaraz. I'll make the change.

@AmatyaAvadhanula AmatyaAvadhanula requested review from kfaraz and removed request for kfaraz October 14, 2023 12:54
@LakshSingla LakshSingla added this to the 28.0 milestone Oct 16, 2023
Contributor

@kfaraz kfaraz left a comment


Overall approach looks fine, left some comments for improvements.

if (replaceLock.getInterval().contains(segment.getInterval())
    && replaceLock.getVersion().compareTo(createdDate) < 0) {
  // If a REPLACE lock covers a segment but has a version less than the segment's created date, remove it
  allSegmentsToBeReplaced.remove(segment);
Contributor

Rather than going through the collection of segments and created dates twice (once to add everything, then to remove the invalid ones), start with an empty set and add only the valid entries to it.

Contributor Author

Done


Set<DataSegment> allSegmentsToBeReplaced = new HashSet<>();
segmentsAndCreatedDates.forEach(segmentAndCreatedDate -> allSegmentsToBeReplaced.add(segmentAndCreatedDate.lhs));
for (Pair<DataSegment, String> segmentAndCreatedDate : segmentsAndCreatedDates) {
Contributor

Given that a single interval may have several segments, see if this loop can be improved by using a map from Interval to version to Set<DataSegment>.

Contributor Author

Done. Thanks!

final DataSegment segment = segmentAndCreatedDate.lhs;
final String createdDate = segmentAndCreatedDate.rhs;
for (ReplaceTaskLock replaceLock : replaceLocksForTask) {
if (replaceLock.getInterval().contains(segment.getInterval())
Contributor

I suppose we don't really have much of a choice here other than doing this processing in-memory since these filters would be too complicated to push down.

@@ -40,6 +40,7 @@
@JsonSubTypes.Type(name = "segmentTransactionalReplace", value = SegmentTransactionalReplaceAction.class),
// Type name doesn't correspond to the name of the class for backward compatibility.
@JsonSubTypes.Type(name = "segmentListUsed", value = RetrieveUsedSegmentsAction.class),
@JsonSubTypes.Type(name = "segmentListLocked", value = RetrieveSegmentsToReplaceAction.class),
Contributor

segmentListToReplace?

Contributor Author

Can I use retrieveSegmentsToReplace since the other actions do not match the class names for backward compatibility?

}

/**
* Returns the ID of {@link SubTaskSpec} that is assigned to this subtask.
* This ID is used to identify duplicate work of retry tasks for the same spec.
*/
public abstract String getSubtaskSpecId();

/**
* @return the id of the subtask's supervisor
Contributor

Suggested change
* @return the id of the subtask's supervisor
* @return Task ID of the {@code ParallelIndexSupervisorTask} which launched this sub-task.

Contributor Author

Done

"SELECT created_date, payload FROM %1$s WHERE dataSource = :dataSource AND used = true"
);

final boolean intervalStartIsEternityStart = Intervals.ETERNITY.getStart().equals(interval.getStart());
Contributor

Why do we need to do this special eternity handling? Are other methods in this class doing this too?

Contributor Author

This is important as such an interval may be passed to the method as an argument.
Other methods in this class do not implement it as they assume that no interval is passed when fetching segments for ETERNITY.
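The special handling can be sketched as: skip adding an interval bound to the WHERE clause when that bound coincides with eternity's bound, since an unbounded side filters out nothing. This is a simplified, hypothetical sketch of the query construction (the placeholder column names segment_start/segment_end and the table name are illustrative, not the real schema; the real method binds named parameters via JDBI):

```java
public class EternityQuerySketch {
  // Append interval bounds only when the interval is not unbounded on that side.
  static String buildQuery(boolean startIsEternityStart, boolean endIsEternityEnd) {
    StringBuilder sql = new StringBuilder(
        "SELECT created_date, payload FROM druid_segments"
        + " WHERE dataSource = :dataSource AND used = true");
    if (!startIsEternityStart) {
      sql.append(" AND segment_end > :start"); // segment overlaps the interval's start
    }
    if (!endIsEternityEnd) {
      sql.append(" AND segment_start < :end"); // segment overlaps the interval's end
    }
    return sql.toString();
  }

  public static void main(String[] args) {
    // For ETERNITY, no bound conditions are added at all.
    System.out.println(buildQuery(true, true));
    // For a regular interval, both bounds are applied.
    System.out.println(buildQuery(false, false));
  }
}
```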

.filter(lock -> supervisorId.equals(lock.getSupervisorTaskId()))
.collect(Collectors.toSet());

Collection<Pair<DataSegment, String>> segmentsAndCreatedDates =
Contributor

Return early if there are no REPLACE locks.

Contributor Author

Done

Comment on lines 91 to 92
// The DruidInputSource can be used to read from one datasource and write to another.
// In such a case, the action can simply fetch all visible segments for the datasource and interval
Contributor

Suggested change
// The DruidInputSource can be used to read from one datasource and write to another.
// In such a case, the action can simply fetch all visible segments for the datasource and interval
// The DruidInputSource can be used to read from one datasource and write to another.
// In such a case, the race condition described in class-level docs cannot occur and the action can simply fetch all visible segments for the datasource and interval

Contributor Author

Done

final Map<String, Set<DataSegment>> createdToSegmentsMap = entry.getValue();
for (Map.Entry<String, Set<DataSegment>> createdAndSegments : createdToSegmentsMap.entrySet()) {
if (lockVersion == null || lockVersion.compareTo(createdAndSegments.getKey()) > 0) {
allSegmentsToBeReplaced.addAll(createdAndSegments.getValue());
Contributor

We should perhaps add a log line when a segment wasn't considered because it was created after the replace lock was acquired. I am assuming this will not be frequent.

Contributor Author

Added logs
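Putting the two review suggestions together (grouping by interval and created date, then logging skipped segments), the final loop behaves roughly like this sketch. Plain strings stand in for Druid's Interval and DataSegment types, so the shapes here are illustrative only:

```java
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

public class GroupedReplaceFilter {
  // intervalToCreatedToSegments: interval -> created_date -> segment ids
  // (stand-in for Druid's Map<Interval, Map<String, Set<DataSegment>>>);
  // lockVersions: interval -> version of the REPLACE lock held over it, if any.
  static Set<String> filter(
      Map<String, Map<String, Set<String>>> intervalToCreatedToSegments,
      Map<String, String> lockVersions) {
    Set<String> allSegmentsToBeReplaced = new LinkedHashSet<>();
    for (Map.Entry<String, Map<String, Set<String>>> entry : intervalToCreatedToSegments.entrySet()) {
      String lockVersion = lockVersions.get(entry.getKey());
      for (Map.Entry<String, Set<String>> createdAndSegments : entry.getValue().entrySet()) {
        if (lockVersion == null || lockVersion.compareTo(createdAndSegments.getKey()) > 0) {
          allSegmentsToBeReplaced.addAll(createdAndSegments.getValue());
        } else {
          // Log instead of silently dropping: these segments were created after the lock.
          System.out.println("Skipping segments created after REPLACE lock: "
              + createdAndSegments.getValue());
        }
      }
    }
    return allSegmentsToBeReplaced;
  }

  public static void main(String[] args) {
    Map<String, Map<String, Set<String>>> grouped = Map.of(
        "2023-10-04/2023-10-05", Map.of(
            "2023-10-04T00:00:00Z", Set.of("S1", "S2"),
            "2023-10-04T02:00:00Z", Set.of("S4")));
    Map<String, String> locks = Map.of("2023-10-04/2023-10-05", "2023-10-04T01:00:00Z");
    System.out.println(filter(grouped, locks)); // S1 and S2 kept, S4 skipped and logged
  }
}
```

Grouping by created date means the lock-version comparison runs once per (interval, created_date) pair instead of once per segment.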

@AmatyaAvadhanula
Contributor Author

@kfaraz @abhishekagarwal87 Thank you for the review!

@AmatyaAvadhanula AmatyaAvadhanula merged commit a8febd4 into apache:master Oct 19, 2023
81 checks passed
AmatyaAvadhanula added a commit to AmatyaAvadhanula/druid that referenced this pull request Oct 19, 2023
…ck (apache#15085)

* Replacing tasks must read segments created before they acquired their locks
AmatyaAvadhanula added a commit that referenced this pull request Oct 19, 2023
…ck (#15085) (#15208)

* Replacing tasks must read segments created before they acquired their locks
@kfaraz kfaraz deleted the fixTransactionReplaceCommit branch November 7, 2023 04:15
CaseyPan pushed a commit to CaseyPan/druid that referenced this pull request Nov 17, 2023
…ck (apache#15085)

* Replacing tasks must read segments created before they acquired their locks