Skip to content

Commit

Permalink
One more case and add more tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
abhishekrb19 committed Jul 26, 2024
1 parent d2a92fe commit 3c186a2
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -197,19 +197,23 @@ private void killUnusedSegments(

int submittedTasks = 0;
for (String dataSource : datasourceCircularKillList) {
if (dataSource.equals(prevDatasourceKilled)) {
datasourceCircularKillList.advanceCursor();
if (dataSource.equals(prevDatasourceKilled) && remainingDatasourcesToKill.size() > 1) {
// Skip this dataSource if it's the same as the previous one and there are others left to kill.
continue;
} else {
prevDatasourceKilled = dataSource;
remainingDatasourcesToKill.remove(dataSource);
}

final DateTime maxUsedStatusLastUpdatedTime = DateTimes.nowUtc().minus(bufferPeriod);
final Interval intervalToKill = findIntervalForKill(dataSource, maxUsedStatusLastUpdatedTime, stats);
if (intervalToKill == null) {
datasourceToLastKillIntervalEnd.remove(dataSource);
remainingDatasourcesToKill.remove(dataSource);
if (remainingDatasourcesToKill.size() == 0) {
if (remainingDatasourcesToKill.isEmpty()) {
break;
}
continue;

}

try {
Expand All @@ -224,20 +228,11 @@ private void killUnusedSegments(
),
true
);
datasourceToLastKillIntervalEnd.put(dataSource, intervalToKill.getEnd());
prevDatasourceKilled = dataSource;
++submittedTasks;
remainingDatasourcesToKill.remove(dataSource);

if (remainingDatasourcesToKill.size() == 0) {
break;
}
datasourceToLastKillIntervalEnd.put(dataSource, intervalToKill.getEnd());

if (submittedTasks >= availableKillTaskSlots) {
log.info(
"Submitted [%d] kill tasks and reached kill task slot limit [%d].",
submittedTasks, availableKillTaskSlots
);
// Check for termination conditions.
if (remainingDatasourcesToKill.isEmpty() || submittedTasks >= availableKillTaskSlots) {
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,9 +259,6 @@ public void testKillMultipleDatasourcesInRoundRobinManner()
Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));

validateLastKillStateAndReset(DS1, new Interval(YEAR_OLD.getStart(), MONTH_OLD.getEnd()));
validateLastKillStateAndReset(DS2, new Interval(YEAR_OLD.getStart(), DAY_OLD.getEnd()));

stats = runDutyAndGetStats();

Assert.assertEquals(4, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Expand All @@ -270,9 +267,6 @@ public void testKillMultipleDatasourcesInRoundRobinManner()
Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS3_STAT_KEY));
Assert.assertEquals(4, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));

validateLastKillStateAndReset(DS3, new Interval(YEAR_OLD.getStart(), DAY_OLD.getEnd()));
validateLastKillStateAndReset(DS1, new Interval(DAY_OLD.getStart(), NEXT_DAY.getEnd()));

stats = runDutyAndGetStats();

Assert.assertEquals(6, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Expand All @@ -281,20 +275,102 @@ public void testKillMultipleDatasourcesInRoundRobinManner()
Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS3_STAT_KEY));

validateLastKillStateAndReset(DS2, NEXT_DAY);
validateLastKillStateAndReset(DS3, NEXT_DAY);

stats = runDutyAndGetStats();

Assert.assertEquals(8, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(7, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(8, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(5, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
}

/**
 * The set of datasources to kill changes between consecutive runs. The kill duty should avoid selecting
 * the same datasource in two consecutive runs as long as there are other datasources to kill.
 */
@Test
public void testKillInRoundRobinMannerWhenDatasourcesChange()
{
// Configure at most 2 segments per kill and a single kill task slot; the assertions below expect
// exactly one more submitted task after every run.
configBuilder.withIgnoreDurationToRetain(true)
.withMaxSegmentsToKill(2);
dynamicConfigBuilder.withMaxKillTaskSlots(1);

// Initially only DS1 has unused segments.
createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, NOW.minusDays(1));
createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, NOW.minusDays(1));
createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, NOW.minusDays(1));

// Run 1: one task is submitted and the eligible-segment stat is reported for DS1.
// NOTE(review): the asserted counters grow from run to run (1, 2, 3, 4), so they appear to
// accumulate across runs -- confirm against runDutyAndGetStats().
initDuty();
CoordinatorRunStats stats = runDutyAndGetStats();

Assert.assertEquals(1, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(1, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(1, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));

// The interval checked for DS1 spans from the start of YEAR_OLD to the end of MONTH_OLD.
validateLastKillStateAndReset(DS1, new Interval(YEAR_OLD.getStart(), MONTH_OLD.getEnd()));

// A second datasource, DS2, now gets unused segments, changing the set of datasources to kill.
createAndAddUnusedSegment(DS2, YEAR_OLD, VERSION, NOW.minusDays(1));
createAndAddUnusedSegment(DS2, DAY_OLD, VERSION, NOW.minusDays(1));
createAndAddUnusedSegment(DS2, NEXT_DAY, VERSION, NOW.minusDays(1));

// Run 2: the eligible-segment stat is asserted for DS2, i.e. the datasource that was not
// killed in the previous run.
stats = runDutyAndGetStats();

Assert.assertEquals(2, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(2, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(2, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));

// Run 3: the DS1 stat is asserted again (now 3).
stats = runDutyAndGetStats();

Assert.assertEquals(3, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(3, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(3, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));

// Run 4: the DS2 stat is asserted again (now 3), completing the DS1 -> DS2 -> DS1 -> DS2 alternation.
stats = runDutyAndGetStats();

Assert.assertEquals(4, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(4, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(4, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
}

/**
 * There is a single datasource to kill across multiple runs. The duty should keep picking the same
 * datasource, since there is no other datasource to alternate with.
 */
@Test
public void testKillSingleDatasourceMultipleRuns()
{
// Configure at most 2 segments per kill and up to 2 kill task slots.
configBuilder.withIgnoreDurationToRetain(true)
.withMaxSegmentsToKill(2);
dynamicConfigBuilder.withMaxKillTaskSlots(2);

// Only DS1 has unused segments in this test.
createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, NOW.minusDays(1));
createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, NOW.minusDays(1));
createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, NOW.minusDays(1));

// Run 1: two slots are available but only one task is submitted, for DS1.
// NOTE(review): the slot counters grow by 2 per run (2, 4, 6), so the stats appear to
// accumulate across runs -- confirm against runDutyAndGetStats().
initDuty();
CoordinatorRunStats stats = runDutyAndGetStats();

Assert.assertEquals(2, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(1, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(2, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));

// Run 2: DS1 is picked again even though it was also picked in the previous run.
stats = runDutyAndGetStats();

Assert.assertEquals(4, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(2, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(4, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));

// Run 3: the submitted-task and eligible-segment values are unchanged from run 2, i.e. this
// run is expected to submit no further task.
stats = runDutyAndGetStats();

Assert.assertEquals(6, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(2, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(6, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));

// Final check of the last-kill state for each datasource.
// NOTE(review): presumably a null interval means no kill was recorded for that datasource --
// confirm against validateLastKillStateAndReset().
validateLastKillStateAndReset(DS1, NEXT_MONTH);
validateLastKillStateAndReset(DS2, null);
validateLastKillStateAndReset(DS3, null);
}

/**
Expand Down

0 comments on commit 3c186a2

Please sign in to comment.