Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UNION ALLs in MSQ #14981

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnionDataSource;
import org.apache.druid.query.UnnestDataSource;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.planning.DataSourceAnalysis;
Expand Down Expand Up @@ -170,6 +171,18 @@ public static DataSourcePlan forDataSource(
minStageNumber,
broadcast
);
} else if (dataSource instanceof UnionDataSource) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please update the MSQ known issues and the docs wherever we state that UNION ALL is unsupported in MSQ.

return forUnion(
queryKit,
queryId,
queryContext,
(UnionDataSource) dataSource,
querySegmentSpec,
filter,
maxWorkerCount,
minStageNumber,
broadcast
);
} else if (dataSource instanceof JoinDataSource) {
final JoinAlgorithm preferredJoinAlgorithm = PlannerContext.getJoinAlgorithm(queryContext);
final JoinAlgorithm deducedJoinAlgorithm = deduceJoinAlgorithm(
Expand Down Expand Up @@ -458,6 +471,54 @@ private static DataSourcePlan forUnnest(
);
}

/**
 * Builds a {@link DataSourcePlan} for a {@link UnionDataSource} by planning each child
 * datasource independently and stitching the results together. Input numbers of each
 * child plan are shifted by the number of input specs accumulated so far, so that the
 * combined plan's inputs remain distinct and correctly ordered.
 */
private static DataSourcePlan forUnion(
    final QueryKit queryKit,
    final String queryId,
    final QueryContext queryContext,
    final UnionDataSource unionDataSource,
    final QuerySegmentSpec querySegmentSpec,
    @Nullable DimFilter filter,
    final int maxWorkerCount,
    final int minStageNumber,
    final boolean broadcast
)
{
  // Recursively plan each child rather than special-casing table-only unions; this keeps
  // generality since MSQ can plan any type of DataSource.
  final QueryDefinitionBuilder subqueryDefBuilder = QueryDefinition.builder();
  final List<DataSource> plannedChildren = new ArrayList<>();
  final List<InputSpec> combinedInputSpecs = new ArrayList<>();
  final IntSet combinedBroadcastInputs = new IntOpenHashSet();

  for (final DataSource childDataSource : unionDataSource.getDataSources()) {
    final DataSourcePlan childPlan = forDataSource(
        queryKit,
        queryId,
        queryContext,
        childDataSource,
        querySegmentSpec,
        filter,
        maxWorkerCount,
        Math.max(minStageNumber, subqueryDefBuilder.getNextStageNumber()),
        broadcast
    );

    // Offset the child's input numbers by the inputs already claimed by earlier children.
    final int inputOffset = combinedInputSpecs.size();

    plannedChildren.add(shiftInputNumbers(childPlan.getNewDataSource(), inputOffset));
    combinedInputSpecs.addAll(childPlan.getInputSpecs());
    childPlan.getSubQueryDefBuilder().ifPresent(subqueryDefBuilder::addAll);
    childPlan.getBroadcastInputs().forEach(inp -> combinedBroadcastInputs.add(inp + inputOffset));
  }

  return new DataSourcePlan(
      new UnionDataSource(plannedChildren),
      combinedInputSpecs,
      combinedBroadcastInputs,
      subqueryDefBuilder
  );
}

/**
* Build a plan for broadcast hash-join.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public boolean featureAvailable(EngineFeature feature, PlannerContext plannerCon
case TIME_BOUNDARY_QUERY:
case GROUPING_SETS:
case WINDOW_FUNCTIONS:
case ALLOW_TOP_LEVEL_UNION_ALL:
return false;
case UNNEST:
case CAN_SELECT:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package org.apache.druid.msq.exec;

import com.google.common.collect.ImmutableMap;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
Expand Down Expand Up @@ -330,4 +332,46 @@ public void testTooManyInputFiles() throws IOException
.setExpectedMSQFault(new TooManyInputFilesFault(numFiles, Limits.MAX_INPUT_FILES_PER_WORKER, 2))
.verifyResults();
}

@Test
public void testUnionAllWithDifferentColumnNames()
{
  // UNION ALL with differently ordered column names between the two inputs should fail
  // planning until MSQ supports arbitrary column names and types for UNION ALL.
  final String sql = "INSERT INTO druid.dst "
                     + "SELECT dim2, dim1, m1 FROM foo2 "
                     + "UNION ALL "
                     + "SELECT dim1, dim2, m1 FROM foo "
                     + "PARTITIONED BY ALL TIME";

  final DruidExceptionMatcher expectedError = new DruidExceptionMatcher(
      DruidException.Persona.ADMIN,
      DruidException.Category.INVALID_INPUT,
      "general"
  ).expectMessageContains(
      "SQL requires union between two tables and column names queried for each table are different "
      + "Left: [dim2, dim1, m1], Right: [dim1, dim2, m1]."
  );

  testIngestQuery()
      .setSql(sql)
      .setExpectedValidationErrorMatcher(expectedError)
      .verifyPlanningErrors();
}

@Test
public void testTopLevelUnionAllWithJoins()
{
  // A top-level UNION ALL (over non-trivial inputs such as joins/aggregates) cannot be
  // planned by MSQ yet. Expected to work once MSQ supports arbitrary types and column
  // names for UNION ALL.
  final String sql = "(SELECT COUNT(*) FROM foo INNER JOIN lookup.lookyloo ON foo.dim1 = lookyloo.k) "
                     + "UNION ALL "
                     + "(SELECT SUM(cnt) FROM foo)";

  final DruidExceptionMatcher expectedError = new DruidExceptionMatcher(
      DruidException.Persona.ADMIN,
      DruidException.Category.INVALID_INPUT,
      "general"
  ).expectMessageContains(
      "SQL requires union between inputs that are not simple table scans and involve a filter or aliasing"
  );

  testSelectQuery()
      .setSql(sql)
      .setExpectedValidationErrorMatcher(expectedError)
      .verifyPlanningErrors();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnionDataSource;
import org.apache.druid.query.UnnestDataSource;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
Expand Down Expand Up @@ -1929,8 +1930,8 @@ public void testGroupByOnFooWithDurableStoragePathAssertions() throws IOExceptio
new ColumnMappings(ImmutableList.of(
new ColumnMapping("d0", "cnt"),
new ColumnMapping("a0", "cnt1")
)
))
)
.tuningConfig(MSQTuningConfig.defaultConfig())
.destination(isDurableStorageDestination()
? DurableStorageMSQDestination.INSTANCE
Expand Down Expand Up @@ -2322,6 +2323,64 @@ public void testSelectUnnestOnQueryFoo()
.verifyResults();
}

@Test
public void testUnionAllUsingUnionDataSource()
{
  final RowSignature expectedSignature = RowSignature.builder()
                                                     .add("__time", ColumnType.LONG)
                                                     .add("dim1", ColumnType.STRING)
                                                     .build();

  // Each row of foo appears twice since the query unions foo with itself.
  final List<Object[]> expectedRows = ImmutableList.of(
      new Object[]{946684800000L, ""},
      new Object[]{946684800000L, ""},
      new Object[]{946771200000L, "10.1"},
      new Object[]{946771200000L, "10.1"},
      new Object[]{946857600000L, "2"},
      new Object[]{946857600000L, "2"},
      new Object[]{978307200000L, "1"},
      new Object[]{978307200000L, "1"},
      new Object[]{978393600000L, "def"},
      new Object[]{978393600000L, "def"},
      new Object[]{978480000000L, "abc"},
      new Object[]{978480000000L, "abc"}
  );

  // DruidUnionDataSourceRule#isCompatible returns true here (column names and types match,
  // and the union is over table data sources), so Calcite plans this as a single scan over
  // a UnionDataSource, which MSQ must then plan natively.
  final UnionDataSource expectedDataSource = new UnionDataSource(
      ImmutableList.of(new TableDataSource("foo"), new TableDataSource("foo"))
  );

  final MSQSpec expectedSpec =
      MSQSpec.builder()
             .query(newScanQueryBuilder()
                        .dataSource(expectedDataSource)
                        .intervals(querySegmentSpec(Filtration.eternity()))
                        .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
                        .legacy(false)
                        .context(defaultScanQueryContext(context, expectedSignature))
                        .columns(ImmutableList.of("__time", "dim1"))
                        .build())
             .columnMappings(ColumnMappings.identity(expectedSignature))
             .tuningConfig(MSQTuningConfig.defaultConfig())
             .destination(isDurableStorageDestination()
                          ? DurableStorageMSQDestination.INSTANCE
                          : TaskReportMSQDestination.INSTANCE)
             .build();

  testSelectQuery()
      .setSql("SELECT __time, dim1 FROM foo\n"
              + "UNION ALL\n"
              + "SELECT __time, dim1 FROM foo\n")
      .setExpectedRowSignature(expectedSignature)
      .setExpectedMSQSpec(expectedSpec)
      .setQueryContext(context)
      .setExpectedResultRows(expectedRows)
      .verifyResults();
}

@Nonnull
private List<Object[]> expectedMultiValueFooRowsGroup()
{
Expand Down
Loading