Updating plans when using joins with unnest on the left #15075

Merged · 11 commits · Oct 7, 2023
53 changes: 39 additions & 14 deletions processing/src/main/java/org/apache/druid/query/JoinDataSource.java
@@ -476,10 +476,18 @@ private Function<SegmentReference, SegmentReference> createSegmentMapFunctionInt
.orElse(null)
)
);

final Function<SegmentReference, SegmentReference> baseMapFn;
if (left instanceof JoinDataSource) {
Member: this seems worth a comment on what is going on. Is it still ok to do if left is not concrete?

Contributor Author: I'll add a comment as to why we are not using the isConcrete() check and instead using the instanceof check here.

Contributor Author: Comment is added.

baseMapFn = Function.identity();
} else {
baseMapFn = left.createSegmentMapFunction(
query,
cpuTimeAccumulator
);
}
return baseSegment ->
new HashJoinSegment(
baseSegment,
baseMapFn.apply(baseSegment),
baseFilterToUse,
GuavaUtils.firstNonNull(clausesToUse, ImmutableList.of()),
joinFilterPreAnalysis
@@ -501,20 +509,37 @@ private static Triple<DataSource, DimFilter, List<PreJoinableClause>> flattenJoin
DimFilter currentDimFilter = null;
final List<PreJoinableClause> preJoinableClauses = new ArrayList<>();

while (current instanceof JoinDataSource) {
final JoinDataSource joinDataSource = (JoinDataSource) current;
current = joinDataSource.getLeft();
currentDimFilter = validateLeftFilter(current, joinDataSource.getLeftFilter());
preJoinableClauses.add(
new PreJoinableClause(
joinDataSource.getRightPrefix(),
joinDataSource.getRight(),
joinDataSource.getJoinType(),
joinDataSource.getConditionAnalysis()
)
);
// There can be queries like
// Join of Unnest of Join of Unnest of Filter,
// so these checks need to be ORed together
// to reach the base data source.
// This also means that adding a new data source type
// will need an instanceof check here.
// Future work should look into whether flattenJoin
// can be refactored to omit these instanceof checks.
while (current instanceof JoinDataSource || current instanceof UnnestDataSource || current instanceof FilteredDataSource) {
Contributor: can we add test cases for self join with unnest datasource if we do not have them already?

Contributor Author: Thanks, added a test with self join on an unnest data source.
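
For illustration, such a self join over an unnest data source can be built in the style of the JoinDataSource tests later in this diff; the table name, prefix, and join condition below are hypothetical, not the test that was actually added:

JoinDataSource selfJoin = JoinDataSource.create(
    UnnestDataSource.create(
        new TableDataSource("table1"),
        new ExpressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING, ExprMacroTable.nil()),
        null
    ),
    new TableDataSource("table1"), // same table on the right makes it a self join
    "j.",
    "\"j0.unnest\" == \"j.dim3\"",
    JoinType.INNER,
    null,
    ExprMacroTable.nil(),
    null
);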

if (current instanceof JoinDataSource) {
final JoinDataSource joinDataSource = (JoinDataSource) current;
current = joinDataSource.getLeft();
currentDimFilter = validateLeftFilter(current, joinDataSource.getLeftFilter());
preJoinableClauses.add(
new PreJoinableClause(
joinDataSource.getRightPrefix(),
joinDataSource.getRight(),
joinDataSource.getJoinType(),
joinDataSource.getConditionAnalysis()
)
);
} else if (current instanceof UnnestDataSource) {
Member: it doesn't seem intuitive to me that we can flatten away unnest and filtered datasources, could we add comments explaining why it's ok? Is it still ok if the unnest datasource is wrapping a join datasource? Like, does it flatten through it? Where do the unnest and filters go in that case?

Contributor Author (@somu-imply, Oct 6, 2023): I'll add comments. The getAnalysis() of an Unnest or a filteredDS always delegates to its base, so flattening through a Join->Unnest->Join kind of scenario to get the base data source makes sense: it goes down to find the base concrete data source. In this PR, the filters on the filteredDataSource and the unnestDataSource are not pushed down to the left of the join; the unnest filter and the filter on the filteredDataSource remain on their data sources. I have added a unit test of Join->Unnest->Join and will add another UT of Join->Unnest->Filter->Join.

Contributor Author: Comment and unit test added.
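
A minimal sketch of the delegation described above (simplified, not the exact Druid source): UnnestDataSource and FilteredDataSource both hand getAnalysis() to their base, which is why flattenJoin can walk through them to the underlying concrete data source.

// Simplified sketch: a wrapper data source delegating analysis to its base,
// so Join(Unnest(Filtered(Table))) ultimately resolves Table as the base.
@Override
public DataSourceAnalysis getAnalysis()
{
  return base.getAnalysis();
}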

final UnnestDataSource unnestDataSource = (UnnestDataSource) current;
current = unnestDataSource.getBase();
} else {
final FilteredDataSource filteredDataSource = (FilteredDataSource) current;
current = filteredDataSource.getBase();
}
}


// Join clauses were added in the order we saw them while traversing down, but we need to apply them in the
// going-up order. So reverse them.
Collections.reverse(preJoinableClauses);
@@ -29,11 +29,14 @@
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.filter.InDimFilter;
import org.apache.druid.query.filter.TrueDimFilter;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.join.JoinConditionAnalysis;
import org.apache.druid.segment.join.JoinType;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.easymock.Mock;
import org.junit.Assert;
import org.junit.Rule;
@@ -433,6 +436,51 @@ public void test_computeJoinDataSourceCacheKey_keyChangesWithPrefix()
Assert.assertFalse(Arrays.equals(cacheKey1, cacheKey2));
}

@Test
public void testGetAnalysisWithUnnestDS()
{
JoinDataSource dataSource = JoinDataSource.create(
UnnestDataSource.create(
new TableDataSource("table1"),
new ExpressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING, ExprMacroTable.nil()),
null
),
new TableDataSource("table2"),
"j.",
"x == \"j.x\"",
JoinType.LEFT,
null,
ExprMacroTable.nil(),
null
);
DataSourceAnalysis analysis = dataSource.getAnalysis();
Assert.assertEquals("table1", analysis.getBaseDataSource().getTableNames().iterator().next());
}

@Test
public void testGetAnalysisWithFilteredDS()
{
JoinDataSource dataSource = JoinDataSource.create(
UnnestDataSource.create(
FilteredDataSource.create(
new TableDataSource("table1"),
TrueDimFilter.instance()
),
new ExpressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING, ExprMacroTable.nil()),
null
),
new TableDataSource("table2"),
"j.",
"x == \"j.x\"",
JoinType.LEFT,
null,
ExprMacroTable.nil(),
null
);
DataSourceAnalysis analysis = dataSource.getAnalysis();
Assert.assertEquals("table1", analysis.getBaseDataSource().getTableNames().iterator().next());
}

@Test
public void test_computeJoinDataSourceCacheKey_keyChangesWithBaseFilter()
{
@@ -66,7 +66,7 @@ public static boolean isScanOrMapping(final DruidRel<?> druidRel, final boolean
*/
public static boolean isScanOrProject(final DruidRel<?> druidRel, final boolean canBeJoinOrUnion)
{
if (druidRel instanceof DruidQueryRel || (canBeJoinOrUnion && (druidRel instanceof DruidJoinQueryRel
if (druidRel instanceof DruidQueryRel || (canBeJoinOrUnion && (druidRel instanceof DruidJoinQueryRel || druidRel instanceof DruidCorrelateUnnestRel
|| druidRel instanceof DruidUnionDataSourceRel))) {
final PartialDruidQuery partialQuery = druidRel.getPartialDruidQuery();
final PartialDruidQuery.Stage stage = partialQuery.stage();
@@ -38,6 +38,7 @@
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.Druids;
import org.apache.druid.query.FilteredDataSource;
import org.apache.druid.query.GlobalTableDataSource;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.JoinDataSource;
@@ -49,6 +50,7 @@
import org.apache.druid.query.QueryException;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnionDataSource;
import org.apache.druid.query.UnnestDataSource;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.FilteredAggregatorFactory;
@@ -64,6 +66,7 @@
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.ExtractionDimensionSpec;
import org.apache.druid.query.extraction.SubstringDimExtractionFn;
import org.apache.druid.query.filter.InDimFilter;
import org.apache.druid.query.filter.LikeDimFilter;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.ResultRow;
@@ -5914,4 +5917,203 @@ public void testJoinWithInputRefCondition()
)
);
}

@Test
public void testJoinsWithUnnestOnLeft()
{
// The segment map function of MSQ needs some work
// to handle these nested cases.
// Remove this once that's handled.
msqIncompatible();
Map<String, Object> context = new HashMap<>(QUERY_CONTEXT_DEFAULT);
testQuery(
"with t1 as (\n"
+ "select * from foo, unnest(MV_TO_ARRAY(\"dim3\")) as u(d3)\n"
+ ")\n"
+ "select t1.dim3, t1.d3, t2.dim2 from t1 JOIN numfoo as t2\n"
+ "ON t1.d3 = t2.\"dim2\"",
context,
ImmutableList.of(
newScanQueryBuilder()
.dataSource(
join(
UnnestDataSource.create(
new TableDataSource(CalciteTests.DATASOURCE1),
expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING),
null
),
new QueryDataSource(
newScanQueryBuilder()
.intervals(querySegmentSpec(Filtration.eternity()))
.dataSource(CalciteTests.DATASOURCE3)
.columns("dim2")
.legacy(false)
.context(context)
.build()
),
"_j0.",
"(\"j0.unnest\" == \"_j0.dim2\")",
JoinType.INNER
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("_j0.dim2", "dim3", "j0.unnest")
.context(context)
.build()
),
useDefault ?
ImmutableList.of(
new Object[]{"[\"a\",\"b\"]", "a", "a"},
new Object[]{"[\"a\",\"b\"]", "a", "a"}
) : ImmutableList.of(
new Object[]{"[\"a\",\"b\"]", "a", "a"},
new Object[]{"[\"a\",\"b\"]", "a", "a"},
new Object[]{"", "", ""}
)
);
}

@Test
public void testJoinsWithUnnestOverFilteredDSOnLeft()
{
// The segment map function of MSQ needs some work
// to handle these nested cases.
// Remove this once that's handled.
msqIncompatible();
Map<String, Object> context = new HashMap<>(QUERY_CONTEXT_DEFAULT);
testQuery(
"with t1 as (\n"
+ "select * from foo, unnest(MV_TO_ARRAY(\"dim3\")) as u(d3) where dim2='a'\n"
+ ")\n"
+ "select t1.dim3, t1.d3, t2.dim2 from t1 JOIN numfoo as t2\n"
+ "ON t1.d3 = t2.\"dim2\"",
context,
ImmutableList.of(
newScanQueryBuilder()
.dataSource(
join(
UnnestDataSource.create(
FilteredDataSource.create(
new TableDataSource(CalciteTests.DATASOURCE1),
equality("dim2", "a", ColumnType.STRING)
),
expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING),
null
),
new QueryDataSource(
newScanQueryBuilder()
.intervals(querySegmentSpec(Filtration.eternity()))
.dataSource(CalciteTests.DATASOURCE3)
.columns("dim2")
.legacy(false)
.context(context)
.build()
),
"_j0.",
"(\"j0.unnest\" == \"_j0.dim2\")",
JoinType.INNER
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("_j0.dim2", "dim3", "j0.unnest")
.context(context)
.build()
),
useDefault ?
ImmutableList.of(
new Object[]{"[\"a\",\"b\"]", "a", "a"},
new Object[]{"[\"a\",\"b\"]", "a", "a"}
) : ImmutableList.of(
new Object[]{"[\"a\",\"b\"]", "a", "a"},
new Object[]{"[\"a\",\"b\"]", "a", "a"},
new Object[]{"", "", ""}
)
);
}

@Test
public void testJoinsWithUnnestOverJoin()
{
// The segment map function of MSQ needs some work
// to handle these nested cases.
// Remove this once that's handled.
msqIncompatible();
Map<String, Object> context = new HashMap<>(QUERY_CONTEXT_DEFAULT);
testQuery(
"with t1 as (\n"
+ "select * from (SELECT * from foo JOIN (select dim2 as t from foo where dim2 IN ('a','b','ab','abc')) ON dim2=t), "
+ " unnest(MV_TO_ARRAY(\"dim3\")) as u(d3) \n"
+ ")\n"
+ "select t1.dim3, t1.d3, t2.dim2 from t1 JOIN numfoo as t2\n"
+ "ON t1.d3 = t2.\"dim2\"",
context,
ImmutableList.of(
newScanQueryBuilder()
.dataSource(
join(
UnnestDataSource.create(
join(
new TableDataSource(CalciteTests.DATASOURCE1),
new QueryDataSource(
newScanQueryBuilder()
.intervals(querySegmentSpec(Filtration.eternity()))
.dataSource(CalciteTests.DATASOURCE1)
.filters(new InDimFilter("dim2", ImmutableList.of("a", "b", "ab", "abc"), null))
.legacy(false)
.context(context)
.columns("dim2")
.build()
),
"j0.",
"(\"dim2\" == \"j0.dim2\")",
JoinType.INNER
),
expressionVirtualColumn("_j0.unnest", "\"dim3\"", ColumnType.STRING),
null
),
new QueryDataSource(
newScanQueryBuilder()
.intervals(querySegmentSpec(Filtration.eternity()))
.dataSource(CalciteTests.DATASOURCE3)
.columns("dim2")
.legacy(false)
.context(context)
.build()
),
"__j0.",
"(\"_j0.unnest\" == \"__j0.dim2\")",
JoinType.INNER
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("__j0.dim2", "_j0.unnest", "dim3")
.context(context)
.build()
),
useDefault ?
ImmutableList.of(
new Object[]{"[\"a\",\"b\"]", "a", "a"},
new Object[]{"[\"a\",\"b\"]", "a", "a"},
new Object[]{"[\"a\",\"b\"]", "a", "a"},
new Object[]{"[\"a\",\"b\"]", "a", "a"},
new Object[]{"[\"a\",\"b\"]", "a", "a"},
new Object[]{"[\"a\",\"b\"]", "a", "a"},
new Object[]{"[\"a\",\"b\"]", "a", "a"},
new Object[]{"[\"a\",\"b\"]", "a", "a"}
) : ImmutableList.of(
new Object[]{"[\"a\",\"b\"]", "a", "a"},
new Object[]{"[\"a\",\"b\"]", "a", "a"},
new Object[]{"[\"a\",\"b\"]", "a", "a"},
new Object[]{"[\"a\",\"b\"]", "a", "a"},
new Object[]{"[\"a\",\"b\"]", "a", "a"},
new Object[]{"[\"a\",\"b\"]", "a", "a"},
new Object[]{"[\"a\",\"b\"]", "a", "a"},
new Object[]{"[\"a\",\"b\"]", "a", "a"},
new Object[]{"", "", ""},
new Object[]{"", "", ""},
new Object[]{"", "", ""},
new Object[]{"", "", ""}
)
);
}
}