Unnest now works on MSQ (apache#14886)

This entails:
    Removing the enableUnnest flag and additional machinery
    Updating the datasource plan and frame processors to support unnest
    Adding support in MSQ for UnnestDataSource and FilteredDataSource
    CalciteArrayTest now has an MSQ test component
    Additional tests for Unnest on MSQ

somu-imply authored Sep 25, 2023
1 parent c62193c commit c184b52
Showing 17 changed files with 732 additions and 27 deletions.
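
In practice this means an MSQ ingest query can now unnest arrays and multi-value columns directly; for example (taken, lightly reformatted, from the tests added below):

    insert into foo1
    select __time, d
    from foo, UNNEST(MV_TO_ARRAY(dim3)) as unnested(d)
    PARTITIONED BY ALL
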
@@ -36,8 +36,10 @@
 import org.apache.druid.msq.input.ReadableInput;
 import org.apache.druid.msq.input.table.SegmentWithDescriptor;
 import org.apache.druid.query.DataSource;
+import org.apache.druid.query.FilteredDataSource;
 import org.apache.druid.query.JoinDataSource;
 import org.apache.druid.query.Query;
+import org.apache.druid.query.UnnestDataSource;
 import org.apache.druid.segment.ReferenceCountingSegment;
 import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.SegmentReference;
@@ -96,7 +98,17 @@ private static Pair<List<ReadableFrameChannel>, BroadcastJoinHelper> makeInputCh
       final long memoryReservedForBroadcastJoin
   )
   {
-    if (!(dataSource instanceof JoinDataSource) && !sideChannels.isEmpty()) {
+    // An UnnestDataSource or FilteredDataSource can have a join as its base.
+    // In such a case, a side channel is expected to be present.
+    final DataSource baseDataSource;
+    if (dataSource instanceof UnnestDataSource) {
+      baseDataSource = ((UnnestDataSource) dataSource).getBase();
+    } else if (dataSource instanceof FilteredDataSource) {
+      baseDataSource = ((FilteredDataSource) dataSource).getBase();
+    } else {
+      baseDataSource = dataSource;
+    }
+    if (!(baseDataSource instanceof JoinDataSource) && !sideChannels.isEmpty()) {
       throw new ISE("Did not expect side channels for dataSource [%s]", dataSource);
     }
 
@@ -106,8 +118,8 @@ private static Pair<List<ReadableFrameChannel>, BroadcastJoinHelper> makeInputCh
     if (baseInput.hasChannel()) {
       inputChannels.add(baseInput.getChannel());
     }
 
-    if (dataSource instanceof JoinDataSource) {
+    if (baseDataSource instanceof JoinDataSource) {
       final Int2IntMap inputNumberToProcessorChannelMap = new Int2IntOpenHashMap();
       final List<FrameReader> channelReaders = new ArrayList<>();
 
@@ -196,7 +208,7 @@ private boolean initializeSegmentMapFn(final IntSet readableInputs)
     if (segmentMapFn != null) {
       return true;
     } else if (broadcastJoinHelper == null) {
-      segmentMapFn = Function.identity();
+      segmentMapFn = query.getDataSource().createSegmentMapFunction(query, cpuAccumulator);
       return true;
     } else {
       final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
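
Two changes interact here. First, the side-channel check now looks through unnest and filter wrappers, because an UnnestDataSource or FilteredDataSource can sit on top of a join whose broadcast inputs arrive as side channels. A minimal sketch of the tree shape that motivates the unwrap, assuming joinDataSource and virtualColumn are built elsewhere (hypothetical names):

    // Unnest over a broadcast join: the join base is what carries the side
    // channels, so the check must unwrap the UnnestDataSource to see it.
    DataSource tree = UnnestDataSource.create(
        joinDataSource,   // assumed JoinDataSource instance
        virtualColumn,    // assumed virtual column producing the array to unnest
        null              // no filter on the unnested values
    );

Second, when no broadcast-join helper is present, the segment-mapping function can no longer default to Function.identity(): an unnest or filter wrapper must still be applied to each segment, so the datasource itself now supplies the mapping via createSegmentMapFunction.
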
@@ -44,12 +44,14 @@
 import org.apache.druid.msq.kernel.StageDefinitionBuilder;
 import org.apache.druid.msq.querykit.common.SortMergeJoinFrameProcessorFactory;
 import org.apache.druid.query.DataSource;
+import org.apache.druid.query.FilteredDataSource;
 import org.apache.druid.query.InlineDataSource;
 import org.apache.druid.query.JoinDataSource;
 import org.apache.druid.query.LookupDataSource;
 import org.apache.druid.query.QueryContext;
 import org.apache.druid.query.QueryDataSource;
 import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.UnnestDataSource;
 import org.apache.druid.query.filter.DimFilter;
 import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.query.planning.PreJoinableClause;
@@ -135,8 +137,29 @@ public static DataSourcePlan forDataSource(
       checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec);
       return forInline((InlineDataSource) dataSource, broadcast);
     } else if (dataSource instanceof LookupDataSource) {
       checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec);
       return forLookup((LookupDataSource) dataSource, broadcast);
+    } else if (dataSource instanceof FilteredDataSource) {
+      return forFilteredDataSource(
+          queryKit,
+          queryId,
+          queryContext,
+          (FilteredDataSource) dataSource,
+          querySegmentSpec,
+          maxWorkerCount,
+          minStageNumber,
+          broadcast
+      );
+    } else if (dataSource instanceof UnnestDataSource) {
+      return forUnnest(
+          queryKit,
+          queryId,
+          queryContext,
+          (UnnestDataSource) dataSource,
+          querySegmentSpec,
+          maxWorkerCount,
+          minStageNumber,
+          broadcast
+      );
     } else if (dataSource instanceof QueryDataSource) {
       checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec);
       return forQuery(
@@ -353,6 +376,88 @@ private static DataSourcePlan forQuery(
     );
   }
 
+  private static DataSourcePlan forFilteredDataSource(
+      final QueryKit queryKit,
+      final String queryId,
+      final QueryContext queryContext,
+      final FilteredDataSource dataSource,
+      final QuerySegmentSpec querySegmentSpec,
+      final int maxWorkerCount,
+      final int minStageNumber,
+      final boolean broadcast
+  )
+  {
+    final DataSourcePlan basePlan = forDataSource(
+        queryKit,
+        queryId,
+        queryContext,
+        dataSource.getBase(),
+        querySegmentSpec,
+        null,
+        maxWorkerCount,
+        minStageNumber,
+        broadcast
+    );
+
+    DataSource newDataSource = basePlan.getNewDataSource();
+
+    final List<InputSpec> inputSpecs = new ArrayList<>(basePlan.getInputSpecs());
+    newDataSource = FilteredDataSource.create(newDataSource, dataSource.getFilter());
+    return new DataSourcePlan(
+        newDataSource,
+        inputSpecs,
+        basePlan.getBroadcastInputs(),
+        basePlan.getSubQueryDefBuilder().orElse(null)
+    );
+
+  }
+
+  /**
+   * Build a plan for an unnest data source.
+   */
+  private static DataSourcePlan forUnnest(
+      final QueryKit queryKit,
+      final String queryId,
+      final QueryContext queryContext,
+      final UnnestDataSource dataSource,
+      final QuerySegmentSpec querySegmentSpec,
+      final int maxWorkerCount,
+      final int minStageNumber,
+      final boolean broadcast
+  )
+  {
+    // Find the plan for the base data source by recursing.
+    final DataSourcePlan basePlan = forDataSource(
+        queryKit,
+        queryId,
+        queryContext,
+        dataSource.getBase(),
+        querySegmentSpec,
+        null,
+        maxWorkerCount,
+        minStageNumber,
+        broadcast
+    );
+    DataSource newDataSource = basePlan.getNewDataSource();
+
+    final List<InputSpec> inputSpecs = new ArrayList<>(basePlan.getInputSpecs());
+
+    // Create the new data source using the data source from the base plan.
+    newDataSource = UnnestDataSource.create(
+        newDataSource,
+        dataSource.getVirtualColumn(),
+        dataSource.getUnnestFilter()
+    );
+    // The base data source can be a join and might already have broadcast inputs,
+    // so carry the broadcast inputs over from the basePlan.
+    return new DataSourcePlan(
+        newDataSource,
+        inputSpecs,
+        basePlan.getBroadcastInputs(),
+        basePlan.getSubQueryDefBuilder().orElse(null)
+    );
+  }
+
   /**
    * Build a plan for broadcast hash-join.
    */
@@ -379,7 +484,6 @@ private static DataSourcePlan forBroadcastHashJoin(
         null, // Don't push query filters down through a join: this needs some work to ensure pruning works properly.
         maxWorkerCount,
         Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()),
-
         broadcast
     );

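Both new helpers follow the same pattern: recurse into the base, reuse the base plan's input specs and broadcast inputs, and only rewrap the datasource, so the unnest or filter level contributes no new inputs of its own. For the MV_TO_ARRAY test query added later in this commit, forUnnest would receive a tree of roughly the following shape. This is a sketch: the virtual-column name and expression are illustrative, not taken from the planner.

    import org.apache.druid.math.expr.ExprMacroTable;
    import org.apache.druid.query.DataSource;
    import org.apache.druid.query.TableDataSource;
    import org.apache.druid.query.UnnestDataSource;
    import org.apache.druid.segment.column.ColumnType;
    import org.apache.druid.segment.virtual.ExpressionVirtualColumn;

    // Hypothetical input to forUnnest for:
    //   select __time, d from foo, UNNEST(MV_TO_ARRAY(dim3)) as unnested(d)
    DataSource unnestOverTable = UnnestDataSource.create(
        new TableDataSource("foo"),
        new ExpressionVirtualColumn("j0.unnest", "mv_to_array(\"dim3\")", ColumnType.STRING_ARRAY, ExprMacroTable.nil()),
        null
    );
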
@@ -114,8 +114,8 @@ public boolean featureAvailable(EngineFeature feature, PlannerContext plannerCon
       case TIME_BOUNDARY_QUERY:
       case GROUPING_SETS:
       case WINDOW_FUNCTIONS:
-      case UNNEST:
         return false;
+      case UNNEST:
       case CAN_SELECT:
       case CAN_INSERT:
       case CAN_REPLACE:
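
The EngineFeature change is a one-label move: UNNEST previously shared the return false of the unsupported features, and placing its case label after that statement lets it fall through to the supported group instead. A self-contained toy of the control flow; the real method's tail is elided in the diff, so the return true is an assumption, and the local enum stands in for org.apache.druid.sql.calcite.run.EngineFeature:

    enum Feature { WINDOW_FUNCTIONS, UNNEST, CAN_SELECT, CAN_INSERT, CAN_REPLACE }

    static boolean featureAvailable(Feature feature)
    {
      switch (feature) {
        case WINDOW_FUNCTIONS:
          return false;   // still unsupported on the MSQ task engine
        case UNNEST:      // now grouped with the supported features
        case CAN_SELECT:
        case CAN_INSERT:
        case CAN_REPLACE:
          return true;    // assumed tail of the real method
        default:
          return false;
      }
    }
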
@@ -185,6 +185,95 @@ public void testInsertWithExistingTimeColumn() throws IOException
 
   }
 
+  @Test
+  public void testInsertWithUnnestInline()
+  {
+    List<Object[]> expectedRows = ImmutableList.of(
+        new Object[]{1692226800000L, 1L},
+        new Object[]{1692226800000L, 2L},
+        new Object[]{1692226800000L, 3L}
+    );
+
+    RowSignature rowSignature = RowSignature.builder()
+        .add("__time", ColumnType.LONG)
+        .add("d", ColumnType.LONG)
+        .build();
+
+
+    testIngestQuery().setSql(
+        "insert into foo1 select TIME_PARSE('2023-08-16T23:00') as __time, d from UNNEST(ARRAY[1,2,3]) as unnested(d) PARTITIONED BY ALL")
+        .setQueryContext(context)
+        .setExpectedResultRows(expectedRows)
+        .setExpectedDataSource("foo1")
+        .setExpectedRowSignature(rowSignature)
+        .verifyResults();
+
+  }
+
+  @Test
+  public void testInsertWithUnnest()
+  {
+    List<Object[]> expectedRows = ImmutableList.of(
+        new Object[]{946684800000L, "a"},
+        new Object[]{946684800000L, "b"},
+        new Object[]{946771200000L, "b"},
+        new Object[]{946771200000L, "c"},
+        new Object[]{946857600000L, "d"},
+        new Object[]{978307200000L, NullHandling.sqlCompatible() ? "" : null},
+        new Object[]{978393600000L, null},
+        new Object[]{978480000000L, null}
+    );
+
+    RowSignature rowSignature = RowSignature.builder()
+        .add("__time", ColumnType.LONG)
+        .add("d", ColumnType.STRING)
+        .build();
+
+
+    testIngestQuery().setSql(
+        "insert into foo1 select __time, d from foo,UNNEST(MV_TO_ARRAY(dim3)) as unnested(d) PARTITIONED BY ALL")
+        .setQueryContext(context)
+        .setExpectedResultRows(expectedRows)
+        .setExpectedDataSource("foo1")
+        .setExpectedRowSignature(rowSignature)
+        .verifyResults();
+
+  }
+
+  @Test
+  public void testInsertWithUnnestWithVirtualColumns()
+  {
+    List<Object[]> expectedRows = ImmutableList.of(
+        new Object[]{946684800000L, 1.0f},
+        new Object[]{946684800000L, 1.0f},
+        new Object[]{946771200000L, 2.0f},
+        new Object[]{946771200000L, 2.0f},
+        new Object[]{946857600000L, 3.0f},
+        new Object[]{946857600000L, 3.0f},
+        new Object[]{978307200000L, 4.0f},
+        new Object[]{978307200000L, 4.0f},
+        new Object[]{978393600000L, 5.0f},
+        new Object[]{978393600000L, 5.0f},
+        new Object[]{978480000000L, 6.0f},
+        new Object[]{978480000000L, 6.0f}
+    );
+
+    RowSignature rowSignature = RowSignature.builder()
+        .add("__time", ColumnType.LONG)
+        .add("d", ColumnType.FLOAT)
+        .build();
+
+
+    testIngestQuery().setSql(
+        "insert into foo1 select __time, d from foo,UNNEST(ARRAY[m1,m2]) as unnested(d) PARTITIONED BY ALL")
+        .setQueryContext(context)
+        .setExpectedResultRows(expectedRows)
+        .setExpectedDataSource("foo1")
+        .setExpectedRowSignature(rowSignature)
+        .verifyResults();
+
+  }
+
   @Test
   public void testInsertOnExternalDataSource() throws IOException
   {
Expand Down
