Skip to content

Commit

Permalink
Refactor: Minor cleanup of segment allocation flow (#17524)
Browse files Browse the repository at this point in the history
Changes
--------
- Simplify the arguments of IndexerMetadataStorageCoordinator.allocatePendingSegment
- Remove field SegmentCreateRequest.upgradedFromSegmentId as it was always null
- Miscellaneous cleanup
  • Loading branch information
kfaraz authored Dec 13, 2024
1 parent b86ea4d commit 24e5d8a
Show file tree
Hide file tree
Showing 10 changed files with 213 additions and 271 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right)
? savedTaskLock.withPriority(task.getPriority())
: savedTaskLock;

final TaskLockPosse taskLockPosse = verifyAndCreateOrFindLockPosse(
final TaskLockPosse taskLockPosse = reacquireLockOnStartup(
task,
savedTaskLockWithPriority
);
Expand All @@ -192,15 +192,11 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right)

if (savedTaskLockWithPriority.getVersion().equals(taskLock.getVersion())) {
taskLockCount++;
log.info(
"Reacquired lock[%s] for task: %s",
taskLock,
task.getId()
);
log.info("Reacquired lock[%s] for task[%s].", taskLock, task.getId());
} else {
taskLockCount++;
log.info(
"Could not reacquire lock on interval[%s] version[%s] (got version[%s] instead) for task: %s",
"Could not reacquire lock on interval[%s] version[%s] (got version[%s] instead) for task[%s].",
savedTaskLockWithPriority.getInterval(),
savedTaskLockWithPriority.getVersion(),
taskLock.getVersion(),
Expand All @@ -210,7 +206,7 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right)
} else {
failedToReacquireLockTaskGroups.add(task.getGroupId());
log.error(
"Could not reacquire lock on interval[%s] version[%s] for task: %s from group %s.",
"Could not reacquire lock on interval[%s] version[%s] for task[%s], groupId[%s].",
savedTaskLockWithPriority.getInterval(),
savedTaskLockWithPriority.getVersion(),
task.getId(),
Expand Down Expand Up @@ -253,38 +249,28 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right)
}

/**
* This method is called only in {@link #syncFromStorage()} and verifies the given task and the taskLock have the same
* groupId, dataSource, and priority.
* Reacquire lock during {@link #syncFromStorage()}.
*
* @return null if the lock could not be reacquired.
*/
@VisibleForTesting
@Nullable
protected TaskLockPosse verifyAndCreateOrFindLockPosse(Task task, TaskLock taskLock)
protected TaskLockPosse reacquireLockOnStartup(Task task, TaskLock taskLock)
{
if (!taskMatchesLock(task, taskLock)) {
log.warn(
"Task[datasource: %s, groupId: %s, priority: %s] does not match"
+ " TaskLock[datasource: %s, groupId: %s, priority: %s].",
task.getDataSource(), task.getGroupId(), task.getPriority(),
taskLock.getDataSource(), taskLock.getGroupId(), taskLock.getNonNullPriority()
);
return null;
}

giant.lock();

try {
Preconditions.checkArgument(
task.getGroupId().equals(taskLock.getGroupId()),
"lock groupId[%s] is different from task groupId[%s]",
taskLock.getGroupId(),
task.getGroupId()
);
Preconditions.checkArgument(
task.getDataSource().equals(taskLock.getDataSource()),
"lock dataSource[%s] is different from task dataSource[%s]",
taskLock.getDataSource(),
task.getDataSource()
);
final int taskPriority = task.getPriority();
final int lockPriority = taskLock.getNonNullPriority();

Preconditions.checkArgument(
lockPriority == taskPriority,
"lock priority[%s] is different from task priority[%s]",
lockPriority,
taskPriority
);

final LockRequest request;
switch (taskLock.getGranularity()) {
case SEGMENT:
Expand Down Expand Up @@ -313,22 +299,31 @@ protected TaskLockPosse verifyAndCreateOrFindLockPosse(Task task, TaskLock taskL
);
break;
default:
throw new ISE("Unknown lockGranularity[%s]", taskLock.getGranularity());
throw DruidException.defensive("Unknown lockGranularity[%s]", taskLock.getGranularity());
}

return createOrFindLockPosse(request, task, false);
}
catch (Exception e) {
log.error(e,
"Could not reacquire lock for task: %s from metadata store", task.getId()
);
log.error(e, "Could not reacquire lock for task[%s] from metadata store", task.getId());
return null;
}
finally {
giant.unlock();
}
}

