Refactor: Miscellaneous batch task cleanup (#16730)
Changes
- No functional change
- Remove unused method `IndexTuningConfig.withPartitionsSpec()`
- Remove unused method `ParallelIndexTuningConfig.withPartitionsSpec()`
- Remove redundant method `CompactionTask.emitCompactIngestionModeMetrics()`
- Remove `Clock` argument from `CompactionTask.createDataSchemasForIntervals()`, as it was needed
only by one test, which merely verified the value that the test itself had passed in. The code now uses a
`Stopwatch` instead, and the test simply verifies that the metric has been emitted (see the sketch after this list).
- Other minor cleanup changes
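
For context, the `Clock`-to-`Stopwatch` change follows the timing pattern sketched below. This is a minimal, self-contained sketch using Guava's `Stopwatch`; the `org.apache.druid.java.util.common.Stopwatch` used in the diff exposes the equivalent `createStarted()`/`millisElapsed()` calls. The sleep and the println are stand-ins for the segment analysis and the metric emitter, not the actual Druid APIs.

```java
import com.google.common.base.Stopwatch;
import java.util.concurrent.TimeUnit;

class StopwatchTimingSketch
{
  public static void main(String[] args) throws Exception
  {
    // Previously a Clock was injected solely so that a test could control the
    // start/end timestamps. Starting a Stopwatch in place removes that
    // parameter; the test now only asserts that the metric was emitted.
    final Stopwatch stopwatch = Stopwatch.createStarted();
    try {
      Thread.sleep(50); // stand-in for existingSegmentAnalyzer.fetchAndProcessIfNeeded()
    }
    finally {
      // stand-in for emitter.emit(metricBuilder.setMetric(...))
      System.out.println(
          "compact/segmentAnalyzer/fetchAndProcessMillis="
          + stopwatch.elapsed(TimeUnit.MILLISECONDS)
      );
    }
  }
}
```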
kfaraz authored Jul 13, 2024
1 parent 3a1b437 commit a618c5d
Showing 13 changed files with 104 additions and 335 deletions.
@@ -369,7 +369,6 @@ private CompactionTask createCompactionTask(
new ClientCompactionTaskTransformSpec(dimFilter);
final CompactionTask.Builder builder = new CompactionTask.Builder(
DATA_SOURCE,
null,
null
);
IndexSpec indexSpec = createIndexSpec();
======== next changed file ========
@@ -48,7 +48,6 @@
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
@@ -62,6 +61,7 @@
import org.apache.druid.java.util.common.NonnullPair;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.GranularityType;
@@ -100,7 +100,6 @@
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -125,8 +124,6 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg
{
public static final String TYPE = "compact";
private static final Logger log = new Logger(CompactionTask.class);
private static final Clock UTC_CLOCK = Clock.systemUTC();


/**
* The CompactionTask creates and runs multiple IndexTask instances. When the {@link AppenderatorsManager}
@@ -449,27 +446,12 @@ public boolean isPerfectRollup()
return tuningConfig != null && tuningConfig.isForceGuaranteedRollup();
}

@VisibleForTesting
void emitCompactIngestionModeMetrics(
ServiceEmitter emitter,
boolean isDropExisting
)
{

if (emitter == null) {
return;
}
emitMetric(emitter, "ingest/count", 1);
}

@Override
public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{
// emit metric for compact ingestion mode:
emitCompactIngestionModeMetrics(toolbox.getEmitter(), ioConfig.isDropExisting());
emitMetric(toolbox.getEmitter(), "ingest/count", 1);

final Map<Interval, DataSchema> intervalDataSchemas = createDataSchemasForIntervals(
UTC_CLOCK,
toolbox,
getTaskLockHelper().getLockGranularityToUse(),
segmentProvider,
@@ -489,13 +471,13 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
}

/**
* Generate dataschema for segments in each interval
* @return
* @throws IOException
* Generate dataschema for segments in each interval.
*
* @throws IOException if an exception occurs while retrieving used segments to
* determine schemas.
*/
@VisibleForTesting
static Map<Interval, DataSchema> createDataSchemasForIntervals(
final Clock clock,
final TaskToolbox toolbox,
final LockGranularity lockGranularityInUse,
final SegmentProvider segmentProvider,
@@ -506,13 +488,13 @@ static Map<Interval, DataSchema> createDataSchemasForIntervals(
final ServiceMetricEvent.Builder metricBuilder
) throws IOException
{
final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = retrieveRelevantTimelineHolders(
final Iterable<DataSegment> timelineSegments = retrieveRelevantTimelineHolders(
toolbox,
segmentProvider,
lockGranularityInUse
);

if (timelineSegments.isEmpty()) {
if (!timelineSegments.iterator().hasNext()) {
return Collections.emptyMap();
}

@@ -524,7 +506,7 @@ static Map<Interval, DataSchema> createDataSchemasForIntervals(
Comparators.intervalsByStartThenEnd()
);

for (final DataSegment dataSegment : VersionedIntervalTimeline.getAllObjects(timelineSegments)) {
for (final DataSegment dataSegment : timelineSegments) {
intervalToSegments.computeIfAbsent(dataSegment.getInterval(), k -> new ArrayList<>())
.add(dataSegment);
}
@@ -557,7 +539,6 @@ static Map<Interval, DataSchema> createDataSchemasForIntervals(
// creates new granularitySpec and set segmentGranularity
Granularity segmentGranularityToUse = GranularityType.fromPeriod(interval.toPeriod()).getDefaultGranularity();
final DataSchema dataSchema = createDataSchema(
clock,
toolbox.getEmitter(),
metricBuilder,
segmentProvider.dataSource,
@@ -576,18 +557,17 @@ static Map<Interval, DataSchema> createDataSchemasForIntervals(
} else {
// given segment granularity
final DataSchema dataSchema = createDataSchema(
clock,
toolbox.getEmitter(),
metricBuilder,
segmentProvider.dataSource,
JodaUtils.umbrellaInterval(
Iterables.transform(
VersionedIntervalTimeline.getAllObjects(timelineSegments),
timelineSegments,
DataSegment::getInterval
)
),
lazyFetchSegments(
VersionedIntervalTimeline.getAllObjects(timelineSegments),
timelineSegments,
toolbox.getSegmentCacheManager(),
toolbox.getIndexIO()
),
@@ -600,7 +580,7 @@ static Map<Interval, DataSchema> createDataSchemasForIntervals(
}
}

private static List<TimelineObjectHolder<String, DataSegment>> retrieveRelevantTimelineHolders(
private static Iterable<DataSegment> retrieveRelevantTimelineHolders(
TaskToolbox toolbox,
SegmentProvider segmentProvider,
LockGranularity lockGranularityInUse
@@ -612,11 +592,10 @@ private static List<TimelineObjectHolder<String, DataSegment>> retrieveRelevantT
final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = SegmentTimeline
.forSegments(usedSegments)
.lookup(segmentProvider.interval);
return timelineSegments;
return VersionedIntervalTimeline.getAllObjects(timelineSegments);
}

private static DataSchema createDataSchema(
Clock clock,
ServiceEmitter emitter,
ServiceMetricEvent.Builder metricBuilder,
String dataSource,
@@ -636,24 +615,30 @@ private static DataSchema createDataSchema(
dimensionsSpec == null,
metricsSpec == null
);
long start = clock.millis();

final Stopwatch stopwatch = Stopwatch.createStarted();
try {
existingSegmentAnalyzer.fetchAndProcessIfNeeded();
}
finally {
if (emitter != null) {
emitter.emit(metricBuilder.setMetric("compact/segmentAnalyzer/fetchAndProcessMillis", clock.millis() - start));
emitter.emit(
metricBuilder.setMetric(
"compact/segmentAnalyzer/fetchAndProcessMillis",
stopwatch.millisElapsed()
)
);
}
}

final Granularity queryGranularityToUse;
if (granularitySpec.getQueryGranularity() == null) {
queryGranularityToUse = existingSegmentAnalyzer.getQueryGranularity();
log.info("Generate compaction task spec with segments original query granularity [%s]", queryGranularityToUse);
log.info("Generate compaction task spec with segments original query granularity[%s]", queryGranularityToUse);
} else {
queryGranularityToUse = granularitySpec.getQueryGranularity();
log.info(
"Generate compaction task spec with new query granularity overrided from input [%s]",
"Generate compaction task spec with new query granularity overrided from input[%s].",
queryGranularityToUse
);
}
@@ -1033,7 +1018,6 @@ public static class Builder
{
private final String dataSource;
private final SegmentCacheManagerFactory segmentCacheManagerFactory;
private final RetryPolicyFactory retryPolicyFactory;

private CompactionIOConfig ioConfig;
@Nullable
@@ -1054,13 +1038,11 @@ public static class Builder

public Builder(
String dataSource,
SegmentCacheManagerFactory segmentCacheManagerFactory,
RetryPolicyFactory retryPolicyFactory
SegmentCacheManagerFactory segmentCacheManagerFactory
)
{
this.dataSource = dataSource;
this.segmentCacheManagerFactory = segmentCacheManagerFactory;
this.retryPolicyFactory = retryPolicyFactory;
}

public Builder interval(Interval interval)
Expand Down Expand Up @@ -1288,7 +1270,9 @@ public CompactionTuningConfig(
);
}

@Override
/**
* Creates a copy of this tuning config with the partition spec changed.
*/
public CompactionTuningConfig withPartitionsSpec(PartitionsSpec partitionsSpec)
{
return new CompactionTuningConfig(
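The `retrieveRelevantTimelineHolders()` change above moves the flattening (`VersionedIntervalTimeline.getAllObjects`) into the helper itself, so call sites iterate `DataSegment`s directly instead of unpacking timeline holders. A rough sketch of that shape, using simplified stand-in types rather than the real `TimelineObjectHolder`/`DataSegment` classes:

```java
import java.util.List;
import java.util.stream.Collectors;

class FlattenAtSourceSketch
{
  // Stand-in for TimelineObjectHolder<String, DataSegment>; the real types
  // live under org.apache.druid.timeline.
  record Holder(List<String> segments) { }

  // Before: the helper returned List<Holder>, and every call site flattened it.
  // After (as in this commit): flatten once here and return the segments.
  static Iterable<String> retrieveSegments(List<Holder> holders)
  {
    return holders.stream()
                  .flatMap(h -> h.segments().stream())
                  .collect(Collectors.toList());
  }

  public static void main(String[] args)
  {
    List<Holder> holders = List.of(
        new Holder(List.of("seg-1", "seg-2")),
        new Holder(List.of("seg-3"))
    );
    retrieveSegments(holders).forEach(System.out::println);
  }
}
```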
======== next changed file ========
@@ -1471,31 +1471,6 @@ public IndexTuningConfig withBasePersistDirectory(File dir)
);
}

public IndexTuningConfig withPartitionsSpec(PartitionsSpec partitionsSpec)
{
return new IndexTuningConfig(
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
skipBytesInMemoryOverheadCheck,
partitionsSpec,
indexSpec,
indexSpecForIntermediatePersists,
maxPendingPersists,
forceGuaranteedRollup,
reportParseExceptions,
pushTimeout,
basePersistDirectory,
segmentWriteOutMediumFactory,
logParseExceptions,
maxParseExceptions,
maxSavedParseExceptions,
maxColumnsToMerge,
awaitSegmentAvailabilityTimeoutMillis,
numPersistThreads
);
}

@JsonProperty
@Override
public AppendableIndexSpec getAppendableIndexSpec()
======== next changed file ========
@@ -56,11 +56,14 @@

public class NativeCompactionRunner implements CompactionRunner
{
private static final Logger log = new Logger(NativeCompactionRunner.class);
public static final String TYPE = "native";

private static final Logger log = new Logger(NativeCompactionRunner.class);
private static final boolean STORE_COMPACTION_STATE = true;

@JsonIgnore
private final SegmentCacheManagerFactory segmentCacheManagerFactory;

@JsonIgnore
private final CurrentSubTaskHolder currentSubTaskHolder = new CurrentSubTaskHolder(
(taskObject, config) -> {
@@ -183,7 +186,6 @@ public TaskStatus runCompactionTasks(
final PartitionConfigurationManager partitionConfigurationManager =
new NativeCompactionRunner.PartitionConfigurationManager(compactionTask.getTuningConfig());


final List<ParallelIndexIngestionSpec> ingestionSpecs = createIngestionSpecs(
intervalDataSchemaMap,
taskToolbox,
@@ -278,8 +280,11 @@ private TaskStatus runParallelIndexSubtasks(
return failCnt == 0 ? TaskStatus.success(compactionTaskId) : TaskStatus.failure(compactionTaskId, msg);
}

@VisibleForTesting
ParallelIndexSupervisorTask newTask(CompactionTask compactionTask, String baseSequenceName, ParallelIndexIngestionSpec ingestionSpec)
private ParallelIndexSupervisorTask newTask(
CompactionTask compactionTask,
String baseSequenceName,
ParallelIndexIngestionSpec ingestionSpec
)
{
return new ParallelIndexSupervisorTask(
compactionTask.getId(),
@@ -305,7 +310,6 @@ Map<String, Object> createContextForSubtask(CompactionTask compactionTask)
@VisibleForTesting
static class PartitionConfigurationManager
{
@Nullable
private final CompactionTask.CompactionTuningConfig tuningConfig;

PartitionConfigurationManager(@Nullable CompactionTask.CompactionTuningConfig tuningConfig)
======== next changed file ========
@@ -275,45 +275,6 @@ public int getMaxAllowedLockCount()
return maxAllowedLockCount;
}

@Override
public ParallelIndexTuningConfig withPartitionsSpec(PartitionsSpec partitionsSpec)
{
return new ParallelIndexTuningConfig(
null,
null,
getAppendableIndexSpec(),
getMaxRowsInMemory(),
getMaxBytesInMemory(),
isSkipBytesInMemoryOverheadCheck(),
null,
null,
getSplitHintSpec(),
partitionsSpec,
getIndexSpec(),
getIndexSpecForIntermediatePersists(),
getMaxPendingPersists(),
isForceGuaranteedRollup(),
isReportParseExceptions(),
getPushTimeout(),
getSegmentWriteOutMediumFactory(),
null,
getMaxNumConcurrentSubTasks(),
getMaxRetry(),
getTaskStatusCheckPeriodMs(),
getChatHandlerTimeout(),
getChatHandlerNumRetries(),
getMaxNumSegmentsToMerge(),
getTotalNumMergeTasks(),
isLogParseExceptions(),
getMaxParseExceptions(),
getMaxSavedParseExceptions(),
getMaxColumnsToMerge(),
getAwaitSegmentAvailabilityTimeoutMillis(),
getMaxAllowedLockCount(),
getNumPersistThreads()
);
}

@Override
public boolean equals(Object o)
{
======== next changed file ========
@@ -43,8 +43,6 @@
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
@@ -340,8 +338,7 @@ private CompactionTask createCompactionTask(ClientCompactionTaskTransformSpec tr
{
CompactionTask.Builder compactionTaskBuilder = new CompactionTask.Builder(
"datasource",
new SegmentCacheManagerFactory(TestIndex.INDEX_IO, MAPPER),
new RetryPolicyFactory(new RetryPolicyConfig())
new SegmentCacheManagerFactory(TestIndex.INDEX_IO, MAPPER)
)
.inputSpec(new CompactionIntervalSpec(Intervals.of("2019/2020"), "testSha256OfSortedSegmentIds"), true)
.tuningConfig(