diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
index ac2738534da9..d0a035be17c2 100644
--- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
+++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
@@ -40,6 +40,7 @@
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.JodaUtils;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.StringUtils;
@@ -364,7 +365,7 @@ Pair<SortedMap<Interval, String>, Map<Interval, List<DataSegment>>> checkSegment
     // Pair<interval -> max(created_date), interval -> list<DataSegment>>
     Pair<SortedMap<Interval, String>, Map<Interval, List<DataSegment>>> baseSegmentsSnapshot =
         getMaxCreateDateAndBaseSegments(
-            metadataStorageCoordinator.retrieveUsedSegmentsAndCreatedDates(spec.getBaseDataSource())
+            metadataStorageCoordinator.retrieveUsedSegmentsAndCreatedDates(spec.getBaseDataSource(), Intervals.ETERNITY)
         );
     // baseSegments are used to create HadoopIndexTask
     Map<Interval, List<DataSegment>> baseSegments = baseSegmentsSnapshot.rhs;
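A quick aside on this call site: the old single-argument overload implicitly searched all of time, so passing `Intervals.ETERNITY` preserves the supervisor's behavior under the new interval-aware signature. The sketch below shows that equivalence with hypothetical stand-in types; note the actual patch updates every call site directly instead of adding a default overload.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the coordinator interface, illustrating why
// "no interval" and "the widest possible interval" are the same query.
interface CoordinatorSketch
{
  // New interval-aware form (the interval is kept as a String for brevity).
  List<String> retrieveUsedSegmentsAndCreatedDates(String dataSource, String interval);

  // Old zero-interval form, expressible as delegation to the widest interval.
  default List<String> retrieveUsedSegmentsAndCreatedDates(String dataSource)
  {
    return retrieveUsedSegmentsAndCreatedDates(dataSource, "eternity");
  }
}

class InMemoryCoordinator implements CoordinatorSketch
{
  @Override
  public List<String> retrieveUsedSegmentsAndCreatedDates(String dataSource, String interval)
  {
    return Collections.emptyList(); // stand-in for a metadata-store query
  }
}
```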
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsToReplaceAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsToReplaceAction.java
new file mode 100644
index 000000000000..78e6ada5c1e2
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsToReplaceAction.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.actions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.task.batch.parallel.AbstractBatchSubtask;
+import org.apache.druid.indexing.overlord.Segments;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.metadata.ReplaceTaskLock;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.Partitions;
+import org.apache.druid.timeline.SegmentTimeline;
+import org.joda.time.Interval;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This action exists in addition to {@link RetrieveUsedSegmentsAction} because that action suffers
+ * from a race condition described by the following sequence of events:
+ *
+ * - Segments S1, S2 and S3 exist
+ * - A compact task acquires a REPLACE lock
+ * - A concurrent appending job publishes a segment S4 which needs to be upgraded to the REPLACE lock's version
+ * - The compact task processes S1-S4 to create new segments
+ * - The compact task publishes the new segments and also carries S4 forward to the new version
+ *
+ * This can lead to the data in S4 being duplicated.
+ *
+ * This TaskAction returns a collection of segments which have data within the specified interval, are marked as
+ * used, and were created before the REPLACE lock, if any, was acquired.
+ * This ensures that a consistent set of segments is returned each time this action is called.
+ */
+public class RetrieveSegmentsToReplaceAction implements TaskAction<Collection<DataSegment>>
+{
+  private static final Logger log = new Logger(RetrieveSegmentsToReplaceAction.class);
+
+  @JsonIgnore
+  private final String dataSource;
+
+  @JsonIgnore
+  private final Interval interval;
+
+  @JsonCreator
+  public RetrieveSegmentsToReplaceAction(
+      @JsonProperty("dataSource") String dataSource,
+      @JsonProperty("interval") Interval interval
+  )
+  {
+    this.dataSource = dataSource;
+    this.interval = interval;
+  }
+
+  @JsonProperty
+  public String getDataSource()
+  {
+    return dataSource;
+  }
+
+  @JsonProperty
+  public Interval getInterval()
+  {
+    return interval;
+  }
+
+  @Override
+  public TypeReference<Collection<DataSegment>> getReturnTypeReference()
+  {
+    return new TypeReference<Collection<DataSegment>>() {};
+  }
+
+  @Override
+  public Collection<DataSegment> perform(Task task, TaskActionToolbox toolbox)
+  {
+    // The DruidInputSource can be used to read from one datasource and write to another.
+    // In such a case, the race condition described in the class-level docs cannot occur,
+    // and the action can simply fetch all visible segments for the datasource and interval.
+    if (!task.getDataSource().equals(dataSource)) {
+      return retrieveAllVisibleSegments(toolbox);
+    }
+
+    final String supervisorId;
+    if (task instanceof AbstractBatchSubtask) {
+      supervisorId = ((AbstractBatchSubtask) task).getSupervisorTaskId();
+    } else {
+      supervisorId = task.getId();
+    }
+
+    final Set<ReplaceTaskLock> replaceLocksForTask = toolbox
+        .getTaskLockbox()
+        .getAllReplaceLocksForDatasource(task.getDataSource())
+        .stream()
+        .filter(lock -> supervisorId.equals(lock.getSupervisorTaskId()))
+        .collect(Collectors.toSet());
+
+    // If there are no replace locks for the task, simply fetch all visible segments for the interval
+    if (replaceLocksForTask.isEmpty()) {
+      return retrieveAllVisibleSegments(toolbox);
+    }
+
+    Map<Interval, Map<String, Set<DataSegment>>> intervalToCreatedToSegments = new HashMap<>();
+    for (Pair<DataSegment, String> segmentAndCreatedDate :
+        toolbox.getIndexerMetadataStorageCoordinator().retrieveUsedSegmentsAndCreatedDates(dataSource, interval)) {
+      final DataSegment segment = segmentAndCreatedDate.lhs;
+      final String created = segmentAndCreatedDate.rhs;
+      intervalToCreatedToSegments.computeIfAbsent(segment.getInterval(), s -> new HashMap<>())
+                                 .computeIfAbsent(created, c -> new HashSet<>())
+                                 .add(segment);
+    }
+
+    Set<DataSegment> allSegmentsToBeReplaced = new HashSet<>();
+    for (final Map.Entry<Interval, Map<String, Set<DataSegment>>> entry : intervalToCreatedToSegments.entrySet()) {
+      final Interval segmentInterval = entry.getKey();
+      String lockVersion = null;
+      for (ReplaceTaskLock replaceLock : replaceLocksForTask) {
+        if (replaceLock.getInterval().contains(segmentInterval)) {
+          lockVersion = replaceLock.getVersion();
+        }
+      }
+      final Map<String, Set<DataSegment>> createdToSegmentsMap = entry.getValue();
+      for (Map.Entry<String, Set<DataSegment>> createdAndSegments : createdToSegmentsMap.entrySet()) {
+        if (lockVersion == null || lockVersion.compareTo(createdAndSegments.getKey()) > 0) {
+          allSegmentsToBeReplaced.addAll(createdAndSegments.getValue());
+        } else {
+          for (DataSegment segment : createdAndSegments.getValue()) {
+            log.info(
+                "Ignoring segment[%s] as it has created_date[%s] greater than the REPLACE lock version[%s]",
+                segment.getId(), createdAndSegments.getKey(), lockVersion
+            );
+          }
+        }
+      }
+    }
+
+    return SegmentTimeline.forSegments(allSegmentsToBeReplaced)
+                          .findNonOvershadowedObjectsInInterval(Intervals.ETERNITY, Partitions.ONLY_COMPLETE);
+  }
+
+  private Collection<DataSegment> retrieveAllVisibleSegments(TaskActionToolbox toolbox)
+  {
+    return toolbox.getIndexerMetadataStorageCoordinator()
+                  .retrieveUsedSegmentsForInterval(dataSource, interval, Segments.ONLY_VISIBLE);
+  }
+
+  @Override
+  public boolean isAudited()
+  {
+    return false;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    RetrieveSegmentsToReplaceAction that = (RetrieveSegmentsToReplaceAction) o;
+
+    if (!dataSource.equals(that.dataSource)) {
+      return false;
+    }
+    return interval.equals(that.interval);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(dataSource, interval);
+  }
+
+  @Override
+  public String toString()
+  {
+    return getClass().getSimpleName() + "{" +
+           "dataSource='" + dataSource + '\'' +
+           ", interval=" + interval +
+           '}';
+  }
+}
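Both `created_date` and a REPLACE lock's version are ISO-8601 timestamp strings, so the `lockVersion.compareTo(created) > 0` gate above orders them chronologically with plain string comparison. A self-contained sketch of that gate on hypothetical data (not the Druid types):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CreatedDateGateSketch
{
  public static void main(String[] args)
  {
    // segment id -> created_date (hypothetical values)
    Map<String, String> createdDates = new LinkedHashMap<>();
    createdDates.put("S1", "2023-10-01T00:00:00.000Z"); // existed before the lock
    createdDates.put("S4", "2023-10-01T02:00:00.000Z"); // appended after the lock

    // REPLACE lock version acquired between the two creations
    String lockVersion = "2023-10-01T01:00:00.000Z";

    createdDates.forEach((id, created) -> {
      // Same rule as perform(): include only segments created strictly before the lock.
      boolean inputToReplace = lockVersion.compareTo(created) > 0;
      System.out.println(id + " participates: " + inputToReplace); // S1 true, S4 false
    });
  }
}
```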
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java
index 171d53b9cdd6..e251626f8690 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java
@@ -38,6 +38,7 @@
   @JsonSubTypes.Type(name = "segmentTransactionalInsert", value = SegmentTransactionalInsertAction.class),
   @JsonSubTypes.Type(name = "segmentTransactionalAppend", value = SegmentTransactionalAppendAction.class),
   @JsonSubTypes.Type(name = "segmentTransactionalReplace", value = SegmentTransactionalReplaceAction.class),
+  @JsonSubTypes.Type(name = "retrieveSegmentsToReplace", value = RetrieveSegmentsToReplaceAction.class),
   // Type name doesn't correspond to the name of the class for backward compatibility.
   @JsonSubTypes.Type(name = "segmentListUsed", value = RetrieveUsedSegmentsAction.class),
   // Type name doesn't correspond to the name of the class for backward compatibility.
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractBatchSubtask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractBatchSubtask.java
index 37b70c53ed55..e6de0cf25949 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractBatchSubtask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractBatchSubtask.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.indexing.common.task.batch.parallel;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
 import org.apache.druid.indexing.common.task.TaskResource;
 
@@ -29,16 +30,20 @@
 public abstract class AbstractBatchSubtask extends AbstractBatchIndexTask
 {
+  private final String supervisorTaskId;
+
   protected AbstractBatchSubtask(
       String id,
       @Nullable String groupId,
       @Nullable TaskResource taskResource,
       String dataSource,
      @Nullable Map<String, Object> context,
-      @Nonnull IngestionMode ingestionMode
+      @Nonnull IngestionMode ingestionMode,
+      @Nonnull String supervisorTaskId
   )
   {
     super(id, groupId, taskResource, dataSource, context, -1, ingestionMode);
+    this.supervisorTaskId = supervisorTaskId;
   }
 
@@ -46,4 +51,13 @@ protected AbstractBatchSubtask(
    * This ID is used to identify duplicate work of retry tasks for the same spec.
    */
   public abstract String getSubtaskSpecId();
+
+  /**
+   * @return Task ID of the {@code ParallelIndexSupervisorTask} which launched this sub-task.
+   */
+  @JsonProperty
+  public String getSupervisorTaskId()
+  {
+    return supervisorTaskId;
+  }
 }
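The new entry above is ordinary Jackson polymorphic typing: the `type` field of a serialized action payload picks the concrete class when the Overlord deserializes it. A toy model of the same mechanism, using hypothetical mini-types rather than the real Druid classes:

```java
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.ObjectMapper;

public class SubtypeSketch
{
  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
  @JsonSubTypes(@JsonSubTypes.Type(name = "retrieveSegmentsToReplace", value = RetrieveSketch.class))
  interface ActionSketch {}

  static class RetrieveSketch implements ActionSketch
  {
    @JsonProperty String dataSource;
    @JsonProperty String interval;
  }

  public static void main(String[] args) throws Exception
  {
    // The "type" discriminator routes this payload to RetrieveSketch.
    String json = "{\"type\":\"retrieveSegmentsToReplace\","
                  + "\"dataSource\":\"wiki\",\"interval\":\"2023-01-01/2023-02-01\"}";
    ActionSketch action = new ObjectMapper().readValue(json, ActionSketch.class);
    System.out.println(action.getClass().getSimpleName()); // RetrieveSketch
  }
}
```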
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
index 625b20517836..698f8c9d0e95 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
@@ -69,7 +69,6 @@ public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
   private final int numAttempts;
   private final ParallelIndexIngestionSpec ingestionSchema;
-  private final String supervisorTaskId;
   private final String subtaskSpecId;
 
   private final ObjectMapper jsonMapper;
@@ -95,7 +94,8 @@ public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
         taskResource,
         ingestionSchema.getDataSchema(),
         ingestionSchema.getTuningConfig(),
-        context
+        context,
+        supervisorTaskId
     );
 
     Preconditions.checkArgument(
@@ -107,7 +107,6 @@ public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
     this.subtaskSpecId = subtaskSpecId;
     this.numAttempts = numAttempts;
     this.ingestionSchema = ingestionSchema;
-    this.supervisorTaskId = supervisorTaskId;
     this.jsonMapper = jsonMapper;
   }
 
@@ -123,12 +122,6 @@ private ParallelIndexIngestionSpec getIngestionSchema()
     return ingestionSchema;
   }
 
-  @JsonProperty
-  private String getSupervisorTaskId()
-  {
-    return supervisorTaskId;
-  }
-
   @JsonProperty
   @Override
   public String getSubtaskSpecId()
@@ -163,7 +156,7 @@ public boolean isReady(TaskActionClient taskActionClient) throws Exception
   {
     if (!getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals().isEmpty()) {
       return tryTimeChunkLock(
-          new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
+          new SurrogateTaskActionClient(getSupervisorTaskId(), taskActionClient),
          getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
       );
     } else {
@@ -274,7 +267,7 @@ private void sendReport(TaskToolbox toolbox, DimensionCardinalityReport report)
   {
     final ParallelIndexSupervisorTaskClient taskClient = toolbox.getSupervisorTaskClientProvider().build(
-        supervisorTaskId,
+        getSupervisorTaskId(),
         ingestionSchema.getTuningConfig().getChatHandlerTimeout(),
         ingestionSchema.getTuningConfig().getChatHandlerNumRetries()
     );
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
index b2ecd3dc2695..8f03c3bfa55e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
@@ -82,7 +82,6 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
   private final int numAttempts;
   private final ParallelIndexIngestionSpec ingestionSchema;
-  private final String supervisorTaskId;
   private final String subtaskSpecId;
 
   // For testing
@@ -136,7 +135,8 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
         taskResource,
         ingestionSchema.getDataSchema(),
         ingestionSchema.getTuningConfig(),
-        context
+        context,
+        supervisorTaskId
     );
 
     Preconditions.checkArgument(
@@ -148,7 +148,6 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
     this.subtaskSpecId = subtaskSpecId;
     this.numAttempts = numAttempts;
     this.ingestionSchema = ingestionSchema;
-    this.supervisorTaskId = supervisorTaskId;
     this.dedupInputRowFilterSupplier = dedupRowDimValueFilterSupplier;
   }
 
@@ -164,12 +163,6 @@ private ParallelIndexIngestionSpec getIngestionSchema()
     return ingestionSchema;
   }
 
-  @JsonProperty
-  private String getSupervisorTaskId()
-  {
-    return supervisorTaskId;
-  }
-
   @JsonProperty
   @Override
   public String getSubtaskSpecId()
@@ -204,7 +197,7 @@ public boolean isReady(TaskActionClient taskActionClient) throws Exception
   {
     if (!getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals().isEmpty()) {
       return tryTimeChunkLock(
-          new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
+          new SurrogateTaskActionClient(getSupervisorTaskId(), taskActionClient),
           getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
       );
     } else {
@@ -326,7 +319,7 @@ private Map<Interval, StringDistribution> determineDistribution(
 
   private void sendReport(TaskToolbox toolbox, DimensionDistributionReport report)
   {
     final ParallelIndexSupervisorTaskClient taskClient = toolbox.getSupervisorTaskClientProvider().build(
-        supervisorTaskId,
+        getSupervisorTaskId(),
         ingestionSchema.getTuningConfig().getChatHandlerTimeout(),
         ingestionSchema.getTuningConfig().getChatHandlerNumRetries()
     );
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
index b91a6ce3a821..49e3591ff18a 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
@@ -63,7 +63,6 @@ public class PartialHashSegmentGenerateTask extends PartialSegmentGenerateTask<
   private final int numAttempts;
   private final ParallelIndexIngestionSpec ingestionSchema;
-  private final String supervisorTaskId;
   private final String subtaskSpecId;
 
   @Nullable
   private final Map<Interval, Integer> intervalToNumShardsOverride;
@@ -96,7 +95,6 @@ public PartialHashSegmentGenerateTask(
     this.subtaskSpecId = subtaskSpecId;
     this.numAttempts = numAttempts;
     this.ingestionSchema = ingestionSchema;
-    this.supervisorTaskId = supervisorTaskId;
     this.intervalToNumShardsOverride = intervalToNumShardsOverride;
   }
 
@@ -112,12 +110,6 @@ public ParallelIndexIngestionSpec getIngestionSchema()
     return ingestionSchema;
   }
 
-  @JsonProperty
-  public String getSupervisorTaskId()
-  {
-    return supervisorTaskId;
-  }
-
   @JsonProperty
   @Override
   public String getSubtaskSpecId()
@@ -158,7 +150,7 @@ public Set<ResourceAction> getInputSourceResources()
   public boolean isReady(TaskActionClient taskActionClient) throws Exception
   {
     return tryTimeChunkLock(
-        new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
+        new SurrogateTaskActionClient(getSupervisorTaskId(), taskActionClient),
         getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
     );
   }
@@ -175,7 +167,7 @@ SegmentAllocatorForBatch createSegmentAllocator(TaskToolbox toolbox, ParallelInd
         getDataSource(),
         getSubtaskSpecId(),
         granularitySpec,
-        new SupervisorTaskAccess(supervisorTaskId, taskClient),
+        new SupervisorTaskAccess(getSupervisorTaskId(), taskClient),
         createHashPartitionAnalysisFromPartitionsSpec(
             granularitySpec,
             partitionsSpec,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
index 147a1fbf1212..27604eb7e770 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
@@ -64,7 +64,6 @@ public class PartialRangeSegmentGenerateTask extends PartialSegmentGenerateTask<
   private static final String PROP_SPEC = "spec";
   private static final boolean SKIP_NULL = true;
 
-  private final String supervisorTaskId;
   private final String subtaskSpecId;
   private final int numAttempts;
   private final ParallelIndexIngestionSpec ingestionSchema;
@@ -98,7 +97,6 @@ public PartialRangeSegmentGenerateTask(
     this.subtaskSpecId = subtaskSpecId;
     this.numAttempts = numAttempts;
     this.ingestionSchema = ingestionSchema;
-    this.supervisorTaskId = supervisorTaskId;
     this.intervalToPartitions = intervalToPartitions;
   }
 
@@ -131,12 +129,6 @@ public ParallelIndexIngestionSpec getIngestionSchema()
     return ingestionSchema;
   }
 
-  @JsonProperty
-  public String getSupervisorTaskId()
-  {
-    return supervisorTaskId;
-  }
-
   @JsonProperty
   @Override
   public String getSubtaskSpecId()
@@ -176,7 +168,7 @@ public Set<ResourceAction> getInputSourceResources()
   public boolean isReady(TaskActionClient taskActionClient) throws IOException
   {
     return tryTimeChunkLock(
-        new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
+        new SurrogateTaskActionClient(getSupervisorTaskId(), taskActionClient),
         getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
     );
   }
@@ -194,7 +186,7 @@ SegmentAllocatorForBatch createSegmentAllocator(TaskToolbox toolbox, ParallelInd
         getDataSource(),
         getSubtaskSpecId(),
         ingestionSchema.getDataSchema().getGranularitySpec(),
-        new SupervisorTaskAccess(supervisorTaskId, taskClient),
+        new SupervisorTaskAccess(getSupervisorTaskId(), taskClient),
         partitionAnalysis
     );
   }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
index dc2a7ef5bf92..e20c7bdbe352 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
@@ -94,7 +94,8 @@ abstract class PartialSegmentGenerateTask<T extends GeneratedPartitionsReport> e
         taskResource,
         ingestionSchema.getDataSchema(),
         ingestionSchema.getTuningConfig(),
-        context
+        context,
+        supervisorTaskId
     );
 
     Preconditions.checkArgument(
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
index bb933169d4b7..b59ec65716c2 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
@@ -78,7 +78,6 @@ abstract class PartialSegmentMergeTask extends PerfectRollu
   private final PartialSegmentMergeIOConfig ioConfig;
   private final int numAttempts;
-  private final String supervisorTaskId;
   private final String subtaskSpecId;
 
   PartialSegmentMergeTask(
@@ -101,7 +100,8 @@ abstract class PartialSegmentMergeTask extends PerfectRollu
         taskResource,
         dataSchema,
         tuningConfig,
-        context
+        context,
+        supervisorTaskId
     );
 
     Preconditions.checkArgument(
@@ -111,7 +111,6 @@ abstract class PartialSegmentMergeTask extends PerfectRollu
     this.subtaskSpecId = subtaskSpecId;
     this.ioConfig = ioConfig;
     this.numAttempts = numAttempts;
-    this.supervisorTaskId = supervisorTaskId;
   }
 
   @JsonProperty
@@ -120,12 +119,6 @@ public int getNumAttempts()
     return numAttempts;
   }
 
-  @JsonProperty
-  public String getSupervisorTaskId()
-  {
-    return supervisorTaskId;
-  }
-
   @JsonProperty
   @Override
   public String getSubtaskSpecId()
@@ -151,7 +144,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
     }
 
     final List<TaskLock> locks = toolbox.getTaskActionClient().submit(
-        new SurrogateAction<>(supervisorTaskId, new LockListAction())
+        new SurrogateAction<>(getSupervisorTaskId(), new LockListAction())
     );
     final Map<Interval, String> intervalToVersion = Maps.newHashMapWithExpectedSize(locks.size());
     locks.forEach(lock -> {
@@ -179,7 +172,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
     LOG.info("Fetch took [%s] seconds", fetchTime);
 
     final ParallelIndexSupervisorTaskClient taskClient = toolbox.getSupervisorTaskClientProvider().build(
-        supervisorTaskId,
+        getSupervisorTaskId(),
         getTuningConfig().getChatHandlerTimeout(),
         getTuningConfig().getChatHandlerNumRetries()
     );
@@ -225,7 +218,7 @@ private Map<Interval, Int2ObjectMap<List<File>>> fetchSegmentFiles(
       );
       FileUtils.mkdirp(partitionDir);
       for (PartitionLocation location : entryPerBucketId.getValue()) {
-        final File unzippedDir = toolbox.getShuffleClient().fetchSegmentFile(partitionDir, supervisorTaskId, location);
+        final File unzippedDir = toolbox.getShuffleClient().fetchSegmentFile(partitionDir, getSupervisorTaskId(), location);
         intervalToUnzippedFiles.computeIfAbsent(interval, k -> new Int2ObjectOpenHashMap<>())
                                .computeIfAbsent(bucketId, k -> new ArrayList<>())
                                .add(unzippedDir);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTask.java
index 4259922b43ad..3b00f0fedf67 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTask.java
@@ -49,10 +49,11 @@ abstract class PerfectRollupWorkerTask extends AbstractBatchSubtask
       @Nullable TaskResource taskResource,
       DataSchema dataSchema,
       ParallelIndexTuningConfig tuningConfig,
-      @Nullable Map<String, Object> context
+      @Nullable Map<String, Object> context,
+      String supervisorTaskId
   )
   {
-    super(id, groupId, taskResource, dataSchema.getDataSource(), context, IngestionMode.NONE);
+    super(id, groupId, taskResource, dataSchema.getDataSource(), context, IngestionMode.NONE, supervisorTaskId);
 
     Preconditions.checkArgument(
         tuningConfig.isForceGuaranteedRollup(),
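With the getter promoted to AbstractBatchSubtask, an action can resolve which task ID actually holds the locks for the task it serves: the supervisor's ID for a parallel sub-task, the task's own ID otherwise. A minimal sketch of that resolution rule, with hypothetical types mirroring the instanceof check in RetrieveSegmentsToReplaceAction.perform:

```java
public class LockHolderSketch
{
  interface TaskSketch
  {
    String id();
  }

  interface BatchSubtaskSketch extends TaskSketch
  {
    String supervisorTaskId();
  }

  // Locks for a sub-task are held by its supervisor; standalone tasks hold their own.
  static String lockHolderId(TaskSketch task)
  {
    if (task instanceof BatchSubtaskSketch) {
      return ((BatchSubtaskSketch) task).supervisorTaskId();
    }
    return task.id();
  }

  public static void main(String[] args)
  {
    BatchSubtaskSketch sub = new BatchSubtaskSketch()
    {
      @Override public String id() { return "single_phase_sub_task_1"; }
      @Override public String supervisorTaskId() { return "index_parallel_supervisor_1"; }
    };
    System.out.println(lockHolderId(sub)); // index_parallel_supervisor_1
  }
}
```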
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
index 183cde7c66d0..a3bd47d29603 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
@@ -118,7 +118,6 @@ public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHand
   private final int numAttempts;
   private final ParallelIndexIngestionSpec ingestionSchema;
-  private final String supervisorTaskId;
   private final String subtaskSpecId;
 
   /**
@@ -169,7 +168,8 @@ public SinglePhaseSubTask(
         taskResource,
         ingestionSchema.getDataSchema().getDataSource(),
         context,
-        AbstractTask.computeBatchIngestionMode(ingestionSchema.getIOConfig())
+        AbstractTask.computeBatchIngestionMode(ingestionSchema.getIOConfig()),
+        supervisorTaskId
     );
 
     if (ingestionSchema.getTuningConfig().isForceGuaranteedRollup()) {
@@ -179,7 +179,6 @@ public SinglePhaseSubTask(
     this.subtaskSpecId = subtaskSpecId;
     this.numAttempts = numAttempts;
     this.ingestionSchema = ingestionSchema;
-    this.supervisorTaskId = supervisorTaskId;
     this.missingIntervalsInOverwriteMode = ingestionSchema.getIOConfig().isAppendToExisting() != true
                                            && ingestionSchema.getDataSchema()
                                                              .getGranularitySpec()
@@ -217,7 +216,7 @@ public Set<ResourceAction> getInputSourceResources()
   public boolean isReady(TaskActionClient taskActionClient) throws IOException
   {
     return determineLockGranularityAndTryLock(
-        new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
+        new SurrogateTaskActionClient(getSupervisorTaskId(), taskActionClient),
         ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals()
     );
   }
@@ -234,12 +233,6 @@ public ParallelIndexIngestionSpec getIngestionSchema()
     return ingestionSchema;
   }
 
-  @JsonProperty
-  public String getSupervisorTaskId()
-  {
-    return supervisorTaskId;
-  }
-
   @Override
   @JsonProperty
   public String getSubtaskSpecId()
@@ -272,7 +265,7 @@ public TaskStatus runTask(final TaskToolbox toolbox) throws Exception
     final InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource(toolbox);
 
     final ParallelIndexSupervisorTaskClient taskClient = toolbox.getSupervisorTaskClientProvider().build(
-        supervisorTaskId,
+        getSupervisorTaskId(),
         ingestionSchema.getTuningConfig().getChatHandlerTimeout(),
         ingestionSchema.getTuningConfig().getChatHandlerNumRetries()
     );
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
index bf8b4bfb1d51..8056c69901fc 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
@@ -48,10 +48,9 @@
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.TaskToolbox;
-import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
+import org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.firehose.WindowedSegmentId;
-import org.apache.druid.indexing.overlord.Segments;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.guava.Comparators;
@@ -552,14 +551,7 @@ public static List<TimelineObjectHolder<String, DataSegment>> getTimelineForInte
     } else {
       try {
         usedSegments = toolbox.getTaskActionClient()
-                              .submit(
-                                  new RetrieveUsedSegmentsAction(
-                                      dataSource,
-                                      null,
-                                      Collections.singletonList(interval),
-                                      Segments.ONLY_VISIBLE
-                                  )
-                              );
+                              .submit(new RetrieveSegmentsToReplaceAction(dataSource, interval));
       }
       catch (IOException e) {
         LOG.error(e, "Error retrieving the used segments for interval[%s].", interval);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTaskTest.java
index 71b474e77359..b95fd53c74f7 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTaskTest.java
@@ -111,7 +111,8 @@ PerfectRollupWorkerTask build()
           null,
           createDataSchema(granularitySpecInputIntervals),
           createTuningConfig(forceGuaranteedRollup, partitionsSpec),
-          null
+          null,
+          "supervisor-id"
       );
     }
 
@@ -149,10 +150,11 @@ private static class TestPerfectRollupWorkerTask extends PerfectRollupWorkerTask
         @Nullable TaskResource taskResource,
         DataSchema dataSchema,
         ParallelIndexTuningConfig tuningConfig,
-        @Nullable Map<String, Object> context
+        @Nullable Map<String, Object> context,
+        String supervisorId
     )
     {
-      super(id, groupId, taskResource, dataSchema, tuningConfig, context);
+      super(id, groupId, taskResource, dataSchema, tuningConfig, context, supervisorId);
     }
 
     @Override
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
index 22f21fb79b62..1c4b6809c387 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
@@ -27,6 +27,7 @@
 import org.apache.druid.indexing.common.TaskStorageDirTracker;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TaskToolboxFactory;
+import org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction;
 import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
@@ -845,6 +846,54 @@ public void testSegmentIsAllocatedAtLatestVersion()
     verifyIntervalHasVisibleSegments(YEAR_23, segmentV20, segmentV21, segmentV22, segmentV23);
   }
 
+  @Test
+  public void testSegmentsToReplace()
+  {
+    final SegmentIdWithShardSpec pendingSegmentV01
+        = appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), Granularities.MONTH);
+    Assert.assertEquals(SEGMENT_V0, pendingSegmentV01.getVersion());
+    Assert.assertEquals(JAN_23, pendingSegmentV01.getInterval());
+    final DataSegment segment1 = asSegment(pendingSegmentV01);
+    appendTask.commitAppendSegments(segment1);
+
+    final SegmentIdWithShardSpec pendingSegmentV02
+        = appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), Granularities.MONTH);
+    Assert.assertNotEquals(pendingSegmentV01.asSegmentId(), pendingSegmentV02.asSegmentId());
+    Assert.assertEquals(SEGMENT_V0, pendingSegmentV02.getVersion());
+    Assert.assertEquals(JAN_23, pendingSegmentV02.getInterval());
+
+    verifyInputSegments(replaceTask, JAN_23, segment1);
+
+    replaceTask.acquireReplaceLockOn(JAN_23);
+
+    final DataSegment segment2 = asSegment(pendingSegmentV02);
+    appendTask.commitAppendSegments(segment2);
+
+    // Despite segment2 existing, it is not chosen to be replaced because it was created after the task lock was acquired
+    verifyInputSegments(replaceTask, JAN_23, segment1);
+
+    replaceTask.releaseLock(JAN_23);
+
+    final SegmentIdWithShardSpec pendingSegmentV03
+        = appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), Granularities.MONTH);
+    Assert.assertNotEquals(pendingSegmentV01.asSegmentId(), pendingSegmentV03.asSegmentId());
+    Assert.assertNotEquals(pendingSegmentV02.asSegmentId(), pendingSegmentV03.asSegmentId());
+    Assert.assertEquals(SEGMENT_V0, pendingSegmentV03.getVersion());
+    Assert.assertEquals(JAN_23, pendingSegmentV03.getInterval());
+    final DataSegment segment3 = asSegment(pendingSegmentV03);
+    appendTask.commitAppendSegments(segment3);
+    appendTask.releaseLock(JAN_23);
+
+    replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23);
+    // The new lock was acquired before segment3 was created, but it doesn't contain the month's interval.
+    // So, all three segments are chosen.
+    verifyInputSegments(replaceTask, JAN_23, segment1, segment2, segment3);
+
+    replaceTask.releaseLock(FIRST_OF_JAN_23);
+    // All the segments are chosen when there's no lock
+    verifyInputSegments(replaceTask, JAN_23, segment1, segment2, segment3);
+  }
+
   @Nullable
   private DataSegment findSegmentWith(String version, Map<String, Object> loadSpec, Set<DataSegment> segments)
   {
@@ -901,6 +950,23 @@ private void verifySegments(Interval interval, Segments visibility, DataSegment.
     }
   }
 
+  private void verifyInputSegments(Task task, Interval interval, DataSegment... expectedSegments)
+  {
+    try {
+      final TaskActionClient taskActionClient = taskActionClientFactory.create(task);
+      Collection<DataSegment> allUsedSegments = taskActionClient.submit(
+          new RetrieveSegmentsToReplaceAction(
+              WIKI,
+              interval
+          )
+      );
+      Assert.assertEquals(Sets.newHashSet(expectedSegments), Sets.newHashSet(allUsedSegments));
+    }
+    catch (IOException e) {
+      throw new ISE(e, "Error while fetching segments to replace in interval[%s]", interval);
+    }
+  }
+
   private TaskToolboxFactory createToolboxFactory(
       TaskConfig taskConfig,
       TaskActionClientFactory taskActionClientFactory
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
index d1c72485011f..108833422c88 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
@@ -89,7 +89,7 @@ public List<DataSegment> retrieveAllUsedSegments(String dataSource, Segments vis
   }
 
   @Override
-  public List<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(String dataSource)
+  public List<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(String dataSource, Interval interval)
   {
     return ImmutableList.of();
   }
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index 2c2a6bc0f77b..7c6710048a1a 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -82,8 +82,9 @@ default Collection<DataSegment> retrieveUsedSegmentsForInterval(
   Collection<DataSegment> retrieveAllUsedSegments(String dataSource, Segments visibility);
 
   /**
+   *
    * Retrieve all published segments which are marked as used and the created_date of these segments belonging to the
-   * given data source from the metadata store.
+   * given data source and interval from the metadata store.
    *
    * Unlike other similar methods in this interface, this method doesn't accept a {@link Segments} "visibility"
    * parameter. The returned collection may include overshadowed segments and their created_dates, as if {@link
@@ -91,10 +92,11 @@ default Collection<DataSegment> retrieveUsedSegmentsForInterval(
    * if needed.
    *
    * @param dataSource The data source to query
+   * @param interval   The interval to query
    *
    * @return The DataSegments and the related created_date of segments
    */
-  Collection<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(String dataSource);
+  Collection<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(String dataSource, Interval interval);
 
   /**
    * Retrieve all published segments which may include any data in the given intervals and are marked as used from the
diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 226663c32330..c654d5e229b7 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -174,15 +174,34 @@ private Collection<DataSegment> doRetrieveUsedSegments(
   }
 
   @Override
-  public List<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(String dataSource)
+  public List<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(String dataSource, Interval interval)
   {
-    String rawQueryString = "SELECT created_date, payload FROM %1$s WHERE dataSource = :dataSource AND used = true";
-    final String queryString = StringUtils.format(rawQueryString, dbTables.getSegmentsTable());
+    StringBuilder queryBuilder = new StringBuilder(
+        "SELECT created_date, payload FROM %1$s WHERE dataSource = :dataSource AND used = true"
+    );
+
+    final List<Interval> intervals = new ArrayList<>();
+    // Do not need an interval condition if the interval is ETERNITY
+    if (!Intervals.isEternity(interval)) {
+      intervals.add(interval);
+    }
+
+    SqlSegmentsMetadataQuery.appendConditionForIntervalsAndMatchMode(
+        queryBuilder,
+        intervals,
+        SqlSegmentsMetadataQuery.IntervalMode.OVERLAPS,
+        connector
+    );
+
+    final String queryString = StringUtils.format(queryBuilder.toString(), dbTables.getSegmentsTable());
     return connector.retryWithHandle(
         handle -> {
           Query<Map<String, Object>> query = handle
               .createQuery(queryString)
               .bind("dataSource", dataSource);
+
+          SqlSegmentsMetadataQuery.bindQueryIntervals(query, intervals);
+
           return query
               .map((int index, ResultSet r, StatementContext ctx) -> new Pair<>(
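For intuition about the string being built here: with a single finite interval in OVERLAPS mode, the statement should come out roughly as below. This is illustrative only — the overlap condition is whatever makeSqlCondition renders, quoting of the reserved end column varies by metadata-store dialect, and {MIN}/{MAX} stand for Druid's eternity bounds.

```java
public class IntervalSqlSketch
{
  public static void main(String[] args)
  {
    // Hypothetical rendering of the generated query for one bound interval.
    String sql =
        "SELECT created_date, payload FROM druid_segments"
        + " WHERE dataSource = :dataSource AND used = true"
        + " AND ((start < :end0 AND \"end\" > :start0)"                            // plain overlap
        + "   OR (start = '{MIN}' AND \"end\" != '{MAX}' AND \"end\" > :start0)"   // eternity start
        + "   OR (start != '{MIN}' AND \"end\" = '{MAX}' AND start < :end0)"       // eternity end
        + "   OR (start = '{MIN}' AND \"end\" = '{MAX}'))";                        // full eternity
    System.out.println(sql);
  }
}
```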
diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
index 20b176c50914..76e4f9745762 100644
--- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
+++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
@@ -261,6 +261,82 @@ public DataSegment retrieveSegmentForId(String id)
     return null;
   }
 
+  /**
+   * Appends the condition for the given intervals and match mode to a StringBuilder holding a partial query.
+   *
+   * @param sb        StringBuilder containing the partial query with its SELECT clause and the WHERE conditions
+   *                  for used and dataSource
+   * @param intervals intervals to fetch the segments for
+   * @param matchMode interval match mode - overlaps or contains
+   * @param connector SQL connector
+   */
+  public static void appendConditionForIntervalsAndMatchMode(
+      final StringBuilder sb,
+      final Collection<Interval> intervals,
+      final IntervalMode matchMode,
+      final SQLMetadataConnector connector
+  )
+  {
+    if (intervals.isEmpty()) {
+      return;
+    }
+
+    sb.append(" AND (");
+    for (int i = 0; i < intervals.size(); i++) {
+      sb.append(
+          matchMode.makeSqlCondition(
+              connector.getQuoteString(),
+              StringUtils.format(":start%d", i),
+              StringUtils.format(":end%d", i)
+          )
+      );
+
+      // Add a special check for a segment which has one end at eternity and the other at some finite value. Since
+      // we are using string comparison, a segment with this start or end will not be returned otherwise.
+      if (matchMode.equals(IntervalMode.OVERLAPS)) {
+        sb.append(StringUtils.format(
+            " OR (start = '%s' AND \"end\" != '%s' AND \"end\" > :start%d)",
+            Intervals.ETERNITY.getStart(), Intervals.ETERNITY.getEnd(), i
+        ));
+        sb.append(StringUtils.format(
+            " OR (start != '%s' AND \"end\" = '%s' AND start < :end%d)",
+            Intervals.ETERNITY.getStart(), Intervals.ETERNITY.getEnd(), i
+        ));
+      }
+
+      if (i != intervals.size() - 1) {
+        sb.append(" OR ");
+      }
+    }
+
+    // Add a special check for a single segment with eternity. Since we are using string comparison, a segment with
+    // this start and end will not be returned otherwise.
+    // Known Issue: https://github.com/apache/druid/issues/12860
+    if (matchMode.equals(IntervalMode.OVERLAPS)) {
+      sb.append(StringUtils.format(
+          " OR (start = '%s' AND \"end\" = '%s')", Intervals.ETERNITY.getStart(), Intervals.ETERNITY.getEnd()
+      ));
+    }
+    sb.append(")");
+  }
+
+  /**
+   * Given a Query object, binds the input intervals to it.
+   *
+   * @param query     Query to fetch segments
+   * @param intervals Intervals to fetch segments for
+   */
+  public static void bindQueryIntervals(final Query<Map<String, Object>> query, final Collection<Interval> intervals)
+  {
+    if (intervals.isEmpty()) {
+      return;
+    }
+
+    final Iterator<Interval> iterator = intervals.iterator();
+    for (int i = 0; iterator.hasNext(); i++) {
+      Interval interval = iterator.next();
+      query.bind(StringUtils.format("start%d", i), interval.getStart().toString())
+           .bind(StringUtils.format("end%d", i), interval.getEnd().toString());
+    }
+  }
+
   private CloseableIterator<DataSegment> retrieveSegments(
       final String dataSource,
       final Collection<Interval> intervals,
@@ -275,36 +351,8 @@ private CloseableIterator<DataSegment> retrieveSegments(
     final StringBuilder sb = new StringBuilder();
     sb.append("SELECT payload FROM %s WHERE used = :used AND dataSource = :dataSource");
 
-    if (compareAsString && !intervals.isEmpty()) {
-      sb.append(" AND (");
-      for (int i = 0; i < intervals.size(); i++) {
-        sb.append(
-            matchMode.makeSqlCondition(
-                connector.getQuoteString(),
-                StringUtils.format(":start%d", i),
-                StringUtils.format(":end%d", i)
-            )
-        );
-
-        // Add a special check for a segment which have one end at eternity and the other at some finite value. Since
-        // we are using string comparison, a segment with this start or end will not be returned otherwise.
-        if (matchMode.equals(IntervalMode.OVERLAPS)) {
-          sb.append(StringUtils.format(" OR (start = '%s' AND \"end\" != '%s' AND \"end\" > :start%d)", Intervals.ETERNITY.getStart(), Intervals.ETERNITY.getEnd(), i));
-          sb.append(StringUtils.format(" OR (start != '%s' AND \"end\" = '%s' AND start < :end%d)", Intervals.ETERNITY.getStart(), Intervals.ETERNITY.getEnd(), i));
-        }
-
-        if (i != intervals.size() - 1) {
-          sb.append(" OR ");
-        }
-      }
-
-      // Add a special check for a single segment with eternity. Since we are using string comparison, a segment with
-      // this start and end will not be returned otherwise.
-      // Known Issue: https://github.com/apache/druid/issues/12860
-      if (matchMode.equals(IntervalMode.OVERLAPS)) {
-        sb.append(StringUtils.format(" OR (start = '%s' AND \"end\" = '%s')", Intervals.ETERNITY.getStart(), Intervals.ETERNITY.getEnd()));
-      }
-      sb.append(")");
+    if (compareAsString) {
+      appendConditionForIntervalsAndMatchMode(sb, intervals, matchMode, connector);
     }
 
     final Query<Map<String, Object>> sql = handle
@@ -317,12 +365,7 @@ private CloseableIterator<DataSegment> retrieveSegments(
     }
 
     if (compareAsString) {
-      final Iterator<Interval> iterator = intervals.iterator();
-      for (int i = 0; iterator.hasNext(); i++) {
-        Interval interval = iterator.next();
-        sql.bind(StringUtils.format("start%d", i), interval.getStart().toString())
-            .bind(StringUtils.format("end%d", i), interval.getEnd().toString());
-      }
+      bindQueryIntervals(sql, intervals);
     }
 
     final ResultIterator<DataSegment> resultIterator =
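Why the eternity special cases are needed at all: segment boundaries are stored as ISO-8601 strings and compared lexicographically, which stops matching numeric order once a year has more than four digits. A self-contained illustration — the bound shown matches Druid's documented eternity end, but treat the exact constant as an assumption:

```java
public class StringCompareSketch
{
  public static void main(String[] args)
  {
    String eternityEnd = "146140482-04-24T15:36:27.903Z"; // assumed eternity upper bound
    String queryStart = "2023-01-01T00:00:00.000Z";

    // Numerically, year 146140482 is far later than 2023, but as strings
    // "146140482-..." sorts before "2023-..." because '1' < '2'. A plain
    // "end" > :start0 predicate would therefore miss an eternity-ended segment.
    System.out.println(eternityEnd.compareTo(queryStart) > 0); // false
  }
}
```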
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index b1b6f3aa16ea..0512357ffc10 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -2554,6 +2554,52 @@ public void testMarkSegmentsAsUnusedWithinIntervalTwoYears() throws IOException
     );
   }
 
+  @Test
+  public void testRetrieveUsedSegmentsAndCreatedDates()
+  {
+    insertUsedSegments(ImmutableSet.of(defaultSegment));
+
+    List<Pair<DataSegment, String>> resultForIntervalOnTheLeft =
+        coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Intervals.of("2000/2001"));
+    Assert.assertTrue(resultForIntervalOnTheLeft.isEmpty());
+
+    List<Pair<DataSegment, String>> resultForIntervalOnTheRight =
+        coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Intervals.of("3000/3001"));
+    Assert.assertTrue(resultForIntervalOnTheRight.isEmpty());
+
+    List<Pair<DataSegment, String>> resultForExactInterval =
+        coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), defaultSegment.getInterval());
+    Assert.assertEquals(1, resultForExactInterval.size());
+    Assert.assertEquals(defaultSegment, resultForExactInterval.get(0).lhs);
+
+    List<Pair<DataSegment, String>> resultForIntervalWithLeftOverlap =
+        coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Intervals.of("2000/2015-01-02"));
+    Assert.assertEquals(resultForExactInterval, resultForIntervalWithLeftOverlap);
+
+    List<Pair<DataSegment, String>> resultForIntervalWithRightOverlap =
+        coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Intervals.of("2015-01-01/3000"));
+    Assert.assertEquals(resultForExactInterval, resultForIntervalWithRightOverlap);
+
+    List<Pair<DataSegment, String>> resultForEternity =
+        coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Intervals.ETERNITY);
+    Assert.assertEquals(resultForExactInterval, resultForEternity);
+  }
+
+  @Test
+  public void testRetrieveUsedSegmentsAndCreatedDatesFetchesEternityForAnyInterval()
+  {
+    insertUsedSegments(ImmutableSet.of(eternitySegment, firstHalfEternityRangeSegment, secondHalfEternityRangeSegment));
+
+    List<Pair<DataSegment, String>> resultForRandomInterval =
+        coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), defaultSegment.getInterval());
+    Assert.assertEquals(3, resultForRandomInterval.size());
+
+    List<Pair<DataSegment, String>> resultForEternity =
+        coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), eternitySegment.getInterval());
+    Assert.assertEquals(3, resultForEternity.size());
+  }
+
   private static class DS
   {
     static final String WIKI = "wiki";
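Finally, the interval semantics these tests exercise are the usual half-open overlap rule, which is why 2000/2001 matches nothing while 2000/2015-01-02 picks up the 2015 segment. A minimal sketch with plain longs standing in for timestamps:

```java
public class OverlapSketch
{
  // [aStart, aEnd) overlaps [bStart, bEnd) iff aStart < bEnd and aEnd > bStart.
  static boolean overlaps(long aStart, long aEnd, long bStart, long bEnd)
  {
    return aStart < bEnd && aEnd > bStart;
  }

  public static void main(String[] args)
  {
    System.out.println(overlaps(2000, 2001, 2015, 2016)); // false: disjoint intervals
    System.out.println(overlaps(2000, 2016, 2015, 2016)); // true: left overlap
  }
}
```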