/**
* Returns true if the datasource, groupId and priority of the given Task
* match that of the TaskLock.
*/
private boolean taskMatchesLock(Task task, TaskLock taskLock)
{
return task.getGroupId().equals(taskLock.getGroupId())
&& task.getDataSource().equals(taskLock.getDataSource())
&& task.getPriority() == taskLock.getNonNullPriority();
}

/**
* Acquires a lock on behalf of a task. Blocks until the lock is acquired.
*
Expand Down Expand Up @@ -751,13 +746,15 @@ private SegmentIdWithShardSpec allocateSegmentId(LockRequestForNewSegment reques
{
return metadataStorageCoordinator.allocatePendingSegment(
request.getDataSource(),
request.getSequenceName(),
request.getPreviousSegmentId(),
request.getInterval(),
request.getPartialShardSpec(),
version,
request.isSkipSegmentLineageCheck(),
allocatorId
new SegmentCreateRequest(
request.getSequenceName(),
request.getPreviousSegmentId(),
version,
request.getPartialShardSpec(),
allocatorId
)
);
}

Expand Down Expand Up @@ -1818,7 +1815,6 @@ SegmentCreateRequest getSegmentRequest()
action.getPreviousSegmentId(),
acquiredLock == null ? lockRequest.getVersion() : acquiredLock.getVersion(),
action.getPartialShardSpec(),
null,
((PendingSegmentAllocatingTask) task).getTaskAllocatorId()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2270,10 +2270,10 @@ public NullLockPosseTaskLockbox(
}

@Override
protected TaskLockPosse verifyAndCreateOrFindLockPosse(Task task, TaskLock taskLock)
protected TaskLockPosse reacquireLockOnStartup(Task task, TaskLock taskLock)
{
return task.getGroupId()
.contains("FailingLockAcquisition") ? null : super.verifyAndCreateOrFindLockPosse(task, taskLock);
.contains("FailingLockAcquisition") ? null : super.reacquireLockOnStartup(task, taskLock);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.partition.PartialShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;

Expand Down Expand Up @@ -242,20 +241,16 @@ public int removeDataSourceMetadataOlderThan(long timestamp, @Nullable Set<Strin
@Override
public SegmentIdWithShardSpec allocatePendingSegment(
String dataSource,
String sequenceName,
String previousSegmentId,
Interval interval,
PartialShardSpec partialShardSpec,
String maxVersion,
boolean skipSegmentLineageCheck,
String taskAllocatorId
SegmentCreateRequest createRequest
)
{
return new SegmentIdWithShardSpec(
dataSource,
interval,
maxVersion,
partialShardSpec.complete(objectMapper, 0, 0)
createRequest.getVersion(),
createRequest.getPartialShardSpec().complete(objectMapper, 0, 0)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

/**
* ShardSpec for non-first-generation segments.
* This shardSpec is created only by overwriting tasks using segment locks.
* This shardSpec is allocated a partitionId between {@link PartitionIds#NON_ROOT_GEN_START_PARTITION_ID} and
* {@link PartitionIds#NON_ROOT_GEN_END_PARTITION_ID}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public final class PartitionIds
public static final int ROOT_GEN_END_PARTITION_ID = 32768; // exclusive
/**
* Start partitionId available for non-root generation segments.
* Used only with segment locks.
*/
public static final int NON_ROOT_GEN_START_PARTITION_ID = 32768;
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.partition.PartialShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;

