Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix issue with ScanQueryFrameProcessor cursor build not adjusting intervals #17168

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.druid.frame.write.InvalidFieldException;
import org.apache.druid.frame.write.InvalidNullByteException;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.Unit;
import org.apache.druid.java.util.common.guava.Sequence;
Expand All @@ -63,6 +64,8 @@
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.scan.ScanQueryEngine;
import org.apache.druid.query.scan.ScanResultValue;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.CompleteSegment;
import org.apache.druid.segment.Cursor;
Expand Down Expand Up @@ -259,7 +262,12 @@ protected ReturnOrAwait<Unit> runWithSegment(final SegmentWithDescriptor segment
}

final CursorHolder nextCursorHolder =
cursorFactory.makeCursorHolder(ScanQueryEngine.makeCursorBuildSpec(query, null));
cursorFactory.makeCursorHolder(
ScanQueryEngine.makeCursorBuildSpec(
query.withQuerySegmentSpec(new SpecificSegmentSpec(segment.getDescriptor())),
null
)
);
final Cursor nextCursor = nextCursorHolder.asCursor();

if (nextCursor == null) {
Expand Down Expand Up @@ -305,7 +313,12 @@ protected ReturnOrAwait<Unit> runWithInputChannel(
}

final CursorHolder nextCursorHolder =
cursorFactory.makeCursorHolder(ScanQueryEngine.makeCursorBuildSpec(query, null));
cursorFactory.makeCursorHolder(
ScanQueryEngine.makeCursorBuildSpec(
query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The old code did this too, but, it seems like it'd be incorrect if the query had an intervals filter on the __time column. I am not sure how (or if) this can actually happen, but I wonder if it could be made to happen by doing a query like this:

SELECT * FROM (SELECT * FROM "wikipedia" ORDER BY countryName LIMIT 100)
WHERE TIME_IN_INTERVAL(__time, '2020/P1D') OR TIME_IN_INTERVAL(__time, '2021/P1D')

I tried doing that just now, and got a QueryNotSupported error, which seems like a different problem. But what if it was supported?

IMO, here we should add a validation here to check that query has an eternity interval, rather than overriding it to be eternity. If it was anything other than eternity, this wouldn't be correct.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR for fixing this: #17173

null
)
);
final Cursor nextCursor = nextCursorHolder.asCursor();

if (nextCursor == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ public ScanQueryFrameProcessorFactory(@JsonProperty("query") ScanQuery query)
{
super(query);
this.query = Preconditions.checkNotNull(query, "query");
this.runningCountForLimit =
query.isLimited() && query.getOrderBys().isEmpty() ? new AtomicLong() : null;
this.runningCountForLimit = query.isLimited() && query.getOrderBys().isEmpty() ? new AtomicLong() : null;
}

@JsonProperty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

package org.apache.druid.msq.querykit.scan;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.collections.StupidResourceHolder;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
Expand All @@ -39,17 +41,24 @@
import org.apache.druid.java.util.common.Unit;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.msq.input.ReadableInput;
import org.apache.druid.msq.input.table.RichSegmentDescriptor;
import org.apache.druid.msq.input.table.SegmentWithDescriptor;
import org.apache.druid.msq.kernel.StageId;
import org.apache.druid.msq.kernel.StagePartition;
import org.apache.druid.msq.querykit.FrameProcessorTestBase;
import org.apache.druid.msq.test.LimitedFrameWriterFactory;
import org.apache.druid.query.Druids;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.CompleteSegment;
import org.apache.druid.segment.CursorFactory;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexCursorFactory;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.incremental.IncrementalIndexCursorFactory;
import org.apache.druid.timeline.SegmentId;
import org.junit.Assert;
import org.junit.Test;

Expand All @@ -60,6 +69,91 @@

public class ScanQueryFrameProcessorTest extends FrameProcessorTestBase
{

@Test
public void test_runWithSegments() throws Exception
{
final QueryableIndex queryableIndex = TestIndex.getMMappedTestIndex();

final CursorFactory cursorFactory =
new QueryableIndexCursorFactory(queryableIndex);

// put funny intervals on query to ensure it is adjusted to the segment interval before building cursor
final ScanQuery query =
Druids.newScanQueryBuilder()
.dataSource("test")
.intervals(
new MultipleIntervalSegmentSpec(
ImmutableList.of(
Intervals.of("2001-01-01T00Z/2011-01-01T00Z"),
Intervals.of("2011-01-02T00Z/2021-01-01T00Z")
)
)
)
.columns(cursorFactory.getRowSignature().getColumnNames())
.build();

final BlockingQueueFrameChannel outputChannel = BlockingQueueFrameChannel.minimal();

// Limit output frames to 1 row to ensure we test edge cases
final FrameWriterFactory frameWriterFactory = new LimitedFrameWriterFactory(
FrameWriters.makeRowBasedFrameWriterFactory(
new SingleMemoryAllocatorFactory(HeapMemoryAllocator.unlimited()),
cursorFactory.getRowSignature(),
Collections.emptyList(),
false
),
1
);

final ScanQueryFrameProcessor processor = new ScanQueryFrameProcessor(
query,
null,
new DefaultObjectMapper(),
ReadableInput.segment(
new SegmentWithDescriptor(
() -> new StupidResourceHolder<>(new CompleteSegment(null, new QueryableIndexSegment(queryableIndex, SegmentId.dummy("test")))),
new RichSegmentDescriptor(queryableIndex.getDataInterval(), queryableIndex.getDataInterval(), "dummy_version", 0)
)
),
Function.identity(),
new ResourceHolder<WritableFrameChannel>()
{
@Override
public WritableFrameChannel get()
{
return outputChannel.writable();
}

@Override
public void close()
{
try {
outputChannel.writable().close();
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
},
new ReferenceCountingResourceHolder<>(frameWriterFactory, () -> {})
);

ListenableFuture<Object> retVal = exec.runFully(processor, null);

final Sequence<List<Object>> rowsFromProcessor = FrameTestUtil.readRowsFromFrameChannel(
outputChannel.readable(),
FrameReader.create(cursorFactory.getRowSignature())
);

FrameTestUtil.assertRowsEqual(
FrameTestUtil.readRowsFromCursorFactory(cursorFactory, cursorFactory.getRowSignature(), false),
rowsFromProcessor
);

Assert.assertEquals(Unit.instance(), retVal.get());
}

@Test
public void test_runWithInputChannel() throws Exception
{
Expand All @@ -83,10 +177,18 @@ public void test_runWithInputChannel() throws Exception
}
}

// put funny intervals on query to ensure it is adjusted to the segment interval before building cursor
final ScanQuery query =
Druids.newScanQueryBuilder()
.dataSource("test")
.intervals(new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY))
.intervals(
new MultipleIntervalSegmentSpec(
ImmutableList.of(
Intervals.of("2001-01-01T00Z/2011-01-01T00Z"),
Intervals.of("2011-01-02T00Z/2021-01-01T00Z")
)
)
)
.columns(cursorFactory.getRowSignature().getColumnNames())
.build();

Expand Down
Loading