diff --git a/docs/businessattributes.md b/docs/businessattributes.md index 80d47f0966aae8..1744f48f879e82 100644 --- a/docs/businessattributes.md +++ b/docs/businessattributes.md @@ -70,6 +70,11 @@ Description inherited from business attribute is greyed out to differentiate bet
+### Enable Business Attributes Feature +By default, the Business Attributes feature is disabled. To enable it, set the following configuration in [application.yaml](../metadata-service/configuration/src/main/resources/application.yaml) + +businessAttributeEntityEnabled : true + ### What updates are planned for the Business Attributes feature? - Ingestion of Business attributes through recipe file (YAML) diff --git a/entity-registry/src/main/java/com/linkedin/metadata/models/annotation/SearchableAnnotation.java b/entity-registry/src/main/java/com/linkedin/metadata/models/annotation/SearchableAnnotation.java index 4ec8702efde709..f15dbb61d70511 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/models/annotation/SearchableAnnotation.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/models/annotation/SearchableAnnotation.java @@ -153,10 +153,12 @@ private static FieldType getFieldType( private static FieldType getDefaultFieldType(DataSchema.Type schemaDataType) { switch (schemaDataType) { case INT: - case FLOAT: return FieldType.COUNT; case MAP: return FieldType.KEYWORD; + case FLOAT: + case DOUBLE: + return FieldType.DOUBLE; default: return FieldType.TEXT; } diff --git a/entity-registry/src/main/java/com/linkedin/metadata/models/annotation/SearchableRefAnnotation.java b/entity-registry/src/main/java/com/linkedin/metadata/models/annotation/SearchableRefAnnotation.java index db6cf46dfc96f7..e2ea94c84088b2 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/models/annotation/SearchableRefAnnotation.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/models/annotation/SearchableRefAnnotation.java @@ -98,10 +98,12 @@ private static SearchableAnnotation.FieldType getDefaultFieldType( DataSchema.Type schemaDataType) { switch (schemaDataType) { case INT: - case FLOAT: return SearchableAnnotation.FieldType.COUNT; case MAP: return SearchableAnnotation.FieldType.KEYWORD; + case FLOAT: + case DOUBLE: + return SearchableAnnotation.FieldType.DOUBLE; default: return SearchableAnnotation.FieldType.TEXT; } diff --git a/metadata-ingestion/docs/sources/dbt/dbt-cloud_pre.md b/metadata-ingestion/docs/sources/dbt/dbt-cloud_pre.md new file mode 100644 index 00000000000000..ac7283d21015a7 --- /dev/null +++ b/metadata-ingestion/docs/sources/dbt/dbt-cloud_pre.md @@ -0,0 +1,11 @@ +### Setup + +This source pulls dbt metadata directly from the dbt Cloud APIs. + +You'll need to have a dbt Cloud job set up to run your dbt project, and "Generate docs on run" should be enabled. + +The token should have the "read metadata" permission. + +To get the required IDs, go to the job details page (this is the one with the "Run History" table), and look at the URL. +It should look something like this: https://cloud.getdbt.com/next/deploy/107298/projects/175705/jobs/148094. +In this example, the account ID is 107298, the project ID is 175705, and the job ID is 148094. diff --git a/metadata-ingestion/docs/sources/dbt/dbt.md b/metadata-ingestion/docs/sources/dbt/dbt.md index 02a5d760c66d30..eca5101e006426 100644 --- a/metadata-ingestion/docs/sources/dbt/dbt.md +++ b/metadata-ingestion/docs/sources/dbt/dbt.md @@ -187,8 +187,7 @@ dbt source snapshot-freshness dbt build cp target/run_results.json target/run_results_backup.json dbt docs generate - -# Reference target/run_results_backup.json in the dbt source config.
+cp target/run_results_backup.json target/run_results.json ``` ::: diff --git a/metadata-ingestion/docs/sources/dbt/dbt_pre.md b/metadata-ingestion/docs/sources/dbt/dbt_pre.md new file mode 100644 index 00000000000000..495c25f94048f1 --- /dev/null +++ b/metadata-ingestion/docs/sources/dbt/dbt_pre.md @@ -0,0 +1,41 @@ +### Setup + +The artifacts used by this source are: + +- [dbt manifest file](https://docs.getdbt.com/reference/artifacts/manifest-json) + - This file contains model, source, test, and lineage data. +- [dbt catalog file](https://docs.getdbt.com/reference/artifacts/catalog-json) + - This file contains schema data. + - dbt does not record schema data for ephemeral models. As a result, DataHub will show ephemeral models in the lineage, but they will have no associated schema. +- [dbt sources file](https://docs.getdbt.com/reference/artifacts/sources-json) + - This file contains metadata for sources with freshness checks. + - We transfer dbt's freshness checks to DataHub's last-modified fields. + - Note that this file is optional – if not specified, we'll use the time of ingestion as a proxy for the last-modified time. +- [dbt run_results file](https://docs.getdbt.com/reference/artifacts/run-results-json) + - This file contains metadata from the result of a dbt run, e.g. `dbt test`. + - When provided, we transfer dbt test run results into assertion run events, so you can see a timeline of test runs on the dataset. + +To generate these files, we recommend the following workflow for dbt builds and DataHub ingestion. + +```sh +dbt source snapshot-freshness +dbt build +cp target/run_results.json target/run_results_backup.json +dbt docs generate +cp target/run_results_backup.json target/run_results.json + +# Run datahub ingestion, pointing at the files in the target/ directory +``` + +The necessary artifact files will then appear in the `target/` directory of your dbt project (see the example recipe sketch below for how to reference them). + +We also have guides on handling more complex dbt orchestration techniques and multi-project setups below. + +:::note Entity is in manifest but missing from catalog + +This warning usually appears when the `catalog.json` file was not generated by a `dbt docs generate` command. +Most other dbt commands generate a partial catalog file, which may impact the completeness of the metadata ingested into DataHub. + +Following the above workflow should ensure that the catalog file is generated correctly.
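A minimal dbt source recipe that references these artifacts might look like the sketch below. The file paths, `target_platform`, and sink settings are illustrative placeholders rather than part of this change, and optional fields such as `run_results_paths` should be checked against the dbt source configuration reference for your version.

```yaml
source:
  type: dbt
  config:
    # Artifacts produced by the workflow above (paths are placeholders).
    manifest_path: "target/manifest.json"
    catalog_path: "target/catalog.json"
    sources_path: "target/sources.json"              # optional
    run_results_paths: ["target/run_results.json"]   # optional; verify the field name for your version
    # The data platform your dbt project writes to, e.g. postgres, snowflake, bigquery.
    target_platform: "postgres"

sink:
  type: datahub-rest
  config:
    server: "http://localhost:8080"
```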
+ +::: diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 93b73570a10721..307e519cc9cc6b 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -233,6 +233,11 @@ *pydantic_no_v2, } +mssql_common = { + "sqlalchemy-pytds>=0.3", + "pyOpenSSL", +} + postgres_common = { "psycopg2-binary", "GeoAlchemy2", @@ -371,12 +376,8 @@ }, "mode": {"requests", "tenacity>=8.0.1"} | sqllineage_lib | sqlglot_lib, "mongodb": {"pymongo[srv]>=3.11", "packaging"}, - "mssql": sql_common - | { - "sqlalchemy-pytds>=0.3", - "pyOpenSSL", - }, - "mssql-odbc": sql_common | {"pyodbc"}, + "mssql": sql_common | mssql_common, + "mssql-odbc": sql_common | mssql_common | {"pyodbc"}, "mysql": mysql, # mariadb should have same dependency as mysql "mariadb": sql_common | {"pymysql>=1.0.2"}, diff --git a/metadata-ingestion/src/datahub/emitter/mce_builder.py b/metadata-ingestion/src/datahub/emitter/mce_builder.py index 1c29d38273d823..ccc1267cd97f7d 100644 --- a/metadata-ingestion/src/datahub/emitter/mce_builder.py +++ b/metadata-ingestion/src/datahub/emitter/mce_builder.py @@ -367,7 +367,7 @@ def make_ml_model_group_urn(platform: str, group_name: str, env: str) -> str: ) -def get_class_fields(_class: Type[object]) -> Iterable[str]: +def _get_enum_options(_class: Type[object]) -> Iterable[str]: return [ f for f in dir(_class) @@ -378,7 +378,8 @@ def get_class_fields(_class: Type[object]) -> Iterable[str]: def validate_ownership_type(ownership_type: str) -> Tuple[str, Optional[str]]: if ownership_type.startswith("urn:li:"): return OwnershipTypeClass.CUSTOM, ownership_type - if ownership_type in get_class_fields(OwnershipTypeClass): + ownership_type = ownership_type.upper() + if ownership_type in _get_enum_options(OwnershipTypeClass): return ownership_type, None raise ValueError(f"Unexpected ownership type: {ownership_type}") diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index 7b79c83db90e56..eb59d720f1372c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -106,6 +106,7 @@ ArrayType, BooleanType, BytesType, + DateType, MySqlDDL, NullType, NumberType, @@ -188,6 +189,7 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource): RecordType, StringType, TimeType, + DateType, NullType, ] ], @@ -209,10 +211,10 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource): "STRING": StringType, "TIME": TimeType, "TIMESTAMP": TimeType, - "DATE": TimeType, + "DATE": DateType, "DATETIME": TimeType, "GEOGRAPHY": NullType, - "JSON": NullType, + "JSON": RecordType, "INTERVAL": NullType, "ARRAY": ArrayType, "STRUCT": RecordType, diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py index 4876e2b6fcff4a..b2d93b2e0fd6f7 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py @@ -124,7 +124,12 @@ infer_output_schema, sqlglot_lineage, ) -from datahub.sql_parsing.sqlglot_utils import detach_ctes, try_format_query +from datahub.sql_parsing.sqlglot_utils import ( + detach_ctes, + parse_statements_and_pick, + try_format_query, +) +from datahub.utilities.lossy_collections import LossyList from datahub.utilities.mapping import Constants, OperationProcessor from 
datahub.utilities.time import datetime_to_ts_millis from datahub.utilities.topological_sort import topological_sort @@ -138,8 +143,12 @@ @dataclass class DBTSourceReport(StaleEntityRemovalSourceReport): sql_statements_parsed: int = 0 - sql_parser_detach_ctes_failures: int = 0 - sql_parser_skipped_missing_code: int = 0 + sql_statements_table_error: int = 0 + sql_statements_column_error: int = 0 + sql_parser_detach_ctes_failures: LossyList[str] = field(default_factory=LossyList) + sql_parser_skipped_missing_code: LossyList[str] = field(default_factory=LossyList) + + in_manifest_but_missing_catalog: LossyList[str] = field(default_factory=LossyList) class EmitDirective(ConfigEnum): @@ -989,7 +998,9 @@ def _filter_nodes(self, all_nodes: List[DBTNode]) -> List[DBTNode]: def _to_schema_info(schema_fields: List[SchemaField]) -> SchemaInfo: return {column.fieldPath: column.nativeDataType for column in schema_fields} - def _infer_schemas_and_update_cll(self, all_nodes_map: Dict[str, DBTNode]) -> None: + def _infer_schemas_and_update_cll( # noqa: C901 + self, all_nodes_map: Dict[str, DBTNode] + ) -> None: """Annotate the DBTNode objects with schema information and column-level lineage. Note that this mutates the DBTNode objects directly. @@ -1103,11 +1114,18 @@ def _infer_schemas_and_update_cll(self, all_nodes_map: Dict[str, DBTNode]) -> No # Run sql parser to infer the schema + generate column lineage. sql_result = None - if node.compiled_code: + if node.node_type in {"source", "test"}: + # For sources, we generate CLL as a 1:1 mapping. + # We don't support CLL for tests (assertions). + pass + elif node.compiled_code: try: # Add CTE stops based on the upstreams list. preprocessed_sql = detach_ctes( - node.compiled_code, + parse_statements_and_pick( + node.compiled_code, + platform=schema_resolver.platform, + ), platform=schema_resolver.platform, cte_mapping={ cte_name: upstream_node.get_fake_ephemeral_table_name() @@ -1123,7 +1141,7 @@ def _infer_schemas_and_update_cll(self, all_nodes_map: Dict[str, DBTNode]) -> No }, ) except Exception as e: - self.report.sql_parser_detach_ctes_failures += 1 + self.report.sql_parser_detach_ctes_failures.append(node.dbt_name) logger.debug( f"Failed to detach CTEs from compiled code. {node.dbt_name} will not have column lineage." ) @@ -1132,9 +1150,20 @@ def _infer_schemas_and_update_cll(self, all_nodes_map: Dict[str, DBTNode]) -> No sql_result = sqlglot_lineage( preprocessed_sql, schema_resolver=schema_resolver ) - self.report.sql_statements_parsed += 1 + if sql_result.debug_info.error: + self.report.sql_statements_table_error += 1 + logger.info( + f"Failed to parse compiled code for {node.dbt_name}: {sql_result.debug_info.error}" + ) + elif sql_result.debug_info.column_error: + self.report.sql_statements_column_error += 1 + logger.info( + f"Failed to generate CLL for {node.dbt_name}: {sql_result.debug_info.column_error}" + ) + else: + self.report.sql_statements_parsed += 1 else: - self.report.sql_parser_skipped_missing_code += 1 + self.report.sql_parser_skipped_missing_code.append(node.dbt_name) # Save the column lineage. 
if self.config.include_column_lineage and sql_result: @@ -1737,7 +1766,7 @@ def _translate_dbt_name_to_upstream_urn(dbt_name: str) -> str: if node.cll_debug_info and node.cll_debug_info.error: self.report.report_warning( node.dbt_name, - f"Error parsing column lineage: {node.cll_debug_info.error}", + f"Error parsing SQL to generate column lineage: {node.cll_debug_info.error}", ) cll = [ FineGrainedLineage( diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py index 89f562fdc71a10..0fc35ddd281c8e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py @@ -101,17 +101,31 @@ def aws_connection_needed_if_s3_uris_present( def get_columns( - catalog_node: dict, + dbt_name: str, + catalog_node: Optional[dict], manifest_node: dict, tag_prefix: str, ) -> List[DBTColumn]: - columns = [] - - catalog_columns = catalog_node["columns"] manifest_columns = manifest_node.get("columns", {}) - manifest_columns_lower = {k.lower(): v for k, v in manifest_columns.items()} + if catalog_node is not None: + logger.debug(f"Loading schema info for {dbt_name}") + catalog_columns = catalog_node["columns"] + elif manifest_columns: + # If the end user ran `dbt compile` instead of `dbt docs generate`, then the catalog + # file will not have any column information. In this case, we will fall back to using + # information from the manifest file. + logger.debug(f"Inferring schema info for {dbt_name} from manifest") + catalog_columns = { + k: {"name": col["name"], "type": col["data_type"], "index": i} + for i, (k, col) in enumerate(manifest_columns.items()) + } + else: + logger.debug(f"Missing schema info for {dbt_name}") + return [] + + columns = [] for key, catalog_column in catalog_columns.items(): manifest_column = manifest_columns.get( key, manifest_columns_lower.get(key.lower(), {}) @@ -185,10 +199,7 @@ def extract_dbt_entities( if catalog_node is None: if materialization not in {"test", "ephemeral"}: # Test and ephemeral nodes will never show up in the catalog. - report.report_warning( - key, - f"Entity {key} ({name}) is in manifest but missing from catalog", - ) + report.in_manifest_but_missing_catalog.append(key) else: catalog_type = all_catalog_entities[key]["metadata"]["type"] @@ -267,20 +278,26 @@ def extract_dbt_entities( "ephemeral", "test", ]: - logger.debug(f"Loading schema info for {dbtNode.dbt_name}") - if catalog_node is not None: - # We already have done the reporting for catalog_node being None above. - dbtNode.columns = get_columns( - catalog_node, - manifest_node, - tag_prefix, - ) + dbtNode.columns = get_columns( + dbtNode.dbt_name, + catalog_node, + manifest_node, + tag_prefix, + ) else: dbtNode.columns = [] dbt_entities.append(dbtNode) + if report.in_manifest_but_missing_catalog: + # We still want this to show up as a warning, but don't want to spam the warnings section + # if there's a lot of them. + report.warning( + "in_manifest_but_missing_catalog", + f"Found {len(report.in_manifest_but_missing_catalog)} nodes in manifest but not in catalog. 
See in_manifest_but_missing_catalog for details.", + ) + return dbt_entities @@ -425,22 +442,6 @@ def load_run_results( @capability(SourceCapability.DELETION_DETECTION, "Enabled via stateful ingestion") @capability(SourceCapability.LINEAGE_COARSE, "Enabled by default") class DBTCoreSource(DBTSourceBase, TestableSource): - """ - The artifacts used by this source are: - - [dbt manifest file](https://docs.getdbt.com/reference/artifacts/manifest-json) - - This file contains model, source, tests and lineage data. - - [dbt catalog file](https://docs.getdbt.com/reference/artifacts/catalog-json) - - This file contains schema data. - - dbt does not record schema data for Ephemeral models, as such datahub will show Ephemeral models in the lineage, however there will be no associated schema for Ephemeral models - - [dbt sources file](https://docs.getdbt.com/reference/artifacts/sources-json) - - This file contains metadata for sources with freshness checks. - - We transfer dbt's freshness checks to DataHub's last-modified fields. - - Note that this file is optional – if not specified, we'll use time of ingestion instead as a proxy for time last-modified. - - [dbt run_results file](https://docs.getdbt.com/reference/artifacts/run-results-json) - - This file contains metadata from the result of a dbt run, e.g. dbt test - - When provided, we transfer dbt test run results into assertion run events to see a timeline of test runs on the dataset - """ - config: DBTCoreConfig @classmethod diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py b/metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py index 56706e6f90d38d..c19b22a8622ca2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py @@ -151,6 +151,7 @@ class SQLServerSource(SQLAlchemySource): - Column types associated with each table/view - Table, row, and column statistics via optional SQL profiling We have two options for the underlying library used to connect to SQL Server: (1) [python-tds](https://github.com/denisenkom/pytds) and (2) [pyodbc](https://github.com/mkleehammer/pyodbc). The TDS library is pure Python and hence easier to install. + If you do use pyodbc, make sure to change the source type from `mssql` to `mssql-odbc` so that we pull in the right set of dependencies. This will be needed in most cases where encryption is required, such as managed SQL Server services in Azure. 
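For illustration, an ODBC-based recipe might look like the sketch below. The connection details are placeholders rather than part of this change, and config fields such as `use_odbc` and `uri_args` should be confirmed against the SQL Server source configuration reference.

```yaml
source:
  type: mssql-odbc   # instead of mssql, so the pyodbc-based dependencies are installed
  config:
    host_port: "my-server.database.windows.net:1433"   # placeholder
    database: "DemoDatabase"                           # placeholder
    username: "datahub_reader"
    password: "${MSSQL_PASSWORD}"
    use_odbc: true
    uri_args:
      driver: "ODBC Driver 17 for SQL Server"
      Encrypt: "yes"

sink:
  type: datahub-rest
  config:
    server: "http://localhost:8080"
```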
""" def __init__(self, config: SQLServerConfig, ctx: PipelineContext): diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py index 365539df7a83be..968989e2548d13 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py @@ -17,6 +17,7 @@ from datahub.ingestion.source.sql.sql_common import SQLSourceReport from datahub.ingestion.source.sql.sql_config import SQLCommonConfig from datahub.ingestion.source.sql.sql_generic import BaseTable, BaseView +from datahub.ingestion.source.sql.sql_utils import check_table_with_profile_pattern from datahub.ingestion.source.state.profiling_state_handler import ProfilingHandler from datahub.metadata.com.linkedin.pegasus2avro.dataset import DatasetProfile from datahub.metadata.com.linkedin.pegasus2avro.timeseries import PartitionType @@ -36,6 +37,10 @@ class DetailedProfilerReportMixin: default_factory=int_top_k_dict ) + profiling_skipped_table_profile_pattern: TopKDict[str, int] = field( + default_factory=int_top_k_dict + ) + profiling_skipped_other: TopKDict[str, int] = field(default_factory=int_top_k_dict) num_tables_not_eligible_profiling: Dict[str, int] = field( @@ -272,8 +277,17 @@ def is_dataset_eligible_for_profiling( threshold_time = datetime.now(timezone.utc) - timedelta( self.config.profiling.profile_if_updated_since_days ) - schema_name = dataset_name.rsplit(".", 1)[0] + + if not check_table_with_profile_pattern( + self.config.profile_pattern, dataset_name + ): + self.report.profiling_skipped_table_profile_pattern[schema_name] += 1 + logger.debug( + f"Table {dataset_name} is not allowed for profiling due to profile pattern" + ) + return False + if (threshold_time is not None) and ( last_altered is not None and last_altered < threshold_time ): diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_utils.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_utils.py index 70e8ab33f18591..16655d17482872 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_utils.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_utils.py @@ -1,3 +1,4 @@ +import re from typing import Dict, Iterable, List, Optional from datahub.configuration.common import AllowDenyPattern @@ -235,3 +236,27 @@ def schema_requires_v2(canonical_schema: List[SchemaField]) -> bool: if ARRAY_TOKEN in field_name or UNION_TOKEN in field_name: return True return False + + +CHECK_TABLE_TABLE_PART_SEPARATOR_PATTERN = re.compile("\\\\?\\.") + + +def check_table_with_profile_pattern( + profile_pattern: AllowDenyPattern, table_name: str +) -> bool: + parts = len(table_name.split(".")) + allow_list: List[str] = [] + + for pattern in profile_pattern.allow: + replaced_pattern = pattern.replace(".*", "").replace(".+", "") + splits = re.split(CHECK_TABLE_TABLE_PART_SEPARATOR_PATTERN, replaced_pattern) + if parts + 1 == len(splits): + table_pattern = pattern[: pattern.find(splits[-2]) + len(splits[-2])] + allow_list.append(table_pattern + "$") + else: + allow_list.append(pattern) + + table_allow_deny_pattern = AllowDenyPattern( + allow=allow_list, deny=profile_pattern.deny + ) + return table_allow_deny_pattern.allowed(table_name) diff --git a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_utils.py b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_utils.py index bad72e69221014..c7cf975a3a9533 100644 --- 
a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_utils.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_utils.py @@ -64,6 +64,25 @@ def parse_statement( return statement +def parse_statements_and_pick(sql: str, platform: DialectOrStr) -> sqlglot.Expression: + dialect = get_dialect(platform) + statements = [ + expression for expression in sqlglot.parse(sql, dialect=dialect) if expression + ] + if not statements: + raise ValueError(f"No statements found in query: {sql}") + elif len(statements) == 1: + return statements[0] + else: + # If we find multiple statements, we assume the last one is the main query. + # Usually the prior queries are going to be things like `CREATE FUNCTION` + # or `GRANT ...`, which we don't care about. + logger.debug( + "Found multiple statements in query, picking the last one: %s", sql + ) + return statements[-1] + + def _expression_to_string( expression: sqlglot.exp.ExpOrStr, platform: DialectOrStr ) -> str: diff --git a/metadata-ingestion/src/datahub/utilities/mapping.py b/metadata-ingestion/src/datahub/utilities/mapping.py index 5d26c3af54d5ef..391657f841167b 100644 --- a/metadata-ingestion/src/datahub/utilities/mapping.py +++ b/metadata-ingestion/src/datahub/utilities/mapping.py @@ -6,8 +6,13 @@ from functools import reduce from typing import Any, Dict, List, Mapping, Match, Optional, Union, cast +from datahub.configuration.common import ConfigModel from datahub.emitter import mce_builder -from datahub.emitter.mce_builder import OwnerType +from datahub.emitter.mce_builder import ( + OwnerType, + make_user_urn, + validate_ownership_type, +) from datahub.metadata.schema_classes import ( AuditStampClass, InstitutionalMemoryClass, @@ -83,6 +88,36 @@ class Constants: SEPARATOR = "separator" +class _MappingOwner(ConfigModel): + owner: str + owner_type: str = OwnershipTypeClass.DATAOWNER + + +class _DatahubProps(ConfigModel): + owners: List[Union[str, _MappingOwner]] + + def make_owner_category_list(self) -> List[Dict]: + res = [] + for owner in self.owners: + if isinstance(owner, str): + owner_id = owner + owner_category = OwnershipTypeClass.DATAOWNER + else: + owner_id = owner.owner + owner_category = owner.owner_type + owner_id = make_user_urn(owner_id) + owner_category, owner_category_urn = validate_ownership_type(owner_category) + + res.append( + { + "urn": owner_id, + "category": owner_category, + "categoryUrn": owner_category_urn, + } + ) + return res + + class OperationProcessor: """ A general class that processes a dictionary of properties and operations defined on it. @@ -128,7 +163,7 @@ def __init__( self.owner_source_type = owner_source_type self.match_nested_props = match_nested_props - def process(self, raw_props: Mapping[str, Any]) -> Dict[str, Any]: + def process(self, raw_props: Mapping[str, Any]) -> Dict[str, Any]: # noqa: C901 # Defining the following local variables - # operations_map - the final resulting map when operations are processed. # Against each operation the values to be applied are stored. @@ -137,9 +172,35 @@ def process(self, raw_props: Mapping[str, Any]) -> Dict[str, Any]: # operation config: map which contains the parameters to carry out that operation. # For e.g for add_tag operation config will have the tag value. # operation_type: the type of operation (add_tag, add_term, etc.) - aspect_map: Dict[str, Any] = {} # map of aspect name to aspect object + + # Process the special "datahub" property, which supports tags, terms, and owners. 
+ operations_map: Dict[str, list] = {} + try: + datahub_prop = raw_props.get("datahub") + if datahub_prop and isinstance(datahub_prop, dict): + if datahub_prop.get("tags"): + # Note that tags get converted to urns later because we need to support the tag prefix. + tags = datahub_prop["tags"] + operations_map.setdefault(Constants.ADD_TAG_OPERATION, []).extend( + tags + ) + + if datahub_prop.get("terms"): + terms = datahub_prop["terms"] + operations_map.setdefault(Constants.ADD_TERM_OPERATION, []).extend( + mce_builder.make_term_urn(term) for term in terms + ) + + if datahub_prop.get("owners"): + owners = _DatahubProps.parse_obj_allow_extras(datahub_prop) + operations_map.setdefault(Constants.ADD_OWNER_OPERATION, []).extend( + owners.make_owner_category_list() + ) + except Exception as e: + logger.error(f"Error while processing datahub property: {e}") + + # Process the actual directives. try: - operations_map: Dict[str, Union[set, list]] = {} for operation_key in self.operation_defs: operation_type = self.operation_defs.get(operation_key, {}).get( Constants.OPERATION @@ -177,42 +238,36 @@ def process(self, raw_props: Mapping[str, Any]) -> Dict[str, Any]: isinstance(operation, list) and operation_type == Constants.ADD_OWNER_OPERATION ): - operation_value_list = operations_map.get( - operation_type, list() - ) - cast(List, operation_value_list).extend( + operations_map.setdefault(operation_type, []).extend( operation - ) # cast to silent the lint - operations_map[operation_type] = operation_value_list + ) elif isinstance(operation, (str, list)): - operations_value_set = operations_map.get( - operation_type, set() + operations_map.setdefault(operation_type, []).extend( + operation + if isinstance(operation, list) + else [operation] ) - if isinstance(operation, list): - operations_value_set.update(operation) # type: ignore - else: - operations_value_set.add(operation) # type: ignore - operations_map[operation_type] = operations_value_set else: - operations_value_list = operations_map.get( - operation_type, list() + operations_map.setdefault(operation_type, []).append( + operation ) - operations_value_list.append(operation) # type: ignore - operations_map[operation_type] = operations_value_list - aspect_map = self.convert_to_aspects(operations_map) except Exception as e: logger.error(f"Error while processing operation defs over raw_props: {e}") + + aspect_map: Dict[str, Any] = {} # map of aspect name to aspect object + try: + aspect_map = self.convert_to_aspects(operations_map) + except Exception as e: + logger.error(f"Error while converting operations map to aspects: {e}") return aspect_map - def convert_to_aspects( - self, operation_map: Dict[str, Union[set, list]] - ) -> Dict[str, Any]: + def convert_to_aspects(self, operation_map: Dict[str, list]) -> Dict[str, Any]: aspect_map: Dict[str, Any] = {} if Constants.ADD_TAG_OPERATION in operation_map: tag_aspect = mce_builder.make_global_tag_aspect_with_tag_list( - sorted(operation_map[Constants.ADD_TAG_OPERATION]) + sorted(set(operation_map[Constants.ADD_TAG_OPERATION])) ) aspect_map[Constants.ADD_TAG_OPERATION] = tag_aspect @@ -240,7 +295,7 @@ def convert_to_aspects( if Constants.ADD_TERM_OPERATION in operation_map: term_aspect = mce_builder.make_glossary_terms_aspect_from_urn_list( - sorted(operation_map[Constants.ADD_TERM_OPERATION]) + sorted(set(operation_map[Constants.ADD_TERM_OPERATION])) ) aspect_map[Constants.ADD_TERM_OPERATION] = term_aspect @@ -319,12 +374,7 @@ def get_operation_value( operation_config.get(Constants.OWNER_CATEGORY) or 
OwnershipTypeClass.DATAOWNER ) - owner_category_urn: Optional[str] = None - if owner_category.startswith("urn:li:"): - owner_category_urn = owner_category - owner_category = OwnershipTypeClass.DATAOWNER - else: - owner_category = owner_category.upper() + owner_category, owner_category_urn = validate_ownership_type(owner_category) if self.strip_owner_email_id: owner_ids = [ diff --git a/metadata-ingestion/tests/unit/test_mapping.py b/metadata-ingestion/tests/unit/test_mapping.py index 755a62fa329123..42b13f6dbefc76 100644 --- a/metadata-ingestion/tests/unit/test_mapping.py +++ b/metadata-ingestion/tests/unit/test_mapping.py @@ -235,7 +235,7 @@ def test_operation_processor_ownership_category(): new_owner = ownership_aspect.owners[2] assert new_owner.owner == "urn:li:corpuser:bob" assert new_owner.source and new_owner.source.type == "SOURCE_CONTROL" - assert new_owner.type == OwnershipTypeClass.DATAOWNER # dummy value + assert new_owner.type == OwnershipTypeClass.CUSTOM assert new_owner.typeUrn == "urn:li:ownershipType:architect" @@ -347,3 +347,52 @@ def test_operation_processor_matching_dot_props(): tag_aspect: GlobalTagsClass = aspect_map["add_tag"] assert len(tag_aspect.tags) == 1 assert tag_aspect.tags[0].tag == "urn:li:tag:pii" + + +def test_operation_processor_datahub_props(): + raw_props = { + "datahub": { + "tags": ["tag1", "tag2"], + "terms": ["term1", "term2"], + "owners": [ + "owner1", + "urn:li:corpGroup:group1", + { + "owner": "owner2", + "owner_type": "urn:li:ownershipType:steward", + }, + { + "owner": "urn:li:corpGroup:group2", + "owner_type": "urn:li:ownershipType:steward", + }, + ], + } + } + + processor = OperationProcessor( + operation_defs={}, + owner_source_type="SOURCE_CONTROL", + ) + aspect_map = processor.process(raw_props) + + assert isinstance(aspect_map["add_owner"], OwnershipClass) + assert [ + (owner.owner, owner.type, owner.typeUrn) + for owner in aspect_map["add_owner"].owners + ] == [ + ("urn:li:corpGroup:group1", "DATAOWNER", None), + ("urn:li:corpGroup:group2", "CUSTOM", "urn:li:ownershipType:steward"), + ("urn:li:corpuser:owner1", "DATAOWNER", None), + ("urn:li:corpuser:owner2", "CUSTOM", "urn:li:ownershipType:steward"), + ] + + assert isinstance(aspect_map["add_tag"], GlobalTagsClass) + assert [tag_association.tag for tag_association in aspect_map["add_tag"].tags] == [ + "urn:li:tag:tag1", + "urn:li:tag:tag2", + ] + + assert isinstance(aspect_map["add_term"], GlossaryTermsClass) + assert [ + term_association.urn for term_association in aspect_map["add_term"].terms + ] == ["urn:li:glossaryTerm:term1", "urn:li:glossaryTerm:term2"] diff --git a/metadata-ingestion/tests/unit/test_sql_utils.py b/metadata-ingestion/tests/unit/test_sql_utils.py index 23a08ffb6180b6..1b7dc6bcf23f1e 100644 --- a/metadata-ingestion/tests/unit/test_sql_utils.py +++ b/metadata-ingestion/tests/unit/test_sql_utils.py @@ -1,4 +1,10 @@ -from datahub.ingestion.source.sql.sql_utils import gen_schema_key +import pytest + +from datahub.configuration.common import AllowDenyPattern +from datahub.ingestion.source.sql.sql_utils import ( + check_table_with_profile_pattern, + gen_schema_key, +) def test_guid_generators(): @@ -13,3 +19,61 @@ def test_guid_generators(): guid = schema_key.guid() assert guid == expected_guid + + +test_profile_pattern_matching_on_table_allow_list_test_data = [ + ("db.table.column", "db.table", True), + ("db.table.column2", "db.table", True), + ("db.table..*", "db.table", True), + ("db.*", "db.table", True), + ("db.*", "db.table", True), + ("db.*", "db.schema.table", 
True), + ("db.schema.*", "db.schema.table", True), + ("db\\.schema\\..*", "db.schema.table", True), + ("db\\.schema\\.table\\.column_prefix.*", "db.schema.table", True), + ("db\\.schema\\.table\\.column", "db.schema.table", True), + ("db\\.schema\\.table2\\.column", "db.schema.table", False), + ("db2\\.schema.*", "db.schema.table", False), + ("db2\\.schema.*", "db.schema.table", False), + ("db\\.schema\\.table\\..*", "db.table2", False), +] + + +@pytest.mark.parametrize( + "allow_pattern, table_name, result", + test_profile_pattern_matching_on_table_allow_list_test_data, +) +def test_profile_pattern_matching_on_table_allow_list( + allow_pattern: str, table_name: str, result: bool +) -> None: + pattern = AllowDenyPattern(allow=[allow_pattern]) + assert check_table_with_profile_pattern(pattern, table_name) == result + + +test_profile_pattern_matching_on_table_deny_list_test_data = [ + ("db.table.column", "db.table", True), + ("db.table.column2", "db.table", True), + ("db.table..*", "db.table", True), + ("db.*", "db.table", False), + ("db.*", "db.table", False), + ("db.*", "db.schema.table", False), + ("db.schema.*", "db.schema.table", False), + ("db\\.schema\\..*", "db.schema.table", False), + ("db\\.schema\\.table\\.column_prefix.*", "db.schema.table", True), + ("db\\.schema\\.table\\.column", "db.schema.table", True), + ("db\\.schema\\.table2\\.column", "db.schema.table", True), + ("db2\\.schema.*", "db.schema.table", True), + ("db2\\.schema.*", "db.schema.table", True), + ("db\\.schema\\.table\\..*", "db.table2", True), +] + + +@pytest.mark.parametrize( + "deny_pattern, table_name, result", + test_profile_pattern_matching_on_table_deny_list_test_data, +) +def test_profile_pattern_matching_on_table_deny_list( + deny_pattern: str, table_name: str, result: bool +) -> None: + pattern = AllowDenyPattern(deny=[deny_pattern]) + assert check_table_with_profile_pattern(pattern, table_name) == result diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/transformer/SearchDocumentTransformer.java b/metadata-io/src/main/java/com/linkedin/metadata/search/transformer/SearchDocumentTransformer.java index 0c98a011346ef1..a655f90597e20b 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/transformer/SearchDocumentTransformer.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/transformer/SearchDocumentTransformer.java @@ -314,6 +314,10 @@ private Optional