Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A Replacing task must read segments created before it acquired its lock #15085

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.common.actions;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableList;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;

import javax.annotation.Nullable;
import java.util.Collection;
import java.util.Objects;
import java.util.Set;

/**
* This TaskAction returns a collection of segments which have data within the specified interval and are marked as
* used, and have been created before a REPLACE lock, if any, was acquired.
*
* The order of segments within the returned collection is unspecified, but each segment is guaranteed to appear in
* the collection only once.
*
* @implNote This action doesn't produce a {@link Set} because it's implemented via {@link
* org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator#retrieveUsedSegmentsForIntervals} which returns
* a collection. Producing a {@link Set} would require an unnecessary copy of segments collection.
AmatyaAvadhanula marked this conversation as resolved.
Show resolved Hide resolved
*/
public class RetrieveLockedSegmentsAction implements TaskAction<Collection<DataSegment>>
AmatyaAvadhanula marked this conversation as resolved.
Show resolved Hide resolved
{
@JsonIgnore
private final String dataSource;

@JsonIgnore
private final Interval interval;

@JsonIgnore
private final Segments visibility;

@JsonCreator
public RetrieveLockedSegmentsAction(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
// When JSON object is deserialized, this parameter is optional for backward compatibility.
AmatyaAvadhanula marked this conversation as resolved.
Show resolved Hide resolved
// Otherwise, it shouldn't be considered optional.
@JsonProperty("visibility") @Nullable Segments visibility
)
{
this.dataSource = dataSource;
this.interval = interval;
// Defaulting to the former behaviour when visibility wasn't explicitly specified for backward compatibility
AmatyaAvadhanula marked this conversation as resolved.
Show resolved Hide resolved
this.visibility = visibility != null ? visibility : Segments.ONLY_VISIBLE;
}

@JsonProperty
public String getDataSource()
{
return dataSource;
}

@JsonProperty
public Interval getInterval()
{
return interval;
}

@JsonProperty
public Segments getVisibility()
{
return visibility;
}

@Override
public TypeReference<Collection<DataSegment>> getReturnTypeReference()
{
return new TypeReference<Collection<DataSegment>>() {};
}

@Override
public Collection<DataSegment> perform(Task task, TaskActionToolbox toolbox)
{
final Set<ReplaceTaskLock> replaceLocksForTask = toolbox.getTaskLockbox().findReplaceLocksForTask(task);
String createdBefore = null;
AmatyaAvadhanula marked this conversation as resolved.
Show resolved Hide resolved
if (task.getDataSource().equals(dataSource)) {
abhishekagarwal87 marked this conversation as resolved.
Show resolved Hide resolved
for (ReplaceTaskLock replaceLock : replaceLocksForTask) {
if (replaceLock.getInterval().contains(interval)) {
createdBefore = replaceLock.getVersion();
break;
}
}
}
return toolbox.getIndexerMetadataStorageCoordinator()
.retrieveUsedSegmentsForIntervals(dataSource, ImmutableList.of(interval), visibility, createdBefore);
}

@Override
public boolean isAudited()
{
return false;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

RetrieveLockedSegmentsAction that = (RetrieveLockedSegmentsAction) o;

if (!dataSource.equals(that.dataSource)) {
return false;
}
if (!interval.equals(that.interval)) {
return false;
}
return visibility.equals(that.visibility);
}

@Override
public int hashCode()
{
return Objects.hash(dataSource, interval, visibility);
}

@Override
public String toString()
{
return getClass().getSimpleName() + "{" +
"dataSource='" + dataSource + '\'' +
", interval=" + interval +
", visibility=" + visibility +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
@JsonSubTypes.Type(name = "segmentTransactionalReplace", value = SegmentTransactionalReplaceAction.class),
// Type name doesn't correspond to the name of the class for backward compatibility.
@JsonSubTypes.Type(name = "segmentListUsed", value = RetrieveUsedSegmentsAction.class),
@JsonSubTypes.Type(name = "segmentListLocked", value = RetrieveLockedSegmentsAction.class),
// Type name doesn't correspond to the name of the class for backward compatibility.
@JsonSubTypes.Type(name = "segmentListUnused", value = RetrieveUnusedSegmentsAction.class),
@JsonSubTypes.Type(name = "markSegmentsAsUnused", value = MarkSegmentsAsUnusedAction.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.RetrieveLockedSegmentsAction;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.firehose.WindowedSegmentId;
import org.apache.druid.indexing.overlord.Segments;
Expand Down Expand Up @@ -553,10 +553,9 @@ public static List<TimelineObjectHolder<String, DataSegment>> getTimelineForInte
try {
usedSegments = toolbox.getTaskActionClient()
.submit(
new RetrieveUsedSegmentsAction(
new RetrieveLockedSegmentsAction(
dataSource,
null,
Collections.singletonList(interval),
interval,
Segments.ONLY_VISIBLE
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.druid.indexing.common.TaskStorageDirTracker;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.actions.RetrieveLockedSegmentsAction;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
Expand Down Expand Up @@ -771,6 +772,26 @@ public void testMultipleGranularities()
verifyIntervalHasVisibleSegments(YEAR_23, segmentV10, segmentV11, segmentV13);
}

/**
 * A segment committed by an appending task after the replacing task has acquired its REPLACE lock
 * must not be part of the replacing task's input segments; a segment committed before the lock must be.
 */
@Test
public void testInputSegmentsToBeReplaced()
{
  appendTask.acquireAppendLockOn(YEAR_23);

  // Allocated and committed before the REPLACE lock exists.
  final DataSegment committedBeforeLock = asSegment(
      appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.YEAR)
  );
  appendTask.commitAppendSegments(committedBeforeLock);

  replaceTask.acquireReplaceLockOn(YEAR_23);

  // This segment must not be in the input set of the replacing task as it was committed after lock acquisition
  final DataSegment committedAfterLock = asSegment(
      appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.YEAR)
  );
  appendTask.commitAppendSegments(committedAfterLock);

  verifyInputSegments(YEAR_23, replaceTask, committedBeforeLock);
}

private static DataSegment asSegment(SegmentIdWithShardSpec pendingSegment)
{
final SegmentId id = pendingSegment.asSegmentId();
Expand Down Expand Up @@ -814,6 +835,24 @@ private void verifySegments(Interval interval, Segments visibility, DataSegment.
}
}

/**
 * Submits a {@link RetrieveLockedSegmentsAction} for the WIKI datasource on behalf of the given task and
 * asserts that the returned segments, compared as a set, are exactly the expected ones.
 *
 * @param interval         interval to fetch segments for
 * @param task             task whose action client is used to submit the action
 * @param expectedSegments segments expected to be returned
 */
private void verifyInputSegments(Interval interval, Task task, DataSegment... expectedSegments)
{
  final Collection<DataSegment> actualSegments;
  try {
    actualSegments = taskActionClientFactory
        .create(task)
        .submit(new RetrieveLockedSegmentsAction(WIKI, interval, Segments.ONLY_VISIBLE));
  }
  catch (IOException e) {
    throw new ISE(e, "Error while fetching locked segments in interval[%s]", interval);
  }

  // Order of the returned collection is unspecified, so compare as sets.
  Assert.assertEquals(Sets.newHashSet(expectedSegments), Sets.newHashSet(actualSegments));
}

private TaskToolboxFactory createToolboxFactory(
TaskConfig taskConfig,
TaskActionClientFactory taskActionClientFactory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ public List<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(Strin
public List<DataSegment> retrieveUsedSegmentsForIntervals(
String dataSource,
List<Interval> intervals,
Segments visibility
Segments visibility,
String createdBefore
)
{
return ImmutableList.of();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ default Collection<DataSegment> retrieveUsedSegmentsForInterval(
* outside of the specified intervals, but overshadowed on the specified intervals will not be
* returned if {@link Segments#ONLY_VISIBLE} is passed. See more precise description in the doc for
* {@link Segments}.
* @param createdBefore The time before which the returned segments must have been created. May be null, which is
* what callers that have no such bound pass.
* @return The DataSegments which include data in the requested intervals. These segments may contain data outside the
* requested intervals.
*
Expand All @@ -121,9 +122,20 @@ default Collection<DataSegment> retrieveUsedSegmentsForInterval(
Collection<DataSegment> retrieveUsedSegmentsForIntervals(
String dataSource,
List<Interval> intervals,
Segments visibility
Segments visibility,
String createdBefore
);

default Collection<DataSegment> retrieveUsedSegmentsForIntervals(
AmatyaAvadhanula marked this conversation as resolved.
Show resolved Hide resolved
String dataSource,
List<Interval> intervals,
Segments visibility
)
{
return retrieveUsedSegmentsForIntervals(dataSource, intervals, visibility, null);
}


/**
* see {@link #retrieveUnusedSegmentsForInterval(String, Interval, Integer)}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,19 +134,20 @@ public void start()
public Collection<DataSegment> retrieveUsedSegmentsForIntervals(
final String dataSource,
final List<Interval> intervals,
final Segments visibility
final Segments visibility,
final String createdBefore
)
{
if (intervals == null || intervals.isEmpty()) {
throw new IAE("null/empty intervals");
}
return doRetrieveUsedSegments(dataSource, intervals, visibility);
return doRetrieveUsedSegments(dataSource, intervals, visibility, createdBefore);
}

@Override
public Collection<DataSegment> retrieveAllUsedSegments(String dataSource, Segments visibility)
{
return doRetrieveUsedSegments(dataSource, Collections.emptyList(), visibility);
return doRetrieveUsedSegments(dataSource, Collections.emptyList(), visibility, null);
}

/**
Expand All @@ -155,17 +156,18 @@ public Collection<DataSegment> retrieveAllUsedSegments(String dataSource, Segmen
private Collection<DataSegment> doRetrieveUsedSegments(
final String dataSource,
final List<Interval> intervals,
final Segments visibility
final Segments visibility,
final String createdBefore
)
{
return connector.retryWithHandle(
handle -> {
if (visibility == Segments.ONLY_VISIBLE) {
final SegmentTimeline timeline =
getTimelineForIntervalsWithHandle(handle, dataSource, intervals);
getTimelineForIntervalsWithHandle(handle, dataSource, intervals, createdBefore);
return timeline.findNonOvershadowedObjectsInInterval(Intervals.ETERNITY, Partitions.ONLY_COMPLETE);
} else {
return retrieveAllUsedSegmentsForIntervalsWithHandle(handle, dataSource, intervals);
return retrieveAllUsedSegmentsForIntervalsWithHandle(handle, dataSource, intervals, createdBefore);
}
}
);
Expand Down Expand Up @@ -280,25 +282,27 @@ private Set<SegmentIdWithShardSpec> getPendingSegmentsForIntervalWithHandle(
private SegmentTimeline getTimelineForIntervalsWithHandle(
final Handle handle,
final String dataSource,
final List<Interval> intervals
final List<Interval> intervals,
final String createdBefore
) throws IOException
{
try (final CloseableIterator<DataSegment> iterator =
SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper)
.retrieveUsedSegments(dataSource, intervals)) {
.retrieveUsedSegments(dataSource, intervals, createdBefore)) {
return SegmentTimeline.forSegments(iterator);
}
}

private Collection<DataSegment> retrieveAllUsedSegmentsForIntervalsWithHandle(
final Handle handle,
final String dataSource,
final List<Interval> intervals
final List<Interval> intervals,
final String createdBefore
) throws IOException
{
try (final CloseableIterator<DataSegment> iterator =
SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper)
.retrieveUsedSegments(dataSource, intervals)) {
.retrieveUsedSegments(dataSource, intervals, createdBefore)) {
final List<DataSegment> retVal = new ArrayList<>();
iterator.forEachRemaining(retVal::add);
return retVal;
Expand Down Expand Up @@ -1228,7 +1232,7 @@ private Map<SegmentCreateRequest, SegmentIdWithShardSpec> createNewSegments(

// Get the time chunk and associated data segments for the given interval, if any
final List<TimelineObjectHolder<String, DataSegment>> existingChunks =
getTimelineForIntervalsWithHandle(handle, dataSource, Collections.singletonList(interval))
getTimelineForIntervalsWithHandle(handle, dataSource, Collections.singletonList(interval), null)
.lookup(interval);

if (existingChunks.size() > 1) {
Expand Down Expand Up @@ -1439,7 +1443,8 @@ private SegmentIdWithShardSpec createNewSegment(
final List<TimelineObjectHolder<String, DataSegment>> existingChunks = getTimelineForIntervalsWithHandle(
handle,
dataSource,
ImmutableList.of(interval)
ImmutableList.of(interval),
null
).lookup(interval);

if (existingChunks.size() > 1) {
Expand Down
Loading
Loading