From 7a519ac73c1abddf8695134f1cf6308c215a6c4e Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Wed, 22 May 2024 12:44:54 -0700 Subject: [PATCH 1/6] fix(ingest/dbt): resolve more dbt ephemeral node lineage gaps (#10553) --- .../ingestion/source/dbt/dbt_common.py | 142 +++++++++++------- .../datahub/sql_parsing/sqlglot_lineage.py | 20 ++- .../src/datahub/sql_parsing/sqlglot_utils.py | 2 + .../integration/powerbi/golden_test_cll.json | 25 +++ 4 files changed, 127 insertions(+), 62 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py index 3b686ef60de290..0996f76fc2799b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py @@ -142,11 +142,17 @@ @dataclass class DBTSourceReport(StaleEntityRemovalSourceReport): - sql_statements_parsed: int = 0 - sql_statements_table_error: int = 0 - sql_statements_column_error: int = 0 - sql_parser_detach_ctes_failures: LossyList[str] = field(default_factory=LossyList) sql_parser_skipped_missing_code: LossyList[str] = field(default_factory=LossyList) + sql_parser_parse_failures: int = 0 + sql_parser_detach_ctes_failures: int = 0 + sql_parser_table_errors: int = 0 + sql_parser_column_errors: int = 0 + sql_parser_successes: int = 0 + + sql_parser_parse_failures_list: LossyList[str] = field(default_factory=LossyList) + sql_parser_detach_ctes_failures_list: LossyList[str] = field( + default_factory=LossyList + ) in_manifest_but_missing_catalog: LossyList[str] = field(default_factory=LossyList) @@ -558,10 +564,11 @@ def get_fake_ephemeral_table_name(self) -> str: assert self.is_ephemeral_model() # Similar to get_db_fqn. - fqn = self._join_parts( + db_fqn = self._join_parts( [self.database, self.schema, f"__datahub__dbt__ephemeral__{self.name}"] ) - return fqn.replace('"', "") + db_fqn = db_fqn.lower() + return db_fqn.replace('"', "") def get_urn_for_upstream_lineage( self, @@ -819,9 +826,10 @@ def get_column_type( # if still not found, report the warning if TypeClass is None: - report.report_warning( - dataset_name, f"unable to map type {column_type} to metadata schema" - ) + if column_type: + report.report_warning( + dataset_name, f"unable to map type {column_type} to metadata schema" + ) TypeClass = NullTypeClass return SchemaFieldDataType(type=TypeClass()) @@ -1041,7 +1049,7 @@ def _infer_schemas_and_update_cll( # noqa: C901 # Iterate over the dbt nodes in topological order. # This ensures that we process upstream nodes before downstream nodes. - for dbt_name in topological_sort( + node_order = topological_sort( list(all_nodes_map.keys()), edges=list( (upstream, node.dbt_name) @@ -1049,7 +1057,8 @@ def _infer_schemas_and_update_cll( # noqa: C901 for upstream in node.upstream_nodes if upstream in all_nodes_map ), - ): + ) + for dbt_name in node_order: node = all_nodes_map[dbt_name] logger.debug(f"Processing CLL/schemas for {node.dbt_name}") @@ -1119,55 +1128,26 @@ def _infer_schemas_and_update_cll( # noqa: C901 # Run sql parser to infer the schema + generate column lineage. sql_result = None - if node.node_type in {"source", "test"}: + if node.node_type in {"source", "test", "seed"}: # For sources, we generate CLL as a 1:1 mapping. - # We don't support CLL for tests (assertions). + # We don't support CLL for tests (assertions) or seeds. pass elif node.compiled_code: - try: - # Add CTE stops based on the upstreams list. - cte_mapping = { - cte_name: upstream_node.get_fake_ephemeral_table_name() - for upstream_node in [ - all_nodes_map[upstream_node_name] - for upstream_node_name in node.upstream_nodes - if upstream_node_name in all_nodes_map - ] - if upstream_node.is_ephemeral_model() - for cte_name in _get_dbt_cte_names( - upstream_node.name, schema_resolver.platform - ) - } - preprocessed_sql = detach_ctes( - parse_statements_and_pick( - node.compiled_code, - platform=schema_resolver.platform, - ), - platform=schema_resolver.platform, - cte_mapping=cte_mapping, - ) - except Exception as e: - self.report.sql_parser_detach_ctes_failures.append(node.dbt_name) - logger.debug( - f"Failed to detach CTEs from compiled code. {node.dbt_name} will not have column lineage." - ) - sql_result = SqlParsingResult.make_from_error(e) - else: - sql_result = sqlglot_lineage( - preprocessed_sql, schema_resolver=schema_resolver + # Add CTE stops based on the upstreams list. + cte_mapping = { + cte_name: upstream_node.get_fake_ephemeral_table_name() + for upstream_node in [ + all_nodes_map[upstream_node_name] + for upstream_node_name in node.upstream_nodes + if upstream_node_name in all_nodes_map + ] + if upstream_node.is_ephemeral_model() + for cte_name in _get_dbt_cte_names( + upstream_node.name, schema_resolver.platform ) - if sql_result.debug_info.error: - self.report.sql_statements_table_error += 1 - logger.info( - f"Failed to parse compiled code for {node.dbt_name}: {sql_result.debug_info.error}" - ) - elif sql_result.debug_info.column_error: - self.report.sql_statements_column_error += 1 - logger.info( - f"Failed to generate CLL for {node.dbt_name}: {sql_result.debug_info.column_error}" - ) - else: - self.report.sql_statements_parsed += 1 + } + + sql_result = self._parse_cll(node, cte_mapping, schema_resolver) else: self.report.sql_parser_skipped_missing_code.append(node.dbt_name) @@ -1212,6 +1192,56 @@ def _infer_schemas_and_update_cll( # noqa: C901 if inferred_schema_fields: node.columns_setdefault(inferred_schema_fields) + def _parse_cll( + self, + node: DBTNode, + cte_mapping: Dict[str, str], + schema_resolver: SchemaResolver, + ) -> SqlParsingResult: + assert node.compiled_code is not None + + try: + picked_statement = parse_statements_and_pick( + node.compiled_code, + platform=schema_resolver.platform, + ) + except Exception as e: + logger.debug( + f"Failed to parse compiled code. {node.dbt_name} will not have column lineage." + ) + self.report.sql_parser_parse_failures += 1 + self.report.sql_parser_parse_failures_list.append(node.dbt_name) + return SqlParsingResult.make_from_error(e) + + try: + preprocessed_sql = detach_ctes( + picked_statement, + platform=schema_resolver.platform, + cte_mapping=cte_mapping, + ) + except Exception as e: + self.report.sql_parser_detach_ctes_failures += 1 + self.report.sql_parser_detach_ctes_failures_list.append(node.dbt_name) + logger.debug( + f"Failed to detach CTEs from compiled code. {node.dbt_name} will not have column lineage." + ) + return SqlParsingResult.make_from_error(e) + + sql_result = sqlglot_lineage(preprocessed_sql, schema_resolver=schema_resolver) + if sql_result.debug_info.table_error: + self.report.sql_parser_table_errors += 1 + logger.info( + f"Failed to generate any CLL lineage for {node.dbt_name}: {sql_result.debug_info.error}" + ) + elif sql_result.debug_info.column_error: + self.report.sql_parser_column_errors += 1 + logger.info( + f"Failed to generate CLL for {node.dbt_name}: {sql_result.debug_info.column_error}" + ) + else: + self.report.sql_parser_successes += 1 + return sql_result + def create_dbt_platform_mces( self, dbt_nodes: List[DBTNode], diff --git a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py index c112f5b74ac510..ae152a6c58b156 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py @@ -365,7 +365,7 @@ def _column_level_lineage( # noqa: C901 col_normalized = col table_schema_normalized_mapping[table][col_normalized] = col - normalized_table_schema[col_normalized] = col_type + normalized_table_schema[col_normalized] = col_type or "UNKNOWN" sqlglot_db_schema.add_table( table.as_sqlglot_table(), @@ -923,12 +923,20 @@ def _sqlglot_lineage_inner( out_urns = sorted({table_name_urn_mapping[table] for table in modified}) column_lineage_urns = None if column_lineage: - column_lineage_urns = [ - _translate_internal_column_lineage( - table_name_urn_mapping, internal_col_lineage, dialect=dialect + try: + column_lineage_urns = [ + _translate_internal_column_lineage( + table_name_urn_mapping, internal_col_lineage, dialect=dialect + ) + for internal_col_lineage in column_lineage + ] + except KeyError as e: + # When this happens, it's usually because of things like PIVOT where we can't + # really go up the scope chain. + logger.debug( + f"Failed to translate column lineage to urns: {e}", exc_info=True ) - for internal_col_lineage in column_lineage - ] + debug_info.column_error = e query_type, query_type_props = get_query_type_of_sql( original_statement, dialect=dialect diff --git a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_utils.py b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_utils.py index dfb3b8925dccaa..ddab26b28ec4f7 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_utils.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_utils.py @@ -81,6 +81,8 @@ def parse_statement( def parse_statements_and_pick(sql: str, platform: DialectOrStr) -> sqlglot.Expression: + logger.debug("Parsing SQL query: %s", sql) + dialect = get_dialect(platform) statements = [ expression for expression in sqlglot.parse(sql, dialect=dialect) if expression diff --git a/metadata-ingestion/tests/integration/powerbi/golden_test_cll.json b/metadata-ingestion/tests/integration/powerbi/golden_test_cll.json index 15f526406c4382..4a08c6658c3950 100644 --- a/metadata-ingestion/tests/integration/powerbi/golden_test_cll.json +++ b/metadata-ingestion/tests/integration/powerbi/golden_test_cll.json @@ -907,6 +907,31 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:powerbi,hr_pbi_test.ms_sql_native_table,DEV)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:mssql,commopsdb.dbo.v_ps_cd_retention,PROD)", + "type": "TRANSFORMED" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "corpuser", "entityUrn": "urn:li:corpuser:users.User1@foo.com", From 4f33f3ac3df86df98881e4d54fef74a34713a4eb Mon Sep 17 00:00:00 2001 From: Chris Collins Date: Wed, 22 May 2024 16:31:32 -0400 Subject: [PATCH 2/6] fix(ui) Fix preventing users from deleting personal views (#10510) --- datahub-web-react/src/app/entity/view/menu/ViewDropdownMenu.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datahub-web-react/src/app/entity/view/menu/ViewDropdownMenu.tsx b/datahub-web-react/src/app/entity/view/menu/ViewDropdownMenu.tsx index 1c3183d87e7d98..38deb4052b7a93 100644 --- a/datahub-web-react/src/app/entity/view/menu/ViewDropdownMenu.tsx +++ b/datahub-web-react/src/app/entity/view/menu/ViewDropdownMenu.tsx @@ -63,7 +63,7 @@ type Props = { export const ViewDropdownMenu = ({ view, visible, - isOwnedByUser, + isOwnedByUser = view.viewType === DataHubViewType.Personal, trigger = 'hover', onClickEdit, onClickPreview, From dca296ab307d4e0aed0b7f0e3e4cc896282a8d4a Mon Sep 17 00:00:00 2001 From: david-leifker <114954101+david-leifker@users.noreply.github.com> Date: Wed, 22 May 2024 16:02:17 -0500 Subject: [PATCH 3/6] fix(lint): fix linting (#10572) --- .../src/main/javaPegasus/com/linkedin/common/urn/UrnUtils.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/li-utils/src/main/javaPegasus/com/linkedin/common/urn/UrnUtils.java b/li-utils/src/main/javaPegasus/com/linkedin/common/urn/UrnUtils.java index 3d97e0d5c99189..89f0cd8fbc9791 100644 --- a/li-utils/src/main/javaPegasus/com/linkedin/common/urn/UrnUtils.java +++ b/li-utils/src/main/javaPegasus/com/linkedin/common/urn/UrnUtils.java @@ -23,7 +23,8 @@ private UrnUtils() {} @Nonnull public static DatasetUrn toDatasetUrn( @Nonnull String platformName, @Nonnull String datasetName, @Nonnull String origin) { - return new DatasetUrn(new DataPlatformUrn(platformName), datasetName, FabricType.valueOf(origin.toUpperCase())); + return new DatasetUrn( + new DataPlatformUrn(platformName), datasetName, FabricType.valueOf(origin.toUpperCase())); } /** From 94af2492511b3d28852d68e4fc4f100d5ecbf445 Mon Sep 17 00:00:00 2001 From: david-leifker <114954101+david-leifker@users.noreply.github.com> Date: Thu, 23 May 2024 07:32:22 -0500 Subject: [PATCH 4/6] build(jar): enable custom plugin lib (#10552) --- metadata-integration/java/custom-plugin-lib/build.gradle | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/metadata-integration/java/custom-plugin-lib/build.gradle b/metadata-integration/java/custom-plugin-lib/build.gradle index 9fbe1066706be3..08bbe587b49c0c 100644 --- a/metadata-integration/java/custom-plugin-lib/build.gradle +++ b/metadata-integration/java/custom-plugin-lib/build.gradle @@ -86,7 +86,7 @@ publishing { } repositories { -/* maven { + maven { def releasesRepoUrl = "https://s01.oss.sonatype.org/service/local/staging/deploy/maven2/" def snapshotsRepoUrl = "https://s01.oss.sonatype.org/content/repositories/snapshots/" def ossrhUsername = System.getenv('RELEASE_USERNAME') @@ -96,7 +96,7 @@ publishing { password ossrhPassword } url = version.endsWith('SNAPSHOT') ? snapshotsRepoUrl : releasesRepoUrl - }*/ + } } } From 92780e607c337e86b479ca30b2af9a8b408dc5f8 Mon Sep 17 00:00:00 2001 From: dushayntAW <158567391+dushayntAW@users.noreply.github.com> Date: Thu, 23 May 2024 15:04:56 +0200 Subject: [PATCH 5/6] fix(ingest/unity-catalog) upstream lineage for hive_metastore external table with s3 location (#10546) --- .../datahub/ingestion/source/unity/source.py | 9 +- .../unity/test_unity_catalog_ingest.py | 30 + .../unity/unity_catalog_mces_golden.json | 709 +++++++++++++++++- 3 files changed, 743 insertions(+), 5 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py index f1f0b5ddb44755..42ca9af7e8459a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py @@ -45,6 +45,7 @@ create_dataset_props_patch_builder, ) from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.ingestion.source.aws import s3_util from datahub.ingestion.source.aws.s3_util import ( make_s3_urn_for_lineage, strip_s3_prefix, @@ -512,14 +513,16 @@ def process_table(self, table: Table, schema: Schema) -> Iterable[MetadataWorkUn if table.view_definition: self.view_definitions[dataset_urn] = (table.ref, table.view_definition) - # generate sibling and lineage aspects in case of EXTERNAL DELTA TABLE if ( - table_props.customProperties.get("table_type") == "EXTERNAL" + table_props.customProperties.get("table_type") + in {"EXTERNAL", "HIVE_EXTERNAL_TABLE"} and table_props.customProperties.get("data_source_format") == "DELTA" and self.config.emit_siblings ): storage_location = str(table_props.customProperties.get("storage_location")) - if storage_location.startswith("s3://"): + if any( + storage_location.startswith(prefix) for prefix in s3_util.S3_PREFIXES + ): browse_path = strip_s3_prefix(storage_location) source_dataset_urn = make_dataset_urn_with_platform_instance( "delta-lake", diff --git a/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py b/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py index 59e7f582da5e84..22a48efdec41d7 100644 --- a/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py +++ b/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py @@ -351,6 +351,35 @@ def mock_hive_sql(query): "", ), ] + elif query == "DESCRIBE EXTENDED `bronze_kambi`.`external_metastore`": + return [ + ("betStatusId", "bigint", None), + ("channelId", "bigint", None), + ( + "combination", + "struct>,eventId:bigint,eventName:string,eventStartDate:string,live:boolean,odds:double,outcomeIds:array,outcomeLabel:string,sportId:string,status:string,voidReason:string>>,payout:double,rewardExtraPayout:double,stake:double>", + None, + ), + ("", "", ""), + ("# Detailed Table Information", "", ""), + ("Catalog", "hive_metastore", ""), + ("Database", "bronze_kambi", ""), + ("Table", "external_metastore", ""), + ("Created Time", "Wed Jun 22 05:14:56 UTC 2022", ""), + ("Last Access", "UNKNOWN", ""), + ("Created By", "Spark 3.2.1", ""), + ("Statistics", "1024 bytes, 3 rows", ""), + ("Type", "EXTERNAL", ""), + ("Location", "s3://external_metastore/", ""), + ("Provider", "delta", ""), + ("Owner", "root", ""), + ("Is_managed_location", "true", ""), + ( + "Table Properties", + "[delta.autoOptimize.autoCompact=true,delta.autoOptimize.optimizeWrite=true,delta.minReaderVersion=1,delta.minWriterVersion=2]", + "", + ), + ] elif query == "DESCRIBE EXTENDED `bronze_kambi`.`view1`": return [ ("betStatusId", "bigint", None), @@ -384,6 +413,7 @@ def mock_hive_sql(query): elif query == "SHOW TABLES FROM `bronze_kambi`": return [ TableEntry("bronze_kambi", "bet", False), + TableEntry("bronze_kambi", "external_metastore", False), TableEntry("bronze_kambi", "delta_error_table", False), TableEntry("bronze_kambi", "view1", False), ] diff --git a/metadata-ingestion/tests/integration/unity/unity_catalog_mces_golden.json b/metadata-ingestion/tests/integration/unity/unity_catalog_mces_golden.json index f01878fed13532..c6d24371bd6eaf 100644 --- a/metadata-ingestion/tests/integration/unity/unity_catalog_mces_golden.json +++ b/metadata-ingestion/tests/integration/unity/unity_catalog_mces_golden.json @@ -1394,6 +1394,22 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.external_metastore,PROD)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:21058fb6993a790a4a43727021e52956" + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "unity-catalog-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.delta_error_table,PROD)", @@ -1410,6 +1426,184 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.external_metastore,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:container:d91b261e5da1bf1434c6318b8c2ac586", + "urn": "urn:li:container:d91b261e5da1bf1434c6318b8c2ac586" + }, + { + "id": "urn:li:container:21058fb6993a790a4a43727021e52956", + "urn": "urn:li:container:21058fb6993a790a4a43727021e52956" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "unity-catalog-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.external_metastore,PROD)", + "changeType": "UPSERT", + "aspectName": "siblings", + "aspect": { + "json": { + "siblings": [ + "urn:li:dataset:(urn:li:dataPlatform:delta-lake,external_metastore/,PROD)" + ], + "primary": false + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "unity-catalog-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.external_metastore,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:delta-lake,external_metastore/,PROD)", + "type": "VIEW" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "unity-catalog-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.external_metastore,PROD)", + "changeType": "PATCH", + "aspectName": "datasetProperties", + "aspect": { + "json": [ + { + "op": "add", + "path": "/name", + "value": "external_metastore" + }, + { + "op": "add", + "path": "/created", + "value": { + "time": 1655874896000 + } + }, + { + "op": "add", + "path": "/lastModified", + "value": { + "time": 1655874896000 + } + }, + { + "op": "add", + "path": "/qualifiedName", + "value": "hive_metastore.bronze_kambi.external_metastore" + }, + { + "op": "add", + "path": "/customProperties/storage_location", + "value": "s3://external_metastore/" + }, + { + "op": "add", + "path": "/customProperties/data_source_format", + "value": "DELTA" + }, + { + "op": "add", + "path": "/customProperties/table_type", + "value": "HIVE_EXTERNAL_TABLE" + }, + { + "op": "add", + "path": "/customProperties/Catalog", + "value": "hive_metastore" + }, + { + "op": "add", + "path": "/customProperties/Database", + "value": "bronze_kambi" + }, + { + "op": "add", + "path": "/customProperties/Table", + "value": "external_metastore" + }, + { + "op": "add", + "path": "/customProperties/Last Access", + "value": "UNKNOWN" + }, + { + "op": "add", + "path": "/customProperties/Created By", + "value": "Spark 3.2.1" + }, + { + "op": "add", + "path": "/customProperties/Statistics", + "value": "1024 bytes, 3 rows" + }, + { + "op": "add", + "path": "/customProperties/Owner", + "value": "root" + }, + { + "op": "add", + "path": "/customProperties/Is_managed_location", + "value": "true" + }, + { + "op": "add", + "path": "/customProperties/Table Properties", + "value": "[delta.autoOptimize.autoCompact=true,delta.autoOptimize.optimizeWrite=true,delta.minReaderVersion=1,delta.minWriterVersion=2]" + }, + { + "op": "add", + "path": "/customProperties/table_id", + "value": "hive_metastore.bronze_kambi.external_metastore" + }, + { + "op": "add", + "path": "/customProperties/created_at", + "value": "2022-06-22 05:14:56" + } + ] + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "unity-catalog-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "container", "entityUrn": "urn:li:container:730e95cd0271453376b3c1d9623838d6", @@ -1433,6 +1627,24 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.external_metastore,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Table" + ] + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "unity-catalog-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.delta_error_table,PROD)", @@ -1468,6 +1680,25 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:delta-lake,external_metastore/,PROD)", + "changeType": "UPSERT", + "aspectName": "siblings", + "aspect": { + "json": { + "siblings": [ + "urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.external_metastore,PROD)" + ], + "primary": true + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "unity-catalog-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.delta_error_table,PROD)", @@ -1491,8 +1722,442 @@ "com.linkedin.schema.MySqlDDL": { "tableSchema": "" } - }, - "fields": [] + }, + "fields": [] + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "unity-catalog-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.external_metastore,PROD)", + "changeType": "UPSERT", + "aspectName": "schemaMetadata", + "aspect": { + "json": { + "schemaName": "hive_metastore.bronze_kambi.external_metastore", + "platform": "urn:li:dataPlatform:databricks", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.schema.MySqlDDL": { + "tableSchema": "" + } + }, + "fields": [ + { + "fieldPath": "betStatusId", + "nullable": false, + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "bigint", + "recursive": false, + "isPartOfKey": false + }, + { + "fieldPath": "channelId", + "nullable": false, + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "bigint", + "recursive": false, + "isPartOfKey": false + }, + { + "fieldPath": "[version=2.0].[type=struct].[type=struct].combination", + "nullable": false, + "type": { + "type": { + "com.linkedin.schema.RecordType": {} + } + }, + "nativeDataType": "struct>,eventid:bigint,eventname:string,eventstartdate:string,live:boolean,odds:double,outcomeids:array,outcomelabel:string,sportid:string,status:string,voidreason:string>>,payout:double,rewardextrapayout:double,stake:double>", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"struct>,eventid:bigint,eventname:string,eventstartdate:string,live:boolean,odds:double,outcomeids:array,outcomelabel:string,sportid:string,status:string,voidreason:string>>,payout:double,rewardextrapayout:double,stake:double>\"}" + }, + { + "fieldPath": "[version=2.0].[type=struct].[type=struct].combination.[type=long].combinationref", + "nullable": true, + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "bigint", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"bigint\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=struct].[type=struct].combination.[type=double].currentodds", + "nullable": true, + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "double", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"double\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=struct].[type=struct].combination.[type=boolean].eachway", + "nullable": true, + "type": { + "type": { + "com.linkedin.schema.BooleanType": {} + } + }, + "nativeDataType": "boolean", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"boolean\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=struct].[type=struct].combination.[type=boolean].livebetting", + "nullable": true, + "type": { + "type": { + "com.linkedin.schema.BooleanType": {} + } + }, + "nativeDataType": "boolean", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"boolean\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=struct].[type=struct].combination.[type=double].odds", + "nullable": true, + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "double", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"double\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=struct].[type=struct].combination.[type=array].[type=struct].outcomes", + "nullable": false, + "type": { + "type": { + "com.linkedin.schema.ArrayType": { + "nestedType": [ + "record" + ] + } + } + }, + "nativeDataType": "array>,eventid:bigint,eventname:string,eventstartdate:string,live:boolean,odds:double,outcomeids:array,outcomelabel:string,sportid:string,status:string,voidreason:string>>", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"array>,eventid:bigint,eventname:string,eventstartdate:string,live:boolean,odds:double,outcomeids:array,outcomelabel:string,sportid:string,status:string,voidreason:string>>\"}" + }, + { + "fieldPath": "[version=2.0].[type=struct].[type=struct].combination.[type=array].[type=struct].outcomes.[type=long].betoffertypeid", + "nullable": true, + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "bigint", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"bigint\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=struct].[type=struct].combination.[type=array].[type=struct].outcomes.[type=long].criterionid", + "nullable": true, + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "bigint", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"bigint\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=struct].[type=struct].combination.[type=array].[type=struct].outcomes.[type=string].criterionname", + "nullable": true, + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=struct].[type=struct].combination.[type=array].[type=struct].outcomes.[type=double].currentodds", + "nullable": true, + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "double", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"double\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=struct].[type=struct].combination.[type=array].[type=struct].outcomes.[type=long].eventgroupid", + "nullable": true, + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "bigint", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"bigint\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=struct].[type=struct].combination.[type=array].[type=struct].outcomes.[type=array].[type=struct].eventgrouppath", + "nullable": false, + "type": { + "type": { + "com.linkedin.schema.ArrayType": { + "nestedType": [ + "record" + ] + } + } + }, + "nativeDataType": "array>", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"array>\"}" + }, + { + "fieldPath": "[version=2.0].[type=struct].[type=struct].combination.[type=array].[type=struct].outcomes.[type=array].[type=struct].eventgrouppath.[type=long].id", + "nullable": true, + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "bigint", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"bigint\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=struct].[type=struct].combination.[type=array].[type=struct].outcomes.[type=array].[type=struct].eventgrouppath.[type=string].name", + "nullable": true, + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=struct].[type=struct].combination.[type=array].[type=struct].outcomes.[type=long].eventid", + "nullable": true, + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "bigint", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"bigint\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=struct].[type=struct].combination.[type=array].[type=struct].outcomes.[type=string].eventname", + "nullable": true, + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=struct].[type=struct].combination.[type=array].[type=struct].outcomes.[type=string].eventstartdate", + "nullable": true, + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=struct].[type=struct].combination.[type=array].[type=struct].outcomes.[type=boolean].live", + "nullable": true, + "type": { + "type": { + "com.linkedin.schema.BooleanType": {} + } + }, + "nativeDataType": "boolean", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"boolean\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=struct].[type=struct].combination.[type=array].[type=struct].outcomes.[type=double].odds", + "nullable": true, + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "double", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"double\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=struct].[type=struct].combination.[type=array].[type=struct].outcomes.[type=array].[type=long].outcomeids", + "nullable": false, + "type": { + "type": { + "com.linkedin.schema.ArrayType": { + "nestedType": [ + "long" + ] + } + } + }, + "nativeDataType": "array", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"array\"}" + }, + { + "fieldPath": "[version=2.0].[type=struct].[type=struct].combination.[type=array].[type=struct].outcomes.[type=string].outcomelabel", + "nullable": true, + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=struct].[type=struct].combination.[type=array].[type=struct].outcomes.[type=string].sportid", + "nullable": true, + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=struct].[type=struct].combination.[type=array].[type=struct].outcomes.[type=string].status", + "nullable": true, + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=struct].[type=struct].combination.[type=array].[type=struct].outcomes.[type=string].voidreason", + "nullable": true, + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=struct].[type=struct].combination.[type=double].payout", + "nullable": true, + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "double", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"double\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=struct].[type=struct].combination.[type=double].rewardextrapayout", + "nullable": true, + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "double", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"double\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=struct].[type=struct].combination.[type=double].stake", + "nullable": true, + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "double", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"double\", \"_nullable\": true}" + } + ] } }, "systemMetadata": { @@ -2513,6 +3178,30 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.external_metastore,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetProfile", + "aspect": { + "json": { + "timestampMillis": 1716198037325, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "rowCount": 3, + "columnCount": 3, + "fieldProfiles": [], + "sizeInBytes": 1024 + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "unity-catalog-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,quickstart_catalog.quickstart_schema.quickstart_table,PROD)", @@ -2584,5 +3273,21 @@ "runId": "unity-catalog-test", "lastRunId": "no-run-id-provided" } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.external_metastore,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "unity-catalog-test", + "lastRunId": "no-run-id-provided" + } } ] \ No newline at end of file From e361d283095d5f85b4d3fe98a9c94a39a7352042 Mon Sep 17 00:00:00 2001 From: sid-acryl <155424659+sid-acryl@users.noreply.github.com> Date: Thu, 23 May 2024 18:36:43 +0530 Subject: [PATCH 6/6] feat(ingestion/looker): ingest explore tags into the DataHub (#10547) --- .../ingestion/source/looker/looker_common.py | 19 +++++- .../tests/integration/looker/test_looker.py | 66 ++++++++++++++++++- 2 files changed, 82 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py index b6cc97b2e5fda7..09105b2c6bfb0d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py @@ -92,6 +92,7 @@ TagPropertiesClass, TagSnapshotClass, ) +from datahub.metadata.urns import TagUrn from datahub.utilities.lossy_collections import LossyList, LossySet from datahub.utilities.url_util import remove_port_from_url @@ -669,6 +670,7 @@ class LookerExplore: joins: Optional[List[str]] = None fields: Optional[List[ViewField]] = None # the fields exposed in this explore source_file: Optional[str] = None + tags: List[str] = dataclasses_field(default_factory=list) @validator("name") def remove_quotes(cls, v): @@ -770,6 +772,7 @@ def from_dict( # This method is getting called from lookml_source's get_internal_workunits method # & upstream_views_file_path is not in use in that code flow upstream_views_file_path={}, + tags=cast(List, dict.get("tags")) if dict.get("tags") is not None else [], ) @classmethod # noqa: C901 @@ -786,7 +789,6 @@ def from_api( # noqa: C901 try: explore = client.lookml_model_explore(model, explore_name) views: Set[str] = set() - lkml_fields: List[ LookmlModelExploreField ] = explore_field_set_to_lkml_fields(explore) @@ -956,6 +958,7 @@ def from_api( # noqa: C901 ), upstream_views_file_path=upstream_views_file_path, source_file=explore.source_file, + tags=list(explore.tags) if explore.tags is not None else [], ) except SDKError as e: if "Looker Not Found (404)" in str(e): @@ -1133,6 +1136,20 @@ def _to_metadata_events( # noqa: C901 mcp, ] + # Add tags + explore_tag_urns: List[TagAssociationClass] = [] + for tag in self.tags: + tag_urn = TagUrn(tag) + explore_tag_urns.append(TagAssociationClass(tag_urn.urn())) + proposals.append( + MetadataChangeProposalWrapper( + entityUrn=tag_urn.urn(), + aspect=tag_urn.to_key_aspect(), + ) + ) + if explore_tag_urns: + dataset_snapshot.aspects.append(GlobalTagsClass(explore_tag_urns)) + # If extracting embeds is enabled, produce an MCP for embed URL. if extract_embed_urls: embed_mcp = create_embed_mcp( diff --git a/metadata-ingestion/tests/integration/looker/test_looker.py b/metadata-ingestion/tests/integration/looker/test_looker.py index c2314e65bd3676..3e049f8b2ef4e0 100644 --- a/metadata-ingestion/tests/integration/looker/test_looker.py +++ b/metadata-ingestion/tests/integration/looker/test_looker.py @@ -1,7 +1,7 @@ import json import time from datetime import datetime -from typing import Any, Dict, List, Optional, cast +from typing import Any, Dict, List, Optional, Union, cast from unittest import mock import pytest @@ -24,9 +24,12 @@ WriteQuery, ) +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.ingestion.api.source import SourceReport from datahub.ingestion.run.pipeline import Pipeline, PipelineInitError from datahub.ingestion.source.looker import looker_common, looker_usage from datahub.ingestion.source.looker.looker_common import LookerExplore +from datahub.ingestion.source.looker.looker_config import LookerCommonConfig from datahub.ingestion.source.looker.looker_lib_wrapper import ( LookerAPI, LookerAPIConfig, @@ -37,6 +40,8 @@ UserViewField, ) from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState +from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent +from datahub.metadata.schema_classes import GlobalTagsClass, MetadataChangeEventClass from tests.test_helpers import mce_helpers from tests.test_helpers.state_helpers import ( get_current_checkpoint_from_pipeline, @@ -481,7 +486,9 @@ def setup_mock_explore_unaliased_with_joins(mocked_client): def setup_mock_explore( - mocked_client: Any, additional_lkml_fields: List[LookmlModelExploreField] = [] + mocked_client: Any, + additional_lkml_fields: List[LookmlModelExploreField] = [], + **additional_explore_fields: Any, ) -> None: mock_model = mock.MagicMock(project_name="lkml_samples") mocked_client.lookml_model.return_value = mock_model @@ -508,6 +515,7 @@ def setup_mock_explore( dimensions=lkml_fields, ), source_file="test_source_file.lkml", + **additional_explore_fields, ) @@ -1058,3 +1066,57 @@ def test_upstream_cll(pytestconfig, tmp_path, mock_time, mock_datahub_graph): assert ( looker_explore.fields[2].upstream_fields[0] == "dataset_lineages.createdon" ) + + +@freeze_time(FROZEN_TIME) +def test_explore_tags(pytestconfig, tmp_path, mock_time, mock_datahub_graph): + mocked_client = mock.MagicMock() + + with mock.patch( + "datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph", + mock_datahub_graph, + ) as mock_checkpoint, mock.patch("looker_sdk.init40") as mock_sdk: + mock_checkpoint.return_value = mock_datahub_graph + + tags: List[str] = ["metrics", "all"] + + mock_sdk.return_value = mocked_client + setup_mock_explore( + mocked_client, + tags=tags, + ) + + looker_explore: Optional[LookerExplore] = looker_common.LookerExplore.from_api( + model="fake", + explore_name="my_explore_name", + client=mocked_client, + reporter=mock.MagicMock(), + source_config=mock.MagicMock(), + ) + + assert looker_explore is not None + assert looker_explore.name == "my_explore_name" + assert looker_explore.tags == tags + + mcps: Optional[ + List[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]] + ] = looker_explore._to_metadata_events( + config=LookerCommonConfig(), + reporter=SourceReport(), + base_url="fake", + extract_embed_urls=False, + ) + + expected_tag_urns: List[str] = ["urn:li:tag:metrics", "urn:li:tag:all"] + + actual_tag_urns: List[str] = [] + if mcps: + for mcp in mcps: + if isinstance(mcp, MetadataChangeEventClass): + for aspect in mcp.proposedSnapshot.aspects: + if isinstance(aspect, GlobalTagsClass): + actual_tag_urns = [ + tag_association.tag for tag_association in aspect.tags + ] + + assert expected_tag_urns == actual_tag_urns