From 91edf9ea2eb2e0e5b0c1b345f2ccad15a0a1df18 Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Wed, 20 Mar 2024 16:25:23 -0700 Subject: [PATCH] Adding guardrails for materialization and avoiding partition boosting in presence of partition by in next window --- .../org/apache/druid/msq/exec/Limits.java | 7 ++ .../error/TooManyRowsInAWindowFault.java | 81 +++++++++++++++++++ .../WindowOperatorQueryFrameProcessor.java | 26 ++++++ .../msq/querykit/groupby/GroupByQueryKit.java | 76 +++++++++-------- .../druid/msq/querykit/scan/ScanQueryKit.java | 5 +- .../msq/util/MultiStageQueryContext.java | 2 + .../apache/druid/msq/exec/MSQWindowTest.java | 26 +++++- 7 files changed, 180 insertions(+), 43 deletions(-) create mode 100644 extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/TooManyRowsInAWindowFault.java diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java index fd2d02f28e9c..99d307b22f59 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java @@ -96,4 +96,11 @@ public class Limits * Max number of partition buckets for ingestion queries. */ public static final int MAX_PARTITION_BUCKETS = 5_000; + + /** + * Max number of rows with the same key in a window. 
This acts as a guardrail for + * data distribution with high cardinality + */ + public static final int MAX_ROWS_MATERIALIZED_IN_FRAMES = 100_000; + } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/TooManyRowsInAWindowFault.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/TooManyRowsInAWindowFault.java new file mode 100644 index 000000000000..a926910d37cb --- /dev/null +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/TooManyRowsInAWindowFault.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.msq.indexing.error; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; + +import java.util.Objects; + +@JsonTypeName(TooManyRowsInAWindowFault.CODE) +public class TooManyRowsInAWindowFault extends BaseMSQFault +{ + + static final String CODE = "TooManyRowsInAWindow"; + + private final int numRows; + private final int maxRows; + + @JsonCreator + public TooManyRowsInAWindowFault( + @JsonProperty("numRows") final int numRows, + @JsonProperty("maxRows") final int maxRows + ) + { + super(CODE, "Too many rows in a window (requested = %d, max = %d)", numRows, maxRows); + this.numRows = numRows; + this.maxRows = maxRows; + } + + @JsonProperty + public int getNumRows() + { + return numRows; + } + + @JsonProperty + public int getMaxRows() + { + return maxRows; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + TooManyRowsInAWindowFault that = (TooManyRowsInAWindowFault) o; + return numRows == that.numRows && maxRows == that.maxRows; + } + + @Override + public int hashCode() + { + return Objects.hash(super.hashCode(), numRows, maxRows); + } +} diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java index 82e583f37b6b..455a98ae861a 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java @@ -36,6 +36,10 @@ import org.apache.druid.frame.write.FrameWriterFactory; import 
org.apache.druid.java.util.common.Unit; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.msq.exec.Limits; +import org.apache.druid.msq.indexing.error.MSQException; +import org.apache.druid.msq.indexing.error.TooManyRowsInAWindowFault; +import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.query.operator.NaivePartitioningOperatorFactory; import org.apache.druid.query.operator.OffsetLimit; @@ -81,6 +85,7 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor private final FrameReader frameReader; private final SettableLongVirtualColumn partitionBoostVirtualColumn; private final ArrayList objectsOfASingleRac; + private final int maxRowsMaterialized; List partitionColsIndex; private long currentAllocatorCapacity; // Used for generating FrameRowTooLargeException if needed private Cursor frameCursor = null; @@ -115,6 +120,12 @@ public WindowOperatorQueryFrameProcessor( this.objectsOfASingleRac = new ArrayList<>(); this.partitionColsIndex = new ArrayList<>(); this.isOverEmpty = isOverEmpty; + if (query.context() != null && query.context() + .containsKey(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW)) { + maxRowsMaterialized = (int) query.context().get(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW); + } else { + maxRowsMaterialized = Limits.MAX_ROWS_MATERIALIZED_IN_FRAMES; + } } private static VirtualColumns makeVirtualColumnsForFrameWriter( @@ -217,6 +228,9 @@ public ReturnOrAwait runIncrementally(IntSet readableInputs) // let all operators run on the giant rac when channel is finished if (inputChannel.canRead()) { final Frame frame = inputChannel.read(); + if (frame.numRows() > maxRowsMaterialized) { + throw new MSQException(new TooManyRowsInAWindowFault(frame.numRows(), maxRowsMaterialized)); + } convertRowFrameToRowsAndColumns(frame); } else if (inputChannel.isFinished()) { runAllOpsOnMultipleRac(frameRowsAndCols); @@ -259,6 
+273,9 @@ public ReturnOrAwait runIncrementally(IntSet readableInputs) // write it into a rac // and run operators on it if (!objectsOfASingleRac.isEmpty()) { + if (objectsOfASingleRac.size() > maxRowsMaterialized) { + throw new MSQException(new TooManyRowsInAWindowFault(objectsOfASingleRac.size(), maxRowsMaterialized)); + } RowsAndColumns rac = MapOfColumnsRowsAndColumns.fromResultRow( objectsOfASingleRac, frameReader.signature() @@ -285,6 +302,12 @@ public ReturnOrAwait runIncrementally(IntSet readableInputs) // create rac from the rows seen before // run the operators on these rows and columns // clean up the object to hold the new rows only + if (objectsOfASingleRac.size() > maxRowsMaterialized) { + throw new MSQException(new TooManyRowsInAWindowFault( + objectsOfASingleRac.size(), + maxRowsMaterialized + )); + } RowsAndColumns rac = MapOfColumnsRowsAndColumns.fromResultRow( objectsOfASingleRac, frameReader.signature() @@ -470,6 +493,9 @@ private void convertRowFrameToRowsAndColumns(Frame frame) null ); frameRowsAndCols.add(ldrc); + if (frameRowsAndColumns.numRows() > maxRowsMaterialized) { + throw new MSQException(new TooManyRowsInAWindowFault(frameRowsAndColumns.numRows(), maxRowsMaterialized)); + } } private List findPartitionColumns(RowSignature rowSignature) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java index b7c0b46aaa02..76999b1add53 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java @@ -65,6 +65,25 @@ public GroupByQueryKit(ObjectMapper jsonMapper) this.jsonMapper = jsonMapper; } + /** + * Intermediate signature of a particular {@link GroupByQuery}. 
Does not include post-aggregators, and all + * aggregations are nonfinalized. + */ + private static RowSignature computeIntermediateSignature(final GroupByQuery query) + { + final RowSignature postAggregationSignature = query.getResultRowSignature(RowSignature.Finalization.NO); + final RowSignature.Builder builder = RowSignature.builder(); + + for (int i = 0; i < query.getResultRowSizeWithoutPostAggregators(); i++) { + builder.add( + postAggregationSignature.getColumnName(i), + postAggregationSignature.getColumnType(i).orElse(null) + ); + } + + return builder.build(); + } + @Override public QueryDefinition makeQueryDefinition( final String queryId, @@ -167,17 +186,8 @@ public QueryDefinition makeQueryDefinition( partitionBoost ); - final ShuffleSpec nextShuffleWindowSpec; - if (originalQuery.getContext().containsKey(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL)) { - final ClusterBy windowClusterBy = (ClusterBy) originalQuery.getContext() - .get(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL); - nextShuffleWindowSpec = new HashShuffleSpec( - windowClusterBy, - maxWorkerCount - ); - } else { - nextShuffleWindowSpec = null; - } + final ShuffleSpec nextShuffleWindowSpec = getShuffleSpecForNextWindow(originalQuery, maxWorkerCount); + if (nextShuffleWindowSpec == null) { queryDefBuilder.add( StageDefinition.builder(firstStageNumber + 1) @@ -209,23 +219,11 @@ public QueryDefinition makeQueryDefinition( ); } } else { - final ShuffleSpec shuffleSpecWithPartitionByFromWindow; - if (shuffleSpecFactoryPostAggregation != null) { - List columns = resultClusterBy.getColumns(); - columns.addAll(nextShuffleWindowSpec.clusterBy().getColumns()); - // Creating a new cluster by with the columns from existing - // plus the columns from the next window partition column - final ClusterBy tmp = new ClusterBy(columns, resultClusterBy.getBucketByCount()); - shuffleSpecWithPartitionByFromWindow = shuffleSpecFactoryPostAggregation.build(tmp, false); - } else { - 
shuffleSpecWithPartitionByFromWindow = nextShuffleWindowSpec; - } final RowSignature stageSignature; - // sort the signature to make sure the prefix is aligned stageSignature = QueryKitUtils.sortableSignature( resultSignature, - shuffleSpecWithPartitionByFromWindow.clusterBy().getColumns() + nextShuffleWindowSpec.clusterBy().getColumns() ); queryDefBuilder.add( @@ -233,9 +231,7 @@ public QueryDefinition makeQueryDefinition( .inputs(new StageInputSpec(firstStageNumber)) .signature(stageSignature) .maxWorkerCount(maxWorkerCount) - .shuffleSpec( - shuffleSpecWithPartitionByFromWindow - ) + .shuffleSpec(nextShuffleWindowSpec) .processorFactory(new GroupByPostShuffleFrameProcessorFactory(queryToRun)) ); } @@ -244,22 +240,24 @@ public QueryDefinition makeQueryDefinition( } /** - * Intermediate signature of a particular {@link GroupByQuery}. Does not include post-aggregators, and all - * aggregations are nonfinalized. + * @param originalQuery which has the context for the next shuffle if that's present in the next window + * @param maxWorkerCount max worker count + * @return shuffle spec without partition boosting for next stage */ - private static RowSignature computeIntermediateSignature(final GroupByQuery query) + private ShuffleSpec getShuffleSpecForNextWindow(GroupByQuery originalQuery, int maxWorkerCount) { - final RowSignature postAggregationSignature = query.getResultRowSignature(RowSignature.Finalization.NO); - final RowSignature.Builder builder = RowSignature.builder(); - - for (int i = 0; i < query.getResultRowSizeWithoutPostAggregators(); i++) { - builder.add( - postAggregationSignature.getColumnName(i), - postAggregationSignature.getColumnType(i).orElse(null) + final ShuffleSpec nextShuffleWindowSpec; + if (originalQuery.getContext().containsKey(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL)) { + final ClusterBy windowClusterBy = (ClusterBy) originalQuery.getContext() + .get(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL); + nextShuffleWindowSpec = new 
HashShuffleSpec( + windowClusterBy, + maxWorkerCount ); + } else { + nextShuffleWindowSpec = null; } - - return builder.build(); + return nextShuffleWindowSpec; } /** diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java index 4c6b95f547a4..8bc6f0bfa96d 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java @@ -138,10 +138,12 @@ public QueryDefinition makeQueryDefinition( ); } + // Update partition by of next window final RowSignature signatureSoFar = signatureBuilder.build(); boolean addShuffle = true; if (originalQuery.getContext().containsKey(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL)) { - final ClusterBy windowClusterBy = (ClusterBy) originalQuery.getContext().get(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL); + final ClusterBy windowClusterBy = (ClusterBy) originalQuery.getContext() + .get(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL); for (KeyColumn c : windowClusterBy.getColumns()) { if (!signatureSoFar.contains(c.columnName())) { addShuffle = false; @@ -157,6 +159,7 @@ public QueryDefinition makeQueryDefinition( signatureBuilder.add(QueryKitUtils.PARTITION_BOOST_COLUMN, ColumnType.LONG); } + final ClusterBy clusterBy = QueryKitUtils.clusterByWithSegmentGranularity(new ClusterBy(clusterByColumns, 0), segmentGranularity); shuffleSpec = resultShuffleSpecFactory.build(clusterBy, false); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java index 4fc942ab2d72..121fcb64c0ba 100644 --- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java @@ -156,6 +156,8 @@ public class MultiStageQueryContext public static final String NEXT_WINDOW_SHUFFLE_COL = "__windowShuffleCol"; + public static final String MAX_ROWS_MATERIALIZED_IN_WINDOW = "__maxRowsMaterializedInWindow"; + public static final String CTX_SKIP_TYPE_VERIFICATION = "skipTypeVerification"; private static final Pattern LOOKS_LIKE_JSON_ARRAY = Pattern.compile("^\\s*\\[.*", Pattern.DOTALL); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java index 65758e9e9292..bd0805ece3bb 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java @@ -27,8 +27,10 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.msq.indexing.MSQSpec; import org.apache.druid.msq.indexing.MSQTuningConfig; +import org.apache.druid.msq.indexing.error.TooManyRowsInAWindowFault; import org.apache.druid.msq.test.CounterSnapshotMatcher; import org.apache.druid.msq.test.MSQTestBase; +import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.query.Query; import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.TableDataSource; @@ -857,8 +859,6 @@ public void testWindowOnFooWithNoGroupByAndPartitionAndVirtualColumns() } - - @Test public void testWindowOnFooWithNoGroupByAndEmptyOver() { @@ -1167,7 +1167,10 @@ public void testWindowOnFooWithDim2() final Map contextWithRowSignature = ImmutableMap.builder() .putAll(context) - .put(DruidQuery.CTX_SCAN_SIGNATURE, 
"[{\"name\":\"dim2\",\"type\":\"STRING\"},{\"name\":\"m1\",\"type\":\"FLOAT\"}]") + .put( + DruidQuery.CTX_SCAN_SIGNATURE, + "[{\"name\":\"dim2\",\"type\":\"STRING\"},{\"name\":\"m1\",\"type\":\"FLOAT\"}]" + ) .build(); final WindowOperatorQuery query = new WindowOperatorQuery( @@ -1730,4 +1733,21 @@ public void testSelectWithWikipedia() .setQueryContext(context) .verifyResults(); } + + @Test + public void testSelectWithWikipediaEmptyOverWithCustomContext() + { + final Map customContext = + ImmutableMap.builder() + .putAll(context) + .put(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW, 200) + .build(); + + testSelectQuery() + .setSql( + "select cityName, added, SUM(added) OVER () cc from wikipedia") + .setQueryContext(customContext) + .setExpectedMSQFault(new TooManyRowsInAWindowFault(15676, 200)) + .verifyResults(); + } }