diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java index dbcd271cccfb..06dce22a1897 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java @@ -40,6 +40,7 @@ import org.apache.druid.frame.write.InvalidFieldException; import org.apache.druid.frame.write.InvalidNullByteException; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.Unit; import org.apache.druid.java.util.common.guava.Sequence; @@ -63,6 +64,8 @@ import org.apache.druid.query.scan.ScanQuery; import org.apache.druid.query.scan.ScanQueryEngine; import org.apache.druid.query.scan.ScanResultValue; +import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.query.spec.SpecificSegmentSpec; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.CompleteSegment; import org.apache.druid.segment.Cursor; @@ -259,7 +262,12 @@ protected ReturnOrAwait runWithSegment(final SegmentWithDescriptor segment } final CursorHolder nextCursorHolder = - cursorFactory.makeCursorHolder(ScanQueryEngine.makeCursorBuildSpec(query, null)); + cursorFactory.makeCursorHolder( + ScanQueryEngine.makeCursorBuildSpec( + query.withQuerySegmentSpec(new SpecificSegmentSpec(segment.getDescriptor())), + null + ) + ); final Cursor nextCursor = nextCursorHolder.asCursor(); if (nextCursor == null) { @@ -305,7 +313,12 @@ protected ReturnOrAwait runWithInputChannel( } final CursorHolder nextCursorHolder = - cursorFactory.makeCursorHolder(ScanQueryEngine.makeCursorBuildSpec(query, null)); + cursorFactory.makeCursorHolder( + ScanQueryEngine.makeCursorBuildSpec( + query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY)), + null + ) + ); final Cursor nextCursor = nextCursorHolder.asCursor(); if (nextCursor == null) { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java index 97ade19f5bcd..1636e1177406 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java @@ -56,8 +56,7 @@ public ScanQueryFrameProcessorFactory(@JsonProperty("query") ScanQuery query) { super(query); this.query = Preconditions.checkNotNull(query, "query"); - this.runningCountForLimit = - query.isLimited() && query.getOrderBys().isEmpty() ? new AtomicLong() : null; + this.runningCountForLimit = query.isLimited() && query.getOrderBys().isEmpty() ? new AtomicLong() : null; } @JsonProperty diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java index bfb511f949f3..af0a72035702 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java @@ -19,9 +19,11 @@ package org.apache.druid.msq.querykit.scan; +import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.collections.ReferenceCountingResourceHolder; import org.apache.druid.collections.ResourceHolder; +import org.apache.druid.collections.StupidResourceHolder; import org.apache.druid.frame.Frame; import org.apache.druid.frame.FrameType; import org.apache.druid.frame.allocation.ArenaMemoryAllocator; @@ -39,6 +41,8 @@ import org.apache.druid.java.util.common.Unit; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.msq.input.ReadableInput; +import org.apache.druid.msq.input.table.RichSegmentDescriptor; +import org.apache.druid.msq.input.table.SegmentWithDescriptor; import org.apache.druid.msq.kernel.StageId; import org.apache.druid.msq.kernel.StagePartition; import org.apache.druid.msq.querykit.FrameProcessorTestBase; @@ -46,10 +50,15 @@ import org.apache.druid.query.Druids; import org.apache.druid.query.scan.ScanQuery; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.segment.CompleteSegment; import org.apache.druid.segment.CursorFactory; +import org.apache.druid.segment.QueryableIndex; +import org.apache.druid.segment.QueryableIndexCursorFactory; +import org.apache.druid.segment.QueryableIndexSegment; import org.apache.druid.segment.TestIndex; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.incremental.IncrementalIndexCursorFactory; +import org.apache.druid.timeline.SegmentId; import org.junit.Assert; import org.junit.Test; @@ -60,6 +69,91 @@ public class ScanQueryFrameProcessorTest extends FrameProcessorTestBase { + + @Test + public void test_runWithSegments() throws Exception + { + final QueryableIndex queryableIndex = TestIndex.getMMappedTestIndex(); + + final CursorFactory cursorFactory = + new QueryableIndexCursorFactory(queryableIndex); + + // put funny intervals on query to ensure it is adjusted to the segment interval before building cursor + final ScanQuery query = + Druids.newScanQueryBuilder() + .dataSource("test") + .intervals( + new MultipleIntervalSegmentSpec( + ImmutableList.of( + Intervals.of("2001-01-01T00Z/2011-01-01T00Z"), + Intervals.of("2011-01-02T00Z/2021-01-01T00Z") + ) + ) + ) + .columns(cursorFactory.getRowSignature().getColumnNames()) + .build(); + + final BlockingQueueFrameChannel outputChannel = BlockingQueueFrameChannel.minimal(); + + // Limit output frames to 1 row to ensure we test edge cases + final FrameWriterFactory frameWriterFactory = new LimitedFrameWriterFactory( + FrameWriters.makeRowBasedFrameWriterFactory( + new SingleMemoryAllocatorFactory(HeapMemoryAllocator.unlimited()), + cursorFactory.getRowSignature(), + Collections.emptyList(), + false + ), + 1 + ); + + final ScanQueryFrameProcessor processor = new ScanQueryFrameProcessor( + query, + null, + new DefaultObjectMapper(), + ReadableInput.segment( + new SegmentWithDescriptor( + () -> new StupidResourceHolder<>(new CompleteSegment(null, new QueryableIndexSegment(queryableIndex, SegmentId.dummy("test")))), + new RichSegmentDescriptor(queryableIndex.getDataInterval(), queryableIndex.getDataInterval(), "dummy_version", 0) + ) + ), + Function.identity(), + new ResourceHolder() + { + @Override + public WritableFrameChannel get() + { + return outputChannel.writable(); + } + + @Override + public void close() + { + try { + outputChannel.writable().close(); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + }, + new ReferenceCountingResourceHolder<>(frameWriterFactory, () -> {}) + ); + + ListenableFuture retVal = exec.runFully(processor, null); + + final Sequence> rowsFromProcessor = FrameTestUtil.readRowsFromFrameChannel( + outputChannel.readable(), + FrameReader.create(cursorFactory.getRowSignature()) + ); + + FrameTestUtil.assertRowsEqual( + FrameTestUtil.readRowsFromCursorFactory(cursorFactory, cursorFactory.getRowSignature(), false), + rowsFromProcessor + ); + + Assert.assertEquals(Unit.instance(), retVal.get()); + } + @Test public void test_runWithInputChannel() throws Exception { @@ -83,10 +177,18 @@ public void test_runWithInputChannel() throws Exception } } + // put funny intervals on query to ensure it is adjusted to the segment interval before building cursor final ScanQuery query = Druids.newScanQueryBuilder() .dataSource("test") - .intervals(new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY)) + .intervals( + new MultipleIntervalSegmentSpec( + ImmutableList.of( + Intervals.of("2001-01-01T00Z/2011-01-01T00Z"), + Intervals.of("2011-01-02T00Z/2021-01-01T00Z") + ) + ) + ) .columns(cursorFactory.getRowSignature().getColumnNames()) .build();