From 4327aad749b0aadeec4fea8be7bfde6c6b5d126e Mon Sep 17 00:00:00 2001 From: Amatya Date: Tue, 26 Sep 2023 16:44:04 +0530 Subject: [PATCH] Facilitate supervisor to update pending segment mapping in tasks --- .../MaterializedViewSupervisor.java | 7 +++++ .../supervisor/SupervisorManager.java | 20 ++++++++++++++ .../SeekableStreamIndexTaskClient.java | 8 ++++++ ...eekableStreamIndexTaskClientAsyncImpl.java | 21 +++++++++++++++ .../SeekableStreamIndexTaskRunner.java | 26 +++++++++++++++++++ .../supervisor/SeekableStreamSupervisor.java | 26 +++++++++++++++++++ ...TestIndexerMetadataStorageCoordinator.java | 6 +++++ .../IndexerMetadataStorageCoordinator.java | 2 ++ .../supervisor/NoopSupervisorSpec.java | 6 +++++ .../overlord/supervisor/Supervisor.java | 3 +++ .../IndexerSQLMetadataStorageCoordinator.java | 6 +++++ .../appenderator/SinkQuerySegmentWalker.java | 20 +++++++++++++- .../appenderator/StreamAppenderator.java | 11 ++++++++ 13 files changed, 161 insertions(+), 1 deletion(-) diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java index ac2738534da9..b4ba7ac1c2ef 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java @@ -49,6 +49,7 @@ import org.apache.druid.metadata.EntryExistsException; import org.apache.druid.metadata.MetadataSupervisorManager; import org.apache.druid.metadata.SqlSegmentsMetadataManager; +import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.timeline.DataSegment; import org.joda.time.Duration; import org.joda.time.Interval; @@ -289,6 +290,12 @@ public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata) // do nothing } + @Override + public void updatePendingSegmentMapping(SegmentIdWithShardSpec rootPendingSegment) + { + // do nothing + } + @Override public LagStats computeLagStats() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index 2cd926bae907..6fdf1f7a1400 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -30,6 +30,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.metadata.MetadataSupervisorManager; import org.apache.druid.segment.incremental.ParseExceptionReport; +import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import javax.annotation.Nullable; @@ -247,6 +248,25 @@ public boolean checkPointDataSourceMetadata( return false; } + public boolean updatePendingSegmentMapping(String supervisorId, SegmentIdWithShardSpec rootPendingSegment) + { + try { + Preconditions.checkNotNull(supervisorId, "supervisorId cannot be null"); + Preconditions.checkNotNull(rootPendingSegment, "rootPendingSegment cannot be null"); + + Pair supervisor = supervisors.get(supervisorId); + + Preconditions.checkNotNull(supervisor, "supervisor could not be found"); + + supervisor.lhs.updatePendingSegmentMapping(rootPendingSegment); + return true; + } + catch (Exception e) { + log.error(e, "Pending segment mapping update request failed"); + } + return false; + } + /** * Stops a supervisor with a given id and then removes it from the list. diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java index 18631626d0f6..806bcd6c692c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java @@ -22,10 +22,12 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.segment.incremental.ParseExceptionReport; +import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.joda.time.DateTime; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; public interface SeekableStreamIndexTaskClient @@ -153,6 +155,12 @@ ListenableFuture setEndOffsetsAsync( */ ListenableFuture getStatusAsync(String id); + ListenableFuture updatePendingSegmentMapping( + String id, + SegmentIdWithShardSpec rootPendingSegment, + Set versionsOfPendingSegment + ); + Class getPartitionType(); Class getSequenceType(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl.java index 9d6d49e00bf2..8b7b9c616775 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl.java @@ -37,6 +37,7 @@ import org.apache.druid.indexing.common.TaskInfoProvider; import org.apache.druid.java.util.common.Either; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.jackson.JacksonUtils; @@ -57,6 +58,7 @@ import org.apache.druid.rpc.StandardRetryPolicy; import org.apache.druid.rpc.indexing.SpecificTaskRetryPolicy; import org.apache.druid.segment.incremental.ParseExceptionReport; +import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.joda.time.DateTime; @@ -68,6 +70,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -193,6 +196,24 @@ public ListenableFuture> getEndOffsetsA .go(); } + @Override + public ListenableFuture updatePendingSegmentMapping( + String id, + SegmentIdWithShardSpec rootPendingSegment, + Set versionsOfPendingSegment + ) + { + final RequestBuilder requestBuilder = new RequestBuilder( + HttpMethod.POST, + "updatePendingSegmentMapping" + ).jsonContent(jsonMapper, Pair.of(rootPendingSegment, versionsOfPendingSegment)); + + return makeRequest(id, requestBuilder) + .handler(IgnoreHttpResponseHandler.INSTANCE) + .onSuccess(r -> true) + .go(); + } + @Override public ListenableFuture setEndOffsetsAsync( final String id, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index e44dfe9a451e..f540c0ba5d4c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -72,6 +72,7 @@ import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.segment.incremental.ParseExceptionHandler; @@ -82,7 +83,9 @@ import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.segment.realtime.appenderator.Appenderator; import org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResult; +import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata; +import org.apache.druid.segment.realtime.appenderator.StreamAppenderator; import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver; import org.apache.druid.segment.realtime.firehose.ChatHandler; import org.apache.druid.server.security.Access; @@ -1538,6 +1541,20 @@ public Response setEndOffsetsHTTP( return setEndOffsets(sequences, finish); } + @POST + @Path("updatePendingSegmentMapping") + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + public Response updatePendingSegmentMapping( + Pair> rootPendingSegmentToVersionMapping, + // this field is only for internal purposes, shouldn't be usually set by users + @Context final HttpServletRequest req + ) + { + authorizationCheck(req, Action.WRITE); + return updatePendingSegmentMapping(rootPendingSegmentToVersionMapping.lhs, rootPendingSegmentToVersionMapping.rhs); + } + public Map doGetRowStats() { Map returnMap = new HashMap<>(); @@ -1742,6 +1759,15 @@ public Response setEndOffsets( return Response.ok(sequenceNumbers).build(); } + private Response updatePendingSegmentMapping( + SegmentIdWithShardSpec rootPendingSegment, + Set versionsOfPendingSegment + ) + { + ((StreamAppenderator) appenderator).updatePendingSegmentMapping(rootPendingSegment, versionsOfPendingSegment); + return Response.ok().build(); + } + private void resetNextCheckpointTime() { nextCheckpointTime = DateTimes.nowUtc().plus(tuningConfig.getIntermediateHandoffPeriod()).getMillis(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 2f6cb008b842..db5f73e3b89d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -97,6 +97,7 @@ import org.apache.druid.segment.incremental.ParseExceptionReport; import org.apache.druid.segment.incremental.RowIngestionMetersFactory; import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.joda.time.DateTime; import javax.annotation.Nonnull; @@ -1092,6 +1093,31 @@ public void resetOffsets(@Nonnull DataSourceMetadata resetDataSourceMetadata) addNotice(new ResetOffsetsNotice(resetDataSourceMetadata)); } + @Override + public void updatePendingSegmentMapping(SegmentIdWithShardSpec rootPendingSegment) + { + for (TaskGroup taskGroup : activelyReadingTaskGroups.values()) { + for (String taskId : taskGroup.taskIds()) { + taskClient.updatePendingSegmentMapping( + taskId, + rootPendingSegment, + indexerMetadataStorageCoordinator.getAllVersionsOfPendingSegment(rootPendingSegment) + ); + } + } + for (List taskGroupList : pendingCompletionTaskGroups.values()) { + for (TaskGroup taskGroup : taskGroupList) { + for (String taskId : taskGroup.taskIds()) { + taskClient.updatePendingSegmentMapping( + taskId, + rootPendingSegment, + indexerMetadataStorageCoordinator.getAllVersionsOfPendingSegment(rootPendingSegment) + ); + } + } + } + } + public ReentrantLock getRecordSupplierLock() { return recordSupplierLock; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index 34d2e44552a7..6fd749201b7b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -253,6 +253,12 @@ public DataSegment retrieveSegmentForId(final String id, boolean includeUnused) return null; } + @Override + public Set getAllVersionsOfPendingSegment(SegmentIdWithShardSpec rootPendingSegment) + { + return Collections.emptySet(); + } + public Set getPublished() { return ImmutableSet.copyOf(published); diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index 3cbabea78fae..7f4c6c193909 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -406,4 +406,6 @@ SegmentPublishResult commitMetadataOnly( */ DataSegment retrieveSegmentForId(String id, boolean includeUnused); + Set getAllVersionsOfPendingSegment(SegmentIdWithShardSpec rootPendingSegment); + } diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java index e733ef6c233d..a7439bc44914 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java @@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableSet; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; +import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.server.security.ResourceAction; import javax.annotation.Nonnull; @@ -185,6 +186,11 @@ public int getActiveTaskGroupsCount() { return -1; } + + @Override + public void updatePendingSegmentMapping(SegmentIdWithShardSpec rootPendingSegment) + { + } }; } diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java index bcfc5ebe8196..22f7dd8b5740 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java @@ -25,6 +25,7 @@ import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; import org.apache.druid.segment.incremental.ParseExceptionReport; +import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import javax.annotation.Nullable; import java.util.List; @@ -87,6 +88,8 @@ default Boolean isHealthy() */ void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata); + void updatePendingSegmentMapping(SegmentIdWithShardSpec rootPendingSegment); + /** * Computes maxLag, totalLag and avgLag */ diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 7eaac692f7ce..bdd4efa12b4d 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -2338,6 +2338,12 @@ public DataSegment retrieveSegmentForId(final String id, boolean includeUnused) ); } + @Override + public Set getAllVersionsOfPendingSegment(SegmentIdWithShardSpec rootPendingSegment) + { + return Collections.emptySet(); + } + private static class PendingSegmentsRecord { private final String sequenceName; diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java index 46315dbc0de2..2abc2e24e776 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java @@ -69,7 +69,10 @@ import org.joda.time.Interval; import java.io.Closeable; +import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; @@ -92,6 +95,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker private final Cache cache; private final CacheConfig cacheConfig; private final CachePopulatorStats cachePopulatorStats; + private final Map newIdToRootPendingSegment = new ConcurrentHashMap<>(); public SinkQuerySegmentWalker( String dataSource, @@ -182,7 +186,8 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final Iterable> perSegmentRunners = Iterables.transform( specs, - descriptor -> { + newDescriptor -> { + final SegmentDescriptor descriptor = newIdToRootPendingSegment.getOrDefault(newDescriptor, newDescriptor); final PartitionChunk chunk = sinkTimeline.findChunk( descriptor.getInterval(), descriptor.getVersion(), @@ -297,6 +302,19 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final ); } + public void updatePendingSegmentMapping( + SegmentIdWithShardSpec rootPendingSegment, + Set versionsOfPendingSegment + ) + { + for (SegmentIdWithShardSpec versionOfPendingSegment : versionsOfPendingSegment) { + newIdToRootPendingSegment.put( + versionOfPendingSegment.asSegmentId().toDescriptor(), + rootPendingSegment.asSegmentId().toDescriptor() + ); + } + } + @VisibleForTesting String getDataSource() { diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java index e95852bfddb1..be50de901ec3 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java @@ -1026,6 +1026,17 @@ public void closeNow() } } + public void updatePendingSegmentMapping( + SegmentIdWithShardSpec rootPendingSegment, + Set versionsOfPendingSegment + ) + { + if (!sinks.containsKey(rootPendingSegment) || droppingSinks.contains(rootPendingSegment)) { + return; + } + ((SinkQuerySegmentWalker) texasRanger).updatePendingSegmentMapping(rootPendingSegment, versionsOfPendingSegment); + } + private void lockBasePersistDirectory() { if (basePersistDirLock == null) {