From 24e5d8a9e81fffd656e9544ae16bea53d7e60808 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 12 Dec 2024 18:16:57 -0800 Subject: [PATCH] Refactor: Minor cleanup of segment allocation flow (#17524) Changes -------- - Simplify the arguments of IndexerMetadataStorageCoordinator.allocatePendingSegment - Remove field SegmentCreateRequest.upgradedFromSegmentId as it was always null - Miscellaneous cleanup --- .../druid/indexing/overlord/TaskLockbox.java | 80 +++--- .../indexing/overlord/TaskLockboxTest.java | 4 +- ...TestIndexerMetadataStorageCoordinator.java | 11 +- .../partition/OverwriteShardSpec.java | 1 + .../timeline/partition/PartitionIds.java | 1 + .../IndexerMetadataStorageCoordinator.java | 22 +- .../overlord/SegmentCreateRequest.java | 20 +- .../IndexerSQLMetadataStorageCoordinator.java | 242 +++++++----------- .../overlord/SegmentCreateRequestTest.java | 1 - ...exerSQLMetadataStorageCoordinatorTest.java | 102 +++++--- 10 files changed, 213 insertions(+), 271 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index 916f8cab75ca..2c1b78d3adab 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -183,7 +183,7 @@ public int compare(Pair left, Pair right) ? savedTaskLock.withPriority(task.getPriority()) : savedTaskLock; - final TaskLockPosse taskLockPosse = verifyAndCreateOrFindLockPosse( + final TaskLockPosse taskLockPosse = reacquireLockOnStartup( task, savedTaskLockWithPriority ); @@ -192,15 +192,11 @@ public int compare(Pair left, Pair right) if (savedTaskLockWithPriority.getVersion().equals(taskLock.getVersion())) { taskLockCount++; - log.info( - "Reacquired lock[%s] for task: %s", - taskLock, - task.getId() - ); + log.info("Reacquired lock[%s] for task[%s].", taskLock, task.getId()); } else { taskLockCount++; log.info( - "Could not reacquire lock on interval[%s] version[%s] (got version[%s] instead) for task: %s", + "Could not reacquire lock on interval[%s] version[%s] (got version[%s] instead) for task[%s].", savedTaskLockWithPriority.getInterval(), savedTaskLockWithPriority.getVersion(), taskLock.getVersion(), @@ -210,7 +206,7 @@ public int compare(Pair left, Pair right) } else { failedToReacquireLockTaskGroups.add(task.getGroupId()); log.error( - "Could not reacquire lock on interval[%s] version[%s] for task: %s from group %s.", + "Could not reacquire lock on interval[%s] version[%s] for task[%s], groupId[%s].", savedTaskLockWithPriority.getInterval(), savedTaskLockWithPriority.getVersion(), task.getId(), @@ -253,38 +249,28 @@ public int compare(Pair left, Pair right) } /** - * This method is called only in {@link #syncFromStorage()} and verifies the given task and the taskLock have the same - * groupId, dataSource, and priority. + * Reacquire lock during {@link #syncFromStorage()}. + * + * @return null if the lock could not be reacquired. 
*/ @VisibleForTesting @Nullable - protected TaskLockPosse verifyAndCreateOrFindLockPosse(Task task, TaskLock taskLock) + protected TaskLockPosse reacquireLockOnStartup(Task task, TaskLock taskLock) { + if (!taskMatchesLock(task, taskLock)) { + log.warn( + "Task[datasource: %s, groupId: %s, priority: %s] does not match" + + " TaskLock[datasource: %s, groupId: %s, priority: %s].", + task.getDataSource(), task.getGroupId(), task.getPriority(), + taskLock.getDataSource(), taskLock.getGroupId(), taskLock.getNonNullPriority() + ); + return null; + } + giant.lock(); try { - Preconditions.checkArgument( - task.getGroupId().equals(taskLock.getGroupId()), - "lock groupId[%s] is different from task groupId[%s]", - taskLock.getGroupId(), - task.getGroupId() - ); - Preconditions.checkArgument( - task.getDataSource().equals(taskLock.getDataSource()), - "lock dataSource[%s] is different from task dataSource[%s]", - taskLock.getDataSource(), - task.getDataSource() - ); final int taskPriority = task.getPriority(); - final int lockPriority = taskLock.getNonNullPriority(); - - Preconditions.checkArgument( - lockPriority == taskPriority, - "lock priority[%s] is different from task priority[%s]", - lockPriority, - taskPriority - ); - final LockRequest request; switch (taskLock.getGranularity()) { case SEGMENT: @@ -313,15 +299,13 @@ protected TaskLockPosse verifyAndCreateOrFindLockPosse(Task task, TaskLock taskL ); break; default: - throw new ISE("Unknown lockGranularity[%s]", taskLock.getGranularity()); + throw DruidException.defensive("Unknown lockGranularity[%s]", taskLock.getGranularity()); } return createOrFindLockPosse(request, task, false); } catch (Exception e) { - log.error(e, - "Could not reacquire lock for task: %s from metadata store", task.getId() - ); + log.error(e, "Could not reacquire lock for task[%s] from metadata store", task.getId()); return null; } finally { @@ -329,6 +313,17 @@ protected TaskLockPosse verifyAndCreateOrFindLockPosse(Task task, TaskLock taskL } } + /** + * Returns true if the datasource, groupId and priority of the given Task + * match that of the TaskLock. + */ + private boolean taskMatchesLock(Task task, TaskLock taskLock) + { + return task.getGroupId().equals(taskLock.getGroupId()) + && task.getDataSource().equals(taskLock.getDataSource()) + && task.getPriority() == taskLock.getNonNullPriority(); + } + /** * Acquires a lock on behalf of a task. Blocks until the lock is acquired. * @@ -751,13 +746,15 @@ private SegmentIdWithShardSpec allocateSegmentId(LockRequestForNewSegment reques { return metadataStorageCoordinator.allocatePendingSegment( request.getDataSource(), - request.getSequenceName(), - request.getPreviousSegmentId(), request.getInterval(), - request.getPartialShardSpec(), - version, request.isSkipSegmentLineageCheck(), - allocatorId + new SegmentCreateRequest( + request.getSequenceName(), + request.getPreviousSegmentId(), + version, + request.getPartialShardSpec(), + allocatorId + ) ); } @@ -1818,7 +1815,6 @@ SegmentCreateRequest getSegmentRequest() action.getPreviousSegmentId(), acquiredLock == null ? 
lockRequest.getVersion() : acquiredLock.getVersion(), action.getPartialShardSpec(), - null, ((PendingSegmentAllocatingTask) task).getTaskAllocatorId() ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java index a8c4b5117b1b..8f47b78a3bfe 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java @@ -2270,10 +2270,10 @@ public NullLockPosseTaskLockbox( } @Override - protected TaskLockPosse verifyAndCreateOrFindLockPosse(Task task, TaskLock taskLock) + protected TaskLockPosse reacquireLockOnStartup(Task task, TaskLock taskLock) { return task.getGroupId() - .contains("FailingLockAcquisition") ? null : super.verifyAndCreateOrFindLockPosse(task, taskLock); + .contains("FailingLockAcquisition") ? null : super.reacquireLockOnStartup(task, taskLock); } } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index 54e323581c47..a95d73ce1bb7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -36,7 +36,6 @@ import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentTimeline; -import org.apache.druid.timeline.partition.PartialShardSpec; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -242,20 +241,16 @@ public int removeDataSourceMetadataOlderThan(long timestamp, @Nullable Set * Note that the semantic of the interval (for `created_date`s) is different from the semantic of the interval * parameters in some other methods in this class, such as {@link #retrieveUsedSegmentsForInterval} (where the * interval is about the time column value in rows belonging to the segment). @@ -269,7 +257,7 @@ SegmentIdWithShardSpec allocatePendingSegment( *
* If startMetadata and endMetadata are set, this insertion will be atomic with a compare-and-swap on dataSource * commit metadata. - * + *
* If segmentsToDrop is not null and not empty, this insertion will be atomic with a insert-and-drop on inserting * {@param segments} and dropping {@param segmentsToDrop}. * @@ -426,7 +414,7 @@ List upgradePendingSegmentsOverlappingWith( * Similar to {@link #commitSegments}, but meant for streaming ingestion tasks for handling * the case where the task ingested no records and created no segments, but still needs to update the metadata * with the progress that the task made. - * + *
* The metadata should undergo the same validation checks as performed by {@link #commitSegments}. * * diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentCreateRequest.java b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentCreateRequest.java index 49b31e5e6ff9..bcbf9416fe84 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentCreateRequest.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentCreateRequest.java @@ -38,7 +38,6 @@ public class SegmentCreateRequest private final String sequenceName; private final String previousSegmentId; private final PartialShardSpec partialShardSpec; - private final String upgradedFromSegmentId; private final String taskAllocatorId; public SegmentCreateRequest( @@ -46,7 +45,6 @@ public SegmentCreateRequest( String previousSegmentId, String version, PartialShardSpec partialShardSpec, - String upgradedFromSegmentId, String taskAllocatorId ) { @@ -54,24 +52,31 @@ public SegmentCreateRequest( this.previousSegmentId = previousSegmentId == null ? "" : previousSegmentId; this.version = version; this.partialShardSpec = partialShardSpec; - this.upgradedFromSegmentId = upgradedFromSegmentId; this.taskAllocatorId = taskAllocatorId; } + /** + * Represents group of ingestion tasks that produce a segment series. + */ public String getSequenceName() { return sequenceName; } /** - * Non-null previous segment id. This can be used for persisting to the - * pending segments table in the metadata store. + * Previous segment id allocated for this sequence. + * + * @return Empty string if there is no previous segment in the series. */ public String getPreviousSegmentId() { return previousSegmentId; } + /** + * Version of the lock held by the task that has requested the segment allocation. + * The allocated segment must have a version less than or equal to this version. + */ public String getVersion() { return version; @@ -82,11 +87,6 @@ public PartialShardSpec getPartialShardSpec() return partialShardSpec; } - public String getUpgradedFromSegmentId() - { - return upgradedFromSegmentId; - } - public String getTaskAllocatorId() { return taskAllocatorId; diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 0717c9b07eea..c85fd1c49608 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -80,7 +80,6 @@ import org.skife.jdbi.v2.TransactionStatus; import org.skife.jdbi.v2.Update; import org.skife.jdbi.v2.exceptions.CallbackFailedException; -import org.skife.jdbi.v2.tweak.HandleCallback; import org.skife.jdbi.v2.util.ByteArrayMapper; import javax.annotation.Nullable; @@ -350,8 +349,7 @@ public int markSegmentsAsUnusedWithinInterval(String dataSource, Interval interv /** * Fetches all the pending segments, whose interval overlaps with the given search interval, from the metadata store. 
*/ - @VisibleForTesting - List getPendingSegmentsForIntervalWithHandle( + private List getPendingSegmentsForInterval( final Handle handle, final String dataSource, final Interval interval @@ -390,7 +388,7 @@ List getPendingSegmentsForIntervalWithHandle( return pendingSegments.build(); } - List getPendingSegmentsForTaskAllocatorIdWithHandle( + private List getPendingSegmentsForTaskAllocatorId( final Handle handle, final String dataSource, final String taskAllocatorId @@ -580,7 +578,7 @@ public SegmentPublishResult commitReplaceSegments( ); } } - SegmentPublishResult result = SegmentPublishResult.ok( + return SegmentPublishResult.ok( insertSegments( handle, segmentsToInsert, @@ -591,7 +589,6 @@ public SegmentPublishResult commitReplaceSegments( ), upgradePendingSegmentsOverlappingWith(segmentsToInsert) ); - return result; }, 3, getSqlMetadataMaxRetry() @@ -740,21 +737,16 @@ public Map allocatePendingSegments } @Override + @Nullable public SegmentIdWithShardSpec allocatePendingSegment( final String dataSource, - final String sequenceName, - @Nullable final String previousSegmentId, final Interval interval, - final PartialShardSpec partialShardSpec, - final String maxVersion, final boolean skipSegmentLineageCheck, - String taskAllocatorId + final SegmentCreateRequest createRequest ) { Preconditions.checkNotNull(dataSource, "dataSource"); - Preconditions.checkNotNull(sequenceName, "sequenceName"); Preconditions.checkNotNull(interval, "interval"); - Preconditions.checkNotNull(maxVersion, "version"); final Interval allocateInterval = interval.withChronology(ISOChronology.getInstanceUTC()); return connector.retryWithHandle( @@ -776,24 +768,17 @@ public SegmentIdWithShardSpec allocatePendingSegment( return allocatePendingSegment( handle, dataSource, - sequenceName, allocateInterval, - partialShardSpec, - maxVersion, - existingChunks, - taskAllocatorId + createRequest, + existingChunks ); } else { return allocatePendingSegmentWithSegmentLineageCheck( handle, dataSource, - sequenceName, - previousSegmentId, allocateInterval, - partialShardSpec, - maxVersion, - existingChunks, - taskAllocatorId + createRequest, + existingChunks ); } } @@ -854,7 +839,7 @@ private List upgradePendingSegments( int currentPartitionNumber = maxSegmentId.getShardSpec().getPartitionNum(); final List overlappingPendingSegments - = getPendingSegmentsForIntervalWithHandle(handle, datasource, replaceInterval); + = getPendingSegmentsForInterval(handle, datasource, replaceInterval); for (PendingSegmentRecord overlappingPendingSegment : overlappingPendingSegments) { final SegmentIdWithShardSpec pendingSegmentId = overlappingPendingSegment.getId(); @@ -929,17 +914,11 @@ private boolean shouldUpgradePendingSegment( private SegmentIdWithShardSpec allocatePendingSegmentWithSegmentLineageCheck( final Handle handle, final String dataSource, - final String sequenceName, - @Nullable final String previousSegmentId, final Interval interval, - final PartialShardSpec partialShardSpec, - final String maxVersion, - final List> existingChunks, - final String taskAllocatorId + final SegmentCreateRequest createRequest, + final List> existingChunks ) throws IOException { - final String previousSegmentIdNotNull = previousSegmentId == null ? 
"" : previousSegmentId; - final String sql = StringUtils.format( "SELECT payload FROM %s WHERE " + "dataSource = :dataSource AND " @@ -950,15 +929,15 @@ private SegmentIdWithShardSpec allocatePendingSegmentWithSegmentLineageCheck( final Query> query = handle.createQuery(sql) .bind("dataSource", dataSource) - .bind("sequence_name", sequenceName) - .bind("sequence_prev_id", previousSegmentIdNotNull); + .bind("sequence_name", createRequest.getSequenceName()) + .bind("sequence_prev_id", createRequest.getPreviousSegmentId()); final String usedSegmentVersion = existingChunks.isEmpty() ? null : existingChunks.get(0).getVersion(); final CheckExistingSegmentIdResult result = findExistingPendingSegment( query, interval, - sequenceName, - previousSegmentIdNotNull, + createRequest.getSequenceName(), + createRequest.getPreviousSegmentId(), usedSegmentVersion ); @@ -967,12 +946,12 @@ private SegmentIdWithShardSpec allocatePendingSegmentWithSegmentLineageCheck( return result.segmentIdentifier; } - final SegmentIdWithShardSpec newIdentifier = createNewSegment( + final SegmentIdWithShardSpec newIdentifier = createNewPendingSegment( handle, dataSource, interval, - partialShardSpec, - maxVersion, + createRequest.getPartialShardSpec(), + createRequest.getVersion(), existingChunks ); if (newIdentifier == null) { @@ -989,9 +968,9 @@ private SegmentIdWithShardSpec allocatePendingSegmentWithSegmentLineageCheck( final String sequenceNamePrevIdSha1 = BaseEncoding.base16().encode( Hashing.sha1() .newHasher() - .putBytes(StringUtils.toUtf8(sequenceName)) + .putBytes(StringUtils.toUtf8(createRequest.getSequenceName())) .putByte((byte) 0xff) - .putBytes(StringUtils.toUtf8(previousSegmentIdNotNull)) + .putBytes(StringUtils.toUtf8(createRequest.getPreviousSegmentId())) .putByte((byte) 0xff) .putBytes(StringUtils.toUtf8(newIdentifier.getVersion())) .hash() @@ -1003,10 +982,10 @@ private SegmentIdWithShardSpec allocatePendingSegmentWithSegmentLineageCheck( newIdentifier, dataSource, interval, - previousSegmentIdNotNull, - sequenceName, + createRequest.getPreviousSegmentId(), + createRequest.getSequenceName(), sequenceNamePrevIdSha1, - taskAllocatorId + createRequest.getTaskAllocatorId() ); return newIdentifier; } @@ -1108,12 +1087,9 @@ private Map allocatePendingSegment private SegmentIdWithShardSpec allocatePendingSegment( final Handle handle, final String dataSource, - final String sequenceName, final Interval interval, - final PartialShardSpec partialShardSpec, - final String maxVersion, - final List> existingChunks, - final String taskAllocatorId + final SegmentCreateRequest createRequest, + final List> existingChunks ) throws IOException { final String sql = StringUtils.format( @@ -1128,14 +1104,14 @@ private SegmentIdWithShardSpec allocatePendingSegment( final Query> query = handle.createQuery(sql) .bind("dataSource", dataSource) - .bind("sequence_name", sequenceName) + .bind("sequence_name", createRequest.getSequenceName()) .bind("start", interval.getStart().toString()) .bind("end", interval.getEnd().toString()); final CheckExistingSegmentIdResult result = findExistingPendingSegment( query, interval, - sequenceName, + createRequest.getSequenceName(), null, existingChunks.isEmpty() ? 
null : existingChunks.get(0).getVersion() ); @@ -1144,12 +1120,12 @@ private SegmentIdWithShardSpec allocatePendingSegment( return result.segmentIdentifier; } - final SegmentIdWithShardSpec newIdentifier = createNewSegment( + final SegmentIdWithShardSpec newIdentifier = createNewPendingSegment( handle, dataSource, interval, - partialShardSpec, - maxVersion, + createRequest.getPartialShardSpec(), + createRequest.getVersion(), existingChunks ); if (newIdentifier == null) { @@ -1166,7 +1142,7 @@ private SegmentIdWithShardSpec allocatePendingSegment( final String sequenceNamePrevIdSha1 = BaseEncoding.base16().encode( Hashing.sha1() .newHasher() - .putBytes(StringUtils.toUtf8(sequenceName)) + .putBytes(StringUtils.toUtf8(createRequest.getSequenceName())) .putByte((byte) 0xff) .putLong(interval.getStartMillis()) .putLong(interval.getEndMillis()) @@ -1183,14 +1159,14 @@ private SegmentIdWithShardSpec allocatePendingSegment( dataSource, interval, "", - sequenceName, + createRequest.getSequenceName(), sequenceNamePrevIdSha1, - taskAllocatorId + createRequest.getTaskAllocatorId() ); log.info( - "Created new pending segment[%s] for datasource[%s], sequence[%s], interval[%s].", - newIdentifier, dataSource, sequenceName, interval + "Created new pending segment[%s] for datasource[%s], interval[%s].", + newIdentifier, dataSource, interval ); return newIdentifier; @@ -1334,7 +1310,6 @@ private CheckExistingSegmentIdResult findExistingPendingSegment( private static class CheckExistingSegmentIdResult { private final boolean found; - @Nullable private final SegmentIdWithShardSpec segmentIdentifier; CheckExistingSegmentIdResult(boolean found, @Nullable SegmentIdWithShardSpec segmentIdentifier) @@ -1391,21 +1366,6 @@ public int hashCode() } } - private static void bindColumnValuesToQueryWithInCondition( - final String columnName, - final List values, - final Update query - ) - { - if (values == null) { - return; - } - - for (int i = 0; i < values.size(); i++) { - query.bind(StringUtils.format("%s%d", columnName, i), values.get(i)); - } - } - private int deletePendingSegmentsById(Handle handle, String datasource, List pendingSegmentIds) { if (pendingSegmentIds.isEmpty()) { @@ -1419,7 +1379,7 @@ private int deletePendingSegmentsById(Handle handle, String datasource, List segmentIdsForNewVersions = connector.retryTransaction( (handle, transactionStatus) - -> getPendingSegmentsForTaskAllocatorIdWithHandle(handle, dataSource, taskAllocatorId), + -> getPendingSegmentsForTaskAllocatorId(handle, dataSource, taskAllocatorId), 0, SQLMetadataConnector.DEFAULT_MAX_TRIES ); @@ -1668,11 +1628,11 @@ private Map createNewSegments( // across all shard specs (published + pending). // A pending segment having a higher partitionId must also be considered // to avoid clashes when inserting the pending segment created here. 
- final Set pendingSegments = new HashSet<>( - getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval).stream() - .map(PendingSegmentRecord::getId) - .collect(Collectors.toSet()) - ); + final Set pendingSegments = + getPendingSegmentsForInterval(handle, dataSource, interval) + .stream() + .map(PendingSegmentRecord::getId) + .collect(Collectors.toSet()); final Map createdSegments = new HashMap<>(); final Map uniqueRequestToSegment = new HashMap<>(); @@ -1686,7 +1646,7 @@ private Map createNewSegments( if (uniqueRequestToSegment.containsKey(uniqueRequest)) { createdSegment = uniqueRequestToSegment.get(uniqueRequest); } else { - createdSegment = createNewSegment( + createdSegment = createNewPendingSegment( request, dataSource, interval, @@ -1712,7 +1672,8 @@ private Map createNewSegments( return createdSegments; } - private PendingSegmentRecord createNewSegment( + @Nullable + private PendingSegmentRecord createNewPendingSegment( SegmentCreateRequest request, String dataSource, Interval interval, @@ -1775,17 +1736,14 @@ private PendingSegmentRecord createNewSegment( pendingSegmentId, request.getSequenceName(), request.getPreviousSegmentId(), - request.getUpgradedFromSegmentId(), + null, request.getTaskAllocatorId() ); } else if (!overallMaxId.getInterval().equals(interval)) { log.warn( "Cannot allocate new segment for dataSource[%s], interval[%s], existingVersion[%s]: conflicting segment[%s].", - dataSource, - interval, - existingVersion, - overallMaxId + dataSource, interval, existingVersion, overallMaxId ); return null; } else if (committedMaxId != null @@ -1793,8 +1751,7 @@ private PendingSegmentRecord createNewSegment( == SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS) { log.warn( "Cannot allocate new segment because of unknown core partition size of segment[%s], shardSpec[%s]", - committedMaxId, - committedMaxId.getShardSpec() + committedMaxId, committedMaxId.getShardSpec() ); return null; } else { @@ -1815,28 +1772,20 @@ private PendingSegmentRecord createNewSegment( getTrueAllocatedId(pendingSegmentId), request.getSequenceName(), request.getPreviousSegmentId(), - request.getUpgradedFromSegmentId(), + null, request.getTaskAllocatorId() ); } } /** - * This function creates a new segment for the given datasource/interval/etc. A critical - * aspect of the creation is to make sure that the new version & new partition number will make - * sense given the existing segments & pending segments also very important is to avoid - * clashes with existing pending & used/unused segments. - * @param handle Database handle - * @param dataSource datasource for the new segment - * @param interval interval for the new segment + * Creates a new pending segment for the given datasource and interval. * @param partialShardSpec Shard spec info minus segment id stuff * @param existingVersion Version of segments in interval, used to compute the version of the very first segment in * interval - * @return - * @throws IOException */ @Nullable - private SegmentIdWithShardSpec createNewSegment( + private SegmentIdWithShardSpec createNewPendingSegment( final Handle handle, final String dataSource, final Interval interval, @@ -1876,11 +1825,12 @@ private SegmentIdWithShardSpec createNewSegment( // across all shard specs (published + pending). // A pending segment having a higher partitionId must also be considered // to avoid clashes when inserting the pending segment created here. 
- final Set pendings = new HashSet<>( - getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval).stream() - .map(PendingSegmentRecord::getId) - .collect(Collectors.toSet()) - ); + final Set pendings = + getPendingSegmentsForInterval(handle, dataSource, interval) + .stream() + .map(PendingSegmentRecord::getId) + .collect(Collectors.toSet()); + if (committedMaxId != null) { pendings.add(committedMaxId); } @@ -1910,11 +1860,9 @@ private SegmentIdWithShardSpec createNewSegment( } if (overallMaxId == null) { - // When appending segments, null overallMaxId means that we are allocating the very initial - // segment for this time chunk. - // This code is executed when the Overlord coordinates segment allocation, which is either you append segments - // or you use segment lock. Since the core partitions set is not determined for appended segments, we set - // it 0. When you use segment lock, the core partitions set doesn't work with it. We simply set it 0 so that the + // We are allocating the very first segment for this time chunk. + // Set numCorePartitions to 0 as core partitions are not determined for append segments + // When you use segment lock, the core partitions set doesn't work with it. We simply set it 0 so that the // OvershadowableManager handles the atomic segment update. final int newPartitionId = partialShardSpec.useNonRootGenerationPartitionSpace() ? PartitionIds.NON_ROOT_GEN_START_PARTITION_ID @@ -1929,10 +1877,7 @@ private SegmentIdWithShardSpec createNewSegment( } else if (!overallMaxId.getInterval().equals(interval)) { log.warn( "Cannot allocate new segment for dataSource[%s], interval[%s], existingVersion[%s]: conflicting segment[%s].", - dataSource, - interval, - existingVersion, - overallMaxId + dataSource, interval, existingVersion, overallMaxId ); return null; } else if (committedMaxId != null @@ -1940,14 +1885,12 @@ private SegmentIdWithShardSpec createNewSegment( == SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS) { log.warn( "Cannot allocate new segment because of unknown core partition size of segment[%s], shardSpec[%s]", - committedMaxId, - committedMaxId.getShardSpec() + committedMaxId, committedMaxId.getShardSpec() ); return null; } else { - // The number of core partitions must always be chosen from the set of used segments in the SegmentTimeline. - // When the core partitions have been dropped, using pending segments may lead to an incorrect state - // where the chunk is believed to have core partitions and queries results are incorrect. + // numCorePartitions must always be picked from the committedMaxId and not overallMaxId + // as overallMaxId may refer to a pending segment which might have stale info of numCorePartitions final SegmentIdWithShardSpec allocatedId = new SegmentIdWithShardSpec( dataSource, interval, @@ -1963,7 +1906,7 @@ private SegmentIdWithShardSpec createNewSegment( } /** - * Verifies that the allocated id doesn't already exist in the druid segments table. + * Verifies that the allocated id doesn't already exist in the druid_segments table. * If yes, try to get the max unallocated id considering the unused segments for the datasource, version and interval * Otherwise, use the same id. 
* @param allocatedId The segment allcoted on the basis of used and pending segments @@ -1977,7 +1920,7 @@ private SegmentIdWithShardSpec getTrueAllocatedId(SegmentIdWithShardSpec allocat } // If yes, try to compute allocated partition num using the max unused segment shard spec - SegmentId unusedMaxId = getUnusedMaxId( + SegmentId unusedMaxId = getMaxIdOfUnusedSegment( allocatedId.getDataSource(), allocatedId.getInterval(), allocatedId.getVersion() @@ -2002,7 +1945,14 @@ private SegmentIdWithShardSpec getTrueAllocatedId(SegmentIdWithShardSpec allocat ); } - private SegmentId getUnusedMaxId(String datasource, Interval interval, String version) + /** + * Determines the highest ID amongst unused segments for the given datasource, + * interval and version. + * + * @return null if no unused segment exists for the given parameters. + */ + @Nullable + private SegmentId getMaxIdOfUnusedSegment(String datasource, Interval interval, String version) { List unusedSegmentIds = retrieveUnusedSegmentIdsForExactIntervalAndVersion( datasource, @@ -2134,7 +2084,7 @@ private Set announceHistoricalSegmentBatch( .bind("created_date", now) .bind("start", segment.getInterval().getStart().toString()) .bind("end", segment.getInterval().getEnd().toString()) - .bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? false : true) + .bind("partitioned", !(segment.getShardSpec() instanceof NoneShardSpec)) .bind("version", segment.getVersion()) .bind("used", usedSegments.contains(segment)) .bind("payload", jsonMapper.writeValueAsBytes(segment)) @@ -2330,9 +2280,7 @@ private Set insertSegments( Map upgradedFromSegmentIdMap ) throws IOException { - boolean shouldPersistSchema = shouldPersistSchema(segmentSchemaMapping); - - if (shouldPersistSchema) { + if (shouldPersistSchema(segmentSchemaMapping)) { persistSchema(handle, segments, segmentSchemaMapping); } @@ -2407,6 +2355,7 @@ private Set insertSegments( return segmentsToInsert; } + @Nullable private SegmentMetadata getSegmentMetadataFromSchemaMappingOrUpgradeMetadata( final SegmentId segmentId, final SegmentSchemaMapping segmentSchemaMapping, @@ -2786,27 +2735,18 @@ public boolean resetDataSourceMetadata(final String dataSource, final DataSource Hashing.sha1().hashBytes(newCommitMetadataBytes).asBytes() ); + final String sql = "UPDATE %s SET " + + "commit_metadata_payload = :new_commit_metadata_payload, " + + "commit_metadata_sha1 = :new_commit_metadata_sha1 " + + "WHERE dataSource = :dataSource"; return connector.retryWithHandle( - new HandleCallback() - { - @Override - public Boolean withHandle(Handle handle) - { - final int numRows = handle.createStatement( - StringUtils.format( - "UPDATE %s SET " - + "commit_metadata_payload = :new_commit_metadata_payload, " - + "commit_metadata_sha1 = :new_commit_metadata_sha1 " - + "WHERE dataSource = :dataSource", - dbTables.getDataSourceTable() - ) - ) - .bind("dataSource", dataSource) - .bind("new_commit_metadata_payload", newCommitMetadataBytes) - .bind("new_commit_metadata_sha1", newCommitMetadataSha1) - .execute(); - return numRows == 1; - } + handle -> { + final int numRows = handle.createStatement(StringUtils.format(sql, dbTables.getDataSourceTable())) + .bind("dataSource", dataSource) + .bind("new_commit_metadata_payload", newCommitMetadataBytes) + .bind("new_commit_metadata_sha1", newCommitMetadataSha1) + .execute(); + return numRows == 1; } ); } @@ -3028,7 +2968,7 @@ public int deletePendingSegmentsForTaskAllocatorId(final String datasource, fina public List getPendingSegments(String datasource, 
Interval interval) { return connector.retryWithHandle( - handle -> getPendingSegmentsForIntervalWithHandle(handle, datasource, interval) + handle -> getPendingSegmentsForInterval(handle, datasource, interval) ); } @@ -3178,7 +3118,6 @@ public static class DataStoreMetadataUpdateResult { private final boolean failed; private final boolean canRetry; - @Nullable private final String errorMsg; public static final DataStoreMetadataUpdateResult SUCCESS = new DataStoreMetadataUpdateResult(false, false, null); @@ -3198,7 +3137,6 @@ public static DataStoreMetadataUpdateResult retryableFailure(String errorMsgForm this.failed = failed; this.canRetry = canRetry; this.errorMsg = null == errorMsg ? null : StringUtils.format(errorMsg, errorFormatArgs); - } public boolean isFailed() diff --git a/server/src/test/java/org/apache/druid/indexing/overlord/SegmentCreateRequestTest.java b/server/src/test/java/org/apache/druid/indexing/overlord/SegmentCreateRequestTest.java index 57e01d76a446..567867bd97e3 100644 --- a/server/src/test/java/org/apache/druid/indexing/overlord/SegmentCreateRequestTest.java +++ b/server/src/test/java/org/apache/druid/indexing/overlord/SegmentCreateRequestTest.java @@ -36,7 +36,6 @@ public void testNullPreviousSegmentId() null, "version", partialShardSpec, - null, null ); Assert.assertEquals("sequence", request.getSequenceName()); diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index a000fbec5a3e..06bbf3b7ecd8 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -2280,7 +2280,7 @@ public void testAllocatePendingSegment() final PartialShardSpec partialShardSpec = NumberedPartialShardSpec.instance(); final String dataSource = "ds"; final Interval interval = Intervals.of("2017-01-01/2017-02-01"); - final SegmentIdWithShardSpec identifier = coordinator.allocatePendingSegment( + final SegmentIdWithShardSpec identifier = allocatePendingSegment( dataSource, "seq", null, @@ -2293,7 +2293,7 @@ public void testAllocatePendingSegment() Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version", identifier.toString()); - final SegmentIdWithShardSpec identifier1 = coordinator.allocatePendingSegment( + final SegmentIdWithShardSpec identifier1 = allocatePendingSegment( dataSource, "seq", identifier.toString(), @@ -2306,7 +2306,7 @@ public void testAllocatePendingSegment() Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_1", identifier1.toString()); - final SegmentIdWithShardSpec identifier2 = coordinator.allocatePendingSegment( + final SegmentIdWithShardSpec identifier2 = allocatePendingSegment( dataSource, "seq", identifier1.toString(), @@ -2319,7 +2319,7 @@ public void testAllocatePendingSegment() Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_2", identifier2.toString()); - final SegmentIdWithShardSpec identifier3 = coordinator.allocatePendingSegment( + final SegmentIdWithShardSpec identifier3 = allocatePendingSegment( dataSource, "seq", identifier1.toString(), @@ -2333,7 +2333,7 @@ public void testAllocatePendingSegment() Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_2", identifier3.toString()); Assert.assertEquals(identifier2, identifier3); - final 
SegmentIdWithShardSpec identifier4 = coordinator.allocatePendingSegment( + final SegmentIdWithShardSpec identifier4 = allocatePendingSegment( dataSource, "seq1", null, @@ -2370,7 +2370,7 @@ public void testAllocatePendingSegmentAfterDroppingExistingSegment() final PartialShardSpec partialShardSpec = NumberedPartialShardSpec.instance(); final String dataSource = "ds"; final Interval interval = Intervals.of("2017-01-01/2017-02-01"); - final SegmentIdWithShardSpec identifier = coordinator.allocatePendingSegment( + final SegmentIdWithShardSpec identifier = allocatePendingSegment( dataSource, "seq", null, @@ -2385,7 +2385,7 @@ public void testAllocatePendingSegmentAfterDroppingExistingSegment() Assert.assertEquals(0, identifier.getShardSpec().getNumCorePartitions()); // simulate one more load using kafka streaming (as if previous segment was published, note different sequence name) - final SegmentIdWithShardSpec identifier1 = coordinator.allocatePendingSegment( + final SegmentIdWithShardSpec identifier1 = allocatePendingSegment( dataSource, "seq2", identifier.toString(), @@ -2400,7 +2400,7 @@ public void testAllocatePendingSegmentAfterDroppingExistingSegment() Assert.assertEquals(0, identifier1.getShardSpec().getNumCorePartitions()); // simulate one more load using kafka streaming (as if previous segment was published, note different sequence name) - final SegmentIdWithShardSpec identifier2 = coordinator.allocatePendingSegment( + final SegmentIdWithShardSpec identifier2 = allocatePendingSegment( dataSource, "seq3", identifier1.toString(), @@ -2431,7 +2431,7 @@ public void testAllocatePendingSegmentAfterDroppingExistingSegment() Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_new", ids.get(0)); // one more load on same interval: - final SegmentIdWithShardSpec identifier3 = coordinator.allocatePendingSegment( + final SegmentIdWithShardSpec identifier3 = allocatePendingSegment( dataSource, "seq4", identifier1.toString(), @@ -2450,7 +2450,7 @@ public void testAllocatePendingSegmentAfterDroppingExistingSegment() // and final load, this reproduces an issue that could happen with multiple streaming appends, // followed by a reindex, followed by a drop, and more streaming data coming in for same interval - final SegmentIdWithShardSpec identifier4 = coordinator.allocatePendingSegment( + final SegmentIdWithShardSpec identifier4 = allocatePendingSegment( dataSource, "seq5", identifier1.toString(), @@ -2484,7 +2484,7 @@ public void testAnotherAllocatePendingSegmentAfterRevertingCompaction() final PartialShardSpec partialShardSpec = NumberedPartialShardSpec.instance(); final String dataSource = "ds"; final Interval interval = Intervals.of("2017-01-01/2017-02-01"); - final SegmentIdWithShardSpec identifier = coordinator.allocatePendingSegment( + final SegmentIdWithShardSpec identifier = allocatePendingSegment( dataSource, "seq", null, @@ -2513,7 +2513,7 @@ public void testAnotherAllocatePendingSegmentAfterRevertingCompaction() // 1.1) simulate one more append load (as if previous segment was published, note different sequence name) - final SegmentIdWithShardSpec identifier1 = coordinator.allocatePendingSegment( + final SegmentIdWithShardSpec identifier1 = allocatePendingSegment( dataSource, "seq2", identifier.toString(), @@ -2542,7 +2542,7 @@ public void testAnotherAllocatePendingSegmentAfterRevertingCompaction() // 1.2) simulate one more append load (as if previous segment was published, note different sequence name) - final SegmentIdWithShardSpec identifier2 = 
coordinator.allocatePendingSegment( + final SegmentIdWithShardSpec identifier2 = allocatePendingSegment( dataSource, "seq3", identifier1.toString(), @@ -2597,7 +2597,7 @@ public void testAnotherAllocatePendingSegmentAfterRevertingCompaction() // unused segment: // 4) pending segment of version = B, id = 1 <= appending new data, aborted - final SegmentIdWithShardSpec identifier3 = coordinator.allocatePendingSegment( + final SegmentIdWithShardSpec identifier3 = allocatePendingSegment( dataSource, "seq4", identifier2.toString(), @@ -2632,7 +2632,7 @@ public void testAnotherAllocatePendingSegmentAfterRevertingCompaction() Assert.assertEquals(1, unused.size()); // Simulate one more append load - final SegmentIdWithShardSpec identifier4 = coordinator.allocatePendingSegment( + final SegmentIdWithShardSpec identifier4 = allocatePendingSegment( dataSource, "seq5", identifier1.toString(), @@ -2678,7 +2678,7 @@ public void testAllocatePendingSegmentsSkipSegmentPayloadFetch() final Interval interval = Intervals.of("2017-01-01/2017-02-01"); final String sequenceName = "seq"; - final SegmentCreateRequest request = new SegmentCreateRequest(sequenceName, null, "v1", partialShardSpec, null, null); + final SegmentCreateRequest request = new SegmentCreateRequest(sequenceName, null, "v1", partialShardSpec, null); final SegmentIdWithShardSpec segmentId0 = coordinator.allocatePendingSegments( dataSource, interval, @@ -2690,7 +2690,7 @@ public void testAllocatePendingSegmentsSkipSegmentPayloadFetch() Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1", segmentId0.toString()); final SegmentCreateRequest request1 = - new SegmentCreateRequest(sequenceName, segmentId0.toString(), segmentId0.getVersion(), partialShardSpec, null, null); + new SegmentCreateRequest(sequenceName, segmentId0.toString(), segmentId0.getVersion(), partialShardSpec, null); final SegmentIdWithShardSpec segmentId1 = coordinator.allocatePendingSegments( dataSource, interval, @@ -2702,7 +2702,7 @@ public void testAllocatePendingSegmentsSkipSegmentPayloadFetch() Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_1", segmentId1.toString()); final SegmentCreateRequest request2 = - new SegmentCreateRequest(sequenceName, segmentId1.toString(), segmentId1.getVersion(), partialShardSpec, null, null); + new SegmentCreateRequest(sequenceName, segmentId1.toString(), segmentId1.getVersion(), partialShardSpec, null); final SegmentIdWithShardSpec segmentId2 = coordinator.allocatePendingSegments( dataSource, interval, @@ -2714,7 +2714,7 @@ public void testAllocatePendingSegmentsSkipSegmentPayloadFetch() Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_2", segmentId2.toString()); final SegmentCreateRequest request3 = - new SegmentCreateRequest(sequenceName, segmentId1.toString(), segmentId1.getVersion(), partialShardSpec, null, null); + new SegmentCreateRequest(sequenceName, segmentId1.toString(), segmentId1.getVersion(), partialShardSpec, null); final SegmentIdWithShardSpec segmentId3 = coordinator.allocatePendingSegments( dataSource, interval, @@ -2727,7 +2727,7 @@ public void testAllocatePendingSegmentsSkipSegmentPayloadFetch() Assert.assertEquals(segmentId2, segmentId3); final SegmentCreateRequest request4 = - new SegmentCreateRequest("seq1", null, "v1", partialShardSpec, null, null); + new SegmentCreateRequest("seq1", null, "v1", partialShardSpec, null); final SegmentIdWithShardSpec segmentId4 = coordinator.allocatePendingSegments( dataSource, interval, @@ -2747,7 +2747,7 @@ 
public void testAllocatePendingSegments() final Interval interval = Intervals.of("2017-01-01/2017-02-01"); final String sequenceName = "seq"; - final SegmentCreateRequest request = new SegmentCreateRequest(sequenceName, null, "v1", partialShardSpec, null, null); + final SegmentCreateRequest request = new SegmentCreateRequest(sequenceName, null, "v1", partialShardSpec, null); final SegmentIdWithShardSpec segmentId0 = coordinator.allocatePendingSegments( dataSource, interval, @@ -2759,7 +2759,7 @@ public void testAllocatePendingSegments() Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1", segmentId0.toString()); final SegmentCreateRequest request1 = - new SegmentCreateRequest(sequenceName, segmentId0.toString(), segmentId0.getVersion(), partialShardSpec, null, null); + new SegmentCreateRequest(sequenceName, segmentId0.toString(), segmentId0.getVersion(), partialShardSpec, null); final SegmentIdWithShardSpec segmentId1 = coordinator.allocatePendingSegments( dataSource, interval, @@ -2771,7 +2771,7 @@ public void testAllocatePendingSegments() Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_1", segmentId1.toString()); final SegmentCreateRequest request2 = - new SegmentCreateRequest(sequenceName, segmentId1.toString(), segmentId1.getVersion(), partialShardSpec, null, null); + new SegmentCreateRequest(sequenceName, segmentId1.toString(), segmentId1.getVersion(), partialShardSpec, null); final SegmentIdWithShardSpec segmentId2 = coordinator.allocatePendingSegments( dataSource, interval, @@ -2783,7 +2783,7 @@ public void testAllocatePendingSegments() Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_2", segmentId2.toString()); final SegmentCreateRequest request3 = - new SegmentCreateRequest(sequenceName, segmentId1.toString(), segmentId1.getVersion(), partialShardSpec, null, null); + new SegmentCreateRequest(sequenceName, segmentId1.toString(), segmentId1.getVersion(), partialShardSpec, null); final SegmentIdWithShardSpec segmentId3 = coordinator.allocatePendingSegments( dataSource, interval, @@ -2796,7 +2796,7 @@ public void testAllocatePendingSegments() Assert.assertEquals(segmentId2, segmentId3); final SegmentCreateRequest request4 = - new SegmentCreateRequest("seq1", null, "v1", partialShardSpec, null, null); + new SegmentCreateRequest("seq1", null, "v1", partialShardSpec, null); final SegmentIdWithShardSpec segmentId4 = coordinator.allocatePendingSegments( dataSource, interval, @@ -2833,7 +2833,7 @@ public void testNoPendingSegmentsAndOneUsedSegment() final PartialShardSpec partialShardSpec = NumberedPartialShardSpec.instance(); final String dataSource = "ds"; final Interval interval = Intervals.of("2017-01-01/2017-02-01"); - final SegmentIdWithShardSpec identifier = coordinator.allocatePendingSegment( + final SegmentIdWithShardSpec identifier = allocatePendingSegment( dataSource, "seq", null, @@ -2857,7 +2857,7 @@ public void testDeletePendingSegment() throws InterruptedException final DateTime begin = DateTimes.nowUtc(); for (int i = 0; i < 10; i++) { - final SegmentIdWithShardSpec identifier = coordinator.allocatePendingSegment( + final SegmentIdWithShardSpec identifier = allocatePendingSegment( dataSource, "seq", prevSegmentId, @@ -2873,7 +2873,7 @@ public void testDeletePendingSegment() throws InterruptedException final DateTime secondBegin = DateTimes.nowUtc(); for (int i = 0; i < 5; i++) { - final SegmentIdWithShardSpec identifier = coordinator.allocatePendingSegment( + final SegmentIdWithShardSpec 
identifier = allocatePendingSegment( dataSource, "seq", prevSegmentId, @@ -2901,7 +2901,7 @@ public void testAllocatePendingSegmentsWithOvershadowingSegments() String prevSegmentId = null; for (int i = 0; i < 10; i++) { - final SegmentIdWithShardSpec identifier = coordinator.allocatePendingSegment( + final SegmentIdWithShardSpec identifier = allocatePendingSegment( dataSource, "seq", prevSegmentId, @@ -2970,7 +2970,7 @@ public void testAllocatePendingSegmentsForHashBasedNumberedShardSpec() final String dataSource = "ds"; final Interval interval = Intervals.of("2017-01-01/2017-02-01"); - SegmentIdWithShardSpec id = coordinator.allocatePendingSegment( + SegmentIdWithShardSpec id = allocatePendingSegment( dataSource, "seq", null, @@ -3003,7 +3003,7 @@ public void testAllocatePendingSegmentsForHashBasedNumberedShardSpec() new SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION) ); - id = coordinator.allocatePendingSegment( + id = allocatePendingSegment( dataSource, "seq2", null, @@ -3036,7 +3036,7 @@ public void testAllocatePendingSegmentsForHashBasedNumberedShardSpec() new SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION) ); - id = coordinator.allocatePendingSegment( + id = allocatePendingSegment( dataSource, "seq3", null, @@ -3084,7 +3084,7 @@ public void testAddNumberedShardSpecAfterMultiDimensionsShardSpecWithUnknownCore ); } coordinator.commitSegments(originalSegments, new SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION)); - final SegmentIdWithShardSpec id = coordinator.allocatePendingSegment( + final SegmentIdWithShardSpec id = allocatePendingSegment( datasource, "seq", null, @@ -3130,7 +3130,7 @@ public void testAddNumberedShardSpecAfterSingleDimensionsShardSpecWithUnknownCor ); } coordinator.commitSegments(originalSegments, new SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION)); - final SegmentIdWithShardSpec id = coordinator.allocatePendingSegment( + final SegmentIdWithShardSpec id = allocatePendingSegment( datasource, "seq", null, @@ -3377,7 +3377,7 @@ public void testTimelineVisibilityWith0CorePartitionTombstone() Assert.assertTrue(coordinator.commitSegments(tombstones, new SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION)).containsAll(tombstones)); // Allocate and commit a data segment by appending to the same interval - final SegmentIdWithShardSpec identifier = coordinator.allocatePendingSegment( + final SegmentIdWithShardSpec identifier = allocatePendingSegment( TestDataSource.WIKI, "seq", tombstoneSegment.getVersion(), @@ -3432,7 +3432,7 @@ public void testTimelineWith1CorePartitionTombstone() Assert.assertTrue(coordinator.commitSegments(tombstones, new SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION)).containsAll(tombstones)); // Allocate and commit a data segment by appending to the same interval - final SegmentIdWithShardSpec identifier = coordinator.allocatePendingSegment( + final SegmentIdWithShardSpec identifier = allocatePendingSegment( TestDataSource.WIKI, "seq", tombstoneSegment.getVersion(), @@ -3471,7 +3471,7 @@ public void testTimelineWith1CorePartitionTombstone() @Test public void testSegmentIdShouldNotBeReallocated() { - final SegmentIdWithShardSpec idWithNullTaskAllocator = coordinator.allocatePendingSegment( + final SegmentIdWithShardSpec idWithNullTaskAllocator = allocatePendingSegment( TestDataSource.WIKI, "seq", "0", @@ -3487,7 +3487,7 @@ public void testSegmentIdShouldNotBeReallocated() idWithNullTaskAllocator.getShardSpec() ); - final 
SegmentIdWithShardSpec idWithValidTaskAllocator = coordinator.allocatePendingSegment( + final SegmentIdWithShardSpec idWithValidTaskAllocator = allocatePendingSegment( TestDataSource.WIKI, "seq", "1", @@ -3510,7 +3510,7 @@ public void testSegmentIdShouldNotBeReallocated() // Mark all segments as unused coordinator.markSegmentsAsUnusedWithinInterval(TestDataSource.WIKI, Intervals.ETERNITY); - final SegmentIdWithShardSpec theId = coordinator.allocatePendingSegment( + final SegmentIdWithShardSpec theId = allocatePendingSegment( TestDataSource.WIKI, "seq", "2", @@ -3791,6 +3791,30 @@ public void testRetrieveUsedSegmentsForSegmentAllocation() ); } + private SegmentIdWithShardSpec allocatePendingSegment( + String datasource, + String sequenceName, + String previousSegmentId, + Interval interval, + PartialShardSpec partialShardSpec, + String maxVersion, + boolean skipSegmentLineageCheck, + String taskAllocatorId + ) + { + return coordinator.allocatePendingSegment( + datasource, + interval, + skipSegmentLineageCheck, + new SegmentCreateRequest( + sequenceName, + previousSegmentId, + maxVersion, + partialShardSpec, + taskAllocatorId + ) + ); + } private void insertUsedSegments(Set segments, Map upgradedFromSegmentIdMap) {
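
Reviewer note
-------------
For call sites outside this patch, the migration is mechanical: the sequence-scoped
arguments of IndexerMetadataStorageCoordinator.allocatePendingSegment move into a
SegmentCreateRequest. A minimal before/after sketch follows; the datasource, interval,
version, and allocator literals are illustrative and not taken from the patch.

import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.SegmentCreateRequest;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.partition.NumberedPartialShardSpec;

class AllocationMigrationSketch
{
  SegmentIdWithShardSpec allocate(IndexerMetadataStorageCoordinator coordinator)
  {
    // Old form (removed by this patch), eight positional arguments:
    // coordinator.allocatePendingSegment(
    //     "wiki", "seq", null, Intervals.of("2024-01-01/P1D"),
    //     NumberedPartialShardSpec.instance(), "v1", false, "allocator1");

    // New form: four arguments, with the sequence details grouped together.
    return coordinator.allocatePendingSegment(
        "wiki",                                  // dataSource
        Intervals.of("2024-01-01/P1D"),          // interval
        false,                                   // skipSegmentLineageCheck
        new SegmentCreateRequest(
            "seq",                               // sequenceName
            null,                                // previousSegmentId (none yet)
            "v1",                                // version of the held task lock
            NumberedPartialShardSpec.instance(), // partialShardSpec
            "allocator1"                         // taskAllocatorId
        )
    );
  }
}

The reworked method is annotated @Nullable and may return null when allocation
fails, so callers should keep their existing null checks.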