import logging
from dataclasses import dataclass
from typing import Optional, Tuple

import cachetools
from pydantic import BaseModel, ValidationError

from datahub.api.entities.platformresource.platform_resource import (
    PlatformResource,
    PlatformResourceKey,
)
from datahub.ingestion.graph.client import DataHubGraph
from datahub.metadata.urns import TagUrn

logger: logging.Logger = logging.getLogger(__name__)


@dataclass
class BigQueryLabel:
    """A BigQuery label: a key with an optional value."""

    key: str
    value: Optional[str]

    def primary_key(self) -> str:
        """Return ``key/value`` when the value is non-empty, otherwise just ``key``."""
        return f"{self.key}/{self.value}" if self.value else f"{self.key}"


# Payload stored as the value of a label's platform resource.
# NOTE: `value` is a required string; labels without a value are stored as "".
class BigQueryLabelInfo(BaseModel):
    datahub_urn: str
    managed_by_datahub: bool
    key: str
    value: str


@dataclass()
class BigQueryLabelPlatformResource:
    """Pairs a BigQuery label with the DataHub URN it maps to.

    Builds the key, the info payload and the ``PlatformResource`` for that
    pairing.
    """

    datahub_urn: str
    project: Optional[str]
    managed_by_datahub: bool
    label: BigQueryLabel

    def platform_resource_key(self) -> PlatformResourceKey:
        """Return the key for this label, using the project as platform instance."""
        return PlatformResourceKey(
            platform="bigquery",
            resource_type="BigQueryLabelInfo",
            platform_instance=self.project,
            primary_key=self.label.primary_key(),
        )

    def platform_resource_info(self) -> BigQueryLabelInfo:
        """Return the ``BigQueryLabelInfo`` payload describing this label."""
        return BigQueryLabelInfo(
            datahub_urn=self.datahub_urn,
            managed_by_datahub=self.managed_by_datahub,
            key=self.label.key,
            # BigQueryLabel.value is Optional, but BigQueryLabelInfo.value is a
            # required str: passing None would fail validation, so a missing
            # value is normalised to the empty string.
            value=self.label.value or "",
        )

    def platform_resource(self) -> PlatformResource:
        """Create a new ``PlatformResource`` with the DataHub URN as secondary key."""
        return PlatformResource.create(
            key=self.platform_resource_key(),
            secondary_keys=[self.datahub_urn],
            value=self.platform_resource_info(),
        )


