Skip to content

Commit

Permalink
Refactor: Rename UsedSegmentChecker and cleanup task actions (#16644)
Browse files Browse the repository at this point in the history
Changes:
- Rename `UsedSegmentChecker` to `PublishedSegmentsRetriever`
- Remove deprecated single `Interval` argument from `RetrieveUsedSegmentsAction`
as it is now unused and has been deprecated since #1988 
- Return `Set` of segments instead of a `Collection` from `IndexerMetadataStorageCoordinator.retrieveUsedSegments()`
  • Loading branch information
kfaraz authored Jun 26, 2024
1 parent 52c9929 commit d9bd022
Show file tree
Hide file tree
Showing 29 changed files with 137 additions and 248 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.realtime.appenderator.UsedSegmentChecker;
import org.apache.druid.segment.realtime.appenderator.PublishedSegmentRetriever;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;
Expand All @@ -40,13 +40,13 @@
import java.util.Set;
import java.util.stream.Collectors;

public class ActionBasedUsedSegmentChecker implements UsedSegmentChecker
public class ActionBasedPublishedSegmentRetriever implements PublishedSegmentRetriever
{
private static final Logger log = new Logger(ActionBasedUsedSegmentChecker.class);
private static final Logger log = new Logger(ActionBasedPublishedSegmentRetriever.class);

private final TaskActionClient taskActionClient;

public ActionBasedUsedSegmentChecker(TaskActionClient taskActionClient)
public ActionBasedPublishedSegmentRetriever(TaskActionClient taskActionClient)
{
this.taskActionClient = taskActionClient;
}
Expand Down Expand Up @@ -92,7 +92,7 @@ public Set<DataSegment> findPublishedSegments(Set<SegmentId> segmentIds) throws
Iterables.transform(segmentIds, SegmentId::getInterval)
);
final Collection<DataSegment> foundUsedSegments = taskActionClient.submit(
new RetrieveUsedSegmentsAction(dataSource, null, usedSearchIntervals, Segments.INCLUDING_OVERSHADOWED)
new RetrieveUsedSegmentsAction(dataSource, usedSearchIntervals, Segments.INCLUDING_OVERSHADOWED)
);
for (DataSegment segment : foundUsedSegments) {
if (segmentIds.contains(segment.getId())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.druid.common.config.Configs;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.batch.parallel.AbstractBatchSubtask;
import org.apache.druid.indexing.overlord.Segments;
Expand All @@ -35,6 +35,7 @@
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Partitions;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.Interval;

import javax.annotation.Nullable;
Expand All @@ -48,19 +49,13 @@
import java.util.stream.Collectors;

/**
* This TaskAction returns a collection of segments which have data within the specified intervals and are marked as
* used.
* Task action to retrieve a collection of segments which have data within the
* specified intervals and are marked as used.
* <p>
* If the task holds REPLACE locks and is writing back to the same datasource,
* only segments that were created before the REPLACE lock was acquired are returned for an interval.
* This ensures that the input set of segments for this replace task remains consistent
* even when new data is appended by other concurrent tasks.
*
* The order of segments within the returned collection is unspecified, but each segment is guaranteed to appear in
* the collection only once.
*
* @implNote This action doesn't produce a {@link Set} because it's implemented via {@link
* org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator#retrieveUsedSegmentsForIntervals} which returns
* a collection. Producing a {@link Set} would require an unnecessary copy of segments collection.
* only segments that were created before the REPLACE lock was acquired are
* returned for an interval. This ensures that the input set of segments for this
* replace task remains consistent even when new data is appended by other concurrent tasks.
*/
public class RetrieveUsedSegmentsAction implements TaskAction<Collection<DataSegment>>
{
Expand All @@ -73,35 +68,22 @@ public class RetrieveUsedSegmentsAction implements TaskAction<Collection<DataSeg
@JsonCreator
public RetrieveUsedSegmentsAction(
@JsonProperty("dataSource") String dataSource,
@Deprecated @JsonProperty("interval") Interval interval,
@JsonProperty("intervals") Collection<Interval> intervals,
// When JSON object is deserialized, this parameter is optional for backward compatibility.
// Otherwise, it shouldn't be considered optional.
@JsonProperty("visibility") @Nullable Segments visibility
)
{
this.dataSource = dataSource;

Preconditions.checkArgument(
interval == null || intervals == null,
"please specify intervals only"
);

List<Interval> theIntervals = null;
if (interval != null) {
theIntervals = ImmutableList.of(interval);
} else if (intervals != null && intervals.size() > 0) {
theIntervals = JodaUtils.condenseIntervals(intervals);
if (CollectionUtils.isNullOrEmpty(intervals)) {
throw InvalidInput.exception("No interval specified for retrieving used segments");
}
this.intervals = Preconditions.checkNotNull(theIntervals, "no intervals found");

// Defaulting to the former behaviour when visibility wasn't explicitly specified for backward compatibility
this.visibility = visibility != null ? visibility : Segments.ONLY_VISIBLE;
this.dataSource = dataSource;
this.intervals = JodaUtils.condenseIntervals(intervals);
this.visibility = Configs.valueOrDefault(visibility, Segments.ONLY_VISIBLE);
}

public RetrieveUsedSegmentsAction(String dataSource, Collection<Interval> intervals)
{
this(dataSource, null, intervals, Segments.ONLY_VISIBLE);
this(dataSource, intervals, Segments.ONLY_VISIBLE);
}

@JsonProperty
Expand Down Expand Up @@ -198,7 +180,7 @@ public Collection<DataSegment> perform(Task task, TaskActionToolbox toolbox)
}
}

private Collection<DataSegment> retrieveUsedSegments(TaskActionToolbox toolbox)
private Set<DataSegment> retrieveUsedSegments(TaskActionToolbox toolbox)
{
return toolbox.getIndexerMetadataStorageCoordinator()
.retrieveUsedSegmentsForIntervals(dataSource, intervals, visibility);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
@JsonSubTypes.Type(name = "segmentTransactionalInsert", value = SegmentTransactionalInsertAction.class),
@JsonSubTypes.Type(name = "segmentTransactionalAppend", value = SegmentTransactionalAppendAction.class),
@JsonSubTypes.Type(name = "segmentTransactionalReplace", value = SegmentTransactionalReplaceAction.class),
@JsonSubTypes.Type(name = "segmentListById", value = RetrieveSegmentsByIdAction.class),
@JsonSubTypes.Type(name = "retrieveSegmentsById", value = RetrieveSegmentsByIdAction.class),
@JsonSubTypes.Type(name = "segmentListUsed", value = RetrieveUsedSegmentsAction.class),
@JsonSubTypes.Type(name = "segmentListUnused", value = RetrieveUnusedSegmentsAction.class),
@JsonSubTypes.Type(name = "markSegmentsAsUnused", value = MarkSegmentsAsUnusedAction.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.apache.druid.indexing.common.task;

import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import org.apache.druid.indexing.appenderator.ActionBasedPublishedSegmentRetriever;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.java.util.common.IAE;
Expand Down Expand Up @@ -134,7 +134,7 @@ public static BatchAppenderatorDriver newDriver(
return new BatchAppenderatorDriver(
appenderator,
segmentAllocator,
new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient()),
new ActionBasedPublishedSegmentRetriever(toolbox.getTaskActionClient()),
toolbox.getDataSegmentKiller()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception

RetrieveUsedSegmentsAction retrieveUsedSegmentsAction = new RetrieveUsedSegmentsAction(
getDataSource(),
null,
ImmutableList.of(getInterval()),
Segments.INCLUDING_OVERSHADOWED
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,6 @@ public Collection<DataSegment> retrieveUsedSegmentsForIntervals(
{
return toolbox
.getTaskActionClient()
.submit(new RetrieveUsedSegmentsAction(dataSource, null, intervals, visibility));
.submit(new RetrieveUsedSegmentsAction(dataSource, intervals, visibility));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.appenderator.ActionBasedPublishedSegmentRetriever;
import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator;
import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
Expand Down Expand Up @@ -237,7 +237,7 @@ public StreamAppenderatorDriver newDriver(
)
),
toolbox.getSegmentHandoffNotifierFactory(),
new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient()),
new ActionBasedPublishedSegmentRetriever(toolbox.getTaskActionClient()),
toolbox.getDataSegmentKiller(),
toolbox.getJsonMapper(),
metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -976,7 +976,7 @@ private void publishAndRegisterHandoff(SequenceMetadata<PartitionIdType, Sequenc
sequenceMetadata.getCommitterSupplier(this, stream, lastPersistedOffsets).get(),
Collections.singletonList(sequenceMetadata.getSequenceName())
),
(Function<SegmentsAndCommitMetadata, SegmentsAndCommitMetadata>) publishedSegmentsAndMetadata -> {
publishedSegmentsAndMetadata -> {
if (publishedSegmentsAndMetadata == null) {
throw new ISE(
"Transaction failure publishing segments for sequence [%s]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,15 +393,13 @@ public SegmentPublishResult publishAnnotatedSegments(
// if we created no segments and didn't change any offsets, just do nothing and return.
log.info(
"With empty segment set, start offsets [%s] and end offsets [%s] are the same, skipping metadata commit.",
startPartitions,
finalPartitions
startPartitions, finalPartitions
);
return SegmentPublishResult.ok(segmentsToPush);
} else {
log.info(
"With empty segment set, start offsets [%s] and end offsets [%s] changed, committing new metadata.",
startPartitions,
finalPartitions
startPartitions, finalPartitions
);
action = SegmentTransactionalInsertAction.commitMetadataOnlyAction(
runner.getAppenderator().getDataSource(),
Expand All @@ -419,12 +417,10 @@ public SegmentPublishResult publishAnnotatedSegments(
);
final DataSourceMetadata endMetadata = runner.createDataSourceMetadata(finalPartitions);
action = taskLockType == TaskLockType.APPEND
? SegmentTransactionalAppendAction.forSegmentsAndMetadata(segmentsToPush, startMetadata, endMetadata,
segmentSchemaMapping
)
: SegmentTransactionalInsertAction.appendAction(segmentsToPush, startMetadata, endMetadata,
segmentSchemaMapping
);
? SegmentTransactionalAppendAction
.forSegmentsAndMetadata(segmentsToPush, startMetadata, endMetadata, segmentSchemaMapping)
: SegmentTransactionalInsertAction
.appendAction(segmentsToPush, startMetadata, endMetadata, segmentSchemaMapping);
} else {
action = taskLockType == TaskLockType.APPEND
? SegmentTransactionalAppendAction.forSegments(segmentsToPush, segmentSchemaMapping)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,16 @@
import java.util.Set;
import java.util.stream.Collectors;

public class ActionBasedUsedSegmentCheckerTest
public class ActionBasedPublishedSegmentRetrieverTest
{
private TaskActionClient taskActionClient;
private ActionBasedUsedSegmentChecker segmentRetriever;
private ActionBasedPublishedSegmentRetriever segmentRetriever;

@Before
public void setup()
{
taskActionClient = EasyMock.createMock(TaskActionClient.class);
segmentRetriever = new ActionBasedUsedSegmentChecker(taskActionClient);
segmentRetriever = new ActionBasedPublishedSegmentRetriever(taskActionClient);
}

@Test
Expand Down Expand Up @@ -103,7 +103,6 @@ public void testRetrieveUsedSegmentsIfNotFoundById() throws IOException
taskActionClient.submit(
new RetrieveUsedSegmentsAction(
"wiki",
null,
Collections.singletonList(Intervals.of("2013-01-01/P3D")),
Segments.INCLUDING_OVERSHADOWED
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.junit.Assert;
import org.junit.Test;

import java.util.Collections;
import java.util.List;

/**
Expand All @@ -42,7 +43,7 @@ public void testSingleIntervalSerde() throws Exception
Interval interval = Intervals.of("2014/2015");

RetrieveUsedSegmentsAction expected =
new RetrieveUsedSegmentsAction("dataSource", interval, null, Segments.ONLY_VISIBLE);
new RetrieveUsedSegmentsAction("dataSource", Collections.singletonList(interval), Segments.ONLY_VISIBLE);

RetrieveUsedSegmentsAction actual =
MAPPER.readValue(MAPPER.writeValueAsString(expected), RetrieveUsedSegmentsAction.class);
Expand All @@ -68,11 +69,15 @@ public void testMultiIntervalSerde() throws Exception
@Test
public void testOldJsonDeserialization() throws Exception
{
String jsonStr = "{\"type\": \"segmentListUsed\", \"dataSource\": \"test\", \"interval\": \"2014/2015\"}";
String jsonStr = "{\"type\": \"segmentListUsed\", \"dataSource\": \"test\", \"intervals\": [\"2014/2015\"]}";
RetrieveUsedSegmentsAction actual = (RetrieveUsedSegmentsAction) MAPPER.readValue(jsonStr, TaskAction.class);

Assert.assertEquals(
new RetrieveUsedSegmentsAction("test", Intervals.of("2014/2015"), null, Segments.ONLY_VISIBLE),
new RetrieveUsedSegmentsAction(
"test",
Collections.singletonList(Intervals.of("2014/2015")),
Segments.ONLY_VISIBLE
),
actual
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,13 @@
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.assertj.core.api.Assertions;
import org.hamcrest.CoreMatchers;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

public class SegmentTransactionalInsertActionTest
{
@Rule
public ExpectedException thrown = ExpectedException.none();

@Rule
public TaskActionTestKit actionTestKit = new TaskActionTestKit();

Expand Down Expand Up @@ -157,8 +152,8 @@ public void testFailTransactionalUpdateDataSourceMetadata() throws Exception
Assert.assertEquals(
SegmentPublishResult.fail(
InvalidInput.exception(
"The new start metadata state[ObjectMetadata{theObject=[1]}] is ahead of the last commited end"
+ " state[null]. Try resetting the supervisor."
"The new start metadata state[ObjectMetadata{theObject=[1]}] is"
+ " ahead of the last committed end state[null]. Try resetting the supervisor."
).toString()
),
result
Expand All @@ -169,17 +164,15 @@ public void testFailTransactionalUpdateDataSourceMetadata() throws Exception
public void testFailBadVersion() throws Exception
{
final Task task = NoopTask.create();
final SegmentTransactionalInsertAction action = SegmentTransactionalInsertAction.overwriteAction(
null,
ImmutableSet.of(SEGMENT3),
null
);
final SegmentTransactionalInsertAction action = SegmentTransactionalInsertAction
.overwriteAction(null, ImmutableSet.of(SEGMENT3), null);
actionTestKit.getTaskLockbox().add(task);
acquireTimeChunkLock(TaskLockType.EXCLUSIVE, task, INTERVAL, 5000);

thrown.expect(IllegalStateException.class);
thrown.expectMessage(CoreMatchers.containsString("are not covered by locks"));
SegmentPublishResult result = action.perform(task, actionTestKit.getTaskActionToolbox());
Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(SEGMENT3)), result);
IllegalStateException exception = Assert.assertThrows(
IllegalStateException.class,
() -> action.perform(task, actionTestKit.getTaskActionToolbox())
);
Assert.assertTrue(exception.getMessage().contains("are not covered by locks"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -984,7 +984,6 @@ private void verifySegments(Interval interval, Segments visibility, DataSegment.
Collection<DataSegment> allUsedSegments = dummyTaskActionClient.submit(
new RetrieveUsedSegmentsAction(
WIKI,
null,
ImmutableList.of(interval),
visibility
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,6 @@ private void verifySegments(Interval interval, Segments visibility, DataSegment.
Collection<DataSegment> allUsedSegments = dummyTaskActionClient.submit(
new RetrieveUsedSegmentsAction(
WIKI,
null,
ImmutableList.of(interval),
visibility
)
Expand Down Expand Up @@ -829,7 +828,6 @@ private Collection<DataSegment> getAllUsedSegments()
return dummyTaskActionClient.submit(
new RetrieveUsedSegmentsAction(
WIKI,
null,
ImmutableList.of(Intervals.ETERNITY),
Segments.INCLUDING_OVERSHADOWED
)
Expand Down
Loading

0 comments on commit d9bd022

Please sign in to comment.