diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java index 9d1e5bd2862d..a9baf10ece40 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java @@ -153,7 +153,7 @@ private void tryUpgradeOverlappingPendingSegments(Task task, TaskActionToolbox t upgradedPendingSegments.forEach( (oldId, newId) -> toolbox.getSupervisorManager() - .updatePendingSegmentMapping(activeSupervisorId.get(), oldId, newId) + .registerNewVersionOfPendingSegmentOnSupervisor(activeSupervisorId.get(), oldId, newId) ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java index 276d82eedc84..fe19b35391e9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java @@ -401,16 +401,16 @@ private boolean tryLockWithDetermineResult(TaskActionClient client, LockGranular /** * Builds a TaskAction to publish segments based on the type of locks that this - * task acquires (determined by context property {@link Tasks#TASK_LOCK_TYPE}). + * task acquires. 
+ * + * @see #determineLockType */ protected TaskAction buildPublishAction( Set segmentsToBeOverwritten, - Set segmentsToPublish + Set segmentsToPublish, + TaskLockType lockType ) { - TaskLockType lockType = TaskLockType.valueOf( - getContextValue(Tasks.TASK_LOCK_TYPE, Tasks.DEFAULT_TASK_LOCK_TYPE.name()) - ); switch (lockType) { case REPLACE: return SegmentTransactionalReplaceAction.create(segmentsToPublish); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index a2ca4f869ea7..d880f3eb86a4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -50,6 +50,7 @@ import org.apache.druid.indexer.partitions.SecondaryPartitionType; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; +import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder; import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; @@ -910,10 +911,11 @@ private TaskStatus generateAndPublishSegments( throw new UOE("[%s] secondary partition type is not supported", partitionsSpec.getType()); } - + final TaskLockType taskLockType = getTaskLockHelper().getLockTypeToUse(); final TransactionalSegmentPublisher publisher = - (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) -> - toolbox.getTaskActionClient().submit(buildPublishAction(segmentsToBeOverwritten, segmentsToPublish)); + (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) -> toolbox.getTaskActionClient().submit( + buildPublishAction(segmentsToBeOverwritten, segmentsToPublish, taskLockType) + ); String effectiveId = 
getContextValue(CompactionTask.CTX_KEY_APPENDERATOR_TRACKING_TASK_ID, null); if (effectiveId == null) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index d3e218623cda..e99ef35d9423 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -42,6 +42,7 @@ import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; +import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.TaskActionClient; @@ -1167,9 +1168,11 @@ private void publishSegments( } } + final TaskLockType taskLockType = getTaskLockHelper().getLockTypeToUse(); final TransactionalSegmentPublisher publisher = - (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) -> - toolbox.getTaskActionClient().submit(buildPublishAction(segmentsToBeOverwritten, segmentsToPublish)); + (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) -> toolbox.getTaskActionClient().submit( + buildPublishAction(segmentsToBeOverwritten, segmentsToPublish, taskLockType) + ); final boolean published = newSegments.isEmpty() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index 1e0cc3ece8d6..177123c900b6 100644 --- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -265,15 +265,21 @@ public boolean checkPointDataSourceMetadata( return false; } - public boolean updatePendingSegmentMapping( + /** + * Registers a new version of the given pending segment on a supervisor. This + * allows the supervisor to include the pending segment in queries fired against + * that segment version. + */ + public boolean registerNewVersionOfPendingSegmentOnSupervisor( String supervisorId, - SegmentIdWithShardSpec rootPendingSegment, - SegmentIdWithShardSpec upgradedPendingSegment + SegmentIdWithShardSpec basePendingSegment, + SegmentIdWithShardSpec newSegmentVersion ) { try { Preconditions.checkNotNull(supervisorId, "supervisorId cannot be null"); - Preconditions.checkNotNull(rootPendingSegment, "rootPendingSegment cannot be null"); + Preconditions.checkNotNull(basePendingSegment, "basePendingSegment cannot be null"); + Preconditions.checkNotNull(newSegmentVersion, "newSegmentVersion cannot be null"); Pair supervisor = supervisors.get(supervisorId); Preconditions.checkNotNull(supervisor, "supervisor could not be found"); @@ -282,7 +288,7 @@ public boolean updatePendingSegmentMapping( } SeekableStreamSupervisor seekableStreamSupervisor = (SeekableStreamSupervisor) supervisor.lhs; - seekableStreamSupervisor.updatePendingSegmentMapping(rootPendingSegment, upgradedPendingSegment); + seekableStreamSupervisor.registerNewVersionOfPendingSegment(basePendingSegment, newSegmentVersion); return true; } catch (Exception e) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/PendingSegmentVersions.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/PendingSegmentVersions.java new file mode 100644 index 000000000000..146b0afc4b9d --- /dev/null +++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/PendingSegmentVersions.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; + +/** + * Contains a new version of an existing base pending segment. Used by realtime + * tasks to serve queries against multiple versions of the same pending segment. 
+ */ +public class PendingSegmentVersions +{ + private final SegmentIdWithShardSpec baseSegment; + private final SegmentIdWithShardSpec newVersion; + + @JsonCreator + public PendingSegmentVersions( + @JsonProperty("baseSegment") SegmentIdWithShardSpec baseSegment, + @JsonProperty("newVersion") SegmentIdWithShardSpec newVersion + ) + { + this.baseSegment = baseSegment; + this.newVersion = newVersion; + } + + @JsonProperty + public SegmentIdWithShardSpec getBaseSegment() + { + return baseSegment; + } + + @JsonProperty + public SegmentIdWithShardSpec getNewVersion() + { + return newVersion; + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java index 0c749d45cf7a..5e5924249608 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java @@ -27,7 +27,6 @@ import java.util.List; import java.util.Map; -import java.util.Set; import java.util.TreeMap; public interface SeekableStreamIndexTaskClient @@ -159,15 +158,15 @@ ListenableFuture setEndOffsetsAsync( * Update the task state to redirect queries for later versions to the root pending segment. * The task also announces that it is serving the segments belonging to the subsequent versions. * The update is processed only if the task is serving the original pending segment. 
- * @param id - task id - * @param rootPendingSegment - the pending segment that was originally allocated - * @param versionsOfPendingSegment - the ids belonging to the versions to which the root segment needs to be updated + * @param taskId - task id + * @param basePendingSegment - the pending segment that was originally allocated + * @param newVersionOfSegment - the new version of the base pending segment to be registered + * @return true if the update succeeds */ - ListenableFuture updatePendingSegmentMappingAsync( - String id, - SegmentIdWithShardSpec rootPendingSegment, - Set versionsOfPendingSegment + ListenableFuture registerNewVersionOfPendingSegmentAsync( + String taskId, + SegmentIdWithShardSpec basePendingSegment, + SegmentIdWithShardSpec newVersionOfSegment ); Class getPartitionType(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl.java index 66d2bed6fdeb..40d475909e68 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl.java @@ -69,7 +69,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -196,24 +195,17 @@ public ListenableFuture> getEndOffsetsA } @Override - public ListenableFuture updatePendingSegmentMappingAsync( - String id, - SegmentIdWithShardSpec rootPendingSegment, - Set versionsOfPendingSegment + public ListenableFuture registerNewVersionOfPendingSegmentAsync( + String taskId, + SegmentIdWithShardSpec basePendingSegment, + SegmentIdWithShardSpec newVersionOfSegment ) { - if 
(versionsOfPendingSegment.isEmpty()) { - return Futures.immediateFuture(true); - } - final List allVersionsOfPendingSegment = new ArrayList<>(); - allVersionsOfPendingSegment.add(rootPendingSegment); - allVersionsOfPendingSegment.addAll(versionsOfPendingSegment); - final RequestBuilder requestBuilder = new RequestBuilder( - HttpMethod.POST, - "/pendingSegmentMapping" - ).jsonContent(jsonMapper, allVersionsOfPendingSegment); + final RequestBuilder requestBuilder + = new RequestBuilder(HttpMethod.POST, "/pendingSegmentVersion") + .jsonContent(jsonMapper, new PendingSegmentVersions(basePendingSegment, newVersionOfSegment)); - return makeRequest(id, requestBuilder) + return makeRequest(taskId, requestBuilder) .handler(IgnoreHttpResponseHandler.INSTANCE) .onSuccess(r -> true) .go(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index fb4ff1a3876f..537a04f3c84a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -1553,17 +1553,20 @@ public Response setEndOffsetsHTTP( } @POST - @Path("pendingSegmentMapping") + @Path("/pendingSegmentVersion") @Consumes(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON) - public Response updatePendingSegmentMapping( - List allVersionsOfPendingSegment, + public Response registerNewVersionOfPendingSegment( + PendingSegmentVersions pendingSegmentVersions, // this field is only for internal purposes, shouldn't be usually set by users @Context final HttpServletRequest req ) { authorizationCheck(req, Action.WRITE); - return updatePendingSegmentMapping(allVersionsOfPendingSegment); + return registerNewVersionOfPendingSegment( + pendingSegmentVersions.getBaseSegment(), + 
pendingSegmentVersions.getNewVersion() + ); } public Map doGetRowStats() @@ -1771,12 +1774,15 @@ public Response setEndOffsets( return Response.ok(sequenceNumbers).build(); } - private Response updatePendingSegmentMapping(List allVersionsOfPendingSegment) + private Response registerNewVersionOfPendingSegment( + SegmentIdWithShardSpec basePendingSegment, + SegmentIdWithShardSpec newSegmentVersion + ) { try { - ((StreamAppenderator) appenderator).updatePendingSegmentMapping( - allVersionsOfPendingSegment.get(0), - allVersionsOfPendingSegment.subList(1, allVersionsOfPendingSegment.size()) + ((StreamAppenderator) appenderator).registerNewVersionOfPendingSegment( + basePendingSegment, + newSegmentVersion ); return Response.ok().build(); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 4e1850f2f3ef..1d05169e3fb1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -1093,28 +1093,20 @@ public void resetOffsets(@Nonnull DataSourceMetadata resetDataSourceMetadata) addNotice(new ResetOffsetsNotice(resetDataSourceMetadata)); } - public void updatePendingSegmentMapping( - SegmentIdWithShardSpec rootPendingSegment, - SegmentIdWithShardSpec upgradedPendingSegment + public void registerNewVersionOfPendingSegment( + SegmentIdWithShardSpec basePendingSegment, + SegmentIdWithShardSpec newSegmentVersion ) { for (TaskGroup taskGroup : activelyReadingTaskGroups.values()) { for (String taskId : taskGroup.taskIds()) { - taskClient.updatePendingSegmentMappingAsync( - taskId, - rootPendingSegment, - Collections.singleton(upgradedPendingSegment) - ); + 
taskClient.registerNewVersionOfPendingSegmentAsync(taskId, basePendingSegment, newSegmentVersion); } } for (List taskGroupList : pendingCompletionTaskGroups.values()) { for (TaskGroup taskGroup : taskGroupList) { for (String taskId : taskGroup.taskIds()) { - taskClient.updatePendingSegmentMappingAsync( - taskId, - rootPendingSegment, - Collections.singleton(upgradedPendingSegment) - ); + taskClient.registerNewVersionOfPendingSegmentAsync(taskId, basePendingSegment, newSegmentVersion); } } } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java index 00260ea1d254..81cd4db25561 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java @@ -69,7 +69,6 @@ import org.joda.time.Interval; import java.io.Closeable; -import java.util.List; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -95,7 +94,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker private final Cache cache; private final CacheConfig cacheConfig; private final CachePopulatorStats cachePopulatorStats; - private final ConcurrentMap newIdToRootPendingSegment + private final ConcurrentMap newIdToBasePendingSegment = new ConcurrentHashMap<>(); public SinkQuerySegmentWalker( @@ -188,7 +187,7 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final Iterable> perSegmentRunners = Iterables.transform( specs, newDescriptor -> { - final SegmentDescriptor descriptor = newIdToRootPendingSegment.getOrDefault(newDescriptor, newDescriptor); + final SegmentDescriptor descriptor = newIdToBasePendingSegment.getOrDefault(newDescriptor, newDescriptor); final PartitionChunk chunk = sinkTimeline.findChunk( 
descriptor.getInterval(), descriptor.getVersion(), @@ -303,17 +302,15 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final ); } - public void updatePendingSegmentMapping( - SegmentIdWithShardSpec rootPendingSegment, - List versionsOfPendingSegment + public void registerNewVersionOfPendingSegment( + SegmentIdWithShardSpec basePendingSegment, + SegmentIdWithShardSpec newSegmentVersion ) { - for (SegmentIdWithShardSpec versionOfPendingSegment : versionsOfPendingSegment) { - newIdToRootPendingSegment.put( - versionOfPendingSegment.asSegmentId().toDescriptor(), - rootPendingSegment.asSegmentId().toDescriptor() - ); - } + newIdToBasePendingSegment.put( + newSegmentVersion.asSegmentId().toDescriptor(), + basePendingSegment.asSegmentId().toDescriptor() + ); } @VisibleForTesting diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java index 2209cf082deb..c02e23b21791 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java @@ -1059,36 +1059,35 @@ private void unannounceRootSegmentAndUpgradedVersions(Sink sink) throws IOExcept } } - public void updatePendingSegmentMapping( - SegmentIdWithShardSpec rootPendingSegment, - List versionsOfPendingSegment + public void registerNewVersionOfPendingSegment( + SegmentIdWithShardSpec basePendingSegment, + SegmentIdWithShardSpec newSegmentVersion ) throws IOException { - if (!sinks.containsKey(rootPendingSegment) || droppingSinks.contains(rootPendingSegment)) { + if (!sinks.containsKey(basePendingSegment) || droppingSinks.contains(basePendingSegment)) { return; } // Update query mapping with SinkQuerySegmentWalker - ((SinkQuerySegmentWalker) texasRanger).updatePendingSegmentMapping(rootPendingSegment, versionsOfPendingSegment); + 
((SinkQuerySegmentWalker) texasRanger).registerNewVersionOfPendingSegment(basePendingSegment, newSegmentVersion); // Announce segments - rootPendingSegmentToNewerVersions.putIfAbsent(rootPendingSegment.asSegmentId(), new HashSet<>()); - final DataSegment rootSegment = sinks.get(rootPendingSegment).getSegment(); - for (SegmentIdWithShardSpec idWithShardSpec : versionsOfPendingSegment) { - final DataSegment newSegment = new DataSegment( - idWithShardSpec.getDataSource(), - idWithShardSpec.getInterval(), - idWithShardSpec.getVersion(), - rootSegment.getLoadSpec(), - rootSegment.getDimensions(), - rootSegment.getMetrics(), - idWithShardSpec.getShardSpec(), - rootSegment.getBinaryVersion(), - rootSegment.getSize() - ); - segmentAnnouncer.announceSegment(newSegment); - rootPendingSegmentToNewerVersions.get(rootPendingSegment.asSegmentId()).add(idWithShardSpec); - } + rootPendingSegmentToNewerVersions.putIfAbsent(basePendingSegment.asSegmentId(), new HashSet<>()); + final DataSegment rootSegment = sinks.get(basePendingSegment).getSegment(); + + final DataSegment newSegment = new DataSegment( + newSegmentVersion.getDataSource(), + newSegmentVersion.getInterval(), + newSegmentVersion.getVersion(), + rootSegment.getLoadSpec(), + rootSegment.getDimensions(), + rootSegment.getMetrics(), + newSegmentVersion.getShardSpec(), + rootSegment.getBinaryVersion(), + rootSegment.getSize() + ); + segmentAnnouncer.announceSegment(newSegment); + rootPendingSegmentToNewerVersions.get(basePendingSegment.asSegmentId()).add(newSegmentVersion); } private void lockBasePersistDirectory() diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java index 879cfc6eb11c..301d9631b7d8 100644 --- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java +++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java @@ -871,18 +871,18 @@ public Response 
isHandOffComplete( final Interval theInterval = Intervals.of(interval); final SegmentDescriptor descriptor = new SegmentDescriptor(theInterval, version, partitionNumber); final DateTime now = DateTimes.nowUtc(); - // dropped means a segment will never be handed off, i.e it completed hand off - // init to true, reset to false only if this segment can be loaded by rules - boolean dropped = true; + + // A segment that is not eligible for load will never be handed off + boolean notEligibleForLoad = true; for (Rule rule : rules) { if (rule.appliesTo(theInterval, now)) { if (rule instanceof LoadRule) { - dropped = false; + notEligibleForLoad = false; } break; } } - if (dropped) { + if (notEligibleForLoad) { return Response.ok(true).build(); } @@ -894,10 +894,11 @@ public Response isHandOffComplete( return Response.ok(false).build(); } - // If the segment being handed off has a lower version than the current chunk's, do not wait. - // This can happen when a concurrent replace occurs and there are multiple versions of segments being appended - if (!timeline.lookup(Intervals.of(interval)).isEmpty() - && timeline.lookup(Intervals.of(interval)).get(0).getVersion().compareTo(version) > 0) { + // A segment with version lower than that of the latest chunk might never get handed off + // If there are multiple versions of this segment (due to a concurrent replace task), + // only the latest version would get handed off + List> timelineObjects = timeline.lookup(Intervals.of(interval)); + if (!timelineObjects.isEmpty() && timelineObjects.get(0).getVersion().compareTo(version) > 0) { return Response.ok(true).build(); }