class BigQueryPlatformResourceHelper:
    """Looks up and creates platform resources for BigQuery labels.

    Resources fetched from DataHub, or created here, are kept in a bounded
    per-instance LRU cache.
    """

    def __init__(
        self,
        bq_project: Optional[str],
        graph: Optional[DataHubGraph],
    ):
        self.bq_project = bq_project
        self.graph = graph
        # The cache lives on the instance: a class-level cache would be shared
        # by every helper in the process, regardless of project or graph.
        self.platform_resource_cache: cachetools.LRUCache = cachetools.LRUCache(
            maxsize=500
        )

    @staticmethod
    def _cache_key(
        platform_resource_key: PlatformResourceKey,
    ) -> Tuple[str, str, Optional[str], str]:
        """Return a hashable cache key built from every identifying field.

        The primary key alone is not enough: the same label in two projects
        (platform instances) must not share a cache entry.
        """
        return (
            platform_resource_key.platform,
            platform_resource_key.resource_type,
            platform_resource_key.platform_instance,
            platform_resource_key.primary_key,
        )

    @staticmethod
    def _parse_existing_label_info(
        platform_resource: PlatformResource,
    ) -> Optional[BigQueryLabelInfo]:
        """Return the ``BigQueryLabelInfo`` stored in an existing resource.

        Returns None when the resource carries no value or when the stored
        value does not validate against ``BigQueryLabelInfo``.
        """
        resource_info = platform_resource.resource_info
        if not (resource_info and resource_info.value):
            return None
        try:
            return resource_info.value.as_pydantic_object(BigQueryLabelInfo)  # type: ignore
        except ValidationError as e:
            logger.error(
                f"Error converting existing value to BigQueryLabelInfo: {e}. Creating new one. Maybe this is because of a non backward compatible schema change."
            )
            return None

    def get_platform_resource(
        self, platform_resource_key: PlatformResourceKey
    ) -> Optional[PlatformResource]:
        """Return the platform resource for the key, or None if there is none.

        The cache is consulted first, then DataHub. Without a graph client
        nothing can be looked up, so None is returned and callers create a new
        resource.
        """
        # if graph is not available we always create a new PlatformResource
        if not self.graph:
            return None

        cache_key = self._cache_key(platform_resource_key)
        cached = self.platform_resource_cache.get(cache_key)
        if cached is not None:
            return cached

        platform_resource = PlatformResource.from_datahub(
            key=platform_resource_key, graph_client=self.graph
        )
        if platform_resource:
            self.platform_resource_cache[cache_key] = platform_resource
            return platform_resource
        return None

    def generate_label_platform_resource(
        self,
        bigquery_label: BigQueryLabel,
        tag_urn: TagUrn,
        managed_by_datahub: bool = True,
    ) -> PlatformResource:
        """Return the platform resource linking a BigQuery label to a tag URN.

        An existing resource is returned as-is when its stored info equals the
        new info or when it is flagged as managed by DataHub. If no usable
        existing resource is found, a new one is created, cached and returned.

        Raises:
            ValueError: when an existing resource holds different info and is
                not managed by DataHub.
        """
        new_platform_resource = BigQueryLabelPlatformResource(
            datahub_urn=tag_urn.urn(),
            project=self.bq_project,
            managed_by_datahub=managed_by_datahub,
            label=bigquery_label,
        )
        resource_key = new_platform_resource.platform_resource_key()

        platform_resource = self.get_platform_resource(resource_key)
        if platform_resource:
            existing_info = self._parse_existing_label_info(platform_resource)
            if existing_info:
                if (
                    new_platform_resource.platform_resource_info() == existing_info
                    or existing_info.managed_by_datahub
                ):
                    return platform_resource
                raise ValueError(
                    f"Datahub URN mismatch for platform resources. Old (existing) platform resource: {platform_resource} and new platform resource: {new_platform_resource}"
                )

        # Build the resource exactly once so the cached object and the returned
        # object are the same, and only log once creation has succeeded.
        created_resource = new_platform_resource.platform_resource()
        logger.info(f"Created platform resource {new_platform_resource}")
        self.platform_resource_cache[self._cache_key(resource_key)] = created_resource
        return created_resource
