From 09689e90c664fb21243d029c74a27b3b0af63330 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Tue, 28 Nov 2023 02:31:53 +0530 Subject: [PATCH] test cases fix 2 --- .../msq/exec/LoadedSegmentDataProvider.java | 2 +- .../context/ConcurrentResponseContext.java | 21 +- .../query/context/DefaultResponseContext.java | 22 ++- .../druid/query/context/ResponseContext.java | 3 + .../groupby/GroupByQueryQueryToolChest.java | 9 +- .../GroupByMergingQueryRunnerV2.java | 2 +- .../query/context/ResponseContextTest.java | 6 + ...ByLimitPushDownInsufficientBufferTest.java | 55 +++++- ...roupByLimitPushDownMultiNodeMergeTest.java | 186 ++++++++++++------ .../groupby/GroupByMultiSegmentTest.java | 3 +- .../query/groupby/GroupByQueryRunnerTest.java | 170 ++++++++++++---- .../druid/query/groupby/GroupByQueryTest.java | 3 +- .../druid/discovery/DataServerClientTest.java | 2 +- 13 files changed, 371 insertions(+), 113 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/LoadedSegmentDataProvider.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/LoadedSegmentDataProvider.java index d9d789e3d2ba..8a8368e97b42 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/LoadedSegmentDataProvider.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/LoadedSegmentDataProvider.java @@ -144,7 +144,7 @@ public Pair> fetchR final int numRetriesOnMissingSegments = preparedQuery.context().getNumRetriesOnMissingSegments(DEFAULT_NUM_TRIES); log.debug("Querying severs[%s] for segment[%s], retries:[%d]", servers, segmentDescriptor, numRetriesOnMissingSegments); - final ResponseContext responseContext = new DefaultResponseContext(); + final ResponseContext responseContext = DefaultResponseContext.createEmpty(); Pair> statusSequencePair; try { diff --git a/processing/src/main/java/org/apache/druid/query/context/ConcurrentResponseContext.java b/processing/src/main/java/org/apache/druid/query/context/ConcurrentResponseContext.java index fb58f4f96e2d..a652c2f0fb38 100644 --- a/processing/src/main/java/org/apache/druid/query/context/ConcurrentResponseContext.java +++ b/processing/src/main/java/org/apache/druid/query/context/ConcurrentResponseContext.java @@ -21,6 +21,7 @@ import org.apache.druid.guice.annotations.PublicApi; +import java.util.Collections; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -30,16 +31,34 @@ @PublicApi public class ConcurrentResponseContext extends ResponseContext { + + private final ConcurrentHashMap delegate; + + private ConcurrentResponseContext() + { + this(Collections.emptyMap()); + } + + private ConcurrentResponseContext(final Map delegate) + { + this.delegate = new ConcurrentHashMap<>(delegate); + } + public static ConcurrentResponseContext createEmpty() { return new ConcurrentResponseContext(); } - private final ConcurrentHashMap delegate = new ConcurrentHashMap<>(); @Override protected Map getDelegate() { return delegate; } + + @Override + public ResponseContext clone() + { + return new ConcurrentResponseContext(delegate); + } } diff --git a/processing/src/main/java/org/apache/druid/query/context/DefaultResponseContext.java b/processing/src/main/java/org/apache/druid/query/context/DefaultResponseContext.java index 5e8a1fdde4a3..6ad96317643b 100644 --- a/processing/src/main/java/org/apache/druid/query/context/DefaultResponseContext.java +++ b/processing/src/main/java/org/apache/druid/query/context/DefaultResponseContext.java @@ -21,6 +21,7 @@ import org.apache.druid.guice.annotations.PublicApi; +import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -30,16 +31,33 @@ @PublicApi public class DefaultResponseContext extends ResponseContext { + + private final HashMap delegate; + + private DefaultResponseContext() + { + this(Collections.emptyMap()); + } + + private DefaultResponseContext(final Map delegate) + { + this.delegate = new HashMap<>(delegate); + } + public static DefaultResponseContext createEmpty() { return new DefaultResponseContext(); } - private final HashMap delegate = new HashMap<>(); - @Override protected Map getDelegate() { return delegate; } + + @Override + public ResponseContext clone() + { + return new DefaultResponseContext(delegate); + } } diff --git a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java index 988dd5e4a3b1..42f74ce1eceb 100644 --- a/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java +++ b/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java @@ -557,6 +557,9 @@ public Key find(String name) protected abstract Map getDelegate(); + @Override + public abstract ResponseContext clone(); + public Map toMap() { return CollectionUtils.mapKeys(getDelegate(), k -> k.getName()); diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java index a39833893c90..a0ffac94f365 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -161,6 +161,7 @@ private Sequence initAndMergeGroupByResults( { // .. 1. Historicals, Broker -> Which is using localWalker // MerginV2 -> + ResponseContext clonedContext = context.clone(); Boolean usesGroupByMergingQueryRunner = (Boolean) query .getContext() .getOrDefault(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, true); @@ -171,18 +172,20 @@ private Sequence initAndMergeGroupByResults( queryConfig ); if (usesGroupByMergingQueryRunner) { - context.add(GroupByUtils.RESPONSE_KEY_GROUP_BY_MERGING_QUERY_RUNNER_BUFFERS, resource); + clonedContext.add(GroupByUtils.RESPONSE_KEY_GROUP_BY_MERGING_QUERY_RUNNER_BUFFERS, resource); } try { final Sequence mergedSequence = mergeGroupByResults( query, resource, runner, - context + clonedContext ); Closer closer = Closer.create(); closer.register(resource); - closer.register(() -> context.remove(GroupByUtils.RESPONSE_KEY_GROUP_BY_MERGING_QUERY_RUNNER_BUFFERS)); + closer.register(() -> + clonedContext.remove(GroupByUtils.RESPONSE_KEY_GROUP_BY_MERGING_QUERY_RUNNER_BUFFERS) + ); return Sequences.withBaggage(mergedSequence, closer); } catch (Exception e) { diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java index 6010542a5257..9a91790983c4 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java @@ -321,7 +321,7 @@ private List> getMergeBuffersHolder( throw DruidException.defensive( "Query needs %d merge buffers for GroupByMergingQueryRunnerV2, however only %d were provided.", numBuffers, - resource.getMergingQueryRunnerMergeBuffer() + resource.getNumMergingQueryRunnerMergeBuffers() ); } final List> mergeBufferHolders = new ArrayList<>(); diff --git a/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java b/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java index 03ed376f9385..1ee30247297b 100644 --- a/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java +++ b/processing/src/test/java/org/apache/druid/query/context/ResponseContextTest.java @@ -216,6 +216,12 @@ protected Map getDelegate() { return ImmutableMap.of(UNREGISTERED_KEY, "non-registered-key"); } + + @Override + public ResponseContext clone() + { + return this; + } }; ResponseContext.createEmpty().merge(ctx); } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java index b3101dfa47a8..7750fe92ba40 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java @@ -53,6 +53,7 @@ import org.apache.druid.query.QueryWatcher; import org.apache.druid.query.TestBufferPool; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.context.ConcurrentResponseContext; import org.apache.druid.query.context.ResponseContext; import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.groupby.orderby.DefaultLimitSpec; @@ -259,9 +260,9 @@ private void setupGroupByFactory() final TestBufferPool bufferPool = TestBufferPool.offHeap(10_000_000, Integer.MAX_VALUE); - // limit of 2 is required since we simulate both historical merge and broker merge in the same process + // TODO(laksh) final TestBufferPool mergePool = TestBufferPool.offHeap(10_000_000, 2); - // limit of 2 is required since we simulate both historical merge and broker merge in the same process + // TODO(laksh) final TestBufferPool tooSmallMergePool = TestBufferPool.onHeap(255, 2); resourceCloser.register(() -> { @@ -403,8 +404,22 @@ public Sequence run(QueryPlus queryPlus, ResponseContext r return Sequences .simple( ImmutableList.of( - theRunner.run(queryPlus, responseContext), - theRunner2.run(queryPlus, responseContext) + theRunner.run( + queryPlus.withQuery( + queryPlus.getQuery().withOverriddenContext( + ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, true) + ) + ), + ConcurrentResponseContext.createEmpty() + ), + theRunner2.run( + queryPlus.withQuery( + queryPlus.getQuery().withOverriddenContext( + ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, true) + ) + ), + ConcurrentResponseContext.createEmpty() + ) ) ) .flatMerge(Function.identity(), queryPlus.getQuery().getResultOrdering()); @@ -433,7 +448,12 @@ public Sequence run(QueryPlus queryPlus, ResponseContext r .setGranularity(Granularities.ALL) .build(); - Sequence queryResult = theRunner3.run(QueryPlus.wrap(query), ResponseContext.createEmpty()); + Sequence queryResult = theRunner3.run( + QueryPlus.wrap(query.withOverriddenContext( + ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, false) + )), + ResponseContext.createEmpty() + ); List results = queryResult.toList(); ResultRow expectedRow0 = GroupByQueryRunnerTestHelper.createExpectedRow( @@ -492,8 +512,22 @@ public Sequence run(QueryPlus queryPlus, ResponseContext r return Sequences .simple( ImmutableList.of( - theRunner.run(queryPlus, responseContext), - theRunner2.run(queryPlus, responseContext) + theRunner.run( + queryPlus.withQuery( + queryPlus.getQuery().withOverriddenContext( + ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, true) + ) + ), + ConcurrentResponseContext.createEmpty() + ), + theRunner2.run( + queryPlus.withQuery( + queryPlus.getQuery().withOverriddenContext( + ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, true) + ) + ), + ConcurrentResponseContext.createEmpty() + ) ) ) .flatMerge(Function.identity(), queryPlus.getQuery().getResultOrdering()); @@ -530,7 +564,12 @@ public Sequence run(QueryPlus queryPlus, ResponseContext r ) .build(); - Sequence queryResult = theRunner3.run(QueryPlus.wrap(query), ResponseContext.createEmpty()); + Sequence queryResult = theRunner3.run( + QueryPlus.wrap(query.withOverriddenContext( + ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, false) + )), + ResponseContext.createEmpty() + ); List results = queryResult.toList(); ResultRow expectedRow0 = GroupByQueryRunnerTestHelper.createExpectedRow( diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java index cafbc31cc0af..2a4b87e57c3c 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java @@ -55,6 +55,7 @@ import org.apache.druid.query.TestBufferPool; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.context.ConcurrentResponseContext; import org.apache.druid.query.context.ResponseContext; import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.dimension.ExtractionDimensionSpec; @@ -80,6 +81,7 @@ import org.apache.druid.segment.incremental.OnheapIncrementalIndex; import org.apache.druid.segment.virtual.ExpressionVirtualColumn; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; +import org.apache.druid.testing.InitializedNullHandlingTest; import org.apache.druid.timeline.SegmentId; import org.joda.time.DateTimeZone; import org.joda.time.Period; @@ -98,7 +100,7 @@ import java.util.concurrent.ExecutorService; import java.util.function.Function; -public class GroupByLimitPushDownMultiNodeMergeTest +public class GroupByLimitPushDownMultiNodeMergeTest extends InitializedNullHandlingTest { public static final ObjectMapper JSON_MAPPER; @@ -106,8 +108,9 @@ public class GroupByLimitPushDownMultiNodeMergeTest private static final IndexIO INDEX_IO; private File tmpDir; - private QueryRunnerFactory groupByFactory; - private QueryRunnerFactory groupByFactory2; + private QueryRunnerFactory groupByFactoryBroker; + private QueryRunnerFactory groupByFactoryHistorical; + private QueryRunnerFactory groupByFactoryHistorical2; private List incrementalIndices = new ArrayList<>(); private List groupByIndices = new ArrayList<>(); private ExecutorService executorService; @@ -530,15 +533,16 @@ private void setupGroupByFactory() executorService = Execs.multiThreaded(3, "GroupByThreadPool[%d]"); final TestBufferPool bufferPool = TestBufferPool.offHeap(10_000_000, Integer.MAX_VALUE); - final TestBufferPool mergePool = TestBufferPool.offHeap(10_000_000, 2); - // limit of 2 is required since we simulate both historical merge and broker merge in the same process - final TestBufferPool mergePool2 = TestBufferPool.offHeap(10_000_000, 2); + final TestBufferPool mergePoolBroker = TestBufferPool.offHeap(10_000_000, 1); + final TestBufferPool mergePoolHistorical = TestBufferPool.offHeap(10_000_000, 1); + final TestBufferPool mergePoolHistorical2 = TestBufferPool.offHeap(10_000_000, 1); resourceCloser.register(() -> { // Verify that all objects have been returned to the pools. Assert.assertEquals(0, bufferPool.getOutstandingObjectCount()); - Assert.assertEquals(0, mergePool.getOutstandingObjectCount()); - Assert.assertEquals(0, mergePool2.getOutstandingObjectCount()); + Assert.assertEquals(0, mergePoolBroker.getOutstandingObjectCount()); + Assert.assertEquals(0, mergePoolHistorical.getOutstandingObjectCount()); + Assert.assertEquals(0, mergePoolHistorical2.getOutstandingObjectCount()); }); final GroupByQueryConfig config = new GroupByQueryConfig() @@ -575,6 +579,7 @@ public String getFormatString() }; final Supplier configSupplier = Suppliers.ofInstance(config); + final GroupingEngine groupingEngine = new GroupingEngine( druidProcessingConfig, configSupplier, @@ -584,23 +589,19 @@ public String getFormatString() NOOP_QUERYWATCHER ); - final GroupingEngine groupingEngine2 = new GroupingEngine( - druidProcessingConfig, - configSupplier, - bufferPool, - TestHelper.makeJsonMapper(), - new ObjectMapper(new SmileFactory()), - NOOP_QUERYWATCHER + groupByFactoryBroker = new GroupByQueryRunnerFactory( + groupingEngine, + new GroupByQueryQueryToolChest(groupingEngine, mergePoolBroker) ); - groupByFactory = new GroupByQueryRunnerFactory( + groupByFactoryHistorical = new GroupByQueryRunnerFactory( groupingEngine, - new GroupByQueryQueryToolChest(groupingEngine, mergePool) + new GroupByQueryQueryToolChest(groupingEngine, mergePoolHistorical) ); - groupByFactory2 = new GroupByQueryRunnerFactory( - groupingEngine2, - new GroupByQueryQueryToolChest(groupingEngine2, mergePool2) + groupByFactoryHistorical2 = new GroupByQueryRunnerFactory( + groupingEngine, + new GroupByQueryQueryToolChest(groupingEngine, mergePoolHistorical2) ); } @@ -625,23 +626,26 @@ public void tearDown() throws Exception @Test public void testDescendingNumerics() { - QueryToolChest toolChest = groupByFactory.getToolchest(); + QueryToolChest toolChestHistorical = groupByFactoryHistorical.getToolchest(); QueryRunner theRunner = new FinalizeResultsQueryRunner<>( - toolChest.mergeResults( - groupByFactory.mergeRunners(executorService, getRunner1(2)) + toolChestHistorical.mergeResults( + groupByFactoryHistorical.mergeRunners(executorService, getRunner1(2)) ), - (QueryToolChest) toolChest + (QueryToolChest) toolChestHistorical ); + QueryToolChest toolChestHistorical2 = groupByFactoryHistorical2.getToolchest(); QueryRunner theRunner2 = new FinalizeResultsQueryRunner<>( - toolChest.mergeResults( - groupByFactory2.mergeRunners(executorService, getRunner2(3)) + toolChestHistorical2.mergeResults( + groupByFactoryHistorical2.mergeRunners(executorService, getRunner2(3)) ), - (QueryToolChest) toolChest + (QueryToolChest) toolChestHistorical2 ); + QueryToolChest toolChestBroker = groupByFactoryHistorical.getToolchest(); + QueryRunner finalRunner = new FinalizeResultsQueryRunner<>( - toolChest.mergeResults( + toolChestBroker.mergeResults( new QueryRunner() { @Override @@ -650,15 +654,29 @@ public Sequence run(QueryPlus queryPlus, ResponseContext r return Sequences .simple( ImmutableList.of( - theRunner.run(queryPlus, responseContext), - theRunner2.run(queryPlus, responseContext) + theRunner.run( + queryPlus.withQuery( + queryPlus.getQuery().withOverriddenContext( + ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, true) + ) + ), + ConcurrentResponseContext.createEmpty() + ), + theRunner2.run( + queryPlus.withQuery( + queryPlus.getQuery().withOverriddenContext( + ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, true) + ) + ), + ConcurrentResponseContext.createEmpty() + ) ) ) .flatMerge(Function.identity(), queryPlus.getQuery().getResultOrdering()); } } ), - (QueryToolChest) toolChest + (QueryToolChest) toolChestBroker ); QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec( @@ -714,7 +732,12 @@ public Sequence run(QueryPlus queryPlus, ResponseContext r .setGranularity(Granularities.ALL) .build(); - Sequence queryResult = finalRunner.run(QueryPlus.wrap(query), ResponseContext.createEmpty()); + Sequence queryResult = finalRunner.run( + QueryPlus.wrap(query.withOverriddenContext( + ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, false) + )), + ResponseContext.createEmpty() + ); List results = queryResult.toList(); ResultRow expectedRow0 = GroupByQueryRunnerTestHelper.createExpectedRow( @@ -762,23 +785,26 @@ public void testPartialLimitPushDownMerge() { // one segment's results use limit push down, the other doesn't because of insufficient buffer capacity - QueryToolChest toolChest = groupByFactory.getToolchest(); + QueryToolChest toolChestHistorical = groupByFactoryHistorical.getToolchest(); QueryRunner theRunner = new FinalizeResultsQueryRunner<>( - toolChest.mergeResults( - groupByFactory.mergeRunners(executorService, getRunner1(0)) + toolChestHistorical.mergeResults( + groupByFactoryHistorical.mergeRunners(executorService, getRunner1(0)) ), - (QueryToolChest) toolChest + (QueryToolChest) toolChestHistorical ); + QueryToolChest toolChestHistorical2 = groupByFactoryHistorical2.getToolchest(); QueryRunner theRunner2 = new FinalizeResultsQueryRunner<>( - toolChest.mergeResults( - groupByFactory2.mergeRunners(executorService, getRunner2(1)) + toolChestHistorical2.mergeResults( + groupByFactoryHistorical2.mergeRunners(executorService, getRunner2(1)) ), - (QueryToolChest) toolChest + (QueryToolChest) toolChestHistorical2 ); + QueryToolChest toolchestBroker = groupByFactoryBroker.getToolchest(); + QueryRunner finalRunner = new FinalizeResultsQueryRunner<>( - toolChest.mergeResults( + toolchestBroker.mergeResults( new QueryRunner() { @Override @@ -787,15 +813,29 @@ public Sequence run(QueryPlus queryPlus, ResponseContext r return Sequences .simple( ImmutableList.of( - theRunner.run(queryPlus, responseContext), - theRunner2.run(queryPlus, responseContext) + theRunner.run( + queryPlus.withQuery( + queryPlus.getQuery().withOverriddenContext( + ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, true) + ) + ), + ConcurrentResponseContext.createEmpty() + ), + theRunner2.run( + queryPlus.withQuery( + queryPlus.getQuery().withOverriddenContext( + ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, true) + ) + ), + ConcurrentResponseContext.createEmpty() + ) ) ) .flatMerge(Function.identity(), queryPlus.getQuery().getResultOrdering()); } } ), - (QueryToolChest) toolChest + (QueryToolChest) toolchestBroker ); QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec( @@ -839,7 +879,12 @@ public Sequence run(QueryPlus queryPlus, ResponseContext r .setGranularity(Granularities.ALL) .build(); - Sequence queryResult = finalRunner.run(QueryPlus.wrap(query), ResponseContext.createEmpty()); + Sequence queryResult = finalRunner.run( + QueryPlus.wrap(query.withOverriddenContext( + ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, false) + )), + ResponseContext.createEmpty() + ); List results = queryResult.toList(); ResultRow expectedRow0 = GroupByQueryRunnerTestHelper.createExpectedRow( @@ -903,23 +948,25 @@ public void testForcePushLimitDownAccuracyWhenSortHasNonGroupingFields() private List testForcePushLimitDownAccuracyWhenSortHasNonGroupingFieldsHelper(Map context) { - QueryToolChest toolChest = groupByFactory.getToolchest(); + QueryToolChest toolChestHistorical = groupByFactoryHistorical.getToolchest(); QueryRunner theRunner = new FinalizeResultsQueryRunner<>( - toolChest.mergeResults( - groupByFactory.mergeRunners(executorService, getRunner1(4)) + toolChestHistorical.mergeResults( + groupByFactoryHistorical.mergeRunners(executorService, getRunner1(4)) ), - (QueryToolChest) toolChest + (QueryToolChest) toolChestHistorical ); + QueryToolChest toolChestHistorical2 = groupByFactoryHistorical2.getToolchest(); QueryRunner theRunner2 = new FinalizeResultsQueryRunner<>( - toolChest.mergeResults( - groupByFactory2.mergeRunners(executorService, getRunner2(5)) + toolChestHistorical2.mergeResults( + groupByFactoryHistorical2.mergeRunners(executorService, getRunner2(5)) ), - (QueryToolChest) toolChest + (QueryToolChest) toolChestHistorical2 ); + QueryToolChest toolchestBroker = groupByFactoryHistorical2.getToolchest(); QueryRunner finalRunner = new FinalizeResultsQueryRunner<>( - toolChest.mergeResults( + toolchestBroker.mergeResults( new QueryRunner() { @Override @@ -928,15 +975,29 @@ public Sequence run(QueryPlus queryPlus, ResponseContext r return Sequences .simple( ImmutableList.of( - theRunner.run(queryPlus, responseContext), - theRunner2.run(queryPlus, responseContext) + theRunner.run( + queryPlus.withQuery( + queryPlus.getQuery().withOverriddenContext( + ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, true) + ) + ), + responseContext + ), + theRunner2.run( + queryPlus.withQuery( + queryPlus.getQuery().withOverriddenContext( + ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, true) + ) + ), + responseContext + ) ) ) .flatMerge(Function.identity(), queryPlus.getQuery().getResultOrdering()); } } ), - (QueryToolChest) toolChest + (QueryToolChest) toolchestBroker ); QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec( @@ -963,7 +1024,12 @@ public Sequence run(QueryPlus queryPlus, ResponseContext r .setGranularity(Granularities.ALL) .build(); - Sequence queryResult = finalRunner.run(QueryPlus.wrap(query), ResponseContext.createEmpty()); + Sequence queryResult = finalRunner.run( + QueryPlus.wrap(query.withOverriddenContext( + ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, false) + )), + ResponseContext.createEmpty() + ); return queryResult.toList(); } @@ -972,11 +1038,11 @@ private List> getRunner1(int qIndexNumber) List> runners = new ArrayList<>(); QueryableIndex index = groupByIndices.get(qIndexNumber); QueryRunner runner = makeQueryRunner( - groupByFactory, + groupByFactoryHistorical, SegmentId.dummy(index.toString()), new QueryableIndexSegment(index, SegmentId.dummy(index.toString())) ); - runners.add(groupByFactory.getToolchest().preMergeQueryDecoration(runner)); + runners.add(groupByFactoryHistorical.getToolchest().preMergeQueryDecoration(runner)); return runners; } @@ -985,11 +1051,11 @@ private List> getRunner2(int qIndexNumber) List> runners = new ArrayList<>(); QueryableIndex index2 = groupByIndices.get(qIndexNumber); QueryRunner tooSmallRunner = makeQueryRunner( - groupByFactory2, + groupByFactoryHistorical2, SegmentId.dummy(index2.toString()), new QueryableIndexSegment(index2, SegmentId.dummy(index2.toString())) ); - runners.add(groupByFactory2.getToolchest().preMergeQueryDecoration(tooSmallRunner)); + runners.add(groupByFactoryHistorical2.getToolchest().preMergeQueryDecoration(tooSmallRunner)); return runners; } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java index 783cb3a3ed7f..bab1bfe04f1b 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java @@ -69,6 +69,7 @@ import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.incremental.OnheapIncrementalIndex; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; +import org.apache.druid.testing.InitializedNullHandlingTest; import org.apache.druid.timeline.SegmentId; import org.junit.After; import org.junit.Assert; @@ -84,7 +85,7 @@ import java.util.Map; import java.util.concurrent.ExecutorService; -public class GroupByMultiSegmentTest +public class GroupByMultiSegmentTest extends InitializedNullHandlingTest { public static final ObjectMapper JSON_MAPPER; diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index 5a4e34101bab..d7b7faec1d87 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -361,6 +361,8 @@ public static GroupByQueryRunnerFactory makeQueryRunnerFactory( ); final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest( groupingEngine, + () -> config, + DefaultGroupByQueryMetricsFactory.instance(), bufferPools.getMergePool() ); return new GroupByQueryRunnerFactory(groupingEngine, toolChest); @@ -3745,7 +3747,10 @@ public Sequence run(QueryPlus queryPlus, ResponseContext r return new MergeSequence( queryPlus.getQuery().getResultOrdering(), Sequences.simple( - Arrays.asList(runner.run(queryPlus1, responseContext), runner.run(queryPlus2, responseContext)) + Arrays.asList( + Sequences.simple(runner.run(queryPlus1, responseContext).toList()), + Sequences.simple(runner.run(queryPlus2, responseContext).toList()) + ) ) ); } @@ -3764,7 +3769,6 @@ public Sequence run(QueryPlus queryPlus, ResponseContext r makeRow(fullQuery, "2011-04-01", "alias", "travel", "rows", 2L, "idx", 243L) ); - ResponseContext context = ResponseContext.createEmpty(); TestHelper.assertExpectedObjects(expectedResults, mergedRunner.run(QueryPlus.wrap(fullQuery)), "merged"); List allGranExpectedResults = Arrays.asList( @@ -4091,7 +4095,10 @@ public Sequence run(QueryPlus queryPlus, ResponseContext r return new MergeSequence( queryPlus.getQuery().getResultOrdering(), Sequences.simple( - Arrays.asList(runner.run(queryPlus1, responseContext), runner.run(queryPlus2, responseContext)) + Arrays.asList( + Sequences.simple(runner.run(queryPlus1, responseContext).toList()), + Sequences.simple(runner.run(queryPlus2, responseContext).toList()) + ) ) ); } @@ -4852,7 +4859,10 @@ public Sequence run(QueryPlus queryPlus, ResponseContext r return new MergeSequence( queryPlus.getQuery().getResultOrdering(), Sequences.simple( - Arrays.asList(runner.run(queryPlus1, responseContext), runner.run(queryPlus2, responseContext)) + Arrays.asList( + Sequences.simple(runner.run(queryPlus1, responseContext).toList()), + Sequences.simple(runner.run(queryPlus2, responseContext).toList()) + ) ) ); } @@ -5294,7 +5304,10 @@ public Sequence run(QueryPlus queryPlus, ResponseContext r return new MergeSequence( queryPlus.getQuery().getResultOrdering(), Sequences.simple( - Arrays.asList(runner.run(queryPlus1, responseContext), runner.run(queryPlus2, responseContext)) + Arrays.asList( + Sequences.simple(runner.run(queryPlus1, responseContext).toList()), + Sequences.simple(runner.run(queryPlus2, responseContext).toList()) + ) ) ); } @@ -5375,22 +5388,38 @@ public Sequence run(QueryPlus queryPlus, ResponseContext r return new MergeSequence( queryPlus.getQuery().getResultOrdering(), Sequences.simple( - Arrays.asList(runner.run(queryPlus1, responseContext), runner.run(queryPlus2, responseContext)) + Arrays.asList( + Sequences.simple(runner.run(queryPlus1, responseContext).toList()), + Sequences.simple(runner.run(queryPlus2, responseContext).toList()) + ) ) ); } } ); - ResponseContext context = ResponseContext.createEmpty(); // add an extra layer of merging, simulate broker forwarding query to historical TestHelper.assertExpectedObjects( expectedResults, factory.getToolchest().postMergeQueryDecoration( factory.getToolchest().mergeResults( - factory.getToolchest().preMergeQueryDecoration(mergedRunner) + (queryPlus, responseContext) -> + factory.getToolchest() + .preMergeQueryDecoration(mergedRunner) + .run( + queryPlus.withQuery( + queryPlus.getQuery().withOverriddenContext( + ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, true) + ) + ), + responseContext + ) + ) + ).run(QueryPlus.wrap( + query.withOverriddenContext( + ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, false) ) - ).run(QueryPlus.wrap(query)), + )), "merged" ); @@ -5398,9 +5427,23 @@ public Sequence run(QueryPlus queryPlus, ResponseContext r expectedResults, factory.getToolchest().postMergeQueryDecoration( factory.getToolchest().mergeResults( - factory.getToolchest().preMergeQueryDecoration(mergedRunner) + (queryPlus, responseContext) -> + factory.getToolchest() + .preMergeQueryDecoration(mergedRunner) + .run( + queryPlus.withQuery( + queryPlus.getQuery().withOverriddenContext( + ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, true) + ) + ), + responseContext + ) ) - ).run(QueryPlus.wrap(expressionQuery)), + ).run(QueryPlus.wrap( + expressionQuery.withOverriddenContext( + ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, false) + ) + )), "merged" ); } @@ -11098,15 +11141,14 @@ public void testMergeLimitPushDownResultsWithLongDimensionNotInLimitSpec() queryPlus3.getQuery().getResultOrdering(), Sequences.simple( Arrays.asList( - runner.run(queryPlus1, responseContext1), - runner.run(queryPlus2, responseContext1) + Sequences.simple(runner.run(queryPlus1, responseContext1).toList()), + Sequences.simple(runner.run(queryPlus2, responseContext1).toList()) ) ) ) - ).run(queryPlus, responseContext); + ).run(queryPlus, ResponseContext.createEmpty()); } ); - Map context = new HashMap<>(); List allGranExpectedResults = Arrays.asList( makeRow(allGranQuery, "2011-04-02", "qualityLen", 4L, "rows", 2L), makeRow(allGranQuery, "2011-04-02", "qualityLen", 6L, "rows", 4L), @@ -11166,12 +11208,19 @@ public Sequence run(QueryPlus queryPlus, ResponseContext r queryPlus3.getQuery().getResultOrdering(), Sequences.simple( Arrays.asList( - runner.run(queryPlus1, responseContext1), - runner.run(queryPlus2, responseContext1) + Sequences.simple(runner.run(queryPlus1, responseContext1).toList()), + Sequences.simple(runner.run(queryPlus2, responseContext1).toList()) ) ) ) - ).run(queryPlus, responseContext); + ).run( + queryPlus.withQuery( + queryPlus.getQuery().withOverriddenContext( + ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, true) + ) + ), + responseContext + ); } } ); @@ -11186,7 +11235,9 @@ public Sequence run(QueryPlus queryPlus, ResponseContext r TestHelper.assertExpectedObjects( allGranExpectedResults, - mergedRunner.run(QueryPlus.wrap(allGranQuery)), + mergedRunner.run(QueryPlus.wrap(allGranQuery.withOverriddenContext( + ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, false) + ))), "merged" ); } @@ -11233,12 +11284,19 @@ public Sequence run(QueryPlus queryPlus, ResponseContext r queryPlus3.getQuery().getResultOrdering(), Sequences.simple( Arrays.asList( - runner.run(queryPlus1, responseContext1), - runner.run(queryPlus2, responseContext1) + Sequences.simple(runner.run(queryPlus1, responseContext1).toList()), + Sequences.simple(runner.run(queryPlus2, responseContext1).toList()) ) ) ) - ).run(queryPlus, responseContext); + ).run( + queryPlus.withQuery( + queryPlus.getQuery().withOverriddenContext( + ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, true) + ) + ), + responseContext + ); } } ); @@ -11251,7 +11309,12 @@ public Sequence run(QueryPlus queryPlus, ResponseContext r makeRow(allGranQuery, "2011-04-02", "alias", "travel", "rows", 2L, "idx", 243L) ); - Iterable results = mergedRunner.run(QueryPlus.wrap(allGranQuery)).toList(); + Iterable results = + mergedRunner.run( + QueryPlus.wrap(allGranQuery.withOverriddenContext( + ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, false) + )) + ).toList(); TestHelper.assertExpectedObjects(allGranExpectedResults, results, "merged"); } @@ -11293,17 +11356,31 @@ public Sequence run(QueryPlus queryPlus, ResponseContext r ) ); + Object groupByResource = responseContext.remove(GroupByUtils.RESPONSE_KEY_GROUP_BY_MERGING_QUERY_RUNNER_BUFFERS); + return factory.getToolchest().mergeResults( (queryPlus3, responseContext1) -> new MergeSequence<>( queryPlus3.getQuery().getResultOrdering(), Sequences.simple( Arrays.asList( - runner.run(queryPlus1, responseContext1), - runner.run(queryPlus2, responseContext1) + Sequences.simple(runner.run(queryPlus1, responseContext1).toList()), + Sequences.simple(runner.run(queryPlus2, responseContext1).toList()) ) ) ) - ).run(queryPlus, responseContext); + ).run( + queryPlus.withQuery( + queryPlus.getQuery().withOverriddenContext( + ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, true) + ) + ), + responseContext + ).withBaggage(() -> { + responseContext.remove(GroupByUtils.RESPONSE_KEY_GROUP_BY_MERGING_QUERY_RUNNER_BUFFERS); + if (groupByResource != null) { + responseContext.add(GroupByUtils.RESPONSE_KEY_GROUP_BY_MERGING_QUERY_RUNNER_BUFFERS, groupByResource); + } + }); } } ); @@ -11365,12 +11442,19 @@ public Sequence run(QueryPlus queryPlus, ResponseContext r queryPlus3.getQuery().getResultOrdering(), Sequences.simple( Arrays.asList( - runner.run(queryPlus1, responseContext1), - runner.run(queryPlus2, responseContext1) + Sequences.simple(runner.run(queryPlus1, responseContext1).toList()), + Sequences.simple(runner.run(queryPlus2, responseContext1).toList()) ) ) ) - ).run(queryPlus, responseContext); + ).run( + queryPlus.withQuery( + queryPlus.getQuery().withOverriddenContext( + ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, true) + ) + ), + responseContext + ); } } ); @@ -11383,7 +11467,13 @@ public Sequence run(QueryPlus queryPlus, ResponseContext r makeRow(allGranQuery, "2011-04-02", "alias", "premium", "market", "spot", "rows", 2L, "idx", 257L) ); - Iterable results = mergedRunner.run(QueryPlus.wrap(allGranQuery)).toList(); + Iterable results = + mergedRunner.run( + QueryPlus.wrap(allGranQuery.withOverriddenContext( + ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, false) + )) + ) + .toList(); TestHelper.assertExpectedObjects(allGranExpectedResults, results, "merged"); } @@ -11447,12 +11537,19 @@ public Sequence run(QueryPlus queryPlus, ResponseContext r queryPlus3.getQuery().getResultOrdering(), Sequences.simple( Arrays.asList( - runner.run(queryPlus1, responseContext1), - runner.run(queryPlus2, responseContext1) + Sequences.simple(runner.run(queryPlus1, responseContext1).toList()), + Sequences.simple(runner.run(queryPlus2, responseContext1).toList()) ) ) ) - ).run(queryPlus, responseContext); + ).run( + queryPlus.withQuery( + queryPlus.getQuery().withOverriddenContext( + ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, true) + ) + ), + responseContext + ); } } ); @@ -11465,7 +11562,12 @@ public Sequence run(QueryPlus queryPlus, ResponseContext r makeRow(allGranQuery, "2011-04-02", "alias", "premium", "market", "spot", "rows", 2L, "idx", 257L) ); - Iterable results = mergedRunner.run(QueryPlus.wrap(allGranQuery)).toList(); + Iterable results = + mergedRunner.run( + QueryPlus.wrap(allGranQuery.withOverriddenContext( + ImmutableMap.of(GroupByUtils.CTX_KEY_RUNNER_MERGES_USING_GROUP_BY_MERGING_QUERY_RUNNER_V2, false) + )) + ).toList(); TestHelper.assertExpectedObjects(allGranExpectedResults, results, "merged"); } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryTest.java index b80d715d0041..22dabb34cdb7 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryTest.java @@ -44,6 +44,7 @@ import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.data.ComparableStringArray; import org.apache.druid.segment.virtual.ExpressionVirtualColumn; +import org.apache.druid.testing.InitializedNullHandlingTest; import org.junit.Assert; import org.junit.Test; @@ -51,7 +52,7 @@ import java.util.Collections; import java.util.List; -public class GroupByQueryTest +public class GroupByQueryTest extends InitializedNullHandlingTest { private static final ObjectMapper JSON_MAPPER = TestHelper.makeJsonMapper(); diff --git a/server/src/test/java/org/apache/druid/discovery/DataServerClientTest.java b/server/src/test/java/org/apache/druid/discovery/DataServerClientTest.java index 7be5a13474d5..2638226a24f1 100644 --- a/server/src/test/java/org/apache/druid/discovery/DataServerClientTest.java +++ b/server/src/test/java/org/apache/druid/discovery/DataServerClientTest.java @@ -106,7 +106,7 @@ public void testFetchSegmentFromDataServer() throws JsonProcessingException jsonMapper.writeValueAsBytes(Collections.singletonList(scanResultValue)) ); - ResponseContext responseContext = new DefaultResponseContext(); + ResponseContext responseContext = DefaultResponseContext.createEmpty(); Sequence result = target.run( query, responseContext,