Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Window function on msq #15470

Merged
merged 54 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
4ef900d
Initial code
somu-imply Nov 14, 2023
5e8dab1
Merge remote-tracking branch 'upstream/master' into windowFunctionOnMSQ
somu-imply Nov 17, 2023
9ea01fc
Hacky way of atleast getting things to work
somu-imply Nov 30, 2023
9c4ac74
Temp unfinished changes
somu-imply Dec 1, 2023
54f9ac3
Converting rac back to frames
somu-imply Dec 7, 2023
d6cef47
Working UTs
somu-imply Dec 7, 2023
40a18f1
Fixing running window function in console
somu-imply Dec 8, 2023
3ecc96a
Merge remote-tracking branch 'upstream/master' into windowFunctionOnMSQ
somu-imply Dec 31, 2023
7e34aa8
Adding shuffle spec and separating out stages for each window
somu-imply Jan 3, 2024
f5a1f59
serde stuff by adding ops to proc factory
somu-imply Jan 8, 2024
ab6e317
Updating for first set of reviews
somu-imply Jan 9, 2024
f1efec3
Changes for partition boundary detection
somu-imply Jan 16, 2024
1dae450
cleaning up some code, adding some tests
somu-imply Jan 19, 2024
ccfe473
Fixing up shuffle in group by if window afterwards
somu-imply Jan 19, 2024
500f54f
Merge remote-tracking branch 'upstream/master' into windowFunctionOnMSQ
somu-imply Jan 20, 2024
98f4ba5
fix after merge
somu-imply Jan 20, 2024
ee61333
More updates and ignoring the insert cases FOR NOW..
somu-imply Jan 20, 2024
8f8bfdc
A possible fix for the insert case
somu-imply Jan 20, 2024
ec1f164
Support for leaf operators in window functions in MSQ
somu-imply Jan 23, 2024
465598a
in case of MSQ engine planning the query with leafOperator as a windo…
somu-imply Jan 24, 2024
d490d78
Removing inspection profile
somu-imply Jan 24, 2024
836b693
Revert "Removing inspection profile"
somu-imply Jan 24, 2024
aee40ba
Merge remote-tracking branch 'upstream/master' into windowFunctionOnMSQ
somu-imply Jan 24, 2024
3f3d1b0
Updating inspection profile
somu-imply Jan 24, 2024
aa1b753
Updating scan query kit to handle shuffle for next window
somu-imply Jan 24, 2024
634d5bc
Merge remote-tracking branch 'upstream/master' into windowFunctionOnMSQ
somu-imply Jan 27, 2024
bd5f27b
Some comments addressed
somu-imply Jan 29, 2024
5338b76
Merge remote-tracking branch 'upstream/master' into windowFunctionOnMSQ
somu-imply Jan 29, 2024
b688755
Throwing exceptions in 2 cases
somu-imply Jan 29, 2024
ceff35d
Moving examples to a new file and adding new examples with join
somu-imply Feb 1, 2024
fc2e2b6
Merge remote-tracking branch 'upstream/master' into windowFunctionOnMSQ
somu-imply Feb 1, 2024
751e947
More tests now window functions over unnest
somu-imply Feb 1, 2024
d7840a3
Addressing review commets part 1
somu-imply Feb 2, 2024
681ee1b
Merge remote-tracking branch 'upstream/master' into windowFunctionOnMSQ
somu-imply Feb 5, 2024
b487d2a
Changes to frame processor to move from base leaf to frame processor …
somu-imply Feb 5, 2024
c3baa1d
Some more refactoring and addressing comments
somu-imply Feb 5, 2024
9768da2
Updating more tests
somu-imply Feb 6, 2024
584fa8f
Making build pass
somu-imply Feb 6, 2024
0813cc8
Merge remote-tracking branch 'upstream/master' into windowFunctionOnMSQ
somu-imply Mar 14, 2024
d3755e1
Addressing latest set of comments part 1
somu-imply Mar 14, 2024
c0c74a0
Addressing latest review comments part 2
somu-imply Mar 14, 2024
60c6290
Minor refactoring and new tests after review
somu-imply Mar 20, 2024
91edf9e
Adding guardrails for materialization and avoiding partition bossting…
somu-imply Mar 20, 2024
399a78c
Adding more javadocs, guardrails and tests
somu-imply Mar 21, 2024
806c801
Merge remote-tracking branch 'upstream/master' into windowFunctionOnMSQ
somu-imply Mar 21, 2024
04424d7
Changes to one test case
somu-imply Mar 21, 2024
f0946b1
More fixes around guardrails and addressing last set of review comments
somu-imply Mar 27, 2024
ea7882c
Merge remote-tracking branch 'upstream/master' into windowFunctionOnMSQ
somu-imply Mar 27, 2024
83c96b9
Fixing a testcase after the merge
somu-imply Mar 27, 2024
cfca6a5
Fixing a test by using correct in filters for sql compat mode
somu-imply Mar 27, 2024
c3e2c29
Not documenting context flag and 1 more test change
somu-imply Mar 27, 2024
1464dae
Merge remote-tracking branch 'upstream/master' into windowFunctionOnMSQ
somu-imply Mar 27, 2024
520ab4e
New test for inner limit on group by
somu-imply Mar 28, 2024
16b75ce
Adding to known issues
somu-imply Mar 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@
import org.apache.druid.msq.querykit.QueryKitUtils;
import org.apache.druid.msq.querykit.ShuffleSpecFactories;
import org.apache.druid.msq.querykit.ShuffleSpecFactory;
import org.apache.druid.msq.querykit.WindowOperatorQueryKit;
import org.apache.druid.msq.querykit.groupby.GroupByQueryKit;
import org.apache.druid.msq.querykit.results.QueryResultFrameProcessorFactory;
import org.apache.druid.msq.querykit.scan.ScanQueryKit;
Expand All @@ -185,6 +186,7 @@
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.operator.WindowOperatorQuery;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.column.ColumnHolder;
Expand Down Expand Up @@ -1173,6 +1175,7 @@ private QueryKit makeQueryControllerToolKit()
ImmutableMap.<Class<? extends Query>, QueryKit>builder()
.put(ScanQuery.class, new ScanQueryKit(context.jsonMapper()))
.put(GroupByQuery.class, new GroupByQueryKit(context.jsonMapper()))
.put(WindowOperatorQuery.class, new WindowOperatorQueryKit(context.jsonMapper()))
.build();

