From 37613c611321fea70eafb3a88d7df0d47c01dd66 Mon Sep 17 00:00:00 2001 From: Amatya Date: Mon, 16 Oct 2023 20:03:26 +0530 Subject: [PATCH 1/9] Filter pending segments upgraded with transactional replace --- .../MaterializedViewSupervisor.java | 6 ++++++ .../SegmentTransactionalReplaceAction.java | 7 +++++-- .../supervisor/SupervisorManager.java | 9 +++++++++ .../supervisor/SeekableStreamSupervisor.java | 15 ++++++++++++++ ...TestIndexerMetadataStorageCoordinator.java | 5 ++++- .../IndexerMetadataStorageCoordinator.java | 4 +++- .../supervisor/NoopSupervisorSpec.java | 7 +++++++ .../overlord/supervisor/Supervisor.java | 6 ++++++ .../IndexerSQLMetadataStorageCoordinator.java | 20 ++++++++++++++++--- 9 files changed, 72 insertions(+), 7 deletions(-) diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java index ac2738534da9..97cdad80f76e 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java @@ -295,6 +295,12 @@ public LagStats computeLagStats() throw new UnsupportedOperationException("Compute Lag Stats not supported in MaterializedViewSupervisor"); } + @Override + public Set getActiveBaseSequenceNames() + { + throw new UnsupportedOperationException("Get Active sequence names is not supported in MaterializedViewSupervisor"); + } + @Override public int getActiveTaskGroupsCount() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java index 5a2b3ceec8f6..638219eb3596 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java @@ -139,13 +139,16 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) private void tryUpgradeOverlappingPendingSegments(Task task, TaskActionToolbox toolbox) { final SupervisorManager supervisorManager = toolbox.getSupervisorManager(); - final Optional activeSupervisorId = supervisorManager.getActiveSupervisorIdForDatasource(task.getDataSource()); + final Optional activeSupervisorId = + supervisorManager.getActiveSupervisorIdForDatasource(task.getDataSource()); if (!activeSupervisorId.isPresent()) { return; } + final Set activeBaseSequenceNames = supervisorManager.getActiveBaseSequenceNames(activeSupervisorId.get()); Map upgradedPendingSegments = - toolbox.getIndexerMetadataStorageCoordinator().upgradePendingSegmentsOverlappingWith(segments); + toolbox.getIndexerMetadataStorageCoordinator() + .upgradePendingSegmentsOverlappingWith(segments, activeBaseSequenceNames); log.info( "Upgraded [%d] pending segments for REPLACE task[%s]: [%s]", upgradedPendingSegments.size(), task.getId(), upgradedPendingSegments diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index 
d55f3cc8bd0c..fcc08c6f2382 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -35,6 +35,7 @@ import javax.annotation.Nullable; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; @@ -87,6 +88,14 @@ public Optional getActiveSupervisorIdForDatasource(String datasource) return Optional.absent(); } + public Set getActiveBaseSequenceNames(String activeSupervisorId) + { + if (!supervisors.containsKey(activeSupervisorId)) { + return Collections.emptySet(); + } + return supervisors.get(activeSupervisorId).lhs.getActiveBaseSequenceNames(); + } + public Optional getSupervisorSpec(String id) { Pair supervisor = supervisors.get(id); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 1d05169e3fb1..1c00409a64de 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -1093,6 +1093,21 @@ public void resetOffsets(@Nonnull DataSourceMetadata resetDataSourceMetadata) addNotice(new ResetOffsetsNotice(resetDataSourceMetadata)); } + @Override + public Set getActiveBaseSequenceNames() + { + final Set activeBaseSequences = new HashSet<>(); + for (TaskGroup taskGroup : activelyReadingTaskGroups.values()) { + activeBaseSequences.add(taskGroup.baseSequenceName); + } + for (List taskGroupList : pendingCompletionTaskGroups.values()) { + for (TaskGroup taskGroup : taskGroupList) { + activeBaseSequences.add(taskGroup.baseSequenceName); + } + } + return activeBaseSequences; + } + public void registerNewVersionOfPendingSegment( SegmentIdWithShardSpec basePendingSegment, SegmentIdWithShardSpec newSegmentVersion diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index d1c72485011f..214643affa88 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -238,7 +238,10 @@ public SegmentIdWithShardSpec allocatePendingSegment( } @Override - public Map upgradePendingSegmentsOverlappingWith(Set replaceSegments) + public Map upgradePendingSegmentsOverlappingWith( + Set replaceSegments, + Set activeBaseSequenceNames + ) { return Collections.emptyMap(); } diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index 2c2a6bc0f77b..0cf9933dd467 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -345,10 +345,12 @@ SegmentPublishResult commitReplaceSegments( * * * @param replaceSegments Segments being committed by a REPLACE task + * @param activeBaseSequenceNames of base sequence names of active / 
pending completion task groups of the supervisor * @return Map from originally allocated pending segment to its new upgraded ID. */ Map upgradePendingSegmentsOverlappingWith( - Set replaceSegments + Set replaceSegments, + Set activeBaseSequenceNames ); /** diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java index e733ef6c233d..4e91ca8d0f37 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java @@ -31,6 +31,7 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Set; @@ -185,6 +186,12 @@ public int getActiveTaskGroupsCount() { return -1; } + + @Override + public Set getActiveBaseSequenceNames() + { + return Collections.emptySet(); + } }; } diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java index bcfc5ebe8196..80a26eddcc21 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java @@ -29,6 +29,7 @@ import javax.annotation.Nullable; import java.util.List; import java.util.Map; +import java.util.Set; public interface Supervisor { @@ -93,4 +94,9 @@ default Boolean isHealthy() LagStats computeLagStats(); int getActiveTaskGroupsCount(); + + /** + * @return active base sequence names for reading and pending completion task groups of a seekable stream supervisor + */ + Set getActiveBaseSequenceNames(); } diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 226663c32330..40d751d2d368 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -601,7 +601,8 @@ public SegmentIdWithShardSpec allocatePendingSegment( @Override public Map upgradePendingSegmentsOverlappingWith( - Set replaceSegments + Set replaceSegments, + Set activeBaseSequenceNames ) { if (replaceSegments.isEmpty()) { @@ -620,7 +621,7 @@ public Map upgradePendingSegment final String datasource = replaceSegments.iterator().next().getDataSource(); return connector.retryWithHandle( - handle -> upgradePendingSegments(handle, datasource, replaceIntervalToMaxId) + handle -> upgradePendingSegments(handle, datasource, replaceIntervalToMaxId, activeBaseSequenceNames) ); } @@ -639,7 +640,8 @@ public Map upgradePendingSegment private Map upgradePendingSegments( Handle handle, String datasource, - Map replaceIntervalToMaxId + Map replaceIntervalToMaxId, + Set activeBaseSequenceNames ) throws IOException { final Map newPendingSegmentVersions = new HashMap<>(); @@ -660,6 +662,18 @@ private Map upgradePendingSegmen : overlappingPendingSegments.entrySet()) { final SegmentIdWithShardSpec pendingSegmentId = overlappingPendingSegment.getKey(); final String pendingSegmentSequence = overlappingPendingSegment.getValue(); + + boolean considerSequence = false; + for (String baseSequence : 
activeBaseSequenceNames) { + if (pendingSegmentSequence.startsWith(baseSequence)) { + considerSequence = true; + break; + } + } + if (!considerSequence) { + continue; + } + if (shouldUpgradePendingSegment(pendingSegmentId, pendingSegmentSequence, replaceInterval, replaceVersion)) { // Ensure unique sequence_name_prev_id_sha1 by setting // sequence_prev_id -> pendingSegmentId From 368312eb6d8806ff702a2d3f50383d7fa2d7931b Mon Sep 17 00:00:00 2001 From: Amatya Date: Tue, 17 Oct 2023 21:15:03 +0530 Subject: [PATCH 2/9] Push sequence name filter to metadata query --- .../MaterializedViewSupervisor.java | 2 +- .../SegmentTransactionalReplaceAction.java | 5 +- .../supervisor/SupervisorManager.java | 4 +- .../supervisor/SeekableStreamSupervisor.java | 2 +- .../IndexerMetadataStorageCoordinator.java | 5 +- .../supervisor/NoopSupervisorSpec.java | 2 +- .../overlord/supervisor/Supervisor.java | 2 +- .../IndexerSQLMetadataStorageCoordinator.java | 259 +++++++++++------- ...exerSQLMetadataStorageCoordinatorTest.java | 98 +++++++ 9 files changed, 266 insertions(+), 113 deletions(-) diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java index 97cdad80f76e..636897c610c9 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java @@ -296,7 +296,7 @@ public LagStats computeLagStats() } @Override - public Set getActiveBaseSequenceNames() + public Set getActiveRealtimeSequencePrefixes() { throw new UnsupportedOperationException("Get Active sequence names is not supported in MaterializedViewSupervisor"); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java index 638219eb3596..9fe9e5bf46fc 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java @@ -145,10 +145,11 @@ private void tryUpgradeOverlappingPendingSegments(Task task, TaskActionToolbox t return; } - final Set activeBaseSequenceNames = supervisorManager.getActiveBaseSequenceNames(activeSupervisorId.get()); + final Set activeRealtimeSequencePrefixes + = supervisorManager.getActiveRealtimeSequencePrefixes(activeSupervisorId.get()); Map upgradedPendingSegments = toolbox.getIndexerMetadataStorageCoordinator() - .upgradePendingSegmentsOverlappingWith(segments, activeBaseSequenceNames); + .upgradePendingSegmentsOverlappingWith(segments, activeRealtimeSequencePrefixes); log.info( "Upgraded [%d] pending segments for REPLACE task[%s]: [%s]", upgradedPendingSegments.size(), task.getId(), upgradedPendingSegments diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index fcc08c6f2382..40ef5a75698e 100644 --- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -88,12 +88,12 @@ public Optional getActiveSupervisorIdForDatasource(String datasource) return Optional.absent(); } - public Set getActiveBaseSequenceNames(String activeSupervisorId) + public Set getActiveRealtimeSequencePrefixes(String activeSupervisorId) { if (!supervisors.containsKey(activeSupervisorId)) { return Collections.emptySet(); } - return supervisors.get(activeSupervisorId).lhs.getActiveBaseSequenceNames(); + return supervisors.get(activeSupervisorId).lhs.getActiveRealtimeSequencePrefixes(); } public Optional getSupervisorSpec(String id) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 1c00409a64de..9583688e87db 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -1094,7 +1094,7 @@ public void resetOffsets(@Nonnull DataSourceMetadata resetDataSourceMetadata) } @Override - public Set getActiveBaseSequenceNames() + public Set getActiveRealtimeSequencePrefixes() { final Set activeBaseSequences = new HashSet<>(); for (TaskGroup taskGroup : activelyReadingTaskGroups.values()) { diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index 0cf9933dd467..65f1087a1215 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -345,12 +345,13 @@ SegmentPublishResult commitReplaceSegments( * * * @param replaceSegments Segments being committed by a REPLACE task - * @param activeBaseSequenceNames of base sequence names of active / pending completion task groups of the supervisor + * @param activeRealtimeSequencePrefixes Set of base sequence names of active and pending completion task groups + * of the supervisor (if any) for this datasource * @return Map from originally allocated pending segment to its new upgraded ID. 
*/ Map upgradePendingSegmentsOverlappingWith( Set replaceSegments, - Set activeBaseSequenceNames + Set activeRealtimeSequencePrefixes ); /** diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java index 4e91ca8d0f37..20c102533862 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java @@ -188,7 +188,7 @@ public int getActiveTaskGroupsCount() } @Override - public Set getActiveBaseSequenceNames() + public Set getActiveRealtimeSequencePrefixes() { return Collections.emptySet(); } diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java index 80a26eddcc21..fbab17688a4e 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java @@ -98,5 +98,5 @@ default Boolean isHealthy() /** * @return active base sequence names for reading and pending completion task groups of a seekable stream supervisor */ - Set getActiveBaseSequenceNames(); + Set getActiveRealtimeSequencePrefixes(); } diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 40d751d2d368..0eb96ae1061d 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -212,7 +212,7 @@ public List retrieveUnusedSegmentsForInterval( (handle, status) -> { try (final CloseableIterator iterator = SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper) - .retrieveUnusedSegments(dataSource, Collections.singletonList(interval), limit)) { + .retrieveUnusedSegments(dataSource, Collections.singletonList(interval), limit)) { return ImmutableList.copyOf(iterator); } } @@ -239,9 +239,73 @@ public int markSegmentsAsUnusedWithinInterval(String dataSource, Interval interv /** * Fetches all the pending segments, whose interval overlaps with the given - * search interval from the metadata store. Returns a Map from the - * pending segment ID to the sequence name. + * search interval and whose sequence_name begins with one of the prefixes in sequenceNamePrefixFilter + * from the metadata store. Returns a Map from the pending segment ID to the sequence name.
*/ + @VisibleForTesting + Map getPendingSegmentsForIntervalWithHandle( + final Handle handle, + final String dataSource, + final Interval interval, + final Set sequenceNamePrefixFilter + ) throws IOException + { + if (sequenceNamePrefixFilter.isEmpty()) { + return Collections.emptyMap(); + } + + final List sequenceNamePrefixes = new ArrayList<>(sequenceNamePrefixFilter); + StringBuilder sql = new StringBuilder( + "SELECT sequence_name, payload FROM " + + dbTables.getPendingSegmentsTable() + + " WHERE dataSource = :dataSource AND start < :end AND " + + connector.getQuoteString() + "end" + connector.getQuoteString() + " > :start" + ); + + sql.append(" AND ( "); + + for (int i = 1; i < sequenceNamePrefixes.size(); i++) { + sql.append("(sequence_name LIKE ") + .append(StringUtils.format(":prefix%d", i)) + .append(")") + .append(" OR "); + } + + sql.append("(sequence_name LIKE ") + .append(StringUtils.format(":prefix%d", sequenceNamePrefixes.size())) + .append(")"); + + sql.append(" )"); + + Query> query = + handle.createQuery(sql.toString()) + .bind("dataSource", dataSource) + .bind("start", interval.getStart().toString()) + .bind("end", interval.getEnd().toString()); + + for (int i = 1; i <= sequenceNamePrefixes.size(); i++) { + query.bind(StringUtils.format("prefix%d", i), sequenceNamePrefixes.get(i - 1) + "%"); + } + + final ResultIterator dbSegments = + query.map((index, r, ctx) -> PendingSegmentsRecord.fromResultSet(r)) + .iterator(); + + final Map pendingSegmentToSequenceName = new HashMap<>(); + while (dbSegments.hasNext()) { + PendingSegmentsRecord record = dbSegments.next(); + final SegmentIdWithShardSpec identifier = jsonMapper.readValue(record.payload, SegmentIdWithShardSpec.class); + + if (interval.overlaps(identifier.getInterval())) { + pendingSegmentToSequenceName.put(identifier, record.sequenceName); + } + } + + dbSegments.close(); + + return pendingSegmentToSequenceName; + } + private Map getPendingSegmentsForIntervalWithHandle( final Handle handle, final String dataSource, @@ -250,15 +314,15 @@ private Map getPendingSegmentsForIntervalWithHan { final ResultIterator dbSegments = handle.createQuery( - StringUtils.format( - // This query might fail if the year has a different number of digits - // See https://github.com/apache/druid/pull/11582 for a similar issue - // Using long for these timestamps instead of varchar would give correct time comparisons - "SELECT sequence_name, payload FROM %1$s" - + " WHERE dataSource = :dataSource AND start < :end and %2$send%2$s > :start", - dbTables.getPendingSegmentsTable(), connector.getQuoteString() - ) - ) + StringUtils.format( + // This query might fail if the year has a different number of digits + // See https://github.com/apache/druid/pull/11582 for a similar issue + // Using long for these timestamps instead of varchar would give correct time comparisons + "SELECT sequence_name, payload FROM %1$s" + + " WHERE dataSource = :dataSource AND start < :end and %2$send%2$s > :start", + dbTables.getPendingSegmentsTable(), connector.getQuoteString() + ) + ) .bind("dataSource", dataSource) .bind("start", interval.getStart().toString()) .bind("end", interval.getEnd().toString()) @@ -602,7 +666,7 @@ public SegmentIdWithShardSpec allocatePendingSegment( @Override public Map upgradePendingSegmentsOverlappingWith( Set replaceSegments, - Set activeBaseSequenceNames + Set activeRealtimeSequencePrefixes ) { if (replaceSegments.isEmpty()) { @@ -621,7 +685,7 @@ public Map upgradePendingSegment final String datasource = 
replaceSegments.iterator().next().getDataSource(); return connector.retryWithHandle( - handle -> upgradePendingSegments(handle, datasource, replaceIntervalToMaxId, activeBaseSequenceNames) + handle -> upgradePendingSegments(handle, datasource, replaceIntervalToMaxId, activeRealtimeSequencePrefixes) ); } @@ -641,7 +705,7 @@ private Map upgradePendingSegmen Handle handle, String datasource, Map replaceIntervalToMaxId, - Set activeBaseSequenceNames + Set activeRealtimeSequencePrefixes ) throws IOException { final Map newPendingSegmentVersions = new HashMap<>(); @@ -656,24 +720,13 @@ private Map upgradePendingSegmen int currentPartitionNumber = maxSegmentId.getShardSpec().getPartitionNum(); final Map overlappingPendingSegments - = getPendingSegmentsForIntervalWithHandle(handle, datasource, replaceInterval); + = getPendingSegmentsForIntervalWithHandle(handle, datasource, replaceInterval, activeRealtimeSequencePrefixes); for (Map.Entry overlappingPendingSegment : overlappingPendingSegments.entrySet()) { final SegmentIdWithShardSpec pendingSegmentId = overlappingPendingSegment.getKey(); final String pendingSegmentSequence = overlappingPendingSegment.getValue(); - boolean considerSequence = false; - for (String baseSequence : activeBaseSequenceNames) { - if (pendingSegmentSequence.startsWith(baseSequence)) { - considerSequence = true; - break; - } - } - if (!considerSequence) { - continue; - } - if (shouldUpgradePendingSegment(pendingSegmentId, pendingSegmentSequence, replaceInterval, replaceVersion)) { // Ensure unique sequence_name_prev_id_sha1 by setting // sequence_prev_id -> pendingSegmentId @@ -1174,13 +1227,13 @@ private int insertPendingSegmentsIntoMetastore( { final PreparedBatch insertBatch = handle.prepareBatch( StringUtils.format( - "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, " - + "sequence_name_prev_id_sha1, payload) " - + "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, " - + ":sequence_name_prev_id_sha1, :payload)", - dbTables.getPendingSegmentsTable(), - connector.getQuoteString() - )); + "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, " + + "sequence_name_prev_id_sha1, payload) " + + "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, " + + ":sequence_name_prev_id_sha1, :payload)", + dbTables.getPendingSegmentsTable(), + connector.getQuoteString() + )); // Deduplicate the segment ids by inverting the map Map segmentIdToRequest = new HashMap<>(); @@ -1220,15 +1273,15 @@ private void insertPendingSegmentIntoMetastore( ) throws JsonProcessingException { handle.createStatement( - StringUtils.format( - "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, " - + "sequence_name_prev_id_sha1, payload) " - + "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, " - + ":sequence_name_prev_id_sha1, :payload)", - dbTables.getPendingSegmentsTable(), - connector.getQuoteString() - ) - ) + StringUtils.format( + "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, " + + "sequence_name_prev_id_sha1, payload) " + + "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, " + + ":sequence_name_prev_id_sha1, :payload)", + dbTables.getPendingSegmentsTable(), + connector.getQuoteString() + ) + ) .bind("id", newIdentifier.toString()) 
.bind("dataSource", dataSource) .bind("created_date", DateTimes.nowUtc().toString()) @@ -1276,9 +1329,9 @@ private Set createNewIdsForAppendSegments( final Map> overlappingIntervalToSegments = new HashMap<>(); for (DataSegment segment : overlappingSegments) { overlappingVersionToIntervals.computeIfAbsent(segment.getVersion(), v -> new HashSet<>()) - .add(segment.getInterval()); + .add(segment.getInterval()); overlappingIntervalToSegments.computeIfAbsent(segment.getInterval(), i -> new HashSet<>()) - .add(segment); + .add(segment); } final Set upgradedSegments = new HashSet<>(); @@ -1828,16 +1881,16 @@ private Set announceHistoricalSegmentBatch( for (DataSegment segment : partition) { final String now = DateTimes.nowUtc().toString(); preparedBatch.add() - .bind("id", segment.getId().toString()) - .bind("dataSource", segment.getDataSource()) - .bind("created_date", now) - .bind("start", segment.getInterval().getStart().toString()) - .bind("end", segment.getInterval().getEnd().toString()) - .bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? false : true) - .bind("version", segment.getVersion()) - .bind("used", usedSegments.contains(segment)) - .bind("payload", jsonMapper.writeValueAsBytes(segment)) - .bind("used_status_last_updated", now); + .bind("id", segment.getId().toString()) + .bind("dataSource", segment.getDataSource()) + .bind("created_date", now) + .bind("start", segment.getInterval().getStart().toString()) + .bind("end", segment.getInterval().getEnd().toString()) + .bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? false : true) + .bind("version", segment.getVersion()) + .bind("used", usedSegments.contains(segment)) + .bind("payload", jsonMapper.writeValueAsBytes(segment)) + .bind("used_status_last_updated", now); } final int[] affectedRows = preparedBatch.execute(); final boolean succeeded = Arrays.stream(affectedRows).allMatch(eachAffectedRows -> eachAffectedRows == 1); @@ -1845,9 +1898,9 @@ private Set announceHistoricalSegmentBatch( log.infoSegments(partition, "Published segments to DB"); } else { final List failedToPublish = IntStream.range(0, partition.size()) - .filter(i -> affectedRows[i] != 1) - .mapToObj(partition::get) - .collect(Collectors.toList()); + .filter(i -> affectedRows[i] != 1) + .mapToObj(partition::get) + .collect(Collectors.toList()); throw new ISE( "Failed to publish segments to DB: %s", SegmentUtils.commaSeparatedIdentifiers(failedToPublish) @@ -2163,11 +2216,11 @@ private Set segmentExistsBatch(final Handle handle, final Set> segmentsLists = Lists.partition(new ArrayList<>(segments), MAX_NUM_SEGMENTS_TO_ANNOUNCE_AT_ONCE); for (List segmentList : segmentsLists) { String segmentIds = segmentList.stream() - .map(segment -> "'" + StringEscapeUtils.escapeSql(segment.getId().toString()) + "'") - .collect(Collectors.joining(",")); + .map(segment -> "'" + StringEscapeUtils.escapeSql(segment.getId().toString()) + "'") + .collect(Collectors.joining(",")); List existIds = handle.createQuery(StringUtils.format("SELECT id FROM %s WHERE id in (%s)", dbTables.getSegmentsTable(), segmentIds)) - .mapTo(String.class) - .list(); + .mapTo(String.class) + .list(); existedSegments.addAll(existIds); } return existedSegments; @@ -2270,7 +2323,7 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( // Not in the desired start state. return new DataStoreMetadataUpdateResult(true, false, StringUtils.format( "Inconsistent metadata state. 
This can happen if you update input topic in a spec without changing " + - "the supervisor name. Stored state: [%s], Target state: [%s].", + "the supervisor name. Stored state: [%s], Target state: [%s].", oldCommitMetadataFromDb, startMetadata )); @@ -2289,12 +2342,12 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( if (oldCommitMetadataBytesFromDb == null) { // SELECT -> INSERT can fail due to races; callers must be prepared to retry. final int numRows = handle.createStatement( - StringUtils.format( - "INSERT INTO %s (dataSource, created_date, commit_metadata_payload, commit_metadata_sha1) " - + "VALUES (:dataSource, :created_date, :commit_metadata_payload, :commit_metadata_sha1)", - dbTables.getDataSourceTable() - ) - ) + StringUtils.format( + "INSERT INTO %s (dataSource, created_date, commit_metadata_payload, commit_metadata_sha1) " + + "VALUES (:dataSource, :created_date, :commit_metadata_payload, :commit_metadata_sha1)", + dbTables.getDataSourceTable() + ) + ) .bind("dataSource", dataSource) .bind("created_date", DateTimes.nowUtc().toString()) .bind("commit_metadata_payload", newCommitMetadataBytes) @@ -2302,23 +2355,23 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( .execute(); retVal = numRows == 1 - ? DataStoreMetadataUpdateResult.SUCCESS - : new DataStoreMetadataUpdateResult( - true, - true, - "Failed to insert metadata for datasource [%s]", - dataSource); + ? DataStoreMetadataUpdateResult.SUCCESS + : new DataStoreMetadataUpdateResult( + true, + true, + "Failed to insert metadata for datasource [%s]", + dataSource); } else { // Expecting a particular old metadata; use the SHA1 in a compare-and-swap UPDATE final int numRows = handle.createStatement( - StringUtils.format( - "UPDATE %s SET " - + "commit_metadata_payload = :new_commit_metadata_payload, " - + "commit_metadata_sha1 = :new_commit_metadata_sha1 " - + "WHERE dataSource = :dataSource AND commit_metadata_sha1 = :old_commit_metadata_sha1", - dbTables.getDataSourceTable() - ) - ) + StringUtils.format( + "UPDATE %s SET " + + "commit_metadata_payload = :new_commit_metadata_payload, " + + "commit_metadata_sha1 = :new_commit_metadata_sha1 " + + "WHERE dataSource = :dataSource AND commit_metadata_sha1 = :old_commit_metadata_sha1", + dbTables.getDataSourceTable() + ) + ) .bind("dataSource", dataSource) .bind("old_commit_metadata_sha1", oldCommitMetadataSha1FromDb) .bind("new_commit_metadata_payload", newCommitMetadataBytes) @@ -2326,12 +2379,12 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( .execute(); retVal = numRows == 1 - ? DataStoreMetadataUpdateResult.SUCCESS - : new DataStoreMetadataUpdateResult( - true, - true, - "Failed to update metadata for datasource [%s]", - dataSource); + ? 
DataStoreMetadataUpdateResult.SUCCESS + : new DataStoreMetadataUpdateResult( + true, + true, + "Failed to update metadata for datasource [%s]", + dataSource); } if (retVal.isSuccess()) { @@ -2353,8 +2406,8 @@ public boolean deleteDataSourceMetadata(final String dataSource) public Boolean withHandle(Handle handle) { int rows = handle.createStatement( - StringUtils.format("DELETE from %s WHERE dataSource = :dataSource", dbTables.getDataSourceTable()) - ) + StringUtils.format("DELETE from %s WHERE dataSource = :dataSource", dbTables.getDataSourceTable()) + ) .bind("dataSource", dataSource) .execute(); @@ -2380,14 +2433,14 @@ public boolean resetDataSourceMetadata(final String dataSource, final DataSource public Boolean withHandle(Handle handle) { final int numRows = handle.createStatement( - StringUtils.format( - "UPDATE %s SET " - + "commit_metadata_payload = :new_commit_metadata_payload, " - + "commit_metadata_sha1 = :new_commit_metadata_sha1 " - + "WHERE dataSource = :dataSource", - dbTables.getDataSourceTable() - ) - ) + StringUtils.format( + "UPDATE %s SET " + + "commit_metadata_payload = :new_commit_metadata_payload, " + + "commit_metadata_sha1 = :new_commit_metadata_sha1 " + + "WHERE dataSource = :dataSource", + dbTables.getDataSourceTable() + ) + ) .bind("dataSource", dataSource) .bind("new_commit_metadata_payload", newCommitMetadataBytes) .bind("new_commit_metadata_sha1", newCommitMetadataSha1) @@ -2640,10 +2693,10 @@ public int hashCode() public String toString() { return "DataStoreMetadataUpdateResult{" + - "failed=" + failed + - ", canRetry=" + canRetry + - ", errorMsg='" + errorMsg + '\'' + - '}'; + "failed=" + failed + + ", canRetry=" + canRetry + + ", errorMsg='" + errorMsg + '\'' + + '}'; } } diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index b1b6f3aa16ea..5c416b561939 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -24,6 +24,8 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; +import com.google.common.hash.Hashing; +import com.google.common.io.BaseEncoding; import org.apache.druid.data.input.StringTuple; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.ObjectMetadata; @@ -464,6 +466,44 @@ private Boolean insertUsedSegments(Set dataSegments) ); } + private Boolean insertPendingSegmentAndSequenceName(Pair pendingSegmentSequenceName) + { + final SegmentIdWithShardSpec pendingSegment = pendingSegmentSequenceName.lhs; + final String sequenceName = pendingSegmentSequenceName.rhs; + final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getPendingSegmentsTable(); + return derbyConnector.retryWithHandle( + handle -> { + handle.createStatement( + StringUtils.format( + "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, " + + "sequence_name_prev_id_sha1, payload) " + + "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, " + + ":sequence_name_prev_id_sha1, :payload)", + table, + derbyConnector.getQuoteString() + ) + ) + .bind("id", pendingSegment.toString()) + .bind("dataSource", pendingSegment.getDataSource()) + 
.bind("created_date", DateTimes.nowUtc().toString()) + .bind("start", pendingSegment.getInterval().getStart().toString()) + .bind("end", pendingSegment.getInterval().getEnd().toString()) + .bind("sequence_name", sequenceName) + .bind("sequence_prev_id", pendingSegment.toString()) + .bind("sequence_name_prev_id_sha1", BaseEncoding.base16().encode( + Hashing.sha1() + .newHasher() + .putLong((long) pendingSegment.hashCode() * sequenceName.hashCode()) + .hash() + .asBytes() + )) + .bind("payload", mapper.writeValueAsBytes(pendingSegment)) + .execute(); + return true; + } + ); + } + private Map getSegmentsCommittedDuringReplaceTask(String taskId) { final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getUpgradeSegmentsTable(); @@ -2554,6 +2594,64 @@ public void testMarkSegmentsAsUnusedWithinIntervalTwoYears() throws IOException ); } + @Test + public void testGetPendingSegmentsForIntervalWithSequencePrefixes() + { + Pair validIntervalValidSequence = Pair.of( + SegmentIdWithShardSpec.fromDataSegment(defaultSegment), + "validLOL" + ); + insertPendingSegmentAndSequenceName(validIntervalValidSequence); + + Pair validIntervalInvalidSequence = Pair.of( + SegmentIdWithShardSpec.fromDataSegment(defaultSegment2), + "invalidRandom" + ); + insertPendingSegmentAndSequenceName(validIntervalInvalidSequence); + + Pair invalidIntervalvalidSequence = Pair.of( + SegmentIdWithShardSpec.fromDataSegment(existingSegment1), + "validStuff" + ); + insertPendingSegmentAndSequenceName(invalidIntervalvalidSequence); + + Pair twentyFifteenWithAnotherValidSequence = Pair.of( + new SegmentIdWithShardSpec( + existingSegment1.getDataSource(), + Intervals.of("2015/2016"), + "1970-01-01", + new NumberedShardSpec(1, 0) + ), + "alsoValidAgain" + ); + insertPendingSegmentAndSequenceName(twentyFifteenWithAnotherValidSequence); + + Pair twentyFifteenWithInvalidSequence = Pair.of( + new SegmentIdWithShardSpec( + existingSegment1.getDataSource(), + Intervals.of("2015/2016"), + "1970-01-01", + new NumberedShardSpec(2, 0) + ), + "definitelyInvalid" + ); + insertPendingSegmentAndSequenceName(twentyFifteenWithInvalidSequence); + + + final Map expected = new HashMap<>(); + expected.put(validIntervalValidSequence.lhs, validIntervalValidSequence.rhs); + expected.put(twentyFifteenWithAnotherValidSequence.lhs, twentyFifteenWithAnotherValidSequence.rhs); + + final Map actual = + derbyConnector.retryWithHandle(handle -> coordinator.getPendingSegmentsForIntervalWithHandle( + handle, + defaultSegment.getDataSource(), + defaultSegment.getInterval(), + ImmutableSet.of("valid", "alsoValid") + )); + Assert.assertEquals(expected, actual); + } + private static class DS { static final String WIKI = "wiki"; From 548c4c0e1b998d30a44ede20d6172d38b4e198f5 Mon Sep 17 00:00:00 2001 From: Amatya Date: Wed, 18 Oct 2023 09:31:55 +0530 Subject: [PATCH 3/9] Improve coverage --- .../SeekableStreamSupervisorStateTest.java | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index 819a6baacd8e..08d5b57b6f8e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -1313,6 +1313,48 @@ 
public void testSupervisorResetOneTaskSpecificOffsetsWithCheckpoints() throws In validateSupervisorStateAfterResetOffsets(supervisor, resetOffsets, 0); } + @Test + public void testGetActiveRealtimeSequencePrefixes() throws IOException + { + EasyMock.expect(spec.isSuspended()).andReturn(false); + + replayAll(); + + final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(); + + // Spin off two actively reading task groups and one pending completion task group, each serving one partition. + supervisor.getIoConfig().setTaskCount(3); + supervisor.start(); + supervisor.addTaskGroupToActivelyReadingTaskGroup( + supervisor.getTaskGroupIdForPartition("0"), + ImmutableMap.of("0", "5"), + Optional.absent(), + Optional.absent(), + ImmutableSet.of("task1"), + ImmutableSet.of() + ); + + supervisor.addTaskGroupToActivelyReadingTaskGroup( + supervisor.getTaskGroupIdForPartition("1"), + ImmutableMap.of("1", "6"), + Optional.absent(), + Optional.absent(), + ImmutableSet.of("task2"), + ImmutableSet.of() + ); + + supervisor.addTaskGroupToPendingCompletionTaskGroup( + supervisor.getTaskGroupIdForPartition("2"), + ImmutableMap.of("2", "100"), + Optional.absent(), + Optional.absent(), + ImmutableSet.of("task3"), + ImmutableSet.of() + ); + + Assert.assertEquals(3, supervisor.getActiveRealtimeSequencePrefixes().size()); + } + @Test public void testSupervisorResetSpecificOffsetsTasksWithCheckpoints() throws InterruptedException, IOException { From 9120210e9038bf6222f1daf1a5fba56077cf2b21 Mon Sep 17 00:00:00 2001 From: Amatya Date: Wed, 18 Oct 2023 11:31:50 +0530 Subject: [PATCH 4/9] Fix tests --- .../MaterializedViewSupervisorSpecTest.java | 7 +++++++ .../supervisor/SeekableStreamSupervisorStateTest.java | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java index fe84d358bafb..d597862b04dc 100644 --- a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java +++ b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java @@ -217,6 +217,13 @@ public void testMaterializedViewSupervisorSpecCreated() Assert.assertTrue(e instanceof UnsupportedOperationException); } + try { + supervisor.getActiveRealtimeSequencePrefixes(); + } + catch (Exception e) { + Assert.assertTrue(e instanceof UnsupportedOperationException); + } + Callable noop = new Callable() { @Override public Integer call() diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index 08d5b57b6f8e..7a587bb196e1 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -1314,7 +1314,7 @@ public void testSupervisorResetOneTaskSpecificOffsetsWithCheckpoints() throws In } @Test - public void testGetActiveRealtimeSequencePrefixes() throws IOException + public void testGetActiveRealtimeSequencePrefixes() {
EasyMock.expect(spec.isSuspended()).andReturn(false); From c100b02c8069cda84cf107fbea1d091b7414d39b Mon Sep 17 00:00:00 2001 From: Amatya Date: Wed, 18 Oct 2023 11:33:11 +0530 Subject: [PATCH 5/9] Fix checkstyle --- .../materializedview/MaterializedViewSupervisorSpecTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java index d597862b04dc..87bb85e22f04 100644 --- a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java +++ b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java @@ -221,7 +221,7 @@ public void testMaterializedViewSupervisorSpecCreated() supervisor.getActiveRealtimeSequencePrefixes(); } catch (Exception e) { - Assert.assertTrue(e instanceof UnsupportedOperationException); + Assert.assertTrue(e instanceof UnsupportedOperationException); } Callable noop = new Callable() { From 156d80934023378aa17a8f96c5c063e3e3d696ce Mon Sep 17 00:00:00 2001 From: Amatya Date: Wed, 18 Oct 2023 11:39:35 +0530 Subject: [PATCH 6/9] Add javadoc --- .../supervisor/SeekableStreamSupervisor.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 9583688e87db..a0ec6d809eb5 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -1093,6 +1093,13 @@ public void resetOffsets(@Nonnull DataSourceMetadata resetDataSourceMetadata) addNotice(new ResetOffsetsNotice(resetDataSourceMetadata)); } + /** + * The base sequence name of a seekable stream task group is used as a prefix of the sequence names + * of pending segments published by it. + * This method can be used to identify the active pending segments for a datasource + * by checking if the sequence name begins with any of the active realtime sequence prefixes returned by this method. + * @return the set of base sequence names of both active and pending completion task groups.
+ */ @Override public Set getActiveRealtimeSequencePrefixes() { From e03329e7051ce6f346bf0a9762ac0e74d03d935f Mon Sep 17 00:00:00 2001 From: Amatya Date: Mon, 23 Oct 2023 13:18:55 +0530 Subject: [PATCH 7/9] Address feedback --- .../MaterializedViewSupervisor.java | 2 +- .../MaterializedViewSupervisorSpecTest.java | 21 +--- .../supervisor/SupervisorManager.java | 5 +- .../IndexerMetadataStorageCoordinator.java | 2 +- .../overlord/supervisor/Supervisor.java | 2 +- .../IndexerSQLMetadataStorageCoordinator.java | 114 +++++++++--------- 6 files changed, 66 insertions(+), 80 deletions(-) diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java index 70a84d80e49a..af5c0fbe95a1 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java @@ -299,7 +299,7 @@ public LagStats computeLagStats() @Override public Set getActiveRealtimeSequencePrefixes() { - throw new UnsupportedOperationException("Get Active sequence names is not supported in MaterializedViewSupervisor"); + throw new UnsupportedOperationException(); } @Override diff --git a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java index 87bb85e22f04..365fb1751eac 100644 --- a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java +++ b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java @@ -203,26 +203,11 @@ public void testMaterializedViewSupervisorSpecCreated() SupervisorTaskAutoScaler autoscaler = spec.createAutoscaler(supervisor); Assert.assertNull(autoscaler); - try { - supervisor.computeLagStats(); - } - catch (Exception e) { - Assert.assertTrue(e instanceof UnsupportedOperationException); - } + Assert.assertThrows(UnsupportedOperationException.class, () -> supervisor.computeLagStats()); - try { - int count = supervisor.getActiveTaskGroupsCount(); - } - catch (Exception e) { - Assert.assertTrue(e instanceof UnsupportedOperationException); - } + Assert.assertThrows(UnsupportedOperationException.class, () -> supervisor.getActiveTaskGroupsCount()); - try { - supervisor.getActiveRealtimeSequencePrefixes(); - } - catch (Exception e) { - Assert.assertTrue(e instanceof UnsupportedOperationException); - } + Assert.assertThrows(UnsupportedOperationException.class, () -> supervisor.getActiveRealtimeSequencePrefixes()); Callable noop = new Callable() { @Override diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index bef3d6b84b73..207ff56f28f8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -113,10 +113,11 @@ public Optional getActiveSupervisorIdForDatasourceWithAppendLock(String public Set getActiveRealtimeSequencePrefixes(String activeSupervisorId) { - if (!supervisors.containsKey(activeSupervisorId)) { + if (supervisors.containsKey(activeSupervisorId)) { + return supervisors.get(activeSupervisorId).lhs.getActiveRealtimeSequencePrefixes(); + } else { return Collections.emptySet(); } - return supervisors.get(activeSupervisorId).lhs.getActiveRealtimeSequencePrefixes(); } public Optional getSupervisorSpec(String id) diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index 0e41f33d85e0..34a55574dce9 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -347,7 +347,7 @@ SegmentPublishResult commitReplaceSegments( * * * @param replaceSegments Segments being committed by a REPLACE task - * @param activeRealtimeSequencePrefixes Set of base sequence names of active and pending completion task groups + * @param activeRealtimeSequencePrefixes Set of sequence prefixes of active and pending completion task groups * of the supervisor (if any) for this datasource * @return Map from originally allocated pending segment to its new upgraded ID. */ diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java index fbab17688a4e..9d940bc55b6c 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java @@ -96,7 +96,7 @@ default Boolean isHealthy() int getActiveTaskGroupsCount(); /** - * @return active base sequence names for reading and pending completion task groups of a seekable stream supervisor + * @return active sequence prefixes for reading and pending completion task groups of a seekable stream supervisor */ Set getActiveRealtimeSequencePrefixes(); } diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index a58722bef8ca..a4e7dc21c52d 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -274,12 +274,12 @@ Map getPendingSegmentsForIntervalWithHandle( } final List sequenceNamePrefixes = new ArrayList<>(sequenceNamePrefixFilter); - StringBuilder sql = new StringBuilder( - "SELECT sequence_name, payload FROM " - + dbTables.getPendingSegmentsTable() - + " WHERE dataSource = :dataSource AND start < :end AND " - + connector.getQuoteString() + "end" + connector.getQuoteString() + " > :start" - ); + + StringBuilder sql = new StringBuilder("SELECT sequence_name, payload"); + sql.append(" FROM ").append(dbTables.getPendingSegmentsTable()); + sql.append(" WHERE dataSource = :dataSource"); + sql.append(" AND start < :end"); + sql.append(StringUtils.format(" AND %1$send%1$s > :start", connector.getQuoteString())); sql.append(" AND ( "); @@ -1246,13 +1246,13 @@ private int 
insertPendingSegmentsIntoMetastore( { final PreparedBatch insertBatch = handle.prepareBatch( StringUtils.format( - "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, " - + "sequence_name_prev_id_sha1, payload) " - + "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, " - + ":sequence_name_prev_id_sha1, :payload)", - dbTables.getPendingSegmentsTable(), - connector.getQuoteString() - )); + "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, " + + "sequence_name_prev_id_sha1, payload) " + + "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, " + + ":sequence_name_prev_id_sha1, :payload)", + dbTables.getPendingSegmentsTable(), + connector.getQuoteString() + )); // Deduplicate the segment ids by inverting the map Map segmentIdToRequest = new HashMap<>(); @@ -1292,15 +1292,15 @@ private void insertPendingSegmentIntoMetastore( ) throws JsonProcessingException { handle.createStatement( - StringUtils.format( - "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, " - + "sequence_name_prev_id_sha1, payload) " - + "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, " - + ":sequence_name_prev_id_sha1, :payload)", - dbTables.getPendingSegmentsTable(), - connector.getQuoteString() - ) - ) + StringUtils.format( + "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, " + + "sequence_name_prev_id_sha1, payload) " + + "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, " + + ":sequence_name_prev_id_sha1, :payload)", + dbTables.getPendingSegmentsTable(), + connector.getQuoteString() + ) + ) .bind("id", newIdentifier.toString()) .bind("dataSource", dataSource) .bind("created_date", DateTimes.nowUtc().toString()) @@ -1900,16 +1900,16 @@ private Set announceHistoricalSegmentBatch( for (DataSegment segment : partition) { final String now = DateTimes.nowUtc().toString(); preparedBatch.add() - .bind("id", segment.getId().toString()) - .bind("dataSource", segment.getDataSource()) - .bind("created_date", now) - .bind("start", segment.getInterval().getStart().toString()) - .bind("end", segment.getInterval().getEnd().toString()) - .bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? false : true) - .bind("version", segment.getVersion()) - .bind("used", usedSegments.contains(segment)) - .bind("payload", jsonMapper.writeValueAsBytes(segment)) - .bind("used_status_last_updated", now); + .bind("id", segment.getId().toString()) + .bind("dataSource", segment.getDataSource()) + .bind("created_date", now) + .bind("start", segment.getInterval().getStart().toString()) + .bind("end", segment.getInterval().getEnd().toString()) + .bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? 
false : true) + .bind("version", segment.getVersion()) + .bind("used", usedSegments.contains(segment)) + .bind("payload", jsonMapper.writeValueAsBytes(segment)) + .bind("used_status_last_updated", now); } final int[] affectedRows = preparedBatch.execute(); final boolean succeeded = Arrays.stream(affectedRows).allMatch(eachAffectedRows -> eachAffectedRows == 1); @@ -1917,9 +1917,9 @@ private Set announceHistoricalSegmentBatch( log.infoSegments(partition, "Published segments to DB"); } else { final List failedToPublish = IntStream.range(0, partition.size()) - .filter(i -> affectedRows[i] != 1) - .mapToObj(partition::get) - .collect(Collectors.toList()); + .filter(i -> affectedRows[i] != 1) + .mapToObj(partition::get) + .collect(Collectors.toList()); throw new ISE( "Failed to publish segments to DB: %s", SegmentUtils.commaSeparatedIdentifiers(failedToPublish) @@ -2235,8 +2235,8 @@ private Set segmentExistsBatch(final Handle handle, final Set> segmentsLists = Lists.partition(new ArrayList<>(segments), MAX_NUM_SEGMENTS_TO_ANNOUNCE_AT_ONCE); for (List segmentList : segmentsLists) { String segmentIds = segmentList.stream() - .map(segment -> "'" + StringEscapeUtils.escapeSql(segment.getId().toString()) + "'") - .collect(Collectors.joining(",")); + .map(segment -> "'" + StringEscapeUtils.escapeSql(segment.getId().toString()) + "'") + .collect(Collectors.joining(",")); List existIds = handle.createQuery(StringUtils.format("SELECT id FROM %s WHERE id in (%s)", dbTables.getSegmentsTable(), segmentIds)) .mapTo(String.class) .list(); @@ -2398,12 +2398,12 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( .execute(); retVal = numRows == 1 - ? DataStoreMetadataUpdateResult.SUCCESS - : new DataStoreMetadataUpdateResult( - true, - true, - "Failed to update metadata for datasource [%s]", - dataSource); + ? 
@@ -2398,12 +2398,12 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle(
           .execute();

       retVal = numRows == 1
-               ? DataStoreMetadataUpdateResult.SUCCESS
-               : new DataStoreMetadataUpdateResult(
-                   true,
-                   true,
-                   "Failed to update metadata for datasource [%s]",
-                   dataSource);
+          ? DataStoreMetadataUpdateResult.SUCCESS
+          : new DataStoreMetadataUpdateResult(
+              true,
+              true,
+              "Failed to update metadata for datasource [%s]",
+              dataSource);
     }

     if (retVal.isSuccess()) {
@@ -2425,8 +2425,8 @@ public boolean deleteDataSourceMetadata(final String dataSource)
       public Boolean withHandle(Handle handle)
       {
         int rows = handle.createStatement(
-                             StringUtils.format("DELETE from %s WHERE dataSource = :dataSource", dbTables.getDataSourceTable())
-                         )
+            StringUtils.format("DELETE from %s WHERE dataSource = :dataSource", dbTables.getDataSourceTable())
+        )
             .bind("dataSource", dataSource)
             .execute();

@@ -2452,14 +2452,14 @@ public boolean resetDataSourceMetadata(final String dataSource, final DataSource
       public Boolean withHandle(Handle handle)
       {
         final int numRows = handle.createStatement(
-                                      StringUtils.format(
-                                          "UPDATE %s SET "
-                                          + "commit_metadata_payload = :new_commit_metadata_payload, "
-                                          + "commit_metadata_sha1 = :new_commit_metadata_sha1 "
-                                          + "WHERE dataSource = :dataSource",
-                                          dbTables.getDataSourceTable()
-                                      )
-                                  )
+            StringUtils.format(
+                "UPDATE %s SET "
+                + "commit_metadata_payload = :new_commit_metadata_payload, "
+                + "commit_metadata_sha1 = :new_commit_metadata_sha1 "
+                + "WHERE dataSource = :dataSource",
+                dbTables.getDataSourceTable()
+            )
+        )
             .bind("dataSource", dataSource)
             .bind("new_commit_metadata_payload", newCommitMetadataBytes)
             .bind("new_commit_metadata_sha1", newCommitMetadataSha1)
@@ -2712,10 +2712,10 @@ public int hashCode()
   public String toString()
   {
     return "DataStoreMetadataUpdateResult{" +
-           "failed=" + failed +
-           ", canRetry=" + canRetry +
-           ", errorMsg='" + errorMsg + '\'' +
-           '}';
+        "failed=" + failed +
+        ", canRetry=" + canRetry +
+        ", errorMsg='" + errorMsg + '\'' +
+        '}';
   }
 }
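Note: the updateDataSourceMetadataWithHandle hunks in the patch above and the patch below are indentation-only, but the logic they touch is an optimistic compare-and-swap on the datasource metadata row. A hedged sketch of that control flow — the parameter names and caller-supplied table are illustrative, and this is not the actual Druid method:

    import org.apache.druid.java.util.common.DateTimes;
    import org.skife.jdbi.v2.Handle;

    // INSERT when no metadata row exists yet; otherwise UPDATE guarded by the
    // previously read SHA1. If a concurrent writer changed the row first, the
    // UPDATE matches zero rows; a racing INSERT may instead fail on a key
    // constraint. Callers treat both outcomes as retryable.
    static boolean casUpdateMetadata(
        Handle handle,
        String table,
        String dataSource,
        String oldSha1,   // null when the prior SELECT found no row
        byte[] newPayload,
        String newSha1
    )
    {
      final int numRows;
      if (oldSha1 == null) {
        numRows = handle.createStatement(
            "INSERT INTO " + table
            + " (dataSource, created_date, commit_metadata_payload, commit_metadata_sha1)"
            + " VALUES (:dataSource, :created_date, :payload, :sha1)"
        )
                        .bind("dataSource", dataSource)
                        .bind("created_date", DateTimes.nowUtc().toString())
                        .bind("payload", newPayload)
                        .bind("sha1", newSha1)
                        .execute();
      } else {
        numRows = handle.createStatement(
            "UPDATE " + table + " SET"
            + " commit_metadata_payload = :payload, commit_metadata_sha1 = :sha1"
            + " WHERE dataSource = :dataSource AND commit_metadata_sha1 = :oldSha1"
        )
                        .bind("dataSource", dataSource)
                        .bind("payload", newPayload)
                        .bind("sha1", newSha1)
                        .bind("oldSha1", oldSha1)
                        .execute();
      }
      return numRows == 1;
    }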
From daf03376f290b7392ddeb294fbc415eaa349e696 Mon Sep 17 00:00:00 2001
From: Amatya
Date: Mon, 23 Oct 2023 13:28:04 +0530
Subject: [PATCH 8/9] Revert indentation changes

---
 .../IndexerSQLMetadataStorageCoordinator.java | 62 +++++++++----------
 1 file changed, 31 insertions(+), 31 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index a4e7dc21c52d..c8f240d3e34c 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -333,15 +333,15 @@ private Map getPendingSegmentsForIntervalWithHandle(
 {
     final ResultIterator dbSegments =
         handle.createQuery(
-            StringUtils.format(
-                // This query might fail if the year has a different number of digits
-                // See https://github.com/apache/druid/pull/11582 for a similar issue
-                // Using long for these timestamps instead of varchar would give correct time comparisons
-                "SELECT sequence_name, payload FROM %1$s"
-                + " WHERE dataSource = :dataSource AND start < :end and %2$send%2$s > :start",
-                dbTables.getPendingSegmentsTable(), connector.getQuoteString()
-            )
-        )
+                  StringUtils.format(
+                      // This query might fail if the year has a different number of digits
+                      // See https://github.com/apache/druid/pull/11582 for a similar issue
+                      // Using long for these timestamps instead of varchar would give correct time comparisons
+                      "SELECT sequence_name, payload FROM %1$s"
+                      + " WHERE dataSource = :dataSource AND start < :end and %2$send%2$s > :start",
+                      dbTables.getPendingSegmentsTable(), connector.getQuoteString()
+                  )
+              )
               .bind("dataSource", dataSource)
               .bind("start", interval.getStart().toString())
              .bind("end", interval.getEnd().toString())
@@ -2238,8 +2238,8 @@ private Set segmentExistsBatch(final Handle handle, final Set segments)
           .map(segment -> "'" + StringEscapeUtils.escapeSql(segment.getId().toString()) + "'")
           .collect(Collectors.joining(","));
       List existIds = handle.createQuery(StringUtils.format("SELECT id FROM %s WHERE id in (%s)", dbTables.getSegmentsTable(), segmentIds))
-          .mapTo(String.class)
-          .list();
+                            .mapTo(String.class)
+                            .list();
       existedSegments.addAll(existIds);
     }
     return existedSegments;
@@ -2361,12 +2361,12 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle(
     if (oldCommitMetadataBytesFromDb == null) {
       // SELECT -> INSERT can fail due to races; callers must be prepared to retry.
       final int numRows = handle.createStatement(
-          StringUtils.format(
-              "INSERT INTO %s (dataSource, created_date, commit_metadata_payload, commit_metadata_sha1) "
-              + "VALUES (:dataSource, :created_date, :commit_metadata_payload, :commit_metadata_sha1)",
-              dbTables.getDataSourceTable()
-          )
-      )
+                                    StringUtils.format(
+                                        "INSERT INTO %s (dataSource, created_date, commit_metadata_payload, commit_metadata_sha1) "
+                                        + "VALUES (:dataSource, :created_date, :commit_metadata_payload, :commit_metadata_sha1)",
+                                        dbTables.getDataSourceTable()
+                                    )
+                                )
           .bind("dataSource", dataSource)
           .bind("created_date", DateTimes.nowUtc().toString())
           .bind("commit_metadata_payload", newCommitMetadataBytes)
           .bind("commit_metadata_sha1", newCommitMetadataSha1)
           .execute();

       retVal = numRows == 1
-          ? DataStoreMetadataUpdateResult.SUCCESS
-          : new DataStoreMetadataUpdateResult(
-              true,
-              true,
-              "Failed to insert metadata for datasource [%s]",
-              dataSource);
+               ? DataStoreMetadataUpdateResult.SUCCESS
+               : new DataStoreMetadataUpdateResult(
+                   true,
+                   true,
+                   "Failed to insert metadata for datasource [%s]",
+                   dataSource);
     } else {
       // Expecting a particular old metadata; use the SHA1 in a compare-and-swap UPDATE
       final int numRows = handle.createStatement(
-          StringUtils.format(
-              "UPDATE %s SET "
-              + "commit_metadata_payload = :new_commit_metadata_payload, "
-              + "commit_metadata_sha1 = :new_commit_metadata_sha1 "
-              + "WHERE dataSource = :dataSource AND commit_metadata_sha1 = :old_commit_metadata_sha1",
-              dbTables.getDataSourceTable()
-          )
-      )
+                                    StringUtils.format(
+                                        "UPDATE %s SET "
+                                        + "commit_metadata_payload = :new_commit_metadata_payload, "
+                                        + "commit_metadata_sha1 = :new_commit_metadata_sha1 "
+                                        + "WHERE dataSource = :dataSource AND commit_metadata_sha1 = :old_commit_metadata_sha1",
+                                        dbTables.getDataSourceTable()
+                                    )
+                                )
           .bind("dataSource", dataSource)
           .bind("old_commit_metadata_sha1", oldCommitMetadataSha1FromDb)
           .bind("new_commit_metadata_payload", newCommitMetadataBytes)
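Note: the final patch below replaces a StringBuilder loop with per-prefix LIKE conditions joined by String.join. For illustration only — assuming the default druid_pendingSegments table name and a double-quote identifier quote string, neither of which is guaranteed by the patch itself — the SQL it assembles for two active base sequence names would come out roughly as:

    SELECT sequence_name, payload FROM druid_pendingSegments
      WHERE dataSource = :dataSource AND start < :end AND "end" > :start
      AND ( (sequence_name LIKE :prefix0) OR (sequence_name LIKE :prefix1) )

with :prefix0 and :prefix1 bound to the corresponding base sequence names plus a trailing '%' wildcard.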
From 92a853cd7431c732555fb16d3cf6d89636251c07 Mon Sep 17 00:00:00 2001
From: Amatya
Date: Mon, 23 Oct 2023 16:05:34 +0530
Subject: [PATCH 9/9] Simplify pending segment query

---
 .../IndexerSQLMetadataStorageCoordinator.java | 41 +++++++------------
 1 file changed, 15 insertions(+), 26 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index c8f240d3e34c..612f712c1bb2 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -274,36 +274,25 @@ Map getPendingSegmentsForIntervalWithHandle(
     }
     final List sequenceNamePrefixes = new ArrayList<>(sequenceNamePrefixFilter);
-
-    StringBuilder sql = new StringBuilder("SELECT sequence_name, payload");
-    sql.append(" FROM ").append(dbTables.getPendingSegmentsTable());
-    sql.append(" WHERE dataSource = :dataSource");
-    sql.append(" AND start < :end");
-    sql.append(StringUtils.format(" AND %1$send%1$s > :start", connector.getQuoteString()));
-
-    sql.append(" AND ( ");
-
-    for (int i = 1; i < sequenceNamePrefixes.size(); i++) {
-      sql.append("(sequence_name LIKE ")
-         .append(StringUtils.format(":prefix%d", i))
-         .append(")")
-         .append(" OR ");
+    final List sequenceNamePrefixConditions = new ArrayList<>();
+    for (int i = 0; i < sequenceNamePrefixes.size(); i++) {
+      sequenceNamePrefixConditions.add(StringUtils.format("(sequence_name LIKE :prefix%d)", i));
     }
-    sql.append("(sequence_name LIKE ")
-       .append(StringUtils.format(":prefix%d", sequenceNamePrefixes.size()))
-       .append(")");
+    String sql = "SELECT sequence_name, payload"
+                 + " FROM " + dbTables.getPendingSegmentsTable()
+                 + " WHERE dataSource = :dataSource"
+                 + " AND start < :end"
+                 + StringUtils.format(" AND %1$send%1$s > :start", connector.getQuoteString())
+                 + " AND ( " + String.join(" OR ", sequenceNamePrefixConditions) + " )";
-    sql.append(" )");
-
-    Query> query =
-        handle.createQuery(sql.toString())
-              .bind("dataSource", dataSource)
-              .bind("start", interval.getStart().toString())
-              .bind("end", interval.getEnd().toString());
+    Query> query = handle.createQuery(sql)
+                          .bind("dataSource", dataSource)
+                          .bind("start", interval.getStart().toString())
+                          .bind("end", interval.getEnd().toString());
-    for (int i = 1; i <= sequenceNamePrefixes.size(); i++) {
-      query.bind(StringUtils.format("prefix%d", i), sequenceNamePrefixes.get(i - 1) + "%");
+    for (int i = 0; i < sequenceNamePrefixes.size(); i++) {
+      query.bind(StringUtils.format("prefix%d", i), sequenceNamePrefixes.get(i) + "%");
     }

     final ResultIterator dbSegments =