From a618c5dd0dea8917986166e1d4d988f7101ec88d Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Fri, 12 Jul 2024 19:42:51 -0700 Subject: [PATCH] Refactor: Miscellaneous batch task cleanup (#16730) Changes - No functional change - Remove unused method `IndexTuningConfig.withPartitionsSpec()` - Remove unused method `ParallelIndexTuningConfig.withPartitionsSpec()` - Remove redundant method `CompactTask.emitIngestionModeMetrics()` - Remove Clock argument from `CompactionTask.createDataSchemasForInterval()` as it was only needed for one test which was just verifying the value passed by the test itself. The code now uses a `Stopwatch` instead and test simply verifies that the metric has been emitted. - Other minor cleanup changes --- .../msq/indexing/MSQCompactionRunnerTest.java | 1 - .../indexing/common/task/CompactionTask.java | 70 ++++++-------- .../druid/indexing/common/task/IndexTask.java | 25 ----- .../common/task/NativeCompactionRunner.java | 14 ++- .../parallel/ParallelIndexTuningConfig.java | 39 -------- .../ClientCompactionTaskQuerySerdeTest.java | 5 +- .../task/CompactionTaskParallelRunTest.java | 45 +++------ .../common/task/CompactionTaskRunTest.java | 66 ++++--------- .../common/task/CompactionTaskTest.java | 96 ++++--------------- ...stractParallelIndexSupervisorTaskTest.java | 20 ---- .../ParallelIndexSupervisorTaskKillTest.java | 35 ++----- ...rallelIndexSupervisorTaskResourceTest.java | 20 +--- .../batch/parallel/PartialCompactionTest.java | 3 +- 13 files changed, 104 insertions(+), 335 deletions(-) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java index 35eca8cfcb4f..6f1a4396ada9 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java @@ -369,7 +369,6 @@ private CompactionTask createCompactionTask( new ClientCompactionTaskTransformSpec(dimFilter); final CompactionTask.Builder builder = new CompactionTask.Builder( DATA_SOURCE, - null, null ); IndexSpec indexSpec = createIndexSpec(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index fe4d09d8481a..8659eb0f397e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -48,7 +48,6 @@ import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexing.common.LockGranularity; -import org.apache.druid.indexing.common.RetryPolicyFactory; import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; @@ -62,6 +61,7 @@ import org.apache.druid.java.util.common.NonnullPair; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.RE; +import org.apache.druid.java.util.common.Stopwatch; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.GranularityType; @@ -100,7 +100,6 @@ import javax.annotation.Nullable; import java.io.File; import java.io.IOException; -import java.time.Clock; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -125,8 +124,6 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg { public static final String TYPE = "compact"; private static final Logger log = new Logger(CompactionTask.class); - private static final Clock UTC_CLOCK = Clock.systemUTC(); - /** * The CompactionTask creates and runs multiple IndexTask instances. When the {@link AppenderatorsManager} @@ -449,27 +446,12 @@ public boolean isPerfectRollup() return tuningConfig != null && tuningConfig.isForceGuaranteedRollup(); } - @VisibleForTesting - void emitCompactIngestionModeMetrics( - ServiceEmitter emitter, - boolean isDropExisting - ) - { - - if (emitter == null) { - return; - } - emitMetric(emitter, "ingest/count", 1); - } - @Override public TaskStatus runTask(TaskToolbox toolbox) throws Exception { - // emit metric for compact ingestion mode: - emitCompactIngestionModeMetrics(toolbox.getEmitter(), ioConfig.isDropExisting()); + emitMetric(toolbox.getEmitter(), "ingest/count", 1); final Map intervalDataSchemas = createDataSchemasForIntervals( - UTC_CLOCK, toolbox, getTaskLockHelper().getLockGranularityToUse(), segmentProvider, @@ -489,13 +471,13 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception } /** - * Generate dataschema for segments in each interval - * @return - * @throws IOException + * Generate dataschema for segments in each interval. + * + * @throws IOException if an exception occurs whie retrieving used segments to + * determine schemas. */ @VisibleForTesting static Map createDataSchemasForIntervals( - final Clock clock, final TaskToolbox toolbox, final LockGranularity lockGranularityInUse, final SegmentProvider segmentProvider, @@ -506,13 +488,13 @@ static Map createDataSchemasForIntervals( final ServiceMetricEvent.Builder metricBuilder ) throws IOException { - final List> timelineSegments = retrieveRelevantTimelineHolders( + final Iterable timelineSegments = retrieveRelevantTimelineHolders( toolbox, segmentProvider, lockGranularityInUse ); - if (timelineSegments.isEmpty()) { + if (!timelineSegments.iterator().hasNext()) { return Collections.emptyMap(); } @@ -524,7 +506,7 @@ static Map createDataSchemasForIntervals( Comparators.intervalsByStartThenEnd() ); - for (final DataSegment dataSegment : VersionedIntervalTimeline.getAllObjects(timelineSegments)) { + for (final DataSegment dataSegment : timelineSegments) { intervalToSegments.computeIfAbsent(dataSegment.getInterval(), k -> new ArrayList<>()) .add(dataSegment); } @@ -557,7 +539,6 @@ static Map createDataSchemasForIntervals( // creates new granularitySpec and set segmentGranularity Granularity segmentGranularityToUse = GranularityType.fromPeriod(interval.toPeriod()).getDefaultGranularity(); final DataSchema dataSchema = createDataSchema( - clock, toolbox.getEmitter(), metricBuilder, segmentProvider.dataSource, @@ -576,18 +557,17 @@ static Map createDataSchemasForIntervals( } else { // given segment granularity final DataSchema dataSchema = createDataSchema( - clock, toolbox.getEmitter(), metricBuilder, segmentProvider.dataSource, JodaUtils.umbrellaInterval( Iterables.transform( - VersionedIntervalTimeline.getAllObjects(timelineSegments), + timelineSegments, DataSegment::getInterval ) ), lazyFetchSegments( - VersionedIntervalTimeline.getAllObjects(timelineSegments), + timelineSegments, toolbox.getSegmentCacheManager(), toolbox.getIndexIO() ), @@ -600,7 +580,7 @@ static Map createDataSchemasForIntervals( } } - private static List> retrieveRelevantTimelineHolders( + private static Iterable retrieveRelevantTimelineHolders( TaskToolbox toolbox, SegmentProvider segmentProvider, LockGranularity lockGranularityInUse @@ -612,11 +592,10 @@ private static List> retrieveRelevantT final List> timelineSegments = SegmentTimeline .forSegments(usedSegments) .lookup(segmentProvider.interval); - return timelineSegments; + return VersionedIntervalTimeline.getAllObjects(timelineSegments); } private static DataSchema createDataSchema( - Clock clock, ServiceEmitter emitter, ServiceMetricEvent.Builder metricBuilder, String dataSource, @@ -636,24 +615,30 @@ private static DataSchema createDataSchema( dimensionsSpec == null, metricsSpec == null ); - long start = clock.millis(); + + final Stopwatch stopwatch = Stopwatch.createStarted(); try { existingSegmentAnalyzer.fetchAndProcessIfNeeded(); } finally { if (emitter != null) { - emitter.emit(metricBuilder.setMetric("compact/segmentAnalyzer/fetchAndProcessMillis", clock.millis() - start)); + emitter.emit( + metricBuilder.setMetric( + "compact/segmentAnalyzer/fetchAndProcessMillis", + stopwatch.millisElapsed() + ) + ); } } final Granularity queryGranularityToUse; if (granularitySpec.getQueryGranularity() == null) { queryGranularityToUse = existingSegmentAnalyzer.getQueryGranularity(); - log.info("Generate compaction task spec with segments original query granularity [%s]", queryGranularityToUse); + log.info("Generate compaction task spec with segments original query granularity[%s]", queryGranularityToUse); } else { queryGranularityToUse = granularitySpec.getQueryGranularity(); log.info( - "Generate compaction task spec with new query granularity overrided from input [%s]", + "Generate compaction task spec with new query granularity overrided from input[%s].", queryGranularityToUse ); } @@ -1033,7 +1018,6 @@ public static class Builder { private final String dataSource; private final SegmentCacheManagerFactory segmentCacheManagerFactory; - private final RetryPolicyFactory retryPolicyFactory; private CompactionIOConfig ioConfig; @Nullable @@ -1054,13 +1038,11 @@ public static class Builder public Builder( String dataSource, - SegmentCacheManagerFactory segmentCacheManagerFactory, - RetryPolicyFactory retryPolicyFactory + SegmentCacheManagerFactory segmentCacheManagerFactory ) { this.dataSource = dataSource; this.segmentCacheManagerFactory = segmentCacheManagerFactory; - this.retryPolicyFactory = retryPolicyFactory; } public Builder interval(Interval interval) @@ -1288,7 +1270,9 @@ public CompactionTuningConfig( ); } - @Override + /** + * Creates a copy of this tuning config with the partition spec changed. + */ public CompactionTuningConfig withPartitionsSpec(PartitionsSpec partitionsSpec) { return new CompactionTuningConfig( diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index cc253f46a520..dc6c07b6b83c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -1471,31 +1471,6 @@ public IndexTuningConfig withBasePersistDirectory(File dir) ); } - public IndexTuningConfig withPartitionsSpec(PartitionsSpec partitionsSpec) - { - return new IndexTuningConfig( - appendableIndexSpec, - maxRowsInMemory, - maxBytesInMemory, - skipBytesInMemoryOverheadCheck, - partitionsSpec, - indexSpec, - indexSpecForIntermediatePersists, - maxPendingPersists, - forceGuaranteedRollup, - reportParseExceptions, - pushTimeout, - basePersistDirectory, - segmentWriteOutMediumFactory, - logParseExceptions, - maxParseExceptions, - maxSavedParseExceptions, - maxColumnsToMerge, - awaitSegmentAvailabilityTimeoutMillis, - numPersistThreads - ); - } - @JsonProperty @Override public AppendableIndexSpec getAppendableIndexSpec() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java index 722c6010b206..f2eacb8c1c6c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java @@ -56,11 +56,14 @@ public class NativeCompactionRunner implements CompactionRunner { - private static final Logger log = new Logger(NativeCompactionRunner.class); public static final String TYPE = "native"; + + private static final Logger log = new Logger(NativeCompactionRunner.class); private static final boolean STORE_COMPACTION_STATE = true; + @JsonIgnore private final SegmentCacheManagerFactory segmentCacheManagerFactory; + @JsonIgnore private final CurrentSubTaskHolder currentSubTaskHolder = new CurrentSubTaskHolder( (taskObject, config) -> { @@ -183,7 +186,6 @@ public TaskStatus runCompactionTasks( final PartitionConfigurationManager partitionConfigurationManager = new NativeCompactionRunner.PartitionConfigurationManager(compactionTask.getTuningConfig()); - final List ingestionSpecs = createIngestionSpecs( intervalDataSchemaMap, taskToolbox, @@ -278,8 +280,11 @@ private TaskStatus runParallelIndexSubtasks( return failCnt == 0 ? TaskStatus.success(compactionTaskId) : TaskStatus.failure(compactionTaskId, msg); } - @VisibleForTesting - ParallelIndexSupervisorTask newTask(CompactionTask compactionTask, String baseSequenceName, ParallelIndexIngestionSpec ingestionSpec) + private ParallelIndexSupervisorTask newTask( + CompactionTask compactionTask, + String baseSequenceName, + ParallelIndexIngestionSpec ingestionSpec + ) { return new ParallelIndexSupervisorTask( compactionTask.getId(), @@ -305,7 +310,6 @@ Map createContextForSubtask(CompactionTask compactionTask) @VisibleForTesting static class PartitionConfigurationManager { - @Nullable private final CompactionTask.CompactionTuningConfig tuningConfig; PartitionConfigurationManager(@Nullable CompactionTask.CompactionTuningConfig tuningConfig) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java index 7e33261ee7ce..d97d9beff343 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java @@ -275,45 +275,6 @@ public int getMaxAllowedLockCount() return maxAllowedLockCount; } - @Override - public ParallelIndexTuningConfig withPartitionsSpec(PartitionsSpec partitionsSpec) - { - return new ParallelIndexTuningConfig( - null, - null, - getAppendableIndexSpec(), - getMaxRowsInMemory(), - getMaxBytesInMemory(), - isSkipBytesInMemoryOverheadCheck(), - null, - null, - getSplitHintSpec(), - partitionsSpec, - getIndexSpec(), - getIndexSpecForIntermediatePersists(), - getMaxPendingPersists(), - isForceGuaranteedRollup(), - isReportParseExceptions(), - getPushTimeout(), - getSegmentWriteOutMediumFactory(), - null, - getMaxNumConcurrentSubTasks(), - getMaxRetry(), - getTaskStatusCheckPeriodMs(), - getChatHandlerTimeout(), - getChatHandlerNumRetries(), - getMaxNumSegmentsToMerge(), - getTotalNumMergeTasks(), - isLogParseExceptions(), - getMaxParseExceptions(), - getMaxSavedParseExceptions(), - getMaxColumnsToMerge(), - getAwaitSegmentAvailabilityTimeoutMillis(), - getMaxAllowedLockCount(), - getNumPersistThreads() - ); - } - @Override public boolean equals(Object o) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java index 3d6c8085c98b..fcded3ab9eeb 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java @@ -43,8 +43,6 @@ import org.apache.druid.guice.GuiceInjectors; import org.apache.druid.indexer.CompactionEngine; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; -import org.apache.druid.indexing.common.RetryPolicyConfig; -import org.apache.druid.indexing.common.RetryPolicyFactory; import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig; @@ -340,8 +338,7 @@ private CompactionTask createCompactionTask(ClientCompactionTaskTransformSpec tr { CompactionTask.Builder compactionTaskBuilder = new CompactionTask.Builder( "datasource", - new SegmentCacheManagerFactory(TestIndex.INDEX_IO, MAPPER), - new RetryPolicyFactory(new RetryPolicyConfig()) + new SegmentCacheManagerFactory(TestIndex.INDEX_IO, MAPPER) ) .inputSpec(new CompactionIntervalSpec(Intervals.of("2019/2020"), "testSha256OfSortedSegmentIds"), true) .tuningConfig( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java index ba9a6e3e2be2..188ea3cdd071 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java @@ -41,8 +41,6 @@ import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec; import org.apache.druid.indexer.report.IngestionStatsAndErrors; import org.apache.druid.indexing.common.LockGranularity; -import org.apache.druid.indexing.common.RetryPolicyConfig; -import org.apache.druid.indexing.common.RetryPolicyFactory; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.task.CompactionTask.Builder; @@ -114,7 +112,6 @@ public static Iterable constructorFeeder() } private static final String DATA_SOURCE = "test"; - private static final RetryPolicyFactory RETRY_POLICY_FACTORY = new RetryPolicyFactory(new RetryPolicyConfig()); private static final Interval INTERVAL_TO_INDEX = Intervals.of("2014-01-01/2014-01-02"); private final LockGranularity lockGranularity; @@ -160,8 +157,7 @@ public void testRunParallelWithDynamicPartitioningMatchCompactionState() throws final Builder builder = new Builder( DATA_SOURCE, - getSegmentCacheManagerFactory(), - RETRY_POLICY_FACTORY + getSegmentCacheManagerFactory() ); final CompactionTask compactionTask = builder .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null)) @@ -215,8 +211,7 @@ public void testRunParallelWithHashPartitioningMatchCompactionState() throws Exc final Builder builder = new Builder( DATA_SOURCE, - getSegmentCacheManagerFactory(), - RETRY_POLICY_FACTORY + getSegmentCacheManagerFactory() ); final CompactionTask compactionTask = builder .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null)) @@ -281,8 +276,7 @@ public void testRunParallelWithRangePartitioning() throws Exception final Builder builder = new Builder( DATA_SOURCE, - getSegmentCacheManagerFactory(), - RETRY_POLICY_FACTORY + getSegmentCacheManagerFactory() ); final CompactionTask compactionTask = builder .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null)) @@ -332,8 +326,7 @@ public void testRunParallelWithRangePartitioningAndNoUpfrontSegmentFetching() th final Builder builder = new Builder( DATA_SOURCE, - getSegmentCacheManagerFactory(), - RETRY_POLICY_FACTORY + getSegmentCacheManagerFactory() ); final CompactionTask compactionTask = builder @@ -395,8 +388,7 @@ public void testRunParallelWithMultiDimensionRangePartitioning() throws Exceptio final Builder builder = new Builder( DATA_SOURCE, - getSegmentCacheManagerFactory(), - RETRY_POLICY_FACTORY + getSegmentCacheManagerFactory() ); final CompactionTask compactionTask = builder .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null)) @@ -449,8 +441,7 @@ public void testRunParallelWithRangePartitioningWithSingleTask() throws Exceptio final Builder builder = new Builder( DATA_SOURCE, - getSegmentCacheManagerFactory(), - RETRY_POLICY_FACTORY + getSegmentCacheManagerFactory() ); final CompactionTask compactionTask = builder .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null)) @@ -500,8 +491,7 @@ public void testRunParallelWithMultiDimensionRangePartitioningWithSingleTask() t final Builder builder = new Builder( DATA_SOURCE, - getSegmentCacheManagerFactory(), - RETRY_POLICY_FACTORY + getSegmentCacheManagerFactory() ); final CompactionTask compactionTask = builder .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null)) @@ -551,8 +541,7 @@ public void testRunCompactionStateNotStoreIfContextSetToFalse() final Builder builder = new Builder( DATA_SOURCE, - getSegmentCacheManagerFactory(), - RETRY_POLICY_FACTORY + getSegmentCacheManagerFactory() ); final CompactionTask compactionTask = builder .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null)) @@ -582,8 +571,7 @@ public void testRunCompactionWithFilterShouldStoreInState() throws Exception final Builder builder = new Builder( DATA_SOURCE, - getSegmentCacheManagerFactory(), - RETRY_POLICY_FACTORY + getSegmentCacheManagerFactory() ); final CompactionTask compactionTask = builder .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null)) @@ -639,8 +627,7 @@ public void testRunCompactionWithNewMetricsShouldStoreInState() throws Exception final Builder builder = new Builder( DATA_SOURCE, - getSegmentCacheManagerFactory(), - RETRY_POLICY_FACTORY + getSegmentCacheManagerFactory() ); final CompactionTask compactionTask = builder .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null)) @@ -702,8 +689,7 @@ public void testCompactHashAndDynamicPartitionedSegments() runIndexTask(null, true); final Builder builder = new Builder( DATA_SOURCE, - getSegmentCacheManagerFactory(), - RETRY_POLICY_FACTORY + getSegmentCacheManagerFactory() ); final CompactionTask compactionTask = builder .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null)) @@ -752,8 +738,7 @@ public void testCompactRangeAndDynamicPartitionedSegments() runIndexTask(null, true); final Builder builder = new Builder( DATA_SOURCE, - getSegmentCacheManagerFactory(), - RETRY_POLICY_FACTORY + getSegmentCacheManagerFactory() ); final CompactionTask compactionTask = builder .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null)) @@ -844,8 +829,7 @@ public void testCompactionDropSegmentsOfInputIntervalIfDropFlagIsSet() throws Ex final Builder builder = new Builder( DATA_SOURCE, - getSegmentCacheManagerFactory(), - RETRY_POLICY_FACTORY + getSegmentCacheManagerFactory() ); final CompactionTask compactionTask = builder // Set the dropExisting flag to true in the IOConfig of the compaction task @@ -891,8 +875,7 @@ public void testCompactionDoesNotDropSegmentsIfDropFlagNotSet() throws Exception final Builder builder = new Builder( DATA_SOURCE, - getSegmentCacheManagerFactory(), - RETRY_POLICY_FACTORY + getSegmentCacheManagerFactory() ); final CompactionTask compactionTask = builder .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null)) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java index 599a24fac802..54902d5f7c64 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java @@ -45,8 +45,6 @@ import org.apache.druid.indexer.report.IngestionStatsAndErrors; import org.apache.druid.indexer.report.SingleFileTaskReportFileWriter; import org.apache.druid.indexing.common.LockGranularity; -import org.apache.druid.indexing.common.RetryPolicyConfig; -import org.apache.druid.indexing.common.RetryPolicyFactory; import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TestUtils; @@ -180,7 +178,6 @@ public static Iterable constructorFeeder() } private static final String DATA_SOURCE = "test"; - private static final RetryPolicyFactory RETRY_POLICY_FACTORY = new RetryPolicyFactory(new RetryPolicyConfig()); private final OverlordClient overlordClient; private final CoordinatorClient coordinatorClient; private final SegmentCacheManagerFactory segmentCacheManagerFactory; @@ -284,8 +281,7 @@ public void testRunWithDynamicPartitioning() throws Exception final Builder builder = new Builder( DATA_SOURCE, - segmentCacheManagerFactory, - RETRY_POLICY_FACTORY + segmentCacheManagerFactory ); final CompactionTask compactionTask = builder @@ -352,8 +348,7 @@ public void testRunWithHashPartitioning() throws Exception final Builder builder = new Builder( DATA_SOURCE, - segmentCacheManagerFactory, - RETRY_POLICY_FACTORY + segmentCacheManagerFactory ); final CompactionTask compactionTask = builder @@ -452,8 +447,7 @@ public void testRunCompactionTwice() throws Exception final Builder builder = new Builder( DATA_SOURCE, - segmentCacheManagerFactory, - RETRY_POLICY_FACTORY + segmentCacheManagerFactory ); final CompactionTask compactionTask1 = builder @@ -547,8 +541,7 @@ public void testRunIndexAndCompactAtTheSameTimeForDifferentInterval() throws Exc final Builder builder = new Builder( DATA_SOURCE, - segmentCacheManagerFactory, - RETRY_POLICY_FACTORY + segmentCacheManagerFactory ); final CompactionTask compactionTask = builder @@ -659,8 +652,7 @@ public void testWithSegmentGranularity() throws Exception final Builder builder = new Builder( DATA_SOURCE, - segmentCacheManagerFactory, - RETRY_POLICY_FACTORY + segmentCacheManagerFactory ); // day segmentGranularity @@ -729,8 +721,7 @@ public void testWithSegmentGranularityMisalignedInterval() throws Exception final Builder builder = new Builder( DATA_SOURCE, - segmentCacheManagerFactory, - RETRY_POLICY_FACTORY + segmentCacheManagerFactory ); final CompactionTask compactionTask1 = builder @@ -764,8 +755,7 @@ public void testWithSegmentGranularityMisalignedIntervalAllowed() throws Excepti final Builder builder = new Builder( DATA_SOURCE, - segmentCacheManagerFactory, - RETRY_POLICY_FACTORY + segmentCacheManagerFactory ); // day segmentGranularity @@ -809,8 +799,7 @@ public void testCompactionWithFilterInTransformSpec() throws Exception final Builder builder = new Builder( DATA_SOURCE, - segmentCacheManagerFactory, - RETRY_POLICY_FACTORY + segmentCacheManagerFactory ); // day segmentGranularity @@ -869,8 +858,7 @@ public void testCompactionWithNewMetricInMetricsSpec() throws Exception final Builder builder = new Builder( DATA_SOURCE, - segmentCacheManagerFactory, - RETRY_POLICY_FACTORY + segmentCacheManagerFactory ); // day segmentGranularity @@ -935,8 +923,7 @@ public void testWithGranularitySpecNonNullSegmentGranularityAndNullQueryGranular final Builder builder = new Builder( DATA_SOURCE, - segmentCacheManagerFactory, - RETRY_POLICY_FACTORY + segmentCacheManagerFactory ); // day segmentGranularity @@ -1004,8 +991,7 @@ public void testWithGranularitySpecNonNullQueryGranularityAndNullSegmentGranular final Builder builder = new Builder( DATA_SOURCE, - segmentCacheManagerFactory, - RETRY_POLICY_FACTORY + segmentCacheManagerFactory ); // day queryGranularity @@ -1058,8 +1044,7 @@ public void testWithGranularitySpecNonNullQueryGranularityAndNonNullSegmentGranu final Builder builder = new Builder( DATA_SOURCE, - segmentCacheManagerFactory, - RETRY_POLICY_FACTORY + segmentCacheManagerFactory ); // day segmentGranularity and day queryGranularity @@ -1097,8 +1082,7 @@ public void testWithGranularitySpecNullQueryGranularityAndNullSegmentGranularity final Builder builder = new Builder( DATA_SOURCE, - segmentCacheManagerFactory, - RETRY_POLICY_FACTORY + segmentCacheManagerFactory ); final CompactionTask compactionTask1 = builder @@ -1150,8 +1134,7 @@ public void testCompactThenAppend() throws Exception final Builder builder = new Builder( DATA_SOURCE, - segmentCacheManagerFactory, - RETRY_POLICY_FACTORY + segmentCacheManagerFactory ); final CompactionTask compactionTask = builder @@ -1212,8 +1195,7 @@ public void testPartialIntervalCompactWithFinerSegmentGranularityThenFullInterva final Builder builder = new Builder( DATA_SOURCE, - segmentCacheManagerFactory, - RETRY_POLICY_FACTORY + segmentCacheManagerFactory ); // Setup partial compaction: @@ -1368,8 +1350,7 @@ public void testCompactDatasourceOverIntervalWithOnlyTombstones() throws Excepti final Builder builder = new Builder( DATA_SOURCE, - segmentCacheManagerFactory, - RETRY_POLICY_FACTORY + segmentCacheManagerFactory ); // Setup partial interval compaction: @@ -1476,8 +1457,7 @@ public void testPartialIntervalCompactWithFinerSegmentGranularityThenFullInterva final Builder builder = new Builder( DATA_SOURCE, - segmentCacheManagerFactory, - RETRY_POLICY_FACTORY + segmentCacheManagerFactory ); final Interval partialInterval = Intervals.of("2014-01-01T01:00:00/2014-01-01T02:00:00"); @@ -1548,8 +1528,7 @@ public void testRunIndexAndCompactForSameSegmentAtTheSameTime() throws Exception final Builder builder = new Builder( DATA_SOURCE, - segmentCacheManagerFactory, - RETRY_POLICY_FACTORY + segmentCacheManagerFactory ); final CompactionTask compactionTask = builder @@ -1603,8 +1582,7 @@ public void testRunIndexAndCompactForSameSegmentAtTheSameTime2() throws Exceptio final Builder builder = new Builder( DATA_SOURCE, - segmentCacheManagerFactory, - RETRY_POLICY_FACTORY + segmentCacheManagerFactory ); final CompactionTask compactionTask = builder @@ -1694,8 +1672,7 @@ public void testRunWithSpatialDimensions() throws Exception final Builder builder = new Builder( DATA_SOURCE, - segmentCacheManagerFactory, - RETRY_POLICY_FACTORY + segmentCacheManagerFactory ); final CompactionTask compactionTask = builder @@ -1826,8 +1803,7 @@ public void testRunWithAutoCastDimensions() throws Exception final Builder builder = new Builder( DATA_SOURCE, - segmentCacheManagerFactory, - RETRY_POLICY_FACTORY + segmentCacheManagerFactory ); final CompactionTask compactionTask = builder diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index 48a7932a2414..b138a25469f1 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -57,8 +57,6 @@ import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.partitions.HashedPartitionsSpec; import org.apache.druid.indexing.common.LockGranularity; -import org.apache.druid.indexing.common.RetryPolicyConfig; -import org.apache.druid.indexing.common.RetryPolicyFactory; import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TestUtils; @@ -84,8 +82,6 @@ import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.PeriodGranularity; import org.apache.druid.java.util.common.guava.Comparators; -import org.apache.druid.java.util.emitter.core.NoopEmitter; -import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.query.aggregation.AggregatorFactory; @@ -148,15 +144,12 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.File; import java.io.IOException; -import java.time.Clock; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -198,7 +191,6 @@ public class CompactionTaskTest private static final Map SEGMENT_MAP = new HashMap<>(); private static final CoordinatorClient COORDINATOR_CLIENT = new TestCoordinatorClient(SEGMENT_MAP); private static final ObjectMapper OBJECT_MAPPER = setupInjectablesInObjectMapper(new DefaultObjectMapper()); - private static final RetryPolicyFactory RETRY_POLICY_FACTORY = new RetryPolicyFactory(new RetryPolicyConfig()); private static final String CONFLICTING_SEGMENT_GRANULARITY_FORMAT = "Conflicting segment granularities found %s(segmentGranularity) and %s(granularitySpec.segmentGranularity).\n" + "Remove `segmentGranularity` and set the `granularitySpec.segmentGranularity` to the expected granularity"; @@ -375,8 +367,6 @@ private static CompactionTask.CompactionTuningConfig createTuningConfig() @Rule public ExpectedException expectedException = ExpectedException.none(); - @Mock - private Clock clock; private StubServiceEmitter emitter; @Before @@ -389,7 +379,6 @@ public void setup() testIndexIO, SEGMENT_MAP ); - Mockito.when(clock.millis()).thenReturn(0L, 10_000L); segmentCacheManagerFactory = new SegmentCacheManagerFactory(TestIndex.INDEX_IO, OBJECT_MAPPER); } @@ -398,8 +387,7 @@ public void testCreateCompactionTaskWithGranularitySpec() { final Builder builder = new Builder( DATA_SOURCE, - segmentCacheManagerFactory, - RETRY_POLICY_FACTORY + segmentCacheManagerFactory ); builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS))); builder.tuningConfig(createTuningConfig()); @@ -408,8 +396,7 @@ public void testCreateCompactionTaskWithGranularitySpec() final Builder builder2 = new Builder( DATA_SOURCE, - segmentCacheManagerFactory, - RETRY_POLICY_FACTORY + segmentCacheManagerFactory ); builder2.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS))); builder2.tuningConfig(createTuningConfig()); @@ -421,33 +408,12 @@ public void testCreateCompactionTaskWithGranularitySpec() ); } - @Test - public void testCompactionTaskEmitter() - { - final Builder builder = new Builder( - DATA_SOURCE, - segmentCacheManagerFactory, - RETRY_POLICY_FACTORY - ); - builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS))); - builder.tuningConfig(createTuningConfig()); - builder.segmentGranularity(Granularities.HOUR); - final CompactionTask taskCreatedWithSegmentGranularity = builder.build(); - - // null emitter should work - taskCreatedWithSegmentGranularity.emitCompactIngestionModeMetrics(null, false); - // non-null should also work - ServiceEmitter noopEmitter = new ServiceEmitter("service", "host", new NoopEmitter()); - taskCreatedWithSegmentGranularity.emitCompactIngestionModeMetrics(noopEmitter, false); - taskCreatedWithSegmentGranularity.emitCompactIngestionModeMetrics(noopEmitter, true); - } - @Test(expected = IAE.class) public void testCreateCompactionTaskWithConflictingGranularitySpecAndSegmentGranularityShouldThrowIAE() { final Builder builder = new Builder( DATA_SOURCE, - segmentCacheManagerFactory, RETRY_POLICY_FACTORY + segmentCacheManagerFactory ); builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS))); builder.tuningConfig(createTuningConfig()); @@ -477,7 +443,7 @@ public void testCreateCompactionTaskWithTransformSpec() new ClientCompactionTaskTransformSpec(new SelectorDimFilter("dim1", "foo", null)); final Builder builder = new Builder( DATA_SOURCE, - segmentCacheManagerFactory, RETRY_POLICY_FACTORY + segmentCacheManagerFactory ); builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS))); builder.tuningConfig(createTuningConfig()); @@ -495,8 +461,7 @@ public void testCreateCompactionTaskWithMetricsSpec() AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{new CountAggregatorFactory("cnt")}; final Builder builder = new Builder( DATA_SOURCE, - segmentCacheManagerFactory, - RETRY_POLICY_FACTORY + segmentCacheManagerFactory ); builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS))); builder.tuningConfig(createTuningConfig()); @@ -513,8 +478,7 @@ public void testCreateCompactionTaskWithNullSegmentGranularityInGranularitySpecA { final Builder builder = new Builder( DATA_SOURCE, - segmentCacheManagerFactory, - RETRY_POLICY_FACTORY + segmentCacheManagerFactory ); builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS))); builder.tuningConfig(createTuningConfig()); @@ -542,8 +506,7 @@ public void testCreateCompactionTaskWithSameGranularitySpecAndSegmentGranularity { final Builder builder = new Builder( DATA_SOURCE, - segmentCacheManagerFactory, - RETRY_POLICY_FACTORY + segmentCacheManagerFactory ); builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS))); builder.tuningConfig(createTuningConfig()); @@ -558,8 +521,7 @@ public void testSerdeWithInterval() throws IOException { final Builder builder = new Builder( DATA_SOURCE, - segmentCacheManagerFactory, - RETRY_POLICY_FACTORY + segmentCacheManagerFactory ); final CompactionTask task = builder .inputSpec( @@ -579,8 +541,7 @@ public void testSerdeWithSegments() throws IOException { final Builder builder = new Builder( DATA_SOURCE, - segmentCacheManagerFactory, - RETRY_POLICY_FACTORY + segmentCacheManagerFactory ); final CompactionTask task = builder .segments(SEGMENTS) @@ -598,8 +559,7 @@ public void testSerdeWithDimensions() throws IOException { final Builder builder = new Builder( DATA_SOURCE, - segmentCacheManagerFactory, - RETRY_POLICY_FACTORY + segmentCacheManagerFactory ); final CompactionTask task = builder @@ -675,14 +635,12 @@ public void testSerdeWithOldTuningConfigSuccessfullyDeserializeToNewOne() throws toolbox.getRowIngestionMetersFactory(), COORDINATOR_CLIENT, segmentCacheManagerFactory, - RETRY_POLICY_FACTORY, toolbox.getAppenderatorsManager() ); final Builder builder = new Builder( DATA_SOURCE, - segmentCacheManagerFactory, - RETRY_POLICY_FACTORY + segmentCacheManagerFactory ); final CompactionTask expectedFromJson = builder @@ -702,8 +660,7 @@ public void testInputSourceResources() { final Builder builder = new Builder( DATA_SOURCE, - segmentCacheManagerFactory, - RETRY_POLICY_FACTORY + segmentCacheManagerFactory ); final CompactionTask task = builder .inputSpec( @@ -910,7 +867,6 @@ public void testSegmentProviderFindSegmentsWithEmptySegmentsThrowException() public void testCreateIngestionSchema() throws IOException { final Map dataSchemasForIntervals = CompactionTask.createDataSchemasForIntervals( - clock, toolbox, LockGranularity.TIME_CHUNK, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), @@ -992,7 +948,6 @@ public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOExceptio null ); final Map dataSchemasForIntervals = CompactionTask.createDataSchemasForIntervals( - clock, toolbox, LockGranularity.TIME_CHUNK, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), @@ -1075,7 +1030,6 @@ public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException null ); final Map dataSchemasForIntervals = CompactionTask.createDataSchemasForIntervals( - clock, toolbox, LockGranularity.TIME_CHUNK, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), @@ -1158,7 +1112,6 @@ public void testCreateIngestionSchemaWithNumShards() throws IOException null ); final Map dataSchemasForIntervals = CompactionTask.createDataSchemasForIntervals( - clock, toolbox, LockGranularity.TIME_CHUNK, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), @@ -1229,7 +1182,6 @@ public void testCreateIngestionSchemaWithCustomDimensionsSpec() throws IOExcepti ); final Map dataSchemasForIntervals = CompactionTask.createDataSchemasForIntervals( - clock, toolbox, LockGranularity.TIME_CHUNK, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), @@ -1280,7 +1232,6 @@ public void testCreateIngestionSchemaWithCustomMetricsSpec() throws IOException }; final Map dataSchemasForIntervals = CompactionTask.createDataSchemasForIntervals( - clock, toolbox, LockGranularity.TIME_CHUNK, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), @@ -1324,7 +1275,6 @@ public void testCreateIngestionSchemaWithCustomMetricsSpec() throws IOException public void testCreateIngestionSchemaWithCustomSegments() throws IOException { final Map dataSchemasForIntervals = CompactionTask.createDataSchemasForIntervals( - clock, toolbox, LockGranularity.TIME_CHUNK, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), @@ -1375,7 +1325,6 @@ public void testCreateIngestionSchemaWithDifferentSegmentSet() throws IOExceptio // Remove one segment in the middle segments.remove(segments.size() / 2); final Map dataSchemasForIntervals = CompactionTask.createDataSchemasForIntervals( - clock, toolbox, LockGranularity.TIME_CHUNK, new SegmentProvider(DATA_SOURCE, SpecificSegmentsSpec.fromSegments(segments)), @@ -1406,7 +1355,6 @@ public void testMissingMetadata() throws IOException indexIO.removeMetadata(Iterables.getFirst(indexIO.getQueryableIndexMap().keySet(), null)); final List segments = new ArrayList<>(SEGMENTS); final Map dataSchemasForIntervals = CompactionTask.createDataSchemasForIntervals( - clock, toolbox, LockGranularity.TIME_CHUNK, new SegmentProvider(DATA_SOURCE, SpecificSegmentsSpec.fromSegments(segments)), @@ -1435,7 +1383,7 @@ public void testEmptyInterval() final Builder builder = new Builder( DATA_SOURCE, - segmentCacheManagerFactory, RETRY_POLICY_FACTORY + segmentCacheManagerFactory ); @SuppressWarnings("unused") @@ -1448,7 +1396,6 @@ public void testEmptyInterval() public void testSegmentGranularityAndNullQueryGranularity() throws IOException { final Map dataSchemasForIntervals = CompactionTask.createDataSchemasForIntervals( - clock, toolbox, LockGranularity.TIME_CHUNK, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), @@ -1493,7 +1440,6 @@ public void testSegmentGranularityAndNullQueryGranularity() throws IOException public void testQueryGranularityAndNullSegmentGranularity() throws IOException { final Map dataSchemasForIntervals = CompactionTask.createDataSchemasForIntervals( - clock, toolbox, LockGranularity.TIME_CHUNK, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), @@ -1535,7 +1481,6 @@ public void testQueryGranularityAndNullSegmentGranularity() throws IOException public void testQueryGranularityAndSegmentGranularityNonNull() throws IOException { final Map dataSchemasForIntervals = CompactionTask.createDataSchemasForIntervals( - clock, toolbox, LockGranularity.TIME_CHUNK, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), @@ -1580,14 +1525,13 @@ public void testQueryGranularityAndSegmentGranularityNonNull() throws IOExceptio new PeriodGranularity(Period.months(3), null, null), BatchIOConfig.DEFAULT_DROP_EXISTING ); - emitter.verifyValue("compact/segmentAnalyzer/fetchAndProcessMillis", 10_000L); + emitter.verifyEmitted("compact/segmentAnalyzer/fetchAndProcessMillis", 1); } @Test public void testNullGranularitySpec() throws IOException { final Map dataSchemasForIntervals = CompactionTask.createDataSchemasForIntervals( - clock, toolbox, LockGranularity.TIME_CHUNK, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), @@ -1633,7 +1577,6 @@ public void testGranularitySpecWithNullQueryGranularityAndNullSegmentGranularity throws IOException { final Map dataSchemasForIntervals = CompactionTask.createDataSchemasForIntervals( - clock, toolbox, LockGranularity.TIME_CHUNK, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), @@ -1679,7 +1622,6 @@ public void testGranularitySpecWithNotNullRollup() throws IOException { final Map dataSchemasForIntervals = CompactionTask.createDataSchemasForIntervals( - clock, toolbox, LockGranularity.TIME_CHUNK, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), @@ -1710,7 +1652,6 @@ public void testGranularitySpecWithNullRollup() throws IOException { final Map dataSchemasForIntervals = CompactionTask.createDataSchemasForIntervals( - clock, toolbox, LockGranularity.TIME_CHUNK, new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)), @@ -1752,7 +1693,7 @@ public void testChooseFinestGranularityWithNulls() Granularities.ALL, Granularities.MINUTE ); - Assert.assertTrue(Granularities.SECOND.equals(chooseFinestGranularityHelper(input))); + Assert.assertEquals(Granularities.SECOND, chooseFinestGranularityHelper(input)); } @Test @@ -1769,7 +1710,7 @@ public void testChooseFinestGranularityNone() Granularities.NONE, Granularities.MINUTE ); - Assert.assertTrue(Granularities.NONE.equals(chooseFinestGranularityHelper(input))); + Assert.assertEquals(Granularities.NONE, chooseFinestGranularityHelper(input)); } @Test @@ -1789,7 +1730,7 @@ public void testGetDefaultLookupLoadingSpec() { final Builder builder = new Builder( DATA_SOURCE, - segmentCacheManagerFactory, RETRY_POLICY_FACTORY + segmentCacheManagerFactory ); final CompactionTask task = builder .interval(Intervals.of("2000-01-01/2000-01-02")) @@ -1802,7 +1743,7 @@ public void testGetDefaultLookupLoadingSpecWithTransformSpec() { final Builder builder = new Builder( DATA_SOURCE, - segmentCacheManagerFactory, RETRY_POLICY_FACTORY + segmentCacheManagerFactory ); final CompactionTask task = builder .interval(Intervals.of("2000-01-01/2000-01-02")) @@ -2270,7 +2211,6 @@ public OldCompactionTaskWithAnyTuningConfigType( @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory, @JacksonInject CoordinatorClient coordinatorClient, @JacksonInject SegmentCacheManagerFactory segmentCacheManagerFactory, - @JacksonInject RetryPolicyFactory retryPolicyFactory, @JacksonInject AppenderatorsManager appenderatorsManager ) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java index ecc4f702d6ae..8f68846cd645 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java @@ -64,7 +64,6 @@ import org.apache.druid.indexing.common.task.CompactionTask; import org.apache.druid.indexing.common.task.IngestionTestBase; import org.apache.druid.indexing.common.task.Task; -import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.indexing.common.task.TestAppenderatorsManager; import org.apache.druid.indexing.overlord.Segments; @@ -754,25 +753,6 @@ public File getStorageDirectory() .build(); } - static class TestParallelIndexSupervisorTask extends ParallelIndexSupervisorTask - { - TestParallelIndexSupervisorTask( - String id, - TaskResource taskResource, - ParallelIndexIngestionSpec ingestionSchema, - Map context - ) - { - super( - id, - null, - taskResource, - ingestionSchema, - context - ); - } - } - static class LocalShuffleClient implements ShuffleClient { private final IntermediaryDataManager intermediaryDataManager; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java index a32aed819e0c..14e1bab9540b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java @@ -40,13 +40,10 @@ import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig; -import org.hamcrest.CoreMatchers; import org.joda.time.Interval; import org.junit.After; import org.junit.Assert; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import javax.annotation.Nullable; import java.util.ArrayList; @@ -59,8 +56,6 @@ public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSupervisorTaskTest { - @Rule - public ExpectedException expectedException = ExpectedException.none(); public ParallelIndexSupervisorTaskKillTest() { @@ -81,7 +76,7 @@ public void testStopGracefully() throws Exception Intervals.of("2017/2018"), new ParallelIndexIOConfig( null, - // Sub tasks would run forever + // Sub-tasks would run forever new TestInputSource(Pair.of(new TestInput(Integer.MAX_VALUE, TaskState.SUCCESS), 4)), new NoopInputFormat(), false, @@ -93,16 +88,12 @@ public void testStopGracefully() throws Exception Thread.sleep(100); } task.stopGracefully(null); - expectedException.expect(RuntimeException.class); - expectedException.expectCause(CoreMatchers.instanceOf(ExecutionException.class)); - getIndexingServiceClient().waitToFinish(task, 3000L, TimeUnit.MILLISECONDS); - final SinglePhaseParallelIndexTaskRunner runner = (SinglePhaseParallelIndexTaskRunner) task.getCurrentRunner(); - Assert.assertTrue(runner.getRunningTaskIds().isEmpty()); - // completeSubTaskSpecs should be empty because no task has reported its status to TaskMonitor - Assert.assertTrue(runner.getCompleteSubTaskSpecs().isEmpty()); - - Assert.assertEquals(4, runner.getTaskMonitor().getNumCanceledTasks()); + Exception e = Assert.assertThrows( + RuntimeException.class, + () -> getIndexingServiceClient().waitToFinish(task, 3000L, TimeUnit.MILLISECONDS) + ); + Assert.assertTrue(e.getCause() instanceof ExecutionException); } @Test(timeout = 5000L) @@ -273,28 +264,20 @@ public boolean needsFormat() } } - private static class TestSupervisorTask extends TestParallelIndexSupervisorTask + private static class TestSupervisorTask extends ParallelIndexSupervisorTask { private TestSupervisorTask( ParallelIndexIngestionSpec ingestionSchema, Map context ) { - super( - null, - null, - ingestionSchema, - context - ); + super(null, null, null, ingestionSchema, context); } @Override SinglePhaseParallelIndexTaskRunner createSinglePhaseTaskRunner(TaskToolbox toolbox) { - return new TestRunner( - toolbox, - this - ); + return new TestRunner(toolbox, this); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java index 772bdafb2b17..01d502de85cb 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java @@ -290,19 +290,11 @@ private void checkState( Assert.assertEquals(200, response.getStatus()); final ParallelIndexingPhaseProgress monitorStatus = (ParallelIndexingPhaseProgress) response.getEntity(); - // numRunningTasks + // Verify the number of tasks in different states Assert.assertEquals(runningTasks.size(), monitorStatus.getRunning()); - - // numSucceededTasks Assert.assertEquals(expectedSucceededTasks, monitorStatus.getSucceeded()); - - // numFailedTasks Assert.assertEquals(expectedFailedTask, monitorStatus.getFailed()); - - // numCompleteTasks Assert.assertEquals(expectedSucceededTasks + expectedFailedTask, monitorStatus.getComplete()); - - // numTotalTasks Assert.assertEquals(runningTasks.size() + expectedSucceededTasks + expectedFailedTask, monitorStatus.getTotal()); // runningSubTasks @@ -407,7 +399,6 @@ private TestSupervisorTask newTask( ParallelIndexIOConfig ioConfig ) { - // set up ingestion spec final ParallelIndexIngestionSpec ingestionSpec = new ParallelIndexIngestionSpec( new DataSchema( "dataSource", @@ -460,7 +451,6 @@ private TestSupervisorTask newTask( ) ); - // set up test tools return new TestSupervisorTask( null, null, @@ -503,7 +493,7 @@ public boolean needsFormat() } } - private class TestSupervisorTask extends TestParallelIndexSupervisorTask + private class TestSupervisorTask extends ParallelIndexSupervisorTask { TestSupervisorTask( String id, @@ -514,6 +504,7 @@ private class TestSupervisorTask extends TestParallelIndexSupervisorTask { super( id, + null, taskResource, ingestionSchema, context @@ -523,10 +514,7 @@ private class TestSupervisorTask extends TestParallelIndexSupervisorTask @Override SinglePhaseParallelIndexTaskRunner createSinglePhaseTaskRunner(TaskToolbox toolbox) { - return new TestRunner( - toolbox, - this - ); + return new TestRunner(toolbox, this); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialCompactionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialCompactionTest.java index a14d11d6f784..15201e884d3e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialCompactionTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialCompactionTest.java @@ -241,8 +241,7 @@ private Builder newCompactionTaskBuilder() { return new Builder( DATASOURCE, - getSegmentCacheManagerFactory(), - RETRY_POLICY_FACTORY + getSegmentCacheManagerFactory() ); } }