diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java index 3b2ebd45aec4..3b552f00acf2 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java @@ -197,19 +197,23 @@ private void killUnusedSegments( int submittedTasks = 0; for (String dataSource : datasourceCircularKillList) { - if (dataSource.equals(prevDatasourceKilled)) { - datasourceCircularKillList.advanceCursor(); + if (dataSource.equals(prevDatasourceKilled) && remainingDatasourcesToKill.size() > 1) { + // Skip this dataSource if it's the same as the previous one and there are others left to kill. + continue; + } else { + prevDatasourceKilled = dataSource; + remainingDatasourcesToKill.remove(dataSource); } final DateTime maxUsedStatusLastUpdatedTime = DateTimes.nowUtc().minus(bufferPeriod); final Interval intervalToKill = findIntervalForKill(dataSource, maxUsedStatusLastUpdatedTime, stats); if (intervalToKill == null) { datasourceToLastKillIntervalEnd.remove(dataSource); - remainingDatasourcesToKill.remove(dataSource); - if (remainingDatasourcesToKill.size() == 0) { + if (remainingDatasourcesToKill.isEmpty()) { break; } continue; + } try { @@ -224,20 +228,11 @@ private void killUnusedSegments( ), true ); - datasourceToLastKillIntervalEnd.put(dataSource, intervalToKill.getEnd()); - prevDatasourceKilled = dataSource; ++submittedTasks; - remainingDatasourcesToKill.remove(dataSource); - - if (remainingDatasourcesToKill.size() == 0) { - break; - } + datasourceToLastKillIntervalEnd.put(dataSource, intervalToKill.getEnd()); - if (submittedTasks >= availableKillTaskSlots) { - log.info( - "Submitted [%d] kill tasks and reached kill task slot limit [%d].", - submittedTasks, availableKillTaskSlots - ); + // Check for termination conditions. + if (remainingDatasourcesToKill.isEmpty() || submittedTasks >= availableKillTaskSlots) { break; } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java index 8bdfc63e213d..e2cbbdf5e434 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java @@ -259,9 +259,6 @@ public void testKillMultipleDatasourcesInRoundRobinManner() Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY)); Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY)); - validateLastKillStateAndReset(DS1, new Interval(YEAR_OLD.getStart(), MONTH_OLD.getEnd())); - validateLastKillStateAndReset(DS2, new Interval(YEAR_OLD.getStart(), DAY_OLD.getEnd())); - stats = runDutyAndGetStats(); Assert.assertEquals(4, stats.get(Stats.Kill.AVAILABLE_SLOTS)); @@ -270,9 +267,6 @@ public void testKillMultipleDatasourcesInRoundRobinManner() Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS3_STAT_KEY)); Assert.assertEquals(4, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY)); - validateLastKillStateAndReset(DS3, new Interval(YEAR_OLD.getStart(), DAY_OLD.getEnd())); - validateLastKillStateAndReset(DS1, new Interval(DAY_OLD.getStart(), NEXT_DAY.getEnd())); - stats = runDutyAndGetStats(); Assert.assertEquals(6, stats.get(Stats.Kill.AVAILABLE_SLOTS)); @@ -281,9 +275,6 @@ public void testKillMultipleDatasourcesInRoundRobinManner() Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY)); Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS3_STAT_KEY)); - validateLastKillStateAndReset(DS2, NEXT_DAY); - validateLastKillStateAndReset(DS3, NEXT_DAY); - stats = runDutyAndGetStats(); Assert.assertEquals(8, stats.get(Stats.Kill.AVAILABLE_SLOTS)); @@ -291,10 +282,95 @@ public void testKillMultipleDatasourcesInRoundRobinManner() Assert.assertEquals(8, stats.get(Stats.Kill.MAX_SLOTS)); Assert.assertEquals(5, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY)); Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY)); + } + + /** + * The set of datasources to kill change in consecutive runs. The kill duty should avoid selecting two + * consecutive datasources across runs as long as there are other datasources to kill. + */ + @Test + public void testKillInRoundRobinMannerWhenDatasourcesChange() + { + configBuilder.withIgnoreDurationToRetain(true) + .withMaxSegmentsToKill(2); + dynamicConfigBuilder.withMaxKillTaskSlots(1); + + createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, NOW.minusDays(1)); + createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, NOW.minusDays(1)); + createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, NOW.minusDays(1)); + + initDuty(); + CoordinatorRunStats stats = runDutyAndGetStats(); + + Assert.assertEquals(1, stats.get(Stats.Kill.AVAILABLE_SLOTS)); + Assert.assertEquals(1, stats.get(Stats.Kill.SUBMITTED_TASKS)); + Assert.assertEquals(1, stats.get(Stats.Kill.MAX_SLOTS)); + Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY)); + + validateLastKillStateAndReset(DS1, new Interval(YEAR_OLD.getStart(), MONTH_OLD.getEnd())); + + createAndAddUnusedSegment(DS2, YEAR_OLD, VERSION, NOW.minusDays(1)); + createAndAddUnusedSegment(DS2, DAY_OLD, VERSION, NOW.minusDays(1)); + createAndAddUnusedSegment(DS2, NEXT_DAY, VERSION, NOW.minusDays(1)); + + stats = runDutyAndGetStats(); + + Assert.assertEquals(2, stats.get(Stats.Kill.AVAILABLE_SLOTS)); + Assert.assertEquals(2, stats.get(Stats.Kill.SUBMITTED_TASKS)); + Assert.assertEquals(2, stats.get(Stats.Kill.MAX_SLOTS)); + Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY)); + + stats = runDutyAndGetStats(); + + Assert.assertEquals(3, stats.get(Stats.Kill.AVAILABLE_SLOTS)); + Assert.assertEquals(3, stats.get(Stats.Kill.SUBMITTED_TASKS)); + Assert.assertEquals(3, stats.get(Stats.Kill.MAX_SLOTS)); + Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY)); + + stats = runDutyAndGetStats(); + + Assert.assertEquals(4, stats.get(Stats.Kill.AVAILABLE_SLOTS)); + Assert.assertEquals(4, stats.get(Stats.Kill.SUBMITTED_TASKS)); + Assert.assertEquals(4, stats.get(Stats.Kill.MAX_SLOTS)); + Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY)); + } + + /** + * There is a single datasource to kill across multiple runs. The duty should keep picking the same datasource. + */ + @Test + public void testKillSingleDatasourceMultipleRuns() + { + configBuilder.withIgnoreDurationToRetain(true) + .withMaxSegmentsToKill(2); + dynamicConfigBuilder.withMaxKillTaskSlots(2); + + createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, NOW.minusDays(1)); + createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, NOW.minusDays(1)); + createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, NOW.minusDays(1)); + + initDuty(); + CoordinatorRunStats stats = runDutyAndGetStats(); + + Assert.assertEquals(2, stats.get(Stats.Kill.AVAILABLE_SLOTS)); + Assert.assertEquals(1, stats.get(Stats.Kill.SUBMITTED_TASKS)); + Assert.assertEquals(2, stats.get(Stats.Kill.MAX_SLOTS)); + Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY)); + + stats = runDutyAndGetStats(); + + Assert.assertEquals(4, stats.get(Stats.Kill.AVAILABLE_SLOTS)); + Assert.assertEquals(2, stats.get(Stats.Kill.SUBMITTED_TASKS)); + Assert.assertEquals(4, stats.get(Stats.Kill.MAX_SLOTS)); + Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY)); + + stats = runDutyAndGetStats(); + + Assert.assertEquals(6, stats.get(Stats.Kill.AVAILABLE_SLOTS)); + Assert.assertEquals(2, stats.get(Stats.Kill.SUBMITTED_TASKS)); + Assert.assertEquals(6, stats.get(Stats.Kill.MAX_SLOTS)); + Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY)); - validateLastKillStateAndReset(DS1, NEXT_MONTH); - validateLastKillStateAndReset(DS2, null); - validateLastKillStateAndReset(DS3, null); } /**