diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java index e1a7656f23cd..365a9135e3c5 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java @@ -39,7 +39,6 @@ import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.TaskStorage; -import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagMetric; import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata; import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers; @@ -429,9 +428,10 @@ protected SeekableStreamDataSourceMetadata createDataSourceMetad } @Override - public LagMetric getLagMetricForAutoScaler() + public long computeLagForAutoScaler() { - return LagMetric.MAX; + LagStats lagStats = computeLagStats(); + return lagStats == null ? 0L : lagStats.getMaxLag(); } private SeekableStreamDataSourceMetadata createDataSourceMetadataWithClosedOrExpiredPartitions( diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java index 7813725733b1..f8618b06f74b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java @@ -21,7 +21,6 @@ import org.apache.commons.collections4.queue.CircularFifoQueue; import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; -import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; import org.apache.druid.java.util.common.StringUtils; @@ -155,13 +154,8 @@ private Runnable computeAndCollectLag() LOCK.lock(); try { if (!spec.isSuspended()) { - LagStats lagStats = supervisor.computeLagStats(); - if (lagStats == null) { - lagMetricsQueue.offer(0L); - } else { - long lag = lagStats.get(supervisor.getLagMetricForAutoScaler()); - lagMetricsQueue.offer(lag > 0 ? lag : 0L); - } + long lag = supervisor.computeLagForAutoScaler(); + lagMetricsQueue.offer(lag > 0 ? lag : 0L); log.debug("Current lags for dataSource[%s] are [%s].", dataSource, lagMetricsQueue); } else { log.warn("[%s] supervisor is suspended, skipping lag collection", dataSource); diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java index 8befa2adae33..9b9511cbf3da 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java @@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableMap; import org.apache.druid.error.DruidException; import org.apache.druid.indexing.overlord.DataSourceMetadata; -import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagMetric; import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; import org.apache.druid.segment.incremental.ParseExceptionReport; @@ -95,11 +94,12 @@ default Boolean isHealthy() LagStats computeLagStats(); /** - * Used by AutoScaler to either scale by max/total/avg. + * Used by AutoScaler to make scaling decisions. */ - default LagMetric getLagMetricForAutoScaler() + default long computeLagForAutoScaler() { - return LagMetric.TOTAL; + LagStats lagStats = computeLagStats(); + return lagStats == null ? 0L : lagStats.getTotalLag(); } int getActiveTaskGroupsCount(); diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagMetric.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagMetric.java deleted file mode 100644 index d3f00b5c2c83..000000000000 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagMetric.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.indexing.overlord.supervisor.autoscaler; - -public enum LagMetric -{ - TOTAL, - MAX, - AVERAGE; -} diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java index c7a6dfc61328..7b6e5fd0bab1 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java @@ -46,17 +46,4 @@ public long getAvgLag() { return avgLag; } - - public long get(LagMetric metric) - { - switch (metric) { - case AVERAGE: - return avgLag; - case TOTAL: - return totalLag; - case MAX: - return maxLag; - } - throw new IllegalStateException("Unknown Metric"); - } } diff --git a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/LagStatsTest.java b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorTest.java similarity index 69% rename from server/src/test/java/org/apache/druid/indexing/overlord/supervisor/LagStatsTest.java rename to server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorTest.java index b51882538b9c..79811079d341 100644 --- a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/LagStatsTest.java +++ b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorTest.java @@ -19,24 +19,22 @@ package org.apache.druid.indexing.overlord.supervisor; -import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagMetric; import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; -public class LagStatsTest +public class SupervisorTest { - @Test - public void lagStatsByMetric() + public void testAutoScalerLagComputation() { - int max = 1; - int avg = 2; - int total = 3; - LagStats lag = new LagStats(max, total, avg); + Supervisor supervisor = Mockito.spy(Supervisor.class); + + Mockito.when(supervisor.computeLagStats()).thenReturn(new LagStats(1, 2, 3)); + Assert.assertEquals(2, supervisor.computeLagForAutoScaler()); - Assert.assertEquals(max, lag.get(LagMetric.MAX)); - Assert.assertEquals(total, lag.get(LagMetric.TOTAL)); - Assert.assertEquals(avg, lag.get(LagMetric.AVERAGE)); + Mockito.when(supervisor.computeLagStats()).thenReturn(null); + Assert.assertEquals(0, supervisor.computeLagForAutoScaler()); } }