Skip to content

Commit

Permalink
supervisor/autoscaler: Skip scaling when partitions are less than min…
Browse files Browse the repository at this point in the history
…TaskCount (#17335)
  • Loading branch information
adithyachakilam authored Oct 15, 2024
1 parent 32ce341 commit c57bd3b
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -216,17 +216,16 @@ private int computeDesiredTaskCount(List<Long> lags)

int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount();
int desiredActiveTaskCount;
int partitionCount = supervisor.getPartitionCount();
if (partitionCount <= 0) {
log.warn("Partition number for [%s] <= 0 ? how can it be?", dataSource);
return -1;
}

if (beyondProportion >= lagBasedAutoScalerConfig.getTriggerScaleOutFractionThreshold()) {
// Do Scale out
int taskCount = currentActiveTaskCount + lagBasedAutoScalerConfig.getScaleOutStep();

int partitionCount = supervisor.getPartitionCount();
if (partitionCount <= 0) {
log.warn("Partition number for [%s] <= 0 ? how can it be?", dataSource);
return -1;
}

int actualTaskCountMax = Math.min(lagBasedAutoScalerConfig.getTaskCountMax(), partitionCount);
if (currentActiveTaskCount == actualTaskCountMax) {
log.warn("CurrentActiveTaskCount reached task count Max limit, skipping scale out action for dataSource [%s].",
Expand All @@ -248,7 +247,8 @@ private int computeDesiredTaskCount(List<Long> lags)
if (withinProportion >= lagBasedAutoScalerConfig.getTriggerScaleInFractionThreshold()) {
// Do Scale in
int taskCount = currentActiveTaskCount - lagBasedAutoScalerConfig.getScaleInStep();
if (currentActiveTaskCount == lagBasedAutoScalerConfig.getTaskCountMin()) {
int actualTaskCountMin = Math.min(lagBasedAutoScalerConfig.getTaskCountMin(), partitionCount);
if (currentActiveTaskCount == actualTaskCountMin) {
log.warn("CurrentActiveTaskCount reached task count Min limit, skipping scale in action for dataSource [%s].",
dataSource
);
Expand All @@ -260,7 +260,7 @@ private int computeDesiredTaskCount(List<Long> lags)
.setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, taskCount));
return -1;
} else {
desiredActiveTaskCount = Math.max(taskCount, lagBasedAutoScalerConfig.getTaskCountMin());
desiredActiveTaskCount = Math.max(taskCount, actualTaskCountMin);
}
return desiredActiveTaskCount;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -992,6 +992,58 @@ public void testSeekableStreamSupervisorSpecWithScaleIn() throws InterruptedExce
autoScaler.stop();
}

@Test
public void testSeekableStreamSupervisorSpecWithScaleInThresholdGreaterThanPartitions() throws InterruptedException
{
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(2, false)).anyTimes();
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
EasyMock.replay(spec);

EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
EasyMock.replay(ingestionSchema);

EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
EasyMock.replay(taskMaster);

TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(10);
Map<String, Object> modifiedScaleInProps = getScaleInProperties();

modifiedScaleInProps.put("taskCountMax", 20);
modifiedScaleInProps.put("taskCountMin", 15);

LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
supervisor,
DATASOURCE,
mapper.convertValue(
modifiedScaleInProps,
LagBasedAutoScalerConfig.class
),
spec,
emitter
);

// enable autoscaler so that taskcount config will be ignored and init value of taskCount will use taskCountMin.
Assert.assertEquals(1, (int) supervisor.getIoConfig().getTaskCount());
supervisor.getIoConfig().setTaskCount(2);
supervisor.start();
autoScaler.start();
supervisor.runInternal();

Assert.assertEquals(2, (int) supervisor.getIoConfig().getTaskCount());
Thread.sleep(2000);
Assert.assertEquals(10, (int) supervisor.getIoConfig().getTaskCount());

autoScaler.reset();
autoScaler.stop();
}

@Test
public void testSeekableStreamSupervisorSpecWithScaleInAlreadyAtMin() throws InterruptedException
{
Expand Down

0 comments on commit c57bd3b

Please sign in to comment.