diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 2373a59ec926..9aa94e9e4766 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -1306,7 +1306,7 @@ private void publishAllSegments(final Set segments) throws IOExcept } else { performSegmentPublish( context.taskActionClient(), - SegmentTransactionalInsertAction.overwriteAction(null, null, segmentsWithTombstones) + SegmentTransactionalInsertAction.overwriteAction(null, segmentsWithTombstones) ); } } else if (!segments.isEmpty()) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java index a0567dce04bf..cd11e0befdca 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java @@ -66,12 +66,6 @@ public class SegmentTransactionalInsertAction implements TaskAction segments; - /** - * Set of segments to be dropped (mark unused) when new segments, {@link SegmentTransactionalInsertAction#segments}, - * are inserted into metadata storage. - */ - @Nullable - private final Set segmentsToBeDropped; @Nullable private final DataSourceMetadata startMetadata; @@ -82,11 +76,10 @@ public class SegmentTransactionalInsertAction implements TaskAction segmentsToBeOverwritten, - @Nullable Set segmentsToBeDropped, Set segmentsToPublish ) { - return new SegmentTransactionalInsertAction(segmentsToBeOverwritten, segmentsToBeDropped, segmentsToPublish, null, null, null); + return new SegmentTransactionalInsertAction(segmentsToBeOverwritten, segmentsToPublish, null, null, null); } public static SegmentTransactionalInsertAction appendAction( @@ -95,7 +88,7 @@ public static SegmentTransactionalInsertAction appendAction( @Nullable DataSourceMetadata endMetadata ) { - return new SegmentTransactionalInsertAction(null, null, segments, startMetadata, endMetadata, null); + return new SegmentTransactionalInsertAction(null, segments, startMetadata, endMetadata, null); } public static SegmentTransactionalInsertAction commitMetadataOnlyAction( @@ -104,13 +97,12 @@ public static SegmentTransactionalInsertAction commitMetadataOnlyAction( DataSourceMetadata endMetadata ) { - return new SegmentTransactionalInsertAction(null, null, null, startMetadata, endMetadata, dataSource); + return new SegmentTransactionalInsertAction(null, null, startMetadata, endMetadata, dataSource); } @JsonCreator private SegmentTransactionalInsertAction( @JsonProperty("segmentsToBeOverwritten") @Nullable Set segmentsToBeOverwritten, - @JsonProperty("segmentsToBeDropped") @Nullable Set segmentsToBeDropped, @JsonProperty("segments") @Nullable Set segments, @JsonProperty("startMetadata") @Nullable DataSourceMetadata startMetadata, @JsonProperty("endMetadata") @Nullable DataSourceMetadata endMetadata, @@ -118,7 +110,6 @@ private SegmentTransactionalInsertAction( ) { this.segmentsToBeOverwritten = segmentsToBeOverwritten; - this.segmentsToBeDropped = segmentsToBeDropped; this.segments = segments == null ? ImmutableSet.of() : ImmutableSet.copyOf(segments); this.startMetadata = startMetadata; this.endMetadata = endMetadata; @@ -132,13 +123,6 @@ public Set getSegmentsToBeOverwritten() return segmentsToBeOverwritten; } - @JsonProperty - @Nullable - public Set getSegmentsToBeDropped() - { - return segmentsToBeDropped; - } - @JsonProperty public Set getSegments() { @@ -202,9 +186,6 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) if (segmentsToBeOverwritten != null) { allSegments.addAll(segmentsToBeOverwritten); } - if (segmentsToBeDropped != null) { - allSegments.addAll(segmentsToBeDropped); - } TaskLocks.checkLockCoversSegments(task, toolbox.getTaskLockbox(), allSegments); @@ -224,7 +205,6 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) .onValidLocks( () -> toolbox.getIndexerMetadataStorageCoordinator().announceHistoricalSegments( segments, - segmentsToBeDropped, startMetadata, endMetadata ) @@ -359,7 +339,6 @@ public String toString() ", startMetadata=" + startMetadata + ", endMetadata=" + endMetadata + ", dataSource='" + dataSource + '\'' + - ", segmentsToBeDropped=" + SegmentUtils.commaSeparatedIdentifiers(segmentsToBeDropped) + '}'; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index 45746aa63604..0d12f2ba2e9b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -350,19 +350,13 @@ public TaskStatus runTask(final TaskToolbox toolbox) int sequenceNumber = 0; String sequenceName = makeSequenceName(getId(), sequenceNumber); - final TransactionalSegmentPublisher publisher = (mustBeNullOrEmptyOverwriteSegments, mustBeNullOrEmptyDropSegments, segments, commitMetadata) -> { + final TransactionalSegmentPublisher publisher = (mustBeNullOrEmptyOverwriteSegments, segments, commitMetadata) -> { if (mustBeNullOrEmptyOverwriteSegments != null && !mustBeNullOrEmptyOverwriteSegments.isEmpty()) { throw new ISE( "Stream ingestion task unexpectedly attempted to overwrite segments: %s", SegmentUtils.commaSeparatedIdentifiers(mustBeNullOrEmptyOverwriteSegments) ); } - if (mustBeNullOrEmptyDropSegments != null && !mustBeNullOrEmptyDropSegments.isEmpty()) { - throw new ISE( - "Stream ingestion task unexpectedly attempted to drop segments: %s", - SegmentUtils.commaSeparatedIdentifiers(mustBeNullOrEmptyDropSegments) - ); - } final SegmentTransactionalInsertAction action = SegmentTransactionalInsertAction.appendAction( segments, null, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 3a4b574d4e6c..4642b4391d96 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -912,9 +912,9 @@ private TaskStatus generateAndPublishSegments( } - final TransactionalSegmentPublisher publisher = (segmentsToBeOverwritten, segmentsToDrop, segmentsToPublish, commitMetadata) -> + final TransactionalSegmentPublisher publisher = (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) -> toolbox.getTaskActionClient() - .submit(SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToDrop, segmentsToPublish)); + .submit(SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToPublish)); String effectiveId = getContextValue(CompactionTask.CTX_KEY_APPENDERATOR_TRACKING_TASK_ID, null); if (effectiveId == null) { @@ -996,7 +996,6 @@ private TaskStatus generateAndPublishSegments( final SegmentsAndCommitMetadata published = awaitPublish(driver.publishAll( inputSegments, - null, tombStones, publisher, annotateFunction diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index bfe69d6ee0aa..f5653c1c86bb 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -1170,16 +1170,13 @@ private void publishSegments( } } - final TransactionalSegmentPublisher publisher = (segmentsToBeOverwritten, segmentsToDrop, segmentsToPublish, commitMetadata) -> + final TransactionalSegmentPublisher publisher = (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) -> toolbox.getTaskActionClient().submit( - SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToDrop, segmentsToPublish) + SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToPublish) ); final boolean published = newSegments.isEmpty() - || publisher.publishSegments(oldSegments, - Collections.emptySet(), - newSegments, annotateFunction, - null).isSuccess(); + || publisher.publishSegments(oldSegments, newSegments, annotateFunction, null).isSuccess(); if (published) { LOG.info("Published [%d] segments", newSegments.size()); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java index 6a786523d4b4..161a36de2fd0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java @@ -337,7 +337,6 @@ public SequenceMetadataTransactionalSegmentPublisher( @Override public SegmentPublishResult publishAnnotatedSegments( @Nullable Set mustBeNullOrEmptyOverwriteSegments, - @Nullable Set mustBeNullOrEmptyDropSegments, Set segmentsToPush, @Nullable Object commitMetadata ) throws IOException @@ -348,13 +347,7 @@ public SegmentPublishResult publishAnnotatedSegments( SegmentUtils.commaSeparatedIdentifiers(mustBeNullOrEmptyOverwriteSegments) ); } - if (mustBeNullOrEmptyDropSegments != null && !mustBeNullOrEmptyDropSegments.isEmpty()) { - throw new ISE( - "Stream ingestion task unexpectedly attempted to drop segments: %s", - SegmentUtils.commaSeparatedIdentifiers(mustBeNullOrEmptyDropSegments) - ); - } - final Map commitMetaMap = (Map) Preconditions.checkNotNull(commitMetadata, "commitMetadata"); + final Map commitMetaMap = (Map) Preconditions.checkNotNull(commitMetadata, "commitMetadata"); final SeekableStreamEndSequenceNumbers finalPartitions = runner.deserializePartitionsFromMetadata( toolbox.getJsonMapper(), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java index 0243a0d20d00..b8bdbfb2add3 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java @@ -134,39 +134,6 @@ public void testTransactionalUpdateDataSourceMetadata() throws Exception ); } - @Test - public void testTransactionalDropSegments() throws Exception - { - final Task task = NoopTask.create(); - actionTestKit.getTaskLockbox().add(task); - acquireTimeChunkLock(TaskLockType.EXCLUSIVE, task, INTERVAL, 5000); - - SegmentPublishResult result1 = SegmentTransactionalInsertAction.overwriteAction( - null, - null, - ImmutableSet.of(SEGMENT1) - ).perform( - task, - actionTestKit.getTaskActionToolbox() - ); - Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(SEGMENT1)), result1); - - SegmentPublishResult result2 = SegmentTransactionalInsertAction.overwriteAction( - null, - ImmutableSet.of(SEGMENT1), - ImmutableSet.of(SEGMENT2) - ).perform( - task, - actionTestKit.getTaskActionToolbox() - ); - Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(SEGMENT2)), result2); - - Assertions.assertThat( - actionTestKit.getMetadataStorageCoordinator() - .retrieveUsedSegmentsForInterval(DATA_SOURCE, INTERVAL, Segments.ONLY_VISIBLE) - ).containsExactlyInAnyOrder(SEGMENT2); - } - @Test public void testFailTransactionalUpdateDataSourceMetadata() throws Exception { @@ -193,38 +160,11 @@ public void testFailTransactionalUpdateDataSourceMetadata() throws Exception ); } - @Test - public void testFailTransactionalDropSegment() throws Exception - { - final Task task = NoopTask.create(); - actionTestKit.getTaskLockbox().add(task); - acquireTimeChunkLock(TaskLockType.EXCLUSIVE, task, INTERVAL, 5000); - - SegmentPublishResult result = SegmentTransactionalInsertAction.overwriteAction( - null, - // SEGMENT1 does not exist, hence will fail to drop - ImmutableSet.of(SEGMENT1), - ImmutableSet.of(SEGMENT2) - ).perform( - task, - actionTestKit.getTaskActionToolbox() - ); - - Assert.assertEquals( - SegmentPublishResult.fail( - "org.apache.druid.metadata.RetryTransactionException: " + - "Failed to drop some segments. Only 0 could be dropped out of 1. Trying again" - ), - result - ); - } - @Test public void testFailBadVersion() throws Exception { final Task task = NoopTask.create(); final SegmentTransactionalInsertAction action = SegmentTransactionalInsertAction.overwriteAction( - null, null, ImmutableSet.of(SEGMENT3) ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java index 7ea82b8f57b0..049bc11e0fdb 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java @@ -1525,16 +1525,15 @@ public Set announceHistoricalSegments(Set segments) th @Override public SegmentPublishResult announceHistoricalSegments( Set segments, - Set segmentsToDrop, DataSourceMetadata startMetadata, DataSourceMetadata endMetadata ) throws IOException { - SegmentPublishResult result = super.announceHistoricalSegments(segments, segmentsToDrop, startMetadata, endMetadata); + SegmentPublishResult result = super.announceHistoricalSegments(segments, startMetadata, endMetadata); - Assert.assertFalse( + Assert.assertNotNull( "Segment latch not initialized, did you forget to call expectPublishSegments?", - segmentLatch == null + segmentLatch ); publishedSegments.addAll(result.getSegments()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SequenceMetadataTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SequenceMetadataTest.java index 9f03d5f2e8a0..aae07194bb94 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SequenceMetadataTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SequenceMetadataTest.java @@ -85,37 +85,7 @@ public void testPublishAnnotatedSegmentsThrowExceptionIfOverwriteSegmentsNotNull "Stream ingestion task unexpectedly attempted to overwrite segments: " + SegmentUtils.commaSeparatedIdentifiers(notNullNotEmptySegment) ); - transactionalSegmentPublisher.publishAnnotatedSegments(notNullNotEmptySegment, null, ImmutableSet.of(), null); - } - - @Test - public void testPublishAnnotatedSegmentsThrowExceptionIfDropSegmentsNotNullAndNotEmpty() throws Exception - { - DataSegment dataSegment = DataSegment.builder() - .dataSource("foo") - .interval(Intervals.of("2001/P1D")) - .shardSpec(new LinearShardSpec(1)) - .version("b") - .size(0) - .build(); - - Set notNullNotEmptySegment = ImmutableSet.of(dataSegment); - SequenceMetadata sequenceMetadata = new SequenceMetadata<>( - 1, - "test", - ImmutableMap.of(), - ImmutableMap.of(), - true, - ImmutableSet.of() - ); - TransactionalSegmentPublisher transactionalSegmentPublisher = sequenceMetadata.createPublisher(mockSeekableStreamIndexTaskRunner, mockTaskToolbox, true); - - expectedException.expect(ISE.class); - expectedException.expectMessage( - "Stream ingestion task unexpectedly attempted to drop segments: " + SegmentUtils.commaSeparatedIdentifiers(notNullNotEmptySegment) - ); - - transactionalSegmentPublisher.publishAnnotatedSegments(null, notNullNotEmptySegment, ImmutableSet.of(), null); + transactionalSegmentPublisher.publishAnnotatedSegments(notNullNotEmptySegment, ImmutableSet.of(), null); } @Test @@ -143,6 +113,6 @@ public void testPublishAnnotatedSegmentsSucceedIfDropSegmentsAndOverwriteSegment ); TransactionalSegmentPublisher transactionalSegmentPublisher = sequenceMetadata.createPublisher(mockSeekableStreamIndexTaskRunner, mockTaskToolbox, false); - transactionalSegmentPublisher.publishAnnotatedSegments(null, null, notNullNotEmptySegment, ImmutableMap.of()); + transactionalSegmentPublisher.publishAnnotatedSegments(null, notNullNotEmptySegment, ImmutableMap.of()); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index 0606889b7ff5..41a688fc9fae 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -159,7 +159,6 @@ public Map allocatePendingSegments @Override public SegmentPublishResult announceHistoricalSegments( Set segments, - Set segmentsToDrop, DataSourceMetadata oldCommitMetadata, DataSourceMetadata newCommitMetadata ) diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/DataSourceMetadata.java b/server/src/main/java/org/apache/druid/indexing/overlord/DataSourceMetadata.java index 6cad3c12649b..23579a0c8a4c 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/DataSourceMetadata.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/DataSourceMetadata.java @@ -26,7 +26,7 @@ /** * Commit metadata for a dataSource. Used by - * {@link IndexerMetadataStorageCoordinator#announceHistoricalSegments(Set, Set, DataSourceMetadata, DataSourceMetadata)} + * {@link IndexerMetadataStorageCoordinator#announceHistoricalSegments(Set, DataSourceMetadata, DataSourceMetadata)} * to provide metadata transactions for segment inserts. * * Two metadata instances can be added together, and any conflicts are resolved in favor of the right-hand side. diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index 589b60f027f3..a954c1f2d670 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -257,7 +257,6 @@ SegmentIdWithShardSpec allocatePendingSegment( * {@param segments} and dropping {@param segmentsToDrop} * * @param segments set of segments to add, must all be from the same dataSource - * @param segmentsToDrop set of segments to drop, must all be from the same dataSource * @param startMetadata dataSource metadata pre-insert must match this startMetadata according to * {@link DataSourceMetadata#matches(DataSourceMetadata)}. If null, this insert will * not involve a metadata transaction @@ -274,7 +273,6 @@ SegmentIdWithShardSpec allocatePendingSegment( */ SegmentPublishResult announceHistoricalSegments( Set segments, - Set segmentsToDrop, @Nullable DataSourceMetadata startMetadata, @Nullable DataSourceMetadata endMetadata ) throws IOException; diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 3773aa466d6c..25a96a307655 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -303,7 +303,7 @@ private Collection retrieveAllUsedSegmentsForIntervalsWithHandle( @Override public Set announceHistoricalSegments(final Set segments) throws IOException { - final SegmentPublishResult result = announceHistoricalSegments(segments, null, null, null); + final SegmentPublishResult result = announceHistoricalSegments(segments, null, null); // Metadata transaction cannot fail because we are not trying to do one. if (!result.isSuccess()) { @@ -316,7 +316,6 @@ public Set announceHistoricalSegments(final Set segmen @Override public SegmentPublishResult announceHistoricalSegments( final Set segments, - final Set segmentsToDrop, @Nullable final DataSourceMetadata startMetadata, @Nullable final DataSourceMetadata endMetadata ) throws IOException @@ -382,27 +381,7 @@ public SegmentPublishResult inTransaction( } } - if (segmentsToDrop != null && !segmentsToDrop.isEmpty()) { - final DataStoreMetadataUpdateResult result = dropSegmentsWithHandle( - handle, - segmentsToDrop, - dataSource - ); - if (result.isFailed()) { - // Metadata store was definitely not updated. - transactionStatus.setRollbackOnly(); - definitelyNotUpdated.set(true); - - if (result.canRetry()) { - throw new RetryTransactionException(result.getErrorMsg()); - } else { - throw new RuntimeException(result.getErrorMsg()); - } - } - } - final Set inserted = announceHistoricalSegmentBatch(handle, segments, usedSegments); - return SegmentPublishResult.ok(ImmutableSet.copyOf(inserted)); } }, @@ -1533,7 +1512,7 @@ private Set segmentExistsBatch(final Handle handle, final Set segmentsToDrop, - final String dataSource - ) - { - Preconditions.checkNotNull(dataSource, "dataSource"); - Preconditions.checkNotNull(segmentsToDrop, "segmentsToDrop"); - - if (segmentsToDrop.isEmpty()) { - return DataStoreMetadataUpdateResult.SUCCESS; - } - - if (segmentsToDrop.stream().anyMatch(segment -> !dataSource.equals(segment.getDataSource()))) { - // All segments to drop must belong to the same datasource - return new DataStoreMetadataUpdateResult( - true, - false, - "Not dropping segments, as not all segments belong to the datasource[%s].", - dataSource); - } - - final int numChangedSegments = - SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper).markSegments( - segmentsToDrop.stream().map(DataSegment::getId).collect(Collectors.toList()), - false - ); - - if (numChangedSegments != segmentsToDrop.size()) { - return new DataStoreMetadataUpdateResult( - true, - true, - "Failed to drop some segments. Only %d could be dropped out of %d. Trying again", - numChangedSegments, - segmentsToDrop.size() - ); - } - return DataStoreMetadataUpdateResult.SUCCESS; - } - @Override public boolean deleteDataSourceMetadata(final String dataSource) { diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java index 65a8a676d3d2..c050d9c5e49b 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java @@ -560,7 +560,6 @@ ListenableFuture dropInBackground(SegmentsAndCommitMe */ ListenableFuture publishInBackground( @Nullable Set segmentsToBeOverwritten, - @Nullable Set segmentsToBeDropped, @Nullable Set tombstones, SegmentsAndCommitMetadata segmentsAndCommitMetadata, TransactionalSegmentPublisher publisher, @@ -601,7 +600,6 @@ ListenableFuture publishInBackground( final ImmutableSet ourSegments = ImmutableSet.copyOf(pushedAndTombstones); final SegmentPublishResult publishResult = publisher.publishSegments( segmentsToBeOverwritten, - segmentsToBeDropped, ourSegments, outputSegmentsAnnotateFunction, callerMetadata diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java index 7a99f200bea6..cf2efef2f887 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java @@ -192,14 +192,12 @@ private SegmentsAndCommitMetadata pushAndClear( * Publish all segments. * * @param segmentsToBeOverwritten segments which can be overwritten by new segments published by the given publisher - * @param segmentsToBeDropped segments which will be dropped and marked unused * @param publisher segment publisher * * @return a {@link ListenableFuture} for the publish task */ public ListenableFuture publishAll( @Nullable final Set segmentsToBeOverwritten, - @Nullable final Set segmentsToBeDropped, @Nullable final Set tombstones, final TransactionalSegmentPublisher publisher, final Function, Set> outputSegmentsAnnotateFunction @@ -212,7 +210,6 @@ public ListenableFuture publishAll( return publishInBackground( segmentsToBeOverwritten, - segmentsToBeDropped, tombstones == null ? Collections.emptySet() : tombstones, new SegmentsAndCommitMetadata( snapshot diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java index 8c7ec417f8cd..f82266319672 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java @@ -282,7 +282,6 @@ public ListenableFuture publish( // version of a segment with the same identifier containing different data; see DataSegmentPusher.push() docs pushInBackground(wrapCommitter(committer), theSegments, true), (AsyncFunction) sam -> publishInBackground( - null, null, null, sam, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java index a71e4cdb6d36..2ffb4dd572a3 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java @@ -41,14 +41,12 @@ public interface TransactionalSegmentPublisher */ SegmentPublishResult publishAnnotatedSegments( @Nullable Set segmentsToBeOverwritten, - @Nullable Set segmentsToDrop, Set segmentsToPublish, @Nullable Object commitMetadata ) throws IOException; default SegmentPublishResult publishSegments( @Nullable Set segmentsToBeOverwritten, - @Nullable Set segmentsToDrop, Set segmentsToPublish, Function, Set> outputSegmentsAnnotateFunction, @Nullable Object commitMetadata @@ -58,7 +56,6 @@ default SegmentPublishResult publishSegments( .andThen(SegmentPublisherHelper::annotateShardSpec); return publishAnnotatedSegments( segmentsToBeOverwritten, - segmentsToDrop, annotateFunction.apply(segmentsToPublish), commitMetadata ); diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 9fadb8b106c8..68654c88550b 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -352,18 +352,6 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( return super.updateDataSourceMetadataWithHandle(handle, dataSource, startMetadata, endMetadata); } - @Override - protected DataStoreMetadataUpdateResult dropSegmentsWithHandle( - final Handle handle, - final Collection segmentsToDrop, - final String dataSource - ) - { - // Count number of times this method is called. - segmentTableDropUpdateCounter.getAndIncrement(); - return super.dropSegmentsWithHandle(handle, segmentsToDrop, dataSource); - } - @Override public int getSqlMetadataMaxRetry() { @@ -560,7 +548,6 @@ public void testTransactionalAnnounceSuccess() throws IOException // Insert first segment. final SegmentPublishResult result1 = coordinator.announceHistoricalSegments( ImmutableSet.of(defaultSegment), - ImmutableSet.of(), new ObjectMetadata(null), new ObjectMetadata(ImmutableMap.of("foo", "bar")) ); @@ -579,7 +566,6 @@ public void testTransactionalAnnounceSuccess() throws IOException // Insert second segment. final SegmentPublishResult result2 = coordinator.announceHistoricalSegments( ImmutableSet.of(defaultSegment2), - ImmutableSet.of(), new ObjectMetadata(ImmutableMap.of("foo", "bar")), new ObjectMetadata(ImmutableMap.of("foo", "baz")) ); @@ -636,7 +622,6 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( // Insert first segment. final SegmentPublishResult result1 = failOnceCoordinator.announceHistoricalSegments( ImmutableSet.of(defaultSegment), - ImmutableSet.of(), new ObjectMetadata(null), new ObjectMetadata(ImmutableMap.of("foo", "bar")) ); @@ -658,7 +643,6 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( // Insert second segment. final SegmentPublishResult result2 = failOnceCoordinator.announceHistoricalSegments( ImmutableSet.of(defaultSegment2), - ImmutableSet.of(), new ObjectMetadata(ImmutableMap.of("foo", "bar")), new ObjectMetadata(ImmutableMap.of("foo", "baz")) ); @@ -689,7 +673,6 @@ public void testTransactionalAnnounceFailDbNullWantNotNull() throws IOException { final SegmentPublishResult result1 = coordinator.announceHistoricalSegments( ImmutableSet.of(defaultSegment), - ImmutableSet.of(), new ObjectMetadata(ImmutableMap.of("foo", "bar")), new ObjectMetadata(ImmutableMap.of("foo", "baz")) ); @@ -702,116 +685,11 @@ public void testTransactionalAnnounceFailDbNullWantNotNull() throws IOException Assert.assertEquals(1, metadataUpdateCounter.get()); } - @Test - public void testTransactionalAnnounceFailSegmentDropFailWithoutRetry() throws IOException - { - insertUsedSegments(ImmutableSet.of(existingSegment1, existingSegment2)); - - Assert.assertEquals( - ImmutableList.of(existingSegment1.getId().toString(), existingSegment2.getId().toString()), - retrieveUsedSegmentIds() - ); - - DataSegment dataSegmentBar = DataSegment.builder() - .dataSource("bar") - .interval(Intervals.of("2001/P1D")) - .shardSpec(new LinearShardSpec(1)) - .version("b") - .size(0) - .build(); - Set dropSegments = ImmutableSet.of(existingSegment1, existingSegment2, dataSegmentBar); - - final SegmentPublishResult result1 = coordinator.announceHistoricalSegments( - SEGMENTS, - dropSegments, - null, - null - ); - Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Not dropping segments, " + - "as not all segments belong to the datasource[fooDataSource]."), result1); - - // Should only be tried once. Since dropSegmentsWithHandle will return FAILURE (not TRY_AGAIN) as set of - // segments to drop contains more than one datasource. - Assert.assertEquals(1, segmentTableDropUpdateCounter.get()); - - Assert.assertEquals( - ImmutableList.of(existingSegment1.getId().toString(), existingSegment2.getId().toString()), - retrieveUsedSegmentIds() - ); - } - - @Test - public void testTransactionalAnnounceSucceedWithSegmentDrop() throws IOException - { - insertUsedSegments(ImmutableSet.of(existingSegment1, existingSegment2)); - - Assert.assertEquals( - ImmutableList.of(existingSegment1.getId().toString(), existingSegment2.getId().toString()), - retrieveUsedSegmentIds() - ); - - final SegmentPublishResult result1 = coordinator.announceHistoricalSegments( - SEGMENTS, - ImmutableSet.of(existingSegment1, existingSegment2), - null, - null - ); - - Assert.assertEquals(SegmentPublishResult.ok(SEGMENTS), result1); - - for (DataSegment segment : SEGMENTS) { - Assert.assertArrayEquals( - mapper.writeValueAsString(segment).getBytes(StandardCharsets.UTF_8), - derbyConnector.lookup( - derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(), - "id", - "payload", - segment.getId().toString() - ) - ); - } - - Assert.assertEquals( - ImmutableList.of(defaultSegment.getId().toString(), defaultSegment2.getId().toString()), - retrieveUsedSegmentIds() - ); - } - - @Test - public void testTransactionalAnnounceFailSegmentDropFailWithRetry() throws IOException - { - insertUsedSegments(ImmutableSet.of(existingSegment1, existingSegment2)); - - Assert.assertEquals( - ImmutableList.of(existingSegment1.getId().toString(), existingSegment2.getId().toString()), - retrieveUsedSegmentIds() - ); - - Set dropSegments = ImmutableSet.of(existingSegment1, defaultSegment4); - final SegmentPublishResult result1 = coordinator.announceHistoricalSegments( - SEGMENTS, - dropSegments, - null, - null - ); - Assert.assertEquals(SegmentPublishResult.fail( - "org.apache.druid.metadata.RetryTransactionException: Failed to drop some segments. " + - "Only 1 could be dropped out of 2. Trying again"), result1); - - Assert.assertEquals(MAX_SQL_MEATADATA_RETRY_FOR_TEST, segmentTableDropUpdateCounter.get()); - - Assert.assertEquals( - ImmutableList.of(existingSegment1.getId().toString(), existingSegment2.getId().toString()), - retrieveUsedSegmentIds() - ); - } - @Test public void testTransactionalAnnounceFailDbNotNullWantNull() throws IOException { final SegmentPublishResult result1 = coordinator.announceHistoricalSegments( ImmutableSet.of(defaultSegment), - ImmutableSet.of(), new ObjectMetadata(null), new ObjectMetadata(ImmutableMap.of("foo", "baz")) ); @@ -819,7 +697,6 @@ public void testTransactionalAnnounceFailDbNotNullWantNull() throws IOException final SegmentPublishResult result2 = coordinator.announceHistoricalSegments( ImmutableSet.of(defaultSegment2), - ImmutableSet.of(), new ObjectMetadata(null), new ObjectMetadata(ImmutableMap.of("foo", "baz")) ); @@ -837,7 +714,6 @@ public void testTransactionalAnnounceFailDbNotNullWantDifferent() throws IOExcep { final SegmentPublishResult result1 = coordinator.announceHistoricalSegments( ImmutableSet.of(defaultSegment), - ImmutableSet.of(), new ObjectMetadata(null), new ObjectMetadata(ImmutableMap.of("foo", "baz")) ); @@ -845,7 +721,6 @@ public void testTransactionalAnnounceFailDbNotNullWantDifferent() throws IOExcep final SegmentPublishResult result2 = coordinator.announceHistoricalSegments( ImmutableSet.of(defaultSegment2), - ImmutableSet.of(), new ObjectMetadata(ImmutableMap.of("foo", "qux")), new ObjectMetadata(ImmutableMap.of("foo", "baz")) ); @@ -1393,7 +1268,6 @@ public void testDeleteDataSourceMetadata() throws IOException { coordinator.announceHistoricalSegments( ImmutableSet.of(defaultSegment), - ImmutableSet.of(), new ObjectMetadata(null), new ObjectMetadata(ImmutableMap.of("foo", "bar")) ); @@ -2304,52 +2178,11 @@ public void testAddNumberedShardSpecAfterSingleDimensionsShardSpecWithUnknownCor Assert.assertNull(id); } - @Test - public void testDropSegmentsWithHandleForSegmentThatExist() - { - try (Handle handle = derbyConnector.getDBI().open()) { - Assert.assertTrue(insertUsedSegments(ImmutableSet.of(defaultSegment))); - List usedSegments = retrieveUsedSegmentIds(); - Assert.assertEquals(1, usedSegments.size()); - Assert.assertEquals(defaultSegment.getId().toString(), usedSegments.get(0)); - - // Try drop segment - IndexerSQLMetadataStorageCoordinator.DataStoreMetadataUpdateResult result = coordinator.dropSegmentsWithHandle( - handle, - ImmutableSet.of(defaultSegment), - defaultSegment.getDataSource() - ); - - Assert.assertEquals(IndexerSQLMetadataStorageCoordinator.DataStoreMetadataUpdateResult.SUCCESS, result); - usedSegments = retrieveUsedSegmentIds(); - Assert.assertEquals(0, usedSegments.size()); - } - } - - @Test - public void testDropSegmentsWithHandleForSegmentThatDoesNotExist() - { - try (Handle handle = derbyConnector.getDBI().open()) { - // Try drop segment - IndexerSQLMetadataStorageCoordinator.DataStoreMetadataUpdateResult result = coordinator.dropSegmentsWithHandle( - handle, - ImmutableSet.of(defaultSegment), - defaultSegment.getDataSource() - ); - Assert.assertEquals(new IndexerSQLMetadataStorageCoordinator.DataStoreMetadataUpdateResult( - true, - true, - "Failed to drop some segments. Only 0 could be dropped out of 1. Trying again"), - result); - } - } - @Test public void testRemoveDataSourceMetadataOlderThanDatasourceActiveShouldNotBeDeleted() throws Exception { coordinator.announceHistoricalSegments( ImmutableSet.of(defaultSegment), - ImmutableSet.of(), new ObjectMetadata(null), new ObjectMetadata(ImmutableMap.of("foo", "bar")) ); @@ -2378,7 +2211,6 @@ public void testRemoveDataSourceMetadataOlderThanDatasourceNotActiveAndOlderThan { coordinator.announceHistoricalSegments( ImmutableSet.of(defaultSegment), - ImmutableSet.of(), new ObjectMetadata(null), new ObjectMetadata(ImmutableMap.of("foo", "bar")) ); @@ -2404,7 +2236,6 @@ public void testRemoveDataSourceMetadataOlderThanDatasourceNotActiveButNotOlderT { coordinator.announceHistoricalSegments( ImmutableSet.of(defaultSegment), - ImmutableSet.of(), new ObjectMetadata(null), new ObjectMetadata(ImmutableMap.of("foo", "bar")) ); diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/ClosedSegmentsSinksBatchAppenderatorDriverTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/ClosedSegmentsSinksBatchAppenderatorDriverTest.java index a2f24ebd7eb1..0dcb987c59ba 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/ClosedSegmentsSinksBatchAppenderatorDriverTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/ClosedSegmentsSinksBatchAppenderatorDriverTest.java @@ -127,7 +127,7 @@ public void testSimple() throws Exception checkSegmentStates(2, SegmentState.PUSHED_AND_DROPPED); final SegmentsAndCommitMetadata published = - driver.publishAll(null, null, Collections.emptySet(), makeOkPublisher(), Function.identity()) + driver.publishAll(null, Collections.emptySet(), makeOkPublisher(), Function.identity()) .get(TIMEOUT, TimeUnit.MILLISECONDS); Assert.assertEquals( @@ -162,7 +162,7 @@ public void testIncrementalPush() throws Exception } final SegmentsAndCommitMetadata published = - driver.publishAll(null, null, Collections.emptySet(), makeOkPublisher(), Function.identity()) + driver.publishAll(null, Collections.emptySet(), makeOkPublisher(), Function.identity()) .get(TIMEOUT, TimeUnit.MILLISECONDS); Assert.assertEquals( @@ -204,7 +204,7 @@ private void checkSegmentStates(int expectedNumSegmentsInState, SegmentState exp static TransactionalSegmentPublisher makeOkPublisher() { - return (segmentsToBeOverwritten, segmentsToBeDropped, segmentsToPublish, commitMetadata) -> SegmentPublishResult.ok(ImmutableSet.of()); + return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) -> SegmentPublishResult.ok(ImmutableSet.of()); } static class TestSegmentAllocator implements SegmentAllocator diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/OpenAndClosedSegmentsBatchAppenderatorDriverTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/OpenAndClosedSegmentsBatchAppenderatorDriverTest.java index 1fb6c7909615..9657521a5261 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/OpenAndClosedSegmentsBatchAppenderatorDriverTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/OpenAndClosedSegmentsBatchAppenderatorDriverTest.java @@ -125,7 +125,7 @@ public void testSimple() throws Exception checkSegmentStates(2, SegmentState.PUSHED_AND_DROPPED); final SegmentsAndCommitMetadata published = - driver.publishAll(null, null, null, makeOkPublisher(), Function.identity()).get(TIMEOUT, TimeUnit.MILLISECONDS); + driver.publishAll(null, null, makeOkPublisher(), Function.identity()).get(TIMEOUT, TimeUnit.MILLISECONDS); Assert.assertEquals( ImmutableSet.of( @@ -159,7 +159,7 @@ public void testIncrementalPush() throws Exception } final SegmentsAndCommitMetadata published = - driver.publishAll(null, null, null, makeOkPublisher(), Function.identity()).get(TIMEOUT, TimeUnit.MILLISECONDS); + driver.publishAll(null, null, makeOkPublisher(), Function.identity()).get(TIMEOUT, TimeUnit.MILLISECONDS); Assert.assertEquals( ImmutableSet.of( @@ -200,6 +200,6 @@ private void checkSegmentStates(int expectedNumSegmentsInState, SegmentState exp static TransactionalSegmentPublisher makeOkPublisher() { - return (segmentsToBeOverwritten, segmentsToBeDropped, segmentsToPublish, commitMetadata) -> SegmentPublishResult.ok(ImmutableSet.of()); + return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) -> SegmentPublishResult.ok(ImmutableSet.of()); } } diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java index 476ecded7eaa..8f2b77c4fc0f 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java @@ -375,13 +375,13 @@ private Set asIdentifiers(Iterable segments static TransactionalSegmentPublisher makeOkPublisher() { - return (segmentsToBeOverwritten, segmentsToBeDropped, segmentsToPublish, commitMetadata) -> + return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) -> SegmentPublishResult.ok(Collections.emptySet()); } static TransactionalSegmentPublisher makeFailingPublisher(boolean failWithException) { - return (segmentsToBeOverwritten, segmentsToBeDropped, segmentsToPublish, commitMetadata) -> { + return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) -> { final RuntimeException exception = new RuntimeException("test"); if (failWithException) { throw exception;