Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

topn with granularity regression fixes #17565

Merged
merged 10 commits into from
Dec 17, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
}
],
"context": {
"useCache": "true",
"populateCache": "true",
"useCache": "false",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why change this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was trying to see if could allow the tests to pass without setting the new context parameter, but it didn't seem to help, so changed to use the new context parameter.

"populateCache": "false",
"timeout": 60000
}
},
Expand Down Expand Up @@ -92,8 +92,8 @@
},
"threshold": 2,
"context": {
"useCache": "true",
"populateCache": "true",
"useCache": "false",
"populateCache": "false",
"timeout": 60000
}
},
Expand Down Expand Up @@ -196,8 +196,8 @@
},
"threshold": 2,
"context": {
"useCache": "true",
"populateCache": "true",
"useCache": "false",
"populateCache": "false",
"timeout": 60000
}
},
Expand Down Expand Up @@ -320,8 +320,8 @@
},
"threshold": 2,
"context": {
"useCache": "true",
"populateCache": "true",
"useCache": "false",
"populateCache": "false",
"timeout": 60000
}
},
Expand Down Expand Up @@ -473,8 +473,8 @@
},
"limit": 3,
"context": {
"useCache": "true",
"populateCache": "true",
"useCache": "false",
"populateCache": "false",
"timeout": 60000
}
},
Expand Down Expand Up @@ -739,8 +739,8 @@
},
"threshold": 2,
"context": {
"useCache": "true",
"populateCache": "true",
"useCache": "false",
"populateCache": "false",
"timeout": 60000
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.collect.Lists;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.segment.ColumnValueSelector;
Expand Down Expand Up @@ -94,11 +95,13 @@ public static CursorGranularizer create(
timeSelector = cursor.getColumnSelectorFactory().makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME);
}

return new CursorGranularizer(cursor, bucketIterable, timeSelector, timeOrder == Order.DESCENDING);
return new CursorGranularizer(cursor, granularity, bucketIterable, timeSelector, timeOrder == Order.DESCENDING);
}

private final Cursor cursor;

private final Granularity granularity;

// Iterable that iterates over time buckets.
private final Iterable<Interval> bucketIterable;

