Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

topn with granularity regression fixes #17565

Merged
merged 10 commits into from
Dec 17, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public static CursorGranularizer create(
timeSelector = cursor.getColumnSelectorFactory().makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME);
}

return new CursorGranularizer(cursor, bucketIterable, timeSelector, timeOrder == Order.DESCENDING);
return new CursorGranularizer(cursor, granularity, bucketIterable, timeSelector, timeOrder == Order.DESCENDING);
}

private final Cursor cursor;
Expand All @@ -109,20 +109,28 @@ public static CursorGranularizer create(

private long currentBucketStart;
private long currentBucketEnd;
private final Granularity granularity;

private CursorGranularizer(
Cursor cursor,
Granularity granularity,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@clintropolis Do we need this field ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Iterable<Interval> bucketIterable,
@Nullable ColumnValueSelector timeSelector,
boolean descending
)
{
this.cursor = cursor;
this.granularity = granularity;
this.bucketIterable = bucketIterable;
this.timeSelector = timeSelector;
this.descending = descending;
}

public Granularity getGranularity()
{
return granularity;
}

public Iterable<Interval> getBucketIterable()
{
return bucketIterable;
Expand All @@ -135,11 +143,11 @@ public DateTime getBucketStart()

public boolean advanceToBucket(final Interval bucketInterval)
{
currentBucketStart = bucketInterval.getStartMillis();
currentBucketEnd = bucketInterval.getEndMillis();
if (cursor.isDone()) {
return false;
}
currentBucketStart = bucketInterval.getStartMillis();
currentBucketEnd = bucketInterval.getEndMillis();
if (timeSelector == null) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ public boolean hasNext()
if (delegate != null && delegate.hasNext()) {
return true;
} else {
if (!cursor.isDone() && granularizer.currentOffsetWithinBucket()) {
if (granularizer.currentOffsetWithinBucket()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was the cursor.isDone() here removed simply because it's redundant?

Copy link
Member Author

@clintropolis clintropolis Dec 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea, i noticed granularizer.currentOffsetWithinBucket also checks isDone so it seemed nicer this way, this didn't cause any issues, just noticed while i was looking over stuff

if (delegate != null) {
delegate.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
package org.apache.druid.query.topn;

import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.BufferAggregator;
Expand Down Expand Up @@ -97,12 +99,20 @@ private void runWithCardinalityKnown(
}
boolean hasDimValSelector = (dimValSelector != null);

int cardinality = params.getCardinality();
final int cardinality = params.getCardinality();
final int numValuesPerPass = params.getNumValuesPerPass();
// sanity check to ensure that we only do multi-pass with ALL granularity
if (numValuesPerPass < cardinality && !Granularities.ALL.equals(params.getGranularizer().getGranularity())) {
throw DruidException.defensive(
"runWithCardinalityKnown can only be used for ALL granularity if multiple-passes are required"
);
}
int numProcessed = 0;
long processedRows = 0;
while (numProcessed < cardinality) {
final int numToProcess;
int maxNumToProcess = Math.min(params.getNumValuesPerPass(), cardinality - numProcessed);
int maxNumToProcess = Math.min(numValuesPerPass, cardinality - numProcessed);


DimValSelector theDimValSelector;
if (!hasDimValSelector) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,25 +54,27 @@ public long scanAndAggregate(
{
long processedRows = 0;
int positionToAllocate = 0;
while (!cursor.isDoneOrInterrupted()) {
final IndexedInts dimValues = dimensionSelector.getRow();
final int dimSize = dimValues.size();
for (int i = 0; i < dimSize; i++) {
int dimIndex = dimValues.get(i);
int position = positions[dimIndex];
if (position >= 0) {
aggregator.aggregate(resultsBuffer, position);
} else if (position == TopNAlgorithm.INIT_POSITION_VALUE) {
positions[dimIndex] = positionToAllocate;
position = positionToAllocate;
aggregator.init(resultsBuffer, position);
aggregator.aggregate(resultsBuffer, position);
positionToAllocate += aggregatorSize;
if (granularizer.currentOffsetWithinBucket()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was the granularizer.currentOffsetWithinBucket() check added here (& the other prototypes) to enable skipping of empty granular buckets?

Copy link
Member Author

@clintropolis clintropolis Dec 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea, this was a bug, i did it as an if statement wrapping the loop so we wouldn't need to check currentOffsetWithinBucket() twice per loop since the granularizer advance methods also call currentOffsetWithinBucket

while (!cursor.isDoneOrInterrupted()) {
final IndexedInts dimValues = dimensionSelector.getRow();
final int dimSize = dimValues.size();
for (int i = 0; i < dimSize; i++) {
int dimIndex = dimValues.get(i);
int position = positions[dimIndex];
if (position >= 0) {
aggregator.aggregate(resultsBuffer, position);
} else if (position == TopNAlgorithm.INIT_POSITION_VALUE) {
positions[dimIndex] = positionToAllocate;
position = positionToAllocate;
aggregator.init(resultsBuffer, position);
aggregator.aggregate(resultsBuffer, position);
positionToAllocate += aggregatorSize;
}
}
processedRows++;
if (!granularizer.advanceCursorWithinBucketUninterruptedly()) {
break;
}
}
processedRows++;
if (!granularizer.advanceCursorWithinBucketUninterruptedly()) {
break;
}
}
return processedRows;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,29 +57,31 @@ public long scanAndAggregate(
int totalAggregatorsSize = aggregator1Size + aggregator2Size;
long processedRows = 0;
int positionToAllocate = 0;
while (!cursor.isDoneOrInterrupted()) {
final IndexedInts dimValues = dimensionSelector.getRow();
final int dimSize = dimValues.size();
for (int i = 0; i < dimSize; i++) {
int dimIndex = dimValues.get(i);
int position = positions[dimIndex];
if (position >= 0) {
aggregator1.aggregate(resultsBuffer, position);
aggregator2.aggregate(resultsBuffer, position + aggregator1Size);
} else if (position == TopNAlgorithm.INIT_POSITION_VALUE) {
positions[dimIndex] = positionToAllocate;
position = positionToAllocate;
aggregator1.init(resultsBuffer, position);
aggregator1.aggregate(resultsBuffer, position);
position += aggregator1Size;
aggregator2.init(resultsBuffer, position);
aggregator2.aggregate(resultsBuffer, position);
positionToAllocate += totalAggregatorsSize;
if (granularizer.currentOffsetWithinBucket()) {
while (!cursor.isDoneOrInterrupted()) {
final IndexedInts dimValues = dimensionSelector.getRow();
final int dimSize = dimValues.size();
for (int i = 0; i < dimSize; i++) {
int dimIndex = dimValues.get(i);
int position = positions[dimIndex];
if (position >= 0) {
aggregator1.aggregate(resultsBuffer, position);
aggregator2.aggregate(resultsBuffer, position + aggregator1Size);
} else if (position == TopNAlgorithm.INIT_POSITION_VALUE) {
positions[dimIndex] = positionToAllocate;
position = positionToAllocate;
aggregator1.init(resultsBuffer, position);
aggregator1.aggregate(resultsBuffer, position);
position += aggregator1Size;
aggregator2.init(resultsBuffer, position);
aggregator2.aggregate(resultsBuffer, position);
positionToAllocate += totalAggregatorsSize;
}
}
processedRows++;
if (!granularizer.advanceCursorWithinBucketUninterruptedly()) {
break;
}
}
processedRows++;
if (!granularizer.advanceCursorWithinBucketUninterruptedly()) {
break;
}
}
return processedRows;
Expand Down
Loading
Loading