From a4f476d46aa9a3467152dd0e259c12f6963561b7 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Wed, 29 Nov 2023 14:33:03 +0530 Subject: [PATCH] fix test --- ...ByLimitPushDownInsufficientBufferTest.java | 102 ++++++++++-------- 1 file changed, 59 insertions(+), 43 deletions(-) diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java index 7750fe92ba40..60c9736c4c34 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java @@ -53,7 +53,6 @@ import org.apache.druid.query.QueryWatcher; import org.apache.druid.query.TestBufferPool; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; -import org.apache.druid.query.context.ConcurrentResponseContext; import org.apache.druid.query.context.ResponseContext; import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.groupby.orderby.DefaultLimitSpec; @@ -259,6 +258,8 @@ private void setupGroupByFactory() executorService = Execs.multiThreaded(3, "GroupByThreadPool[%d]"); final TestBufferPool bufferPool = TestBufferPool.offHeap(10_000_000, Integer.MAX_VALUE); + final TestBufferPool bufferPool2 = TestBufferPool.offHeap(10_000_000, Integer.MAX_VALUE); + final TestBufferPool bufferPool3 = TestBufferPool.offHeap(10_000_000, Integer.MAX_VALUE); // TODO(laksh) final TestBufferPool mergePool = TestBufferPool.offHeap(10_000_000, 2); @@ -338,13 +339,12 @@ public String getFormatString() final GroupingEngine tooSmallEngine = new GroupingEngine( tooSmallDruidProcessingConfig, configSupplier, - bufferPool, + bufferPool2, TestHelper.makeJsonMapper(), new ObjectMapper(new SmileFactory()), NOOP_QUERYWATCHER ); - groupByFactory = new GroupByQueryRunnerFactory( groupingEngine, new GroupByQueryQueryToolChest(groupingEngine, mergePool) @@ -404,21 +404,33 @@ public Sequence run(QueryPlus queryPlus, ResponseContext r return Sequences .simple( ImmutableList.of( - theRunner.run( - queryPlus.withQuery( - queryPlus.getQuery().withOverriddenContext( - ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, true) - ) - ), - ConcurrentResponseContext.createEmpty() + Sequences.simple( + theRunner.run( + queryPlus.withQuery( + queryPlus.getQuery().withOverriddenContext( + ImmutableMap.of( + GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, + true + ) + ) + ), + responseContext + ) + .toList() ), - theRunner2.run( - queryPlus.withQuery( - queryPlus.getQuery().withOverriddenContext( - ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, true) - ) - ), - ConcurrentResponseContext.createEmpty() + Sequences.simple( + theRunner2.run( + queryPlus.withQuery( + queryPlus.getQuery().withOverriddenContext( + ImmutableMap.of( + GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, + true + ) + ) + ), + responseContext + ) + .toList() ) ) ) @@ -487,19 +499,37 @@ public void testPartialLimitPushDownMergeForceAggs() // one segment's results use limit push down, the other doesn't because of insufficient buffer capacity QueryToolChest toolChest = groupByFactory.getToolchest(); - QueryRunner theRunner = new FinalizeResultsQueryRunner<>( - toolChest.mergeResults( - groupByFactory.mergeRunners(executorService, getRunner1()) - ), + QueryRunner theRunner = new FinalizeResultsQueryRunner( + (queryPlus, responseContext) -> { + return toolChest.mergeResults( + groupByFactory.mergeRunners(executorService, getRunner1()) + ).run( + queryPlus.withQuery( + queryPlus.getQuery().withOverriddenContext( + ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, true) + ) + ), + responseContext + ); + }, (QueryToolChest) toolChest ); - - QueryRunner theRunner2 = new FinalizeResultsQueryRunner<>( - toolChest.mergeResults( - tooSmallGroupByFactory.mergeRunners(executorService, getRunner2()) - ), - (QueryToolChest) toolChest + QueryToolChest tooSmalltoolChest = tooSmallGroupByFactory.getToolchest(); + QueryRunner theRunner2 = new FinalizeResultsQueryRunner( + (queryPlus, responseContext) -> { + return tooSmalltoolChest.mergeResults( + tooSmallGroupByFactory.mergeRunners(executorService, getRunner2()) + ).run( + queryPlus.withQuery( + queryPlus.getQuery().withOverriddenContext( + ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, true) + ) + ), + responseContext + ); + }, + (QueryToolChest) tooSmalltoolChest ); QueryRunner theRunner3 = new FinalizeResultsQueryRunner<>( @@ -512,22 +542,8 @@ public Sequence run(QueryPlus queryPlus, ResponseContext r return Sequences .simple( ImmutableList.of( - theRunner.run( - queryPlus.withQuery( - queryPlus.getQuery().withOverriddenContext( - ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, true) - ) - ), - ConcurrentResponseContext.createEmpty() - ), - theRunner2.run( - queryPlus.withQuery( - queryPlus.getQuery().withOverriddenContext( - ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, true) - ) - ), - ConcurrentResponseContext.createEmpty() - ) + Sequences.simple(theRunner.run(queryPlus, responseContext).toList()), + Sequences.simple(theRunner2.run(queryPlus, responseContext).toList()) ) ) .flatMerge(Function.identity(), queryPlus.getQuery().getResultOrdering());