diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java index ec81c5f9f99b..22e36841199b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java @@ -216,17 +216,16 @@ private int computeDesiredTaskCount(List lags) int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount(); int desiredActiveTaskCount; + int partitionCount = supervisor.getPartitionCount(); + if (partitionCount <= 0) { + log.warn("Partition number for [%s] <= 0 ? how can it be?", dataSource); + return -1; + } if (beyondProportion >= lagBasedAutoScalerConfig.getTriggerScaleOutFractionThreshold()) { // Do Scale out int taskCount = currentActiveTaskCount + lagBasedAutoScalerConfig.getScaleOutStep(); - int partitionCount = supervisor.getPartitionCount(); - if (partitionCount <= 0) { - log.warn("Partition number for [%s] <= 0 ? how can it be?", dataSource); - return -1; - } - int actualTaskCountMax = Math.min(lagBasedAutoScalerConfig.getTaskCountMax(), partitionCount); if (currentActiveTaskCount == actualTaskCountMax) { log.warn("CurrentActiveTaskCount reached task count Max limit, skipping scale out action for dataSource [%s].", @@ -248,7 +247,8 @@ private int computeDesiredTaskCount(List lags) if (withinProportion >= lagBasedAutoScalerConfig.getTriggerScaleInFractionThreshold()) { // Do Scale in int taskCount = currentActiveTaskCount - lagBasedAutoScalerConfig.getScaleInStep(); - if (currentActiveTaskCount == lagBasedAutoScalerConfig.getTaskCountMin()) { + int actualTaskCountMin = Math.min(lagBasedAutoScalerConfig.getTaskCountMin(), partitionCount); + if (currentActiveTaskCount == actualTaskCountMin) { log.warn("CurrentActiveTaskCount reached task count Min limit, skipping scale in action for dataSource [%s].", dataSource ); @@ -260,7 +260,7 @@ private int computeDesiredTaskCount(List lags) .setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, taskCount)); return -1; } else { - desiredActiveTaskCount = Math.max(taskCount, lagBasedAutoScalerConfig.getTaskCountMin()); + desiredActiveTaskCount = Math.max(taskCount, actualTaskCountMin); } return desiredActiveTaskCount; } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java index baff5fc765b2..3281360f5806 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java @@ -992,6 +992,58 @@ public void testSeekableStreamSupervisorSpecWithScaleIn() throws InterruptedExce autoScaler.stop(); } + @Test + public void testSeekableStreamSupervisorSpecWithScaleInThresholdGreaterThanPartitions() throws InterruptedException + { + EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes(); + EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes(); + EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(2, false)).anyTimes(); + EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes(); + EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes(); + EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); + EasyMock.replay(spec); + + EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes(); + EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes(); + EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes(); + EasyMock.replay(ingestionSchema); + + EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes(); + EasyMock.replay(taskMaster); + + TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(10); + Map modifiedScaleInProps = getScaleInProperties(); + + modifiedScaleInProps.put("taskCountMax", 20); + modifiedScaleInProps.put("taskCountMin", 15); + + LagBasedAutoScaler autoScaler = new LagBasedAutoScaler( + supervisor, + DATASOURCE, + mapper.convertValue( + modifiedScaleInProps, + LagBasedAutoScalerConfig.class + ), + spec, + emitter + ); + + // enable autoscaler so that taskcount config will be ignored and init value of taskCount will use taskCountMin. + Assert.assertEquals(1, (int) supervisor.getIoConfig().getTaskCount()); + supervisor.getIoConfig().setTaskCount(2); + supervisor.start(); + autoScaler.start(); + supervisor.runInternal(); + + Assert.assertEquals(2, (int) supervisor.getIoConfig().getTaskCount()); + Thread.sleep(2000); + Assert.assertEquals(10, (int) supervisor.getIoConfig().getTaskCount()); + + autoScaler.reset(); + autoScaler.stop(); + } + @Test public void testSeekableStreamSupervisorSpecWithScaleInAlreadyAtMin() throws InterruptedException {