From 58988458a8d68e8db1c1b8cceba3c5c401707846 Mon Sep 17 00:00:00 2001 From: David Leifker Date: Tue, 30 Jan 2024 08:06:05 -0600 Subject: [PATCH] fix(aspects): fix default aspect generation for non-restli locations --- .../upgrade/nocode/DataMigrationStep.java | 3 +- .../steps/BackfillBrowsePathsV2Step.java | 4 +- .../metadata/aspect/batch/AspectsBatch.java | 5 +- .../metadata/aspect/batch/BatchItem.java | 10 + .../metadata/aspect/batch/MCLBatchItem.java | 7 +- .../metadata/aspect/batch/UpsertItem.java | 2 - .../java/spark-lineage/junit.spark.smoke.xml | 1 + .../datahub/spark/TestCoalesceJobLineage.java | 4 +- .../datahub/spark/TestSparkJobsLineage.java | 4 +- .../aspect/utils/DefaultAspectsUtil.java | 291 ++++++++++++++++++ .../metadata/client/JavaEntityClient.java | 8 +- .../metadata/entity/EntityServiceImpl.java | 221 +------------ .../entity/ebean/batch/AspectsBatchImpl.java | 3 +- .../entity/ebean/batch/MCLBatchItemImpl.java | 4 +- .../entity/ebean/batch/MCPPatchBatchItem.java | 10 +- .../ebean/batch/MCPUpsertBatchItem.java | 24 +- .../search/utils/BrowsePathUtils.java | 3 +- .../search/utils/BrowsePathV2Utils.java | 6 +- .../service/UpdateIndicesService.java | 6 +- .../metadata/AspectIngestionUtils.java | 6 +- .../utils/DefaultAspectsUtilTest.java} | 41 ++- .../entity/EbeanEntityServiceTest.java | 24 +- .../metadata/entity/EntityServiceTest.java | 82 +++-- .../io/datahubproject/test/DataGenerator.java | 39 ++- .../CustomDataQualityRulesMCPSideEffect.java | 2 +- .../token/StatefulTokenService.java | 13 +- .../boot/steps/BackfillBrowsePathsV2Step.java | 4 +- .../IngestDataPlatformInstancesStep.java | 2 +- .../boot/steps/IngestDataPlatformsStep.java | 2 +- .../steps/UpgradeDefaultBrowsePathsStep.java | 3 +- .../steps/BackfillBrowsePathsV2StepTest.java | 26 +- .../IngestDataPlatformInstancesStepTest.java | 4 +- .../UpgradeDefaultBrowsePathsStepTest.java | 4 - .../openapi/util/MappingUtil.java | 11 +- .../v2/controller/EntityController.java | 2 +- 
.../resources/entity/AspectResource.java | 17 +- .../resources/entity/AspectResourceTest.java | 2 +- .../linkedin/metadata/entity/AspectUtils.java | 101 ------ .../metadata/entity/EntityService.java | 42 --- 39 files changed, 519 insertions(+), 524 deletions(-) create mode 100644 metadata-integration/java/spark-lineage/junit.spark.smoke.xml create mode 100644 metadata-io/src/main/java/com/linkedin/metadata/aspect/utils/DefaultAspectsUtil.java rename metadata-io/src/test/java/com/linkedin/metadata/{AspectUtilsTest.java => aspect/utils/DefaultAspectsUtilTest.java} (60%) diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocode/DataMigrationStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocode/DataMigrationStep.java index ac56e5e91c72be..9f41daf02d2093 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocode/DataMigrationStep.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocode/DataMigrationStep.java @@ -10,6 +10,7 @@ import com.linkedin.datahub.upgrade.UpgradeStepResult; import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult; import com.linkedin.metadata.Constants; +import com.linkedin.metadata.aspect.utils.DefaultAspectsUtil; import com.linkedin.metadata.entity.EntityService; import com.linkedin.metadata.entity.ebean.EbeanAspectV1; import com.linkedin.metadata.entity.ebean.EbeanAspectV2; @@ -170,7 +171,7 @@ public Function executable() { // Emit a browse path aspect. 
final BrowsePaths browsePaths; try { - browsePaths = _entityService.buildDefaultBrowsePath(urn); + browsePaths = DefaultAspectsUtil.buildDefaultBrowsePath(urn, _entityService); final AuditStamp browsePathsStamp = new AuditStamp(); browsePathsStamp.setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)); diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entity/steps/BackfillBrowsePathsV2Step.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entity/steps/BackfillBrowsePathsV2Step.java index 9a426369cfb026..601ce4d25493c1 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entity/steps/BackfillBrowsePathsV2Step.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entity/steps/BackfillBrowsePathsV2Step.java @@ -15,6 +15,7 @@ import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult; import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.Constants; +import com.linkedin.metadata.aspect.utils.DefaultAspectsUtil; import com.linkedin.metadata.entity.EntityService; import com.linkedin.metadata.query.SearchFlags; import com.linkedin.metadata.query.filter.Condition; @@ -181,7 +182,8 @@ private Filter backfillDefaultBrowsePathsV2Filter() { } private void ingestBrowsePathsV2(Urn urn, AuditStamp auditStamp) throws Exception { - BrowsePathsV2 browsePathsV2 = _entityService.buildDefaultBrowsePathV2(urn, true); + BrowsePathsV2 browsePathsV2 = + DefaultAspectsUtil.buildDefaultBrowsePathV2(urn, true, _entityService); log.debug(String.format("Adding browse path v2 for urn %s with value %s", urn, browsePathsV2)); MetadataChangeProposal proposal = new MetadataChangeProposal(); proposal.setEntityUrn(urn); diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/AspectsBatch.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/AspectsBatch.java index 806fd47c721ec8..3d803d238b4f92 100644 --- 
a/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/AspectsBatch.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/AspectsBatch.java @@ -3,6 +3,7 @@ import com.linkedin.metadata.aspect.plugins.validation.AspectRetriever; import com.linkedin.mxe.SystemMetadata; import com.linkedin.util.Pair; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -17,14 +18,14 @@ * SystemMetadata} and record/message created time */ public interface AspectsBatch { - List getItems(); + Collection getItems(); /** * Returns MCP items. Can be patch, upsert, etc. * * @return batch items */ - default List getMCPItems() { + default Collection getMCPItems() { return getItems().stream() .filter(item -> item instanceof MCPBatchItem) .map(item -> (MCPBatchItem) item) diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/BatchItem.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/BatchItem.java index a4c0624150532c..60033cd6919d60 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/BatchItem.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/BatchItem.java @@ -2,11 +2,13 @@ import com.linkedin.common.AuditStamp; import com.linkedin.common.urn.Urn; +import com.linkedin.data.template.RecordTemplate; import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.models.EntitySpec; import com.linkedin.mxe.SystemMetadata; import javax.annotation.Nonnull; +import javax.annotation.Nullable; public interface BatchItem { /** @@ -63,4 +65,12 @@ default String getAspectName() { */ @Nonnull AspectSpec getAspectSpec(); + + /** + * The aspect's record template. 
Null when patch + * + * @return record template if it exists + */ + @Nullable + RecordTemplate getRecordTemplate(); } diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/MCLBatchItem.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/MCLBatchItem.java index 30e882705da453..17a910b125a34f 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/MCLBatchItem.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/MCLBatchItem.java @@ -26,7 +26,7 @@ default String getAspectName() { if (getMetadataChangeLog().getAspectName() != null) { return getMetadataChangeLog().getAspectName(); } else { - return getAspect().schema().getName(); + return getRecordTemplate().schema().getName(); } } @@ -40,10 +40,7 @@ default SystemMetadata getPreviousSystemMetadata() { } @Nullable - RecordTemplate getPreviousAspect(); - - @Nonnull - RecordTemplate getAspect(); + RecordTemplate getPreviousRecordTemplate(); @Override @Nonnull diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/UpsertItem.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/UpsertItem.java index c337e4f848e5c7..c64105637dfcc6 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/UpsertItem.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/UpsertItem.java @@ -11,8 +11,6 @@ * related data stored along with the aspect */ public abstract class UpsertItem extends MCPBatchItem { - public abstract RecordTemplate getAspect(); - public abstract SystemAspect toLatestEntityAspect(); public abstract void validatePreCommit( diff --git a/metadata-integration/java/spark-lineage/junit.spark.smoke.xml b/metadata-integration/java/spark-lineage/junit.spark.smoke.xml new file mode 100644 index 00000000000000..38d6a558abe0d1 --- /dev/null +++ b/metadata-integration/java/spark-lineage/junit.spark.smoke.xml @@ -0,0 +1 @@ + \ No newline at end of file diff --git 
a/metadata-integration/java/spark-lineage/src/test/java/datahub/spark/TestCoalesceJobLineage.java b/metadata-integration/java/spark-lineage/src/test/java/datahub/spark/TestCoalesceJobLineage.java index 053055716eaa07..17aea13dbb94e3 100644 --- a/metadata-integration/java/spark-lineage/src/test/java/datahub/spark/TestCoalesceJobLineage.java +++ b/metadata-integration/java/spark-lineage/src/test/java/datahub/spark/TestCoalesceJobLineage.java @@ -99,7 +99,9 @@ public static void resetBaseExpectations() { @BeforeClass public static void initMockServer() { - mockServer = startClientAndServer(GMS_PORT); + if (mockServer == null) { + mockServer = startClientAndServer(GMS_PORT); + } resetBaseExpectations(); } diff --git a/metadata-integration/java/spark-lineage/src/test/java/datahub/spark/TestSparkJobsLineage.java b/metadata-integration/java/spark-lineage/src/test/java/datahub/spark/TestSparkJobsLineage.java index a4eb035b0abce5..885be6d00fee85 100644 --- a/metadata-integration/java/spark-lineage/src/test/java/datahub/spark/TestSparkJobsLineage.java +++ b/metadata-integration/java/spark-lineage/src/test/java/datahub/spark/TestSparkJobsLineage.java @@ -138,7 +138,9 @@ public static void resetBaseExpectations() { @BeforeClass public static void init() { - mockServer = startClientAndServer(GMS_PORT); + if (mockServer == null) { + mockServer = startClientAndServer(GMS_PORT); + } resetBaseExpectations(); } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/aspect/utils/DefaultAspectsUtil.java b/metadata-io/src/main/java/com/linkedin/metadata/aspect/utils/DefaultAspectsUtil.java new file mode 100644 index 00000000000000..709fb2f0decfda --- /dev/null +++ b/metadata-io/src/main/java/com/linkedin/metadata/aspect/utils/DefaultAspectsUtil.java @@ -0,0 +1,291 @@ +package com.linkedin.metadata.aspect.utils; + +import static com.linkedin.metadata.Constants.BROWSE_PATHS_ASPECT_NAME; +import static com.linkedin.metadata.Constants.BROWSE_PATHS_V2_ASPECT_NAME; +import static 
com.linkedin.metadata.Constants.DATA_PLATFORM_INSTANCE_ASPECT_NAME; +import static com.linkedin.metadata.search.utils.BrowsePathUtils.buildDataPlatformUrn; +import static com.linkedin.metadata.search.utils.BrowsePathUtils.getDefaultBrowsePath; +import static com.linkedin.metadata.search.utils.BrowsePathV2Utils.getDefaultBrowsePathV2; + +import com.google.common.collect.ImmutableSet; +import com.linkedin.common.BrowsePaths; +import com.linkedin.common.BrowsePathsV2; +import com.linkedin.common.urn.Urn; +import com.linkedin.data.template.RecordTemplate; +import com.linkedin.data.template.StringArray; +import com.linkedin.dataplatform.DataPlatformInfo; +import com.linkedin.entity.EntityResponse; +import com.linkedin.events.metadata.ChangeType; +import com.linkedin.metadata.Constants; +import com.linkedin.metadata.aspect.batch.AspectsBatch; +import com.linkedin.metadata.aspect.batch.BatchItem; +import com.linkedin.metadata.aspect.batch.MCPBatchItem; +import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.entity.EntityUtils; +import com.linkedin.metadata.entity.ebean.batch.MCPUpsertBatchItem; +import com.linkedin.metadata.models.registry.EntityRegistry; +import com.linkedin.metadata.utils.DataPlatformInstanceUtils; +import com.linkedin.metadata.utils.GenericRecordUtils; +import com.linkedin.mxe.GenericAspect; +import com.linkedin.mxe.MetadataChangeProposal; +import com.linkedin.util.Pair; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import lombok.extern.slf4j.Slf4j; + +/** Consolidates logic for default aspects */ +@Slf4j +public class DefaultAspectsUtil { + private DefaultAspectsUtil() {} + + public static final Set SUPPORTED_TYPES = + Set.of(ChangeType.UPSERT, ChangeType.CREATE, 
ChangeType.PATCH); + + public static List getAdditionalChanges( + @Nonnull AspectsBatch batch, @Nonnull EntityService entityService) { + + Map> itemsByUrn = + batch.getMCPItems().stream() + .filter(item -> SUPPORTED_TYPES.contains(item.getChangeType())) + .collect(Collectors.groupingBy(BatchItem::getUrn)); + + Set urnsWithExistingKeyAspects = entityService.exists(itemsByUrn.keySet()); + + // create default aspects when key aspect is missing + return itemsByUrn.entrySet().stream() + .filter(aspectsEntry -> !urnsWithExistingKeyAspects.contains(aspectsEntry.getKey())) + .flatMap( + aspectsEntry -> { + // Generate key aspect and defaults + List> defaultAspects = + generateDefaultAspects(entityService, aspectsEntry.getKey()); + + // First is the key aspect + RecordTemplate entityKeyAspect = defaultAspects.get(0).getSecond(); + + // pick the first item as a template (use entity information) + MCPBatchItem templateItem = aspectsEntry.getValue().get(0); + + // generate default aspects (including key aspect, always upserts) + return defaultAspects.stream() + .map( + entry -> + MCPUpsertBatchItem.MCPUpsertBatchItemBuilder.build( + getProposalFromAspect( + entry.getKey(), entry.getValue(), entityKeyAspect, templateItem), + templateItem.getAuditStamp(), + entityService)) + .filter(Objects::nonNull); + }) + .collect(Collectors.toList()); + } + + /** + * Generate default aspects + * + * @param entityService entity service + * @param urn entity urn + * @return a list of aspect name/aspect pairs to be written + */ + public static List> generateDefaultAspects( + @Nonnull EntityService entityService, @Nonnull final Urn urn) { + + final List> defaultAspects = new LinkedList<>(); + + // Key Aspect + final String keyAspectName = entityService.getKeyAspectName(urn); + defaultAspects.add( + Pair.of(keyAspectName, EntityUtils.buildKeyAspect(entityService.getEntityRegistry(), urn))); + + // Other Aspects + defaultAspects.addAll( + generateDefaultAspectsIfMissing(entityService, urn, 
defaultAspects.get(0).getSecond())); + + return defaultAspects; + } + + /** + * Generate default aspects if the aspect is NOT in the database. + * + *

Does not automatically create key aspects. + * + * @see #generateDefaultAspects if key aspects need autogeneration + * @param entityService + * @param urn entity urn + * @param entityKeyAspect entity's key aspect + * @return additional aspects to be written + */ + private static List> generateDefaultAspectsIfMissing( + @Nonnull EntityService entityService, + @Nonnull final Urn urn, + RecordTemplate entityKeyAspect) { + EntityRegistry entityRegistry = entityService.getEntityRegistry(); + + Set fetchAspects = + Stream.of( + BROWSE_PATHS_ASPECT_NAME, + BROWSE_PATHS_V2_ASPECT_NAME, + DATA_PLATFORM_INSTANCE_ASPECT_NAME) + .filter( + aspectName -> + entityRegistry + .getEntitySpec(urn.getEntityType()) + .getAspectSpecMap() + .containsKey(aspectName)) + .collect(Collectors.toSet()); + + if (!fetchAspects.isEmpty()) { + + Set latestAspects = + entityService.getLatestAspectsForUrn(urn, fetchAspects).keySet(); + + return fetchAspects.stream() + .filter(aspectName -> !latestAspects.contains(aspectName)) + .map( + aspectName -> { + switch (aspectName) { + case BROWSE_PATHS_ASPECT_NAME: + return Pair.of( + BROWSE_PATHS_ASPECT_NAME, + (RecordTemplate) buildDefaultBrowsePath(urn, entityService)); + case BROWSE_PATHS_V2_ASPECT_NAME: + return Pair.of( + BROWSE_PATHS_V2_ASPECT_NAME, + (RecordTemplate) buildDefaultBrowsePathV2(urn, false, entityService)); + case DATA_PLATFORM_INSTANCE_ASPECT_NAME: + return DataPlatformInstanceUtils.buildDataPlatformInstance( + urn.getEntityType(), entityKeyAspect) + .map( + aspect -> + Pair.of( + DATA_PLATFORM_INSTANCE_ASPECT_NAME, (RecordTemplate) aspect)) + .orElse(null); + default: + return null; + } + }) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } + + return Collections.emptyList(); + } + + /** + * Builds the default browse path aspects for a subset of well-supported entities. + * + *

This method currently supports datasets, charts, dashboards, data flows, data jobs, and + * glossary terms. + */ + @Nonnull + public static BrowsePaths buildDefaultBrowsePath( + final @Nonnull Urn urn, EntityService entityService) { + Character dataPlatformDelimiter = getDataPlatformDelimiter(urn, entityService); + String defaultBrowsePath = + getDefaultBrowsePath(urn, entityService.getEntityRegistry(), dataPlatformDelimiter); + StringArray browsePaths = new StringArray(); + browsePaths.add(defaultBrowsePath); + BrowsePaths browsePathAspect = new BrowsePaths(); + browsePathAspect.setPaths(browsePaths); + return browsePathAspect; + } + + /** + * Builds the default browse path V2 aspects for all entities. + * + *

This method currently supports datasets, charts, dashboards, and data jobs best. Everything + * else will have a basic "Default" folder added to their browsePathV2. + */ + @Nonnull + public static BrowsePathsV2 buildDefaultBrowsePathV2( + final @Nonnull Urn urn, boolean useContainerPaths, EntityService entityService) { + Character dataPlatformDelimiter = getDataPlatformDelimiter(urn, entityService); + return getDefaultBrowsePathV2( + urn, + entityService.getEntityRegistry(), + dataPlatformDelimiter, + entityService, + useContainerPaths); + } + + /** Returns a delimiter on which the name of an asset may be split. */ + private static Character getDataPlatformDelimiter(Urn urn, EntityService entityService) { + // Attempt to construct the appropriate Data Platform URN + Urn dataPlatformUrn = buildDataPlatformUrn(urn, entityService.getEntityRegistry()); + if (dataPlatformUrn != null) { + // Attempt to resolve the delimiter from Data Platform Info + DataPlatformInfo dataPlatformInfo = getDataPlatformInfo(dataPlatformUrn, entityService); + if (dataPlatformInfo != null && dataPlatformInfo.hasDatasetNameDelimiter()) { + return dataPlatformInfo.getDatasetNameDelimiter().charAt(0); + } + } + // Else, fallback to a default delimiter (period) if one cannot be resolved. 
+ return '.'; + } + + @Nullable + private static DataPlatformInfo getDataPlatformInfo(Urn urn, EntityService entityService) { + try { + final EntityResponse entityResponse = + entityService.getEntityV2( + Constants.DATA_PLATFORM_ENTITY_NAME, + urn, + ImmutableSet.of(Constants.DATA_PLATFORM_INFO_ASPECT_NAME)); + if (entityResponse != null + && entityResponse.hasAspects() + && entityResponse.getAspects().containsKey(Constants.DATA_PLATFORM_INFO_ASPECT_NAME)) { + return new DataPlatformInfo( + entityResponse + .getAspects() + .get(Constants.DATA_PLATFORM_INFO_ASPECT_NAME) + .getValue() + .data()); + } + } catch (Exception e) { + log.warn(String.format("Failed to find Data Platform Info for urn %s", urn)); + } + return null; + } + + private static MetadataChangeProposal getProposalFromAspect( + String aspectName, + RecordTemplate aspect, + RecordTemplate entityKeyAspect, + MCPBatchItem templateItem) { + MetadataChangeProposal proposal = new MetadataChangeProposal(); + GenericAspect genericAspect = GenericRecordUtils.serializeAspect(aspect); + + // Set net new fields + proposal.setAspect(genericAspect); + proposal.setAspectName(aspectName); + + // Set fields determined from original + // Additional changes should never be set as PATCH, if a PATCH is coming across it should be an + // UPSERT + proposal.setChangeType(templateItem.getChangeType()); + if (ChangeType.PATCH.equals(proposal.getChangeType())) { + proposal.setChangeType(ChangeType.UPSERT); + } + + if (templateItem.getSystemMetadata() != null) { + proposal.setSystemMetadata(templateItem.getSystemMetadata()); + } + if (templateItem.getUrn() != null) { + proposal.setEntityUrn(templateItem.getUrn()); + } + if (entityKeyAspect != null) { + proposal.setEntityKeyAspect(GenericRecordUtils.serializeAspect(entityKeyAspect)); + } + proposal.setEntityType(templateItem.getUrn().getEntityType()); + + return proposal; + } +} diff --git a/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java 
b/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java index 0ebe9ed1d1b666..9a3bc9e319d2bd 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java @@ -26,7 +26,6 @@ import com.linkedin.metadata.aspect.batch.AspectsBatch; import com.linkedin.metadata.browse.BrowseResult; import com.linkedin.metadata.browse.BrowseResultV2; -import com.linkedin.metadata.entity.AspectUtils; import com.linkedin.metadata.entity.DeleteEntityService; import com.linkedin.metadata.entity.EntityService; import com.linkedin.metadata.entity.IngestResult; @@ -67,7 +66,6 @@ import java.util.Set; import java.util.function.Supplier; import java.util.stream.Collectors; -import java.util.stream.Stream; import javax.annotation.Nonnull; import javax.annotation.Nullable; import lombok.RequiredArgsConstructor; @@ -706,14 +704,10 @@ public String ingestProposal( : Constants.UNKNOWN_ACTOR; final AuditStamp auditStamp = new AuditStamp().setTime(_clock.millis()).setActor(UrnUtils.getUrn(actorUrnStr)); - final List additionalChanges = - AspectUtils.getAdditionalChanges(metadataChangeProposal, _entityService); - Stream proposalStream = - Stream.concat(Stream.of(metadataChangeProposal), additionalChanges.stream()); AspectsBatch batch = AspectsBatchImpl.builder() - .mcps(proposalStream.collect(Collectors.toList()), auditStamp, _entityService) + .mcps(List.of(metadataChangeProposal), auditStamp, _entityService) .build(); IngestResult one = _entityService.ingestProposal(batch, async).stream().findFirst().get(); diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java index b3b11d200ec0dc..59c1620185dfcb 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java +++ 
b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java @@ -2,16 +2,11 @@ import static com.linkedin.metadata.Constants.APP_SOURCE; import static com.linkedin.metadata.Constants.ASPECT_LATEST_VERSION; -import static com.linkedin.metadata.Constants.BROWSE_PATHS_ASPECT_NAME; -import static com.linkedin.metadata.Constants.BROWSE_PATHS_V2_ASPECT_NAME; -import static com.linkedin.metadata.Constants.DATA_PLATFORM_INSTANCE_ASPECT_NAME; import static com.linkedin.metadata.Constants.DEFAULT_RUN_ID; import static com.linkedin.metadata.Constants.FORCE_INDEXING_KEY; import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME; import static com.linkedin.metadata.Constants.SYSTEM_ACTOR; import static com.linkedin.metadata.Constants.UI_SOURCE; -import static com.linkedin.metadata.search.utils.BrowsePathUtils.buildDataPlatformUrn; -import static com.linkedin.metadata.search.utils.BrowsePathUtils.getDefaultBrowsePath; import static com.linkedin.metadata.utils.GenericRecordUtils.entityResponseToAspectMap; import static com.linkedin.metadata.utils.PegasusUtils.constructMCL; import static com.linkedin.metadata.utils.PegasusUtils.getDataTemplateClassFromSchema; @@ -25,8 +20,6 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Streams; import com.linkedin.common.AuditStamp; -import com.linkedin.common.BrowsePaths; -import com.linkedin.common.BrowsePathsV2; import com.linkedin.common.Status; import com.linkedin.common.UrnArray; import com.linkedin.common.VersionedUrn; @@ -38,10 +31,8 @@ import com.linkedin.data.template.GetMode; import com.linkedin.data.template.RecordTemplate; import com.linkedin.data.template.SetMode; -import com.linkedin.data.template.StringArray; import com.linkedin.data.template.StringMap; import com.linkedin.data.template.UnionTemplate; -import com.linkedin.dataplatform.DataPlatformInfo; import com.linkedin.entity.AspectType; import com.linkedin.entity.Entity; import com.linkedin.entity.EntityResponse; @@ 
-57,6 +48,7 @@ import com.linkedin.metadata.aspect.batch.SystemAspect; import com.linkedin.metadata.aspect.batch.UpsertItem; import com.linkedin.metadata.aspect.plugins.validation.AspectValidationException; +import com.linkedin.metadata.aspect.utils.DefaultAspectsUtil; import com.linkedin.metadata.config.PreProcessHooks; import com.linkedin.metadata.entity.ebean.EbeanAspectV2; import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl; @@ -72,10 +64,8 @@ import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.query.ListUrnsResult; import com.linkedin.metadata.run.AspectRowSummary; -import com.linkedin.metadata.search.utils.BrowsePathV2Utils; import com.linkedin.metadata.service.UpdateIndicesService; import com.linkedin.metadata.snapshot.Snapshot; -import com.linkedin.metadata.utils.DataPlatformInstanceUtils; import com.linkedin.metadata.utils.EntityKeyUtils; import com.linkedin.metadata.utils.GenericRecordUtils; import com.linkedin.metadata.utils.PegasusUtils; @@ -622,7 +612,7 @@ public List ingestAspects( MCPUpsertBatchItem.builder() .urn(entityUrn) .aspectName(pair.getKey()) - .aspect(pair.getValue()) + .recordTemplate(pair.getValue()) .systemMetadata(systemMetadata) .auditStamp(auditStamp) .build(this)) @@ -744,7 +734,7 @@ private List ingestAspectsToLocalDB( tx, item.getUrn(), item.getAspectName(), - item.getAspect(), + item.getRecordTemplate(), item.getAuditStamp(), item.getSystemMetadata(), latest == null @@ -921,7 +911,7 @@ public RecordTemplate ingestAspectIfNotPresent( MCPUpsertBatchItem.builder() .urn(urn) .aspectName(aspectName) - .aspect(newValue) + .recordTemplate(newValue) .systemMetadata(systemMetadata) .auditStamp(auditStamp) .build(this)) @@ -965,7 +955,6 @@ public IngestResult ingestProposal( */ @Override public Set ingestProposal(AspectsBatch aspectsBatch, final boolean async) { - Stream timeseriesIngestResults = ingestTimeseriesProposal(aspectsBatch); Stream nonTimeseriesIngestResults = async ? 
ingestProposalAsync(aspectsBatch) : ingestProposalSync(aspectsBatch); @@ -1005,7 +994,7 @@ private Stream ingestTimeseriesProposal(AspectsBatch aspectsBatch) conditionallyProduceMCLAsync( null, null, - item.getAspect(), + item.getRecordTemplate(), item.getSystemMetadata(), item.getMetadataChangeProposal(), item.getUrn(), @@ -1082,10 +1071,17 @@ private Stream ingestProposalAsync(AspectsBatch aspectsBatch) { } private Stream ingestProposalSync(AspectsBatch aspectsBatch) { + Set items = new HashSet<>(aspectsBatch.getItems()); + + // Generate additional items as needed + items.addAll(DefaultAspectsUtil.getAdditionalChanges(aspectsBatch, this)); + + AspectsBatch withDefaults = AspectsBatchImpl.builder().items(items).build(); + AspectsBatchImpl nonTimeseries = AspectsBatchImpl.builder() .items( - aspectsBatch.getItems().stream() + withDefaults.getItems().stream() .filter(item -> !item.getAspectSpec().isTimeseries()) .collect(Collectors.toList())) .build(); @@ -1542,116 +1538,6 @@ protected Map> getLatestAspectUnions( .collect(Collectors.toList()))); } - /** - * Returns true if entityType should have some aspect as per its definition but aspects given does - * not have that aspect - */ - private boolean isAspectMissing(String entityType, String aspectName, Set aspects) { - return _entityRegistry.getEntitySpec(entityType).getAspectSpecMap().containsKey(aspectName) - && !aspects.contains(aspectName); - } - - @Override - public Pair>> generateDefaultAspectsOnFirstWrite( - @Nonnull final Urn urn, Map includedAspects) { - List> returnAspects = new ArrayList<>(); - - final String keyAspectName = getKeyAspectName(urn); - final Map latestAspects = - new HashMap<>(getLatestAspectsForUrn(urn, Set.of(keyAspectName))); - - // key aspect: does not exist in database && is being written - boolean generateDefaults = - !latestAspects.containsKey(keyAspectName) && includedAspects.containsKey(keyAspectName); - - // conditionally generate defaults - if (generateDefaults) { - String 
entityType = urnToEntityName(urn); - Set aspectsToGet = new HashSet<>(); - - boolean shouldCheckBrowsePath = - isAspectMissing(entityType, BROWSE_PATHS_ASPECT_NAME, includedAspects.keySet()); - if (shouldCheckBrowsePath) { - aspectsToGet.add(BROWSE_PATHS_ASPECT_NAME); - } - - boolean shouldCheckBrowsePathV2 = - isAspectMissing(entityType, BROWSE_PATHS_V2_ASPECT_NAME, includedAspects.keySet()); - if (shouldCheckBrowsePathV2) { - aspectsToGet.add(BROWSE_PATHS_V2_ASPECT_NAME); - } - - boolean shouldCheckDataPlatform = - isAspectMissing(entityType, DATA_PLATFORM_INSTANCE_ASPECT_NAME, includedAspects.keySet()); - if (shouldCheckDataPlatform) { - aspectsToGet.add(DATA_PLATFORM_INSTANCE_ASPECT_NAME); - } - - // fetch additional aspects - latestAspects.putAll(getLatestAspectsForUrn(urn, aspectsToGet)); - - if (shouldCheckBrowsePath - && latestAspects.get(BROWSE_PATHS_ASPECT_NAME) == null - && !includedAspects.containsKey(BROWSE_PATHS_ASPECT_NAME)) { - try { - BrowsePaths generatedBrowsePath = buildDefaultBrowsePath(urn); - returnAspects.add(Pair.of(BROWSE_PATHS_ASPECT_NAME, generatedBrowsePath)); - } catch (URISyntaxException e) { - log.error("Failed to parse urn: {}", urn); - } - } - - if (shouldCheckBrowsePathV2 - && latestAspects.get(BROWSE_PATHS_V2_ASPECT_NAME) == null - && !includedAspects.containsKey(BROWSE_PATHS_V2_ASPECT_NAME)) { - try { - BrowsePathsV2 generatedBrowsePathV2 = buildDefaultBrowsePathV2(urn, false); - returnAspects.add(Pair.of(BROWSE_PATHS_V2_ASPECT_NAME, generatedBrowsePathV2)); - } catch (URISyntaxException e) { - log.error("Failed to parse urn: {}", urn); - } - } - - if (shouldCheckDataPlatform - && latestAspects.get(DATA_PLATFORM_INSTANCE_ASPECT_NAME) == null - && !includedAspects.containsKey(DATA_PLATFORM_INSTANCE_ASPECT_NAME)) { - RecordTemplate keyAspect = includedAspects.get(keyAspectName); - DataPlatformInstanceUtils.buildDataPlatformInstance(entityType, keyAspect) - .ifPresent( - aspect -> 
returnAspects.add(Pair.of(DATA_PLATFORM_INSTANCE_ASPECT_NAME, aspect))); - } - } - - return Pair.of(latestAspects.containsKey(keyAspectName), returnAspects); - } - - @Override - public List> generateDefaultAspectsIfMissing( - @Nonnull final Urn urn, Map includedAspects) { - - final String keyAspectName = getKeyAspectName(urn); - - if (includedAspects.containsKey(keyAspectName)) { - return generateDefaultAspectsOnFirstWrite(urn, includedAspects).getValue(); - } else { - // No key aspect being written, generate it and potentially suggest writing it later - HashMap includedWithKeyAspect = new HashMap<>(includedAspects); - Pair keyAspect = - Pair.of(keyAspectName, EntityUtils.buildKeyAspect(_entityRegistry, urn)); - includedWithKeyAspect.put(keyAspect.getKey(), keyAspect.getValue()); - - Pair>> returnAspects = - generateDefaultAspectsOnFirstWrite(urn, includedWithKeyAspect); - - // missing key aspect in database, add it - if (!returnAspects.getFirst()) { - returnAspects.getValue().add(keyAspect); - } - - return returnAspects.getValue(); - } - } - private void ingestSnapshotUnion( @Nonnull final Snapshot snapshotUnion, @Nonnull final AuditStamp auditStamp, @@ -1663,11 +1549,7 @@ private void ingestSnapshotUnion( NewModelUtils.getAspectsFromSnapshot(snapshotRecord); log.info("INGEST urn {} with system metadata {}", urn.toString(), systemMetadata.toString()); - aspectRecordsToIngest.addAll( - generateDefaultAspectsIfMissing( - urn, - aspectRecordsToIngest.stream() - .collect(Collectors.toMap(Pair::getKey, Pair::getValue)))); + aspectRecordsToIngest.addAll(DefaultAspectsUtil.generateDefaultAspects(this, urn)); AspectsBatchImpl aspectsBatch = AspectsBatchImpl.builder() @@ -1678,7 +1560,7 @@ private void ingestSnapshotUnion( MCPUpsertBatchItem.builder() .urn(urn) .aspectName(pair.getKey()) - .aspect(pair.getValue()) + .recordTemplate(pair.getValue()) .auditStamp(auditStamp) .systemMetadata(systemMetadata) .build(this)) @@ -2397,79 +2279,6 @@ private UpdateAspectResult 
ingestAspectToLocalDB( .build(); } - /** - * Builds the default browse path aspects for a subset of well-supported entities. - * - *

This method currently supports datasets, charts, dashboards, data flows, data jobs, and - * glossary terms. - */ - @Nonnull - @Override - public BrowsePaths buildDefaultBrowsePath(final @Nonnull Urn urn) throws URISyntaxException { - Character dataPlatformDelimiter = getDataPlatformDelimiter(urn); - String defaultBrowsePath = - getDefaultBrowsePath(urn, this.getEntityRegistry(), dataPlatformDelimiter); - StringArray browsePaths = new StringArray(); - browsePaths.add(defaultBrowsePath); - BrowsePaths browsePathAspect = new BrowsePaths(); - browsePathAspect.setPaths(browsePaths); - return browsePathAspect; - } - - /** - * Builds the default browse path V2 aspects for all entities. - * - *

This method currently supports datasets, charts, dashboards, and data jobs best. Everything - * else will have a basic "Default" folder added to their browsePathV2. - */ - @Nonnull - @Override - public BrowsePathsV2 buildDefaultBrowsePathV2(final @Nonnull Urn urn, boolean useContainerPaths) - throws URISyntaxException { - Character dataPlatformDelimiter = getDataPlatformDelimiter(urn); - return BrowsePathV2Utils.getDefaultBrowsePathV2( - urn, this.getEntityRegistry(), dataPlatformDelimiter, this, useContainerPaths); - } - - /** Returns a delimiter on which the name of an asset may be split. */ - private Character getDataPlatformDelimiter(Urn urn) { - // Attempt to construct the appropriate Data Platform URN - Urn dataPlatformUrn = buildDataPlatformUrn(urn, this.getEntityRegistry()); - if (dataPlatformUrn != null) { - // Attempt to resolve the delimiter from Data Platform Info - DataPlatformInfo dataPlatformInfo = getDataPlatformInfo(dataPlatformUrn); - if (dataPlatformInfo != null && dataPlatformInfo.hasDatasetNameDelimiter()) { - return dataPlatformInfo.getDatasetNameDelimiter().charAt(0); - } - } - // Else, fallback to a default delimiter (period) if one cannot be resolved. 
- return '.'; - } - - @Nullable - private DataPlatformInfo getDataPlatformInfo(Urn urn) { - try { - final EntityResponse entityResponse = - getEntityV2( - Constants.DATA_PLATFORM_ENTITY_NAME, - urn, - ImmutableSet.of(Constants.DATA_PLATFORM_INFO_ASPECT_NAME)); - if (entityResponse != null - && entityResponse.hasAspects() - && entityResponse.getAspects().containsKey(Constants.DATA_PLATFORM_INFO_ASPECT_NAME)) { - return new DataPlatformInfo( - entityResponse - .getAspects() - .get(Constants.DATA_PLATFORM_INFO_ASPECT_NAME) - .getValue() - .data()); - } - } catch (Exception e) { - log.warn(String.format("Failed to find Data Platform Info for urn %s", urn)); - } - return null; - } - private static boolean shouldAspectEmitChangeLog(@Nonnull final AspectSpec aspectSpec) { final List relationshipFieldSpecs = aspectSpec.getRelationshipFieldSpecs(); diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java index 80fb4e3e1b940e..1718bd835dc31f 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java @@ -11,6 +11,7 @@ import com.linkedin.mxe.MetadataChangeProposal; import com.linkedin.mxe.SystemMetadata; import com.linkedin.util.Pair; +import java.util.Collection; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -26,7 +27,7 @@ @Builder(toBuilder = true) public class AspectsBatchImpl implements AspectsBatch { - private final List items; + private final Collection items; /** * Convert patches to upserts, apply hooks at the aspect and batch level. 
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/batch/MCLBatchItemImpl.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/batch/MCLBatchItemImpl.java index 6563765657d6d0..a2ed2eb18fe6a3 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/batch/MCLBatchItemImpl.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/batch/MCLBatchItemImpl.java @@ -30,9 +30,9 @@ public class MCLBatchItemImpl implements MCLBatchItem { @Nonnull private final MetadataChangeLog metadataChangeLog; - @Nullable private final RecordTemplate aspect; + @Nullable private final RecordTemplate recordTemplate; - @Nullable private final RecordTemplate previousAspect; + @Nullable private final RecordTemplate previousRecordTemplate; // derived private final EntitySpec entitySpec; diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/batch/MCPPatchBatchItem.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/batch/MCPPatchBatchItem.java index be333af2f75398..d0cb2a4cc59b8a 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/batch/MCPPatchBatchItem.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/batch/MCPPatchBatchItem.java @@ -31,6 +31,7 @@ import java.nio.charset.StandardCharsets; import java.util.Objects; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import lombok.Builder; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -72,6 +73,12 @@ public ChangeType getChangeType() { return ChangeType.PATCH; } + @Nullable + @Override + public RecordTemplate getRecordTemplate() { + return null; + } + public MCPUpsertBatchItem applyPatch( RecordTemplate recordTemplate, AspectRetriever aspectRetriever) { MCPUpsertBatchItem.MCPUpsertBatchItemBuilder builder = @@ -100,7 +107,8 @@ public MCPUpsertBatchItem applyPatch( } try { - builder.aspect(aspectTemplateEngine.applyPatch(currentValue, getPatch(), getAspectSpec())); + 
builder.recordTemplate( + aspectTemplateEngine.applyPatch(currentValue, getPatch(), getAspectSpec())); } catch (JsonProcessingException | JsonPatchException e) { throw new RuntimeException(e); } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/batch/MCPUpsertBatchItem.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/batch/MCPUpsertBatchItem.java index 89209c44f10c77..b9d5f24e7ce084 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/batch/MCPUpsertBatchItem.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/batch/MCPUpsertBatchItem.java @@ -58,7 +58,7 @@ public static MCPUpsertBatchItem fromPatch( recordTemplate != null ? recordTemplate : genericPatchTemplate.getDefault(); try { - builder.aspect(genericPatchTemplate.applyPatch(currentValue)); + builder.recordTemplate(genericPatchTemplate.applyPatch(currentValue)); } catch (JsonPatchException | IOException e) { throw new RuntimeException(e); } @@ -72,7 +72,7 @@ public static MCPUpsertBatchItem fromPatch( // aspectName name of the aspect being inserted @Nonnull private final String aspectName; - @Nonnull private final RecordTemplate aspect; + @Nonnull private final RecordTemplate recordTemplate; @Nonnull private final SystemMetadata systemMetadata; @@ -104,7 +104,7 @@ public void applyMutationHooks( entitySpec, aspectSpec, oldAspectValue, - aspect, + recordTemplate, oldSystemMetadata, systemMetadata, auditStamp, @@ -116,7 +116,7 @@ public void applyMutationHooks( public SystemAspect toLatestEntityAspect() { EntityAspect latest = new EntityAspect(); latest.setAspect(getAspectName()); - latest.setMetadata(EntityUtils.toJsonAspect(getAspect())); + latest.setMetadata(EntityUtils.toJsonAspect(getRecordTemplate())); latest.setUrn(getUrn().toString()); latest.setVersion(ASPECT_LATEST_VERSION); latest.setCreatedOn(new Timestamp(auditStamp.getTime())); @@ -135,7 +135,7 @@ public void validatePreCommit( .getAspectPayloadValidators( 
getChangeType(), entitySpec.getName(), aspectSpec.getName())) { validator.validatePreCommit( - getChangeType(), urn, getAspectSpec(), previous, this.aspect, aspectRetriever); + getChangeType(), urn, getAspectSpec(), previous, this.recordTemplate, aspectRetriever); } } @@ -167,13 +167,13 @@ public MCPUpsertBatchItem build(AspectRetriever aspectRetriever) { this.entitySpec, this.aspectSpec, this.urn, - this.aspect, + this.recordTemplate, aspectRetriever); return new MCPUpsertBatchItem( this.urn, this.aspectName, - this.aspect, + this.recordTemplate, SystemMetadataUtils.generateSystemMetadataIfEmpty(this.systemMetadata), this.auditStamp, this.metadataChangeProposal, @@ -213,7 +213,7 @@ public static MCPUpsertBatchItem build( SystemMetadataUtils.generateSystemMetadataIfEmpty(mcp.getSystemMetadata())) .metadataChangeProposal(mcp) .auditStamp(auditStamp) - .aspect(convertToRecordTemplate(mcp, aspectSpec)) + .recordTemplate(convertToRecordTemplate(mcp, aspectSpec)) .build(aspectRetriever); } @@ -258,12 +258,12 @@ public boolean equals(Object o) { return urn.equals(that.urn) && aspectName.equals(that.aspectName) && Objects.equals(systemMetadata, that.systemMetadata) - && aspect.equals(that.aspect); + && recordTemplate.equals(that.recordTemplate); } @Override public int hashCode() { - return Objects.hash(urn, aspectName, systemMetadata, aspect); + return Objects.hash(urn, aspectName, systemMetadata, recordTemplate); } @Override @@ -276,8 +276,8 @@ public String toString() { + '\'' + ", systemMetadata=" + systemMetadata - + ", aspect=" - + aspect + + ", recordTemplate=" + + recordTemplate + '}'; } } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/utils/BrowsePathUtils.java b/metadata-io/src/main/java/com/linkedin/metadata/search/utils/BrowsePathUtils.java index af0f537de86292..4152122c381dab 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/utils/BrowsePathUtils.java +++ 
b/metadata-io/src/main/java/com/linkedin/metadata/search/utils/BrowsePathUtils.java @@ -29,8 +29,7 @@ public class BrowsePathUtils { public static String getDefaultBrowsePath( @Nonnull Urn urn, @Nonnull EntityRegistry entityRegistry, - @Nonnull Character dataPlatformDelimiter) - throws URISyntaxException { + @Nonnull Character dataPlatformDelimiter) { switch (urn.getEntityType()) { case Constants.DATASET_ENTITY_NAME: diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/utils/BrowsePathV2Utils.java b/metadata-io/src/main/java/com/linkedin/metadata/search/utils/BrowsePathV2Utils.java index 961167663e11f7..a531c268ed7d29 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/utils/BrowsePathV2Utils.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/utils/BrowsePathV2Utils.java @@ -16,7 +16,6 @@ import com.linkedin.metadata.models.EntitySpec; import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.utils.EntityKeyUtils; -import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -46,9 +45,8 @@ public static BrowsePathsV2 getDefaultBrowsePathV2( @Nonnull Urn urn, @Nonnull EntityRegistry entityRegistry, @Nonnull Character dataPlatformDelimiter, - @Nonnull EntityService entityService, - boolean useContainerPaths) - throws URISyntaxException { + @Nonnull EntityService entityService, + boolean useContainerPaths) { BrowsePathsV2 result = new BrowsePathsV2(); BrowsePathEntryArray browsePathEntries = new BrowsePathEntryArray(); diff --git a/metadata-io/src/main/java/com/linkedin/metadata/service/UpdateIndicesService.java b/metadata-io/src/main/java/com/linkedin/metadata/service/UpdateIndicesService.java index ee2d794471f6be..86ed3fc073efd2 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/service/UpdateIndicesService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/service/UpdateIndicesService.java @@ -158,8 +158,8 @@ 
private void handleUpdateChangeEvent(@Nonnull final MCLBatchItem event) throws I final AspectSpec aspectSpec = event.getAspectSpec(); final Urn urn = event.getUrn(); - RecordTemplate aspect = event.getAspect(); - RecordTemplate previousAspect = event.getPreviousAspect(); + RecordTemplate aspect = event.getRecordTemplate(); + RecordTemplate previousAspect = event.getPreviousRecordTemplate(); // Step 0. If the aspect is timeseries, add to its timeseries index. if (aspectSpec.isTimeseries()) { @@ -262,7 +262,7 @@ private void handleDeleteChangeEvent(@Nonnull final MCLBatchItem event) { urn.getEntityType(), event.getAspectName())); } - RecordTemplate aspect = event.getAspect(); + RecordTemplate aspect = event.getRecordTemplate(); Boolean isDeletingKey = event.getAspectName().equals(entitySpec.getKeyAspectName()); if (!aspectSpec.isTimeseries()) { diff --git a/metadata-io/src/test/java/com/linkedin/metadata/AspectIngestionUtils.java b/metadata-io/src/test/java/com/linkedin/metadata/AspectIngestionUtils.java index 451b7327224986..72bbc794171ff9 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/AspectIngestionUtils.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/AspectIngestionUtils.java @@ -38,7 +38,7 @@ public static Map ingestCorpUserKeyAspects( MCPUpsertBatchItem.builder() .urn(urn) .aspectName(aspectName) - .aspect(aspect) + .recordTemplate(aspect) .auditStamp(AspectGenerationUtils.createAuditStamp()) .systemMetadata(AspectGenerationUtils.createSystemMetadata()) .build(entityService)); @@ -68,7 +68,7 @@ public static Map ingestCorpUserInfoAspects( MCPUpsertBatchItem.builder() .urn(urn) .aspectName(aspectName) - .aspect(aspect) + .recordTemplate(aspect) .auditStamp(AspectGenerationUtils.createAuditStamp()) .systemMetadata(AspectGenerationUtils.createSystemMetadata()) .build(entityService)); @@ -99,7 +99,7 @@ public static Map ingestChartInfoAspects( MCPUpsertBatchItem.builder() .urn(urn) .aspectName(aspectName) - .aspect(aspect) + 
.recordTemplate(aspect) .auditStamp(AspectGenerationUtils.createAuditStamp()) .systemMetadata(AspectGenerationUtils.createSystemMetadata()) .build(entityService)); diff --git a/metadata-io/src/test/java/com/linkedin/metadata/AspectUtilsTest.java b/metadata-io/src/test/java/com/linkedin/metadata/aspect/utils/DefaultAspectsUtilTest.java similarity index 60% rename from metadata-io/src/test/java/com/linkedin/metadata/AspectUtilsTest.java rename to metadata-io/src/test/java/com/linkedin/metadata/aspect/utils/DefaultAspectsUtilTest.java index 258b40cac63715..cd5673c8e4d639 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/AspectUtilsTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/aspect/utils/DefaultAspectsUtilTest.java @@ -1,31 +1,36 @@ -package com.linkedin.metadata; +package com.linkedin.metadata.aspect.utils; import static org.mockito.Mockito.*; +import com.linkedin.common.AuditStamp; import com.linkedin.common.FabricType; import com.linkedin.common.urn.DataPlatformUrn; import com.linkedin.common.urn.DatasetUrn; -import com.linkedin.dataset.DatasetProperties; import com.linkedin.events.metadata.ChangeType; +import com.linkedin.metadata.EbeanTestUtils; +import com.linkedin.metadata.aspect.batch.MCPBatchItem; +import com.linkedin.metadata.aspect.patch.builder.DatasetPropertiesPatchBuilder; import com.linkedin.metadata.config.PreProcessHooks; -import com.linkedin.metadata.entity.AspectUtils; import com.linkedin.metadata.entity.EntityServiceImpl; import com.linkedin.metadata.entity.TestEntityRegistry; import com.linkedin.metadata.entity.ebean.EbeanAspectDao; +import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl; +import com.linkedin.metadata.entity.ebean.batch.MCPPatchBatchItem; import com.linkedin.metadata.event.EventProducer; import com.linkedin.metadata.models.registry.ConfigEntityRegistry; import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.models.registry.EntityRegistryException; import 
com.linkedin.metadata.models.registry.MergedEntityRegistry; import com.linkedin.metadata.snapshot.Snapshot; -import com.linkedin.metadata.utils.GenericRecordUtils; import com.linkedin.mxe.MetadataChangeProposal; import io.ebean.Database; import java.util.List; +import java.util.stream.Collectors; + import org.testng.Assert; import org.testng.annotations.Test; -public class AspectUtilsTest { +public class DefaultAspectsUtilTest { protected final EntityRegistry _snapshotEntityRegistry = new TestEntityRegistry(); protected final EntityRegistry _configEntityRegistry = @@ -34,11 +39,11 @@ public class AspectUtilsTest { protected final EntityRegistry _testEntityRegistry = new MergedEntityRegistry(_snapshotEntityRegistry).apply(_configEntityRegistry); - public AspectUtilsTest() throws EntityRegistryException {} + public DefaultAspectsUtilTest() throws EntityRegistryException {} @Test public void testAdditionalChanges() { - Database server = EbeanTestUtils.createTestServer(AspectUtilsTest.class.getSimpleName()); + Database server = EbeanTestUtils.createTestServer(DefaultAspectsUtilTest.class.getSimpleName()); EbeanAspectDao aspectDao = new EbeanAspectDao(server); aspectDao.setConnectionValidated(true); EventProducer mockProducer = mock(EventProducer.class); @@ -48,17 +53,21 @@ public void testAdditionalChanges() { new EntityServiceImpl( aspectDao, mockProducer, _testEntityRegistry, true, null, preProcessHooks); - MetadataChangeProposal proposal1 = new MetadataChangeProposal(); - proposal1.setEntityUrn( - new DatasetUrn(new DataPlatformUrn("platform"), "name", FabricType.PROD)); - proposal1.setAspectName("datasetProperties"); - DatasetProperties datasetProperties = new DatasetProperties().setName("name"); - proposal1.setAspect(GenericRecordUtils.serializeAspect(datasetProperties)); - proposal1.setEntityType("dataset"); - proposal1.setChangeType(ChangeType.PATCH); + MetadataChangeProposal proposal1 = + new DatasetPropertiesPatchBuilder() + .urn(new DatasetUrn(new 
DataPlatformUrn("platform"), "name", FabricType.PROD)) + .setDescription("something") + .setName("name") + .addCustomProperty("prop1", "propVal1") + .addCustomProperty("prop2", "propVal2") + .build(); + + Assert.assertEquals(proposal1.getChangeType(), ChangeType.PATCH); List proposalList = - AspectUtils.getAdditionalChanges(proposal1, entityServiceImpl); + DefaultAspectsUtil.getAdditionalChanges( + AspectsBatchImpl.builder().mcps(List.of(proposal1), new AuditStamp(), entityServiceImpl).build(), + entityServiceImpl).stream().map(MCPBatchItem::getMetadataChangeProposal).collect(Collectors.toList()); // proposals for key aspect, browsePath, browsePathV2, dataPlatformInstance Assert.assertEquals(proposalList.size(), 4); Assert.assertEquals(proposalList.get(0).getChangeType(), ChangeType.UPSERT); diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java index c45306e5f022bb..5d26a55fbf285d 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java @@ -121,21 +121,21 @@ public void testIngestListLatestAspects() throws AssertionError { MCPUpsertBatchItem.builder() .urn(entityUrn1) .aspectName(aspectName) - .aspect(writeAspect1) + .recordTemplate(writeAspect1) .systemMetadata(metadata1) .auditStamp(TEST_AUDIT_STAMP) .build(_entityServiceImpl), MCPUpsertBatchItem.builder() .urn(entityUrn2) .aspectName(aspectName) - .aspect(writeAspect2) + .recordTemplate(writeAspect2) .systemMetadata(metadata1) .auditStamp(TEST_AUDIT_STAMP) .build(_entityServiceImpl), MCPUpsertBatchItem.builder() .urn(entityUrn3) .aspectName(aspectName) - .aspect(writeAspect3) + .recordTemplate(writeAspect3) .systemMetadata(metadata1) .auditStamp(TEST_AUDIT_STAMP) .build(_entityServiceImpl)); @@ -190,21 +190,21 @@ public void testIngestListUrns() throws 
AssertionError { MCPUpsertBatchItem.builder() .urn(entityUrn1) .aspectName(aspectName) - .aspect(writeAspect1) + .recordTemplate(writeAspect1) .systemMetadata(metadata1) .auditStamp(TEST_AUDIT_STAMP) .build(_entityServiceImpl), MCPUpsertBatchItem.builder() .urn(entityUrn2) .aspectName(aspectName) - .aspect(writeAspect2) + .recordTemplate(writeAspect2) .systemMetadata(metadata1) .auditStamp(TEST_AUDIT_STAMP) .build(_entityServiceImpl), MCPUpsertBatchItem.builder() .urn(entityUrn3) .aspectName(aspectName) - .aspect(writeAspect3) + .recordTemplate(writeAspect3) .systemMetadata(metadata1) .auditStamp(TEST_AUDIT_STAMP) .build(_entityServiceImpl)); @@ -311,6 +311,12 @@ public void multiThreadingTest() { Set> additions = actualAspectIds.stream() .filter(id -> !generatedAspectIds.contains(id)) + // Exclude default aspects + .filter( + id -> + !Set.of("browsePaths", "browsePathsV2", "dataPlatformInstance") + .contains(id.getMiddle())) + .filter(id -> !id.getMiddle().endsWith("Key")) .collect(Collectors.toSet()); assertEquals( additions.size(), 0, String.format("Expected no additional aspects. Found: %s", additions)); @@ -361,6 +367,12 @@ public void singleThreadingTest() { Set> additions = actualAspectIds.stream() .filter(id -> !generatedAspectIds.contains(id)) + // Exclude default aspects + .filter( + id -> + !Set.of("browsePaths", "browsePathsV2", "dataPlatformInstance") + .contains(id.getMiddle())) + .filter(id -> !id.getMiddle().endsWith("Key")) .collect(Collectors.toSet()); assertEquals( additions.size(), 0, String.format("Expected no additional aspects. 
Found: %s", additions)); diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java index db749f3575a064..58e3bfd6d2f51e 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java @@ -108,6 +108,8 @@ public abstract class EntityServiceTest captor = ArgumentCaptor.forClass(MetadataChangeLog.class); - verify(_mockProducer, times(1)) - .produceMetadataChangeLog(Mockito.eq(entityUrn), Mockito.any(), captor.capture()); + ArgumentCaptor aspectSpecCaptor = ArgumentCaptor.forClass(AspectSpec.class); + verify(_mockProducer, times(5)) + .produceMetadataChangeLog( + Mockito.eq(entityUrn), aspectSpecCaptor.capture(), captor.capture()); assertEquals(UI_SOURCE, captor.getValue().getSystemMetadata().getProperties().get(APP_SOURCE)); + assertEquals( + aspectSpecCaptor.getAllValues().stream() + .map(AspectSpec::getName) + .collect(Collectors.toSet()), + Set.of( + "browsePathsV2", + "editableDatasetProperties", + "browsePaths", + "dataPlatformInstance", + "datasetKey")); } @Test @@ -1673,12 +1688,17 @@ public void testStructuredPropertyIngestProposal() throws Exception { genericAspect.setContentType("application/json"); gmce.setAspect(genericAspect); _entityServiceImpl.ingestProposal(gmce, TEST_AUDIT_STAMP, false); + ArgumentCaptor captor = ArgumentCaptor.forClass(MetadataChangeLog.class); verify(_mockProducer, times(1)) - .produceMetadataChangeLog(Mockito.eq(firstPropertyUrn), Mockito.any(), captor.capture()); + .produceMetadataChangeLog( + Mockito.eq(firstPropertyUrn), + Mockito.eq(structuredPropertiesDefinitionAspect), + captor.capture()); assertEquals( _entityServiceImpl.getAspect(firstPropertyUrn, definitionAspectName, 0), structuredPropertyDefinition); + Urn secondPropertyUrn = UrnUtils.getUrn("urn:li:structuredProperty:secondStructuredProperty"); 
assertNull(_entityServiceImpl.getAspect(secondPropertyUrn, definitionAspectName, 0)); assertEquals( @@ -1752,7 +1772,9 @@ public void testStructuredPropertyIngestProposal() throws Exception { ArgumentCaptor.forClass(MetadataChangeLog.class); verify(_mockProducer, times(1)) .produceMetadataChangeLog( - Mockito.eq(secondPropertyUrn), Mockito.any(), secondCaptor.capture()); + Mockito.eq(secondPropertyUrn), + Mockito.eq(structuredPropertiesDefinitionAspect), + secondCaptor.capture()); assertEquals( _entityServiceImpl.getAspect(firstPropertyUrn, definitionAspectName, 0), structuredPropertyDefinition); diff --git a/metadata-io/src/test/java/io/datahubproject/test/DataGenerator.java b/metadata-io/src/test/java/io/datahubproject/test/DataGenerator.java index 29c64abdc4d0d0..81856c98ca3499 100644 --- a/metadata-io/src/test/java/io/datahubproject/test/DataGenerator.java +++ b/metadata-io/src/test/java/io/datahubproject/test/DataGenerator.java @@ -10,15 +10,19 @@ import com.linkedin.common.urn.GlossaryTermUrn; import com.linkedin.common.urn.TagUrn; import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; import com.linkedin.data.template.RecordTemplate; import com.linkedin.events.metadata.ChangeType; import com.linkedin.glossary.GlossaryTermInfo; import com.linkedin.metadata.Constants; +import com.linkedin.metadata.aspect.batch.MCPBatchItem; +import com.linkedin.metadata.aspect.utils.DefaultAspectsUtil; import com.linkedin.metadata.config.PreProcessHooks; import com.linkedin.metadata.entity.AspectDao; -import com.linkedin.metadata.entity.AspectUtils; import com.linkedin.metadata.entity.EntityService; import com.linkedin.metadata.entity.EntityServiceImpl; +import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl; +import com.linkedin.metadata.entity.ebean.batch.MCPUpsertBatchItem; import com.linkedin.metadata.event.EventProducer; import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.models.EntitySpec; @@ -51,11 +55,17 @@ 
public class DataGenerator { private static final Faker FAKER = new Faker(); private final EntityRegistry entityRegistry; - private final EntityService entityService; + private final EntityService entityService; + private final boolean generateDefaultAspects; - public DataGenerator(EntityService entityService) { + public DataGenerator(EntityService entityService) { + this(entityService, false); + } + + public DataGenerator(EntityService entityService, Boolean generateDefaultAspects) { this.entityService = entityService; this.entityRegistry = entityService.getEntityRegistry(); + this.generateDefaultAspects = generateDefaultAspects != null ? generateDefaultAspects : false; } public static DataGenerator build(EntityRegistry entityRegistry) { @@ -81,10 +91,15 @@ public List generateTags(long count) { public Stream> generateMCPs( String entityName, long count, List aspects) { EntitySpec entitySpec = entityRegistry.getEntitySpec(entityName); + AuditStamp auditStamp = + new AuditStamp() + .setActor(UrnUtils.getUrn(Constants.DATAHUB_ACTOR)) + .setTime(System.currentTimeMillis()); // Prevent duplicate tags and terms generated as secondary entities Set secondaryUrns = new HashSet<>(); + // Expand with default aspects per normal return LongStream.range(0, count) .mapToObj( idx -> { @@ -145,11 +160,19 @@ public Stream> generateMCPs( }) .map( mcp -> { - // Expand with default aspects per normal - return Stream.concat( - Stream.of(mcp), - AspectUtils.getAdditionalChanges(mcp, entityService, true).stream()) - .collect(Collectors.toList()); + if (generateDefaultAspects) { + // Expand with default aspects instead of relying on default generation + return Stream.concat( + Stream.of(mcp), + DefaultAspectsUtil.getAdditionalChanges( + AspectsBatchImpl.builder() + .mcps(List.of(mcp), auditStamp, entityService).build(), + entityService).stream() + .map(MCPBatchItem::getMetadataChangeProposal)) + .collect(Collectors.toList()); + } else { + return List.of(mcp); + } }); } diff --git 
a/metadata-models-custom/src/main/java/com/linkedin/metadata/aspect/plugins/hooks/CustomDataQualityRulesMCPSideEffect.java b/metadata-models-custom/src/main/java/com/linkedin/metadata/aspect/plugins/hooks/CustomDataQualityRulesMCPSideEffect.java index d2041c443503ed..c21b64c8a4fc00 100644 --- a/metadata-models-custom/src/main/java/com/linkedin/metadata/aspect/plugins/hooks/CustomDataQualityRulesMCPSideEffect.java +++ b/metadata-models-custom/src/main/java/com/linkedin/metadata/aspect/plugins/hooks/CustomDataQualityRulesMCPSideEffect.java @@ -24,7 +24,7 @@ protected Stream applyMCPSideEffect( MCPUpsertBatchItem.builder() .urn(mirror) .aspectName(input.getAspectName()) - .aspect(input.getAspect()) + .recordTemplate(input.getRecordTemplate()) .auditStamp(input.getAuditStamp()) .systemMetadata(input.getSystemMetadata()) .build(aspectRetriever)); diff --git a/metadata-service/auth-impl/src/main/java/com/datahub/authentication/token/StatefulTokenService.java b/metadata-service/auth-impl/src/main/java/com/datahub/authentication/token/StatefulTokenService.java index e072a59ae77ffd..0d1da4a7687bae 100644 --- a/metadata-service/auth-impl/src/main/java/com/datahub/authentication/token/StatefulTokenService.java +++ b/metadata-service/auth-impl/src/main/java/com/datahub/authentication/token/StatefulTokenService.java @@ -10,7 +10,6 @@ import com.linkedin.common.urn.UrnUtils; import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.Constants; -import com.linkedin.metadata.entity.AspectUtils; import com.linkedin.metadata.entity.EntityService; import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl; import com.linkedin.metadata.key.DataHubAccessTokenKey; @@ -20,12 +19,11 @@ import java.util.Base64; import java.util.Date; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import 
java.util.stream.Stream; import javax.annotation.Nonnull; import javax.annotation.Nullable; import lombok.extern.slf4j.Slf4j; @@ -146,15 +144,8 @@ public String generateAccessToken( final AuditStamp auditStamp = AuditStampUtils.createDefaultAuditStamp().setActor(UrnUtils.getUrn(actorUrn)); - Stream proposalStream = - Stream.concat( - Stream.of(proposal), - AspectUtils.getAdditionalChanges(proposal, _entityService).stream()); - _entityService.ingestProposal( - AspectsBatchImpl.builder() - .mcps(proposalStream.collect(Collectors.toList()), auditStamp, _entityService) - .build(), + AspectsBatchImpl.builder().mcps(List.of(proposal), auditStamp, _entityService).build(), false); return accessToken; diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/BackfillBrowsePathsV2Step.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/BackfillBrowsePathsV2Step.java index 80e139dcd5c65b..49a86406c1ecd5 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/BackfillBrowsePathsV2Step.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/BackfillBrowsePathsV2Step.java @@ -9,6 +9,7 @@ import com.linkedin.common.urn.Urn; import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.Constants; +import com.linkedin.metadata.aspect.utils.DefaultAspectsUtil; import com.linkedin.metadata.boot.UpgradeStep; import com.linkedin.metadata.entity.EntityService; import com.linkedin.metadata.query.filter.Condition; @@ -128,7 +129,8 @@ private String backfillBrowsePathsV2(String entityType, AuditStamp auditStamp, S } private void ingestBrowsePathsV2(Urn urn, AuditStamp auditStamp) throws Exception { - BrowsePathsV2 browsePathsV2 = _entityService.buildDefaultBrowsePathV2(urn, true); + BrowsePathsV2 browsePathsV2 = + DefaultAspectsUtil.buildDefaultBrowsePathV2(urn, true, _entityService); log.debug(String.format("Adding browse path v2 for urn %s with value %s", 
urn, browsePathsV2)); MetadataChangeProposal proposal = new MetadataChangeProposal(); proposal.setEntityUrn(urn); diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestDataPlatformInstancesStep.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestDataPlatformInstancesStep.java index 716ae292338ed2..19efa5e9c4de20 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestDataPlatformInstancesStep.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestDataPlatformInstancesStep.java @@ -79,7 +79,7 @@ public void execute() throws Exception { MCPUpsertBatchItem.builder() .urn(urn) .aspectName(DATA_PLATFORM_INSTANCE_ASPECT_NAME) - .aspect(dataPlatformInstance.get()) + .recordTemplate(dataPlatformInstance.get()) .auditStamp(aspectAuditStamp) .build(_entityService)); } diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestDataPlatformsStep.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestDataPlatformsStep.java index 89ed493e162ccf..d2bb61ad7ade5d 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestDataPlatformsStep.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestDataPlatformsStep.java @@ -86,7 +86,7 @@ public void execute() throws IOException, URISyntaxException { return MCPUpsertBatchItem.builder() .urn(urn) .aspectName(PLATFORM_ASPECT_NAME) - .aspect(info) + .recordTemplate(info) .auditStamp( new AuditStamp() .setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)) diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/UpgradeDefaultBrowsePathsStep.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/UpgradeDefaultBrowsePathsStep.java index 3eedbb48aaecaf..f28e9ad4e9ed8c 100644 --- 
a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/UpgradeDefaultBrowsePathsStep.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/UpgradeDefaultBrowsePathsStep.java @@ -9,6 +9,7 @@ import com.linkedin.data.template.RecordTemplate; import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.Constants; +import com.linkedin.metadata.aspect.utils.DefaultAspectsUtil; import com.linkedin.metadata.boot.UpgradeStep; import com.linkedin.metadata.entity.EntityService; import com.linkedin.metadata.entity.ListResult; @@ -126,7 +127,7 @@ private int getAndMigrateBrowsePaths(String entityType, int start, AuditStamp au } private void migrateBrowsePath(Urn urn, AuditStamp auditStamp) throws Exception { - BrowsePaths newPaths = _entityService.buildDefaultBrowsePath(urn); + BrowsePaths newPaths = DefaultAspectsUtil.buildDefaultBrowsePath(urn, _entityService); log.debug(String.format("Updating browse path for urn %s to value %s", urn, newPaths)); MetadataChangeProposal proposal = new MetadataChangeProposal(); proposal.setEntityUrn(urn); diff --git a/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/BackfillBrowsePathsV2StepTest.java b/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/BackfillBrowsePathsV2StepTest.java index 06571415620893..0858736e39021a 100644 --- a/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/BackfillBrowsePathsV2StepTest.java +++ b/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/BackfillBrowsePathsV2StepTest.java @@ -1,12 +1,10 @@ package com.linkedin.metadata.boot.steps; import static com.linkedin.metadata.Constants.CONTAINER_ASPECT_NAME; +import static org.mockito.ArgumentMatchers.any; import com.google.common.collect.ImmutableList; import com.linkedin.common.AuditStamp; -import com.linkedin.common.BrowsePathEntry; -import com.linkedin.common.BrowsePathEntryArray; -import 
com.linkedin.common.BrowsePathsV2; import com.linkedin.common.urn.Urn; import com.linkedin.common.urn.UrnUtils; import com.linkedin.entity.Aspect; @@ -93,9 +91,9 @@ public void testExecuteNoExistingBrowsePaths() throws Exception { Mockito.verify(mockSearchService, Mockito.times(9)) .scrollAcrossEntities( - Mockito.any(), + any(), Mockito.eq("*"), - Mockito.any(Filter.class), + any(Filter.class), Mockito.eq(null), Mockito.eq(null), Mockito.eq("5m"), @@ -104,8 +102,7 @@ public void testExecuteNoExistingBrowsePaths() throws Exception { // Verify that 11 aspects are ingested, 2 for the upgrade request / result, 9 for ingesting 1 of // each entity type Mockito.verify(mockService, Mockito.times(11)) - .ingestProposal( - Mockito.any(MetadataChangeProposal.class), Mockito.any(), Mockito.eq(false)); + .ingestProposal(any(MetadataChangeProposal.class), any(), Mockito.eq(false)); } @Test @@ -135,9 +132,7 @@ public void testDoesNotRunWhenAlreadyExecuted() throws Exception { Mockito.verify(mockService, Mockito.times(0)) .ingestProposal( - Mockito.any(MetadataChangeProposal.class), - Mockito.any(AuditStamp.class), - Mockito.anyBoolean()); + any(MetadataChangeProposal.class), any(AuditStamp.class), Mockito.anyBoolean()); } private EntityService initMockService() throws URISyntaxException { @@ -146,16 +141,9 @@ private EntityService initMockService() throws URISyntaxException { Mockito.when(mockService.getEntityRegistry()).thenReturn(registry); for (int i = 0; i < ENTITY_TYPES.size(); i++) { - Mockito.when( - mockService.buildDefaultBrowsePathV2( - Mockito.eq(ENTITY_URNS.get(i)), Mockito.eq(true))) - .thenReturn( - new BrowsePathsV2() - .setPath(new BrowsePathEntryArray(new BrowsePathEntry().setId("test")))); - Mockito.when( mockService.getEntityV2( - Mockito.any(), + any(), Mockito.eq(ENTITY_URNS.get(i)), Mockito.eq(Collections.singleton(CONTAINER_ASPECT_NAME)))) .thenReturn(null); @@ -172,7 +160,7 @@ private SearchService initMockSearchService() { 
mockSearchService.scrollAcrossEntities( Mockito.eq(ImmutableList.of(ENTITY_TYPES.get(i))), Mockito.eq("*"), - Mockito.any(Filter.class), + any(Filter.class), Mockito.eq(null), Mockito.eq(null), Mockito.eq("5m"), diff --git a/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/IngestDataPlatformInstancesStepTest.java b/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/IngestDataPlatformInstancesStepTest.java index 1ac0f2f4f914a6..5617d7e9714b08 100644 --- a/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/IngestDataPlatformInstancesStepTest.java +++ b/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/IngestDataPlatformInstancesStepTest.java @@ -122,7 +122,7 @@ public void testExecuteWhenSomeEntitiesShouldReceiveDataPlatformInstance() throw item.getUrn().getEntityType().equals("chart") && item.getAspectName() .equals(DATA_PLATFORM_INSTANCE_ASPECT_NAME) - && ((MCPUpsertBatchItem) item).getAspect() + && ((MCPUpsertBatchItem) item).getRecordTemplate() instanceof DataPlatformInstance)), anyBoolean(), anyBoolean()); @@ -136,7 +136,7 @@ public void testExecuteWhenSomeEntitiesShouldReceiveDataPlatformInstance() throw item.getUrn().getEntityType().equals("chart") && item.getAspectName() .equals(DATA_PLATFORM_INSTANCE_ASPECT_NAME) - && ((MCPUpsertBatchItem) item).getAspect() + && ((MCPUpsertBatchItem) item).getRecordTemplate() instanceof DataPlatformInstance)), anyBoolean(), anyBoolean()); diff --git a/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/UpgradeDefaultBrowsePathsStepTest.java b/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/UpgradeDefaultBrowsePathsStepTest.java index 024ad7b16a8447..605d9d1c5e5d89 100644 --- a/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/UpgradeDefaultBrowsePathsStepTest.java +++ 
b/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/UpgradeDefaultBrowsePathsStepTest.java @@ -107,10 +107,6 @@ public void testExecuteFirstTime() throws Exception { final EntityService mockService = Mockito.mock(EntityService.class); final EntityRegistry registry = new TestEntityRegistry(); Mockito.when(mockService.getEntityRegistry()).thenReturn(registry); - Mockito.when(mockService.buildDefaultBrowsePath(Mockito.eq(testUrn1))) - .thenReturn(new BrowsePaths().setPaths(new StringArray(ImmutableList.of("/prod/kafka")))); - Mockito.when(mockService.buildDefaultBrowsePath(Mockito.eq(testUrn2))) - .thenReturn(new BrowsePaths().setPaths(new StringArray(ImmutableList.of("/prod/kafka")))); final Urn upgradeEntityUrn = Urn.createFromString(UPGRADE_URN); Mockito.when( diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/util/MappingUtil.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/util/MappingUtil.java index a7e88966e4f874..13d2e501abf09f 100644 --- a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/util/MappingUtil.java +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/util/MappingUtil.java @@ -26,7 +26,6 @@ import com.linkedin.entity.Aspect; import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.aspect.batch.AspectsBatch; -import com.linkedin.metadata.entity.AspectUtils; import com.linkedin.metadata.entity.EntityService; import com.linkedin.metadata.entity.IngestResult; import com.linkedin.metadata.entity.RollbackRunResult; @@ -452,20 +451,12 @@ public static Pair ingestProposal( .setTime(System.currentTimeMillis()) .setActor(UrnUtils.getUrn(actorUrn)); - final List additionalChanges = - AspectUtils.getAdditionalChanges(serviceProposal, entityService); - log.info("Proposal: {}", serviceProposal); Throwable exceptionally = null; try { - Stream proposalStream = - Stream.concat( - Stream.of(serviceProposal), - 
AspectUtils.getAdditionalChanges(serviceProposal, entityService).stream()); - AspectsBatch batch = AspectsBatchImpl.builder() - .mcps(proposalStream.collect(Collectors.toList()), auditStamp, entityService) + .mcps(List.of(serviceProposal), auditStamp, entityService) .build(); Set proposalResult = entityService.ingestProposal(batch, async); diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/controller/EntityController.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/controller/EntityController.java index 503330fdc8a2e5..44202c20ca6db7 100644 --- a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/controller/EntityController.java +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/controller/EntityController.java @@ -481,7 +481,7 @@ private UpsertItem toUpsertItem( .urn(entityUrn) .aspectName(aspectSpec.getName()) .auditStamp(AuditStampUtils.createAuditStamp(actor.toUrnStr())) - .aspect( + .recordTemplate( GenericRecordUtils.deserializeAspect( ByteString.copyString(jsonAspect, StandardCharsets.UTF_8), GenericRecordUtils.JSON, diff --git a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java index ffa3abe6806f99..21a9f47a13f738 100644 --- a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java +++ b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java @@ -248,20 +248,9 @@ public Task ingestProposal( return RestliUtil.toTask(() -> { log.debug("Proposal: {}", metadataChangeProposal); try { - final AspectsBatch batch; - if (asyncBool) { - // if async we'll expand the getAdditionalChanges later, no need to do this early - batch = AspectsBatchImpl.builder() - 
.mcps(List.of(metadataChangeProposal), auditStamp, _entityService) - .build(); - } else { - Stream proposalStream = Stream.concat(Stream.of(metadataChangeProposal), - AspectUtils.getAdditionalChanges(metadataChangeProposal, _entityService).stream()); - - batch = AspectsBatchImpl.builder() - .mcps(proposalStream.collect(Collectors.toList()), auditStamp, _entityService) - .build(); - } + final AspectsBatch batch = AspectsBatchImpl.builder() + .mcps(List.of(metadataChangeProposal), auditStamp, _entityService) + .build(); Set results = _entityService.ingestProposal(batch, asyncBool); diff --git a/metadata-service/restli-servlet-impl/src/test/java/com/linkedin/metadata/resources/entity/AspectResourceTest.java b/metadata-service/restli-servlet-impl/src/test/java/com/linkedin/metadata/resources/entity/AspectResourceTest.java index d6130e05b77bd3..19c5b42d3c9c0d 100644 --- a/metadata-service/restli-servlet-impl/src/test/java/com/linkedin/metadata/resources/entity/AspectResourceTest.java +++ b/metadata-service/restli-servlet-impl/src/test/java/com/linkedin/metadata/resources/entity/AspectResourceTest.java @@ -84,7 +84,7 @@ public void testAsyncDefaultAspects() throws URISyntaxException { MCPUpsertBatchItem req = MCPUpsertBatchItem.builder() .urn(urn) .aspectName(mcp.getAspectName()) - .aspect(mcp.getAspect()) + .recordTemplate(mcp.getAspect()) .auditStamp(new AuditStamp()) .metadataChangeProposal(mcp) .build(_entityService); diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/entity/AspectUtils.java b/metadata-service/services/src/main/java/com/linkedin/metadata/entity/AspectUtils.java index 2c1596474fb21e..55373730e7b673 100644 --- a/metadata-service/services/src/main/java/com/linkedin/metadata/entity/AspectUtils.java +++ b/metadata-service/services/src/main/java/com/linkedin/metadata/entity/AspectUtils.java @@ -1,7 +1,6 @@ package com.linkedin.metadata.entity; import com.datahub.authentication.Authentication; -import 
com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.linkedin.common.AuditStamp; import com.linkedin.common.urn.Urn; @@ -12,18 +11,12 @@ import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.models.EntitySpec; -import com.linkedin.metadata.utils.EntityKeyUtils; import com.linkedin.metadata.utils.GenericRecordUtils; -import com.linkedin.mxe.GenericAspect; import com.linkedin.mxe.MetadataChangeLog; import com.linkedin.mxe.MetadataChangeProposal; -import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; -import java.util.stream.Collectors; import javax.annotation.Nonnull; import lombok.extern.slf4j.Slf4j; import org.joda.time.DateTimeUtils; @@ -33,66 +26,6 @@ public class AspectUtils { private AspectUtils() {} - public static final Set SUPPORTED_TYPES = - Set.of(ChangeType.UPSERT, ChangeType.CREATE, ChangeType.PATCH); - - public static List getAdditionalChanges( - @Nonnull MetadataChangeProposal metadataChangeProposal, - @Nonnull EntityService entityService, - boolean onPrimaryKeyInsertOnly) { - - // No additional changes for unsupported operations - if (!SUPPORTED_TYPES.contains(metadataChangeProposal.getChangeType())) { - return Collections.emptyList(); - } - - final Urn urn = - EntityKeyUtils.getUrnFromProposal( - metadataChangeProposal, - entityService.getKeyAspectSpec(metadataChangeProposal.getEntityType())); - - final Map includedAspects; - if (metadataChangeProposal.getChangeType() != ChangeType.PATCH) { - RecordTemplate aspectRecord = - GenericRecordUtils.deserializeAspect( - metadataChangeProposal.getAspect().getValue(), - metadataChangeProposal.getAspect().getContentType(), - entityService - .getEntityRegistry() - .getEntitySpec(urn.getEntityType()) - .getAspectSpec(metadataChangeProposal.getAspectName())); - includedAspects = 
ImmutableMap.of(metadataChangeProposal.getAspectName(), aspectRecord); - } else { - includedAspects = ImmutableMap.of(); - } - - if (onPrimaryKeyInsertOnly) { - return entityService - .generateDefaultAspectsOnFirstWrite(urn, includedAspects) - .getValue() - .stream() - .map( - entry -> - getProposalFromAspect(entry.getKey(), entry.getValue(), metadataChangeProposal)) - .filter(Objects::nonNull) - .collect(Collectors.toList()); - } else { - return entityService.generateDefaultAspectsIfMissing(urn, includedAspects).stream() - .map( - entry -> - getProposalFromAspect(entry.getKey(), entry.getValue(), metadataChangeProposal)) - .filter(Objects::nonNull) - .collect(Collectors.toList()); - } - } - - public static List getAdditionalChanges( - @Nonnull MetadataChangeProposal metadataChangeProposal, - @Nonnull EntityService entityService) { - - return getAdditionalChanges(metadataChangeProposal, entityService, false); - } - public static Map batchGetLatestAspect( String entity, Set urns, @@ -112,40 +45,6 @@ public static Map batchGetLatestAspect( return finalResult; } - private static MetadataChangeProposal getProposalFromAspect( - String aspectName, RecordTemplate aspect, MetadataChangeProposal original) { - MetadataChangeProposal proposal = new MetadataChangeProposal(); - GenericAspect genericAspect = GenericRecordUtils.serializeAspect(aspect); - // Set net new fields - proposal.setAspect(genericAspect); - proposal.setAspectName(aspectName); - - // Set fields determined from original - // Additional changes should never be set as PATCH, if a PATCH is coming across it should be an - // UPSERT - proposal.setChangeType(original.getChangeType()); - if (ChangeType.PATCH.equals(proposal.getChangeType())) { - proposal.setChangeType(ChangeType.UPSERT); - } - - if (original.getSystemMetadata() != null) { - proposal.setSystemMetadata(original.getSystemMetadata()); - } - if (original.getEntityUrn() != null) { - proposal.setEntityUrn(original.getEntityUrn()); - } - if 
(original.getEntityKeyAspect() != null) { - proposal.setEntityKeyAspect(original.getEntityKeyAspect()); - } - if (original.getAuditHeader() != null) { - proposal.setAuditHeader(original.getAuditHeader()); - } - - proposal.setEntityType(original.getEntityType()); - - return proposal; - } - public static MetadataChangeProposal buildMetadataChangeProposal( @Nonnull Urn urn, @Nonnull String aspectName, @Nonnull RecordTemplate aspect) { final MetadataChangeProposal proposal = new MetadataChangeProposal(); diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/entity/EntityService.java b/metadata-service/services/src/main/java/com/linkedin/metadata/entity/EntityService.java index 94ab69e895920f..d9b0f4b73d5805 100644 --- a/metadata-service/services/src/main/java/com/linkedin/metadata/entity/EntityService.java +++ b/metadata-service/services/src/main/java/com/linkedin/metadata/entity/EntityService.java @@ -1,8 +1,6 @@ package com.linkedin.metadata.entity; import com.linkedin.common.AuditStamp; -import com.linkedin.common.BrowsePaths; -import com.linkedin.common.BrowsePathsV2; import com.linkedin.common.VersionedUrn; import com.linkedin.common.urn.Urn; import com.linkedin.data.template.RecordTemplate; @@ -255,34 +253,6 @@ Optional getAspectSpec( String getKeyAspectName(@Nonnull final Urn urn); - /** - * Generate default aspects if not present in the database. - * - * @param urn entity urn - * @param includedAspects aspects being written - * @return additional aspects to be written - */ - List> generateDefaultAspectsIfMissing( - @Nonnull final Urn urn, Map includedAspects); - - /** - * Generate default aspects if the entity key aspect is NOT in the database **AND** the key aspect - * is being written, present in `includedAspects`. - * - *

Does not automatically create key aspects. - * - * @see EntityService#generateDefaultAspectsIfMissing if key aspects need autogeneration - *

This version is more efficient in that it only generates additional writes when a new - * entity is being minted for the first time. The drawback is that it will not automatically - * add key aspects, in case the producer is not bothering to ensure that the entity exists - * before writing non-key aspects. - * @param urn entity urn - * @param includedAspects aspects being written - * @return whether key aspect exists in database and the additional aspects to be written - */ - Pair>> generateDefaultAspectsOnFirstWrite( - @Nonnull final Urn urn, Map includedAspects); - AspectSpec getKeyAspectSpec(@Nonnull final String entityName); Set getEntityAspectNames(final String entityName); @@ -339,17 +309,5 @@ default boolean exists(@Nonnull Urn urn, boolean includeSoftDelete) { void setWritable(boolean canWrite); - BrowsePaths buildDefaultBrowsePath(final @Nonnull Urn urn) throws URISyntaxException; - - /** - * Builds the default browse path V2 aspects for all entities. - * - *

This method currently supports datasets, charts, dashboards, and data jobs best. Everything - * else will have a basic "Default" folder added to their browsePathV2. - */ - @Nonnull - BrowsePathsV2 buildDefaultBrowsePathV2(final @Nonnull Urn urn, boolean useContainerPaths) - throws URISyntaxException; - RecordTemplate getLatestAspect(@Nonnull final Urn urn, @Nonnull final String aspectName); }