Skip to content

Commit

Permalink
fix(dataProduct): reduce write fan-out for unset side effect (datahub…
Browse files Browse the repository at this point in the history
  • Loading branch information
RyanHolstien authored Nov 26, 2024
1 parent 90fe14a commit 32ef389
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -70,6 +71,7 @@ private static Stream<MCPItem> generatePatchRemove(
log.error("Unable to process data product properties for urn: {}", mclItem.getUrn());
return Stream.empty();
}
Map<String, List<GenericJsonPatch.PatchOp>> patchOpMap = new HashMap<>();
for (DataProductAssociation dataProductAssociation :
Optional.ofNullable(dataProductProperties.getAssets())
.orElse(new DataProductAssociationArray())) {
Expand All @@ -93,40 +95,45 @@ private static Stream<MCPItem> generatePatchRemove(
if (!result.getEntities().isEmpty()) {
for (RelatedEntities entity : result.getEntities()) {
if (!mclItem.getUrn().equals(UrnUtils.getUrn(entity.getSourceUrn()))) {
EntitySpec entitySpec =
retrieverContext
.getAspectRetriever()
.getEntityRegistry()
.getEntitySpec(DATA_PRODUCT_ENTITY_NAME);
GenericJsonPatch.PatchOp patchOp = new GenericJsonPatch.PatchOp();
patchOp.setOp(PatchOperationType.REMOVE.getValue());
patchOp.setPath(String.format("/assets/%s", entity.getDestinationUrn()));
mcpItems.add(
PatchItemImpl.builder()
.urn(UrnUtils.getUrn(entity.getSourceUrn()))
.entitySpec(
retrieverContext
.getAspectRetriever()
.getEntityRegistry()
.getEntitySpec(DATA_PRODUCT_ENTITY_NAME))
.aspectName(DATA_PRODUCT_PROPERTIES_ASPECT_NAME)
.aspectSpec(entitySpec.getAspectSpec(DATA_PRODUCT_PROPERTIES_ASPECT_NAME))
.patch(
GenericJsonPatch.builder()
.arrayPrimaryKeys(
Map.of(
DataProductPropertiesTemplate.ASSETS_FIELD_NAME,
List.of(DataProductPropertiesTemplate.KEY_FIELD_NAME)))
.patch(List.of(patchOp))
.build()
.getJsonPatch())
.auditStamp(mclItem.getAuditStamp())
.systemMetadata(mclItem.getSystemMetadata())
.build(retrieverContext.getAspectRetriever().getEntityRegistry()));
patchOpMap
.computeIfAbsent(entity.getSourceUrn(), urn -> new ArrayList<>())
.add(patchOp);
}
}
}
}
for (String urn : patchOpMap.keySet()) {
EntitySpec entitySpec =
retrieverContext
.getAspectRetriever()
.getEntityRegistry()
.getEntitySpec(DATA_PRODUCT_ENTITY_NAME);
mcpItems.add(
PatchItemImpl.builder()
.urn(UrnUtils.getUrn(urn))
.entitySpec(
retrieverContext
.getAspectRetriever()
.getEntityRegistry()
.getEntitySpec(DATA_PRODUCT_ENTITY_NAME))
.aspectName(DATA_PRODUCT_PROPERTIES_ASPECT_NAME)
.aspectSpec(entitySpec.getAspectSpec(DATA_PRODUCT_PROPERTIES_ASPECT_NAME))
.patch(
GenericJsonPatch.builder()
.arrayPrimaryKeys(
Map.of(
DataProductPropertiesTemplate.ASSETS_FIELD_NAME,
List.of(DataProductPropertiesTemplate.KEY_FIELD_NAME)))
.patch(patchOpMap.get(urn))
.build()
.getJsonPatch())
.auditStamp(mclItem.getAuditStamp())
.systemMetadata(mclItem.getSystemMetadata())
.build(retrieverContext.getAspectRetriever().getEntityRegistry()));
}
return mcpItems.stream();
}
return Stream.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import com.linkedin.metadata.utils.AuditStampUtils;
import com.linkedin.test.metadata.aspect.TestEntityRegistry;
import io.datahubproject.metadata.context.RetrieverContext;
import jakarta.json.JsonArray;
import jakarta.json.JsonObject;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -251,6 +253,111 @@ public void testDPRemoveOld() {
.build(mockAspectRetriever.getEntityRegistry())));
}

