Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

topn with granularity regression fixes #17565

Merged
merged 10 commits into from
Dec 17, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public static CursorGranularizer create(
timeSelector = cursor.getColumnSelectorFactory().makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME);
}

return new CursorGranularizer(cursor, bucketIterable, timeSelector, timeOrder == Order.DESCENDING);
return new CursorGranularizer(cursor, granularity, bucketIterable, timeSelector, timeOrder == Order.DESCENDING);
}

private final Cursor cursor;
Expand All @@ -109,20 +109,28 @@ public static CursorGranularizer create(

private long currentBucketStart;
private long currentBucketEnd;
private final Granularity granularity;

private CursorGranularizer(
Cursor cursor,
Granularity granularity,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@clintropolis Do we need this field ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Iterable<Interval> bucketIterable,
@Nullable ColumnValueSelector timeSelector,
boolean descending
)
{
this.cursor = cursor;
this.granularity = granularity;
this.bucketIterable = bucketIterable;
this.timeSelector = timeSelector;
this.descending = descending;
}

public Granularity getGranularity()
{
return granularity;
}

public Iterable<Interval> getBucketIterable()
{
return bucketIterable;
Expand All @@ -135,11 +143,11 @@ public DateTime getBucketStart()

public boolean advanceToBucket(final Interval bucketInterval)
{
currentBucketStart = bucketInterval.getStartMillis();
currentBucketEnd = bucketInterval.getEndMillis();
if (cursor.isDone()) {
return false;
}
currentBucketStart = bucketInterval.getStartMillis();
currentBucketEnd = bucketInterval.getEndMillis();
if (timeSelector == null) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
package org.apache.druid.query.topn;

import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.BufferAggregator;
Expand Down Expand Up @@ -97,12 +99,20 @@ private void runWithCardinalityKnown(
}
boolean hasDimValSelector = (dimValSelector != null);

int cardinality = params.getCardinality();
final int cardinality = params.getCardinality();
final int numValuesPerPass = params.getNumValuesPerPass();
// sanity check to ensure that we only do multi-pass with ALL granularity
if (numValuesPerPass < cardinality && !Granularities.ALL.equals(params.getGranularizer().getGranularity())) {
throw DruidException.defensive(
"runWithCardinalityKnown can only be used for ALL granularity if multiple-passes are required"
);
}
int numProcessed = 0;
long processedRows = 0;
while (numProcessed < cardinality) {
final int numToProcess;
int maxNumToProcess = Math.min(params.getNumValuesPerPass(), cardinality - numProcessed);
int maxNumToProcess = Math.min(numValuesPerPass, cardinality - numProcessed);


DimValSelector theDimValSelector;
if (!hasDimValSelector) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public long scanAndAggregate(
{
long processedRows = 0;
int positionToAllocate = 0;
while (!cursor.isDoneOrInterrupted()) {
while (!cursor.isDoneOrInterrupted() && granularizer.currentOffsetWithinBucket()) {
final IndexedInts dimValues = dimensionSelector.getRow();
final int dimSize = dimValues.size();
for (int i = 0; i < dimSize; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public long scanAndAggregate(
int totalAggregatorsSize = aggregator1Size + aggregator2Size;
long processedRows = 0;
int positionToAllocate = 0;
while (!cursor.isDoneOrInterrupted()) {
while (!cursor.isDoneOrInterrupted() && granularizer.currentOffsetWithinBucket()) {
final IndexedInts dimValues = dimensionSelector.getRow();
final int dimSize = dimValues.size();
for (int i = 0; i < dimSize; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ private static long scanAndAggregateDefault(
final int aggExtra = aggSize % AGG_UNROLL_COUNT;
int currentPosition = 0;
long processedRows = 0;
while (!cursor.isDoneOrInterrupted()) {
while (!cursor.isDoneOrInterrupted() && granularizer.currentOffsetWithinBucket()) {
final IndexedInts dimValues = dimSelector.getRow();

final int dimSize = dimValues.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ protected long scanAndAggregate(
final DimensionSelector dimSelector = params.getDimSelector();

long processedRows = 0;
while (!cursor.isDone()) {
while (!cursor.isDone() && granularizer.currentOffsetWithinBucket()) {
final Object key = dimensionValueConverter.apply(dimSelector.lookupName(dimSelector.getRow().get(0)));

Aggregator[] theAggregators = aggregatesStore.computeIfAbsent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ private static boolean canUsePooledAlgorithm(
final int numBytesToWorkWith = resultsBuf.capacity();
final int numValuesPerPass = numBytesPerRecord > 0 ? numBytesToWorkWith / numBytesPerRecord : cardinality;

return numValuesPerPass <= cardinality;
return numValuesPerPass >= cardinality;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this comparision change caught my eye...the new cmp seem to be more in line with the intention - but it seems to me that
depending on some conditions here ; this cardinality's value might be -1 in some cases
is that ok to answer true if cardinality is -1 ?

note: if cardinality == -1 ; then both the old and the new logic returns true - is that ok?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea this is fair, it is not really a problem in practice because currently the column capabilities should not report as dictionary encoded if the cardinality is -1, so we return false earlier in the method before we get here, but this seems worth explicitly checking for just in case.

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public long scanAndAggregate(
)
{
long processedRows = 0;
while (!cursor.isDone()) {
while (!cursor.isDone() && granularizer.currentOffsetWithinBucket()) {
if (hasNulls && selector.isNull()) {
if (nullValueAggregates == null) {
nullValueAggregates = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ private long scanAndAggregateWithCardinalityKnown(
)
{
long processedRows = 0;
while (!cursor.isDone()) {
while (!cursor.isDone() && granularizer.currentOffsetWithinBucket()) {
final IndexedInts dimValues = selector.getRow();
for (int i = 0, size = dimValues.size(); i < size; ++i) {
final int dimIndex = dimValues.get(i);
Expand Down Expand Up @@ -192,7 +192,7 @@ private long scanAndAggregateWithCardinalityUnknown(
)
{
long processedRows = 0;
while (!cursor.isDone()) {
while (!cursor.isDone() && granularizer.currentOffsetWithinBucket()) {
final IndexedInts dimValues = selector.getRow();
for (int i = 0, size = dimValues.size(); i < size; ++i) {
final int dimIndex = dimValues.get(i);
Expand Down
Loading
Loading