Skip to content

Commit

Permalink
Remove segmentsToBeDropped from SegmentTransactionalInsertAction (#14883)
Browse files Browse the repository at this point in the history
Motivation:
- There is no usage of the `SegmentTransactionalInsertAction` which passes a
non-null non-empty value of `segmentsToBeDropped`.
- This is not really needed either as overshadowed segments are marked as unused
by the Coordinator and need not be done in the same transaction as committing segments.
- It will also help simplify the changes being made in #14407 

Changes:
- Remove `segmentsToBeDropped` from the task action and all intermediate methods
- Remove related tests which are not needed anymore
  • Loading branch information
kfaraz authored Aug 21, 2023
1 parent c211dcc commit 9290605
Show file tree
Hide file tree
Showing 21 changed files with 27 additions and 413 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1306,7 +1306,7 @@ private void publishAllSegments(final Set<DataSegment> segments) throws IOExcept
} else {
performSegmentPublish(
context.taskActionClient(),
SegmentTransactionalInsertAction.overwriteAction(null, null, segmentsWithTombstones)
SegmentTransactionalInsertAction.overwriteAction(null, segmentsWithTombstones)
);
}
} else if (!segments.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,6 @@ public class SegmentTransactionalInsertAction implements TaskAction<SegmentPubli
* Set of segments to be inserted into metadata storage
*/
private final Set<DataSegment> segments;
/**
* Set of segments to be dropped (mark unused) when new segments, {@link SegmentTransactionalInsertAction#segments},
* are inserted into metadata storage.
*/
@Nullable
private final Set<DataSegment> segmentsToBeDropped;

@Nullable
private final DataSourceMetadata startMetadata;
Expand All @@ -82,11 +76,10 @@ public class SegmentTransactionalInsertAction implements TaskAction<SegmentPubli

public static SegmentTransactionalInsertAction overwriteAction(
@Nullable Set<DataSegment> segmentsToBeOverwritten,
@Nullable Set<DataSegment> segmentsToBeDropped,
Set<DataSegment> segmentsToPublish
)
{
return new SegmentTransactionalInsertAction(segmentsToBeOverwritten, segmentsToBeDropped, segmentsToPublish, null, null, null);
return new SegmentTransactionalInsertAction(segmentsToBeOverwritten, segmentsToPublish, null, null, null);
}

public static SegmentTransactionalInsertAction appendAction(
Expand All @@ -95,7 +88,7 @@ public static SegmentTransactionalInsertAction appendAction(
@Nullable DataSourceMetadata endMetadata
)
{
return new SegmentTransactionalInsertAction(null, null, segments, startMetadata, endMetadata, null);
return new SegmentTransactionalInsertAction(null, segments, startMetadata, endMetadata, null);
}

public static SegmentTransactionalInsertAction commitMetadataOnlyAction(
Expand All @@ -104,21 +97,19 @@ public static SegmentTransactionalInsertAction commitMetadataOnlyAction(
DataSourceMetadata endMetadata
)
{
return new SegmentTransactionalInsertAction(null, null, null, startMetadata, endMetadata, dataSource);
return new SegmentTransactionalInsertAction(null, null, startMetadata, endMetadata, dataSource);
}

@JsonCreator
private SegmentTransactionalInsertAction(
@JsonProperty("segmentsToBeOverwritten") @Nullable Set<DataSegment> segmentsToBeOverwritten,
@JsonProperty("segmentsToBeDropped") @Nullable Set<DataSegment> segmentsToBeDropped,
@JsonProperty("segments") @Nullable Set<DataSegment> segments,
@JsonProperty("startMetadata") @Nullable DataSourceMetadata startMetadata,
@JsonProperty("endMetadata") @Nullable DataSourceMetadata endMetadata,
@JsonProperty("dataSource") @Nullable String dataSource
)
{
this.segmentsToBeOverwritten = segmentsToBeOverwritten;
this.segmentsToBeDropped = segmentsToBeDropped;
this.segments = segments == null ? ImmutableSet.of() : ImmutableSet.copyOf(segments);
this.startMetadata = startMetadata;
this.endMetadata = endMetadata;
Expand All @@ -132,13 +123,6 @@ public Set<DataSegment> getSegmentsToBeOverwritten()
return segmentsToBeOverwritten;
}

@JsonProperty
@Nullable
public Set<DataSegment> getSegmentsToBeDropped()
{
return segmentsToBeDropped;
}

@JsonProperty
public Set<DataSegment> getSegments()
{
Expand Down Expand Up @@ -202,9 +186,6 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
if (segmentsToBeOverwritten != null) {
allSegments.addAll(segmentsToBeOverwritten);
}
if (segmentsToBeDropped != null) {
allSegments.addAll(segmentsToBeDropped);
}

TaskLocks.checkLockCoversSegments(task, toolbox.getTaskLockbox(), allSegments);

Expand All @@ -224,7 +205,6 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
.onValidLocks(
() -> toolbox.getIndexerMetadataStorageCoordinator().announceHistoricalSegments(
segments,
segmentsToBeDropped,
startMetadata,
endMetadata
)
Expand Down Expand Up @@ -359,7 +339,6 @@ public String toString()
", startMetadata=" + startMetadata +
", endMetadata=" + endMetadata +
", dataSource='" + dataSource + '\'' +
", segmentsToBeDropped=" + SegmentUtils.commaSeparatedIdentifiers(segmentsToBeDropped) +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -350,19 +350,13 @@ public TaskStatus runTask(final TaskToolbox toolbox)
int sequenceNumber = 0;
String sequenceName = makeSequenceName(getId(), sequenceNumber);

final TransactionalSegmentPublisher publisher = (mustBeNullOrEmptyOverwriteSegments, mustBeNullOrEmptyDropSegments, segments, commitMetadata) -> {
final TransactionalSegmentPublisher publisher = (mustBeNullOrEmptyOverwriteSegments, segments, commitMetadata) -> {
if (mustBeNullOrEmptyOverwriteSegments != null && !mustBeNullOrEmptyOverwriteSegments.isEmpty()) {
throw new ISE(
"Stream ingestion task unexpectedly attempted to overwrite segments: %s",
SegmentUtils.commaSeparatedIdentifiers(mustBeNullOrEmptyOverwriteSegments)
);
}
if (mustBeNullOrEmptyDropSegments != null && !mustBeNullOrEmptyDropSegments.isEmpty()) {
throw new ISE(
"Stream ingestion task unexpectedly attempted to drop segments: %s",
SegmentUtils.commaSeparatedIdentifiers(mustBeNullOrEmptyDropSegments)
);
}
final SegmentTransactionalInsertAction action = SegmentTransactionalInsertAction.appendAction(
segments,
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -912,9 +912,9 @@ private TaskStatus generateAndPublishSegments(
}


final TransactionalSegmentPublisher publisher = (segmentsToBeOverwritten, segmentsToDrop, segmentsToPublish, commitMetadata) ->
final TransactionalSegmentPublisher publisher = (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) ->
toolbox.getTaskActionClient()
.submit(SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToDrop, segmentsToPublish));
.submit(SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToPublish));

String effectiveId = getContextValue(CompactionTask.CTX_KEY_APPENDERATOR_TRACKING_TASK_ID, null);
if (effectiveId == null) {
Expand Down Expand Up @@ -996,7 +996,6 @@ private TaskStatus generateAndPublishSegments(
final SegmentsAndCommitMetadata published =
awaitPublish(driver.publishAll(
inputSegments,
null,
tombStones,
publisher,
annotateFunction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1170,16 +1170,13 @@ private void publishSegments(
}
}

final TransactionalSegmentPublisher publisher = (segmentsToBeOverwritten, segmentsToDrop, segmentsToPublish, commitMetadata) ->
final TransactionalSegmentPublisher publisher = (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) ->
toolbox.getTaskActionClient().submit(
SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToDrop, segmentsToPublish)
SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToPublish)
);
final boolean published =
newSegments.isEmpty()
|| publisher.publishSegments(oldSegments,
Collections.emptySet(),
newSegments, annotateFunction,
null).isSuccess();
|| publisher.publishSegments(oldSegments, newSegments, annotateFunction, null).isSuccess();

if (published) {
LOG.info("Published [%d] segments", newSegments.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,6 @@ public SequenceMetadataTransactionalSegmentPublisher(
@Override
public SegmentPublishResult publishAnnotatedSegments(
@Nullable Set<DataSegment> mustBeNullOrEmptyOverwriteSegments,
@Nullable Set<DataSegment> mustBeNullOrEmptyDropSegments,
Set<DataSegment> segmentsToPush,
@Nullable Object commitMetadata
) throws IOException
Expand All @@ -348,13 +347,7 @@ public SegmentPublishResult publishAnnotatedSegments(
SegmentUtils.commaSeparatedIdentifiers(mustBeNullOrEmptyOverwriteSegments)
);
}
if (mustBeNullOrEmptyDropSegments != null && !mustBeNullOrEmptyDropSegments.isEmpty()) {
throw new ISE(
"Stream ingestion task unexpectedly attempted to drop segments: %s",
SegmentUtils.commaSeparatedIdentifiers(mustBeNullOrEmptyDropSegments)
);
}
final Map commitMetaMap = (Map) Preconditions.checkNotNull(commitMetadata, "commitMetadata");
final Map<?, ?> commitMetaMap = (Map<?, ?>) Preconditions.checkNotNull(commitMetadata, "commitMetadata");
final SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType> finalPartitions =
runner.deserializePartitionsFromMetadata(
toolbox.getJsonMapper(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,39 +134,6 @@ public void testTransactionalUpdateDataSourceMetadata() throws Exception
);
}

@Test
public void testTransactionalDropSegments() throws Exception
{
final Task task = NoopTask.create();
actionTestKit.getTaskLockbox().add(task);
acquireTimeChunkLock(TaskLockType.EXCLUSIVE, task, INTERVAL, 5000);

SegmentPublishResult result1 = SegmentTransactionalInsertAction.overwriteAction(
null,
null,
ImmutableSet.of(SEGMENT1)
).perform(
task,
actionTestKit.getTaskActionToolbox()
);
Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(SEGMENT1)), result1);

SegmentPublishResult result2 = SegmentTransactionalInsertAction.overwriteAction(
null,
ImmutableSet.of(SEGMENT1),
ImmutableSet.of(SEGMENT2)
).perform(
task,
actionTestKit.getTaskActionToolbox()
);
Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(SEGMENT2)), result2);

Assertions.assertThat(
actionTestKit.getMetadataStorageCoordinator()
.retrieveUsedSegmentsForInterval(DATA_SOURCE, INTERVAL, Segments.ONLY_VISIBLE)
).containsExactlyInAnyOrder(SEGMENT2);
}

@Test
public void testFailTransactionalUpdateDataSourceMetadata() throws Exception
{
Expand All @@ -193,38 +160,11 @@ public void testFailTransactionalUpdateDataSourceMetadata() throws Exception
);
}

@Test
public void testFailTransactionalDropSegment() throws Exception
{
final Task task = NoopTask.create();
actionTestKit.getTaskLockbox().add(task);
acquireTimeChunkLock(TaskLockType.EXCLUSIVE, task, INTERVAL, 5000);

SegmentPublishResult result = SegmentTransactionalInsertAction.overwriteAction(
null,
// SEGMENT1 does not exist, hence will fail to drop
ImmutableSet.of(SEGMENT1),
ImmutableSet.of(SEGMENT2)
).perform(
task,
actionTestKit.getTaskActionToolbox()
);

Assert.assertEquals(
SegmentPublishResult.fail(
"org.apache.druid.metadata.RetryTransactionException: " +
"Failed to drop some segments. Only 0 could be dropped out of 1. Trying again"
),
result
);
}

@Test
public void testFailBadVersion() throws Exception
{
final Task task = NoopTask.create();
final SegmentTransactionalInsertAction action = SegmentTransactionalInsertAction.overwriteAction(
null,
null,
ImmutableSet.of(SEGMENT3)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1525,16 +1525,15 @@ public Set<DataSegment> announceHistoricalSegments(Set<DataSegment> segments) th
@Override
public SegmentPublishResult announceHistoricalSegments(
Set<DataSegment> segments,
Set<DataSegment> segmentsToDrop,
DataSourceMetadata startMetadata,
DataSourceMetadata endMetadata
) throws IOException
{
SegmentPublishResult result = super.announceHistoricalSegments(segments, segmentsToDrop, startMetadata, endMetadata);
SegmentPublishResult result = super.announceHistoricalSegments(segments, startMetadata, endMetadata);

Assert.assertFalse(
Assert.assertNotNull(
"Segment latch not initialized, did you forget to call expectPublishSegments?",
segmentLatch == null
segmentLatch
);

publishedSegments.addAll(result.getSegments());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,37 +85,7 @@ public void testPublishAnnotatedSegmentsThrowExceptionIfOverwriteSegmentsNotNull
"Stream ingestion task unexpectedly attempted to overwrite segments: " + SegmentUtils.commaSeparatedIdentifiers(notNullNotEmptySegment)
);

transactionalSegmentPublisher.publishAnnotatedSegments(notNullNotEmptySegment, null, ImmutableSet.of(), null);
}

@Test
public void testPublishAnnotatedSegmentsThrowExceptionIfDropSegmentsNotNullAndNotEmpty() throws Exception
{
DataSegment dataSegment = DataSegment.builder()
.dataSource("foo")
.interval(Intervals.of("2001/P1D"))
.shardSpec(new LinearShardSpec(1))
.version("b")
.size(0)
.build();

Set<DataSegment> notNullNotEmptySegment = ImmutableSet.of(dataSegment);
SequenceMetadata<Integer, Integer> sequenceMetadata = new SequenceMetadata<>(
1,
"test",
ImmutableMap.of(),
ImmutableMap.of(),
true,
ImmutableSet.of()
);
TransactionalSegmentPublisher transactionalSegmentPublisher = sequenceMetadata.createPublisher(mockSeekableStreamIndexTaskRunner, mockTaskToolbox, true);

expectedException.expect(ISE.class);
expectedException.expectMessage(
"Stream ingestion task unexpectedly attempted to drop segments: " + SegmentUtils.commaSeparatedIdentifiers(notNullNotEmptySegment)
);

transactionalSegmentPublisher.publishAnnotatedSegments(null, notNullNotEmptySegment, ImmutableSet.of(), null);
transactionalSegmentPublisher.publishAnnotatedSegments(notNullNotEmptySegment, ImmutableSet.of(), null);
}

@Test
Expand Down Expand Up @@ -143,6 +113,6 @@ public void testPublishAnnotatedSegmentsSucceedIfDropSegmentsAndOverwriteSegment
);
TransactionalSegmentPublisher transactionalSegmentPublisher = sequenceMetadata.createPublisher(mockSeekableStreamIndexTaskRunner, mockTaskToolbox, false);

transactionalSegmentPublisher.publishAnnotatedSegments(null, null, notNullNotEmptySegment, ImmutableMap.of());
transactionalSegmentPublisher.publishAnnotatedSegments(null, notNullNotEmptySegment, ImmutableMap.of());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ public Map<SegmentCreateRequest, SegmentIdWithShardSpec> allocatePendingSegments
@Override
public SegmentPublishResult announceHistoricalSegments(
Set<DataSegment> segments,
Set<DataSegment> segmentsToDrop,
DataSourceMetadata oldCommitMetadata,
DataSourceMetadata newCommitMetadata
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

/**
* Commit metadata for a dataSource. Used by
* {@link IndexerMetadataStorageCoordinator#announceHistoricalSegments(Set, Set, DataSourceMetadata, DataSourceMetadata)}
* {@link IndexerMetadataStorageCoordinator#announceHistoricalSegments(Set, DataSourceMetadata, DataSourceMetadata)}
* to provide metadata transactions for segment inserts.
*
* Two metadata instances can be added together, and any conflicts are resolved in favor of the right-hand side.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,6 @@ SegmentIdWithShardSpec allocatePendingSegment(
* {@param segments}
*
* @param segments set of segments to add, must all be from the same dataSource
* @param segmentsToDrop set of segments to drop, must all be from the same dataSource
* @param startMetadata dataSource metadata pre-insert must match this startMetadata according to
* {@link DataSourceMetadata#matches(DataSourceMetadata)}. If null, this insert will
* not involve a metadata transaction
Expand All @@ -274,7 +273,6 @@ SegmentIdWithShardSpec allocatePendingSegment(
*/
SegmentPublishResult announceHistoricalSegments(
Set<DataSegment> segments,
Set<DataSegment> segmentsToDrop,
@Nullable DataSourceMetadata startMetadata,
@Nullable DataSourceMetadata endMetadata
) throws IOException;
Expand Down
Loading

0 comments on commit 9290605

Please sign in to comment.