From f93fbd3e42934dbcbe4bf11d3c3634611ae49ab0 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 5 Oct 2023 19:09:50 +0530 Subject: [PATCH 1/7] Add CommitRealtimeSegmentsAndMetadataAction --- ...mmitRealtimeSegmentsAndMetadataAction.java | 168 ++++++++++++++++++ .../SegmentTransactionalAppendAction.java | 16 +- .../SegmentTransactionalInsertAction.java | 44 +---- .../SegmentTransactionalReplaceAction.java | 20 +-- .../indexing/common/actions/TaskAction.java | 1 + .../AppenderatorDriverRealtimeIndexTask.java | 2 +- .../indexing/common/task/IndexTaskUtils.java | 49 +++-- .../SeekableStreamIndexTaskRunner.java | 18 +- .../seekablestream/SequenceMetadata.java | 42 +++-- .../task/concurrent/ActionsTestTask.java | 28 +++ .../ConcurrentReplaceAndAppendTest.java | 90 +++++++++- .../seekablestream/SequenceMetadataTest.java | 6 +- .../emitter/service/SegmentMetadataEvent.java | 13 ++ 13 files changed, 373 insertions(+), 124 deletions(-) create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CommitRealtimeSegmentsAndMetadataAction.java diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CommitRealtimeSegmentsAndMetadataAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CommitRealtimeSegmentsAndMetadataAction.java new file mode 100644 index 000000000000..67ad47080723 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CommitRealtimeSegmentsAndMetadataAction.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.actions; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.base.Preconditions; +import org.apache.druid.indexing.common.task.IndexTaskUtils; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.CriticalAction; +import org.apache.druid.indexing.overlord.DataSourceMetadata; +import org.apache.druid.indexing.overlord.SegmentPublishResult; +import org.apache.druid.segment.SegmentUtils; +import org.apache.druid.timeline.DataSegment; + +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Task action to commit realtime segments and metadata when using APPEND task locks. + *
+ * This action performs the following operations in a single transaction:
+ * <ul>
+ * <li>Commit the append segments</li>
+ * <li>Upgrade the append segments to all visible REPLACE versions</li>
+ * <li>Commit start and end {@link DataSourceMetadata}.</li>
+ * </ul>
+ * This action differs from {@link SegmentTransactionalInsertAction} as it is used + * only with APPEND locks and also upgrades segments as needed. + */ +public class CommitRealtimeSegmentsAndMetadataAction implements TaskAction +{ + /** + * Set of segments to be inserted into metadata storage + */ + private final Set segments; + + private final DataSourceMetadata startMetadata; + private final DataSourceMetadata endMetadata; + + public static CommitRealtimeSegmentsAndMetadataAction create( + Set segments, + DataSourceMetadata startMetadata, + DataSourceMetadata endMetadata + ) + { + return new CommitRealtimeSegmentsAndMetadataAction(segments, startMetadata, endMetadata); + } + + @JsonCreator + private CommitRealtimeSegmentsAndMetadataAction( + @JsonProperty("segments") Set segments, + @JsonProperty("startMetadata") DataSourceMetadata startMetadata, + @JsonProperty("endMetadata") DataSourceMetadata endMetadata + ) + { + Preconditions.checkArgument( + segments != null && !segments.isEmpty(), + "Segments to commit should not be empty" + ); + this.segments = segments; + this.startMetadata = startMetadata; + this.endMetadata = endMetadata; + } + + @JsonProperty + public Set getSegments() + { + return segments; + } + + @JsonProperty + public DataSourceMetadata getStartMetadata() + { + return startMetadata; + } + + @JsonProperty + public DataSourceMetadata getEndMetadata() + { + return endMetadata; + } + + @Override + public TypeReference getReturnTypeReference() + { + return new TypeReference() + { + }; + } + + /** + * Performs some sanity checks and publishes the given segments. + */ + @Override + public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) + { + final SegmentPublishResult publishResult; + + TaskLocks.checkLockCoversSegments(task, toolbox.getTaskLockbox(), segments); + + try { + publishResult = toolbox.getTaskLockbox().doInCriticalSection( + task, + segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()), + CriticalAction.builder() + .onValidLocks( + // TODO: this might need to call a new method which does the following in the same transaction + // - commit append segments + // - upgrade append segments to replace versions + // - commit metadata + () -> toolbox.getIndexerMetadataStorageCoordinator().commitSegmentsAndMetadata( + segments, + startMetadata, + endMetadata + ) + ) + .onInvalidLocks( + () -> SegmentPublishResult.fail( + "Invalid task locks. Maybe they are revoked by a higher priority task." + + " Please check the overlord log for details." 
+ ) + ) + .build() + ); + } + catch (Exception e) { + throw new RuntimeException(e); + } + + IndexTaskUtils.emitSegmentPublishMetrics(publishResult, task, toolbox); + return publishResult; + } + + @Override + public boolean isAudited() + { + return true; + } + + @Override + public String toString() + { + return "CommitRealtimeSegmentsAndMetadataAction{" + + ", segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) + + ", startMetadata=" + startMetadata + + ", endMetadata=" + endMetadata + + '}'; + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java index 171c4f6640f3..36b69270f991 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java @@ -26,7 +26,6 @@ import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.CriticalAction; import org.apache.druid.indexing.overlord.SegmentPublishResult; -import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.metadata.ReplaceTaskLock; import org.apache.druid.segment.SegmentUtils; import org.apache.druid.timeline.DataSegment; @@ -107,20 +106,7 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) throw new RuntimeException(e); } - // Emit metrics - final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder(); - IndexTaskUtils.setTaskDimensions(metricBuilder, task); - - if (retVal.isSuccess()) { - toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/success", 1)); - for (DataSegment segment : retVal.getSegments()) { - IndexTaskUtils.setSegmentDimensions(metricBuilder, segment); - toolbox.getEmitter().emit(metricBuilder.setMetric("segment/added/bytes", segment.getSize())); - } - } else { - toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/failure", 1)); - } - + IndexTaskUtils.emitSegmentPublishMetrics(retVal, task, toolbox); return retVal; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java index 9b23db71d464..5a9ca0cacdfe 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java @@ -33,13 +33,8 @@ import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent; -import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; -import org.apache.druid.query.DruidMetrics; import org.apache.druid.segment.SegmentUtils; import org.apache.druid.timeline.DataSegment; -import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -222,47 +217,10 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) throw new RuntimeException(e); } - // Emit metrics - final ServiceMetricEvent.Builder metricBuilder = new 
ServiceMetricEvent.Builder(); - IndexTaskUtils.setTaskDimensions(metricBuilder, task); - - if (retVal.isSuccess()) { - toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/success", 1)); - } else { - toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/failure", 1)); - } - - // getSegments() should return an empty set if announceHistoricalSegments() failed - for (DataSegment segment : retVal.getSegments()) { - metricBuilder.setDimension(DruidMetrics.INTERVAL, segment.getInterval().toString()); - metricBuilder.setDimension( - DruidMetrics.PARTITIONING_TYPE, - segment.getShardSpec() == null ? null : segment.getShardSpec().getType() - ); - toolbox.getEmitter().emit(metricBuilder.setMetric("segment/added/bytes", segment.getSize())); - // Emit the segment related metadata using the configured emitters. - // There is a possibility that some segments' metadata event might get missed if the - // server crashes after commiting segment but before emitting the event. - this.emitSegmentMetadata(segment, toolbox); - } - + IndexTaskUtils.emitSegmentPublishMetrics(retVal, task, toolbox); return retVal; } - private void emitSegmentMetadata(DataSegment segment, TaskActionToolbox toolbox) - { - SegmentMetadataEvent event = new SegmentMetadataEvent( - segment.getDataSource(), - DateTime.now(DateTimeZone.UTC), - segment.getInterval().getStart(), - segment.getInterval().getEnd(), - segment.getVersion(), - segment.getLastCompactionState() != null - ); - - toolbox.getEmitter().emit(event); - } - private void checkWithSegmentLock() { final Map> oldSegmentsMap = groupSegmentsByIntervalAndSort(segmentsToBeOverwritten); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java index 5a1228e1dd1d..119a59ea648a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java @@ -27,9 +27,7 @@ import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.CriticalAction; import org.apache.druid.indexing.overlord.SegmentPublishResult; -import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.metadata.ReplaceTaskLock; -import org.apache.druid.query.DruidMetrics; import org.apache.druid.segment.SegmentUtils; import org.apache.druid.timeline.DataSegment; @@ -111,23 +109,7 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) throw new RuntimeException(e); } - // Emit metrics - final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder(); - IndexTaskUtils.setTaskDimensions(metricBuilder, task); - - if (retVal.isSuccess()) { - toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/success", 1)); - - for (DataSegment segment : retVal.getSegments()) { - final String partitionType = segment.getShardSpec() == null ? 
null : segment.getShardSpec().getType(); - metricBuilder.setDimension(DruidMetrics.PARTITIONING_TYPE, partitionType); - metricBuilder.setDimension(DruidMetrics.INTERVAL, segment.getInterval().toString()); - toolbox.getEmitter().emit(metricBuilder.setMetric("segment/added/bytes", segment.getSize())); - } - } else { - toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/failure", 1)); - } - + IndexTaskUtils.emitSegmentPublishMetrics(retVal, task, toolbox); return retVal; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java index 171d53b9cdd6..0948c036ea73 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java @@ -38,6 +38,7 @@ @JsonSubTypes.Type(name = "segmentTransactionalInsert", value = SegmentTransactionalInsertAction.class), @JsonSubTypes.Type(name = "segmentTransactionalAppend", value = SegmentTransactionalAppendAction.class), @JsonSubTypes.Type(name = "segmentTransactionalReplace", value = SegmentTransactionalReplaceAction.class), + @JsonSubTypes.Type(name = "commitRealtimeSegments", value = CommitRealtimeSegmentsAndMetadataAction.class), // Type name doesn't correspond to the name of the class for backward compatibility. @JsonSubTypes.Type(name = "segmentListUsed", value = RetrieveUsedSegmentsAction.class), // Type name doesn't correspond to the name of the class for backward compatibility. diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index dfa1f85fde72..8293c51a768f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -696,7 +696,7 @@ private void publishSegments( ); pendingHandoffs.add(Futures.transformAsync( publishFuture, - (AsyncFunction) driver::registerHandoff, + driver::registerHandoff, MoreExecutors.directExecutor() )); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java index 20f7584c8eb2..79a3e8993a8c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java @@ -20,8 +20,10 @@ package org.apache.druid.indexing.common.task; import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.common.actions.TaskActionToolbox; +import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.java.util.common.DateTimes; -import org.apache.druid.java.util.common.parsers.ParseException; +import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.query.DruidMetrics; import org.apache.druid.segment.incremental.ParseExceptionReport; @@ -35,7 +37,6 @@ import org.apache.druid.server.security.ResourceType; import org.apache.druid.timeline.DataSegment; import org.apache.druid.utils.CircularBuffer; -import 
org.joda.time.DateTime; import javax.annotation.Nullable; import javax.servlet.http.HttpServletRequest; @@ -45,29 +46,6 @@ public class IndexTaskUtils { - @Nullable - public static List getMessagesFromSavedParseExceptions( - CircularBuffer savedParseExceptions, - boolean includeTimeOfException - ) - { - if (savedParseExceptions == null) { - return null; - } - - List events = new ArrayList<>(); - for (int i = 0; i < savedParseExceptions.size(); i++) { - if (includeTimeOfException) { - DateTime timeOfException = DateTimes.utc(savedParseExceptions.getLatest(i).getTimeOfExceptionMillis()); - events.add(timeOfException + ", " + savedParseExceptions.getLatest(i).getMessage()); - } else { - events.add(savedParseExceptions.getLatest(i).getMessage()); - } - } - - return events; - } - @Nullable public static List getReportListFromSavedParseExceptions( CircularBuffer savedParseExceptionReports @@ -152,4 +130,25 @@ public static void setSegmentDimensions( metricBuilder.setDimension(DruidMetrics.PARTITIONING_TYPE, partitionType); metricBuilder.setDimension(DruidMetrics.INTERVAL, segment.getInterval().toString()); } + + public static void emitSegmentPublishMetrics( + SegmentPublishResult publishResult, + Task task, + TaskActionToolbox toolbox + ) + { + final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder(); + IndexTaskUtils.setTaskDimensions(metricBuilder, task); + + if (publishResult.isSuccess()) { + toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/success", 1)); + for (DataSegment segment : publishResult.getSegments()) { + IndexTaskUtils.setSegmentDimensions(metricBuilder, segment); + toolbox.getEmitter().emit(metricBuilder.setMetric("segment/added/bytes", segment.getSize())); + toolbox.getEmitter().emit(SegmentMetadataEvent.create(segment, DateTimes.nowUtc())); + } + } else { + toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/failure", 1)); + } + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index e44dfe9a451e..c2b548cd779a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -60,6 +60,7 @@ import org.apache.druid.indexing.common.actions.CheckPointDataSourceMetadataAction; import org.apache.druid.indexing.common.actions.ResetDataSourceMetadataAction; import org.apache.druid.indexing.common.actions.SegmentLockAcquireAction; +import org.apache.druid.indexing.common.actions.TaskLocks; import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction; import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor; import org.apache.druid.indexing.common.task.IndexTaskUtils; @@ -319,7 +320,8 @@ public void initializeSequences() throws IOException previous.getValue(), current.getValue(), true, - exclusiveStartPartitions + exclusiveStartPartitions, + getTaskLockType() ) ); previous = current; @@ -334,7 +336,8 @@ public void initializeSequences() throws IOException previous.getValue(), endOffsets, false, - exclusiveStartPartitions + exclusiveStartPartitions, + getTaskLockType() ) ); } else { @@ -345,7 +348,8 @@ public void initializeSequences() throws IOException ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap(), endOffsets, false, - 
ioConfig.getStartSequenceNumbers().getExclusivePartitions() + ioConfig.getStartSequenceNumbers().getExclusivePartitions(), + getTaskLockType() ) ); } @@ -925,6 +929,11 @@ public void onFailure(Throwable t) return TaskStatus.success(task.getId()); } + private TaskLockType getTaskLockType() + { + return TaskLocks.determineLockTypeForAppend(task.getContext()); + } + private void checkPublishAndHandoffFailure() throws ExecutionException, InterruptedException { // Check if any publishFuture failed. @@ -1709,7 +1718,8 @@ public Response setEndOffsets( sequenceNumbers, endOffsets, false, - exclusiveStartPartitions + exclusiveStartPartitions, + getTaskLockType() ); log.info( diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java index 161a36de2fd0..aa009c674900 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java @@ -25,8 +25,13 @@ import com.google.common.base.Supplier; import com.google.common.collect.ImmutableMap; import org.apache.druid.data.input.Committer; +import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.common.actions.CommitRealtimeSegmentsAndMetadataAction; +import org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction; import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction; +import org.apache.druid.indexing.common.actions.TaskAction; +import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber; @@ -54,6 +59,7 @@ public class SequenceMetadata private final String sequenceName; private final Set exclusiveStartPartitions; private final Set assignments; + private final TaskLockType taskLockType; private final boolean sentinel; /** * Lock for accessing {@link #endOffsets} and {@link #checkpointed}. This lock is required because @@ -73,7 +79,8 @@ public SequenceMetadata( @JsonProperty("startOffsets") Map startOffsets, @JsonProperty("endOffsets") Map endOffsets, @JsonProperty("checkpointed") boolean checkpointed, - @JsonProperty("exclusiveStartPartitions") Set exclusiveStartPartitions + @JsonProperty("exclusiveStartPartitions") Set exclusiveStartPartitions, + @JsonProperty("taskLockType") TaskLockType taskLockType ) { Preconditions.checkNotNull(sequenceName); @@ -86,6 +93,7 @@ public SequenceMetadata( this.assignments = new HashSet<>(startOffsets.keySet()); this.checkpointed = checkpointed; this.sentinel = false; + this.taskLockType = taskLockType; this.exclusiveStartPartitions = exclusiveStartPartitions == null ? 
Collections.emptySet() : exclusiveStartPartitions; @@ -139,6 +147,12 @@ public Map getEndOffsets() } } + @JsonProperty + public TaskLockType getTaskLockType() + { + return taskLockType; + } + @JsonProperty public boolean isSentinel() { @@ -363,7 +377,7 @@ public SegmentPublishResult publishAnnotatedSegments( ); } - final SegmentTransactionalInsertAction action; + final TaskAction action; if (segmentsToPush.isEmpty()) { // If a task ingested no data but made progress reading through its assigned partitions, @@ -395,19 +409,21 @@ public SegmentPublishResult publishAnnotatedSegments( ); } } else if (useTransaction) { - action = SegmentTransactionalInsertAction.appendAction( - segmentsToPush, - runner.createDataSourceMetadata( - new SeekableStreamStartSequenceNumbers<>( - finalPartitions.getStream(), - getStartOffsets(), - exclusiveStartPartitions - ) - ), - runner.createDataSourceMetadata(finalPartitions) + final DataSourceMetadata startMetadata = runner.createDataSourceMetadata( + new SeekableStreamStartSequenceNumbers<>( + finalPartitions.getStream(), + getStartOffsets(), + exclusiveStartPartitions + ) ); + final DataSourceMetadata endMetadata = runner.createDataSourceMetadata(finalPartitions); + action = taskLockType == TaskLockType.APPEND + ? CommitRealtimeSegmentsAndMetadataAction.create(segmentsToPush, startMetadata, endMetadata) + : SegmentTransactionalInsertAction.appendAction(segmentsToPush, startMetadata, endMetadata); } else { - action = SegmentTransactionalInsertAction.appendAction(segmentsToPush, null, null); + action = taskLockType == TaskLockType.APPEND + ? SegmentTransactionalAppendAction.create(segmentsToPush) + : SegmentTransactionalInsertAction.appendAction(segmentsToPush, null, null); } return toolbox.getTaskActionClient().submit(action); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java index b78efcbc3469..6a1a92641d3f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java @@ -23,6 +23,7 @@ import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.TaskLockType; +import org.apache.druid.indexing.common.actions.LockReleaseAction; import org.apache.druid.indexing.common.actions.SegmentAllocateAction; import org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction; import org.apache.druid.indexing.common.actions.SegmentTransactionalReplaceAction; @@ -60,6 +61,11 @@ public TaskLock acquireReplaceLockOn(Interval interval) return runAction(new TimeChunkLockTryAcquireAction(TaskLockType.REPLACE, interval)); } + public Void releaseLock(Interval interval) + { + return runAction(new LockReleaseAction(interval)); + } + public TaskLock acquireAppendLockOn(Interval interval) { return runAction(new TimeChunkLockTryAcquireAction(TaskLockType.APPEND, interval)); @@ -97,6 +103,28 @@ public SegmentIdWithShardSpec allocateSegmentForTimestamp(DateTime timestamp, Gr ); } + public SegmentIdWithShardSpec allocateSegmentForTimestamp( + DateTime timestamp, + Granularity preferredSegmentGranularity, + String sequenceName + ) + { + return runAction( + new SegmentAllocateAction( + getDataSource(), + timestamp, + Granularities.SECOND, + preferredSegmentGranularity, + 
getId() + "__" + sequenceName, + null, + false, + NumberedPartialShardSpec.instance(), + LockGranularity.TIME_CHUNK, + TaskLockType.APPEND + ) + ); + } + private T runAction(TaskAction action) { return execute(() -> client.submit(action)); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java index 293503b1c723..678d17aefbf4 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java @@ -66,11 +66,14 @@ import org.junit.Before; import org.junit.Test; +import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -626,10 +629,10 @@ public void testLockReplaceAllocateLockReplaceLockReplaceAppend() // Allocate an append segment for v1 final ActionsTestTask appendTask1 = createAndStartTask(); - appendTask1.acquireAppendLockOn(YEAR_23); final SegmentIdWithShardSpec pendingSegmentV11 = appendTask1.allocateSegmentForTimestamp(YEAR_23.getStart(), Granularities.YEAR); - Assert.assertEquals(segmentV10.getVersion(), pendingSegmentV11.getVersion()); + Assert.assertEquals(v1, pendingSegmentV11.getVersion()); + Assert.assertEquals(YEAR_23, pendingSegmentV11.getInterval()); // Commit replace segment for v2 final ActionsTestTask replaceTask2 = createAndStartTask(); @@ -771,6 +774,89 @@ public void testMultipleGranularities() verifyIntervalHasVisibleSegments(YEAR_23, segmentV10, segmentV11, segmentV13); } + @Test + public void testSegmentIsAllocatedAtLatestVersion() + { + final SegmentIdWithShardSpec pendingSegmentV01 + = appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), Granularities.MONTH); + Assert.assertEquals(SEGMENT_V0, pendingSegmentV01.getVersion()); + Assert.assertEquals(JAN_23, pendingSegmentV01.getInterval()); + + final String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion(); + final DataSegment segmentV10 = createSegment(JAN_23, v1); + replaceTask.commitReplaceSegments(segmentV10); + verifyIntervalHasUsedSegments(JAN_23, segmentV10); + verifyIntervalHasVisibleSegments(JAN_23, segmentV10); + + final SegmentIdWithShardSpec pendingSegmentV12 + = appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), Granularities.MONTH); + Assert.assertNotEquals(pendingSegmentV01.asSegmentId(), pendingSegmentV12.asSegmentId()); + Assert.assertEquals(v1, pendingSegmentV12.getVersion()); + Assert.assertEquals(JAN_23, pendingSegmentV12.getInterval()); + + replaceTask.releaseLock(JAN_23); + final ActionsTestTask replaceTask2 = createAndStartTask(); + final String v2 = replaceTask2.acquireReplaceLockOn(JAN_23).getVersion(); + final DataSegment segmentV20 = createSegment(JAN_23, v2); + replaceTask2.commitReplaceSegments(segmentV20); + verifyIntervalHasUsedSegments(JAN_23, segmentV10, segmentV20); + verifyIntervalHasVisibleSegments(JAN_23, segmentV20); + + final SegmentIdWithShardSpec pendingSegmentV23 + = appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), Granularities.MONTH); + Assert.assertNotEquals(pendingSegmentV01.asSegmentId(), pendingSegmentV23.asSegmentId()); + 
Assert.assertEquals(v2, pendingSegmentV23.getVersion()); + Assert.assertEquals(JAN_23, pendingSegmentV23.getInterval()); + + // Commit the append segments + final DataSegment segmentV01 = asSegment(pendingSegmentV01); + final DataSegment segmentV12 = asSegment(pendingSegmentV12); + final DataSegment segmentV23 = asSegment(pendingSegmentV23); + + Set appendedSegments + = appendTask.commitAppendSegments(segmentV01, segmentV12, segmentV23).getSegments(); + Assert.assertEquals(3 + 3, appendedSegments.size()); + + // Verify that the original append segments have been committed + Assert.assertTrue(appendedSegments.remove(segmentV01)); + Assert.assertTrue(appendedSegments.remove(segmentV12)); + Assert.assertTrue(appendedSegments.remove(segmentV23)); + + // Verify that segmentV01 has been upgraded to both v1 and v2 + final DataSegment segmentV11 = findSegmentWith(v1, segmentV01.getLoadSpec(), appendedSegments); + Assert.assertNotNull(segmentV11); + final DataSegment segmentV21 = findSegmentWith(v2, segmentV01.getLoadSpec(), appendedSegments); + Assert.assertNotNull(segmentV21); + + // Verify that segmentV12 has been upgraded to v2 + final DataSegment segmentV22 = findSegmentWith(v2, segmentV12.getLoadSpec(), appendedSegments); + Assert.assertNotNull(segmentV22); + + // Verify that segmentV23 is not downgraded to v1 + final DataSegment segmentV13 = findSegmentWith(v1, segmentV23.getLoadSpec(), appendedSegments); + Assert.assertNull(segmentV13); + + verifyIntervalHasUsedSegments( + YEAR_23, + segmentV01, + segmentV10, segmentV11, segmentV12, + segmentV20, segmentV21, segmentV22, segmentV23 + ); + verifyIntervalHasVisibleSegments(YEAR_23, segmentV20, segmentV21, segmentV22, segmentV23); + } + + @Nullable + private DataSegment findSegmentWith(String version, Map loadSpec, Set segments) { + for (DataSegment segment : segments) { + if (version.equals(segment.getVersion()) + && Objects.equals(segment.getLoadSpec(), loadSpec)) { + return segment; + } + } + + return null; + } + private static DataSegment asSegment(SegmentIdWithShardSpec pendingSegment) { final SegmentId id = pendingSegment.asSegmentId(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SequenceMetadataTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SequenceMetadataTest.java index aae07194bb94..af7410bd1d79 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SequenceMetadataTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SequenceMetadataTest.java @@ -76,7 +76,8 @@ public void testPublishAnnotatedSegmentsThrowExceptionIfOverwriteSegmentsNotNull ImmutableMap.of(), ImmutableMap.of(), true, - ImmutableSet.of() + ImmutableSet.of(), + null ); TransactionalSegmentPublisher transactionalSegmentPublisher = sequenceMetadata.createPublisher(mockSeekableStreamIndexTaskRunner, mockTaskToolbox, true); @@ -109,7 +110,8 @@ public void testPublishAnnotatedSegmentsSucceedIfDropSegmentsAndOverwriteSegment ImmutableMap.of(), ImmutableMap.of(), true, - ImmutableSet.of() + ImmutableSet.of(), + null ); TransactionalSegmentPublisher transactionalSegmentPublisher = sequenceMetadata.createPublisher(mockSeekableStreamIndexTaskRunner, mockTaskToolbox, false); diff --git a/processing/src/main/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEvent.java b/processing/src/main/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEvent.java index bc3769b62361..7e249f72d0a6 100644 --- 
a/processing/src/main/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEvent.java +++ b/processing/src/main/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEvent.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonValue; import org.apache.druid.java.util.emitter.core.Event; import org.apache.druid.java.util.emitter.core.EventMap; +import org.apache.druid.timeline.DataSegment; import org.joda.time.DateTime; /** @@ -62,6 +63,18 @@ public class SegmentMetadataEvent implements Event */ private final boolean isCompacted; + public static SegmentMetadataEvent create(DataSegment segment, DateTime eventTime) + { + return new SegmentMetadataEvent( + segment.getDataSource(), + eventTime, + segment.getInterval().getStart(), + segment.getInterval().getEnd(), + segment.getVersion(), + segment.getLastCompactionState() != null + ); + } + public SegmentMetadataEvent( String dataSource, DateTime createdTime, From 1182c74e113ffc0265c850cf7bd0ed0170609770 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Fri, 6 Oct 2023 10:12:38 +0530 Subject: [PATCH 2/7] Add IndexerMetadataStorageCoordinator.commitAppendSegmentsAndMetadata --- ...mmitRealtimeSegmentsAndMetadataAction.java | 31 +++-- .../SegmentTransactionalAppendAction.java | 3 - .../AppenderatorDriverRealtimeIndexTask.java | 1 - .../ConcurrentReplaceAndAppendTest.java | 3 +- ...TestIndexerMetadataStorageCoordinator.java | 11 ++ .../IndexerMetadataStorageCoordinator.java | 15 +++ .../IndexerSQLMetadataStorageCoordinator.java | 109 ++++++++++++++---- 7 files changed, 134 insertions(+), 39 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CommitRealtimeSegmentsAndMetadataAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CommitRealtimeSegmentsAndMetadataAction.java index 67ad47080723..7a73db0d4a4a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CommitRealtimeSegmentsAndMetadataAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CommitRealtimeSegmentsAndMetadataAction.java @@ -23,14 +23,20 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Preconditions; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.indexing.common.TaskLock; +import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.task.IndexTaskUtils; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.CriticalAction; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.SegmentPublishResult; +import org.apache.druid.metadata.ReplaceTaskLock; import org.apache.druid.segment.SegmentUtils; import org.apache.druid.timeline.DataSegment; +import java.util.List; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -107,28 +113,35 @@ public TypeReference getReturnTypeReference() }; } - /** - * Performs some sanity checks and publishes the given segments. 
- */ @Override public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) { - final SegmentPublishResult publishResult; + // Verify that all the locks are of expected type + final List locks = toolbox.getTaskLockbox().findLocksForTask(task); + for (TaskLock lock : locks) { + if (lock.getType() != TaskLockType.APPEND) { + throw InvalidInput.exception( + "Cannot use action[%s] for task[%s] as it is holding a lock of type[%s] instead of [APPEND].", + "CommitRealtimeSegmentsAndMetadata", task.getId(), lock.getType() + ); + } + } TaskLocks.checkLockCoversSegments(task, toolbox.getTaskLockbox(), segments); + final String datasource = task.getDataSource(); + final Map segmentToReplaceLock + = TaskLocks.findReplaceLocksCoveringSegments(datasource, toolbox.getTaskLockbox(), segments); + final SegmentPublishResult publishResult; try { publishResult = toolbox.getTaskLockbox().doInCriticalSection( task, segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()), CriticalAction.builder() .onValidLocks( - // TODO: this might need to call a new method which does the following in the same transaction - // - commit append segments - // - upgrade append segments to replace versions - // - commit metadata - () -> toolbox.getIndexerMetadataStorageCoordinator().commitSegmentsAndMetadata( + () -> toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegmentsAndMetadata( segments, + segmentToReplaceLock, startMetadata, endMetadata ) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java index 36b69270f991..994454a9a4b2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java @@ -69,9 +69,6 @@ public TypeReference getReturnTypeReference() }; } - /** - * Performs some sanity checks and publishes the given segments. 
- */ @Override public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index 8293c51a768f..3a599dd485be 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -27,7 +27,6 @@ import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java index 678d17aefbf4..22f21fb79b62 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java @@ -846,7 +846,8 @@ public void testSegmentIsAllocatedAtLatestVersion() } @Nullable - private DataSegment findSegmentWith(String version, Map loadSpec, Set segments) { + private DataSegment findSegmentWith(String version, Map loadSpec, Set segments) + { for (DataSegment segment : segments) { if (version.equals(segment.getVersion()) && Objects.equals(segment.getLoadSpec(), loadSpec)) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index 34d2e44552a7..624279e8141b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -175,6 +175,17 @@ public SegmentPublishResult commitAppendSegments( return SegmentPublishResult.ok(commitSegments(appendSegments)); } + @Override + public SegmentPublishResult commitAppendSegmentsAndMetadata( + Set appendSegments, + Map appendSegmentToReplaceLock, + DataSourceMetadata startMetadata, + DataSourceMetadata endMetadata + ) + { + return SegmentPublishResult.ok(commitSegments(appendSegments)); + } + @Override public SegmentPublishResult commitSegmentsAndMetadata( Set segments, diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index 3cbabea78fae..de1aadd320d3 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -300,6 +300,21 @@ SegmentPublishResult commitAppendSegments( Map appendSegmentToReplaceLock ); + /** + * Commits segments created by an APPEND task. 
This method also handles segment + * upgrade scenarios that may result from concurrent append and replace. Also + * commits start and end {@link DataSourceMetadata}. + * + * @see #commitAppendSegments + * @see #commitSegmentsAndMetadata + */ + SegmentPublishResult commitAppendSegmentsAndMetadata( + Set appendSegments, + Map appendSegmentToReplaceLock, + DataSourceMetadata startMetadata, + DataSourceMetadata endMetadata + ); + /** * Commits segments created by a REPLACE task. This method also handles the * segment upgrade scenarios that may result from concurrent append and replace. diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 7eaac692f7ce..74e4011ada05 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -438,33 +438,28 @@ public SegmentPublishResult commitAppendSegments( final Map appendSegmentToReplaceLock ) { - verifySegmentsToCommit(appendSegments); - - final String dataSource = appendSegments.iterator().next().getDataSource(); - final Set upgradedSegments = connector.retryTransaction( - (handle, transactionStatus) - -> getSegmentsToUpgradeOnAppend(handle, dataSource, appendSegments), - 0, - SQLMetadataConnector.DEFAULT_MAX_TRIES + return commitAppendSegmentsAndMetadataInTransaction( + appendSegments, + appendSegmentToReplaceLock, + null, + null ); + } - // Create entries for all required versions of the append segments - final Set allSegmentsToInsert = new HashSet<>(appendSegments); - allSegmentsToInsert.addAll(upgradedSegments); - - try { - return connector.retryTransaction( - (handle, transactionStatus) -> { - insertIntoUpgradeSegmentsTable(handle, appendSegmentToReplaceLock); - return SegmentPublishResult.ok(insertSegments(handle, allSegmentsToInsert)); - }, - 3, - getSqlMetadataMaxRetry() - ); - } - catch (CallbackFailedException e) { - return SegmentPublishResult.fail(e.getMessage()); - } + @Override + public SegmentPublishResult commitAppendSegmentsAndMetadata( + Set appendSegments, + Map appendSegmentToReplaceLock, + DataSourceMetadata startMetadata, + DataSourceMetadata endMetadata + ) + { + return commitAppendSegmentsAndMetadataInTransaction( + appendSegments, + appendSegmentToReplaceLock, + startMetadata, + endMetadata + ); } @Override @@ -971,6 +966,70 @@ private static class CheckExistingSegmentIdResult } } + private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction( + Set appendSegments, + Map appendSegmentToReplaceLock, + @Nullable DataSourceMetadata startMetadata, + @Nullable DataSourceMetadata endMetadata + ) + { + verifySegmentsToCommit(appendSegments); + if ((startMetadata == null && endMetadata != null) + || (startMetadata != null && endMetadata == null)) { + throw new IllegalArgumentException("start/end metadata pair must be either null or non-null"); + } + + final String dataSource = appendSegments.iterator().next().getDataSource(); + final Set upgradedSegments = connector.retryTransaction( + (handle, transactionStatus) + -> getSegmentsToUpgradeOnAppend(handle, dataSource, appendSegments), + 0, + SQLMetadataConnector.DEFAULT_MAX_TRIES + ); + + // Create entries for all required versions of the append segments + final Set allSegmentsToInsert = new HashSet<>(appendSegments); + allSegmentsToInsert.addAll(upgradedSegments); + + final AtomicBoolean 
metadataNotUpdated = new AtomicBoolean(false); + try { + return connector.retryTransaction( + (handle, transactionStatus) -> { + metadataNotUpdated.set(false); + + if (startMetadata != null) { + final DataStoreMetadataUpdateResult metadataUpdateResult + = updateDataSourceMetadataWithHandle(handle, dataSource, startMetadata, endMetadata); + + if (metadataUpdateResult.isFailed()) { + transactionStatus.setRollbackOnly(); + metadataNotUpdated.set(true); + + if (metadataUpdateResult.canRetry()) { + throw new RetryTransactionException(metadataUpdateResult.getErrorMsg()); + } else { + throw new RuntimeException(metadataUpdateResult.getErrorMsg()); + } + } + } + + insertIntoUpgradeSegmentsTable(handle, appendSegmentToReplaceLock); + return SegmentPublishResult.ok(insertSegments(handle, allSegmentsToInsert)); + }, + 3, + getSqlMetadataMaxRetry() + ); + } + catch (CallbackFailedException e) { + if (metadataNotUpdated.get()) { + // Return failed result if metadata was definitely not updated + return SegmentPublishResult.fail(e.getMessage()); + } else { + throw e; + } + } + } + private void insertPendingSegmentsIntoMetastore( Handle handle, Map createdSegments, From 9ddfd5e8462c17043412dd307147c7541b433b45 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Sat, 7 Oct 2023 09:03:29 +0530 Subject: [PATCH 3/7] Remove extra task action --- ...mmitRealtimeSegmentsAndMetadataAction.java | 181 ------------------ .../SegmentTransactionalAppendAction.java | 81 +++++++- .../indexing/common/actions/TaskAction.java | 1 - .../common/task/AbstractBatchIndexTask.java | 2 +- .../seekablestream/SequenceMetadata.java | 5 +- .../task/concurrent/ActionsTestTask.java | 2 +- .../seekablestream/SequenceMetadataTest.java | 24 +-- .../service/SegmentMetadataEventTest.java | 31 +++ 8 files changed, 119 insertions(+), 208 deletions(-) delete mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CommitRealtimeSegmentsAndMetadataAction.java diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CommitRealtimeSegmentsAndMetadataAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CommitRealtimeSegmentsAndMetadataAction.java deleted file mode 100644 index 7a73db0d4a4a..000000000000 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CommitRealtimeSegmentsAndMetadataAction.java +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.druid.indexing.common.actions; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.type.TypeReference; -import com.google.common.base.Preconditions; -import org.apache.druid.error.InvalidInput; -import org.apache.druid.indexing.common.TaskLock; -import org.apache.druid.indexing.common.TaskLockType; -import org.apache.druid.indexing.common.task.IndexTaskUtils; -import org.apache.druid.indexing.common.task.Task; -import org.apache.druid.indexing.overlord.CriticalAction; -import org.apache.druid.indexing.overlord.DataSourceMetadata; -import org.apache.druid.indexing.overlord.SegmentPublishResult; -import org.apache.druid.metadata.ReplaceTaskLock; -import org.apache.druid.segment.SegmentUtils; -import org.apache.druid.timeline.DataSegment; - -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; - -/** - * Task action to commit realtime segments and metadata when using APPEND task locks. - *
- * This action performs the following operations in a single transaction:
- * <ul>
- * <li>Commit the append segments</li>
- * <li>Upgrade the append segments to all visible REPLACE versions</li>
- * <li>Commit start and end {@link DataSourceMetadata}.</li>
- * </ul>
- * This action differs from {@link SegmentTransactionalInsertAction} as it is used - * only with APPEND locks and also upgrades segments as needed. - */ -public class CommitRealtimeSegmentsAndMetadataAction implements TaskAction -{ - /** - * Set of segments to be inserted into metadata storage - */ - private final Set segments; - - private final DataSourceMetadata startMetadata; - private final DataSourceMetadata endMetadata; - - public static CommitRealtimeSegmentsAndMetadataAction create( - Set segments, - DataSourceMetadata startMetadata, - DataSourceMetadata endMetadata - ) - { - return new CommitRealtimeSegmentsAndMetadataAction(segments, startMetadata, endMetadata); - } - - @JsonCreator - private CommitRealtimeSegmentsAndMetadataAction( - @JsonProperty("segments") Set segments, - @JsonProperty("startMetadata") DataSourceMetadata startMetadata, - @JsonProperty("endMetadata") DataSourceMetadata endMetadata - ) - { - Preconditions.checkArgument( - segments != null && !segments.isEmpty(), - "Segments to commit should not be empty" - ); - this.segments = segments; - this.startMetadata = startMetadata; - this.endMetadata = endMetadata; - } - - @JsonProperty - public Set getSegments() - { - return segments; - } - - @JsonProperty - public DataSourceMetadata getStartMetadata() - { - return startMetadata; - } - - @JsonProperty - public DataSourceMetadata getEndMetadata() - { - return endMetadata; - } - - @Override - public TypeReference getReturnTypeReference() - { - return new TypeReference() - { - }; - } - - @Override - public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) - { - // Verify that all the locks are of expected type - final List locks = toolbox.getTaskLockbox().findLocksForTask(task); - for (TaskLock lock : locks) { - if (lock.getType() != TaskLockType.APPEND) { - throw InvalidInput.exception( - "Cannot use action[%s] for task[%s] as it is holding a lock of type[%s] instead of [APPEND].", - "CommitRealtimeSegmentsAndMetadata", task.getId(), lock.getType() - ); - } - } - - TaskLocks.checkLockCoversSegments(task, toolbox.getTaskLockbox(), segments); - final String datasource = task.getDataSource(); - final Map segmentToReplaceLock - = TaskLocks.findReplaceLocksCoveringSegments(datasource, toolbox.getTaskLockbox(), segments); - - final SegmentPublishResult publishResult; - try { - publishResult = toolbox.getTaskLockbox().doInCriticalSection( - task, - segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()), - CriticalAction.builder() - .onValidLocks( - () -> toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegmentsAndMetadata( - segments, - segmentToReplaceLock, - startMetadata, - endMetadata - ) - ) - .onInvalidLocks( - () -> SegmentPublishResult.fail( - "Invalid task locks. Maybe they are revoked by a higher priority task." - + " Please check the overlord log for details." 
- ) - ) - .build() - ); - } - catch (Exception e) { - throw new RuntimeException(e); - } - - IndexTaskUtils.emitSegmentPublishMetrics(publishResult, task, toolbox); - return publishResult; - } - - @Override - public boolean isAudited() - { - return true; - } - - @Override - public String toString() - { - return "CommitRealtimeSegmentsAndMetadataAction{" + - ", segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) + - ", startMetadata=" + startMetadata + - ", endMetadata=" + endMetadata + - '}'; - } -} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java index 994454a9a4b2..125a008cdc90 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java @@ -22,14 +22,20 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.type.TypeReference; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.indexing.common.TaskLock; +import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.task.IndexTaskUtils; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.CriticalAction; +import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.metadata.ReplaceTaskLock; import org.apache.druid.segment.SegmentUtils; import org.apache.druid.timeline.DataSegment; +import javax.annotation.Nullable; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -41,18 +47,40 @@ public class SegmentTransactionalAppendAction implements TaskAction { private final Set segments; + @Nullable + private final DataSourceMetadata startMetadata; + @Nullable + private final DataSourceMetadata endMetadata; - public static SegmentTransactionalAppendAction create(Set segments) + public static SegmentTransactionalAppendAction forSegments(Set segments) { - return new SegmentTransactionalAppendAction(segments); + return new SegmentTransactionalAppendAction(segments, null, null); + } + + public static SegmentTransactionalAppendAction forSegmentsAndMetadata( + Set segments, + DataSourceMetadata startMetadata, + DataSourceMetadata endMetadata + ) + { + return new SegmentTransactionalAppendAction(segments, startMetadata, endMetadata); } @JsonCreator private SegmentTransactionalAppendAction( - @JsonProperty("segments") Set segments + @JsonProperty("segments") Set segments, + @JsonProperty("startMetadata") @Nullable DataSourceMetadata startMetadata, + @JsonProperty("endMetadata") @Nullable DataSourceMetadata endMetadata ) { this.segments = segments; + this.startMetadata = startMetadata; + this.endMetadata = endMetadata; + + if ((startMetadata == null && endMetadata != null) + || (startMetadata != null && endMetadata == null)) { + throw InvalidInput.exception("startMetadata and endMetadata must either be both null or both non-null."); + } } @JsonProperty @@ -61,6 +89,20 @@ public Set getSegments() return segments; } + @JsonProperty + @Nullable + public DataSourceMetadata getStartMetadata() + { + return startMetadata; + } + + @JsonProperty + @Nullable + public 
DataSourceMetadata getEndMetadata() + { + return endMetadata; + } + @Override public TypeReference getReturnTypeReference() { @@ -72,24 +114,45 @@ public TypeReference getReturnTypeReference() @Override public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) { + // Verify that all the locks are of expected type + final List locks = toolbox.getTaskLockbox().findLocksForTask(task); + for (TaskLock lock : locks) { + if (lock.getType() != TaskLockType.APPEND) { + throw InvalidInput.exception( + "Cannot use action[%s] for task[%s] as it is holding a lock of type[%s] instead of [APPEND].", + "CommitRealtimeSegmentsAndMetadata", task.getId(), lock.getType() + ); + } + } + TaskLocks.checkLockCoversSegments(task, toolbox.getTaskLockbox(), segments); final String datasource = task.getDataSource(); final Map segmentToReplaceLock = TaskLocks.findReplaceLocksCoveringSegments(datasource, toolbox.getTaskLockbox(), segments); + final CriticalAction.Action publishAction; + if (startMetadata == null) { + publishAction = () -> toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegments( + segments, + segmentToReplaceLock + ); + } else { + publishAction = () -> toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegmentsAndMetadata( + segments, + segmentToReplaceLock, + startMetadata, + endMetadata + ); + } + final SegmentPublishResult retVal; try { retVal = toolbox.getTaskLockbox().doInCriticalSection( task, segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()), CriticalAction.builder() - .onValidLocks( - () -> toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegments( - segments, - segmentToReplaceLock - ) - ) + .onValidLocks(publishAction) .onInvalidLocks( () -> SegmentPublishResult.fail( "Invalid task locks. Maybe they are revoked by a higher priority task." diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java index 0948c036ea73..171d53b9cdd6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java @@ -38,7 +38,6 @@ @JsonSubTypes.Type(name = "segmentTransactionalInsert", value = SegmentTransactionalInsertAction.class), @JsonSubTypes.Type(name = "segmentTransactionalAppend", value = SegmentTransactionalAppendAction.class), @JsonSubTypes.Type(name = "segmentTransactionalReplace", value = SegmentTransactionalReplaceAction.class), - @JsonSubTypes.Type(name = "commitRealtimeSegments", value = CommitRealtimeSegmentsAndMetadataAction.class), // Type name doesn't correspond to the name of the class for backward compatibility. @JsonSubTypes.Type(name = "segmentListUsed", value = RetrieveUsedSegmentsAction.class), // Type name doesn't correspond to the name of the class for backward compatibility. 
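[Reviewer note] A condensed sketch of how a caller now selects the publish action once the metadata commit is folded into SegmentTransactionalAppendAction. The factory methods are the ones introduced in the diff above; the surrounding variables (segmentsToPush, startMetadata, endMetadata, toolbox) are assumed to be in scope, as in SequenceMetadata.publishAnnotatedSegments:

    // Sketch only, not part of the patch: selecting the publish action under APPEND locks.
    // Streaming ingestion passes stream offsets as start/end metadata so that segments,
    // their upgrades to visible REPLACE versions, and the offsets commit atomically;
    // batch append passes no metadata and commits (and upgrades) the segments only.
    final TaskAction<SegmentPublishResult> action =
        (startMetadata != null && endMetadata != null)
        ? SegmentTransactionalAppendAction.forSegmentsAndMetadata(segmentsToPush, startMetadata, endMetadata)
        : SegmentTransactionalAppendAction.forSegments(segmentsToPush);

    // The overlord runs the action inside a critical section on the segment intervals.
    final SegmentPublishResult result = toolbox.getTaskActionClient().submit(action);

Folding both paths into one action avoids registering a second task-action wire type for what is otherwise the same APPEND publish path, which is why the "commitRealtimeSegments" subtype is removed from TaskAction above.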
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java index ea61f37c7e90..276d82eedc84 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java @@ -415,7 +415,7 @@ protected TaskAction buildPublishAction( case REPLACE: return SegmentTransactionalReplaceAction.create(segmentsToPublish); case APPEND: - return SegmentTransactionalAppendAction.create(segmentsToPublish); + return SegmentTransactionalAppendAction.forSegments(segmentsToPublish); default: return SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToPublish); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java index aa009c674900..47ccbaa00b35 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java @@ -27,7 +27,6 @@ import org.apache.druid.data.input.Committer; import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.TaskToolbox; -import org.apache.druid.indexing.common.actions.CommitRealtimeSegmentsAndMetadataAction; import org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction; import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction; import org.apache.druid.indexing.common.actions.TaskAction; @@ -418,11 +417,11 @@ public SegmentPublishResult publishAnnotatedSegments( ); final DataSourceMetadata endMetadata = runner.createDataSourceMetadata(finalPartitions); action = taskLockType == TaskLockType.APPEND - ? CommitRealtimeSegmentsAndMetadataAction.create(segmentsToPush, startMetadata, endMetadata) + ? SegmentTransactionalAppendAction.forSegmentsAndMetadata(segmentsToPush, startMetadata, endMetadata) : SegmentTransactionalInsertAction.appendAction(segmentsToPush, startMetadata, endMetadata); } else { action = taskLockType == TaskLockType.APPEND - ? SegmentTransactionalAppendAction.create(segmentsToPush) + ? SegmentTransactionalAppendAction.forSegments(segmentsToPush) : SegmentTransactionalInsertAction.appendAction(segmentsToPush, null, null); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java index 6a1a92641d3f..69a8b6cc1030 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java @@ -81,7 +81,7 @@ public SegmentPublishResult commitReplaceSegments(DataSegment... segments) public SegmentPublishResult commitAppendSegments(DataSegment... 
segments) { return runAction( - SegmentTransactionalAppendAction.create(Sets.newHashSet(segments)) + SegmentTransactionalAppendAction.forSegments(Sets.newHashSet(segments)) ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SequenceMetadataTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SequenceMetadataTest.java index af7410bd1d79..fbe63ffe2689 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SequenceMetadataTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SequenceMetadataTest.java @@ -29,9 +29,8 @@ import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.LinearShardSpec; -import org.junit.Rule; +import org.junit.Assert; import org.junit.Test; -import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.mockito.ArgumentMatchers; import org.mockito.Mock; @@ -43,9 +42,6 @@ @RunWith(MockitoJUnitRunner.class) public class SequenceMetadataTest { - @Rule - public ExpectedException expectedException = ExpectedException.none(); - @Mock private SeekableStreamIndexTaskRunner mockSeekableStreamIndexTaskRunner; @@ -59,7 +55,7 @@ public class SequenceMetadataTest private TaskToolbox mockTaskToolbox; @Test - public void testPublishAnnotatedSegmentsThrowExceptionIfOverwriteSegmentsNotNullAndNotEmpty() throws Exception + public void testPublishAnnotatedSegmentsThrowExceptionIfOverwriteSegmentsNotNullAndNotEmpty() { DataSegment dataSegment = DataSegment.builder() .dataSource("foo") @@ -79,14 +75,18 @@ public void testPublishAnnotatedSegmentsThrowExceptionIfOverwriteSegmentsNotNull ImmutableSet.of(), null ); - TransactionalSegmentPublisher transactionalSegmentPublisher = sequenceMetadata.createPublisher(mockSeekableStreamIndexTaskRunner, mockTaskToolbox, true); + TransactionalSegmentPublisher transactionalSegmentPublisher + = sequenceMetadata.createPublisher(mockSeekableStreamIndexTaskRunner, mockTaskToolbox, true); - expectedException.expect(ISE.class); - expectedException.expectMessage( - "Stream ingestion task unexpectedly attempted to overwrite segments: " + SegmentUtils.commaSeparatedIdentifiers(notNullNotEmptySegment) + ISE exception = Assert.assertThrows( + ISE.class, + () -> transactionalSegmentPublisher.publishAnnotatedSegments(notNullNotEmptySegment, ImmutableSet.of(), null) + ); + Assert.assertEquals( + "Stream ingestion task unexpectedly attempted to overwrite segments: " + + SegmentUtils.commaSeparatedIdentifiers(notNullNotEmptySegment), + exception.getMessage() ); - - transactionalSegmentPublisher.publishAnnotatedSegments(notNullNotEmptySegment, ImmutableSet.of(), null); } @Test diff --git a/processing/src/test/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEventTest.java b/processing/src/test/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEventTest.java index 83a4fcba7dc5..a926b004c0af 100644 --- a/processing/src/test/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEventTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEventTest.java @@ -21,6 +21,11 @@ import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.emitter.core.EventMap; +import org.apache.druid.timeline.DataSegment; 
+import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.joda.time.DateTime; import org.junit.Assert; import org.junit.Test; @@ -51,4 +56,30 @@ public void testBasicEvent() event.toMap() ); } + + @Test + public void testCreate() + { + final DataSegment segment = DataSegment.builder() + .dataSource("wiki") + .interval(Intervals.of("2023/2024")) + .shardSpec(new NumberedShardSpec(1, 1)) + .version("v1") + .size(100) + .build(); + final DateTime eventTime = DateTimes.nowUtc(); + SegmentMetadataEvent event = SegmentMetadataEvent.create(segment, eventTime); + Assert.assertEquals( + EventMap.builder() + .put(SegmentMetadataEvent.FEED, "segment_metadata") + .put(SegmentMetadataEvent.DATASOURCE, segment.getDataSource()) + .put(SegmentMetadataEvent.CREATED_TIME, eventTime) + .put(SegmentMetadataEvent.START_TIME, segment.getInterval().getStart()) + .put(SegmentMetadataEvent.END_TIME, segment.getInterval().getEnd()) + .put(SegmentMetadataEvent.VERSION, segment.getVersion()) + .put(SegmentMetadataEvent.IS_COMPACTED, false) + .build(), + event.toMap() + ); + } } From e19b2cad4a73923e209dca1826675cb74cc983fa Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Mon, 9 Oct 2023 21:07:20 +0530 Subject: [PATCH 4/7] Upgrade pending segments --- .../SegmentTransactionalReplaceAction.java | 35 ++- .../indexing/common/actions/TaskLocks.java | 2 + .../druid/indexing/overlord/TaskLockbox.java | 3 +- ...TestIndexerMetadataStorageCoordinator.java | 18 +- .../IndexerMetadataStorageCoordinator.java | 12 + .../IndexerSQLMetadataStorageCoordinator.java | 220 +++++++++++++++--- .../druid/metadata/SQLMetadataConnector.java | 4 + 7 files changed, 257 insertions(+), 37 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java index 119a59ea648a..238b6a0e7743 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java @@ -27,10 +27,13 @@ import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.CriticalAction; import org.apache.druid.indexing.overlord.SegmentPublishResult; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.metadata.ReplaceTaskLock; import org.apache.druid.segment.SegmentUtils; +import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.timeline.DataSegment; +import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -40,6 +43,8 @@ */ public class SegmentTransactionalReplaceAction implements TaskAction { + private static final Logger log = new Logger(SegmentTransactionalReplaceAction.class); + /** * Set of segments to be inserted into metadata storage */ @@ -86,9 +91,9 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) final Set replaceLocksForTask = toolbox.getTaskLockbox().findReplaceLocksForTask(task); - final SegmentPublishResult retVal; + final SegmentPublishResult publishResult; try { - retVal = toolbox.getTaskLockbox().doInCriticalSection( + publishResult = toolbox.getTaskLockbox().doInCriticalSection( task, segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()), CriticalAction.builder() @@ -109,8 +114,30 @@ public SegmentPublishResult 
perform(Task task, TaskActionToolbox toolbox) throw new RuntimeException(e); } - IndexTaskUtils.emitSegmentPublishMetrics(retVal, task, toolbox); - return retVal; + IndexTaskUtils.emitSegmentPublishMetrics(publishResult, task, toolbox); + + if (publishResult.isSuccess()) { + // If upgrade of pending segments fails, the segments will still get upgraded + // when the corresponding APPEND task commits the segments. + // Thus, the upgrade of pending segments should not be done in the same + // transaction as the commit of replace segments and failure to upgrade + // pending segments should not affect success of replace commit. + try { + List upgradedPendingSegments = + toolbox.getIndexerMetadataStorageCoordinator().upgradePendingSegments(segments); + log.info( + "Upgraded [%d] pending segments for REPLACE task[%s]: [%s]", + upgradedPendingSegments.size(), task.getId(), upgradedPendingSegments + ); + + // These upgraded pending segments should be forwarded to the SupervisorManager + } + catch (Exception e) { + log.error(e, "Error while upgrading pending segments for task[%s]", task.getId()); + } + } + + return publishResult; } @Override diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java index bb835997801f..d60649359256 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java @@ -125,6 +125,8 @@ public static boolean isLockCoversSegments( && timeChunkLock.getDataSource().equals(segment.getDataSource()) && (timeChunkLock.getVersion().compareTo(segment.getVersion()) >= 0 || TaskLockType.APPEND.equals(timeChunkLock.getType())); + // APPEND locks always have the version DateTimes.EPOCH (1970-01-01) + // and cover the segments irrespective of the segment version } else { final SegmentLock segmentLock = (SegmentLock) lock; return segmentLock.getInterval().contains(segment.getInterval()) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index 761c0b591605..b7aab8505346 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -29,6 +29,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Ordering; import com.google.inject.Inject; +import org.apache.druid.error.DruidException; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.SegmentLock; import org.apache.druid.indexing.common.TaskLock; @@ -941,7 +942,7 @@ private Set getNonRevokedReplaceLocks(List posse // Replace locks are always held by the supervisor task if (posse.taskIds.size() > 1) { - throw new ISE( + throw DruidException.defensive( "Replace lock[%s] for datasource[%s] is held by multiple tasks[%s]", lock, datasource, posse.taskIds ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index 624279e8141b..dfaa1eb86e43 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -113,7 +113,11 @@ public List retrieveUnusedSegmentsForInterval(String dataSource, In } @Override - public List retrieveUnusedSegmentsForInterval(String dataSource, Interval interval, @Nullable Integer limit) + public List retrieveUnusedSegmentsForInterval( + String dataSource, + Interval interval, + @Nullable Integer limit + ) { synchronized (unusedSegments) { Stream resultStream = unusedSegments.stream(); @@ -233,6 +237,18 @@ public SegmentIdWithShardSpec allocatePendingSegment( ); } + @Override + public Set upgradePendingSegments(Set replaceSegments) + { + return Collections.emptySet(); + } + + @Override + public Set findAllVersionsOfPendingSegment(SegmentIdWithShardSpec segmentIdWithShardSpec) + { + return Collections.emptySet(); + } + @Override public int deletePendingSegmentsCreatedInInterval(String dataSource, Interval deleteInterval) { diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index de1aadd320d3..540616ab7e07 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -334,6 +334,18 @@ SegmentPublishResult commitReplaceSegments( Set locksHeldByReplaceTask ); + /** + * Creates new versions for the pending segments that overlap with the given + * replace segments being committed. + * + * @param replaceSegments Segments being committed by a REPLACE task + * @return Set of new versions created for the upgraded pending segments. The + * returned set contains only the newly created segment IDs. + */ + Set upgradePendingSegments(Set replaceSegments); + + Set findAllVersionsOfPendingSegment(SegmentIdWithShardSpec segmentIdWithShardSpec); + /** * Retrieves data source's metadata from the metadata store. Returns null if there is no metadata. */ diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 74e4011ada05..93d6bf932b5b 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -105,6 +105,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor private static final Logger log = new Logger(IndexerSQLMetadataStorageCoordinator.class); private static final int MAX_NUM_SEGMENTS_TO_ANNOUNCE_AT_ONCE = 100; + private static final String UPGRADED_PENDING_SEGMENT_PREFIX = "upgraded_to_replace_version_"; + private final ObjectMapper jsonMapper; private final MetadataStorageTablesConfig dbTables; private final SQLMetadataConnector connector; @@ -237,44 +239,45 @@ public int markSegmentsAsUnusedWithinInterval(String dataSource, Interval interv /** * Fetches all the pending segments, whose interval overlaps with the given - * search interval from the metadata store. + * search interval from the metadata store. Returns a Map from the + * pending segment ID to the sequence name.
*/ - private Set getPendingSegmentsForIntervalWithHandle( + private Map getPendingSegmentsForIntervalWithHandle( final Handle handle, final String dataSource, final Interval interval ) throws IOException { - final Set identifiers = new HashSet<>(); - - final ResultIterator dbSegments = + final ResultIterator dbSegments = handle.createQuery( StringUtils.format( // This query might fail if the year has a different number of digits // See https://github.com/apache/druid/pull/11582 for a similar issue // Using long for these timestamps instead of varchar would give correct time comparisons - "SELECT payload FROM %1$s WHERE dataSource = :dataSource AND start < :end and %2$send%2$s > :start", + "SELECT sequence_name, payload FROM %1$s" + + " WHERE dataSource = :dataSource AND start < :end and %2$send%2$s > :start", dbTables.getPendingSegmentsTable(), connector.getQuoteString() ) ) .bind("dataSource", dataSource) .bind("start", interval.getStart().toString()) .bind("end", interval.getEnd().toString()) - .map(ByteArrayMapper.FIRST) + .map((index, r, ctx) -> PendingSegmentsRecord.fromResultSet(r)) .iterator(); + final Map pendingSegmentToSequenceName = new HashMap<>(); while (dbSegments.hasNext()) { - final byte[] payload = dbSegments.next(); - final SegmentIdWithShardSpec identifier = jsonMapper.readValue(payload, SegmentIdWithShardSpec.class); + PendingSegmentsRecord record = dbSegments.next(); + final SegmentIdWithShardSpec identifier = jsonMapper.readValue(record.payload, SegmentIdWithShardSpec.class); if (interval.overlaps(identifier.getInterval())) { - identifiers.add(identifier); + pendingSegmentToSequenceName.put(identifier, record.sequenceName); } } dbSegments.close(); - return identifiers; + return pendingSegmentToSequenceName; } private SegmentTimeline getTimelineForIntervalsWithHandle( @@ -417,7 +420,7 @@ public SegmentPublishResult commitReplaceSegments( (handle, transactionStatus) -> { final Set segmentsToInsert = new HashSet<>(replaceSegments); segmentsToInsert.addAll( - getSegmentsToUpgradeOnReplace(handle, replaceSegments, locksHeldByReplaceTask) + createUpgradedVersionsOfAppendSegmentsAfterReplace(handle, replaceSegments, locksHeldByReplaceTask) ); return SegmentPublishResult.ok( insertSegments(handle, segmentsToInsert) @@ -596,6 +599,157 @@ public SegmentIdWithShardSpec allocatePendingSegment( ); } + @Override + public Set upgradePendingSegments(Set replaceSegments) + { + if (replaceSegments.isEmpty()) { + return Collections.emptySet(); + } + + // Any replace interval has exactly one version of segments + final Map replaceIntervalToMaxId = new HashMap<>(); + for (DataSegment segment : replaceSegments) { + DataSegment committedMaxId = replaceIntervalToMaxId.get(segment.getInterval()); + if (committedMaxId == null + || committedMaxId.getShardSpec().getPartitionNum() < segment.getShardSpec().getPartitionNum()) { + replaceIntervalToMaxId.put(segment.getInterval(), segment); + } + } + + final String datasource = replaceSegments.iterator().next().getDataSource(); + return connector.retryWithHandle( + handle -> upgradePendingSegments(handle, datasource, replaceIntervalToMaxId) + ); + } + + @Override + public Set findAllVersionsOfPendingSegment(SegmentIdWithShardSpec pendingSegment) + { + return connector.retryWithHandle( + handle -> findAllVersionsOfPendingSegment(handle, pendingSegment) + ); + } + + private Set findAllVersionsOfPendingSegment( + Handle handle, + SegmentIdWithShardSpec pendingSegment + ) throws IOException + { + final Interval interval = pendingSegment.getInterval(); + 
final Query> query = handle + .createQuery( + StringUtils.format( + "SELECT payload " + + "FROM %s WHERE " + + "dataSource = :dataSource AND " + + "start = :start AND " + + "%2$send%2$s = :end AND " + + "sequence_prev_id = :sequence_prev_id", + dbTables.getPendingSegmentsTable(), + connector.getQuoteString() + ) + ) + .bind("dataSource", pendingSegment.getDataSource()) + .bind("sequence_prev_id", pendingSegment.asSegmentId().toString()) + .bind("start", interval.getStart().toString()) + .bind("end", interval.getEnd().toString()); + + final ResultIterator dbSegments = query + .map(ByteArrayMapper.FIRST) + .iterator(); + + final Set allVersions = new HashSet<>(); + while (dbSegments.hasNext()) { + final byte[] payload = dbSegments.next(); + final SegmentIdWithShardSpec segmentId = + jsonMapper.readValue(payload, SegmentIdWithShardSpec.class); + allVersions.add(segmentId); + } + + return allVersions; + } + + /** + * Finds pending segments contained in each replace interval and upgrades them + * to the replace version. + */ + private Set upgradePendingSegments( + Handle handle, + String datasource, + Map replaceIntervalToMaxId + ) throws IOException + { + final Map newPendingSegmentVersions = new HashMap<>(); + + for (Map.Entry entry : replaceIntervalToMaxId.entrySet()) { + final Interval replaceInterval = entry.getKey(); + final DataSegment maxSegmentId = entry.getValue(); + final String replaceVersion = maxSegmentId.getVersion(); + + final int numCorePartitions = maxSegmentId.getShardSpec().getNumCorePartitions(); + int currentPartitionNumber = maxSegmentId.getShardSpec().getPartitionNum(); + + final Map overlappingPendingSegments + = getPendingSegmentsForIntervalWithHandle(handle, datasource, replaceInterval); + + for (Map.Entry overlappingPendingSegment + : overlappingPendingSegments.entrySet()) { + final SegmentIdWithShardSpec pendingSegmentId = overlappingPendingSegment.getKey(); + final String pendingSegmentSequence = overlappingPendingSegment.getValue(); + if (shouldUpgradePendingSegment(pendingSegmentId, pendingSegmentSequence, replaceInterval, replaceVersion)) { + // There cannot be any duplicates because this version has not been committed before + newPendingSegmentVersions.put( + new SegmentCreateRequest( + UPGRADED_PENDING_SEGMENT_PREFIX + replaceVersion, + pendingSegmentId.toString(), + replaceVersion, + NumberedPartialShardSpec.instance() + ), + new SegmentIdWithShardSpec( + datasource, + replaceInterval, + replaceVersion, + new NumberedShardSpec(++currentPartitionNumber, numCorePartitions) + ) + ); + } + } + } + + // Do not skip lineage check so that the sequence_name_prev_id_sha1 + // includes hash of both sequence_name and prev_segment_id + int numInsertedPendingSegments = insertPendingSegmentsIntoMetastore( + handle, + newPendingSegmentVersions, + datasource, + false + ); + log.info( + "Inserted a total of [%d] new versions for [%d] pending segments.", + numInsertedPendingSegments, newPendingSegmentVersions.size() + ); + + return new HashSet<>(newPendingSegmentVersions.values()); + } + + private boolean shouldUpgradePendingSegment( + SegmentIdWithShardSpec pendingSegmentId, + String pendingSegmentSequenceName, + Interval replaceInterval, + String replaceVersion + ) + { + if (pendingSegmentId.getVersion().compareTo(replaceVersion) >= 0) { + return false; + } else if (!replaceInterval.contains(pendingSegmentId.getInterval())) { + return false; + } else { + // Do not upgrade an already upgraded pending segment + return pendingSegmentSequenceName == null + || 
!pendingSegmentSequenceName.startsWith(UPGRADED_PENDING_SEGMENT_PREFIX); + } + } + @Nullable private SegmentIdWithShardSpec allocatePendingSegmentWithSegmentLineageCheck( final Handle handle, @@ -716,7 +870,6 @@ private Map allocatePendingSegment handle, createdSegments, dataSource, - interval, skipSegmentLineageCheck ); @@ -982,7 +1135,7 @@ private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction( final String dataSource = appendSegments.iterator().next().getDataSource(); final Set upgradedSegments = connector.retryTransaction( (handle, transactionStatus) - -> getSegmentsToUpgradeOnAppend(handle, dataSource, appendSegments), + -> createUpgradedVersionsOfAppendSegments(handle, dataSource, appendSegments), 0, SQLMetadataConnector.DEFAULT_MAX_TRIES ); @@ -1030,11 +1183,10 @@ private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction( } } - private void insertPendingSegmentsIntoMetastore( + private int insertPendingSegmentsIntoMetastore( Handle handle, Map createdSegments, String dataSource, - Interval interval, boolean skipSegmentLineageCheck ) throws JsonProcessingException { @@ -1055,6 +1207,8 @@ private void insertPendingSegmentsIntoMetastore( for (Map.Entry entry : segmentIdToRequest.entrySet()) { final SegmentCreateRequest request = entry.getValue(); final SegmentIdWithShardSpec segmentId = entry.getKey(); + final Interval interval = segmentId.getInterval(); + insertBatch.add() .bind("id", segmentId.toString()) .bind("dataSource", dataSource) @@ -1069,7 +1223,8 @@ private void insertPendingSegmentsIntoMetastore( ) .bind("payload", jsonMapper.writeValueAsBytes(segmentId)); } - insertBatch.execute(); + int[] updated = insertBatch.execute(); + return Arrays.stream(updated).sum(); } private void insertPendingSegmentIntoMetastore( @@ -1105,7 +1260,7 @@ private void insertPendingSegmentIntoMetastore( } /** - * Allocates and returns any extra versions that need to be committed for the + * Creates and returns any extra versions that need to be committed for the * given append segments. *

* This is typically needed when a REPLACE task started and finished after @@ -1113,7 +1268,7 @@ private void insertPendingSegmentIntoMetastore( * there would be some used segments in the DB with versions higher than these * append segments. */ - private Set getSegmentsToUpgradeOnAppend( + private Set createUpgradedVersionsOfAppendSegments( Handle handle, String dataSource, Set segmentsToAppend @@ -1138,17 +1293,17 @@ private Set getSegmentsToUpgradeOnAppend( Segments.INCLUDING_OVERSHADOWED ); - final Map> committedVersionToIntervals = new HashMap<>(); - final Map> committedIntervalToSegments = new HashMap<>(); + final Map> overlappingVersionToIntervals = new HashMap<>(); + final Map> overlappingIntervalToSegments = new HashMap<>(); for (DataSegment segment : overlappingSegments) { - committedVersionToIntervals.computeIfAbsent(segment.getVersion(), v -> new HashSet<>()) + overlappingVersionToIntervals.computeIfAbsent(segment.getVersion(), v -> new HashSet<>()) .add(segment.getInterval()); - committedIntervalToSegments.computeIfAbsent(segment.getInterval(), i -> new HashSet<>()) + overlappingIntervalToSegments.computeIfAbsent(segment.getInterval(), i -> new HashSet<>()) .add(segment); } final Set upgradedSegments = new HashSet<>(); - for (Map.Entry> entry : committedVersionToIntervals.entrySet()) { + for (Map.Entry> entry : overlappingVersionToIntervals.entrySet()) { final String upgradeVersion = entry.getKey(); Map> segmentsToUpgrade = getSegmentsWithVersionLowerThan( upgradeVersion, @@ -1156,12 +1311,12 @@ private Set getSegmentsToUpgradeOnAppend( appendVersionToSegments ); for (Map.Entry> upgradeEntry : segmentsToUpgrade.entrySet()) { - Set segmentsUpgradedToVersion = upgradeSegmentsToVersion( + Set segmentsUpgradedToVersion = createUpgradedVersionOfSegments( handle, upgradeVersion, upgradeEntry.getKey(), upgradeEntry.getValue(), - committedIntervalToSegments + overlappingIntervalToSegments ); log.info("Upgraded [%d] segments to version[%s].", segmentsUpgradedToVersion.size(), upgradeVersion); upgradedSegments.addAll(segmentsUpgradedToVersion); @@ -1212,7 +1367,7 @@ private Map> getSegmentsWithVersionLowerThan( * Computes new Segment IDs for the {@code segmentsToUpgrade} being upgraded * to the given {@code upgradeVersion}. */ - private Set upgradeSegmentsToVersion( + private Set createUpgradedVersionOfSegments( Handle handle, String upgradeVersion, Interval interval, @@ -1237,7 +1392,7 @@ private Set upgradeSegmentsToVersion( // Get pending segments for the new version, if any final String dataSource = segmentsToUpgrade.iterator().next().getDataSource(); final Set pendingSegments - = getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval); + = getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval).keySet(); // Determine new IDs for each append segment by taking into account both // committed and pending segments for this version @@ -1337,7 +1492,7 @@ private Map createNewSegments( // A pending segment having a higher partitionId must also be considered // to avoid clashes when inserting the pending segment created here. 
final Set pendingSegments = - getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval); + getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval).keySet(); final Map createdSegments = new HashMap<>(); final Map sequenceHashToSegment = new HashMap<>(); @@ -1547,7 +1702,7 @@ private SegmentIdWithShardSpec createNewSegment( handle, dataSource, interval - ); + ).keySet(); if (committedMaxId != null) { pendings.add(committedMaxId); } @@ -1725,7 +1880,10 @@ private Set announceHistoricalSegmentBatch( return toInsertSegments; } - private Set getSegmentsToUpgradeOnReplace( + /** + * Creates new versions of segments appended while a REPLACE task was in progress. + */ + private Set createUpgradedVersionsOfAppendSegmentsAfterReplace( final Handle handle, final Set replaceSegments, final Set locksHeldByReplaceTask diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java index 6feaf9e07a38..b518a210a9cf 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java @@ -285,6 +285,10 @@ tableName, getPayloadType(), getQuoteString(), getCollation() StringUtils.format( "CREATE INDEX idx_%1$s_datasource_sequence ON %1$s(dataSource, sequence_name)", tableName + ), + StringUtils.format( + "CREATE INDEX idx_%1$s_datasource_sequence_prev_id ON %1$s(dataSource, sequence_prev_id)", + tableName ) ) ); From 260d7bdfa85a33c0cb91a63c4eb875dda88a17e9 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Mon, 9 Oct 2023 21:08:20 +0530 Subject: [PATCH 5/7] Use correct jdbi query object --- .../druid/metadata/IndexerSQLMetadataStorageCoordinator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 93d6bf932b5b..2aca6032ff98 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -636,7 +636,7 @@ private Set findAllVersionsOfPendingSegment( ) throws IOException { final Interval interval = pendingSegment.getInterval(); - final Query> query = handle + final Query> query = handle .createQuery( StringUtils.format( "SELECT payload " From b7a7e6f6cde6ded39c411d2381fc91e6ec2d6c1e Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Mon, 9 Oct 2023 21:09:02 +0530 Subject: [PATCH 6/7] Remove extra index on pending_segments table --- .../java/org/apache/druid/metadata/SQLMetadataConnector.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java index b518a210a9cf..6feaf9e07a38 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java @@ -285,10 +285,6 @@ tableName, getPayloadType(), getQuoteString(), getCollation() StringUtils.format( "CREATE INDEX idx_%1$s_datasource_sequence ON %1$s(dataSource, sequence_name)", tableName - ), - StringUtils.format( - "CREATE INDEX idx_%1$s_datasource_sequence_prev_id ON %1$s(dataSource, sequence_prev_id)", - tableName ) ) ); From 46a1b1e521db55d680de8a26c5cb8e1b1c0ac62b Mon Sep 17 
00:00:00 2001 From: Kashif Faraz Date: Mon, 9 Oct 2023 21:11:34 +0530 Subject: [PATCH 7/7] Fix SegmentTransactionalReplaceAction --- .../common/actions/SegmentTransactionalReplaceAction.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java index 238b6a0e7743..6001a5dc093f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java @@ -33,7 +33,6 @@ import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.timeline.DataSegment; -import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -123,7 +122,7 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) // transaction as the commit of replace segments and failure to upgrade // pending segments should not affect success of replace commit. try { - List upgradedPendingSegments = + Set upgradedPendingSegments = toolbox.getIndexerMetadataStorageCoordinator().upgradePendingSegments(segments); log.info( "Upgraded [%d] pending segments for REPLACE task[%s]: [%s]",
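Taken together, these patches leave a single rule for pending-segment upgrades: a pending segment is upgraded only when its version is strictly lower than the replace version, its interval lies fully inside the replace interval, and it was not itself created by an earlier upgrade. A standalone sketch of that decision (simplified: interval containment is passed in as a boolean, and versions compare lexicographically, as Druid version strings do):

    // Mirrors shouldUpgradePendingSegment() from patch 4, using the same
    // sequence-name prefix that marks rows created by an upgrade.
    class PendingSegmentUpgradeRule
    {
      static final String UPGRADED_PREFIX = "upgraded_to_replace_version_";

      static boolean shouldUpgrade(
          String pendingVersion,
          String pendingSequenceName,            // may be null
          boolean replaceIntervalContainsPending,
          String replaceVersion
      )
      {
        if (pendingVersion.compareTo(replaceVersion) >= 0) {
          return false;  // already at or above the replace version
        }
        if (!replaceIntervalContainsPending) {
          return false;  // only fully contained pending segments are upgraded
        }
        // Skip rows that are themselves the product of an earlier upgrade
        return pendingSequenceName == null
               || !pendingSequenceName.startsWith(UPGRADED_PREFIX);
      }
    }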