Commit

Adding guardrails for materialization and avoiding partition boosting in presence of partition by in next window
somu-imply committed Mar 20, 2024
1 parent 60c6290 commit 91edf9e
Showing 7 changed files with 180 additions and 43 deletions.
Limits.java
@@ -96,4 +96,11 @@ public class Limits
* Max number of partition buckets for ingestion queries.
*/
public static final int MAX_PARTITION_BUCKETS = 5_000;

/**
* Max number of rows with the same key in a window. This acts as a guardrail for
* data distributions with high cardinality.
*/
public static final int MAX_ROWS_MATERIALIZED_IN_FRAMES = 100_000;

}
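Note: the new constant is only a default. As the WindowOperatorQueryFrameProcessor change below shows, a query can override it per run through the MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW context key. A condensed sketch of the resolution order, mirroring the constructor change in this commit:

// Sketch: the per-query context key wins when present; otherwise the
// compile-time default applies.
final int maxRowsMaterialized =
    query.context() != null
        && query.context().containsKey(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW)
    ? (int) query.context().get(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW)
    : Limits.MAX_ROWS_MATERIALIZED_IN_FRAMES;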
TooManyRowsInAWindowFault.java
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.msq.indexing.error;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;

import java.util.Objects;

@JsonTypeName(TooManyRowsInAWindowFault.CODE)
public class TooManyRowsInAWindowFault extends BaseMSQFault
{

static final String CODE = "TooManyRowsInAWindow";

private final int numRows;
private final int maxRows;

@JsonCreator
public TooManyRowsInAWindowFault(
@JsonProperty("numRows") final int numRows,
@JsonProperty("maxRows") final int maxRows
)
{
super(CODE, "Too many rows in a window (requested = %d, max = %d)", numRows, maxRows);
this.numRows = numRows;
this.maxRows = maxRows;
}

@JsonProperty
public int getNumRows()
{
return numRows;
}

@JsonProperty
public int getMaxRows()
{
return maxRows;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
TooManyRowsInAWindowFault that = (TooManyRowsInAWindowFault) o;
return numRows == that.numRows && maxRows == that.maxRows;
}

@Override
public int hashCode()
{
return Objects.hash(super.hashCode(), numRows, maxRows);
}
}
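A minimal sketch, using only names from this diff, of how the guardrail surfaces at runtime; with the format string above, the rendered message reads e.g. "Too many rows in a window (requested = 15676, max = 200)":

// Thrown by the frame processor when a window partition grows past the limit;
// the MSQ framework reports the wrapped fault back to the caller.
if (numRows > maxRowsMaterialized) {
  throw new MSQException(new TooManyRowsInAWindowFault(numRows, maxRowsMaterialized));
}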
WindowOperatorQueryFrameProcessor.java
@@ -36,6 +36,10 @@
import org.apache.druid.frame.write.FrameWriterFactory;
import org.apache.druid.java.util.common.Unit;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.TooManyRowsInAWindowFault;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.operator.NaivePartitioningOperatorFactory;
import org.apache.druid.query.operator.OffsetLimit;
@@ -81,6 +85,7 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
private final FrameReader frameReader;
private final SettableLongVirtualColumn partitionBoostVirtualColumn;
private final ArrayList<ResultRow> objectsOfASingleRac;
private final int maxRowsMaterialized;
List<Integer> partitionColsIndex;
private long currentAllocatorCapacity; // Used for generating FrameRowTooLargeException if needed
private Cursor frameCursor = null;
@@ -115,6 +120,12 @@ public WindowOperatorQueryFrameProcessor(
this.objectsOfASingleRac = new ArrayList<>();
this.partitionColsIndex = new ArrayList<>();
this.isOverEmpty = isOverEmpty;
if (query.context() != null && query.context()
.containsKey(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW)) {
maxRowsMaterialized = (int) query.context().get(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW);
} else {
maxRowsMaterialized = Limits.MAX_ROWS_MATERIALIZED_IN_FRAMES;
}
}

private static VirtualColumns makeVirtualColumnsForFrameWriter(
Expand Down Expand Up @@ -217,6 +228,9 @@ public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs)
// let all operators run on the giant rac when channel is finished
if (inputChannel.canRead()) {
final Frame frame = inputChannel.read();
if (frame.numRows() > maxRowsMaterialized) {
throw new MSQException(new TooManyRowsInAWindowFault(frame.numRows(), maxRowsMaterialized));
}
convertRowFrameToRowsAndColumns(frame);
} else if (inputChannel.isFinished()) {
runAllOpsOnMultipleRac(frameRowsAndCols);
@@ -259,6 +273,9 @@ public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs)
// write it into a rac
// and run operators on it
if (!objectsOfASingleRac.isEmpty()) {
if (objectsOfASingleRac.size() > maxRowsMaterialized) {
throw new MSQException(new TooManyRowsInAWindowFault(objectsOfASingleRac.size(), maxRowsMaterialized));
}
RowsAndColumns rac = MapOfColumnsRowsAndColumns.fromResultRow(
objectsOfASingleRac,
frameReader.signature()
@@ -285,6 +302,12 @@ public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs)
// create rac from the rows seen before
// run the operators on these rows and columns
// clean up the object to hold the new rows only
if (objectsOfASingleRac.size() > maxRowsMaterialized) {
throw new MSQException(new TooManyRowsInAWindowFault(
objectsOfASingleRac.size(),
maxRowsMaterialized
));
}
RowsAndColumns rac = MapOfColumnsRowsAndColumns.fromResultRow(
objectsOfASingleRac,
frameReader.signature()
@@ -470,6 +493,9 @@ private void convertRowFrameToRowsAndColumns(Frame frame)
null
);
frameRowsAndCols.add(ldrc);
if (frameRowsAndColumns.numRows() > maxRowsMaterialized) {
throw new MSQException(new TooManyRowsInAWindowFault(frameRowsAndColumns.numRows(), maxRowsMaterialized));
}
}

private List<Integer> findPartitionColumns(RowSignature rowSignature)
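The same three-line check now appears at four call sites in this processor (frame read, RAC flush when the channel finishes, partition rollover, and frame-to-RAC conversion). A hypothetical consolidation, not part of this commit, that would keep the call sites consistent:

// Hypothetical helper: one shared guardrail check for
// WindowOperatorQueryFrameProcessor.
private void ensureRowLimitNotExceeded(final int numRows)
{
  if (numRows > maxRowsMaterialized) {
    throw new MSQException(new TooManyRowsInAWindowFault(numRows, maxRowsMaterialized));
  }
}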
GroupByQueryKit.java
@@ -65,6 +65,25 @@ public GroupByQueryKit(ObjectMapper jsonMapper)
this.jsonMapper = jsonMapper;
}

/**
* Intermediate signature of a particular {@link GroupByQuery}. Does not include post-aggregators, and all
* aggregations are nonfinalized.
*/
private static RowSignature computeIntermediateSignature(final GroupByQuery query)
{
final RowSignature postAggregationSignature = query.getResultRowSignature(RowSignature.Finalization.NO);
final RowSignature.Builder builder = RowSignature.builder();

for (int i = 0; i < query.getResultRowSizeWithoutPostAggregators(); i++) {
builder.add(
postAggregationSignature.getColumnName(i),
postAggregationSignature.getColumnType(i).orElse(null)
);
}

return builder.build();
}

@Override
public QueryDefinition makeQueryDefinition(
final String queryId,
@@ -167,17 +186,8 @@ public QueryDefinition makeQueryDefinition(
partitionBoost
);

final ShuffleSpec nextShuffleWindowSpec;
if (originalQuery.getContext().containsKey(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL)) {
final ClusterBy windowClusterBy = (ClusterBy) originalQuery.getContext()
.get(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL);
nextShuffleWindowSpec = new HashShuffleSpec(
windowClusterBy,
maxWorkerCount
);
} else {
nextShuffleWindowSpec = null;
}
final ShuffleSpec nextShuffleWindowSpec = getShuffleSpecForNextWindow(originalQuery, maxWorkerCount);

if (nextShuffleWindowSpec == null) {
queryDefBuilder.add(
StageDefinition.builder(firstStageNumber + 1)
Expand Down Expand Up @@ -209,33 +219,19 @@ public QueryDefinition makeQueryDefinition(
);
}
} else {
final ShuffleSpec shuffleSpecWithPartitionByFromWindow;
if (shuffleSpecFactoryPostAggregation != null) {
List<KeyColumn> columns = resultClusterBy.getColumns();
columns.addAll(nextShuffleWindowSpec.clusterBy().getColumns());
// Creating a new cluster by with the columns from existing
// plus the columns from the next window partition column
final ClusterBy tmp = new ClusterBy(columns, resultClusterBy.getBucketByCount());
shuffleSpecWithPartitionByFromWindow = shuffleSpecFactoryPostAggregation.build(tmp, false);
} else {
shuffleSpecWithPartitionByFromWindow = nextShuffleWindowSpec;
}
final RowSignature stageSignature;

// sort the signature to make sure the prefix is aligned
stageSignature = QueryKitUtils.sortableSignature(
resultSignature,
shuffleSpecWithPartitionByFromWindow.clusterBy().getColumns()
nextShuffleWindowSpec.clusterBy().getColumns()
);

queryDefBuilder.add(
StageDefinition.builder(firstStageNumber + 1)
.inputs(new StageInputSpec(firstStageNumber))
.signature(stageSignature)
.maxWorkerCount(maxWorkerCount)
.shuffleSpec(
shuffleSpecWithPartitionByFromWindow
)
.shuffleSpec(nextShuffleWindowSpec)
.processorFactory(new GroupByPostShuffleFrameProcessorFactory(queryToRun))
);
}
@@ -244,22 +240,24 @@ public QueryDefinition makeQueryDefinition(
}

/**
* Intermediate signature of a particular {@link GroupByQuery}. Does not include post-aggregators, and all
* aggregations are nonfinalized.
* @param originalQuery query whose context may carry the shuffle columns for the next window
* @param maxWorkerCount max worker count
* @return shuffle spec for the next stage, without partition boosting; null if no next-window shuffle is set
*/
private static RowSignature computeIntermediateSignature(final GroupByQuery query)
private ShuffleSpec getShuffleSpecForNextWindow(GroupByQuery originalQuery, int maxWorkerCount)
{
final RowSignature postAggregationSignature = query.getResultRowSignature(RowSignature.Finalization.NO);
final RowSignature.Builder builder = RowSignature.builder();

for (int i = 0; i < query.getResultRowSizeWithoutPostAggregators(); i++) {
builder.add(
postAggregationSignature.getColumnName(i),
postAggregationSignature.getColumnType(i).orElse(null)
final ShuffleSpec nextShuffleWindowSpec;
if (originalQuery.getContext().containsKey(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL)) {
final ClusterBy windowClusterBy = (ClusterBy) originalQuery.getContext()
.get(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL);
nextShuffleWindowSpec = new HashShuffleSpec(
windowClusterBy,
maxWorkerCount
);
} else {
nextShuffleWindowSpec = null;
}

return builder.build();
return nextShuffleWindowSpec;
}

/**
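getShuffleSpecForNextWindow only consumes NEXT_WINDOW_SHUFFLE_COL; planning code outside this diff is expected to have stashed the next window's PARTITION BY columns under that key. A hypothetical illustration of the producing side for a query containing OVER (PARTITION BY dim2) — the column name and ordering are illustrative, not from this commit:

// Hypothetical producer-side sketch; ClusterBy and KeyColumn appear elsewhere
// in this diff, KeyOrder is the frame-key ordering enum.
final ClusterBy windowClusterBy = new ClusterBy(
    ImmutableList.of(new KeyColumn("dim2", KeyOrder.ASCENDING)),
    0
);
queryToRun = queryToRun.withOverriddenContext(
    ImmutableMap.of(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL, windowClusterBy)
);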
ScanQueryKit.java
@@ -138,10 +138,12 @@ public QueryDefinition makeQueryDefinition(
);
}

// Update partition-by of the next window
final RowSignature signatureSoFar = signatureBuilder.build();
boolean addShuffle = true;
if (originalQuery.getContext().containsKey(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL)) {
final ClusterBy windowClusterBy = (ClusterBy) originalQuery.getContext().get(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL);
final ClusterBy windowClusterBy = (ClusterBy) originalQuery.getContext()
.get(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL);
for (KeyColumn c : windowClusterBy.getColumns()) {
if (!signatureSoFar.contains(c.columnName())) {
addShuffle = false;
@@ -157,6 +159,7 @@
signatureBuilder.add(QueryKitUtils.PARTITION_BOOST_COLUMN, ColumnType.LONG);
}


final ClusterBy clusterBy =
QueryKitUtils.clusterByWithSegmentGranularity(new ClusterBy(clusterByColumns, 0), segmentGranularity);
shuffleSpec = resultShuffleSpecFactory.build(clusterBy, false);
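This hunk is the "avoiding partition boosting" half of the commit message: when every PARTITION BY column of the next window is already present in the stage's output signature, the stage shuffles by those columns and the synthetic partition-boost column is left out, so rows sharing a window partition key stay together. Condensed from the hunk above:

// Shuffle by the next window's partition columns only when all of them
// already exist in the signature built so far.
boolean addShuffle = true;
for (KeyColumn c : windowClusterBy.getColumns()) {
  if (!signatureSoFar.contains(c.columnName())) {
    addShuffle = false;
    break;
  }
}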
MultiStageQueryContext.java
@@ -156,6 +156,8 @@ public class MultiStageQueryContext

public static final String NEXT_WINDOW_SHUFFLE_COL = "__windowShuffleCol";

public static final String MAX_ROWS_MATERIALIZED_IN_WINDOW = "__maxRowsMaterializedInWindow";

public static final String CTX_SKIP_TYPE_VERIFICATION = "skipTypeVerification";

private static final Pattern LOOKS_LIKE_JSON_ARRAY = Pattern.compile("^\\s*\\[.*", Pattern.DOTALL);
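The new key follows the double-underscore prefix of the neighbouring __windowShuffleCol entry, but it can also be supplied by the caller. A hypothetical client-side sketch; the new test at the bottom of this commit does the same through the test harness:

// Hypothetical: per-query override of the window materialization guardrail.
final Map<String, Object> queryContext = ImmutableMap.<String, Object>builder()
    .put(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW, 200)
    .build();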
MSQWindowTest.java
@@ -27,8 +27,10 @@
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.msq.indexing.MSQSpec;
import org.apache.druid.msq.indexing.MSQTuningConfig;
import org.apache.druid.msq.indexing.error.TooManyRowsInAWindowFault;
import org.apache.druid.msq.test.CounterSnapshotMatcher;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.TableDataSource;
@@ -857,8 +859,6 @@ public void testWindowOnFooWithNoGroupByAndPartitionAndVirtualColumns()
}




@Test
public void testWindowOnFooWithNoGroupByAndEmptyOver()
{
@@ -1167,7 +1167,10 @@ public void testWindowOnFooWithDim2()
final Map<String, Object> contextWithRowSignature =
ImmutableMap.<String, Object>builder()
.putAll(context)
.put(DruidQuery.CTX_SCAN_SIGNATURE, "[{\"name\":\"dim2\",\"type\":\"STRING\"},{\"name\":\"m1\",\"type\":\"FLOAT\"}]")
.put(
DruidQuery.CTX_SCAN_SIGNATURE,
"[{\"name\":\"dim2\",\"type\":\"STRING\"},{\"name\":\"m1\",\"type\":\"FLOAT\"}]"
)
.build();

final WindowOperatorQuery query = new WindowOperatorQuery(
@@ -1730,4 +1733,21 @@ public void testSelectWithWikipedia()
.setQueryContext(context)
.verifyResults();
}

@Test
public void testSelectWithWikipediaEmptyOverWithCustomContext()
{
final Map<String, Object> customContext =
ImmutableMap.<String, Object>builder()
.putAll(context)
.put(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW, 200)
.build();

testSelectQuery()
.setSql(
"select cityName, added, SUM(added) OVER () cc from wikipedia")
.setQueryContext(customContext)
.setExpectedMSQFault(new TooManyRowsInAWindowFault(15676, 200))
.verifyResults();
}
}
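Note on the expected fault values: an empty OVER () makes the entire result set a single window partition, so every row the test's wikipedia datasource materializes (15676, per the expected fault) counts against the 200-row override.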
