diff --git a/.github/workflows/airflow-plugin.yml b/.github/workflows/airflow-plugin.yml index a250bddcc16d1f..54042d104d9066 100644 --- a/.github/workflows/airflow-plugin.yml +++ b/.github/workflows/airflow-plugin.yml @@ -10,7 +10,7 @@ on: - "metadata-models/**" pull_request: branches: - - master + - "**" paths: - ".github/**" - "metadata-ingestion-modules/airflow-plugin/**" diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml index 96b9bb2a149335..25f3957e8f0861 100644 --- a/.github/workflows/build-and-test.yml +++ b/.github/workflows/build-and-test.yml @@ -8,7 +8,7 @@ on: - "**.md" pull_request: branches: - - master + - "**" paths-ignore: - "docs/**" - "**.md" @@ -24,17 +24,12 @@ jobs: strategy: fail-fast: false matrix: - command: - [ + command: [ # metadata-ingestion and airflow-plugin each have dedicated build jobs "except_metadata_ingestion", "frontend" ] - timezone: - [ - "UTC", - "America/New_York", - ] + timezone: ["UTC", "America/New_York"] runs-on: ubuntu-latest timeout-minutes: 60 steps: diff --git a/.github/workflows/check-datahub-jars.yml b/.github/workflows/check-datahub-jars.yml index 841a9ed5f9bc73..9a17a70e7f8d41 100644 --- a/.github/workflows/check-datahub-jars.yml +++ b/.github/workflows/check-datahub-jars.yml @@ -10,7 +10,7 @@ on: - "**.md" pull_request: branches: - - master + - "**" paths-ignore: - "docker/**" - "docs/**" @@ -28,12 +28,7 @@ jobs: max-parallel: 1 fail-fast: false matrix: - command: - [ - "datahub-client", - "datahub-protobuf", - "spark-lineage" - ] + command: ["datahub-client", "datahub-protobuf", "spark-lineage"] runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 diff --git a/.github/workflows/close-stale-issues.yml b/.github/workflows/close-stale-issues.yml index a7809087702acb..98e3041f288040 100644 --- a/.github/workflows/close-stale-issues.yml +++ b/.github/workflows/close-stale-issues.yml @@ -18,7 +18,9 @@ jobs: days-before-issue-stale: 30 days-before-issue-close: 30 stale-issue-label: "stale" - stale-issue-message: "This issue is stale because it has been open for 30 days with no activity. If you believe this is still an issue on the latest DataHub release please leave a comment with the version that you tested it with. If this is a question/discussion please head to https://slack.datahubproject.io. For feature requests please use https://feature-requests.datahubproject.io" + stale-issue-message: + "This issue is stale because it has been open for 30 days with no activity. If you believe this is still an issue on the latest DataHub release please leave a comment with the version that you tested it with. If this is a question/discussion please head to https://slack.datahubproject.io.\ + \ For feature requests please use https://feature-requests.datahubproject.io" close-issue-message: "This issue was closed because it has been inactive for 30 days since being marked as stale." 
days-before-pr-stale: -1 days-before-pr-close: -1 diff --git a/.github/workflows/code-checks.yml b/.github/workflows/code-checks.yml index 6ce19a5b4616ec..e12971b8a62084 100644 --- a/.github/workflows/code-checks.yml +++ b/.github/workflows/code-checks.yml @@ -10,7 +10,7 @@ on: - ".github/workflows/code-checks.yml" pull_request: branches: - - master + - "**" paths: - "metadata-io/**" - "datahub-web-react/**" @@ -21,17 +21,12 @@ concurrency: group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} cancel-in-progress: true - jobs: code_check: strategy: fail-fast: false matrix: - command: - [ - "check_event_type.py", - "check_policies.py" - ] + command: ["check_event_type.py", "check_policies.py"] name: run code checks runs-on: ubuntu-latest steps: @@ -43,5 +38,5 @@ jobs: with: python-version: "3.10" - name: run check ${{ matrix.command }} - run: | - python .github/scripts/${{ matrix.command }} \ No newline at end of file + run: |- + python .github/scripts/${{ matrix.command }} diff --git a/.github/workflows/docker-postgres-setup.yml b/.github/workflows/docker-postgres-setup.yml index a5d421d4b7ff56..fda4349f90bf7c 100644 --- a/.github/workflows/docker-postgres-setup.yml +++ b/.github/workflows/docker-postgres-setup.yml @@ -8,7 +8,7 @@ on: - ".github/workflows/docker-postgres-setup.yml" pull_request: branches: - - master + - "**" paths: - "docker/postgres-setup/**" - ".github/workflows/docker-postgres-setup.yml" @@ -61,4 +61,3 @@ jobs: context: . file: ./docker/postgres-setup/Dockerfile platforms: linux/amd64,linux/arm64 - diff --git a/.github/workflows/docker-unified.yml b/.github/workflows/docker-unified.yml index 20675610b583b9..193aeafad4af90 100644 --- a/.github/workflows/docker-unified.yml +++ b/.github/workflows/docker-unified.yml @@ -8,7 +8,7 @@ on: - "**.md" pull_request: branches: - - master + - "**" paths-ignore: - "docs/**" - "**.md" @@ -551,7 +551,6 @@ jobs: id: tag run: echo "tag=${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_full_tag || 'head' }}" >> $GITHUB_OUTPUT - datahub_ingestion_slim_build: name: Build and Push DataHub Ingestion Docker Images runs-on: ubuntu-latest @@ -815,8 +814,8 @@ jobs: DATAHUB_VERSION: ${{ needs.setup.outputs.unique_tag }} DATAHUB_ACTIONS_IMAGE: ${{ env.DATAHUB_INGESTION_IMAGE }} ACTIONS_VERSION: ${{ needs.datahub_ingestion_slim_build.outputs.tag }} - ACTIONS_EXTRA_PACKAGES: 'acryl-datahub-actions[executor]==0.0.13 acryl-datahub-actions==0.0.13 acryl-datahub==0.10.5' - ACTIONS_CONFIG: 'https://raw.githubusercontent.com/acryldata/datahub-actions/main/docker/config/executor.yaml' + ACTIONS_EXTRA_PACKAGES: "acryl-datahub-actions[executor]==0.0.13 acryl-datahub-actions==0.0.13 acryl-datahub==0.10.5" + ACTIONS_CONFIG: "https://raw.githubusercontent.com/acryldata/datahub-actions/main/docker/config/executor.yaml" run: | ./smoke-test/run-quickstart.sh - name: sleep 60s diff --git a/.github/workflows/documentation.yml b/.github/workflows/documentation.yml index b4d2d8182afe0d..4f55a89eb93b5e 100644 --- a/.github/workflows/documentation.yml +++ b/.github/workflows/documentation.yml @@ -3,7 +3,7 @@ name: documentation on: pull_request: branches: - - master + - "**" push: branches: - master diff --git a/.github/workflows/lint-actions.yml b/.github/workflows/lint-actions.yml index b285e46da48575..6f34bf292bf51a 100644 --- a/.github/workflows/lint-actions.yml +++ b/.github/workflows/lint-actions.yml @@ -2,8 +2,10 @@ name: Lint actions on: pull_request: paths: - - '.github/workflows/**' + - 
".github/workflows/**" + branches: + - "**" jobs: actionlint: runs-on: ubuntu-latest diff --git a/.github/workflows/metadata-ingestion.yml b/.github/workflows/metadata-ingestion.yml index dea4603868f8ef..699ca330ce0ac6 100644 --- a/.github/workflows/metadata-ingestion.yml +++ b/.github/workflows/metadata-ingestion.yml @@ -9,7 +9,7 @@ on: - "metadata-models/**" pull_request: branches: - - master + - "**" paths: - ".github/**" - "metadata-ingestion/**" diff --git a/.github/workflows/metadata-io.yml b/.github/workflows/metadata-io.yml index e37ddd0ce4e86f..48f230ce14c8db 100644 --- a/.github/workflows/metadata-io.yml +++ b/.github/workflows/metadata-io.yml @@ -10,7 +10,7 @@ on: - "metadata-io/**" pull_request: branches: - - master + - "**" paths: - "**/*.gradle" - "li-utils/**" diff --git a/.github/workflows/spark-smoke-test.yml b/.github/workflows/spark-smoke-test.yml index b2482602e75480..541b2019b93ef1 100644 --- a/.github/workflows/spark-smoke-test.yml +++ b/.github/workflows/spark-smoke-test.yml @@ -12,7 +12,7 @@ on: - ".github/workflows/spark-smoke-test.yml" pull_request: branches: - - master + - "**" paths: - "metadata_models/**" - "metadata-integration/java/datahub-client/**" diff --git a/datahub-web-react/src/app/ingest/source/builder/sources.json b/datahub-web-react/src/app/ingest/source/builder/sources.json index 1bd5b6f1f768b5..b18384909c33f0 100644 --- a/datahub-web-react/src/app/ingest/source/builder/sources.json +++ b/datahub-web-react/src/app/ingest/source/builder/sources.json @@ -130,7 +130,7 @@ "name": "dynamodb", "displayName": "DynamoDB", "docsUrl": "https://datahubproject.io/docs/metadata-ingestion/", - "recipe": "source:\n type: dynamodb\n config:\n platform_instance: \"AWS_ACCOUNT_ID\"\n aws_access_key_id : '${AWS_ACCESS_KEY_ID}'\n aws_secret_access_key : '${AWS_SECRET_ACCESS_KEY}'\n # User could use the below option to provide a list of primary keys of a table in dynamodb format,\n # those items from given primary keys will be included when we scan the table.\n # For each table we can retrieve up to 16 MB of data, which can contain as many as 100 items.\n # We'll enforce the the primary keys list size not to exceed 100\n # The total items we'll try to retrieve in these two scenarios:\n # 1. If user don't specify include_table_item: we'll retrieve up to 100 items\n # 2. 
If user specifies include_table_item: we'll retrieve up to 100 items plus user specified items in\n # the table, with a total not more than 200 items\n # include_table_item:\n # table_name:\n # [\n # {\n # 'partition_key_name': { 'attribute_type': 'attribute_value' },\n # 'sort_key_name': { 'attribute_type': 'attribute_value' },\n # },\n # ]" + "recipe": "source:\n type: dynamodb\n config:\n platform_instance: \"AWS_ACCOUNT_ID\"\n aws_access_key_id : '${AWS_ACCESS_KEY_ID}'\n aws_secret_access_key : '${AWS_SECRET_ACCESS_KEY}'\n # If there are items that have most representative fields of the table, users could use the\n # `include_table_item` option to provide a list of primary keys of the table in dynamodb format.\n # For each `region.table`, the list of primary keys can be at most 100.\n # We include these items in addition to the first 100 items in the table when we scan it.\n # include_table_item:\n # region.table_name:\n # [\n # {\n # 'partition_key_name': { 'attribute_type': 'attribute_value' },\n # 'sort_key_name': { 'attribute_type': 'attribute_value' },\n # },\n # ]" }, { "urn": "urn:li:dataPlatform:glue", @@ -223,4 +223,4 @@ "docsUrl": "https://datahubproject.io/docs/metadata-ingestion/", "recipe": "source:\n type: \n config:\n # Source-type specifics config\n " } -] \ No newline at end of file +] diff --git a/docs-website/download_historical_versions.py b/docs-website/download_historical_versions.py index 83157edc1972cf..53ee9cf1e63ef5 100644 --- a/docs-website/download_historical_versions.py +++ b/docs-website/download_historical_versions.py @@ -1,6 +1,7 @@ import json import os import tarfile +import time import urllib.request repo_url = "https://api.github.com/repos/datahub-project/static-assets" @@ -16,17 +17,30 @@ def download_file(url, destination): f.write(chunk) -def fetch_urls(repo_url: str, folder_path: str, file_format: str): +def fetch_urls( + repo_url: str, folder_path: str, file_format: str, max_retries=3, retry_delay=5 +): api_url = f"{repo_url}/contents/{folder_path}" - response = urllib.request.urlopen(api_url) - data = response.read().decode("utf-8") - urls = [ - file["download_url"] - for file in json.loads(data) - if file["name"].endswith(file_format) - ] - print(urls) - return urls + for attempt in range(max_retries + 1): + try: + response = urllib.request.urlopen(api_url) + if response.status == 403 or (500 <= response.status < 600): + raise Exception(f"HTTP Error {response.status}: {response.reason}") + data = response.read().decode("utf-8") + urls = [ + file["download_url"] + for file in json.loads(data) + if file["name"].endswith(file_format) + ] + print(urls) + return urls + except Exception as e: + if attempt < max_retries: + print(f"Attempt {attempt + 1}/{max_retries}: {e}") + time.sleep(retry_delay) + else: + print(f"Max retries reached. Unable to fetch data.") + raise def extract_tar_file(destination_path): diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md index 4df8d435cf1c45..5d0ad5eaf8f7e1 100644 --- a/docs/how/updating-datahub.md +++ b/docs/how/updating-datahub.md @@ -9,6 +9,11 @@ This file documents any backwards-incompatible changes in DataHub and assists pe - #8810 - Removed support for SQLAlchemy 1.3.x. Only SQLAlchemy 1.4.x is supported now. - #8853 - The Airflow plugin no longer supports Airflow 2.0.x or Python 3.7. See the docs for more details. - #8853 - Introduced the Airflow plugin v2. 
If you're using Airflow 2.3+, the v2 plugin will be enabled by default, and so you'll need to switch your requirements to include `pip install 'acryl-datahub-airflow-plugin[plugin-v2]'`. To continue using the v1 plugin, set the `DATAHUB_AIRFLOW_PLUGIN_USE_V1_PLUGIN` environment variable to `true`. +- #8943 - The Unity Catalog ingestion source has a new option `include_metastore`, which will cause all urns to be changed when disabled. +This is currently enabled by default to preserve compatibility, but will be disabled by default and then removed in the future. +If stateful ingestion is enabled, simply setting `include_metastore: false` will perform all required cleanup. +Otherwise, we recommend soft deleting all databricks data via the DataHub CLI: +`datahub delete --platform databricks --soft` and then reingesting with `include_metastore: false`. ### Potential Downtime diff --git a/metadata-ingestion/docs/sources/dynamodb/dynamodb_post.md b/metadata-ingestion/docs/sources/dynamodb/dynamodb_post.md index 7f9a0324c7bc64..a1c0a6e2d4d214 100644 --- a/metadata-ingestion/docs/sources/dynamodb/dynamodb_post.md +++ b/metadata-ingestion/docs/sources/dynamodb/dynamodb_post.md @@ -1,21 +1,18 @@ -## Limitations - -For each region, the list table operation returns maximum number 100 tables, we need to further improve it by implementing pagination for listing tables - ## Advanced Configurations ### Using `include_table_item` config -If there are items that have most representative fields of the table, user could use the `include_table_item` option to provide a list of primary keys of a table in dynamodb format, those items from given primary keys will be included when we scan the table. +If there are items that have most representative fields of the table, users could use the `include_table_item` option to provide a list of primary keys of the table in dynamodb format. We include these items in addition to the first 100 items in the table when we scan it. -Take [AWS DynamoDB Developer Guide Example tables and data](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/AppendixSampleTables.html) as an example, if user has a table `Reply` with composite primary key `Id` and `ReplyDateTime`, user can use `include_table_item` to include 2 items as following: +Take [AWS DynamoDB Developer Guide Example tables and data](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/AppendixSampleTables.html) as an example, if an account has a table `Reply` in the `us-west-2` region with composite primary key `Id` and `ReplyDateTime`, users can use `include_table_item` to include 2 items as follows: Example: ```yml -# put the table name and composite key in DynamoDB format +# The table name should be in the format of region.table_name +# The primary keys should be in the DynamoDB format include_table_item: - Reply: + us-west-2.Reply: [ { "ReplyDateTime": { "S": "2015-09-22T19:58:22.947Z" }, diff --git a/metadata-ingestion/docs/sources/dynamodb/dynamodb_pre.md b/metadata-ingestion/docs/sources/dynamodb/dynamodb_pre.md index a48e8d5be04aa0..598d0ecdb3786b 100644 --- a/metadata-ingestion/docs/sources/dynamodb/dynamodb_pre.md +++ b/metadata-ingestion/docs/sources/dynamodb/dynamodb_pre.md @@ -1,8 +1,8 @@ ### Prerequisities -In order to execute this source, you will need to create access key and secret keys that have DynamoDB read access. You can create these policies and attach to your account or can ask your account admin to attach these policies to your account. 
+In order to execute this source, you need to attach the `AmazonDynamoDBReadOnlyAccess` policy to a user in your AWS account. Then create an API access key and secret for the user. -For access key permissions, you can create a policy with permissions below and attach to your account, you can find more details in [Managing access keys for IAM users](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html) +For a user to be able to create an API access key, they need the following access key permissions. Your AWS account admin can create a policy with these permissions and attach it to the user; you can find more details in [Managing access keys for IAM users](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html) ```json { @@ -22,5 +22,3 @@ For access key permissions, you can create a policy with permissions below and a ] } ``` - -For DynamoDB read access, you can simply attach AWS managed policy `AmazonDynamoDBReadOnlyAccess` to your account, you can find more details in [Attaching a policy to an IAM user group](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_groups_manage_attach-policy.html) diff --git a/metadata-ingestion/docs/sources/dynamodb/dynamodb_recipe.yml b/metadata-ingestion/docs/sources/dynamodb/dynamodb_recipe.yml index bd41637907b5c9..4f4edc9a7d496e 100644 --- a/metadata-ingestion/docs/sources/dynamodb/dynamodb_recipe.yml +++ b/metadata-ingestion/docs/sources/dynamodb/dynamodb_recipe.yml @@ -4,16 +4,14 @@ source: platform_instance: "AWS_ACCOUNT_ID" aws_access_key_id: "${AWS_ACCESS_KEY_ID}" aws_secret_access_key: "${AWS_SECRET_ACCESS_KEY}" - # User could use the below option to provide a list of primary keys of a table in dynamodb format, - # those items from given primary keys will be included when we scan the table. - # For each table we can retrieve up to 16 MB of data, which can contain as many as 100 items. - # We'll enforce the the primary keys list size not to exceed 100 - # The total items we'll try to retrieve in these two scenarios: - # 1. If user don't specify include_table_item: we'll retrieve up to 100 items - # 2. If user specifies include_table_item: we'll retrieve up to 100 items plus user specified items in - # the table, with a total not more than 200 items + # + # If there are items that have most representative fields of the table, users could use the + # `include_table_item` option to provide a list of primary keys of the table in dynamodb format. + # For each `region.table`, the list of primary keys can be at most 100. + # We include these items in addition to the first 100 items in the table when we scan it. 
+ # # include_table_item: - # table_name: + # region.table_name: # [ # { # "partition_key_name": { "attribute_type": "attribute_value" }, diff --git a/metadata-ingestion/src/datahub/configuration/source_common.py b/metadata-ingestion/src/datahub/configuration/source_common.py index 37b93f3e598e1a..a9f891ddb7b1e1 100644 --- a/metadata-ingestion/src/datahub/configuration/source_common.py +++ b/metadata-ingestion/src/datahub/configuration/source_common.py @@ -4,7 +4,7 @@ from pydantic.fields import Field from datahub.configuration.common import ConfigModel, ConfigurationError -from datahub.configuration.pydantic_field_deprecation import pydantic_field_deprecated +from datahub.configuration.validate_field_deprecation import pydantic_field_deprecated from datahub.metadata.schema_classes import FabricTypeClass DEFAULT_ENV = FabricTypeClass.PROD diff --git a/metadata-ingestion/src/datahub/configuration/pydantic_field_deprecation.py b/metadata-ingestion/src/datahub/configuration/validate_field_deprecation.py similarity index 74% rename from metadata-ingestion/src/datahub/configuration/pydantic_field_deprecation.py rename to metadata-ingestion/src/datahub/configuration/validate_field_deprecation.py index ed82acb594ed7c..6134c4dab48174 100644 --- a/metadata-ingestion/src/datahub/configuration/pydantic_field_deprecation.py +++ b/metadata-ingestion/src/datahub/configuration/validate_field_deprecation.py @@ -1,20 +1,28 @@ import warnings -from typing import Optional, Type +from typing import Any, Optional, Type import pydantic from datahub.configuration.common import ConfigurationWarning from datahub.utilities.global_warning_util import add_global_warning +_unset = object() -def pydantic_field_deprecated(field: str, message: Optional[str] = None) -> classmethod: + +def pydantic_field_deprecated( + field: str, + warn_if_value_is_not: Any = _unset, + message: Optional[str] = None, +) -> classmethod: if message: output = message else: output = f"{field} is deprecated and will be removed in a future release. Please remove it from your config." 
def _validate_deprecated(cls: Type, values: dict) -> dict: - if field in values: + if field in values and ( + warn_if_value_is_not is _unset or values[field] != warn_if_value_is_not + ): add_global_warning(output) warnings.warn(output, ConfigurationWarning, stacklevel=2) return values diff --git a/metadata-ingestion/src/datahub/emitter/mcp_builder.py b/metadata-ingestion/src/datahub/emitter/mcp_builder.py index 06f689dfd317b1..65e0c0d6ba60d5 100644 --- a/metadata-ingestion/src/datahub/emitter/mcp_builder.py +++ b/metadata-ingestion/src/datahub/emitter/mcp_builder.py @@ -94,7 +94,15 @@ class MetastoreKey(ContainerKey): metastore: str -class CatalogKey(MetastoreKey): +class CatalogKeyWithMetastore(MetastoreKey): + catalog: str + + +class UnitySchemaKeyWithMetastore(CatalogKeyWithMetastore): + unity_schema: str + + +class CatalogKey(ContainerKey): catalog: str diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py index b3e88459917b39..8ae17600e0eeaf 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py @@ -1,12 +1,9 @@ -import dataclasses import logging from datetime import datetime from typing import Dict, Iterable, List, Optional, Tuple, cast from dateutil.relativedelta import relativedelta -from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance -from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.bigquery_v2.bigquery_audit import BigqueryTableIdentifier from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config @@ -15,7 +12,7 @@ RANGE_PARTITION_NAME, BigqueryTable, ) -from datahub.ingestion.source.ge_data_profiler import GEProfilerRequest +from datahub.ingestion.source.sql.sql_generic import BaseTable from datahub.ingestion.source.sql.sql_generic_profiler import ( GenericProfiler, TableProfilerRequest, @@ -25,12 +22,6 @@ logger = logging.getLogger(__name__) -@dataclasses.dataclass -class BigqueryProfilerRequest(GEProfilerRequest): - table: BigqueryTable - profile_table_level_only: bool = False - - class BigqueryProfiler(GenericProfiler): config: BigQueryV2Config report: BigQueryV2Report @@ -183,84 +174,54 @@ def get_workunits( ) # Emit the profile work unit - profile_request = self.get_bigquery_profile_request( - project=project_id, dataset=dataset, table=table - ) + profile_request = self.get_profile_request(table, dataset, project_id) if profile_request is not None: + self.report.report_entity_profiled(profile_request.pretty_name) profile_requests.append(profile_request) if len(profile_requests) == 0: return - yield from self.generate_wu_from_profile_requests(profile_requests) - - def generate_wu_from_profile_requests( - self, profile_requests: List[BigqueryProfilerRequest] - ) -> Iterable[MetadataWorkUnit]: - table_profile_requests = cast(List[TableProfilerRequest], profile_requests) - for request, profile in self.generate_profiles( - table_profile_requests, + yield from self.generate_profile_workunits( + profile_requests, self.config.profiling.max_workers, platform=self.platform, profiler_args=self.get_profile_args(), - ): - if request is None or profile is None: - continue - - request = cast(BigqueryProfilerRequest, request) - profile.sizeInBytes = request.table.size_in_bytes - # If table is partitioned we profile only one 
partition (if nothing set then the last one) - # but for table level we can use the rows_count from the table metadata - # This way even though column statistics only reflects one partition data but the rows count - # shows the proper count. - if profile.partitionSpec and profile.partitionSpec.partition: - profile.rowCount = request.table.rows_count - - dataset_name = request.pretty_name - dataset_urn = make_dataset_urn_with_platform_instance( - self.platform, - dataset_name, - self.config.platform_instance, - self.config.env, - ) - # We don't add to the profiler state if we only do table level profiling as it always happens - if self.state_handler and not request.profile_table_level_only: - self.state_handler.add_to_state( - dataset_urn, int(datetime.now().timestamp() * 1000) - ) - - yield MetadataChangeProposalWrapper( - entityUrn=dataset_urn, aspect=profile - ).as_workunit() + ) - def get_bigquery_profile_request( - self, project: str, dataset: str, table: BigqueryTable - ) -> Optional[BigqueryProfilerRequest]: - skip_profiling = False - profile_table_level_only = self.config.profiling.profile_table_level_only - dataset_name = BigqueryTableIdentifier( - project_id=project, dataset=dataset, table=table.name + def get_dataset_name(self, table_name: str, schema_name: str, db_name: str) -> str: + return BigqueryTableIdentifier( + project_id=db_name, dataset=schema_name, table=table_name ).get_table_name() - if not self.is_dataset_eligible_for_profiling( - dataset_name, table.last_altered, table.size_in_bytes, table.rows_count - ): - profile_table_level_only = True - self.report.num_tables_not_eligible_profiling[f"{project}.{dataset}"] += 1 - if not table.column_count: - skip_profiling = True + def get_batch_kwargs( + self, table: BaseTable, schema_name: str, db_name: str + ) -> dict: + return dict( + schema=db_name, # + table=f"{schema_name}.{table.name}", # . + ) - if skip_profiling: - if self.config.profiling.report_dropped_profiles: - self.report.report_dropped(f"profile of {dataset_name}") + def get_profile_request( + self, table: BaseTable, schema_name: str, db_name: str + ) -> Optional[TableProfilerRequest]: + profile_request = super().get_profile_request(table, schema_name, db_name) + + if not profile_request: return None + # Below code handles profiling changes required for partitioned or sharded tables + # 1. Skip profile if partition profiling is disabled. + # 2. 
Else update `profile_request.batch_kwargs` with partition and custom_sql + + bq_table = cast(BigqueryTable, table) (partition, custom_sql) = self.generate_partition_profiler_query( - project, dataset, table, self.config.profiling.partition_datetime + db_name, schema_name, bq_table, self.config.profiling.partition_datetime ) - if partition is None and table.partition_info: + + if partition is None and bq_table.partition_info: self.report.report_warning( "profile skipped as partitioned table is empty or partition id or type was invalid", - dataset_name, + profile_request.pretty_name, ) return None if ( @@ -268,24 +229,20 @@ def get_bigquery_profile_request( and not self.config.profiling.partition_profiling_enabled ): logger.debug( - f"{dataset_name} and partition {partition} is skipped because profiling.partition_profiling_enabled property is disabled" + f"{profile_request.pretty_name} and partition {partition} is skipped because profiling.partition_profiling_enabled property is disabled" ) self.report.profiling_skipped_partition_profiling_disabled.append( - dataset_name + profile_request.pretty_name ) return None - self.report.report_entity_profiled(dataset_name) - logger.debug(f"Preparing profiling request for {dataset_name}") - profile_request = BigqueryProfilerRequest( - pretty_name=dataset_name, - batch_kwargs=dict( - schema=project, - table=f"{dataset}.{table.name}", - custom_sql=custom_sql, - partition=partition, - ), - table=table, - profile_table_level_only=profile_table_level_only, - ) + if partition: + logger.debug("Updating profiling request for partitioned/sharded tables") + profile_request.batch_kwargs.update( + dict( + custom_sql=custom_sql, + partition=partition, + ) + ) + return profile_request diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py index f9b71892975b45..0f5c08eb6ac549 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py @@ -18,8 +18,8 @@ ConfigurationError, LineageConfig, ) -from datahub.configuration.pydantic_field_deprecation import pydantic_field_deprecated from datahub.configuration.source_common import DatasetSourceConfigMixin +from datahub.configuration.validate_field_deprecation import pydantic_field_deprecated from datahub.emitter import mce_builder from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.common import PipelineContext @@ -214,7 +214,9 @@ class DBTCommonConfig( default=False, description="Use model identifier instead of model name if defined (if not, default to model name).", ) - _deprecate_use_identifiers = pydantic_field_deprecated("use_identifiers") + _deprecate_use_identifiers = pydantic_field_deprecated( + "use_identifiers", warn_if_value_is_not=False + ) entities_enabled: DBTEntitiesEnabled = Field( DBTEntitiesEnabled(), @@ -278,6 +280,14 @@ class DBTCommonConfig( description="When enabled, converts column URNs to lowercase to ensure cross-platform compatibility. 
" "If `target_platform` is Snowflake, the default is True.", ) + use_compiled_code: bool = Field( + default=False, + description="When enabled, uses the compiled dbt code instead of the raw dbt node definition.", + ) + test_warnings_are_errors: bool = Field( + default=False, + description="When enabled, dbt test warnings will be treated as failures.", + ) @validator("target_platform") def validate_target_platform_value(cls, target_platform: str) -> str: @@ -811,7 +821,7 @@ def _make_assertion_from_test( mce_builder.make_schema_field_urn(upstream_urn, column_name) ], nativeType=node.name, - logic=node.compiled_code if node.compiled_code else node.raw_code, + logic=node.compiled_code or node.raw_code, aggregation=AssertionStdAggregationClass._NATIVE_, nativeParameters=string_map(kw_args), ), @@ -825,7 +835,7 @@ def _make_assertion_from_test( dataset=upstream_urn, scope=DatasetAssertionScopeClass.DATASET_ROWS, operator=AssertionStdOperatorClass._NATIVE_, - logic=node.compiled_code if node.compiled_code else node.raw_code, + logic=node.compiled_code or node.raw_code, nativeType=node.name, aggregation=AssertionStdAggregationClass._NATIVE_, nativeParameters=string_map(kw_args), @@ -856,6 +866,10 @@ def _make_assertion_result_from_test( result=AssertionResultClass( type=AssertionResultTypeClass.SUCCESS if test_result.status == "pass" + or ( + not self.config.test_warnings_are_errors + and test_result.status == "warn" + ) else AssertionResultTypeClass.FAILURE, nativeResults=test_result.native_results, ), @@ -1007,8 +1021,8 @@ def create_platform_mces( aspects.append(upstream_lineage_class) # add view properties aspect - if node.raw_code and node.language == "sql": - view_prop_aspect = self._create_view_properties_aspect(node) + view_prop_aspect = self._create_view_properties_aspect(node) + if view_prop_aspect: aspects.append(view_prop_aspect) # emit subtype mcp @@ -1133,14 +1147,21 @@ def _create_dataset_properties_aspect( def get_external_url(self, node: DBTNode) -> Optional[str]: pass - def _create_view_properties_aspect(self, node: DBTNode) -> ViewPropertiesClass: + def _create_view_properties_aspect( + self, node: DBTNode + ) -> Optional[ViewPropertiesClass]: + view_logic = ( + node.compiled_code if self.config.use_compiled_code else node.raw_code + ) + + if node.language != "sql" or not view_logic: + return None + materialized = node.materialization in {"table", "incremental", "snapshot"} - # this function is only called when raw sql is present. 
assert is added to satisfy lint checks - assert node.raw_code is not None view_properties = ViewPropertiesClass( materialized=materialized, viewLanguage="SQL", - viewLogic=node.raw_code, + viewLogic=view_logic, ) return view_properties diff --git a/metadata-ingestion/src/datahub/ingestion/source/dynamodb/dynamodb.py b/metadata-ingestion/src/datahub/ingestion/source/dynamodb/dynamodb.py index 6b7c118373673b..d7f3dfb9279fbb 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dynamodb/dynamodb.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dynamodb/dynamodb.py @@ -1,5 +1,5 @@ import logging -from dataclasses import field +from dataclasses import dataclass, field from typing import Any, Counter, Dict, Iterable, List, Optional, Type, Union import boto3 @@ -79,12 +79,13 @@ class DynamoDBConfig(DatasetSourceConfigMixin, StatefulIngestionConfigBase): table_pattern: AllowDenyPattern = Field( default=AllowDenyPattern.allow_all(), - description="regex patterns for tables to filter in ingestion.", + description="Regex patterns for tables to filter in ingestion. The table name format is 'region.table'", ) # Custom Stateful Ingestion settings stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None +@dataclass class DynamoDBSourceReport(StaleEntityRemovalSourceReport): filtered: List[str] = field(default_factory=list) @@ -175,39 +176,30 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: # traverse databases in sorted order so output is consistent for region in dynamodb_regions: - try: - # create a new dynamodb client for each region, - # it seems for one client we could only list the table of one specific region, - # the list_tables() method don't take any config that related to region - # TODO: list table returns maximum number 100, need to implement pagination here - dynamodb_client = boto3.client( - "dynamodb", - region_name=region, - aws_access_key_id=self.config.aws_access_key_id - if self.config.aws_access_key_id - else None, - aws_secret_access_key=self.config.aws_secret_access_key.get_secret_value() - if self.config.aws_secret_access_key - else None, - ) - table_names: List[str] = dynamodb_client.list_tables()["TableNames"] - except Exception as ex: - # TODO: If regions is config input then this would be self.report.report_warning, - # we can create dynamodb client to take aws region or regions as user input - logger.info(f"exception happen in region {region}, skipping: {ex}") - continue - for table_name in sorted(table_names): - if not self.config.table_pattern.allowed(table_name): + logger.info(f"Processing region {region}") + # create a new dynamodb client for each region, + # it seems for one client we could only list the table of one specific region, + # the list_tables() method don't take any config that related to region + dynamodb_client = boto3.client( + "dynamodb", + region_name=region, + aws_access_key_id=self.config.aws_access_key_id, + aws_secret_access_key=self.config.aws_secret_access_key.get_secret_value(), + ) + + for table_name in self._list_tables(dynamodb_client): + dataset_name = f"{region}.{table_name}" + if not self.config.table_pattern.allowed(dataset_name): + logger.debug(f"skipping table: {dataset_name}") + self.report.report_dropped(dataset_name) continue + + logger.debug(f"Processing table: {dataset_name}") table_info = dynamodb_client.describe_table(TableName=table_name)[ "Table" ] account_id = table_info["TableArn"].split(":")[4] - if not self.config.table_pattern.allowed(table_name): - 
self.report.report_dropped(table_name) - continue platform_instance = self.config.platform_instance or account_id - dataset_name = f"{region}.{table_name}" dataset_urn = make_dataset_urn_with_platform_instance( platform=self.platform, platform_instance=platform_instance, @@ -222,7 +214,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: ) primary_key_dict = self.extract_primary_key_from_key_schema(table_info) table_schema = self.construct_schema_from_dynamodb( - dynamodb_client, table_name + dynamodb_client, region, table_name ) schema_metadata = self.construct_schema_metadata( table_name, @@ -254,9 +246,25 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: aspect=platform_instance_aspect, ).as_workunit() + def _list_tables( + self, + dynamodb_client: BaseClient, + ) -> Iterable[str]: + # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/paginator/ListTables.html + try: + for page in dynamodb_client.get_paginator("list_tables").paginate(): + table_names = page.get("TableNames") + if table_names: + yield from table_names + except Exception as ex: + # TODO: If regions is config input then this would be self.report.report_warning, + # we can create dynamodb client to take aws region or regions as user input + logger.info(f"Exception happened while listing tables, skipping: {ex}") + def construct_schema_from_dynamodb( self, dynamodb_client: BaseClient, + region: str, table_name: str, ) -> Dict[str, SchemaDescription]: """ @@ -275,7 +283,7 @@ def construct_schema_from_dynamodb( The MaxItems is the total number of items to return, and PageSize is the size of each page, we are assigning same value to these two config. If MaxItems is more than PageSize then we expect MaxItems / PageSize pages in response_iterator will return """ - self.include_table_item_to_schema(dynamodb_client, table_name, schema) + self.include_table_item_to_schema(dynamodb_client, region, table_name, schema) response_iterator = paginator.paginate( TableName=table_name, PaginationConfig={ @@ -294,33 +302,38 @@ def construct_schema_from_dynamodb( def include_table_item_to_schema( self, dynamodb_client: Any, + region: str, table_name: str, schema: Dict[str, SchemaDescription], ) -> None: """ - It will look up in the config include_table_item dict to see if the current table name exists as key, + It will look up in the config include_table_item dict to see if "region.table_name" exists as key, if it exists then get the items by primary key from the table and put it to schema """ if self.config.include_table_item is None: return - if table_name not in self.config.include_table_item.keys(): + dataset_name = f"{region}.{table_name}" + if dataset_name not in self.config.include_table_item.keys(): return - primary_key_list = self.config.include_table_item.get(table_name) + primary_key_list = self.config.include_table_item.get(dataset_name) assert isinstance(primary_key_list, List) if len(primary_key_list) > MAX_PRIMARY_KEYS_SIZE: logger.info( - f"the provided primary keys list size exceeded the max size for table {table_name}, we'll only process the first {MAX_PRIMARY_KEYS_SIZE} items" + f"the provided primary keys list size exceeded the max size for table {dataset_name}, we'll only process the first {MAX_PRIMARY_KEYS_SIZE} items" ) primary_key_list = primary_key_list[0:MAX_PRIMARY_KEYS_SIZE] items = [] response = dynamodb_client.batch_get_item( RequestItems={table_name: {"Keys": primary_key_list}} - ).get("Responses", None) + ).get("Responses") if response is None: 
logger.error( f"failed to retrieve item from table {table_name} by the given key {primary_key_list}" ) return + logger.debug( + f"successfully retrieved {len(primary_key_list)} items based on supplied primary key list" + ) items = response.get(table_name) self.construct_schema_from_items(items, schema) diff --git a/metadata-ingestion/src/datahub/ingestion/source/file.py b/metadata-ingestion/src/datahub/ingestion/source/file.py index de61fa8481c589..590aa59f7b5b6e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/file.py +++ b/metadata-ingestion/src/datahub/ingestion/source/file.py @@ -16,7 +16,7 @@ from pydantic.fields import Field from datahub.configuration.common import ConfigEnum, ConfigModel, ConfigurationError -from datahub.configuration.pydantic_field_deprecation import pydantic_field_deprecated +from datahub.configuration.validate_field_deprecation import pydantic_field_deprecated from datahub.configuration.validate_field_rename import pydantic_renamed_field from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.common import PipelineContext diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py index 01e083d566168d..9f6ac9dd211642 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py @@ -273,6 +273,7 @@ class _SingleDatasetProfiler(BasicDatasetProfilerBase): partition: Optional[str] config: GEProfilingConfig report: SQLSourceReport + custom_sql: Optional[str] query_combiner: SQLAlchemyQueryCombiner @@ -596,16 +597,8 @@ def generate_dataset_profile( # noqa: C901 (complexity) "catch_exceptions", self.config.catch_exceptions ) - profile = DatasetProfileClass(timestampMillis=get_sys_time()) - if self.partition: - profile.partitionSpec = PartitionSpecClass(partition=self.partition) - elif self.config.limit and self.config.offset: - profile.partitionSpec = PartitionSpecClass( - type=PartitionTypeClass.QUERY, - partition=json.dumps( - dict(limit=self.config.limit, offset=self.config.offset) - ), - ) + profile = self.init_profile() + profile.fieldProfiles = [] self._get_dataset_rows(profile) @@ -740,6 +733,24 @@ def generate_dataset_profile( # noqa: C901 (complexity) self.query_combiner.flush() return profile + def init_profile(self): + profile = DatasetProfileClass(timestampMillis=get_sys_time()) + if self.partition: + profile.partitionSpec = PartitionSpecClass(partition=self.partition) + elif self.config.limit: + profile.partitionSpec = PartitionSpecClass( + type=PartitionTypeClass.QUERY, + partition=json.dumps( + dict(limit=self.config.limit, offset=self.config.offset) + ), + ) + elif self.custom_sql: + profile.partitionSpec = PartitionSpecClass( + type=PartitionTypeClass.QUERY, partition="SAMPLE" + ) + + return profile + def update_dataset_batch_use_sampling(self, profile: DatasetProfileClass) -> None: if ( self.dataset.engine.dialect.name.lower() == BIGQUERY @@ -1064,6 +1075,7 @@ def _generate_single_profile( partition, self.config, self.report, + custom_sql, query_combiner, ).generate_dataset_profile() diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py b/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py index 77761c529ba0b1..24a3e520d8caff 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py +++ 
b/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py @@ -157,12 +157,12 @@ class GEProfilingConfig(ConfigModel): ) use_sampling: bool = Field( default=True, - description="Whether to profile column level stats on sample of table. Only BigQuery supports this. " + description="Whether to profile column level stats on sample of table. Only BigQuery and Snowflake support this. " "If enabled, profiling is done on rows sampled from table. Sampling is not done for smaller tables. ", ) sample_size: int = Field( - default=1000, + default=10000, description="Number of rows to be sampled from table for column level profiling." "Applicable only if `use_sampling` is set to True.", ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py index a8c7e48f3785ce..96729f4c60c6c4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py @@ -9,8 +9,8 @@ import datahub.emitter.mce_builder as builder from datahub.configuration.common import AllowDenyPattern, ConfigModel -from datahub.configuration.pydantic_field_deprecation import pydantic_field_deprecated from datahub.configuration.source_common import DEFAULT_ENV, DatasetSourceConfigMixin +from datahub.configuration.validate_field_deprecation import pydantic_field_deprecated from datahub.ingestion.source.common.subtypes import BIAssetSubTypes from datahub.ingestion.source.state.stale_entity_removal_handler import ( StaleEntityRemovalSourceReport, diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py index 93850607e551e1..804a14b0fe1cfb 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/config.py @@ -7,8 +7,8 @@ from datahub.configuration import ConfigModel from datahub.configuration.common import AllowDenyPattern -from datahub.configuration.pydantic_field_deprecation import pydantic_field_deprecated from datahub.configuration.source_common import DatasetLineageProviderConfigBase +from datahub.configuration.validate_field_deprecation import pydantic_field_deprecated from datahub.ingestion.source.data_lake_common.path_spec import PathSpec from datahub.ingestion.source.sql.postgres import BasePostgresConfig from datahub.ingestion.source.state.stateful_ingestion_base import ( diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/profile.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/profile.py index e983734082b1dc..771636e8498a30 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/profile.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/profile.py @@ -1,33 +1,19 @@ -import dataclasses import logging -from datetime import datetime -from typing import Dict, Iterable, List, Optional, Union, cast +from typing import Dict, Iterable, List, Optional, Union -from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance -from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.workunit import MetadataWorkUnit -from datahub.ingestion.source.ge_data_profiler import GEProfilerRequest from datahub.ingestion.source.redshift.config import RedshiftConfig from datahub.ingestion.source.redshift.redshift_schema import ( RedshiftTable, RedshiftView, ) from 
datahub.ingestion.source.redshift.report import RedshiftReport -from datahub.ingestion.source.sql.sql_generic_profiler import ( - GenericProfiler, - TableProfilerRequest, -) +from datahub.ingestion.source.sql.sql_generic_profiler import GenericProfiler from datahub.ingestion.source.state.profiling_state_handler import ProfilingHandler logger = logging.getLogger(__name__) -@dataclasses.dataclass -class RedshiftProfilerRequest(GEProfilerRequest): - table: Union[RedshiftTable, RedshiftView] - profile_table_level_only: bool = False - - class RedshiftProfiler(GenericProfiler): config: RedshiftConfig report: RedshiftReport @@ -63,80 +49,21 @@ def get_workunits( continue for table in tables[db].get(schema, {}): # Emit the profile work unit - profile_request = self.get_redshift_profile_request( - table, schema, db - ) + profile_request = self.get_profile_request(table, schema, db) if profile_request is not None: + self.report.report_entity_profiled(profile_request.pretty_name) profile_requests.append(profile_request) if len(profile_requests) == 0: continue - table_profile_requests = cast(List[TableProfilerRequest], profile_requests) - for request, profile in self.generate_profiles( - table_profile_requests, + + yield from self.generate_profile_workunits( + profile_requests, self.config.profiling.max_workers, db, platform=self.platform, profiler_args=self.get_profile_args(), - ): - if profile is None: - continue - request = cast(RedshiftProfilerRequest, request) - - profile.sizeInBytes = request.table.size_in_bytes - dataset_name = request.pretty_name - dataset_urn = make_dataset_urn_with_platform_instance( - self.platform, - dataset_name, - self.config.platform_instance, - self.config.env, - ) - - # We don't add to the profiler state if we only do table level profiling as it always happens - if self.state_handler and not request.profile_table_level_only: - self.state_handler.add_to_state( - dataset_urn, int(datetime.now().timestamp() * 1000) - ) - - yield MetadataChangeProposalWrapper( - entityUrn=dataset_urn, aspect=profile - ).as_workunit() - - def get_redshift_profile_request( - self, - table: Union[RedshiftTable, RedshiftView], - schema_name: str, - db_name: str, - ) -> Optional[RedshiftProfilerRequest]: - skip_profiling = False - profile_table_level_only = self.config.profiling.profile_table_level_only - dataset_name = f"{db_name}.{schema_name}.{table.name}".lower() - if not self.is_dataset_eligible_for_profiling( - dataset_name, table.last_altered, table.size_in_bytes, table.rows_count - ): - # Profile only table level if dataset is filtered from profiling - # due to size limits alone - if self.is_dataset_eligible_for_profiling( - dataset_name, table.last_altered, 0, 0 - ): - profile_table_level_only = True - else: - skip_profiling = True - - if len(table.columns) == 0: - skip_profiling = True - - if skip_profiling: - if self.config.profiling.report_dropped_profiles: - self.report.report_dropped(f"profile of {dataset_name}") - return None + ) - self.report.report_entity_profiled(dataset_name) - logger.debug(f"Preparing profiling request for {dataset_name}") - profile_request = RedshiftProfilerRequest( - pretty_name=dataset_name, - batch_kwargs=dict(schema=schema_name, table=table.name), - table=table, - profile_table_level_only=profile_table_level_only, - ) - return profile_request + def get_dataset_name(self, table_name: str, schema_name: str, db_name: str) -> str: + return f"{db_name}.{schema_name}.{table_name}".lower() diff --git 
a/metadata-ingestion/src/datahub/ingestion/source/s3/config.py b/metadata-ingestion/src/datahub/ingestion/source/s3/config.py index f1dd622efb7468..9b5296f0b9dd50 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/s3/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/s3/config.py @@ -5,8 +5,8 @@ from pydantic.fields import Field from datahub.configuration.common import AllowDenyPattern -from datahub.configuration.pydantic_field_deprecation import pydantic_field_deprecated from datahub.configuration.source_common import DatasetSourceConfigMixin +from datahub.configuration.validate_field_deprecation import pydantic_field_deprecated from datahub.configuration.validate_field_rename import pydantic_renamed_field from datahub.ingestion.source.aws.aws_common import AwsConnectionConfig from datahub.ingestion.source.data_lake_common.config import PathSpecsConfigMixin diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_profiler.py index 5f5e8e4bcdea38..24275dcdff34dd 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_profiler.py @@ -1,20 +1,12 @@ -import dataclasses import logging -from datetime import datetime -from typing import Callable, Dict, Iterable, List, Optional, cast +from typing import Callable, Dict, Iterable, List, Optional from snowflake.sqlalchemy import snowdialect from sqlalchemy import create_engine, inspect from sqlalchemy.sql import sqltypes -from datahub.configuration.pattern_utils import is_schema_allowed -from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance -from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.workunit import MetadataWorkUnit -from datahub.ingestion.source.ge_data_profiler import ( - DatahubGEProfiler, - GEProfilerRequest, -) +from datahub.ingestion.source.ge_data_profiler import DatahubGEProfiler from datahub.ingestion.source.snowflake.snowflake_config import SnowflakeV2Config from datahub.ingestion.source.snowflake.snowflake_query import SnowflakeQuery from datahub.ingestion.source.snowflake.snowflake_report import SnowflakeV2Report @@ -23,10 +15,8 @@ SnowflakeTable, ) from datahub.ingestion.source.snowflake.snowflake_utils import SnowflakeCommonMixin -from datahub.ingestion.source.sql.sql_generic_profiler import ( - GenericProfiler, - TableProfilerRequest, -) +from datahub.ingestion.source.sql.sql_generic import BaseTable +from datahub.ingestion.source.sql.sql_generic_profiler import GenericProfiler from datahub.ingestion.source.state.profiling_state_handler import ProfilingHandler snowdialect.ischema_names["GEOGRAPHY"] = sqltypes.NullType @@ -35,12 +25,6 @@ logger = logging.getLogger(__name__) -@dataclasses.dataclass -class SnowflakeProfilerRequest(GEProfilerRequest): - table: SnowflakeTable - profile_table_level_only: bool = False - - class SnowflakeProfiler(GenericProfiler, SnowflakeCommonMixin): def __init__( self, @@ -65,101 +49,52 @@ def get_workunits( profile_requests = [] for schema in database.schemas: - if not is_schema_allowed( - self.config.schema_pattern, - schema.name, - database.name, - self.config.match_fully_qualified_names, - ): - continue - for table in db_tables[schema.name]: - profile_request = self.get_snowflake_profile_request( + profile_request = self.get_profile_request( table, schema.name, database.name ) if profile_request 
is not None: + self.report.report_entity_profiled(profile_request.pretty_name) profile_requests.append(profile_request) if len(profile_requests) == 0: return - table_profile_requests = cast(List[TableProfilerRequest], profile_requests) - - for request, profile in self.generate_profiles( - table_profile_requests, + yield from self.generate_profile_workunits( + profile_requests, self.config.profiling.max_workers, database.name, platform=self.platform, profiler_args=self.get_profile_args(), - ): - if profile is None: - continue - profile.sizeInBytes = cast( - SnowflakeProfilerRequest, request - ).table.size_in_bytes - dataset_name = request.pretty_name - dataset_urn = make_dataset_urn_with_platform_instance( - self.platform, - dataset_name, - self.config.platform_instance, - self.config.env, - ) - - # We don't add to the profiler state if we only do table level profiling as it always happens - if self.state_handler: - self.state_handler.add_to_state( - dataset_urn, int(datetime.now().timestamp() * 1000) - ) - - yield MetadataChangeProposalWrapper( - entityUrn=dataset_urn, aspect=profile - ).as_workunit() + ) - def get_snowflake_profile_request( - self, - table: SnowflakeTable, - schema_name: str, - db_name: str, - ) -> Optional[SnowflakeProfilerRequest]: - skip_profiling = False - profile_table_level_only = self.config.profiling.profile_table_level_only - dataset_name = self.get_dataset_identifier(table.name, schema_name, db_name) - if not self.is_dataset_eligible_for_profiling( - dataset_name, table.last_altered, table.size_in_bytes, table.rows_count + def get_dataset_name(self, table_name: str, schema_name: str, db_name: str) -> str: + return self.get_dataset_identifier(table_name, schema_name, db_name) + + def get_batch_kwargs( + self, table: BaseTable, schema_name: str, db_name: str + ) -> dict: + custom_sql = None + if ( + not self.config.profiling.limit + and self.config.profiling.use_sampling + and table.rows_count + and table.rows_count > self.config.profiling.sample_size ): - # Profile only table level if dataset is filtered from profiling - # due to size limits alone - if self.is_dataset_eligible_for_profiling( - dataset_name, table.last_altered, 0, 0 - ): - profile_table_level_only = True - else: - skip_profiling = True - - if len(table.columns) == 0: - skip_profiling = True - - if skip_profiling: - if self.config.profiling.report_dropped_profiles: - self.report.report_dropped(f"profile of {dataset_name}") - return None - - self.report.report_entity_profiled(dataset_name) - logger.debug(f"Preparing profiling request for {dataset_name}") - profile_request = SnowflakeProfilerRequest( - pretty_name=dataset_name, - batch_kwargs=dict( - schema=schema_name, - table=table.name, - # Lowercase/Mixedcase table names in Snowflake do not work by default. - # We need to pass `use_quoted_name=True` for such tables as mentioned here - - # https://github.com/great-expectations/great_expectations/pull/2023 - use_quoted_name=(table.name != table.name.upper()), - ), - table=table, - profile_table_level_only=profile_table_level_only, - ) - return profile_request + # GX creates a temporary table from query if query is passed as batch kwargs. 
+ # We are using fraction-based sampling here, instead of fixed-size sampling because + # Fixed-size sampling can be slower than equivalent fraction-based sampling + # as per https://docs.snowflake.com/en/sql-reference/constructs/sample#performance-considerations + sample_pc = 100 * self.config.profiling.sample_size / table.rows_count + custom_sql = f'select * from "{db_name}"."{schema_name}"."{table.name}" TABLESAMPLE ({sample_pc:.3f})' + return { + **super().get_batch_kwargs(table, schema_name, db_name), + # Lowercase/Mixedcase table names in Snowflake do not work by default. + # We need to pass `use_quoted_name=True` for such tables as mentioned here - + # https://github.com/great-expectations/great_expectations/pull/2023 + "use_quoted_name": (table.name != table.name.upper()), + "custom_sql": custom_sql, + } def get_profiler_instance( self, db_name: Optional[str] = None diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/clickhouse.py b/metadata-ingestion/src/datahub/ingestion/source/sql/clickhouse.py index 1626f86b92545c..8873038079bada 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/clickhouse.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/clickhouse.py @@ -19,9 +19,9 @@ from sqlalchemy.types import BOOLEAN, DATE, DATETIME, INTEGER import datahub.emitter.mce_builder as builder -from datahub.configuration.pydantic_field_deprecation import pydantic_field_deprecated from datahub.configuration.source_common import DatasetLineageProviderConfigBase from datahub.configuration.time_window_config import BaseTimeWindowConfig +from datahub.configuration.validate_field_deprecation import pydantic_field_deprecated from datahub.emitter import mce_builder from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.decorators import ( diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py index 8f1e04b915f3b2..677d32c8bac08f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py @@ -7,8 +7,8 @@ from pydantic import Field from datahub.configuration.common import AllowDenyPattern, ConfigModel -from datahub.configuration.pydantic_field_deprecation import pydantic_field_deprecated from datahub.configuration.source_common import DatasetSourceConfigMixin +from datahub.configuration.validate_field_deprecation import pydantic_field_deprecated from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig from datahub.ingestion.source.state.stale_entity_removal_handler import ( StatefulStaleMetadataRemovalConfig, diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py index 344c114d464a92..aaeee5717a867c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py @@ -1,12 +1,15 @@ import logging +from abc import abstractmethod from dataclasses import dataclass, field from datetime import datetime, timedelta, timezone -from typing import Dict, Iterable, List, Optional, Tuple, Union, cast +from typing import Dict, Iterable, List, Optional, Union, cast from sqlalchemy import create_engine, inspect from sqlalchemy.engine.reflection import Inspector from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance 
+from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.ge_data_profiler import ( DatahubGEProfiler, GEProfilerRequest, @@ -16,7 +19,7 @@ from datahub.ingestion.source.sql.sql_generic import BaseTable, BaseView from datahub.ingestion.source.state.profiling_state_handler import ProfilingHandler from datahub.metadata.com.linkedin.pegasus2avro.dataset import DatasetProfile -from datahub.metadata.schema_classes import DatasetProfileClass +from datahub.metadata.com.linkedin.pegasus2avro.timeseries import PartitionType from datahub.utilities.stats_collections import TopKDict, int_top_k_dict @@ -63,14 +66,14 @@ def __init__( self.platform = platform self.state_handler = state_handler - def generate_profiles( + def generate_profile_workunits( self, requests: List[TableProfilerRequest], max_workers: int, db_name: Optional[str] = None, platform: Optional[str] = None, profiler_args: Optional[Dict] = None, - ) -> Iterable[Tuple[GEProfilerRequest, Optional[DatasetProfileClass]]]: + ) -> Iterable[MetadataWorkUnit]: ge_profile_requests: List[GEProfilerRequest] = [ cast(GEProfilerRequest, request) for request in requests @@ -80,21 +83,109 @@ def generate_profiles( request for request in requests if request.profile_table_level_only ] for request in table_level_profile_requests: - profile = DatasetProfile( + table_level_profile = DatasetProfile( timestampMillis=int(datetime.now().timestamp() * 1000), columnCount=request.table.column_count, rowCount=request.table.rows_count, sizeInBytes=request.table.size_in_bytes, ) - yield (request, profile) + dataset_urn = self.dataset_urn_builder(request.pretty_name) + yield MetadataChangeProposalWrapper( + entityUrn=dataset_urn, aspect=table_level_profile + ).as_workunit() if not ge_profile_requests: return # Otherwise, if column level profiling is enabled, use GE profiler. ge_profiler = self.get_profiler_instance(db_name) - yield from ge_profiler.generate_profiles( + + for ge_profiler_request, profile in ge_profiler.generate_profiles( ge_profile_requests, max_workers, platform, profiler_args + ): + if profile is None: + continue + + request = cast(TableProfilerRequest, ge_profiler_request) + profile.sizeInBytes = request.table.size_in_bytes + + # If table is partitioned we profile only one partition (if nothing set then the last one) + # but for table level we can use the rows_count from the table metadata + # This way even though column statistics only reflects one partition data but the rows count + # shows the proper count. 
+ if ( + profile.partitionSpec + and profile.partitionSpec.type != PartitionType.FULL_TABLE + ): + profile.rowCount = request.table.rows_count + + dataset_urn = self.dataset_urn_builder(request.pretty_name) + + # We don't add to the profiler state if we only do table level profiling as it always happens + if self.state_handler: + self.state_handler.add_to_state( + dataset_urn, int(datetime.now().timestamp() * 1000) + ) + yield MetadataChangeProposalWrapper( + entityUrn=dataset_urn, aspect=profile + ).as_workunit() + + def dataset_urn_builder(self, dataset_name: str) -> str: + return make_dataset_urn_with_platform_instance( + self.platform, + dataset_name, + self.config.platform_instance, + self.config.env, + ) + + @abstractmethod + def get_dataset_name(self, table_name: str, schema_name: str, db_name: str) -> str: + pass + + def get_profile_request( + self, table: BaseTable, schema_name: str, db_name: str + ) -> Optional[TableProfilerRequest]: + skip_profiling = False + profile_table_level_only = self.config.profiling.profile_table_level_only + dataset_name = self.get_dataset_name(table.name, schema_name, db_name) + if not self.is_dataset_eligible_for_profiling( + dataset_name, table.last_altered, table.size_in_bytes, table.rows_count + ): + # Profile only table level if dataset is filtered from profiling + # due to size limits alone + if self.is_dataset_eligible_for_profiling( + dataset_name, table.last_altered, 0, 0 + ): + profile_table_level_only = True + else: + skip_profiling = True + self.report.num_tables_not_eligible_profiling[ + f"{db_name}.{schema_name}" + ] += 1 + + if table.column_count == 0: + skip_profiling = True + + if skip_profiling: + if self.config.profiling.report_dropped_profiles: + self.report.report_dropped(f"profile of {dataset_name}") + return None + + logger.debug(f"Preparing profiling request for {dataset_name}") + profile_request = TableProfilerRequest( + pretty_name=dataset_name, + batch_kwargs=self.get_batch_kwargs(table, schema_name, db_name), + table=table, + profile_table_level_only=profile_table_level_only, + ) + return profile_request + + def get_batch_kwargs( + self, table: BaseTable, schema_name: str, db_name: str + ) -> dict: + return dict( + schema=schema_name, + table=table.name, ) def get_inspectors(self) -> Iterable[Inspector]: diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau.py index 6214cba342622a..e347cd26d245ab 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau.py @@ -37,11 +37,11 @@ ConfigModel, ConfigurationError, ) -from datahub.configuration.pydantic_field_deprecation import pydantic_field_deprecated from datahub.configuration.source_common import ( DatasetLineageProviderConfigBase, DatasetSourceConfigMixin, ) +from datahub.configuration.validate_field_deprecation import pydantic_field_deprecated from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.mcp_builder import ( ContainerKey, diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/config.py b/metadata-ingestion/src/datahub/ingestion/source/unity/config.py index a49c789a82f27c..51390873712d39 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/config.py @@ -1,3 +1,4 @@ +import logging import os from datetime import datetime, timedelta, timezone from typing import Any, Dict, Optional @@ -21,6 +22,9 @@ 
OperationConfig, is_profiling_enabled, ) +from datahub.utilities.global_warning_util import add_global_warning + +logger = logging.getLogger(__name__) class UnityCatalogProfilerConfig(ConfigModel): @@ -97,9 +101,25 @@ class UnityCatalogSourceConfig( description="Name of the workspace. Default to deployment name present in workspace_url", ) + include_metastore: bool = pydantic.Field( + default=True, + description=( + "Whether to ingest the workspace's metastore as a container and include it in all urns." + " Changing this will affect the urns of all entities in the workspace." + " This will be disabled by default in the future," + " so it is recommended to set this to `False` for new ingestions." + " If you have an existing unity catalog ingestion, you'll want to avoid duplicates by soft deleting existing data." + " If stateful ingestion is enabled, running with `include_metastore: false` should be sufficient." + " Otherwise, we recommend deleting via the cli: `datahub delete --platform databricks` and re-ingesting with `include_metastore: false`." + ), + ) + ingest_data_platform_instance_aspect: Optional[bool] = pydantic.Field( default=False, - description="Option to enable/disable ingestion of the data platform instance aspect. The default data platform instance id for a dataset is workspace_name", + description=( + "Option to enable/disable ingestion of the data platform instance aspect." + " The default data platform instance id for a dataset is workspace_name" + ), ) _only_ingest_assigned_metastore_removed = pydantic_removed_field( @@ -122,6 +142,16 @@ class UnityCatalogSourceConfig( default=AllowDenyPattern.allow_all(), description="Regex patterns for tables to filter in ingestion. Specify regex to match the entire table name in `catalog.schema.table` format. e.g. to match all tables starting with customer in Customer catalog and public schema, use the regex `Customer\\.public\\.customer.*`.", ) + + notebook_pattern: AllowDenyPattern = Field( + default=AllowDenyPattern.allow_all(), + description=( + "Regex patterns for notebooks to filter in ingestion, based on notebook *path*." + " Specify regex to match the entire notebook path in `//.../` format." + " e.g. to match all notebooks in the root Shared directory, use the regex `/Shared/.*`." + ), + ) + domain: Dict[str, AllowDenyPattern] = Field( default=dict(), description='Attach domains to catalogs, schemas or tables during ingestion using regex patterns. Domain key can be a guid like *urn:li:domain:ec428203-ce86-4db3-985d-5a8ee6df32ba* or a string like "Marketing".) If you provide strings, then datahub will attempt to resolve this name to a guid, and will error out if this fails. There can be multiple domain keys specified.', @@ -151,6 +181,17 @@ class UnityCatalogSourceConfig( description="Option to enable/disable lineage generation. Currently we have to call a rest call per column to get column level lineage due to the Databrick api which can slow down ingestion. ", ) + column_lineage_column_limit: int = pydantic.Field( + default=300, + description="Limit the number of columns to get column level lineage. ", + ) + + lineage_max_workers: int = pydantic.Field( + default=5 * (os.cpu_count() or 4), + description="Number of worker threads to use for column lineage thread pool executor. 
Set to 1 to disable.", + hidden_from_docs=True, + ) + include_usage_statistics: bool = Field( default=True, description="Generate usage statistics.", @@ -182,3 +223,16 @@ def workspace_url_should_start_with_http_scheme(cls, workspace_url: str) -> str: "Workspace URL must start with http scheme. e.g. https://my-workspace.cloud.databricks.com" ) return workspace_url + + @pydantic.validator("include_metastore") + def include_metastore_warning(cls, v: bool) -> bool: + if v: + msg = ( + "`include_metastore` is enabled." + " This is not recommended and will be disabled by default in the future, which is a breaking change." + " All databricks urns will change if you re-ingest with this disabled." + " We recommend soft deleting all databricks data and re-ingesting with `include_metastore` set to `False`." + ) + logger.warning(msg) + add_global_warning(msg) + return v diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py index 2401f1c3d163c5..9bcdb200f180e9 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py @@ -97,14 +97,13 @@ def __init__( self.report = report def check_basic_connectivity(self) -> bool: - self._workspace_client.metastores.summary() - return True + return bool(self._workspace_client.catalogs.list()) def assigned_metastore(self) -> Metastore: response = self._workspace_client.metastores.summary() return self._create_metastore(response) - def catalogs(self, metastore: Metastore) -> Iterable[Catalog]: + def catalogs(self, metastore: Optional[Metastore]) -> Iterable[Catalog]: response = self._workspace_client.catalogs.list() if not response: logger.info("Catalogs not found") @@ -234,9 +233,7 @@ def list_lineages_by_column(self, table_name: str, column_name: str) -> dict: body={"table_name": table_name, "column_name": column_name}, ) - def table_lineage( - self, table: Table, include_entity_lineage: bool - ) -> Optional[dict]: + def table_lineage(self, table: Table, include_entity_lineage: bool) -> None: # Lineage endpoint doesn't exists on 2.1 version try: response: dict = self.list_lineages_by_table( @@ -247,7 +244,7 @@ def table_lineage( for item in response.get("upstreams") or []: if "tableInfo" in item: table_ref = TableReference.create_from_lineage( - item["tableInfo"], table.schema.catalog.metastore.id + item["tableInfo"], table.schema.catalog.metastore ) if table_ref: table.upstreams[table_ref] = {} @@ -257,34 +254,30 @@ def table_lineage( for item in response.get("downstreams") or []: for notebook in item.get("notebookInfos") or []: table.downstream_notebooks.add(notebook["notebook_id"]) - - return response except Exception as e: - logger.error(f"Error getting lineage: {e}") - return None + logger.warning( + f"Error getting lineage on table {table.ref}: {e}", exc_info=True + ) - def get_column_lineage(self, table: Table, include_entity_lineage: bool) -> None: + def get_column_lineage(self, table: Table, column_name: str) -> None: try: - table_lineage = self.table_lineage( - table, include_entity_lineage=include_entity_lineage + response: dict = self.list_lineages_by_column( + table_name=table.ref.qualified_table_name, + column_name=column_name, ) - if table_lineage: - for column in table.columns: - response: dict = self.list_lineages_by_column( - table_name=table.ref.qualified_table_name, - column_name=column.name, - ) - for item in response.get("upstream_cols", []): - table_ref = 
TableReference.create_from_lineage( - item, table.schema.catalog.metastore.id - ) - if table_ref: - table.upstreams.setdefault(table_ref, {}).setdefault( - column.name, [] - ).append(item["name"]) - + for item in response.get("upstream_cols") or []: + table_ref = TableReference.create_from_lineage( + item, table.schema.catalog.metastore + ) + if table_ref: + table.upstreams.setdefault(table_ref, {}).setdefault( + column_name, [] + ).append(item["name"]) except Exception as e: - logger.error(f"Error getting lineage: {e}") + logger.warning( + f"Error getting column lineage on table {table.ref}, column {column_name}: {e}", + exc_info=True, + ) @staticmethod def _escape_sequence(value: str) -> str: @@ -305,10 +298,13 @@ def _create_metastore( comment=None, ) - def _create_catalog(self, metastore: Metastore, obj: CatalogInfo) -> Catalog: + def _create_catalog( + self, metastore: Optional[Metastore], obj: CatalogInfo + ) -> Catalog: + catalog_name = self._escape_sequence(obj.name) return Catalog( name=obj.name, - id=f"{metastore.id}.{self._escape_sequence(obj.name)}", + id=f"{metastore.id}.{catalog_name}" if metastore else catalog_name, metastore=metastore, comment=obj.comment, owner=obj.owner, diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_types.py b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_types.py index 54ac2e90d7c7e7..18ac2475b51e0c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_types.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_types.py @@ -92,7 +92,7 @@ class Metastore(CommonProperty): @dataclass class Catalog(CommonProperty): - metastore: Metastore + metastore: Optional[Metastore] owner: Optional[str] type: CatalogType @@ -130,7 +130,7 @@ class ServicePrincipal: @dataclass(frozen=True, order=True) class TableReference: - metastore: str + metastore: Optional[str] catalog: str schema: str table: str @@ -138,17 +138,21 @@ class TableReference: @classmethod def create(cls, table: "Table") -> "TableReference": return cls( - table.schema.catalog.metastore.id, + table.schema.catalog.metastore.id + if table.schema.catalog.metastore + else None, table.schema.catalog.name, table.schema.name, table.name, ) @classmethod - def create_from_lineage(cls, d: dict, metastore: str) -> Optional["TableReference"]: + def create_from_lineage( + cls, d: dict, metastore: Optional[Metastore] + ) -> Optional["TableReference"]: try: return cls( - metastore, + metastore.id if metastore else None, d["catalog_name"], d["schema_name"], d.get("table_name", d["name"]), # column vs table query output @@ -158,7 +162,10 @@ def create_from_lineage(cls, d: dict, metastore: str) -> Optional["TableReferenc return None def __str__(self) -> str: - return f"{self.metastore}.{self.catalog}.{self.schema}.{self.table}" + if self.metastore: + return f"{self.metastore}.{self.catalog}.{self.schema}.{self.table}" + else: + return self.qualified_table_name @property def qualified_table_name(self) -> str: diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/report.py b/metadata-ingestion/src/datahub/ingestion/source/unity/report.py index 808172a136bb32..fa61571fa92cbb 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/report.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/report.py @@ -18,6 +18,8 @@ class UnityCatalogReport(IngestionStageReport, StaleEntityRemovalSourceReport): table_profiles: EntityFilterReport = EntityFilterReport.field(type="table profile") notebooks: EntityFilterReport = 
EntityFilterReport.field(type="notebook") + num_column_lineage_skipped_column_count: int = 0 + num_queries: int = 0 num_queries_dropped_parse_failure: int = 0 num_queries_missing_table: int = 0 # Can be due to pattern filter diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py index f2da1aece9fd47..27c1f341aa84d6 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py @@ -1,6 +1,7 @@ import logging import re import time +from concurrent.futures import ThreadPoolExecutor from datetime import timedelta from typing import Dict, Iterable, List, Optional, Set, Union from urllib.parse import urljoin @@ -16,10 +17,12 @@ from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.mcp_builder import ( CatalogKey, + CatalogKeyWithMetastore, ContainerKey, MetastoreKey, NotebookKey, UnitySchemaKey, + UnitySchemaKeyWithMetastore, add_dataset_to_container, gen_containers, ) @@ -127,7 +130,7 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource): config: UnityCatalogSourceConfig unity_catalog_api_proxy: UnityCatalogApiProxy platform: str = "databricks" - platform_instance_name: str + platform_instance_name: Optional[str] def get_report(self) -> UnityCatalogReport: return self.report @@ -146,11 +149,13 @@ def __init__(self, ctx: PipelineContext, config: UnityCatalogSourceConfig): self.external_url_base = urljoin(self.config.workspace_url, "/explore/data") # Determine the platform_instance_name - self.platform_instance_name = ( - config.workspace_name - if config.workspace_name is not None - else config.workspace_url.split("//")[1].split(".")[0] - ) + self.platform_instance_name = self.config.platform_instance + if self.config.include_metastore: + self.platform_instance_name = ( + config.workspace_name + if config.workspace_name is not None + else config.workspace_url.split("//")[1].split(".")[0] + ) if self.config.domain: self.domain_registry = DomainRegistry( @@ -247,10 +252,14 @@ def build_service_principal_map(self) -> None: def process_notebooks(self) -> Iterable[MetadataWorkUnit]: for notebook in self.unity_catalog_api_proxy.workspace_notebooks(): + if not self.config.notebook_pattern.allowed(notebook.path): + self.report.notebooks.dropped(notebook.path) + continue + self.notebooks[str(notebook.id)] = notebook - yield from self._gen_notebook_aspects(notebook) + yield from self._gen_notebook_workunits(notebook) - def _gen_notebook_aspects(self, notebook: Notebook) -> Iterable[MetadataWorkUnit]: + def _gen_notebook_workunits(self, notebook: Notebook) -> Iterable[MetadataWorkUnit]: mcps = MetadataChangeProposalWrapper.construct_many( entityUrn=self.gen_notebook_urn(notebook), aspects=[ @@ -270,7 +279,7 @@ def _gen_notebook_aspects(self, notebook: Notebook) -> Iterable[MetadataWorkUnit ), SubTypesClass(typeNames=[DatasetSubTypes.NOTEBOOK]), BrowsePathsClass(paths=notebook.path.split("/")), - # TODO: Add DPI aspect + self._create_data_platform_instance_aspect(), ], ) for mcp in mcps: @@ -296,13 +305,17 @@ def _gen_notebook_lineage(self, notebook: Notebook) -> Optional[MetadataWorkUnit ).as_workunit() def process_metastores(self) -> Iterable[MetadataWorkUnit]: - metastore = self.unity_catalog_api_proxy.assigned_metastore() - yield from self.gen_metastore_containers(metastore) + metastore: Optional[Metastore] = None + if self.config.include_metastore: + metastore = 
self.unity_catalog_api_proxy.assigned_metastore() + yield from self.gen_metastore_containers(metastore) yield from self.process_catalogs(metastore) + if metastore and self.config.include_metastore: + self.report.metastores.processed(metastore.id) - self.report.metastores.processed(metastore.id) - - def process_catalogs(self, metastore: Metastore) -> Iterable[MetadataWorkUnit]: + def process_catalogs( + self, metastore: Optional[Metastore] + ) -> Iterable[MetadataWorkUnit]: for catalog in self.unity_catalog_api_proxy.catalogs(metastore=metastore): if not self.config.catalog_pattern.allowed(catalog.id): self.report.catalogs.dropped(catalog.id) @@ -353,17 +366,9 @@ def process_table(self, table: Table, schema: Schema) -> Iterable[MetadataWorkUn operation = self._create_table_operation_aspect(table) domain = self._get_domain_aspect(dataset_name=table.ref.qualified_table_name) ownership = self._create_table_ownership_aspect(table) - data_platform_instance = self._create_data_platform_instance_aspect(table) + data_platform_instance = self._create_data_platform_instance_aspect() - if self.config.include_column_lineage: - self.unity_catalog_api_proxy.get_column_lineage( - table, include_entity_lineage=self.config.include_notebooks - ) - elif self.config.include_table_lineage: - self.unity_catalog_api_proxy.table_lineage( - table, include_entity_lineage=self.config.include_notebooks - ) - lineage = self._generate_lineage_aspect(dataset_urn, table) + lineage = self.ingest_lineage(table) if self.config.include_notebooks: for notebook_id in table.downstream_notebooks: @@ -389,6 +394,28 @@ def process_table(self, table: Table, schema: Schema) -> Iterable[MetadataWorkUn ) ] + def ingest_lineage(self, table: Table) -> Optional[UpstreamLineageClass]: + if self.config.include_table_lineage: + self.unity_catalog_api_proxy.table_lineage( + table, include_entity_lineage=self.config.include_notebooks + ) + + if self.config.include_column_lineage and table.upstreams: + if len(table.columns) > self.config.column_lineage_column_limit: + self.report.num_column_lineage_skipped_column_count += 1 + + with ThreadPoolExecutor( + max_workers=self.config.lineage_max_workers + ) as executor: + for column in table.columns[: self.config.column_lineage_column_limit]: + executor.submit( + self.unity_catalog_api_proxy.get_column_lineage, + table, + column.name, + ) + + return self._generate_lineage_aspect(self.gen_dataset_urn(table.ref), table) + def _generate_lineage_aspect( self, dataset_urn: str, table: Table ) -> Optional[UpstreamLineageClass]: @@ -503,27 +530,37 @@ def gen_metastore_containers( def gen_catalog_containers(self, catalog: Catalog) -> Iterable[MetadataWorkUnit]: domain_urn = self._gen_domain_urn(catalog.name) - metastore_container_key = self.gen_metastore_key(catalog.metastore) catalog_container_key = self.gen_catalog_key(catalog) yield from gen_containers( container_key=catalog_container_key, name=catalog.name, sub_types=[DatasetContainerSubTypes.CATALOG], domain_urn=domain_urn, - parent_container_key=metastore_container_key, + parent_container_key=self.gen_metastore_key(catalog.metastore) + if self.config.include_metastore and catalog.metastore + else None, description=catalog.comment, owner_urn=self.get_owner_urn(catalog.owner), external_url=f"{self.external_url_base}/{catalog.name}", ) def gen_schema_key(self, schema: Schema) -> ContainerKey: - return UnitySchemaKey( - unity_schema=schema.name, - platform=self.platform, - instance=self.config.platform_instance, - catalog=schema.catalog.name, - 
metastore=schema.catalog.metastore.name, - ) + if self.config.include_metastore: + assert schema.catalog.metastore + return UnitySchemaKeyWithMetastore( + unity_schema=schema.name, + platform=self.platform, + instance=self.config.platform_instance, + catalog=schema.catalog.name, + metastore=schema.catalog.metastore.name, + ) + else: + return UnitySchemaKey( + unity_schema=schema.name, + platform=self.platform, + instance=self.config.platform_instance, + catalog=schema.catalog.name, + ) def gen_metastore_key(self, metastore: Metastore) -> MetastoreKey: return MetastoreKey( @@ -532,13 +569,21 @@ def gen_metastore_key(self, metastore: Metastore) -> MetastoreKey: instance=self.config.platform_instance, ) - def gen_catalog_key(self, catalog: Catalog) -> CatalogKey: - return CatalogKey( - catalog=catalog.name, - metastore=catalog.metastore.name, - platform=self.platform, - instance=self.config.platform_instance, - ) + def gen_catalog_key(self, catalog: Catalog) -> ContainerKey: + if self.config.include_metastore: + assert catalog.metastore + return CatalogKeyWithMetastore( + catalog=catalog.name, + metastore=catalog.metastore.name, + platform=self.platform, + instance=self.config.platform_instance, + ) + else: + return CatalogKey( + catalog=catalog.name, + platform=self.platform, + instance=self.config.platform_instance, + ) def _gen_domain_urn(self, dataset_name: str) -> Optional[str]: domain_urn: Optional[str] = None @@ -643,15 +688,16 @@ def _create_table_ownership_aspect(self, table: Table) -> Optional[OwnershipClas return None def _create_data_platform_instance_aspect( - self, table: Table + self, ) -> Optional[DataPlatformInstanceClass]: - # Only ingest the DPI aspect if the flag is true if self.config.ingest_data_platform_instance_aspect: return DataPlatformInstanceClass( platform=make_data_platform_urn(self.platform), instance=make_dataplatform_instance_urn( self.platform, self.platform_instance_name - ), + ) + if self.platform_instance_name + else None, ) return None diff --git a/metadata-ingestion/tests/unit/test_pydantic_validators.py b/metadata-ingestion/tests/unit/test_pydantic_validators.py index 07d86043a35bf8..3e9ec6cbaf3579 100644 --- a/metadata-ingestion/tests/unit/test_pydantic_validators.py +++ b/metadata-ingestion/tests/unit/test_pydantic_validators.py @@ -4,7 +4,7 @@ from pydantic import ValidationError from datahub.configuration.common import ConfigModel -from datahub.configuration.pydantic_field_deprecation import pydantic_field_deprecated +from datahub.configuration.validate_field_deprecation import pydantic_field_deprecated from datahub.configuration.validate_field_removal import pydantic_removed_field from datahub.configuration.validate_field_rename import pydantic_renamed_field from datahub.utilities.global_warning_util import get_global_warnings diff --git a/metadata-service/configuration/src/main/resources/application.yml b/metadata-service/configuration/src/main/resources/application.yml index 4be31b2b6bb151..4dfd96ac75c6ce 100644 --- a/metadata-service/configuration/src/main/resources/application.yml +++ b/metadata-service/configuration/src/main/resources/application.yml @@ -276,6 +276,9 @@ bootstrap: enabled: ${UPGRADE_DEFAULT_BROWSE_PATHS_ENABLED:false} # enable to run the upgrade to migrate legacy default browse paths to new ones backfillBrowsePathsV2: enabled: ${BACKFILL_BROWSE_PATHS_V2:false} # Enables running the backfill of browsePathsV2 upgrade step. There are concerns about the load of this step so hiding it behind a flag. 
Deprecating in favor of running through SystemUpdate + servlets: + waitTimeout: ${BOOTSTRAP_SERVLETS_WAITTIMEOUT:60} # Total waiting time in seconds for servlets to initialize + systemUpdate: initialBackOffMs: ${BOOTSTRAP_SYSTEM_UPDATE_INITIAL_BACK_OFF_MILLIS:5000} diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/OnBootApplicationListener.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/OnBootApplicationListener.java index 980cafaceae27e..032b934a7ba87b 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/OnBootApplicationListener.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/OnBootApplicationListener.java @@ -15,15 +15,18 @@ import org.apache.http.impl.client.HttpClients; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; import org.springframework.web.context.WebApplicationContext; +import org.springframework.context.annotation.Configuration; /** * Responsible for coordinating starting steps that happen before the application starts up. */ +@Configuration @Slf4j @Component public class OnBootApplicationListener { @@ -44,6 +47,8 @@ public class OnBootApplicationListener { @Qualifier("configurationProvider") private ConfigurationProvider provider; + @Value("${bootstrap.servlets.waitTimeout}") + private int _servletsWaitTimeout; @EventListener(ContextRefreshedEvent.class) public void onApplicationEvent(@Nonnull ContextRefreshedEvent event) { @@ -62,7 +67,7 @@ public void onApplicationEvent(@Nonnull ContextRefreshedEvent event) { public Runnable isSchemaRegistryAPIServletReady() { return () -> { final HttpGet request = new HttpGet(provider.getKafka().getSchemaRegistry().getUrl()); - int timeouts = 30; + int timeouts = _servletsWaitTimeout; boolean openAPIServeletReady = false; while (!openAPIServeletReady && timeouts > 0) { try { @@ -79,7 +84,7 @@ public Runnable isSchemaRegistryAPIServletReady() { timeouts--; } if (!openAPIServeletReady) { - log.error("Failed to bootstrap DataHub, OpenAPI servlet was not ready after 30 seconds"); + log.error("Failed to bootstrap DataHub, OpenAPI servlet was not ready after {} seconds", timeouts); System.exit(1); } else { _bootstrapManager.start();
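Two of the patterns introduced in this patch are sketched below in Python, which most of the patch uses; both sketches rely on hypothetical stand-in functions and illustrative values rather than the actual source classes.

First, the Unity Catalog lineage changes cap column-level lineage at `column_lineage_column_limit` columns and fan the per-column REST calls out over a thread pool sized by `lineage_max_workers`. Here `fetch_column_lineage` is a hypothetical stand-in for the proxy's per-column `get_column_lineage` call:

```python
import os
from concurrent.futures import ThreadPoolExecutor
from typing import List


def fetch_column_lineage(table_name: str, column_name: str) -> None:
    # Hypothetical stand-in for the per-column lineage REST call.
    print(f"fetching lineage for {table_name}.{column_name}")


def ingest_column_lineage(
    table_name: str,
    columns: List[str],
    column_limit: int = 300,                       # column_lineage_column_limit default
    max_workers: int = 5 * (os.cpu_count() or 4),  # lineage_max_workers default
) -> None:
    if len(columns) > column_limit:
        # The patch counts this case in num_column_lineage_skipped_column_count
        # and fetches lineage only for the first `column_limit` columns.
        print(f"{table_name}: truncating column lineage to {column_limit} columns")
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        for column in columns[:column_limit]:
            executor.submit(fetch_column_lineage, table_name, column)


ingest_column_lineage("main.sales.orders", ["id", "amount", "customer_id"], column_limit=2)
```

Second, the bootstrap change above replaces the hard-coded 30-second servlet wait with a value read from `BOOTSTRAP_SERVLETS_WAITTIMEOUT`. The underlying poll-until-ready pattern looks roughly like the sketch below, where `is_servlet_ready` stands in for the HTTP readiness check and the sketch reports the configured budget on failure:

```python
import logging
import time
from typing import Callable

logger = logging.getLogger(__name__)


def wait_for_servlet(
    is_servlet_ready: Callable[[], bool], wait_timeout_seconds: int = 60
) -> bool:
    # Poll the readiness probe once per second until it succeeds or the
    # configurable timeout elapses.
    remaining = wait_timeout_seconds
    while remaining > 0:
        try:
            if is_servlet_ready():
                return True
        except Exception as e:  # transient startup errors are expected
            logger.debug("Readiness probe failed: %s", e)
        time.sleep(1)
        remaining -= 1
    logger.error("Servlet was not ready after %d seconds", wait_timeout_seconds)
    return False
```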