Intervals are updated properly for Unnest queries (#15020)
Fixes a bug where the unnest queries were not updated with the correct intervals.
somu-imply authored Oct 3, 2023
1 parent 64754b6 commit cb05028
Showing 8 changed files with 421 additions and 7 deletions.
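As context for the fix, here is a minimal self-contained sketch of the failure mode. The types below are hypothetical stand-ins, not Druid's actual DataSource classes; only the shape of the datasource tree mirrors the real planner. A __time bound sitting on a filtered datasource underneath an unnest never reached the outer query, so the native query was planned with ETERNITY intervals and scanned every segment.

```java
// Self-contained model of the bug; hypothetical types, not Druid's API.
public class UnnestIntervalSketch
{
  interface Source {}

  record Table(String name) implements Source {}

  record Filtered(Source base, String timeInterval) implements Source {}

  record Unnest(Source base) implements Source {}

  // Stand-in for Intervals.ETERNITY, i.e. "scan everything".
  static final String ETERNITY = "eternity";

  // Pre-fix behavior: only a top-level filtered source contributed intervals,
  // so anything wrapped in an unnest fell back to ETERNITY.
  static String intervalBefore(Source s)
  {
    return s instanceof Filtered f ? f.timeInterval() : ETERNITY;
  }

  // Post-fix behavior: recurse through unnest wrappers to reach the inner filter.
  static String intervalAfter(Source s)
  {
    if (s instanceof Filtered f) {
      return f.timeInterval();
    }
    if (s instanceof Unnest u) {
      return intervalAfter(u.base());
    }
    return ETERNITY;
  }

  public static void main(String[] args)
  {
    // Unnest over a filtered table: the time bound lives on the inner node.
    Source ds = new Unnest(new Filtered(new Table("foo"), "2023-01-01/2023-02-01"));
    System.out.println(intervalBefore(ds)); // eternity -- the bug: every segment scanned
    System.out.println(intervalAfter(ds));  // 2023-01-01/2023-02-01 -- the fix
  }
}
```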
@@ -53,6 +53,10 @@ public class MaterializedViewQueryTest
private static final ObjectMapper JSON_MAPPER = TestHelper.makeJsonMapper();
private DataSourceOptimizer optimizer;

static {
  NullHandling.initializeForTests();
}

@Before
public void setUp()
{
@@ -1798,7 +1798,7 @@ public void testTimeColumnAggregationFromExtern() throws IOException
.setExpectedValidationErrorMatcher(
new DruidExceptionMatcher(DruidException.Persona.ADMIN, DruidException.Category.INVALID_INPUT, "general")
.expectMessageIs(
"Query planning failed for unknown reason, our best guess is this "
"Query could not be planned. A possible reason is "
+ "[LATEST and EARLIEST aggregators implicitly depend on the __time column, "
+ "but the table queried doesn't contain a __time column. "
+ "Please use LATEST_BY or EARLIEST_BY and specify the column explicitly.]"
@@ -696,7 +696,7 @@ private DruidException buildSQLPlanningError(RelOptPlanner.CannotPlanException e
.ofCategory(DruidException.Category.INVALID_INPUT)
.build(
exception,
"Query planning failed for unknown reason, our best guess is this [%s]",
"Query could not be planned. A possible reason is [%s]",
errorMessage
);
}
54 changes: 52 additions & 2 deletions sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
@@ -42,22 +42,26 @@
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.FilteredDataSource;
import org.apache.druid.query.JoinDataSource;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnnestDataSource;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.SimpleLongAggregatorFactory;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.filter.AndDimFilter;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.having.DimFilterHavingSpec;
@@ -773,6 +777,17 @@ VirtualColumns getVirtualColumns(final boolean includeDimensions)
return VirtualColumns.create(columns);
}

public static List<DimFilter> getAllFiltersUnderDataSource(DataSource d, List<DimFilter> dimFilterList)
{
  if (d instanceof FilteredDataSource) {
    dimFilterList.add(((FilteredDataSource) d).getFilter());
  }
  for (DataSource ds : d.getChildren()) {
    // The recursive call appends into the shared accumulator, so its return value
    // must not be re-added via addAll (that would append the list to itself and
    // duplicate every filter collected so far).
    getAllFiltersUnderDataSource(ds, dimFilterList);
  }
  return dimFilterList;
}
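A note on the helper above: it uses accumulator-style recursion, mutating the shared list in place and returning it only for convenience. A standalone sketch of that pattern (plain Java, nothing Druid-specific) shows the shape:

```java
import java.util.ArrayList;
import java.util.List;

public class AccumulatorSketch
{
  // A tiny labeled tree, standing in for the DataSource/children structure.
  record Tree(String label, List<Tree> children) {}

  // Accumulator-style traversal shaped like getAllFiltersUnderDataSource.
  static List<String> collect(Tree t, List<String> acc)
  {
    acc.add(t.label());
    for (Tree child : t.children()) {
      // Not acc.addAll(collect(child, acc)): the recursion already fills acc,
      // and addAll on the same list would duplicate its contents.
      collect(child, acc);
    }
    return acc;
  }

  public static void main(String[] args)
  {
    Tree tree = new Tree("outer filter", List.of(new Tree("inner filter", List.of())));
    System.out.println(collect(tree, new ArrayList<>())); // [outer filter, inner filter]
  }
}
```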

/**
* Returns a pair of DataSource and Filtration object created on the query filter. In case the data source is
* a join datasource, the datasource may be altered and the left filter of the join datasource may
@@ -786,8 +801,44 @@ static Pair<DataSource, Filtration> getFiltration(
    JoinableFactoryWrapper joinableFactoryWrapper
)
{
  if (!canUseIntervalFiltering(dataSource)) {
  if (dataSource instanceof UnnestDataSource) {
    // An UnnestDataSource can have another unnest data source,
    // a join data source, a filtered data source, etc. as its base.
    Pair<DataSource, Filtration> pair = getFiltration(
        ((UnnestDataSource) dataSource).getBase(),
        filter,
        virtualColumnRegistry,
        joinableFactoryWrapper
    );
    return Pair.of(dataSource, pair.rhs);
  } else if (!canUseIntervalFiltering(dataSource)) {
    return Pair.of(dataSource, toFiltration(filter, virtualColumnRegistry.getFullRowSignature(), false));
  } else if (dataSource instanceof FilteredDataSource) {
    // A filteredDS is created only inside the rel for Unnest, ensuring it only grabs the outermost filter
    // and, if possible, pushes it down inside the data source.
    // So a chain of Filter->Unnest->Filter is typically impossible when the query is planned through SQL.
    // Also, Calcite has filter-reduction rules that push filters deep into base data sources for better query planning.
    // A base table with a chain of filters is synonymous with a filteredDS.
    // We recursively find all filters under a filteredDS and then
    // 1) create a filtration from the filteredDS's filters, and
    // 2) update the intervals of the outer filter with the intervals from step 1; both calls appear below.
    List<DimFilter> dimFilterList = getAllFiltersUnderDataSource(dataSource, new ArrayList<>());
    final FilteredDataSource filteredDataSource = (FilteredDataSource) dataSource;
    // Defensive check, as the base of a filteredDataSource cannot be another filteredDataSource
    final DataSource baseOfFilterDataSource = filteredDataSource.getBase();
    if (baseOfFilterDataSource instanceof FilteredDataSource) {
      throw DruidException.defensive("Cannot create a filteredDataSource using another filteredDataSource as a base");
    }
    final boolean useIntervalFiltering = canUseIntervalFiltering(filteredDataSource);
    final Filtration baseFiltration = toFiltration(
        new AndDimFilter(dimFilterList),
        virtualColumnRegistry.getFullRowSignature(),
        useIntervalFiltering
    );
    // Add the intervals from the filtered data source's filter to the query filtration
    final Filtration queryFiltration = Filtration.create(filter, baseFiltration.getIntervals())
                                                 .optimize(virtualColumnRegistry.getFullRowSignature());
    return Pair.of(filteredDataSource, queryFiltration);
  } else if (dataSource instanceof JoinDataSource && ((JoinDataSource) dataSource).getLeftFilter() != null) {
    final JoinDataSource joinDataSource = (JoinDataSource) dataSource;

@@ -809,7 +860,6 @@ static Pair<DataSource, Filtration> getFiltration(
        leftFiltration.getDimFilter(),
        joinableFactoryWrapper
    );

    return Pair.of(newDataSource, queryFiltration);
  } else {
    return Pair.of(dataSource, toFiltration(filter, virtualColumnRegistry.getFullRowSignature(), true));
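To recap the filteredDS branch above: step 1 builds a filtration from every filter found under the datasource, pulling the time conditions out into intervals; step 2 re-creates the outer query's filtration seeded with those intervals. A compact standalone model of that two-step lift follows — the types are hypothetical, and only the Filtration.create(filter, intervals) shape is taken from the code above:

```java
import java.util.ArrayList;
import java.util.List;

public class TwoStepFiltrationSketch
{
  // Minimal stand-in for Druid's Filtration: a residual filter plus intervals.
  record Filtration(String dimFilter, List<String> intervals)
  {
    static Filtration create(String filter, List<String> intervals)
    {
      return new Filtration(filter, intervals);
    }
  }

  // Step 1: build a filtration from the filters found under the filtered
  // datasource; time bounds become intervals, everything else stays a filter.
  static Filtration baseFiltration(List<String> innerFilters)
  {
    List<String> intervals = new ArrayList<>();
    List<String> remaining = new ArrayList<>();
    for (String f : innerFilters) {
      if (f.startsWith("__time in ")) {
        intervals.add(f.substring("__time in ".length()));
      } else {
        remaining.add(f);
      }
    }
    return new Filtration(String.join(" AND ", remaining), intervals);
  }

  public static void main(String[] args)
  {
    List<String> innerFilters = List.of("__time in 2023-01-01/2023-02-01", "dim1 = 'a'");
    Filtration base = baseFiltration(innerFilters);
    // Step 2: the outer filter keeps its own condition but adopts the intervals
    // from step 1, so the query no longer scans ETERNITY.
    Filtration query = Filtration.create("d = 'b'", base.intervals());
    System.out.println(query.intervals()); // [2023-01-01/2023-02-01]
  }
}
```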
@@ -785,7 +785,7 @@ public void assertQueryIsUnplannable(final PlannerConfig plannerConfig, final St
new DruidExceptionMatcher(DruidException.Persona.ADMIN, DruidException.Category.INVALID_INPUT, "general")
.expectMessageIs(
StringUtils.format(
"Query planning failed for unknown reason, our best guess is this [%s]",
"Query could not be planned. A possible reason is [%s]",
expectedError
)
)
