diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java index 6f8e827c705a..847354706ba7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import org.apache.druid.error.InvalidInput; import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; @@ -151,7 +152,12 @@ public void testFailTransactionalUpdateDataSourceMetadata() throws Exception ); Assert.assertEquals( - SegmentPublishResult.fail("java.lang.RuntimeException: Failed to update the metadata Store. The new start metadata is ahead of last commited end state."), + SegmentPublishResult.fail( + InvalidInput.exception( + "The new start metadata state[ObjectMetadata{theObject=[1]}] is ahead of the last commited end" + + " state[null]. Try resetting the supervisor." + ).toString() + ), result ); } diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index e2addccbcb91..d364299d21d2 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -33,6 +33,7 @@ import com.google.common.io.BaseEncoding; import com.google.inject.Inject; import org.apache.commons.lang.StringEscapeUtils; +import org.apache.druid.error.InvalidInput; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.SegmentCreateRequest; @@ -445,41 +446,33 @@ public SegmentPublishResult commitSegmentsAndMetadata( try { return connector.retryTransaction( - new TransactionCallback() - { - @Override - public SegmentPublishResult inTransaction( - final Handle handle, - final TransactionStatus transactionStatus - ) throws Exception - { - // Set definitelyNotUpdated back to false upon retrying. - definitelyNotUpdated.set(false); + (handle, transactionStatus) -> { + // Set definitelyNotUpdated back to false upon retrying. + definitelyNotUpdated.set(false); - if (startMetadata != null) { - final DataStoreMetadataUpdateResult result = updateDataSourceMetadataWithHandle( - handle, - dataSource, - startMetadata, - endMetadata - ); - - if (result.isFailed()) { - // Metadata was definitely not updated. - transactionStatus.setRollbackOnly(); - definitelyNotUpdated.set(true); - - if (result.canRetry()) { - throw new RetryTransactionException(result.getErrorMsg()); - } else { - throw new RuntimeException(result.getErrorMsg()); - } + if (startMetadata != null) { + final DataStoreMetadataUpdateResult result = updateDataSourceMetadataWithHandle( + handle, + dataSource, + startMetadata, + endMetadata + ); + + if (result.isFailed()) { + // Metadata was definitely not updated. + transactionStatus.setRollbackOnly(); + definitelyNotUpdated.set(true); + + if (result.canRetry()) { + throw new RetryTransactionException(result.getErrorMsg()); + } else { + throw InvalidInput.exception(result.getErrorMsg()); } } - - final Set inserted = announceHistoricalSegmentBatch(handle, segments, usedSegments); - return SegmentPublishResult.ok(ImmutableSet.copyOf(inserted)); } + + final Set inserted = announceHistoricalSegmentBatch(handle, segments, usedSegments); + return SegmentPublishResult.ok(ImmutableSet.copyOf(inserted)); }, 3, getSqlMetadataMaxRetry() @@ -2395,17 +2388,19 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( } final boolean startMetadataMatchesExisting; - int startMetadataGreaterThanExisting = 0; + boolean startMetadataGreaterThanExisting = false; if (oldCommitMetadataFromDb == null) { startMetadataMatchesExisting = startMetadata.isValidStart(); - startMetadataGreaterThanExisting = 1; + startMetadataGreaterThanExisting = true; } else { // Checking against the last committed metadata. - // If the new start sequence number is greater than the end sequence number of last commit compareTo() function will return 1, - // 0 in all other cases. It might be because multiple tasks are publishing the sequence at around same time. + // If the new start sequence number is greater than the end sequence number of the last commit, + // compareTo() will return 1 and 0 in all other cases. This can happen if multiple tasks are publishing the + // sequence around the same time. if (startMetadata instanceof Comparable) { - startMetadataGreaterThanExisting = ((Comparable) startMetadata.asStartMetadata()).compareTo(oldCommitMetadataFromDb.asStartMetadata()); + startMetadataGreaterThanExisting = ((Comparable) startMetadata.asStartMetadata()) + .compareTo(oldCommitMetadataFromDb.asStartMetadata()) > 0; } // Converting the last one into start metadata for checking since only the same type of metadata can be matched. @@ -2415,25 +2410,20 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( startMetadataMatchesExisting = startMetadata.asStartMetadata().matches(oldCommitMetadataFromDb.asStartMetadata()); } - if (startMetadataGreaterThanExisting == 1 && !startMetadataMatchesExisting) { - // Offset stored in StartMetadata is Greater than the last commited metadata, - // Then retry multiple task might be trying to publish the segment for same partitions. - log.info("Failed to update the metadata Store. The new start metadata: [%s] is ahead of last commited end state: [%s].", - startMetadata, - oldCommitMetadataFromDb); + if (startMetadataGreaterThanExisting && !startMetadataMatchesExisting) { + // Offsets stored in startMetadata is greater than the last commited metadata. return new DataStoreMetadataUpdateResult(true, false, - "Failed to update the metadata Store. The new start metadata is ahead of last commited end state." + "The new start metadata state[%s] is ahead of the last commited" + + " end state[%s]. Try resetting the supervisor.", startMetadata, oldCommitMetadataFromDb ); } if (!startMetadataMatchesExisting) { // Not in the desired start state. - return new DataStoreMetadataUpdateResult(true, false, StringUtils.format( - "Inconsistent metadata state. This can happen if you update input topic in a spec without changing " + - "the supervisor name. Stored state: [%s], Target state: [%s].", - oldCommitMetadataFromDb, - startMetadata - )); + return new DataStoreMetadataUpdateResult(true, false, + "Inconsistency between stored metadata state[%s] and target state[%s]. Try resetting the supervisor.", + oldCommitMetadataFromDb, startMetadata + ); } // Only endOffsets should be stored in metadata store diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 03de72b96fbb..7b6fb4d11a2c 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -28,6 +28,7 @@ import com.google.common.hash.Hashing; import com.google.common.io.BaseEncoding; import org.apache.druid.data.input.StringTuple; +import org.apache.druid.error.InvalidInput; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.ObjectMetadata; import org.apache.druid.indexing.overlord.SegmentCreateRequest; @@ -935,7 +936,14 @@ public void testTransactionalAnnounceFailDbNullWantNotNull() throws IOException new ObjectMetadata(ImmutableMap.of("foo", "bar")), new ObjectMetadata(ImmutableMap.of("foo", "baz")) ); - Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Failed to update the metadata Store. The new start metadata is ahead of last commited end state."), result1); + Assert.assertEquals( + SegmentPublishResult.fail( + InvalidInput.exception( + "The new start metadata state[ObjectMetadata{theObject={foo=bar}}] is ahead of the last commited" + + " end state[null]. Try resetting the supervisor." + ).toString()), + result1 + ); // Should only be tried once. Assert.assertEquals(1, metadataUpdateCounter.get()); @@ -956,10 +964,15 @@ public void testTransactionalAnnounceFailDbNotNullWantNull() throws IOException new ObjectMetadata(null), new ObjectMetadata(ImmutableMap.of("foo", "baz")) ); - Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Inconsistent metadata state. This can " + - "happen if you update input topic in a spec without changing the supervisor name. " + - "Stored state: [ObjectMetadata{theObject={foo=baz}}], " + - "Target state: [ObjectMetadata{theObject=null}]."), result2); + Assert.assertEquals( + SegmentPublishResult.fail( + InvalidInput.exception( + "Inconsistency between stored metadata state[ObjectMetadata{theObject={foo=baz}}]" + + " and target state[ObjectMetadata{theObject=null}]. Try resetting the supervisor." + ).toString() + ), + result2 + ); // Should only be tried once per call. Assert.assertEquals(2, metadataUpdateCounter.get()); @@ -1026,10 +1039,14 @@ public void testTransactionalAnnounceFailDbNotNullWantDifferent() throws IOExcep new ObjectMetadata(ImmutableMap.of("foo", "qux")), new ObjectMetadata(ImmutableMap.of("foo", "baz")) ); - Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Inconsistent metadata state. This can " + - "happen if you update input topic in a spec without changing the supervisor name. " + - "Stored state: [ObjectMetadata{theObject={foo=baz}}], " + - "Target state: [ObjectMetadata{theObject={foo=qux}}]."), result2); + Assert.assertEquals( + SegmentPublishResult.fail( + InvalidInput.exception( + "Inconsistency between stored metadata state[ObjectMetadata{theObject={foo=baz}}] and " + + "target state[ObjectMetadata{theObject={foo=qux}}]. Try resetting the supervisor." + ).toString()), + result2 + ); // Should only be tried once per call. Assert.assertEquals(2, metadataUpdateCounter.get());