Skip to content

Commit

Permalink
Add method Supervisor.computeLagForAutoScaler (apache#16314)
Browse files Browse the repository at this point in the history
Tries to address the comments made on apache#16284 after merged.

Changes:
- Remove method `Supervisor.getLagMetric()`
- Add method `Supervisor.computeLagForAutoScaler()`
- Remove classes `LagMetric` and `LagMetricTest`
  • Loading branch information
adithyachakilam authored Apr 20, 2024
1 parent 3e42ebb commit cff5d1e
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagMetric;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
Expand Down Expand Up @@ -429,9 +428,10 @@ protected SeekableStreamDataSourceMetadata<String, String> createDataSourceMetad
}

@Override
public LagMetric getLagMetricForAutoScaler()
public long computeLagForAutoScaler()
{
return LagMetric.MAX;
LagStats lagStats = computeLagStats();
return lagStats == null ? 0L : lagStats.getMaxLag();
}

private SeekableStreamDataSourceMetadata<String, String> createDataSourceMetadataWithClosedOrExpiredPartitions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.java.util.common.StringUtils;
Expand Down Expand Up @@ -155,13 +154,8 @@ private Runnable computeAndCollectLag()
LOCK.lock();
try {
if (!spec.isSuspended()) {
LagStats lagStats = supervisor.computeLagStats();
if (lagStats == null) {
lagMetricsQueue.offer(0L);
} else {
long lag = lagStats.get(supervisor.getLagMetricForAutoScaler());
lagMetricsQueue.offer(lag > 0 ? lag : 0L);
}
long lag = supervisor.computeLagForAutoScaler();
lagMetricsQueue.offer(lag > 0 ? lag : 0L);
log.debug("Current lags for dataSource[%s] are [%s].", dataSource, lagMetricsQueue);
} else {
log.warn("[%s] supervisor is suspended, skipping lag collection", dataSource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.google.common.collect.ImmutableMap;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagMetric;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.segment.incremental.ParseExceptionReport;

Expand Down Expand Up @@ -95,11 +94,12 @@ default Boolean isHealthy()
LagStats computeLagStats();

/**
* Used by AutoScaler to either scale by max/total/avg.
* Used by AutoScaler to make scaling decisions.
*/
default LagMetric getLagMetricForAutoScaler()
default long computeLagForAutoScaler()
{
return LagMetric.TOTAL;
LagStats lagStats = computeLagStats();
return lagStats == null ? 0L : lagStats.getTotalLag();
}

int getActiveTaskGroupsCount();
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,4 @@ public long getAvgLag()
{
return avgLag;
}

public long get(LagMetric metric)
{
switch (metric) {
case AVERAGE:
return avgLag;
case TOTAL:
return totalLag;
case MAX:
return maxLag;
}
throw new IllegalStateException("Unknown Metric");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,22 @@

package org.apache.druid.indexing.overlord.supervisor;

import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagMetric;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

public class LagStatsTest
public class SupervisorTest
{

@Test
public void lagStatsByMetric()
public void testAutoScalerLagComputation()
{
int max = 1;
int avg = 2;
int total = 3;
LagStats lag = new LagStats(max, total, avg);
Supervisor supervisor = Mockito.spy(Supervisor.class);

Mockito.when(supervisor.computeLagStats()).thenReturn(new LagStats(1, 2, 3));
Assert.assertEquals(2, supervisor.computeLagForAutoScaler());

Assert.assertEquals(max, lag.get(LagMetric.MAX));
Assert.assertEquals(total, lag.get(LagMetric.TOTAL));
Assert.assertEquals(avg, lag.get(LagMetric.AVERAGE));
Mockito.when(supervisor.computeLagStats()).thenReturn(null);
Assert.assertEquals(0, supervisor.computeLagForAutoScaler());
}
}

0 comments on commit cff5d1e

Please sign in to comment.