Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/updateTaskPendingSegmentMapping'…
Browse files Browse the repository at this point in the history
… into updateTaskPendingSegmentMapping
  • Loading branch information
AmatyaAvadhanula committed Oct 12, 2023
2 parents e893d1a + aeaaadd commit 538aea3
Show file tree
Hide file tree
Showing 13 changed files with 157 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ private void tryUpgradeOverlappingPendingSegments(Task task, TaskActionToolbox t

upgradedPendingSegments.forEach(
(oldId, newId) -> toolbox.getSupervisorManager()
.updatePendingSegmentMapping(activeSupervisorId.get(), oldId, newId)
.registerNewVersionOfPendingSegmentOnSupervisor(activeSupervisorId.get(), oldId, newId)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,16 +401,16 @@ private boolean tryLockWithDetermineResult(TaskActionClient client, LockGranular

/**
* Builds a TaskAction to publish segments based on the type of locks that this
* task acquires (determined by context property {@link Tasks#TASK_LOCK_TYPE}).
* task acquires.
*
* @see #determineLockType
*/
protected TaskAction<SegmentPublishResult> buildPublishAction(
Set<DataSegment> segmentsToBeOverwritten,
Set<DataSegment> segmentsToPublish
Set<DataSegment> segmentsToPublish,
TaskLockType lockType
)
{
TaskLockType lockType = TaskLockType.valueOf(
getContextValue(Tasks.TASK_LOCK_TYPE, Tasks.DEFAULT_TASK_LOCK_TYPE.name())
);
switch (lockType) {
case REPLACE:
return SegmentTransactionalReplaceAction.create(segmentsToPublish);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.druid.indexer.partitions.SecondaryPartitionType;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
Expand Down Expand Up @@ -910,10 +911,11 @@ private TaskStatus generateAndPublishSegments(
throw new UOE("[%s] secondary partition type is not supported", partitionsSpec.getType());
}


final TaskLockType taskLockType = getTaskLockHelper().getLockTypeToUse();
final TransactionalSegmentPublisher publisher =
(segmentsToBeOverwritten, segmentsToPublish, commitMetadata) ->
toolbox.getTaskActionClient().submit(buildPublishAction(segmentsToBeOverwritten, segmentsToPublish));
(segmentsToBeOverwritten, segmentsToPublish, commitMetadata) -> toolbox.getTaskActionClient().submit(
buildPublishAction(segmentsToBeOverwritten, segmentsToPublish, taskLockType)
);

String effectiveId = getContextValue(CompactionTask.CTX_KEY_APPENDERATOR_TRACKING_TASK_ID, null);
if (effectiveId == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
Expand Down Expand Up @@ -1167,9 +1168,11 @@ private void publishSegments(
}
}

final TaskLockType taskLockType = getTaskLockHelper().getLockTypeToUse();
final TransactionalSegmentPublisher publisher =
(segmentsToBeOverwritten, segmentsToPublish, commitMetadata) ->
toolbox.getTaskActionClient().submit(buildPublishAction(segmentsToBeOverwritten, segmentsToPublish));
(segmentsToBeOverwritten, segmentsToPublish, commitMetadata) -> toolbox.getTaskActionClient().submit(
buildPublishAction(segmentsToBeOverwritten, segmentsToPublish, taskLockType)
);

final boolean published =
newSegments.isEmpty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,15 +265,21 @@ public boolean checkPointDataSourceMetadata(
return false;
}

public boolean updatePendingSegmentMapping(
/**
* Registers a new version of the given pending segment on a supervisor. This
* allows the supervisor to include the pending segment in queries fired against
* that segment version.
*/
public boolean registerNewVersionOfPendingSegmentOnSupervisor(
String supervisorId,
SegmentIdWithShardSpec rootPendingSegment,
SegmentIdWithShardSpec upgradedPendingSegment
SegmentIdWithShardSpec basePendingSegment,
SegmentIdWithShardSpec newSegmentVersion
)
{
try {
Preconditions.checkNotNull(supervisorId, "supervisorId cannot be null");
Preconditions.checkNotNull(rootPendingSegment, "rootPendingSegment cannot be null");
Preconditions.checkNotNull(basePendingSegment, "rootPendingSegment cannot be null");
Preconditions.checkNotNull(newSegmentVersion, "newSegmentVersion cannot be null");

Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(supervisorId);
Preconditions.checkNotNull(supervisor, "supervisor could not be found");
Expand All @@ -282,7 +288,7 @@ public boolean updatePendingSegmentMapping(
}

SeekableStreamSupervisor<?, ?, ?> seekableStreamSupervisor = (SeekableStreamSupervisor<?, ?, ?>) supervisor.lhs;
seekableStreamSupervisor.updatePendingSegmentMapping(rootPendingSegment, upgradedPendingSegment);
seekableStreamSupervisor.registerNewVersionOfPendingSegment(basePendingSegment, newSegmentVersion);
return true;
}
catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.seekablestream;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;

/**
* Contains a new version of an existing base pending segment. Used by realtime
* tasks to serve queries against multiple versions of the same pending segment.
*/
public class PendingSegmentVersions
{
private final SegmentIdWithShardSpec baseSegment;
private final SegmentIdWithShardSpec newVersion;

@JsonCreator
public PendingSegmentVersions(
@JsonProperty("baseSegment") SegmentIdWithShardSpec baseSegment,
@JsonProperty("newVersion") SegmentIdWithShardSpec newVersion
)
{
this.baseSegment = baseSegment;
this.newVersion = newVersion;
}

@JsonProperty
public SegmentIdWithShardSpec getBaseSegment()
{
return baseSegment;
}

@JsonProperty
public SegmentIdWithShardSpec getNewVersion()
{
return newVersion;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public interface SeekableStreamIndexTaskClient<PartitionIdType, SequenceOffsetType>
Expand Down Expand Up @@ -159,15 +158,15 @@ ListenableFuture<Boolean> setEndOffsetsAsync(
* Update the task state to redirect queries for later versions to the root pending segment.
* The task also announces that it is serving the segments belonging to the subsequent versions.
* The update is processed only if the task is serving the original pending segment.
* @param id - task id
* @param rootPendingSegment - the pending segment that was originally allocated
* @param versionsOfPendingSegment - the ids belonging to the versions to which the root segment needs to be updated
* @param taskId - task id
* @param basePendingSegment - the pending segment that was originally allocated
* @param newVersionOfSegment - the ids belonging to the versions to which the root segment needs to be updated
* @return true if the update succeeds
*/
ListenableFuture<Boolean> updatePendingSegmentMappingAsync(
String id,
SegmentIdWithShardSpec rootPendingSegment,
Set<SegmentIdWithShardSpec> versionsOfPendingSegment
ListenableFuture<Boolean> registerNewVersionOfPendingSegmentAsync(
String taskId,
SegmentIdWithShardSpec basePendingSegment,
SegmentIdWithShardSpec newVersionOfSegment
);

Class<PartitionIdType> getPartitionType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -196,24 +195,17 @@ public ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> getEndOffsetsA
}

@Override
public ListenableFuture<Boolean> updatePendingSegmentMappingAsync(
String id,
SegmentIdWithShardSpec rootPendingSegment,
Set<SegmentIdWithShardSpec> versionsOfPendingSegment
public ListenableFuture<Boolean> registerNewVersionOfPendingSegmentAsync(
String taskId,
SegmentIdWithShardSpec basePendingSegment,
SegmentIdWithShardSpec newVersionOfSegment
)
{
if (versionsOfPendingSegment.isEmpty()) {
return Futures.immediateFuture(true);
}
final List<SegmentIdWithShardSpec> allVersionsOfPendingSegment = new ArrayList<>();
allVersionsOfPendingSegment.add(rootPendingSegment);
allVersionsOfPendingSegment.addAll(versionsOfPendingSegment);
final RequestBuilder requestBuilder = new RequestBuilder(
HttpMethod.POST,
"/pendingSegmentMapping"
).jsonContent(jsonMapper, allVersionsOfPendingSegment);
final RequestBuilder requestBuilder
= new RequestBuilder(HttpMethod.POST, "/pendingSegmentVersion")
.jsonContent(jsonMapper, new PendingSegmentVersions(basePendingSegment, newVersionOfSegment));

return makeRequest(id, requestBuilder)
return makeRequest(taskId, requestBuilder)
.handler(IgnoreHttpResponseHandler.INSTANCE)
.onSuccess(r -> true)
.go();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1553,17 +1553,20 @@ public Response setEndOffsetsHTTP(
}

@POST
@Path("pendingSegmentMapping")
@Path("/pendingSegmentVersion")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Response updatePendingSegmentMapping(
List<SegmentIdWithShardSpec> allVersionsOfPendingSegment,
public Response registerNewVersionOfPendingSegment(
PendingSegmentVersions pendingSegmentVersions,
// this field is only for internal purposes, shouldn't be usually set by users
@Context final HttpServletRequest req
)
{
authorizationCheck(req, Action.WRITE);
return updatePendingSegmentMapping(allVersionsOfPendingSegment);
return registerNewVersionOfPendingSegment(
pendingSegmentVersions.getBaseSegment(),
pendingSegmentVersions.getNewVersion()
);
}

public Map<String, Object> doGetRowStats()
Expand Down Expand Up @@ -1771,12 +1774,15 @@ public Response setEndOffsets(
return Response.ok(sequenceNumbers).build();
}

private Response updatePendingSegmentMapping(List<SegmentIdWithShardSpec> allVersionsOfPendingSegment)
private Response registerNewVersionOfPendingSegment(
SegmentIdWithShardSpec basePendingSegment,
SegmentIdWithShardSpec newSegmentVersion
)
{
try {
((StreamAppenderator) appenderator).updatePendingSegmentMapping(
allVersionsOfPendingSegment.get(0),
allVersionsOfPendingSegment.subList(1, allVersionsOfPendingSegment.size())
((StreamAppenderator) appenderator).registerNewVersionOfPendingSegment(
basePendingSegment,
newSegmentVersion
);
return Response.ok().build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1093,28 +1093,20 @@ public void resetOffsets(@Nonnull DataSourceMetadata resetDataSourceMetadata)
addNotice(new ResetOffsetsNotice(resetDataSourceMetadata));
}

public void updatePendingSegmentMapping(
SegmentIdWithShardSpec rootPendingSegment,
SegmentIdWithShardSpec upgradedPendingSegment
public void registerNewVersionOfPendingSegment(
SegmentIdWithShardSpec basePendingSegment,
SegmentIdWithShardSpec newSegmentVersion
)
{
for (TaskGroup taskGroup : activelyReadingTaskGroups.values()) {
for (String taskId : taskGroup.taskIds()) {
taskClient.updatePendingSegmentMappingAsync(
taskId,
rootPendingSegment,
Collections.singleton(upgradedPendingSegment)
);
taskClient.registerNewVersionOfPendingSegmentAsync(taskId, basePendingSegment, newSegmentVersion);
}
}
for (List<TaskGroup> taskGroupList : pendingCompletionTaskGroups.values()) {
for (TaskGroup taskGroup : taskGroupList) {
for (String taskId : taskGroup.taskIds()) {
taskClient.updatePendingSegmentMappingAsync(
taskId,
rootPendingSegment,
Collections.singleton(upgradedPendingSegment)
);
taskClient.registerNewVersionOfPendingSegmentAsync(taskId, basePendingSegment, newSegmentVersion);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
import org.joda.time.Interval;

import java.io.Closeable;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand All @@ -95,7 +94,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
private final Cache cache;
private final CacheConfig cacheConfig;
private final CachePopulatorStats cachePopulatorStats;
private final ConcurrentMap<SegmentDescriptor, SegmentDescriptor> newIdToRootPendingSegment
private final ConcurrentMap<SegmentDescriptor, SegmentDescriptor> newIdToBasePendingSegment
= new ConcurrentHashMap<>();

public SinkQuerySegmentWalker(
Expand Down Expand Up @@ -188,7 +187,7 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final
Iterable<QueryRunner<T>> perSegmentRunners = Iterables.transform(
specs,
newDescriptor -> {
final SegmentDescriptor descriptor = newIdToRootPendingSegment.getOrDefault(newDescriptor, newDescriptor);
final SegmentDescriptor descriptor = newIdToBasePendingSegment.getOrDefault(newDescriptor, newDescriptor);
final PartitionChunk<Sink> chunk = sinkTimeline.findChunk(
descriptor.getInterval(),
descriptor.getVersion(),
Expand Down Expand Up @@ -303,17 +302,15 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final
);
}

public void updatePendingSegmentMapping(
SegmentIdWithShardSpec rootPendingSegment,
List<SegmentIdWithShardSpec> versionsOfPendingSegment
public void registerNewVersionOfPendingSegment(
SegmentIdWithShardSpec basePendingSegment,
SegmentIdWithShardSpec newSegmentVersion
)
{
for (SegmentIdWithShardSpec versionOfPendingSegment : versionsOfPendingSegment) {
newIdToRootPendingSegment.put(
versionOfPendingSegment.asSegmentId().toDescriptor(),
rootPendingSegment.asSegmentId().toDescriptor()
);
}
newIdToBasePendingSegment.put(
newSegmentVersion.asSegmentId().toDescriptor(),
basePendingSegment.asSegmentId().toDescriptor()
);
}

@VisibleForTesting
Expand Down
Loading

0 comments on commit 538aea3

Please sign in to comment.