diff --git a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java index 9f21fe8e6874..c5f858ae1633 100644 --- a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java @@ -767,6 +767,32 @@ private static > Optional materializeR new ArenaMemoryAllocatorFactory(FRAME_SIZE), useNestedForUnknownTypeInSubquery ); + + if (!framesOptional.isPresent()) { + throw DruidException.defensive("Unable to materialize the results as frames. Defaulting to materializing the results as rows"); + } + + Sequence frames = framesOptional.get(); + List frameSignaturePairs = new ArrayList<>(); + frames.forEach( + frame -> { + limitAccumulator.addAndGet(frame.getFrame().numRows()); + if (memoryLimitAccumulator.addAndGet(frame.getFrame().numBytes()) >= memoryLimit) { + subqueryStatsProvider.incrementQueriesExceedingByteLimit(); + throw ResourceLimitExceededException.withMessage( + "Subquery generated results beyond maximum[%d] bytes", + memoryLimit + ); + + } + frameSignaturePairs.add(frame); + } + ); + return Optional.of(new FrameBasedInlineDataSource(frameSignaturePairs, toolChest.resultArraySignature(query))); + + } + catch (ResourceLimitExceededException e) { + throw e; } catch (UnsupportedColumnTypeException e) { subqueryStatsProvider.incrementSubqueriesFallingBackDueToUnsufficientTypeInfo(); @@ -779,29 +805,6 @@ private static > Optional materializeR + "while conversion. Defaulting to materializing the results as rows"); return Optional.empty(); } - - if (!framesOptional.isPresent()) { - log.debug("Unable to materialize the results as frames. Defaulting to materializing the results as rows"); - return Optional.empty(); - } - - Sequence frames = framesOptional.get(); - List frameSignaturePairs = new ArrayList<>(); - frames.forEach( - frame -> { - limitAccumulator.addAndGet(frame.getFrame().numRows()); - if (memoryLimitAccumulator.addAndGet(frame.getFrame().numBytes()) >= memoryLimit) { - subqueryStatsProvider.incrementQueriesExceedingByteLimit(); - throw ResourceLimitExceededException.withMessage( - "Subquery generated results beyond maximum[%d] bytes", - memoryLimit - ); - - } - frameSignaturePairs.add(frame); - } - ); - return Optional.of(new FrameBasedInlineDataSource(frameSignaturePairs, toolChest.resultArraySignature(query))); } /** diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java index 7b21210904a4..d39c9bf1388e 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java @@ -951,4 +951,90 @@ public void testSelfJoin() ) ); } + + @Test + public void testJoinWithSubqueries() + { + cannotVectorize(); + + List results = new ArrayList<>(ImmutableList.of( + new Object[]{"", NullHandling.defaultStringValue()}, + new Object[]{"10.1", NullHandling.defaultStringValue()}, + new Object[]{"2", NullHandling.defaultStringValue()}, + new Object[]{"1", NullHandling.defaultStringValue()}, + new Object[]{"def", NullHandling.defaultStringValue()}, + new Object[]{"abc", NullHandling.defaultStringValue()} + )); + + if (NullHandling.replaceWithDefault()) { + results.add(new Object[]{NullHandling.defaultStringValue(), NullHandling.defaultStringValue()}); + } + + + testQuery( + "SELECT a.dim1, b.dim2\n" + + "FROM (SELECT na.dim1 as dim1, nb.dim2 as dim2 FROM foo na LEFT JOIN foo2 nb ON na.dim1 = nb.dim1) a\n" + + "FULL OUTER JOIN\n" + + "(SELECT nc.dim1 as dim1, nd.dim2 as dim2 FROM foo nc LEFT JOIN foo2 nd ON nc.dim1 = nd.dim1) b\n" + + "ON a.dim1 = b.dim1", + queryContext, + ImmutableList.of( + newScanQueryBuilder() + .dataSource( + JoinDataSource.create( + JoinDataSource.create( + new TableDataSource("foo"), + new QueryDataSource( + newScanQueryBuilder() + .dataSource("foo2") + .columns("dim1") + .eternityInterval() + .build() + ), + "j0.", + "(\"dim1\" == \"j0.dim1\")", + JoinType.LEFT, + null, + ExprMacroTable.nil(), + CalciteTests.createJoinableFactoryWrapper() + ), + new QueryDataSource( + newScanQueryBuilder() + .dataSource( + JoinDataSource.create( + new TableDataSource("foo"), + new QueryDataSource( + newScanQueryBuilder() + .dataSource("foo2") + .columns("dim1", "dim2") + .eternityInterval() + .build() + ), + "j0.", + "(\"dim1\" == \"j0.dim1\")", + JoinType.LEFT, + null, + ExprMacroTable.nil(), + CalciteTests.createJoinableFactoryWrapper() + ) + ) + .columns("dim1", "j0.dim2") + .eternityInterval() + .build() + ), + "_j0.", + "(\"dim1\" == \"_j0.dim1\")", + JoinType.FULL, + null, + ExprMacroTable.nil(), + CalciteTests.createJoinableFactoryWrapper() + ) + ) + .columns("_j0.j0.dim2", "dim1") + .eternityInterval() + .build() + ), + results + ); + } }