-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
topn with granularity regression fixes #17565
Changes from 9 commits
ecd7a5c
76d1c6c
2f61031
3d986f1
129f381
63c4376
f8f07dd
d4fcf84
9ba9679
9728e6a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,6 +24,7 @@ | |
import com.google.common.collect.Lists; | ||
import org.apache.druid.error.DruidException; | ||
import org.apache.druid.java.util.common.DateTimes; | ||
import org.apache.druid.java.util.common.Intervals; | ||
import org.apache.druid.java.util.common.granularity.Granularities; | ||
import org.apache.druid.java.util.common.granularity.Granularity; | ||
import org.apache.druid.segment.ColumnValueSelector; | ||
|
@@ -94,11 +95,13 @@ public static CursorGranularizer create( | |
timeSelector = cursor.getColumnSelectorFactory().makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME); | ||
} | ||
|
||
return new CursorGranularizer(cursor, bucketIterable, timeSelector, timeOrder == Order.DESCENDING); | ||
return new CursorGranularizer(cursor, granularity, bucketIterable, timeSelector, timeOrder == Order.DESCENDING); | ||
} | ||
|
||
private final Cursor cursor; | ||
|
||
private final Granularity granularity; | ||
|
||
// Iterable that iterates over time buckets. | ||
private final Iterable<Interval> bucketIterable; | ||
|
||
|
@@ -112,12 +115,14 @@ public static CursorGranularizer create( | |
|
||
private CursorGranularizer( | ||
Cursor cursor, | ||
Granularity granularity, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @clintropolis Do we need this field ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
Iterable<Interval> bucketIterable, | ||
@Nullable ColumnValueSelector timeSelector, | ||
boolean descending | ||
) | ||
{ | ||
this.cursor = cursor; | ||
this.granularity = granularity; | ||
this.bucketIterable = bucketIterable; | ||
this.timeSelector = timeSelector; | ||
this.descending = descending; | ||
|
@@ -133,13 +138,18 @@ public DateTime getBucketStart() | |
return DateTimes.utc(currentBucketStart); | ||
} | ||
|
||
public Interval getCurrentInterval() | ||
{ | ||
return Intervals.utc(currentBucketStart, currentBucketEnd); | ||
} | ||
|
||
public boolean advanceToBucket(final Interval bucketInterval) | ||
{ | ||
currentBucketStart = bucketInterval.getStartMillis(); | ||
currentBucketEnd = bucketInterval.getEndMillis(); | ||
if (cursor.isDone()) { | ||
return false; | ||
} | ||
currentBucketStart = bucketInterval.getStartMillis(); | ||
currentBucketEnd = bucketInterval.getEndMillis(); | ||
if (timeSelector == null) { | ||
return true; | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -89,6 +89,7 @@ public class QueryContexts | |
public static final String UNCOVERED_INTERVALS_LIMIT_KEY = "uncoveredIntervalsLimit"; | ||
public static final String MIN_TOP_N_THRESHOLD = "minTopNThreshold"; | ||
public static final String CATALOG_VALIDATION_ENABLED = "catalogValidationEnabled"; | ||
public static final String TOPN_USE_MULTI_PASS_POOLED_QUERY_GRANULARITY = "topNuseMultiPassPooledQueryGranularity"; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add a javadoc comment here explaining what this is for, and linking to this PR (or a related issue). It's helpful to have that kind of thing for any undocumented parameter. |
||
/** | ||
* Context parameter to enable/disable the extended filtered sum rewrite logic. | ||
* | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -391,7 +391,7 @@ public boolean hasNext() | |
if (delegate != null && delegate.hasNext()) { | ||
return true; | ||
} else { | ||
if (!cursor.isDone() && granularizer.currentOffsetWithinBucket()) { | ||
if (granularizer.currentOffsetWithinBucket()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. was the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yea, i noticed granularizer.currentOffsetWithinBucket also checks isDone so it seemed nicer this way, this didn't cause any issues, just noticed while i was looking over stuff |
||
if (delegate != null) { | ||
delegate.close(); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -54,25 +54,27 @@ public long scanAndAggregate( | |
{ | ||
long processedRows = 0; | ||
int positionToAllocate = 0; | ||
while (!cursor.isDoneOrInterrupted()) { | ||
final IndexedInts dimValues = dimensionSelector.getRow(); | ||
final int dimSize = dimValues.size(); | ||
for (int i = 0; i < dimSize; i++) { | ||
int dimIndex = dimValues.get(i); | ||
int position = positions[dimIndex]; | ||
if (position >= 0) { | ||
aggregator.aggregate(resultsBuffer, position); | ||
} else if (position == TopNAlgorithm.INIT_POSITION_VALUE) { | ||
positions[dimIndex] = positionToAllocate; | ||
position = positionToAllocate; | ||
aggregator.init(resultsBuffer, position); | ||
aggregator.aggregate(resultsBuffer, position); | ||
positionToAllocate += aggregatorSize; | ||
if (granularizer.currentOffsetWithinBucket()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. was the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yea, this was a bug, i did it as an if statement wrapping the loop so we wouldn't need to check |
||
while (!cursor.isDoneOrInterrupted()) { | ||
final IndexedInts dimValues = dimensionSelector.getRow(); | ||
final int dimSize = dimValues.size(); | ||
for (int i = 0; i < dimSize; i++) { | ||
int dimIndex = dimValues.get(i); | ||
int position = positions[dimIndex]; | ||
if (position >= 0) { | ||
aggregator.aggregate(resultsBuffer, position); | ||
} else if (position == TopNAlgorithm.INIT_POSITION_VALUE) { | ||
positions[dimIndex] = positionToAllocate; | ||
position = positionToAllocate; | ||
aggregator.init(resultsBuffer, position); | ||
aggregator.aggregate(resultsBuffer, position); | ||
positionToAllocate += aggregatorSize; | ||
} | ||
} | ||
processedRows++; | ||
if (!granularizer.advanceCursorWithinBucketUninterruptedly()) { | ||
break; | ||
} | ||
} | ||
processedRows++; | ||
if (!granularizer.advanceCursorWithinBucketUninterruptedly()) { | ||
break; | ||
} | ||
} | ||
return processedRows; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why change this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
was trying to see if could allow the tests to pass without setting the new context parameter, but it didn't seem to help, so changed to use the new context parameter.