Merge branch 'datahub-project:master' into master
anshbansal authored Apr 29, 2024
2 parents 1a02497 + 3ab4ec9 commit a1e9c07
Showing 23 changed files with 429 additions and 103 deletions.
5 changes: 5 additions & 0 deletions docs/businessattributes.md
@@ -70,6 +70,11 @@ Description inherited from business attribute is greyed out to differentiate bet
<img width="70%" src="https://raw.githubusercontent.com/datahub-project/static-assets/88472958703d5e9236f71bb457c1acd481d123af/imgs/business_attributes/dataset-inherits-businessattribute-properties.png"/>
</p>

### Enable Business Attributes Feature
By default, the Business Attributes feature is disabled. To enable it, set the following configuration in [application.yaml](../metadata-service/configuration/src/main/resources/application.yaml):

businessAttributeEntityEnabled: true

### What updates are planned for the Business Attributes feature?

- Ingestion of Business attributes through recipe file (YAML)
@@ -153,10 +153,12 @@ private static FieldType getFieldType(
private static FieldType getDefaultFieldType(DataSchema.Type schemaDataType) {
switch (schemaDataType) {
case INT:
case FLOAT:
return FieldType.COUNT;
case MAP:
return FieldType.KEYWORD;
case FLOAT:
case DOUBLE:
return FieldType.DOUBLE;
default:
return FieldType.TEXT;
}
@@ -98,10 +98,12 @@ private static SearchableAnnotation.FieldType getDefaultFieldType(
DataSchema.Type schemaDataType) {
switch (schemaDataType) {
case INT:
case FLOAT:
return SearchableAnnotation.FieldType.COUNT;
case MAP:
return SearchableAnnotation.FieldType.KEYWORD;
case FLOAT:
case DOUBLE:
return SearchableAnnotation.FieldType.DOUBLE;
default:
return SearchableAnnotation.FieldType.TEXT;
}
11 changes: 11 additions & 0 deletions metadata-ingestion/docs/sources/dbt/dbt-cloud_pre.md
@@ -0,0 +1,11 @@
### Setup

This source pulls dbt metadata directly from the dbt Cloud APIs.

You'll need a dbt Cloud job set up to run your dbt project, with "Generate docs on run" enabled.

The token should have the "read metadata" permission.

To get the required IDs, go to the job details page (this is the one with the "Run History" table), and look at the URL.
It should look something like this: https://cloud.getdbt.com/next/deploy/107298/projects/175705/jobs/148094.
In this example, the account ID is 107298, the project ID is 175705, and the job ID is 148094.
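
If it helps, here is a small illustrative Python snippet (not part of DataHub itself) that extracts those three IDs from a job URL of the form shown above; the regex assumes the `/deploy/<account>/projects/<project>/jobs/<job>` path structure.

```python
import re

# Hypothetical helper: pull the account, project, and job IDs out of a
# dbt Cloud job URL like the example above.
def parse_dbt_cloud_job_url(url: str) -> dict:
    match = re.search(r"/deploy/(\d+)/projects/(\d+)/jobs/(\d+)", url)
    if not match:
        raise ValueError(f"Unrecognized dbt Cloud job URL: {url}")
    account_id, project_id, job_id = (int(g) for g in match.groups())
    return {"account_id": account_id, "project_id": project_id, "job_id": job_id}

print(parse_dbt_cloud_job_url(
    "https://cloud.getdbt.com/next/deploy/107298/projects/175705/jobs/148094"
))
# -> {'account_id': 107298, 'project_id': 175705, 'job_id': 148094}
```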
3 changes: 1 addition & 2 deletions metadata-ingestion/docs/sources/dbt/dbt.md
@@ -187,8 +187,7 @@ dbt source snapshot-freshness
dbt build
cp target/run_results.json target/run_results_backup.json
dbt docs generate
# Reference target/run_results_backup.json in the dbt source config.
cp target/run_results_backup.json target/run_results.json
```

:::
41 changes: 41 additions & 0 deletions metadata-ingestion/docs/sources/dbt/dbt_pre.md
@@ -0,0 +1,41 @@
### Setup

The artifacts used by this source are:

- [dbt manifest file](https://docs.getdbt.com/reference/artifacts/manifest-json)
- This file contains model, source, test, and lineage data.
- [dbt catalog file](https://docs.getdbt.com/reference/artifacts/catalog-json)
- This file contains schema data.
- dbt does not record schema data for ephemeral models. As a result, DataHub will show ephemeral models in lineage, but they will have no associated schema.
- [dbt sources file](https://docs.getdbt.com/reference/artifacts/sources-json)
- This file contains metadata for sources with freshness checks.
- We transfer dbt's freshness checks to DataHub's last-modified fields.
- Note that this file is optional – if not specified, we'll use time of ingestion instead as a proxy for time last-modified.
- [dbt run_results file](https://docs.getdbt.com/reference/artifacts/run-results-json)
- This file contains metadata from the result of a dbt run, e.g. `dbt test`.
- When provided, we transfer dbt test run results into assertion run events, giving a timeline of test runs on the dataset.

To generate these files, we recommend the following workflow for the dbt build and DataHub ingestion.

```sh
dbt source snapshot-freshness
dbt build
cp target/run_results.json target/run_results_backup.json
dbt docs generate
cp target/run_results_backup.json target/run_results.json

# Run datahub ingestion, pointing at the files in the target/ directory
```

The necessary artifact files will then appear in the `target/` directory of your dbt project.

We also have guides on handling more complex dbt orchestration techniques and multi-project setups below.
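
As an illustration of how these files feed into ingestion, here is a hedged sketch of a programmatic run using DataHub's `Pipeline` API; the dbt source field names (`manifest_path`, `catalog_path`, `sources_path`, `target_platform`) reflect the source config at the time of writing, and the server URL and platform below are placeholders.

```python
from datahub.ingestion.run.pipeline import Pipeline

# Sketch only: point the dbt source at the artifacts generated in target/.
pipeline = Pipeline.create(
    {
        "source": {
            "type": "dbt",
            "config": {
                "manifest_path": "target/manifest.json",
                "catalog_path": "target/catalog.json",
                "sources_path": "target/sources.json",  # optional freshness data
                "target_platform": "snowflake",  # platform your dbt models run against
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()
```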

:::note Entity is in manifest but missing from catalog

This warning usually appears when the catalog.json file was not generated by a `dbt docs generate` command.
Most other dbt commands generate a partial catalog file, which may impact the completeness of the metadata ingested into DataHub.

Following the above workflow should ensure that the catalog file is generated correctly.

:::
13 changes: 7 additions & 6 deletions metadata-ingestion/setup.py
@@ -233,6 +233,11 @@
*pydantic_no_v2,
}

mssql_common = {
"sqlalchemy-pytds>=0.3",
"pyOpenSSL",
}

postgres_common = {
"psycopg2-binary",
"GeoAlchemy2",
@@ -371,12 +376,8 @@
},
"mode": {"requests", "tenacity>=8.0.1"} | sqllineage_lib | sqlglot_lib,
"mongodb": {"pymongo[srv]>=3.11", "packaging"},
"mssql": sql_common
| {
"sqlalchemy-pytds>=0.3",
"pyOpenSSL",
},
"mssql-odbc": sql_common | {"pyodbc"},
"mssql": sql_common | mssql_common,
"mssql-odbc": sql_common | mssql_common | {"pyodbc"},
"mysql": mysql,
# mariadb should have same dependency as mysql
"mariadb": sql_common | {"pymysql>=1.0.2"},
5 changes: 3 additions & 2 deletions metadata-ingestion/src/datahub/emitter/mce_builder.py
@@ -367,7 +367,7 @@ def make_ml_model_group_urn(platform: str, group_name: str, env: str) -> str:
)


def get_class_fields(_class: Type[object]) -> Iterable[str]:
def _get_enum_options(_class: Type[object]) -> Iterable[str]:
return [
f
for f in dir(_class)
@@ -378,7 +378,8 @@ def validate_ownership_type(ownership_type: str) -> Tuple[str, Optional[str]]:
def validate_ownership_type(ownership_type: str) -> Tuple[str, Optional[str]]:
if ownership_type.startswith("urn:li:"):
return OwnershipTypeClass.CUSTOM, ownership_type
if ownership_type in get_class_fields(OwnershipTypeClass):
ownership_type = ownership_type.upper()
if ownership_type in _get_enum_options(OwnershipTypeClass):
return ownership_type, None
raise ValueError(f"Unexpected ownership type: {ownership_type}")
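
To illustrate the behavioral change: the added `.upper()` call makes the enum lookup case-insensitive. A short usage sketch, assuming `OwnershipTypeClass` exposes upper-case constants such as `TECHNICAL_OWNER` (as in current DataHub releases):

```python
from datahub.emitter.mce_builder import validate_ownership_type
from datahub.metadata.schema_classes import OwnershipTypeClass

# Lower-case enum names are now normalized before the membership check.
assert validate_ownership_type("technical_owner") == (
    OwnershipTypeClass.TECHNICAL_OWNER,
    None,
)

# Custom ownership-type URNs still pass through unchanged.
custom = "urn:li:ownershipType:my_custom_type"
assert validate_ownership_type(custom) == (OwnershipTypeClass.CUSTOM, custom)
```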

@@ -106,6 +106,7 @@
ArrayType,
BooleanType,
BytesType,
DateType,
MySqlDDL,
NullType,
NumberType,
@@ -188,6 +189,7 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
RecordType,
StringType,
TimeType,
DateType,
NullType,
]
],
@@ -209,10 +211,10 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
"STRING": StringType,
"TIME": TimeType,
"TIMESTAMP": TimeType,
"DATE": TimeType,
"DATE": DateType,
"DATETIME": TimeType,
"GEOGRAPHY": NullType,
"JSON": NullType,
"JSON": RecordType,
"INTERVAL": NullType,
"ARRAY": ArrayType,
"STRUCT": RecordType,
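For context on the mapping fix above (BigQuery `DATE` now maps to `DateType` instead of `TimeType`, and `JSON` to `RecordType`), a small illustrative sketch of what a `DATE` column looks like as an emitted schema field; this uses the public `schema_classes` API and is not a copy of the source's internal code.

```python
from datahub.metadata.schema_classes import (
    DateTypeClass,
    SchemaFieldClass,
    SchemaFieldDataTypeClass,
)

# A BigQuery DATE column is now emitted with a DateType, so its semantics are
# preserved in the DataHub schema rather than being lumped in with TIME types.
field = SchemaFieldClass(
    fieldPath="order_date",
    type=SchemaFieldDataTypeClass(type=DateTypeClass()),
    nativeDataType="DATE",
)
print(field.type.type)  # a DateTypeClass instance
```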
49 changes: 39 additions & 10 deletions metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
@@ -124,7 +124,12 @@
infer_output_schema,
sqlglot_lineage,
)
from datahub.sql_parsing.sqlglot_utils import detach_ctes, try_format_query
from datahub.sql_parsing.sqlglot_utils import (
detach_ctes,
parse_statements_and_pick,
try_format_query,
)
from datahub.utilities.lossy_collections import LossyList
from datahub.utilities.mapping import Constants, OperationProcessor
from datahub.utilities.time import datetime_to_ts_millis
from datahub.utilities.topological_sort import topological_sort
@@ -138,8 +143,12 @@
@dataclass
class DBTSourceReport(StaleEntityRemovalSourceReport):
sql_statements_parsed: int = 0
sql_parser_detach_ctes_failures: int = 0
sql_parser_skipped_missing_code: int = 0
sql_statements_table_error: int = 0
sql_statements_column_error: int = 0
sql_parser_detach_ctes_failures: LossyList[str] = field(default_factory=LossyList)
sql_parser_skipped_missing_code: LossyList[str] = field(default_factory=LossyList)

in_manifest_but_missing_catalog: LossyList[str] = field(default_factory=LossyList)
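
Side note on the report change above: replacing plain `int` counters with `LossyList[str]` lets the report name the offending dbt nodes while staying bounded in size. A minimal sketch of the utility's behavior, assuming its default sampling:

```python
from datahub.utilities.lossy_collections import LossyList

# LossyList keeps only a bounded sample of appended items plus a count of how
# many were seen, so ingestion reports stay small even with many failures.
failures: LossyList[str] = LossyList()
for i in range(10_000):
    failures.append(f"model.my_project.node_{i}")

# The rendered report shows a sample of node names rather than just a number.
print(failures)
```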


class EmitDirective(ConfigEnum):
@@ -989,7 +998,9 @@ def _filter_nodes(self, all_nodes: List[DBTNode]) -> List[DBTNode]:
def _to_schema_info(schema_fields: List[SchemaField]) -> SchemaInfo:
return {column.fieldPath: column.nativeDataType for column in schema_fields}

def _infer_schemas_and_update_cll(self, all_nodes_map: Dict[str, DBTNode]) -> None:
def _infer_schemas_and_update_cll( # noqa: C901
self, all_nodes_map: Dict[str, DBTNode]
) -> None:
"""Annotate the DBTNode objects with schema information and column-level lineage.
Note that this mutates the DBTNode objects directly.
@@ -1103,11 +1114,18 @@ def _infer_schemas_and_update_cll(self, all_nodes_map: Dict[str, DBTNode]) -> No

# Run sql parser to infer the schema + generate column lineage.
sql_result = None
if node.compiled_code:
if node.node_type in {"source", "test"}:
# For sources, we generate CLL as a 1:1 mapping.
# We don't support CLL for tests (assertions).
pass
elif node.compiled_code:
try:
# Add CTE stops based on the upstreams list.
preprocessed_sql = detach_ctes(
node.compiled_code,
parse_statements_and_pick(
node.compiled_code,
platform=schema_resolver.platform,
),
platform=schema_resolver.platform,
cte_mapping={
cte_name: upstream_node.get_fake_ephemeral_table_name()
Expand All @@ -1123,7 +1141,7 @@ def _infer_schemas_and_update_cll(self, all_nodes_map: Dict[str, DBTNode]) -> No
},
)
except Exception as e:
self.report.sql_parser_detach_ctes_failures += 1
self.report.sql_parser_detach_ctes_failures.append(node.dbt_name)
logger.debug(
f"Failed to detach CTEs from compiled code. {node.dbt_name} will not have column lineage."
)
@@ -1132,9 +1150,20 @@ def _infer_schemas_and_update_cll(self, all_nodes_map: Dict[str, DBTNode]) -> No
sql_result = sqlglot_lineage(
preprocessed_sql, schema_resolver=schema_resolver
)
self.report.sql_statements_parsed += 1
if sql_result.debug_info.error:
self.report.sql_statements_table_error += 1
logger.info(
f"Failed to parse compiled code for {node.dbt_name}: {sql_result.debug_info.error}"
)
elif sql_result.debug_info.column_error:
self.report.sql_statements_column_error += 1
logger.info(
f"Failed to generate CLL for {node.dbt_name}: {sql_result.debug_info.column_error}"
)
else:
self.report.sql_statements_parsed += 1
else:
self.report.sql_parser_skipped_missing_code += 1
self.report.sql_parser_skipped_missing_code.append(node.dbt_name)

# Save the column lineage.
if self.config.include_column_lineage and sql_result:
@@ -1737,7 +1766,7 @@ def _translate_dbt_name_to_upstream_urn(dbt_name: str) -> str:
if node.cll_debug_info and node.cll_debug_info.error:
self.report.report_warning(
node.dbt_name,
f"Error parsing column lineage: {node.cll_debug_info.error}",
f"Error parsing SQL to generate column lineage: {node.cll_debug_info.error}",
)
cll = [
FineGrainedLineage(
67 changes: 34 additions & 33 deletions metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py
@@ -101,17 +101,31 @@ def aws_connection_needed_if_s3_uris_present(


def get_columns(
catalog_node: dict,
dbt_name: str,
catalog_node: Optional[dict],
manifest_node: dict,
tag_prefix: str,
) -> List[DBTColumn]:
columns = []

catalog_columns = catalog_node["columns"]
manifest_columns = manifest_node.get("columns", {})

manifest_columns_lower = {k.lower(): v for k, v in manifest_columns.items()}

if catalog_node is not None:
logger.debug(f"Loading schema info for {dbt_name}")
catalog_columns = catalog_node["columns"]
elif manifest_columns:
# If the end user ran `dbt compile` instead of `dbt docs generate`, then the catalog
# file will not have any column information. In this case, we will fall back to using
# information from the manifest file.
logger.debug(f"Inferring schema info for {dbt_name} from manifest")
catalog_columns = {
k: {"name": col["name"], "type": col["data_type"], "index": i}
for i, (k, col) in enumerate(manifest_columns.items())
}
else:
logger.debug(f"Missing schema info for {dbt_name}")
return []

columns = []
for key, catalog_column in catalog_columns.items():
manifest_column = manifest_columns.get(
key, manifest_columns_lower.get(key.lower(), {})
@@ -185,10 +199,7 @@ def extract_dbt_entities(
if catalog_node is None:
if materialization not in {"test", "ephemeral"}:
# Test and ephemeral nodes will never show up in the catalog.
report.report_warning(
key,
f"Entity {key} ({name}) is in manifest but missing from catalog",
)
report.in_manifest_but_missing_catalog.append(key)
else:
catalog_type = all_catalog_entities[key]["metadata"]["type"]

@@ -267,20 +278,26 @@ def extract_dbt_entities(
"ephemeral",
"test",
]:
logger.debug(f"Loading schema info for {dbtNode.dbt_name}")
if catalog_node is not None:
# We already have done the reporting for catalog_node being None above.
dbtNode.columns = get_columns(
catalog_node,
manifest_node,
tag_prefix,
)
dbtNode.columns = get_columns(
dbtNode.dbt_name,
catalog_node,
manifest_node,
tag_prefix,
)

else:
dbtNode.columns = []

dbt_entities.append(dbtNode)

if report.in_manifest_but_missing_catalog:
# We still want this to show up as a warning, but don't want to spam the warnings section
# if there's a lot of them.
report.warning(
"in_manifest_but_missing_catalog",
f"Found {len(report.in_manifest_but_missing_catalog)} nodes in manifest but not in catalog. See in_manifest_but_missing_catalog for details.",
)

return dbt_entities


@@ -425,22 +442,6 @@ def load_run_results(
@capability(SourceCapability.DELETION_DETECTION, "Enabled via stateful ingestion")
@capability(SourceCapability.LINEAGE_COARSE, "Enabled by default")
class DBTCoreSource(DBTSourceBase, TestableSource):
"""
The artifacts used by this source are:
- [dbt manifest file](https://docs.getdbt.com/reference/artifacts/manifest-json)
- This file contains model, source, tests and lineage data.
- [dbt catalog file](https://docs.getdbt.com/reference/artifacts/catalog-json)
- This file contains schema data.
- dbt does not record schema data for Ephemeral models, as such datahub will show Ephemeral models in the lineage, however there will be no associated schema for Ephemeral models
- [dbt sources file](https://docs.getdbt.com/reference/artifacts/sources-json)
- This file contains metadata for sources with freshness checks.
- We transfer dbt's freshness checks to DataHub's last-modified fields.
- Note that this file is optional – if not specified, we'll use time of ingestion instead as a proxy for time last-modified.
- [dbt run_results file](https://docs.getdbt.com/reference/artifacts/run-results-json)
- This file contains metadata from the result of a dbt run, e.g. dbt test
- When provided, we transfer dbt test run results into assertion run events to see a timeline of test runs on the dataset
"""

config: DBTCoreConfig

@classmethod
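To make the new `get_columns` fallback above concrete: when `catalog.json` carries no column information (e.g. the user ran `dbt compile` rather than `dbt docs generate`), manifest columns are reshaped into the catalog's `{name, type, index}` layout. A toy illustration with made-up column data:

```python
# Illustrative only: mirrors the dict comprehension in get_columns above.
manifest_columns = {
    "id": {"name": "id", "data_type": "bigint", "description": "primary key"},
    "created_at": {"name": "created_at", "data_type": "timestamp"},
}

catalog_like_columns = {
    k: {"name": col["name"], "type": col["data_type"], "index": i}
    for i, (k, col) in enumerate(manifest_columns.items())
}

print(catalog_like_columns["created_at"])
# -> {'name': 'created_at', 'type': 'timestamp', 'index': 1}
```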
@@ -151,6 +151,7 @@ class SQLServerSource(SQLAlchemySource):
- Column types associated with each table/view
- Table, row, and column statistics via optional SQL profiling
We have two options for the underlying library used to connect to SQL Server: (1) [python-tds](https://github.com/denisenkom/pytds) and (2) [pyodbc](https://github.com/mkleehammer/pyodbc). The TDS library is pure Python and hence easier to install.
If you do use pyodbc, make sure to change the source type from `mssql` to `mssql-odbc` so that we pull in the right set of dependencies. This will be needed in most cases where encryption is required, such as managed SQL Server services in Azure.
"""

def __init__(self, config: SQLServerConfig, ctx: PipelineContext):
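Following the doc note above about switching the source type, a hedged sketch of the source block for a pyodbc-based run; connection values are placeholders, and any ODBC driver options your environment needs are omitted.

```python
# Drop this source block into an ingestion recipe or a Pipeline.create() config
# (see the dbt sketch earlier); "mssql-odbc" pulls in the pyodbc dependency set.
mssql_odbc_source = {
    "type": "mssql-odbc",
    "config": {
        "host_port": "my-sql-server.example.com:1433",  # placeholder
        "database": "analytics",                        # placeholder
        "username": "datahub_reader",                   # placeholder
        "password": "${MSSQL_PASSWORD}",                # placeholder
    },
}
```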