Upgrade pending segments when a concurrent replace happens #15097
 * This action differs from {@link SegmentTransactionalInsertAction} as it is used
 * only with APPEND locks and also upgrades segments as needed.
 */
public class CommitRealtimeSegmentsAndMetadataAction implements TaskAction<SegmentPublishResult>
Could we not re-use SegmentTransactionAppendAction with the metadata being null for batch and the required values for streaming ingestion (similar to the original insert action)?
Is there anything besides the metadata commit that this action does that the transactional append action doesn't?
Yeah, I was thinking the same. Let me see what we can do.
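For illustration, the reuse proposed above could look roughly like the following: a single append action whose stream metadata is optional, null for batch ingestion and set for streaming ingestion. This is only a sketch under stated assumptions; `AppendActionSketch`, its fields, and the string returned by `perform()` are hypothetical and do not correspond to the actual Druid classes or their behavior.

```java
import java.util.Set;

// Hypothetical sketch; names do not correspond to actual Druid classes.
final class AppendActionSketch
{
  private final Set<String> segmentIds;
  private final String startMetadata; // null for batch ingestion
  private final String endMetadata;   // null for batch ingestion

  AppendActionSketch(Set<String> segmentIds, String startMetadata, String endMetadata)
  {
    // Metadata is all-or-nothing: either both ends are given (streaming) or neither (batch).
    if ((startMetadata == null) != (endMetadata == null)) {
      throw new IllegalArgumentException("start and end metadata must both be null or both be non-null");
    }
    this.segmentIds = segmentIds;
    this.startMetadata = startMetadata;
    this.endMetadata = endMetadata;
  }

  boolean commitsMetadata()
  {
    return startMetadata != null;
  }

  // Stand-in for the real commit: only reports which path would be taken.
  String perform()
  {
    return commitsMetadata()
           ? "commit " + segmentIds.size() + " segment(s) with metadata"
           : "commit " + segmentIds.size() + " segment(s)";
  }
}
```

With this shape, a batch task would pass null for both metadata arguments and a streaming task would pass both values, so a separate action class would only be needed if the commit logic diverges beyond the metadata update.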
"Cannot use action[%s] for task[%s] as it is holding a lock of type[%s] instead of [APPEND].",
"CommitRealtimeSegmentsAndMetadata", task.getId(), lock.getType()
Since this error will find its way to the end user, we should avoid using names such as CommitRealtimeSegmentsAndMetadata.
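One possible rewording, sketched under assumptions: describe the failure in terms of the task and its lock rather than the internal action name. The `LockType` enum and `LockErrorMessages` helper below are hypothetical stand-ins, not Druid classes, and the exact wording is only a suggestion.

```java
// Hypothetical stand-ins; not the actual Druid classes.
enum LockType { APPEND, REPLACE, EXCLUSIVE, SHARED }

final class LockErrorMessages
{
  private LockErrorMessages() {}

  // Names the task and the offending lock type, but not the internal action class.
  static String nonAppendLockMessage(String taskId, LockType actual)
  {
    return String.format(
        "Cannot commit realtime segments for task[%s] as it is holding a lock of type[%s] instead of [APPEND].",
        taskId,
        actual
    );
  }
}
```

The message keeps the task ID and lock type, which are the details a user can act on.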
@@ -73,7 +78,8 @@ public SequenceMetadata(
     @JsonProperty("startOffsets") Map<PartitionIdType, SequenceOffsetType> startOffsets,
     @JsonProperty("endOffsets") Map<PartitionIdType, SequenceOffsetType> endOffsets,
     @JsonProperty("checkpointed") boolean checkpointed,
-    @JsonProperty("exclusiveStartPartitions") Set<PartitionIdType> exclusiveStartPartitions
+    @JsonProperty("exclusiveStartPartitions") Set<PartitionIdType> exclusiveStartPartitions,
+    @JsonProperty("taskLockType") TaskLockType taskLockType
This should be nullable for backward compatibility, should it not?
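A minimal sketch of what null tolerance could look like: metadata serialized before this change carries no `taskLockType` field, so the constructor would receive null and needs a fallback. The classes below are simplified stand-ins for illustration only; the Jackson annotations are omitted so the snippet compiles with the JDK alone, and the choice of `EXCLUSIVE` as the fallback is an assumption, not something this PR confirms.

```java
// Simplified stand-in for the lock type enum; the real one lives in Druid.
enum TaskLockType { EXCLUSIVE, SHARED, REPLACE, APPEND }

// Hypothetical, reduced version of the metadata class with only the new field.
final class SequenceMetadataSketch
{
  private final TaskLockType taskLockType;

  // In the real class this parameter would carry @JsonProperty("taskLockType")
  // and be marked nullable; older payloads would deserialize it as null.
  SequenceMetadataSketch(TaskLockType taskLockType)
  {
    // Assumption: fall back to EXCLUSIVE when the field is absent.
    this.taskLockType = taskLockType == null ? TaskLockType.EXCLUSIVE : taskLockType;
  }

  TaskLockType getTaskLockType()
  {
    return taskLockType;
  }
}
```

Resolving the default in the constructor means callers never have to null-check the getter.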
}

final String dataSource = appendSegments.iterator().next().getDataSource();
final Set<DataSegment> upgradedSegments = connector.retryTransaction(
- final Set<DataSegment> upgradedSegments = connector.retryTransaction(
+ final Set<DataSegment> segmentsToUpgrade = connector.retryTransaction(
Closing this PR as these changes have already been merged as a part of #15039.
[WIP]
Description
Changes
- TaskLockType in SequenceMetadata. The lock type is used to determine the right action type for committing realtime segments.
- IndexTaskUtils.emitSegmentPublishMetrics
- IndexTaskUtils.getMessagesFromSavedParseExceptions
Pending changes
- IndexerMetadataStorageCoordinator to commit segments, commit metadata and upgrade segments in the same transaction
- commitReplaceSegments method to identify pending segments to upgrade and create upgraded entries
This PR has: