Skip to content

Commit

Permalink
Replacing tasks must read segments created before it acquired its lock
Browse files Browse the repository at this point in the history
  • Loading branch information
AmatyaAvadhanula committed Oct 4, 2023
1 parent f3d1c8b commit b158ce8
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 22 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.common.actions;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableList;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;

import javax.annotation.Nullable;
import java.util.Collection;
import java.util.Objects;
import java.util.Set;

/**
* This TaskAction returns a collection of segments which have data within the specified interval and are marked as
* used, and have been created before a REPLACE lock, if any, was acquired.
*
* The order of segments within the returned collection is unspecified, but each segment is guaranteed to appear in
* the collection only once.
*
* @implNote This action doesn't produce a {@link Set} because it's implemented via {@link
* org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator#retrieveUsedSegmentsForIntervals} which returns
* a collection. Producing a {@link Set} would require an unnecessary copy of segments collection.
*/
public class RetrieveLockedSegmentsAction implements TaskAction<Collection<DataSegment>>
{
@JsonIgnore
private final String dataSource;

@JsonIgnore
private final Interval interval;

@JsonIgnore
private final Segments visibility;

@JsonCreator
public RetrieveLockedSegmentsAction(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
// When JSON object is deserialized, this parameter is optional for backward compatibility.
// Otherwise, it shouldn't be considered optional.
@JsonProperty("visibility") @Nullable Segments visibility
)
{
this.dataSource = dataSource;
this.interval = interval;
// Defaulting to the former behaviour when visibility wasn't explicitly specified for backward compatibility
this.visibility = visibility != null ? visibility : Segments.ONLY_VISIBLE;
}

@JsonProperty
public String getDataSource()
{
return dataSource;
}

@JsonProperty
public Interval getInterval()
{
return interval;
}

@JsonProperty
public Segments getVisibility()
{
return visibility;
}

@Override
public TypeReference<Collection<DataSegment>> getReturnTypeReference()
{
return new TypeReference<Collection<DataSegment>>() {};
}

@Override
public Collection<DataSegment> perform(Task task, TaskActionToolbox toolbox)
{
final Set<ReplaceTaskLock> replaceLocksForTask = toolbox.getTaskLockbox().findReplaceLocksForTask(task);
String createdBefore = null;
for (ReplaceTaskLock replaceLock : replaceLocksForTask) {
if (replaceLock.getInterval().contains(interval)) {
createdBefore = replaceLock.getVersion();
break;
}
}
return toolbox.getIndexerMetadataStorageCoordinator()
.retrieveUsedSegmentsForIntervals(dataSource, ImmutableList.of(interval), visibility, createdBefore);
}

@Override
public boolean isAudited()
{
return false;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

RetrieveLockedSegmentsAction that = (RetrieveLockedSegmentsAction) o;

if (!dataSource.equals(that.dataSource)) {
return false;
}
if (!interval.equals(that.interval)) {
return false;
}
return visibility.equals(that.visibility);
}

@Override
public int hashCode()
{
return Objects.hash(dataSource, interval, visibility);
}

@Override
public String toString()
{
return getClass().getSimpleName() + "{" +
"dataSource='" + dataSource + '\'' +
", interval=" + interval +
", visibility=" + visibility +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
@JsonSubTypes.Type(name = "segmentTransactionalReplace", value = SegmentTransactionalReplaceAction.class),
// Type name doesn't correspond to the name of the class for backward compatibility.
@JsonSubTypes.Type(name = "segmentListUsed", value = RetrieveUsedSegmentsAction.class),
@JsonSubTypes.Type(name = "segmentListLocked", value = RetrieveLockedSegmentsAction.class),
// Type name doesn't correspond to the name of the class for backward compatibility.
@JsonSubTypes.Type(name = "segmentListUnused", value = RetrieveUnusedSegmentsAction.class),
@JsonSubTypes.Type(name = "markSegmentsAsUnused", value = MarkSegmentsAsUnusedAction.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.RetrieveLockedSegmentsAction;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.firehose.WindowedSegmentId;
import org.apache.druid.indexing.overlord.Segments;
Expand Down Expand Up @@ -553,10 +553,9 @@ public static List<TimelineObjectHolder<String, DataSegment>> getTimelineForInte
try {
usedSegments = toolbox.getTaskActionClient()
.submit(
new RetrieveUsedSegmentsAction(
new RetrieveLockedSegmentsAction(
dataSource,
null,
Collections.singletonList(interval),
interval,
Segments.ONLY_VISIBLE
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ public List<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(Strin
public List<DataSegment> retrieveUsedSegmentsForIntervals(
String dataSource,
List<Interval> intervals,
Segments visibility
Segments visibility,
String createdBefore
)
{
return ImmutableList.of();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ default Collection<DataSegment> retrieveUsedSegmentsForInterval(
* outside of the specified intervals, but overshadowed on the specified intervals will not be
* returned if {@link Segments#ONLY_VISIBLE} is passed. See more precise description in the doc for
* {@link Segments}.
* @param createdBefore The time before which the segments were created.
* @return The DataSegments which include data in the requested intervals. These segments may contain data outside the
* requested intervals.
*
Expand All @@ -121,9 +122,20 @@ default Collection<DataSegment> retrieveUsedSegmentsForInterval(
Collection<DataSegment> retrieveUsedSegmentsForIntervals(
String dataSource,
List<Interval> intervals,
Segments visibility
Segments visibility,
String createdBefore
);

default Collection<DataSegment> retrieveUsedSegmentsForIntervals(
String dataSource,
List<Interval> intervals,
Segments visibility
)
{
return retrieveUsedSegmentsForIntervals(dataSource, intervals, visibility, null);
}


/**
* see {@link #retrieveUnusedSegmentsForInterval(String, Interval, Integer)}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,19 +134,20 @@ public void start()
public Collection<DataSegment> retrieveUsedSegmentsForIntervals(
final String dataSource,
final List<Interval> intervals,
final Segments visibility
final Segments visibility,
final String createdBefore
)
{
if (intervals == null || intervals.isEmpty()) {
throw new IAE("null/empty intervals");
}
return doRetrieveUsedSegments(dataSource, intervals, visibility);
return doRetrieveUsedSegments(dataSource, intervals, visibility, createdBefore);
}

@Override
public Collection<DataSegment> retrieveAllUsedSegments(String dataSource, Segments visibility)
{
return doRetrieveUsedSegments(dataSource, Collections.emptyList(), visibility);
return doRetrieveUsedSegments(dataSource, Collections.emptyList(), visibility, null);
}

/**
Expand All @@ -155,17 +156,18 @@ public Collection<DataSegment> retrieveAllUsedSegments(String dataSource, Segmen
private Collection<DataSegment> doRetrieveUsedSegments(
final String dataSource,
final List<Interval> intervals,
final Segments visibility
final Segments visibility,
final String createdBefore
)
{
return connector.retryWithHandle(
handle -> {
if (visibility == Segments.ONLY_VISIBLE) {
final SegmentTimeline timeline =
getTimelineForIntervalsWithHandle(handle, dataSource, intervals);
getTimelineForIntervalsWithHandle(handle, dataSource, intervals, createdBefore);
return timeline.findNonOvershadowedObjectsInInterval(Intervals.ETERNITY, Partitions.ONLY_COMPLETE);
} else {
return retrieveAllUsedSegmentsForIntervalsWithHandle(handle, dataSource, intervals);
return retrieveAllUsedSegmentsForIntervalsWithHandle(handle, dataSource, intervals, createdBefore);
}
}
);
Expand Down Expand Up @@ -280,25 +282,27 @@ private Set<SegmentIdWithShardSpec> getPendingSegmentsForIntervalWithHandle(
private SegmentTimeline getTimelineForIntervalsWithHandle(
final Handle handle,
final String dataSource,
final List<Interval> intervals
final List<Interval> intervals,
final String createdBefore
) throws IOException
{
try (final CloseableIterator<DataSegment> iterator =
SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper)
.retrieveUsedSegments(dataSource, intervals)) {
.retrieveUsedSegments(dataSource, intervals, createdBefore)) {
return SegmentTimeline.forSegments(iterator);
}
}

private Collection<DataSegment> retrieveAllUsedSegmentsForIntervalsWithHandle(
final Handle handle,
final String dataSource,
final List<Interval> intervals
final List<Interval> intervals,
final String createdBefore
) throws IOException
{
try (final CloseableIterator<DataSegment> iterator =
SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper)
.retrieveUsedSegments(dataSource, intervals)) {
.retrieveUsedSegments(dataSource, intervals, createdBefore)) {
final List<DataSegment> retVal = new ArrayList<>();
iterator.forEachRemaining(retVal::add);
return retVal;
Expand Down Expand Up @@ -1228,7 +1232,7 @@ private Map<SegmentCreateRequest, SegmentIdWithShardSpec> createNewSegments(

// Get the time chunk and associated data segments for the given interval, if any
final List<TimelineObjectHolder<String, DataSegment>> existingChunks =
getTimelineForIntervalsWithHandle(handle, dataSource, Collections.singletonList(interval))
getTimelineForIntervalsWithHandle(handle, dataSource, Collections.singletonList(interval), null)
.lookup(interval);

if (existingChunks.size() > 1) {
Expand Down Expand Up @@ -1439,7 +1443,8 @@ private SegmentIdWithShardSpec createNewSegment(
final List<TimelineObjectHolder<String, DataSegment>> existingChunks = getTimelineForIntervalsWithHandle(
handle,
dataSource,
ImmutableList.of(interval)
ImmutableList.of(interval),
null
).lookup(interval);

if (existingChunks.size() > 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,21 @@ public static SqlSegmentsMetadataQuery forHandle(
*
* Returns a closeable iterator. You should close it when you are done.
*/
public CloseableIterator<DataSegment> retrieveUsedSegments(
final String dataSource,
final Collection<Interval> intervals,
final String createdBefore
)
{
return retrieveSegments(dataSource, intervals, IntervalMode.OVERLAPS, true, null, createdBefore);
}

public CloseableIterator<DataSegment> retrieveUsedSegments(
final String dataSource,
final Collection<Interval> intervals
)
{
return retrieveSegments(dataSource, intervals, IntervalMode.OVERLAPS, true, null);
return retrieveSegments(dataSource, intervals, IntervalMode.OVERLAPS, true, null, null);
}

/**
Expand All @@ -125,7 +134,7 @@ public CloseableIterator<DataSegment> retrieveUnusedSegments(
@Nullable final Integer limit
)
{
return retrieveSegments(dataSource, intervals, IntervalMode.CONTAINS, false, limit);
return retrieveSegments(dataSource, intervals, IntervalMode.CONTAINS, false, limit, null);
}

/**
Expand Down Expand Up @@ -207,7 +216,7 @@ public int markSegmentsUnused(final String dataSource, final Interval interval)
// Retrieve, then drop, since we can't write a WHERE clause directly.
final List<SegmentId> segments = ImmutableList.copyOf(
Iterators.transform(
retrieveSegments(dataSource, Collections.singletonList(interval), IntervalMode.CONTAINS, true, null),
retrieveSegments(dataSource, Collections.singletonList(interval), IntervalMode.CONTAINS, true, null, null),
DataSegment::getId
)
);
Expand Down Expand Up @@ -266,7 +275,8 @@ private CloseableIterator<DataSegment> retrieveSegments(
final Collection<Interval> intervals,
final IntervalMode matchMode,
final boolean used,
@Nullable final Integer limit
@Nullable final Integer limit,
@Nullable final String createdBefore
)
{
// Check if the intervals all support comparing as strings. If so, bake them into the SQL.
Expand All @@ -275,6 +285,10 @@ private CloseableIterator<DataSegment> retrieveSegments(
final StringBuilder sb = new StringBuilder();
sb.append("SELECT payload FROM %s WHERE used = :used AND dataSource = :dataSource");

if (createdBefore != null) {
sb.append(" AND created_date <= :createdBefore");
}

if (compareAsString && !intervals.isEmpty()) {
sb.append(" AND (");
for (int i = 0; i < intervals.size(); i++) {
Expand Down Expand Up @@ -316,6 +330,10 @@ private CloseableIterator<DataSegment> retrieveSegments(
sql.setMaxRows(limit);
}

if (createdBefore != null) {
sql.bind("createdBefore", createdBefore);
}

if (compareAsString) {
final Iterator<Interval> iterator = intervals.iterator();
for (int i = 0; iterator.hasNext(); i++) {
Expand Down

0 comments on commit b158ce8

Please sign in to comment.