diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
index d0a035be17c2..af5c0fbe95a1 100644
--- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
+++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
@@ -296,6 +296,12 @@ public LagStats computeLagStats()
     throw new UnsupportedOperationException("Compute Lag Stats not supported in MaterializedViewSupervisor");
   }
 
+  @Override
+  public Set<String> getActiveRealtimeSequencePrefixes()
+  {
+    throw new UnsupportedOperationException();
+  }
+
   @Override
   public int getActiveTaskGroupsCount()
   {
diff --git a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java
index fe84d358bafb..365fb1751eac 100644
--- a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java
+++ b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java
@@ -203,19 +203,11 @@ public void testMaterializedViewSupervisorSpecCreated()
     SupervisorTaskAutoScaler autoscaler = spec.createAutoscaler(supervisor);
     Assert.assertNull(autoscaler);
 
-    try {
-      supervisor.computeLagStats();
-    }
-    catch (Exception e) {
-      Assert.assertTrue(e instanceof UnsupportedOperationException);
-    }
+    Assert.assertThrows(UnsupportedOperationException.class, () -> supervisor.computeLagStats());
 
-    try {
-      int count = supervisor.getActiveTaskGroupsCount();
-    }
-    catch (Exception e) {
-      Assert.assertTrue(e instanceof UnsupportedOperationException);
-    }
+    Assert.assertThrows(UnsupportedOperationException.class, () -> supervisor.getActiveTaskGroupsCount());
+
+    Assert.assertThrows(UnsupportedOperationException.class, () -> supervisor.getActiveRealtimeSequencePrefixes());
 
     Callable noop = new Callable() {
       @Override
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java
index e6ad0426e466..aaa62db90a7c 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java
@@ -145,8 +145,11 @@ private void tryUpgradeOverlappingPendingSegments(Task task, TaskActionToolbox t
       return;
     }
 
+    final Set<String> activeRealtimeSequencePrefixes
+        = supervisorManager.getActiveRealtimeSequencePrefixes(activeSupervisorIdWithAppendLock.get());
     Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradedPendingSegments =
-        toolbox.getIndexerMetadataStorageCoordinator().upgradePendingSegmentsOverlappingWith(segments);
+        toolbox.getIndexerMetadataStorageCoordinator()
+               .upgradePendingSegmentsOverlappingWith(segments, activeRealtimeSequencePrefixes);
     log.info(
         "Upgraded [%d] pending segments for REPLACE task[%s]: [%s]",
         upgradedPendingSegments.size(), task.getId(), upgradedPendingSegments
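The reason the REPLACE action fetches the prefixes before upgrading: a pending segment only qualifies as "active" if its sequence name begins with the base sequence name of a task group that is still reading or pending completion. A minimal, self-contained sketch of that predicate (the sequence names below are made up for illustration, not real Druid identifiers):

```java
import java.util.Set;

// Toy check mirroring the filtering idea introduced by this patch: a pending
// segment counts as "active" when its sequence_name starts with the base
// sequence name of any live task group.
public class ActiveSequenceCheck
{
  static boolean isActive(String sequenceName, Set<String> activePrefixes)
  {
    return activePrefixes.stream().anyMatch(sequenceName::startsWith);
  }

  public static void main(String[] args)
  {
    Set<String> activePrefixes = Set.of("index_kafka_wiki_abc123");
    System.out.println(isActive("index_kafka_wiki_abc123_0", activePrefixes)); // true: live task group
    System.out.println(isActive("index_kafka_wiki_dead999_0", activePrefixes)); // false: stale task group
  }
}
```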
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index df454c1011a8..207ff56f28f8 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -39,6 +39,7 @@
 
 import javax.annotation.Nullable;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -110,6 +111,15 @@ public Optional<String> getActiveSupervisorIdForDatasourceWithAppendLock(String
     return Optional.absent();
   }
 
+  public Set<String> getActiveRealtimeSequencePrefixes(String activeSupervisorId)
+  {
+    if (supervisors.containsKey(activeSupervisorId)) {
+      return supervisors.get(activeSupervisorId).lhs.getActiveRealtimeSequencePrefixes();
+    } else {
+      return Collections.emptySet();
+    }
+  }
+
   public Optional<SupervisorSpec> getSupervisorSpec(String id)
   {
     Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(id);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 1d05169e3fb1..a0ec6d809eb5 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -1093,6 +1093,28 @@ public void resetOffsets(@Nonnull DataSourceMetadata resetDataSourceMetadata)
     addNotice(new ResetOffsetsNotice(resetDataSourceMetadata));
   }
 
+  /**
+   * The base sequence name of a seekable stream task group is used as a prefix of the sequence names
+   * of pending segments published by it.
+   * This method can be used to identify the active pending segments for a datasource
+   * by checking whether a sequence name begins with any of the prefixes returned by this method.
+   * @return the set of base sequence names of both active and pending completion task groups
+   */
+  @Override
+  public Set<String> getActiveRealtimeSequencePrefixes()
+  {
+    final Set<String> activeBaseSequences = new HashSet<>();
+    for (TaskGroup taskGroup : activelyReadingTaskGroups.values()) {
+      activeBaseSequences.add(taskGroup.baseSequenceName);
+    }
+    for (List<TaskGroup> taskGroupList : pendingCompletionTaskGroups.values()) {
+      for (TaskGroup taskGroup : taskGroupList) {
+        activeBaseSequences.add(taskGroup.baseSequenceName);
+      }
+    }
+    return activeBaseSequences;
+  }
+
   public void registerNewVersionOfPendingSegment(
       SegmentIdWithShardSpec basePendingSegment,
       SegmentIdWithShardSpec newSegmentVersion
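The method above unions base sequence names from two collections. As a rough, self-contained model (the map shapes are simplified stand-ins for `activelyReadingTaskGroups` and `pendingCompletionTaskGroups`, which actually hold `TaskGroup` objects):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified stand-in: group id -> base sequence name for reading groups,
// group id -> base sequence names of earlier generations for pending-completion groups.
public class PrefixCollection
{
  public static void main(String[] args)
  {
    Map<Integer, String> activelyReading = Map.of(0, "seq_A", 1, "seq_B");
    Map<Integer, List<String>> pendingCompletion = Map.of(0, List.of("seq_A_prev"), 2, List.of("seq_C"));

    // Union of both sources, exactly as getActiveRealtimeSequencePrefixes() does.
    Set<String> prefixes = new HashSet<>(activelyReading.values());
    pendingCompletion.values().forEach(prefixes::addAll);

    System.out.println(prefixes); // e.g. [seq_A, seq_B, seq_A_prev, seq_C]
  }
}
```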
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 819a6baacd8e..7a587bb196e1 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -1313,6 +1313,48 @@ public void testSupervisorResetOneTaskSpecificOffsetsWithCheckpoints() throws In
     validateSupervisorStateAfterResetOffsets(supervisor, resetOffsets, 0);
   }
 
+  @Test
+  public void testGetActiveRealtimeSequencePrefixes()
+  {
+    EasyMock.expect(spec.isSuspended()).andReturn(false);
+
+    replayAll();
+
+    final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
+
+    // Spin off two actively reading task groups, each serving one partition,
+    // plus one pending completion task group.
+    supervisor.getIoConfig().setTaskCount(3);
+    supervisor.start();
+    supervisor.addTaskGroupToActivelyReadingTaskGroup(
+        supervisor.getTaskGroupIdForPartition("0"),
+        ImmutableMap.of("0", "5"),
+        Optional.absent(),
+        Optional.absent(),
+        ImmutableSet.of("task1"),
+        ImmutableSet.of()
+    );
+
+    supervisor.addTaskGroupToActivelyReadingTaskGroup(
+        supervisor.getTaskGroupIdForPartition("1"),
+        ImmutableMap.of("1", "6"),
+        Optional.absent(),
+        Optional.absent(),
+        ImmutableSet.of("task2"),
+        ImmutableSet.of()
+    );
+
+    supervisor.addTaskGroupToPendingCompletionTaskGroup(
+        supervisor.getTaskGroupIdForPartition("2"),
+        ImmutableMap.of("2", "100"),
+        Optional.absent(),
+        Optional.absent(),
+        ImmutableSet.of("task3"),
+        ImmutableSet.of()
+    );
+
+    Assert.assertEquals(3, supervisor.getActiveRealtimeSequencePrefixes().size());
+  }
+
   @Test
   public void testSupervisorResetSpecificOffsetsTasksWithCheckpoints() throws InterruptedException, IOException
   {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
index 108833422c88..143a74c72cbc 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
@@ -238,7 +238,10 @@ public SegmentIdWithShardSpec allocatePendingSegment(
   }
 
   @Override
-  public Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegmentsOverlappingWith(Set<DataSegment> replaceSegments)
+  public Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegmentsOverlappingWith(
+      Set<DataSegment> replaceSegments,
+      Set<String> activeBaseSequenceNames
+  )
   {
     return Collections.emptyMap();
   }
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index 7c6710048a1a..34a55574dce9 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -347,10 +347,13 @@ SegmentPublishResult commitReplaceSegments(
    *
    *
    * @param replaceSegments Segments being committed by a REPLACE task
+   * @param activeRealtimeSequencePrefixes Set of sequence prefixes of active and pending completion task groups
+   *                                       of the supervisor (if any) for this datasource
    * @return Map from originally allocated pending segment to its new upgraded ID.
    */
   Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegmentsOverlappingWith(
-      Set<DataSegment> replaceSegments
+      Set<DataSegment> replaceSegments,
+      Set<String> activeRealtimeSequencePrefixes
   );
 
   /**
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
index e733ef6c233d..20c102533862 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
@@ -31,6 +31,7 @@
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
@@ -185,6 +186,12 @@ public int getActiveTaskGroupsCount()
       {
        return -1;
       }
+
+      @Override
+      public Set<String> getActiveRealtimeSequencePrefixes()
+      {
+        return Collections.emptySet();
+      }
     };
   }
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
index bcfc5ebe8196..9d940bc55b6c 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
@@ -29,6 +29,7 @@
 import javax.annotation.Nullable;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public interface Supervisor
 {
@@ -93,4 +94,9 @@ default Boolean isHealthy()
   LagStats computeLagStats();
 
   int getActiveTaskGroupsCount();
+
+  /**
+   * @return active sequence prefixes for reading and pending completion task groups of a seekable stream supervisor
+   */
+  Set<String> getActiveRealtimeSequencePrefixes();
 }
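The three implementations of the new `Supervisor` method take deliberately different stances. A compressed, illustrative restatement (toy types, not the real classes):

```java
import java.util.Collections;
import java.util.Set;

// Illustrative only: how each supervisor type answers the new
// Supervisor#getActiveRealtimeSequencePrefixes contract.
public class SupervisorStances
{
  interface SupervisorSketch
  {
    Set<String> getActiveRealtimeSequencePrefixes();
  }

  public static void main(String[] args)
  {
    SupervisorSketch seekableStreamLike = () -> Set.of("seq_A", "seq_B"); // live task group base names
    SupervisorSketch noopLike = Collections::emptySet;                    // no realtime tasks to protect
    SupervisorSketch materializedViewLike = () -> {
      throw new UnsupportedOperationException();                          // not a realtime supervisor
    };

    System.out.println(seekableStreamLike.getActiveRealtimeSequencePrefixes());
    System.out.println(noopLike.getActiveRealtimeSequencePrefixes());
    try {
      materializedViewLike.getActiveRealtimeSequencePrefixes();
    }
    catch (UnsupportedOperationException e) {
      System.out.println("materialized-view supervisor: unsupported");
    }
  }
}
```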
diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index c654d5e229b7..612f712c1bb2 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -231,7 +231,7 @@ public List<DataSegment> retrieveUnusedSegmentsForInterval(
         (handle, status) -> {
           try (final CloseableIterator<DataSegment> iterator =
                    SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper)
-                       .retrieveUnusedSegments(dataSource, Collections.singletonList(interval), limit)) {
+                                           .retrieveUnusedSegments(dataSource, Collections.singletonList(interval), limit)) {
             return ImmutableList.copyOf(iterator);
           }
         }
@@ -258,9 +258,62 @@ public int markSegmentsAsUnusedWithinInterval(String dataSource, Interval interv
 
   /**
    * Fetches all the pending segments, whose interval overlaps with the given
-   * search interval from the metadata store. Returns a Map from the
-   * pending segment ID to the sequence name.
+   * search interval and whose sequence_name begins with one of the prefixes in sequenceNamePrefixFilter,
+   * from the metadata store. Returns a Map from the pending segment ID to the sequence name.
    */
+  @VisibleForTesting
+  Map<SegmentIdWithShardSpec, String> getPendingSegmentsForIntervalWithHandle(
+      final Handle handle,
+      final String dataSource,
+      final Interval interval,
+      final Set<String> sequenceNamePrefixFilter
+  ) throws IOException
+  {
+    if (sequenceNamePrefixFilter.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    final List<String> sequenceNamePrefixes = new ArrayList<>(sequenceNamePrefixFilter);
+    final List<String> sequenceNamePrefixConditions = new ArrayList<>();
+    for (int i = 0; i < sequenceNamePrefixes.size(); i++) {
+      sequenceNamePrefixConditions.add(StringUtils.format("(sequence_name LIKE :prefix%d)", i));
+    }
+
+    String sql = "SELECT sequence_name, payload"
+                 + " FROM " + dbTables.getPendingSegmentsTable()
+                 + " WHERE dataSource = :dataSource"
+                 + " AND start < :end"
+                 + StringUtils.format(" AND %1$send%1$s > :start", connector.getQuoteString())
+                 + " AND ( " + String.join(" OR ", sequenceNamePrefixConditions) + " )";
+
+    Query<Map<String, Object>> query = handle.createQuery(sql)
+                                             .bind("dataSource", dataSource)
+                                             .bind("start", interval.getStart().toString())
+                                             .bind("end", interval.getEnd().toString());
+
+    for (int i = 0; i < sequenceNamePrefixes.size(); i++) {
+      query.bind(StringUtils.format("prefix%d", i), sequenceNamePrefixes.get(i) + "%");
+    }
+
+    final ResultIterator<PendingSegmentsRecord> dbSegments =
+        query.map((index, r, ctx) -> PendingSegmentsRecord.fromResultSet(r))
+             .iterator();
+
+    final Map<SegmentIdWithShardSpec, String> pendingSegmentToSequenceName = new HashMap<>();
+    while (dbSegments.hasNext()) {
+      PendingSegmentsRecord record = dbSegments.next();
+      final SegmentIdWithShardSpec identifier = jsonMapper.readValue(record.payload, SegmentIdWithShardSpec.class);
+
+      if (interval.overlaps(identifier.getInterval())) {
+        pendingSegmentToSequenceName.put(identifier, record.sequenceName);
+      }
+    }
+
+    dbSegments.close();
+
+    return pendingSegmentToSequenceName;
+  }
+
   private Map<SegmentIdWithShardSpec, String> getPendingSegmentsForIntervalWithHandle(
       final Handle handle,
       final String dataSource,
@@ -620,7 +673,8 @@ public SegmentIdWithShardSpec allocatePendingSegment(
 
   @Override
   public Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegmentsOverlappingWith(
-      Set<DataSegment> replaceSegments
+      Set<DataSegment> replaceSegments,
+      Set<String> activeRealtimeSequencePrefixes
   )
   {
     if (replaceSegments.isEmpty()) {
@@ -639,7 +693,7 @@ public Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegment
 
     final String datasource = replaceSegments.iterator().next().getDataSource();
     return connector.retryWithHandle(
-        handle -> upgradePendingSegments(handle, datasource, replaceIntervalToMaxId)
+        handle -> upgradePendingSegments(handle, datasource, replaceIntervalToMaxId, activeRealtimeSequencePrefixes)
     );
   }
 
@@ -658,7 +712,8 @@ public Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegment
   private Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegments(
       Handle handle,
      String datasource,
-      Map<Interval, SegmentIdWithShardSpec> replaceIntervalToMaxId
+      Map<Interval, SegmentIdWithShardSpec> replaceIntervalToMaxId,
+      Set<String> activeRealtimeSequencePrefixes
   ) throws IOException
   {
     final Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> newPendingSegmentVersions = new HashMap<>();
@@ -673,12 +728,13 @@ private Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegmen
       int currentPartitionNumber = maxSegmentId.getShardSpec().getPartitionNum();
 
       final Map<SegmentIdWithShardSpec, String> overlappingPendingSegments
-          = getPendingSegmentsForIntervalWithHandle(handle, datasource, replaceInterval);
+          = getPendingSegmentsForIntervalWithHandle(handle, datasource, replaceInterval, activeRealtimeSequencePrefixes);
 
       for (Map.Entry<SegmentIdWithShardSpec, String> overlappingPendingSegment :
           overlappingPendingSegments.entrySet()) {
         final SegmentIdWithShardSpec pendingSegmentId = overlappingPendingSegment.getKey();
         final String pendingSegmentSequence = overlappingPendingSegment.getValue();
+
         if (shouldUpgradePendingSegment(pendingSegmentId, pendingSegmentSequence, replaceInterval, replaceVersion)) {
           // Ensure unique sequence_name_prev_id_sha1 by setting
           // sequence_prev_id -> pendingSegmentId
@@ -1281,9 +1337,9 @@ private Set<DataSegment> createNewIdsForAppendSegments(
     final Map<Interval, Set<DataSegment>> overlappingIntervalToSegments = new HashMap<>();
     for (DataSegment segment : overlappingSegments) {
       overlappingVersionToIntervals.computeIfAbsent(segment.getVersion(), v -> new HashSet<>())
-          .add(segment.getInterval());
+                                   .add(segment.getInterval());
       overlappingIntervalToSegments.computeIfAbsent(segment.getInterval(), i -> new HashSet<>())
-          .add(segment);
+                                   .add(segment);
     }
 
     final Set<DataSegment> upgradedSegments = new HashSet<>();
@@ -2275,7 +2331,7 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle(
       // Not in the desired start state.
       return new DataStoreMetadataUpdateResult(true, false, StringUtils.format(
           "Inconsistent metadata state. This can happen if you update input topic in a spec without changing " +
-              "the supervisor name. Stored state: [%s], Target state: [%s].",
+          "the supervisor name. Stored state: [%s], Target state: [%s].",
           oldCommitMetadataFromDb,
           startMetadata
       ));
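To make the dynamically built statement concrete, here is a self-contained snippet that assembles the same WHERE-clause shape for two prefixes (the table name and the quoted `"end"` identifier are assumed placeholders; the real method reads both from Druid's config and connector):

```java
import java.util.ArrayList;
import java.util.List;

public class PendingSegmentsSqlShape
{
  public static void main(String[] args)
  {
    List<String> prefixes = List.of("seq_A", "seq_B");

    // One LIKE condition per prefix, OR-ed together, as the new method does.
    List<String> conditions = new ArrayList<>();
    for (int i = 0; i < prefixes.size(); i++) {
      conditions.add(String.format("(sequence_name LIKE :prefix%d)", i));
    }

    String sql = "SELECT sequence_name, payload"
                 + " FROM druid_pendingSegments" // assumed table name
                 + " WHERE dataSource = :dataSource"
                 + " AND start < :end"
                 + " AND \"end\" > :start" // assumed quote style
                 + " AND ( " + String.join(" OR ", conditions) + " )";
    System.out.println(sql);
    // At bind time each :prefixN receives the prefix plus a trailing '%', e.g. "seq_A%".
  }
}
```

Note that the string comparison on the start/end columns is only a coarse filter; the method re-checks `interval.overlaps(...)` in Java after deserializing each payload.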
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 0512357ffc10..a8fc9e923c5c 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -24,6 +24,8 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
+import com.google.common.hash.Hashing;
+import com.google.common.io.BaseEncoding;
 import org.apache.druid.data.input.StringTuple;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.ObjectMetadata;
@@ -464,6 +466,44 @@ private Boolean insertUsedSegments(Set<DataSegment> dataSegments)
     );
   }
 
+  private Boolean insertPendingSegmentAndSequenceName(Pair<SegmentIdWithShardSpec, String> pendingSegmentSequenceName)
+  {
+    final SegmentIdWithShardSpec pendingSegment = pendingSegmentSequenceName.lhs;
+    final String sequenceName = pendingSegmentSequenceName.rhs;
+    final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getPendingSegmentsTable();
+    return derbyConnector.retryWithHandle(
+        handle -> {
+          handle.createStatement(
+              StringUtils.format(
+                  "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, "
+                  + "sequence_name_prev_id_sha1, payload) "
+                  + "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, "
+                  + ":sequence_name_prev_id_sha1, :payload)",
+                  table,
+                  derbyConnector.getQuoteString()
+              )
+          )
+                .bind("id", pendingSegment.toString())
+                .bind("dataSource", pendingSegment.getDataSource())
+                .bind("created_date", DateTimes.nowUtc().toString())
+                .bind("start", pendingSegment.getInterval().getStart().toString())
+                .bind("end", pendingSegment.getInterval().getEnd().toString())
+                .bind("sequence_name", sequenceName)
+                .bind("sequence_prev_id", pendingSegment.toString())
+                .bind("sequence_name_prev_id_sha1", BaseEncoding.base16().encode(
+                    Hashing.sha1()
+                           .newHasher()
+                           .putLong((long) pendingSegment.hashCode() * sequenceName.hashCode())
+                           .hash()
+                           .asBytes()
+                ))
+                .bind("payload", mapper.writeValueAsBytes(pendingSegment))
+                .execute();
+          return true;
+        }
+    );
+  }
+
   private Map getSegmentsCommittedDuringReplaceTask(String taskId)
   {
     final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getUpgradeSegmentsTable();
@@ -2554,6 +2594,64 @@ public void testMarkSegmentsAsUnusedWithinIntervalTwoYears() throws IOException
     );
   }
 
+  @Test
+  public void testGetPendingSegmentsForIntervalWithSequencePrefixes()
+  {
+    Pair<SegmentIdWithShardSpec, String> validIntervalValidSequence = Pair.of(
+        SegmentIdWithShardSpec.fromDataSegment(defaultSegment),
+        "validLOL"
+    );
+    insertPendingSegmentAndSequenceName(validIntervalValidSequence);
+
+    Pair<SegmentIdWithShardSpec, String> validIntervalInvalidSequence = Pair.of(
+        SegmentIdWithShardSpec.fromDataSegment(defaultSegment2),
+        "invalidRandom"
+    );
+    insertPendingSegmentAndSequenceName(validIntervalInvalidSequence);
+
+    Pair<SegmentIdWithShardSpec, String> invalidIntervalValidSequence = Pair.of(
+        SegmentIdWithShardSpec.fromDataSegment(existingSegment1),
+        "validStuff"
+    );
+    insertPendingSegmentAndSequenceName(invalidIntervalValidSequence);
+
+    Pair<SegmentIdWithShardSpec, String> twentyFifteenWithAnotherValidSequence = Pair.of(
+        new SegmentIdWithShardSpec(
+            existingSegment1.getDataSource(),
+            Intervals.of("2015/2016"),
+            "1970-01-01",
+            new NumberedShardSpec(1, 0)
+        ),
+        "alsoValidAgain"
+    );
+    insertPendingSegmentAndSequenceName(twentyFifteenWithAnotherValidSequence);
+
+    Pair<SegmentIdWithShardSpec, String> twentyFifteenWithInvalidSequence = Pair.of(
+        new SegmentIdWithShardSpec(
+            existingSegment1.getDataSource(),
+            Intervals.of("2015/2016"),
+            "1970-01-01",
+            new NumberedShardSpec(2, 0)
+        ),
+        "definitelyInvalid"
+    );
+    insertPendingSegmentAndSequenceName(twentyFifteenWithInvalidSequence);
+
+    final Map<SegmentIdWithShardSpec, String> expected = new HashMap<>();
+    expected.put(validIntervalValidSequence.lhs, validIntervalValidSequence.rhs);
+    expected.put(twentyFifteenWithAnotherValidSequence.lhs, twentyFifteenWithAnotherValidSequence.rhs);
+
+    final Map<SegmentIdWithShardSpec, String> actual =
+        derbyConnector.retryWithHandle(handle -> coordinator.getPendingSegmentsForIntervalWithHandle(
+            handle,
+            defaultSegment.getDataSource(),
+            defaultSegment.getInterval(),
+            ImmutableSet.of("valid", "alsoValid")
+        ));
+    Assert.assertEquals(expected, actual);
+  }
+
   @Test
   public void testRetrieveUsedSegmentsAndCreatedDates()
   {
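As a sanity check on the test's expectations: of the five inserted rows, only `validLOL` and `alsoValidAgain` both overlap `defaultSegment`'s interval and carry a sequence name starting with one of the requested prefixes. A standalone recreation of the prefix half of that predicate (names copied from the test; the interval check is omitted):

```java
import java.util.Set;

public class TestExpectationCheck
{
  public static void main(String[] args)
  {
    Set<String> requested = Set.of("valid", "alsoValid");
    String[] sequenceNames = {"validLOL", "invalidRandom", "validStuff", "alsoValidAgain", "definitelyInvalid"};
    for (String name : sequenceNames) {
      boolean matches = requested.stream().anyMatch(name::startsWith);
      System.out.println(name + " -> " + matches);
    }
    // validLOL, validStuff and alsoValidAgain match on prefix; validStuff is then
    // excluded by the interval check, leaving the two entries the test expects.
  }
}
```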