Skip to content

Commit

Permalink
reject publishing actions with a retriable error code if a earlier ta…
Browse files Browse the repository at this point in the history
…sk is still publishing (#17509)

* Working queuing of publishing

* fix style

* Add unit tests

* add tests

* retry within the connector

* fix unit tests

* Update indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java

Co-authored-by: Kashif Faraz <[email protected]>

* Add comment

* fix style

* Fix unit tests

* style fix

---------

Co-authored-by: Kashif Faraz <[email protected]>
  • Loading branch information
georgew5656 and kfaraz authored Dec 12, 2024
1 parent 1a38434 commit aca56d6
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ private <R> R performAction(TaskAction<R> taskAction)
return result;
}
catch (Throwable t) {
log.error(t, "Failed to perform action[%s]", taskAction);
throw new RuntimeException(t);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
Expand All @@ -32,6 +31,7 @@
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.indexing.overlord.TimeChunkLockRequest;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.metadata.RetryTransactionException;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.assertj.core.api.Assertions;
Expand Down Expand Up @@ -151,7 +151,7 @@ public void testFailTransactionalUpdateDataSourceMetadata() throws Exception

Assert.assertEquals(
SegmentPublishResult.fail(
InvalidInput.exception(
new RetryTransactionException(
"The new start metadata state[ObjectMetadata{theObject=[1]}] is"
+ " ahead of the last committed end state[null]. Try resetting the supervisor."
).toString()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2687,7 +2687,9 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle(

if (startMetadataGreaterThanExisting && !startMetadataMatchesExisting) {
// Offsets stored in startMetadata is greater than the last commited metadata.
return DataStoreMetadataUpdateResult.failure(
// This can happen because the previous task is still publishing its segments and can resolve once
// the previous task finishes publishing.
return DataStoreMetadataUpdateResult.retryableFailure(
"The new start metadata state[%s] is ahead of the last committed"
+ " end state[%s]. Try resetting the supervisor.",
startMetadata, oldCommitMetadataFromDb
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -784,15 +784,15 @@ public void testTransactionalAnnounceFailDbNullWantNotNull()
);
Assert.assertEquals(
SegmentPublishResult.fail(
InvalidInput.exception(
new RetryTransactionException(
"The new start metadata state[ObjectMetadata{theObject={foo=bar}}] is ahead of the last committed"
+ " end state[null]. Try resetting the supervisor."
).toString()),
result1
);

// Should only be tried once.
Assert.assertEquals(1, metadataUpdateCounter.get());
// Should be retried.
Assert.assertEquals(2, metadataUpdateCounter.get());
}

@Test
Expand Down

0 comments on commit aca56d6

Please sign in to comment.