Allow CompactionSegmentIterator to have custom priority (apache#16737)
Changes:
- Break `NewestSegmentFirstIterator` into two parts:
  - `DatasourceCompactibleSegmentIterator` - contains all the code from `NewestSegmentFirstIterator`
    but now handles a single datasource and allows a priority to be specified
  - `PriorityBasedCompactionSegmentIterator` - contains a separate iterator for each datasource and
    combines the results into a single queue to be used by a compaction search policy (a generic
    sketch of this merge pattern follows the change summary below)
- Update `NewestSegmentFirstPolicy` to use the above new classes
- Cleanup `CompactionStatistics` and `AutoCompactionSnapshot`
- Cleanup `CompactSegments`
- Remove unused methods from `Tasks`
- Remove unneeded `TasksTest`
- Move tests from `NewestSegmentFirstIteratorTest` to `CompactionStatusTest`
  and `DatasourceCompactibleSegmentIteratorTest`
kfaraz authored Jul 16, 2024
1 parent 6cf6838 commit 01d67ae
Showing 18 changed files with 587 additions and 935 deletions.
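
The central piece of the refactor is the merge step named in the commit message: one compactible-segment iterator per datasource, combined into a single priority-ordered queue. The sketch below shows that pattern in generic, self-contained Java; it is illustrative only, not the actual `PriorityBasedCompactionSegmentIterator`. The class name, the `Entry` helper, and the constructor shape are all assumptions.

import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;

public class PriorityMergingIterator<T> implements Iterator<T>
{
  // Each queue entry is a non-empty per-datasource iterator, ranked by the
  // priority of the element it would return next.
  private final PriorityQueue<Entry<T>> queue;

  public PriorityMergingIterator(Map<String, Iterator<T>> perDatasource, Comparator<T> priority)
  {
    this.queue = new PriorityQueue<>(Comparator.comparing((Entry<T> e) -> e.head, priority));
    for (Iterator<T> it : perDatasource.values()) {
      if (it.hasNext()) {
        queue.add(new Entry<>(it.next(), it));
      }
    }
  }

  @Override
  public boolean hasNext()
  {
    return !queue.isEmpty();
  }

  @Override
  public T next()
  {
    if (queue.isEmpty()) {
      throw new NoSuchElementException();
    }
    // Take the entry with the highest-priority head, then re-queue its
    // source iterator if it still has elements.
    final Entry<T> entry = queue.poll();
    if (entry.source.hasNext()) {
      queue.add(new Entry<>(entry.source.next(), entry.source));
    }
    return entry.head;
  }

  private static class Entry<T>
  {
    private final T head;
    private final Iterator<T> source;

    private Entry(T head, Iterator<T> source)
    {
      this.head = head;
      this.source = source;
    }
  }
}

A priority queue keyed on each iterator's next element keeps the merged stream ordered without materializing every candidate segment up front.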
@@ -141,7 +141,7 @@ public void setup()
   @Benchmark
   public void measureNewestSegmentFirstPolicy(Blackhole blackhole)
   {
-    final CompactionSegmentIterator iterator = policy.reset(compactionConfigs, dataSources, Collections.emptyMap());
+    final CompactionSegmentIterator iterator = policy.createIterator(compactionConfigs, dataSources, Collections.emptyMap());
     for (int i = 0; i < numCompactionTaskSlots && iterator.hasNext(); i++) {
       blackhole.consume(iterator.next());
     }
@@ -21,16 +21,9 @@

 import org.apache.curator.shaded.com.google.common.base.Verify;
 import org.apache.druid.indexing.common.TaskLockType;
-import org.apache.druid.java.util.common.JodaUtils;
-import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.server.coordinator.duty.CompactSegments;
-import org.joda.time.Interval;

-import java.util.ArrayList;
-import java.util.List;
-import java.util.SortedSet;
-import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;

 public class Tasks
@@ -63,44 +56,19 @@ public class Tasks
    * Context flag denoting if maximum possible values should be used to estimate
    * on-heap memory usage while indexing. Refer to OnHeapIncrementalIndex for
    * more details.
-   *
+   * <p>
    * The value of this flag is true by default which corresponds to the old method
    * of estimation.
    */
   public static final String USE_MAX_MEMORY_ESTIMATES = "useMaxMemoryEstimates";

   /**
-   * This context is used in compaction. When it is set in the context, the segments created by the task
-   * will fill 'lastCompactionState' in its metadata. This will be used to track what segments are compacted or not.
-   * See {@link org.apache.druid.timeline.DataSegment} and {@link
-   * org.apache.druid.server.coordinator.compact.NewestSegmentFirstIterator} for more details.
+   * Context flag to denote if segments published to metadata by a task should
+   * have the {@code lastCompactionState} field set.
    */
   public static final String STORE_COMPACTION_STATE_KEY = "storeCompactionState";

   static {
     Verify.verify(STORE_COMPACTION_STATE_KEY.equals(CompactSegments.STORE_COMPACTION_STATE_KEY));
   }
-
-  public static SortedSet<Interval> computeCondensedIntervals(SortedSet<Interval> intervals)
-  {
-    final SortedSet<Interval> condensedIntervals = new TreeSet<>(Comparators.intervalsByStartThenEnd());
-    List<Interval> toBeAccumulated = new ArrayList<>();
-    for (Interval interval : intervals) {
-      if (toBeAccumulated.size() == 0) {
-        toBeAccumulated.add(interval);
-      } else {
-        if (toBeAccumulated.get(toBeAccumulated.size() - 1).abuts(interval)) {
-          toBeAccumulated.add(interval);
-        } else {
-          condensedIntervals.add(JodaUtils.umbrellaInterval(toBeAccumulated));
-          toBeAccumulated.clear();
-          toBeAccumulated.add(interval);
-        }
-      }
-    }
-    if (toBeAccumulated.size() > 0) {
-      condensedIntervals.add(JodaUtils.umbrellaInterval(toBeAccumulated));
-    }
-    return condensedIntervals;
-  }
 }
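
For context on the removed `computeCondensedIntervals`: it merged runs of abutting intervals into single umbrella intervals. Below is a minimal self-contained example of that behaviour using plain joda-time, approximating `JodaUtils.umbrellaInterval` with the overall start and end; the dates are illustrative.

import org.joda.time.Interval;

public class CondensedIntervalsExample
{
  public static void main(String[] args)
  {
    final Interval first = Interval.parse("2024-01-01/2024-01-02");
    final Interval second = Interval.parse("2024-01-02/2024-01-03"); // abuts first
    final Interval third = Interval.parse("2024-01-05/2024-01-06");  // gap before this one

    // Abutting intervals condense into a single umbrella interval.
    System.out.println(first.abuts(second));                         // true
    final Interval umbrella = new Interval(first.getStart(), second.getEnd());
    System.out.println(umbrella);                                    // covers Jan 1 through Jan 3
    System.out.println(umbrella.abuts(third));                       // false, stays separate
  }
}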

This file was deleted.

@@ -28,6 +28,7 @@
 import org.apache.druid.segment.incremental.AppendableIndexSpec;
 import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
+import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
 import org.joda.time.Duration;

@@ -79,6 +80,17 @@ public class ClientCompactionTaskQueryTuningConfig
   @Nullable
   private final AppendableIndexSpec appendableIndexSpec;

+  public static ClientCompactionTaskQueryTuningConfig from(
+      DataSourceCompactionConfig compactionConfig
+  )
+  {
+    if (compactionConfig == null) {
+      return from(null, null, null);
+    } else {
+      return from(compactionConfig.getTuningConfig(), compactionConfig.getMaxRowsPerSegment(), null);
+    }
+  }
+
   public static ClientCompactionTaskQueryTuningConfig from(
       @Nullable UserCompactionTaskQueryTuningConfig userCompactionTaskQueryTuningConfig,
       @Nullable Integer maxRowsPerSegment,
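
A hedged usage sketch of the new one-argument overload: call sites can hand over the datasource's compaction config and let the factory deal with null. The wrapper method below is illustrative; the real call-site simplification from this commit appears in the final `Evaluator` hunk.

// Illustrative wrapper only; delegates to the overload added above.
static ClientCompactionTaskQueryTuningConfig tuningConfigFor(
    @Nullable DataSourceCompactionConfig compactionConfig
)
{
  // Null-safe: a null config resolves to from(null, null, null) inside the factory.
  return ClientCompactionTaskQueryTuningConfig.from(compactionConfig);
}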
@@ -22,6 +22,7 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.server.coordinator.compact.CompactionStatistics;

 import javax.validation.constraints.NotNull;
 import java.util.Objects;
@@ -193,15 +194,9 @@ public static class Builder
     private final String dataSource;
     private final AutoCompactionScheduleStatus scheduleStatus;

-    private long bytesAwaitingCompaction;
-    private long bytesCompacted;
-    private long bytesSkipped;
-    private long segmentCountAwaitingCompaction;
-    private long segmentCountCompacted;
-    private long segmentCountSkipped;
-    private long intervalCountAwaitingCompaction;
-    private long intervalCountCompacted;
-    private long intervalCountSkipped;
+    private final CompactionStatistics compactedStats = new CompactionStatistics();
+    private final CompactionStatistics skippedStats = new CompactionStatistics();
+    private final CompactionStatistics waitingStats = new CompactionStatistics();

     private Builder(
         @NotNull String dataSource,
@@ -217,85 +212,37 @@ private Builder(

       this.dataSource = dataSource;
       this.scheduleStatus = scheduleStatus;
-      this.bytesAwaitingCompaction = 0;
-      this.bytesCompacted = 0;
-      this.bytesSkipped = 0;
-      this.segmentCountAwaitingCompaction = 0;
-      this.segmentCountCompacted = 0;
-      this.segmentCountSkipped = 0;
-      this.intervalCountAwaitingCompaction = 0;
-      this.intervalCountCompacted = 0;
-      this.intervalCountSkipped = 0;
     }

-    public Builder incrementBytesAwaitingCompaction(long incrementValue)
+    public void incrementWaitingStats(CompactionStatistics entry)
     {
-      this.bytesAwaitingCompaction = this.bytesAwaitingCompaction + incrementValue;
-      return this;
+      waitingStats.increment(entry);
     }

-    public Builder incrementBytesCompacted(long incrementValue)
+    public void incrementCompactedStats(CompactionStatistics entry)
     {
-      this.bytesCompacted = this.bytesCompacted + incrementValue;
-      return this;
+      compactedStats.increment(entry);
     }

-    public Builder incrementSegmentCountAwaitingCompaction(long incrementValue)
+    public void incrementSkippedStats(CompactionStatistics entry)
     {
-      this.segmentCountAwaitingCompaction = this.segmentCountAwaitingCompaction + incrementValue;
-      return this;
-    }
-
-    public Builder incrementSegmentCountCompacted(long incrementValue)
-    {
-      this.segmentCountCompacted = this.segmentCountCompacted + incrementValue;
-      return this;
-    }
-
-    public Builder incrementIntervalCountAwaitingCompaction(long incrementValue)
-    {
-      this.intervalCountAwaitingCompaction = this.intervalCountAwaitingCompaction + incrementValue;
-      return this;
-    }
-
-    public Builder incrementIntervalCountCompacted(long incrementValue)
-    {
-      this.intervalCountCompacted = this.intervalCountCompacted + incrementValue;
-      return this;
-    }
-
-    public Builder incrementBytesSkipped(long incrementValue)
-    {
-      this.bytesSkipped = this.bytesSkipped + incrementValue;
-      return this;
-    }
-
-    public Builder incrementSegmentCountSkipped(long incrementValue)
-    {
-      this.segmentCountSkipped = this.segmentCountSkipped + incrementValue;
-      return this;
-    }
-
-    public Builder incrementIntervalCountSkipped(long incrementValue)
-    {
-      this.intervalCountSkipped = this.intervalCountSkipped + incrementValue;
-      return this;
+      skippedStats.increment(entry);
     }

     public AutoCompactionSnapshot build()
     {
       return new AutoCompactionSnapshot(
           dataSource,
           scheduleStatus,
-          bytesAwaitingCompaction,
-          bytesCompacted,
-          bytesSkipped,
-          segmentCountAwaitingCompaction,
-          segmentCountCompacted,
-          segmentCountSkipped,
-          intervalCountAwaitingCompaction,
-          intervalCountCompacted,
-          intervalCountSkipped
+          waitingStats.getTotalBytes(),
+          compactedStats.getTotalBytes(),
+          skippedStats.getTotalBytes(),
+          waitingStats.getNumSegments(),
+          compactedStats.getNumSegments(),
+          skippedStats.getNumSegments(),
+          waitingStats.getNumIntervals(),
+          compactedStats.getNumIntervals(),
+          skippedStats.getNumIntervals()
       );
     }
   }
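
A hedged sketch of driving the reworked `Builder`: callers now pass whole `CompactionStatistics` objects rather than nine per-field increments. Only the increment methods and `CompactionStatistics.create(bytes, segments, intervals)` come from this diff; the `builder(...)` factory, the datasource name, and the numbers are assumptions made for illustration.

static AutoCompactionSnapshot exampleSnapshot()
{
  // Hypothetical factory; the actual way a Builder is obtained may differ.
  final AutoCompactionSnapshot.Builder builder = AutoCompactionSnapshot.builder("wikipedia");
  builder.incrementWaitingStats(CompactionStatistics.create(1_000_000L, 10, 2));
  builder.incrementCompactedStats(CompactionStatistics.create(5_000_000L, 50, 8));
  builder.incrementSkippedStats(CompactionStatistics.create(200_000L, 3, 1));
  return builder.build();
}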
@@ -33,9 +33,9 @@
 public interface CompactionSegmentSearchPolicy
 {
   /**
-   * Reset the current states of this policy. This method should be called whenever iterating starts.
+   * Creates an iterator that returns compactible segments.
    */
-  CompactionSegmentIterator reset(
+  CompactionSegmentIterator createIterator(
       Map<String, DataSourceCompactionConfig> compactionConfigs,
       Map<String, SegmentTimeline> dataSources,
       Map<String, List<Interval>> skipIntervals
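
A hedged sketch of what an implementation of the renamed `createIterator` might look like after this commit, based on the commit message's description of the new classes. The `PriorityBasedCompactionSegmentIterator` constructor arguments shown are assumptions, not the actual Druid signature.

@Override
public CompactionSegmentIterator createIterator(
    Map<String, DataSourceCompactionConfig> compactionConfigs,
    Map<String, SegmentTimeline> dataSources,
    Map<String, List<Interval>> skipIntervals
)
{
  // Hypothetical wiring: one DatasourceCompactibleSegmentIterator per datasource,
  // merged by PriorityBasedCompactionSegmentIterator into a single queue.
  return new PriorityBasedCompactionSegmentIterator(compactionConfigs, dataSources, skipIntervals);
}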
@@ -28,9 +28,13 @@ public class CompactionStatistics
   private long numSegments;
   private long numIntervals;

-  public static CompactionStatistics create()
+  public static CompactionStatistics create(long bytes, long numSegments, long numIntervals)
   {
-    return new CompactionStatistics();
+    final CompactionStatistics stats = new CompactionStatistics();
+    stats.totalBytes = bytes;
+    stats.numIntervals = numIntervals;
+    stats.numSegments = numSegments;
+    return stats;
   }

   public long getTotalBytes()
@@ -48,10 +52,10 @@ public long getNumIntervals()
     return numIntervals;
   }

-  public void addFrom(SegmentsToCompact segments)
+  public void increment(CompactionStatistics other)
   {
-    totalBytes += segments.getTotalBytes();
-    numIntervals += segments.getNumIntervals();
-    numSegments += segments.size();
+    totalBytes += other.getTotalBytes();
+    numIntervals += other.getNumIntervals();
+    numSegments += other.getNumSegments();
   }
 }
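
A hedged usage sketch of the reworked statistics API: `create(...)` seeds explicit values and `increment(...)` folds in another instance, which is how the `AutoCompactionSnapshot.Builder` above accumulates per-interval stats. The helper below is illustrative.

static CompactionStatistics totalOf(CompactionStatistics... parts)
{
  // Seed an empty stats object, then fold each part into the running total.
  final CompactionStatistics total = CompactionStatistics.create(0, 0, 0);
  for (CompactionStatistics part : parts) {
    total.increment(part);
  }
  return total;
}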
@@ -167,12 +167,7 @@ private Evaluator(
     this.objectMapper = objectMapper;
     this.lastCompactionState = candidateSegments.getFirst().getLastCompactionState();
     this.compactionConfig = compactionConfig;
-    this.tuningConfig = ClientCompactionTaskQueryTuningConfig.from(
-        compactionConfig.getTuningConfig(),
-        compactionConfig.getMaxRowsPerSegment(),
-        null
-    );
-
+    this.tuningConfig = ClientCompactionTaskQueryTuningConfig.from(compactionConfig);
     this.configuredGranularitySpec = compactionConfig.getGranularitySpec();
     if (lastCompactionState == null) {
       this.existingGranularitySpec = null;
