diff --git a/datahub-web-react/src/app/entity/shared/tabs/Dataset/Schema/__tests__/filterSchemaRows.test.ts b/datahub-web-react/src/app/entity/shared/tabs/Dataset/Schema/__tests__/filterSchemaRows.test.ts index 27c0af87fc8332..87fca3b898c838 100644 --- a/datahub-web-react/src/app/entity/shared/tabs/Dataset/Schema/__tests__/filterSchemaRows.test.ts +++ b/datahub-web-react/src/app/entity/shared/tabs/Dataset/Schema/__tests__/filterSchemaRows.test.ts @@ -235,4 +235,100 @@ describe('filterSchemaRows', () => { expect(filteredRows).toMatchObject([{ fieldPath: 'shipment' }]); expect(expandedRowsFromFilter).toMatchObject(new Set()); }); + + it('should properly filter schema rows based on business attribute properties description', () => { + const rowsWithSchemaFieldEntity = [ + { + fieldPath: 'customer', + schemaFieldEntity: { + businessAttributes: { + businessAttribute: { + businessAttribute: { properties: { description: 'customer description' } }, + }, + }, + }, + }, + { + fieldPath: 'testing', + schemaFieldEntity: { + businessAttributes: { + businessAttribute: { + businessAttribute: { properties: { description: 'testing description' } }, + }, + }, + }, + }, + { + fieldPath: 'shipment', + schemaFieldEntity: { + businessAttributes: { + businessAttribute: { + businessAttribute: { properties: { description: 'shipment description' } }, + }, + }, + }, + }, + ] as SchemaField[]; + const filterText = 'testing description'; + const editableSchemaMetadata = { editableSchemaFieldInfo: [] }; + const { filteredRows, expandedRowsFromFilter } = filterSchemaRows( + rowsWithSchemaFieldEntity, + editableSchemaMetadata, + filterText, + testEntityRegistry, + ); + + expect(filteredRows).toMatchObject([{ fieldPath: 'testing' }]); + expect(expandedRowsFromFilter).toMatchObject(new Set()); + }); + + it('should properly filter schema rows based on business attribute properties tags', () => { + const rowsWithSchemaFieldEntity = [ + { + fieldPath: 'customer', + schemaFieldEntity: { + businessAttributes: { + businessAttribute: { + businessAttribute: { properties: { tags: { tags: [{ tag: sampleTag }] } } }, + }, + }, + }, + }, + { + fieldPath: 'testing', + schemaFieldEntity: { + businessAttributes: { + businessAttribute: { + businessAttribute: { + properties: { tags: { tags: [{ tag: { properties: { name: 'otherTag' } } }] } }, + }, + }, + }, + }, + }, + { + fieldPath: 'shipment', + schemaFieldEntity: { + businessAttributes: { + businessAttribute: { + businessAttribute: { + properties: { tags: { tags: [{ tag: { properties: { name: 'anotherTag' } } }] } }, + }, + }, + }, + }, + }, + ] as SchemaField[]; + const filterText = sampleTag.properties.name; + const editableSchemaMetadata = { editableSchemaFieldInfo: [] }; + const { filteredRows, expandedRowsFromFilter } = filterSchemaRows( + rowsWithSchemaFieldEntity, + editableSchemaMetadata, + filterText, + testEntityRegistry, + ); + + expect(filteredRows).toMatchObject([{ fieldPath: 'customer' }]); + expect(expandedRowsFromFilter).toMatchObject(new Set()); + }); }); diff --git a/datahub-web-react/src/app/entity/shared/tabs/Dataset/Schema/utils/filterSchemaRows.ts b/datahub-web-react/src/app/entity/shared/tabs/Dataset/Schema/utils/filterSchemaRows.ts index 96505e1bee785b..53b76d53f886af 100644 --- a/datahub-web-react/src/app/entity/shared/tabs/Dataset/Schema/utils/filterSchemaRows.ts +++ b/datahub-web-react/src/app/entity/shared/tabs/Dataset/Schema/utils/filterSchemaRows.ts @@ -16,6 +16,25 @@ function matchesTagsOrTermsOrDescription(field: SchemaField, filterText: string, ); } 
+function matchesBusinessAttributesProperties(field: SchemaField, filterText: string, entityRegistry: EntityRegistry) { + if (!field.schemaFieldEntity?.businessAttributes) return false; + const businessAttributeProperties = + field.schemaFieldEntity?.businessAttributes?.businessAttribute?.businessAttribute?.properties; + return ( + businessAttributeProperties?.description?.toLocaleLowerCase().includes(filterText) || + businessAttributeProperties?.name?.toLocaleLowerCase().includes(filterText) || + businessAttributeProperties?.glossaryTerms?.terms?.find((termAssociation) => + entityRegistry + .getDisplayName(EntityType.GlossaryTerm, termAssociation.term) + .toLocaleLowerCase() + .includes(filterText), + ) || + businessAttributeProperties?.tags?.tags?.find((tagAssociation) => + entityRegistry.getDisplayName(EntityType.Tag, tagAssociation.tag).toLocaleLowerCase().includes(filterText), + ) + ); +} + // returns list of fieldPaths for fields that have Terms or Tags or Descriptions matching the filterText function getFilteredFieldPathsByMetadata(editableSchemaMetadata: any, entityRegistry, filterText) { return ( @@ -56,7 +75,8 @@ export function filterSchemaRows( if ( matchesFieldName(row.fieldPath, formattedFilterText) || matchesEditableTagsOrTermsOrDescription(row, filteredFieldPathsByEditableMetadata) || - matchesTagsOrTermsOrDescription(row, formattedFilterText, entityRegistry) // non-editable tags, terms and description + matchesTagsOrTermsOrDescription(row, formattedFilterText, entityRegistry) || // non-editable tags, terms and description + matchesBusinessAttributesProperties(row, formattedFilterText, entityRegistry) ) { finalFieldPaths.add(row.fieldPath); } @@ -65,7 +85,8 @@ export function filterSchemaRows( if ( matchesFieldName(fieldName, formattedFilterText) || matchesEditableTagsOrTermsOrDescription(row, filteredFieldPathsByEditableMetadata) || - matchesTagsOrTermsOrDescription(row, formattedFilterText, entityRegistry) // non-editable tags, terms and description + matchesTagsOrTermsOrDescription(row, formattedFilterText, entityRegistry) || // non-editable tags, terms and description + matchesBusinessAttributesProperties(row, formattedFilterText, entityRegistry) ) { // if we match specifically on this field (not just its parent), add and expand all parents splitFieldPath.reduce((previous, current) => { diff --git a/docs-website/adoptionStoriesIndexes.json b/docs-website/adoptionStoriesIndexes.json index 1177d74dc6df95..15cb770697c1c9 100644 --- a/docs-website/adoptionStoriesIndexes.json +++ b/docs-website/adoptionStoriesIndexes.json @@ -14,8 +14,6 @@ { "name": "Visa", "slug": "visa", - "imageUrl": "/img/logos/companies/visa.png", - "imageSize": "large", "link": "https://blog.datahubproject.io/how-visa-uses-datahub-to-scale-data-governance-cace052d61c5", "linkType": "blog", "tagline": "How Visa uses DataHub to scale data governance", @@ -374,4 +372,4 @@ "category": "And More" } ] -} \ No newline at end of file +} diff --git a/docs-website/src/pages/_components/CaseStudy/caseStudyContent.js b/docs-website/src/pages/_components/CaseStudy/caseStudyContent.js index 47c379027da814..0346acc37ed4a1 100644 --- a/docs-website/src/pages/_components/CaseStudy/caseStudyContent.js +++ b/docs-website/src/pages/_components/CaseStudy/caseStudyContent.js @@ -9,16 +9,16 @@ const caseStudyData = [ image: "https://datahubproject.io/img/logos/companies/netflix.png", link: "https://datahubproject.io/adoption-stories/#netflix", }, - { - title: "Scaling Data Governance", - description: - "How VISA Uses DataHub 
to Scale Data Governance.",
-    tag: "Finance",
-    backgroundImage:
-      "https://miro.medium.com/v2/resize:fit:2000/format:webp/1*mtC4j8J-jumJKWg8RuR6xQ@2x.png",
-    image: "https://datahubproject.io/img/logos/companies/visa.png",
-    link: "https://datahubproject.io/adoption-stories/#visa",
-  },
+  // {
+  //   title: "Scaling Data Governance",
+  //   description:
+  //     "How VISA Uses DataHub to Scale Data Governance.",
+  //   tag: "Finance",
+  //   backgroundImage:
+  //     "https://miro.medium.com/v2/resize:fit:2000/format:webp/1*mtC4j8J-jumJKWg8RuR6xQ@2x.png",
+  //   image: "https://datahubproject.io/img/logos/companies/visa.png",
+  //   link: "https://datahubproject.io/adoption-stories/#visa",
+  // },
   {
     title: "Ensuring Data Reliability",
     description:
diff --git a/docs-website/src/pages/index.js b/docs-website/src/pages/index.js
index d538831ca3dca1..d74e80f381097a 100644
--- a/docs-website/src/pages/index.js
+++ b/docs-website/src/pages/index.js
@@ -21,7 +21,7 @@ import CloseButton from "@ant-design/icons/CloseCircleFilled";
 const companyIndexes = require("../../adoptionStoriesIndexes.json");
 const companies = companyIndexes.companies;
-const keyCompanySlugs = ["netflix", "visa", "pinterest", "airtel", "optum"];
+const keyCompanySlugs = ["netflix", "pinterest", "airtel", "notion", "optum"];
 const keyCompanies = keyCompanySlugs
   .map((slug) => companies.find((co) => co.slug === slug))
   .filter((isDefined) => isDefined);
diff --git a/docs-website/static/img/logos/companies/visa.png b/docs-website/static/img/logos/companies/visa.png
deleted file mode 100644
index 20af2c5cd8e26f..00000000000000
Binary files a/docs-website/static/img/logos/companies/visa.png and /dev/null differ
diff --git a/docs/how/search.md b/docs/how/search.md
index 5c1ba266ee2ae5..4df5e7c1984d59 100644
--- a/docs/how/search.md
+++ b/docs/how/search.md
@@ -398,6 +398,32 @@ queryConfigurations:
       boost_mode: multiply
```
+##### Example 4: Entity Ranking
+
+Alter the relative ranking of entity types. For example, you may want dashboards to appear above charts.
+This can be done with the following function score, which uses a prefix match on the entity type encoded in the URN.
+The weight may need to be adjusted for your data and the entity types involved, since multiple field matches
+can often shift weight towards one entity type over another.
+ +```yaml +queryConfigurations: + - queryRegex: .* + + simpleQuery: true + prefixMatchQuery: true + exactMatchQuery: true + + functionScore: + functions: + - filter: + prefix: + urn: + value: 'urn:li:dashboard:' + weight: 1.5 + score_mode: multiply + boost_mode: multiply +``` + ### Search Autocomplete Configuration Similar to the options provided in the previous section for search configuration, there are autocomplete specific options diff --git a/metadata-ingestion-modules/dagster-plugin/examples/advanced_ops_jobs.py b/metadata-ingestion-modules/dagster-plugin/examples/advanced_ops_jobs.py index d4cc65297e42ca..7b7616b1ec11de 100644 --- a/metadata-ingestion-modules/dagster-plugin/examples/advanced_ops_jobs.py +++ b/metadata-ingestion-modules/dagster-plugin/examples/advanced_ops_jobs.py @@ -32,12 +32,12 @@ def extract(): ins={ "data": In( dagster_type=PythonObjectDagsterType(list), - metadata={"datahub.inputs": [DatasetUrn("snowflake", "tableA").urn]}, + metadata={"datahub.inputs": [DatasetUrn("snowflake", "tableA").urn()]}, ) }, out={ "result": Out( - metadata={"datahub.outputs": [DatasetUrn("snowflake", "tableB").urn]} + metadata={"datahub.outputs": [DatasetUrn("snowflake", "tableB").urn()]} ) }, ) @@ -101,6 +101,5 @@ def asset_lineage_extractor( dagster_url="http://localhost:3000", asset_lineage_extractor=asset_lineage_extractor, ) - datahub_sensor = make_datahub_sensor(config=config) defs = Definitions(jobs=[do_stuff], sensors=[datahub_sensor]) diff --git a/metadata-ingestion-modules/dagster-plugin/examples/assets_job.py b/metadata-ingestion-modules/dagster-plugin/examples/assets_job.py index 57634ab345a5e3..1ed3f2f915061b 100644 --- a/metadata-ingestion-modules/dagster-plugin/examples/assets_job.py +++ b/metadata-ingestion-modules/dagster-plugin/examples/assets_job.py @@ -7,6 +7,7 @@ define_asset_job, multi_asset, ) +from datahub.ingestion.graph.config import DatahubClientConfig from datahub.utilities.urns.dataset_urn import DatasetUrn from datahub_dagster_plugin.sensors.datahub_sensors import ( @@ -18,7 +19,7 @@ @multi_asset( outs={ "extract": AssetOut( - metadata={"datahub.outputs": [DatasetUrn("snowflake", "tableD").urn]} + metadata={"datahub.outputs": [DatasetUrn("snowflake", "tableD").urn()]} ), } ) @@ -47,13 +48,9 @@ def transform(extract): assets_job = define_asset_job(name="assets_job") -config = DatahubDagsterSourceConfig.parse_obj( - { - "rest_sink_config": { - "server": "http://localhost:8080", - }, - "dagster_url": "http://localhost:3000", - } +config = DatahubDagsterSourceConfig( + datahub_client_config=DatahubClientConfig(server="http://localhost:8080"), + dagster_url="http://localhost:3000", ) datahub_sensor = make_datahub_sensor(config=config) diff --git a/metadata-ingestion-modules/dagster-plugin/examples/ops_job.py b/metadata-ingestion-modules/dagster-plugin/examples/ops_job.py index d743e19a235d56..a17fc89e6922df 100644 --- a/metadata-ingestion-modules/dagster-plugin/examples/ops_job.py +++ b/metadata-ingestion-modules/dagster-plugin/examples/ops_job.py @@ -1,4 +1,5 @@ from dagster import Definitions, In, Out, PythonObjectDagsterType, job, op +from datahub.ingestion.graph.config import DatahubClientConfig from datahub.utilities.urns.dataset_urn import DatasetUrn from datahub_dagster_plugin.sensors.datahub_sensors import ( @@ -17,12 +18,12 @@ def extract(): ins={ "data": In( dagster_type=PythonObjectDagsterType(list), - metadata={"datahub.inputs": [DatasetUrn("snowflake", "tableA").urn]}, + metadata={"datahub.inputs": [DatasetUrn("snowflake", "tableA").urn()]}, ) }, 
out={
        "result": Out(
-            metadata={"datahub.outputs": [DatasetUrn("snowflake", "tableB").urn]}
+            metadata={"datahub.outputs": [DatasetUrn("snowflake", "tableB").urn()]}
        )
    },
)
@@ -38,13 +39,9 @@ def do_stuff():
     transform(extract())
-config = DatahubDagsterSourceConfig.parse_obj(
-    {
-        "rest_sink_config": {
-            "server": "http://localhost:8080",
-        },
-        "dagster_url": "http://localhost:3000",
-    }
+config = DatahubDagsterSourceConfig(
+    datahub_client_config=DatahubClientConfig(server="http://localhost:8080"),
+    dagster_url="http://localhost:3000",
 )
 datahub_sensor = make_datahub_sensor(config=config)
diff --git a/metadata-ingestion/docs/transformer/dataset_transformer.md b/metadata-ingestion/docs/transformer/dataset_transformer.md
index 03a224bcf7da47..d48c6d2c1ab5b4 100644
--- a/metadata-ingestion/docs/transformer/dataset_transformer.md
+++ b/metadata-ingestion/docs/transformer/dataset_transformer.md
@@ -197,9 +197,12 @@ transformers:
| `ownership_type` | | string | "DATAOWNER" | ownership type of the owners (either as enum or ownership type urn) |
| `replace_existing` | | boolean | `false` | Whether to remove owners from entity sent by ingestion source. |
| `semantics` | | enum | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS. |
+| `is_container` | | bool | `false` | Whether to also apply the owners to the dataset's container. If true, ownership will be attached to both the dataset and its container. |
let’s suppose we’d like to append a series of users who we know to own a different dataset from a data source but aren't detected during normal ingestion. To do so, we can use the `pattern_add_dataset_ownership` module that’s included in the ingestion framework. This will match the pattern to `urn` of the dataset and assign the respective owners.
+If the `is_container` field is set to true, the module will not only attach ownership to the matching datasets but will also find the containers associated with those datasets and attach ownership to them. This means that both the datasets and their containers will be associated with the specified owners.
+
The config, which we’d append to our ingestion recipe YAML, would look like this:
```yaml
@@ -251,6 +254,35 @@ The config, which we’d append to our ingestion recipe YAML, would look like th
            ".*example2.*": ["urn:li:corpuser:username2"]
        ownership_type: "PRODUCER"
  ```
+- Add owner to dataset and its containers
+  ```yaml
+  transformers:
+    - type: "pattern_add_dataset_ownership"
+      config:
+        is_container: true
+        replace_existing: true # false is default behaviour
+        semantics: PATCH / OVERWRITE # Based on user
+        owner_pattern:
+          rules:
+            ".*example1.*": ["urn:li:corpuser:username1"]
+            ".*example2.*": ["urn:li:corpuser:username2"]
+        ownership_type: "PRODUCER"
+  ```
+⚠️ Warning:
+When two datasets in the same container have different owners, all of those owners will be added to the shared container.
+
+For example:
+```yaml
+transformers:
+  - type: "pattern_add_dataset_ownership"
+    config:
+      is_container: true
+      owner_pattern:
+        rules:
+          ".*example1.*": ["urn:li:corpuser:username1"]
+          ".*example2.*": ["urn:li:corpuser:username2"]
+```
+If example1 and example2 are in the same container, then both urn:li:corpuser:username1 and urn:li:corpuser:username2 will be added as owners of that container.
## Simple Remove Dataset ownership
If we wanted to clear existing owners sent by ingestion source we can use the `simple_remove_dataset_ownership` transformer which removes all owners sent by the ingestion source.
@@ -1074,10 +1106,13 @@ transformers:
| `domain_pattern` | ✅ | map[regx, list[union[urn, str]] | | dataset urn with regular expression and list of simple domain name or domain urn need to be apply on matching dataset urn. |
| `replace_existing` | | boolean | `false` | Whether to remove domains from entity sent by ingestion source. |
| `semantics` | | enum | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS. |
+| `is_container` | | bool | `false` | Whether to also apply the domains to the dataset's container. If true, domains will be attached to both the dataset and its container. |
Let’s suppose we’d like to append a series of domain to specific datasets. To do so, we can use the pattern_add_dataset_domain transformer that’s included in the ingestion framework. This will match the regex pattern to urn of the dataset and assign the respective domain urns given in the array.
+If the `is_container` field is set to true, the module will not only attach the domains to the matching datasets but will also find the containers associated with those datasets and attach the domains to them. This means that both the datasets and their containers will be associated with the specified domains.
+
The config, which we’d append to our ingestion recipe YAML, would look like this:
Here we can set domain list to either urn (i.e. urn:li:domain:hr) or simple domain name (i.e. hr)
in both of the cases domain should be provisioned on DataHub GMS
@@ -1129,6 +1164,33 @@ in both of the cases domain should be provisioned on DataHub GMS
            'urn:li:dataset:\(urn:li:dataPlatform:postgres,postgres\.public\.t.*': ["urn:li:domain:finance"]
  ```
+- Add domains to dataset and its containers
+  ```yaml
+  transformers:
+    - type: "pattern_add_dataset_domain"
+      config:
+        is_container: true
+        semantics: PATCH / OVERWRITE # Based on user
+        domain_pattern:
+          rules:
+            'urn:li:dataset:\(urn:li:dataPlatform:postgres,postgres\.public\.n.*': ["hr"]
+            'urn:li:dataset:\(urn:li:dataPlatform:postgres,postgres\.public\.t.*': ["urn:li:domain:finance"]
+  ```
+⚠️ Warning:
+When two datasets in the same container have different domains, all of those domains will be added to the shared container.
+
+For example:
+```yaml
+transformers:
+  - type: "pattern_add_dataset_domain"
+    config:
+      is_container: true
+      domain_pattern:
+        rules:
+          ".*example1.*": ["hr"]
+          ".*example2.*": ["urn:li:domain:finance"]
+```
+If example1 and example2 are in the same container, then both the hr and finance domains will be added to that container.
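Both the ownership and the domain variants of the `is_container` option resolve a dataset's containers from its `browsePathsV2` aspect: every `urn:li:container:...` entry on the browse path receives the pattern-matched owners or domains (see the `handle_end_of_stream` changes to the transformers later in this diff). Below is a minimal sketch of that lookup against a connected graph; `containers_for_dataset` is an illustrative helper name and not part of the transformer API.

```python
from typing import List

from datahub.ingestion.graph.client import DataHubGraph
from datahub.metadata.schema_classes import BrowsePathsV2Class


def containers_for_dataset(graph: DataHubGraph, dataset_urn: str) -> List[str]:
    # Sketch: fetch the dataset's browsePathsV2 aspect and keep only the container
    # entries; these are the URNs that would receive the owners/domains.
    browse_paths = graph.get_aspect(dataset_urn, BrowsePathsV2Class)
    if not browse_paths:
        return []
    return [
        entry.urn
        for entry in browse_paths.path
        if entry.urn and entry.urn.startswith("urn:li:container:")
    ]
```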
## Domain Mapping Based on Tags diff --git a/metadata-ingestion/src/datahub/ingestion/source/elastic_search.py b/metadata-ingestion/src/datahub/ingestion/source/elastic_search.py index 8bda5db9a379a0..aa5913f5dc66b1 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/elastic_search.py +++ b/metadata-ingestion/src/datahub/ingestion/source/elastic_search.py @@ -138,31 +138,31 @@ def _get_schema_fields( for columnName, column in elastic_schema_dict.items(): elastic_type: Optional[str] = column.get("type") nested_props: Optional[Dict[str, Any]] = column.get(PROPERTIES) - if elastic_type is not None: - self._prefix_name_stack.append(f"[type={elastic_type}].{columnName}") - schema_field_data_type = self.get_column_type(elastic_type) + if nested_props: + self._prefix_name_stack.append(f"[type={PROPERTIES}].{columnName}") schema_field = SchemaField( fieldPath=self._get_cur_field_path(), - nativeDataType=elastic_type, - type=schema_field_data_type, + nativeDataType=PROPERTIES, + type=SchemaFieldDataTypeClass(RecordTypeClass()), description=None, nullable=True, recursive=False, ) yield schema_field + yield from self._get_schema_fields(nested_props) self._prefix_name_stack.pop() - elif nested_props: - self._prefix_name_stack.append(f"[type={PROPERTIES}].{columnName}") + elif elastic_type is not None: + self._prefix_name_stack.append(f"[type={elastic_type}].{columnName}") + schema_field_data_type = self.get_column_type(elastic_type) schema_field = SchemaField( fieldPath=self._get_cur_field_path(), - nativeDataType=PROPERTIES, - type=SchemaFieldDataTypeClass(RecordTypeClass()), + nativeDataType=elastic_type, + type=schema_field_data_type, description=None, nullable=True, recursive=False, ) yield schema_field - yield from self._get_schema_fields(nested_props) self._prefix_name_stack.pop() else: # Unexpected! Log a warning. 
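The reordering in the Elasticsearch source above changes which branch wins when a mapping entry carries both nested `properties` and a scalar `type` (as `nested` fields can): the nested properties are now emitted as a record and recursed into, instead of being skipped behind the scalar branch. The standalone sketch below mimics that traversal with a plain dict and a dot-joined field path; `walk_mapping` is illustrative and not the source's actual method.

```python
from typing import Any, Dict, Iterator, Tuple

PROPERTIES = "properties"


def walk_mapping(schema: Dict[str, Any], prefix: Tuple[str, ...] = ()) -> Iterator[str]:
    for name, column in schema.items():
        nested_props = column.get(PROPERTIES)
        elastic_type = column.get("type")
        if nested_props:
            # Nested properties take precedence: emit the record field, then its children.
            path = prefix + (f"[type={PROPERTIES}].{name}",)
            yield ".".join(path)
            yield from walk_mapping(nested_props, path)
        elif elastic_type is not None:
            yield ".".join(prefix + (f"[type={elastic_type}].{name}",))


mapping = {"user": {PROPERTIES: {"name": {"type": "keyword"}, "age": {"type": "long"}}}}
print(list(walk_mapping(mapping)))
# ['[type=properties].user',
#  '[type=properties].user.[type=keyword].name',
#  '[type=properties].user.[type=long].age']
```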
diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_dataclasses.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_dataclasses.py index adaa3c4875450e..7e23079156b625 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_dataclasses.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_dataclasses.py @@ -4,11 +4,14 @@ from dataclasses import dataclass from typing import Dict, List, Optional, Set -from datahub.ingestion.source.looker.lkml_patched import load_lkml from datahub.ingestion.source.looker.looker_connection import LookerConnectionDefinition +from datahub.ingestion.source.looker.looker_template_language import ( + load_and_preprocess_file, +) from datahub.ingestion.source.looker.lookml_config import ( _BASE_PROJECT_NAME, _EXPLORE_FILE_EXTENSION, + LookMLSourceConfig, LookMLSourceReport, ) @@ -43,6 +46,7 @@ def from_looker_dict( root_project_name: Optional[str], base_projects_folders: Dict[str, pathlib.Path], path: str, + source_config: LookMLSourceConfig, reporter: LookMLSourceReport, ) -> "LookerModel": logger.debug(f"Loading model from {path}") @@ -54,6 +58,7 @@ def from_looker_dict( root_project_name, base_projects_folders, path, + source_config, reporter, seen_so_far=set(), traversal_path=pathlib.Path(path).stem, @@ -68,7 +73,10 @@ def from_looker_dict( ] for included_file in explore_files: try: - parsed = load_lkml(included_file) + parsed = load_and_preprocess_file( + path=included_file, + source_config=source_config, + ) included_explores = parsed.get("explores", []) explores.extend(included_explores) except Exception as e: @@ -94,6 +102,7 @@ def resolve_includes( root_project_name: Optional[str], base_projects_folder: Dict[str, pathlib.Path], path: str, + source_config: LookMLSourceConfig, reporter: LookMLSourceReport, seen_so_far: Set[str], traversal_path: str = "", # a cosmetic parameter to aid debugging @@ -206,7 +215,10 @@ def resolve_includes( f"Will be loading {included_file}, traversed here via {traversal_path}" ) try: - parsed = load_lkml(included_file) + parsed = load_and_preprocess_file( + path=included_file, + source_config=source_config, + ) seen_so_far.add(included_file) if "includes" in parsed: # we have more includes to resolve! 
resolved.extend( @@ -216,6 +228,7 @@ def resolve_includes( root_project_name, base_projects_folder, included_file, + source_config, reporter, seen_so_far, traversal_path=traversal_path @@ -259,6 +272,7 @@ def from_looker_dict( root_project_name: Optional[str], base_projects_folder: Dict[str, pathlib.Path], raw_file_content: str, + source_config: LookMLSourceConfig, reporter: LookMLSourceReport, ) -> "LookerViewFile": logger.debug(f"Loading view file at {absolute_file_path}") @@ -272,6 +286,7 @@ def from_looker_dict( root_project_name, base_projects_folder, absolute_file_path, + source_config, reporter, seen_so_far=seen_so_far, ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_file_loader.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_file_loader.py index 52ebcdde06a279..f894c96debc54a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_file_loader.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_file_loader.py @@ -3,11 +3,10 @@ from dataclasses import replace from typing import Dict, Optional -from datahub.ingestion.source.looker.lkml_patched import load_lkml from datahub.ingestion.source.looker.looker_config import LookerConnectionDefinition from datahub.ingestion.source.looker.looker_dataclasses import LookerViewFile from datahub.ingestion.source.looker.looker_template_language import ( - process_lookml_template_language, + load_and_preprocess_file, ) from datahub.ingestion.source.looker.lookml_config import ( _EXPLORE_FILE_EXTENSION, @@ -72,10 +71,8 @@ def _load_viewfile( try: logger.debug(f"Loading viewfile {path}") - parsed = load_lkml(path) - - process_lookml_template_language( - view_lkml_file_dict=parsed, + parsed = load_and_preprocess_file( + path=path, source_config=self.source_config, ) @@ -86,6 +83,7 @@ def _load_viewfile( root_project_name=self._root_project_name, base_projects_folder=self._base_projects_folder, raw_file_content=raw_file_content, + source_config=self.source_config, reporter=reporter, ) logger.debug(f"adding viewfile for path {path} to the cache") diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_template_language.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_template_language.py index 04f9ec081ee680..1e60c08fe00c2b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_template_language.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_template_language.py @@ -1,12 +1,14 @@ import logging +import pathlib import re from abc import ABC, abstractmethod -from typing import Any, ClassVar, Dict, List, Optional, Set +from typing import Any, ClassVar, Dict, List, Optional, Set, Union from deepmerge import always_merger from liquid import Undefined from liquid.exceptions import LiquidSyntaxError +from datahub.ingestion.source.looker.lkml_patched import load_lkml from datahub.ingestion.source.looker.looker_constant import ( DATAHUB_TRANSFORMED_SQL, DATAHUB_TRANSFORMED_SQL_TABLE_NAME, @@ -390,6 +392,7 @@ def process_lookml_template_language( source_config: LookMLSourceConfig, view_lkml_file_dict: dict, ) -> None: + if "views" not in view_lkml_file_dict: return @@ -416,3 +419,18 @@ def process_lookml_template_language( ) view_lkml_file_dict["views"] = transformed_views + + +def load_and_preprocess_file( + path: Union[str, pathlib.Path], + source_config: LookMLSourceConfig, +) -> dict: + + parsed = load_lkml(path) + + process_lookml_template_language( + view_lkml_file_dict=parsed, + 
source_config=source_config, + ) + + return parsed diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_concept_context.py b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_concept_context.py index bf24f4b84679b1..ce4a242027e11a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_concept_context.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_concept_context.py @@ -365,8 +365,9 @@ def sql_table_name(self) -> str: return sql_table_name.lower() def datahub_transformed_sql_table_name(self) -> str: - table_name: Optional[str] = self.raw_view.get( - "datahub_transformed_sql_table_name" + # This field might be present in parent view of current view + table_name: Optional[str] = self.get_including_extends( + field="datahub_transformed_sql_table_name" ) if not table_name: diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py index b00291caabbf68..e4d8dd19fb7917 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py @@ -29,7 +29,6 @@ DatasetSubTypes, ) from datahub.ingestion.source.git.git_import import GitClone -from datahub.ingestion.source.looker.lkml_patched import load_lkml from datahub.ingestion.source.looker.looker_common import ( CORPUSER_DATAHUB, LookerExplore, @@ -45,6 +44,9 @@ get_connection_def_based_on_connection_string, ) from datahub.ingestion.source.looker.looker_lib_wrapper import LookerAPI +from datahub.ingestion.source.looker.looker_template_language import ( + load_and_preprocess_file, +) from datahub.ingestion.source.looker.looker_view_id_cache import ( LookerModel, LookerViewFileLoader, @@ -311,13 +313,19 @@ def __init__(self, config: LookMLSourceConfig, ctx: PipelineContext): def _load_model(self, path: str) -> LookerModel: logger.debug(f"Loading model from file {path}") - parsed = load_lkml(path) + + parsed = load_and_preprocess_file( + path=path, + source_config=self.source_config, + ) + looker_model = LookerModel.from_looker_dict( parsed, _BASE_PROJECT_NAME, self.source_config.project_name, self.base_projects_folder, path, + self.source_config, self.reporter, ) return looker_model @@ -495,7 +503,10 @@ def get_project_name(self, model_name: str) -> str: def get_manifest_if_present(self, folder: pathlib.Path) -> Optional[LookerManifest]: manifest_file = folder / "manifest.lkml" if manifest_file.exists(): - manifest_dict = load_lkml(manifest_file) + + manifest_dict = load_and_preprocess_file( + path=manifest_file, source_config=self.source_config + ) manifest = LookerManifest( project_name=manifest_dict.get("project_name"), diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py index de1022b5482cef..057dbca4281849 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py @@ -154,6 +154,7 @@ def _generate_fully_qualified_name( sql_table_name: str, connection_def: LookerConnectionDefinition, reporter: LookMLSourceReport, + view_name: str, ) -> str: """Returns a fully qualified dataset name, resolved through a connection definition. 
Input sql_table_name can be in three forms: table, db.table, db.schema.table""" @@ -192,7 +193,7 @@ def _generate_fully_qualified_name( reporter.report_warning( title="Malformed Table Name", message="Table name has more than 3 parts.", - context=f"Table Name: {sql_table_name}", + context=f"view-name: {view_name}, table-name: {sql_table_name}", ) return sql_table_name.lower() @@ -280,10 +281,13 @@ def __get_upstream_dataset_urn(self) -> List[Urn]: return [] if sql_parsing_result.debug_info.table_error is not None: + logger.debug( + f"view-name={self.view_context.name()}, sql_query={self.get_sql_query()}" + ) self.reporter.report_warning( title="Table Level Lineage Missing", message="Error in parsing derived sql", - context=f"View-name: {self.view_context.name()}", + context=f"view-name: {self.view_context.name()}, platform: {self.view_context.view_connection.platform}", exc=sql_parsing_result.debug_info.table_error, ) return [] @@ -530,6 +534,7 @@ def __get_upstream_dataset_urn(self) -> Urn: sql_table_name=self.view_context.datahub_transformed_sql_table_name(), connection_def=self.view_context.view_connection, reporter=self.view_context.reporter, + view_name=self.view_context.name(), ) self.upstream_dataset_urn = make_dataset_urn_with_platform_instance( @@ -586,6 +591,7 @@ def __get_upstream_dataset_urn(self) -> List[Urn]: self.view_context.datahub_transformed_sql_table_name(), self.view_context.view_connection, self.view_context.reporter, + self.view_context.name(), ), base_folder_path=self.view_context.base_folder_path, looker_view_id_cache=self.looker_view_id_cache, diff --git a/metadata-ingestion/src/datahub/ingestion/source/nifi.py b/metadata-ingestion/src/datahub/ingestion/source/nifi.py index 52dce3a8b7599f..25781cd2f1dcc9 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/nifi.py +++ b/metadata-ingestion/src/datahub/ingestion/source/nifi.py @@ -2,10 +2,11 @@ import logging import ssl import time +from collections import defaultdict from dataclasses import dataclass, field from datetime import datetime, timedelta, timezone from enum import Enum -from typing import Callable, Dict, Iterable, List, Optional, Tuple, Union +from typing import Callable, Dict, Iterable, List, Optional, Set, Union from urllib.parse import urljoin import requests @@ -196,6 +197,75 @@ def validator_site_url(cls, site_url: str) -> str: return site_url +class BidirectionalComponentGraph: + def __init__(self): + self._outgoing: Dict[str, Set[str]] = defaultdict(set) + self._incoming: Dict[str, Set[str]] = defaultdict(set) + # this will not count duplicates/removal of non-existing connections correctly - it is only there for a quick check + self._connections_cnt = 0 + + def add_connection(self, from_component: str, to_component: str) -> None: + # this is sanity check + outgoing_duplicated = to_component in self._outgoing[from_component] + incoming_duplicated = from_component in self._incoming[to_component] + + self._outgoing[from_component].add(to_component) + self._incoming[to_component].add(from_component) + self._connections_cnt += 1 + + if outgoing_duplicated or incoming_duplicated: + logger.warning( + f"Somehow we attempted to add a connection between 2 components which already existed! Duplicated incoming: {incoming_duplicated}, duplicated outgoing: {outgoing_duplicated}. 
Connection from component: {from_component} to component: {to_component}" + ) + + def remove_connection(self, from_component: str, to_component: str) -> None: + self._outgoing[from_component].discard(to_component) + self._incoming[to_component].discard(from_component) + self._connections_cnt -= 1 + + def get_outgoing_connections(self, component: str) -> Set[str]: + return self._outgoing[component] + + def get_incoming_connections(self, component: str) -> Set[str]: + return self._incoming[component] + + def delete_component(self, component: str) -> None: + logger.debug(f"Deleting component with id: {component}") + incoming = self._incoming[component] + logger.debug( + f"Recognized {len(incoming)} incoming connections to the component" + ) + outgoing = self._outgoing[component] + logger.debug( + f"Recognized {len(outgoing)} outgoing connections from the component" + ) + + for i in incoming: + for o in outgoing: + self.add_connection(i, o) + + for i in incoming: + self._outgoing[i].remove(component) + for o in outgoing: + self._incoming[o].remove(component) + + added_connections_cnt = len(incoming) * len(outgoing) + deleted_connections_cnt = len(incoming) + len(outgoing) + logger.debug( + f"Deleted {deleted_connections_cnt} connections and added {added_connections_cnt}" + ) + + del self._outgoing[component] + del self._incoming[component] + + # for performance reasons we are not using `remove_connection` function when deleting an entire component, + # therefor we need to adjust the estimated count + self._connections_cnt -= deleted_connections_cnt + + def __len__(self): + return self._connections_cnt + + TOKEN_ENDPOINT = "access/token" KERBEROS_TOKEN_ENDPOINT = "access/kerberos" ABOUT_ENDPOINT = "flow/about" @@ -360,7 +430,9 @@ class NifiFlow: root_process_group: NifiProcessGroup components: Dict[str, NifiComponent] = field(default_factory=dict) remotely_accessible_ports: Dict[str, NifiComponent] = field(default_factory=dict) - connections: List[Tuple[str, str]] = field(default_factory=list) + connections: BidirectionalComponentGraph = field( + default_factory=BidirectionalComponentGraph + ) processGroups: Dict[str, NifiProcessGroup] = field(default_factory=dict) remoteProcessGroups: Dict[str, NifiRemoteProcessGroup] = field(default_factory=dict) remote_ports: Dict[str, NifiComponent] = field(default_factory=dict) @@ -416,10 +488,15 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> "Source": def get_report(self) -> SourceReport: return self.report - def update_flow(self, pg_flow_dto: Dict) -> None: # noqa: C901 + def update_flow( + self, pg_flow_dto: Dict, recursion_level: int = 0 + ) -> None: # noqa: C901 """ Update self.nifi_flow with contents of the input process group `pg_flow_dto` """ + logger.debug( + f"Updating flow with pg_flow_dto {pg_flow_dto.get('breadcrumb', {}).get('breadcrumb', {}).get('id')}, recursion level: {recursion_level}" + ) breadcrumb_dto = pg_flow_dto.get("breadcrumb", {}).get("breadcrumb", {}) nifi_pg = NifiProcessGroup( breadcrumb_dto.get("id"), @@ -433,6 +510,7 @@ def update_flow(self, pg_flow_dto: Dict) -> None: # noqa: C901 flow_dto = pg_flow_dto.get("flow", {}) + logger.debug(f"Processing {len(flow_dto.get('processors', []))} processors") for processor in flow_dto.get("processors", []): component = processor.get("component") self.nifi_flow.components[component.get("id")] = NifiComponent( @@ -445,6 +523,7 @@ def update_flow(self, pg_flow_dto: Dict) -> None: # noqa: C901 comments=component.get("config", {}).get("comments"), 
status=component.get("status", {}).get("runStatus"), ) + logger.debug(f"Processing {len(flow_dto.get('funnels', []))} funnels") for funnel in flow_dto.get("funnels", []): component = funnel.get("component") self.nifi_flow.components[component.get("id")] = NifiComponent( @@ -458,13 +537,15 @@ def update_flow(self, pg_flow_dto: Dict) -> None: # noqa: C901 ) logger.debug(f"Adding funnel {component.get('id')}") + logger.debug(f"Processing {len(flow_dto.get('connections', []))} connections") for connection in flow_dto.get("connections", []): # Exclude self - recursive relationships if connection.get("sourceId") != connection.get("destinationId"): - self.nifi_flow.connections.append( - (connection.get("sourceId"), connection.get("destinationId")) + self.nifi_flow.connections.add_connection( + connection.get("sourceId"), connection.get("destinationId") ) + logger.debug(f"Processing {len(flow_dto.get('inputPorts', []))} inputPorts") for inputPort in flow_dto.get("inputPorts", []): component = inputPort.get("component") if inputPort.get("allowRemoteAccess"): @@ -492,6 +573,7 @@ def update_flow(self, pg_flow_dto: Dict) -> None: # noqa: C901 ) logger.debug(f"Adding port {component.get('id')}") + logger.debug(f"Processing {len(flow_dto.get('outputPorts', []))} outputPorts") for outputPort in flow_dto.get("outputPorts", []): component = outputPort.get("component") if outputPort.get("allowRemoteAccess"): @@ -519,6 +601,9 @@ def update_flow(self, pg_flow_dto: Dict) -> None: # noqa: C901 ) logger.debug(f"Adding report port {component.get('id')}") + logger.debug( + f"Processing {len(flow_dto.get('remoteProcessGroups', []))} remoteProcessGroups" + ) for rpg in flow_dto.get("remoteProcessGroups", []): rpg_component = rpg.get("component", {}) remote_ports = {} @@ -564,7 +649,13 @@ def update_flow(self, pg_flow_dto: Dict) -> None: # noqa: C901 self.nifi_flow.components.update(remote_ports) self.nifi_flow.remoteProcessGroups[nifi_rpg.id] = nifi_rpg + logger.debug( + f"Processing {len(flow_dto.get('processGroups', []))} processGroups" + ) for pg in flow_dto.get("processGroups", []): + logger.debug( + f"Retrieving process group: {pg.get('id')} while updating flow for {pg_flow_dto.get('breadcrumb', {}).get('breadcrumb', {}).get('id')}" + ) pg_response = self.session.get( url=urljoin(self.rest_api_base_url, PG_ENDPOINT) + pg.get("id") ) @@ -578,11 +669,24 @@ def update_flow(self, pg_flow_dto: Dict) -> None: # noqa: C901 pg_flow_dto = pg_response.json().get("processGroupFlow", {}) - self.update_flow(pg_flow_dto) + self.update_flow(pg_flow_dto, recursion_level=recursion_level + 1) def update_flow_keep_only_ingress_egress(self): components_to_del: List[NifiComponent] = [] - for component in self.nifi_flow.components.values(): + components = self.nifi_flow.components.values() + logger.debug( + f"Processing {len(components)} components for keep only ingress/egress" + ) + logger.debug( + f"All the connections recognized: {len(self.nifi_flow.connections)}" + ) + for index, component in enumerate(components, start=1): + logger.debug( + f"Processing {index}th component for ingress/egress pruning. 
Component id: {component.id}, name: {component.name}, type: {component.type}" + ) + logger.debug( + f"Current amount of connections: {len(self.nifi_flow.connections)}" + ) if ( component.nifi_type is NifiType.PROCESSOR and component.type @@ -592,47 +696,28 @@ def update_flow_keep_only_ingress_egress(self): NifiType.REMOTE_INPUT_PORT, NifiType.REMOTE_OUTPUT_PORT, ]: + self.nifi_flow.connections.delete_component(component.id) components_to_del.append(component) - incoming = list( - filter(lambda x: x[1] == component.id, self.nifi_flow.connections) - ) - outgoing = list( - filter(lambda x: x[0] == component.id, self.nifi_flow.connections) - ) - # Create new connections from incoming to outgoing - for i in incoming: - for j in outgoing: - self.nifi_flow.connections.append((i[0], j[1])) - - # Remove older connections, as we already created - # new connections bypassing component to be deleted - - for i in incoming: - self.nifi_flow.connections.remove(i) - for j in outgoing: - self.nifi_flow.connections.remove(j) - - for c in components_to_del: - if c.nifi_type is NifiType.PROCESSOR and ( - c.name.startswith("Get") - or c.name.startswith("List") - or c.name.startswith("Fetch") - or c.name.startswith("Put") + + for component in components_to_del: + if component.nifi_type is NifiType.PROCESSOR and component.name.startswith( + ("Get", "List", "Fetch", "Put") ): self.report.warning( - f"Dropping NiFi Processor of type {c.type}, id {c.id}, name {c.name} from lineage view. \ + f"Dropping NiFi Processor of type {component.type}, id {component.id}, name {component.name} from lineage view. \ This is likely an Ingress or Egress node which may be reading to/writing from external datasets \ However not currently supported in datahub", self.config.site_url, ) else: logger.debug( - f"Dropping NiFi Component of type {c.type}, id {c.id}, name {c.name} from lineage view." + f"Dropping NiFi Component of type {component.type}, id {component.id}, name {component.name} from lineage view." 
) - del self.nifi_flow.components[c.id] + del self.nifi_flow.components[component.id] def create_nifi_flow(self): + logger.debug(f"Retrieving NIFI info from {ABOUT_ENDPOINT}") about_response = self.session.get( url=urljoin(self.rest_api_base_url, ABOUT_ENDPOINT) ) @@ -646,6 +731,8 @@ def create_nifi_flow(self): ) else: logger.warning("Failed to fetch version for nifi") + logger.debug(f"Retrieved nifi version: {nifi_version}") + logger.debug(f"Retrieving cluster info from {CLUSTER_ENDPOINT}") cluster_response = self.session.get( url=urljoin(self.rest_api_base_url, CLUSTER_ENDPOINT) ) @@ -654,8 +741,10 @@ def create_nifi_flow(self): clustered = ( cluster_response.json().get("clusterSummary", {}).get("clustered") ) + logger.debug(f"Retrieved cluster summary: {clustered}") else: logger.warning("Failed to fetch cluster summary for flow") + logger.debug("Retrieving ROOT Process Group") pg_response = self.session.get( url=urljoin(self.rest_api_base_url, PG_ENDPOINT) + "root" ) @@ -695,7 +784,7 @@ def fetch_provenance_events( if provenance_response.ok: provenance = provenance_response.json().get("provenance", {}) provenance_uri = provenance.get("uri") - + logger.debug(f"Retrieving provenance uri: {provenance_uri}") provenance_response = self.session.get(provenance_uri) if provenance_response.ok: provenance = provenance_response.json().get("provenance", {}) @@ -734,7 +823,9 @@ def fetch_provenance_events( total = provenance.get("results", {}).get("total") totalCount = provenance.get("results", {}).get("totalCount") + logger.debug(f"Retrieved {totalCount} of {total}") if total != str(totalCount): + logger.debug("Trying to retrieve more events for the same processor") yield from self.fetch_provenance_events( processor, eventType, startDate, oldest_event_time ) @@ -800,6 +891,7 @@ def submit_provenance_query(self, processor, eventType, startDate, endDate): return provenance_response def delete_provenance(self, provenance_uri): + logger.debug(f"Deleting provenance with uri: {provenance_uri}") delete_response = self.session.delete(provenance_uri) if not delete_response.ok: logger.error("failed to delete provenance ", provenance_uri) @@ -821,12 +913,8 @@ def construct_workunits(self) -> Iterable[MetadataWorkUnit]: # noqa: C901 job_name = component.name job_urn = builder.make_data_job_urn_with_flow(flow_urn, component.id) - incoming = list( - filter(lambda x: x[1] == component.id, self.nifi_flow.connections) - ) - outgoing = list( - filter(lambda x: x[0] == component.id, self.nifi_flow.connections) - ) + incoming = self.nifi_flow.connections.get_incoming_connections(component.id) + outgoing = self.nifi_flow.connections.get_outgoing_connections(component.id) inputJobs = set() jobProperties = None @@ -864,8 +952,7 @@ def construct_workunits(self) -> Iterable[MetadataWorkUnit]: # noqa: C901 datasetProperties=dataset.dataset_properties, ) - for edge in incoming: - incoming_from = edge[0] + for incoming_from in incoming: if incoming_from in self.nifi_flow.remotely_accessible_ports.keys(): dataset_name = f"{self.config.site_name}.{self.nifi_flow.remotely_accessible_ports[incoming_from].name}" dataset_urn = builder.make_dataset_urn( @@ -882,8 +969,7 @@ def construct_workunits(self) -> Iterable[MetadataWorkUnit]: # noqa: C901 builder.make_data_job_urn_with_flow(flow_urn, incoming_from) ) - for edge in outgoing: - outgoing_to = edge[1] + for outgoing_to in outgoing: if outgoing_to in self.nifi_flow.remotely_accessible_ports.keys(): dataset_name = 
f"{self.config.site_name}.{self.nifi_flow.remotely_accessible_ports[outgoing_to].name}" dataset_urn = builder.make_dataset_urn( @@ -977,14 +1063,19 @@ def make_flow_urn(self) -> str: ) def process_provenance_events(self): + logger.debug("Starting processing of provenance events") startDate = datetime.now(timezone.utc) - timedelta( days=self.config.provenance_days ) eventAnalyzer = NifiProcessorProvenanceEventAnalyzer() eventAnalyzer.env = self.config.env - - for component in self.nifi_flow.components.values(): + components = self.nifi_flow.components.values() + logger.debug(f"Processing {len(components)} components") + for component in components: + logger.debug( + f"Processing provenance events for component id: {component.id} name: {component.name}" + ) if component.nifi_type is NifiType.PROCESSOR: eventType = eventAnalyzer.KNOWN_INGRESS_EGRESS_PROCESORS[component.type] events = self.fetch_provenance_events(component, eventType, startDate) diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_ownership.py b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_ownership.py index 5112a443768db0..54be2e5fac1e30 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_ownership.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_ownership.py @@ -1,4 +1,5 @@ -from typing import Callable, List, Optional, cast +import logging +from typing import Callable, Dict, List, Optional, Union, cast import datahub.emitter.mce_builder as builder from datahub.configuration.common import ( @@ -9,16 +10,22 @@ ) from datahub.configuration.import_resolver import pydantic_resolve_key from datahub.emitter.mce_builder import Aspect +from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.graph.client import DataHubGraph from datahub.ingestion.transformer.dataset_transformer import ( DatasetOwnershipTransformer, ) from datahub.metadata.schema_classes import ( + BrowsePathsV2Class, + MetadataChangeProposalClass, OwnerClass, OwnershipClass, OwnershipTypeClass, ) +from datahub.specific.dashboard import DashboardPatchBuilder + +logger = logging.getLogger(__name__) class AddDatasetOwnershipConfig(TransformerSemanticsConfigModel): @@ -27,6 +34,8 @@ class AddDatasetOwnershipConfig(TransformerSemanticsConfigModel): _resolve_owner_fn = pydantic_resolve_key("get_owners_to_add") + is_container: bool = False + class AddDatasetOwnership(DatasetOwnershipTransformer): """Transformer that adds owners to datasets according to a callback function.""" @@ -70,6 +79,52 @@ def _merge_with_server_ownership( return mce_ownership + def handle_end_of_stream( + self, + ) -> List[Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]]: + if not self.config.is_container: + return [] + + logger.debug("Generating Ownership for containers") + ownership_container_mapping: Dict[str, List[OwnerClass]] = {} + for entity_urn, data_ownerships in ( + (urn, self.config.get_owners_to_add(urn)) for urn in self.entity_map.keys() + ): + if not data_ownerships: + continue + + assert self.ctx.graph + browse_paths = self.ctx.graph.get_aspect(entity_urn, BrowsePathsV2Class) + if not browse_paths: + continue + + for path in browse_paths.path: + container_urn = path.urn + + if not container_urn or not container_urn.startswith( + "urn:li:container:" + ): + continue + + if container_urn not in ownership_container_mapping: + ownership_container_mapping[container_urn] = data_ownerships + else: + 
ownership_container_mapping[container_urn] = list( + ownership_container_mapping[container_urn] + data_ownerships + ) + + mcps: List[ + Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass] + ] = [] + + for urn, owners in ownership_container_mapping.items(): + patch_builder = DashboardPatchBuilder(urn) + for owner in owners: + patch_builder.add_owner(owner) + mcps.extend(list(patch_builder.build())) + + return mcps + def transform_aspect( self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect] ) -> Optional[Aspect]: @@ -147,6 +202,7 @@ def create( class PatternDatasetOwnershipConfig(DatasetOwnershipBaseConfig): owner_pattern: KeyValuePattern = KeyValuePattern.all() default_actor: str = builder.make_user_urn("etl") + is_container: bool = False class PatternAddDatasetOwnership(AddDatasetOwnership): @@ -169,6 +225,7 @@ def __init__(self, config: PatternDatasetOwnershipConfig, ctx: PipelineContext): default_actor=config.default_actor, semantics=config.semantics, replace_existing=config.replace_existing, + is_container=config.is_container, ) super().__init__(generic_config, ctx) diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py index 82dd21bbdd1d17..6a838248152650 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_domain.py @@ -1,4 +1,5 @@ -from typing import Callable, List, Optional, Union, cast +import logging +from typing import Callable, Dict, List, Optional, Sequence, Union, cast from datahub.configuration.common import ( ConfigurationError, @@ -8,12 +9,19 @@ ) from datahub.configuration.import_resolver import pydantic_resolve_key from datahub.emitter.mce_builder import Aspect +from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.graph.client import DataHubGraph from datahub.ingestion.transformer.dataset_transformer import DatasetDomainTransformer -from datahub.metadata.schema_classes import DomainsClass +from datahub.metadata.schema_classes import ( + BrowsePathsV2Class, + DomainsClass, + MetadataChangeProposalClass, +) from datahub.utilities.registries.domain_registry import DomainRegistry +logger = logging.getLogger(__name__) + class AddDatasetDomainSemanticsConfig(TransformerSemanticsConfigModel): get_domains_to_add: Union[ @@ -23,6 +31,8 @@ class AddDatasetDomainSemanticsConfig(TransformerSemanticsConfigModel): _resolve_domain_fn = pydantic_resolve_key("get_domains_to_add") + is_container: bool = False + class SimpleDatasetDomainSemanticsConfig(TransformerSemanticsConfigModel): domains: List[str] @@ -30,6 +40,7 @@ class SimpleDatasetDomainSemanticsConfig(TransformerSemanticsConfigModel): class PatternDatasetDomainSemanticsConfig(TransformerSemanticsConfigModel): domain_pattern: KeyValuePattern = KeyValuePattern.all() + is_container: bool = False class AddDatasetDomain(DatasetDomainTransformer): @@ -90,6 +101,56 @@ def _merge_with_server_domains( return mce_domain + def handle_end_of_stream( + self, + ) -> Sequence[Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]]: + domain_mcps: List[MetadataChangeProposalWrapper] = [] + container_domain_mapping: Dict[str, List[str]] = {} + + logger.debug("Generating Domains for containers") + + if not self.config.is_container: + return domain_mcps + + for entity_urn, domain_to_add in ( + (urn, 
self.config.get_domains_to_add(urn)) for urn in self.entity_map.keys() + ): + if not domain_to_add or not domain_to_add.domains: + continue + + assert self.ctx.graph + browse_paths = self.ctx.graph.get_aspect(entity_urn, BrowsePathsV2Class) + if not browse_paths: + continue + + for path in browse_paths.path: + container_urn = path.urn + + if not container_urn or not container_urn.startswith( + "urn:li:container:" + ): + continue + + if container_urn not in container_domain_mapping: + container_domain_mapping[container_urn] = domain_to_add.domains + else: + container_domain_mapping[container_urn] = list( + set( + container_domain_mapping[container_urn] + + domain_to_add.domains + ) + ) + + for urn, domains in container_domain_mapping.items(): + domain_mcps.append( + MetadataChangeProposalWrapper( + entityUrn=urn, + aspect=DomainsClass(domains=domains), + ) + ) + + return domain_mcps + def transform_aspect( self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect] ) -> Optional[Aspect]: @@ -156,6 +217,7 @@ def resolve_domain(domain_urn: str) -> DomainsClass: get_domains_to_add=resolve_domain, semantics=config.semantics, replace_existing=config.replace_existing, + is_container=config.is_container, ) super().__init__(generic_config, ctx) diff --git a/metadata-ingestion/tests/integration/lookml/test_lookml.py b/metadata-ingestion/tests/integration/lookml/test_lookml.py index a5d838cb16d73a..e4eb564e3e86b7 100644 --- a/metadata-ingestion/tests/integration/lookml/test_lookml.py +++ b/metadata-ingestion/tests/integration/lookml/test_lookml.py @@ -2,6 +2,7 @@ import pathlib from typing import Any, List from unittest import mock +from unittest.mock import MagicMock import pydantic import pytest @@ -14,13 +15,13 @@ from datahub.ingestion.source.file import read_metadata_file from datahub.ingestion.source.looker.looker_template_language import ( SpecialVariable, + load_and_preprocess_file, resolve_liquid_variable, ) from datahub.ingestion.source.looker.lookml_source import ( LookerModel, LookerRefinementResolver, LookMLSourceConfig, - load_lkml, ) from datahub.metadata.schema_classes import ( DatasetSnapshotClass, @@ -870,7 +871,11 @@ def test_manifest_parser(pytestconfig: pytest.Config) -> None: test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml" manifest_file = test_resources_dir / "lkml_manifest_samples/complex-manifest.lkml" - manifest = load_lkml(manifest_file) + manifest = load_and_preprocess_file( + path=manifest_file, + source_config=MagicMock(), + ) + assert manifest diff --git a/metadata-ingestion/tests/unit/test_nifi_source.py b/metadata-ingestion/tests/unit/test_nifi_source.py index 9e8bf64261ffaf..30a0855d44f341 100644 --- a/metadata-ingestion/tests/unit/test_nifi_source.py +++ b/metadata-ingestion/tests/unit/test_nifi_source.py @@ -6,6 +6,7 @@ from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.source.nifi import ( + BidirectionalComponentGraph, NifiComponent, NifiFlow, NifiProcessGroup, @@ -55,7 +56,7 @@ def test_nifi_s3_provenance_event(): ) }, remotely_accessible_ports={}, - connections=[], + connections=BidirectionalComponentGraph(), processGroups={ "803ebb92-017d-1000-2961-4bdaa27a3ba0": NifiProcessGroup( id="803ebb92-017d-1000-2961-4bdaa27a3ba0", @@ -126,7 +127,7 @@ def test_nifi_s3_provenance_event(): ) }, remotely_accessible_ports={}, - connections=[], + connections=BidirectionalComponentGraph(), processGroups={ "803ebb92-017d-1000-2961-4bdaa27a3ba0": NifiProcessGroup( id="803ebb92-017d-1000-2961-4bdaa27a3ba0", diff --git 
a/metadata-ingestion/tests/unit/test_transform_dataset.py b/metadata-ingestion/tests/unit/test_transform_dataset.py index 506bfd9c12674a..46c6390b184d36 100644 --- a/metadata-ingestion/tests/unit/test_transform_dataset.py +++ b/metadata-ingestion/tests/unit/test_transform_dataset.py @@ -1105,6 +1105,354 @@ def test_pattern_dataset_ownership_with_invalid_type_transformation(mock_time): ) +def test_pattern_container_and_dataset_ownership_transformation( + mock_time, mock_datahub_graph +): + def fake_get_aspect( + entity_urn: str, + aspect_type: Type[models.BrowsePathsV2Class], + version: int = 0, + ) -> Optional[models.BrowsePathsV2Class]: + return models.BrowsePathsV2Class( + path=[ + models.BrowsePathEntryClass( + id="container_1", urn="urn:li:container:container_1" + ), + models.BrowsePathEntryClass( + id="container_2", urn="urn:li:container:container_2" + ), + ] + ) + + pipeline_context = PipelineContext( + run_id="test_pattern_container_and_dataset_ownership_transformation" + ) + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig()) + pipeline_context.graph.get_aspect = fake_get_aspect # type: ignore + + # No owner aspect for the first dataset + no_owner_aspect = models.MetadataChangeEventClass( + proposedSnapshot=models.DatasetSnapshotClass( + urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)", + aspects=[models.StatusClass(removed=False)], + ), + ) + # Dataset with an existing owner + with_owner_aspect = models.MetadataChangeEventClass( + proposedSnapshot=models.DatasetSnapshotClass( + urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)", + aspects=[ + models.OwnershipClass( + owners=[ + models.OwnerClass( + owner=builder.make_user_urn("fake_owner"), + type=models.OwnershipTypeClass.DATAOWNER, + ), + ], + lastModified=models.AuditStampClass( + time=1625266033123, actor="urn:li:corpuser:datahub" + ), + ) + ], + ), + ) + + # Not a dataset, should be ignored + not_a_dataset = models.MetadataChangeEventClass( + proposedSnapshot=models.DataJobSnapshotClass( + urn="urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_456)", + aspects=[ + models.DataJobInfoClass( + name="User Deletions", + description="Constructs the fct_users_deleted from logging_events", + type=models.AzkabanJobTypeClass.SQL, + ) + ], + ) + ) + + inputs = [ + no_owner_aspect, + with_owner_aspect, + not_a_dataset, + EndOfStream(), + ] + + # Initialize the transformer with container support + transformer = PatternAddDatasetOwnership.create( + { + "owner_pattern": { + "rules": { + ".*example1.*": [builder.make_user_urn("person1")], + ".*example2.*": [builder.make_user_urn("person2")], + } + }, + "ownership_type": "DATAOWNER", + "is_container": True, # Enable container ownership handling + }, + pipeline_context, + ) + + outputs = list( + transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs]) + ) + + assert len(outputs) == len(inputs) + 3 + + # Check the first entry. 
+ assert inputs[0] == outputs[0].record + + # Check the ownership for the first dataset (example1) + first_ownership_aspect = outputs[3].record.aspect + assert first_ownership_aspect + assert len(first_ownership_aspect.owners) == 1 + assert all( + [ + owner.type == models.OwnershipTypeClass.DATAOWNER + for owner in first_ownership_aspect.owners + ] + ) + + # Check the ownership for the second dataset (example2) + second_ownership_aspect = builder.get_aspect_if_available( + outputs[1].record, models.OwnershipClass + ) + assert second_ownership_aspect + assert len(second_ownership_aspect.owners) == 2 # One existing + one new + assert all( + [ + owner.type == models.OwnershipTypeClass.DATAOWNER + for owner in second_ownership_aspect.owners + ] + ) + + # Check container ownerships + for i in range(2): + container_ownership_aspect = outputs[i + 4].record.aspect + assert container_ownership_aspect + ownership = json.loads(container_ownership_aspect.value.decode("utf-8")) + assert len(ownership) == 2 + assert ownership[0]["value"]["owner"] == builder.make_user_urn("person1") + assert ownership[1]["value"]["owner"] == builder.make_user_urn("person2") + + # Verify that the third input (not a dataset) is unchanged + assert inputs[2] == outputs[2].record + + +def test_pattern_container_and_dataset_ownership_with_no_container( + mock_time, mock_datahub_graph +): + def fake_get_aspect( + entity_urn: str, + aspect_type: Type[models.BrowsePathsV2Class], + version: int = 0, + ) -> Optional[models.BrowsePathsV2Class]: + return None + + pipeline_context = PipelineContext( + run_id="test_pattern_container_and_dataset_ownership_with_no_container" + ) + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig()) + pipeline_context.graph.get_aspect = fake_get_aspect # type: ignore + + # No owner aspect for the first dataset + no_owner_aspect = models.MetadataChangeEventClass( + proposedSnapshot=models.DatasetSnapshotClass( + urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)", + aspects=[ + models.StatusClass(removed=False), + models.BrowsePathsV2Class( + path=[ + models.BrowsePathEntryClass( + id="container_1", urn="urn:li:container:container_1" + ), + models.BrowsePathEntryClass( + id="container_2", urn="urn:li:container:container_2" + ), + ] + ), + ], + ), + ) + # Dataset with an existing owner + with_owner_aspect = models.MetadataChangeEventClass( + proposedSnapshot=models.DatasetSnapshotClass( + urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)", + aspects=[ + models.OwnershipClass( + owners=[ + models.OwnerClass( + owner=builder.make_user_urn("fake_owner"), + type=models.OwnershipTypeClass.DATAOWNER, + ), + ], + lastModified=models.AuditStampClass( + time=1625266033123, actor="urn:li:corpuser:datahub" + ), + ), + models.BrowsePathsV2Class( + path=[ + models.BrowsePathEntryClass( + id="container_1", urn="urn:li:container:container_1" + ), + models.BrowsePathEntryClass( + id="container_2", urn="urn:li:container:container_2" + ), + ] + ), + ], + ), + ) + + inputs = [ + no_owner_aspect, + with_owner_aspect, + EndOfStream(), + ] + + # Initialize the transformer with container support + transformer = PatternAddDatasetOwnership.create( + { + "owner_pattern": { + "rules": { + ".*example1.*": [builder.make_user_urn("person1")], + ".*example2.*": [builder.make_user_urn("person2")], + } + }, + "ownership_type": "DATAOWNER", + "is_container": True, # Enable container ownership handling + }, + pipeline_context, + ) + + outputs = list( + transformer.transform([RecordEnvelope(input, 
metadata={}) for input in inputs]) + ) + + assert len(outputs) == len(inputs) + 1 + + # Check the ownership for the first dataset (example1) + first_ownership_aspect = outputs[2].record.aspect + assert first_ownership_aspect + assert len(first_ownership_aspect.owners) == 1 + assert all( + [ + owner.type == models.OwnershipTypeClass.DATAOWNER + for owner in first_ownership_aspect.owners + ] + ) + + # Check the ownership for the second dataset (example2) + second_ownership_aspect = builder.get_aspect_if_available( + outputs[1].record, models.OwnershipClass + ) + assert second_ownership_aspect + assert len(second_ownership_aspect.owners) == 2 # One existing + one new + assert all( + [ + owner.type == models.OwnershipTypeClass.DATAOWNER + for owner in second_ownership_aspect.owners + ] + ) + + +def test_pattern_container_and_dataset_ownership_with_no_match( + mock_time, mock_datahub_graph +): + def fake_get_aspect( + entity_urn: str, + aspect_type: Type[models.BrowsePathsV2Class], + version: int = 0, + ) -> models.BrowsePathsV2Class: + return models.BrowsePathsV2Class( + path=[ + models.BrowsePathEntryClass( + id="container_1", urn="urn:li:container:container_1" + ) + ] + ) + + pipeline_context = PipelineContext( + run_id="test_pattern_container_and_dataset_ownership_with_no_match" + ) + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig()) + pipeline_context.graph.get_aspect = fake_get_aspect # type: ignore + + # No owner aspect for the first dataset + no_owner_aspect = models.MetadataChangeEventClass( + proposedSnapshot=models.DatasetSnapshotClass( + urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)", + aspects=[ + models.StatusClass(removed=False), + ], + ), + ) + # Dataset with an existing owner + with_owner_aspect = models.MetadataChangeEventClass( + proposedSnapshot=models.DatasetSnapshotClass( + urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)", + aspects=[ + models.OwnershipClass( + owners=[ + models.OwnerClass( + owner=builder.make_user_urn("fake_owner"), + type=models.OwnershipTypeClass.DATAOWNER, + ), + ], + lastModified=models.AuditStampClass( + time=1625266033123, actor="urn:li:corpuser:datahub" + ), + ) + ], + ), + ) + + inputs = [ + no_owner_aspect, + with_owner_aspect, + EndOfStream(), + ] + + # Initialize the transformer with container support + transformer = PatternAddDatasetOwnership.create( + { + "owner_pattern": { + "rules": { + ".*example3.*": [builder.make_user_urn("person1")], + ".*example4.*": [builder.make_user_urn("person2")], + } + }, + "ownership_type": "DATAOWNER", + "is_container": True, # Enable container ownership handling + }, + pipeline_context, + ) + + outputs = list( + transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs]) + ) + + assert len(outputs) == len(inputs) + 1 + + # Check the ownership for the first dataset (example1) + first_ownership_aspect = outputs[2].record.aspect + assert first_ownership_aspect + assert builder.make_user_urn("person1") not in first_ownership_aspect.owners + assert builder.make_user_urn("person2") not in first_ownership_aspect.owners + + # Check the ownership for the second dataset (example2) + second_ownership_aspect = builder.get_aspect_if_available( + outputs[1].record, models.OwnershipClass + ) + assert second_ownership_aspect + assert len(second_ownership_aspect.owners) == 1 + assert builder.make_user_urn("person1") not in second_ownership_aspect.owners + assert builder.make_user_urn("person2") not in second_ownership_aspect.owners + assert ( + 
builder.make_user_urn("fake_owner") == second_ownership_aspect.owners[0].owner + ) + + def gen_owners( owners: List[str], ownership_type: Union[ @@ -2435,6 +2783,224 @@ def fake_ownership_class(entity_urn: str) -> models.OwnershipClass: assert server_owner in owner_urns +def test_pattern_container_and_dataset_domain_transformation(mock_datahub_graph): + datahub_domain = builder.make_domain_urn("datahubproject.io") + acryl_domain = builder.make_domain_urn("acryl_domain") + server_domain = builder.make_domain_urn("server_domain") + + def fake_get_aspect( + entity_urn: str, + aspect_type: Type[models.BrowsePathsV2Class], + version: int = 0, + ) -> models.BrowsePathsV2Class: + return models.BrowsePathsV2Class( + path=[ + models.BrowsePathEntryClass( + id="container_1", urn="urn:li:container:container_1" + ), + models.BrowsePathEntryClass( + id="container_2", urn="urn:li:container:container_2" + ), + ] + ) + + pipeline_context = PipelineContext( + run_id="test_pattern_container_and_dataset_domain_transformation" + ) + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig()) + pipeline_context.graph.get_aspect = fake_get_aspect # type: ignore + + with_domain_aspect = make_generic_dataset_mcp( + aspect=models.DomainsClass(domains=[datahub_domain]), aspect_name="domains" + ) + no_domain_aspect = make_generic_dataset_mcp( + entity_urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)" + ) + + # Not a dataset, should be ignored + not_a_dataset = models.MetadataChangeEventClass( + proposedSnapshot=models.DataJobSnapshotClass( + urn="urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_456)", + aspects=[ + models.DataJobInfoClass( + name="User Deletions", + description="Constructs the fct_users_deleted from logging_events", + type=models.AzkabanJobTypeClass.SQL, + ) + ], + ) + ) + + inputs = [ + with_domain_aspect, + no_domain_aspect, + not_a_dataset, + EndOfStream(), + ] + + # Initialize the transformer with container support for domains + transformer = PatternAddDatasetDomain.create( + { + "domain_pattern": { + "rules": { + ".*example1.*": [acryl_domain, server_domain], + ".*example2.*": [server_domain], + } + }, + "is_container": True, # Enable container domain handling + }, + pipeline_context, + ) + + outputs = list( + transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs]) + ) + + assert ( + len(outputs) == len(inputs) + 3 + ) # MCPs for the dataset without domains and the containers + + first_domain_aspect = outputs[0].record.aspect + assert first_domain_aspect + assert len(first_domain_aspect.domains) == 3 + assert all( + domain in first_domain_aspect.domains + for domain in [datahub_domain, acryl_domain, server_domain] + ) + + second_domain_aspect = outputs[3].record.aspect + assert second_domain_aspect + assert len(second_domain_aspect.domains) == 1 + assert server_domain in second_domain_aspect.domains + + # Verify that the third input (not a dataset) is unchanged + assert inputs[2] == outputs[2].record + + # Verify conainer 1 and container 2 should contain all domains + container_1 = outputs[4].record.aspect + assert len(container_1.domains) == 2 + assert acryl_domain in container_1.domains + assert server_domain in container_1.domains + + container_2 = outputs[5].record.aspect + assert len(container_2.domains) == 2 + assert acryl_domain in container_2.domains + assert server_domain in container_2.domains + + +def test_pattern_container_and_dataset_domain_transformation_with_no_container( + mock_datahub_graph, +): + datahub_domain = 
builder.make_domain_urn("datahubproject.io") + acryl_domain = builder.make_domain_urn("acryl_domain") + server_domain = builder.make_domain_urn("server_domain") + + def fake_get_aspect( + entity_urn: str, + aspect_type: Type[models.BrowsePathsV2Class], + version: int = 0, + ) -> Optional[models.BrowsePathsV2Class]: + return None + + pipeline_context = PipelineContext( + run_id="test_pattern_container_and_dataset_domain_transformation_with_no_container" + ) + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig()) + pipeline_context.graph.get_aspect = fake_get_aspect # type: ignore + + with_domain_aspect = make_generic_dataset_mcp( + aspect=models.DomainsClass(domains=[datahub_domain]), aspect_name="domains" + ) + no_domain_aspect = make_generic_dataset_mcp( + entity_urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)" + ) + + inputs = [ + with_domain_aspect, + no_domain_aspect, + EndOfStream(), + ] + + # Initialize the transformer with container support for domains + transformer = PatternAddDatasetDomain.create( + { + "domain_pattern": { + "rules": { + ".*example1.*": [acryl_domain, server_domain], + ".*example2.*": [server_domain], + } + }, + "is_container": True, # Enable container domain handling + }, + pipeline_context, + ) + + outputs = list( + transformer.transform([RecordEnvelope(input, metadata={}) for input in inputs]) + ) + + assert len(outputs) == len(inputs) + 1 + + first_domain_aspect = outputs[0].record.aspect + assert first_domain_aspect + assert len(first_domain_aspect.domains) == 3 + assert all( + domain in first_domain_aspect.domains + for domain in [datahub_domain, acryl_domain, server_domain] + ) + + second_domain_aspect = outputs[2].record.aspect + assert second_domain_aspect + assert len(second_domain_aspect.domains) == 1 + assert server_domain in second_domain_aspect.domains + + +def test_pattern_add_container_dataset_domain_no_match(mock_datahub_graph): + acryl_domain = builder.make_domain_urn("acryl.io") + datahub_domain = builder.make_domain_urn("datahubproject.io") + pattern = "urn:li:dataset:\\(urn:li:dataPlatform:invalid,.*" + + pipeline_context: PipelineContext = PipelineContext( + run_id="test_simple_add_dataset_domain" + ) + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig) + + def fake_get_aspect( + entity_urn: str, + aspect_type: Type[models.BrowsePathsV2Class], + version: int = 0, + ) -> models.BrowsePathsV2Class: + return models.BrowsePathsV2Class( + path=[ + models.BrowsePathEntryClass( + id="container_1", urn="urn:li:container:container_1" + ) + ] + ) + + pipeline_context.graph.get_aspect = fake_get_aspect # type: ignore + + output = run_dataset_transformer_pipeline( + transformer_type=PatternAddDatasetDomain, + aspect=models.DomainsClass(domains=[datahub_domain]), + config={ + "replace_existing": True, + "domain_pattern": {"rules": {pattern: [acryl_domain]}}, + "is_container": True, + }, + pipeline_context=pipeline_context, + ) + + assert len(output) == 2 + assert output[0] is not None + assert output[0].record is not None + assert isinstance(output[0].record, MetadataChangeProposalWrapper) + assert output[0].record.aspect is not None + assert isinstance(output[0].record.aspect, models.DomainsClass) + transformed_aspect = cast(models.DomainsClass, output[0].record.aspect) + assert len(transformed_aspect.domains) == 0 + + def run_pattern_dataset_schema_terms_transformation_semantics( semantics: TransformerSemantics, mock_datahub_graph: Callable[[DatahubClientConfig], DataHubGraph], diff --git 
a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/boot/MCLBootstrapManagerFactory.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/boot/MCLBootstrapManagerFactory.java index 8ad1638115dae3..8a913910b3523c 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/boot/MCLBootstrapManagerFactory.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/boot/MCLBootstrapManagerFactory.java @@ -11,7 +11,6 @@ import javax.annotation.Nonnull; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.Configuration; @@ -27,9 +26,6 @@ public class MCLBootstrapManagerFactory { @Autowired private ConfigurationProvider _configurationProvider; - @Value("${bootstrap.upgradeDefaultBrowsePaths.enabled}") - private Boolean _upgradeDefaultBrowsePathsEnabled; - @Bean(name = "mclBootstrapManager") @Scope("singleton") @Nonnull diff --git a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/boot/MCPBootstrapManagerFactory.java b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/boot/MCPBootstrapManagerFactory.java index 0220764cd99d69..5419fa48ee5bad 100644 --- a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/boot/MCPBootstrapManagerFactory.java +++ b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/boot/MCPBootstrapManagerFactory.java @@ -11,7 +11,6 @@ import javax.annotation.Nonnull; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.Configuration; @@ -27,9 +26,6 @@ public class MCPBootstrapManagerFactory { @Autowired private ConfigurationProvider _configurationProvider; - @Value("${bootstrap.upgradeDefaultBrowsePaths.enabled}") - private Boolean _upgradeDefaultBrowsePathsEnabled; - @Bean(name = "mcpBootstrapManager") @Scope("singleton") @Nonnull diff --git a/metadata-service/configuration/src/main/resources/application.yaml b/metadata-service/configuration/src/main/resources/application.yaml index 0ce0b976c976e8..5e07bfc479e93c 100644 --- a/metadata-service/configuration/src/main/resources/application.yaml +++ b/metadata-service/configuration/src/main/resources/application.yaml @@ -343,12 +343,6 @@ incidents: consumerGroupSuffix: ${INCIDENTS_HOOK_CONSUMER_GROUP_SUFFIX:} bootstrap: - upgradeDefaultBrowsePaths: - enabled: ${UPGRADE_DEFAULT_BROWSE_PATHS_ENABLED:false} # enable to run the upgrade to migrate legacy default browse paths to new ones - backfillBrowsePathsV2: - enabled: ${BACKFILL_BROWSE_PATHS_V2:false} # Enables running the backfill of browsePathsV2 upgrade step. There are concerns about the load of this step so hiding it behind a flag. 
Deprecating in favor of running through SystemUpdate - reprocessDefaultBrowsePathsV2: - enabled: ${REPROCESS_DEFAULT_BROWSE_PATHS_V2:false} # reprocess V2 browse paths which were set to the default: {"path":[{"id":"Default"}]} policies: file: ${BOOTSTRAP_POLICIES_FILE:classpath:boot/policies.json} # eg for local file diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/factories/BootstrapManagerFactory.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/factories/BootstrapManagerFactory.java index 97a009dcbbb6d4..9e29883f439a74 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/factories/BootstrapManagerFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/factories/BootstrapManagerFactory.java @@ -9,7 +9,6 @@ import com.linkedin.metadata.boot.BootstrapManager; import com.linkedin.metadata.boot.BootstrapStep; import com.linkedin.metadata.boot.dependencies.BootstrapDependency; -import com.linkedin.metadata.boot.steps.BackfillBrowsePathsV2Step; import com.linkedin.metadata.boot.steps.IndexDataPlatformsStep; import com.linkedin.metadata.boot.steps.IngestDataPlatformInstancesStep; import com.linkedin.metadata.boot.steps.IngestDataPlatformsStep; @@ -25,7 +24,6 @@ import com.linkedin.metadata.boot.steps.RestoreColumnLineageIndices; import com.linkedin.metadata.boot.steps.RestoreDbtSiblingsIndices; import com.linkedin.metadata.boot.steps.RestoreGlossaryIndices; -import com.linkedin.metadata.boot.steps.UpgradeDefaultBrowsePathsStep; import com.linkedin.metadata.boot.steps.WaitForSystemUpdateStep; import com.linkedin.metadata.entity.AspectMigrationsDao; import com.linkedin.metadata.entity.EntityService; @@ -89,12 +87,6 @@ public class BootstrapManagerFactory { @Autowired private ConfigurationProvider _configurationProvider; - @Value("${bootstrap.upgradeDefaultBrowsePaths.enabled}") - private Boolean _upgradeDefaultBrowsePathsEnabled; - - @Value("${bootstrap.backfillBrowsePathsV2.enabled}") - private Boolean _backfillBrowsePathsV2Enabled; - @Value("${bootstrap.policies.file}") private Resource _policiesResource; @@ -154,14 +146,6 @@ protected BootstrapManager createInstance( ingestDataTypesStep, ingestEntityTypesStep)); - if (_upgradeDefaultBrowsePathsEnabled) { - finalSteps.add(new UpgradeDefaultBrowsePathsStep(_entityService)); - } - - if (_backfillBrowsePathsV2Enabled) { - finalSteps.add(new BackfillBrowsePathsV2Step(_entityService, _searchService)); - } - return new BootstrapManager(finalSteps); } } diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/BackfillBrowsePathsV2Step.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/BackfillBrowsePathsV2Step.java deleted file mode 100644 index 2c00c73c96549e..00000000000000 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/BackfillBrowsePathsV2Step.java +++ /dev/null @@ -1,156 +0,0 @@ -package com.linkedin.metadata.boot.steps; - -import static com.linkedin.metadata.utils.CriterionUtils.buildExistsCriterion; -import static com.linkedin.metadata.utils.CriterionUtils.buildIsNullCriterion; -import static com.linkedin.metadata.utils.SystemMetadataUtils.createDefaultSystemMetadata; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.linkedin.common.AuditStamp; -import com.linkedin.common.BrowsePathsV2; -import com.linkedin.common.urn.Urn; -import com.linkedin.events.metadata.ChangeType; -import 
com.linkedin.metadata.Constants; -import com.linkedin.metadata.aspect.utils.DefaultAspectsUtil; -import com.linkedin.metadata.boot.UpgradeStep; -import com.linkedin.metadata.entity.EntityService; -import com.linkedin.metadata.query.filter.ConjunctiveCriterion; -import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray; -import com.linkedin.metadata.query.filter.Criterion; -import com.linkedin.metadata.query.filter.CriterionArray; -import com.linkedin.metadata.query.filter.Filter; -import com.linkedin.metadata.search.ScrollResult; -import com.linkedin.metadata.search.SearchEntity; -import com.linkedin.metadata.search.SearchService; -import com.linkedin.metadata.utils.GenericRecordUtils; -import com.linkedin.mxe.MetadataChangeProposal; -import io.datahubproject.metadata.context.OperationContext; -import java.util.Set; -import javax.annotation.Nonnull; -import lombok.extern.slf4j.Slf4j; - -@Slf4j -public class BackfillBrowsePathsV2Step extends UpgradeStep { - - private static final Set ENTITY_TYPES_TO_MIGRATE = - ImmutableSet.of( - Constants.DATASET_ENTITY_NAME, - Constants.DASHBOARD_ENTITY_NAME, - Constants.CHART_ENTITY_NAME, - Constants.DATA_JOB_ENTITY_NAME, - Constants.DATA_FLOW_ENTITY_NAME, - Constants.ML_MODEL_ENTITY_NAME, - Constants.ML_MODEL_GROUP_ENTITY_NAME, - Constants.ML_FEATURE_TABLE_ENTITY_NAME, - Constants.ML_FEATURE_ENTITY_NAME); - private static final String VERSION = "2"; - private static final String UPGRADE_ID = "backfill-default-browse-paths-v2-step"; - private static final Integer BATCH_SIZE = 5000; - - private final SearchService searchService; - - public BackfillBrowsePathsV2Step(EntityService entityService, SearchService searchService) { - super(entityService, VERSION, UPGRADE_ID); - this.searchService = searchService; - } - - @Nonnull - @Override - public ExecutionMode getExecutionMode() { - return ExecutionMode.BLOCKING; // ensure there are no write conflicts. 
- } - - @Override - public void upgrade(@Nonnull OperationContext systemOpContext) throws Exception { - final AuditStamp auditStamp = - new AuditStamp() - .setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)) - .setTime(System.currentTimeMillis()); - - String scrollId = null; - for (String entityType : ENTITY_TYPES_TO_MIGRATE) { - int migratedCount = 0; - do { - log.info( - String.format( - "Upgrading batch %s-%s of browse paths for entity type %s", - migratedCount, migratedCount + BATCH_SIZE, entityType)); - scrollId = backfillBrowsePathsV2(systemOpContext, entityType, auditStamp, scrollId); - migratedCount += BATCH_SIZE; - } while (scrollId != null); - } - } - - private String backfillBrowsePathsV2( - @Nonnull OperationContext systemOperationContext, - String entityType, - AuditStamp auditStamp, - String scrollId) - throws Exception { - - // Condition: has `browsePaths` AND does NOT have `browsePathV2` - Criterion missingBrowsePathV2 = buildIsNullCriterion("browsePathV2"); - - // Excludes entities without browsePaths - Criterion hasBrowsePathV1 = buildExistsCriterion("browsePaths"); - - CriterionArray criterionArray = new CriterionArray(); - criterionArray.add(missingBrowsePathV2); - criterionArray.add(hasBrowsePathV1); - - ConjunctiveCriterion conjunctiveCriterion = new ConjunctiveCriterion(); - conjunctiveCriterion.setAnd(criterionArray); - - ConjunctiveCriterionArray conjunctiveCriterionArray = new ConjunctiveCriterionArray(); - conjunctiveCriterionArray.add(conjunctiveCriterion); - - Filter filter = new Filter(); - filter.setOr(conjunctiveCriterionArray); - - final ScrollResult scrollResult = - searchService.scrollAcrossEntities( - systemOperationContext, - ImmutableList.of(entityType), - "*", - filter, - null, - scrollId, - "5m", - BATCH_SIZE); - if (scrollResult.getNumEntities() == 0 || scrollResult.getEntities().size() == 0) { - return null; - } - - for (SearchEntity searchEntity : scrollResult.getEntities()) { - try { - ingestBrowsePathsV2(systemOperationContext, searchEntity.getEntity(), auditStamp); - } catch (Exception e) { - // don't stop the whole step because of one bad urn or one bad ingestion - log.error( - String.format( - "Error ingesting default browsePathsV2 aspect for urn %s", - searchEntity.getEntity()), - e); - } - } - - return scrollResult.getScrollId(); - } - - private void ingestBrowsePathsV2( - @Nonnull OperationContext systemOperationContext, Urn urn, AuditStamp auditStamp) - throws Exception { - BrowsePathsV2 browsePathsV2 = - DefaultAspectsUtil.buildDefaultBrowsePathV2( - systemOperationContext, urn, true, entityService); - log.debug(String.format("Adding browse path v2 for urn %s with value %s", urn, browsePathsV2)); - MetadataChangeProposal proposal = new MetadataChangeProposal(); - proposal.setEntityUrn(urn); - proposal.setEntityType(urn.getEntityType()); - proposal.setAspectName(Constants.BROWSE_PATHS_V2_ASPECT_NAME); - proposal.setChangeType(ChangeType.UPSERT); - proposal.setSystemMetadata(createDefaultSystemMetadata()); - proposal.setAspect(GenericRecordUtils.serializeAspect(browsePathsV2)); - entityService.ingestProposal(systemOperationContext, proposal, auditStamp, false); - } -} diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/UpgradeDefaultBrowsePathsStep.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/UpgradeDefaultBrowsePathsStep.java deleted file mode 100644 index 89846476a9875e..00000000000000 --- 
a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/UpgradeDefaultBrowsePathsStep.java +++ /dev/null @@ -1,145 +0,0 @@ -package com.linkedin.metadata.boot.steps; - -import static com.linkedin.metadata.Constants.*; -import static com.linkedin.metadata.utils.SystemMetadataUtils.createDefaultSystemMetadata; - -import com.google.common.collect.ImmutableSet; -import com.linkedin.common.AuditStamp; -import com.linkedin.common.BrowsePaths; -import com.linkedin.common.urn.Urn; -import com.linkedin.data.template.RecordTemplate; -import com.linkedin.events.metadata.ChangeType; -import com.linkedin.metadata.Constants; -import com.linkedin.metadata.aspect.utils.DefaultAspectsUtil; -import com.linkedin.metadata.boot.UpgradeStep; -import com.linkedin.metadata.entity.EntityService; -import com.linkedin.metadata.entity.ListResult; -import com.linkedin.metadata.query.ExtraInfo; -import com.linkedin.metadata.search.utils.BrowsePathUtils; -import com.linkedin.metadata.utils.GenericRecordUtils; -import com.linkedin.mxe.MetadataChangeProposal; -import io.datahubproject.metadata.context.OperationContext; -import java.util.Set; -import javax.annotation.Nonnull; -import lombok.extern.slf4j.Slf4j; - -/** - * This is an opt-in optional upgrade step to migrate your browse paths to the new truncated form. - * It is idempotent, can be retried as many times as necessary. - */ -@Slf4j -public class UpgradeDefaultBrowsePathsStep extends UpgradeStep { - - private static final Set ENTITY_TYPES_TO_MIGRATE = - ImmutableSet.of( - Constants.DATASET_ENTITY_NAME, - Constants.DASHBOARD_ENTITY_NAME, - Constants.CHART_ENTITY_NAME, - Constants.DATA_JOB_ENTITY_NAME, - Constants.DATA_FLOW_ENTITY_NAME); - private static final String VERSION = "1"; - private static final String UPGRADE_ID = "upgrade-default-browse-paths-step"; - private static final Integer BATCH_SIZE = 5000; - - public UpgradeDefaultBrowsePathsStep(EntityService entityService) { - super(entityService, VERSION, UPGRADE_ID); - } - - @Override - public void upgrade(@Nonnull OperationContext systemOperationContext) throws Exception { - final AuditStamp auditStamp = - new AuditStamp() - .setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)) - .setTime(System.currentTimeMillis()); - - int total = 0; - for (String entityType : ENTITY_TYPES_TO_MIGRATE) { - int migratedCount = 0; - do { - log.info( - String.format( - "Upgrading batch %s-%s out of %s of browse paths for entity type %s", - migratedCount, migratedCount + BATCH_SIZE, total, entityType)); - total = - getAndMigrateBrowsePaths(systemOperationContext, entityType, migratedCount, auditStamp); - migratedCount += BATCH_SIZE; - } while (migratedCount < total); - } - log.info("Successfully upgraded all browse paths!"); - } - - @Nonnull - @Override - public ExecutionMode getExecutionMode() { - return ExecutionMode.BLOCKING; // ensure there are no write conflicts. - } - - private int getAndMigrateBrowsePaths( - @Nonnull OperationContext opContext, String entityType, int start, AuditStamp auditStamp) - throws Exception { - - final ListResult latestAspects = - entityService.listLatestAspects( - opContext, entityType, Constants.BROWSE_PATHS_ASPECT_NAME, start, BATCH_SIZE); - - if (latestAspects.getTotalCount() == 0 - || latestAspects.getValues() == null - || latestAspects.getMetadata() == null) { - log.debug( - String.format( - "Found 0 browse paths for entity with type %s. 
Skipping migration!", entityType)); - return 0; - } - - if (latestAspects.getValues().size() != latestAspects.getMetadata().getExtraInfos().size()) { - // Bad result -- we should log that we cannot migrate this batch of paths. - log.warn( - "Failed to match browse path aspects with corresponding urns. Found mismatched length between aspects ({})" - + "and metadata ({}) for metadata {}", - latestAspects.getValues().size(), - latestAspects.getMetadata().getExtraInfos().size(), - latestAspects.getMetadata()); - return latestAspects.getTotalCount(); - } - - for (int i = 0; i < latestAspects.getValues().size(); i++) { - - ExtraInfo info = latestAspects.getMetadata().getExtraInfos().get(i); - RecordTemplate browsePathsRec = latestAspects.getValues().get(i); - - // Assert on 2 conditions: - // 1. The latest browse path aspect contains only 1 browse path - // 2. The latest browse path matches exactly the legacy default path - - Urn urn = info.getUrn(); - BrowsePaths browsePaths = (BrowsePaths) browsePathsRec; - - log.debug(String.format("Inspecting browse path for urn %s, value %s", urn, browsePaths)); - - if (browsePaths.hasPaths() && browsePaths.getPaths().size() == 1) { - String legacyBrowsePath = - BrowsePathUtils.getLegacyDefaultBrowsePath(urn, opContext.getEntityRegistry()); - log.debug(String.format("Legacy browse path for urn %s, value %s", urn, legacyBrowsePath)); - if (legacyBrowsePath.equals(browsePaths.getPaths().get(0))) { - migrateBrowsePath(opContext, urn, auditStamp); - } - } - } - - return latestAspects.getTotalCount(); - } - - private void migrateBrowsePath( - @Nonnull OperationContext opContext, Urn urn, AuditStamp auditStamp) throws Exception { - BrowsePaths newPaths = DefaultAspectsUtil.buildDefaultBrowsePath(opContext, urn, entityService); - log.debug(String.format("Updating browse path for urn %s to value %s", urn, newPaths)); - MetadataChangeProposal proposal = new MetadataChangeProposal(); - proposal.setEntityUrn(urn); - proposal.setEntityType(urn.getEntityType()); - proposal.setAspectName(Constants.BROWSE_PATHS_ASPECT_NAME); - proposal.setChangeType(ChangeType.UPSERT); - proposal.setSystemMetadata(createDefaultSystemMetadata()); - proposal.setAspect(GenericRecordUtils.serializeAspect(newPaths)); - entityService.ingestProposal(opContext, proposal, auditStamp, false); - } -} diff --git a/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/BackfillBrowsePathsV2StepTest.java b/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/BackfillBrowsePathsV2StepTest.java deleted file mode 100644 index 54c2cd73dfe1d0..00000000000000 --- a/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/BackfillBrowsePathsV2StepTest.java +++ /dev/null @@ -1,188 +0,0 @@ -package com.linkedin.metadata.boot.steps; - -import static com.linkedin.metadata.Constants.CONTAINER_ASPECT_NAME; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; - -import com.google.common.collect.ImmutableList; -import com.linkedin.common.AuditStamp; -import com.linkedin.common.urn.Urn; -import com.linkedin.common.urn.UrnUtils; -import com.linkedin.entity.Aspect; -import com.linkedin.entity.EntityResponse; -import com.linkedin.entity.EnvelopedAspect; -import com.linkedin.entity.EnvelopedAspectMap; -import com.linkedin.metadata.Constants; -import com.linkedin.metadata.entity.EntityService; -import com.linkedin.metadata.query.filter.Filter; -import com.linkedin.metadata.search.ScrollResult; -import 
com.linkedin.metadata.search.SearchEntity; -import com.linkedin.metadata.search.SearchEntityArray; -import com.linkedin.metadata.search.SearchService; -import com.linkedin.mxe.MetadataChangeProposal; -import io.datahubproject.metadata.context.OperationContext; -import io.datahubproject.test.metadata.context.TestOperationContexts; -import java.net.URISyntaxException; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.mockito.Mockito; -import org.testng.annotations.Test; - -public class BackfillBrowsePathsV2StepTest { - private static final String VERSION = "2"; - private static final String UPGRADE_URN = - String.format( - "urn:li:%s:%s", - Constants.DATA_HUB_UPGRADE_ENTITY_NAME, "backfill-default-browse-paths-v2-step"); - - private static final String DATASET_URN = - "urn:li:dataset:(urn:li:dataPlatform:platform,name,PROD)"; - private static final String DASHBOARD_URN = "urn:li:dashboard:(airflow,id)"; - private static final String CHART_URN = "urn:li:chart:(looker,baz)"; - private static final String DATA_JOB_URN = - "urn:li:dataJob:(urn:li:dataFlow:(airflow,test,prod),test1)"; - private static final String DATA_FLOW_URN = "urn:li:dataFlow:(orchestrator,flowId,cluster)"; - private static final String ML_MODEL_URN = - "urn:li:mlModel:(urn:li:dataPlatform:sagemaker,trustmodel,PROD)"; - private static final String ML_MODEL_GROUP_URN = - "urn:li:mlModelGroup:(urn:li:dataPlatform:sagemaker,a-model-package-group,PROD)"; - private static final String ML_FEATURE_TABLE_URN = - "urn:li:mlFeatureTable:(urn:li:dataPlatform:feast,user_features)"; - private static final String ML_FEATURE_URN = "urn:li:mlFeature:(test,feature_1)"; - private static final List ENTITY_TYPES = - ImmutableList.of( - Constants.DATASET_ENTITY_NAME, - Constants.DASHBOARD_ENTITY_NAME, - Constants.CHART_ENTITY_NAME, - Constants.DATA_JOB_ENTITY_NAME, - Constants.DATA_FLOW_ENTITY_NAME, - Constants.ML_MODEL_ENTITY_NAME, - Constants.ML_MODEL_GROUP_ENTITY_NAME, - Constants.ML_FEATURE_TABLE_ENTITY_NAME, - Constants.ML_FEATURE_ENTITY_NAME); - private static final List ENTITY_URNS = - ImmutableList.of( - UrnUtils.getUrn(DATASET_URN), - UrnUtils.getUrn(DASHBOARD_URN), - UrnUtils.getUrn(CHART_URN), - UrnUtils.getUrn(DATA_JOB_URN), - UrnUtils.getUrn(DATA_FLOW_URN), - UrnUtils.getUrn(ML_MODEL_URN), - UrnUtils.getUrn(ML_MODEL_GROUP_URN), - UrnUtils.getUrn(ML_FEATURE_TABLE_URN), - UrnUtils.getUrn(ML_FEATURE_URN)); - - @Test - public void testExecuteNoExistingBrowsePaths() throws Exception { - final EntityService mockService = initMockService(); - final SearchService mockSearchService = initMockSearchService(); - - final Urn upgradeEntityUrn = Urn.createFromString(UPGRADE_URN); - Mockito.when( - mockService.getEntityV2( - any(OperationContext.class), - Mockito.eq(Constants.DATA_HUB_UPGRADE_ENTITY_NAME), - Mockito.eq(upgradeEntityUrn), - Mockito.eq(Collections.singleton(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME)))) - .thenReturn(null); - - BackfillBrowsePathsV2Step backfillBrowsePathsV2Step = - new BackfillBrowsePathsV2Step(mockService, mockSearchService); - backfillBrowsePathsV2Step.execute(TestOperationContexts.systemContextNoSearchAuthorization()); - - Mockito.verify(mockSearchService, Mockito.times(9)) - .scrollAcrossEntities( - any(OperationContext.class), - any(), - Mockito.eq("*"), - any(Filter.class), - Mockito.eq(null), - Mockito.eq(null), - Mockito.eq("5m"), - Mockito.eq(5000)); - // Verify that 11 aspects are ingested, 2 for the upgrade request / result, 9 for 
ingesting 1 of - // each entity type - Mockito.verify(mockService, Mockito.times(11)) - .ingestProposal( - any(OperationContext.class), - any(MetadataChangeProposal.class), - any(), - Mockito.eq(false)); - } - - @Test - public void testDoesNotRunWhenAlreadyExecuted() throws Exception { - final EntityService mockService = mock(EntityService.class); - final SearchService mockSearchService = initMockSearchService(); - - final Urn upgradeEntityUrn = Urn.createFromString(UPGRADE_URN); - com.linkedin.upgrade.DataHubUpgradeRequest upgradeRequest = - new com.linkedin.upgrade.DataHubUpgradeRequest().setVersion(VERSION); - Map upgradeRequestAspects = new HashMap<>(); - upgradeRequestAspects.put( - Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME, - new EnvelopedAspect().setValue(new Aspect(upgradeRequest.data()))); - EntityResponse response = - new EntityResponse().setAspects(new EnvelopedAspectMap(upgradeRequestAspects)); - Mockito.when( - mockService.getEntityV2( - any(OperationContext.class), - Mockito.eq(Constants.DATA_HUB_UPGRADE_ENTITY_NAME), - Mockito.eq(upgradeEntityUrn), - Mockito.eq(Collections.singleton(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME)))) - .thenReturn(response); - - BackfillBrowsePathsV2Step backfillBrowsePathsV2Step = - new BackfillBrowsePathsV2Step(mockService, mockSearchService); - backfillBrowsePathsV2Step.execute(mock(OperationContext.class)); - - Mockito.verify(mockService, Mockito.times(0)) - .ingestProposal( - any(OperationContext.class), - any(MetadataChangeProposal.class), - any(AuditStamp.class), - Mockito.anyBoolean()); - } - - private EntityService initMockService() throws URISyntaxException { - final EntityService mockService = mock(EntityService.class); - - for (int i = 0; i < ENTITY_TYPES.size(); i++) { - Mockito.when( - mockService.getEntityV2( - any(OperationContext.class), - any(), - Mockito.eq(ENTITY_URNS.get(i)), - Mockito.eq(Collections.singleton(CONTAINER_ASPECT_NAME)))) - .thenReturn(null); - } - - return mockService; - } - - private SearchService initMockSearchService() { - final SearchService mockSearchService = mock(SearchService.class); - - for (int i = 0; i < ENTITY_TYPES.size(); i++) { - Mockito.when( - mockSearchService.scrollAcrossEntities( - Mockito.any(OperationContext.class), - Mockito.eq(ImmutableList.of(ENTITY_TYPES.get(i))), - Mockito.eq("*"), - any(Filter.class), - Mockito.eq(null), - Mockito.eq(null), - Mockito.eq("5m"), - Mockito.eq(5000))) - .thenReturn( - new ScrollResult() - .setNumEntities(1) - .setEntities( - new SearchEntityArray(new SearchEntity().setEntity(ENTITY_URNS.get(i))))); - } - - return mockSearchService; - } -} diff --git a/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/IngestEntityTypesStepTest.java b/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/IngestEntityTypesStepTest.java index a93bd8073a5530..ab76f171aa7b11 100644 --- a/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/IngestEntityTypesStepTest.java +++ b/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/IngestEntityTypesStepTest.java @@ -17,10 +17,18 @@ import io.datahubproject.test.metadata.context.TestOperationContexts; import org.jetbrains.annotations.NotNull; import org.mockito.Mockito; +import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; public class IngestEntityTypesStepTest { + @BeforeTest + public static void beforeTest() { + PathSpecBasedSchemaAnnotationVisitor.class + .getClassLoader() + 
.setClassAssertionStatus(PathSpecBasedSchemaAnnotationVisitor.class.getName(), false); + } + @Test public void testExecuteTestEntityRegistry() throws Exception { EntityRegistry testEntityRegistry = getTestEntityRegistry(); diff --git a/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/UpgradeDefaultBrowsePathsStepTest.java b/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/UpgradeDefaultBrowsePathsStepTest.java deleted file mode 100644 index 08f3dba12bd2a6..00000000000000 --- a/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/UpgradeDefaultBrowsePathsStepTest.java +++ /dev/null @@ -1,406 +0,0 @@ -package com.linkedin.metadata.boot.steps; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import com.google.common.collect.ImmutableList; -import com.linkedin.common.AuditStamp; -import com.linkedin.common.BrowsePaths; -import com.linkedin.common.urn.Urn; -import com.linkedin.common.urn.UrnUtils; -import com.linkedin.data.template.RecordTemplate; -import com.linkedin.data.template.StringArray; -import com.linkedin.entity.Aspect; -import com.linkedin.entity.EntityResponse; -import com.linkedin.entity.EnvelopedAspect; -import com.linkedin.entity.EnvelopedAspectMap; -import com.linkedin.metadata.Constants; -import com.linkedin.metadata.aspect.patch.template.AspectTemplateEngine; -import com.linkedin.metadata.entity.EntityService; -import com.linkedin.metadata.entity.ListResult; -import com.linkedin.metadata.models.AspectSpec; -import com.linkedin.metadata.models.EntitySpec; -import com.linkedin.metadata.models.EntitySpecBuilder; -import com.linkedin.metadata.models.EventSpec; -import com.linkedin.metadata.models.registry.EntityRegistry; -import com.linkedin.metadata.query.ExtraInfo; -import com.linkedin.metadata.query.ExtraInfoArray; -import com.linkedin.metadata.query.ListResultMetadata; -import com.linkedin.metadata.search.utils.BrowsePathUtils; -import com.linkedin.metadata.snapshot.Snapshot; -import com.linkedin.mxe.MetadataChangeProposal; -import io.datahubproject.metadata.context.OperationContext; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import org.jetbrains.annotations.NotNull; -import org.mockito.Mockito; -import org.testng.annotations.Test; - -public class UpgradeDefaultBrowsePathsStepTest { - - private static final String VERSION_1 = "1"; - private static final String UPGRADE_URN = - String.format( - "urn:li:%s:%s", - Constants.DATA_HUB_UPGRADE_ENTITY_NAME, "upgrade-default-browse-paths-step"); - - @Test - public void testExecuteNoExistingBrowsePaths() throws Exception { - - final EntityService mockService = Mockito.mock(EntityService.class); - final EntityRegistry registry = new TestEntityRegistry(); - final OperationContext mockContext = mock(OperationContext.class); - when(mockContext.getEntityRegistry()).thenReturn(registry); - - final Urn upgradeEntityUrn = Urn.createFromString(UPGRADE_URN); - Mockito.when( - mockService.getEntityV2( - any(OperationContext.class), - Mockito.eq(Constants.DATA_HUB_UPGRADE_ENTITY_NAME), - Mockito.eq(upgradeEntityUrn), - Mockito.eq(Collections.singleton(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME)))) - .thenReturn(null); - - final List browsePaths1 = Collections.emptyList(); - - Mockito.when( - mockService.listLatestAspects( - 
any(OperationContext.class), - Mockito.eq(Constants.DATASET_ENTITY_NAME), - Mockito.eq(Constants.BROWSE_PATHS_ASPECT_NAME), - Mockito.eq(0), - Mockito.eq(5000))) - .thenReturn( - new ListResult<>( - browsePaths1, - new ListResultMetadata().setExtraInfos(new ExtraInfoArray(Collections.emptyList())), - 0, - false, - 0, - 0, - 2)); - initMockServiceOtherEntities(mockService); - - UpgradeDefaultBrowsePathsStep upgradeDefaultBrowsePathsStep = - new UpgradeDefaultBrowsePathsStep(mockService); - upgradeDefaultBrowsePathsStep.execute(mockContext); - - Mockito.verify(mockService, Mockito.times(1)) - .listLatestAspects( - any(OperationContext.class), - Mockito.eq(Constants.DATASET_ENTITY_NAME), - Mockito.eq(Constants.BROWSE_PATHS_ASPECT_NAME), - Mockito.eq(0), - Mockito.eq(5000)); - // Verify that 4 aspects are ingested, 2 for the upgrade request / result, but none for - // ingesting - Mockito.verify(mockService, Mockito.times(2)) - .ingestProposal( - any(OperationContext.class), - any(MetadataChangeProposal.class), - any(), - Mockito.eq(false)); - } - - @Test - public void testExecuteFirstTime() throws Exception { - - Urn testUrn1 = - UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset1,PROD)"); - Urn testUrn2 = - UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset2,PROD)"); - - final EntityService mockService = Mockito.mock(EntityService.class); - final EntityRegistry registry = new TestEntityRegistry(); - final OperationContext mockContext = mock(OperationContext.class); - when(mockContext.getEntityRegistry()).thenReturn(registry); - - final Urn upgradeEntityUrn = Urn.createFromString(UPGRADE_URN); - Mockito.when( - mockService.getEntityV2( - any(OperationContext.class), - Mockito.eq(Constants.DATA_HUB_UPGRADE_ENTITY_NAME), - Mockito.eq(upgradeEntityUrn), - Mockito.eq(Collections.singleton(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME)))) - .thenReturn(null); - final List browsePaths1 = - ImmutableList.of( - new BrowsePaths() - .setPaths( - new StringArray( - ImmutableList.of( - BrowsePathUtils.getLegacyDefaultBrowsePath(testUrn1, registry)))), - new BrowsePaths() - .setPaths( - new StringArray( - ImmutableList.of( - BrowsePathUtils.getLegacyDefaultBrowsePath(testUrn2, registry))))); - - final List extraInfos1 = - ImmutableList.of( - new ExtraInfo() - .setUrn(testUrn1) - .setVersion(0L) - .setAudit( - new AuditStamp().setActor(UrnUtils.getUrn("urn:li:corpuser:test")).setTime(0L)), - new ExtraInfo() - .setUrn(testUrn2) - .setVersion(0L) - .setAudit( - new AuditStamp() - .setActor(UrnUtils.getUrn("urn:li:corpuser:test")) - .setTime(0L))); - - Mockito.when( - mockService.listLatestAspects( - any(OperationContext.class), - Mockito.eq(Constants.DATASET_ENTITY_NAME), - Mockito.eq(Constants.BROWSE_PATHS_ASPECT_NAME), - Mockito.eq(0), - Mockito.eq(5000))) - .thenReturn( - new ListResult<>( - browsePaths1, - new ListResultMetadata().setExtraInfos(new ExtraInfoArray(extraInfos1)), - 2, - false, - 2, - 2, - 2)); - initMockServiceOtherEntities(mockService); - - UpgradeDefaultBrowsePathsStep upgradeDefaultBrowsePathsStep = - new UpgradeDefaultBrowsePathsStep(mockService); - upgradeDefaultBrowsePathsStep.execute(mockContext); - - Mockito.verify(mockService, Mockito.times(1)) - .listLatestAspects( - any(OperationContext.class), - Mockito.eq(Constants.DATASET_ENTITY_NAME), - Mockito.eq(Constants.BROWSE_PATHS_ASPECT_NAME), - Mockito.eq(0), - Mockito.eq(5000)); - // Verify that 4 aspects are ingested, 2 for the upgrade request / result and 2 for the browse - // 
pahts - Mockito.verify(mockService, Mockito.times(4)) - .ingestProposal( - any(OperationContext.class), - any(MetadataChangeProposal.class), - any(), - Mockito.eq(false)); - } - - @Test - public void testDoesNotRunWhenBrowsePathIsNotQualified() throws Exception { - // Test for browse paths that are not ingested - Urn testUrn3 = - UrnUtils.getUrn( - "urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset3,PROD)"); // Do not - // migrate - Urn testUrn4 = - UrnUtils.getUrn( - "urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset4,PROD)"); // Do not - // migrate - - final EntityService mockService = Mockito.mock(EntityService.class); - final EntityRegistry registry = new TestEntityRegistry(); - final OperationContext mockContext = mock(OperationContext.class); - when(mockContext.getEntityRegistry()).thenReturn(registry); - - final Urn upgradeEntityUrn = Urn.createFromString(UPGRADE_URN); - Mockito.when( - mockService.getEntityV2( - any(OperationContext.class), - Mockito.eq(Constants.DATA_HUB_UPGRADE_ENTITY_NAME), - Mockito.eq(upgradeEntityUrn), - Mockito.eq(Collections.singleton(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME)))) - .thenReturn(null); - - final List browsePaths2 = - ImmutableList.of( - new BrowsePaths() - .setPaths( - new StringArray( - ImmutableList.of( - BrowsePathUtils.getDefaultBrowsePath(testUrn3, registry, '.')))), - new BrowsePaths() - .setPaths( - new StringArray( - ImmutableList.of( - BrowsePathUtils.getLegacyDefaultBrowsePath(testUrn4, registry), - BrowsePathUtils.getDefaultBrowsePath(testUrn4, registry, '.'))))); - - final List extraInfos2 = - ImmutableList.of( - new ExtraInfo() - .setUrn(testUrn3) - .setVersion(0L) - .setAudit( - new AuditStamp().setActor(UrnUtils.getUrn("urn:li:corpuser:test")).setTime(0L)), - new ExtraInfo() - .setUrn(testUrn4) - .setVersion(0L) - .setAudit( - new AuditStamp() - .setActor(UrnUtils.getUrn("urn:li:corpuser:test")) - .setTime(0L))); - - Mockito.when( - mockService.listLatestAspects( - any(OperationContext.class), - Mockito.eq(Constants.DATASET_ENTITY_NAME), - Mockito.eq(Constants.BROWSE_PATHS_ASPECT_NAME), - Mockito.eq(0), - Mockito.eq(5000))) - .thenReturn( - new ListResult<>( - browsePaths2, - new ListResultMetadata().setExtraInfos(new ExtraInfoArray(extraInfos2)), - 2, - false, - 2, - 2, - 2)); - initMockServiceOtherEntities(mockService); - - UpgradeDefaultBrowsePathsStep upgradeDefaultBrowsePathsStep = - new UpgradeDefaultBrowsePathsStep(mockService); - upgradeDefaultBrowsePathsStep.execute(mockContext); - - Mockito.verify(mockService, Mockito.times(1)) - .listLatestAspects( - any(OperationContext.class), - Mockito.eq(Constants.DATASET_ENTITY_NAME), - Mockito.eq(Constants.BROWSE_PATHS_ASPECT_NAME), - Mockito.eq(0), - Mockito.eq(5000)); - // Verify that 2 aspects are ingested, only those for the upgrade step - Mockito.verify(mockService, Mockito.times(2)) - .ingestProposal( - any(OperationContext.class), - any(MetadataChangeProposal.class), - any(), - Mockito.eq(false)); - } - - @Test - public void testDoesNotRunWhenAlreadyExecuted() throws Exception { - final EntityService mockService = Mockito.mock(EntityService.class); - final OperationContext mockContext = mock(OperationContext.class); - when(mockContext.getEntityRegistry()).thenReturn(mock(EntityRegistry.class)); - - final Urn upgradeEntityUrn = Urn.createFromString(UPGRADE_URN); - com.linkedin.upgrade.DataHubUpgradeRequest upgradeRequest = - new com.linkedin.upgrade.DataHubUpgradeRequest().setVersion(VERSION_1); - Map upgradeRequestAspects = new HashMap<>(); - 
upgradeRequestAspects.put( - Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME, - new EnvelopedAspect().setValue(new Aspect(upgradeRequest.data()))); - EntityResponse response = - new EntityResponse().setAspects(new EnvelopedAspectMap(upgradeRequestAspects)); - Mockito.when( - mockService.getEntityV2( - any(OperationContext.class), - Mockito.eq(Constants.DATA_HUB_UPGRADE_ENTITY_NAME), - Mockito.eq(upgradeEntityUrn), - Mockito.eq(Collections.singleton(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME)))) - .thenReturn(response); - - UpgradeDefaultBrowsePathsStep step = new UpgradeDefaultBrowsePathsStep(mockService); - step.execute(mockContext); - - Mockito.verify(mockService, Mockito.times(0)) - .ingestProposal( - any(OperationContext.class), - any(MetadataChangeProposal.class), - any(AuditStamp.class), - Mockito.anyBoolean()); - } - - private void initMockServiceOtherEntities(EntityService mockService) { - List skippedEntityTypes = - ImmutableList.of( - Constants.DASHBOARD_ENTITY_NAME, - Constants.CHART_ENTITY_NAME, - Constants.DATA_FLOW_ENTITY_NAME, - Constants.DATA_JOB_ENTITY_NAME); - for (String entityType : skippedEntityTypes) { - Mockito.when( - mockService.listLatestAspects( - any(OperationContext.class), - Mockito.eq(entityType), - Mockito.eq(Constants.BROWSE_PATHS_ASPECT_NAME), - Mockito.eq(0), - Mockito.eq(5000))) - .thenReturn( - new ListResult<>( - Collections.emptyList(), - new ListResultMetadata() - .setExtraInfos(new ExtraInfoArray(Collections.emptyList())), - 0, - false, - 0, - 0, - 0)); - } - } - - public static class TestEntityRegistry implements EntityRegistry { - - private final Map entityNameToSpec; - - public TestEntityRegistry() { - entityNameToSpec = - new EntitySpecBuilder(EntitySpecBuilder.AnnotationExtractionMode.IGNORE_ASPECT_FIELDS) - .buildEntitySpecs(new Snapshot().schema()).stream() - .collect(Collectors.toMap(spec -> spec.getName().toLowerCase(), spec -> spec)); - } - - @Nonnull - @Override - public EntitySpec getEntitySpec(@Nonnull final String entityName) { - String lowercaseEntityName = entityName.toLowerCase(); - if (!entityNameToSpec.containsKey(lowercaseEntityName)) { - throw new IllegalArgumentException( - String.format("Failed to find entity with name %s in EntityRegistry", entityName)); - } - return entityNameToSpec.get(lowercaseEntityName); - } - - @Nullable - @Override - public EventSpec getEventSpec(@Nonnull String eventName) { - return null; - } - - @Nonnull - @Override - public Map getEntitySpecs() { - return entityNameToSpec; - } - - @NotNull - @Override - public Map getAspectSpecs() { - return new HashMap<>(); - } - - @Nonnull - @Override - public Map getEventSpecs() { - return Collections.emptyMap(); - } - - @NotNull - @Override - public AspectTemplateEngine getAspectTemplateEngine() { - return new AspectTemplateEngine(); - } - } -}
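Note on the `is_container` flag exercised by the transformer tests above: when it is enabled, the pattern transformers (`PatternAddDatasetOwnership`, `PatternAddDatasetDomain`) look up each dataset's `BrowsePathsV2` aspect through the pipeline's graph client and emit extra aspects for the parent container urns found on that path. The following is a minimal, illustrative sketch of programmatic use, mirroring the test setup in this diff; the module path for the transformer import, the run id, the pattern rule, and the domain name are assumptions for illustration, not part of the change itself.

import datahub.emitter.mce_builder as builder
from datahub.ingestion.api.common import PipelineContext
# Assumed module path for the transformer class used in the tests above.
from datahub.ingestion.transformer.add_dataset_domain import PatternAddDatasetDomain

ctx = PipelineContext(run_id="container-domain-example")
# ctx.graph must be set to a real DataHubGraph before running: the transformer calls
# ctx.graph.get_aspect(entity_urn, BrowsePathsV2Class) to discover parent containers,
# so container handling is skipped (or asserts) without a graph client attached.

transformer = PatternAddDatasetDomain.create(
    {
        "domain_pattern": {
            # Placeholder rule: any dataset urn matching the regex gets this domain.
            "rules": {".*example1.*": [builder.make_domain_urn("acryl_domain")]}
        },
        "is_container": True,  # also emit DomainsClass MCPs for parent containers
    },
    ctx,
)

The same `is_container: true` option applies to `PatternAddDatasetOwnership`, which aggregates the matched owners per container in the same way the tests above verify.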