-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Filter pending segments upgraded with transactional replace #15169
Changes from 8 commits
37613c6
368312e
548c4c0
9120210
c100b02
156d809
53d509a
7b27f4b
e03329e
daf0337
92a853c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -217,6 +217,13 @@ public void testMaterializedViewSupervisorSpecCreated() | |
Assert.assertTrue(e instanceof UnsupportedOperationException); | ||
} | ||
|
||
try { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
supervisor.getActiveRealtimeSequencePrefixes(); | ||
} | ||
catch (Exception e) { | ||
Assert.assertTrue(e instanceof UnsupportedOperationException); | ||
} | ||
|
||
Callable<Integer> noop = new Callable<Integer>() { | ||
@Override | ||
public Integer call() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -39,6 +39,7 @@ | |
|
||
import javax.annotation.Nullable; | ||
|
||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Set; | ||
|
@@ -110,6 +111,14 @@ public Optional<String> getActiveSupervisorIdForDatasourceWithAppendLock(String | |
return Optional.absent(); | ||
} | ||
|
||
public Set<String> getActiveRealtimeSequencePrefixes(String activeSupervisorId) | ||
{ | ||
if (!supervisors.containsKey(activeSupervisorId)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please invert the if condition:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
return Collections.emptySet(); | ||
} | ||
return supervisors.get(activeSupervisorId).lhs.getActiveRealtimeSequencePrefixes(); | ||
} | ||
|
||
public Optional<SupervisorSpec> getSupervisorSpec(String id) | ||
{ | ||
Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(id); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -347,10 +347,13 @@ SegmentPublishResult commitReplaceSegments( | |
* </ul> | ||
* | ||
* @param replaceSegments Segments being committed by a REPLACE task | ||
* @param activeRealtimeSequencePrefixes Set of base sequence names of active and pending completion task groups | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please update the javadoc to say prefixes instead of sequence names. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
* of the supervisor (if any) for this datasource | ||
* @return Map from originally allocated pending segment to its new upgraded ID. | ||
*/ | ||
Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegmentsOverlappingWith( | ||
Set<DataSegment> replaceSegments | ||
Set<DataSegment> replaceSegments, | ||
Set<String> activeRealtimeSequencePrefixes | ||
); | ||
|
||
/** | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,6 +29,7 @@ | |
import javax.annotation.Nullable; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Set; | ||
|
||
public interface Supervisor | ||
{ | ||
|
@@ -93,4 +94,9 @@ default Boolean isHealthy() | |
LagStats computeLagStats(); | ||
|
||
int getActiveTaskGroupsCount(); | ||
|
||
/** | ||
* @return active base sequence names for reading and pending completion task groups of a seekable stream supervisor | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Update: prefixes instead of sequence names. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
*/ | ||
Set<String> getActiveRealtimeSequencePrefixes(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Probably don't need an explicit message here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please fix this as the message is redundant.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed. Missed this earlier