Unnest now works on MSQ #14886

Merged
merged 31 commits into from Sep 25, 2023
Changes from all commits (31 commits)
416a139
Adding unnest to MSQ
somu-imply Aug 15, 2023
5037157
Merge remote-tracking branch 'upstream/master' into Unnest_MSQ
somu-imply Aug 21, 2023
5916cfd
Merge remote-tracking branch 'upstream/master' into Unnest_MSQ
somu-imply Aug 23, 2023
eb024d0
Adding filtered data source + nested querying on unnest works in MSQ
somu-imply Aug 24, 2023
d3c1e76
Merge remote-tracking branch 'upstream/master' into Unnest_MSQ
somu-imply Aug 24, 2023
b4a8c62
Removing Unnest feature
somu-imply Aug 24, 2023
96bbaa4
Cleaning up code
somu-imply Aug 24, 2023
a3599b7
removing a test added by mistake
somu-imply Aug 24, 2023
fab0425
Handling failed CI for useDefault=false
somu-imply Aug 25, 2023
b17186e
tmp
somu-imply Aug 25, 2023
31f283b
Do not need the shift
somu-imply Aug 25, 2023
be06d35
Updating to accept JOIN under unnest
somu-imply Aug 29, 2023
12bd9fa
Merge remote-tracking branch 'upstream/master' into Unnest_MSQ
somu-imply Aug 29, 2023
5096be3
FilteredDataSource and unnest data source can have joins underneath an…
somu-imply Aug 29, 2023
e249736
Removing stale comment
somu-imply Aug 29, 2023
79c1cd4
Merge remote-tracking branch 'upstream/master' into Unnest_MSQ
somu-imply Aug 29, 2023
1dd7933
Merge remote-tracking branch 'upstream/master' into Unnest_MSQ
somu-imply Sep 5, 2023
53db2ff
Adding insert and replace tests and fixing broadcast inputs for filte…
somu-imply Sep 5, 2023
948668c
Refactor 1
somu-imply Sep 5, 2023
8d33ed9
Merge remote-tracking branch 'upstream/master' into Unnest_MSQ
somu-imply Sep 7, 2023
a4659d3
Adding the unnest engine feature back
somu-imply Sep 7, 2023
7a30bfc
Making the feature always true
somu-imply Sep 7, 2023
d942772
Moving the engine feature from planner context to individual engines.…
somu-imply Sep 7, 2023
d0a423b
Revert "Moving the engine feature from planner context to individual …
somu-imply Sep 7, 2023
7119ed0
Merge remote-tracking branch 'upstream/master' into Unnest_MSQ
somu-imply Sep 15, 2023
4588e30
Removing an unnecessary check in dataSourcePlan + pushing feature ins…
somu-imply Sep 15, 2023
3b2bdfb
Updating engine in tests
somu-imply Sep 15, 2023
32dbeba
Adding new datasource to msq tests
somu-imply Sep 15, 2023
ef2424f
Merge remote-tracking branch 'upstream/master' into Unnest_MSQ
somu-imply Sep 18, 2023
77905d4
Updating code and test cases
somu-imply Sep 21, 2023
d4a6c7c
Removed the subqueryDefBuilder to use the one from basePlan for Unnes…
somu-imply Sep 21, 2023
@@ -36,8 +36,10 @@
import org.apache.druid.msq.input.ReadableInput;
import org.apache.druid.msq.input.table.SegmentWithDescriptor;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.FilteredDataSource;
import org.apache.druid.query.JoinDataSource;
import org.apache.druid.query.Query;
import org.apache.druid.query.UnnestDataSource;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentReference;
@@ -96,7 +98,17 @@ private static Pair<List<ReadableFrameChannel>, BroadcastJoinHelper> makeInputCh
final long memoryReservedForBroadcastJoin
)
{
if (!(dataSource instanceof JoinDataSource) && !sideChannels.isEmpty()) {
// An UnnestDataSource or FilteredDataSource can have a join as its base.
// In that case, side channels are expected to be present.
final DataSource baseDataSource;
if (dataSource instanceof UnnestDataSource) {
baseDataSource = ((UnnestDataSource) dataSource).getBase();
} else if (dataSource instanceof FilteredDataSource) {
baseDataSource = ((FilteredDataSource) dataSource).getBase();
} else {
baseDataSource = dataSource;
}
if (!(baseDataSource instanceof JoinDataSource) && !sideChannels.isEmpty()) {
throw new ISE("Did not expect side channels for dataSource [%s]", dataSource);
}

@@ -106,8 +118,8 @@ private static Pair<List<ReadableFrameChannel>, BroadcastJoinHelper> makeInputCh
if (baseInput.hasChannel()) {
inputChannels.add(baseInput.getChannel());
}

if (dataSource instanceof JoinDataSource) {
if (baseDataSource instanceof JoinDataSource) {
final Int2IntMap inputNumberToProcessorChannelMap = new Int2IntOpenHashMap();
final List<FrameReader> channelReaders = new ArrayList<>();

@@ -196,7 +208,7 @@ private boolean initializeSegmentMapFn(final IntSet readableInputs)
if (segmentMapFn != null) {
return true;
} else if (broadcastJoinHelper == null) {
segmentMapFn = Function.identity();
segmentMapFn = query.getDataSource().createSegmentMapFunction(query, cpuAccumulator);
return true;
} else {
final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
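
The segment-map-function change above is the behavioral heart of this file: instead of defaulting to an identity mapping when no broadcast join helper is present, the processor now asks the data source itself for its segment mapping, giving unnest and filtered wrappers the chance to decorate each segment. A minimal sketch of that idea, using the createSegmentMapFunction call from the diff (the wrapper class here is hypothetical):

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

import org.apache.druid.query.Query;
import org.apache.druid.segment.SegmentReference;

// Hypothetical illustration of the initializeSegmentMapFn change: the mapping
// now comes from the data source rather than Function.identity().
final class SegmentMapFnSketch
{
  static Function<SegmentReference, SegmentReference> segmentMapFn(
      final Query<?> query,
      final AtomicLong cpuAccumulator
  )
  {
    // For a plain table this behaves like an identity mapping; for an
    // UnnestDataSource or FilteredDataSource it wraps each segment so the
    // unnest/filter is applied while reading.
    return query.getDataSource().createSegmentMapFunction(query, cpuAccumulator);
  }
}
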
@@ -44,12 +44,14 @@
import org.apache.druid.msq.kernel.StageDefinitionBuilder;
import org.apache.druid.msq.querykit.common.SortMergeJoinFrameProcessorFactory;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.FilteredDataSource;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.JoinDataSource;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnnestDataSource;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.planning.PreJoinableClause;
@@ -135,8 +137,29 @@ public static DataSourcePlan forDataSource(
checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec);
return forInline((InlineDataSource) dataSource, broadcast);
} else if (dataSource instanceof LookupDataSource) {
checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec);
return forLookup((LookupDataSource) dataSource, broadcast);
} else if (dataSource instanceof FilteredDataSource) {
return forFilteredDataSource(
queryKit,
queryId,
queryContext,
(FilteredDataSource) dataSource,
querySegmentSpec,
maxWorkerCount,
minStageNumber,
broadcast
);
} else if (dataSource instanceof UnnestDataSource) {
return forUnnest(
queryKit,
queryId,
queryContext,
(UnnestDataSource) dataSource,
querySegmentSpec,
maxWorkerCount,
minStageNumber,
broadcast
);
} else if (dataSource instanceof QueryDataSource) {
checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec);
return forQuery(
@@ -347,6 +370,88 @@ private static DataSourcePlan forQuery(
);
}

private static DataSourcePlan forFilteredDataSource(
final QueryKit queryKit,
final String queryId,
final QueryContext queryContext,
final FilteredDataSource dataSource,
final QuerySegmentSpec querySegmentSpec,
final int maxWorkerCount,
final int minStageNumber,
final boolean broadcast
)
{
final DataSourcePlan basePlan = forDataSource(
queryKit,
queryId,
queryContext,
dataSource.getBase(),
querySegmentSpec,
null,
maxWorkerCount,
minStageNumber,
broadcast
);

DataSource newDataSource = basePlan.getNewDataSource();

final List<InputSpec> inputSpecs = new ArrayList<>(basePlan.getInputSpecs());
newDataSource = FilteredDataSource.create(newDataSource, dataSource.getFilter());
return new DataSourcePlan(
newDataSource,
inputSpecs,
basePlan.getBroadcastInputs(),
basePlan.getSubQueryDefBuilder().orElse(null)
);

}

/**
* Build a plan for an unnest data source.
*/
private static DataSourcePlan forUnnest(
final QueryKit queryKit,
final String queryId,
final QueryContext queryContext,
final UnnestDataSource dataSource,
final QuerySegmentSpec querySegmentSpec,
final int maxWorkerCount,
final int minStageNumber,
final boolean broadcast
)
{
// Find the plan for the base data source by recursing.
final DataSourcePlan basePlan = forDataSource(
queryKit,
queryId,
queryContext,
dataSource.getBase(),
querySegmentSpec,
null,
maxWorkerCount,
minStageNumber,
broadcast
);
DataSource newDataSource = basePlan.getNewDataSource();

final List<InputSpec> inputSpecs = new ArrayList<>(basePlan.getInputSpecs());

// Create the new data source using the data source from the base plan
newDataSource = UnnestDataSource.create(
newDataSource,
dataSource.getVirtualColumn(),
dataSource.getUnnestFilter()
);
// The base data source can be a join and might already have broadcast inputs,
// so carry over the broadcast inputs from the basePlan.
return new DataSourcePlan(
newDataSource,
inputSpecs,
basePlan.getBroadcastInputs(),
basePlan.getSubQueryDefBuilder().orElse(null)
);
}

/**
* Build a plan for broadcast hash-join.
*/
@@ -373,7 +478,6 @@ private static DataSourcePlan forBroadcastHashJoin(
null, // Don't push query filters down through a join: this needs some work to ensure pruning works properly.
maxWorkerCount,
Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()),

broadcast
);

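
The two new planners above share one shape: recursively plan the base data source, then re-create the wrapper around the planned base while carrying the base plan's input specs, broadcast inputs, and subquery definition builder forward. A condensed sketch of that shared step (the rewrap helper is hypothetical; the DataSourcePlan accessors are the ones used in the diff):

import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

import org.apache.druid.msq.input.InputSpec;
import org.apache.druid.query.DataSource;

// Hypothetical helper capturing the common "plan the base, then rewrap it"
// step shared by forFilteredDataSource and forUnnest.
final class DataSourcePlanSketch
{
  static DataSourcePlan rewrap(final DataSourcePlan basePlan, final UnaryOperator<DataSource> wrapper)
  {
    final List<InputSpec> inputSpecs = new ArrayList<>(basePlan.getInputSpecs());
    return new DataSourcePlan(
        // e.g. ds -> FilteredDataSource.create(ds, filter)
        // or   ds -> UnnestDataSource.create(ds, virtualColumn, unnestFilter)
        wrapper.apply(basePlan.getNewDataSource()),
        inputSpecs,
        basePlan.getBroadcastInputs(),             // the base may be a join with broadcast inputs
        basePlan.getSubQueryDefBuilder().orElse(null)
    );
  }
}
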
@@ -114,8 +114,8 @@ public boolean featureAvailable(EngineFeature feature, PlannerContext plannerCon
case TIME_BOUNDARY_QUERY:
case GROUPING_SETS:
case WINDOW_FUNCTIONS:
case UNNEST:
return false;
case UNNEST:
case CAN_SELECT:
case CAN_INSERT:
case CAN_REPLACE:
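
The one-line move above is what actually enables the feature: UNNEST leaves the group of cases that return false and falls through with the cases below it. A condensed, self-contained view of the effect (the enum subset and the true branch are assumptions based on the visible cases; the real method lives in the MSQ task engine):

// Hypothetical condensed view of featureAvailable after this change.
final class EngineFeatureSketch
{
  enum EngineFeature { TIME_BOUNDARY_QUERY, GROUPING_SETS, WINDOW_FUNCTIONS, UNNEST, CAN_SELECT, CAN_INSERT, CAN_REPLACE }

  static boolean featureAvailable(final EngineFeature feature)
  {
    switch (feature) {
      case TIME_BOUNDARY_QUERY:
      case GROUPING_SETS:
      case WINDOW_FUNCTIONS:
        return false;  // still unavailable on this engine
      case UNNEST:     // moved by this PR: now grouped with the supported features
      case CAN_SELECT:
      case CAN_INSERT:
      case CAN_REPLACE:
      default:
        return true;   // assumption: the branch elided by the diff returns true
    }
  }
}
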
@@ -185,6 +185,95 @@ public void testInsertWithExistingTimeColumn() throws IOException

}

@Test
public void testInsertWithUnnestInline()
{
List<Object[]> expectedRows = ImmutableList.of(
new Object[]{1692226800000L, 1L},
new Object[]{1692226800000L, 2L},
new Object[]{1692226800000L, 3L}
);

RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("d", ColumnType.LONG)
.build();


testIngestQuery().setSql(
"insert into foo1 select TIME_PARSE('2023-08-16T23:00') as __time, d from UNNEST(ARRAY[1,2,3]) as unnested(d) PARTITIONED BY ALL")
.setQueryContext(context)
.setExpectedResultRows(expectedRows)
.setExpectedDataSource("foo1")
.setExpectedRowSignature(rowSignature)
.verifyResults();

}
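
The expected timestamp in this test comes straight from the TIME_PARSE call in the SQL: all three unnested array elements keep the same __time. A quick sanity check of the epoch value with plain java.time (assuming TIME_PARSE defaults to UTC here):

import java.time.Instant;

final class TimestampCheck
{
  public static void main(String[] args)
  {
    // '2023-08-16T23:00' interpreted in UTC.
    final long millis = Instant.parse("2023-08-16T23:00:00Z").toEpochMilli();
    System.out.println(millis); // 1692226800000, matching expectedRows
  }
}
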

@Test
public void testInsertWithUnnest()
{
List<Object[]> expectedRows = ImmutableList.of(
new Object[]{946684800000L, "a"},
new Object[]{946684800000L, "b"},
new Object[]{946771200000L, "b"},
new Object[]{946771200000L, "c"},
new Object[]{946857600000L, "d"},
new Object[]{978307200000L, NullHandling.sqlCompatible() ? "" : null},
new Object[]{978393600000L, null},
new Object[]{978480000000L, null}
);

RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("d", ColumnType.STRING)
.build();


testIngestQuery().setSql(
"insert into foo1 select __time, d from foo,UNNEST(MV_TO_ARRAY(dim3)) as unnested(d) PARTITIONED BY ALL")
.setQueryContext(context)
.setExpectedResultRows(expectedRows)
.setExpectedDataSource("foo1")
.setExpectedRowSignature(rowSignature)
.verifyResults();

}

@Test
public void testInsertWithUnnestWithVirtualColumns()
{
List<Object[]> expectedRows = ImmutableList.of(
new Object[]{946684800000L, 1.0f},
new Object[]{946684800000L, 1.0f},
new Object[]{946771200000L, 2.0f},
new Object[]{946771200000L, 2.0f},
new Object[]{946857600000L, 3.0f},
new Object[]{946857600000L, 3.0f},
new Object[]{978307200000L, 4.0f},
new Object[]{978307200000L, 4.0f},
new Object[]{978393600000L, 5.0f},
new Object[]{978393600000L, 5.0f},
new Object[]{978480000000L, 6.0f},
new Object[]{978480000000L, 6.0f}
);

RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("d", ColumnType.FLOAT)
.build();


testIngestQuery().setSql(
"insert into foo1 select __time, d from foo,UNNEST(ARRAY[m1,m2]) as unnested(d) PARTITIONED BY ALL")
.setQueryContext(context)
.setExpectedResultRows(expectedRows)
.setExpectedDataSource("foo1")
.setExpectedRowSignature(rowSignature)
.verifyResults();

}
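
Each __time appears twice in the expected rows because UNNEST(ARRAY[m1, m2]) emits one output row per array element, and in this fixture m1 and m2 evidently carry the same numeric value per row. A tiny illustration of that fan-out (standalone, with one hard-coded input row):

final class UnnestFanOutSketch
{
  public static void main(String[] args)
  {
    // One input row with m1 = m2 = 1.0f fans out into two identical output rows.
    final long time = 946684800000L;
    final float[] unnested = {1.0f, 1.0f}; // ARRAY[m1, m2]
    for (float d : unnested) {
      System.out.println(time + ", " + d); // matches the duplicated expectedRows
    }
  }
}
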

@Test
public void testInsertOnExternalDataSource() throws IOException
{