diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java index 6933af8f4315..6e91b19df4d4 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java @@ -34,29 +34,23 @@ import org.apache.druid.frame.util.SettableLongVirtualColumn; import org.apache.druid.frame.write.FrameWriter; import org.apache.druid.frame.write.FrameWriterFactory; -import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.common.Unit; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.msq.indexing.error.MSQException; import org.apache.druid.msq.indexing.error.TooManyRowsInAWindowFault; -import org.apache.druid.query.groupby.ResultRow; +import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.query.operator.OffsetLimit; import org.apache.druid.query.operator.Operator; import org.apache.druid.query.operator.OperatorFactory; import org.apache.druid.query.operator.WindowOperatorQuery; import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns; import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns; -import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns; import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns; import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker; import org.apache.druid.segment.ColumnSelectorFactory; -import org.apache.druid.segment.ColumnValueSelector; -import org.apache.druid.segment.Cursor; import org.apache.druid.segment.VirtualColumn; import org.apache.druid.segment.VirtualColumns; -import org.apache.druid.segment.column.ColumnType; -import org.apache.druid.segment.column.NullableTypeStrategy; import org.apache.druid.segment.column.RowSignature; import javax.annotation.Nullable; @@ -66,37 +60,25 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Supplier; public class WindowOperatorQueryFrameProcessor implements FrameProcessor { private static final Logger log = new Logger(WindowOperatorQueryFrameProcessor.class); - private final WindowOperatorQuery query; private final List operatorFactoryList; - private final List partitionColumnNames; - private final ObjectMapper jsonMapper; - private final ArrayList frameRowsAndCols; private final ArrayList resultRowAndCols; + private final RowsAndColumnsBuilder frameRowsAndColsBuilder; private final ReadableFrameChannel inputChannel; private final WritableFrameChannel outputChannel; private final FrameWriterFactory frameWriterFactory; private final FrameReader frameReader; private final int maxRowsMaterialized; - private Cursor frameCursor = null; - private Supplier rowSupplierFromFrameCursor; - private ResultRow outputRow = null; private FrameWriter frameWriter = null; private final VirtualColumns frameWriterVirtualColumns; private final SettableLongVirtualColumn partitionBoostVirtualColumn; - // List of type strategies to compare the partition columns across rows. 
- // Type strategies are pushed in the same order as column types in frameReader.signature() - private final NullableTypeStrategy[] typeStrategies; - - private final ArrayList rowsToProcess; - private int lastPartitionIndex = -1; + private Operator operator = null; final AtomicInteger rowId = new AtomicInteger(0); @@ -107,29 +89,18 @@ public WindowOperatorQueryFrameProcessor( FrameWriterFactory frameWriterFactory, FrameReader frameReader, ObjectMapper jsonMapper, - final List operatorFactoryList, - final RowSignature rowSignature, - final int maxRowsMaterializedInWindow, - final List partitionColumnNames + final List operatorFactoryList ) { this.inputChannel = inputChannel; this.outputChannel = outputChannel; this.frameWriterFactory = frameWriterFactory; this.operatorFactoryList = operatorFactoryList; - this.jsonMapper = jsonMapper; - this.query = query; - this.frameRowsAndCols = new ArrayList<>(); this.resultRowAndCols = new ArrayList<>(); - this.rowsToProcess = new ArrayList<>(); - this.maxRowsMaterialized = maxRowsMaterializedInWindow; - this.partitionColumnNames = partitionColumnNames; + this.maxRowsMaterialized = MultiStageQueryContext.getMaxRowsMaterializedInWindow(query.context()); + this.frameRowsAndColsBuilder = new RowsAndColumnsBuilder(this.maxRowsMaterialized); this.frameReader = frameReader; - this.typeStrategies = new NullableTypeStrategy[frameReader.signature().size()]; - for (int i = 0; i < frameReader.signature().size(); i++) { - typeStrategies[i] = frameReader.signature().getColumnType(i).get().getNullableStrategy(); - } // Get virtual columns to be added to the frame writer. this.partitionBoostVirtualColumn = new SettableLongVirtualColumn(QueryKitUtils.PARTITION_BOOST_COLUMN); @@ -141,6 +112,8 @@ public WindowOperatorQueryFrameProcessor( } frameWriterVirtualColumns.add(this.partitionBoostVirtualColumn); this.frameWriterVirtualColumns = VirtualColumns.create(frameWriterVirtualColumns); + + initialiseOperator(); } @Override @@ -158,174 +131,68 @@ public List outputChannels() @Override public ReturnOrAwait runIncrementally(IntSet readableInputs) throws IOException { - /* - There are 2 scenarios: - - *** Scenario 1: Query has atleast one window function with an OVER() clause without a PARTITION BY *** - - In this scenario, we add all the RACs to a single RowsAndColumns to be processed. We do it via ConcatRowsAndColumns, and run all the operators on the ConcatRowsAndColumns. - This is done because we anyway need to run the operators on the entire set of rows when we have an OVER() clause without a PARTITION BY. - This scenario corresponds to partitionColumnNames.isEmpty()=true code flow. - - *** Scenario 2: All window functions in the query have OVER() clause with a PARTITION BY *** - - In this scenario, we need to process rows for each PARTITION BY group together, but we can batch multiple PARTITION BY keys into the same RAC before passing it to the operators for processing. - Batching is fine since the operators list would have the required NaivePartitioningOperatorFactory to segregate each PARTITION BY group during the processing. - - The flow for this scenario can be summarised as following: - 1. Frame Reading and Cursor Initialization: We start by reading a frame from the inputChannel and initializing frameCursor to iterate over the rows in that frame. - 2. Row Comparison: For each row in the frame, we decide whether it belongs to the same PARTITION BY group as the previous row. - This is determined by comparePartitionKeys() method. 
- Please refer to the Javadoc of that method for further details and an example illustration. - 2.1. If the PARTITION BY columns of current row matches the PARTITION BY columns of the previous row, - they belong to the same PARTITION BY group, and gets added to rowsToProcess. - If the number of total rows materialized exceed maxRowsMaterialized, we process the pending batch via processRowsUpToLastPartition() method. - 2.2. If they don't match, then we have reached a partition boundary. - In this case, we update the value for lastPartitionIndex. - 3. End of Input: If the input channel is finished, any remaining rows in rowsToProcess are processed. - - *Illustration of Row Comparison step* - - Let's say we have window_function() OVER (PARTITION BY A ORDER BY B) in our query, and we get 3 frames in the input channel to process. - - Frame 1 - A, B - 1, 2 - 1, 3 - 2, 1 --> PARTITION BY key (column A) changed from 1 to 2. - 2, 2 - - Frame 2 - A, B - 3, 1 --> PARTITION BY key (column A) changed from 2 to 3. - 3, 2 - 3, 3 - 3, 4 - - Frame 3 - A, B - 3, 5 - 3, 6 - 4, 1 --> PARTITION BY key (column A) changed from 3 to 4. - 4, 2 - - *Why batching?* - We batch multiple PARTITION BY keys for processing together to avoid the overhead of creating different RACs for each PARTITION BY keys, as that would be unnecessary in scenarios where we have a large number of PARTITION BY keys, but each key having a single row. - - *Future thoughts: https://github.com/apache/druid/issues/16126* - Current approach with R&C and operators materialize a single R&C for processing. In case of data with low cardinality a single R&C might be too big to consume. Same for the case of empty OVER() clause. - Most of the window operations like SUM(), RANK(), RANGE() etc. can be made with 2 passes of the data. We might think to reimplement them in the MSQ way so that we do not have to materialize so much data. - */ - // If there are rows pending flush, flush them and run again before processing any more rows. if (frameHasRowsPendingFlush()) { flushAllRowsAndCols(); return ReturnOrAwait.runAgain(); } - if (partitionColumnNames.isEmpty()) { - // Scenario 1: Query has atleast one window function with an OVER() clause without a PARTITION BY. - if (inputChannel.canRead()) { - final Frame frame = inputChannel.read(); - convertRowFrameToRowsAndColumns(frame); - return ReturnOrAwait.runAgain(); - } - - if (inputChannel.isFinished()) { - // If no rows are flushed yet, process all rows. - if (rowId.get() == 0) { - runAllOpsOnMultipleRac(frameRowsAndCols); - } - - // If there are still rows pending after operations, run again. - if (frameHasRowsPendingFlush()) { - return ReturnOrAwait.runAgain(); - } - return ReturnOrAwait.returnObject(Unit.instance()); - } - return ReturnOrAwait.awaitAll(inputChannels().size()); - } - - // Scenario 2: All window functions in the query have OVER() clause with a PARTITION BY - if (frameCursor == null || frameCursor.isDone()) { - if (readableInputs.isEmpty()) { - return ReturnOrAwait.awaitAll(1); - } + if (inputChannel.canRead()) { + final Frame frame = inputChannel.read(); + LazilyDecoratedRowsAndColumns ldrc = convertRowFrameToRowsAndColumns(frame); + frameRowsAndColsBuilder.add(ldrc); - if (inputChannel.canRead()) { - final Frame frame = inputChannel.read(); - frameCursor = FrameProcessors.makeCursor(frame, frameReader); - makeRowSupplierFromFrameCursor(); - } else if (inputChannel.isFinished()) { - // If we have some rows pending processing, process them. 
- // We run it again as it's possible that frame writer's capacity got reached and some output rows are - // pending flush to the output channel. - if (!rowsToProcess.isEmpty()) { - lastPartitionIndex = rowsToProcess.size() - 1; - processRowsUpToLastPartition(); - return ReturnOrAwait.runAgain(); - } - return ReturnOrAwait.returnObject(Unit.instance()); - } else { - return ReturnOrAwait.runAgain(); + if (needToProcessBatch()) { + runAllOpsOnBatch(); + flushAllRowsAndCols(); } - } - - while (!frameCursor.isDone()) { - final ResultRow currentRow = rowSupplierFromFrameCursor.get(); - if (outputRow == null) { - outputRow = currentRow; - rowsToProcess.add(currentRow); - } else if (comparePartitionKeys(outputRow, currentRow, partitionColumnNames)) { - // Add current row to the same batch of rows for processing. - rowsToProcess.add(currentRow); - } else { - lastPartitionIndex = rowsToProcess.size() - 1; - outputRow = currentRow.copy(); - rowsToProcess.add(currentRow); + return ReturnOrAwait.runAgain(); + } else if (inputChannel.isFinished()) { + if (rowId.get() == 0) { + runAllOpsOnBatch(); } - frameCursor.advance(); - if (rowsToProcess.size() > maxRowsMaterialized) { - // We don't want to materialize more than maxRowsMaterialized rows at any point in time, so process the pending batch. - processRowsUpToLastPartition(); - ensureMaxRowsInAWindowConstraint(rowsToProcess.size()); + // If there are still rows pending after operations, run again. + if (frameHasRowsPendingFlush()) { return ReturnOrAwait.runAgain(); } + return ReturnOrAwait.returnObject(Unit.instance()); + } else { + return ReturnOrAwait.awaitAll(inputChannels().size()); } - return ReturnOrAwait.runAgain(); } - /** - * @param listOfRacs Concat this list of {@link RowsAndColumns} to a {@link ConcatRowsAndColumns} to use as a single input for the operators to be run - */ - private void runAllOpsOnMultipleRac(ArrayList listOfRacs) + private void initialiseOperator() { - Operator op = new Operator() + operator = new Operator() { @Nullable @Override public Closeable goOrContinue(Closeable continuationObject, Receiver receiver) { - RowsAndColumns rac = new ConcatRowsAndColumns(listOfRacs); - ensureMaxRowsInAWindowConstraint(rac.numRows()); + RowsAndColumns rac = frameRowsAndColsBuilder.build(); + ensureMaxRowsInAWindowConstraint(rac.numRows(), maxRowsMaterialized); receiver.push(rac); - receiver.completed(); - return null; + + if (inputChannel.isFinished()) { + // Only call completed() when the input channel is finished. + receiver.completed(); + return null; // Signal that the operator has completed its work + } + + // Return a non-null continuation object to indicate that we want to continue processing. + return () -> { + }; } }; - runOperatorsAfterThis(op); + for (OperatorFactory of : operatorFactoryList) { + operator = of.wrap(operator); + } } - /** - * @param op Base operator for the operators to be run. Other operators are wrapped under this to run - */ - private void runOperatorsAfterThis(Operator op) + private void runAllOpsOnBatch() { - for (OperatorFactory of : operatorFactoryList) { - op = of.wrap(op); - } - Operator.go(op, new Operator.Receiver() + operator.goOrContinue(null, new Operator.Receiver() { @Override public Operator.Signal push(RowsAndColumns rac) @@ -349,6 +216,7 @@ public void completed() /** * Flushes {@link #resultRowAndCols} to the frame starting from {@link #rowId}, upto the frame writer's capacity. 
+ * * @throws IOException */ private void flushAllRowsAndCols() throws IOException @@ -359,7 +227,7 @@ private void flushAllRowsAndCols() throws IOException } /** - * @param rac The frame writer to write this {@link RowsAndColumns} object + * @param rac The frame writer to write this {@link RowsAndColumns} object */ private void createFrameWriterIfNeeded(RowsAndColumns rac) { @@ -373,7 +241,7 @@ private void createFrameWriterIfNeeded(RowsAndColumns rac) } /** - * @param rac {@link RowsAndColumns} to be written to frame + * @param rac {@link RowsAndColumns} to be written to frame * @throws IOException */ public void writeRacToFrame(RowsAndColumns rac) throws IOException @@ -432,9 +300,10 @@ private long flushFrameWriter() throws IOException /** * @param frame Row based frame to be converted to a {@link RowsAndColumns} object - * Throw an exception if the resultant rac used goes above the guardrail value + * Throw an exception if the resultant rac used goes above the guardrail value + * @return A {@link LazilyDecoratedRowsAndColumns} encapsulating the frame. */ - private void convertRowFrameToRowsAndColumns(Frame frame) + private LazilyDecoratedRowsAndColumns convertRowFrameToRowsAndColumns(Frame frame) { final RowSignature signature = frameReader.signature(); RowBasedFrameRowsAndColumns frameRowsAndColumns = new RowBasedFrameRowsAndColumns(frame, signature); @@ -445,100 +314,63 @@ private void convertRowFrameToRowsAndColumns(Frame frame) null, OffsetLimit.limit(Integer.MAX_VALUE), null, - null + null, + frameWriterFactory.allocatorCapacity() ); - // check if existing + newly added rows exceed guardrails - ensureMaxRowsInAWindowConstraint(frameRowsAndCols.size() + ldrc.numRows()); - frameRowsAndCols.add(ldrc); + return ldrc; } - /** - * Compare two rows based on the columns in partitionColumnNames. - * If the partitionColumnNames is empty, the method will end up returning true. - *
-   * For example, say:
-   * <ul>
-   * <li>partitionColumnNames = ["d1", "d2"]</li>
-   * <li>frameReader's row signature = {d1:STRING, d2:STRING, p0:STRING}</li>
-   * <li>frameReader.signature.indexOf("d1") = 0</li>
-   * <li>frameReader.signature.indexOf("d2") = 1</li>
-   * <li>row1 = [d1_row1, d2_row1, p0_row1]</li>
-   * <li>row2 = [d1_row2, d2_row2, p0_row2]</li>
-   * </ul>
-   *
- * Then this method will return true if d1_row1==d1_row2 && d2_row1==d2_row2, false otherwise. - * Returning true would indicate that these 2 rows can be put into the same partition for window function processing. - */ - private boolean comparePartitionKeys(ResultRow row1, ResultRow row2, List partitionColumnNames) + private static void ensureMaxRowsInAWindowConstraint(int numRowsInWindow, int maxRowsMaterialized) { - int match = 0; - for (String columnName : partitionColumnNames) { - int i = frameReader.signature().indexOf(columnName); - if (ColumnType.STRING.equals(frameReader.signature().getColumnType(columnName).get()) && (row1.get(i) instanceof List || row2.get(i) instanceof List)) { - // special handling to reject MVDs - throw new UOE( - "Encountered a multi value column [%s]. Window processing does not support MVDs. " - + "Consider using UNNEST or MV_TO_ARRAY.", - columnName - ); - } - if (typeStrategies[i].compare(row1.get(i), row2.get(i)) == 0) { - match++; - } + if (numRowsInWindow > maxRowsMaterialized) { + throw new MSQException(new TooManyRowsInAWindowFault( + numRowsInWindow, + maxRowsMaterialized + )); } - return match == partitionColumnNames.size(); } - private void makeRowSupplierFromFrameCursor() + private boolean needToProcessBatch() { - final ColumnSelectorFactory frameColumnSelectorFactory = frameCursor.getColumnSelectorFactory(); - final Supplier[] fieldSuppliers = new Supplier[frameReader.signature().size()]; - for (int i = 0; i < fieldSuppliers.length; i++) { - final ColumnValueSelector selector = - frameColumnSelectorFactory.makeColumnValueSelector(frameReader.signature().getColumnName(i)); - fieldSuppliers[i] = selector::getObject; - } - rowSupplierFromFrameCursor = () -> { - final ResultRow row = ResultRow.create(fieldSuppliers.length); - for (int i = 0; i < fieldSuppliers.length; i++) { - row.set(i, fieldSuppliers[i].get()); - } - return row; - }; + return frameRowsAndColsBuilder.getNumRows() >= maxRowsMaterialized / 2; // Can this be improved further? } - /** - * Process rows from rowsToProcess[0, lastPartitionIndex]. - */ - private void processRowsUpToLastPartition() + private static class RowsAndColumnsBuilder { - if (lastPartitionIndex == -1) { - return; + private final List racList; + private int totalRows; + private final int maxRowsMaterialized; + + public RowsAndColumnsBuilder(int maxRowsMaterialized) + { + this.racList = new ArrayList<>(); + this.totalRows = 0; + this.maxRowsMaterialized = maxRowsMaterialized; } - RowsAndColumns singleRac = MapOfColumnsRowsAndColumns.fromResultRowTillIndex( - rowsToProcess, - frameReader.signature(), - lastPartitionIndex - ); - ArrayList rowsAndColumns = new ArrayList<>(); - rowsAndColumns.add(singleRac); - runAllOpsOnMultipleRac(rowsAndColumns); - - // Remove elements in the range [0, lastPartitionIndex] from the list. - // The call to list.subList(a, b).clear() deletes the elements in the range [a, b - 1], - // causing the remaining elements to shift and start from index 0. 
- rowsToProcess.subList(0, lastPartitionIndex + 1).clear(); - lastPartitionIndex = -1; - } + public void add(RowsAndColumns rac) + { + racList.add(rac); + totalRows += rac.numRows(); + ensureMaxRowsInAWindowConstraint(getNumRows(), maxRowsMaterialized); + } - private void ensureMaxRowsInAWindowConstraint(int numRowsInWindow) - { - if (numRowsInWindow > maxRowsMaterialized) { - throw new MSQException(new TooManyRowsInAWindowFault( - numRowsInWindow, - maxRowsMaterialized - )); + public int getNumRows() + { + return totalRows; + } + + public RowsAndColumns build() + { + ConcatRowsAndColumns concatRowsAndColumns = new ConcatRowsAndColumns(new ArrayList<>(racList)); + clear(); + return concatRowsAndColumns; + } + + public void clear() + { + racList.clear(); + totalRows = 0; } } @@ -561,7 +393,6 @@ private boolean frameHasRowsPendingFlush() private void clearRACBuffers() { - frameRowsAndCols.clear(); resultRowAndCols.clear(); rowId.set(0); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactory.java index 6ad7742672f9..2f97ffd74b4c 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactory.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactory.java @@ -27,7 +27,6 @@ import com.google.common.collect.Iterables; import it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap; import it.unimi.dsi.fastutil.ints.Int2ObjectSortedMap; -import org.apache.druid.error.DruidException; import org.apache.druid.frame.processor.FrameProcessor; import org.apache.druid.frame.processor.OutputChannel; import org.apache.druid.frame.processor.OutputChannelFactory; @@ -60,27 +59,17 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor private final WindowOperatorQuery query; private final List operatorList; private final RowSignature stageRowSignature; - private final int maxRowsMaterializedInWindow; - private final List partitionColumnNames; @JsonCreator public WindowOperatorQueryFrameProcessorFactory( @JsonProperty("query") WindowOperatorQuery query, @JsonProperty("operatorList") List operatorFactoryList, - @JsonProperty("stageRowSignature") RowSignature stageRowSignature, - @JsonProperty("maxRowsMaterializedInWindow") int maxRowsMaterializedInWindow, - @JsonProperty("partitionColumnNames") List partitionColumnNames + @JsonProperty("stageRowSignature") RowSignature stageRowSignature ) { this.query = Preconditions.checkNotNull(query, "query"); this.operatorList = Preconditions.checkNotNull(operatorFactoryList, "bad operator"); this.stageRowSignature = Preconditions.checkNotNull(stageRowSignature, "stageSignature"); - this.maxRowsMaterializedInWindow = maxRowsMaterializedInWindow; - - if (partitionColumnNames == null) { - throw DruidException.defensive("List of partition column names encountered as null."); - } - this.partitionColumnNames = partitionColumnNames; } @JsonProperty("query") @@ -95,24 +84,12 @@ public List getOperators() return operatorList; } - @JsonProperty("partitionColumnNames") - public List getPartitionColumnNames() - { - return partitionColumnNames; - } - @JsonProperty("stageRowSignature") public RowSignature getSignature() { return stageRowSignature; } - @JsonProperty("maxRowsMaterializedInWindow") - public int 
getMaxRowsMaterializedInWindow() - { - return maxRowsMaterializedInWindow; - } - @Override public ProcessorsAndChannels makeProcessors( StageDefinition stageDefinition, @@ -153,6 +130,7 @@ public ProcessorsAndChannels makeProcessors( readableInput -> { final OutputChannel outputChannel = outputChannels.get(readableInput.getStagePartition().getPartitionNumber()); + return new WindowOperatorQueryFrameProcessor( query, readableInput.getChannel(), @@ -160,10 +138,7 @@ public ProcessorsAndChannels makeProcessors( stageDefinition.createFrameWriterFactory(outputChannel.getFrameMemoryAllocator(), removeNullBytes), readableInput.getChannelFrameReader(), frameContext.jsonMapper(), - operatorList, - stageRowSignature, - maxRowsMaterializedInWindow, - partitionColumnNames + operatorList ); } ); @@ -190,16 +165,14 @@ public boolean equals(Object o) return false; } WindowOperatorQueryFrameProcessorFactory that = (WindowOperatorQueryFrameProcessorFactory) o; - return maxRowsMaterializedInWindow == that.maxRowsMaterializedInWindow - && Objects.equals(query, that.query) + return Objects.equals(query, that.query) && Objects.equals(operatorList, that.operatorList) - && Objects.equals(partitionColumnNames, that.partitionColumnNames) && Objects.equals(stageRowSignature, that.stageRowSignature); } @Override public int hashCode() { - return Objects.hash(query, operatorList, partitionColumnNames, stageRowSignature, maxRowsMaterializedInWindow); + return Objects.hash(query, operatorList, stageRowSignature); } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java index 02542f8e7366..bc789528b061 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java @@ -26,7 +26,6 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.msq.exec.Limits; import org.apache.druid.msq.input.stage.StageInputSpec; import org.apache.druid.msq.kernel.HashShuffleSpec; import org.apache.druid.msq.kernel.MixShuffleSpec; @@ -35,10 +34,12 @@ import org.apache.druid.msq.kernel.ShuffleSpec; import org.apache.druid.msq.kernel.StageDefinition; import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.query.operator.AbstractPartitioningOperatorFactory; +import org.apache.druid.query.operator.AbstractSortOperatorFactory; import org.apache.druid.query.operator.ColumnWithDirection; -import org.apache.druid.query.operator.NaivePartitioningOperatorFactory; -import org.apache.druid.query.operator.NaiveSortOperatorFactory; +import org.apache.druid.query.operator.GlueingPartitioningOperatorFactory; import org.apache.druid.query.operator.OperatorFactory; +import org.apache.druid.query.operator.PartitionSortOperatorFactory; import org.apache.druid.query.operator.WindowOperatorQuery; import org.apache.druid.query.operator.window.WindowOperatorFactory; import org.apache.druid.segment.column.ColumnType; @@ -68,20 +69,9 @@ public QueryDefinition makeQueryDefinition( int minStageNumber ) { - // Need to validate query first. - // Populate the group of operators to be processed at each stage. - // The size of the operators is the number of serialized stages. 
- // Later we should also check if these can be parallelized. - // Check if there is an empty OVER() clause or not. RowSignature rowSignature = originalQuery.getRowSignature(); log.info("Row signature received for query is [%s].", rowSignature); - boolean isEmptyOverPresent = originalQuery.getOperators() - .stream() - .filter(of -> of instanceof NaivePartitioningOperatorFactory) - .map(of -> (NaivePartitioningOperatorFactory) of) - .anyMatch(of -> of.getPartitionColumns().isEmpty()); - List> operatorList = getOperatorListFromQuery(originalQuery); log.info("Created operatorList with operator factories: [%s]", operatorList); @@ -112,135 +102,84 @@ public QueryDefinition makeQueryDefinition( final ShuffleSpec finalWindowStageShuffleSpec = resultShuffleSpecFactory.build(finalWindowClusterBy, false); final RowSignature finalWindowStageRowSignature = computeSignatureForFinalWindowStage(rowSignature, finalWindowClusterBy, segmentGranularity); - final int maxRowsMaterialized; - if (originalQuery.context() != null && originalQuery.context().containsKey(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW)) { - maxRowsMaterialized = (int) originalQuery.context().get(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW); - } else { - maxRowsMaterialized = Limits.MAX_ROWS_MATERIALIZED_IN_WINDOW; - } + final int maxRowsMaterialized = MultiStageQueryContext.getMaxRowsMaterializedInWindow(originalQuery.context()); - if (isEmptyOverPresent) { - // Move everything to a single partition since we have to load all the data on a single worker anyway to compute empty over() clause. - log.info( - "Empty over clause is present in the query. Creating a single stage with all operator factories [%s].", - queryToRun.getOperators() - ); - queryDefBuilder.add( - StageDefinition.builder(firstStageNumber) - .inputs(new StageInputSpec(firstStageNumber - 1)) - .signature(finalWindowStageRowSignature) - .maxWorkerCount(queryKitSpec.getMaxNonLeafWorkerCount()) - .shuffleSpec(finalWindowStageShuffleSpec) - .processorFactory(new WindowOperatorQueryFrameProcessorFactory( - queryToRun, - queryToRun.getOperators(), - finalWindowStageRowSignature, - maxRowsMaterialized, - Collections.emptyList() - )) - ); - } else { - // There are multiple windows present in the query. - // Create stages for each window in the query. - // These stages will be serialized. - // The partition by clause of the next window will be the shuffle key for the previous window. - RowSignature.Builder bob = RowSignature.builder(); - RowSignature signatureFromInput = dataSourcePlan.getSubQueryDefBuilder().get().build().getFinalStageDefinition().getSignature(); - log.info("Row signature received from last stage is [%s].", signatureFromInput); - - for (int i = 0; i < signatureFromInput.getColumnNames().size(); i++) { - bob.add(signatureFromInput.getColumnName(i), signatureFromInput.getColumnType(i).get()); - } + // There are multiple windows present in the query. + // Create stages for each window in the query. + // These stages will be serialized. + // The partition by clause of the next window will be the shuffle key for the previous window. 
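+    // Illustrative sketch (hypothetical query, not from this patch): for SUM(x) OVER (PARTITION BY d1)
+    // followed by RANK() OVER (PARTITION BY d2), the stage running the first window shuffles its output
+    // on [d2], so the stage running the second window receives rows already clustered on its own
+    // partition columns and only needs to glue and sort them locally.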
+ RowSignature.Builder bob = RowSignature.builder(); + RowSignature signatureFromInput = dataSourcePlan.getSubQueryDefBuilder().get().build().getFinalStageDefinition().getSignature(); + log.info("Row signature received from last stage is [%s].", signatureFromInput); + + for (int i = 0; i < signatureFromInput.getColumnNames().size(); i++) { + bob.add(signatureFromInput.getColumnName(i), signatureFromInput.getColumnType(i).get()); + } - List partitionColumnNames = new ArrayList<>(); - - /* - operatorList is a List>, where each List corresponds to the operator factories - to be used for a different window stage. - - We iterate over operatorList, and add the definition for a window stage to QueryDefinitionBuilder. - */ - for (int i = 0; i < operatorList.size(); i++) { - for (OperatorFactory operatorFactory : operatorList.get(i)) { - if (operatorFactory instanceof WindowOperatorFactory) { - List outputColumnNames = ((WindowOperatorFactory) operatorFactory).getProcessor().getOutputColumnNames(); - - // Need to add column names which are present in outputColumnNames and rowSignature but not in bob, - // since they need to be present in the row signature for this window stage. - for (String columnName : outputColumnNames) { - int indexInRowSignature = rowSignature.indexOf(columnName); - if (indexInRowSignature != -1 && bob.build().indexOf(columnName) == -1) { - ColumnType columnType = rowSignature.getColumnType(indexInRowSignature).get(); - bob.add(columnName, columnType); - log.info("Added column [%s] of type [%s] to row signature for window stage.", columnName, columnType); - } else { - throw new ISE( - "Found unexpected column [%s] already present in row signature [%s].", - columnName, - rowSignature - ); - } + /* + operatorList is a List>, where each List corresponds to the operator factories + to be used for a different window stage. + + We iterate over operatorList, and add the definition for a window stage to QueryDefinitionBuilder. + */ + for (int i = 0; i < operatorList.size(); i++) { + for (OperatorFactory operatorFactory : operatorList.get(i)) { + if (operatorFactory instanceof WindowOperatorFactory) { + List outputColumnNames = ((WindowOperatorFactory) operatorFactory).getProcessor().getOutputColumnNames(); + + // Need to add column names which are present in outputColumnNames and rowSignature but not in bob, + // since they need to be present in the row signature for this window stage. 
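+          // For example (hypothetical): a window stage computing w0 = ROW_NUMBER() would add a "w0"
+          // column here, taking its type (LONG) from the original query's row signature.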
+ for (String columnName : outputColumnNames) { + int indexInRowSignature = rowSignature.indexOf(columnName); + if (indexInRowSignature != -1 && bob.build().indexOf(columnName) == -1) { + ColumnType columnType = rowSignature.getColumnType(indexInRowSignature).get(); + bob.add(columnName, columnType); + log.info("Added column [%s] of type [%s] to row signature for window stage.", columnName, columnType); + } else { + throw new ISE( + "Found unexpected column [%s] already present in row signature [%s].", + columnName, + rowSignature + ); } } } + } - final RowSignature intermediateSignature = bob.build(); - final RowSignature stageRowSignature; + final RowSignature intermediateSignature = bob.build(); + final RowSignature stageRowSignature; - if (i + 1 == operatorList.size()) { - stageRowSignature = finalWindowStageRowSignature; - nextShuffleSpec = finalWindowStageShuffleSpec; + if (i + 1 == operatorList.size()) { + stageRowSignature = finalWindowStageRowSignature; + nextShuffleSpec = finalWindowStageShuffleSpec; + } else { + nextShuffleSpec = findShuffleSpecForNextWindow(operatorList.get(i + 1), queryKitSpec.getNumPartitionsForShuffle()); + if (nextShuffleSpec == null) { + stageRowSignature = intermediateSignature; } else { - nextShuffleSpec = - findShuffleSpecForNextWindow(operatorList.get(i + 1), queryKitSpec.getNumPartitionsForShuffle()); - if (nextShuffleSpec == null) { - stageRowSignature = intermediateSignature; - } else { - stageRowSignature = QueryKitUtils.sortableSignature( - intermediateSignature, - nextShuffleSpec.clusterBy().getColumns() - ); - } - } - - log.info("Using row signature [%s] for window stage.", stageRowSignature); - - boolean partitionOperatorExists = false; - List currentPartitionColumns = new ArrayList<>(); - for (OperatorFactory of : operatorList.get(i)) { - if (of instanceof NaivePartitioningOperatorFactory) { - for (String s : ((NaivePartitioningOperatorFactory) of).getPartitionColumns()) { - currentPartitionColumns.add(s); - partitionOperatorExists = true; - } - } - } - - if (partitionOperatorExists) { - partitionColumnNames = currentPartitionColumns; + stageRowSignature = QueryKitUtils.sortableSignature( + intermediateSignature, + nextShuffleSpec.clusterBy().getColumns() + ); } + } - log.info( - "Columns which would be used to define partitioning boundaries for this window stage are [%s]", - partitionColumnNames - ); + log.info("Using row signature [%s] for window stage.", stageRowSignature); - queryDefBuilder.add( - StageDefinition.builder(firstStageNumber + i) - .inputs(new StageInputSpec(firstStageNumber + i - 1)) - .signature(stageRowSignature) - .maxWorkerCount(queryKitSpec.getMaxNonLeafWorkerCount()) - .shuffleSpec(nextShuffleSpec) - .processorFactory(new WindowOperatorQueryFrameProcessorFactory( - queryToRun, - operatorList.get(i), - stageRowSignature, - maxRowsMaterialized, - partitionColumnNames - )) - ); - } + queryDefBuilder.add( + StageDefinition.builder(firstStageNumber + i) + .inputs(new StageInputSpec(firstStageNumber + i - 1)) + .signature(stageRowSignature) + .maxWorkerCount(queryKitSpec.getMaxNonLeafWorkerCount()) + .shuffleSpec(nextShuffleSpec) + .processorFactory(new WindowOperatorQueryFrameProcessorFactory( + queryToRun, + getOperatorFactoryListForStageDefinition(operatorList.get(i), maxRowsMaterialized), + stageRowSignature + )) + ); } + return queryDefBuilder.build(); } @@ -287,13 +226,13 @@ private List> getOperatorListFromQuery(WindowOperatorQuery private ShuffleSpec findShuffleSpecForNextWindow(List operatorFactories, int 
partitionCount) { - NaivePartitioningOperatorFactory partition = null; - NaiveSortOperatorFactory sort = null; + AbstractPartitioningOperatorFactory partition = null; + AbstractSortOperatorFactory sort = null; for (OperatorFactory of : operatorFactories) { - if (of instanceof NaivePartitioningOperatorFactory) { - partition = (NaivePartitioningOperatorFactory) of; - } else if (of instanceof NaiveSortOperatorFactory) { - sort = (NaiveSortOperatorFactory) of; + if (of instanceof AbstractPartitioningOperatorFactory) { + partition = (AbstractPartitioningOperatorFactory) of; + } else if (of instanceof AbstractSortOperatorFactory) { + sort = (AbstractSortOperatorFactory) of; } } @@ -377,4 +316,37 @@ private static RowSignature computeSignatureForFinalWindowStage(RowSignature row finalWindowClusterBy.getColumns() ); } + + /** + * This method converts the operator chain received from native plan into MSQ plan. + * (NaiveSortOperator -> Naive/GlueingPartitioningOperator -> WindowOperator) is converted into (GlueingPartitioningOperator -> PartitionSortOperator -> WindowOperator). + * We rely on MSQ's shuffling to do the clustering on partitioning keys for us at every stage. + * This conversion allows us to blindly read N rows from input channel and push them into the operator chain, and repeat until the input channel isn't finished. + * @param operatorFactoryListFromQuery + * @param maxRowsMaterializedInWindow + * @return + */ + private List getOperatorFactoryListForStageDefinition(List operatorFactoryListFromQuery, int maxRowsMaterializedInWindow) + { + final List operatorFactoryList = new ArrayList<>(); + final List sortOperatorFactoryList = new ArrayList<>(); + for (OperatorFactory operatorFactory : operatorFactoryListFromQuery) { + if (operatorFactory instanceof AbstractPartitioningOperatorFactory) { + AbstractPartitioningOperatorFactory partition = (AbstractPartitioningOperatorFactory) operatorFactory; + operatorFactoryList.add(new GlueingPartitioningOperatorFactory(partition.getPartitionColumns(), maxRowsMaterializedInWindow)); + } else if (operatorFactory instanceof AbstractSortOperatorFactory) { + AbstractSortOperatorFactory sortOperatorFactory = (AbstractSortOperatorFactory) operatorFactory; + sortOperatorFactoryList.add(new PartitionSortOperatorFactory(sortOperatorFactory.getSortColumns())); + } else { + // Add all the PartitionSortOperator(s) before every window operator. 
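+        // Flushing the buffered sort factories here yields the ordering described in the Javadoc above:
+        // GlueingPartitioningOperator -> PartitionSortOperator -> WindowOperator, i.e. rows are glued
+        // into partitions first and then sorted within each partition before the window operator runs.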
+ operatorFactoryList.addAll(sortOperatorFactoryList); + sortOperatorFactoryList.clear(); + operatorFactoryList.add(operatorFactory); + } + } + + operatorFactoryList.addAll(sortOperatorFactoryList); + sortOperatorFactoryList.clear(); + return operatorFactoryList; + } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java index 03cec9d192fc..9059a81ffe38 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java @@ -209,6 +209,14 @@ public static String getMSQMode(final QueryContext queryContext) ); } + public static int getMaxRowsMaterializedInWindow(final QueryContext queryContext) + { + return queryContext.getInt( + MAX_ROWS_MATERIALIZED_IN_WINDOW, + Limits.MAX_ROWS_MATERIALIZED_IN_WINDOW + ); + } + public static int getMaxConcurrentStagesWithDefault( final QueryContext queryContext, final int defaultMaxConcurrentStages diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java index 5c8adbd89f62..effca1b06f76 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java @@ -2219,65 +2219,107 @@ public void testWindowOnMixOfEmptyAndNonEmptyOverWithMultipleWorkers() 2, 0, "input0" ) .setExpectedCountersForStageWorkerChannel( - CounterSnapshotMatcher.with().rows(13).bytes(1379).frames(1), + CounterSnapshotMatcher.with().rows(13).bytes(989).frames(1), 2, 0, "output" ) - // Stage 3, Worker 0 + // Stage 3, Worker 1 (window stage) .setExpectedCountersForStageWorkerChannel( - CounterSnapshotMatcher.with().rows(3).bytes(318).frames(1), - 3, 0, "input0" - ) - .setExpectedCountersForStageWorkerChannel( - CounterSnapshotMatcher.with().rows(3).bytes(330).frames(1), - 3, 0, "output" - ) - .setExpectedCountersForStageWorkerChannel( - CounterSnapshotMatcher.with().rows(3).bytes(318).frames(1), - 3, 0, "shuffle" - ) - - // Stage 3, Worker 1 - .setExpectedCountersForStageWorkerChannel( - CounterSnapshotMatcher.with().rows(0, 3).bytes(0, 333).frames(0, 1), + CounterSnapshotMatcher.with().rows(0, 6).bytes(0, 461).frames(0, 1), 3, 1, "input0" ) .setExpectedCountersForStageWorkerChannel( - CounterSnapshotMatcher.with().rows(3).bytes(345).frames(1), + CounterSnapshotMatcher.with().rows(0, 6).bytes(0, 641).frames(0, 1), 3, 1, "output" ) .setExpectedCountersForStageWorkerChannel( - CounterSnapshotMatcher.with().rows(3).bytes(333).frames(1), + CounterSnapshotMatcher.with().rows(1, 1, 2, 2).bytes(122, 132, 230, 235).frames(1, 1, 1, 1), 3, 1, "shuffle" ) - // Stage 3, Worker 2 + // Stage 3, Worker 2 (window stage) .setExpectedCountersForStageWorkerChannel( - CounterSnapshotMatcher.with().rows(0, 0, 3).bytes(0, 0, 352).frames(0, 0, 1), + CounterSnapshotMatcher.with().rows(0, 0, 1).bytes(0, 0, 114).frames(0, 0, 1), 3, 2, "input0" ) .setExpectedCountersForStageWorkerChannel( - CounterSnapshotMatcher.with().rows(3).bytes(364).frames(1), + CounterSnapshotMatcher.with().rows(0, 0, 1).bytes(0, 0, 144).frames(0, 0, 1), 3, 2, "output" ) .setExpectedCountersForStageWorkerChannel( - 
CounterSnapshotMatcher.with().rows(3).bytes(352).frames(1), + CounterSnapshotMatcher.with().rows(1).bytes(140).frames(1), 3, 2, "shuffle" ) - // Stage 3, Worker 3 + // Stage 3, Worker 3 (window stage) .setExpectedCountersForStageWorkerChannel( - CounterSnapshotMatcher.with().rows(0, 0, 0, 4).bytes(0, 0, 0, 426).frames(0, 0, 0, 1), + CounterSnapshotMatcher.with().rows(0, 0, 0, 6).bytes(0, 0, 0, 482).frames(0, 0, 0, 1), 3, 3, "input0" ) .setExpectedCountersForStageWorkerChannel( - CounterSnapshotMatcher.with().rows(4).bytes(442).frames(1), + CounterSnapshotMatcher.with().rows(0, 0, 0, 6).bytes(0, 0, 0, 662).frames(0, 0, 0, 1), 3, 3, "output" ) .setExpectedCountersForStageWorkerChannel( - CounterSnapshotMatcher.with().rows(4).bytes(426).frames(1), + CounterSnapshotMatcher.with().rows(1, 1, 2, 2).bytes(143, 137, 222, 238).frames(1, 1, 1, 1), 3, 3, "shuffle" ) + + // Stage 4, Worker 0 + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher.with().rows(3).bytes(337).frames(1), + 4, 0, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher.with().rows(3).bytes(349).frames(1), + 4, 0, "output" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher.with().rows(3).bytes(337).frames(1), + 4, 0, "shuffle" + ) + + // Stage 4, Worker 1 + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher.with().rows(0, 2).bytes(0, 235).frames(0, 1), + 4, 1, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher.with().rows(2).bytes(243).frames(1), + 4, 1, "output" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher.with().rows(2).bytes(235).frames(1), + 4, 1, "shuffle" + ) + + // Stage 4, Worker 2 + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher.with().rows(0, 0, 4).bytes(0, 0, 418).frames(0, 0, 1), + 4, 2, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher.with().rows(4).bytes(434).frames(1), + 4, 2, "output" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher.with().rows(4).bytes(418).frames(1), + 4, 2, "shuffle" + ) + + // Stage 4, Worker 3 + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher.with().rows(0, 0, 0, 4).bytes(0, 0, 0, 439).frames(0, 0, 0, 1), + 4, 3, "input0" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher.with().rows(4).bytes(455).frames(1), + 4, 3, "output" + ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotMatcher.with().rows(4).bytes(439).frames(1), + 4, 3, "shuffle" + ) .verifyResults(); } @@ -2331,7 +2373,7 @@ public void testFailurePartitionByMVD_1() .setExpectedExecutionErrorMatcher(CoreMatchers.allOf( CoreMatchers.instanceOf(ISE.class), ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString( - "Encountered a multi value column [v0]. Window processing does not support MVDs. Consider using UNNEST or MV_TO_ARRAY.")) + "Encountered a multi value column. Window processing does not support MVDs. Consider using UNNEST or MV_TO_ARRAY.")) )) .verifyExecutionError(); } @@ -2350,7 +2392,7 @@ public void testFailurePartitionByMVD_2() .setExpectedExecutionErrorMatcher(CoreMatchers.allOf( CoreMatchers.instanceOf(ISE.class), ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString( - "Encountered a multi value column [v0]. Window processing does not support MVDs. Consider using UNNEST or MV_TO_ARRAY.")) + "Encountered a multi value column. Window processing does not support MVDs. 
Consider using UNNEST or MV_TO_ARRAY.")) )) .verifyExecutionError(); } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactoryTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactoryTest.java index 802a4c52f172..58affd228e17 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactoryTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactoryTest.java @@ -28,7 +28,7 @@ public class WindowOperatorQueryFrameProcessorFactoryTest public void testEqualsAndHashcode() { EqualsVerifier.forClass(WindowOperatorQueryFrameProcessorFactory.class) - .withNonnullFields("query", "operatorList", "stageRowSignature", "maxRowsMaterializedInWindow", "partitionColumnNames") + .withNonnullFields("query", "operatorList", "stageRowSignature") .usingGetClass() .verify(); } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorTest.java index e5d191bbb5c3..02cb02360d93 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorTest.java @@ -22,8 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import org.apache.druid.frame.allocation.HeapMemoryAllocator; -import org.apache.druid.frame.allocation.SingleMemoryAllocatorFactory; +import org.apache.druid.frame.allocation.ArenaMemoryAllocatorFactory; import org.apache.druid.frame.channel.BlockingQueueFrameChannel; import org.apache.druid.frame.key.KeyColumn; import org.apache.druid.frame.read.FrameReader; @@ -39,6 +38,7 @@ import org.apache.druid.msq.indexing.error.TooManyRowsInAWindowFault; import org.apache.druid.msq.input.ReadableInput; import org.apache.druid.msq.test.LimitedFrameWriterFactory; +import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.query.Druids; import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.TableDataSource; @@ -115,7 +115,7 @@ public void testFrameWriterReachingCapacity() throws IOException final FrameWriterFactory frameWriterFactory = new LimitedFrameWriterFactory( FrameWriters.makeRowBasedFrameWriterFactory( - new SingleMemoryAllocatorFactory(HeapMemoryAllocator.unlimited()), + new ArenaMemoryAllocatorFactory(1 << 20), outputSignature, Collections.emptyList(), false @@ -133,10 +133,7 @@ public void testFrameWriterReachingCapacity() throws IOException new ObjectMapper(), ImmutableList.of( new WindowOperatorFactory(new WindowRowNumberProcessor("w0")) - ), - inputSignature, - 100, - ImmutableList.of("added") + ) ); exec.runFully(processor, null); @@ -160,47 +157,26 @@ public void testFrameWriterReachingCapacity() throws IOException } @Test - public void testBatchingOfPartitionByKeys_singleBatch() throws Exception - { - // With maxRowsMaterialized=100, we will get 1 frame: - // [1, 1, 2, 2, 2, 3, 3] - validateBatching(100, 1); - } - - @Test - public void testBatchingOfPartitionByKeys_multipleBatches_1() throws Exception - 
{ - // With maxRowsMaterialized=5, we will get 2 frames: - // [1, 1, 2, 2, 2] - // [3, 3] - validateBatching(5, 2); - } - - @Test - public void testBatchingOfPartitionByKeys_multipleBatches_2() throws Exception + public void testProcessorRun() throws Exception { - // With maxRowsMaterialized=4, we will get 3 frames: - // [1, 1] - // [2, 2, 2] - // [3, 3] - validateBatching(4, 3); + runProcessor(100, 1); } @Test - public void testBatchingOfPartitionByKeys_TooManyRowsInAWindowFault() + public void testMaxRowsMaterializedConstraint() { final RuntimeException e = Assert.assertThrows( RuntimeException.class, - () -> validateBatching(2, 3) + () -> runProcessor(2, 3) ); MatcherAssert.assertThat( ((MSQException) e.getCause().getCause()).getFault(), CoreMatchers.instanceOf(TooManyRowsInAWindowFault.class) ); - Assert.assertTrue(e.getMessage().contains("TooManyRowsInAWindow: Too many rows in a window (requested = 3, max = 2)")); + Assert.assertTrue(e.getMessage().contains("TooManyRowsInAWindow: Too many rows in a window (requested = 7, max = 2)")); } - public void validateBatching(int maxRowsMaterialized, int numFramesWritten) throws Exception + public void runProcessor(int maxRowsMaterialized, int expectedNumFramesWritten) throws Exception { final ReadableInput factChannel = buildWindowTestInputChannel(); @@ -234,7 +210,9 @@ public void validateBatching(int maxRowsMaterialized, int numFramesWritten) thro .context(new HashMap<>()) .build()), new LegacySegmentSpec(Intervals.ETERNITY), - new HashMap<>(), + new HashMap<>( + ImmutableMap.of(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW, maxRowsMaterialized) + ), outputSignature, ImmutableList.of( new WindowOperatorFactory(new WindowRowNumberProcessor("w0")) @@ -245,7 +223,7 @@ public void validateBatching(int maxRowsMaterialized, int numFramesWritten) thro // Limit output frames to 1 row to ensure we test edge cases final FrameWriterFactory frameWriterFactory = new LimitedFrameWriterFactory( FrameWriters.makeRowBasedFrameWriterFactory( - new SingleMemoryAllocatorFactory(HeapMemoryAllocator.unlimited()), + new ArenaMemoryAllocatorFactory(1 << 20), outputSignature, Collections.emptyList(), false @@ -262,10 +240,7 @@ public void validateBatching(int maxRowsMaterialized, int numFramesWritten) thro new ObjectMapper(), ImmutableList.of( new WindowOperatorFactory(new WindowRowNumberProcessor("w0")) - ), - inputSignature, - maxRowsMaterialized, - ImmutableList.of("added") + ) ); exec.runFully(processor, null); @@ -278,7 +253,7 @@ public void validateBatching(int maxRowsMaterialized, int numFramesWritten) thro final List> rows = rowsFromProcessor.toList(); long actualNumFrames = Arrays.stream(channelCounters.snapshot().getFrames()).findFirst().getAsLong(); - Assert.assertEquals(numFramesWritten, actualNumFrames); + Assert.assertEquals(expectedNumFramesWritten, actualNumFrames); Assert.assertEquals(7, rows.size()); } diff --git a/processing/src/main/java/org/apache/druid/query/operator/AbstractPartitioningOperator.java b/processing/src/main/java/org/apache/druid/query/operator/AbstractPartitioningOperator.java new file mode 100644 index 000000000000..ed45ff5b8084 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/operator/AbstractPartitioningOperator.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.operator; + +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.RE; +import org.apache.druid.query.rowsandcols.RowsAndColumns; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +public abstract class AbstractPartitioningOperator implements Operator +{ + protected final List partitionColumns; + protected final Operator child; + + public AbstractPartitioningOperator( + List partitionColumns, + Operator child + ) + { + this.partitionColumns = partitionColumns; + this.child = child; + } + + @Override + public Closeable goOrContinue(Closeable continuation, Receiver receiver) + { + if (continuation != null) { + Continuation cont = (Continuation) continuation; + + if (cont.iter != null) { + HandleContinuationResult handleContinuationResult = handleContinuation(receiver, cont); + if (!handleContinuationResult.needToContinueProcessing()) { + return handleContinuationResult.getContinuation(); + } + + if (cont.subContinuation == null) { + receiver.completed(); + return null; + } + } + + continuation = cont.subContinuation; + } + + AtomicReference> iterHolder = new AtomicReference<>(); + + final Closeable retVal = child.goOrContinue( + continuation, + createReceiver(receiver, iterHolder) + ); + + if (iterHolder.get() != null || retVal != null) { + return new Continuation( + iterHolder.get(), + retVal + ); + } else { + return null; + } + } + + protected abstract static class AbstractReceiver implements Receiver + { + protected final Receiver delegate; + protected final AtomicReference> iterHolder; + protected final List partitionColumns; + + public AbstractReceiver( + Receiver delegate, + AtomicReference> iterHolder, + List partitionColumns + ) + { + this.delegate = delegate; + this.iterHolder = iterHolder; + this.partitionColumns = partitionColumns; + } + + @Override + public Signal push(RowsAndColumns rac) + { + if (rac == null) { + throw DruidException.defensive("Should never get a null rac here."); + } + + Iterator partitionsIter = getIteratorForRAC(rac); + + Signal keepItGoing = Signal.GO; + while (keepItGoing == Signal.GO && partitionsIter.hasNext()) { + final RowsAndColumns rowsAndColumns = partitionsIter.next(); + keepItGoing = pushPartition(rowsAndColumns, !partitionsIter.hasNext(), keepItGoing); + } + + if (keepItGoing == Signal.PAUSE && partitionsIter.hasNext()) { + iterHolder.set(partitionsIter); + return Signal.PAUSE; + } + + return keepItGoing; + } + + @Override + public void completed() + { + if (iterHolder.get() == null) { + delegate.completed(); + } + } + + protected Signal pushPartition(RowsAndColumns partition, boolean isLastPartition, Signal previousSignal) + { + return delegate.push(partition); + } + + protected abstract Iterator getIteratorForRAC(RowsAndColumns rac); + } + + protected abstract 
HandleContinuationResult handleContinuation(Receiver receiver, Continuation cont); + + protected abstract Receiver createReceiver(Receiver delegate, AtomicReference> iterHolder); + + protected HandleContinuationResult handleNonGoCases(Signal signal, Iterator iter, Receiver receiver, Continuation cont) + { + switch (signal) { + case PAUSE: + if (iter.hasNext()) { + return HandleContinuationResult.of(cont); + } + + if (cont.subContinuation == null) { + // We were finished anyway + receiver.completed(); + return HandleContinuationResult.of(null); + } + + return HandleContinuationResult.of(new Continuation(null, cont.subContinuation)); + + case STOP: + receiver.completed(); + try { + cont.close(); + } + catch (IOException e) { + throw new RE(e, "Unable to close continuation"); + } + return HandleContinuationResult.of(null); + + default: + throw new RE("Unknown signal[%s]", signal); + } + } + + protected static class Continuation implements Closeable + { + Iterator iter; + Closeable subContinuation; + + public Continuation(Iterator iter, Closeable subContinuation) + { + this.iter = iter; + this.subContinuation = subContinuation; + } + + @Override + public void close() throws IOException + { + if (subContinuation != null) { + subContinuation.close(); + } + } + } + + /** + * This helper class helps us distinguish whether we need to continue processing or not. + */ + protected static class HandleContinuationResult + { + private final Closeable continuation; + private final boolean continueProcessing; + + protected static final HandleContinuationResult CONTINUE_PROCESSING = new HandleContinuationResult(null, true); + + private HandleContinuationResult(Closeable continuation, boolean continueProcessing) + { + this.continuation = continuation; + this.continueProcessing = continueProcessing; + } + + protected static HandleContinuationResult of(Closeable closeable) + { + return new HandleContinuationResult(closeable, false); + } + + private boolean needToContinueProcessing() + { + return continueProcessing; + } + + private Closeable getContinuation() + { + return continuation; + } + } +} diff --git a/processing/src/main/java/org/apache/druid/query/operator/AbstractPartitioningOperatorFactory.java b/processing/src/main/java/org/apache/druid/query/operator/AbstractPartitioningOperatorFactory.java new file mode 100644 index 000000000000..fffc1ec50f4d --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/operator/AbstractPartitioningOperatorFactory.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.operator; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +public abstract class AbstractPartitioningOperatorFactory implements OperatorFactory +{ + protected final List partitionColumns; + + @JsonCreator + public AbstractPartitioningOperatorFactory( + @JsonProperty("partitionColumns") List partitionColumns + ) + { + this.partitionColumns = partitionColumns == null ? new ArrayList<>() : partitionColumns; + } + + @JsonProperty("partitionColumns") + public List getPartitionColumns() + { + return partitionColumns; + } + + @Override + public abstract Operator wrap(Operator op); + + @Override + public boolean validateEquivalent(OperatorFactory other) + { + if (other instanceof AbstractPartitioningOperatorFactory) { + return partitionColumns.equals(((AbstractPartitioningOperatorFactory) other).getPartitionColumns()); + } + return false; + } + + @Override + public int hashCode() + { + return Objects.hash(partitionColumns); + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + AbstractPartitioningOperatorFactory other = (AbstractPartitioningOperatorFactory) obj; + return Objects.equals(partitionColumns, other.partitionColumns); + } + + @Override + public String toString() + { + return getClass().getSimpleName() + "{partitionColumns=" + partitionColumns + "}"; + } +} diff --git a/processing/src/main/java/org/apache/druid/query/operator/AbstractSortOperator.java b/processing/src/main/java/org/apache/druid/query/operator/AbstractSortOperator.java new file mode 100644 index 000000000000..8adebbb210a7 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/operator/AbstractSortOperator.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.operator; + +import java.io.Closeable; +import java.util.List; + +/** + * Base class for sort operators. 
+ */ +public abstract class AbstractSortOperator implements Operator +{ + protected final Operator child; + protected final List sortColumns; + + public AbstractSortOperator( + Operator child, + List sortColumns + ) + { + this.child = child; + this.sortColumns = sortColumns; + } + + @Override + public abstract Closeable goOrContinue(Closeable continuation, Receiver receiver); +} diff --git a/processing/src/main/java/org/apache/druid/query/operator/AbstractSortOperatorFactory.java b/processing/src/main/java/org/apache/druid/query/operator/AbstractSortOperatorFactory.java new file mode 100644 index 000000000000..7a9d9b4aab9c --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/operator/AbstractSortOperatorFactory.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.operator; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; +import java.util.Objects; + +public abstract class AbstractSortOperatorFactory implements OperatorFactory +{ + protected final List sortColumns; + + @JsonCreator + public AbstractSortOperatorFactory( + @JsonProperty("columns") List sortColumns + ) + { + this.sortColumns = sortColumns; + } + + @JsonProperty("columns") + public List getSortColumns() + { + return sortColumns; + } + + @Override + public abstract Operator wrap(Operator op); + + @Override + public boolean validateEquivalent(OperatorFactory other) + { + if (other instanceof AbstractSortOperatorFactory) { + return sortColumns.equals(((AbstractSortOperatorFactory) other).getSortColumns()); + } + return false; + } + + @Override + public int hashCode() + { + return Objects.hash(sortColumns); + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + AbstractSortOperatorFactory other = (AbstractSortOperatorFactory) obj; + return Objects.equals(sortColumns, other.sortColumns); + } + + @Override + public String toString() + { + return getClass().getSimpleName() + "{sortColumns=" + sortColumns + "}"; + } +} diff --git a/processing/src/main/java/org/apache/druid/query/operator/GlueingPartitioningOperator.java b/processing/src/main/java/org/apache/druid/query/operator/GlueingPartitioningOperator.java new file mode 100644 index 000000000000..7601ec703b3a --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/operator/GlueingPartitioningOperator.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.operator;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns;
+import org.apache.druid.query.rowsandcols.LimitedRowsAndColumns;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.column.Column;
+import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
+import org.apache.druid.query.rowsandcols.semantic.ClusteredGroupPartitioner;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This glueing partitioning operator continuously receives data and outputs batches of partitioned RACs.
+ * It retains the final partition of the most recently pushed RAC and attempts to glue it onto the next RAC it
+ * receives, so that partitions are handled correctly even when they span multiple RACs.
+ *

+ * Additionally, this assumes that data has been pre-sorted according to the partitioning columns.
+ */
+public class GlueingPartitioningOperator extends AbstractPartitioningOperator
+{
+  private final int maxRowsMaterialized;
+  private final AtomicReference<RowsAndColumns> previousRacRef = new AtomicReference<>(null);
+
+  private static final Integer MAX_ROWS_MATERIALIZED_NO_LIMIT = Integer.MAX_VALUE;
+
+  public GlueingPartitioningOperator(
+      Operator child,
+      List<String> partitionColumns
+  )
+  {
+    this(child, partitionColumns, MAX_ROWS_MATERIALIZED_NO_LIMIT);
+  }
+
+  public GlueingPartitioningOperator(
+      Operator child,
+      List<String> partitionColumns,
+      Integer maxRowsMaterialized
+  )
+  {
+    super(partitionColumns, child);
+    Preconditions.checkNotNull(maxRowsMaterialized, "maxRowsMaterialized cannot be null");
+    this.maxRowsMaterialized = maxRowsMaterialized;
+  }
+
+  @Override
+  protected HandleContinuationResult handleContinuation(Receiver receiver, Continuation cont)
+  {
+    while (cont.iter.hasNext()) {
+      RowsAndColumns next = cont.iter.next();
+
+      if (!cont.iter.hasNext()) {
+        // We are at the last RAC. Process it only if subContinuation is null, otherwise save it in previousRac.
+        if (cont.subContinuation == null) {
+          receiver.push(next);
+          receiver.completed();
+          return HandleContinuationResult.of(null);
+        } else {
+          previousRacRef.set(next);
+          break;
+        }
+      }
+
+      final Signal signal = receiver.push(next);
+      if (signal != Signal.GO) {
+        return handleNonGoCases(signal, cont.iter, receiver, cont);
+      }
+    }
+    return HandleContinuationResult.CONTINUE_PROCESSING;
+  }
+
+  private static class GlueingReceiver extends AbstractReceiver
+  {
+    private final AtomicReference<RowsAndColumns> previousRacRef;
+    private final int maxRowsMaterialized;
+
+    public GlueingReceiver(
+        Receiver delegate,
+        AtomicReference<Iterator<RowsAndColumns>> iterHolder,
+        AtomicReference<RowsAndColumns> previousRacRef,
+        List<String> partitionColumns,
+        int maxRowsMaterialized
+    )
+    {
+      super(delegate, iterHolder, partitionColumns);
+      this.previousRacRef = previousRacRef;
+      this.maxRowsMaterialized = maxRowsMaterialized;
+    }
+
+    @Override
+    public Signal push(RowsAndColumns rac)
+    {
+      if (rac == null) {
+        throw DruidException.defensive("Should never get a null rac here.");
+      }
+      ensureMaxRowsMaterializedConstraint(rac.numRows(), maxRowsMaterialized);
+      return super.push(rac);
+    }
+
+    @Override
+    public void completed()
+    {
+      if (previousRacRef.get() != null) {
+        delegate.push(previousRacRef.get());
+        previousRacRef.set(null);
+      }
+      super.completed();
+    }
+
+    @Override
+    protected Signal pushPartition(RowsAndColumns partition, boolean isLastPartition, Signal previousSignal)
+    {
+      if (isLastPartition) {
+        // If it's the last partition, save it in previousRac instead of pushing to receiver.
+        previousRacRef.set(partition);
+        return previousSignal;
+      } else {
+        return super.pushPartition(partition, isLastPartition, previousSignal);
+      }
+    }
+
+    @Override
+    protected Iterator<RowsAndColumns> getIteratorForRAC(RowsAndColumns rac)
+    {
+      return new GluedRACsIterator(rac, previousRacRef, partitionColumns, maxRowsMaterialized);
+    }
+  }
+
+  /**
+   * Iterator implementation for gluing partitioned RowsAndColumns.
+   * It handles the boundaries of partitions within a single RAC and potentially glues
+   * the first partition of the current RAC with the previous RAC if needed.
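+   * <p>
+   * An illustrative sketch (hypothetical single-column RACs, partitioned on that column): if the previous RAC
+   * ended with the partition [2, 2] and the current RAC is [2, 3, 3], the iterator glues the saved tail onto
+   * the current RAC's first partition:
+   * <pre>
+   *   next() -> [2, 2, 2]   // previousRac + first partition of the current RAC
+   *   next() -> [3, 3]      // later partitions are returned as-is
+   * </pre>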
+   */
+  private static class GluedRACsIterator implements Iterator<RowsAndColumns>
+  {
+    private final RowsAndColumns rac;
+    private final int[] boundaries;
+    private int currentIndex = 0;
+    private final AtomicReference<RowsAndColumns> previousRacRef;
+    private final int maxRowsMaterialized;
+    private final List<String> partitionColumns;
+
+    public GluedRACsIterator(RowsAndColumns rac, AtomicReference<RowsAndColumns> previousRacRef, List<String> partitionColumns, int maxRowsMaterialized)
+    {
+      this.rac = rac;
+      final ClusteredGroupPartitioner groupPartitioner = ClusteredGroupPartitioner.fromRAC(rac);
+      this.boundaries = groupPartitioner.computeBoundaries(partitionColumns);
+      this.previousRacRef = previousRacRef;
+      this.partitionColumns = partitionColumns;
+      this.maxRowsMaterialized = maxRowsMaterialized;
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+      return currentIndex < boundaries.length - 1;
+    }
+
+    /**
+     * Retrieves the next partition in the RowsAndColumns. If the first partition has not been handled yet,
+     * it may be glued with the previous RowsAndColumns if the partition columns match.
+     *
+     * @return The next RowsAndColumns partition, potentially glued with the previous one.
+     * @throws NoSuchElementException if there are no more partitions.
+     */
+    @Override
+    public RowsAndColumns next()
+    {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+
+      int start = boundaries[currentIndex];
+      int end = boundaries[currentIndex + 1];
+
+      if (previousRacRef.get() != null) {
+        if (currentIndex != 0) {
+          throw new ISE("previousRac should be non-null only while handling the first partition boundary.");
+        }
+
+        final RowsAndColumns previousRac = previousRacRef.get();
+        previousRacRef.set(null);
+
+        final LimitedRowsAndColumns limitedRAC = new LimitedRowsAndColumns(rac, start, end);
+
+        final ConcatRowsAndColumns concatRacForFirstPartition = getConcatRacForFirstPartition(previousRac, limitedRAC);
+        if (isGlueingNeeded(concatRacForFirstPartition)) {
+          ensureMaxRowsMaterializedConstraint(concatRacForFirstPartition.numRows(), maxRowsMaterialized);
+          currentIndex++;
+          return concatRacForFirstPartition;
+        } else {
+          return previousRac;
+        }
+      }
+
+      // If previousRac is null, just return the next partitioned RAC.
+      currentIndex++;
+      return new LimitedRowsAndColumns(rac, start, end);
+    }
+
+    /**
+     * Determines whether glueing is needed between 2 RACs represented as a ConcatRowsAndColumns, by comparing a row belonging to each RAC.
+     * We do this by comparing the first and last rows of the Concat RAC, as they would belong to the two respective RACs.
+     * If the columns match, we can glue the 2 RACs and use the ConcatRAC.
+     * @param rac a {@link ConcatRowsAndColumns} containing the 2 RACs
+     * @return true if glueing is needed, false otherwise.
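+     *         For example (hypothetical keys): concat([.., k], [k, ..]) compares equal on every partition
+     *         column, so the two RACs are glued, while concat([.., k], [m, ..]) does not, meaning previousRac
+     *         is already a complete partition on its own.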
+     */
+    private boolean isGlueingNeeded(ConcatRowsAndColumns rac)
+    {
+      for (String column : partitionColumns) {
+        final Column theCol = rac.findColumn(column);
+        if (theCol == null) {
+          throw new ISE("Partition column [%s] not found in RAC.", column);
+        }
+        final ColumnAccessor accessor = theCol.toAccessor();
+        int comparison = accessor.compareRows(0, rac.numRows() - 1);
+        if (comparison != 0) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    private ConcatRowsAndColumns getConcatRacForFirstPartition(RowsAndColumns previousRac, RowsAndColumns firstPartitionOfCurrentRac)
+    {
+      if (previousRac == null) {
+        return new ConcatRowsAndColumns(new ArrayList<>(Collections.singletonList(firstPartitionOfCurrentRac)));
+      }
+      return new ConcatRowsAndColumns(new ArrayList<>(Arrays.asList(previousRac, firstPartitionOfCurrentRac)));
+    }
+  }
+
+  private static void ensureMaxRowsMaterializedConstraint(int numRows, int maxRowsMaterialized)
+  {
+    if (numRows > maxRowsMaterialized) {
+      throw InvalidInput.exception(
+          "Too many rows to process (requested = %d, max = %d).",
+          numRows,
+          maxRowsMaterialized
+      );
+    }
+  }
+
+  @Override
+  protected Receiver createReceiver(Receiver delegate, AtomicReference<Iterator<RowsAndColumns>> iterHolder)
+  {
+    return new GlueingReceiver(delegate, iterHolder, previousRacRef, partitionColumns, maxRowsMaterialized);
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/operator/GlueingPartitioningOperatorFactory.java b/processing/src/main/java/org/apache/druid/query/operator/GlueingPartitioningOperatorFactory.java
new file mode 100644
index 000000000000..31a2686b3a93
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/operator/GlueingPartitioningOperatorFactory.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.operator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+public class GlueingPartitioningOperatorFactory extends AbstractPartitioningOperatorFactory
+{
+  private final Integer maxRowsMaterialized;
+
+  @JsonCreator
+  public GlueingPartitioningOperatorFactory(
+      @JsonProperty("partitionColumns") List<String> partitionColumns,
+      @JsonProperty("maxRowsMaterialized") Integer maxRowsMaterialized
+  )
+  {
+    super(partitionColumns);
+    this.maxRowsMaterialized = maxRowsMaterialized;
+  }
+
+  @JsonProperty("maxRowsMaterialized")
+  public Integer getMaxRowsMaterialized()
+  {
+    return maxRowsMaterialized;
+  }
+
+  @Override
+  public Operator wrap(Operator op)
+  {
+    return new GlueingPartitioningOperator(op, partitionColumns, maxRowsMaterialized);
+  }
+
+  @Override
+  public boolean validateEquivalent(OperatorFactory other)
+  {
+    if (!super.validateEquivalent(other)) {
+      return false;
+    }
+
+    if (!(other instanceof GlueingPartitioningOperatorFactory)) {
+      return false;
+    }
+
+    return Objects.equals(maxRowsMaterialized, ((GlueingPartitioningOperatorFactory) other).getMaxRowsMaterialized());
+  }
+
+  @Override
+  public String toString()
+  {
+    return "GlueingPartitioningOperatorFactory{" +
+           "partitionColumns=" + partitionColumns +
+           ", maxRowsMaterialized=" + maxRowsMaterialized +
+           '}';
+  }
+
+  @Override
+  public final int hashCode()
+  {
+    return Objects.hash(partitionColumns, maxRowsMaterialized);
+  }
+
+  @Override
+  public final boolean equals(Object obj)
+  {
+    return super.equals(obj) &&
+           Objects.equals(maxRowsMaterialized, ((GlueingPartitioningOperatorFactory) obj).getMaxRowsMaterialized());
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/operator/NaivePartitioningOperator.java b/processing/src/main/java/org/apache/druid/query/operator/NaivePartitioningOperator.java
index 0c68c3eea276..cde6c8674b73 100644
--- a/processing/src/main/java/org/apache/druid/query/operator/NaivePartitioningOperator.java
+++ b/processing/src/main/java/org/apache/druid/query/operator/NaivePartitioningOperator.java
@@ -19,14 +19,9 @@
 package org.apache.druid.query.operator;
 
-import org.apache.druid.error.DruidException;
-import org.apache.druid.java.util.common.RE;
 import org.apache.druid.query.rowsandcols.RowsAndColumns;
 import org.apache.druid.query.rowsandcols.semantic.ClusteredGroupPartitioner;
-import org.apache.druid.query.rowsandcols.semantic.DefaultClusteredGroupPartitioner;
 
-import java.io.Closeable;
-import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
@@ -40,137 +35,50 @@
 * Additionally, this assumes that data has been pre-sorted according to the partitioning columns. If it is
 * given data that has not been pre-sorted, an exception is expected to be thrown.
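+ * <p>
+ * For example (hypothetical values, mirroring NaivePartitioningOperatorTest below), a pre-sorted partition
+ * column holding [0, 0, 1, 1, 2] is pushed downstream as three separate RACs: [0, 0], [1, 1] and [2].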
 */
-public class NaivePartitioningOperator implements Operator
+public class NaivePartitioningOperator extends AbstractPartitioningOperator
 {
-  private final List<String> partitionColumns;
-  private final Operator child;
-
   public NaivePartitioningOperator(
       List<String> partitionColumns,
       Operator child
   )
   {
-    this.partitionColumns = partitionColumns;
-    this.child = child;
+    super(partitionColumns, child);
   }
 
   @Override
-  public Closeable goOrContinue(Closeable continuation, Receiver receiver)
+  protected HandleContinuationResult handleContinuation(Receiver receiver, Continuation cont)
   {
-    if (continuation != null) {
-      Continuation cont = (Continuation) continuation;
-
-      if (cont.iter != null) {
-        while (cont.iter.hasNext()) {
-          final Signal signal = receiver.push(cont.iter.next());
-          switch (signal) {
-            case GO:
-              break;
-            case PAUSE:
-              if (cont.iter.hasNext()) {
-                return cont;
-              }
-
-              if (cont.subContinuation == null) {
-                // We were finished anyway
-                receiver.completed();
-                return null;
-              }
-
-              return new Continuation(null, cont.subContinuation);
-            case STOP:
-              receiver.completed();
-              try {
-                cont.close();
-              }
-              catch (IOException e) {
-                throw new RE(e, "Unable to close continutation");
-              }
-              return null;
-            default:
-              throw new RE("Unknown signal[%s]", signal);
-          }
-        }
-
-        if (cont.subContinuation == null) {
-          receiver.completed();
-          return null;
-        }
+    while (cont.iter.hasNext()) {
+      final Signal signal = receiver.push(cont.iter.next());
+      if (signal != Signal.GO) {
+        return handleNonGoCases(signal, cont.iter, receiver, cont);
       }
-
-      continuation = cont.subContinuation;
-    }
-
-    AtomicReference<Iterator<RowsAndColumns>> iterHolder = new AtomicReference<>();
-
-    final Closeable retVal = child.goOrContinue(
-        continuation,
-        new Receiver()
-        {
-          @Override
-          public Signal push(RowsAndColumns rac)
-          {
-            if (rac == null) {
-              throw DruidException.defensive("Should never get a null rac here.");
-            }
-            ClusteredGroupPartitioner groupPartitioner = rac.as(ClusteredGroupPartitioner.class);
-            if (groupPartitioner == null) {
-              groupPartitioner = new DefaultClusteredGroupPartitioner(rac);
-            }
-
-            Iterator<RowsAndColumns> partitionsIter =
-                groupPartitioner.partitionOnBoundaries(partitionColumns).iterator();
-
-            Signal keepItGoing = Signal.GO;
-            while (keepItGoing == Signal.GO && partitionsIter.hasNext()) {
-              keepItGoing = receiver.push(partitionsIter.next());
-            }
-
-            if (keepItGoing == Signal.PAUSE && partitionsIter.hasNext()) {
-              iterHolder.set(partitionsIter);
-              return Signal.PAUSE;
-            }
-
-            return keepItGoing;
-          }
-
-          @Override
-          public void completed()
-          {
-            if (iterHolder.get() == null) {
-              receiver.completed();
-            }
-          }
-        }
-    );
-
-    if (iterHolder.get() != null || retVal != null) {
-      return new Continuation(
-          iterHolder.get(),
-          retVal
-      );
-    } else {
-      return null;
     }
+    return HandleContinuationResult.CONTINUE_PROCESSING;
   }
 
-  private static class Continuation implements Closeable
+  private static class NaiveReceiver extends AbstractReceiver
   {
-    Iterator<RowsAndColumns> iter;
-    Closeable subContinuation;
-
-    public Continuation(Iterator<RowsAndColumns> iter, Closeable subContinuation)
+    public NaiveReceiver(
+        Receiver delegate,
+        AtomicReference<Iterator<RowsAndColumns>> iterHolder,
+        List<String> partitionColumns
+    )
     {
-      this.iter = iter;
-      this.subContinuation = subContinuation;
+      super(delegate, iterHolder, partitionColumns);
     }
 
     @Override
-    public void close() throws IOException
+    protected Iterator<RowsAndColumns> getIteratorForRAC(RowsAndColumns rac)
     {
-      if (subContinuation != null) {
-        subContinuation.close();
-      }
+      final ClusteredGroupPartitioner groupPartitioner = ClusteredGroupPartitioner.fromRAC(rac);
+      return groupPartitioner.partitionOnBoundaries(partitionColumns).iterator();
     }
   }
+
+  @Override
+  protected Receiver createReceiver(Receiver delegate, AtomicReference<Iterator<RowsAndColumns>> iterHolder)
+  {
+    return new NaiveReceiver(delegate, iterHolder, partitionColumns);
+  }
 }
diff --git a/processing/src/main/java/org/apache/druid/query/operator/NaivePartitioningOperatorFactory.java b/processing/src/main/java/org/apache/druid/query/operator/NaivePartitioningOperatorFactory.java
index c836007e77ef..2b243bdadc9d 100644
--- a/processing/src/main/java/org/apache/druid/query/operator/NaivePartitioningOperatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/operator/NaivePartitioningOperatorFactory.java
@@ -22,26 +22,16 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-import java.util.ArrayList;
 import java.util.List;
-import java.util.Objects;
 
-public class NaivePartitioningOperatorFactory implements OperatorFactory
+public class NaivePartitioningOperatorFactory extends AbstractPartitioningOperatorFactory
 {
-  private final List<String> partitionColumns;
-
   @JsonCreator
   public NaivePartitioningOperatorFactory(
       @JsonProperty("partitionColumns") List<String> partitionColumns
   )
   {
-    this.partitionColumns = partitionColumns == null ? new ArrayList<>() : partitionColumns;
-  }
-
-  @JsonProperty("partitionColumns")
-  public List<String> getPartitionColumns()
-  {
-    return partitionColumns;
+    super(partitionColumns);
   }
 
   @Override
@@ -49,40 +39,4 @@ public Operator wrap(Operator op)
   {
     return new NaivePartitioningOperator(partitionColumns, op);
   }
-
-  @Override
-  public boolean validateEquivalent(OperatorFactory other)
-  {
-    if (other instanceof NaivePartitioningOperatorFactory) {
-      return partitionColumns.equals(((NaivePartitioningOperatorFactory) other).getPartitionColumns());
-    }
-    return false;
-  }
-
-  @Override
-  public String toString()
-  {
-    return "NaivePartitioningOperatorFactory{" +
-           "partitionColumns=" + partitionColumns +
-           '}';
-  }
-
-  @Override
-  public final int hashCode()
-  {
-    return Objects.hash(partitionColumns);
-  }
-
-  @Override
-  public final boolean equals(Object obj)
-  {
-    if (this == obj) {
-      return true;
-    }
-    if (obj == null || obj.getClass() != getClass()) {
-      return false;
-    }
-    NaivePartitioningOperatorFactory other = (NaivePartitioningOperatorFactory) obj;
-    return Objects.equals(partitionColumns, other.partitionColumns);
-  }
 }
diff --git a/processing/src/main/java/org/apache/druid/query/operator/NaiveSortOperator.java b/processing/src/main/java/org/apache/druid/query/operator/NaiveSortOperator.java
index 486d20482824..0d3e5de78767 100644
--- a/processing/src/main/java/org/apache/druid/query/operator/NaiveSortOperator.java
+++ b/processing/src/main/java/org/apache/druid/query/operator/NaiveSortOperator.java
@@ -31,18 +31,14 @@
 * that it has to accumulate all of the data of its child operator first before it can sort. This limitation
 * means that hopefully this operator is only planned in a very small number of circumstances.
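+ * <p>
+ * A small sketch of that behaviour (values hypothetical): pushes of [3, 1] and [2] are buffered by the sorter,
+ * and a single sorted RAC [1, 2, 3] is emitted only once the child operator signals completion.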
*/ -public class NaiveSortOperator implements Operator +public class NaiveSortOperator extends AbstractSortOperator { - private final Operator child; - private final List sortColumns; - public NaiveSortOperator( Operator child, List sortColumns ) { - this.child = child; - this.sortColumns = sortColumns; + super(child, sortColumns); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/operator/NaiveSortOperatorFactory.java b/processing/src/main/java/org/apache/druid/query/operator/NaiveSortOperatorFactory.java index 623d0ed0fe5b..c281123a8f1d 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/NaiveSortOperatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/operator/NaiveSortOperatorFactory.java @@ -23,24 +23,15 @@ import com.fasterxml.jackson.annotation.JsonProperty; import java.util.List; -import java.util.Objects; -public class NaiveSortOperatorFactory implements OperatorFactory +public class NaiveSortOperatorFactory extends AbstractSortOperatorFactory { - private final List sortColumns; - @JsonCreator public NaiveSortOperatorFactory( @JsonProperty("columns") List sortColumns ) { - this.sortColumns = sortColumns; - } - - @JsonProperty("columns") - public List getSortColumns() - { - return sortColumns; + super(sortColumns); } @Override @@ -48,38 +39,4 @@ public Operator wrap(Operator op) { return new NaiveSortOperator(op, sortColumns); } - - @Override - public boolean validateEquivalent(OperatorFactory other) - { - if (other instanceof NaiveSortOperatorFactory) { - return sortColumns.equals(((NaiveSortOperatorFactory) other).getSortColumns()); - } - return false; - } - - @Override - public int hashCode() - { - return Objects.hash(sortColumns); - } - - @Override - public boolean equals(Object obj) - { - if (this == obj) { - return true; - } - if (obj == null || getClass() != obj.getClass()) { - return false; - } - NaiveSortOperatorFactory other = (NaiveSortOperatorFactory) obj; - return Objects.equals(sortColumns, other.sortColumns); - } - - @Override - public String toString() - { - return "NaiveSortOperatorFactory{sortColumns=" + sortColumns + "}"; - } } diff --git a/processing/src/main/java/org/apache/druid/query/operator/Operator.java b/processing/src/main/java/org/apache/druid/query/operator/Operator.java index 57bc1013fc44..6609f2455420 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/Operator.java +++ b/processing/src/main/java/org/apache/druid/query/operator/Operator.java @@ -68,7 +68,7 @@ static void go(Operator op, Receiver receiver) * to indicate its degree of readiness for more data to be received. *

* If a Receiver returns a {@link Signal#PAUSE} signal, then if there is processing left to do, then it is expected - * that a non-null "continuation" object nwill be returned. This allows for flow control to be returned to the + * that a non-null "continuation" object will be returned. This allows for flow control to be returned to the * caller to, e.g., process another Operator or just exert backpressure. In this case, when the controller wants to * resume, it must call this method again and include the continuation object that it received. *

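+ * <p>
+ * A minimal caller-side driver loop, sketched here purely for illustration:
+ * <pre>
+ *   Closeable continuation = null;
+ *   do {
+ *     continuation = op.goOrContinue(continuation, receiver);
+ *   } while (continuation != null);
+ * </pre>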
@@ -99,7 +99,7 @@ static void go(Operator op, Receiver receiver) * if there is any state that an Operator requires to be able to resume its processing, then it is expected that the * Operator will cast the object back to an instance of the type that it had originally returned. * - * @param receiver a receiver that will receiver data + * @param receiver a receiver that will receive data * @return null if processing is complete, non-null if the Receiver returned a {@link Signal#PAUSE} signal */ @Nullable diff --git a/processing/src/main/java/org/apache/druid/query/operator/OperatorFactory.java b/processing/src/main/java/org/apache/druid/query/operator/OperatorFactory.java index a97c332505cf..95410b98dd5e 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/OperatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/operator/OperatorFactory.java @@ -31,7 +31,9 @@ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonSubTypes(value = { @JsonSubTypes.Type(name = "naivePartition", value = NaivePartitioningOperatorFactory.class), + @JsonSubTypes.Type(name = "glueingPartition", value = GlueingPartitioningOperatorFactory.class), @JsonSubTypes.Type(name = "naiveSort", value = NaiveSortOperatorFactory.class), + @JsonSubTypes.Type(name = "partitionSort", value = PartitionSortOperatorFactory.class), @JsonSubTypes.Type(name = "window", value = WindowOperatorFactory.class), @JsonSubTypes.Type(name = "scan", value = ScanOperatorFactory.class), }) diff --git a/processing/src/main/java/org/apache/druid/query/operator/PartitionSortOperator.java b/processing/src/main/java/org/apache/druid/query/operator/PartitionSortOperator.java new file mode 100644 index 000000000000..68d10fd77557 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/operator/PartitionSortOperator.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.operator; + +import org.apache.druid.query.rowsandcols.RowsAndColumns; +import org.apache.druid.query.rowsandcols.semantic.NaiveSortMaker; + +import java.io.Closeable; +import java.util.ArrayList; +import java.util.List; + +/** + * This operator sorts rows inside partitioned RACs, on the sort columns. + * This operator expects to receive a "complete" partition of data. Each input RAC is expected to be a separate partition. 
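+ * <p>
+ * For example (mirroring PartitionSortOperatorTest below), sorting a single-partition RAC on "unsorted" ASC:
+ * <pre>
+ *   input : sorted=[1, 1, 1, 2, 2, 1], unsorted=[10, 10, 10, 20, 20, 11]
+ *   output: sorted=[1, 1, 1, 1, 2, 2], unsorted=[10, 10, 10, 11, 20, 20]
+ * </pre>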
+ */ +public class PartitionSortOperator extends AbstractSortOperator +{ + public PartitionSortOperator( + Operator child, + List sortColumns + ) + { + super(child, sortColumns); + } + + @Override + public Closeable goOrContinue(Closeable continuation, Receiver receiver) + { + return child.goOrContinue( + continuation, + new Receiver() + { + @Override + public Signal push(RowsAndColumns rac) + { + NaiveSortMaker.NaiveSorter sorter = NaiveSortMaker.fromRAC(rac).make(new ArrayList<>(sortColumns)); + receiver.push(sorter.complete()); + return Signal.GO; + } + + @Override + public void completed() + { + receiver.completed(); + } + } + ); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/operator/PartitionSortOperatorFactory.java b/processing/src/main/java/org/apache/druid/query/operator/PartitionSortOperatorFactory.java new file mode 100644 index 000000000000..39f7a9afe5b8 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/operator/PartitionSortOperatorFactory.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.operator; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; + +public class PartitionSortOperatorFactory extends AbstractSortOperatorFactory +{ + @JsonCreator + public PartitionSortOperatorFactory( + @JsonProperty("columns") List sortColumns + ) + { + super(sortColumns); + } + + @Override + public Operator wrap(Operator op) + { + return new PartitionSortOperator(op, sortColumns); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java index bb35f6837976..a45c999b3c8c 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java @@ -77,6 +77,7 @@ public class LazilyDecoratedRowsAndColumns implements RowsAndColumns private OffsetLimit limit; private LinkedHashSet viewableColumns; private List ordering; + private final Integer allocatorCapacity; public LazilyDecoratedRowsAndColumns( RowsAndColumns base, @@ -85,7 +86,8 @@ public LazilyDecoratedRowsAndColumns( VirtualColumns virtualColumns, OffsetLimit limit, List ordering, - LinkedHashSet viewableColumns + LinkedHashSet viewableColumns, + Long allocatorCapacity ) { this.base = base; @@ -95,6 +97,7 @@ public LazilyDecoratedRowsAndColumns( this.limit = limit; this.ordering = ordering; this.viewableColumns = viewableColumns; + this.allocatorCapacity = allocatorCapacity != null ? 
allocatorCapacity.intValue() : 200 << 20; } @Override @@ -268,7 +271,7 @@ private Pair materializeCursorFactory(CursorFactory cursor } final FrameWriterFactory frameWriterFactory = FrameWriters.makeColumnBasedFrameWriterFactory( - new ArenaMemoryAllocatorFactory(200 << 20), // 200 MB, because, why not? + new ArenaMemoryAllocatorFactory(allocatorCapacity), signature, sortColumns ); @@ -367,8 +370,7 @@ private Pair naiveMaterialize(RowsAndColumns rac) // is being left as an exercise for the future. final RowSignature.Builder sigBob = RowSignature.builder(); - final ArenaMemoryAllocatorFactory memFactory = new ArenaMemoryAllocatorFactory(200 << 20); - + final ArenaMemoryAllocatorFactory memFactory = new ArenaMemoryAllocatorFactory(allocatorCapacity); for (String column : columnsToGenerate) { final Column racColumn = rac.findColumn(column); diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java index a5c2528dd1bd..e64f086edd7f 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java @@ -20,7 +20,6 @@ package org.apache.druid.query.rowsandcols; import org.apache.druid.java.util.common.IAE; -import org.apache.druid.java.util.common.UOE; import org.apache.druid.query.rowsandcols.column.Column; import org.apache.druid.query.rowsandcols.column.ColumnAccessor; import org.apache.druid.query.rowsandcols.column.ColumnAccessorBasedColumn; @@ -29,7 +28,6 @@ import javax.annotation.Nullable; import java.util.Collection; import java.util.LinkedHashMap; -import java.util.List; import java.util.Map; /** @@ -130,16 +128,7 @@ public boolean isNull(int rowNum) @Override public Object getObject(int rowNum) { - Object value = accessor.getObject(pointers[start + rowNum]); - if (ColumnType.STRING.equals(getType()) && value instanceof List) { - // special handling to reject MVDs - throw new UOE( - "Encountered a multi value column [%s]. Window processing does not support MVDs. " - + "Consider using UNNEST or MV_TO_ARRAY.", - name - ); - } - return value; + return accessor.getObject(pointers[start + rowNum]); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultRowsAndColumnsDecorator.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultRowsAndColumnsDecorator.java index 3cfcfeec6142..9710c5324512 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultRowsAndColumnsDecorator.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultRowsAndColumnsDecorator.java @@ -133,7 +133,8 @@ public RowsAndColumns restrictColumns(List columns) virtualColumns, offsetLimit, ordering, - columns == null ? null : new LinkedHashSet<>(columns) + columns == null ? null : new LinkedHashSet<>(columns), + null ); } diff --git a/processing/src/test/java/org/apache/druid/query/operator/GlueingPartitioningOperatorFactoryTest.java b/processing/src/test/java/org/apache/druid/query/operator/GlueingPartitioningOperatorFactoryTest.java new file mode 100644 index 000000000000..cb5b0404f8be --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/operator/GlueingPartitioningOperatorFactoryTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.operator; + +import nl.jqno.equalsverifier.EqualsVerifier; +import org.junit.Test; + +public class GlueingPartitioningOperatorFactoryTest +{ + @Test + public void testEquals() + { + EqualsVerifier.forClass(GlueingPartitioningOperatorFactory.class) + .usingGetClass() + .verify(); + } +} diff --git a/processing/src/test/java/org/apache/druid/query/operator/GlueingPartitioningOperatorTest.java b/processing/src/test/java/org/apache/druid/query/operator/GlueingPartitioningOperatorTest.java new file mode 100644 index 000000000000..6f0d70015884 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/operator/GlueingPartitioningOperatorTest.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.operator; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.error.DruidException; +import org.apache.druid.query.operator.window.RowsAndColumnsHelper; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; + +public class GlueingPartitioningOperatorTest +{ + @Test + public void testPartitioning() + { + InlineScanOperator inlineScanOperator = InlineScanOperator.make( + RowsAndColumnsHelper.makeSingleColumnRac(1, 1, 1, 2, 2, 1) + ); + + GlueingPartitioningOperator op = new GlueingPartitioningOperator( + inlineScanOperator, + ImmutableList.of("column") + ); + + new OperatorTestHelper() + .expectRowsAndColumns( + RowsAndColumnsHelper.expectedSingleColumnRac(1, 1, 1), + RowsAndColumnsHelper.expectedSingleColumnRac(2, 2), + RowsAndColumnsHelper.expectedSingleColumnRac(1) + ) + .runToCompletion(op); + } + + @Test + public void testPartitioningWithMultipleRACs() + { + InlineScanOperator inlineScanOperator = InlineScanOperator.make( + RowsAndColumnsHelper.makeSingleColumnRac(1, 1, 1, 2, 2, 1), + RowsAndColumnsHelper.makeSingleColumnRac(1, 1, 1, 2, 2, 1), + RowsAndColumnsHelper.makeSingleColumnRac(1, 1, 2, 2, 1) + ); + + GlueingPartitioningOperator op = new GlueingPartitioningOperator( + inlineScanOperator, + ImmutableList.of("column") + ); + + new OperatorTestHelper() + .expectRowsAndColumns( + RowsAndColumnsHelper.expectedSingleColumnRac(1, 1, 1), + RowsAndColumnsHelper.expectedSingleColumnRac(2, 2), + RowsAndColumnsHelper.expectedSingleColumnRac(1, 1, 1, 1), + RowsAndColumnsHelper.expectedSingleColumnRac(2, 2), + RowsAndColumnsHelper.expectedSingleColumnRac(1, 1, 1), + RowsAndColumnsHelper.expectedSingleColumnRac(2, 2), + RowsAndColumnsHelper.expectedSingleColumnRac(1) + ) + .runToCompletion(op); + } + + @Test + public void testPartitioningWithMultipleConcatenationBetweenRACs() + { + InlineScanOperator inlineScanOperator = InlineScanOperator.make( + RowsAndColumnsHelper.makeSingleColumnRac(1, 1), + RowsAndColumnsHelper.makeSingleColumnRac(1, 1), + RowsAndColumnsHelper.makeSingleColumnRac(1, 2) + ); + + GlueingPartitioningOperator op = new GlueingPartitioningOperator( + inlineScanOperator, + ImmutableList.of("column") + ); + + new OperatorTestHelper() + .expectRowsAndColumns( + RowsAndColumnsHelper.expectedSingleColumnRac(1, 1, 1, 1, 1), + RowsAndColumnsHelper.expectedSingleColumnRac(2) + ) + .runToCompletion(op); + } + + @Test + public void testPartitioningWithNoGlueing() + { + InlineScanOperator inlineScanOperator = InlineScanOperator.make( + RowsAndColumnsHelper.makeSingleColumnRac(1, 2, 3), + RowsAndColumnsHelper.makeSingleColumnRac(4, 5, 6), + RowsAndColumnsHelper.makeSingleColumnRac(7, 8, 9) + ); + + GlueingPartitioningOperator op = new GlueingPartitioningOperator( + inlineScanOperator, + ImmutableList.of("column") + ); + + new OperatorTestHelper() + .expectRowsAndColumns( + RowsAndColumnsHelper.expectedSingleColumnRac(1), + RowsAndColumnsHelper.expectedSingleColumnRac(2), + RowsAndColumnsHelper.expectedSingleColumnRac(3), + RowsAndColumnsHelper.expectedSingleColumnRac(4), + RowsAndColumnsHelper.expectedSingleColumnRac(5), + RowsAndColumnsHelper.expectedSingleColumnRac(6), + RowsAndColumnsHelper.expectedSingleColumnRac(7), + RowsAndColumnsHelper.expectedSingleColumnRac(8), + RowsAndColumnsHelper.expectedSingleColumnRac(9) + ) + .runToCompletion(op); + } + + @Test + public void testPartitioningWithNoPartitionColumns() + { + InlineScanOperator inlineScanOperator = InlineScanOperator.make( + 
RowsAndColumnsHelper.makeSingleColumnRac(1, 1, 1, 2, 2, 1), + RowsAndColumnsHelper.makeSingleColumnRac(1, 1, 1, 2, 2, 1) + ); + + GlueingPartitioningOperator op = new GlueingPartitioningOperator( + inlineScanOperator, + Collections.emptyList() + ); + + new OperatorTestHelper() + .expectRowsAndColumns( + RowsAndColumnsHelper.expectedSingleColumnRac(1, 1, 1, 2, 2, 1, 1, 1, 1, 2, 2, 1) + ) + .runToCompletion(op); + } + + @Test + public void testMaxRowsConstraintViolation() + { + InlineScanOperator inlineScanOperator = InlineScanOperator.make( + RowsAndColumnsHelper.makeSingleColumnRac(1, 1, 1) + ); + + GlueingPartitioningOperator op = new GlueingPartitioningOperator( + inlineScanOperator, + ImmutableList.of("column"), + 2 + ); + + Assert.assertThrows( + "Too many rows to process (requested = 3, max = 2).", + DruidException.class, + () -> new OperatorTestHelper().expectRowsAndColumns().runToCompletion(op) + ); + } + + @Test + public void testMaxRowsConstraintViolationWhenGlueing() + { + InlineScanOperator inlineScanOperator = InlineScanOperator.make( + RowsAndColumnsHelper.makeSingleColumnRac(1, 1, 1), + RowsAndColumnsHelper.makeSingleColumnRac(1, 2, 3) + ); + + GlueingPartitioningOperator op = new GlueingPartitioningOperator( + inlineScanOperator, + ImmutableList.of("column"), + 3 + ); + + Assert.assertThrows( + "Too many rows to process (requested = 4, max = 3).", + DruidException.class, + () -> new OperatorTestHelper().expectRowsAndColumns().runToCompletion(op) + ); + } + + @Test + public void testMaxRowsConstraintWhenGlueing() + { + InlineScanOperator inlineScanOperator = InlineScanOperator.make( + RowsAndColumnsHelper.makeSingleColumnRac(1, 1, 1), + RowsAndColumnsHelper.makeSingleColumnRac(2, 2, 2) + ); + + GlueingPartitioningOperator op = new GlueingPartitioningOperator( + inlineScanOperator, + ImmutableList.of("column"), + 3 + ); + + new OperatorTestHelper() + .expectRowsAndColumns( + RowsAndColumnsHelper.expectedSingleColumnRac(1, 1, 1), + RowsAndColumnsHelper.expectedSingleColumnRac(2, 2, 2) + ) + .runToCompletion(op); + } + + @Test + public void testStopMidStream() + { + InlineScanOperator inlineScanOperator = InlineScanOperator.make( + RowsAndColumnsHelper.makeSingleColumnRac(1, 1, 1, 2, 2, 1) + ); + + GlueingPartitioningOperator op = new GlueingPartitioningOperator( + inlineScanOperator, + ImmutableList.of("column") + ); + + new OperatorTestHelper() + .expectAndStopAfter( + RowsAndColumnsHelper.expectedSingleColumnRac(1, 1, 1), + RowsAndColumnsHelper.expectedSingleColumnRac(2, 2) + ) + .runToCompletion(op); + } +} diff --git a/processing/src/test/java/org/apache/druid/query/operator/NaivePartitioningOperatorTest.java b/processing/src/test/java/org/apache/druid/query/operator/NaivePartitioningOperatorTest.java index 07ab0c12aa0a..e6e340a25306 100644 --- a/processing/src/test/java/org/apache/druid/query/operator/NaivePartitioningOperatorTest.java +++ b/processing/src/test/java/org/apache/druid/query/operator/NaivePartitioningOperatorTest.java @@ -32,38 +32,46 @@ public class NaivePartitioningOperatorTest { @Test - public void testDefaultImplementation() + public void testPartitioning() { - RowsAndColumns rac = MapOfColumnsRowsAndColumns.fromMap( - ImmutableMap.of( - "sorted", new IntArrayColumn(new int[]{0, 0, 0, 1, 1, 2, 4, 4, 4}), - "unsorted", new IntArrayColumn(new int[]{3, 54, 21, 1, 5, 54, 2, 3, 92}) + InlineScanOperator inlineScanOperator = InlineScanOperator.make( + RowsAndColumnsHelper.makeSingleColumnRac(0, 0, 0, 1, 1, 2, 4, 4, 4) + ); + + NaivePartitioningOperator op = new 
NaivePartitioningOperator( + ImmutableList.of("column"), + inlineScanOperator + ); + + new OperatorTestHelper() + .expectRowsAndColumns( + RowsAndColumnsHelper.expectedSingleColumnRac(0, 0, 0), + RowsAndColumnsHelper.expectedSingleColumnRac(1, 1), + RowsAndColumnsHelper.expectedSingleColumnRac(2), + RowsAndColumnsHelper.expectedSingleColumnRac(4, 4, 4) ) + .runToCompletion(op); + } + + @Test + public void testPartitioningWithMultipleRACs() + { + InlineScanOperator inlineScanOperator = InlineScanOperator.make( + RowsAndColumnsHelper.makeSingleColumnRac(0, 0, 0, 1, 1), + RowsAndColumnsHelper.makeSingleColumnRac(1, 2, 2, 2) ); NaivePartitioningOperator op = new NaivePartitioningOperator( - ImmutableList.of("sorted"), - InlineScanOperator.make(rac) + ImmutableList.of("column"), + inlineScanOperator ); new OperatorTestHelper() .expectRowsAndColumns( - new RowsAndColumnsHelper() - .expectColumn("sorted", new int[]{0, 0, 0}) - .expectColumn("unsorted", new int[]{3, 54, 21}) - .allColumnsRegistered(), - new RowsAndColumnsHelper() - .expectColumn("sorted", new int[]{1, 1}) - .expectColumn("unsorted", new int[]{1, 5}) - .allColumnsRegistered(), - new RowsAndColumnsHelper() - .expectColumn("sorted", new int[]{2}) - .expectColumn("unsorted", new int[]{54}) - .allColumnsRegistered(), - new RowsAndColumnsHelper() - .expectColumn("sorted", new int[]{4, 4, 4}) - .expectColumn("unsorted", new int[]{2, 3, 92}) - .allColumnsRegistered() + RowsAndColumnsHelper.expectedSingleColumnRac(0, 0, 0), + RowsAndColumnsHelper.expectedSingleColumnRac(1, 1), + RowsAndColumnsHelper.expectedSingleColumnRac(1), + RowsAndColumnsHelper.expectedSingleColumnRac(2, 2, 2) ) .runToCompletion(op); } @@ -71,26 +79,19 @@ public void testDefaultImplementation() @Test public void testStopMidStream() { - RowsAndColumns rac = MapOfColumnsRowsAndColumns.fromMap( - ImmutableMap.of( - "sorted", new IntArrayColumn(new int[]{0, 0, 0, 1, 1, 2, 4, 4, 4}), - "unsorted", new IntArrayColumn(new int[]{3, 54, 21, 1, 5, 54, 2, 3, 92}) - ) + InlineScanOperator inlineScanOperator = InlineScanOperator.make( + RowsAndColumnsHelper.makeSingleColumnRac(0, 0, 0, 1, 1, 2, 4, 4, 4) ); NaivePartitioningOperator op = new NaivePartitioningOperator( - ImmutableList.of("sorted"), - InlineScanOperator.make(rac) + ImmutableList.of("column"), + inlineScanOperator ); new OperatorTestHelper() .expectAndStopAfter( - new RowsAndColumnsHelper() - .expectColumn("sorted", new int[]{0, 0, 0}) - .expectColumn("unsorted", new int[]{3, 54, 21}), - new RowsAndColumnsHelper() - .expectColumn("sorted", new int[]{1, 1}) - .expectColumn("unsorted", new int[]{1, 5}) + RowsAndColumnsHelper.expectedSingleColumnRac(0, 0, 0), + RowsAndColumnsHelper.expectedSingleColumnRac(1, 1) ) .runToCompletion(op); } diff --git a/processing/src/test/java/org/apache/druid/query/operator/PartitionSortOperatorFactoryTest.java b/processing/src/test/java/org/apache/druid/query/operator/PartitionSortOperatorFactoryTest.java new file mode 100644 index 000000000000..43401c505ff5 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/operator/PartitionSortOperatorFactoryTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.operator;
+
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.junit.Test;
+
+public class PartitionSortOperatorFactoryTest
+{
+  @Test
+  public void testEquals()
+  {
+    EqualsVerifier.forClass(PartitionSortOperatorFactory.class)
+                  .usingGetClass()
+                  .verify();
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/query/operator/PartitionSortOperatorTest.java b/processing/src/test/java/org/apache/druid/query/operator/PartitionSortOperatorTest.java
new file mode 100644
index 000000000000..83f4afb405d3
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/operator/PartitionSortOperatorTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package org.apache.druid.query.operator; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.query.operator.window.RowsAndColumnsHelper; +import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns; +import org.apache.druid.query.rowsandcols.RowsAndColumns; +import org.apache.druid.query.rowsandcols.column.IntArrayColumn; +import org.junit.Test; + +public class PartitionSortOperatorTest +{ + @Test + public void testDefaultImplementation() + { + RowsAndColumns rac1 = MapOfColumnsRowsAndColumns.fromMap( + ImmutableMap.of( + "sorted", new IntArrayColumn(new int[]{1, 1, 1, 2, 2, 1}), + "unsorted", new IntArrayColumn(new int[]{10, 10, 10, 20, 20, 11}) + ) + ); + + InlineScanOperator inlineScanOperator = InlineScanOperator.make(rac1); + + PartitionSortOperator op = new PartitionSortOperator( + inlineScanOperator, + ImmutableList.of(new ColumnWithDirection("unsorted", ColumnWithDirection.Direction.ASC)) + ); + + new OperatorTestHelper() + .expectRowsAndColumns( + new RowsAndColumnsHelper() + .expectColumn("sorted", new int[]{1, 1, 1, 1, 2, 2}) + .expectColumn("unsorted", new int[]{10, 10, 10, 11, 20, 20}) + .allColumnsRegistered() + ) + .runToCompletion(op); + } +} diff --git a/processing/src/test/java/org/apache/druid/query/operator/window/RowsAndColumnsHelper.java b/processing/src/test/java/org/apache/druid/query/operator/window/RowsAndColumnsHelper.java index cdc84620ab0f..e6ed59d5d1de 100644 --- a/processing/src/test/java/org/apache/druid/query/operator/window/RowsAndColumnsHelper.java +++ b/processing/src/test/java/org/apache/druid/query/operator/window/RowsAndColumnsHelper.java @@ -19,13 +19,16 @@ package org.apache.druid.query.operator.window; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import it.unimi.dsi.fastutil.ints.IntArrayList; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns; import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.column.Column; import org.apache.druid.query.rowsandcols.column.ColumnAccessor; +import org.apache.druid.query.rowsandcols.column.IntArrayColumn; import org.apache.druid.segment.column.ColumnType; import org.junit.Assert; @@ -129,6 +132,18 @@ public RowsAndColumnsHelper expectColumn(String col, Object[] expectedVals, Colu return this; } + public static RowsAndColumns makeSingleColumnRac(int... values) + { + return MapOfColumnsRowsAndColumns.fromMap( + ImmutableMap.of("column", new IntArrayColumn(values)) + ); + } + + public static RowsAndColumnsHelper expectedSingleColumnRac(int... 
values)
+  {
+    return new RowsAndColumnsHelper().expectColumn("column", values).allColumnsRegistered();
+  }
+
  public ColumnHelper columnHelper(String column, int expectedSize, ColumnType expectedType)
  {
    if (this.expectedSize.get() == null) {
diff --git a/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumnsTest.java b/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumnsTest.java
index f6a10e011464..be5be4ef672f 100644
--- a/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumnsTest.java
+++ b/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumnsTest.java
@@ -44,6 +44,7 @@ public static ColumnBasedFrameRowsAndColumns buildFrame(MapOfColumnsRowsAndColum
        null,
        OffsetLimit.limit(Integer.MAX_VALUE),
        null,
+       null,
        null
    );
diff --git a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/EvaluateRowsAndColumnsTest.java b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/EvaluateRowsAndColumnsTest.java
index e2cee35a8e9a..a0f6319a6094 100644
--- a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/EvaluateRowsAndColumnsTest.java
+++ b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/EvaluateRowsAndColumnsTest.java
@@ -89,6 +89,7 @@ public void testMaterializeColumns()
        TestExprMacroTable.INSTANCE)),
        OffsetLimit.NONE,
        null,
+       null,
        null);

    // do the materialization
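A note on how the new pieces compose (a minimal sketch, assuming the factories above plus the InlineScanOperator and receiver used in the tests; the variable names and values here are illustrative, not part of the diff):

    // Glue partitions that span RAC boundaries, then sort the rows within each glued partition.
    Operator source = InlineScanOperator.make(rac1, rac2);
    OperatorFactory glue = new GlueingPartitioningOperatorFactory(ImmutableList.of("column"), 1000);
    OperatorFactory sort = new PartitionSortOperatorFactory(
        ImmutableList.of(new ColumnWithDirection("column", ColumnWithDirection.Direction.ASC))
    );
    Operator.go(sort.wrap(glue.wrap(source)), receiver); // the receiver sees one sorted RAC per glued partition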