Merge branch 'datahub-project:master' into master
hsheth2 authored Apr 18, 2024
2 parents 15c0885 + 76b5783 commit 8db7cc2
Showing 14 changed files with 180 additions and 53 deletions.
6 changes: 4 additions & 2 deletions docs/lineage/airflow.md
@@ -71,6 +71,7 @@ enabled = True # default
| capture_ownership_info | true | Extract DAG ownership. |
| capture_tags_info | true | Extract DAG tags. |
| capture_executions | true | Extract task runs and success/failure statuses. This will show up in the DataHub "Runs" tab. |
| materialize_iolets | true | Create or un-soft-delete all entities referenced in lineage. |
| enable_extractors | true | Enable automatic lineage extraction. |
| disable_openlineage_plugin | true | Disable the OpenLineage plugin to avoid duplicative processing. |
| log_level | _no change_ | [debug] Set the log level for the plugin. |
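For reference, the options above map onto the `[datahub]` section of `airflow.cfg`, which is where `get_lineage_config()` reads them from. A minimal sketch (values are illustrative; only `materialize_iolets` is new in this change):

```ini
[datahub]
enabled = True
capture_executions = True
# New option: skip creating/un-soft-deleting entities that only appear as inlets/outlets.
materialize_iolets = False
```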
@@ -135,8 +136,9 @@ conn_id = datahub_rest_default # or datahub_kafka_default
| capture_ownership_info | true | If true, the owners field of the DAG will be captured as a DataHub corpuser. |
| capture_tags_info | true | If true, the tags field of the DAG will be captured as DataHub tags. |
| capture_executions | true | If true, we'll capture task runs in DataHub in addition to DAG definitions. |
| materialize_iolets | true | Create or un-soft-delete all entities referenced in lineage. |
| datajob_url_link | taskinstance | If taskinstance, the datajob URL will be the task instance link in Airflow. It can also be set to grid. |
| graceful_exceptions | true | If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions. |

#### Validate that the plugin is working
@@ -34,6 +34,10 @@ class DatahubLineageConfig(ConfigModel):
# If true, the tags field of the DAG will be captured as DataHub tags.
capture_tags_info: bool = True

# If true (default), we'll materialize and un-soft-delete any urns
# referenced by inlets or outlets.
materialize_iolets: bool = True

capture_executions: bool = False

enable_extractors: bool = True
@@ -67,6 +71,7 @@ def get_lineage_config() -> DatahubLineageConfig:
"datahub", "capture_ownership_info", fallback=True
)
capture_executions = conf.get("datahub", "capture_executions", fallback=True)
materialize_iolets = conf.get("datahub", "materialize_iolets", fallback=True)
enable_extractors = conf.get("datahub", "enable_extractors", fallback=True)
log_level = conf.get("datahub", "log_level", fallback=None)
debug_emitter = conf.get("datahub", "debug_emitter", fallback=False)
@@ -84,6 +89,7 @@ def get_lineage_config() -> DatahubLineageConfig:
capture_ownership_info=capture_ownership_info,
capture_tags_info=capture_tags_info,
capture_executions=capture_executions,
materialize_iolets=materialize_iolets,
enable_extractors=enable_extractors,
log_level=log_level,
debug_emitter=debug_emitter,
@@ -389,7 +389,10 @@ def on_task_instance_running(

# TODO: Add handling for Airflow mapped tasks using task_instance.map_index

datajob.emit(self.emitter, callback=self._make_emit_callback())
for mcp in datajob.generate_mcp(
materialize_iolets=self.config.materialize_iolets
):
self.emitter.emit(mcp, self._make_emit_callback())
logger.debug(f"Emitted DataHub Datajob start: {datajob}")

if self.config.capture_executions:
@@ -430,7 +433,10 @@ def on_task_instance_finish(
# Add lineage info.
self._extract_lineage(datajob, dagrun, task, task_instance, complete=True)

datajob.emit(self.emitter, callback=self._make_emit_callback())
for mcp in datajob.generate_mcp(
materialize_iolets=self.config.materialize_iolets
):
self.emitter.emit(mcp, self._make_emit_callback())
logger.debug(f"Emitted DataHub Datajob finish w/ status {status}: {datajob}")

if self.config.capture_executions:
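Across the listener and plugin code above, the pattern `datajob.emit(emitter, ...)` is replaced by iterating `datajob.generate_mcp(materialize_iolets=...)` and emitting each MCP, which is what lets the new config flag skip materializing iolets. A standalone sketch of the same pattern follows; the flow/job identifiers and emitter endpoint are illustrative, and constructor arguments may differ slightly between SDK versions:

```python
from datahub.api.entities.datajob import DataFlow, DataJob
from datahub.emitter.rest_emitter import DatahubRestEmitter

# Illustrative flow/job; the Airflow plugin builds these from the DAG and task.
dataflow = DataFlow(orchestrator="airflow", id="example_dag", env="PROD")
datajob = DataJob(id="example_task", flow_urn=dataflow.urn)

emitter = DatahubRestEmitter(gms_server="http://localhost:8080")

# Equivalent of the old datajob.emit(emitter), but with control over iolet materialization.
for mcp in datajob.generate_mcp(materialize_iolets=False):
    emitter.emit(mcp)
```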
@@ -133,7 +133,8 @@ def datahub_task_status_callback(context, status):
)

task.log.info(f"Emitting Datahub Datajob: {datajob}")
datajob.emit(emitter, callback=_make_emit_callback(task.log))
for mcp in datajob.generate_mcp(materialize_iolets=config.materialize_iolets):
emitter.emit(mcp, _make_emit_callback(task.log))

if config.capture_executions:
dpi = AirflowGenerator.run_datajob(
@@ -200,7 +201,8 @@ def datahub_pre_execution(context):
)

task.log.info(f"Emitting Datahub dataJob {datajob}")
datajob.emit(emitter, callback=_make_emit_callback(task.log))
for mcp in datajob.generate_mcp(materialize_iolets=config.materialize_iolets):
emitter.emit(mcp, _make_emit_callback(task.log))

if config.capture_executions:
dpi = AirflowGenerator.run_datajob(
@@ -59,7 +59,8 @@ def send_lineage_to_datahub(
entities_to_datajob_urn_list([let.urn for let in inlets])
)

datajob.emit(emitter)
for mcp in datajob.generate_mcp(materialize_iolets=config.materialize_iolets):
emitter.emit(mcp)
operator.log.info(f"Emitted from Lineage: {datajob}")

if config.capture_executions:
@@ -13,7 +13,7 @@
* Ownership of or `SELECT` privilege on any tables and views you want to ingest
* [Ownership documentation](https://docs.databricks.com/data-governance/unity-catalog/manage-privileges/ownership.html)
* [Privileges documentation](https://docs.databricks.com/data-governance/unity-catalog/manage-privileges/privileges.html)
+ To ingest legacy hive_metastore catalog (`include_hive_metastore` - disabled by default), your service principal must have all of the following:
+ To ingest legacy hive_metastore catalog (`include_hive_metastore` - enabled by default), your service principal must have all of the following:
* `READ_METADATA` and `USAGE` privilege on `hive_metastore` catalog
* `READ_METADATA` and `USAGE` privilege on schemas you want to ingest
* `READ_METADATA` and `USAGE` privilege on tables and views you want to ingest
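For context, a minimal ingestion recipe that pulls in the legacy catalog might look like the sketch below; `workspace_url` and `token` are the usual Unity Catalog source options and the values are placeholders.

```yaml
source:
  type: unity-catalog
  config:
    workspace_url: https://<workspace>.cloud.databricks.com
    token: ${DATABRICKS_TOKEN}
    # Now enabled by default; set to false to skip the legacy hive_metastore catalog.
    include_hive_metastore: true
```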
2 changes: 1 addition & 1 deletion metadata-ingestion/setup.py
@@ -223,7 +223,7 @@

iceberg_common = {
# Iceberg Python SDK
"pyiceberg",
"pyiceberg~=0.4",
# We currently pin to pydantic v1, since we only test against pydantic v1 in CI.
# However, we should remove this once we fix compatibility with newer versions
# of pyiceberg, which depend on pydantic v2.
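For clarity, `~=0.4` is a PEP 440 compatible-release specifier, so the pin is equivalent to the explicit range below and keeps the source on the pyiceberg line that still works with pydantic v1:

```python
# "pyiceberg~=0.4" is shorthand for:
"pyiceberg>=0.4,==0.*",  # i.e. any 0.x release from 0.4 onwards
```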
77 changes: 39 additions & 38 deletions metadata-ingestion/src/datahub/ingestion/source/tableau.py
@@ -1,4 +1,3 @@
import json
import logging
import re
from dataclasses import dataclass
@@ -89,8 +88,10 @@
dashboard_graphql_query,
database_tables_graphql_query,
embedded_datasource_graphql_query,
get_filter_pages,
get_overridden_info,
get_unique_custom_sql,
make_filter,
make_fine_grained_lineage_class,
make_upstream_class,
published_datasource_graphql_query,
@@ -796,6 +797,7 @@ def get_connection_object_page(

logger.debug(
f"Query {connection_type} to get {count} objects with offset {offset}"
f" and filter {query_filter}"
)
try:
assert self.server is not None
@@ -862,45 +864,48 @@ def get_connection_objects(
self,
query: str,
connection_type: str,
query_filter: str,
query_filter: dict,
page_size_override: Optional[int] = None,
) -> Iterable[dict]:
# Calls the get_connection_object_page function to get the objects,
# and automatically handles pagination.

page_size = page_size_override or self.config.page_size

total_count = page_size
has_next_page = 1
offset = 0
while has_next_page:
count = (
page_size if offset + page_size < total_count else total_count - offset
)
(
connection_objects,
total_count,
has_next_page,
) = self.get_connection_object_page(
query,
connection_type,
query_filter,
count,
offset,
)
filter_pages = get_filter_pages(query_filter, page_size)

for filter_page in filter_pages:
total_count = page_size
has_next_page = 1
offset = 0
while has_next_page:
count = (
page_size
if offset + page_size < total_count
else total_count - offset
)
(
connection_objects,
total_count,
has_next_page,
) = self.get_connection_object_page(
query,
connection_type,
make_filter(filter_page),
count,
offset,
)

offset += count
offset += count

for obj in connection_objects.get(c.NODES) or []:
yield obj
for obj in connection_objects.get(c.NODES) or []:
yield obj

def emit_workbooks(self) -> Iterable[MetadataWorkUnit]:
if self.tableau_project_registry:
project_names: List[str] = [
project.name for project in self.tableau_project_registry.values()
]
project_names_str: str = json.dumps(project_names)
projects = f"{c.PROJECT_NAME_WITH_IN}: {project_names_str}"
projects = {c.PROJECT_NAME_WITH_IN: project_names}

for workbook in self.get_connection_objects(
workbook_graphql_query,
@@ -1331,9 +1336,7 @@ def get_transform_operation(self, field: dict) -> str:
return op

def emit_custom_sql_datasources(self) -> Iterable[MetadataWorkUnit]:
custom_sql_filter = (
f"{c.ID_WITH_IN}: {json.dumps(self.custom_sql_ids_being_used)}"
)
custom_sql_filter = {c.ID_WITH_IN: self.custom_sql_ids_being_used}

custom_sql_connection = list(
self.get_connection_objects(
@@ -2014,9 +2017,7 @@ def _get_datasource_container_key(
return container_key

def emit_published_datasources(self) -> Iterable[MetadataWorkUnit]:
datasource_filter = (
f"{c.ID_WITH_IN}: {json.dumps(self.datasource_ids_being_used)}"
)
datasource_filter = {c.ID_WITH_IN: self.datasource_ids_being_used}

for datasource in self.get_connection_objects(
published_datasource_graphql_query,
@@ -2032,7 +2033,9 @@ def emit_upstream_tables(self) -> Iterable[MetadataWorkUnit]:
if tbl.id:
tableau_database_table_id_to_urn_map[tbl.id] = urn

tables_filter = f"{c.ID_WITH_IN}: {json.dumps(list(tableau_database_table_id_to_urn_map.keys()))}"
tables_filter = {
c.ID_WITH_IN: list(tableau_database_table_id_to_urn_map.keys())
}

# Emitting tables that came from Tableau metadata
for tableau_table in self.get_connection_objects(
@@ -2216,7 +2219,7 @@ def _get_chart_stat_wu(
).as_workunit()

def emit_sheets(self) -> Iterable[MetadataWorkUnit]:
sheets_filter = f"{c.ID_WITH_IN}: {json.dumps(self.sheet_ids)}"
sheets_filter = {c.ID_WITH_IN: self.sheet_ids}

for sheet in self.get_connection_objects(
sheet_graphql_query,
@@ -2502,7 +2505,7 @@ def new_work_unit(self, mcp: MetadataChangeProposalWrapper) -> MetadataWorkUnit:
)

def emit_dashboards(self) -> Iterable[MetadataWorkUnit]:
dashboards_filter = f"{c.ID_WITH_IN}: {json.dumps(self.dashboard_ids)}"
dashboards_filter = {c.ID_WITH_IN: self.dashboard_ids}

for dashboard in self.get_connection_objects(
dashboard_graphql_query,
@@ -2638,9 +2641,7 @@ def get_browse_paths_aspect(
return browse_paths

def emit_embedded_datasources(self) -> Iterable[MetadataWorkUnit]:
datasource_filter = (
f"{c.ID_WITH_IN}: {json.dumps(self.embedded_datasource_ids_being_used)}"
)
datasource_filter = {c.ID_WITH_IN: self.embedded_datasource_ids_being_used}

for datasource in self.get_connection_objects(
embedded_datasource_graphql_query,
39 changes: 39 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/tableau_common.py
@@ -1,4 +1,5 @@
import html
import json
import logging
from dataclasses import dataclass
from functools import lru_cache
@@ -851,6 +852,15 @@ def clean_query(query: str) -> str:
return query


def make_filter(filter_dict: dict) -> str:
filter = ""
for key, value in filter_dict.items():
if filter:
filter += ", "
filter += f"{key}: {json.dumps(value)}"
return filter
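A quick sketch of the string this helper produces; the key names are illustrative stand-ins for the `c.*` constants used by the source:

```python
>>> make_filter({"idWithin": ["id-1", "id-2"], "projectNameWithin": ["Analytics"]})
'idWithin: ["id-1", "id-2"], projectNameWithin: ["Analytics"]'
```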


def query_metadata(
server: Server,
main_query: str,
@@ -877,3 +887,32 @@ def query_metadata(
main_query=main_query,
)
return server.metadata.query(query)


def get_filter_pages(query_filter: dict, page_size: int) -> List[dict]:
filter_pages = [query_filter]
# If this is a pure primary-id filter, we can split the query into
# multiple requests, each with a smaller id list (of size page_size).
# It has been observed that once the list of primary ids grows beyond
# a few tens of thousands, the Tableau server responds with an empty
# response, causing the error below:
# tableauserverclient.server.endpoint.exceptions.NonXMLResponseError: b''
if (
len(query_filter.keys()) == 1
and query_filter.get(c.ID_WITH_IN)
and isinstance(query_filter[c.ID_WITH_IN], list)
and len(query_filter[c.ID_WITH_IN]) > 100 * page_size
):
ids = query_filter[c.ID_WITH_IN]
filter_pages = [
{
c.ID_WITH_IN: ids[
start : (
start + page_size if start + page_size < len(ids) else len(ids)
)
]
}
for start in range(0, len(ids), page_size)
]

return filter_pages
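A sketch of the splitting behaviour, assuming `c.ID_WITH_IN` is the `idWithin` key: a single-key id filter larger than `100 * page_size` is chunked into pages of `page_size` ids, while anything else is passed through as one page.

```python
from datahub.ingestion.source.tableau_common import get_filter_pages

ids = [f"id-{i}" for i in range(1001)]
pages = get_filter_pages({"idWithin": ids}, page_size=10)
assert len(pages) == 101                   # 1001 ids -> 101 chunks of at most 10
assert pages[0] == {"idWithin": ids[:10]}

# A filter that is not a pure id filter is returned unchanged as a single page.
assert get_filter_pages({"projectNameWithin": ["Analytics"]}, page_size=10) == [
    {"projectNameWithin": ["Analytics"]}
]
```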
@@ -11,8 +11,8 @@


class UnityCatalogConnectionTest:
def __init__(self, config_dict: dict):
self.config = UnityCatalogSourceConfig.parse_obj_allow_extras(config_dict)
def __init__(self, config: UnityCatalogSourceConfig):
self.config = config
self.report = UnityCatalogReport()
self.proxy = UnityCatalogApiProxy(
self.config.workspace_url,
@@ -236,7 +236,14 @@ def init_hive_metastore_proxy(self):

@staticmethod
def test_connection(config_dict: dict) -> TestConnectionReport:
return UnityCatalogConnectionTest(config_dict).get_connection_test()
try:
config = UnityCatalogSourceConfig.parse_obj_allow_extras(config_dict)
except Exception as e:
return TestConnectionReport(
internal_failure=True,
internal_failure_reason=f"Failed to parse config due to {e}",
)
return UnityCatalogConnectionTest(config).get_connection_test()

@classmethod
def create(cls, config_dict, ctx):
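The effect of the new guard, sketched below; the import path and the failure condition are assumptions (an empty config should fail pydantic validation because required fields such as `workspace_url` and `token` are missing):

```python
from datahub.ingestion.source.unity.source import UnityCatalogSource

# Previously this raised; now it returns a report describing the parse failure.
report = UnityCatalogSource.test_connection({})
assert report.internal_failure
print(report.internal_failure_reason)  # "Failed to parse config due to ..."
```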
@@ -401,11 +401,15 @@ def test_ingestion(pytestconfig, tmp_path, requests_mock):

output_file_name = "unity_catalog_mcps.json"

with patch("databricks.sdk.WorkspaceClient") as WorkspaceClient, patch.object(
with patch(
"datahub.ingestion.source.unity.proxy.WorkspaceClient"
) as mock_client, patch.object(
HiveMetastoreProxy, "get_inspector"
) as get_inspector, patch.object(HiveMetastoreProxy, "_execute_sql") as execute_sql:
) as get_inspector, patch.object(
HiveMetastoreProxy, "_execute_sql"
) as execute_sql:
workspace_client: mock.MagicMock = mock.MagicMock()
WorkspaceClient.return_value = workspace_client
mock_client.return_value = workspace_client
register_mock_data(workspace_client)

inspector = mock.MagicMock()