Skip to content

Commit

Permalink
fix issue with ScanQueryFrameProcessor cursor build not adjusting int…
Browse files Browse the repository at this point in the history
…ervals (apache#17168)

* fix issue with ScanQueryFrameProcessor cursor build not adjusting intervals

* all hail the robot overlords

* style bot
  • Loading branch information
clintropolis authored and kfaraz committed Oct 4, 2024
1 parent 27fc1ce commit 2403a47
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.druid.frame.write.InvalidFieldException;
import org.apache.druid.frame.write.InvalidNullByteException;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.Unit;
import org.apache.druid.java.util.common.guava.Sequence;
Expand All @@ -63,6 +64,8 @@
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.scan.ScanQueryEngine;
import org.apache.druid.query.scan.ScanResultValue;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.CompleteSegment;
import org.apache.druid.segment.Cursor;
Expand Down Expand Up @@ -259,7 +262,12 @@ protected ReturnOrAwait<Unit> runWithSegment(final SegmentWithDescriptor segment
}

final CursorHolder nextCursorHolder =
cursorFactory.makeCursorHolder(ScanQueryEngine.makeCursorBuildSpec(query, null));
cursorFactory.makeCursorHolder(
ScanQueryEngine.makeCursorBuildSpec(
query.withQuerySegmentSpec(new SpecificSegmentSpec(segment.getDescriptor())),
null
)
);
final Cursor nextCursor = nextCursorHolder.asCursor();

if (nextCursor == null) {
Expand Down Expand Up @@ -305,7 +313,12 @@ protected ReturnOrAwait<Unit> runWithInputChannel(
}

final CursorHolder nextCursorHolder =
cursorFactory.makeCursorHolder(ScanQueryEngine.makeCursorBuildSpec(query, null));
cursorFactory.makeCursorHolder(
ScanQueryEngine.makeCursorBuildSpec(
query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY)),
null
)
);
final Cursor nextCursor = nextCursorHolder.asCursor();

if (nextCursor == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ public ScanQueryFrameProcessorFactory(@JsonProperty("query") ScanQuery query)
{
super(query);
this.query = Preconditions.checkNotNull(query, "query");
this.runningCountForLimit =
query.isLimited() && query.getOrderBys().isEmpty() ? new AtomicLong() : null;
this.runningCountForLimit = query.isLimited() && query.getOrderBys().isEmpty() ? new AtomicLong() : null;
}

@JsonProperty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

package org.apache.druid.msq.querykit.scan;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.collections.StupidResourceHolder;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
Expand All @@ -39,17 +41,24 @@
import org.apache.druid.java.util.common.Unit;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.msq.input.ReadableInput;
import org.apache.druid.msq.input.table.RichSegmentDescriptor;
import org.apache.druid.msq.input.table.SegmentWithDescriptor;
import org.apache.druid.msq.kernel.StageId;
import org.apache.druid.msq.kernel.StagePartition;
import org.apache.druid.msq.querykit.FrameProcessorTestBase;
import org.apache.druid.msq.test.LimitedFrameWriterFactory;
import org.apache.druid.query.Druids;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.CompleteSegment;
import org.apache.druid.segment.CursorFactory;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexCursorFactory;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.incremental.IncrementalIndexCursorFactory;
import org.apache.druid.timeline.SegmentId;
import org.junit.Assert;
import org.junit.Test;

Expand All @@ -60,6 +69,91 @@

public class ScanQueryFrameProcessorTest extends FrameProcessorTestBase
{

@Test
public void test_runWithSegments() throws Exception
{
final QueryableIndex queryableIndex = TestIndex.getMMappedTestIndex();

final CursorFactory cursorFactory =
new QueryableIndexCursorFactory(queryableIndex);

// put funny intervals on query to ensure it is adjusted to the segment interval before building cursor
final ScanQuery query =
Druids.newScanQueryBuilder()
.dataSource("test")
.intervals(
new MultipleIntervalSegmentSpec(
ImmutableList.of(
Intervals.of("2001-01-01T00Z/2011-01-01T00Z"),
Intervals.of("2011-01-02T00Z/2021-01-01T00Z")
)
)
)
.columns(cursorFactory.getRowSignature().getColumnNames())
.build();

final BlockingQueueFrameChannel outputChannel = BlockingQueueFrameChannel.minimal();

// Limit output frames to 1 row to ensure we test edge cases
final FrameWriterFactory frameWriterFactory = new LimitedFrameWriterFactory(
FrameWriters.makeRowBasedFrameWriterFactory(
new SingleMemoryAllocatorFactory(HeapMemoryAllocator.unlimited()),
cursorFactory.getRowSignature(),
Collections.emptyList(),
false
),
1
);

final ScanQueryFrameProcessor processor = new ScanQueryFrameProcessor(
query,
null,
new DefaultObjectMapper(),
ReadableInput.segment(
new SegmentWithDescriptor(
() -> new StupidResourceHolder<>(new CompleteSegment(null, new QueryableIndexSegment(queryableIndex, SegmentId.dummy("test")))),
new RichSegmentDescriptor(queryableIndex.getDataInterval(), queryableIndex.getDataInterval(), "dummy_version", 0)
)
),
Function.identity(),
new ResourceHolder<WritableFrameChannel>()
{
@Override
public WritableFrameChannel get()
{
return outputChannel.writable();
}

@Override
public void close()
{
try {
outputChannel.writable().close();
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
},
new ReferenceCountingResourceHolder<>(frameWriterFactory, () -> {})
);

ListenableFuture<Object> retVal = exec.runFully(processor, null);

final Sequence<List<Object>> rowsFromProcessor = FrameTestUtil.readRowsFromFrameChannel(
outputChannel.readable(),
FrameReader.create(cursorFactory.getRowSignature())
);

FrameTestUtil.assertRowsEqual(
FrameTestUtil.readRowsFromCursorFactory(cursorFactory, cursorFactory.getRowSignature(), false),
rowsFromProcessor
);

Assert.assertEquals(Unit.instance(), retVal.get());
}

@Test
public void test_runWithInputChannel() throws Exception
{
Expand All @@ -83,10 +177,18 @@ public void test_runWithInputChannel() throws Exception
}
}

// put funny intervals on query to ensure it is adjusted to the segment interval before building cursor
final ScanQuery query =
Druids.newScanQueryBuilder()
.dataSource("test")
.intervals(new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY))
.intervals(
new MultipleIntervalSegmentSpec(
ImmutableList.of(
Intervals.of("2001-01-01T00Z/2011-01-01T00Z"),
Intervals.of("2011-01-02T00Z/2021-01-01T00Z")
)
)
)
.columns(cursorFactory.getRowSignature().getColumnNames())
.build();

Expand Down

0 comments on commit 2403a47

Please sign in to comment.