Expand Down Expand Up @@ -215,34 +214,23 @@ SegmentTimeline getSegmentTimelineForAllocation(
* Note that a segment sequence may include segments with a variety of different intervals and versions.
*
* @param dataSource dataSource for which to allocate a segment
* @param sequenceName name of the group of ingestion tasks producing a segment series
* @param previousSegmentId previous segment in the series; may be null or empty, meaning this is the first
* segment
* @param interval interval for which to allocate a segment
* @param partialShardSpec partialShardSpec containing all necessary information to create a shardSpec for the
* new segmentId
* @param maxVersion use this version if we have no better version to use. The returned segment
* identifier may have a version lower than this one, but will not have one higher.
* @param skipSegmentLineageCheck if true, perform lineage validation using previousSegmentId for this sequence.
* Should be set to false if replica tasks would index events in same order
* @param taskAllocatorId The task allocator id with which the pending segment is associated
* @return the pending segment identifier, or null if it was impossible to allocate a new segment
*/
@Nullable
SegmentIdWithShardSpec allocatePendingSegment(
String dataSource,
String sequenceName,
@Nullable String previousSegmentId,
Interval interval,
PartialShardSpec partialShardSpec,
String maxVersion,
boolean skipSegmentLineageCheck,
String taskAllocatorId
SegmentCreateRequest createRequest
);

/**
* Delete pending segments created in the given interval belonging to the given data source from the pending segments
* table. The {@code created_date} field of the pending segments table is checked to find segments to be deleted.
*
* <p>
* Note that the semantic of the interval (for `created_date`s) is different from the semantic of the interval
* parameters in some other methods in this class, such as {@link #retrieveUsedSegmentsForInterval} (where the
* interval is about the time column value in rows belonging to the segment).
Expand All @@ -269,7 +257,7 @@ SegmentIdWithShardSpec allocatePendingSegment(
* <p/>
* If startMetadata and endMetadata are set, this insertion will be atomic with a compare-and-swap on dataSource
* commit metadata.
*
* <p>
* If segmentsToDrop is not null and not empty, this insertion will be atomic with a insert-and-drop on inserting
* {@param segments} and dropping {@param segmentsToDrop}.
*
Expand Down Expand Up @@ -426,7 +414,7 @@ List<PendingSegmentRecord> upgradePendingSegmentsOverlappingWith(
* Similar to {@link #commitSegments}, but meant for streaming ingestion tasks for handling
* the case where the task ingested no records and created no segments, but still needs to update the metadata
* with the progress that the task made.
*
* <p>
* The metadata should undergo the same validation checks as performed by {@link #commitSegments}.
*
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,40 +38,45 @@ public class SegmentCreateRequest
private final String sequenceName;
private final String previousSegmentId;
private final PartialShardSpec partialShardSpec;
private final String upgradedFromSegmentId;
private final String taskAllocatorId;

public SegmentCreateRequest(
String sequenceName,
String previousSegmentId,
String version,
PartialShardSpec partialShardSpec,
String upgradedFromSegmentId,
String taskAllocatorId
)
{
this.sequenceName = sequenceName;
this.previousSegmentId = previousSegmentId == null ? "" : previousSegmentId;
this.version = version;
this.partialShardSpec = partialShardSpec;
this.upgradedFromSegmentId = upgradedFromSegmentId;
this.taskAllocatorId = taskAllocatorId;
}

/**
* Represents group of ingestion tasks that produce a segment series.
*/
public String getSequenceName()
{
return sequenceName;
}

/**
* Non-null previous segment id. This can be used for persisting to the
* pending segments table in the metadata store.
* Previous segment id allocated for this sequence.
*
* @return Empty string if there is no previous segment in the series.
*/
public String getPreviousSegmentId()
{
return previousSegmentId;
}

/**
* Version of the lock held by the task that has requested the segment allocation.
* The allocated segment must have a version less than or equal to this version.
*/
public String getVersion()
{
return version;
Expand All @@ -82,11 +87,6 @@ public PartialShardSpec getPartialShardSpec()
return partialShardSpec;
}

public String getUpgradedFromSegmentId()
{
return upgradedFromSegmentId;
}

public String getTaskAllocatorId()
{
return taskAllocatorId;
Expand Down
Loading

0 comments on commit 24e5d8a

Please sign in to comment.