diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java index 40b114511c28..bc449d141203 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java @@ -30,6 +30,8 @@ import org.apache.druid.msq.input.table.TableInputSpec; import org.apache.druid.msq.kernel.QueryDefinition; import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig; +import org.apache.druid.msq.querykit.QueryKit; +import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.server.DruidNode; /** @@ -100,4 +102,10 @@ WorkerManager newWorkerManager( * Client for communicating with workers. */ WorkerClient newWorkerClient(); + + /** + * Default target partitions per worker for {@link QueryKit#makeQueryDefinition}. Can be overridden using + * {@link MultiStageQueryContext#CTX_TARGET_PARTITIONS_PER_WORKER}. + */ + int defaultTargetPartitionsPerWorker(); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 1ab7460156dc..4b63d85cda7b 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -563,11 +563,16 @@ private QueryDefinition initializeQueryDefAndState(final Closer closer) this.netClient = new ExceptionWrappingWorkerClient(context.newWorkerClient()); closer.register(netClient); + final QueryContext queryContext = querySpec.getQuery().context(); final QueryDefinition queryDef = makeQueryDefinition( queryId(), makeQueryControllerToolKit(), querySpec, context.jsonMapper(), + MultiStageQueryContext.getTargetPartitionsPerWorkerWithDefault( + queryContext, + context.defaultTargetPartitionsPerWorker() + ), resultsContext ); @@ -612,7 +617,7 @@ private QueryDefinition initializeQueryDefAndState(final Closer closer) ); } - final long maxParseExceptions = MultiStageQueryContext.getMaxParseExceptions(querySpec.getQuery().context()); + final long maxParseExceptions = MultiStageQueryContext.getMaxParseExceptions(queryContext); this.faultsExceededChecker = new FaultsExceededChecker( ImmutableMap.of(CannotParseExternalDataFault.CODE, maxParseExceptions) ); @@ -624,7 +629,7 @@ private QueryDefinition initializeQueryDefAndState(final Closer closer) stageDefinition.getId().getStageNumber(), finalizeClusterStatisticsMergeMode( stageDefinition, - MultiStageQueryContext.getClusterStatisticsMergeMode(querySpec.getQuery().context()) + MultiStageQueryContext.getClusterStatisticsMergeMode(queryContext) ) ) ); @@ -1718,17 +1723,18 @@ private static QueryDefinition makeQueryDefinition( @SuppressWarnings("rawtypes") final QueryKit toolKit, final MSQSpec querySpec, final ObjectMapper jsonMapper, + final int targetPartitionsPerWorker, final ResultsContext resultsContext ) { final MSQTuningConfig tuningConfig = querySpec.getTuningConfig(); final ColumnMappings columnMappings = querySpec.getColumnMappings(); final Query queryToPlan; - final ShuffleSpecFactory shuffleSpecFactory; + final ShuffleSpecFactory resultShuffleSpecFactory; if (MSQControllerTask.isIngestion(querySpec)) { - shuffleSpecFactory = querySpec.getDestination() - .getShuffleSpecFactory(tuningConfig.getRowsPerSegment()); + resultShuffleSpecFactory = querySpec.getDestination() + .getShuffleSpecFactory(tuningConfig.getRowsPerSegment()); if (!columnMappings.hasUniqueOutputColumnNames()) { // We do not expect to hit this case in production, because the SQL validator checks that column names @@ -1752,7 +1758,7 @@ private static QueryDefinition makeQueryDefinition( queryToPlan = querySpec.getQuery(); } } else { - shuffleSpecFactory = + resultShuffleSpecFactory = querySpec.getDestination() .getShuffleSpecFactory(MultiStageQueryContext.getRowsPerPage(querySpec.getQuery().context())); queryToPlan = querySpec.getQuery(); @@ -1765,8 +1771,9 @@ private static QueryDefinition makeQueryDefinition( queryId, queryToPlan, toolKit, - shuffleSpecFactory, + resultShuffleSpecFactory, tuningConfig.getMaxNumWorkers(), + targetPartitionsPerWorker, 0 ); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java index 1037aa6c2af0..e60f1c5c9622 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java @@ -74,6 +74,7 @@ public class IndexerControllerContext implements ControllerContext private final ServiceClientFactory clientFactory; private final OverlordClient overlordClient; private final ServiceMetricEvent.Builder metricBuilder; + private final MemoryIntrospector memoryIntrospector; public IndexerControllerContext( final MSQControllerTask task, @@ -89,6 +90,7 @@ public IndexerControllerContext( this.clientFactory = clientFactory; this.overlordClient = overlordClient; this.metricBuilder = new ServiceMetricEvent.Builder(); + this.memoryIntrospector = injector.getInstance(MemoryIntrospector.class); IndexTaskUtils.setTaskDimensions(metricBuilder, task); } @@ -98,7 +100,6 @@ public ControllerQueryKernelConfig queryKernelConfig( final QueryDefinition queryDef ) { - final MemoryIntrospector memoryIntrospector = injector.getInstance(MemoryIntrospector.class); final ControllerMemoryParameters memoryParameters = ControllerMemoryParameters.createProductionInstance( memoryIntrospector, @@ -200,6 +201,14 @@ public WorkerManager newWorkerManager( ); } + @Override + public int defaultTargetPartitionsPerWorker() + { + // Assume tasks are symmetric: workers have the same number of processors available as a controller. + // Create one partition per processor per task, for maximum parallelism. + return memoryIntrospector.numProcessorsInJvm(); + } + /** * Helper method for {@link #queryKernelConfig(MSQSpec, QueryDefinition)}. Also used in tests. */ diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java index e6ddb4d723dc..15fe6263ed83 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java @@ -135,6 +135,7 @@ public class DataSourcePlan * @param maxWorkerCount maximum number of workers for subqueries * @param minStageNumber starting stage number for subqueries * @param broadcast whether the plan should broadcast data for this datasource + * @param targetPartitionsPerWorker preferred number of partitions per worker for subqueries */ @SuppressWarnings("rawtypes") public static DataSourcePlan forDataSource( @@ -146,6 +147,7 @@ public static DataSourcePlan forDataSource( @Nullable DimFilter filter, @Nullable Set filterFields, final int maxWorkerCount, + final int targetPartitionsPerWorker, final int minStageNumber, final boolean broadcast ) @@ -186,6 +188,7 @@ public static DataSourcePlan forDataSource( (FilteredDataSource) dataSource, querySegmentSpec, maxWorkerCount, + targetPartitionsPerWorker, minStageNumber, broadcast ); @@ -197,6 +200,7 @@ public static DataSourcePlan forDataSource( (UnnestDataSource) dataSource, querySegmentSpec, maxWorkerCount, + targetPartitionsPerWorker, minStageNumber, broadcast ); @@ -207,6 +211,7 @@ public static DataSourcePlan forDataSource( queryId, (QueryDataSource) dataSource, maxWorkerCount, + targetPartitionsPerWorker, minStageNumber, broadcast, queryContext @@ -221,6 +226,7 @@ public static DataSourcePlan forDataSource( filter, filterFields, maxWorkerCount, + targetPartitionsPerWorker, minStageNumber, broadcast ); @@ -242,6 +248,7 @@ public static DataSourcePlan forDataSource( filter, filterFields, maxWorkerCount, + targetPartitionsPerWorker, minStageNumber, broadcast ); @@ -253,6 +260,7 @@ public static DataSourcePlan forDataSource( (JoinDataSource) dataSource, querySegmentSpec, maxWorkerCount, + targetPartitionsPerWorker, minStageNumber, broadcast ); @@ -418,6 +426,7 @@ private static DataSourcePlan forQuery( final String queryId, final QueryDataSource dataSource, final int maxWorkerCount, + final int targetPartitionsPerWorker, final int minStageNumber, final boolean broadcast, @Nullable final QueryContext parentContext @@ -429,8 +438,9 @@ private static DataSourcePlan forQuery( // outermost query, and setting it for the subquery makes us erroneously add bucketing where it doesn't belong. dataSource.getQuery().withOverriddenContext(CONTEXT_MAP_NO_SEGMENT_GRANULARITY), queryKit, - ShuffleSpecFactories.globalSortWithMaxPartitionCount(maxWorkerCount), + ShuffleSpecFactories.globalSortWithMaxPartitionCount(maxWorkerCount * targetPartitionsPerWorker), maxWorkerCount, + targetPartitionsPerWorker, minStageNumber ); @@ -451,6 +461,7 @@ private static DataSourcePlan forFilteredDataSource( final FilteredDataSource dataSource, final QuerySegmentSpec querySegmentSpec, final int maxWorkerCount, + final int targetPartitionsPerWorker, final int minStageNumber, final boolean broadcast ) @@ -464,6 +475,7 @@ private static DataSourcePlan forFilteredDataSource( null, null, maxWorkerCount, + targetPartitionsPerWorker, minStageNumber, broadcast ); @@ -491,6 +503,7 @@ private static DataSourcePlan forUnnest( final UnnestDataSource dataSource, final QuerySegmentSpec querySegmentSpec, final int maxWorkerCount, + final int targetPartitionsPerWorker, final int minStageNumber, final boolean broadcast ) @@ -505,6 +518,7 @@ private static DataSourcePlan forUnnest( null, null, maxWorkerCount, + targetPartitionsPerWorker, minStageNumber, broadcast ); @@ -537,6 +551,7 @@ private static DataSourcePlan forUnion( @Nullable DimFilter filter, @Nullable Set filterFields, final int maxWorkerCount, + final int targetPartitionsPerWorker, final int minStageNumber, final boolean broadcast ) @@ -559,6 +574,7 @@ private static DataSourcePlan forUnion( filter, filterFields, maxWorkerCount, + targetPartitionsPerWorker, Math.max(minStageNumber, subqueryDefBuilder.getNextStageNumber()), broadcast ); @@ -590,6 +606,7 @@ private static DataSourcePlan forBroadcastHashJoin( @Nullable final DimFilter filter, @Nullable final Set filterFields, final int maxWorkerCount, + final int targetPartitionsPerWorker, final int minStageNumber, final boolean broadcast ) @@ -606,6 +623,7 @@ private static DataSourcePlan forBroadcastHashJoin( filter, filter == null ? null : DimFilterUtils.onlyBaseFields(filterFields, analysis), maxWorkerCount, + targetPartitionsPerWorker, Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()), broadcast ); @@ -626,6 +644,7 @@ private static DataSourcePlan forBroadcastHashJoin( null, // Don't push down query filters for right-hand side: needs some work to ensure it works properly. null, maxWorkerCount, + targetPartitionsPerWorker, Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()), true // Always broadcast right-hand side of the join. ); @@ -660,6 +679,7 @@ private static DataSourcePlan forSortMergeJoin( final JoinDataSource dataSource, final QuerySegmentSpec querySegmentSpec, final int maxWorkerCount, + final int targetPartitionsPerWorker, final int minStageNumber, final boolean broadcast ) @@ -682,6 +702,7 @@ private static DataSourcePlan forSortMergeJoin( queryId, (QueryDataSource) dataSource.getLeft(), maxWorkerCount, + targetPartitionsPerWorker, Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()), false, null @@ -696,6 +717,7 @@ private static DataSourcePlan forSortMergeJoin( queryId, (QueryDataSource) dataSource.getRight(), maxWorkerCount, + targetPartitionsPerWorker, Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()), false, null @@ -707,8 +729,9 @@ private static DataSourcePlan forSortMergeJoin( ((StageInputSpec) Iterables.getOnlyElement(leftPlan.getInputSpecs())).getStageNumber() ); + final int hashPartitionCount = maxWorkerCount * targetPartitionsPerWorker; final List leftPartitionKey = partitionKeys.get(0); - leftBuilder.shuffleSpec(new HashShuffleSpec(new ClusterBy(leftPartitionKey, 0), maxWorkerCount)); + leftBuilder.shuffleSpec(new HashShuffleSpec(new ClusterBy(leftPartitionKey, 0), hashPartitionCount)); leftBuilder.signature(QueryKitUtils.sortableSignature(leftBuilder.getSignature(), leftPartitionKey)); // Build up the right stage. @@ -717,7 +740,7 @@ private static DataSourcePlan forSortMergeJoin( ); final List rightPartitionKey = partitionKeys.get(1); - rightBuilder.shuffleSpec(new HashShuffleSpec(new ClusterBy(rightPartitionKey, 0), maxWorkerCount)); + rightBuilder.shuffleSpec(new HashShuffleSpec(new ClusterBy(rightPartitionKey, 0), hashPartitionCount)); rightBuilder.signature(QueryKitUtils.sortableSignature(rightBuilder.getSignature(), rightPartitionKey)); // Compute join signature. diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/MultiQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/MultiQueryKit.java index a795f6496053..37f453f6c060 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/MultiQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/MultiQueryKit.java @@ -46,6 +46,7 @@ public QueryDefinition makeQueryDefinition( QueryKit> toolKitForSubQueries, ShuffleSpecFactory resultShuffleSpecFactory, int maxWorkerCount, + int targetPartitionsPerWorker, int minStageNumber ) { @@ -59,6 +60,7 @@ public QueryDefinition makeQueryDefinition( this, resultShuffleSpecFactory, maxWorkerCount, + targetPartitionsPerWorker, minStageNumber ); } else { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKit.java index b259022bba5b..2bc0ad0725a8 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKit.java @@ -40,6 +40,7 @@ public interface QueryKit> * @param minStageNumber lowest stage number to use for any generated stages. Useful if the resulting * {@link QueryDefinition} is going to be added to an existing * {@link org.apache.druid.msq.kernel.QueryDefinitionBuilder}. + * @param targetPartitionsPerWorker preferred number of partitions per worker for subqueries */ QueryDefinition makeQueryDefinition( String queryId, @@ -47,6 +48,7 @@ QueryDefinition makeQueryDefinition( QueryKit> toolKitForSubQueries, ShuffleSpecFactory resultShuffleSpecFactory, int maxWorkerCount, + int targetPartitionsPerWorker, int minStageNumber ); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java index b3686359d2a4..b1af153fafde 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java @@ -68,6 +68,7 @@ public QueryDefinition makeQueryDefinition( QueryKit> queryKit, ShuffleSpecFactory resultShuffleSpecFactory, int maxWorkerCount, + int targetPartitionsPerWorker, int minStageNumber ) { @@ -97,11 +98,13 @@ public QueryDefinition makeQueryDefinition( originalQuery.getFilter(), null, maxWorkerCount, + targetPartitionsPerWorker, minStageNumber, false ); - ShuffleSpec nextShuffleSpec = findShuffleSpecForNextWindow(operatorList.get(0), maxWorkerCount); + ShuffleSpec nextShuffleSpec = + findShuffleSpecForNextWindow(operatorList.get(0), maxWorkerCount * targetPartitionsPerWorker); final QueryDefinitionBuilder queryDefBuilder = makeQueryDefinitionBuilder(queryId, dataSourcePlan, nextShuffleSpec); final int firstStageNumber = Math.max(minStageNumber, queryDefBuilder.getNextStageNumber()); @@ -192,7 +195,8 @@ public QueryDefinition makeQueryDefinition( stageRowSignature = finalWindowStageRowSignature; nextShuffleSpec = finalWindowStageShuffleSpec; } else { - nextShuffleSpec = findShuffleSpecForNextWindow(operatorList.get(i + 1), maxWorkerCount); + nextShuffleSpec = + findShuffleSpecForNextWindow(operatorList.get(i + 1), maxWorkerCount * targetPartitionsPerWorker); if (nextShuffleSpec == null) { stageRowSignature = intermediateSignature; } else { @@ -285,7 +289,7 @@ private List> getOperatorListFromQuery(WindowOperatorQuery return operatorList; } - private ShuffleSpec findShuffleSpecForNextWindow(List operatorFactories, int maxWorkerCount) + private ShuffleSpec findShuffleSpecForNextWindow(List operatorFactories, int partitionCount) { NaivePartitioningOperatorFactory partition = null; NaiveSortOperatorFactory sort = null; @@ -325,7 +329,7 @@ private ShuffleSpec findShuffleSpecForNextWindow(List operatorF keyColsOfWindow.add(kc); } - return new HashShuffleSpec(new ClusterBy(keyColsOfWindow, 0), maxWorkerCount); + return new HashShuffleSpec(new ClusterBy(keyColsOfWindow, 0), partitionCount); } /** diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java index 7e4ebf5e7fab..45a91a3d8870 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java @@ -71,6 +71,7 @@ public QueryDefinition makeQueryDefinition( final QueryKit> queryKit, final ShuffleSpecFactory resultShuffleSpecFactory, final int maxWorkerCount, + final int targetPartitionsPerWorker, final int minStageNumber ) { @@ -86,6 +87,7 @@ public QueryDefinition makeQueryDefinition( originalQuery.getFilter(), null, maxWorkerCount, + targetPartitionsPerWorker, minStageNumber, false ); @@ -139,9 +141,10 @@ public QueryDefinition makeQueryDefinition( // __time in such queries is generated using either an aggregator (e.g. sum(metric) as __time) or using a // post-aggregator (e.g. TIMESTAMP '2000-01-01' as __time) // For example: INSERT INTO foo SELECT COUNT(*), TIMESTAMP '2000-01-01' AS __time FROM bar PARTITIONED BY DAY - shuffleSpecFactoryPreAggregation = intermediateClusterBy.isEmpty() - ? ShuffleSpecFactories.singlePartition() - : ShuffleSpecFactories.globalSortWithMaxPartitionCount(maxWorkerCount); + shuffleSpecFactoryPreAggregation = + intermediateClusterBy.isEmpty() + ? ShuffleSpecFactories.singlePartition() + : ShuffleSpecFactories.globalSortWithMaxPartitionCount(maxWorkerCount * targetPartitionsPerWorker); if (doLimitOrOffset) { shuffleSpecFactoryPostAggregation = ShuffleSpecFactories.singlePartitionWithLimit(postAggregationLimitHint); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java index f4f50106e813..051caeb0e718 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java @@ -91,6 +91,7 @@ public QueryDefinition makeQueryDefinition( final QueryKit> queryKit, final ShuffleSpecFactory resultShuffleSpecFactory, final int maxWorkerCount, + final int targetPartitionsPerWorker, final int minStageNumber ) { @@ -104,6 +105,7 @@ public QueryDefinition makeQueryDefinition( originalQuery.getFilter(), null, maxWorkerCount, + targetPartitionsPerWorker, minStageNumber, false ); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java index ed6a7c0e7b9b..63601c907a24 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java @@ -190,6 +190,12 @@ public class MultiStageQueryContext public static final String CTX_SKIP_TYPE_VERIFICATION = "skipTypeVerification"; + /** + * Number of partitions to target per worker when creating shuffle specs that involve specific numbers of + * partitions. This helps us utilize more parallelism when workers are multi-threaded. + */ + public static final String CTX_TARGET_PARTITIONS_PER_WORKER = "targetPartitionsPerWorker"; + private static final Pattern LOOKS_LIKE_JSON_ARRAY = Pattern.compile("^\\s*\\[.*", Pattern.DOTALL); public static String getMSQMode(final QueryContext queryContext) @@ -380,6 +386,14 @@ public static ArrayIngestMode getArrayIngestMode(final QueryContext queryContext return queryContext.getEnum(CTX_ARRAY_INGEST_MODE, ArrayIngestMode.class, DEFAULT_ARRAY_INGEST_MODE); } + public static int getTargetPartitionsPerWorkerWithDefault( + final QueryContext queryContext, + final int defaultValue + ) + { + return queryContext.getInt(CTX_TARGET_PARTITIONS_PER_WORKER, defaultValue); + } + /** * See {@link #CTX_INCLUDE_ALL_COUNTERS}. */ diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java index e65104302032..3034be399849 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java @@ -342,4 +342,10 @@ public WorkerClient newWorkerClient() { return new MSQTestWorkerClient(inMemoryWorkers); } + + @Override + public int defaultTargetPartitionsPerWorker() + { + return 1; + } }