Skip to content

Commit

Permalink
SQL: Use regular filters for time filtering in subqueries. (apache#17173
Browse files Browse the repository at this point in the history
)

* SQL: Use regular filters for time filtering in subqueries.

Using the "intervals" feature on subqueries, or any non-table, should be
avoided because it isn't a meaningful optimization in those cases, and
it's simpler for runtime implementations if they can assume all filters
are located in the regular filter object.

Two changes:

1) Fix the logic in DruidQuery.canUseIntervalFiltering. It was intended
   to return false for QueryDataSource, but actually returned true.

2) Add a validation to ScanQueryFrameProcessor to ensure that when running
   on an input channel (which would include any subquery), the query has
   "intervals" set to ONLY_ETERNITY.

Prior to this patch, the new test case in testTimeFilterOnSubquery would
throw a "Can only handle a single interval" error in the native engine,
and "QueryNotSupported" in the MSQ engine.

* Mark new case as having extra columns in decoupled mode.

* Adjust test.
  • Loading branch information
gianm authored Sep 27, 2024
1 parent 157fe1b commit dc223f2
Show file tree
Hide file tree
Showing 6 changed files with 255 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.collect.Iterables;
import it.unimi.dsi.fastutil.ints.IntSet;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.channel.FrameWithPartition;
import org.apache.druid.frame.channel.ReadableFrameChannel;
Expand Down Expand Up @@ -64,7 +65,6 @@
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.scan.ScanQueryEngine;
import org.apache.druid.query.scan.ScanResultValue;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.CompleteSegment;
Expand Down Expand Up @@ -312,13 +312,14 @@ protected ReturnOrAwait<Unit> runWithInputChannel(
);
}

if (!Intervals.ONLY_ETERNITY.equals(query.getIntervals())) {
// runWithInputChannel is for running on subquery results, where we don't expect to see "intervals" set.
// The SQL planner avoids it for subqueries; see DruidQuery#canUseIntervalFiltering.
throw DruidException.defensive("Expected eternity intervals, but got[%s]", query.getIntervals());
}

final CursorHolder nextCursorHolder =
cursorFactory.makeCursorHolder(
ScanQueryEngine.makeCursorBuildSpec(
query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY)),
null
)
);
cursorFactory.makeCursorHolder(ScanQueryEngine.makeCursorBuildSpec(query, null));
final Cursor nextCursor = nextCursorHolder.asCursor();

if (nextCursor == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,11 @@
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.incremental.IncrementalIndexCursorFactory;
import org.apache.druid.timeline.SegmentId;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -177,7 +180,7 @@ public void test_runWithInputChannel() throws Exception
}
}

// put funny intervals on query to ensure it is adjusted to the segment interval before building cursor
// put funny intervals on query to ensure it is validated before building cursor
final ScanQuery query =
Druids.newScanQueryBuilder()
.dataSource("test")
Expand Down Expand Up @@ -240,11 +243,16 @@ public void close()
FrameReader.create(signature)
);

