From bc79aece3c0d1b744dbcae711930ab8acb1980b0 Mon Sep 17 00:00:00 2001 From: Pinaki Bhattacharjee Date: Tue, 20 Aug 2024 20:09:43 +0530 Subject: [PATCH 1/5] chore(vulnerability): Log Injection (High) (#11131) --- .../metadata/systemmetadata/ESSystemMetadataDAO.java | 5 +---- .../datahub/auth/authentication/AuthServiceController.java | 6 +++--- .../openapi/schema/registry/SchemaRegistryController.java | 6 +++++- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/systemmetadata/ESSystemMetadataDAO.java b/metadata-io/src/main/java/com/linkedin/metadata/systemmetadata/ESSystemMetadataDAO.java index cf1674ac004809..a5c2fb04b5ce39 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/systemmetadata/ESSystemMetadataDAO.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/systemmetadata/ESSystemMetadataDAO.java @@ -54,10 +54,7 @@ public Optional getTaskStatus(@Nonnull String nodeId, long task try { return client.tasks().get(taskRequest, RequestOptions.DEFAULT); } catch (IOException e) { - log.error( - String.format( - "ERROR: Failed to get task status for %s:%d. See stacktrace for a more detailed error:", - nodeId, taskId)); + log.error("ERROR: Failed to get task status: ", e); e.printStackTrace(); } return Optional.empty(); diff --git a/metadata-service/auth-servlet-impl/src/main/java/com/datahub/auth/authentication/AuthServiceController.java b/metadata-service/auth-servlet-impl/src/main/java/com/datahub/auth/authentication/AuthServiceController.java index 71eaca71a3641a..de2582af00a932 100644 --- a/metadata-service/auth-servlet-impl/src/main/java/com/datahub/auth/authentication/AuthServiceController.java +++ b/metadata-service/auth-servlet-impl/src/main/java/com/datahub/auth/authentication/AuthServiceController.java @@ -123,7 +123,7 @@ CompletableFuture> generateSessionTokenForUser( try { bodyJson = mapper.readTree(jsonStr); } catch (JsonProcessingException e) { - log.error("Failed to parse json while attempting to generate session token {}", jsonStr, e); + log.error("Failed to parse json while attempting to generate session token ", e); return CompletableFuture.completedFuture(new ResponseEntity<>(HttpStatus.BAD_REQUEST)); } if (bodyJson == null) { @@ -238,7 +238,7 @@ CompletableFuture> signUp(final HttpEntity httpEn try { Urn inviteTokenUrn = _inviteTokenService.getInviteTokenUrn(inviteTokenString); if (!_inviteTokenService.isInviteTokenValid(systemOperationContext, inviteTokenUrn)) { - log.error("Invalid invite token {}", inviteTokenString); + log.error("Invalid invite token"); return new ResponseEntity<>(HttpStatus.BAD_REQUEST); } @@ -386,7 +386,7 @@ CompletableFuture> track(final HttpEntity httpEnt try { bodyJson = mapper.readTree(jsonStr); } catch (JsonProcessingException e) { - log.error("Failed to parse json while attempting to track analytics event {}", jsonStr); + log.error("Failed to parse json while attempting to track analytics event", e); return CompletableFuture.completedFuture(new ResponseEntity<>(HttpStatus.BAD_REQUEST)); } if (bodyJson == null) { diff --git a/metadata-service/schema-registry-servlet/src/main/java/io/datahubproject/openapi/schema/registry/SchemaRegistryController.java b/metadata-service/schema-registry-servlet/src/main/java/io/datahubproject/openapi/schema/registry/SchemaRegistryController.java index d73b353f38ae78..09043c6dd5e87e 100644 --- a/metadata-service/schema-registry-servlet/src/main/java/io/datahubproject/openapi/schema/registry/SchemaRegistryController.java +++ b/metadata-service/schema-registry-servlet/src/main/java/io/datahubproject/openapi/schema/registry/SchemaRegistryController.java @@ -307,7 +307,11 @@ public ResponseEntity register( }) .orElseGet( () -> { - log.error("Couldn't find topic with name {}.", topicName); + if (topicName.matches("^[a-zA-Z0-9._-]+$")) { + log.error("Couldn't find topic with name {}.", topicName); + } else { + log.error("Couldn't find topic (Malformed topic name)"); + } return new ResponseEntity<>(HttpStatus.NOT_FOUND); }); } From 627c5abfd6e0ffd0fea196870cbbcf1b61f74625 Mon Sep 17 00:00:00 2001 From: sid-acryl <155424659+sid-acryl@users.noreply.github.com> Date: Wed, 21 Aug 2024 00:12:00 +0530 Subject: [PATCH 2/5] feat(ingestion/bigquery): Add ability to filter GCP project ingestion based on project labels (#11169) Co-authored-by: Alice Naghshineh Co-authored-by: Alice Naghshineh <45885699+anaghshineh@users.noreply.github.com> Co-authored-by: Tamas Nemeth Co-authored-by: david-leifker <114954101+david-leifker@users.noreply.github.com> --- docs/quick-ingestion-guides/bigquery/setup.md | 4 +- metadata-ingestion/setup.py | 1 + .../ingestion/source/bigquery_v2/bigquery.py | 32 +- .../source/bigquery_v2/bigquery_config.py | 47 +- .../source/bigquery_v2/bigquery_report.py | 1 + .../source/bigquery_v2/bigquery_schema.py | 26 +- .../bigquery_v2/bigquery_test_connection.py | 4 +- .../ingestion/source/bigquery_v2/lineage.py | 4 +- .../bigquery_project_label_mcp_golden.json | 452 ++++++++++++++++++ .../integration/bigquery_v2/test_bigquery.py | 150 +++++- .../tests/unit/test_bigquery_source.py | 128 ++++- 11 files changed, 786 insertions(+), 63 deletions(-) create mode 100644 metadata-ingestion/tests/integration/bigquery_v2/bigquery_project_label_mcp_golden.json diff --git a/docs/quick-ingestion-guides/bigquery/setup.md b/docs/quick-ingestion-guides/bigquery/setup.md index 10351d6572c531..96850f2deb68ed 100644 --- a/docs/quick-ingestion-guides/bigquery/setup.md +++ b/docs/quick-ingestion-guides/bigquery/setup.md @@ -38,7 +38,9 @@ Please refer to the BigQuery [Permissions](https://cloud.google.com/iam/docs/per You can always add/remove roles to Service Accounts later on. Please refer to the BigQuery [Manage access to projects, folders, and organizations](https://cloud.google.com/iam/docs/granting-changing-revoking-access) guide for more details. ::: -3. Create and download a [Service Account Key](https://cloud.google.com/iam/docs/creating-managing-service-account-keys). We will use this to set up authentication within DataHub. +3. To filter projects based on the `project_labels` configuration, first visit [cloudresourcemanager.googleapis.com](https://console.developers.google.com/apis/api/cloudresourcemanager.googleapis.com/overview) and enable the `Cloud Resource Manager API` + +4. Create and download a [Service Account Key](https://cloud.google.com/iam/docs/creating-managing-service-account-keys). We will use this to set up authentication within DataHub. The key file looks like this: diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 7fb83fb6a83253..d59545694c3243 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -181,6 +181,7 @@ "google-cloud-logging<=3.5.0", "google-cloud-bigquery", "google-cloud-datacatalog>=1.5.0", + "google-cloud-resource-manager", "more-itertools>=8.12.0", "sqlalchemy-bigquery>=1.4.1", } diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index 7a96b2f0643ab0..0d73c9ad028972 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -113,8 +113,9 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config): BigqueryTableIdentifier._BQ_SHARDED_TABLE_SUFFIX = "" self.bigquery_data_dictionary = BigQuerySchemaApi( - self.report.schema_api_perf, - self.config.get_bigquery_client(), + report=BigQueryV2Report().schema_api_perf, + projects_client=config.get_projects_client(), + client=config.get_bigquery_client(), ) if self.config.extract_policy_tags_from_catalog: self.bigquery_data_dictionary.datacatalog_client = ( @@ -257,14 +258,37 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: def _get_projects(self) -> List[BigqueryProject]: logger.info("Getting projects") + if self.config.project_ids or self.config.project_id: project_ids = self.config.project_ids or [self.config.project_id] # type: ignore return [ BigqueryProject(id=project_id, name=project_id) for project_id in project_ids ] - else: - return list(self._query_project_list()) + + if self.config.project_labels: + return list(self._query_project_list_from_labels()) + + return list(self._query_project_list()) + + def _query_project_list_from_labels(self) -> Iterable[BigqueryProject]: + projects = self.bigquery_data_dictionary.get_projects_with_labels( + self.config.project_labels + ) + + if not projects: # Report failure on exception and if empty list is returned + self.report.report_failure( + "metadata-extraction", + "Get projects didn't return any project with any of the specified label(s). " + "Maybe resourcemanager.projects.list permission is missing for the service account. " + "You can assign predefined roles/bigquery.metadataViewer role to your service account.", + ) + + for project in projects: + if self.config.project_id_pattern.allowed(project.id): + yield project + else: + self.report.report_dropped(project.id) def _query_project_list(self) -> Iterable[BigqueryProject]: try: diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py index fe961dbd780f6f..af9256d8877f50 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py @@ -3,7 +3,7 @@ from datetime import timedelta from typing import Any, Dict, List, Optional, Union -from google.cloud import bigquery, datacatalog_v1 +from google.cloud import bigquery, datacatalog_v1, resourcemanager_v3 from google.cloud.logging_v2.client import Client as GCPLoggingClient from pydantic import Field, PositiveInt, PrivateAttr, root_validator, validator @@ -34,12 +34,16 @@ class BigQueryUsageConfig(BaseUsageConfig): max_query_duration: timedelta = Field( default=timedelta(minutes=15), - description="Correction to pad start_time and end_time with. For handling the case where the read happens within our time range but the query completion event is delayed and happens after the configured end time.", + description="Correction to pad start_time and end_time with. For handling the case where the read happens " + "within our time range but the query completion event is delayed and happens after the configured" + " end time.", ) apply_view_usage_to_tables: bool = Field( default=False, - description="Whether to apply view's usage to its base tables. If set to False, uses sql parser and applies usage to views / tables mentioned in the query. If set to True, usage is applied to base tables only.", + description="Whether to apply view's usage to its base tables. If set to False, uses sql parser and applies " + "usage to views / tables mentioned in the query. If set to True, usage is applied to base tables " + "only.", ) @@ -74,6 +78,9 @@ def get_bigquery_client(self) -> bigquery.Client: client_options = self.extra_client_options return bigquery.Client(self.project_on_behalf, **client_options) + def get_projects_client(self) -> resourcemanager_v3.ProjectsClient: + return resourcemanager_v3.ProjectsClient() + def get_policy_tag_manager_client(self) -> datacatalog_v1.PolicyTagManagerClient: return datacatalog_v1.PolicyTagManagerClient() @@ -143,12 +150,14 @@ class BigQueryV2Config( dataset_pattern: AllowDenyPattern = Field( default=AllowDenyPattern.allow_all(), - description="Regex patterns for dataset to filter in ingestion. Specify regex to only match the schema name. e.g. to match all tables in schema analytics, use the regex 'analytics'", + description="Regex patterns for dataset to filter in ingestion. Specify regex to only match the schema name. " + "e.g. to match all tables in schema analytics, use the regex 'analytics'", ) match_fully_qualified_names: bool = Field( default=True, - description="[deprecated] Whether `dataset_pattern` is matched against fully qualified dataset name `.`.", + description="[deprecated] Whether `dataset_pattern` is matched against fully qualified dataset name " + "`.`.", ) include_external_url: bool = Field( @@ -169,7 +178,9 @@ class BigQueryV2Config( table_snapshot_pattern: AllowDenyPattern = Field( default=AllowDenyPattern.allow_all(), - description="Regex patterns for table snapshots to filter in ingestion. Specify regex to match the entire snapshot name in database.schema.snapshot format. e.g. to match all snapshots starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*'", + description="Regex patterns for table snapshots to filter in ingestion. Specify regex to match the entire " + "snapshot name in database.schema.snapshot format. e.g. to match all snapshots starting with " + "customer in Customer database and public schema, use the regex 'Customer.public.customer.*'", ) debug_include_full_payloads: bool = Field( @@ -180,17 +191,22 @@ class BigQueryV2Config( number_of_datasets_process_in_batch: int = Field( hidden_from_docs=True, default=10000, - description="Number of table queried in batch when getting metadata. This is a low level config property which should be touched with care.", + description="Number of table queried in batch when getting metadata. This is a low level config property " + "which should be touched with care.", ) number_of_datasets_process_in_batch_if_profiling_enabled: int = Field( default=1000, - description="Number of partitioned table queried in batch when getting metadata. This is a low level config property which should be touched with care. This restriction is needed because we query partitions system view which throws error if we try to touch too many tables.", + description="Number of partitioned table queried in batch when getting metadata. This is a low level config " + "property which should be touched with care. This restriction is needed because we query " + "partitions system view which throws error if we try to touch too many tables.", ) use_tables_list_query_v2: bool = Field( default=False, - description="List tables using an improved query that extracts partitions and last modified timestamps more accurately. Requires the ability to read table data. Automatically enabled when profiling is enabled.", + description="List tables using an improved query that extracts partitions and last modified timestamps more " + "accurately. Requires the ability to read table data. Automatically enabled when profiling is " + "enabled.", ) @property @@ -199,7 +215,9 @@ def have_table_data_read_permission(self) -> bool: column_limit: int = Field( default=300, - description="Maximum number of columns to process in a table. This is a low level config property which should be touched with care. This restriction is needed because excessively wide tables can result in failure to ingest the schema.", + description="Maximum number of columns to process in a table. This is a low level config property which " + "should be touched with care. This restriction is needed because excessively wide tables can " + "result in failure to ingest the schema.", ) # The inheritance hierarchy is wonky here, but these options need modifications. project_id: Optional[str] = Field( @@ -214,6 +232,15 @@ def have_table_data_read_permission(self) -> bool: "Overrides `project_id_pattern`." ), ) + project_labels: List[str] = Field( + default_factory=list, + description=( + "Ingests projects with the specified labels. Set value in the format of `key:value`. Use this property to " + "define which projects to ingest based" + "on project-level labels. If project_ids or project_id is set, this configuration has no effect. The " + "ingestion process filters projects by label first, and then applies the project_id_pattern." + ), + ) storage_project_id: None = Field(default=None, hidden_from_docs=True) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py index 4cfcc3922ddc3d..807e99604f0133 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py @@ -31,6 +31,7 @@ class BigQuerySchemaApiPerfReport(Report): num_get_snapshots_for_dataset_api_requests: int = 0 list_projects: PerfTimer = field(default_factory=PerfTimer) + list_projects_with_labels: PerfTimer = field(default_factory=PerfTimer) list_datasets: PerfTimer = field(default_factory=PerfTimer) get_columns_for_dataset_sec: float = 0 diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py index d73ac46c862ea1..4326ff7a35527f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py @@ -5,7 +5,7 @@ from typing import Any, Dict, Iterable, Iterator, List, Optional from google.api_core import retry -from google.cloud import bigquery, datacatalog_v1 +from google.cloud import bigquery, datacatalog_v1, resourcemanager_v3 from google.cloud.bigquery.table import ( RowIterator, TableListItem, @@ -144,9 +144,11 @@ def __init__( self, report: BigQuerySchemaApiPerfReport, client: bigquery.Client, + projects_client: resourcemanager_v3.ProjectsClient, datacatalog_client: Optional[datacatalog_v1.PolicyTagManagerClient] = None, ) -> None: self.bq_client = client + self.projects_client = projects_client self.report = report self.datacatalog_client = datacatalog_client @@ -175,7 +177,7 @@ def _should_retry(exc: BaseException) -> bool: # 'Quota exceeded: Your user exceeded quota for concurrent project.lists requests.' # Hence, added the api request retry of 15 min. # We already tried adding rate_limit externally, proving max_result and page_size - # to restrict the request calls inside list_project but issue still occured. + # to restrict the request calls inside list_project but issue still occurred. projects_iterator = self.bq_client.list_projects( max_results=max_results_per_page, page_token=page_token, @@ -202,6 +204,26 @@ def _should_retry(exc: BaseException) -> bool: return [] return projects + def get_projects_with_labels(self, labels: List[str]) -> List[BigqueryProject]: + with self.report.list_projects_with_labels: + try: + projects = [] + labels_query = " OR ".join([f"labels.{label}" for label in labels]) + for project in self.projects_client.search_projects(query=labels_query): + projects.append( + BigqueryProject( + id=project.project_id, name=project.display_name + ) + ) + + return projects + + except Exception as e: + logger.error( + f"Error getting projects with labels: {labels}. {e}", exc_info=True + ) + return [] + def get_datasets_for_project_id( self, project_id: str, maxResults: Optional[int] = None ) -> List[BigqueryDataset]: diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_test_connection.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_test_connection.py index 3aac78c154b2ee..e21aadd91d7d52 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_test_connection.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_test_connection.py @@ -96,7 +96,9 @@ def metadata_read_capability_test( client: bigquery.Client = config.get_bigquery_client() assert client bigquery_data_dictionary = BigQuerySchemaApi( - BigQueryV2Report().schema_api_perf, client + report=BigQueryV2Report().schema_api_perf, + projects_client=config.get_projects_client(), + client=client, ) result = bigquery_data_dictionary.get_datasets_for_project_id( project_id, 10 diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py index 496bd64d3b4fe2..9d156914917402 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py @@ -479,7 +479,9 @@ def lineage_via_catalog_lineage_api( lineage_client: lineage_v1.LineageClient = lineage_v1.LineageClient() data_dictionary = BigQuerySchemaApi( - self.report.schema_api_perf, self.config.get_bigquery_client() + self.report.schema_api_perf, + self.config.get_bigquery_client(), + self.config.get_projects_client(), ) # Filtering datasets diff --git a/metadata-ingestion/tests/integration/bigquery_v2/bigquery_project_label_mcp_golden.json b/metadata-ingestion/tests/integration/bigquery_v2/bigquery_project_label_mcp_golden.json new file mode 100644 index 00000000000000..a529ddc6221a7a --- /dev/null +++ b/metadata-ingestion/tests/integration/bigquery_v2/bigquery_project_label_mcp_golden.json @@ -0,0 +1,452 @@ +[ +{ + "entityType": "container", + "entityUrn": "urn:li:container:f284164f9a7db03ca6bbdb7bb17d5a7e", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "bigquery", + "env": "PROD", + "project_id": "dev" + }, + "name": "dev" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:f284164f9a7db03ca6bbdb7bb17d5a7e", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:f284164f9a7db03ca6bbdb7bb17d5a7e", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:bigquery" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:f284164f9a7db03ca6bbdb7bb17d5a7e", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Project" + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:f284164f9a7db03ca6bbdb7bb17d5a7e", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:ce17940c2d64e7e315e68f8d7d071b1e", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "bigquery", + "env": "PROD", + "project_id": "dev", + "dataset_id": "bigquery-dataset-1" + }, + "externalUrl": "https://console.cloud.google.com/bigquery?project=dev&ws=!1m4!1m3!3m2!1sdev!2sbigquery-dataset-1", + "name": "bigquery-dataset-1" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:ce17940c2d64e7e315e68f8d7d071b1e", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:ce17940c2d64e7e315e68f8d7d071b1e", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:bigquery" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:ce17940c2d64e7e315e68f8d7d071b1e", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Dataset" + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:ce17940c2d64e7e315e68f8d7d071b1e", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:f284164f9a7db03ca6bbdb7bb17d5a7e" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:ce17940c2d64e7e315e68f8d7d071b1e", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:container:f284164f9a7db03ca6bbdb7bb17d5a7e", + "urn": "urn:li:container:f284164f9a7db03ca6bbdb7bb17d5a7e" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,dev.bigquery-dataset-1.table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,dev.bigquery-dataset-1.table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "schemaMetadata", + "aspect": { + "json": { + "schemaName": "dev.bigquery-dataset-1.table-1", + "platform": "urn:li:dataPlatform:bigquery", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.schema.MySqlDDL": { + "tableSchema": "" + } + }, + "fields": [ + { + "fieldPath": "age", + "nullable": false, + "description": "comment", + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "INT", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Test Policy Tag" + } + ] + }, + "glossaryTerms": { + "terms": [ + { + "urn": "urn:li:glossaryTerm:Age" + } + ], + "auditStamp": { + "time": 1643871600000, + "actor": "urn:li:corpuser:datahub" + } + }, + "isPartOfKey": false + }, + { + "fieldPath": "email", + "nullable": false, + "description": "comment", + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "STRING", + "recursive": false, + "globalTags": { + "tags": [] + }, + "glossaryTerms": { + "terms": [ + { + "urn": "urn:li:glossaryTerm:Email_Address" + } + ], + "auditStamp": { + "time": 1643871600000, + "actor": "urn:li:corpuser:datahub" + } + }, + "isPartOfKey": false + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,dev.bigquery-dataset-1.table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "json": { + "customProperties": {}, + "externalUrl": "https://console.cloud.google.com/bigquery?project=dev&ws=!1m5!1m4!4m3!1sdev!2sbigquery-dataset-1!3stable-1", + "name": "table-1", + "qualifiedName": "dev.bigquery-dataset-1.table-1", + "description": "", + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,dev.bigquery-dataset-1.table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:ce17940c2d64e7e315e68f8d7d071b1e" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,dev.bigquery-dataset-1.table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:bigquery", + "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:bigquery,dev)" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,dev.bigquery-dataset-1.table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Table" + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,dev.bigquery-dataset-1.table-1,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:container:f284164f9a7db03ca6bbdb7bb17d5a7e", + "urn": "urn:li:container:f284164f9a7db03ca6bbdb7bb17d5a7e" + }, + { + "id": "urn:li:container:ce17940c2d64e7e315e68f8d7d071b1e", + "urn": "urn:li:container:ce17940c2d64e7e315e68f8d7d071b1e" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "glossaryTerm", + "entityUrn": "urn:li:glossaryTerm:Age", + "changeType": "UPSERT", + "aspectName": "glossaryTermKey", + "aspect": { + "json": { + "name": "Age" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "glossaryTerm", + "entityUrn": "urn:li:glossaryTerm:Email_Address", + "changeType": "UPSERT", + "aspectName": "glossaryTermKey", + "aspect": { + "json": { + "name": "Email_Address" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "tag", + "entityUrn": "urn:li:tag:Test Policy Tag", + "changeType": "UPSERT", + "aspectName": "tagKey", + "aspect": { + "json": { + "name": "Test Policy Tag" + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py b/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py index 762c73d2a55c60..dff7f18db6135c 100644 --- a/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py +++ b/metadata-ingestion/tests/integration/bigquery_v2/test_bigquery.py @@ -15,6 +15,7 @@ from datahub.ingestion.source.bigquery_v2.bigquery_schema import ( BigqueryColumn, BigqueryDataset, + BigqueryProject, BigQuerySchemaApi, BigqueryTable, ) @@ -39,6 +40,33 @@ def random_email(): ) +def recipe(mcp_output_path: str, override: dict = {}) -> dict: + return { + "source": { + "type": "bigquery", + "config": { + "project_ids": ["project-id-1"], + "include_usage_statistics": False, + "include_table_lineage": False, + "include_data_platform_instance": True, + "classification": ClassificationConfig( + enabled=True, + classifiers=[ + DynamicTypedClassifierConfig( + type="datahub", + config=DataHubClassifierConfig( + minimum_values_threshold=1, + ), + ) + ], + max_workers=1, + ).dict(), + }, + }, + "sink": {"type": "file", "config": {"filename": mcp_output_path}}, + } + + @freeze_time(FROZEN_TIME) @patch.object(BigQuerySchemaApi, "get_tables_for_dataset") @patch.object(BigQuerySchemaGenerator, "get_core_table_details") @@ -47,9 +75,11 @@ def random_email(): @patch.object(BigQueryDataReader, "get_sample_data_for_table") @patch("google.cloud.bigquery.Client") @patch("google.cloud.datacatalog_v1.PolicyTagManagerClient") +@patch("google.cloud.resourcemanager_v3.ProjectsClient") def test_bigquery_v2_ingest( client, policy_tag_manager_client, + projects_client, get_sample_data_for_table, get_columns_for_dataset, get_datasets_for_project_id, @@ -111,33 +141,105 @@ def test_bigquery_v2_ingest( ) get_tables_for_dataset.return_value = iter([bigquery_table]) - source_config_dict: Dict[str, Any] = { - "project_ids": ["project-id-1"], - "include_usage_statistics": False, - "include_table_lineage": False, - "include_data_platform_instance": True, - "classification": ClassificationConfig( - enabled=True, - classifiers=[ - DynamicTypedClassifierConfig( - type="datahub", - config=DataHubClassifierConfig( - minimum_values_threshold=1, - ), - ) - ], - max_workers=1, - ).dict(), - } + pipeline_config_dict: Dict[str, Any] = recipe(mcp_output_path=mcp_output_path) - pipeline_config_dict: Dict[str, Any] = { - "source": { - "type": "bigquery", - "config": source_config_dict, - }, - "sink": {"type": "file", "config": {"filename": mcp_output_path}}, + run_and_get_pipeline(pipeline_config_dict) + + mce_helpers.check_golden_file( + pytestconfig, + output_path=mcp_output_path, + golden_path=mcp_golden_path, + ) + + +@freeze_time(FROZEN_TIME) +@patch.object(BigQuerySchemaApi, attribute="get_projects_with_labels") +@patch.object(BigQuerySchemaApi, "get_tables_for_dataset") +@patch.object(BigQuerySchemaGenerator, "get_core_table_details") +@patch.object(BigQuerySchemaApi, "get_datasets_for_project_id") +@patch.object(BigQuerySchemaApi, "get_columns_for_dataset") +@patch.object(BigQueryDataReader, "get_sample_data_for_table") +@patch("google.cloud.bigquery.Client") +@patch("google.cloud.datacatalog_v1.PolicyTagManagerClient") +@patch("google.cloud.resourcemanager_v3.ProjectsClient") +def test_bigquery_v2_project_labels_ingest( + client, + policy_tag_manager_client, + projects_client, + get_sample_data_for_table, + get_columns_for_dataset, + get_datasets_for_project_id, + get_core_table_details, + get_tables_for_dataset, + get_projects_with_labels, + pytestconfig, + tmp_path, +): + test_resources_dir = pytestconfig.rootpath / "tests/integration/bigquery_v2" + mcp_golden_path = f"{test_resources_dir}/bigquery_project_label_mcp_golden.json" + mcp_output_path = "{}/{}".format(tmp_path, "bigquery_project_label_mcp_output.json") + + get_datasets_for_project_id.return_value = [ + BigqueryDataset(name="bigquery-dataset-1") + ] + + get_projects_with_labels.return_value = [ + BigqueryProject(id="dev", name="development") + ] + + table_list_item = TableListItem( + {"tableReference": {"projectId": "", "datasetId": "", "tableId": ""}} + ) + table_name = "table-1" + get_core_table_details.return_value = {table_name: table_list_item} + get_columns_for_dataset.return_value = { + table_name: [ + BigqueryColumn( + name="age", + ordinal_position=1, + is_nullable=False, + field_path="col_1", + data_type="INT", + comment="comment", + is_partition_column=False, + cluster_column_position=None, + policy_tags=["Test Policy Tag"], + ), + BigqueryColumn( + name="email", + ordinal_position=1, + is_nullable=False, + field_path="col_2", + data_type="STRING", + comment="comment", + is_partition_column=False, + cluster_column_position=None, + ), + ] + } + get_sample_data_for_table.return_value = { + "age": [random.randint(1, 80) for i in range(20)], + "email": [random_email() for i in range(20)], } + bigquery_table = BigqueryTable( + name=table_name, + comment=None, + created=None, + last_altered=None, + size_in_bytes=None, + rows_count=None, + ) + get_tables_for_dataset.return_value = iter([bigquery_table]) + + pipeline_config_dict: Dict[str, Any] = recipe(mcp_output_path=mcp_output_path) + + del pipeline_config_dict["source"]["config"]["project_ids"] + + pipeline_config_dict["source"]["config"]["project_labels"] = [ + "environment:development" + ] + run_and_get_pipeline(pipeline_config_dict) mce_helpers.check_golden_file( diff --git a/metadata-ingestion/tests/unit/test_bigquery_source.py b/metadata-ingestion/tests/unit/test_bigquery_source.py index 746cf9b0acfc3e..d12ffbcbbcf10b 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_source.py +++ b/metadata-ingestion/tests/unit/test_bigquery_source.py @@ -170,7 +170,11 @@ def test_bigquery_uri_with_credential(): @patch.object(BigQueryV2Config, "get_bigquery_client") -def test_get_projects_with_project_ids(get_bq_client_mock): +@patch.object(BigQueryV2Config, "get_projects_client") +def test_get_projects_with_project_ids( + get_projects_client, + get_bq_client_mock, +): client_mock = MagicMock() get_bq_client_mock.return_value = client_mock config = BigQueryV2Config.parse_obj( @@ -197,8 +201,10 @@ def test_get_projects_with_project_ids(get_bq_client_mock): @patch.object(BigQueryV2Config, "get_bigquery_client") +@patch.object(BigQueryV2Config, "get_projects_client") def test_get_projects_with_project_ids_overrides_project_id_pattern( - get_bq_client_mock, + get_projects_client, + get_bigquery_client, ): config = BigQueryV2Config.parse_obj( { @@ -226,7 +232,11 @@ def test_platform_instance_config_always_none(): @patch.object(BigQueryV2Config, "get_bigquery_client") -def test_get_dataplatform_instance_aspect_returns_project_id(get_bq_client_mock): +@patch.object(BigQueryV2Config, "get_projects_client") +def test_get_dataplatform_instance_aspect_returns_project_id( + get_projects_client, + get_bq_client_mock, +): project_id = "project_id" expected_instance = ( f"urn:li:dataPlatformInstance:(urn:li:dataPlatform:bigquery,{project_id})" @@ -247,7 +257,11 @@ def test_get_dataplatform_instance_aspect_returns_project_id(get_bq_client_mock) @patch.object(BigQueryV2Config, "get_bigquery_client") -def test_get_dataplatform_instance_default_no_instance(get_bq_client_mock): +@patch.object(BigQueryV2Config, "get_projects_client") +def test_get_dataplatform_instance_default_no_instance( + get_projects_client, + get_bq_client_mock, +): config = BigQueryV2Config.parse_obj({}) source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test")) schema_gen = source.bq_schema_extractor @@ -263,7 +277,11 @@ def test_get_dataplatform_instance_default_no_instance(get_bq_client_mock): @patch.object(BigQueryV2Config, "get_bigquery_client") -def test_get_projects_with_single_project_id(get_bq_client_mock): +@patch.object(BigQueryV2Config, "get_projects_client") +def test_get_projects_with_single_project_id( + get_projects_client, + get_bq_client_mock, +): client_mock = MagicMock() get_bq_client_mock.return_value = client_mock config = BigQueryV2Config.parse_obj({"project_id": "test-3"}) @@ -275,9 +293,10 @@ def test_get_projects_with_single_project_id(get_bq_client_mock): @patch.object(BigQueryV2Config, "get_bigquery_client") -def test_get_projects_by_list(get_bq_client_mock): +@patch.object(BigQueryV2Config, "get_projects_client") +def test_get_projects_by_list(get_projects_client, get_bigquery_client): client_mock = MagicMock() - get_bq_client_mock.return_value = client_mock + get_bigquery_client.return_value = client_mock first_page = MagicMock() first_page.__iter__.return_value = iter( @@ -296,6 +315,7 @@ def test_get_projects_by_list(get_bq_client_mock): ] ) second_page.next_page_token = None + client_mock.list_projects.side_effect = [first_page, second_page] config = BigQueryV2Config.parse_obj({}) @@ -311,7 +331,10 @@ def test_get_projects_by_list(get_bq_client_mock): @patch.object(BigQuerySchemaApi, "get_projects") @patch.object(BigQueryV2Config, "get_bigquery_client") -def test_get_projects_filter_by_pattern(get_bq_client_mock, get_projects_mock): +@patch.object(BigQueryV2Config, "get_projects_client") +def test_get_projects_filter_by_pattern( + get_projects_client, get_bq_client_mock, get_projects_mock +): get_projects_mock.return_value = [ BigqueryProject("test-project", "Test Project"), BigqueryProject("test-project-2", "Test Project 2"), @@ -329,7 +352,10 @@ def test_get_projects_filter_by_pattern(get_bq_client_mock, get_projects_mock): @patch.object(BigQuerySchemaApi, "get_projects") @patch.object(BigQueryV2Config, "get_bigquery_client") -def test_get_projects_list_empty(get_bq_client_mock, get_projects_mock): +@patch.object(BigQueryV2Config, "get_projects_client") +def test_get_projects_list_empty( + get_projects_client, get_bq_client_mock, get_projects_mock +): get_projects_mock.return_value = [] config = BigQueryV2Config.parse_obj( @@ -342,7 +368,9 @@ def test_get_projects_list_empty(get_bq_client_mock, get_projects_mock): @patch.object(BigQueryV2Config, "get_bigquery_client") +@patch.object(BigQueryV2Config, "get_projects_client") def test_get_projects_list_failure( + get_projects_client: MagicMock, get_bq_client_mock: MagicMock, caplog: pytest.LogCaptureFixture, ) -> None: @@ -366,7 +394,10 @@ def test_get_projects_list_failure( @patch.object(BigQuerySchemaApi, "get_projects") @patch.object(BigQueryV2Config, "get_bigquery_client") -def test_get_projects_list_fully_filtered(get_projects_mock, get_bq_client_mock): +@patch.object(BigQueryV2Config, "get_projects_client") +def test_get_projects_list_fully_filtered( + get_projects_mock, get_bq_client_mock, get_projects_client +): get_projects_mock.return_value = [BigqueryProject("test-project", "Test Project")] config = BigQueryV2Config.parse_obj( @@ -399,7 +430,10 @@ def bigquery_table() -> BigqueryTable: @patch.object(BigQueryV2Config, "get_bigquery_client") -def test_gen_table_dataset_workunits(get_bq_client_mock, bigquery_table): +@patch.object(BigQueryV2Config, "get_projects_client") +def test_gen_table_dataset_workunits( + get_projects_client, get_bq_client_mock, bigquery_table +): project_id = "test-project" dataset_name = "test-dataset" config = BigQueryV2Config.parse_obj( @@ -471,7 +505,8 @@ def test_gen_table_dataset_workunits(get_bq_client_mock, bigquery_table): @patch.object(BigQueryV2Config, "get_bigquery_client") -def test_simple_upstream_table_generation(get_bq_client_mock): +@patch.object(BigQueryV2Config, "get_projects_client") +def test_simple_upstream_table_generation(get_bq_client_mock, get_projects_client): a: BigQueryTableRef = BigQueryTableRef( BigqueryTableIdentifier( project_id="test-project", dataset="test-dataset", table="a" @@ -503,8 +538,10 @@ def test_simple_upstream_table_generation(get_bq_client_mock): @patch.object(BigQueryV2Config, "get_bigquery_client") +@patch.object(BigQueryV2Config, "get_projects_client") def test_upstream_table_generation_with_temporary_table_without_temp_upstream( get_bq_client_mock, + get_projects_client, ): a: BigQueryTableRef = BigQueryTableRef( BigqueryTableIdentifier( @@ -536,7 +573,10 @@ def test_upstream_table_generation_with_temporary_table_without_temp_upstream( @patch.object(BigQueryV2Config, "get_bigquery_client") -def test_upstream_table_column_lineage_with_temp_table(get_bq_client_mock): +@patch.object(BigQueryV2Config, "get_projects_client") +def test_upstream_table_column_lineage_with_temp_table( + get_bq_client_mock, get_projects_client +): from datahub.ingestion.api.common import PipelineContext a: BigQueryTableRef = BigQueryTableRef( @@ -611,8 +651,9 @@ def test_upstream_table_column_lineage_with_temp_table(get_bq_client_mock): @patch.object(BigQueryV2Config, "get_bigquery_client") +@patch.object(BigQueryV2Config, "get_projects_client") def test_upstream_table_generation_with_temporary_table_with_multiple_temp_upstream( - get_bq_client_mock, + get_bq_client_mock, get_projects_client ): a: BigQueryTableRef = BigQueryTableRef( BigqueryTableIdentifier( @@ -675,7 +716,10 @@ def test_upstream_table_generation_with_temporary_table_with_multiple_temp_upstr @patch.object(BigQuerySchemaApi, "get_tables_for_dataset") @patch.object(BigQueryV2Config, "get_bigquery_client") -def test_table_processing_logic(get_bq_client_mock, data_dictionary_mock): +@patch.object(BigQueryV2Config, "get_projects_client") +def test_table_processing_logic( + get_projects_client, get_bq_client_mock, data_dictionary_mock +): client_mock = MagicMock() get_bq_client_mock.return_value = client_mock config = BigQueryV2Config.parse_obj( @@ -747,8 +791,9 @@ def test_table_processing_logic(get_bq_client_mock, data_dictionary_mock): @patch.object(BigQuerySchemaApi, "get_tables_for_dataset") @patch.object(BigQueryV2Config, "get_bigquery_client") +@patch.object(BigQueryV2Config, "get_projects_client") def test_table_processing_logic_date_named_tables( - get_bq_client_mock, data_dictionary_mock + get_projects_client, get_bq_client_mock, data_dictionary_mock ): client_mock = MagicMock() get_bq_client_mock.return_value = client_mock @@ -859,8 +904,10 @@ def bigquery_view_2() -> BigqueryView: @patch.object(BigQuerySchemaApi, "get_query_result") @patch.object(BigQueryV2Config, "get_bigquery_client") +@patch.object(BigQueryV2Config, "get_projects_client") def test_get_views_for_dataset( get_bq_client_mock: Mock, + get_projects_client: MagicMock, query_mock: Mock, bigquery_view_1: BigqueryView, bigquery_view_2: BigqueryView, @@ -889,7 +936,9 @@ def test_get_views_for_dataset( ) query_mock.return_value = [row1, row2] bigquery_data_dictionary = BigQuerySchemaApi( - BigQueryV2Report().schema_api_perf, client_mock + report=BigQueryV2Report().schema_api_perf, + client=client_mock, + projects_client=MagicMock(), ) views = bigquery_data_dictionary.get_views_for_dataset( @@ -905,8 +954,9 @@ def test_get_views_for_dataset( BigQuerySchemaGenerator, "gen_dataset_workunits", lambda *args, **kwargs: [] ) @patch.object(BigQueryV2Config, "get_bigquery_client") +@patch.object(BigQueryV2Config, "get_projects_client") def test_gen_view_dataset_workunits( - get_bq_client_mock, bigquery_view_1, bigquery_view_2 + get_projects_client, get_bq_client_mock, bigquery_view_1, bigquery_view_2 ): project_id = "test-project" dataset_name = "test-dataset" @@ -963,7 +1013,9 @@ def bigquery_snapshot() -> BigqueryTableSnapshot: @patch.object(BigQuerySchemaApi, "get_query_result") @patch.object(BigQueryV2Config, "get_bigquery_client") +@patch.object(BigQueryV2Config, "get_projects_client") def test_get_snapshots_for_dataset( + get_projects_client: MagicMock, get_bq_client_mock: Mock, query_mock: Mock, bigquery_snapshot: BigqueryTableSnapshot, @@ -988,7 +1040,9 @@ def test_get_snapshots_for_dataset( ) query_mock.return_value = [row1] bigquery_data_dictionary = BigQuerySchemaApi( - BigQueryV2Report().schema_api_perf, client_mock + report=BigQueryV2Report().schema_api_perf, + client=client_mock, + projects_client=MagicMock(), ) snapshots = bigquery_data_dictionary.get_snapshots_for_dataset( @@ -1001,7 +1055,10 @@ def test_get_snapshots_for_dataset( @patch.object(BigQueryV2Config, "get_bigquery_client") -def test_gen_snapshot_dataset_workunits(get_bq_client_mock, bigquery_snapshot): +@patch.object(BigQueryV2Config, "get_projects_client") +def test_gen_snapshot_dataset_workunits( + get_bq_client_mock, get_projects_client, bigquery_snapshot +): project_id = "test-project" dataset_name = "test-dataset" config = BigQueryV2Config.parse_obj( @@ -1140,7 +1197,9 @@ def test_default_config_for_excluding_projects_and_datasets(): @patch.object(BigQueryConnectionConfig, "get_bigquery_client", new=lambda self: None) @patch.object(BigQuerySchemaApi, "get_datasets_for_project_id") +@patch.object(BigQueryV2Config, "get_projects_client") def test_excluding_empty_projects_from_ingestion( + get_projects_client, get_datasets_for_project_id_mock, ): project_id_with_datasets = "project-id-with-datasets" @@ -1173,3 +1232,32 @@ def get_datasets_for_project_id_side_effect( config = BigQueryV2Config.parse_obj({**base_config, "exclude_empty_projects": True}) source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test-2")) assert len({wu.metadata.entityUrn for wu in source.get_workunits()}) == 1 # type: ignore + + +@patch.object(BigQueryV2Config, "get_bigquery_client") +@patch.object(BigQueryV2Config, "get_projects_client") +def test_get_projects_with_project_labels( + get_projects_client, + get_bq_client_mock, +): + client_mock = MagicMock() + + get_projects_client.return_value = client_mock + + client_mock.search_projects.return_value = [ + SimpleNamespace(project_id="dev", display_name="dev_project"), + SimpleNamespace(project_id="qa", display_name="qa_project"), + ] + + config = BigQueryV2Config.parse_obj( + { + "project_labels": ["environment:dev", "environment:qa"], + } + ) + + source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test1")) + + assert source._get_projects() == [ + BigqueryProject("dev", "dev_project"), + BigqueryProject("qa", "qa_project"), + ] From 09acd5b989e3b00a5dce263c70cb27bcd744abb9 Mon Sep 17 00:00:00 2001 From: david-leifker <114954101+david-leifker@users.noreply.github.com> Date: Tue, 20 Aug 2024 13:42:45 -0500 Subject: [PATCH 3/5] chore(kafka): kafka version bump (#11211) --- datahub-upgrade/build.gradle | 2 +- docker/kafka-setup/Dockerfile | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/datahub-upgrade/build.gradle b/datahub-upgrade/build.gradle index 304bf3a67a5b27..f64886953fe225 100644 --- a/datahub-upgrade/build.gradle +++ b/datahub-upgrade/build.gradle @@ -55,7 +55,7 @@ dependencies { // mock internal schema registry implementation externalDependency.kafkaAvroSerde implementation externalDependency.kafkaAvroSerializer - implementation "org.apache.kafka:kafka_2.12:3.7.0" + implementation "org.apache.kafka:kafka_2.12:3.7.1" implementation externalDependency.slf4jApi compileOnly externalDependency.lombok diff --git a/docker/kafka-setup/Dockerfile b/docker/kafka-setup/Dockerfile index ad1d01c1ce97c0..af32dd5dd4d36f 100644 --- a/docker/kafka-setup/Dockerfile +++ b/docker/kafka-setup/Dockerfile @@ -22,7 +22,7 @@ ARG ALPINE_REPO_URL ARG APACHE_DOWNLOAD_URL ARG GITHUB_REPO_URL -ENV KAFKA_VERSION=3.7.0 +ENV KAFKA_VERSION=3.7.1 ENV SCALA_VERSION=2.13 LABEL name="kafka" version=${KAFKA_VERSION} From be7b3439fd0ccc2edd3db423d77718ef0821848d Mon Sep 17 00:00:00 2001 From: Chris Collins Date: Tue, 20 Aug 2024 19:15:18 -0400 Subject: [PATCH 4/5] fix(forms) Fix small bug in createForm graphql endpoint (#11216) --- .../datahub/graphql/resolvers/mutate/util/FormUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/FormUtils.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/FormUtils.java index 17718f39c12387..d118c04d19393d 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/FormUtils.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/FormUtils.java @@ -202,7 +202,7 @@ public static FormActorAssignment mapFormActorAssignment( if (input.getGroups() != null) { UrnArray groupUrns = new UrnArray(); input.getGroups().forEach(group -> groupUrns.add(UrnUtils.getUrn(group))); - result.setUsers(groupUrns); + result.setGroups(groupUrns); } return result; From 6b3c06ae8c542f0f45197553e06c30c86770a0b2 Mon Sep 17 00:00:00 2001 From: sid-acryl <155424659+sid-acryl@users.noreply.github.com> Date: Wed, 21 Aug 2024 14:56:37 +0530 Subject: [PATCH 5/5] fix(ingestion/lookml): drop `hive.` from CLL (#11210) --- .../ingestion/source/looker/view_upstream.py | 2 +- .../lookml/drop_hive_dot/data.model.lkml | 6 + .../top_10_employee_income_source.view.lkml | 26 ++ .../lookml/drop_hive_dot_golden.json | 357 ++++++++++++++++++ .../tests/integration/lookml/test_lookml.py | 27 ++ 5 files changed, 417 insertions(+), 1 deletion(-) create mode 100644 metadata-ingestion/tests/integration/lookml/drop_hive_dot/data.model.lkml create mode 100644 metadata-ingestion/tests/integration/lookml/drop_hive_dot/top_10_employee_income_source.view.lkml create mode 100644 metadata-ingestion/tests/integration/lookml/drop_hive_dot_golden.json diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py index d5929b52aea3a3..0917a9e9faafee 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py @@ -385,7 +385,7 @@ def get_upstream_column_ref( config=self.config, ) - return upstreams_column_refs + return _drop_hive_dot_from_upstream(upstreams_column_refs) def get_upstream_dataset_urn(self) -> List[Urn]: return self._get_upstream_dataset_urn() diff --git a/metadata-ingestion/tests/integration/lookml/drop_hive_dot/data.model.lkml b/metadata-ingestion/tests/integration/lookml/drop_hive_dot/data.model.lkml new file mode 100644 index 00000000000000..95391f6a73e635 --- /dev/null +++ b/metadata-ingestion/tests/integration/lookml/drop_hive_dot/data.model.lkml @@ -0,0 +1,6 @@ +connection: "my_connection" + +include: "top_10_employee_income_source.view.lkml" + +explore: top_10_employee_income_source { +} \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/lookml/drop_hive_dot/top_10_employee_income_source.view.lkml b/metadata-ingestion/tests/integration/lookml/drop_hive_dot/top_10_employee_income_source.view.lkml new file mode 100644 index 00000000000000..149ce9219b54b8 --- /dev/null +++ b/metadata-ingestion/tests/integration/lookml/drop_hive_dot/top_10_employee_income_source.view.lkml @@ -0,0 +1,26 @@ +view: top_10_employee_income_source { + derived_table: { + sql: SELECT id, + name, + source + FROM hive.employee_db.income_source + ORDER BY source desc + LIMIT 10 + ;; + } + + dimension: id { + type: number + sql: ${TABLE}.id ;; + } + + dimension: name { + type: string + sql: ${TABLE}.name ;; + } + + dimension: source { + type: string + sql: ${TABLE}.source ;; + } +} \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/lookml/drop_hive_dot_golden.json b/metadata-ingestion/tests/integration/lookml/drop_hive_dot_golden.json new file mode 100644 index 00000000000000..e1dad2e91b7353 --- /dev/null +++ b/metadata-ingestion/tests/integration/lookml/drop_hive_dot_golden.json @@ -0,0 +1,357 @@ +[ +{ + "entityType": "container", + "entityUrn": "urn:li:container:78f22c19304954b15e8adb1d9809975e", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "looker", + "env": "PROD", + "project_name": "lkml_samples" + }, + "name": "lkml_samples" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:78f22c19304954b15e8adb1d9809975e", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:78f22c19304954b15e8adb1d9809975e", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:looker" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:78f22c19304954b15e8adb1d9809975e", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "LookML Project" + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:78f22c19304954b15e8adb1d9809975e", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "Folders" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.top_10_employee_income_source,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "View" + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.top_10_employee_income_source,PROD)", + "changeType": "UPSERT", + "aspectName": "viewProperties", + "aspect": { + "json": { + "materialized": false, + "viewLogic": "SELECT id,\n name,\n source\n FROM hive.employee_db.income_source\n ORDER BY source desc\n LIMIT 10", + "viewLanguage": "sql" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.top_10_employee_income_source,PROD)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:78f22c19304954b15e8adb1d9809975e" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.top_10_employee_income_source,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.BrowsePaths": { + "paths": [ + "/Develop/lkml_samples/" + ] + } + }, + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.dataset.UpstreamLineage": { + "upstreams": [ + { + "auditStamp": { + "time": 1586847600000, + "actor": "urn:li:corpuser:datahub" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:hive,employee_db.income_source,PROD)", + "type": "VIEW" + } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,employee_db.income_source,PROD),id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.top_10_employee_income_source,PROD),id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,employee_db.income_source,PROD),name)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.top_10_employee_income_source,PROD),name)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,employee_db.income_source,PROD),source)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.top_10_employee_income_source,PROD),source)" + ], + "confidenceScore": 1.0 + } + ] + } + }, + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "top_10_employee_income_source", + "platform": "urn:li:dataPlatform:looker", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.OtherSchema": { + "rawSchema": "" + } + }, + "fields": [ + { + "fieldPath": "id", + "nullable": false, + "description": "", + "label": "", + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "number", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension" + } + ] + }, + "isPartOfKey": false + }, + { + "fieldPath": "name", + "nullable": false, + "description": "", + "label": "", + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension" + } + ] + }, + "isPartOfKey": false + }, + { + "fieldPath": "source", + "nullable": false, + "description": "", + "label": "", + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension" + } + ] + }, + "isPartOfKey": false + } + ], + "primaryKeys": [] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": { + "looker.file.path": "top_10_employee_income_source.view.lkml", + "looker.model": "data" + }, + "name": "top_10_employee_income_source", + "tags": [] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.top_10_employee_income_source,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "Develop" + }, + { + "id": "urn:li:container:78f22c19304954b15e8adb1d9809975e", + "urn": "urn:li:container:78f22c19304954b15e8adb1d9809975e" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "tag", + "entityUrn": "urn:li:tag:Dimension", + "changeType": "UPSERT", + "aspectName": "tagKey", + "aspect": { + "json": { + "name": "Dimension" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/lookml/test_lookml.py b/metadata-ingestion/tests/integration/lookml/test_lookml.py index 9e051995d0b940..a5d838cb16d73a 100644 --- a/metadata-ingestion/tests/integration/lookml/test_lookml.py +++ b/metadata-ingestion/tests/integration/lookml/test_lookml.py @@ -1032,3 +1032,30 @@ def test_field_tag_ingest(pytestconfig, tmp_path, mock_time): output_path=tmp_path / mce_out_file, golden_path=golden_path, ) + + +@freeze_time(FROZEN_TIME) +def test_drop_hive(pytestconfig, tmp_path, mock_time): + test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml" + mce_out_file = "drop_hive_dot.json" + + new_recipe = get_default_recipe( + f"{tmp_path}/{mce_out_file}", + f"{test_resources_dir}/drop_hive_dot", + ) + + new_recipe["source"]["config"]["connection_to_platform_map"] = { + "my_connection": "hive" + } + + pipeline = Pipeline.create(new_recipe) + pipeline.run() + pipeline.pretty_print_summary() + pipeline.raise_from_status(raise_warnings=True) + + golden_path = test_resources_dir / "drop_hive_dot_golden.json" + mce_helpers.check_golden_file( + pytestconfig, + output_path=tmp_path / mce_out_file, + golden_path=golden_path, + )