From 3a1b4370567f9655da3d24d427052d550092f8ac Mon Sep 17 00:00:00 2001
From: Laksh Singla
Date: Fri, 12 Jul 2024 21:49:12 +0530
Subject: [PATCH] Improve the fallback strategy when the broker is unable to
 materialize the subquery's results as frames for estimating the bytes (#16679)

Better fallback strategy when the broker is unable to materialize the
subquery's results as frames for estimating the bytes:
a. We don't touch the subquery sequence till we know that we can materialize
   the result as frames
---
 .../sql/DoublesSketchSqlAggregatorTest.java   | 143 ++++++++++++++++++
 .../druid/frame/segment/FrameCursorUtils.java |  23 ++-
 .../groupby/GroupByQueryQueryToolChest.java   |   2 +
 .../TimeseriesQueryQueryToolChest.java        |   2 +
 .../query/topn/TopNQueryQueryToolChest.java   |   2 +
 .../server/ClientQuerySegmentWalker.java      |  28 +++-
 6 files changed, 190 insertions(+), 10 deletions(-)

diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java
index 9122e1ecc7e8..a85d508a8791 100644
--- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java
+++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java
@@ -24,8 +24,11 @@
 import com.google.inject.Injector;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.guice.DruidInjectorBuilder;
+import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.query.Druids;
+import org.apache.druid.query.JoinDataSource;
+import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.QueryDataSource;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
@@ -33,6 +36,7 @@
 import org.apache.druid.query.aggregation.FilteredAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.query.aggregation.PostAggregator;
+import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
 import org.apache.druid.query.aggregation.datasketches.SketchQueryContext;
 import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchAggregatorFactory;
 import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchModule;
@@ -43,6 +47,7 @@
 import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchToRankPostAggregator;
 import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchToStringPostAggregator;
 import org.apache.druid.query.aggregation.datasketches.quantiles.sql.DoublesSketchSqlAggregatorTest.DoublesSketchComponentSupplier;
+import org.apache.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
 import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
 import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
 import org.apache.druid.query.dimension.DefaultDimensionSpec;
@@ -53,6 +58,7 @@
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.join.JoinType;
 import org.apache.druid.segment.join.JoinableFactoryWrapper;
 import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
@@ -263,6 +269,143 @@ public void testQuantileOnComplexColumn()
     );
   }
 
+  @Test
+  public void testSubqueryWithNestedGroupBy()
+  {
+    final List<Object[]> expectedResults = ImmutableList.of(
+        new Object[]{946684800000L, "", 1L, "val1"},
+        new Object[]{946684800000L, "1", 1L, "val1"},
+        new Object[]{946684800000L, "10.1", 1L, "val1"},
+        new Object[]{946684800000L, "2", 1L, "val1"},
+        new Object[]{946684800000L, "abc", 1L, "val1"},
+        new Object[]{946684800000L, "def", 1L, "val1"}
+    );
+
+    testQuery(
+        "SELECT\n"
+        + "  MILLIS_TO_TIMESTAMP(946684800000) AS __time,\n"
+        + "  alias.\"user\",\n"
+        + "  alias.days,\n"
+        + "  (CASE WHEN alias.days < quantiles.first_quartile THEN 'val2' \n"
+        + "    WHEN alias.days >= quantiles.first_quartile AND alias.days < quantiles.third_quartile THEN 'val3' \n"
+        + "    WHEN alias.days >= quantiles.third_quartile THEN 'val1' END) AS val4\n"
+        + "FROM (\n"
+        + "  SELECT\n"
+        + "    APPROX_QUANTILE_DS(alias.days, 0.25) AS first_quartile,\n"
+        + "    APPROX_QUANTILE_DS(alias.days, 0.75) AS third_quartile\n"
+        + "  FROM (\n"
+        + "    SELECT\n"
+        + "      dim1 \"user\",\n"
+        + "      COUNT(DISTINCT __time) AS days\n"
+        + "    FROM \"foo\"\n"
+        + "    GROUP BY 1\n"
+        + "  ) AS alias\n"
+        + ") AS quantiles, (\n"
+        + "  SELECT\n"
+        + "    dim1 \"user\",\n"
+        + "    COUNT(DISTINCT __time) AS days\n"
+        + "  FROM \"foo\"\n"
+        + "  GROUP BY 1\n"
+        + ") AS alias\n",
+        ImmutableMap.<String, Object>builder()
+                    .putAll(QUERY_CONTEXT_DEFAULT)
+                    .put(QueryContexts.MAX_SUBQUERY_BYTES_KEY, "100000")
+                    // Disallows the fallback to row based limiting
+                    .put(QueryContexts.MAX_SUBQUERY_ROWS_KEY, "10")
+                    .build(),
+        ImmutableList.of(
+            newScanQueryBuilder()
+                .dataSource(
+                    JoinDataSource.create(
+                        new QueryDataSource(
+                            GroupByQuery.builder()
+                                        .setDataSource(
+                                            new QueryDataSource(
+                                                GroupByQuery.builder()
+                                                            .setDataSource("foo")
+                                                            .setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY))
+                                                            .setGranularity(Granularities.ALL)
+                                                            .addDimension(new DefaultDimensionSpec(
+                                                                "dim1",
+                                                                "d0",
+                                                                ColumnType.STRING
+                                                            ))
+                                                            .addAggregator(new CardinalityAggregatorFactory(
+                                                                "a0:a",
+                                                                null,
+                                                                Collections.singletonList(new DefaultDimensionSpec(
+                                                                    "__time",
+                                                                    "__time",
+                                                                    ColumnType.LONG
+                                                                )),
+                                                                false,
+                                                                true
+                                                            ))
+                                                            .setPostAggregatorSpecs(new HyperUniqueFinalizingPostAggregator(
+                                                                "a0",
+                                                                "a0:a"
+                                                            ))
+                                                            .build()
+                                            )
+                                        )
+                                        .setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY))
+                                        .setGranularity(Granularities.ALL)
+                                        .addAggregator(new DoublesSketchAggregatorFactory("_a0:agg", "a0", 128))
+                                        .setPostAggregatorSpecs(
+                                            new DoublesSketchToQuantilePostAggregator(
+                                                "_a0",
+                                                new FieldAccessPostAggregator("_a0:agg", "_a0:agg"),
+                                                0.25
+                                            ),
+                                            new DoublesSketchToQuantilePostAggregator(
+                                                "_a1",
+                                                new FieldAccessPostAggregator("_a0:agg", "_a0:agg"),
+                                                0.75
+                                            )
+                                        )
+                                        .build()
+
+                        ),
+                        new QueryDataSource(
+                            GroupByQuery.builder()
+                                        .setDataSource("foo")
+                                        .setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY))
+                                        .setGranularity(Granularities.ALL)
+                                        .addDimension(new DefaultDimensionSpec("dim1", "d0", ColumnType.STRING))
+                                        .addAggregator(new CardinalityAggregatorFactory(
+                                            "a0",
+                                            null,
+                                            Collections.singletonList(new DefaultDimensionSpec(
+                                                "__time",
+                                                "__time",
+                                                ColumnType.LONG
+                                            )),
+                                            false,
+                                            true
+                                        ))
+                                        .build()
+                        ),
+                        "j0.",
+                        "1",
+                        JoinType.INNER,
+                        null,
+                        TestExprMacroTable.INSTANCE,
+                        null
+                    )
+                )
+                .intervals(querySegmentSpec(Intervals.ETERNITY))
+                .virtualColumns(
new ExpressionVirtualColumn("v0", "946684800000", ColumnType.LONG, TestExprMacroTable.INSTANCE), + new ExpressionVirtualColumn("v1", "case_searched((\"j0.a0\" < \"_a0\"),'val2',((\"j0.a0\" >= \"_a0\") && (\"j0.a0\" < \"_a1\")),'val3',(\"j0.a0\" >= \"_a1\"),'val1',null)", ColumnType.STRING, TestExprMacroTable.INSTANCE) + ) + .columns("j0.a0", "j0.d0", "v0", "v1") + .build() + ), + expectedResults + ); + } + + @Test public void testQuantileOnCastedString() { diff --git a/processing/src/main/java/org/apache/druid/frame/segment/FrameCursorUtils.java b/processing/src/main/java/org/apache/druid/frame/segment/FrameCursorUtils.java index 3cb5c686e9d6..de970363bb4f 100644 --- a/processing/src/main/java/org/apache/druid/frame/segment/FrameCursorUtils.java +++ b/processing/src/main/java/org/apache/druid/frame/segment/FrameCursorUtils.java @@ -23,6 +23,7 @@ import org.apache.druid.frame.Frame; import org.apache.druid.frame.write.FrameWriter; import org.apache.druid.frame.write.FrameWriterFactory; +import org.apache.druid.frame.write.UnsupportedColumnTypeException; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; @@ -32,6 +33,7 @@ import org.apache.druid.query.ordering.StringComparators; import org.apache.druid.segment.Cursor; import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.filter.BoundFilter; import org.apache.druid.segment.filter.Filters; import org.joda.time.Interval; @@ -100,13 +102,18 @@ public static Filter buildFilter(@Nullable Filter filter, Interval interval) /** * Writes a {@link Cursor} to a sequence of {@link Frame}. This method iterates over the rows of the cursor, * and writes the columns to the frames. The iterable is lazy, and it traverses the required portion of the cursor - * as required + * as required. + *

+   * If the type is missing from the signature, the method throws an exception without advancing/modifying/closing the
+   * cursor.
    */
   public static Iterable<Frame> cursorToFramesIterable(
       final Cursor cursor,
       final FrameWriterFactory frameWriterFactory
   )
   {
+    throwIfColumnsHaveUnknownType(frameWriterFactory.signature());
+
     return () -> new Iterator<Frame>()
     {
       @Override
@@ -158,7 +165,19 @@ public static Sequence<Frame> cursorToFramesSequence(
       final FrameWriterFactory frameWriterFactory
   )
   {
     return Sequences.simple(cursorToFramesIterable(cursor, frameWriterFactory));
   }
+
+  /**
+   * Throws {@link UnsupportedColumnTypeException} if the row signature has columns with unknown types. This is used to
+   * pre-determine if the frames can be materialized as rows, without touching the resource generating the frames.
+   */
+  public static void throwIfColumnsHaveUnknownType(final RowSignature rowSignature)
+  {
+    for (int i = 0; i < rowSignature.size(); ++i) {
+      if (!rowSignature.getColumnType(i).isPresent()) {
+        throw new UnsupportedColumnTypeException(rowSignature.getColumnName(i), null);
+      }
+    }
+  }
 }
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java
index 7588848cf5be..b19b479c26d1 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java
@@ -816,6 +816,8 @@ public Optional<Sequence<FrameSignaturePair>> resultsAsFrames(
                                         ? FrameWriterUtils.replaceUnknownTypesWithNestedColumns(rowSignature)
                                         : rowSignature;
 
+    FrameCursorUtils.throwIfColumnsHaveUnknownType(modifiedRowSignature);
+
     FrameWriterFactory frameWriterFactory = FrameWriters.makeColumnBasedFrameWriterFactory(
         memoryAllocatorFactory,
         modifiedRowSignature,
diff --git a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
index 8527d551cf5c..17a2f8be956b 100644
--- a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
+++ b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
@@ -485,6 +485,8 @@ public Optional<Sequence<FrameSignaturePair>> resultsAsFrames(
     RowSignature modifiedRowSignature = useNestedForUnknownTypes
                                         ? FrameWriterUtils.replaceUnknownTypesWithNestedColumns(rowSignature)
                                         : rowSignature;
+    FrameCursorUtils.throwIfColumnsHaveUnknownType(modifiedRowSignature);
+
     FrameWriterFactory frameWriterFactory = FrameWriters.makeColumnBasedFrameWriterFactory(
         memoryAllocatorFactory,
         modifiedRowSignature,
diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java
index 02d07e255709..25a4284aa427 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java
@@ -569,6 +569,8 @@ public Optional<Sequence<FrameSignaturePair>> resultsAsFrames(
     RowSignature modifiedRowSignature = useNestedForUnknownTypes
                                         ? FrameWriterUtils.replaceUnknownTypesWithNestedColumns(rowSignature)
                                         : rowSignature;
+    FrameCursorUtils.throwIfColumnsHaveUnknownType(modifiedRowSignature);
+
     FrameWriterFactory frameWriterFactory = FrameWriters.makeColumnBasedFrameWriterFactory(
         memoryAllocatorFactory,
         rowSignature,
diff --git a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java
index 1d3b38b2fdbd..d49ce3909f71 100644
--- a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java
+++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java
@@ -746,6 +746,7 @@ private static <T, QueryType extends Query<T>> Optional<DataSource> materializeResultsAsFrames(
   {
     Optional<Sequence<FrameSignaturePair>> framesOptional;
 
+    boolean startedAccumulating = false;
     try {
       framesOptional = toolChest.resultsAsFrames(
           query,
@@ -760,6 +761,9 @@ private static <T, QueryType extends Query<T>> Optional<DataSource> materializeResultsAsFrames(
       Sequence<FrameSignaturePair> frames = framesOptional.get();
       List<FrameSignaturePair> frameSignaturePairs = new ArrayList<>();
+
+      startedAccumulating = true;
+
       frames.forEach(
           frame -> {
             limitAccumulator.addAndGet(frame.getFrame().numRows());
@@ -772,21 +776,29 @@ private static <T, QueryType extends Query<T>> Optional<DataSource> materializeResultsAsFrames(
           }
       );
       return Optional.of(new FrameBasedInlineDataSource(frameSignaturePairs, toolChest.resultArraySignature(query)));
-
-    }
-    catch (ResourceLimitExceededException e) {
-      throw e;
     }
     catch (UnsupportedColumnTypeException e) {
       subqueryStatsProvider.incrementSubqueriesFallingBackDueToUnsufficientTypeInfo();
       log.debug(e, "Type info in signature insufficient to materialize rows as frames.");
       return Optional.empty();
     }
+    catch (ResourceLimitExceededException e) {
+      throw e;
+    }
     catch (Exception e) {
-      subqueryStatsProvider.incrementSubqueriesFallingBackDueToUnknownReason();
-      log.debug(e, "Unable to materialize the results as frames due to an unhandleable exception "
-                   + "while conversion. Defaulting to materializing the results as rows");
-      return Optional.empty();
+      if (startedAccumulating) {
+        // If we have opened the resultSequence, we can't fall back safely as the resultSequence might hold some resources
+        // that we release on exception, and we need to throw the exception to disable the 'maxSubqueryBytes' configuration
+        throw DruidException.defensive()
+                            .build(
+                                e,
+                                "Unable to materialize the results as frames for estimating the byte footprint. "
+                                + "Please disable the 'maxSubqueryBytes' by setting it to 'disabled' in the query context or removing it altogether "
+                                + "from the query context and/or the server config."
+                            );
+      } else {
+        return Optional.empty();
+      }
     }
   }
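
Illustration (not part of the patch): the control flow above amounts to "validate before consuming". The self-contained Java sketch below models that contract with hypothetical names (Column, UnknownTypeException, materialize); it is not Druid code, only an outline of the strategy the patch implements.

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

public class FallbackSketch
{
  // Stand-in for a column whose type may be unknown (hypothetical, not a Druid type).
  record Column(String name, Optional<String> type) {}

  // Analogous to UnsupportedColumnTypeException: signals an unknown column type.
  static class UnknownTypeException extends RuntimeException
  {
    UnknownTypeException(String column)
    {
      super("Unknown type for column: " + column);
    }
  }

  // Fail fast on unknown types without touching the source of rows,
  // mirroring the role of FrameCursorUtils#throwIfColumnsHaveUnknownType.
  static void throwIfColumnsHaveUnknownType(List<Column> signature)
  {
    for (Column column : signature) {
      if (column.type().isEmpty()) {
        throw new UnknownTypeException(column.name());
      }
    }
  }

  // Mirrors the shape of materializeResultsAsFrames: return empty (fall back to
  // row-based limiting) only while the row supplier is untouched; once consumption
  // has started, propagate the error instead of falling back.
  static Optional<List<String>> materialize(List<Column> signature, Supplier<List<String>> rows)
  {
    boolean startedAccumulating = false;
    try {
      // 1. Pre-check: the supplier has not been opened yet, so falling back is still safe.
      throwIfColumnsHaveUnknownType(signature);

      // 2. Only now consume the (possibly resource-holding) source.
      startedAccumulating = true;
      return Optional.of(new ArrayList<>(rows.get()));
    }
    catch (UnknownTypeException e) {
      return Optional.empty(); // safe fallback: nothing was consumed
    }
    catch (RuntimeException e) {
      if (startedAccumulating) {
        throw e; // cannot fall back once the source has been partially consumed
      }
      return Optional.empty();
    }
  }
}

The ordering is the point: moving the type check ahead of any consumption keeps the fallback to row-based limiting safe, while errors raised after consumption begins must surface to the user rather than silently switching strategies.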