FrameTestUtil.assertRowsEqual(
FrameTestUtil.readRowsFromCursorFactory(cursorFactory, signature, false),
rowsFromProcessor
final RuntimeException e = Assert.assertThrows(
RuntimeException.class,
rowsFromProcessor::toList
);

Assert.assertEquals(Unit.instance(), retVal.get());
MatcherAssert.assertThat(
e,
ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString(
"Expected eternity intervals, but got[[2001-01-01T00:00:00.000Z/2011-01-01T00:00:00.000Z, "
+ "2011-01-02T00:00:00.000Z/2021-01-01T00:00:00.000Z]]"))
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.apache.druid.query.operator.ScanOperatorFactory;
import org.apache.druid.query.operator.WindowOperatorQuery;
import org.apache.druid.query.ordering.StringComparator;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.spec.LegacySegmentSpec;
import org.apache.druid.query.timeboundary.TimeBoundaryQuery;
Expand Down Expand Up @@ -884,7 +885,8 @@ static Pair<DataSource, Filtration> getFiltration(
*/
/**
 * Whether the "intervals" feature of native queries may be used for time filtering on this datasource.
 *
 * Interval filtering is only meaningful when scanning a concrete table directly. When the datasource wraps a
 * base query (i.e. a subquery / QueryDataSource), time conditions must stay in the regular filter instead, so
 * runtime engines can assume subquery inputs always carry ONLY_ETERNITY intervals.
 *
 * @param dataSource datasource the planned query will read from
 * @return true only for table-based datasources with no base query
 */
private static boolean canUseIntervalFiltering(final DataSource dataSource)
{
  // Note: isTableBased() alone is not enough — it also returns true for a QueryDataSource whose base is a
  // table, so we must additionally reject any datasource that has a base query.
  final DataSourceAnalysis analysis = dataSource.getAnalysis();
  return !analysis.getBaseQuery().isPresent() && analysis.isTableBased();
}

private static Filtration toFiltration(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
import org.apache.druid.query.topn.TopNQueryBuilder;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.join.JoinType;
Expand Down Expand Up @@ -7764,6 +7765,56 @@ public void testMultipleExactCountDistinctWithGroupingAndOtherAggregatorsUsingJo
);
}

// Verifies that a time filter applied on top of a subquery (here: a LIMITed scan of "foo") is planned as a
// regular OR-of-range filter on __time rather than via the "intervals" feature: both the inner and outer scan
// queries are expected to carry eternity intervals. Prior to the canUseIntervalFiltering fix, this query failed
// with "Can only handle a single interval" (native) / "QueryNotSupported" (MSQ).
@DecoupledTestConfig(quidemReason = QuidemTestCaseReason.EQUIV_PLAN_EXTRA_COLUMNS, separateDefaultModeTest = true)
@Test
public void testTimeFilterOnSubquery()
{
testQuery(
"SELECT __time, m1 FROM (SELECT * FROM \"foo\" LIMIT 100)\n"
+ "WHERE TIME_IN_INTERVAL(__time, '2000/P1D') OR TIME_IN_INTERVAL(__time, '2001/P1D')",
ImmutableList.of(
newScanQueryBuilder()
.dataSource(
// Inner scan: the subquery, with intervals set to eternity (no interval-based time pruning).
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("__time", "m1")
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.limit(100)
.context(QUERY_CONTEXT_DEFAULT)
.build()
)
// Outer scan also uses eternity intervals; the two TIME_IN_INTERVAL conditions become
// regular range filters on __time combined with OR.
.intervals(querySegmentSpec(Filtration.eternity()))
.filters(or(
// [2000-01-01, 2000-01-02): lower inclusive, upper exclusive (strict).
range(
ColumnHolder.TIME_COLUMN_NAME,
ColumnType.LONG,
DateTimes.of("2000").getMillis(),
DateTimes.of("2000-01-02").getMillis(),
false,
true
),
// [2001-01-01, 2001-01-02): lower inclusive, upper exclusive (strict).
range(
ColumnHolder.TIME_COLUMN_NAME,
ColumnType.LONG,
DateTimes.of("2001").getMillis(),
DateTimes.of("2001-01-02").getMillis(),
false,
true
)
))
.columns("__time", "m1")
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
// One row from each one-day interval of datasource "foo".
ImmutableList.of(
new Object[]{DateTimes.of("2000-01-01").getMillis(), 1.0f},
new Object[]{DateTimes.of("2001-01-01").getMillis(), 4.0f}
)
);
}

@SqlTestFrameworkConfig.NumMergeBuffers(4)
@Test
public void testMultipleExactCountDistinctWithGroupingUsingGroupingSets()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# testTimeFilterOnSubquery@NullHandling=default case-crc:73448efc
# quidem testcase reason: EQUIV_PLAN_EXTRA_COLUMNS
!set debug true
!set defaultTimeout 300000
!set maxScatterGatherBytes 9223372036854775807
!set plannerStrategy DECOUPLED
!set sqlCurrentTimestamp 2000-01-01T00:00:00Z
!set sqlQueryId dummy
!set outputformat mysql
!use druidtest:///
SELECT __time, m1 FROM (SELECT * FROM "foo" LIMIT 100)
WHERE TIME_IN_INTERVAL(__time, '2000/P1D') OR TIME_IN_INTERVAL(__time, '2001/P1D');
+-------------------------+-----+
| __time | m1 |
+-------------------------+-----+
| 2000-01-01 00:00:00.000 | 1.0 |
| 2001-01-01 00:00:00.000 | 4.0 |
+-------------------------+-----+
(2 rows)

!ok
LogicalProject(__time=[$0], m1=[$5])
LogicalFilter(condition=[SEARCH($0, Sarg[[2000-01-01 00:00:00:TIMESTAMP(3)..2000-01-02 00:00:00:TIMESTAMP(3)), [2001-01-01 00:00:00:TIMESTAMP(3)..2001-01-02 00:00:00:TIMESTAMP(3))]:TIMESTAMP(3))])
LogicalSort(fetch=[100])
LogicalTableScan(table=[[druid, foo]])

!logicalPlan
DruidProject(__time=[$0], m1=[$5], druid=[logical])
DruidFilter(condition=[SEARCH($0, Sarg[[2000-01-01 00:00:00:TIMESTAMP(3)..2000-01-02 00:00:00:TIMESTAMP(3)), [2001-01-01 00:00:00:TIMESTAMP(3)..2001-01-02 00:00:00:TIMESTAMP(3))]:TIMESTAMP(3))])
DruidSort(fetch=[100], druid=[logical])
DruidTableScan(table=[[druid, foo]], druid=[logical])

!druidPlan
{
"queryType" : "scan",
"dataSource" : {
"type" : "query",
"query" : {
"queryType" : "scan",
"dataSource" : {
"type" : "table",
"name" : "foo"
},
"intervals" : {
"type" : "intervals",
"intervals" : [ "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ]
},
"resultFormat" : "compactedList",
"limit" : 100,
"columns" : [ "__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1" ],
"columnTypes" : [ "LONG", "LONG", "STRING", "STRING", "STRING", "FLOAT", "DOUBLE", "COMPLEX<hyperUnique>" ],
"granularity" : {
"type" : "all"
},
"legacy" : false
}
},
"intervals" : {
"type" : "intervals",
"intervals" : [ "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ]
},
"resultFormat" : "compactedList",
"filter" : {
"type" : "or",
"fields" : [ {
"type" : "bound",
"dimension" : "__time",
"lower" : "946684800000",
"upper" : "946771200000",
"upperStrict" : true,
"ordering" : {
"type" : "numeric"
}
}, {
"type" : "bound",
"dimension" : "__time",
"lower" : "978307200000",
"upper" : "978393600000",
"upperStrict" : true,
"ordering" : {
"type" : "numeric"
}
} ]
},
"columns" : [ "__time", "m1" ],
"columnTypes" : [ "LONG", "FLOAT" ],
"granularity" : {
"type" : "all"
},
"legacy" : false
}
!nativePlan
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# testTimeFilterOnSubquery@NullHandling=sql case-crc:73448efc
# quidem testcase reason: EQUIV_PLAN_EXTRA_COLUMNS
!set debug true
!set defaultTimeout 300000
!set maxScatterGatherBytes 9223372036854775807
!set plannerStrategy DECOUPLED
!set sqlCurrentTimestamp 2000-01-01T00:00:00Z
!set sqlQueryId dummy
!set outputformat mysql
!use druidtest:///
SELECT __time, m1 FROM (SELECT * FROM "foo" LIMIT 100)
WHERE TIME_IN_INTERVAL(__time, '2000/P1D') OR TIME_IN_INTERVAL(__time, '2001/P1D');
+-------------------------+-----+
| __time | m1 |
+-------------------------+-----+
| 2000-01-01 00:00:00.000 | 1.0 |
| 2001-01-01 00:00:00.000 | 4.0 |
+-------------------------+-----+
(2 rows)

!ok
LogicalProject(__time=[$0], m1=[$5])
LogicalFilter(condition=[SEARCH($0, Sarg[[2000-01-01 00:00:00:TIMESTAMP(3)..2000-01-02 00:00:00:TIMESTAMP(3)), [2001-01-01 00:00:00:TIMESTAMP(3)..2001-01-02 00:00:00:TIMESTAMP(3))]:TIMESTAMP(3))])
LogicalSort(fetch=[100])
LogicalTableScan(table=[[druid, foo]])

!logicalPlan
DruidProject(__time=[$0], m1=[$5], druid=[logical])
DruidFilter(condition=[SEARCH($0, Sarg[[2000-01-01 00:00:00:TIMESTAMP(3)..2000-01-02 00:00:00:TIMESTAMP(3)), [2001-01-01 00:00:00:TIMESTAMP(3)..2001-01-02 00:00:00:TIMESTAMP(3))]:TIMESTAMP(3))])
DruidSort(fetch=[100], druid=[logical])
DruidTableScan(table=[[druid, foo]], druid=[logical])

!druidPlan
{
"queryType" : "scan",
"dataSource" : {
"type" : "query",
"query" : {
"queryType" : "scan",
"dataSource" : {
"type" : "table",
"name" : "foo"
},
"intervals" : {
"type" : "intervals",
"intervals" : [ "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ]
},
"resultFormat" : "compactedList",
"limit" : 100,
"columns" : [ "__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1" ],
"columnTypes" : [ "LONG", "LONG", "STRING", "STRING", "STRING", "FLOAT", "DOUBLE", "COMPLEX<hyperUnique>" ],
"granularity" : {
"type" : "all"
},
"legacy" : false
}
},
"intervals" : {
"type" : "intervals",
"intervals" : [ "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ]
},
"resultFormat" : "compactedList",
"filter" : {
"type" : "or",
"fields" : [ {
"type" : "range",
"column" : "__time",
"matchValueType" : "LONG",
"lower" : 946684800000,
"upper" : 946771200000,
"upperOpen" : true
}, {
"type" : "range",
"column" : "__time",
"matchValueType" : "LONG",
"lower" : 978307200000,
"upper" : 978393600000,
"upperOpen" : true
} ]
},
"columns" : [ "__time", "m1" ],
"columnTypes" : [ "LONG", "FLOAT" ],
"granularity" : {
"type" : "all"
},
"legacy" : false
}
!nativePlan

0 comments on commit dc223f2

Please sign in to comment.