Skip to content

Commit

Permalink
Review comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
abhishekrb19 committed Jul 26, 2024
1 parent 52dc479 commit 9263b9c
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,29 +22,30 @@
import javax.annotation.concurrent.NotThreadSafe;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;

/**
* A circular list that is backed by an ordered list of elements containing no duplicates. The list is ordered by the
* supplied comparator. Callers are responsible for terminating the iterator explicitly.
* supplied comparator. The iterator keeps track of the current position, so iterating the list multiple times will
* resume from the last location and continue until a caller explicitly terminates it.
* <p>
* This class is not thread-safe and must be used from a single thread.
*/
@NotThreadSafe
public class CircularList<T> implements Iterable<T>
{
private final List<T> collection = new ArrayList<>();
private final Comparator<? super T> comparator;
private final List<T> elements = new ArrayList<>();
private int currentPosition;

public CircularList(final Set<T> elements, Comparator<? super T> comparator)
public CircularList(final Set<T> elements, final Comparator<? super T> comparator)
{
this.collection.addAll(elements);
this.comparator = comparator;
this.collection.sort(comparator);
this.elements.addAll(elements);
this.elements.sort(comparator);
this.currentPosition = -1;
}

@Override
Expand All @@ -55,7 +56,7 @@ public Iterator<T> iterator()
@Override
public boolean hasNext()
{
return collection.size() > 0;
return elements.size() > 0;
}

@Override
Expand All @@ -65,20 +66,13 @@ public T next()
throw new NoSuchElementException();
}

T nextCandidate = peekNext();
advanceCursor();
return nextCandidate;
}

private T peekNext()
{
int nextPosition = currentPosition < collection.size() ? currentPosition : 0;
return collection.get(nextPosition);
return elements.get(currentPosition);
}

private void advanceCursor()
{
if (++currentPosition >= collection.size()) {
if (++currentPosition >= elements.size()) {
currentPosition = 0;
}
}
Expand All @@ -90,8 +84,6 @@ private void advanceCursor()
*/
public boolean equalsSet(final Set<T> inputSet)
{
final List<T> sortedList = new ArrayList<>(inputSet);
sortedList.sort(comparator);
return collection.equals(sortedList);
return new HashSet<>(elements).equals(inputSet);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,35 @@ public void testIterateInReverseOrder()
Assert.assertEquals(ImmutableList.of(100, 0, -1, -4, 100, 0, -1, -4), observedElements);
}

@Test
public void testIteratorResumesFromLastPosition()
{
final Set<String> input = ImmutableSet.of("a", "b", "c", "d", "e", "f");
final CircularList<String> circularList = new CircularList<>(input, Comparator.naturalOrder());

List<String> observedElements = new ArrayList<>();
int cnt = 0;
for (String element : circularList) {
observedElements.add(element);
if (++cnt >= input.size() / 2) {
break;
}
}

Assert.assertEquals(ImmutableList.of("a", "b", "c"), observedElements);

observedElements = new ArrayList<>();
for (String element : circularList) {
observedElements.add(element);
// Resume and go till the end, and add two more elements looping around
if (++cnt == input.size() + 2) {
break;
}
}

Assert.assertEquals(ImmutableList.of("d", "e", "f", "a", "b"), observedElements);
}

@Test
public void testEqualsSet()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ private void killUnusedSegments(
int submittedTasks = 0;
for (String dataSource : datasourceCircularKillList) {
if (dataSource.equals(prevDatasourceKilled) && remainingDatasourcesToKill.size() > 1) {
// Skip this dataSource if it's the same as the previous one and there are others left to kill.
// Skip this dataSource if it's the same as the previous one and there are remaining datasources to kill.
continue;
} else {
prevDatasourceKilled = dataSource;
Expand All @@ -209,11 +209,11 @@ private void killUnusedSegments(
final Interval intervalToKill = findIntervalForKill(dataSource, maxUsedStatusLastUpdatedTime, stats);
if (intervalToKill == null) {
datasourceToLastKillIntervalEnd.remove(dataSource);
// If no interval is found for this datasource, either terminate or continue based on remaining datasources to kill.
if (remainingDatasourcesToKill.isEmpty()) {
break;
}
continue;

}

try {
Expand All @@ -231,6 +231,7 @@ private void killUnusedSegments(
++submittedTasks;
datasourceToLastKillIntervalEnd.put(dataSource, intervalToKill.getEnd());

// Termination conditions.
if (remainingDatasourcesToKill.isEmpty() || submittedTasks >= availableKillTaskSlots) {
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public void testKillWithMultipleDatasources()
* slots. Running the kill duty each time should pick at least one unique datasource in a round-robin manner.
*/
@Test
public void testKillMultipleDatasourcesInRoundRobinManner()
public void testRoundRobinKillMultipleDatasources()
{
configBuilder.withIgnoreDurationToRetain(true)
.withMaxSegmentsToKill(2);
Expand Down Expand Up @@ -289,7 +289,7 @@ public void testKillMultipleDatasourcesInRoundRobinManner()
* consecutive datasources across runs as long as there are other datasources to kill.
*/
@Test
public void testKillInRoundRobinMannerWhenDatasourcesChange()
public void testRoundRobinKillWhenDatasourcesChange()
{
configBuilder.withIgnoreDurationToRetain(true)
.withMaxSegmentsToKill(2);
Expand Down Expand Up @@ -335,9 +335,6 @@ public void testKillInRoundRobinMannerWhenDatasourcesChange()
Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
}

/**
* There is a single datasource to kill across multiple runs. The duty should keep picking the same datasource.
*/
@Test
public void testKillSingleDatasourceMultipleRuns()
{
Expand Down Expand Up @@ -901,7 +898,7 @@ private void validateLastKillStateAndReset(final String dataSource, @Nullable fi
overlordClient.deleteLastKillInterval(dataSource);
}

private void createAndAddUsedSegment(final String dataSource, final Interval interval, final String version)
private DataSegment createAndAddUsedSegment(final String dataSource, final Interval interval, final String version)
{
final DataSegment segment = createSegment(dataSource, interval, version);
try {
Expand All @@ -910,6 +907,7 @@ private void createAndAddUsedSegment(final String dataSource, final Interval int
catch (IOException e) {
throw new RuntimeException(e);
}
return segment;
}

private void createAndAddUnusedSegment(
Expand All @@ -919,13 +917,7 @@ private void createAndAddUnusedSegment(
final DateTime lastUpdatedTime
)
{
final DataSegment segment = createSegment(dataSource, interval, version);
try {
SqlSegmentsMetadataManagerTestBase.publishSegment(connector, config, TestHelper.makeJsonMapper(), segment);
}
catch (IOException e) {
throw new RuntimeException(e);
}
final DataSegment segment = createAndAddUsedSegment(dataSource, interval, version);
sqlSegmentsMetadataManager.markSegmentsAsUnused(ImmutableSet.of(segment.getId()));
derbyConnectorRule.segments().updateUsedStatusLastUpdated(segment.getId().toString(), lastUpdatedTime);
}
Expand Down

0 comments on commit 9263b9c

Please sign in to comment.