Skip to content

Commit

Permalink
Prevent multiple attempts to publish segments for the same sequence (a…
Browse files Browse the repository at this point in the history
…pache#14995)

* Prevent a race that may cause multiple attempts to publish segments for the same sequence
  • Loading branch information
AmatyaAvadhanula authored Nov 16, 2023
1 parent 857b8de commit cdc192d
Showing 1 changed file with 7 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ public enum Status
private final String stream;

private final Set<String> publishingSequences = Sets.newConcurrentHashSet();
private final Set<String> publishedSequences = Sets.newConcurrentHashSet();
private final List<ListenableFuture<SegmentsAndCommitMetadata>> publishWaitList = new ArrayList<>();
private final List<ListenableFuture<SegmentsAndCommitMetadata>> handOffWaitList = new ArrayList<>();

Expand Down Expand Up @@ -806,7 +807,8 @@ public void onFailure(Throwable t)
List<SequenceMetadata<PartitionIdType, SequenceOffsetType>> sequencesSnapshot = new ArrayList<>(sequences);
for (int i = 0; i < sequencesSnapshot.size(); i++) {
final SequenceMetadata<PartitionIdType, SequenceOffsetType> sequenceMetadata = sequencesSnapshot.get(i);
if (!publishingSequences.contains(sequenceMetadata.getSequenceName())) {
if (!publishingSequences.contains(sequenceMetadata.getSequenceName())
&& !publishedSequences.contains(sequenceMetadata.getSequenceName())) {
final boolean isLast = i == (sequencesSnapshot.size() - 1);
if (isLast) {
// Shorten endOffsets of the last sequence to match currOffsets.
Expand Down Expand Up @@ -1009,6 +1011,7 @@ public void onSuccess(SegmentsAndCommitMetadata publishedSegmentsAndCommitMetada
);
log.infoSegments(publishedSegmentsAndCommitMetadata.getSegments(), "Published segments");

publishedSequences.add(sequenceMetadata.getSequenceName());
sequences.remove(sequenceMetadata);
publishingSequences.remove(sequenceMetadata.getSequenceName());

Expand Down Expand Up @@ -1157,7 +1160,9 @@ private void maybePersistAndPublishSequences(Supplier<Committer> committerSuppli
{
for (SequenceMetadata<PartitionIdType, SequenceOffsetType> sequenceMetadata : sequences) {
sequenceMetadata.updateAssignments(currOffsets, this::isMoreToReadBeforeReadingRecord);
if (!sequenceMetadata.isOpen() && !publishingSequences.contains(sequenceMetadata.getSequenceName())) {
if (!sequenceMetadata.isOpen()
&& !publishingSequences.contains(sequenceMetadata.getSequenceName())
&& !publishedSequences.contains(sequenceMetadata.getSequenceName())) {
publishingSequences.add(sequenceMetadata.getSequenceName());
try {
final Object result = driver.persist(committerSupplier.get());
Expand Down

0 comments on commit cdc192d

Please sign in to comment.