diff --git a/docs/lineage/airflow.md b/docs/lineage/airflow.md index 94eb69a2ed8273..2e584b8ce0d24c 100644 --- a/docs/lineage/airflow.md +++ b/docs/lineage/airflow.md @@ -71,6 +71,7 @@ enabled = True # default | capture_ownership_info | true | Extract DAG ownership. | | capture_tags_info | true | Extract DAG tags. | | capture_executions | true | Extract task runs and success/failure statuses. This will show up in DataHub "Runs" tab. | +| materialize_iolets | true | Create or un-soft-delete all entities referenced in lineage. | | enable_extractors | true | Enable automatic lineage extraction. | | disable_openlineage_plugin | true | Disable the OpenLineage plugin to avoid duplicative processing. | | log_level | _no change_ | [debug] Set the log level for the plugin. | @@ -135,8 +136,9 @@ conn_id = datahub_rest_default # or datahub_kafka_default | capture_ownership_info | true | If true, the owners field of the DAG will be capture as a DataHub corpuser. | | capture_tags_info | true | If true, the tags field of the DAG will be captured as DataHub tags. | | capture_executions | true | If true, we'll capture task runs in DataHub in addition to DAG definitions. | -| datajob_url_link | taskinstance | If taskinstance, the datajob url will be taskinstance link on airflow. It can also be grid. - | +| materialize_iolets | true | Create or un-soft-delete all entities referenced in lineage. | +| datajob_url_link | taskinstance | If taskinstance, the datajob url will be taskinstance link on airflow. It can also be grid. | +| | | graceful_exceptions | true | If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions. | #### Validate that the plugin is working diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py index 48d462b85702af..f2cd647837d5db 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py @@ -34,6 +34,10 @@ class DatahubLineageConfig(ConfigModel): # If true, the tags field of the DAG will be captured as DataHub tags. capture_tags_info: bool = True + # If true (default), we'll materialize and un-soft-delete any urns + # referenced by inlets or outlets. 
+ materialize_iolets: bool = True + capture_executions: bool = False enable_extractors: bool = True @@ -67,6 +71,7 @@ def get_lineage_config() -> DatahubLineageConfig: "datahub", "capture_ownership_info", fallback=True ) capture_executions = conf.get("datahub", "capture_executions", fallback=True) + materialize_iolets = conf.get("datahub", "materialize_iolets", fallback=True) enable_extractors = conf.get("datahub", "enable_extractors", fallback=True) log_level = conf.get("datahub", "log_level", fallback=None) debug_emitter = conf.get("datahub", "debug_emitter", fallback=False) @@ -84,6 +89,7 @@ def get_lineage_config() -> DatahubLineageConfig: capture_ownership_info=capture_ownership_info, capture_tags_info=capture_tags_info, capture_executions=capture_executions, + materialize_iolets=materialize_iolets, enable_extractors=enable_extractors, log_level=log_level, debug_emitter=debug_emitter, diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py index ace0d035c3472e..ac5dc00e0e639f 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py @@ -389,7 +389,10 @@ def on_task_instance_running( # TODO: Add handling for Airflow mapped tasks using task_instance.map_index - datajob.emit(self.emitter, callback=self._make_emit_callback()) + for mcp in datajob.generate_mcp( + materialize_iolets=self.config.materialize_iolets + ): + self.emitter.emit(mcp, self._make_emit_callback()) logger.debug(f"Emitted DataHub Datajob start: {datajob}") if self.config.capture_executions: @@ -430,7 +433,10 @@ def on_task_instance_finish( # Add lineage info. 
self._extract_lineage(datajob, dagrun, task, task_instance, complete=True) - datajob.emit(self.emitter, callback=self._make_emit_callback()) + for mcp in datajob.generate_mcp( + materialize_iolets=self.config.materialize_iolets + ): + self.emitter.emit(mcp, self._make_emit_callback()) logger.debug(f"Emitted DataHub Datajob finish w/ status {status}: {datajob}") if self.config.capture_executions: diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin_v22.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin_v22.py index 7b8d719712d107..628300d45d2fdc 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin_v22.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin_v22.py @@ -133,7 +133,8 @@ def datahub_task_status_callback(context, status): ) task.log.info(f"Emitting Datahub Datajob: {datajob}") - datajob.emit(emitter, callback=_make_emit_callback(task.log)) + for mcp in datajob.generate_mcp(materialize_iolets=config.materialize_iolets): + emitter.emit(mcp, _make_emit_callback(task.log)) if config.capture_executions: dpi = AirflowGenerator.run_datajob( @@ -200,7 +201,8 @@ def datahub_pre_execution(context): ) task.log.info(f"Emitting Datahub dataJob {datajob}") - datajob.emit(emitter, callback=_make_emit_callback(task.log)) + for mcp in datajob.generate_mcp(materialize_iolets=config.materialize_iolets): + emitter.emit(mcp, _make_emit_callback(task.log)) if config.capture_executions: dpi = AirflowGenerator.run_datajob( diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/lineage/_lineage_core.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/lineage/_lineage_core.py index daf45e1cd83f83..43e62c9f65f45c 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/lineage/_lineage_core.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/lineage/_lineage_core.py @@ -59,7 +59,8 @@ def send_lineage_to_datahub( entities_to_datajob_urn_list([let.urn for let in inlets]) ) - datajob.emit(emitter) + for mcp in datajob.generate_mcp(materialize_iolets=config.materialize_iolets): + emitter.emit(mcp) operator.log.info(f"Emitted from Lineage: {datajob}") if config.capture_executions: diff --git a/metadata-ingestion/docs/sources/databricks/unity-catalog_pre.md b/metadata-ingestion/docs/sources/databricks/unity-catalog_pre.md index 22f3f9cb1d276f..47d8f2e850868c 100644 --- a/metadata-ingestion/docs/sources/databricks/unity-catalog_pre.md +++ b/metadata-ingestion/docs/sources/databricks/unity-catalog_pre.md @@ -13,7 +13,7 @@ * Ownership of or `SELECT` privilege on any tables and views you want to ingest * [Ownership documentation](https://docs.databricks.com/data-governance/unity-catalog/manage-privileges/ownership.html) * [Privileges documentation](https://docs.databricks.com/data-governance/unity-catalog/manage-privileges/privileges.html) - + To ingest legacy hive_metastore catalog (`include_hive_metastore` - disabled by default), your service principal must have all of the following: + + To ingest legacy hive_metastore catalog (`include_hive_metastore` - enabled by default), your service principal must have all of the following: * `READ_METADATA` and `USAGE` privilege on `hive_metastore` catalog * `READ_METADATA` and `USAGE` privilege on schemas you want to ingest * `READ_METADATA` and `USAGE` privilege on tables and views you want to ingest diff --git 
a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 9def0349fbf821..c0da64e6a68aa0 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -223,7 +223,7 @@ iceberg_common = { # Iceberg Python SDK - "pyiceberg", + "pyiceberg~=0.4", # We currently pin to pydantic v1, since we only test against pydantic v1 in CI. # However, we should remove this once we fix compatibility with newer versions # of pyiceberg, which depend on pydantic v2. diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau.py index a8753509ea0192..23a75745698d90 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau.py @@ -1,4 +1,3 @@ -import json import logging import re from dataclasses import dataclass @@ -89,8 +88,10 @@ dashboard_graphql_query, database_tables_graphql_query, embedded_datasource_graphql_query, + get_filter_pages, get_overridden_info, get_unique_custom_sql, + make_filter, make_fine_grained_lineage_class, make_upstream_class, published_datasource_graphql_query, @@ -796,6 +797,7 @@ def get_connection_object_page( logger.debug( f"Query {connection_type} to get {count} objects with offset {offset}" + f" and filter {query_filter}" ) try: assert self.server is not None @@ -862,45 +864,48 @@ def get_connection_objects( self, query: str, connection_type: str, - query_filter: str, + query_filter: dict, page_size_override: Optional[int] = None, ) -> Iterable[dict]: # Calls the get_connection_object_page function to get the objects, # and automatically handles pagination. - page_size = page_size_override or self.config.page_size - total_count = page_size - has_next_page = 1 - offset = 0 - while has_next_page: - count = ( - page_size if offset + page_size < total_count else total_count - offset - ) - ( - connection_objects, - total_count, - has_next_page, - ) = self.get_connection_object_page( - query, - connection_type, - query_filter, - count, - offset, - ) + filter_pages = get_filter_pages(query_filter, page_size) + + for filter_page in filter_pages: + total_count = page_size + has_next_page = 1 + offset = 0 + while has_next_page: + count = ( + page_size + if offset + page_size < total_count + else total_count - offset + ) + ( + connection_objects, + total_count, + has_next_page, + ) = self.get_connection_object_page( + query, + connection_type, + make_filter(filter_page), + count, + offset, + ) - offset += count + offset += count - for obj in connection_objects.get(c.NODES) or []: - yield obj + for obj in connection_objects.get(c.NODES) or []: + yield obj def emit_workbooks(self) -> Iterable[MetadataWorkUnit]: if self.tableau_project_registry: project_names: List[str] = [ project.name for project in self.tableau_project_registry.values() ] - project_names_str: str = json.dumps(project_names) - projects = f"{c.PROJECT_NAME_WITH_IN}: {project_names_str}" + projects = {c.PROJECT_NAME_WITH_IN: project_names} for workbook in self.get_connection_objects( workbook_graphql_query, @@ -1331,9 +1336,7 @@ def get_transform_operation(self, field: dict) -> str: return op def emit_custom_sql_datasources(self) -> Iterable[MetadataWorkUnit]: - custom_sql_filter = ( - f"{c.ID_WITH_IN}: {json.dumps(self.custom_sql_ids_being_used)}" - ) + custom_sql_filter = {c.ID_WITH_IN: self.custom_sql_ids_being_used} custom_sql_connection = list( self.get_connection_objects( @@ -2014,9 +2017,7 @@ def _get_datasource_container_key( return container_key def 
emit_published_datasources(self) -> Iterable[MetadataWorkUnit]: - datasource_filter = ( - f"{c.ID_WITH_IN}: {json.dumps(self.datasource_ids_being_used)}" - ) + datasource_filter = {c.ID_WITH_IN: self.datasource_ids_being_used} for datasource in self.get_connection_objects( published_datasource_graphql_query, @@ -2032,7 +2033,9 @@ def emit_upstream_tables(self) -> Iterable[MetadataWorkUnit]: if tbl.id: tableau_database_table_id_to_urn_map[tbl.id] = urn - tables_filter = f"{c.ID_WITH_IN}: {json.dumps(list(tableau_database_table_id_to_urn_map.keys()))}" + tables_filter = { + c.ID_WITH_IN: list(tableau_database_table_id_to_urn_map.keys()) + } # Emmitting tables that came from Tableau metadata for tableau_table in self.get_connection_objects( @@ -2216,7 +2219,7 @@ def _get_chart_stat_wu( ).as_workunit() def emit_sheets(self) -> Iterable[MetadataWorkUnit]: - sheets_filter = f"{c.ID_WITH_IN}: {json.dumps(self.sheet_ids)}" + sheets_filter = {c.ID_WITH_IN: self.sheet_ids} for sheet in self.get_connection_objects( sheet_graphql_query, @@ -2502,7 +2505,7 @@ def new_work_unit(self, mcp: MetadataChangeProposalWrapper) -> MetadataWorkUnit: ) def emit_dashboards(self) -> Iterable[MetadataWorkUnit]: - dashboards_filter = f"{c.ID_WITH_IN}: {json.dumps(self.dashboard_ids)}" + dashboards_filter = {c.ID_WITH_IN: self.dashboard_ids} for dashboard in self.get_connection_objects( dashboard_graphql_query, @@ -2638,9 +2641,7 @@ def get_browse_paths_aspect( return browse_paths def emit_embedded_datasources(self) -> Iterable[MetadataWorkUnit]: - datasource_filter = ( - f"{c.ID_WITH_IN}: {json.dumps(self.embedded_datasource_ids_being_used)}" - ) + datasource_filter = {c.ID_WITH_IN: self.embedded_datasource_ids_being_used} for datasource in self.get_connection_objects( embedded_datasource_graphql_query, diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau_common.py b/metadata-ingestion/src/datahub/ingestion/source/tableau_common.py index 881f6c63e094d0..98536472c5f619 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau_common.py @@ -1,4 +1,5 @@ import html +import json import logging from dataclasses import dataclass from functools import lru_cache @@ -851,6 +852,15 @@ def clean_query(query: str) -> str: return query +def make_filter(filter_dict: dict) -> str: + filter = "" + for key, value in filter_dict.items(): + if filter: + filter += ", " + filter += f"{key}: {json.dumps(value)}" + return filter + + def query_metadata( server: Server, main_query: str, @@ -877,3 +887,32 @@ def query_metadata( main_query=main_query, ) return server.metadata.query(query) + + +def get_filter_pages(query_filter: dict, page_size: int) -> List[dict]: + filter_pages = [query_filter] + # If this is primary id filter so we can use divide this query list into + # multiple requests each with smaller filter list (of order page_size). 
+ # It is observed in the past that if list of primary ids grow beyond + # a few ten thousands then tableau server responds with empty response + # causing below error: + # tableauserverclient.server.endpoint.exceptions.NonXMLResponseError: b'' + if ( + len(query_filter.keys()) == 1 + and query_filter.get(c.ID_WITH_IN) + and isinstance(query_filter[c.ID_WITH_IN], list) + and len(query_filter[c.ID_WITH_IN]) > 100 * page_size + ): + ids = query_filter[c.ID_WITH_IN] + filter_pages = [ + { + c.ID_WITH_IN: ids[ + start : ( + start + page_size if start + page_size < len(ids) else len(ids) + ) + ] + } + for start in range(0, len(ids), page_size) + ] + + return filter_pages diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/connection_test.py b/metadata-ingestion/src/datahub/ingestion/source/unity/connection_test.py index da8a6bdac2322f..45d74edd9c8e61 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/connection_test.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/connection_test.py @@ -11,8 +11,8 @@ class UnityCatalogConnectionTest: - def __init__(self, config_dict: dict): - self.config = UnityCatalogSourceConfig.parse_obj_allow_extras(config_dict) + def __init__(self, config: UnityCatalogSourceConfig): + self.config = config self.report = UnityCatalogReport() self.proxy = UnityCatalogApiProxy( self.config.workspace_url, diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py index d58a12b9cbb0be..f3aeb34002f3f0 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py @@ -236,7 +236,14 @@ def init_hive_metastore_proxy(self): @staticmethod def test_connection(config_dict: dict) -> TestConnectionReport: - return UnityCatalogConnectionTest(config_dict).get_connection_test() + try: + config = UnityCatalogSourceConfig.parse_obj_allow_extras(config_dict) + except Exception as e: + return TestConnectionReport( + internal_failure=True, + internal_failure_reason=f"Failed to parse config due to {e}", + ) + return UnityCatalogConnectionTest(config).get_connection_test() @classmethod def create(cls, config_dict, ctx): diff --git a/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py b/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py index f22e15da45df24..59e7f582da5e84 100644 --- a/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py +++ b/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py @@ -401,11 +401,15 @@ def test_ingestion(pytestconfig, tmp_path, requests_mock): output_file_name = "unity_catalog_mcps.json" - with patch("databricks.sdk.WorkspaceClient") as WorkspaceClient, patch.object( + with patch( + "datahub.ingestion.source.unity.proxy.WorkspaceClient" + ) as mock_client, patch.object( HiveMetastoreProxy, "get_inspector" - ) as get_inspector, patch.object(HiveMetastoreProxy, "_execute_sql") as execute_sql: + ) as get_inspector, patch.object( + HiveMetastoreProxy, "_execute_sql" + ) as execute_sql: workspace_client: mock.MagicMock = mock.MagicMock() - WorkspaceClient.return_value = workspace_client + mock_client.return_value = workspace_client register_mock_data(workspace_client) inspector = mock.MagicMock() diff --git a/metadata-ingestion/tests/unit/test_tableau_source.py b/metadata-ingestion/tests/unit/test_tableau_source.py index 9a2b1dd408d803..41117914d552ef 100644 --- 
a/metadata-ingestion/tests/unit/test_tableau_source.py +++ b/metadata-ingestion/tests/unit/test_tableau_source.py @@ -1,6 +1,8 @@ import pytest +import datahub.ingestion.source.tableau_constant as c from datahub.ingestion.source.tableau import TableauSource +from datahub.ingestion.source.tableau_common import get_filter_pages, make_filter def test_tableau_source_unescapes_lt(): @@ -121,3 +123,48 @@ def test_tableau_source_cleanups_tableau_parameters_in_udfs(p): TableauSource._clean_tableau_query_parameters(f"select myudf({p}) from t") == "select myudf(1) from t" ) + + +def test_make_id_filter(): + ids = [i for i in range(1, 6)] + filter_dict = {c.ID_WITH_IN: ids} + assert make_filter(filter_dict) == f"{c.ID_WITH_IN}: [1, 2, 3, 4, 5]" + + +def test_make_project_filter(): + projects = ["x", "y", "z"] + filter_dict = {c.PROJECT_NAME_WITH_IN: projects} + assert make_filter(filter_dict) == f'{c.PROJECT_NAME_WITH_IN}: ["x", "y", "z"]' + + +def test_make_multiple_filters(): + ids = [i for i in range(1, 6)] + projects = ["x", "y", "z"] + filter_dict = {c.ID_WITH_IN: ids, c.PROJECT_NAME_WITH_IN: projects} + assert ( + make_filter(filter_dict) + == f'{c.ID_WITH_IN}: [1, 2, 3, 4, 5], {c.PROJECT_NAME_WITH_IN}: ["x", "y", "z"]' + ) + + +def test_get_filter_pages_simple(): + ids = [i for i in range(5)] + filter_dict = {c.ID_WITH_IN: ids} + assert get_filter_pages(filter_dict, 10) == [filter_dict] + + +def test_get_filter_pages_non_id_large_filter_passthrough(): + projects = [f"project{i}" for i in range(20000)] + filter_dict = {c.PROJECT_NAME_WITH_IN: projects} + assert get_filter_pages(filter_dict, 10) == [filter_dict] + + +def test_get_filter_pages_id_filter_splits_into_multiple_filters(): + page_size = 10 + num_ids = 20000 + ids = [f"id_{i}" for i in range(num_ids)] + filter_dict = {c.ID_WITH_IN: ids} + assert get_filter_pages(filter_dict, page_size) == [ + {c.ID_WITH_IN: filter_dict[c.ID_WITH_IN][i : i + page_size]} + for i in range(0, num_ids, page_size) + ] diff --git a/metadata-ingestion/tests/unit/test_unity_catalog_config.py b/metadata-ingestion/tests/unit/test_unity_catalog_config.py index 6b97d06b7ff93e..ba554e966669ec 100644 --- a/metadata-ingestion/tests/unit/test_unity_catalog_config.py +++ b/metadata-ingestion/tests/unit/test_unity_catalog_config.py @@ -4,6 +4,7 @@ from freezegun import freeze_time from datahub.ingestion.source.unity.config import UnityCatalogSourceConfig +from datahub.ingestion.source.unity.source import UnityCatalogSource FROZEN_TIME = datetime.fromisoformat("2023-01-01 00:00:00+00:00") @@ -130,6 +131,17 @@ def test_warehouse_id_must_be_set_if_include_hive_metastore_is_true(): ) +def test_warehouse_id_must_be_present_test_connection(): + config_dict = { + "token": "token", + "workspace_url": "https://XXXXXXXXXXXXXXXXXXXXX", + "include_hive_metastore": True, + } + report = UnityCatalogSource.test_connection(config_dict) + assert report.internal_failure + print(report.internal_failure_reason) + + def test_set_profiling_warehouse_id_from_global(): config = UnityCatalogSourceConfig.parse_obj( {
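
Note on the new Airflow plugin flag: lineage is now emitted as individual MCPs via `datajob.generate_mcp(materialize_iolets=...)`, so creating (or un-soft-deleting) the referenced datasets can be switched off while still emitting the lineage edges themselves. A minimal `airflow.cfg` sketch of opting out — the `[datahub]` section and `enabled` key come from the existing docs, and the new key name matches the `conf.get("datahub", "materialize_iolets", ...)` call above; treat the snippet as illustrative rather than definitive:

```ini
[datahub]
enabled = True
# New in this change (defaults to true). Set to false to emit lineage edges
# without creating or un-soft-deleting the datasets they reference.
materialize_iolets = False
```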
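
The Tableau change swaps pre-rendered filter strings for dicts so that an oversized id filter can be split into per-page filters before `make_filter` renders it. A self-contained sketch of the splitting rule in `get_filter_pages` (the literal key names below are illustrative stand-ins for the `tableau_constant` values):

```python
from typing import Dict, List

# Illustrative stand-in for tableau_constant.ID_WITH_IN (assumed key name).
ID_WITH_IN = "idWithin"


def chunk_id_filter(query_filter: Dict[str, list], page_size: int) -> List[dict]:
    """Mirrors get_filter_pages: only a single, oversized id filter is split."""
    ids = query_filter.get(ID_WITH_IN)
    if len(query_filter) == 1 and isinstance(ids, list) and len(ids) > 100 * page_size:
        # One page of ids per request keeps each GraphQL filter small enough
        # that the Tableau metadata API does not come back with an empty body.
        return [
            {ID_WITH_IN: ids[start : start + page_size]}
            for start in range(0, len(ids), page_size)
        ]
    return [query_filter]


# 20,000 ids with page_size=10 exceed the 100 * page_size threshold, so one
# filter becomes 2,000 smaller ones; any other filter passes through unchanged.
pages = chunk_id_filter({ID_WITH_IN: [f"id_{i}" for i in range(20_000)]}, page_size=10)
assert len(pages) == 2_000 and len(pages[0][ID_WITH_IN]) == 10
assert chunk_id_filter({"projectNameWithin": ["Analytics"]}, page_size=10) == [
    {"projectNameWithin": ["Analytics"]}
]
```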