Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
LakshSingla committed Nov 29, 2023
1 parent a34bcde commit a4f476d
Showing 1 changed file with 59 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.TestBufferPool;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.context.ConcurrentResponseContext;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
Expand Down Expand Up @@ -259,6 +258,8 @@ private void setupGroupByFactory()
executorService = Execs.multiThreaded(3, "GroupByThreadPool[%d]");

final TestBufferPool bufferPool = TestBufferPool.offHeap(10_000_000, Integer.MAX_VALUE);
final TestBufferPool bufferPool2 = TestBufferPool.offHeap(10_000_000, Integer.MAX_VALUE);
final TestBufferPool bufferPool3 = TestBufferPool.offHeap(10_000_000, Integer.MAX_VALUE);

// TODO(laksh)
final TestBufferPool mergePool = TestBufferPool.offHeap(10_000_000, 2);
Expand Down Expand Up @@ -338,13 +339,12 @@ public String getFormatString()
final GroupingEngine tooSmallEngine = new GroupingEngine(
tooSmallDruidProcessingConfig,
configSupplier,
bufferPool,
bufferPool2,
TestHelper.makeJsonMapper(),
new ObjectMapper(new SmileFactory()),
NOOP_QUERYWATCHER
);


groupByFactory = new GroupByQueryRunnerFactory(
groupingEngine,
new GroupByQueryQueryToolChest(groupingEngine, mergePool)
Expand Down Expand Up @@ -404,21 +404,33 @@ public Sequence<ResultRow> run(QueryPlus<ResultRow> queryPlus, ResponseContext r
return Sequences
.simple(
ImmutableList.of(
theRunner.run(
queryPlus.withQuery(
queryPlus.getQuery().withOverriddenContext(
ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, true)
)
),
ConcurrentResponseContext.createEmpty()
Sequences.simple(
theRunner.run(
queryPlus.withQuery(
queryPlus.getQuery().withOverriddenContext(
ImmutableMap.of(
GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2,
true
)
)
),
responseContext
)
.toList()
),
theRunner2.run(
queryPlus.withQuery(
queryPlus.getQuery().withOverriddenContext(
ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, true)
)
),
ConcurrentResponseContext.createEmpty()
Sequences.simple(
theRunner2.run(
queryPlus.withQuery(
queryPlus.getQuery().withOverriddenContext(
ImmutableMap.of(
GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2,
true
)
)
),
responseContext
)
.toList()
)
)
)
Expand Down Expand Up @@ -487,19 +499,37 @@ public void testPartialLimitPushDownMergeForceAggs()
// one segment's results use limit push down, the other doesn't because of insufficient buffer capacity

QueryToolChest<ResultRow, GroupByQuery> toolChest = groupByFactory.getToolchest();
QueryRunner<ResultRow> theRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(
groupByFactory.mergeRunners(executorService, getRunner1())
),
QueryRunner<ResultRow> theRunner = new FinalizeResultsQueryRunner<ResultRow>(
(queryPlus, responseContext) -> {
return toolChest.mergeResults(
groupByFactory.mergeRunners(executorService, getRunner1())
).run(
queryPlus.withQuery(
queryPlus.getQuery().withOverriddenContext(
ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, true)
)
),
responseContext
);
},
(QueryToolChest) toolChest
);


QueryRunner<ResultRow> theRunner2 = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(
tooSmallGroupByFactory.mergeRunners(executorService, getRunner2())
),
(QueryToolChest) toolChest
QueryToolChest<ResultRow, GroupByQuery> tooSmalltoolChest = tooSmallGroupByFactory.getToolchest();
QueryRunner<ResultRow> theRunner2 = new FinalizeResultsQueryRunner<ResultRow>(
(queryPlus, responseContext) -> {
return tooSmalltoolChest.mergeResults(
tooSmallGroupByFactory.mergeRunners(executorService, getRunner2())
).run(
queryPlus.withQuery(
queryPlus.getQuery().withOverriddenContext(
ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, true)
)
),
responseContext
);
},
(QueryToolChest) tooSmalltoolChest
);

QueryRunner<ResultRow> theRunner3 = new FinalizeResultsQueryRunner<>(
Expand All @@ -512,22 +542,8 @@ public Sequence<ResultRow> run(QueryPlus<ResultRow> queryPlus, ResponseContext r
return Sequences
.simple(
ImmutableList.of(
theRunner.run(
queryPlus.withQuery(
queryPlus.getQuery().withOverriddenContext(
ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, true)
)
),
ConcurrentResponseContext.createEmpty()
),
theRunner2.run(
queryPlus.withQuery(
queryPlus.getQuery().withOverriddenContext(
ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, true)
)
),
ConcurrentResponseContext.createEmpty()
)
Sequences.simple(theRunner.run(queryPlus, responseContext).toList()),
Sequences.simple(theRunner2.run(queryPlus, responseContext).toList())
)
)
.flatMerge(Function.identity(), queryPlus.getQuery().getResultOrdering());
Expand Down

0 comments on commit a4f476d

Please sign in to comment.