BigQueryTableRef, @@ -25,6 +27,11 @@ from datahub.ingestion.source.bigquery_v2.bigquery_helper import ( unquote_and_decode_unicode_escape_seq, ) +from datahub.ingestion.source.bigquery_v2.bigquery_platform_resource_helper import ( + BigQueryLabel, + BigQueryLabelInfo, + BigQueryPlatformResourceHelper, +) from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report from datahub.ingestion.source.bigquery_v2.bigquery_schema import ( BigqueryColumn, @@ -84,6 +91,7 @@ GlobalTagsClass, TagAssociationClass, ) +from datahub.metadata.urns import TagUrn from datahub.sql_parsing.schema_resolver import SchemaResolver from datahub.utilities.file_backed_collections import FileBackedDict from datahub.utilities.hive_schema_to_avro import ( @@ -160,6 +168,7 @@ def __init__( sql_parser_schema_resolver: SchemaResolver, profiler: BigqueryProfiler, identifiers: BigQueryIdentifierBuilder, + graph: Optional[DataHubGraph] = None, ): self.config = config self.report = report @@ -168,6 +177,7 @@ def __init__( self.sql_parser_schema_resolver = sql_parser_schema_resolver self.profiler = profiler self.identifiers = identifiers + self.graph = graph self.classification_handler = ClassificationHandler(self.config, self.report) self.data_reader: Optional[BigQueryDataReader] = None @@ -188,6 +198,21 @@ def __init__( # Maps snapshot ref -> Snapshot self.snapshots_by_ref: FileBackedDict[BigqueryTableSnapshot] = FileBackedDict() + bq_project = ( + self.config.project_on_behalf + if self.config.project_on_behalf + else self.config.credential.project_id + if self.config.credential + else None + ) + + self.platform_resource_helper: BigQueryPlatformResourceHelper = ( + BigQueryPlatformResourceHelper( + bq_project, + self.graph, + ) + ) + @property def store_table_refs(self): return ( @@ -264,13 +289,28 @@ def gen_dataset_containers( ) -> Iterable[MetadataWorkUnit]: schema_container_key = self.gen_dataset_key(project_id, dataset) - tags_joined: Optional[List[str]] = None + tags_joined: 
def make_tag_urn_from_label(self, key: str, value: str) -> str:
    """Build the tag URN for a BigQuery label.

    A label with a non-empty value yields the tag name ``key:value``; a label
    whose value is empty yields the bare ``key``.
    """
    tag_name = f"{key}:{value}" if value else key
    return make_tag_urn(tag_name)
value=v) + platform_resource: PlatformResource = self.platform_resource_helper.generate_label_platform_resource( + label, tag_urn, managed_by_datahub=False + ) + label_info: BigQueryLabelInfo = platform_resource.resource_info.value.as_pydantic_object( # type: ignore + BigQueryLabelInfo + ) + tag_urn = TagUrn.from_string(label_info.datahub_urn) + + for mcpw in platform_resource.to_mcps(): + yield mcpw.as_workunit() + except ValueError as e: + logger.warning( + f"Failed to generate platform resource for label {k}:{v}: {e}" + ) + tags_to_add.append(tag_urn.urn()) yield from self.gen_dataset_workunits( table=table, @@ -749,13 +803,29 @@ def gen_view_dataset_workunits( project_id: str, dataset_name: str, ) -> Iterable[MetadataWorkUnit]: - tags_to_add = None + tags_to_add = [] if table.labels and self.config.capture_view_label_as_tag: - tags_to_add = [ - self.make_tag_from_label(k, v) - for k, v in table.labels.items() - if is_tag_allowed(self.config.capture_view_label_as_tag, k) - ] + for k, v in table.labels.items(): + if is_tag_allowed(self.config.capture_view_label_as_tag, k): + tag_urn = TagUrn.from_string(self.make_tag_urn_from_label(k, v)) + try: + label = BigQueryLabel(key=k, value=v) + platform_resource: PlatformResource = self.platform_resource_helper.generate_label_platform_resource( + label, tag_urn, managed_by_datahub=False + ) + label_info: BigQueryLabelInfo = platform_resource.resource_info.value.as_pydantic_object( # type: ignore + BigQueryLabelInfo + ) + tag_urn = TagUrn.from_string(label_info.datahub_urn) + + for mcpw in platform_resource.to_mcps(): + yield mcpw.as_workunit() + except ValueError as e: + logger.warning( + f"Failed to generate platform resource for label {k}:{v}: {e}" + ) + + tags_to_add.append(tag_urn.urn()) yield from self.gen_dataset_workunits( table=table, columns=columns, diff --git a/metadata-ingestion/tests/integration/bigquery_v2/bigquery_mcp_golden.json b/metadata-ingestion/tests/integration/bigquery_v2/bigquery_mcp_golden.json 
index 02660f0fae08ed..b268926f155b74 100644 --- a/metadata-ingestion/tests/integration/bigquery_v2/bigquery_mcp_golden.json +++ b/metadata-ingestion/tests/integration/bigquery_v2/bigquery_mcp_golden.json @@ -199,6 +199,49 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "platformResource", + "entityUrn": "urn:li:platformResource:79d443a7956814fdab2168e11392bbf2", + "changeType": "UPSERT", + "aspectName": "platformResourceInfo", + "aspect": { + "json": { + "resourceType": "BigQueryLabelInfo", + "primaryKey": "priority/high", + "secondaryKeys": [ + "urn:li:tag:priority:high" + ], + "value": { + "blob": "{\"datahub_urn\": \"urn:li:tag:priority:high\", \"managed_by_datahub\": false, \"key\": \"priority\", \"value\": \"high\"}", + "contentType": "JSON", + "schemaType": "JSON", + "schemaRef": "BigQueryLabelInfo" + } + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "platformResource", + "entityUrn": "urn:li:platformResource:79d443a7956814fdab2168e11392bbf2", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:bigquery", + "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:bigquery,project-id-1)" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)", @@ -215,6 +258,49 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "platformResource", + "entityUrn": "urn:li:platformResource:0a8c87e84bd90486c4fd57bbae6557e3", + "changeType": "UPSERT", + "aspectName": "platformResourceInfo", + "aspect": { + "json": { + "resourceType": "BigQueryLabelInfo", + "primaryKey": "purchase", + "secondaryKeys": [ + "urn:li:tag:purchase" + ], + 
"value": { + "blob": "{\"datahub_urn\": \"urn:li:tag:purchase\", \"managed_by_datahub\": false, \"key\": \"purchase\", \"value\": \"\"}", + "contentType": "JSON", + "schemaType": "JSON", + "schemaRef": "BigQueryLabelInfo" + } + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "platformResource", + "entityUrn": "urn:li:platformResource:0a8c87e84bd90486c4fd57bbae6557e3", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:bigquery", + "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:bigquery,project-id-1)" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)", @@ -309,6 +395,38 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "platformResource", + "entityUrn": "urn:li:platformResource:79d443a7956814fdab2168e11392bbf2", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "platformResource", + "entityUrn": "urn:li:platformResource:0a8c87e84bd90486c4fd57bbae6557e3", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)", @@ -330,6 +448,45 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": 
"platformResource", + "entityUrn": "urn:li:platformResource:7da6409504c5c6444b4ce60b0239b759", + "changeType": "UPSERT", + "aspectName": "platformResourceInfo", + "aspect": { + "json": { + "resourceType": "BigQueryLabelInfo", + "primaryKey": "mixedcasetag", + "value": { + "blob": "{\"datahub_urn\": \"urn:li:tag:MixedCaseTag\", \"managed_by_datahub\": true, \"key\": \"mixedcasetag\", \"value\": \"\"}", + "contentType": "JSON", + "schemaType": "JSON", + "schemaRef": "BigQueryLabelInfo" + } + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "platformResource", + "entityUrn": "urn:li:platformResource:7da6409504c5c6444b4ce60b0239b759", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:bigquery" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)", @@ -343,6 +500,9 @@ }, { "tag": "urn:li:tag:purchase" + }, + { + "tag": "urn:li:tag:MixedCaseTag" } ] } @@ -353,6 +513,22 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "platformResource", + "entityUrn": "urn:li:platformResource:7da6409504c5c6444b4ce60b0239b759", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)", @@ -1082,5 +1258,21 @@ "runId": "bigquery-2022_02_03-07_00_00", "lastRunId": "no-run-id-provided" } +}, +{ + "entityType": "tag", + "entityUrn": 
"urn:li:tag:MixedCaseTag", + "changeType": "UPSERT", + "aspectName": "tagKey", + "aspect": { + "json": { + "name": "MixedCaseTag" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } } ] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py b/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py index 0ac4e94a5a24f3..39cefcb42f360b 100644 --- a/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py +++ b/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py @@ -1,12 +1,16 @@ import random import string from datetime import datetime, timezone -from typing import Any, Dict +from typing import Any, Dict, Optional from unittest.mock import MagicMock, patch from freezegun import freeze_time from google.cloud.bigquery.table import TableListItem +from datahub.api.entities.platformresource.platform_resource import ( + PlatformResource, + PlatformResourceKey, +) from datahub.ingestion.glossary.classifier import ( ClassificationConfig, DynamicTypedClassifierConfig, @@ -14,6 +18,10 @@ from datahub.ingestion.glossary.datahub_classifier import DataHubClassifierConfig from datahub.ingestion.source.bigquery_v2.bigquery_audit import BigqueryTableIdentifier from datahub.ingestion.source.bigquery_v2.bigquery_data_reader import BigQueryDataReader +from datahub.ingestion.source.bigquery_v2.bigquery_platform_resource_helper import ( + BigQueryLabelInfo, + BigQueryPlatformResourceHelper, +) from datahub.ingestion.source.bigquery_v2.bigquery_schema import ( BigqueryColumn, BigqueryDataset, @@ -51,6 +59,13 @@ def recipe(mcp_output_path: str, source_config_override: dict = {}) -> dict: "type": "bigquery", "config": { "project_ids": ["project-id-1"], + "credential": { + "project_id": "project-id-1", + "private_key_id": "private_key_id", + "private_key": "private_key", + "client_email": "client_email", + 
"client_id": "client_id", + }, "include_usage_statistics": False, "include_table_lineage": True, "include_data_platform_instance": True, @@ -82,6 +97,7 @@ def recipe(mcp_output_path: str, source_config_override: dict = {}) -> dict: @patch.object(BigQuerySchemaApi, "get_datasets_for_project_id") @patch.object(BigQuerySchemaApi, "get_columns_for_dataset") @patch.object(BigQueryDataReader, "get_sample_data_for_table") +@patch.object(BigQueryPlatformResourceHelper, "get_platform_resource") @patch("google.cloud.bigquery.Client") @patch("google.cloud.datacatalog_v1.PolicyTagManagerClient") @patch("google.cloud.resourcemanager_v3.ProjectsClient") @@ -89,6 +105,7 @@ def test_bigquery_v2_ingest( client, policy_tag_manager_client, projects_client, + get_platform_resource, get_sample_data_for_table, get_columns_for_dataset, get_datasets_for_project_id, @@ -104,6 +121,25 @@ def test_bigquery_v2_ingest( mcp_output_path = "{}/{}".format(tmp_path, "bigquery_mcp_output.json") dataset_name = "bigquery-dataset-1" + + def side_effect(*args: Any) -> Optional[PlatformResource]: + if args[0].primary_key == "mixedcasetag": + return PlatformResource.create( + key=PlatformResourceKey( + primary_key="mixedcasetag", + resource_type="BigQueryLabelInfo", + platform="bigquery", + ), + value=BigQueryLabelInfo( + datahub_urn="urn:li:tag:MixedCaseTag", + managed_by_datahub=True, + key="mixedcasetag", + value="", + ), + ) + return None + + get_platform_resource.side_effect = side_effect get_datasets_for_project_id.return_value = [ BigqueryDataset(name=dataset_name, location="US") ] @@ -158,7 +194,8 @@ def test_bigquery_v2_ingest( rows_count=None, labels={ "priority": "high", - "purchase": "urn_li_encoded_tag_ovzg4otmne5hiylhhjyhk4tdnbqxgzi_", + "purchase": "", + "mixedcasetag": "", }, ) get_tables_for_dataset.return_value = iter([bigquery_table]) diff --git a/metadata-ingestion/tests/unit/test_bigquery_source.py b/metadata-ingestion/tests/unit/test_bigquery_source.py index 
38239d150dd6b5..b605e9b3f8a3e6 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_source.py +++ b/metadata-ingestion/tests/unit/test_bigquery_source.py @@ -489,30 +489,45 @@ def test_gen_table_dataset_workunits( gen = schema_gen.gen_table_dataset_workunits( bigquery_table, [], project_id, dataset_name ) - mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata) - assert mcp.aspect == StatusClass(removed=False) + mcps = list(gen) + + # Helper function to find MCP by aspect type + def find_mcp_by_aspect(aspect_type): + return next( + mcp # type: ignore + for mcp in mcps + if isinstance(mcp.metadata.aspect, aspect_type) # type: ignore + ) - mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata) - assert isinstance(mcp.aspect, SchemaMetadataClass) - assert mcp.aspect.schemaName == f"{project_id}.{dataset_name}.{bigquery_table.name}" - assert mcp.aspect.fields == [] + # Assert StatusClass + status_mcp = find_mcp_by_aspect(StatusClass) + assert status_mcp.metadata.aspect.removed is False - mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata) - assert isinstance(mcp.aspect, DatasetPropertiesClass) - assert mcp.aspect.name == bigquery_table.name + # Assert SchemaMetadataClass + schema_mcp = find_mcp_by_aspect(SchemaMetadataClass) + assert ( + schema_mcp.metadata.aspect.schemaName + == f"{project_id}.{dataset_name}.{bigquery_table.name}" + ) + assert schema_mcp.metadata.aspect.fields == [] + + # Assert DatasetPropertiesClass + dataset_props_mcp = find_mcp_by_aspect(DatasetPropertiesClass) + assert dataset_props_mcp.metadata.aspect.name == bigquery_table.name assert ( - mcp.aspect.qualifiedName == f"{project_id}.{dataset_name}.{bigquery_table.name}" + dataset_props_mcp.metadata.aspect.qualifiedName + == f"{project_id}.{dataset_name}.{bigquery_table.name}" ) - assert mcp.aspect.description == bigquery_table.comment - assert mcp.aspect.created == TimeStampClass( + assert dataset_props_mcp.metadata.aspect.description == 
bigquery_table.comment + assert dataset_props_mcp.metadata.aspect.created == TimeStampClass( time=int(bigquery_table.created.timestamp() * 1000) ) - assert mcp.aspect.lastModified == TimeStampClass( + assert dataset_props_mcp.metadata.aspect.lastModified == TimeStampClass( time=int(bigquery_table.last_altered.timestamp() * 1000) ) - assert mcp.aspect.tags == [] + assert dataset_props_mcp.metadata.aspect.tags == [] - assert mcp.aspect.customProperties == { + expected_custom_properties = { "expiration_date": str(bigquery_table.expires), "size_in_bytes": str(bigquery_table.size_in_bytes), "billable_bytes_active": str(bigquery_table.active_billable_bytes), @@ -523,24 +538,33 @@ def test_gen_table_dataset_workunits( "max_shard_id": str(bigquery_table.max_shard_id), "is_sharded": "True", } + assert ( + dataset_props_mcp.metadata.aspect.customProperties == expected_custom_properties + ) - mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata) - assert isinstance(mcp.aspect, GlobalTagsClass) - assert mcp.aspect.tags == [ + # Assert GlobalTagsClass + global_tags_mcp = find_mcp_by_aspect(GlobalTagsClass) + assert global_tags_mcp.metadata.aspect.tags == [ TagAssociationClass( "urn:li:tag:data_producer_owner_email:games_team-nytimes_com" ) ] - mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata) - assert isinstance(mcp.aspect, ContainerClass) + # Assert ContainerClass + container_mcp = find_mcp_by_aspect(ContainerClass) + assert container_mcp is not None - mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata) - assert isinstance(mcp.aspect, DataPlatformInstanceClass) + # Assert DataPlatformInstanceClass + data_platform_instance_mcp = find_mcp_by_aspect(DataPlatformInstanceClass) + assert data_platform_instance_mcp is not None - mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata) - assert isinstance(mcp.aspect, SubTypesClass) - assert mcp.aspect.typeNames[1] == DatasetSubTypes.TABLE + # Assert SubTypesClass + sub_types_mcp = 
find_mcp_by_aspect(SubTypesClass) + assert sub_types_mcp.metadata.aspect.typeNames[1] == DatasetSubTypes.TABLE + + # Ensure all MCPs were checked + # TODO: Test for PlatformResource MCPs as well + assert len(mcps) >= 7 @patch.object(BigQueryV2Config, "get_bigquery_client")