Enable the supervisor to update pending segment mappings in tasks
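
This change lets a supervisor push a pending segment's version mapping down to its running tasks: SeekableStreamSupervisor fans the update out to every actively reading and pending-completion task through the task client, each task's new updatePendingSegmentMapping endpoint hands the mapping to its StreamAppenderator, and SinkQuerySegmentWalker then redirects query descriptors for any upgraded version of the segment to the sink of the root pending segment.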
AmatyaAvadhanula committed Sep 26, 2023
1 parent: 7301e60 · commit: 4327aad
Showing 13 changed files with 161 additions and 1 deletion.
@@ -49,6 +49,7 @@
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.metadata.MetadataSupervisorManager;
import org.apache.druid.metadata.SqlSegmentsMetadataManager;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Duration;
import org.joda.time.Interval;
@@ -289,6 +290,12 @@ public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata)
// do nothing
}

@Override
public void updatePendingSegmentMapping(SegmentIdWithShardSpec rootPendingSegment)
{
// do nothing
}

@Override
public LagStats computeLagStats()
{
@@ -30,6 +30,7 @@
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.MetadataSupervisorManager;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;

import javax.annotation.Nullable;

@@ -247,6 +248,25 @@ public boolean checkPointDataSourceMetadata(
return false;
}

public boolean updatePendingSegmentMapping(String supervisorId, SegmentIdWithShardSpec rootPendingSegment)
{
try {
Preconditions.checkNotNull(supervisorId, "supervisorId cannot be null");
Preconditions.checkNotNull(rootPendingSegment, "rootPendingSegment cannot be null");

Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(supervisorId);

Preconditions.checkNotNull(supervisor, "supervisor could not be found");

supervisor.lhs.updatePendingSegmentMapping(rootPendingSegment);
return true;
}
catch (Exception e) {
log.error(e, "Pending segment mapping update request failed");
}
return false;
}
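
A minimal sketch of how an overlord-side caller might drive the manager method above; supervisorManager, the supervisor id, and all segment values are illustrative stand-ins, not part of this commit.

import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;

// Hedged sketch: ask the manager to propagate a pending segment's mapping to the
// supervisor's tasks. All concrete values here are made up for illustration.
SegmentIdWithShardSpec rootPendingSegment = new SegmentIdWithShardSpec(
    "wikipedia",                          // dataSource (illustrative)
    Intervals.of("2023-09-26/P1D"),       // interval of the pending segment
    "2023-09-26T00:00:00.000Z",           // version (illustrative)
    new NumberedShardSpec(0, 0)           // shard spec of the root pending segment
);

// Returns false (after logging) if the supervisor is missing or the call fails.
boolean accepted = supervisorManager.updatePendingSegmentMapping("wikipedia_supervisor", rootPendingSegment);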


/**
* Stops a supervisor with a given id and then removes it from the list.
@@ -22,10 +22,12 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.joda.time.DateTime;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public interface SeekableStreamIndexTaskClient<PartitionIdType, SequenceOffsetType>
@@ -153,6 +155,12 @@ ListenableFuture<Boolean> setEndOffsetsAsync(
*/
ListenableFuture<SeekableStreamIndexTaskRunner.Status> getStatusAsync(String id);

/**
* Updates the given task's mapping from the root pending segment to the ids of all of its
* versions, so that queries addressed to any version can be served by the root segment's sink.
*/
ListenableFuture<Boolean> updatePendingSegmentMapping(
String id,
SegmentIdWithShardSpec rootPendingSegment,
Set<SegmentIdWithShardSpec> versionsOfPendingSegment
);

Class<PartitionIdType> getPartitionType();

Class<SequenceOffsetType> getSequenceType();
@@ -37,6 +37,7 @@
import org.apache.druid.indexing.common.TaskInfoProvider;
import org.apache.druid.java.util.common.Either;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
@@ -57,6 +58,7 @@
import org.apache.druid.rpc.StandardRetryPolicy;
import org.apache.druid.rpc.indexing.SpecificTaskRetryPolicy;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.DateTime;
@@ -68,6 +70,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -193,6 +196,24 @@ public ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> getEndOffsetsAsync(
.go();
}

@Override
public ListenableFuture<Boolean> updatePendingSegmentMapping(
String id,
SegmentIdWithShardSpec rootPendingSegment,
Set<SegmentIdWithShardSpec> versionsOfPendingSegment
)
{
final RequestBuilder requestBuilder = new RequestBuilder(
HttpMethod.POST,
"updatePendingSegmentMapping"
).jsonContent(jsonMapper, Pair.of(rootPendingSegment, versionsOfPendingSegment));

return makeRequest(id, requestBuilder)
.handler(IgnoreHttpResponseHandler.INSTANCE)
.onSuccess(r -> true)
.go();
}
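
A hedged sketch of consuming the future this client method returns; taskId, the segment arguments, and callbackExec are illustrative. Note that the supervisor change later in this commit fires these calls without awaiting them.

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;

// Illustrative only: attach a callback rather than blocking on the future.
ListenableFuture<Boolean> update =
    taskClient.updatePendingSegmentMapping(taskId, rootPendingSegment, versionsOfPendingSegment);
Futures.addCallback(
    update,
    new FutureCallback<Boolean>()
    {
      @Override
      public void onSuccess(Boolean result)
      {
        // IgnoreHttpResponseHandler discards the body, so success just means the POST succeeded.
      }

      @Override
      public void onFailure(Throwable t)
      {
        // Illustrative policy: log and rely on a later supervisor pass to retry.
      }
    },
    callbackExec
);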

@Override
public ListenableFuture<Boolean> setEndOffsetsAsync(
final String id,
@@ -72,6 +72,7 @@
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
@@ -82,7 +83,9 @@
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
import org.apache.druid.segment.realtime.appenderator.StreamAppenderator;
import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.server.security.Access;
@@ -1538,6 +1541,20 @@ public Response setEndOffsetsHTTP(
return setEndOffsets(sequences, finish);
}

@POST
@Path("updatePendingSegmentMapping")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Response updatePendingSegmentMapping(
// this body is for internal use only and is not usually set by users directly
Pair<SegmentIdWithShardSpec, Set<SegmentIdWithShardSpec>> rootPendingSegmentToVersionMapping,
@Context final HttpServletRequest req
)
{
authorizationCheck(req, Action.WRITE);
return updatePendingSegmentMapping(rootPendingSegmentToVersionMapping.lhs, rootPendingSegmentToVersionMapping.rhs);
}
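
The body this endpoint consumes mirrors what the async client above posts. A rough sketch of producing that payload follows; jsonMapper stands for the task's configured ObjectMapper (name assumed), and that Druid's Pair round-trips through Jackson as a two-field object is an assumption based on the client call, not something this diff spells out.

// Hedged sketch: serialize the same Pair the client sends.
byte[] body = jsonMapper.writeValueAsBytes(
    Pair.of(rootPendingSegment, versionsOfPendingSegment)
);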

public Map<String, Object> doGetRowStats()
{
Map<String, Object> returnMap = new HashMap<>();
@@ -1742,6 +1759,15 @@ public Response setEndOffsets(
return Response.ok(sequenceNumbers).build();
}

private Response updatePendingSegmentMapping(
SegmentIdWithShardSpec rootPendingSegment,
Set<SegmentIdWithShardSpec> versionsOfPendingSegment
)
{
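// Assumes the task's appenderator is a StreamAppenderator; the cast below would fail for any other implementation.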
((StreamAppenderator) appenderator).updatePendingSegmentMapping(rootPendingSegment, versionsOfPendingSegment);
return Response.ok().build();
}

private void resetNextCheckpointTime()
{
nextCheckpointTime = DateTimes.nowUtc().plus(tuningConfig.getIntermediateHandoffPeriod()).getMillis();
@@ -97,6 +97,7 @@
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.joda.time.DateTime;

import javax.annotation.Nonnull;
@@ -1092,6 +1093,31 @@ public void resetOffsets(@Nonnull DataSourceMetadata resetDataSourceMetadata)
addNotice(new ResetOffsetsNotice(resetDataSourceMetadata));
}

@Override
public void updatePendingSegmentMapping(SegmentIdWithShardSpec rootPendingSegment)
{
// Look up the segment's versions once, rather than once per task.
final Set<SegmentIdWithShardSpec> versionsOfPendingSegment =
indexerMetadataStorageCoordinator.getAllVersionsOfPendingSegment(rootPendingSegment);
// Fan the mapping out to every task, whether actively reading or pending completion.
// The returned futures are not awaited, so the update is fire-and-forget.
for (TaskGroup taskGroup : activelyReadingTaskGroups.values()) {
for (String taskId : taskGroup.taskIds()) {
taskClient.updatePendingSegmentMapping(taskId, rootPendingSegment, versionsOfPendingSegment);
}
}
for (List<TaskGroup> taskGroupList : pendingCompletionTaskGroups.values()) {
for (TaskGroup taskGroup : taskGroupList) {
for (String taskId : taskGroup.taskIds()) {
taskClient.updatePendingSegmentMapping(taskId, rootPendingSegment, versionsOfPendingSegment);
}
}
}
}

public ReentrantLock getRecordSupplierLock()
{
return recordSupplierLock;
@@ -253,6 +253,12 @@ public DataSegment retrieveSegmentForId(final String id, boolean includeUnused)
return null;
}

@Override
public Set<SegmentIdWithShardSpec> getAllVersionsOfPendingSegment(SegmentIdWithShardSpec rootPendingSegment)
{
return Collections.emptySet();
}

public Set<DataSegment> getPublished()
{
return ImmutableSet.copyOf(published);
@@ -406,4 +406,6 @@ SegmentPublishResult commitMetadataOnly(
*/
DataSegment retrieveSegmentForId(String id, boolean includeUnused);

/** Retrieves the ids of all versions of the given root pending segment. */
Set<SegmentIdWithShardSpec> getAllVersionsOfPendingSegment(SegmentIdWithShardSpec rootPendingSegment);

}
@@ -26,6 +26,7 @@
import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.server.security.ResourceAction;

import javax.annotation.Nonnull;
@@ -185,6 +186,11 @@ public int getActiveTaskGroupsCount()
{
return -1;
}

@Override
public void updatePendingSegmentMapping(SegmentIdWithShardSpec rootPendingSegment)
{
// do nothing
}
};
}

@@ -25,6 +25,7 @@
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;

import javax.annotation.Nullable;
import java.util.List;
@@ -87,6 +88,8 @@ default Boolean isHealthy()
*/
void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata);

/** Propagates the mapping from the given root pending segment to all of its versions in the supervisor's tasks. */
void updatePendingSegmentMapping(SegmentIdWithShardSpec rootPendingSegment);

/**
* Computes maxLag, totalLag and avgLag
*/
@@ -2338,6 +2338,12 @@ public DataSegment retrieveSegmentForId(final String id, boolean includeUnused)
);
}

@Override
public Set<SegmentIdWithShardSpec> getAllVersionsOfPendingSegment(SegmentIdWithShardSpec rootPendingSegment)
{
return Collections.emptySet();
}

private static class PendingSegmentsRecord
{
private final String sequenceName;
@@ -69,7 +69,10 @@
import org.joda.time.Interval;

import java.io.Closeable;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

@@ -92,6 +95,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
private final Cache cache;
private final CacheConfig cacheConfig;
private final CachePopulatorStats cachePopulatorStats;
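// Maps the descriptor of each upgraded version of a pending segment to the root pending
// segment's descriptor, so queries addressed to any version are served by the root sink.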
private final Map<SegmentDescriptor, SegmentDescriptor> newIdToRootPendingSegment = new ConcurrentHashMap<>();

public SinkQuerySegmentWalker(
String dataSource,
@@ -182,7 +186,8 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs)

Iterable<QueryRunner<T>> perSegmentRunners = Iterables.transform(
specs,
descriptor -> {
newDescriptor -> {
final SegmentDescriptor descriptor = newIdToRootPendingSegment.getOrDefault(newDescriptor, newDescriptor);
final PartitionChunk<Sink> chunk = sinkTimeline.findChunk(
descriptor.getInterval(),
descriptor.getVersion(),
@@ -297,6 +302,19 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs)
);
}

public void updatePendingSegmentMapping(
SegmentIdWithShardSpec rootPendingSegment,
Set<SegmentIdWithShardSpec> versionsOfPendingSegment
)
{
for (SegmentIdWithShardSpec versionOfPendingSegment : versionsOfPendingSegment) {
newIdToRootPendingSegment.put(
versionOfPendingSegment.asSegmentId().toDescriptor(),
rootPendingSegment.asSegmentId().toDescriptor()
);
}
}
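
A self-contained toy illustration of the getOrDefault redirect performed in getQueryRunnerForSegments above; the string ids are made up and stand in for real SegmentDescriptors.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RemapSketch
{
  public static void main(String[] args)
  {
    // Stand-in for newIdToRootPendingSegment, keyed by descriptor.
    Map<String, String> newIdToRoot = new ConcurrentHashMap<>();
    newIdToRoot.put("wiki/2023-09-26/v2/0", "wiki/2023-09-26/v1/0");

    // A descriptor for the upgraded id resolves to the root sink's descriptor...
    System.out.println(resolve(newIdToRoot, "wiki/2023-09-26/v2/0")); // wiki/2023-09-26/v1/0
    // ...while an unmapped descriptor passes through unchanged.
    System.out.println(resolve(newIdToRoot, "wiki/2023-09-26/v9/0")); // wiki/2023-09-26/v9/0
  }

  static String resolve(Map<String, String> mapping, String descriptor)
  {
    return mapping.getOrDefault(descriptor, descriptor);
  }
}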

@VisibleForTesting
String getDataSource()
{
@@ -1026,6 +1026,17 @@ public void closeNow()
}
}

public void updatePendingSegmentMapping(
SegmentIdWithShardSpec rootPendingSegment,
Set<SegmentIdWithShardSpec> versionsOfPendingSegment
)
{
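// Only register the redirect while this appenderator still serves an active sink for the root pending segment.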
if (!sinks.containsKey(rootPendingSegment) || droppingSinks.contains(rootPendingSegment)) {
return;
}
((SinkQuerySegmentWalker) texasRanger).updatePendingSegmentMapping(rootPendingSegment, versionsOfPendingSegment);
}

private void lockBasePersistDirectory()
{
if (basePersistDirLock == null) {
