diff --git a/processing/src/main/java/org/apache/druid/query/CursorGranularizer.java b/processing/src/main/java/org/apache/druid/query/CursorGranularizer.java index fa2defce7f03..6bc387183bae 100644 --- a/processing/src/main/java/org/apache/druid/query/CursorGranularizer.java +++ b/processing/src/main/java/org/apache/druid/query/CursorGranularizer.java @@ -147,12 +147,16 @@ public boolean advanceToBucket(final Interval bucketInterval) if (descending) { while (currentTime >= currentBucketEnd && !cursor.isDone()) { cursor.advance(); - currentTime = timeSelector.getLong(); + if (!cursor.isDone()) { + currentTime = timeSelector.getLong(); + } } } else { while (currentTime < currentBucketStart && !cursor.isDone()) { cursor.advance(); - currentTime = timeSelector.getLong(); + if (!cursor.isDone()) { + currentTime = timeSelector.getLong(); + } } } diff --git a/processing/src/test/java/org/apache/druid/query/CursorGranularizerTest.java b/processing/src/test/java/org/apache/druid/query/CursorGranularizerTest.java index 863c01bb3905..ecb8b7d88b84 100644 --- a/processing/src/test/java/org/apache/druid/query/CursorGranularizerTest.java +++ b/processing/src/test/java/org/apache/druid/query/CursorGranularizerTest.java @@ -23,9 +23,11 @@ import org.apache.druid.data.input.ListBasedInputRow; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.query.filter.EqualityFilter; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.Cursor; @@ -65,8 +67,11 @@ public class CursorGranularizerTest extends InitializedNullHandlingTest @Before public void setup() throws IOException { - final RowSignature signature = RowSignature.builder().add("x", ColumnType.STRING).build(); - final List dims = ImmutableList.of("x"); + final RowSignature signature = RowSignature.builder() + .add("x", ColumnType.STRING) + .add("y", ColumnType.STRING) + .build(); + final List dims = ImmutableList.of("x", "y"); final IncrementalIndexSchema schema = IncrementalIndexSchema.builder() .withDimensionsSpec(DimensionsSpec.builder().useSchemaDiscovery(true).build()) @@ -81,79 +86,79 @@ public void setup() throws IOException signature, DateTimes.of("2024-01-01T00:00Z"), dims, - ImmutableList.of("a") + ImmutableList.of("a", "1") ), new ListBasedInputRow( signature, DateTimes.of("2024-01-01T00:01Z"), dims, - ImmutableList.of("b") + ImmutableList.of("b", "2") ), new ListBasedInputRow( signature, DateTimes.of("2024-01-01T00:02Z"), dims, - ImmutableList.of("c") + ImmutableList.of("c", "1") ), new ListBasedInputRow( signature, DateTimes.of("2024-01-01T00:03Z"), dims, - ImmutableList.of("d") + ImmutableList.of("d", "2") ), new ListBasedInputRow( signature, DateTimes.of("2024-01-01T01:00Z"), dims, - ImmutableList.of("e") + ImmutableList.of("e", "1") ), new ListBasedInputRow( signature, DateTimes.of("2024-01-01T01:01Z"), dims, - ImmutableList.of("f") + ImmutableList.of("f", "2") ), new ListBasedInputRow( signature, DateTimes.of("2024-01-01T03:04Z"), dims, - ImmutableList.of("g") + ImmutableList.of("g", "1") ), new ListBasedInputRow( signature, DateTimes.of("2024-01-01T03:05Z"), dims, - ImmutableList.of("h") + ImmutableList.of("h", "2") ), new ListBasedInputRow( signature, DateTimes.of("2024-01-01T03:15Z"), dims, - ImmutableList.of("i") + ImmutableList.of("i", "1") ), new ListBasedInputRow( signature, DateTimes.of("2024-01-01T05:03Z"), dims, - ImmutableList.of("j") + ImmutableList.of("j", "2") ), new ListBasedInputRow( signature, DateTimes.of("2024-01-01T06:00Z"), dims, - ImmutableList.of("k") + ImmutableList.of("k", "1") ), new ListBasedInputRow( signature, DateTimes.of("2024-01-01T09:01Z"), dims, - ImmutableList.of("l") + ImmutableList.of("l", "2") ) ) ) .tmpDir(temporaryFolder.newFolder()); - final QueryableIndex index = bob.buildMMappedIndex(); + final QueryableIndex index = bob.buildMMappedIndex(Intervals.of("2024-01-01T00:00Z/2024-01-02T00:00Z")); interval = index.getDataInterval(); cursorFactory = new QueryableIndexCursorFactory(index); timeBoundaryInspector = QueryableIndexTimeBoundaryInspector.create(index); @@ -261,4 +266,102 @@ public void testGranularizeFullScanDescending() ); } } + + @Test + public void testGranularizeFiltered() + { + final CursorBuildSpec filtered = CursorBuildSpec.builder() + .setFilter(new EqualityFilter("y", ColumnType.STRING, "1", null)) + .build(); + try (CursorHolder cursorHolder = cursorFactory.makeCursorHolder(filtered)) { + final Cursor cursor = cursorHolder.asCursor(); + CursorGranularizer granularizer = CursorGranularizer.create( + cursor, + timeBoundaryInspector, + Order.ASCENDING, + Granularities.HOUR, + interval + ); + + final ColumnSelectorFactory selectorFactory = cursor.getColumnSelectorFactory(); + final ColumnValueSelector xSelector = selectorFactory.makeColumnValueSelector("x"); + final Sequence> theSequence = + Sequences.simple(granularizer.getBucketIterable()) + .map(bucketInterval -> { + List bucket = new ArrayList<>(); + if (!granularizer.advanceToBucket(bucketInterval)) { + return bucket; + } + while (!cursor.isDone()) { + bucket.add((String) xSelector.getObject()); + if (!granularizer.advanceCursorWithinBucket()) { + break; + } + } + return bucket; + }); + + List> granularized = theSequence.toList(); + Assert.assertEquals( + ImmutableList.of( + ImmutableList.of("a", "c"), + ImmutableList.of("e"), + ImmutableList.of(), + ImmutableList.of("g", "i"), + ImmutableList.of(), + ImmutableList.of(), + ImmutableList.of("k"), + ImmutableList.of(), + ImmutableList.of(), + ImmutableList.of() + ), + granularized + ); + } + } + + @Test + public void testGranularizeFilteredClippedAndPartialOverlap() + { + final CursorBuildSpec filtered = CursorBuildSpec.builder() + .setFilter(new EqualityFilter("y", ColumnType.STRING, "1", null)) + .build(); + try (CursorHolder cursorHolder = cursorFactory.makeCursorHolder(filtered)) { + final Cursor cursor = cursorHolder.asCursor(); + CursorGranularizer granularizer = CursorGranularizer.create( + cursor, + timeBoundaryInspector, + Order.ASCENDING, + Granularities.HOUR, + Intervals.of("2024-01-01T08:00Z/2024-01-03T00:00Z") + ); + + final ColumnSelectorFactory selectorFactory = cursor.getColumnSelectorFactory(); + final ColumnValueSelector xSelector = selectorFactory.makeColumnValueSelector("x"); + final Sequence> theSequence = + Sequences.simple(granularizer.getBucketIterable()) + .map(bucketInterval -> { + List bucket = new ArrayList<>(); + if (!granularizer.advanceToBucket(bucketInterval)) { + return bucket; + } + while (!cursor.isDone()) { + bucket.add((String) xSelector.getObject()); + if (!granularizer.advanceCursorWithinBucket()) { + break; + } + } + return bucket; + }); + + List> granularized = theSequence.toList(); + Assert.assertEquals( + ImmutableList.of( + ImmutableList.of(), + ImmutableList.of() + ), + granularized + ); + } + } } diff --git a/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java b/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java index 9c88ab0dc8f2..986266015d30 100644 --- a/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java +++ b/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java @@ -46,6 +46,7 @@ import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; import org.apache.druid.timeline.SegmentId; +import org.joda.time.Interval; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -235,6 +236,11 @@ public IncrementalIndex buildIncrementalIndex() } public File buildMMappedIndexFile() + { + return buildMMappedIndexFile(null); + } + + public File buildMMappedIndexFile(@Nullable Interval dataInterval) { Preconditions.checkNotNull(indexMerger, "indexMerger"); Preconditions.checkNotNull(tmpDir, "tmpDir"); @@ -244,6 +250,7 @@ public File buildMMappedIndexFile() indexIO.loadIndex( indexMerger.persist( incrementalIndex, + dataInterval == null ? incrementalIndex.getInterval() : dataInterval, new File( tmpDir, StringUtils.format("testIndex-%s", ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE)) @@ -276,7 +283,17 @@ public File buildMMappedIndexFile() public QueryableIndex buildMMappedIndex() { try { - return indexIO.loadIndex(buildMMappedIndexFile()); + return indexIO.loadIndex(buildMMappedIndexFile(null)); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + + public QueryableIndex buildMMappedIndex(Interval dataInterval) + { + try { + return indexIO.loadIndex(buildMMappedIndexFile(dataInterval)); } catch (IOException e) { throw new RuntimeException(e);