Filter pending segments upgraded with transactional replace #15169

Merged

Contributor

@AmatyaAvadhanula commented Oct 16, 2023

The pending segment upgrade introduced in #15039 upgrades all pending segments for a given interval.
This PR optimizes it by upgrading only those pending segments in the interval that correspond to active tasks of the supervisor.
This is done by fetching the baseSequenceNames of the supervisor's active and pending-completion task groups for the datasource, and upgrading only those pending segments whose sequence_name has one of these baseSequenceNames as a prefix.
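
As a rough illustration of the prefix check described above, here is a minimal in-memory sketch. It is not the code in this PR: the class name, the method name, and the use of plain strings for pending segment ids are all assumptions made for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Illustrative sketch only; names and types are hypothetical, not Druid's. */
final class PendingSegmentPrefixFilter
{
  /**
   * Keeps only the entries whose sequence name starts with one of the active prefixes.
   * Keys are stand-ins for pending segment ids, values are their sequence names.
   */
  static Map<String, String> filterByActivePrefixes(
      Map<String, String> pendingSegmentToSequenceName,
      Set<String> activePrefixes
  )
  {
    final Map<String, String> result = new LinkedHashMap<>();
    for (Map.Entry<String, String> entry : pendingSegmentToSequenceName.entrySet()) {
      final String sequenceName = entry.getValue();
      final boolean matches = activePrefixes.stream().anyMatch(prefix -> sequenceName.startsWith(prefix));
      if (matches) {
        result.put(entry.getKey(), sequenceName);
      }
    }
    return result;
  }

  public static void main(String[] args)
  {
    final Map<String, String> pending = new LinkedHashMap<>();
    pending.put("seg-1", "seqA_0");   // belongs to an active task group (made-up name)
    pending.put("seg-2", "seqOld_3"); // left over from a task group that is no longer active
    System.out.println(filterByActivePrefixes(pending, Set.of("seqA")));
  }
}
```

Only `seg-1` survives the filter here, because its sequence name begins with the single active prefix `seqA`.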

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@AmatyaAvadhanula added this to the 28.0 milestone Oct 16, 2023
Contributor

@kfaraz left a comment

Left some comments.

Also, I would prefer the name activeRealtimeSequencePrefixes as it avoids ambiguity.

@Override
public Set<String> getActiveBaseSequenceNames()
{
throw new UnsupportedOperationException("Get Active sequence names is not supported in MaterializedViewSupervisor");
Contributor

Nit: Probably don't need an explicit message here.

Contributor

Please fix this as the message is redundant.

Contributor Author

Fixed. Missed this earlier

@@ -345,10 +345,12 @@ SegmentPublishResult commitReplaceSegments(
* </ul>
*
* @param replaceSegments Segments being committed by a REPLACE task
* @param activeBaseSequenceNames of base sequence names of active / pending completion task groups of the supervisor
Contributor

Suggested change
* @param activeBaseSequenceNames of base sequence names of active / pending completion task groups of the supervisor
* @param activeBaseSequenceNames Set of base sequence names of active and pending completion task groups of the supervisor (if any) for this datasource

Contributor Author

Done

@@ -660,6 +662,18 @@ private Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegmen
: overlappingPendingSegments.entrySet()) {
final SegmentIdWithShardSpec pendingSegmentId = overlappingPendingSegment.getKey();
final String pendingSegmentSequence = overlappingPendingSegment.getValue();

boolean considerSequence = false;
Contributor

Based on the offline discussion, I think we had decided to include the sequence name in the SELECT query itself, rather than filtering in-memory here.

Contributor Author

Done

.bind("sequence_name", sequenceName)
.bind("sequence_prev_id", pendingSegment.toString())
.bind("sequence_name_prev_id_sha1", BaseEncoding.base16().encode(
Hashing.sha1()

Check notice (Code scanning / CodeQL): Deprecated method or constructor invocation

Invoking Hashing.sha1 should be avoided because it has been deprecated.
Contributor

@kfaraz left a comment

New approach looks better, just need some code cleanup for approval.

@@ -217,6 +217,13 @@ public void testMaterializedViewSupervisorSpecCreated()
Assert.assertTrue(e instanceof UnsupportedOperationException);
}

try {
Contributor

Use Assert.assertThrows instead and update the existing assertions in this method too.

Contributor Author

Done

@Override
public Set<String> getActiveBaseSequenceNames()
{
throw new UnsupportedOperationException("Get Active sequence names is not supported in MaterializedViewSupervisor");
Contributor

Please fix this as the message is redundant.

@@ -110,6 +111,14 @@ public Optional<String> getActiveSupervisorIdForDatasourceWithAppendLock(String
return Optional.absent();
}

public Set<String> getActiveRealtimeSequencePrefixes(String activeSupervisorId)
{
if (!supervisors.containsKey(activeSupervisorId)) {
Contributor

Please invert the if condition:

if contains:
   return value
else
  return empty
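
The inverted condition suggested above might look roughly like the following. This is a sketch under assumptions: the `prefixesBySupervisor` map and the `register` helper are hypothetical stand-ins for the real supervisor registry, and only the method name `getActiveRealtimeSequencePrefixes` comes from the diff.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Illustrative sketch only; the backing map is a hypothetical stand-in for the supervisor registry. */
final class SupervisorPrefixLookup
{
  // Hypothetical: supervisor id -> sequence prefixes of its active and pending-completion task groups.
  private final Map<String, Set<String>> prefixesBySupervisor = new HashMap<>();

  void register(String supervisorId, Set<String> prefixes)
  {
    prefixesBySupervisor.put(supervisorId, prefixes);
  }

  Set<String> getActiveRealtimeSequencePrefixes(String activeSupervisorId)
  {
    // Positive case first: return the value when the supervisor is known.
    if (prefixesBySupervisor.containsKey(activeSupervisorId)) {
      return prefixesBySupervisor.get(activeSupervisorId);
    } else {
      return Collections.emptySet();
    }
  }
}
```

Putting the positive branch first avoids the negated `containsKey` check and keeps the fallback (an empty set) visibly last.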

Contributor Author

Done

@@ -347,10 +347,13 @@ SegmentPublishResult commitReplaceSegments(
* </ul>
*
* @param replaceSegments Segments being committed by a REPLACE task
* @param activeRealtimeSequencePrefixes Set of base sequence names of active and pending completion task groups
Contributor

Please update the javadoc to say prefixes instead of sequence names.

Contributor Author

Done

*/
@VisibleForTesting
Contributor

Avoid using this. It is okay to just test the method which invokes this internally.

Contributor Author

The method invoking this internally would ignore segments that are not needed eventually, and it would be hard to test this optimization.

Contributor Author

Sorry, it seems that it filters out segments that have already been upgraded, not all the unnecessary segments. I can make this change.

}

final List<String> sequenceNamePrefixes = new ArrayList<>(sequenceNamePrefixFilter);
StringBuilder sql = new StringBuilder(
Contributor

Use .append instead of + as it is already a StringBuilder.

Contributor

It would be much easier to read and understand the SQL if it were written using a format string.

SELECT sequence, payload
FROM %s
WHERE dataSource=:dataSource
AND start < :end
AND end > :start
AND (:sequenceFilterCondition)

Then build a separate string for sequenceFilterCondition which would basically be a bunch of filters tied by ORs.
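
A minimal sketch of that idea, assuming nothing about the final code in this PR: the class name, method names, table name, and selected columns are placeholders, and the interval filter is written as the usual overlap test (`start < :end AND end > :start`). A real query may also need to quote the `end` column, which is a reserved word in some databases.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

/** Illustrative sketch only; names are hypothetical and this is not the query used by Druid. */
final class PendingSegmentQueryBuilder
{
  /** Builds "(sequence_name LIKE :prefix0) OR (sequence_name LIKE :prefix1) ..." */
  static String buildSequenceFilterCondition(int prefixCount)
  {
    if (prefixCount < 1) {
      // An empty condition would produce invalid SQL; callers should skip the query instead.
      throw new IllegalArgumentException("At least one prefix is required");
    }
    final List<String> conditions = new ArrayList<>();
    for (int i = 0; i < prefixCount; i++) {
      conditions.add(String.format(Locale.ROOT, "(sequence_name LIKE :prefix%d)", i));
    }
    return String.join(" OR ", conditions);
  }

  /** Fills the table name and the OR-ed prefix filter into a single format string. */
  static String buildQuery(String table, int prefixCount)
  {
    return String.format(
        Locale.ROOT,
        "SELECT sequence_name, payload FROM %s"
        + " WHERE dataSource = :dataSource"
        + " AND start < :end AND end > :start"
        + " AND (%s)",
        table,
        buildSequenceFilterCondition(prefixCount)
    );
  }

  public static void main(String[] args)
  {
    // prints "(sequence_name LIKE :prefix0) OR (sequence_name LIKE :prefix1)"
    System.out.println(buildSequenceFilterCondition(2));
    System.out.println(buildQuery("pending_segments_table", 2));
  }
}
```

Joining the per-prefix conditions with `String.join` removes the need to special-case the last element, which was the source of the duplicated `append` chain in the earlier revision.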

Contributor Author

Done. Thanks!

Comment on lines 286 to 295
for (int i = 1; i < sequenceNamePrefixes.size(); i++) {
sql.append("(sequence_name LIKE ")
.append(StringUtils.format(":prefix%d", i))
.append(")")
.append(" OR ");
}

sql.append("(sequence_name LIKE ")
.append(StringUtils.format(":prefix%d", sequenceNamePrefixes.size()))
.append(")");
Contributor

Cleaner to just do this for i = 0 to sequenceNamePrefixes.size():

Suggested change
for (int i = 1; i < sequenceNamePrefixes.size(); i++) {
sql.append("(sequence_name LIKE ")
.append(StringUtils.format(":prefix%d", i))
.append(")")
.append(" OR ");
}
sql.append("(sequence_name LIKE ")
.append(StringUtils.format(":prefix%d", sequenceNamePrefixes.size()))
.append(")");
sequenceFilterCondition.append(
StringUtils.format("(sequence_name LIKE :prefix%d)", i)
)

Contributor Author

Done

.bind("used", usedSegments.contains(segment))
.bind("payload", jsonMapper.writeValueAsBytes(segment))
.bind("used_status_last_updated", now);
.bind("id", segment.getId().toString())
Contributor

Since this is a sensitive PR for a new feature, let's revert all these formatting changes for now. Better to do reformatting in refactor PRs.

Please revert all formatting changes in other lines in this class too.

Contributor Author

Ah sorry, I must have auto-indented the whole file

.bind("end", interval.getEnd().toString());

for (int i = 1; i <= sequenceNamePrefixes.size(); i++) {
query.bind(StringUtils.format("prefix%d", i), sequenceNamePrefixes.get(i - 1) + "%");
Contributor

What do we need the % for in sequenceNamePrefixes.get(i - 1) + "%"?

Contributor Author

Because we want the sequence name to begin with the prefix, not just be equal to it.
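
To make the role of the trailing `%` concrete: in SQL `LIKE`, `%` matches any run of zero or more characters, so a pattern such as `seqA%` matches every value that starts with `seqA`, whereas the pattern `seqA` without a wildcard matches only that exact value. The sketch below computes the bound values; it is illustrative only, uses zero-based parameter names (the code under review numbers them from 1), and its class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative sketch only; not the binding code from this PR. */
final class PrefixBindings
{
  /** Maps each named parameter (prefix0, prefix1, ...) to a LIKE pattern matching "starts with prefix". */
  static Map<String, String> toLikeBindings(List<String> sequenceNamePrefixes)
  {
    final Map<String, String> bindings = new LinkedHashMap<>();
    for (int i = 0; i < sequenceNamePrefixes.size(); i++) {
      // The trailing % turns an equality-like match into a prefix match.
      bindings.put("prefix" + i, sequenceNamePrefixes.get(i) + "%");
    }
    return bindings;
  }
}
```

One caveat worth keeping in mind: if a prefix itself contains `%` or `_`, those characters also act as wildcards in `LIKE` unless they are escaped.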

@AmatyaAvadhanula merged commit 65b69cd into apache:master Oct 23, 2023
82 checks passed
AmatyaAvadhanula added a commit to AmatyaAvadhanula/druid that referenced this pull request Oct 23, 2023
…5169)

* Filter pending segments upgraded with transactional replace

* Push sequence name filter to metadata query
LakshSingla pushed a commit that referenced this pull request Oct 23, 2023
…15234)

* Filter pending segments upgraded with transactional replace

* Push sequence name filter to metadata query
CaseyPan pushed a commit to CaseyPan/druid that referenced this pull request Nov 17, 2023
…5169)

* Filter pending segments upgraded with transactional replace

* Push sequence name filter to metadata query