return new MultiQueryKit(kitMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import org.apache.druid.msq.input.table.TableInputSpec;
import org.apache.druid.msq.kernel.NilExtraInfoHolder;
import org.apache.druid.msq.querykit.InputNumberDataSource;
import org.apache.druid.msq.querykit.WindowOperatorQueryFrameProcessorFactory;
import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory;
import org.apache.druid.msq.querykit.common.SortMergeJoinFrameProcessorFactory;
import org.apache.druid.msq.querykit.groupby.GroupByPostShuffleFrameProcessorFactory;
Expand Down Expand Up @@ -158,7 +159,7 @@ public List<? extends Module> getJacksonModules()
NilExtraInfoHolder.class,
SortMergeJoinFrameProcessorFactory.class,
QueryResultFrameProcessorFactory.class,

WindowOperatorQueryFrameProcessorFactory.class,
// DataSource classes (note: ExternalDataSource is in MSQSqlModule)
InputNumberDataSource.class,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ protected BaseLeafFrameProcessor(
@Override
public List<ReadableFrameChannel> inputChannels()
{
// somu: need to clarify if the data is in broker only
if (baseInput.hasSegment()) {
return Collections.emptyList();
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.msq.querykit;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Iterables;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.channel.FrameWithPartition;
import org.apache.druid.frame.channel.ReadableFrameChannel;
import org.apache.druid.frame.channel.WritableFrameChannel;
import org.apache.druid.frame.processor.ReturnOrAwait;
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.util.SettableLongVirtualColumn;
import org.apache.druid.frame.write.FrameWriter;
import org.apache.druid.frame.write.FrameWriterFactory;
import org.apache.druid.java.util.common.Unit;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.input.ReadableInput;
import org.apache.druid.msq.input.table.SegmentWithDescriptor;
import org.apache.druid.query.operator.OffsetLimit;
import org.apache.druid.query.operator.Operator;
import org.apache.druid.query.operator.OperatorFactory;
import org.apache.druid.query.operator.WindowOperatorQuery;
import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowAndColumns;
import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.SimpleAscendingOffset;
import org.apache.druid.segment.SimpleSettableOffset;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.RowSignature;

import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class WindowOperatorQueryFrameProcessor extends BaseLeafFrameProcessor
somu-imply marked this conversation as resolved.
Show resolved Hide resolved
{

private static final Logger log = new Logger(WindowOperatorQueryFrameProcessor.class);
private final WindowOperatorQuery query;

private final List<OperatorFactory> operatorFactoryList;
private final ObjectMapper jsonMapper;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the object mapper be declared static ?

private final SettableLongVirtualColumn partitionBoostVirtualColumn;
private final VirtualColumns frameWriterVirtualColumns;
private final Closer closer = Closer.create();
private final RowSignature outputStageSignature;
private final SimpleSettableOffset cursorOffset = new SimpleAscendingOffset(Integer.MAX_VALUE);
private Cursor cursor;
private Segment segment;
private FrameWriter frameWriter;
private long currentAllocatorCapacity; // Used for generating FrameRowTooLargeException if needed

public WindowOperatorQueryFrameProcessor(
final WindowOperatorQuery query,
final List<OperatorFactory> operatorFactoryList,
final ObjectMapper jsonMapper,
final ReadableInput baseInput,
final Function<SegmentReference, SegmentReference> segmentMapFn,
final ResourceHolder<WritableFrameChannel> outputChannelHolder,
final ResourceHolder<FrameWriterFactory> frameWriterFactoryHolder,
final RowSignature rowSignature
)
{
super(
baseInput,
segmentMapFn,
outputChannelHolder,
frameWriterFactoryHolder
);
this.query = query;
this.jsonMapper = jsonMapper;
this.partitionBoostVirtualColumn = new SettableLongVirtualColumn(QueryKitUtils.PARTITION_BOOST_COLUMN);
somu-imply marked this conversation as resolved.
Show resolved Hide resolved
this.operatorFactoryList = operatorFactoryList;
this.outputStageSignature = rowSignature;

final List<VirtualColumn> frameWriterVirtualColumns = new ArrayList<>();
frameWriterVirtualColumns.add(partitionBoostVirtualColumn);

final VirtualColumn segmentGranularityVirtualColumn =
QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query);

if (segmentGranularityVirtualColumn != null) {
frameWriterVirtualColumns.add(segmentGranularityVirtualColumn);
}

this.frameWriterVirtualColumns = VirtualColumns.create(frameWriterVirtualColumns);
}

private static Operator getOperator(RowBasedFrameRowAndColumns frameRowsAndColumns)
{
LazilyDecoratedRowsAndColumns ldrc = new LazilyDecoratedRowsAndColumns(
frameRowsAndColumns,
null,
null,
null,
OffsetLimit.limit(Integer.MAX_VALUE),
null,
null
);
// Create an operator on top of the created rows and columns
Operator op = new Operator()
{
@Nullable
@Override
public Closeable goOrContinue(Closeable continuationObject, Receiver receiver)
{
receiver.push(ldrc);
receiver.completed();
return continuationObject;
}
};
return op;
}

@Override
protected ReturnOrAwait<Unit> runWithSegment(SegmentWithDescriptor segment)
{
return null;
somu-imply marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
protected ReturnOrAwait<Unit> runWithLoadedSegment(SegmentWithDescriptor segment)
{
return null;
somu-imply marked this conversation as resolved.
Show resolved Hide resolved
}

// previous stage output
@Override
protected ReturnOrAwait<Unit> runWithInputChannel(ReadableFrameChannel inputChannel, FrameReader inputFrameReader)
{
// Read the frames from the channel
// convert to FrameRowsAndColumns

if (inputChannel.canRead()) {
Frame f = inputChannel.read();

// the frame here is row based
// frame rows and columns need columnar. discuss with Eric
// Action item: need to implement a new rows and columns that accept a row-based frame

// Create a frame rows and columns what would
/**
* OVER(PARTITION BY m1)
*/
RowBasedFrameRowAndColumns frameRowsAndColumns = new RowBasedFrameRowAndColumns(f, inputFrameReader.signature());
Operator op = getOperator(frameRowsAndColumns);
//
//Operator op = new SegmentToRowsAndColumnsOperator(frameSegment);
// On the operator created above add the operators present in the query that we want to chain

// previous shuffle has partitioning
// need to remove partitioning here

for (OperatorFactory of : operatorFactoryList) {
op = of.wrap(op);
}

// Let the operator run
// the results that come in the receiver must be pushed to the outout channel
// need to transform the output rows and columns back to frame
Operator.go(op, new Operator.Receiver()
{
@Override
public Operator.Signal push(RowsAndColumns rac)
{
// convert the rac to a row-based frame
// Note that there can be multiple racs here
// one per partition
// But one rac will be written to 1 frame
AtomicInteger rowId = new AtomicInteger(0);
createFrameWriterIfNeeded(rac, rowId);
materializeRacToRowFrames(rac, outputStageSignature, rowId);
try {
flushFrameWriter();
}
catch (IOException e) {
throw new RuntimeException(e);
}
return Operator.Signal.GO;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of returning GO check if the frames can be paused. In such a case return that. Also need to test pausing frames through the MSQ framework correctly

}

@Override
public void completed()
{

}
});

} else if (inputChannel.isFinished()) {
return ReturnOrAwait.returnObject(Unit.instance());
} else {
return ReturnOrAwait.awaitAll(inputChannels().size());
}
return ReturnOrAwait.runAgain();
}

public void materializeRacToRowFrames(RowsAndColumns rac, RowSignature outputSignature, AtomicInteger rowId)
Fixed Show fixed Hide fixed
{
final int numRows = rac.numRows();

BitSet rowsToSkip = null;

long remainingRowsToSkip = 0;
long remainingRowsToFetch = Integer.MAX_VALUE;

rowId.set(0);
for (; rowId.get() < numRows && remainingRowsToFetch > 0; rowId.incrementAndGet()) {
final int theId = rowId.get();
if (rowsToSkip != null && rowsToSkip.get(theId)) {
continue;
}
if (remainingRowsToSkip > 0) {
Fixed Show fixed Hide fixed
remainingRowsToSkip--;
continue;
}
remainingRowsToFetch--;
frameWriter.addSelection();
}
}

private void createFrameWriterIfNeeded(RowsAndColumns rac, AtomicInteger rowId)
{
if (frameWriter == null) {
final ColumnSelectorFactoryMaker csfm = ColumnSelectorFactoryMaker.fromRAC(rac);
final ColumnSelectorFactory frameWriterColumnSelectorFactory = csfm.make(rowId);
final FrameWriterFactory frameWriterFactory = getFrameWriterFactory();
frameWriter = frameWriterFactory.newFrameWriter(frameWriterColumnSelectorFactory);
currentAllocatorCapacity = frameWriterFactory.allocatorCapacity();
}
}

private long flushFrameWriter() throws IOException
{
if (frameWriter != null && frameWriter.getNumRows() > 0) {
final Frame frame = Frame.wrap(frameWriter.toByteArray());
Iterables.getOnlyElement(outputChannels()).write(new FrameWithPartition(frame, FrameWithPartition.NO_PARTITION));
frameWriter.close();
frameWriter = null;
return frame.numRows();
} else {
if (frameWriter != null) {
frameWriter.close();
frameWriter = null;
}

return 0;
}
}
}
Loading
Loading