From ecd7a5c79691755fb5dad00a7115dbd4f518bc7b Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Thu, 12 Dec 2024 18:23:45 -0800 Subject: [PATCH 1/8] topn with granularity regression fixes changes: * fix issue where topN with query granularity other than ALL would use the heap algorithm when it was actually able to use the pooled algorithm, and incorrectly used the pooled algorithm in cases where it must use the heap algorithm, a regression from #16533 * fix issue where topN with query granularity other than ALL could process values in the wrong time bucket, another regression from #16533 --- .../druid/query/CursorGranularizer.java | 14 +- .../druid/query/topn/BaseTopNAlgorithm.java | 8 + ...Generic1AggPooledTopNScannerPrototype.java | 2 +- ...Generic2AggPooledTopNScannerPrototype.java | 2 +- .../druid/query/topn/PooledTopNAlgorithm.java | 2 +- .../topn/TimeExtractionTopNAlgorithm.java | 2 +- .../druid/query/topn/TopNQueryEngine.java | 2 +- ...eNumericTopNColumnAggregatesProcessor.java | 2 +- .../StringTopNColumnAggregatesProcessor.java | 4 +- .../druid/query/topn/TopNQueryRunnerTest.java | 219 ++++++++++++++++++ 10 files changed, 246 insertions(+), 11 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/CursorGranularizer.java b/processing/src/main/java/org/apache/druid/query/CursorGranularizer.java index 6bc387183bae..e538ecca5025 100644 --- a/processing/src/main/java/org/apache/druid/query/CursorGranularizer.java +++ b/processing/src/main/java/org/apache/druid/query/CursorGranularizer.java @@ -94,7 +94,7 @@ public static CursorGranularizer create( timeSelector = cursor.getColumnSelectorFactory().makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME); } - return new CursorGranularizer(cursor, bucketIterable, timeSelector, timeOrder == Order.DESCENDING); + return new CursorGranularizer(cursor, granularity, bucketIterable, timeSelector, timeOrder == Order.DESCENDING); } private final Cursor cursor; @@ -109,20 +109,28 @@ public static CursorGranularizer create( private long currentBucketStart; private long currentBucketEnd; + private final Granularity granularity; private CursorGranularizer( Cursor cursor, + Granularity granularity, Iterable bucketIterable, @Nullable ColumnValueSelector timeSelector, boolean descending ) { this.cursor = cursor; + this.granularity = granularity; this.bucketIterable = bucketIterable; this.timeSelector = timeSelector; this.descending = descending; } + public Granularity getGranularity() + { + return granularity; + } + public Iterable getBucketIterable() { return bucketIterable; } @@ -135,11 +143,11 @@ public DateTime getBucketStart() public boolean advanceToBucket(final Interval bucketInterval) { + currentBucketStart = bucketInterval.getStartMillis(); + currentBucketEnd = bucketInterval.getEndMillis(); if (cursor.isDone()) { return false; } - currentBucketStart = bucketInterval.getStartMillis(); - currentBucketEnd = bucketInterval.getEndMillis(); if (timeSelector == null) { return true; } diff --git a/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java index 4c0bb066eecb..6f4bad3c2439 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java +++ b/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java @@ -20,8 +20,10 @@ package org.apache.druid.query.topn; import com.google.common.annotations.VisibleForTesting; +import org.apache.druid.error.DruidException; import
org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.BufferAggregator; @@ -103,6 +105,12 @@ private void runWithCardinalityKnown( while (numProcessed < cardinality) { final int numToProcess; int maxNumToProcess = Math.min(params.getNumValuesPerPass(), cardinality - numProcessed); + // sanity check to ensure that we only do multi-pass with ALL granularity + if (maxNumToProcess < cardinality && !Granularities.ALL.equals(params.getGranularizer().getGranularity())) { + throw DruidException.defensive( + "runWithCardinalityKnown can only be used for ALL granularity if multiple-passes are required" + ); + } DimValSelector theDimValSelector; if (!hasDimValSelector) { diff --git a/processing/src/main/java/org/apache/druid/query/topn/Generic1AggPooledTopNScannerPrototype.java b/processing/src/main/java/org/apache/druid/query/topn/Generic1AggPooledTopNScannerPrototype.java index f4c2ba1863bd..52d065c95c0b 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/Generic1AggPooledTopNScannerPrototype.java +++ b/processing/src/main/java/org/apache/druid/query/topn/Generic1AggPooledTopNScannerPrototype.java @@ -54,7 +54,7 @@ public long scanAndAggregate( { long processedRows = 0; int positionToAllocate = 0; - while (!cursor.isDoneOrInterrupted()) { + while (!cursor.isDoneOrInterrupted() && granularizer.currentOffsetWithinBucket()) { final IndexedInts dimValues = dimensionSelector.getRow(); final int dimSize = dimValues.size(); for (int i = 0; i < dimSize; i++) { diff --git a/processing/src/main/java/org/apache/druid/query/topn/Generic2AggPooledTopNScannerPrototype.java b/processing/src/main/java/org/apache/druid/query/topn/Generic2AggPooledTopNScannerPrototype.java index 4de281d8a0b8..406344e26ca1 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/Generic2AggPooledTopNScannerPrototype.java +++ b/processing/src/main/java/org/apache/druid/query/topn/Generic2AggPooledTopNScannerPrototype.java @@ -57,7 +57,7 @@ public long scanAndAggregate( int totalAggregatorsSize = aggregator1Size + aggregator2Size; long processedRows = 0; int positionToAllocate = 0; - while (!cursor.isDoneOrInterrupted()) { + while (!cursor.isDoneOrInterrupted() && granularizer.currentOffsetWithinBucket()) { final IndexedInts dimValues = dimensionSelector.getRow(); final int dimSize = dimValues.size(); for (int i = 0; i < dimSize; i++) { diff --git a/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java index d2ba16746218..b3f2a3b9028d 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java +++ b/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java @@ -477,7 +477,7 @@ private static long scanAndAggregateDefault( final int aggExtra = aggSize % AGG_UNROLL_COUNT; int currentPosition = 0; long processedRows = 0; - while (!cursor.isDoneOrInterrupted()) { + while (!cursor.isDoneOrInterrupted() && granularizer.currentOffsetWithinBucket()) { final IndexedInts dimValues = dimSelector.getRow(); final int dimSize = dimValues.size(); diff --git a/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java 
b/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java index 0c79e7c8d31b..72f2abc80bb0 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java @@ -93,7 +93,7 @@ protected long scanAndAggregate( final DimensionSelector dimSelector = params.getDimSelector(); long processedRows = 0; - while (!cursor.isDone()) { + while (!cursor.isDone() && granularizer.currentOffsetWithinBucket()) { final Object key = dimensionValueConverter.apply(dimSelector.lookupName(dimSelector.getRow().get(0))); Aggregator[] theAggregators = aggregatesStore.computeIfAbsent( diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java index 1382c9aaa4b7..414f5bad26cb 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java @@ -275,7 +275,7 @@ private static boolean canUsePooledAlgorithm( final int numBytesToWorkWith = resultsBuf.capacity(); final int numValuesPerPass = numBytesPerRecord > 0 ? numBytesToWorkWith / numBytesPerRecord : cardinality; - return numValuesPerPass <= cardinality; + return numValuesPerPass >= cardinality; } } diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java index 565ad036cea0..cf80074e63ee 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java +++ b/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java @@ -94,7 +94,7 @@ public long scanAndAggregate( ) { long processedRows = 0; - while (!cursor.isDone()) { + while (!cursor.isDone() && granularizer.currentOffsetWithinBucket()) { if (hasNulls && selector.isNull()) { if (nullValueAggregates == null) { nullValueAggregates = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()); diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java index 9eca369fdc7e..47333d2ed321 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java +++ b/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java @@ -150,7 +150,7 @@ private long scanAndAggregateWithCardinalityKnown( ) { long processedRows = 0; - while (!cursor.isDone()) { + while (!cursor.isDone() && granularizer.currentOffsetWithinBucket()) { final IndexedInts dimValues = selector.getRow(); for (int i = 0, size = dimValues.size(); i < size; ++i) { final int dimIndex = dimValues.get(i); @@ -192,7 +192,7 @@ private long scanAndAggregateWithCardinalityUnknown( ) { long processedRows = 0; - while (!cursor.isDone()) { + while (!cursor.isDone() && granularizer.currentOffsetWithinBucket()) { final IndexedInts dimValues = selector.getRow(); for (int i = 0, size = dimValues.size(); i < size; ++i) { final int dimIndex = dimValues.get(i); diff --git a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java 
b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java index 285ccf31a60e..e3cf0fa51d28 100644 --- a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java @@ -60,6 +60,7 @@ import org.apache.druid.query.aggregation.FloatMaxAggregatorFactory; import org.apache.druid.query.aggregation.FloatMinAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.aggregation.any.StringAnyAggregatorFactory; import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory; import org.apache.druid.query.aggregation.firstlast.first.DoubleFirstAggregatorFactory; import org.apache.druid.query.aggregation.firstlast.first.FloatFirstAggregatorFactory; @@ -6378,6 +6379,224 @@ public void testTopNAggregateTopnMetricFirstWithGranularity() assertExpectedResults(expectedResults, query); } + + @Test + public void testTopN_time_granularity_empty_buckets() + { + assumeTimeOrdered(); + TopNQuery query = new TopNQueryBuilder() + .dataSource(QueryRunnerTestHelper.DATA_SOURCE) + .granularity(Granularities.HOUR) + .dimension(QueryRunnerTestHelper.MARKET_DIMENSION) + .metric(QueryRunnerTestHelper.INDEX_METRIC) + .threshold(10_000) + .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD) + .aggregators(QueryRunnerTestHelper.INDEX_LONG_SUM) + .build(); + + List<Result<TopNResultValue>> expectedResults = Arrays.asList( + new Result<>( + DateTimes.of("2011-04-01T00:00:00.000Z"), + TopNResultValue.create( + Arrays.<Map<String, Object>>asList( + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "total_market", + "index", 2836L + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "upfront", + "index", 2681L + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "spot", + "index", 1102L + ) + ) + ) + ), + new Result<>(DateTimes.of("2011-04-01T01:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T02:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T03:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T04:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T05:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T06:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T07:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T08:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T09:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T10:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T11:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T12:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T13:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T14:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T15:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T16:00:00.000Z"),
TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T17:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T18:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T19:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T20:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T21:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T22:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T23:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>( + DateTimes.of("2011-04-02T00:00:00.000Z"), + TopNResultValue.create( + Arrays.<Map<String, Object>>asList( + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "total_market", + "index", 2514L + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "upfront", + "index", 2193L + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "spot", + "index", 1120L + ) + ) + ) + ), + new Result<>(DateTimes.of("2011-04-02T01:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T02:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T03:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T04:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T05:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T06:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T07:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T08:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T09:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T10:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T11:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T12:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T13:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T14:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T15:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T16:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T17:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T18:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T19:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T20:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T21:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T22:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), +
new Result<>(DateTimes.of("2011-04-02T23:00:00.000Z"), TopNResultValue.create(Collections.emptyList())) + ); + + assertExpectedResults(expectedResults, query); + } + + @Test + public void testTopN_time_granularity_uses_heap_if_too_big() + { + assumeTimeOrdered(); + TopNQuery query = new TopNQueryBuilder() + .dataSource(QueryRunnerTestHelper.DATA_SOURCE) + .granularity(Granularities.HOUR) + .dimension(QueryRunnerTestHelper.MARKET_DIMENSION) + .metric(QueryRunnerTestHelper.INDEX_METRIC) + .threshold(10_000) + .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD) + .aggregators( + QueryRunnerTestHelper.INDEX_LONG_SUM, + new StringAnyAggregatorFactory("big", QueryRunnerTestHelper.PLACEMENT_DIMENSION, 10000000, null) + ) + .build(); + + List> expectedResults = Arrays.asList( + new Result<>( + DateTimes.of("2011-04-01T00:00:00.000Z"), + TopNResultValue.create( + Arrays.>asList( + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "total_market", + "big", "preferred", + "index", 2836L + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "upfront", + "big", "preferred", + "index", 2681L + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "spot", + "big", "preferred", + "index", 1102L + ) + ) + ) + ), + new Result<>(DateTimes.of("2011-04-01T01:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T02:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T03:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T04:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T05:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T06:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T07:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T08:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T09:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T10:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T11:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T12:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T13:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T14:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T15:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T16:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T17:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T18:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T19:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T20:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T21:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T22:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T23:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>( + DateTimes.of("2011-04-02T00:00:00.000Z"), + TopNResultValue.create( + Arrays.<Map<String, Object>>asList( + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "total_market", + "big", "preferred", + "index", 2514L + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "upfront", + "big", "preferred", + "index", 2193L + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "spot", + "big", "preferred", + "index", 1120L + ) + ) + ) + ), + new Result<>(DateTimes.of("2011-04-02T01:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T02:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T03:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T04:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T05:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T06:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T07:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T08:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T09:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T10:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T11:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T12:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T13:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T14:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T15:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T16:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T17:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T18:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T19:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T20:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T21:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T22:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T23:00:00.000Z"), TopNResultValue.create(Collections.emptyList())) + ); + + assertExpectedResults(expectedResults, query); + } + private void assumeTimeOrdered() { try (final CursorHolder cursorHolder = From 76d1c6cf3c2f34949c5870a8651f8d873f51fca6 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Thu, 12 Dec 2024 18:37:49 -0800 Subject: [PATCH 2/8] move defensive check outside of loop --- .../druid/query/topn/BaseTopNAlgorithm.java | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git
a/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java index 6f4bad3c2439..80c964c9ba73 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java +++ b/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java @@ -99,18 +99,20 @@ private void runWithCardinalityKnown( } boolean hasDimValSelector = (dimValSelector != null); - int cardinality = params.getCardinality(); + final int cardinality = params.getCardinality(); + final int numValuesPerPass = params.getNumValuesPerPass(); + // sanity check to ensure that we only do multi-pass with ALL granularity + if (numValuesPerPass < cardinality && !Granularities.ALL.equals(params.getGranularizer().getGranularity())) { + throw DruidException.defensive( + "runWithCardinalityKnown can only be used for ALL granularity if multiple-passes are required" + ); + } int numProcessed = 0; long processedRows = 0; while (numProcessed < cardinality) { final int numToProcess; - int maxNumToProcess = Math.min(params.getNumValuesPerPass(), cardinality - numProcessed); - // sanity check to ensure that we only do multi-pass with ALL granularity - if (maxNumToProcess < cardinality && !Granularities.ALL.equals(params.getGranularizer().getGranularity())) { - throw DruidException.defensive( - "runWithCardinalityKnown can only be used for ALL granularity if multiple-passes are required" - ); - } + int maxNumToProcess = Math.min(numValuesPerPass, cardinality - numProcessed); + DimValSelector theDimValSelector; if (!hasDimValSelector) { From 2f61031cd19fdd293c88ae6e6b792890a056ddaa Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Thu, 12 Dec 2024 23:22:55 -0800 Subject: [PATCH 3/8] more test --- .../druid/query/topn/TopNQueryRunnerTest.java | 477 ++++++++++++++++++ 1 file changed, 477 insertions(+) diff --git a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java index e3cf0fa51d28..19886f3443ca 100644 --- a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java @@ -6484,6 +6484,483 @@ public void testTopN_time_granularity_empty_buckets() assertExpectedResults(expectedResults, query); } + @Test + public void testTopN_time_granularity_empty_buckets_expression() + { + assumeTimeOrdered(); + TopNQuery query = new TopNQueryBuilder() + .dataSource(QueryRunnerTestHelper.DATA_SOURCE) + .granularity(Granularities.HOUR) + .virtualColumns( + new ExpressionVirtualColumn( + "vc", + "market + ' ' + placement", + ColumnType.STRING, + TestExprMacroTable.INSTANCE + ) + ) + .dimension("vc") + .metric(QueryRunnerTestHelper.INDEX_METRIC) + .threshold(10_000) + .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD) + .aggregators(QueryRunnerTestHelper.INDEX_LONG_SUM) + .build(); + + List<Result<TopNResultValue>> expectedResults = Arrays.asList( + new Result<>( + DateTimes.of("2011-04-01T00:00:00.000Z"), + TopNResultValue.create( + Arrays.<Map<String, Object>>asList( + ImmutableMap.of( + "vc", "total_market preferred", + "index", 2836L + ), + ImmutableMap.of( + "vc", "upfront preferred", + "index", 2681L + ), + ImmutableMap.of( + "vc", "spot preferred", + "index", 1102L + ) + ) + ) + ), + new Result<>(DateTimes.of("2011-04-01T01:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T02:00:00.000Z"),
TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T03:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T04:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T05:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T06:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T07:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T08:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T09:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T10:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T11:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T12:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T13:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T14:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T15:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T16:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T17:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T18:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T19:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T20:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T21:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T22:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T23:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>( + DateTimes.of("2011-04-02T00:00:00.000Z"), + TopNResultValue.create( + Arrays.<Map<String, Object>>asList( + ImmutableMap.of( + "vc", "total_market preferred", + "index", 2514L + ), + ImmutableMap.of( + "vc", "upfront preferred", + "index", 2193L + ), + ImmutableMap.of( + "vc", "spot preferred", + "index", 1120L + ) + ) + ) + ), + new Result<>(DateTimes.of("2011-04-02T01:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T02:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T03:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T04:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T05:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T06:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T07:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T08:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T09:00:00.000Z"),
TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T10:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T11:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T12:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T13:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T14:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T15:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T16:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T17:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T18:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T19:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T20:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T21:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T22:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T23:00:00.000Z"), TopNResultValue.create(Collections.emptyList())) + ); + + assertExpectedResults(expectedResults, query); + } + + @Test + public void testTopN_time_granularity_empty_buckets_2pool() + { + assumeTimeOrdered(); + TopNQuery query = new TopNQueryBuilder() + .dataSource(QueryRunnerTestHelper.DATA_SOURCE) + .granularity(Granularities.HOUR) + .dimension(QueryRunnerTestHelper.MARKET_DIMENSION) + .metric(QueryRunnerTestHelper.INDEX_METRIC) + .threshold(10_000) + .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD) + .aggregators( + QueryRunnerTestHelper.INDEX_LONG_SUM, + QueryRunnerTestHelper.INDEX_DOUBLE_MAX + ) + .build(); + + List<Result<TopNResultValue>> expectedResults = Arrays.asList( + new Result<>( + DateTimes.of("2011-04-01T00:00:00.000Z"), + TopNResultValue.create( + Arrays.<Map<String, Object>>asList( + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "total_market", + "index", 2836L, + "doubleMaxIndex", 1522.043733 + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "upfront", + "index", 2681L, + "doubleMaxIndex", 1447.34116 + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "spot", + "index", 1102L, + "doubleMaxIndex", 158.747224 + ) + ) + ) + ), + new Result<>(DateTimes.of("2011-04-01T01:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T02:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T03:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T04:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T05:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T06:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T07:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T08:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new
Result<>(DateTimes.of("2011-04-01T09:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T10:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T11:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T12:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T13:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T14:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T15:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T16:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T17:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T18:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T19:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T20:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T21:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T22:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T23:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>( + DateTimes.of("2011-04-02T00:00:00.000Z"), + TopNResultValue.create( + Arrays.>asList( + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "total_market", + "index", 2514L, + "doubleMaxIndex", 1321.375057 + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "upfront", + "index", 2193L, + "doubleMaxIndex", 1144.342401 + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "spot", + "index", 1120L, + "doubleMaxIndex", 166.016049 + ) + ) + ) + ), + new Result<>(DateTimes.of("2011-04-02T01:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T02:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T03:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T04:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T05:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T06:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T07:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T08:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T09:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T10:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T11:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T12:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T13:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T14:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T15:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T16:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T17:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T18:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T19:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T20:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T21:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T22:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T23:00:00.000Z"), TopNResultValue.create(Collections.emptyList())) + ); + + assertExpectedResults(expectedResults, query); + } + + @Test + public void testTopN_time_granularity_empty_buckets_timeExtract() + { + // it is pretty weird to have both query granularity and a time extractionFn... but it is not explicitly + // forbidden so might as well test it + assumeTimeOrdered(); + TopNQuery query = new TopNQueryBuilder() + .dataSource(QueryRunnerTestHelper.DATA_SOURCE) + .granularity(Granularities.HOUR) + .dimension( + new ExtractionDimensionSpec( + ColumnHolder.TIME_COLUMN_NAME, + "dayOfWeek", + new TimeFormatExtractionFn("EEEE", null, null, null, false) + ) + ) + .metric(QueryRunnerTestHelper.INDEX_METRIC) + .threshold(10_000) + .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD) + .aggregators( + QueryRunnerTestHelper.INDEX_LONG_SUM + ) + .build(); + + List<Result<TopNResultValue>> expectedResults = Arrays.asList( + new Result<>( + DateTimes.of("2011-04-01T00:00:00.000Z"), + TopNResultValue.create( + Collections.singletonList( + ImmutableMap.of( + "dayOfWeek", "Friday", + "index", 6619L + ) + ) + ) + ), + new Result<>(DateTimes.of("2011-04-01T01:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T02:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T03:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T04:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T05:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T06:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T07:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T08:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T09:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T10:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T11:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T12:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T13:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T14:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new
Result<>(DateTimes.of("2011-04-01T15:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T16:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T17:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T18:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T19:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T20:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T21:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T22:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T23:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>( + DateTimes.of("2011-04-02T00:00:00.000Z"), + TopNResultValue.create( + Collections.singletonList( + ImmutableMap.of( + "dayOfWeek", "Saturday", + "index", 5827L + ) + ) + ) + ), + new Result<>(DateTimes.of("2011-04-02T01:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T02:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T03:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T04:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T05:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T06:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T07:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T08:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T09:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T10:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T11:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T12:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T13:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T14:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T15:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T16:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T17:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T18:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T19:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T20:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T21:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T22:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new 
Result<>(DateTimes.of("2011-04-02T23:00:00.000Z"), TopNResultValue.create(Collections.emptyList())) + ); + + assertExpectedResults(expectedResults, query); + } + + @Test + public void testTopN_time_granularity_empty_buckets_numeric() + { + assumeTimeOrdered(); + TopNQuery query = new TopNQueryBuilder() + .dataSource(QueryRunnerTestHelper.DATA_SOURCE) + .granularity(Granularities.HOUR) + .dimension(new DefaultDimensionSpec("qualityLong", "qualityLong", ColumnType.LONG)) + .metric(QueryRunnerTestHelper.INDEX_METRIC) + .threshold(10_000) + .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD) + .aggregators( + QueryRunnerTestHelper.INDEX_LONG_SUM + ) + .build(); + + List> expectedResults = Arrays.asList( + new Result<>( + DateTimes.of("2011-04-01T00:00:00.000Z"), + TopNResultValue.create( + Arrays.asList( + ImmutableMap.of( + "qualityLong", 1600L, + "index", 2900L + ), + ImmutableMap.of( + "qualityLong", 1400L, + "index", 2870L + ), + ImmutableMap.of( + "qualityLong", 1200L, + "index", 158L + ), + ImmutableMap.of( + "qualityLong", 1000L, + "index", 135L + ), + ImmutableMap.of( + "qualityLong", 1500L, + "index", 121L + ), + ImmutableMap.of( + "qualityLong", 1300L, + "index", 120L + ), + ImmutableMap.of( + "qualityLong", 1800L, + "index", 119L + ), + ImmutableMap.of( + "qualityLong", 1100L, + "index", 118L + ), + ImmutableMap.of( + "qualityLong", 1700L, + "index", 78L + ) + ) + ) + ), + new Result<>(DateTimes.of("2011-04-01T01:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T02:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T03:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T04:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T05:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T06:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T07:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T08:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T09:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T10:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T11:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T12:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T13:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T14:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T15:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T16:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T17:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T18:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T19:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T20:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new 
Result<>(DateTimes.of("2011-04-01T21:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T22:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T23:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>( + DateTimes.of("2011-04-02T00:00:00.000Z"), + TopNResultValue.create( + Arrays.asList( + ImmutableMap.of( + "qualityLong", 1600L, + "index", 2505L + ), + ImmutableMap.of( + "qualityLong", 1400L, + "index", 2447L + ), + ImmutableMap.of( + "qualityLong", 1200L, + "index", 166L + ), + ImmutableMap.of( + "qualityLong", 1000L, + "index", 147L + ), + ImmutableMap.of( + "qualityLong", 1800L, + "index", 126L + ), + ImmutableMap.of( + "qualityLong", 1500L, + "index", 114L + ), + ImmutableMap.of( + "qualityLong", 1300L, + "index", 113L + ), + ImmutableMap.of( + "qualityLong", 1100L, + "index", 112L + ), + ImmutableMap.of( + "qualityLong", 1700L, + "index", 97L + ) + ) + ) + ), + new Result<>(DateTimes.of("2011-04-02T01:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T02:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T03:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T04:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T05:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T06:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T07:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T08:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T09:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T10:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T11:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T12:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T13:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T14:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T15:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T16:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T17:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T18:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T19:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T20:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T21:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T22:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T23:00:00.000Z"), TopNResultValue.create(Collections.emptyList())) + ); + + assertExpectedResults(expectedResults, query); + } + @Test public void 
testTopN_time_granularity_uses_heap_if_too_big() { From 3d986f1116b8162405652a416aad3260bf8a7e07 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 13 Dec 2024 02:50:32 -0800 Subject: [PATCH 4/8] extra layer of safety --- .../org/apache/druid/query/topn/TopNQueryEngine.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java index 414f5bad26cb..f5af3d5f1a19 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java @@ -245,6 +245,11 @@ private static boolean canUsePooledAlgorithm( final int numBytesPerRecord ) { + if (cardinality < 0) { + // unknown cardinality doesn't work with the pooled algorithm, which requires an exact count of dictionary ids + return false; + } + if (selector.isHasExtractionFn()) { // extraction functions can have a many to one mapping, and should use a heap algorithm return false; @@ -254,19 +259,22 @@ private static boolean canUsePooledAlgorithm( // non-string output cannot use the pooled algorithm, even if the underlying selector supports it return false; } + if (!Types.is(capabilities, ValueType.STRING)) { // non-strings are not eligible to use the pooled algorithm, and should use a heap algorithm return false; } - // string columns must use the on heap algorithm unless they have the following capabilities if (!capabilities.isDictionaryEncoded().isTrue() || !capabilities.areDictionaryValuesUnique().isTrue()) { + // string columns must use the on heap algorithm unless they have the following capabilities return false; } + if (Granularities.ALL.equals(query.getGranularity())) { // all other requirements have been satisfied, ALL granularity can always use the pooled algorithms return true; } + // if not using ALL granularity, we can still potentially use the pooled algorithm if we are certain it doesn't // need to make multiple passes (e.g.
reset the cursor) try (final ResourceHolder resultsBufHolder = bufferPool.take()) { From 63c4376db17dd8c5296d2c62b5d73cc4027fa06a Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 13 Dec 2024 12:00:00 -0800 Subject: [PATCH 5/8] move check outside of loop --- .../epinephelinae/GroupByQueryEngine.java | 2 +- ...Generic1AggPooledTopNScannerPrototype.java | 38 +-- ...Generic2AggPooledTopNScannerPrototype.java | 46 ++-- .../druid/query/topn/PooledTopNAlgorithm.java | 242 +++++++++--------- .../topn/TimeExtractionTopNAlgorithm.java | 26 +- ...eNumericTopNColumnAggregatesProcessor.java | 32 +-- .../StringTopNColumnAggregatesProcessor.java | 72 +++--- 7 files changed, 236 insertions(+), 222 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngine.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngine.java index 8a1f142cd14d..35d958f7ad1f 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngine.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngine.java @@ -391,7 +391,7 @@ public boolean hasNext() if (delegate != null && delegate.hasNext()) { return true; } else { - if (!cursor.isDone() && granularizer.currentOffsetWithinBucket()) { + if (granularizer.currentOffsetWithinBucket()) { if (delegate != null) { delegate.close(); } diff --git a/processing/src/main/java/org/apache/druid/query/topn/Generic1AggPooledTopNScannerPrototype.java b/processing/src/main/java/org/apache/druid/query/topn/Generic1AggPooledTopNScannerPrototype.java index 52d065c95c0b..2de0a3db65fa 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/Generic1AggPooledTopNScannerPrototype.java +++ b/processing/src/main/java/org/apache/druid/query/topn/Generic1AggPooledTopNScannerPrototype.java @@ -54,25 +54,27 @@ public long scanAndAggregate( { long processedRows = 0; int positionToAllocate = 0; - while (!cursor.isDoneOrInterrupted() && granularizer.currentOffsetWithinBucket()) { - final IndexedInts dimValues = dimensionSelector.getRow(); - final int dimSize = dimValues.size(); - for (int i = 0; i < dimSize; i++) { - int dimIndex = dimValues.get(i); - int position = positions[dimIndex]; - if (position >= 0) { - aggregator.aggregate(resultsBuffer, position); - } else if (position == TopNAlgorithm.INIT_POSITION_VALUE) { - positions[dimIndex] = positionToAllocate; - position = positionToAllocate; - aggregator.init(resultsBuffer, position); - aggregator.aggregate(resultsBuffer, position); - positionToAllocate += aggregatorSize; + if (granularizer.currentOffsetWithinBucket()) { + while (!cursor.isDoneOrInterrupted()) { + final IndexedInts dimValues = dimensionSelector.getRow(); + final int dimSize = dimValues.size(); + for (int i = 0; i < dimSize; i++) { + int dimIndex = dimValues.get(i); + int position = positions[dimIndex]; + if (position >= 0) { + aggregator.aggregate(resultsBuffer, position); + } else if (position == TopNAlgorithm.INIT_POSITION_VALUE) { + positions[dimIndex] = positionToAllocate; + position = positionToAllocate; + aggregator.init(resultsBuffer, position); + aggregator.aggregate(resultsBuffer, position); + positionToAllocate += aggregatorSize; + } + } + processedRows++; + if (!granularizer.advanceCursorWithinBucketUninterruptedly()) { + break; } - } - processedRows++; - if (!granularizer.advanceCursorWithinBucketUninterruptedly()) { - break; } } return processedRows; diff --git 
a/processing/src/main/java/org/apache/druid/query/topn/Generic2AggPooledTopNScannerPrototype.java b/processing/src/main/java/org/apache/druid/query/topn/Generic2AggPooledTopNScannerPrototype.java index 406344e26ca1..1e221992b023 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/Generic2AggPooledTopNScannerPrototype.java +++ b/processing/src/main/java/org/apache/druid/query/topn/Generic2AggPooledTopNScannerPrototype.java @@ -57,29 +57,31 @@ public long scanAndAggregate( int totalAggregatorsSize = aggregator1Size + aggregator2Size; long processedRows = 0; int positionToAllocate = 0; - while (!cursor.isDoneOrInterrupted() && granularizer.currentOffsetWithinBucket()) { - final IndexedInts dimValues = dimensionSelector.getRow(); - final int dimSize = dimValues.size(); - for (int i = 0; i < dimSize; i++) { - int dimIndex = dimValues.get(i); - int position = positions[dimIndex]; - if (position >= 0) { - aggregator1.aggregate(resultsBuffer, position); - aggregator2.aggregate(resultsBuffer, position + aggregator1Size); - } else if (position == TopNAlgorithm.INIT_POSITION_VALUE) { - positions[dimIndex] = positionToAllocate; - position = positionToAllocate; - aggregator1.init(resultsBuffer, position); - aggregator1.aggregate(resultsBuffer, position); - position += aggregator1Size; - aggregator2.init(resultsBuffer, position); - aggregator2.aggregate(resultsBuffer, position); - positionToAllocate += totalAggregatorsSize; + if (granularizer.currentOffsetWithinBucket()) { + while (!cursor.isDoneOrInterrupted()) { + final IndexedInts dimValues = dimensionSelector.getRow(); + final int dimSize = dimValues.size(); + for (int i = 0; i < dimSize; i++) { + int dimIndex = dimValues.get(i); + int position = positions[dimIndex]; + if (position >= 0) { + aggregator1.aggregate(resultsBuffer, position); + aggregator2.aggregate(resultsBuffer, position + aggregator1Size); + } else if (position == TopNAlgorithm.INIT_POSITION_VALUE) { + positions[dimIndex] = positionToAllocate; + position = positionToAllocate; + aggregator1.init(resultsBuffer, position); + aggregator1.aggregate(resultsBuffer, position); + position += aggregator1Size; + aggregator2.init(resultsBuffer, position); + aggregator2.aggregate(resultsBuffer, position); + positionToAllocate += totalAggregatorsSize; + } + } + processedRows++; + if (!granularizer.advanceCursorWithinBucketUninterruptedly()) { + break; } - } - processedRows++; - if (!granularizer.advanceCursorWithinBucketUninterruptedly()) { - break; } } return processedRows; diff --git a/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java index b3f2a3b9028d..458d34035f8c 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java +++ b/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java @@ -477,13 +477,105 @@ private static long scanAndAggregateDefault( final int aggExtra = aggSize % AGG_UNROLL_COUNT; int currentPosition = 0; long processedRows = 0; - while (!cursor.isDoneOrInterrupted() && granularizer.currentOffsetWithinBucket()) { - final IndexedInts dimValues = dimSelector.getRow(); - - final int dimSize = dimValues.size(); - final int dimExtra = dimSize % AGG_UNROLL_COUNT; - switch (dimExtra) { - case 7: + if (granularizer.currentOffsetWithinBucket()) { + while (!cursor.isDoneOrInterrupted()) { + final IndexedInts dimValues = dimSelector.getRow(); + + final int dimSize = dimValues.size(); + final int dimExtra = dimSize 
% AGG_UNROLL_COUNT; + switch (dimExtra) { + case 7: + currentPosition = aggregateDimValue( + positions, + theAggregators, + resultsBuf, + numBytesPerRecord, + aggregatorOffsets, + aggSize, + aggExtra, + dimValues.get(6), + currentPosition + ); + // fall through + case 6: + currentPosition = aggregateDimValue( + positions, + theAggregators, + resultsBuf, + numBytesPerRecord, + aggregatorOffsets, + aggSize, + aggExtra, + dimValues.get(5), + currentPosition + ); + // fall through + case 5: + currentPosition = aggregateDimValue( + positions, + theAggregators, + resultsBuf, + numBytesPerRecord, + aggregatorOffsets, + aggSize, + aggExtra, + dimValues.get(4), + currentPosition + ); + // fall through + case 4: + currentPosition = aggregateDimValue( + positions, + theAggregators, + resultsBuf, + numBytesPerRecord, + aggregatorOffsets, + aggSize, + aggExtra, + dimValues.get(3), + currentPosition + ); + // fall through + case 3: + currentPosition = aggregateDimValue( + positions, + theAggregators, + resultsBuf, + numBytesPerRecord, + aggregatorOffsets, + aggSize, + aggExtra, + dimValues.get(2), + currentPosition + ); + // fall through + case 2: + currentPosition = aggregateDimValue( + positions, + theAggregators, + resultsBuf, + numBytesPerRecord, + aggregatorOffsets, + aggSize, + aggExtra, + dimValues.get(1), + currentPosition + ); + // fall through + case 1: + currentPosition = aggregateDimValue( + positions, + theAggregators, + resultsBuf, + numBytesPerRecord, + aggregatorOffsets, + aggSize, + aggExtra, + dimValues.get(0), + currentPosition + ); + } + for (int i = dimExtra; i < dimSize; i += AGG_UNROLL_COUNT) { currentPosition = aggregateDimValue( positions, theAggregators, @@ -492,11 +584,9 @@ private static long scanAndAggregateDefault( aggregatorOffsets, aggSize, aggExtra, - dimValues.get(6), + dimValues.get(i), currentPosition ); - // fall through - case 6: currentPosition = aggregateDimValue( positions, theAggregators, @@ -505,11 +595,9 @@ private static long scanAndAggregateDefault( aggregatorOffsets, aggSize, aggExtra, - dimValues.get(5), + dimValues.get(i + 1), currentPosition ); - // fall through - case 5: currentPosition = aggregateDimValue( positions, theAggregators, @@ -518,11 +606,9 @@ private static long scanAndAggregateDefault( aggregatorOffsets, aggSize, aggExtra, - dimValues.get(4), + dimValues.get(i + 2), currentPosition ); - // fall through - case 4: currentPosition = aggregateDimValue( positions, theAggregators, @@ -531,11 +617,9 @@ private static long scanAndAggregateDefault( aggregatorOffsets, aggSize, aggExtra, - dimValues.get(3), + dimValues.get(i + 3), currentPosition ); - // fall through - case 3: currentPosition = aggregateDimValue( positions, theAggregators, @@ -544,11 +628,9 @@ private static long scanAndAggregateDefault( aggregatorOffsets, aggSize, aggExtra, - dimValues.get(2), + dimValues.get(i + 4), currentPosition ); - // fall through - case 2: currentPosition = aggregateDimValue( positions, theAggregators, @@ -557,11 +639,9 @@ private static long scanAndAggregateDefault( aggregatorOffsets, aggSize, aggExtra, - dimValues.get(1), + dimValues.get(i + 5), currentPosition ); - // fall through - case 1: currentPosition = aggregateDimValue( positions, theAggregators, @@ -570,103 +650,25 @@ private static long scanAndAggregateDefault( aggregatorOffsets, aggSize, aggExtra, - dimValues.get(0), + dimValues.get(i + 6), currentPosition ); - } - for (int i = dimExtra; i < dimSize; i += AGG_UNROLL_COUNT) { - currentPosition = aggregateDimValue( - positions, - theAggregators, - 
resultsBuf, - numBytesPerRecord, - aggregatorOffsets, - aggSize, - aggExtra, - dimValues.get(i), - currentPosition - ); - currentPosition = aggregateDimValue( - positions, - theAggregators, - resultsBuf, - numBytesPerRecord, - aggregatorOffsets, - aggSize, - aggExtra, - dimValues.get(i + 1), - currentPosition - ); - currentPosition = aggregateDimValue( - positions, - theAggregators, - resultsBuf, - numBytesPerRecord, - aggregatorOffsets, - aggSize, - aggExtra, - dimValues.get(i + 2), - currentPosition - ); - currentPosition = aggregateDimValue( - positions, - theAggregators, - resultsBuf, - numBytesPerRecord, - aggregatorOffsets, - aggSize, - aggExtra, - dimValues.get(i + 3), - currentPosition - ); - currentPosition = aggregateDimValue( - positions, - theAggregators, - resultsBuf, - numBytesPerRecord, - aggregatorOffsets, - aggSize, - aggExtra, - dimValues.get(i + 4), - currentPosition - ); - currentPosition = aggregateDimValue( - positions, - theAggregators, - resultsBuf, - numBytesPerRecord, - aggregatorOffsets, - aggSize, - aggExtra, - dimValues.get(i + 5), - currentPosition - ); - currentPosition = aggregateDimValue( - positions, - theAggregators, - resultsBuf, - numBytesPerRecord, - aggregatorOffsets, - aggSize, - aggExtra, - dimValues.get(i + 6), - currentPosition - ); - currentPosition = aggregateDimValue( - positions, - theAggregators, - resultsBuf, - numBytesPerRecord, - aggregatorOffsets, - aggSize, - aggExtra, - dimValues.get(i + 7), - currentPosition - ); - } - processedRows++; - if (!granularizer.advanceCursorWithinBucketUninterruptedly()) { - break; + currentPosition = aggregateDimValue( + positions, + theAggregators, + resultsBuf, + numBytesPerRecord, + aggregatorOffsets, + aggSize, + aggExtra, + dimValues.get(i + 7), + currentPosition + ); + } + processedRows++; + if (!granularizer.advanceCursorWithinBucketUninterruptedly()) { + break; + } } } return processedRows; diff --git a/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java index 72f2abc80bb0..b6079a29f294 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java @@ -93,20 +93,22 @@ protected long scanAndAggregate( final DimensionSelector dimSelector = params.getDimSelector(); long processedRows = 0; - while (!cursor.isDone() && granularizer.currentOffsetWithinBucket()) { - final Object key = dimensionValueConverter.apply(dimSelector.lookupName(dimSelector.getRow().get(0))); + if (granularizer.currentOffsetWithinBucket()) { + while (!cursor.isDone()) { + final Object key = dimensionValueConverter.apply(dimSelector.lookupName(dimSelector.getRow().get(0))); - Aggregator[] theAggregators = aggregatesStore.computeIfAbsent( - key, - k -> makeAggregators(cursor, query.getAggregatorSpecs()) - ); + Aggregator[] theAggregators = aggregatesStore.computeIfAbsent( + key, + k -> makeAggregators(cursor, query.getAggregatorSpecs()) + ); - for (Aggregator aggregator : theAggregators) { - aggregator.aggregate(); - } - processedRows++; - if (!granularizer.advanceCursorWithinBucket()) { - break; + for (Aggregator aggregator : theAggregators) { + aggregator.aggregate(); + } + processedRows++; + if (!granularizer.advanceCursorWithinBucket()) { + break; + } } } return processedRows; diff --git 
a/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java index cf80074e63ee..41709e354975 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java +++ b/processing/src/main/java/org/apache/druid/query/topn/types/NullableNumericTopNColumnAggregatesProcessor.java @@ -94,23 +94,25 @@ public long scanAndAggregate( ) { long processedRows = 0; - while (!cursor.isDone() && granularizer.currentOffsetWithinBucket()) { - if (hasNulls && selector.isNull()) { - if (nullValueAggregates == null) { - nullValueAggregates = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()); + if (granularizer.currentOffsetWithinBucket()) { + while (!cursor.isDone()) { + if (hasNulls && selector.isNull()) { + if (nullValueAggregates == null) { + nullValueAggregates = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()); + } + for (Aggregator aggregator : nullValueAggregates) { + aggregator.aggregate(); + } + } else { + Aggregator[] valueAggregates = getValueAggregators(query, selector, cursor); + for (Aggregator aggregator : valueAggregates) { + aggregator.aggregate(); + } } - for (Aggregator aggregator : nullValueAggregates) { - aggregator.aggregate(); + processedRows++; + if (!granularizer.advanceCursorWithinBucket()) { + break; } - } else { - Aggregator[] valueAggregates = getValueAggregators(query, selector, cursor); - for (Aggregator aggregator : valueAggregates) { - aggregator.aggregate(); - } - } - processedRows++; - if (!granularizer.advanceCursorWithinBucket()) { - break; } } return processedRows; diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java index 47333d2ed321..ac83de847603 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java +++ b/processing/src/main/java/org/apache/druid/query/topn/types/StringTopNColumnAggregatesProcessor.java @@ -150,28 +150,30 @@ private long scanAndAggregateWithCardinalityKnown( ) { long processedRows = 0; - while (!cursor.isDone() && granularizer.currentOffsetWithinBucket()) { - final IndexedInts dimValues = selector.getRow(); - for (int i = 0, size = dimValues.size(); i < size; ++i) { - final int dimIndex = dimValues.get(i); - Aggregator[] aggs = rowSelector[dimIndex]; - if (aggs == null) { - final Object key = dimensionValueConverter.apply(selector.lookupName(dimIndex)); - aggs = aggregatesStore.computeIfAbsent( - key, - k -> BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()) - ); - rowSelector[dimIndex] = aggs; + if (granularizer.currentOffsetWithinBucket()) { + while (!cursor.isDone()) { + final IndexedInts dimValues = selector.getRow(); + for (int i = 0, size = dimValues.size(); i < size; ++i) { + final int dimIndex = dimValues.get(i); + Aggregator[] aggs = rowSelector[dimIndex]; + if (aggs == null) { + final Object key = dimensionValueConverter.apply(selector.lookupName(dimIndex)); + aggs = aggregatesStore.computeIfAbsent( + key, + k -> BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()) + ); + rowSelector[dimIndex] = aggs; + } + + for (Aggregator aggregator : aggs) { + aggregator.aggregate(); + } } - - for (Aggregator aggregator : aggs) { - 
aggregator.aggregate(); + processedRows++; + if (!granularizer.advanceCursorWithinBucket()) { + break; } } - processedRows++; - if (!granularizer.advanceCursorWithinBucket()) { - break; - } } return processedRows; } @@ -192,22 +194,24 @@ private long scanAndAggregateWithCardinalityUnknown( ) { long processedRows = 0; - while (!cursor.isDone() && granularizer.currentOffsetWithinBucket()) { - final IndexedInts dimValues = selector.getRow(); - for (int i = 0, size = dimValues.size(); i < size; ++i) { - final int dimIndex = dimValues.get(i); - final Object key = dimensionValueConverter.apply(selector.lookupName(dimIndex)); - Aggregator[] aggs = aggregatesStore.computeIfAbsent( - key, - k -> BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()) - ); - for (Aggregator aggregator : aggs) { - aggregator.aggregate(); + if (granularizer.currentOffsetWithinBucket()) { + while (!cursor.isDone()) { + final IndexedInts dimValues = selector.getRow(); + for (int i = 0, size = dimValues.size(); i < size; ++i) { + final int dimIndex = dimValues.get(i); + final Object key = dimensionValueConverter.apply(selector.lookupName(dimIndex)); + Aggregator[] aggs = aggregatesStore.computeIfAbsent( + key, + k -> BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()) + ); + for (Aggregator aggregator : aggs) { + aggregator.aggregate(); + } + } + processedRows++; + if (!granularizer.advanceCursorWithinBucket()) { + break; } - } - processedRows++; - if (!granularizer.advanceCursorWithinBucket()) { - break; } } return processedRows; From d4fcf8437ede83da79665a946d92cfc700d5ed16 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 13 Dec 2024 14:06:46 -0800 Subject: [PATCH 6/8] fix spelling --- .../main/java/org/apache/druid/query/CursorGranularizer.java | 3 ++- .../main/java/org/apache/druid/query/topn/TopNQueryEngine.java | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/CursorGranularizer.java b/processing/src/main/java/org/apache/druid/query/CursorGranularizer.java index e538ecca5025..d5e12b57d1a0 100644 --- a/processing/src/main/java/org/apache/druid/query/CursorGranularizer.java +++ b/processing/src/main/java/org/apache/druid/query/CursorGranularizer.java @@ -99,6 +99,8 @@ public static CursorGranularizer create( private final Cursor cursor; + private final Granularity granularity; + // Iterable that iterates over time buckets. 
private final Iterable<Interval> bucketIterable; @@ -109,7 +111,6 @@ public static CursorGranularizer create( private long currentBucketStart; private long currentBucketEnd; - private final Granularity granularity; private CursorGranularizer( Cursor cursor, diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java index f5af3d5f1a19..336f4b08c822 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java @@ -246,7 +246,7 @@ private static boolean canUsePooledAlgorithm( ) { if (cardinality < 0) { - // unknown cardinality doesn't work with the pooled algorith which requires an exact count of dictionary ids + // unknown cardinality doesn't work with the pooled algorithm which requires an exact count of dictionary ids return false; } From 9ba9679b1b6c653e7a37a6ed82aa61ee8228eade Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Mon, 16 Dec 2024 15:09:24 -0800 Subject: [PATCH 7/8] add query context parameter to allow using pooled algorithm for topN when multiple passes are required even when query granularity is not ALL --- .../queries/twitterstream_queries.json | 24 +- .../druid/query/CursorGranularizer.java | 11 +- .../org/apache/druid/query/QueryContexts.java | 1 + .../AggregateTopNMetricFirstAlgorithm.java | 1 + .../druid/query/topn/BaseTopNAlgorithm.java | 9 +- .../druid/query/topn/TopNQueryEngine.java | 19 +- .../druid/query/topn/TopNQueryRunnerTest.java | 229 +++++++++++++++++- 7 files changed, 261 insertions(+), 33 deletions(-) diff --git a/integration-tests/src/test/resources/queries/twitterstream_queries.json b/integration-tests/src/test/resources/queries/twitterstream_queries.json index cdd4057eb572..483c93b2a178 100644 --- a/integration-tests/src/test/resources/queries/twitterstream_queries.json +++ b/integration-tests/src/test/resources/queries/twitterstream_queries.json @@ -19,8 +19,8 @@ } ], "context": { - "useCache": "true", - "populateCache": "true", + "useCache": "false", + "populateCache": "false", "timeout": 60000 } }, @@ -92,8 +92,8 @@ }, "threshold": 2, "context": { - "useCache": "true", - "populateCache": "true", + "useCache": "false", + "populateCache": "false", "timeout": 60000 } }, @@ -196,8 +196,8 @@ }, "threshold": 2, "context": { - "useCache": "true", - "populateCache": "true", + "useCache": "false", + "populateCache": "false", "timeout": 60000 } }, @@ -320,8 +320,8 @@ }, "threshold": 2, "context": { - "useCache": "true", - "populateCache": "true", + "useCache": "false", + "populateCache": "false", "timeout": 60000 } }, @@ -473,8 +473,8 @@ }, "limit": 3, "context": { - "useCache": "true", - "populateCache": "true", + "useCache": "false", + "populateCache": "false", "timeout": 60000 } }, @@ -739,8 +739,8 @@ }, "threshold": 2, "context": { - "useCache": "true", - "populateCache": "true", + "useCache": "false", + "populateCache": "false", "timeout": 60000 } }, diff --git a/processing/src/main/java/org/apache/druid/query/CursorGranularizer.java b/processing/src/main/java/org/apache/druid/query/CursorGranularizer.java index d5e12b57d1a0..ffdb5bd15fa9 100644 --- a/processing/src/main/java/org/apache/druid/query/CursorGranularizer.java +++ b/processing/src/main/java/org/apache/druid/query/CursorGranularizer.java @@ -24,6 +24,7 @@ import com.google.common.collect.Lists; import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.DateTimes; import
org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.segment.ColumnValueSelector; @@ -127,11 +128,6 @@ private CursorGranularizer( this.descending = descending; } - public Granularity getGranularity() - { - return granularity; - } - public Iterable getBucketIterable() { return bucketIterable; @@ -142,6 +138,11 @@ public DateTime getBucketStart() return DateTimes.utc(currentBucketStart); } + public Interval getCurrentInterval() + { + return Intervals.utc(currentBucketStart, currentBucketEnd); + } + public boolean advanceToBucket(final Interval bucketInterval) { currentBucketStart = bucketInterval.getStartMillis(); diff --git a/processing/src/main/java/org/apache/druid/query/QueryContexts.java b/processing/src/main/java/org/apache/druid/query/QueryContexts.java index 74b45023d041..a4971bf10c77 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryContexts.java +++ b/processing/src/main/java/org/apache/druid/query/QueryContexts.java @@ -89,6 +89,7 @@ public class QueryContexts public static final String UNCOVERED_INTERVALS_LIMIT_KEY = "uncoveredIntervalsLimit"; public static final String MIN_TOP_N_THRESHOLD = "minTopNThreshold"; public static final String CATALOG_VALIDATION_ENABLED = "catalogValidationEnabled"; + public static final String TOPN_USE_MULTI_PASS_POOLED_QUERY_GRANULARITY = "topNuseMultiPassPooledQueryGranularity"; /** * Context parameter to enable/disable the extended filtered sum rewrite logic. * diff --git a/processing/src/main/java/org/apache/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java index a4605c3f2652..b51b79da049c 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java +++ b/processing/src/main/java/org/apache/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java @@ -113,6 +113,7 @@ public void run( try { // reset cursor since we call run again params.getCursor().reset(); + params.getGranularizer().advanceToBucket(params.getGranularizer().getCurrentInterval()); // Run topN for all metrics for top N dimension values allMetricsParam = allMetricAlgo.makeInitParams(params.getSelectorPlus(), params.getCursor(), params.getGranularizer()); allMetricAlgo.run( diff --git a/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java index 80c964c9ba73..b375db72d842 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java +++ b/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java @@ -20,10 +20,8 @@ package org.apache.druid.query.topn; import com.google.common.annotations.VisibleForTesting; -import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.Pair; -import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.BufferAggregator; @@ -101,12 +99,6 @@ private void runWithCardinalityKnown( final int cardinality = params.getCardinality(); final int numValuesPerPass = params.getNumValuesPerPass(); - // sanity check to ensure that we only do multi-pass with ALL granularity - if 
(numValuesPerPass < cardinality && !Granularities.ALL.equals(params.getGranularizer().getGranularity())) { - throw DruidException.defensive( - "runWithCardinalityKnown can only be used for ALL granularity if multiple-passes are required" - ); - } int numProcessed = 0; long processedRows = 0; while (numProcessed < cardinality) { @@ -135,6 +127,7 @@ private void runWithCardinalityKnown( numProcessed += numToProcess; if (numProcessed < cardinality) { params.getCursor().reset(); + params.getGranularizer().advanceToBucket(params.getGranularizer().getCurrentInterval()); } } if (queryMetrics != null) { diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java index 336f4b08c822..c8aa32921586 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java @@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.query.ColumnSelectorPlus; import org.apache.druid.query.CursorGranularizer; +import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryMetrics; import org.apache.druid.query.Result; import org.apache.druid.query.aggregation.AggregatorFactory; @@ -270,19 +271,23 @@ private static boolean canUsePooledAlgorithm( return false; } - if (Granularities.ALL.equals(query.getGranularity())) { - // all other requirements have been satisfied, ALL granularity can always use the pooled algorithms - return true; - } - - // if not using ALL granularity, we can still potentially use the pooled algorithm if we are certain it doesn't - // need to make multiple passes (e.g. reset the cursor) + // num values per pass must be greater than 0 or else the pooled algorithm cannot progress try (final ResourceHolder resultsBufHolder = bufferPool.take()) { final ByteBuffer resultsBuf = resultsBufHolder.get(); final int numBytesToWorkWith = resultsBuf.capacity(); final int numValuesPerPass = numBytesPerRecord > 0 ? 
numBytesToWorkWith / numBytesPerRecord : cardinality; + final boolean allowMultiPassPooled = query.context().getBoolean( + QueryContexts.TOPN_USE_MULTI_PASS_POOLED_QUERY_GRANULARITY, + false + ); + if (Granularities.ALL.equals(query.getGranularity()) || allowMultiPassPooled) { + return numValuesPerPass > 0; + } + + // if not using multi-pass for pooled + query granularity other than 'ALL', we must check that all values can fit + // in a single pass return numValuesPerPass >= cardinality; } } diff --git a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java index 19886f3443ca..8adbfb5d6a3d 100644 --- a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java @@ -6961,6 +6961,233 @@ public void testTopN_time_granularity_empty_buckets_numeric() assertExpectedResults(expectedResults, query); } + @Test + public void testTopN_time_granularity_multipass_no_pool() + { + assumeTimeOrdered(); + TopNQuery query = new TopNQueryBuilder() + .dataSource(QueryRunnerTestHelper.DATA_SOURCE) + .granularity(Granularities.HOUR) + .dimension(QueryRunnerTestHelper.MARKET_DIMENSION) + .metric(QueryRunnerTestHelper.INDEX_METRIC) + .threshold(10_000) + .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD) + .aggregators( + QueryRunnerTestHelper.INDEX_LONG_SUM, + new StringAnyAggregatorFactory("big", QueryRunnerTestHelper.PLACEMENT_DIMENSION, 4000000, null) + ) + .build(); + + List> expectedResults = Arrays.asList( + new Result<>( + DateTimes.of("2011-04-01T00:00:00.000Z"), + TopNResultValue.create( + Arrays.>asList( + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "total_market", + "big", "preferred", + "index", 2836L + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "upfront", + "big", "preferred", + "index", 2681L + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "spot", + "big", "preferred", + "index", 1102L + ) + ) + ) + ), + new Result<>(DateTimes.of("2011-04-01T01:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T02:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T03:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T04:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T05:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T06:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T07:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T08:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T09:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T10:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T11:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T12:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T13:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T14:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T15:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T16:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T17:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T18:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T19:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T20:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T21:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T22:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T23:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>( + DateTimes.of("2011-04-02T00:00:00.000Z"), + TopNResultValue.create( + Arrays.>asList( + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "total_market", + "big", "preferred", + "index", 2514L + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "upfront", + "big", "preferred", + "index", 2193L + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "spot", + "big", "preferred", + "index", 1120L + ) + ) + ) + ), + new Result<>(DateTimes.of("2011-04-02T01:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T02:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T03:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T04:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T05:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T06:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T07:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T08:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T09:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T10:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T11:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T12:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T13:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T14:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T15:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T16:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T17:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T18:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T19:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new 
Result<>(DateTimes.of("2011-04-02T20:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T21:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T22:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T23:00:00.000Z"), TopNResultValue.create(Collections.emptyList())) + ); + + assertExpectedResults(expectedResults, query); + } + + @Test + public void testTopN_time_granularity_multipass_with_pooled() + { + assumeTimeOrdered(); + TopNQuery query = new TopNQueryBuilder() + .dataSource(QueryRunnerTestHelper.DATA_SOURCE) + .granularity(Granularities.HOUR) + .dimension(QueryRunnerTestHelper.MARKET_DIMENSION) + .metric(QueryRunnerTestHelper.INDEX_METRIC) + .threshold(10_000) + .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD) + .context(ImmutableMap.of(QueryContexts.TOPN_USE_MULTI_PASS_POOLED_QUERY_GRANULARITY, true)) + .aggregators( + QueryRunnerTestHelper.INDEX_LONG_SUM, + new StringAnyAggregatorFactory("big", QueryRunnerTestHelper.PLACEMENT_DIMENSION, 4000000, null) + ) + .build(); + + List> expectedResults = Arrays.asList( + new Result<>( + DateTimes.of("2011-04-01T00:00:00.000Z"), + TopNResultValue.create( + Arrays.>asList( + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "total_market", + "big", "preferred", + "index", 2836L + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "upfront", + "big", "preferred", + "index", 2681L + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "spot", + "big", "preferred", + "index", 1102L + ) + ) + ) + ), + new Result<>(DateTimes.of("2011-04-01T01:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T02:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T03:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T04:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T05:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T06:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T07:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T08:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T09:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T10:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T11:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T12:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T13:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T14:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T15:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T16:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T17:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T18:00:00.000Z"), 
TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T19:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T20:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T21:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T22:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-01T23:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>( + DateTimes.of("2011-04-02T00:00:00.000Z"), + TopNResultValue.create( + Arrays.>asList( + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "total_market", + "big", "preferred", + "index", 2514L + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "upfront", + "big", "preferred", + "index", 2193L + ), + ImmutableMap.of( + QueryRunnerTestHelper.MARKET_DIMENSION, "spot", + "big", "preferred", + "index", 1120L + ) + ) + ) + ), + new Result<>(DateTimes.of("2011-04-02T01:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T02:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T03:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T04:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T05:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T06:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T07:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T08:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T09:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T10:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T11:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T12:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T13:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T14:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T15:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T16:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T17:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T18:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T19:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T20:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T21:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T22:00:00.000Z"), TopNResultValue.create(Collections.emptyList())), + new Result<>(DateTimes.of("2011-04-02T23:00:00.000Z"), TopNResultValue.create(Collections.emptyList())) + ); + + 
assertExpectedResults(expectedResults, query); + } + @Test public void testTopN_time_granularity_uses_heap_if_too_big() { @@ -6974,7 +7201,7 @@ public void testTopN_time_granularity_uses_heap_if_too_big() .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD) .aggregators( QueryRunnerTestHelper.INDEX_LONG_SUM, - new StringAnyAggregatorFactory("big", QueryRunnerTestHelper.PLACEMENT_DIMENSION, 10000000, null) + new StringAnyAggregatorFactory("big", QueryRunnerTestHelper.PLACEMENT_DIMENSION, 40000000, null) ) .build(); From 9728e6ac98ada103bb5a019c9faec67b3a1f03a3 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Mon, 16 Dec 2024 19:11:21 -0800 Subject: [PATCH 8/8] add comment, revert IT context changes and add new context flag --- .../queries/twitterstream_queries.json | 36 ++++++++++--------- .../org/apache/druid/query/QueryContexts.java | 7 +++- 2 files changed, 26 insertions(+), 17 deletions(-) diff --git a/integration-tests/src/test/resources/queries/twitterstream_queries.json b/integration-tests/src/test/resources/queries/twitterstream_queries.json index 483c93b2a178..104b07ba47a2 100644 --- a/integration-tests/src/test/resources/queries/twitterstream_queries.json +++ b/integration-tests/src/test/resources/queries/twitterstream_queries.json @@ -19,8 +19,8 @@ } ], "context": { - "useCache": "false", - "populateCache": "false", + "useCache": "true", + "populateCache": "true", "timeout": 60000 } }, @@ -92,9 +92,10 @@ }, "threshold": 2, "context": { - "useCache": "false", - "populateCache": "false", - "timeout": 60000 + "useCache": "true", + "populateCache": "true", + "timeout": 60000, + "useTopNMultiPassPooledQueryGranularity": "true" } }, "expectedResults": [ @@ -196,9 +197,10 @@ }, "threshold": 2, "context": { - "useCache": "false", - "populateCache": "false", - "timeout": 60000 + "useCache": "true", + "populateCache": "true", + "timeout": 60000, + "useTopNMultiPassPooledQueryGranularity": "true" } }, "expectedResults": [ @@ -320,9 +322,10 @@ }, "threshold": 2, "context": { - "useCache": "false", - "populateCache": "false", - "timeout": 60000 + "useCache": "true", + "populateCache": "true", + "timeout": 60000, + "useTopNMultiPassPooledQueryGranularity": "true" } }, "expectedResults": [ @@ -473,8 +476,8 @@ }, "limit": 3, "context": { - "useCache": "false", - "populateCache": "false", + "useCache": "true", + "populateCache": "true", "timeout": 60000 } }, @@ -739,9 +742,10 @@ }, "threshold": 2, "context": { - "useCache": "false", - "populateCache": "false", - "timeout": 60000 + "useCache": "true", + "populateCache": "true", + "timeout": 60000, + "useTopNMultiPassPooledQueryGranularity": "true" } }, "expectedResults": [ diff --git a/processing/src/main/java/org/apache/druid/query/QueryContexts.java b/processing/src/main/java/org/apache/druid/query/QueryContexts.java index a4971bf10c77..a2a81c6eb746 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryContexts.java +++ b/processing/src/main/java/org/apache/druid/query/QueryContexts.java @@ -89,7 +89,12 @@ public class QueryContexts public static final String UNCOVERED_INTERVALS_LIMIT_KEY = "uncoveredIntervalsLimit"; public static final String MIN_TOP_N_THRESHOLD = "minTopNThreshold"; public static final String CATALOG_VALIDATION_ENABLED = "catalogValidationEnabled"; - public static final String TOPN_USE_MULTI_PASS_POOLED_QUERY_GRANULARITY = "topNuseMultiPassPooledQueryGranularity"; + // this flag controls whether the topN engine can use the 'pooled' algorithm when query granularity is set to + // anything other than 'ALL' and the 
cardinality + number of aggregators would require more space than is available + // in the buffers and so must reset the cursor to use multiple passes. This is likely slower than the default + // behavior of falling back to heap memory, but less dangerous since too large of a query can cause the heap to run + // out of memory + public static final String TOPN_USE_MULTI_PASS_POOLED_QUERY_GRANULARITY = "useTopNMultiPassPooledQueryGranularity"; /** * Context parameter to enable/disable the extended filtered sum rewrite logic. *
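
With patch 8 applied, a query opts in to multi-pass pooled execution through the query context. Below is a minimal sketch of the opt-in via TopNQueryBuilder, mirroring the builder usage in the new tests above; the "wikipedia" datasource, "page" dimension, interval, and count aggregator are hypothetical placeholders, not names taken from these patches:

import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.topn.TopNQuery;
import org.apache.druid.query.topn.TopNQueryBuilder;

// Hypothetical topN with a non-ALL granularity; without the context flag, a dimension whose
// cardinality + aggregators cannot fit in a single pooled pass falls back to the heap algorithm.
TopNQuery query = new TopNQueryBuilder()
    .dataSource("wikipedia")          // placeholder datasource
    .granularity(Granularities.HOUR)  // any granularity other than Granularities.ALL
    .dimension("page")                // placeholder dimension
    .metric("count")
    .threshold(10)
    .intervals("2024-01-01/2024-01-02")
    .aggregators(new CountAggregatorFactory("count"))
    // opt in to multi-pass pooled execution instead of the heap fallback
    .context(ImmutableMap.of(QueryContexts.TOPN_USE_MULTI_PASS_POOLED_QUERY_GRANULARITY, true))
    .build();

The same flag can be set on a native JSON query as "useTopNMultiPassPooledQueryGranularity": "true" inside the "context" object, as the updated twitterstream integration-test queries above do.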