Improve the fallback strategy when the broker is unable to materialize the subquery's results as frames for estimating the bytes (#16679)

Better fallback strategy when the broker is unable to materialize the subquery's results as frames for estimating the bytes:
a. We don't touch the subquery sequence until we know that we can materialize the result as frames (a condensed sketch of this flow follows the file summary below).
LakshSingla authored Jul 12, 2024
1 parent 197c54f commit 3a1b437
Showing 6 changed files with 190 additions and 10 deletions.
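
Before the per-file diff, the overall control flow of the change can be summarized as: check the row signature for unknown column types up front, only then open and accumulate the subquery's frame sequence, and once accumulation has started treat any failure as fatal rather than silently falling back. The following is a minimal, illustrative sketch of that flow, not the actual Druid code; the SubqueryFrameFallbackSketch class and the Signature and FrameSequence interfaces are stand-ins for Druid's RowSignature and frame sequence types, while the method names mirror the real ones in FrameCursorUtils and ClientQuerySegmentWalker below.

import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

public class SubqueryFrameFallbackSketch
{
  // Illustrative stand-ins for Druid's RowSignature and Sequence<FrameSignaturePair>.
  interface Signature
  {
    int size();

    String columnName(int i);

    Optional<String> columnType(int i);
  }

  interface FrameSequence
  {
    void forEach(Consumer<Object> perFrame);
  }

  // Mirrors FrameCursorUtils.throwIfColumnsHaveUnknownType: fail before any frame is produced.
  static void throwIfColumnsHaveUnknownType(Signature signature)
  {
    for (int i = 0; i < signature.size(); i++) {
      if (!signature.columnType(i).isPresent()) {
        throw new IllegalStateException("Unknown type for column [" + signature.columnName(i) + "]");
      }
    }
  }

  // Returns empty to signal "fall back to row-based limiting"; throws once accumulation has begun.
  static Optional<Long> materializeAsFrames(Signature signature, FrameSequence frames)
  {
    boolean startedAccumulating = false;
    try {
      // Step 1: cheap signature check; the subquery sequence has not been opened yet,
      // so returning empty (falling back to row-based limiting) is still safe.
      throwIfColumnsHaveUnknownType(signature);

      // Step 2: only now start consuming the sequence and counting rows.
      final AtomicLong rows = new AtomicLong();
      startedAccumulating = true;
      frames.forEach(frame -> rows.incrementAndGet());
      return Optional.of(rows.get());
    }
    catch (Exception e) {
      if (startedAccumulating) {
        // The sequence has been partially consumed and may hold resources;
        // falling back silently is no longer safe, so surface the failure.
        throw e;
      }
      return Optional.empty();
    }
  }
}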
@@ -24,15 +24,19 @@
import com.google.inject.Injector;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.guice.DruidInjectorBuilder;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.Druids;
import org.apache.druid.query.JoinDataSource;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.FilteredAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.SketchQueryContext;
import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchModule;
@@ -43,6 +47,7 @@
import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchToRankPostAggregator;
import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchToStringPostAggregator;
import org.apache.druid.query.aggregation.datasketches.quantiles.sql.DoublesSketchSqlAggregatorTest.DoublesSketchComponentSupplier;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
@@ -53,6 +58,7 @@
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.join.JoinType;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
@@ -263,6 +269,143 @@ public void testQuantileOnComplexColumn()
);
}

@Test
public void testSubqueryWithNestedGroupBy()
{
final List<Object[]> expectedResults = ImmutableList.of(
new Object[]{946684800000L, "", 1L, "val1"},
new Object[]{946684800000L, "1", 1L, "val1"},
new Object[]{946684800000L, "10.1", 1L, "val1"},
new Object[]{946684800000L, "2", 1L, "val1"},
new Object[]{946684800000L, "abc", 1L, "val1"},
new Object[]{946684800000L, "def", 1L, "val1"}
);

testQuery(
"SELECT\n"
+ " MILLIS_TO_TIMESTAMP(946684800000) AS __time,\n"
+ " alias.\"user\",\n"
+ " alias.days,\n"
+ " (CASE WHEN alias.days < quantiles.first_quartile THEN 'val2' \n"
+ " WHEN alias.days >= quantiles.first_quartile AND alias.days < quantiles.third_quartile THEN 'val3' \n"
+ " WHEN alias.days >= quantiles.third_quartile THEN 'val1' END) AS val4\n"
+ "FROM (\n"
+ " SELECT\n"
+ " APPROX_QUANTILE_DS(alias.days, 0.25) AS first_quartile,\n"
+ " APPROX_QUANTILE_DS(alias.days, 0.75) AS third_quartile\n"
+ " FROM (\n"
+ " SELECT\n"
+ " dim1 \"user\",\n"
+ " COUNT(DISTINCT __time) AS days\n"
+ " FROM \"foo\"\n"
+ " GROUP BY 1\n"
+ " ) AS alias\n"
+ ") AS quantiles, (\n"
+ " SELECT\n"
+ " dim1 \"user\",\n"
+ " COUNT(DISTINCT __time) AS days\n"
+ " FROM \"foo\"\n"
+ " GROUP BY 1\n"
+ ") AS alias\n",
ImmutableMap.<String, Object>builder()
.putAll(QUERY_CONTEXT_DEFAULT)
.put(QueryContexts.MAX_SUBQUERY_BYTES_KEY, "100000")
// Disallows the fallback to row based limiting
.put(QueryContexts.MAX_SUBQUERY_ROWS_KEY, "10")
.build(),
ImmutableList.of(
newScanQueryBuilder()
.dataSource(
JoinDataSource.create(
new QueryDataSource(
GroupByQuery.builder()
.setDataSource(
new QueryDataSource(
GroupByQuery.builder()
.setDataSource("foo")
.setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY))
.setGranularity(Granularities.ALL)
.addDimension(new DefaultDimensionSpec(
"dim1",
"d0",
ColumnType.STRING
))
.addAggregator(new CardinalityAggregatorFactory(
"a0:a",
null,
Collections.singletonList(new DefaultDimensionSpec(
"__time",
"__time",
ColumnType.LONG
)),
false,
true
))
.setPostAggregatorSpecs(new HyperUniqueFinalizingPostAggregator(
"a0",
"a0:a"
))
.build()
)
)
.setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY))
.setGranularity(Granularities.ALL)
.addAggregator(new DoublesSketchAggregatorFactory("_a0:agg", "a0", 128))
.setPostAggregatorSpecs(
new DoublesSketchToQuantilePostAggregator(
"_a0",
new FieldAccessPostAggregator("_a0:agg", "_a0:agg"),
0.25
),
new DoublesSketchToQuantilePostAggregator(
"_a1",
new FieldAccessPostAggregator("_a0:agg", "_a0:agg"),
0.75
)
)
.build()

),
new QueryDataSource(
GroupByQuery.builder()
.setDataSource("foo")
.setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY))
.setGranularity(Granularities.ALL)
.addDimension(new DefaultDimensionSpec("dim1", "d0", ColumnType.STRING))
.addAggregator(new CardinalityAggregatorFactory(
"a0",
null,
Collections.singletonList(new DefaultDimensionSpec(
"__time",
"__time",
ColumnType.LONG
)),
false,
true
))
.build()
),
"j0.",
"1",
JoinType.INNER,
null,
TestExprMacroTable.INSTANCE,
null
)
)
.intervals(querySegmentSpec(Intervals.ETERNITY))
.virtualColumns(
new ExpressionVirtualColumn("v0", "946684800000", ColumnType.LONG, TestExprMacroTable.INSTANCE),
new ExpressionVirtualColumn("v1", "case_searched((\"j0.a0\" < \"_a0\"),'val2',((\"j0.a0\" >= \"_a0\") && (\"j0.a0\" < \"_a1\")),'val3',(\"j0.a0\" >= \"_a1\"),'val1',null)", ColumnType.STRING, TestExprMacroTable.INSTANCE)
)
.columns("j0.a0", "j0.d0", "v0", "v1")
.build()
),
expectedResults
);
}


@Test
public void testQuantileOnCastedString()
{
@@ -23,6 +23,7 @@
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.write.FrameWriter;
import org.apache.druid.frame.write.FrameWriterFactory;
import org.apache.druid.frame.write.UnsupportedColumnTypeException;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
@@ -32,6 +33,7 @@
import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.filter.BoundFilter;
import org.apache.druid.segment.filter.Filters;
import org.joda.time.Interval;
@@ -100,13 +102,18 @@ public static Filter buildFilter(@Nullable Filter filter, Interval interval)
/**
* Writes a {@link Cursor} to a sequence of {@link Frame}. This method iterates over the rows of the cursor,
* and writes the columns to the frames. The iterable is lazy, and it traverses the required portion of the cursor
* as required.
* <p>
* If the type is missing from the signature, the method throws an exception without advancing/modifying/closing the
* cursor.
*/
public static Iterable<Frame> cursorToFramesIterable(
final Cursor cursor,
final FrameWriterFactory frameWriterFactory
)
{
throwIfColumnsHaveUnknownType(frameWriterFactory.signature());

return () -> new Iterator<Frame>()
{
@Override
@@ -158,7 +165,19 @@ public static Sequence<Frame> cursorToFramesSequence(
final FrameWriterFactory frameWriterFactory
)
{

return Sequences.simple(cursorToFramesIterable(cursor, frameWriterFactory));
}

/**
* Throws {@link UnsupportedColumnTypeException} if the row signature has columns with unknown types. This is used to
* pre-determine whether the results can be materialized as frames, without touching the resource generating the frames.
*/
public static void throwIfColumnsHaveUnknownType(final RowSignature rowSignature)
{
for (int i = 0; i < rowSignature.size(); ++i) {
if (!rowSignature.getColumnType(i).isPresent()) {
throw new UnsupportedColumnTypeException(rowSignature.getColumnName(i), null);
}
}
}
}
@@ -816,6 +816,8 @@ public Optional<Sequence<FrameSignaturePair>> resultsAsFrames(
? FrameWriterUtils.replaceUnknownTypesWithNestedColumns(rowSignature)
: rowSignature;

FrameCursorUtils.throwIfColumnsHaveUnknownType(modifiedRowSignature);

FrameWriterFactory frameWriterFactory = FrameWriters.makeColumnBasedFrameWriterFactory(
memoryAllocatorFactory,
modifiedRowSignature,
@@ -485,6 +485,8 @@ public Optional<Sequence<FrameSignaturePair>> resultsAsFrames(
RowSignature modifiedRowSignature = useNestedForUnknownTypes
? FrameWriterUtils.replaceUnknownTypesWithNestedColumns(rowSignature)
: rowSignature;
FrameCursorUtils.throwIfColumnsHaveUnknownType(modifiedRowSignature);

FrameWriterFactory frameWriterFactory = FrameWriters.makeColumnBasedFrameWriterFactory(
memoryAllocatorFactory,
modifiedRowSignature,
@@ -569,6 +569,8 @@ public Optional<Sequence<FrameSignaturePair>> resultsAsFrames(
RowSignature modifiedRowSignature = useNestedForUnknownTypes
? FrameWriterUtils.replaceUnknownTypesWithNestedColumns(rowSignature)
: rowSignature;
FrameCursorUtils.throwIfColumnsHaveUnknownType(modifiedRowSignature);

FrameWriterFactory frameWriterFactory = FrameWriters.makeColumnBasedFrameWriterFactory(
memoryAllocatorFactory,
rowSignature,
@@ -746,6 +746,7 @@ private static <T, QueryType extends Query<T>> Optional<DataSource> materializeR
{
Optional<Sequence<FrameSignaturePair>> framesOptional;

boolean startedAccumulating = false;
try {
framesOptional = toolChest.resultsAsFrames(
query,
@@ -760,6 +761,9 @@

Sequence<FrameSignaturePair> frames = framesOptional.get();
List<FrameSignaturePair> frameSignaturePairs = new ArrayList<>();

startedAccumulating = true;

frames.forEach(
frame -> {
limitAccumulator.addAndGet(frame.getFrame().numRows());
@@ -772,21 +776,29 @@
}
);
return Optional.of(new FrameBasedInlineDataSource(frameSignaturePairs, toolChest.resultArraySignature(query)));

}
catch (ResourceLimitExceededException e) {
throw e;
}
catch (UnsupportedColumnTypeException e) {
subqueryStatsProvider.incrementSubqueriesFallingBackDueToUnsufficientTypeInfo();
log.debug(e, "Type info in signature insufficient to materialize rows as frames.");
return Optional.empty();
}
catch (Exception e) {
subqueryStatsProvider.incrementSubqueriesFallingBackDueToUnknownReason();
log.debug(e, "Unable to materialize the results as frames due to an unhandleable exception "
+ "while conversion. Defaulting to materializing the results as rows");
return Optional.empty();
if (startedAccumulating) {
// If we have opened the resultSequence, we can't fall back safely as the resultSequence might hold some resources
// that we release on exception, and we need to throw the exception to disable the 'maxSubqueryBytes' configuration
throw DruidException.defensive()
.build(
e,
"Unable to materialize the results as frames for estimating the byte footprint. "
+ "Please disable the 'maxSubqueryBytes' by setting it to 'disabled' in the query context or removing it altogether "
+ "from the query context and/or the server config."
);
} else {
return Optional.empty();
}
}
}
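
As a closing note, the new error message above points users at the query context when frame-based estimation cannot be completed safely. Below is a minimal sketch of the two context shapes involved, using the QueryContexts keys that the new test exercises; the SubqueryLimitContexts class and the concrete values are illustrative and not part of the patch.

import com.google.common.collect.ImmutableMap;
import org.apache.druid.query.QueryContexts;

import java.util.Map;

public class SubqueryLimitContexts
{
  public static void main(String[] args)
  {
    // Byte-based limiting: the broker materializes subquery results as frames to
    // estimate their byte footprint, as in the new test above.
    final Map<String, Object> byteLimited = ImmutableMap.of(
        QueryContexts.MAX_SUBQUERY_BYTES_KEY, "100000",
        QueryContexts.MAX_SUBQUERY_ROWS_KEY, "100000"
    );

    // What the new error message suggests when frame materialization fails after the
    // subquery sequence has already been opened: disable byte-based limiting and rely
    // on row-based limiting only.
    final Map<String, Object> rowLimitedOnly = ImmutableMap.of(
        QueryContexts.MAX_SUBQUERY_BYTES_KEY, "disabled",
        QueryContexts.MAX_SUBQUERY_ROWS_KEY, "100000"
    );

    System.out.println(byteLimited);
    System.out.println(rowLimitedOnly);
  }
}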
