diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py
index 606c2b89303b7..b10143565efb4 100644
--- a/metadata-ingestion/setup.py
+++ b/metadata-ingestion/setup.py
@@ -173,7 +173,7 @@
 looker_common = {
     # Looker Python SDK
-    "looker-sdk==23.0.0",
+    "looker-sdk>=23.0.0",
     # This version of lkml contains a fix for parsing lists in
     # LookML files with spaces between an item and the following comma.
     # See https://github.com/joshtemple/lkml/issues/73.
@@ -278,6 +278,9 @@
 threading_timeout_common = {
     "stopit==1.1.2",
+    # stopit uses pkg_resources internally, which means there's an implied
+    # dependency on setuptools.
+    "setuptools",
 }
 
 abs_base = {
diff --git a/metadata-ingestion/src/datahub/api/entities/forms/forms.py b/metadata-ingestion/src/datahub/api/entities/forms/forms.py
index 9188ea33d6c68..fa6453851e0e7 100644
--- a/metadata-ingestion/src/datahub/api/entities/forms/forms.py
+++ b/metadata-ingestion/src/datahub/api/entities/forms/forms.py
@@ -16,8 +16,12 @@
 )
 from datahub.configuration.common import ConfigModel
 from datahub.emitter.mce_builder import (
+    make_container_urn,
     make_data_platform_urn,
+    make_domain_urn,
     make_group_urn,
+    make_tag_urn,
+    make_term_urn,
     make_user_urn,
 )
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
@@ -36,6 +40,16 @@
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
 
+FILTER_CRITERION_TYPES = "_entityType"
+FILTER_CRITERION_SUB_TYPES = "typeNames.keyword"
+FILTER_CRITERION_PLATFORMS = "platform.keyword"
+FILTER_CRITERION_PLATFORM_INSTANCES = "dataPlatformInstance.keyword"
+FILTER_CRITERION_DOMAINS = "domains.keyword"
+FILTER_CRITERION_CONTAINERS = "container.keyword"
+FILTER_CRITERION_OWNERS = "owners.keyword"
+FILTER_CRITERION_TAGS = "tags.keyword"
+FILTER_CRITERION_GLOSSARY_TERMS = "glossaryTerms.keyword"
+
 
 class PromptType(Enum):
     STRUCTURED_PROPERTY = "STRUCTURED_PROPERTY"
@@ -73,9 +87,14 @@ def has_value(cls, value):
 
 class Filters(ConfigModel):
     types: Optional[List[str]] = None
+    sub_types: Optional[List[str]] = None
     platforms: Optional[List[str]] = None
+    platform_instances: Optional[List[str]] = None
     domains: Optional[List[str]] = None
     containers: Optional[List[str]] = None
+    owners: Optional[List[str]] = None
+    tags: Optional[List[str]] = None
+    terms: Optional[List[str]] = None
 
 
 class Entities(ConfigModel):
@@ -105,7 +124,8 @@ class Forms(ConfigModel):
     @validator("urn", pre=True, always=True)
     def urn_must_be_present(cls, v, values):
         if not v:
-            assert values.get("id") is not None, "Form id must be present if urn is not"
+            if values.get("id") is None:
+                raise ValueError("Form id must be present if urn is not")
             return f"urn:li:form:{values['id']}"
         return v
@@ -249,29 +269,70 @@ def create_form_filters(self, emitter: DataHubGraph) -> Union[None, Exception]:
         # Loop through each entity and assign a filter for it
         if self.entities and self.entities.filters:
             filters = self.entities.filters
+
             if filters.types:
                 filters_raw.append(
-                    Forms.format_form_filter("_entityType", filters.types)
+                    Forms.format_form_filter(FILTER_CRITERION_TYPES, filters.types)
+                )
+
+            if filters.sub_types:
+                filters_raw.append(
+                    Forms.format_form_filter(
+                        FILTER_CRITERION_SUB_TYPES, filters.sub_types
+                    )
                 )
+
             if filters.platforms:
                 urns = [
                     make_data_platform_urn(platform) for platform in filters.platforms
                 ]
-                filters_raw.append(Forms.format_form_filter("platform", urns))
-            if filters.domains:
+                filters_raw.append(
+                    Forms.format_form_filter(FILTER_CRITERION_PLATFORMS, urns)
+                )
+
+            if filters.platform_instances:
                 urns = []
-                for domain in filters.domains:
-                    domain_urn = Forms.validate_domain_urn(domain)
-                    if domain_urn:
-                        urns.append(domain_urn)
-                filters_raw.append(Forms.format_form_filter("domains", urns))
+                for platform_instance in filters.platform_instances:
+                    platform_instance_urn = Forms.validate_platform_instance_urn(
+                        platform_instance
+                    )
+                    if platform_instance_urn:
+                        urns.append(platform_instance_urn)
+                filters_raw.append(
+                    Forms.format_form_filter(FILTER_CRITERION_PLATFORM_INSTANCES, urns)
+                )
+
+            if filters.domains:
+                urns = [make_domain_urn(domain) for domain in filters.domains]
+                filters_raw.append(
+                    Forms.format_form_filter(FILTER_CRITERION_DOMAINS, urns)
+                )
+
             if filters.containers:
-                urns = []
-                for container in filters.containers:
-                    container_urn = Forms.validate_container_urn(container)
-                    if container_urn:
-                        urns.append(container_urn)
-                filters_raw.append(Forms.format_form_filter("container", urns))
+                urns = [
+                    make_container_urn(container) for container in filters.containers
+                ]
+                filters_raw.append(
+                    Forms.format_form_filter(FILTER_CRITERION_CONTAINERS, urns)
+                )
+
+            if filters.owners:
+                urns = [make_user_urn(owner) for owner in filters.owners]
+                filters_raw.append(
+                    Forms.format_form_filter(FILTER_CRITERION_OWNERS, urns)
+                )
+
+            if filters.tags:
+                urns = [make_tag_urn(tag) for tag in filters.tags]
+                filters_raw.append(
+                    Forms.format_form_filter(FILTER_CRITERION_TAGS, urns)
+                )
+
+            if filters.terms:
+                urns = [make_term_urn(term) for term in filters.terms]
+                filters_raw.append(
+                    Forms.format_form_filter(FILTER_CRITERION_GLOSSARY_TERMS, urns)
+                )
 
         filters_str = ", ".join(item for item in filters_raw)
         result = emitter.execute_graphql(
@@ -279,6 +340,7 @@ def create_form_filters(self, emitter: DataHubGraph) -> Union[None, Exception]:
                 form_urn=self.urn, filters=filters_str
             )
         )
+
        if not result:
             return Exception(
                 f"Could not bulk upload urns or filters for form {self.urn}."
@@ -314,25 +376,20 @@ def format_form_filter(field: str, urns: List[str]) -> str:
         return FIELD_FILTER_TEMPLATE.format(field=field, values=formatted_urns)
 
     @staticmethod
-    def validate_domain_urn(domain: str) -> Union[str, None]:
-        if domain.startswith("urn:li:domain:"):
-            return domain
+    def validate_platform_instance_urn(instance: str) -> Union[str, None]:
+        if instance.startswith("urn:li:dataPlatformInstance:"):
+            return instance
 
-        logger.warning(f"{domain} is not an urn. Unable to create domain filter.")
-        return None
-
-    @staticmethod
-    def validate_container_urn(container: str) -> Union[str, None]:
-        if container.startswith("urn:li:container:"):
-            return container
-
-        logger.warning(f"{container} is not an urn. Unable to create container filter.")
+        logger.warning(
+            f"{instance} is not an urn. Unable to create platform instance filter."
+        )
         return None
 
     @staticmethod
     def from_datahub(graph: DataHubGraph, urn: str) -> "Forms":
         form: Optional[FormInfoClass] = graph.get_aspect(urn, FormInfoClass)
-        assert form is not None
+        if form is None:
+            raise Exception("FormInfo aspect is None. Unable to create form.")
         prompts = []
         for prompt_raw in form.prompts:
             prompts.append(
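For reference, the new Filters fields map one-to-one onto the FILTER_CRITERION_* search fields introduced above. Below is a minimal, hypothetical sketch of that mapping; build_filter_criteria and its lookup table are illustrative stand-ins and not the real Forms.format_form_filter helper or GraphQL payload.

from typing import Dict, List

# Hypothetical, simplified stand-in for the mapping done in create_form_filters().
# Keys mirror the Filters model fields; values mirror the FILTER_CRITERION_* constants.
def build_filter_criteria(filters: Dict[str, List[str]]) -> Dict[str, List[str]]:
    field_by_filter = {
        "types": "_entityType",
        "sub_types": "typeNames.keyword",
        "platforms": "platform.keyword",
        "platform_instances": "dataPlatformInstance.keyword",
        "domains": "domains.keyword",
        "containers": "container.keyword",
        "owners": "owners.keyword",
        "tags": "tags.keyword",
        "terms": "glossaryTerms.keyword",
    }
    return {
        field_by_filter[name]: values
        for name, values in filters.items()
        if name in field_by_filter and values
    }

# Example: a form scoped to Snowflake datasets owned by a particular user.
print(
    build_filter_criteria(
        {
            "types": ["dataset"],
            "platforms": ["urn:li:dataPlatform:snowflake"],
            "owners": ["urn:li:corpuser:jdoe"],
        }
    )
)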
diff --git a/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py b/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py
index 5b188edf9563b..060d6b00b1a12 100644
--- a/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py
+++ b/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py
@@ -89,7 +89,8 @@ def fqn(self) -> str:
     @validator("urn", pre=True, always=True)
     def urn_must_be_present(cls, v, values):
         if not v:
-            assert "id" in values, "id must be present if urn is not"
+            if "id" not in values:
+                raise ValueError("id must be present if urn is not")
             return f"urn:li:structuredProperty:{values['id']}"
         return v
@@ -154,7 +155,10 @@ def from_datahub(cls, graph: DataHubGraph, urn: str) -> "StructuredProperties":
         structured_property: Optional[
             StructuredPropertyDefinitionClass
         ] = graph.get_aspect(urn, StructuredPropertyDefinitionClass)
-        assert structured_property is not None
+        if structured_property is None:
+            raise Exception(
+                "StructuredPropertyDefinition aspect is None. Unable to create structured property."
+            )
         return StructuredProperties(
             urn=urn,
             qualified_name=structured_property.qualifiedName,
diff --git a/metadata-ingestion/src/datahub/emitter/mce_builder.py b/metadata-ingestion/src/datahub/emitter/mce_builder.py
index d3a930d988171..63b03db7f5b60 100644
--- a/metadata-ingestion/src/datahub/emitter/mce_builder.py
+++ b/metadata-ingestion/src/datahub/emitter/mce_builder.py
@@ -163,11 +163,15 @@ def dataset_key_to_urn(key: DatasetKeyClass) -> str:
 
 
 def make_container_urn(guid: Union[str, "DatahubKey"]) -> str:
-    from datahub.emitter.mcp_builder import DatahubKey
+    if isinstance(guid, str) and guid.startswith("urn:li:container"):
+        return guid
+    else:
+        from datahub.emitter.mcp_builder import DatahubKey
+
+        if isinstance(guid, DatahubKey):
+            guid = guid.guid()
 
-    if isinstance(guid, DatahubKey):
-        guid = guid.guid()
-    return f"urn:li:container:{guid}"
+        return f"urn:li:container:{guid}"
 
 
 def container_urn_to_key(guid: str) -> Optional[ContainerKeyClass]:
diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py
index 0aec9a589cf27..09ade4c905bea 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py
@@ -20,6 +20,7 @@
     StatefulIngestionConfigBase,
 )
 from datahub.utilities.lossy_collections import LossyList
+from datahub.utilities.perf_timer import PerfTimer
 
 logger = logging.getLogger(__name__)
 
@@ -190,6 +191,15 @@ class PowerBiDashboardSourceReport(StaleEntityRemovalSourceReport):
     filtered_dashboards: List[str] = dataclass_field(default_factory=list)
     filtered_charts: List[str] = dataclass_field(default_factory=list)
 
+    m_query_parse_timer: PerfTimer = dataclass_field(default_factory=PerfTimer)
+    m_query_parse_attempts: int = 0
+    m_query_parse_successes: int = 0
+    m_query_parse_timeouts: int = 0
+    m_query_parse_validation_errors: int = 0
+    m_query_parse_unexpected_character_errors: int = 0
+    m_query_parse_unknown_errors: int = 0
+    m_query_resolver_errors: int = 0
+
     def report_dashboards_scanned(self, count: int = 1) -> None:
         self.dashboards_scanned += count
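The make_container_urn change above gives the builder pass-through behavior for values that are already container urns. A quick sketch of the intended contract, assuming the patched function is importable as shown:

from datahub.emitter.mce_builder import make_container_urn

# With the change applied, an existing container urn is returned unchanged
# instead of being wrapped a second time.
assert make_container_urn("urn:li:container:abc123") == "urn:li:container:abc123"
# A raw guid keeps the old behavior and gets the urn prefix.
assert make_container_urn("abc123") == "urn:li:container:abc123"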
diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py
index 086ce2c263b0c..a61ad8d289b9d 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py
@@ -74,7 +74,9 @@ def get_upstream_tables(
     )
 
     try:
-        parse_tree: Tree = _parse_expression(table.expression)
+        with reporter.m_query_parse_timer:
+            reporter.m_query_parse_attempts += 1
+            parse_tree: Tree = _parse_expression(table.expression)
 
         valid, message = validator.validate_parse_tree(
             parse_tree, native_query_enabled=config.native_query_parsing
@@ -87,10 +89,12 @@ def get_upstream_tables(
                 message="DataAccess function is not present in M-Query expression",
                 context=f"table-full-name={table.full_name}, expression={table.expression}, message={message}",
             )
+            reporter.m_query_parse_validation_errors += 1
             return []
     except KeyboardInterrupt:
         raise
     except TimeoutException:
+        reporter.m_query_parse_timeouts += 1
         reporter.warning(
             title="M-Query Parsing Timeout",
             message=f"M-Query parsing timed out after {_M_QUERY_PARSE_TIMEOUT} seconds. Lineage for this table will not be extracted.",
@@ -102,8 +106,10 @@ def get_upstream_tables(
     ) as e:  # TODO: Debug why BaseException is needed here and below.
         if isinstance(e, lark.exceptions.UnexpectedCharacters):
             error_type = "Unexpected Character Error"
+            reporter.m_query_parse_unexpected_character_errors += 1
         else:
             error_type = "Unknown Parsing Error"
+            reporter.m_query_parse_unknown_errors += 1
 
         reporter.warning(
             title="Unable to extract lineage from M-Query expression",
@@ -112,10 +118,10 @@ def get_upstream_tables(
             exc=e,
         )
         return []
 
+    reporter.m_query_parse_successes += 1
-    lineage: List[resolver.Lineage] = []
     try:
-        lineage = resolver.MQueryResolver(
+        lineage: List[resolver.Lineage] = resolver.MQueryResolver(
             table=table,
             parse_tree=parse_tree,
             reporter=reporter,
@@ -126,14 +132,14 @@ def get_upstream_tables(
             platform_instance_resolver=platform_instance_resolver,
         )
 
+        return lineage
+
     except BaseException as e:
+        reporter.m_query_resolver_errors += 1
         reporter.warning(
             title="Unknown M-Query Pattern",
             message="Encountered a unknown M-Query Expression",
             context=f"table-full-name={table.full_name}, expression={table.expression}, message={e}",
             exc=e,
         )
-
-        logger.debug(f"Stack trace for {table.full_name}:", exc_info=e)
-
-    return lineage
+        return []
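The timer and counters added above follow a simple accumulate-per-expression pattern. Below is a rough, self-contained sketch of that pattern using PerfTimer; the expression list and the sleep call are placeholders standing in for real M-Query expressions and _parse_expression, not actual parsing logic.

import time
from datahub.utilities.perf_timer import PerfTimer

parse_timer = PerfTimer()
parse_attempts = 0
parse_successes = 0

# The same PerfTimer instance is re-entered for each expression, so
# elapsed_seconds() accumulates the total parse time across all tables.
for expression in ["let Source = ... in Source", "let Other = ... in Other"]:
    with parse_timer:
        parse_attempts += 1
        time.sleep(0.01)  # stand-in for _parse_expression(expression)
    parse_successes += 1

print(
    f"attempts={parse_attempts} successes={parse_successes} "
    f"total_parse_seconds={parse_timer.elapsed_seconds():.3f}"
)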
diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py
index a5fb6fd2673ac..cc51fcee14104 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py
@@ -356,8 +356,9 @@ def _process_invoke_expression(
         )
         if arg_list is None:
             self.reporter.report_warning(
-                f"{self.table.full_name}-arg-list",
-                f"Argument list not found for data-access-function {data_access_func}",
+                title="M-Query Resolver Error",
+                message="Unable to extract lineage from parsed M-Query expression (missing argument list)",
+                context=f"{self.table.full_name}: argument list not found for data-access-function {data_access_func}",
             )
             return None
@@ -377,8 +378,9 @@ def _process_invoke_expression(
                 f"Function invocation without argument in expression = {invoke_expression.pretty()}"
             )
             self.reporter.report_warning(
-                f"{self.table.full_name}-variable-statement",
-                "Function invocation without argument",
+                title="M-Query Resolver Error",
+                message="Unable to extract lineage from parsed M-Query expression (function invocation without argument)",
+                context=f"{self.table.full_name}: function invocation without argument",
             )
             return None
@@ -403,8 +405,9 @@ def _process_invoke_expression(
                 f"Either list_expression or type_expression is not found = {invoke_expression.pretty()}"
             )
             self.reporter.report_warning(
-                f"{self.table.full_name}-variable-statement",
-                "Function argument expression is not supported",
+                title="M-Query Resolver Error",
+                message="Unable to extract lineage from parsed M-Query expression (function argument expression is not supported)",
+                context=f"{self.table.full_name}: function argument expression is not supported",
             )
             return None
diff --git a/metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py b/metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py
index c358bee6daae8..e7b0527d30d97 100644
--- a/metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py
+++ b/metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py
@@ -90,7 +90,9 @@ def schema_count(self) -> int:
             )[0][0]
         )
 
-    def get_urn_for_table(self, table: _TableName, lower: bool = False) -> str:
+    def get_urn_for_table(
+        self, table: _TableName, lower: bool = False, mixed: bool = False
+    ) -> str:
         # TODO: Validate that this is the correct 2/3 layer hierarchy for the platform.
         table_name = ".".join(
@@ -101,7 +103,10 @@ def get_urn_for_table(self, table: _TableName, lower: bool = False) -> str:
 
         if lower:
             table_name = table_name.lower()
-            platform_instance = platform_instance.lower() if platform_instance else None
+            if not mixed:
+                platform_instance = (
+                    platform_instance.lower() if platform_instance else None
+                )
 
         if self.platform == "bigquery":
             # Normalize shard numbers and other BigQuery weirdness.
@@ -131,6 +136,20 @@ def resolve_table(self, table: _TableName) -> Tuple[str, Optional[SchemaInfo]]:
             if schema_info:
                 return urn_lower, schema_info
 
+        # Our treatment of platform instances when lowercasing urns
+        # is inconsistent. In some places (e.g. Snowflake), we lowercase
+        # the table names but not the platform instance. In other places
+        # (e.g. Databricks), we lowercase everything because it happens
+        # via the automatic lowercasing helper.
+        # See https://github.com/datahub-project/datahub/pull/8928.
+        # While we have this sort of inconsistency, we should also
+        # check the mixed case urn, as a last resort.
+        urn_mixed = self.get_urn_for_table(table, lower=True, mixed=True)
+        if urn_mixed not in {urn, urn_lower}:
+            schema_info = self._resolve_schema_info(urn_mixed)
+            if schema_info:
+                return urn_mixed, schema_info
+
         if self._prefers_urn_lower():
             return urn_lower, None
         else:
diff --git a/metadata-ingestion/src/datahub/utilities/partition_executor.py b/metadata-ingestion/src/datahub/utilities/partition_executor.py
index 92413eeb674f5..237ffc6dc611b 100644
--- a/metadata-ingestion/src/datahub/utilities/partition_executor.py
+++ b/metadata-ingestion/src/datahub/utilities/partition_executor.py
@@ -294,6 +294,9 @@ def _clearinghouse_worker(self) -> None:  # noqa: C901
         def _handle_batch_completion(
             batch: List[_BatchPartitionWorkItem], future: Future
         ) -> None:
+            nonlocal workers_available
+            workers_available += 1
+
             with clearinghouse_state_lock:
                 for item in batch:
                     keys_no_longer_in_flight.add(item.key)
diff --git a/metadata-ingestion/src/datahub/utilities/perf_timer.py b/metadata-ingestion/src/datahub/utilities/perf_timer.py
index 18384420bfefb..9488683d6d8ca 100644
--- a/metadata-ingestion/src/datahub/utilities/perf_timer.py
+++ b/metadata-ingestion/src/datahub/utilities/perf_timer.py
@@ -9,7 +9,6 @@ class PerfTimer(AbstractContextManager):
     """
     A context manager that gives easy access to elapsed time for performance
     measurement.
-
     """
 
     def __init__(self) -> None:
diff --git a/metadata-ingestion/tests/unit/sql_parsing/test_schemaresolver.py b/metadata-ingestion/tests/unit/sql_parsing/test_schemaresolver.py
index 5a33034f274dc..e5fa980bec452 100644
--- a/metadata-ingestion/tests/unit/sql_parsing/test_schemaresolver.py
+++ b/metadata-ingestion/tests/unit/sql_parsing/test_schemaresolver.py
@@ -41,6 +41,11 @@ def test_get_urn_for_table_lowercase():
         == "urn:li:dataset:(urn:li:dataPlatform:mssql,uppercased-instance.database.dataset.table,PROD)"
     )
+    assert (
+        schema_resolver.get_urn_for_table(table=table, lower=True, mixed=True)
+        == "urn:li:dataset:(urn:li:dataPlatform:mssql,Uppercased-Instance.database.dataset.table,PROD)"
+    )
+
 
 
 def test_get_urn_for_table_not_lower_should_keep_capital_letters():
     schema_resolver = SchemaResolver(
diff --git a/metadata-ingestion/tests/unit/utilities/test_perf_timer.py b/metadata-ingestion/tests/unit/utilities/test_perf_timer.py
index 1de76a32fb708..166e40ef54308 100644
--- a/metadata-ingestion/tests/unit/utilities/test_perf_timer.py
+++ b/metadata-ingestion/tests/unit/utilities/test_perf_timer.py
@@ -8,7 +8,7 @@
 approx = partial(pytest.approx, rel=2e-2)
 
 
-def test_perf_timer_simple():
+def test_perf_timer_simple() -> None:
     with PerfTimer() as timer:
         time.sleep(0.4)
         assert approx(timer.elapsed_seconds()) == 0.4
@@ -16,7 +16,7 @@ def test_perf_timer_simple():
     assert approx(timer.elapsed_seconds()) == 0.4
 
 
-def test_perf_timer_paused_timer():
+def test_perf_timer_paused_timer() -> None:
     with PerfTimer() as current_timer:
         time.sleep(0.5)
         assert approx(current_timer.elapsed_seconds()) == 0.5
@@ -29,7 +29,7 @@ def test_perf_timer_paused_timer():
     assert approx(current_timer.elapsed_seconds()) == 0.7
 
 
-def test_generator_with_paused_timer():
+def test_generator_with_paused_timer() -> None:
     n = 4
 
     def generator_function():
@@ -46,3 +46,15 @@ def generator_function():
     seq = generator_function()
     list([i for i in seq])
     assert approx(outer_timer.elapsed_seconds()) == 1 + 0.2 * n + 0.2 * n
+
+
+def test_perf_timer_reuse() -> None:
+    timer = PerfTimer()
+
+    with timer:
+        time.sleep(0.2)
+
+    with timer:
+        time.sleep(0.3)
+
+    assert approx(timer.elapsed_seconds()) == 0.5
diff --git a/metadata-integration/java/acryl-spark-lineage/README.md b/metadata-integration/java/acryl-spark-lineage/README.md
index 9caa5a6dec65d..bd0a58b635b48 100644
--- a/metadata-integration/java/acryl-spark-lineage/README.md
+++ b/metadata-integration/java/acryl-spark-lineage/README.md
@@ -184,7 +184,7 @@ information like tokens.
 | spark.datahub.platform.s3.path_spec_list | | | List of pathspec per platform |
 | spark.datahub.metadata.dataset.include_schema_metadata | false | | Emit dataset schema metadata based on the spark execution. It is recommended to get schema information from platform specific DataHub sources as this is less reliable |
 | spark.datahub.flow_name | | | If it is set it will be used as the DataFlow name otherwise it uses spark app name as flow_name |
-| spark.datahub.partition_regexp_pattern | | | Strip partition part from the path if path end matches with the specified regexp. Example `year=.*/month=.*/day=.*` |
+| spark.datahub.file_partition_regexp | | | Strip partition part from the path if path end matches with the specified regexp. Example `year=.*/month=.*/day=.*` |
 | spark.datahub.tags | | | Comma separated list of tags to attach to the DataFlow |
 | spark.datahub.domains | | | Comma separated list of domain urns to attach to the DataFlow |
 | spark.datahub.stage_metadata_coalescing | | | Normally it coalesces and sends metadata at the onApplicationEnd event which is never called on Databricks or on Glue. You should enable this on Databricks if you want coalesced run. |
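A usage sketch of the renamed option set from PySpark; the listener class name is assumed from the standard registration for this plugin, and the regexp value is the example from the table above.

from pyspark import SparkConf

# spark.datahub.partition_regexp_pattern was renamed to spark.datahub.file_partition_regexp;
# the value strips the date-partition suffix from dataset paths before lineage is emitted.
conf = (
    SparkConf()
    .set("spark.extraListeners", "datahub.spark.DatahubSparkListener")
    .set("spark.datahub.file_partition_regexp", "year=.*/month=.*/day=.*")
)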