Skip to content

Commit

Permalink
Fix the uncaught exceptions when materializing results as frames (#14970
Browse files Browse the repository at this point in the history
)

When materializing the results as frames, we defer the creation of the frames in ScanQueryQueryToolChest, which passes through the catch-all block reserved for catching cases when we don't have the complete row signature in the query (and falls back to the old code).
This PR aims to resolve it by adding the frame generation code to the try-catch block we have at the outer level.
  • Loading branch information
LakshSingla authored Sep 13, 2023
1 parent b7bb5ee commit 4c57504
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -767,6 +767,32 @@ private static <T, QueryType extends Query<T>> Optional<DataSource> materializeR
new ArenaMemoryAllocatorFactory(FRAME_SIZE),
useNestedForUnknownTypeInSubquery
);

if (!framesOptional.isPresent()) {
throw DruidException.defensive("Unable to materialize the results as frames. Defaulting to materializing the results as rows");
}

Sequence<FrameSignaturePair> frames = framesOptional.get();
List<FrameSignaturePair> frameSignaturePairs = new ArrayList<>();
frames.forEach(
frame -> {
limitAccumulator.addAndGet(frame.getFrame().numRows());
if (memoryLimitAccumulator.addAndGet(frame.getFrame().numBytes()) >= memoryLimit) {
subqueryStatsProvider.incrementQueriesExceedingByteLimit();
throw ResourceLimitExceededException.withMessage(
"Subquery generated results beyond maximum[%d] bytes",
memoryLimit
);

}
frameSignaturePairs.add(frame);
}
);
return Optional.of(new FrameBasedInlineDataSource(frameSignaturePairs, toolChest.resultArraySignature(query)));

}
catch (ResourceLimitExceededException e) {
throw e;
}
catch (UnsupportedColumnTypeException e) {
subqueryStatsProvider.incrementSubqueriesFallingBackDueToUnsufficientTypeInfo();
Expand All @@ -779,29 +805,6 @@ private static <T, QueryType extends Query<T>> Optional<DataSource> materializeR
+ "while conversion. Defaulting to materializing the results as rows");
return Optional.empty();
}

if (!framesOptional.isPresent()) {
log.debug("Unable to materialize the results as frames. Defaulting to materializing the results as rows");
return Optional.empty();
}

Sequence<FrameSignaturePair> frames = framesOptional.get();
List<FrameSignaturePair> frameSignaturePairs = new ArrayList<>();
frames.forEach(
frame -> {
limitAccumulator.addAndGet(frame.getFrame().numRows());
if (memoryLimitAccumulator.addAndGet(frame.getFrame().numBytes()) >= memoryLimit) {
subqueryStatsProvider.incrementQueriesExceedingByteLimit();
throw ResourceLimitExceededException.withMessage(
"Subquery generated results beyond maximum[%d] bytes",
memoryLimit
);

}
frameSignaturePairs.add(frame);
}
);
return Optional.of(new FrameBasedInlineDataSource(frameSignaturePairs, toolChest.resultArraySignature(query)));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -951,4 +951,90 @@ public void testSelfJoin()
)
);
}

@Test
public void testJoinWithSubqueries()
{
cannotVectorize();

List<Object[]> results = new ArrayList<>(ImmutableList.of(
new Object[]{"", NullHandling.defaultStringValue()},
new Object[]{"10.1", NullHandling.defaultStringValue()},
new Object[]{"2", NullHandling.defaultStringValue()},
new Object[]{"1", NullHandling.defaultStringValue()},
new Object[]{"def", NullHandling.defaultStringValue()},
new Object[]{"abc", NullHandling.defaultStringValue()}
));

if (NullHandling.replaceWithDefault()) {
results.add(new Object[]{NullHandling.defaultStringValue(), NullHandling.defaultStringValue()});
}


testQuery(
"SELECT a.dim1, b.dim2\n"
+ "FROM (SELECT na.dim1 as dim1, nb.dim2 as dim2 FROM foo na LEFT JOIN foo2 nb ON na.dim1 = nb.dim1) a\n"
+ "FULL OUTER JOIN\n"
+ "(SELECT nc.dim1 as dim1, nd.dim2 as dim2 FROM foo nc LEFT JOIN foo2 nd ON nc.dim1 = nd.dim1) b\n"
+ "ON a.dim1 = b.dim1",
queryContext,
ImmutableList.of(
newScanQueryBuilder()
.dataSource(
JoinDataSource.create(
JoinDataSource.create(
new TableDataSource("foo"),
new QueryDataSource(
newScanQueryBuilder()
.dataSource("foo2")
.columns("dim1")
.eternityInterval()
.build()
),
"j0.",
"(\"dim1\" == \"j0.dim1\")",
JoinType.LEFT,
null,
ExprMacroTable.nil(),
CalciteTests.createJoinableFactoryWrapper()
),
new QueryDataSource(
newScanQueryBuilder()
.dataSource(
JoinDataSource.create(
new TableDataSource("foo"),
new QueryDataSource(
newScanQueryBuilder()
.dataSource("foo2")
.columns("dim1", "dim2")
.eternityInterval()
.build()
),
"j0.",
"(\"dim1\" == \"j0.dim1\")",
JoinType.LEFT,
null,
ExprMacroTable.nil(),
CalciteTests.createJoinableFactoryWrapper()
)
)
.columns("dim1", "j0.dim2")
.eternityInterval()
.build()
),
"_j0.",
"(\"dim1\" == \"_j0.dim1\")",
JoinType.FULL,
null,
ExprMacroTable.nil(),
CalciteTests.createJoinableFactoryWrapper()
)
)
.columns("_j0.j0.dim2", "dim1")
.eternityInterval()
.build()
),
results
);
}
}

0 comments on commit 4c57504

Please sign in to comment.