MSQ: Fix issue where AUTO assignment would not respect maxWorkerCount. (#16214)

WorkerAssignmentStrategy.AUTO was missing a check for maxWorkerCount
in the case where the inputs to a stage are not dynamically sliceable.
A common case here is when the inputs to a stage are other stages.
gianm authored Mar 28, 2024
1 parent f995011 commit 7649957
Showing 3 changed files with 95 additions and 2 deletions.
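
As a quick orientation before the diff, here is a minimal, self-contained sketch of the clamping behavior the commit message describes. The class and method names below are invented for illustration; this is not the actual WorkerAssignmentStrategy.AUTO code.

// Hypothetical sketch only: illustrates the clamping behavior described in the
// commit message, not the real WorkerAssignmentStrategy.AUTO implementation.
public class AutoWorkerCountSketch
{
  static int workerCount(int maxWorkerCount, int maxInputStageWorkerCount)
  {
    // Before the fix, the worker count inherited from upstream stages was used
    // directly; after the fix it is clamped to the stage's configured maximum.
    return Math.min(maxWorkerCount, maxInputStageWorkerCount);
  }

  public static void main(String[] args)
  {
    System.out.println(workerCount(4, 2)); // 2: upstream worker count already within the limit
    System.out.println(workerCount(1, 2)); // 1: clamped down to maxWorkerCount
  }
}

The second call shows the previously broken case: an upstream stage ran with more workers than this stage is allowed to use, so the inherited count must be clamped.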
@@ -41,7 +41,11 @@ public class StripedReadablePartitions implements ReadablePartitions
 private final int numWorkers;
 private final IntSortedSet partitionNumbers;
 
-StripedReadablePartitions(final int stageNumber, final int numWorkers, final IntSortedSet partitionNumbers)
+/**
+ * Constructor. Most callers should use {@link ReadablePartitions#striped(int, int, int)} instead, which takes
+ * a partition count rather than a set of partition numbers.
+ */
+public StripedReadablePartitions(final int stageNumber, final int numWorkers, final IntSortedSet partitionNumbers)
 {
 this.stageNumber = stageNumber;
 this.numWorkers = numWorkers;
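
A hedged usage sketch of the newly public constructor, under the assumption (suggested by the javadoc above and the new tests further down) that ReadablePartitions.striped(stageNumber, numWorkers, partitionCount) describes the same partitions as constructing StripedReadablePartitions directly with the full partition set {0 .. partitionCount - 1}:

import it.unimi.dsi.fastutil.ints.IntAVLTreeSet;
import org.apache.druid.msq.input.stage.ReadablePartitions;
import org.apache.druid.msq.input.stage.StripedReadablePartitions;

// Assumed-equivalence sketch, not taken from the Druid codebase.
public class StripedPartitionsSketch
{
  public static void main(String[] args)
  {
    // Factory form: stage 0, 2 workers, 3 partitions in total.
    final ReadablePartitions viaFactory = ReadablePartitions.striped(0, 2, 3);

    // Direct form enabled by this commit: pass the explicit partition numbers {0, 1, 2}.
    final ReadablePartitions viaConstructor =
        new StripedReadablePartitions(0, 2, new IntAVLTreeSet(new int[]{0, 1, 2}));

    // Expected to print true under the assumption stated above.
    System.out.println(viaFactory.equals(viaConstructor));
  }
}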
@@ -90,7 +90,7 @@ public List<InputSlice> assign(
 
 final IntSet inputStages = stageDef.getInputStageNumbers();
 final OptionalInt maxInputStageWorkerCount = inputStages.intStream().map(stageWorkerCountMap).max();
-final int workerCount = maxInputStageWorkerCount.orElse(1);
+final int workerCount = Math.min(stageDef.getMaxWorkerCount(), maxInputStageWorkerCount.orElse(1));
 return slicer.sliceStatic(inputSpec, workerCount);
 }
 }
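
Concretely, with a single input stage that ran on 2 workers and maxWorkerCount set to 1, the old expression yielded workerCount = 2, while the fixed expression yields min(1, 2) = 1; this is exactly the scenario exercised by the new test_auto_oneInputStageWithThreePartitionsAndTwoWorkers_oneWorkerMax test below.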
@@ -20,7 +20,10 @@
 package org.apache.druid.msq.kernel.controller;
 
 import com.google.common.collect.ImmutableMap;
+import it.unimi.dsi.fastutil.ints.Int2IntAVLTreeMap;
 import it.unimi.dsi.fastutil.ints.Int2IntMaps;
+import it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap;
+import it.unimi.dsi.fastutil.ints.IntAVLTreeSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
 import it.unimi.dsi.fastutil.longs.LongArrayList;
 import it.unimi.dsi.fastutil.longs.LongList;
@@ -31,6 +34,11 @@
 import org.apache.druid.msq.input.InputSpecSlicer;
 import org.apache.druid.msq.input.NilInputSlice;
 import org.apache.druid.msq.input.SlicerUtils;
+import org.apache.druid.msq.input.stage.ReadablePartitions;
+import org.apache.druid.msq.input.stage.StageInputSlice;
+import org.apache.druid.msq.input.stage.StageInputSpec;
+import org.apache.druid.msq.input.stage.StageInputSpecSlicer;
+import org.apache.druid.msq.input.stage.StripedReadablePartitions;
 import org.apache.druid.msq.kernel.StageDefinition;
 import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
 import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory;
@@ -216,6 +224,87 @@ public void test_auto_threeInputs_fourWorkers()
 );
 }
 
+@Test
+public void test_auto_oneInputStageWithThreePartitionsAndTwoWorkers_fourWorkerMax()
+{
+final StageDefinition stageDef =
+StageDefinition.builder(1)
+.inputs(new StageInputSpec(0))
+.maxWorkerCount(4)
+.processorFactory(new OffsetLimitFrameProcessorFactory(0, 0L))
+.build(QUERY_ID);
+
+final WorkerInputs inputs = WorkerInputs.create(
+stageDef,
+new Int2IntAVLTreeMap(ImmutableMap.of(0, 2)),
+new StageInputSpecSlicer(
+new Int2ObjectAVLTreeMap<>(ImmutableMap.of(0, ReadablePartitions.striped(0, 2, 3)))
+),
+WorkerAssignmentStrategy.AUTO,
+Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
+);
+
+Assert.assertEquals(
+ImmutableMap.<Integer, List<InputSlice>>builder()
+.put(
+0,
+Collections.singletonList(
+new StageInputSlice(
+0,
+new StripedReadablePartitions(0, 2, new IntAVLTreeSet(new int[]{0, 2}))
+)
+)
+)
+.put(
+1,
+Collections.singletonList(
+new StageInputSlice(
+0,
+new StripedReadablePartitions(0, 2, new IntAVLTreeSet(new int[]{1}))
+)
+)
+)
+.build(),
+inputs.assignmentsMap()
+);
+}
+
+@Test
+public void test_auto_oneInputStageWithThreePartitionsAndTwoWorkers_oneWorkerMax()
+{
+final StageDefinition stageDef =
+StageDefinition.builder(1)
+.inputs(new StageInputSpec(0))
+.maxWorkerCount(1)
+.processorFactory(new OffsetLimitFrameProcessorFactory(0, 0L))
+.build(QUERY_ID);
+
+final WorkerInputs inputs = WorkerInputs.create(
+stageDef,
+new Int2IntAVLTreeMap(ImmutableMap.of(0, 2)),
+new StageInputSpecSlicer(
+new Int2ObjectAVLTreeMap<>(ImmutableMap.of(0, ReadablePartitions.striped(0, 2, 3)))
+),
+WorkerAssignmentStrategy.AUTO,
+Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
+);
+
+Assert.assertEquals(
+ImmutableMap.<Integer, List<InputSlice>>builder()
+.put(
+0,
+Collections.singletonList(
+new StageInputSlice(
+0,
+new StripedReadablePartitions(0, 2, new IntAVLTreeSet(new int[]{0, 1, 2}))
+)
+)
+)
+.build(),
+inputs.assignmentsMap()
+);
+}
+
 @Test
 public void test_auto_threeBigInputs_fourWorkers()
 {
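
The expected assignments in the first new test ({0, 2} for worker 0 and {1} for worker 1) are consistent with round-robin striping of partition numbers across workers. Below is a small sketch of that interpretation; it is an assumption drawn from the test expectations, not the StripedReadablePartitions implementation itself.

import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Illustration only: assumes partition p is read by worker (p % numWorkers),
// which matches the expectations asserted in the new tests above.
public class StripingIllustration
{
  static List<TreeSet<Integer>> stripe(int numWorkers, int numPartitions)
  {
    final List<TreeSet<Integer>> perWorker = new ArrayList<>();
    for (int w = 0; w < numWorkers; w++) {
      perWorker.add(new TreeSet<>());
    }
    for (int p = 0; p < numPartitions; p++) {
      perWorker.get(p % numWorkers).add(p);
    }
    return perWorker;
  }

  public static void main(String[] args)
  {
    // 2 workers, 3 partitions: prints [[0, 2], [1]], i.e. worker 0 -> {0, 2}, worker 1 -> {1}.
    System.out.println(stripe(2, 3));
  }
}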
