Skip to content

Commit

Permalink
topn with granularity regression fixes
Browse files Browse the repository at this point in the history
changes:
* fix issue where topN with query granularity other than ALL would use the heap algorithm when it was actual able to use the pooled algorithm, and incorrectly used the pool algorithm in cases where it must use the heap algorithm, a regression from apache#16533
* fix issue where topN with query granularity other than ALL could incorrectly process values in the wrong time bucket, another regression from apache#16533
  • Loading branch information
clintropolis committed Dec 13, 2024
1 parent aca56d6 commit ecd7a5c
Show file tree
Hide file tree
Showing 10 changed files with 246 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public static CursorGranularizer create(
timeSelector = cursor.getColumnSelectorFactory().makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME);
}

return new CursorGranularizer(cursor, bucketIterable, timeSelector, timeOrder == Order.DESCENDING);
return new CursorGranularizer(cursor, granularity, bucketIterable, timeSelector, timeOrder == Order.DESCENDING);
}

private final Cursor cursor;
Expand All @@ -109,20 +109,28 @@ public static CursorGranularizer create(

private long currentBucketStart;
private long currentBucketEnd;
private final Granularity granularity;

private CursorGranularizer(
Cursor cursor,
Granularity granularity,
Iterable<Interval> bucketIterable,
@Nullable ColumnValueSelector timeSelector,
boolean descending
)
{
this.cursor = cursor;
this.granularity = granularity;
this.bucketIterable = bucketIterable;
this.timeSelector = timeSelector;
this.descending = descending;
}

public Granularity getGranularity()
{
return granularity;
}

public Iterable<Interval> getBucketIterable()
{
return bucketIterable;
Expand All @@ -135,11 +143,11 @@ public DateTime getBucketStart()

public boolean advanceToBucket(final Interval bucketInterval)
{
currentBucketStart = bucketInterval.getStartMillis();
currentBucketEnd = bucketInterval.getEndMillis();
if (cursor.isDone()) {
return false;
}
currentBucketStart = bucketInterval.getStartMillis();
currentBucketEnd = bucketInterval.getEndMillis();
if (timeSelector == null) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
package org.apache.druid.query.topn;

import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.BufferAggregator;
Expand Down Expand Up @@ -103,6 +105,12 @@ private void runWithCardinalityKnown(
while (numProcessed < cardinality) {
final int numToProcess;
int maxNumToProcess = Math.min(params.getNumValuesPerPass(), cardinality - numProcessed);
// sanity check to ensure that we only do multi-pass with ALL granularity
if (maxNumToProcess < cardinality && !Granularities.ALL.equals(params.getGranularizer().getGranularity())) {
throw DruidException.defensive(
"runWithCardinalityKnown can only be used for ALL granularity if multiple-passes are required"
);
}

DimValSelector theDimValSelector;
if (!hasDimValSelector) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public long scanAndAggregate(
{
long processedRows = 0;
int positionToAllocate = 0;
while (!cursor.isDoneOrInterrupted()) {
while (!cursor.isDoneOrInterrupted() && granularizer.currentOffsetWithinBucket()) {
final IndexedInts dimValues = dimensionSelector.getRow();
final int dimSize = dimValues.size();
for (int i = 0; i < dimSize; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public long scanAndAggregate(
int totalAggregatorsSize = aggregator1Size + aggregator2Size;
long processedRows = 0;
int positionToAllocate = 0;
while (!cursor.isDoneOrInterrupted()) {
while (!cursor.isDoneOrInterrupted() && granularizer.currentOffsetWithinBucket()) {
final IndexedInts dimValues = dimensionSelector.getRow();
final int dimSize = dimValues.size();
for (int i = 0; i < dimSize; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ private static long scanAndAggregateDefault(
final int aggExtra = aggSize % AGG_UNROLL_COUNT;
int currentPosition = 0;
long processedRows = 0;
while (!cursor.isDoneOrInterrupted()) {
while (!cursor.isDoneOrInterrupted() && granularizer.currentOffsetWithinBucket()) {
final IndexedInts dimValues = dimSelector.getRow();

final int dimSize = dimValues.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ protected long scanAndAggregate(
final DimensionSelector dimSelector = params.getDimSelector();

long processedRows = 0;
while (!cursor.isDone()) {
while (!cursor.isDone() && granularizer.currentOffsetWithinBucket()) {
final Object key = dimensionValueConverter.apply(dimSelector.lookupName(dimSelector.getRow().get(0)));

Aggregator[] theAggregators = aggregatesStore.computeIfAbsent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ private static boolean canUsePooledAlgorithm(
final int numBytesToWorkWith = resultsBuf.capacity();
final int numValuesPerPass = numBytesPerRecord > 0 ? numBytesToWorkWith / numBytesPerRecord : cardinality;

return numValuesPerPass <= cardinality;
return numValuesPerPass >= cardinality;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public long scanAndAggregate(
)
{
long processedRows = 0;
while (!cursor.isDone()) {
while (!cursor.isDone() && granularizer.currentOffsetWithinBucket()) {
if (hasNulls && selector.isNull()) {
if (nullValueAggregates == null) {
nullValueAggregates = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ private long scanAndAggregateWithCardinalityKnown(
)
{
long processedRows = 0;
while (!cursor.isDone()) {
while (!cursor.isDone() && granularizer.currentOffsetWithinBucket()) {
final IndexedInts dimValues = selector.getRow();
for (int i = 0, size = dimValues.size(); i < size; ++i) {
final int dimIndex = dimValues.get(i);
Expand Down Expand Up @@ -192,7 +192,7 @@ private long scanAndAggregateWithCardinalityUnknown(
)
{
long processedRows = 0;
while (!cursor.isDone()) {
while (!cursor.isDone() && granularizer.currentOffsetWithinBucket()) {
final IndexedInts dimValues = selector.getRow();
for (int i = 0, size = dimValues.size(); i < size; ++i) {
final int dimIndex = dimValues.get(i);
Expand Down
Loading

0 comments on commit ecd7a5c

Please sign in to comment.