Expand All @@ -112,12 +115,14 @@ public static CursorGranularizer create(

private CursorGranularizer(
Cursor cursor,
Granularity granularity,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@clintropolis Do we need this field ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Iterable<Interval> bucketIterable,
@Nullable ColumnValueSelector timeSelector,
boolean descending
)
{
this.cursor = cursor;
this.granularity = granularity;
this.bucketIterable = bucketIterable;
this.timeSelector = timeSelector;
this.descending = descending;
Expand All @@ -133,13 +138,18 @@ public DateTime getBucketStart()
return DateTimes.utc(currentBucketStart);
}

public Interval getCurrentInterval()
{
return Intervals.utc(currentBucketStart, currentBucketEnd);
}

public boolean advanceToBucket(final Interval bucketInterval)
{
currentBucketStart = bucketInterval.getStartMillis();
currentBucketEnd = bucketInterval.getEndMillis();
if (cursor.isDone()) {
return false;
}
currentBucketStart = bucketInterval.getStartMillis();
currentBucketEnd = bucketInterval.getEndMillis();
if (timeSelector == null) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public class QueryContexts
public static final String UNCOVERED_INTERVALS_LIMIT_KEY = "uncoveredIntervalsLimit";
public static final String MIN_TOP_N_THRESHOLD = "minTopNThreshold";
public static final String CATALOG_VALIDATION_ENABLED = "catalogValidationEnabled";
public static final String TOPN_USE_MULTI_PASS_POOLED_QUERY_GRANULARITY = "topNuseMultiPassPooledQueryGranularity";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a javadoc comment here explaining what this is for, and linking to this PR (or a related issue). It's helpful to have that kind of thing for any undocumented parameter.

/**
* Context parameter to enable/disable the extended filtered sum rewrite logic.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ public boolean hasNext()
if (delegate != null && delegate.hasNext()) {
return true;
} else {
if (!cursor.isDone() && granularizer.currentOffsetWithinBucket()) {
if (granularizer.currentOffsetWithinBucket()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was the cursor.isDone() here removed simply because it's redundant?

Copy link
Member Author

@clintropolis clintropolis Dec 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea, i noticed granularizer.currentOffsetWithinBucket also checks isDone so it seemed nicer this way, this didn't cause any issues, just noticed while i was looking over stuff

if (delegate != null) {
delegate.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public void run(
try {
// reset cursor since we call run again
params.getCursor().reset();
params.getGranularizer().advanceToBucket(params.getGranularizer().getCurrentInterval());
// Run topN for all metrics for top N dimension values
allMetricsParam = allMetricAlgo.makeInitParams(params.getSelectorPlus(), params.getCursor(), params.getGranularizer());
allMetricAlgo.run(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,14 @@ private void runWithCardinalityKnown(
}
boolean hasDimValSelector = (dimValSelector != null);

int cardinality = params.getCardinality();
final int cardinality = params.getCardinality();
final int numValuesPerPass = params.getNumValuesPerPass();
int numProcessed = 0;
long processedRows = 0;
while (numProcessed < cardinality) {
final int numToProcess;
int maxNumToProcess = Math.min(params.getNumValuesPerPass(), cardinality - numProcessed);
int maxNumToProcess = Math.min(numValuesPerPass, cardinality - numProcessed);


DimValSelector theDimValSelector;
if (!hasDimValSelector) {
Expand All @@ -125,6 +127,7 @@ private void runWithCardinalityKnown(
numProcessed += numToProcess;
if (numProcessed < cardinality) {
params.getCursor().reset();
params.getGranularizer().advanceToBucket(params.getGranularizer().getCurrentInterval());
}
}
if (queryMetrics != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,25 +54,27 @@ public long scanAndAggregate(
{
long processedRows = 0;
int positionToAllocate = 0;
while (!cursor.isDoneOrInterrupted()) {
final IndexedInts dimValues = dimensionSelector.getRow();
final int dimSize = dimValues.size();
for (int i = 0; i < dimSize; i++) {
int dimIndex = dimValues.get(i);
int position = positions[dimIndex];
if (position >= 0) {
aggregator.aggregate(resultsBuffer, position);
} else if (position == TopNAlgorithm.INIT_POSITION_VALUE) {
positions[dimIndex] = positionToAllocate;
position = positionToAllocate;
aggregator.init(resultsBuffer, position);
aggregator.aggregate(resultsBuffer, position);
positionToAllocate += aggregatorSize;
if (granularizer.currentOffsetWithinBucket()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was the granularizer.currentOffsetWithinBucket() check added here (& the other prototypes) to enable skipping of empty granular buckets?

Copy link
Member Author

@clintropolis clintropolis Dec 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea, this was a bug, i did it as an if statement wrapping the loop so we wouldn't need to check currentOffsetWithinBucket() twice per loop since the granularizer advance methods also call currentOffsetWithinBucket

while (!cursor.isDoneOrInterrupted()) {
final IndexedInts dimValues = dimensionSelector.getRow();
final int dimSize = dimValues.size();
for (int i = 0; i < dimSize; i++) {
int dimIndex = dimValues.get(i);
int position = positions[dimIndex];
if (position >= 0) {
aggregator.aggregate(resultsBuffer, position);
} else if (position == TopNAlgorithm.INIT_POSITION_VALUE) {
positions[dimIndex] = positionToAllocate;
position = positionToAllocate;
aggregator.init(resultsBuffer, position);
aggregator.aggregate(resultsBuffer, position);
positionToAllocate += aggregatorSize;
}
}
processedRows++;
if (!granularizer.advanceCursorWithinBucketUninterruptedly()) {
break;
}
}
processedRows++;
if (!granularizer.advanceCursorWithinBucketUninterruptedly()) {
break;
}
}
return processedRows;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,29 +57,31 @@ public long scanAndAggregate(
int totalAggregatorsSize = aggregator1Size + aggregator2Size;
long processedRows = 0;
int positionToAllocate = 0;
while (!cursor.isDoneOrInterrupted()) {
final IndexedInts dimValues = dimensionSelector.getRow();
final int dimSize = dimValues.size();
for (int i = 0; i < dimSize; i++) {
int dimIndex = dimValues.get(i);
int position = positions[dimIndex];
if (position >= 0) {
aggregator1.aggregate(resultsBuffer, position);
aggregator2.aggregate(resultsBuffer, position + aggregator1Size);
} else if (position == TopNAlgorithm.INIT_POSITION_VALUE) {
positions[dimIndex] = positionToAllocate;
position = positionToAllocate;
aggregator1.init(resultsBuffer, position);
aggregator1.aggregate(resultsBuffer, position);
position += aggregator1Size;
aggregator2.init(resultsBuffer, position);
aggregator2.aggregate(resultsBuffer, position);
positionToAllocate += totalAggregatorsSize;
if (granularizer.currentOffsetWithinBucket()) {
while (!cursor.isDoneOrInterrupted()) {
final IndexedInts dimValues = dimensionSelector.getRow();
final int dimSize = dimValues.size();
for (int i = 0; i < dimSize; i++) {
int dimIndex = dimValues.get(i);
int position = positions[dimIndex];
if (position >= 0) {
aggregator1.aggregate(resultsBuffer, position);
aggregator2.aggregate(resultsBuffer, position + aggregator1Size);
} else if (position == TopNAlgorithm.INIT_POSITION_VALUE) {
positions[dimIndex] = positionToAllocate;
position = positionToAllocate;
aggregator1.init(resultsBuffer, position);
aggregator1.aggregate(resultsBuffer, position);
position += aggregator1Size;
aggregator2.init(resultsBuffer, position);
aggregator2.aggregate(resultsBuffer, position);
positionToAllocate += totalAggregatorsSize;
}
}
processedRows++;
if (!granularizer.advanceCursorWithinBucketUninterruptedly()) {
break;
}
}
processedRows++;
if (!granularizer.advanceCursorWithinBucketUninterruptedly()) {
break;
}
}
return processedRows;
Expand Down
Loading
Loading