From fc92d23cc16ab0dd5f675d04c4bb29442b3b1d6a Mon Sep 17 00:00:00 2001
From: david-leifker <114954101+david-leifker@users.noreply.github.com>
Date: Tue, 10 Sep 2024 12:41:21 -0500
Subject: [PATCH] feat(entity-service): fallback logic for aspect version
(#11304)
---
.../entity/ebean/batch/AspectsBatchImpl.java | 2 +-
.../linkedin/metadata/entity/AspectDao.java | 26 ++--
.../metadata/entity/EntityServiceImpl.java | 35 +++--
.../linkedin/metadata/entity/EntityUtils.java | 59 +++++---
.../metadata/entity/TransactionContext.java | 69 +++++++++
.../entity/cassandra/CassandraAspectDao.java | 29 ++--
.../metadata/entity/ebean/EbeanAspectDao.java | 57 +++----
.../entity/EbeanEntityServiceTest.java | 140 ++++++++++++++++++
.../java/entities/EntitiesControllerTest.java | 7 +-
9 files changed, 325 insertions(+), 99 deletions(-)
create mode 100644 metadata-io/src/main/java/com/linkedin/metadata/entity/TransactionContext.java
diff --git a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java
index 3ec090a3db3a45..1fba8426317209 100644
--- a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java
+++ b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java
@@ -181,7 +181,7 @@ public AspectsBatchImplBuilder mcps(
mcp, auditStamp, retrieverContext.getAspectRetriever());
}
} catch (IllegalArgumentException e) {
- log.error("Invalid proposal, skipping and proceeding with batch: " + mcp, e);
+ log.error("Invalid proposal, skipping and proceeding with batch: {}", mcp, e);
return null;
}
})
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/AspectDao.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/AspectDao.java
index 401d40ec177cee..3f0545b6f94a85 100644
--- a/metadata-io/src/main/java/com/linkedin/metadata/entity/AspectDao.java
+++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/AspectDao.java
@@ -6,13 +6,11 @@
import com.linkedin.metadata.entity.ebean.PartitionedStream;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
import com.linkedin.metadata.utils.metrics.MetricUtils;
-import io.ebean.Transaction;
import java.sql.Timestamp;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
-import java.util.function.Supplier;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -27,10 +25,10 @@
* aspect is set to 0 for efficient retrieval. In most cases only the latest state of an aspect will
* be fetched. See {@link EntityServiceImpl} for more details.
*
- *
TODO: This interface exposes {@link #runInTransactionWithRetry(Supplier, int)} because {@link
- * EntityServiceImpl} concerns itself with batching multiple commands into a single transaction. It
- * exposes storage concerns somewhat and it'd be worth looking into ways to move this responsibility
- * inside {@link AspectDao} implementations.
+ *
TODO: This interface exposes {@link #runInTransactionWithRetry(Function, int)}
+ * (TransactionContext)} because {@link EntityServiceImpl} concerns itself with batching multiple
+ * commands into a single transaction. It exposes storage concerns somewhat and it'd be worth
+ * looking into ways to move this responsibility inside {@link AspectDao} implementations.
*/
public interface AspectDao {
String ASPECT_WRITE_COUNT_METRIC_NAME = "aspectWriteCount";
@@ -77,7 +75,7 @@ Map> getLatestAspects(
Map> urnAspects, boolean forUpdate);
void saveAspect(
- @Nullable Transaction tx,
+ @Nullable TransactionContext txContext,
@Nonnull final String urn,
@Nonnull final String aspectName,
@Nonnull final String aspectMetadata,
@@ -89,10 +87,12 @@ void saveAspect(
final boolean insert);
void saveAspect(
- @Nullable Transaction tx, @Nonnull final EntityAspect aspect, final boolean insert);
+ @Nullable TransactionContext txContext,
+ @Nonnull final EntityAspect aspect,
+ final boolean insert);
long saveLatestAspect(
- @Nullable Transaction tx,
+ @Nullable TransactionContext txContext,
@Nonnull final String urn,
@Nonnull final String aspectName,
@Nullable final String oldAspectMetadata,
@@ -107,7 +107,7 @@ long saveLatestAspect(
@Nullable final String newSystemMetadata,
final Long nextVersion);
- void deleteAspect(@Nullable Transaction tx, @Nonnull final EntityAspect aspect);
+ void deleteAspect(@Nullable TransactionContext txContext, @Nonnull final EntityAspect aspect);
@Nonnull
ListResult listUrns(
@@ -125,7 +125,7 @@ ListResult listUrns(
@Nonnull
Stream streamAspects(String entityName, String aspectName);
- int deleteUrn(@Nullable Transaction tx, @Nonnull final String urn);
+ int deleteUrn(@Nullable TransactionContext txContext, @Nonnull final String urn);
@Nonnull
ListResult listLatestAspectMetadata(
@@ -159,11 +159,11 @@ default Map getNextVersions(
@Nonnull
T runInTransactionWithRetry(
- @Nonnull final Function block, final int maxTransactionRetry);
+ @Nonnull final Function block, final int maxTransactionRetry);
@Nonnull
default List runInTransactionWithRetry(
- @Nonnull final Function block,
+ @Nonnull final Function block,
AspectsBatch batch,
final int maxTransactionRetry) {
return List.of(runInTransactionWithRetry(block, maxTransactionRetry));
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java
index fd6ad57c0adf52..4b83ea40f722db 100644
--- a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java
+++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java
@@ -6,6 +6,7 @@
import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME;
import static com.linkedin.metadata.Constants.SYSTEM_ACTOR;
import static com.linkedin.metadata.Constants.UI_SOURCE;
+import static com.linkedin.metadata.entity.TransactionContext.DEFAULT_MAX_TRANSACTION_RETRY;
import static com.linkedin.metadata.utils.PegasusUtils.constructMCL;
import static com.linkedin.metadata.utils.PegasusUtils.getDataTemplateClassFromSchema;
import static com.linkedin.metadata.utils.PegasusUtils.urnToEntityName;
@@ -79,7 +80,6 @@
import com.linkedin.r2.RemoteInvocationException;
import com.linkedin.util.Pair;
import io.datahubproject.metadata.context.OperationContext;
-import io.ebean.Transaction;
import io.opentelemetry.extension.annotations.WithSpan;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
@@ -146,8 +146,6 @@ public class EntityServiceImpl implements EntityService {
* As described above, the latest version of an aspect should always take the value 0, with
* monotonically increasing version incrementing as usual once the latest version is replaced.
*/
- private static final int DEFAULT_MAX_TRANSACTION_RETRY = 3;
-
protected final AspectDao aspectDao;
@VisibleForTesting @Getter private final EventProducer producer;
@@ -837,7 +835,7 @@ private List ingestAspectsToLocalDB(
return aspectDao
.runInTransactionWithRetry(
- (tx) -> {
+ (txContext) -> {
// Generate default aspects within the transaction (they are re-calculated on retry)
AspectsBatch batchWithDefaults =
DefaultAspectsUtil.withAdditionalChanges(
@@ -852,7 +850,8 @@ private List ingestAspectsToLocalDB(
aspectDao.getLatestAspects(urnAspects, true));
// read #2 (potentially)
final Map> nextVersions =
- EntityUtils.calculateNextVersions(aspectDao, latestAspects, urnAspects);
+ EntityUtils.calculateNextVersions(
+ txContext, aspectDao, latestAspects, urnAspects);
// 1. Convert patches to full upserts
// 2. Run any entity/aspect level hooks
@@ -872,7 +871,7 @@ private List ingestAspectsToLocalDB(
Map> newNextVersions =
EntityUtils.calculateNextVersions(
- aspectDao, updatedLatestAspects, updatedItems.getFirst());
+ txContext, aspectDao, updatedLatestAspects, updatedItems.getFirst());
// merge
updatedNextVersions = AspectsBatch.merge(nextVersions, newNextVersions);
} else {
@@ -939,7 +938,7 @@ private List ingestAspectsToLocalDB(
if (overwrite || latest == null) {
result =
ingestAspectToLocalDB(
- tx,
+ txContext,
item.getUrn(),
item.getAspectName(),
item.getRecordTemplate(),
@@ -973,8 +972,8 @@ private List ingestAspectsToLocalDB(
.collect(Collectors.toList());
// commit upserts prior to retention or kafka send, if supported by impl
- if (tx != null) {
- tx.commitAndContinue();
+ if (txContext != null) {
+ txContext.commitAndContinue();
}
long took = ingestToLocalDBTimer.stop();
log.info(
@@ -2209,7 +2208,7 @@ private RollbackResult deleteAspectWithoutMCL(
final RollbackResult result =
aspectDao.runInTransactionWithRetry(
- (tx) -> {
+ (txContext) -> {
Integer additionalRowsDeleted = 0;
// 1. Fetch the latest existing version of the aspect.
@@ -2282,7 +2281,7 @@ private RollbackResult deleteAspectWithoutMCL(
}
// 5. Apply deletes and fix up latest row
- aspectsToDelete.forEach(aspect -> aspectDao.deleteAspect(tx, aspect));
+ aspectsToDelete.forEach(aspect -> aspectDao.deleteAspect(txContext, aspect));
if (survivingAspect != null) {
// if there was a surviving aspect, copy its information into the latest row
@@ -2300,16 +2299,16 @@ private RollbackResult deleteAspectWithoutMCL(
latest
.getEntityAspect()
.setCreatedFor(survivingAspect.getEntityAspect().getCreatedFor());
- aspectDao.saveAspect(tx, latest.getEntityAspect(), false);
+ aspectDao.saveAspect(txContext, latest.getEntityAspect(), false);
// metrics
aspectDao.incrementWriteMetrics(
aspectName, 1, latest.getMetadataRaw().getBytes(StandardCharsets.UTF_8).length);
- aspectDao.deleteAspect(tx, survivingAspect.getEntityAspect());
+ aspectDao.deleteAspect(txContext, survivingAspect.getEntityAspect());
} else {
if (isKeyAspect) {
if (hardDelete) {
// If this is the key aspect, delete the entity entirely.
- additionalRowsDeleted = aspectDao.deleteUrn(tx, urn);
+ additionalRowsDeleted = aspectDao.deleteUrn(txContext, urn);
} else if (deleteItem.getEntitySpec().hasAspect(Constants.STATUS_ASPECT_NAME)) {
// soft delete by setting status.removed=true (if applicable)
final Status statusAspect = new Status();
@@ -2326,7 +2325,7 @@ private RollbackResult deleteAspectWithoutMCL(
}
} else {
// Else, only delete the specific aspect.
- aspectDao.deleteAspect(tx, latest.getEntityAspect());
+ aspectDao.deleteAspect(txContext, latest.getEntityAspect());
}
}
@@ -2466,7 +2465,7 @@ private Map getEnvelopedAspects(
@Nonnull
private UpdateAspectResult ingestAspectToLocalDB(
- @Nullable Transaction tx,
+ @Nullable TransactionContext txContext,
@Nonnull final Urn urn,
@Nonnull final String aspectName,
@Nonnull final RecordTemplate newValue,
@@ -2495,7 +2494,7 @@ private UpdateAspectResult ingestAspectToLocalDB(
latest.getEntityAspect().setSystemMetadata(RecordUtils.toJsonString(latestSystemMetadata));
log.info("Ingesting aspect with name {}, urn {}", aspectName, urn);
- aspectDao.saveAspect(tx, latest.getEntityAspect(), false);
+ aspectDao.saveAspect(txContext, latest.getEntityAspect(), false);
// metrics
aspectDao.incrementWriteMetrics(
@@ -2518,7 +2517,7 @@ private UpdateAspectResult ingestAspectToLocalDB(
String newValueStr = EntityApiUtils.toJsonAspect(newValue);
long versionOfOld =
aspectDao.saveLatestAspect(
- tx,
+ txContext,
urn.toString(),
aspectName,
latest == null ? null : EntityApiUtils.toJsonAspect(oldValue),
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityUtils.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityUtils.java
index 7842365ce429be..3c4109970e9d0b 100644
--- a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityUtils.java
+++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityUtils.java
@@ -285,38 +285,51 @@ public static List toSystemAspects(
* Use the precalculated next version from system metadata if it exists, otherwise lookup the next
* version the normal way from the database
*
+ * @param txContext
* @param aspectDao database access
* @param latestAspects aspect version 0 with system metadata
* @param urnAspects urn/aspects which we need next version information for
* @return map of the urn/aspect to the next aspect version
*/
public static Map> calculateNextVersions(
+ TransactionContext txContext,
AspectDao aspectDao,
Map> latestAspects,
Map> urnAspects) {
- Map> precalculatedVersions =
- latestAspects.entrySet().stream()
- .map(
- entry ->
- Map.entry(
- entry.getKey(), convertSystemAspectToNextVersionMap(entry.getValue())))
- .filter(entry -> !entry.getValue().isEmpty())
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-
- Map> missingAspectVersions =
- urnAspects.entrySet().stream()
- .flatMap(
- entry ->
- entry.getValue().stream()
- .map(aspectName -> Pair.of(entry.getKey(), aspectName)))
- .filter(
- urnAspectName ->
- !precalculatedVersions
- .getOrDefault(urnAspectName.getKey(), Map.of())
- .containsKey(urnAspectName.getValue()))
- .collect(
- Collectors.groupingBy(
- Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toSet())));
+
+ final Map> precalculatedVersions;
+ final Map> missingAspectVersions;
+ if (txContext.getFailedAttempts() > 2 && txContext.lastExceptionIsDuplicateKey()) {
+ log.warn(
+ "Multiple exceptions detected, last exception detected as DuplicateKey, fallback to database max(version)+1");
+ precalculatedVersions = Map.of();
+ missingAspectVersions = urnAspects;
+ } else {
+ precalculatedVersions =
+ latestAspects.entrySet().stream()
+ .map(
+ entry ->
+ Map.entry(
+ entry.getKey(), convertSystemAspectToNextVersionMap(entry.getValue())))
+ .filter(entry -> !entry.getValue().isEmpty())
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+ missingAspectVersions =
+ urnAspects.entrySet().stream()
+ .flatMap(
+ entry ->
+ entry.getValue().stream()
+ .map(aspectName -> Pair.of(entry.getKey(), aspectName)))
+ .filter(
+ urnAspectName ->
+ !precalculatedVersions
+ .getOrDefault(urnAspectName.getKey(), Map.of())
+ .containsKey(urnAspectName.getValue()))
+ .collect(
+ Collectors.groupingBy(
+ Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toSet())));
+ }
+
Map> databaseVersions =
missingAspectVersions.isEmpty()
? Map.of()
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/TransactionContext.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/TransactionContext.java
new file mode 100644
index 00000000000000..69f2f1c8981c03
--- /dev/null
+++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/TransactionContext.java
@@ -0,0 +1,69 @@
+package com.linkedin.metadata.entity;
+
+import io.ebean.DuplicateKeyException;
+import io.ebean.Transaction;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NonNull;
+import lombok.experimental.Accessors;
+import org.springframework.lang.Nullable;
+
+/** Wrap the transaction with additional information about the exceptions during retry. */
+@Data
+@AllArgsConstructor
+@Accessors(fluent = true)
+public class TransactionContext {
+ public static final int DEFAULT_MAX_TRANSACTION_RETRY = 3;
+
+ public static TransactionContext empty() {
+ return empty(DEFAULT_MAX_TRANSACTION_RETRY);
+ }
+
+ public static TransactionContext empty(@Nullable Integer maxRetries) {
+ return empty(null, maxRetries == null ? DEFAULT_MAX_TRANSACTION_RETRY : maxRetries);
+ }
+
+ public static TransactionContext empty(Transaction tx, int maxRetries) {
+ return new TransactionContext(tx, maxRetries, new ArrayList<>());
+ }
+
+ @Nullable private Transaction tx;
+ private int maxRetries;
+ @NonNull private List exceptions;
+
+ public TransactionContext success() {
+ exceptions.clear();
+ return this;
+ }
+
+ public TransactionContext addException(RuntimeException e) {
+ exceptions.add(e);
+ return this;
+ }
+
+ public int getFailedAttempts() {
+ return exceptions.size();
+ }
+
+ @Nullable
+ public RuntimeException lastException() {
+ return exceptions.isEmpty() ? null : exceptions.get(exceptions.size() - 1);
+ }
+
+ public boolean lastExceptionIsDuplicateKey() {
+ return lastException() instanceof DuplicateKeyException;
+ }
+
+ public boolean shouldAttemptRetry() {
+ return exceptions.size() <= maxRetries;
+ }
+
+ public void commitAndContinue() {
+ if (tx != null) {
+ tx.commitAndContinue();
+ }
+ success();
+ }
+}
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/cassandra/CassandraAspectDao.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/cassandra/CassandraAspectDao.java
index 51f898d3122af3..9e7387947a9547 100644
--- a/metadata-io/src/main/java/com/linkedin/metadata/entity/cassandra/CassandraAspectDao.java
+++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/cassandra/CassandraAspectDao.java
@@ -29,13 +29,13 @@
import com.linkedin.metadata.entity.EntityAspect;
import com.linkedin.metadata.entity.EntityAspectIdentifier;
import com.linkedin.metadata.entity.ListResult;
+import com.linkedin.metadata.entity.TransactionContext;
import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
import com.linkedin.metadata.entity.ebean.PartitionedStream;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
import com.linkedin.metadata.query.ExtraInfo;
import com.linkedin.metadata.query.ExtraInfoArray;
import com.linkedin.metadata.query.ListResultMetadata;
-import io.ebean.Transaction;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
@@ -187,7 +187,7 @@ private Map getMaxVersions(
@Override
public void saveAspect(
- @Nullable Transaction tx, @Nonnull EntityAspect aspect, final boolean insert) {
+ @Nullable TransactionContext txContext, @Nonnull EntityAspect aspect, final boolean insert) {
validateConnection();
SimpleStatement statement = generateSaveStatement(aspect, insert);
_cqlSession.execute(statement);
@@ -287,23 +287,21 @@ public ListResult listAspectMetadata(
@Override
@Nonnull
public T runInTransactionWithRetry(
- @Nonnull final Function block, final int maxTransactionRetry) {
+ @Nonnull final Function block, final int maxTransactionRetry) {
validateConnection();
- int retryCount = 0;
- Exception lastException;
-
+ TransactionContext txContext = TransactionContext.empty(maxTransactionRetry);
do {
try {
// TODO: Try to bend this code to make use of Cassandra batches. This method is called from
// single-urn operations, so perf should not suffer much
- return block.apply(null);
+ return block.apply(txContext);
} catch (DriverException exception) {
- lastException = exception;
+ txContext.addException(exception);
}
- } while (++retryCount <= maxTransactionRetry);
+ } while (txContext.shouldAttemptRetry());
throw new RetryLimitReached(
- "Failed to add after " + maxTransactionRetry + " retries", lastException);
+ "Failed to add after " + maxTransactionRetry + " retries", txContext.lastException());
}
private ListResult toListResult(
@@ -368,7 +366,8 @@ private static AuditStamp toAuditStamp(@Nonnull final EntityAspect aspect) {
}
@Override
- public void deleteAspect(@Nullable Transaction tx, @Nonnull final EntityAspect aspect) {
+ public void deleteAspect(
+ @Nullable TransactionContext txContext, @Nonnull final EntityAspect aspect) {
validateConnection();
SimpleStatement ss =
deleteFrom(CassandraAspect.TABLE_NAME)
@@ -385,7 +384,7 @@ public void deleteAspect(@Nullable Transaction tx, @Nonnull final EntityAspect a
}
@Override
- public int deleteUrn(@Nullable Transaction tx, @Nonnull final String urn) {
+ public int deleteUrn(@Nullable TransactionContext txContext, @Nonnull final String urn) {
validateConnection();
SimpleStatement ss =
deleteFrom(CassandraAspect.TABLE_NAME)
@@ -569,7 +568,7 @@ public Map> getNextVersions(Map> u
@Override
public long saveLatestAspect(
- @Nullable Transaction tx,
+ @Nullable TransactionContext txContext,
@Nonnull final String urn,
@Nonnull final String aspectName,
@Nullable final String oldAspectMetadata,
@@ -675,7 +674,7 @@ public void setWritable(boolean canWrite) {
@Override
public void saveAspect(
- @Nullable Transaction tx,
+ @Nullable TransactionContext txContext,
@Nonnull final String urn,
@Nonnull final String aspectName,
@Nonnull final String aspectMetadata,
@@ -698,7 +697,7 @@ public void saveAspect(
actor,
impersonator);
- saveAspect(tx, aspect, insert);
+ saveAspect(txContext, aspect, insert);
// metrics
incrementWriteMetrics(aspectName, 1, aspectMetadata.getBytes(StandardCharsets.UTF_8).length);
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java
index 93c06b9236d501..4304be1aa2a00a 100644
--- a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java
+++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java
@@ -19,6 +19,7 @@
import com.linkedin.metadata.entity.EntityAspect;
import com.linkedin.metadata.entity.EntityAspectIdentifier;
import com.linkedin.metadata.entity.ListResult;
+import com.linkedin.metadata.entity.TransactionContext;
import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
import com.linkedin.metadata.models.AspectSpec;
@@ -143,7 +144,7 @@ private boolean validateConnection() {
@Override
public long saveLatestAspect(
- @Nullable Transaction tx,
+ @Nullable TransactionContext txContext,
@Nonnull final String urn,
@Nonnull final String aspectName,
@Nullable final String oldAspectMetadata,
@@ -167,7 +168,7 @@ public long saveLatestAspect(
if (oldAspectMetadata != null && oldTime != null) {
largestVersion = nextVersion;
saveAspect(
- tx,
+ txContext,
urn,
aspectName,
oldAspectMetadata,
@@ -181,7 +182,7 @@ public long saveLatestAspect(
// Save newValue as the latest version (v0)
saveAspect(
- tx,
+ txContext,
urn,
aspectName,
newAspectMetadata,
@@ -197,7 +198,7 @@ public long saveLatestAspect(
@Override
public void saveAspect(
- @Nullable Transaction tx,
+ @Nullable TransactionContext txContext,
@Nonnull final String urn,
@Nonnull final String aspectName,
@Nonnull final String aspectMetadata,
@@ -220,23 +221,27 @@ public void saveAspect(
aspect.setCreatedFor(impersonator);
}
- saveEbeanAspect(tx, aspect, insert);
+ saveEbeanAspect(txContext, aspect, insert);
}
@Override
public void saveAspect(
- @Nullable Transaction tx, @Nonnull final EntityAspect aspect, final boolean insert) {
+ @Nullable TransactionContext txContext,
+ @Nonnull final EntityAspect aspect,
+ final boolean insert) {
EbeanAspectV2 ebeanAspect = EbeanAspectV2.fromEntityAspect(aspect);
- saveEbeanAspect(tx, ebeanAspect, insert);
+ saveEbeanAspect(txContext, ebeanAspect, insert);
}
private void saveEbeanAspect(
- @Nullable Transaction tx, @Nonnull final EbeanAspectV2 ebeanAspect, final boolean insert) {
+ @Nullable TransactionContext txContext,
+ @Nonnull final EbeanAspectV2 ebeanAspect,
+ final boolean insert) {
validateConnection();
if (insert) {
- _server.insert(ebeanAspect, tx);
+ _server.insert(ebeanAspect, txContext.tx());
} else {
- _server.update(ebeanAspect, tx);
+ _server.update(ebeanAspect, txContext.tx());
}
}
@@ -304,20 +309,21 @@ public EntityAspect getAspect(@Nonnull final EntityAspectIdentifier key) {
}
@Override
- public void deleteAspect(@Nullable Transaction tx, @Nonnull final EntityAspect aspect) {
+ public void deleteAspect(
+ @Nullable TransactionContext txContext, @Nonnull final EntityAspect aspect) {
validateConnection();
EbeanAspectV2 ebeanAspect = EbeanAspectV2.fromEntityAspect(aspect);
- _server.delete(ebeanAspect, tx);
+ _server.delete(ebeanAspect, txContext.tx());
}
@Override
- public int deleteUrn(@Nullable Transaction tx, @Nonnull final String urn) {
+ public int deleteUrn(@Nullable TransactionContext txContext, @Nonnull final String urn) {
validateConnection();
return _server
.createQuery(EbeanAspectV2.class)
.where()
.eq(EbeanAspectV2.URN_COLUMN, urn)
- .delete(tx);
+ .delete(txContext.tx());
}
@Override
@@ -658,14 +664,14 @@ public ListResult listLatestAspectMetadata(
@Override
@Nonnull
public T runInTransactionWithRetry(
- @Nonnull final Function block, final int maxTransactionRetry) {
+ @Nonnull final Function block, final int maxTransactionRetry) {
return runInTransactionWithRetry(block, null, maxTransactionRetry).get(0);
}
@Override
@Nonnull
public List runInTransactionWithRetry(
- @Nonnull final Function block,
+ @Nonnull final Function block,
@Nullable AspectsBatch batch,
final int maxTransactionRetry) {
@@ -720,13 +726,12 @@ public List runInTransactionWithRetry(
@Nonnull
public T runInTransactionWithRetryUnlocked(
- @Nonnull final Function block,
+ @Nonnull final Function block,
@Nullable AspectsBatch batch,
final int maxTransactionRetry) {
validateConnection();
- int retryCount = 0;
- Exception lastException = null;
+ TransactionContext transactionContext = TransactionContext.empty(maxTransactionRetry);
T result = null;
do {
@@ -734,9 +739,8 @@ public T runInTransactionWithRetryUnlocked(
_server.beginTransaction(
TxScope.requiresNew().setIsolation(TxIsolation.REPEATABLE_READ))) {
transaction.setBatchMode(true);
- result = block.apply(transaction);
+ result = block.apply(transactionContext.tx(transaction));
transaction.commit();
- lastException = null;
break;
} catch (PersistenceException exception) {
if (exception instanceof DuplicateKeyException) {
@@ -749,20 +753,21 @@ public T runInTransactionWithRetryUnlocked(
log.warn(
"Skipping DuplicateKeyException retry since aspect is the key aspect. {}",
batch.getUrnAspectsMap().keySet());
- continue;
+ break;
}
}
MetricUtils.counter(MetricRegistry.name(this.getClass(), "txFailed")).inc();
log.warn("Retryable PersistenceException: {}", exception.getMessage());
- lastException = exception;
+ transactionContext.addException(exception);
}
- } while (++retryCount <= maxTransactionRetry);
+ } while (transactionContext.shouldAttemptRetry());
- if (lastException != null) {
+ if (transactionContext.lastException() != null) {
MetricUtils.counter(MetricRegistry.name(this.getClass(), "txFailedAfterRetries")).inc();
throw new RetryLimitReached(
- "Failed to add after " + maxTransactionRetry + " retries", lastException);
+ "Failed to add after " + maxTransactionRetry + " retries",
+ transactionContext.lastException());
}
return result;
diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java
index b9f5984e576678..e8d3c654f6f639 100644
--- a/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java
+++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java
@@ -1,14 +1,19 @@
package com.linkedin.metadata.entity;
+import static com.linkedin.metadata.Constants.CORP_USER_ENTITY_NAME;
+import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import com.linkedin.common.AuditStamp;
+import com.linkedin.common.Status;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.template.DataTemplateUtil;
import com.linkedin.data.template.RecordTemplate;
+import com.linkedin.entity.EnvelopedAspect;
import com.linkedin.identity.CorpUserInfo;
import com.linkedin.metadata.AspectGenerationUtils;
import com.linkedin.metadata.Constants;
@@ -36,6 +41,8 @@
import io.ebean.TxScope;
import io.ebean.annotation.TxIsolation;
import java.net.URISyntaxException;
+import java.sql.Timestamp;
+import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -292,6 +299,139 @@ public void testNestedTransactions() throws AssertionError {
System.out.println("done");
}
+ @Test
+ public void testSystemMetadataDuplicateKey() throws Exception {
+ Urn entityUrn = UrnUtils.getUrn("urn:li:corpuser:duplicateKeyTest");
+ SystemMetadata systemMetadata = AspectGenerationUtils.createSystemMetadata();
+ ChangeItemImpl item =
+ ChangeItemImpl.builder()
+ .urn(entityUrn)
+ .aspectName(STATUS_ASPECT_NAME)
+ .recordTemplate(new Status().setRemoved(true))
+ .systemMetadata(systemMetadata)
+ .auditStamp(TEST_AUDIT_STAMP)
+ .build(TestOperationContexts.emptyAspectRetriever(null));
+ _entityServiceImpl.ingestAspects(
+ opContext,
+ AspectsBatchImpl.builder()
+ .retrieverContext(opContext.getRetrieverContext().get())
+ .items(List.of(item))
+ .build(),
+ false,
+ true);
+
+ // List aspects urns
+ EnvelopedAspect envelopedAspect =
+ _entityServiceImpl.getLatestEnvelopedAspect(
+ opContext, CORP_USER_ENTITY_NAME, entityUrn, STATUS_ASPECT_NAME);
+
+ assertNotNull(envelopedAspect);
+ assertEquals(envelopedAspect.getVersion(), 0L, "Expected version 0");
+ assertEquals(
+ envelopedAspect.getSystemMetadata().getVersion(),
+ "1",
+ "Expected version 0 with systemMeta version 1");
+
+ // Corrupt the version 0 systemMeta
+ try (Transaction transaction =
+ ((EbeanAspectDao) _entityServiceImpl.aspectDao)
+ .getServer()
+ .beginTransaction(TxScope.requiresNew().setIsolation(TxIsolation.REPEATABLE_READ))) {
+ TransactionContext transactionContext = TransactionContext.empty(transaction, 3);
+ _entityServiceImpl.aspectDao.saveAspect(
+ transactionContext,
+ entityUrn.toString(),
+ STATUS_ASPECT_NAME,
+ new Status().setRemoved(false).toString(),
+ entityUrn.toString(),
+ null,
+ Timestamp.from(Instant.now()),
+ systemMetadata.toString(),
+ 1,
+ true);
+ transaction.commit();
+ }
+
+ // Run another update
+ _entityServiceImpl.ingestAspects(
+ opContext,
+ AspectsBatchImpl.builder()
+ .retrieverContext(opContext.getRetrieverContext().get())
+ .items(
+ List.of(
+ ChangeItemImpl.builder()
+ .urn(entityUrn)
+ .aspectName(STATUS_ASPECT_NAME)
+ .recordTemplate(new Status().setRemoved(false))
+ .systemMetadata(systemMetadata)
+ .auditStamp(TEST_AUDIT_STAMP)
+ .build(TestOperationContexts.emptyAspectRetriever(null))))
+ .build(),
+ false,
+ true);
+ EnvelopedAspect envelopedAspect2 =
+ _entityServiceImpl.getLatestEnvelopedAspect(
+ opContext, CORP_USER_ENTITY_NAME, entityUrn, STATUS_ASPECT_NAME);
+
+ assertNotNull(envelopedAspect2);
+ assertEquals(envelopedAspect2.getVersion(), 0L, "Expected version 0");
+ assertEquals(
+ envelopedAspect2.getSystemMetadata().getVersion(),
+ "3",
+ "Expected version 0 with systemMeta version 3 accounting for the the collision");
+ }
+
+ @Test
+ public void testBatchDuplicate() throws Exception {
+ Urn entityUrn = UrnUtils.getUrn("urn:li:corpuser:batchDuplicateTest");
+ SystemMetadata systemMetadata = AspectGenerationUtils.createSystemMetadata();
+ ChangeItemImpl item1 =
+ ChangeItemImpl.builder()
+ .urn(entityUrn)
+ .aspectName(STATUS_ASPECT_NAME)
+ .recordTemplate(new Status().setRemoved(true))
+ .systemMetadata(systemMetadata.copy())
+ .auditStamp(TEST_AUDIT_STAMP)
+ .build(TestOperationContexts.emptyAspectRetriever(null));
+ ChangeItemImpl item2 =
+ ChangeItemImpl.builder()
+ .urn(entityUrn)
+ .aspectName(STATUS_ASPECT_NAME)
+ .recordTemplate(new Status().setRemoved(false))
+ .systemMetadata(systemMetadata.copy())
+ .auditStamp(TEST_AUDIT_STAMP)
+ .build(TestOperationContexts.emptyAspectRetriever(null));
+ _entityServiceImpl.ingestAspects(
+ opContext,
+ AspectsBatchImpl.builder()
+ .retrieverContext(opContext.getRetrieverContext().get())
+ .items(List.of(item1, item2))
+ .build(),
+ false,
+ true);
+
+ // List aspects urns
+ ListUrnsResult batch = _entityServiceImpl.listUrns(opContext, entityUrn.getEntityType(), 0, 2);
+
+ assertEquals(batch.getStart().intValue(), 0);
+ assertEquals(batch.getCount().intValue(), 1);
+ assertEquals(batch.getTotal().intValue(), 1);
+ assertEquals(batch.getEntities().size(), 1);
+ assertEquals(entityUrn.toString(), batch.getEntities().get(0).toString());
+
+ EnvelopedAspect envelopedAspect =
+ _entityServiceImpl.getLatestEnvelopedAspect(
+ opContext, CORP_USER_ENTITY_NAME, entityUrn, STATUS_ASPECT_NAME);
+ assertEquals(
+ envelopedAspect.getSystemMetadata().getVersion(),
+ "2",
+ "Expected version 2 accounting for duplicates");
+ assertEquals(
+ envelopedAspect.getValue().toString(),
+ "{removed=false}",
+ "Expected 2nd item to be the latest");
+ }
+
@Test
public void dataGeneratorThreadingTest() {
DataGenerator dataGenerator = new DataGenerator(opContext, _entityServiceImpl);
diff --git a/metadata-service/openapi-servlet/src/test/java/entities/EntitiesControllerTest.java b/metadata-service/openapi-servlet/src/test/java/entities/EntitiesControllerTest.java
index 3e352403c88bca..8b530b218532d0 100644
--- a/metadata-service/openapi-servlet/src/test/java/entities/EntitiesControllerTest.java
+++ b/metadata-service/openapi-servlet/src/test/java/entities/EntitiesControllerTest.java
@@ -14,6 +14,7 @@
import com.linkedin.metadata.aspect.batch.AspectsBatch;
import com.linkedin.metadata.config.PreProcessHooks;
import com.linkedin.metadata.entity.AspectDao;
+import com.linkedin.metadata.entity.TransactionContext;
import com.linkedin.metadata.entity.UpdateAspectResult;
import com.linkedin.metadata.event.EventProducer;
import io.datahubproject.metadata.context.OperationContext;
@@ -69,14 +70,14 @@ public void setup()
OperationContext opContext = TestOperationContexts.systemContextNoSearchAuthorization();
AspectDao aspectDao = Mockito.mock(AspectDao.class);
when(aspectDao.runInTransactionWithRetry(
- ArgumentMatchers.>>any(),
+ ArgumentMatchers.>>any(),
any(AspectsBatch.class),
anyInt()))
.thenAnswer(
i ->
List.of(
- ((Function>) i.getArgument(0))
- .apply(Mockito.mock(Transaction.class))));
+ ((Function>) i.getArgument(0))
+ .apply(TransactionContext.empty(Mockito.mock(Transaction.class), 0))));
EventProducer mockEntityEventProducer = Mockito.mock(EventProducer.class);
PreProcessHooks preProcessHooks = new PreProcessHooks();