Filter pending segments upgraded with transactional replace #15169
Left some comments.
Also, I would prefer the name activeRealtimeSequencePrefixes as it avoids ambiguity.
@Override
public Set<String> getActiveBaseSequenceNames()
{
  throw new UnsupportedOperationException("Get Active sequence names is not supported in MaterializedViewSupervisor");
Nit: Probably don't need an explicit message here.
Please fix this as the message is redundant.
Fixed. Missed this earlier
.../main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -345,10 +345,12 @@ SegmentPublishResult commitReplaceSegments(
* </ul>
*
* @param replaceSegments Segments being committed by a REPLACE task
* @param activeBaseSequenceNames of base sequence names of active / pending completion task groups of the supervisor
- * @param activeBaseSequenceNames of base sequence names of active / pending completion task groups of the supervisor
+ * @param activeBaseSequenceNames Set of base sequence names of active and pending completion task groups of the supervisor (if any) for this datasource
Done
@@ -660,6 +662,18 @@ private Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegmen
: overlappingPendingSegments.entrySet()) {
final SegmentIdWithShardSpec pendingSegmentId = overlappingPendingSegment.getKey();
final String pendingSegmentSequence = overlappingPendingSegment.getValue();

boolean considerSequence = false;
Based on the offline discussion, I think we had decided to include the sequence name in the SELECT query itself, rather than filtering in-memory here.
Done
.bind("sequence_name", sequenceName)
.bind("sequence_prev_id", pendingSegment.toString())
.bind("sequence_name_prev_id_sha1", BaseEncoding.base16().encode(
Hashing.sha1()
Check notice: Code scanning / CodeQL: Deprecated method or constructor invocation (Hashing.sha1) [Note, test]
New approach looks better, just need some code cleanup for approval.
@@ -217,6 +217,13 @@ public void testMaterializedViewSupervisorSpecCreated()
Assert.assertTrue(e instanceof UnsupportedOperationException);
}

try {
Use Assert.assertThrows instead and update the existing assertions in this method too.
Done
@@ -110,6 +111,14 @@ public Optional<String> getActiveSupervisorIdForDatasourceWithAppendLock(String
return Optional.absent();
}

public Set<String> getActiveRealtimeSequencePrefixes(String activeSupervisorId)
{
  if (!supervisors.containsKey(activeSupervisorId)) {
Please invert the if condition:
if contains:
return value
else
return empty
Done
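The inverted condition requested above might look like the following minimal sketch. `SupervisorRegistrySketch` and its `register` helper are hypothetical stand-ins used only for illustration, and the map of supervisor id to prefixes is an assumed shape, not the actual Druid data structure.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the supervisor manager; not the actual Druid class.
final class SupervisorRegistrySketch
{
  // Assumed shape: supervisor id -> sequence prefixes of its active task groups.
  private final Map<String, Set<String>> prefixesBySupervisor = new HashMap<>();

  // Hypothetical helper so the sketch can be exercised on its own.
  void register(String supervisorId, Set<String> prefixes)
  {
    prefixesBySupervisor.put(supervisorId, prefixes);
  }

  Set<String> getActiveRealtimeSequencePrefixes(String activeSupervisorId)
  {
    // Positive branch first: return the value when the supervisor is known.
    if (prefixesBySupervisor.containsKey(activeSupervisorId)) {
      return prefixesBySupervisor.get(activeSupervisorId);
    } else {
      // Unknown supervisor: nothing is active, so return an empty set.
      return Collections.emptySet();
    }
  }
}
```

Putting the "contains" case first keeps the common path at the top and leaves the empty result as the explicit fallback.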
@@ -347,10 +347,13 @@ SegmentPublishResult commitReplaceSegments(
* </ul>
*
* @param replaceSegments Segments being committed by a REPLACE task
* @param activeRealtimeSequencePrefixes Set of base sequence names of active and pending completion task groups
Please update the javadoc to say prefixes instead of sequence names.
Done
.../main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
*/
@VisibleForTesting
Avoid using this. It is okay to just test the method which invokes this internally.
The method invoking this internally would ignore segments that are not needed eventually, and it would be hard to test this optimization.
Sorry, it seems like it filters only segments that have already been upgraded, not all the unnecessary segments. I can make this change.
}

final List<String> sequenceNamePrefixes = new ArrayList<>(sequenceNamePrefixFilter);
StringBuilder sql = new StringBuilder(
Use .append instead of + as it is already a StringBuilder.
It would be much easier to read and understand the SQL if it were written using a format string:
SELECT sequence, payload
FROM %s
WHERE dataSource=:dataSource
AND start < :end
AND end > :start
AND (:sequenceFilterCondition)
Then build a separate string for sequenceFilterCondition, which would basically be a bunch of filters tied by ORs.
Done. Thanks!
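For illustration, the format-string approach suggested above could be sketched as follows. This is only a sketch under assumptions: the class name, the method names, and the table name passed in are hypothetical, and the query uses the usual interval-overlap test (start < :end and end > :start). Identifier quoting for `end`, which is a reserved word in some databases, is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; names are illustrative, not the actual Druid code.
final class PendingSegmentQuerySketch
{
  // Builds "(sequence_name LIKE :prefix0) OR (sequence_name LIKE :prefix1) ..."
  static String buildSequenceFilterCondition(int prefixCount)
  {
    if (prefixCount < 1) {
      // An empty condition would yield invalid SQL ("AND ()"), so reject it here.
      throw new IllegalArgumentException("At least one prefix is required");
    }
    List<String> clauses = new ArrayList<>();
    for (int i = 0; i < prefixCount; i++) {
      clauses.add(String.format("(sequence_name LIKE :prefix%d)", i));
    }
    return String.join(" OR ", clauses);
  }

  // Assembles the full query; the :name placeholders are bound later by the caller.
  static String buildQuery(String tableName, int prefixCount)
  {
    return String.format(
        "SELECT sequence_name, payload FROM %s"
        + " WHERE dataSource = :dataSource"
        + " AND start < :end AND end > :start"
        + " AND (%s)",
        tableName,
        buildSequenceFilterCondition(prefixCount)
    );
  }
}
```

Keeping the OR-joined condition in its own method makes the main query readable at a glance and avoids special-casing the last clause.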
for (int i = 1; i < sequenceNamePrefixes.size(); i++) {
  sql.append("(sequence_name LIKE ")
     .append(StringUtils.format(":prefix%d", i))
     .append(")")
     .append(" OR ");
}

sql.append("(sequence_name LIKE ")
   .append(StringUtils.format(":prefix%d", sequenceNamePrefixes.size()))
   .append(")");
Cleaner to just do this for i = 0 to sequenceNamePrefixes.size():
- for (int i = 1; i < sequenceNamePrefixes.size(); i++) {
-   sql.append("(sequence_name LIKE ")
-      .append(StringUtils.format(":prefix%d", i))
-      .append(")")
-      .append(" OR ");
- }
- sql.append("(sequence_name LIKE ")
-    .append(StringUtils.format(":prefix%d", sequenceNamePrefixes.size()))
-    .append(")");
+ sequenceFilterCondition.append(
+   StringUtils.format("(sequence_name LIKE :prefix%d)", i)
+ )
Done
.bind("used", usedSegments.contains(segment))
.bind("payload", jsonMapper.writeValueAsBytes(segment))
.bind("used_status_last_updated", now);
.bind("id", segment.getId().toString())
Since this is a sensitive PR for a new feature, let's revert all these formatting changes for now. Better to do reformatting in refactor PRs.
Please revert all formatting changes in other lines in this class too.
Ah sorry, I must have auto-indented the whole file.
.bind("end", interval.getEnd().toString());

for (int i = 1; i <= sequenceNamePrefixes.size(); i++) {
  query.bind(StringUtils.format("prefix%d", i), sequenceNamePrefixes.get(i - 1) + "%");
What do we need the % for in sequenceNamePrefixes.get(i - 1) + "%"?
Because we want the sequence name to begin with the sequence name prefix, not just be equal to it.
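To make the role of the trailing `%` concrete, here is a small sketch of how the bind values could be prepared. `PrefixBindingSketch` and `likeBindings` are hypothetical names, and the zero-based `prefixN` parameter naming is an assumption for illustration. In SQL, `LIKE 'abc%'` matches any value that starts with `abc`, whereas `LIKE 'abc'` without the wildcard only matches the exact string.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper; illustrates the bind values only, not the actual Druid code.
final class PrefixBindingSketch
{
  // Maps bind-parameter names (prefix0, prefix1, ...) to LIKE patterns.
  static Map<String, String> likeBindings(List<String> sequenceNamePrefixes)
  {
    Map<String, String> bindings = new LinkedHashMap<>();
    for (int i = 0; i < sequenceNamePrefixes.size(); i++) {
      // The trailing % is the LIKE wildcard for "zero or more characters",
      // which turns an equality match into a starts-with match.
      bindings.put("prefix" + i, sequenceNamePrefixes.get(i) + "%");
    }
    return bindings;
  }
}
```

One caveat worth keeping in mind: a prefix that itself contains `%` or `_` would be interpreted as a wildcard by LIKE unless it is escaped.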
…5169) * Filter pending segments upgraded with transactional replace * Push sequence name filter to metadata query
The pending segment upgrade introduced as part of #15039 upgrades all pending segments for a given interval.
This PR aims to optimize it by only upgrading those pending segments for the interval which correspond to active tasks of the supervisor.
This can be done by fetching the baseSequenceNames of the active and pending completion task groups of the supervisor corresponding to the datasource, and upgrading only those pending segments whose sequence_name has one of those baseSequenceNames as a prefix.
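The selection rule described above can be illustrated with a small in-memory sketch. This only demonstrates the prefix semantics. As the review discussion shows, the PR pushes the filter into the metadata query rather than filtering in memory. The class name, method names, and sample sequence names below are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of the prefix rule; not the actual Druid implementation.
final class PendingSegmentFilterSketch
{
  // True if the sequence name starts with any of the active base sequence names.
  static boolean hasActivePrefix(String sequenceName, Set<String> activePrefixes)
  {
    for (String prefix : activePrefixes) {
      if (sequenceName.startsWith(prefix)) {
        return true;
      }
    }
    return false;
  }

  // Keeps only the pending-segment sequence names that belong to active task groups.
  static List<String> sequencesToUpgrade(List<String> pendingSequenceNames, Set<String> activePrefixes)
  {
    List<String> result = new ArrayList<>();
    for (String sequenceName : pendingSequenceNames) {
      if (hasActivePrefix(sequenceName, activePrefixes)) {
        result.add(sequenceName);
      }
    }
    return result;
  }
}
```

With an active prefix of `index_kafka_wiki_a`, a pending segment with sequence name `index_kafka_wiki_a_0` would be kept for upgrade, while one named `index_kafka_wiki_b_0` would be skipped.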
This PR has: