A Replacing task must read segments created before it acquired its lock (#15085)

* Replacing tasks must read segments created before they acquired their locks
AmatyaAvadhanula authored Oct 19, 2023
1 parent fa311dd commit a8febd4
Showing 20 changed files with 478 additions and 125 deletions (the first six files appear below).
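At the heart of the change is a simple eligibility rule: a replacing task may read a segment only if the segment's created_date precedes its REPLACE lock's version, so a concurrently appended segment can never be both compacted into the new version and upgraded to it. A minimal sketch of that comparison, using a hypothetical helper (Druid lock versions and created dates are both ISO-8601 timestamp strings, so lexicographic order is chronological order):

```java
// Sketch only; mirrors the comparison in RetrieveSegmentsToReplaceAction below.
// Both arguments are ISO-8601 strings (e.g. "2023-10-19T05:30:00.000Z"), so
// String.compareTo orders them chronologically.
static boolean isReplaceable(String segmentCreatedDate, String replaceLockVersion)
{
  // Replace only segments that existed before the REPLACE lock version was allocated.
  return replaceLockVersion.compareTo(segmentCreatedDate) > 0;
}
```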
@@ -40,6 +40,7 @@
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
@@ -364,7 +365,7 @@ Pair<SortedMap<Interval, String>, Map<Interval, List<DataSegment>>> checkSegment
// Pair<interval -> max(created_date), interval -> list<DataSegment>>
Pair<Map<Interval, String>, Map<Interval, List<DataSegment>>> baseSegmentsSnapshot =
getMaxCreateDateAndBaseSegments(
metadataStorageCoordinator.retrieveUsedSegmentsAndCreatedDates(spec.getBaseDataSource())
metadataStorageCoordinator.retrieveUsedSegmentsAndCreatedDates(spec.getBaseDataSource(), Intervals.ETERNITY)
);
// baseSegments are used to create HadoopIndexTask
Map<Interval, List<DataSegment>> baseSegments = baseSegmentsSnapshot.rhs;
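The added Intervals.ETERNITY argument reflects a signature change made by this commit: retrieveUsedSegmentsAndCreatedDates now accepts an interval so callers can restrict the metadata query. A sketch of the updated method as inferred from the two call sites in this diff (declared on IndexerMetadataStorageCoordinator, per the toolbox accessor used later):

```java
// Inferred from the call sites in this commit; the String in each Pair is the
// segment's created_date from the metadata store.
Collection<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(
    String dataSource,
    Interval interval
);
```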
@@ -0,0 +1,209 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.common.actions;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.batch.parallel.AbstractBatchSubtask;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Partitions;
import org.apache.druid.timeline.SegmentTimeline;
import org.joda.time.Interval;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * This action exists in addition to RetrieveUsedSegmentsAction because that action suffers
 * from a race condition, described by the following sequence of events:
 *
 * - Segments S1, S2, S3 exist
 * - A compact task acquires a REPLACE lock
 * - A concurrent appending job publishes a segment S4, which needs to be upgraded to the REPLACE lock's version
 * - The compact task processes S1-S4 to create new segments
 * - The compact task publishes the new segments and carries S4 forward to the new version
 *
 * This can lead to the data in S4 being duplicated: it ends up both in the newly compacted segments
 * and in the upgraded copy of S4.
 *
 * This TaskAction returns the collection of segments that have data within the specified interval, are marked as
 * used, and were created before the task's REPLACE lock, if any, was acquired.
 * This ensures that a consistent set of segments is returned each time this action is called.
 */
public class RetrieveSegmentsToReplaceAction implements TaskAction<Collection<DataSegment>>
{
private static final Logger log = new Logger(RetrieveSegmentsToReplaceAction.class);

@JsonIgnore
private final String dataSource;

@JsonIgnore
private final Interval interval;

@JsonCreator
public RetrieveSegmentsToReplaceAction(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval
)
{
this.dataSource = dataSource;
this.interval = interval;
}

@JsonProperty
public String getDataSource()
{
return dataSource;
}

@JsonProperty
public Interval getInterval()
{
return interval;
}

@Override
public TypeReference<Collection<DataSegment>> getReturnTypeReference()
{
return new TypeReference<Collection<DataSegment>>() {};
}

@Override
public Collection<DataSegment> perform(Task task, TaskActionToolbox toolbox)
{
    // The DruidInputSource can be used to read from one datasource and write to another.
    // In that case, the race condition described in the class-level javadoc cannot occur,
    // and the action can simply fetch all visible segments for the datasource and interval.
if (!task.getDataSource().equals(dataSource)) {
return retrieveAllVisibleSegments(toolbox);
}

final String supervisorId;
if (task instanceof AbstractBatchSubtask) {
supervisorId = ((AbstractBatchSubtask) task).getSupervisorTaskId();
} else {
supervisorId = task.getId();
}

final Set<ReplaceTaskLock> replaceLocksForTask = toolbox
.getTaskLockbox()
.getAllReplaceLocksForDatasource(task.getDataSource())
.stream()
.filter(lock -> supervisorId.equals(lock.getSupervisorTaskId()))
.collect(Collectors.toSet());

// If there are no replace locks for the task, simply fetch all visible segments for the interval
if (replaceLocksForTask.isEmpty()) {
return retrieveAllVisibleSegments(toolbox);
}

Map<Interval, Map<String, Set<DataSegment>>> intervalToCreatedToSegments = new HashMap<>();
for (Pair<DataSegment, String> segmentAndCreatedDate :
toolbox.getIndexerMetadataStorageCoordinator().retrieveUsedSegmentsAndCreatedDates(dataSource, interval)) {
final DataSegment segment = segmentAndCreatedDate.lhs;
final String created = segmentAndCreatedDate.rhs;
intervalToCreatedToSegments.computeIfAbsent(segment.getInterval(), s -> new HashMap<>())
.computeIfAbsent(created, c -> new HashSet<>())
.add(segment);
}

Set<DataSegment> allSegmentsToBeReplaced = new HashSet<>();
for (final Map.Entry<Interval, Map<String, Set<DataSegment>>> entry : intervalToCreatedToSegments.entrySet()) {
final Interval segmentInterval = entry.getKey();
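      // Determine the version of this task's REPLACE lock that covers this segment interval, if any.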
String lockVersion = null;
for (ReplaceTaskLock replaceLock : replaceLocksForTask) {
if (replaceLock.getInterval().contains(segmentInterval)) {
lockVersion = replaceLock.getVersion();
}
}
final Map<String, Set<DataSegment>> createdToSegmentsMap = entry.getValue();
for (Map.Entry<String, Set<DataSegment>> createdAndSegments : createdToSegmentsMap.entrySet()) {
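        // No covering REPLACE lock means every segment qualifies; otherwise keep only
        // segments created strictly before the lock version.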
if (lockVersion == null || lockVersion.compareTo(createdAndSegments.getKey()) > 0) {
allSegmentsToBeReplaced.addAll(createdAndSegments.getValue());
} else {
for (DataSegment segment : createdAndSegments.getValue()) {
log.info("Ignoring segment[%s] as it has created_date[%s] greater than the REPLACE lock version[%s]",
segment.getId(), createdAndSegments.getKey(), lockVersion);
}
}
}
}

return SegmentTimeline.forSegments(allSegmentsToBeReplaced)
.findNonOvershadowedObjectsInInterval(Intervals.ETERNITY, Partitions.ONLY_COMPLETE);
}

private Collection<DataSegment> retrieveAllVisibleSegments(TaskActionToolbox toolbox)
{
return toolbox.getIndexerMetadataStorageCoordinator()
.retrieveUsedSegmentsForInterval(dataSource, interval, Segments.ONLY_VISIBLE);
}

@Override
public boolean isAudited()
{
return false;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

RetrieveSegmentsToReplaceAction that = (RetrieveSegmentsToReplaceAction) o;

if (!dataSource.equals(that.dataSource)) {
return false;
}
return interval.equals(that.interval);
}

@Override
public int hashCode()
{
return Objects.hash(dataSource, interval);
}

@Override
public String toString()
{
return getClass().getSimpleName() + "{" +
"dataSource='" + dataSource + '\'' +
", interval=" + interval +
'}';
}
}
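For orientation, a replacing task would fetch its input segments by submitting this action through its task action client. A minimal usage sketch (the datasource name and interval are illustrative placeholders):

```java
// Usage sketch; "wikipedia" and the interval are placeholder values.
final Collection<DataSegment> segmentsToReplace = toolbox.getTaskActionClient().submit(
    new RetrieveSegmentsToReplaceAction("wikipedia", Intervals.of("2023-01-01/2023-02-01"))
);
```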
@@ -38,6 +38,7 @@
@JsonSubTypes.Type(name = "segmentTransactionalInsert", value = SegmentTransactionalInsertAction.class),
@JsonSubTypes.Type(name = "segmentTransactionalAppend", value = SegmentTransactionalAppendAction.class),
@JsonSubTypes.Type(name = "segmentTransactionalReplace", value = SegmentTransactionalReplaceAction.class),
@JsonSubTypes.Type(name = "retrieveSegmentsToReplace", value = RetrieveSegmentsToReplaceAction.class),
// Type name doesn't correspond to the name of the class for backward compatibility.
@JsonSubTypes.Type(name = "segmentListUsed", value = RetrieveUsedSegmentsAction.class),
// Type name doesn't correspond to the name of the class for backward compatibility.
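Task actions are serialized under their registered type name when a task ships them to the Overlord, so the new entry above implies a wire format along these lines (field values illustrative):

```json
{
  "type": "retrieveSegmentsToReplace",
  "dataSource": "wikipedia",
  "interval": "2023-01-01T00:00:00.000Z/2023-02-01T00:00:00.000Z"
}
```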
@@ -19,6 +19,7 @@

package org.apache.druid.indexing.common.task.batch.parallel;

import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
import org.apache.druid.indexing.common.task.TaskResource;

@@ -29,21 +30,34 @@
public abstract class AbstractBatchSubtask extends AbstractBatchIndexTask
{

private final String supervisorTaskId;

protected AbstractBatchSubtask(
String id,
@Nullable String groupId,
@Nullable TaskResource taskResource,
String dataSource,
@Nullable Map<String, Object> context,
@Nonnull IngestionMode ingestionMode
@Nonnull IngestionMode ingestionMode,
@Nonnull String supervisorTaskId
)
{
super(id, groupId, taskResource, dataSource, context, -1, ingestionMode);
this.supervisorTaskId = supervisorTaskId;
}

/**
* Returns the ID of {@link SubTaskSpec} that is assigned to this subtask.
* This ID is used to identify duplicate work of retry tasks for the same spec.
*/
public abstract String getSubtaskSpecId();

/**
* @return Task ID of the {@code ParallelIndexSupervisorTask} which launched this sub-task.
*/
@JsonProperty
public String getSupervisorTaskId()
{
return supervisorTaskId;
}
}
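Hoisting supervisorTaskId into this base class lets any component resolve which task actually owns the locks a subtask operates under. A hypothetical helper mirroring the resolution logic in RetrieveSegmentsToReplaceAction.perform():

```java
// Hypothetical helper, not part of this commit; mirrors the supervisor-resolution
// logic in RetrieveSegmentsToReplaceAction.perform().
static String resolveLockOwnerId(Task task)
{
  if (task instanceof AbstractBatchSubtask) {
    // Sub-tasks acquire locks via their supervisor, so the supervisor owns them.
    return ((AbstractBatchSubtask) task).getSupervisorTaskId();
  }
  return task.getId();
}
```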
@@ -69,7 +69,6 @@ public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask

private final int numAttempts;
private final ParallelIndexIngestionSpec ingestionSchema;
private final String supervisorTaskId;
private final String subtaskSpecId;

private final ObjectMapper jsonMapper;
@@ -95,7 +94,8 @@ public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
taskResource,
ingestionSchema.getDataSchema(),
ingestionSchema.getTuningConfig(),
context
context,
supervisorTaskId
);

Preconditions.checkArgument(
@@ -107,7 +107,6 @@ public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
this.subtaskSpecId = subtaskSpecId;
this.numAttempts = numAttempts;
this.ingestionSchema = ingestionSchema;
this.supervisorTaskId = supervisorTaskId;
this.jsonMapper = jsonMapper;
}

@@ -123,12 +122,6 @@ private ParallelIndexIngestionSpec getIngestionSchema()
return ingestionSchema;
}

@JsonProperty
private String getSupervisorTaskId()
{
return supervisorTaskId;
}

@JsonProperty
@Override
public String getSubtaskSpecId()
@@ -163,7 +156,7 @@ public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
if (!getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals().isEmpty()) {
return tryTimeChunkLock(
new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
new SurrogateTaskActionClient(getSupervisorTaskId(), taskActionClient),
getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
);
} else {
@@ -274,7 +267,7 @@ private void sendReport(TaskToolbox toolbox, DimensionCardinalityReport report)
{
final ParallelIndexSupervisorTaskClient taskClient =
toolbox.getSupervisorTaskClientProvider().build(
supervisorTaskId,
getSupervisorTaskId(),
ingestionSchema.getTuningConfig().getChatHandlerTimeout(),
ingestionSchema.getTuningConfig().getChatHandlerNumRetries()
);
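The sendReport change is the same substitution: the client is built against the supervisor's ID, now read from the base class. A condensed sketch of the hand-off (report construction elided; assumes the client's report method delivers the result to the supervisor's chat handler, as these tasks use it):

```java
// Condensed from sendReport above; construction parameters come from the tuning config.
final ParallelIndexSupervisorTaskClient taskClient = toolbox.getSupervisorTaskClientProvider().build(
    getSupervisorTaskId(),
    ingestionSchema.getTuningConfig().getChatHandlerTimeout(),
    ingestionSchema.getTuningConfig().getChatHandlerNumRetries()
);
taskClient.report(report);
```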
@@ -82,7 +82,6 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask

private final int numAttempts;
private final ParallelIndexIngestionSpec ingestionSchema;
private final String supervisorTaskId;
private final String subtaskSpecId;

// For testing
@@ -136,7 +135,8 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
taskResource,
ingestionSchema.getDataSchema(),
ingestionSchema.getTuningConfig(),
context
context,
supervisorTaskId
);

Preconditions.checkArgument(
@@ -148,7 +148,6 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
this.subtaskSpecId = subtaskSpecId;
this.numAttempts = numAttempts;
this.ingestionSchema = ingestionSchema;
this.supervisorTaskId = supervisorTaskId;
this.dedupInputRowFilterSupplier = dedupRowDimValueFilterSupplier;
}

@@ -164,12 +163,6 @@ private ParallelIndexIngestionSpec getIngestionSchema()
return ingestionSchema;
}

@JsonProperty
private String getSupervisorTaskId()
{
return supervisorTaskId;
}

@JsonProperty
@Override
public String getSubtaskSpecId()
@@ -204,7 +197,7 @@ public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
if (!getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals().isEmpty()) {
return tryTimeChunkLock(
new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
new SurrogateTaskActionClient(getSupervisorTaskId(), taskActionClient),
getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
);
} else {
@@ -326,7 +319,7 @@ private Map<Interval, StringDistribution> determineDistribution(
private void sendReport(TaskToolbox toolbox, DimensionDistributionReport report)
{
final ParallelIndexSupervisorTaskClient taskClient = toolbox.getSupervisorTaskClientProvider().build(
supervisorTaskId,
getSupervisorTaskId(),
ingestionSchema.getTuningConfig().getChatHandlerTimeout(),
ingestionSchema.getTuningConfig().getChatHandlerNumRetries()
);