Skip to content

Commit

Permalink
feat(ingest/dbt): dbt column-level lineage (datahub-project#8991)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Nov 14, 2023
1 parent ff90fb6 commit 19aa215
Show file tree
Hide file tree
Showing 20 changed files with 2,550 additions and 613 deletions.
2 changes: 1 addition & 1 deletion metadata-ingestion-modules/airflow-plugin/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def get_long_description():
return pathlib.Path(os.path.join(root, "README.md")).read_text()


_version = package_metadata["__version__"]
_version: str = package_metadata["__version__"]
_self_pin = f"=={_version}" if not _version.endswith("dev0") else ""


Expand Down
4 changes: 2 additions & 2 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,8 +305,8 @@
"datahub-lineage-file": set(),
"datahub-business-glossary": set(),
"delta-lake": {*data_lake_profiling, *delta_lake},
"dbt": {"requests"} | aws_common,
"dbt-cloud": {"requests"},
"dbt": {"requests"} | sqlglot_lib | aws_common,
"dbt-cloud": {"requests"} | sqlglot_lib,
"druid": sql_common | {"pydruid>=0.6.2"},
"dynamodb": aws_common,
# Starting with 7.14.0 python client is checking if it is connected to elasticsearch client. If its not it throws
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from datahub.specific.dataset import DatasetPatchBuilder


def _convert_upstream_lineage_to_patch(
def convert_upstream_lineage_to_patch(
urn: str,
aspect: UpstreamLineageClass,
system_metadata: Optional[SystemMetadataClass],
Expand Down Expand Up @@ -86,16 +86,11 @@ def _merge_upstream_lineage(


def _lineage_wu_via_read_modify_write(
graph: Optional[DataHubGraph],
graph: DataHubGraph,
urn: str,
aspect: UpstreamLineageClass,
system_metadata: Optional[SystemMetadataClass],
) -> MetadataWorkUnit:
if graph is None:
raise ValueError(
"Failed to handle incremental lineage, DataHubGraph is missing. "
"Use `datahub-rest` sink OR provide `datahub-api` config in recipe. "
)
gms_aspect = graph.get_aspect(urn, UpstreamLineageClass)
if gms_aspect:
new_aspect = _merge_upstream_lineage(aspect, gms_aspect)
Expand Down Expand Up @@ -131,11 +126,16 @@ def auto_incremental_lineage(
yield wu

if lineage_aspect.fineGrainedLineages:
if graph is None:
raise ValueError(
"Failed to handle incremental lineage, DataHubGraph is missing. "
"Use `datahub-rest` sink OR provide `datahub-api` config in recipe. "
)
yield _lineage_wu_via_read_modify_write(
graph, urn, lineage_aspect, wu.metadata.systemMetadata
)
elif lineage_aspect.upstreams:
yield _convert_upstream_lineage_to_patch(
yield convert_upstream_lineage_to_patch(
urn, lineage_aspect, wu.metadata.systemMetadata
)
else:
Expand Down
Loading

0 comments on commit 19aa215

Please sign in to comment.