A Replacing task must read segments created before it acquired its lock #15085

Merged

Changes from 15 commits (21 commits in total)
MaterializedViewSupervisor.java:

@@ -40,6 +40,7 @@
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
@@ -364,7 +365,7 @@ Pair<SortedMap<Interval, String>, Map<Interval, List<DataSegment>>> checkSegment
// Pair<interval -> max(created_date), interval -> list<DataSegment>>
Pair<Map<Interval, String>, Map<Interval, List<DataSegment>>> baseSegmentsSnapshot =
getMaxCreateDateAndBaseSegments(
-metadataStorageCoordinator.retrieveUsedSegmentsAndCreatedDates(spec.getBaseDataSource())
+metadataStorageCoordinator.retrieveUsedSegmentsAndCreatedDates(spec.getBaseDataSource(), Intervals.ETERNITY)
);
// baseSegments are used to create HadoopIndexTask
Map<Interval, List<DataSegment>> baseSegments = baseSegmentsSnapshot.rhs;
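
This caller keeps its previous behavior: the coordinator method now requires an interval, and passing Intervals.ETERNITY retrieves everything, as before. Inferred from the call sites in this diff (an assumption, not the full declaration), the widened method has roughly this shape:

// Assumed shape of the widened metadata-store API, inferred from the call sites in this PR:
Collection<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(String dataSource, Interval interval);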

RetrieveSegmentsToReplaceAction.java (new file):

@@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.common.actions;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.batch.parallel.AbstractBatchSubtask;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Partitions;
import org.apache.druid.timeline.SegmentTimeline;
import org.joda.time.Interval;

import java.util.Collection;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * This TaskAction returns a collection of segments that have data within the specified interval, are marked as
 * used, and were created before the task's REPLACE lock, if any, was acquired.
*
* The order of segments within the returned collection is unspecified, but each segment is guaranteed to appear in
* the collection only once.
*
 * @implNote This action doesn't produce a {@link Set} because it's implemented via
 * {@link org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator#retrieveUsedSegmentsForIntervals}
 * which returns a collection. Producing a {@link Set} would require an unnecessary copy of the segments collection.
*/
public class RetrieveSegmentsToReplaceAction implements TaskAction<Collection<DataSegment>>
{
  @JsonIgnore
  private final String dataSource;

  @JsonIgnore
  private final Interval interval;

  @JsonCreator
  public RetrieveSegmentsToReplaceAction(
      @JsonProperty("dataSource") String dataSource,
      @JsonProperty("interval") Interval interval
  )
  {
    this.dataSource = dataSource;
    this.interval = interval;
  }

  @JsonProperty
  public String getDataSource()
  {
    return dataSource;
  }

  @JsonProperty
  public Interval getInterval()
  {
    return interval;
  }

  @Override
  public TypeReference<Collection<DataSegment>> getReturnTypeReference()
  {
    return new TypeReference<Collection<DataSegment>>() {};
  }

  @Override
  public Collection<DataSegment> perform(Task task, TaskActionToolbox toolbox)
  {
    if (!task.getDataSource().equals(dataSource)) {
      return toolbox.getIndexerMetadataStorageCoordinator()
                    .retrieveUsedSegmentsForInterval(dataSource, interval, Segments.ONLY_VISIBLE);
    }

    final String supervisorId;
    if (task instanceof AbstractBatchSubtask) {
      supervisorId = ((AbstractBatchSubtask) task).getSupervisorTaskId();
    } else {
      supervisorId = task.getId();
    }

    final Set<ReplaceTaskLock> replaceLocksForTask = toolbox
        .getTaskLockbox()
        .getAllReplaceLocksForDatasource(task.getDataSource())
        .stream()
        .filter(lock -> supervisorId.equals(lock.getSupervisorTaskId()))
        .collect(Collectors.toSet());

    Collection<Pair<DataSegment, String>> segmentsAndCreatedDates =
        toolbox.getIndexerMetadataStorageCoordinator().retrieveUsedSegmentsAndCreatedDates(dataSource, interval);

    Set<DataSegment> allSegmentsToBeReplaced = new HashSet<>();
    segmentsAndCreatedDates.forEach(segmentAndCreatedDate -> allSegmentsToBeReplaced.add(segmentAndCreatedDate.lhs));
    for (Pair<DataSegment, String> segmentAndCreatedDate : segmentsAndCreatedDates) {
      final DataSegment segment = segmentAndCreatedDate.lhs;
      final String createdDate = segmentAndCreatedDate.rhs;
      for (ReplaceTaskLock replaceLock : replaceLocksForTask) {
        if (replaceLock.getInterval().contains(segment.getInterval())
            && replaceLock.getVersion().compareTo(createdDate) < 0) {
          // If a REPLACE lock covers the segment but the lock's version precedes the segment's created date,
          // the segment was created after the lock was acquired, so it must not be read by this task.
          allSegmentsToBeReplaced.remove(segment);
        }
      }
    }

    return SegmentTimeline.forSegments(allSegmentsToBeReplaced)
                          .findNonOvershadowedObjectsInInterval(Intervals.ETERNITY, Partitions.ONLY_COMPLETE);
  }

Review comments on perform(), addressed in later commits of this PR:

Contributor: Return early if there are no REPLACE locks.
Author: Done

Contributor: Given that a single interval may have several segments, see if this loop can be improved by using a map from Interval to version to Set<DataSegment>.
Author: Done. Thanks!

Contributor: I suppose we don't really have much of a choice here other than doing this processing in-memory, since these filters would be too complicated to push down.

Contributor: Rather than going through the collection of segments and created dates twice (once to add everything, then to remove the invalid ones), start with an empty set and add only the valid entries to it.
Author: Done
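
Both "Done" suggestions above point toward a single-pass, map-based construction. The string comparison works because lock versions and segment created dates are both ISO-8601 UTC timestamps, so String.compareTo orders them chronologically. A rough sketch of the suggested shape (illustrative only; it reuses replaceLocksForTask and segmentsAndCreatedDates from the method above, needs java.util.Map and java.util.HashMap in addition to the file's imports, and may differ from the code actually merged):

    // Sketch, not the merged implementation: group lock versions by interval,
    // then keep only the segments created before the lock that covers them.
    final Map<Interval, String> lockVersionByInterval = new HashMap<>();
    for (ReplaceTaskLock lock : replaceLocksForTask) {
      lockVersionByInterval.put(lock.getInterval(), lock.getVersion());
    }

    final Set<DataSegment> segmentsToReplace = new HashSet<>();
    for (Pair<DataSegment, String> segmentAndCreatedDate : segmentsAndCreatedDates) {
      final DataSegment segment = segmentAndCreatedDate.lhs;
      final String createdDate = segmentAndCreatedDate.rhs;
      final boolean createdAfterLock = lockVersionByInterval
          .entrySet()
          .stream()
          .anyMatch(e -> e.getKey().contains(segment.getInterval()) && e.getValue().compareTo(createdDate) < 0);
      if (!createdAfterLock) {
        segmentsToReplace.add(segment);
      }
    }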

  @Override
  public boolean isAudited()
  {
    return false;
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }

    RetrieveSegmentsToReplaceAction that = (RetrieveSegmentsToReplaceAction) o;

    if (!dataSource.equals(that.dataSource)) {
      return false;
    }
    return interval.equals(that.interval);
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(dataSource, interval);
  }

  @Override
  public String toString()
  {
    return getClass().getSimpleName() + "{" +
           "dataSource='" + dataSource + '\'' +
           ", interval=" + interval +
           '}';
  }
}
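
For context, a task obtains these segments by submitting the action through its action client. A minimal usage sketch, with placeholder datasource and interval values; submit is the standard TaskActionClient entry point:

    // Inside a task's runTask(TaskToolbox toolbox); values are illustrative.
    final Collection<DataSegment> segments = toolbox.getTaskActionClient().submit(
        new RetrieveSegmentsToReplaceAction("wikipedia", Intervals.of("2023-01-01/2023-02-01"))
    );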

TaskAction.java:

@@ -40,6 +40,7 @@
@JsonSubTypes.Type(name = "segmentTransactionalReplace", value = SegmentTransactionalReplaceAction.class),
// Type name doesn't correspond to the name of the class for backward compatibility.
@JsonSubTypes.Type(name = "segmentListUsed", value = RetrieveUsedSegmentsAction.class),
@JsonSubTypes.Type(name = "segmentListLocked", value = RetrieveSegmentsToReplaceAction.class),
// Type name doesn't correspond to the name of the class for backward compatibility.
@JsonSubTypes.Type(name = "segmentListUnused", value = RetrieveUnusedSegmentsAction.class),
@JsonSubTypes.Type(name = "markSegmentsAsUnused", value = MarkSegmentsAsUnusedAction.class),

Contributor: segmentListToReplace?
Author: Can I use retrieveSegmentsToReplace, since the other actions do not match the class names for backward compatibility?
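
For illustration, the registered name, not the class name, is what appears on the wire. A hedged round-trip sketch (e.g. in a unit test; the datasource and interval are placeholders, and the serialized field order may vary):

    ObjectMapper mapper = new DefaultObjectMapper(); // Druid's preconfigured Jackson mapper
    String json = mapper.writeValueAsString(
        new RetrieveSegmentsToReplaceAction("wikipedia", Intervals.of("2023-01-01/2023-02-01")));
    // json is roughly:
    // {"type":"segmentListLocked","dataSource":"wikipedia","interval":"2023-01-01T00:00:00.000Z/2023-02-01T00:00:00.000Z"}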

AbstractBatchSubtask.java:

@@ -19,6 +19,7 @@

package org.apache.druid.indexing.common.task.batch.parallel;

+import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
import org.apache.druid.indexing.common.task.TaskResource;

@@ -29,21 +30,34 @@
public abstract class AbstractBatchSubtask extends AbstractBatchIndexTask
{

+private final String supervisorTaskId;

protected AbstractBatchSubtask(
String id,
@Nullable String groupId,
@Nullable TaskResource taskResource,
String dataSource,
@Nullable Map<String, Object> context,
-@Nonnull IngestionMode ingestionMode
+@Nonnull IngestionMode ingestionMode,
+@Nonnull String supervisorTaskId
)
{
super(id, groupId, taskResource, dataSource, context, -1, ingestionMode);
+this.supervisorTaskId = supervisorTaskId;
}

/**
* Returns the ID of {@link SubTaskSpec} that is assigned to this subtask.
* This ID is used to identify duplicate work of retry tasks for the same spec.
*/
public abstract String getSubtaskSpecId();

+/**
+ * @return the id of the subtask's supervisor
+ */
+@JsonProperty
+public String getSupervisorTaskId()
+{
+return supervisorTaskId;
+}

Contributor (suggested change to the javadoc): "@return Task ID of the {@code ParallelIndexSupervisorTask} which launched this sub-task."
Author: Done
}

PartialDimensionCardinalityTask.java:

@@ -69,7 +69,6 @@ public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask

private final int numAttempts;
private final ParallelIndexIngestionSpec ingestionSchema;
-private final String supervisorTaskId;
private final String subtaskSpecId;

private final ObjectMapper jsonMapper;
@@ -95,7 +94,8 @@ public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
taskResource,
ingestionSchema.getDataSchema(),
ingestionSchema.getTuningConfig(),
-context
+context,
+supervisorTaskId
);

Preconditions.checkArgument(
@@ -107,7 +107,6 @@ public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
this.subtaskSpecId = subtaskSpecId;
this.numAttempts = numAttempts;
this.ingestionSchema = ingestionSchema;
-this.supervisorTaskId = supervisorTaskId;
this.jsonMapper = jsonMapper;
}

@@ -123,12 +122,6 @@ private ParallelIndexIngestionSpec getIngestionSchema()
return ingestionSchema;
}

-@JsonProperty
-private String getSupervisorTaskId()
-{
-return supervisorTaskId;
-}

@JsonProperty
@Override
public String getSubtaskSpecId()
@@ -163,7 +156,7 @@ public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
if (!getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals().isEmpty()) {
return tryTimeChunkLock(
-new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
+new SurrogateTaskActionClient(getSupervisorTaskId(), taskActionClient),
getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
);
} else {
@@ -274,7 +267,7 @@ private void sendReport(TaskToolbox toolbox, DimensionCardinalityReport report)
{
final ParallelIndexSupervisorTaskClient taskClient =
toolbox.getSupervisorTaskClientProvider().build(
-supervisorTaskId,
+getSupervisorTaskId(),
ingestionSchema.getTuningConfig().getChatHandlerTimeout(),
ingestionSchema.getTuningConfig().getChatHandlerNumRetries()
);

PartialDimensionDistributionTask.java:

@@ -82,7 +82,6 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask

private final int numAttempts;
private final ParallelIndexIngestionSpec ingestionSchema;
-private final String supervisorTaskId;
private final String subtaskSpecId;

// For testing
@@ -136,7 +135,8 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
taskResource,
ingestionSchema.getDataSchema(),
ingestionSchema.getTuningConfig(),
-context
+context,
+supervisorTaskId
);

Preconditions.checkArgument(
@@ -148,7 +148,6 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
this.subtaskSpecId = subtaskSpecId;
this.numAttempts = numAttempts;
this.ingestionSchema = ingestionSchema;
-this.supervisorTaskId = supervisorTaskId;
this.dedupInputRowFilterSupplier = dedupRowDimValueFilterSupplier;
}

@@ -164,12 +163,6 @@ private ParallelIndexIngestionSpec getIngestionSchema()
return ingestionSchema;
}

-@JsonProperty
-private String getSupervisorTaskId()
-{
-return supervisorTaskId;
-}

@JsonProperty
@Override
public String getSubtaskSpecId()
@@ -204,7 +197,7 @@ public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
if (!getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals().isEmpty()) {
return tryTimeChunkLock(
-new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
+new SurrogateTaskActionClient(getSupervisorTaskId(), taskActionClient),
getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
);
} else {
@@ -326,7 +319,7 @@ private Map<Interval, StringDistribution> determineDistribution(
private void sendReport(TaskToolbox toolbox, DimensionDistributionReport report)
{
final ParallelIndexSupervisorTaskClient taskClient = toolbox.getSupervisorTaskClientProvider().build(
-supervisorTaskId,
+getSupervisorTaskId(),
ingestionSchema.getTuningConfig().getChatHandlerTimeout(),
ingestionSchema.getTuningConfig().getChatHandlerNumRetries()
);