@Test
public void testBulkAssetMove() {
DataProductUnsetSideEffect test = new DataProductUnsetSideEffect();
test.setConfig(TEST_PLUGIN_CONFIG);

// Create 100 dataset URNs and set up their existing relationships
List<Urn> datasetUrns = new ArrayList<>();
for (int i = 0; i < 100; i++) {
Urn datasetUrn =
UrnUtils.getUrn(
String.format("urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_%d,PROD)", i));
datasetUrns.add(datasetUrn);

// Mock the existing relationship for each dataset with the old data product
RelatedEntities relatedEntities =
new RelatedEntities(
"DataProductContains",
TEST_PRODUCT_URN_2.toString(), // Old data product
datasetUrn.toString(),
RelationshipDirection.INCOMING,
null);

List<RelatedEntities> relatedEntitiesList = new ArrayList<>();
relatedEntitiesList.add(relatedEntities);
RelatedEntitiesScrollResult relatedEntitiesScrollResult =
new RelatedEntitiesScrollResult(1, 10, null, relatedEntitiesList);

when(retrieverContext
.getGraphRetriever()
.scrollRelatedEntities(
eq(null),
eq(QueryUtils.newFilter("urn", datasetUrn.toString())),
eq(null),
eq(EMPTY_FILTER),
eq(ImmutableList.of("DataProductContains")),
eq(
QueryUtils.newRelationshipFilter(
EMPTY_FILTER, RelationshipDirection.INCOMING)),
eq(Collections.emptyList()),
eq(null),
eq(10),
eq(null),
eq(null)))
.thenReturn(relatedEntitiesScrollResult);
}

// Create data product properties with all 100 assets
DataProductProperties dataProductProperties = new DataProductProperties();
DataProductAssociationArray dataProductAssociations = new DataProductAssociationArray();
for (Urn datasetUrn : datasetUrns) {
DataProductAssociation association = new DataProductAssociation();
association.setDestinationUrn(datasetUrn);
dataProductAssociations.add(association);
}
dataProductProperties.setAssets(dataProductAssociations);

// Run test
ChangeItemImpl dataProductPropertiesChangeItem =
ChangeItemImpl.builder()
.urn(TEST_PRODUCT_URN) // New data product
.aspectName(DATA_PRODUCT_PROPERTIES_ASPECT_NAME)
.changeType(ChangeType.UPSERT)
.entitySpec(TEST_REGISTRY.getEntitySpec(DATA_PRODUCT_ENTITY_NAME))
.aspectSpec(
TEST_REGISTRY
.getEntitySpec(DATA_PRODUCT_ENTITY_NAME)
.getAspectSpec(DATA_PRODUCT_PROPERTIES_ASPECT_NAME))
.recordTemplate(dataProductProperties)
.auditStamp(AuditStampUtils.createDefaultAuditStamp())
.build(mockAspectRetriever);

List<MCPItem> testOutput =
test.postMCPSideEffect(
List.of(
MCLItemImpl.builder()
.build(
dataProductPropertiesChangeItem,
null,
null,
retrieverContext.getAspectRetriever())),
retrieverContext)
.toList();

// Verify test
assertEquals(testOutput.size(), 1, "Expected one patch to remove assets from old data product");

MCPItem patchItem = testOutput.get(0);
assertEquals(
patchItem.getUrn(), TEST_PRODUCT_URN_2, "Patch should target the old data product");
assertEquals(patchItem.getAspectName(), DATA_PRODUCT_PROPERTIES_ASPECT_NAME);

// Verify the patch contains remove operations for all 100 assets
JsonArray patchArray = ((PatchItemImpl) patchItem).getPatch().toJsonArray();
assertEquals(patchArray.size(), 100, "Should have 100 remove operations");

// Verify each remove operation
for (int i = 0; i < 100; i++) {
JsonObject op = patchArray.getJsonObject(i);
assertEquals(op.getString("op"), PatchOperationType.REMOVE.getValue());
assertEquals(
op.getString("path"),
String.format("/assets/urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_%d,PROD)", i));
}
}

private static DataProductProperties getTestDataProductProperties(Urn destinationUrn) {
DataProductProperties dataProductProperties = new DataProductProperties();
DataProductAssociationArray dataProductAssociations = new DataProductAssociationArray();
Expand Down

0 comments on commit 32ef389

Please sign in to comment.