diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml index 412c962cb6e36f..a5889b2d2f92de 100644 --- a/.github/workflows/build-and-test.yml +++ b/.github/workflows/build-and-test.yml @@ -75,6 +75,8 @@ jobs: path: | ~/.cache/uv key: ${{ runner.os }}-uv-${{ hashFiles('**/requirements.txt') }} + - name: Install dependencies + run: ./metadata-ingestion/scripts/install_deps.sh - name: Set up JDK 17 uses: actions/setup-java@v4 with: diff --git a/datahub-web-react/yarn.lock b/datahub-web-react/yarn.lock index 9dc563c958dd19..ddda98d7f83268 100644 --- a/datahub-web-react/yarn.lock +++ b/datahub-web-react/yarn.lock @@ -5043,9 +5043,9 @@ cross-inspect@1.0.0: tslib "^2.4.0" cross-spawn@^7.0.2: - version "7.0.3" - resolved "https://registry.yarnpkg.com/cross-spawn/-/cross-spawn-7.0.3.tgz#f73a85b9d5d41d045551c177e2882d4ac85728a6" - integrity sha512-iRDPJKUPVEND7dHPO8rkbOnPpyDygcDFtWjpeWNCgy8WP2rXcxXL8TskReQl6OrB2G7+UJrags1q15Fudc7G6w== + version "7.0.6" + resolved "https://registry.yarnpkg.com/cross-spawn/-/cross-spawn-7.0.6.tgz#8a58fe78f00dcd70c370451759dfbfaf03e8ee9f" + integrity sha512-uV2QOWP2nWzsy2aMp8aRibhi9dlzF5Hgh5SHaB9OiTGEyDTiJJyx0uy51QXdyWbtAHNua4XJzUKca3OzKUd3vA== dependencies: path-key "^3.1.0" shebang-command "^2.0.0" diff --git a/entity-registry/build.gradle b/entity-registry/build.gradle index e5baa95967f304..ee5ece4049399e 100644 --- a/entity-registry/build.gradle +++ b/entity-registry/build.gradle @@ -8,7 +8,7 @@ apply from: "../gradle/coverage/java-coverage.gradle" dependencies { implementation spec.product.pegasus.data - implementation spec.product.pegasus.generator + compileOnly spec.product.pegasus.generator api project(path: ':metadata-models') api project(path: ':metadata-models', configuration: "dataTemplate") api externalDependency.classGraph diff --git a/entity-registry/src/main/java/com/linkedin/metadata/models/extractor/FieldExtractor.java b/entity-registry/src/main/java/com/linkedin/metadata/models/extractor/FieldExtractor.java index bef7782d8f7c9a..f4dc2ec2f0cd56 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/models/extractor/FieldExtractor.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/models/extractor/FieldExtractor.java @@ -14,7 +14,7 @@ import java.util.Optional; import java.util.function.Function; import java.util.stream.Collectors; -import javax.annotation.Nonnull; +import javax.annotation.Nullable; /** Extracts fields from a RecordTemplate based on the appropriate {@link FieldSpec}. */ public class FieldExtractor { @@ -30,15 +30,34 @@ private static long getNumArrayWildcards(PathSpec pathSpec) { // Extract the value of each field in the field specs from the input record public static Map> extractFields( - @Nonnull RecordTemplate record, List fieldSpecs) { - return extractFields(record, fieldSpecs, MAX_VALUE_LENGTH); + @Nullable RecordTemplate record, List fieldSpecs) { + return extractFields(record, fieldSpecs, false); } public static Map> extractFields( - @Nonnull RecordTemplate record, List fieldSpecs, int maxValueLength) { + @Nullable RecordTemplate record, List fieldSpecs, boolean requiredFieldExtract) { + return extractFields(record, fieldSpecs, MAX_VALUE_LENGTH, requiredFieldExtract); + } + + public static Map> extractFields( + @Nullable RecordTemplate record, List fieldSpecs, int maxValueLength) { + return extractFields(record, fieldSpecs, maxValueLength, false); + } + + public static Map> extractFields( + @Nullable RecordTemplate record, + List fieldSpecs, + int maxValueLength, + boolean requiredFieldExtract) { final Map> extractedFields = new HashMap<>(); for (T fieldSpec : fieldSpecs) { - Optional value = RecordUtils.getFieldValue(record, fieldSpec.getPath()); + if (requiredFieldExtract && record == null) { + throw new IllegalArgumentException( + "Field extraction is required and the RecordTemplate is null"); + } + Optional value = + Optional.ofNullable(record) + .flatMap(maybeRecord -> RecordUtils.getFieldValue(maybeRecord, fieldSpec.getPath())); if (!value.isPresent()) { extractedFields.put(fieldSpec, Collections.emptyList()); } else { diff --git a/metadata-ingestion/tests/unit/urns/invalid_urns.txt b/metadata-ingestion/tests/unit/urns/invalid_urns.txt new file mode 100644 index 00000000000000..9ce2c99a1a4ee8 --- /dev/null +++ b/metadata-ingestion/tests/unit/urns/invalid_urns.txt @@ -0,0 +1,40 @@ +# Basic URN format tests +urn:li:abc +urn:li:abc: +urn:li:abc:() +urn:li:abc:(abc,) +urn:li:corpuser:abc) + +# Reserved characters +urn:li:corpuser:foo␟bar +urn:li:tag:a,b,c + +# CorpUser URN tests +urn:li:corpuser:(part1,part2) + +# Dataset URN tests +urn:li:dataset:(urn:li:user:abc,dataset,prod) +urn:li:dataset:(urn:li:user:abc,dataset) +urn:li:dataset:(urn:li:user:abc,dataset,invalidEnv) + +# DataFlow URN tests +urn:li:dataFlow:(airflow,flow_id) + +# DataJob URN tests +urn:li:dataJob:(urn:li:user:abc,job_id) +urn:li:dataJob:(urn:li:dataFlow:(airflow,flow_id,prod)) + +# Domain URN tests +urn:li:domain:(part1,part2) + +# Tag URN tests +urn:li:tag:(part1,part2) + +# Notebook URN tests +urn:li:notebook:(part1,part2,part3) + +# CorpGroup URN tests +urn:li:corpGroup:(part1,part2) + +# DataProcessInstance URN tests +urn:li:dataProcessInstance:(part1,part2) diff --git a/metadata-ingestion/tests/unit/urns/test_corp_group_urn.py b/metadata-ingestion/tests/unit/urns/test_corp_group_urn.py index 1897a0e8686f09..4e55e78255d1c1 100644 --- a/metadata-ingestion/tests/unit/urns/test_corp_group_urn.py +++ b/metadata-ingestion/tests/unit/urns/test_corp_group_urn.py @@ -3,7 +3,6 @@ import pytest from datahub.utilities.urns.corp_group_urn import CorpGroupUrn -from datahub.utilities.urns.error import InvalidUrnError @pytest.mark.filterwarnings("ignore::DeprecationWarning") @@ -17,12 +16,3 @@ def test_parse_urn(self) -> None: assert str(corp_group_urn) == corp_group_urn_str assert corp_group_urn == CorpGroupUrn(name="abc") assert corp_group_urn == CorpGroupUrn.create_from_id("abc") - - def test_invalid_urn(self) -> None: - with self.assertRaises(InvalidUrnError): - CorpGroupUrn.create_from_string( - "urn:li:abc:(urn:li:dataPlatform:abc,def,prod)" - ) - - with self.assertRaises(InvalidUrnError): - CorpGroupUrn.create_from_string("urn:li:corpGroup:(part1,part2)") diff --git a/metadata-ingestion/tests/unit/urns/test_corpuser_urn.py b/metadata-ingestion/tests/unit/urns/test_corpuser_urn.py index 7a2a4f4ff4493c..e4a11b4f404c6e 100644 --- a/metadata-ingestion/tests/unit/urns/test_corpuser_urn.py +++ b/metadata-ingestion/tests/unit/urns/test_corpuser_urn.py @@ -3,7 +3,6 @@ import pytest from datahub.utilities.urns.corpuser_urn import CorpuserUrn -from datahub.utilities.urns.error import InvalidUrnError @pytest.mark.filterwarnings("ignore::DeprecationWarning") @@ -17,12 +16,3 @@ def test_parse_urn(self) -> None: assert str(corpuser_urn) == corpuser_urn_str assert corpuser_urn == CorpuserUrn("abc") assert corpuser_urn == CorpuserUrn.create_from_id("abc") - - def test_invalid_urn(self) -> None: - with self.assertRaises(InvalidUrnError): - CorpuserUrn.create_from_string( - "urn:li:abc:(urn:li:dataPlatform:abc,def,prod)" - ) - - with self.assertRaises(InvalidUrnError): - CorpuserUrn.create_from_string("urn:li:corpuser:(part1,part2)") diff --git a/metadata-ingestion/tests/unit/urns/test_data_flow_urn.py b/metadata-ingestion/tests/unit/urns/test_data_flow_urn.py index 524411121d418b..edb5563c5b22e3 100644 --- a/metadata-ingestion/tests/unit/urns/test_data_flow_urn.py +++ b/metadata-ingestion/tests/unit/urns/test_data_flow_urn.py @@ -3,7 +3,6 @@ import pytest from datahub.utilities.urns.data_flow_urn import DataFlowUrn -from datahub.utilities.urns.error import InvalidUrnError @pytest.mark.filterwarnings("ignore::DeprecationWarning") @@ -16,10 +15,3 @@ def test_parse_urn(self) -> None: assert data_flow_urn.get_env() == "prod" assert data_flow_urn.__str__() == "urn:li:dataFlow:(airflow,def,prod)" assert data_flow_urn == DataFlowUrn("airflow", "def", "prod") - - def test_invalid_urn(self) -> None: - with self.assertRaises(InvalidUrnError): - DataFlowUrn.create_from_string("urn:li:abc:(airflow,def,prod)") - - with self.assertRaises(InvalidUrnError): - DataFlowUrn.create_from_string("urn:li:dataFlow:(airflow,flow_id)") diff --git a/metadata-ingestion/tests/unit/urns/test_data_job_urn.py b/metadata-ingestion/tests/unit/urns/test_data_job_urn.py index bf039cd2a91f96..484e5a474c0cd2 100644 --- a/metadata-ingestion/tests/unit/urns/test_data_job_urn.py +++ b/metadata-ingestion/tests/unit/urns/test_data_job_urn.py @@ -4,7 +4,6 @@ from datahub.utilities.urns.data_flow_urn import DataFlowUrn from datahub.utilities.urns.data_job_urn import DataJobUrn -from datahub.utilities.urns.error import InvalidUrnError @pytest.mark.filterwarnings("ignore::DeprecationWarning") @@ -22,17 +21,3 @@ def test_parse_urn(self) -> None: assert data_job_urn == DataJobUrn( "urn:li:dataFlow:(airflow,flow_id,prod)", "job_id" ) - - def test_invalid_urn(self) -> None: - with self.assertRaises(InvalidUrnError): - DataJobUrn.create_from_string( - "urn:li:abc:(urn:li:dataFlow:(airflow,flow_id,prod),job_id)" - ) - - with self.assertRaises(InvalidUrnError): - DataJobUrn.create_from_string("urn:li:dataJob:(urn:li:user:abc,job_id)") - - with self.assertRaises(InvalidUrnError): - DataJobUrn.create_from_string( - "urn:li:dataJob:(urn:li:dataFlow:(airflow,flow_id,prod))" - ) diff --git a/metadata-ingestion/tests/unit/urns/test_data_process_instance_urn.py b/metadata-ingestion/tests/unit/urns/test_data_process_instance_urn.py index a86f8dd99416ff..f9087b19b13c32 100644 --- a/metadata-ingestion/tests/unit/urns/test_data_process_instance_urn.py +++ b/metadata-ingestion/tests/unit/urns/test_data_process_instance_urn.py @@ -3,7 +3,6 @@ import pytest from datahub.utilities.urns.data_process_instance_urn import DataProcessInstanceUrn -from datahub.utilities.urns.error import InvalidUrnError @pytest.mark.filterwarnings("ignore::DeprecationWarning") @@ -20,12 +19,3 @@ def test_parse_urn(self) -> None: assert dataprocessinstance_urn == DataProcessInstanceUrn("abc") assert dataprocessinstance_urn == DataProcessInstanceUrn.create_from_id("abc") assert "abc" == dataprocessinstance_urn.get_dataprocessinstance_id() - - def test_invalid_urn(self) -> None: - with self.assertRaises(InvalidUrnError): - DataProcessInstanceUrn.create_from_string("urn:li:abc:dataProcessInstance") - - with self.assertRaises(InvalidUrnError): - DataProcessInstanceUrn.create_from_string( - "urn:li:dataProcessInstance:(part1,part2)" - ) diff --git a/metadata-ingestion/tests/unit/urns/test_dataset_urn.py b/metadata-ingestion/tests/unit/urns/test_dataset_urn.py index 53065143a6ae4f..1be5cd59152009 100644 --- a/metadata-ingestion/tests/unit/urns/test_dataset_urn.py +++ b/metadata-ingestion/tests/unit/urns/test_dataset_urn.py @@ -4,7 +4,6 @@ from datahub.utilities.urns.data_platform_urn import DataPlatformUrn from datahub.utilities.urns.dataset_urn import DatasetUrn -from datahub.utilities.urns.error import InvalidUrnError @pytest.mark.filterwarnings("ignore::DeprecationWarning") @@ -20,22 +19,3 @@ def test_parse_urn(self) -> None: assert dataset_urn.get_env() == "PROD" assert dataset_urn.__str__() == dataset_urn_str assert dataset_urn == DatasetUrn("urn:li:dataPlatform:abc", "def", "prod") - - def test_invalid_urn(self) -> None: - with self.assertRaises(InvalidUrnError): - DatasetUrn.create_from_string( - "urn:li:abc:(urn:li:dataPlatform:abc,def,prod)" - ) - - with self.assertRaises(InvalidUrnError): - DatasetUrn.create_from_string( - "urn:li:dataset:(urn:li:user:abc,dataset,prod)" - ) - - with self.assertRaises(InvalidUrnError): - DatasetUrn.create_from_string("urn:li:dataset:(urn:li:user:abc,dataset)") - - with self.assertRaises(InvalidUrnError): - DatasetUrn.create_from_string( - "urn:li:dataset:(urn:li:user:abc,dataset,invalidEnv)" - ) diff --git a/metadata-ingestion/tests/unit/urns/test_domain_urn.py b/metadata-ingestion/tests/unit/urns/test_domain_urn.py index 843a5bf40f5c63..aa5050ce1c030e 100644 --- a/metadata-ingestion/tests/unit/urns/test_domain_urn.py +++ b/metadata-ingestion/tests/unit/urns/test_domain_urn.py @@ -3,7 +3,6 @@ import pytest from datahub.utilities.urns.domain_urn import DomainUrn -from datahub.utilities.urns.error import InvalidUrnError @pytest.mark.filterwarnings("ignore::DeprecationWarning") @@ -17,10 +16,3 @@ def test_parse_urn(self) -> None: assert str(domain_urn) == domain_urn_str assert domain_urn == DomainUrn("abc") assert domain_urn == DomainUrn.create_from_id("abc") - - def test_invalid_urn(self) -> None: - with self.assertRaises(InvalidUrnError): - DomainUrn.create_from_string("urn:li:abc:domain") - - with self.assertRaises(InvalidUrnError): - DomainUrn.create_from_string("urn:li:domain:(part1,part2)") diff --git a/metadata-ingestion/tests/unit/urns/test_notebook_urn.py b/metadata-ingestion/tests/unit/urns/test_notebook_urn.py index 3ec580f02142b7..6d4dd2ee6fa8c0 100644 --- a/metadata-ingestion/tests/unit/urns/test_notebook_urn.py +++ b/metadata-ingestion/tests/unit/urns/test_notebook_urn.py @@ -2,7 +2,6 @@ import pytest -from datahub.utilities.urns.error import InvalidUrnError from datahub.utilities.urns.notebook_urn import NotebookUrn @@ -16,12 +15,3 @@ def test_parse_urn(self) -> None: assert str(notebook_urn) == notebook_urn_str assert notebook_urn == NotebookUrn("querybook", "123") - - def test_invalid_urn(self) -> None: - with self.assertRaises(InvalidUrnError): - NotebookUrn.create_from_string( - "urn:li:abc:(urn:li:dataPlatform:abc,def,prod)" - ) - - with self.assertRaises(InvalidUrnError): - NotebookUrn.create_from_string("urn:li:notebook:(part1,part2,part3)") diff --git a/metadata-ingestion/tests/unit/urns/test_tag_urn.py b/metadata-ingestion/tests/unit/urns/test_tag_urn.py index fa3664bcc02180..5f4c9077e28294 100644 --- a/metadata-ingestion/tests/unit/urns/test_tag_urn.py +++ b/metadata-ingestion/tests/unit/urns/test_tag_urn.py @@ -2,7 +2,6 @@ import pytest -from datahub.utilities.urns.error import InvalidUrnError from datahub.utilities.urns.tag_urn import TagUrn @@ -17,10 +16,3 @@ def test_parse_urn(self) -> None: assert str(tag_urn) == tag_urn_str assert tag_urn == TagUrn("abc") assert tag_urn == TagUrn.create_from_id("abc") - - def test_invalid_urn(self) -> None: - with self.assertRaises(InvalidUrnError): - TagUrn.create_from_string("urn:li:abc:tag_id") - - with self.assertRaises(InvalidUrnError): - TagUrn.create_from_string("urn:li:tag:(part1,part2)") diff --git a/metadata-ingestion/tests/unit/urns/test_urn.py b/metadata-ingestion/tests/unit/urns/test_urn.py index 73badb3d1b4234..0c362473c0cf18 100644 --- a/metadata-ingestion/tests/unit/urns/test_urn.py +++ b/metadata-ingestion/tests/unit/urns/test_urn.py @@ -1,16 +1,17 @@ +import logging +import pathlib +from typing import List + import pytest -from datahub.metadata.urns import ( - CorpUserUrn, - DashboardUrn, - DataPlatformUrn, - DatasetUrn, - Urn, -) +from datahub.metadata.urns import CorpUserUrn, DatasetUrn, Urn from datahub.utilities.urns.error import InvalidUrnError pytestmark = pytest.mark.filterwarnings("ignore::DeprecationWarning") +_CURRENT_DIR = pathlib.Path(__file__).parent +logger = logging.getLogger(__name__) + def test_parse_urn() -> None: simple_urn_str = "urn:li:dataPlatform:abc" @@ -40,38 +41,12 @@ def test_url_encode_urn() -> None: ) -def test_invalid_urn() -> None: - with pytest.raises(InvalidUrnError): - Urn.from_string("urn:li:abc") - - with pytest.raises(InvalidUrnError): - Urn.from_string("urn:li:abc:") - - with pytest.raises(InvalidUrnError): - Urn.from_string("urn:li:abc:()") - - with pytest.raises(InvalidUrnError): - Urn.from_string("urn:li:abc:(abc,)") - - with pytest.raises(InvalidUrnError): - Urn.from_string("urn:li:corpuser:abc)") - - def test_urn_colon() -> None: - # Colon characters are valid in urns, and should not mess up parsing. - - urn = Urn.from_string( - "urn:li:dashboard:(looker,dashboards.thelook::customer_lookup)" - ) - assert isinstance(urn, DashboardUrn) - - assert DataPlatformUrn.from_string("urn:li:dataPlatform:abc:def") - assert DatasetUrn.from_string( - "urn:li:dataset:(urn:li:dataPlatform:abc:def,table_name,PROD)" - ) - assert Urn.from_string("urn:li:corpuser:foo:bar@example.com") + # There's a bunch of other, simpler tests for special characters in the valid_urns test. + # This test ensures that the type dispatch and fields work fine here. # I'm not sure why you'd ever want this, but technically it's a valid urn. + urn = Urn.from_string("urn:li:corpuser::") assert isinstance(urn, CorpUserUrn) assert urn.username == ":" @@ -85,9 +60,48 @@ def test_urn_coercion() -> None: assert urn == Urn.from_string(urn.urn()) -def test_urn_type_dispatch() -> None: +def test_urn_type_dispatch_1() -> None: urn = Urn.from_string("urn:li:dataset:(urn:li:dataPlatform:abc,def,PROD)") assert isinstance(urn, DatasetUrn) with pytest.raises(InvalidUrnError, match="Passed an urn of type corpuser"): DatasetUrn.from_string("urn:li:corpuser:foo") + + +def test_urn_type_dispatch_2() -> None: + urn = "urn:li:dataJob:(urn:li:dataFlow:(airflow,flow_id,prod),job_id)" + assert Urn.from_string(urn).urn() == urn + + with pytest.raises(InvalidUrnError, match="Passed an urn of type dataJob"): + CorpUserUrn.from_string(urn) + + +def _load_urns(file_name: pathlib.Path) -> List[str]: + urns = [ + line.strip() + for line in file_name.read_text().splitlines() + if line.strip() and not line.startswith("#") + ] + assert len(urns) > 0, f"No urns found in {file_name}" + return urns + + +def test_valid_urns() -> None: + valid_urns_file = _CURRENT_DIR / "valid_urns.txt" + valid_urns = _load_urns(valid_urns_file) + + for valid_urn in valid_urns: + logger.info(f"Testing valid URN: {valid_urn}") + parsed_urn = Urn.from_string(valid_urn) + assert parsed_urn.urn() == valid_urn + + +def test_invalid_urns() -> None: + invalid_urns_file = _CURRENT_DIR / "invalid_urns.txt" + invalid_urns = _load_urns(invalid_urns_file) + + # Test each invalid URN + for invalid_urn in invalid_urns: + with pytest.raises(InvalidUrnError): + logger.info(f"Testing invalid URN: {invalid_urn}") + Urn.from_string(invalid_urn) diff --git a/metadata-ingestion/tests/unit/urns/valid_urns.txt b/metadata-ingestion/tests/unit/urns/valid_urns.txt new file mode 100644 index 00000000000000..23205ec9a7235b --- /dev/null +++ b/metadata-ingestion/tests/unit/urns/valid_urns.txt @@ -0,0 +1,24 @@ +# Unknown entity types become generic urns +urn:li:abc:foo +urn:li:abc:(foo,bar) +urn:li:abc:(urn:li:dataPlatform:abc,def,prod) + +# A bunch of pretty normal urns +urn:li:corpuser:foo +urn:li:corpGroup:bar +urn:li:dataset:(urn:li:dataPlatform:abc,def/ghi,prod) +urn:li:dataFlow:(airflow,def,prod) +urn:li:dataJob:(urn:li:dataFlow:(airflow,flow_id,prod),job_id) +urn:li:tag:abc +urn:li:chart:(looker,chart_name) +urn:li:dashboard:(looker,dashboard_name) +urn:li:dataProcessInstance:abc +urn:li:domain:abc +urn:li:notebook:(querybook,123) + +# Urns with colons and other special characters +urn:li:tag:dbt:bar +urn:li:tag:: +urn:li:dashboard:(looker,dashboards.thelook::customer_lookup) +urn:li:dataPlatform:abc:def +urn:li:corpuser:foo:bar@example.com diff --git a/metadata-integration/java/datahub-event/build.gradle b/metadata-integration/java/datahub-event/build.gradle index 24e119c6229369..3dca2eb0a40c9f 100644 --- a/metadata-integration/java/datahub-event/build.gradle +++ b/metadata-integration/java/datahub-event/build.gradle @@ -18,6 +18,7 @@ dependencies { implementation externalDependency.jacksonDataBind runtimeOnly externalDependency.jna + compileOnly externalDependency.swaggerAnnotations compileOnly externalDependency.lombok annotationProcessor externalDependency.lombok // VisibleForTesting diff --git a/metadata-io/src/main/java/com/linkedin/metadata/service/UpdateGraphIndicesService.java b/metadata-io/src/main/java/com/linkedin/metadata/service/UpdateGraphIndicesService.java index ef7f681a81539d..efe073fc00dfdc 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/service/UpdateGraphIndicesService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/service/UpdateGraphIndicesService.java @@ -190,7 +190,10 @@ private void handleDeleteChangeEvent( urn.getEntityType(), event.getAspectName())); } - RecordTemplate aspect = event.getRecordTemplate(); + final RecordTemplate aspect = + event.getPreviousRecordTemplate() != null + ? event.getPreviousRecordTemplate() + : event.getRecordTemplate(); Boolean isDeletingKey = event.getAspectName().equals(entitySpec.getKeyAspectName()); if (!aspectSpec.isTimeseries()) { @@ -280,8 +283,8 @@ private Pair, HashMap>> getEdgesAndRelationshipTypes @Nonnull final RecordTemplate aspect, @Nonnull final MetadataChangeLog event, final boolean isNewAspectVersion) { - final List edgesToAdd = new ArrayList<>(); - final HashMap> urnToRelationshipTypesBeingAdded = new HashMap<>(); + final List edges = new ArrayList<>(); + final HashMap> urnToRelationshipTypes = new HashMap<>(); // we need to manually set schemaField <-> schemaField edges for fineGrainedLineage and // inputFields @@ -289,36 +292,28 @@ private Pair, HashMap>> getEdgesAndRelationshipTypes if (aspectSpec.getName().equals(Constants.UPSTREAM_LINEAGE_ASPECT_NAME)) { UpstreamLineage upstreamLineage = new UpstreamLineage(aspect.data()); updateFineGrainedEdgesAndRelationships( - urn, - upstreamLineage.getFineGrainedLineages(), - edgesToAdd, - urnToRelationshipTypesBeingAdded); + urn, upstreamLineage.getFineGrainedLineages(), edges, urnToRelationshipTypes); } else if (aspectSpec.getName().equals(Constants.INPUT_FIELDS_ASPECT_NAME)) { final InputFields inputFields = new InputFields(aspect.data()); - updateInputFieldEdgesAndRelationships( - urn, inputFields, edgesToAdd, urnToRelationshipTypesBeingAdded); + updateInputFieldEdgesAndRelationships(urn, inputFields, edges, urnToRelationshipTypes); } else if (aspectSpec.getName().equals(Constants.DATA_JOB_INPUT_OUTPUT_ASPECT_NAME)) { DataJobInputOutput dataJobInputOutput = new DataJobInputOutput(aspect.data()); updateFineGrainedEdgesAndRelationships( - urn, - dataJobInputOutput.getFineGrainedLineages(), - edgesToAdd, - urnToRelationshipTypesBeingAdded); + urn, dataJobInputOutput.getFineGrainedLineages(), edges, urnToRelationshipTypes); } Map> extractedFields = - FieldExtractor.extractFields(aspect, aspectSpec.getRelationshipFieldSpecs()); + FieldExtractor.extractFields(aspect, aspectSpec.getRelationshipFieldSpecs(), true); for (Map.Entry> entry : extractedFields.entrySet()) { - Set relationshipTypes = - urnToRelationshipTypesBeingAdded.getOrDefault(urn, new HashSet<>()); + Set relationshipTypes = urnToRelationshipTypes.getOrDefault(urn, new HashSet<>()); relationshipTypes.add(entry.getKey().getRelationshipName()); - urnToRelationshipTypesBeingAdded.put(urn, relationshipTypes); + urnToRelationshipTypes.put(urn, relationshipTypes); final List newEdges = GraphIndexUtils.extractGraphEdges(entry, aspect, urn, event, isNewAspectVersion); - edgesToAdd.addAll(newEdges); + edges.addAll(newEdges); } - return Pair.of(edgesToAdd, urnToRelationshipTypesBeingAdded); + return Pair.of(edges, urnToRelationshipTypes); } /** Process snapshot and update graph index */ @@ -433,7 +428,7 @@ private void deleteGraphData( @Nonnull final OperationContext opContext, @Nonnull final Urn urn, @Nonnull final AspectSpec aspectSpec, - @Nonnull final RecordTemplate aspect, + @Nullable final RecordTemplate aspect, @Nonnull final Boolean isKeyAspect, @Nonnull final MetadataChangeLog event) { if (isKeyAspect) { @@ -441,21 +436,28 @@ private void deleteGraphData( return; } - Pair, HashMap>> edgeAndRelationTypes = - getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, aspect, event, true); - - final HashMap> urnToRelationshipTypesBeingAdded = - edgeAndRelationTypes.getSecond(); - if (!urnToRelationshipTypesBeingAdded.isEmpty()) { - for (Map.Entry> entry : urnToRelationshipTypesBeingAdded.entrySet()) { - graphService.removeEdgesFromNode( - opContext, - entry.getKey(), - new ArrayList<>(entry.getValue()), - createRelationshipFilter( - new Filter().setOr(new ConjunctiveCriterionArray()), - RelationshipDirection.OUTGOING)); + if (aspect != null) { + Pair, HashMap>> edgeAndRelationTypes = + getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, aspect, event, true); + + final HashMap> urnToRelationshipTypesBeingRemoved = + edgeAndRelationTypes.getSecond(); + if (!urnToRelationshipTypesBeingRemoved.isEmpty()) { + for (Map.Entry> entry : urnToRelationshipTypesBeingRemoved.entrySet()) { + graphService.removeEdgesFromNode( + opContext, + entry.getKey(), + new ArrayList<>(entry.getValue()), + createRelationshipFilter( + new Filter().setOr(new ConjunctiveCriterionArray()), + RelationshipDirection.OUTGOING)); + } } + } else { + log.warn( + "Insufficient information to perform graph delete. Missing deleted aspect {} for entity {}", + aspectSpec.getName(), + urn); } } } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/service/UpdateGraphIndicesServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/service/UpdateGraphIndicesServiceTest.java index 03e381a9059da6..dd02b1fdc9d78d 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/service/UpdateGraphIndicesServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/service/UpdateGraphIndicesServiceTest.java @@ -1,6 +1,11 @@ package com.linkedin.metadata.service; +import static com.linkedin.metadata.Constants.CONTAINER_ENTITY_NAME; +import static com.linkedin.metadata.search.utils.QueryUtils.createRelationshipFilter; +import static com.linkedin.metadata.utils.CriterionUtils.buildCriterion; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; @@ -8,9 +13,11 @@ import static org.mockito.Mockito.verifyNoInteractions; import static org.testng.Assert.assertEquals; +import com.google.common.collect.ImmutableList; import com.linkedin.common.Status; import com.linkedin.common.urn.Urn; import com.linkedin.common.urn.UrnUtils; +import com.linkedin.container.Container; import com.linkedin.dataset.DatasetProperties; import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.Constants; @@ -21,6 +28,14 @@ import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService; import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.models.registry.LineageRegistry; +import com.linkedin.metadata.query.filter.Condition; +import com.linkedin.metadata.query.filter.ConjunctiveCriterion; +import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray; +import com.linkedin.metadata.query.filter.Criterion; +import com.linkedin.metadata.query.filter.CriterionArray; +import com.linkedin.metadata.query.filter.Filter; +import com.linkedin.metadata.query.filter.RelationshipDirection; +import com.linkedin.metadata.query.filter.RelationshipFilter; import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder; import com.linkedin.metadata.search.elasticsearch.update.ESBulkProcessor; import com.linkedin.metadata.utils.GenericRecordUtils; @@ -29,6 +44,8 @@ import com.linkedin.mxe.MetadataChangeLog; import io.datahubproject.metadata.context.OperationContext; import io.datahubproject.test.metadata.context.TestOperationContexts; +import java.util.List; +import javax.annotation.Nonnull; import org.mockito.ArgumentCaptor; import org.opensearch.index.query.QueryBuilder; import org.opensearch.script.Script; @@ -180,4 +197,109 @@ public void testStatusNoOpEvent() { verifyNoInteractions(mockWriteDAO); } + + @Test + public void testMissingAspectGraphDelete() { + // Test deleting a null aspect + test.handleChangeEvent( + TEST_OP_CONTEXT, + new MetadataChangeLog() + .setChangeType(ChangeType.DELETE) + .setEntityType(TEST_URN.getEntityType()) + .setEntityUrn(TEST_URN) + .setAspectName(Constants.CONTAINER_ASPECT_NAME)); + + // For missing aspects, verify no writes + verifyNoInteractions(mockWriteDAO); + } + + @Test + public void testNodeGraphDelete() { + Urn containerUrn = UrnUtils.getUrn("urn:li:container:foo"); + + // Test deleting container entity + test.handleChangeEvent( + TEST_OP_CONTEXT, + new MetadataChangeLog() + .setChangeType(ChangeType.DELETE) + .setEntityType(CONTAINER_ENTITY_NAME) + .setEntityUrn(containerUrn) + .setAspectName(Constants.CONTAINER_KEY_ASPECT_NAME)); + + // Delete all outgoing edges of this entity + verify(mockWriteDAO, times(1)) + .deleteByQuery( + eq(TEST_OP_CONTEXT), + nullable(String.class), + eq(createUrnFilter(containerUrn)), + nullable(String.class), + eq(new Filter().setOr(new ConjunctiveCriterionArray())), + eq(List.of()), + eq(new RelationshipFilter().setDirection(RelationshipDirection.OUTGOING))); + + // Delete all incoming edges of this entity + verify(mockWriteDAO, times(1)) + .deleteByQuery( + eq(TEST_OP_CONTEXT), + nullable(String.class), + eq(createUrnFilter(containerUrn)), + nullable(String.class), + eq(new Filter().setOr(new ConjunctiveCriterionArray())), + eq(List.of()), + eq(new RelationshipFilter().setDirection(RelationshipDirection.INCOMING))); + + // Delete all edges where this entity is a lifecycle owner + verify(mockWriteDAO, times(1)) + .deleteByQuery( + eq(TEST_OP_CONTEXT), + nullable(String.class), + eq(new Filter().setOr(new ConjunctiveCriterionArray())), + nullable(String.class), + eq(new Filter().setOr(new ConjunctiveCriterionArray())), + eq(List.of()), + eq(new RelationshipFilter().setDirection(RelationshipDirection.INCOMING)), + eq(containerUrn.toString())); + } + + @Test + public void testContainerDelete() { + Urn containerUrn = UrnUtils.getUrn("urn:li:container:foo"); + + // Test deleting a container aspect + test.handleChangeEvent( + TEST_OP_CONTEXT, + new MetadataChangeLog() + .setChangeType(ChangeType.DELETE) + .setEntityType(TEST_URN.getEntityType()) + .setEntityUrn(TEST_URN) + .setAspectName(Constants.CONTAINER_ASPECT_NAME) + .setPreviousAspectValue( + GenericRecordUtils.serializeAspect(new Container().setContainer(containerUrn)))); + + // For container aspects, verify that only edges are removed in both cases + verify(mockWriteDAO, times(1)) + .deleteByQuery( + eq(TEST_OP_CONTEXT), + nullable(String.class), + eq(createUrnFilter(TEST_URN)), + nullable(String.class), + eq(new Filter().setOr(new ConjunctiveCriterionArray())), + eq(List.of("IsPartOf")), + eq( + createRelationshipFilter( + new Filter().setOr(new ConjunctiveCriterionArray()), + RelationshipDirection.OUTGOING))); + } + + private static Filter createUrnFilter(@Nonnull final Urn urn) { + Filter filter = new Filter(); + CriterionArray criterionArray = new CriterionArray(); + Criterion criterion = buildCriterion("urn", Condition.EQUAL, urn.toString()); + criterionArray.add(criterion); + filter.setOr( + new ConjunctiveCriterionArray( + ImmutableList.of(new ConjunctiveCriterion().setAnd(criterionArray)))); + + return filter; + } } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/service/UpdateIndicesServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/service/UpdateIndicesServiceTest.java new file mode 100644 index 00000000000000..43f8cc0ef191d6 --- /dev/null +++ b/metadata-io/src/test/java/com/linkedin/metadata/service/UpdateIndicesServiceTest.java @@ -0,0 +1,84 @@ +package com.linkedin.metadata.service; + +import static com.linkedin.metadata.Constants.CONTAINER_ASPECT_NAME; +import static com.linkedin.metadata.Constants.DATASET_ENTITY_NAME; +import static org.mockito.ArgumentMatchers.nullable; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.verify; + +import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; +import com.linkedin.data.template.RecordTemplate; +import com.linkedin.events.metadata.ChangeType; +import com.linkedin.metadata.models.AspectSpec; +import com.linkedin.metadata.models.EntitySpec; +import com.linkedin.metadata.search.EntitySearchService; +import com.linkedin.metadata.search.elasticsearch.indexbuilder.EntityIndexBuilders; +import com.linkedin.metadata.search.transformer.SearchDocumentTransformer; +import com.linkedin.metadata.systemmetadata.SystemMetadataService; +import com.linkedin.metadata.timeseries.TimeseriesAspectService; +import com.linkedin.metadata.utils.SystemMetadataUtils; +import com.linkedin.mxe.MetadataChangeLog; +import io.datahubproject.metadata.context.OperationContext; +import io.datahubproject.test.metadata.context.TestOperationContexts; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class UpdateIndicesServiceTest { + + @Mock private UpdateGraphIndicesService updateGraphIndicesService; + @Mock private EntitySearchService entitySearchService; + @Mock private TimeseriesAspectService timeseriesAspectService; + @Mock private SystemMetadataService systemMetadataService; + @Mock private SearchDocumentTransformer searchDocumentTransformer; + @Mock private EntityIndexBuilders entityIndexBuilders; + + private OperationContext operationContext; + private UpdateIndicesService updateIndicesService; + + @BeforeMethod + public void setup() { + MockitoAnnotations.openMocks(this); + operationContext = TestOperationContexts.systemContextNoSearchAuthorization(); + updateIndicesService = + new UpdateIndicesService( + updateGraphIndicesService, + entitySearchService, + timeseriesAspectService, + systemMetadataService, + searchDocumentTransformer, + entityIndexBuilders, + "MD5"); + } + + @Test + public void testContainerHandleDeleteEvent() throws Exception { + Urn urn = UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD)"); + EntitySpec entitySpec = operationContext.getEntityRegistry().getEntitySpec(DATASET_ENTITY_NAME); + AspectSpec aspectSpec = entitySpec.getAspectSpec(CONTAINER_ASPECT_NAME); + + // Create test data + MetadataChangeLog event = new MetadataChangeLog(); + event.setChangeType(ChangeType.DELETE); + event.setEntityUrn(urn); + event.setAspectName(CONTAINER_ASPECT_NAME); + event.setEntityType(urn.getEntityType()); + event.setSystemMetadata(SystemMetadataUtils.createDefaultSystemMetadata()); + + // Execute Delete + updateIndicesService.handleChangeEvent(operationContext, event); + + // Verify + verify(systemMetadataService).deleteAspect(urn.toString(), CONTAINER_ASPECT_NAME); + verify(searchDocumentTransformer) + .transformAspect( + eq(operationContext), + eq(urn), + nullable(RecordTemplate.class), + eq(aspectSpec), + eq(true)); + verify(updateGraphIndicesService).handleChangeEvent(operationContext, event); + } +} diff --git a/metadata-models/build.gradle b/metadata-models/build.gradle index e9379163ecaecc..2d0b433d69013e 100644 --- a/metadata-models/build.gradle +++ b/metadata-models/build.gradle @@ -9,12 +9,15 @@ plugins { apply from: '../gradle/coverage/java-coverage.gradle' dependencies { - api spec.product.pegasus.data - constraints { - implementation('org.apache.commons:commons-text:1.10.0') { - because 'Vulnerability Issue' - } + constraints { + implementation('org.apache.commons:commons-text:1.10.0') { + because 'Vulnerability Issue' } + } + + api(spec.product.pegasus.data) { + exclude group: 'javax.servlet', module: 'javax.servlet-api' + } api project(':li-utils') api project(path: ':li-utils', configuration: "dataTemplate") dataModel project(':li-utils') @@ -26,7 +29,7 @@ dependencies { compileOnly externalDependency.lombok annotationProcessor externalDependency.lombok - api externalDependency.swaggerAnnotations + compileOnly externalDependency.swaggerAnnotations compileOnly externalDependency.jacksonCore compileOnly externalDependency.jacksonDataBind diff --git a/metadata-service/openapi-servlet/models/build.gradle b/metadata-service/openapi-servlet/models/build.gradle index e4100b2d094e04..d75e656e5ecd6c 100644 --- a/metadata-service/openapi-servlet/models/build.gradle +++ b/metadata-service/openapi-servlet/models/build.gradle @@ -10,6 +10,7 @@ dependencies { implementation externalDependency.jacksonDataBind implementation externalDependency.httpClient + compileOnly externalDependency.swaggerAnnotations compileOnly externalDependency.lombok annotationProcessor externalDependency.lombok